diff --git a/.bzrignore b/.bzrignore index 7e1a3545374..6f5cd2de56a 100644 --- a/.bzrignore +++ b/.bzrignore @@ -800,3 +800,4 @@ vio/test-sslclient vio/test-sslserver vio/viotest-ssl libmysqld/sql_view.cc +libmysqld/examples/client_test.cc diff --git a/include/mysql.h b/include/mysql.h index 0f3fdc90548..2af1c657aeb 100644 --- a/include/mysql.h +++ b/include/mysql.h @@ -580,6 +580,12 @@ typedef struct st_mysql_stmt int (*read_row_func)(struct st_mysql_stmt *stmt, unsigned char **row); unsigned long stmt_id; /* Id for prepared statement */ + unsigned long flags; /* i.e. type of cursor to open */ + /* + Copied from mysql->server_status after execute/fetch to know + server-side cursor status for this statement. + */ + unsigned int server_status; unsigned int last_errno; /* error code */ unsigned int param_count; /* inpute parameters count */ unsigned int field_count; /* number of columns in result set */ @@ -608,7 +614,12 @@ enum enum_stmt_attr_type In the new API we do that only by request because it slows down mysql_stmt_store_result sufficiently. */ - STMT_ATTR_UPDATE_MAX_LENGTH + STMT_ATTR_UPDATE_MAX_LENGTH, + /* + unsigned long with combination of cursor flags (read only, for update, + etc) + */ + STMT_ATTR_CURSOR_TYPE }; diff --git a/include/mysql_com.h b/include/mysql_com.h index 01f26399953..fa73895000c 100644 --- a/include/mysql_com.h +++ b/include/mysql_com.h @@ -49,7 +49,7 @@ enum enum_server_command COM_TIME, COM_DELAYED_INSERT, COM_CHANGE_USER, COM_BINLOG_DUMP, COM_TABLE_DUMP, COM_CONNECT_OUT, COM_REGISTER_SLAVE, COM_PREPARE, COM_EXECUTE, COM_LONG_DATA, COM_CLOSE_STMT, - COM_RESET_STMT, COM_SET_OPTION, + COM_RESET_STMT, COM_SET_OPTION, COM_FETCH, COM_END /* Must be last */ }; @@ -132,6 +132,17 @@ enum enum_server_command #define SERVER_MORE_RESULTS_EXISTS 8 /* Multi query - next query exists */ #define SERVER_QUERY_NO_GOOD_INDEX_USED 16 #define SERVER_QUERY_NO_INDEX_USED 32 +/* + The server was able to fulfill client request and open read-only + non-scrollable cursor for the query. This flag comes in server + status with reply to COM_EXECUTE and COM_EXECUTE_DIRECT commands. +*/ +#define SERVER_STATUS_CURSOR_EXISTS 64 +/* + This flag is sent with last row of read-only cursor, in reply to + COM_FETCH command. +*/ +#define SERVER_STATUS_LAST_ROW_SENT 128 #define MYSQL_ERRMSG_SIZE 512 #define NET_READ_TIMEOUT 30 /* Timeout on read */ @@ -257,6 +268,16 @@ enum enum_shutdown_level { KILL_CONNECTION= 255 }; + +enum enum_cursor_type +{ + CURSOR_TYPE_NO_CURSOR= 0, + CURSOR_TYPE_READ_ONLY= 1, + CURSOR_TYPE_FOR_UPDATE= 2, + CURSOR_TYPE_SCROLLABLE= 4 +}; + + /* options for mysql_set_option */ enum enum_mysql_set_option { diff --git a/libmysql/libmysql.c b/libmysql/libmysql.c index fc7728c98e0..2dd35809a7b 100644 --- a/libmysql/libmysql.c +++ b/libmysql/libmysql.c @@ -1668,6 +1668,7 @@ myodbc_remove_escape(MYSQL *mysql,char *name) static int stmt_read_row_unbuffered(MYSQL_STMT *stmt, unsigned char **row); static int stmt_read_row_buffered(MYSQL_STMT *stmt, unsigned char **row); +static int stmt_read_row_from_cursor(MYSQL_STMT *stmt, unsigned char **row); static int stmt_read_row_no_data(MYSQL_STMT *stmt, unsigned char **row); /* @@ -2387,7 +2388,7 @@ static my_bool execute(MYSQL_STMT *stmt, char *packet, ulong length) mysql->last_used_con= mysql; int4store(buff, stmt->stmt_id); /* Send stmt id to server */ - buff[4]= (char) 0; /* no flags */ + buff[4]= (char) stmt->flags; int4store(buff+5, 1); /* iteration count */ if (cli_advanced_command(mysql, COM_EXECUTE, buff, sizeof(buff), packet, length, 1) || @@ -2397,6 +2398,7 @@ static my_bool execute(MYSQL_STMT *stmt, char *packet, ulong length) DBUG_RETURN(1); } stmt->affected_rows= mysql->affected_rows; + stmt->server_status= mysql->server_status; stmt->insert_id= mysql->insert_id; DBUG_RETURN(0); } @@ -2552,6 +2554,59 @@ error: return rc; } + +/* + Fetch statement row using server side cursor. + + SYNOPSIS + stmt_read_row_from_cursor() + + RETURN VALUE + 0 success + 1 error + MYSQL_NO_DATA end of data +*/ + +static int +stmt_read_row_from_cursor(MYSQL_STMT *stmt, unsigned char **row) +{ + if (stmt->data_cursor) + return stmt_read_row_buffered(stmt, row); + if (stmt->server_status & SERVER_STATUS_LAST_ROW_SENT) + stmt->server_status &= ~SERVER_STATUS_LAST_ROW_SENT; + else + { + MYSQL *mysql= stmt->mysql; + NET *net= &mysql->net; + MYSQL_DATA *result= &stmt->result; + char buff[4 /* statement id */ + + 4 /* number of rows to fetch */]; + + free_root(&result->alloc, MYF(MY_KEEP_PREALLOC)); + result->data= NULL; + result->rows= 0; + /* Send row request to the server */ + int4store(buff, stmt->stmt_id); + int4store(buff + 4, 1); /* number of rows to fetch */ + if (cli_advanced_command(mysql, COM_FETCH, buff, sizeof(buff), + NullS, 0, 1)) + { + set_stmt_errmsg(stmt, net->last_error, net->last_errno, net->sqlstate); + return 1; + } + stmt->server_status= mysql->server_status; + if (cli_read_binary_rows(stmt)) + return 1; + stmt->server_status= mysql->server_status; + + stmt->data_cursor= result->data; + return stmt_read_row_buffered(stmt, row); + } + *row= 0; + return MYSQL_NO_DATA; +} + + /* Default read row function to not SIGSEGV in client in case of wrong sequence of API calls. @@ -2593,6 +2648,9 @@ my_bool STDCALL mysql_stmt_attr_set(MYSQL_STMT *stmt, case STMT_ATTR_UPDATE_MAX_LENGTH: stmt->update_max_length= value ? *(const my_bool*) value : 0; break; + case STMT_ATTR_CURSOR_TYPE: + stmt->flags= value ? *(const unsigned long *) value : 0; + break; default: return TRUE; } @@ -2608,6 +2666,9 @@ my_bool STDCALL mysql_stmt_attr_get(MYSQL_STMT *stmt, case STMT_ATTR_UPDATE_MAX_LENGTH: *(unsigned long *) value= stmt->update_max_length; break; + case STMT_ATTR_CURSOR_TYPE: + *(unsigned long *) value= stmt->flags; + break; default: return TRUE; } @@ -2711,9 +2772,17 @@ int STDCALL mysql_stmt_execute(MYSQL_STMT *stmt) stmt->state= MYSQL_STMT_EXECUTE_DONE; if (stmt->field_count) { - stmt->mysql->unbuffered_fetch_owner= &stmt->unbuffered_fetch_cancelled; - stmt->unbuffered_fetch_cancelled= FALSE; - stmt->read_row_func= stmt_read_row_unbuffered; + if (stmt->server_status & SERVER_STATUS_CURSOR_EXISTS) + { + mysql->status= MYSQL_STATUS_READY; + stmt->read_row_func= stmt_read_row_from_cursor; + } + else + { + stmt->mysql->unbuffered_fetch_owner= &stmt->unbuffered_fetch_cancelled; + stmt->unbuffered_fetch_cancelled= FALSE; + stmt->read_row_func= stmt_read_row_unbuffered; + } } DBUG_RETURN(0); } diff --git a/libmysqld/examples/Makefile.am b/libmysqld/examples/Makefile.am index b3db54d305a..7e32df82e80 100644 --- a/libmysqld/examples/Makefile.am +++ b/libmysqld/examples/Makefile.am @@ -27,7 +27,7 @@ mysql_SOURCES = mysql.cc readline.cc completion_hash.cc \ mysql_LDADD = @readline_link@ @TERMCAP_LIB@ $(LDADD) client_test_LINK = $(CXXLINK) -client_test_SOURCES = client_test.c +client_test_SOURCES = client_test.cc clean: rm -f $(client_sources) diff --git a/libmysqld/lib_sql.cc b/libmysqld/lib_sql.cc index 0adf9aeb86a..1f37b115f6c 100644 --- a/libmysqld/lib_sql.cc +++ b/libmysqld/lib_sql.cc @@ -568,7 +568,7 @@ err: C_MODE_END -bool Protocol::send_fields(List *list, uint flag) +bool Protocol::send_fields(List *list, uint flags) { List_iterator_fast it(*list); Item *item; @@ -615,7 +615,7 @@ bool Protocol::send_fields(List *list, uint flag) if (INTERNAL_NUM_FIELD(client_field)) client_field->flags|= NUM_FLAG; - if (flag & 2) + if (flags & Protocol::SEND_DEFAULTS) { char buff[80]; String tmp(buff, sizeof(buff), default_charset_info), *res; diff --git a/sql/ha_innodb.cc b/sql/ha_innodb.cc index fdaf5198b7f..700b8fafe19 100644 --- a/sql/ha_innodb.cc +++ b/sql/ha_innodb.cc @@ -4891,7 +4891,8 @@ innodb_show_status( field_list.push_back(new Item_empty_string("Status", flen)); - if (protocol->send_fields(&field_list, 1)) { + if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) { my_free(str, MYF(0)); diff --git a/sql/mysql_priv.h b/sql/mysql_priv.h index b0acf64c4ff..bae1dcf5ecf 100644 --- a/sql/mysql_priv.h +++ b/sql/mysql_priv.h @@ -694,6 +694,7 @@ int mysql_stmt_prepare(THD *thd, char *packet, uint packet_length, LEX_STRING *name=NULL); void mysql_stmt_execute(THD *thd, char *packet, uint packet_length); void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name); +void mysql_stmt_fetch(THD *thd, char *packet, uint packet_length); void mysql_stmt_free(THD *thd, char *packet); void mysql_stmt_reset(THD *thd, char *packet); void mysql_stmt_get_longdata(THD *thd, char *pos, ulong packet_length); diff --git a/sql/protocol.cc b/sql/protocol.cc index 065fcd3d4af..75383001014 100644 --- a/sql/protocol.cc +++ b/sql/protocol.cc @@ -487,6 +487,7 @@ void Protocol::init(THD *thd_arg) flag Bit mask with the following functions: 1 send number of rows 2 send default values + 4 don't write eof packet DESCRIPTION Sum fields has table name empty and field_name. @@ -497,7 +498,7 @@ void Protocol::init(THD *thd_arg) */ #ifndef EMBEDDED_LIBRARY -bool Protocol::send_fields(List *list, uint flag) +bool Protocol::send_fields(List *list, uint flags) { List_iterator_fast it(*list); Item *item; @@ -508,7 +509,7 @@ bool Protocol::send_fields(List *list, uint flag) CHARSET_INFO *thd_charset= thd->variables.character_set_results; DBUG_ENTER("send_fields"); - if (flag & 1) + if (flags & SEND_NUM_ROWS) { // Packet with number of elements char *pos=net_store_length(buff, (uint) list->elements); (void) my_net_write(&thd->net, buff,(uint) (pos-buff)); @@ -594,7 +595,7 @@ bool Protocol::send_fields(List *list, uint flag) } } local_packet->length((uint) (pos - local_packet->ptr())); - if (flag & 2) + if (flags & SEND_DEFAULTS) item->send(&prot, &tmp); // Send default value if (prot.write()) break; /* purecov: inspected */ @@ -603,7 +604,8 @@ bool Protocol::send_fields(List *list, uint flag) #endif } - my_net_write(&thd->net, eof_buff, 1); + if (flags & SEND_EOF) + my_net_write(&thd->net, eof_buff, 1); DBUG_RETURN(prepare_for_send(list)); err: @@ -962,12 +964,6 @@ void Protocol_prep::prepare_for_resend() bool Protocol_prep::store(const char *from, uint length, CHARSET_INFO *fromcs) { CHARSET_INFO *tocs= thd->variables.character_set_results; -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_DECIMAL || - (field_types[field_pos] >= MYSQL_TYPE_ENUM && - field_types[field_pos] <= MYSQL_TYPE_GEOMETRY)); -#endif field_pos++; return store_string_aux(from, length, fromcs, tocs); } @@ -975,12 +971,6 @@ bool Protocol_prep::store(const char *from, uint length, CHARSET_INFO *fromcs) bool Protocol_prep::store(const char *from,uint length, CHARSET_INFO *fromcs, CHARSET_INFO *tocs) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_DECIMAL || - (field_types[field_pos] >= MYSQL_TYPE_ENUM && - field_types[field_pos] <= MYSQL_TYPE_GEOMETRY)); -#endif field_pos++; return store_string_aux(from, length, fromcs, tocs); } @@ -998,10 +988,6 @@ bool Protocol_prep::store_null() bool Protocol_prep::store_tiny(longlong from) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_TINY); -#endif char buff[1]; field_pos++; buff[0]= (uchar) from; @@ -1011,11 +997,6 @@ bool Protocol_prep::store_tiny(longlong from) bool Protocol_prep::store_short(longlong from) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_SHORT || - field_types[field_pos] == MYSQL_TYPE_YEAR); -#endif field_pos++; char *to= packet->prep_append(2, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1027,11 +1008,6 @@ bool Protocol_prep::store_short(longlong from) bool Protocol_prep::store_long(longlong from) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_INT24 || - field_types[field_pos] == MYSQL_TYPE_LONG); -#endif field_pos++; char *to= packet->prep_append(4, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1043,10 +1019,6 @@ bool Protocol_prep::store_long(longlong from) bool Protocol_prep::store_longlong(longlong from, bool unsigned_flag) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_LONGLONG); -#endif field_pos++; char *to= packet->prep_append(8, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1058,10 +1030,6 @@ bool Protocol_prep::store_longlong(longlong from, bool unsigned_flag) bool Protocol_prep::store(float from, uint32 decimals, String *buffer) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_FLOAT); -#endif field_pos++; char *to= packet->prep_append(4, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1073,10 +1041,6 @@ bool Protocol_prep::store(float from, uint32 decimals, String *buffer) bool Protocol_prep::store(double from, uint32 decimals, String *buffer) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_DOUBLE); -#endif field_pos++; char *to= packet->prep_append(8, PACKET_BUFFER_EXTRA_ALLOC); if (!to) @@ -1100,12 +1064,6 @@ bool Protocol_prep::store(Field *field) bool Protocol_prep::store(TIME *tm) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_DATETIME || - field_types[field_pos] == MYSQL_TYPE_DATE || - field_types[field_pos] == MYSQL_TYPE_TIMESTAMP); -#endif char buff[12],*pos; uint length; field_pos++; @@ -1140,10 +1098,6 @@ bool Protocol_prep::store_date(TIME *tm) bool Protocol_prep::store_time(TIME *tm) { -#ifndef DEBUG_OFF - DBUG_ASSERT(field_types == 0 || - field_types[field_pos] == MYSQL_TYPE_TIME); -#endif char buff[13], *pos; uint length; field_pos++; diff --git a/sql/protocol.h b/sql/protocol.h index 079c06ae155..8dc2f506c6c 100644 --- a/sql/protocol.h +++ b/sql/protocol.h @@ -50,7 +50,12 @@ public: Protocol(THD *thd_arg) { init(thd_arg); } virtual ~Protocol() {} void init(THD* thd_arg); - virtual bool send_fields(List *list, uint flag); + + static const uint SEND_NUM_ROWS= 1; + static const uint SEND_DEFAULTS= 2; + static const uint SEND_EOF= 4; + virtual bool send_fields(List *list, uint flags); + bool send_records_num(List *list, ulonglong records); bool store(I_List *str_list); bool store(const char *from, CHARSET_INFO *cs); @@ -163,7 +168,7 @@ public: prev_record= &data; return Protocol_simple::prepare_for_send(item_list); } - bool send_fields(List *list, uint flag); + bool send_fields(List *list, uint flags); bool write(); uint get_field_count() { return field_count; } }; diff --git a/sql/protocol_cursor.cc b/sql/protocol_cursor.cc index 749b66785d4..d2c99dcaebc 100644 --- a/sql/protocol_cursor.cc +++ b/sql/protocol_cursor.cc @@ -26,7 +26,7 @@ #include "mysql_priv.h" #include -bool Protocol_cursor::send_fields(List *list, uint flag) +bool Protocol_cursor::send_fields(List *list, uint flags) { List_iterator_fast it(*list); Item *item; @@ -67,7 +67,7 @@ bool Protocol_cursor::send_fields(List *list, uint flag) if (INTERNAL_NUM_FIELD(client_field)) client_field->flags|= NUM_FLAG; - if (flag & 2) + if (flags & Protocol::SEND_DEFAULTS) { char buff[80]; String tmp(buff, sizeof(buff), default_charset_info), *res; diff --git a/sql/repl_failsafe.cc b/sql/repl_failsafe.cc index 4feb24f06b2..bb70b793d3b 100644 --- a/sql/repl_failsafe.cc +++ b/sql/repl_failsafe.cc @@ -461,7 +461,8 @@ int show_new_master(THD* thd) field_list.push_back(new Item_empty_string("Log_name", 20)); field_list.push_back(new Item_return_int("Log_pos", 10, MYSQL_TYPE_LONGLONG)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); protocol->prepare_for_resend(); protocol->store(lex_mi->log_file_name, &my_charset_bin); @@ -651,7 +652,8 @@ int show_slave_hosts(THD* thd) field_list.push_back(new Item_return_int("Master_id", 10, MYSQL_TYPE_LONG)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); pthread_mutex_lock(&LOCK_slave_list); diff --git a/sql/slave.cc b/sql/slave.cc index 8843e1561ec..0defbe35163 100644 --- a/sql/slave.cc +++ b/sql/slave.cc @@ -2335,7 +2335,8 @@ int show_master_info(THD* thd, MASTER_INFO* mi) field_list.push_back(new Item_return_int("Seconds_Behind_Master", 10, MYSQL_TYPE_LONGLONG)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); if (mi->host[0]) diff --git a/sql/sp.cc b/sql/sp.cc index 4164a27ca5f..83ed95c7c4c 100644 --- a/sql/sp.cc +++ b/sql/sp.cc @@ -563,7 +563,8 @@ db_show_routine_status(THD *thd, int type, const char *wild) } } /* Print header */ - if (thd->protocol->send_fields(&field_list,1)) + if (thd->protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) { res= SP_INTERNAL_ERROR; goto err_case; diff --git a/sql/sp_head.cc b/sql/sp_head.cc index 32c25c98425..10e6d8f6db4 100644 --- a/sql/sp_head.cc +++ b/sql/sp_head.cc @@ -898,7 +898,8 @@ sp_head::show_create_procedure(THD *thd) // 1024 is for not to confuse old clients field_list.push_back(new Item_empty_string("Create Procedure", max(buffer.length(), 1024))); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) { res= 1; goto done; @@ -964,7 +965,8 @@ sp_head::show_create_function(THD *thd) field_list.push_back(new Item_empty_string("sql_mode", sql_mode_len)); field_list.push_back(new Item_empty_string("Create Function", max(buffer.length(),1024))); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) { res= 1; goto done; diff --git a/sql/sql_acl.cc b/sql/sql_acl.cc index 41c629e5c12..07c3e59fd81 100644 --- a/sql/sql_acl.cc +++ b/sql/sql_acl.cc @@ -3163,7 +3163,8 @@ int mysql_show_grants(THD *thd,LEX_USER *lex_user) strxmov(buff,"Grants for ",lex_user->user.str,"@", lex_user->host.str,NullS); field_list.push_back(field); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); rw_wrlock(&LOCK_grant); diff --git a/sql/sql_class.cc b/sql/sql_class.cc index 103a5080caf..da64479abf2 100644 --- a/sql/sql_class.cc +++ b/sql/sql_class.cc @@ -675,7 +675,8 @@ int THD::send_explain_fields(select_result *result) item->maybe_null=1; field_list.push_back(new Item_return_int("rows",10, MYSQL_TYPE_LONGLONG)); field_list.push_back(new Item_empty_string("Extra",255)); - return (result->send_fields(field_list,1)); + return (result->send_fields(field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)); } #ifdef SIGNAL_WITH_VIO_CLOSE @@ -722,9 +723,9 @@ sql_exchange::sql_exchange(char *name,bool flag) escaped= &default_escaped; } -bool select_send::send_fields(List &list,uint flag) +bool select_send::send_fields(List &list, uint flags) { - return thd->protocol->send_fields(&list,flag); + return thd->protocol->send_fields(&list, flags); } /* Send data to client. Returns 0 if ok */ @@ -1354,7 +1355,8 @@ Statement::Statement(THD *thd) allow_sum_func(0), lex(&main_lex), query(0), - query_length(0) + query_length(0), + cursor(0) { name.str= NULL; } @@ -1372,7 +1374,8 @@ Statement::Statement() allow_sum_func(0), /* initialized later */ lex(&main_lex), query(0), /* these two are set */ - query_length(0) /* in alloc_query() */ + query_length(0), /* in alloc_query() */ + cursor(0) { } @@ -1391,6 +1394,7 @@ void Statement::set_statement(Statement *stmt) lex= stmt->lex; query= stmt->query; query_length= stmt->query_length; + cursor= stmt->cursor; } diff --git a/sql/sql_class.h b/sql/sql_class.h index 00d32543da0..eccaf072008 100644 --- a/sql/sql_class.h +++ b/sql/sql_class.h @@ -487,6 +487,9 @@ public: void set_item_arena(Item_arena *set); }; + +class Cursor; + /* State of a single command executed against this connection. One connection can contain a lot of simultaneously running statements, @@ -543,6 +546,7 @@ public: */ char *query; uint32 query_length; // current query length + Cursor *cursor; public: /* We build without RTTI, so dynamic_cast can't be used. */ @@ -1054,6 +1058,7 @@ public: { DBUG_ASSERT(current_arena!=0); cleanup_items(current_arena->free_list); + /* no need to reset free_list as it won't be used anymore */ free_items(free_list); close_thread_tables(this); // to close derived tables free_root(&mem_root, MYF(0)); @@ -1108,7 +1113,7 @@ public: unit= u; return 0; } - virtual bool send_fields(List &list,uint flag)=0; + virtual bool send_fields(List &list, uint flags)=0; virtual bool send_data(List &items)=0; virtual bool initialize_tables (JOIN *join=0) { return 0; } virtual void send_error(uint errcode,const char *err); @@ -1120,7 +1125,7 @@ public: class select_send :public select_result { public: select_send() {} - bool send_fields(List &list,uint flag); + bool send_fields(List &list, uint flags); bool send_data(List &items); bool send_eof(); }; @@ -1138,7 +1143,7 @@ public: select_to_file(sql_exchange *ex) :exchange(ex), file(-1),row_count(0L) { path[0]=0; } ~select_to_file(); - bool send_fields(List &list, uint flag) { return 0; } + bool send_fields(List &list, uint flags) { return 0; } void send_error(uint errcode,const char *err); }; @@ -1185,8 +1190,7 @@ class select_insert :public select_result { } ~select_insert(); int prepare(List &list, SELECT_LEX_UNIT *u); - bool send_fields(List &list, uint flag) - { return 0; } + bool send_fields(List &list, uint flags) { return 0; } bool send_data(List &items); void send_error(uint errcode,const char *err); bool send_eof(); @@ -1273,8 +1277,7 @@ class select_union :public select_result { select_union(TABLE *table_par); ~select_union(); int prepare(List &list, SELECT_LEX_UNIT *u); - bool send_fields(List &list, uint flag) - { return 0; } + bool send_fields(List &list, uint flags) { return 0; } bool send_data(List &items); bool send_eof(); bool flush(); @@ -1288,7 +1291,7 @@ protected: Item_subselect *item; public: select_subselect(Item_subselect *item); - bool send_fields(List &list, uint flag) { return 0; }; + bool send_fields(List &list, uint flags) { return 0; } bool send_data(List &items)=0; bool send_eof() { return 0; }; @@ -1457,8 +1460,7 @@ public: multi_delete(THD *thd, TABLE_LIST *dt, uint num_of_tables); ~multi_delete(); int prepare(List &list, SELECT_LEX_UNIT *u); - bool send_fields(List &list, - uint flag) { return 0; } + bool send_fields(List &list, uint flags) { return 0; } bool send_data(List &items); bool initialize_tables (JOIN *join); void send_error(uint errcode,const char *err); @@ -1486,7 +1488,7 @@ public: List *values, enum_duplicates handle_duplicates); ~multi_update(); int prepare(List &list, SELECT_LEX_UNIT *u); - bool send_fields(List &list, uint flag) { return 0; } + bool send_fields(List &list, uint flags) { return 0; } bool send_data(List &items); bool initialize_tables (JOIN *join); void send_error(uint errcode,const char *err); @@ -1515,7 +1517,7 @@ public: select_dumpvar(void) { var_list.empty(); local_vars.empty(); vars.empty(); row_count=0;} ~select_dumpvar() {} int prepare(List &list, SELECT_LEX_UNIT *u); - bool send_fields(List &list, uint flag) {return 0;} + bool send_fields(List &list, uint flags) { return 0; } bool send_data(List &items); bool send_eof(); }; diff --git a/sql/sql_error.cc b/sql/sql_error.cc index 8aa7bdf9a7f..d68d62a8820 100644 --- a/sql/sql_error.cc +++ b/sql/sql_error.cc @@ -185,7 +185,8 @@ my_bool mysqld_show_warnings(THD *thd, ulong levels_to_show) field_list.push_back(new Item_return_int("Code",4, MYSQL_TYPE_LONG)); field_list.push_back(new Item_empty_string("Message",MYSQL_ERRMSG_SIZE)); - if (thd->protocol->send_fields(&field_list,1)) + if (thd->protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); MYSQL_ERROR *err; diff --git a/sql/sql_handler.cc b/sql/sql_handler.cc index 8d06d3f0017..0df3d617d7f 100644 --- a/sql/sql_handler.cc +++ b/sql/sql_handler.cc @@ -252,7 +252,7 @@ int mysql_ha_read(THD *thd, TABLE_LIST *tables, insert_fields(thd, tables, tables->db, tables->alias, &it, 0); select_limit+=offset_limit; - protocol->send_fields(&list,1); + protocol->send_fields(&list, Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF); HANDLER_TABLES_HACK(thd); MYSQL_LOCK *lock=mysql_lock_tables(thd,&tables->table,1); diff --git a/sql/sql_help.cc b/sql/sql_help.cc index 08a8fc626cc..85d5271d4c3 100644 --- a/sql/sql_help.cc +++ b/sql/sql_help.cc @@ -426,7 +426,8 @@ int send_answer_1(Protocol *protocol, String *s1, String *s2, String *s3) field_list.push_back(new Item_empty_string("description",1000)); field_list.push_back(new Item_empty_string("example",1000)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); protocol->prepare_for_resend(); @@ -468,7 +469,8 @@ int send_header_2(Protocol *protocol, bool for_category) field_list.push_back(new Item_empty_string("source_category_name",64)); field_list.push_back(new Item_empty_string("name",64)); field_list.push_back(new Item_empty_string("is_it_category",1)); - DBUG_RETURN(protocol->send_fields(&field_list,1)); + DBUG_RETURN(protocol->send_fields(&field_list, Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)); } /* diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 7f42d6052d9..21d2a51bfeb 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -1452,6 +1452,11 @@ bool dispatch_command(enum enum_server_command command, THD *thd, mysql_stmt_execute(thd, packet, packet_length); break; } + case COM_FETCH: + { + mysql_stmt_fetch(thd, packet, packet_length); + break; + } case COM_LONG_DATA: { mysql_stmt_get_longdata(thd, packet, packet_length); @@ -1545,7 +1550,6 @@ bool dispatch_command(enum enum_server_command command, THD *thd, send_error(thd,ER_NO_DB_ERROR); break; } - thd->free_list=0; pend= strend(packet); thd->convert_string(&conv_name, system_charset_info, packet, (uint) (pend-packet), thd->charset()); @@ -1567,6 +1571,7 @@ bool dispatch_command(enum enum_server_command command, THD *thd, break; mysqld_list_fields(thd,&table_list,fields); free_items(thd->free_list); + thd->free_list=0; /* free_list should never point to garbage */ break; } #endif @@ -4443,6 +4448,7 @@ void mysql_parse(THD *thd, char *inBuf, uint length) } thd->proc_info="freeing items"; free_items(thd->free_list); /* Free strings used by items */ + thd->free_list= 0; /* free_list should never point to garbage */ lex_end(lex); } DBUG_VOID_RETURN; @@ -4470,6 +4476,7 @@ bool mysql_test_parse_for_slave(THD *thd, char *inBuf, uint length) all_tables_not_ok(thd,(TABLE_LIST*) lex->select_lex.table_list.first)) error= 1; /* Ignore question */ free_items(thd->free_list); /* Free strings used by items */ + thd->free_list= 0; /* free_list should never point to garbage */ lex_end(lex); DBUG_RETURN(error); diff --git a/sql/sql_prepare.cc b/sql/sql_prepare.cc index ed5e75a3622..9d9a0dd73e2 100644 --- a/sql/sql_prepare.cc +++ b/sql/sql_prepare.cc @@ -75,6 +75,8 @@ Long data handling: #ifdef EMBEDDED_LIBRARY /* include MYSQL_BIND headers */ #include +#else +#include #endif /****************************************************************************** @@ -107,7 +109,7 @@ public: }; static void execute_stmt(THD *thd, Prepared_statement *stmt, - String *expanded_query, bool set_context); + String *expanded_query); /****************************************************************************** Implementation @@ -166,7 +168,8 @@ static bool send_prep_stmt(Prepared_statement *stmt, uint columns) return my_net_write(net, buff, sizeof(buff)) || (stmt->param_count && stmt->thd->protocol_simple.send_fields((List *) - &stmt->lex->param_list, 0)) || + &stmt->lex->param_list, + Protocol::SEND_EOF)) || net_flush(net); return 0; } @@ -1098,7 +1101,8 @@ static int mysql_test_select(Prepared_statement *stmt, if (!text_protocol) { if (send_prep_stmt(stmt, lex->select_lex.item_list.elements) || - thd->protocol_simple.send_fields(&lex->select_lex.item_list, 0) + thd->protocol_simple.send_fields(&lex->select_lex.item_list, + Protocol::SEND_EOF) #ifndef EMBEDDED_LIBRARY || net_flush(&thd->net) #endif @@ -1476,6 +1480,12 @@ static int send_prepare_results(Prepared_statement *stmt, bool text_protocol) case SQLCOM_SHOW_GRANTS: case SQLCOM_DROP_TABLE: case SQLCOM_RENAME_TABLE: + case SQLCOM_ALTER_TABLE: + case SQLCOM_COMMIT: + case SQLCOM_CREATE_INDEX: + case SQLCOM_DROP_INDEX: + case SQLCOM_ROLLBACK: + case SQLCOM_TRUNCATE: break; default: @@ -1756,6 +1766,7 @@ static void reset_stmt_params(Prepared_statement *stmt) void mysql_stmt_execute(THD *thd, char *packet, uint packet_length) { ulong stmt_id= uint4korr(packet); + ulong flags= (ulong) ((uchar) packet[4]); /* Query text for binary log, or empty string if the query is not put into binary log. @@ -1782,6 +1793,28 @@ void mysql_stmt_execute(THD *thd, char *packet, uint packet_length) DBUG_VOID_RETURN; } + if (flags & (ulong) CURSOR_TYPE_READ_ONLY) + { + if (stmt->lex->result) + { + /* + If lex->result is set in the parser, this is not a SELECT + statement: we can't open a cursor for it. + */ + flags= 0; + } + else + { + if (!stmt->cursor && + !(stmt->cursor= new (&stmt->mem_root) Cursor())) + { + send_error(thd, ER_OUT_OF_RESOURCES); + DBUG_VOID_RETURN; + } + /* If lex->result is set, mysql_execute_command will use it */ + stmt->lex->result= &stmt->cursor->result; + } + } #ifndef EMBEDDED_LIBRARY if (stmt->param_count) { @@ -1800,16 +1833,55 @@ void mysql_stmt_execute(THD *thd, char *packet, uint packet_length) if (stmt->param_count && stmt->set_params_data(stmt, &expanded_query)) goto set_params_data_err; #endif + thd->stmt_backup.set_statement(thd); + thd->set_statement(stmt); thd->current_arena= stmt; + reset_stmt_for_execute(thd, stmt->lex); + /* From now cursors assume that thd->mem_root is clean */ + if (expanded_query.length() && + alloc_query(thd, (char *)expanded_query.ptr(), + expanded_query.length()+1)) + { + my_error(ER_OUTOFMEMORY, 0, expanded_query.length()); + goto err; + } + thd->protocol= &thd->protocol_prep; // Switch to binary protocol - execute_stmt(thd, stmt, &expanded_query, true); + if (!(specialflag & SPECIAL_NO_PRIOR)) + my_pthread_setprio(pthread_self(),QUERY_PRIOR); + mysql_execute_command(thd); + if (!(specialflag & SPECIAL_NO_PRIOR)) + my_pthread_setprio(pthread_self(), WAIT_PRIOR); thd->protocol= &thd->protocol_simple; // Use normal protocol + + if (flags & (ulong) CURSOR_TYPE_READ_ONLY) + { + if (stmt->cursor->is_open()) + stmt->cursor->init_from_thd(thd); + thd->set_item_arena(&thd->stmt_backup); + } + else + { + thd->lex->unit.cleanup(); + cleanup_items(stmt->free_list); + reset_stmt_params(stmt); + close_thread_tables(thd); /* to close derived tables */ + /* + Free items that were created during this execution of the PS by + query optimizer. + */ + free_items(thd->free_list); + thd->free_list= 0; + } + + thd->set_statement(&thd->stmt_backup); thd->current_arena= 0; DBUG_VOID_RETURN; set_params_data_err: reset_stmt_params(stmt); my_error(ER_WRONG_ARGUMENTS, MYF(0), "mysql_stmt_execute"); +err: send_error(thd); DBUG_VOID_RETURN; } @@ -1845,7 +1917,6 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name) DBUG_VOID_RETURN; } - thd->free_list= NULL; thd->stmt_backup.set_statement(thd); thd->set_statement(stmt); if (stmt->set_params_from_vars(stmt, @@ -1856,7 +1927,7 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name) send_error(thd); } thd->current_arena= stmt; - execute_stmt(thd, stmt, &expanded_query, false); + execute_stmt(thd, stmt, &expanded_query); thd->current_arena= 0; DBUG_VOID_RETURN; } @@ -1872,20 +1943,13 @@ void mysql_sql_stmt_execute(THD *thd, LEX_STRING *stmt_name) placeholders replaced with actual values. Otherwise empty string. NOTES - Caller must set parameter values and thd::protocol. - thd->free_list is assumed to be garbage. + Caller must set parameter values and thd::protocol. */ static void execute_stmt(THD *thd, Prepared_statement *stmt, - String *expanded_query, bool set_context) + String *expanded_query) { DBUG_ENTER("execute_stmt"); - if (set_context) - { - thd->free_list= NULL; - thd->stmt_backup.set_statement(thd); - thd->set_statement(stmt); - } reset_stmt_for_execute(thd, stmt->lex); if (expanded_query->length() && @@ -1899,21 +1963,76 @@ static void execute_stmt(THD *thd, Prepared_statement *stmt, if (!(specialflag & SPECIAL_NO_PRIOR)) my_pthread_setprio(pthread_self(),QUERY_PRIOR); mysql_execute_command(thd); - thd->lex->unit.cleanup(); if (!(specialflag & SPECIAL_NO_PRIOR)) my_pthread_setprio(pthread_self(), WAIT_PRIOR); + thd->lex->unit.cleanup(); cleanup_items(stmt->free_list); reset_stmt_params(stmt); close_thread_tables(thd); // to close derived tables thd->set_statement(&thd->stmt_backup); /* Free Items that were created during this execution of the PS. */ free_items(thd->free_list); + /* + In the rest of prepared statements code we assume that free_list + never points to garbage: keep this predicate true. + */ thd->free_list= 0; DBUG_VOID_RETURN; } +/* + COM_FETCH handler: fetches requested amount of rows from cursor + SYNOPSIS +*/ + +void mysql_stmt_fetch(THD *thd, char *packet, uint packet_length) +{ + /* assume there is always place for 8-16 bytes */ + ulong stmt_id= uint4korr(packet); + ulong num_rows= uint4korr(packet+=4); + Statement *stmt; + int error; + + DBUG_ENTER("mysql_stmt_fetch"); + + if (!(stmt= thd->stmt_map.find(stmt_id)) || + !stmt->cursor || + !stmt->cursor->is_open()) + { + my_error(ER_UNKNOWN_STMT_HANDLER, MYF(0), stmt_id, "fetch"); + send_error(thd); + DBUG_VOID_RETURN; + } + + thd->stmt_backup.set_statement(thd); + thd->stmt_backup.set_item_arena(thd); + thd->set_statement(stmt); + stmt->cursor->init_thd(thd); + + if (!(specialflag & SPECIAL_NO_PRIOR)) + my_pthread_setprio(pthread_self(), QUERY_PRIOR); + + thd->protocol= &thd->protocol_prep; // Switch to binary protocol + error= stmt->cursor->fetch(num_rows); + thd->protocol= &thd->protocol_simple; // Use normal protocol + + if (!(specialflag & SPECIAL_NO_PRIOR)) + my_pthread_setprio(pthread_self(), WAIT_PRIOR); + + /* Restore THD state */ + stmt->cursor->reset_thd(thd); + thd->set_statement(&thd->stmt_backup); + thd->set_item_arena(&thd->stmt_backup); + + if (error && error != -4) + send_error(thd, ER_OUT_OF_RESOURCES); + + DBUG_VOID_RETURN; +} + + /* Reset a prepared statement in case there was a recoverable error. SYNOPSIS @@ -2084,8 +2203,11 @@ void Prepared_statement::setup_set_params() } } + Prepared_statement::~Prepared_statement() { + if (cursor) + cursor->Cursor::~Cursor(); free_items(free_list); } diff --git a/sql/sql_repl.cc b/sql/sql_repl.cc index 98bf4e86aba..9e38a65d412 100644 --- a/sql/sql_repl.cc +++ b/sql/sql_repl.cc @@ -1297,7 +1297,8 @@ int show_binlog_events(THD* thd) Format_description_log_event(3); /* MySQL 4.0 by default */ Log_event::init_show_field_list(&field_list); - if (protocol-> send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); if (mysql_bin_log.is_open()) @@ -1426,7 +1427,8 @@ int show_binlog_info(THD* thd) field_list.push_back(new Item_empty_string("Binlog_Do_DB",255)); field_list.push_back(new Item_empty_string("Binlog_Ignore_DB",255)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); protocol->prepare_for_resend(); @@ -1476,7 +1478,8 @@ int show_binlogs(THD* thd) } field_list.push_back(new Item_empty_string("Log_name", 255)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); mysql_bin_log.lock_index(); index_file=mysql_bin_log.get_index_file(); diff --git a/sql/sql_select.cc b/sql/sql_select.cc index 756b5f3c017..fce77969bcf 100644 --- a/sql/sql_select.cc +++ b/sql/sql_select.cc @@ -1105,7 +1105,8 @@ JOIN::exec() (zero_result_cause?zero_result_cause:"No tables used")); else { - result->send_fields(fields_list,1); + result->send_fields(fields_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF); if (!having || having->val_int()) { if (do_send_rows && (procedure ? (procedure->send_row(fields_list) || @@ -1512,12 +1513,45 @@ JOIN::exec() DBUG_VOID_RETURN; } } + /* XXX: When can we have here thd->net.report_error not zero? */ + if (thd->net.report_error) + { + error= thd->net.report_error; + DBUG_VOID_RETURN; + } curr_join->having= curr_join->tmp_having; - thd->proc_info="Sending data"; - error= thd->net.report_error || - do_select(curr_join, curr_fields_list, NULL, procedure); - thd->limit_found_rows= curr_join->send_records; - thd->examined_row_count= curr_join->examined_rows; + curr_join->fields= curr_fields_list; + curr_join->procedure= procedure; + + if (unit == &thd->lex->unit && + (unit->fake_select_lex == 0 || select_lex == unit->fake_select_lex) && + thd->cursor && tables != const_tables) + { + /* + We are here if this is JOIN::exec for the last select of the main unit + and the client requested to open a cursor. + We check that not all tables are constant because this case is not + handled by do_select() separately, and this case is not implemented + for cursors yet. + */ + DBUG_ASSERT(error == 0); + /* + curr_join is used only for reusable joins - that is, + to perform SELECT for each outer row (like in subselects). + This join is main, so we know for sure that curr_join == join. + */ + DBUG_ASSERT(curr_join == this); + /* Open cursor for the last join sweep */ + error= thd->cursor->open(this); + } + else + { + thd->proc_info="Sending data"; + error= do_select(curr_join, curr_fields_list, NULL, procedure); + thd->limit_found_rows= curr_join->send_records; + thd->examined_row_count= curr_join->examined_rows; + } + DBUG_VOID_RETURN; } @@ -1566,6 +1600,306 @@ JOIN::cleanup() } +/************************* Cursor ******************************************/ + +void +Cursor::init_from_thd(THD *thd) +{ + /* + We need to save and reset thd->mem_root, otherwise it'll be freed + later in mysql_parse. + */ + mem_root= thd->mem_root; + init_sql_alloc(&thd->mem_root, + thd->variables.query_alloc_block_size, + thd->variables.query_prealloc_size); + + /* + The same is true for open tables and lock: save tables and zero THD + pointers to prevent table close in close_thread_tables (This is a part + of the temporary solution to make cursors work with minimal changes to + the current source base). + */ + derived_tables= thd->derived_tables; + open_tables= thd->open_tables; + lock= thd->lock; + query_id= thd->query_id; + free_list= thd->free_list; + reset_thd(thd); + /* + XXX: thd->locked_tables is not changed. + What problems can we have with it if cursor is open? + */ + /* + TODO: grab thd->free_list here? + */ +} + + +void +Cursor::init_thd(THD *thd) +{ + thd->mem_root= mem_root; + + DBUG_ASSERT(thd->derived_tables == 0); + thd->derived_tables= derived_tables; + + DBUG_ASSERT(thd->open_tables == 0); + thd->open_tables= open_tables; + + DBUG_ASSERT(thd->lock== 0); + thd->lock= lock; + thd->query_id= query_id; + thd->free_list= free_list; +} + + +void +Cursor::reset_thd(THD *thd) +{ + thd->derived_tables= 0; + thd->open_tables= 0; + thd->lock= 0; + thd->free_list= 0; +} + + +int +Cursor::open(JOIN *join_arg) +{ + join= join_arg; + + THD *thd= join->thd; + + /* First non-constant table */ + JOIN_TAB *join_tab= join->join_tab + join->const_tables; + + /* + Send fields description to the client; server_status is sent + in 'EOF' packet, which ends send_fields(). + */ + thd->server_status|= SERVER_STATUS_CURSOR_EXISTS; + join->result->send_fields(*join->fields, Protocol::SEND_NUM_ROWS); + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS; + + /* Prepare JOIN for reading rows. */ + + Next_select_func end_select= join->sort_and_group || join->procedure && + join->procedure->flags & PROC_GROUP ? + end_send_group : end_send; + + join->join_tab[join->tables-1].next_select= end_select; + join->send_records= 0; + join->fetch_limit= join->unit->offset_limit_cnt; + + /* Disable JOIN CACHE as it is not working with cursors yet */ + for (JOIN_TAB *tab= join_tab; tab != join->join_tab + join->tables - 1; ++tab) + { + if (tab->next_select == sub_select_cache) + tab->next_select= sub_select; + } + + DBUG_ASSERT(join_tab->table->reginfo.not_exists_optimize == 0); + DBUG_ASSERT(join_tab->not_used_in_distinct == 0); + /* + null_row is set only if row not found and it's outer join: should never + happen for the first table in join_tab list + */ + DBUG_ASSERT(join_tab->table->null_row == 0); + + return join_tab->read_first_record(join_tab); +} + + +/* + DESCRIPTION + Fetch next num_rows rows from the cursor and sent them to the client + PRECONDITION: + Cursor is open + RETURN VALUES: + -4 there are more rows, send_eof sent to the client + 0 no more rows, send_eof was sent to the client, cursor is closed + other fatal fetch error, cursor is closed (error is not reported) +*/ + +int +Cursor::fetch(ulong num_rows) +{ + THD *thd= join->thd; + JOIN_TAB *join_tab= join->join_tab + join->const_tables;; + COND *on_expr= join_tab->on_expr; + COND *select_cond= join_tab->select_cond; + READ_RECORD *info= &join_tab->read_record; + + int error= 0; + + join->fetch_limit+= num_rows; + + /* + Run while there are new rows in the first table; + For each row, satisfying ON and WHERE clauses (those parts of them which + can be evaluated early), call next_select. + */ + do + { + int no_more_rows; + + join->examined_rows++; + + if (thd->killed) /* Aborted by user */ + { + my_error(ER_SERVER_SHUTDOWN,MYF(0)); + return -1; + } + + if (on_expr == 0 || on_expr->val_int()) + { + if (select_cond == 0 || select_cond->val_int()) + { + /* + TODO: call table->unlock_row() to unlock row failed selection, + when this feature will be used. + */ + error= join_tab->next_select(join, join_tab + 1, 0); + DBUG_ASSERT(error <= 0); + if (error) + { + /* real error or LIMIT/FETCH LIMIT worked */ + if (error == -4) + { + /* + FETCH LIMIT, read ahead one row, and close cursor + if there is no more rows XXX: to be fixed to support + non-equi-joins! + */ + if ((no_more_rows= info->read_record(info))) + error= no_more_rows > 0 ? -1: 0; + } + break; + } + } + } + /* read next row; break loop if there was an error */ + if ((no_more_rows= info->read_record(info))) + { + if (no_more_rows > 0) + error= -1; + else + { + enum { END_OF_RECORDS= 1 }; + error= join_tab->next_select(join, join_tab+1, (int) END_OF_RECORDS); + } + break; + } + } + while (thd->net.report_error == 0); + + if (thd->net.report_error) + error= -1; + + switch (error) { + /* Fetch limit worked, possibly more rows are there */ + case -4: + if (thd->transaction.all.innobase_tid) + ha_release_temporary_latches(thd); + thd->server_status|= SERVER_STATUS_CURSOR_EXISTS; + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_CURSOR_EXISTS; + /* save references to memory, allocated during fetch */ + mem_root= thd->mem_root; + free_list= thd->free_list; + break; + /* Limit clause worked: this is the same as 'no more rows' */ + case -3: /* LIMIT clause worked */ + error= 0; + /* fallthrough */ + case 0: /* No more rows */ + if (thd->transaction.all.innobase_tid) + ha_release_temporary_latches(thd); + close(); + thd->server_status|= SERVER_STATUS_LAST_ROW_SENT; + ::send_eof(thd); + thd->server_status&= ~SERVER_STATUS_LAST_ROW_SENT; + join= 0; + unit= 0; + free_items(thd->free_list); + thd->free_list= free_list= 0; + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + mem_root= thd->mem_root; + free_root(&mem_root, MYF(0)); + break; + default: + close(); + join= 0; + unit= 0; + free_items(thd->free_list); + thd->free_list= free_list= 0; + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + mem_root= thd->mem_root; + free_root(&mem_root, MYF(0)); + break; + } + return error; +} + + +void +Cursor::close() +{ + THD *thd= join->thd; + join->join_free(0); + if (unit) + { + /* In case of UNIONs JOIN is freed inside unit->cleanup() */ + unit->cleanup(); + } + else + { + join->cleanup(); + delete join; + } + /* XXX: Another hack: closing tables used in the cursor */ + { + DBUG_ASSERT(lock || open_tables || derived_tables); + + TABLE *tmp_open_tables= thd->open_tables; + TABLE *tmp_derived_tables= thd->derived_tables; + MYSQL_LOCK *tmp_lock= thd->lock; + + thd->open_tables= open_tables; + thd->derived_tables= derived_tables; + thd->lock= lock; + close_thread_tables(thd); + + thd->open_tables= tmp_derived_tables; + thd->derived_tables= tmp_derived_tables; + thd->lock= tmp_lock; + } +} + + +Cursor::~Cursor() +{ + if (is_open()) + close(); + free_items(free_list); + /* + Must be last, as some memory might be allocated for free purposes, + like in free_tmp_table() (TODO: fix this issue) + */ + free_root(&mem_root, MYF(0)); +} + +/*********************************************************************/ + + int mysql_select(THD *thd, Item ***rref_pointer_array, TABLE_LIST *tables, uint wild_num, List &fields, @@ -1637,6 +1971,16 @@ mysql_select(THD *thd, Item ***rref_pointer_array, join->exec(); + if (thd->cursor && thd->cursor->is_open()) + { + /* + A cursor was opened for the last sweep in exec(). + We are here only if this is mysql_select for top-level SELECT_LEX_UNIT + and there were no error. + */ + free_join= 0; + } + if (thd->lex->describe & DESCRIBE_EXTENDED) { select_lex->where= join->conds_history; @@ -5310,7 +5654,8 @@ return_zero_rows(JOIN *join, select_result *result,TABLE_LIST *tables, if (having && having->val_int() == 0) send_row=0; } - if (!(result->send_fields(fields,1))) + if (!(result->send_fields(fields, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF))) { if (send_row) { @@ -7035,7 +7380,7 @@ do_select(JOIN *join,List *fields,TABLE *table,Procedure *procedure) { int error= 0; JOIN_TAB *join_tab; - int (*end_select)(JOIN *, struct st_join_table *,bool); + Next_select_func end_select; DBUG_ENTER("do_select"); join->procedure=procedure; @@ -7043,7 +7388,8 @@ do_select(JOIN *join,List *fields,TABLE *table,Procedure *procedure) Tell the client how many fields there are in a row */ if (!table) - join->result->send_fields(*fields,1); + join->result->send_fields(*fields, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF); else { VOID(table->file->extra(HA_EXTRA_WRITE_CACHE)); @@ -8076,6 +8422,14 @@ end_send(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)), } DBUG_RETURN(-3); // Abort nicely } + else if (join->send_records >= join->fetch_limit) + { + /* + There is a server side cursor and all rows for + this fetch request are sent. + */ + DBUG_RETURN(-4); + } } else { @@ -8150,6 +8504,14 @@ end_send_group(JOIN *join, JOIN_TAB *join_tab __attribute__((unused)), join->do_send_rows=0; join->unit->select_limit_cnt = HA_POS_ERROR; } + else if (join->send_records >= join->fetch_limit) + { + /* + There is a server side cursor and all rows + for this fetch request are sent. + */ + DBUG_RETURN(-4); + } } } else diff --git a/sql/sql_select.h b/sql/sql_select.h index 8ffe50e6db2..284e4315917 100644 --- a/sql/sql_select.h +++ b/sql/sql_select.h @@ -81,6 +81,10 @@ enum join_type { JT_UNKNOWN,JT_SYSTEM,JT_CONST,JT_EQ_REF,JT_REF,JT_MAYBE_REF, class JOIN; +typedef int (*Next_select_func)(JOIN *,struct st_join_table *,bool); +typedef int (*Read_record_func)(struct st_join_table *tab); + + typedef struct st_join_table { TABLE *table; KEYUSE *keyuse; /* pointer to first used key */ @@ -95,8 +99,8 @@ typedef struct st_join_table { st_join_table *first_upper; /* first inner table for embedding outer join */ st_join_table *first_unmatched; /* used for optimization purposes only */ const char *info; - int (*read_first_record)(struct st_join_table *tab); - int (*next_select)(JOIN *,struct st_join_table *,bool); + Read_record_func read_first_record; + Next_select_func next_select; READ_RECORD read_record; double worst_seeks; key_map const_keys; /* Keys with constant part */ @@ -149,6 +153,16 @@ class JOIN :public Sql_alloc bool do_send_rows; table_map const_table_map,found_const_table_map,outer_join; ha_rows send_records,found_records,examined_rows,row_limit, select_limit; + /* + Used to fetch no more than given amount of rows per one + fetch operation of server side cursor. + The value is checked in end_send and end_send_group in fashion, similar + to offset_limit_cnt: + - fetch_limit= HA_POS_ERROR if there is no cursor. + - when we open a cursor, we set fetch_limit to 0, + - on each fetch iteration we add num_rows to fetch to fetch_limit + */ + ha_rows fetch_limit; POSITION positions[MAX_TABLES+1],best_positions[MAX_TABLES+1]; double best_read; List *fields; @@ -239,6 +253,7 @@ class JOIN :public Sql_alloc do_send_rows= 1; send_records= 0; found_records= 0; + fetch_limit= HA_POS_ERROR; examined_rows= 0; exec_tmp_table1= 0; exec_tmp_table2= 0; @@ -319,6 +334,44 @@ class JOIN :public Sql_alloc }; +/* + Server-side cursor (now stands only for basic read-only cursor) + See class implementation in sql_select.cc +*/ + +class Cursor: public Sql_alloc, public Item_arena +{ + JOIN *join; + SELECT_LEX_UNIT *unit; + + TABLE *open_tables; + MYSQL_LOCK *lock; + TABLE *derived_tables; + /* List of items created during execution */ + ulong query_id; +public: + select_send result; + + /* Temporary implementation as now we replace THD state by value */ + /* Save THD state into cursor */ + void init_from_thd(THD *thd); + /* Restore THD from cursor to continue cursor execution */ + void init_thd(THD *thd); + /* bzero cursor state in THD */ + void reset_thd(THD *thd); + + int open(JOIN *join); + int fetch(ulong num_rows); + void reset() { join= 0; } + bool is_open() const { return join != 0; } + void close(); + + void set_unit(SELECT_LEX_UNIT *unit_arg) { unit= unit_arg; } + Cursor() :join(0), unit(0) {} + ~Cursor(); +}; + + typedef struct st_select_check { uint const_ref,reg_ref; } SELECT_CHECK; diff --git a/sql/sql_show.cc b/sql/sql_show.cc index 5064a95d468..c11207eac24 100644 --- a/sql/sql_show.cc +++ b/sql/sql_show.cc @@ -66,7 +66,8 @@ mysqld_show_dbs(THD *thd,const char *wild) strxmov(end," (",wild,")",NullS); field_list.push_back(field); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); if (mysql_find_files(thd,&files,NullS,mysql_data_home,wild,1)) DBUG_RETURN(1); @@ -107,7 +108,8 @@ int mysqld_show_open_tables(THD *thd,const char *wild) field_list.push_back(new Item_return_int("In_use", 1, MYSQL_TYPE_TINY)); field_list.push_back(new Item_return_int("Name_locked", 4, MYSQL_TYPE_TINY)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); if (!(open_list=list_open_tables(thd,wild)) && thd->is_fatal_error) @@ -160,7 +162,8 @@ int mysqld_show_tables(THD *thd,const char *db,const char *wild) field_list.push_back(field); if (show_type) field_list.push_back(new Item_empty_string("table_type", 10)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); if (mysql_find_files(thd,&files,db,path,wild,0)) DBUG_RETURN(-1); @@ -208,7 +211,8 @@ int mysqld_show_storage_engines(THD *thd) field_list.push_back(new Item_empty_string("Support",10)); field_list.push_back(new Item_empty_string("Comment",80)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); const char *default_type_name= @@ -282,7 +286,8 @@ int mysqld_show_privileges(THD *thd) field_list.push_back(new Item_empty_string("Context",15)); field_list.push_back(new Item_empty_string("Comment",NAME_LEN)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); show_privileges_st *privilege= sys_privileges; @@ -357,7 +362,8 @@ int mysqld_show_column_types(THD *thd) field_list.push_back(new Item_empty_string("Default",NAME_LEN)); field_list.push_back(new Item_empty_string("Comment",NAME_LEN)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); /* TODO: Change the loop to not use 'i' */ @@ -530,7 +536,8 @@ int mysqld_extend_show_tables(THD *thd,const char *db,const char *wild) item->maybe_null=1; field_list.push_back(item=new Item_empty_string("Comment",80)); item->maybe_null=1; - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); if (mysql_find_files(thd,&files,db,path,wild,0)) @@ -721,7 +728,7 @@ mysqld_show_fields(THD *thd, TABLE_LIST *table_list,const char *wild, } // Send first number of fields and records if (protocol->send_records_num(&field_list, (ulonglong)file->records) || - protocol->send_fields(&field_list,0)) + protocol->send_fields(&field_list, Protocol::SEND_EOF)) DBUG_RETURN(1); restore_record(table,default_values); // Get empty record @@ -859,7 +866,8 @@ mysqld_show_create(THD *thd, TABLE_LIST *table_list) field_list.push_back(new Item_empty_string("Create Table", max(buffer.length(),1024))); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); protocol->prepare_for_resend(); buffer.length(0); @@ -943,7 +951,8 @@ int mysqld_show_create_db(THD *thd, char *dbname, field_list.push_back(new Item_empty_string("Database",NAME_LEN)); field_list.push_back(new Item_empty_string("Create Database",1024)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); protocol->prepare_for_resend(); @@ -985,7 +994,8 @@ mysqld_show_logs(THD *thd) field_list.push_back(new Item_empty_string("Type",10)); field_list.push_back(new Item_empty_string("Status",10)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); #ifdef HAVE_BERKELEY_DB @@ -1034,7 +1044,8 @@ mysqld_show_keys(THD *thd, TABLE_LIST *table_list) field_list.push_back(new Item_empty_string("Comment",255)); item->maybe_null=1; - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); KEY *key_info=table->key_info; @@ -1121,7 +1132,8 @@ mysqld_list_fields(THD *thd, TABLE_LIST *table_list, const char *wild) field_list.push_back(new Item_field(field)); } restore_record(table,default_values); // Get empty record - if (thd->protocol->send_fields(&field_list,2)) + if (thd->protocol->send_fields(&field_list, Protocol::SEND_DEFAULTS | + Protocol::SEND_EOF)) DBUG_VOID_RETURN; net_flush(&thd->net); DBUG_VOID_RETURN; @@ -1615,7 +1627,8 @@ void mysqld_list_processes(THD *thd,const char *user, bool verbose) field->maybe_null=1; field_list.push_back(field=new Item_empty_string("Info",max_query_length)); field->maybe_null=1; - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_VOID_RETURN; VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list @@ -1751,7 +1764,8 @@ int mysqld_show_collations(THD *thd, const char *wild) field_list.push_back(new Item_empty_string("Compiled",30)); field_list.push_back(new Item_return_int("Sortlen",3, FIELD_TYPE_SHORT)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); for ( cs= all_charsets ; cs < all_charsets+255 ; cs++ ) @@ -1804,7 +1818,8 @@ int mysqld_show_charsets(THD *thd, const char *wild) field_list.push_back(new Item_empty_string("Default collation",60)); field_list.push_back(new Item_return_int("Maxlen",3, FIELD_TYPE_SHORT)); - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); for ( cs= all_charsets ; cs < all_charsets+255 ; cs++ ) @@ -1838,7 +1853,8 @@ int mysqld_show(THD *thd, const char *wild, show_var_st *variables, field_list.push_back(new Item_empty_string("Variable_name",30)); field_list.push_back(new Item_empty_string("Value",256)); - if (protocol->send_fields(&field_list,1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(1); /* purecov: inspected */ null_lex_str.str= 0; // For sys_var->value_ptr() null_lex_str.length= 0; diff --git a/sql/sql_table.cc b/sql/sql_table.cc index ea57536d7c1..93fb7930da7 100644 --- a/sql/sql_table.cc +++ b/sql/sql_table.cc @@ -1738,7 +1738,8 @@ static int mysql_admin_table(THD* thd, TABLE_LIST* tables, item->maybe_null = 1; field_list.push_back(item = new Item_empty_string("Msg_text", 255)); item->maybe_null = 1; - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); mysql_ha_close(thd, tables, /*dont_send_ok*/ 1, /*dont_lock*/ 1); @@ -3449,7 +3450,8 @@ int mysql_checksum_table(THD *thd, TABLE_LIST *tables, HA_CHECK_OPT *check_opt) item->maybe_null= 1; field_list.push_back(item=new Item_int("Checksum",(longlong) 1,21)); item->maybe_null= 1; - if (protocol->send_fields(&field_list, 1)) + if (protocol->send_fields(&field_list, + Protocol::SEND_NUM_ROWS | Protocol::SEND_EOF)) DBUG_RETURN(-1); for (table= tables; table; table= table->next_local) diff --git a/sql/sql_union.cc b/sql/sql_union.cc index 35f8a390308..d6b776571f2 100644 --- a/sql/sql_union.cc +++ b/sql/sql_union.cc @@ -31,7 +31,13 @@ int mysql_union(THD *thd, LEX *lex, select_result *result, int res, res_cln; if (!(res= unit->prepare(thd, result, SELECT_NO_UNLOCK))) res= unit->exec(); - res_cln= unit->cleanup(); + if (res == 0 && thd->cursor && thd->cursor->is_open()) + { + thd->cursor->set_unit(unit); + res_cln= 0; + } + else + res_cln= unit->cleanup(); DBUG_RETURN(res?res:res_cln); } diff --git a/tests/Makefile.am b/tests/Makefile.am index 5d0e4627b69..0a86b330f18 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -35,7 +35,7 @@ INCLUDES = -I$(top_srcdir)/include $(openssl_includes) LIBS = @CLIENT_LIBS@ LDADD = @CLIENT_EXTRA_LDFLAGS@ ../libmysql/libmysqlclient.la client_test_LDADD= $(LDADD) $(CXXLDFLAGS) -client_test_SOURCES= client_test.c +client_test_SOURCES= client_test.cc insert_test_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) select_test_DEPENDENCIES= $(LIBRARIES) $(pkglib_LTLIBRARIES) diff --git a/tests/client_test.c b/tests/client_test.cc similarity index 97% rename from tests/client_test.c rename to tests/client_test.cc index 13f5a3ac852..36adbc381e1 100644 --- a/tests/client_test.c +++ b/tests/client_test.cc @@ -693,6 +693,240 @@ static void client_use_result() } +/* + Accepts arbitrary number of queries and runs them against the database. + Used to fill tables for each test. +*/ + +void fill_tables(const char **query_list, unsigned query_count) +{ + int rc; + for (const char **query= query_list; query < query_list + query_count; + ++query) + { + rc= mysql_query(mysql, *query); + if (rc) + { + fprintf(stderr, + "fill_tables failed: query is\n" + "%s,\n" + "error: %s\n", *query, mysql_error(mysql)); + exit(1); + } + } +} + +/* + All state of fetch from one statement: statement handle, out buffers, + fetch position. + See fetch_n for for the only use case. +*/ + +struct Stmt_fetch +{ + enum { MAX_COLUMN_LENGTH= 255 }; + + Stmt_fetch() {} + ~Stmt_fetch(); + + void init(unsigned stmt_no_arg, const char *query_arg); + int fetch_row(); + + const char *query; + unsigned stmt_no; + MYSQL_STMT *stmt; + bool is_open; + MYSQL_BIND *bind_array; + char **out_data; + unsigned long *out_data_length; + unsigned column_count; + unsigned row_count; +}; + +/* + Create statement handle, prepare it with statement, execute and allocate + fetch buffers. +*/ + +void Stmt_fetch::init(unsigned stmt_no_arg, const char *query_arg) +{ + unsigned long type= CURSOR_TYPE_READ_ONLY; + int rc; + unsigned i; + MYSQL_RES *metadata; + + /* Save query and statement number for error messages */ + stmt_no= stmt_no_arg; + query= query_arg; + + stmt= mysql_stmt_init(mysql); + + rc= mysql_stmt_prepare(stmt, query, strlen(query)); + if (rc) + { + fprintf(stderr, + "mysql_stmt_prepare of stmt %d failed:\n" + "query: %s\n" + "error: %s\n", + stmt_no, query, mysql_stmt_error(stmt)); + exit(1); + } + + /* + The attribute is sent to server on execute and asks to open read-only + for result set + */ + mysql_stmt_attr_set(stmt, STMT_ATTR_CURSOR_TYPE, (const void *) &type); + + rc= mysql_stmt_execute(stmt); + if (rc) + { + fprintf(stderr, + "mysql_stmt_execute of stmt %d failed:\n" + "query: %s\n" + "error: %s\n", + stmt_no, query, mysql_stmt_error(stmt)); + exit(1); + } + + /* Find out total number of columns in result set */ + metadata= mysql_stmt_result_metadata(stmt); + column_count= mysql_num_fields(metadata); + mysql_free_result(metadata); + + /* + Now allocate bind handles and buffers for output data: + calloc memory to reduce number of MYSQL_BIND members we need to + set up. + */ + + bind_array= (MYSQL_BIND *) calloc(1, sizeof(MYSQL_BIND) * column_count); + out_data= (char **) calloc(1, sizeof(*out_data) * column_count); + out_data_length= (unsigned long *) calloc(1, + sizeof(*out_data_length) * column_count); + + for (i= 0; i < column_count; ++i) + { + out_data[i]= (char *) calloc(1, MAX_COLUMN_LENGTH); + bind_array[i].buffer_type= MYSQL_TYPE_STRING; + bind_array[i].buffer= out_data[i]; + bind_array[i].buffer_length= MAX_COLUMN_LENGTH; + bind_array[i].length= out_data_length + i; + } + + mysql_stmt_bind_result(stmt, bind_array); + + row_count= 0; + is_open= true; + + /* Ready for reading rows */ +} + + +/* Fetch and print one row from cursor */ + +int Stmt_fetch::fetch_row() +{ + int rc; + unsigned i; + + if ((rc= mysql_stmt_fetch(stmt)) == 0) + { + ++row_count; + printf("Stmt %d fetched row %d:\n", stmt_no, row_count); + for (i= 0; i < column_count; ++i) + { + out_data[i][out_data_length[i]]= '\0'; + printf("column %d: %s\n", i+1, out_data[i]); + } + } + else + is_open= false; + return rc; +} + + +Stmt_fetch::~Stmt_fetch() +{ + unsigned i; + + for (i= 0; i < column_count; ++i) + free(out_data[i]); + free(out_data); + free(bind_array); + mysql_stmt_close(stmt); +} + +/* We need these to compile without libstdc++ */ + +void *operator new[] (size_t sz) +{ + return (void *) malloc (sz ? sz : 1); +} + +void operator delete[] (void *ptr) throw () +{ + if (ptr) + free(ptr); +} + +/* + For given array of queries, open query_count cursors and fetch + from them in simultaneous manner. + In case there was an error in one of the cursors, continue + reading from the rest. +*/ + +bool fetch_n(const char **query_list, unsigned query_count) +{ + unsigned open_statements= query_count; + unsigned i; + int rc, error_count= 0; + Stmt_fetch *stmt_array= new Stmt_fetch[query_count]; + Stmt_fetch *stmt; + + for (i= 0; i < query_count; ++i) + { + /* Init will exit(1) in case of error */ + stmt_array[i].init(i, query_list[i]); + } + + while (open_statements) + { + for (stmt= stmt_array; stmt < stmt_array + query_count; ++stmt) + { + if (stmt->is_open && (rc= stmt->fetch_row())) + { + --open_statements; + /* + We try to fetch from the rest of the statements in case of + error + */ + if (rc != MYSQL_NO_DATA) + { + fprintf(stderr, + "Got error reading rows from statement %d,\n" + "query is: %s,\n" + "error message: %s", stmt - stmt_array, stmt->query, + mysql_stmt_error(stmt->stmt)); + ++error_count; + } + } + } + } + if (error_count) + fprintf(stderr, "Fetch FAILED"); + else + { + unsigned total_row_count= 0; + for (stmt= stmt_array; stmt < stmt_array + query_count; ++stmt) + total_row_count+= stmt->row_count; + printf("Success, total rows fetched: %d\n", total_row_count); + } + delete [] stmt_array; + return error_count != 0; +} + /* Separate thread query to test some cases */ static my_bool thread_query(char *query) @@ -10043,7 +10277,7 @@ static void test_view() int rc, i; MYSQL_BIND bind[1]; char str_data[50]; - long length = 0L; + ulong length = 0L; long is_null = 0L; const char *query= "SELECT COUNT(*) FROM v1 WHERE `SERVERNAME`=?"; @@ -10141,7 +10375,7 @@ static void test_view_2where() int rc, i; MYSQL_BIND bind[8]; char parms[8][100]; - long length[8]; + ulong length[8]; const char *query= "SELECT `RELID` ,`REPORT` ,`HANDLE` ,`LOG_GROUP` ,`USERNAME` ,`VARIANT` ,`TYPE` ,`VERSION` ,`ERFDAT` ,`ERFTIME` ,`ERFNAME` ,`AEDAT` ,`AETIME` ,`AENAME` ,`DEPENDVARS` ,`INACTIVE` FROM `V_LTDX` WHERE `MANDT` = ? AND `RELID` = ? AND `REPORT` = ? AND `HANDLE` = ? AND `LOG_GROUP` = ? AND `USERNAME` IN ( ? , ? ) AND `TYPE` = ?"; myheader("test_view_2where"); @@ -10189,7 +10423,7 @@ static void test_view_star() int rc, i; MYSQL_BIND bind[8]; char parms[8][100]; - long length[8]; + ulong length[8]; const char *query= "SELECT * FROM vt1 WHERE a IN (?,?)"; myheader("test_view_star"); @@ -10338,7 +10572,7 @@ static void test_view_insert_fields() { MYSQL_STMT *stmt; char parm[11][1000]; - long l[11]; + ulong l[11]; int rc, i; MYSQL_BIND bind[11]; const char *query= "INSERT INTO `v1` ( `K1C4` ,`K2C4` ,`K3C4` ,`K4N4` ,`F1C4` ,`F2I4` ,`F3N5` ,`F7F8` ,`F6N4` ,`F5C8` ,`F9D8` ) VALUES( ? , ? , ? , ? , ? , ? , ? , ? , ? , ? , ? )"; @@ -10390,6 +10624,61 @@ static void test_view_insert_fields() } + +static void test_basic_cursors() +{ + myheader("test_basic_cursors"); + const char *basic_tables[]= + { + "DROP TABLE IF EXISTS t1, t2", + + "CREATE TABLE t1 " + "(id INTEGER NOT NULL PRIMARY KEY, " + " name VARCHAR(20) NOT NULL)", + + "INSERT INTO t1 (id, name) VALUES " + " (2, 'Ja'), (3, 'Ede'), " + " (4, 'Haag'), (5, 'Kabul'), " + " (6, 'Almere'), (7, 'Utrecht'), " + " (8, 'Qandahar'), (9, 'Amsterdam'), " + " (10, 'Amersfoort'), (11, 'Constantine')", + + "CREATE TABLE t2 " + "(id INTEGER NOT NULL PRIMARY KEY, " + " name VARCHAR(20) NOT NULL)", + + "INSERT INTO t2 (id, name) VALUES " + " (4, 'Guam'), (5, 'Aruba'), " + " (6, 'Angola'), (7, 'Albania'), " + " (8, 'Anguilla'), (9, 'Argentina'), " + " (10, 'Azerbaijan'), (11, 'Afghanistan'), " + " (12, 'Burkina Faso'), (13, 'Faroe Islands')" + }; + + fill_tables(basic_tables, sizeof(basic_tables)/sizeof(*basic_tables)); + + const char *queries[]= + { + "SELECT * FROM t1", + "SELECT * FROM t2" + }; + + fetch_n(queries, sizeof(queries)/sizeof(*queries)); +} + + +static void test_cursors_with_union() +{ + myheader("test_cursors_with_union"); + + const char *queries[]= + { + "SELECT t1.name FROM t1 UNION SELECT t2.name FROM t2", + "SELECT t1.id FROM t1 WHERE t1.id < 5" + }; + fetch_n(queries, sizeof(queries)/sizeof(*queries)); +} + /* Read and parse arguments and MySQL options from my.cnf */ @@ -10694,6 +10983,8 @@ int main(int argc, char **argv) test_view_insert(); /* inserting in VIEW without field list */ test_left_join_view(); /* left join on VIEW with WHERE condition */ test_view_insert_fields(); /* insert into VIOEW with fields list */ + test_basic_cursors(); + test_cursors_with_union(); /* XXX: PLEASE RUN THIS PROGRAM UNDER VALGRIND AND VERIFY THAT YOUR TEST DOESN'T CONTAIN WARNINGS/ERRORS BEFORE YOU PUSH.