File indexing completed on 2025-08-03 08:20:39
0001
0002 #include <sys/types.h>
0003 #include <sys/stat.h>
0004 #include <fcntl.h>
0005 #include <unistd.h>
0006 #include <string.h>
0007
0008 #include "ospBuffer.h"
0009 #include "ospEvent.h"
0010
0011 #include "BufferConstants.h"
0012 #include "EventTypes.h"
0013
0014
0015
0016
0017
0018
0019 ospBuffer::ospBuffer (const char *filename, PHDWORD * where, const int length, int &status
0020 , const int irun, const int iseq)
0021 {
0022 status = 0;
0023 our_fd = 1;
0024
0025 fd = open(filename, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
0026 S_IRWXU | S_IROTH | S_IRGRP );
0027
0028 if ( fd < 0)
0029 {
0030 status =1;
0031 good_object = 0;
0032 return;
0033 }
0034 good_object = 1;
0035 bptr = ( buffer_ptr) where;
0036 data_ptr = &(bptr->data[0]);
0037 max_length = length;
0038 max_size = max_length;
0039 left = max_size - BUFFERHEADERLENGTH;
0040 bptr->ID = -64;
0041 bptr->Bufseq = iseq;
0042 bptr->Runnr = 0;
0043 current_event = 0;
0044 current_etype = DATAEVENT;
0045 sequence = iseq;
0046 eventsequence = 0;
0047 runnumber = irun;
0048 byteswritten = 0;
0049
0050 prepare_next ();
0051 }
0052
0053 ospBuffer::ospBuffer (int fdin, PHDWORD * where, const int length
0054 , const int irun, const int iseq)
0055 {
0056 fd = fdin;
0057 our_fd = 0;
0058 good_object = 1;
0059 bptr = ( buffer_ptr) where;
0060 data_ptr = &(bptr->data[0]);
0061 max_length = length;
0062 max_size = max_length;
0063 left = max_size - BUFFERHEADERLENGTH;
0064 bptr->ID = -64;
0065 bptr->Bufseq = iseq;
0066 bptr->Runnr = 0;
0067 current_event = 0;
0068 current_etype = DATAEVENT;
0069 sequence = iseq;
0070 eventsequence = 0;
0071 runnumber = irun;
0072 byteswritten = 0;
0073
0074 prepare_next ();
0075 }
0076
0077
0078
0079 int ospBuffer::nextEvent( const unsigned int evtsize, const int etype, const int evtseq)
0080
0081 {
0082
0083 if (current_event) delete current_event;
0084 current_event = 0;
0085
0086 if (evtsize > max_size - EVTHEADERLENGTH) return -1;
0087 if (evtsize <=0) return -2;
0088
0089 if (evtsize > left-EOBLENGTH)
0090 {
0091 writeout();
0092 prepare_next();
0093 }
0094
0095 if (etype >0) current_etype = etype;
0096
0097 if (evtseq > 0) eventsequence = evtseq;
0098 else eventsequence++;
0099
0100
0101 current_event = new ospEvent(&(bptr->data[current_index]), evtsize
0102 ,bptr->Runnr, current_etype, eventsequence);
0103
0104 left -= EVTHEADERLENGTH;
0105 current_index += EVTHEADERLENGTH;
0106 bptr->Length += EVTHEADERLENGTH*4;
0107 bptr->Bufseq++;
0108
0109 dirty = 1;
0110 return 0;
0111 }
0112
0113
0114
0115
0116
0117 int ospBuffer::addRawEvent( unsigned int *data)
0118 {
0119
0120 if ( ! good_object) return -1;
0121 int wstatus;
0122
0123 unsigned int nw = data[0];
0124
0125 if ( nw > left-EOBLENGTH)
0126 {
0127 wstatus = writeout();
0128 prepare_next();
0129 if (wstatus) return wstatus;
0130 }
0131
0132 memcpy ( (char *) &(bptr->data[current_index]), (char *) data, 4*nw);
0133
0134 left -= nw;
0135 current_index += nw;
0136 bptr->Length += nw*4;
0137 bptr->Bufseq++;
0138 dirty =1;
0139 return 0;
0140 }
0141
0142
0143
0144 int ospBuffer::addEvent( Event *Evt)
0145 {
0146
0147 if ( ! good_object) return -1;
0148 int nw;
0149
0150 int wstatus;
0151
0152 runnumber = Evt->getRunNumber();
0153
0154 if ( Evt->getEvtLength() > left-EOBLENGTH)
0155 {
0156 wstatus = writeout();
0157 prepare_next();
0158 if (wstatus) return wstatus;
0159 }
0160
0161 Evt->Copy( (int *) &(bptr->data[current_index]), Evt->getEvtLength(), &nw);
0162
0163 left -= nw;
0164 current_index += nw;
0165 bptr->Length += nw*4;
0166 bptr->Bufseq++;
0167 dirty =1;
0168 return 0;
0169 }
0170
0171
0172
0173 int ospBuffer::addPacket( const Packet *p)
0174 {
0175 int len;
0176
0177 if ( ! good_object) return 0;
0178
0179 len = current_event->addPacket(p);
0180 if (len < 0) return 0;
0181
0182 left -= len;
0183 current_index += len;
0184 bptr->Length += len*4;
0185
0186
0187 return len;
0188 }
0189
0190
0191
0192
0193
0194 int ospBuffer::addUnstructPacketData(PHDWORD * data,
0195 const int length,
0196 const int id,
0197 const int wordsize,
0198 const int hitformat)
0199 {
0200 int len;
0201
0202 if ( ! good_object) return 0;
0203
0204 len = current_event->addUnstructPacketData(data, length
0205 ,id , wordsize , hitformat);
0206 if (len < 0) return 0;
0207
0208 left -= len;
0209 current_index += len;
0210 bptr->Length += len*4;
0211
0212
0213
0214
0215
0216
0217
0218 return len;
0219 }
0220
0221
0222 int ospBuffer::prepare_next()
0223 {
0224
0225
0226 bptr->Length = BUFFERHEADERLENGTH*4;
0227 bptr->ID = 0xffffc0c0;
0228 bptr->Bufseq = 0;
0229 sequence++;
0230 bptr->Runnr = runnumber;
0231
0232 current_index = 0;
0233 left=max_size - BUFFERHEADERLENGTH ;
0234 has_end = 0;
0235 dirty = 1;
0236 return 0;
0237 }
0238
0239
0240 int ospBuffer::setMaxSize(const int size)
0241 {
0242 if (size < 0) return -1;
0243 if (size == 0) max_size = max_length;
0244 else
0245 {
0246 max_size = (size + ( BUFFERBLOCKSIZE - size%BUFFERBLOCKSIZE) ) /4;
0247 if (max_size > max_length)
0248 {
0249 max_size = max_length;
0250 return -2;
0251 }
0252 }
0253 return 0;
0254 }
0255
0256
0257 int ospBuffer::getMaxSize() const
0258 {
0259 return max_size;
0260 }
0261
0262
0263 unsigned long long ospBuffer::getBytesWritten() const
0264 {
0265 return byteswritten;
0266 }
0267
0268
0269
0270
0271 int ospBuffer::writeout()
0272 {
0273
0274
0275
0276 if ( ! good_object || fd <= 0 ) return -1;
0277
0278
0279 if (! dirty) return 0;
0280
0281 if (! has_end) addEoB();
0282
0283 if (fd < 0) return 0;
0284
0285
0286 unsigned int ip =0;
0287 char *cp = (char *) bptr;
0288
0289 while (ip < bptr->Length)
0290 {
0291 int n = write ( fd, cp, BUFFERBLOCKSIZE);
0292 if ( n != BUFFERBLOCKSIZE)
0293 {
0294 std::cout << " could not write output, bytes written: " << n << std::endl;
0295 return 0;
0296 }
0297 cp += BUFFERBLOCKSIZE;
0298 ip+=BUFFERBLOCKSIZE;
0299 }
0300 dirty = 0;
0301 byteswritten += ip;
0302 return 0;
0303 #ifdef WITHTHREADS
0304
0305 }
0306 else
0307 {
0308
0309 if (ThreadId)
0310 {
0311 pthread_join(ThreadId, NULL);
0312 byteswritten += thread_arg[2];
0313 }
0314 if (! dirty) return 0;
0315
0316 if (! has_end) addEoB();
0317
0318 if (fd < 0) return 0;
0319
0320
0321 buffer_ptr tmp = bptr_being_written;
0322 bptr_being_written = bptr;
0323 bptr = tmp;
0324 dirty = 0;
0325
0326
0327 thread_arg[0] = (int) fd;
0328 thread_arg[1] = (int) bptr_being_written;
0329 thread_arg[2] = 0;
0330
0331
0332 int s = pthread_create(&ThreadId, NULL, ophBuffer::writeThread, (void *) thread_arg);
0333
0334
0335 return 0;
0336
0337 }
0338 #endif
0339 }
0340
0341
0342
0343
0344 int ospBuffer::addEoB()
0345 {
0346 if (has_end) return -1;
0347 bptr->data[current_index++] = 2;
0348 bptr->data[current_index++] = 0;
0349 bptr->Length += 2*4;
0350
0351 has_end = 1;
0352 return 0;
0353 }
0354
0355
0356
0357 ospBuffer::~ospBuffer()
0358 {
0359 writeout();
0360 if (our_fd) close (fd);
0361
0362 }
0363
0364
0365