MDEV-18644: Support full_crc32 for page_compressed

This is a follow-up task to MDEV-12026, which introduced
innodb_checksum_algorithm=full_crc32 and a simpler page format.
MDEV-12026 did not enable full_crc32 for page_compressed tables,
which we will be doing now.

This is joint work with Thirunarayanan Balathandayuthapani.

For innodb_checksum_algorithm=full_crc32 we change the
page_compressed format as follows:

FIL_PAGE_TYPE: The most significant bit will be set to indicate
page_compressed format. The least significant bits will contain
the compressed page size, rounded up to a multiple of 256 bytes.

The checksum will be stored in the last 4 bytes of the page
(whether it is the full page or a page_compressed page whose
size is determined by FIL_PAGE_TYPE), covering all preceding
bytes of the page. If encryption is used, then the page will
be encrypted between compression and computing the checksum.
For page_compressed, FIL_PAGE_LSN will not be repeated at
the end of the page.

FSP_SPACE_FLAGS (already implemented as part of MDEV-12026):
We will store the innodb_compression_algorithm that may be used
to compress pages. Previously, the choice of algorithm was written
to each compressed data page separately, and one would be unable
to know in advance which compression algorithm(s) are used.

fil_space_t::full_crc32_page_compressed_len(): Determine if the
page_compressed algorithm of the tablespace needs to know the
exact length of the compressed data. If yes, we will reserve and
write an extra byte for this right before the checksum.

buf_page_is_compressed(): Determine if a page uses page_compressed
(in any innodb_checksum_algorithm).

fil_page_decompress(): Pass also fil_space_t::flags so that the
format can be determined.

buf_page_is_zeroes(): Check if a page is full of zero bytes.

buf_page_full_crc32_is_corrupted(): Renamed from
buf_encrypted_full_crc32_page_is_corrupted(). For full_crc32,
we always simply validate the checksum to the page contents,
while the physical page size is explicitly specified by an
unencrypted part of the page header.

buf_page_full_crc32_size(): Determine the size of a full_crc32 page.

buf_dblwr_check_page_lsn(): Make this a debug-only function, because
it involves potentially costly lookups of fil_space_t.

create_table_info_t::check_table_options(),
ha_innobase::check_if_supported_inplace_alter(): Do allow the creation
of SPATIAL INDEX with full_crc32 also when page_compressed is used.

commit_cache_norebuild(): Preserve the compression algorithm when
updating the page_compression_level.

dict_tf_to_fsp_flags(): Set the flags for page compression algorithm.
FIXME: Maybe there should be a table option page_compression_algorithm
and a session variable to back it?
This commit is contained in:
Marko Mäkelä 2019-03-18 14:08:43 +02:00
parent 2151aed44d
commit 6b6fa3cdb1
30 changed files with 890 additions and 478 deletions

View File

