MDEV-13103 Deal with page_compressed page corruption

fil_page_decompress(): Replaces fil_decompress_page().
Allow the caller detect errors. Remove
duplicated code. Use the "safe" instead of "fast" variants of
decompression routines.

fil_page_compress(): Replaces fil_compress_page().
The length of the input buffer always was srv_page_size (innodb_page_size).
Remove printouts, and remove the fil_space_t* parameter.

buf_tmp_buffer_t::reserved: Make private; the accessors acquire()
and release() will use atomic memory access.

buf_pool_reserve_tmp_slot(): Make static. Remove the second parameter.
Do not acquire any mutex. Remove the allocation of the buffers.

buf_tmp_reserve_crypt_buf(), buf_tmp_reserve_compression_buf():
Refactored away from buf_pool_reserve_tmp_slot().

buf_page_decrypt_after_read(): Make static, and simplify the logic.
Use the encryption buffer also for decompressing.

buf_page_io_complete(), buf_dblwr_process(): Check more failures.

fil_space_encrypt(): Simplify the debug checks.

fil_space_t::printed_compression_failure: Remove.

fil_get_compression_alg_name(): Remove.

fil_iterate(): Allocate a buffer for compression and decompression
only once, instead of allocating and freeing it for every page
that uses compression, during IMPORT TABLESPACE.

fil_node_get_space_id(), fil_page_is_index_page(),
fil_page_is_lzo_compressed(): Remove (unused code).
This commit is contained in:
Marko Mäkelä 2018-06-13 16:15:21 +03:00
parent 72005b7a1c
commit f5eb37129f
28 changed files with 945 additions and 2001 deletions

View File

@ -1,4 +1,3 @@
call mtr.add_suppression("InnoDB: Compression failed for space [0-9]+ name test/innodb_page_compressed[0-9] len [0-9]+ err 2 write_size [0-9]+.");
set global innodb_file_format = `Barracuda`; set global innodb_file_format = `Barracuda`;
set global innodb_file_per_table = on; set global innodb_file_per_table = on;
create table innodb_normal (c1 int not null auto_increment primary key, b char(200)) engine=innodb; create table innodb_normal (c1 int not null auto_increment primary key, b char(200)) engine=innodb;

View File

@ -1,4 +1,3 @@
call mtr.add_suppression("InnoDB: Compression failed for space [0-9]+ name test/innodb_page_compressed[0-9] len [0-9]+ err 2 write_size [0-9]+.");
set global innodb_compression_algorithm = snappy; set global innodb_compression_algorithm = snappy;
set global innodb_file_format = `Barracuda`; set global innodb_file_format = `Barracuda`;
set global innodb_file_per_table = on; set global innodb_file_per_table = on;

View File

@ -1,8 +1,6 @@
--source include/have_innodb.inc --source include/have_innodb.inc
--source include/not_embedded.inc --source include/not_embedded.inc
call mtr.add_suppression("InnoDB: Compression failed for space [0-9]+ name test/innodb_page_compressed[0-9] len [0-9]+ err 2 write_size [0-9]+.");
# All page compression test use the same # All page compression test use the same
--source include/innodb-page-compression.inc --source include/innodb-page-compression.inc

View File

@ -2,8 +2,6 @@
-- source include/have_innodb_snappy.inc -- source include/have_innodb_snappy.inc
--source include/not_embedded.inc --source include/not_embedded.inc
call mtr.add_suppression("InnoDB: Compression failed for space [0-9]+ name test/innodb_page_compressed[0-9] len [0-9]+ err 2 write_size [0-9]+.");
# snappy # snappy
set global innodb_compression_algorithm = snappy; set global innodb_compression_algorithm = snappy;

View File

