Unit TalConversationThread;

// Data node that owns one TAL conversation and the background thread that
// services it.  The thread connects, pulls messages off the conversation and
// either broadcasts them (data messages) or notifies the data node
// (connection state changes).

Interface

Uses
  DataNodes, GenericDataNodes, TalBase, StringStringHashTables, SyncObjs;

Type
  // Broadcast message wrapping one PTalConvData.  The TAL data is released in
  // AfterDelivery.  Instances of exactly this class are recycled through a
  // unit-level free list (see NewInstance / FreeInstance).
  TTalMessageInfo = Class(TBroadcastMessage)
  Private
    // True when this instance's memory belongs to the free list and must be
    // pushed back there, rather than handed to FreeMem, on destruction.
    StaticallyAllocated : Boolean;
    FData : PTalConvData;
  Private
    Class Function MemoryManagable : Boolean;
  Public
    Procedure FreeInstance; Override;
    Class Function NewInstance: TObject; Override;
  Protected
    Procedure AfterDelivery; Override;
  Public
    Property Data : PTalConvData Read FData;
  End;

  TTalConversaitonDataNode = Class(TGenericDataNode)
  Private
    FUp : Boolean;
    FId : Integer;
    FMachine, FService, FTopic : String;
    Conv : TTalConversation;
    ConvValid : Boolean;
    Thread : TObject;
    WakeThread : TEvent;
    RequestThreadDie : Boolean;
    RequestedThreadAction :
      ( rtaRequestConnection, // The thread is active.  It is trying to
                              // create a conversation.  When it succeeds,
                              // it sets the state to rtaFindMessages,
                              // but does not notify the data node.
        rtaFindMessages,      // The thread is active.  It is listening to
                              // the conversation.  It will automatically
                              // broadcast normal messages, and it will notify the
                              // data node of the special connect message.
                              // As soon as there is a disconnect message,
                              // it sets the state to rtaWait then
                              // notifies the data node.  An initial NACK
                              // message is treated the same way as a disconnect.
        rtaWait);             // The thread is inactive.  It is waiting for the data
                              // node to change the state and wake it.  The thread does
                              // not access the conversation.
    Procedure SetState(Up : Boolean);
    Procedure ReleaseOldConversation;
    Procedure DataFeedUp;
    Procedure DataFeedDown;
  Published
    // True means up.  You can send requests.  On transition from down to
    // up, resend requests.  False means down.  On transition from up to
    // down, warn listeners that we don't have data.
    // No false transitions.  Every time we notify listeners, there was
    // a change.
    Function GetBoolean : Boolean; Override;
  Public
    // Always true.
    Function IsValid : Boolean; Override;
    Destructor Destroy; Override;
  Protected
    // The input should be an id (an integer) followed by machine, service,
    // and topic (all strings).
    // Id just allows us to create new items.
    // Machine, service and topic are all passed as is to
    // the TAL server.
    Constructor Create(Params : TParamList); Override;
  Private
    BroadcastChannels : THashTable;
    BroadcastChannelLock : TCriticalSection;
  Public
    // Call these only from the data node thread.
    Procedure Request(TQL : String; DataFormat : String = 'TAL4');
    Procedure Advise(TQL : String; DataFormat : String = 'TAL4');
    Procedure Unadvise(TQL : String; DataFormat : String = 'TAL4');
    Procedure Execute(Statement : String);

    // Call these from any thread.  Reserving a string which has already
    // been reserved causes an immediate error.  That would be a
    // sign of a bigger problem, since the datafeed should only have
    // at most one copy of each request active at a time.
    // Releasing a string which has never been reserved is okay.  That
    // probably means that a node was created then destroyed before some
    // of the initialization could be finished.  Requesting a channel
    // which does not exist returns ''.  This
    // typically means that a message arrived from the datafeed
    // shortly after we stopped caring about that type of message.
    // The real purpose of this is to save memory, especially from
    // fragmentation.  The extra error checking is just a bonus.
    Function ReserveBroadcastChannel(TQL : String) : String;
    Function GetBroadcastChannel(TQL : String) : String;
    Procedure ReleaseBroadcastChannel(TQL : String);
  End;

Implementation

Uses
  DebugOutput, Classes, SysUtils, Windows, Math;