@ -281,7 +281,7 @@ static void init_page_size(const byte* buf)
const unsigned flags = mach_read_from_4(buf + FIL_PAGE_DATA
+ FSP_SPACE_FLAGS);
if (FSP_FLAGS_FCRC32_HAS_MARKER(flags)) {
if (fil_space_t::full_crc32(flags)) {
srv_page_size = fil_space_t::logical_size(flags);
physical_page_size = srv_page_size;
return;
@ -461,7 +461,7 @@ is_page_corrupted(
return (false);
}
if (!zip_size) {
if (!zip_size && (!is_compressed || !use_full_crc32)) {
/* check the stored log sequence numbers
for uncompressed tablespace. */
logseq = mach_read_from_4(buf + FIL_PAGE_LSN + 4);
@ -613,8 +613,10 @@ static bool update_checksum(byte* page, ulint flags)
}
} else if (use_full_crc32) {
checksum = buf_calc_page_full_crc32(page);
byte* c = page + physical_page_size - FIL_PAGE_FCRC32_CHECKSUM;
ulint payload = buf_page_full_crc32_size(page, NULL, NULL)
- FIL_PAGE_FCRC32_CHECKSUM;
checksum = ut_crc32(page, payload);
byte* c = page + payload;
if (mach_read_from_4(c) == checksum) return false;
mach_write_to_4(c, checksum);
if (is_log_enabled) {

View File

@ -178,7 +178,9 @@ static void init_ibd_data(ds_local_file_t *local_file, const uchar *buf, size_t
ulint flags = mach_read_from_4(&buf[FIL_PAGE_DATA + FSP_SPACE_FLAGS]);
ulint ssize = FSP_FLAGS_GET_PAGE_SSIZE(flags);
local_file->pagesize= ssize == 0 ? UNIV_PAGE_SIZE_ORIG : ((UNIV_ZIP_SIZE_MIN >> 1) << ssize);
local_file->compressed = (my_bool)FSP_FLAGS_HAS_PAGE_COMPRESSION(flags);
local_file->compressed = fil_space_t::full_crc32(flags)
? fil_space_t::is_compressed(flags)
: bool(FSP_FLAGS_HAS_PAGE_COMPRESSION(flags));
#if defined(_WIN32) && (MYSQL_VERSION_ID > 100200)
/* Make compressed file sparse, on Windows.

View File

@ -361,7 +361,8 @@ static bool page_is_corrupted(const byte *page, ulint page_no,
if (page_type == FIL_PAGE_PAGE_COMPRESSED
|| page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED) {
ulint decomp = fil_page_decompress(tmp_frame, tmp_page);
ulint decomp = fil_page_decompress(tmp_frame, tmp_page,
space->flags);
page_type = mach_read_from_2(tmp_page + FIL_PAGE_TYPE);
return (!decomp

View File

@ -1,6 +1,6 @@
--- innodb-spatial-index.result
+++ innodb-spatial-index.result
@@ -1,22 +1,25 @@
@@ -1,23 +1,26 @@
CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT,
c VARCHAR(256), coordinate POINT NOT NULL, SPATIAL index(coordinate)) ENGINE=INNODB
ENCRYPTED=YES;
@ -14,7 +14,8 @@
-Got one of the listed errors
DROP TABLE t1;
CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT,
c VARCHAR(256), coordinate POINT NOT NULL) ENCRYPTED=YES ENGINE=INNODB;
c VARCHAR(256), coordinate POINT NOT NULL)
PAGE_COMPRESSED=YES, ENCRYPTED=YES ENGINE=INNODB;
ALTER TABLE t1 ADD SPATIAL INDEX b1(coordinate), ALGORITHM=COPY;
-Got one of the listed errors
ALTER TABLE t1 ADD SPATIAL INDEX b2(coordinate), FORCE, ALGORITHM=INPLACE;

View File

@ -1,6 +1,6 @@
--- innodb-spatial-index.result
+++ innodb-spatial-index.result
@@ -1,22 +1,25 @@
@@ -1,23 +1,26 @@
CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT,
c VARCHAR(256), coordinate POINT NOT NULL, SPATIAL index(coordinate)) ENGINE=INNODB
ENCRYPTED=YES;
@ -14,7 +14,8 @@
-Got one of the listed errors
DROP TABLE t1;
CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT,
c VARCHAR(256), coordinate POINT NOT NULL) ENCRYPTED=YES ENGINE=INNODB;
c VARCHAR(256), coordinate POINT NOT NULL)
PAGE_COMPRESSED=YES, ENCRYPTED=YES ENGINE=INNODB;
ALTER TABLE t1 ADD SPATIAL INDEX b1(coordinate), ALGORITHM=COPY;
-Got one of the listed errors
ALTER TABLE t1 ADD SPATIAL INDEX b2(coordinate), FORCE, ALGORITHM=INPLACE;

View File

@ -8,7 +8,8 @@ ALTER TABLE t1 ENCRYPTED=YES;
Got one of the listed errors
DROP TABLE t1;
CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT,
c VARCHAR(256), coordinate POINT NOT NULL) ENCRYPTED=YES ENGINE=INNODB;
c VARCHAR(256), coordinate POINT NOT NULL)
PAGE_COMPRESSED=YES, ENCRYPTED=YES ENGINE=INNODB;
ALTER TABLE t1 ADD SPATIAL INDEX b1(coordinate), ALGORITHM=COPY;
Got one of the listed errors
ALTER TABLE t1 ADD SPATIAL INDEX b2(coordinate), FORCE, ALGORITHM=INPLACE;

View File

@ -45,7 +45,8 @@ DROP TABLE t1;
# Index creation
#
CREATE TABLE t1 (pk INT PRIMARY KEY AUTO_INCREMENT,
c VARCHAR(256), coordinate POINT NOT NULL) ENCRYPTED=YES ENGINE=INNODB;
c VARCHAR(256), coordinate POINT NOT NULL)
PAGE_COMPRESSED=YES, ENCRYPTED=YES ENGINE=INNODB;
# FIXME: MDEV-13851 Encrypted table refuses some form of ALGORITHM=COPY,
# but allows rebuild by FORCE
--error $error_code

View File

@ -1,4 +1,5 @@
call mtr.add_suppression("InnoDB: Table `test`.`t1` has an unreadable root page");
call mtr.add_suppression("InnoDB: Encrypted page .* in file .*test.t1\\.ibd looks corrupted; key_version=1");
CREATE TABLE t1 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT, c char(200)) ENGINE=InnoDB page_compressed=yes encrypted=yes;
insert into t1(b, c) values("mariadb", "mariabackup");
# Corrupt the table

View File

@ -1,5 +1,6 @@
source include/have_file_key_management.inc;
call mtr.add_suppression("InnoDB: Table `test`.`t1` has an unreadable root page");
call mtr.add_suppression("InnoDB: Encrypted page .* in file .*test.t1\\.ibd looks corrupted; key_version=1");
CREATE TABLE t1 (a INT AUTO_INCREMENT PRIMARY KEY, b TEXT, c char(200)) ENGINE=InnoDB page_compressed=yes encrypted=yes;
insert into t1(b, c) values("mariadb", "mariabackup");

View File

@ -35,6 +35,7 @@ Created 11/5/1995 Heikki Tuuri
#include "mach0data.h"
#include "buf0buf.h"
#include "buf0checksum.h"
#include "ut0crc32.h"
#include <string.h>
#ifndef UNIV_INNOCHECKSUM
@ -478,7 +479,8 @@ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
byte* dst_frame = bpage->zip.data ? bpage->zip.data :
((buf_block_t*) bpage)->frame;
bool page_compressed = fil_page_is_compressed(dst_frame);
bool page_compressed = space->is_compressed()
&& buf_page_is_compressed(dst_frame, space->flags);
buf_pool_t* buf_pool = buf_pool_from_bpage(bpage);
if (bpage->id.page_no() == 0) {
@ -493,22 +495,31 @@ static bool buf_page_decrypt_after_read(buf_page_t* bpage, fil_space_t* space)
buf_tmp_buffer_t* slot;
uint key_version = buf_page_get_key_version(dst_frame, space->flags);
if (page_compressed) {
if (page_compressed && !key_version) {
/* the page we read is unencrypted */
/* Find free slot from temporary memory array */
decompress:
if (space->full_crc32()
&& buf_page_is_corrupted(true, dst_frame, space->flags)) {
return false;
}
slot = buf_pool_reserve_tmp_slot(buf_pool);
/* For decompression, use crypt_buf. */
buf_tmp_reserve_crypt_buf(slot);
decompress_with_slot:
ut_d(fil_page_type_validate(space, dst_frame));
bpage->write_size = fil_page_decompress(slot->crypt_buf,
dst_frame);
bpage->write_size = fil_page_decompress(
slot->crypt_buf, dst_frame, space->flags);
slot->release();
ut_ad(!bpage->write_size || fil_page_type_validate(space, dst_frame));
ut_ad(!bpage->write_size
|| fil_page_type_validate(space, dst_frame));
ut_ad(space->pending_io());
return bpage->write_size != 0;
}
@ -545,7 +556,8 @@ decrypt_failed:
ut_d(fil_page_type_validate(space, dst_frame));
if (fil_page_is_compressed_encrypted(dst_frame)) {
if ((space->full_crc32() && page_compressed)
|| fil_page_is_compressed_encrypted(dst_frame)) {
goto decompress_with_slot;
}
@ -882,19 +894,6 @@ buf_page_is_checksum_valid_none(
&& checksum_field1 == BUF_NO_CHECKSUM_MAGIC);
}
/** Checks if the page is in full crc32 checksum format.
@param[in] read_buf database page
@param[in] checksum_field checksum field
@return true if the page is in full crc32 checksum format. */
bool buf_page_is_checksum_valid_full_crc32(
const byte* read_buf,
size_t checksum_field)
{
const uint32_t full_crc32 = buf_calc_page_full_crc32(read_buf);
return checksum_field == full_crc32;
}
/** Checks whether the lsn present in the page is lesser than the
peek current lsn.
@param[in] check_lsn lsn to check
@ -934,6 +933,22 @@ static void buf_page_check_lsn(bool check_lsn, const byte* read_buf)
#endif /* !UNIV_INNOCHECKSUM */
}
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] page_size page frame size
@return whether the page is all zeroes */
bool buf_page_is_zeroes(const void* read_buf, size_t page_size)
{
const ulint* b = reinterpret_cast<const ulint*>(read_buf);
const ulint* const e = b + page_size / sizeof *b;
do {
if (*b++) {
return false;
}
} while (b != e);
return true;
}
/** Check if a page is corrupt.
@param[in] check_lsn whether the LSN should be checked
@param[in] read_buf database page
@ -949,14 +964,18 @@ buf_page_is_corrupted(
#ifndef UNIV_INNOCHECKSUM
DBUG_EXECUTE_IF("buf_page_import_corrupt_failure", return(true); );
#endif
if (FSP_FLAGS_FCRC32_HAS_MARKER(fsp_flags)) {
const byte* end = read_buf + srv_page_size;
uint crc32 = mach_read_from_4(end - FIL_PAGE_FCRC32_CHECKSUM);
if (fil_space_t::full_crc32(fsp_flags)) {
bool compressed = false, corrupted = false;
const uint size = buf_page_full_crc32_size(
read_buf, &compressed, &corrupted);
if (corrupted) {
return true;
}
const byte* end = read_buf + (size - FIL_PAGE_FCRC32_CHECKSUM);
uint crc32 = mach_read_from_4(end);
if (!crc32) {
const byte* b = read_buf;
while (b != end) if (*b++) goto nonzero;
/* An all-zero page is not corrupted. */
if (!crc32 && size == srv_page_size
&& buf_page_is_zeroes(read_buf, size)) {
return false;
}
@ -967,14 +986,17 @@ buf_page_is_corrupted(
crc32++;
}
});
nonzero:
if (!buf_page_is_checksum_valid_full_crc32(read_buf, crc32)) {
if (crc32 != ut_crc32(read_buf,
size - FIL_PAGE_FCRC32_CHECKSUM)) {
return true;
}
if (!mach_read_from_4(read_buf + FIL_PAGE_FCRC32_KEY_VERSION)
if (!compressed
&& !mach_read_from_4(FIL_PAGE_FCRC32_KEY_VERSION
+ read_buf)
&& memcmp(read_buf + (FIL_PAGE_LSN + 4),
end - FIL_PAGE_FCRC32_END_LSN, 4)) {
end - (FIL_PAGE_FCRC32_END_LSN
- FIL_PAGE_FCRC32_CHECKSUM), 4)) {
return true;
}
@ -3962,7 +3984,6 @@ buf_zip_decompress(
<< ", none: "
<< page_zip_calc_checksum(
frame, size, SRV_CHECKSUM_ALGORITHM_NONE);
goto err_exit;
}
@ -5846,13 +5867,16 @@ buf_mark_space_corrupt(buf_page_t* bpage, const fil_space_t* space)
/** Check if the encrypted page is corrupted for the full crc32 format.
@param[in] space_id page belongs to space id
@param[in] dst_frame page
@param[in] is_compressed compressed page
@return true if page is corrupted or false if it isn't */
static bool buf_encrypted_full_crc32_page_is_corrupted(
static bool buf_page_full_crc32_is_corrupted(
ulint space_id,
const byte* dst_frame)
const byte* dst_frame,
bool is_compressed)
{
if (memcmp(dst_frame + FIL_PAGE_LSN + 4,
dst_frame + srv_page_size - FIL_PAGE_FCRC32_END_LSN, 4)) {
if (!is_compressed
&& memcmp(dst_frame + FIL_PAGE_LSN + 4,
dst_frame + srv_page_size - FIL_PAGE_FCRC32_END_LSN, 4)) {
return true;
}
@ -5900,9 +5924,12 @@ static dberr_t buf_page_check_corrupt(buf_page_t* bpage, fil_space_t* space)
if (!still_encrypted) {
/* If traditional checksums match, we assume that page is
not anymore encrypted. */
if (key_version && space->full_crc32()) {
corrupted = buf_encrypted_full_crc32_page_is_corrupted(
space->id, dst_frame);
if (space->full_crc32()
&& !buf_page_is_zeroes(dst_frame, space->physical_size())
&& (key_version || space->is_compressed())) {
corrupted = buf_page_full_crc32_is_corrupted(
space->id, dst_frame,
space->is_compressed());
} else {
corrupted = buf_page_is_corrupted(
true, dst_frame, space->flags);
@ -7374,7 +7401,7 @@ buf_page_encrypt(
&& (!crypt_data->is_default_encryption()
|| srv_encrypt_tables);
bool page_compressed = FSP_FLAGS_HAS_PAGE_COMPRESSION(space->flags);
bool page_compressed = space->is_compressed();
if (!encrypted && !page_compressed) {
/* No need to encrypt or page compress the page.
@ -7398,6 +7425,19 @@ buf_page_encrypt(
buf_tmp_reserve_crypt_buf(slot);
byte *dst_frame = slot->crypt_buf;
const bool full_crc32 = space->full_crc32();
if (full_crc32) {
/* Write LSN for the full crc32 checksum before
encryption. Because lsn is one of the input for encryption. */
mach_write_to_8(src_frame + FIL_PAGE_LSN,
bpage->newest_modification);
if (!page_compressed) {
mach_write_to_4(
src_frame + srv_page_size - FIL_PAGE_FCRC32_END_LSN,
(ulint) bpage->newest_modification);
}
}
if (!page_compressed) {
not_compressed:
@ -7427,6 +7467,18 @@ not_compressed:
bpage->real_size = out_len;
if (full_crc32) {
ut_d(bool compressed = false);
out_len = buf_page_full_crc32_size(tmp,
#ifdef UNIV_DEBUG
&compressed,
#else
NULL,
#endif
NULL);
ut_ad(compressed);
}
/* Workaround for MDEV-15527. */
memset(tmp + out_len, 0 , srv_page_size - out_len);
ut_d(fil_page_type_validate(space, tmp));
@ -7440,6 +7492,13 @@ not_compressed:
dst_frame);
}
if (full_crc32) {
compile_time_assert(FIL_PAGE_FCRC32_CHECKSUM == 4);
mach_write_to_4(tmp + out_len - 4,
ut_crc32(tmp, out_len - 4));
ut_ad(!buf_page_is_corrupted(true, tmp, space->flags));
}
slot->out_buf = dst_frame = tmp;
}

View File

@ -1,7 +1,7 @@
/*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, MariaDB Corporation.
Copyright (c) 2017, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@ -102,14 +102,6 @@ buf_calc_page_old_checksum(const byte* page)
(ut_fold_binary(page, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION)));
}
/** Calculate the CRC32 checksum for the whole page.
@param[in] page buffer page (srv_page_size bytes)
@return CRC32 value */
uint32_t buf_calc_page_full_crc32(const byte* page)
{
return ut_crc32(page, srv_page_size - FIL_PAGE_FCRC32_CHECKSUM);
}
/** Return a printable string describing the checksum algorithm.
@param[in] algo algorithm
@return algorithm name */

View File

@ -335,20 +335,6 @@ too_small:
goto start_again;
}
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] page_size page frame size
@return whether the page is all zeroes */
static bool buf_page_is_zeroes(const byte* read_buf, size_t page_size)
{
for (ulint i = 0; i < page_size; i++) {
if (read_buf[i] != 0) {
return false;
}
}
return true;
}
/**
At database startup initializes the doublewrite buffer memory structure if
we already have a doublewrite buffer created in the data files. If we are
@ -620,7 +606,8 @@ buf_dblwr_process()
} else {
/* Decompress the page before
validating the checksum. */
ulint decomp = fil_page_decompress(buf, read_buf);
ulint decomp = fil_page_decompress(buf, read_buf,
space->flags);
if (!decomp || (zip_size && decomp != srv_page_size)) {
goto bad;
}
@ -648,7 +635,7 @@ bad:
<< " from the doublewrite buffer.";
}
ulint decomp = fil_page_decompress(buf, page);
ulint decomp = fil_page_decompress(buf, page, space->flags);
if (!decomp || (zip_size && decomp != srv_page_size)) {
goto bad_doublewrite;
}
@ -801,52 +788,42 @@ buf_dblwr_update(
}
}
/********************************************************************//**
Check the LSN values on the page. */
static
void
buf_dblwr_check_page_lsn(
/*=====================*/
const page_t* page) /*!< in: page to check */
#ifdef UNIV_DEBUG
/** Check the LSN values on the page.
@param[in] page page to check
@param[in] s tablespace */
static void buf_dblwr_check_page_lsn(const page_t* page, const fil_space_t& s)
{
ibool page_compressed = (mach_read_from_2(page+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED);
uint key_version = mach_read_from_4(page + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
/* Ignore page compressed or encrypted pages */
if (page_compressed || key_version) {
if (s.is_compressed()
|| buf_page_get_key_version(page, s.flags)) {
return;
}
bool lsn_mismatch = false;
// MDEV-12026 FIXME: invoke fil_space_t::full_crc32()
if (memcmp(page + (FIL_PAGE_LSN + 4),
page + (srv_page_size
- FIL_PAGE_END_LSN_OLD_CHKSUM + 4),
4)) {
if (memcmp(page + (FIL_PAGE_LSN + 4),
page + (srv_page_size
- FIL_PAGE_FCRC32_END_LSN),
4)) {
lsn_mismatch = true;
}
}
if (lsn_mismatch) {
// MDEV-12026 FIXME: lsn2 depends on fil_space_t::full_crc32()!
const ulint lsn1 = mach_read_from_4(
page + FIL_PAGE_LSN + 4);
const ulint lsn2 = mach_read_from_4(
page + srv_page_size - FIL_PAGE_END_LSN_OLD_CHKSUM
+ 4);
ib::error() << "The page to be written seems corrupt!"
const unsigned lsn1 = mach_read_from_4(page + FIL_PAGE_LSN + 4),
lsn2 = mach_read_from_4(page + srv_page_size
- (s.full_crc32()
? FIL_PAGE_FCRC32_END_LSN
: FIL_PAGE_END_LSN_OLD_CHKSUM - 4));
if (UNIV_UNLIKELY(lsn1 != lsn2)) {
ib::error() << "The page to be written to "
<< s.chain.start->name <<
" seems corrupt!"
" The low 4 bytes of LSN fields do not match"
" (" << lsn1 << " != " << lsn2 << ")!"
" Noticed in the buffer pool.";
}
}
static void buf_dblwr_check_page_lsn(const buf_page_t& b, const byte* page)
{
if (fil_space_t* space = fil_space_acquire_for_io(b.id.space())) {
buf_dblwr_check_page_lsn(page, *space);
space->release_for_io();
}
}
#endif /* UNIV_DEBUG */
/********************************************************************//**
Asserts when a corrupt block is find during writing out data to the
disk. */
@ -962,8 +939,7 @@ buf_dblwr_write_block_to_datafile(
const_cast<buf_page_t*>(bpage));
ut_a(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE);
buf_dblwr_check_page_lsn(block->frame);
ut_d(buf_dblwr_check_page_lsn(block->page, block->frame));
fil_io(request,
sync, bpage->id, bpage->zip_size(), 0, bpage->real_size,
frame, block);
@ -1056,10 +1032,7 @@ try_again:
/* Check that the actual page in the buffer pool is
not corrupt and the LSN values are sane. */
buf_dblwr_check_block(block);
/* Check that the page as written to the doublewrite
buffer has sane LSN values. */
buf_dblwr_check_page_lsn(write_buf + len2);
ut_d(buf_dblwr_check_page_lsn(block->page, write_buf + len2));
}
/* Write out the first block of the doublewrite buffer */
@ -1239,8 +1212,8 @@ buf_dblwr_write_single_page(
/* Check that the page as written to the doublewrite
buffer has sane LSN values. */
if (!bpage->zip.data) {
buf_dblwr_check_page_lsn(
((buf_block_t*) bpage)->frame);
ut_d(buf_dblwr_check_page_lsn(
*bpage, ((buf_block_t*) bpage)->frame));
}
}

View File

@ -728,9 +728,9 @@ void buf_flush_write_complete(buf_page_t* bpage, bool dblwr)
/** Calculate the checksum of a page from compressed table and update
the page.
@param[in,out] page page to update
@param[in] size compressed page size
@param[in] lsn LSN to stamp on the page */
@param[in,out] page page to update
@param[in] size compressed page size
@param[in] lsn LSN to stamp on the page */
void
buf_flush_update_zip_checksum(
buf_frame_t* page,
@ -751,9 +751,15 @@ buf_flush_update_zip_checksum(
@param[in,out] page page to be updated */
void buf_flush_assign_full_crc32_checksum(byte* page)
{
uint32_t checksum = buf_calc_page_full_crc32(page);
mach_write_to_4(page + srv_page_size - FIL_PAGE_FCRC32_CHECKSUM,
checksum);
ut_d(bool compressed = false);
ut_d(bool corrupted = false);
ut_d(const uint size = buf_page_full_crc32_size(page, &compressed,
&corrupted));
ut_ad(!compressed);
ut_ad(!corrupted);
ut_ad(size == uint(srv_page_size));
const ulint payload = srv_page_size - FIL_PAGE_FCRC32_CHECKSUM;
mach_write_to_4(page + payload, ut_crc32(page, payload));
}
/** Initialize a page for writing to the tablespace.
@ -776,8 +782,6 @@ buf_flush_init_for_writing(
/* If page is encrypted in full crc32 format then
checksum stored already as a part of fil_encrypt_buf() */
ut_ad(use_full_checksum);
ut_ad(mach_read_from_4(
page + FIL_PAGE_FCRC32_KEY_VERSION));
return;
}

View File

@ -565,7 +565,7 @@ static byte* fil_encrypt_buf_for_non_full_checksum(
uint header_len = FIL_PAGE_DATA;
if (page_compressed) {
header_len += (FIL_PAGE_COMPRESSED_SIZE + FIL_PAGE_COMPRESSION_METHOD_SIZE);
header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
}
/* FIL page header is not encrypted */
@ -627,7 +627,7 @@ static byte* fil_encrypt_buf_for_non_full_checksum(
@param[in] src_frame Page to encrypt
@param[in,out] dst_frame Output buffer
@return encrypted buffer or NULL */
static byte* fil_encrypt_buf_for_full_checksum(
static byte* fil_encrypt_buf_for_full_crc32(
fil_space_crypt_t* crypt_data,
ulint space,
ulint offset,
@ -635,8 +635,16 @@ static byte* fil_encrypt_buf_for_full_checksum(
const byte* src_frame,
byte* dst_frame)
{
const uint size = uint(srv_page_size);
uint key_version = fil_crypt_get_latest_key_version(crypt_data);
ut_d(bool corrupted = false);
const uint size = buf_page_full_crc32_size(src_frame, NULL,
#ifdef UNIV_DEBUG
&corrupted
#else
NULL
#endif
);
ut_ad(!corrupted);
uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ FIL_PAGE_FCRC32_CHECKSUM);
const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
@ -657,8 +665,10 @@ static byte* fil_encrypt_buf_for_full_checksum(
ut_a(rc == MY_AES_OK);
ut_a(dstlen == srclen);
ib_uint32_t checksum = buf_calc_page_full_crc32(dst_frame);
mach_write_to_4(dst_frame + size - FIL_PAGE_FCRC32_CHECKSUM, checksum);
const ulint payload = size - FIL_PAGE_FCRC32_CHECKSUM;
mach_write_to_4(dst_frame + payload, ut_crc32(dst_frame, payload));
/* Clean the rest of the buffer. FIXME: Punch holes when writing! */
memset(dst_frame + (payload + 4), 0, srv_page_size - (payload + 4));
srv_stats.pages_encrypted.inc();
@ -689,14 +699,14 @@ fil_encrypt_buf(
bool use_full_checksum)
{
if (use_full_checksum) {
return fil_encrypt_buf_for_full_checksum(
crypt_data, space, offset,
lsn, src_frame, dst_frame);
return fil_encrypt_buf_for_full_crc32(
crypt_data, space, offset,
lsn, src_frame, dst_frame);
}
return fil_encrypt_buf_for_non_full_checksum(
crypt_data, space, offset, lsn,
src_frame, zip_size, dst_frame);
crypt_data, space, offset, lsn,
src_frame, zip_size, dst_frame);
}
/** Check whether these page types are allowed to encrypt.
@ -750,15 +760,6 @@ fil_space_encrypt(
const bool full_crc32 = space->full_crc32();
if (full_crc32) {
/* Write LSN for the full crc32 checksum before
encryption. Because lsn is one of the input for encryption. */
mach_write_to_8(src_frame + FIL_PAGE_LSN, lsn);
mach_write_to_4(
src_frame + srv_page_size - FIL_PAGE_FCRC32_END_LSN,
(ulint) lsn);
}
byte* tmp = fil_encrypt_buf(crypt_data, space->id, offset, lsn,
src_frame, zip_size, dst_frame,
full_crc32);
@ -768,42 +769,51 @@ fil_space_encrypt(
/* Verify that encrypted buffer is not corrupted */
dberr_t err = DB_SUCCESS;
byte* src = src_frame;
bool page_compressed_encrypted = (mach_read_from_2(tmp+FIL_PAGE_TYPE) == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
byte uncomp_mem[UNIV_PAGE_SIZE_MAX];
byte tmp_mem[UNIV_PAGE_SIZE_MAX];
if (page_compressed_encrypted) {
memcpy(uncomp_mem, src, srv_page_size);
ulint unzipped1 = fil_page_decompress(
tmp_mem, uncomp_mem);
ut_ad(unzipped1);
if (unzipped1 != srv_page_size) {
src = uncomp_mem;
}
}
ut_ad(full_crc32
|| !buf_page_is_corrupted(true, src, space->flags));
ut_ad(fil_space_decrypt(space->id, crypt_data, tmp_mem,
space->physical_size(), space->flags,
tmp, &err));
ut_ad(err == DB_SUCCESS);
/* Need to decompress the page if it was also compressed */
if (page_compressed_encrypted) {
byte buf[UNIV_PAGE_SIZE_MAX];
memcpy(buf, tmp_mem, srv_page_size);
ulint unzipped2 = fil_page_decompress(tmp_mem, buf);
ut_ad(unzipped2);
}
if (full_crc32) {
bool compressed = false, corrupted = false;
uint size = buf_page_full_crc32_size(
tmp, &compressed, &corrupted);
ut_ad(!corrupted);
ut_ad(!compressed == (size == srv_page_size));
ut_ad(fil_space_decrypt(space->id, crypt_data, tmp_mem,
size, space->flags, tmp,
&err));
ut_ad(err == DB_SUCCESS);
memcpy(tmp_mem, src, FIL_PAGE_OFFSET);
ut_ad(!memcmp(src, tmp_mem,
(space->physical_size()
- FIL_PAGE_FCRC32_CHECKSUM)));
(size - FIL_PAGE_FCRC32_CHECKSUM)));
} else {
bool page_compressed_encrypted =
(mach_read_from_2(tmp+FIL_PAGE_TYPE)
== FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
byte uncomp_mem[UNIV_PAGE_SIZE_MAX];
if (page_compressed_encrypted) {
memcpy(uncomp_mem, src, srv_page_size);
ulint unzipped1 = fil_page_decompress(
tmp_mem, uncomp_mem, space->flags);
ut_ad(unzipped1);
if (unzipped1 != srv_page_size) {
src = uncomp_mem;
}
}
ut_ad(!buf_page_is_corrupted(true, src, space->flags));
ut_ad(fil_space_decrypt(space->id, crypt_data, tmp_mem,
space->physical_size(),
space->flags, tmp, &err));
ut_ad(err == DB_SUCCESS);
if (page_compressed_encrypted) {
memcpy(tmp_mem, uncomp_mem, srv_page_size);
ulint unzipped2 = fil_page_decompress(
uncomp_mem, tmp_mem, space->flags);
ut_ad(unzipped2);
}
memcpy(tmp_mem + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION,
src + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, 8);
ut_ad(!memcmp(src, tmp_mem, space->physical_size()));
@ -821,7 +831,7 @@ fil_space_encrypt(
@param[in,out] src_frame Page to decrypt
@param[out] err DB_SUCCESS or DB_DECRYPTION_FAILED
@return true if page decrypted, false if not.*/
static bool fil_space_decrypt_for_full_checksum(
static bool fil_space_decrypt_full_crc32(
ulint space,
fil_space_crypt_t* crypt_data,
byte* tmp_frame,
@ -838,7 +848,8 @@ static bool fil_space_decrypt_for_full_checksum(
return false;
}
ut_a(crypt_data != NULL && crypt_data->is_encrypted());
ut_ad(crypt_data);
ut_ad(crypt_data->is_encrypted());
memcpy(tmp_frame, src_frame, FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
@ -846,9 +857,16 @@ static bool fil_space_decrypt_for_full_checksum(
const byte* src = src_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
byte* dst = tmp_frame + FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION;
uint dstlen = 0;
uint srclen = uint(srv_page_size)
- (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ FIL_PAGE_FCRC32_CHECKSUM);
bool corrupted = false;
uint size = buf_page_full_crc32_size(src_frame, NULL, &corrupted);
if (UNIV_UNLIKELY(corrupted)) {
fail:
*err = DB_DECRYPTION_FAILED;
return false;
}
uint srclen = size - (FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
+ FIL_PAGE_FCRC32_CHECKSUM);
int rc = encryption_scheme_decrypt(src, srclen, dst, &dstlen,
crypt_data, key_version,
@ -856,8 +874,7 @@ static bool fil_space_decrypt_for_full_checksum(
if (rc != MY_AES_OK || dstlen != srclen) {
if (rc == -1) {
*err = DB_DECRYPTION_FAILED;
return false;
goto fail;
}
ib::fatal() << "Unable to decrypt data-block "
@ -913,8 +930,7 @@ static bool fil_space_decrypt_for_non_full_checksum(
uint header_len = FIL_PAGE_DATA;
if (page_compressed) {
header_len += (FIL_PAGE_COMPRESSED_SIZE
+ FIL_PAGE_COMPRESSION_METHOD_SIZE);
header_len += FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
}
/* Copy FIL page header, it is not encrypted */
@ -985,7 +1001,7 @@ fil_space_decrypt(
dberr_t* err)
{
if (fil_space_t::full_crc32(fsp_flags)) {
return fil_space_decrypt_for_full_checksum(
return fil_space_decrypt_full_crc32(
space_id, crypt_data, tmp_frame, src_frame, err);
}

View File

@ -418,6 +418,42 @@ fil_space_is_flushed(
return(true);
}
/** Validate the compression algorithm for full crc32 format.
@param[in] space tablespace object
@return whether the compression algorithm support */
static bool fil_comp_algo_validate(const fil_space_t* space)
{
if (!space->full_crc32()) {
return true;
}
DBUG_EXECUTE_IF("fil_comp_algo_validate_fail",
return false;);
ulint comp_algo = space->get_compression_algo();
switch (comp_algo) {
case PAGE_UNCOMPRESSED:
case PAGE_ZLIB_ALGORITHM:
#ifdef HAVE_LZ4
case PAGE_LZ4_ALGORITHM:
#endif /* HAVE_LZ4 */
#ifdef HAVE_LZO
case PAGE_LZO_ALGORITHM:
#endif /* HAVE_LZO */
#ifdef HAVE_LZMA
case PAGE_LZMA_ALGORITHM:
#endif /* HAVE_LZMA */
#ifdef HAVE_BZIP2
case PAGE_BZIP2_ALGORITHM:
#endif /* HAVE_BZIP2 */
#ifdef HAVE_SNAPPY
case PAGE_SNAPPY_ALGORITHM:
#endif /* HAVE_SNAPPY */
return true;
}
return false;
}
/** Append a file to the chain of files of a space.
@param[in] name file name of a file that is not open
@ -623,10 +659,16 @@ retry:
}
if (!node->read_page0(first_time_open)) {
fail:
os_file_close(node->handle);
node->handle = OS_FILE_CLOSED;
return false;
}
if (first_time_open && !fil_comp_algo_validate(space)) {
goto fail;
}
} else if (space->purpose == FIL_TYPE_LOG) {
node->handle = os_file_create(
innodb_log_file_key, node->name, OS_FILE_OPEN,

View File

@ -73,49 +73,24 @@ Updated 14/02/2015
#include "snappy-c.h"
#endif
/** Compress a page_compressed page before writing to a data file.
/** Compress a page for the given compression algorithm.
@param[in] buf page to be compressed
@param[out] out_buf compressed page
@param[in] flags tablespace flags
@param[in] block_size file system block size
@param[in] encrypted whether the page will be subsequently encrypted
@return actual length of compressed page
@retval 0 if the page was not compressed */
ulint fil_page_compress(
@param[in] header_len header length of the page
@param[in] comp_algo compression algorithm
@param[in] comp_level compression level
@return actual length of compressed page data
@retval 0 if the page was not compressed */
static ulint fil_page_compress_low(
const byte* buf,
byte* out_buf,
ulint flags,
ulint block_size,
bool encrypted)
ulint header_len,
ulint comp_algo,
ulint comp_level)
{
int comp_level = int(fsp_flags_get_page_compression_level(flags));
ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
/* Cache to avoid change during function execution */
ulint comp_method = innodb_compression_algorithm;
if (encrypted) {
header_len += FIL_PAGE_COMPRESSION_METHOD_SIZE;
}
/* Let's not compress file space header or
extent descriptor */
switch (fil_page_get_type(buf)) {
case 0:
case FIL_PAGE_TYPE_FSP_HDR:
case FIL_PAGE_TYPE_XDES:
case FIL_PAGE_PAGE_COMPRESSED:
return 0;
}
/* If no compression level was provided to this table, use system
default level */
if (comp_level == 0) {
comp_level = int(page_zip_level);
}
ulint write_size = srv_page_size - header_len;
switch (comp_method) {
switch (comp_algo) {
default:
ut_ad(!"unknown compression method");
/* fall through */
@ -125,10 +100,9 @@ ulint fil_page_compress(
{
ulong len = uLong(write_size);
if (Z_OK == compress2(
out_buf + header_len, &len,
buf, uLong(srv_page_size), comp_level)) {
write_size = len;
goto success;
out_buf + header_len, &len, buf,
uLong(srv_page_size), int(comp_level))) {
return len;
}
}
break;
@ -146,10 +120,7 @@ ulint fil_page_compress(
int(srv_page_size), int(write_size));
# endif
if (write_size) {
goto success;
}
break;
return write_size;
#endif /* HAVE_LZ4 */
#ifdef HAVE_LZO
case PAGE_LZO_ALGORITHM: {
@ -160,8 +131,7 @@ ulint fil_page_compress(
out_buf + header_len, &len,
out_buf + srv_page_size)
&& len <= write_size) {
write_size = len;
goto success;
return len;
}
break;
}
@ -175,8 +145,7 @@ ulint fil_page_compress(
buf, srv_page_size, out_buf + header_len,
&out_pos, write_size)
&& out_pos <= write_size) {
write_size = out_pos;
goto success;
return out_pos;
}
break;
}
@ -192,8 +161,7 @@ ulint fil_page_compress(
reinterpret_cast<const char*>(buf)),
unsigned(srv_page_size), 1, 0, 0)
&& len <= write_size) {
write_size = len;
goto success;
return len;
}
break;
}
@ -209,62 +177,188 @@ ulint fil_page_compress(
reinterpret_cast<char*>(out_buf) + header_len,
&len)
&& len <= write_size) {
write_size = len;
goto success;
return len;
}
break;
}
#endif /* HAVE_SNAPPY */
}
srv_stats.pages_page_compression_error.inc();
return 0;
success:
}
/** Compress a page_compressed page for full crc32 format.
@param[in] buf page to be compressed
@param[out] out_buf compressed page
@param[in] flags tablespace flags
@param[in] block_size file system block size
@return actual length of compressed page
@retval 0 if the page was not compressed */
static ulint fil_page_compress_for_full_crc32(
const byte* buf,
byte* out_buf,
ulint flags,
ulint block_size,
bool encrypted)
{
ulint comp_level = fsp_flags_get_page_compression_level(flags);
if (comp_level == 0) {
comp_level = page_zip_level;
}
const ulint header_len = FIL_PAGE_COMP_ALGO;
ulint write_size = fil_page_compress_low(
buf, out_buf, header_len,
fil_space_t::get_compression_algo(flags), comp_level);
if (write_size == 0) {
fail:
srv_stats.pages_page_compression_error.inc();
return 0;
}
write_size += header_len;
const ulint actual_size = write_size;
/* Write the actual length of the data & page type
for full crc32 format. */
const bool lsb = fil_space_t::full_crc32_page_compressed_len(flags);
/* In the MSB, store the rounded-up page size. */
write_size = (write_size + lsb + (4 + 255)) & ~255;
if (write_size >= srv_page_size) {
goto fail;
}
/* Set up the page header */
memcpy(out_buf, buf, header_len);
out_buf[FIL_PAGE_TYPE] = 1U << (FIL_PAGE_COMPRESS_FCRC32_MARKER - 8);
out_buf[FIL_PAGE_TYPE + 1] = byte(write_size >> 8);
/* Clean up the buffer for the remaining write_size (except checksum) */
memset(out_buf + actual_size, 0, write_size - actual_size - 4);
if (lsb) {
/* Store the LSB */
out_buf[write_size - 5] = byte(actual_size + (1 + 4));
}
if (!block_size) {
block_size = 512;
}
ut_ad(write_size);
if (write_size & (block_size - 1)) {
size_t tmp = write_size;
write_size = (write_size + (block_size - 1))
& ~(block_size - 1);
memset(out_buf + tmp, 0, write_size - tmp);
}
#ifdef UNIV_DEBUG
/* Verify that page can be decompressed */
{
page_t tmp_buf[UNIV_PAGE_SIZE_MAX];
page_t page[UNIV_PAGE_SIZE_MAX];
memcpy(page, out_buf, write_size);
ut_ad(fil_page_decompress(tmp_buf, page, flags));
}
#endif
srv_stats.page_compression_saved.add(srv_page_size - write_size);
srv_stats.pages_page_compressed.inc();
return write_size;
}
/** Compress a page_compressed page for non full crc32 format.
@param[in] buf page to be compressed
@param[out] out_buf compressed page
@param[in] flags tablespace flags
@param[in] block_size file system block size
@param[in] encrypted whether the page will be subsequently encrypted
@return actual length of compressed page
@retval 0 if the page was not compressed */
static ulint fil_page_compress_for_non_full_crc32(
const byte* buf,
byte* out_buf,
ulint flags,
ulint block_size,
bool encrypted)
{
int comp_level = int(fsp_flags_get_page_compression_level(flags));
ulint header_len = FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN;
/* Cache to avoid change during function execution */
ulint comp_algo = innodb_compression_algorithm;
if (encrypted) {
header_len += FIL_PAGE_ENCRYPT_COMP_ALGO;
}
/* If no compression level was provided to this table, use system
default level */
if (comp_level == 0) {
comp_level = int(page_zip_level);
}
ulint write_size = fil_page_compress_low(
buf, out_buf,
header_len, comp_algo, comp_level);
if (write_size == 0) {
srv_stats.pages_page_compression_error.inc();
return 0;
}
/* Set up the page header */
memcpy(out_buf, buf, FIL_PAGE_DATA);
/* Set up the checksum */
mach_write_to_4(out_buf+FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC);
mach_write_to_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM, BUF_NO_CHECKSUM_MAGIC);
/* Set up the compression algorithm */
mach_write_to_8(out_buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION, comp_method);
mach_write_to_8(out_buf + FIL_PAGE_COMP_ALGO, comp_algo);
if (encrypted) {
/* Set up the correct page type */
mach_write_to_2(out_buf+FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
mach_write_to_2(out_buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE, comp_method);
mach_write_to_2(out_buf + FIL_PAGE_TYPE,
FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED);
mach_write_to_2(out_buf + FIL_PAGE_DATA
+ FIL_PAGE_ENCRYPT_COMP_ALGO, comp_algo);
} else {
/* Set up the correct page type */
mach_write_to_2(out_buf+FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED);
mach_write_to_2(out_buf + FIL_PAGE_TYPE, FIL_PAGE_PAGE_COMPRESSED);
}
/* Set up the actual payload lenght */
mach_write_to_2(out_buf+FIL_PAGE_DATA, write_size);
mach_write_to_2(out_buf + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE,
write_size);
#ifdef UNIV_DEBUG
/* Verify */
ut_ad(fil_page_is_compressed(out_buf) || fil_page_is_compressed_encrypted(out_buf));
ut_ad(mach_read_from_4(out_buf+FIL_PAGE_SPACE_OR_CHKSUM) == BUF_NO_CHECKSUM_MAGIC);
ut_ad(mach_read_from_2(out_buf+FIL_PAGE_DATA) == write_size);
ut_ad(mach_read_from_8(out_buf+FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION) == (ulint)comp_method ||
mach_read_from_2(out_buf+FIL_PAGE_DATA+FIL_PAGE_COMPRESSED_SIZE) == (ulint)comp_method);
ut_ad(fil_page_is_compressed(out_buf)
|| fil_page_is_compressed_encrypted(out_buf));
ut_ad(mach_read_from_4(out_buf + FIL_PAGE_SPACE_OR_CHKSUM)
== BUF_NO_CHECKSUM_MAGIC);
ut_ad(mach_read_from_2(out_buf + FIL_PAGE_DATA + FIL_PAGE_COMP_SIZE)
== write_size);
bool is_compressed = (mach_read_from_8(out_buf + FIL_PAGE_COMP_ALGO)
== (ulint) comp_algo);
bool is_encrypted_compressed =
(mach_read_from_2(out_buf + FIL_PAGE_DATA
+ FIL_PAGE_ENCRYPT_COMP_ALGO)
== (ulint) comp_algo);
ut_ad(is_compressed || is_encrypted_compressed);
/* Verify that page can be decompressed */
{
page_t tmp_buf[UNIV_PAGE_SIZE_MAX];
page_t page[UNIV_PAGE_SIZE_MAX];
memcpy(page, out_buf, srv_page_size);
ut_ad(fil_page_decompress(tmp_buf, page));
ulint fsp_flags = 0;
if (fil_space_t::full_crc32(flags)) {
/* Need to construct flag for new crc32 checksum */
fsp_flags = 1U << FSP_FLAGS_FCRC32_POS_MARKER;
fsp_flags |= FSP_FLAGS_FCRC32_PAGE_SSIZE();
ut_ad(!buf_page_is_corrupted(false, page, fsp_flags));
} else {
fsp_flags = flags;
ut_ad(!buf_page_is_corrupted(false, page, fsp_flags));
}
ut_ad(fil_page_decompress(tmp_buf, page, flags));
ut_ad(!buf_page_is_corrupted(false, page, flags));
}
#endif /* UNIV_DEBUG */
@ -279,7 +373,8 @@ success:
/* Actual write needs to be alligned on block size */
if (write_size % block_size) {
size_t tmp = write_size;
write_size = (size_t)ut_uint64_align_up((ib_uint64_t)write_size, block_size);
write_size = (size_t)ut_uint64_align_up(
(ib_uint64_t)write_size, block_size);
/* Clean up the end of buffer */
memset(out_buf+tmp, 0, write_size - tmp);
#ifdef UNIV_DEBUG
@ -294,28 +389,203 @@ success:
return write_size;
}
/** Compress a page_compressed page before writing to a data file.
@param[in] buf page to be compressed
@param[out] out_buf compressed page
@param[in] flags tablespace flags
@param[in] block_size file system block size
@param[in] encrypted whether the page will be subsequently encrypted
@return actual length of compressed page
@retval 0 if the page was not compressed */
ulint fil_page_compress(
const byte* buf,
byte* out_buf,
ulint flags,
ulint block_size,
bool encrypted)
{
/* The full_crc32 page_compressed format assumes this. */
ut_ad(!(block_size & 255));
ut_ad(ut_is_2pow(block_size));
/* Let's not compress file space header or
extent descriptor */
switch (fil_page_get_type(buf)) {
case 0:
case FIL_PAGE_TYPE_FSP_HDR:
case FIL_PAGE_TYPE_XDES:
case FIL_PAGE_PAGE_COMPRESSED:
return 0;
}
if (fil_space_t::full_crc32(flags)) {
return fil_page_compress_for_full_crc32(
buf, out_buf, flags, block_size, encrypted);
}
return fil_page_compress_for_non_full_crc32(
buf, out_buf, flags, block_size, encrypted);
}
/** Decompress a page that may be subject to page_compressed compression.
@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
@param[in,out] buf possibly compressed page buffer
@param[in] comp_algo compression algorithm
@param[in] header_len header length of the page
@param[in] actual size actual size of the page
@retval true if the page is decompressed or false */
static bool fil_page_decompress_low(
byte* tmp_buf,
byte* buf,
ulint comp_algo,
ulint header_len,
ulint actual_size)
{
switch (comp_algo) {
default:
ib::error() << "Unknown compression algorithm "
<< comp_algo;
return false;
case PAGE_ZLIB_ALGORITHM:
{
uLong len = srv_page_size;
return (Z_OK == uncompress(tmp_buf, &len,
buf + header_len,
uLong(actual_size))
&& len == srv_page_size);
}
#ifdef HAVE_LZ4
case PAGE_LZ4_ALGORITHM:
return LZ4_decompress_safe(
reinterpret_cast<const char*>(buf) + header_len,
reinterpret_cast<char*>(tmp_buf),
actual_size, srv_page_size) == int(srv_page_size);
#endif /* HAVE_LZ4 */
#ifdef HAVE_LZO
case PAGE_LZO_ALGORITHM:
{
lzo_uint len_lzo = srv_page_size;
return (LZO_E_OK == lzo1x_decompress_safe(
buf + header_len,
actual_size, tmp_buf, &len_lzo, NULL)
&& len_lzo == srv_page_size);
}
#endif /* HAVE_LZO */
#ifdef HAVE_LZMA
case PAGE_LZMA_ALGORITHM:
{
size_t src_pos = 0;
size_t dst_pos = 0;
uint64_t memlimit = UINT64_MAX;
return LZMA_OK == lzma_stream_buffer_decode(
&memlimit, 0, NULL, buf + header_len,
&src_pos, actual_size, tmp_buf, &dst_pos,
srv_page_size)
&& dst_pos == srv_page_size;
}
#endif /* HAVE_LZMA */
#ifdef HAVE_BZIP2
case PAGE_BZIP2_ALGORITHM:
{
unsigned int dst_pos = srv_page_size;
return BZ_OK == BZ2_bzBuffToBuffDecompress(
reinterpret_cast<char*>(tmp_buf),
&dst_pos,
reinterpret_cast<char*>(buf) + header_len,
actual_size, 1, 0)
&& dst_pos == srv_page_size;
}
#endif /* HAVE_BZIP2 */
#ifdef HAVE_SNAPPY
case PAGE_SNAPPY_ALGORITHM:
{
size_t olen = srv_page_size;
return SNAPPY_OK == snappy_uncompress(
reinterpret_cast<const char*>(buf)
+ header_len,
actual_size,
reinterpret_cast<char*>(tmp_buf), &olen)
&& olen == srv_page_size;
}
#endif /* HAVE_SNAPPY */
}
return false;
}
/** Decompress a page for full crc32 format.
@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
@param[in,out] buf possibly compressed page buffer
@param[in] flags tablespace flags
@return size of the compressed data
@retval 0 if decompression failed
@retval srv_page_size if the page was not compressed */
ulint fil_page_decompress(byte* tmp_buf, byte* buf)
ulint fil_page_decompress_for_full_crc32(byte* tmp_buf, byte* buf, ulint flags)
{
ut_ad(fil_space_t::full_crc32(flags));
bool compressed = false;
size_t size = buf_page_full_crc32_size(buf, &compressed, NULL);
if (!compressed) {
ut_ad(size == srv_page_size);
return size;
}
if (!fil_space_t::is_compressed(flags)) {
return 0;
}
if (size >= srv_page_size) {
return 0;
}
if (fil_space_t::full_crc32_page_compressed_len(flags)) {
compile_time_assert(FIL_PAGE_FCRC32_CHECKSUM == 4);
if (size_t lsb = buf[size - 5]) {
size += lsb - 0x100;
}
size -= 5;
}
const size_t header_len = FIL_PAGE_COMP_ALGO;
if (!fil_page_decompress_low(tmp_buf, buf,
fil_space_t::get_compression_algo(flags),
header_len, size - header_len)) {
return 0;
}
srv_stats.pages_page_decompressed.inc();
memcpy(buf, tmp_buf, srv_page_size);
return size;
}
/** Decompress a page for non full crc32 format.
@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
@param[in,out] buf possibly compressed page buffer
@return size of the compressed data
@retval 0 if decompression failed
@retval srv_page_size if the page was not compressed */
ulint fil_page_decompress_for_non_full_crc32(
byte* tmp_buf,
byte* buf)
{
const unsigned ptype = mach_read_from_2(buf+FIL_PAGE_TYPE);
ulint header_len;
uint64_t compression_alg;
uint comp_algo;
switch (ptype) {
case FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED:
header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE
+ FIL_PAGE_COMPRESSION_METHOD_SIZE;
compression_alg = mach_read_from_2(
FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE + buf);
header_len= FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_METADATA_LEN;
comp_algo = mach_read_from_2(
FIL_PAGE_DATA + FIL_PAGE_ENCRYPT_COMP_ALGO + buf);
break;
case FIL_PAGE_PAGE_COMPRESSED:
header_len = FIL_PAGE_DATA + FIL_PAGE_COMPRESSED_SIZE;
compression_alg = mach_read_from_8(
FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION + buf);
header_len = FIL_PAGE_DATA + FIL_PAGE_COMP_METADATA_LEN;
if (mach_read_from_6(FIL_PAGE_COMP_ALGO + buf)) {
return 0;
}
comp_algo = mach_read_from_2(FIL_PAGE_COMP_ALGO + 6 + buf);
break;
default:
return srv_page_size;
@ -326,99 +596,38 @@ ulint fil_page_decompress(byte* tmp_buf, byte* buf)
return 0;
}
ulint actual_size = mach_read_from_2(buf + FIL_PAGE_DATA);
ulint actual_size = mach_read_from_2(buf + FIL_PAGE_DATA
+ FIL_PAGE_COMP_SIZE);
/* Check if payload size is corrupted */
if (actual_size == 0 || actual_size > srv_page_size - header_len) {
return 0;
}
switch (compression_alg) {
default:
ib::error() << "Unknown compression algorithm "
<< compression_alg;
if (!fil_page_decompress_low(tmp_buf, buf, comp_algo, header_len,
actual_size)) {
return 0;
case PAGE_ZLIB_ALGORITHM:
{
uLong len = srv_page_size;
if (Z_OK == uncompress(tmp_buf, &len,
buf + header_len,
uLong(actual_size))
&& len == srv_page_size) {
break;
}
}
return 0;
#ifdef HAVE_LZ4
case PAGE_LZ4_ALGORITHM:
if (LZ4_decompress_safe(reinterpret_cast<const char*>(buf)
+ header_len,
reinterpret_cast<char*>(tmp_buf),
actual_size, srv_page_size)
== int(srv_page_size)) {
break;
}
return 0;
#endif /* HAVE_LZ4 */
#ifdef HAVE_LZO
case PAGE_LZO_ALGORITHM: {
lzo_uint len_lzo = srv_page_size;
if (LZO_E_OK == lzo1x_decompress_safe(
buf + header_len,
actual_size, tmp_buf, &len_lzo, NULL)
&& len_lzo == srv_page_size) {
break;
}
return 0;
}
#endif /* HAVE_LZO */
#ifdef HAVE_LZMA
case PAGE_LZMA_ALGORITHM: {
size_t src_pos = 0;
size_t dst_pos = 0;
uint64_t memlimit = UINT64_MAX;
if (LZMA_OK == lzma_stream_buffer_decode(
&memlimit, 0, NULL, buf + header_len,
&src_pos, actual_size, tmp_buf, &dst_pos,
srv_page_size)
&& dst_pos == srv_page_size) {
break;
}
return 0;
}
#endif /* HAVE_LZMA */
#ifdef HAVE_BZIP2
case PAGE_BZIP2_ALGORITHM: {
unsigned int dst_pos = srv_page_size;
if (BZ_OK == BZ2_bzBuffToBuffDecompress(
reinterpret_cast<char*>(tmp_buf),
&dst_pos,
reinterpret_cast<char*>(buf) + header_len,
actual_size, 1, 0)
&& dst_pos == srv_page_size) {
break;
}
return 0;
}
#endif /* HAVE_BZIP2 */
#ifdef HAVE_SNAPPY
case PAGE_SNAPPY_ALGORITHM: {
size_t olen = srv_page_size;
if (SNAPPY_OK == snappy_uncompress(
reinterpret_cast<const char*>(buf) + header_len,
actual_size,
reinterpret_cast<char*>(tmp_buf), &olen)
&& olen == srv_page_size) {
break;
}
return 0;
}
#endif /* HAVE_SNAPPY */
}
srv_stats.pages_page_decompressed.inc();
memcpy(buf, tmp_buf, srv_page_size);
return actual_size;
}
/** Decompress a page that may be subject to page_compressed compression.
@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
@param[in,out] buf possibly compressed page buffer
@return size of the compressed data
@retval 0 if decompression failed
@retval srv_page_size if the page was not compressed */
ulint fil_page_decompress(
byte* tmp_buf,
byte* buf,
ulint flags)
{
if (fil_space_t::full_crc32(flags)) {
return fil_page_decompress_for_full_crc32(tmp_buf, buf, flags);
}
return fil_page_decompress_for_non_full_crc32(tmp_buf, buf);
}

View File

@ -1,7 +1,7 @@
/*****************************************************************************
Copyright (c) 2013, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2019, MariaDB Corporation.
Copyright (c) 2017, 2018, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software

View File

@ -11646,9 +11646,8 @@ create_table_info_t::check_table_options()
except if innodb_checksum_algorithm=full_crc32.
Do not allow ENCRYPTED=YES if any SPATIAL INDEX exists. */
if (options->encryption != FIL_ENCRYPTION_ON
|| (!options->page_compressed
&& srv_checksum_algorithm
>= SRV_CHECKSUM_ALGORITHM_FULL_CRC32)) {
|| srv_checksum_algorithm
>= SRV_CHECKSUM_ALGORITHM_FULL_CRC32) {
break;
}
for (ulint i = 0; i < m_form->s->keys; i++) {

View File

@ -1785,16 +1785,6 @@ ha_innobase::check_if_supported_inplace_alter(
update_thd();
/* MDEV-12026 FIXME: Implement and allow
innodb_checksum_algorithm=full_crc32 for page_compressed! */
if (m_prebuilt->table->space
&& m_prebuilt->table->space->full_crc32()
&& altered_table->s->option_struct
&& altered_table->s->option_struct->page_compressed) {
ut_ad(!table->s->option_struct->page_compressed);
DBUG_RETURN(HA_ALTER_INPLACE_NOT_SUPPORTED);
}
if (ha_alter_info->handler_flags
& ~(INNOBASE_INPLACE_IGNORE
| INNOBASE_ALTER_INSTANT
@ -10261,11 +10251,17 @@ commit_cache_norebuild(
bool update = !(space->flags
& FSP_FLAGS_MASK_PAGE_COMPRESSION);
mutex_enter(&fil_system.mutex);
space->flags = (~FSP_FLAGS_MASK_MEM_COMPRESSION_LEVEL
& (space->flags
| FSP_FLAGS_MASK_PAGE_COMPRESSION))
| ctx->page_compression_level
space->flags &= ~FSP_FLAGS_MASK_MEM_COMPRESSION_LEVEL;
space->flags |= ctx->page_compression_level
<< FSP_FLAGS_MEM_COMPRESSION_LEVEL;
if (!space->full_crc32()) {
space->flags
|= FSP_FLAGS_MASK_PAGE_COMPRESSION;
} else if (!space->is_compressed()) {
space->flags
|= innodb_compression_algorithm
<< FSP_FLAGS_FCRC32_POS_COMPRESSED_ALGO;
}
mutex_exit(&fil_system.mutex);
if (update) {

View File

@ -4876,20 +4876,6 @@ ibuf_print(
mutex_exit(&ibuf_mutex);
}
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] size page size
@return whether the page is all zeroes */
static bool buf_page_is_zeroes(const byte* read_buf, ulint size)
{
for (ulint i = 0; i < size; i++) {
if (read_buf[i] != 0) {
return false;
}
}
return true;
}
/** Check the insert buffer bitmaps on IMPORT TABLESPACE.
@param[in] trx transaction
@param[in,out] space tablespace being imported

View File

@ -678,6 +678,12 @@ buf_block_buf_fix_inc_func(
# endif /* UNIV_DEBUG */
#endif /* !UNIV_INNOCHECKSUM */
/** Check if a page is all zeroes.
@param[in] read_buf database page
@param[in] page_size page frame size
@return whether the page is all zeroes */
bool buf_page_is_zeroes(const void* read_buf, size_t page_size);
/** Checks if the page is in crc32 checksum format.
@param[in] read_buf database page
@param[in] checksum_field1 new checksum field
@ -714,14 +720,6 @@ buf_page_is_checksum_valid_none(
ulint checksum_field2)
MY_ATTRIBUTE((nonnull(1), warn_unused_result));
/** Checks if the page is in full crc32 checksum format.
@param[in] read_buf database page
@param[in] checksum_field checksum field
@return true if the page is in full crc32 checksum format */
bool buf_page_is_checksum_valid_full_crc32(
const byte* read_buf,
size_t checksum_field);
/** Check if a page is corrupt.
@param[in] check_lsn whether the LSN should be checked
@param[in] read_buf database page
@ -742,12 +740,55 @@ stored in 26th position.
@return key version of the page. */
inline uint32_t buf_page_get_key_version(const byte* read_buf, ulint fsp_flags)
{
return FSP_FLAGS_FCRC32_HAS_MARKER(fsp_flags)
return fil_space_t::full_crc32(fsp_flags)
? mach_read_from_4(read_buf + FIL_PAGE_FCRC32_KEY_VERSION)
: mach_read_from_4(read_buf
+ FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION);
}
/** Read the compression info from the page. In full crc32 format,
compression info is at MSB of page type. In other format, it is
stored in page type.
@param[in] read_buf database page
@param[in] fsp_flags tablespace flags
@return true if page is compressed. */
inline bool buf_page_is_compressed(const byte* read_buf, ulint fsp_flags)
{
ulint page_type = mach_read_from_2(read_buf + FIL_PAGE_TYPE);
return fil_space_t::full_crc32(fsp_flags)
? !!(page_type & 1U << FIL_PAGE_COMPRESS_FCRC32_MARKER)
: page_type == FIL_PAGE_PAGE_COMPRESSED;
}
/** Get the compressed or uncompressed size of a full_crc32 page.
@param[in] buf page_compressed or uncompressed page
@param[out] comp whether the page could be compressed
@param[out] cr whether the page could be corrupted
@return the payload size in the file page */
inline uint buf_page_full_crc32_size(const byte* buf, bool* comp, bool* cr)
{
uint t = mach_read_from_2(buf + FIL_PAGE_TYPE);
uint page_size = uint(srv_page_size);
if (!(t & 1U << FIL_PAGE_COMPRESS_FCRC32_MARKER)) {
return page_size;
}
t &= ~(1U << FIL_PAGE_COMPRESS_FCRC32_MARKER);
t <<= 8;
if (t < page_size) {
page_size = t;
if (comp) {
*comp = true;
}
} else if (cr) {
*cr = true;
}
return page_size;
}
#ifndef UNIV_INNOCHECKSUM
/**********************************************************************//**
Gets the space id, page offset, and byte offset within page of a

View File

@ -1,7 +1,7 @@
/*****************************************************************************
Copyright (c) 1995, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2017, 2018, MariaDB Corporation.
Copyright (c) 2017, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@ -56,11 +56,6 @@ because this takes that field as an input!
uint32_t
buf_calc_page_old_checksum(const byte* page);
/** Calculate the CRC32 checksum for the whole page.
@param[in] page buffer page (srv_page_size bytes)
@return CRC32 value */
uint32_t buf_calc_page_full_crc32(const byte* page);
/** Return a printable string describing the checksum algorithm.
@param[in] algo algorithm
@return algorithm name */

View File

@ -1,7 +1,7 @@
/*****************************************************************************
Copyright (c) 1996, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2018, MariaDB Corporation.
Copyright (c) 2013, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@ -652,14 +652,18 @@ dict_tf_to_fsp_flags(ulint table_flags)
DBUG_EXECUTE_IF("dict_tf_to_fsp_flags_failure",
return(ULINT_UNDEFINED););
/* Don't allow compressed row format for full crc32 algorithm */
/* No ROW_FORMAT=COMPRESSED for innodb_checksum_algorithm=full_crc32 */
if ((srv_checksum_algorithm == SRV_CHECKSUM_ALGORITHM_STRICT_FULL_CRC32
|| srv_checksum_algorithm == SRV_CHECKSUM_ALGORITHM_FULL_CRC32)
&& !(table_flags & DICT_TF_MASK_ZIP_SSIZE)
&& !page_compression_level) {
&& !(table_flags & DICT_TF_MASK_ZIP_SSIZE)) {
fsp_flags = 1U << FSP_FLAGS_FCRC32_POS_MARKER;
fsp_flags |= FSP_FLAGS_FCRC32_PAGE_SSIZE();
fsp_flags = 1U << FSP_FLAGS_FCRC32_POS_MARKER
| FSP_FLAGS_FCRC32_PAGE_SSIZE();
if (page_compression_level) {
fsp_flags |= innodb_compression_algorithm
<< FSP_FLAGS_FCRC32_POS_COMPRESSED_ALGO;
}
} else {
/* Adjust bit zero. */
fsp_flags = DICT_TF_HAS_ATOMIC_BLOBS(table_flags) ? 1 : 0;

View File

@ -333,15 +333,46 @@ struct fil_space_t {
if (full_crc32(flags)) {
ulint algo = FSP_FLAGS_FCRC32_GET_COMPRESSED_ALGO(
flags);
ut_ad(algo < 6);
return (algo > 0);
flags);
DBUG_ASSERT(algo <= PAGE_ALGORITHM_LAST);
return algo > 0;
}
return FSP_FLAGS_HAS_PAGE_COMPRESSION(flags);
}
/** @return whether the compression enabled for the tablespace. */
bool is_compressed() { return is_compressed(flags); }
bool is_compressed() const { return is_compressed(flags); }
/** Get the compression algorithm for full crc32 format.
@param[in] flags tablespace flags
@return algorithm type of tablespace */
static ulint get_compression_algo(ulint flags)
{
return full_crc32(flags)
? FSP_FLAGS_FCRC32_GET_COMPRESSED_ALGO(flags)
: 0;
}
/** @return the page_compressed algorithm
@retval 0 if not page_compressed */
ulint get_compression_algo() const {
return fil_space_t::get_compression_algo(flags);
}
/** Determine if the page_compressed page contains an extra byte
for exact compressed stream length
@param[in] flags tablespace flags
@return whether the extra byte is needed */
static bool full_crc32_page_compressed_len(ulint flags)
{
DBUG_ASSERT(full_crc32(flags));
switch (get_compression_algo(flags)) {
case PAGE_LZ4_ALGORITHM:
case PAGE_LZO_ALGORITHM:
case PAGE_SNAPPY_ALGORITHM:
return true;
}
return false;
}
/** Whether the full checksum matches with non full checksum flags.
@param[in] flags flags present
@param[in] expected expected flags
@ -351,22 +382,22 @@ struct fil_space_t {
ut_ad(full_crc32(flags));
if (full_crc32(expected)) {
return false;
return get_compression_algo(flags)
== get_compression_algo(expected);
}
ulint page_ssize = FSP_FLAGS_FCRC32_GET_PAGE_SSIZE(flags);
ulint space_page_ssize = FSP_FLAGS_GET_PAGE_SSIZE(expected);
if ((page_ssize == 5 && space_page_ssize != 0)
|| (page_ssize != 5 && (space_page_ssize != page_ssize))) {
if (page_ssize == 5) {
if (space_page_ssize) {
return false;
}
} else if (space_page_ssize != page_ssize) {
return false;
}
if (FSP_FLAGS_HAS_PAGE_COMPRESSION(expected)) {
return false;
}
return true;
return is_compressed(expected) == is_compressed(flags);
}
/** Whether old tablespace flags match full_crc32 flags.
@param[in] flags flags present
@ -376,8 +407,7 @@ struct fil_space_t {
{
ut_ad(!full_crc32(flags));
if (!full_crc32(expected)
|| FSP_FLAGS_HAS_PAGE_COMPRESSION(expected)) {
if (!full_crc32(expected)) {
return false;
}
@ -385,12 +415,15 @@ struct fil_space_t {
ulint space_page_ssize = FSP_FLAGS_FCRC32_GET_PAGE_SSIZE(
expected);
if ((page_ssize == 0 && space_page_ssize != 5)
|| (page_ssize != 0 && (space_page_ssize != page_ssize))) {
if (page_ssize) {
if (space_page_ssize != 5) {
return false;
}
} else if (space_page_ssize != page_ssize) {
return false;
}
return true;
return is_compressed(expected) == is_compressed(flags);
}
/** Whether both fsp flags are equivalent */
static bool is_flags_equal(ulint flags, ulint expected)
@ -647,6 +680,9 @@ or 64 bites of zero if no encryption */
/** This overloads FIL_PAGE_FILE_FLUSH_LSN for RTREE Split Sequence Number */
#define FIL_RTREE_SPLIT_SEQ_NUM FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
/** Start of the page_compressed content */
#define FIL_PAGE_COMP_ALGO FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
/** starting from 4.1.x this contains the space id of the page */
#define FIL_PAGE_ARCH_LOG_NO_OR_SPACE_ID 34U
@ -654,18 +690,27 @@ or 64 bites of zero if no encryption */
#define FIL_PAGE_DATA 38U /*!< start of the data on the page */
/** 32-bit key version used to encrypt the page in full crc32 format.
/** 32-bit key version used to encrypt the page in full_crc32 format.
For non-encrypted page, it contains 0. */
#define FIL_PAGE_FCRC32_KEY_VERSION 0
/* Following are used when page compression is used */
#define FIL_PAGE_COMPRESSED_SIZE 2 /*!< Number of bytes used to store
actual payload data size on
compressed pages. */
#define FIL_PAGE_COMPRESSION_METHOD_SIZE 2
/*!< Number of bytes used to store
actual compression method. */
/** page_compressed without innodb_checksum_algorithm=full_crc32 @{ */
/** Number of bytes used to store actual payload data size on
page_compressed pages when not using full_crc32. */
#define FIL_PAGE_COMP_SIZE 0
/** Number of bytes for FIL_PAGE_COMP_SIZE */
#define FIL_PAGE_COMP_METADATA_LEN 2
/** Number of bytes used to store actual compression method
for encrypted tables when not using full_crc32. */
#define FIL_PAGE_ENCRYPT_COMP_ALGO 2
/** Extra header size for encrypted page_compressed pages when
not using full_crc32 */
#define FIL_PAGE_ENCRYPT_COMP_METADATA_LEN 4
/* @} */
/** File page trailer @{ */
#define FIL_PAGE_END_LSN_OLD_CHKSUM 8 /*!< the low 4 bytes of this are used
to store the page checksum, the
@ -681,8 +726,9 @@ For non-encrypted page, it contains 0. */
/* @} */
/** File page types (values of FIL_PAGE_TYPE) @{ */
#define FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED 37401 /*!< Page is compressed and
then encrypted */
/** page_compressed, encrypted=YES (not used for full_crc32) */
#define FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED 37401
/** page_compressed (not used for full_crc32) */
#define FIL_PAGE_PAGE_COMPRESSED 34354 /*!< page compressed page */
#define FIL_PAGE_INDEX 17855 /*!< B-tree node */
#define FIL_PAGE_RTREE 17854 /*!< R-tree node (SPATIAL INDEX) */
@ -715,6 +761,12 @@ For non-encrypted page, it contains 0. */
Note: FIL_PAGE_TYPE_INSTANT maps to the same as FIL_PAGE_INDEX. */
#define FIL_PAGE_TYPE_LAST FIL_PAGE_TYPE_UNKNOWN
/*!< Last page type */
/** Set in FIL_PAGE_TYPE if for full_crc32 pages in page_compressed format.
If the flag is set, then the following holds for the remaining bits
of FIL_PAGE_TYPE:
Bits 0..7 will contain the compressed page size in bytes.
Bits 8..14 are reserved and must be 0. */
#define FIL_PAGE_COMPRESS_FCRC32_MARKER 15
/* @} */
/** @return whether the page type is B-tree or R-tree index */

View File

@ -1,6 +1,6 @@
/*****************************************************************************
Copyright (c) 2015, 2018, MariaDB Corporation.
Copyright (c) 2015, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@ -88,6 +88,12 @@ fil_page_type_validate(
{
ulint page_type = mach_read_from_2(page + FIL_PAGE_TYPE);
if ((page_type & 1U << FIL_PAGE_COMPRESS_FCRC32_MARKER)
&& space->full_crc32()
&& space->is_compressed()) {
return true;
}
/* Validate page type */
if (!((page_type == FIL_PAGE_PAGE_COMPRESSED ||
page_type == FIL_PAGE_PAGE_COMPRESSED_ENCRYPTED ||

View File

@ -49,9 +49,13 @@ ulint fil_page_compress(
/** Decompress a page that may be subject to page_compressed compression.
@param[in,out] tmp_buf temporary buffer (of innodb_page_size)
@param[in,out] buf compressed page buffer
@param[in] flags talespace flags
@return size of the compressed data
@retval 0 if decompression failed
@retval srv_page_size if the page was not compressed */
ulint fil_page_decompress(byte* tmp_buf, byte* buf)
ulint fil_page_decompress(
byte* tmp_buf,
byte* buf,
ulint flags)
MY_ATTRIBUTE((nonnull, warn_unused_result));
#endif

View File

@ -386,10 +386,6 @@ in full crc32 format. */
#define FSP_FLAGS_FCRC32_GET_PAGE_SSIZE(flags) \
((flags & FSP_FLAGS_FCRC32_MASK_PAGE_SSIZE) \
>> FSP_FLAGS_FCRC32_POS_PAGE_SSIZE)
/** @return the MARKER flag in full crc32 format */
#define FSP_FLAGS_FCRC32_HAS_MARKER(flags) \
((flags & FSP_FLAGS_FCRC32_MASK_MARKER) \
>> FSP_FLAGS_FCRC32_POS_MARKER)
/** @return the COMPRESSED_ALGO flags in full crc32 format */
#define FSP_FLAGS_FCRC32_GET_COMPRESSED_ALGO(flags) \
((flags & FSP_FLAGS_FCRC32_MASK_COMPRESSED_ALGO) \

View File

@ -495,15 +495,12 @@ page_zip_calc_checksum(
ulint size,
srv_checksum_algorithm_t algo);
/**********************************************************************//**
Verify a compressed page's checksum.
@return TRUE if the stored checksum is valid according to the value of
/** Verify a compressed page's checksum.
@param[in] data compressed page
@param[in] size size of compressed page
@return whether the stored checksum is valid according to the value of
innodb_checksum_algorithm */
ibool
page_zip_verify_checksum(
/*=====================*/
const void* data, /*!< in: compressed page */
ulint size); /*!< in: size of compressed page */
bool page_zip_verify_checksum(const void* data, ulint size);
#ifndef UNIV_INNOCHECKSUM
/**********************************************************************//**

View File

@ -2,7 +2,7 @@
Copyright (c) 2005, 2016, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2012, Facebook Inc.
Copyright (c) 2014, 2018, MariaDB Corporation.
Copyright (c) 2014, 2019, MariaDB Corporation.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@ -4972,15 +4972,12 @@ page_zip_calc_checksum(
return(0);
}
/**********************************************************************//**
Verify a compressed page's checksum.
@return TRUE if the stored checksum is valid according to the value of
/** Verify a compressed page's checksum.
@param[in] data compressed page
@param[in] size size of compressed page
@return whether the stored checksum is valid according to the value of
innodb_checksum_algorithm */
ibool
page_zip_verify_checksum(
/*=====================*/
const void* data, /*!< in: compressed page */
ulint size) /*!< in: size of compressed page */
bool page_zip_verify_checksum(const void* data, ulint size)
{
const uint32_t stored = mach_read_from_4(
static_cast<const byte*>(data) + FIL_PAGE_SPACE_OR_CHKSUM);

View File

@ -2026,15 +2026,22 @@ dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
if (err != DB_SUCCESS) return err;
const bool full_crc32 = fil_space_t::full_crc32(get_space_flags());
const bool page_compressed = fil_space_t::is_compressed(get_space_flags());
if (!block->page.zip.data) {
if (full_crc32
&& block->page.encrypted && block->page.id.page_no() > 0) {
&& (block->page.encrypted || page_compressed)
&& block->page.id.page_no() > 0) {
byte* page = block->frame;
mach_write_to_8(page + FIL_PAGE_LSN, m_current_lsn);
mach_write_to_4(
page + srv_page_size - FIL_PAGE_FCRC32_END_LSN,
(ulint) m_current_lsn);
if (!page_compressed) {
mach_write_to_4(
page + (srv_page_size
- FIL_PAGE_FCRC32_END_LSN),
(ulint) m_current_lsn);
}
return err;
}
@ -3311,6 +3318,7 @@ fil_iterate(
return DB_OUT_OF_MEMORY;
}
ulint actual_space_id = 0;
const bool full_crc32 = fil_space_t::full_crc32(
callback.get_space_flags());
@ -3371,15 +3379,9 @@ fil_iterate(
byte* src = readptr + i * size;
const ulint page_no = page_get_page_no(src);
if (!page_no && block->page.id.page_no()) {
const ulint* b = reinterpret_cast<const ulint*>
(src);
const ulint* const e = b + size / sizeof *b;
do {
if (*b++) {
goto page_corrupted;
}
} while (b != e);
if (!buf_page_is_zeroes(src, size)) {
goto page_corrupted;
}
/* Proceed to the next page,
because this one is all zero. */
continue;
@ -3395,9 +3397,19 @@ page_corrupted:
goto func_exit;
}
const bool page_compressed
= fil_page_is_compressed_encrypted(src)
|| fil_page_is_compressed(src);
if (block->page.id.page_no() == 0) {
actual_space_id = mach_read_from_4(
src + FIL_PAGE_SPACE_ID);
}
const bool page_compressed =
(full_crc32
&& fil_space_t::is_compressed(
callback.get_space_flags())
&& buf_page_is_compressed(
src, callback.get_space_flags()))
|| (fil_page_is_compressed_encrypted(src)
|| fil_page_is_compressed(src));
if (page_compressed && block->page.zip.data) {
goto page_corrupted;
@ -3427,7 +3439,7 @@ not_encrypted:
}
decrypted = fil_space_decrypt(
block->page.id.space(),
actual_space_id,
iter.crypt_data, dst,
callback.physical_size(),
callback.get_space_flags(),
@ -3452,7 +3464,8 @@ not_encrypted:
to decompress it before adjusting further. */
if (page_compressed) {
ulint compress_length = fil_page_decompress(
page_compress_buf, dst);
page_compress_buf, dst,
callback.get_space_flags());
ut_ad(compress_length != srv_page_size);
if (compress_length == 0) {
goto page_corrupted;
@ -3556,6 +3569,26 @@ not_encrypted:
updated = true;
}
/* Write checksum for the compressed full crc32 page.*/
if (full_crc32 && page_compressed) {
ut_ad(updated);
byte* dest = writeptr + i * size;
ut_d(bool comp = false);
ut_d(bool corrupt = false);
ulint size = buf_page_full_crc32_size(
dest,
#ifdef UNIV_DEBUG
&comp, &corrupt
#else
NULL, NULL
#endif
);
ut_ad(!comp == (size == srv_page_size));
ut_ad(!corrupt);
mach_write_to_4(dest + (size - 4),
ut_crc32(dest, size - 4));
}
}
/* A page was updated in the set, write back to disk. */