#include "../shared/ThreadClass.h"
#include "../shared/GlobalConfigFile.h"
#include "../shared/SimpleLogFile.h"
#include "../shared/SelectableRequestQueue.h"
#include "../shared/CommandDispatcher.h"
#include "../shared/PollSet.h"
#include "../shared/ThreadMonitor.h"
#include "../shared/DatabaseWithRetry.h"
#include "../shared/ReplyToClient.h"
#include "../shared/TalkWithServer.h"
#include "FieldLists.h"
#include "AlertRecordMultiCast.h"
#include "RecordDispatcher.h"
/////////////////////////////////////////////////////////////////////
// RecordDispatcher
/////////////////////////////////////////////////////////////////////
// Note: this is an old style of creating threads and talking with other
// servers. New code should consider using ../shared/ContainerThread.h rather
// than ../shared/ThreadClass.h and using ../shared/ServerConnection64.h
// rather than ../shared/TalkWithServer.h.
class RecordDispatcher : public IRecordDispatcher, ThreadClass,
TalkWithServer::IMessageListener
{
private:
// Message types understood by the request loop; each value is stored in a
// Request's callbackId and switched on when the request is pulled from
// _incoming.
enum { mtNewListener, mtExternalRecord, mtSendFakeRecord, mtDoIt };
// A request that carries an arbitrary callback. When the request loop pulls
// one from _incoming (case mtDoIt) it simply invokes the callback, which lets
// code running elsewhere (for example the multicast receiver's handlers)
// run work on this thread.
class DoIt : public Request
{
public:
  using Callback = std::function< void() >;
  // The work to run on the dispatcher thread.
  const Callback callback;
  DoIt(Callback callback, SocketInfo *socket = nullptr)
    : Request(socket), callback(callback)
  {
    callbackId = mtDoIt;
  }
};
// Client ids passed to TalkWithServer::sendMessage() so onMessage() can tell
// replies apart: ciData for records, ciStatus for status messages, and
// ciFirstPing and above for ping replies (the offset from ciFirstPing encodes
// the ping's deadline relative to the first ping sent on this connection).
enum { ciData, ciStatus, ciFirstPing }; // Client id for messages from server.
// Queue of requests processed by the dispatcher's request loop. Stays NULL
// when no "<base>_master_info" is configured; must not be used unless _active.
SelectableRequestQueue *_incoming;
// Base name such as "alerts" or "top_list"; used to build command names and
// the database table name.
const std::string _base;
// ThreadMonitor counter names incremented by notifyListeners() and
// notifyStatusListeners() respectively.
const std::string _notifyString;
const std::string _notifyStatusString;
// One registered consumer: the queue to post requests to, and the callbackId
// stamped on each request so the consumer can tell what it received.
struct Listener
{
  RequestListener *listener;
  int callbackId;
};
// Consumers of records (fed by notifyListeners) and of provider status
// messages (fed by notifyStatusListeners). Entries are appended only when the
// request loop processes a NewListener request.
std::vector< Listener > _listeners;
std::vector< Listener > _statusListeners;
// Request asking the dispatcher to append `listener` to _statusListeners
// (status == true) or to _listeners (status == false).
class NewListener : public Request
{
public:
  Listener listener;
  const bool status;
  // explicit: a bare bool must never silently convert into a registration
  // request. `listener` is value-initialized (null pointer, callbackId 0) so
  // it is never read uninitialized if a caller forgets to fill it in.
  explicit NewListener(bool status) :
    Request(nullptr), listener(), status(status)
  {
    callbackId = mtNewListener;
  }
};
// Tracks how late each dispatched record is relative to its timestamp.
LateRecordCounter _lateRecordCounter;
// A new record arrived: bump the monitor counter, record its latency, and
// post a separate NewRecord request (all sharing the same Record::Ref) to
// every registered record listener. The listener presumably takes ownership
// of the request it is handed.
void notifyListeners(Record::Ref const &record)
{
  ThreadMonitor::find().increment(_notifyString);
  _lateRecordCounter.add(record);
  for (Listener const &target : _listeners)
  {
    NewRecord *const message = new NewRecord(record);
    message->callbackId = target.callbackId;
    target.listener->newRequest(message);
  }
}
void notifyStatusListeners(std::string const &name)
{
ThreadMonitor::find().increment(_notifyStatusString);
for (std::vector< Listener >::const_iterator it = _statusListeners.begin();
it != _statusListeners.end();
it++)
{
ProviderStatus *request = new ProviderStatus(name);
request->callbackId = it->callbackId;
it->listener->newRequest(request);
}
}
// Reported by isActive(). While false, listener registration is ignored and
// _incoming must not be used. Presumably set once the dispatcher thread is
// started — the assignment is not visible here, TODO confirm.
bool _active;
// True when "<base>_master_info" is "me". The master assigns record ids
// itself (from _nextId) rather than receiving them.
bool _thisIsMaster;
bool _tcpSlave; // We are using TalkWithServer to read from the master.
// Connection to the master. NULL until checkConnection() creates it, and
// reset to NULL when checkConnection() finds it disconnected.
TalkWithServer *_talkWithServer;
// Host and port handed to TalkWithServer::connect(). Presumably parsed from
// a "host:port" "<base>_master_info" value — TODO confirm.
std::string _masterName;
std::string _masterPort;
// Next id to assign to an externally submitted record when acting as master;
// seeded from the database by initNextId().
int64_t _nextId;
// If we are serving as the master, we need to assign id numbers. Usually
// we get the id numbers from _nextId. However, when we first start we
// need to know where to start from. In that case we go back to the
// database.
//
// If you restart off hours this works perfectly. If you restart midday
// it's possible that you will send alerts to other alert servers before
// the same alert gets to the database, so we might reuse a small number of
// ids. I explicitly do NOT want to hold up alerts until the database
// acknowledges them. If we ever needed to be more certain about not
// repeating an id, I'd use a flat file or something, where I had more
// speed and control.
void initNextId()
{
DatabaseWithRetry database(true, getName()); // Read only.
_nextId = database.tryQueryUntilSuccess("SELECT MAX(id) FROM " + _base)
->getIntegerField(0,0) + 1;
TclList msg;
msg<
command=alerts_send_fake_record&message_id=2
== MESSAGE 0 ========== 57 ==
{0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786418}
command=alerts_send_fake_record&message_id=2&count=12
== MESSAGE 0 ========== 57 ==
{0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786454}
command=alerts_send_fake_record&message_id=2&count=1299
== MESSAGE 0 ========== 57 ==
{0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786463}
command=alerts_send_fake_record&message_id=2&count=129999
== MESSAGE 0 ========== 57 ==
{0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786504}
command=alerts_send_fake_record&message_id=2&count=1299999
== MESSAGE 0 ========== 57 ==
{0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786570}
command=alerts_send_fake_record&message_id=2&count=12999999
== MESSAGE 0 ========== 57 ==
{0 5 0 3 1234} {1 0 2 6 “DELL”} {2 13 4 2 1595786600}
^]
telnet> Connection closed.
*/
// You can verify from the alert server's log that it was receiving and
// processing messages from the master for several minutes straight and it
// never tried to reconnect.
void checkPing()
{
if (!_tcpSlave)
return;
// need better test because multicast is not master.
checkConnection();
if (_nextPingTime < time(NULL))
{ // Timer went off.
if (_pingStatus == Ping::SomethingReceived)
// We're receiving data, so but we haven't seen the ping response
// yet. Assume the data is legit and the ping response is just
// slow because we have too much data. Disconnecting now would
// only make things slightly slower, and raise the chance that we
// completely lose some data.
ThreadMonitor::find().increment("ping late warn");
if (_pingStatus == Ping::Waiting)
{ // Nothing received recently. Looks like a network error.
// Disconnect and try again.
//TclList msg;
//msg<disconnect();
ThreadMonitor::find().increment("ping failed DISCONNECT");
}
else
{ // Time to send another ping
_nextPingTime = time(NULL) + 5;
if (!_firstPingTime)
_firstPingTime = _nextPingTime;
// Encode the _nextPingTime. So when we get a response we know:
// a) Is this is the most recent ping request?
// b) Otherwise, how old was this ping request?
int64_t clientId = ciFirstPing + _nextPingTime - _firstPingTime;
if ((int)clientId < ciFirstPing)
{ // We wrapped around. clientId is 32 bits, so this should
// only happen if we keep running for over 68 years. I'll do the
// right thing; I'll reset the connection which will cause
// _firstPingTime to reset. Should be barely noticeable if at all.
// But realistically, if we get here, it's probably a mistake, so log
// it! And don't forget, the newer communication libraries are all
// 64 bit.
_talkWithServer->disconnect();
TclList msg;
msg<disconnected()"<<"UNEXPECTED!!!"
<<"_firstPingTime"<<_firstPingTime<sendMessage(message, this, clientId, true);
_pingStatus = Ping::Waiting;
}
}
}
}
// If we have to reconnect to the master, tell it where we left off. This
// is the last thing we successfully received. This is null if we have
// not received any records, yet.
Record::Ref _lastRecord;
void checkConnection(bool createIfRequired=true)
{
if (_talkWithServer && _talkWithServer->disconnected())
{
TclList msg;
msg<disconnected()";
sendToLogFile(msg);
_talkWithServer->cancelAll();
delete _talkWithServer;
_talkWithServer = NULL;
_pingStatus = Ping::NotSent;
}
if (!createIfRequired)
// We've put this object into a simpler state. Either _talkWithServer is
// null or it's ready to go.
return;
if (_talkWithServer)
// _talkWithServer was already ready to go.
return;
TclList msg;
msg<connect"<<_masterName<<_masterPort;
sendToLogFile(msg);
_talkWithServer = new TalkWithServer("record dispatcher " + _base);
_talkWithServer->connect(_masterName, _masterPort);
if (_talkWithServer->disconnected())
// Unable to open the connection.
return;
TalkWithServer::Message message;
message["command"] = getListenCommand();
if (_lastRecord)
{ // Let the master server know where we left off. The new data should
// start seamlessly.
const ValueBox idBox = _lastRecord->lookUpValue((FieldId)MainFields::id);
bool valid;
int64_t id;
idBox.getInt(valid, id);
if (!valid)
sendToLogFile(TclList()<sendMessage(message, this, ciData, true);
message["command"] = getStatusCommand();
_talkWithServer->sendMessage(message, this, ciStatus, true);
// Request a new ping ASAP.
_firstPingTime = _nextPingTime = 0;
_pingStatus = Ping::NotSent;
}
// Received something from the master.
virtual void onMessage(std::string bytes, int clientId,
TalkWithServer::CancelId cancelId)
{
const auto somethingReceived = [this]()
{ // We received some data. This can be a good thing, or nothing at all.
// If we were waiting for a ping, mark this as almost as good, a
// warning instead of an error, and room to do even better when the
// ping finally comes. If we were not waiting for anything, or we
// already received a ping, ignore this, we are already in the best
// possible state.
if (_pingStatus == Ping::Waiting)
_pingStatus = Ping::SomethingReceived;
};
ThreadMonitor &tm = ThreadMonitor::find();
switch (clientId)
{
case ciData:
{ // Data received. Forward to the listeners.
tm.increment("ciData");
auto record = Record::create(bytes);
if (!record)
tm.increment("UNABLE TO PARSE");
else
{
_lastRecord = record;
somethingReceived();
notifyListeners(record);
}
break;
}
case ciStatus:
{ // Status message received. Forward to the listeners.
tm.increment("ciStatus");
somethingReceived();
notifyStatusListeners(bytes);
break;
}
default:
{ // Ping received.
const time_t timeout = clientId + _firstPingTime - ciFirstPing;
if (timeout == _nextPingTime)
{ // This response is from the most recent ping request.
tm.increment("ping on time");
_pingStatus = Ping::PingReceived;
_nextPingTime = 5 + time(NULL);
}
else
{ // This is an older response.
somethingReceived();
const time_t age = time(NULL) - timeout;
TclList msg;
msg<getWaitHandle());
if (_talkWithServer)
{
if (_talkWithServer->wantsRead())
pollSet.addForRead(_talkWithServer->getHandle());
if (_talkWithServer->wantsWrite())
pollSet.addForWrite(_talkWithServer->getHandle());
}
pollSet.setTimeoutMs(1000);
pollSet.poll();
tm.setState("Read from queue");
_incoming->resetWaitHandle();
while (Request *current = _incoming->getRequest())
{
switch (current->callbackId)
{
case mtNewListener:
{ // A new internal listener.
// See FeedNextServer.C for external listeners.
NewListener *request = dynamic_cast< NewListener * >(current);
if (request->status)
_statusListeners.push_back(request->listener);
else
_listeners.push_back(request->listener);
// The following line reported 8 alert listeners and 0 status listeners, before I commented it out.
// {Wed Aug 5 09:47:46 2020} 0 RecordDispatcher.C 457 threadFunction mtNewListener alerts _statusListeners.size() 0 _listeners.size() 8
//sendToLogFile(TclList()<status?"status":"alerts")<<"_statusListeners.size()"<<_statusListeners.size()<<"_listeners.size()"<<_listeners.size());
break;
}
case mtExternalRecord:
{ // This should only happen on the master. Someone is submitting
// a new record for us to forward. (Or someone is making a related
// administrative command.)
ExternalRequest *request =
dynamic_cast< ExternalRequest * >(current);
// Either supply the "record" argument, the "ping" argument,
// or the "status" argument. (I.e. there is no sub-command field.
// Which data the sender provides determines what we do here.)
auto record = Record::create(request->getProperty("record"));
if (!record)
{
if (request->getProperty("ping") == "1")
// If you request "ping"="1", you will receive "ping" as the
// response. This says that you are attached and the server
// is listening for this type of data. More than likely if
// the server is not listening for this type of data, you will
// never get a response.
addToOutputQueue(request->getSocketInfo(), "ping",
request->getResponseMessageId());
else if (!request->getProperty("status").empty())
{
//sendToLogFile(TclList()<getProperty("status"));
notifyStatusListeners(request->getProperty("status"));
}
else
tm.increment("UNABLE TO PARSE");
}
else if (!record->update(ID_FIELD, _nextId))
{
tm.increment("UNABLE TO SET ID");
}
else
{
// "record" will try to add the record to the stream. If there
// is a problem, it is logged locally. No message is sent back
// to the client either way.
_nextId++;
notifyListeners(record);
}
break;
}
case mtSendFakeRecord:
{
ExternalRequest *request =
dynamic_cast< ExternalRequest * >(current);
RecordBuilder rb;
rb.append(MainFields::symbol,
request->getProperty("symbol", "DELL"));
rb.reserveInt(MainFields::id,
strtolDefault(request->getProperty("id"), 1234));
rb.append(MainFields::timestamp,
strtolDefault(request->getProperty("timestamp"),
time(NULL)));
Record::Ref record = Record::create(rb.exportAsString());
const int count = strtolDefault(request->getProperty("count"), 1);
for (int i = 0; i < count; i++)
notifyListeners(record);
addToOutputQueue(request->getSocketInfo(), record->debugDump());
break;
}
case mtDoIt:
{
DoIt *request = dynamic_cast< DoIt * >(current);
request->callback();
break;
}
}
delete current;
}
tm.setState("ping");
checkPing();
if (_talkWithServer)
{
tm.setState("_talkWithServer");
_talkWithServer->wakeUp();
}
if (_talkWithServer && _talkWithServer->pendingResponseCount())
{
tm.setState("responses");
_talkWithServer->doResponses();
}
}
}
public:
// True when this dispatcher accepts listener registrations.
virtual bool isActive() const
{
  return _active;
}
// True when this process assigns record ids (master_info was "me").
virtual bool isMaster() const
{
  return _thisIsMaster;
}
// The base name ("alerts", "top_list", ...) this dispatcher was built with.
virtual std::string const &getBaseName() const
{
  return _base;
}
// Command a slave sends to the master to subscribe to records.
virtual std::string getListenCommand() const
{
  std::string command(_base);
  command += "_record_listen";
  return command;
}
// Command a slave sends to the master to subscribe to status messages.
virtual std::string getStatusCommand() const
{
  std::string command(_base);
  command += "_status_listen";
  return command;
}
// Asks the dispatcher to start forwarding every record to `listener`, each
// wrapped in a NewRecord request tagged with `callbackId`. The registration
// is queued and takes effect when the request loop processes it. Silently
// ignored when the dispatcher is inactive, because _incoming must not be
// touched in that state.
virtual void listenForRecords(RequestListener *listener, int callbackId)
{
  if (!_active)
    return;
  NewListener *const registration = new NewListener(false);
  registration->listener.listener = listener;
  registration->listener.callbackId = callbackId;
  _incoming->newRequest(registration);
}
// Same as listenForRecords(), but for provider status messages (delivered as
// ProviderStatus requests tagged with `callbackId`).
virtual void listenForStatus(RequestListener *listener, int callbackId)
{
  if (!_active)
    return;
  NewListener *const registration = new NewListener(true);
  registration->listener.listener = listener;
  registration->listener.callbackId = callbackId;
  _incoming->newRequest(registration);
}
RecordDispatcher(std::string const &base) :
ThreadClass("RecordDispatcher: " + base),
_incoming(NULL),
_base(base),
_notifyString("notify " + base),
_notifyStatusString("notify status " + base),
_active(false),
_thisIsMaster(false),
_tcpSlave(false),
_talkWithServer(NULL),
_pingStatus(Ping::NotSent)
{
std::string input = getConfigItem(base + "_master_info");
if (input == "")
// No source of data. Not worth creating the thread.
// Note: We've added default values for the master info to
// ../../live_server/config_common.txt, so now it's less common
// to get here. Someone would have to explicitly set this to
// the empty string to get here. It still happens, but it's no
// longer the default.
return;
// Don't create _incoming unless we start the thread. Otherwise the
// delete socket logic will fail. No sockets will ever get deleted.
// If the deadman timer goes off, we will stop listening to the socket,
// but we won't send the close message to the other end.
//
// Do not try to access _incoming unless _active is true.
_incoming = new SelectableRequestQueue(getName());
_thisIsMaster = input == "me";
CommandDispatcher *cd = CommandDispatcher::getInstance();
cd->listenForCommand(base + "_send_fake_record",
_incoming, mtSendFakeRecord);
if (_thisIsMaster)
{
cd->listenForCommand(base + "_add_record", _incoming, mtExternalRecord);
}
else if (input == "multicast")
{
TclList msg;
msg<isOkay())
// This test is NOT optional. If you try to call receiver->listen()
// and the receiver is not okay, the program might crash.
msg<<"FAILED";
else
{ // Each time the receiver receives a new event we want to move into
// this thread and call notifyListeners(). DoIt is a class that lets
// us run any code in this thread. (Newer code which uses
// ContainerThread.h would call addLambdaToQueue() for this effect.)
receiver->listen([=](Record::Ref const &record) {
DoIt *request = new DoIt([=]() {
notifyListeners(record);
});
_incoming->newRequest(request);
});
receiver->listen([=](std::string const &status) {
DoIt *request = new DoIt([=]() {
notifyStatusListeners(status);
});
_incoming->newRequest(request);
});
msg<<"success";
sendToLogFile(msg);
}
}
else
{
std::vector< std::string > pieces = explode(":", input);
if (pieces.size() != 2)
{
TclList msg;
msg<lookUpValue(MainFields::timestamp);
bool valid;
time_t timestamp;
timestampBox.getInt(valid, timestamp);
if (!valid)
_noTimeCount++;
else
{
const int64_t now = getMicroTime();
const int64_t late = now - timestamp * 1000000;
if (late <= 0)
_earlyCount++;
else
{
_totalTime += late;
if (late > _worstTime)
_worstTime = late;
}
}
}
std::string LateRecordCounter::getInfoForThreadMonitor()
{
if (!_totalCount)
return "";
TclList result;
result<<"LateRecordCounter"
<<"_totalCount"<<_totalCount;
auto const normalCount = _totalCount - _noTimeCount - _earlyCount;
if (normalCount > 0)
result<<"average late"<<(_totalTime/1000000.0/normalCount)
<<"_worstTime"<<_worstTime/1000000.0;
if (_noTimeCount)
result<<"_noTimeCount"<<_noTimeCount;
if (_earlyCount)
result<<"_earlyCount"<<_earlyCount;
_noTimeCount = _earlyCount = _totalCount = _totalTime = _worstTime = 0;
return result;
}
LateRecordCounter::LateRecordCounter() :
_noTimeCount(0),
_earlyCount(0),
_totalCount(0),
_totalTime(0),
_worstTime(0)
{ }
/////////////////////////////////////////////////////////////////////
// IRecordDispatcher
/////////////////////////////////////////////////////////////////////
// Dispatchers for the "alerts" and "top_list" streams; null until
// IRecordDispatcher::init() creates them.
IRecordDispatcher *IRecordDispatcher::_alerts = nullptr;
IRecordDispatcher *IRecordDispatcher::_topList = nullptr;
void IRecordDispatcher::init()
{
_alerts = new RecordDispatcher("alerts");
_topList = new RecordDispatcher("top_list");
if (!(_alerts->isActive() || _topList->isActive()))
{ // Assume this is a mistake.
TclList msg;
msg<