File indexing completed on 2025-08-03 08:20:33
0001 #include <stdlib.h>
0002 #include <signal.h>
0003 #include <dlfcn.h>
0004 #include <sys/types.h>
0005 #include <sys/stat.h>
0006 #include <fcntl.h>
0007 #include <unistd.h>
0008
0009
0010 #include "fileEventiterator.h"
0011 #include "testEventiterator.h"
0012 #include "listEventiterator.h"
0013 #include "rcdaqEventiterator.h"
0014 #include "ogzBuffer.h"
0015 #include "olzoBuffer.h"
0016 #include "oamlBuffer.h"
0017 #include "ophBuffer.h"
0018 #include "dpipe_filter.h"
0019
0020 #include "phenixTypes.h"
0021 #include "oEvent.h"
0022 #include "stdio.h"
0023 #include "EventTypes.h"
0024 #ifdef HAVE_GETOPT_H
0025 #include <getopt.h>
0026 #endif
0027
0028 #define RCDAQEVENTITERATOR 1
0029 #define FILEEVENTITERATOR 2
0030 #define TESTEVENTITERATOR 3
0031 #define ETPOOL 4
0032 #define DFILE 5
0033 #define DNULL 6
0034 #define LISTEVENTITERATOR 7
0035 #define OAML 8
0036
0037
0038 #ifndef WIN32
0039 #if defined(SunOS) || defined(Linux) || defined(OSF1)
0040 void sig_handler(int);
0041 #else
0042 void sig_handler(...);
0043 #endif
0044 #endif
0045
0046
0047 void exitmsg()
0048 {
0049 COUT << "** usage: dpipe -s -d -v -w -n -i -z -l -x source destination" << std::endl;
0050 COUT << " dpipe -h for more help" << std::endl;
0051 exit(0);
0052 }
0053
0054 void evtcountexitmsg()
0055 {
0056 COUT << "** cannot specify both -e and -c!" << std::endl;
0057 COUT << " type dpipe -h for more help" << std::endl;
0058 exit(0);
0059 }
0060
0061 void compressionexitmsg()
0062 {
0063 COUT << "** cannot specify both -z and -l!" << std::endl;
0064 COUT << " type dpipe -h for more help" << std::endl;
0065 exit(0);
0066 }
0067
0068 void exithelp()
0069 {
0070 COUT << std::endl;
0071 COUT << " dpipe reads events from one source and writes it to a destination. The source can" << std::endl;
0072 COUT << " be any of the standard sources (ET pool, file, filelist, or test stream). The destination " << std::endl;
0073 COUT << " can be a file, a ET pool, an AML server, or no destination at all (you cannot write to a test " << std::endl;
0074 COUT << " stream). Like with a Unix pipe, you can move events through a chain of sources " << std::endl;
0075 COUT << " and destinations, for example, from one ET pool into another, or into a file. " << std::endl;
0076 COUT << std::endl;
0077 COUT << " While the events move through dpipe, you can have them identify themselves. If the " << std::endl;
0078 COUT << " destination is null, this is a simple way to sift through a stream of events " << std::endl;
0079 COUT << " and look at their identification messages. " << std::endl;
0080 COUT << std::endl;
0081 COUT << " You can throttle the data flow with the -w (wait) option, and you can stop after" << std::endl;
0082 COUT << " a given number of events with the -n option. " << std::endl;
0083 COUT << std::endl;
0084 COUT << " In order to write events from a ET pool called ONLINE to a file (d.evt), use" << std::endl;
0085 COUT << std::endl;
0086 COUT << " > dpipe -s etpool -d file ONLINE d.evt" << std::endl;
0087 COUT << std::endl;
0088 COUT << " if you want to see which events are coming, add -i:" << std::endl;
0089 COUT << std::endl;
0090 COUT << " > dpipe -s etpool -d file -i ONLINE d.evt" << std::endl;
0091 COUT << std::endl;
0092 COUT << " If the output is a aml server, specify the destination as hostname:port: " << std::endl;
0093 COUT << " > dpipe -s f -d a -i filename phnxbox4.phenix.bnl.gov:8900" << std::endl << std::endl;
0094 COUT << " Note that you can abbreviate the etpool, file, listfile, and Test to d, f, l, and T." << std::endl;
0095 COUT << " > dpipe -s etpool -d file ONLINE d.evt" << std::endl;
0096 COUT << " is equivalent to " << std::endl;
0097 COUT << " > dpipe -s d -d f ONLINE d.evt" << std::endl;
0098 COUT << std::endl;
0099 COUT << " List of options: " << std::endl;
0100 COUT << " -s [d or f or l or T] source is et pool, file, listfile, or Test stream" << std::endl;
0101 COUT << " -b <size in MB> in case you write to a file, specify the buffer size (default 4MB)" << std::endl;
0102 COUT << " -d [d or f or a or n] destination is et pool or file , aml server or nothing" << std::endl;
0103 COUT << " -v verbose" << std::endl;
0104 COUT << " -w (time in milliseconds> wait time interval (in ms) between events to throttle the data flow" << std::endl;
0105 COUT << " -e <event number> start from event number" << std::endl;
0106 COUT << " -c <number> get nth event (-e gives event with number n)" << std::endl;
0107 COUT << " -n <number> stop after so many events" << std::endl;
0108 COUT << " -i have each event identify itself" << std::endl;
0109 COUT << " -z gzip-compress each output buffer" << std::endl;
0110 COUT << " -l LZO-compress each output buffer" << std::endl;
0111 COUT << " -x sharedlibrary.so load a plugin that can select events" << std::endl;
0112 COUT << " -h this message" << std::endl << std::endl;
0113 exit(0);
0114 }
0115
0116
0117
0118
0119 Eventiterator *it;
0120
0121
0122 char *sharedlib;
0123 int load_lib = 0;
0124
0125 DpipeFilter *filter = 0;
0126
0127
0128
0129 void dpipe_register ( DpipeFilter *T)
0130 {
0131 if ( filter ) delete filter;
0132 filter = T;
0133
0134 }
0135
0136 void dpipe_unregister ( DpipeFilter *T)
0137 {
0138 if ( filter && T == filter )
0139 {
0140 filter = 0;
0141 }
0142
0143 }
0144
0145
0146
0147
0148 int
0149 main(int argc, char *argv[])
0150 {
0151 int c;
0152 int status = 0;
0153
0154 int sourcetype =DFILE;
0155 int destinationtype = ETPOOL;
0156 int waitinterval = 0;
0157 int verbose = 0;
0158 int identify = 0;
0159 int maxevents = 0;
0160 int eventnr = 0;
0161 int gzipcompress = 0;
0162 int lzocompress = 0;
0163 int eventnumber =0;
0164 int countnumber =0;
0165 void *voidpointer;
0166
0167
0168 PHDWORD *buffer;
0169 oBuffer *ob = 0;
0170 int fd = 0;
0171 int buffer_size = 256*1024*4 ;
0172
0173
0174
0175 it = 0;
0176
0177
0178
0179
0180 #ifndef WIN32
0181 while ((c = getopt(argc, argv, "e:b:c:s:d:n:w:x:vhizl")) != EOF)
0182 {
0183 switch (c)
0184 {
0185
0186 case 'e':
0187 if ( !sscanf(optarg, "%d", &eventnumber) ) exitmsg();
0188 break;
0189
0190 case 'b':
0191 if ( !sscanf(optarg, "%d", &buffer_size) ) exitmsg();
0192 buffer_size = buffer_size*256*1024;
0193 break;
0194
0195 case 'c':
0196 if ( !sscanf(optarg, "%d", &countnumber) ) exitmsg();
0197 break;
0198
0199 case 's':
0200 if ( *optarg == 'T' ) sourcetype = TESTEVENTITERATOR;
0201 else if ( *optarg == 'f' ) sourcetype = FILEEVENTITERATOR;
0202 else if ( *optarg == 'r' ) sourcetype = RCDAQEVENTITERATOR;
0203 else if ( *optarg == 'l' ) sourcetype = LISTEVENTITERATOR;
0204 else exitmsg();
0205 break;
0206
0207
0208 case 'd':
0209 if ( *optarg == 'd' ) destinationtype = ETPOOL;
0210 else if ( *optarg == 'f' ) destinationtype = DFILE;
0211 else if ( *optarg == 'n' ) destinationtype = DNULL;
0212 else if ( *optarg == 'a' ) destinationtype = OAML;
0213 else exitmsg();
0214 break;
0215
0216 case 'v':
0217 verbose++;
0218 break;
0219
0220 case 'i':
0221 identify = 1;
0222 break;
0223
0224 case 'w':
0225 if ( !sscanf(optarg, "%d", &waitinterval) ) exitmsg();
0226 break;
0227
0228 case 'n':
0229 if ( !sscanf(optarg, "%d", &maxevents) ) exitmsg();
0230 break;
0231
0232 case 'z':
0233 gzipcompress = 1;
0234 break;
0235
0236 case 'l':
0237 lzocompress = 1;
0238 break;
0239
0240 case 'x':
0241 voidpointer = dlopen(optarg, RTLD_GLOBAL | RTLD_NOW);
0242 if (!voidpointer)
0243 {
0244 std::cout << "Loading of the filter library "
0245 << optarg << " failed: " << dlerror() << std::endl;
0246
0247 }
0248 if (filter) std::cout <<" Filter \"" << filter->idString() << "\" registered" << std::endl;
0249
0250 sharedlib=optarg;
0251 load_lib=1;
0252 break;
0253
0254 case 'h':
0255 exithelp();
0256 break;
0257
0258 default:
0259 break;
0260
0261 }
0262 }
0263 #else
0264 char* pszParam;
0265 char chOpt;
0266
0267 while ( (chOpt = GetOption(argc, argv, "e:b:c:s:d:n:w:vhiz", &pszParam) ) >1)
0268 {
0269
0270
0271 switch (chOpt)
0272 {
0273 case 'e':
0274
0275 if ( !sscanf(pszParam, "%d", &eventnumber) ) exitmsg();
0276 break;
0277
0278 case 'b':
0279
0280 if ( !sscanf(pszParam, "%d", &buffer_size) ) exitmsg();
0281 buffer_size = buffer_size*256*1024;
0282 break;
0283
0284 case 'c':
0285
0286 if ( !sscanf(pszParam, "%d", &countnumber) ) exitmsg();
0287 break;
0288
0289 case 's':
0290
0291 if ( *pszParam == 'T' ) sourcetype = TESTEVENTITERATOR;
0292 else if ( *pszParam == 'f' ) sourcetype = FILEEVENTITERATOR;
0293 else exitmsg();
0294 break;
0295
0296
0297 case 'd':
0298
0299 if ( *pszParam == 'f' ) destinationtype = DFILE;
0300 else if ( *pszParam == 'n' ) destinationtype = DNULL;
0301 else exitmsg();
0302 break;
0303
0304 case 'v':
0305 verbose = 1;
0306 break;
0307
0308 case 'i':
0309 identify = 1;
0310 break;
0311
0312 case 'w':
0313 if ( !sscanf(pszParam, "%d", &waitinterval) ) exitmsg();
0314 break;
0315
0316 case 'n':
0317
0318 if ( !sscanf(pszParam, "%d", &maxevents) ) exitmsg();
0319 break;
0320
0321 case 'z':
0322 gzipcompress = 1;
0323 break;
0324
0325 case 'h':
0326 exithelp();
0327 break;
0328
0329 default:
0330 break;
0331
0332 }
0333 }
0334 #endif
0335
0336
0337 if ( eventnumber && countnumber) evtcountexitmsg();
0338 if ( gzipcompress && lzocompress ) compressionexitmsg();
0339
0340
0341 #ifndef WIN32
0342 signal(SIGKILL, sig_handler);
0343 signal(SIGTERM, sig_handler);
0344 signal(SIGINT, sig_handler);
0345 #endif
0346
0347
0348 switch (sourcetype)
0349 {
0350 case RCDAQEVENTITERATOR:
0351 it = new rcdaqEventiterator(argv[optind], status);
0352 break;
0353
0354 case TESTEVENTITERATOR:
0355
0356 it = new testEventiterator();
0357 status =0;
0358 break;
0359
0360 case FILEEVENTITERATOR:
0361 it = new fileEventiterator(argv[optind], status);
0362 break;
0363
0364 case LISTEVENTITERATOR:
0365 it = new listEventiterator(argv[optind], status);
0366 break;
0367
0368 default:
0369 exitmsg();
0370 break;
0371 }
0372
0373 if (status)
0374 {
0375 delete it;
0376 COUT << "Could not open input stream" << std::endl;
0377 exit(1);
0378 }
0379
0380
0381 it->setVerbosity(verbose);
0382
0383
0384
0385 if ( destinationtype == DNULL )
0386 {
0387
0388 }
0389
0390 else if ( destinationtype == DFILE)
0391 {
0392 buffer = new PHDWORD [buffer_size];
0393
0394 unlink (argv[optind+1]);
0395
0396 fd = open (argv[optind+1], O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
0397 S_IRWXU | S_IROTH | S_IRGRP );
0398
0399 if ( fd < 0)
0400 {
0401 COUT << "Could not open file: " << argv[optind+1] << std::endl;
0402 exit (1);
0403 }
0404 if (gzipcompress)
0405 {
0406 ob = new ogzBuffer (fd, buffer, buffer_size);
0407 }
0408 else if ( lzocompress)
0409 {
0410 ob = new olzoBuffer (fd, buffer, buffer_size);
0411 }
0412 else
0413 {
0414 ob = new ophBuffer (fd, buffer, buffer_size);
0415 }
0416
0417 }
0418
0419
0420 else if ( destinationtype == OAML)
0421 {
0422 buffer = new PHDWORD [buffer_size];
0423 ob = new oamlBuffer (argv[optind+1], buffer, buffer_size);
0424 }
0425
0426
0427 else
0428 {
0429 COUT << "invalid destination" << std::endl;
0430 exitmsg();
0431 }
0432
0433
0434
0435
0436 Event *evt;
0437 int take_this;
0438 int count = 0;
0439
0440 while ( ( maxevents == 0 || eventnr < maxevents) &&
0441 ( evt = it->getNextEvent()) )
0442 {
0443 take_this = 1;
0444 count++;
0445
0446 if ( eventnumber )
0447 {
0448 if ( evt->getEvtSequence() == eventnumber)
0449 eventnumber = 0;
0450 else
0451 take_this = 0;
0452 }
0453
0454 if ( countnumber && count < countnumber)
0455 take_this = 0;
0456
0457 if (take_this)
0458 {
0459 if ( (! filter) || filter->select(evt) )
0460 {
0461 if (identify) evt->identify();
0462 if ( destinationtype == DFILE || destinationtype == OAML )
0463 {
0464 status = ob->addEvent(evt);
0465 }
0466 if ( status )
0467 {
0468 COUT << "Error writing events " << std::endl;
0469 break;
0470 }
0471 eventnr++;
0472 }
0473 }
0474
0475 delete evt;
0476
0477 #ifndef WIN32
0478 if (waitinterval > 0) usleep(waitinterval*1000);
0479 #else
0480 if (waitinterval > 0) Sleep(waitinterval);
0481 #endif
0482
0483 }
0484 delete it;
0485
0486 if ( destinationtype == DFILE )
0487 {
0488 delete ob;
0489
0490 #ifndef WIN32
0491 close(fd);
0492 #endif
0493 }
0494
0495 #ifndef WIN32
0496 else if ( destinationtype == OAML )
0497 {
0498 delete ob;
0499 }
0500 #endif
0501
0502
0503 return 0;
0504 }
0505
0506 #if defined(SunOS) || defined(Linux) || defined(OSF1)
0507 void sig_handler(int i)
0508 #else
0509 void sig_handler(...)
0510 #endif
0511 {
0512 if (it) delete it;
0513 exit(0);
0514 }
0515
0516
0517