////////////////////////////////////////////////////////////////////////
// TTalConversaitonThread
////////////////////////////////////////////////////////////////////////

Var
  // Set by InitializeTal when TALConvInitialize returns TALCONV_OK.
  TalInitializedProperly : Boolean;

Type
  // Worker thread driven by Owner.RequestedThreadAction.
  TTalConversaitonThread = Class(TThread)
  Public
    Owner : TTalConversaitonDataNode;
  Protected
    Procedure Execute; Override;
  End;

// Thread body.  Loops until Terminated or Owner.RequestThreadDie is set,
// dispatching on Owner.RequestedThreadAction:
//   rtaRequestConnection - try TALConvConnect; on success switch to
//                          rtaFindMessages, on failure wait Retry ms.
//   rtaFindMessages      - pull one message; broadcast data messages and
//                          queue DataFeedUp / DataFeedDown on the owner for
//                          connection state messages.  When no message is
//                          pending, wait (at most 100 ms) on the
//                          conversation event or the wake event.
//   rtaWait              - sleep on the wake event; the conversation is not
//                          touched in this state.
// Exits immediately if TAL was not initialized.
Procedure TTalConversaitonThread.Execute;
Const
  GiveUp = 2000; // This should never be necessary.  If a wait lasts
                 // 2 seconds, give up.
  Retry = 2000;  // On failure to connect, retry in 2 seconds.
Var
  ConvStatus : TTalConvReturnValue;
  WaitList : Array [0..1] Of THandle;
  IncomingMessage : PTalConvData;
  OutgoingMessage : TTalMessageInfo;
  Channel : String;
  // Statistics, logged about once a minute while listening.
  CurrentLiveCount : Integer;   // Messages handled since the last idle wait.
  MaxLiveCount : Integer;       // Largest CurrentLiveCount this period.
  MessageCount : Integer;       // Messages handled this period.
  NextDumpTime : TDateTime;
  CurrentLiveStart : TDateTime; // When the last idle wait ended.
  MaxLiveTime : TDateTime;      // Longest stretch between idle waits.
                                // Never reset.
