Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-03 08:20:53

0001 //
0002 // rcdaqeventIterator   mlp 4/19/1997
0003 //
0004 // this iterator reads events froma data file. 
0005 
0006 #define MONITORINGPORT 9930
0007 
0008 #include "rcdaqEventiterator.h"
0009 #include <stdio.h>
0010 #include <stdlib.h>
0011 
0012 #include <iostream>
0013 #include <stdlib.h>
0014 
0015 #include "oncsBuffer.h"
0016 #include "gzbuffer.h"
0017 #include "lzobuffer.h"
0018 #include "Event.h"
0019 
0020 #include <stddef.h>
0021 #include <string.h>
0022 #include <sys/types.h>
0023 #include <sys/stat.h>
0024 #include <fcntl.h>
0025 #include <unistd.h>
0026 
0027 
0028 #include <netdb.h>
0029 #include <netinet/in.h>
0030 #include <sys/types.h>
0031 #include <sys/socket.h>
0032 
0033 
0034 using namespace std;
0035 
0036 
0037 // there are two similar constructors, one with just the
0038 // filename, the other with an additional status value
0039 // which is non-zero on return if anything goes wrong. 
0040 
0041 rcdaqEventiterator::~rcdaqEventiterator()
0042 {
0043      if (_sockfd) close (_sockfd);
0044      if (bp != NULL ) delete [] bp;
0045      if (bptr != NULL ) delete bptr;
0046 }  
0047 
0048 
0049 rcdaqEventiterator::rcdaqEventiterator()
0050 {
0051   string host = "localhost";
0052     
0053   if ( getenv("RCDAQHOST")  )
0054     {
0055       host = getenv("RCDAQHOST");
0056     }
0057 
0058   int status;
0059   setup (host.c_str(), status);
0060 }  
0061 
0062 rcdaqEventiterator::rcdaqEventiterator(const char *ip)
0063 {
0064   int status;
0065   setup (ip, status);
0066 }  
0067 
0068 rcdaqEventiterator::rcdaqEventiterator(const char *ip, int &status)
0069 {
0070   setup (ip, status);
0071 }  
0072 
0073 
0074 int rcdaqEventiterator::setup(const char *ip, int &status)
0075 {
0076   _defunct = 0;
0077 
0078 
0079   // go through some hoops to extract the server id from a host string like "ebdc00:1"
0080   _serverid = 0;
0081   string host;
0082   string serveridstring;
0083   
0084   string s = ip;
0085   if ( s.find(":") == s.npos)
0086     {
0087       host = s;
0088     }
0089   else
0090     {
0091       host = s.substr(0, s.find(":"));
0092       serveridstring = s.substr(s.find(":")+1, s.npos);
0093       try
0094     {
0095       _serverid = std::stoi(serveridstring);
0096     } catch (const std::invalid_argument& e)
0097     {
0098       cerr << "Invalid server is: " << serveridstring << endl;
0099       _defunct = 1;
0100       status = -2;
0101       return -3;
0102     }
0103       
0104     }
0105   
0106   struct hostent *p_host;
0107   p_host = gethostbyname(host.c_str());
0108   
0109   //  std::cout << __FILE__ << " " << __LINE__ << "  " << ip << "  " << p_host->h_name << "  " << p_host->h_addr << std::endl;
0110 
0111   
0112   if ( ! p_host ) 
0113     {
0114       status = -2;
0115       _defunct = 1;
0116       return -2;
0117     }
0118 
0119   //  std::cout << p_host->h_name << "  " << p_host->h_addr << std::endl;
0120 
0121   bptr = 0;
0122   bp = 0;
0123   allocatedsize = 0;
0124   _theIP = p_host->h_name;
0125   status = 0;
0126   last_read_status = 0;
0127   current_index = 0;
0128 
0129   memset((char *) &server, 0, sizeof(server));
0130   server.sin_family = AF_INET;
0131   bcopy(p_host->h_addr, &(server.sin_addr.s_addr), p_host->h_length);
0132   server.sin_port = htons(MONITORINGPORT + _serverid);
0133 
0134   return 0;
0135 }  
0136 
0137 void rcdaqEventiterator::identify (OSTREAM &os) const
0138 { 
0139   os << getIdTag();
0140   if ( _defunct ) os << " *** defunct";
0141   os << std::endl;
0142 
0143 };
0144 
0145 const char * rcdaqEventiterator::getIdTag () const
0146 { 
0147   static char line[512];
0148   char x[512];
0149   
0150   strcpy (line, " -- rcdaqEventiterator reading from ");
0151   strcat (line, _theIP.c_str());
0152   if (_serverid)
0153     {
0154       sprintf (x, " Serverid: %d", _serverid);
0155       strcat (line, x );
0156     }
0157   return line;
0158 };
0159 
0160 
0161 // and, finally, the only non-constructor member function to
0162 // retrieve events from the iterator.
0163 
0164 Event * rcdaqEventiterator::getNextEvent()
0165 {
0166   if ( _defunct ) return 0;
0167   Event *evt = 0;
0168 
0169   // if we had a read error before, we just return
0170   if (last_read_status) return NULL;
0171 
0172   // see if we have a buffer to read
0173   if (bptr == 0) 
0174     {
0175       if ( (last_read_status = read_next_buffer()) !=0 )
0176     {
0177       return NULL;
0178     }
0179     }
0180 
0181   while (last_read_status == 0)
0182     {
0183       if (bptr) evt =  bptr->getEvent();
0184       if (evt) return evt;
0185 
0186       last_read_status = read_next_buffer();
0187     }
0188 
0189   return NULL;
0190 
0191 }
0192 
0193 // -----------------------------------------------------
0194 // this is a private function to read the next buffer
0195 // if needed. 
0196 
0197 int rcdaqEventiterator::read_next_buffer()
0198 {
0199   if (bptr) 
0200     {
0201       delete bptr;
0202       bptr = 0;
0203     }
0204 
0205   _sockfd  = socket(AF_INET, SOCK_STREAM, 0);
0206   if ( _sockfd < 0) return 0;
0207 
0208   if ( connect(_sockfd, (struct sockaddr*) &server, sizeof(server)) < 0 ) 
0209     {
0210       //std::cout << "error in connect" << std::endl;
0211       close (_sockfd);
0212       usleep(1000);  // we just slow down a bit to limit the rate or retries
0213       return 0;
0214     }
0215   
0216   // say that this is our max size
0217   int flag = htonl(512*1024*1024 + 2048);
0218   
0219   int status = writen (_sockfd,(char *)  &flag, 4);
0220   if ( status < 0)
0221     {
0222       close (_sockfd);
0223       return 0;
0224     }
0225 
0226   
0227   int sizetobesent;
0228   status = readn (_sockfd, (char *) &sizetobesent, 4);
0229   if ( status < 0)
0230     {
0231       close (_sockfd);
0232       return 0;
0233     }
0234 
0235   buffer_size = ntohl(sizetobesent);
0236   int i;
0237   if (bp) 
0238     {
0239       if  (buffer_size > allocatedsize*4)
0240     {
0241       delete [] bp;
0242       i = (buffer_size +8191) /8192;
0243       allocatedsize = i * 2048;
0244       bp = new PHDWORD[allocatedsize];
0245     }
0246     }
0247   else
0248     {
0249       i = (buffer_size +8191) /8192;
0250       allocatedsize = i * BUFFERBLOCKSIZE/4;
0251       bp = new PHDWORD[allocatedsize];
0252 
0253     }
0254 
0255   status = readn ( _sockfd, (char *) bp, buffer_size);
0256   if ( status < 0)
0257     {
0258       close (_sockfd);
0259       return 0;
0260     }
0261 
0262   int ackvalue = htonl(101);
0263   writen (_sockfd,(char *)  &ackvalue, 4);
0264   close (_sockfd);
0265   
0266   return buffer::makeBuffer( bp, allocatedsize, &bptr);
0267 
0268 }
0269 
0270 int rcdaqEventiterator::readn (int fd, char *ptr, int nbytes)
0271 {
0272   int nleft, nread;
0273   nleft = nbytes;
0274   while ( nleft>0 )
0275     {
0276       nread = recv (fd, ptr, nleft, MSG_NOSIGNAL);
0277       if ( nread < 0 )
0278     {
0279       return nread;
0280     }
0281       else if (nread == 0) 
0282     break;
0283       nleft -= nread;
0284       ptr += nread;
0285     }
0286   return (nbytes-nleft);
0287 }
0288 
0289 
0290 int rcdaqEventiterator::writen (int fd, char *ptr, int nbytes)
0291 {
0292   int nleft, nwritten;
0293   nleft = nbytes;
0294   while ( nleft>0 )
0295     {
0296       nwritten = send (fd, ptr, nleft, MSG_NOSIGNAL);
0297       if ( nwritten < 0 )
0298     {
0299       return nwritten;
0300     }
0301       nleft -= nwritten;
0302       ptr += nwritten;
0303     }
0304   return (nbytes-nleft);
0305 }