Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2026-04-05 08:15:46

0001 
0002 
0003 #define coutfl cout << __FILE__<< "  " << __LINE__ << " "
0004 #define cerrfl cerr << __FILE__<< "  " << __LINE__ << " "
0005 
0006 
0007 #include <iostream>
0008 #include <stdlib.h> 
0009 #include <unistd.h> 
0010 #include <string.h> 
0011 #include <sys/types.h> 
0012 #include <sys/socket.h> 
0013 #include <arpa/inet.h> 
0014 #include <netinet/in.h> 
0015 #include <pthread.h> 
0016 
0017 #ifdef HAVE_GETOPT_H
0018 #include "getopt.h"
0019 #endif
0020 
0021 #include <map>
0022 #include <set>
0023 
0024 
0025 
0026 
0027 #include "testEventiterator.h"
0028 #include "fileEventiterator.h"
0029 #include "rcdaqEventiterator.h"
0030 #include "oncsEventiterator.h"
0031 
0032 using namespace std;
0033 
0034 #define PORT     8080 
0035 #define MAXSIZE 5120
0036 
0037 
0038 Eventiterator *it;
0039 
0040 
0041 #define RCDAQEVENTITERATOR 1
0042 #define FILEEVENTITERATOR 2
0043 #define TESTEVENTITERATOR 3
0044 #define ONCSEVENTITERATOR 4
0045 
0046 void exitmsg()
0047 {
0048   cout << "** usage: eventSserver -ivfTrOh datastream" << std::endl;
0049   cout << "    type  eventServer -h   for more help" << std::endl;
0050   exit(0);
0051 }
0052 
0053 float requested = 0; 
0054 float sent = 0; 
0055 
0056 int last_eventnr = 0;
0057 
0058 void exithelp()
0059 {
0060 
0061   cout << std::endl;
0062   cout << "   gl1_server serves events from a given datastream" << std::endl;
0063   cout << std::endl;
0064   cout << "  List of options: " << std::endl;
0065   cout << " -f (stream is a file)" << std::endl;
0066   cout << " -T (stream is a test stream)" << std::endl;
0067   cout << " -r (stream is a rcdaq monitoring stream)" << std::endl;
0068   cout << " -O (stream is a legacy ONCS format file)" << std::endl;
0069   cout << " -h this message" << std::endl;
0070   cout << endl;
0071   cout << " debug options" << endl;
0072   cout << " -s <number> sleep so many ticks (in units of usleep)" << std::endl;
0073   cout << " -i <print event identity>" << std::endl;
0074   cout << " -v verbose" << std::endl;
0075   cout << " -c count  print -v -or -i messages only every <count> occurences " << std::endl;
0076   exit(0);
0077 }
0078 
0079 int go_on = 1;
0080 int identify = 0;
0081 int verbose = 0;
0082 int repeatcount =1;
0083 int ittype = RCDAQEVENTITERATOR;
0084 int sleeptime = 0;
0085 int old_runnumber = -9999;
0086 int msgfrequency =1;
0087 
0088 pthread_mutex_t MapSem;
0089 pthread_mutex_t M_cout;
0090 
0091 //#define RANGE (0x7ffff)
0092 #define RANGE (0x1fffff)
0093 
0094 Event* eventarray[RANGE+1] = {0};
0095 
0096 
0097 void * EventLoop( void *arg)
0098 {
0099 
0100   if ( identify) it->identify();
0101   int current_count = 0;
0102 
0103   while ( go_on)
0104     {
0105       Event *e = it->getNextEvent();
0106       if ( ! e)
0107     {
0108       go_on = 0;
0109       return 0;
0110       
0111     }
0112       e->convert();
0113       last_eventnr = e->getEvtSequence() ;
0114 
0115 
0116       if ( old_runnumber != e->getRunNumber())
0117     {
0118       old_runnumber = e->getRunNumber();
0119       requested = 0;
0120       sent = 0;
0121       for ( int i = 0; i< RANGE; i++)
0122         {
0123           delete eventarray[i];
0124           eventarray[i] = 0;
0125         }
0126     }
0127 
0128 
0129 
0130       
0131       pthread_mutex_lock( &MapSem);
0132       //      map<int, Event*>::iterator it = EventMap.begin();
0133 
0134       // if we find that our run number has changed, we clear out what we have
0135 
0136       if ( eventarray[(last_eventnr & RANGE)])
0137     {
0138       delete eventarray[(last_eventnr & RANGE)];
0139     }
0140       
0141       eventarray[(last_eventnr & RANGE)] =e;
0142     
0143 
0144       pthread_mutex_unlock( &MapSem);
0145 
0146       current_count++;
0147 
0148       if ( current_count >= msgfrequency && identify )
0149     {
0150       pthread_mutex_lock( &M_cout);
0151       e->identify();
0152       pthread_mutex_unlock( &M_cout);
0153       current_count = 0;
0154     }
0155 
0156       if (sleeptime) usleep(sleeptime);
0157     }
0158   return 0;
0159 }
0160 
0161 int send_not_found (int sockfd, const struct sockaddr * cliaddr, socklen_t len)
0162 {
0163   int  buffer[2] = {0};
0164 
0165   sendto(sockfd, (const char *) buffer, sizeof(int), 
0166      MSG_CONFIRM, cliaddr, 
0167      len);
0168 
0169   return 0;
0170 }
0171 
0172 
0173 
0174 int 
0175 main(int argc, char *argv[])
0176 { 
0177   int sockfd; 
0178   int  buffer[MAXSIZE];
0179 
0180   int c;
0181 
0182   int ThePort = PORT;
0183 
0184   int status = -1;
0185 
0186   pthread_mutex_init( &MapSem, 0);
0187   pthread_mutex_init( &M_cout, 0);
0188 
0189   while ((c = getopt(argc, argv, "d:s:c:p:ifTrOvh")) != EOF)
0190     switch (c) 
0191       {
0192       case 'd':
0193     coutfl  << " -d flag is obsolete  "  << endl;
0194     break;
0195 
0196       case 's':
0197     if ( !sscanf(optarg, "%d", &sleeptime) ) exitmsg();
0198     break;
0199 
0200       case 'c':
0201     if ( !sscanf(optarg, "%d", &msgfrequency) ) exitmsg();
0202     break;
0203 
0204       case 'p':
0205     if ( !sscanf(optarg, "%d", &ThePort) ) exitmsg();
0206     break;
0207 
0208       case 'i':
0209     identify = 1;
0210     break;
0211 
0212       case 'T':
0213     ittype = TESTEVENTITERATOR;
0214     break;
0215 
0216       case 'f':
0217     ittype = FILEEVENTITERATOR;
0218     break;
0219 
0220       case 'r':
0221     ittype = RCDAQEVENTITERATOR;
0222     break;
0223 
0224       case 'O':
0225     ittype = ONCSEVENTITERATOR;
0226     break;
0227 
0228       case 'v':   // verbose
0229     verbose++;
0230     break;
0231 
0232       case 'h':
0233     exithelp();
0234     break;
0235       }
0236 
0237   switch (ittype)
0238     {
0239     case RCDAQEVENTITERATOR:
0240       if ( optind+1>argc) 
0241     {
0242       std::string host = "localhost";
0243     
0244       if ( getenv("RCDAQHOST")  )
0245         {
0246           host = getenv("RCDAQHOST");
0247         }
0248       
0249       it = new rcdaqEventiterator(host.c_str(), status);
0250 
0251     }
0252       else
0253     {
0254       it = new rcdaqEventiterator(argv[optind], status);
0255     }
0256       break;
0257 
0258     case  TESTEVENTITERATOR:
0259       it = new testEventiterator();
0260       status =0;
0261       break;
0262 
0263     case  FILEEVENTITERATOR:
0264       if ( optind+1>argc) exitmsg();
0265       it = new fileEventiterator(argv[optind], status);
0266       break;
0267      
0268     case  ONCSEVENTITERATOR:
0269       if ( optind+1>argc) exitmsg();
0270       it = new oncsEventiterator(argv[optind], status);
0271       break;
0272 
0273       status = 1;
0274       break;
0275       
0276     default:
0277       exitmsg();
0278       break;
0279     }
0280 
0281   if (status)
0282     {
0283       delete it;
0284       it = 0;
0285       cout << "Could not open input stream" << std::endl;
0286       exit(1);
0287     }
0288 
0289 
0290   pthread_t ThreadEvt;
0291 
0292   status = pthread_create(&ThreadEvt, NULL, 
0293               EventLoop, 
0294               (void *) 0);
0295    
0296   if (status ) 
0297     {
0298       cout << "error in event thread create " << status << endl;
0299       exit(0);
0300     }
0301 
0302   
0303   
0304   struct sockaddr_in servaddr, cliaddr; 
0305   
0306   // Creating socket file descriptor 
0307   if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
0308     { 
0309       perror("socket creation failed"); 
0310       exit(EXIT_FAILURE); 
0311   } 
0312   
0313   memset(&servaddr, 0, sizeof(servaddr)); 
0314   memset(&cliaddr, 0, sizeof(cliaddr)); 
0315     
0316   // Filling server information 
0317   servaddr.sin_family = AF_INET; // IPv4 
0318   servaddr.sin_addr.s_addr = INADDR_ANY; 
0319   servaddr.sin_port = htons(ThePort); 
0320   
0321   // Bind the socket with the server address 
0322   if ( bind(sockfd, (const struct sockaddr *)&servaddr, 
0323         sizeof(servaddr)) < 0 ) 
0324     { 
0325       perror("bind failed"); 
0326       exit(EXIT_FAILURE); 
0327     } 
0328     
0329   socklen_t len;
0330   int n; 
0331   int current_count = 0;
0332 
0333   len = sizeof(cliaddr); //len is value/result 
0334 
0335   int recbuffer[10];
0336   
0337   while (1)
0338     {
0339       n = recvfrom(sockfd, (char *)recbuffer, 2*sizeof(int), 
0340            MSG_WAITALL, ( struct sockaddr *) &cliaddr, 
0341            &len);
0342       requested += 1;
0343       
0344 
0345       if ( recbuffer[0] > last_eventnr)
0346     {
0347       send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0348       if ( verbose && ++current_count >= msgfrequency)
0349         {
0350           
0351           pthread_mutex_lock( &M_cout);
0352           cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " not delivered, beyond scope " << recbuffer[0]
0353            << " last  " << last_eventnr << " diff: " << last_eventnr - recbuffer[0] << endl;
0354           pthread_mutex_unlock( &M_cout);
0355           current_count = 0;
0356         }
0357     }
0358       else
0359     {
0360       pthread_mutex_lock( &MapSem);
0361 
0362       Event *x = eventarray[recbuffer[0] & RANGE];
0363       
0364       if ( x == 0 || x->getEvtSequence() != recbuffer[0])
0365         {
0366           pthread_mutex_unlock( &MapSem);
0367           
0368           send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0369           
0370           
0371           if ( verbose && ++current_count >= msgfrequency)
0372         {
0373           pthread_mutex_lock( &M_cout);
0374           cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " not delivered " << recbuffer[0]
0375                << " last  " << last_eventnr << " diff: " << last_eventnr - recbuffer[0] ;
0376           if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0377           else cout << endl;
0378           pthread_mutex_unlock( &M_cout);
0379           current_count = 0;
0380         }
0381         }
0382       else
0383         {
0384           int nw;
0385           x->Copy(buffer,MAXSIZE,&nw,"");
0386           pthread_mutex_unlock( &MapSem);
0387           sendto(sockfd, (const char *) buffer, nw*sizeof(int), 
0388              MSG_CONFIRM, (const struct sockaddr *) &cliaddr, 
0389              len); 
0390           sent += 1;
0391           if ( verbose && ++current_count >= msgfrequency )
0392         {
0393           pthread_mutex_lock( &M_cout);
0394           cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " event sent " << recbuffer[0];
0395           if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0396           else cout << endl;
0397           pthread_mutex_unlock( &M_cout);
0398           current_count = 0;
0399         }
0400         }
0401     }
0402     }
0403   return 0; 
0404 }