File indexing completed on 2025-08-03 08:20:45
0001
0002
0003 #define coutfl cout << __FILE__<< " " << __LINE__ << " "
0004 #define cerrfl cerr << __FILE__<< " " << __LINE__ << " "
0005
0006
0007 #include <iostream>
0008 #include <stdlib.h>
0009 #include <unistd.h>
0010 #include <string.h>
0011 #include <sys/types.h>
0012 #include <sys/socket.h>
0013 #include <arpa/inet.h>
0014 #include <netinet/in.h>
0015 #include <pthread.h>
0016
0017 #ifdef HAVE_GETOPT_H
0018 #include "getopt.h"
0019 #endif
0020
0021 #include <map>
0022 #include <set>
0023
0024
0025
0026
0027 #include "testEventiterator.h"
0028 #include "fileEventiterator.h"
0029 #include "rcdaqEventiterator.h"
0030 #include "oncsEventiterator.h"
0031
0032 using namespace std;
0033
0034 #define PORT 8080
0035 #define MAXSIZE 5120
0036
0037
0038 Eventiterator *it;
0039
0040
0041 #define RCDAQEVENTITERATOR 1
0042 #define FILEEVENTITERATOR 2
0043 #define TESTEVENTITERATOR 3
0044 #define ONCSEVENTITERATOR 4
0045
/**
 * Print the short usage line and terminate the process.
 *
 * Called when a command-line argument is missing or cannot be parsed.
 * Does not return.
 *
 * NOTE(review): the process exits with status 0 even though every caller
 * reports a usage error; kept as-is so that existing scripts see the same
 * exit status.
 */
void exitmsg()
{
  // The program name is spelled consistently on both lines ("eventServer").
  std::cout << "** usage: eventServer -ivfTrOh datastream" << std::endl;
  std::cout << " type eventServer -h for more help" << std::endl;
  exit(0);
}
0052
// Request statistics, used to print the delivered/requested percentage.
// They are incremented by exactly 1 per request. A float stops advancing
// once it reaches 2^24 (16777216 + 1 == 16777216 in single precision), so
// the counters are kept as double, which counts exactly up to 2^53.
double requested = 0;   // number of requests received
double sent = 0;        // number of requests answered with an event

// Sequence number of the most recent event taken from the input stream.
int last_eventnr = 0;
0057
/**
 * Print the full option summary and terminate the process with status 0.
 *
 * Invoked for the -h command-line flag. Does not return.
 */
void exithelp()
{
  std::cout << std::endl;
  std::cout << " gl1_server serves events from a given datastream" << std::endl;
  std::cout << std::endl;
  std::cout << " List of options: " << std::endl;
  std::cout << " -f (stream is a file)" << std::endl;
  std::cout << " -T (stream is a test stream)" << std::endl;
  std::cout << " -r (stream is a rcdaq monitoring stream)" << std::endl;
  std::cout << " -O (stream is a legacy ONCS format file)" << std::endl;
  // -p is accepted by the option parser but was missing from this summary.
  std::cout << " -p <port> UDP port to listen on for requests" << std::endl;
  std::cout << " -h this message" << std::endl;
  std::cout << std::endl;
  std::cout << " debug options" << std::endl;
  std::cout << " -s <number> sleep so many ticks (in units of usleep)" << std::endl;
  std::cout << " -i <print event identity>" << std::endl;
  std::cout << " -v verbose" << std::endl;
  std::cout << " -c count print -v -or -i messages only every <count> occurrences " << std::endl;
  exit(0);
}
0078
0079 int go_on = 1;
0080 int identify = 0;
0081 int verbose = 0;
0082 int repeatcount =1;
0083 int ittype = RCDAQEVENTITERATOR;
0084 int sleeptime = 0;
0085 int old_runnumber = -9999;
0086 int msgfrequency =1;
0087
0088 pthread_mutex_t MapSem;
0089 pthread_mutex_t M_cout;
0090
0091 #define RANGE (0x7ffff)
0092
0093 Event* eventarray[RANGE+1] = {0};
0094
0095
0096 void * EventLoop( void *arg)
0097 {
0098
0099 if ( identify) it->identify();
0100 int current_count = 0;
0101
0102 while ( go_on)
0103 {
0104 Event *e = it->getNextEvent();
0105 if ( ! e)
0106 {
0107 go_on = 0;
0108 return 0;
0109
0110 }
0111 e->convert();
0112 last_eventnr = e->getEvtSequence() ;
0113
0114
0115 if ( old_runnumber != e->getRunNumber())
0116 {
0117 old_runnumber = e->getRunNumber();
0118 requested = 0;
0119 sent = 0;
0120 for ( int i = 0; i< RANGE; i++)
0121 {
0122 delete eventarray[i];
0123 eventarray[i] = 0;
0124 }
0125 }
0126
0127
0128
0129
0130 pthread_mutex_lock( &MapSem);
0131
0132
0133
0134
0135 if ( eventarray[(last_eventnr & RANGE)])
0136 {
0137 delete eventarray[(last_eventnr & RANGE)];
0138 }
0139
0140 eventarray[(last_eventnr & RANGE)] =e;
0141
0142
0143 pthread_mutex_unlock( &MapSem);
0144
0145 current_count++;
0146
0147 if ( current_count >= msgfrequency && identify )
0148 {
0149 pthread_mutex_lock( &M_cout);
0150 e->identify();
0151 pthread_mutex_unlock( &M_cout);
0152 current_count = 0;
0153 }
0154
0155 if (sleeptime) usleep(sleeptime);
0156 }
0157 return 0;
0158 }
0159
/**
 * Send the "event not available" reply to a client: a datagram that
 * consists of a single zero-valued int.
 *
 * @param sockfd   socket to send on
 * @param cliaddr  destination address
 * @param len      size of the destination address
 * @return         always 0; the result of sendto() is not examined
 */
