Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb

into  whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-build
This commit is contained in:
tomas@whalegate.ndb.mysql.com 2007-06-06 16:51:04 +02:00
commit 016cf69867
24 changed files with 428 additions and 148 deletions

View File

@ -13,6 +13,7 @@ TimeBetweenGlobalCheckpoints= 500
NoOfFragmentLogFiles= 4 NoOfFragmentLogFiles= 4
FragmentLogFileSize=12M FragmentLogFileSize=12M
DiskPageBufferMemory= CHOOSE_DiskPageBufferMemory DiskPageBufferMemory= CHOOSE_DiskPageBufferMemory
ODirect= 1
# the following parametes just function as a small regression # the following parametes just function as a small regression
# test that the parameter exists # test that the parameter exists
InitialNoOfOpenFiles= 27 InitialNoOfOpenFiles= 27

View File

@ -82,6 +82,8 @@
#define CFG_DB_BACKUP_WRITE_SIZE 136 #define CFG_DB_BACKUP_WRITE_SIZE 136
#define CFG_DB_BACKUP_MAX_WRITE_SIZE 139 #define CFG_DB_BACKUP_MAX_WRITE_SIZE 139
#define CFG_DB_WATCHDOG_INTERVAL_INITIAL 141
#define CFG_LOG_DESTINATION 147 #define CFG_LOG_DESTINATION 147
#define CFG_DB_DISCLESS 148 #define CFG_DB_DISCLESS 148
@ -114,6 +116,8 @@
#define CFG_DB_MEMREPORT_FREQUENCY 166 #define CFG_DB_MEMREPORT_FREQUENCY 166
#define CFG_DB_O_DIRECT 168
#define CFG_DB_SGA 198 /* super pool mem */ #define CFG_DB_SGA 198 /* super pool mem */
#define CFG_DB_DATA_MEM_2 199 /* used in special build in 5.1 */ #define CFG_DB_DATA_MEM_2 199 /* used in special build in 5.1 */

View File

