File indexing completed on 2025-08-03 08:20:50
0001
0002 #include <sys/types.h>
0003 #include <sys/stat.h>
0004 #include <fcntl.h>
0005 #include <unistd.h>
0006
0007 #include "ophBuffer.h"
0008
0009 # include "Cframe.h"
0010 # include "frameRoutines.h"
0011 # include "frameHdr.h"
0012 # include "A_Event.h"
0013
0014 #include "BufferConstants.h"
0015 #include "EventTypes.h"
0016
0017
0018
0019
0020
0021
0022 ophBuffer::ophBuffer (const char *filename, PHDWORD * where, const int length, int &status
0023 , const int irun, const int iseq)
0024 {
0025 status = 0;
0026 our_fd = 1;
0027
0028 fd = open(filename, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
0029 S_IRWXU | S_IROTH | S_IRGRP );
0030
0031 if ( fd < 0)
0032 {
0033 status =1;
0034 good_object = 0;
0035 return;
0036 }
0037 good_object = 1;
0038 bptr = ( buffer_ptr) where;
0039 data_ptr = &(bptr->data[0]);
0040 max_length = length;
0041 max_size = max_length;
0042 left = max_size - BUFFERHEADERLENGTH;
0043 bptr->ID = -64;
0044 bptr->Bufseq = iseq;
0045 bptr->Runnr = 0;
0046 current_event = 0;
0047 current_etype = DATAEVENT;
0048 sequence = iseq;
0049 eventsequence = 0;
0050 runnumber = irun;
0051 byteswritten = 0;
0052
0053 prepare_next ();
0054 }
0055
0056 ophBuffer::ophBuffer (int fdin, PHDWORD * where, const int length
0057 , const int irun, const int iseq)
0058 {
0059 fd = fdin;
0060 our_fd = 0;
0061 good_object = 1;
0062 bptr = ( buffer_ptr) where;
0063 data_ptr = &(bptr->data[0]);
0064 max_length = length;
0065 max_size = max_length;
0066 left = max_size - BUFFERHEADERLENGTH;
0067 bptr->ID = -64;
0068 bptr->Bufseq = iseq;
0069 bptr->Runnr = 0;
0070 current_event = 0;
0071 current_etype = DATAEVENT;
0072 sequence = iseq;
0073 eventsequence = 0;
0074 runnumber = irun;
0075 byteswritten = 0;
0076
0077 prepare_next ();
0078 }
0079
0080
0081
0082 int ophBuffer::prepare_next()
0083 {
0084
0085
0086 bptr->Length = BUFFERHEADERLENGTH*4;
0087 bptr->ID = -64;
0088 bptr->Bufseq = 0;
0089 sequence++;
0090 bptr->Runnr = runnumber;
0091
0092 current_index = 0;
0093 left=max_size - BUFFERHEADERLENGTH ;
0094 has_end = 0;
0095 dirty = 1;
0096 return 0;
0097 }
0098
0099
0100
0101 int ophBuffer::nextEvent( const unsigned int evtsize, const int etype, const int evtseq)
0102 {
0103
0104 if (current_event) delete current_event;
0105 current_event = 0;
0106
0107 if (evtsize > max_size - EVTHEADERLENGTH) return -1;
0108 if (evtsize <=0) return -2;
0109
0110 if (evtsize > left-EOBLENGTH)
0111 {
0112 writeout();
0113 prepare_next();
0114 }
0115
0116 if (etype >0) current_etype = etype;
0117
0118 if (evtseq > 0) eventsequence = evtseq;
0119 else eventsequence++;
0120
0121
0122 current_event = new oEvent(&(bptr->data[current_index]), evtsize
0123 ,bptr->Runnr, current_etype, eventsequence);
0124
0125 left -= EVTHEADERLENGTH;
0126 current_index += EVTHEADERLENGTH;
0127 bptr->Length += EVTHEADERLENGTH*4;
0128 bptr->Bufseq++;
0129
0130 dirty = 1;
0131 return 0;
0132 }
0133
0134 int ophBuffer::addRawEvent( unsigned int *data)
0135 {
0136
0137 if ( ! good_object) return -1;
0138 int wstatus;
0139
0140 unsigned int nw = data[0];
0141
0142 if ( nw > left-EOBLENGTH)
0143 {
0144 wstatus = writeout();
0145 prepare_next();
0146 if (wstatus) return wstatus;
0147 }
0148
0149 memcpy ( (char *) &(bptr->data[current_index]), (char *) data, 4*nw);
0150
0151 left -= nw;
0152 current_index += nw;
0153 bptr->Length += nw*4;
0154 bptr->Bufseq++;
0155 dirty =1;
0156 return 0;
0157 }
0158
0159
0160
0161 int ophBuffer::addEvent( Event *Evt)
0162 {
0163
0164 if ( ! good_object) return -1;
0165 int nw;
0166
0167 int wstatus;
0168
0169 runnumber = Evt->getRunNumber();
0170
0171 if ( Evt->getEvtLength() > left-EOBLENGTH)
0172 {
0173 wstatus = writeout();
0174 prepare_next();
0175 if (wstatus) return wstatus;
0176 }
0177
0178 Evt->Copy( (int *) &(bptr->data[current_index]), Evt->getEvtLength(), &nw);
0179
0180 left -= nw;
0181 current_index += nw;
0182 bptr->Length += nw*4;
0183 bptr->Bufseq++;
0184 dirty =1;
0185 return 0;
0186 }
0187
0188
0189 int ophBuffer::addFrame( PHDWORD *frame)
0190 {
0191 int len;
0192
0193 if ( ! good_object) return 0;
0194
0195 len = current_event->addFrame(frame);
0196
0197 left -= len;
0198 current_index += len;
0199 bptr->Length += len*4;
0200
0201
0202
0203
0204
0205
0206
0207 return len;
0208 }
0209
0210
0211 int ophBuffer::addPacket( const Packet *p)
0212 {
0213 int len;
0214
0215 if ( ! good_object) return 0;
0216
0217 len = current_event->addPacket(p);
0218 if (len < 0) return 0;
0219
0220 left -= len;
0221 current_index += len;
0222 bptr->Length += len*4;
0223
0224
0225 return len;
0226 }
0227
0228
0229
0230
0231
0232 int ophBuffer::addUnstructPacketData(PHDWORD * data,
0233 const int length,
0234 const int id,
0235 const int wordsize,
0236 const int hitformat)
0237 {
0238 int len;
0239
0240 if ( ! good_object) return 0;
0241
0242 len = current_event->addUnstructPacketData(data, length
0243 ,id , wordsize , hitformat);
0244 if (len < 0) return 0;
0245
0246 left -= len;
0247 current_index += len;
0248 bptr->Length += len*4;
0249
0250
0251
0252
0253
0254
0255
0256 return len;
0257 }
0258
0259
0260
0261
0262
0263
0264
0265
0266
0267
0268
0269
0270
0271
0272
0273
0274 int ophBuffer::writeout()
0275 {
0276
0277
0278
0279 if ( ! good_object || fd <= 0 ) return -1;
0280
0281
0282 if (! dirty) return 0;
0283
0284 if (! has_end) addEoB();
0285
0286 if (fd < 0) return 0;
0287
0288
0289 unsigned int ip =0;
0290 char *cp = (char *) bptr;
0291
0292 while (ip < bptr->Length)
0293 {
0294 int n = write ( fd, cp, BUFFERBLOCKSIZE);
0295 if ( n != BUFFERBLOCKSIZE)
0296 {
0297 std::cout << " could not write output, bytes written: " << n << std::endl;
0298 return 0;
0299 }
0300 cp += BUFFERBLOCKSIZE;
0301 ip+=BUFFERBLOCKSIZE;
0302 }
0303 dirty = 0;
0304 byteswritten += ip;
0305 return 0;
0306 #ifdef WITHTHREADS
0307
0308 }
0309 else
0310 {
0311
0312 if (ThreadId)
0313 {
0314 pthread_join(ThreadId, NULL);
0315 byteswritten += thread_arg[2];
0316 }
0317 if (! dirty) return 0;
0318
0319 if (! has_end) addEoB();
0320
0321 if (fd < 0) return 0;
0322
0323
0324 buffer_ptr tmp = bptr_being_written;
0325 bptr_being_written = bptr;
0326 bptr = tmp;
0327 dirty = 0;
0328
0329
0330 thread_arg[0] = (int) fd;
0331 thread_arg[1] = (int) bptr_being_written;
0332 thread_arg[2] = 0;
0333
0334
0335 int s = pthread_create(&ThreadId, NULL, ophBuffer::writeThread, (void *) thread_arg);
0336
0337
0338 return 0;
0339
0340 }
0341 #endif
0342 }
0343
0344
0345
0346 int ophBuffer::setMaxSize(const int size)
0347 {
0348 if (size < 0) return -1;
0349 if (size == 0) max_size = max_length;
0350 else
0351 {
0352 max_size = (size + ( BUFFERBLOCKSIZE - size%BUFFERBLOCKSIZE) ) /4;
0353 if (max_size > max_length)
0354 {
0355 max_size = max_length;
0356 return -2;
0357 }
0358 }
0359 return 0;
0360 }
0361
0362
0363 int ophBuffer::getMaxSize() const
0364 {
0365 return max_size;
0366 }
0367
0368
0369 unsigned long long ophBuffer::getBytesWritten() const
0370 {
0371 return byteswritten;
0372 }
0373
0374
0375 int ophBuffer::addEoB()
0376 {
0377 if (has_end) return -1;
0378 bptr->data[current_index++] = 2;
0379 bptr->data[current_index++] = 0;
0380 bptr->Length += 2*4;
0381
0382 has_end = 1;
0383 return 0;
0384 }
0385
0386
0387 ophBuffer::~ophBuffer()
0388 {
0389 writeout();
0390 if (our_fd) close (fd);
0391
0392 }
0393
0394
0395
0396