Merge jonas@perch:src/51-jonas

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.1-new-ndb
This commit is contained in:
tomas@poseidon.ndb.mysql.com 2006-06-12 13:46:10 +02:00
commit faa2f09107
26 changed files with 2796 additions and 1577 deletions

View File

@ -197,7 +197,7 @@ AC_DEFUN([MYSQL_SETUP_NDBCLUSTER], [
MAKE_BINARY_DISTRIBUTION_OPTIONS="$MAKE_BINARY_DISTRIBUTION_OPTIONS --with-ndbcluster" MAKE_BINARY_DISTRIBUTION_OPTIONS="$MAKE_BINARY_DISTRIBUTION_OPTIONS --with-ndbcluster"
# CXXFLAGS="$CXXFLAGS \$(NDB_CXXFLAGS)" CXXFLAGS="$CXXFLAGS \$(NDB_CXXFLAGS)"
if test "$have_ndb_debug" = "default" if test "$have_ndb_debug" = "default"
then then
have_ndb_debug=$with_debug have_ndb_debug=$with_debug

View File

@ -67,6 +67,9 @@ private:
static Uint32 getNRScanFlag(const Uint32 & requestInfo); static Uint32 getNRScanFlag(const Uint32 & requestInfo);
static void setNRScanFlag(Uint32 & requestInfo, Uint32 nr); static void setNRScanFlag(Uint32 & requestInfo, Uint32 nr);
static Uint32 getLcpScanFlag(const Uint32 & requestInfo);
static void setLcpScanFlag(Uint32 & requestInfo, Uint32 nr);
}; };
/** /**
@ -77,6 +80,7 @@ private:
* z = Descending (TUX) - 1 Bit 6 * z = Descending (TUX) - 1 Bit 6
* d = No disk scan - 1 Bit 7 * d = No disk scan - 1 Bit 7
* n = Node recovery scan - 1 Bit 8 * n = Node recovery scan - 1 Bit 8
* c = LCP scan - 1 Bit 9
* *
* 1111111111222222222233 * 1111111111222222222233
* 01234567890123456789012345678901 * 01234567890123456789012345678901
@ -88,6 +92,7 @@ private:
#define AS_DESCENDING_SHIFT (6) #define AS_DESCENDING_SHIFT (6)
#define AS_NO_DISK_SCAN (7) #define AS_NO_DISK_SCAN (7)
#define AS_NR_SCAN (8) #define AS_NR_SCAN (8)
#define AS_LCP_SCAN (9)
inline inline
Uint32 Uint32
@ -154,6 +159,19 @@ AccScanReq::setNRScanFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << AS_NR_SCAN); requestInfo |= (val << AS_NR_SCAN);
} }
inline
Uint32
AccScanReq::getLcpScanFlag(const Uint32 & requestInfo){
return (requestInfo >> AS_LCP_SCAN) & 1;
}
inline
void
AccScanReq::setLcpScanFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "AccScanReq::setNoDiskScanFlag");
requestInfo |= (val << AS_LCP_SCAN);
}
class AccScanConf { class AccScanConf {
/** /**
* Sender(s) * Sender(s)

View File

@ -141,7 +141,7 @@ public:
TuxSetLogFlags = 12002, TuxSetLogFlags = 12002,
TuxMetaDataJunk = 12009, TuxMetaDataJunk = 12009,
DumpTsman = 9000, DumpTsman = 9002,
DumpLgman = 10000, DumpLgman = 10000,
DumpPgman = 11000 DumpPgman = 11000
}; };

View File

@ -61,7 +61,8 @@ public:
static Uint32 getAttrLen(const Uint32 & requestInfo); static Uint32 getAttrLen(const Uint32 & requestInfo);
static Uint32 getScanPrio(const Uint32 & requestInfo); static Uint32 getScanPrio(const Uint32 & requestInfo);
static Uint32 getNoDiskFlag(const Uint32 & requestInfo); static Uint32 getNoDiskFlag(const Uint32 & requestInfo);
static Uint32 getLcpScanFlag(const Uint32 & requestInfo);
static void setLockMode(Uint32 & requestInfo, Uint32 lockMode); static void setLockMode(Uint32 & requestInfo, Uint32 lockMode);
static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock); static void setHoldLockFlag(Uint32 & requestInfo, Uint32 holdLock);
static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo); static void setKeyinfoFlag(Uint32 & requestInfo, Uint32 keyinfo);
@ -72,6 +73,7 @@ public:
static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen); static void setAttrLen(Uint32 & requestInfo, Uint32 attrLen);
static void setScanPrio(Uint32& requestInfo, Uint32 prio); static void setScanPrio(Uint32& requestInfo, Uint32 prio);
static void setNoDiskFlag(Uint32& requestInfo, Uint32 val); static void setNoDiskFlag(Uint32& requestInfo, Uint32 val);
static void setLcpScanFlag(Uint32 & requestInfo, Uint32 val);
}; };
class KeyInfo20 { class KeyInfo20 {
@ -198,6 +200,7 @@ public:
* Request Info * Request Info
* *
* a = Length of attrinfo - 16 Bits (16-31) * a = Length of attrinfo - 16 Bits (16-31)
* c = LCP scan - 1 Bit 3
* d = No disk - 1 Bit 4 * d = No disk - 1 Bit 4
* l = Lock Mode - 1 Bit 5 * l = Lock Mode - 1 Bit 5
* h = Hold lock - 1 Bit 7 * h = Hold lock - 1 Bit 7
@ -205,7 +208,7 @@ public:
* r = read committed - 1 Bit 9 * r = read committed - 1 Bit 9
* x = range scan - 1 Bit 6 * x = range scan - 1 Bit 6
* z = descending - 1 Bit 10 * z = descending - 1 Bit 10
* t = tup scan -1 Bit 11 (implies x=z=0) * t = tup scan - 1 Bit 11 (implies x=z=0)
* p = Scan prio - 4 Bits (12-15) -> max 15 * p = Scan prio - 4 Bits (12-15) -> max 15
* *
* 1111111111222222222233 * 1111111111222222222233
@ -222,6 +225,7 @@ public:
#define SF_RANGE_SCAN_SHIFT (6) #define SF_RANGE_SCAN_SHIFT (6)
#define SF_DESCENDING_SHIFT (10) #define SF_DESCENDING_SHIFT (10)
#define SF_TUP_SCAN_SHIFT (11) #define SF_TUP_SCAN_SHIFT (11)
#define SF_LCP_SCAN_SHIFT (3)
#define SF_ATTR_LEN_SHIFT (16) #define SF_ATTR_LEN_SHIFT (16)
#define SF_ATTR_LEN_MASK (65535) #define SF_ATTR_LEN_MASK (65535)
@ -359,6 +363,19 @@ ScanFragReq::setNoDiskFlag(UintR & requestInfo, UintR val){
requestInfo |= (val << SF_NO_DISK_SHIFT); requestInfo |= (val << SF_NO_DISK_SHIFT);
} }
inline
Uint32
ScanFragReq::getLcpScanFlag(const Uint32 & requestInfo){
return (requestInfo >> SF_LCP_SCAN_SHIFT) & 1;
}
inline
void
ScanFragReq::setLcpScanFlag(UintR & requestInfo, UintR val){
ASSERT_BOOL(val, "ScanFragReq::setLcpScanFlag");
requestInfo |= (val << SF_LCP_SCAN_SHIFT);
}
inline inline
Uint32 Uint32
KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){ KeyInfo20::setScanInfo(Uint32 opNo, Uint32 scanNo){

View File

@ -2985,22 +2985,25 @@ Backup::parseTableDescription(Signal* signal,
if (disk) if (disk)
{ {
/** /**
* Remove all disk attributes, but add DISK_REF (8 bytes) * Remove all disk attributes
*/ */
tabPtr.p->noOfAttributes -= (disk - 1); tabPtr.p->noOfAttributes -= disk;
AttributePtr attrPtr; {
ndbrequire(tabPtr.p->attributes.seize(attrPtr)); AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr));
Uint32 sz32 = 2;
attrPtr.p->data.attrId = AttributeHeader::DISK_REF; Uint32 sz32 = 2;
attrPtr.p->data.m_flags = Attribute::COL_FIXED; attrPtr.p->data.attrId = AttributeHeader::DISK_REF;
attrPtr.p->data.sz32 = 2; attrPtr.p->data.m_flags = Attribute::COL_FIXED;
attrPtr.p->data.sz32 = 2;
attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32; attrPtr.p->data.offset = tabPtr.p->sz_FixedAttributes;
tabPtr.p->sz_FixedAttributes += sz32;
tabPtr.p->noOfAttributes ++;
}
} }
{ {
AttributePtr attrPtr; AttributePtr attrPtr;
ndbrequire(tabPtr.p->attributes.seize(attrPtr)); ndbrequire(tabPtr.p->attributes.seize(attrPtr));
@ -3309,6 +3312,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
ScanFragReq::setScanPrio(req->requestInfo, 1); ScanFragReq::setScanPrio(req->requestInfo, 1);
ScanFragReq::setTupScanFlag(req->requestInfo, 1); ScanFragReq::setTupScanFlag(req->requestInfo, 1);
ScanFragReq::setNoDiskFlag(req->requestInfo, 1); ScanFragReq::setNoDiskFlag(req->requestInfo, 1);
ScanFragReq::setLcpScanFlag(req->requestInfo, 1);
} }
req->transId1 = 0; req->transId1 = 0;
req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8); req->transId2 = (BACKUP << 20) + (getOwnNodeId() << 8);

View File

