Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-03 08:20:46

0001 
#include "oamlBuffer.h"
#include "BufferConstants.h"

#include <unistd.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>

#include <cerrno>
#include <cstdio>
#include <cstring>
#include <iostream>

#if defined(SunOS) || defined(OSF1)
#include <strings.h>
#endif
0015 
0016 
0017 // the constructor first ----------------
0018 oamlBuffer::oamlBuffer ( const char * host, const int port, PHDWORD * where, 
0019               const int length, 
0020               const int irun, 
0021               const int iseq): 
0022   olzoBuffer(0,where,length,irun,iseq)
0023 {
0024   good_object = 1;
0025 
0026   has_begun = 0;  // we will defer the connection until we have the first buffer to write
0027 
0028 
0029   ThePort = port;
0030   strcpy(HostName, host);
0031 
0032   if ( connect_aml() ) good_object =0;
0033 
0034   DontOverrideRunNumber = 0;
0035   RunNumber = irun;
0036   if ( irun) 
0037     {
0038       DontOverrideRunNumber = 1;
0039     }
0040 }
0041 
0042 
0043 oamlBuffer::oamlBuffer ( const char * hostport,  PHDWORD * where, 
0044               const int length, 
0045               const int irun, 
0046               const int iseq): 
0047   olzoBuffer(0,where,length,irun,iseq)
0048 {
0049   good_object = 1;
0050 
0051   has_begun = 0;  // we will defer the connection until we have the first buffer to write
0052 
0053   char *t = new char[strlen(hostport)+1];
0054   strcpy ( t, hostport);
0055 
0056   char *token = strtok(t, ":");
0057   if (token)
0058     {
0059       strcpy( HostName, token);
0060       token = strtok(0, ":");
0061       if (token)
0062     {
0063       sscanf(token, "%d", &ThePort);
0064     }
0065     }
0066 
0067   delete [] t;
0068 
0069   if ( connect_aml() ) good_object =0;
0070 
0071   DontOverrideRunNumber = 0;
0072   RunNumber = irun;
0073   if ( irun) 
0074     {
0075       DontOverrideRunNumber = 1;
0076     }
0077 
0078 }
0079 
0080 
0081 int oamlBuffer::connect_aml ()
0082 {
0083 
0084   struct sockaddr_in server_addr;
0085   struct hostent *p_host;
0086   p_host = gethostbyname(HostName);
0087 
0088   if ( ! p_host) return -3;
0089 
0090   bzero( (char*) &server_addr, sizeof(server_addr) );
0091   server_addr.sin_family = AF_INET;
0092   bcopy(p_host->h_addr, &(server_addr.sin_addr.s_addr), p_host->h_length);
0093   server_addr.sin_port = htons(ThePort);
0094 
0095 
0096   if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
0097     {
0098       std::cout << "error in socket" << std::endl;
0099       good_object = 0;
0100       return -1;
0101     }
0102 
0103   int xs = 512*1024;
0104   
0105   int s = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF,
0106              &xs, 4);
0107   if (s) std::cout << "setsockopt status = " << s << std::endl;
0108 
0109   if ( connect(sockfd, (struct sockaddr*) &server_addr
0110            , sizeof(server_addr)) < 0 ) 
0111     {
0112       std::cout << "error in connect" << std::endl;
0113       good_object = 0;
0114       return -1;
0115     }
0116 
0117   return 0;
0118 }
0119 
0120 int oamlBuffer::begin_run ()
0121 {
0122   int i;
0123   int controlword = htonl(CTRL_BEGINRUN);
0124   int hrun;
0125   if ( DontOverrideRunNumber )
0126     {
0127       hrun = htonl(RunNumber);  // this is our own, cached value from the constructor
0128     }
0129   else
0130     {
0131       hrun = htonl(runnumber);  // this is the one which the oBuffer parent maintains 
0132     }
0133   writen (sockfd,(char *)  &controlword, 4);
0134   writen (sockfd,(char *) &hrun, 4);
0135   readn (sockfd, (char *) &i, 4);
0136   has_begun = 1;
0137 
0138   return 0;
0139 
0140 }
0141 
0142 // ----------------------------------------------------------
0143 // returns the number of bytes written, including record wasted space.
0144 //
0145 int oamlBuffer::writeout()
0146 {
0147 
0148   int cstatus = 0;
0149 
0150   if (! dirty) return 0;
0151 
0152   if ( ! good_object) return 1;
0153   if ( ! has_begun ) cstatus = begin_run();
0154 
0155   if ( cstatus ) return cstatus;
0156 
0157   if (! has_end) addEoB();
0158 
0159   lzo_uint outputlength_in_bytes = outputarraylength*4-16;
0160   lzo_uint in_len = bptr->Length; 
0161 
0162   lzo1x_1_12_compress( (lzo_byte *) bptr,
0163                in_len,  
0164                (lzo_byte *)&outputarray[4],
0165                &outputlength_in_bytes,wrkmem);
0166 
0167 
0168   outputarray[0] = outputlength_in_bytes +4*BUFFERHEADERLENGTH;
0169   outputarray[1] =  LZO1XBUFFERMARKER;
0170   outputarray[2] = bptr->Bufseq;
0171   outputarray[3] = bptr->Length;
0172 
0173 
0174   int controlword = htonl(CTRL_DATA);
0175   //  std::cout << __LINE__ << "  sent control word" << std::endl;
0176   writen (sockfd,(char *)  &controlword, 4);
0177   int i = htonl(outputarray[0]);
0178   writen (sockfd,(char *) &i, 4);
0179   writen (sockfd,(char *)outputarray, outputarray[0] );
0180       
0181   //  std::cout << __LINE__ << "  waiting for ack" << std::endl;
0182   readn (sockfd, (char *) &i, 4);
0183   //  std::cout << __LINE__ << "  got it" << std::endl;
0184     
0185 
0186 
0187   dirty = 0;
0188   byteswritten += bptr->Length;
0189   return 0;
0190 }
0191 
0192 
0193 // ----------------------------------------------------------
0194 oamlBuffer::~oamlBuffer()
0195 {
0196   int wstatus =   writeout();
0197 
0198   if ( ! wstatus) 
0199     {
0200       
0201       int i;
0202       int controlword = htonl(CTRL_ENDRUN);
0203       writen (sockfd, (char *)&controlword, 4);
0204       
0205       
0206       readn (sockfd, (char *) &i, 4);
0207       
0208       controlword = htonl(CTRL_CLOSE);
0209       writen (sockfd, (char *)&controlword, 4);
0210     }
0211   dirty = 0;
0212   close (sockfd);
0213       
0214 }
0215 
0216 int oamlBuffer::readn (int fd, char *ptr, int nbytes)
0217 {
0218   int nleft, nread;
0219   nleft = nbytes;
0220   while ( nleft>0 )
0221     {
0222       nread = read (fd, ptr, nleft);
0223       if ( nread < 0 ) 
0224     return nread;
0225       else if (nread == 0) 
0226     break;
0227       nleft -= nread;
0228       ptr += nread;
0229     }
0230   return (nbytes-nleft);
0231 }
0232 
0233 
0234 int oamlBuffer::writen (int fd, char *ptr, int nbytes)
0235 {
0236   int nleft, nwritten;
0237   nleft = nbytes;
0238   while ( nleft>0 )
0239     {
0240       nwritten = write (fd, ptr, nleft);
0241       if ( nwritten < 0 ) 
0242     return nwritten;
0243 
0244       nleft -= nwritten;
0245       ptr += nwritten;
0246     }
0247   return (nbytes-nleft);
0248 }