Merge mskold@bk-internal.mysql.com:/home/bk/mysql-5.0
into  mysql.com:/windows/Linux_space/MySQL/mysql-5.0-ndb

mskold/marty@mysql.com/linux.site  2006-12-04 15:09:47 +01:00
commit 6e8f99d29b
36 changed files with 685 additions and 160 deletions

View File

@@ -140,7 +140,40 @@ b int unsigned not null,
 c int unsigned,
 UNIQUE USING HASH (b, c)
 ) engine=ndbcluster;
-ERROR 42000: Column 'c' is used with UNIQUE or INDEX but is not defined as NOT NULL
+Warnings:
+Warning 1121 Ndb does not support unique index on NULL valued attributes, index access with NULL value will become full table scan
+insert t2 values(1,1,NULL),(2,2,2),(3,3,NULL),(4,4,4),(5,5,NULL),(6,6,6),(7,7,NULL),(8,3,NULL),(9,3,NULL);
+select * from t2 where c IS NULL order by a;
+a b c
+1 1 NULL
+3 3 NULL
+5 5 NULL
+7 7 NULL
+8 3 NULL
+9 3 NULL
+select * from t2 where b = 3 AND c IS NULL order by a;
+a b c
+3 3 NULL
+8 3 NULL
+9 3 NULL
+select * from t2 where (b = 3 OR b = 5) AND c IS NULL order by a;
+a b c
+3 3 NULL
+5 5 NULL
+8 3 NULL
+9 3 NULL
+set @old_ecpd = @@session.engine_condition_pushdown;
+set engine_condition_pushdown = true;
+explain select * from t2 where (b = 3 OR b = 5) AND c IS NULL AND a < 9 order by a;
+id select_type table type possible_keys key key_len ref rows Extra
+1 SIMPLE t2 range PRIMARY,b PRIMARY 4 NULL 1 Using where with pushed condition
+select * from t2 where (b = 3 OR b = 5) AND c IS NULL AND a < 9 order by a;
+a b c
+3 3 NULL
+5 5 NULL
+8 3 NULL
+set engine_condition_pushdown = @old_ecpd;
+drop table t2;
 CREATE TABLE t3 (
 a int unsigned NOT NULL,
 b int unsigned not null,

View File

@@ -17,8 +17,17 @@ pk1 b c
 0 0 0
 2 2 2
 4 1 1
+UPDATE t1 set pk1 = 4 where pk1 = 2;
+ERROR 23000: Duplicate entry '4' for key 1
+UPDATE IGNORE t1 set pk1 = 4 where pk1 = 2;
+select * from t1 order by pk1;
+pk1 b c
+0 0 0
+2 2 2
+4 1 1
 UPDATE t1 set pk1 = 1, c = 2 where pk1 = 4;
 ERROR 23000: Duplicate entry '' for key 0
+UPDATE IGNORE t1 set pk1 = 1, c = 2 where pk1 = 4;
 select * from t1 order by pk1;
 pk1 b c
 0 0 0

View File

@@ -85,7 +85,6 @@ select * from t2 order by a;
 drop table t2;

--- error 1121
 CREATE TABLE t2 (
 a int unsigned NOT NULL PRIMARY KEY,
 b int unsigned not null,
@@ -93,6 +92,20 @@ CREATE TABLE t2 (
 UNIQUE USING HASH (b, c)
 ) engine=ndbcluster;
+insert t2 values(1,1,NULL),(2,2,2),(3,3,NULL),(4,4,4),(5,5,NULL),(6,6,6),(7,7,NULL),(8,3,NULL),(9,3,NULL);
+select * from t2 where c IS NULL order by a;
+select * from t2 where b = 3 AND c IS NULL order by a;
+select * from t2 where (b = 3 OR b = 5) AND c IS NULL order by a;
+set @old_ecpd = @@session.engine_condition_pushdown;
+set engine_condition_pushdown = true;
+explain select * from t2 where (b = 3 OR b = 5) AND c IS NULL AND a < 9 order by a;
+select * from t2 where (b = 3 OR b = 5) AND c IS NULL AND a < 9 order by a;
+set engine_condition_pushdown = @old_ecpd;
+drop table t2;
+
 #
 # Show use of PRIMARY KEY USING HASH indexes
 #

View File

@@ -23,8 +23,13 @@ UPDATE t1 set b = c;
 select * from t1 order by pk1;
 UPDATE t1 set pk1 = 4 where pk1 = 1;
 select * from t1 order by pk1;
+--error 1062
+UPDATE t1 set pk1 = 4 where pk1 = 2;
+UPDATE IGNORE t1 set pk1 = 4 where pk1 = 2;
+select * from t1 order by pk1;
 -- error 1062
 UPDATE t1 set pk1 = 1, c = 2 where pk1 = 4;
+UPDATE IGNORE t1 set pk1 = 1, c = 2 where pk1 = 4;
 select * from t1 order by pk1;
 UPDATE t1 set pk1 = pk1 + 10;
 select * from t1 order by pk1;

View File

@@ -93,7 +93,7 @@ typedef ndberror_classification_enum ndberror_classification;
 const char *ndberror_status_message(ndberror_status);
 const char *ndberror_classification_message(ndberror_classification);
 void ndberror_update(ndberror_struct *);
-int ndb_error_string(int err_no, char *str, unsigned int size);
+int ndb_error_string(int err_no, char *str, int size);

 #endif /* doxygen skip internal*/

View File

@@ -123,13 +123,25 @@ bool
 File_class::close()
 {
   bool rc = true;
+  int retval = 0;
+
   if (m_file != NULL)
   {
     ::fflush(m_file);
-    rc = (::fclose(m_file) == 0 ? true : false);
-    m_file = NULL; // Try again?
+    retval = ::fclose(m_file);
+    while ( (retval != 0) && (errno == EINTR) ){
+      retval = ::fclose(m_file);
+    }
+    if( retval == 0){
+      rc = true;
+    }
+    else {
+      rc = false;
+      ndbout_c("ERROR: Close file error in File.cpp for %s", strerror(errno));
+    }
   }
+  m_file = NULL;
+
   return rc;
 }
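Aside: the patched close() is the classic retry-on-EINTR idiom applied to fclose(). A minimal standalone sketch of the same idiom (illustration only, not code from the commit; note that POSIX leaves a stream in an undefined state after a failed fclose(), so the retry mirrors the commit rather than a portability guarantee):

    #include <cstdio>
    #include <cerrno>

    // Retry-on-EINTR sketch: repeat the call while it fails because a
    // signal interrupted it; treat the handle as gone afterwards either way.
    static bool close_checked(FILE*& f)
    {
      if (f == NULL)
        return true;
      int rc;
      do {
        rc = ::fclose(f);
      } while (rc != 0 && errno == EINTR);
      f = NULL;
      return rc == 0;
    }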

View File

@@ -65,6 +65,5 @@ SocketInputStream::gets(char * buf, int bufLen) {
   if(res == -1)
     return 0;

   return buf;
 }

View File

@@ -136,7 +136,7 @@ NdbSqlUtil::m_typeList[] = {
   },
   { // 22
     Type::Bit,
-    NULL,
+    cmpBit,
     NULL
   },
   { // 23
@@ -678,6 +678,17 @@ NdbSqlUtil::cmpText(const void* info, const void* p1, unsigned n1, const void* p
   return 0;
 }

+int
+NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
+{
+  Uint32 n = (n1 < n2) ? n1 : n2;
+  char* c1 = (char*)p1;
+  char* c2 = (char*)p2;
+  int ret = memcmp(p1, p2, n);
+  return ret;
+}
+
 int
 NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
 {
@@ -698,12 +709,6 @@ NdbSqlUtil::cmpTime(const void* info, const void* p1, unsigned n1, const void* p
 }

 // not yet
-int
-NdbSqlUtil::cmpBit(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
-{
-  assert(false);
-  return 0;
-}

 int
 NdbSqlUtil::cmpLongvarchar(const void* info, const void* p1, unsigned n1, const void* p2, unsigned n2, bool full)
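For reference, the new comparator reduces to a bytewise memcmp over the common prefix of the two values (the info/full parameters and the c1/c2 locals above are accepted but unused), so Bit columns order by their packed byte representation. Standalone sketch:

    #include <cstring>

    // Bytewise Bit-column comparison: compare only the common prefix of
    // the two packed values; the shorter length bounds the comparison.
    static int cmp_bit_sketch(const void* p1, unsigned n1,
                              const void* p2, unsigned n2)
    {
      unsigned n = (n1 < n2) ? n1 : n2;
      return memcmp(p1, p2, n);
    }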

View File

@@ -53,10 +53,6 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
   if(buflen <= 1)
     return 0;

-  int sock_flags= fcntl(socket, F_GETFL);
-  if(fcntl(socket, F_SETFL, sock_flags | O_NONBLOCK) == -1)
-    return -1;
-
   fd_set readset;
   FD_ZERO(&readset);
   FD_SET(socket, &readset);
@@ -71,43 +67,70 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
   }

   if(selectRes == -1){
-    fcntl(socket, F_SETFL, sock_flags);
     return -1;
   }

-  const int t = recv(socket, buf, buflen, MSG_PEEK);
-
-  if(t < 1)
-  {
-    fcntl(socket, F_SETFL, sock_flags);
-    return -1;
-  }
-
-  for(int i=0; i< t;i++)
-  {
-    if(buf[i] == '\n'){
-      int r= recv(socket, buf, i+1, 0);
-      buf[i+1]= 0;
-      if(r < 1) {
-        fcntl(socket, F_SETFL, sock_flags);
-        return -1;
-      }
-      if(i > 0 && buf[i-1] == '\r'){
-        buf[i-1] = '\n';
-        buf[i]= '\0';
-      }
-      fcntl(socket, F_SETFL, sock_flags);
-      return r;
-    }
-  }
-
-  int r= recv(socket, buf, t, 0);
-  if(r>=0)
-    buf[r] = 0;
-  fcntl(socket, F_SETFL, sock_flags);
-  return r;
+  char* ptr = buf;
+  int len = buflen;
+  do
+  {
+    int t;
+    while((t = recv(socket, ptr, len, MSG_PEEK)) == -1 && errno == EINTR);
+
+    if(t < 1)
+    {
+      return -1;
+    }
+
+    for(int i = 0; i<t; i++)
+    {
+      if(ptr[i] == '\n')
+      {
+        /**
+         * Now consume
+         */
+        for (len = 1 + i; len; )
+        {
+          while ((t = recv(socket, ptr, len, 0)) == -1 && errno == EINTR);
+          if (t < 1)
+            return -1;
+          ptr += t;
+          len -= t;
+        }
+
+        if (i > 0 && buf[i-1] == '\r')
+        {
+          buf[i-1] = '\n';
+          ptr--;
+        }
+
+        ptr[0]= 0;
+        return ptr - buf;
+      }
+    }
+
+    for (int tmp = t; tmp; )
+    {
+      while ((t = recv(socket, ptr, tmp, 0)) == -1 && errno == EINTR);
+      if (t < 1)
+      {
+        return -1;
+      }
+      ptr += t;
+      len -= t;
+      tmp -= t;
+    }
+
+    FD_ZERO(&readset);
+    FD_SET(socket, &readset);
+    timeout.tv_sec = (timeout_millis / 1000);
+    timeout.tv_usec = (timeout_millis % 1000) * 1000;
+    const int selectRes = select(socket + 1, &readset, 0, 0, &timeout);
+    if(selectRes != 1){
+      return -1;
+    }
+  } while (len > 0);
+
+  return -1;
 }

 extern "C"

View File

@@ -6,7 +6,7 @@ Next DBTUP 4014
 Next DBLQH 5043
 Next DBDICT 6007
 Next DBDIH 7178
-Next DBTC 8038
+Next DBTC 8039
 Next CMVMI 9000
 Next BACKUP 10022
 Next DBUTIL 11002
@@ -287,6 +287,11 @@ ABORT OF TCKEYREQ
 8037 : Invalid schema version in TCINDXREQ

+------
+
+8038 : Simulate API disconnect just after SCAN_TAB_REQ
+
 CMVMI
 -----
 9000 Set RestartOnErrorInsert to restart -n

View File

@@ -91,6 +91,7 @@ Cmvmi::Cmvmi(const Configuration & conf) :
   addRecSignal(GSN_DUMP_STATE_ORD, &Cmvmi::execDUMP_STATE_ORD);
   addRecSignal(GSN_TESTSIG, &Cmvmi::execTESTSIG);
+  addRecSignal(GSN_NODE_START_REP, &Cmvmi::execNODE_START_REP, true);

   subscriberPool.setSize(5);
@@ -340,6 +341,16 @@ void Cmvmi::execSTTOR(Signal* signal)
   jamEntry();
   if (theStartPhase == 1){
     jam();
+
+    if(theConfig.lockPagesInMainMemory())
+    {
+      int res = NdbMem_MemLockAll();
+      if(res != 0){
+        g_eventLogger.warning("Failed to memlock pages");
+        warningEvent("Failed to memlock pages");
+      }
+    }
+
     sendSTTORRY(signal);
     return;
   } else if (theStartPhase == 3) {
@@ -359,18 +370,6 @@ void Cmvmi::execSTTOR(Signal* signal)
     signal->theData[2] = NodeInfo::REP;
     execOPEN_COMREQ(signal);
     globalData.theStartLevel = NodeState::SL_STARTED;
-    sendSTTORRY(signal);
-  } else {
-    jam();
-    if(theConfig.lockPagesInMainMemory()){
-      int res = NdbMem_MemLockAll();
-      if(res != 0){
-        g_eventLogger.warning("Failed to memlock pages");
-        warningEvent("Failed to memlock pages");
-      }
-    }
     sendSTTORRY(signal);
   }
 }
@@ -425,7 +424,8 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal)
   if(len == 2){
 #ifdef ERROR_INSERT
-    if (! (ERROR_INSERTED(9000) && c_error_9000_nodes_mask.get(tStartingNode)))
+    if (! ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
+           && c_error_9000_nodes_mask.get(tStartingNode)))
 #endif
     {
       globalTransporterRegistry.do_connect(tStartingNode);
@@ -446,7 +446,8 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal)
       jam();
 #ifdef ERROR_INSERT
-      if (ERROR_INSERTED(9000) && c_error_9000_nodes_mask.get(i))
+      if ((ERROR_INSERTED(9000) || ERROR_INSERTED(9002))
+          && c_error_9000_nodes_mask.get(i))
         continue;
 #endif
@@ -697,6 +698,7 @@ Cmvmi::execTEST_ORD(Signal * signal){
     // Do nothing
     break;
   }
+  globalSignalLoggers.flushSignalLog();
 }
 #endif
@@ -1144,9 +1146,9 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal)
   }

 #ifdef ERROR_INSERT
-  if (arg == 9000)
+  if (arg == 9000 || arg == 9002)
   {
-    SET_ERROR_INSERT_VALUE(9000);
+    SET_ERROR_INSERT_VALUE(arg);
     for (Uint32 i = 1; i<signal->getLength(); i++)
       c_error_9000_nodes_mask.set(signal->theData[i]);
   }
@@ -1193,6 +1195,17 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal)
 #endif
 }//Cmvmi::execDUMP_STATE_ORD()

+void
+Cmvmi::execNODE_START_REP(Signal* signal)
+{
+#ifdef ERROR_INSERT
+  if (ERROR_INSERTED(9002) && signal->theData[0] == getOwnNodeId())
+  {
+    signal->theData[0] = 9001;
+    execDUMP_STATE_ORD(signal);
+  }
+#endif
+}
+
 BLOCK_FUNCTIONS(Cmvmi)
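The execSTTOR hunks move the LockPagesInMainMemory handling from the last start phase to phase 1, so pages are pinned before the kernel blocks allocate their large record arrays. NdbMem_MemLockAll presumably wraps mlockall(); a sketch of that assumption (the MCL_FUTURE flag here is my guess, not confirmed by the diff):

    #include <sys/mman.h>

    // Assumed shape of NdbMem_MemLockAll(): pin current (and future) pages
    // so the data node cannot be paged out; returns 0 on success.
    static int mem_lock_all()
    {
      return mlockall(MCL_CURRENT | MCL_FUTURE);
    }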

View File

@@ -72,7 +72,8 @@ private:
   void handleSET_VAR_REQ(Signal* signal);

   void execTESTSIG(Signal* signal);
+  void execNODE_START_REP(Signal* signal);

   char theErrorMessage[256];
   void sendSTTORRY(Signal* signal);

View File

@@ -10184,6 +10184,15 @@ Dbdict::checkDictLockQueue(Signal* signal, bool poll)
       break;
     }

+    if (c_blockState != BS_IDLE)
+    {
+      /**
+       * If state is BS_NODE_FAILURE, it might be that no op is running
+       */
+      jam();
+      break;
+    }
+
     ndbrequire(c_blockState == BS_IDLE);
     lockPtr.p->locked = true;
     c_blockState = lockPtr.p->lt->blockState;

View File

@@ -1602,7 +1602,7 @@ private:
   void startInfoReply(Signal *, Uint32 nodeId);

   // DIH specifics for execNODE_START_REP (sendDictUnlockOrd)
-  void exec_node_start_rep(Signal* signal);
+  void execNODE_START_REP(Signal* signal);

   /*
    * Lock master DICT.  Only current use is by starting node

View File

@@ -269,7 +269,8 @@ Dbdih::Dbdih(const class Configuration & config):
   addRecSignal(GSN_DICT_LOCK_CONF, &Dbdih::execDICT_LOCK_CONF);
   addRecSignal(GSN_DICT_LOCK_REF, &Dbdih::execDICT_LOCK_REF);
+  addRecSignal(GSN_NODE_START_REP, &Dbdih::execNODE_START_REP, true);

   apiConnectRecord = 0;
   connectRecord = 0;
   fileRecord = 0;

View File

@@ -1390,7 +1390,7 @@ void Dbdih::execNDB_STTOR(Signal* signal)
 }//Dbdih::execNDB_STTOR()

 void
-Dbdih::exec_node_start_rep(Signal* signal)
+Dbdih::execNODE_START_REP(Signal* signal)
 {
   /*
    * Send DICT_UNLOCK_ORD when this node is SL_STARTED.

View File

@@ -236,6 +236,7 @@
 #define ZOPERATION_EVENT_REP 19
 #define ZPREP_DROP_TABLE 20
 #define ZENABLE_EXPAND_CHECK 21
+#define ZRETRY_TCKEYREF 22

 /* ------------------------------------------------------------------------- */
 /*       NODE STATE DURING SYSTEM RESTART, VARIABLES CNODES_SR_STATE         */
@@ -2276,6 +2277,7 @@ private:
   void releaseScanrec(Signal* signal);
   void seizeScanrec(Signal* signal);
   Uint32 sendKeyinfo20(Signal* signal, ScanRecord *, TcConnectionrec *);
+  void sendTCKEYREF(Signal*, Uint32 dst, Uint32 route, Uint32 cnt);
   void sendScanFragConf(Signal* signal, Uint32 scanCompleted);
   void initCopyrec(Signal* signal);
   void initCopyTc(Signal* signal);

View File

@@ -464,6 +464,22 @@ void Dblqh::execCONTINUEB(Signal* signal)
       return;
     }
   }
+  case ZRETRY_TCKEYREF:
+  {
+    jam();
+    Uint32 cnt = signal->theData[1];
+    Uint32 ref = signal->theData[2];
+    if (cnt < (10 * 60 * 5))
+    {
+      jam();
+      /**
+       * Only retry for 5 minutes...then hope that API has handled it..somehow
+       */
+      memmove(signal->theData, signal->theData+3, 4*TcKeyRef::SignalLength);
+      sendTCKEYREF(signal, ref, 0, cnt);
+    }
+    return;
+  }
   default:
     ndbrequire(false);
     break;
@@ -2370,7 +2386,7 @@ void Dblqh::noFreeRecordLab(Signal* signal,
     tcKeyRef->transId[0] = transid1;
     tcKeyRef->transId[1] = transid2;
     tcKeyRef->errorCode = errCode;
-    sendSignal(apiRef, GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
+    sendTCKEYREF(signal, apiRef, signal->getSendersBlockRef(), 0);
   } else {
     jam();
@@ -6576,8 +6592,7 @@ void Dblqh::continueAfterLogAbortWriteLab(Signal* signal)
     tcKeyRef->transId[0] = regTcPtr->transid[0];
     tcKeyRef->transId[1] = regTcPtr->transid[1];
     tcKeyRef->errorCode = regTcPtr->errorCode;
-    sendSignal(regTcPtr->applRef,
-               GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
+    sendTCKEYREF(signal, regTcPtr->applRef, regTcPtr->clientBlockref, 0);
     cleanUp(signal);
     return;
   }//if
@@ -6612,6 +6627,29 @@ void Dblqh::continueAfterLogAbortWriteLab(Signal* signal)
   cleanUp(signal);
 }//Dblqh::continueAfterLogAbortWriteLab()

+void
+Dblqh::sendTCKEYREF(Signal* signal, Uint32 ref, Uint32 routeRef, Uint32 cnt)
+{
+  const Uint32 nodeId = refToNode(ref);
+  const bool connectedToNode = getNodeInfo(nodeId).m_connected;
+
+  if (likely(connectedToNode))
+  {
+    jam();
+    sendSignal(ref, GSN_TCKEYREF, signal, TcKeyRef::SignalLength, JBB);
+  }
+  else
+  {
+    jam();
+    memmove(signal->theData + 3, signal->theData, 4*TcKeyRef::SignalLength);
+    signal->theData[0] = ZRETRY_TCKEYREF;
+    signal->theData[1] = cnt + 1;
+    signal->theData[2] = ref;
+    sendSignalWithDelay(reference(), GSN_CONTINUEB, signal, 100,
+                        TcKeyRef::SignalLength + 3);
+  }
+}
+
 /* ##########################################################################
  *               ####### MODULE TO HANDLE TC FAILURE  #######
 *
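Arithmetic behind the "5 minutes" comment in execCONTINUEB: the TCKEYREF is re-queued via CONTINUEB with a 100 ms delay, and retries are bounded by 10 * 60 * 5 attempts, which is exactly a five-minute window. A compile-time restatement (illustrative only):

    // Retry budget of the delayed TCKEYREF resend.
    constexpr int kDelayMs    = 100;          // sendSignalWithDelay(..., 100, ...)
    constexpr int kMaxRetries = 10 * 60 * 5;  // bound tested in execCONTINUEB
    static_assert(kMaxRetries * kDelayMs == 5 * 60 * 1000,
                  "3000 retries at 100 ms equal five minutes");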

View File

@@ -942,7 +942,8 @@ public:
     NF_CHECK_SCAN = 0x2,
     NF_CHECK_TRANSACTION = 0x4,
     NF_CHECK_DROP_TAB = 0x8,
-    NF_NODE_FAIL_BITS = 0xF // All bits...
+    NF_NODE_FAIL_BITS = 0xF, // All bits...
+    NF_STARTED = 0x10
   };
   Uint32 m_nf_bits;
   NdbNodeBitmask m_lqh_trans_conf;
@@ -1319,6 +1320,7 @@ private:
   void execCOMMITCONF(Signal* signal);
   void execABORTCONF(Signal* signal);
   void execNODE_FAILREP(Signal* signal);
+  void execNODE_START_REP(Signal* signal);
   void execINCL_NODEREQ(Signal* signal);
   void execTIME_SIGNAL(Signal* signal);
   void execAPI_FAILREQ(Signal* signal);

View File

@@ -290,7 +290,8 @@ Dbtc::Dbtc(const class Configuration & conf):
   addRecSignal(GSN_WAIT_DROP_TAB_CONF, &Dbtc::execWAIT_DROP_TAB_CONF);
   addRecSignal(GSN_ALTER_TAB_REQ, &Dbtc::execALTER_TAB_REQ);
+  addRecSignal(GSN_NODE_START_REP, &Dbtc::execNODE_START_REP, true);

   cacheRecord = 0;
   apiConnectRecord = 0;
   tcConnectRecord = 0;

View File

@@ -3068,7 +3068,28 @@ void Dbtc::tckeyreq050Lab(Signal* signal)
       }//if
     }//for
   }
-  }//if
+    if (regTcPtr->tcNodedata[0] != getOwnNodeId())
+    {
+      jam();
+      for (Uint32 i = 0; i < tnoOfBackup + 1; i++)
+      {
+        HostRecordPtr hostPtr;
+        hostPtr.i = regTcPtr->tcNodedata[i];
+        ptrCheckGuard(hostPtr, chostFilesize, hostRecord);
+        if (hostPtr.p->m_nf_bits & HostRecord::NF_STARTED)
+        {
+          jam();
+          if (i != 0)
+          {
+            jam();
+            regTcPtr->tcNodedata[0] = hostPtr.i;
+          }
+          break;
+        }
+      }
+    }//if
+  }
   jam();
   regTcPtr->lastReplicaNo = 0;
   regTcPtr->noOfNodes = 1;
@@ -7002,6 +7023,19 @@ void Dbtc::execNODE_FAILREP(Signal* signal)
   }
 }//Dbtc::execNODE_FAILREP()

+void
+Dbtc::execNODE_START_REP(Signal* signal)
+{
+  Uint32 nodeId = signal->theData[0];
+  hostptr.i = nodeId;
+  ptrCheckGuard(hostptr, chostFilesize, hostRecord);
+  if (hostptr.p->m_nf_bits == 0)
+  {
+    jam();
+    hostptr.p->m_nf_bits |= HostRecord::NF_STARTED;
+  }
+}
+
 void
 Dbtc::checkNodeFailComplete(Signal* signal,
                             Uint32 failedNodeId,
@@ -8701,6 +8735,20 @@ void Dbtc::execSCAN_TABREQ(Signal* signal)
    *  IF ANY TO RECEIVE.
    **********************************************************/
   scanptr.p->scanState = ScanRecord::WAIT_AI;
+
+  if (ERROR_INSERTED(8038))
+  {
+    /**
+     * Force API_FAILREQ
+     */
+    DisconnectRep * const rep = (DisconnectRep *)signal->getDataPtrSend();
+    rep->nodeId = refToNode(apiConnectptr.p->ndbapiBlockref);
+    rep->err = 8038;
+
+    EXECUTE_DIRECT(CMVMI, GSN_DISCONNECT_REP, signal, 2);
+    CLEAR_ERROR_INSERT_VALUE;
+  }
+
   return;

 SCAN_error_check:
@@ -8790,6 +8838,7 @@ void Dbtc::initScanrec(ScanRecordPtr scanptr,
     jam();
     ScanFragRecPtr ptr;
     ndbrequire(list.seize(ptr));
+    ptr.p->scanFragState = ScanFragRec::IDLE;
     ptr.p->scanRec = scanptr.i;
     ptr.p->scanFragId = 0;
     ptr.p->m_apiPtr = cdata[i];
@@ -9582,9 +9631,17 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
   ScanRecord* scanP = scanPtr.p;

   ndbrequire(scanPtr.p->scanState != ScanRecord::IDLE);
+  ScanRecord::ScanState old = scanPtr.p->scanState;
   scanPtr.p->scanState = ScanRecord::CLOSING_SCAN;
   scanPtr.p->m_close_scan_req = req_received;

+  if (old == ScanRecord::WAIT_FRAGMENT_COUNT)
+  {
+    jam();
+    scanPtr.p->scanState = old;
+    return; // Will continue on execDI_FCOUNTCONF
+  }
+
   /**
    * Queue : Action
    * ============= : =================
@@ -9612,11 +9669,22 @@ Dbtc::close_scan_req(Signal* signal, ScanRecordPtr scanPtr, bool req_received){
     ScanFragRecPtr curr = ptr; // Remove while iterating...
     running.next(ptr);

-    if(curr.p->scanFragState == ScanFragRec::WAIT_GET_PRIMCONF){
+    switch(curr.p->scanFragState){
+    case ScanFragRec::IDLE:
+      jam(); // real early abort
+      ndbrequire(old == ScanRecord::WAIT_AI);
+      running.release(curr);
+      continue;
+    case ScanFragRec::WAIT_GET_PRIMCONF:
       jam();
       continue;
+    case ScanFragRec::LQH_ACTIVE:
+      jam();
+      break;
+    default:
+      jamLine(curr.p->scanFragState);
+      ndbrequire(false);
     }
-    ndbrequire(curr.p->scanFragState == ScanFragRec::LQH_ACTIVE);

     curr.p->startFragTimer(ctcTimer);
     curr.p->scanFragState = ScanFragRec::LQH_ACTIVE;
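The tckeyreq050Lab change reroutes committed reads: when the selected primary replica is not the local node, DBTC now takes the first replica (primary included) whose host carries NF_STARTED, so reads avoid nodes that have not finished starting. The selection rule restated as a standalone sketch (container types are illustrative, not the kernel's):

    #include <cstdint>
    #include <vector>

    // Pick the read target: first node in replica order whose host is
    // marked started; if none is, keep the original primary (index 0).
    static uint32_t pick_read_node(const std::vector<uint32_t>& replicas,
                                   const std::vector<bool>& started_by_node)
    {
      for (uint32_t node : replicas)
        if (started_by_node[node])
          return node;
      return replicas[0];
    }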

View File

@@ -196,6 +196,7 @@
 #define ZTRY_TO_UPDATE_ERROR 888
 #define ZCALL_ERROR 890
 #define ZTEMPORARY_RESOURCE_FAILURE 891
+#define ZUNSUPPORTED_BRANCH 892

 #define ZSTORED_SEIZE_ATTRINBUFREC_ERROR 873 // Part of Scan

View File

@@ -444,6 +444,11 @@ int Dbtup::TUPKEY_abort(Signal* signal, int error_type)
     }//if
     break;

+  case 40:
+    ljam();
+    terrorCode = ZUNSUPPORTED_BRANCH;
+    break;
+
   default:
     ndbrequire(false);
     break;

View File

@@ -1876,6 +1876,11 @@ int Dbtup::interpreterNextLab(Signal* signal,
       // NULL==NULL and NULL<not-NULL
       res1 = r1_null && r2_null ? 0 : r1_null ? -1 : 1;
     } else {
+      jam();
+      if (unlikely(sqlType.m_cmp == 0))
+      {
+        return TUPKEY_abort(signal, 40);
+      }
       res1 = (*sqlType.m_cmp)(cs, s1, attrLen, s2, argLen, true);
     }
   } else {
@@ -1883,6 +1888,11 @@ int Dbtup::interpreterNextLab(Signal* signal,
       // NULL like NULL is true (has no practical use)
       res1 = r1_null && r2_null ? 0 : -1;
     } else {
+      jam();
+      if (unlikely(sqlType.m_like == 0))
+      {
+        return TUPKEY_abort(signal, 40);
+      }
       res1 = (*sqlType.m_like)(cs, s1, attrLen, s2, argLen);
     }
   }

View File

@@ -658,24 +658,26 @@ SimulatedBlock::allocRecord(const char * type, size_t s, size_t n, bool clear)
   void * p = NULL;
   size_t size = n*s;
+  Uint64 real_size = (Uint64)((Uint64)n)*((Uint64)s);
   refresh_watch_dog();
-  if (size > 0){
+  if (real_size > 0){
 #ifdef VM_TRACE_MEM
-    ndbout_c("%s::allocRecord(%s, %u, %u) = %u bytes",
+    ndbout_c("%s::allocRecord(%s, %u, %u) = %llu bytes",
              getBlockName(number()),
              type,
              s,
             n,
-            size);
+            real_size);
 #endif
-    p = ndbd_malloc(size);
+    if( real_size == (Uint64)size )
+      p = ndbd_malloc(size);
     if (p == NULL){
       char buf1[255];
       char buf2[255];
       BaseString::snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s",
                            getBlockName(number()), type);
-      BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %u bytes",
-                           (Uint32)s, (Uint32)n, (Uint32)size);
+      BaseString::snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %llu bytes",
+                           (Uint32)s, (Uint32)n, (Uint64)real_size);
       ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
     }
@@ -916,15 +918,6 @@ SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){

 void
 SimulatedBlock::execNODE_START_REP(Signal* signal)
-{
-  // common stuff for all blocks
-  // block specific stuff by virtual method override (default empty)
-  exec_node_start_rep(signal);
-}
-
-void
-SimulatedBlock::exec_node_start_rep(Signal* signal)
 {
 }
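The allocRecord hunk is an integer-overflow guard: the byte count n*s is recomputed in 64 bits, and ndbd_malloc runs only when that value round-trips through size_t unchanged, so a 32-bit wraparound now hits the fatal "could not allocate" path with the true requested size instead of silently returning a too-small buffer. The guard in isolation (a sketch, not the block's code):

    #include <cstdint>
    #include <cstdlib>

    // Overflow-checked allocation: compute n*s in 64 bits first; if the
    // product does not fit in size_t, refuse to allocate (caller reports).
    static void* alloc_record_checked(size_t s, size_t n)
    {
      uint64_t real_size = (uint64_t)n * (uint64_t)s;
      size_t size = n * s;                 // may wrap on 32-bit platforms
      if (real_size != (uint64_t)size)
        return NULL;
      return malloc(size);
    }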

View File

@@ -424,7 +424,6 @@ private:
   void execSIGNAL_DROPPED_REP(Signal* signal);
   void execCONTINUE_FRAGMENTED(Signal* signal);
   void execNODE_START_REP(Signal* signal);
-  virtual void exec_node_start_rep(Signal* signal);

   Uint32 c_fragmentIdCounter;
   ArrayPool<FragmentInfo> c_fragmentInfoPool;

View File

@@ -1337,7 +1337,7 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
   if (EventLoggerBase::event_lookup(eventType,cat,threshold,severity,textF))
     DBUG_VOID_RETURN;

-  char m_text[256];
+  char m_text[512];
   EventLogger::getText(m_text, sizeof(m_text),
                        textF, theData, nodeId);

@@ -1353,6 +1353,15 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
       if (ndb_logevent_body[i].index_fn)
         val= (*(ndb_logevent_body[i].index_fn))(val);
       str.appfmt("%s=%d\n",ndb_logevent_body[i].token, val);
+      if(strcmp(ndb_logevent_body[i].token,"error") == 0)
+      {
+        int m_text_len= strlen(m_text);
+        if(sizeof(m_text)-m_text_len-3 > 0)
+        {
+          BaseString::snprintf(m_text+m_text_len, 4 , " - ");
+          ndb_error_string(val, m_text+(m_text_len+3), sizeof(m_text)-m_text_len-3);
+        }
+      }
     }

   Vector<NDB_SOCKET_TYPE> copy;

View File

@@ -923,7 +923,10 @@ NdbTransaction::release(){
      * The user did not perform any rollback but simply closed the
      * transaction. We must rollback Ndb since Ndb have been contacted.
      ************************************************************************/
-    execute(Rollback);
+    if (!theSimpleState)
+    {
+      execute(Rollback);
+    }
   }//if
   theMagicNumber = 0xFE11DC;
   theInUseState = false;

View File

@@ -278,6 +278,7 @@ ErrorBundle ErrorCodes[] = {
   { 885, AE, "Stack underflow in interpreter" },
   { 886, AE, "More than 65535 instructions executed in interpreter" },
   { 897, AE, "Update attempt of primary key via ndbcluster internal api (if this occurs via the MySQL server it is a bug, please report)" },
+  { 892, AE, "Unsupported type in scan filter" },
   { 4256, AE, "Must call Ndb::init() before this function" },
   { 4257, AE, "Tried to read too much - too many getValue calls" },

@@ -680,11 +681,14 @@ const char *ndberror_classification_message(ndberror_classification classificati
   return empty_string;
 }

-int ndb_error_string(int err_no, char *str, unsigned int size)
+int ndb_error_string(int err_no, char *str, int size)
 {
   ndberror_struct error;
-  unsigned int len;
+  int len;

+  assert(size > 1);
+  if(size <= 1)
+    return 0;
   error.code = err_no;
   ndberror_update(&error);
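Hedged usage sketch for the changed public entry point: the size parameter is now a signed int, sizes <= 1 return 0 without touching the buffer (and assert in debug builds), and a positive return is the formatted message length. Error 892 is the code this commit adds:

    #include <cstdio>

    // Declaration as changed in ndberror.h above.
    int ndb_error_string(int err_no, char *str, int size);

    int main()
    {
      char msg[512];
      // 892: "Unsupported type in scan filter" (added by this commit).
      if (ndb_error_string(892, msg, (int)sizeof(msg)) > 0)
        printf("NDB error 892: %s\n", msg);
      return 0;
    }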

View File

@@ -1893,7 +1893,7 @@ runDictOps(NDBT_Context* ctx, NDBT_Step* step)
     // create indexes
     const char** indlist = NDBT_Tables::getIndexes(tabName);
     uint indnum = 0;
-    while (*indlist != 0) {
+    while (indlist != 0 && *indlist != 0) {
       uint count = 0;
     try_create_index:
       count++;

View File

@@ -23,6 +23,7 @@
 #include <Vector.hpp>
 #include <signaldata/DumpStateOrd.hpp>
 #include <Bitmask.hpp>
+#include <RefConvert.hpp>

 int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
@@ -919,6 +920,41 @@ int runBug20185(NDBT_Context* ctx, NDBT_Step* step){
   return NDBT_OK;
 }

+int runBug24717(NDBT_Context* ctx, NDBT_Step* step){
+  int result = NDBT_OK;
+  int loops = ctx->getNumLoops();
+  int records = ctx->getNumRecords();
+  NdbRestarter restarter;
+  Ndb* pNdb = GETNDB(step);
+
+  HugoTransactions hugoTrans(*ctx->getTab());
+
+  int dump[] = { 9002, 0 } ;
+  Uint32 ownNode = refToNode(pNdb->getReference());
+  dump[1] = ownNode;
+
+  for (; loops; loops --)
+  {
+    int nodeId = restarter.getRandomNotMasterNodeId(rand());
+    restarter.restartOneDbNode(nodeId, false, true, true);
+    restarter.waitNodesNoStart(&nodeId, 1);
+
+    if (restarter.dumpStateOneNode(nodeId, dump, 2))
+      return NDBT_FAILED;
+
+    restarter.startNodes(&nodeId, 1);
+
+    for (Uint32 i = 0; i < 100; i++)
+    {
+      hugoTrans.pkReadRecords(pNdb, 100, 1, NdbOperation::LM_CommittedRead);
+    }
+
+    restarter.waitClusterStarted();
+  }
+
+  return NDBT_OK;
+}
+
 NDBT_TESTSUITE(testNodeRestart);
 TESTCASE("NoLoad",
@@ -1232,6 +1268,9 @@ TESTCASE("Bug20185",
   STEP(runBug20185);
   FINALIZER(runClearTable);
 }
+TESTCASE("Bug24717", ""){
+  INITIALIZER(runBug24717);
+}
 NDBT_TESTSUITE_END(testNodeRestart);

 int main(int argc, const char** argv){

View File

@@ -630,7 +630,7 @@ int runRestarter(NDBT_Context* ctx, NDBT_Step* step){
     int nodeId = restarter.getDbNodeId(lastId);
     lastId = (lastId + 1) % restarter.getNumDbNodes();
-    if(restarter.restartOneDbNode(nodeId) != 0){
+    if(restarter.restartOneDbNode(nodeId, false, false, true) != 0){
       g_err << "Failed to restartNextDbNode" << endl;
       result = NDBT_FAILED;
       break;
@@ -1125,6 +1125,39 @@ runScanParallelism(NDBT_Context* ctx, NDBT_Step* step){
   return NDBT_OK;
 }

+int
+runBug24447(NDBT_Context* ctx, NDBT_Step* step){
+  int loops = 1; //ctx->getNumLoops();
+  int records = ctx->getNumRecords();
+  int abort = ctx->getProperty("AbortProb", 15);
+  NdbRestarter restarter;
+  HugoTransactions hugoTrans(*ctx->getTab());
+  int i = 0;
+  while (i<loops && !ctx->isTestStopped())
+  {
+    g_info << i++ << ": ";
+
+    int nodeId = restarter.getRandomNotMasterNodeId(rand());
+    if (nodeId == -1)
+      nodeId = restarter.getMasterNodeId();
+    if (restarter.insertErrorInNode(nodeId, 8038) != 0)
+    {
+      ndbout << "Could not insert error in node="<<nodeId<<endl;
+      return NDBT_FAILED;
+    }
+
+    for (Uint32 j = 0; i<10; i++)
+    {
+      hugoTrans.scanReadRecords(GETNDB(step), records, abort, 0,
+                                NdbOperation::LM_CommittedRead);
+    }
+  }
+  restarter.insertErrorInAllNodes(0);
+
+  return NDBT_OK;
+}
+
 NDBT_TESTSUITE(testScan);
 TESTCASE("ScanRead",
          "Verify scan requirement: It should be possible "\
@@ -1603,6 +1636,12 @@ TESTCASE("ScanParallelism",
   STEP(runScanParallelism);
   FINALIZER(runClearTable);
 }
+TESTCASE("Bug24447",
+         ""){
+  INITIALIZER(runLoadTable);
+  STEP(runBug24447);
+  FINALIZER(runClearTable);
+}
 NDBT_TESTSUITE_END(testScan);

 int main(int argc, const char** argv){

View File

@@ -413,6 +413,10 @@ max-time: 500
 cmd: testScan
 args: -n ScanParallelism

+max-time: 500
+cmd: testScan
+args: -n Bug24447 T1
+
 max-time: 500
 cmd: testNodeRestart
 args: -n Bug15587 T1
@@ -453,6 +457,10 @@ max-time: 1000
 cmd: testIndex
 args: -n Bug21384

+max-time: 1000
+cmd: testNodeRestart
+args: -n Bug24717 T1
+
 # OLD FLEX
 max-time: 500
 cmd: flexBench
@@ -510,7 +518,7 @@ args: -n TemporaryTables T1 T6 T7 T8

 max-time: 1500
 cmd: testDict
-args: -n Restart_NR2 T1
+args: -n Restart_NR2 T1 I3

 #
 # TEST NDBAPI

View File

@@ -37,7 +37,6 @@ static int g_verbose = 0;
 static int try_reconnect = 3;

 static int g_nodes, g_connections, g_section;
-static const char * g_connectstring = 0;
 static const char * g_query = 0;

 static int g_nodeid = 0;
@@ -486,7 +485,7 @@ fetch_configuration()

   ndb_mgm_set_error_stream(mgm, stderr);

-  if (ndb_mgm_set_connectstring(mgm, g_connectstring))
+  if (ndb_mgm_set_connectstring(mgm, opt_connect_str))
   {
     fprintf(stderr, "* %5d: %s\n",
             ndb_mgm_get_latest_error(mgm),

View File

@@ -1059,6 +1059,7 @@ int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
   int error= 0;
   const char *index_name;
   char unique_index_name[FN_LEN];
+  bool null_in_unique_index= false;
   static const char* unique_suffix= "$unique";
   KEY* key_info= tab->key_info;
   const char **key_name= tab->s->keynames.type_names;
@@ -1096,8 +1097,14 @@ int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
       error= create_unique_index(unique_index_name, key_info);
       break;
     case UNIQUE_INDEX:
-      if (!(error= check_index_fields_not_null(i)))
-        error= create_unique_index(unique_index_name, key_info);
+      if (check_index_fields_not_null(i))
+      {
+        push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_WARN,
+                            ER_NULL_COLUMN_IN_INDEX,
+                            "Ndb does not support unique index on NULL valued attributes, index access with NULL value will become full table scan");
+        null_in_unique_index= true;
+      }
+      error= create_unique_index(unique_index_name, key_info);
       break;
     case ORDERED_INDEX:
       error= create_ordered_index(index_name, key_info);
@@ -1129,6 +1136,11 @@ int ha_ndbcluster::build_index_list(Ndb *ndb, TABLE *tab, enum ILBP phase)
       m_index[i].unique_index= (void *) index;
       error= fix_unique_index_attr_order(m_index[i], index, key_info);
     }
+    if (idx_type == UNIQUE_INDEX &&
+        phase != ILBP_CREATE &&
+        check_index_fields_not_null(i))
+      null_in_unique_index= true;
+    m_index[i].null_in_unique_index= null_in_unique_index;
   }

   DBUG_RETURN(error);
@@ -1150,7 +1162,7 @@ NDB_INDEX_TYPE ha_ndbcluster::get_index_type_from_table(uint inx) const
           ORDERED_INDEX);
 }

-int ha_ndbcluster::check_index_fields_not_null(uint inx)
+bool ha_ndbcluster::check_index_fields_not_null(uint inx)
 {
   KEY* key_info= table->key_info + inx;
   KEY_PART_INFO* key_part= key_info->key_part;
@@ -1161,14 +1173,10 @@ bool ha_ndbcluster::check_index_fields_not_null(uint inx)
   {
     Field* field= key_part->field;
     if (field->maybe_null())
-    {
-      my_printf_error(ER_NULL_COLUMN_IN_INDEX,ER(ER_NULL_COLUMN_IN_INDEX),
-                      MYF(0),field->field_name);
-      DBUG_RETURN(ER_NULL_COLUMN_IN_INDEX);
-    }
+      DBUG_RETURN(true);
   }

-  DBUG_RETURN(0);
+  DBUG_RETURN(false);
 }

 void ha_ndbcluster::release_metadata()
@@ -1261,6 +1269,12 @@ inline NDB_INDEX_TYPE ha_ndbcluster::get_index_type(uint idx_no) const
   return m_index[idx_no].type;
 }

+inline bool ha_ndbcluster::has_null_in_unique_index(uint idx_no) const
+{
+  DBUG_ASSERT(idx_no < MAX_KEY);
+  return m_index[idx_no].null_in_unique_index;
+}
+
 /*
   Get the flags for an index
@@ -1596,7 +1610,7 @@ bool ha_ndbcluster::check_all_operations_for_error(NdbTransaction *trans,
  * primary key or unique index values
  */

-int ha_ndbcluster::peek_indexed_rows(const byte *record)
+int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk)
 {
   NdbTransaction *trans= m_active_trans;
   NdbOperation *op;
@@ -1609,7 +1623,7 @@ int ha_ndbcluster::peek_indexed_rows(const byte *record, bool check_pk)
     (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);

   first= NULL;
-  if (table->s->primary_key != MAX_KEY)
+  if (check_pk && table->s->primary_key != MAX_KEY)
   {
     /*
      * Fetch any row with colliding primary key
@@ -2089,6 +2103,42 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
   DBUG_RETURN(next_result(buf));
 }

+/*
+  Unique index scan in NDB (full table scan with scan filter)
+ */
+
+int ha_ndbcluster::unique_index_scan(const KEY* key_info,
+                                     const byte *key,
+                                     uint key_len,
+                                     byte *buf)
+{
+  int res;
+  NdbScanOperation *op;
+  NdbTransaction *trans= m_active_trans;
+
+  DBUG_ENTER("unique_index_scan");
+  DBUG_PRINT("enter", ("Starting new scan on %s", m_tabname));
+
+  NdbOperation::LockMode lm=
+    (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
+  bool need_pk = (lm == NdbOperation::LM_Read);
+  if (!(op=trans->getNdbScanOperation((const NDBTAB *) m_table)) ||
+      op->readTuples(lm,
+                     (need_pk)?NdbScanOperation::SF_KeyInfo:0,
+                     parallelism))
+    ERR_RETURN(trans->getNdbError());
+  m_active_cursor= op;
+  if (generate_scan_filter_from_key(op, key_info, key, key_len, buf))
+    DBUG_RETURN(ndb_err(trans));
+  if ((res= define_read_attrs(buf, op)))
+    DBUG_RETURN(res);
+
+  if (execute_no_commit(this,trans,false) != 0)
+    DBUG_RETURN(ndb_err(trans));
+  DBUG_PRINT("exit", ("Scan started successfully"));
+  DBUG_RETURN(next_result(buf));
+}
+
 /*
   Start full table scan in NDB
  */
@@ -2166,7 +2216,7 @@ int ha_ndbcluster::write_row(byte *record)
       start_bulk_insert will set parameters to ensure that each
       write_row is committed individually
     */
-    int peek_res= peek_indexed_rows(record);
+    int peek_res= peek_indexed_rows(record, true);

     if (!peek_res)
     {
@@ -2335,8 +2385,26 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
   NdbScanOperation* cursor= m_active_cursor;
   NdbOperation *op;
   uint i;
+  bool pk_update= (table->s->primary_key != MAX_KEY &&
+                   key_cmp(table->s->primary_key, old_data, new_data));
   DBUG_ENTER("update_row");

+  /*
+   * If IGNORE the ignore constraint violations on primary and unique keys,
+   * but check that it is not part of INSERT ... ON DUPLICATE KEY UPDATE
+   */
+  if (m_ignore_dup_key && thd->lex->sql_command == SQLCOM_UPDATE)
+  {
+    int peek_res= peek_indexed_rows(new_data, pk_update);
+
+    if (!peek_res)
+    {
+      DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
+    }
+    if (peek_res != HA_ERR_KEY_NOT_FOUND)
+      DBUG_RETURN(peek_res);
+  }
+
   statistic_increment(thd->status_var.ha_update_count, &LOCK_status);
   if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
   {
@@ -2346,8 +2414,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
   }

   /* Check for update of primary key for special handling */
-  if ((table->s->primary_key != MAX_KEY) &&
-      (key_cmp(table->s->primary_key, old_data, new_data)))
+  if (pk_update)
   {
     int read_res, insert_res, delete_res, undo_res;
@@ -2763,7 +2830,7 @@ int ha_ndbcluster::index_read(byte *buf,
     }
     else if (type == UNIQUE_INDEX)
     {
-      DBUG_RETURN(1);
+      DBUG_RETURN(unique_index_scan(key_info, key, key_len, buf));
     }
     break;
   case ORDERED_INDEX:
@@ -2856,12 +2923,13 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
                                            bool eq_r, bool sorted,
                                            byte* buf)
 {
-  KEY* key_info;
+  ndb_index_type type= get_index_type(active_index);
+  KEY* key_info;
   int error= 1;
   DBUG_ENTER("ha_ndbcluster::read_range_first_to_buf");
   DBUG_PRINT("info", ("eq_r: %d, sorted: %d", eq_r, sorted));

-  switch (get_index_type(active_index)){
+  switch (type){
   case PRIMARY_KEY_ORDERED_INDEX:
   case PRIMARY_KEY_INDEX:
     key_info= table->key_info + active_index;
@@ -2887,6 +2955,14 @@ int ha_ndbcluster::read_range_first_to_buf(const key_range *start_key,
       error= unique_index_read(start_key->key, start_key->length, buf);
       DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
     }
+    else if (type == UNIQUE_INDEX)
+    {
+      error= unique_index_scan(key_info,
+                               start_key->key,
+                               start_key->length,
+                               buf);
+      DBUG_RETURN(error == HA_ERR_KEY_NOT_FOUND ? HA_ERR_END_OF_FILE : error);
+    }
     break;
   default:
     break;
@@ -6117,6 +6193,30 @@ ha_ndbcluster::release_completed_operations(NdbTransaction *trans,
     trans->releaseCompletedOperations();
 }

+bool
+ha_ndbcluster::null_value_index_search(KEY_MULTI_RANGE *ranges,
+                                       KEY_MULTI_RANGE *end_range,
+                                       HANDLER_BUFFER *buffer)
+{
+  DBUG_ENTER("null_value_index_search");
+  KEY* key_info= table->key_info + active_index;
+  KEY_MULTI_RANGE *range= ranges;
+  ulong reclength= table->s->reclength;
+  byte *curr= (byte*)buffer->buffer;
+  byte *end_of_buffer= (byte*)buffer->buffer_end;
+
+  for (; range<end_range && curr+reclength <= end_of_buffer;
+       range++)
+  {
+    const byte *key= range->start_key.key;
+    uint key_len= range->start_key.length;
+    if (check_null_in_key(key_info, key, key_len))
+      DBUG_RETURN(true);
+    curr += reclength;
+  }
+  DBUG_RETURN(false);
+}
+
 int
 ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
                                       KEY_MULTI_RANGE *ranges,
@@ -6133,11 +6233,14 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
   NdbOperation* op;
   Thd_ndb *thd_ndb= get_thd_ndb(current_thd);

-  if (uses_blob_value(m_retrieve_all_fields))
+  /**
+   * blobs and unique hash index with NULL can't be batched currently
+   */
+  if (uses_blob_value(m_retrieve_all_fields) ||
+      (index_type == UNIQUE_INDEX &&
+       has_null_in_unique_index(active_index) &&
+       null_value_index_search(ranges, ranges+range_count, buffer)))
   {
-    /**
-     * blobs can't be batched currently
-     */
     m_disable_multi_read= TRUE;
     DBUG_RETURN(handler::read_multi_range_first(found_range_p,
                                                 ranges,
@@ -6193,7 +6296,6 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
       goto range;
       /* fall through */
     case PRIMARY_KEY_INDEX:
-    {
       multi_range_curr->range_flag |= UNIQUE_RANGE;
       if ((op= m_active_trans->getNdbOperation(tab)) &&
           !op->readTuple(lm) &&
@@ -6204,8 +6306,6 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
       else
        ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
       break;
-    }
-    break;
     case UNIQUE_ORDERED_INDEX:
       if (!(multi_range_curr->start_key.length == key_info->key_length &&
             multi_range_curr->start_key.flag == HA_READ_KEY_EXACT &&
@@ -6214,18 +6314,16 @@ ha_ndbcluster::read_multi_range_first(KEY_MULTI_RANGE **found_range_p,
       goto range;
       /* fall through */
     case UNIQUE_INDEX:
-    {
       multi_range_curr->range_flag |= UNIQUE_RANGE;
       if ((op= m_active_trans->getNdbIndexOperation(unique_idx, tab)) &&
           !op->readTuple(lm) &&
           !set_index_key(op, key_info, multi_range_curr->start_key.key) &&
           !define_read_attrs(curr, op) &&
           (op->setAbortOption(AO_IgnoreError), TRUE))
         curr += reclength;
       else
         ERR_RETURN(op ? op->getNdbError() : m_active_trans->getNdbError());
       break;
-    }
     case ORDERED_INDEX:
     {
   range:
@@ -7968,31 +8066,12 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
                                     NdbScanOperation *op)
 {
   DBUG_ENTER("generate_scan_filter");
+
   if (ndb_cond_stack)
   {
-    DBUG_PRINT("info", ("Generating scan filter"));
     NdbScanFilter filter(op);
-    bool multiple_cond= FALSE;
-    // Wrap an AND group around multiple conditions
-    if (ndb_cond_stack->next) {
-      multiple_cond= TRUE;
-      if (filter.begin() == -1)
-        DBUG_RETURN(1);
-    }
-    for (Ndb_cond_stack *stack= ndb_cond_stack;
-         (stack);
-         stack= stack->next)
-    {
-      Ndb_cond *cond= stack->ndb_cond;
-      if (build_scan_filter(cond, &filter))
-      {
-        DBUG_PRINT("info", ("build_scan_filter failed"));
-        DBUG_RETURN(1);
-      }
-    }
-    if (multiple_cond && filter.end() == -1)
-      DBUG_RETURN(1);
+
+    DBUG_RETURN(generate_scan_filter_from_cond(ndb_cond_stack, filter));
   }
   else
   {
@@ -8002,6 +8081,88 @@ ha_ndbcluster::generate_scan_filter(Ndb_cond_stack *ndb_cond_stack,
   DBUG_RETURN(0);
 }

+int
+ha_ndbcluster::generate_scan_filter_from_cond(Ndb_cond_stack *ndb_cond_stack,
+                                              NdbScanFilter& filter)
+{
+  DBUG_ENTER("generate_scan_filter_from_cond");
+  bool multiple_cond= FALSE;
+
+  DBUG_PRINT("info", ("Generating scan filter"));
+  // Wrap an AND group around multiple conditions
+  if (ndb_cond_stack->next)
+  {
+    multiple_cond= TRUE;
+    if (filter.begin() == -1)
+      DBUG_RETURN(1);
+  }
+  for (Ndb_cond_stack *stack= ndb_cond_stack;
+       (stack);
+       stack= stack->next)
+  {
+    Ndb_cond *cond= stack->ndb_cond;
+    if (build_scan_filter(cond, &filter))
+    {
+      DBUG_PRINT("info", ("build_scan_filter failed"));
+      DBUG_RETURN(1);
+    }
+  }
+  if (multiple_cond && filter.end() == -1)
+    DBUG_RETURN(1);
+
+  DBUG_RETURN(0);
+}
+
+int ha_ndbcluster::generate_scan_filter_from_key(NdbScanOperation *op,
+                                                 const KEY* key_info,
+                                                 const byte *key,
+                                                 uint key_len,
+                                                 byte *buf)
+{
+  KEY_PART_INFO* key_part= key_info->key_part;
+  KEY_PART_INFO* end= key_part+key_info->key_parts;
+  NdbScanFilter filter(op);
+  int res;
+
+  DBUG_ENTER("generate_scan_filter_from_key");
+  filter.begin(NdbScanFilter::AND);
+  for (; key_part != end; key_part++)
+  {
+    Field* field= key_part->field;
+    uint32 pack_len= field->pack_length();
+    const byte* ptr= key;
+    char buf[256];
+    DBUG_PRINT("info", ("Filtering value for %s", field->field_name));
+    DBUG_DUMP("key", (char*)ptr, pack_len);
+    if (key_part->null_bit)
+    {
+      DBUG_PRINT("info", ("Generating ISNULL filter"));
+      if (filter.isnull(key_part->fieldnr-1) == -1)
+        DBUG_RETURN(1);
+    }
+    else
+    {
+      DBUG_PRINT("info", ("Generating EQ filter"));
+      if (filter.cmp(NdbScanFilter::COND_EQ,
+                     key_part->fieldnr-1,
+                     ptr,
+                     pack_len) == -1)
+        DBUG_RETURN(1);
+    }
+    key += key_part->store_length;
+  }
+  // Add any pushed condition
+  if (m_cond_stack &&
+      (res= generate_scan_filter_from_cond(m_cond_stack, filter)))
+    DBUG_RETURN(res);
+
+  if (filter.end() == -1)
+    DBUG_RETURN(1);
+
+  DBUG_RETURN(0);
+}
+
 int
 ndbcluster_show_status(THD* thd)
 {
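Net effect of the handler changes: a lookup on a unique hash index whose key contains NULL is executed by unique_index_scan() as a full table scan with a pushed filter. generate_scan_filter_from_key() wraps one AND group around the key parts, emitting isnull() for nullable parts and a COND_EQ cmp() for the rest, then appends any pushed condition stack. The resulting filter shape for UNIQUE USING HASH (b, c) with c IS NULL, sketched with placeholder attribute ids and key pointers (attr_b, attr_c, key_b_ptr, key_b_len are illustrative, not real identifiers):

    NdbScanFilter filter(op);                   // op: NdbScanOperation* on t2
    filter.begin(NdbScanFilter::AND);           // one AND group per key lookup
    filter.cmp(NdbScanFilter::COND_EQ, attr_b,  // b = <key value>
               key_b_ptr, key_b_len);
    filter.isnull(attr_c);                      // c IS NULL
    filter.end();                               // -1 from any call aborts with error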

View File

@@ -56,6 +56,7 @@ typedef struct ndb_index_data {
   void *index;
   void *unique_index;
   unsigned char *unique_index_attrid_map;
+  bool null_in_unique_index;
 } NDB_INDEX_DATA;

 typedef struct st_ndbcluster_share {
@@ -546,7 +547,9 @@ class ha_ndbcluster: public handler
                             KEY_MULTI_RANGE*ranges, uint range_count,
                             bool sorted, HANDLER_BUFFER *buffer);
   int read_multi_range_next(KEY_MULTI_RANGE **found_range_p);
-
+  bool null_value_index_search(KEY_MULTI_RANGE *ranges,
+                               KEY_MULTI_RANGE *end_range,
+                               HANDLER_BUFFER *buffer);
   bool get_error_message(int error, String *buf);
   int info(uint);
   int extra(enum ha_extra_function operation);
@@ -649,7 +652,8 @@ private:
   void release_metadata();
   NDB_INDEX_TYPE get_index_type(uint idx_no) const;
   NDB_INDEX_TYPE get_index_type_from_table(uint index_no) const;
-  int check_index_fields_not_null(uint index_no);
+  bool has_null_in_unique_index(uint idx_no) const;
+  bool check_index_fields_not_null(uint index_no);

   int pk_read(const byte *key, uint key_len, byte *buf);
   int complemented_pk_read(const byte *old_data, byte *new_data);
@@ -657,12 +661,17 @@ private:
                                      const NdbOperation *first,
                                      const NdbOperation *last,
                                      uint errcode);
-  int peek_indexed_rows(const byte *record);
+  int peek_indexed_rows(const byte *record, bool check_pk);
   int unique_index_read(const byte *key, uint key_len,
                         byte *buf);
   int ordered_index_scan(const key_range *start_key,
                          const key_range *end_key,
                          bool sorted, bool descending, byte* buf);
+  int unique_index_scan(const KEY* key_info,
+                        const byte *key,
+                        uint key_len,
+                        byte *buf);
+
   int full_table_scan(byte * buf);
   int fetch_next(NdbScanOperation* op);
   int next_result(byte *buf);
@@ -725,6 +734,13 @@ bool uses_blob_value(bool all_fields);
   int build_scan_filter(Ndb_cond* &cond, NdbScanFilter* filter);
   int generate_scan_filter(Ndb_cond_stack* cond_stack,
                            NdbScanOperation* op);
+  int generate_scan_filter_from_cond(Ndb_cond_stack* cond_stack,
+                                     NdbScanFilter& filter);
+  int generate_scan_filter_from_key(NdbScanOperation* op,
+                                    const KEY* key_info,
+                                    const byte *key,
+                                    uint key_len,
+                                    byte *buf);

   friend int execute_commit(ha_ndbcluster*, NdbTransaction*);
   friend int execute_no_commit(ha_ndbcluster*, NdbTransaction*, bool);