File indexing completed on 2025-08-02 08:21:01
0001 #include <daqBuffer.h>
0002
0003 #include <daqONCSEvent.h>
0004 #include <daqPRDFEvent.h>
0005
0006 #include <signal.h>
0007 #include <unistd.h>
0008 #include <sys/types.h>
0009 #include <sys/socket.h>
0010 #include <sys/stat.h>
0011 #include <string.h>
0012 #include <lzo/lzo1x.h>
0013
0014 using namespace std;
0015
0016 int daqBuffer::lzo_initialized = 0;
0017
0018
0019 int readn (int fd, char *ptr, const int nbytes);
0020 int writen (int fd, char *ptr, const int nbytes);
0021
0022
0023
0024 daqBuffer::daqBuffer (const int irun, const int length
0025 , const int iseq, md5_state_t *md5state)
0026 {
0027 int *b = new int [length];
0028 bptr = ( buffer_ptr ) b;
0029
0030 data_ptr = &(bptr->data[0]);
0031 max_length = length;
0032 max_size = max_length;
0033 _broken = 0;
0034
0035 current_event = 0;
0036 current_etype = -1;
0037
0038
0039 format=DAQONCSFORMAT;
0040 currentBufferID = ONCSBUFFERHEADER;
0041 _md5state = md5state;
0042 wants_compression = 0;
0043 md5_enabled = 1;
0044 wrkmem = 0;
0045 outputarraylength = 0;
0046 outputarray = 0;
0047
0048 prepare_next (iseq, irun);
0049 }
0050
0051
0052
0053 daqBuffer::~daqBuffer ()
0054 {
0055 int *b = (int *) bptr;
0056 delete [] b;
0057 if (outputarray) delete [] outputarray;
0058 if (wrkmem) delete [] wrkmem;
0059 }
0060
0061
0062
0063 int daqBuffer::prepare_next( const int iseq
0064 , const int irun)
0065 {
0066
0067
0068
0069 if ( current_event) delete current_event;
0070 current_event = 0;
0071
0072
0073 bptr->Length = BUFFERHEADERLENGTH*4;
0074 bptr->ID = currentBufferID;
0075 bptr->Bufseq = iseq;
0076 if (irun>0) bptr->Runnr = irun;
0077
0078 current_index = 0;
0079 left = max_size - BUFFERHEADERLENGTH - EOBLENGTH;
0080 has_end = 0;
0081
0082
0083 return 0;
0084 }
0085
0086
0087 int daqBuffer::nextEvent(const int etype, const int evtseq, const int evtsize)
0088 {
0089 if (current_event) delete current_event;
0090 current_event = 0;
0091 current_etype = -1;
0092
0093 if (evtsize > left-EOBLENGTH) return -1;
0094
0095 if ( format)
0096 {
0097 current_event = new daqPRDFEvent(&(bptr->data[current_index]), evtsize
0098 ,bptr->Runnr, etype, evtseq);
0099 left -= (EVTHEADERLENGTH + 8);
0100 current_index += EVTHEADERLENGTH+8;
0101 bptr->Length += (EVTHEADERLENGTH+8)*4;
0102 }
0103 else
0104 {
0105 current_event = new daqONCSEvent(&(bptr->data[current_index]), evtsize
0106 ,bptr->Runnr, etype, evtseq);
0107 left -= EVTHEADERLENGTH;
0108 current_index += EVTHEADERLENGTH;
0109 bptr->Length += EVTHEADERLENGTH*4;
0110 }
0111
0112
0113 current_etype = etype;
0114
0115 return 0;
0116 }
0117
0118
0119 unsigned int daqBuffer::addSubevent( daq_device *dev)
0120 {
0121 unsigned int len;
0122
0123 len = current_event->addSubevent(current_etype, dev);
0124
0125 left -= len;
0126 current_index += len;
0127 bptr->Length += len*4;
0128
0129 return len;
0130 }
0131
0132
0133 unsigned int daqBuffer::addEoB()
0134 {
0135 if (has_end) return -1;
0136 bptr->data[current_index++] = 2;
0137 bptr->data[current_index++] = 0;
0138 bptr->Length += 2*4;
0139
0140 has_end = 1;
0141 if ( current_event) delete current_event;
0142 current_event = 0;
0143 return 0;
0144 }
0145
0146
0147
0148
0149
0150
0151
0152
0153
0154
0155
0156 unsigned int daqBuffer::writeout ( int fd)
0157 {
0158
0159 if ( _broken) return 0;
0160 if (!has_end) addEoB();
0161
0162 unsigned int bytes = 0;;
0163
0164 if ( ! wants_compression)
0165 {
0166 int blockcount = ( getLength() + 8192 -1)/8192;
0167 int bytecount = blockcount*8192;
0168 bytes = writen ( fd, (char *) bptr , bytecount );
0169 if ( _md5state && md5_enabled)
0170 {
0171
0172 md5_append(_md5state, (const md5_byte_t *)bptr,bytes );
0173 }
0174 return bytes;
0175 }
0176 else
0177 {
0178 compress();
0179 int blockcount = ( outputarray[0] + 8192 -1)/8192;
0180 int bytecount = blockcount*8192;
0181 bytes = writen ( fd, (char *) outputarray , bytecount );
0182 if ( _md5state && md5_enabled)
0183 {
0184
0185 md5_append(_md5state, (const md5_byte_t *)outputarray,bytes );
0186 }
0187 return bytes;
0188 }
0189 }
0190
0191 #define ACKVALUE 101
0192
0193 unsigned int daqBuffer::sendout ( int fd )
0194 {
0195 if ( _broken) return 0;
0196
0197 if (!has_end) addEoB();
0198
0199 int total = getLength();
0200
0201
0202
0203 int opcode = htonl(CTRL_DATA);
0204 int status = writen(fd, (char *) &opcode, sizeof(int));
0205
0206
0207
0208
0209 opcode = htonl(total);
0210 status |= writen(fd, (char *) &opcode, sizeof(int));
0211
0212
0213 char *p = (char *) bptr;
0214 int sent = writen(fd,p,total);
0215
0216
0217 readn (fd, (char *) &opcode, sizeof(int));
0218 opcode = ntohl(opcode);
0219 if ( opcode != CTRL_REMOTESUCCESS) return -1;
0220
0221 return sent;
0222 }
0223
0224
0225 unsigned int daqBuffer::sendData ( int fd, const int max_length)
0226 {
0227 if ( _broken) return 0;
0228
0229 if (!has_end) addEoB();
0230
0231 int total = getLength();
0232
0233 if ( total > max_length)
0234 {
0235 cout << "Monitoring: data size exceeds limit -- " << total << " limit: " << max_length << endl;
0236 total = max_length;
0237 }
0238
0239 int ntotal = htonl(total);
0240 int status = writen(fd, (char *) &ntotal, 4);
0241
0242 char *p = (char *) bptr;
0243 int sent = writen(fd,p,total);
0244
0245 return sent;
0246 }
0247
0248 int daqBuffer::setCompression(const int flag)
0249 {
0250 if ( !flag)
0251 {
0252 wants_compression = 0;
0253 return 0;
0254 }
0255 else
0256 {
0257 if ( ! lzo_initialized )
0258 {
0259 if (lzo_init() != LZO_E_OK)
0260 {
0261 std::cerr << "Could not initialize LZO" << std::endl;
0262 _broken = 1;
0263 }
0264 lzo_initialized = 1;
0265 }
0266
0267 if ( !wrkmem)
0268 {
0269 wrkmem = (lzo_bytep) lzo_malloc(LZO1X_1_12_MEM_COMPRESS);
0270 if (wrkmem)
0271 {
0272 memset(wrkmem, 0, LZO1X_1_12_MEM_COMPRESS);
0273 }
0274 else
0275 {
0276 std::cerr << "Could not allocate LZO memory" << std::endl;
0277 _broken = 1;
0278 return -1;
0279 }
0280 outputarraylength = max_length + 8192;
0281 outputarray = new unsigned int[outputarraylength];
0282 }
0283 wants_compression = 1;
0284
0285 return 0;
0286 }
0287 }
0288
0289 int daqBuffer::compress ()
0290 {
0291 if ( _broken) return -1;
0292
0293 lzo_uint outputlength_in_bytes = outputarraylength*4-16;
0294 lzo_uint in_len = getLength();
0295
0296 lzo1x_1_12_compress( (lzo_byte *) bptr,
0297 in_len,
0298 (lzo_byte *)&outputarray[4],
0299 &outputlength_in_bytes,wrkmem);
0300
0301
0302 outputarray[0] = outputlength_in_bytes +4*BUFFERHEADERLENGTH;
0303 outputarray[1] = LZO1XBUFFERMARKER;
0304 outputarray[2] = bptr->Bufseq;
0305 outputarray[3] = getLength();
0306
0307 return 0;
0308 }
0309
0310
0311
0312 int daqBuffer::setMaxSize(const int size)
0313 {
0314 if (size < 0) return -1;
0315 if (size == 0) max_size = max_length;
0316 else
0317 {
0318 max_size = (size + 8191)/8192;
0319 max_size *= 2048;
0320
0321 if (max_size > max_length)
0322 {
0323 max_size = max_length;
0324 return -2;
0325 }
0326 }
0327 return 0;
0328 }
0329
0330
0331 int daqBuffer::getMaxSize() const
0332 {
0333 return max_size*4;
0334 }
0335
0336 int daqBuffer::setEventFormat(const int f)
0337 {
0338 if (f)
0339 {
0340 format = DAQPRDFFORMAT;
0341 currentBufferID = PRDFBUFFERHEADER;
0342 }
0343 else
0344 {
0345 format = DAQONCSFORMAT;
0346 currentBufferID = ONCSBUFFERHEADER;
0347 }
0348
0349 bptr->ID = currentBufferID;
0350 return 0;
0351 }
0352
0353