File indexing completed on 2026-04-06 08:15:49
0001
0002
// Diagnostic stream prefixes. Each macro expands to the beginning of a `<<`
// chain that has already written "<file> <line> ", so a caller continues it:
//     coutfl << "something happened" << std::endl;
// The streams are spelled std::cout / std::cerr so that the macros expand
// correctly whether or not a using-directive is in effect at the point of use.
#define coutfl std::cout << __FILE__ << " " << __LINE__ << " "
#define cerrfl std::cerr << __FILE__ << " " << __LINE__ << " "
0005
0006
0007 #include <iostream>
0008 #include <stdlib.h>
0009 #include <unistd.h>
0010 #include <string.h>
0011 #include <sys/types.h>
0012 #include <sys/socket.h>
0013 #include <arpa/inet.h>
0014 #include <netinet/in.h>
0015 #include <pthread.h>
0016
0017 #ifdef HAVE_GETOPT_H
0018 #include "getopt.h"
0019 #endif
0020
0021 #include <map>
0022 #include <set>
0023
0024
0025
0026
0027 #include "testEventiterator.h"
0028 #include "fileEventiterator.h"
0029 #include "rcdaqEventiterator.h"
0030 #include "oncsEventiterator.h"
0031
0032 using namespace std;
0033
0034 #define PORT 8080
0035 #define MAXSIZE 5120
0036
0037
0038 Eventiterator *it;
0039
0040
0041 #define RCDAQEVENTITERATOR 1
0042 #define FILEEVENTITERATOR 2
0043 #define TESTEVENTITERATOR 3
0044 #define ONCSEVENTITERATOR 4
0045
// Print the short usage banner and terminate the process.
//
// Called when an option argument cannot be parsed or a required datastream
// argument is missing. Never returns.
// NOTE(review): the exit status is 0 even though this reports a usage error;
// kept as-is because scripts may depend on it -- confirm before changing.
void exitmsg()
{
  std::cout << "** usage: eventServer -ivfTrOh datastream" << std::endl;
  std::cout << " type eventServer -h for more help" << std::endl;
  exit(0);
}
0052
// Request statistics for the current run. main() bumps `requested` for every
// datagram it handles and `sent` for every event it returns; EventLoop() zeroes
// both when the run number changes. They are floats so that the percentage
// printed by main() (100 * sent / requested) is a floating-point division.
float requested = 0.0f;
float sent = 0.0f;

