Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-03 08:20:34

0001 
0002 
// Diagnostic stream helpers: each expands to the stream followed by the
// current source file name and line number, ready for further `<<` chaining.
#define coutfl std::cout << __FILE__ << "  " << __LINE__ << " "
#define cerrfl std::cerr << __FILE__ << "  " << __LINE__ << " "
0005 
0006 
0007 #include <iostream>
0008 #include <stdlib.h> 
0009 #include <unistd.h> 
0010 #include <string.h> 
0011 #include <sys/types.h> 
0012 #include <sys/socket.h> 
0013 #include <arpa/inet.h> 
0014 #include <netinet/in.h> 
0015 #include <pthread.h> 
0016 
0017 #ifdef HAVE_GETOPT_H
0018 #include "getopt.h"
0019 #endif
0020 
0021 #include <map>
0022 #include <set>
0023 
0024 
0025 
0026 
0027 #include "testEventiterator.h"
0028 #include "fileEventiterator.h"
0029 #include "rcdaqEventiterator.h"
0030 #include "oncsEventiterator.h"
0031 
0032 using namespace std;
0033 
0034 #define PORT     8080 
0035 #define MAXSIZE 5120
0036 
0037 
0038 Eventiterator *it;
0039 
0040 
0041 #define RCDAQEVENTITERATOR 1
0042 #define FILEEVENTITERATOR 2
0043 #define TESTEVENTITERATOR 3
0044 #define ONCSEVENTITERATOR 4
0045 
/**
 * Print the short usage hint and terminate the process.
 *
 * Called when the command line is unusable (bad option argument or missing
 * datastream).  The exit status is 0, as it has always been, so that scripts
 * which inspect the status keep working.
 */
void exitmsg()
{
  // The program name is spelled consistently on both lines
  // (the first line used to read "eventSserver").
  std::cout << "** usage: eventServer -ivfTrOh datastream" << std::endl;
  std::cout << "    type  eventServer -h   for more help" << std::endl;
  exit(0);
}
0052 
// Request statistics used for the "percent delivered" diagnostics.
// They are kept as double rather than float: a float cannot represent
// integers above 2^24 exactly, so `requested += 1` would silently stop
// counting after about 16.7 million requests.
double requested = 0;   // number of requests counted by the server loop
double sent = 0;        // number of requests answered with an event

// Sequence number of the most recent event stored by the EventLoop thread.
int last_eventnr = 0;
0057 
/**
 * Print the full option summary and terminate the process with status 0.
 *
 * Invoked for the -h option.  The list covers every option main() parses,
 * including -p, which was previously missing from the help text.
 */
void exithelp()
{

  std::cout << std::endl;
  std::cout << "   gl1_server serves events from a given datastream" << std::endl;
  std::cout << std::endl;
  std::cout << "  List of options: " << std::endl;
  std::cout << " -f (stream is a file)" << std::endl;
  std::cout << " -T (stream is a test stream)" << std::endl;
  std::cout << " -r (stream is a rcdaq monitoring stream)" << std::endl;
  std::cout << " -O (stream is a legacy ONCS format file)" << std::endl;
  std::cout << " -p <number> UDP port to listen on for requests" << std::endl;
  std::cout << " -h this message" << std::endl;
  std::cout << std::endl;
  std::cout << " debug options" << std::endl;
  std::cout << " -s <number> sleep so many ticks (in units of usleep)" << std::endl;
  std::cout << " -i <print event identity>" << std::endl;
  std::cout << " -v verbose" << std::endl;
  std::cout << " -c count  print -v -or -i messages only every <count> occurences " << std::endl;
  exit(0);
}
0078 
0079 int go_on = 1;
0080 int identify = 0;
0081 int verbose = 0;
0082 int repeatcount =1;
0083 int ittype = RCDAQEVENTITERATOR;
0084 int sleeptime = 0;
0085 int old_runnumber = -9999;
0086 int msgfrequency =1;
0087 
0088 pthread_mutex_t MapSem;
0089 pthread_mutex_t M_cout;
0090 
0091 #define RANGE (0x7ffff)
0092 
0093 Event* eventarray[RANGE+1] = {0};
0094 
0095 
0096 void * EventLoop( void *arg)
0097 {
0098 
0099   if ( identify) it->identify();
0100   int current_count = 0;
0101 
0102   while ( go_on)
0103     {
0104       Event *e = it->getNextEvent();
0105       if ( ! e)
0106     {
0107       go_on = 0;
0108       return 0;
0109       
0110     }
0111       e->convert();
0112       last_eventnr = e->getEvtSequence() ;
0113 
0114 
0115       if ( old_runnumber != e->getRunNumber())
0116     {
0117       old_runnumber = e->getRunNumber();
0118       requested = 0;
0119       sent = 0;
0120       for ( int i = 0; i< RANGE; i++)
0121         {
0122           delete eventarray[i];
0123           eventarray[i] = 0;
0124         }
0125     }
0126 
0127 
0128 
0129       
0130       pthread_mutex_lock( &MapSem);
0131       //      map<int, Event*>::iterator it = EventMap.begin();
0132 
0133       // if we find that our run number has changed, we clear out what we have
0134 
0135       if ( eventarray[(last_eventnr & RANGE)])
0136     {
0137       delete eventarray[(last_eventnr & RANGE)];
0138     }
0139       
0140       eventarray[(last_eventnr & RANGE)] =e;
0141     
0142 
0143       pthread_mutex_unlock( &MapSem);
0144 
0145       current_count++;
0146 
0147       if ( current_count >= msgfrequency && identify )
0148     {
0149       pthread_mutex_lock( &M_cout);
0150       e->identify();
0151       pthread_mutex_unlock( &M_cout);
0152       current_count = 0;
0153     }
0154 
0155       if (sleeptime) usleep(sleeptime);
0156     }
0157   return 0;
0158 }
0159 
/**
 * Tell a client that the requested event is not available.
 *
 * The reply is a single int with value 0, sent as one datagram.
 *
 * @param sockfd   socket to send on
 * @param cliaddr  destination address (as filled in by recvfrom)
 * @param len      length of the destination address
 * @return 0 if the complete reply was handed to the socket, -1 if sendto()
 *         failed or sent fewer bytes (previously the result of sendto() was
 *         ignored and 0 was returned unconditionally)
 */
