[Box Backup-commit] COMMIT r1486 - box/chris/merge/bin/bbackupd

boxbackup-dev@fluffy.co.uk boxbackup-dev@fluffy.co.uk
Sat, 24 Mar 2007 23:40:19 +0000


Author: chris
Date: 2007-03-24 23:40:19 +0000 (Sat, 24 Mar 2007)
New Revision: 1486

Modified:
   box/chris/merge/bin/bbackupd/BackupDaemon.cpp
   box/chris/merge/bin/bbackupd/BackupDaemon.h
Log:
Move all command socket communications to the worker thread, to avoid
deadlocks.

Use events, and a message list protected by a critical section, to pass
messages between threads.

(refs #3)


Modified: box/chris/merge/bin/bbackupd/BackupDaemon.cpp
===================================================================
--- box/chris/merge/bin/bbackupd/BackupDaemon.cpp	2007-03-24 23:37:26 UTC (rev 1485)
+++ box/chris/merge/bin/bbackupd/BackupDaemon.cpp	2007-03-24 23:40:19 UTC (rev 1486)
@@ -127,6 +127,29 @@
 	}
 
 #ifdef WIN32
+	// Create the event object to signal from main thread to worker
+	// when new messages are queued to be sent to the command socket.
+	mhMessageToSendEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if(mhMessageToSendEvent == INVALID_HANDLE_VALUE)
+	{
+		BOX_ERROR("Failed to create event object: error " <<
+			GetLastError());
+		exit(1);
+	}
+
+	// Create the event object to signal from worker to main thread
+	// when a command has been received on the command socket.
+	mhCommandReceivedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if(mhCommandReceivedEvent == INVALID_HANDLE_VALUE)
+	{
+		BOX_ERROR("Failed to create event object: error " <<
+			GetLastError());
+		exit(1);
+	}
+
+	// Create the critical section to protect the message queue
+	InitializeCriticalSection(&mMessageQueueLock);
+
 	// Create a thread to handle the named pipe
 	HANDLE hThread;
 	unsigned int dwThreadId;
@@ -264,7 +287,6 @@
 #ifdef WIN32
 void BackupDaemon::RunHelperThread(void)
 {
-	this->mReceivedCommandConn = false;
 	mpCommandSocketInfo = new CommandSocketInfo;
 	WinNamedPipeStream& rSocket(mpCommandSocketInfo->mListeningSocket);
 
@@ -322,16 +344,74 @@
 			rSocket.Write(summary, summarySize);
 			rSocket.Write("ping\n", 5);
 
+			// old queued messages are not useful
+			EnterCriticalSection(&mMessageQueueLock);
+			mMessageList.clear();
+			ResetEvent(mhMessageToSendEvent);
+			LeaveCriticalSection(&mMessageQueueLock);
+
 			IOStreamGetLine readLine(rSocket);
 			std::string command;
 
-			while (rSocket.IsConnected() && 
-				readLine.GetLine(command) &&
-				!IsTerminateWanted())
+			while (rSocket.IsConnected() && !IsTerminateWanted())
 			{
+				HANDLE handles[2];
+				handles[0] = mhMessageToSendEvent;
+				handles[1] = rSocket.GetReadableEvent();
+				
 				BOX_TRACE("Received command '" << command 
 					<< "' over command socket");
 
+				DWORD result = WaitForMultipleObjects(
+					sizeof(handles)/sizeof(*handles),
+					handles, FALSE, 1000);
+
+				if(result == 0)
+				{
+					ResetEvent(mhMessageToSendEvent);
+
+					EnterCriticalSection(&mMessageQueueLock);
+					try
+					{
+						while (mMessageList.size() > 0)
+						{
+							std::string message = *(mMessageList.begin());
+							mMessageList.erase(mMessageList.begin());
+							printf("Sending '%s' to waiting client... ", message.c_str());
+							message += "\n";
+							rSocket.Write(message.c_str(),
+								message.length());
+
+							printf("done.\n");
+						}
+					}
+					catch (...)
+					{
+						LeaveCriticalSection(&mMessageQueueLock);
+						throw;
+					}
+					LeaveCriticalSection(&mMessageQueueLock);
+					continue;
+				}
+				else if(result == WAIT_TIMEOUT)
+				{
+					continue;
+				}
+				else if(result != 1)
+				{
+					BOX_ERROR("WaitForMultipleObjects returned invalid result " << result);
+					continue;
+				}
+
+				if(!readLine.GetLine(command))
+				{
+					BOX_ERROR("Failed to read line");
+					continue;
+				}
+
+				BOX_INFO("Received command " << command << 
+					" from client");
+
 				bool sendOK = false;
 				bool sendResponse = true;
 				bool disconnect = false;
@@ -349,6 +429,7 @@
 					this->mDoSyncFlagOut = true;
 					this->mSyncIsForcedOut = false;
 					sendOK = true;
+					SetEvent(mhCommandReceivedEvent);
 				}
 				else if(command == "force-sync")
 				{
@@ -356,18 +437,21 @@
 					this->mDoSyncFlagOut = true;
 					this->mSyncIsForcedOut = true;
 					sendOK = true;
+					SetEvent(mhCommandReceivedEvent);
 				}
 				else if(command == "reload")
 				{
 					// Reload the configuration
 					SetReloadConfigWanted();
 					sendOK = true;
+					SetEvent(mhCommandReceivedEvent);
 				}
 				else if(command == "terminate")
 				{
 					// Terminate the daemon cleanly
 					SetTerminateWanted();
 					sendOK = true;
+					SetEvent(mhCommandReceivedEvent);
 				}
 				else
 				{
@@ -390,8 +474,6 @@
 				{
 					break;
 				}
-
-				this->mReceivedCommandConn = true;
 			}
 
 			rSocket.Close();
@@ -411,6 +493,9 @@
 			BOX_ERROR("Communication error with control client");
 		}
 	}
