File indexing completed on 2025-08-03 08:20:53
0001
0002
0003
0004
0005
0006 #define MONITORINGPORT 9930
0007
0008 #include "rcdaqEventiterator.h"
0009 #include <stdio.h>
0010 #include <stdlib.h>
0011
0012 #include <iostream>
0013 #include <stdlib.h>
0014
0015 #include "oncsBuffer.h"
0016 #include "gzbuffer.h"
0017 #include "lzobuffer.h"
0018 #include "Event.h"
0019
0020 #include <stddef.h>
0021 #include <string.h>
0022 #include <sys/types.h>
0023 #include <sys/stat.h>
0024 #include <fcntl.h>
0025 #include <unistd.h>
0026
0027
0028 #include <netdb.h>
0029 #include <netinet/in.h>
0030 #include <sys/types.h>
0031 #include <sys/socket.h>
0032
0033
0034 using namespace std;
0035
0036
0037
0038
0039
0040
0041 rcdaqEventiterator::~rcdaqEventiterator()
0042 {
0043 if (_sockfd) close (_sockfd);
0044 if (bp != NULL ) delete [] bp;
0045 if (bptr != NULL ) delete bptr;
0046 }
0047
0048
0049 rcdaqEventiterator::rcdaqEventiterator()
0050 {
0051 string host = "localhost";
0052
0053 if ( getenv("RCDAQHOST") )
0054 {
0055 host = getenv("RCDAQHOST");
0056 }
0057
0058 int status;
0059 setup (host.c_str(), status);
0060 }
0061
0062 rcdaqEventiterator::rcdaqEventiterator(const char *ip)
0063 {
0064 int status;
0065 setup (ip, status);
0066 }
0067
0068 rcdaqEventiterator::rcdaqEventiterator(const char *ip, int &status)
0069 {
0070 setup (ip, status);
0071 }
0072
0073
0074 int rcdaqEventiterator::setup(const char *ip, int &status)
0075 {
0076 _defunct = 0;
0077
0078
0079
0080 _serverid = 0;
0081 string host;
0082 string serveridstring;
0083
0084 string s = ip;
0085 if ( s.find(":") == s.npos)
0086 {
0087 host = s;
0088 }
0089 else
0090 {
0091 host = s.substr(0, s.find(":"));
0092 serveridstring = s.substr(s.find(":")+1, s.npos);
0093 try
0094 {
0095 _serverid = std::stoi(serveridstring);
0096 } catch (const std::invalid_argument& e)
0097 {
0098 cerr << "Invalid server is: " << serveridstring << endl;
0099 _defunct = 1;
0100 status = -2;
0101 return -3;
0102 }
0103
0104 }
0105
0106 struct hostent *p_host;
0107 p_host = gethostbyname(host.c_str());
0108
0109
0110
0111
0112 if ( ! p_host )
0113 {
0114 status = -2;
0115 _defunct = 1;
0116 return -2;
0117 }
0118
0119
0120
0121 bptr = 0;
0122 bp = 0;
0123 allocatedsize = 0;
0124 _theIP = p_host->h_name;
0125 status = 0;
0126 last_read_status = 0;
0127 current_index = 0;
0128
0129 memset((char *) &server, 0, sizeof(server));
0130 server.sin_family = AF_INET;
0131 bcopy(p_host->h_addr, &(server.sin_addr.s_addr), p_host->h_length);
0132 server.sin_port = htons(MONITORINGPORT + _serverid);
0133
0134 return 0;
0135 }
0136
0137 void rcdaqEventiterator::identify (OSTREAM &os) const
0138 {
0139 os << getIdTag();
0140 if ( _defunct ) os << " *** defunct";
0141 os << std::endl;
0142
0143 };
0144
0145 const char * rcdaqEventiterator::getIdTag () const
0146 {
0147 static char line[512];
0148 char x[512];
0149
0150 strcpy (line, " -- rcdaqEventiterator reading from ");
0151 strcat (line, _theIP.c_str());
0152 if (_serverid)
0153 {
0154 sprintf (x, " Serverid: %d", _serverid);
0155 strcat (line, x );
0156 }
0157 return line;
0158 };
0159
0160
0161
0162
0163
0164 Event * rcdaqEventiterator::getNextEvent()
0165 {
0166 if ( _defunct ) return 0;
0167 Event *evt = 0;
0168
0169
0170 if (last_read_status) return NULL;
0171
0172
0173 if (bptr == 0)
0174 {
0175 if ( (last_read_status = read_next_buffer()) !=0 )
0176 {
0177 return NULL;
0178 }
0179 }
0180
0181 while (last_read_status == 0)
0182 {
0183 if (bptr) evt = bptr->getEvent();
0184 if (evt) return evt;
0185
0186 last_read_status = read_next_buffer();
0187 }
0188
0189 return NULL;
0190
0191 }
0192
0193
0194
0195
0196
0197 int rcdaqEventiterator::read_next_buffer()
0198 {
0199 if (bptr)
0200 {
0201 delete bptr;
0202 bptr = 0;
0203 }
0204
0205 _sockfd = socket(AF_INET, SOCK_STREAM, 0);
0206 if ( _sockfd < 0) return 0;
0207
0208 if ( connect(_sockfd, (struct sockaddr*) &server, sizeof(server)) < 0 )
0209 {
0210
0211 close (_sockfd);
0212 usleep(1000);
0213 return 0;
0214 }
0215
0216
0217 int flag = htonl(512*1024*1024 + 2048);
0218
0219 int status = writen (_sockfd,(char *) &flag, 4);
0220 if ( status < 0)
0221 {
0222 close (_sockfd);
0223 return 0;
0224 }
0225
0226
0227 int sizetobesent;
0228 status = readn (_sockfd, (char *) &sizetobesent, 4);
0229 if ( status < 0)
0230 {
0231 close (_sockfd);
0232 return 0;
0233 }
0234
0235 buffer_size = ntohl(sizetobesent);
0236 int i;
0237 if (bp)
0238 {
0239 if (buffer_size > allocatedsize*4)
0240 {
0241 delete [] bp;
0242 i = (buffer_size +8191) /8192;
0243 allocatedsize = i * 2048;
0244 bp = new PHDWORD[allocatedsize];
0245 }
0246 }
0247 else
0248 {
0249 i = (buffer_size +8191) /8192;
0250 allocatedsize = i * BUFFERBLOCKSIZE/4;
0251 bp = new PHDWORD[allocatedsize];
0252
0253 }
0254
0255 status = readn ( _sockfd, (char *) bp, buffer_size);
0256 if ( status < 0)
0257 {
0258 close (_sockfd);
0259 return 0;
0260 }
0261
0262 int ackvalue = htonl(101);
0263 writen (_sockfd,(char *) &ackvalue, 4);
0264 close (_sockfd);
0265
0266 return buffer::makeBuffer( bp, allocatedsize, &bptr);
0267
0268 }
0269
0270 int rcdaqEventiterator::readn (int fd, char *ptr, int nbytes)
0271 {
0272 int nleft, nread;
0273 nleft = nbytes;
0274 while ( nleft>0 )
0275 {
0276 nread = recv (fd, ptr, nleft, MSG_NOSIGNAL);
0277 if ( nread < 0 )
0278 {
0279 return nread;
0280 }
0281 else if (nread == 0)
0282 break;
0283 nleft -= nread;
0284 ptr += nread;
0285 }
0286 return (nbytes-nleft);
0287 }
0288
0289
0290 int rcdaqEventiterator::writen (int fd, char *ptr, int nbytes)
0291 {
0292 int nleft, nwritten;
0293 nleft = nbytes;
0294 while ( nleft>0 )
0295 {
0296 nwritten = send (fd, ptr, nleft, MSG_NOSIGNAL);
0297 if ( nwritten < 0 )
0298 {
0299 return nwritten;
0300 }
0301 nleft -= nwritten;
0302 ptr += nwritten;
0303 }
0304 return (nbytes-nleft);
0305 }