SQL, IB: (0.10) VTMD tracking [closes #124]

IB: Fixes in logic when to do versioned or usual row updates. Now it is
able to do unversioned updates for versioned tables just by disabling
`TABLE_SHARE::versioned` flag.

SQL: DDL tracking for:
* RENAME TABLE, ALTER TABLE .. RENAME TO;
* DROP TABLE;
* data-modifying operations (f.ex. ALTER TABLE .. ADD/DROP COLUMN).
This commit is contained in:
Aleksey Midenkov 2017-06-15 16:02:32 +03:00
parent efaa0d66da
commit 448374a228
26 changed files with 1536 additions and 51 deletions

View File

@ -211,6 +211,7 @@ extern ulonglong strtoull(const char *str, char **ptr, int base);
#define STRING_WITH_LEN(X) (X), ((size_t) (sizeof(X) - 1))
#define USTRING_WITH_LEN(X) ((uchar*) X), ((size_t) (sizeof(X) - 1))
#define C_STRING_WITH_LEN(X) ((char *) (X)), ((size_t) (sizeof(X) - 1))
#define LEX_STRING_WITH_LEN(X) (X).str, (X).length
struct st_mysql_const_lex_string
{

View File

@ -117,6 +117,8 @@ SET(SQL_EMBEDDED_SOURCES emb_qcache.cc libmysqld.c lib_sql.cc
../sql/ha_sequence.cc ../sql/ha_sequence.h
../sql/temporary_tables.cc
../sql/session_tracker.cc
../sql/item_vers.cc
../sql/vtmd.cc
${GEN_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
)

View File

@ -82,7 +82,7 @@ create procedure drop_last_historical(table_name_arg varchar(255))
begin
call concat_exec2('drop table ', get_historical_table_name(table_name_arg));
end~~
set versioning_ddl_survival = 1;
set versioning_ddl_survival= on;
create or replace table t (a int) with system versioning;
insert into t values (1);
update t set a=2 where a=1;
@ -106,6 +106,10 @@ call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t')
@tm=sys_trx_end
1
call drop_last_historical('t');
set versioning_ddl_survival= off;
drop table t_vtmd;
drop table t;
set versioning_ddl_survival= on;
create or replace table t (a int) with system versioning;
insert into t values (1);
update t set a=2 where a=1;
@ -129,6 +133,10 @@ call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t')
@tm=sys_trx_end
1
call drop_last_historical('t');
set versioning_ddl_survival= off;
drop table t_vtmd;
drop table t;
set versioning_ddl_survival= on;
create or replace table t (a int) with system versioning engine innodb;
insert into t values (1);
update t set a=2 where a=1;
@ -152,6 +160,10 @@ call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t')
@tm=sys_trx_end
1
call drop_last_historical('t');
set versioning_ddl_survival= off;
drop table t_vtmd;
drop table t;
set versioning_ddl_survival= on;
create or replace table t (a int) with system versioning engine innodb;
insert into t values (1);
update t set a=2 where a=1;
@ -172,6 +184,7 @@ vtmd_template CREATE TABLE `vtmd_template` (
`archive_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT 'Name of archive table',
`col_renames` blob DEFAULT NULL COMMENT 'Column name mappings from previous lifetime',
PRIMARY KEY (`end`),
KEY `archive_name` (`archive_name`),
PERIOD FOR SYSTEM_TIME (`start`, `end`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin STATS_PERSISTENT=0 WITH SYSTEM VERSIONING
call verify_vtq;
@ -181,6 +194,13 @@ No A B C D
3 1 1 1 1
4 1 1 1 1
5 1 1 1 1
6 1 1 1 1
7 1 1 1 1
8 1 1 1 1
9 1 1 1 1
10 1 1 1 1
11 1 1 1 1
12 1 1 1 1
drop table t;
drop procedure verify_vtq;
drop procedure innodb_verify_vtq;

View File

@ -0,0 +1,271 @@
create or replace procedure drop_archives (in vtmd_name varchar(64))
begin
declare archive_name varchar(64);
declare cur_done bool default false;
declare cur cursor for
select cur_tmp.archive_name from cur_tmp;
declare continue handler for not found set cur_done = true;
set @tmp= concat('
create or replace temporary table
cur_tmp as
select vtmd.archive_name from ', vtmd_name, ' as vtmd
for system_time all
where vtmd.archive_name is not null
group by vtmd.archive_name');
prepare stmt from @tmp; execute stmt; drop prepare stmt;
open cur;
fetch_loop: loop
fetch cur into archive_name;
if cur_done then
leave fetch_loop;
end if;
set @tmp= concat('drop table ', archive_name);
prepare stmt from @tmp; execute stmt; drop prepare stmt;
end loop;
drop table cur_tmp;
end~~
create or replace procedure check_vtmd (in vtmd_name varchar(64))
begin
set @tmp= concat('
create or replace temporary table
tmp_vtmd as
select * from ', vtmd_name, ' as vtmd
for system_time all');
prepare stmt from @tmp; execute stmt; drop prepare stmt;
set @inf= 0xFFFFFFFFFFFFFFFF + 0;
set @start= null;
select start from tmp_vtmd for system_time all order by start limit 1 into @start;
select @start > 0 and @start < @inf;
select
start = @start as A_start,
(@start:= end) and end = @inf as B_end,
name,
substr(archive_name, 1, instr(archive_name, '_')) as C_archive_name
from tmp_vtmd for system_time all;
drop table tmp_vtmd;
end~~
set versioning_ddl_survival= off;
create or replace table t0 (x int) with system versioning;
show tables;
Tables_in_test
t0
set versioning_ddl_survival= on;
create or replace table t0 (x int) with system versioning;
show tables;
Tables_in_test
t0
t0_vtmd
show create table t0_vtmd;
Table Create Table
t0_vtmd CREATE TABLE `t0_vtmd` (
`start` bigint(20) unsigned GENERATED ALWAYS AS ROW START COMMENT 'TRX_ID of table lifetime start',
`end` bigint(20) unsigned GENERATED ALWAYS AS ROW END NOT NULL COMMENT 'TRX_ID of table lifetime end',
`name` varchar(64) COLLATE utf8_bin NOT NULL COMMENT 'Table name during period [start, end)',
`archive_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT 'Name of archive table',
`col_renames` blob DEFAULT NULL COMMENT 'Column name mappings from previous lifetime',
PRIMARY KEY (`end`),
KEY `archive_name` (`archive_name`),
PERIOD FOR SYSTEM_TIME (`start`, `end`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin STATS_PERSISTENT=0 WITH SYSTEM VERSIONING
call check_vtmd('t0_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 1 t0 NULL
set versioning_ddl_survival= off;
drop table t0;
set versioning_ddl_survival= on;
create or replace table t0 (x int) with system versioning;
ERROR HY000: VTMD error: `test.t0_vtmd` exists and not empty!
alter table t0 add column (y int);
call check_vtmd('t0_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 0 t0 t0_
1 1 t0 NULL
call drop_archives('t0_vtmd');
drop table t0_vtmd;
alter table t0 drop column y;
call check_vtmd('t0_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 1 t0 t0_
call drop_archives('t0_vtmd');
set versioning_ddl_survival= off;
drop tables t0, t0_vtmd;
set versioning_ddl_survival= on;
set versioning_ddl_survival= off;
create or replace table x0 (x int) with system versioning;
set versioning_ddl_survival= on;
rename table x0 to d0;
show tables;
Tables_in_test
d0
set versioning_ddl_survival= off;
drop table d0;
set versioning_ddl_survival= on;
create or replace table x0 (x int) with system versioning;
rename table x0 to d0;
show tables;
Tables_in_test
d0
d0_vtmd
call check_vtmd('d0_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 0 x0 NULL
1 1 d0 NULL
set versioning_ddl_survival= off;
drop table d0;
set versioning_ddl_survival= on;
create or replace table x0 (x int) with system versioning;
rename table x0 to d0;
ERROR HY000: VTMD error: `test.d0_vtmd` table already exists!
show tables;
Tables_in_test
d0_vtmd
x0
x0_vtmd
drop table x0_vtmd;
rename table x0 to d0;
Warnings:
Warning 4088 `test.d0_vtmd` table already exists!
show tables;
Tables_in_test
d0
d0_vtmd
rename table d0 to duck;
rename table duck to bay;
rename table bay to sheer;
rename table sheer to t0;
call check_vtmd('t0_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 0 x0 NULL
1 0 d0 NULL
1 0 duck NULL
1 0 bay NULL
1 0 sheer NULL
1 1 t0 NULL
alter table t0 add column (y int);
call check_vtmd('t0_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 0 x0 t0_
1 0 d0 t0_
1 0 duck t0_
1 0 bay t0_
1 0 sheer t0_
1 0 t0 t0_
1 1 t0 NULL
alter table t0 add column (z int);
alter table t0 drop column y;
alter table t0 drop column z;
create database db0;
rename table t0 to db0.t0;
show tables;
Tables_in_test
use db0;
show tables;
Tables_in_db0
t0
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
t0_vtmd
call test.check_vtmd('db0.t0_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 0 x0 t0_
1 0 d0 t0_
1 0 duck t0_
1 0 bay t0_
1 0 sheer t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t0_
1 1 t0 NULL
create database db1;
rename table t0 to db1.other_name;
show tables;
Tables_in_db0
use db1;
show tables;
Tables_in_db1
other_name
other_name_vtmd
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
call test.check_vtmd('db1.other_name_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 0 x0 t0_
1 0 d0 t0_
1 0 duck t0_
1 0 bay t0_
1 0 sheer t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 NULL
1 1 other_name NULL
alter table other_name rename to t1;
call test.check_vtmd('db1.t1_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 0 x0 t0_
1 0 d0 t0_
1 0 duck t0_
1 0 bay t0_
1 0 sheer t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 NULL
1 0 other_name NULL
1 1 t1 NULL
alter table t1 rename to test.t2, add column (y int);
use test;
show tables;
Tables_in_test
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
t0_TIMESTAMP_SUFFIX
t2
t2_vtmd
call check_vtmd('t2_vtmd');
@start > 0 and @start < @inf
1
A_start B_end name C_archive_name
1 0 x0 t0_
1 0 d0 t0_
1 0 duck t0_
1 0 bay t0_
1 0 sheer t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t0_
1 0 t0 t1_
1 0 other_name t1_
1 0 t1 t1_
1 1 t2 NULL
drop database db0;
drop database db1;
drop database test;
create database test;

View File

@ -30,7 +30,7 @@ end~~
delimiter ;~~
set versioning_ddl_survival = 1;
set versioning_ddl_survival= on;
create or replace table t (a int) with system versioning;
insert into t values (1);
@ -49,6 +49,11 @@ call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t')
call drop_last_historical('t');
set versioning_ddl_survival= off;
drop table t_vtmd;
drop table t;
set versioning_ddl_survival= on;
# same for INNODB ALGORITHM=COPY
create or replace table t (a int) with system versioning;
insert into t values (1);
@ -67,6 +72,11 @@ call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t')
call drop_last_historical('t');
set versioning_ddl_survival= off;
drop table t_vtmd;
drop table t;
set versioning_ddl_survival= on;
# same for INNODB default ALGORITHM
create or replace table t (a int) with system versioning engine innodb;
insert into t values (1);
@ -85,6 +95,11 @@ call concat_exec3('select @tm=sys_trx_end from ', get_historical_table_name('t')
call drop_last_historical('t');
set versioning_ddl_survival= off;
drop table t_vtmd;
drop table t;
set versioning_ddl_survival= on;
# no DDL for INNODB explicit ALGORITHM=INPLACE
create or replace table t (a int) with system versioning engine innodb;
insert into t values (1);

View File

@ -0,0 +1 @@
--innodb --default-storage-engine=innodb

View File

@ -0,0 +1,161 @@
-- source include/have_innodb.inc
delimiter ~~;
create or replace procedure drop_archives (in vtmd_name varchar(64))
begin
declare archive_name varchar(64);
declare cur_done bool default false;
declare cur cursor for
select cur_tmp.archive_name from cur_tmp;
declare continue handler for not found set cur_done = true;
set @tmp= concat('
create or replace temporary table
cur_tmp as
select vtmd.archive_name from ', vtmd_name, ' as vtmd
for system_time all
where vtmd.archive_name is not null
group by vtmd.archive_name');
prepare stmt from @tmp; execute stmt; drop prepare stmt;
open cur;
fetch_loop: loop
fetch cur into archive_name;
if cur_done then
leave fetch_loop;
end if;
set @tmp= concat('drop table ', archive_name);
prepare stmt from @tmp; execute stmt; drop prepare stmt;
end loop;
drop table cur_tmp;
end~~
delimiter ;~~
delimiter ~~;
create or replace procedure check_vtmd (in vtmd_name varchar(64))
begin
set @tmp= concat('
create or replace temporary table
tmp_vtmd as
select * from ', vtmd_name, ' as vtmd
for system_time all');
prepare stmt from @tmp; execute stmt; drop prepare stmt;
set @inf= 0xFFFFFFFFFFFFFFFF + 0;
set @start= null;
select start from tmp_vtmd for system_time all order by start limit 1 into @start;
select @start > 0 and @start < @inf;
select
start = @start as A_start,
(@start:= end) and end = @inf as B_end,
name,
substr(archive_name, 1, instr(archive_name, '_')) as C_archive_name
from tmp_vtmd for system_time all;
drop table tmp_vtmd;
end~~
delimiter ;~~
# create
set versioning_ddl_survival= off;
create or replace table t0 (x int) with system versioning;
show tables;
set versioning_ddl_survival= on;
create or replace table t0 (x int) with system versioning;
show tables;
show create table t0_vtmd;
call check_vtmd('t0_vtmd');
set versioning_ddl_survival= off;
drop table t0;
set versioning_ddl_survival= on;
--error ER_VERS_VTMD_ERROR
create or replace table t0 (x int) with system versioning;
# alter
alter table t0 add column (y int);
call check_vtmd('t0_vtmd');
call drop_archives('t0_vtmd');
drop table t0_vtmd;
alter table t0 drop column y;
call check_vtmd('t0_vtmd');
call drop_archives('t0_vtmd');
set versioning_ddl_survival= off;
drop tables t0, t0_vtmd;
set versioning_ddl_survival= on;
# rename
set versioning_ddl_survival= off;
create or replace table x0 (x int) with system versioning;
set versioning_ddl_survival= on;
rename table x0 to d0;
show tables;
set versioning_ddl_survival= off;
drop table d0;
set versioning_ddl_survival= on;
create or replace table x0 (x int) with system versioning;
rename table x0 to d0;
show tables;
call check_vtmd('d0_vtmd');
set versioning_ddl_survival= off;
drop table d0;
set versioning_ddl_survival= on;
create or replace table x0 (x int) with system versioning;
--error ER_VERS_VTMD_ERROR
rename table x0 to d0;
show tables;
drop table x0_vtmd;
rename table x0 to d0;
show tables;
rename table d0 to duck;
rename table duck to bay;
rename table bay to sheer;
rename table sheer to t0;
call check_vtmd('t0_vtmd');
alter table t0 add column (y int);
call check_vtmd('t0_vtmd');
# rename to different schema
alter table t0 add column (z int);
alter table t0 drop column y;
alter table t0 drop column z;
create database db0;
rename table t0 to db0.t0;
show tables;
use db0;
--replace_regex /\d{8}_\d{6}_\d{6}/TIMESTAMP_SUFFIX/
show tables;
call test.check_vtmd('db0.t0_vtmd');
create database db1;
rename table t0 to db1.other_name;
show tables;
use db1;
--replace_regex /\d{8}_\d{6}_\d{6}/TIMESTAMP_SUFFIX/
show tables;
call test.check_vtmd('db1.other_name_vtmd');
# alter rename
alter table other_name rename to t1;
call test.check_vtmd('db1.t1_vtmd');
# alter rename and modify to different schema
alter table t1 rename to test.t2, add column (y int);
use test;
--replace_regex /\d{8}_\d{6}_\d{6}/TIMESTAMP_SUFFIX/
show tables;
call check_vtmd('t2_vtmd');
drop database db0;
drop database db1;
drop database test;
create database test;

View File

@ -23,6 +23,8 @@ set sql_mode='';
set @orig_storage_engine=@@storage_engine;
set storage_engine=myisam;
set versioning_ddl_survival=off;
set @have_innodb= (select count(engine) from information_schema.engines where engine='INNODB' and support != 'NO');
SET @innodb_or_myisam=IF(@have_innodb <> 0, 'InnoDB', 'MyISAM');
@ -129,14 +131,15 @@ SET @create_innodb_index_stats="CREATE TABLE IF NOT EXISTS innodb_index_stats (
PRIMARY KEY (database_name, table_name, index_name, stat_name)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COLLATE=utf8_bin STATS_PERSISTENT=0";
SET @create_vtmd_template="CREATE TABLE IF NOT EXISTS vtmd_template (
SET @create_vtmd_template="CREATE OR REPLACE TABLE vtmd_template (
start BIGINT UNSIGNED GENERATED ALWAYS AS ROW START COMMENT 'TRX_ID of table lifetime start',
end BIGINT UNSIGNED GENERATED ALWAYS AS ROW END COMMENT 'TRX_ID of table lifetime end',
name VARCHAR(64) NOT NULL COMMENT 'Table name during period [start, end)',
archive_name VARCHAR(64) NULL COMMENT 'Name of archive table',
col_renames BLOB COMMENT 'Column name mappings from previous lifetime',
PERIOD FOR SYSTEM_TIME(start, end),
PRIMARY KEY (end)
PRIMARY KEY (end),
INDEX (archive_name)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COLLATE=utf8_bin STATS_PERSISTENT=0 WITH SYSTEM VERSIONING";
SET @str=IF(@have_innodb <> 0, @create_innodb_table_stats, "SET @dummy = 0");

View File

@ -152,6 +152,7 @@ SET (SQL_SOURCE
${GEN_SOURCES}
${GEN_DIGEST_SOURCES}
${MYSYS_LIBWRAP_SOURCE}
vtmd.cc
)
IF (CMAKE_SYSTEM_NAME MATCHES "Linux" OR

View File

@ -403,6 +403,7 @@ enum enum_alter_inplace_result {
#define HA_CREATE_TMP_ALTER 8U
#define HA_LEX_CREATE_SEQUENCE 16U
#define HA_VERSIONED_TABLE 32U
#define HA_VTMD 64U
#define HA_MAX_REC_LENGTH 65535
@ -1840,6 +1841,11 @@ struct Table_scope_and_contents_source_st
{
return options & HA_VERSIONED_TABLE;
}
bool vtmd() const
{
return options & HA_VTMD;
}
};

View File

@ -7547,5 +7547,8 @@ ER_VERS_WRONG_QUERY
WARN_VERS_ALIAS_TOO_LONG
eng "Auto generated alias for `%s.%s` is too long; using `%s`."
ER_VERS_VTMD_ERROR
eng "VTMD error: %s"
ER_WRONG_TABLESPACE_NAME 42000
eng "Incorrect tablespace name `%-.192s`"

View File

@ -126,6 +126,22 @@ public:
enum enum_enable_or_disable { LEAVE_AS_IS, ENABLE, DISABLE };
bool vers_data_modifying() const
{
return flags & (
ALTER_ADD_COLUMN |
ALTER_DROP_COLUMN |
ALTER_CHANGE_COLUMN |
ALTER_DROP_PARTITION |
ALTER_COALESCE_PARTITION |
ALTER_REORGANIZE_PARTITION |
ALTER_TABLE_REORG |
ALTER_REMOVE_PARTITIONING |
ALTER_EXCHANGE_PARTITION |
ALTER_TRUNCATE_PARTITION |
ALTER_COLUMN_ORDER);
}
/**
The different values of the ALGORITHM clause.
Describes which algorithm to use when altering the table.

View File

@ -1171,7 +1171,7 @@ public:
void copy_non_errors_from_wi(THD *thd, const Warning_info *src_wi);
private:
protected:
Warning_info *get_warning_info() { return m_wi_stack.front(); }
const Warning_info *get_warning_info() const { return m_wi_stack.front(); }

View File

@ -7029,6 +7029,19 @@ int set_statement_var_if_exists(THD *thd, const char *var_name,
}
Query_tables_backup::Query_tables_backup(THD* _thd) :
thd(_thd)
{
thd->lex->reset_n_backup_query_tables_list(&backup);
}
Query_tables_backup::~Query_tables_backup()
{
thd->lex->restore_backup_query_tables_list(&backup);
}
bool LEX::sp_add_cfetch(THD *thd, const LEX_STRING &name)
{
uint offset;

View File

@ -1914,6 +1914,18 @@ private:
};
class Query_tables_backup
{
THD *thd;
Query_tables_list backup;
public:
Query_tables_backup(THD *_thd);
~Query_tables_backup();
const Query_tables_list& get() const { return backup; }
};
/*
st_parsing_options contains the flags for constructions that are
allowed in the current statement.

View File

@ -30,6 +30,7 @@
#include "sql_base.h" // tdc_remove_table, lock_table_names,
#include "sql_handler.h" // mysql_ha_rm_tables
#include "sql_statistics.h"
#include "vtmd.h"
static TABLE_LIST *rename_tables(THD *thd, TABLE_LIST *table_list,
bool skip_error);
@ -298,12 +299,23 @@ do_rename(THD *thd, TABLE_LIST *ren_table, char *new_db, char *new_table_name,
LEX_STRING new_db_name= { (char*)new_db, strlen(new_db)};
(void) rename_table_in_stat_tables(thd, &db_name, &table_name,
&new_db_name, &new_table);
if ((rc= Table_triggers_list::change_table_name(thd, ren_table->db,
old_alias,
ren_table->table_name,
new_db,
new_alias)))
VTMD_rename vtmd(*ren_table);
if (thd->variables.vers_ddl_survival)
{
rc= vtmd.try_rename(thd, new_db_name, new_table);
if (rc)
goto revert_table_name;
}
rc= Table_triggers_list::change_table_name(thd, ren_table->db,
old_alias,
ren_table->table_name,
new_db,
new_alias);
if (rc)
{
if (thd->variables.vers_ddl_survival)
vtmd.revert_rename(thd, new_db_name);
revert_table_name:
/*
We've succeeded in renaming table's .frm and in updating
corresponding handler data, but have failed to update table's

View File

@ -56,6 +56,7 @@
#include "sql_audit.h"
#include "sql_sequence.h"
#include "tztime.h"
#include "vtmd.h" // System Versioning
#ifdef __WIN__
@ -2276,6 +2277,7 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
char *db=table->db;
size_t db_length= table->db_length;
handlerton *table_type= 0;
VTMD_drop vtmd(*table);
DBUG_PRINT("table", ("table_l: '%s'.'%s' table: 0x%lx s: 0x%lx",
table->db, table->table_name, (long) table->table,
@ -2474,8 +2476,24 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
// Remove extension for delete
*(end= path + path_length - reg_ext_length)= '\0';
error= ha_delete_table(thd, table_type, path, db, table->table_name,
!dont_log_query);
if (thd->lex->sql_command == SQLCOM_DROP_TABLE &&
thd->variables.vers_ddl_survival &&
table_type && table_type != view_pseudo_hton)
{
error= vtmd.check_exists(thd);
if (error)
goto non_tmp_err;
if (!vtmd.exists)
goto drop_table;
error= mysql_rename_table(table_type, table->db, table->table_name,
table->db, vtmd.archive_name(thd), NO_FK_CHECKS);
}
else
{
drop_table:
error= ha_delete_table(thd, table_type, path, db, table->table_name,
!dont_log_query);
}
if (!error)
{
@ -2510,8 +2528,18 @@ int mysql_rm_table_no_locks(THD *thd, TABLE_LIST *tables, bool if_exists,
else if (frm_delete_error && if_exists)
thd->clear_error();
}
non_tmp_err:
non_tmp_error|= MY_TEST(error);
}
if (!error && vtmd.exists)
{
error= vtmd.update(thd);
if (error)
mysql_rename_table(table_type, table->db, vtmd.archive_name(),
table->db, table->table_name, NO_FK_CHECKS);
}
if (error)
{
if (wrong_tables.length())
@ -5040,6 +5068,7 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
{
thd->locked_tables_list.unlink_all_closed_tables(thd, NULL, 0);
result= 1;
goto err;
}
else
{
@ -5048,6 +5077,16 @@ bool mysql_create_table(THD *thd, TABLE_LIST *create_table,
}
}
if (create_info->versioned() && thd->variables.vers_ddl_survival)
{
VTMD_table vtmd(*create_table);
if (vtmd.update(thd))
{
result= 1;
goto err;
}
}
err:
/* In RBR we don't need to log CREATE TEMPORARY TABLE */
if (thd->is_current_stmt_binlog_format_row() && create_info->tmp_table())
@ -5155,15 +5194,6 @@ static void make_unique_constraint_name(THD *thd, LEX_STRING *name,
** Alter a table definition
****************************************************************************/
static void vers_table_name_date(THD *thd, const char *table_name,
char *new_name, size_t new_name_size)
{
const MYSQL_TIME now= thd->query_start_TIME();
my_snprintf(new_name, new_name_size, "%s_%04d%02d%02d_%02d%02d%02d_%06d",
table_name, now.year, now.month, now.day, now.hour, now.minute,
now.second, now.second_part);
}
bool operator!=(const MYSQL_TIME &lhs, const MYSQL_TIME &rhs)
{
return lhs.year != rhs.year || lhs.month != rhs.month || lhs.day != rhs.day ||
@ -5466,6 +5496,7 @@ bool mysql_create_like_table(THD* thd, TABLE_LIST* table,
/* Replace type of source table with one specified in the statement. */
local_create_info.options&= ~HA_LEX_CREATE_TMP_TABLE;
local_create_info.options|= create_info->tmp_table();
local_create_info.options|= create_info->options;
/* Reset auto-increment counter for the new table. */
local_create_info.auto_increment_value= 0;
/*
@ -8529,18 +8560,27 @@ simple_rename_or_index_change(THD *thd, TABLE_LIST *table_list,
if (mysql_rename_table(old_db_type, alter_ctx->db, alter_ctx->table_name,
alter_ctx->new_db, alter_ctx->new_alias, 0))
error= -1;
else if (Table_triggers_list::change_table_name(thd,
alter_ctx->db,
alter_ctx->alias,
alter_ctx->table_name,
alter_ctx->new_db,
alter_ctx->new_alias))
else
{
(void) mysql_rename_table(old_db_type,
alter_ctx->new_db, alter_ctx->new_alias,
alter_ctx->db, alter_ctx->table_name,
NO_FK_CHECKS);
error= -1;
VTMD_rename vtmd(*table_list);
if (thd->variables.vers_ddl_survival && vtmd.try_rename(thd, new_db_name, new_table_name))
goto revert_table_name;
else if (Table_triggers_list::change_table_name(thd,
alter_ctx->db,
alter_ctx->alias,
alter_ctx->table_name,
alter_ctx->new_db,
alter_ctx->new_alias))
{
if (thd->variables.vers_ddl_survival)
vtmd.revert_rename(thd, new_db_name);
revert_table_name:
(void) mysql_rename_table(old_db_type,
alter_ctx->new_db, alter_ctx->new_alias,
alter_ctx->db, alter_ctx->table_name,
NO_FK_CHECKS);
error= -1;
}
}
}
@ -8672,7 +8712,11 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
&alter_prelocking_strategy);
thd->open_options&= ~HA_OPEN_FOR_ALTER;
bool versioned= table_list->table && table_list->table->versioned();
if (versioned && thd->variables.vers_ddl_survival)
bool vers_data_mod= versioned &&
thd->variables.vers_ddl_survival &&
alter_info->vers_data_modifying();
if (vers_data_mod)
{
table_list->set_lock_type(thd, TL_WRITE);
if (thd->mdl_context.upgrade_shared_lock(table_list->table->mdl_ticket,
@ -9003,7 +9047,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
Upgrade from MDL_SHARED_UPGRADABLE to MDL_SHARED_NO_WRITE.
Afterwards it's safe to take the table level lock.
*/
if ((!(versioned && thd->variables.vers_ddl_survival) &&
if ((!vers_data_mod &&
thd->mdl_context.upgrade_shared_lock(
mdl_ticket, MDL_SHARED_NO_WRITE,
thd->variables.lock_wait_timeout)) ||
@ -9435,8 +9479,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
alter_info->keys_onoff,
&alter_ctx))
{
if (table->versioned_by_sql() && new_versioned &&
thd->variables.vers_ddl_survival)
if (vers_data_mod && new_versioned && table->versioned_by_sql())
{
// Failure of this function may result in corruption of an original table.
vers_reset_alter_copy(thd, table);
@ -9538,8 +9581,8 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
anything goes wrong while renaming the new table.
*/
char backup_name[FN_LEN];
if (versioned && thd->variables.vers_ddl_survival)
vers_table_name_date(thd, alter_ctx.table_name, backup_name,
if (vers_data_mod)
VTMD_table::archive_name(thd, alter_ctx.table_name, backup_name,
sizeof(backup_name));
else
my_snprintf(backup_name, sizeof(backup_name), "%s2-%lx-%lx",
@ -9572,6 +9615,17 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
goto err_with_mdl;
}
if (vers_data_mod && new_versioned)
{
DBUG_ASSERT(alter_info && table_list);
VTMD_rename vtmd(*table_list);
bool rc= alter_info->flags & Alter_info::ALTER_RENAME ?
vtmd.try_rename(thd, alter_ctx.new_db, alter_ctx.new_alias, backup_name) :
vtmd.update(thd, backup_name);
if (rc)
goto err_after_rename;
}
// Check if we renamed the table and if so update trigger files.
if (alter_ctx.is_table_renamed())
{
@ -9582,6 +9636,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
alter_ctx.new_db,
alter_ctx.new_alias))
{
err_after_rename:
// Rename succeeded, delete the new table.
(void) quick_rm_table(thd, new_db_type,
alter_ctx.new_db, alter_ctx.new_alias, 0);
@ -9596,7 +9651,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
}
// ALTER TABLE succeeded, delete the backup of the old table.
if (!(versioned && new_versioned && thd->variables.vers_ddl_survival) &&
if (!(vers_data_mod && new_versioned) &&
quick_rm_table(thd, old_db_type, alter_ctx.db, backup_name, FN_IS_TMP))
{
/*

View File

@ -1201,6 +1201,8 @@ int TABLE_SHARE::init_from_binary_frm_image(THD *thd, bool write,
uint ext_key_parts= 0;
plugin_ref se_plugin= 0;
const uchar *system_period= 0;
bool vtmd_used= false;
share->vtmd= false;
const uchar *extra2_field_flags= 0;
size_t extra2_field_flags_length= 0;
@ -1239,7 +1241,7 @@ int TABLE_SHARE::init_from_binary_frm_image(THD *thd, bool write,
if (*extra2 != '/') // old frm had '/' there
{
const uchar *e2end= extra2 + len;
while (extra2 + 3 < e2end)
while (extra2 + 3 <= e2end)
{
uchar type= *extra2++;
size_t length= *extra2++;
@ -1308,6 +1310,14 @@ int TABLE_SHARE::init_from_binary_frm_image(THD *thd, bool write,
extra2_field_flags= extra2;
extra2_field_flags_length= length;
break;
case EXTRA2_VTMD:
if (vtmd_used)
goto err;
share->vtmd= *extra2;
if (share->vtmd)
share->table_category= TABLE_CATEGORY_LOG;
vtmd_used= true;
break;
default:
/* abort frm parsing if it's an unknown but important extra2 value */
if (type >= EXTRA2_ENGINE_IMPORTANT)
@ -7642,6 +7652,27 @@ void TABLE::vers_update_fields()
DBUG_VOID_RETURN;
}
bool TABLE_LIST::vers_vtmd_name(String& out) const
{
static const char *vtmd_suffix= "_vtmd";
static const size_t vtmd_suffix_len= strlen(vtmd_suffix);
if (table_name_length > NAME_CHAR_LEN - vtmd_suffix_len)
{
my_printf_error(ER_VERS_VTMD_ERROR, "Table name is longer than %d characters", MYF(0), int(NAME_CHAR_LEN - vtmd_suffix_len));
return true;
}
out.set(table_name, table_name_length, table_alias_charset);
if (out.append(vtmd_suffix, vtmd_suffix_len + 1))
{
my_message(ER_VERS_VTMD_ERROR, "Failed allocate VTMD name", MYF(0));
return true;
}
out.length(out.length() - 1);
return false;
}
/**
Reset markers that fields are being updated
*/

View File

@ -751,6 +751,7 @@ struct TABLE_SHARE
*/
bool versioned;
bool vtmd;
uint16 row_start_field;
uint16 row_end_field;
uint32 hist_part_id;
@ -2354,6 +2355,7 @@ struct TABLE_LIST
/* System Versioning */
vers_select_conds_t vers_conditions;
bool vers_vtmd_name(String &out) const;
/**
@brief

View File

@ -268,6 +268,11 @@ LEX_CUSTRING build_frm_image(THD *thd, const char *table,
extra2_size+= 1 + 1 + 2 * sizeof(uint16);
}
if (create_info->vtmd())
{
extra2_size+= 1 + 1 + 1;
}
bool has_extra2_field_flags_= has_extra2_field_flags(create_fields);
if (has_extra2_field_flags_)
{
@ -340,6 +345,13 @@ LEX_CUSTRING build_frm_image(THD *thd, const char *table,
pos+= sizeof(uint16);
}
if (create_info->vtmd())
{
*pos++= EXTRA2_VTMD;
*pos++= 1;
*pos++= 1;
}
if (has_extra2_field_flags_)
{
*pos++= EXTRA2_FIELD_FLAGS;

View File

@ -174,6 +174,7 @@ enum extra2_frm_value_type {
EXTRA2_GIS=2,
EXTRA2_PERIOD_FOR_SYSTEM_TIME=4,
EXTRA2_FIELD_FLAGS=8,
EXTRA2_VTMD=16,
#define EXTRA2_ENGINE_IMPORTANT 128

165
sql/vers_utils.h Normal file
View File

@ -0,0 +1,165 @@
#ifndef VERS_UTILS_INCLUDED
#define VERS_UTILS_INCLUDED
#include "table.h"
#include "sql_class.h"
class MDL_auto_lock
{
THD *thd;
TABLE_LIST &table;
bool error;
public:
MDL_auto_lock(THD *_thd, TABLE_LIST &_table) :
thd(_thd), table(_table)
{
DBUG_ASSERT(thd);
table.mdl_request.init(MDL_key::TABLE, table.db, table.table_name, MDL_EXCLUSIVE, MDL_EXPLICIT);
error= thd->mdl_context.acquire_lock(&table.mdl_request, thd->variables.lock_wait_timeout);
}
~MDL_auto_lock()
{
if (!error)
{
DBUG_ASSERT(table.mdl_request.ticket);
thd->mdl_context.release_lock(table.mdl_request.ticket);
table.mdl_request.ticket= NULL;
}
}
bool acquire_error() const { return error; }
};
struct Compare_strncmp
{
int operator()(const LEX_STRING& a, const LEX_STRING& b) const
{
return strncmp(a.str, b.str, a.length);
}
static CHARSET_INFO* charset()
{
return system_charset_info;
}
};
template <CHARSET_INFO* &CS= system_charset_info>
struct Compare_my_strcasecmp
{
int operator()(const LEX_STRING& a, const LEX_STRING& b) const
{
DBUG_ASSERT(a.str[a.length] == 0 && b.str[b.length] == 0);
return my_strcasecmp(CS, a.str, b.str);
}
static CHARSET_INFO* charset()
{
return CS;
}
};
typedef Compare_my_strcasecmp<files_charset_info> Compare_fs;
typedef Compare_my_strcasecmp<table_alias_charset> Compare_t;
struct LEX_STRING_u : public LEX_STRING
{
LEX_STRING_u()
{
str= NULL;
LEX_STRING::length= 0;
}
LEX_STRING_u(const char *_str, uint32 _len, CHARSET_INFO *)
{
str= const_cast<char *>(_str);
LEX_STRING::length= _len;
}
uint32 length() const
{
return LEX_STRING::length;
}
const char *ptr() const
{
return LEX_STRING::str;
}
const LEX_STRING& lex_string() const
{
return *this;
}
};
template <class Compare= Compare_strncmp, class Storage= LEX_STRING_u>
struct XString : public Storage
{
public:
XString() {}
XString(char *_str, size_t _len) :
Storage(_str, _len, Compare::charset())
{
}
XString(LEX_STRING& src) :
Storage(src.str, src.length, Compare::charset())
{
}
XString(char *_str) :
Storage(_str, strlen(_str), Compare::charset())
{
}
bool operator== (const XString& b) const
{
return Storage::length() == b.length() && 0 == Compare()(this->lex_string(), b.lex_string());
}
bool operator!= (const XString& b) const
{
return !(*this == b);
}
operator const char* () const
{
return Storage::ptr();
}
};
typedef XString<> LString;
typedef XString<Compare_fs> LString_fs;
typedef XString<Compare_strncmp, String> SString;
typedef XString<Compare_fs, String> SString_fs;
typedef XString<Compare_t, String> SString_t;
#define XSTRING_WITH_LEN(X) (X).ptr(), (X).length()
#define DB_WITH_LEN(X) (X).db, (X).db_length
#define TABLE_NAME_WITH_LEN(X) (X).table_name, (X).table_name_length
class Local_da : public Diagnostics_area
{
THD *thd;
uint sql_error;
Diagnostics_area *saved_da;
public:
Local_da(THD *_thd, uint _sql_error= 0) :
Diagnostics_area(_thd->query_id, false, true),
thd(_thd),
sql_error(_sql_error),
saved_da(_thd->get_stmt_da())
{
thd->set_stmt_da(this);
}
~Local_da()
{
if (saved_da)
finish();
}
void finish()
{
DBUG_ASSERT(saved_da && thd);
thd->set_stmt_da(saved_da);
if (is_error())
my_error(sql_error ? sql_error : sql_errno(), MYF(0), message());
if (warn_count() > error_count())
saved_da->copy_non_errors_from_wi(thd, get_warning_info());
saved_da= NULL;
}
};
#endif // VERS_UTILS_INCLUDED

495
sql/vtmd.cc Normal file
View File

@ -0,0 +1,495 @@
#include "vtmd.h"
#include "sql_base.h"
#include "sql_class.h"
#include "sql_handler.h" // mysql_ha_rm_tables()
#include "sql_table.h"
#include "table_cache.h" // tdc_remove_table()
#include "key.h"
LString VERS_VTMD_TEMPLATE(C_STRING_WITH_LEN("vtmd_template"));
bool
VTMD_table::create(THD *thd)
{
Table_specification_st create_info;
TABLE_LIST src_table, table;
create_info.init(DDL_options_st::OPT_LIKE);
create_info.options|= HA_VTMD;
create_info.alias= vtmd_name;
table.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(vtmd_name),
vtmd_name,
TL_READ);
src_table.init_one_table(
LEX_STRING_WITH_LEN(MYSQL_SCHEMA_NAME),
XSTRING_WITH_LEN(VERS_VTMD_TEMPLATE),
VERS_VTMD_TEMPLATE,
TL_READ);
Query_tables_backup backup(thd);
thd->lex->sql_command= backup.get().sql_command;
thd->lex->add_to_query_tables(&src_table);
MDL_auto_lock mdl_lock(thd, table);
if (mdl_lock.acquire_error())
return true;
Reprepare_observer *reprepare_observer= thd->m_reprepare_observer;
partition_info *work_part_info= thd->work_part_info;
thd->m_reprepare_observer= NULL;
thd->work_part_info= NULL;
bool rc= mysql_create_like_table(thd, &table, &src_table, &create_info);
thd->m_reprepare_observer= reprepare_observer;
thd->work_part_info= work_part_info;
return rc;
}
bool
VTMD_table::find_record(ulonglong sys_trx_end, bool &found)
{
int error;
key_buf_t key;
found= false;
DBUG_ASSERT(vtmd);
if (key.allocate(vtmd->s->max_unique_length))
return true;
DBUG_ASSERT(sys_trx_end);
vtmd->vers_end_field()->set_notnull();
vtmd->vers_end_field()->store(sys_trx_end, true);
key_copy(key, vtmd->record[0], vtmd->key_info + IDX_TRX_END, 0);
error= vtmd->file->ha_index_read_idx_map(vtmd->record[1], IDX_TRX_END,
key,
HA_WHOLE_KEY,
HA_READ_KEY_EXACT);
if (error)
{
if (error == HA_ERR_RECORD_DELETED || error == HA_ERR_KEY_NOT_FOUND)
return false;
vtmd->file->print_error(error, MYF(0));
return true;
}
restore_record(vtmd, record[1]);
found= true;
return false;
}
bool
VTMD_table::update(THD *thd, const char* archive_name)
{
TABLE_LIST vtmd_tl;
bool result= true;
bool close_log= false;
bool found= false;
bool created= false;
int error;
size_t an_len= 0;
Open_tables_backup open_tables_backup;
ulonglong save_thd_options;
{
Local_da local_da(thd, ER_VERS_VTMD_ERROR);
save_thd_options= thd->variables.option_bits;
thd->variables.option_bits&= ~OPTION_BIN_LOG;
if (about.vers_vtmd_name(vtmd_name))
goto quit;
while (true) // max 2 iterations
{
vtmd_tl.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(vtmd_name),
vtmd_name,
TL_WRITE_CONCURRENT_INSERT);
vtmd= open_log_table(thd, &vtmd_tl, &open_tables_backup);
if (vtmd)
break;
if (!created && local_da.is_error() && local_da.sql_errno() == ER_NO_SUCH_TABLE)
{
local_da.reset_diagnostics_area();
if (create(thd))
goto quit;
created= true;
continue;
}
goto quit;
}
close_log= true;
if (!vtmd->versioned())
{
my_message(ER_VERS_VTMD_ERROR, "VTMD is not versioned", MYF(0));
goto quit;
}
if (!created && find_record(ULONGLONG_MAX, found))
goto quit;
if ((error= vtmd->file->extra(HA_EXTRA_MARK_AS_LOG_TABLE)))
{
vtmd->file->print_error(error, MYF(0));
goto quit;
}
/* Honor next number columns if present */
vtmd->next_number_field= vtmd->found_next_number_field;
if (vtmd->s->fields != FIELD_COUNT)
{
my_printf_error(ER_VERS_VTMD_ERROR, "`%s.%s` unexpected fields count: %d", MYF(0),
vtmd->s->db.str, vtmd->s->table_name.str, vtmd->s->fields);
goto quit;
}
if (archive_name)
{
an_len= strlen(archive_name);
vtmd->field[FLD_ARCHIVE_NAME]->store(archive_name, an_len, table_alias_charset);
vtmd->field[FLD_ARCHIVE_NAME]->set_notnull();
}
else
{
vtmd->field[FLD_ARCHIVE_NAME]->set_null();
}
vtmd->field[FLD_COL_RENAMES]->set_null();
if (found)
{
if (thd->lex->sql_command == SQLCOM_CREATE_TABLE)
{
my_printf_error(ER_VERS_VTMD_ERROR, "`%s.%s` exists and not empty!", MYF(0),
vtmd->s->db.str, vtmd->s->table_name.str);
goto quit;
}
vtmd->mark_columns_needed_for_update(); // not needed?
if (archive_name)
{
vtmd->s->versioned= false;
error= vtmd->file->ha_update_row(vtmd->record[1], vtmd->record[0]);
vtmd->s->versioned= true;
if (!error)
{
if (thd->lex->sql_command == SQLCOM_DROP_TABLE)
{
error= vtmd->file->ha_delete_row(vtmd->record[0]);
}
else
{
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_ALTER_TABLE);
ulonglong sys_trx_end= (ulonglong) vtmd->vers_start_field()->val_int();
store_record(vtmd, record[1]);
vtmd->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd->field[FLD_NAME]->set_notnull();
vtmd->field[FLD_ARCHIVE_NAME]->set_null();
error= vtmd->file->ha_update_row(vtmd->record[1], vtmd->record[0]);
if (error)
goto err;
DBUG_ASSERT(an_len);
while (true)
{ // fill archive_name of last sequential renames
bool found;
if (find_record(sys_trx_end, found))
goto quit;
if (!found || !vtmd->field[FLD_ARCHIVE_NAME]->is_null())
break;
store_record(vtmd, record[1]);
vtmd->field[FLD_ARCHIVE_NAME]->store(archive_name, an_len, table_alias_charset);
vtmd->field[FLD_ARCHIVE_NAME]->set_notnull();
vtmd->s->versioned= false;
error= vtmd->file->ha_update_row(vtmd->record[1], vtmd->record[0]);
vtmd->s->versioned= true;
if (error)
goto err;
sys_trx_end= (ulonglong) vtmd->vers_start_field()->val_int();
} // while (true)
} // else (thd->lex->sql_command != SQLCOM_DROP_TABLE)
} // if (!error)
} // if (archive_name)
else
{
vtmd->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd->field[FLD_NAME]->set_notnull();
error= vtmd->file->ha_update_row(vtmd->record[1], vtmd->record[0]);
}
} // if (found)
else
{
vtmd->field[FLD_NAME]->store(TABLE_NAME_WITH_LEN(about), system_charset_info);
vtmd->field[FLD_NAME]->set_notnull();
vtmd->mark_columns_needed_for_insert(); // not needed?
error= vtmd->file->ha_write_row(vtmd->record[0]);
}
if (error)
{
err:
vtmd->file->print_error(error, MYF(0));
goto quit;
}
result= false;
}
quit:
if (close_log)
close_log_table(thd, &open_tables_backup);
thd->variables.option_bits= save_thd_options;
return result;
}
bool
VTMD_rename::move_archives(THD *thd, LString &new_db)
{
TABLE_LIST vtmd_tl;
vtmd_tl.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(vtmd_name),
vtmd_name,
TL_READ);
int error;
bool rc= false;
SString_fs archive;
bool end_keyread= false;
bool index_end= false;
Open_tables_backup open_tables_backup;
key_buf_t key;
vtmd= open_log_table(thd, &vtmd_tl, &open_tables_backup);
if (!vtmd)
return true;
if (key.allocate(vtmd->key_info[IDX_ARCHIVE_NAME].key_length))
{
close_log_table(thd, &open_tables_backup);
return true;
}
if ((error= vtmd->file->ha_start_keyread(IDX_ARCHIVE_NAME)))
goto err;
end_keyread= true;
if ((error= vtmd->file->ha_index_init(IDX_ARCHIVE_NAME, true)))
goto err;
index_end= true;
error= vtmd->file->ha_index_first(vtmd->record[0]);
while (!error)
{
if (!vtmd->field[FLD_ARCHIVE_NAME]->is_null())
{
vtmd->field[FLD_ARCHIVE_NAME]->val_str(&archive);
key_copy(key,
vtmd->record[0],
&vtmd->key_info[IDX_ARCHIVE_NAME],
vtmd->key_info[IDX_ARCHIVE_NAME].key_length,
false);
error= vtmd->file->ha_index_read_map(
vtmd->record[0],
key,
vtmd->key_info[IDX_ARCHIVE_NAME].ext_key_part_map,
HA_READ_PREFIX_LAST);
if (!error)
{
if ((rc= move_table(thd, archive, new_db)))
break;
error= vtmd->file->ha_index_next(vtmd->record[0]);
}
}
else
{
archive.length(0);
error= vtmd->file->ha_index_next(vtmd->record[0]);
}
}
if (error && error != HA_ERR_END_OF_FILE)
{
err:
vtmd->file->print_error(error, MYF(0));
rc= true;
}
if (index_end)
vtmd->file->ha_index_end();
if (end_keyread)
vtmd->file->ha_end_keyread();
close_log_table(thd, &open_tables_backup);
return rc;
}
bool
VTMD_rename::move_table(THD *thd, SString_fs &table_name, LString &new_db)
{
handlerton *table_hton= NULL;
if (!ha_table_exists(thd, about.db, table_name, &table_hton) || !table_hton)
{
push_warning_printf(
thd, Sql_condition::WARN_LEVEL_WARN,
ER_VERS_VTMD_ERROR,
"`%s.%s` archive doesn't exist",
about.db, table_name.ptr());
return false;
}
if (ha_table_exists(thd, new_db, table_name))
{
my_printf_error(ER_VERS_VTMD_ERROR, "`%s.%s` archive already exists!", MYF(0),
new_db.ptr(), table_name.ptr());
return true;
}
TABLE_LIST tl;
tl.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(table_name),
table_name,
TL_WRITE_ONLY);
tl.mdl_request.set_type(MDL_EXCLUSIVE);
mysql_ha_rm_tables(thd, &tl);
if (lock_table_names(thd, &tl, 0, thd->variables.lock_wait_timeout, 0))
return true;
tdc_remove_table(thd, TDC_RT_REMOVE_ALL, about.db, table_name, false);
bool rc= mysql_rename_table(
table_hton,
about.db, table_name,
new_db, table_name,
NO_FK_CHECKS);
if (!rc)
query_cache_invalidate3(thd, &tl, 0);
return rc;
}
bool
VTMD_rename::try_rename(THD *thd, LString new_db, LString new_alias, const char *archive_name)
{
Local_da local_da(thd, ER_VERS_VTMD_ERROR);
TABLE_LIST new_table;
if (check_exists(thd))
return true;
new_table.init_one_table(
XSTRING_WITH_LEN(new_db),
XSTRING_WITH_LEN(new_alias),
new_alias, TL_READ);
if (new_table.vers_vtmd_name(vtmd_new_name))
return true;
if (ha_table_exists(thd, new_db, vtmd_new_name))
{
if (exists)
{
my_printf_error(ER_VERS_VTMD_ERROR, "`%s.%s` table already exists!", MYF(0),
new_db.ptr(), vtmd_new_name.ptr());
return true;
}
push_warning_printf(
thd, Sql_condition::WARN_LEVEL_WARN,
ER_VERS_VTMD_ERROR,
"`%s.%s` table already exists!",
new_db.ptr(), vtmd_new_name.ptr());
return false;
}
if (!exists)
return false;
bool same_db= true;
if (LString_fs(DB_WITH_LEN(about)) != new_db)
{
// Move archives before VTMD so if the operation is interrupted, it could be continued.
if (move_archives(thd, new_db))
return true;
same_db= false;
}
TABLE_LIST vtmd_tl;
vtmd_tl.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(vtmd_name),
vtmd_name,
TL_WRITE_ONLY);
vtmd_tl.mdl_request.set_type(MDL_EXCLUSIVE);
mysql_ha_rm_tables(thd, &vtmd_tl);
if (lock_table_names(thd, &vtmd_tl, 0, thd->variables.lock_wait_timeout, 0))
return true;
tdc_remove_table(thd, TDC_RT_REMOVE_ALL, about.db, vtmd_name, false);
bool rc= mysql_rename_table(hton,
about.db, vtmd_name,
new_db, vtmd_new_name,
NO_FK_CHECKS);
if (!rc)
{
query_cache_invalidate3(thd, &vtmd_tl, 0);
if (same_db || archive_name || new_alias != LString(TABLE_NAME_WITH_LEN(about)))
{
local_da.finish();
VTMD_table new_vtmd(new_table);
rc= new_vtmd.update(thd, archive_name);
}
}
return rc;
}
bool
VTMD_rename::revert_rename(THD *thd, LString new_db)
{
DBUG_ASSERT(hton);
Local_da local_da(thd, ER_VERS_VTMD_ERROR);
TABLE_LIST vtmd_tl;
vtmd_tl.init_one_table(
DB_WITH_LEN(about),
XSTRING_WITH_LEN(vtmd_new_name),
vtmd_new_name,
TL_WRITE_ONLY);
vtmd_tl.mdl_request.set_type(MDL_EXCLUSIVE);
mysql_ha_rm_tables(thd, &vtmd_tl);
if (lock_table_names(thd, &vtmd_tl, 0, thd->variables.lock_wait_timeout, 0))
return true;
tdc_remove_table(thd, TDC_RT_REMOVE_ALL, new_db, vtmd_new_name, false);
bool rc= mysql_rename_table(
hton,
new_db, vtmd_new_name,
new_db, vtmd_name,
NO_FK_CHECKS);
if (!rc)
query_cache_invalidate3(thd, &vtmd_tl, 0);
return rc;
}
void VTMD_table::archive_name(
THD* thd,
const char* table_name,
char* new_name,
size_t new_name_size)
{
const MYSQL_TIME now= thd->query_start_TIME();
my_snprintf(new_name, new_name_size, "%s_%04d%02d%02d_%02d%02d%02d_%06d",
table_name, now.year, now.month, now.day, now.hour, now.minute,
now.second, now.second_part);
}

175
sql/vtmd.h Normal file
View File

@ -0,0 +1,175 @@
#ifndef VTMD_INCLUDED
#define VTMD_INCLUDED
#include "table.h"
#include "unireg.h"
#include <mysqld_error.h>
#include "my_sys.h"
#include "vers_utils.h"
class key_buf_t
{
uchar* buf;
key_buf_t(const key_buf_t&); // disabled
key_buf_t& operator= (const key_buf_t&); // disabled
public:
key_buf_t() : buf(NULL)
{}
~key_buf_t()
{
if (buf)
my_free(buf);
}
bool allocate(size_t alloc_size)
{
DBUG_ASSERT(!buf);
buf= static_cast<uchar *>(my_malloc(alloc_size, MYF(0)));
if (!buf)
{
my_message(ER_VERS_VTMD_ERROR, "failed to allocate key buffer", MYF(0));
return true;
}
return false;
}
operator uchar* ()
{
DBUG_ASSERT(buf);
return reinterpret_cast<uchar *>(buf);
}
};
class THD;
class VTMD_table
{
protected:
TABLE *vtmd;
const TABLE_LIST &about;
SString_t vtmd_name;
private:
VTMD_table(const VTMD_table&); // prohibit copying references
public:
enum {
FLD_START= 0,
FLD_END,
FLD_NAME,
FLD_ARCHIVE_NAME,
FLD_COL_RENAMES,
FIELD_COUNT
};
enum {
IDX_TRX_END= 0,
IDX_ARCHIVE_NAME
};
VTMD_table(TABLE_LIST &_about) :
vtmd(NULL),
about(_about)
{}
bool create(THD *thd);
bool find_record(ulonglong sys_trx_end, bool &found);
bool update(THD *thd, const char* archive_name= NULL);
static void archive_name(THD *thd, const char *table_name, char *new_name, size_t new_name_size);
void archive_name(THD *thd, char *new_name, size_t new_name_size)
{
archive_name(thd, about.table_name, new_name, new_name_size);
}
};
class VTMD_exists : public VTMD_table
{
protected:
handlerton *hton;
public:
bool exists;
public:
VTMD_exists(TABLE_LIST &_about) :
VTMD_table(_about),
hton(NULL),
exists(false)
{}
bool check_exists(THD *thd); // returns error status
};
class VTMD_rename : public VTMD_exists
{
SString_t vtmd_new_name;
public:
VTMD_rename(TABLE_LIST &_about) :
VTMD_exists(_about)
{}
bool try_rename(THD *thd, LString new_db, LString new_alias, const char* archive_name= NULL);
bool revert_rename(THD *thd, LString new_db);
private:
bool move_archives(THD *thd, LString &new_db);
bool move_table(THD *thd, SString_fs &table_name, LString &new_db);
};
class VTMD_drop : public VTMD_exists
{
char archive_name_[NAME_CHAR_LEN];
public:
VTMD_drop(TABLE_LIST &_about) :
VTMD_exists(_about)
{
*archive_name_= 0;
}
const char* archive_name(THD *thd)
{
VTMD_table::archive_name(thd, archive_name_, sizeof(archive_name_));
return archive_name_;
}
const char* archive_name() const
{
DBUG_ASSERT(*archive_name_);
return archive_name_;
}
bool update(THD *thd)
{
DBUG_ASSERT(*archive_name_);
return VTMD_exists::update(thd, archive_name_);
}
};
inline
bool
VTMD_exists::check_exists(THD *thd)
{
if (about.vers_vtmd_name(vtmd_name))
return true;
exists= ha_table_exists(thd, about.db, vtmd_name, &hton);
if (exists && !hton)
{
my_printf_error(ER_VERS_VTMD_ERROR, "`%s.%s` handlerton empty!", MYF(0),
about.db, vtmd_name.ptr());
return true;
}
return false;
}
#endif // VTMD_INCLUDED

View File

@ -8717,11 +8717,11 @@ no_commit:
innobase_srv_conc_enter_innodb(m_prebuilt);
vers_set_fields = table->versioned() && (
!is_innopart() &&
sql_command != SQLCOM_CREATE_TABLE) ?
ROW_INS_VERSIONED :
ROW_INS_NORMAL;
vers_set_fields = (table->versioned() && !is_innopart() &&
(sql_command != SQLCOM_CREATE_TABLE || table->s->vtmd))
?
ROW_INS_VERSIONED :
ROW_INS_NORMAL;
/* Step-5: Execute insert graph that will result in actual insert. */
error = row_insert_for_mysql((byte*) record, m_prebuilt, vers_set_fields);
@ -9500,7 +9500,8 @@ ha_innobase::update_row(
upd_t* uvect = row_get_prebuilt_update_vector(m_prebuilt);
ib_uint64_t autoinc;
bool vers_set_fields;
bool vers_set_fields = false;
bool vers_ins_row = false;
/* Build an update vector from the modified fields in the rows
(uses m_upd_buf of the handle) */
@ -9526,12 +9527,23 @@ ha_innobase::update_row(
innobase_srv_conc_enter_innodb(m_prebuilt);
vers_set_fields = m_prebuilt->upd_node->versioned && !is_innopart();
if (!table->versioned())
m_prebuilt->upd_node->versioned = false;
if (m_prebuilt->upd_node->versioned && !is_innopart()) {
vers_set_fields = true;
if (thd_sql_command(m_user_thd) == SQLCOM_ALTER_TABLE && !table->s->vtmd)
{
m_prebuilt->upd_node->vers_delete = true;
} else {
m_prebuilt->upd_node->vers_delete = false;
vers_ins_row = true;
}
}
error = row_update_for_mysql((byte*) old_row, m_prebuilt, vers_set_fields);
if (error == DB_SUCCESS && vers_set_fields &&
thd_sql_command(m_user_thd) != SQLCOM_ALTER_TABLE) {
if (error == DB_SUCCESS && vers_ins_row) {
if (trx->id != static_cast<trx_id_t>(table->vers_start_field()->val_int()))
error = row_insert_for_mysql((byte*) old_row, m_prebuilt, ROW_INS_HISTORICAL);
}

View File

@ -1875,6 +1875,7 @@ row_update_for_mysql_using_upd_graph(
upd_cascade_t* new_upd_nodes;
upd_cascade_t* processed_cascades;
bool got_s_lock = false;
bool vers_delete = prebuilt->upd_node->vers_delete;
DBUG_ENTER("row_update_for_mysql_using_upd_graph");
@ -2004,8 +2005,7 @@ run_again:
upd_field_t* ufield;
dict_col_t* col;
unsigned col_idx;
if (node->is_delete ||
thd_sql_command(trx->mysql_thd) == SQLCOM_ALTER_TABLE) {
if (node->is_delete || vers_delete) {
ufield = &uvect->fields[0];
uvect->n_fields = 0;
node->is_delete = false;