@ -144,4 +144,6 @@ extern "C" {
#define MAX(x,y) (((x)>(y))?(x):(y)) #define MAX(x,y) (((x)>(y))?(x):(y))
#endif #endif
#define NDB_O_DIRECT_WRITE_ALIGNMENT 512
#endif #endif

View File

@ -37,9 +37,6 @@ NDB_TICKS NdbTick_CurrentMillisecond(void);
*/ */
int NdbTick_CurrentMicrosecond(NDB_TICKS * secs, Uint32 * micros); int NdbTick_CurrentMicrosecond(NDB_TICKS * secs, Uint32 * micros);
/*#define TIME_MEASUREMENT*/
#ifdef TIME_MEASUREMENT
struct MicroSecondTimer { struct MicroSecondTimer {
NDB_TICKS seconds; NDB_TICKS seconds;
NDB_TICKS micro_seconds; NDB_TICKS micro_seconds;
@ -54,7 +51,6 @@ struct MicroSecondTimer {
NDB_TICKS NdbTick_getMicrosPassed(struct MicroSecondTimer start, NDB_TICKS NdbTick_getMicrosPassed(struct MicroSecondTimer start,
struct MicroSecondTimer stop); struct MicroSecondTimer stop);
int NdbTick_getMicroTimer(struct MicroSecondTimer* time_now); int NdbTick_getMicroTimer(struct MicroSecondTimer* time_now);
#endif
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -15,7 +15,7 @@
#include <ndb_global.h> #include <ndb_global.h>
#include "NdbTick.h" #include <NdbTick.h>
#define NANOSEC_PER_SEC 1000000000 #define NANOSEC_PER_SEC 1000000000
#define MICROSEC_PER_SEC 1000000 #define MICROSEC_PER_SEC 1000000
@ -71,7 +71,6 @@ NdbTick_CurrentMicrosecond(NDB_TICKS * secs, Uint32 * micros){
} }
#endif #endif
#ifdef TIME_MEASUREMENT
int int
NdbTick_getMicroTimer(struct MicroSecondTimer* input_timer) NdbTick_getMicroTimer(struct MicroSecondTimer* input_timer)
{ {
@ -102,4 +101,3 @@ NdbTick_getMicrosPassed(struct MicroSecondTimer start,
} }
return ret_value; return ret_value;
} }
#endif

View File

@ -2771,6 +2771,8 @@ Backup::openFiles(Signal* signal, BackupRecordPtr ptr)
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr); c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
filePtr.p->m_flags |= BackupFile::BF_OPENING; filePtr.p->m_flags |= BackupFile::BF_OPENING;
if (c_defaults.m_o_direct)
req->fileFlags |= FsOpenReq::OM_DIRECT;
req->userPointer = filePtr.i; req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 2); FsOpenReq::setVersion(req->fileNumber, 2);
FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA); FsOpenReq::setSuffix(req->fileNumber, FsOpenReq::S_DATA);
@ -3745,12 +3747,31 @@ Backup::OperationRecord::newFragment(Uint32 tableId, Uint32 fragNo)
} }
bool bool
Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo) Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo, bool fill_record)
{ {
Uint32 * tmp; Uint32 * tmp;
const Uint32 footSz = sizeof(BackupFormat::DataFile::FragmentFooter) >> 2; const Uint32 footSz = sizeof(BackupFormat::DataFile::FragmentFooter) >> 2;
Uint32 sz = footSz + 1;
if(dataBuffer.getWritePtr(&tmp, footSz + 1)) { if (fill_record)
{
Uint32 * new_tmp;
if (!dataBuffer.getWritePtr(&tmp, sz))
return false;
new_tmp = tmp + sz;
if ((UintPtr)new_tmp & (sizeof(Page32)-1))
{
/* padding is needed to get full write */
new_tmp += 2 /* to fit empty header minimum 2 words*/;
new_tmp = (Uint32 *)(((UintPtr)new_tmp + sizeof(Page32)-1) &
~(UintPtr)(sizeof(Page32)-1));
/* new write sz */
sz = new_tmp - tmp;
}
}
if(dataBuffer.getWritePtr(&tmp, sz)) {
jam(); jam();
* tmp = 0; // Finish record stream * tmp = 0; // Finish record stream
tmp++; tmp++;
@ -3762,7 +3783,17 @@ Backup::OperationRecord::fragComplete(Uint32 tableId, Uint32 fragNo)
foot->FragmentNo = htonl(fragNo); foot->FragmentNo = htonl(fragNo);
foot->NoOfRecords = htonl(noOfRecords); foot->NoOfRecords = htonl(noOfRecords);
foot->Checksum = htonl(0); foot->Checksum = htonl(0);
dataBuffer.updateWritePtr(footSz + 1);
if (sz != footSz + 1)
{
tmp += footSz;
memset(tmp, 0, (sz - footSz - 1) * 4);
*tmp = htonl(BackupFormat::EMPTY_ENTRY);
tmp++;
*tmp = htonl(sz - footSz - 1);
}
dataBuffer.updateWritePtr(sz);
return true; return true;
}//if }//if
return false; return false;
@ -3864,8 +3895,13 @@ Backup::fragmentCompleted(Signal* signal, BackupFilePtr filePtr)
return; return;
}//if }//if
BackupRecordPtr ptr LINT_SET_PTR;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
OperationRecord & op = filePtr.p->operation; OperationRecord & op = filePtr.p->operation;
if(!op.fragComplete(filePtr.p->tableId, filePtr.p->fragmentNo)) { if(!op.fragComplete(filePtr.p->tableId, filePtr.p->fragmentNo,
c_defaults.m_o_direct))
{
jam(); jam();
signal->theData[0] = BackupContinueB::BUFFER_FULL_FRAG_COMPLETE; signal->theData[0] = BackupContinueB::BUFFER_FULL_FRAG_COMPLETE;
signal->theData[1] = filePtr.i; signal->theData[1] = filePtr.i;
@ -3875,9 +3911,6 @@ Backup::fragmentCompleted(Signal* signal, BackupFilePtr filePtr)
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD; filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD;
BackupRecordPtr ptr LINT_SET_PTR;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
if (ptr.p->is_lcp()) if (ptr.p->is_lcp())
{ {
ptr.p->slaveState.setState(STOPPING); ptr.p->slaveState.setState(STOPPING);
@ -4914,6 +4947,8 @@ Backup::lcp_open_file(Signal* signal, BackupRecordPtr ptr)
FsOpenReq::OM_CREATE | FsOpenReq::OM_CREATE |
FsOpenReq::OM_APPEND | FsOpenReq::OM_APPEND |
FsOpenReq::OM_AUTOSYNC; FsOpenReq::OM_AUTOSYNC;
if (c_defaults.m_o_direct)
req->fileFlags |= FsOpenReq::OM_DIRECT;
FsOpenReq::v2_setCount(req->fileNumber, 0xFFFFFFFF); FsOpenReq::v2_setCount(req->fileNumber, 0xFFFFFFFF);
req->auto_sync_size = c_defaults.m_disk_synch_size; req->auto_sync_size = c_defaults.m_disk_synch_size;

View File

@ -240,7 +240,7 @@ public:
* Once per fragment * Once per fragment
*/ */
bool newFragment(Uint32 tableId, Uint32 fragNo); bool newFragment(Uint32 tableId, Uint32 fragNo);
bool fragComplete(Uint32 tableId, Uint32 fragNo); bool fragComplete(Uint32 tableId, Uint32 fragNo, bool fill_record);
/** /**
* Once per scan frag (next) req/conf * Once per scan frag (next) req/conf
@ -534,6 +534,7 @@ public:
Uint32 m_disk_write_speed; Uint32 m_disk_write_speed;
Uint32 m_disk_synch_size; Uint32 m_disk_synch_size;
Uint32 m_diskless; Uint32 m_diskless;
Uint32 m_o_direct;
}; };
/** /**

View File

@ -32,7 +32,8 @@ struct BackupFormat {
TABLE_LIST = 4, TABLE_LIST = 4,
TABLE_DESCRIPTION = 5, TABLE_DESCRIPTION = 5,
GCP_ENTRY = 6, GCP_ENTRY = 6,
FRAGMENT_INFO = 7 FRAGMENT_INFO = 7,
EMPTY_ENTRY = 8
}; };
struct FileHeader { struct FileHeader {
@ -93,6 +94,13 @@ struct BackupFormat {
Uint32 NoOfRecords; Uint32 NoOfRecords;
Uint32 Checksum; Uint32 Checksum;
}; };
/* optional padding for O_DIRECT */
struct EmptyEntry {
Uint32 SectionType;
Uint32 SectionLength;
/* not used data */
};
}; };
/** /**

View File

@ -148,10 +148,13 @@ Backup::execREAD_CONFIG_REQ(Signal* signal)
c_defaults.m_disk_write_speed = 10 * (1024 * 1024); c_defaults.m_disk_write_speed = 10 * (1024 * 1024);
c_defaults.m_disk_write_speed_sr = 100 * (1024 * 1024); c_defaults.m_disk_write_speed_sr = 100 * (1024 * 1024);
c_defaults.m_disk_synch_size = 4 * (1024 * 1024); c_defaults.m_disk_synch_size = 4 * (1024 * 1024);
c_defaults.m_o_direct = true;
Uint32 noBackups = 0, noTables = 0, noAttribs = 0, noFrags = 0; Uint32 noBackups = 0, noTables = 0, noAttribs = 0, noFrags = 0;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS,
&c_defaults.m_diskless)); &c_defaults.m_diskless));
ndb_mgm_get_int_parameter(p, CFG_DB_O_DIRECT,
&c_defaults.m_o_direct);
ndb_mgm_get_int_parameter(p, CFG_DB_CHECKPOINT_SPEED_SR, ndb_mgm_get_int_parameter(p, CFG_DB_CHECKPOINT_SPEED_SR,
&c_defaults.m_disk_write_speed_sr); &c_defaults.m_disk_write_speed_sr);
ndb_mgm_get_int_parameter(p, CFG_DB_CHECKPOINT_SPEED, ndb_mgm_get_int_parameter(p, CFG_DB_CHECKPOINT_SPEED,
@ -204,7 +207,7 @@ Backup::execREAD_CONFIG_REQ(Signal* signal)
/ sizeof(Page32); / sizeof(Page32);
// We need to allocate an additional of 2 pages. 1 page because of a bug in // We need to allocate an additional of 2 pages. 1 page because of a bug in
// ArrayPool and another one for DICTTAINFO. // ArrayPool and another one for DICTTAINFO.
c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2); c_pagePool.setSize(noPages + NO_OF_PAGES_META_FILE + 2, true);
{ // Init all tables { // Init all tables
SLList<Table> tables(c_tablePool); SLList<Table> tables(c_tablePool);

View File

@ -270,8 +270,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){
* ptr = &Tp[Tr]; * ptr = &Tp[Tr];
DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> %d", DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d",
Tr, Tw, Ts, Tm, sz1, * sz)); Tr, Tmw, Ts, Tm, sz1, * sz));
return true; return true;
} }
@ -279,8 +279,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){
if(!m_eof){ if(!m_eof){
* _eof = false; * _eof = false;
DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> false", DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> false",
Tr, Tw, Ts, Tm, sz1)); Tr, Tmw, Ts, Tm, sz1));
return false; return false;
} }
@ -289,8 +289,8 @@ FsBuffer::getReadPtr(Uint32 ** ptr, Uint32 * sz, bool * _eof){
* _eof = true; * _eof = true;
* ptr = &Tp[Tr]; * ptr = &Tp[Tr];
DEBUG(ndbout_c("getReadPtr() Tr: %d Tw: %d Ts: %d Tm: %d sz1: %d -> %d eof", DEBUG(ndbout_c("getReadPtr() Tr: %d Tmw: %d Ts: %d Tm: %d sz1: %d -> %d eof",
Tr, Tw, Ts, Tm, sz1, * sz)); Tr, Tmw, Ts, Tm, sz1, * sz));
return false; return false;
} }
@ -316,13 +316,13 @@ FsBuffer::getWritePtr(Uint32 ** ptr, Uint32 sz){
if(sz1 > sz){ // Note at least 1 word of slack if(sz1 > sz){ // Note at least 1 word of slack
* ptr = &Tp[Tw]; * ptr = &Tp[Tw];
DEBUG(ndbout_c("getWritePtr(%d) Tr: %d Tw: %d Ts: %d sz1: %d -> true", DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> true",
sz, Tr, Tw, Ts, sz1)); sz, Tw, sz1));
return true; return true;
} }
DEBUG(ndbout_c("getWritePtr(%d) Tr: %d Tw: %d Ts: %d sz1: %d -> false", DEBUG(ndbout_c("getWritePtr(%d) Tw: %d sz1: %d -> false",
sz, Tr, Tw, Ts, sz1)); sz, Tw, sz1));
return false; return false;
} }
@ -339,11 +339,15 @@ FsBuffer::updateWritePtr(Uint32 sz){
m_free -= sz; m_free -= sz;
if(Tnew < Ts){ if(Tnew < Ts){
m_writeIndex = Tnew; m_writeIndex = Tnew;
DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d",
sz, m_writeIndex));
return; return;
} }
memcpy(Tp, &Tp[Ts], (Tnew - Ts) << 2); memcpy(Tp, &Tp[Ts], (Tnew - Ts) << 2);
m_writeIndex = Tnew - Ts; m_writeIndex = Tnew - Ts;
DEBUG(ndbout_c("updateWritePtr(%d) m_writeIndex: %d",
sz, m_writeIndex));
} }
inline inline

View File

@ -114,9 +114,6 @@ class Dbtup;
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* VARIOUS CONSTANTS USED AS FLAGS TO THE FILE MANAGER. */ /* VARIOUS CONSTANTS USED AS FLAGS TO THE FILE MANAGER. */
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
#define ZOPEN_READ 0
#define ZOPEN_WRITE 1
#define ZOPEN_READ_WRITE 2
#define ZVAR_NO_LOG_PAGE_WORD 1 #define ZVAR_NO_LOG_PAGE_WORD 1
#define ZLIST_OF_PAIRS 0 #define ZLIST_OF_PAIRS 0
#define ZLIST_OF_PAIRS_SYNCH 16 #define ZLIST_OF_PAIRS_SYNCH 16
@ -2687,6 +2684,7 @@ private:
UintR clfoFileSize; UintR clfoFileSize;
LogPageRecord *logPageRecord; LogPageRecord *logPageRecord;
void *logPageRecordUnaligned;
LogPageRecordPtr logPagePtr; LogPageRecordPtr logPagePtr;
UintR cfirstfreeLogPage; UintR cfirstfreeLogPage;
UintR clogPageFileSize; UintR clogPageFileSize;
@ -2890,6 +2888,7 @@ private:
UintR ctransidHash[1024]; UintR ctransidHash[1024];
Uint32 c_diskless; Uint32 c_diskless;
Uint32 c_o_direct;
Uint32 c_error_insert_table_id; Uint32 c_error_insert_table_id;
public: public:

View File

@ -49,6 +49,7 @@ void Dblqh::initData()
logFileRecord = 0; logFileRecord = 0;
logFileOperationRecord = 0; logFileOperationRecord = 0;
logPageRecord = 0; logPageRecord = 0;
logPageRecordUnaligned= 0;
pageRefRecord = 0; pageRefRecord = 0;
tablerec = 0; tablerec = 0;
tcConnectionrec = 0; tcConnectionrec = 0;
@ -107,10 +108,13 @@ void Dblqh::initRecords()
sizeof(LogFileOperationRecord), sizeof(LogFileOperationRecord),
clfoFileSize); clfoFileSize);
logPageRecord = (LogPageRecord*)allocRecord("LogPageRecord", logPageRecord =
sizeof(LogPageRecord), (LogPageRecord*)allocRecordAligned("LogPageRecord",
clogPageFileSize, sizeof(LogPageRecord),
false); clogPageFileSize,
&logPageRecordUnaligned,
NDB_O_DIRECT_WRITE_ALIGNMENT,
false);
pageRefRecord = (PageRefRecord*)allocRecord("PageRefRecord", pageRefRecord = (PageRefRecord*)allocRecord("PageRefRecord",
sizeof(PageRefRecord), sizeof(PageRefRecord),
@ -380,7 +384,7 @@ Dblqh::~Dblqh()
sizeof(LogFileOperationRecord), sizeof(LogFileOperationRecord),
clfoFileSize); clfoFileSize);
deallocRecord((void**)&logPageRecord, deallocRecord((void**)&logPageRecordUnaligned,
"LogPageRecord", "LogPageRecord",
sizeof(LogPageRecord), sizeof(LogPageRecord),
clogPageFileSize); clogPageFileSize);

View File

@ -1036,6 +1036,8 @@ void Dblqh::execREAD_CONFIG_REQ(Signal* signal)
cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN; cmaxAccOps = cscanrecFileSize * MAX_PARALLEL_OP_PER_SCAN;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &c_diskless)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_DB_DISCLESS, &c_diskless));
c_o_direct = true;
ndb_mgm_get_int_parameter(p, CFG_DB_O_DIRECT, &c_o_direct);
Uint32 tmp= 0; Uint32 tmp= 0;
ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &tmp)); ndbrequire(!ndb_mgm_get_int_parameter(p, CFG_LQH_FRAG, &tmp));
@ -13334,7 +13336,9 @@ void Dblqh::openFileRw(Signal* signal, LogFileRecordPtr olfLogFilePtr)
signal->theData[3] = olfLogFilePtr.p->fileName[1]; signal->theData[3] = olfLogFilePtr.p->fileName[1];
signal->theData[4] = olfLogFilePtr.p->fileName[2]; signal->theData[4] = olfLogFilePtr.p->fileName[2];
signal->theData[5] = olfLogFilePtr.p->fileName[3]; signal->theData[5] = olfLogFilePtr.p->fileName[3];
signal->theData[6] = ZOPEN_READ_WRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE; signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord); req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
Uint64 sz = clogFileSize; Uint64 sz = clogFileSize;
sz *= 1024; sz *= 1024; sz *= 1024; sz *= 1024;
@ -13358,7 +13362,9 @@ void Dblqh::openLogfileInit(Signal* signal)
signal->theData[3] = logFilePtr.p->fileName[1]; signal->theData[3] = logFilePtr.p->fileName[1];
signal->theData[4] = logFilePtr.p->fileName[2]; signal->theData[4] = logFilePtr.p->fileName[2];
signal->theData[5] = logFilePtr.p->fileName[3]; signal->theData[5] = logFilePtr.p->fileName[3];
signal->theData[6] = 0x302 | FsOpenReq::OM_AUTOSYNC; signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE | FsOpenReq::OM_AUTOSYNC;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord); req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA); sendSignal(NDBFS_REF, GSN_FSOPENREQ, signal, FsOpenReq::SignalLength, JBA);
}//Dblqh::openLogfileInit() }//Dblqh::openLogfileInit()
@ -13394,7 +13400,9 @@ void Dblqh::openNextLogfile(Signal* signal)
signal->theData[3] = onlLogFilePtr.p->fileName[1]; signal->theData[3] = onlLogFilePtr.p->fileName[1];
signal->theData[4] = onlLogFilePtr.p->fileName[2]; signal->theData[4] = onlLogFilePtr.p->fileName[2];
signal->theData[5] = onlLogFilePtr.p->fileName[3]; signal->theData[5] = onlLogFilePtr.p->fileName[3];
signal->theData[6] = 2 | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE; signal->theData[6] = FsOpenReq::OM_READWRITE | FsOpenReq::OM_AUTOSYNC | FsOpenReq::OM_CHECK_SIZE;
if (c_o_direct)
signal->theData[6] |= FsOpenReq::OM_DIRECT;
req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord); req->auto_sync_size = MAX_REDO_PAGES_WITHOUT_SYNCH * sizeof(LogPageRecord);
Uint64 sz = clogFileSize; Uint64 sz = clogFileSize;
sz *= 1024; sz *= 1024; sz *= 1024; sz *= 1024;

