Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-02 08:21:01

0001 #include <daqBuffer.h>
0002 
0003 #include <daqONCSEvent.h>
0004 #include <daqPRDFEvent.h>
0005 
0006 #include <signal.h>
0007 #include <unistd.h>
0008 #include <sys/types.h>
0009 #include <sys/socket.h>
0010 #include <sys/stat.h>
0011 #include <string.h>
0012 #include <lzo/lzo1x.h>
0013 
0014 using namespace std;
0015 
0016 int daqBuffer::lzo_initialized = 0;
0017 
0018 
0019 int readn (int fd, char *ptr, const int nbytes);
0020 int writen (int fd, char *ptr, const int nbytes);
0021 
0022 
0023 // the constructor first ----------------
0024 daqBuffer::daqBuffer (const int irun, const int length
0025               , const int iseq, md5_state_t *md5state)
0026 {
0027   int *b = new int [length];
0028   bptr = ( buffer_ptr ) b;
0029 
0030   data_ptr = &(bptr->data[0]);
0031   max_length = length;   // in 32bit units
0032   max_size = max_length;
0033   _broken = 0;
0034   
0035   current_event = 0;
0036   current_etype = -1;
0037 
0038   // preset everything to ONCS format
0039   format=DAQONCSFORMAT;
0040   currentBufferID = ONCSBUFFERHEADER;
0041   _md5state = md5state;
0042   wants_compression = 0;
0043   md5_enabled = 1;
0044   wrkmem = 0;
0045   outputarraylength = 0;
0046   outputarray = 0;
0047   
0048   prepare_next (iseq, irun);
0049 }
0050 
0051 
0052 // the destructor... ----------------
0053 daqBuffer::~daqBuffer ()
0054 {
0055   int *b = (int *)  bptr;
0056   delete [] b;
0057   if (outputarray) delete []  outputarray;
0058   if (wrkmem) delete [] wrkmem;
0059 }
0060 
0061 
0062 
0063 int daqBuffer::prepare_next( const int iseq
0064               , const int irun)
0065 {
0066 
0067   //  cout << __FILE__ << " " << __LINE__ << " bptr: " << bptr << endl;
0068   
0069   if ( current_event) delete current_event;
0070   current_event = 0;
0071   
0072   // re-initialize the event header length
0073   bptr->Length =  BUFFERHEADERLENGTH*4;
0074   bptr->ID = currentBufferID;
0075   bptr->Bufseq = iseq;
0076   if (irun>0) bptr->Runnr = irun;
0077 
0078   current_index = 0;
0079   left = max_size - BUFFERHEADERLENGTH - EOBLENGTH; 
0080   has_end = 0;
0081 
0082   
0083   return 0;
0084 }
0085 
0086 // ---------------------------------------------------------
0087 int daqBuffer::nextEvent(const int etype, const int evtseq, const int evtsize)
0088 {
0089   if (current_event) delete current_event;
0090   current_event = 0;
0091   current_etype = -1;
0092 
0093   if (evtsize > left-EOBLENGTH) return -1;
0094 
0095   if ( format)
0096     {
0097       current_event = new daqPRDFEvent(&(bptr->data[current_index]), evtsize
0098                  ,bptr->Runnr, etype, evtseq);
0099       left -= (EVTHEADERLENGTH + 8);
0100       current_index += EVTHEADERLENGTH+8;
0101       bptr->Length  += (EVTHEADERLENGTH+8)*4;
0102     }
0103   else
0104     {
0105       current_event = new daqONCSEvent(&(bptr->data[current_index]), evtsize
0106                  ,bptr->Runnr, etype, evtseq);
0107       left -= EVTHEADERLENGTH;
0108       current_index += EVTHEADERLENGTH;
0109       bptr->Length  += EVTHEADERLENGTH*4;
0110     }
0111 
0112 
0113   current_etype = etype;
0114 
0115   return 0;
0116 }
0117 
0118 // ----------------------------------------------------------
0119 unsigned int daqBuffer::addSubevent( daq_device *dev)
0120 {
0121   unsigned int len;
0122 
0123   len = current_event->addSubevent(current_etype, dev);
0124 
0125   left -= len;
0126   current_index += len;
0127   bptr->Length  += len*4;
0128   
0129   return len;
0130 }
0131 
0132 // ----------------------------------------------------------
0133 unsigned int daqBuffer::addEoB()
0134 {
0135   if (has_end) return -1;
0136   bptr->data[current_index++] = 2;  
0137   bptr->data[current_index++] = 0;
0138   bptr->Length  += 2*4;
0139 
0140   has_end = 1;
0141   if ( current_event) delete current_event;
0142   current_event = 0;
0143   return 0;
0144 }
0145 
0146 // ----------------------------------------------------------
0147 // int daqBuffer::transfer(dataProtocol * protocol)
0148 // {
0149 //   if (protocol)  
0150 //     return protocol->transfer((char *) bptr, bptr->Length);
0151 //   else
0152 //     return 0;
0153 
0154 // }
0155 
0156 unsigned int daqBuffer::writeout ( int fd)
0157 {
0158 
0159   if ( _broken) return 0;
0160   if (!has_end) addEoB();
0161 
0162   unsigned int bytes = 0;;
0163 
0164   if ( ! wants_compression)
0165     {
0166       int blockcount = ( getLength() + 8192 -1)/8192;
0167       int bytecount = blockcount*8192;
0168       bytes = writen ( fd, (char *) bptr , bytecount );
0169       if ( _md5state && md5_enabled)
0170     {
0171       //cout << __FILE__ << " " << __LINE__ << " updating md5  with " << bytes << " bytes" << endl; 
0172       md5_append(_md5state, (const md5_byte_t *)bptr,bytes );
0173     }
0174       return bytes;
0175     }
0176   else // we want compression
0177     {
0178       compress();
0179       int blockcount = ( outputarray[0] + 8192 -1)/8192;
0180       int bytecount = blockcount*8192;
0181       bytes = writen ( fd, (char *) outputarray , bytecount );
0182       if ( _md5state && md5_enabled)
0183     {
0184       //cout << __FILE__ << " " << __LINE__ << " updating md5  with " << bytes << " bytes" << endl; 
0185       md5_append(_md5state, (const md5_byte_t *)outputarray,bytes );
0186     }
0187       return bytes;
0188     }
0189 }
0190 
0191 #define ACKVALUE 101
0192 
0193 unsigned int daqBuffer::sendout ( int fd )
0194 {
0195   if ( _broken) return 0;
0196 
0197   if (!has_end) addEoB();
0198 
0199   int total = getLength();
0200 
0201   //std::cout << __FILE__ << " " << __LINE__ << " sending  opcode ctrl_data" <<  CTRL_DATA << std::endl ;
0202   // send "CTRL_DATA" opcode in network byte ordering
0203   int opcode = htonl(CTRL_DATA);
0204   int status = writen(fd, (char *) &opcode, sizeof(int));
0205 
0206   //  std::cout << __FILE__ << " " << __LINE__ << " sending  buffer size " <<  total << std::endl ;
0207 
0208   // re-use variable to send the length in network byte ordering 
0209   opcode = htonl(total);
0210   status |= writen(fd, (char *) &opcode, sizeof(int));
0211   
0212   // now send the actual data
0213   char *p = (char *) bptr;
0214   int sent = writen(fd,p,total);
0215 
0216   // wait for acknowledge... we re-use the opcode variable once more
0217   readn (fd, (char *) &opcode, sizeof(int));
0218   opcode = ntohl(opcode);
0219   if ( opcode != CTRL_REMOTESUCCESS) return -1; // signal error
0220   
0221   return sent;
0222 }
0223 
0224 // this is sending the monitoring data to a client
0225 unsigned int daqBuffer::sendData ( int fd, const int max_length)
0226 {
0227   if ( _broken) return 0;
0228 
0229   if (!has_end) addEoB();
0230 
0231   int total = getLength();
0232 
0233   if ( total > max_length)
0234     {
0235       cout << "Monitoring: data size exceeds limit -- " << total << " limit: " << max_length << endl;  
0236       total = max_length;
0237     }
0238 
0239   int ntotal = htonl(total);
0240   int status = writen(fd, (char *) &ntotal, 4);
0241 
0242   char *p = (char *) bptr;
0243   int sent = writen(fd,p,total);
0244 
0245   return sent;
0246 }
0247 
0248 int daqBuffer::setCompression(const int flag)
0249 {
0250   if ( !flag)
0251     {
0252       wants_compression = 0;
0253       return 0;
0254     }
0255   else
0256     {
0257       if ( !  lzo_initialized )
0258     {
0259       if (lzo_init() != LZO_E_OK)
0260         {
0261           std::cerr << "Could not initialize LZO" << std::endl;
0262           _broken = 1;
0263         }
0264       lzo_initialized = 1;
0265     }
0266     
0267       if ( !wrkmem)
0268     {
0269       wrkmem = (lzo_bytep) lzo_malloc(LZO1X_1_12_MEM_COMPRESS);
0270       if (wrkmem)
0271         {
0272           memset(wrkmem, 0, LZO1X_1_12_MEM_COMPRESS);
0273         }
0274       else
0275         {
0276           std::cerr << "Could not allocate LZO memory" << std::endl;
0277           _broken = 1;
0278           return -1;
0279         }
0280       outputarraylength = max_length + 8192;
0281       outputarray = new unsigned int[outputarraylength];
0282     }
0283       wants_compression = 1;
0284       //cout << " LZO compression enabled" << endl;
0285       return 0;
0286     }
0287 }
0288 
0289 int daqBuffer::compress ()
0290 {
0291   if ( _broken) return -1;
0292   
0293   lzo_uint outputlength_in_bytes = outputarraylength*4-16;
0294   lzo_uint in_len = getLength(); 
0295 
0296   lzo1x_1_12_compress( (lzo_byte *) bptr,
0297             in_len,  
0298                (lzo_byte *)&outputarray[4],
0299             &outputlength_in_bytes,wrkmem);
0300 
0301 
0302   outputarray[0] = outputlength_in_bytes +4*BUFFERHEADERLENGTH;
0303   outputarray[1] = LZO1XBUFFERMARKER;
0304   outputarray[2] = bptr->Bufseq;
0305   outputarray[3] = getLength();
0306 
0307   return 0;
0308 }
0309 
0310 
0311 // ----------------------------------------------------------
0312 int daqBuffer::setMaxSize(const int size)
0313 {
0314   if (size < 0) return -1;
0315   if (size == 0) max_size = max_length;
0316   else
0317     {
0318       max_size = (size + 8191)/8192;
0319       max_size *= 2048;
0320 
0321       if (max_size > max_length)
0322     {
0323       max_size = max_length;
0324       return -2;
0325     }
0326     }
0327   return 0;
0328 }
0329 
0330 // ----------------------------------------------------------
0331 int daqBuffer::getMaxSize() const 
0332 {
0333   return max_size*4;
0334 }
0335 
0336 int daqBuffer::setEventFormat(const int f) 
0337  { 
0338    if (f)
0339      {
0340        format = DAQPRDFFORMAT;
0341        currentBufferID = PRDFBUFFERHEADER;
0342      }
0343    else
0344      {
0345        format = DAQONCSFORMAT;
0346        currentBufferID = ONCSBUFFERHEADER;
0347      }
0348    
0349    bptr->ID = currentBufferID;
0350    return 0;
0351  }
0352 
0353