Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-02 08:21:08

0001 
0002 #include <string.h>
0003 #include <sys/types.h>
0004 #include <sys/stat.h>
0005 #include <fcntl.h>
0006 
0007 #include <sys/time.h>
0008 #include <unistd.h>
0009 #include <stdlib.h>
0010 #include <errno.h>
0011 #include <unistd.h>
0012 #include <signal.h>
0013 #include <dlfcn.h>
0014 #include <sys/ipc.h>
0015 
0016 #include <pthread.h>
0017 
0018 #ifdef HAVE_BSTRING_H
0019 #include <bstring.h>
0020 #else
0021 #include <strings.h>
0022 #endif
0023 
0024 #include <sys/time.h>
0025 
0026 #include <sys/socket.h>
0027 #include <net/if.h>
0028 #include <netdb.h> 
0029 
0030 #include <sys/wait.h>
0031 #include <sys/resource.h>
0032 
0033 
0034 #include <sys/types.h>
0035 
0036 #ifdef SunOS
0037 #include <sys/filio.h>
0038 #endif
0039 
0040 #include <sys/stat.h>
0041 
0042 #include <netinet/in.h>
0043 #include <arpa/inet.h>
0044 #include <netdb.h>
0045 #include <sys/ioctl.h>
0046 
0047 #include <stdio.h>
0048 #include <iostream>
0049 #include <iomanip>
0050 
0051 #ifdef HAVE_GETOPT_H
0052 #include <getopt.h>
0053 #endif
0054 
0055 
// Basic fixed-width-style aliases used throughout the server.
typedef unsigned int   PHDWORD;
typedef unsigned short SWORD;
typedef unsigned char  BYTE;
typedef unsigned int   UINT;

// Output is written to disk in multiples of this many bytes.
static const unsigned int BUFFERBLOCKSIZE = 8192U;

// Control words sent by the client (transmitted in network byte order).
static const int CTRL_BEGINRUN     = 1;
static const int CTRL_ENDRUN       = 2;
static const int CTRL_DATA         = 3;
static const int CTRL_CLOSE        = 4;
static const int CTRL_SENDFILENAME = 5;
static const int CTRL_ROLLOVER     = 6;

// Status words sent back to the client.
static const int CTRL_REMOTESUCCESS = 100;
static const int CTRL_REMOTEFAIL    = 101;

// Values stored in bufferstructure::role.
static const int ROLE_RECEIVED = 0;
static const int ROLE_WRITTEN  = 1;

// Initial buffer size in PHDWORD units: 4M words, i.e. 16 Mbytes.
static const int INITIAL_SIZE = 4 * 1024 * 1024;
0081 
0082 
0083 typedef struct 
0084 {
0085   int dirty;
0086   int role;
0087   int bytecount;
0088   int buffersize;
0089   PHDWORD *bf;
0090 } bufferstructure;
0091 
// --- command line settings -------------------------------------------------
int verbose = 0;        // incremented once per -v
int the_port = 5001;    // TCP port to listen on (-p)
int do_not_write = 0;   // set by -x: receive data but do not write it to disk

// --- file descriptors ------------------------------------------------------
int sockfd = 0;         // listening socket
int dd_fd = 0;          // socket of the currently accepted connection
int output_fd = -1;     // output data file

// --- synchronization -------------------------------------------------------
pthread_mutex_t M_cout;   // held around some console output

pthread_mutex_t M_write;  // unlocked by the receiver to start the write thread
pthread_mutex_t M_done;   // unlocked by the write thread when it has finished a buffer

pthread_t ThreadId;       // the write thread
pthread_t       tid;