@ -354,14 +354,142 @@ on the io_type */
? (counter##_READ) \ ? (counter##_READ) \
: (counter##_WRITTEN)) : (counter##_WRITTEN))
/** Reserve a buffer slot for encryption, decryption or page compression.
@param[in,out] buf_pool buffer pool
@return reserved buffer slot */
static buf_tmp_buffer_t* buf_pool_reserve_tmp_slot(buf_pool_t* buf_pool)
{
for (ulint i = 0; i < buf_pool->tmp_arr->n_slots; i++) {
buf_tmp_buffer_t* slot = &buf_pool->tmp_arr->slots[i];
if (slot->acquire()) {
return slot;
}
}
/* We assume that free slot is found */
ut_error;
return NULL;
}
/** Reserve a buffer for encryption, decryption or decompression.
@param[in,out] slot reserved slot */
static void buf_tmp_reserve_crypt_buf(buf_tmp_buffer_t* slot)
{
if (!slot->crypt_buf) {
slot->crypt_buf = static_cast<byte*>(
aligned_malloc(srv_page_size, srv_page_size));
}
}
/** Reserve a buffer for compression.
@param[in,out] slot reserved slot */
static void buf_tmp_reserve_compression_buf(buf_tmp_buffer_t* slot)
{
if (!slot->comp_buf) {
/* Both snappy and lzo compression methods require that
output buffer used for compression is bigger than input
buffer. Increase the allocated buffer size accordingly. */
ulint size = srv_page_size;
#ifdef HAVE_LZO
size += LZO1X_1_15_MEM_COMPRESS;
#elif defined HAVE_SNAPPY
size = snappy_max_compressed_length(size);
#endif
slot->comp_buf = static_cast<byte*>(
aligned_malloc(size, srv_page_size));
}
}
/** Decrypt a page. /** Decrypt a page.
@param[in,out] bpage Page control block @param[in,out] bpage Page control block
@param[in,out] space tablespace @param[in,out] space tablespace
@return whether the operation was successful */ @return whether the operation was successful */
static static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
bool {
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space) ut_ad(space->n_pending_ios > 0);
MY_ATTRIBUTE((nonnull)); ut_ad(space->id == bpage->space);
byte* dst_frame = bpage->zip.data ? bpage->zip.data :
((buf_block_t*) bpage)->frame;
bool page_compressed = fil_page_is_compressed(dst_frame);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
if (bpage->offset == 0) {
/* File header pages are not encrypted/compressed */
return true;
}
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
buf_tmp_buffer_t* slot;
if (page_compressed) {
/* the page we read is unencrypted */
/* Find free slot from temporary memory array */
decompress:
slot = buf_pool_reserve_tmp_slot(buf_pool);
/* For decompression, use crypt_buf. */
buf_tmp_reserve_crypt_buf(slot);
decompress_with_slot:
ut_d(fil_page_type_validate(dst_frame));
bpage->write_size = fil_page_decompress(slot->crypt_buf,
dst_frame);
slot->release();
ut_ad(!bpage->write_size || fil_page_type_validate(dst_frame));
ut_ad(space->n_pending_ios > 0);
return bpage->write_size != 0;
}
if (space->crypt_data
&& mach_read_from_4(FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ dst_frame)) {
/* Verify encryption checksum before we even try to
decrypt. */
if (!fil_space_verify_crypt_checksum(
dst_frame, buf_page_get_zip_size(bpage), NULL,
bpage->offset)) {
decrypt_failed:
/* Mark page encrypted in case it should be. */
if (space->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED) {
bpage->encrypted = true;
}
return false;
}
/* Find free slot from temporary memory array */
slot = buf_pool_reserve_tmp_slot(buf_pool);
buf_tmp_reserve_crypt_buf(slot);
ut_d(fil_page_type_validate(dst_frame));
/* decrypt using crypt_buf to dst_frame */
if (!fil_space_decrypt(space, slot->crypt_buf,
dst_frame, &bpage->encrypted)) {
slot->release();
goto decrypt_failed;
}
ut_d(fil_page_type_validate(dst_frame));
if (fil_page_is_compressed_encrypted(dst_frame)) {
goto decompress_with_slot;
}
slot->release();
} else if (fil_page_is_compressed_encrypted(dst_frame)) {
goto decompress;
}
ut_ad(space->n_pending_ios > 0);
return true;
}
/********************************************************************//** /********************************************************************//**
Mark a table with the specified space pointed by bpage->space corrupted. Mark a table with the specified space pointed by bpage->space corrupted.
@ -4784,7 +4912,6 @@ buf_page_io_complete(buf_page_t* bpage, bool evict)
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage); buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
const ibool uncompressed = (buf_page_get_state(bpage) const ibool uncompressed = (buf_page_get_state(bpage)
== BUF_BLOCK_FILE_PAGE); == BUF_BLOCK_FILE_PAGE);
byte* frame = NULL;
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
ut_a(buf_page_in_file(bpage)); ut_a(buf_page_in_file(bpage));
@ -4802,19 +4929,18 @@ buf_page_io_complete(buf_page_t* bpage, bool evict)
ulint read_page_no = 0; ulint read_page_no = 0;
ulint read_space_id = 0; ulint read_space_id = 0;
uint key_version = 0; uint key_version = 0;
byte* frame = bpage->zip.data
ut_ad(bpage->zip.data || ((buf_block_t*)bpage)->frame); ? bpage->zip.data
: reinterpret_cast<buf_block_t*>(bpage)->frame;
ut_ad(frame);
fil_space_t* space = fil_space_acquire_for_io(bpage->space); fil_space_t* space = fil_space_acquire_for_io(bpage->space);
if (!space) { if (!space) {
return(DB_TABLESPACE_DELETED); return(DB_TABLESPACE_DELETED);
} }
buf_page_decrypt_after_read(bpage, space); if (!buf_page_decrypt_after_read(bpage, space)) {
err = DB_DECRYPTION_FAILED;
if (buf_page_get_zip_size(bpage)) { goto database_corrupted;
frame = bpage->zip.data;
} else {
frame = ((buf_block_t*) bpage)->frame;
} }
if (buf_page_get_zip_size(bpage)) { if (buf_page_get_zip_size(bpage)) {
@ -4978,7 +5104,7 @@ database_corrupted:
/* io_type == BUF_IO_WRITE */ /* io_type == BUF_IO_WRITE */
if (bpage->slot) { if (bpage->slot) {
/* Mark slot free */ /* Mark slot free */
bpage->slot->reserved = false; bpage->slot->release();
bpage->slot = NULL; bpage->slot = NULL;
} }
} }
@ -6233,66 +6359,6 @@ buf_page_init_for_backup_restore(
} }
#endif /* !UNIV_HOTBACKUP */ #endif /* !UNIV_HOTBACKUP */
/********************************************************************//**
Reserve unused slot from temporary memory array and allocate necessary
temporary memory if not yet allocated.
@return reserved slot */
UNIV_INTERN
buf_tmp_buffer_t*
buf_pool_reserve_tmp_slot(
/*======================*/
buf_pool_t* buf_pool, /*!< in: buffer pool where to
reserve */
bool compressed) /*!< in: is file space compressed */
{
buf_tmp_buffer_t *free_slot=NULL;
/* Array is protected by buf_pool mutex */
buf_pool_mutex_enter(buf_pool);
for(ulint i = 0; i < buf_pool->tmp_arr->n_slots; i++) {
buf_tmp_buffer_t *slot = &buf_pool->tmp_arr->slots[i];
if(slot->reserved == false) {
free_slot = slot;
break;
}
}
/* We assume that free slot is found */
ut_a(free_slot != NULL);
free_slot->reserved = true;
/* Now that we have reserved this slot we can release
buf_pool mutex */
buf_pool_mutex_exit(buf_pool);
/* Allocate temporary memory for encryption/decryption */
if (free_slot->crypt_buf == NULL) {
free_slot->crypt_buf = static_cast<byte*>(aligned_malloc(UNIV_PAGE_SIZE, UNIV_PAGE_SIZE));
memset(free_slot->crypt_buf, 0, UNIV_PAGE_SIZE);
}
/* For page compressed tables allocate temporary memory for
compression/decompression */
if (compressed && free_slot->comp_buf == NULL) {
ulint size = UNIV_PAGE_SIZE;
/* Both snappy and lzo compression methods require that
output buffer used for compression is bigger than input
buffer. Increase the allocated buffer size accordingly. */
#if HAVE_SNAPPY
size = snappy_max_compressed_length(size);
#endif
#if HAVE_LZO
size += LZO1X_1_15_MEM_COMPRESS;
#endif
free_slot->comp_buf = static_cast<byte*>(aligned_malloc(size, UNIV_PAGE_SIZE));
memset(free_slot->comp_buf, 0, size);
}
return (free_slot);
}
/** Encryption and page_compression hook that is called just before /** Encryption and page_compression hook that is called just before
a page is written to disk. a page is written to disk.
@param[in,out] space tablespace @param[in,out] space tablespace
@ -6342,16 +6408,18 @@ buf_page_encrypt_before_write(
} }
ulint zip_size = buf_page_get_zip_size(bpage); ulint zip_size = buf_page_get_zip_size(bpage);
ulint page_size = (zip_size) ? zip_size : UNIV_PAGE_SIZE; ut_ad(!zip_size || !page_compressed);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage); buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
/* Find free slot from temporary memory array */ /* Find free slot from temporary memory array */
buf_tmp_buffer_t* slot = buf_pool_reserve_tmp_slot(buf_pool, page_compressed); buf_tmp_buffer_t* slot = buf_pool_reserve_tmp_slot(buf_pool);
slot->out_buf = NULL; slot->out_buf = NULL;
bpage->slot = slot; bpage->slot = slot;
buf_tmp_reserve_crypt_buf(slot);
byte *dst_frame = slot->crypt_buf; byte *dst_frame = slot->crypt_buf;
if (!page_compressed) { if (!page_compressed) {
not_compressed:
/* Encrypt page content */ /* Encrypt page content */
byte* tmp = fil_space_encrypt(space, byte* tmp = fil_space_encrypt(space,
bpage->offset, bpage->offset,
@ -6359,32 +6427,28 @@ buf_page_encrypt_before_write(
src_frame, src_frame,
dst_frame); dst_frame);
bpage->real_size = page_size; bpage->real_size = UNIV_PAGE_SIZE;
slot->out_buf = dst_frame = tmp; slot->out_buf = dst_frame = tmp;
ut_d(fil_page_type_validate(tmp)); ut_d(fil_page_type_validate(tmp));
} else { } else {
/* First we compress the page content */ /* First we compress the page content */
ulint out_len = 0; buf_tmp_reserve_compression_buf(slot);
byte* tmp = slot->comp_buf;
byte *tmp = fil_compress_page( ulint out_len = fil_page_compress(
space, src_frame, tmp,
(byte *)src_frame,
slot->comp_buf,
page_size,
fsp_flags_get_page_compression_level(space->flags), fsp_flags_get_page_compression_level(space->flags),
fil_space_get_block_size(space, bpage->offset), fil_space_get_block_size(space, bpage->offset),
encrypted, encrypted);
&out_len); if (!out_len) {
goto not_compressed;
}
bpage->real_size = out_len; bpage->real_size = out_len;
#ifdef UNIV_DEBUG ut_d(fil_page_type_validate(tmp));
fil_page_type_validate(tmp);
#endif
if(encrypted) {
if (encrypted) {
/* And then we encrypt the page content */ /* And then we encrypt the page content */
tmp = fil_space_encrypt(space, tmp = fil_space_encrypt(space,
bpage->offset, bpage->offset,
@ -6396,131 +6460,9 @@ buf_page_encrypt_before_write(
slot->out_buf = dst_frame = tmp; slot->out_buf = dst_frame = tmp;
} }
#ifdef UNIV_DEBUG ut_d(fil_page_type_validate(dst_frame));
fil_page_type_validate(dst_frame);
#endif
// return dst_frame which will be written // return dst_frame which will be written
return dst_frame; return dst_frame;
} }
/** Decrypt a page.
@param[in,out] bpage Page control block
@param[in,out] space tablespace
@return whether the operation was successful */
static
bool
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
{
ut_ad(space->n_pending_ios > 0);
ut_ad(space->id == bpage->space);
ulint zip_size = buf_page_get_zip_size(bpage);
ulint size = (zip_size) ? zip_size : UNIV_PAGE_SIZE;
byte* dst_frame = (zip_size) ? bpage->zip.data :
((buf_block_t*) bpage)->frame;
unsigned key_version =
mach_read_from_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
bool page_compressed = fil_page_is_compressed(dst_frame);
bool page_compressed_encrypted = fil_page_is_compressed_encrypted(dst_frame);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
bool success = true;
if (bpage->offset == 0) {
/* File header pages are not encrypted/compressed */
return (true);
}
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
if (!space->crypt_data) {
key_version = 0;
}
if (page_compressed) {
/* the page we read is unencrypted */
/* Find free slot from temporary memory array */
buf_tmp_buffer_t* slot = buf_pool_reserve_tmp_slot(buf_pool, page_compressed);
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
/* decompress using comp_buf to dst_frame */
fil_decompress_page(slot->comp_buf,
dst_frame,
ulong(size),
&bpage->write_size);
/* Mark this slot as free */
slot->reserved = false;
key_version = 0;
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
} else {
buf_tmp_buffer_t* slot = NULL;
if (key_version) {
/* Verify encryption checksum before we even try to
decrypt. */
if (!fil_space_verify_crypt_checksum(dst_frame,
zip_size, NULL, bpage->offset)) {
/* Mark page encrypted in case it should
be. */
if (space->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED) {
bpage->encrypted = true;
}
return (false);
}
/* Find free slot from temporary memory array */
slot = buf_pool_reserve_tmp_slot(buf_pool, page_compressed);
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
/* decrypt using crypt_buf to dst_frame */
if (!fil_space_decrypt(space, slot->crypt_buf,
dst_frame, &bpage->encrypted)) {
success = false;
}
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
}
if (page_compressed_encrypted && success) {
if (!slot) {
slot = buf_pool_reserve_tmp_slot(buf_pool, page_compressed);
}
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
/* decompress using comp_buf to dst_frame */
fil_decompress_page(slot->comp_buf,
dst_frame,
ulong(size),
&bpage->write_size);
ut_d(fil_page_type_validate(dst_frame));
}
/* Mark this slot as free */
if (slot) {
slot->reserved = false;
}
}
ut_ad(space->n_pending_ios > 0);
return (success);
}
#endif /* !UNIV_INNOCHECKSUM */ #endif /* !UNIV_INNOCHECKSUM */

View File

@ -510,10 +510,11 @@ buf_dblwr_process()
"Restoring possible half-written data pages " "Restoring possible half-written data pages "
"from the doublewrite buffer..."); "from the doublewrite buffer...");
unaligned_read_buf = static_cast<byte*>(ut_malloc(2 * UNIV_PAGE_SIZE)); unaligned_read_buf = static_cast<byte*>(ut_malloc(3 * UNIV_PAGE_SIZE));
read_buf = static_cast<byte*>( read_buf = static_cast<byte*>(
ut_align(unaligned_read_buf, UNIV_PAGE_SIZE)); ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));
byte* const buf = read_buf + UNIV_PAGE_SIZE;
for (std::list<byte*>::iterator i = recv_dblwr.pages.begin(); for (std::list<byte*>::iterator i = recv_dblwr.pages.begin();
i != recv_dblwr.pages.end(); ++i, ++page_no_dblwr ) { i != recv_dblwr.pages.end(); ++i, ++page_no_dblwr ) {
@ -562,13 +563,11 @@ buf_dblwr_process()
ignore this page (there should be redo log ignore this page (there should be redo log
records to initialize it). */ records to initialize it). */
} else { } else {
if (fil_page_is_compressed_encrypted(read_buf) ||
fil_page_is_compressed(read_buf)) {
/* Decompress the page before /* Decompress the page before
validating the checksum. */ validating the checksum. */
fil_decompress_page( ulint decomp = fil_page_decompress(buf, read_buf);
NULL, read_buf, srv_page_size, if (!decomp || (decomp != srv_page_size && zip_size)) {
NULL, true); goto bad;
} }
if (fil_space_verify_crypt_checksum( if (fil_space_verify_crypt_checksum(
@ -580,6 +579,7 @@ buf_dblwr_process()
continue; continue;
} }
bad:
/* We intentionally skip this message for /* We intentionally skip this message for
is_all_zero pages. */ is_all_zero pages. */
ib_logf(IB_LOG_LEVEL_INFO, ib_logf(IB_LOG_LEVEL_INFO,
@ -588,18 +588,15 @@ buf_dblwr_process()
space_id, page_no); space_id, page_no);
} }
/* Next, validate the doublewrite page. */ ulint decomp = fil_page_decompress(buf, page);
if (fil_page_is_compressed_encrypted(page) || if (!decomp || (decomp != srv_page_size && zip_size)) {
fil_page_is_compressed(page)) { goto bad_doublewrite;
/* Decompress the page before
validating the checksum. */
fil_decompress_page(
NULL, page, srv_page_size, NULL, true);
} }
if (!fil_space_verify_crypt_checksum(page, zip_size, NULL,
if (!fil_space_verify_crypt_checksum(page, zip_size, NULL, page_no) page_no)
&& buf_page_is_corrupted(true, page, zip_size, space)) { && buf_page_is_corrupted(true, page, zip_size, space)) {
if (!is_all_zero) { if (!is_all_zero) {
bad_doublewrite:
ib_logf(IB_LOG_LEVEL_WARN, ib_logf(IB_LOG_LEVEL_WARN,
"A doublewrite copy of page " "A doublewrite copy of page "
ULINTPF ":" ULINTPF " is corrupted.", ULINTPF ":" ULINTPF " is corrupted.",

View File

@ -708,60 +708,39 @@ fil_space_encrypt(
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
if (tmp) { if (tmp) {
/* Verify that encrypted buffer is not corrupted */ /* Verify that encrypted buffer is not corrupted */
byte* tmp_mem = (byte *)malloc(UNIV_PAGE_SIZE);
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
byte* src = src_frame; byte* src = src_frame;
bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
byte* comp_mem = NULL; byte uncomp_mem[UNIV_PAGE_SIZE_MAX];
byte* uncomp_mem = NULL; byte tmp_mem[UNIV_PAGE_SIZE_MAX];
ulint size = (zip_size) ? zip_size : UNIV_PAGE_SIZE; ulint size = (zip_size) ? zip_size : UNIV_PAGE_SIZE;
if (page_compressed_encrypted) { if (page_compressed_encrypted) {
comp_mem = (byte *)malloc(UNIV_PAGE_SIZE); memcpy(uncomp_mem, src, srv_page_size);
uncomp_mem = (byte *)malloc(UNIV_PAGE_SIZE); ulint unzipped1 = fil_page_decompress(
memcpy(comp_mem, src_frame, UNIV_PAGE_SIZE); tmp_mem, uncomp_mem);
fil_decompress_page(uncomp_mem, comp_mem, ut_ad(unzipped1);
srv_page_size, NULL); if (unzipped1 != srv_page_size) {
src = uncomp_mem; src = uncomp_mem;
} }
}
bool corrupted1 = buf_page_is_corrupted(true, src, zip_size, space); ut_ad(!buf_page_is_corrupted(true, src, zip_size, space));
bool ok = fil_space_decrypt(crypt_data, tmp_mem, size, tmp, &err); ut_ad(fil_space_decrypt(crypt_data, tmp_mem, size, tmp, &err));
ut_ad(err == DB_SUCCESS);
/* Need to decompress the page if it was also compressed */ /* Need to decompress the page if it was also compressed */
if (page_compressed_encrypted) { if (page_compressed_encrypted) {
memcpy(comp_mem, tmp_mem, UNIV_PAGE_SIZE); byte buf[UNIV_PAGE_SIZE_MAX];
fil_decompress_page(tmp_mem, comp_mem, memcpy(buf, tmp_mem, srv_page_size);
srv_page_size, NULL); ulint unzipped2 = fil_page_decompress(tmp_mem, buf);
ut_ad(unzipped2);
} }
bool corrupted = buf_page_is_corrupted(true, tmp_mem, zip_size, space); memcpy(tmp_mem + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
memcpy(tmp_mem+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, src+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8); src + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8);
bool different = memcmp(src, tmp_mem, size); ut_ad(!memcmp(src, tmp_mem, size));
if (!ok || corrupted || corrupted1 || err != DB_SUCCESS || different) {
fprintf(stderr, "ok %d corrupted %d corrupted1 %d err %d different %d\n",
ok , corrupted, corrupted1, err, different);
fprintf(stderr, "src_frame\n");
buf_page_print(src_frame, zip_size);
fprintf(stderr, "encrypted_frame\n");
buf_page_print(tmp, zip_size);
fprintf(stderr, "decrypted_frame\n");
buf_page_print(tmp_mem, zip_size);
ut_ad(0);
} }
free(tmp_mem);
if (comp_mem) {
free(comp_mem);
}
if (uncomp_mem) {
free(uncomp_mem);
}
}
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
return tmp; return tmp;

View File

@ -342,19 +342,6 @@ fil_space_get_by_id(
return(space); return(space);
} }
/****************************************************************//**
Get space id from fil node */
ulint
fil_node_get_space_id(
/*==================*/
fil_node_t* node) /*!< in: Compressed node*/
{
ut_ad(node);
ut_ad(node->space);
return (node->space->id);
}
/*******************************************************************//** /*******************************************************************//**
Returns the table space by a given name, NULL if not found. */ Returns the table space by a given name, NULL if not found. */
UNIV_INLINE UNIV_INLINE

View File

@ -80,73 +80,26 @@ static ulint srv_data_read, srv_data_written;
#include "snappy-c.h" #include "snappy-c.h"
#endif #endif
/* Used for debugging */ /** Compress a page_compressed page before writing to a data file.
//#define UNIV_PAGECOMPRESS_DEBUG 1 @param[in] buf page to be compressed
@param[out] out_buf compressed page
/****************************************************************//** @param[in] level compression level
For page compressed pages compress the page before actual write @param[in] block_size file system block size
operation. @param[in] encrypted whether the page will be subsequently encrypted
@return compressed page to be written*/ @return actual length of compressed page
UNIV_INTERN @retval 0 if the page was not compressed */
byte* UNIV_INTERN ulint fil_page_compress(const byte* buf, byte* out_buf, ulint level,
fil_compress_page( ulint block_size, bool encrypted)
/*==============*/
fil_space_t* space, /*!< in,out: tablespace (NULL during IMPORT) */
byte* buf, /*!< in: buffer from which to write; in aio
this must be appropriately aligned */
byte* out_buf, /*!< out: compressed buffer */
ulint len, /*!< in: length of input buffer.*/
ulint level, /* in: compression level */
ulint block_size, /*!< in: block size */
bool encrypted, /*!< in: is page also encrypted */
ulint* out_len) /*!< out: actual length of compressed
page */
{ {
int err = Z_OK; int comp_level = int(level);
int comp_level = level;
ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE; ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
ulint write_size = 0;
#if HAVE_LZO
lzo_uint write_size_lzo = write_size;
#endif
/* Cache to avoid change during function execution */ /* Cache to avoid change during function execution */
ulint comp_method = innodb_compression_algorithm; ulint comp_method = innodb_compression_algorithm;
bool allocated = false;
/* page_compression does not apply to tables or tablespaces
that use ROW_FORMAT=COMPRESSED */
ut_ad(!space || !FSP_FLAGS_GET_ZIP_SSIZE(space->flags));
if (encrypted) { if (encrypted) {
header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE; header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE;
} }
if (!out_buf) {
allocated = true;
ulint size = UNIV_PAGE_SIZE;
/* Both snappy and lzo compression methods require that
output buffer used for compression is bigger than input
buffer. Increase the allocated buffer size accordingly. */
#if HAVE_SNAPPY
if (comp_method == PAGE_SNAPPY_ALGORITHM) {
size = snappy_max_compressed_length(size);
}
#endif
#if HAVE_LZO
if (comp_method == PAGE_LZO_ALGORITHM) {
size += LZO1X_1_15_MEM_COMPRESS;
}
#endif
out_buf = static_cast<byte *>(ut_malloc(size));
}
ut_ad(buf);
ut_ad(out_buf);
ut_ad(len);
ut_ad(out_len);
/* Let's not compress file space header or /* Let's not compress file space header or
extent descriptor */ extent descriptor */
switch (fil_page_get_type(buf)) { switch (fil_page_get_type(buf)) {
@ -154,8 +107,7 @@ fil_compress_page(
case FIL_PAGE_TYPE_FSP_HDR: case FIL_PAGE_TYPE_FSP_HDR:
case FIL_PAGE_TYPE_XDES: case FIL_PAGE_TYPE_XDES:
case FIL_PAGE_PAGE_COMPRESSED: case FIL_PAGE_PAGE_COMPRESSED:
*out_len = len; return 0;
goto err_exit;
} }
/* If no compression level was provided to this table, use system /* If no compression level was provided to this table, use system
@ -164,204 +116,113 @@ fil_compress_page(
comp_level = page_zip_level; comp_level = page_zip_level;
} }
DBUG_PRINT("compress", ulint write_size = srv_page_size - header_len;
("Preparing for space " ULINTPF " '%s' len " ULINTPF,
space ? space->id : 0,
space ? space->name : "(import)",
len));
write_size = UNIV_PAGE_SIZE - header_len; switch (comp_method) {
default:
switch(comp_method) { ut_ad(!"unknown compression method");
/* fall through */
case PAGE_UNCOMPRESSED:
return 0;
case PAGE_ZLIB_ALGORITHM:
{
ulong len = uLong(write_size);
if (Z_OK == compress2(
out_buf + header_len, &len,
buf, uLong(srv_page_size), comp_level)) {
write_size = len;
goto success;
}
}
break;
#ifdef HAVE_LZ4 #ifdef HAVE_LZ4
case PAGE_LZ4_ALGORITHM: case PAGE_LZ4_ALGORITHM:
# ifdef HAVE_LZ4_COMPRESS_DEFAULT
write_size = LZ4_compress_default(
reinterpret_cast<const char*>(buf),
reinterpret_cast<char*>(out_buf) + header_len,
int(srv_page_size), int(write_size));
# else
write_size = LZ4_compress_limitedOutput(
reinterpret_cast<const char*>(buf),
reinterpret_cast<char*>(out_buf) + header_len,
int(srv_page_size), int(write_size));
# endif
#ifdef HAVE_LZ4_COMPRESS_DEFAULT if (write_size) {
err = LZ4_compress_default((const char *)buf, goto success;
(char *)out_buf+header_len, len, write_size);
#else
err = LZ4_compress_limitedOutput((const char *)buf,
(char *)out_buf+header_len, len, write_size);
#endif /* HAVE_LZ4_COMPRESS_DEFAULT */
write_size = err;
if (err == 0) {
/* If error we leave the actual page as it was */
#ifndef UNIV_PAGECOMPRESS_DEBUG
if (space && !space->printed_compression_failure) {
space->printed_compression_failure = true;
#endif
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
err, write_size);
#ifndef UNIV_PAGECOMPRESS_DEBUG
}
#endif
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
} }
break; break;
#endif /* HAVE_LZ4 */ #endif /* HAVE_LZ4 */
#ifdef HAVE_LZO #ifdef HAVE_LZO
case PAGE_LZO_ALGORITHM: case PAGE_LZO_ALGORITHM: {
err = lzo1x_1_15_compress( lzo_uint len = write_size;
buf, len, out_buf+header_len, &write_size_lzo, out_buf+UNIV_PAGE_SIZE);
write_size = write_size_lzo; if (LZO_E_OK == lzo1x_1_15_compress(
buf, srv_page_size,
if (err != LZO_E_OK || write_size > UNIV_PAGE_SIZE-header_len) { out_buf + header_len, &len,
if (space && !space->printed_compression_failure) { out_buf + srv_page_size)
space->printed_compression_failure = true; && len <= write_size) {
ib_logf(IB_LOG_LEVEL_WARN, write_size = len;
"Compression failed for space " ULINTPF goto success;
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
err, write_size);
} }
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
}
break; break;
}
#endif /* HAVE_LZO */ #endif /* HAVE_LZO */
#ifdef HAVE_LZMA #ifdef HAVE_LZMA
case PAGE_LZMA_ALGORITHM: { case PAGE_LZMA_ALGORITHM: {
size_t out_pos=0; size_t out_pos = 0;
err = lzma_easy_buffer_encode(
comp_level,
LZMA_CHECK_NONE,
NULL, /* No custom allocator, use malloc/free */
reinterpret_cast<uint8_t*>(buf),
len,
reinterpret_cast<uint8_t*>(out_buf + header_len),
&out_pos,
(size_t)write_size);
if (err != LZMA_OK || out_pos > UNIV_PAGE_SIZE-header_len) {
if (space && !space->printed_compression_failure) {
space->printed_compression_failure = true;
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
err, out_pos);
}
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
}
if (LZMA_OK == lzma_easy_buffer_encode(
comp_level, LZMA_CHECK_NONE, NULL,
buf, srv_page_size, out_buf + header_len,
&out_pos, write_size)
&& out_pos <= write_size) {
write_size = out_pos; write_size = out_pos;
goto success;
}
break; break;
} }
#endif /* HAVE_LZMA */ #endif /* HAVE_LZMA */
#ifdef HAVE_BZIP2 #ifdef HAVE_BZIP2
case PAGE_BZIP2_ALGORITHM: { case PAGE_BZIP2_ALGORITHM: {
unsigned len = unsigned(write_size);
err = BZ2_bzBuffToBuffCompress( if (BZ_OK == BZ2_bzBuffToBuffCompress(
(char *)(out_buf + header_len), reinterpret_cast<char*>(out_buf + header_len),
(unsigned int *)&write_size, &len,
(char *)buf, const_cast<char*>(
len, reinterpret_cast<const char*>(buf)),
1, unsigned(srv_page_size), 1, 0, 0)
0, && len <= write_size) {
0); write_size = len;
goto success;
if (err != BZ_OK || write_size > UNIV_PAGE_SIZE-header_len) {
if (space && !space->printed_compression_failure) {
space->printed_compression_failure = true;
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
err, write_size);
}
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
} }
break; break;
} }
#endif /* HAVE_BZIP2 */ #endif /* HAVE_BZIP2 */
#ifdef HAVE_SNAPPY #ifdef HAVE_SNAPPY
case PAGE_SNAPPY_ALGORITHM: case PAGE_SNAPPY_ALGORITHM: {
{ size_t len = snappy_max_compressed_length(srv_page_size);
snappy_status cstatus;
write_size = snappy_max_compressed_length(UNIV_PAGE_SIZE);
cstatus = snappy_compress( if (SNAPPY_OK == snappy_compress(
(const char *)buf, reinterpret_cast<const char*>(buf),
(size_t)len, srv_page_size,
(char *)(out_buf+header_len), reinterpret_cast<char*>(out_buf) + header_len,
(size_t*)&write_size); &len)
&& len <= write_size) {
if (cstatus != SNAPPY_OK || write_size > UNIV_PAGE_SIZE-header_len) { write_size = len;
if (space && !space->printed_compression_failure) { goto success;
space->printed_compression_failure = true;
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
(int)cstatus, write_size);
}
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
} }
break; break;
} }
#endif /* HAVE_SNAPPY */ #endif /* HAVE_SNAPPY */
case PAGE_ZLIB_ALGORITHM:
err = compress2(out_buf+header_len, (ulong*)&write_size, buf,
uLong(len), comp_level);
if (err != Z_OK) {
/* If error we leave the actual page as it was */
if (space && !space->printed_compression_failure) {
space->printed_compression_failure = true;
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" rt %d write_size " ULINTPF ".",
space->id, space->name, len,
err, write_size);
} }
srv_stats.pages_page_compression_error.inc(); srv_stats.pages_page_compression_error.inc();
*out_len = len; return 0;
goto err_exit; success:
}
break;
case PAGE_UNCOMPRESSED:
*out_len = len;
return (buf);
break;
default:
ut_error;
break;
}
/* Set up the page header */ /* Set up the page header */
memcpy(out_buf, buf, FIL_PAGE_DATA); memcpy(out_buf, buf, FIL_PAGE_DATA);
/* Set up the checksum */ /* Set up the checksum */
@ -392,22 +253,11 @@ fil_compress_page(
/* Verify that page can be decompressed */ /* Verify that page can be decompressed */
{ {
byte *comp_page; page_t tmp_buf[UNIV_PAGE_SIZE_MAX];
byte *uncomp_page; page_t page[UNIV_PAGE_SIZE_MAX];
memcpy(page, out_buf, srv_page_size);
comp_page = static_cast<byte *>(ut_malloc(UNIV_PAGE_SIZE)); ut_ad(fil_page_decompress(tmp_buf, page));
uncomp_page = static_cast<byte *>(ut_malloc(UNIV_PAGE_SIZE)); ut_ad(!buf_page_is_corrupted(false, page, 0, NULL));
memcpy(comp_page, out_buf, UNIV_PAGE_SIZE);
fil_decompress_page(uncomp_page, comp_page, ulong(len), NULL);
if (buf_page_is_corrupted(false, uncomp_page, 0, space)) {
buf_page_print(uncomp_page, 0);
ut_ad(0);
}
ut_free(comp_page);
ut_free(uncomp_page);
} }
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
@ -431,323 +281,144 @@ fil_compress_page(
#endif #endif
} }
DBUG_PRINT("compress", srv_stats.page_compression_saved.add(srv_page_size - write_size);
("Succeeded for space " ULINTPF
" '%s' len " ULINTPF " out_len " ULINTPF,
space ? space->id : 0,
space ? space->name : "(import)",
len, write_size));
srv_stats.page_compression_saved.add((len - write_size));
srv_stats.pages_page_compressed.inc(); srv_stats.pages_page_compressed.inc();
/* If we do not persistently trim rest of page, we need to write it /* If we do not persistently trim rest of page, we need to write it
all */ all */
if (!srv_use_trim) { if (!srv_use_trim) {
memset(out_buf+write_size,0,len-write_size); memset(out_buf + write_size, 0, srv_page_size - write_size);
write_size = len;
} }
*out_len = write_size; return write_size;
if (allocated) {
/* TODO: reduce number of memcpy's */
memcpy(buf, out_buf, len);
} else {
return(out_buf);
}
err_exit:
if (allocated) {
ut_free(out_buf);
}
return (buf);
} }
/****************************************************************//** /** Decompress a page that may be subject to page_compressed compression.
For page compressed pages decompress the page after actual read @param[in,out] tmp_buf temporary buffer (of innodb_page_size)
operation. */ @param[in,out] buf possibly compressed page buffer
UNIV_INTERN @return size of the compressed data
void @retval 0 if decompression failed
fil_decompress_page( @retval srv_page_size if the page was not compressed */
/*================*/ UNIV_INTERN ulint fil_page_decompress(byte* tmp_buf, byte* buf)
byte* page_buf, /*!< in: preallocated buffer or NULL */
byte* buf, /*!< out: buffer from which to read; in aio
this must be appropriately aligned */
ulong len, /*!< in: length of output buffer.*/
ulint* write_size, /*!< in/out: Actual payload size of
the compressed data. */
bool return_error) /*!< in: true if only an error should
be produced when decompression fails.
By default this parameter is false. */
{ {
int err = 0; const unsigned ptype = mach_read_from_2(buf+FIL_PAGE_TYPE);
ulint actual_size = 0; ulint header_len;
ulint compression_alg = 0; ib_uint64_t compression_alg;
byte *in_buf; switch (ptype) {
ulint ptype; case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE; header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE
+ FIL_PAGE_COMPRESSION_METHOD_SIZE;
ut_ad(buf); compression_alg = mach_read_from_2(
ut_ad(len); FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE + buf);
break;
ptype = mach_read_from_2(buf+FIL_PAGE_TYPE); case FIL_PAGE_PAGE_COMPRESSED:
header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
if (ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) { compression_alg = mach_read_from_8(
header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE; FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + buf);
break;
default:
return srv_page_size;
} }
/* Do not try to uncompressed pages that are not compressed */ if (mach_read_from_4(buf + FIL_PAGE_SPACE_OR_CHKSUM)
if (ptype != FIL_PAGE_PAGE_COMPRESSED && != BUF_NO_CHECKSUM_MAGIC) {
ptype != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) { return 0;
return;
} }
// If no buffer was given, we need to allocate temporal buffer ulint actual_size = mach_read_from_2(buf + FIL_PAGE_DATA);
if (page_buf == NULL) {
in_buf = static_cast<byte *>(ut_malloc(UNIV_PAGE_SIZE));
memset(in_buf, 0, UNIV_PAGE_SIZE);
} else {
in_buf = page_buf;
}
/* Before actual decompress, make sure that page type is correct */
if (mach_read_from_4(buf+FIL_PAGE_SPACE_OR_CHKSUM) != BUF_NO_CHECKSUM_MAGIC ||
(ptype != FIL_PAGE_PAGE_COMPRESSED &&
ptype != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: We try to uncompress corrupted page"
" CRC " ULINTPF " type " ULINTPF " len " ULINTPF ".",
mach_read_from_4(buf+FIL_PAGE_SPACE_OR_CHKSUM),
mach_read_from_2(buf+FIL_PAGE_TYPE), len);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
/* Get compression algorithm */
if (ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
compression_alg = mach_read_from_2(buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE);
} else {
compression_alg = mach_read_from_8(buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
}
/* Get the actual size of compressed page */
actual_size = mach_read_from_2(buf+FIL_PAGE_DATA);
/* Check if payload size is corrupted */ /* Check if payload size is corrupted */
if (actual_size == 0 || actual_size > UNIV_PAGE_SIZE) { if (actual_size == 0 || actual_size > srv_page_size - header_len) {
return 0;
}
switch (compression_alg) {
default:
ib_logf(IB_LOG_LEVEL_ERROR, ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: We try to uncompress corrupted page" "Unknown compression algorithm " UINT64PF,
" actual size " ULINTPF " compression %s.", compression_alg);
actual_size, fil_get_compression_alg_name(compression_alg)); return 0;
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
/* Store actual payload size of the compressed data. This pointer
points to buffer pool. */
if (write_size) {
*write_size = actual_size;
}
DBUG_PRINT("compress",
("Preparing for decompress for len " ULINTPF ".",
actual_size));
switch(compression_alg) {
case PAGE_ZLIB_ALGORITHM: case PAGE_ZLIB_ALGORITHM:
err= uncompress(in_buf, &len, buf+header_len, (unsigned long)actual_size); {
uLong len = srv_page_size;
/* If uncompress fails it means that page is corrupted */ if (Z_OK != uncompress(tmp_buf, &len,
if (err != Z_OK) { buf + header_len,
uLong(actual_size))
ib_logf(IB_LOG_LEVEL_ERROR, && len != srv_page_size) {
"Corruption: Page is marked as compressed" return 0;
" but uncompress failed with error %d "
" size " ULINTPF " len " ULINTPF ".",
err, actual_size, len);
fflush(stderr);
if (return_error) {
goto error_return;
} }
ut_error;
} }
break; break;
#ifdef HAVE_LZ4 #ifdef HAVE_LZ4
case PAGE_LZ4_ALGORITHM: case PAGE_LZ4_ALGORITHM:
err = LZ4_decompress_fast((const char *)buf+header_len, (char *)in_buf, len); if (LZ4_decompress_safe(reinterpret_cast<const char*>(buf)
+ header_len,
if (err != (int)actual_size) { reinterpret_cast<char*>(tmp_buf),
ib_logf(IB_LOG_LEVEL_ERROR, actual_size, srv_page_size)
"Corruption: Page is marked as compressed" == int(srv_page_size)) {
" but uncompress failed with error %d "
" size " ULINTPF " len " ULINTPF ".",
err, actual_size, len);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
break; break;
}
return 0;
#endif /* HAVE_LZ4 */ #endif /* HAVE_LZ4 */
#ifdef HAVE_LZO #ifdef HAVE_LZO
case PAGE_LZO_ALGORITHM: { case PAGE_LZO_ALGORITHM: {
ulint olen = 0; lzo_uint len_lzo = srv_page_size;
lzo_uint olen_lzo = olen; if (LZO_E_OK == lzo1x_decompress_safe(
err = lzo1x_decompress((const unsigned char *)buf+header_len, buf + header_len,
actual_size,(unsigned char *)in_buf, &olen_lzo, NULL); actual_size, tmp_buf, &len_lzo, NULL)
&& len_lzo == srv_page_size) {
olen = olen_lzo;
if (err != LZO_E_OK || (olen == 0 || olen > UNIV_PAGE_SIZE)) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but uncompress failed with error %d "
" size " ULINTPF " len " ULINTPF ".",
err, actual_size, len);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
break; break;
} }
return 0;
}
#endif /* HAVE_LZO */ #endif /* HAVE_LZO */
#ifdef HAVE_LZMA #ifdef HAVE_LZMA
case PAGE_LZMA_ALGORITHM: { case PAGE_LZMA_ALGORITHM: {
lzma_ret ret;
size_t src_pos = 0; size_t src_pos = 0;
size_t dst_pos = 0; size_t dst_pos = 0;
uint64_t memlimit = UINT64_MAX; uint64_t memlimit = UINT64_MAX;
ret = lzma_stream_buffer_decode( if (LZMA_OK == lzma_stream_buffer_decode(
&memlimit, &memlimit, 0, NULL, buf + header_len,
0, &src_pos, actual_size, tmp_buf, &dst_pos,
NULL, srv_page_size)
buf+header_len, && dst_pos == srv_page_size) {
&src_pos,
actual_size,
in_buf,
&dst_pos,
len);
if (ret != LZMA_OK || (dst_pos == 0 || dst_pos > UNIV_PAGE_SIZE)) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but decompression read only %ld bytes"
" size " ULINTPF "len " ULINTPF ".",
dst_pos, actual_size, len);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
break; break;
} }
return 0;
}
#endif /* HAVE_LZMA */ #endif /* HAVE_LZMA */
#ifdef HAVE_BZIP2 #ifdef HAVE_BZIP2
case PAGE_BZIP2_ALGORITHM: { case PAGE_BZIP2_ALGORITHM: {
unsigned int dst_pos = UNIV_PAGE_SIZE; unsigned int dst_pos = srv_page_size;
if (BZ_OK == BZ2_bzBuffToBuffDecompress(
err = BZ2_bzBuffToBuffDecompress( reinterpret_cast<char*>(tmp_buf),
(char *)in_buf,
&dst_pos, &dst_pos,
(char *)(buf+header_len), reinterpret_cast<char*>(buf) + header_len,
actual_size, actual_size, 1, 0)
1, && dst_pos == srv_page_size) {
0);
if (err != BZ_OK || (dst_pos == 0 || dst_pos > UNIV_PAGE_SIZE)) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but decompression read only %du bytes"
" size " ULINTPF " len " ULINTPF " err %d.",
dst_pos, actual_size, len, err);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
break; break;
} }
return 0;
}
#endif /* HAVE_BZIP2 */ #endif /* HAVE_BZIP2 */
#ifdef HAVE_SNAPPY #ifdef HAVE_SNAPPY
case PAGE_SNAPPY_ALGORITHM: case PAGE_SNAPPY_ALGORITHM: {
{ size_t olen = srv_page_size;
snappy_status cstatus;
ulint olen = UNIV_PAGE_SIZE;
cstatus = snappy_uncompress(
(const char *)(buf+header_len),
(size_t)actual_size,
(char *)in_buf,
(size_t*)&olen);
if (cstatus != SNAPPY_OK || olen != UNIV_PAGE_SIZE) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but decompression read only " ULINTPF " bytes"
" size " ULINTPF " len " ULINTPF " err %d.",
olen, actual_size, len, (int)cstatus);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
if (SNAPPY_OK == snappy_uncompress(
reinterpret_cast<const char*>(buf) + header_len,
actual_size,
reinterpret_cast<char*>(tmp_buf), &olen)
&& olen == srv_page_size) {
break; break;
} }
return 0;
}
#endif /* HAVE_SNAPPY */ #endif /* HAVE_SNAPPY */
default:
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but compression algorithm %s"
" is not known."
,fil_get_compression_alg_name(compression_alg));
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
break;
} }
srv_stats.pages_page_decompressed.inc(); srv_stats.pages_page_decompressed.inc();
memcpy(buf, tmp_buf, srv_page_size);
/* Copy the uncompressed page to the buffer pool, not return actual_size;
really any other options. */
memcpy(buf, in_buf, len);
error_return:
if (page_buf != in_buf) {
ut_free(in_buf);
}
} }

