Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-03 08:20:39

0001 
0002 #include <sys/types.h>
0003 #include <sys/stat.h>
0004 #include <fcntl.h>
0005 #include <unistd.h>
0006 
0007 #include "ophBuffer.h"
0008 
0009 # include "Cframe.h"
0010 # include "frameRoutines.h"
0011 # include "frameHdr.h"
0012 # include "A_Event.h"
0013 
0014 #include "BufferConstants.h"
0015 #include "EventTypes.h"
0016 
0017 
// the constructors first ----------------
0022 ophBuffer::ophBuffer (const char *filename, PHDWORD * where, const int length, int &status
0023         , const int irun, const int iseq)
0024 {
0025   status = 0;
0026   our_fd = 1;
0027 
0028   fd =  open(filename, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE , 
0029           S_IRWXU | S_IROTH | S_IRGRP );
0030 
0031   if ( fd < 0) 
0032     {
0033       status =1;
0034       good_object = 0;
0035       return;
0036     }
0037   good_object = 1;
0038   bptr = ( buffer_ptr) where;
0039   data_ptr = &(bptr->data[0]);
0040   max_length = length;
0041   max_size = max_length;
0042   left = max_size - BUFFERHEADERLENGTH;
0043   bptr->ID = -64;
0044   bptr->Bufseq = iseq;
0045   bptr->Runnr = 0;
0046   current_event = 0;
0047   current_etype = DATAEVENT;
0048   sequence = iseq;
0049   eventsequence = 0;
0050   runnumber = irun;
0051   byteswritten = 0;
0052 
0053   prepare_next ();
0054 }
0055 
0056 ophBuffer::ophBuffer (int fdin, PHDWORD * where, const int length
0057         , const int irun, const int iseq)
0058 {
0059   fd = fdin;
0060   our_fd = 0;
0061   good_object = 1;
0062   bptr = ( buffer_ptr) where;
0063   data_ptr = &(bptr->data[0]);
0064   max_length = length;
0065   max_size = max_length;
0066   left = max_size - BUFFERHEADERLENGTH;
0067   bptr->ID = -64;
0068   bptr->Bufseq = iseq;
0069   bptr->Runnr = 0;
0070   current_event = 0;
0071   current_etype = DATAEVENT;
0072   sequence = iseq;
0073   eventsequence = 0;
0074   runnumber = irun;
0075   byteswritten = 0;
0076 
0077   prepare_next ();
0078 }
0079 
0080 
0081 // ---------------------------------------------------------
0082 int ophBuffer::prepare_next()
0083 {
0084   
0085   // re-initialize the event header length
0086   bptr->Length =  BUFFERHEADERLENGTH*4;
0087   bptr->ID = -64;
0088   bptr->Bufseq = 0;
0089   sequence++;
0090   bptr->Runnr = runnumber;
0091 
0092   current_index = 0;
0093   left=max_size - BUFFERHEADERLENGTH ;
0094   has_end = 0;
0095   dirty = 1;
0096   return 0;
0097 }
0098 
0099 
0100 // ---------------------------------------------------------
0101 int ophBuffer::nextEvent( const unsigned int evtsize, const int etype, const int evtseq)
0102 {
0103 
0104   if (current_event) delete current_event;
0105   current_event = 0;
0106 
0107   if (evtsize > max_size - EVTHEADERLENGTH) return -1;
0108   if (evtsize <=0) return -2;
0109 
0110   if (evtsize > left-EOBLENGTH)
0111     {
0112       writeout();
0113       prepare_next();
0114     }
0115 
0116   if (etype >0)   current_etype = etype;
0117 
0118   if (evtseq > 0)  eventsequence = evtseq;
0119   else eventsequence++;
0120 
0121 
0122   current_event = new oEvent(&(bptr->data[current_index]), evtsize
0123                  ,bptr->Runnr, current_etype, eventsequence);
0124 
0125   left -= EVTHEADERLENGTH;
0126   current_index += EVTHEADERLENGTH;
0127   bptr->Length  += EVTHEADERLENGTH*4;
0128   bptr->Bufseq++;
0129 
0130   dirty = 1;
0131   return 0;
0132 }
0133 // ---------------------------------------------------------
0134 int ophBuffer::addRawEvent( unsigned int *data)
0135 {
0136   
0137   if ( ! good_object) return -1;
0138   int wstatus;
0139 
0140   unsigned int nw = data[0];
0141 
0142   if ( nw > left-EOBLENGTH)
0143     {
0144       wstatus = writeout();
0145       prepare_next();
0146       if (wstatus) return wstatus;
0147     }
0148 
0149   memcpy (  (char *) &(bptr->data[current_index]), (char *) data, 4*nw);
0150     
0151   left -= nw;
0152   current_index += nw;
0153   bptr->Length  += nw*4;
0154   bptr->Bufseq++;
0155   dirty =1;
0156   return 0; 
0157 }
0158 
0159 
0160 // ---------------------------------------------------------
0161 int ophBuffer::addEvent( Event *Evt)
0162 {
0163   
0164   if ( ! good_object) return -1;
0165   int nw;
0166 
0167   int wstatus;
0168 
0169   runnumber = Evt->getRunNumber();
0170 
0171   if ( Evt->getEvtLength() > left-EOBLENGTH)
0172     {
0173       wstatus = writeout();
0174       prepare_next();
0175       if (wstatus) return wstatus;
0176     }
0177 
0178   Evt->Copy( (int *) &(bptr->data[current_index]), Evt->getEvtLength(), &nw);
0179     
0180   left -= nw;
0181   current_index += nw;
0182   bptr->Length  += nw*4;
0183   bptr->Bufseq++;
0184   dirty =1;
0185   return 0;
0186 }
0187 
0188 // ----------------------------------------------------------
0189 int ophBuffer::addFrame( PHDWORD *frame)
0190 {
0191   int len;
0192 
0193   if ( ! good_object) return 0;
0194 
0195   len = current_event->addFrame(frame);
0196 
0197   left -= len;
0198   current_index += len;
0199   bptr->Length  += len*4;
0200   
0201   // for (int k = 0; k<current_index; k++)
0202   //  COUT << k << " " << bptr->data[k] << std::endl;
0203 
0204   //  COUT << "------------------" << std::endl;
0205 
0206 
0207   return len;
0208 }
0209 
0210 // ----------------------------------------------------------
0211 int ophBuffer::addPacket( const Packet *p) 
0212 {
0213   int len;
0214 
0215   if ( ! good_object) return 0;
0216 
0217   len = current_event->addPacket(p);
0218   if (len < 0) return 0;
0219     
0220   left -= len;
0221   current_index += len;
0222   bptr->Length  += len*4;
0223   
0224 
0225   return len;
0226 }
0227 
0228 
0229 
0230 
0231 // ----------------------------------------------------------
0232 int ophBuffer::addUnstructPacketData(PHDWORD * data, 
0233             const int length,
0234             const int id,
0235             const int wordsize,
0236             const int hitformat)
0237 {
0238   int len;
0239 
0240   if ( ! good_object) return 0;
0241 
0242   len = current_event->addUnstructPacketData(data, length
0243                          ,id , wordsize , hitformat);
0244   if (len < 0) return 0;
0245     
0246   left -= len;
0247   current_index += len;
0248   bptr->Length  += len*4;
0249   
0250   // for (int k = 0; k<current_index; k++)
0251   //  COUT << k << " " << bptr->data[k] << std::endl;
0252 
0253   //  COUT << "------------------" << std::endl;
0254 
0255 
0256   return len;
0257 }
0258 
0259 
0260 // ----------------------------------------------------------
0261 // int ophBuffer::transfer(dataProtocol * protocol)
0262 // {
0263 //   if (protocol)  
0264 //     return protocol->transfer((char *) bptr, bptr->Length);
0265 //   else
0266 //     return 0;
0267 
0268 // }
0269 
0270 
0271 // ----------------------------------------------------------
0272 // 
0273 //
0274 int ophBuffer::writeout()
0275 {
0276 
0277   //  void *writeThread(void *);
0278 
0279   if ( ! good_object || fd <= 0 ) return -1;
0280 
0281 
0282       if (! dirty) return 0;
0283       
0284       if (! has_end) addEoB();
0285 
0286       if (fd < 0) return 0;
0287 
0288 
0289       unsigned int ip =0;
0290       char *cp = (char *) bptr;
0291 
0292       while (ip < bptr->Length)
0293     {
0294       int n = write ( fd, cp, BUFFERBLOCKSIZE);
0295       if ( n != BUFFERBLOCKSIZE)
0296         {
0297           std::cout << " could not write output, bytes written: " << n << std::endl;
0298           return 0;
0299         }
0300       cp += BUFFERBLOCKSIZE;
0301       ip+=BUFFERBLOCKSIZE;
0302     }
0303       dirty = 0;
0304       byteswritten += ip;
0305       return 0;
0306 #ifdef WITHTHREADS
0307   
0308     }
0309   else
0310     {
0311       //wait for a potential old thread to complete
0312       if (ThreadId) 
0313     {
0314       pthread_join(ThreadId, NULL);
0315       byteswritten += thread_arg[2];  // the number of bytes written from previosu thread
0316     }
0317       if (! dirty) return 0;
0318       
0319       if (! has_end) addEoB();
0320      
0321       if (fd <  0) return 0;
0322 
0323       //swap the buffers around
0324       buffer_ptr tmp = bptr_being_written;
0325       bptr_being_written = bptr;
0326       bptr = tmp;
0327       dirty = 0;
0328       // now fork off the write thread
0329 
0330       thread_arg[0] = (int) fd;
0331       thread_arg[1] = (int)  bptr_being_written;
0332       thread_arg[2] = 0;
0333 
0334       //      COUT << "starting write thread" << std::endl;
0335       int s = pthread_create(&ThreadId, NULL, ophBuffer::writeThread, (void *) thread_arg);
0336       //COUT << "create status is " << s << std::endl;
0337 
0338       return 0;
0339 
0340     }
0341 #endif
0342 }
0343 
0344 
0345 // ----------------------------------------------------------
0346 int ophBuffer::setMaxSize(const int size)
0347 {
0348   if (size < 0) return -1;
0349   if (size == 0) max_size = max_length;
0350   else
0351     {
0352       max_size = (size + ( BUFFERBLOCKSIZE - size%BUFFERBLOCKSIZE) ) /4;
0353       if (max_size > max_length)
0354     {
0355       max_size = max_length;
0356       return -2;
0357     }
0358     }
0359   return 0;
0360 }
0361 
0362 // ----------------------------------------------------------
0363 int ophBuffer::getMaxSize() const
0364 {
0365   return max_size;
0366 }
0367 
0368 // ----------------------------------------------------------
0369 unsigned long long ophBuffer::getBytesWritten() const
0370 {
0371   return byteswritten;
0372 }
0373 
0374 // ----------------------------------------------------------
0375 int ophBuffer::addEoB()
0376 {
0377   if (has_end) return -1;
0378   bptr->data[current_index++] = 2;  
0379   bptr->data[current_index++] = 0;
0380   bptr->Length  += 2*4;
0381 
0382   has_end = 1;
0383   return 0;
0384 }
0385 
0386 // ----------------------------------------------------------
0387 ophBuffer::~ophBuffer()
0388 {
0389   writeout();
0390   if (our_fd) close (fd);
0391 
0392 }
0393 
0394 
0395 
0396