removed init_count from IO_CACHE.

Added missing mutex_unlock to slave replication code.


include/my_sys.h:
  removed init_count from IO_CACHE.
  General cleanup.
innobase/srv/srv0srv.c:
  Initailize slots to avoid purify warnings.
  Removed some compiler warnings.
mysql-test/mysql-test-run.sh:
  Automatic start of slave under gdb
mysys/mf_iocache.c:
  removed init_count
sql/field.cc:
  Cleanup
sql/log.cc:
  Cleanup
  added open_count variable.
sql/log_event.cc:
  cleanup
  use is_prefix instead of memcmp()
sql/repl_failsafe.cc:
  cleanup
sql/slave.cc:
  cleanup
  use MYSQL_LOG->open_count instead of IO_CACHE->init_count
  Added missing mutex_unlock()
sql/slave.h:
  cleanup
sql/sql_class.h:
  cleanup
  Added open_count to MYSQL_LOGL
sql/sql_parse.cc:
  removed compiler warning
sql/sql_repl.cc:
  added DBUG_xxx
sql/unireg.h:
  Added BIN_LOG_HEADER_SIZE
This commit is contained in:
unknown 2002-06-05 23:04:38 +03:00
parent ef06010563
commit 03728196ee
14 changed files with 777 additions and 677 deletions

View File