View File

@ -277,6 +277,14 @@ void Ndbcntr::execSTTOR(Signal* signal)
break; break;
case ZSTART_PHASE_1: case ZSTART_PHASE_1:
jam(); jam();
{
Uint32 db_watchdog_interval = 0;
const ndb_mgm_configuration_iterator * p =
m_ctx.m_config.getOwnConfigIterator();
ndb_mgm_get_int_parameter(p, CFG_DB_WATCHDOG_INTERVAL, &db_watchdog_interval);
ndbrequire(db_watchdog_interval);
update_watch_dog_timer(db_watchdog_interval);
}
startPhase1Lab(signal); startPhase1Lab(signal);
break; break;
case ZSTART_PHASE_2: case ZSTART_PHASE_2:

View File

@ -163,7 +163,12 @@ AsyncFile::run()
theStartFlag = true; theStartFlag = true;
// Create write buffer for bigger writes // Create write buffer for bigger writes
theWriteBufferSize = WRITEBUFFERSIZE; theWriteBufferSize = WRITEBUFFERSIZE;
theWriteBuffer = (char *) ndbd_malloc(theWriteBufferSize); theWriteBufferUnaligned = (char *) ndbd_malloc(theWriteBufferSize +
NDB_O_DIRECT_WRITE_ALIGNMENT-1);
theWriteBuffer = (char *)
(((UintPtr)theWriteBufferUnaligned + NDB_O_DIRECT_WRITE_ALIGNMENT - 1) &
~(UintPtr)(NDB_O_DIRECT_WRITE_ALIGNMENT - 1));
NdbMutex_Unlock(theStartMutexPtr); NdbMutex_Unlock(theStartMutexPtr);
NdbCondition_Signal(theStartConditionPtr); NdbCondition_Signal(theStartConditionPtr);
@ -247,6 +252,78 @@ AsyncFile::run()
static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1]; static char g_odirect_readbuf[2*GLOBAL_PAGE_SIZE -1];
#endif #endif
int
AsyncFile::check_odirect_write(Uint32 flags, int& new_flags, int mode)
{
assert(new_flags & (O_CREAT | O_TRUNC));
#ifdef O_DIRECT
int ret;
char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
while (((ret = ::write(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
(errno == EINTR));
if (ret == -1)
{
new_flags &= ~O_DIRECT;
ndbout_c("%s Failed to write using O_DIRECT, disabling",
theFileName.c_str());
}
close(theFd);
theFd = ::open(theFileName.c_str(), new_flags, mode);
if (theFd == -1)
return errno;
#endif
return 0;
}
int
AsyncFile::check_odirect_read(Uint32 flags, int &new_flags, int mode)
{
#ifdef O_DIRECT
int ret;
char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1));
while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) &&
(errno == EINTR));
if (ret == -1)
{
ndbout_c("%s Failed to read using O_DIRECT, disabling",
theFileName.c_str());
goto reopen;
}
if(lseek(theFd, 0, SEEK_SET) != 0)
{
return errno;
}
if ((flags & FsOpenReq::OM_CHECK_SIZE) == 0)
{
struct stat buf;
if ((fstat(theFd, &buf) == -1))
{
return errno;
}
else if ((buf.st_size % GLOBAL_PAGE_SIZE) != 0)
{
ndbout_c("%s filesize not a multiple of %d, disabling O_DIRECT",
theFileName.c_str(), GLOBAL_PAGE_SIZE);
goto reopen;
}
}
return 0;
reopen:
close(theFd);
new_flags &= ~O_DIRECT;
theFd = ::open(theFileName.c_str(), new_flags, mode);
if (theFd == -1)
return errno;
#endif
return 0;
}
void AsyncFile::openReq(Request* request) void AsyncFile::openReq(Request* request)
{ {
m_auto_sync_freq = 0; m_auto_sync_freq = 0;
@ -312,7 +389,7 @@ void AsyncFile::openReq(Request* request)
} }
#else #else
Uint32 flags = request->par.open.flags; Uint32 flags = request->par.open.flags;
Uint32 new_flags = 0; int new_flags = 0;
// Convert file open flags from Solaris to Liux // Convert file open flags from Solaris to Liux
if (flags & FsOpenReq::OM_CREATE) if (flags & FsOpenReq::OM_CREATE)
@ -343,10 +420,6 @@ void AsyncFile::openReq(Request* request)
{ {
new_flags |= O_DIRECT; new_flags |= O_DIRECT;
} }
#elif defined O_SYNC
{
flags |= FsOpenReq::OM_SYNC;
}
#endif #endif
if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT)) if ((flags & FsOpenReq::OM_SYNC) && ! (flags & FsOpenReq::OM_INIT))
@ -355,15 +428,19 @@ void AsyncFile::openReq(Request* request)
new_flags |= O_SYNC; new_flags |= O_SYNC;
#endif #endif
} }
const char * rw = "";
switch(flags & 0x3){ switch(flags & 0x3){
case FsOpenReq::OM_READONLY: case FsOpenReq::OM_READONLY:
rw = "r";
new_flags |= O_RDONLY; new_flags |= O_RDONLY;
break; break;
case FsOpenReq::OM_WRITEONLY: case FsOpenReq::OM_WRITEONLY:
rw = "w";
new_flags |= O_WRONLY; new_flags |= O_WRONLY;
break; break;
case FsOpenReq::OM_READWRITE: case FsOpenReq::OM_READWRITE:
rw = "rw";
new_flags |= O_RDWR; new_flags |= O_RDWR;
break; break;
default: default:
@ -404,11 +481,6 @@ no_odirect:
if (new_flags & O_DIRECT) if (new_flags & O_DIRECT)
{ {
new_flags &= ~O_DIRECT; new_flags &= ~O_DIRECT;
flags |= FsOpenReq::OM_SYNC;
#ifdef O_SYNC
if (! (flags & FsOpenReq::OM_INIT))
new_flags |= O_SYNC;
#endif
goto no_odirect; goto no_odirect;
} }
#endif #endif
@ -421,11 +493,6 @@ no_odirect:
else if (new_flags & O_DIRECT) else if (new_flags & O_DIRECT)
{ {
new_flags &= ~O_DIRECT; new_flags &= ~O_DIRECT;
flags |= FsOpenReq::OM_SYNC;
#ifdef O_SYNC
if (! (flags & FsOpenReq::OM_INIT))
new_flags |= O_SYNC;
#endif
goto no_odirect; goto no_odirect;
} }
#endif #endif
@ -512,7 +579,6 @@ no_odirect:
{ {
ndbout_c("error on first write(%d), disable O_DIRECT", err); ndbout_c("error on first write(%d), disable O_DIRECT", err);
new_flags &= ~O_DIRECT; new_flags &= ~O_DIRECT;
flags |= FsOpenReq::OM_SYNC;
close(theFd); close(theFd);
theFd = ::open(theFileName.c_str(), new_flags, mode); theFd = ::open(theFileName.c_str(), new_flags, mode);
if (theFd != -1) if (theFd != -1)
@ -532,26 +598,32 @@ no_odirect:
else if (flags & FsOpenReq::OM_DIRECT) else if (flags & FsOpenReq::OM_DIRECT)
{ {
#ifdef O_DIRECT #ifdef O_DIRECT
do { if (flags & (FsOpenReq::OM_TRUNCATE | FsOpenReq::OM_CREATE))
int ret; {
char * bufptr = (char*)((UintPtr(g_odirect_readbuf)+(GLOBAL_PAGE_SIZE - 1)) & ~(GLOBAL_PAGE_SIZE - 1)); request->error = check_odirect_write(flags, new_flags, mode);
while (((ret = ::read(theFd, bufptr, GLOBAL_PAGE_SIZE)) == -1) && (errno == EINTR)); }
if (ret == -1) else
{ {
ndbout_c("%s Failed to read using O_DIRECT, disabling", theFileName.c_str()); request->error = check_odirect_read(flags, new_flags, mode);
flags |= FsOpenReq::OM_SYNC; }
flags |= FsOpenReq::OM_INIT;
break; if (request->error)
} return;
if(lseek(theFd, 0, SEEK_SET) != 0)
{
request->error = errno;
return;
}
} while (0);
#endif #endif
} }
#ifdef VM_TRACE
if (flags & FsOpenReq::OM_DIRECT)
{
#ifdef O_DIRECT
ndbout_c("%s %s O_DIRECT: %d",
theFileName.c_str(), rw,
!!(new_flags & O_DIRECT));
#else
ndbout_c("%s %s O_DIRECT: 0",
theFileName.c_str(), rw);
#endif
}
#endif
if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT)) if ((flags & FsOpenReq::OM_SYNC) && (flags & FsOpenReq::OM_INIT))
{ {
#ifdef O_SYNC #ifdef O_SYNC
@ -562,6 +634,10 @@ no_odirect:
new_flags &= ~(O_CREAT | O_TRUNC); new_flags &= ~(O_CREAT | O_TRUNC);
new_flags |= O_SYNC; new_flags |= O_SYNC;
theFd = ::open(theFileName.c_str(), new_flags, mode); theFd = ::open(theFileName.c_str(), new_flags, mode);
if (theFd == -1)
{
request->error = errno;
}
#endif #endif
} }
#endif #endif
@ -1079,7 +1155,8 @@ AsyncFile::rmrfReq(Request * request, char * path, bool removePath){
void AsyncFile::endReq() void AsyncFile::endReq()
{ {
// Thread is ended with return // Thread is ended with return
if (theWriteBuffer) ndbd_free(theWriteBuffer, theWriteBufferSize); if (theWriteBufferUnaligned)
ndbd_free(theWriteBufferUnaligned, theWriteBufferSize);
} }

View File

@ -234,9 +234,13 @@ private:
bool theStartFlag; bool theStartFlag;
int theWriteBufferSize; int theWriteBufferSize;
char* theWriteBuffer; char* theWriteBuffer;
void* theWriteBufferUnaligned;
size_t m_write_wo_sync; // Writes wo/ sync size_t m_write_wo_sync; // Writes wo/ sync
size_t m_auto_sync_freq; // Auto sync freq in bytes size_t m_auto_sync_freq; // Auto sync freq in bytes
int check_odirect_read(Uint32 flags, int&new_flags, int mode);
int check_odirect_write(Uint32 flags, int&new_flags, int mode);
public: public:
SimulatedBlock& m_fs; SimulatedBlock& m_fs;
Ptr<GlobalPage> m_page_ptr; Ptr<GlobalPage> m_page_ptr;

View File

@ -557,6 +557,9 @@ Restore::restore_next(Signal* signal, FilePtr file_ptr)
case BackupFormat::GCP_ENTRY: case BackupFormat::GCP_ENTRY:
parse_gcp_entry(signal, file_ptr, data, len); parse_gcp_entry(signal, file_ptr, data, len);
break; break;
case BackupFormat::EMPTY_ENTRY:
// skip
break;
case 0x4e444242: // 'NDBB' case 0x4e444242: // 'NDBB'
if (check_file_version(signal, ntohl(* (data+2))) == 0) if (check_file_version(signal, ntohl(* (data+2))) == 0)
{ {

View File

@ -443,6 +443,11 @@ Configuration::setupConfiguration(){
"TimeBetweenWatchDogCheck missing"); "TimeBetweenWatchDogCheck missing");
} }
if(iter.get(CFG_DB_WATCHDOG_INTERVAL_INITIAL, &_timeBetweenWatchDogCheckInitial)){
ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, "Invalid configuration fetched",
"TimeBetweenWatchDogCheckInitial missing");
}
/** /**
* Get paths * Get paths
*/ */
@ -462,9 +467,12 @@ Configuration::setupConfiguration(){
* Create the watch dog thread * Create the watch dog thread
*/ */
{ {
Uint32 t = _timeBetweenWatchDogCheck; if (_timeBetweenWatchDogCheckInitial < _timeBetweenWatchDogCheck)
_timeBetweenWatchDogCheckInitial = _timeBetweenWatchDogCheck;
Uint32 t = _timeBetweenWatchDogCheckInitial;
t = globalEmulatorData.theWatchDog ->setCheckInterval(t); t = globalEmulatorData.theWatchDog ->setCheckInterval(t);
_timeBetweenWatchDogCheck = t; _timeBetweenWatchDogCheckInitial = t;
} }
ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config); ConfigValues* cf = ConfigValuesFactory::extractCurrentSection(iter.m_config);

View File

@ -84,6 +84,7 @@ private:
Uint32 _maxErrorLogs; Uint32 _maxErrorLogs;
Uint32 _lockPagesInMainMemory; Uint32 _lockPagesInMainMemory;
Uint32 _timeBetweenWatchDogCheck; Uint32 _timeBetweenWatchDogCheck;
Uint32 _timeBetweenWatchDogCheckInitial;
ndb_mgm_configuration * m_ownConfig; ndb_mgm_configuration * m_ownConfig;
ndb_mgm_configuration * m_clusterConfig; ndb_mgm_configuration * m_clusterConfig;

View File

@ -19,6 +19,7 @@
#include <NdbOut.hpp> #include <NdbOut.hpp>
#include <GlobalData.hpp> #include <GlobalData.hpp>
#include <Emulator.hpp> #include <Emulator.hpp>
#include <WatchDog.hpp>
#include <ErrorHandlingMacros.hpp> #include <ErrorHandlingMacros.hpp>
#include <TimeQueue.hpp> #include <TimeQueue.hpp>
#include <TransporterRegistry.hpp> #include <TransporterRegistry.hpp>
@ -38,6 +39,9 @@
#include <AttributeDescriptor.hpp> #include <AttributeDescriptor.hpp>
#include <NdbSqlUtil.hpp> #include <NdbSqlUtil.hpp>
#include <EventLogger.hpp>
extern EventLogger g_eventLogger;
#define ljamEntry() jamEntryLine(30000 + __LINE__) #define ljamEntry() jamEntryLine(30000 + __LINE__)
#define ljam() jamLine(30000 + __LINE__) #define ljam() jamLine(30000 + __LINE__)
@ -655,14 +659,20 @@ SimulatedBlock::getBatSize(Uint16 blockNo){
return sb->theBATSize; return sb->theBATSize;
} }
void* SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId)
{
return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
}
void* void*
SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, Uint32 paramId) SimulatedBlock::allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align, bool clear, Uint32 paramId)
{ {
void * p = NULL; void * p = NULL;
size_t size = n*s; Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s); size_t size = n*s + over_alloc;
refresh_watch_dog(); Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s) + over_alloc;
refresh_watch_dog(9);
if (real_size > 0){ if (real_size > 0){
#ifdef VM_TRACE_MEM #ifdef VM_TRACE_MEM
ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes", ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes",
@ -696,14 +706,24 @@ SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear, U
char * ptr = (char*)p; char * ptr = (char*)p;
const Uint32 chunk = 128 * 1024; const Uint32 chunk = 128 * 1024;
while(size > chunk){ while(size > chunk){
refresh_watch_dog(); refresh_watch_dog(9);
memset(ptr, 0, chunk); memset(ptr, 0, chunk);
ptr += chunk; ptr += chunk;
size -= chunk; size -= chunk;
} }
refresh_watch_dog(); refresh_watch_dog(9);
memset(ptr, 0, size); memset(ptr, 0, size);
} }
if (unaligned_buffer)
{
*unaligned_buffer = p;
p = (void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
#ifdef VM_TRACE
g_eventLogger.info("'%s' (%u) %llu %llu, alignment correction %u bytes",
type, align, (Uint64)p, (Uint64)p+n*s,
(Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
#endif
}
} }
return p; return p;
} }
@ -720,9 +740,16 @@ SimulatedBlock::deallocRecord(void ** ptr,
} }
void void
SimulatedBlock::refresh_watch_dog() SimulatedBlock::refresh_watch_dog(Uint32 place)
{ {
globalData.incrementWatchDogCounter(1); globalData.incrementWatchDogCounter(place);
}
void
SimulatedBlock::update_watch_dog_timer(Uint32 interval)
{
extern EmulatorData globalEmulatorData;
globalEmulatorData.theWatchDog->setCheckInterval(interval);
} }
void void

