Merge jonas@perch:src/mysql-5.0
into poseidon.ndb.mysql.com:/home/tomas/mysql-5.0-main
This commit is contained in:
commit
cc6483a4fe
@ -105,7 +105,8 @@ public:
|
||||
void stopSessions(bool wait = false);
|
||||
|
||||
void foreachSession(void (*f)(Session*, void*), void *data);
|
||||
|
||||
void checkSessions();
|
||||
|
||||
private:
|
||||
struct SessionInstance {
|
||||
Service * m_service;
|
||||
@ -116,12 +117,13 @@ private:
|
||||
Service * m_service;
|
||||
NDB_SOCKET_TYPE m_socket;
|
||||
};
|
||||
MutexVector<SessionInstance> m_sessions;
|
||||
NdbLockable m_session_mutex;
|
||||
Vector<SessionInstance> m_sessions;
|
||||
MutexVector<ServiceInstance> m_services;
|
||||
unsigned m_maxSessions;
|
||||
|
||||
void doAccept();
|
||||
void checkSessions();
|
||||
void checkSessionsImpl();
|
||||
void startSession(SessionInstance &);
|
||||
|
||||
/**
|
||||
|
@ -184,9 +184,12 @@ SocketServer::doAccept(){
|
||||
SessionInstance s;
|
||||
s.m_service = si.m_service;
|
||||
s.m_session = si.m_service->newSession(childSock);
|
||||
if(s.m_session != 0){
|
||||
if(s.m_session != 0)
|
||||
{
|
||||
m_session_mutex.lock();
|
||||
m_sessions.push_back(s);
|
||||
startSession(m_sessions.back());
|
||||
m_session_mutex.unlock();
|
||||
}
|
||||
|
||||
continue;
|
||||
@ -240,10 +243,13 @@ void
|
||||
SocketServer::doRun(){
|
||||
|
||||
while(!m_stopThread){
|
||||
checkSessions();
|
||||
m_session_mutex.lock();
|
||||
checkSessionsImpl();
|
||||
if(m_sessions.size() < m_maxSessions){
|
||||
m_session_mutex.unlock();
|
||||
doAccept();
|
||||
} else {
|
||||
m_session_mutex.unlock();
|
||||
NdbSleep_MilliSleep(200);
|
||||
}
|
||||
}
|
||||
@ -276,17 +282,30 @@ transfer(NDB_SOCKET_TYPE sock){
|
||||
void
|
||||
SocketServer::foreachSession(void (*func)(SocketServer::Session*, void *), void *data)
|
||||
{
|
||||
m_session_mutex.lock();
|
||||
for(int i = m_sessions.size() - 1; i >= 0; i--){
|
||||
(*func)(m_sessions[i].m_session, data);
|
||||
}
|
||||
checkSessions();
|
||||
m_session_mutex.unlock();
|
||||
}
|
||||
|
||||
void
|
||||
SocketServer::checkSessions(){
|
||||
for(int i = m_sessions.size() - 1; i >= 0; i--){
|
||||
if(m_sessions[i].m_session->m_stopped){
|
||||
if(m_sessions[i].m_thread != 0){
|
||||
SocketServer::checkSessions()
|
||||
{
|
||||
m_session_mutex.lock();
|
||||
checkSessionsImpl();
|
||||
m_session_mutex.unlock();
|
||||
}
|
||||
|
||||
void
|
||||
SocketServer::checkSessionsImpl()
|
||||
{
|
||||
for(int i = m_sessions.size() - 1; i >= 0; i--)
|
||||
{
|
||||
if(m_sessions[i].m_session->m_stopped)
|
||||
{
|
||||
if(m_sessions[i].m_thread != 0)
|
||||
{
|
||||
void* ret;
|
||||
NdbThread_WaitFor(m_sessions[i].m_thread, &ret);
|
||||
NdbThread_Destroy(&m_sessions[i].m_thread);
|
||||
@ -301,19 +320,26 @@ SocketServer::checkSessions(){
|
||||
void
|
||||
SocketServer::stopSessions(bool wait){
|
||||
int i;
|
||||
m_session_mutex.lock();
|
||||
for(i = m_sessions.size() - 1; i>=0; i--)
|
||||
{
|
||||
m_sessions[i].m_session->stopSession();
|
||||
m_sessions[i].m_session->m_stop = true; // to make sure
|
||||
}
|
||||
m_session_mutex.unlock();
|
||||
|
||||
for(i = m_services.size() - 1; i>=0; i--)
|
||||
m_services[i].m_service->stopSessions();
|
||||
|
||||
if(wait){
|
||||
m_session_mutex.lock();
|
||||
while(m_sessions.size() > 0){
|
||||
checkSessions();
|
||||
checkSessionsImpl();
|
||||
m_session_mutex.unlock();
|
||||
NdbSleep_MilliSleep(100);
|
||||
m_session_mutex.lock();
|
||||
}
|
||||
m_session_mutex.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@ -348,4 +374,4 @@ sessionThread_C(void* _sc){
|
||||
}
|
||||
|
||||
template class MutexVector<SocketServer::ServiceInstance>;
|
||||
template class MutexVector<SocketServer::SessionInstance>;
|
||||
template class Vector<SocketServer::SessionInstance>;
|
||||
|
@ -501,6 +501,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
|
||||
ps.tick= tick;
|
||||
m_mgmsrv.get_socket_server()->
|
||||
foreachSession(stop_session_if_timed_out,&ps);
|
||||
m_mgmsrv.get_socket_server()->checkSessions();
|
||||
error_string = "";
|
||||
continue;
|
||||
}
|
||||
@ -1558,6 +1559,7 @@ MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx,
|
||||
ps.free_nodes.bitXORC(NodeBitmask()); // invert connected_nodes to get free nodes
|
||||
|
||||
m_mgmsrv.get_socket_server()->foreachSession(stop_session_if_not_connected,&ps);
|
||||
m_mgmsrv.get_socket_server()->checkSessions();
|
||||
|
||||
m_output->println("purge stale sessions reply");
|
||||
if (str.length() > 0)
|
||||
|
Loading…
x
Reference in New Issue
Block a user