@ -17,14 +17,13 @@
#ifndef DBACC_H #ifndef DBACC_H
#define DBACC_H #define DBACC_H
#ifdef VM_TRACE
#define ACC_SAFE_QUEUE
#endif
#include <pc.hpp> #include <pc.hpp>
#include <SimulatedBlock.hpp> #include <SimulatedBlock.hpp>
// primary key is stored in TUP
#include "../dbtup/Dbtup.hpp"
#ifdef DBACC_C #ifdef DBACC_C
// Debug Macros // Debug Macros
#define dbgWord32(ptr, ind, val) #define dbgWord32(ptr, ind, val)
@ -135,7 +134,10 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
#define ZRIGHT 2 #define ZRIGHT 2
#define ZROOTFRAGMENTSIZE 32 #define ZROOTFRAGMENTSIZE 32
#define ZSCAN_LOCK_ALL 3 #define ZSCAN_LOCK_ALL 3
#define ZSCAN_OP 5 /**
* Check kernel_types for other operation types
*/
#define ZSCAN_OP 6
#define ZSCAN_REC_SIZE 256 #define ZSCAN_REC_SIZE 256
#define ZSTAND_BY 2 #define ZSTAND_BY 2
#define ZTABLESIZE 16 #define ZTABLESIZE 16
@ -358,7 +360,6 @@ struct Fragmentrec {
// List of lock owners and list of lock waiters to support LCP handling // List of lock owners and list of lock waiters to support LCP handling
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
Uint32 lockOwnersList; Uint32 lockOwnersList;
Uint32 m_current_sequence_no;
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// References to Directory Ranges (which in turn references directories, which // References to Directory Ranges (which in turn references directories, which
@ -478,7 +479,7 @@ struct Fragmentrec {
/* OPERATIONREC */ /* OPERATIONREC */
/* --------------------------------------------------------------------------------- */ /* --------------------------------------------------------------------------------- */
struct Operationrec { struct Operationrec {
Uint32 keydata[8]; Uint32 m_op_bits;
Uint32 localdata[2]; Uint32 localdata[2];
Uint32 elementIsforward; Uint32 elementIsforward;
Uint32 elementPage; Uint32 elementPage;
@ -487,42 +488,61 @@ struct Operationrec {
Uint32 fragptr; Uint32 fragptr;
Uint32 hashvaluePart; Uint32 hashvaluePart;
Uint32 hashValue; Uint32 hashValue;
Uint32 insertDeleteLen;
Uint32 keyinfoPage;
Uint32 nextLockOwnerOp; Uint32 nextLockOwnerOp;
Uint32 nextOp; Uint32 nextOp;
Uint32 nextParallelQue; Uint32 nextParallelQue;
Uint32 nextQueOp; union {
Uint32 nextSerialQue; Uint32 nextSerialQue;
Uint32 m_lock_owner_ptr_i; // if nextParallelQue = RNIL, else undefined
};
Uint32 prevOp; Uint32 prevOp;
Uint32 prevLockOwnerOp; Uint32 prevLockOwnerOp;
Uint32 prevParallelQue; union {
Uint32 prevQueOp; Uint32 prevParallelQue;
Uint32 prevSerialQue; Uint32 m_lo_last_parallel_op_ptr_i;
};
union {
Uint32 prevSerialQue;
Uint32 m_lo_last_serial_op_ptr_i;
};
Uint32 scanRecPtr; Uint32 scanRecPtr;
Uint32 transId1; Uint32 transId1;
Uint32 transId2; Uint32 transId2;
Uint32 longPagePtr;
Uint32 longKeyPageIndex;
Uint32 m_sequence_no;
State opState;
Uint32 userptr; Uint32 userptr;
State transactionstate;
Uint16 elementContainer; Uint16 elementContainer;
Uint16 tupkeylen; Uint16 tupkeylen;
Uint32 xfrmtupkeylen; Uint32 xfrmtupkeylen;
Uint32 userblockref; Uint32 userblockref;
Uint32 scanBits; Uint32 scanBits;
Uint8 elementIsDisappeared;
Uint8 insertIsDone; enum OpBits {
Uint8 lockMode; OP_MASK = 0x0000F // 4 bits for operation type
Uint8 lockOwner; ,OP_LOCK_MODE = 0x00010 // 0 - shared lock, 1 = exclusive lock
Uint8 nodeType; ,OP_ACC_LOCK_MODE = 0x00020 // Or:de lock mode of all operation
Uint8 operation; // before me
Uint8 opSimple; ,OP_LOCK_OWNER = 0x00040
Uint8 dirtyRead; ,OP_RUN_QUEUE = 0x00080 // In parallell queue of lock owner
Uint8 commitDeleteCheckFlag; ,OP_DIRTY_READ = 0x00100
Uint8 isAccLockReq; ,OP_LOCK_REQ = 0x00200 // isAccLockReq
,OP_COMMIT_DELETE_CHECK = 0x00400
,OP_INSERT_IS_DONE = 0x00800
,OP_ELEMENT_DISAPPEARED = 0x01000
,OP_STATE_MASK = 0xF0000
,OP_STATE_IDLE = 0xF0000
,OP_STATE_WAITING = 0x00000
,OP_STATE_RUNNING = 0x10000
,OP_STATE_EXECUTED = 0x30000
,OP_EXECUTED_DIRTY_READ = 0x3050F
,OP_INITIAL = ~(Uint32)0
};
bool is_same_trans(const Operationrec* op) const {
return
transId1 == op->transId1 && transId2 == op->transId2;
}
}; /* p2c: size = 168 bytes */ }; /* p2c: size = 168 bytes */
typedef Ptr<Operationrec> OperationrecPtr; typedef Ptr<Operationrec> OperationrecPtr;
@ -585,7 +605,6 @@ struct ScanRec {
Uint32 scanUserblockref; Uint32 scanUserblockref;
Uint32 scanMask; Uint32 scanMask;
Uint8 scanLockMode; Uint8 scanLockMode;
Uint8 scanKeyinfoFlag;
Uint8 scanTimer; Uint8 scanTimer;
Uint8 scanContinuebCounter; Uint8 scanContinuebCounter;
Uint8 scanReadCommittedFlag; Uint8 scanReadCommittedFlag;
@ -610,7 +629,8 @@ public:
virtual ~Dbacc(); virtual ~Dbacc();
// pointer to TUP instance in this thread // pointer to TUP instance in this thread
Dbtup* c_tup; class Dbtup* c_tup;
class Dblqh* c_lqh;
void execACCMINUPDATE(Signal* signal); void execACCMINUPDATE(Signal* signal);
@ -648,7 +668,8 @@ private:
void ACCKEY_error(Uint32 fromWhere); void ACCKEY_error(Uint32 fromWhere);
void commitDeleteCheck(); void commitDeleteCheck();
void report_dealloc(Signal* signal, const Operationrec* opPtrP);
typedef void * RootfragmentrecPtr; typedef void * RootfragmentrecPtr;
void initRootFragPageZero(FragmentrecPtr, Page8Ptr); void initRootFragPageZero(FragmentrecPtr, Page8Ptr);
void initFragAdd(Signal*, FragmentrecPtr); void initFragAdd(Signal*, FragmentrecPtr);
@ -687,14 +708,30 @@ private:
bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId); bool addfragtotab(Signal* signal, Uint32 rootIndex, Uint32 fragId);
void initOpRec(Signal* signal); void initOpRec(Signal* signal);
void sendAcckeyconf(Signal* signal); void sendAcckeyconf(Signal* signal);
Uint32 placeReadInLockQueue(Signal* signal);
void placeSerialQueueRead(Signal* signal);
void checkOnlyReadEntry(Signal* signal);
Uint32 getNoParallelTransaction(const Operationrec*); Uint32 getNoParallelTransaction(const Operationrec*);
void moveLastParallelQueue(Signal* signal);
void moveLastParallelQueueWrite(Signal* signal); #ifdef VM_TRACE
Uint32 placeWriteInLockQueue(Signal* signal); Uint32 getNoParallelTransactionFull(const Operationrec*);
void placeSerialQueueWrite(Signal* signal); #endif
#ifdef ACC_SAFE_QUEUE
bool validate_lock_queue(OperationrecPtr opPtr);
Uint32 get_parallel_head(OperationrecPtr opPtr);
void dump_lock_queue(OperationrecPtr loPtr);
#else
bool validate_lock_queue(OperationrecPtr) { return true;}
#endif
public:
void execACCKEY_ORD(Signal* signal, Uint32 opPtrI);
void startNext(Signal* signal, OperationrecPtr lastOp);
private:
Uint32 placeReadInLockQueue(OperationrecPtr lockOwnerPtr);
Uint32 placeWriteInLockQueue(OperationrecPtr lockOwnerPtr);
void placeSerialQueue(OperationrecPtr lockOwner, OperationrecPtr op);
void abortSerieQueueOperation(Signal* signal, OperationrecPtr op);
void abortParallelQueueOperation(Signal* signal, OperationrecPtr op);
void expandcontainer(Signal* signal); void expandcontainer(Signal* signal);
void shrinkcontainer(Signal* signal); void shrinkcontainer(Signal* signal);
void nextcontainerinfoExp(Signal* signal); void nextcontainerinfoExp(Signal* signal);
@ -724,8 +761,8 @@ private:
void increaselistcont(Signal* signal); void increaselistcont(Signal* signal);
void seizeLeftlist(Signal* signal); void seizeLeftlist(Signal* signal);
void seizeRightlist(Signal* signal); void seizeRightlist(Signal* signal);
Uint32 readTablePk(Uint32 localkey1); Uint32 readTablePk(Uint32 localkey1, Uint32 eh, const Operationrec*);
void getElement(Signal* signal); Uint32 getElement(Signal* signal, OperationrecPtr& lockOwner);
void getdirindex(Signal* signal); void getdirindex(Signal* signal);
void commitdelete(Signal* signal); void commitdelete(Signal* signal);
void deleteElement(Signal* signal); void deleteElement(Signal* signal);
@ -734,12 +771,17 @@ private:
void releaseRightlist(Signal* signal); void releaseRightlist(Signal* signal);
void checkoverfreelist(Signal* signal); void checkoverfreelist(Signal* signal);
void abortOperation(Signal* signal); void abortOperation(Signal* signal);
void accAbortReqLab(Signal* signal);
void commitOperation(Signal* signal); void commitOperation(Signal* signal);
void copyOpInfo(Signal* signal); void copyOpInfo(OperationrecPtr dst, OperationrecPtr src);
Uint32 executeNextOperation(Signal* signal); Uint32 executeNextOperation(Signal* signal);
void releaselock(Signal* signal); void releaselock(Signal* signal);
void release_lockowner(Signal* signal, OperationrecPtr, bool commit);
void startNew(Signal* signal, OperationrecPtr newOwner);
void abortWaitingOperation(Signal*, OperationrecPtr);
void abortExecutedOperation(Signal*, OperationrecPtr);
void takeOutFragWaitQue(Signal* signal); void takeOutFragWaitQue(Signal* signal);
void check_lock_upgrade(Signal* signal, OperationrecPtr release_op, bool lo);
void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner, void check_lock_upgrade(Signal* signal, OperationrecPtr lock_owner,
OperationrecPtr release_op); OperationrecPtr release_op);
void allocOverflowPage(Signal* signal); void allocOverflowPage(Signal* signal);
@ -788,8 +830,8 @@ private:
void senddatapagesLab(Signal* signal); void senddatapagesLab(Signal* signal);
void sttorrysignalLab(Signal* signal); void sttorrysignalLab(Signal* signal);
void sendholdconfsignalLab(Signal* signal); void sendholdconfsignalLab(Signal* signal);
void accIsLockedLab(Signal* signal); void accIsLockedLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void insertExistElemLab(Signal* signal); void insertExistElemLab(Signal* signal, OperationrecPtr lockOwnerPtr);
void refaccConnectLab(Signal* signal); void refaccConnectLab(Signal* signal);
void releaseScanLab(Signal* signal); void releaseScanLab(Signal* signal);
void ndbrestart1Lab(Signal* signal); void ndbrestart1Lab(Signal* signal);
@ -848,8 +890,6 @@ private:
Operationrec *operationrec; Operationrec *operationrec;
OperationrecPtr operationRecPtr; OperationrecPtr operationRecPtr;
OperationrecPtr idrOperationRecPtr; OperationrecPtr idrOperationRecPtr;
OperationrecPtr copyInOperPtr;
OperationrecPtr copyOperPtr;
OperationrecPtr mlpqOperPtr; OperationrecPtr mlpqOperPtr;
OperationrecPtr queOperPtr; OperationrecPtr queOperPtr;
OperationrecPtr readWriteOpPtr; OperationrecPtr readWriteOpPtr;
@ -893,8 +933,6 @@ private:
Page8Ptr lcnPageptr; Page8Ptr lcnPageptr;
Page8Ptr lcnCopyPageptr; Page8Ptr lcnCopyPageptr;
Page8Ptr lupPageptr; Page8Ptr lupPageptr;
Page8Ptr priPageptr;
Page8Ptr pwiPageptr;
Page8Ptr ciPageidptr; Page8Ptr ciPageidptr;
Page8Ptr gsePageidptr; Page8Ptr gsePageidptr;
Page8Ptr isoPageptr; Page8Ptr isoPageptr;
@ -934,8 +972,6 @@ private:
Tabrec *tabrec; Tabrec *tabrec;
TabrecPtr tabptr; TabrecPtr tabptr;
Uint32 ctablesize; Uint32 ctablesize;
Uint32 tpwiElementptr;
Uint32 tpriElementptr;
Uint32 tgseElementptr; Uint32 tgseElementptr;
Uint32 tgseContainerptr; Uint32 tgseContainerptr;
Uint32 trlHead; Uint32 trlHead;
@ -969,8 +1005,6 @@ private:
Uint32 tdelForward; Uint32 tdelForward;
Uint32 tiopPageId; Uint32 tiopPageId;
Uint32 tipPageId; Uint32 tipPageId;
Uint32 tgeLocked;
Uint32 tgeResult;
Uint32 tgeContainerptr; Uint32 tgeContainerptr;
Uint32 tgeElementptr; Uint32 tgeElementptr;
Uint32 tgeForward; Uint32 tgeForward;

View File

@ -130,8 +130,6 @@ Dbacc::Dbacc(Block_context& ctx):
&fragrecptr, &fragrecptr,
&operationRecPtr, &operationRecPtr,
&idrOperationRecPtr, &idrOperationRecPtr,
&copyInOperPtr,
&copyOperPtr,
&mlpqOperPtr, &mlpqOperPtr,
&queOperPtr, &queOperPtr,
&readWriteOpPtr, &readWriteOpPtr,
@ -161,8 +159,6 @@ Dbacc::Dbacc(Block_context& ctx):
&lcnPageptr, &lcnPageptr,
&lcnCopyPageptr, &lcnCopyPageptr,
&lupPageptr, &lupPageptr,
&priPageptr,
&pwiPageptr,
&ciPageidptr, &ciPageidptr,
&gsePageidptr, &gsePageidptr,
&isoPageptr, &isoPageptr,

File diff suppressed because it is too large Load Diff

View File

@ -33,7 +33,8 @@
// primary key is stored in TUP // primary key is stored in TUP
#include "../dbtup/Dbtup.hpp" #include "../dbtup/Dbtup.hpp"
#include "../dbacc/Dbacc.hpp" class Dbacc;
class Dbtup;
#ifdef DBLQH_C #ifdef DBLQH_C
// Constants // Constants
@ -571,6 +572,7 @@ public:
Uint8 rangeScan; Uint8 rangeScan;
Uint8 descending; Uint8 descending;
Uint8 tupScan; Uint8 tupScan;
Uint8 lcpScan;
Uint8 scanTcWaiting; Uint8 scanTcWaiting;
Uint8 scanKeyinfoFlag; Uint8 scanKeyinfoFlag;
Uint8 m_last_row; Uint8 m_last_row;
@ -2556,8 +2558,19 @@ private:
Dbtup* c_tup; Dbtup* c_tup;
Dbacc* c_acc; Dbacc* c_acc;
/**
* Read primary key from tup
*/
Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst); Uint32 readPrimaryKeys(ScanRecord*, TcConnectionrec*, Uint32 * dst);
/**
* Read primary key from operation
*/
public:
Uint32 readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm);
private:
void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32); void acckeyconf_tupkeyreq(Signal*, TcConnectionrec*, Fragrecord*, Uint32, Uint32);
void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32); void acckeyconf_load_diskpage(Signal*,TcConnectionrecPtr,Fragrecord*,Uint32);
@ -2924,6 +2937,11 @@ public:
} }
DLHashTable<ScanRecord> c_scanTakeOverHash; DLHashTable<ScanRecord> c_scanTakeOverHash;
inline bool TRACE_OP_CHECK(const TcConnectionrec* regTcPtr);
#ifdef ERROR_INSERT
void TRACE_OP_DUMP(const TcConnectionrec* regTcPtr, const char * pos);
#endif
}; };
inline inline
@ -2991,10 +3009,19 @@ Dblqh::accminupdate(Signal* signal, Uint32 opId, const Local_key* key)
signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx; signal->theData[1] = key->m_page_no << MAX_TUPLES_BITS | key->m_page_idx;
c_acc->execACCMINUPDATE(signal); c_acc->execACCMINUPDATE(signal);
if (ERROR_INSERTED(5712)) if (ERROR_INSERTED(5712) || ERROR_INSERTED(5713))
ndbout << " LK: " << *key; ndbout << " LK: " << *key;
regTcPtr.p->m_row_id = *key; regTcPtr.p->m_row_id = *key;
} }
inline
bool
Dblqh::TRACE_OP_CHECK(const TcConnectionrec* regTcPtr)
{
return (ERROR_INSERTED(5712) &&
(regTcPtr->operation == ZINSERT ||
regTcPtr->operation == ZDELETE)) ||
ERROR_INSERTED(5713);
}
#endif #endif