View File

@ -5177,6 +5177,11 @@ ibuf_check_bitmap_on_import(
bitmap_page = ibuf_bitmap_get_map_page( bitmap_page = ibuf_bitmap_get_map_page(
space_id, page_no, zip_size, &mtr); space_id, page_no, zip_size, &mtr);
if (!bitmap_page) {
mutex_exit(&ibuf_mutex);
return DB_CORRUPTION;
}
for (i = FSP_IBUF_BITMAP_OFFSET + 1; i < page_size; i++) { for (i = FSP_IBUF_BITMAP_OFFSET + 1; i < page_size; i++) {
const ulint offset = page_no + i; const ulint offset = page_no + i;

View File

@ -1,7 +1,7 @@
/***************************************************************************** /*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation. Copyright (c) 2013, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
@ -39,6 +39,7 @@ Created 11/5/1995 Heikki Tuuri
#include "ut0rbt.h" #include "ut0rbt.h"
#include "os0proc.h" #include "os0proc.h"
#include "log0log.h" #include "log0log.h"
#include "my_atomic.h"
/** @name Modes for buf_page_get_gen */ /** @name Modes for buf_page_get_gen */
/* @{ */ /* @{ */
@ -1506,45 +1507,16 @@ buf_page_encrypt_before_write(
buf_page_t* bpage, buf_page_t* bpage,
byte* src_frame); byte* src_frame);
/**********************************************************************
The hook that is called after page is written to disk.
The function releases any resources needed for encryption that was allocated
in buf_page_encrypt_before_write */
UNIV_INTERN
ibool
buf_page_encrypt_after_write(
/*=========================*/
buf_page_t* page); /*!< in/out: buffer page that was flushed */
/********************************************************************//**
The hook that is called just before a page is read from disk.
The function allocates memory that is used to temporarily store disk content
before getting decrypted */
UNIV_INTERN
byte*
buf_page_decrypt_before_read(
/*=========================*/
buf_page_t* page, /*!< in/out: buffer page read from disk */
ulint zip_size); /*!< in: compressed page size, or 0 */
/********************************************************************//**
The hook that is called just after a page is read from disk.
The function decrypt disk content into buf_page_t and releases the
temporary buffer that was allocated in buf_page_decrypt_before_read */
UNIV_INTERN
bool
buf_page_decrypt_after_read(
/*========================*/
buf_page_t* page); /*!< in/out: buffer page read from disk */
/** @brief The temporary memory structure. /** @brief The temporary memory structure.
NOTE! The definition appears here only for other modules of this NOTE! The definition appears here only for other modules of this
directory (buf) to see it. Do not use from outside! */ directory (buf) to see it. Do not use from outside! */
typedef struct { typedef struct {
bool reserved; /*!< true if this slot is reserved private:
int32 reserved; /*!< true if this slot is reserved
*/ */
public:
byte* crypt_buf; /*!< for encryption the data needs to be byte* crypt_buf; /*!< for encryption the data needs to be
copied to a separate buffer before it's copied to a separate buffer before it's
encrypted&written. this as a page can be encrypted&written. this as a page can be
@ -1555,6 +1527,21 @@ typedef struct {
byte* out_buf; /*!< resulting buffer after byte* out_buf; /*!< resulting buffer after
encryption/compression. This is a encryption/compression. This is a
pointer and not allocated. */ pointer and not allocated. */
/** Release the slot */
void release()
{
my_atomic_store32_explicit(&reserved, false,
MY_MEMORY_ORDER_RELAXED);
}
/** Acquire the slot
@return whether the slot was acquired */
bool acquire()
{
return !my_atomic_fas32_explicit(&reserved, true,
MY_MEMORY_ORDER_RELAXED);
}
} buf_tmp_buffer_t; } buf_tmp_buffer_t;
/** The common buffer control block structure /** The common buffer control block structure

View File

@ -341,9 +341,6 @@ struct fil_space_t {
bool is_in_unflushed_spaces; bool is_in_unflushed_spaces;
/*!< true if this space is currently in /*!< true if this space is currently in
unflushed_spaces */ unflushed_spaces */
bool printed_compression_failure;
/*!< true if we have already printed
compression failure */
fil_space_crypt_t* crypt_data; fil_space_crypt_t* crypt_data;
/*!< tablespace crypt data or NULL */ /*!< tablespace crypt data or NULL */
ulint file_block_size; ulint file_block_size;

View File

@ -68,7 +68,6 @@ fil_get_page_type_name(
} }
return "PAGE TYPE CORRUPTED"; return "PAGE TYPE CORRUPTED";
} }
/****************************************************************//** /****************************************************************//**

View File

@ -1,6 +1,6 @@
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2017 MariaDB Corporation. All Rights Reserved. Copyright (C) 2013, 2018 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
@ -30,70 +30,26 @@ atomic writes information to table space.
Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
***********************************************************************/ ***********************************************************************/
/*******************************************************************//** /** Compress a page_compressed page before writing to a data file.
Find out wheather the page is index page or not @param[in] buf page to be compressed
@return true if page type index page, false if not */ @param[out] out_buf compressed page
UNIV_INLINE @param[in] level compression level
ibool @param[in] block_size file system block size
fil_page_is_index_page( @param[in] encrypted whether the page will be subsequently encrypted
/*===================*/ @return actual length of compressed page
byte *buf); /*!< in: page */ @retval 0 if the page was not compressed */
UNIV_INTERN ulint fil_page_compress(const byte* buf, byte* out_buf, ulint level,
ulint block_size, bool encrypted)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/****************************************************************//** /** Decompress a page that may be subject to page_compressed compression.
Get the name of the compression algorithm used for page @param[in,out] tmp_buf temporary buffer (of innodb_page_size)
compression. @param[in,out] buf compressed page buffer
@return compression algorithm name or "UNKNOWN" if not known*/ @return size of the compressed data
UNIV_INLINE @retval 0 if decompression failed
const char* @retval srv_page_size if the page was not compressed */
fil_get_compression_alg_name( UNIV_INTERN ulint fil_page_decompress(byte* tmp_buf, byte* buf)
/*=========================*/ MY_ATTRIBUTE((nonnull, warn_unused_result));
ulint comp_alg); /*!<in: compression algorithm number */
/****************************************************************//**
For page compressed pages compress the page before actual write
operation.
@return compressed page to be written*/
UNIV_INTERN
byte*
fil_compress_page(
/*==============*/
fil_space_t* space, /*!< in,out: tablespace (NULL during IMPORT) */
byte* buf, /*!< in: buffer from which to write; in aio
this must be appropriately aligned */
byte* out_buf, /*!< out: compressed buffer */
ulint len, /*!< in: length of input buffer.*/
ulint level, /* in: compression level */
ulint block_size, /*!< in: block size */
bool encrypted, /*!< in: is page also encrypted */
ulint* out_len); /*!< out: actual length of compressed
page */
/****************************************************************//**
For page compressed pages decompress the page after actual read
operation. */
UNIV_INTERN
void
fil_decompress_page(
/*================*/
byte* page_buf, /*!< in: preallocated buffer or NULL */
byte* buf, /*!< out: buffer from which to read; in aio
this must be appropriately aligned */
ulong len, /*!< in: length of output buffer.*/
ulint* write_size, /*!< in/out: Actual payload size of
the compressed data. */
bool return_error=false);
/*!< in: true if only an error should
be produced when decompression fails.
By default this parameter is false. */
/****************************************************************//**
Get space id from fil node
@return space id*/
UNIV_INTERN
ulint
fil_node_get_space_id(
/*==================*/
fil_node_t* node); /*!< in: Node where to get space id*/
/****************************************************************//** /****************************************************************//**
Get block size from fil node Get block size from fil node
@ -120,13 +76,4 @@ ibool
fil_page_is_compressed_encrypted( fil_page_is_compressed_encrypted(
/*=============================*/ /*=============================*/
byte* buf); /*!< in: page */ byte* buf); /*!< in: page */
/*******************************************************************//**
Find out wheather the page is page compressed with lzo method
@return true if page is page compressed with lzo method*/
UNIV_INLINE
ibool
fil_page_is_lzo_compressed(
/*=======================*/
byte* buf); /*!< in: page */
#endif #endif

