[Box Backup-dev] COMMIT r476 - box/chris/general/bin/bbackupd

boxbackup-dev@fluffy.co.uk boxbackup-dev@fluffy.co.uk
Mon, 20 Feb 2006 00:01:06 +0000 (GMT)


Author: chris
Date: 2006-02-20 00:01:05 +0000 (Mon, 20 Feb 2006)
New Revision: 476

Modified:
   box/chris/general/bin/bbackupd/BackupDaemon.cpp
Log:
* BackupDaemon.cpp
- Use overlapped I/O and IPC to avoid writing to the control socket while
  another thread is reading from it, which causes race conditions and
  deadlocks.


Modified: box/chris/general/bin/bbackupd/BackupDaemon.cpp
===================================================================
--- box/chris/general/bin/bbackupd/BackupDaemon.cpp	2006-02-19 23:59:51 UTC (rev 475)
+++ box/chris/general/bin/bbackupd/BackupDaemon.cpp	2006-02-20 00:01:05 UTC (rev 476)
@@ -125,6 +125,19 @@
 	}
 
 #ifdef WIN32
+	// Create the event object to signal when new messages are
+	// queued to be sent.
+	mhMessageToSendEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
+	if (mhMessageToSendEvent == INVALID_HANDLE_VALUE)
+	{
+		syslog(LOG_ERR, "Failed to create event object: error %d",
+			GetLastError);
+		exit(1);
+	}
+
+	// Create the critical section to protect the message queue
+	InitializeCriticalSection(&mMessageQueueLock);
+
 	// Create a thread to handle the named pipe
 	HANDLE hThread;
 	unsigned int dwThreadId;
