Unit VolumeWeightedDataNodes; Interface Uses DataNodes, GenericTickDataNodes, GenericTosDataNode, VolumeWeightedData, GenericDataNodes, Classes; { Currently these only accumulate blocks. They never discard blocks. GetBlocks returns a list which is identical to the previous call, or it contains all the previous data plus one or more new blocks on the right hand side. Potentially one of these data nodes should have the ability to reset themselves when better data is available. Although this may never happen, we still have a contract for that, because if it ever happened, it could effect a lot of code. The rule is that this data node can only do one of two things. It can present an empty list, or it can present a list which is identical to the previous list, with something optionally added to the right hand side. If one of these had 10 items, then it decided to reset itself, and the new version has only 5 items, it must do this in two steps. First it signals all listeners, and presents the empty list. Then it presents the new list with 5 members. Changing existing data, although now allowed, is still very disruptive, and should be avoided. } Type TVolumeWeightedDataNode = Class(TDataNodeWithStringKey) Public Function GetBlocks : TVolumeBlocks; Function GetBlockCount : Integer; Destructor Destroy; Override; Protected FSymbol : String; FGroupBy : Integer; VolumeBlocks : TVolumeBlocks; VolumeBlockFactory : TVolumeBlockFactory; Class Function EncodeInt(I : Integer) : String; Class Procedure DecodeInt(Encoded : String; Out I : Integer; Out Rest : String); Private End; TVolumeWeightedHistoricalDataNode = Class(TVolumeWeightedDataNode) Public Class Procedure Find(Symbol : String; GroupBy : Integer; OnChange : TThreadMethod; Out Node : TVolumeWeightedDataNode; Out Link : TDataNodeLink); Protected Class Function CreateNew(Data : String) : TDataNodeWithStringKey; Override; Private TickData : TGenericTickDataNode; Procedure NewData; Constructor Create(Symbol : String; GroupBy : Integer); End; TVolumeWeightedRealTimeDataNode = Class(TVolumeWeightedDataNode) Public Class Procedure Find(Symbol : String; GroupBy : Integer; OnChange : TThreadMethod; Out Node : TVolumeWeightedDataNode; Out Link : TDataNodeLink); Class Procedure SetHistoryValid(Valid : Boolean); Class Function GetHistoryValid : Boolean; Protected Class Function CreateNew(Data : String) : TDataNodeWithStringKey; Override; Private TosData : TGenericTosDataNode; Procedure NewData; Constructor Create(Symbol : String; GroupBy : Integer); End; Implementation Uses CsvFileData, StandardPlaceHolders, Math, SysUtils; //////////////////////////////////////////////////////////////////////// // TVolumeWeightedDataNode //////////////////////////////////////////////////////////////////////// Function TVolumeWeightedDataNode.GetBlocks : TVolumeBlocks; Begin If Length(VolumeBlocks) = 0 Then VolumeBlocks := VolumeBlockFactory.GetBlocks; Result := VolumeBlocks End; Function TVolumeWeightedDataNode.GetBlockCount : Integer; Begin Result := VolumeBlockFactory.GetBlockCount End; Destructor TVolumeWeightedDataNode.Destroy; Begin VolumeBlockFactory.Free; Inherited End; Class Function TVolumeWeightedDataNode.EncodeInt(I : Integer) : String; Begin SetLength(Result, SizeOf(Integer)); PInteger(PChar(Result))^ := I End; Class Procedure TVolumeWeightedDataNode.DecodeInt(Encoded : String; Out I : Integer; Out Rest : String); Begin Assert(Length(Encoded) >= SizeOf(Integer)); I := PInteger(PChar(Encoded))^; Rest := Copy(Encoded, Succ(SizeOf(Integer)), MaxInt) End; //////////////////////////////////////////////////////////////////////// // TVolumeWeightedHistoricalDataNode //////////////////////////////////////////////////////////////////////// Class Procedure TVolumeWeightedHistoricalDataNode.Find(Symbol : String; GroupBy : Integer; OnChange : TThreadMethod; Out Node : TVolumeWeightedDataNode; Out Link : TDataNodeLink); Var TempNode : TDataNodeWithStringKey; Data : String; Begin Data := EncodeInt(GroupBy) + Symbol; FindCommon(TVolumeWeightedHistoricalDataNode, Data, OnChange, TempNode, Link);; Node := TempNode As TVolumeWeightedDataNode End; Class Function TVolumeWeightedHistoricalDataNode.CreateNew(Data : String) : TDataNodeWithStringKey; Var GroupBy : Integer; Symbol : String; Begin DecodeInt(Data, GroupBy, Symbol); Result := Create(Symbol, GroupBy) End; Constructor TVolumeWeightedHistoricalDataNode.Create(Symbol : String; GroupBy : Integer); Var Link : TDataNodeLink; Begin Inherited Create; FSymbol := Symbol; FGroupBy := GroupBy; VolumeBlockFactory := TVolumeBlockFactory.Create(GroupBy); TGenericTickDataNode.Find(Symbol, NewData, TickData, Link); AddAutoLink(Link); DoInCorrectThread(NewData) End; Procedure TVolumeWeightedHistoricalDataNode.NewData; Var Ticks : TTickRecordArray; I : Integer; Begin VolumeBlockFactory.Free; VolumeBlockFactory := TVolumeBlockFactory.Create(FGroupBy); SetLength(VolumeBlocks, 0); Ticks := TickData.GetRecords; If (Length(Ticks) > 0) Then Assert(Ticks[0].Time >= Ticks[Pred(Length(Ticks))].Time); For I := Pred(Length(Ticks)) DownTo 0 Do With Ticks[I] Do If Trade Then VolumeBlockFactory.AddPrint(TradePrice, TradeVolume, Time); NotifyListeners End; //////////////////////////////////////////////////////////////////////// // TVolumeWeightedRealTimeDataNode //////////////////////////////////////////////////////////////////////// Var HistoryValid : Boolean = True; Class Procedure TVolumeWeightedRealTimeDataNode.SetHistoryValid(Valid : Boolean); Begin HistoryValid := Valid End; Class Function TVolumeWeightedRealTimeDataNode.GetHistoryValid : Boolean; Begin Result := HistoryValid End; Class Procedure TVolumeWeightedRealTimeDataNode.Find(Symbol : String; GroupBy : Integer; OnChange : TThreadMethod; Out Node : TVolumeWeightedDataNode; Out Link : TDataNodeLink); Var TempNode : TDataNodeWithStringKey; Data : String; Begin Data := EncodeInt(GroupBy) + Symbol; FindCommon(TVolumeWeightedRealTimeDataNode, Data, OnChange, TempNode, Link); Node := TempNode As TVolumeWeightedDataNode End; Class Function TVolumeWeightedRealTimeDataNode.CreateNew(Data : String) : TDataNodeWithStringKey; Var GroupBy : Integer; Symbol : String; Begin DecodeInt(Data, GroupBy, Symbol); Result := Create(Symbol, GroupBy) End; Constructor TVolumeWeightedRealTimeDataNode.Create(Symbol : String; GroupBy : Integer); Var Link : TDataNodeLink; HistoricalData : TFileOwnerDataNode; Begin Inherited Create; FSymbol := Symbol; FGroupBy := GroupBy; VolumeBlockFactory := TVolumeBlockFactory.Create(GroupBy); TGenericTosDataNode.Find(Symbol, NewData, TosData, Link); AddAutoLink(Link); If GetHistoryValid Then Begin { Ideally we start with the history which was automatically prepaired for us by another task, then add realtime data. But if we start late there will be a big hole. In that case it's better to use no history and just the real time. } TFileOwnerDataNode.Find('VB_OvernightData.csv', Nil, HistoricalData, Link); VolumeBlockFactory.InitializeWith(VolumeBlocksFromString(HistoricalData.ThreadSafeGet('Vol', Format('%s_%d', [Symbol, GroupBy])))); Link.Release End End; Procedure TVolumeWeightedRealTimeDataNode.NewData; Var OldLength : Integer; LastPrint : TTosData; Begin If TosData.IsValid Then Begin LastPrint := TosData.GetLast^; If LastPrint.EventType = etNewPrint Then Begin OldLength := VolumeBlockFactory.GetBlockCount; VolumeBlockFactory.AddPrint(LastPrint.Price, LastPrint.Size, GetSubmitTime); If OldLength <> VolumeBlockFactory.GetBlockCount Then Begin SetLength(VolumeBlocks, 0); NotifyListeners End End End End; End.