File indexing completed on 2025-08-03 08:20:35
0001
#include "oamlBuffer.h"
#include "BufferConstants.h"

#include <unistd.h>
#include <netdb.h>
#include <cerrno>
#include <cstring>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
0011
0012 #if defined(SunOS) || defined(OSF1)
0013 #include <strings.h>
0014 #endif
0015
0016
0017
0018 oamlBuffer::oamlBuffer ( const char * host, const int port, PHDWORD * where,
0019 const int length,
0020 const int irun,
0021 const int iseq):
0022 olzoBuffer(0,where,length,irun,iseq)
0023 {
0024 good_object = 1;
0025
0026 has_begun = 0;
0027
0028
0029 ThePort = port;
0030 strcpy(HostName, host);
0031
0032 if ( connect_aml() ) good_object =0;
0033
0034 DontOverrideRunNumber = 0;
0035 RunNumber = irun;
0036 if ( irun)
0037 {
0038 DontOverrideRunNumber = 1;
0039 }
0040 }
0041
0042
0043 oamlBuffer::oamlBuffer ( const char * hostport, PHDWORD * where,
0044 const int length,
0045 const int irun,
0046 const int iseq):
0047 olzoBuffer(0,where,length,irun,iseq)
0048 {
0049 good_object = 1;
0050
0051 has_begun = 0;
0052
0053 char *t = new char[strlen(hostport)+1];
0054 strcpy ( t, hostport);
0055
0056 char *token = strtok(t, ":");
0057 if (token)
0058 {
0059 strcpy( HostName, token);
0060 token = strtok(0, ":");
0061 if (token)
0062 {
0063 sscanf(token, "%d", &ThePort);
0064 }
0065 }
0066
0067 delete [] t;
0068
0069 if ( connect_aml() ) good_object =0;
0070
0071 DontOverrideRunNumber = 0;
0072 RunNumber = irun;
0073 if ( irun)
0074 {
0075 DontOverrideRunNumber = 1;
0076 }
0077
0078 }
0079
0080
0081 int oamlBuffer::connect_aml ()
0082 {
0083
0084 struct sockaddr_in server_addr;
0085 struct hostent *p_host;
0086 p_host = gethostbyname(HostName);
0087
0088 if ( ! p_host) return -3;
0089
0090 bzero( (char*) &server_addr, sizeof(server_addr) );
0091 server_addr.sin_family = AF_INET;
0092 bcopy(p_host->h_addr, &(server_addr.sin_addr.s_addr), p_host->h_length);
0093 server_addr.sin_port = htons(ThePort);
0094
0095
0096 if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
0097 {
0098 std::cout << "error in socket" << std::endl;
0099 good_object = 0;
0100 return -1;
0101 }
0102
0103 int xs = 512*1024;
0104
0105 int s = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF,
0106 &xs, 4);
0107 if (s) std::cout << "setsockopt status = " << s << std::endl;
0108
0109 if ( connect(sockfd, (struct sockaddr*) &server_addr
0110 , sizeof(server_addr)) < 0 )
0111 {
0112 std::cout << "error in connect" << std::endl;
0113 good_object = 0;
0114 return -1;
0115 }
0116
0117 return 0;
0118 }
0119
0120 int oamlBuffer::begin_run ()
0121 {
0122 int i;
0123 int controlword = htonl(CTRL_BEGINRUN);
0124 int hrun;
0125 if ( DontOverrideRunNumber )
0126 {
0127 hrun = htonl(RunNumber);
0128 }
0129 else
0130 {
0131 hrun = htonl(runnumber);
0132 }
0133 writen (sockfd,(char *) &controlword, 4);
0134 writen (sockfd,(char *) &hrun, 4);
0135 readn (sockfd, (char *) &i, 4);
0136 has_begun = 1;
0137
0138 return 0;
0139
0140 }
0141
0142
0143
0144
0145 int oamlBuffer::writeout()
0146 {
0147
0148 int cstatus = 0;
0149
0150 if (! dirty) return 0;
0151
0152 if ( ! good_object) return 1;
0153 if ( ! has_begun ) cstatus = begin_run();
0154
0155 if ( cstatus ) return cstatus;
0156
0157 if (! has_end) addEoB();
0158
0159 lzo_uint outputlength_in_bytes = outputarraylength*4-16;
0160 lzo_uint in_len = bptr->Length;
0161
0162 lzo1x_1_12_compress( (lzo_byte *) bptr,
0163 in_len,
0164 (lzo_byte *)&outputarray[4],
0165 &outputlength_in_bytes,wrkmem);
0166
0167
0168 outputarray[0] = outputlength_in_bytes +4*BUFFERHEADERLENGTH;
0169 outputarray[1] = LZO1XBUFFERMARKER;
0170 outputarray[2] = bptr->Bufseq;
0171 outputarray[3] = bptr->Length;
0172
0173
0174 int controlword = htonl(CTRL_DATA);
0175
0176 writen (sockfd,(char *) &controlword, 4);
0177 int i = htonl(outputarray[0]);
0178 writen (sockfd,(char *) &i, 4);
0179 writen (sockfd,(char *)outputarray, outputarray[0] );
0180
0181
0182 readn (sockfd, (char *) &i, 4);
0183
0184
0185
0186
0187 dirty = 0;
0188 byteswritten += bptr->Length;
0189 return 0;
0190 }
0191
0192
0193
0194 oamlBuffer::~oamlBuffer()
0195 {
0196 int wstatus = writeout();
0197
0198 if ( ! wstatus)
0199 {
0200
0201 int i;
0202 int controlword = htonl(CTRL_ENDRUN);
0203 writen (sockfd, (char *)&controlword, 4);
0204
0205
0206 readn (sockfd, (char *) &i, 4);
0207
0208 controlword = htonl(CTRL_CLOSE);
0209 writen (sockfd, (char *)&controlword, 4);
0210 }
0211 dirty = 0;
0212 close (sockfd);
0213
0214 }
0215
0216 int oamlBuffer::readn (int fd, char *ptr, int nbytes)
0217 {
0218 int nleft, nread;
0219 nleft = nbytes;
0220 while ( nleft>0 )
0221 {
0222 nread = read (fd, ptr, nleft);
0223 if ( nread < 0 )
0224 return nread;
0225 else if (nread == 0)
0226 break;
0227 nleft -= nread;
0228 ptr += nread;
0229 }
0230 return (nbytes-nleft);
0231 }
0232
0233
0234 int oamlBuffer::writen (int fd, char *ptr, int nbytes)
0235 {
0236 int nleft, nwritten;
0237 nleft = nbytes;
0238 while ( nleft>0 )
0239 {
0240 nwritten = write (fd, ptr, nleft);
0241 if ( nwritten < 0 )
0242 return nwritten;
0243
0244 nleft -= nwritten;
0245 ptr += nwritten;
0246 }
0247 return (nbytes-nleft);
0248 }