@@ -264,14 +277,14 @@
 {
 	mpCommandSocketInfo = new CommandSocketInfo;
 	this->mReceivedCommandConn = false;
+	WinNamedPipeStream& rSocket(mpCommandSocketInfo->mListeningSocket);
 
 	// loop until the parent process exits
 	while (TRUE)
 	{
 		try
 		{
-			mpCommandSocketInfo->mListeningSocket.Accept(
-				BOX_NAMED_PIPE_NAME);
+			rSocket.Accept(BOX_NAMED_PIPE_NAME);
 
 			// This next section comes from Ben's original function
 			// Log
@@ -289,18 +302,73 @@
 				conf.GetKeyValueInt("MaxUploadWait"),
 				mState);
 
-			mpCommandSocketInfo->mListeningSocket.Write(summary, summarySize);
-			mpCommandSocketInfo->mListeningSocket.Write("ping\n", 5);
+			rSocket.Write(summary, summarySize);
+			rSocket.Write("ping\n", 5);
 
-			IOStreamGetLine readLine(mpCommandSocketInfo->mListeningSocket);
+			// old queued messages are not useful
+			EnterCriticalSection(&mMessageQueueLock);
+			mMessageList.clear();
+			ResetEvent(mhMessageToSendEvent);
+			LeaveCriticalSection(&mMessageQueueLock);
+
+			IOStreamGetLine readLine(rSocket);
 			std::string command;
 
-			while (mpCommandSocketInfo->mListeningSocket.IsConnected() &&
-			       readLine.GetLine(command) )
+			while (rSocket.IsConnected())
 			{
-				TRACE1("Receiving command '%s' over "
-					"command socket\n", command.c_str());
+				HANDLE handles[2];
+				handles[0] = mhMessageToSendEvent;
+				handles[1] = rSocket.GetReadableEvent();
 
+				DWORD result = WaitForMultipleObjects(
+					sizeof(handles)/sizeof(*handles),
+					handles, FALSE, 1000);
+
+				if (result == 0)
+				{
+					ResetEvent(mhMessageToSendEvent);
+
+					EnterCriticalSection(&mMessageQueueLock);
+					try
+					{
+						while (mMessageList.size() > 0)
+						{
+							std::string message = *(mMessageList.begin());
+							mMessageList.erase(mMessageList.begin());
+							printf("Sending '%s' to waiting client... ", message.c_str());
+							message += "\n";
+							rSocket.Write(message.c_str(),
+								message.length());
+
+							printf("done.\n");
+						}
+					}
+					catch (...)
+					{
+						LeaveCriticalSection(&mMessageQueueLock);
+						throw;
+					}
+					LeaveCriticalSection(&mMessageQueueLock);
+					continue;
+				}
+				else if (result == WAIT_TIMEOUT)
+				{
+					continue;
+				}
+				else if (result != 1)
+				{
+					::syslog(LOG_ERR, "WaitForMultipleObjects returned invalid result %d", result);
+					continue;
+				}
+
+				if (!readLine.GetLine(command))
+				{
+					::syslog(LOG_ERR, "Failed to read line");
+					continue;
+				}
+
+				printf("Received command '%s' from client\n", command.c_str());
+
 				bool sendOK = false;
 				bool sendResponse = true;
 				bool disconnect = false;
@@ -338,12 +406,18 @@
 					SetTerminateWanted();
 					sendOK = true;
 				}
+				else
+				{
+					::syslog(LOG_ERR, "Received unknown command '%s' from client", command.c_str());
+					sendResponse = true;
+					sendOK = false;
+				}
 
 				// Send a response back?
 				if (sendResponse)
 				{
 					const char* response = sendOK ? "ok\n" : "error\n";
-					mpCommandSocketInfo->mListeningSocket.Write(
+					rSocket.Write(
 						response, strlen(response));
 				}
 
@@ -355,7 +429,7 @@
 				this->mReceivedCommandConn = true;
 			}
 
-			mpCommandSocketInfo->mListeningSocket.Close();
+			rSocket.Close();
 		}
 		catch (BoxException &e)
 		{
@@ -1134,13 +1208,9 @@
 // --------------------------------------------------------------------------
 void BackupDaemon::SendSyncStartOrFinish(bool SendStart)
 {
-	// The bbackupctl program can't rely on a state change, because it may never
-	// change if the server doesn't need to be contacted.
+	// The bbackupctl program can't rely on a state change, because it 
+	// may never change if the server doesn't need to be contacted.
 
-#ifdef __MINGW32__
-#warning race condition: what happens if socket is closed?
-#endif
-
 	if (mpCommandSocketInfo != NULL &&
 #ifdef WIN32
 	    mpCommandSocketInfo->mListeningSocket.IsConnected()
@@ -1149,13 +1219,16 @@
 #endif
 	    )
 	{
-		const char* message = SendStart ? "start-sync\n" : "finish-sync\n";
+		std::string message = SendStart ? "start-sync" : "finish-sync";
 		try
 		{
 #ifdef WIN32
-			mpCommandSocketInfo->mListeningSocket.Write(message, 
-				(int)strlen(message));
+			EnterCriticalSection(&mMessageQueueLock);
+			mMessageList.push_back(message);
+			SetEvent(mhMessageToSendEvent);
+			LeaveCriticalSection(&mMessageQueueLock);
 #else
+			message += "\n";
 			mpCommandSocketInfo->mpConnectedSocket->Write(message,
 				strlen(message));
 #endif
@@ -1748,40 +1821,37 @@
 	// command socket if there's an error
 
 	char newState[64];
-	char newStateSize = sprintf(newState, "state %d\n", State);
+	sprintf(newState, "state %d", State);
+	std::string message = newState;
 
 #ifdef WIN32
-	#ifndef _MSC_VER
-		#warning FIX ME: race condition
-	#endif
+	EnterCriticalSection(&mMessageQueueLock);
+	mMessageList.push_back(newState);
+	SetEvent(mhMessageToSendEvent);
+	LeaveCriticalSection(&mMessageQueueLock);
+#else
+	message += "\n";
 
-	// what happens if the socket is closed by the other thread before
-	// we can write to it? Null pointer deref at best.
-	if (mpCommandSocketInfo && 
-	    mpCommandSocketInfo->mListeningSocket.IsConnected())
+	if(mpCommandSocketInfo == 0)
 	{
-		try
-		{
-			mpCommandSocketInfo->mListeningSocket.Write(newState, newStateSize);
-		}
-		catch(...)
-		{
-			CloseCommandConnection();
-		}
+		return;
 	}
-#else
-	if(mpCommandSocketInfo != 0 && mpCommandSocketInfo->mpConnectedSocket.get() != 0)
+
+	if (mpCommandSocketInfo->mpConnectedSocket.get() == 0)
 	{
-		// Something connected to the command socket, tell it about the new state
-		try
-		{
-			mpCommandSocketInfo->mpConnectedSocket->Write(newState, newStateSize);
-		}
-		catch(...)
-		{
-			CloseCommandConnection();
-		}
+		return;
 	}
+
+	// Something connected to the command socket, tell it about the new state
+	try
+	{
+		mpCommandSocketInfo->mpConnectedSocket->Write(message.c_str(),
+			message.length());
+	}
+	catch(...)
+	{
+		CloseCommandConnection();
+	}
 #endif
 }