// --- miscellaneous state ---------------------------------------------------
int run_number = 0;
int RunIsActive = 0;
int NumberWritten = 0;
int file_open = 0;
0115 
0116 
0117 int readn(int , char *, int);
0118 int writen(int , char *, int);
0119 
0120 #if defined(SunOS) || defined(Linux) 
0121 void sig_handler(int);
0122 #else
0123 void sig_handler(...);
0124 #endif
0125 
0126 void *writebuffers ( void * arg);
0127 int handle_this_child( pid_t pid, const std::string &host);
0128 in_addr_t find_address_from_interface(const char *);
0129 
0130 void cleanup(const int exitstatus);
0131 
0132 
0133 bufferstructure *bf_being_received;
0134 bufferstructure *bf_being_written;
0135  
0136 
0137 using namespace std;
0138 
0139 
// Prints the usage text to standard output and terminates the process
// with exit status 0. Never returns.
void exitmsg()
{
  static const char *const usage_text[] =
    {
      "** This is the Super Fast Server :-).",
      "** usage: sfs ",
      "   -d disable database logging [ db not yet implemented ]",
      "   -b interface    bind only to this interface",
      "   -p number       use this port (default 5001)",
      "   -v increase verbosity",
      "   -x do not write data to disk (testing)",
      "  Examples:",
      "    sfs -b ens801f0      -- listen only on that interface",
      "    sfs -p 5002          -- listen on port 5002"
    };

  const unsigned int line_count = sizeof(usage_text) / sizeof(usage_text[0]);
  for (unsigned int line = 0; line < line_count; ++line)
    {
      std::cout << usage_text[line] << std::endl;
    }

  exit(0);
}
0155 
0156 
0157 //char *s_opcode[] = {
0158 //          "Invalid code",
0159 //          "CTRL_BEGINRUN",
0160 //          "CTRL_ENDRUN",
0161 //          "CTRL_DATA",
0162 //          "CTRL_CLOSE",
0163 //          "CTRL_SENDFILENAME"};
0164 
0165 std::string listen_interface;
0166 
0167 int main( int argc, char* argv[])
0168 {
0169 
0170 
0171 #if defined(SunOS) || defined(Linux) 
0172   struct sockaddr client_addr;
0173 #else
0174   struct sockaddr_in client_addr;
0175 #endif
0176   struct sockaddr_in server_addr;
0177 
0178 #if defined(SunOS)
0179   int client_addr_len = sizeof(client_addr);
0180 #else
0181   unsigned int client_addr_len = sizeof(client_addr);
0182 #endif
0183 
0184   pthread_mutex_init(&M_cout, 0); 
0185   pthread_mutex_init(&M_write, 0); 
0186   pthread_mutex_init(&M_done, 0); 
0187 
0188   pthread_mutex_lock(&M_write);
0189 
0190   // by default, we listen on all interfaces
0191   in_addr_t listen_address = htonl(INADDR_ANY);  
0192 
0193   char c;
0194   
0195   while ((c = getopt(argc, argv, "hvdxb:p:")) != EOF)
0196     {
0197       switch (c) 
0198     {
0199 
0200     case 'v':   // verbose
0201       verbose++;
0202       break;
0203 
0204     case 'h': 
0205       exitmsg();
0206       break;
0207 
0208     case 'b':   // bind to this interface
0209       listen_address = find_address_from_interface(optarg);
0210       listen_interface = optarg;
0211       break;
0212 
0213     case 'p':   // port number
0214       if ( !sscanf(optarg, "%d", &the_port) ) exitmsg();
0215       break;
0216 
0217     case 'd':   // no database
0218       // databaseflag=1;
0219       // cout << "database access enabled" << endl;
0220       break;
0221 
0222     case 'x':   // no writing
0223       do_not_write = 1;
0224       break;
0225 
0226 
0227     }
0228     }
0229 
0230 
0231 
0232   if (argc > optind)
0233     {
0234       sscanf (argv[optind],"%d", &the_port);
0235     }
0236   
0237   // ------------------------
0238   // now set up the sockets
0239 
0240   if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0 ) 
0241     {
0242       cleanup(1);
0243     }
0244   //  int xs = 64*1024+21845;
0245   int xs = 1024*1024;
0246 
0247   int s = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF,
0248              &xs, sizeof(xs));
0249 
0250   if (s)
0251     {
0252       perror("Setsockopt:");
0253     }
0254 
0255   memset( &server_addr, 0, sizeof(server_addr));
0256   server_addr.sin_family = AF_INET;
0257   server_addr.sin_addr.s_addr = listen_address;
0258   server_addr.sin_port = htons(the_port);
0259 
0260   int i;
0261   
0262   if ( ( i = bind(sockfd, (struct sockaddr*) &server_addr, sizeof(server_addr))) < 0 )
0263     {
0264       perror(" bind ");
0265       cleanup(1);
0266     }
0267 
0268   if ( ( i =  listen(sockfd, 100) ) )
0269     {
0270       perror(" listen ");
0271       cleanup(1);
0272     }
0273 
0274   if (verbose)
0275     {
0276       cout << " listening on port " << the_port;
0277       if (! listen_interface.empty() ) cout << " on interface " << listen_interface;
0278       cout << endl; 
0279     }
0280   
0281   signal(SIGCHLD, SIG_IGN); 
0282 
0283   pid_t pid;
0284   struct sockaddr_in out;
0285 
0286   std::string host;
0287   
0288   while  (sockfd > 0)
0289     {
0290             
0291       client_addr_len = sizeof(out); 
0292       dd_fd = accept(sockfd,  (struct sockaddr *) &out, &client_addr_len);
0293       if ( dd_fd < 0 ) 
0294     {
0295       pthread_mutex_lock(&M_cout);
0296       cout  << "error in accept socket" << endl;
0297       pthread_mutex_unlock(&M_cout);
0298       perror (" accept" );
0299       cleanup(1);
0300       exit(1);
0301     }
0302 
0303 
0304       char h[512]; 
0305       getnameinfo((struct sockaddr *) &out, sizeof(struct sockaddr_in), h, 511,
0306           NULL, 0, NI_NOFQDN);
0307       host = h;
0308       
0309       if (verbose)
0310     {
0311           cout << " new connection from " << host << " at " << time(0) << endl;
0312     }
0313       
0314       
0315       if ( (pid = fork()) == 0 ) 
0316     {
0317       close(sockfd);
0318       return handle_this_child( pid, host);
0319     }
0320       else
0321     {
0322       close (dd_fd);
0323     }
0324     }
0325 }
0326 
0327 int handle_this_child( pid_t pid, const std::string &host)
0328 {
0329 
0330 
0331   int controlword;
0332   int local_runnr = 0;
0333 
0334   bufferstructure B0;
0335   bufferstructure B1;
0336   
0337   bf_being_received = &B0;
0338   bf_being_written = &B1;
0339   bufferstructure *bf_temp;
0340 
0341   B0.bf = new PHDWORD[INITIAL_SIZE];
0342   B0.buffersize = INITIAL_SIZE;
0343   B0.dirty = 0;
0344   B0.role= ROLE_RECEIVED;
0345   B0.bytecount = 0;
0346               
0347   B1.bf = new PHDWORD[INITIAL_SIZE];
0348   B1.buffersize = INITIAL_SIZE;
0349   B1.dirty = 0;
0350   B1.role= ROLE_RECEIVED;
0351   B1.bytecount = 0;
0352               
0353   int i;
0354   
0355   // we make a thread that will write out our buffers
0356   i = pthread_create(&ThreadId, NULL, 
0357              writebuffers, 
0358              (void *) 0);
0359   if (i )
0360     {
0361       cout << "error in thread create " << i << endl;
0362       perror("Thread ");
0363       cleanup(1);
0364     }
0365   // else
0366   //   {
0367   //     pthread_mutex_lock(&M_cout); 
0368   //     cout << "write thread created" << endl;
0369   //     pthread_mutex_unlock(&M_cout);
0370   //   }
0371     
0372   // should be the default, but set it to blocking mode anyway
0373   i = ioctl (dd_fd,  FIONBIO, 0);
0374     
0375   // find out where we were contacted from
0376     
0377 
0378   int status;
0379   int xx;
0380 
0381   int go_on = 1;
0382   while ( go_on)
0383     {
0384       if ( (status = readn (dd_fd, (char *) &xx, sizeof(xx)) ) <= 0)
0385     {
0386       cout  << "error in read from socket" << endl;
0387       perror ("read " );
0388       cleanup(1);
0389     }
0390     
0391       controlword = ntohl(xx);
0392 
0393       // cout << endl;
0394       // cout << __FILE__ << " " << __LINE__  << " controlword = " << controlword << " ntew: " << xx << " " ;
0395       // if ( controlword >=0 && controlword <=5) cout  << s_opcode[controlword];
0396       // cout << endl;
0397     
0398       char *p;
0399       char filename[1024];
0400       int value, len;
0401       
0402       switch (controlword) 
0403     {
0404 
0405     case  CTRL_SENDFILENAME:
0406       // read the length of what we are about to get 
0407       readn (dd_fd, (char *) &len, sizeof(int));
0408       len  = ntohl(len);
0409       // acknowledge... or not
0410       //cout  << " filename len  = " << len << endl;
0411       if ( len >= 1023)
0412         {
0413           i = htonl(CTRL_REMOTEFAIL);
0414           writen (dd_fd, (char *)&i, sizeof(i));
0415           break;
0416         }
0417       value = readn (dd_fd, filename, len);
0418       filename[value] = 0;
0419       //cout  << " filename is " << filename << endl;
0420 
0421           i = htonl(CTRL_REMOTESUCCESS);
0422           writen (dd_fd, (char *)&i, sizeof(i));
0423       
0424       break;
0425       
0426     case  CTRL_ROLLOVER:
0427 
0428       // after we receive CTRL_ROLLOVER, we get
0429       // - the length of the filename
0430       // the actual filename
0431       // then we send back the status
0432       
0433       // read the length of what we are about to get 
0434       readn (dd_fd, (char *) &len, sizeof(int));
0435       len  = ntohl(len);
0436       // acknowledge... or not
0437       //      cout << __FILE__ << " " << __LINE__ <<  " filename len  = " << len << endl;
0438       if ( len >= 1023)
0439         {
0440           i = htonl(CTRL_REMOTEFAIL);
0441           writen (dd_fd, (char *)&i, sizeof(i));
0442           break;
0443         }
0444       value = readn (dd_fd, filename, len);
0445       filename[value] = 0;
0446       // cout << __FILE__ << " " << __LINE__  << " filename is " << filename << endl;
0447 
0448       if (! do_not_write)
0449         {
0450           close ( output_fd);
0451           output_fd =  open(filename,  O_WRONLY | O_CREAT | O_TRUNC | O_EXCL | O_LARGEFILE , 
0452                 S_IRWXU | S_IROTH | S_IRGRP );
0453           if (output_fd < 0) 
0454         {
0455           cerr << "file " << filename << " exists, I will not overwrite " << endl;
0456           i = htonl(CTRL_REMOTEFAIL);
0457           writen (dd_fd, (char *)&i, sizeof(i));
0458           break;
0459         }
0460           if (verbose)
0461         {
0462           cout << " opened new rollover file " << filename << endl; 
0463         }
0464         }
0465 
0466       i = htonl(CTRL_REMOTESUCCESS);
0467       writen (dd_fd, (char *)&i, sizeof(i));
0468       
0469       break;
0470       
0471     case  CTRL_BEGINRUN:
0472       
0473       readn (dd_fd, (char *) &local_runnr, sizeof(local_runnr) );
0474       local_runnr = ntohl(local_runnr);
0475       //cout  << " runnumber = " << local_runnr << endl;
0476 
0477       if (! do_not_write)
0478         {
0479           output_fd =  open(filename,  O_WRONLY | O_CREAT | O_TRUNC | O_EXCL | O_LARGEFILE , 
0480                 S_IRWXU | S_IROTH | S_IRGRP );
0481           if (output_fd < 0) 
0482         {
0483           cerr << "file " << filename << " exists, I will not overwrite " << endl;
0484           i = htonl(CTRL_REMOTEFAIL);
0485           writen (dd_fd, (char *)&i, sizeof(i));
0486           break;
0487         }
0488           if (verbose)
0489         {
0490           cout << " opened new file " << filename << endl; 
0491         }
0492         }
0493 
0494       bf_being_received = &B0;
0495       bf_being_written = &B1;
0496         
0497       bf_being_received->role = ROLE_RECEIVED;
0498       bf_being_received->dirty=0;
0499         
0500       bf_being_written->role = ROLE_WRITTEN;
0501       bf_being_written->dirty = 0;
0502         
0503       i = htonl(CTRL_REMOTESUCCESS);
0504       writen (dd_fd, (char *)&i, sizeof(i));
0505       
0506       break;
0507       
0508 
0509 
0510 
0511 
0512 
0513     case  CTRL_DATA:
0514       status = readn (dd_fd, (char *) &len, sizeof(i));
0515       len = ntohl(len);
0516       //cout  << " data! len = " << len << endl;
0517       
0518       bf_being_received->bytecount = len;
0519       if ( (len+sizeof(int)-1)/sizeof(int) > bf_being_received->buffersize)
0520         { 
0521           delete [] bf_being_received->bf;
0522           bf_being_received->buffersize = (len + sizeof(int)-1)/sizeof(int) + 2048;
0523           
0524           pthread_mutex_lock(&M_cout);
0525           //          cout << "expanding buffer to " << bf_being_received->buffersize << " for host " << host << endl;
0526           pthread_mutex_unlock(&M_cout);
0527           
0528           bf_being_received->bf = new PHDWORD[ bf_being_received->buffersize];
0529         }
0530       
0531       p = (char *) bf_being_received->bf;
0532       status = readn (dd_fd, p,   len );
0533       if ( len != status)
0534         {
0535           
0536           pthread_mutex_lock(&M_cout);
0537           //          cout << "error on transfer: Expected " << len 
0538           //       << " got status  " << status  << "  for host " << host << endl;
0539           perror("ctrl_data");
0540           pthread_mutex_unlock(&M_cout);
0541           i = htonl(CTRL_REMOTEFAIL);
0542         }
0543       // store the actual byte length we received
0544       bf_being_received->bytecount = len;
0545       bf_being_received->dirty = 1;
0546  
0547       //  cout << __FILE__ << " " << __LINE__ << " waiting for write thread " << endl;
0548       pthread_mutex_lock(&M_done);
0549       // and switch the buffers
0550       bf_temp = bf_being_received;
0551       bf_being_received = bf_being_written;
0552       bf_being_written = bf_temp;
0553       bf_being_written->role = ROLE_WRITTEN;
0554       bf_being_received->role = ROLE_RECEIVED;
0555           
0556       //      cout << __FILE__ << " " << __LINE__ << " switching buffers " << bf_being_received << "  " << bf_being_received << endl;
0557       pthread_mutex_unlock(&M_write);
0558       
0559       i = htonl(CTRL_REMOTESUCCESS);
0560       writen (dd_fd, (char *)&i, sizeof(int));
0561       
0562       break;
0563 
0564     case  CTRL_ENDRUN:
0565       //cout  << " endrun signal " << endl;
0566       
0567       pthread_mutex_lock(&M_done);
0568       if (! do_not_write)
0569         {
0570           close (output_fd);
0571           if (verbose)
0572         {
0573           cout << " closed file "  << filename << " at " << time(0) << endl;
0574         }
0575         }
0576 
0577       i = htonl(CTRL_REMOTESUCCESS);
0578       writen (dd_fd, (char *)&i, sizeof(i));
0579       pthread_mutex_unlock(&M_done);
0580 
0581       break;
0582       
0583     case CTRL_CLOSE:
0584       close ( dd_fd);
0585 
0586       // we set go_on to 0 so our loop stops and we return 
0587       go_on = 0;
0588       if (verbose == 1)
0589         {
0590           cout << " closed connection from " << host << endl; 
0591         }
0592       else if (verbose > 1)
0593         {
0594           cout << " closed connection from "  << host << " at " << time(0) << endl;
0595         }
0596       break;
0597       
0598     }
0599           
0600     }
0601   
0602   if (verbose > 1) cout << " ending thread " << " at " << time(0) << endl;
0603   
0604   return 0;
0605 }
0606 
0607 
0608 
0609 void *writebuffers ( void * arg)
0610 {
0611 
0612   while(1)
0613     {
0614 
0615       pthread_mutex_lock(&M_write);
0616 
0617       if (! do_not_write)
0618     {
0619       pthread_mutex_lock(&M_cout);
0620       //cout << __LINE__ << "  " << __FILE__ << " write thread unlocked  " <<  endl;
0621       pthread_mutex_unlock(&M_cout);
0622       
0623       int blockcount = ( bf_being_written->bytecount + BUFFERBLOCKSIZE -1)/BUFFERBLOCKSIZE;
0624       int bytecount = blockcount*BUFFERBLOCKSIZE;
0625       
0626       pthread_mutex_lock(&M_cout);
0627       //cout << __LINE__ << "  " << __FILE__ << " write thread unlocked, block count " << blockcount <<  endl;
0628       pthread_mutex_unlock(&M_cout);
0629       
0630       int bytes = writen ( output_fd, (char *) bf_being_written->bf , bytecount );
0631       if ( bytes != bytecount)
0632         {
0633           pthread_mutex_lock(&M_cout);
0634           cout << __LINE__ << "  " << __FILE__ << " write error " << bytes << "  " << bytecount <<  endl;
0635           pthread_mutex_unlock(&M_cout);
0636           bf_being_written->dirty = -1;  // mark as "error"
0637         }
0638       else
0639         {
0640           //      usleep(1000000);
0641           bf_being_written->dirty = 0;
0642         }
0643     }
0644       else
0645     { 
0646       bf_being_written->dirty = 0;
0647     }
0648       pthread_mutex_unlock(&M_done);
0649       
0650     }
0651   return 0;
0652 }
0653 
0654 
0655 
0656 
/**
 * Reads exactly nbytes bytes from fd into ptr, calling read() repeatedly
 * until the request is satisfied.
 *
 * A read() interrupted by a signal (EINTR) is retried instead of being
 * reported as an error.
 *
 * @param fd      descriptor to read from
 * @param ptr     destination buffer with room for nbytes bytes
 * @param nbytes  number of bytes wanted
 * @return nbytes when everything was read (0 if nbytes <= 0); otherwise the
 *         result of the failing read(): 0 on end-of-file, negative on error.
 *         Bytes read before the failure are not reported.
 */