@ -302,33 +302,41 @@ typedef int (*IO_CACHE_CALLBACK)(struct st_io_cache*);
typedef struct st_io_cache /* Used when cacheing files */ typedef struct st_io_cache /* Used when cacheing files */
{ {
/* pos_in_file is offset in file corresponding to the first byte of /* Offset in file corresponding to the first byte of byte* buffer. */
byte* buffer. end_of_file is the offset of end of file for READ_CACHE my_off_t pos_in_file;
and WRITE_CACHE. For SEQ_READ_APPEND it the maximum of the actual /*
end of file and the position represented by read_end. The offset of end of file for READ_CACHE and WRITE_CACHE.
For SEQ_READ_APPEND it the maximum of the actual end of file and
the position represented by read_end.
*/ */
my_off_t pos_in_file,end_of_file; my_off_t end_of_file;
/* read_pos points to current read position in the buffer /* Points to current read position in the buffer */
read_end is the non-inclusive boundary in the buffer for the currently byte *read_pos;
valid read area /* the non-inclusive boundary in the buffer for the currently valid read */
buffer is the read buffer byte *read_end;
not sure about request_pos except that it is used in async_io byte *buffer; /* The read buffer */
/* Used in ASYNC_IO */
byte *request_pos;
/* Only used in WRITE caches and in SEQ_READ_APPEND to buffer writes */
byte *write_buffer;
/*
Only used in SEQ_READ_APPEND, and points to the current read position
in the write buffer. Note that reads in SEQ_READ_APPEND caches can
happen from both read buffer (byte* buffer) and write buffer
(byte* write_buffer).
*/ */
byte *read_pos,*read_end,*buffer,*request_pos; byte *append_read_pos;
/* write_buffer is used only in WRITE caches and in SEQ_READ_APPEND to /* Points to current write position in the write buffer */
buffer writes byte *write_pos;
append_read_pos is only used in SEQ_READ_APPEND, and points to the /* The non-inclusive boundary of the valid write area */
current read position in the write buffer. Note that reads in byte *write_end;
SEQ_READ_APPEND caches can happen from both read buffer (byte* buffer),
and write buffer (byte* write_buffer). /*
write_pos points to current write position in the write buffer and Current_pos and current_end are convenience variables used by
write_end is the non-inclusive boundary of the valid write area my_b_tell() and other routines that need to know the current offset
*/ current_pos points to &write_pos, and current_end to &write_end in a
byte *write_buffer, *append_read_pos, *write_pos, *write_end; WRITE_CACHE, and &read_pos and &read_end respectively otherwise
/* current_pos and current_end are convenience variables used by
my_b_tell() and other routines that need to know the current offset
current_pos points to &write_pos, and current_end to &write_end in a
WRITE_CACHE, and &read_pos and &read_end respectively otherwise
*/ */
byte **current_pos, **current_end; byte **current_pos, **current_end;
/* The lock is for append buffer used in SEQ_READ_APPEND cache */ /* The lock is for append buffer used in SEQ_READ_APPEND cache */
@ -336,70 +344,64 @@ typedef struct st_io_cache /* Used when cacheing files */
pthread_mutex_t append_buffer_lock; pthread_mutex_t append_buffer_lock;
/* need mutex copying from append buffer to read buffer */ /* need mutex copying from append buffer to read buffer */
#endif #endif
/* a caller will use my_b_read() macro to read from the cache /*
if the data is already in cache, it will be simply copied with A caller will use my_b_read() macro to read from the cache
memcpy() and internal variables will be accordinging updated with if the data is already in cache, it will be simply copied with
no functions invoked. However, if the data is not fully in the cache, memcpy() and internal variables will be accordinging updated with
my_b_read() will call read_function to fetch the data. read_function no functions invoked. However, if the data is not fully in the cache,
must never be invoked directly my_b_read() will call read_function to fetch the data. read_function
must never be invoked directly.
*/ */
int (*read_function)(struct st_io_cache *,byte *,uint); int (*read_function)(struct st_io_cache *,byte *,uint);
/* same idea as in the case of read_function, except my_b_write() needs to /*
be replaced with my_b_append() for a SEQ_READ_APPEND cache Same idea as in the case of read_function, except my_b_write() needs to
be replaced with my_b_append() for a SEQ_READ_APPEND cache
*/ */
int (*write_function)(struct st_io_cache *,const byte *,uint); int (*write_function)(struct st_io_cache *,const byte *,uint);
/* specifies the type of the cache. Depending on the type of the cache /*
Specifies the type of the cache. Depending on the type of the cache
certain operations might not be available and yield unpredicatable certain operations might not be available and yield unpredicatable
results. Details to be documented later results. Details to be documented later
*/ */
enum cache_type type; enum cache_type type;
/* callbacks when the actual read I/O happens. These were added and /*
are currently used for binary logging of LOAD DATA INFILE - when a Callbacks when the actual read I/O happens. These were added and
block is read from the file, we create a block create/append event, and are currently used for binary logging of LOAD DATA INFILE - when a
when IO_CACHE is closed, we create an end event. These functions could, block is read from the file, we create a block create/append event, and
of course be used for other things when IO_CACHE is closed, we create an end event. These functions could,
of course be used for other things
*/ */
IO_CACHE_CALLBACK pre_read; IO_CACHE_CALLBACK pre_read;
IO_CACHE_CALLBACK post_read; IO_CACHE_CALLBACK post_read;
IO_CACHE_CALLBACK pre_close; IO_CACHE_CALLBACK pre_close;
void* arg; /* for use by pre/post_read */ void* arg; /* for use by pre/post_read */
char *file_name; /* if used with 'open_cached_file' */ char *file_name; /* if used with 'open_cached_file' */
char *dir,*prefix; char *dir,*prefix;
File file; /* file descriptor */ File file; /* file descriptor */
/* seek_not_done is set by my_b_seek() to inform the upcoming read/write /*
operation that a seek needs to be preformed prior to the actual I/O seek_not_done is set by my_b_seek() to inform the upcoming read/write
error is 0 if the cache operation was successful, -1 if there was a operation that a seek needs to be preformed prior to the actual I/O
"hard" error, and the actual number of I/O-ed bytes if the read/write was error is 0 if the cache operation was successful, -1 if there was a
partial "hard" error, and the actual number of I/O-ed bytes if the read/write was
partial.
*/ */
int seek_not_done,error; int seek_not_done,error;
/* buffer_length is the size of memory allocated for buffer or write_buffer /* buffer_length is memory size allocated for buffer or write_buffer */
read_length is the same as buffer_length except when we use async io uint buffer_length;
not sure why we need it /* read_length is the same as buffer_length except when we use async io */
*/ uint read_length;
uint buffer_length,read_length;
myf myflags; /* Flags used to my_read/my_write */ myf myflags; /* Flags used to my_read/my_write */
/* /*
alloced_buffer is 1 if the buffer was allocated by init_io_cache() and alloced_buffer is 1 if the buffer was allocated by init_io_cache() and
0 if it was supplied by the user 0 if it was supplied by the user.
Currently READ_NET is the only one that will use a buffer allocated Currently READ_NET is the only one that will use a buffer allocated
somewhere else somewhere else
*/ */
my_bool alloced_buffer; my_bool alloced_buffer;
/* init_count is incremented every time we call init_io_cache()
It is not reset in end_io_cache(). This variable
was introduced for slave relay logs - RELAY_LOG_INFO stores a pointer
to IO_CACHE that could in some cases refer to the IO_CACHE of the
currently active relay log. The IO_CACHE then could be closed,
re-opened and start pointing to a different log file. In that case,
we could not know reliably if this happened without init_count
one must be careful with bzero() prior to the subsequent init_io_cache()
call
*/
int init_count;
#ifdef HAVE_AIOWAIT #ifdef HAVE_AIOWAIT
/* as inidicated by ifdef, this is for async I/O, we will have /*
Sinisa comment this some time As inidicated by ifdef, this is for async I/O, which is not currently
used (because it's not reliable on all systems)
*/ */
uint inited; uint inited;
my_off_t aio_read_pos; my_off_t aio_read_pos;
@ -428,7 +430,6 @@ typedef int (*qsort2_cmp)(const void *, const void *, const void *);
((info)->write_pos+=(Count)),0) : \ ((info)->write_pos+=(Count)),0) : \
(*(info)->write_function)((info),(Buffer),(Count))) (*(info)->write_function)((info),(Buffer),(Count)))
#define my_b_get(info) \ #define my_b_get(info) \
((info)->read_pos != (info)->read_end ?\ ((info)->read_pos != (info)->read_end ?\
((info)->read_pos++, (int) (uchar) (info)->read_pos[-1]) :\ ((info)->read_pos++, (int) (uchar) (info)->read_pos[-1]) :\

View File

@ -1631,6 +1631,7 @@ srv_init(void)
for (i = 0; i < OS_THREAD_MAX_N; i++) { for (i = 0; i < OS_THREAD_MAX_N; i++) {
slot = srv_table_get_nth_slot(i); slot = srv_table_get_nth_slot(i);
slot->in_use = FALSE; slot->in_use = FALSE;
slot->type=0; /* Avoid purify errors */
slot->event = os_event_create(NULL); slot->event = os_event_create(NULL);
ut_a(slot->event); ut_a(slot->event);
} }
@ -1899,7 +1900,6 @@ srv_conc_exit_innodb(
trx_t* trx) /* in: transaction object associated with the trx_t* trx) /* in: transaction object associated with the
thread */ thread */
{ {
srv_conc_slot_t* slot = NULL;
if (srv_thread_concurrency >= 500) { if (srv_thread_concurrency >= 500) {
@ -2514,11 +2514,11 @@ loop:
can drop tables lazily after there no longer are SELECT can drop tables lazily after there no longer are SELECT
queries to them. */ queries to them. */
srv_main_thread_op_info = "doing background drop tables"; srv_main_thread_op_info = (char*) "doing background drop tables";
row_drop_tables_for_mysql_in_background(); row_drop_tables_for_mysql_in_background();
srv_main_thread_op_info = ""; srv_main_thread_op_info = (char*) "";
if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND) { if (srv_force_recovery >= SRV_FORCE_NO_BACKGROUND) {
@ -2630,19 +2630,19 @@ background_loop:
/* In this loop we run background operations when the server /* In this loop we run background operations when the server
is quiet and we also come here about once in 10 seconds */ is quiet and we also come here about once in 10 seconds */
srv_main_thread_op_info = "doing background drop tables"; srv_main_thread_op_info = (char*) "doing background drop tables";
n_tables_to_drop = row_drop_tables_for_mysql_in_background(); n_tables_to_drop = row_drop_tables_for_mysql_in_background();
srv_main_thread_op_info = ""; srv_main_thread_op_info = (char*) "";
srv_main_thread_op_info = "flushing buffer pool pages"; srv_main_thread_op_info = (char*) "flushing buffer pool pages";
/* Flush a few oldest pages to make the checkpoint younger */ /* Flush a few oldest pages to make the checkpoint younger */
n_pages_flushed = buf_flush_batch(BUF_FLUSH_LIST, 10, ut_dulint_max); n_pages_flushed = buf_flush_batch(BUF_FLUSH_LIST, 10, ut_dulint_max);
srv_main_thread_op_info = "making checkpoint"; srv_main_thread_op_info = (char*) "making checkpoint";
/* Make a new checkpoint about once in 10 seconds */ /* Make a new checkpoint about once in 10 seconds */

View File

@ -675,9 +675,9 @@ manager_launch()
ident=$1 ident=$1
shift shift
if [ $USE_MANAGER = 0 ] ; then if [ $USE_MANAGER = 0 ] ; then
$@ >$CUR_MYERR 2>&1 & $@ >$CUR_MYERR 2>&1 &
sleep 2 #hack sleep 2 #hack
return return
fi fi
$MYSQL_MANAGER_CLIENT $MANAGER_QUIET_OPT --user=$MYSQL_MANAGER_USER \ $MYSQL_MANAGER_CLIENT $MANAGER_QUIET_OPT --user=$MYSQL_MANAGER_USER \
--password=$MYSQL_MANAGER_PW --port=$MYSQL_MANAGER_PORT <<EOF --password=$MYSQL_MANAGER_PW --port=$MYSQL_MANAGER_PORT <<EOF
@ -687,7 +687,7 @@ set_exec_stderr $ident $CUR_MYERR
set_exec_con $ident root localhost $CUR_MYSOCK set_exec_con $ident root localhost $CUR_MYSOCK
start_exec $ident $START_WAIT_TIMEOUT start_exec $ident $START_WAIT_TIMEOUT
EOF EOF
abort_if_failed "Could not execute manager command" abort_if_failed "Could not execute manager command"
} }
manager_term() manager_term()
@ -887,13 +887,23 @@ start_slave()
"gdb -x $GDB_SLAVE_INIT" $SLAVE_MYSQLD "gdb -x $GDB_SLAVE_INIT" $SLAVE_MYSQLD
elif [ x$DO_GDB = x1 ] elif [ x$DO_GDB = x1 ]
then then
$ECHO "set args $slave_args" > $GDB_SLAVE_INIT
if [ x$MANUAL_GDB = x1 ] if [ x$MANUAL_GDB = x1 ]
then then
$ECHO "set args $slave_args" > $GDB_SLAVE_INIT
echo "To start gdb for the slave, type in another window:" echo "To start gdb for the slave, type in another window:"
echo "cd $CWD ; gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD" echo "cd $CWD ; gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD"
wait_for_slave=1500 wait_for_slave=1500
else else
( $ECHO set args $slave_args;
if [ $USE_MANAGER = 0 ] ; then
cat <<EOF
b mysql_parse
commands 1
disa 1
end
r
EOF
fi ) > $GDB_SLAVE_INIT
manager_launch $slave_ident $XTERM -display $DISPLAY -title "Slave" -e \ manager_launch $slave_ident $XTERM -display $DISPLAY -title "Slave" -e \
gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD gdb -x $GDB_SLAVE_INIT $SLAVE_MYSQLD
fi fi

View File

@ -122,8 +122,6 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
info->pos_in_file= seek_offset; info->pos_in_file= seek_offset;
info->pre_close = info->pre_read = info->post_read = 0; info->pre_close = info->pre_read = info->post_read = 0;
info->arg = 0; info->arg = 0;
info->init_count++; /* we assume the user had set it to 0 prior to
first call */
info->alloced_buffer = 0; info->alloced_buffer = 0;
info->buffer=0; info->buffer=0;
info->seek_not_done= test(file >= 0); info->seek_not_done= test(file >= 0);

View File

@ -4628,7 +4628,7 @@ bool Field_num::eq_def(Field *field)
*****************************************************************************/ *****************************************************************************/
/* /*
** Make a field from the .frm file info Make a field from the .frm file info
*/ */
uint32 calc_pack_length(enum_field_types type,uint32 length) uint32 calc_pack_length(enum_field_types type,uint32 length)
@ -4657,6 +4657,7 @@ uint32 calc_pack_length(enum_field_types type,uint32 length)
case FIELD_TYPE_LONG_BLOB: return 4+portable_sizeof_char_ptr; case FIELD_TYPE_LONG_BLOB: return 4+portable_sizeof_char_ptr;
case FIELD_TYPE_SET: case FIELD_TYPE_SET:
case FIELD_TYPE_ENUM: abort(); return 0; // This shouldn't happen case FIELD_TYPE_ENUM: abort(); return 0; // This shouldn't happen
default: return 0;
} }
return 0; // This shouldn't happen return 0; // This shouldn't happen
} }

View File

@ -80,10 +80,10 @@ static int find_uniq_filename(char *name)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),index_file(-1), MYSQL_LOG::MYSQL_LOG()
name(0), log_type(LOG_CLOSED),write_error(0), :bytes_written(0), last_time(0), query_start(0), index_file(-1), name(0),
inited(0), file_id(1),no_rotate(0), file_id(1), open_count(1), log_type(LOG_CLOSED), write_error(0), inited(0),
need_start_event(1),bytes_written(0) no_rotate(0), need_start_event(1)
{ {
/* /*
We don't want to intialize LOCK_Log here as the thread system may We don't want to intialize LOCK_Log here as the thread system may
@ -173,8 +173,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
File file= -1; File file= -1;
bool do_magic; bool do_magic;
int open_flags = O_CREAT | O_APPEND | O_BINARY; int open_flags = O_CREAT | O_APPEND | O_BINARY;
DBUG_ENTER("MYSQL_LOG::open");
if (!inited && log_type_arg == LOG_BIN && *fn_ext(log_name)) if (!inited && log_type_arg == LOG_BIN && *fn_ext(log_name))
no_rotate = 1; no_rotate = 1;
init(log_type_arg,io_cache_type_arg,no_auto_events_arg); init(log_type_arg,io_cache_type_arg,no_auto_events_arg);
if (!(name=my_strdup(log_name,MYF(MY_WME)))) if (!(name=my_strdup(log_name,MYF(MY_WME))))
@ -196,6 +198,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name, do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name,
&tmp_stat, MYF(0))); &tmp_stat, MYF(0)));
open_count++;
if ((file=my_open(log_file_name,open_flags, if ((file=my_open(log_file_name,open_flags,
MYF(MY_WME | ME_WAITTANG))) < 0 || MYF(MY_WME | ME_WAITTANG))) < 0 ||
init_io_cache(&log_file, file, IO_SIZE, io_cache_type, init_io_cache(&log_file, file, IO_SIZE, io_cache_type,
@ -237,10 +240,10 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
bool error; bool error;
if (do_magic) if (do_magic)
{ {
if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4) || if (my_b_write(&log_file, (byte*) BINLOG_MAGIC, BIN_LOG_HEADER_SIZE) ||
open_index(O_APPEND | O_RDWR | O_CREAT)) open_index(O_APPEND | O_RDWR | O_CREAT))
goto err; goto err;
bytes_written += 4; bytes_written += BIN_LOG_HEADER_SIZE;
} }
if (need_start_event && !no_auto_events) if (need_start_event && !no_auto_events)
@ -262,7 +265,7 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
goto err; goto err;
} }
} }
return; DBUG_VOID_RETURN;
err: err:
sql_print_error("Could not use %s for logging (error %d)", log_name,errno); sql_print_error("Could not use %s for logging (error %d)", log_name,errno);
@ -271,7 +274,7 @@ err:
end_io_cache(&log_file); end_io_cache(&log_file);
x_free(name); name=0; x_free(name); name=0;
log_type=LOG_CLOSED; log_type=LOG_CLOSED;
return; DBUG_VOID_RETURN;
} }
int MYSQL_LOG::get_current_log(LOG_INFO* linfo) int MYSQL_LOG::get_current_log(LOG_INFO* linfo)
@ -284,6 +287,7 @@ int MYSQL_LOG::get_current_log(LOG_INFO* linfo)
} }
// if log_name is "" we stop at the first entry // if log_name is "" we stop at the first entry
int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name, int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name,
bool need_mutex) bool need_mutex)
{ {
@ -294,8 +298,10 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name,
uint log_name_len = (uint) strlen(log_name); uint log_name_len = (uint) strlen(log_name);
IO_CACHE io_cache; IO_CACHE io_cache;
// mutex needed because we need to make sure the file pointer does not move /*
// from under our feet Mutex needed because we need to make sure the file pointer does not move
from under our feet
*/
if (need_mutex) if (need_mutex)
pthread_mutex_lock(&LOCK_index); pthread_mutex_lock(&LOCK_index);
if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) 0, if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) 0,
@ -304,7 +310,7 @@ int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name,
error = LOG_INFO_SEEK; error = LOG_INFO_SEEK;
goto err; goto err;
} }
for(;;) for (;;)
{ {
uint length; uint length;
if (!(length=my_b_gets(&io_cache, fname, FN_REFLEN-1))) if (!(length=my_b_gets(&io_cache, fname, FN_REFLEN-1)))
@ -336,9 +342,12 @@ err:
int MYSQL_LOG::find_next_log(LOG_INFO* linfo, bool need_lock) int MYSQL_LOG::find_next_log(LOG_INFO* linfo, bool need_lock)
{ {
// mutex needed because we need to make sure the file pointer does not move /*
// from under our feet Mutex needed because we need to make sure the file pointer does not move
if (index_file < 0) return LOG_INFO_INVALID; from under our feet
*/
if (index_file < 0)
return LOG_INFO_INVALID;
int error = 0; int error = 0;
char* fname = linfo->log_file_name; char* fname = linfo->log_file_name;
IO_CACHE io_cache; IO_CACHE io_cache;
@ -382,7 +391,7 @@ int MYSQL_LOG::reset_logs(THD* thd)
goto err; goto err;
} }
for(;;) for (;;)
{ {
my_delete(linfo.log_file_name, MYF(MY_WME)); my_delete(linfo.log_file_name, MYF(MY_WME));
if (find_next_log(&linfo)) if (find_next_log(&linfo))
@ -490,7 +499,7 @@ err:
rli->linfo.log_file_name); rli->linfo.log_file_name);
goto err2; goto err2;
} }
rli->relay_log_pos = 4; rli->relay_log_pos = BIN_LOG_HEADER_SIZE;
strnmov(rli->relay_log_name,rli->linfo.log_file_name, strnmov(rli->relay_log_name,rli->linfo.log_file_name,
sizeof(rli->relay_log_name)); sizeof(rli->relay_log_name));
flush_relay_log_info(rli); flush_relay_log_info(rli);
@ -550,7 +559,7 @@ int MYSQL_LOG::purge_logs(THD* thd, const char* to_log)
my_off_t init_purge_offset= my_b_tell(&io_cache); my_off_t init_purge_offset= my_b_tell(&io_cache);
if (!(fname_len=my_b_gets(&io_cache, fname, FN_REFLEN))) if (!(fname_len=my_b_gets(&io_cache, fname, FN_REFLEN)))
{ {
if(!io_cache.error) if (!io_cache.error)
break; break;
error = LOG_INFO_IO; error = LOG_INFO_IO;
goto err; goto err;
@ -993,8 +1002,13 @@ bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache)
if (is_open()) if (is_open())
{ {
/*
We come here when the queries to be logged could not fit into memory
and part of the queries are stored in a log file on disk.
*/
uint length; uint length;
//QQ: this looks like a bug - why READ_CACHE? /* Read from the file used to cache the queries .*/
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0)) if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
{ {
sql_print_error(ER(ER_ERROR_ON_WRITE), cache->file_name, errno); sql_print_error(ER(ER_ERROR_ON_WRITE), cache->file_name, errno);
@ -1003,6 +1017,7 @@ bool MYSQL_LOG::write(THD *thd, IO_CACHE *cache)
length=my_b_bytes_in_cache(cache); length=my_b_bytes_in_cache(cache);
do do
{ {
/* Write data to the binary log file */
if (my_b_write(&log_file, cache->read_pos, length)) if (my_b_write(&log_file, cache->read_pos, length))
{ {
if (!write_error) if (!write_error)
@ -1168,19 +1183,23 @@ void MYSQL_LOG:: wait_for_update(THD* thd)
const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log, const char* old_msg = thd->enter_cond(&update_cond, &LOCK_log,
"Slave: waiting for binlog update"); "Slave: waiting for binlog update");
pthread_cond_wait(&update_cond, &LOCK_log); pthread_cond_wait(&update_cond, &LOCK_log);
// this is not a bug - we unlock the mutex for the caller, and expect him /*
// to lock it and then not unlock it upon return. This is a rather odd This is not a bug:
// way of doing things, but this is the cleanest way I could think of to We unlock the mutex for the caller, and expect him to lock it and
// solve the race deadlock caused by THD::awake() first acquiring mysys_var then not unlock it upon return. This is a rather odd way of doing
// mutex and then the current mutex, while wait_for_update being called with things, but this is the cleanest way I could think of to solve the
// the current mutex already aquired and THD::exit_cond() trying to acquire race deadlock caused by THD::awake() first acquiring mysys_var
// mysys_var mutex. We do need the mutex to be acquired prior to the mutex and then the current mutex, while wait_for_update being
// invocation of wait_for_update in all cases, so mutex acquisition inside called with the current mutex already aquired and THD::exit_cond()
// wait_for_update() is not an option trying to acquire mysys_var mutex. We do need the mutex to be
acquired prior to the invocation of wait_for_update in all cases,
so mutex acquisition inside wait_for_update() is not an option.
*/
pthread_mutex_unlock(&LOCK_log); pthread_mutex_unlock(&LOCK_log);
thd->exit_cond(old_msg); thd->exit_cond(old_msg);
} }
void MYSQL_LOG::close(bool exiting) void MYSQL_LOG::close(bool exiting)
{ // One can't set log_type here! { // One can't set log_type here!
if (is_open()) if (is_open())

View File

@ -159,11 +159,11 @@ static void cleanup_load_tmpdir()
if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME)))) if (!(dirp=my_dir(slave_load_tmpdir,MYF(MY_WME))))
return; return;
for (i=0;i<(uint)dirp->number_off_files;i++) for (i=0 ; i < (uint)dirp->number_off_files; i++)
{ {
file=dirp->dir_entry+i; file=dirp->dir_entry+i;
if (!memcmp(file->name,"SQL_LOAD-",9)) if (is_prefix(file->name,"SQL_LOAD-"))
my_delete(file->name,MYF(MY_WME)); my_delete(file->name,MYF(0));
} }
my_dirend(dirp); my_dirend(dirp);
@ -246,7 +246,7 @@ void Load_log_event::pack_info(String* packet)
char buf[256]; char buf[256];
String tmp(buf, sizeof(buf)); String tmp(buf, sizeof(buf));
tmp.length(0); tmp.length(0);
if(db && db_len) if (db && db_len)
{ {
tmp.append("use "); tmp.append("use ");
tmp.append(db, db_len); tmp.append(db, db_len);
@ -256,9 +256,9 @@ void Load_log_event::pack_info(String* packet)
tmp.append("LOAD DATA INFILE '"); tmp.append("LOAD DATA INFILE '");
tmp.append(fname, fname_len); tmp.append(fname, fname_len);
tmp.append("' ", 2); tmp.append("' ", 2);
if(sql_ex.opt_flags && REPLACE_FLAG ) if (sql_ex.opt_flags && REPLACE_FLAG )
tmp.append(" REPLACE "); tmp.append(" REPLACE ");
else if(sql_ex.opt_flags && IGNORE_FLAG ) else if (sql_ex.opt_flags && IGNORE_FLAG )
tmp.append(" IGNORE "); tmp.append(" IGNORE ");
tmp.append("INTO TABLE "); tmp.append("INTO TABLE ");
@ -305,7 +305,7 @@ void Load_log_event::pack_info(String* packet)
tmp.append(" ("); tmp.append(" (");
for(i = 0; i < num_fields; i++) for(i = 0; i < num_fields; i++)
{ {
if(i) if (i)
tmp.append(" ,"); tmp.append(" ,");
tmp.append( field); tmp.append( field);
@ -326,7 +326,7 @@ void Rotate_log_event::pack_info(String* packet)
tmp.append(new_log_ident, ident_len); tmp.append(new_log_ident, ident_len);
tmp.append(";pos="); tmp.append(";pos=");
tmp.append(llstr(pos,buf)); tmp.append(llstr(pos,buf));
if(flags & LOG_EVENT_FORCED_ROTATE_F) if (flags & LOG_EVENT_FORCED_ROTATE_F)
tmp.append("; forced by master"); tmp.append("; forced by master");
net_store_data(packet, tmp.ptr(), tmp.length()); net_store_data(packet, tmp.ptr(), tmp.length());
} }
@ -436,7 +436,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
// if the read hits eof, we must report it as eof // if the read hits eof, we must report it as eof
// so the caller will know it can go into cond_wait to be woken up // so the caller will know it can go into cond_wait to be woken up
// on the next update to the log // on the next update to the log
if(!file->error) return LOG_READ_EOF; if (!file->error) return LOG_READ_EOF;
return file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO; return file->error > 0 ? LOG_READ_TRUNC: LOG_READ_IO;
} }
data_len = uint4korr(buf + EVENT_LEN_OFFSET); data_len = uint4korr(buf + EVENT_LEN_OFFSET);
@ -452,7 +452,7 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
{ {
if (packet->append(file, data_len)) if (packet->append(file, data_len))
{ {
if(log_lock) if (log_lock)
pthread_mutex_unlock(log_lock); pthread_mutex_unlock(log_lock);
// here we should never hit eof in a non-error condtion // here we should never hit eof in a non-error condtion
// eof means we are reading the event partially, which should // eof means we are reading the event partially, which should
@ -467,13 +467,13 @@ int Log_event::read_log_event(IO_CACHE* file, String* packet,
#endif // MYSQL_CLIENT #endif // MYSQL_CLIENT
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
#define UNLOCK_MUTEX if(log_lock) pthread_mutex_unlock(log_lock); #define UNLOCK_MUTEX if (log_lock) pthread_mutex_unlock(log_lock);
#else #else
#define UNLOCK_MUTEX #define UNLOCK_MUTEX
#endif #endif
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
#define LOCK_MUTEX if(log_lock) pthread_mutex_lock(log_lock); #define LOCK_MUTEX if (log_lock) pthread_mutex_lock(log_lock);
#else #else
#define LOCK_MUTEX #define LOCK_MUTEX
#endif #endif
@ -672,7 +672,7 @@ void Rotate_log_event::print(FILE* file, bool short_form, char* last_db)
if (new_log_ident) if (new_log_ident)
my_fwrite(file, (byte*) new_log_ident, (uint)ident_len, my_fwrite(file, (byte*) new_log_ident, (uint)ident_len,
MYF(MY_NABP | MY_WME)); MYF(MY_NABP | MY_WME));
fprintf(file, "pos=%s\n", llstr(pos, buf)); fprintf(file, " pos: %s\n", llstr(pos, buf));
fflush(file); fflush(file);
} }
@ -701,11 +701,10 @@ Rotate_log_event::Rotate_log_event(const char* buf, int event_len,
bool old_format): bool old_format):
Log_event(buf, old_format),new_log_ident(NULL),alloced(0) Log_event(buf, old_format),new_log_ident(NULL),alloced(0)
{ {
// the caller will ensure that event_len is what we have at // The caller will ensure that event_len is what we have at EVENT_LEN_OFFSET
// EVENT_LEN_OFFSET
int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN; int header_size = (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
uint ident_offset; uint ident_offset;
if(event_len < header_size) if (event_len < header_size)
return; return;
buf += header_size; buf += header_size;
if (old_format) if (old_format)
@ -753,8 +752,8 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg,
#endif #endif
Query_log_event::Query_log_event(const char* buf, int event_len, Query_log_event::Query_log_event(const char* buf, int event_len,
bool old_format): bool old_format)
Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL) :Log_event(buf, old_format),data_buf(0), query(NULL), db(NULL)
{ {
ulong data_len; ulong data_len;
if (old_format) if (old_format)
@ -801,9 +800,9 @@ void Query_log_event::print(FILE* file, bool short_form, char* last_db)
bool same_db = 0; bool same_db = 0;
if(db && last_db) if (db && last_db)
{ {
if(!(same_db = !memcmp(last_db, db, db_len + 1))) if (!(same_db = !memcmp(last_db, db, db_len + 1)))
memcpy(last_db, db, db_len + 1); memcpy(last_db, db, db_len + 1);
} }
@ -864,7 +863,7 @@ int Intvar_log_event::write_data(IO_CACHE* file)
void Intvar_log_event::print(FILE* file, bool short_form, char* last_db) void Intvar_log_event::print(FILE* file, bool short_form, char* last_db)
{ {
char llbuff[22]; char llbuff[22];
if(!short_form) if (!short_form)
{ {
print_header(file); print_header(file);
fprintf(file, "\tIntvar\n"); fprintf(file, "\tIntvar\n");
@ -961,11 +960,12 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
if (use_new_format) if (use_new_format)
{ {
empty_flags=0; empty_flags=0;
/* the code below assumes that buf will not disappear from /*
under our feet during the lifetime of the event. This assumption The code below assumes that buf will not disappear from
holds true in the slave thread if the log is in new format, but is not under our feet during the lifetime of the event. This assumption
the case when we have old format because we will be reusing net buffer holds true in the slave thread if the log is in new format, but is not
to read the actual file before we write out the Create_file event the case when we have old format because we will be reusing net buffer
to read the actual file before we write out the Create_file event.
*/ */
if (read_str(buf, buf_end, field_term, field_term_len) || if (read_str(buf, buf_end, field_term, field_term_len) ||
read_str(buf, buf_end, enclosed, enclosed_len) || read_str(buf, buf_end, enclosed, enclosed_len) ||
@ -1003,77 +1003,75 @@ char* sql_ex_info::init(char* buf,char* buf_end,bool use_new_format)
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
Load_log_event::Load_log_event(THD* thd, sql_exchange* ex, Load_log_event::Load_log_event(THD* thd, sql_exchange* ex,
const char* db_arg, const char* table_name_arg, const char* db_arg, const char* table_name_arg,
List<Item>& fields_arg, enum enum_duplicates handle_dup): List<Item>& fields_arg, enum enum_duplicates handle_dup)
Log_event(thd),thread_id(thd->thread_id), :Log_event(thd),thread_id(thd->thread_id), num_fields(0),fields(0),
num_fields(0),fields(0),field_lens(0),field_block_len(0), field_lens(0),field_block_len(0), table_name(table_name_arg),
table_name(table_name_arg), db(db_arg), fname(ex->file_name)
db(db_arg), {
fname(ex->file_name) time_t end_time;
time(&end_time);
exec_time = (ulong) (end_time - thd->start_time);
db_len = (db) ? (uint32) strlen(db) : 0;
table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
fname_len = (fname) ? (uint) strlen(fname) : 0;
sql_ex.field_term = (char*) ex->field_term->ptr();
sql_ex.field_term_len = (uint8) ex->field_term->length();
sql_ex.enclosed = (char*) ex->enclosed->ptr();
sql_ex.enclosed_len = (uint8) ex->enclosed->length();
sql_ex.line_term = (char*) ex->line_term->ptr();
sql_ex.line_term_len = (uint8) ex->line_term->length();
sql_ex.line_start = (char*) ex->line_start->ptr();
sql_ex.line_start_len = (uint8) ex->line_start->length();
sql_ex.escaped = (char*) ex->escaped->ptr();
sql_ex.escaped_len = (uint8) ex->escaped->length();
sql_ex.opt_flags = 0;
sql_ex.cached_new_format = -1;
if (ex->dumpfile)
sql_ex.opt_flags |= DUMPFILE_FLAG;
if (ex->opt_enclosed)
sql_ex.opt_flags |= OPT_ENCLOSED_FLAG;
sql_ex.empty_flags = 0;
switch(handle_dup)
{ {
time_t end_time; case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break;
time(&end_time); case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break;
exec_time = (ulong) (end_time - thd->start_time); case DUP_ERROR: break;
db_len = (db) ? (uint32) strlen(db) : 0;
table_name_len = (table_name) ? (uint32) strlen(table_name) : 0;
fname_len = (fname) ? (uint) strlen(fname) : 0;
sql_ex.field_term = (char*) ex->field_term->ptr();
sql_ex.field_term_len = (uint8) ex->field_term->length();
sql_ex.enclosed = (char*) ex->enclosed->ptr();
sql_ex.enclosed_len = (uint8) ex->enclosed->length();
sql_ex.line_term = (char*) ex->line_term->ptr();
sql_ex.line_term_len = (uint8) ex->line_term->length();
sql_ex.line_start = (char*) ex->line_start->ptr();
sql_ex.line_start_len = (uint8) ex->line_start->length();
sql_ex.escaped = (char*) ex->escaped->ptr();
sql_ex.escaped_len = (uint8) ex->escaped->length();
sql_ex.opt_flags = 0;
sql_ex.cached_new_format = -1;
if(ex->dumpfile)
sql_ex.opt_flags |= DUMPFILE_FLAG;
if(ex->opt_enclosed)
sql_ex.opt_flags |= OPT_ENCLOSED_FLAG;
sql_ex.empty_flags = 0;
switch(handle_dup)
{
case DUP_IGNORE: sql_ex.opt_flags |= IGNORE_FLAG; break;
case DUP_REPLACE: sql_ex.opt_flags |= REPLACE_FLAG; break;
case DUP_ERROR: break;
}
if(!ex->field_term->length())
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
if(!ex->enclosed->length())
sql_ex.empty_flags |= ENCLOSED_EMPTY;
if(!ex->line_term->length())
sql_ex.empty_flags |= LINE_TERM_EMPTY;
if(!ex->line_start->length())
sql_ex.empty_flags |= LINE_START_EMPTY;
if(!ex->escaped->length())
sql_ex.empty_flags |= ESCAPED_EMPTY;
skip_lines = ex->skip_lines;
List_iterator<Item> li(fields_arg);
field_lens_buf.length(0);
fields_buf.length(0);
Item* item;
while((item = li++))
{
num_fields++;
uchar len = (uchar) strlen(item->name);
field_block_len += len + 1;
fields_buf.append(item->name, len + 1);
field_lens_buf.append((char*)&len, 1);
}
field_lens = (const uchar*)field_lens_buf.ptr();
fields = fields_buf.ptr();
} }
if (!ex->field_term->length())
sql_ex.empty_flags |= FIELD_TERM_EMPTY;
if (!ex->enclosed->length())
sql_ex.empty_flags |= ENCLOSED_EMPTY;
if (!ex->line_term->length())
sql_ex.empty_flags |= LINE_TERM_EMPTY;
if (!ex->line_start->length())
sql_ex.empty_flags |= LINE_START_EMPTY;
if (!ex->escaped->length())
sql_ex.empty_flags |= ESCAPED_EMPTY;
skip_lines = ex->skip_lines;
List_iterator<Item> li(fields_arg);
field_lens_buf.length(0);
fields_buf.length(0);
Item* item;
while ((item = li++))
{
num_fields++;
uchar len = (uchar) strlen(item->name);
field_block_len += len + 1;
fields_buf.append(item->name, len + 1);
field_lens_buf.append((char*)&len, 1);
}
field_lens = (const uchar*)field_lens_buf.ptr();
fields = fields_buf.ptr();
}
#endif #endif
// the caller must do buf[event_len] = 0 before he starts using the // the caller must do buf[event_len] = 0 before he starts using the
@ -1145,32 +1143,32 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
bool same_db = 0; bool same_db = 0;
if(db && last_db) if (db && last_db)
{ {
if(!(same_db = !memcmp(last_db, db, db_len + 1))) if (!(same_db = !memcmp(last_db, db, db_len + 1)))
memcpy(last_db, db, db_len + 1); memcpy(last_db, db, db_len + 1);
} }
if(db && db[0] && !same_db) if (db && db[0] && !same_db)
fprintf(file, "use %s;\n", db); fprintf(file, "use %s;\n", db);
fprintf(file, "LOAD DATA INFILE '%-*s' ", fname_len, fname); fprintf(file, "LOAD DATA INFILE '%-*s' ", fname_len, fname);
if(sql_ex.opt_flags && REPLACE_FLAG ) if (sql_ex.opt_flags && REPLACE_FLAG )
fprintf(file," REPLACE "); fprintf(file," REPLACE ");
else if(sql_ex.opt_flags && IGNORE_FLAG ) else if (sql_ex.opt_flags && IGNORE_FLAG )
fprintf(file," IGNORE "); fprintf(file," IGNORE ");
fprintf(file, "INTO TABLE %s ", table_name); fprintf(file, "INTO TABLE %s ", table_name);
if(sql_ex.field_term) if (sql_ex.field_term)
{ {
fprintf(file, " FIELDS TERMINATED BY "); fprintf(file, " FIELDS TERMINATED BY ");
pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len); pretty_print_str(file, sql_ex.field_term, sql_ex.field_term_len);
} }
if(sql_ex.enclosed) if (sql_ex.enclosed)
{ {
if(sql_ex.opt_flags && OPT_ENCLOSED_FLAG ) if (sql_ex.opt_flags && OPT_ENCLOSED_FLAG )
fprintf(file," OPTIONALLY "); fprintf(file," OPTIONALLY ");
fprintf(file, " ENCLOSED BY "); fprintf(file, " ENCLOSED BY ");
pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len); pretty_print_str(file, sql_ex.enclosed, sql_ex.enclosed_len);
@ -1194,7 +1192,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len); pretty_print_str(file, sql_ex.line_start, sql_ex.line_start_len);
} }
if((int)skip_lines > 0) if ((int)skip_lines > 0)
fprintf(file, " IGNORE %ld LINES ", (long) skip_lines); fprintf(file, " IGNORE %ld LINES ", (long) skip_lines);
if (num_fields) if (num_fields)
@ -1204,7 +1202,7 @@ void Load_log_event::print(FILE* file, bool short_form, char* last_db)
fprintf( file, " ("); fprintf( file, " (");
for(i = 0; i < num_fields; i++) for(i = 0; i < num_fields; i++)
{ {
if(i) if (i)
fputc(',', file); fputc(',', file);
fprintf(file, field); fprintf(file, field);
@ -1282,7 +1280,7 @@ Slave_log_event::~Slave_log_event()
void Slave_log_event::print(FILE* file, bool short_form, char* last_db) void Slave_log_event::print(FILE* file, bool short_form, char* last_db)
{ {
char llbuff[22]; char llbuff[22];
if(short_form) if (short_form)
return; return;
print_header(file); print_header(file);
fputc('\n', file); fputc('\n', file);
@ -1314,7 +1312,7 @@ void Slave_log_event::init_from_mem_pool(int data_size)
master_host_len = strlen(master_host); master_host_len = strlen(master_host);
// safety // safety
master_log = master_host + master_host_len + 1; master_log = master_host + master_host_len + 1;
if(master_log > mem_pool + data_size) if (master_log > mem_pool + data_size)
{ {
master_host = 0; master_host = 0;
return; return;
@ -1326,9 +1324,9 @@ Slave_log_event::Slave_log_event(const char* buf, int event_len):
Log_event(buf,0),mem_pool(0),master_host(0) Log_event(buf,0),mem_pool(0),master_host(0)
{ {
event_len -= LOG_EVENT_HEADER_LEN; event_len -= LOG_EVENT_HEADER_LEN;
if(event_len < 0) if (event_len < 0)
return; return;
if(!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME)))) if (!(mem_pool = (char*)my_malloc(event_len + 1, MYF(MY_WME))))
return; return;
memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len); memcpy(mem_pool, buf + LOG_EVENT_HEADER_LEN, event_len);
mem_pool[event_len] = 0; mem_pool[event_len] = 0;
@ -1341,7 +1339,7 @@ Create_file_log_event::Create_file_log_event(THD* thd_arg, sql_exchange* ex,
List<Item>& fields_arg, enum enum_duplicates handle_dup, List<Item>& fields_arg, enum enum_duplicates handle_dup,
char* block_arg, uint block_len_arg): char* block_arg, uint block_len_arg):
Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup), Load_log_event(thd_arg,ex,db_arg,table_name_arg,fields_arg,handle_dup),
fake_base(0),block(block_arg),block_len(block_len_arg), fake_base(0),block(block_arg),block_len(block_len_arg),
file_id(thd_arg->file_id = mysql_bin_log.next_file_id()) file_id(thd_arg->file_id = mysql_bin_log.next_file_id())
{ {
sql_ex.force_new_format(); sql_ex.force_new_format();
@ -1409,7 +1407,7 @@ void Create_file_log_event::print(FILE* file, bool short_form,
if (short_form) if (short_form)
return; return;
Load_log_event::print(file, 1, last_db); Load_log_event::print(file, 1, last_db);
fprintf(file, " file_id=%d, block_len=%d\n", file_id, block_len); fprintf(file, " file_id: %d block_len: %d\n", file_id, block_len);
} }
#endif #endif
@ -1444,7 +1442,7 @@ Append_block_log_event::Append_block_log_event(THD* thd_arg, char* block_arg,
Append_block_log_event::Append_block_log_event(const char* buf, int len): Append_block_log_event::Append_block_log_event(const char* buf, int len):
Log_event(buf, 0),block(0) Log_event(buf, 0),block(0)
{ {
if((uint)len < APPEND_BLOCK_EVENT_OVERHEAD) if ((uint)len < APPEND_BLOCK_EVENT_OVERHEAD)
return; return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD; block = (char*)buf + APPEND_BLOCK_EVENT_OVERHEAD;
@ -1467,7 +1465,7 @@ void Append_block_log_event::print(FILE* file, bool short_form,
return; return;
print_header(file); print_header(file);
fputc('\n', file); fputc('\n', file);
fprintf(file, "#Append_block: file_id=%d, block_len=%d\n", fprintf(file, "#Append_block: file_id: %d block_len: %d\n",
file_id, block_len); file_id, block_len);
} }
#endif #endif
@ -1496,7 +1494,7 @@ Delete_file_log_event::Delete_file_log_event(THD* thd_arg):
Delete_file_log_event::Delete_file_log_event(const char* buf, int len): Delete_file_log_event::Delete_file_log_event(const char* buf, int len):
Log_event(buf, 0),file_id(0) Log_event(buf, 0),file_id(0)
{ {
if((uint)len < DELETE_FILE_EVENT_OVERHEAD) if ((uint)len < DELETE_FILE_EVENT_OVERHEAD)
return; return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET); file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + AB_FILE_ID_OFFSET);
} }
@ -1543,7 +1541,7 @@ Execute_load_log_event::Execute_load_log_event(THD* thd_arg):
Execute_load_log_event::Execute_load_log_event(const char* buf,int len): Execute_load_log_event::Execute_load_log_event(const char* buf,int len):
Log_event(buf, 0),file_id(0) Log_event(buf, 0),file_id(0)
{ {
if((uint)len < EXEC_LOAD_EVENT_OVERHEAD) if ((uint)len < EXEC_LOAD_EVENT_OVERHEAD)
return; return;
file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET); file_id = uint4korr(buf + LOG_EVENT_HEADER_LEN + EL_FILE_ID_OFFSET);
} }
@ -1662,7 +1660,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
thd->query = 0; thd->query = 0;
thd->query_error = 0; thd->query_error = 0;
if(db_ok(thd->db, replicate_do_db, replicate_ignore_db)) if (db_ok(thd->db, replicate_do_db, replicate_ignore_db))
{ {
thd->set_time((time_t)when); thd->set_time((time_t)when);
thd->current_tablenr = 0; thd->current_tablenr = 0;
@ -1676,7 +1674,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
tables.name = tables.real_name = (char*)table_name; tables.name = tables.real_name = (char*)table_name;
tables.lock_type = TL_WRITE; tables.lock_type = TL_WRITE;
// the table will be opened in mysql_load // the table will be opened in mysql_load
if(table_rules_on && !tables_ok(thd, &tables)) if (table_rules_on && !tables_ok(thd, &tables))
{ {
// TODO: this is a bug - this needs to be moved to the I/O thread // TODO: this is a bug - this needs to be moved to the I/O thread
if (net) if (net)
@ -1712,14 +1710,14 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
// about the packet sequence // about the packet sequence
thd->net.pkt_nr = net->pkt_nr; thd->net.pkt_nr = net->pkt_nr;
} }
if(mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0, if (mysql_load(thd, &ex, &tables, fields, handle_dup, net != 0,
TL_WRITE)) TL_WRITE))
thd->query_error = 1; thd->query_error = 1;
if(thd->cuted_fields) if (thd->cuted_fields)
sql_print_error("Slave: load data infile at position %s in log \ sql_print_error("Slave: load data infile at position %s in log \
'%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME, '%s' produced %d warning(s)", llstr(rli->master_log_pos,llbuff), RPL_LOG_NAME,
thd->cuted_fields ); thd->cuted_fields );
if(net) if (net)
net->pkt_nr = thd->net.pkt_nr; net->pkt_nr = thd->net.pkt_nr;
} }
} }
@ -1735,7 +1733,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
thd->net.vio = 0; thd->net.vio = 0;
thd->db = 0;// prevent db from being freed thd->db = 0;// prevent db from being freed
close_thread_tables(thd); close_thread_tables(thd);
if(thd->query_error) if (thd->query_error)
{ {
int sql_error = thd->net.last_errno; int sql_error = thd->net.last_errno;
if (!sql_error) if (!sql_error)
@ -1749,7 +1747,7 @@ int Load_log_event::exec_event(NET* net, struct st_relay_log_info* rli)
} }
free_root(&thd->mem_root,0); free_root(&thd->mem_root,0);
if(thd->fatal_error) if (thd->fatal_error)
{ {
sql_print_error("Slave: Fatal error running LOAD DATA INFILE "); sql_print_error("Slave: Fatal error running LOAD DATA INFILE ");
return 1; return 1;
@ -1849,7 +1847,7 @@ int Intvar_log_event::exec_event(struct st_relay_log_info* rli)
int Slave_log_event::exec_event(struct st_relay_log_info* rli) int Slave_log_event::exec_event(struct st_relay_log_info* rli)
{ {
if(mysql_bin_log.is_open()) if (mysql_bin_log.is_open())
mysql_bin_log.write(this); mysql_bin_log.write(this);
return Log_event::exec_event(rli); return Log_event::exec_event(rli);
} }
@ -1978,11 +1976,12 @@ int Execute_load_log_event::exec_event(struct st_relay_log_info* rli)
slave_print_error(rli,0, "File '%s' appears corrupted", fname); slave_print_error(rli,0, "File '%s' appears corrupted", fname);
goto err; goto err;
} }
// we want to disable binary logging in slave thread /*
// because we need the file events to appear in the same order We want to disable binary logging in slave thread because we need the file
// as they do on the master relative to other events, so that we events to appear in the same order as they do on the master relative to
// can preserve ascending order of log sequence numbers - needed other events, so that we can preserve ascending order of log sequence
// to handle failover numbers - needed to handle failover .
*/
save_options = thd->options; save_options = thd->options;
thd->options &= ~ (ulong) (OPTION_BIN_LOG); thd->options &= ~ (ulong) (OPTION_BIN_LOG);
lev->thd = thd; lev->thd = thd;

View File

@ -670,8 +670,10 @@ int load_master_data(THD* thd)
int restart_thread_mask; int restart_thread_mask;
mc_mysql_init(&mysql); mc_mysql_init(&mysql);
// we do not want anyone messing with the slave at all for the entire /*
// duration of the data load; We do not want anyone messing with the slave at all for the entire
duration of the data load.
*/
LOCK_ACTIVE_MI; LOCK_ACTIVE_MI;
lock_slave_threads(active_mi); lock_slave_threads(active_mi);
init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/); init_thread_mask(&restart_thread_mask,active_mi,0 /*not inverse*/);
@ -707,8 +709,10 @@ int load_master_data(THD* thd)
if (!(num_dbs = (uint) mc_mysql_num_rows(db_res))) if (!(num_dbs = (uint) mc_mysql_num_rows(db_res)))
goto err; goto err;
// in theory, the master could have no databases at all /*
// and run with skip-grant In theory, the master could have no databases at all
and run with skip-grant
*/
if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*)))) if (!(table_res = (MYSQL_RES**)thd->alloc(num_dbs * sizeof(MYSQL_RES*))))
{ {
@ -716,10 +720,12 @@ int load_master_data(THD* thd)
goto err; goto err;
} }
// this is a temporary solution until we have online backup /*
// capabilities - to be replaced once online backup is working This is a temporary solution until we have online backup
// we wait to issue FLUSH TABLES WITH READ LOCK for as long as we capabilities - to be replaced once online backup is working
// can to minimize the lock time we wait to issue FLUSH TABLES WITH READ LOCK for as long as we
can to minimize the lock time.
*/
if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) || if (mc_mysql_query(&mysql, "FLUSH TABLES WITH READ LOCK", 0) ||
mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) || mc_mysql_query(&mysql, "SHOW MASTER STATUS",0) ||
!(master_status_res = mc_mysql_store_result(&mysql))) !(master_status_res = mc_mysql_store_result(&mysql)))
@ -729,8 +735,10 @@ int load_master_data(THD* thd)
goto err; goto err;
} }
// go through every table in every database, and if the replication /*
// rules allow replicating it, get it Go through every table in every database, and if the replication
rules allow replicating it, get it
*/
table_res_end = table_res + num_dbs; table_res_end = table_res + num_dbs;
@ -819,7 +827,7 @@ int load_master_data(THD* thd)
} }
} }
thd->proc_info="purging old relay logs"; thd->proc_info="purging old relay logs";
if (purge_relay_logs(&active_mi->rli,0 /* not only reset, but also reinit*/, if (purge_relay_logs(&active_mi->rli,0 /* not only reset, but also reinit */,
&errmsg)) &errmsg))
{ {
send_error(&thd->net, 0, "Failed purging old relay logs"); send_error(&thd->net, 0, "Failed purging old relay logs");

File diff suppressed because it is too large Load Diff

View File

@ -84,18 +84,9 @@ typedef struct st_relay_log_info
volatile my_off_t master_log_pos; volatile my_off_t master_log_pos;
/* /*
current offset in the relay log. Protected with internal locks.
pending - in some cases we do not increment offset immediately after Must get data_lock when resetting the logs.
processing an event, because the following event needs to be processed
atomically together with this one ( so far, there is only one type of
such event - Intvar_event that sets auto_increment value). However, once
both events have been processed, we need to increment by the cumulative
offset. pending stored the extra offset to be added to the position.
*/ */
ulonglong relay_log_pos, pending;
// protected with internal locks
// must get data_lock when resetting the logs
MYSQL_LOG relay_log; MYSQL_LOG relay_log;
LOG_INFO linfo; LOG_INFO linfo;
IO_CACHE cache_buf,*cur_log; IO_CACHE cache_buf,*cur_log;
@ -125,9 +116,6 @@ typedef struct st_relay_log_info
*/ */
pthread_cond_t start_cond, stop_cond, data_cond; pthread_cond_t start_cond, stop_cond, data_cond;
// if not set, the value of other members of the structure are undefined
bool inited;
// parent master info structure // parent master info structure
struct st_master_info *mi; struct st_master_info *mi;
@ -135,9 +123,19 @@ typedef struct st_relay_log_info
Needed to deal properly with cur_log getting closed and re-opened with Needed to deal properly with cur_log getting closed and re-opened with
a different log under our feet a different log under our feet
*/ */
int cur_log_init_count; uint32 cur_log_old_open_count;
volatile bool abort_slave, slave_running; /*
current offset in the relay log.
pending - in some cases we do not increment offset immediately after
processing an event, because the following event needs to be processed
atomically together with this one ( so far, there is only one type of
such event - Intvar_event that sets auto_increment value). However, once
both events have been processed, we need to increment by the cumulative
offset. pending stored the extra offset to be added to the position.
*/
ulonglong relay_log_pos, pending;
ulonglong log_space_limit,log_space_total;
/* /*
Needed for problems when slave stops and we want to restart it Needed for problems when slave stops and we want to restart it
@ -145,45 +143,47 @@ typedef struct st_relay_log_info
errors, and have been manually applied by DBA already. errors, and have been manually applied by DBA already.
*/ */
volatile uint32 slave_skip_counter; volatile uint32 slave_skip_counter;
pthread_mutex_t log_space_lock;
pthread_cond_t log_space_cond;
THD * sql_thd;
int last_slave_errno;
#ifndef DBUG_OFF #ifndef DBUG_OFF
int events_till_abort; int events_till_abort;
#endif #endif
int last_slave_errno;
char last_slave_error[MAX_SLAVE_ERRMSG]; char last_slave_error[MAX_SLAVE_ERRMSG];
THD* sql_thd;
// if not set, the value of other members of the structure are undefined
bool inited;
volatile bool abort_slave, slave_running;
bool log_pos_current; bool log_pos_current;
bool abort_pos_wait; bool abort_pos_wait;
bool skip_log_purge; bool skip_log_purge;
ulonglong log_space_limit,log_space_total;
pthread_mutex_t log_space_lock;
pthread_cond_t log_space_cond;
st_relay_log_info():info_fd(-1),cur_log_fd(-1),inited(0), st_relay_log_info()
cur_log_init_count(0), :info_fd(-1),cur_log_fd(-1), cur_log_old_open_count(0),
abort_slave(0),slave_running(0), inited(0), abort_slave(0), slave_running(0), log_pos_current(0),
log_pos_current(0),abort_pos_wait(0), abort_pos_wait(0), skip_log_purge(0)
skip_log_purge(0) {
{ relay_log_name[0] = master_log_name[0] = 0;
relay_log_name[0] = master_log_name[0] = 0; bzero(&info_file,sizeof(info_file));
bzero(&info_file,sizeof(info_file)); pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&run_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&data_lock, MY_MUTEX_INIT_FAST); pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST);
pthread_mutex_init(&log_space_lock, MY_MUTEX_INIT_FAST); pthread_cond_init(&data_cond, NULL);
pthread_cond_init(&data_cond, NULL); pthread_cond_init(&start_cond, NULL);
pthread_cond_init(&start_cond, NULL); pthread_cond_init(&stop_cond, NULL);
pthread_cond_init(&stop_cond, NULL); pthread_cond_init(&log_space_cond, NULL);
pthread_cond_init(&log_space_cond, NULL); }
}
~st_relay_log_info() ~st_relay_log_info()
{ {
pthread_mutex_destroy(&run_lock); pthread_mutex_destroy(&run_lock);
pthread_mutex_destroy(&data_lock); pthread_mutex_destroy(&data_lock);
pthread_mutex_destroy(&log_space_lock); pthread_mutex_destroy(&log_space_lock);
pthread_cond_destroy(&data_cond); pthread_cond_destroy(&data_cond);
pthread_cond_destroy(&start_cond); pthread_cond_destroy(&start_cond);
pthread_cond_destroy(&stop_cond); pthread_cond_destroy(&stop_cond);
pthread_cond_destroy(&log_space_cond); pthread_cond_destroy(&log_space_cond);
} }
inline void inc_pending(ulonglong val) inline void inc_pending(ulonglong val)
{ {
pending += val; pending += val;
@ -215,40 +215,33 @@ typedef struct st_relay_log_info
int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos); int wait_for_pos(THD* thd, String* log_name, ulonglong log_pos);
} RELAY_LOG_INFO; } RELAY_LOG_INFO;
/*
repopen_relay_log() is called when we notice that the current "hot" log
got rotated under our feet
*/
IO_CACHE* reopen_relay_log(RELAY_LOG_INFO* rli, const char** errmsg);
Log_event* next_event(RELAY_LOG_INFO* rli); Log_event* next_event(RELAY_LOG_INFO* rli);
/* /*
st_master_info contains information about how to connect to a master, st_master_info contains information about how to connect to a master,
current master log name, and current log offset, as well as misc current master log name, and current log offset, as well as misc
control variables control variables
st_master_info is initialized once from the master.info file if such st_master_info is initialized once from the master.info file if such
exists. Otherwise, data members corresponding to master.info fields are exists. Otherwise, data members corresponding to master.info fields
initialized with defaults specified by master-* options. The initialization are initialized with defaults specified by master-* options. The
is done through init_master_info() call. initialization is done through init_master_info() call.
The format of master.info file: The format of master.info file:
log_name log_name
log_pos log_pos
master_host master_host
master_user master_user
master_pass master_pass
master_port master_port
master_connect_retry master_connect_retry
To write out the contents of master.info file to disk ( needed every To write out the contents of master.info file to disk ( needed every
time we read and queue data from the master ), a call to time we read and queue data from the master ), a call to
flush_master_info() is required. flush_master_info() is required.
To clean up, call end_master_info() To clean up, call end_master_info()
*/ */