+
+	CloseHandle(mhCommandReceivedEvent);
+	CloseHandle(mhMessageToSendEvent);
 } 
 #endif
 
@@ -1123,25 +1208,27 @@
 void BackupDaemon::WaitOnCommandSocket(box_time_t RequiredDelay, bool &DoSyncFlagOut, bool &SyncIsForcedOut)
 {
 #ifdef WIN32
-	// Really could use some interprocess protection, mutex etc
-	// any side effect should be too bad???? :)
-	DWORD timeout = (DWORD)BoxTimeToMilliSeconds(RequiredDelay);
+	DWORD requiredDelayMs = BoxTimeToMilliSeconds(RequiredDelay);
 
-	while ( this->mReceivedCommandConn == false )
+	DWORD result = WaitForSingleObject(mhCommandReceivedEvent, 
+		(DWORD)requiredDelayMs);
+
+	if(result == WAIT_OBJECT_0)
 	{
-		Sleep(1);
-
-		if( timeout == 0 )
-		{
-			DoSyncFlagOut = false;
-			SyncIsForcedOut = false;
-			return;
-		}
-		timeout--;
+		DoSyncFlagOut = this->mDoSyncFlagOut;
+		SyncIsForcedOut = this->mSyncIsForcedOut;
+		ResetEvent(mhCommandReceivedEvent);
 	}
-	this->mReceivedCommandConn = false;
-	DoSyncFlagOut = this->mDoSyncFlagOut;
-	SyncIsForcedOut = this->mSyncIsForcedOut;
+	else if(result == WAIT_TIMEOUT)
+	{
+		DoSyncFlagOut = false;
+		SyncIsForcedOut = false;
+	}
+	else
+	{
+		BOX_ERROR("Unexpected result from WaitForSingleObject: "
+			"error " << GetLastError());
+	}
 
 	return;
 #else // ! WIN32
@@ -1383,10 +1470,6 @@
 	// The bbackupctl program can't rely on a state change, because it 
 	// may never change if the server doesn't need to be contacted.
 
-#ifdef __MINGW32__
-#warning race condition: what happens if socket is closed?
-#endif
-
 	if(mpCommandSocketInfo != NULL &&
 #ifdef WIN32
 	    mpCommandSocketInfo->mListeningSocket.IsConnected()
@@ -1395,15 +1478,18 @@
 #endif
 	    )
 	{
-		const char* message = SendStart ? "start-sync\n" : "finish-sync\n";
+		std::string message = SendStart ? "start-sync" : "finish-sync";
 		try
 		{
 #ifdef WIN32
-			mpCommandSocketInfo->mListeningSocket.Write(message, 
-				(int)strlen(message));
+			EnterCriticalSection(&mMessageQueueLock);
+			mMessageList.push_back(message);
+			SetEvent(mhMessageToSendEvent);
+			LeaveCriticalSection(&mMessageQueueLock);
 #else
-			mpCommandSocketInfo->mpConnectedSocket->Write(message,
-				strlen(message));
+			message += "\n";
+			mpCommandSocketInfo->mpConnectedSocket->Write(
+				message.c_str(), message.size());
 #endif
 		}
 		catch(std::exception &e)
@@ -2033,52 +2119,45 @@
 	// command socket if there's an error
 
 	char newState[64];
-	char newStateSize = sprintf(newState, "state %d\n", State);
+	sprintf(newState, "state %d", State);
+	std::string message = newState;
 
 #ifdef WIN32
-	#ifndef _MSC_VER
-		#warning FIX ME: race condition
-	#endif
+	EnterCriticalSection(&mMessageQueueLock);
+	mMessageList.push_back(newState);
+	SetEvent(mhMessageToSendEvent);
+	LeaveCriticalSection(&mMessageQueueLock);
+#else
+	message += "\n";
 
-	// what happens if the socket is closed by the other thread before
-	// we can write to it? Null pointer deref at best.
-	if(mpCommandSocketInfo && 
-	    mpCommandSocketInfo->mListeningSocket.IsConnected())
+	if(mpCommandSocketInfo == 0)
 	{
-		try
-		{
-			mpCommandSocketInfo->mListeningSocket.Write(newState, newStateSize);
-		}
-		catch(std::exception &e)
-		{
-			BOX_ERROR("Internal error while writing state "
-				"to command socket: " << e.what());
-			CloseCommandConnection();
-		}
-		catch(...)
-		{
-			CloseCommandConnection();
-		}
+		return;
 	}
-#else
-	if(mpCommandSocketInfo != 0 && mpCommandSocketInfo->mpConnectedSocket.get() != 0)
+
+	if(mpCommandSocketInfo->mpConnectedSocket.get() == 0)
 	{
-		// Something connected to the command socket, tell it about the new state
-		try
-		{
-			mpCommandSocketInfo->mpConnectedSocket->Write(newState, newStateSize);
-		}
-		catch(std::exception &e)
-		{
-			BOX_ERROR("Internal error while writing state "
-				"to command socket: " << e.what());
-			CloseCommandConnection();
-		}
-		catch(...)
-		{
-			CloseCommandConnection();
-		}
+		return;
 	}
+
+	// Something connected to the command socket, tell it about the new state
+	try
+	{
+		mpCommandSocketInfo->mpConnectedSocket->Write(message.c_str(),
+			message.length());
+	}
+	catch(std::exception &e)
+	{
+		BOX_ERROR("Internal error while writing state "
+			"to command socket: " << e.what());
+		CloseCommandConnection();
+	}
+	catch(...)
+	{
+		BOX_ERROR("Internal error while writing state "
+			"to command socket: unknown error");
+		CloseCommandConnection();
+	}
 #endif
 }
 

Modified: box/chris/merge/bin/bbackupd/BackupDaemon.h
===================================================================
--- box/chris/merge/bin/bbackupd/BackupDaemon.h	2007-03-24 23:37:26 UTC (rev 1485)
+++ box/chris/merge/bin/bbackupd/BackupDaemon.h	2007-03-24 23:40:19 UTC (rev 1486)
@@ -346,7 +346,10 @@
 	void RunHelperThread(void);
 
 	private:
-	bool mDoSyncFlagOut, mSyncIsForcedOut, mReceivedCommandConn;
+	bool mDoSyncFlagOut, mSyncIsForcedOut;
+	HANDLE mhMessageToSendEvent, mhCommandReceivedEvent;
+	CRITICAL_SECTION mMessageQueueLock;
+	std::vector<std::string> mMessageList;
 #endif
 };