Unit TalMsgListeners; // What a mess. This only works for LIVEQUOTE and is only required for // LIVEQUOTE. This unit is responsible for taking a request for a symbol // and turning it into a request for a list. Lists of 200 symbols are more // effecient than individual requests. This takes care of requests and // advises. // // (Note: This deals with multiple TABLES in the livequote TOPIC, one of // which is also named livequote.) // // TTalMsgListenerDataNode hides this. There is one object per symbol per // type of data. // // TTalConvListenerDataNode does the bulk of the work. One of these // objects exists for each type of data request. It takes in broadcast messages // from another thread for each message on the data link. It then finds the // symbol associated with the message, and rebroadcasts to the // TTalMsgListenerDataNode. Interface Uses DataNodes, GenericDataNodes, TalBase, TalConversationThread, Classes; Type TTalMsgStatus = (tmsDown, tmsRequest, tmsAdvise); TTalTable = (ttNone, ttLiveQuote, ttRegional); TTalMsgListenerDataNode = Class(TDataNode) Private Source : TObject; FTQL, FSymbol : String; FTable : TTalTable; FOnceOnly : Boolean; FRecord : PTal4Handle; FMessageStatus : TTalMsgStatus; Procedure Initialize; Procedure NewConversationStatus; Procedure NewData(Msg : TBroadcastMessage; Owner : TDataNode); Constructor Create(TQL : String; Table : TTalTable; Symbol : String; OnceOnly : Boolean); Public // This is a low level object, a very simple wrapper around // the broadcast messages. Each call to find creates a new one. // The user must ensure that the the same request never goes down // the same conversation twice. The user should store the messages // so that higher levels can share the data. Class Procedure Find(TQL : String; Table : TTalTable; Symbol : String; OnceOnly : Boolean; OnChange : TThreadMethod; Out Node : TTalMsgListenerDataNode; Out Link : TDataNodeLink); // This should be automatic. That's the point of streaming data. // But in some cases we loose messages. Procedure RefreshSnapshotData; // This is only valid during the callback to OnEvent. // Nil means the datafeed is down. Function GetRecord : PTal4Handle; // This is only valid during the callback to OnEvent. Property MessageStatus : TTalMsgStatus Read FMessageStatus; Destructor Destroy; Override; // This is mostly aimed at debugging. Property Symbol : String Read FSymbol; End; Implementation Uses StringObjectHashTables, TalCommon, Timers, DebugOutput, SimpleMarketData, SysUtils, Math, DateUtils; Const // In principal all the data could go through one conversation. Or each // request could have it's own conversation. This constant allows us to // chose the number of conversations to help us tweak the performance of // the TAL datafeed. // // This was a nice idea, but in practice it caused memory fragmentation to // become significantly worse when we had 10 threads, or even 2 threads, // compared to just one. With 10 threads, a machine watching 1/2 the // NYSE data crashed just 15 minutes after the open. It ran out of memeory. // // I have not been able to say for certain if this does or does not help // with our bandwidth issues. However, netstat says that multiple // conversations all share one TCP/IP socket. So I'm assuming that this // (even if we could get past the memory issue) would not change our // performance. I've seen multiple seperate applications get better // peformance. But netstat tells me that each application has its own // TCP/IP socket, so that was definately doing something different from what // this constant does. ConversationCount = 1; Var LiveQuoteConversationFactory : Array [0 .. Pred(ConversationCount)] Of IGenericDataNodeFactory; //////////////////////////////////////////////////////////////////////// // TTalConvListenerDataNode //////////////////////////////////////////////////////////////////////// Const MaxListLength = 200; RequestOnlyId = -1; // No list id required. Lists are only for advises. Type TTalRecordInfo = Class(TBroadcastMessage) Public Data : PTal4Handle; Status : TTalMsgStatus; End; TPendingRequest = Class(TObject) Public ListId : Integer; OutputChannel : String; // Send messages to our listener. RequestChannel : String; // Recieve info for a request on this channel. Active : Boolean; // We are expecting data from TAL for this. End; TTalConvListenerDataNode = Class(TGenericDataNode) Public // This data node takes care of bunching the // symbols into groups of MaxListLength. Each owner manually adds and // removes symbols. Messages are returned via broadcast. Up status // is handled automatically. When the link goes down, this data // node notifies its listeners. The owner is // responsible for making sure requests are unique. Class Procedure Find(Const TQL, Symbol : String; Table : TTalTable; OnChange : TThreadMethod; Out Node : TTalConvListenerDataNode; Out Link : TDataNodeLink); Overload; // Always do this in the data node thread. Returns a channel. Function ReserveSymbol(Symbol : String; RequestOnly : Boolean) : String; // Call from any thread. Don't repeat this without // calling RemoveSymbol first. Must call ReserveSymbol before // calling this. Procedure StartSymbolData(Symbol : String); // Call from any thread. // Should never be required, but sometimes we loose a message. Procedure RefreshSymbolSnapshotData(Symbol : String); // Call from any thread. It is acceptable to stop a symbol // which was not reserved or started. // Failing to stop a symbol is bad, because that is the only way // we know to stop asking for data. Procedure RemoveSymbol(Symbol : String); Protected Constructor Create(Params : TParamList); Override; Private FTable : TTalTable; TimerEvent : ITimerEvent; FSymbolInfo : THashTable; FListInfo : Array Of Record Size : Integer; Channel : String End; FTQL : String; FConversationDataNode : TTalConversaitonDataNode; Function TqlForRequest(Symbol : String) : String; Function TqlForAdvise(ListId : Integer) : String; Procedure RequestDataForList(ListId : Integer); Procedure UnRequestDataForList(ListId : Integer); Procedure DoRequestForSymbol(Symbol : String); Procedure DoAdviseForSymbol(Symbol : String); Procedure TalConversationStatusChange; Procedure RefreshAll; Procedure OnTimerCallback; Procedure MessageReceived(Msg : TBroadcastMessage; Owner : TDataNode); Function ListName(ListId : Integer) : String; Function TableName : String; End; Function TTalConvListenerDataNode.TableName : String; Begin Case FTable Of ttLiveQuote : Result := 'LIVEQUOTE'; ttRegional : Result := 'REGIONAL'; Else Assert(False, 'Invalid table type: ' + IntToStr(Ord(FTable))) End End; Function TTalConvListenerDataNode.TqlForRequest(Symbol : String) : String; Begin Result := TableName + ';1003,' + FTQL + '1003=''' + Symbol + '''' End; Function TTalConvListenerDataNode.TqlForAdvise(ListId : Integer) : String; Begin Result := TableName + ';1003,' + FTQL + '9034=''' + ListName(ListId) + '''' End; Function TTalConvListenerDataNode.ListName(ListId : Integer) : String; Begin Result := 'List_' + IntToStr(Integer(Self)) + '_' + IntToStr(ListId); End; Procedure TTalConvListenerDataNode.RequestDataForList(ListId : Integer); Var TQL : String; Begin TQL := TqlForAdvise(ListId); If FListInfo[ListId].Channel = '' Then Begin // Initial request FListInfo[ListId].Channel := FConversationDataNode.ReserveBroadcastChannel(TQL); RegisterForBroadcast(FListInfo[ListId].Channel, MessageReceived) End; FConversationDataNode.Advise(TQL) End; Procedure TTalConvListenerDataNode.UnRequestDataForList(ListId : Integer); Var TQL : String; Begin If (ListId < Length(FListInfo)) And (FListInfo[ListId].Channel <> '') Then Begin TQL := TqlForAdvise(ListId); FConversationDataNode.Unadvise(TQL); FConversationDataNode.ReleaseBroadcastChannel(TQL); UnRegisterForBroadcast(FListInfo[ListId].Channel); FListInfo[ListId].Channel := '' End End; Procedure TTalConvListenerDataNode.DoRequestForSymbol(Symbol : String); Var RequestInfo : TPendingRequest; RequestTQL : String; Begin RequestInfo := FSymbolInfo[Symbol] As TPendingRequest; Assert(Assigned(RequestInfo)); RequestTQL := TqlForRequest(Symbol); FConversationDataNode.Request(RequestTQL) End; Procedure TTalConvListenerDataNode.RefreshSymbolSnapshotData(Symbol : String); Var RequestInfo : TPendingRequest; Begin RequestInfo := FSymbolInfo[Symbol] As TPendingRequest; If Assigned(RequestInfo) Then FConversationDataNode.Request(TqlForRequest(Symbol)) End; Procedure TTalConvListenerDataNode.DoAdviseForSymbol(Symbol : String); Var RequestInfo : TPendingRequest; Begin RequestInfo := FSymbolInfo[Symbol] As TPendingRequest; Assert(Assigned(RequestInfo)); If RequestInfo.ListId <> RequestOnlyId Then FConversationDataNode.Execute('[AddWatch(' + TableName + ',' + ListName(RequestInfo.ListId) + ',' + Symbol + ')]') End; Class Procedure TTalConvListenerDataNode.Find( Const TQL, Symbol : String; Table : TTalTable; OnChange : TThreadMethod; Out Node : TTalConvListenerDataNode; Out Link : TDataNodeLink); Var Factory : IGenericDataNodeFactory; TempNode : TGenericDataNode; Begin // We hash on the symbol to select a conversation to spread out the work. // We can't hash on the TQL because there are only a handful of unique // TQL strings. (If we wanted to go that route we'd just make one // conversation per TQL string.) We could hash on the TQL and the symbol // but I decided not to for a couple of reasons. (a) It's just that much // more work to concatinate them and (b) having all the data for a symol // in one conversation might make the different types of data for the same // symbol arrive in order. Factory := TGenericDataNodeFactory.CreateWithArgs(TTalConvListenerDataNode, TQL, Table, LiveQuoteConversationFactory[Cardinal(StringHashCode(Symbol)) Mod ConversationCount]); Factory.Find(OnChange, TempNode, Link); Node := TempNode As TTalConvListenerDataNode End; Constructor TTalConvListenerDataNode.Create(Params : TParamList); { StartTimer may be OBE. We used to loose a lot of messages when TAL reset at 5am. But I don't see this any more. And this code } Procedure StartTimer; Const // The bad time is 4:55 am. // If we start before then, we need to manually refresh at 5am. // Between 4:55 and 4:57:30 we don't really need to refresh, but // that buffer allows us to be sure we didn't miss it. // If we start between 4:47:30 and 5:00am we don't need to do // the request, and we don't want to, because we're already busy. // After 5am, if we request this at 5am, we are making a request // in the past. This will be satisfied immediately, which just // causes trouble. // This might be a lot of trouble for nothing, since the request // will probably come before the datafeed is up, and before most // of the symbols are even in our list, but this is safer. RequestTime = 1.0 / 24.0 * 5.0; // 5am Buffer = 1.0 / 24.0 / 60.0 * 2.5; // 2 1/2 minutes. Var StartTime : TDateTime; Begin { StartTimer } StartTime := Today + RequestTime; If Now - (StartTime - Buffer) > 0 Then // Do it tomorrow, we're already passed. StartTime := StartTime + 1; TimerEvent := RequestPeriodicCallback(OnTimerCallback, 24*60*60*1000, StartTime) End; { StartTimer } Var ConversationFactory : IGenericDataNodeFactory; TempNode : TGenericDataNode; Link : TDataNodeLink; Begin { TTalConvListenerDataNode.Create } Assert(Length(Params) = 3, 'Expecting TQL, Table, Conversation'); FTQL := Params[0]; FTable := Params[1]; ConversationFactory := IUnknown(Params[2]) As IGenericDataNodeFactory; //DebugOutputWindow.AddMessage('TTalConvListenerDataNode.Create("' + FTQL + '", ' + IntToStr(Ord(FTable)) + ', CF)'); Inherited Create(True); ConversationFactory.Find(TalConversationStatusChange, TempNode, Link); AddAutoLink(Link); FConversationDataNode := TempNode As TTalConversaitonDataNode; FSymbolInfo := tHashTable.create; //StartTimer End; { TTalConvListenerDataNode.Create } Function TTalConvListenerDataNode.ReserveSymbol(Symbol : String; RequestOnly : Boolean) : String; Function GetListSize(ListId : Integer) : Integer; Begin If ListId >= Length(FListInfo) Then Result := 0 Else Result := FListInfo[ListId].Size End; Procedure IncListSize(ListId : Integer); Var OldLength, I : Integer; Begin { } If ListId >= Length(FListInfo) Then Begin OldLength := Length(FListInfo); SetLength(FListInfo, Max(ListId + 10, Length(FListInfo)*2)); For I := OldLength To Pred(Length(FListInfo)) Do With FListInfo[I] Do Begin Size := 0; Channel := '' End End; Inc(FListInfo[ListId].Size) End; Var ListId : Integer; RequestInfo : TPendingRequest; Begin { TTalConvListenerDataNode.ReserveSymbol } If FSymbolInfo.containsKey(Symbol) Then Raise Exception.Create('Duplicate symbol, ' + Symbol + ', for ' + FTQL) Else Begin RequestInfo := TPendingRequest.Create; RequestInfo.OutputChannel := Format('TTalConvListenerDataNode.%d.%s', [Integer(Self), Symbol]); Result := RequestInfo.OutputChannel; If RequestOnly Then ListId := -1 Else Begin ListId := 0; While GetListSize(ListId) >= MaxListLength Do Inc(ListId); IncListSize(ListId); If FListInfo[ListId].Channel = '' Then RequestDataForList(ListId) End; RequestInfo.RequestChannel := FConversationDataNode.ReserveBroadcastChannel(TqlForRequest(Symbol)); RegisterForBroadcast(RequestInfo.RequestChannel, MessageReceived); FSymbolInfo[Symbol] := RequestInfo; RequestInfo.ListId := ListId End End; { TTalConvListenerDataNode } Procedure TTalConvListenerDataNode.StartSymbolData(Symbol : String); Begin DoRequestForSymbol(Symbol); DoAdviseForSymbol(Symbol) End; Procedure TTalConvListenerDataNode.RemoveSymbol(Symbol : String); Var RequestInfo : TPendingRequest; Begin RequestInfo := FSymbolInfo[Symbol] As TPendingRequest; If Assigned(RequestInfo) Then Begin FConversationDataNode.ReleaseBroadcastChannel(TqlForRequest(Symbol)); If RequestInfo.RequestChannel <> '' Then UnRegisterForBroadcast(RequestInfo.RequestChannel); If RequestInfo.ListId <> RequestOnlyId Then Begin FConversationDataNode.Execute('[DeleteWatch(' + TableName + ',' + ListName(RequestInfo.ListId) + ',' + Symbol + ')]'); Dec(FListInfo[RequestInfo.ListId].Size); If FListInfo[RequestInfo.ListId].Size = 0 Then UnRequestDataForList(RequestInfo.ListId) End; FSymbolInfo.remove(Symbol); RequestInfo.Free End End; Procedure TTalConvListenerDataNode.RefreshAll; Var It : tMapIterator; Time : TDateTime; Begin Time := Now; If FConversationDataNode.GetBoolean Then Begin It := FSymbolInfo.getIterator; Try While It.validEntry Do Begin DoRequestForSymbol(It.getKey); It.next End Finally It.Free End; End; Time := Now - Time; DebugOutputWindow.AddMessage(Format('Finished refresh after %.4f seconds', [Time * 24 * 60 * 60])) End; Procedure TTalConvListenerDataNode.OnTimerCallback; Begin DebugOutputWindow.AddMessage('OnTimerCallback'); DoInCorrectThread(RefreshAll) End; Procedure TTalConvListenerDataNode.TalConversationStatusChange; Var I : Integer; It : tMapIterator; Begin If FConversationDataNode.GetBoolean Then Begin // The connection just came up. Resubmit all requests and advises. It := FSymbolInfo.getIterator; Try While It.validEntry Do Begin DoRequestForSymbol(It.getKey); It.next End Finally It.Free End; For I := Low(FListInfo) To High(FListInfo) Do If FListInfo[I].Size > 0 Then RequestDataForList(I); It := FSymbolInfo.getIterator; Try While It.validEntry Do Begin DoAdviseForSymbol(It.getKey); It.next End Finally It.Free End; End Else // The conversation just went down. Tell the listeners immediately. NotifyListeners End; Procedure TTalConvListenerDataNode.MessageReceived( Msg : TBroadcastMessage; Owner : TDataNode); Var RequestInfo : TPendingRequest; Symbol : String; BroadcastInfo : TTalRecordInfo; RecordStatus, FieldStatus : TTal4Status; Field : PTal4FieldData; Tal4Handle : TTal4Handle; Begin Assert(Msg Is TTalMessageInfo); With Msg As TTalMessageInfo Do Begin If Assigned(Data) Then Begin RecordStatus := TAL4_Open(TALConvDataGetData(Data), Tal4Handle, TAL4_READ); While RecordStatus = TAL4_SUCCESS Do Begin FieldStatus := TAL4_FindField(Tal4Handle, fid_DISP_NAME, Field); If (FieldStatus = TAL4_SUCCESS) And FieldIsPresent(Field) Then Begin Symbol := FieldAsString(Field); RequestInfo := FSymbolInfo[Symbol] As TPendingRequest; If Assigned(RequestInfo) Then Begin TAL4_GotoField(Tal4Handle, 0); BroadcastInfo := TTalRecordInfo.Create; BroadcastInfo.Data := @Tal4Handle; If Data.iType = TALCONV_REQUEST_DATA Then BroadcastInfo.Status := tmsRequest Else BroadcastInfo.Status := tmsAdvise; BroadcastInfo.Send(RequestInfo.OutputChannel, True); If RequestInfo.ListId = RequestOnlyId Then RemoveSymbol(Symbol) End End; RecordStatus := TAL4_NextRecord(Tal4Handle) End End End End; //////////////////////////////////////////////////////////////////////// // TTalMsgListenerDataNode //////////////////////////////////////////////////////////////////////// Procedure TTalMsgListenerDataNode.NewData( Msg : TBroadcastMessage; Owner : TDataNode); Var Info : TTalRecordInfo; Begin Info := (Msg As TTalRecordInfo); FRecord := Info.Data; FMessageStatus := Info.Status; NotifyListeners; FRecord := Nil; FMessageStatus := tmsDown; End; Procedure TTalMsgListenerDataNode.NewConversationStatus; Begin { Data feed just went down. } NotifyListeners End; Procedure TTalMsgListenerDataNode.Initialize; Var TempNode : TTalConvListenerDataNode; Link : TDataNodeLink; Begin TTalConvListenerDataNode.Find(FTQL, Symbol, FTable, NewConversationStatus, TempNode, Link); AddAutoLink(Link); Link.SetReceiveInput(True); Source := TempNode; RegisterForBroadcast(TempNode.ReserveSymbol(FSymbol, FOnceOnly), NewData); TempNode.StartSymbolData(FSymbol) End; Procedure TTalMsgListenerDataNode.RefreshSnapshotData; Begin (Source As TTalConvListenerDataNode).RefreshSymbolSnapshotData(FSymbol) End; Constructor TTalMsgListenerDataNode.Create(TQL : String; Table : TTalTable; Symbol : String; OnceOnly : Boolean); Begin Inherited Create; FTable := Table; FTQL := TQL; FSymbol := Symbol; FOnceOnly := OnceOnly; DoInCorrectThread(Initialize) End; Class Procedure TTalMsgListenerDataNode.Find(TQL : String; Table : TTalTable; Symbol : String; OnceOnly : Boolean; OnChange : TThreadMethod; Out Node : TTalMsgListenerDataNode; Out Link : TDataNodeLink); Begin { We convert the symbol back to TAL notation here. This class is all about hiding messy details. And this is lowest level where we can easily convert the symbol. At the next lower level the interface is much wider. This class talks to its owner is contacted by a standard notification. The next lower level listens to broadcast messages. } Symbol := StarUnquote(Symbol); Node := Create(TQL, Table, Symbol, OnceOnly); Link := Node.CreateLink(OnChange) End; Function TTalMsgListenerDataNode.GetRecord : PTal4Handle; Begin Result := FRecord End; Destructor TTalMsgListenerDataNode.Destroy; Begin If Assigned(Source) Then (Source As TTalConvListenerDataNode).RemoveSymbol(FSymbol); Inherited End; //////////////////////////////////////////////////////////////////////// // Initialization //////////////////////////////////////////////////////////////////////// Procedure CreateFactories; Var I : Integer; Begin For I := Low(LiveQuoteConversationFactory) To High(LiveQuoteConversationFactory) Do LiveQuoteConversationFactory[I] := TGenericDataNodeFactory.CreateWithArgs( TTalConversaitonDataNode, I, '"ES&', 'TA_SRV', 'LIVEQUOTE'); End; Initialization CreateFactories; End.