File indexing completed on 2025-08-03 08:20:57
0001
0002
0003
0004
0005 #include "pmonitorInterface.h"
0006 #include "HistoBinDefs.h"
0007 #include "OnlMon.h"
0008 #include "OnlMonDefs.h"
0009 #include "OnlMonServer.h"
0010
0011 #pragma GCC diagnostic push
0012 #pragma GCC diagnostic ignored "-Wunused-parameter"
0013 #include <Event/Event.h>
0014 #include <pmonitor.h>
0015 #pragma GCC diagnostic pop
0016
0017 #include <Event/EventTypes.h>
0018 #include <Event/msg_control.h>
0019 #include <Event/msg_profile.h>
0020 #include <Event/packet.h>
0021
0022 #include <MessageTypes.h> // for kMESS_OBJECT, kMESS_STRING
0023 #include <TClass.h>
0024 #include <TH1.h>
0025 #include <TInetAddress.h> // for TInetAddress
0026 #include <TMessage.h>
0027 #include <TROOT.h>
0028 #include <TServerSocket.h>
0029 #include <TSocket.h>
0030 #include <TSystem.h>
0031 #include <TThread.h>
0032
0033 #include <pthread.h>
0034 #include <sys/types.h> // for time_t
0035 #include <unistd.h> // for sleep
0036 #include <csignal>
0037 #include <cstdio> // for printf, NULL
0038 #include <cstdlib> // for exit
0039 #include <cstring> // for strcmp
0040 #include <iostream> // for operator<<, basic_ostream, endl, basic_o...
0041 #include <limits>
0042 #include <sstream>
0043 #include <string>
0044
0045
0046
0047 #ifndef ROOTTHREAD
0048 #define SERVER
0049 #endif
0050
0051 #ifdef SERVER
0052 static void *server(void *);
0053 int ServerThread = 0;
0054 #endif
0055
0056 #ifdef ROOTTHREAD
0057 static void *server(void *);
0058 static TThread *ServerThread = nullptr;
0059 #endif
0060
0061 #ifdef USE_MUTEX
0062 pthread_mutex_t mutex;
0063 #endif
0064
0065 TH1 *FrameWorkVars = nullptr;
0066 void signalhandler(int signum);
0067
0068
0069 int pinit()
0070 {
0071 OnlMonServer *Onlmonserver = OnlMonServer::instance();
0072 #ifdef USE_MUTEX
0073 Onlmonserver->GetMutex(mutex);
0074 #endif
0075 for (int i = 0; i < kMAXSIGNALS; i++)
0076 {
0077 gSystem->IgnoreSignal((ESignals) i);
0078 }
0079 #ifdef USE_MUTEX
0080 pthread_mutex_lock(&mutex);
0081 #endif
0082 signal(SIGINT, signalhandler);
0083 #if defined(SERVER) || defined(ROOTTHREAD)
0084
0085 pthread_t ThreadId = 0;
0086 if (!ServerThread)
0087 {
0088
0089 #ifdef SERVER
0090
0091 ServerThread = pthread_create(&ThreadId, nullptr, server, (void *) nullptr);
0092 Onlmonserver->SetThreadId(ThreadId);
0093 #endif
0094 #ifdef ROOTTHREAD
0095
0096 ServerThread = new TThread(server, (void *) 0);
0097 ServerThread->Run();
0098 #endif
0099 }
0100 #endif
0101
0102 FrameWorkVars = new TH1I("FrameWorkVars", "FrameWorkVars", NFRAMEWORKBINS, 0., NFRAMEWORKBINS);
0103 Onlmonserver->registerCommonHisto(FrameWorkVars);
0104 #ifdef USE_MUTEX
0105 pthread_mutex_unlock(&mutex);
0106 #endif
0107
0108 return 0;
0109 }
0110
0111
0112 int process_event(Event *evt)
0113 {
0114 static time_t savetmpticks = 0x7FFFFFFF;
0115 static time_t borticks = 0;
0116 static time_t eorticks = 0;
0117 static int eventcnt = 0;
0118
0119 OnlMonServer *se = OnlMonServer::instance();
0120 time_t tmpticks = evt->getTime();
0121
0122
0123 if (se->RunNumber() == -1)
0124 {
0125 savetmpticks = 0x7FFFFFFF;
0126 eventcnt = 0;
0127 int newrun = evt->getRunNumber();
0128 #ifdef USE_MUTEX
0129 pthread_mutex_lock(&mutex);
0130 #endif
0131 FrameWorkVars->SetBinContent(RUNNUMBERBIN, newrun);
0132 se->BadEvents(0);
0133 se->EventNumber(evt->getEvtSequence());
0134 se->RunNumber(newrun);
0135
0136
0137
0138 se->CurrentTicks(tmpticks);
0139 se->BeginRun(newrun);
0140
0141 borticks = se->BorTicks();
0142 FrameWorkVars->SetBinContent(BORTIMEBIN, borticks);
0143 #ifdef USE_MUTEX
0144 pthread_mutex_unlock(&mutex);
0145 #endif
0146 eorticks = borticks;
0147 }
0148
0149
0150
0151
0152
0153
0154
0155
0156
0157
0158
0159 int oldrun;
0160 if ((oldrun = se->RunNumber()) != evt->getRunNumber())
0161 {
0162
0163
0164 #ifdef USE_MUTEX
0165 pthread_mutex_lock(&mutex);
0166 #endif
0167 FrameWorkVars->SetBinContent(EORTIMEBIN, eorticks);
0168 se->EndRun(oldrun);
0169 se->WriteHistoFile();
0170 se->Reset();
0171 int newrun = evt->getRunNumber();
0172 FrameWorkVars->SetBinContent(RUNNUMBERBIN, newrun);
0173 eorticks = tmpticks;
0174 se->BadEvents(0);
0175 se->RunNumber(newrun);
0176 se->EventNumber(evt->getEvtSequence());
0177
0178
0179
0180 se->CurrentTicks(tmpticks);
0181 se->BeginRun(newrun);
0182 borticks = se->BorTicks();
0183 FrameWorkVars->SetBinContent(BORTIMEBIN, borticks);
0184 eventcnt = 0;
0185 #ifdef USE_MUTEX
0186 pthread_mutex_unlock(&mutex);
0187 #endif
0188 savetmpticks = 0x7FFFFFFF;
0189 }
0190
0191 se->CurrentTicks(tmpticks);
0192
0193
0194 if (tmpticks < borticks)
0195 {
0196 #ifdef USE_MUTEX
0197 pthread_mutex_lock(&mutex);
0198 #endif
0199 FrameWorkVars->AddBinContent(EARLYEVENTNUMBIN);
0200 if (tmpticks < savetmpticks)
0201 {
0202 savetmpticks = tmpticks;
0203 FrameWorkVars->SetBinContent(EARLYEVENTTIMEBIN, tmpticks);
0204 }
0205 #ifdef USE_MUTEX
0206 pthread_mutex_unlock(&mutex);
0207 #endif
0208 }
0209 if (eorticks < se->CurrentTicks())
0210 {
0211 eorticks = se->CurrentTicks();
0212 }
0213 if (evt->getErrorCode())
0214 {
0215 std::ostringstream msg;
0216 msg << __PRETTY_FUNCTION__ << " Event with error code: "
0217 << evt->getErrorCode()
0218 << " discarding event " << evt->getEvtSequence();
0219 send_message(MSG_SEV_WARNING, msg.str());
0220 se->AddBadEvent();
0221 return 0;
0222 }
0223
0224 eventcnt++;
0225 #ifdef USE_MUTEX
0226 pthread_mutex_lock(&mutex);
0227 #endif
0228 FrameWorkVars->SetBinContent(CURRENTTIMEBIN, se->CurrentTicks());
0229 se->EventNumber(evt->getEvtSequence());
0230 se->IncrementEventCounter();
0231 FrameWorkVars->SetBinContent(EVENTCOUNTERBIN,se->EventCounter());
0232 se->process_event(evt);
0233 FrameWorkVars->SetBinContent(GL1COUNTERBIN,se->Gl1FoundCounter());
0234 #ifdef USE_MUTEX
0235 pthread_mutex_unlock(&mutex);
0236 #endif
0237 return 0;
0238 }
0239
0240 int setup_server()
0241 {
0242 return 0;
0243 }
0244
0245 static void *server(void * )
0246 {
0247 OnlMonServer *Onlmonserver = OnlMonServer::instance();
0248 int MoniPort = OnlMonDefs::MONIPORT;
0249
0250 #ifdef USE_MUTEX
0251 pthread_mutex_lock(&mutex);
0252 #endif
0253 TServerSocket *ss = nullptr;
0254 sleep(5);
0255 do
0256 {
0257 delete ss;
0258 ss = new TServerSocket(MoniPort, kTRUE);
0259
0260 Onlmonserver->PortNumber(MoniPort);
0261 if ((MoniPort - OnlMonDefs::MONIPORT) >= OnlMonDefs::NUMMONIPORT)
0262 {
0263 std::ostringstream msg;
0264 msg << "Too many Online Monitors running on this machine, bailing out";
0265 send_message(MSG_SEV_FATAL, msg.str());
0266
0267 exit(1);
0268 }
0269 MoniPort++;
0270 if (!ss->IsValid())
0271 {
0272 printf("Ignore ROOT error about socket in use, I try another one\n");
0273 }
0274 } while (!ss->IsValid());
0275
0276
0277
0278
0279
0280 int isock = gROOT->GetListOfSockets()->IndexOf(ss);
0281 gROOT->GetListOfSockets()->RemoveAt(isock);
0282 sleep(10);
0283 #ifdef USE_MUTEX
0284 pthread_mutex_unlock(&mutex);
0285 #endif
0286 again:
0287 TSocket *s0 = ss->Accept();
0288 if (!s0)
0289 {
0290 std::cout << "Server socket " << OnlMonDefs::MONIPORT
0291 << " in use, either go to a different node or" << std::endl
0292 << "change MONIPORT in server/OnlMonDefs.h and recompile" << std::endl
0293 << "server and client" << std::endl;
0294 exit(1);
0295 }
0296
0297
0298
0299 if (Onlmonserver->Verbosity() > 2)
0300 {
0301 TInetAddress adr = s0->GetInetAddress();
0302 std::cout << "got connection from " << std::endl;
0303 adr.Print();
0304 }
0305
0306 #ifdef USE_MUTEX
0307 pthread_mutex_lock(&mutex);
0308 #endif
0309
0310 handleconnection(s0);
0311
0312 #ifdef USE_MUTEX
0313 pthread_mutex_unlock(&mutex);
0314 #endif
0315
0316 delete s0;
0317
0318
0319
0320
0321
0322
0323
0324
0325
0326
0327 goto again;
0328 }
0329
0330 void handletest(void * )
0331 {
0332
0333 return;
0334 }
0335
0336 void handleconnection(void *arg)
0337 {
0338 TSocket *s0 = (TSocket *) arg;
0339
0340 OnlMonServer *Onlmonserver = OnlMonServer::instance();
0341
0342
0343
0344
0345
0346
0347
0348 TMessage *mess = nullptr;
0349 TMessage outgoing(kMESS_OBJECT);
0350 while (true)
0351 {
0352 if (Onlmonserver->Verbosity() > 2)
0353 {
0354 std::cout << "Waiting for message" << std::endl;
0355 }
0356 s0->Recv(mess);
0357 if (!mess)
0358 {
0359 std::cout << "Broken Connection, closing socket" << std::endl;
0360 break;
0361 }
0362 if (mess->What() == kMESS_STRING)
0363 {
0364 char strchr[OnlMonDefs::MSGLEN];
0365 mess->ReadString(strchr, OnlMonDefs::MSGLEN);
0366 delete mess;
0367 mess = nullptr;
0368 std::string str = strchr;
0369 if (Onlmonserver->Verbosity() > 2)
0370 {
0371 std::cout << "received message: " << str << std::endl;
0372 }
0373 if (str == "Finished")
0374 {
0375 break;
0376 }
0377 else if (str == "WriteRootFile")
0378 {
0379 Onlmonserver->WriteHistoFile();
0380 s0->Send("Finished");
0381 break;
0382 }
0383 else if (str == "Ack")
0384 {
0385 continue;
0386 }
0387 else if (str == "HistoList")
0388 {
0389 if (Onlmonserver->Verbosity() > 2)
0390 {
0391 std::cout << "number of histos: " << Onlmonserver->nHistos() << std::endl;
0392 }
0393 for (auto monitors = Onlmonserver->monibegin(); monitors != Onlmonserver->moniend(); ++monitors)
0394 {
0395 for (auto &histos : monitors->second)
0396 {
0397 std::string subsyshisto = monitors->first + ' ' + histos.first;
0398 if (Onlmonserver->Verbosity() > 2)
0399 {
0400 std::cout << "subsystem: " << monitors->first << ", histo: " << histos.first << std::endl;
0401 std::cout << " sending: \"" << subsyshisto << "\"" << std::endl;
0402 }
0403 s0->Send(subsyshisto.c_str());
0404 int nbytes = s0->Recv(mess);
0405 delete mess;
0406 mess = nullptr;
0407 if (nbytes <= 0)
0408 {
0409 std::ostringstream msg;
0410
0411 msg << "Problem receiving message: return code: " << nbytes;
0412 send_message(MSG_SEV_ERROR, msg.str());
0413 }
0414 }
0415 }
0416 s0->Send("Finished");
0417 }
0418 else if (str == "ALL")
0419 {
0420 if (Onlmonserver->Verbosity() > 2)
0421 {
0422 std::cout << "number of histos: " << Onlmonserver->nHistos() << std::endl;
0423 }
0424 for (unsigned int i = 0; i < Onlmonserver->nHistos(); i++)
0425 {
0426 TH1 *histo = Onlmonserver->getHisto(i);
0427 if (histo)
0428 {
0429 outgoing.Reset();
0430 outgoing.WriteObject(histo);
0431 s0->Send(outgoing);
0432 outgoing.Reset();
0433 s0->Recv(mess);
0434 delete mess;
0435 mess = nullptr;
0436 }
0437 }
0438 s0->Send("Finished");
0439 }
0440 else if (str.find("ISRUNNING") != std::string::npos)
0441 {
0442 std::string answer = "No";
0443 unsigned int pos_space = str.find(' ');
0444 std::string moniname = str.substr(pos_space + 1, str.size());
0445 for (auto moniter = Onlmonserver->monitor_vec_begin(); moniter != Onlmonserver->monitor_vec_end(); ++moniter)
0446 {
0447 if ((*moniter)->Name() == moniname)
0448 {
0449 answer = "Yes";
0450 break;
0451 }
0452 }
0453 if (Onlmonserver->Verbosity() > 2)
0454 {
0455 std::cout << "got " << str << ", replied " << answer << std::endl;
0456 }
0457 s0->Send(answer.c_str());
0458 }
0459 else if (str == "LISTMONITORS")
0460 {
0461 s0->Send("go");
0462 for (auto moniter = Onlmonserver->monitor_vec_begin(); moniter != Onlmonserver->monitor_vec_end(); ++moniter)
0463 {
0464 if (Onlmonserver->Verbosity() > 2)
0465 {
0466 std::cout << "sending " << (*moniter)->Name().c_str() << std::endl;
0467 }
0468 s0->Send((*moniter)->Name().c_str());
0469 }
0470 s0->Send("Finished");
0471 break;
0472 }
0473 else if (str == "LIST")
0474 {
0475 s0->Send("go");
0476 while (true)
0477 {
0478 char strmess[OnlMonDefs::MSGLEN];
0479 s0->Recv(mess);
0480 if (!mess)
0481 {
0482 break;
0483 }
0484 if (mess->What() == kMESS_STRING)
0485 {
0486 mess->ReadString(strmess, OnlMonDefs::MSGLEN);
0487 delete mess;
0488 mess = nullptr;
0489 if (std::string(strmess) == "alldone")
0490 {
0491 break;
0492 }
0493 }
0494 std::string str1(strmess);
0495 unsigned int pos_space = str1.find(' ');
0496 if (Onlmonserver->Verbosity() > 2)
0497 {
0498 std::cout << __PRETTY_FUNCTION__ << " getting subsystem " << str1.substr(0, pos_space) << ", histo " << str1.substr(pos_space + 1, str1.size()) << std::endl;
0499 }
0500 TH1 *histo = Onlmonserver->getHisto(str1.substr(0, pos_space), str1.substr(pos_space + 1, str1.size()));
0501 if (histo)
0502 {
0503 outgoing.Reset();
0504 outgoing.WriteObject(histo);
0505 s0->Send(outgoing);
0506 outgoing.Reset();
0507 }
0508 else
0509 {
0510 s0->Send("UnknownHisto");
0511 }
0512
0513 }
0514 s0->Send("Finished");
0515 }
0516 else
0517 {
0518 std::string strstr(str);
0519 unsigned int pos_space = str.find(' ');
0520 TH1 *histo = Onlmonserver->getHisto(strstr.substr(0, pos_space), strstr.substr(pos_space + 1, str.size()));
0521 if (histo)
0522 {
0523
0524 outgoing.Reset();
0525 outgoing.WriteObject(histo);
0526 s0->Send(outgoing);
0527 outgoing.Reset();
0528 s0->Recv(mess);
0529 delete mess;
0530 s0->Send("Finished");
0531 }
0532 else
0533 {
0534 s0->Send("UnknownHisto");
0535 }
0536 }
0537 }
0538 else if (mess->What() == kMESS_OBJECT)
0539 {
0540 printf("got object of class: %s\n", mess->GetClass()->GetName());
0541 delete mess;
0542 }
0543 else
0544 {
0545 printf("*** Unexpected message ***\n");
0546 delete mess;
0547 }
0548 }
0549
0550
0551 s0->Close();
0552 return;
0553 }
0554
0555 int send_message(const int severity, const std::string &msg)
0556 {
0557
0558
0559 msg_control *Message = new msg_control(MSG_TYPE_MONITORING,
0560 MSG_SOURCE_DAQMON,
0561 severity, "pmonitorInterface");
0562 std::cout << *Message << msg << std::endl;
0563 delete Message;
0564 return 0;
0565 }
0566
0567
0568 void signalhandler(int signum)
0569 {
0570 std::cout << "Signal " << signum << " received, saving histos" << std::endl;
0571 OnlMonServer *Onlmonserver = OnlMonServer::instance();
0572 Onlmonserver->WriteHistoFile();
0573 gSystem->Exit(0);
0574 }