Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-03 08:20:50

0001 
0002 #include <sys/types.h>
0003 #include <sys/stat.h>
0004 #include <fcntl.h>
0005 #include <unistd.h>
0006 #include <string.h>
0007 
0008 #include "ospBuffer.h"
0009 #include "ospEvent.h"
0010 
0011 #include "BufferConstants.h"
0012 #include "EventTypes.h"
0013 
0014 
0015 // the constructor first ----------------
0016 
0017 
0018 // the constructor first ----------------
0019 ospBuffer::ospBuffer (const char *filename, PHDWORD * where, const int length, int &status
0020         , const int irun, const int iseq)
0021 {
0022   status = 0;
0023   our_fd = 1;
0024 
0025   fd =  open(filename, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE , 
0026           S_IRWXU | S_IROTH | S_IRGRP );
0027 
0028   if ( fd < 0) 
0029     {
0030       status =1;
0031       good_object = 0;
0032       return;
0033     }
0034   good_object = 1;
0035   bptr = ( buffer_ptr) where;
0036   data_ptr = &(bptr->data[0]);
0037   max_length = length;
0038   max_size = max_length;
0039   left = max_size - BUFFERHEADERLENGTH;
0040   bptr->ID = -64;
0041   bptr->Bufseq = iseq;
0042   bptr->Runnr = 0;
0043   current_event = 0;
0044   current_etype = DATAEVENT;
0045   sequence = iseq;
0046   eventsequence = 0;
0047   runnumber = irun;
0048   byteswritten = 0;
0049 
0050   prepare_next ();
0051 }
0052 
0053 ospBuffer::ospBuffer (int fdin, PHDWORD * where, const int length
0054         , const int irun, const int iseq)
0055 {
0056   fd = fdin;
0057   our_fd = 0;
0058   good_object = 1;
0059   bptr = ( buffer_ptr) where;
0060   data_ptr = &(bptr->data[0]);
0061   max_length = length;
0062   max_size = max_length;
0063   left = max_size - BUFFERHEADERLENGTH;
0064   bptr->ID = -64;
0065   bptr->Bufseq = iseq;
0066   bptr->Runnr = 0;
0067   current_event = 0;
0068   current_etype = DATAEVENT;
0069   sequence = iseq;
0070   eventsequence = 0;
0071   runnumber = irun;
0072   byteswritten = 0;
0073 
0074   prepare_next ();
0075 }
0076 
0077 
0078 // ---------------------------------------------------------
0079 int ospBuffer::nextEvent( const unsigned int evtsize, const int etype, const int evtseq)
0080   
0081 {
0082 
0083   if (current_event) delete current_event;
0084   current_event = 0;
0085 
0086   if (evtsize > max_size - EVTHEADERLENGTH) return -1;
0087   if (evtsize <=0) return -2;
0088 
0089   if (evtsize > left-EOBLENGTH)
0090     {
0091       writeout();
0092       prepare_next();
0093     }
0094 
0095   if (etype >0)   current_etype = etype;
0096 
0097   if (evtseq > 0)  eventsequence = evtseq;
0098   else eventsequence++;
0099 
0100 
0101   current_event = new ospEvent(&(bptr->data[current_index]), evtsize
0102                  ,bptr->Runnr, current_etype, eventsequence);
0103 
0104   left -= EVTHEADERLENGTH;
0105   current_index += EVTHEADERLENGTH;
0106   bptr->Length  += EVTHEADERLENGTH*4;
0107   bptr->Bufseq++;
0108 
0109   dirty = 1;
0110   return 0;
0111 }
0112 
0113 
0114 
0115 
0116 // ---------------------------------------------------------
0117 int ospBuffer::addRawEvent( unsigned int *data)
0118 {
0119   
0120   if ( ! good_object) return -1;
0121   int wstatus;
0122 
0123   unsigned int nw = data[0];
0124 
0125   if ( nw > left-EOBLENGTH)
0126     {
0127       wstatus = writeout();
0128       prepare_next();
0129       if (wstatus) return wstatus;
0130     }
0131 
0132   memcpy (  (char *) &(bptr->data[current_index]), (char *) data, 4*nw);
0133     
0134   left -= nw;
0135   current_index += nw;
0136   bptr->Length  += nw*4;
0137   bptr->Bufseq++;
0138   dirty =1;
0139   return 0; 
0140 }
0141 
0142 
0143 // ---------------------------------------------------------
0144 int ospBuffer::addEvent( Event *Evt)
0145 {
0146   
0147   if ( ! good_object) return -1;
0148   int nw;
0149 
0150   int wstatus;
0151 
0152   runnumber = Evt->getRunNumber();
0153 
0154   if ( Evt->getEvtLength() > left-EOBLENGTH)
0155     {
0156       wstatus = writeout();
0157       prepare_next();
0158       if (wstatus) return wstatus;
0159     }
0160 
0161   Evt->Copy( (int *) &(bptr->data[current_index]), Evt->getEvtLength(), &nw);
0162     
0163   left -= nw;
0164   current_index += nw;
0165   bptr->Length  += nw*4;
0166   bptr->Bufseq++;
0167   dirty =1;
0168   return 0;
0169 }
0170 
0171 
0172 // ----------------------------------------------------------
0173 int ospBuffer::addPacket( const Packet *p) 
0174 {
0175   int len;
0176 
0177   if ( ! good_object) return 0;
0178 
0179   len = current_event->addPacket(p);
0180   if (len < 0) return 0;
0181     
0182   left -= len;
0183   current_index += len;
0184   bptr->Length  += len*4;
0185   
0186 
0187   return len;
0188 }
0189 
0190 
0191 
0192 
0193 // ----------------------------------------------------------
0194 int ospBuffer::addUnstructPacketData(PHDWORD * data, 
0195             const int length,
0196             const int id,
0197             const int wordsize,
0198             const int hitformat)
0199 {
0200   int len;
0201 
0202   if ( ! good_object) return 0;
0203 
0204   len = current_event->addUnstructPacketData(data, length
0205                          ,id , wordsize , hitformat);
0206   if (len < 0) return 0;
0207     
0208   left -= len;
0209   current_index += len;
0210   bptr->Length  += len*4;
0211   
0212   // for (int k = 0; k<current_index; k++)
0213   //  COUT << k << " " << bptr->data[k] << std::endl;
0214 
0215   //  COUT << "------------------" << std::endl;
0216 
0217 
0218   return len;
0219 }
0220 
0221 // ---------------------------------------------------------
0222 int ospBuffer::prepare_next()
0223 {
0224   
0225   // re-initialize the event header length
0226   bptr->Length =  BUFFERHEADERLENGTH*4;
0227   bptr->ID = 0xffffc0c0;
0228   bptr->Bufseq = 0;
0229   sequence++;
0230   bptr->Runnr = runnumber;
0231 
0232   current_index = 0;
0233   left=max_size - BUFFERHEADERLENGTH ;
0234   has_end = 0;
0235   dirty = 1;
0236   return 0;
0237 }
0238 
0239 // ----------------------------------------------------------
0240 int ospBuffer::setMaxSize(const int size)
0241 {
0242   if (size < 0) return -1;
0243   if (size == 0) max_size = max_length;
0244   else
0245     {
0246       max_size = (size + ( BUFFERBLOCKSIZE - size%BUFFERBLOCKSIZE) ) /4;
0247       if (max_size > max_length)
0248     {
0249       max_size = max_length;
0250       return -2;
0251     }
0252     }
0253   return 0;
0254 }
0255 
0256 // ----------------------------------------------------------
0257 int ospBuffer::getMaxSize() const
0258 {
0259   return max_size;
0260 }
0261 
0262 // ----------------------------------------------------------
0263 unsigned long long ospBuffer::getBytesWritten() const
0264 {
0265   return byteswritten;
0266 }
0267 
0268 // ----------------------------------------------------------
0269 // 
0270 //
0271 int ospBuffer::writeout()
0272 {
0273 
0274   //  void *writeThread(void *);
0275 
0276   if ( ! good_object || fd <= 0 ) return -1;
0277 
0278 
0279       if (! dirty) return 0;
0280       
0281       if (! has_end) addEoB();
0282 
0283       if (fd < 0) return 0;
0284 
0285 
0286       unsigned int ip =0;
0287       char *cp = (char *) bptr;
0288 
0289       while (ip < bptr->Length)
0290     {
0291       int n = write ( fd, cp, BUFFERBLOCKSIZE);
0292       if ( n != BUFFERBLOCKSIZE)
0293         {
0294           std::cout << " could not write output, bytes written: " << n << std::endl;
0295           return 0;
0296         }
0297       cp += BUFFERBLOCKSIZE;
0298       ip+=BUFFERBLOCKSIZE;
0299     }
0300       dirty = 0;
0301       byteswritten += ip;
0302       return 0;
0303 #ifdef WITHTHREADS
0304   
0305     }
0306   else
0307     {
0308       //wait for a potential old thread to complete
0309       if (ThreadId) 
0310     {
0311       pthread_join(ThreadId, NULL);
0312       byteswritten += thread_arg[2];  // the number of bytes written from previosu thread
0313     }
0314       if (! dirty) return 0;
0315       
0316       if (! has_end) addEoB();
0317      
0318       if (fd <  0) return 0;
0319 
0320       //swap the buffers around
0321       buffer_ptr tmp = bptr_being_written;
0322       bptr_being_written = bptr;
0323       bptr = tmp;
0324       dirty = 0;
0325       // now fork off the write thread
0326 
0327       thread_arg[0] = (int) fd;
0328       thread_arg[1] = (int)  bptr_being_written;
0329       thread_arg[2] = 0;
0330 
0331       //      COUT << "starting write thread" << std::endl;
0332       int s = pthread_create(&ThreadId, NULL, ophBuffer::writeThread, (void *) thread_arg);
0333       //COUT << "create status is " << s << std::endl;
0334 
0335       return 0;
0336 
0337     }
0338 #endif
0339 }
0340 
0341 
0342 
0343 // ----------------------------------------------------------
0344 int ospBuffer::addEoB()
0345 {
0346   if (has_end) return -1;
0347   bptr->data[current_index++] = 2;  
0348   bptr->data[current_index++] = 0;
0349   bptr->Length  += 2*4;
0350 
0351   has_end = 1;
0352   return 0;
0353 }
0354 
0355 
0356 // ----------------------------------------------------------
0357 ospBuffer::~ospBuffer()
0358 {
0359   writeout();
0360   if (our_fd) close (fd);
0361 
0362 }
0363 
0364 
0365