View File

@ -67,48 +67,70 @@
// seen only when we debug the product // seen only when we debug the product
#ifdef VM_TRACE #ifdef VM_TRACE
#define DEBUG(x) ndbout << "DBLQH: "<< x << endl; #define DEBUG(x) ndbout << "DBLQH: "<< x << endl;
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){ operator<<(NdbOut& out, Dblqh::TcConnectionrec::TransactionState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){ operator<<(NdbOut& out, Dblqh::TcConnectionrec::LogWriteState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){ operator<<(NdbOut& out, Dblqh::TcConnectionrec::ListState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){ operator<<(NdbOut& out, Dblqh::TcConnectionrec::AbortState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){ operator<<(NdbOut& out, Dblqh::ScanRecord::ScanState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){ operator<<(NdbOut& out, Dblqh::LogFileOperationRecord::LfoState state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut & NdbOut &
operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){ operator<<(NdbOut& out, Dblqh::ScanRecord::ScanType state){
out << (int)state; out << (int)state;
return out; return out;
} }
static
NdbOut &
operator<<(NdbOut& out, Operation_t op)
{
switch(op){
case ZREAD: out << "READ"; break;
case ZREAD_EX: out << "READ-EX"; break;
case ZINSERT: out << "INSERT"; break;
case ZUPDATE: out << "UPDATE"; break;
case ZDELETE: out << "DELETE"; break;
case ZWRITE: out << "WRITE"; break;
}
return out;
}
#else #else
#define DEBUG(x) #define DEBUG(x)
#endif #endif
@ -120,7 +142,7 @@ const Uint32 NR_ScanNo = 0;
#if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR #if defined VM_TRACE || defined ERROR_INSERT || defined NDBD_TRACENR
#include <NdbConfig.h> #include <NdbConfig.h>
NdbOut * tracenrout = 0; static NdbOut * tracenrout = 0;
static int TRACENR_FLAG = 0; static int TRACENR_FLAG = 0;
#define TRACENR(x) (* tracenrout) << x #define TRACENR(x) (* tracenrout) << x
#define SET_TRACENR_FLAG TRACENR_FLAG = 1 #define SET_TRACENR_FLAG TRACENR_FLAG = 1
@ -132,6 +154,13 @@ static int TRACENR_FLAG = 0;
#define CLEAR_TRACENR_FLAG #define CLEAR_TRACENR_FLAG
#endif #endif
#ifdef ERROR_INSERT
static NdbOut * traceopout = 0;
#define TRACE_OP(regTcPtr, place) do { if (TRACE_OP_CHECK(regTcPtr)) TRACE_OP_DUMP(regTcPtr, place); } while(0)
#else
#define TRACE_OP(x, y) {}
#endif
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* ------- SEND SYSTEM ERROR ------- */ /* ------- SEND SYSTEM ERROR ------- */
/* */ /* */
@ -454,6 +483,10 @@ void Dblqh::execSTTOR(Signal* signal)
name = NdbConfig_SignalLogFileName(getOwnNodeId()); name = NdbConfig_SignalLogFileName(getOwnNodeId());
tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+"))); tracenrout = new NdbOut(* new FileOutputStream(fopen(name, "w+")));
#endif #endif
#ifdef ERROR_INSERT
traceopout = &ndbout;
#endif
return; return;
break; break;
@ -2531,14 +2564,15 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
case TcConnectionrec::WAIT_ACC_ABORT: case TcConnectionrec::WAIT_ACC_ABORT:
case TcConnectionrec::ABORT_QUEUED: case TcConnectionrec::ABORT_QUEUED:
jam(); jam();
/* -------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */ /* IGNORE SINCE ABORT OF THIS OPERATION IS ONGOING ALREADY. */
/* -------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
break; break;
}//switch }//switch
}//Dblqh::execTUPKEYCONF() }//Dblqh::execTUPKEYCONF()
/* ************> */ /* ************> */
@ -2560,6 +2594,8 @@ void Dblqh::execTUPKEYREF(Signal* signal)
c_fragment_pool.getPtr(regFragptr); c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr; fragptr = regFragptr;
TRACE_OP(regTcPtr, "TUPKEYREF");
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
{ {
jam(); jam();
@ -2568,7 +2604,7 @@ void Dblqh::execTUPKEYREF(Signal* signal)
ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP || ndbassert(regTcPtr->transactionState == TcConnectionrec::WAIT_TUP ||
regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT); regTcPtr->transactionState ==TcConnectionrec::WAIT_TUP_TO_ABORT);
} }
switch (tcConnectptr.p->transactionState) { switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP: case TcConnectionrec::WAIT_TUP:
jam(); jam();
@ -3606,57 +3642,7 @@ void Dblqh::endgettupkeyLab(Signal* signal)
regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR; regTcPtr->transactionState = TcConnectionrec::WAIT_ATTR;
return; return;
}//if }//if
//#define TRACE_LQHKEYREQ
#ifdef TRACE_LQHKEYREQ
{
ndbout << (regTcPtr->operation == ZREAD ? "READ" :
regTcPtr->operation == ZUPDATE ? "UPDATE" :
regTcPtr->operation == ZINSERT ? "INSERT" :
regTcPtr->operation == ZDELETE ? "DELETE" : "<Other>")
<< "(" << (int)regTcPtr->operation << ")"
<< " from=(" << getBlockName(refToBlock(regTcPtr->clientBlockref))
<< ", " << refToNode(regTcPtr->clientBlockref) << ")"
<< " table=" << regTcPtr->tableref << " ";
ndbout << "hash: " << hex << regTcPtr->hashValue << endl;
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "]" << endl;
ndbout << "attr=[" << hex;
for(i = 0; i<regTcPtr->reclenAiLqhkey && i < 5; i++)
ndbout << hex << regTcPtr->firstAttrinfo[i] << " ";
AttrbufPtr regAttrinbufptr;
regAttrinbufptr.i= regTcPtr->firstAttrinbuf;
while(i < regTcPtr->totReclenAi)
{
ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
ndbrequire(dataLen != 0);
ndbrequire(i + dataLen <= regTcPtr->totReclenAi);
for(Uint32 j= 0; j<dataLen; j++, i++)
ndbout << hex << regAttrinbufptr.p->attrbuf[j] << " ";
regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
}
ndbout << "]" << endl;
}
#endif
/* ---------------------------------------------------------------------- */ /* ---------------------------------------------------------------------- */
/* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/ /* NOW RECEPTION OF LQHKEYREQ IS COMPLETED THE NEXT STEP IS TO START*/
/* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */ /* PROCESSING THE MESSAGE. IF THE MESSAGE IS TO A STAND-BY NODE */
@ -3763,6 +3749,7 @@ void Dblqh::prepareContinueAfterBlockedLab(Signal* signal)
/* ----------------------------------------------------------------- */ /* ----------------------------------------------------------------- */
if (TRACENR_FLAG) if (TRACENR_FLAG)
{ {
TRACE_OP(regTcPtr, "RECEIVED");
switch (regTcPtr->operation) { switch (regTcPtr->operation) {
case ZREAD: TRACENR("READ"); break; case ZREAD: TRACENR("READ"); break;
case ZUPDATE: TRACENR("UPDATE"); break; case ZUPDATE: TRACENR("UPDATE"); break;
@ -3847,6 +3834,9 @@ Dblqh::exec_acckeyreq(Signal* signal, TcConnectionrecPtr regTcPtr)
signal->theData[8] = sig2; signal->theData[8] = sig2;
signal->theData[9] = sig3; signal->theData[9] = sig3;
signal->theData[10] = sig4; signal->theData[10] = sig4;
TRACE_OP(regTcPtr.p, "ACC");
if (regTcPtr.p->primKeyLen > 4) { if (regTcPtr.p->primKeyLen > 4) {
sendKeyinfoAcc(signal, 11); sendKeyinfoAcc(signal, 11);
}//if }//if
@ -4133,7 +4123,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
jam(); jam();
ndbrequire(rowid == 0); ndbrequire(rowid == 0);
signal->theData[0] = accPtr; signal->theData[0] = accPtr;
signal->theData[1] = false; signal->theData[1] = 0;
EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2); EXECUTE_DIRECT(ref, GSN_ACC_ABORTREQ, signal, 2);
jamEntry(); jamEntry();
return; return;
@ -4144,16 +4134,18 @@ Dblqh::nr_copy_delete_row(Signal* signal,
*/ */
ndbrequire(regTcPtr.p->m_dealloc == 0); ndbrequire(regTcPtr.p->m_dealloc == 0);
Local_key save = regTcPtr.p->m_row_id; Local_key save = regTcPtr.p->m_row_id;
signal->theData[0] = regTcPtr.p->accConnectrec;
c_acc->execACCKEY_ORD(signal, accPtr);
signal->theData[0] = accPtr;
EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(ref, GSN_ACC_COMMITREQ, signal, 1);
jamEntry(); jamEntry();
ndbrequire(regTcPtr.p->m_dealloc == 1); ndbrequire(regTcPtr.p->m_dealloc == 1);
int ret = c_tup->nr_delete(signal, regTcPtr.i, int ret = c_tup->nr_delete(signal, regTcPtr.i,
fragPtr.p->tupFragptr, &regTcPtr.p->m_row_id, fragPtr.p->tupFragptr, &regTcPtr.p->m_row_id,
regTcPtr.p->gci); regTcPtr.p->gci);
jamEntry(); jamEntry();
if (ret) if (ret)
{ {
ndbassert(ret == 1); ndbassert(ret == 1);
@ -4167,7 +4159,7 @@ Dblqh::nr_copy_delete_row(Signal* signal,
} }
TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl); TRACENR("DELETED: " << regTcPtr.p->m_row_id << endl);
regTcPtr.p->m_dealloc = 0; regTcPtr.p->m_dealloc = 0;
regTcPtr.p->m_row_id = save; regTcPtr.p->m_row_id = save;
fragptr = fragPtr; fragptr = fragPtr;
@ -4274,6 +4266,45 @@ Dblqh::nr_delete_complete(Signal* signal, Nr_op_info* op)
} }
} }
Uint32
Dblqh::readPrimaryKeys(Uint32 opPtrI, Uint32 * dst, bool xfrm)
{
TcConnectionrecPtr regTcPtr;
DatabufPtr regDatabufptr;
Uint64 Tmp[MAX_KEY_SIZE_IN_WORDS >> 1];
jamEntry();
regTcPtr.i = opPtrI;
ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
Uint32 tableId = regTcPtr.p->tableref;
Uint32 keyLen = regTcPtr.p->primKeyLen;
regDatabufptr.i = regTcPtr.p->firstTupkeybuf;
Uint32 * tmp = xfrm ? (Uint32*)Tmp : dst;
memcpy(tmp, regTcPtr.p->tupkeyData, sizeof(regTcPtr.p->tupkeyData));
if (keyLen > 4)
{
tmp += 4;
Uint32 pos = 4;
do {
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
memcpy(tmp, regDatabufptr.p->data, sizeof(regDatabufptr.p->data));
regDatabufptr.i = regDatabufptr.p->nextDatabuf;
tmp += sizeof(regDatabufptr.p->data) >> 2;
pos += sizeof(regDatabufptr.p->data) >> 2;
} while(pos < keyLen);
}
if (xfrm)
{
jam();
Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
return xfrm_key(tableId, (Uint32*)Tmp, dst, ~0, keyPartLen);
}
return keyLen;
}
/* =*======================================================================= */ /* =*======================================================================= */
/* ======= SEND KEYINFO TO ACC ======= */ /* ======= SEND KEYINFO TO ACC ======= */
@ -4447,10 +4478,6 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
* ----------------------------------------------------------------------- */ * ----------------------------------------------------------------------- */
Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE; Uint32 page_idx = local_key & MAX_TUPLES_PER_PAGE;
Uint32 page_no = local_key >> MAX_TUPLES_BITS; Uint32 page_no = local_key >> MAX_TUPLES_BITS;
#ifdef TRACE_LQHKEYREQ
ndbout << "localkey: [ " << hex << page_no << " " << page_idx << "]"
<< endl;
#endif
Uint32 Ttupreq = regTcPtr->dirtyOp; Uint32 Ttupreq = regTcPtr->dirtyOp;
Ttupreq = Ttupreq + (regTcPtr->opSimple << 1); Ttupreq = Ttupreq + (regTcPtr->opSimple << 1);
Ttupreq = Ttupreq + (op << 6); Ttupreq = Ttupreq + (op << 6);
@ -4506,70 +4533,13 @@ Dblqh::acckeyconf_tupkeyreq(Signal* signal, TcConnectionrec* regTcPtr,
tupKeyReq->m_row_id_page_no = sig0; tupKeyReq->m_row_id_page_no = sig0;
tupKeyReq->m_row_id_page_idx = sig1; tupKeyReq->m_row_id_page_idx = sig1;
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT) TRACE_OP(regTcPtr, "TUPKEYREQ");
{
ndbout << "INSERT " << regFragptrP->tabRef
<< "(" << regFragptrP->fragId << ")";
{
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "] ";
}
if(regTcPtr->m_use_rowid)
ndbout << " " << regTcPtr->m_row_id;
}
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZDELETE)
{
Local_key lk; lk.assref(local_key);
ndbout << "DELETE " << regFragptrP->tabRef
<< "(" << regFragptrP->fragId << ") " << lk;
{
ndbout << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
ndbout << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
ndbout << hex << regDatabufptr.p->data[j] << " ";
}
ndbout << "]" << endl;
}
}
regTcPtr->m_use_rowid |= (op == ZINSERT); regTcPtr->m_use_rowid |= (op == ZINSERT);
regTcPtr->m_row_id.m_page_no = page_no; regTcPtr->m_row_id.m_page_no = page_no;
regTcPtr->m_row_id.m_page_idx = page_idx; regTcPtr->m_row_id.m_page_idx = page_idx;
EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength); EXECUTE_DIRECT(tup, GSN_TUPKEYREQ, signal, TupKeyReq::SignalLength);
if (ERROR_INSERTED(5712) && regTcPtr->operation == ZINSERT)
{
ndbout << endl;
}
}//Dblqh::execACCKEYCONF() }//Dblqh::execACCKEYCONF()
void void
@ -4654,27 +4624,37 @@ void Dblqh::tupkeyConfLab(Signal* signal)
const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0]; const TupKeyConf * const tupKeyConf = (TupKeyConf *)&signal->theData[0];
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
Uint32 activeCreat = regTcPtr->activeCreat; Uint32 activeCreat = regTcPtr->activeCreat;
Uint32 readLen = tupKeyConf->readLength;
Uint32 writeLen = tupKeyConf->writeLength;
Uint32 accOp = regTcPtr->accConnectrec;
c_acc->execACCKEY_ORD(signal, accOp);
TRACE_OP(regTcPtr, "TUPKEYCONF");
if (regTcPtr->simpleRead) { if (regTcPtr->simpleRead) {
jam(); jam();
/* ---------------------------------------------------------------------- /* ----------------------------------------------------------------------
* THE OPERATION IS A SIMPLE READ. WE WILL IMMEDIATELY COMMIT THE OPERATION. * THE OPERATION IS A SIMPLE READ.
* SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK (FOR LOCAL CHECKPOINTS) YET * WE WILL IMMEDIATELY COMMIT THE OPERATION.
* SINCE WE HAVE NOT RELEASED THE FRAGMENT LOCK
* (FOR LOCAL CHECKPOINTS) YET
* WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED. * WE CAN GO IMMEDIATELY TO COMMIT_CONTINUE_AFTER_BLOCKED.
* WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN READ LENGTH * WE HAVE ALREADY SENT THE RESPONSE SO WE ARE NOT INTERESTED IN
* ---------------------------------------------------------------------- */ * READ LENGTH
* --------------------------------------------------------------------- */
commitContinueAfterBlockedLab(signal); commitContinueAfterBlockedLab(signal);
return; return;
}//if }//if
if (tupKeyConf->readLength != 0) { if (readLen != 0)
{
jam(); jam();
/* SET BIT 15 IN REQINFO */ /* SET BIT 15 IN REQINFO */
LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1); LqhKeyReq::setApplicationAddressFlag(regTcPtr->reqinfo, 1);
regTcPtr->readlenAi = readLen;
regTcPtr->readlenAi = tupKeyConf->readLength;
}//if }//if
regTcPtr->totSendlenAi = tupKeyConf->writeLength; regTcPtr->totSendlenAi = writeLen;
ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen); ndbrequire(regTcPtr->totSendlenAi == regTcPtr->currTupAiLen);
if (unlikely(activeCreat == Fragrecord::AC_NR_COPY)) if (unlikely(activeCreat == Fragrecord::AC_NR_COPY))
@ -5597,6 +5577,8 @@ void Dblqh::releaseOprec(Signal* signal)
if (TRACENR_FLAG) if (TRACENR_FLAG)
TRACENR("DELETED: " << regTcPtr->m_row_id << endl); TRACENR("DELETED: " << regTcPtr->m_row_id << endl);
TRACE_OP(regTcPtr, "DEALLOC");
signal->theData[0] = regTcPtr->fragmentid; signal->theData[0] = regTcPtr->fragmentid;
signal->theData[1] = regTcPtr->tableref; signal->theData[1] = regTcPtr->tableref;
@ -5818,6 +5800,10 @@ void Dblqh::execCOMMIT(Signal* signal)
ptrAss(tcConnectptr, regTcConnectionrec); ptrAss(tcConnectptr, regTcConnectionrec);
if ((tcConnectptr.p->transid[0] == transid1) && if ((tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) { (tcConnectptr.p->transid[1] == transid2)) {
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMMIT");
commitReqLab(signal, gci); commitReqLab(signal, gci);
return; return;
}//if }//if
@ -5937,6 +5923,10 @@ void Dblqh::execCOMPLETE(Signal* signal)
if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) && if ((tcConnectptr.p->transactionState == TcConnectionrec::COMMITTED) &&
(tcConnectptr.p->transid[0] == transid1) && (tcConnectptr.p->transid[0] == transid1) &&
(tcConnectptr.p->transid[1] == transid2)) { (tcConnectptr.p->transid[1] == transid2)) {
TcConnectionrec * const regTcPtr = tcConnectptr.p;
TRACE_OP(regTcPtr, "COMPLETE");
if (tcConnectptr.p->seqNoReplica != 0 && if (tcConnectptr.p->seqNoReplica != 0 &&
tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) { tcConnectptr.p->activeCreat == Fragrecord::AC_NORMAL) {
jam(); jam();
@ -6313,12 +6303,16 @@ void Dblqh::commitContinueAfterBlockedLab(Signal* signal)
TRACENR(endl); TRACENR(endl);
} }
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref); Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec; signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
} else { } else {
if(!dirtyOp){ if(!dirtyOp){
TRACE_OP(regTcPtr.p, "ACC_COMMITREQ");
Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref); Uint32 acc = refToBlock(regTcPtr.p->tcAccBlockref);
signal->theData[0] = regTcPtr.p->accConnectrec; signal->theData[0] = regTcPtr.p->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@ -6362,6 +6356,8 @@ Dblqh::tupcommit_conf_callback(Signal* signal, Uint32 tcPtrI)
c_fragment_pool.getPtr(regFragptr); c_fragment_pool.getPtr(regFragptr);
fragptr = regFragptr; fragptr = regFragptr;
TRACE_OP(tcPtr, "ACC_COMMITREQ");
Uint32 acc = refToBlock(tcPtr->tcAccBlockref); Uint32 acc = refToBlock(tcPtr->tcAccBlockref);
signal->theData[0] = tcPtr->accConnectrec; signal->theData[0] = tcPtr->accConnectrec;
EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1); EXECUTE_DIRECT(acc, GSN_ACC_COMMITREQ, signal, 1);
@ -6670,6 +6666,8 @@ void Dblqh::execABORT(Signal* signal)
regTcPtr->commitAckMarker = RNIL; regTcPtr->commitAckMarker = RNIL;
} }
TRACE_OP(regTcPtr, "ABORT");
abortStateHandlerLab(signal); abortStateHandlerLab(signal);
return; return;
@ -7087,23 +7085,30 @@ void Dblqh::abortContinueAfterBlockedLab(Signal* signal, bool canBlock)
* ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING. * ALSO AS PART OF A NORMAL ABORT WITHOUT BLOCKING.
* WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN * WE MUST ABORT TUP BEFORE ACC TO ENSURE THAT NO ONE RACES IN
* AND SEES A STATE IN TUP. * AND SEES A STATE IN TUP.
* ------------------------------------------------------------------------ */ * ----------------------------------------------------------------------- */
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
fragptr.i = regTcPtr->fragmentptr;
c_fragment_pool.getPtr(fragptr); TRACE_OP(regTcPtr, "ACC ABORT");
signal->theData[0] = regTcPtr->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT; regTcPtr->transactionState = TcConnectionrec::WAIT_ACC_ABORT;
signal->theData[0] = regTcPtr->accConnectrec; signal->theData[0] = regTcPtr->accConnectrec;
signal->theData[1] = true; signal->theData[1] = 2; // JOB BUFFER IF NEEDED
EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2); EXECUTE_DIRECT(DBACC, GSN_ACC_ABORTREQ, signal, 2);
/* ------------------------------------------------------------------------
* We need to insert a real-time break by sending ACC_ABORTCONF through the if (signal->theData[1] == RNIL)
* job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or {
* TUPKEYREF that are in the job buffer but not yet processed. Doing jam();
* everything without that would race and create a state error when they /* ------------------------------------------------------------------------
* are executed. * We need to insert a real-time break by sending ACC_ABORTCONF through the
* ----------------------------------------------------------------------- */ * job buffer to ensure that we catch any ACCKEYCONF or TUPKEYCONF or
* TUPKEYREF that are in the job buffer but not yet processed. Doing
* everything without that would race and create a state error when they
* are executed.
* --------------------------------------------------------------------- */
return;
}
execACC_ABORTCONF(signal);
return; return;
}//Dblqh::abortContinueAfterBlockedLab() }//Dblqh::abortContinueAfterBlockedLab()
@ -7117,6 +7122,11 @@ void Dblqh::execACC_ABORTCONF(Signal* signal)
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec); ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
TcConnectionrec * const regTcPtr = tcConnectptr.p; TcConnectionrec * const regTcPtr = tcConnectptr.p;
ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT); ndbrequire(regTcPtr->transactionState == TcConnectionrec::WAIT_ACC_ABORT);
TRACE_OP(regTcPtr, "ACC_ABORTCONF");
signal->theData[0] = regTcPtr->tupConnectrec;
EXECUTE_DIRECT(DBTUP, GSN_TUP_ABORTREQ, signal, 1);
continueAbortLab(signal); continueAbortLab(signal);
return; return;
}//Dblqh::execACC_ABORTCONF() }//Dblqh::execACC_ABORTCONF()
@ -7623,6 +7633,7 @@ void Dblqh::execNEXT_SCANCONF(Signal* signal)
scanLockReleasedLab(signal); scanLockReleasedLab(signal);
break; break;
default: default:
ndbout_c("%d", scanptr.p->scanState);
ndbrequire(false); ndbrequire(false);
}//switch }//switch
}//Dblqh::execNEXT_SCANCONF() }//Dblqh::execNEXT_SCANCONF()
@ -8351,6 +8362,8 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending); AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
AccScanReq::setNoDiskScanFlag(req->requestInfo, AccScanReq::setNoDiskScanFlag(req->requestInfo,
!tcConnectptr.p->m_disk_table); !tcConnectptr.p->m_disk_table);
AccScanReq::setLcpScanFlag(req->requestInfo, scanptr.p->lcpScan);
req->transId1 = tcConnectptr.p->transid[0]; req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1]; req->transId2 = tcConnectptr.p->transid[1];
req->savePointId = tcConnectptr.p->savePointId; req->savePointId = tcConnectptr.p->savePointId;
@ -8837,7 +8850,9 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
}//if }//if
// If accOperationPtr == RNIL no record was returned by ACC // If accOperationPtr == RNIL no record was returned by ACC
if (nextScanConf->accOperationPtr == RNIL) { Uint32 accOpPtr = nextScanConf->accOperationPtr;
if (accOpPtr == RNIL)
{
jam(); jam();
/************************************************************* /*************************************************************
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
@ -8871,7 +8886,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
jam(); jam();
set_acc_ptr_in_scan_record(scanptr.p, set_acc_ptr_in_scan_record(scanptr.p,
scanptr.p->m_curr_batch_size_rows, scanptr.p->m_curr_batch_size_rows,
nextScanConf->accOperationPtr); accOpPtr);
jam(); jam();
nextScanConfLoopLab(signal); nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab() }//Dblqh::nextScanConfScanLab()
@ -9049,6 +9065,7 @@ Dblqh::readPrimaryKeys(ScanRecord *scanP, TcConnectionrec *tcConP, Uint32 *dst)
} }
int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false); int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false);
jamEntry();
if(0) if(0)
ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d", ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d",
tableId, fragId, fragPageId, pageIndex, ret); tableId, fragId, fragPageId, pageIndex, ret);
@ -9071,12 +9088,25 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
Uint32 rows = scanptr.p->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
jamEntry();
}
else
{
ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
}
if (scanptr.p->scanCompletedStatus == ZTRUE) { if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* --------------------------------------------------------------------- /* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */ * --------------------------------------------------------------------- */
if ((scanptr.p->scanLockHold == ZTRUE) && if ((scanptr.p->scanLockHold == ZTRUE) && rows)
(scanptr.p->m_curr_batch_size_rows > 0)) { {
jam(); jam();
scanptr.p->scanReleaseCounter = 1; scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
@ -9093,7 +9123,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
}//if }//if
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN); ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->m_curr_batch_size_bytes+= tdata4; scanptr.p->m_curr_batch_size_bytes+= tdata4;
scanptr.p->m_curr_batch_size_rows++; scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->m_last_row = tdata5; scanptr.p->m_last_row = tdata5;
if (scanptr.p->check_scan_batch_completed() | tdata5){ if (scanptr.p->check_scan_batch_completed() | tdata5){
if (scanptr.p->scanLockHold == ZTRUE) { if (scanptr.p->scanLockHold == ZTRUE) {
@ -9103,7 +9133,7 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
return; return;
} else { } else {
jam(); jam();
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows; scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
return; return;
} }
@ -9187,12 +9217,24 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED; tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
Uint32 rows = scanptr.p->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanptr.p, rows, false);
if (accOpPtr != (Uint32)-1)
{
c_acc->execACCKEY_ORD(signal, accOpPtr);
}
else
{
ndbassert(refToBlock(scanptr.p->scanBlockref) != DBACC);
}
if (scanptr.p->scanCompletedStatus == ZTRUE) { if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* --------------------------------------------------------------------- /* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED. * STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */ * --------------------------------------------------------------------- */
if ((scanptr.p->scanLockHold == ZTRUE) && if ((scanptr.p->scanLockHold == ZTRUE) && rows)
(scanptr.p->m_curr_batch_size_rows > 0)) { {
jam(); jam();
scanptr.p->scanReleaseCounter = 1; scanptr.p->scanReleaseCounter = 1;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
@ -9213,8 +9255,8 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
scanptr.p->scanReleaseCounter = 1; scanptr.p->scanReleaseCounter = 1;
} else { } else {
jam(); jam();
scanptr.p->m_curr_batch_size_rows++; scanptr.p->m_curr_batch_size_rows = rows + 1;
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows; scanptr.p->scanReleaseCounter = rows + 1;
}//if }//if
/* -------------------------------------------------------------------- /* --------------------------------------------------------------------
* WE NEED TO RELEASE ALL LOCKS CURRENTLY * WE NEED TO RELEASE ALL LOCKS CURRENTLY
@ -9224,7 +9266,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
return; return;
}//if }//if
Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount; Uint32 time_passed= tcConnectptr.p->tcTimer - cLqhTimeOutCount;
if (scanptr.p->m_curr_batch_size_rows > 0) { if (rows) {
if (time_passed > 1) { if (time_passed > 1) {
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
* WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A * WE NEED TO ENSURE THAT WE DO NOT SEARCH FOR THE NEXT TUPLE FOR A
@ -9232,7 +9274,7 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
* THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we * THE FOUND TUPLE IF FOUND TUPLES ARE RARE. If more than 10 ms passed we
* send the found tuples to the API. * send the found tuples to the API.
* ----------------------------------------------------------------------- */ * ----------------------------------------------------------------------- */
scanptr.p->scanReleaseCounter = scanptr.p->m_curr_batch_size_rows + 1; scanptr.p->scanReleaseCounter = rows + 1;
scanReleaseLocksLab(signal); scanReleaseLocksLab(signal);
return; return;
} }
@ -9377,7 +9419,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo); const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo); const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo); const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo);
const Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo); Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo); const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo); const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
@ -9395,6 +9437,11 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->m_max_batch_size_rows = max_rows; scanptr.p->m_max_batch_size_rows = max_rows;
scanptr.p->m_max_batch_size_bytes = max_bytes; scanptr.p->m_max_batch_size_bytes = max_bytes;
#if 0
if (! rangeScan)
tupScan = 1;
#endif
if (! rangeScan && ! tupScan) if (! rangeScan && ! tupScan)
scanptr.p->scanBlockref = tcConnectptr.p->tcAccBlockref; scanptr.p->scanBlockref = tcConnectptr.p->tcAccBlockref;
else if (! tupScan) else if (! tupScan)
@ -9408,6 +9455,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->rangeScan = rangeScan; scanptr.p->rangeScan = rangeScan;
scanptr.p->descending = descending; scanptr.p->descending = descending;
scanptr.p->tupScan = tupScan; scanptr.p->tupScan = tupScan;
scanptr.p->lcpScan = ScanFragReq::getLcpScanFlag(reqinfo);
scanptr.p->scanState = ScanRecord::SCAN_FREE; scanptr.p->scanState = ScanRecord::SCAN_FREE;
scanptr.p->scanFlag = ZFALSE; scanptr.p->scanFlag = ZFALSE;
scanptr.p->m_row_id.setNull(); scanptr.p->m_row_id.setNull();
@ -9416,7 +9464,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
scanptr.p->m_last_row = 0; scanptr.p->m_last_row = 0;
scanptr.p->scanStoredProcId = RNIL; scanptr.p->scanStoredProcId = RNIL;
scanptr.p->copyPtr = RNIL;
if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){ if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){
jam(); jam();
return ScanFragRef::ZWRONG_BATCH_SIZE; return ScanFragRef::ZWRONG_BATCH_SIZE;
@ -9437,8 +9485,10 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11 * !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
* idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42) * idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42)
*/ */
Uint32 start = (rangeScan || tupScan ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ); tupScan = 0; // Make sure that close tup scan does not start acc scan incorrectly
Uint32 stop = (rangeScan || tupScan ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1); Uint32 start = (rangeScan || tupScan) ? MAX_PARALLEL_SCANS_PER_FRAG : 1 ;
Uint32 stop = (rangeScan || tupScan) ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG :
MAX_PARALLEL_SCANS_PER_FRAG - 1;
stop += start; stop += start;
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start); Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
@ -9872,6 +9922,8 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
fragptr.p->m_scanNumberMask.clear(NR_ScanNo); fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
scanptr.p->scanBlockref = DBTUP_REF; scanptr.p->scanBlockref = DBTUP_REF;
scanptr.p->scanLockHold = ZFALSE; scanptr.p->scanLockHold = ZFALSE;
scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes= 0;
initScanTc(0, initScanTc(0,
0, 0,
@ -10074,7 +10126,7 @@ void Dblqh::nextScanConfCopyLab(Signal* signal)
initCopyTc(signal, ZDELETE); initCopyTc(signal, ZDELETE);
set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL); set_acc_ptr_in_scan_record(scanptr.p, 0, RNIL);
tcConP->gci = nextScanConf->gci; tcConP->gci = nextScanConf->gci;
tcConP->primKeyLen = 0; tcConP->primKeyLen = 0;
tcConP->totSendlenAi = 0; tcConP->totSendlenAi = 0;
tcConP->connectState = TcConnectionrec::COPY_CONNECTED; tcConP->connectState = TcConnectionrec::COPY_CONNECTED;
@ -10197,6 +10249,12 @@ void Dblqh::copyTupkeyConfLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec; scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr); c_scanRecordPool.getPtr(scanptr);
ScanRecord* scanP = scanptr.p; ScanRecord* scanP = scanptr.p;
Uint32 rows = scanP->m_curr_batch_size_rows;
Uint32 accOpPtr= get_acc_ptr_from_scan_record(scanP, rows, false);
ndbassert(accOpPtr != (Uint32)-1);
c_acc->execACCKEY_ORD(signal, accOpPtr);
if (tcConnectptr.p->errorCode != 0) { if (tcConnectptr.p->errorCode != 0) {
jam(); jam();
closeCopyLab(signal); closeCopyLab(signal);
@ -18538,6 +18596,21 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
} }
ndbrequire(arg != 2308); ndbrequire(arg != 2308);
} }
#ifdef ERROR_INSERT
if (arg == 5712 || arg == 5713)
{
if (arg == 5712)
{
traceopout = &ndbout;
}
else if (arg == 5713)
{
traceopout = tracenrout;
}
SET_ERROR_INSERT_VALUE(arg);
}
#endif
}//Dblqh::execDUMP_STATE_ORD() }//Dblqh::execDUMP_STATE_ORD()
@ -18702,3 +18775,39 @@ void Dblqh::writeDbgInfoPageHeader(LogPageRecordPtr logP, Uint32 place,
logP.p->logPageWord[ZPOS_IN_WRITING]= 1; logP.p->logPageWord[ZPOS_IN_WRITING]= 1;
} }
#if defined ERROR_INSERT
void
Dblqh::TRACE_OP_DUMP(const Dblqh::TcConnectionrec* regTcPtr, const char * pos)
{
(* traceopout)
<< "[ " << hex << regTcPtr->transid[0]
<< " " << hex << regTcPtr->transid[1] << " ] " << dec
<< pos
<< " " << (Operation_t)regTcPtr->operation
<< " " << regTcPtr->tableref
<< "(" << regTcPtr->fragmentid << ")"
<< "(" << (regTcPtr->seqNoReplica == 0 ? "P" : "B") << ")" ;
{
(* traceopout) << "key=[" << hex;
Uint32 i;
for(i = 0; i<regTcPtr->primKeyLen && i < 4; i++){
(* traceopout) << hex << regTcPtr->tupkeyData[i] << " ";
}
DatabufPtr regDatabufptr;
regDatabufptr.i = regTcPtr->firstTupkeybuf;
while(i < regTcPtr->primKeyLen)
{
ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
for(Uint32 j = 0; j<4 && i<regTcPtr->primKeyLen; j++, i++)
(* traceopout) << hex << regDatabufptr.p->data[j] << " ";
}
(* traceopout) << "] ";
}
if (regTcPtr->m_use_rowid)
(* traceopout) << " " << regTcPtr->m_row_id;
(* traceopout) << endl;
}
#endif

