Merge bk-internal.mysql.com:/home/bk/mysql-5.1-new
into mysql.com:/home/dlenev/src/mysql-5.1-merges
This commit is contained in:
commit
a440d2268f
@ -27,6 +27,7 @@ rpl_sp : Bug #16456
|
||||
#ndb_dd_disk2memory : Bug #16466
|
||||
ndb_autodiscover : Needs to be fixed w.r.t binlog
|
||||
ndb_autodiscover2 : Needs to be fixed w.r.t binlog
|
||||
ndb_blob : BLOB replication causes core in master1 (Pekka will fix)
|
||||
system_mysql_db : Needs fixing
|
||||
system_mysql_db_fix : Needs fixing
|
||||
#ndb_alter_table_row : sometimes wrong error 1015!=1046
|
||||
|
@ -2729,7 +2729,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
|
||||
{
|
||||
// sometimes get TE_ALTER with invalid table
|
||||
DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
|
||||
! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName()));
|
||||
! IS_NDB_BLOB_PREFIX(pOp->getEvent()->getTable()->getName()));
|
||||
ndb->
|
||||
setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
|
||||
ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
|
||||
@ -2870,7 +2870,7 @@ err:
|
||||
DBUG_PRINT("info",("removing all event operations"));
|
||||
while ((op= ndb->getEventOperation()))
|
||||
{
|
||||
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName()));
|
||||
DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getEvent()->getTable()->getName()));
|
||||
DBUG_PRINT("info",("removing event operation on %s",
|
||||
op->getEvent()->getName()));
|
||||
NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
|
||||
|
@ -754,7 +754,7 @@ extern const GlobalSignalNumber NO_OF_SIGNAL_NAMES;
|
||||
#define GSN_SUB_SYNC_REQ 582
|
||||
#define GSN_SUB_SYNC_REF 583
|
||||
#define GSN_SUB_SYNC_CONF 584
|
||||
#define GSN_SUB_META_DATA 585
|
||||
/* 585 unused */
|
||||
#define GSN_SUB_TABLE_DATA 586
|
||||
|
||||
#define GSN_CREATE_TABLE_REQ 587
|
||||
|
@ -33,6 +33,7 @@ class AlterTabReq {
|
||||
friend class Dbdih;
|
||||
friend class Dbtc;
|
||||
friend class Dblqh;
|
||||
friend class Suma;
|
||||
|
||||
/**
|
||||
* For printing
|
||||
@ -103,7 +104,6 @@ class AlterTabConf {
|
||||
friend class Dbtc;
|
||||
friend class Dblqh;
|
||||
friend class Dbtup;
|
||||
friend class Suma;
|
||||
|
||||
/**
|
||||
* For printing
|
||||
|
@ -36,6 +36,7 @@ class AlterTableReq {
|
||||
* Sender(s) / Reciver(s)
|
||||
*/
|
||||
friend class NdbTableImpl;
|
||||
friend class NdbEventOperationImpl;
|
||||
friend class NdbDictInterface;
|
||||
friend class Dbdict;
|
||||
|
||||
|
@ -277,22 +277,6 @@ struct SubSyncConf {
|
||||
Uint32 senderData;
|
||||
};
|
||||
|
||||
struct SubMetaData {
|
||||
/**
|
||||
* Sender(s)/Reciver(s)
|
||||
*/
|
||||
friend struct SumaParticipant;
|
||||
friend struct Grep;
|
||||
|
||||
friend bool printSUB_META_DATA(FILE *, const Uint32 *, Uint32, Uint16);
|
||||
STATIC_CONST( SignalLength = 3 );
|
||||
SECTION( DICT_TAB_INFO = 0 );
|
||||
|
||||
Uint32 gci;
|
||||
Uint32 senderData;
|
||||
Uint32 tableId;
|
||||
};
|
||||
|
||||
struct SubTableData {
|
||||
/**
|
||||
* Sender(s)/Reciver(s)
|
||||
@ -301,7 +285,11 @@ struct SubTableData {
|
||||
friend struct Grep;
|
||||
|
||||
friend bool printSUB_TABLE_DATA(FILE *, const Uint32 *, Uint32, Uint16);
|
||||
STATIC_CONST( SignalLength = 5 );
|
||||
STATIC_CONST( SignalLength = 7 );
|
||||
SECTION( DICT_TAB_INFO = 0 );
|
||||
SECTION( ATTR_INFO = 0 );
|
||||
SECTION( AFTER_VALUES = 1 );
|
||||
SECTION( BEFORE_VALUES = 2 );
|
||||
|
||||
enum LogType {
|
||||
SCAN = 1,
|
||||
@ -317,6 +305,8 @@ struct SubTableData {
|
||||
Uint8 ndbd_nodeid;
|
||||
Uint8 not_used3;
|
||||
Uint32 logType;
|
||||
Uint32 changeMask;
|
||||
Uint32 totalLen;
|
||||
};
|
||||
|
||||
struct SubSyncContinueReq {
|
||||
|
@ -162,6 +162,7 @@ public:
|
||||
|
||||
class Table; // forward declaration
|
||||
class Tablespace; // forward declaration
|
||||
// class NdbEventOperation; // forward declaration
|
||||
|
||||
/**
|
||||
* @class Column
|
||||
@ -885,6 +886,7 @@ public:
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
||||
friend class NdbDictionaryImpl;
|
||||
friend class NdbTableImpl;
|
||||
friend class NdbEventOperationImpl;
|
||||
#endif
|
||||
class NdbTableImpl & m_impl;
|
||||
Table(NdbTableImpl&);
|
||||
@ -1182,6 +1184,12 @@ public:
|
||||
* Get unique identifier for the event
|
||||
*/
|
||||
const char *getName() const;
|
||||
/**
|
||||
* Get table that the event is defined on
|
||||
*
|
||||
* @return pointer to table or NULL if no table has been defined
|
||||
*/
|
||||
const NdbDictionary::Table * getTable() const;
|
||||
/**
|
||||
* Define table on which events should be detected
|
||||
*
|
||||
|
@ -176,6 +176,26 @@ public:
|
||||
*/
|
||||
NdbDictionary::Event::TableEvent getEventType() const;
|
||||
|
||||
/**
|
||||
* Check if table name has changed, for event TE_ALTER
|
||||
*/
|
||||
const bool tableNameChanged() const;
|
||||
|
||||
/**
|
||||
* Check if table frm has changed, for event TE_ALTER
|
||||
*/
|
||||
const bool tableFrmChanged() const;
|
||||
|
||||
/**
|
||||
* Check if table fragmentation has changed, for event TE_ALTER
|
||||
*/
|
||||
const bool tableFragmentationChanged() const;
|
||||
|
||||
/**
|
||||
* Check if table range partition list name has changed, for event TE_ALTER
|
||||
*/
|
||||
const bool tableRangeListChanged() const;
|
||||
|
||||
/**
|
||||
* Retrieve the GCI of the latest retrieved event
|
||||
*
|
||||
@ -200,14 +220,13 @@ public:
|
||||
|
||||
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
|
||||
/** these are subject to change at any time */
|
||||
const NdbDictionary::Table *getTable() const;
|
||||
const NdbDictionary::Event *getEvent() const;
|
||||
const NdbRecAttr *getFirstPkAttr() const;
|
||||
const NdbRecAttr *getFirstPkPreAttr() const;
|
||||
const NdbRecAttr *getFirstDataAttr() const;
|
||||
const NdbRecAttr *getFirstDataPreAttr() const;
|
||||
|
||||
bool validateTable(NdbDictionary::Table &table) const;
|
||||
// bool validateTable(NdbDictionary::Table &table) const;
|
||||
|
||||
void setCustomData(void * data);
|
||||
void * getCustomData() const;
|
||||
|
@ -166,7 +166,6 @@ SignalDataPrintFunctions[] = {
|
||||
{ GSN_SUB_SYNC_REQ, printSUB_SYNC_REQ },
|
||||
{ GSN_SUB_SYNC_REF, printSUB_SYNC_REF },
|
||||
{ GSN_SUB_SYNC_CONF, printSUB_SYNC_CONF },
|
||||
{ GSN_SUB_META_DATA, printSUB_META_DATA },
|
||||
{ GSN_SUB_TABLE_DATA, printSUB_TABLE_DATA },
|
||||
{ GSN_SUB_SYNC_CONTINUE_REQ, printSUB_SYNC_CONTINUE_REQ },
|
||||
{ GSN_SUB_SYNC_CONTINUE_REF, printSUB_SYNC_CONTINUE_REF },
|
||||
|
@ -540,7 +540,6 @@ const GsnName SignalNames [] = {
|
||||
,{ GSN_SUB_SYNC_REQ, "SUB_SYNC_REQ" }
|
||||
,{ GSN_SUB_SYNC_REF, "SUB_SYNC_REF" }
|
||||
,{ GSN_SUB_SYNC_CONF, "SUB_SYNC_CONF" }
|
||||
,{ GSN_SUB_META_DATA, "SUB_META_DATA" }
|
||||
,{ GSN_SUB_TABLE_DATA, "SUB_TABLE_DATA" }
|
||||
,{ GSN_SUB_SYNC_CONTINUE_REQ, "SUB_SYNC_CONTINUE_REQ" }
|
||||
,{ GSN_SUB_SYNC_CONTINUE_REF, "SUB_SYNC_CONTINUE_REF" }
|
||||
|
@ -169,17 +169,6 @@ printSUB_SYNC_CONF(FILE * output, const Uint32 * theData,
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
printSUB_META_DATA(FILE * output, const Uint32 * theData,
|
||||
Uint32 len, Uint16 receiverBlockNo) {
|
||||
const SubMetaData * const sig = (SubMetaData *)theData;
|
||||
fprintf(output, " gci: %x\n", sig->gci);
|
||||
fprintf(output, " senderData: %x\n", sig->senderData);
|
||||
fprintf(output, " senderData: %x\n", sig->senderData);
|
||||
fprintf(output, " tableId: %x\n", sig->tableId);
|
||||
return false;
|
||||
}
|
||||
|
||||
bool
|
||||
printSUB_TABLE_DATA(FILE * output, const Uint32 * theData,
|
||||
Uint32 len, Uint16 receiverBlockNo) {
|
||||
|
@ -4666,12 +4666,7 @@ Dbdict::alterTab_writeSchemaConf(Signal* signal,
|
||||
|
||||
SegmentedSectionPtr tabInfoPtr;
|
||||
getSection(tabInfoPtr, alterTabPtr.p->m_tabInfoPtrI);
|
||||
|
||||
writeTableFile(signal, tableId, tabInfoPtr, &callback);
|
||||
|
||||
alterTabPtr.p->m_tabInfoPtrI = RNIL;
|
||||
signal->setSection(tabInfoPtr, 0);
|
||||
releaseSections(signal);
|
||||
}
|
||||
|
||||
void
|
||||
@ -4685,8 +4680,32 @@ Dbdict::alterTab_writeTableConf(Signal* signal,
|
||||
Uint32 coordinatorRef = alterTabPtr.p->m_coordinatorRef;
|
||||
TableRecordPtr tabPtr;
|
||||
c_tableRecordPool.getPtr(tabPtr, alterTabPtr.p->m_alterTableId);
|
||||
|
||||
// Alter table commit request handled successfully
|
||||
// Inform Suma so it can send events to any subscribers of the table
|
||||
AlterTabReq * req = (AlterTabReq*)signal->getDataPtrSend();
|
||||
if (coordinatorRef == reference())
|
||||
req->senderRef = alterTabPtr.p->m_senderRef;
|
||||
else
|
||||
req->senderRef = 0;
|
||||
req->senderData = callbackData;
|
||||
req->tableId = tabPtr.p->tableId;
|
||||
req->tableVersion = tabPtr.p->tableVersion;
|
||||
req->gci = tabPtr.p->gciTableCreated;
|
||||
req->requestType = AlterTabReq::AlterTableCommit;
|
||||
req->changeMask = alterTabPtr.p->m_changeMask;
|
||||
SegmentedSectionPtr tabInfoPtr;
|
||||
getSection(tabInfoPtr, alterTabPtr.p->m_tabInfoPtrI);
|
||||
signal->setSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
|
||||
#ifndef DBUG_OFF
|
||||
ndbout_c("DICT_TAB_INFO in DICT");
|
||||
SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool());
|
||||
reader.printAll(ndbout);
|
||||
#endif
|
||||
EXECUTE_DIRECT(SUMA, GSN_ALTER_TAB_REQ, signal,
|
||||
AlterTabReq::SignalLength);
|
||||
releaseSections(signal);
|
||||
alterTabPtr.p->m_tabInfoPtrI = RNIL;
|
||||
jamEntry();
|
||||
AlterTabConf * conf = (AlterTabConf*)signal->getDataPtrSend();
|
||||
conf->senderRef = reference();
|
||||
conf->senderData = callbackData;
|
||||
@ -4694,17 +4713,7 @@ Dbdict::alterTab_writeTableConf(Signal* signal,
|
||||
conf->tableVersion = tabPtr.p->tableVersion;
|
||||
conf->gci = tabPtr.p->gciTableCreated;
|
||||
conf->requestType = AlterTabReq::AlterTableCommit;
|
||||
{
|
||||
AlterTabConf tmp= *conf;
|
||||
if (coordinatorRef == reference())
|
||||
conf->senderRef = alterTabPtr.p->m_senderRef;
|
||||
else
|
||||
conf->senderRef = 0;
|
||||
EXECUTE_DIRECT(SUMA, GSN_ALTER_TAB_CONF, signal,
|
||||
AlterTabConf::SignalLength);
|
||||
jamEntry();
|
||||
*conf= tmp;
|
||||
}
|
||||
conf->changeMask = alterTabPtr.p->m_changeMask;
|
||||
sendSignal(coordinatorRef, GSN_ALTER_TAB_CONF, signal,
|
||||
AlterTabConf::SignalLength, JBB);
|
||||
|
||||
|
@ -2559,8 +2559,9 @@ Suma::reportAllSubscribers(Signal *signal,
|
||||
data->gci = m_last_complete_gci + 1;
|
||||
data->tableId = subPtr.p->m_tableId;
|
||||
data->operation = table_event;
|
||||
data->logType = 0;
|
||||
data->ndbd_nodeid = refToNode(reference());
|
||||
data->changeMask = 0;
|
||||
data->totalLen = 0;
|
||||
|
||||
TablePtr tabPtr;
|
||||
c_tables.getPtr(tabPtr, subPtr.p->m_table_ptrI);
|
||||
@ -3177,7 +3178,9 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
|
||||
LinearSectionPtr ptr[3];
|
||||
const Uint32 nptr= reformat(signal, ptr,
|
||||
f_buffer, sz, b_buffer, b_trigBufferSize);
|
||||
|
||||
Uint32 ptrLen= 0;
|
||||
for(Uint32 i =0; i < nptr; i++)
|
||||
ptrLen+= ptr[i].sz;
|
||||
/**
|
||||
* Signal to subscriber(s)
|
||||
*/
|
||||
@ -3188,6 +3191,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
|
||||
data->tableId = tabPtr.p->m_tableId;
|
||||
data->operation = event;
|
||||
data->logType = 0;
|
||||
data->changeMask = 0;
|
||||
data->totalLen = ptrLen;
|
||||
|
||||
{
|
||||
LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
|
||||
@ -3435,17 +3440,19 @@ Suma::execDROP_TAB_CONF(Signal *signal)
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
|
||||
static Uint32 b_dti_buf[10000];
|
||||
|
||||
void
|
||||
Suma::execALTER_TAB_CONF(Signal *signal)
|
||||
Suma::execALTER_TAB_REQ(Signal *signal)
|
||||
{
|
||||
jamEntry();
|
||||
DBUG_ENTER("Suma::execALTER_TAB_CONF");
|
||||
ndbassert(signal->getNoOfSections() == 0);
|
||||
|
||||
AlterTabConf * const conf = (AlterTabConf*)signal->getDataPtr();
|
||||
Uint32 senderRef= conf->senderRef;
|
||||
Uint32 tableId= conf->tableId;
|
||||
DBUG_ENTER("Suma::execALTER_TAB_REQ");
|
||||
ndbassert(signal->getNoOfSections() == 1);
|
||||
|
||||
AlterTabReq * const req = (AlterTabReq*)signal->getDataPtr();
|
||||
Uint32 senderRef= req->senderRef;
|
||||
Uint32 tableId= req->tableId;
|
||||
Uint32 changeMask= req->changeMask;
|
||||
TablePtr tabPtr;
|
||||
if (!c_tables.find(tabPtr, tableId) ||
|
||||
tabPtr.p->m_state == Table::DROPPED ||
|
||||
@ -3464,13 +3471,30 @@ Suma::execALTER_TAB_CONF(Signal *signal)
|
||||
DBUG_VOID_RETURN;
|
||||
}
|
||||
// dict coordinator sends info to API
|
||||
|
||||
|
||||
// Copy DICT_TAB_INFO to local buffer
|
||||
SegmentedSectionPtr tabInfoPtr;
|
||||
signal->getSection(tabInfoPtr, AlterTabReq::DICT_TAB_INFO);
|
||||
#ifndef DBUG_OFF
|
||||
ndbout_c("DICT_TAB_INFO in SUMA, tabInfoPtr.sz = %d", tabInfoPtr.sz);
|
||||
SimplePropertiesSectionReader reader(tabInfoPtr, getSectionSegmentPool());
|
||||
reader.printAll(ndbout);
|
||||
#endif
|
||||
copy(b_dti_buf, tabInfoPtr);
|
||||
LinearSectionPtr ptr[3];
|
||||
ptr[0].p = b_dti_buf;
|
||||
ptr[0].sz = tabInfoPtr.sz;
|
||||
|
||||
releaseSections(signal);
|
||||
|
||||
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
|
||||
data->gci = m_last_complete_gci+1;
|
||||
data->tableId = tableId;
|
||||
data->operation = NdbDictionary::Event::_TE_ALTER;
|
||||
data->req_nodeid = refToNode(senderRef);
|
||||
|
||||
data->logType = 0;
|
||||
data->changeMask = changeMask;
|
||||
data->totalLen = tabInfoPtr.sz;
|
||||
{
|
||||
LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
|
||||
SubscriberPtr subbPtr;
|
||||
@ -3490,8 +3514,9 @@ Suma::execALTER_TAB_CONF(Signal *signal)
|
||||
}
|
||||
|
||||
data->senderData= subbPtr.p->m_senderData;
|
||||
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
|
||||
SubTableData::SignalLength, JBB);
|
||||
Callback c = { 0, 0 };
|
||||
sendFragmentedSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
|
||||
SubTableData::SignalLength, JBB, ptr, 1, c);
|
||||
DBUG_PRINT("info",("sent to subscriber %d", subbPtr.i));
|
||||
}
|
||||
}
|
||||
@ -4668,7 +4693,9 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
|
||||
const Uint32 nptr= reformat(signal, ptr,
|
||||
src, sz_1,
|
||||
src + sz_1, sz - 2 - sz_1);
|
||||
|
||||
Uint32 ptrLen= 0;
|
||||
for(Uint32 i =0; i < nptr; i++)
|
||||
ptrLen+= ptr[i].sz;
|
||||
/**
|
||||
* Signal to subscriber(s)
|
||||
*/
|
||||
@ -4680,6 +4707,8 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
|
||||
data->tableId = tabPtr.p->m_tableId;
|
||||
data->operation = event;
|
||||
data->logType = 0;
|
||||
data->changeMask = 0;
|
||||
data->totalLen = ptrLen;
|
||||
|
||||
{
|
||||
LocalDLList<Subscriber> list(c_subscriberPool,tabPtr.p->c_subscribers);
|
||||
|
@ -70,7 +70,7 @@ public:
|
||||
void execGET_TABLEID_REF(Signal* signal);
|
||||
|
||||
void execDROP_TAB_CONF(Signal* signal);
|
||||
void execALTER_TAB_CONF(Signal* signal);
|
||||
void execALTER_TAB_REQ(Signal* signal);
|
||||
void execCREATE_TAB_CONF(Signal* signal);
|
||||
/**
|
||||
* Scan interface
|
||||
|
@ -76,7 +76,7 @@ Suma::Suma(const Configuration & conf) :
|
||||
* Dict interface
|
||||
*/
|
||||
addRecSignal(GSN_DROP_TAB_CONF, &Suma::execDROP_TAB_CONF);
|
||||
addRecSignal(GSN_ALTER_TAB_CONF, &Suma::execALTER_TAB_CONF);
|
||||
addRecSignal(GSN_ALTER_TAB_REQ, &Suma::execALTER_TAB_REQ);
|
||||
addRecSignal(GSN_CREATE_TAB_CONF, &Suma::execCREATE_TAB_CONF);
|
||||
|
||||
#if 0
|
||||
|
@ -833,6 +833,12 @@ NdbDictionary::Event::setTable(const Table& table)
|
||||
m_impl.setTable(table);
|
||||
}
|
||||
|
||||
const NdbDictionary::Table *
|
||||
NdbDictionary::Event::getTable() const
|
||||
{
|
||||
return m_impl.getTable();
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictionary::Event::setTable(const char * table)
|
||||
{
|
||||
|
@ -1069,7 +1069,6 @@ void NdbEventImpl::init()
|
||||
{
|
||||
m_eventId= RNIL;
|
||||
m_eventKey= RNIL;
|
||||
m_tableId= RNIL;
|
||||
mi_type= 0;
|
||||
m_dur= NdbDictionary::Event::ED_UNDEFINED;
|
||||
m_mergeEvents = false;
|
||||
@ -1081,6 +1080,8 @@ NdbEventImpl::~NdbEventImpl()
|
||||
{
|
||||
for (unsigned i = 0; i < m_columns.size(); i++)
|
||||
delete m_columns[i];
|
||||
if (m_tableImpl)
|
||||
delete m_tableImpl;
|
||||
}
|
||||
|
||||
void NdbEventImpl::setName(const char * name)
|
||||
@ -1096,10 +1097,29 @@ const char *NdbEventImpl::getName() const
|
||||
void
|
||||
NdbEventImpl::setTable(const NdbDictionary::Table& table)
|
||||
{
|
||||
m_tableImpl= &NdbTableImpl::getImpl(table);
|
||||
setTable(&NdbTableImpl::getImpl(table));
|
||||
m_tableName.assign(m_tableImpl->getName());
|
||||
}
|
||||
|
||||
void
|
||||
NdbEventImpl::setTable(NdbTableImpl *tableImpl)
|
||||
{
|
||||
DBUG_ASSERT(tableImpl->m_status != NdbDictionary::Object::Invalid);
|
||||
if (!m_tableImpl)
|
||||
m_tableImpl = new NdbTableImpl();
|
||||
// Copy table, since event might be accessed from different threads
|
||||
m_tableImpl->assign(*tableImpl);
|
||||
}
|
||||
|
||||
const NdbDictionary::Table *
|
||||
NdbEventImpl::getTable() const
|
||||
{
|
||||
if (m_tableImpl)
|
||||
return m_tableImpl->m_facade;
|
||||
else
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void
|
||||
NdbEventImpl::setTable(const char * table)
|
||||
{
|
||||
@ -1248,6 +1268,21 @@ NdbDictionaryImpl::fetchGlobalTableImpl(const BaseString& internalTableName)
|
||||
return info;
|
||||
}
|
||||
|
||||
void
|
||||
NdbDictionaryImpl::putTable(NdbTableImpl *impl)
|
||||
{
|
||||
m_globalHash->lock();
|
||||
m_globalHash->put(impl->m_internalName.c_str(), impl);
|
||||
m_globalHash->unlock();
|
||||
Ndb_local_table_info *info=
|
||||
Ndb_local_table_info::create(impl, m_local_table_data_size);
|
||||
|
||||
m_localHash.put(impl->m_internalName.c_str(), info);
|
||||
|
||||
m_ndb.theFirstTupleId[impl->getTableId()] = ~0;
|
||||
m_ndb.theLastTupleId[impl->getTableId()] = ~0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
bool
|
||||
NdbDictionaryImpl::setTransporter(class TransporterFacade * tf)
|
||||
@ -3075,13 +3110,11 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
||||
evnt.getTableName()));
|
||||
DBUG_RETURN(-1);
|
||||
}
|
||||
evnt.setTable(tab);
|
||||
}
|
||||
|
||||
DBUG_PRINT("info",("Table: id: %d version: %d", tab->m_id, tab->m_version));
|
||||
|
||||
evnt.m_tableId = tab->m_id;
|
||||
evnt.m_tableVersion = tab->m_version;
|
||||
evnt.m_tableImpl = tab;
|
||||
NdbTableImpl &table = *evnt.m_tableImpl;
|
||||
|
||||
int attributeList_sz = evnt.m_attrIds.size();
|
||||
@ -3103,7 +3136,7 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
|
||||
attributeList_sz = evnt.m_columns.size();
|
||||
|
||||
DBUG_PRINT("info",("Event on tableId=%d, tableVersion=%d, event name %s, no of columns %d",
|
||||
evnt.m_tableId, evnt.m_tableVersion,
|
||||
table.m_id, table.m_version,
|
||||
evnt.m_name.c_str(),
|
||||
evnt.m_columns.size()));
|
||||
|
||||
@ -3211,11 +3244,12 @@ NdbDictInterface::createEvent(class Ndb & ndb,
|
||||
req->setRequestType(CreateEvntReq::RT_USER_GET);
|
||||
} else {
|
||||
DBUG_PRINT("info",("tableId: %u tableVersion: %u",
|
||||
evnt.m_tableId, evnt.m_tableVersion));
|
||||
evnt.m_tableImpl->m_id,
|
||||
evnt.m_tableImpl->m_version));
|
||||
// creating event in Dictionary
|
||||
req->setRequestType(CreateEvntReq::RT_USER_CREATE);
|
||||
req->setTableId(evnt.m_tableId);
|
||||
req->setTableVersion(evnt.m_tableVersion);
|
||||
req->setTableId(evnt.m_tableImpl->m_id);
|
||||
req->setTableVersion(evnt.m_tableImpl->m_version);
|
||||
req->setAttrListBitmask(evnt.m_attrListBitmask);
|
||||
req->setEventType(evnt.mi_type);
|
||||
req->clearFlags();
|
||||
@ -3266,14 +3300,12 @@ NdbDictInterface::createEvent(class Ndb & ndb,
|
||||
// NdbEventImpl *evntImpl = (NdbEventImpl *)evntConf->getUserData();
|
||||
|
||||
if (getFlag) {
|
||||
evnt.m_tableId = evntConf->getTableId();
|
||||
evnt.m_tableVersion = evntConf->getTableVersion();
|
||||
evnt.m_attrListBitmask = evntConf->getAttrListBitmask();
|
||||
evnt.mi_type = evntConf->getEventType();
|
||||
evnt.setTable(dataPtr);
|
||||
} else {
|
||||
if (evnt.m_tableId != evntConf->getTableId() ||
|
||||
evnt.m_tableVersion != evntConf->getTableVersion() ||
|
||||
if (evnt.m_tableImpl->m_id != evntConf->getTableId() ||
|
||||
evnt.m_tableImpl->m_version != evntConf->getTableVersion() ||
|
||||
//evnt.m_attrListBitmask != evntConf->getAttrListBitmask() ||
|
||||
evnt.mi_type != evntConf->getEventType()) {
|
||||
ndbout_c("ERROR*************");
|
||||
@ -3367,7 +3399,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
|
||||
DBUG_ENTER("NdbDictionaryImpl::getEvent");
|
||||
DBUG_PRINT("enter",("eventName= %s", eventName));
|
||||
|
||||
NdbEventImpl *ev = new NdbEventImpl();
|
||||
NdbEventImpl *ev = new NdbEventImpl();
|
||||
if (ev == NULL) {
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
@ -3384,9 +3416,16 @@ NdbDictionaryImpl::getEvent(const char * eventName)
|
||||
// We only have the table name with internal name
|
||||
DBUG_PRINT("info",("table %s", ev->getTableName()));
|
||||
Ndb_local_table_info *info;
|
||||
int retry= 0;
|
||||
while (1)
|
||||
info= get_local_table_info(ev->getTableName(), true);
|
||||
if (info == 0)
|
||||
{
|
||||
DBUG_PRINT("error",("unable to find table %s", ev->getTableName()));
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
if (info->m_table_impl->m_status == NdbDictionary::Object::Invalid)
|
||||
{
|
||||
removeCachedObject(*info->m_table_impl);
|
||||
info= get_local_table_info(ev->getTableName(), true);
|
||||
if (info == 0)
|
||||
{
|
||||
@ -3394,40 +3433,22 @@ NdbDictionaryImpl::getEvent(const char * eventName)
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
|
||||
if (ev->m_tableId == info->m_table_impl->m_id &&
|
||||
ev->m_tableVersion == info->m_table_impl->m_version)
|
||||
break;
|
||||
DBUG_PRINT("error",("%s: retry=%d: "
|
||||
"table version mismatch, event: [%u,%u] table: [%u,%u]",
|
||||
ev->getTableName(), retry,
|
||||
ev->m_tableId, ev->m_tableVersion,
|
||||
info->m_table_impl->m_id, info->m_table_impl->m_version));
|
||||
if (retry)
|
||||
{
|
||||
m_error.code= 241;
|
||||
delete ev;
|
||||
DBUG_RETURN(NULL);
|
||||
}
|
||||
invalidateObject(*info->m_table_impl);
|
||||
retry++;
|
||||
}
|
||||
|
||||
ev->m_tableImpl= info->m_table_impl;
|
||||
ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));
|
||||
|
||||
ev->setTable(info->m_table_impl);
|
||||
ev->setTable(m_ndb.externalizeTableName(ev->getTableName()));
|
||||
// get the columns from the attrListBitmask
|
||||
|
||||
NdbTableImpl &table = *ev->m_tableImpl;
|
||||
AttributeMask & mask = ev->m_attrListBitmask;
|
||||
unsigned attributeList_sz = mask.count();
|
||||
|
||||
DBUG_PRINT("info",("Table: id: %d version: %d", table.m_id, table.m_version));
|
||||
DBUG_PRINT("info",("Table: id: %d version: %d",
|
||||
table.m_id, table.m_version));
|
||||
|
||||
#ifndef DBUG_OFF
|
||||
char buf[128] = {0};
|
||||
mask.getText(buf);
|
||||
DBUG_PRINT("info",("attributeList_sz= %d, mask= %s", attributeList_sz, buf));
|
||||
DBUG_PRINT("info",("attributeList_sz= %d, mask= %s",
|
||||
attributeList_sz, buf));
|
||||
#endif
|
||||
|
||||
|
||||
@ -4649,7 +4670,7 @@ NdbDictionaryImpl::fix_blob_events(const NdbDictionary::Table* table, const char
|
||||
{
|
||||
const NdbTableImpl& t = table->m_impl;
|
||||
const NdbEventImpl* ev = getEvent(ev_name);
|
||||
assert(ev != NULL && ev->m_tableImpl == &t);
|
||||
assert(ev != NULL);
|
||||
Uint32 i;
|
||||
for (i = 0; i < t.m_columns.size(); i++) {
|
||||
assert(t.m_columns[i] != NULL);
|
||||
|
@ -261,6 +261,13 @@ public:
|
||||
};
|
||||
|
||||
class NdbEventImpl : public NdbDictionary::Event, public NdbDictObjectImpl {
|
||||
friend class NdbDictInterface;
|
||||
friend class NdbDictionaryImpl;
|
||||
friend class NdbEventOperation;
|
||||
friend class NdbEventOperationImpl;
|
||||
friend class NdbEventBuffer;
|
||||
friend class EventBufData_hash;
|
||||
friend class NdbBlob;
|
||||
public:
|
||||
NdbEventImpl();
|
||||
NdbEventImpl(NdbDictionary::Event &);
|
||||
@ -270,6 +277,7 @@ public:
|
||||
void setName(const char * name);
|
||||
const char * getName() const;
|
||||
void setTable(const NdbDictionary::Table& table);
|
||||
const NdbDictionary::Table * getTable() const;
|
||||
void setTable(const char * table);
|
||||
const char * getTableName() const;
|
||||
void addTableEvent(const NdbDictionary::Event::TableEvent t);
|
||||
@ -287,8 +295,6 @@ public:
|
||||
|
||||
Uint32 m_eventId;
|
||||
Uint32 m_eventKey;
|
||||
Uint32 m_tableId;
|
||||
Uint32 m_tableVersion;
|
||||
AttributeMask m_attrListBitmask;
|
||||
BaseString m_name;
|
||||
Uint32 mi_type;
|
||||
@ -296,7 +302,6 @@ public:
|
||||
NdbDictionary::Event::EventReport m_rep;
|
||||
bool m_mergeEvents;
|
||||
|
||||
NdbTableImpl *m_tableImpl;
|
||||
BaseString m_tableName;
|
||||
Vector<NdbColumnImpl *> m_columns;
|
||||
Vector<unsigned> m_attrIds;
|
||||
@ -304,6 +309,9 @@ public:
|
||||
static NdbEventImpl & getImpl(NdbDictionary::Event & t);
|
||||
static NdbEventImpl & getImpl(const NdbDictionary::Event & t);
|
||||
NdbDictionary::Event * m_facade;
|
||||
private:
|
||||
NdbTableImpl *m_tableImpl;
|
||||
void setTable(NdbTableImpl *tableImpl);
|
||||
};
|
||||
|
||||
struct NdbFilegroupImpl : public NdbDictObjectImpl {
|
||||
@ -561,6 +569,7 @@ public:
|
||||
int listIndexes(List& list, Uint32 indexId);
|
||||
|
||||
NdbTableImpl * getTable(const char * tableName, void **data= 0);
|
||||
void putTable(NdbTableImpl *impl);
|
||||
Ndb_local_table_info* get_local_table_info(
|
||||
const BaseString& internalTableName, bool do_add_blob_tables);
|
||||
NdbIndexImpl * getIndex(const char * indexName,
|
||||
|
@ -97,6 +97,26 @@ NdbEventOperation::hasError() const
|
||||
return m_impl.m_has_error;
|
||||
}
|
||||
|
||||
const bool NdbEventOperation::tableNameChanged() const
|
||||
{
|
||||
return m_impl.tableNameChanged();
|
||||
}
|
||||
|
||||
const bool NdbEventOperation::tableFrmChanged() const
|
||||
{
|
||||
return m_impl.tableFrmChanged();
|
||||
}
|
||||
|
||||
const bool NdbEventOperation::tableFragmentationChanged() const
|
||||
{
|
||||
return m_impl.tableFragmentationChanged();
|
||||
}
|
||||
|
||||
const bool NdbEventOperation::tableRangeListChanged() const
|
||||
{
|
||||
return m_impl.tableRangeListChanged();
|
||||
}
|
||||
|
||||
Uint64
|
||||
NdbEventOperation::getGCI() const
|
||||
{
|
||||
@ -124,10 +144,6 @@ NdbEventOperation::print()
|
||||
/*
|
||||
* Internal for the mysql server
|
||||
*/
|
||||
const NdbDictionary::Table *NdbEventOperation::getTable() const
|
||||
{
|
||||
return m_impl.m_eventImpl->m_tableImpl->m_facade;
|
||||
}
|
||||
const NdbDictionary::Event *NdbEventOperation::getEvent() const
|
||||
{
|
||||
return m_impl.m_eventImpl->m_facade;
|
||||
@ -148,6 +164,7 @@ const NdbRecAttr* NdbEventOperation::getFirstDataPreAttr() const
|
||||
{
|
||||
return m_impl.theFirstDataAttrs[1];
|
||||
}
|
||||
/*
|
||||
bool NdbEventOperation::validateTable(NdbDictionary::Table &table) const
|
||||
{
|
||||
DBUG_ENTER("NdbEventOperation::validateTable");
|
||||
@ -159,7 +176,7 @@ bool NdbEventOperation::validateTable(NdbDictionary::Table &table) const
|
||||
}
|
||||
DBUG_RETURN(res);
|
||||
}
|
||||
|
||||
*/
|
||||
void NdbEventOperation::setCustomData(void * data)
|
||||
{
|
||||
m_impl.m_custom_data= data;
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include <NdbBlob.hpp>
|
||||
#include <NdbEventOperation.hpp>
|
||||
#include "NdbEventOperationImpl.hpp"
|
||||
#include <signaldata/AlterTable.hpp>
|
||||
|
||||
#include <EventLogger.hpp>
|
||||
extern EventLogger g_eventLogger;
|
||||
@ -92,6 +93,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
|
||||
const char* eventName)
|
||||
: NdbEventOperation(*this), m_facade(&N), m_magic_number(0),
|
||||
m_ndb(theNdb), m_state(EO_ERROR), mi_type(0), m_oid(~(Uint32)0),
|
||||
m_change_mask(0),
|
||||
#ifdef VM_TRACE
|
||||
m_data_done_count(0), m_data_count(0),
|
||||
#endif
|
||||
@ -334,7 +336,6 @@ NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
|
||||
NdbEventOperationImpl* tLastBlopOp = NULL;
|
||||
while (tBlobOp != NULL) {
|
||||
if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
|
||||
assert(tBlobOp->m_eventImpl->m_tableImpl == tAttrInfo->m_blobTable);
|
||||
break;
|
||||
}
|
||||
tLastBlopOp = tBlobOp;
|
||||
@ -554,6 +555,26 @@ NdbEventOperationImpl::stop()
|
||||
DBUG_RETURN(r);
|
||||
}
|
||||
|
||||
const bool NdbEventOperationImpl::tableNameChanged() const
|
||||
{
|
||||
return (bool)AlterTableReq::getNameFlag(m_change_mask);
|
||||
}
|
||||
|
||||
const bool NdbEventOperationImpl::tableFrmChanged() const
|
||||
{
|
||||
return (bool)AlterTableReq::getFrmFlag(m_change_mask);
|
||||
}
|
||||
|
||||
const bool NdbEventOperationImpl::tableFragmentationChanged() const
|
||||
{
|
||||
return (bool)AlterTableReq::getFragDataFlag(m_change_mask);
|
||||
}
|
||||
|
||||
const bool NdbEventOperationImpl::tableRangeListChanged() const
|
||||
{
|
||||
return (bool)AlterTableReq::getRangeListFlag(m_change_mask);
|
||||
}
|
||||
|
||||
Uint64
|
||||
NdbEventOperationImpl::getGCI()
|
||||
{
|
||||
@ -566,6 +587,35 @@ NdbEventOperationImpl::getLatestGCI()
|
||||
return m_ndb->theEventBuffer->getLatestGCI();
|
||||
}
|
||||
|
||||
bool
|
||||
NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal,
|
||||
LinearSectionPtr ptr[3])
|
||||
{
|
||||
DBUG_ENTER("NdbEventOperationImpl::execSUB_TABLE_DATA");
|
||||
const SubTableData * const sdata=
|
||||
CAST_CONSTPTR(SubTableData, signal->getDataPtr());
|
||||
|
||||
if(signal->isFirstFragment()){
|
||||
m_fragmentId = signal->getFragmentId();
|
||||
m_buffer.grow(4 * sdata->totalLen);
|
||||
} else {
|
||||
if(m_fragmentId != signal->getFragmentId()){
|
||||
abort();
|
||||
}
|
||||
}
|
||||
const Uint32 i = SubTableData::DICT_TAB_INFO;
|
||||
DBUG_PRINT("info", ("Accumulated %u bytes for fragment %u",
|
||||
4 * ptr[i].sz, m_fragmentId));
|
||||
m_buffer.append(ptr[i].p, 4 * ptr[i].sz);
|
||||
|
||||
if(!signal->isLastFragment()){
|
||||
DBUG_RETURN(FALSE);
|
||||
}
|
||||
|
||||
DBUG_RETURN(TRUE);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
NdbEventOperationImpl::receive_event()
|
||||
{
|
||||
@ -574,6 +624,26 @@ NdbEventOperationImpl::receive_event()
|
||||
Uint32 operation= (Uint32)m_data_item->sdata->operation;
|
||||
DBUG_PRINT_EVENT("info",("sdata->operation %u",operation));
|
||||
|
||||
if (operation == NdbDictionary::Event::_TE_ALTER)
|
||||
{
|
||||
// Parse the new table definition and
|
||||
// create a table object
|
||||
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
|
||||
NdbDictionaryImpl *dict = & NdbDictionaryImpl::getImpl(*myDict);
|
||||
NdbError error;
|
||||
NdbDictInterface dif(error);
|
||||
NdbTableImpl *at;
|
||||
m_change_mask = m_data_item->sdata->changeMask;
|
||||
error.code = dif.parseTableInfo(&at,
|
||||
(Uint32*)m_buffer.get_data(),
|
||||
m_buffer.length() / 4,
|
||||
true);
|
||||
m_buffer.clear();
|
||||
if ( m_eventImpl->m_tableImpl)
|
||||
delete m_eventImpl->m_tableImpl;
|
||||
m_eventImpl->m_tableImpl = at;
|
||||
}
|
||||
|
||||
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
|
||||
{
|
||||
DBUG_RETURN_EVENT(1);
|
||||
@ -1389,7 +1459,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
|
||||
*/
|
||||
DBUG_RETURN_EVENT(0);
|
||||
}
|
||||
|
||||
|
||||
const bool is_blob_event = (op->theMainOp != NULL);
|
||||
const bool is_data_event =
|
||||
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include <transporter/TransporterDefinitions.hpp>
|
||||
#include <NdbRecAttr.hpp>
|
||||
#include <AttributeHeader.hpp>
|
||||
#include <UtilBuffer.hpp>
|
||||
|
||||
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
|
||||
|
||||
@ -200,8 +201,14 @@ public:
|
||||
NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
|
||||
int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count);
|
||||
int receive_event();
|
||||
const bool tableNameChanged() const;
|
||||
const bool tableFrmChanged() const;
|
||||
const bool tableFragmentationChanged() const;
|
||||
const bool tableRangeListChanged() const;
|
||||
Uint64 getGCI();
|
||||
Uint64 getLatestGCI();
|
||||
bool execSUB_TABLE_DATA(NdbApiSignal * signal,
|
||||
LinearSectionPtr ptr[3]);
|
||||
|
||||
NdbDictionary::Event::TableEvent getEventType();
|
||||
|
||||
@ -240,6 +247,12 @@ public:
|
||||
void *m_custom_data;
|
||||
int m_has_error;
|
||||
|
||||
Uint32 m_fragmentId;
|
||||
UtilBuffer m_buffer;
|
||||
|
||||
// Bit mask for what has changed in a table (for TE_ALTER event)
|
||||
Uint32 m_change_mask;
|
||||
|
||||
#ifdef VM_TRACE
|
||||
Uint32 m_data_done_count;
|
||||
Uint32 m_data_count;
|
||||
|
@ -703,7 +703,6 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
|
||||
aSignal, ptr);
|
||||
return;
|
||||
|
||||
case GSN_SUB_META_DATA:
|
||||
case GSN_SUB_REMOVE_CONF:
|
||||
case GSN_SUB_REMOVE_REF:
|
||||
return; // ignore these signals
|
||||
@ -726,6 +725,16 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
|
||||
const SubTableData * const sdata=
|
||||
CAST_CONSTPTR(SubTableData, aSignal->getDataPtr());
|
||||
const Uint32 oid = sdata->senderData;
|
||||
NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
|
||||
|
||||
if (op->m_magic_number != NDB_EVENT_OP_MAGIC_NUMBER)
|
||||
g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic "
|
||||
"number");
|
||||
|
||||
// Accumulate DIC_TAB_INFO for TE_ALTER events
|
||||
if (sdata->operation == NdbDictionary::Event::_TE_ALTER &&
|
||||
!op->execSUB_TABLE_DATA(aSignal, ptr))
|
||||
return;
|
||||
|
||||
for (int i= aSignal->m_noOfSections;i < 3; i++) {
|
||||
ptr[i].p = NULL;
|
||||
@ -736,12 +745,7 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
|
||||
sdata->senderData, sdata->gci, sdata->operation,
|
||||
sdata->tableId));
|
||||
|
||||
NdbEventOperationImpl *op= (NdbEventOperationImpl*)int2void(oid);
|
||||
if (op->m_magic_number == NDB_EVENT_OP_MAGIC_NUMBER)
|
||||
theEventBuffer->insertDataL(op,sdata, ptr);
|
||||
else
|
||||
g_eventLogger.error("dropped GSN_SUB_TABLE_DATA due to wrong magic "
|
||||
"number");
|
||||
theEventBuffer->insertDataL(op,sdata, ptr);
|
||||
return;
|
||||
}
|
||||
case GSN_DIHNDBTAMPER:
|
||||
|
Loading…
x
Reference in New Issue
Block a user