int send_not_found (int sockfd, const struct sockaddr * cliaddr, socklen_t len)
{
  const int empty_reply = 0;

  sendto(sockfd,
         reinterpret_cast<const char *>(&empty_reply),
         sizeof(empty_reply),
         MSG_CONFIRM,
         cliaddr,
         len);

  return 0;
}
0170
0171
0172
0173 int
0174 main(int argc, char *argv[])
0175 {
0176 int sockfd;
0177 int buffer[MAXSIZE];
0178
0179 int c;
0180
0181 int ThePort = PORT;
0182
0183 int status = -1;
0184
0185 pthread_mutex_init( &MapSem, 0);
0186 pthread_mutex_init( &M_cout, 0);
0187
0188 while ((c = getopt(argc, argv, "d:s:c:p:ifTrOvh")) != EOF)
0189 switch (c)
0190 {
0191 case 'd':
0192 coutfl << " -d flag is obsolete " << endl;
0193 break;
0194
0195 case 's':
0196 if ( !sscanf(optarg, "%d", &sleeptime) ) exitmsg();
0197 break;
0198
0199 case 'c':
0200 if ( !sscanf(optarg, "%d", &msgfrequency) ) exitmsg();
0201 break;
0202
0203 case 'p':
0204 if ( !sscanf(optarg, "%d", &ThePort) ) exitmsg();
0205 break;
0206
0207 case 'i':
0208 identify = 1;
0209 break;
0210
0211 case 'T':
0212 ittype = TESTEVENTITERATOR;
0213 break;
0214
0215 case 'f':
0216 ittype = FILEEVENTITERATOR;
0217 break;
0218
0219 case 'r':
0220 ittype = RCDAQEVENTITERATOR;
0221 break;
0222
0223 case 'O':
0224 ittype = ONCSEVENTITERATOR;
0225 break;
0226
0227 case 'v':
0228 verbose++;
0229 break;
0230
0231 case 'h':
0232 exithelp();
0233 break;
0234 }
0235
0236 switch (ittype)
0237 {
0238 case RCDAQEVENTITERATOR:
0239 if ( optind+1>argc)
0240 {
0241 std::string host = "localhost";
0242
0243 if ( getenv("RCDAQHOST") )
0244 {
0245 host = getenv("RCDAQHOST");
0246 }
0247
0248 it = new rcdaqEventiterator(host.c_str(), status);
0249
0250 }
0251 else
0252 {
0253 it = new rcdaqEventiterator(argv[optind], status);
0254 }
0255 break;
0256
0257 case TESTEVENTITERATOR:
0258 it = new testEventiterator();
0259 status =0;
0260 break;
0261
0262 case FILEEVENTITERATOR:
0263 if ( optind+1>argc) exitmsg();
0264 it = new fileEventiterator(argv[optind], status);
0265 break;
0266
0267 case ONCSEVENTITERATOR:
0268 if ( optind+1>argc) exitmsg();
0269 it = new oncsEventiterator(argv[optind], status);
0270 break;
0271
0272 status = 1;
0273 break;
0274
0275 default:
0276 exitmsg();
0277 break;
0278 }
0279
0280 if (status)
0281 {
0282 delete it;
0283 it = 0;
0284 cout << "Could not open input stream" << std::endl;
0285 exit(1);
0286 }
0287
0288
0289 pthread_t ThreadEvt;
0290
0291 status = pthread_create(&ThreadEvt, NULL,
0292 EventLoop,
0293 (void *) 0);
0294
0295 if (status )
0296 {
0297 cout << "error in event thread create " << status << endl;
0298 exit(0);
0299 }
0300
0301
0302
0303 struct sockaddr_in servaddr, cliaddr;
0304
0305
0306 if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
0307 {
0308 perror("socket creation failed");
0309 exit(EXIT_FAILURE);
0310 }
0311
0312 memset(&servaddr, 0, sizeof(servaddr));
0313 memset(&cliaddr, 0, sizeof(cliaddr));
0314
0315
0316 servaddr.sin_family = AF_INET;
0317 servaddr.sin_addr.s_addr = INADDR_ANY;
0318 servaddr.sin_port = htons(ThePort);
0319
0320
0321 if ( bind(sockfd, (const struct sockaddr *)&servaddr,
0322 sizeof(servaddr)) < 0 )
0323 {
0324 perror("bind failed");
0325 exit(EXIT_FAILURE);
0326 }
0327
0328 socklen_t len;
0329 int n;
0330 int current_count = 0;
0331
0332 len = sizeof(cliaddr);
0333
0334 int recbuffer[10];
0335
0336 while (1)
0337 {
0338 n = recvfrom(sockfd, (char *)recbuffer, 2*sizeof(int),
0339 MSG_WAITALL, ( struct sockaddr *) &cliaddr,
0340 &len);
0341 requested += 1;
0342
0343
0344 if ( recbuffer[0] > last_eventnr)
0345 {
0346 send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0347 if ( verbose && ++current_count >= msgfrequency)
0348 {
0349
0350 pthread_mutex_lock( &M_cout);
0351 cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " not delivered, beyond scope " << recbuffer[0]
0352 << " last " << last_eventnr << " diff: " << last_eventnr - recbuffer[0] << endl;
0353 pthread_mutex_unlock( &M_cout);
0354 current_count = 0;
0355 }
0356 }
0357 else
0358 {
0359 pthread_mutex_lock( &MapSem);
0360
0361 Event *x = eventarray[recbuffer[0] & RANGE];
0362
0363 if ( x == 0 || x->getEvtSequence() != recbuffer[0])
0364 {
0365 pthread_mutex_unlock( &MapSem);
0366
0367 send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0368
0369
0370 if ( verbose && ++current_count >= msgfrequency)
0371 {
0372 pthread_mutex_lock( &M_cout);
0373 cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " not delivered " << recbuffer[0]
0374 << " last " << last_eventnr << " diff: " << last_eventnr - recbuffer[0] ;
0375 if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0376 else cout << endl;
0377 pthread_mutex_unlock( &M_cout);
0378 current_count = 0;
0379 }
0380 }
0381 else
0382 {
0383 int nw;
0384 x->Copy(buffer,MAXSIZE,&nw,"");
0385 pthread_mutex_unlock( &MapSem);
0386 sendto(sockfd, (const char *) buffer, nw*sizeof(int),
0387 MSG_CONFIRM, (const struct sockaddr *) &cliaddr,
0388 len);
0389 sent += 1;
0390 if ( verbose && ++current_count >= msgfrequency )
0391 {
0392 pthread_mutex_lock( &M_cout);
0393 cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " event sent " << recbuffer[0];
0394 if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0395 else cout << endl;
0396 pthread_mutex_unlock( &M_cout);
0397 current_count = 0;
0398 }
0399 }
0400 }
0401 }
0402 return 0;
0403 }