Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-03 08:20:34

0001 
0002 
// Debug-print helpers: start a stream expression on cout / cerr that is
// prefixed with the current source file name and line number.
// They expand to unqualified cout / cerr, so they can only be used where
// those names are visible (this file has a file-scope "using namespace std").
#define coutfl cout << __FILE__<< "  " << __LINE__ << " "
#define cerrfl cerr << __FILE__<< "  " << __LINE__ << " "
0005 
0006 
0007 #include <iostream>
0008 #include <stdlib.h> 
0009 #include <unistd.h> 
0010 #include <string.h> 
0011 #include <sys/types.h> 
0012 #include <sys/socket.h> 
0013 #include <arpa/inet.h> 
0014 #include <netinet/in.h> 
0015 #include <pthread.h> 
0016 
0017 #ifdef HAVE_GETOPT_H
0018 #include "getopt.h"
0019 #endif
0020 
0021 #include <map>
0022 
0023 
0024 #include "testEventiterator.h"
0025 #include "fileEventiterator.h"
0026 #include "rcdaqEventiterator.h"
0027 #include "oncsEventiterator.h"
0028 #include "listEventiterator.h"
0029 
0030 using namespace std;
0031 
// UDP port the server socket is bound to in main().
#define PORT     8080 
// Capacity, counted in ints, of the buffer an event is copied into before it is sent.
#define MAXSIZE 5120

// Buffered events keyed by their event sequence number.
// Filled by EventLoop() and read by main(); both take MapSem around map accesses.
map<int, Event*> EventMap;
// The input stream events are read from; created in main() according to ittype.
Eventiterator *it;


// Values for ittype: they select which Eventiterator implementation main() creates.
#define RCDAQEVENTITERATOR 1
#define FILEEVENTITERATOR 2
#define TESTEVENTITERATOR 3
#define ONCSEVENTITERATOR 4
#define LISTEVENTITERATOR 5
0044 
// Print a one-line usage summary plus a pointer to -h, then terminate the
// process with exit status 0. Called for malformed option arguments and when
// a required datastream argument is missing.
//
// The option list mirrors the getopt string used in main()
// ("d:e:s:c:ifTrOLxvh"); the previous text advertised a non-existent -n
// switch and omitted -d, -e, -s, -c, -L and -x.
void exitmsg()
{
  std::cout << "** usage: gl1_server [-d depth] [-e evt1,evt2] [-s ticks] [-c count] [-ifTrOLxvh] datastream" << std::endl;
  std::cout << "    type  gl1_server -h   for more help" << std::endl;
  exit(0);
}
0051 
// Running counters used for the "percent delivered" verbose output:
// requested = number of client requests received, sent = number of events
// actually delivered. EventLoop() resets both to 0 when the run number changes.
//
// These are double rather than float on purpose: a float cannot represent
// 2^24 + 1, so "counter += 1" silently stops counting after 16777216
// requests. A double counts exactly up to 2^53.
// NOTE(review): main() increments these without holding MapSem while
// EventLoop() resets them under the lock, so the statistics are only approximate
// around a run change.
double requested = 0; 
double sent = 0; 
0054 
// Print the complete option reference for gl1_server, one line at a time,
// and terminate the process with exit status 0.
void exithelp()
{
  // One entry per output line; an empty entry yields a blank separator line.
  static const char * const help_lines[] = {
    "",
    "   gl1_server serves events from a given datastream",
    "",
    "  List of options: ",
    " -d <number> depth of buffer",
    " -e <event1,event2> buffer from event1-event2 (eg for testing on 1st N events of a run)",
    " -f (stream is a file)",
    " -T (stream is a test stream)",
    " -r (stream is a rcdaq monitoring stream)",
    " -O (stream is a legacy ONCS format file)",
    " -L (stream is a file list)",
    " -x act as a one-event-at-a-time server (depth is irrelevant)",
    " -h this message",
    "",
    " debug options",
    " -s <number> sleep so many ticks (in units of usleep)",
    " -i <print event identity>",
    " -v verbose",
    " -c count  print -v -or -i messages only every <count> occurences "
  };
  const unsigned int line_count = sizeof(help_lines) / sizeof(help_lines[0]);

  for ( unsigned int i = 0; i < line_count; ++i)
    {
      std::cout << help_lines[i] << std::endl;
    }

  exit(0);
}
0080 
// Run-time settings, mostly set from the command line in main().