View File

@ -7052,6 +7052,18 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
found = true; found = true;
} }
} }
ScanFragList deliv(c_scan_frag_pool, scanptr.p->m_delivered_scan_frags);
for(deliv.first(ptr); !ptr.isNull(); deliv.next(ptr))
{
jam();
if (refToNode(ptr.p->lqhBlockref) == failedNodeId)
{
jam();
found = true;
break;
}
}
} }
if(found){ if(found){
jam(); jam();
@ -7081,15 +7093,20 @@ Dbtc::nodeFailCheckTransactions(Signal* signal,
for (transPtr.i = transPtrI; transPtr.i < capiConnectFilesize; transPtr.i++) for (transPtr.i = transPtrI; transPtr.i < capiConnectFilesize; transPtr.i++)
{ {
ptrCheckGuard(transPtr, capiConnectFilesize, apiConnectRecord); ptrCheckGuard(transPtr, capiConnectFilesize, apiConnectRecord);
Uint32 state = transPtr.p->apiConnectstate;
if (transPtr.p->m_transaction_nodes.get(failedNodeId)) if (transPtr.p->m_transaction_nodes.get(failedNodeId))
{ {
jam(); jam();
// Force timeout regardless of state // avoid assertion in timeoutfoundlab
c_appl_timeout_value = 1; if (state != CS_PREPARE_TO_COMMIT)
setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__); {
timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT); // Force timeout regardless of state
c_appl_timeout_value = TapplTimeout; c_appl_timeout_value = 1;
setApiConTimer(transPtr.i, TtcTimer - 2, __LINE__);
timeOutFoundLab(signal, transPtr.i, ZNODEFAIL_BEFORE_COMMIT);
c_appl_timeout_value = TapplTimeout;
}
} }
// Send CONTINUEB to continue later // Send CONTINUEB to continue later

View File

@ -297,7 +297,6 @@ enum TransState {
}; };
enum TupleState { enum TupleState {
TUPLE_INITIAL_INSERT = 0,
TUPLE_PREPARED = 1, TUPLE_PREPARED = 1,
TUPLE_ALREADY_ABORTED = 2, TUPLE_ALREADY_ABORTED = 2,
TUPLE_TO_BE_COMMITTED = 3 TUPLE_TO_BE_COMMITTED = 3
@ -305,7 +304,6 @@ enum TupleState {
enum State { enum State {
NOT_INITIALIZED = 0, NOT_INITIALIZED = 0,
COMMON_AREA_PAGES = 1,
IDLE = 17, IDLE = 17,
ACTIVE = 18, ACTIVE = 18,
SYSTEM_RESTART = 19, SYSTEM_RESTART = 19,
@ -1441,7 +1439,6 @@ private:
void execSET_VAR_REQ(Signal* signal); void execSET_VAR_REQ(Signal* signal);
void execDROP_TAB_REQ(Signal* signal); void execDROP_TAB_REQ(Signal* signal);
void execALTER_TAB_REQ(Signal* signal); void execALTER_TAB_REQ(Signal* signal);
void execTUP_ALLOCREQ(Signal* signal);
void execTUP_DEALLOCREQ(Signal* signal); void execTUP_DEALLOCREQ(Signal* signal);
void execTUP_WRITELOG_REQ(Signal* signal); void execTUP_WRITELOG_REQ(Signal* signal);

View File

@ -202,29 +202,6 @@ Dbtup::receive_attrinfo(Signal* signal, Uint32 op,
} }
} }
void Dbtup::execTUP_ALLOCREQ(Signal* signal)
{
OperationrecPtr regOperPtr;
jamEntry();
regOperPtr.i= signal->theData[0];
c_operation_pool.getPtr(regOperPtr);
regOperPtr.p->op_struct.tuple_state= TUPLE_INITIAL_INSERT;
//ndbout_c("execTUP_ALLOCREQ");
signal->theData[0]= 0;
signal->theData[1]= ~0 >> MAX_TUPLES_BITS;
signal->theData[2]= (1 << MAX_TUPLES_BITS) - 1;
return;
mem_error:
jam();
signal->theData[0]= ZMEM_NOMEM_ERROR;
return;
}
void void
Dbtup::setChecksum(Tuple_header* tuple_ptr, Dbtup::setChecksum(Tuple_header* tuple_ptr,
Tablerec* regTabPtr) Tablerec* regTabPtr)
@ -455,13 +432,13 @@ Dbtup::load_diskpage(Signal* signal,
ptrCheckGuard(tabptr, cnoOfTablerec, tablerec); ptrCheckGuard(tabptr, cnoOfTablerec, tablerec);
Tablerec* regTabPtr = tabptr.p; Tablerec* regTabPtr = tabptr.p;
if(regOperPtr->op_struct.tuple_state == TUPLE_INITIAL_INSERT) if(local_key == ~(Uint32)0)
{ {
jam(); jam();
regOperPtr->op_struct.m_wait_log_buffer= 1; regOperPtr->op_struct.m_wait_log_buffer= 1;
regOperPtr->op_struct.m_load_diskpage_on_commit= 1; regOperPtr->op_struct.m_load_diskpage_on_commit= 1;
return 1; return 1;
} }
jam(); jam();
Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE; Uint32 page_idx= local_key & MAX_TUPLES_PER_PAGE;
@ -663,7 +640,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
regOperPtr->savepointId= sig1; regOperPtr->savepointId= sig1;
regOperPtr->op_struct.primary_replica= sig2; regOperPtr->op_struct.primary_replica= sig2;
regOperPtr->m_tuple_location.m_page_idx= sig3; Uint32 pageidx = regOperPtr->m_tuple_location.m_page_idx= sig3;
sig1= tupKeyReq->opRef; sig1= tupKeyReq->opRef;
sig2= tupKeyReq->tcOpIndex; sig2= tupKeyReq->tcOpIndex;
@ -673,7 +650,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
req_struct.tc_operation_ptr= sig1; req_struct.tc_operation_ptr= sig1;
req_struct.TC_index= sig2; req_struct.TC_index= sig2;
req_struct.TC_ref= sig3; req_struct.TC_ref= sig3;
req_struct.frag_page_id= sig4; Uint32 pageid = req_struct.frag_page_id= sig4;
req_struct.m_use_rowid = (TrequestInfo >> 11) & 1; req_struct.m_use_rowid = (TrequestInfo >> 11) & 1;
sig1= tupKeyReq->attrBufLen; sig1= tupKeyReq->attrBufLen;
@ -706,7 +683,8 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
copyAttrinfo(regOperPtr, &cinBuffer[0]); copyAttrinfo(regOperPtr, &cinBuffer[0]);
if(Roptype == ZINSERT && get_tuple_state(regOperPtr)== TUPLE_INITIAL_INSERT) Uint32 localkey = (pageid << MAX_TUPLES_BITS) + pageidx;
if(Roptype == ZINSERT && localkey == ~0)
{ {
// No tuple allocatated yet // No tuple allocatated yet
goto do_insert; goto do_insert;
@ -1159,49 +1137,6 @@ Dbtup::prepare_initial_insert(KeyReqStruct *req_struct,
disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0; disk_undo ? (Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE) : 0;
} }
void
Dbtup::fix_disk_insert_no_mem_insert(KeyReqStruct *req_struct,
Operationrec* regOperPtr,
Tablerec* regTabPtr)
{
regOperPtr->m_undo_buffer_space= sizeof(Dbtup::Disk_undo::Alloc);
req_struct->check_offset[DD]= regTabPtr->get_check_offset(DD);
const Uint32 cnt1= regTabPtr->m_attributes[MM].m_no_of_varsize;
const Uint32 cnt2= regTabPtr->m_attributes[DD].m_no_of_varsize;
Uint32 *ptr= req_struct->m_tuple_ptr->get_var_part_ptr(regTabPtr);
if(cnt1)
{
// Disk part is 32-bit aligned
char *varptr = req_struct->m_var_data[MM].m_data_ptr;
ptr= ALIGN_WORD(varptr + regTabPtr->m_offsets[MM].m_max_var_offset);
}
else
{
ptr -= Tuple_header::HeaderSize;
}
req_struct->m_disk_ptr= (Tuple_header*)ptr;
if(cnt2)
{
KeyReqStruct::Var_data *dst= &req_struct->m_var_data[DD];
ptr=((Tuple_header*)ptr)->m_data+regTabPtr->m_offsets[DD].m_varpart_offset;
dst->m_data_ptr= (char*)(((Uint16*)ptr)+cnt2+1);
dst->m_offset_array_ptr= req_struct->var_pos_array + (cnt1 << 1);
dst->m_var_len_offset= cnt2;
dst->m_max_var_offset= regTabPtr->m_offsets[DD].m_max_var_offset;
}
// Set all null bits
memset(req_struct->m_disk_ptr->m_null_bits+
regTabPtr->m_offsets[DD].m_null_offset, 0xFF,
4*regTabPtr->m_offsets[DD].m_null_words);
req_struct->m_tuple_ptr->m_header_bits =
(Tuple_header::DISK_ALLOC | Tuple_header::DISK_INLINE);
}
int Dbtup::handleInsertReq(Signal* signal, int Dbtup::handleInsertReq(Signal* signal,
Ptr<Operationrec> regOperPtr, Ptr<Operationrec> regOperPtr,
Ptr<Fragrecord> fragPtr, Ptr<Fragrecord> fragPtr,
@ -1215,8 +1150,8 @@ int Dbtup::handleInsertReq(Signal* signal,
Tuple_header *tuple_ptr; Tuple_header *tuple_ptr;
bool disk = regTabPtr->m_no_of_disk_attributes > 0; bool disk = regTabPtr->m_no_of_disk_attributes > 0;
bool mem_insert = get_tuple_state(regOperPtr.p) == TUPLE_INITIAL_INSERT; bool mem_insert = regOperPtr.p->is_first_operation();
bool disk_insert = regOperPtr.p->is_first_operation() && disk; bool disk_insert = mem_insert && disk;
bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize; bool varsize = regTabPtr->m_attributes[MM].m_no_of_varsize;
bool rowid = req_struct->m_use_rowid; bool rowid = req_struct->m_use_rowid;
Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no; Uint32 real_page_id = regOperPtr.p->m_tuple_location.m_page_no;
@ -1244,21 +1179,16 @@ int Dbtup::handleInsertReq(Signal* signal,
if(mem_insert) if(mem_insert)
{ {
jam(); jam();
ndbassert(regOperPtr.p->is_first_operation()); // disk insert
prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr); prepare_initial_insert(req_struct, regOperPtr.p, regTabPtr);
} }
else else
{ {
if (!regOperPtr.p->is_first_operation()) Operationrec* prevOp= req_struct->prevOpPtr.p;
{ ndbassert(prevOp->op_struct.op_type == ZDELETE);
Operationrec* prevOp= req_struct->prevOpPtr.p; tup_version= prevOp->tupVersion + 1;
ndbassert(prevOp->op_struct.op_type == ZDELETE);
tup_version= prevOp->tupVersion + 1; if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
if(!prevOp->is_first_operation())
org= (Tuple_header*)c_undo_buffer.get_ptr(&prevOp->m_copy_tuple_location);
}
if (regTabPtr->need_expand()) if (regTabPtr->need_expand())
expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert); expand_tuple(req_struct, sizes, org, regTabPtr, !disk_insert);
else else
@ -1268,11 +1198,6 @@ int Dbtup::handleInsertReq(Signal* signal,
if (disk_insert) if (disk_insert)
{ {
int res; int res;
if (unlikely(!mem_insert))
{
sizes[DD] = sizes[DD+2] = regTabPtr->m_offsets[DD].m_fix_header_size;
fix_disk_insert_no_mem_insert(req_struct, regOperPtr.p, regTabPtr);
}
if (ERROR_INSERTED(4015)) if (ERROR_INSERTED(4015))
{ {
@ -1381,6 +1306,7 @@ int Dbtup::handleInsertReq(Signal* signal,
} }
if (unlikely(ptr == 0)) if (unlikely(ptr == 0))
{ {
jam();
goto alloc_rowid_error; goto alloc_rowid_error;
} }
} }
@ -1396,7 +1322,7 @@ int Dbtup::handleInsertReq(Signal* signal,
(varsize ? Tuple_header::CHAINED_ROW : 0); (varsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id; regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
} }
else if(!rowid || !regOperPtr.p->is_first_operation()) else
{ {
int ret; int ret;
if (ERROR_INSERTED(4020)) if (ERROR_INSERTED(4020))
@ -1417,20 +1343,6 @@ int Dbtup::handleInsertReq(Signal* signal,
req_struct->m_use_rowid = false; req_struct->m_use_rowid = false;
base->m_header_bits &= ~(Uint32)Tuple_header::FREE; base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
} }
else
{
if ((req_struct->m_row_id.m_page_no == frag_page_id &&
req_struct->m_row_id.m_page_idx == regOperPtr.p->m_tuple_location.m_page_idx))
{
ndbout_c("no mem insert but rowid (same)");
base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
}
else
{
// no mem insert, but rowid
ndbrequire(false);
}
}
base->m_header_bits |= Tuple_header::ALLOC & base->m_header_bits |= Tuple_header::ALLOC &
(regOperPtr.p->is_first_operation() ? ~0 : 1); (regOperPtr.p->is_first_operation() ? ~0 : 1);

View File

@ -89,7 +89,6 @@ Dbtup::Dbtup(Block_context& ctx, Pgman* pgman)
addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ); addRecSignal(GSN_DROP_TAB_REQ, &Dbtup::execDROP_TAB_REQ);
addRecSignal(GSN_TUP_ALLOCREQ, &Dbtup::execTUP_ALLOCREQ);
addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ); addRecSignal(GSN_TUP_DEALLOCREQ, &Dbtup::execTUP_DEALLOCREQ);
addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ); addRecSignal(GSN_TUP_WRITELOG_REQ, &Dbtup::execTUP_WRITELOG_REQ);

View File

@ -53,7 +53,10 @@ Dbtup::execACC_SCANREQ(Signal* signal)
Fragrecord& frag = *fragPtr.p; Fragrecord& frag = *fragPtr.p;
// flags // flags
Uint32 bits = 0; Uint32 bits = 0;
if (frag.m_lcp_scan_op == RNIL) {
if (!AccScanReq::getLcpScanFlag(req->requestInfo) ||
tablePtr.p->m_no_of_disk_attributes == 0)
{
// seize from pool and link to per-fragment list // seize from pool and link to per-fragment list
LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList); LocalDLList<ScanOp> list(c_scanOpPool, frag.m_scanList);
if (! list.seize(scanPtr)) { if (! list.seize(scanPtr)) {
@ -63,23 +66,25 @@ Dbtup::execACC_SCANREQ(Signal* signal)
if (!AccScanReq::getNoDiskScanFlag(req->requestInfo) if (!AccScanReq::getNoDiskScanFlag(req->requestInfo)
&& tablePtr.p->m_no_of_disk_attributes) && tablePtr.p->m_no_of_disk_attributes)
{ {
bits |= ScanOp::SCAN_DD; bits |= ScanOp::SCAN_DD;
} }
bool mm = (bits & ScanOp::SCAN_DD); bool mm = (bits & ScanOp::SCAN_DD);
if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) { if (tablePtr.p->m_attributes[mm].m_no_of_varsize > 0) {
bits |= ScanOp::SCAN_VS; bits |= ScanOp::SCAN_VS;
// disk pages have fixed page format // disk pages have fixed page format
ndbrequire(! (bits & ScanOp::SCAN_DD)); ndbrequire(! (bits & ScanOp::SCAN_DD));
} }
if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) { if (! AccScanReq::getReadCommittedFlag(req->requestInfo)) {
if (AccScanReq::getLockMode(req->requestInfo) == 0) if (AccScanReq::getLockMode(req->requestInfo) == 0)
bits |= ScanOp::SCAN_LOCK_SH; bits |= ScanOp::SCAN_LOCK_SH;
else else
bits |= ScanOp::SCAN_LOCK_EX; bits |= ScanOp::SCAN_LOCK_EX;
} }
} else { } else {
jam(); jam();
// LCP scan and disk
ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op); ndbrequire(frag.m_lcp_scan_op == c_lcp_scan_op);
c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op); c_scanOpPool.getPtr(scanPtr, frag.m_lcp_scan_op);
bits |= ScanOp::SCAN_LCP; bits |= ScanOp::SCAN_LCP;
@ -156,7 +161,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
conf->scanPtr = scan.m_userPtr; conf->scanPtr = scan.m_userPtr;
unsigned signalLength = 1; unsigned signalLength = 1;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB); signal, signalLength, JBB);
return; return;
} }
break; break;
@ -171,7 +176,7 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
lockReq->requestInfo = AccLockReq::AbortWithConf; lockReq->requestInfo = AccLockReq::AbortWithConf;
lockReq->accOpPtr = scan.m_accLockOp; lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
signal, AccLockReq::UndoSignalLength); signal, AccLockReq::UndoSignalLength);
jamEntry(); jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success); ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting; scan.m_state = ScanOp::Aborting;
@ -182,10 +187,10 @@ Dbtup::execNEXT_SCANREQ(Signal* signal)
ndbrequire(scan.m_accLockOp != RNIL); ndbrequire(scan.m_accLockOp != RNIL);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL; lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock; lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp; lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ,
signal, AccLockReq::UndoSignalLength); signal, AccLockReq::UndoSignalLength);
jamEntry(); jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success); ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL; scan.m_accLockOp = RNIL;
@ -433,7 +438,7 @@ Dbtup::execACCKEYCONF(Signal* signal)
jam(); jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL; lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock; lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp; lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry(); jamEntry();
@ -582,12 +587,15 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
Fragrecord& frag = *fragPtr.p; Fragrecord& frag = *fragPtr.p;
// tuple found // tuple found
Tuple_header* th = 0; Tuple_header* th = 0;
Uint32 thbits = 0;
Uint32 loop_count = 0; Uint32 loop_count = 0;
Uint32 scanGCI = scanPtr.p->m_scanGCI; Uint32 scanGCI = scanPtr.p->m_scanGCI;
Uint32 foundGCI; Uint32 foundGCI;
bool mm = (bits & ScanOp::SCAN_DD); const bool mm = (bits & ScanOp::SCAN_DD);
bool lcp = (bits & ScanOp::SCAN_LCP); const bool lcp = (bits & ScanOp::SCAN_LCP);
const bool dirty = (bits & ScanOp::SCAN_LOCK) == 0;
Uint32 lcp_list = fragPtr.p->m_lcp_keep_list; Uint32 lcp_list = fragPtr.p->m_lcp_keep_list;
Uint32 size = table.m_offsets[mm].m_fix_header_size + Uint32 size = table.m_offsets[mm].m_fix_header_size +
(bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0); (bits & ScanOp::SCAN_VS ? Tuple_header::HeaderSize + 1: 0);
@ -750,22 +758,22 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
{ {
pos.m_get = ScanPos::Get_next_tuple_fs; pos.m_get = ScanPos::Get_next_tuple_fs;
th = (Tuple_header*)&page->m_data[key.m_page_idx]; th = (Tuple_header*)&page->m_data[key.m_page_idx];
thbits = th->m_header_bits;
if (likely(! (bits & ScanOp::SCAN_NR))) if (likely(! (bits & ScanOp::SCAN_NR)))
{ {
if (! (th->m_header_bits & Tuple_header::FREE)) { jam();
goto found_tuple; if (! (thbits & Tuple_header::FREE))
}
else
{ {
jam(); if (! ((thbits & Tuple_header::ALLOC) && dirty))
// skip free tuple goto found_tuple;
} }
} }
else else
{ {
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI) if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
{ {
if (! (th->m_header_bits & Tuple_header::FREE)) if (! (thbits & Tuple_header::FREE))
{ {
jam(); jam();
goto found_tuple; goto found_tuple;
@ -775,9 +783,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
goto found_deleted_rowid; goto found_deleted_rowid;
} }
} }
else else if (thbits != Fix_page::FREE_RECORD &&
th->m_operation_ptr_i != RNIL)
{ {
jam(); jam();
goto found_tuple; // Locked tuple...
// skip free tuple // skip free tuple
} }
} }
@ -793,8 +803,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
jam(); jam();
{ {
// caller has already set pos.m_get to next tuple // caller has already set pos.m_get to next tuple
if (! (bits & ScanOp::SCAN_LCP && if (! (bits & ScanOp::SCAN_LCP && thbits & Tuple_header::LCP_SKIP)) {
th->m_header_bits & Tuple_header::LCP_SKIP)) {
Local_key& key_mm = pos.m_key_mm; Local_key& key_mm = pos.m_key_mm;
if (! (bits & ScanOp::SCAN_DD)) { if (! (bits & ScanOp::SCAN_DD)) {
key_mm = pos.m_key; key_mm = pos.m_key;
@ -810,7 +819,11 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
} else { } else {
jam(); jam();
// clear it so that it will show up in next LCP // clear it so that it will show up in next LCP
th->m_header_bits &= ~(Uint32)Tuple_header::LCP_SKIP; th->m_header_bits = thbits & ~(Uint32)Tuple_header::LCP_SKIP;
if (tablePtr.p->m_bits & Tablerec::TR_Checksum) {
jam();
setChecksum(th, tablePtr.p);
}
} }
} }
break; break;
@ -833,7 +846,7 @@ Dbtup::scanNext(Signal* signal, ScanOpPtr scanPtr)
th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx); th = (Tuple_header*)(mmpage->m_data + key_mm.m_page_idx);
if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI) if ((foundGCI = *th->get_mm_gci(tablePtr.p)) > scanGCI)
{ {
if (! (th->m_header_bits & Tuple_header::FREE)) if (! (thbits & Tuple_header::FREE))
break; break;
} }
} }
@ -893,7 +906,7 @@ found_lcp_keep:
NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend(); NextScanConf* const conf = (NextScanConf*)signal->getDataPtrSend();
conf->scanPtr = scan.m_userPtr; conf->scanPtr = scan.m_userPtr;
conf->accOperationPtr = RNIL + 1; conf->accOperationPtr = (Uint32)-1;
conf->fragId = frag.fragmentId; conf->fragId = frag.fragmentId;
conf->localKey[0] = lcp_list; conf->localKey[0] = lcp_list;
conf->localKey[1] = 0; conf->localKey[1] = 0;

