update to ease the patch process

This commit is contained in:
andrey@lmy004. 2006-07-13 16:24:55 +02:00
parent 9eee0ee47a
commit 3f6de6c523
7 changed files with 1530 additions and 425 deletions

1103
.bzrignore

File diff suppressed because it is too large Load Diff

View File

@ -65,8 +65,8 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sp_head.h sp_pcontext.h sp_rcontext.h sp.h sp_cache.h \ sp_head.h sp_pcontext.h sp_rcontext.h sp.h sp_cache.h \
parse_file.h sql_view.h sql_trigger.h \ parse_file.h sql_view.h sql_trigger.h \
sql_array.h sql_cursor.h events.h \ sql_array.h sql_cursor.h events.h \
event_db_repository.h event_queue.h \
sql_plugin.h authors.h sql_partition.h event_data_objects.h \ sql_plugin.h authors.h sql_partition.h event_data_objects.h \
event_queue.h event_db_repository.h \
partition_info.h partition_element.h event_scheduler.h \ partition_info.h partition_element.h event_scheduler.h \
contributors.h contributors.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
@ -104,8 +104,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
gstream.cc spatial.cc sql_help.cc sql_cursor.cc \ gstream.cc spatial.cc sql_help.cc sql_cursor.cc \
tztime.cc my_time.c my_user.c my_decimal.cc\ tztime.cc my_time.c my_user.c my_decimal.cc\
sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \ sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \
sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\ sp_cache.cc parse_file.cc sql_trigger.cc \
events.cc event_data_objects.cc \ event_scheduler.cc events.cc event_data_objects.cc \
event_queue.cc event_db_repository.cc \ event_queue.cc event_db_repository.cc \
sql_plugin.cc sql_binlog.cc \ sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc sql_builtin.cc sql_tablespace.cc partition_info.cc

View File