// -d: maximum number of events kept in EventMap (also derived from -e).
unsigned int depth = 1000;
// -e: inclusive event-sequence range to buffer; EventLoop() only filters when evt1 > 0.
int evt1 = -1;
int evt2 = -1;
// Loop condition of EventLoop(); cleared there when the input stream ends.
int go_on = 1;
// -i: print the identity of the stream and of events.
int identify = 0;
// -v: verbosity level (incremented once per -v).
int verbose = 0;
// NOTE(review): not referenced anywhere in the visible part of this file.
int repeatcount =1;
// Which Eventiterator implementation main() creates (one of the *EVENTITERATOR values).
int ittype = RCDAQEVENTITERATOR;
// -s: argument handed to usleep() after each buffered event; 0 disables the sleep.
int sleeptime = 0;
// Run number of the last event seen by EventLoop(); a change flushes EventMap.
int old_runnumber = -9999;
// -c: print -v / -i messages only every msgfrequency occurrences.
int msgfrequency =1;
// -x: serve events straight from the stream, one at a time, without the buffering thread.
int one_at_a_time = 0;

// Mutex taken around accesses to EventMap; initialized in main().
pthread_mutex_t MapSem;
0095 
0096 
0097 
0098 void * EventLoop( void *arg)
0099 {
0100 
0101   if ( identify) it->identify();
0102   int current_count = 0;
0103 
0104   while ( go_on)
0105     {
0106       Event *e = it->getNextEvent();
0107       if ( ! e)
0108     {
0109       go_on = 0;
0110       return 0;
0111       
0112     }
0113       if ( evt1>0 && (e->getEvtSequence()<evt1 || e->getEvtSequence()>evt2) )
0114         {
0115           continue;
0116         }
0117       e->convert();
0118 
0119       pthread_mutex_lock( &MapSem);
0120       map<int, Event*>::iterator it = EventMap.begin();
0121 
0122       // if we find that our run number has changed, we clear out what we have
0123       if ( old_runnumber != e->getRunNumber())
0124     {
0125       old_runnumber = e->getRunNumber();
0126       requested = 0;
0127       sent = 0;
0128       for ( ; it != EventMap.end(); ++it)
0129         {
0130           delete it->second;
0131         }
0132       EventMap.clear();
0133     }
0134 
0135       // if the next event inserted would exceed the envisioned depth, we remove the oldest 
0136       if (EventMap.size() >= depth)
0137     {
0138       map<int, Event*>::iterator it = EventMap.begin();
0139       if ( verbose ) coutfl << "erasing event " << it->first << " depth = " << EventMap.size() << endl;
0140       delete it->second;
0141       EventMap.erase(it);
0142     }
0143 
0144       // ok, so now we insert...
0145       EventMap[e->getEvtSequence()] = e;
0146 
0147       // and unlock the map
0148       pthread_mutex_unlock( &MapSem);
0149 
0150       current_count++;
0151 
0152       if ( current_count >= msgfrequency && identify )
0153     {
0154       e->identify();
0155       current_count = 0;
0156     }
0157 
0158       if (sleeptime) usleep(sleeptime);
0159     }
0160   return 0;
0161 }
0162 
// Tell a client that the requested event is not available by sending it a
// datagram consisting of a single int with value 0.
//
// sockfd   socket to send on
// cliaddr  address of the client to answer
// len      size of *cliaddr
//
// The result of sendto() is not examined; the function always returns 0.
int send_not_found (int sockfd, const struct sockaddr * cliaddr, socklen_t len)
{
  const int not_found_reply[2] = {0, 0};

  // only the first word goes out on the wire
  sendto(sockfd, not_found_reply, sizeof(int), MSG_CONFIRM, cliaddr, len);

  return 0;
}
0173 
0174 
0175 int 
0176 main(int argc, char *argv[])
0177 { 
0178   int sockfd; 
0179   int  buffer[MAXSIZE];
0180 
0181   int c;
0182 
0183 
0184   int status = -1;
0185 
0186   pthread_mutex_init( &MapSem, 0);
0187 
0188   while ((c = getopt(argc, argv, "d:e:s:c:ifTrOLxvh")) != EOF)
0189     switch (c) 
0190       {
0191       case 'd':
0192     if ( !sscanf(optarg, "%d", &depth) ) exitmsg();
0193     break;
0194 
0195       case 'e':
0196     if ( !sscanf(optarg, "%d,%d", &evt1, &evt2) ) exitmsg();
0197         depth = evt2 - evt1 + 1;
0198         cout << "Buffering only events from " << evt1 << " to " << evt2 << endl;
0199     break;
0200 
0201       case 's':
0202     if ( !sscanf(optarg, "%d", &sleeptime) ) exitmsg();
0203     break;
0204 
0205       case 'c':
0206     if ( !sscanf(optarg, "%d", &msgfrequency) ) exitmsg();
0207     break;
0208 
0209       case 'i':
0210     identify = 1;
0211     break;
0212 
0213       case 'x':
0214     one_at_a_time = 1;
0215     break;
0216 
0217       case 'T':
0218     ittype = TESTEVENTITERATOR;
0219     break;
0220 
0221       case 'f':
0222     ittype = FILEEVENTITERATOR;
0223     break;
0224 
0225       case 'r':
0226     ittype = RCDAQEVENTITERATOR;
0227     break;
0228 
0229       case 'O':
0230     ittype = ONCSEVENTITERATOR;
0231     break;
0232 
0233       case 'L':
0234     ittype = LISTEVENTITERATOR;
0235     break;
0236 
0237       case 'v':   // verbose
0238     verbose++;
0239     break;
0240 
0241       case 'h':
0242     exithelp();
0243     break;
0244       }
0245 
0246   switch (ittype)
0247     {
0248     case RCDAQEVENTITERATOR:
0249       if ( optind+1>argc) 
0250     {
0251       std::string host = "localhost";
0252     
0253       if ( getenv("RCDAQHOST")  )
0254         {
0255           host = getenv("RCDAQHOST");
0256         }
0257       
0258       it = new rcdaqEventiterator(host.c_str(), status);
0259 
0260     }
0261       else
0262     {
0263       it = new rcdaqEventiterator(argv[optind], status);
0264     }
0265       break;
0266 
0267     case  TESTEVENTITERATOR:
0268       it = new testEventiterator();
0269       status =0;
0270       break;
0271 
0272     case  FILEEVENTITERATOR:
0273       if ( optind+1>argc) exitmsg();
0274       it = new fileEventiterator(argv[optind], status);
0275       break;
0276      
0277     case  ONCSEVENTITERATOR:
0278       if ( optind+1>argc) exitmsg();
0279       it = new oncsEventiterator(argv[optind], status);
0280       break;
0281 
0282     case  LISTEVENTITERATOR:
0283       if ( optind+1>argc) exitmsg();
0284       it = new listEventiterator(argv[optind], status);
0285       break;
0286 
0287       status = 1;
0288       break;
0289       
0290     default:
0291       exitmsg();
0292       break;
0293     }
0294 
0295   if (status)
0296     {
0297       delete it;
0298       it = 0;
0299       cout << "Could not open input stream" << std::endl;
0300       exit(1);
0301     }
0302 
0303   Event *e = 0;
0304 
0305   pthread_t ThreadEvt = 0;
0306 
0307   if ( one_at_a_time == 0)
0308     {
0309       status = pthread_create(&ThreadEvt, NULL, 
0310                   EventLoop, 
0311                   (void *) 0);
0312    
0313       if (status ) 
0314     {
0315       cout << "error in event thread create " << status << endl;
0316       exit(0);
0317     }
0318     }
0319   
0320   
0321   struct sockaddr_in servaddr, cliaddr; 
0322   
0323   // Creating socket file descriptor 
0324   if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
0325     { 
0326       perror("socket creation failed"); 
0327       exit(EXIT_FAILURE); 
0328   } 
0329   
0330   memset(&servaddr, 0, sizeof(servaddr)); 
0331   memset(&cliaddr, 0, sizeof(cliaddr)); 
0332     
0333   // Filling server information 
0334   servaddr.sin_family = AF_INET; // IPv4 
0335   servaddr.sin_addr.s_addr = INADDR_ANY; 
0336   servaddr.sin_port = htons(PORT); 
0337   
0338   // Bind the socket with the server address 
0339   if ( bind(sockfd, (const struct sockaddr *)&servaddr, 
0340         sizeof(servaddr)) < 0 ) 
0341     { 
0342       perror("bind failed"); 
0343       exit(EXIT_FAILURE); 
0344     } 
0345     
0346   socklen_t len;
0347   int n; 
0348   int current_count = 0;
0349 
0350   len = sizeof(cliaddr); //len is value/result 
0351 
0352   int recbuffer[10];
0353   
0354   while (1)
0355     {
0356       n = recvfrom(sockfd, (char *)recbuffer, 2*sizeof(int), 
0357            MSG_WAITALL, ( struct sockaddr *) &cliaddr, 
0358            &len);
0359       requested += 1;
0360       if ( verbose && ++current_count >= msgfrequency)
0361     {
0362       cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " requesting " << recbuffer[0];
0363     }
0364       
0365       
0366       if (one_at_a_time == 0)
0367     {
0368       pthread_mutex_lock( &MapSem);
0369       map<int, Event*>::iterator it = EventMap.find(recbuffer[0]);
0370       if ( it == EventMap.end() )
0371         {
0372           pthread_mutex_unlock( &MapSem);
0373           
0374           send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0375 
0376           if ( verbose && current_count >= msgfrequency)
0377         {
0378           cout << " Event not delivered";
0379           if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0380           else cout << endl;
0381           current_count = 0;
0382         }
0383         }
0384       else
0385         {
0386           int nw;
0387           (it->second)->Copy(buffer,MAXSIZE,&nw,"");
0388           pthread_mutex_unlock( &MapSem);
0389           sendto(sockfd, (const char *) buffer, nw*sizeof(int), 
0390              MSG_CONFIRM, (const struct sockaddr *) &cliaddr, 
0391              len); 
0392           sent += 1;
0393           if ( verbose && current_count >= msgfrequency )
0394         {
0395           cout << " Event sent";
0396           if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0397           current_count = 0;
0398         }
0399         }
0400     }
0401 
0402       else   // we go one at a time
0403     {
0404       // let's stick with the event that we have, if any, first. We get one if we don't.
0405       if (!e) 
0406         {
0407           e = it->getNextEvent();
0408         }
0409 
0410       // so if the stream is exhausted, we end
0411       if ( !e) 
0412         {
0413           send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0414           if ( verbose && ++current_count >= msgfrequency) 
0415         {
0416           cout << " end of stream" << endl;  // clean up the output
0417         }
0418           return 0;
0419         }
0420 
0421       // if we request an event that's in the past, we say "sorry"
0422       if ( recbuffer[0] < e->getEvtSequence()) 
0423         {
0424 
0425           send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0426 
0427           if ( verbose && current_count >= msgfrequency)
0428         {
0429           cout << " Event not delivered";
0430           if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0431           else cout << endl;
0432           current_count = 0;
0433         }
0434         }
0435       else if ( recbuffer[0] >= e->getEvtSequence() )
0436         {
0437 
0438           // we ask for a valid event. If we don't have it, we skip until we have it, or find the end of the stream
0439           while (recbuffer[0] != e->getEvtSequence() )
0440         {
0441           if ( e) delete e;
0442           e = it->getNextEvent();
0443           // if this is 0 we have arrived at the end of the stream
0444           if ( !e) 
0445             {
0446               send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0447               if ( verbose && ++current_count >= msgfrequency) 
0448             {
0449               cout << " end of stream" << endl;  // clean up the output
0450             }
0451               return 0;
0452             }
0453         }
0454 
0455           int nw;
0456       
0457           e->Copy(buffer,MAXSIZE,&nw,"");
0458           sendto(sockfd, (const char *) buffer, nw*sizeof(int), 
0459              MSG_CONFIRM, (const struct sockaddr *) &cliaddr, 
0460              len); 
0461           sent += 1;
0462           if ( verbose && current_count >= msgfrequency )
0463         {
0464           cout << " Event sent";
0465           if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0466           current_count = 0;
0467         }
0468           if ( identify) e->identify();
0469         }
0470     }
0471     }
0472   return 0; 
0473 }