File indexing completed on 2025-08-03 08:20:34
0001
0002
0003 #define coutfl cout << __FILE__<< " " << __LINE__ << " "
0004 #define cerrfl cerr << __FILE__<< " " << __LINE__ << " "
0005
0006
0007 #include <iostream>
0008 #include <stdlib.h>
0009 #include <unistd.h>
0010 #include <string.h>
0011 #include <sys/types.h>
0012 #include <sys/socket.h>
0013 #include <arpa/inet.h>
0014 #include <netinet/in.h>
0015 #include <pthread.h>
0016
0017 #ifdef HAVE_GETOPT_H
0018 #include "getopt.h"
0019 #endif
0020
0021 #include <map>
0022
0023
0024 #include "testEventiterator.h"
0025 #include "fileEventiterator.h"
0026 #include "rcdaqEventiterator.h"
0027 #include "oncsEventiterator.h"
0028 #include "listEventiterator.h"
0029
0030 using namespace std;
0031
0032 #define PORT 8080
0033 #define MAXSIZE 5120
0034
0035 map<int, Event*> EventMap;
0036 Eventiterator *it;
0037
0038
0039 #define RCDAQEVENTITERATOR 1
0040 #define FILEEVENTITERATOR 2
0041 #define TESTEVENTITERATOR 3
0042 #define ONCSEVENTITERATOR 4
0043 #define LISTEVENTITERATOR 5
0044
0045 void exitmsg()
0046 {
0047 cout << "** usage: gl1_server -nifTrOvh datastream" << std::endl;
0048 cout << " type gl1_server -h for more help" << std::endl;
0049 exit(0);
0050 }
0051
0052 float requested = 0;
0053 float sent = 0;
0054
0055 void exithelp()
0056 {
0057
0058 cout << std::endl;
0059 cout << " gl1_server serves events from a given datastream" << std::endl;
0060 cout << std::endl;
0061 cout << " List of options: " << std::endl;
0062 cout << " -d <number> depth of buffer" << std::endl;
0063 cout << " -e <event1,event2> buffer from event1-event2 (eg for testing on 1st N events of a run)" << std::endl;
0064 cout << " -f (stream is a file)" << std::endl;
0065 cout << " -T (stream is a test stream)" << std::endl;
0066 cout << " -r (stream is a rcdaq monitoring stream)" << std::endl;
0067 cout << " -O (stream is a legacy ONCS format file)" << std::endl;
0068 cout << " -L (stream is a file list)" << std::endl;
0069 cout << " -x act as a one-event-at-a-time server (depth is irrelevant)" << std::endl;
0070
0071 cout << " -h this message" << std::endl;
0072 cout << endl;
0073 cout << " debug options" << endl;
0074 cout << " -s <number> sleep so many ticks (in units of usleep)" << std::endl;
0075 cout << " -i <print event identity>" << std::endl;
0076 cout << " -v verbose" << std::endl;
0077 cout << " -c count print -v -or -i messages only every <count> occurences " << std::endl;
0078 exit(0);
0079 }
0080
0081 unsigned int depth = 1000;
0082 int evt1 = -1;
0083 int evt2 = -1;
0084 int go_on = 1;
0085 int identify = 0;
0086 int verbose = 0;
0087 int repeatcount =1;
0088 int ittype = RCDAQEVENTITERATOR;
0089 int sleeptime = 0;
0090 int old_runnumber = -9999;
0091 int msgfrequency =1;
0092 int one_at_a_time = 0;
0093
0094 pthread_mutex_t MapSem;
0095
0096
0097
0098 void * EventLoop( void *arg)
0099 {
0100
0101 if ( identify) it->identify();
0102 int current_count = 0;
0103
0104 while ( go_on)
0105 {
0106 Event *e = it->getNextEvent();
0107 if ( ! e)
0108 {
0109 go_on = 0;
0110 return 0;
0111
0112 }
0113 if ( evt1>0 && (e->getEvtSequence()<evt1 || e->getEvtSequence()>evt2) )
0114 {
0115 continue;
0116 }
0117 e->convert();
0118
0119 pthread_mutex_lock( &MapSem);
0120 map<int, Event*>::iterator it = EventMap.begin();
0121
0122
0123 if ( old_runnumber != e->getRunNumber())
0124 {
0125 old_runnumber = e->getRunNumber();
0126 requested = 0;
0127 sent = 0;
0128 for ( ; it != EventMap.end(); ++it)
0129 {
0130 delete it->second;
0131 }
0132 EventMap.clear();
0133 }
0134
0135
0136 if (EventMap.size() >= depth)
0137 {
0138 map<int, Event*>::iterator it = EventMap.begin();
0139 if ( verbose ) coutfl << "erasing event " << it->first << " depth = " << EventMap.size() << endl;
0140 delete it->second;
0141 EventMap.erase(it);
0142 }
0143
0144
0145 EventMap[e->getEvtSequence()] = e;
0146
0147
0148 pthread_mutex_unlock( &MapSem);
0149
0150 current_count++;
0151
0152 if ( current_count >= msgfrequency && identify )
0153 {
0154 e->identify();
0155 current_count = 0;
0156 }
0157
0158 if (sleeptime) usleep(sleeptime);
0159 }
0160 return 0;
0161 }
0162
0163 int send_not_found (int sockfd, const struct sockaddr * cliaddr, socklen_t len)
0164 {
0165 int buffer[2] = {0};
0166
0167 sendto(sockfd, (const char *) buffer, sizeof(int),
0168 MSG_CONFIRM, cliaddr,
0169 len);
0170
0171 return 0;
0172 }
0173
0174
0175 int
0176 main(int argc, char *argv[])
0177 {
0178 int sockfd;
0179 int buffer[MAXSIZE];
0180
0181 int c;
0182
0183
0184 int status = -1;
0185
0186 pthread_mutex_init( &MapSem, 0);
0187
0188 while ((c = getopt(argc, argv, "d:e:s:c:ifTrOLxvh")) != EOF)
0189 switch (c)
0190 {
0191 case 'd':
0192 if ( !sscanf(optarg, "%d", &depth) ) exitmsg();
0193 break;
0194
0195 case 'e':
0196 if ( !sscanf(optarg, "%d,%d", &evt1, &evt2) ) exitmsg();
0197 depth = evt2 - evt1 + 1;
0198 cout << "Buffering only events from " << evt1 << " to " << evt2 << endl;
0199 break;
0200
0201 case 's':
0202 if ( !sscanf(optarg, "%d", &sleeptime) ) exitmsg();
0203 break;
0204
0205 case 'c':
0206 if ( !sscanf(optarg, "%d", &msgfrequency) ) exitmsg();
0207 break;
0208
0209 case 'i':
0210 identify = 1;
0211 break;
0212
0213 case 'x':
0214 one_at_a_time = 1;
0215 break;
0216
0217 case 'T':
0218 ittype = TESTEVENTITERATOR;
0219 break;
0220
0221 case 'f':
0222 ittype = FILEEVENTITERATOR;
0223 break;
0224
0225 case 'r':
0226 ittype = RCDAQEVENTITERATOR;
0227 break;
0228
0229 case 'O':
0230 ittype = ONCSEVENTITERATOR;
0231 break;
0232
0233 case 'L':
0234 ittype = LISTEVENTITERATOR;
0235 break;
0236
0237 case 'v':
0238 verbose++;
0239 break;
0240
0241 case 'h':
0242 exithelp();
0243 break;
0244 }
0245
0246 switch (ittype)
0247 {
0248 case RCDAQEVENTITERATOR:
0249 if ( optind+1>argc)
0250 {
0251 std::string host = "localhost";
0252
0253 if ( getenv("RCDAQHOST") )
0254 {
0255 host = getenv("RCDAQHOST");
0256 }
0257
0258 it = new rcdaqEventiterator(host.c_str(), status);
0259
0260 }
0261 else
0262 {
0263 it = new rcdaqEventiterator(argv[optind], status);
0264 }
0265 break;
0266
0267 case TESTEVENTITERATOR:
0268 it = new testEventiterator();
0269 status =0;
0270 break;
0271
0272 case FILEEVENTITERATOR:
0273 if ( optind+1>argc) exitmsg();
0274 it = new fileEventiterator(argv[optind], status);
0275 break;
0276
0277 case ONCSEVENTITERATOR:
0278 if ( optind+1>argc) exitmsg();
0279 it = new oncsEventiterator(argv[optind], status);
0280 break;
0281
0282 case LISTEVENTITERATOR:
0283 if ( optind+1>argc) exitmsg();
0284 it = new listEventiterator(argv[optind], status);
0285 break;
0286
0287 status = 1;
0288 break;
0289
0290 default:
0291 exitmsg();
0292 break;
0293 }
0294
0295 if (status)
0296 {
0297 delete it;
0298 it = 0;
0299 cout << "Could not open input stream" << std::endl;
0300 exit(1);
0301 }
0302
0303 Event *e = 0;
0304
0305 pthread_t ThreadEvt = 0;
0306
0307 if ( one_at_a_time == 0)
0308 {
0309 status = pthread_create(&ThreadEvt, NULL,
0310 EventLoop,
0311 (void *) 0);
0312
0313 if (status )
0314 {
0315 cout << "error in event thread create " << status << endl;
0316 exit(0);
0317 }
0318 }
0319
0320
0321 struct sockaddr_in servaddr, cliaddr;
0322
0323
0324 if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
0325 {
0326 perror("socket creation failed");
0327 exit(EXIT_FAILURE);
0328 }
0329
0330 memset(&servaddr, 0, sizeof(servaddr));
0331 memset(&cliaddr, 0, sizeof(cliaddr));
0332
0333
0334 servaddr.sin_family = AF_INET;
0335 servaddr.sin_addr.s_addr = INADDR_ANY;
0336 servaddr.sin_port = htons(PORT);
0337
0338
0339 if ( bind(sockfd, (const struct sockaddr *)&servaddr,
0340 sizeof(servaddr)) < 0 )
0341 {
0342 perror("bind failed");
0343 exit(EXIT_FAILURE);
0344 }
0345
0346 socklen_t len;
0347 int n;
0348 int current_count = 0;
0349
0350 len = sizeof(cliaddr);
0351
0352 int recbuffer[10];
0353
0354 while (1)
0355 {
0356 n = recvfrom(sockfd, (char *)recbuffer, 2*sizeof(int),
0357 MSG_WAITALL, ( struct sockaddr *) &cliaddr,
0358 &len);
0359 requested += 1;
0360 if ( verbose && ++current_count >= msgfrequency)
0361 {
0362 cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " requesting " << recbuffer[0];
0363 }
0364
0365
0366 if (one_at_a_time == 0)
0367 {
0368 pthread_mutex_lock( &MapSem);
0369 map<int, Event*>::iterator it = EventMap.find(recbuffer[0]);
0370 if ( it == EventMap.end() )
0371 {
0372 pthread_mutex_unlock( &MapSem);
0373
0374 send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0375
0376 if ( verbose && current_count >= msgfrequency)
0377 {
0378 cout << " Event not delivered";
0379 if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0380 else cout << endl;
0381 current_count = 0;
0382 }
0383 }
0384 else
0385 {
0386 int nw;
0387 (it->second)->Copy(buffer,MAXSIZE,&nw,"");
0388 pthread_mutex_unlock( &MapSem);
0389 sendto(sockfd, (const char *) buffer, nw*sizeof(int),
0390 MSG_CONFIRM, (const struct sockaddr *) &cliaddr,
0391 len);
0392 sent += 1;
0393 if ( verbose && current_count >= msgfrequency )
0394 {
0395 cout << " Event sent";
0396 if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0397 current_count = 0;
0398 }
0399 }
0400 }
0401
0402 else
0403 {
0404
0405 if (!e)
0406 {
0407 e = it->getNextEvent();
0408 }
0409
0410
0411 if ( !e)
0412 {
0413 send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0414 if ( verbose && ++current_count >= msgfrequency)
0415 {
0416 cout << " end of stream" << endl;
0417 }
0418 return 0;
0419 }
0420
0421
0422 if ( recbuffer[0] < e->getEvtSequence())
0423 {
0424
0425 send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0426
0427 if ( verbose && current_count >= msgfrequency)
0428 {
0429 cout << " Event not delivered";
0430 if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0431 else cout << endl;
0432 current_count = 0;
0433 }
0434 }
0435 else if ( recbuffer[0] >= e->getEvtSequence() )
0436 {
0437
0438
0439 while (recbuffer[0] != e->getEvtSequence() )
0440 {
0441 if ( e) delete e;
0442 e = it->getNextEvent();
0443
0444 if ( !e)
0445 {
0446 send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0447 if ( verbose && ++current_count >= msgfrequency)
0448 {
0449 cout << " end of stream" << endl;
0450 }
0451 return 0;
0452 }
0453 }
0454
0455 int nw;
0456
0457 e->Copy(buffer,MAXSIZE,&nw,"");
0458 sendto(sockfd, (const char *) buffer, nw*sizeof(int),
0459 MSG_CONFIRM, (const struct sockaddr *) &cliaddr,
0460 len);
0461 sent += 1;
0462 if ( verbose && current_count >= msgfrequency )
0463 {
0464 cout << " Event sent";
0465 if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0466 current_count = 0;
0467 }
0468 if ( identify) e->identify();
0469 }
0470 }
0471 }
0472 return 0;
0473 }