Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-03 08:20:33

0001 #include <stdlib.h>
0002 #include <signal.h>
0003 #include <dlfcn.h>
0004 #include <sys/types.h>
0005 #include <sys/stat.h>
0006 #include <fcntl.h>
0007 #include <unistd.h>
0008 
0009 
0010 #include "fileEventiterator.h"
0011 #include "testEventiterator.h"
0012 #include "listEventiterator.h"
0013 #include "rcdaqEventiterator.h"
0014 #include "ogzBuffer.h"
0015 #include "olzoBuffer.h"
0016 #include "oamlBuffer.h"
0017 #include "ophBuffer.h"
0018 #include "dpipe_filter.h"
0019 
0020 #include "phenixTypes.h"
0021 #include "oEvent.h"
0022 #include "stdio.h"
0023 #include "EventTypes.h"
0024 #ifdef HAVE_GETOPT_H
0025 #include <getopt.h>
0026 #endif
0027 
0028 #define RCDAQEVENTITERATOR 1
0029 #define FILEEVENTITERATOR 2
0030 #define TESTEVENTITERATOR 3
0031 #define ETPOOL 4
0032 #define DFILE 5
0033 #define DNULL 6
0034 #define LISTEVENTITERATOR 7
0035 #define OAML 8
0036 
0037 
0038 #ifndef WIN32
0039 #if defined(SunOS) || defined(Linux) || defined(OSF1) 
0040 void sig_handler(int);
0041 #else
0042 void sig_handler(...);
0043 #endif
0044 #endif
0045 
0046 
0047 void exitmsg()
0048 {
0049   COUT << "** usage: dpipe -s -d -v -w -n -i -z -l -x source destination" << std::endl;
0050   COUT << "          dpipe -h for more help" << std::endl;
0051   exit(0);
0052 }
0053 
0054 void evtcountexitmsg()
0055 {
0056   COUT << "** cannot specify both -e and -c!" << std::endl;
0057   COUT << "    type  dpipe -h  for more help" << std::endl;
0058   exit(0);
0059 }
0060 
0061 void compressionexitmsg()
0062 {
0063   COUT << "** cannot specify both -z and -l!" << std::endl;
0064   COUT << "    type  dpipe -h  for more help" << std::endl;
0065   exit(0);
0066 }
0067 
0068 void exithelp()
0069 {
0070   COUT <<  std::endl;
0071   COUT << " dpipe reads events from one source and writes it to a destination. The source can" << std::endl;
0072   COUT << " be any of the standard sources (ET pool, file, filelist, or test stream). The destination " << std::endl;
0073   COUT << " can be a file, a ET pool, an AML server, or no destination at all (you cannot write to a test " << std::endl;
0074   COUT << " stream). Like with a Unix pipe, you can move events through a chain of sources " << std::endl;
0075   COUT << " and destinations, for example, from one ET pool into another, or into a file. " << std::endl;
0076   COUT <<  std::endl;
0077   COUT << " While the events move through dpipe, you can have them identify themselves. If the " << std::endl;
0078   COUT << " destination is null, this is a simple way to sift through a stream of events " << std::endl;
0079   COUT << " and look at their identification messages. " << std::endl;
0080   COUT <<  std::endl;
0081   COUT << " You can throttle the data flow with the -w (wait) option, and you can stop after" << std::endl;
0082   COUT << " a given number of events with the -n option. " << std::endl;
0083   COUT <<  std::endl;
0084   COUT << " In order to write events from a ET pool called ONLINE to a file (d.evt), use" << std::endl;
0085   COUT <<  std::endl;
0086   COUT << " > dpipe -s etpool -d file ONLINE d.evt" << std::endl;
0087   COUT <<  std::endl;
0088   COUT << " if you want to see which events are coming, add -i:" << std::endl;
0089   COUT <<  std::endl;
0090   COUT << " > dpipe -s etpool -d file -i ONLINE d.evt" << std::endl;
0091   COUT <<  std::endl;
0092   COUT << " If the output is a aml server, specify the destination as hostname:port: " << std::endl;
0093   COUT << " > dpipe -s f -d a -i filename phnxbox4.phenix.bnl.gov:8900" << std::endl << std::endl;
0094   COUT << " Note that you can abbreviate the etpool, file, listfile, and Test to d, f, l, and T." << std::endl;
0095   COUT << " > dpipe -s etpool -d file  ONLINE d.evt" << std::endl;
0096   COUT << " is equivalent to " << std::endl;
0097   COUT << " > dpipe -s d  -d f  ONLINE d.evt" << std::endl;
0098   COUT <<  std::endl;
0099   COUT << "  List of options: " << std::endl;
0100   COUT << " -s [d or f or l or T] source is et pool, file, listfile, or Test stream" << std::endl;
0101   COUT << " -b <size in MB> in case you write to a file, specify the buffer size (default 4MB)" << std::endl;
0102   COUT << " -d [d or f or a  or n] destination is et pool or file , aml server or nothing" << std::endl;
0103   COUT << " -v verbose" << std::endl;
0104   COUT << " -w (time in milliseconds> wait time interval (in ms) between events to throttle the data flow" << std::endl;
0105   COUT << " -e <event number>  start from event number" << std::endl;
0106   COUT << " -c <number> get nth event (-e gives event with number n)" << std::endl;
0107   COUT << " -n <number> stop after so many events" << std::endl;
0108   COUT << " -i have each event identify itself" << std::endl;
0109   COUT << " -z gzip-compress each output buffer" << std::endl;
0110   COUT << " -l LZO-compress each output buffer" << std::endl;
0111   COUT << " -x sharedlibrary.so load a plugin that can select events" << std::endl;
0112   COUT << " -h this message" << std::endl << std::endl;
0113   exit(0);
0114 }
0115 
0116 
0117 // The global pointer to the Eventiterator (we must be able to 
0118 // get at it in the signal handler) 
0119 Eventiterator *it;
0120 
0121 
0122 char *sharedlib;
0123 int load_lib = 0;
0124 
0125 DpipeFilter *filter = 0;
0126 
0127 
0128 
0129 void dpipe_register ( DpipeFilter *T)
0130 {
0131   if ( filter ) delete filter;
0132   filter = T;
0133 
0134 }
0135 
0136 void dpipe_unregister ( DpipeFilter *T)
0137 {
0138   if ( filter && T == filter ) 
0139     {
0140       filter = 0;
0141     }
0142 
0143 }
0144 
0145 
0146 
0147 
0148 int 
0149 main(int argc, char *argv[])
0150 {
0151   int c;
0152   int status = 0;
0153 
0154   int sourcetype =DFILE;
0155   int destinationtype = ETPOOL;
0156   int waitinterval = 0;
0157   int verbose = 0;
0158   int identify = 0;
0159   int maxevents = 0;
0160   int eventnr = 0;
0161   int gzipcompress = 0;
0162   int lzocompress = 0;
0163   int eventnumber =0;
0164   int countnumber =0;
0165   void *voidpointer;
0166 
0167 
0168   PHDWORD  *buffer;
0169   oBuffer *ob = 0;
0170   int fd = 0;
0171   int buffer_size = 256*1024*4 ;  // makes  4MB (specifies how many dwords, so *4)
0172 
0173 
0174   // initialize the it pointer to 0;
0175   it = 0;
0176 
0177   //  if (argc < 3) exitmsg();
0178   //    COUT << "parsing input" << std::endl;
0179 
0180 #ifndef WIN32
0181   while ((c = getopt(argc, argv, "e:b:c:s:d:n:w:x:vhizl")) != EOF)
0182     {
0183       switch (c) 
0184     {
0185       // the -s (source type) switch
0186     case 'e':
0187       if ( !sscanf(optarg, "%d", &eventnumber) ) exitmsg();
0188       break;
0189 
0190     case 'b':
0191       if ( !sscanf(optarg, "%d", &buffer_size) ) exitmsg();
0192       buffer_size = buffer_size*256*1024;
0193       break;
0194 
0195     case 'c':
0196       if ( !sscanf(optarg, "%d", &countnumber) ) exitmsg();
0197       break;
0198 
0199     case 's':
0200       if ( *optarg == 'T' ) sourcetype = TESTEVENTITERATOR;
0201       else if ( *optarg == 'f' ) sourcetype = FILEEVENTITERATOR;
0202       else if ( *optarg == 'r' ) sourcetype = RCDAQEVENTITERATOR;
0203       else if ( *optarg == 'l' ) sourcetype = LISTEVENTITERATOR;
0204       else  exitmsg();
0205       break;
0206                     
0207       // the -d (destination type) switch
0208     case 'd':
0209       if ( *optarg == 'd' ) destinationtype = ETPOOL;
0210       else if ( *optarg == 'f' ) destinationtype = DFILE;
0211       else if ( *optarg == 'n' ) destinationtype = DNULL;
0212       else if ( *optarg == 'a' ) destinationtype = OAML;
0213       else exitmsg();
0214       break;
0215 
0216     case 'v':   // verbose
0217       verbose++;
0218       break;
0219 
0220     case 'i':   // identify
0221       identify = 1;
0222       break;
0223 
0224     case 'w':   // wait interval
0225       if ( !sscanf(optarg, "%d", &waitinterval) ) exitmsg();
0226       break;
0227 
0228     case 'n':   // number of events
0229       if ( !sscanf(optarg, "%d", &maxevents) ) exitmsg();
0230       break;
0231 
0232     case 'z':   // gzip-compress
0233       gzipcompress = 1;
0234       break;
0235 
0236     case 'l':   // lzo-compress
0237       lzocompress = 1;
0238       break;
0239 
0240     case 'x':   // load a filter shared lib
0241       voidpointer = dlopen(optarg, RTLD_GLOBAL | RTLD_NOW);
0242       if (!voidpointer) 
0243         {
0244           std::cout << "Loading of the filter library " 
0245            << optarg << " failed: " << dlerror() << std::endl;
0246 
0247         }
0248       if (filter) std::cout <<" Filter \"" << filter->idString() << "\" registered" << std::endl; 
0249 
0250       sharedlib=optarg;
0251       load_lib=1;
0252       break;
0253 
0254     case 'h':
0255       exithelp();
0256       break;
0257 
0258     default:
0259       break;
0260                         
0261     }
0262     }
0263 #else
0264   char* pszParam;             // gotten parameter
0265   char chOpt;
0266 
0267   while ( (chOpt = GetOption(argc, argv, "e:b:c:s:d:n:w:vhiz", &pszParam) ) >1)
0268     {
0269       // COUT << "option is " << chOpt  << std::endl;
0270       // chOpt is valid argument
0271       switch (chOpt)
0272     {
0273     case 'e':
0274       //COUT << "parameter is " <<  pszParam << std::endl;
0275       if ( !sscanf(pszParam, "%d", &eventnumber) ) exitmsg();
0276       break;
0277       
0278     case 'b':
0279       //COUT << "parameter is " <<  pszParam << std::endl;
0280       if ( !sscanf(pszParam, "%d", &buffer_size) ) exitmsg();
0281       buffer_size = buffer_size*256*1024;
0282       break;
0283 
0284     case 'c':
0285       //COUT << "parameter is " <<  pszParam << std::endl;
0286       if ( !sscanf(pszParam, "%d", &countnumber) ) exitmsg();
0287       break;
0288 
0289     case 's':
0290       //COUT << "parameter is " <<  pszParam << std::endl;
0291       if ( *pszParam == 'T' ) sourcetype = TESTEVENTITERATOR;
0292       else if ( *pszParam == 'f' ) sourcetype = FILEEVENTITERATOR;
0293       else  exitmsg();
0294       break;
0295                     
0296       // the -d (destination type) switch
0297     case 'd':
0298       // COUT << "parameter is " <<  pszParam << std::endl;
0299       if ( *pszParam == 'f' ) destinationtype = DFILE;
0300       else if ( *pszParam == 'n' ) destinationtype = DNULL;
0301       else exitmsg();
0302       break;
0303 
0304     case 'v':   // verbose
0305       verbose = 1;
0306       break;
0307 
0308     case 'i':   // identify
0309       identify = 1;
0310       break;
0311 
0312     case 'w':   // wait interval
0313       if ( !sscanf(pszParam, "%d", &waitinterval) ) exitmsg();
0314       break;
0315 
0316     case 'n':   // number of events
0317       //COUT << "parameter is " <<  pszParam << std::endl;
0318       if ( !sscanf(pszParam, "%d", &maxevents) ) exitmsg();
0319       break;
0320 
0321     case 'z':   // gzip-compress
0322       gzipcompress = 1;
0323       break;
0324 
0325     case 'h':
0326       exithelp();
0327       break;
0328 
0329     default:
0330       break;
0331        
0332      }
0333      }
0334 #endif
0335 
0336 
0337   if ( eventnumber && countnumber) evtcountexitmsg();
0338   if ( gzipcompress && lzocompress ) compressionexitmsg();
0339 
0340   // install some handlers for the most common signals
0341 #ifndef WIN32
0342   signal(SIGKILL, sig_handler);
0343   signal(SIGTERM, sig_handler);
0344   signal(SIGINT,  sig_handler);
0345 #endif
0346 
0347   // see if we can open the file
0348   switch (sourcetype)
0349     {
0350     case RCDAQEVENTITERATOR:
0351       it = new rcdaqEventiterator(argv[optind], status);
0352       break;
0353 
0354     case  TESTEVENTITERATOR:
0355       
0356       it = new testEventiterator();
0357       status =0;
0358       break;
0359 
0360     case  FILEEVENTITERATOR:
0361       it = new fileEventiterator(argv[optind], status);
0362       break;
0363        
0364     case  LISTEVENTITERATOR:
0365       it = new listEventiterator(argv[optind], status);
0366       break;
0367        
0368     default:
0369       exitmsg();
0370       break;
0371     }
0372 
0373   if (status)
0374     {
0375       delete it;
0376       COUT << "Could not open input stream" << std::endl;
0377       exit(1);
0378     }
0379 
0380   // set the verbosity of the iterator
0381   it->setVerbosity(verbose);
0382 
0383   // now we'll see where the data go to
0384 
0385   if ( destinationtype == DNULL )
0386     {
0387       //    identify = 1;
0388     }
0389 
0390   else if (  destinationtype == DFILE)
0391     {
0392       buffer = new PHDWORD [buffer_size];
0393 
0394       unlink (argv[optind+1]);
0395  
0396       fd = open (argv[optind+1], O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE , 
0397           S_IRWXU | S_IROTH | S_IRGRP );
0398 
0399       if ( fd < 0) 
0400     {
0401       COUT << "Could not open file: " <<  argv[optind+1] << std::endl;
0402       exit (1);
0403     }
0404       if (gzipcompress) 
0405     {
0406       ob = new ogzBuffer (fd, buffer, buffer_size);
0407     }
0408       else if ( lzocompress)
0409     {
0410       ob = new olzoBuffer (fd, buffer, buffer_size);
0411     }
0412       else
0413     {
0414       ob = new ophBuffer (fd, buffer, buffer_size);
0415     }
0416 
0417     }
0418 
0419 
0420   else if (  destinationtype == OAML)
0421     {
0422       buffer = new PHDWORD [buffer_size];
0423       ob = new oamlBuffer (argv[optind+1], buffer, buffer_size);
0424     }
0425 
0426             
0427   else 
0428     {
0429       COUT << "invalid destination" << std::endl;
0430       exitmsg();
0431     }
0432     
0433 
0434   //  COUT << "waitinterval is " << waitinterval << std::endl;
0435   // ok. now go through the events
0436   Event *evt;
0437   int take_this;
0438   int count = 0;
0439   //COUT << " max events = " <<  maxevents << std::endl;
0440   while ( ( maxevents == 0 || eventnr < maxevents)  &&
0441       ( evt = it->getNextEvent())  )
0442     {
0443       take_this = 1;
0444       count++;
0445 
0446       if ( eventnumber )
0447     {
0448       if ( evt->getEvtSequence() == eventnumber)
0449         eventnumber = 0;
0450       else
0451         take_this = 0;
0452     }
0453 
0454       if ( countnumber && count < countnumber)
0455     take_this = 0;
0456 
0457       if (take_this)
0458     {
0459       if ( (! filter) ||  filter->select(evt) )
0460         {
0461           if (identify) evt->identify();
0462           if ( destinationtype == DFILE || destinationtype == OAML )
0463         {
0464           status = ob->addEvent(evt);
0465         }
0466           if ( status ) 
0467         {
0468           COUT << "Error writing events " << std::endl;
0469           break;
0470         }
0471           eventnr++;
0472         }
0473     }
0474       
0475       delete evt;
0476       
0477 #ifndef WIN32
0478       if (waitinterval > 0) usleep(waitinterval*1000);
0479 #else
0480       if (waitinterval > 0) Sleep(waitinterval);
0481 #endif
0482 
0483     }
0484   delete it;
0485 
0486   if ( destinationtype == DFILE  )
0487     {
0488       delete ob;
0489       
0490 #ifndef WIN32
0491       close(fd);
0492 #endif
0493     }
0494 
0495 #ifndef WIN32
0496   else if ( destinationtype == OAML  )
0497     {
0498       delete ob;
0499     }
0500 #endif
0501 
0502   
0503   return 0;
0504 }
0505 
0506 #if defined(SunOS) || defined(Linux) || defined(OSF1) 
0507 void sig_handler(int i)
0508 #else
0509   void sig_handler(...)
0510 #endif
0511 {
0512   if (it) delete it;
0513   exit(0);
0514 }
0515 
0516 
0517