View File

@ -334,7 +334,8 @@ protected:
* Refresh Watch Dog in initialising code * Refresh Watch Dog in initialising code
* *
*/ */
void refresh_watch_dog(); void refresh_watch_dog(Uint32 place = 1);
void update_watch_dog_timer(Uint32 interval);
/** /**
* Prog error * Prog error
@ -377,6 +378,7 @@ protected:
* *
*/ */
void* allocRecord(const char * type, size_t s, size_t n, bool clear = true, Uint32 paramId = 0); void* allocRecord(const char * type, size_t s, size_t n, bool clear = true, Uint32 paramId = 0);
void* allocRecordAligned(const char * type, size_t s, size_t n, void **unaligned_buffer, Uint32 align = NDB_O_DIRECT_WRITE_ALIGNMENT, bool clear = true, Uint32 paramId = 0);
/** /**
* Deallocate record * Deallocate record

View File

@ -25,6 +25,8 @@
#include <ErrorHandlingMacros.hpp> #include <ErrorHandlingMacros.hpp>
#include <EventLogger.hpp> #include <EventLogger.hpp>
#include <NdbTick.h>
extern EventLogger g_eventLogger; extern EventLogger g_eventLogger;
extern "C" extern "C"
@ -72,73 +74,115 @@ WatchDog::doStop(){
} }
} }
const char *get_action(Uint32 IPValue)
{
const char *action;
switch (IPValue) {
case 1:
action = "Job Handling";
break;
case 2:
action = "Scanning Timers";
break;
case 3:
action = "External I/O";
break;
case 4:
action = "Print Job Buffers at crash";
break;
case 5:
action = "Checking connections";
break;
case 6:
action = "Performing Send";
break;
case 7:
action = "Polling for Receive";
break;
case 8:
action = "Performing Receive";
break;
case 9:
action = "Allocating memory";
break;
default:
action = "Unknown place";
break;
}//switch
return action;
}
void void
WatchDog::run(){ WatchDog::run()
unsigned int anIPValue; {
unsigned int alerts = 0; unsigned int anIPValue, sleep_time;
unsigned int oldIPValue = 0; unsigned int oldIPValue = 0;
unsigned int theIntervalCheck = theInterval;
struct MicroSecondTimer start_time, last_time, now;
NdbTick_getMicroTimer(&start_time);
last_time = start_time;
// WatchDog for the single threaded NDB // WatchDog for the single threaded NDB
while(!theStop){ while (!theStop)
Uint32 tmp = theInterval / 500; {
tmp= (tmp ? tmp : 1); sleep_time= 100;
while(!theStop && tmp > 0){ NdbSleep_MilliSleep(sleep_time);
NdbSleep_MilliSleep(500);
tmp--;
}
if(theStop) if(theStop)
break; break;
NdbTick_getMicroTimer(&now);
if (NdbTick_getMicrosPassed(last_time, now)/1000 > sleep_time*2)
{
struct tms my_tms;
times(&my_tms);
g_eventLogger.info("Watchdog: User time: %llu System time: %llu",
(Uint64)my_tms.tms_utime,
(Uint64)my_tms.tms_stime);
g_eventLogger.warning("Watchdog: Warning overslept %u ms, expected %u ms.",
NdbTick_getMicrosPassed(last_time, now)/1000,
sleep_time);
}
last_time = now;
// Verify that the IP thread is not stuck in a loop // Verify that the IP thread is not stuck in a loop
anIPValue = *theIPValue; anIPValue = *theIPValue;
if(anIPValue != 0) { if (anIPValue != 0)
{
oldIPValue = anIPValue; oldIPValue = anIPValue;
globalData.incrementWatchDogCounter(0); globalData.incrementWatchDogCounter(0);
alerts = 0; NdbTick_getMicroTimer(&start_time);
} else { theIntervalCheck = theInterval;
const char *last_stuck_action; }
alerts++; else
switch (oldIPValue) { {
case 1: int warn = 1;
last_stuck_action = "Job Handling"; Uint32 elapsed = NdbTick_getMicrosPassed(start_time, now)/1000;
break; /*
case 2: oldIPValue == 9 indicates malloc going on, this can take some time
last_stuck_action = "Scanning Timers"; so only warn if we pass the watchdog interval
break; */
case 3: if (oldIPValue == 9)
last_stuck_action = "External I/O"; if (elapsed < theIntervalCheck)
break; warn = 0;
case 4: else
last_stuck_action = "Print Job Buffers at crash"; theIntervalCheck += theInterval;
break;
case 5: if (warn)
last_stuck_action = "Checking connections";
break;
case 6:
last_stuck_action = "Performing Send";
break;
case 7:
last_stuck_action = "Polling for Receive";
break;
case 8:
last_stuck_action = "Performing Receive";
break;
default:
last_stuck_action = "Unknown place";
break;
}//switch
g_eventLogger.warning("Ndb kernel is stuck in: %s", last_stuck_action);
{ {
struct tms my_tms; const char *last_stuck_action = get_action(oldIPValue);
times(&my_tms); g_eventLogger.warning("Ndb kernel is stuck in: %s", last_stuck_action);
g_eventLogger.info("User time: %llu System time: %llu", {
(Uint64)my_tms.tms_utime, struct tms my_tms;
(Uint64)my_tms.tms_stime); times(&my_tms);
} g_eventLogger.info("Watchdog: User time: %llu System time: %llu",
if(alerts == 3){ (Uint64)my_tms.tms_utime,
shutdownSystem(last_stuck_action); (Uint64)my_tms.tms_stime);
}
if (elapsed > 3 * theInterval)
{
shutdownSystem(last_stuck_action);
}
} }
} }
} }

