diff --git a/storage/innobase/btr/btr0bulk.cc b/storage/innobase/btr/btr0bulk.cc index 4118c271326..96ff81c9fab 100644 --- a/storage/innobase/btr/btr0bulk.cc +++ b/storage/innobase/btr/btr0bulk.cc @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 2014, 2019, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2017, 2019, MariaDB Corporation. +Copyright (c) 2017, 2020, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -29,6 +29,7 @@ Created 03/11/2014 Shaohua Wang #include "btr0cur.h" #include "btr0pcur.h" #include "ibuf0ibuf.h" +#include "page0page.h" #include "trx0trx.h" /** Innodb B-tree index fill factor for bulk load. */ @@ -143,7 +144,6 @@ PageBulk::init() } m_block = new_block; - m_block->skip_flush_check = true; m_page = new_page; m_page_zip = new_page_zip; m_page_no = new_page_no; @@ -163,7 +163,9 @@ PageBulk::init() UNIV_PAGE_SIZE - dict_index_zip_pad_optimal_page_size(m_index); m_heap_top = page_header_get_ptr(new_page, PAGE_HEAP_TOP); m_rec_no = page_header_get_field(new_page, PAGE_N_RECS); - + ut_ad(page_header_get_field(m_page, PAGE_DIRECTION) + == PAGE_NO_DIRECTION); + page_header_set_field(m_page, NULL, PAGE_DIRECTION, 0); ut_d(m_total_data = 0); /* See page_copy_rec_list_end_to_created_page() */ ut_d(page_header_set_field(m_page, NULL, PAGE_HEAP_TOP, @@ -188,7 +190,7 @@ PageBulk::insert( #ifdef UNIV_DEBUG /* Check whether records are in order. */ - if (!page_rec_is_infimum(m_cur_rec)) { + if (!page_rec_is_infimum_low(page_offset(m_cur_rec))) { rec_t* old_rec = m_cur_rec; rec_offs* old_offsets = rec_get_offsets( old_rec, m_index, NULL, page_rec_is_leaf(old_rec), @@ -206,18 +208,21 @@ PageBulk::insert( rec_offs_make_valid(insert_rec, m_index, offsets); /* 2. Insert the record in the linked list. */ - rec_t* next_rec = page_rec_get_next(m_cur_rec); - - page_rec_set_next(insert_rec, next_rec); - page_rec_set_next(m_cur_rec, insert_rec); - /* 3. Set the n_owned field in the inserted record to zero, and set the heap_no field. */ if (m_is_comp) { + ulint next_offs = rec_get_next_offs(m_cur_rec, TRUE); + rec_set_next_offs_new(insert_rec, next_offs); + rec_set_next_offs_new(m_cur_rec, page_offset(insert_rec)); + rec_set_n_owned_new(insert_rec, NULL, 0); rec_set_heap_no_new(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no); } else { + ulint next_offs = rec_get_next_offs(m_cur_rec, FALSE); + rec_set_next_offs_old(insert_rec, next_offs); + rec_set_next_offs_old(m_cur_rec, page_offset(insert_rec)); + rec_set_n_owned_old(insert_rec, 0); rec_set_heap_no_old(insert_rec, PAGE_HEAP_NO_USER_LOW + m_rec_no); @@ -245,17 +250,54 @@ PageBulk::insert( m_cur_rec = insert_rec; } +inline bool PageBulk::needs_finish() const +{ + ut_ad(page_align(m_cur_rec) == m_block->frame); + ut_ad(m_page == m_block->frame); + ulint n_heap= page_header_get_field(m_page, PAGE_N_HEAP); + if (!n_heap || !page_header_get_field(m_page, PAGE_DIRECTION)) + return true; + ulint heap_no; + if (n_heap & 0x8000) + { + n_heap&= 0x7fff; + heap_no= rec_get_heap_no_new(m_cur_rec); + if (heap_no == PAGE_HEAP_NO_INFIMUM && + page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_NEW_SUPREMUM_END) + return false; + } + else + { + heap_no= rec_get_heap_no_old(m_cur_rec); + if (heap_no == PAGE_HEAP_NO_INFIMUM && + page_header_get_field(m_page, PAGE_HEAP_TOP) == PAGE_OLD_SUPREMUM_END) + return false; + } + return heap_no != n_heap - 1; +} + /** Mark end of insertion to the page. Scan all records to set page dirs, and set page header members. Note: we refer to page_copy_rec_list_end_to_created_page. */ void PageBulk::finish() { - ut_ad(m_rec_no > 0); + ut_ad(!dict_index_is_spatial(m_index)); + + if (!needs_finish()) { + return; + } + ut_ad(m_total_data + page_dir_calc_reserved_space(m_rec_no) <= page_get_free_space_of_empty(m_is_comp)); +#ifdef UNIV_DEBUG /* See page_copy_rec_list_end_to_created_page() */ - ut_d(page_dir_set_n_slots(m_page, NULL, srv_page_size / 2)); + if (m_rec_no) { + page_dir_set_n_slots(m_page, NULL, srv_page_size / 2); + } + mach_write_to_2(PAGE_HEADER + PAGE_HEAP_TOP + m_page, + ulint(m_heap_top - m_page)); +#endif ulint count = 0; ulint n_recs = 0; @@ -264,8 +306,7 @@ PageBulk::finish() page_dir_slot_t* slot = NULL; /* Set owner & dir. */ - do { - + while (!page_rec_is_supremum(insert_rec)) { count++; n_recs++; @@ -282,7 +323,7 @@ PageBulk::finish() } insert_rec = page_rec_get_next(insert_rec); - } while (!page_rec_is_supremum(insert_rec)); + } if (slot_index > 0 && (count + 1 + (PAGE_DIR_SLOT_MAX_N_OWNED + 1) / 2 @@ -305,9 +346,10 @@ PageBulk::finish() page_dir_slot_set_rec(slot, page_get_supremum_rec(m_page)); page_dir_slot_set_n_owned(slot, NULL, count + 1); - ut_ad(!dict_index_is_spatial(m_index)); - - if (!m_flush_observer && !m_page_zip) { + if (!m_rec_no) { + page_header_set_field(m_page, NULL, PAGE_DIRECTION, + PAGE_NO_DIRECTION); + } else if (!m_flush_observer && !m_page_zip) { mlog_write_ulint(PAGE_HEADER + PAGE_N_DIR_SLOTS + m_page, 2 + slot_index, MLOG_2BYTES, &m_mtr); mlog_write_ulint(PAGE_HEADER + PAGE_HEAP_TOP + m_page, @@ -344,26 +386,18 @@ PageBulk::finish() mach_write_to_2(PAGE_HEADER + PAGE_N_DIRECTION + m_page, 0); } - m_block->skip_flush_check = false; + ut_ad(!needs_finish()); + ut_ad(page_validate(m_page, m_index)); } /** Commit inserts done to the page @param[in] success Flag whether all inserts succeed. */ -void -PageBulk::commit( - bool success) +void PageBulk::commit(bool success) { - if (success) { - ut_ad(page_validate(m_page, m_index)); - - /* Set no free space left and no buffered changes in ibuf. */ - if (!dict_index_is_clust(m_index) && page_is_leaf(m_page)) { - ibuf_set_bitmap_for_bulk_load( - m_block, innobase_fill_factor == 100); - } - } - - m_mtr.commit(); + finish(); + if (success && !dict_index_is_clust(m_index) && page_is_leaf(m_page)) + ibuf_set_bitmap_for_bulk_load(m_block, innobase_fill_factor == 100); + m_mtr.commit(); } /** Compress a page of compressed table @@ -606,7 +640,9 @@ PageBulk::storeExt( const big_rec_t* big_rec, rec_offs* offsets) { - /* Note: not all fileds are initialized in btr_pcur. */ + finish(); + + /* Note: not all fields are initialized in btr_pcur. */ btr_pcur_t btr_pcur; btr_pcur.pos_state = BTR_PCUR_IS_POSITIONED; btr_pcur.latch_mode = BTR_MODIFY_LEAF; @@ -635,7 +671,7 @@ Note: log_free_check requires holding no lock/latch in current thread. */ void PageBulk::release() { - ut_ad(!dict_index_is_spatial(m_index)); + finish(); /* We fix the block because we will re-pin it soon. */ buf_block_buf_fix_inc(m_block, __FILE__, __LINE__); @@ -693,12 +729,11 @@ BtrBulk::pageSplit( { ut_ad(page_bulk->getPageZip() != NULL); - /* 1. Check if we have only one user record on the page. */ if (page_bulk->getRecNo() <= 1) { return(DB_TOO_BIG_RECORD); } - /* 2. create a new page. */ + /* Initialize a new page */ PageBulk new_page_bulk(m_index, m_trx->id, FIL_NULL, page_bulk->getLevel(), m_flush_observer); dberr_t err = new_page_bulk.init(); @@ -706,19 +741,18 @@ BtrBulk::pageSplit( return(err); } - /* 3. copy the upper half to new page. */ + /* Copy the upper half to the new page. */ rec_t* split_rec = page_bulk->getSplitRec(); new_page_bulk.copyIn(split_rec); page_bulk->copyOut(split_rec); - /* 4. commit the splitted page. */ + /* Commit the pages after split. */ err = pageCommit(page_bulk, &new_page_bulk, true); if (err != DB_SUCCESS) { pageAbort(&new_page_bulk); return(err); } - /* 5. commit the new page. */ err = pageCommit(&new_page_bulk, next_page_bulk, true); if (err != DB_SUCCESS) { pageAbort(&new_page_bulk); @@ -944,11 +978,9 @@ BtrBulk::insert( ut_ad(page_bulk->getLevel() == 0); ut_ad(page_bulk == m_page_bulks.at(0)); - /* Release all latched but leaf node. */ + /* Release all pages above the leaf level */ for (ulint level = 1; level <= m_root_level; level++) { - PageBulk* page_bulk = m_page_bulks.at(level); - - page_bulk->release(); + m_page_bulks.at(level)->release(); } err = page_bulk->storeExt(big_rec, offsets); @@ -1037,6 +1069,7 @@ BtrBulk::finish(dberr_t err) return(err); } root_page_bulk.copyIn(first_rec); + root_page_bulk.finish(); /* Remove last page. */ btr_page_free(m_index, last_block, &mtr); diff --git a/storage/innobase/buf/buf0buf.cc b/storage/innobase/buf/buf0buf.cc index 8bd0fa0a885..2456b9aeb5f 100644 --- a/storage/innobase/buf/buf0buf.cc +++ b/storage/innobase/buf/buf0buf.cc @@ -1508,8 +1508,6 @@ buf_block_init( #ifdef BTR_CUR_HASH_ADAPT block->index = NULL; #endif /* BTR_CUR_HASH_ADAPT */ - block->skip_flush_check = false; - ut_d(block->page.in_page_hash = FALSE); ut_d(block->page.in_zip_hash = FALSE); ut_d(block->page.in_flush_list = FALSE); @@ -3822,7 +3820,6 @@ buf_block_init_low( /*===============*/ buf_block_t* block) /*!< in: block to init */ { - block->skip_flush_check = false; #ifdef BTR_CUR_HASH_ADAPT /* No adaptive hash index entries may point to a previously unused (and now freshly allocated) block. */ diff --git a/storage/innobase/buf/buf0dblwr.cc b/storage/innobase/buf/buf0dblwr.cc index 3ff44129cec..7148a6fbfd0 100644 --- a/storage/innobase/buf/buf0dblwr.cc +++ b/storage/innobase/buf/buf0dblwr.cc @@ -852,10 +852,6 @@ buf_dblwr_check_block( { ut_ad(buf_block_get_state(block) == BUF_BLOCK_FILE_PAGE); - if (block->skip_flush_check) { - return; - } - switch (fil_page_get_type(block->frame)) { case FIL_PAGE_INDEX: case FIL_PAGE_RTREE: diff --git a/storage/innobase/buf/buf0lru.cc b/storage/innobase/buf/buf0lru.cc index 45dd2f2312e..ff38a744166 100644 --- a/storage/innobase/buf/buf0lru.cc +++ b/storage/innobase/buf/buf0lru.cc @@ -954,7 +954,6 @@ loop: ut_ad(buf_pool_from_block(block) == buf_pool); memset(&block->page.zip, 0, sizeof block->page.zip); - block->skip_flush_check = false; block->page.flush_observer = NULL; return(block); } diff --git a/storage/innobase/include/btr0bulk.h b/storage/innobase/include/btr0bulk.h index 4c21d929848..854414d504d 100644 --- a/storage/innobase/include/btr0bulk.h +++ b/storage/innobase/include/btr0bulk.h @@ -1,7 +1,7 @@ /***************************************************************************** Copyright (c) 2014, 2015, Oracle and/or its affiliates. All Rights Reserved. -Copyright (c) 2019, MariaDB Corporation. +Copyright (c) 2019, 2020, MariaDB Corporation. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -110,6 +110,9 @@ public: dirs, and set page header members. */ void finish(); + /** @return whether finish() actually needs to do something */ + inline bool needs_finish() const; + /** Commit mtr for a page @param[in] success Flag whether all inserts succeed. */ void commit(bool success); diff --git a/storage/innobase/include/buf0buf.h b/storage/innobase/include/buf0buf.h index fd02279e2b0..5f390d2761d 100644 --- a/storage/innobase/include/buf0buf.h +++ b/storage/innobase/include/buf0buf.h @@ -1768,9 +1768,6 @@ struct buf_block_t{ # define assert_block_ahi_empty_on_init(block) /* nothing */ # define assert_block_ahi_valid(block) /* nothing */ #endif /* BTR_CUR_HASH_ADAPT */ - bool skip_flush_check; - /*!< Skip check in buf_dblwr_check_block - during bulk load, protected by lock.*/ # ifdef UNIV_DEBUG /** @name Debug fields */ /* @{ */