int send_not_found (int sockfd, const struct sockaddr * cliaddr, socklen_t len)
{
  const int reply = 0;

  const ssize_t rc = sendto(sockfd, (const char *) &reply, sizeof(reply),
			    MSG_CONFIRM, cliaddr,
			    len);

  return ( rc == (ssize_t) sizeof(reply) ) ? 0 : -1;
}
0170 
0171 
0172 
0173 int 
0174 main(int argc, char *argv[])
0175 { 
0176   int sockfd; 
0177   int  buffer[MAXSIZE];
0178 
0179   int c;
0180 
0181   int ThePort = PORT;
0182 
0183   int status = -1;
0184 
0185   pthread_mutex_init( &MapSem, 0);
0186   pthread_mutex_init( &M_cout, 0);
0187 
0188   while ((c = getopt(argc, argv, "d:s:c:p:ifTrOvh")) != EOF)
0189     switch (c) 
0190       {
0191       case 'd':
0192     coutfl  << " -d flag is obsolete  "  << endl;
0193     break;
0194 
0195       case 's':
0196     if ( !sscanf(optarg, "%d", &sleeptime) ) exitmsg();
0197     break;
0198 
0199       case 'c':
0200     if ( !sscanf(optarg, "%d", &msgfrequency) ) exitmsg();
0201     break;
0202 
0203       case 'p':
0204     if ( !sscanf(optarg, "%d", &ThePort) ) exitmsg();
0205     break;
0206 
0207       case 'i':
0208     identify = 1;
0209     break;
0210 
0211       case 'T':
0212     ittype = TESTEVENTITERATOR;
0213     break;
0214 
0215       case 'f':
0216     ittype = FILEEVENTITERATOR;
0217     break;
0218 
0219       case 'r':
0220     ittype = RCDAQEVENTITERATOR;
0221     break;
0222 
0223       case 'O':
0224     ittype = ONCSEVENTITERATOR;
0225     break;
0226 
0227       case 'v':   // verbose
0228     verbose++;
0229     break;
0230 
0231       case 'h':
0232     exithelp();
0233     break;
0234       }
0235 
0236   switch (ittype)
0237     {
0238     case RCDAQEVENTITERATOR:
0239       if ( optind+1>argc) 
0240     {
0241       std::string host = "localhost";
0242     
0243       if ( getenv("RCDAQHOST")  )
0244         {
0245           host = getenv("RCDAQHOST");
0246         }
0247       
0248       it = new rcdaqEventiterator(host.c_str(), status);
0249 
0250     }
0251       else
0252     {
0253       it = new rcdaqEventiterator(argv[optind], status);
0254     }
0255       break;
0256 
0257     case  TESTEVENTITERATOR:
0258       it = new testEventiterator();
0259       status =0;
0260       break;
0261 
0262     case  FILEEVENTITERATOR:
0263       if ( optind+1>argc) exitmsg();
0264       it = new fileEventiterator(argv[optind], status);
0265       break;
0266      
0267     case  ONCSEVENTITERATOR:
0268       if ( optind+1>argc) exitmsg();
0269       it = new oncsEventiterator(argv[optind], status);
0270       break;
0271 
0272       status = 1;
0273       break;
0274       
0275     default:
0276       exitmsg();
0277       break;
0278     }
0279 
0280   if (status)
0281     {
0282       delete it;
0283       it = 0;
0284       cout << "Could not open input stream" << std::endl;
0285       exit(1);
0286     }
0287 
0288 
0289   pthread_t ThreadEvt;
0290 
0291   status = pthread_create(&ThreadEvt, NULL, 
0292               EventLoop, 
0293               (void *) 0);
0294    
0295   if (status ) 
0296     {
0297       cout << "error in event thread create " << status << endl;
0298       exit(0);
0299     }
0300 
0301   
0302   
0303   struct sockaddr_in servaddr, cliaddr; 
0304   
0305   // Creating socket file descriptor 
0306   if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
0307     { 
0308       perror("socket creation failed"); 
0309       exit(EXIT_FAILURE); 
0310   } 
0311   
0312   memset(&servaddr, 0, sizeof(servaddr)); 
0313   memset(&cliaddr, 0, sizeof(cliaddr)); 
0314     
0315   // Filling server information 
0316   servaddr.sin_family = AF_INET; // IPv4 
0317   servaddr.sin_addr.s_addr = INADDR_ANY; 
0318   servaddr.sin_port = htons(ThePort); 
0319   
0320   // Bind the socket with the server address 
0321   if ( bind(sockfd, (const struct sockaddr *)&servaddr, 
0322         sizeof(servaddr)) < 0 ) 
0323     { 
0324       perror("bind failed"); 
0325       exit(EXIT_FAILURE); 
0326     } 
0327     
0328   socklen_t len;
0329   int n; 
0330   int current_count = 0;
0331 
0332   len = sizeof(cliaddr); //len is value/result 
0333 
0334   int recbuffer[10];
0335   
0336   while (1)
0337     {
0338       n = recvfrom(sockfd, (char *)recbuffer, 2*sizeof(int), 
0339            MSG_WAITALL, ( struct sockaddr *) &cliaddr, 
0340            &len);
0341       requested += 1;
0342       
0343 
0344       if ( recbuffer[0] > last_eventnr)
0345     {
0346       send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0347       if ( verbose && ++current_count >= msgfrequency)
0348         {
0349           
0350           pthread_mutex_lock( &M_cout);
0351           cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " not delivered, beyond scope " << recbuffer[0]
0352            << " last  " << last_eventnr << " diff: " << last_eventnr - recbuffer[0] << endl;
0353           pthread_mutex_unlock( &M_cout);
0354           current_count = 0;
0355         }
0356     }
0357       else
0358     {
0359       pthread_mutex_lock( &MapSem);
0360 
0361       Event *x = eventarray[recbuffer[0] & RANGE];
0362       
0363       if ( x == 0 || x->getEvtSequence() != recbuffer[0])
0364         {
0365           pthread_mutex_unlock( &MapSem);
0366           
0367           send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0368           
0369           
0370           if ( verbose && ++current_count >= msgfrequency)
0371         {
0372           pthread_mutex_lock( &M_cout);
0373           cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " not delivered " << recbuffer[0]
0374                << " last  " << last_eventnr << " diff: " << last_eventnr - recbuffer[0] ;
0375           if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0376           else cout << endl;
0377           pthread_mutex_unlock( &M_cout);
0378           current_count = 0;
0379         }
0380         }
0381       else
0382         {
0383           int nw;
0384           x->Copy(buffer,MAXSIZE,&nw,"");
0385           pthread_mutex_unlock( &MapSem);
0386           sendto(sockfd, (const char *) buffer, nw*sizeof(int), 
0387              MSG_CONFIRM, (const struct sockaddr *) &cliaddr, 
0388              len); 
0389           sent += 1;
0390           if ( verbose && ++current_count >= msgfrequency )
0391         {
0392           pthread_mutex_lock( &M_cout);
0393           cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " event sent " << recbuffer[0];
0394           if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0395           else cout << endl;
0396           pthread_mutex_unlock( &M_cout);
0397           current_count = 0;
0398         }
0399         }
0400     }
0401     }
0402   return 0; 
0403 }