View File

@ -60,48 +60,49 @@ class Log_event;
class MYSQL_LOG { class MYSQL_LOG {
private: private:
pthread_mutex_t LOCK_log, LOCK_index; pthread_mutex_t LOCK_log, LOCK_index;
pthread_cond_t update_cond;
ulonglong bytes_written;
time_t last_time,query_start; time_t last_time,query_start;
IO_CACHE log_file; IO_CACHE log_file;
File index_file; File index_file;
char *name; char *name;
volatile enum_log_type log_type;
char time_buff[20],db[NAME_LEN+1]; char time_buff[20],db[NAME_LEN+1];
char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN]; char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN];
bool write_error,inited; // current file sequence number for load data infile binary logging
uint file_id; // current file sequence number for load data infile uint file_id;
// binary logging uint open_count; // For replication
bool no_rotate; // for binlog - if log name can never change /*
// we should not try to rotate it or write any rotation events For binlog - if log name can never change we should not try to rotate it
// the user should use FLUSH MASTER instead of FLUSH LOGS for or write any rotation events. The user should use FLUSH MASTER instead
// purging of FLUSH LOGS for purging.
*/
volatile enum_log_type log_type;
enum cache_type io_cache_type; enum cache_type io_cache_type;
bool write_error,inited;
bool no_rotate;
bool need_start_event; bool need_start_event;
pthread_cond_t update_cond;
bool no_auto_events; // for relay binlog bool no_auto_events; // for relay binlog
ulonglong bytes_written;
friend class Log_event; friend class Log_event;
public: public:
MYSQL_LOG(); MYSQL_LOG();
~MYSQL_LOG(); ~MYSQL_LOG();
pthread_mutex_t* get_log_lock() { return &LOCK_log; }
void reset_bytes_written() void reset_bytes_written()
{ {
bytes_written = 0; bytes_written = 0;
} }
void harvest_bytes_written(ulonglong* counter) void harvest_bytes_written(ulonglong* counter)
{ {
#ifndef DBUG_OFF #ifndef DBUG_OFF
char buf1[22],buf2[22]; char buf1[22],buf2[22];
#endif #endif
DBUG_ENTER("harvest_bytes_written"); DBUG_ENTER("harvest_bytes_written");
(*counter)+=bytes_written; (*counter)+=bytes_written;
DBUG_PRINT("info",("counter=%s,bytes_written=%s", llstr(*counter,buf1), DBUG_PRINT("info",("counter: %s bytes_written: %s", llstr(*counter,buf1),
llstr(bytes_written,buf2))); llstr(bytes_written,buf2)));
bytes_written=0; bytes_written=0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
IO_CACHE* get_log_file() { return &log_file; }
void signal_update() { pthread_cond_broadcast(&update_cond);} void signal_update() { pthread_cond_broadcast(&update_cond);}
void wait_for_update(THD* thd); void wait_for_update(THD* thd);
void set_need_start_event() { need_start_event = 1; } void set_need_start_event() { need_start_event = 1; }
@ -135,8 +136,8 @@ public:
int purge_logs(THD* thd, const char* to_log); int purge_logs(THD* thd, const char* to_log);
int purge_first_log(struct st_relay_log_info* rli); int purge_first_log(struct st_relay_log_info* rli);
int reset_logs(THD* thd); int reset_logs(THD* thd);
void close(bool exiting = 0); // if we are exiting, we also want to close the // if we are exiting, we also want to close the index file
// index file void close(bool exiting = 0);
// iterating through the log index file // iterating through the log index file
int find_first_log(LOG_INFO* linfo, const char* log_name, int find_first_log(LOG_INFO* linfo, const char* log_name,
@ -146,11 +147,15 @@ public:
uint next_file_id(); uint next_file_id();
inline bool is_open() { return log_type != LOG_CLOSED; } inline bool is_open() { return log_type != LOG_CLOSED; }
char* get_index_fname() { return index_file_name;} inline char* get_index_fname() { return index_file_name;}
char* get_log_fname() { return log_file_name; } inline char* get_log_fname() { return log_file_name; }
void lock_index() { pthread_mutex_lock(&LOCK_index);} inline pthread_mutex_t* get_log_lock() { return &LOCK_log; }
void unlock_index() { pthread_mutex_unlock(&LOCK_index);} inline IO_CACHE* get_log_file() { return &log_file; }
File get_index_file() { return index_file;}
inline void lock_index() { pthread_mutex_lock(&LOCK_index);}
inline void unlock_index() { pthread_mutex_unlock(&LOCK_index);}
inline File get_index_file() { return index_file;}
inline uint32 get_open_count() { return open_count; }
}; };
/* character conversion tables */ /* character conversion tables */

View File

@ -2891,6 +2891,7 @@ bool add_field_to_list(char *field_name, enum_field_types type,
case FIELD_TYPE_STRING: case FIELD_TYPE_STRING:
case FIELD_TYPE_VAR_STRING: case FIELD_TYPE_VAR_STRING:
case FIELD_TYPE_NULL: case FIELD_TYPE_NULL:
case FIELD_TYPE_GEOMETRY:
break; break;
case FIELD_TYPE_DECIMAL: case FIELD_TYPE_DECIMAL:
if (!length) if (!length)

View File

@ -154,6 +154,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
const char **errmsg) const char **errmsg)
{ {
File file; File file;
DBUG_ENTER("open_binlog");
if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 || if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 ||
init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0, init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
@ -164,7 +165,7 @@ File open_binlog(IO_CACHE *log, const char *log_file_name,
} }
if (check_binlog_magic(log,errmsg)) if (check_binlog_magic(log,errmsg))
goto err; goto err;
return file; DBUG_RETURN(file);
err: err:
if (file >= 0) if (file >= 0)
@ -172,7 +173,7 @@ err:
my_close(file,MYF(0)); my_close(file,MYF(0));
end_io_cache(log); end_io_cache(log);
} }
return -1; DBUG_RETURN(-1);
} }
@ -628,7 +629,8 @@ int reset_slave(MASTER_INFO* mi)
char fname[FN_REFLEN]; char fname[FN_REFLEN];
int restart_thread_mask = 0,error=0; int restart_thread_mask = 0,error=0;
const char* errmsg=0; const char* errmsg=0;
DBUG_ENTER("reset_slave");
lock_slave_threads(mi); lock_slave_threads(mi);
init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */); init_thread_mask(&restart_thread_mask,mi,0 /* not inverse */);
if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/)) if ((error=terminate_slave_threads(mi,restart_thread_mask,1 /*skip lock*/))
@ -649,14 +651,14 @@ int reset_slave(MASTER_INFO* mi)
goto err; goto err;
} }
if (restart_thread_mask) if (restart_thread_mask)
error=start_slave_threads(0 /* mutex not needed*/, error=start_slave_threads(0 /* mutex not needed */,
1 /* wait for start*/, 1 /* wait for start*/,
mi,master_info_file,relay_log_info_file, mi,master_info_file,relay_log_info_file,
restart_thread_mask); restart_thread_mask);
// TODO: fix error messages so they get to the client // TODO: fix error messages so they get to the client
err: err:
unlock_slave_threads(mi); unlock_slave_threads(mi);
return error; DBUG_RETURN(error);
} }
void kill_zombie_dump_threads(uint32 slave_server_id) void kill_zombie_dump_threads(uint32 slave_server_id)

View File

@ -129,6 +129,10 @@ bfill((A)->null_flags,(A)->null_bytes,255);\
*/ */
#define MIN_TURBOBM_PATTERN_LEN 3 #define MIN_TURBOBM_PATTERN_LEN 3
/* Defines for binary logging */
#define BIN_LOG_HEADER_SIZE 4
/* Include prototypes for unireg */ /* Include prototypes for unireg */
#include "mysqld_error.h" #include "mysqld_error.h"