View File

@ -321,7 +321,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
conf->scanPtr = scan.m_userPtr; conf->scanPtr = scan.m_userPtr;
unsigned signalLength = 1; unsigned signalLength = 1;
sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF, sendSignal(scanPtr.p->m_userRef, GSN_NEXT_SCANCONF,
signal, signalLength, JBB); signal, signalLength, JBB);
return; return;
} }
break; break;
@ -344,7 +344,8 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
lockReq->returnCode = RNIL; lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::AbortWithConf; lockReq->requestInfo = AccLockReq::AbortWithConf;
lockReq->accOpPtr = scan.m_accLockOp; lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
AccLockReq::UndoSignalLength);
jamEntry(); jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success); ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_state = ScanOp::Aborting; scan.m_state = ScanOp::Aborting;
@ -355,9 +356,10 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
ndbrequire(scan.m_accLockOp != RNIL); ndbrequire(scan.m_accLockOp != RNIL);
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL; lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock; lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp; lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal,
AccLockReq::UndoSignalLength);
jamEntry(); jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success); ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL; scan.m_accLockOp = RNIL;
@ -612,7 +614,7 @@ Dbtux::execACCKEYCONF(Signal* signal)
jam(); jam();
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend(); AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL; lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock; lockReq->requestInfo = AccLockReq::Abort;
lockReq->accOpPtr = scan.m_accLockOp; lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength); EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry(); jamEntry();

