Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-03 08:20:57

0001 /*
0002   This is the interface from the monitoring to pmonitor
0003 */
0004 
0005 #include "pmonitorInterface.h"
0006 #include "HistoBinDefs.h"
0007 #include "OnlMon.h"
0008 #include "OnlMonDefs.h"
0009 #include "OnlMonServer.h"
0010 
0011 #pragma GCC diagnostic push
0012 #pragma GCC diagnostic ignored "-Wunused-parameter"
0013 #include <Event/Event.h>
0014 #include <pmonitor.h>
0015 #pragma GCC diagnostic pop
0016 
0017 #include <Event/EventTypes.h>
0018 #include <Event/msg_control.h>
0019 #include <Event/msg_profile.h>
0020 #include <Event/packet.h>
0021 
0022 #include <MessageTypes.h>  // for kMESS_OBJECT, kMESS_STRING
0023 #include <TClass.h>
0024 #include <TH1.h>
0025 #include <TInetAddress.h>  // for TInetAddress
0026 #include <TMessage.h>
0027 #include <TROOT.h>
0028 #include <TServerSocket.h>
0029 #include <TSocket.h>
0030 #include <TSystem.h>
0031 #include <TThread.h>
0032 
0033 #include <pthread.h>
0034 #include <sys/types.h>  // for time_t
0035 #include <unistd.h>     // for sleep
0036 #include <csignal>
0037 #include <cstdio>       // for printf, NULL
0038 #include <cstdlib>      // for exit
0039 #include <cstring>      // for strcmp
0040 #include <iostream>     // for operator<<, basic_ostream, endl, basic_o...
0041 #include <limits>
0042 #include <sstream>
0043 #include <string>
0044 
0045 //#define ROOTTHREAD
0046 
0047 #ifndef ROOTTHREAD
0048 #define SERVER
0049 #endif
0050 
0051 #ifdef SERVER
0052 static void *server(void *);
0053 int ServerThread = 0;
0054 #endif
0055 
0056 #ifdef ROOTTHREAD
0057 static void *server(void *);
0058 static TThread *ServerThread = nullptr;
0059 #endif
0060 
0061 #ifdef USE_MUTEX
0062 pthread_mutex_t mutex;
0063 #endif
0064 
0065 TH1 *FrameWorkVars = nullptr;
0066 void signalhandler(int signum);
0067 //*********************************************************************
0068 
0069 int pinit()
0070 {
0071   OnlMonServer *Onlmonserver = OnlMonServer::instance();
0072 #ifdef USE_MUTEX
0073   Onlmonserver->GetMutex(mutex);
0074 #endif
0075   for (int i = 0; i < kMAXSIGNALS; i++)
0076   {
0077     gSystem->IgnoreSignal((ESignals) i);
0078   }
0079 #ifdef USE_MUTEX
0080   pthread_mutex_lock(&mutex);
0081 #endif
0082 signal(SIGINT, signalhandler);
0083 #if defined(SERVER) || defined(ROOTTHREAD)
0084 
0085   pthread_t ThreadId = 0;
0086   if (!ServerThread)
0087   {
0088     // std::cout << "creating server thread" << std::endl;
0089 #ifdef SERVER
0090 
0091     ServerThread = pthread_create(&ThreadId, nullptr, server, (void *) nullptr);
0092     Onlmonserver->SetThreadId(ThreadId);
0093 #endif
0094 #ifdef ROOTTHREAD
0095 
0096     ServerThread = new TThread(server, (void *) 0);
0097     ServerThread->Run();
0098 #endif
0099   }
0100 #endif
0101   // for the timestamp we need doubles
0102   FrameWorkVars = new TH1I("FrameWorkVars", "FrameWorkVars", NFRAMEWORKBINS, 0., NFRAMEWORKBINS);
0103   Onlmonserver->registerCommonHisto(FrameWorkVars);
0104 #ifdef USE_MUTEX
0105   pthread_mutex_unlock(&mutex);
0106 #endif
0107 
0108   return 0;
0109 }
0110 
0111 //*********************************************************************
0112 int process_event(Event *evt)
0113 {
0114   static time_t savetmpticks = 0x7FFFFFFF;
0115   static time_t borticks = 0;
0116   static time_t eorticks = 0;
0117   static int eventcnt = 0;
0118 
0119   OnlMonServer *se = OnlMonServer::instance();
0120   time_t tmpticks = evt->getTime();
0121 
0122   // first test if a new run has started and call BOR/EOR methods of monitors
0123   if (se->RunNumber() == -1)
0124   {
0125     savetmpticks = 0x7FFFFFFF;
0126     eventcnt = 0;
0127     int newrun = evt->getRunNumber();
0128 #ifdef USE_MUTEX
0129     pthread_mutex_lock(&mutex);
0130 #endif
0131     FrameWorkVars->SetBinContent(RUNNUMBERBIN, newrun);
0132     se->BadEvents(0);
0133     se->EventNumber(evt->getEvtSequence());
0134     se->RunNumber(newrun);
0135     // set ticks to the current event, so the begin run has the
0136     // up to date time stamp, while the end run has the time stamp from
0137     // the last event of the previous run
0138     se->CurrentTicks(tmpticks);
0139     se->BeginRun(newrun);
0140     // set trigger mask in et pool frontend
0141     borticks = se->BorTicks();
0142     FrameWorkVars->SetBinContent(BORTIMEBIN, borticks);
0143 #ifdef USE_MUTEX
0144     pthread_mutex_unlock(&mutex);
0145 #endif
0146     eorticks = borticks;
0147   }
0148   /*
0149   if (evt->getEvtLength() <= 0 || evt->getEvtLength() > 2500000)
0150   {
0151     std::ostringstream msg;
0152     msg << __PRETTY_FUNCTION__ << "Discarding event with length "
0153         << evt->getEvtLength();
0154     send_message(MSG_SEV_WARNING, msg.str());
0155     se->AddBadEvent();
0156     return 0;
0157   }
0158   */
0159   int oldrun;
0160   if ((oldrun = se->RunNumber()) != evt->getRunNumber())
0161   {
0162     // ROOT crashes when one thread updates histos while they are
0163     // being saved, need mutex protection here
0164 #ifdef USE_MUTEX
0165     pthread_mutex_lock(&mutex);
0166 #endif
0167     FrameWorkVars->SetBinContent(EORTIMEBIN, eorticks);  // set EOR time
0168     se->EndRun(oldrun);
0169     se->WriteHistoFile();
0170     se->Reset();  // reset all monitors
0171     int newrun = evt->getRunNumber();
0172     FrameWorkVars->SetBinContent(RUNNUMBERBIN, newrun);
0173     eorticks = tmpticks;  // initialize eorticks
0174     se->BadEvents(0);
0175     se->RunNumber(newrun);
0176     se->EventNumber(evt->getEvtSequence());
0177     // set ticks to the current event, so the begin run has the
0178     // up to date time stamp, while the end run has the time stamp from
0179     // the last event of the previous run
0180     se->CurrentTicks(tmpticks);
0181     se->BeginRun(newrun);
0182     borticks = se->BorTicks();
0183     FrameWorkVars->SetBinContent(BORTIMEBIN,  borticks);
0184     eventcnt = 0;
0185 #ifdef USE_MUTEX
0186     pthread_mutex_unlock(&mutex);
0187 #endif
0188     savetmpticks = 0x7FFFFFFF;
0189   }
0190 
0191   se->CurrentTicks(tmpticks);
0192   // check if we get an event which was earlier than the BOR timestamp
0193   // save earliest time stamp and number of events with earlier timestamps
0194   if (tmpticks < borticks)
0195   {
0196 #ifdef USE_MUTEX
0197     pthread_mutex_lock(&mutex);
0198 #endif
0199     FrameWorkVars->AddBinContent(EARLYEVENTNUMBIN);
0200     if (tmpticks < savetmpticks)
0201     {
0202       savetmpticks = tmpticks;
0203       FrameWorkVars->SetBinContent(EARLYEVENTTIMEBIN, tmpticks);
0204     }
0205 #ifdef USE_MUTEX
0206     pthread_mutex_unlock(&mutex);
0207 #endif
0208   }
0209   if (eorticks < se->CurrentTicks())
0210   {
0211     eorticks = se->CurrentTicks();
0212   }
0213   if (evt->getErrorCode())
0214   {
0215     std::ostringstream msg;
0216     msg << __PRETTY_FUNCTION__ << " Event with error code: "
0217         << evt->getErrorCode()
0218         << " discarding event " << evt->getEvtSequence();
0219     send_message(MSG_SEV_WARNING, msg.str());
0220     se->AddBadEvent();
0221     return 0;
0222   }
0223 
0224   eventcnt++;
0225 #ifdef USE_MUTEX
0226   pthread_mutex_lock(&mutex);
0227 #endif
0228   FrameWorkVars->SetBinContent(CURRENTTIMEBIN, se->CurrentTicks());
0229   se->EventNumber(evt->getEvtSequence());
0230   se->IncrementEventCounter();
0231   FrameWorkVars->SetBinContent(EVENTCOUNTERBIN,se->EventCounter());
0232   se->process_event(evt);
0233   FrameWorkVars->SetBinContent(GL1COUNTERBIN,se->Gl1FoundCounter());
0234 #ifdef USE_MUTEX
0235   pthread_mutex_unlock(&mutex);
0236 #endif
0237   return 0;
0238 }
0239 
0240 int setup_server()
0241 {
0242   return 0;
0243 }
0244 
0245 static void *server(void * /* arg */)
0246 {
0247   OnlMonServer *Onlmonserver = OnlMonServer::instance();
0248   int MoniPort = OnlMonDefs::MONIPORT;
0249   //  int thread_arg[5];
0250 #ifdef USE_MUTEX
0251   pthread_mutex_lock(&mutex);
0252 #endif
0253   TServerSocket *ss = nullptr;
0254   sleep(5);
0255   do
0256   {
0257     delete ss; // ok to delete nullptr
0258     ss = new TServerSocket(MoniPort, kTRUE);
0259     // Accept a connection and return a full-duplex communication socket.
0260     Onlmonserver->PortNumber(MoniPort);
0261     if ((MoniPort - OnlMonDefs::MONIPORT) >= OnlMonDefs::NUMMONIPORT)
0262     {
0263       std::ostringstream msg;
0264       msg << "Too many Online Monitors running on this machine, bailing out";
0265       send_message(MSG_SEV_FATAL, msg.str());
0266 
0267       exit(1);
0268     }
0269     MoniPort++;
0270     if (!ss->IsValid())
0271     {
0272       printf("Ignore ROOT error about socket in use, I try another one\n");
0273     }
0274   } while (!ss->IsValid());  // from do{}while
0275 
0276   // root keeps a list of sockets and tries to close them when quitting.
0277   // this interferes with my own threading and makes valgrind crash
0278   // The solution is to remove the TServerSocket *ss from roots list of
0279   // sockets. Then it will leave this socket alone.
0280   int isock = gROOT->GetListOfSockets()->IndexOf(ss);
0281   gROOT->GetListOfSockets()->RemoveAt(isock);
0282   sleep(10);
0283 #ifdef USE_MUTEX
0284   pthread_mutex_unlock(&mutex);
0285 #endif
0286 again:
0287   TSocket *s0 = ss->Accept();
0288   if (!s0)
0289   {
0290     std::cout << "Server socket " << OnlMonDefs::MONIPORT
0291               << " in use, either go to a different node or" << std::endl
0292               << "change MONIPORT in server/OnlMonDefs.h and recompile" << std::endl
0293               << "server and client" << std::endl;
0294     exit(1);
0295   }
0296   // mutex protected since writing of histo
0297   // to outgoing buffer and updating by other thread do not
0298   // go well together
0299   if (Onlmonserver->Verbosity() > 2)
0300   {
0301     TInetAddress adr = s0->GetInetAddress();
0302     std::cout << "got connection from " << std::endl;
0303     adr.Print();
0304   }
0305   //  std::cout << "try locking mutex" << std::endl;
0306 #ifdef USE_MUTEX
0307   pthread_mutex_lock(&mutex);
0308 #endif
0309   // std::cout << "got mutex" << std::endl;
0310   handleconnection(s0);
0311   // std::cout << "try releasing mutex" << std::endl;
0312 #ifdef USE_MUTEX
0313   pthread_mutex_unlock(&mutex);
0314 #endif
0315   // std::cout << "mutex released" << std::endl;
0316   delete s0;
0317   /*
0318     if (!aargh)
0319     {
0320     std::cout << "making thread" << std::endl;
0321     aargh = new TThread(handletest,(void *)0);
0322     aargh->Run();
0323     }
0324   */
0325   // std::cout << "closing socket" << std::endl;
0326   // s0->Close();
0327   goto again;
0328 }
0329 
0330 void handletest(void * /* arg */)
0331 {
0332   //  std::cout << "threading" << std::endl;
0333   return;
0334 }
0335 
0336 void handleconnection(void *arg)
0337 {
0338   TSocket *s0 = (TSocket *) arg;
0339 
0340   OnlMonServer *Onlmonserver = OnlMonServer::instance();
0341   /*
0342     int val;
0343     s0->GetOption(kSendBuffer, val);
0344     printf("sendbuffer size: %d\n", val);
0345     s0->GetOption(kRecvBuffer, val);
0346     printf("recvbuffer size: %d\n", val);
0347   */
0348   TMessage *mess = nullptr;
0349   TMessage outgoing(kMESS_OBJECT);
0350   while (true)
0351   {
0352     if (Onlmonserver->Verbosity() > 2)
0353     {
0354       std::cout << "Waiting for message" << std::endl;
0355     }
0356     s0->Recv(mess);
0357     if (!mess)
0358     {
0359       std::cout << "Broken Connection, closing socket" << std::endl;
0360       break;
0361     }
0362     if (mess->What() == kMESS_STRING)
0363     {
0364       char strchr[OnlMonDefs::MSGLEN];
0365       mess->ReadString(strchr, OnlMonDefs::MSGLEN);
0366       delete mess;
0367       mess = nullptr;
0368       std::string str = strchr;
0369       if (Onlmonserver->Verbosity() > 2)
0370       {
0371         std::cout << "received message: " << str << std::endl;
0372       }
0373       if (str == "Finished")
0374       {
0375         break;
0376       }
0377       else if (str == "WriteRootFile")
0378       {
0379         Onlmonserver->WriteHistoFile();
0380         s0->Send("Finished");
0381         break;
0382       }
0383       else if (str == "Ack")
0384       {
0385         continue;
0386       }
0387       else if (str == "HistoList")
0388       {
0389         if (Onlmonserver->Verbosity() > 2)
0390         {
0391           std::cout << "number of histos: " << Onlmonserver->nHistos() << std::endl;
0392         }
0393         for (auto monitors = Onlmonserver->monibegin(); monitors != Onlmonserver->moniend(); ++monitors)
0394         {
0395           for (auto &histos : monitors->second)
0396           {
0397             std::string subsyshisto = monitors->first + ' ' + histos.first;
0398             if (Onlmonserver->Verbosity() > 2)
0399             {
0400               std::cout << "subsystem: " << monitors->first << ", histo: " << histos.first << std::endl;
0401               std::cout << " sending: \"" << subsyshisto << "\"" << std::endl;
0402             }
0403             s0->Send(subsyshisto.c_str());
0404             int nbytes = s0->Recv(mess);
0405             delete mess;
0406             mess = nullptr;
0407             if (nbytes <= 0)
0408             {
0409               std::ostringstream msg;
0410 
0411               msg << "Problem receiving message: return code: " << nbytes;
0412               send_message(MSG_SEV_ERROR, msg.str());
0413             }
0414           }
0415         }
0416         s0->Send("Finished");
0417       }
0418       else if (str == "ALL")
0419       {
0420         if (Onlmonserver->Verbosity() > 2)
0421         {
0422           std::cout << "number of histos: " << Onlmonserver->nHistos() << std::endl;
0423         }
0424         for (unsigned int i = 0; i < Onlmonserver->nHistos(); i++)
0425         {
0426           TH1 *histo = Onlmonserver->getHisto(i);
0427           if (histo)
0428           {
0429             outgoing.Reset();
0430             outgoing.WriteObject(histo);
0431             s0->Send(outgoing);
0432             outgoing.Reset();
0433             s0->Recv(mess);
0434             delete mess;
0435             mess = nullptr;
0436           }
0437         }
0438         s0->Send("Finished");
0439       }
0440       else if (str.find("ISRUNNING") != std::string::npos)
0441       {
0442         std::string answer = "No";
0443         unsigned int pos_space = str.find(' ');
0444         std::string moniname = str.substr(pos_space + 1, str.size());
0445         for (auto moniter = Onlmonserver->monitor_vec_begin(); moniter != Onlmonserver->monitor_vec_end(); ++moniter)
0446         {
0447           if ((*moniter)->Name() == moniname)
0448           {
0449             answer = "Yes";
0450             break;
0451           }
0452         }
0453         if (Onlmonserver->Verbosity() > 2)
0454         {
0455           std::cout << "got " << str << ", replied " << answer << std::endl;
0456         }
0457         s0->Send(answer.c_str());
0458       }
0459       else if (str == "LISTMONITORS")
0460       {
0461         s0->Send("go");
0462         for (auto moniter = Onlmonserver->monitor_vec_begin(); moniter != Onlmonserver->monitor_vec_end(); ++moniter)
0463         {
0464           if (Onlmonserver->Verbosity() > 2)
0465           {
0466             std::cout << "sending " << (*moniter)->Name().c_str() << std::endl;
0467           }
0468           s0->Send((*moniter)->Name().c_str());
0469         }
0470         s0->Send("Finished");
0471         break;
0472       }
0473       else if (str == "LIST")
0474       {
0475         s0->Send("go");
0476         while (true)
0477         {
0478           char strmess[OnlMonDefs::MSGLEN];
0479           s0->Recv(mess);
0480           if (!mess)
0481           {
0482             break;
0483           }
0484           if (mess->What() == kMESS_STRING)
0485           {
0486             mess->ReadString(strmess, OnlMonDefs::MSGLEN);
0487             delete mess;
0488             mess = nullptr;
0489             if (std::string(strmess) == "alldone")
0490             {
0491               break;
0492             }
0493           }
0494           std::string str1(strmess);
0495           unsigned int pos_space = str1.find(' ');
0496           if (Onlmonserver->Verbosity() > 2)
0497           {
0498             std::cout << __PRETTY_FUNCTION__ << " getting subsystem " << str1.substr(0, pos_space) << ", histo " << str1.substr(pos_space + 1, str1.size()) << std::endl;
0499           }
0500           TH1 *histo = Onlmonserver->getHisto(str1.substr(0, pos_space), str1.substr(pos_space + 1, str1.size()));
0501           if (histo)
0502           {
0503             outgoing.Reset();
0504             outgoing.WriteObject(histo);
0505             s0->Send(outgoing);
0506             outgoing.Reset();
0507           }
0508           else
0509           {
0510             s0->Send("UnknownHisto");
0511           }
0512           //          delete mess;
0513         }
0514         s0->Send("Finished");
0515       }
0516       else
0517       {
0518         std::string strstr(str);
0519         unsigned int pos_space = str.find(' ');
0520         TH1 *histo = Onlmonserver->getHisto(strstr.substr(0, pos_space), strstr.substr(pos_space + 1, str.size()));
0521         if (histo)
0522         {
0523           //          const char *hisname = histo->GetName();
0524           outgoing.Reset();
0525           outgoing.WriteObject(histo);
0526           s0->Send(outgoing);
0527           outgoing.Reset();
0528           s0->Recv(mess);
0529           delete mess;
0530           s0->Send("Finished");
0531         }
0532         else
0533         {
0534           s0->Send("UnknownHisto");
0535         }
0536       }
0537     }
0538     else if (mess->What() == kMESS_OBJECT)
0539     {
0540       printf("got object of class: %s\n", mess->GetClass()->GetName());
0541       delete mess;
0542     }
0543     else
0544     {
0545       printf("*** Unexpected message ***\n");
0546       delete mess;
0547     }
0548   }
0549 
0550   // Close the socket.
0551   s0->Close();
0552   return;
0553 }
0554 
0555 int send_message(const int severity, const std::string &msg)
0556 {
0557   // check $ONLINE_MAIN/include/msg_profile.h for MSG defs
0558   // if you do not find your subsystem, do not initialize it and drop me a line
0559   msg_control *Message = new msg_control(MSG_TYPE_MONITORING,
0560                                          MSG_SOURCE_DAQMON,
0561                                          severity, "pmonitorInterface");
0562   std::cout << *Message << msg << std::endl;
0563   delete Message;
0564   return 0;
0565 }
0566 
0567 // we send a kill -2 to the server if it should terminate
0568 void signalhandler(int signum)
0569 {
0570   std::cout << "Signal " << signum << " received, saving histos" << std::endl;
0571   OnlMonServer *Onlmonserver = OnlMonServer::instance();
0572   Onlmonserver->WriteHistoFile();
0573   gSystem->Exit(0);
0574 }