// Sequence number of the event most recently taken from the iterator.
// Written by EventLoop(), read by main() to reject requests beyond it.
int last_eventnr = 0;
0057
// Print the full option summary and terminate the process with status 0.
//
// Invoked for the -h option. Never returns. The list mirrors the options that
// main() passes to getopt(), including -p, which selects the UDP port.
void exithelp()
{
  std::cout << std::endl;
  std::cout << " eventServer serves events from a given datastream" << std::endl;
  std::cout << std::endl;
  std::cout << " List of options: " << std::endl;
  std::cout << " -f (stream is a file)" << std::endl;
  std::cout << " -T (stream is a test stream)" << std::endl;
  std::cout << " -r (stream is a rcdaq monitoring stream)" << std::endl;
  std::cout << " -O (stream is a legacy ONCS format file)" << std::endl;
  std::cout << " -p <port> UDP port to listen on" << std::endl;
  std::cout << " -h this message" << std::endl;
  std::cout << std::endl;
  std::cout << " debug options" << std::endl;
  std::cout << " -s <number> sleep so many ticks (in units of usleep)" << std::endl;
  std::cout << " -i <print event identity>" << std::endl;
  std::cout << " -v verbose" << std::endl;
  std::cout << " -c count print -v -or -i messages only every <count> occurrences " << std::endl;
  exit(0);
}
0078
0079 int go_on = 1;
0080 int identify = 0;
0081 int verbose = 0;
0082 int repeatcount =1;
0083 int ittype = RCDAQEVENTITERATOR;
0084 int sleeptime = 0;
0085 int old_runnumber = -9999;
0086 int msgfrequency =1;
0087
0088 pthread_mutex_t MapSem;
0089 pthread_mutex_t M_cout;
0090
0091
0092 #define RANGE (0x1fffff)
0093
0094 Event* eventarray[RANGE+1] = {0};
0095
0096
0097 void * EventLoop( void *arg)
0098 {
0099
0100 if ( identify) it->identify();
0101 int current_count = 0;
0102
0103 while ( go_on)
0104 {
0105 Event *e = it->getNextEvent();
0106 if ( ! e)
0107 {
0108 go_on = 0;
0109 return 0;
0110
0111 }
0112 e->convert();
0113 last_eventnr = e->getEvtSequence() ;
0114
0115
0116 if ( old_runnumber != e->getRunNumber())
0117 {
0118 old_runnumber = e->getRunNumber();
0119 requested = 0;
0120 sent = 0;
0121 for ( int i = 0; i< RANGE; i++)
0122 {
0123 delete eventarray[i];
0124 eventarray[i] = 0;
0125 }
0126 }
0127
0128
0129
0130
0131 pthread_mutex_lock( &MapSem);
0132
0133
0134
0135
0136 if ( eventarray[(last_eventnr & RANGE)])
0137 {
0138 delete eventarray[(last_eventnr & RANGE)];
0139 }
0140
0141 eventarray[(last_eventnr & RANGE)] =e;
0142
0143
0144 pthread_mutex_unlock( &MapSem);
0145
0146 current_count++;
0147
0148 if ( current_count >= msgfrequency && identify )
0149 {
0150 pthread_mutex_lock( &M_cout);
0151 e->identify();
0152 pthread_mutex_unlock( &M_cout);
0153 current_count = 0;
0154 }
0155
0156 if (sleeptime) usleep(sleeptime);
0157 }
0158 return 0;
0159 }
0160
// Tell a client that the event it asked for cannot be delivered.
//
// The reply is a datagram holding a single int with value 0, sent on sockfd
// to the address cliaddr (of length len). The outcome of sendto() is not
// examined; the function always returns 0.
int send_not_found (int sockfd, const struct sockaddr * cliaddr, socklen_t len)
{
  const int reply[2] = {0, 0};

  // Only the first element goes on the wire.
  const size_t reply_bytes = sizeof(reply[0]);

  sendto(sockfd, reply, reply_bytes, MSG_CONFIRM, cliaddr, len);

  return 0;
}
0171
0172
0173
0174 int
0175 main(int argc, char *argv[])
0176 {
0177 int sockfd;
0178 int buffer[MAXSIZE];
0179
0180 int c;
0181
0182 int ThePort = PORT;
0183
0184 int status = -1;
0185
0186 pthread_mutex_init( &MapSem, 0);
0187 pthread_mutex_init( &M_cout, 0);
0188
0189 while ((c = getopt(argc, argv, "d:s:c:p:ifTrOvh")) != EOF)
0190 switch (c)
0191 {
0192 case 'd':
0193 coutfl << " -d flag is obsolete " << endl;
0194 break;
0195
0196 case 's':
0197 if ( !sscanf(optarg, "%d", &sleeptime) ) exitmsg();
0198 break;
0199
0200 case 'c':
0201 if ( !sscanf(optarg, "%d", &msgfrequency) ) exitmsg();
0202 break;
0203
0204 case 'p':
0205 if ( !sscanf(optarg, "%d", &ThePort) ) exitmsg();
0206 break;
0207
0208 case 'i':
0209 identify = 1;
0210 break;
0211
0212 case 'T':
0213 ittype = TESTEVENTITERATOR;
0214 break;
0215
0216 case 'f':
0217 ittype = FILEEVENTITERATOR;
0218 break;
0219
0220 case 'r':
0221 ittype = RCDAQEVENTITERATOR;
0222 break;
0223
0224 case 'O':
0225 ittype = ONCSEVENTITERATOR;
0226 break;
0227
0228 case 'v':
0229 verbose++;
0230 break;
0231
0232 case 'h':
0233 exithelp();
0234 break;
0235 }
0236
0237 switch (ittype)
0238 {
0239 case RCDAQEVENTITERATOR:
0240 if ( optind+1>argc)
0241 {
0242 std::string host = "localhost";
0243
0244 if ( getenv("RCDAQHOST") )
0245 {
0246 host = getenv("RCDAQHOST");
0247 }
0248
0249 it = new rcdaqEventiterator(host.c_str(), status);
0250
0251 }
0252 else
0253 {
0254 it = new rcdaqEventiterator(argv[optind], status);
0255 }
0256 break;
0257
0258 case TESTEVENTITERATOR:
0259 it = new testEventiterator();
0260 status =0;
0261 break;
0262
0263 case FILEEVENTITERATOR:
0264 if ( optind+1>argc) exitmsg();
0265 it = new fileEventiterator(argv[optind], status);
0266 break;
0267
0268 case ONCSEVENTITERATOR:
0269 if ( optind+1>argc) exitmsg();
0270 it = new oncsEventiterator(argv[optind], status);
0271 break;
0272
0273 status = 1;
0274 break;
0275
0276 default:
0277 exitmsg();
0278 break;
0279 }
0280
0281 if (status)
0282 {
0283 delete it;
0284 it = 0;
0285 cout << "Could not open input stream" << std::endl;
0286 exit(1);
0287 }
0288
0289
0290 pthread_t ThreadEvt;
0291
0292 status = pthread_create(&ThreadEvt, NULL,
0293 EventLoop,
0294 (void *) 0);
0295
0296 if (status )
0297 {
0298 cout << "error in event thread create " << status << endl;
0299 exit(0);
0300 }
0301
0302
0303
0304 struct sockaddr_in servaddr, cliaddr;
0305
0306
0307 if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
0308 {
0309 perror("socket creation failed");
0310 exit(EXIT_FAILURE);
0311 }
0312
0313 memset(&servaddr, 0, sizeof(servaddr));
0314 memset(&cliaddr, 0, sizeof(cliaddr));
0315
0316
0317 servaddr.sin_family = AF_INET;
0318 servaddr.sin_addr.s_addr = INADDR_ANY;
0319 servaddr.sin_port = htons(ThePort);
0320
0321
0322 if ( bind(sockfd, (const struct sockaddr *)&servaddr,
0323 sizeof(servaddr)) < 0 )
0324 {
0325 perror("bind failed");
0326 exit(EXIT_FAILURE);
0327 }
0328
0329 socklen_t len;
0330 int n;
0331 int current_count = 0;
0332
0333 len = sizeof(cliaddr);
0334
0335 int recbuffer[10];
0336
0337 while (1)
0338 {
0339 n = recvfrom(sockfd, (char *)recbuffer, 2*sizeof(int),
0340 MSG_WAITALL, ( struct sockaddr *) &cliaddr,
0341 &len);
0342 requested += 1;
0343
0344
0345 if ( recbuffer[0] > last_eventnr)
0346 {
0347 send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0348 if ( verbose && ++current_count >= msgfrequency)
0349 {
0350
0351 pthread_mutex_lock( &M_cout);
0352 cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " not delivered, beyond scope " << recbuffer[0]
0353 << " last " << last_eventnr << " diff: " << last_eventnr - recbuffer[0] << endl;
0354 pthread_mutex_unlock( &M_cout);
0355 current_count = 0;
0356 }
0357 }
0358 else
0359 {
0360 pthread_mutex_lock( &MapSem);
0361
0362 Event *x = eventarray[recbuffer[0] & RANGE];
0363
0364 if ( x == 0 || x->getEvtSequence() != recbuffer[0])
0365 {
0366 pthread_mutex_unlock( &MapSem);
0367
0368 send_not_found(sockfd, (const struct sockaddr *) &cliaddr, len);
0369
0370
0371 if ( verbose && ++current_count >= msgfrequency)
0372 {
0373 pthread_mutex_lock( &M_cout);
0374 cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " not delivered " << recbuffer[0]
0375 << " last " << last_eventnr << " diff: " << last_eventnr - recbuffer[0] ;
0376 if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0377 else cout << endl;
0378 pthread_mutex_unlock( &M_cout);
0379 current_count = 0;
0380 }
0381 }
0382 else
0383 {
0384 int nw;
0385 x->Copy(buffer,MAXSIZE,&nw,"");
0386 pthread_mutex_unlock( &MapSem);
0387 sendto(sockfd, (const char *) buffer, nw*sizeof(int),
0388 MSG_CONFIRM, (const struct sockaddr *) &cliaddr,
0389 len);
0390 sent += 1;
0391 if ( verbose && ++current_count >= msgfrequency )
0392 {
0393 pthread_mutex_lock( &M_cout);
0394 cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " event sent " << recbuffer[0];
0395 if ( requested > 0) cout << " " << 100 * sent/requested << "%" << endl;
0396 else cout << endl;
0397 pthread_mutex_unlock( &M_cout);
0398 current_count = 0;
0399 }
0400 }
0401 }
0402 }
0403 return 0;
0404 }