Unit TalkWithServer;

{ Message transport between the main program and the server.

  This unit sits one level below GetAlertData.  GetAlertData knows about
  specific message types and decides when to connect and disconnect.  This
  unit only carries messages from the main program to the server and back;
  it knows nothing about their content.

  Responsibilities of this unit:
    - Marshaling and unmarshaling.  Outgoing messages arrive as
      TMessageToServer; see TOnMessageFromServer for the reply format.
    - Compression.
    - Combining several client-to-server messages into one send, for
      efficiency, using WakeMeSoon.

  The next lower level is GenericServerConnection. }

Interface

Uses
  Classes, WakeMeSoon, ZLibEx, GenericServerConnection;

Type
  TServerConnectionStatus = (scsNew,           // Never been used.
                             scsTrying,        // Connect requested.
                             scsConnected,     // Data is flowing.
                             scsDisconnected); // Can no longer be used.
  // For simplicity a connection is never reused.  When a connection fails,
  // throw it away and start with a new one.  That way it is perfectly clear
  // when all of the details are reset.

  TMessageToServer = Array Of Record
    Name, Value : String;
  End;

  // One of these is returned when you send a message, and it is what you
  // need to cancel that message.  At one time only the ServerId was used to
  // cancel, but server ids are reused, so you might be trying to cancel a
  // message that is long gone.  We saw this in the Java code and it caused
  // a lot of problems.
  TUniqueMessageId = Record
    // The id that we send to the server, and that the server sends back to
    // us.  Our data structure lets us look up a message efficiently by
    // this id.
    ServerId : Integer;
    // Incremented each time we send a message.  It is almost guaranteed to
    // be unique.
    UniqueId : Int64;
  End;

  TOnMessageFromServer =
    Procedure(Success : Boolean;  // True if we received a message.  If the
                                  // connection closes while we were
                                  // expecting a message, we send a negative
                                  // response.
              Body : String;      // From the server, as is.
              OriginalMessage : TMessageToServer) // As is.
    Of Object;

  TOnServerAutoRetry = Procedure(Msg : String) Of Object;

  TTalkWithServer = Class(TObject)
  Private
    ServerConnection: TGenericServerConnection;
    Procedure ConnectionUp;
    Procedure ConnectionDown;
    Procedure ServerConnectionSessionClosed(Connection : TGenericServerConnection);
    Procedure ServerConnectionSessionConnected(Connection : TGenericServerConnection);
    Procedure ServerConnectionDataAvailable(Connection : TGenericServerConnection);
  Private // Outgoing to server
    Wakeup : TWakeMeSoon;
    OutgoingBuffer : String; // Already compressed, if need be.
    Compressor : TZLibCompressor;
    Procedure SendString(S : String);
    Procedure CompressAndSendString(S : String);
    Procedure CompressAndSendInteger(I : Integer);
    Procedure CompressAndSendStringWithSize(S : String);
    Procedure FlushCompressionBuffer;
  Private // Incoming from server
    Decompressor : TZLibDecompressor;
    Procedure CheckForCompressedMessage;
  Private // Status to client
    FOnConnected, FOnDisconnected : TThreadMethod;
    FOnMessagePreview : TOnMessageFromServer;
    FOnAutoRetry : TOnServerAutoRetry;
    FServerConnectionStatus : TServerConnectionStatus;
    FErrorMessage : String;
    // There is no limit on the number of outstanding requests, but this is
    // optimized for a short list.
    OutstandingRequests : Array Of Record
      OriginalRequest : TMessageToServer;
      UniqueId : Int64;
      Callback : TOnMessageFromServer;
      Streaming : Boolean;
    End;
    Procedure SendErrorResponse(Msg : TMessageToServer;
                                Callback : TOnMessageFromServer;
                                Streaming : Boolean);
    Procedure SendGoodResponse(MsgId : Integer; Body : String);
    Procedure ServerConnectionAutoRetry(Source : TGenericServerConnection;
                                        Msg : String);
  Private // Options
    FSocksUsername, FSocksPassword, FSocksHost : String;
    FSocksVersion, FSocksPort : Integer;
    FProxyUserName, FProxyPassword, FProxyServer : String;
    FProxyPort : Integer;
    FTimeoutMs : Integer;
    FUseHttpTunnel : Integer;
    FHttpTunnelAddress : String;
    FTcpIpServerAddress : String;
  Public
    Constructor Create;
    Destructor Destroy; Override;
    Procedure Connect;
    Function SendMessage(Msg : TMessageToServer;
                         Response : TOnMessageFromServer = Nil;
                         StreamingResponse : Boolean = False
                         ) : TUniqueMessageId;
    Procedure AbandonMessage(MessageId : TUniqueMessageId);
    Property OnConnected : TThreadMethod
      Read FOnConnected Write FOnConnected;
    // If you get the OnDisconnected callback and you did not ask for the
    // shutdown, read ErrorMessage.
    Property OnDisconnected : TThreadMethod
      Read FOnDisconnected Write FOnDisconnected;
    Property ErrorMessage : String Read FErrorMessage;
    // An error was worth reporting to the user.  Aside from reporting, we
    // are taking care of it ourselves.
    Property OnAutoRetry : TOnServerAutoRetry
      Read FOnAutoRetry Write FOnAutoRetry;
    Property OnMessagePreview : TOnMessageFromServer
      Read FOnMessagePreview Write FOnMessagePreview;
    Property ServerConnectionStatus : TServerConnectionStatus
      Read FServerConnectionStatus;
    Property SocksHost : String Read FSocksHost Write FSocksHost;
    Property SocksPort : Integer Read FSocksPort Write FSocksPort;
    Property SocksUsername : String Read FSocksUsername Write FSocksUsername;
    Property SocksPassword : String Read FSocksPassword Write FSocksPassword;
    Property SocksVersion : Integer Read FSocksVersion Write FSocksVersion;
    Property ProxyUserName : String Read FProxyUserName Write FProxyUserName;
    Property ProxyPassword : String Read FProxyPassword Write FProxyPassword;
    Property ProxyServer : String Read FProxyServer Write FProxyServer;
    Property ProxyPort : Integer Read FProxyPort Write FProxyPort;
    Property TimeoutMs : Integer Read FTimeoutMs Write FTimeoutMs;
    Property UseHttpTunnel : Integer Read FUseHttpTunnel Write FUseHttpTunnel;
    Property HttpTunnelAddress : String
      Read FHttpTunnelAddress Write FHttpTunnelAddress;
    Property TcpIpServerAddress : String
      Read FTcpIpServerAddress Write FTcpIpServerAddress;
  End;