Begin
  If Not TalInitializedProperly Then
    Exit;
  CurrentLiveCount := 0;
  MaxLiveCount := 0;
  MessageCount := 0;
  NextDumpTime := 0;
  CurrentLiveStart := 0;
  MaxLiveTime := 0;
  Repeat
    Case Owner.RequestedThreadAction Of
      rtaRequestConnection:
        Begin
          DebugOutputWindow.AddMessage('Attempting to connect ('
            + Owner.FMachine + ', ' + Owner.FService + ', '
            + Owner.FTopic + ')');
          ConvStatus := TALConvConnect(PChar(Owner.FMachine),
                                       PChar(Owner.FService),
                                       PChar(Owner.FTopic),
                                       Owner.Conv);
          If ConvStatus = TALCONV_OK Then
            Begin
              Owner.ConvValid := True;
              Owner.RequestedThreadAction := rtaFindMessages
            End
          Else
            Begin
              DebugOutputWindow.AddMessage('TALConvConnect failed: '
                + IntToStr(Ord(ConvStatus)));
              ConvStatus := TALConvFree(Owner.Conv);
              DebugOutputWindow.AddMessage('TALConvFree('
                + IntToStr(Ord(Owner.Conv)) + ') -> '
                + IntToStr(Ord(ConvStatus)));
              // Throttle reconnect attempts; the wake event cuts this short.
              Owner.WakeThread.WaitFor(Retry)
            End
        End;
      rtaFindMessages:
        Begin
          If Now() >= NextDumpTime Then
            Begin
              If MessageCount > 0 Then
                DebugOutputWindow.AddMessage(
                  Format('MessageCount=%d, MaxLiveCount=%d, MaxLiveTime=%.04f',
                         [MessageCount, MaxLiveCount,
                          MaxLiveTime * 24.0 * 60.0 * 60.0]));
              MaxLiveCount := 0;
              MessageCount := 0;
              NextDumpTime := Now + 1.0 / 24.0 / 60.0 // One minute from now.
            End;
          IncomingMessage := TALConvGetData(Owner.Conv);
          If Assigned(IncomingMessage) Then
            Begin
              Inc(MessageCount);
              Inc(CurrentLiveCount);
              Case IncomingMessage^.iType Of
                TALCONV_INIT_ACK :
                  Begin
                    DebugOutputWindow.AddMessage('TALCONV_INIT_ACK');
                    Owner.AddToThreadQueue(Owner.DataFeedUp)
                  End;
                TALCONV_INIT_NACK :
                  Begin
                    DebugOutputWindow.AddMessage('TALCONV_INIT_NACK');
                    Owner.WakeThread.WaitFor(Retry);
                    // Treat exactly like a disconnect: stop touching the
                    // conversation BEFORE asking the data node to free it.
                    // The state must be set before the call is queued,
                    // otherwise DataFeedDown's rtaRequestConnection could
                    // be overwritten by this assignment.
                    Owner.RequestedThreadAction := rtaWait;
                    Owner.AddToThreadQueue(Owner.DataFeedDown)
                  End;
                TALCONV_TERMINATE :
                  Begin
                    DebugOutputWindow.AddMessage('TALCONV_TERMINATE');
                    Owner.RequestedThreadAction := rtaWait;
                    Owner.AddToThreadQueue(Owner.DataFeedDown)
                  End;
                TALCONV_REQUEST_DATA, TALCONV_ADVISE_DATA :
                  Begin
                    Channel := Owner.GetBroadcastChannel(IncomingMessage^.szItem);
                    If Channel <> '' Then
                      Begin
                        OutgoingMessage := TTalMessageInfo.Create;
                        OutgoingMessage.FData := IncomingMessage;
                        OutgoingMessage.Send(Channel);
                        // Ownership moved to the message, which frees the
                        // data in AfterDelivery.
                        IncomingMessage := Nil
                      End
                  End;
                TALCONV_OTHER_NACK :
                  { I don't know why we see this sometimes. }
                  { I notice this a lot around 5:01, when we ask TAL to refresh us. }
                  DebugOutputWindow.AddMessage('TALCONV_OTHER_NACK: '
                    + IncomingMessage^.szItem);
                TALCONV_EXECUTE_ACK, TALCONV_OTHER_ACK :
                  Begin
                    // Do nothing.  DON'T display an error message.
                  End;
                Else
                  DebugOutputWindow.AddMessage('Unexpected: '
                    + IntToStr(Ord(IncomingMessage^.iType)))
              End;
              // Anything not handed off to a broadcast message is freed here.
              If Assigned(IncomingMessage) Then
                TALConvFreeData(IncomingMessage)
            End
          Else
            Begin
              // Queue drained: close out the current busy stretch.
              If CurrentLiveStart > 0 Then
                Begin
                  CurrentLiveStart := Now - CurrentLiveStart;
                  If CurrentLiveStart > MaxLiveTime Then
                    MaxLiveTime := CurrentLiveStart
                End;
              TALConvGetEventHandle(Owner.Conv, WaitList[0]);
              WaitList[1] := Owner.WakeThread.Handle;
              WaitForMultipleObjects(2, @WaitList[0], False, 100);
              MaxLiveCount := Max(MaxLiveCount, CurrentLiveCount);
              CurrentLiveCount := 0;
              CurrentLiveStart := Now
            End
        End;
      rtaWait:
        Owner.WakeThread.WaitFor(GiveUp)
    End
  Until Terminated Or Owner.RequestThreadDie
End;

////////////////////////////////////////////////////////////////////////
// TTalConversaitonDataNode
////////////////////////////////////////////////////////////////////////

// Returns the current up/down state of the data feed.
Function TTalConversaitonDataNode.GetBoolean : Boolean;
Begin
  Result := FUp
End;

// This node always reports itself as valid.
Function TTalConversaitonDataNode.IsValid : Boolean;
Begin
  Result := True
End;

// Params must be (Id, Machine, Service, Topic).  Creates the channel table,
// its lock and the wake event, then starts the conversation thread.
Constructor TTalConversaitonDataNode.Create(Params : TParamList);
Var
  T : TTalConversaitonThread;
Begin
  Assert(Length(Params) = 4, 'Expected params: (Id, Machine, Service, Topic)');
  FId := Params[0];
  FMachine := Params[1];
  FService := Params[2];
  FTopic := Params[3];
  //DebugOutputWindow.AddMessage('TTalConversaitonDataNode.Create(' + IntToStr(FId) + ', "' + FMachine + '", "' + FService + '", "' + FTopic + '")');
  Inherited Create;
  BroadcastChannels := tHashTable.create;
  BroadcastChannelLock := TCriticalSection.Create;
  // Second argument False: not manual-reset.
  WakeThread := TEvent.Create(Nil, False, False, '');
  // Create suspended so Owner is assigned before Execute runs.
  T := TTalConversaitonThread.Create(True);
  Thread := T;
  T.Owner := Self;
  T.Resume
End;

// Sends a one-shot request for TQL.  Silently does nothing while the feed
// is down.
Procedure TTalConversaitonDataNode.Request(TQL : String; DataFormat : String = 'TAL4');
Begin
  If FUp Then
    TALConvRequest(Conv, PChar(TQL), PChar(DataFormat))
End;

// Starts an advise on TQL.  Silently does nothing while the feed is down.
Procedure TTalConversaitonDataNode.Advise(TQL : String; DataFormat : String = 'TAL4');
Begin
  If FUp Then
    TALConvAdvise(Conv, PChar(TQL), PChar(DataFormat))
End;
// Cancels an advise on TQL.  Silently does nothing while the feed is down.
Procedure TTalConversaitonDataNode.Unadvise(TQL : String; DataFormat : String = 'TAL4');
Begin
  If FUp Then
    TALConvUnadvise(Conv, PChar(TQL), PChar(DataFormat))
End;

// Sends an execute statement.  Silently does nothing while the feed is down.
Procedure TTalConversaitonDataNode.Execute(Statement : String);
Begin
  If FUp Then
    TALConvExecute(Conv, PChar(Statement))
End;

Procedure TTalConversaitonDataNode.DataFeedUp;
Begin
  // The thread automatically puts itself into the right state to listen to
  // messages, and sets up the conversation pointer in this object.
  SetState(True)
End;

Procedure TTalConversaitonDataNode.DataFeedDown;
Begin
  // This object must take down the old conversation (in the right thread,
  // when no one is using the conversation) then notify the conversation
  // thread to start trying to create a new conversation.
  SetState(False);
  ReleaseOldConversation;
  RequestedThreadAction := rtaRequestConnection;
  WakeThread.SetEvent
End;

// Frees the conversation if one is currently held.  Safe to call repeatedly.
Procedure TTalConversaitonDataNode.ReleaseOldConversation;
Begin
  If ConvValid Then
    TALConvFree(Conv);
  ConvValid := False
End;

// Records the new up/down state and notifies listeners only on a real change.
Procedure TTalConversaitonDataNode.SetState(Up : Boolean);
Begin
  If FUp <> Up Then
    Begin
      FUp := Up;
      NotifyListeners
    End
End;

// Asks the thread to stop, wakes it, destroys it, then releases the
// conversation and the remaining resources.
Destructor TTalConversaitonDataNode.Destroy;
Begin
  RequestThreadDie := True;
  // The destructor also runs when the constructor raised part-way through
  // (for example on the parameter assertion), in which case the event was
  // never created.  Free is nil-safe; SetEvent is not.
  If Assigned(WakeThread) Then
    WakeThread.SetEvent;
  Thread.Free;
  ReleaseOldConversation;
  WakeThread.Free;
  BroadcastChannels.Free;
  BroadcastChannelLock.Free;
  Inherited
End;

// Builds the channel name for TQL, stores it in the table and returns it.
// Raises an assertion failure when setValue reports that no new value was
// added, which this code treats as a duplicate request.
Function TTalConversaitonDataNode.ReserveBroadcastChannel(TQL : String) : String;
Var
  AddedNewValue : Boolean;
Begin
  Result := Format('TTalConversaitonDataNode.%x.%s', [Integer(Self), TQL]);
  BroadcastChannelLock.Enter;
  Try
    AddedNewValue := BroadcastChannels.setValue(TQL, Result)
  Finally
    BroadcastChannelLock.Leave
  End;
  If Not AddedNewValue Then
    Assert(False, 'Duplicate TAL request: ' + TQL)
End;

// Looks up the channel name stored for TQL, under the channel lock.
Function TTalConversaitonDataNode.GetBroadcastChannel(TQL : String) : String;
Begin
  BroadcastChannelLock.Enter;
  Try
    Result := BroadcastChannels[TQL]
  Finally
    BroadcastChannelLock.Leave
  End
End;

// Removes the table entry for TQL, under the channel lock.
Procedure TTalConversaitonDataNode.ReleaseBroadcastChannel(TQL : String);
Begin
  BroadcastChannelLock.Enter;
  Try
    BroadcastChannels.remove(TQL)
  Finally
    BroadcastChannelLock.Leave
  End
End;

////////////////////////////////////////////////////////////////////////
// TTalMessageInfo
////////////////////////////////////////////////////////////////////////

// Releases the TAL data carried by this message, then runs the inherited
// handler.
Procedure TTalMessageInfo.AfterDelivery;
Begin
  TALConvFreeData(Data);
  Inherited
End;

Type
  // A dead instance's memory is reused as a singly linked free-list node.
  PTalInfoRecord = ^TTalInfoRecord;
  TTalInfoRecord = Record
    Next : PTalInfoRecord
  End;

Var
  TalInfoFreeList : PTalInfoRecord;
  FreeListCriticalSection : TCriticalSection;

// Only instances with exactly TTalMessageInfo's size may use the free list;
// a descendant with a different size would not fit a recycled block.
Class Function TTalMessageInfo.MemoryManagable : Boolean;
Begin
  Result := InstanceSize = TTalMessageInfo.InstanceSize
End;

// Returns pooled instances to the free list; everything else goes back to
// the heap.
Procedure TTalMessageInfo.FreeInstance;
Var
  Node : PTalInfoRecord;
Begin
  CleanupInstance;
  If StaticallyAllocated Then
    Begin
      Node := PTalInfoRecord(Self);
      FreeListCriticalSection.Enter;
      Try
        Node.Next := TalInfoFreeList;
        TalInfoFreeList := Node
      Finally
        FreeListCriticalSection.Leave
      End
    End
  Else
    FreeMem(Pointer(Self))
End;

// Takes memory from the free list when possible, otherwise from the heap.
Class Function TTalMessageInfo.NewInstance: TObject;
Var
  Node : PTalInfoRecord;
Begin
  If MemoryManagable Then
    Begin
      FreeListCriticalSection.Enter;
      Try
        Node := TalInfoFreeList;
        If Assigned(Node) Then
          TalInfoFreeList := Node.Next
      Finally
        FreeListCriticalSection.Leave
      End
    End
  Else
    Node := Nil;
  If Assigned(Node) Then
    Result := TObject(Node)
  Else
    GetMem(Pointer(Result), InstanceSize);
  InitInstance(Result);
  // InitInstance zero-fills the instance, which wipes the pool marker.
  // Restore it for recycled nodes; otherwise each pooled block would be
  // handed to FreeMem after a single reuse and the pool would drain.
  If Assigned(Node) Then
    TTalMessageInfo(Result).StaticallyAllocated := True
End;

// Seeds the free list with Count instances.  All are created before any is
// freed so that each one gets its own heap block.
Procedure PreallocateNodes(Count : Integer);
Var
  I : Integer;
  A : Array Of TTalMessageInfo;
Begin
  SetLength(A, Count);
  For I := 0 To Pred(Count) Do
    Begin
      A[I] := TTalMessageInfo.Create;
      A[I].StaticallyAllocated := True
    End;
  For I := 0 To Pred(Count) Do
    A[I].Free
End;

////////////////////////////////////////////////////////////////////////
// Initialization
////////////////////////////////////////////////////////////////////////

// Initializes the TAL conversation library and records whether it worked.
// On failure the flag stays False and conversation threads exit at once.
Procedure InitializeTal;
Var
  Status : TTalConvReturnValue;
Begin
  Status := TALConvInitialize;
  If Status = TALCONV_OK Then
    TalInitializedProperly := True
  //Else  Not initialized yet!
  //  TalStatusWindow.AddMessage('InitializeTal returned bad value: ' + IntToStr(Ord(Status)))
End;

Initialization
  // The lock must exist before PreallocateNodes creates any instance.
  FreeListCriticalSection := TCriticalSection.Create;
  PreallocateNodes(10000);
  InitializeTal;
Finalization
  TALConvShutdown
End.