int readn (int fd, char *ptr, int nbytes)
{
  int nleft = nbytes;

  while ( nleft>0 )
    {
      const int nread = read (fd, ptr, nleft);
      if ( nread < 0 && errno == EINTR )
        {
          continue;   // interrupted before any data arrived, try again
        }
      if ( nread <= 0 )
        {
          return nread;
        }

#ifdef FRAGMENTMONITORING
      history[hpos++] = nread;
#endif
      nleft -= nread;
      ptr += nread;
    }

#ifdef FRAGMENTMONITORING
  if ( hpos >1 )
    {
      cout << "Fragmented transfer of " << nbytes << "bytes: ";
      for ( int i=0; i<hpos; i++)
        {
          cout << " " << history[i]<< ",";
        }
      cout << endl;
    }
#endif
  return (nbytes-nleft);
}
0691   
/**
 * Writes exactly nbytes bytes from ptr to fd, calling write() repeatedly
 * until everything has been written.
 *
 * A write() interrupted by a signal (EINTR) is retried instead of being
 * reported as an error.
 *
 * @param fd      descriptor to write to
 * @param ptr     data to write
 * @param nbytes  number of bytes to write
 * @return nbytes on success (0 if nbytes <= 0), or the negative result of
 *         the failing write(). Bytes written before the failure are not
 *         reported.
 */