@ -24,65 +24,12 @@
#define EVEX_MAX_INTERVAL_VALUE 1000000000L #define EVEX_MAX_INTERVAL_VALUE 1000000000L
/*
Switches the security context
SYNOPSIS
event_change_security_context()
thd Thread
user The user
host The host of the user
db The schema for which the security_ctx will be loaded
backup Where to store the old context
RETURN VALUE
FALSE OK
TRUE Error (generates error too)
*/
static bool static bool
event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host, event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host,
LEX_STRING db, Security_context *backup) LEX_STRING db, Security_context *backup);
{
DBUG_ENTER("event_change_security_context");
DBUG_PRINT("info",("%s@%s@%s", user.str, host.str, db.str));
#ifndef NO_EMBEDDED_ACCESS_CHECKS
*backup= thd->main_security_ctx;
if (acl_getroot_no_password(&thd->main_security_ctx, user.str, host.str,
host.str, db.str))
{
my_error(ER_NO_SUCH_USER, MYF(0), user.str, host.str);
DBUG_RETURN(TRUE);
}
thd->security_ctx= &thd->main_security_ctx;
#endif
DBUG_RETURN(FALSE);
}
/*
Restores the security context
SYNOPSIS
event_restore_security_context()
thd Thread
backup Context to switch to
*/
static void static void
event_restore_security_context(THD *thd, Security_context *backup) event_restore_security_context(THD *thd, Security_context *backup);
{
DBUG_ENTER("event_restore_security_context");
#ifndef NO_EMBEDDED_ACCESS_CHECKS
if (backup)
{
thd->main_security_ctx= *backup;
thd->security_ctx= &thd->main_security_ctx;
}
#endif
DBUG_VOID_RETURN;
}
/* /*
Returns a new instance Returns a new instance
@ -236,47 +183,6 @@ Event_parse_data::init_body(THD *thd)
} }
/*
Inits definer (definer_user and definer_host) during parsing.
SYNOPSIS
Event_parse_data::init_definer()
thd Thread
*/
void
Event_parse_data::init_definer(THD *thd)
{
int definer_user_len;
int definer_host_len;
DBUG_ENTER("Event_parse_data::init_definer");
DBUG_PRINT("info",("init definer_user thd->mem_root=0x%lx "
"thd->sec_ctx->priv_user=0x%lx", thd->mem_root,
thd->security_ctx->priv_user));
definer_user_len= strlen(thd->security_ctx->priv_user);
definer_host_len= strlen(thd->security_ctx->priv_host);
/* + 1 for @ */
DBUG_PRINT("info",("init definer as whole"));
definer.length= definer_user_len + definer_host_len + 1;
definer.str= thd->alloc(definer.length + 1);
DBUG_PRINT("info",("copy the user"));
memcpy(definer.str, thd->security_ctx->priv_user, definer_user_len);
definer.str[definer_user_len]= '@';
DBUG_PRINT("info",("copy the host"));
memcpy(definer.str + definer_user_len + 1, thd->security_ctx->priv_host,
definer_host_len);
definer.str[definer.length]= '\0';
DBUG_PRINT("info",("definer [%s] initted", definer.str));
DBUG_VOID_RETURN;
}
/* /*
Sets time for execution for one-time event. Sets time for execution for one-time event.
@ -645,6 +551,47 @@ Event_parse_data::check_parse_data(THD *thd)
} }
/*
Inits definer (definer_user and definer_host) during parsing.
SYNOPSIS
Event_parse_data::init_definer()
thd Thread
*/
void
Event_parse_data::init_definer(THD *thd)
{
int definer_user_len;
int definer_host_len;
DBUG_ENTER("Event_parse_data::init_definer");
DBUG_PRINT("info",("init definer_user thd->mem_root=0x%lx "
"thd->sec_ctx->priv_user=0x%lx", thd->mem_root,
thd->security_ctx->priv_user));
definer_user_len= strlen(thd->security_ctx->priv_user);
definer_host_len= strlen(thd->security_ctx->priv_host);
/* + 1 for @ */
DBUG_PRINT("info",("init definer as whole"));
definer.length= definer_user_len + definer_host_len + 1;
definer.str= thd->alloc(definer.length + 1);
DBUG_PRINT("info",("copy the user"));
memcpy(definer.str, thd->security_ctx->priv_user, definer_user_len);
definer.str[definer_user_len]= '@';
DBUG_PRINT("info",("copy the host"));
memcpy(definer.str + definer_user_len + 1, thd->security_ctx->priv_host,
definer_host_len);
definer.str[definer.length]= '\0';
DBUG_PRINT("info",("definer [%s] initted", definer.str));
DBUG_VOID_RETURN;
}
/* /*
Constructor Constructor
@ -1667,6 +1614,69 @@ Event_job_data::get_fake_create_event(THD *thd, String *buf)
} }
/*
Executes the event (the underlying sp_head object);
SYNOPSIS
Event_job_data::execute()
thd THD
RETURN VALUE
0 success
-99 No rights on this.dbname.str
others retcodes of sp_head::execute_procedure()
*/
int
Event_job_data::execute(THD *thd)
{
Security_context save_ctx;
/* this one is local and not needed after exec */
int ret= 0;
DBUG_ENTER("Event_job_data::execute");
DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str));
if ((ret= compile(thd, NULL)))
goto done;
event_change_security_context(thd, definer_user, definer_host, dbname,
&save_ctx);
/*
THD::~THD will clean this or if there is DROP DATABASE in the SP then
it will be free there. It should not point to our buffer which is allocated
on a mem_root.
*/
thd->db= my_strdup(dbname.str, MYF(0));
thd->db_length= dbname.length;
if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str)))
{
List<Item> empty_item_list;
empty_item_list.empty();
if (thd->enable_slow_log)
sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS;
sphead->m_flags|= sp_head::LOG_GENERAL_LOG;
ret= sphead->execute_procedure(thd, &empty_item_list);
}
else
{
DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str,
definer_host.str, dbname.str));
ret= -99;
}
event_restore_security_context(thd, &save_ctx);
done:
thd->end_statement();
thd->cleanup_after_query();
DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret));
DBUG_RETURN(ret);
}
/* /*
Compiles an event before it's execution. Compiles the anonymous Compiles an event before it's execution. Compiles the anonymous
sp_head object held by the event sp_head object held by the event
@ -1799,69 +1809,6 @@ done:
} }
/*
Executes the event (the underlying sp_head object);
SYNOPSIS
Event_job_data::execute()
thd THD
RETURN VALUE
0 success
-99 No rights on this.dbname.str
others retcodes of sp_head::execute_procedure()
*/
int
Event_job_data::execute(THD *thd)
{
Security_context save_ctx;
/* this one is local and not needed after exec */
int ret= 0;
DBUG_ENTER("Event_job_data::execute");
DBUG_PRINT("info", ("EXECUTING %s.%s", dbname.str, name.str));
if ((ret= compile(thd, NULL)))
goto done;
event_change_security_context(thd, definer_user, definer_host, dbname,
&save_ctx);
/*
THD::~THD will clean this or if there is DROP DATABASE in the SP then
it will be free there. It should not point to our buffer which is allocated
on a mem_root.
*/
thd->db= my_strdup(dbname.str, MYF(0));
thd->db_length= dbname.length;
if (!check_access(thd, EVENT_ACL,dbname.str, 0, 0, 0,is_schema_db(dbname.str)))
{
List<Item> empty_item_list;
empty_item_list.empty();
if (thd->enable_slow_log)
sphead->m_flags|= sp_head::LOG_SLOW_STATEMENTS;
sphead->m_flags|= sp_head::LOG_GENERAL_LOG;
ret= sphead->execute_procedure(thd, &empty_item_list);
}
else
{
DBUG_PRINT("error", ("%s@%s has no rights on %s", definer_user.str,
definer_host.str, dbname.str));
ret= -99;
}
event_restore_security_context(thd, &save_ctx);
done:
thd->end_statement();
thd->cleanup_after_query();
DBUG_PRINT("info", ("EXECUTED %s.%s ret=%d", dbname.str, name.str, ret));
DBUG_RETURN(ret);
}
/* /*
Checks whether two events are in the same schema Checks whether two events are in the same schema
@ -1899,3 +1846,62 @@ event_basic_identifier_equal(LEX_STRING db, LEX_STRING name, Event_basic *b)
return !sortcmp_lex_string(name, b->name, system_charset_info) && return !sortcmp_lex_string(name, b->name, system_charset_info) &&
!sortcmp_lex_string(db, b->dbname, system_charset_info); !sortcmp_lex_string(db, b->dbname, system_charset_info);
} }
/*
Switches the security context
SYNOPSIS
event_change_security_context()
thd Thread
user The user
host The host of the user
db The schema for which the security_ctx will be loaded
backup Where to store the old context
RETURN VALUE
FALSE OK
TRUE Error (generates error too)
*/
static bool
event_change_security_context(THD *thd, LEX_STRING user, LEX_STRING host,
LEX_STRING db, Security_context *backup)
{
DBUG_ENTER("event_change_security_context");
DBUG_PRINT("info",("%s@%s@%s", user.str, host.str, db.str));
#ifndef NO_EMBEDDED_ACCESS_CHECKS
*backup= thd->main_security_ctx;
if (acl_getroot_no_password(&thd->main_security_ctx, user.str, host.str,
host.str, db.str))
{
my_error(ER_NO_SUCH_USER, MYF(0), user.str, host.str);
DBUG_RETURN(TRUE);
}
thd->security_ctx= &thd->main_security_ctx;
#endif
DBUG_RETURN(FALSE);
}
/*
Restores the security context
SYNOPSIS
event_restore_security_context()
thd Thread
backup Context to switch to
*/
static void
event_restore_security_context(THD *thd, Security_context *backup)
{
DBUG_ENTER("event_restore_security_context");
#ifndef NO_EMBEDDED_ACCESS_CHECKS
if (backup)
{
thd->main_security_ctx= *backup;
thd->security_ctx= &thd->main_security_ctx;
}
#endif
DBUG_VOID_RETURN;
}