View File

@ -579,6 +579,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"70", "70",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
{
CFG_DB_WATCHDOG_INTERVAL_INITIAL,
"TimeBetweenWatchDogCheckInitial",
DB_TOKEN,
"Time between execution checks inside a database node in the early start phases when memory is allocated",
ConfigInfo::CI_USED,
true,
ConfigInfo::CI_INT,
"6000",
"70",
STR_VALUE(MAX_INT_RNIL) },
{ {
CFG_DB_STOP_ON_ERROR, CFG_DB_STOP_ON_ERROR,
"StopOnError", "StopOnError",
@ -1321,6 +1333,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
{
CFG_DB_O_DIRECT,
"ODirect",
DB_TOKEN,
"Use O_DIRECT file write/read when possible",
ConfigInfo::CI_USED,
true,
ConfigInfo::CI_BOOL,
"false",
"false",
"true"},
/*************************************************************************** /***************************************************************************
* API * API
***************************************************************************/ ***************************************************************************/

View File

@ -873,13 +873,32 @@ bool RestoreDataIterator::readFragmentHeader(int & ret, Uint32 *fragmentId)
debug << "RestoreDataIterator::getNextFragment" << endl; debug << "RestoreDataIterator::getNextFragment" << endl;
if (buffer_read(&Header, sizeof(Header), 1) != 1){ while (1)
{
/* read first part of header */
if (buffer_read(&Header, 8, 1) != 1)
{
ret = 0;
return false;
} // if
/* skip if EMPTY_ENTRY */
Header.SectionType = ntohl(Header.SectionType);
Header.SectionLength = ntohl(Header.SectionLength);
if (Header.SectionType == BackupFormat::EMPTY_ENTRY)
{
void *tmp;
buffer_get_ptr(&tmp, Header.SectionLength*4-8, 1);
continue;
}
break;
}
/* read rest of header */
if (buffer_read(((char*)&Header)+8, sizeof(Header)-8, 1) != 1)
{
ret = 0; ret = 0;
return false; return false;
} // if }
Header.SectionType = ntohl(Header.SectionType);
Header.SectionLength = ntohl(Header.SectionLength);
Header.TableId = ntohl(Header.TableId); Header.TableId = ntohl(Header.TableId);
Header.FragmentNo = ntohl(Header.FragmentNo); Header.FragmentNo = ntohl(Header.FragmentNo);
Header.ChecksumType = ntohl(Header.ChecksumType); Header.ChecksumType = ntohl(Header.ChecksumType);