View File

@ -38,6 +38,7 @@
#include <mgmapi_config_parameters.h> #include <mgmapi_config_parameters.h>
int global_flag_send_heartbeat_now= 0; int global_flag_send_heartbeat_now= 0;
int global_flag_skip_invalidate_cache = 0;
// Just a C wrapper for threadMain // Just a C wrapper for threadMain
extern "C" extern "C"
@ -458,11 +459,14 @@ ClusterMgr::reportNodeFailed(NodeId nodeId){
theNode.nfCompleteRep = false; theNode.nfCompleteRep = false;
if(noOfAliveNodes == 0) if(noOfAliveNodes == 0)
{ {
theFacade.m_globalDictCache.lock(); if (!global_flag_skip_invalidate_cache)
theFacade.m_globalDictCache.invalidate_all(); {
theFacade.m_globalDictCache.unlock(); theFacade.m_globalDictCache.lock();
m_connect_count ++; theFacade.m_globalDictCache.invalidate_all();
m_cluster_state = CS_waiting_for_clean_cache; theFacade.m_globalDictCache.unlock();
m_connect_count ++;
m_cluster_state = CS_waiting_for_clean_cache;
}
NFCompleteRep rep; NFCompleteRep rep;
for(Uint32 i = 1; i<MAX_NODES; i++){ for(Uint32 i = 1; i<MAX_NODES; i++){
if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){ if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){

View File

@ -478,10 +478,14 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
*/ */
PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter, PollGuard poll_guard(tp, &theNdb->theImpl->theWaiter,
theNdb->theNdbBlockNumber); theNdb->theNdbBlockNumber);
if(theError.code)
return -1;
Uint32 seq = theNdbCon->theNodeSequence; const Uint32 seq = theNdbCon->theNodeSequence;
if(theError.code)
{
goto err4;
}
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0) if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0)
{ {
@ -564,6 +568,10 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
if(theError.code == 0) if(theError.code == 0)
setErrorCode(4028); // seq changed = Node fail setErrorCode(4028); // seq changed = Node fail
break; break;
case -4:
err4:
setErrorCode(theError.code);
break;
} }
theNdbCon->theTransactionIsStarted = false; theNdbCon->theTransactionIsStarted = false;

View File

@ -108,6 +108,8 @@ public:
NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];} NDBT_ResultRow& get_row(Uint32 idx) { return *rows[idx];}
int execute_async(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError); int execute_async(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int execute_async_prepare(Ndb*, NdbTransaction::ExecType, NdbTransaction::AbortOption = NdbTransaction::AbortOnError);
int wait_async(Ndb*, int timeout = -1); int wait_async(Ndb*, int timeout = -1);
protected: protected:

View File

@ -1090,11 +1090,6 @@ runMassiveRollback4(NDBT_Context* ctx, NDBT_Step* step){
ok = false; ok = false;
break; break;
} }
if (hugoOps.execute_NoCommit(pNdb) != 0)
{
ok = false;
break;
}
} }
hugoOps.execute_Rollback(pNdb); hugoOps.execute_Rollback(pNdb);
CHECK(hugoOps.closeTransaction(pNdb) == 0); CHECK(hugoOps.closeTransaction(pNdb) == 0);
@ -1199,6 +1194,62 @@ runTupErrors(NDBT_Context* ctx, NDBT_Step* step){
return NDBT_OK; return NDBT_OK;
} }
int
runInsertError(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOp1(*ctx->getTab());
HugoOperations hugoOp2(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
NdbRestarter restarter;
restarter.insertErrorInAllNodes(4017);
const Uint32 LOOPS = 10;
for (Uint32 i = 0; i<LOOPS; i++)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0);
CHECK(hugoOp2.startTransaction(pNdb) == 0);
CHECK(hugoOp2.pkReadRecord(pNdb, 1, 1) == 0);
CHECK(hugoOp1.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
CHECK(hugoOp2.execute_async_prepare(pNdb, NdbTransaction::Commit) == 0);
hugoOp1.wait_async(pNdb);
hugoOp2.wait_async(pNdb);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
CHECK(hugoOp2.closeTransaction(pNdb) == 0);
}
restarter.insertErrorInAllNodes(0);
return result;
}
int
runInsertError2(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOp1(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
NdbRestarter restarter;
restarter.insertErrorInAllNodes(4017);
const Uint32 LOOPS = 1;
for (Uint32 i = 0; i<LOOPS; i++)
{
CHECK(hugoOp1.startTransaction(pNdb) == 0);
CHECK(hugoOp1.pkInsertRecord(pNdb, 1) == 0);
CHECK(hugoOp1.pkDeleteRecord(pNdb, 1) == 0);
hugoOp1.execute_NoCommit(pNdb);
CHECK(hugoOp1.closeTransaction(pNdb) == 0);
}
restarter.insertErrorInAllNodes(0);
return NDBT_OK;
}
NDBT_TESTSUITE(testBasic); NDBT_TESTSUITE(testBasic);
TESTCASE("PkInsert", TESTCASE("PkInsert",
"Verify that we can insert and delete from this table using PK" "Verify that we can insert and delete from this table using PK"
@ -1449,16 +1500,16 @@ TESTCASE("MassiveTransaction",
INITIALIZER(runLoadTable2); INITIALIZER(runLoadTable2);
FINALIZER(runClearTable2); FINALIZER(runClearTable2);
} }
TESTCASE("Fill",
"Verify what happens when we fill the db" ){
INITIALIZER(runFillTable);
INITIALIZER(runPkRead);
FINALIZER(runClearTable2);
}
TESTCASE("TupError", TESTCASE("TupError",
"Verify what happens when we fill the db" ){ "Verify what happens when we fill the db" ){
INITIALIZER(runTupErrors); INITIALIZER(runTupErrors);
} }
TESTCASE("InsertError", "" ){
INITIALIZER(runInsertError);
}
TESTCASE("InsertError2", "" ){
INITIALIZER(runInsertError2);
}
NDBT_TESTSUITE_END(testBasic); NDBT_TESTSUITE_END(testBasic);
#if 0 #if 0
@ -1469,6 +1520,12 @@ TESTCASE("ReadConsistency",
STEP(runReadOne); STEP(runReadOne);
FINALIZER(runClearTable2); FINALIZER(runClearTable2);
} }
TESTCASE("Fill",
"Verify what happens when we fill the db" ){
INITIALIZER(runFillTable);
INITIALIZER(runPkRead);
FINALIZER(runClearTable2);
}
#endif #endif
int main(int argc, const char** argv){ int main(int argc, const char** argv){

View File

@ -696,7 +696,10 @@ runBug18612(NDBT_Context* ctx, NDBT_Step* step){
do { do {
int tmp = restarter.getRandomNodeOtherNodeGroup(node1, rand()); int tmp = restarter.getRandomNodeOtherNodeGroup(node1, rand());
if (tmp == -1) if (tmp == -1)
break; {
ctx->stopTest();
return NDBT_OK;
}
node1 = tmp; node1 = tmp;
} while(nodesmask.get(node1)); } while(nodesmask.get(node1));
@ -876,12 +879,15 @@ int runBug20185(NDBT_Context* ctx, NDBT_Step* step){
HugoOperations hugoOps(*ctx->getTab()); HugoOperations hugoOps(*ctx->getTab());
Ndb* pNdb = GETNDB(step); Ndb* pNdb = GETNDB(step);
const int masterNode = restarter.getMasterNodeId();
int dump[] = { 7090, 20 } ; int dump[] = { 7090, 20 } ;
if (restarter.dumpStateAllNodes(dump, 2)) if (restarter.dumpStateAllNodes(dump, 2))
return NDBT_FAILED; return NDBT_FAILED;
NdbSleep_MilliSleep(3000); NdbSleep_MilliSleep(3000);
retry:
if(hugoOps.startTransaction(pNdb) != 0) if(hugoOps.startTransaction(pNdb) != 0)
return NDBT_FAILED; return NDBT_FAILED;
@ -891,8 +897,14 @@ int runBug20185(NDBT_Context* ctx, NDBT_Step* step){
if (hugoOps.execute_NoCommit(pNdb) != 0) if (hugoOps.execute_NoCommit(pNdb) != 0)
return NDBT_FAILED; return NDBT_FAILED;
int nodeId;
const int node = hugoOps.getTransaction()->getConnectedNodeId(); const int node = hugoOps.getTransaction()->getConnectedNodeId();
if (node != masterNode)
{
hugoOps.closeTransaction(pNdb);
goto retry;
}
int nodeId;
do { do {
nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes()); nodeId = restarter.getDbNodeId(rand() % restarter.getNumDbNodes());
} while (nodeId == node); } while (nodeId == node);

View File

@ -665,9 +665,9 @@ main(int argc, const char** argv){
for(Uint32 i = 0; i < 12; i++) for(Uint32 i = 0; i < 12; i++)
{ {
if(i == 6 || i == 8 || i == 10) if(false && (i == 6 || i == 8 || i == 10))
continue; continue;
BaseString name("bug_9749"); BaseString name("bug_9749");
name.appfmt("_%d", i); name.appfmt("_%d", i);
NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts, NDBT_TestCaseImpl1 *pt = new NDBT_TestCaseImpl1(&ts,

View File

@ -286,15 +286,26 @@ int runRandScanRead(NDBT_Context* ctx, NDBT_Step* step){
int records = ctx->getNumRecords(); int records = ctx->getNumRecords();
int parallelism = ctx->getProperty("Parallelism", 240); int parallelism = ctx->getProperty("Parallelism", 240);
int abort = ctx->getProperty("AbortProb", 5); int abort = ctx->getProperty("AbortProb", 5);
int tupscan = ctx->getProperty("TupScan", (Uint32)0);
int i = 0; int i = 0;
HugoTransactions hugoTrans(*ctx->getTab()); HugoTransactions hugoTrans(*ctx->getTab());
while (i<loops && !ctx->isTestStopped()) { while (i<loops && !ctx->isTestStopped()) {
g_info << i << ": "; g_info << i << ": ";
NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3); NdbOperation::LockMode lm = (NdbOperation::LockMode)(rand() % 3);
int scan_flags = 0;
if (tupscan == 1)
scan_flags |= NdbScanOperation::SF_TupScan;
else if (tupscan == 2 && ((rand() & 0x800)))
{
scan_flags |= NdbScanOperation::SF_TupScan;
}
if (hugoTrans.scanReadRecords(GETNDB(step), if (hugoTrans.scanReadRecords(GETNDB(step),
records, abort, parallelism, records, abort, parallelism,
lm) != 0){ lm,
scan_flags) != 0){
return NDBT_FAILED; return NDBT_FAILED;
} }
i++; i++;
@ -1320,6 +1331,16 @@ TESTCASE("ScanRead488",
STEPS(runRandScanRead, 70); STEPS(runRandScanRead, 70);
FINALIZER(runClearTable); FINALIZER(runClearTable);
} }
TESTCASE("ScanRead488T",
"Verify scan requirement: It's only possible to have 11 concurrent "\
"scans per fragment running in Ndb kernel at the same time. "\
"When this limit is exceeded the scan will be aborted with errorcode "\
"488."){
TC_PROPERTY("TupScan", 1);
INITIALIZER(runLoadTable);
STEPS(runRandScanRead, 70);
FINALIZER(runClearTable);
}
TESTCASE("ScanRead488O", TESTCASE("ScanRead488O",
"Verify scan requirement: It's only possible to have 11 concurrent "\ "Verify scan requirement: It's only possible to have 11 concurrent "\
"scans per fragment running in Ndb kernel at the same time. "\ "scans per fragment running in Ndb kernel at the same time. "\
@ -1336,6 +1357,7 @@ TESTCASE("ScanRead488_Mixed",
"scans per fragment running in Ndb kernel at the same time. "\ "scans per fragment running in Ndb kernel at the same time. "\
"When this limit is exceeded the scan will be aborted with errorcode "\ "When this limit is exceeded the scan will be aborted with errorcode "\
"488."){ "488."){
TC_PROPERTY("TupScan", 2);
INITIALIZER(createOrderedPkIndex); INITIALIZER(createOrderedPkIndex);
INITIALIZER(runLoadTable); INITIALIZER(runLoadTable);
STEPS(runRandScanRead, 50); STEPS(runRandScanRead, 50);

View File

@ -199,7 +199,7 @@ max-time: 500
cmd: testBasicAsynch cmd: testBasicAsynch
args: -n PkDeleteAsynch args: -n PkDeleteAsynch
max-time: 500 max-time: 1000
cmd: testBasic cmd: testBasic
args: -n MassiveRollback T1 T7 D1 D2 args: -n MassiveRollback T1 T7 D1 D2
@ -219,6 +219,14 @@ max-time: 500
cmd: testBasic cmd: testBasic
args: -n TupError args: -n TupError
max-time: 500
cmd: testBasic
args: -n InsertError T1
max-time: 500
cmd: testBasic
args: -n InsertError2 T1
max-time: 500 max-time: 500
cmd: testTimeout cmd: testTimeout
args: T1 args: T1
@ -273,6 +281,10 @@ max-time: 500
cmd: testScan cmd: testScan
args: -n ScanRead488O -l 10 T6 D1 D2 args: -n ScanRead488O -l 10 T6 D1 D2
max-time: 1000
cmd: testScan
args: -n ScanRead488T -l 10 T6 D1 D2
max-time: 1000 max-time: 1000
cmd: testScan cmd: testScan
args: -n ScanRead488_Mixed -l 10 T6 D1 D2 args: -n ScanRead488_Mixed -l 10 T6 D1 D2

View File

@ -471,16 +471,33 @@ HugoOperations::execute_async(Ndb* pNdb, NdbTransaction::ExecType et,
return NDBT_OK; return NDBT_OK;
} }
int
HugoOperations::execute_async_prepare(Ndb* pNdb, NdbTransaction::ExecType et,
NdbTransaction::AbortOption eao){
m_async_reply= 0;
pTrans->executeAsynchPrepare(et,
HugoOperations_async_callback,
this,
eao);
return NDBT_OK;
}
int int
HugoOperations::wait_async(Ndb* pNdb, int timeout) HugoOperations::wait_async(Ndb* pNdb, int timeout)
{ {
pNdb->pollNdb(1000); volatile int * wait = &m_async_reply;
while (!* wait)
if(m_async_reply)
{ {
if(m_async_return) pNdb->sendPollNdb(1000);
ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
return m_async_return; if(* wait)
{
if(m_async_return)
ndbout << "ERROR: " << pNdb->getNdbError(m_async_return) << endl;
return m_async_return;
}
} }
ndbout_c("wait returned nothing..."); ndbout_c("wait returned nothing...");
return -1; return -1;

View File

@ -1110,6 +1110,7 @@ static int opt_timer;
static char * opt_remote_mgm = NULL; static char * opt_remote_mgm = NULL;
static char * opt_testname = NULL; static char * opt_testname = NULL;
static int opt_verbose; static int opt_verbose;
static int opt_seed = 0;
static struct my_option my_long_options[] = static struct my_option my_long_options[] =
{ {
@ -1129,6 +1130,9 @@ static struct my_option my_long_options[] =
{ "loops", 'l', "Number of loops", { "loops", 'l', "Number of loops",
(gptr*) &opt_loops, (gptr*) &opt_loops, 0, (gptr*) &opt_loops, (gptr*) &opt_loops, 0,
GET_INT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 }, GET_INT, REQUIRED_ARG, 5, 0, 0, 0, 0, 0 },
{ "seed", 1024, "Random seed",
(gptr*) &opt_seed, (gptr*) &opt_seed, 0,
GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "testname", 'n', "Name of test to run", { "testname", 'n', "Name of test to run",
(gptr*) &opt_testname, (gptr*) &opt_testname, 0, (gptr*) &opt_testname, (gptr*) &opt_testname, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
@ -1145,6 +1149,8 @@ static struct my_option my_long_options[] =
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
}; };
extern int global_flag_skip_invalidate_cache;
static void usage() static void usage()
{ {
ndb_std_print_version(); ndb_std_print_version();
@ -1224,6 +1230,16 @@ int NDBT_TestSuite::execute(int argc, const char** argv){
{ {
return NDBT_ProgramExit(NDBT_FAILED); return NDBT_ProgramExit(NDBT_FAILED);
} }
if (opt_seed == 0)
{
opt_seed = NdbTick_CurrentMillisecond();
}
ndbout_c("random seed: %u", opt_seed);
srand(opt_seed);
srandom(opt_seed);
global_flag_skip_invalidate_cache = 1;
{ {
Ndb ndb(&con, "TEST_DB"); Ndb ndb(&con, "TEST_DB");