View File

@ -41,6 +41,7 @@ struct event_queue_param
Event_queue *queue; Event_queue *queue;
pthread_mutex_t LOCK_loaded; pthread_mutex_t LOCK_loaded;
pthread_cond_t COND_loaded; pthread_cond_t COND_loaded;
bool loading_finished;
}; };
@ -85,9 +86,14 @@ event_queue_loader_thread(void *arg)
DBUG_ENTER("event_queue_loader_thread"); DBUG_ENTER("event_queue_loader_thread");
pthread_mutex_lock(&param->LOCK_loaded); pthread_mutex_lock(&param->LOCK_loaded);
param->queue->check_system_tables(thd);
param->queue->load_events_from_db(thd); param->queue->load_events_from_db(thd);
param->loading_finished= TRUE;
pthread_cond_signal(&param->COND_loaded); pthread_cond_signal(&param->COND_loaded);
pthread_mutex_unlock(&param->LOCK_loaded); pthread_mutex_unlock(&param->LOCK_loaded);
end: end:
@ -113,8 +119,6 @@ Event_queue::Event_queue()
mutex_last_attempted_lock_in_func= ""; mutex_last_attempted_lock_in_func= "";
mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE; mutex_queue_data_locked= mutex_queue_data_attempting_lock= FALSE;
queue_loaded= FALSE;
} }
@ -195,8 +199,10 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
event_queue_param_value= (struct event_queue_param *) event_queue_param_value= (struct event_queue_param *)
my_malloc(sizeof(struct event_queue_param), MYF(0)); my_malloc(sizeof(struct event_queue_param), MYF(0));
event_queue_param_value->thd= new_thd; event_queue_param_value->thd= new_thd;
event_queue_param_value->queue= this; event_queue_param_value->queue= this;
event_queue_param_value->loading_finished= FALSE;
pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST); pthread_mutex_init(&event_queue_param_value->LOCK_loaded, MY_MUTEX_INIT_FAST);
pthread_cond_init(&event_queue_param_value->COND_loaded, NULL); pthread_cond_init(&event_queue_param_value->COND_loaded, NULL);
@ -207,8 +213,8 @@ Event_queue::init_queue(Event_db_repository *db_repo, Event_scheduler *sched)
{ {
do { do {
pthread_cond_wait(&event_queue_param_value->COND_loaded, pthread_cond_wait(&event_queue_param_value->COND_loaded,
&event_queue_param_value->LOCK_loaded); &event_queue_param_value->LOCK_loaded);
} while (queue_loaded == FALSE); } while (event_queue_param_value->loading_finished == FALSE);
} }
pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded); pthread_mutex_unlock(&event_queue_param_value->LOCK_loaded);
@ -662,8 +668,6 @@ end:
close_thread_tables(thd); close_thread_tables(thd);
queue_loaded= TRUE;
DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count)); DBUG_PRINT("info", ("Status code %d. Loaded %d event(s)", ret, count));
DBUG_RETURN(ret); DBUG_RETURN(ret);
} }
@ -683,7 +687,7 @@ end:
TRUE Error TRUE Error
*/ */
bool void
Event_queue::check_system_tables(THD *thd) Event_queue::check_system_tables(THD *thd)
{ {
TABLE_LIST tables; TABLE_LIST tables;
@ -702,39 +706,35 @@ Event_queue::check_system_tables(THD *thd)
tables.lock_type= TL_READ; tables.lock_type= TL_READ;
if ((ret= simple_open_n_lock_tables(thd, &tables))) if ((ret= simple_open_n_lock_tables(thd, &tables)))
sql_print_error("Cannot open mysql.db");
else
{ {
ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT, sql_print_error("Cannot open mysql.db");
mysql_db_table_fields, &mysql_db_table_last_check, goto end;
ER_CANNOT_LOAD_FROM_TABLE);
close_thread_tables(thd);
} }
if (ret) ret= table_check_intact(tables.table, MYSQL_DB_FIELD_COUNT,
DBUG_RETURN(TRUE); mysql_db_table_fields, &mysql_db_table_last_check,
ER_CANNOT_LOAD_FROM_TABLE);
close_thread_tables(thd);
bzero((char*) &tables, sizeof(tables)); bzero((char*) &tables, sizeof(tables));
tables.db= (char*) "mysql"; tables.db= (char*) "mysql";
tables.table_name= tables.alias= (char*) "user"; tables.table_name= tables.alias= (char*) "user";
tables.lock_type= TL_READ; tables.lock_type= TL_READ;
if ((ret= simple_open_n_lock_tables(thd, &tables))) if (simple_open_n_lock_tables(thd, &tables))
sql_print_error("Cannot open mysql.db"); sql_print_error("Cannot open mysql.db");
else else
{ {
if (tables.table->s->fields < 29 || if (tables.table->s->fields < 29 ||
strncmp(tables.table->field[29]->field_name, strncmp(tables.table->field[29]->field_name,
STRING_WITH_LEN("Event_priv"))) STRING_WITH_LEN("Event_priv")))
{
sql_print_error("mysql.user has no `Event_priv` column at position 29"); sql_print_error("mysql.user has no `Event_priv` column at position 29");
ret= TRUE;
}
close_thread_tables(thd); close_thread_tables(thd);
} }
end:
thd->restore_backup_open_tables_state(&backup); thd->restore_backup_open_tables_state(&backup);
DBUG_RETURN(ret); DBUG_VOID_RETURN;
} }