{ Fields is a flat list: a name followed by its value, followed by the next
  name, followed by the next value.  Repeat as needed. }
Function CreateMessageToServer(Fields : Array Of Const) : TMessageToServer;

{ Warning:  The server always sends things with a '.' for a decimal
  separator.  StrToFloat uses the locale to decide on the decimal
  separator. }
Function StrToFloatServer(S : String) : Extended;

Const
  EmptyUniqueMessageId : TUniqueMessageId = (ServerId : 0; UniqueId : 0);

Function MessageIdIsEmpty(Const Id : TUniqueMessageId) : Boolean;

Implementation

{//$DEFINE DETAILED_LOG}

Uses
  SocketServerConnections, HttpTunnels, MiscDebugWindows, StrUtils, Math,
  SysUtils;

{$IFDEF DETAILED_LOG}
// This quotes things the same way that you quote them in Pascal.
Function QuoteForUser(S : String) : String;
Var
  QuotesOpen : Boolean;
  // Start a quoted run, unless one is already open.
  Procedure OpenQuote;
  Begin
    If Not QuotesOpen Then
      Begin
        Result := Result + '''';
        QuotesOpen := True
      End
  End;
  // End the current quoted run, if there is one.
  Procedure CloseQuote;
  Begin
    If QuotesOpen Then
      Begin
        Result := Result + '''';
        QuotesOpen := False
      End
  End;
Var
  I : Integer;
Begin
  QuotesOpen := False;
  Result := '';
  For I := 1 To Length(S) Do
    If S[I] = '''' Then
      Begin
        // A quote inside a quoted run is written as two quotes.
        OpenQuote;
        Result := Result + ''''''
      End
    Else If S[I] In [' '..'~'] Then
      Begin
        OpenQuote;
        Result := Result + S[I]
      End
    Else
      Begin
        // Anything outside ' '..'~' is written as #nnn, outside the quotes.
        CloseQuote;
        Result := Result + '#' + IntToStr(Ord(S[I]))
      End;
  // The empty string is written as two quotes.
  If Result = '' Then
    OpenQuote;
  CloseQuote
End;
{$ENDIF}

Var
  NextUniqueMessageId : Int64 = 0;

// True if Id is the "no message" id, i.e. its ServerId is 0.  SendMessage
// returns such an id when no response was requested or the send failed.
Function MessageIdIsEmpty(Const Id : TUniqueMessageId) : Boolean;
Begin
  Result := Id.ServerId = 0
End;

// Builds a TMessageToServer from a flat list of name, value, name, value...
// Each element is converted to a string according to its type.
Function CreateMessageToServer(Fields : Array Of Const) : TMessageToServer;
  // FloatToStr and CurrToStr use the locale's decimal separator.  This unit
  // documents (see StrToFloatServer) that the server always uses '.', so
  // numbers sent to the server are normalized to '.' as well.
  // NOTE(review): assumes the server parses numbers the same way it prints
  // them -- confirm against the server.
  Function ToServerDecimal(Const Formatted : String) : String;
  Begin { ToServerDecimal }
    If DecimalSeparator = '.' Then
      Result := Formatted
    Else
      Result := StringReplace(Formatted, DecimalSeparator, '.', [])
  End; { ToServerDecimal }
  Function GetStringField(Index : Integer) : String;
  Begin { GetStringField }
    // Make sure the result is defined even if the assertion below is
    // compiled out and we hit an unknown type.
    Result := '';
    With Fields[Index] Do
      Case VType Of
        vtInteger:    Result := IntToStr(VInteger);
        vtBoolean:    Result := IfThen(VBoolean, '1', '0');
        vtChar:       Result := VChar;
        vtExtended:   Result := ToServerDecimal(FloatToStr(VExtended^));
        vtString:     Result := VString^;
        vtPChar:      Result := VPChar;
        //vtObject:     Result := VObject.ClassName;
        //vtClass:      Result := VClass.ClassName;
        vtAnsiString: Result := string(VAnsiString);
        vtCurrency:   Result := ToServerDecimal(CurrToStr(VCurrency^));
        vtVariant:    Result := string(VVariant^);
        vtInt64:      Result := IntToStr(VInt64^);
        Else Assert(False, 'Unknown field type.')
      End
  End; { GetStringField }
Var
  I : Integer;
Begin { CreateMessageToServer }
  // Names and values come in pairs.
  Assert(Not Odd(Length(Fields)));
  SetLength(Result, Length(Fields) Div 2);
  For I := 0 To Pred(Length(Result)) Do
    With Result[I] Do
      Begin
        Name := GetStringField(I * 2);
        Value := GetStringField(Succ(I * 2))
      End
End; { CreateMessageToServer }

// Creates the helper objects and queues the initial handshake.  Nothing is
// sent until Connect is called and the connection comes up.
Constructor TTalkWithServer.Create;
Begin
  Inherited Create;
  // The default error message:
  FErrorMessage := 'Unable to connect to server.';
  // We do not want to flush the buffer after each message.  That would be
  // inefficient; the compression ratio would not be as good.  We often send
  // two or more messages at the same time.  TWakeMeSoon will group those
  // requests so we only flush the buffer after the last message.  We
  // don't pause for any given amount of time.  Instead, we send the
  // messages the next time we are idle.  Note:  This has the side effect of
  // grouping the data so we send fewer requests on the network.  This helps
  // the TCP/IP performance some, and it helps the HTTP performance a lot.
  Wakeup := TWakeMeSoon.Create;
  Wakeup.OnWakeUp := FlushCompressionBuffer;
  Compressor := TZLibCompressor.Create;
  Decompressor := TZLibDecompressor.Create;
  // The initial handshake is very simple.  We open the socket.  We send the
  // following string.  After that, there are two separate streams of
  // compressed messages.  We never reset the compression.  That is to say,
  // we compress the entire stream, not the individual messages.
  SendString('command=set_output&mode=zlib'#13#10'command=set_input&mode=zlib'#13#10);
End;

Destructor TTalkWithServer.Destroy;
Begin
  If ServerConnectionStatus <> scsDisconnected Then
    // Make sure that every request has a corresponding response.  This is
    // a promise that we make to every client.  This is the only way that
    // most clients will know that they need to retry.
    ConnectionDown;
  ServerConnection.Free;
  Wakeup.Free;
  Compressor.Free;
  Decompressor.Free;
  Inherited Destroy
End;

// Sends raw (already compressed, if need be) bytes toward the server.
// Before the connection is up the bytes are held in OutgoingBuffer; once it
// is up they go straight to the connection.
Procedure TTalkWithServer.SendString(S : String);
Begin
  If Length(S) > 0 Then
    Case ServerConnectionStatus Of
      scsNew, scsTrying : OutgoingBuffer := OutgoingBuffer + S;
      scsConnected      : ServerConnection.SendStr(S);
      // Else, the message was dropped on the floor.  It's possible that
      // we were in the process of sending a message when the connection
      // was closed, so this case is unavoidable, and not an error.
    End
End;

// Adds S to the compressed outgoing stream.  Compressed output is passed to
// SendString once at least 5000 bytes have accumulated; otherwise it waits
// for FlushCompressionBuffer, which we request here via Wakeup.
Procedure TTalkWithServer.CompressAndSendString(S : String);
Begin
  Compressor.AddMore(S);
  If Length(Compressor.Compressed) >= 5000 Then
    Begin
      SendString(Compressor.Compressed);
      Compressor.Compressed := ''
    End;
  // Make sure we flush the buffer some time in the near future.
  Wakeup.RequestWakeup
End;

// Sends I as SizeOf(Integer) raw bytes, in the machine's native byte order.
Procedure TTalkWithServer.CompressAndSendInteger(I : Integer);
Var
  S : String;
Begin
  // The string is used as a byte buffer; copy the integer's bytes into it.
  SetLength(S, SizeOf(I));
  PInteger(PChar(S))^ := I;
  CompressAndSendString(S)
End;

// Sends the length of S as an integer, followed by S itself.
Procedure TTalkWithServer.CompressAndSendStringWithSize(S : String);
Begin
  CompressAndSendInteger(Length(S));
  CompressAndSendString(S)
End;

// Wakeup callback.  Flushes the compressor and sends whatever it produced.
Procedure TTalkWithServer.FlushCompressionBuffer;
Begin
  Compressor.Flush;
  SendString(Compressor.Compressed);
  Compressor.Compressed := ''
End;

{ Reading input from the server going to the client.  This will unmarshal
  as many complete messages as we can find and send them all to the client.

  Integers are sent as four bytes in the standard Intel byte order.

  Each message starts with a message id.  This came from the message_id
  field that we sent to the server.  This is sent as an integer.

  After that we receive the length of the message.  That is also an
  integer.  Length describes the number of bytes.

  Finally we receive the body of the message.  This can be any string of
  bytes.  Most often it is XML, but sometimes it contains binary data,
  including gif files.

  The length comes off the network, so it is validated before use.  A
  negative length can never be valid.  Without the check a length of
  -SizeOf(THeader) would make the total size 0 and this loop would never
  advance.  When we see one we record an error, discard the buffered input
  and close the connection. }
Procedure TTalkWithServer.CheckForCompressedMessage;
Type
  THeader = Record
    MessageId : Integer;
    Length : Integer;
  End;
  PHeader = ^THeader;
Var
  Header : PHeader;
  Body : String;
  TotalSize : Integer;
  Consumed : Integer;
  Available : Integer;
Begin
  {$IFDEF DETAILED_LOG}
  SendMiscDebugMessage('In TTalkWithServer.CheckForCompressedMessage.');
  {$ENDIF}
  Consumed := 0;
  Repeat
    {$IFDEF DETAILED_LOG}
    SendMiscDebugMessage('Decompressor.Decompressed = ' + QuoteForUser(Decompressor.Decompressed));
    {$ENDIF}
    Available := Length(Decompressor.Decompressed) - Consumed;
    If Available < SizeOf(THeader) Then
      // Not even a complete header yet.
      Break;
    Header := PHeader(PChar(Decompressor.Decompressed) + Consumed);
    If Header^.Length < 0 Then
      Begin
        // Set the message before closing; closing may report the
        // disconnect to the client right away.
        FErrorMessage := 'Invalid data from server: negative message length.';
        Decompressor.Decompressed := '';
        ServerConnection.Close;
        Exit
      End;
    // Compare by subtracting from what we have, rather than adding to the
    // length, so a huge length cannot overflow.
    If Available - SizeOf(THeader) < Header^.Length Then
      // The body has not completely arrived yet.
      Break;
    TotalSize := Header^.Length + SizeOf(THeader);
    SetString(Body,
              PChar(Decompressor.Decompressed) + SizeOf(THeader) + Consumed,
              Header^.Length);
    {$IFDEF DETAILED_LOG}
    SendMiscDebugMessage('Sending MessageId=' + IntToStr(Header^.MessageId) + ', Body=' + QuoteforUser(Body));
    {$ENDIF}
    SendGoodResponse(Header^.MessageId, Body);
    Consumed := Consumed + TotalSize
  Until False;
  // Keep only the bytes that belong to a message that is still incomplete.
  Decompressor.Decompressed := MidBStr(Decompressor.Decompressed, Succ(Consumed), MaxInt);
  {$IFDEF DETAILED_LOG}
  SendMiscDebugMessage('TTalkWithServer.CheckForCompressedMessage done.');
  {$ENDIF}
End;

// The main part of the client can send messages at any time.  We wait until
// the socket is open before sending anything.  Otherwise the socket library
// would throw an exception.
Procedure TTalkWithServer.ConnectionUp;
Begin
  // Set this before the call to SendStr.  SendStr could call our
  // ConnectionDown callback.
  FServerConnectionStatus := scsConnected;
  // The socket will not accept anything before it is up.
  // Once it's up, then it does all the buffering for us.
  ServerConnection.SendStr(OutgoingBuffer);
  OutgoingBuffer := '';
  // If SendStr already took the connection down, the client has been told
  // about the disconnect.  Don't follow that with a stale "connected".
  If (FServerConnectionStatus = scsConnected) And Assigned(FOnConnected) Then
    FOnConnected
End;

// This is a message to the user and it is intended only for debugging.
Function DescribeMessageToServer(Msg : TMessageToServer) : String;
Var
  I : Integer;
Begin
  // Produces ("name" --> "value", "name" --> "value", ...)
  Result := '(';
  For I := Low(Msg) To High(Msg) Do
    Begin
      If I > 0 Then
        Result := Result + ', ';
      Result := Result + '"' + Msg[I].Name + '" --> "' + Msg[I].Value + '"'
    End;
  Result := Result + ')'
End;

// The connection is gone, or is being abandoned.  Drops anything still
// waiting to be sent, tells the client, and sends an error response to
// every outstanding request that still has a callback.
Procedure TTalkWithServer.ConnectionDown;
Var
  I : Integer;
Begin
  // A connection is never reused, so it can only go down once.  If we are
  // called again (for example the lower level reports the close after we
  // already shut down) do not report the disconnect a second time.
  If FServerConnectionStatus = scsDisconnected Then
    Exit;
  FServerConnectionStatus := scsDisconnected;
  OutgoingBuffer := '';
  Compressor.Compressed := '';
  If Assigned(FOnDisconnected) Then
    FOnDisconnected;
  For I := Low(OutstandingRequests) To High(OutstandingRequests) Do
    With OutstandingRequests[I] Do
      If Assigned(Callback) Then
        SendErrorResponse(OriginalRequest, Callback, Streaming);
  SetLength(OutstandingRequests, 0)
End;

// Reports a failure for Msg: first to OnMessagePreview (non-streaming
// requests only), then to Callback.  Callback must be assigned.  Exceptions
// raised by either handler are logged and swallowed.  The two are guarded
// separately so that a failure in the preview handler cannot stop the
// requester from getting its response.
Procedure TTalkWithServer.SendErrorResponse(Msg : TMessageToServer;
                                            Callback : TOnMessageFromServer;
                                            Streaming : Boolean);
Begin
  // We don't display the error for streaming messages.  This is a little
  // ugly.  We're trying to solve a specific problem.  The server can
  // disconnect us in a timeout.  This happens when we are already stopped,
  // and not really expecting data.  In this case we don't want to see a
  // yellow tickmark because that implies that there was a problem and we
  // are going to retry.  Yuck!
  If Assigned(OnMessagePreview) And Not Streaming Then
    Try
      OnMessagePreview(False, '', Msg)
    Except
      On Ex : Exception Do
        SendMiscDebugMessage(Ex, 'TTalkWithServer.SendErrorResponse (preview), ' + DescribeMessageToServer(Msg))
    End;
  Try
    Callback(False, '', Msg)
  Except
    On Ex : Exception Do
      SendMiscDebugMessage(Ex, 'TTalkWithServer.SendErrorResponse, ' + DescribeMessageToServer(Msg))
  End
End;

// Delivers a message from the server to whoever sent the request with the
// given message id.  Ids that are out of range, and requests with no
// callback (never had one, or abandoned), are silently ignored.  A
// non-streaming request is released here, so its id can be reused.
Procedure TTalkWithServer.SendGoodResponse(MsgId : Integer; Body : String);
Var
  SaveCallback : TOnMessageFromServer;
  SaveRequest : TMessageToServer;
Begin
  SetLength(SaveRequest, 0);  // Avoid silly compiler warning.
  If (MsgId > 0) And (MsgId <= Length(OutstandingRequests)) Then
    Begin
      With OutstandingRequests[MsgId - 1] Do
        Begin
          { Save this info right away.  The callback can resize the array.
            We don't want to keep a pointer to the old array.  So we do the
            cleanup before we do the callbacks, and we save copies so that
            we can do the callbacks. }
          SaveCallback := Callback;
          SaveRequest := OriginalRequest;
          If Not Streaming Then
            Begin
              Callback := Nil;
              SetLength(OriginalRequest, 0)
            End
        End;
      If Assigned(SaveCallback) Then
        Begin
          // The preview and the real callback are guarded separately.  A
          // failure in the preview handler must not cost the requester its
          // response.
          If Assigned(OnMessagePreview) Then
            Try
              OnMessagePreview(True, Body, SaveRequest)
            Except
              On Ex : Exception Do
                SendMiscDebugMessage(Ex, 'TTalkWithServer.SendGoodResponse (preview), ' + DescribeMessageToServer(SaveRequest))
            End;
          Try
            SaveCallback(True, Body, SaveRequest)
          Except
            On Ex : Exception Do
              SendMiscDebugMessage(Ex, 'TTalkWithServer.SendGoodResponse, ' + DescribeMessageToServer(SaveRequest))
          End
        End
    End
End;

// In some cases the server connection will fail, causing this TTalkWithServer
// to fail.  In those cases the owner of this object is responsible for
// creating a new object and trying again.  In some cases the server
// connection will automatically perform one or more retries before giving up.
// In that case, it should use this procedure to report the status out to the
// main program.  That way the user can see that there is a problem.
Procedure TTalkWithServer.ServerConnectionAutoRetry(
  Source : TGenericServerConnection; Msg : String);
Begin
  If Assigned(OnAutoRetry) Then
    OnAutoRetry(Msg)
End;

// Creates the lower level connection (plain TCP/IP or HTTP tunnel, as
// selected by UseHttpTunnel), copies our options into it, and starts the
// connection attempt.  May only be called once, on a new object.
// NOTE(review): nothing visible in this unit ever sets the status to
// scsTrying; the status stays scsNew until ConnectionUp or ConnectionDown.
// Confirm whether that is intended.
Procedure TTalkWithServer.Connect;
  // TcpIpServerAddress is either "host" or "host:port".  The port
  // defaults to 8888.
  Procedure TcpIpConnect;
  Var
    ColonPosition : Integer;
    Address, Port : String;
  Begin { TcpIpConnect }
    ColonPosition := Pos(':', TcpIpServerAddress);
    If ColonPosition > 0 Then
      Begin
        Port := MidBStr(TcpIpServerAddress, Succ(ColonPosition), MaxInt);
        Address := LeftBStr(TcpIpServerAddress, Pred(ColonPosition))
      End
    Else
      Begin
        Port := '8888';
        Address := TcpIpServerAddress
      End;
    ServerConnection := TSocketServerConnection.Create(Address, Port)
  End; { TcpIpConnect }
Begin { TTalkWithServer.Connect }
  Assert((ServerConnectionStatus = scsNew)
         And (Not Assigned(ServerConnection)));
  If UseHttpTunnel = 0 Then
    TcpIpConnect
  Else
    ServerConnection := THttpTunnel.Create(HttpTunnelAddress);
  ServerConnection.OnSessionClosed := ServerConnectionSessionClosed;
  ServerConnection.OnSessionConnected := ServerConnectionSessionConnected;
  ServerConnection.OnDataAvailable := ServerConnectionDataAvailable;
  ServerConnection.OnAutoRetry := ServerConnectionAutoRetry;
  If SocksVersion In [4,5] Then
    Begin
      ServerConnection.SocksServer := SocksHost;
      ServerConnection.SocksPort := IntToStr(SocksPort);
      ServerConnection.SocksUsercode := SocksUserName;
      ServerConnection.SocksPassword := SocksPassword;
      ServerConnection.SocksLevel := IntToStr(SocksVersion)
    End;
  ServerConnection.ProxyUserName := ProxyUserName;
  ServerConnection.ProxyPassword := ProxyPassword;
  ServerConnection.ProxyServer := ProxyServer;
  ServerConnection.ProxyPort := ProxyPort;
  ServerConnection.TimeoutMs := TimeoutMs;
  // If there is an error in the attempt to connect, we will report the
  // error before we get a chance to set FErrorMessage properly.  Luckily
  // we get another chance to report the error message after this.  So we
  // set FErrorMessage to '' for now.  We will still see an extra yellow
  // tick mark.  (It would be nice to get rid of this, and only see one
  // yellow tick mark because there was only one failure.)  However, we will
  // not display any messages in the ToolTip until after we have had a
  // chance to set FErrorMessage to the right value.
  //
  // This is important so we don't overwhelm the list of errors.
  // Typically if we see one error here, we will see a lot of them.
  // If one error message is repeated over and over, the ToolTip will
  // properly merge all of the messages together.  However, if two different
  // messages alternate, then you will quickly delete all of the old
  // messages.  That's what would happen if we did not set FErrorMessage to
  // the empty string right here.
  FErrorMessage := '';
  If Not ServerConnection.Connect Then
    FErrorMessage := ServerConnection.ConnectionErrorMessage
  Else
    // The attempt is under way.  This is what we report if it fails before
    // the session is connected.
    FErrorMessage := 'Error in server connection.'
End; { TTalkWithServer.Connect }

// The caller does not want any more messages.  This is often called by
// a destructor; if the listener is gone we can't deliver a message to him!
// At this time the message id is still reserved.  The server does not know
// about this, so it might still send us a message with this id.
Procedure TTalkWithServer.AbandonMessage(MessageId : TUniqueMessageId);
Begin
  // Ignore ids that do not name a slot in our table.
  If (MessageId.ServerId <= 0)
     Or (MessageId.ServerId > Length(OutstandingRequests)) Then
    Exit;
  // Server ids are reused.  Only drop the callback if the slot still
  // belongs to the message the caller is talking about.
  If OutstandingRequests[Pred(MessageId.ServerId)].UniqueId = MessageId.UniqueId Then
    OutstandingRequests[Pred(MessageId.ServerId)].Callback := Nil
End;

{ There are basically 3 ways to send a message.

  No response.  We set the message_id field to 0 (or leave it blank).  In
  some cases the server will notice the 0 and will not send us a reply, or
  will send us a simpler reply.  In any case, all messages returned to us
  with a message id of 0 are thrown away by this unit.

  A single response.  This function generates a new message id and stores it
  in the message_id field of the outgoing message.  It marks the message_id
  as used, so we don't use it again.  And it returns the message id to the
  caller, in case he wants to cancel the message.

  A stream of responses.  Each of the responses will have the same message
  id.  We treat a streaming response similar to a single response.  This
  function does all of the same setup, but also records that we are
  expecting a streaming response.  The part of this code that receives
  messages needs to know if we are expecting a single response or a
  streaming response.

  Note:  Certain server commands will give no response, others will give one
  response, and others will give multiple responses.  Whoever calls this
  function must tell us which type of command this is.  This unit has no
  other way to know.  If you call this unit with the wrong information,
  problems can occur.

  The message to the server is a series of name value pairs.  (This was
  meant to replace an HTTP-based API, and the names and values are like the
  part of the URL after the question mark.)  A value can be any string.  A
  name cannot be the empty string.  (A name should not be duplicated, but
  this unit does not worry about that issue.)

  To encode an integer, send the four bytes in the standard Intel byte
  order.  (A.K.A. "little endian")

  To encode a string, send the size of the string as an integer, as
  described above, then send the bytes of the string.  The size is always
  the number of bytes.  We don't know or care about character sets.

  To encode a name value pair, first send the name and then send the value.
  Both the name and value are encoded as strings, as described above.

  To encode a message, send each name value pair in the way described above.
  The order does not matter.  (This is only true if you've followed the
  rules and you have not used any illegal names!)  Then send the integer 0.

  Note:  The message id is just another name value pair.  The name is
  "message_id".  The value is the id, printed in decimal.  The caller should
  not include a field named "message_id". }
Function TTalkWithServer.SendMessage(Msg : TMessageToServer;
                                     Response : TOnMessageFromServer = Nil;
                                     StreamingResponse : Boolean = False
                                     ) : TUniqueMessageId;
  // A message is bad if it has no field with a non-empty name.  That
  // includes the empty message.
  Function BadMessage : Boolean;
  Var
    FieldIndex : Integer;
  Begin { BadMessage }
    Result := True;
    For FieldIndex := Low(Msg) To High(Msg) Do
      If Msg[FieldIndex].Name <> '' Then
        Begin
          Result := False;
          Break
        End
  End; { BadMessage }
Var
  Slot, ServerId, FieldIndex : Integer;
Begin { TTalkWithServer.SendMessage }
  // Start with the empty id.  It is what we return on failure, and what we
  // return when no response was requested.
  Result.ServerId := 0;
  Result.UniqueId := 0;
  If (ServerConnectionStatus = scsDisconnected) Or BadMessage Then
    Begin
      // We can't send this.  The caller still gets an answer.
      If Assigned(Response) Then
        SendErrorResponse(Msg, Response, StreamingResponse);
      Exit
    End;
  // 0 Means no response required.
  ServerId := 0;
  If Assigned(Response) Then
    Begin
      // Look for a free slot, starting from the end of the table.  A slot
      // is free when it does not hold a request.
      Slot := High(OutstandingRequests);
      While (Slot >= 0)
            And (Length(OutstandingRequests[Slot].OriginalRequest) > 0) Do
        Dec(Slot);
      If Slot < 0 Then
        Begin
          // Every slot is taken.  Make the list longer and use the first
          // of the new slots.
          Slot := Length(OutstandingRequests);
          SetLength(OutstandingRequests, Max(8, 2 * Slot))
        End;
      OutstandingRequests[Slot].OriginalRequest := Msg;
      OutstandingRequests[Slot].Callback := Response;
      OutstandingRequests[Slot].Streaming := StreamingResponse;
      OutstandingRequests[Slot].UniqueId := NextUniqueMessageId;
      Result.UniqueId := NextUniqueMessageId;
      Inc(NextUniqueMessageId);
      // Valid message ids start with 1.
      ServerId := Succ(Slot)
    End;
  Result.ServerId := ServerId;
  If ServerId > 0 Then
    Begin
      CompressAndSendStringWithSize('message_id');
      CompressAndSendStringWithSize(IntToStr(ServerId))
    End;
  // Fields without a name are skipped.
  For FieldIndex := Low(Msg) To High(Msg) Do
    If Msg[FieldIndex].Name <> '' Then
      Begin
        CompressAndSendStringWithSize(Msg[FieldIndex].Name);
        CompressAndSendStringWithSize(Msg[FieldIndex].Value)
      End;
  // The integer 0 marks the end of the message.
  CompressAndSendInteger(0)
End; { TTalkWithServer.SendMessage }

// Callback from the lower level:  The session was closed.
Procedure TTalkWithServer.ServerConnectionSessionClosed(Connection : TGenericServerConnection);
Begin
  ConnectionDown
End;

// Callback from the lower level:  The session is connected.
Procedure TTalkWithServer.ServerConnectionSessionConnected(Connection : TGenericServerConnection);
Begin
  // We have successfully connected to the server.  From here on, this is
  // what we report if the connection goes away.
  FErrorMessage := 'Unexpected disconnect from server.';
  ConnectionUp
End;

// Callback from the lower level:  Data has arrived.  Decompress it, then
// deliver any messages that are now complete.
Procedure TTalkWithServer.ServerConnectionDataAvailable(Connection : TGenericServerConnection);
Begin
  Try
    Decompressor.AddMore(ServerConnection.GetAll);
  Except
    On Ex : Exception Do
      Begin
        FErrorMessage := 'Invalid data from server: ' + Ex.Message;
        // This next line should be redundant.
        SendMiscDebugMessage(Ex, 'TalkWithServer, TTalkWithServer.ServerConnectionDataAvailable');
        Connection.Close
      End
  End;
  CheckForCompressedMessage
End;

// Like StrToFloat, but for numbers from the server.  Every '.' in S is
// replaced by the locale's decimal separator before the conversion.
Function StrToFloatServer(S : String) : Extended;
Begin
  If DecimalSeparator <> '.' Then
    S := StringReplace(S, '.', DecimalSeparator, [rfReplaceAll]);
  Result := StrToFloat(S)
End;

End.