int writen (int fd, char *ptr, int nbytes)
{
  int nleft = nbytes;

  while ( nleft>0 )
    {
      const int nwritten = write (fd, ptr, nleft);
      if ( nwritten < 0 )
        {
          if ( errno == EINTR )
            {
              continue;   // interrupted before anything was written, try again
            }
          return nwritten;
        }

      nleft -= nwritten;
      ptr += nwritten;
    }
  return (nbytes-nleft);
}
0707 
0708 
0709 in_addr_t find_address_from_interface(const char *interface)
0710 {
0711   int fd;
0712   struct ifreq ifr;
0713 
0714   // check the no one plays tricks with us...
0715   if ( strlen(interface) >= IFNAMSIZ)
0716     {
0717       cerr << " Interface name too long, ignoring "<< endl;
0718       return htonl(INADDR_ANY);
0719     }
0720   
0721   fd = socket(AF_INET, SOCK_DGRAM, 0);
0722 
0723   // I want to find an IPv4 IP address
0724   ifr.ifr_addr.sa_family = AF_INET;
0725 
0726   // I want IP address attached to "interface"
0727   strncpy(ifr.ifr_name, interface, IFNAMSIZ-1);
0728 
0729   ioctl(fd, SIOCGIFADDR, &ifr);
0730 
0731   close(fd);
0732   in_addr_t a =  ((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr.s_addr;
0733 
0734   if (verbose)  cout << "  binding to address " << inet_ntoa(  ((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr ) << endl;
0735   return a;
0736 }
0737 
0738 
0739 
0740 void cleanup( const int exitstatus)
0741 {
0742   close (dd_fd);
0743 
0744   exit(exitstatus);
0745 }
0746