View File

@ -56,7 +56,7 @@ public:
void void
drop_schema_events(THD *thd, LEX_STRING schema); drop_schema_events(THD *thd, LEX_STRING schema);
static bool void
check_system_tables(THD *thd); check_system_tables(THD *thd);
void void
@ -99,8 +99,6 @@ protected:
/* The sorted queue with the Event_job_data objects */ /* The sorted queue with the Event_job_data objects */
QUEUE queue; QUEUE queue;
bool queue_loaded;
uint mutex_last_locked_at_line; uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line; uint mutex_last_unlocked_at_line;
uint mutex_last_attempted_lock_at_line; uint mutex_last_attempted_lock_at_line;

View File

@ -28,19 +28,12 @@
#define SCHED_FUNC "<unknown>" #define SCHED_FUNC "<unknown>"
#endif #endif
#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__) #define LOCK_DATA() lock_data(SCHED_FUNC, __LINE__)
#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) #define UNLOCK_DATA() unlock_data(SCHED_FUNC, __LINE__)
#define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__) #define COND_STATE_WAIT(timer) cond_wait(timer, SCHED_FUNC, __LINE__)
extern pthread_attr_t connection_attrib; extern pthread_attr_t connection_attrib;
struct scheduler_param
{
THD *thd;
Event_scheduler *scheduler;
};
static static
LEX_STRING scheduler_states_names[] = LEX_STRING scheduler_states_names[] =
{ {
@ -49,6 +42,11 @@ LEX_STRING scheduler_states_names[] =
{ C_STRING_WITH_LEN("STOPPING")} { C_STRING_WITH_LEN("STOPPING")}
}; };
struct scheduler_param {
THD *thd;
Event_scheduler *scheduler;
};
/* /*
Prints the stack of infos, warnings, errors from thd to Prints the stack of infos, warnings, errors from thd to
@ -99,54 +97,6 @@ evex_print_warnings(THD *thd, Event_job_data *et)
} }
/*
Performs pre- pthread_create() initialisation of THD. Do this
in the thread that will pass THD to the child thread. In the
child thread call post_init_event_thread().
SYNOPSIS
pre_init_event_thread()
thd The THD of the thread. Has to be allocated by the caller.
NOTES
1. The host of the thead is my_localhost
2. thd->net is initted with NULL - no communication.
*/
void
pre_init_event_thread(THD* thd)
{
DBUG_ENTER("pre_init_event_thread");
thd->client_capabilities= 0;
thd->security_ctx->master_access= 0;
thd->security_ctx->db_access= 0;
thd->security_ctx->host_or_ip= (char*)my_localhost;
thd->security_ctx->set_user((char*)"event_scheduler");
my_net_init(&thd->net, NULL);
thd->net.read_timeout= slave_net_timeout;
thd->slave_thread= 0;
thd->options|= OPTION_AUTO_IS_NULL;
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
threads.append(thd);
thread_count++;
thread_running++;
pthread_mutex_unlock(&LOCK_thread_count);
/*
Guarantees that we will see the thread in SHOW PROCESSLIST though its
vio is NULL.
*/
thd->proc_info= "Initialized";
thd->version= refresh_version;
thd->set_time();
DBUG_VOID_RETURN;
}
/* /*
Performs post initialization of structures in a new thread. Performs post initialization of structures in a new thread.
@ -201,6 +151,54 @@ deinit_event_thread(THD *thd)
} }
/*
Performs pre- pthread_create() initialisation of THD. Do this
in the thread that will pass THD to the child thread. In the
child thread call post_init_event_thread().
SYNOPSIS
pre_init_event_thread()
thd The THD of the thread. Has to be allocated by the caller.
NOTES
1. The host of the thead is my_localhost
2. thd->net is initted with NULL - no communication.
*/
void
pre_init_event_thread(THD* thd)
{
DBUG_ENTER("pre_init_event_thread");
thd->client_capabilities= 0;
thd->security_ctx->master_access= 0;
thd->security_ctx->db_access= 0;
thd->security_ctx->host_or_ip= (char*)my_localhost;
my_net_init(&thd->net, NULL);
thd->security_ctx->set_user((char*)"event_scheduler");
thd->net.read_timeout= slave_net_timeout;
thd->slave_thread= 0;
thd->options|= OPTION_AUTO_IS_NULL;
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
pthread_mutex_lock(&LOCK_thread_count);
thd->thread_id= thread_id++;
threads.append(thd);
thread_count++;
thread_running++;
pthread_mutex_unlock(&LOCK_thread_count);
/*
Guarantees that we will see the thread in SHOW PROCESSLIST though its
vio is NULL.
*/
thd->proc_info= "Initialized";
thd->version= refresh_version;
thd->set_time();
DBUG_VOID_RETURN;
}
/* /*
Function that executes the scheduler, Function that executes the scheduler,
@ -259,33 +257,32 @@ event_worker_thread(void *arg)
thd->thread_stack= (char *) &thd; // remember where our stack is thd->thread_stack= (char *) &thd; // remember where our stack is
DBUG_ENTER("event_worker_thread"); DBUG_ENTER("event_worker_thread");
if (post_init_event_thread(thd)) if (!post_init_event_thread(thd))
goto end; {
DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational."
"THD=0x%lx", time(NULL), thd));
DBUG_PRINT("info", ("Baikonur, time is %d, BURAN reporting and operational." sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
"THD=0x%lx", time(NULL), thd));
sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
event->dbname.str, event->name.str,
event->definer.str, thd->thread_id);
thd->enable_slow_log= TRUE;
ret= event->execute(thd);
evex_print_warnings(thd, event);
sql_print_information("SCHEDULER: [%s.%s of %s] executed "
" in thread thread %lu. RetCode=%d",
event->dbname.str, event->name.str,
event->definer.str, thd->thread_id, ret);
if (ret == EVEX_COMPILE_ERROR)
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
event->dbname.str, event->name.str, event->dbname.str, event->name.str,
event->definer.str); event->definer.str, thd->thread_id);
else if (ret == EVEX_MICROSECOND_UNSUP)
sql_print_information("SCHEDULER: MICROSECOND is not supported");
thd->enable_slow_log= TRUE;
ret= event->execute(thd);
evex_print_warnings(thd, event);
sql_print_information("SCHEDULER: [%s.%s of %s] executed "
" in thread thread %lu. RetCode=%d",
event->dbname.str, event->name.str,
event->definer.str, thd->thread_id, ret);
if (ret == EVEX_COMPILE_ERROR)
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
event->dbname.str, event->name.str,
event->definer.str);
else if (ret == EVEX_MICROSECOND_UNSUP)
sql_print_information("SCHEDULER: MICROSECOND is not supported");
}
end: end:
DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str, DBUG_PRINT("info", ("BURAN %s.%s is landing!", event->dbname.str,
event->name.str)); event->name.str));
@ -293,7 +290,7 @@ end:
deinit_event_thread(thd); deinit_event_thread(thd);
DBUG_RETURN(0); // Against gcc warnings DBUG_RETURN(0); // Can't return anything here
} }
@ -305,17 +302,15 @@ end:
Event_scheduler::init_scheduler() Event_scheduler::init_scheduler()
*/ */
bool void
Event_scheduler::init_scheduler(Event_queue *q) Event_scheduler::init_scheduler(Event_queue *q)
{ {
LOCK_SCHEDULER_DATA(); LOCK_DATA();
thread_id= 0;
state= INITIALIZED;
queue= q; queue= q;
started_events= 0; started_events= 0;
UNLOCK_SCHEDULER_DATA(); thread_id= 0;
state= INITIALIZED;
return FALSE; UNLOCK_DATA();
} }
@ -377,7 +372,7 @@ Event_scheduler::start()
struct scheduler_param *scheduler_param_value; struct scheduler_param *scheduler_param_value;
DBUG_ENTER("Event_scheduler::start"); DBUG_ENTER("Event_scheduler::start");
LOCK_SCHEDULER_DATA(); LOCK_DATA();
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state])); DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
if (state > INITIALIZED) if (state > INITIALIZED)
goto end; goto end;
@ -408,7 +403,7 @@ Event_scheduler::start()
DBUG_PRINT("info", ("Setting state go RUNNING")); DBUG_PRINT("info", ("Setting state go RUNNING"));
state= RUNNING; state= RUNNING;
end: end:
UNLOCK_SCHEDULER_DATA(); UNLOCK_DATA();
if (ret && new_thd) if (ret && new_thd)
{ {
@ -426,56 +421,6 @@ end:
} }
/*
Stops the scheduler (again). Waits for acknowledgement from the
scheduler that it has stopped - synchronous stopping.
SYNOPSIS
Event_scheduler::stop()
RETURN VALUE
FALSE OK
TRUE Error (not reported)
*/
bool
Event_scheduler::stop()
{
THD *thd= current_thd;
DBUG_ENTER("Event_scheduler::stop");
DBUG_PRINT("enter", ("thd=0x%lx", current_thd));
LOCK_SCHEDULER_DATA();
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
if (state != RUNNING)
goto end;
state= STOPPING;
DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
pthread_cond_signal(&COND_state);
/* Guarantee we don't catch spurious signals */
sql_print_information("SCHEDULER: Waiting the manager thread to reply");
do {
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
"thread. Current value of state is %s . "
"workers count=%d", scheduler_states_names[state].str,
workers_count()));
/* thd could be 0x0, when shutting down */
COND_STATE_WAIT(NULL);
} while (state == STOPPING);
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
thread_id= 0;
end:
UNLOCK_SCHEDULER_DATA();
DBUG_RETURN(FALSE);
}
/* /*
The main loop of the scheduler. The main loop of the scheduler.
@ -496,7 +441,7 @@ Event_scheduler::run(THD *thd)
Event_job_data *job_data; Event_job_data *job_data;
DBUG_ENTER("Event_scheduler::run"); DBUG_ENTER("Event_scheduler::run");
LOCK_SCHEDULER_DATA(); LOCK_DATA();
thread_id= thd->thread_id; thread_id= thd->thread_id;
sql_print_information("SCHEDULER: Manager thread started with id %lu", sql_print_information("SCHEDULER: Manager thread started with id %lu",
@ -529,7 +474,7 @@ Event_scheduler::run(THD *thd)
COND_STATE_WAIT(NULL); COND_STATE_WAIT(NULL);
thd->exit_cond(""); thd->exit_cond("");
DBUG_PRINT("info", ("Woke up. Got COND_state")); DBUG_PRINT("info", ("Woke up. Got COND_state"));
LOCK_SCHEDULER_DATA(); LOCK_DATA();
} }
else if (abstime.tv_sec) else if (abstime.tv_sec)
{ {
@ -545,16 +490,16 @@ Event_scheduler::run(THD *thd)
1. Spurious wake-up 1. Spurious wake-up
2. The top of the queue was changed (new one becase of create/update) 2. The top of the queue was changed (new one becase of create/update)
*/ */
/* This will do implicit UNLOCK_SCHEDULER_DATA() */ /* This will do implicit UNLOCK_DATA() */
thd->exit_cond(""); thd->exit_cond("");
DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution.")); DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
LOCK_SCHEDULER_DATA(); LOCK_DATA();
} }
else else
{ {
UNLOCK_SCHEDULER_DATA(); UNLOCK_DATA();
res= execute_top(thd, job_data); res= execute_top(thd, job_data);
LOCK_SCHEDULER_DATA(); LOCK_DATA();
if (res) if (res)
break; break;
++started_events; ++started_events;
@ -565,7 +510,7 @@ Event_scheduler::run(THD *thd)
pthread_cond_signal(&COND_state); pthread_cond_signal(&COND_state);
error: error:
state= INITIALIZED; state= INITIALIZED;
UNLOCK_SCHEDULER_DATA(); UNLOCK_DATA();
sql_print_information("SCHEDULER: Stopped"); sql_print_information("SCHEDULER: Stopped");
DBUG_RETURN(res); DBUG_RETURN(res);
@ -627,23 +572,52 @@ error:
/* /*
Returns the current state of the scheduler Stops the scheduler (again). Waits for acknowledgement from the
scheduler that it has stopped - synchronous stopping.
SYNOPSIS SYNOPSIS
Event_scheduler::get_state() Event_scheduler::stop()
RETURN VALUE RETURN VALUE
The state of the scheduler (INITIALIZED | RUNNING | STOPPING) FALSE OK
TRUE Error (not reported)
*/ */
enum Event_scheduler::enum_state bool
Event_scheduler::get_state() Event_scheduler::stop()
{ {
enum Event_scheduler::enum_state ret; THD *thd= current_thd;
LOCK_SCHEDULER_DATA(); DBUG_ENTER("Event_scheduler::stop");
ret= state; DBUG_PRINT("enter", ("thd=0x%lx", current_thd));
UNLOCK_SCHEDULER_DATA();
return ret; LOCK_DATA();
DBUG_PRINT("info", ("state before action %s", scheduler_states_names[state]));
if (state != RUNNING)
goto end;
state= STOPPING;
DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
pthread_cond_signal(&COND_state);
/* Guarantee we don't catch spurious signals */
sql_print_information("SCHEDULER: Waiting the manager thread to reply");
do {
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
"thread. Current value of state is %s . "
"workers count=%d", scheduler_states_names[state].str,
workers_count()));
/* thd could be 0x0, when shutting down */
COND_STATE_WAIT(NULL);
} while (state == STOPPING);
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
thread_id= 0;
end:
UNLOCK_DATA();
DBUG_RETURN(FALSE);
} }
@ -697,7 +671,7 @@ Event_scheduler::queue_changed()
/* /*
Auxiliary function for locking LOCK_scheduler_state. Used Auxiliary function for locking LOCK_scheduler_state. Used
by the LOCK_SCHEDULER_DATA macro. by the LOCK_DATA macro.
SYNOPSIS SYNOPSIS
Event_scheduler::lock_data() Event_scheduler::lock_data()
@ -720,7 +694,7 @@ Event_scheduler::lock_data(const char *func, uint line)
/* /*
Auxiliary function for unlocking LOCK_scheduler_state. Used Auxiliary function for unlocking LOCK_scheduler_state. Used
by the UNLOCK_SCHEDULER_DATA macro. by the UNLOCK_DATA macro.
SYNOPSIS SYNOPSIS
Event_scheduler::unlock_data() Event_scheduler::unlock_data()
@ -754,29 +728,51 @@ Event_scheduler::unlock_data(const char *func, uint line)
*/ */
void void
Event_scheduler::cond_wait(struct timespec *abstime, Event_scheduler::cond_wait(struct timespec *abstime, const char *func,
const char *func, uint line) uint line)
{ {
DBUG_ENTER("Event_scheduler::cond_wait");
waiting_on_cond= TRUE; waiting_on_cond= TRUE;
mutex_last_unlocked_at_line= line; mutex_last_unlocked_at_line= line;
mutex_scheduler_data_locked= FALSE; mutex_scheduler_data_locked= FALSE;
mutex_last_unlocked_in_func= func; mutex_last_unlocked_in_func= func;
if (!abstime)
if (abstime)
pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
else
pthread_cond_wait(&COND_state, &LOCK_scheduler_state); pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
else
pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, abstime);
mutex_last_locked_in_func= func; mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line; mutex_last_locked_at_line= line;
mutex_scheduler_data_locked= TRUE; mutex_scheduler_data_locked= TRUE;
waiting_on_cond= FALSE; waiting_on_cond= FALSE;
DBUG_VOID_RETURN;
} }
/*
Returns the current state of the scheduler
SYNOPSIS
Event_scheduler::get_state()
RETURN VALUE
The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
*/
enum Event_scheduler::enum_state
Event_scheduler::get_state()
{
enum Event_scheduler::enum_state ret;
DBUG_ENTER("Event_scheduler::get_state");
LOCK_DATA();
ret= state;
UNLOCK_DATA();
DBUG_RETURN(ret);
}
/*
REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF
Returns whether the scheduler was initialized.
*/
/* /*
Dumps the internal status of the scheduler Dumps the internal status of the scheduler
@ -805,80 +801,82 @@ Event_scheduler::dump_internal_status(THD *thd)
tmp_string.length(0); tmp_string.length(0);
int_string.length(0); int_string.length(0);
protocol->prepare_for_resend(); do
protocol->store(STRING_WITH_LEN("scheduler state"), scs);
protocol->store(scheduler_states_names[state].str,
scheduler_states_names[state].length, scs);
if ((ret= protocol->write()))
goto end;
/* thread_id */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("thread_id"), scs);
if (thread_id)
{ {
int_string.set((longlong) thread_id, scs); protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler state"), scs);
protocol->store(scheduler_states_names[state].str,
scheduler_states_names[state].length, scs);
if ((ret= protocol->write()))
break;
/* thread_id */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("thread_id"), scs);
if (thread_id)
{
int_string.set((longlong) thread_id, scs);
protocol->store(&int_string);
}
else
protocol->store_null();
if ((ret= protocol->write()))
break;
/* last locked at*/
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s::%d",
mutex_last_locked_in_func,
mutex_last_locked_at_line));
protocol->store(&tmp_string);
if ((ret= protocol->write()))
break;
/* last unlocked at*/
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(),
tmp_string.alloced_length(), "%s::%d",
mutex_last_unlocked_in_func,
mutex_last_unlocked_at_line));
protocol->store(&tmp_string);
if ((ret= protocol->write()))
break;
/* waiting on */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs);
int_string.set((longlong) waiting_on_cond, scs);
protocol->store(&int_string); protocol->store(&int_string);
} if ((ret= protocol->write()))
else break;
protocol->store_null();
if ((ret= protocol->write()))
goto end;
/* last locked at*/ /* workers_count */
protocol->prepare_for_resend(); protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler last locked at"), scs); protocol->store(STRING_WITH_LEN("scheduler workers count"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), int_string.set((longlong) workers_count(), scs);
tmp_string.alloced_length(), "%s::%d", protocol->store(&int_string);
mutex_last_locked_in_func, if ((ret= protocol->write()))
mutex_last_locked_at_line)); break;
protocol->store(&tmp_string);
if ((ret= protocol->write()))
goto end;
/* last unlocked at*/ /* workers_count */
protocol->prepare_for_resend(); protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler last unlocked at"), scs); protocol->store(STRING_WITH_LEN("scheduler executed events"), scs);
tmp_string.length(scs->cset->snprintf(scs, (char*) tmp_string.ptr(), int_string.set((longlong) started_events, scs);
tmp_string.alloced_length(), "%s::%d", protocol->store(&int_string);
mutex_last_unlocked_in_func, if ((ret= protocol->write()))
mutex_last_unlocked_at_line)); break;
protocol->store(&tmp_string);
if ((ret= protocol->write()))
goto end;
/* waiting on */ /* scheduler_data_locked */
protocol->prepare_for_resend(); protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler waiting on condition"), scs); protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
int_string.set((longlong) waiting_on_cond, scs); int_string.set((longlong) mutex_scheduler_data_locked, scs);
protocol->store(&int_string); protocol->store(&int_string);
if ((ret= protocol->write())) ret= protocol->write();
goto end; } while (0);
/* workers_count */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler workers count"), scs);
int_string.set((longlong) workers_count(), scs);
protocol->store(&int_string);
if ((ret= protocol->write()))
goto end;
/* workers_count */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler executed events"), scs);
int_string.set((longlong) started_events, scs);
protocol->store(&int_string);
if ((ret= protocol->write()))
goto end;
/* scheduler_data_locked */
protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("scheduler data locked"), scs);
int_string.set((longlong) mutex_scheduler_data_locked, scs);
protocol->store(&int_string);
ret= protocol->write();
end:
#endif #endif
DBUG_RETURN(ret); DBUG_RETURN(ret);

View File

@ -56,7 +56,7 @@ public:
bool bool
run(THD *thd); run(THD *thd);
bool void
init_scheduler(Event_queue *queue); init_scheduler(Event_queue *queue);
void void