View File

@ -1,6 +1,6 @@
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2017, MariaDB Corporation. All Rights Reserved. Copyright (C) 2013, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
@ -49,18 +49,6 @@ fsp_flags_get_atomic_writes(
return((atomic_writes_t)FSP_FLAGS_GET_ATOMIC_WRITES(flags)); return((atomic_writes_t)FSP_FLAGS_GET_ATOMIC_WRITES(flags));
} }
/*******************************************************************//**
Find out wheather the page is index page or not
@return true if page type index page, false if not */
UNIV_INLINE
ibool
fil_page_is_index_page(
/*===================*/
byte* buf) /*!< in: page */
{
return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_INDEX);
}
/*******************************************************************//** /*******************************************************************//**
Find out wheather the page is page compressed Find out wheather the page is page compressed
@return true if page is page compressed, false if not */ @return true if page is page compressed, false if not */
@ -84,59 +72,3 @@ fil_page_is_compressed_encrypted(
{ {
return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
} }
/****************************************************************//**
Get the name of the compression algorithm used for page
compression.
@return compression algorithm name or "UNKNOWN" if not known*/
UNIV_INLINE
const char*
fil_get_compression_alg_name(
/*=========================*/
ulint comp_alg) /*!<in: compression algorithm number */
{
switch(comp_alg) {
case PAGE_UNCOMPRESSED:
return ("uncompressed");
break;
case PAGE_ZLIB_ALGORITHM:
return ("ZLIB");
break;
case PAGE_LZ4_ALGORITHM:
return ("LZ4");
break;
case PAGE_LZO_ALGORITHM:
return ("LZO");
break;
case PAGE_LZMA_ALGORITHM:
return ("LZMA");
break;
case PAGE_BZIP2_ALGORITHM:
return ("BZIP2");
break;
case PAGE_SNAPPY_ALGORITHM:
return ("SNAPPY");
break;
/* No default to get compiler warning */
}
return ("NULL");
}
#ifndef UNIV_INNOCHECKSUM
/*******************************************************************//**
Find out wheather the page is page compressed with lzo method
@return true if page is page compressed with lzo method, false if not */
UNIV_INLINE
ibool
fil_page_is_lzo_compressed(
/*=======================*/
byte* buf) /*!< in: page */
{
return((mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED &&
mach_read_from_8(buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION) == PAGE_LZO_ALGORITHM) ||
(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED &&
mach_read_from_2(buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE) == PAGE_LZO_ALGORITHM));
}
#endif /* UNIV_INNOCHECKSUM */

View File

@ -42,6 +42,12 @@ Created 2012-02-08 by Sunny Bains.
#include "srv0start.h" #include "srv0start.h"
#include "row0quiesce.h" #include "row0quiesce.h"
#include "fil0pagecompress.h" #include "fil0pagecompress.h"
#ifdef HAVE_LZO
#include "lzo/lzo1x.h"
#endif
#ifdef HAVE_SNAPPY
#include "snappy-c.h"
#endif
#include <vector> #include <vector>
@ -3365,15 +3371,30 @@ fil_iterate(
os_offset_t offset; os_offset_t offset;
ulint n_bytes = iter.n_io_buffers * iter.page_size; ulint n_bytes = iter.n_io_buffers * iter.page_size;
const ulint buf_size = srv_page_size
#ifdef HAVE_LZO
+ LZO1X_1_15_MEM_COMPRESS
#elif defined HAVE_SNAPPY
+ snappy_max_compressed_length(srv_page_size)
#endif
;
byte* page_compress_buf = static_cast<byte*>(
ut_malloc_low(buf_size, false));
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
if (!page_compress_buf) {
return DB_OUT_OF_MEMORY;
}
/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless /* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
copying for non-index pages. Unfortunately, it is copying for non-index pages. Unfortunately, it is
required by buf_zip_decompress() */ required by buf_zip_decompress() */
dberr_t err = DB_SUCCESS;
for (offset = iter.start; offset < iter.end; offset += n_bytes) { for (offset = iter.start; offset < iter.end; offset += n_bytes) {
if (callback.is_interrupted()) { if (callback.is_interrupted()) {
return DB_INTERRUPTED; err = DB_INTERRUPTED;
goto func_exit;
} }
byte* io_buffer = iter.io_buffer; byte* io_buffer = iter.io_buffer;
@ -3404,12 +3425,13 @@ fil_iterate(
if (!os_file_read_no_error_handling(iter.file, readptr, if (!os_file_read_no_error_handling(iter.file, readptr,
offset, n_bytes)) { offset, n_bytes)) {
ib_logf(IB_LOG_LEVEL_ERROR, "os_file_read() failed"); ib_logf(IB_LOG_LEVEL_ERROR, "os_file_read() failed");
return DB_IO_ERROR; err = DB_IO_ERROR;
goto func_exit;
} }
bool updated = false; bool updated = false;
ulint n_pages_read = (ulint) n_bytes / iter.page_size;
const ulint size = iter.page_size; const ulint size = iter.page_size;
ulint n_pages_read = ulint(n_bytes) / size;
block->page.offset = offset / size; block->page.offset = offset / size;
for (ulint i = 0; i < n_pages_read; for (ulint i = 0; i < n_pages_read;
@ -3438,11 +3460,11 @@ page_corrupted:
UINT64PF " looks corrupted.", UINT64PF " looks corrupted.",
callback.filename(), callback.filename(),
ulong(offset / size), offset); ulong(offset / size), offset);
return DB_CORRUPTION; err = DB_CORRUPTION;
goto func_exit;
} }
bool decrypted = false; bool decrypted = false;
dberr_t err = DB_SUCCESS;
byte* dst = io_buffer + (i * size); byte* dst = io_buffer + (i * size);
bool frame_changed = false; bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE); ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
@ -3451,6 +3473,10 @@ page_corrupted:
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED; || page_type == FIL_PAGE_PAGE_COMPRESSED;
if (page_compressed && block->page.zip.data) {
goto page_corrupted;
}
if (!encrypted) { if (!encrypted) {
} else if (!mach_read_from_4( } else if (!mach_read_from_4(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
@ -3476,7 +3502,7 @@ not_encrypted:
iter.page_size, src, &err); iter.page_size, src, &err);
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
return err; goto func_exit;
} }
if (!decrypted) { if (!decrypted) {
@ -3489,8 +3515,12 @@ not_encrypted:
/* If the original page is page_compressed, we need /* If the original page is page_compressed, we need
to decompress it before adjusting further. */ to decompress it before adjusting further. */
if (page_compressed) { if (page_compressed) {
fil_decompress_page(NULL, dst, ulong(size), ulint compress_length = fil_page_decompress(
NULL); page_compress_buf, dst);
ut_ad(compress_length != srv_page_size);
if (compress_length == 0) {
goto page_corrupted;
}
updated = true; updated = true;
} else if (buf_page_is_corrupted( } else if (buf_page_is_corrupted(
false, false,
@ -3501,7 +3531,7 @@ not_encrypted:
} }
if ((err = callback(block)) != DB_SUCCESS) { if ((err = callback(block)) != DB_SUCCESS) {
return err; goto func_exit;
} else if (!updated) { } else if (!updated) {
updated = buf_block_get_state(block) updated = buf_block_get_state(block)
== BUF_BLOCK_FILE_PAGE; == BUF_BLOCK_FILE_PAGE;
@ -3551,19 +3581,17 @@ not_encrypted:
src = io_buffer + (i * size); src = io_buffer + (i * size);
if (page_compressed) { if (page_compressed) {
ulint len = 0;
fil_compress_page(
NULL,
src,
NULL,
size,
0,/* FIXME: compression level */
512,/* FIXME: use proper block size */
encrypted,
&len);
updated = true; updated = true;
if (fil_page_compress(
src,
page_compress_buf,
0,/* FIXME: compression level */
512,/* FIXME: proper block size */
encrypted)) {
/* FIXME: remove memcpy() */
memcpy(src, page_compress_buf,
srv_page_size);
}
} }
/* If tablespace is encrypted, encrypt page before we /* If tablespace is encrypted, encrypt page before we
@ -3600,11 +3628,14 @@ not_encrypted:
offset, (ulint) n_bytes)) { offset, (ulint) n_bytes)) {
ib_logf(IB_LOG_LEVEL_ERROR, "os_file_write() failed"); ib_logf(IB_LOG_LEVEL_ERROR, "os_file_write() failed");
return DB_IO_ERROR; err = DB_IO_ERROR;
goto func_exit;
} }
} }
return DB_SUCCESS; func_exit:
ut_free(page_compress_buf);
return err;
} }
/********************************************************************//** /********************************************************************//**

View File

@ -77,15 +77,6 @@ Created 11/5/1995 Heikki Tuuri
#include "snappy-c.h" #include "snappy-c.h"
#endif #endif
/** Decrypt a page.
@param[in,out] bpage Page control block
@param[in,out] space tablespace
@return whether the operation was successful */
static
bool
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
MY_ATTRIBUTE((nonnull));
/********************************************************************//** /********************************************************************//**
Mark a table with the specified space pointed by bpage->space corrupted. Mark a table with the specified space pointed by bpage->space corrupted.
Also remove the bpage from LRU list. Also remove the bpage from LRU list.
@ -390,6 +381,143 @@ on the io_type */
? (counter##_READ) \ ? (counter##_READ) \
: (counter##_WRITTEN)) : (counter##_WRITTEN))
/** Reserve a buffer slot for encryption, decryption or page compression.
@param[in,out] buf_pool buffer pool
@return reserved buffer slot */
static buf_tmp_buffer_t* buf_pool_reserve_tmp_slot(buf_pool_t* buf_pool)
{
for (ulint i = 0; i < buf_pool->tmp_arr->n_slots; i++) {
buf_tmp_buffer_t* slot = &buf_pool->tmp_arr->slots[i];
if (slot->acquire()) {
return slot;
}
}
/* We assume that free slot is found */
ut_error;
return NULL;
}
/** Reserve a buffer for encryption, decryption or decompression.
@param[in,out] slot reserved slot */
static void buf_tmp_reserve_crypt_buf(buf_tmp_buffer_t* slot)
{
if (!slot->crypt_buf) {
slot->crypt_buf = static_cast<byte*>(
aligned_malloc(srv_page_size, srv_page_size));
}
}
/** Reserve a buffer for compression.
@param[in,out] slot reserved slot */
static void buf_tmp_reserve_compression_buf(buf_tmp_buffer_t* slot)
{
if (!slot->comp_buf) {
/* Both snappy and lzo compression methods require that
output buffer used for compression is bigger than input
buffer. Increase the allocated buffer size accordingly. */
ulint size = srv_page_size;
#ifdef HAVE_LZO
size += LZO1X_1_15_MEM_COMPRESS;
#elif defined HAVE_SNAPPY
size = snappy_max_compressed_length(size);
#endif
slot->comp_buf = static_cast<byte*>(
aligned_malloc(size, srv_page_size));
}
}
/** Decrypt a page.
@param[in,out] bpage Page control block
@param[in,out] space tablespace
@return whether the operation was successful */
static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
{
ut_ad(space->n_pending_ios > 0);
ut_ad(space->id == bpage->space);
byte* dst_frame = bpage->zip.data ? bpage->zip.data :
((buf_block_t*) bpage)->frame;
bool page_compressed = fil_page_is_compressed(dst_frame);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
if (bpage->offset == 0) {
/* File header pages are not encrypted/compressed */
return true;
}
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
buf_tmp_buffer_t* slot;
if (page_compressed) {
/* the page we read is unencrypted */
/* Find free slot from temporary memory array */
decompress:
slot = buf_pool_reserve_tmp_slot(buf_pool);
/* For decompression, use crypt_buf. */
buf_tmp_reserve_crypt_buf(slot);
decompress_with_slot:
ut_d(fil_page_type_validate(dst_frame));
bpage->write_size = fil_page_decompress(slot->crypt_buf,
dst_frame);
slot->release();
ut_ad(!bpage->write_size || fil_page_type_validate(dst_frame));
ut_ad(space->n_pending_ios > 0);
return bpage->write_size != 0;
}
if (space->crypt_data
&& mach_read_from_4(FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ dst_frame)) {
/* Verify encryption checksum before we even try to
decrypt. */
if (!fil_space_verify_crypt_checksum(
dst_frame, buf_page_get_zip_size(bpage), NULL,
bpage->offset)) {
decrypt_failed:
/* Mark page encrypted in case it should be. */
if (space->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED) {
bpage->encrypted = true;
}
return false;
}
/* Find free slot from temporary memory array */
slot = buf_pool_reserve_tmp_slot(buf_pool);
buf_tmp_reserve_crypt_buf(slot);
ut_d(fil_page_type_validate(dst_frame));
/* decrypt using crypt_buf to dst_frame */
if (!fil_space_decrypt(space, slot->crypt_buf,
dst_frame, &bpage->encrypted)) {
slot->release();
goto decrypt_failed;
}
ut_d(fil_page_type_validate(dst_frame));
if (fil_page_is_compressed_encrypted(dst_frame)) {
goto decompress_with_slot;
}
slot->release();
} else if (fil_page_is_compressed_encrypted(dst_frame)) {
goto decompress;
}
ut_ad(space->n_pending_ios > 0);
return true;
}
/********************************************************************//** /********************************************************************//**
Gets the smallest oldest_modification lsn for any page in the pool. Returns Gets the smallest oldest_modification lsn for any page in the pool. Returns
zero if all modified pages have been flushed to disk. zero if all modified pages have been flushed to disk.
@ -4730,7 +4858,6 @@ buf_page_io_complete(buf_page_t* bpage)
const ibool uncompressed = (buf_page_get_state(bpage) const ibool uncompressed = (buf_page_get_state(bpage)
== BUF_BLOCK_FILE_PAGE); == BUF_BLOCK_FILE_PAGE);
bool have_LRU_mutex = false; bool have_LRU_mutex = false;
byte* frame = NULL;
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
ut_a(buf_page_in_file(bpage)); ut_a(buf_page_in_file(bpage));
@ -4748,19 +4875,18 @@ buf_page_io_complete(buf_page_t* bpage)
ulint read_page_no = 0; ulint read_page_no = 0;
ulint read_space_id = 0; ulint read_space_id = 0;
uint key_version = 0; uint key_version = 0;
byte* frame = bpage->zip.data
ut_ad(bpage->zip.data || ((buf_block_t*)bpage)->frame); ? bpage->zip.data
: reinterpret_cast<buf_block_t*>(bpage)->frame;
ut_ad(frame);
fil_space_t* space = fil_space_acquire_for_io(bpage->space); fil_space_t* space = fil_space_acquire_for_io(bpage->space);
if (!space) { if (!space) {
return(DB_TABLESPACE_DELETED); return(DB_TABLESPACE_DELETED);
} }
buf_page_decrypt_after_read(bpage, space); if (!buf_page_decrypt_after_read(bpage, space)) {
err = DB_DECRYPTION_FAILED;
if (buf_page_get_zip_size(bpage)) { goto database_corrupted;
frame = bpage->zip.data;
} else {
frame = ((buf_block_t*) bpage)->frame;
} }
if (buf_page_get_zip_size(bpage)) { if (buf_page_get_zip_size(bpage)) {
@ -4941,7 +5067,7 @@ database_corrupted:
/* io_type == BUF_IO_WRITE */ /* io_type == BUF_IO_WRITE */
if (bpage->slot) { if (bpage->slot) {
/* Mark slot free */ /* Mark slot free */
bpage->slot->reserved = false; bpage->slot->release();
bpage->slot = NULL; bpage->slot = NULL;
} }
} }
@ -6240,66 +6366,6 @@ buf_pool_mutex_exit(
mutex_exit(&buf_pool->LRU_list_mutex); mutex_exit(&buf_pool->LRU_list_mutex);
} }
/********************************************************************//**
Reserve unused slot from temporary memory array and allocate necessary
temporary memory if not yet allocated.
@return reserved slot */
UNIV_INTERN
buf_tmp_buffer_t*
buf_pool_reserve_tmp_slot(
/*======================*/
buf_pool_t* buf_pool, /*!< in: buffer pool where to
reserve */
bool compressed) /*!< in: is file space compressed */
{
buf_tmp_buffer_t *free_slot=NULL;
/* Array is protected by buf_pool mutex */
buf_pool_mutex_enter(buf_pool);
for(ulint i = 0; i < buf_pool->tmp_arr->n_slots; i++) {
buf_tmp_buffer_t *slot = &buf_pool->tmp_arr->slots[i];
if(slot->reserved == false) {
free_slot = slot;
break;
}
}
/* We assume that free slot is found */
ut_a(free_slot != NULL);
free_slot->reserved = true;
/* Now that we have reserved this slot we can release
buf_pool mutex */
buf_pool_mutex_exit(buf_pool);
/* Allocate temporary memory for encryption/decryption */
if (free_slot->crypt_buf == NULL) {
free_slot->crypt_buf = static_cast<byte*>(aligned_malloc(UNIV_PAGE_SIZE, UNIV_PAGE_SIZE));
memset(free_slot->crypt_buf, 0, UNIV_PAGE_SIZE);
}
/* For page compressed tables allocate temporary memory for
compression/decompression */
if (compressed && free_slot->comp_buf == NULL) {
ulint size = UNIV_PAGE_SIZE;
/* Both snappy and lzo compression methods require that
output buffer used for compression is bigger than input
buffer. Increase the allocated buffer size accordingly. */
#if HAVE_SNAPPY
size = snappy_max_compressed_length(size);
#endif
#if HAVE_LZO
size += LZO1X_1_15_MEM_COMPRESS;
#endif
free_slot->comp_buf = static_cast<byte*>(aligned_malloc(size, UNIV_PAGE_SIZE));
memset(free_slot->comp_buf, 0, size);
}
return (free_slot);
}
/** Encryption and page_compression hook that is called just before /** Encryption and page_compression hook that is called just before
a page is written to disk. a page is written to disk.
@param[in,out] space tablespace @param[in,out] space tablespace
@ -6349,16 +6415,18 @@ buf_page_encrypt_before_write(
} }
ulint zip_size = buf_page_get_zip_size(bpage); ulint zip_size = buf_page_get_zip_size(bpage);
ulint page_size = (zip_size) ? zip_size : UNIV_PAGE_SIZE; ut_ad(!zip_size || !page_compressed);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage); buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
/* Find free slot from temporary memory array */ /* Find free slot from temporary memory array */
buf_tmp_buffer_t* slot = buf_pool_reserve_tmp_slot(buf_pool, page_compressed); buf_tmp_buffer_t* slot = buf_pool_reserve_tmp_slot(buf_pool);
slot->out_buf = NULL; slot->out_buf = NULL;
bpage->slot = slot; bpage->slot = slot;
buf_tmp_reserve_crypt_buf(slot);
byte *dst_frame = slot->crypt_buf; byte *dst_frame = slot->crypt_buf;
if (!page_compressed) { if (!page_compressed) {
not_compressed:
/* Encrypt page content */ /* Encrypt page content */
byte* tmp = fil_space_encrypt(space, byte* tmp = fil_space_encrypt(space,
bpage->offset, bpage->offset,
@ -6366,32 +6434,28 @@ buf_page_encrypt_before_write(
src_frame, src_frame,
dst_frame); dst_frame);
bpage->real_size = page_size; bpage->real_size = UNIV_PAGE_SIZE;
slot->out_buf = dst_frame = tmp; slot->out_buf = dst_frame = tmp;
ut_d(fil_page_type_validate(tmp)); ut_d(fil_page_type_validate(tmp));
} else { } else {
/* First we compress the page content */ /* First we compress the page content */
ulint out_len = 0; buf_tmp_reserve_compression_buf(slot);
byte* tmp = slot->comp_buf;
byte *tmp = fil_compress_page( ulint out_len = fil_page_compress(
space, src_frame, tmp,
(byte *)src_frame,
slot->comp_buf,
page_size,
fsp_flags_get_page_compression_level(space->flags), fsp_flags_get_page_compression_level(space->flags),
fil_space_get_block_size(space, bpage->offset), fil_space_get_block_size(space, bpage->offset),
encrypted, encrypted);
&out_len); if (!out_len) {
goto not_compressed;
}
bpage->real_size = out_len; bpage->real_size = out_len;
#ifdef UNIV_DEBUG ut_d(fil_page_type_validate(tmp));
fil_page_type_validate(tmp);
#endif
if(encrypted) {
if (encrypted) {
/* And then we encrypt the page content */ /* And then we encrypt the page content */
tmp = fil_space_encrypt(space, tmp = fil_space_encrypt(space,
bpage->offset, bpage->offset,
@ -6403,130 +6467,8 @@ buf_page_encrypt_before_write(
slot->out_buf = dst_frame = tmp; slot->out_buf = dst_frame = tmp;
} }
#ifdef UNIV_DEBUG ut_d(fil_page_type_validate(dst_frame));
fil_page_type_validate(dst_frame);
#endif
// return dst_frame which will be written // return dst_frame which will be written
return dst_frame; return dst_frame;
} }
/** Decrypt a page.
@param[in,out] bpage Page control block
@param[in,out] space tablespace
@return whether the operation was successful */
static
bool
buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
{
ut_ad(space->n_pending_ios > 0);
ut_ad(space->id == bpage->space);
ulint zip_size = buf_page_get_zip_size(bpage);
ulint size = (zip_size) ? zip_size : UNIV_PAGE_SIZE;
byte* dst_frame = (zip_size) ? bpage->zip.data :
((buf_block_t*) bpage)->frame;
unsigned key_version =
mach_read_from_4(dst_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
bool page_compressed = fil_page_is_compressed(dst_frame);
bool page_compressed_encrypted = fil_page_is_compressed_encrypted(dst_frame);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
bool success = true;
if (bpage->offset == 0) {
/* File header pages are not encrypted/compressed */
return (true);
}
/* Page is encrypted if encryption information is found from
tablespace and page contains used key_version. This is true
also for pages first compressed and then encrypted. */
if (!space->crypt_data) {
key_version = 0;
}
if (page_compressed) {
/* the page we read is unencrypted */
/* Find free slot from temporary memory array */
buf_tmp_buffer_t* slot = buf_pool_reserve_tmp_slot(buf_pool, page_compressed);
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
/* decompress using comp_buf to dst_frame */
fil_decompress_page(slot->comp_buf,
dst_frame,
ulong(size),
&bpage->write_size);
/* Mark this slot as free */
slot->reserved = false;
key_version = 0;
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
} else {
buf_tmp_buffer_t* slot = NULL;
if (key_version) {
/* Verify encryption checksum before we even try to
decrypt. */
if (!fil_space_verify_crypt_checksum(dst_frame,
zip_size, NULL, bpage->offset)) {
/* Mark page encrypted in case it should
be. */
if (space->crypt_data->type
!= CRYPT_SCHEME_UNENCRYPTED) {
bpage->encrypted = true;
}
return (false);
}
/* Find free slot from temporary memory array */
slot = buf_pool_reserve_tmp_slot(buf_pool, page_compressed);
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
/* decrypt using crypt_buf to dst_frame */
if (!fil_space_decrypt(space, slot->crypt_buf,
dst_frame, &bpage->encrypted)) {
success = false;
}
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
}
if (page_compressed_encrypted && success) {
if (!slot) {
slot = buf_pool_reserve_tmp_slot(buf_pool, page_compressed);
}
#ifdef UNIV_DEBUG
fil_page_type_validate(dst_frame);
#endif
/* decompress using comp_buf to dst_frame */
fil_decompress_page(slot->comp_buf,
dst_frame,
ulong(size),
&bpage->write_size);
ut_d(fil_page_type_validate(dst_frame));
}
/* Mark this slot as free */
if (slot) {
slot->reserved = false;
}
}
ut_ad(space->n_pending_ios > 0);
return (success);
}

View File

@ -510,10 +510,11 @@ buf_dblwr_process()
"Restoring possible half-written data pages " "Restoring possible half-written data pages "
"from the doublewrite buffer..."); "from the doublewrite buffer...");
unaligned_read_buf = static_cast<byte*>(ut_malloc(2 * UNIV_PAGE_SIZE)); unaligned_read_buf = static_cast<byte*>(ut_malloc(3 * UNIV_PAGE_SIZE));
read_buf = static_cast<byte*>( read_buf = static_cast<byte*>(
ut_align(unaligned_read_buf, UNIV_PAGE_SIZE)); ut_align(unaligned_read_buf, UNIV_PAGE_SIZE));
byte* const buf = read_buf + UNIV_PAGE_SIZE;
for (std::list<byte*>::iterator i = recv_dblwr.pages.begin(); for (std::list<byte*>::iterator i = recv_dblwr.pages.begin();
i != recv_dblwr.pages.end(); ++i, ++page_no_dblwr ) { i != recv_dblwr.pages.end(); ++i, ++page_no_dblwr ) {
@ -562,13 +563,14 @@ buf_dblwr_process()
ignore this page (there should be redo log ignore this page (there should be redo log
records to initialize it). */ records to initialize it). */
} else { } else {
if (fil_page_is_compressed_encrypted(read_buf) ||
fil_page_is_compressed(read_buf)) {
/* Decompress the page before /* Decompress the page before
validating the checksum. */ validating the checksum. */
fil_decompress_page( ulint decomp = fil_page_decompress(buf, read_buf);
NULL, read_buf, srv_page_size, if (!decomp) {
NULL, true); goto bad;
}
if (!decomp || (decomp != srv_page_size && zip_size)) {
goto bad;
} }
if (fil_space_verify_crypt_checksum( if (fil_space_verify_crypt_checksum(
@ -580,6 +582,7 @@ buf_dblwr_process()
continue; continue;
} }
bad:
/* We intentionally skip this message for /* We intentionally skip this message for
is_all_zero pages. */ is_all_zero pages. */
ib_logf(IB_LOG_LEVEL_INFO, ib_logf(IB_LOG_LEVEL_INFO,
@ -588,18 +591,15 @@ buf_dblwr_process()
space_id, page_no); space_id, page_no);
} }
/* Next, validate the doublewrite page. */ ulint decomp = fil_page_decompress(buf, page);
if (fil_page_is_compressed_encrypted(page) || if (!decomp || (decomp != srv_page_size && zip_size)) {
fil_page_is_compressed(page)) { goto bad_doublewrite;
/* Decompress the page before
validating the checksum. */
fil_decompress_page(
NULL, page, srv_page_size, NULL, true);
} }
if (!fil_space_verify_crypt_checksum(page, zip_size, NULL,
if (!fil_space_verify_crypt_checksum(page, zip_size, NULL, page_no) page_no)
&& buf_page_is_corrupted(true, page, zip_size, space)) { && buf_page_is_corrupted(true, page, zip_size, space)) {
if (!is_all_zero) { if (!is_all_zero) {
bad_doublewrite:
ib_logf(IB_LOG_LEVEL_WARN, ib_logf(IB_LOG_LEVEL_WARN,
"A doublewrite copy of page " "A doublewrite copy of page "
ULINTPF ":" ULINTPF " is corrupted.", ULINTPF ":" ULINTPF " is corrupted.",

View File

@ -708,60 +708,39 @@ fil_space_encrypt(
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
if (tmp) { if (tmp) {
/* Verify that encrypted buffer is not corrupted */ /* Verify that encrypted buffer is not corrupted */
byte* tmp_mem = (byte *)malloc(UNIV_PAGE_SIZE);
dberr_t err = DB_SUCCESS; dberr_t err = DB_SUCCESS;
byte* src = src_frame; byte* src = src_frame;
bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
byte* comp_mem = NULL; byte uncomp_mem[UNIV_PAGE_SIZE_MAX];
byte* uncomp_mem = NULL; byte tmp_mem[UNIV_PAGE_SIZE_MAX];
ulint size = (zip_size) ? zip_size : UNIV_PAGE_SIZE; ulint size = (zip_size) ? zip_size : UNIV_PAGE_SIZE;
if (page_compressed_encrypted) { if (page_compressed_encrypted) {
comp_mem = (byte *)malloc(UNIV_PAGE_SIZE); memcpy(uncomp_mem, src, srv_page_size);
uncomp_mem = (byte *)malloc(UNIV_PAGE_SIZE); ulint unzipped1 = fil_page_decompress(
memcpy(comp_mem, src_frame, UNIV_PAGE_SIZE); tmp_mem, uncomp_mem);
fil_decompress_page(uncomp_mem, comp_mem, ut_ad(unzipped1);
srv_page_size, NULL); if (unzipped1 != srv_page_size) {
src = uncomp_mem; src = uncomp_mem;
} }
}
bool corrupted1 = buf_page_is_corrupted(true, src, zip_size, space); ut_ad(!buf_page_is_corrupted(true, src, zip_size, space));
bool ok = fil_space_decrypt(crypt_data, tmp_mem, size, tmp, &err); ut_ad(fil_space_decrypt(crypt_data, tmp_mem, size, tmp, &err));
ut_ad(err == DB_SUCCESS);
/* Need to decompress the page if it was also compressed */ /* Need to decompress the page if it was also compressed */
if (page_compressed_encrypted) { if (page_compressed_encrypted) {
memcpy(comp_mem, tmp_mem, UNIV_PAGE_SIZE); byte buf[UNIV_PAGE_SIZE_MAX];
fil_decompress_page(tmp_mem, comp_mem, memcpy(buf, tmp_mem, srv_page_size);
srv_page_size, NULL); ulint unzipped2 = fil_page_decompress(tmp_mem, buf);
ut_ad(unzipped2);
} }
bool corrupted = buf_page_is_corrupted(true, tmp_mem, zip_size, space); memcpy(tmp_mem + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
memcpy(tmp_mem+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, src+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8); src + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8);
bool different = memcmp(src, tmp_mem, size); ut_ad(!memcmp(src, tmp_mem, size));
if (!ok || corrupted || corrupted1 || err != DB_SUCCESS || different) {
fprintf(stderr, "ok %d corrupted %d corrupted1 %d err %d different %d\n",
ok , corrupted, corrupted1, err, different);
fprintf(stderr, "src_frame\n");
buf_page_print(src_frame, zip_size);
fprintf(stderr, "encrypted_frame\n");
buf_page_print(tmp, zip_size);
fprintf(stderr, "decrypted_frame\n");
buf_page_print(tmp_mem, zip_size);
ut_ad(0);
} }
free(tmp_mem);
if (comp_mem) {
free(comp_mem);
}
if (uncomp_mem) {
free(uncomp_mem);
}
}
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
return tmp; return tmp;

View File

@ -351,19 +351,6 @@ fil_space_get_by_id(
return(space); return(space);
} }
/****************************************************************//**
Get space id from fil node */
ulint
fil_node_get_space_id(
/*==================*/
fil_node_t* node) /*!< in: Compressed node*/
{
ut_ad(node);
ut_ad(node->space);
return (node->space->id);
}
/*******************************************************************//** /*******************************************************************//**
Returns the table space by a given name, NULL if not found. */ Returns the table space by a given name, NULL if not found. */
fil_space_t* fil_space_t*

View File

@ -80,73 +80,26 @@ static ulint srv_data_read, srv_data_written;
#include "snappy-c.h" #include "snappy-c.h"
#endif #endif
/* Used for debugging */ /** Compress a page_compressed page before writing to a data file.
//#define UNIV_PAGECOMPRESS_DEBUG 1 @param[in] buf page to be compressed
@param[out] out_buf compressed page
/****************************************************************//** @param[in] level compression level
For page compressed pages compress the page before actual write @param[in] block_size file system block size
operation. @param[in] encrypted whether the page will be subsequently encrypted
@return compressed page to be written*/ @return actual length of compressed page
UNIV_INTERN @retval 0 if the page was not compressed */
byte* UNIV_INTERN ulint fil_page_compress(const byte* buf, byte* out_buf, ulint level,
fil_compress_page( ulint block_size, bool encrypted)
/*==============*/
fil_space_t* space, /*!< in,out: tablespace (NULL during IMPORT) */
byte* buf, /*!< in: buffer from which to write; in aio
this must be appropriately aligned */
byte* out_buf, /*!< out: compressed buffer */
ulint len, /*!< in: length of input buffer.*/
ulint level, /* in: compression level */
ulint block_size, /*!< in: block size */
bool encrypted, /*!< in: is page also encrypted */
ulint* out_len) /*!< out: actual length of compressed
page */
{ {
int err = Z_OK; int comp_level = int(level);
int comp_level = level;
ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE; ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
ulint write_size = 0;
#if HAVE_LZO
lzo_uint write_size_lzo = write_size;
#endif
/* Cache to avoid change during function execution */ /* Cache to avoid change during function execution */
ulint comp_method = innodb_compression_algorithm; ulint comp_method = innodb_compression_algorithm;
bool allocated = false;
/* page_compression does not apply to tables or tablespaces
that use ROW_FORMAT=COMPRESSED */
ut_ad(!space || !FSP_FLAGS_GET_ZIP_SSIZE(space->flags));
if (encrypted) { if (encrypted) {
header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE; header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE;
} }
if (!out_buf) {
allocated = true;
ulint size = UNIV_PAGE_SIZE;
/* Both snappy and lzo compression methods require that
output buffer used for compression is bigger than input
buffer. Increase the allocated buffer size accordingly. */
#if HAVE_SNAPPY
if (comp_method == PAGE_SNAPPY_ALGORITHM) {
size = snappy_max_compressed_length(size);
}
#endif
#if HAVE_LZO
if (comp_method == PAGE_LZO_ALGORITHM) {
size += LZO1X_1_15_MEM_COMPRESS;
}
#endif
out_buf = static_cast<byte *>(ut_malloc(size));
}
ut_ad(buf);
ut_ad(out_buf);
ut_ad(len);
ut_ad(out_len);
/* Let's not compress file space header or /* Let's not compress file space header or
extent descriptor */ extent descriptor */
switch (fil_page_get_type(buf)) { switch (fil_page_get_type(buf)) {
@ -154,8 +107,7 @@ fil_compress_page(
case FIL_PAGE_TYPE_FSP_HDR: case FIL_PAGE_TYPE_FSP_HDR:
case FIL_PAGE_TYPE_XDES: case FIL_PAGE_TYPE_XDES:
case FIL_PAGE_PAGE_COMPRESSED: case FIL_PAGE_PAGE_COMPRESSED:
*out_len = len; return 0;
goto err_exit;
} }
/* If no compression level was provided to this table, use system /* If no compression level was provided to this table, use system
@ -164,204 +116,113 @@ fil_compress_page(
comp_level = page_zip_level; comp_level = page_zip_level;
} }
DBUG_PRINT("compress", ulint write_size = srv_page_size - header_len;
("Preparing for space " ULINTPF " '%s' len " ULINTPF,
space ? space->id : 0,
space ? space->name : "(import)",
len));
write_size = UNIV_PAGE_SIZE - header_len; switch (comp_method) {
default:
switch(comp_method) { ut_ad(!"unknown compression method");
/* fall through */
case PAGE_UNCOMPRESSED:
return 0;
case PAGE_ZLIB_ALGORITHM:
{
ulong len = uLong(write_size);
if (Z_OK == compress2(
out_buf + header_len, &len,
buf, uLong(srv_page_size), comp_level)) {
write_size = len;
goto success;
}
}
break;
#ifdef HAVE_LZ4 #ifdef HAVE_LZ4
case PAGE_LZ4_ALGORITHM: case PAGE_LZ4_ALGORITHM:
# ifdef HAVE_LZ4_COMPRESS_DEFAULT
write_size = LZ4_compress_default(
reinterpret_cast<const char*>(buf),
reinterpret_cast<char*>(out_buf) + header_len,
int(srv_page_size), int(write_size));
# else
write_size = LZ4_compress_limitedOutput(
reinterpret_cast<const char*>(buf),
reinterpret_cast<char*>(out_buf) + header_len,
int(srv_page_size), int(write_size));
# endif
#ifdef HAVE_LZ4_COMPRESS_DEFAULT if (write_size) {
err = LZ4_compress_default((const char *)buf, goto success;
(char *)out_buf+header_len, len, write_size);
#else
err = LZ4_compress_limitedOutput((const char *)buf,
(char *)out_buf+header_len, len, write_size);
#endif /* HAVE_LZ4_COMPRESS_DEFAULT */
write_size = err;
if (err == 0) {
/* If error we leave the actual page as it was */
#ifndef UNIV_PAGECOMPRESS_DEBUG
if (space && !space->printed_compression_failure) {
space->printed_compression_failure = true;
#endif
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
err, write_size);
#ifndef UNIV_PAGECOMPRESS_DEBUG
}
#endif
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
} }
break; break;
#endif /* HAVE_LZ4 */ #endif /* HAVE_LZ4 */
#ifdef HAVE_LZO #ifdef HAVE_LZO
case PAGE_LZO_ALGORITHM: case PAGE_LZO_ALGORITHM: {
err = lzo1x_1_15_compress( lzo_uint len = write_size;
buf, len, out_buf+header_len, &write_size_lzo, out_buf+UNIV_PAGE_SIZE);
write_size = write_size_lzo; if (LZO_E_OK == lzo1x_1_15_compress(
buf, srv_page_size,
if (err != LZO_E_OK || write_size > UNIV_PAGE_SIZE-header_len) { out_buf + header_len, &len,
if (space && !space->printed_compression_failure) { out_buf + srv_page_size)
space->printed_compression_failure = true; && len <= write_size) {
ib_logf(IB_LOG_LEVEL_WARN, write_size = len;
"Compression failed for space " ULINTPF goto success;
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
err, write_size);
} }
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
}
break; break;
}
#endif /* HAVE_LZO */ #endif /* HAVE_LZO */
#ifdef HAVE_LZMA #ifdef HAVE_LZMA
case PAGE_LZMA_ALGORITHM: { case PAGE_LZMA_ALGORITHM: {
size_t out_pos=0; size_t out_pos = 0;
err = lzma_easy_buffer_encode(
comp_level,
LZMA_CHECK_NONE,
NULL, /* No custom allocator, use malloc/free */
reinterpret_cast<uint8_t*>(buf),
len,
reinterpret_cast<uint8_t*>(out_buf + header_len),
&out_pos,
(size_t)write_size);
if (err != LZMA_OK || out_pos > UNIV_PAGE_SIZE-header_len) {
if (space && !space->printed_compression_failure) {
space->printed_compression_failure = true;
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
err, out_pos);
}
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
}
if (LZMA_OK == lzma_easy_buffer_encode(
comp_level, LZMA_CHECK_NONE, NULL,
buf, srv_page_size, out_buf + header_len,
&out_pos, write_size)
&& out_pos <= write_size) {
write_size = out_pos; write_size = out_pos;
goto success;
}
break; break;
} }
#endif /* HAVE_LZMA */ #endif /* HAVE_LZMA */
#ifdef HAVE_BZIP2 #ifdef HAVE_BZIP2
case PAGE_BZIP2_ALGORITHM: { case PAGE_BZIP2_ALGORITHM: {
unsigned len = unsigned(write_size);
err = BZ2_bzBuffToBuffCompress( if (BZ_OK == BZ2_bzBuffToBuffCompress(
(char *)(out_buf + header_len), reinterpret_cast<char*>(out_buf + header_len),
(unsigned int *)&write_size, &len,
(char *)buf, const_cast<char*>(
len, reinterpret_cast<const char*>(buf)),
1, unsigned(srv_page_size), 1, 0, 0)
0, && len <= write_size) {
0); write_size = len;
goto success;
if (err != BZ_OK || write_size > UNIV_PAGE_SIZE-header_len) {
if (space && !space->printed_compression_failure) {
space->printed_compression_failure = true;
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
err, write_size);
}
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
} }
break; break;
} }
#endif /* HAVE_BZIP2 */ #endif /* HAVE_BZIP2 */
#ifdef HAVE_SNAPPY #ifdef HAVE_SNAPPY
case PAGE_SNAPPY_ALGORITHM: case PAGE_SNAPPY_ALGORITHM: {
{ size_t len = snappy_max_compressed_length(srv_page_size);
snappy_status cstatus;
write_size = snappy_max_compressed_length(UNIV_PAGE_SIZE);
cstatus = snappy_compress( if (SNAPPY_OK == snappy_compress(
(const char *)buf, reinterpret_cast<const char*>(buf),
(size_t)len, srv_page_size,
(char *)(out_buf+header_len), reinterpret_cast<char*>(out_buf) + header_len,
(size_t*)&write_size); &len)
&& len <= write_size) {
if (cstatus != SNAPPY_OK || write_size > UNIV_PAGE_SIZE-header_len) { write_size = len;
if (space && !space->printed_compression_failure) { goto success;
space->printed_compression_failure = true;
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" err %d write_size " ULINTPF ".",
space->id, space->name, len,
(int)cstatus, write_size);
}
srv_stats.pages_page_compression_error.inc();
*out_len = len;
goto err_exit;
} }
break; break;
} }
#endif /* HAVE_SNAPPY */ #endif /* HAVE_SNAPPY */
case PAGE_ZLIB_ALGORITHM:
err = compress2(out_buf+header_len, (ulong*)&write_size, buf,
uLong(len), comp_level);
if (err != Z_OK) {
/* If error we leave the actual page as it was */
if (space && !space->printed_compression_failure) {
space->printed_compression_failure = true;
ib_logf(IB_LOG_LEVEL_WARN,
"Compression failed for space " ULINTPF
" name %s len " ULINTPF
" rt %d write_size " ULINTPF ".",
space->id, space->name, len,
err, write_size);
} }
srv_stats.pages_page_compression_error.inc(); srv_stats.pages_page_compression_error.inc();
*out_len = len; return 0;
goto err_exit; success:
}
break;
case PAGE_UNCOMPRESSED:
*out_len = len;
return (buf);
break;
default:
ut_error;
break;
}
/* Set up the page header */ /* Set up the page header */
memcpy(out_buf, buf, FIL_PAGE_DATA); memcpy(out_buf, buf, FIL_PAGE_DATA);
/* Set up the checksum */ /* Set up the checksum */
@ -392,22 +253,11 @@ fil_compress_page(
/* Verify that page can be decompressed */ /* Verify that page can be decompressed */
{ {
byte *comp_page; page_t tmp_buf[UNIV_PAGE_SIZE_MAX];
byte *uncomp_page; page_t page[UNIV_PAGE_SIZE_MAX];
memcpy(page, out_buf, srv_page_size);
comp_page = static_cast<byte *>(ut_malloc(UNIV_PAGE_SIZE)); ut_ad(fil_page_decompress(tmp_buf, page));
uncomp_page = static_cast<byte *>(ut_malloc(UNIV_PAGE_SIZE)); ut_ad(!buf_page_is_corrupted(false, page, 0, NULL));
memcpy(comp_page, out_buf, UNIV_PAGE_SIZE);
fil_decompress_page(uncomp_page, comp_page, ulong(len), NULL);
if (buf_page_is_corrupted(false, uncomp_page, 0, space)) {
buf_page_print(uncomp_page, 0);
ut_ad(0);
}
ut_free(comp_page);
ut_free(uncomp_page);
} }
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
@ -431,323 +281,144 @@ fil_compress_page(
#endif #endif
} }
DBUG_PRINT("compress", srv_stats.page_compression_saved.add(srv_page_size - write_size);
("Succeeded for space " ULINTPF
" '%s' len " ULINTPF " out_len " ULINTPF,
space ? space->id : 0,
space ? space->name : "(import)",
len, write_size));
srv_stats.page_compression_saved.add((len - write_size));
srv_stats.pages_page_compressed.inc(); srv_stats.pages_page_compressed.inc();
/* If we do not persistently trim rest of page, we need to write it /* If we do not persistently trim rest of page, we need to write it
all */ all */
if (!srv_use_trim) { if (!srv_use_trim) {
memset(out_buf+write_size,0,len-write_size); memset(out_buf + write_size, 0, srv_page_size - write_size);
write_size = len;
} }
*out_len = write_size; return write_size;
if (allocated) {
/* TODO: reduce number of memcpy's */
memcpy(buf, out_buf, len);
} else {
return(out_buf);
}
err_exit:
if (allocated) {
ut_free(out_buf);
}
return (buf);
} }
/****************************************************************//** /** Decompress a page that may be subject to page_compressed compression.
For page compressed pages decompress the page after actual read @param[in,out] tmp_buf temporary buffer (of innodb_page_size)
operation. */ @param[in,out] buf possibly compressed page buffer
UNIV_INTERN @return size of the compressed data
void @retval 0 if decompression failed
fil_decompress_page( @retval srv_page_size if the page was not compressed */
/*================*/ UNIV_INTERN ulint fil_page_decompress(byte* tmp_buf, byte* buf)
byte* page_buf, /*!< in: preallocated buffer or NULL */
byte* buf, /*!< out: buffer from which to read; in aio
this must be appropriately aligned */
ulong len, /*!< in: length of output buffer.*/
ulint* write_size, /*!< in/out: Actual payload size of
the compressed data. */
bool return_error) /*!< in: true if only an error should
be produced when decompression fails.
By default this parameter is false. */
{ {
int err = 0; const unsigned ptype = mach_read_from_2(buf+FIL_PAGE_TYPE);
ulint actual_size = 0; ulint header_len;
ulint compression_alg = 0; ib_uint64_t compression_alg;
byte *in_buf; switch (ptype) {
ulint ptype; case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE; header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE
+ FIL_PAGE_COMPRESSION_METHOD_SIZE;
ut_ad(buf); compression_alg = mach_read_from_2(
ut_ad(len); FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE + buf);
break;
ptype = mach_read_from_2(buf+FIL_PAGE_TYPE); case FIL_PAGE_PAGE_COMPRESSED:
header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
if (ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) { compression_alg = mach_read_from_8(
header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE; FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + buf);
break;
default:
return srv_page_size;
} }
/* Do not try to uncompressed pages that are not compressed */ if (mach_read_from_4(buf + FIL_PAGE_SPACE_OR_CHKSUM)
if (ptype != FIL_PAGE_PAGE_COMPRESSED && != BUF_NO_CHECKSUM_MAGIC) {
ptype != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) { return 0;
return;
} }
// If no buffer was given, we need to allocate temporal buffer ulint actual_size = mach_read_from_2(buf + FIL_PAGE_DATA);
if (page_buf == NULL) {
in_buf = static_cast<byte *>(ut_malloc(UNIV_PAGE_SIZE));
memset(in_buf, 0, UNIV_PAGE_SIZE);
} else {
in_buf = page_buf;
}
/* Before actual decompress, make sure that page type is correct */
if (mach_read_from_4(buf+FIL_PAGE_SPACE_OR_CHKSUM) != BUF_NO_CHECKSUM_MAGIC ||
(ptype != FIL_PAGE_PAGE_COMPRESSED &&
ptype != FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED)) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: We try to uncompress corrupted page"
" CRC " ULINTPF " type " ULINTPF " len " ULINTPF ".",
mach_read_from_4(buf+FIL_PAGE_SPACE_OR_CHKSUM),
mach_read_from_2(buf+FIL_PAGE_TYPE), len);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
/* Get compression algorithm */
if (ptype == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
compression_alg = mach_read_from_2(buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE);
} else {
compression_alg = mach_read_from_8(buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
}
/* Get the actual size of compressed page */
actual_size = mach_read_from_2(buf+FIL_PAGE_DATA);
/* Check if payload size is corrupted */ /* Check if payload size is corrupted */
if (actual_size == 0 || actual_size > UNIV_PAGE_SIZE) { if (actual_size == 0 || actual_size > srv_page_size - header_len) {
return 0;
}
switch (compression_alg) {
default:
ib_logf(IB_LOG_LEVEL_ERROR, ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: We try to uncompress corrupted page" "Unknown compression algorithm " UINT64PF,
" actual size " ULINTPF " compression %s.", compression_alg);
actual_size, fil_get_compression_alg_name(compression_alg)); return 0;
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
/* Store actual payload size of the compressed data. This pointer
points to buffer pool. */
if (write_size) {
*write_size = actual_size;
}
DBUG_PRINT("compress",
("Preparing for decompress for len " ULINTPF ".",
actual_size));
switch(compression_alg) {
case PAGE_ZLIB_ALGORITHM: case PAGE_ZLIB_ALGORITHM:
err= uncompress(in_buf, &len, buf+header_len, (unsigned long)actual_size); {
uLong len = srv_page_size;
/* If uncompress fails it means that page is corrupted */ if (Z_OK != uncompress(tmp_buf, &len,
if (err != Z_OK) { buf + header_len,
uLong(actual_size))
ib_logf(IB_LOG_LEVEL_ERROR, && len != srv_page_size) {
"Corruption: Page is marked as compressed" return 0;
" but uncompress failed with error %d "
" size " ULINTPF " len " ULINTPF ".",
err, actual_size, len);
fflush(stderr);
if (return_error) {
goto error_return;
} }
ut_error;
} }
break; break;
#ifdef HAVE_LZ4 #ifdef HAVE_LZ4
case PAGE_LZ4_ALGORITHM: case PAGE_LZ4_ALGORITHM:
err = LZ4_decompress_fast((const char *)buf+header_len, (char *)in_buf, len); if (LZ4_decompress_safe(reinterpret_cast<const char*>(buf)
+ header_len,
if (err != (int)actual_size) { reinterpret_cast<char*>(tmp_buf),
ib_logf(IB_LOG_LEVEL_ERROR, actual_size, srv_page_size)
"Corruption: Page is marked as compressed" == int(srv_page_size)) {
" but uncompress failed with error %d "
" size " ULINTPF " len " ULINTPF ".",
err, actual_size, len);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
break; break;
}
return 0;
#endif /* HAVE_LZ4 */ #endif /* HAVE_LZ4 */
#ifdef HAVE_LZO #ifdef HAVE_LZO
case PAGE_LZO_ALGORITHM: { case PAGE_LZO_ALGORITHM: {
ulint olen = 0; lzo_uint len_lzo = srv_page_size;
lzo_uint olen_lzo = olen; if (LZO_E_OK == lzo1x_decompress_safe(
err = lzo1x_decompress((const unsigned char *)buf+header_len, buf + header_len,
actual_size,(unsigned char *)in_buf, &olen_lzo, NULL); actual_size, tmp_buf, &len_lzo, NULL)
&& len_lzo == srv_page_size) {
olen = olen_lzo;
if (err != LZO_E_OK || (olen == 0 || olen > UNIV_PAGE_SIZE)) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but uncompress failed with error %d "
" size " ULINTPF " len " ULINTPF ".",
err, actual_size, len);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
break; break;
} }
return 0;
}
#endif /* HAVE_LZO */ #endif /* HAVE_LZO */
#ifdef HAVE_LZMA #ifdef HAVE_LZMA
case PAGE_LZMA_ALGORITHM: { case PAGE_LZMA_ALGORITHM: {
lzma_ret ret;
size_t src_pos = 0; size_t src_pos = 0;
size_t dst_pos = 0; size_t dst_pos = 0;
uint64_t memlimit = UINT64_MAX; uint64_t memlimit = UINT64_MAX;
ret = lzma_stream_buffer_decode( if (LZMA_OK == lzma_stream_buffer_decode(
&memlimit, &memlimit, 0, NULL, buf + header_len,
0, &src_pos, actual_size, tmp_buf, &dst_pos,
NULL, srv_page_size)
buf+header_len, && dst_pos == srv_page_size) {
&src_pos,
actual_size,
in_buf,
&dst_pos,
len);
if (ret != LZMA_OK || (dst_pos == 0 || dst_pos > UNIV_PAGE_SIZE)) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but decompression read only %ld bytes"
" size " ULINTPF "len " ULINTPF ".",
dst_pos, actual_size, len);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
break; break;
} }
return 0;
}
#endif /* HAVE_LZMA */ #endif /* HAVE_LZMA */
#ifdef HAVE_BZIP2 #ifdef HAVE_BZIP2
case PAGE_BZIP2_ALGORITHM: { case PAGE_BZIP2_ALGORITHM: {
unsigned int dst_pos = UNIV_PAGE_SIZE; unsigned int dst_pos = srv_page_size;
if (BZ_OK == BZ2_bzBuffToBuffDecompress(
err = BZ2_bzBuffToBuffDecompress( reinterpret_cast<char*>(tmp_buf),
(char *)in_buf,
&dst_pos, &dst_pos,
(char *)(buf+header_len), reinterpret_cast<char*>(buf) + header_len,
actual_size, actual_size, 1, 0)
1, && dst_pos == srv_page_size) {
0);
if (err != BZ_OK || (dst_pos == 0 || dst_pos > UNIV_PAGE_SIZE)) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but decompression read only %du bytes"
" size " ULINTPF " len " ULINTPF " err %d.",
dst_pos, actual_size, len, err);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
break; break;
} }
return 0;
}
#endif /* HAVE_BZIP2 */ #endif /* HAVE_BZIP2 */
#ifdef HAVE_SNAPPY #ifdef HAVE_SNAPPY
case PAGE_SNAPPY_ALGORITHM: case PAGE_SNAPPY_ALGORITHM: {
{ size_t olen = srv_page_size;
snappy_status cstatus;
ulint olen = UNIV_PAGE_SIZE;
cstatus = snappy_uncompress(
(const char *)(buf+header_len),
(size_t)actual_size,
(char *)in_buf,
(size_t*)&olen);
if (cstatus != SNAPPY_OK || olen != UNIV_PAGE_SIZE) {
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but decompression read only " ULINTPF " bytes"
" size " ULINTPF " len " ULINTPF " err %d.",
olen, actual_size, len, (int)cstatus);
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
}
if (SNAPPY_OK == snappy_uncompress(
reinterpret_cast<const char*>(buf) + header_len,
actual_size,
reinterpret_cast<char*>(tmp_buf), &olen)
&& olen == srv_page_size) {
break; break;
} }
return 0;
}
#endif /* HAVE_SNAPPY */ #endif /* HAVE_SNAPPY */
default:
ib_logf(IB_LOG_LEVEL_ERROR,
"Corruption: Page is marked as compressed"
" but compression algorithm %s"
" is not known."
,fil_get_compression_alg_name(compression_alg));
fflush(stderr);
if (return_error) {
goto error_return;
}
ut_error;
break;
} }
srv_stats.pages_page_decompressed.inc(); srv_stats.pages_page_decompressed.inc();
memcpy(buf, tmp_buf, srv_page_size);
/* Copy the uncompressed page to the buffer pool, not return actual_size;
really any other options. */
memcpy(buf, in_buf, len);
error_return:
if (page_buf != in_buf) {
ut_free(in_buf);
}
} }

View File

@ -5218,6 +5218,10 @@ ibuf_check_bitmap_on_import(
bitmap_page = ibuf_bitmap_get_map_page( bitmap_page = ibuf_bitmap_get_map_page(
space_id, page_no, zip_size, &mtr); space_id, page_no, zip_size, &mtr);
if (!bitmap_page) {
mutex_exit(&ibuf_mutex);
return DB_CORRUPTION;
}
for (i = FSP_IBUF_BITMAP_OFFSET + 1; i < page_size; i++) { for (i = FSP_IBUF_BITMAP_OFFSET + 1; i < page_size; i++) {
const ulint offset = page_no + i; const ulint offset = page_no + i;

View File

@ -1,7 +1,7 @@
/***************************************************************************** /*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2017, MariaDB Corporation. Copyright (c) 2013, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
@ -38,6 +38,7 @@ Created 11/5/1995 Heikki Tuuri
#include "ut0rbt.h" #include "ut0rbt.h"
#include "os0proc.h" #include "os0proc.h"
#include "log0log.h" #include "log0log.h"
#include "my_atomic.h"
/** @name Modes for buf_page_get_gen */ /** @name Modes for buf_page_get_gen */
/* @{ */ /* @{ */
@ -1528,45 +1529,16 @@ buf_page_encrypt_before_write(
buf_page_t* bpage, buf_page_t* bpage,
byte* src_frame); byte* src_frame);
/**********************************************************************
The hook that is called after page is written to disk.
The function releases any resources needed for encryption that was allocated
in buf_page_encrypt_before_write */
UNIV_INTERN
ibool
buf_page_encrypt_after_write(
/*=========================*/
buf_page_t* page); /*!< in/out: buffer page that was flushed */
/********************************************************************//**
The hook that is called just before a page is read from disk.
The function allocates memory that is used to temporarily store disk content
before getting decrypted */
UNIV_INTERN
byte*
buf_page_decrypt_before_read(
/*=========================*/
buf_page_t* page, /*!< in/out: buffer page read from disk */
ulint zip_size); /*!< in: compressed page size, or 0 */
/********************************************************************//**
The hook that is called just after a page is read from disk.
The function decrypt disk content into buf_page_t and releases the
temporary buffer that was allocated in buf_page_decrypt_before_read */
UNIV_INTERN
bool
buf_page_decrypt_after_read(
/*========================*/
buf_page_t* page); /*!< in/out: buffer page read from disk */
/** @brief The temporary memory structure. /** @brief The temporary memory structure.
NOTE! The definition appears here only for other modules of this NOTE! The definition appears here only for other modules of this
directory (buf) to see it. Do not use from outside! */ directory (buf) to see it. Do not use from outside! */
typedef struct { typedef struct {
bool reserved; /*!< true if this slot is reserved private:
int32 reserved; /*!< true if this slot is reserved
*/ */
public:
byte* crypt_buf; /*!< for encryption the data needs to be byte* crypt_buf; /*!< for encryption the data needs to be
copied to a separate buffer before it's copied to a separate buffer before it's
encrypted&written. this as a page can be encrypted&written. this as a page can be
@ -1577,6 +1549,21 @@ typedef struct {
byte* out_buf; /*!< resulting buffer after byte* out_buf; /*!< resulting buffer after
encryption/compression. This is a encryption/compression. This is a
pointer and not allocated. */ pointer and not allocated. */
/** Release the slot */
void release()
{
my_atomic_store32_explicit(&reserved, false,
MY_MEMORY_ORDER_RELAXED);
}
/** Acquire the slot
@return whether the slot was acquired */
bool acquire()
{
return !my_atomic_fas32_explicit(&reserved, true,
MY_MEMORY_ORDER_RELAXED);
}
} buf_tmp_buffer_t; } buf_tmp_buffer_t;
/** The common buffer control block structure /** The common buffer control block structure

View File

@ -340,9 +340,6 @@ struct fil_space_t {
corrupted page. */ corrupted page. */
bool is_corrupt; bool is_corrupt;
/*!< true if tablespace corrupted */ /*!< true if tablespace corrupted */
bool printed_compression_failure;
/*!< true if we have already printed
compression failure */
fil_space_crypt_t* crypt_data; fil_space_crypt_t* crypt_data;
/*!< tablespace crypt data or NULL */ /*!< tablespace crypt data or NULL */
ulint file_block_size; ulint file_block_size;

View File

@ -68,7 +68,6 @@ fil_get_page_type_name(
} }
return "PAGE TYPE CORRUPTED"; return "PAGE TYPE CORRUPTED";
} }
/****************************************************************//** /****************************************************************//**

View File

@ -1,6 +1,6 @@
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2017 MariaDB Corporation. All Rights Reserved. Copyright (C) 2013, 2018 MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
@ -30,70 +30,26 @@ atomic writes information to table space.
Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com Created 11/12/2013 Jan Lindström jan.lindstrom@skysql.com
***********************************************************************/ ***********************************************************************/
/*******************************************************************//** /** Compress a page_compressed page before writing to a data file.
Find out wheather the page is index page or not @param[in] buf page to be compressed
@return true if page type index page, false if not */ @param[out] out_buf compressed page
UNIV_INLINE @param[in] level compression level
ibool @param[in] block_size file system block size
fil_page_is_index_page( @param[in] encrypted whether the page will be subsequently encrypted
/*===================*/ @return actual length of compressed page
byte *buf); /*!< in: page */ @retval 0 if the page was not compressed */
UNIV_INTERN ulint fil_page_compress(const byte* buf, byte* out_buf, ulint level,
ulint block_size, bool encrypted)
MY_ATTRIBUTE((nonnull, warn_unused_result));
/****************************************************************//** /** Decompress a page that may be subject to page_compressed compression.
Get the name of the compression algorithm used for page @param[in,out] tmp_buf temporary buffer (of innodb_page_size)
compression. @param[in,out] buf compressed page buffer
@return compression algorithm name or "UNKNOWN" if not known*/ @return size of the compressed data
UNIV_INLINE @retval 0 if decompression failed
const char* @retval srv_page_size if the page was not compressed */
fil_get_compression_alg_name( UNIV_INTERN ulint fil_page_decompress(byte* tmp_buf, byte* buf)
/*=========================*/ MY_ATTRIBUTE((nonnull, warn_unused_result));
ulint comp_alg); /*!<in: compression algorithm number */
/****************************************************************//**
For page compressed pages compress the page before actual write
operation.
@return compressed page to be written*/
UNIV_INTERN
byte*
fil_compress_page(
/*==============*/
fil_space_t* space, /*!< in,out: tablespace (NULL during IMPORT) */
byte* buf, /*!< in: buffer from which to write; in aio
this must be appropriately aligned */
byte* out_buf, /*!< out: compressed buffer */
ulint len, /*!< in: length of input buffer.*/
ulint level, /* in: compression level */
ulint block_size, /*!< in: block size */
bool encrypted, /*!< in: is page also encrypted */
ulint* out_len); /*!< out: actual length of compressed
page */
/****************************************************************//**
For page compressed pages decompress the page after actual read
operation. */
UNIV_INTERN
void
fil_decompress_page(
/*================*/
byte* page_buf, /*!< in: preallocated buffer or NULL */
byte* buf, /*!< out: buffer from which to read; in aio
this must be appropriately aligned */
ulong len, /*!< in: length of output buffer.*/
ulint* write_size, /*!< in/out: Actual payload size of
the compressed data. */
bool return_error=false);
/*!< in: true if only an error should
be produced when decompression fails.
By default this parameter is false. */
/****************************************************************//**
Get space id from fil node
@return space id*/
UNIV_INTERN
ulint
fil_node_get_space_id(
/*==================*/
fil_node_t* node); /*!< in: Node where to get space id*/
/****************************************************************//** /****************************************************************//**
Get block size from fil node Get block size from fil node
@ -120,13 +76,4 @@ ibool
fil_page_is_compressed_encrypted( fil_page_is_compressed_encrypted(
/*=============================*/ /*=============================*/
byte* buf); /*!< in: page */ byte* buf); /*!< in: page */
/*******************************************************************//**
Find out wheather the page is page compressed with lzo method
@return true if page is page compressed with lzo method*/
UNIV_INLINE
ibool
fil_page_is_lzo_compressed(
/*=======================*/
byte* buf); /*!< in: page */
#endif #endif

View File

@ -1,6 +1,6 @@
/***************************************************************************** /*****************************************************************************
Copyright (C) 2013, 2017, MariaDB Corporation. All Rights Reserved. Copyright (C) 2013, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software the terms of the GNU General Public License as published by the Free Software
@ -49,18 +49,6 @@ fsp_flags_get_atomic_writes(
return((atomic_writes_t)FSP_FLAGS_GET_ATOMIC_WRITES(flags)); return((atomic_writes_t)FSP_FLAGS_GET_ATOMIC_WRITES(flags));
} }
/*******************************************************************//**
Find out wheather the page is index page or not
@return true if page type index page, false if not */
UNIV_INLINE
ibool
fil_page_is_index_page(
/*===================*/
byte* buf) /*!< in: page */
{
return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_INDEX);
}
/*******************************************************************//** /*******************************************************************//**
Find out wheather the page is page compressed Find out wheather the page is page compressed
@return true if page is page compressed, false if not */ @return true if page is page compressed, false if not */
@ -84,59 +72,3 @@ fil_page_is_compressed_encrypted(
{ {
return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); return(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
} }
/****************************************************************//**
Get the name of the compression algorithm used for page
compression.
@return compression algorithm name or "UNKNOWN" if not known*/
UNIV_INLINE
const char*
fil_get_compression_alg_name(
/*=========================*/
ulint comp_alg) /*!<in: compression algorithm number */
{
switch(comp_alg) {
case PAGE_UNCOMPRESSED:
return ("uncompressed");
break;
case PAGE_ZLIB_ALGORITHM:
return ("ZLIB");
break;
case PAGE_LZ4_ALGORITHM:
return ("LZ4");
break;
case PAGE_LZO_ALGORITHM:
return ("LZO");
break;
case PAGE_LZMA_ALGORITHM:
return ("LZMA");
break;
case PAGE_BZIP2_ALGORITHM:
return ("BZIP2");
break;
case PAGE_SNAPPY_ALGORITHM:
return ("SNAPPY");
break;
/* No default to get compiler warning */
}
return ("NULL");
}
#ifndef UNIV_INNOCHECKSUM
/*******************************************************************//**
Find out wheather the page is page compressed with lzo method
@return true if page is page compressed with lzo method, false if not */
UNIV_INLINE
ibool
fil_page_is_lzo_compressed(
/*=======================*/
byte* buf) /*!< in: page */
{
return((mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED &&
mach_read_from_8(buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION) == PAGE_LZO_ALGORITHM) ||
(mach_read_from_2(buf+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED &&
mach_read_from_2(buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE) == PAGE_LZO_ALGORITHM));
}
#endif /* UNIV_INNOCHECKSUM */

View File

@ -42,6 +42,12 @@ Created 2012-02-08 by Sunny Bains.
#include "srv0start.h" #include "srv0start.h"
#include "row0quiesce.h" #include "row0quiesce.h"
#include "fil0pagecompress.h" #include "fil0pagecompress.h"
#ifdef HAVE_LZO
#include "lzo/lzo1x.h"
#endif
#ifdef HAVE_SNAPPY
#include "snappy-c.h"
#endif
#include <vector> #include <vector>
@ -3364,15 +3370,30 @@ fil_iterate(
os_offset_t offset; os_offset_t offset;
ulint n_bytes = iter.n_io_buffers * iter.page_size; ulint n_bytes = iter.n_io_buffers * iter.page_size;
const ulint buf_size = srv_page_size
#ifdef HAVE_LZO
+ LZO1X_1_15_MEM_COMPRESS
#elif defined HAVE_SNAPPY
+ snappy_max_compressed_length(srv_page_size)
#endif
;
byte* page_compress_buf = static_cast<byte*>(
ut_malloc_low(buf_size, false));
ut_ad(!srv_read_only_mode); ut_ad(!srv_read_only_mode);
if (!page_compress_buf) {
return DB_OUT_OF_MEMORY;
}
/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless /* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
copying for non-index pages. Unfortunately, it is copying for non-index pages. Unfortunately, it is
required by buf_zip_decompress() */ required by buf_zip_decompress() */
dberr_t err = DB_SUCCESS;
for (offset = iter.start; offset < iter.end; offset += n_bytes) { for (offset = iter.start; offset < iter.end; offset += n_bytes) {
if (callback.is_interrupted()) { if (callback.is_interrupted()) {
return DB_INTERRUPTED; err = DB_INTERRUPTED;
goto func_exit;
} }
byte* io_buffer = iter.io_buffer; byte* io_buffer = iter.io_buffer;
@ -3403,12 +3424,13 @@ fil_iterate(
if (!os_file_read_no_error_handling(iter.file, readptr, if (!os_file_read_no_error_handling(iter.file, readptr,
offset, n_bytes)) { offset, n_bytes)) {
ib_logf(IB_LOG_LEVEL_ERROR, "os_file_read() failed"); ib_logf(IB_LOG_LEVEL_ERROR, "os_file_read() failed");
return DB_IO_ERROR; err = DB_IO_ERROR;
goto func_exit;
} }
bool updated = false; bool updated = false;
ulint n_pages_read = (ulint) n_bytes / iter.page_size;
const ulint size = iter.page_size; const ulint size = iter.page_size;
ulint n_pages_read = ulint(n_bytes) / size;
block->page.offset = offset / size; block->page.offset = offset / size;
for (ulint i = 0; i < n_pages_read; for (ulint i = 0; i < n_pages_read;
@ -3437,11 +3459,11 @@ page_corrupted:
UINT64PF " looks corrupted.", UINT64PF " looks corrupted.",
callback.filename(), callback.filename(),
ulong(offset / size), offset); ulong(offset / size), offset);
return DB_CORRUPTION; err = DB_CORRUPTION;
goto func_exit;
} }
bool decrypted = false; bool decrypted = false;
dberr_t err = DB_SUCCESS;
byte* dst = io_buffer + (i * size); byte* dst = io_buffer + (i * size);
bool frame_changed = false; bool frame_changed = false;
ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE); ulint page_type = mach_read_from_2(src+FIL_PAGE_TYPE);
@ -3450,6 +3472,10 @@ page_corrupted:
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED
|| page_type == FIL_PAGE_PAGE_COMPRESSED; || page_type == FIL_PAGE_PAGE_COMPRESSED;
if (page_compressed && block->page.zip.data) {
goto page_corrupted;
}
if (!encrypted) { if (!encrypted) {
} else if (!mach_read_from_4( } else if (!mach_read_from_4(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
@ -3475,7 +3501,7 @@ not_encrypted:
iter.page_size, src, &err); iter.page_size, src, &err);
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
return err; goto func_exit;
} }
if (!decrypted) { if (!decrypted) {
@ -3488,8 +3514,12 @@ not_encrypted:
/* If the original page is page_compressed, we need /* If the original page is page_compressed, we need
to decompress it before adjusting further. */ to decompress it before adjusting further. */
if (page_compressed) { if (page_compressed) {
fil_decompress_page(NULL, dst, ulong(size), ulint compress_length = fil_page_decompress(
NULL); page_compress_buf, dst);
ut_ad(compress_length != srv_page_size);
if (compress_length == 0) {
goto page_corrupted;
}
updated = true; updated = true;
} else if (buf_page_is_corrupted( } else if (buf_page_is_corrupted(
false, false,
@ -3500,7 +3530,7 @@ not_encrypted:
} }
if ((err = callback(block)) != DB_SUCCESS) { if ((err = callback(block)) != DB_SUCCESS) {
return err; goto func_exit;
} else if (!updated) { } else if (!updated) {
updated = buf_block_get_state(block) updated = buf_block_get_state(block)
== BUF_BLOCK_FILE_PAGE; == BUF_BLOCK_FILE_PAGE;
@ -3550,19 +3580,17 @@ not_encrypted:
src = io_buffer + (i * size); src = io_buffer + (i * size);
if (page_compressed) { if (page_compressed) {
ulint len = 0;
fil_compress_page(
NULL,
src,
NULL,
size,
0,/* FIXME: compression level */
512,/* FIXME: use proper block size */
encrypted,
&len);
updated = true; updated = true;
if (fil_page_compress(
src,
page_compress_buf,
0,/* FIXME: compression level */
512,/* FIXME: proper block size */
encrypted)) {
/* FIXME: remove memcpy() */
memcpy(src, page_compress_buf,
srv_page_size);
}
} }
/* If tablespace is encrypted, encrypt page before we /* If tablespace is encrypted, encrypt page before we
@ -3599,11 +3627,14 @@ not_encrypted:
offset, (ulint) n_bytes)) { offset, (ulint) n_bytes)) {
ib_logf(IB_LOG_LEVEL_ERROR, "os_file_write() failed"); ib_logf(IB_LOG_LEVEL_ERROR, "os_file_write() failed");
return DB_IO_ERROR; err = DB_IO_ERROR;
goto func_exit;
} }
} }
return DB_SUCCESS; func_exit:
ut_free(page_compress_buf);
return err;
} }
/********************************************************************//** /********************************************************************//**