diff --git a/extra/innochecksum.cc b/extra/innochecksum.cc index 55d41fc04ad..dc49e3e0817 100644 --- a/extra/innochecksum.cc +++ b/extra/innochecksum.cc @@ -281,7 +281,7 @@ static void init_page_size(const byte* buf) const unsigned flags = mach_read_from_4(buf + FIL_PAGE_DATA + FSP_SPACE_FLAGS); - if (FSP_FLAGS_FCRC32_HAS_MARKER(flags)) { + if (fil_space_t::full_crc32(flags)) { srv_page_size = fil_space_t::logical_size(flags); physical_page_size = srv_page_size; return; @@ -461,7 +461,7 @@ is_page_corrupted( return (false); } - if (!zip_size) { + if (!zip_size && (!is_compressed || !use_full_crc32)) { /* check the stored log sequence numbers for uncompressed tablespace. */ logseq = mach_read_from_4(buf + FIL_PAGE_LSN + 4); @@ -613,8 +613,10 @@ static bool update_checksum(byte* page, ulint flags) } } else if (use_full_crc32) { - checksum = buf_calc_page_full_crc32(page); - byte* c = page + physical_page_size - FIL_PAGE_FCRC32_CHECKSUM; + ulint payload = buf_page_full_crc32_size(page, NULL, NULL) + - FIL_PAGE_FCRC32_CHECKSUM; + checksum = ut_crc32(page, payload); + byte* c = page + payload; if (mach_read_from_4(c) == checksum) return false; mach_write_to_4(c, checksum); if (is_log_enabled) { diff --git a/extra/mariabackup/ds_local.cc b/extra/mariabackup/ds_local.cc index eea4fcecdd8..90fb88ef381 100644 --- a/extra/mariabackup/ds_local.cc +++ b/extra/mariabackup/ds_local.cc @@ -178,7 +178,9 @@ static void init_ibd_data(ds_local_file_t *local_file, const uchar *buf, size_t ulint flags = mach_read_from_4(&buf[FIL_PAGE_DATA + FSP_SPACE_FLAGS]); ulint ssize = FSP_FLAGS_GET_PAGE_SSIZE(flags); local_file->pagesize= ssize == 0 ? UNIV_PAGE_SIZE_ORIG : ((UNIV_ZIP_SIZE_MIN >> 1) << ssize); - local_file->compressed = (my_bool)FSP_FLAGS_HAS_PAGE_COMPRESSION(flags); + local_file->compressed = fil_space_t::full_crc32(flags) + ? fil_space_t::is_compressed(flags) + : bool(FSP_FLAGS_HAS_PAGE_COMPRESSION(flags)); #if defined(_WIN32) && (MYSQL_VERSION_ID > 100200) /* Make compressed file sparse, on Windows. diff --git a/extra/mariabackup/fil_cur.cc b/extra/mariabackup/fil_cur.cc index 9678fcf5775..8dcb07d069d 100644 --- a/extra/mariabackup/fil_cur.cc +++ b/extra/mariabackup/fil_cur.cc @@ -361,7 +361,8 @@ static bool page_is_corrupted(const byte *page, ulint page_no, if (page_type == FIL_PAGE_PAGE_COMPRESSED || page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) { - ulint decomp = fil_page_decompress(tmp_frame, tmp_page); + ulint decomp = fil_page_decompress(tmp_frame, tmp_page, + space->flags); page_type = mach_read_from_2(tmp_page + FIL_PAGE_TYPE); return (!decomp diff --git a/mysql-test/suite/encryption/r/innodb-spatial-index,full_crc32.rdiff b/mysql-test/suite/encryption/r/innodb-spatial-index,full_crc32.rdiff index 56b0e0eac1b..02022c97aac 100644 --- a/mysql-test/suite/encryption/r/innodb-spatial-index,full_crc32.rdiff +++ b/mysql-test/suite/encryption/r/innodb-spatial-index,full_crc32.rdiff @@ -1,6 +1,6 @@ --- innodb-spatial-index.result +++ innodb-spatial-index.result -@@ -1,22 +1,25 @@ +@@ -1,23 +1,26 @@ CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT, c VARCHAR(256), coordinate POINT NOT NULL, SPATIAL index(coordinate)) ENGINE=INNODB ENCRYPTED=YES; @@ -14,7 +14,8 @@ -Got one of the listed errors DROP TABLE t1; CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT, - c VARCHAR(256), coordinate POINT NOT NULL) ENCRYPTED=YES ENGINE=INNODB; + c VARCHAR(256), coordinate POINT NOT NULL) + PAGE_COMPRESSED=YES, ENCRYPTED=YES ENGINE=INNODB; ALTER TABLE t1 ADD SPATIAL INDEX b1(coordinate), ALGORITHM=COPY; -Got one of the listed errors ALTER TABLE t1 ADD SPATIAL INDEX b2(coordinate), FORCE, ALGORITHM=INPLACE; diff --git a/mysql-test/suite/encryption/r/innodb-spatial-index,strict_full_crc32.rdiff b/mysql-test/suite/encryption/r/innodb-spatial-index,strict_full_crc32.rdiff index 56b0e0eac1b..02022c97aac 100644 --- a/mysql-test/suite/encryption/r/innodb-spatial-index,strict_full_crc32.rdiff +++ b/mysql-test/suite/encryption/r/innodb-spatial-index,strict_full_crc32.rdiff @@ -1,6 +1,6 @@ --- innodb-spatial-index.result +++ innodb-spatial-index.result -@@ -1,22 +1,25 @@ +@@ -1,23 +1,26 @@ CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT, c VARCHAR(256), coordinate POINT NOT NULL, SPATIAL index(coordinate)) ENGINE=INNODB ENCRYPTED=YES; @@ -14,7 +14,8 @@ -Got one of the listed errors DROP TABLE t1; CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT, - c VARCHAR(256), coordinate POINT NOT NULL) ENCRYPTED=YES ENGINE=INNODB; + c VARCHAR(256), coordinate POINT NOT NULL) + PAGE_COMPRESSED=YES, ENCRYPTED=YES ENGINE=INNODB; ALTER TABLE t1 ADD SPATIAL INDEX b1(coordinate), ALGORITHM=COPY; -Got one of the listed errors ALTER TABLE t1 ADD SPATIAL INDEX b2(coordinate), FORCE, ALGORITHM=INPLACE; diff --git a/mysql-test/suite/encryption/r/innodb-spatial-index.result b/mysql-test/suite/encryption/r/innodb-spatial-index.result index 674d5ad4b5c..66c3edcd109 100644 --- a/mysql-test/suite/encryption/r/innodb-spatial-index.result +++ b/mysql-test/suite/encryption/r/innodb-spatial-index.result @@ -8,7 +8,8 @@ ALTER TABLE t1 ENCRYPTED=YES; Got one of the listed errors DROP TABLE t1; CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT, -c VARCHAR(256), coordinate POINT NOT NULL) ENCRYPTED=YES ENGINE=INNODB; +c VARCHAR(256), coordinate POINT NOT NULL) +PAGE_COMPRESSED=YES, ENCRYPTED=YES ENGINE=INNODB; ALTER TABLE t1 ADD SPATIAL INDEX b1(coordinate), ALGORITHM=COPY; Got one of the listed errors ALTER TABLE t1 ADD SPATIAL INDEX b2(coordinate), FORCE, ALGORITHM=INPLACE; diff --git a/mysql-test/suite/encryption/t/innodb-spatial-index.test b/mysql-test/suite/encryption/t/innodb-spatial-index.test index 0465225cd4f..2caffb141e1 100644 --- a/mysql-test/suite/encryption/t/innodb-spatial-index.test +++ b/mysql-test/suite/encryption/t/innodb-spatial-index.test @@ -45,7 +45,8 @@ DROP TABLE t1; # Index creation # CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT, -c VARCHAR(256), coordinate POINT NOT NULL) ENCRYPTED=YES ENGINE=INNODB; +c VARCHAR(256), coordinate POINT NOT NULL) +PAGE_COMPRESSED=YES, ENCRYPTED=YES ENGINE=INNODB; # FIXME: MDEV-13851 Encrypted table refuses some form of ALGORITHM=COPY, # but allows rebuild by FORCE --error $error_code diff --git a/mysql-test/suite/mariabackup/encrypted_page_compressed.result b/mysql-test/suite/mariabackup/encrypted_page_compressed.result index 293addd2b03..30a69747954 100644 --- a/mysql-test/suite/mariabackup/encrypted_page_compressed.result +++ b/mysql-test/suite/mariabackup/encrypted_page_compressed.result @@ -1,4 +1,5 @@ call mtr.add_suppression("InnoDB: Table `test`.`t1` has an unreadable root page"); +call mtr.add_suppression("InnoDB: Encrypted page .* in file .*test.t1\\.ibd looks corrupted; key_version=1"); CREATE TABLE t1 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT, c char(200)) ENGINE=InnoDB page_compressed=yes encrypted=yes; insert into t1(b, c) values("mariadb", "mariabackup"); # Corrupt the table diff --git a/mysql-test/suite/mariabackup/encrypted_page_compressed.test b/mysql-test/suite/mariabackup/encrypted_page_compressed.test index b0bcdd9e33b..54fffb7d08f 100644 --- a/mysql-test/suite/mariabackup/encrypted_page_compressed.test +++ b/mysql-test/suite/mariabackup/encrypted_page_compressed.test @@ -1,5 +1,6 @@ source include/have_file_key_management.inc; call mtr.add_suppression("InnoDB: Table `test`.`t1` has an unreadable root page"); +call mtr.add_suppression("InnoDB: Encrypted page .* in file .*test.t1\\.ibd looks corrupted; key_version=1"); CREATE TABLE t1 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT, c char(200)) ENGINE=InnoDB page_compressed=yes encrypted=yes; insert into t1(b, c) values("mariadb", "mariabackup"); diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index b1f158525af..d5c7e317f92 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -35,6 +35,7 @@ Created 11/5/1995 Heikki Tuuri #include "mach0data.h" #include "buf0buf.h" #include "buf0checksum.h" +#include "ut0crc32.h" #include #ifndef UNIV_INNOCHECKSUM @@ -478,7 +479,8 @@ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space) byte* dst_frame = bpage->zip.data ? bpage->zip.data : ((buf_block_t*) bpage)->frame; - bool page_compressed = fil_page_is_compressed(dst_frame); + bool page_compressed = space->is_compressed() + && buf_page_is_compressed(dst_frame, space->flags); buf_pool_t* buf_pool = buf_pool_from_bpage(bpage); if (bpage->id.page_no() == 0) { @@ -493,22 +495,31 @@ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space) buf_tmp_buffer_t* slot; uint key_version = buf_page_get_key_version(dst_frame, space->flags); - if (page_compressed) { + if (page_compressed && !key_version) { /* the page we read is unencrypted */ /* Find free slot from temporary memory array */ decompress: + if (space->full_crc32() + && buf_page_is_corrupted(true, dst_frame, space->flags)) { + return false; + } + slot = buf_pool_reserve_tmp_slot(buf_pool); /* For decompression, use crypt_buf. */ buf_tmp_reserve_crypt_buf(slot); + decompress_with_slot: ut_d(fil_page_type_validate(space, dst_frame)); - bpage->write_size = fil_page_decompress(slot->crypt_buf, - dst_frame); + bpage->write_size = fil_page_decompress( + slot->crypt_buf, dst_frame, space->flags); slot->release(); - ut_ad(!bpage->write_size || fil_page_type_validate(space, dst_frame)); + ut_ad(!bpage->write_size + || fil_page_type_validate(space, dst_frame)); + ut_ad(space->pending_io()); + return bpage->write_size != 0; } @@ -545,7 +556,8 @@ decrypt_failed: ut_d(fil_page_type_validate(space, dst_frame)); - if (fil_page_is_compressed_encrypted(dst_frame)) { + if ((space->full_crc32() && page_compressed) + || fil_page_is_compressed_encrypted(dst_frame)) { goto decompress_with_slot; } @@ -882,19 +894,6 @@ buf_page_is_checksum_valid_none( && checksum_field1 == BUF_NO_CHECKSUM_MAGIC); } -/** Checks if the page is in full crc32 checksum format. -@param[in] read_buf database page -@param[in] checksum_field checksum field -@return true if the page is in full crc32 checksum format. */ -bool buf_page_is_checksum_valid_full_crc32( - const byte* read_buf, - size_t checksum_field) -{ - const uint32_t full_crc32 = buf_calc_page_full_crc32(read_buf); - - return checksum_field == full_crc32; -} - /** Checks whether the lsn present in the page is lesser than the peek current lsn. @param[in] check_lsn lsn to check @@ -934,6 +933,22 @@ static void buf_page_check_lsn(bool check_lsn, const byte* read_buf) #endif /* !UNIV_INNOCHECKSUM */ } +/** Check if a page is all zeroes. +@param[in] read_buf database page +@param[in] page_size page frame size +@return whether the page is all zeroes */ +bool buf_page_is_zeroes(const void* read_buf, size_t page_size) +{ + const ulint* b = reinterpret_cast(read_buf); + const ulint* const e = b + page_size / sizeof *b; + do { + if (*b++) { + return false; + } + } while (b != e); + return true; +} + /** Check if a page is corrupt. @param[in] check_lsn whether the LSN should be checked @param[in] read_buf database page @@ -949,14 +964,18 @@ buf_page_is_corrupted( #ifndef UNIV_INNOCHECKSUM DBUG_EXECUTE_IF("buf_page_import_corrupt_failure", return(true); ); #endif - if (FSP_FLAGS_FCRC32_HAS_MARKER(fsp_flags)) { - const byte* end = read_buf + srv_page_size; - uint crc32 = mach_read_from_4(end - FIL_PAGE_FCRC32_CHECKSUM); + if (fil_space_t::full_crc32(fsp_flags)) { + bool compressed = false, corrupted = false; + const uint size = buf_page_full_crc32_size( + read_buf, &compressed, &corrupted); + if (corrupted) { + return true; + } + const byte* end = read_buf + (size - FIL_PAGE_FCRC32_CHECKSUM); + uint crc32 = mach_read_from_4(end); - if (!crc32) { - const byte* b = read_buf; - while (b != end) if (*b++) goto nonzero; - /* An all-zero page is not corrupted. */ + if (!crc32 && size == srv_page_size + && buf_page_is_zeroes(read_buf, size)) { return false; } @@ -967,14 +986,17 @@ buf_page_is_corrupted( crc32++; } }); -nonzero: - if (!buf_page_is_checksum_valid_full_crc32(read_buf, crc32)) { + + if (crc32 != ut_crc32(read_buf, + size - FIL_PAGE_FCRC32_CHECKSUM)) { return true; } - - if (!mach_read_from_4(read_buf + FIL_PAGE_FCRC32_KEY_VERSION) + if (!compressed + && !mach_read_from_4(FIL_PAGE_FCRC32_KEY_VERSION + + read_buf) && memcmp(read_buf + (FIL_PAGE_LSN + 4), - end - FIL_PAGE_FCRC32_END_LSN, 4)) { + end - (FIL_PAGE_FCRC32_END_LSN + - FIL_PAGE_FCRC32_CHECKSUM), 4)) { return true; } @@ -3962,7 +3984,6 @@ buf_zip_decompress( << ", none: " << page_zip_calc_checksum( frame, size, SRV_CHECKSUM_ALGORITHM_NONE); - goto err_exit; } @@ -5846,13 +5867,16 @@ buf_mark_space_corrupt(buf_page_t* bpage, const fil_space_t* space) /** Check if the encrypted page is corrupted for the full crc32 format. @param[in] space_id page belongs to space id @param[in] dst_frame page +@param[in] is_compressed compressed page @return true if page is corrupted or false if it isn't */ -static bool buf_encrypted_full_crc32_page_is_corrupted( +static bool buf_page_full_crc32_is_corrupted( ulint space_id, - const byte* dst_frame) + const byte* dst_frame, + bool is_compressed) { - if (memcmp(dst_frame + FIL_PAGE_LSN + 4, - dst_frame + srv_page_size - FIL_PAGE_FCRC32_END_LSN, 4)) { + if (!is_compressed + && memcmp(dst_frame + FIL_PAGE_LSN + 4, + dst_frame + srv_page_size - FIL_PAGE_FCRC32_END_LSN, 4)) { return true; } @@ -5900,9 +5924,12 @@ static dberr_t buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space) if (!still_encrypted) { /* If traditional checksums match, we assume that page is not anymore encrypted. */ - if (key_version && space->full_crc32()) { - corrupted = buf_encrypted_full_crc32_page_is_corrupted( - space->id, dst_frame); + if (space->full_crc32() + && !buf_page_is_zeroes(dst_frame, space->physical_size()) + && (key_version || space->is_compressed())) { + corrupted = buf_page_full_crc32_is_corrupted( + space->id, dst_frame, + space->is_compressed()); } else { corrupted = buf_page_is_corrupted( true, dst_frame, space->flags); @@ -7374,7 +7401,7 @@ buf_page_encrypt( && (!crypt_data->is_default_encryption() || srv_encrypt_tables); - bool page_compressed = FSP_FLAGS_HAS_PAGE_COMPRESSION(space->flags); + bool page_compressed = space->is_compressed(); if (!encrypted && !page_compressed) { /* No need to encrypt or page compress the page. @@ -7398,6 +7425,19 @@ buf_page_encrypt( buf_tmp_reserve_crypt_buf(slot); byte *dst_frame = slot->crypt_buf; + const bool full_crc32 = space->full_crc32(); + + if (full_crc32) { + /* Write LSN for the full crc32 checksum before + encryption. Because lsn is one of the input for encryption. */ + mach_write_to_8(src_frame + FIL_PAGE_LSN, + bpage->newest_modification); + if (!page_compressed) { + mach_write_to_4( + src_frame + srv_page_size - FIL_PAGE_FCRC32_END_LSN, + (ulint) bpage->newest_modification); + } + } if (!page_compressed) { not_compressed: @@ -7427,6 +7467,18 @@ not_compressed: bpage->real_size = out_len; + if (full_crc32) { + ut_d(bool compressed = false); + out_len = buf_page_full_crc32_size(tmp, +#ifdef UNIV_DEBUG + &compressed, +#else + NULL, +#endif + NULL); + ut_ad(compressed); + } + /* Workaround for MDEV-15527. */ memset(tmp + out_len, 0 , srv_page_size - out_len); ut_d(fil_page_type_validate(space, tmp)); @@ -7440,6 +7492,13 @@ not_compressed: dst_frame); } + if (full_crc32) { + compile_time_assert(FIL_PAGE_FCRC32_CHECKSUM == 4); + mach_write_to_4(tmp + out_len - 4, + ut_crc32(tmp, out_len - 4)); + ut_ad(!buf_page_is_corrupted(true, tmp, space->flags)); + } + slot->out_buf = dst_frame = tmp; } diff --git a/storage/innobase/buf/buf0checksum.cc b/storage/innobase/buf/buf0checksum.cc index 95455cb199b..c5ad0cfb657 100644 --- a/storage/innobase/buf/buf0checksum.cc +++ b/storage/innobase/buf/buf0checksum.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2017, MariaDB Corporation. +Copyright (c) 2017, 2019, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -102,14 +102,6 @@ buf_calc_page_old_checksum(const byte* page) (ut_fold_binary(page, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION))); } -/** Calculate the CRC32 checksum for the whole page. -@param[in] page buffer page (srv_page_size bytes) -@return CRC32 value */ -uint32_t buf_calc_page_full_crc32(const byte* page) -{ - return ut_crc32(page, srv_page_size - FIL_PAGE_FCRC32_CHECKSUM); -} - /** Return a printable string describing the checksum algorithm. @param[in] algo algorithm @return algorithm name */ diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc index 7dd4eeb6be3..744b0cb5060 100644 --- a/storage/innobase/buf/buf0dblwr.cc +++ b/storage/innobase/buf/buf0dblwr.cc @@ -335,20 +335,6 @@ too_small: goto start_again; } -/** Check if a page is all zeroes. -@param[in] read_buf database page -@param[in] page_size page frame size -@return whether the page is all zeroes */ -static bool buf_page_is_zeroes(const byte* read_buf, size_t page_size) -{ - for (ulint i = 0; i < page_size; i++) { - if (read_buf[i] != 0) { - return false; - } - } - return true; -} - /** At database startup initializes the doublewrite buffer memory structure if we already have a doublewrite buffer created in the data files. If we are @@ -620,7 +606,8 @@ buf_dblwr_process() } else { /* Decompress the page before validating the checksum. */ - ulint decomp = fil_page_decompress(buf, read_buf); + ulint decomp = fil_page_decompress(buf, read_buf, + space->flags); if (!decomp || (zip_size && decomp != srv_page_size)) { goto bad; } @@ -648,7 +635,7 @@ bad: << " from the doublewrite buffer."; } - ulint decomp = fil_page_decompress(buf, page); + ulint decomp = fil_page_decompress(buf, page, space->flags); if (!decomp || (zip_size && decomp != srv_page_size)) { goto bad_doublewrite; } @@ -801,52 +788,42 @@ buf_dblwr_update( } } -/********************************************************************//** -Check the LSN values on the page. */ -static -void -buf_dblwr_check_page_lsn( -/*=====================*/ - const page_t* page) /*!< in: page to check */ +#ifdef UNIV_DEBUG +/** Check the LSN values on the page. +@param[in] page page to check +@param[in] s tablespace */ +static void buf_dblwr_check_page_lsn(const page_t* page, const fil_space_t& s) { - ibool page_compressed = (mach_read_from_2(page+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED); - uint key_version = mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); - /* Ignore page compressed or encrypted pages */ - if (page_compressed || key_version) { + if (s.is_compressed() + || buf_page_get_key_version(page, s.flags)) { return; } - bool lsn_mismatch = false; - - // MDEV-12026 FIXME: invoke fil_space_t::full_crc32() - if (memcmp(page + (FIL_PAGE_LSN + 4), - page + (srv_page_size - - FIL_PAGE_END_LSN_OLD_CHKSUM + 4), - 4)) { - if (memcmp(page + (FIL_PAGE_LSN + 4), - page + (srv_page_size - - FIL_PAGE_FCRC32_END_LSN), - 4)) { - lsn_mismatch = true; - } - } - - if (lsn_mismatch) { - // MDEV-12026 FIXME: lsn2 depends on fil_space_t::full_crc32()! - const ulint lsn1 = mach_read_from_4( - page + FIL_PAGE_LSN + 4); - const ulint lsn2 = mach_read_from_4( - page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM - + 4); - - ib::error() << "The page to be written seems corrupt!" + const unsigned lsn1 = mach_read_from_4(page + FIL_PAGE_LSN + 4), + lsn2 = mach_read_from_4(page + srv_page_size + - (s.full_crc32() + ? FIL_PAGE_FCRC32_END_LSN + : FIL_PAGE_END_LSN_OLD_CHKSUM - 4)); + if (UNIV_UNLIKELY(lsn1 != lsn2)) { + ib::error() << "The page to be written to " + << s.chain.start->name << + " seems corrupt!" " The low 4 bytes of LSN fields do not match" " (" << lsn1 << " != " << lsn2 << ")!" " Noticed in the buffer pool."; } } +static void buf_dblwr_check_page_lsn(const buf_page_t& b, const byte* page) +{ + if (fil_space_t* space = fil_space_acquire_for_io(b.id.space())) { + buf_dblwr_check_page_lsn(page, *space); + space->release_for_io(); + } +} +#endif /* UNIV_DEBUG */ + /********************************************************************//** Asserts when a corrupt block is find during writing out data to the disk. */ @@ -962,8 +939,7 @@ buf_dblwr_write_block_to_datafile( const_cast(bpage)); ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE); - buf_dblwr_check_page_lsn(block->frame); - + ut_d(buf_dblwr_check_page_lsn(block->page, block->frame)); fil_io(request, sync, bpage->id, bpage->zip_size(), 0, bpage->real_size, frame, block); @@ -1056,10 +1032,7 @@ try_again: /* Check that the actual page in the buffer pool is not corrupt and the LSN values are sane. */ buf_dblwr_check_block(block); - - /* Check that the page as written to the doublewrite - buffer has sane LSN values. */ - buf_dblwr_check_page_lsn(write_buf + len2); + ut_d(buf_dblwr_check_page_lsn(block->page, write_buf + len2)); } /* Write out the first block of the doublewrite buffer */ @@ -1239,8 +1212,8 @@ buf_dblwr_write_single_page( /* Check that the page as written to the doublewrite buffer has sane LSN values. */ if (!bpage->zip.data) { - buf_dblwr_check_page_lsn( - ((buf_block_t*) bpage)->frame); + ut_d(buf_dblwr_check_page_lsn( + *bpage, ((buf_block_t*) bpage)->frame)); } } diff --git a/storage/innobase/buf/buf0flu.cc b/storage/innobase/buf/buf0flu.cc index 65f1860cb2f..37bfed420c0 100644 --- a/storage/innobase/buf/buf0flu.cc +++ b/storage/innobase/buf/buf0flu.cc @@ -728,9 +728,9 @@ void buf_flush_write_complete(buf_page_t* bpage, bool dblwr) /** Calculate the checksum of a page from compressed table and update the page. -@param[in,out] page page to update -@param[in] size compressed page size -@param[in] lsn LSN to stamp on the page */ +@param[in,out] page page to update +@param[in] size compressed page size +@param[in] lsn LSN to stamp on the page */ void buf_flush_update_zip_checksum( buf_frame_t* page, @@ -751,9 +751,15 @@ buf_flush_update_zip_checksum( @param[in,out] page page to be updated */ void buf_flush_assign_full_crc32_checksum(byte* page) { - uint32_t checksum = buf_calc_page_full_crc32(page); - mach_write_to_4(page + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM, - checksum); + ut_d(bool compressed = false); + ut_d(bool corrupted = false); + ut_d(const uint size = buf_page_full_crc32_size(page, &compressed, + &corrupted)); + ut_ad(!compressed); + ut_ad(!corrupted); + ut_ad(size == uint(srv_page_size)); + const ulint payload = srv_page_size - FIL_PAGE_FCRC32_CHECKSUM; + mach_write_to_4(page + payload, ut_crc32(page, payload)); } /** Initialize a page for writing to the tablespace. @@ -776,8 +782,6 @@ buf_flush_init_for_writing( /* If page is encrypted in full crc32 format then checksum stored already as a part of fil_encrypt_buf() */ ut_ad(use_full_checksum); - ut_ad(mach_read_from_4( - page + FIL_PAGE_FCRC32_KEY_VERSION)); return; } diff --git a/storage/innobase/fil/fil0crypt.cc b/storage/innobase/fil/fil0crypt.cc index b3c73785b51..7a6d21dba13 100644 --- a/storage/innobase/fil/fil0crypt.cc +++ b/storage/innobase/fil/fil0crypt.cc @@ -565,7 +565,7 @@ static byte* fil_encrypt_buf_for_non_full_checksum( uint header_len = FIL_PAGE_DATA; if (page_compressed) { - header_len += (FIL_PAGE_COMPRESSED_SIZE + FIL_PAGE_COMPRESSION_METHOD_SIZE); + header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN; } /* FIL page header is not encrypted */ @@ -627,7 +627,7 @@ static byte* fil_encrypt_buf_for_non_full_checksum( @param[in] src_frame Page to encrypt @param[in,out] dst_frame Output buffer @return encrypted buffer or NULL */ -static byte* fil_encrypt_buf_for_full_checksum( +static byte* fil_encrypt_buf_for_full_crc32( fil_space_crypt_t* crypt_data, ulint space, ulint offset, @@ -635,8 +635,16 @@ static byte* fil_encrypt_buf_for_full_checksum( const byte* src_frame, byte* dst_frame) { - const uint size = uint(srv_page_size); uint key_version = fil_crypt_get_latest_key_version(crypt_data); + ut_d(bool corrupted = false); + const uint size = buf_page_full_crc32_size(src_frame, NULL, +#ifdef UNIV_DEBUG + &corrupted +#else + NULL +#endif + ); + ut_ad(!corrupted); uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + FIL_PAGE_FCRC32_CHECKSUM); const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION; @@ -657,8 +665,10 @@ static byte* fil_encrypt_buf_for_full_checksum( ut_a(rc == MY_AES_OK); ut_a(dstlen == srclen); - ib_uint32_t checksum = buf_calc_page_full_crc32(dst_frame); - mach_write_to_4(dst_frame + size - FIL_PAGE_FCRC32_CHECKSUM, checksum); + const ulint payload = size - FIL_PAGE_FCRC32_CHECKSUM; + mach_write_to_4(dst_frame + payload, ut_crc32(dst_frame, payload)); + /* Clean the rest of the buffer. FIXME: Punch holes when writing! */ + memset(dst_frame + (payload + 4), 0, srv_page_size - (payload + 4)); srv_stats.pages_encrypted.inc(); @@ -689,14 +699,14 @@ fil_encrypt_buf( bool use_full_checksum) { if (use_full_checksum) { - return fil_encrypt_buf_for_full_checksum( - crypt_data, space, offset, - lsn, src_frame, dst_frame); + return fil_encrypt_buf_for_full_crc32( + crypt_data, space, offset, + lsn, src_frame, dst_frame); } return fil_encrypt_buf_for_non_full_checksum( - crypt_data, space, offset, lsn, - src_frame, zip_size, dst_frame); + crypt_data, space, offset, lsn, + src_frame, zip_size, dst_frame); } /** Check whether these page types are allowed to encrypt. @@ -750,15 +760,6 @@ fil_space_encrypt( const bool full_crc32 = space->full_crc32(); - if (full_crc32) { - /* Write LSN for the full crc32 checksum before - encryption. Because lsn is one of the input for encryption. */ - mach_write_to_8(src_frame + FIL_PAGE_LSN, lsn); - mach_write_to_4( - src_frame + srv_page_size - FIL_PAGE_FCRC32_END_LSN, - (ulint) lsn); - } - byte* tmp = fil_encrypt_buf(crypt_data, space->id, offset, lsn, src_frame, zip_size, dst_frame, full_crc32); @@ -768,42 +769,51 @@ fil_space_encrypt( /* Verify that encrypted buffer is not corrupted */ dberr_t err = DB_SUCCESS; byte* src = src_frame; - bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); - byte uncomp_mem[UNIV_PAGE_SIZE_MAX]; byte tmp_mem[UNIV_PAGE_SIZE_MAX]; - if (page_compressed_encrypted) { - memcpy(uncomp_mem, src, srv_page_size); - ulint unzipped1 = fil_page_decompress( - tmp_mem, uncomp_mem); - ut_ad(unzipped1); - if (unzipped1 != srv_page_size) { - src = uncomp_mem; - } - } - - ut_ad(full_crc32 - || !buf_page_is_corrupted(true, src, space->flags)); - - ut_ad(fil_space_decrypt(space->id, crypt_data, tmp_mem, - space->physical_size(), space->flags, - tmp, &err)); - ut_ad(err == DB_SUCCESS); - - /* Need to decompress the page if it was also compressed */ - if (page_compressed_encrypted) { - byte buf[UNIV_PAGE_SIZE_MAX]; - memcpy(buf, tmp_mem, srv_page_size); - ulint unzipped2 = fil_page_decompress(tmp_mem, buf); - ut_ad(unzipped2); - } - if (full_crc32) { + bool compressed = false, corrupted = false; + uint size = buf_page_full_crc32_size( + tmp, &compressed, &corrupted); + ut_ad(!corrupted); + ut_ad(!compressed == (size == srv_page_size)); + ut_ad(fil_space_decrypt(space->id, crypt_data, tmp_mem, + size, space->flags, tmp, + &err)); + ut_ad(err == DB_SUCCESS); memcpy(tmp_mem, src, FIL_PAGE_OFFSET); ut_ad(!memcmp(src, tmp_mem, - (space->physical_size() - - FIL_PAGE_FCRC32_CHECKSUM))); + (size - FIL_PAGE_FCRC32_CHECKSUM))); } else { + bool page_compressed_encrypted = + (mach_read_from_2(tmp+FIL_PAGE_TYPE) + == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); + byte uncomp_mem[UNIV_PAGE_SIZE_MAX]; + + if (page_compressed_encrypted) { + memcpy(uncomp_mem, src, srv_page_size); + ulint unzipped1 = fil_page_decompress( + tmp_mem, uncomp_mem, space->flags); + ut_ad(unzipped1); + if (unzipped1 != srv_page_size) { + src = uncomp_mem; + } + } + + ut_ad(!buf_page_is_corrupted(true, src, space->flags)); + + ut_ad(fil_space_decrypt(space->id, crypt_data, tmp_mem, + space->physical_size(), + space->flags, tmp, &err)); + ut_ad(err == DB_SUCCESS); + + if (page_compressed_encrypted) { + memcpy(tmp_mem, uncomp_mem, srv_page_size); + ulint unzipped2 = fil_page_decompress( + uncomp_mem, tmp_mem, space->flags); + ut_ad(unzipped2); + } + memcpy(tmp_mem + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, src + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8); ut_ad(!memcmp(src, tmp_mem, space->physical_size())); @@ -821,7 +831,7 @@ fil_space_encrypt( @param[in,out] src_frame Page to decrypt @param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED @return true if page decrypted, false if not.*/ -static bool fil_space_decrypt_for_full_checksum( +static bool fil_space_decrypt_full_crc32( ulint space, fil_space_crypt_t* crypt_data, byte* tmp_frame, @@ -838,7 +848,8 @@ static bool fil_space_decrypt_for_full_checksum( return false; } - ut_a(crypt_data != NULL && crypt_data->is_encrypted()); + ut_ad(crypt_data); + ut_ad(crypt_data->is_encrypted()); memcpy(tmp_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); @@ -846,9 +857,16 @@ static bool fil_space_decrypt_for_full_checksum( const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION; byte* dst = tmp_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION; uint dstlen = 0; - uint srclen = uint(srv_page_size) - - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION - + FIL_PAGE_FCRC32_CHECKSUM); + bool corrupted = false; + uint size = buf_page_full_crc32_size(src_frame, NULL, &corrupted); + if (UNIV_UNLIKELY(corrupted)) { +fail: + *err = DB_DECRYPTION_FAILED; + return false; + } + + uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + + FIL_PAGE_FCRC32_CHECKSUM); int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen, crypt_data, key_version, @@ -856,8 +874,7 @@ static bool fil_space_decrypt_for_full_checksum( if (rc != MY_AES_OK || dstlen != srclen) { if (rc == -1) { - *err = DB_DECRYPTION_FAILED; - return false; + goto fail; } ib::fatal() << "Unable to decrypt data-block " @@ -913,8 +930,7 @@ static bool fil_space_decrypt_for_non_full_checksum( uint header_len = FIL_PAGE_DATA; if (page_compressed) { - header_len += (FIL_PAGE_COMPRESSED_SIZE - + FIL_PAGE_COMPRESSION_METHOD_SIZE); + header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN; } /* Copy FIL page header, it is not encrypted */ @@ -985,7 +1001,7 @@ fil_space_decrypt( dberr_t* err) { if (fil_space_t::full_crc32(fsp_flags)) { - return fil_space_decrypt_for_full_checksum( + return fil_space_decrypt_full_crc32( space_id, crypt_data, tmp_frame, src_frame, err); } diff --git a/storage/innobase/fil/fil0fil.cc b/storage/innobase/fil/fil0fil.cc index eed123d865e..639c8dcb23f 100644 --- a/storage/innobase/fil/fil0fil.cc +++ b/storage/innobase/fil/fil0fil.cc @@ -418,6 +418,42 @@ fil_space_is_flushed( return(true); } +/** Validate the compression algorithm for full crc32 format. +@param[in] space tablespace object +@return whether the compression algorithm support */ +static bool fil_comp_algo_validate(const fil_space_t* space) +{ + if (!space->full_crc32()) { + return true; + } + + DBUG_EXECUTE_IF("fil_comp_algo_validate_fail", + return false;); + + ulint comp_algo = space->get_compression_algo(); + switch (comp_algo) { + case PAGE_UNCOMPRESSED: + case PAGE_ZLIB_ALGORITHM: +#ifdef HAVE_LZ4 + case PAGE_LZ4_ALGORITHM: +#endif /* HAVE_LZ4 */ +#ifdef HAVE_LZO + case PAGE_LZO_ALGORITHM: +#endif /* HAVE_LZO */ +#ifdef HAVE_LZMA + case PAGE_LZMA_ALGORITHM: +#endif /* HAVE_LZMA */ +#ifdef HAVE_BZIP2 + case PAGE_BZIP2_ALGORITHM: +#endif /* HAVE_BZIP2 */ +#ifdef HAVE_SNAPPY + case PAGE_SNAPPY_ALGORITHM: +#endif /* HAVE_SNAPPY */ + return true; + } + + return false; +} /** Append a file to the chain of files of a space. @param[in] name file name of a file that is not open @@ -623,10 +659,16 @@ retry: } if (!node->read_page0(first_time_open)) { +fail: os_file_close(node->handle); node->handle = OS_FILE_CLOSED; return false; } + + if (first_time_open && !fil_comp_algo_validate(space)) { + goto fail; + } + } else if (space->purpose == FIL_TYPE_LOG) { node->handle = os_file_create( innodb_log_file_key, node->name, OS_FILE_OPEN, diff --git a/storage/innobase/fil/fil0pagecompress.cc b/storage/innobase/fil/fil0pagecompress.cc index c2f93ff0090..63e9048aee3 100644 --- a/storage/innobase/fil/fil0pagecompress.cc +++ b/storage/innobase/fil/fil0pagecompress.cc @@ -73,49 +73,24 @@ Updated 14/02/2015 #include "snappy-c.h" #endif -/** Compress a page_compressed page before writing to a data file. +/** Compress a page for the given compression algorithm. @param[in] buf page to be compressed @param[out] out_buf compressed page -@param[in] flags tablespace flags -@param[in] block_size file system block size -@param[in] encrypted whether the page will be subsequently encrypted -@return actual length of compressed page -@retval 0 if the page was not compressed */ -ulint fil_page_compress( +@param[in] header_len header length of the page +@param[in] comp_algo compression algorithm +@param[in] comp_level compression level +@return actual length of compressed page data +@retval 0 if the page was not compressed */ +static ulint fil_page_compress_low( const byte* buf, byte* out_buf, - ulint flags, - ulint block_size, - bool encrypted) + ulint header_len, + ulint comp_algo, + ulint comp_level) { - int comp_level = int(fsp_flags_get_page_compression_level(flags)); - ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE; - /* Cache to avoid change during function execution */ - ulint comp_method = innodb_compression_algorithm; - - if (encrypted) { - header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE; - } - - /* Let's not compress file space header or - extent descriptor */ - switch (fil_page_get_type(buf)) { - case 0: - case FIL_PAGE_TYPE_FSP_HDR: - case FIL_PAGE_TYPE_XDES: - case FIL_PAGE_PAGE_COMPRESSED: - return 0; - } - - /* If no compression level was provided to this table, use system - default level */ - if (comp_level == 0) { - comp_level = int(page_zip_level); - } - ulint write_size = srv_page_size - header_len; - switch (comp_method) { + switch (comp_algo) { default: ut_ad(!"unknown compression method"); /* fall through */ @@ -125,10 +100,9 @@ ulint fil_page_compress( { ulong len = uLong(write_size); if (Z_OK == compress2( - out_buf + header_len, &len, - buf, uLong(srv_page_size), comp_level)) { - write_size = len; - goto success; + out_buf + header_len, &len, buf, + uLong(srv_page_size), int(comp_level))) { + return len; } } break; @@ -146,10 +120,7 @@ ulint fil_page_compress( int(srv_page_size), int(write_size)); # endif - if (write_size) { - goto success; - } - break; + return write_size; #endif /* HAVE_LZ4 */ #ifdef HAVE_LZO case PAGE_LZO_ALGORITHM: { @@ -160,8 +131,7 @@ ulint fil_page_compress( out_buf + header_len, &len, out_buf + srv_page_size) && len <= write_size) { - write_size = len; - goto success; + return len; } break; } @@ -175,8 +145,7 @@ ulint fil_page_compress( buf, srv_page_size, out_buf + header_len, &out_pos, write_size) && out_pos <= write_size) { - write_size = out_pos; - goto success; + return out_pos; } break; } @@ -192,8 +161,7 @@ ulint fil_page_compress( reinterpret_cast(buf)), unsigned(srv_page_size), 1, 0, 0) && len <= write_size) { - write_size = len; - goto success; + return len; } break; } @@ -209,62 +177,188 @@ ulint fil_page_compress( reinterpret_cast(out_buf) + header_len, &len) && len <= write_size) { - write_size = len; - goto success; + return len; } break; } #endif /* HAVE_SNAPPY */ } - srv_stats.pages_page_compression_error.inc(); return 0; -success: +} + +/** Compress a page_compressed page for full crc32 format. +@param[in] buf page to be compressed +@param[out] out_buf compressed page +@param[in] flags tablespace flags +@param[in] block_size file system block size +@return actual length of compressed page +@retval 0 if the page was not compressed */ +static ulint fil_page_compress_for_full_crc32( + const byte* buf, + byte* out_buf, + ulint flags, + ulint block_size, + bool encrypted) +{ + ulint comp_level = fsp_flags_get_page_compression_level(flags); + + if (comp_level == 0) { + comp_level = page_zip_level; + } + + const ulint header_len = FIL_PAGE_COMP_ALGO; + + ulint write_size = fil_page_compress_low( + buf, out_buf, header_len, + fil_space_t::get_compression_algo(flags), comp_level); + + if (write_size == 0) { +fail: + srv_stats.pages_page_compression_error.inc(); + return 0; + } + + write_size += header_len; + const ulint actual_size = write_size; + /* Write the actual length of the data & page type + for full crc32 format. */ + const bool lsb = fil_space_t::full_crc32_page_compressed_len(flags); + /* In the MSB, store the rounded-up page size. */ + write_size = (write_size + lsb + (4 + 255)) & ~255; + if (write_size >= srv_page_size) { + goto fail; + } + + /* Set up the page header */ + memcpy(out_buf, buf, header_len); + out_buf[FIL_PAGE_TYPE] = 1U << (FIL_PAGE_COMPRESS_FCRC32_MARKER - 8); + out_buf[FIL_PAGE_TYPE + 1] = byte(write_size >> 8); + /* Clean up the buffer for the remaining write_size (except checksum) */ + memset(out_buf + actual_size, 0, write_size - actual_size - 4); + if (lsb) { + /* Store the LSB */ + out_buf[write_size - 5] = byte(actual_size + (1 + 4)); + } + + if (!block_size) { + block_size = 512; + } + + ut_ad(write_size); + if (write_size & (block_size - 1)) { + size_t tmp = write_size; + write_size = (write_size + (block_size - 1)) + & ~(block_size - 1); + memset(out_buf + tmp, 0, write_size - tmp); + } + +#ifdef UNIV_DEBUG + /* Verify that page can be decompressed */ + { + page_t tmp_buf[UNIV_PAGE_SIZE_MAX]; + page_t page[UNIV_PAGE_SIZE_MAX]; + memcpy(page, out_buf, write_size); + ut_ad(fil_page_decompress(tmp_buf, page, flags)); + } +#endif + srv_stats.page_compression_saved.add(srv_page_size - write_size); + srv_stats.pages_page_compressed.inc(); + + return write_size; +} + +/** Compress a page_compressed page for non full crc32 format. +@param[in] buf page to be compressed +@param[out] out_buf compressed page +@param[in] flags tablespace flags +@param[in] block_size file system block size +@param[in] encrypted whether the page will be subsequently encrypted +@return actual length of compressed page +@retval 0 if the page was not compressed */ +static ulint fil_page_compress_for_non_full_crc32( + const byte* buf, + byte* out_buf, + ulint flags, + ulint block_size, + bool encrypted) +{ + int comp_level = int(fsp_flags_get_page_compression_level(flags)); + ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN; + /* Cache to avoid change during function execution */ + ulint comp_algo = innodb_compression_algorithm; + + if (encrypted) { + header_len += FIL_PAGE_ENCRYPT_COMP_ALGO; + } + + /* If no compression level was provided to this table, use system + default level */ + if (comp_level == 0) { + comp_level = int(page_zip_level); + } + + ulint write_size = fil_page_compress_low( + buf, out_buf, + header_len, comp_algo, comp_level); + + if (write_size == 0) { + srv_stats.pages_page_compression_error.inc(); + return 0; + } + /* Set up the page header */ memcpy(out_buf, buf, FIL_PAGE_DATA); /* Set up the checksum */ - mach_write_to_4(out_buf+FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC); + mach_write_to_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC); /* Set up the compression algorithm */ - mach_write_to_8(out_buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, comp_method); + mach_write_to_8(out_buf + FIL_PAGE_COMP_ALGO, comp_algo); if (encrypted) { /* Set up the correct page type */ - mach_write_to_2(out_buf+FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); - mach_write_to_2(out_buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE, comp_method); + mach_write_to_2(out_buf + FIL_PAGE_TYPE, + FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED); + + mach_write_to_2(out_buf + FIL_PAGE_DATA + + FIL_PAGE_ENCRYPT_COMP_ALGO, comp_algo); } else { /* Set up the correct page type */ - mach_write_to_2(out_buf+FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED); + mach_write_to_2(out_buf + FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED); } /* Set up the actual payload lenght */ - mach_write_to_2(out_buf+FIL_PAGE_DATA, write_size); + mach_write_to_2(out_buf + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE, + write_size); #ifdef UNIV_DEBUG /* Verify */ - ut_ad(fil_page_is_compressed(out_buf) || fil_page_is_compressed_encrypted(out_buf)); - ut_ad(mach_read_from_4(out_buf+FIL_PAGE_SPACE_OR_CHKSUM) == BUF_NO_CHECKSUM_MAGIC); - ut_ad(mach_read_from_2(out_buf+FIL_PAGE_DATA) == write_size); - ut_ad(mach_read_from_8(out_buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION) == (ulint)comp_method || - mach_read_from_2(out_buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE) == (ulint)comp_method); + ut_ad(fil_page_is_compressed(out_buf) + || fil_page_is_compressed_encrypted(out_buf)); + + ut_ad(mach_read_from_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM) + == BUF_NO_CHECKSUM_MAGIC); + + ut_ad(mach_read_from_2(out_buf + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE) + == write_size); + + bool is_compressed = (mach_read_from_8(out_buf + FIL_PAGE_COMP_ALGO) + == (ulint) comp_algo); + + bool is_encrypted_compressed = + (mach_read_from_2(out_buf + FIL_PAGE_DATA + + FIL_PAGE_ENCRYPT_COMP_ALGO) + == (ulint) comp_algo); + + ut_ad(is_compressed || is_encrypted_compressed); /* Verify that page can be decompressed */ { page_t tmp_buf[UNIV_PAGE_SIZE_MAX]; page_t page[UNIV_PAGE_SIZE_MAX]; memcpy(page, out_buf, srv_page_size); - ut_ad(fil_page_decompress(tmp_buf, page)); - ulint fsp_flags = 0; - if (fil_space_t::full_crc32(flags)) { - /* Need to construct flag for new crc32 checksum */ - fsp_flags = 1U << FSP_FLAGS_FCRC32_POS_MARKER; - fsp_flags |= FSP_FLAGS_FCRC32_PAGE_SSIZE(); - - ut_ad(!buf_page_is_corrupted(false, page, fsp_flags)); - } else { - fsp_flags = flags; - ut_ad(!buf_page_is_corrupted(false, page, fsp_flags)); - } + ut_ad(fil_page_decompress(tmp_buf, page, flags)); + ut_ad(!buf_page_is_corrupted(false, page, flags)); } #endif /* UNIV_DEBUG */ @@ -279,7 +373,8 @@ success: /* Actual write needs to be alligned on block size */ if (write_size % block_size) { size_t tmp = write_size; - write_size = (size_t)ut_uint64_align_up((ib_uint64_t)write_size, block_size); + write_size = (size_t)ut_uint64_align_up( + (ib_uint64_t)write_size, block_size); /* Clean up the end of buffer */ memset(out_buf+tmp, 0, write_size - tmp); #ifdef UNIV_DEBUG @@ -294,28 +389,203 @@ success: return write_size; } +/** Compress a page_compressed page before writing to a data file. +@param[in] buf page to be compressed +@param[out] out_buf compressed page +@param[in] flags tablespace flags +@param[in] block_size file system block size +@param[in] encrypted whether the page will be subsequently encrypted +@return actual length of compressed page +@retval 0 if the page was not compressed */ +ulint fil_page_compress( + const byte* buf, + byte* out_buf, + ulint flags, + ulint block_size, + bool encrypted) +{ + /* The full_crc32 page_compressed format assumes this. */ + ut_ad(!(block_size & 255)); + ut_ad(ut_is_2pow(block_size)); + + /* Let's not compress file space header or + extent descriptor */ + switch (fil_page_get_type(buf)) { + case 0: + case FIL_PAGE_TYPE_FSP_HDR: + case FIL_PAGE_TYPE_XDES: + case FIL_PAGE_PAGE_COMPRESSED: + return 0; + } + + if (fil_space_t::full_crc32(flags)) { + return fil_page_compress_for_full_crc32( + buf, out_buf, flags, block_size, encrypted); + } + + return fil_page_compress_for_non_full_crc32( + buf, out_buf, flags, block_size, encrypted); +} + /** Decompress a page that may be subject to page_compressed compression. @param[in,out] tmp_buf temporary buffer (of innodb_page_size) @param[in,out] buf possibly compressed page buffer +@param[in] comp_algo compression algorithm +@param[in] header_len header length of the page +@param[in] actual size actual size of the page +@retval true if the page is decompressed or false */ +static bool fil_page_decompress_low( + byte* tmp_buf, + byte* buf, + ulint comp_algo, + ulint header_len, + ulint actual_size) +{ + switch (comp_algo) { + default: + ib::error() << "Unknown compression algorithm " + << comp_algo; + return false; + case PAGE_ZLIB_ALGORITHM: + { + uLong len = srv_page_size; + return (Z_OK == uncompress(tmp_buf, &len, + buf + header_len, + uLong(actual_size)) + && len == srv_page_size); + } +#ifdef HAVE_LZ4 + case PAGE_LZ4_ALGORITHM: + return LZ4_decompress_safe( + reinterpret_cast(buf) + header_len, + reinterpret_cast(tmp_buf), + actual_size, srv_page_size) == int(srv_page_size); +#endif /* HAVE_LZ4 */ +#ifdef HAVE_LZO + case PAGE_LZO_ALGORITHM: + { + lzo_uint len_lzo = srv_page_size; + return (LZO_E_OK == lzo1x_decompress_safe( + buf + header_len, + actual_size, tmp_buf, &len_lzo, NULL) + && len_lzo == srv_page_size); + } +#endif /* HAVE_LZO */ +#ifdef HAVE_LZMA + case PAGE_LZMA_ALGORITHM: + { + size_t src_pos = 0; + size_t dst_pos = 0; + uint64_t memlimit = UINT64_MAX; + + return LZMA_OK == lzma_stream_buffer_decode( + &memlimit, 0, NULL, buf + header_len, + &src_pos, actual_size, tmp_buf, &dst_pos, + srv_page_size) + && dst_pos == srv_page_size; + } +#endif /* HAVE_LZMA */ +#ifdef HAVE_BZIP2 + case PAGE_BZIP2_ALGORITHM: + { + unsigned int dst_pos = srv_page_size; + return BZ_OK == BZ2_bzBuffToBuffDecompress( + reinterpret_cast(tmp_buf), + &dst_pos, + reinterpret_cast(buf) + header_len, + actual_size, 1, 0) + && dst_pos == srv_page_size; + } +#endif /* HAVE_BZIP2 */ +#ifdef HAVE_SNAPPY + case PAGE_SNAPPY_ALGORITHM: + { + size_t olen = srv_page_size; + + return SNAPPY_OK == snappy_uncompress( + reinterpret_cast(buf) + + header_len, + actual_size, + reinterpret_cast(tmp_buf), &olen) + && olen == srv_page_size; + } +#endif /* HAVE_SNAPPY */ + } + + return false; +} + +/** Decompress a page for full crc32 format. +@param[in,out] tmp_buf temporary buffer (of innodb_page_size) +@param[in,out] buf possibly compressed page buffer +@param[in] flags tablespace flags @return size of the compressed data @retval 0 if decompression failed @retval srv_page_size if the page was not compressed */ -ulint fil_page_decompress(byte* tmp_buf, byte* buf) +ulint fil_page_decompress_for_full_crc32(byte* tmp_buf, byte* buf, ulint flags) +{ + ut_ad(fil_space_t::full_crc32(flags)); + bool compressed = false; + size_t size = buf_page_full_crc32_size(buf, &compressed, NULL); + if (!compressed) { + ut_ad(size == srv_page_size); + return size; + } + + if (!fil_space_t::is_compressed(flags)) { + return 0; + } + + if (size >= srv_page_size) { + return 0; + } + + if (fil_space_t::full_crc32_page_compressed_len(flags)) { + compile_time_assert(FIL_PAGE_FCRC32_CHECKSUM == 4); + if (size_t lsb = buf[size - 5]) { + size += lsb - 0x100; + } + size -= 5; + } + + const size_t header_len = FIL_PAGE_COMP_ALGO; + + if (!fil_page_decompress_low(tmp_buf, buf, + fil_space_t::get_compression_algo(flags), + header_len, size - header_len)) { + return 0; + } + + srv_stats.pages_page_decompressed.inc(); + memcpy(buf, tmp_buf, srv_page_size); + return size; +} + +/** Decompress a page for non full crc32 format. +@param[in,out] tmp_buf temporary buffer (of innodb_page_size) +@param[in,out] buf possibly compressed page buffer +@return size of the compressed data +@retval 0 if decompression failed +@retval srv_page_size if the page was not compressed */ +ulint fil_page_decompress_for_non_full_crc32( + byte* tmp_buf, + byte* buf) { const unsigned ptype = mach_read_from_2(buf+FIL_PAGE_TYPE); ulint header_len; - uint64_t compression_alg; + uint comp_algo; switch (ptype) { case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED: - header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE - + FIL_PAGE_COMPRESSION_METHOD_SIZE; - compression_alg = mach_read_from_2( - FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE + buf); + header_len= FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_METADATA_LEN; + comp_algo = mach_read_from_2( + FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_ALGO + buf); break; case FIL_PAGE_PAGE_COMPRESSED: - header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE; - compression_alg = mach_read_from_8( - FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + buf); + header_len = FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN; + if (mach_read_from_6(FIL_PAGE_COMP_ALGO + buf)) { + return 0; + } + comp_algo = mach_read_from_2(FIL_PAGE_COMP_ALGO + 6 + buf); break; default: return srv_page_size; @@ -326,99 +596,38 @@ ulint fil_page_decompress(byte* tmp_buf, byte* buf) return 0; } - ulint actual_size = mach_read_from_2(buf + FIL_PAGE_DATA); + ulint actual_size = mach_read_from_2(buf + FIL_PAGE_DATA + + FIL_PAGE_COMP_SIZE); /* Check if payload size is corrupted */ if (actual_size == 0 || actual_size > srv_page_size - header_len) { return 0; } - switch (compression_alg) { - default: - ib::error() << "Unknown compression algorithm " - << compression_alg; + if (!fil_page_decompress_low(tmp_buf, buf, comp_algo, header_len, + actual_size)) { return 0; - case PAGE_ZLIB_ALGORITHM: - { - uLong len = srv_page_size; - if (Z_OK == uncompress(tmp_buf, &len, - buf + header_len, - uLong(actual_size)) - && len == srv_page_size) { - break; - } - } - return 0; -#ifdef HAVE_LZ4 - case PAGE_LZ4_ALGORITHM: - if (LZ4_decompress_safe(reinterpret_cast(buf) - + header_len, - reinterpret_cast(tmp_buf), - actual_size, srv_page_size) - == int(srv_page_size)) { - break; - } - return 0; -#endif /* HAVE_LZ4 */ -#ifdef HAVE_LZO - case PAGE_LZO_ALGORITHM: { - lzo_uint len_lzo = srv_page_size; - if (LZO_E_OK == lzo1x_decompress_safe( - buf + header_len, - actual_size, tmp_buf, &len_lzo, NULL) - && len_lzo == srv_page_size) { - break; - } - return 0; - } -#endif /* HAVE_LZO */ -#ifdef HAVE_LZMA - case PAGE_LZMA_ALGORITHM: { - size_t src_pos = 0; - size_t dst_pos = 0; - uint64_t memlimit = UINT64_MAX; - - if (LZMA_OK == lzma_stream_buffer_decode( - &memlimit, 0, NULL, buf + header_len, - &src_pos, actual_size, tmp_buf, &dst_pos, - srv_page_size) - && dst_pos == srv_page_size) { - break; - } - return 0; - } -#endif /* HAVE_LZMA */ -#ifdef HAVE_BZIP2 - case PAGE_BZIP2_ALGORITHM: { - unsigned int dst_pos = srv_page_size; - if (BZ_OK == BZ2_bzBuffToBuffDecompress( - reinterpret_cast(tmp_buf), - &dst_pos, - reinterpret_cast(buf) + header_len, - actual_size, 1, 0) - && dst_pos == srv_page_size) { - break; - } - return 0; - } -#endif /* HAVE_BZIP2 */ -#ifdef HAVE_SNAPPY - case PAGE_SNAPPY_ALGORITHM: { - size_t olen = srv_page_size; - - if (SNAPPY_OK == snappy_uncompress( - reinterpret_cast(buf) + header_len, - actual_size, - reinterpret_cast(tmp_buf), &olen) - && olen == srv_page_size) { - break; - } - return 0; - } -#endif /* HAVE_SNAPPY */ } srv_stats.pages_page_decompressed.inc(); memcpy(buf, tmp_buf, srv_page_size); return actual_size; } + +/** Decompress a page that may be subject to page_compressed compression. +@param[in,out] tmp_buf temporary buffer (of innodb_page_size) +@param[in,out] buf possibly compressed page buffer +@return size of the compressed data +@retval 0 if decompression failed +@retval srv_page_size if the page was not compressed */ +ulint fil_page_decompress( + byte* tmp_buf, + byte* buf, + ulint flags) +{ + if (fil_space_t::full_crc32(flags)) { + return fil_page_decompress_for_full_crc32(tmp_buf, buf, flags); + } + + return fil_page_decompress_for_non_full_crc32(tmp_buf, buf); +} diff --git a/storage/innobase/fsp/fsp0file.cc b/storage/innobase/fsp/fsp0file.cc index a4f83def85b..dbd7d4dd741 100644 --- a/storage/innobase/fsp/fsp0file.cc +++ b/storage/innobase/fsp/fsp0file.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 2013, 2016, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2017, 2019, MariaDB Corporation. +Copyright (c) 2017, 2018, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc index 4a98109aca7..06f2beea97c 100644 --- a/storage/innobase/handler/ha_innodb.cc +++ b/storage/innobase/handler/ha_innodb.cc @@ -11646,9 +11646,8 @@ create_table_info_t::check_table_options() except if innodb_checksum_algorithm=full_crc32. Do not allow ENCRYPTED=YES if any SPATIAL INDEX exists. */ if (options->encryption != FIL_ENCRYPTION_ON - || (!options->page_compressed - && srv_checksum_algorithm - >= SRV_CHECKSUM_ALGORITHM_FULL_CRC32)) { + || srv_checksum_algorithm + >= SRV_CHECKSUM_ALGORITHM_FULL_CRC32) { break; } for (ulint i = 0; i < m_form->s->keys; i++) { diff --git a/storage/innobase/handler/handler0alter.cc b/storage/innobase/handler/handler0alter.cc index 4e336413abc..af973e1544e 100644 --- a/storage/innobase/handler/handler0alter.cc +++ b/storage/innobase/handler/handler0alter.cc @@ -1785,16 +1785,6 @@ ha_innobase::check_if_supported_inplace_alter( update_thd(); - /* MDEV-12026 FIXME: Implement and allow - innodb_checksum_algorithm=full_crc32 for page_compressed! */ - if (m_prebuilt->table->space - && m_prebuilt->table->space->full_crc32() - && altered_table->s->option_struct - && altered_table->s->option_struct->page_compressed) { - ut_ad(!table->s->option_struct->page_compressed); - DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED); - } - if (ha_alter_info->handler_flags & ~(INNOBASE_INPLACE_IGNORE | INNOBASE_ALTER_INSTANT @@ -10261,11 +10251,17 @@ commit_cache_norebuild( bool update = !(space->flags & FSP_FLAGS_MASK_PAGE_COMPRESSION); mutex_enter(&fil_system.mutex); - space->flags = (~FSP_FLAGS_MASK_MEM_COMPRESSION_LEVEL - & (space->flags - | FSP_FLAGS_MASK_PAGE_COMPRESSION)) - | ctx->page_compression_level + space->flags &= ~FSP_FLAGS_MASK_MEM_COMPRESSION_LEVEL; + space->flags |= ctx->page_compression_level << FSP_FLAGS_MEM_COMPRESSION_LEVEL; + if (!space->full_crc32()) { + space->flags + |= FSP_FLAGS_MASK_PAGE_COMPRESSION; + } else if (!space->is_compressed()) { + space->flags + |= innodb_compression_algorithm + << FSP_FLAGS_FCRC32_POS_COMPRESSED_ALGO; + } mutex_exit(&fil_system.mutex); if (update) { diff --git a/storage/innobase/ibuf/ibuf0ibuf.cc b/storage/innobase/ibuf/ibuf0ibuf.cc index 429163528ff..346caae40c2 100644 --- a/storage/innobase/ibuf/ibuf0ibuf.cc +++ b/storage/innobase/ibuf/ibuf0ibuf.cc @@ -4876,20 +4876,6 @@ ibuf_print( mutex_exit(&ibuf_mutex); } -/** Check if a page is all zeroes. -@param[in] read_buf database page -@param[in] size page size -@return whether the page is all zeroes */ -static bool buf_page_is_zeroes(const byte* read_buf, ulint size) -{ - for (ulint i = 0; i < size; i++) { - if (read_buf[i] != 0) { - return false; - } - } - return true; -} - /** Check the insert buffer bitmaps on IMPORT TABLESPACE. @param[in] trx transaction @param[in,out] space tablespace being imported diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index 18aa9143974..1f7bf5cd737 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -678,6 +678,12 @@ buf_block_buf_fix_inc_func( # endif /* UNIV_DEBUG */ #endif /* !UNIV_INNOCHECKSUM */ +/** Check if a page is all zeroes. +@param[in] read_buf database page +@param[in] page_size page frame size +@return whether the page is all zeroes */ +bool buf_page_is_zeroes(const void* read_buf, size_t page_size); + /** Checks if the page is in crc32 checksum format. @param[in] read_buf database page @param[in] checksum_field1 new checksum field @@ -714,14 +720,6 @@ buf_page_is_checksum_valid_none( ulint checksum_field2) MY_ATTRIBUTE((nonnull(1), warn_unused_result)); -/** Checks if the page is in full crc32 checksum format. -@param[in] read_buf database page -@param[in] checksum_field checksum field -@return true if the page is in full crc32 checksum format */ -bool buf_page_is_checksum_valid_full_crc32( - const byte* read_buf, - size_t checksum_field); - /** Check if a page is corrupt. @param[in] check_lsn whether the LSN should be checked @param[in] read_buf database page @@ -742,12 +740,55 @@ stored in 26th position. @return key version of the page. */ inline uint32_t buf_page_get_key_version(const byte* read_buf, ulint fsp_flags) { - return FSP_FLAGS_FCRC32_HAS_MARKER(fsp_flags) + return fil_space_t::full_crc32(fsp_flags) ? mach_read_from_4(read_buf + FIL_PAGE_FCRC32_KEY_VERSION) : mach_read_from_4(read_buf + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION); } +/** Read the compression info from the page. In full crc32 format, +compression info is at MSB of page type. In other format, it is +stored in page type. +@param[in] read_buf database page +@param[in] fsp_flags tablespace flags +@return true if page is compressed. */ +inline bool buf_page_is_compressed(const byte* read_buf, ulint fsp_flags) +{ + ulint page_type = mach_read_from_2(read_buf + FIL_PAGE_TYPE); + return fil_space_t::full_crc32(fsp_flags) + ? !!(page_type & 1U << FIL_PAGE_COMPRESS_FCRC32_MARKER) + : page_type == FIL_PAGE_PAGE_COMPRESSED; +} + +/** Get the compressed or uncompressed size of a full_crc32 page. +@param[in] buf page_compressed or uncompressed page +@param[out] comp whether the page could be compressed +@param[out] cr whether the page could be corrupted +@return the payload size in the file page */ +inline uint buf_page_full_crc32_size(const byte* buf, bool* comp, bool* cr) +{ + uint t = mach_read_from_2(buf + FIL_PAGE_TYPE); + uint page_size = uint(srv_page_size); + + if (!(t & 1U << FIL_PAGE_COMPRESS_FCRC32_MARKER)) { + return page_size; + } + + t &= ~(1U << FIL_PAGE_COMPRESS_FCRC32_MARKER); + t <<= 8; + + if (t < page_size) { + page_size = t; + if (comp) { + *comp = true; + } + } else if (cr) { + *cr = true; + } + + return page_size; +} + #ifndef UNIV_INNOCHECKSUM /**********************************************************************//** Gets the space id, page offset, and byte offset within page of a diff --git a/storage/innobase/include/buf0checksum.h b/storage/innobase/include/buf0checksum.h index 8e1cb43aadf..98ce879b9ea 100644 --- a/storage/innobase/include/buf0checksum.h +++ b/storage/innobase/include/buf0checksum.h @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2017, 2018, MariaDB Corporation. +Copyright (c) 2017, 2019, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -56,11 +56,6 @@ because this takes that field as an input! uint32_t buf_calc_page_old_checksum(const byte* page); -/** Calculate the CRC32 checksum for the whole page. -@param[in] page buffer page (srv_page_size bytes) -@return CRC32 value */ -uint32_t buf_calc_page_full_crc32(const byte* page); - /** Return a printable string describing the checksum algorithm. @param[in] algo algorithm @return algorithm name */ diff --git a/storage/innobase/include/dict0dict.ic b/storage/innobase/include/dict0dict.ic index d1953a5ddbe..01807b26004 100644 --- a/storage/innobase/include/dict0dict.ic +++ b/storage/innobase/include/dict0dict.ic @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2013, 2018, MariaDB Corporation. +Copyright (c) 2013, 2019, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -652,14 +652,18 @@ dict_tf_to_fsp_flags(ulint table_flags) DBUG_EXECUTE_IF("dict_tf_to_fsp_flags_failure", return(ULINT_UNDEFINED);); - /* Don't allow compressed row format for full crc32 algorithm */ + /* No ROW_FORMAT=COMPRESSED for innodb_checksum_algorithm=full_crc32 */ if ((srv_checksum_algorithm == SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32 || srv_checksum_algorithm == SRV_CHECKSUM_ALGORITHM_FULL_CRC32) - && !(table_flags & DICT_TF_MASK_ZIP_SSIZE) - && !page_compression_level) { + && !(table_flags & DICT_TF_MASK_ZIP_SSIZE)) { - fsp_flags = 1U << FSP_FLAGS_FCRC32_POS_MARKER; - fsp_flags |= FSP_FLAGS_FCRC32_PAGE_SSIZE(); + fsp_flags = 1U << FSP_FLAGS_FCRC32_POS_MARKER + | FSP_FLAGS_FCRC32_PAGE_SSIZE(); + + if (page_compression_level) { + fsp_flags |= innodb_compression_algorithm + << FSP_FLAGS_FCRC32_POS_COMPRESSED_ALGO; + } } else { /* Adjust bit zero. */ fsp_flags = DICT_TF_HAS_ATOMIC_BLOBS(table_flags) ? 1 : 0; diff --git a/storage/innobase/include/fil0fil.h b/storage/innobase/include/fil0fil.h index c1531e21f61..bde5a85ebf9 100644 --- a/storage/innobase/include/fil0fil.h +++ b/storage/innobase/include/fil0fil.h @@ -333,15 +333,46 @@ struct fil_space_t { if (full_crc32(flags)) { ulint algo = FSP_FLAGS_FCRC32_GET_COMPRESSED_ALGO( - flags); - ut_ad(algo < 6); - return (algo > 0); + flags); + DBUG_ASSERT(algo <= PAGE_ALGORITHM_LAST); + return algo > 0; } return FSP_FLAGS_HAS_PAGE_COMPRESSION(flags); } /** @return whether the compression enabled for the tablespace. */ - bool is_compressed() { return is_compressed(flags); } + bool is_compressed() const { return is_compressed(flags); } + + /** Get the compression algorithm for full crc32 format. + @param[in] flags tablespace flags + @return algorithm type of tablespace */ + static ulint get_compression_algo(ulint flags) + { + return full_crc32(flags) + ? FSP_FLAGS_FCRC32_GET_COMPRESSED_ALGO(flags) + : 0; + } + /** @return the page_compressed algorithm + @retval 0 if not page_compressed */ + ulint get_compression_algo() const { + return fil_space_t::get_compression_algo(flags); + } + /** Determine if the page_compressed page contains an extra byte + for exact compressed stream length + @param[in] flags tablespace flags + @return whether the extra byte is needed */ + static bool full_crc32_page_compressed_len(ulint flags) + { + DBUG_ASSERT(full_crc32(flags)); + switch (get_compression_algo(flags)) { + case PAGE_LZ4_ALGORITHM: + case PAGE_LZO_ALGORITHM: + case PAGE_SNAPPY_ALGORITHM: + return true; + } + return false; + } + /** Whether the full checksum matches with non full checksum flags. @param[in] flags flags present @param[in] expected expected flags @@ -351,22 +382,22 @@ struct fil_space_t { ut_ad(full_crc32(flags)); if (full_crc32(expected)) { - return false; + return get_compression_algo(flags) + == get_compression_algo(expected); } ulint page_ssize = FSP_FLAGS_FCRC32_GET_PAGE_SSIZE(flags); ulint space_page_ssize = FSP_FLAGS_GET_PAGE_SSIZE(expected); - if ((page_ssize == 5 && space_page_ssize != 0) - || (page_ssize != 5 && (space_page_ssize != page_ssize))) { + if (page_ssize == 5) { + if (space_page_ssize) { + return false; + } + } else if (space_page_ssize != page_ssize) { return false; } - if (FSP_FLAGS_HAS_PAGE_COMPRESSION(expected)) { - return false; - } - - return true; + return is_compressed(expected) == is_compressed(flags); } /** Whether old tablespace flags match full_crc32 flags. @param[in] flags flags present @@ -376,8 +407,7 @@ struct fil_space_t { { ut_ad(!full_crc32(flags)); - if (!full_crc32(expected) - || FSP_FLAGS_HAS_PAGE_COMPRESSION(expected)) { + if (!full_crc32(expected)) { return false; } @@ -385,12 +415,15 @@ struct fil_space_t { ulint space_page_ssize = FSP_FLAGS_FCRC32_GET_PAGE_SSIZE( expected); - if ((page_ssize == 0 && space_page_ssize != 5) - || (page_ssize != 0 && (space_page_ssize != page_ssize))) { + if (page_ssize) { + if (space_page_ssize != 5) { + return false; + } + } else if (space_page_ssize != page_ssize) { return false; } - return true; + return is_compressed(expected) == is_compressed(flags); } /** Whether both fsp flags are equivalent */ static bool is_flags_equal(ulint flags, ulint expected) @@ -647,6 +680,9 @@ or 64 bites of zero if no encryption */ /** This overloads FIL_PAGE_FILE_FLUSH_LSN for RTREE Split Sequence Number */ #define FIL_RTREE_SPLIT_SEQ_NUM FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION +/** Start of the page_compressed content */ +#define FIL_PAGE_COMP_ALGO FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + /** starting from 4.1.x this contains the space id of the page */ #define FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID 34U @@ -654,18 +690,27 @@ or 64 bites of zero if no encryption */ #define FIL_PAGE_DATA 38U /*!< start of the data on the page */ -/** 32-bit key version used to encrypt the page in full crc32 format. +/** 32-bit key version used to encrypt the page in full_crc32 format. For non-encrypted page, it contains 0. */ #define FIL_PAGE_FCRC32_KEY_VERSION 0 -/* Following are used when page compression is used */ -#define FIL_PAGE_COMPRESSED_SIZE 2 /*!< Number of bytes used to store - actual payload data size on - compressed pages. */ -#define FIL_PAGE_COMPRESSION_METHOD_SIZE 2 - /*!< Number of bytes used to store - actual compression method. */ +/** page_compressed without innodb_checksum_algorithm=full_crc32 @{ */ +/** Number of bytes used to store actual payload data size on +page_compressed pages when not using full_crc32. */ +#define FIL_PAGE_COMP_SIZE 0 + +/** Number of bytes for FIL_PAGE_COMP_SIZE */ +#define FIL_PAGE_COMP_METADATA_LEN 2 + +/** Number of bytes used to store actual compression method +for encrypted tables when not using full_crc32. */ +#define FIL_PAGE_ENCRYPT_COMP_ALGO 2 + +/** Extra header size for encrypted page_compressed pages when +not using full_crc32 */ +#define FIL_PAGE_ENCRYPT_COMP_METADATA_LEN 4 /* @} */ + /** File page trailer @{ */ #define FIL_PAGE_END_LSN_OLD_CHKSUM 8 /*!< the low 4 bytes of this are used to store the page checksum, the @@ -681,8 +726,9 @@ For non-encrypted page, it contains 0. */ /* @} */ /** File page types (values of FIL_PAGE_TYPE) @{ */ -#define FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED 37401 /*!< Page is compressed and - then encrypted */ +/** page_compressed, encrypted=YES (not used for full_crc32) */ +#define FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED 37401 +/** page_compressed (not used for full_crc32) */ #define FIL_PAGE_PAGE_COMPRESSED 34354 /*!< page compressed page */ #define FIL_PAGE_INDEX 17855 /*!< B-tree node */ #define FIL_PAGE_RTREE 17854 /*!< R-tree node (SPATIAL INDEX) */ @@ -715,6 +761,12 @@ For non-encrypted page, it contains 0. */ Note: FIL_PAGE_TYPE_INSTANT maps to the same as FIL_PAGE_INDEX. */ #define FIL_PAGE_TYPE_LAST FIL_PAGE_TYPE_UNKNOWN /*!< Last page type */ +/** Set in FIL_PAGE_TYPE if for full_crc32 pages in page_compressed format. +If the flag is set, then the following holds for the remaining bits +of FIL_PAGE_TYPE: +Bits 0..7 will contain the compressed page size in bytes. +Bits 8..14 are reserved and must be 0. */ +#define FIL_PAGE_COMPRESS_FCRC32_MARKER 15 /* @} */ /** @return whether the page type is B-tree or R-tree index */ diff --git a/storage/innobase/include/fil0fil.ic b/storage/innobase/include/fil0fil.ic index e2d6b443d41..b70358e83d1 100644 --- a/storage/innobase/include/fil0fil.ic +++ b/storage/innobase/include/fil0fil.ic @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 2015, 2018, MariaDB Corporation. +Copyright (c) 2015, 2019, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -88,6 +88,12 @@ fil_page_type_validate( { ulint page_type = mach_read_from_2(page + FIL_PAGE_TYPE); + if ((page_type & 1U << FIL_PAGE_COMPRESS_FCRC32_MARKER) + && space->full_crc32() + && space->is_compressed()) { + return true; + } + /* Validate page type */ if (!((page_type == FIL_PAGE_PAGE_COMPRESSED || page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED || diff --git a/storage/innobase/include/fil0pagecompress.h b/storage/innobase/include/fil0pagecompress.h index 68e37640f66..9cd91b323c5 100644 --- a/storage/innobase/include/fil0pagecompress.h +++ b/storage/innobase/include/fil0pagecompress.h @@ -49,9 +49,13 @@ ulint fil_page_compress( /** Decompress a page that may be subject to page_compressed compression. @param[in,out] tmp_buf temporary buffer (of innodb_page_size) @param[in,out] buf compressed page buffer +@param[in] flags talespace flags @return size of the compressed data @retval 0 if decompression failed @retval srv_page_size if the page was not compressed */ -ulint fil_page_decompress(byte* tmp_buf, byte* buf) +ulint fil_page_decompress( + byte* tmp_buf, + byte* buf, + ulint flags) MY_ATTRIBUTE((nonnull, warn_unused_result)); #endif diff --git a/storage/innobase/include/fsp0types.h b/storage/innobase/include/fsp0types.h index 86a1c9cea72..1b2ede47252 100644 --- a/storage/innobase/include/fsp0types.h +++ b/storage/innobase/include/fsp0types.h @@ -386,10 +386,6 @@ in full crc32 format. */ #define FSP_FLAGS_FCRC32_GET_PAGE_SSIZE(flags) \ ((flags & FSP_FLAGS_FCRC32_MASK_PAGE_SSIZE) \ >> FSP_FLAGS_FCRC32_POS_PAGE_SSIZE) -/** @return the MARKER flag in full crc32 format */ -#define FSP_FLAGS_FCRC32_HAS_MARKER(flags) \ - ((flags & FSP_FLAGS_FCRC32_MASK_MARKER) \ - >> FSP_FLAGS_FCRC32_POS_MARKER) /** @return the COMPRESSED_ALGO flags in full crc32 format */ #define FSP_FLAGS_FCRC32_GET_COMPRESSED_ALGO(flags) \ ((flags & FSP_FLAGS_FCRC32_MASK_COMPRESSED_ALGO) \ diff --git a/storage/innobase/include/page0zip.h b/storage/innobase/include/page0zip.h index 42432d08ad9..c5376967bd8 100644 --- a/storage/innobase/include/page0zip.h +++ b/storage/innobase/include/page0zip.h @@ -495,15 +495,12 @@ page_zip_calc_checksum( ulint size, srv_checksum_algorithm_t algo); -/**********************************************************************//** -Verify a compressed page's checksum. -@return TRUE if the stored checksum is valid according to the value of +/** Verify a compressed page's checksum. +@param[in] data compressed page +@param[in] size size of compressed page +@return whether the stored checksum is valid according to the value of innodb_checksum_algorithm */ -ibool -page_zip_verify_checksum( -/*=====================*/ - const void* data, /*!< in: compressed page */ - ulint size); /*!< in: size of compressed page */ +bool page_zip_verify_checksum(const void* data, ulint size); #ifndef UNIV_INNOCHECKSUM /**********************************************************************//** diff --git a/storage/innobase/page/page0zip.cc b/storage/innobase/page/page0zip.cc index 09fbcfa9d24..8c5007c434b 100644 --- a/storage/innobase/page/page0zip.cc +++ b/storage/innobase/page/page0zip.cc @@ -2,7 +2,7 @@ Copyright (c) 2005, 2016, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2012, Facebook Inc. -Copyright (c) 2014, 2018, MariaDB Corporation. +Copyright (c) 2014, 2019, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -4972,15 +4972,12 @@ page_zip_calc_checksum( return(0); } -/**********************************************************************//** -Verify a compressed page's checksum. -@return TRUE if the stored checksum is valid according to the value of +/** Verify a compressed page's checksum. +@param[in] data compressed page +@param[in] size size of compressed page +@return whether the stored checksum is valid according to the value of innodb_checksum_algorithm */ -ibool -page_zip_verify_checksum( -/*=====================*/ - const void* data, /*!< in: compressed page */ - ulint size) /*!< in: size of compressed page */ +bool page_zip_verify_checksum(const void* data, ulint size) { const uint32_t stored = mach_read_from_4( static_cast(data) + FIL_PAGE_SPACE_OR_CHKSUM); diff --git a/storage/innobase/row/row0import.cc b/storage/innobase/row/row0import.cc index c5768fe827c..108b024f986 100644 --- a/storage/innobase/row/row0import.cc +++ b/storage/innobase/row/row0import.cc @@ -2026,15 +2026,22 @@ dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW if (err != DB_SUCCESS) return err; const bool full_crc32 = fil_space_t::full_crc32(get_space_flags()); + const bool page_compressed = fil_space_t::is_compressed(get_space_flags()); if (!block->page.zip.data) { if (full_crc32 - && block->page.encrypted && block->page.id.page_no() > 0) { + && (block->page.encrypted || page_compressed) + && block->page.id.page_no() > 0) { byte* page = block->frame; mach_write_to_8(page + FIL_PAGE_LSN, m_current_lsn); - mach_write_to_4( - page + srv_page_size - FIL_PAGE_FCRC32_END_LSN, - (ulint) m_current_lsn); + + if (!page_compressed) { + mach_write_to_4( + page + (srv_page_size + - FIL_PAGE_FCRC32_END_LSN), + (ulint) m_current_lsn); + } + return err; } @@ -3311,6 +3318,7 @@ fil_iterate( return DB_OUT_OF_MEMORY; } + ulint actual_space_id = 0; const bool full_crc32 = fil_space_t::full_crc32( callback.get_space_flags()); @@ -3371,15 +3379,9 @@ fil_iterate( byte* src = readptr + i * size; const ulint page_no = page_get_page_no(src); if (!page_no && block->page.id.page_no()) { - const ulint* b = reinterpret_cast - (src); - const ulint* const e = b + size / sizeof *b; - do { - if (*b++) { - goto page_corrupted; - } - } while (b != e); - + if (!buf_page_is_zeroes(src, size)) { + goto page_corrupted; + } /* Proceed to the next page, because this one is all zero. */ continue; @@ -3395,9 +3397,19 @@ page_corrupted: goto func_exit; } - const bool page_compressed - = fil_page_is_compressed_encrypted(src) - || fil_page_is_compressed(src); + if (block->page.id.page_no() == 0) { + actual_space_id = mach_read_from_4( + src + FIL_PAGE_SPACE_ID); + } + + const bool page_compressed = + (full_crc32 + && fil_space_t::is_compressed( + callback.get_space_flags()) + && buf_page_is_compressed( + src, callback.get_space_flags())) + || (fil_page_is_compressed_encrypted(src) + || fil_page_is_compressed(src)); if (page_compressed && block->page.zip.data) { goto page_corrupted; @@ -3427,7 +3439,7 @@ not_encrypted: } decrypted = fil_space_decrypt( - block->page.id.space(), + actual_space_id, iter.crypt_data, dst, callback.physical_size(), callback.get_space_flags(), @@ -3452,7 +3464,8 @@ not_encrypted: to decompress it before adjusting further. */ if (page_compressed) { ulint compress_length = fil_page_decompress( - page_compress_buf, dst); + page_compress_buf, dst, + callback.get_space_flags()); ut_ad(compress_length != srv_page_size); if (compress_length == 0) { goto page_corrupted; @@ -3556,6 +3569,26 @@ not_encrypted: updated = true; } + + /* Write checksum for the compressed full crc32 page.*/ + if (full_crc32 && page_compressed) { + ut_ad(updated); + byte* dest = writeptr + i * size; + ut_d(bool comp = false); + ut_d(bool corrupt = false); + ulint size = buf_page_full_crc32_size( + dest, +#ifdef UNIV_DEBUG + &comp, &corrupt +#else + NULL, NULL +#endif + ); + ut_ad(!comp == (size == srv_page_size)); + ut_ad(!corrupt); + mach_write_to_4(dest + (size - 4), + ut_crc32(dest, size - 4)); + } } /* A page was updated in the set, write back to disk. */