Back to home page

sPhenix code displayed by LXR

 
 

    


File indexing completed on 2025-08-02 08:21:07

0001 //#define WRITEPRDF
0002 
0003 #include <stdio.h>
0004 #include <unistd.h>
0005 #include <fcntl.h>
0006 
0007 #include <stdlib.h>
0008 #include <sys/types.h>
0009 #include <sys/stat.h>
0010 #include <signal.h>
0011 #include <limits.h>
0012 #include <errno.h>
0013 #include <sys/stat.h>
0014 #include <sys/file.h>
0015 
0016 #include <pthread.h>
0017 
0018 #include <iostream>
0019 #include <iomanip>
0020 #include <sstream>
0021 
0022 #include <errno.h>
0023 #include <string.h>
0024 #include <unistd.h>
0025 #include <time.h>
0026 #include <sys/types.h>
0027 #include <sys/socket.h>
0028 #include <arpa/inet.h>
0029 #include <netinet/if_ether.h>
0030 #include <netinet/in.h>
0031 #include <netinet/ip.h>
0032 #include <net/if.h>
0033 #include <sys/ioctl.h>
0034 #include <netpacket/packet.h>
0035 #include <sys/socket.h>
0036 #include <netdb.h>
0037 
0038 #include <vector>
0039 #include <map>
0040 #include <queue>
0041 
0042 
0043 
0044 #include "getopt.h"
0045 
0046 #include "daq_device.h"
0047 #include "daqBuffer.h"
0048 #include "eloghandler.h" 
0049 #include "TriggerHandler.h" 
0050 
0051 #include "rcdaq.h"
0052 #include "rcdaq_rpc.h"
0053 #include "md5.h"
0054 
0055 #ifdef HAVE_MOSQUITTO_H
0056 #include "MQTTConnection.h"
0057 #endif
0058 
0059 
0060 
0061 int open_file_on_server(const int run_number);
0062 int server_open_Connection();
0063 int open_serverSocket(const char * host_name, const int port);
0064 int server_send_beginrun_sequence(const char * filename, const int runnumber, int fd);
0065 int server_send_rollover_sequence(const char * filename, int fd);
0066 int server_send_endrun_sequence(int fd);
0067 int server_send_close_sequence(int fd);
0068 
0069 void * mg_server (void *arg);
0070 int mg_end();
0071 int request_mg_update(const int what);
0072 
0073 
// Mutexes used below mostly as hand-off signals between threads: they are
// locked in one function and unlocked in another.

// Locked by writebuffers() at the top of each pass; unlocked by switch_buffer().
0074 pthread_mutex_t WriteSem;
// Locked by switch_buffer() before the buffers are swapped; unlocked by
// writebuffers() at the end of each pass.
0075 pthread_mutex_t WriteProtectSem;
0076 
// Unlocked by monitorRequestwatcher_thread() for every accepted connection.
0077 pthread_mutex_t MonitoringRequestSem;
// Locked by sendMonitorData() at the top of each pass; unlocked by switch_buffer().
0078 pthread_mutex_t SendSem;
// Locked by switch_buffer(); unlocked by sendMonitorData() and by handler().
0079 pthread_mutex_t SendProtectSem;
0080 
// Held around push/pop operations on fd_queue.
0081 pthread_mutex_t FdManagementSem;
0082 
0083 char pidfilename[128];
0084 char daq_event_env_string[128];
0085 
0086 // those are the "todo" definitions. DAQ can be woken up by a trigger
0087 // and read something, or by a command and change its status.
0088 
0089 int servernumber = 0;
0090 
0091 int uservalues[8] = {0};
0092 
0093 #define DAQ_TRIGGER 0x01
0094 #define DAQ_COMMAND 0x02
0095 #define DAQ_SPECIAL 0x04
0096 
0097 // now there are a few commands which the DAQ process obeys.
0098 
0099 #define COMMAND_INIT    1
0100 #define COMMAND_BEGIN   2
0101 #define COMMAND_END     3
0102 #define COMMAND_FINISH  4
0103 #define COMMAND_OPENP   5
0104 
0105 // now there are a few actions for the DAQ process
0106 // when DAQ-triggered 
0107 #define DAQ_INIT    1
0108 #define DAQ_READ    2
0109 
0110 using namespace std;
0111 
0112 static std::map<string,string> RunTypes;
0113 
0114 static std::string TheFileRule = "rcdaq-%08d-%04d.evt";
0115 static std::string TheRunType = " ";
0116 static std::string CurrentFilename = "";
0117 static std::string PreviousFilename = "";
0118 
0119 static std::string TheRunnumberfile;
0120 static int RunnumberfileIsSet = 0;
0121 
0122 static std::string TheRunnumberApp;
0123 static int RunnumberAppIsSet = 0;
0124 
0125 static std::string MyName = ""; 
0126 
0127 static int daq_open_flag = 0;  //no files written unless asked for
0128 static int daq_server_flag = 0;  //no server access
0129 static int daq_server_port = 0;  // invalid port
0130 static std::string daq_server_name = "";  // our server, if any
0131 
0132 static int file_is_open = 0;
0133 static int server_is_open = 0;
0134 static int current_filesequence = 0;
0135 static int outfile_fd;
0136 
0137 static int md5_enabled =1;
0138 static int md5_allow_turnoff = 0;
0139 
0140 #ifdef HAVE_MOSQUITTO_H
0141 MQTTConnection *mqtt = 0;
0142 std::string mqtt_host;
0143 int mqtt_port = 0;
0144 #endif
0145 
0146 static md5_state_t md5state;
0147 
0148 
0149 static int RunControlMode = 0;
0150 static int CurrentEventType = 0;
0151 
0152 static ElogHandler *ElogH =0;
0153 static TriggerHandler  *TriggerH =0;
0154 
0155 
0156 typedef std::vector<daq_device *> devicevector;
0157 typedef devicevector::iterator deviceiterator;
0158 
0159 #define MAXEVENTID 32
0160 static int Eventsize[MAXEVENTID];
0161 
0162 // int Daq_Status;
0163 
0164 // #define DAQ_RUNNING  0x01
0165 // #define DAQ_READING  0x02
0166 // #define DAQ_ENDREQUESTED  0x04
0167 // #define DAQ_PROTOCOL 0x10
0168 // #define DAQ_BEGININPROGRESS  0x20
0169 
0170 // the original bit-wise status word manipulation wasn't particularly thread-safe.
0171 // upgrading to individual variables (and we ditch "DAQ_READING")
0172 
// Non-zero while a run is active; several setters below refuse to act while it is set.
0173 int  DAQ_RUNNING = 0;
// Reset to 0 by daq_end_thread() after daq_end() has returned.
0174 int  DAQ_ENDREQUESTED = 0;
// Set to 1 by daq_begin_immediate(); reset to 0 by daq_begin_thread().
0175 int  DAQ_BEGININPROGRESS =0;
0176 
0177 static daqBuffer Buffer1;
0178 static daqBuffer Buffer2;
0179 
0180 int Trigger_Todo;
0181 int Command_Todo;
0182 
0183 int TheRun = 0;
0184 time_t StartTime = 0;
0185 int Buffer_number;
0186 
0187 int Event_number;
0188 int Event_number_at_last_open = 0;
0189 int Event_number_at_last_write = 0;
0190 int Event_number_at_last_end = 0;
0191 
0192 
0193 int update_delta;
0194 
0195 static int TriggerControl = 0;
0196 
0197 int ThePort=8899;
0198 
0199 int TheServerFD = 0;
0200 
0201 daqBuffer  *fillBuffer, *transportBuffer;
0202 
0203 pthread_t ThreadId;
0204 pthread_t ThreadMon;
0205 pthread_t ThreadEvt;
0206 pthread_t ThreadWeb = 0;
0207 
0208 int *thread_arg;
0209 
0210 int end_thread = 0;
0211 
0212 pthread_mutex_t M_cout;
0213 
0214 
0215 struct sockaddr_in si_mine;
0216 
0217 #define MONITORINGPORT 9930
0218 
0219 
0220 devicevector DeviceList;
0221 
0222 
0223 
0224 int NumberWritten = 0; 
0225 unsigned long long BytesInThisRun = 0;
0226 unsigned long long BytesInThisFile = 0;
0227 unsigned long long RolloverLimit = 0;
0228 
0229 unsigned long long run_volume, max_volume;
0230 int max_events;
0231 
0232 time_t last_speed_time = 0;
0233 int last_event_nr = 0;
0234 
0235 static time_t last_volume_time = 0;
0236 double last_volume = 0;
0237 
0238 int EvtId = 0;
0239 
0240 
0241 int max_seconds = 0;
0242 int verbose = 0;
0243 int runnumber=1;
0244 int packetid = 1001;
0245 int max_buffers=0;
0246 
0247 int adaptivebuffering = 15;
0248 int last_bufferwritetime  = 0;
0249 
0250 int persistentErrorCondition = 0;
0251 
0252 std::string MyHostName;
0253 std::string shortHostName;
0254 
0255 char *obtain_pidfilename()
0256 {
0257  return pidfilename;
0258 }
0259 
0260 int registerTriggerHandler ( TriggerHandler *th)
0261 {
0262   // we already have one
0263   if ( TriggerH ) return -1;
0264   TriggerH = th;
0265   return 0;
0266 }
0267 
0268 int clearTriggerHandler ()
0269 {
0270   TriggerH = 0;
0271   return 0;
0272 }
0273 
0274 
0275 void sig_handler(int i)
0276 {
0277   if (verbose)
0278     {
0279         pthread_mutex_lock(&M_cout);
0280     cout << "**interrupt " << endl;
0281     pthread_mutex_unlock(&M_cout);
0282     }
0283 
0284   TriggerControl = 0;
0285 }
0286 
0287 
0288 int reset_deadtime() 
0289 {
0290   if ( TriggerH) TriggerH->rearm();
0291   return 0;
0292 }
0293 
0294 int enable_trigger() 
0295 {
0296 
0297   TriggerControl=1;
0298   if ( TriggerH) TriggerH->enable();
0299 
0300   int status = pthread_create(&ThreadEvt, NULL, 
0301               EventLoop, 
0302               (void *) 0);
0303    
0304   if (status ) 
0305     {
0306       cout << "error in event thread create " << status << endl;
0307       exit(0);
0308     }
0309   else
0310     {
0311       if ( TriggerH) TriggerH->rearm();
0312     }
0313 
0314   return 0;
0315   
0316 }
0317 
0318 int disable_trigger() 
0319 {
0320   TriggerControl=0;  // this makes the trigger process terminate
0321   if ( TriggerH) TriggerH->disable();
0322   
0323   return 0;
0324 }
0325 
0326 
0327 int daq_setmaxevents (const int n, std::ostream& os)
0328 {
0329 
0330   max_events =n;
0331   return 0;
0332 
0333 }
0334 
0335 int daq_setmaxvolume (const int n_mb, std::ostream& os)
0336 {
0337   unsigned long long x = n_mb;
0338   max_volume =x * 1024 *1024;
0339   return 0;
0340 
0341 }
0342 
0343 int daq_setRunControlMode(const int flag, std::ostream& os)
0344 {
0345   if ( DAQ_RUNNING ) 
0346     {
0347       os << "Run is active" << endl;
0348       return -1;
0349     }
0350   if (flag) RunControlMode = 1;
0351   else RunControlMode = 0;
0352   return 0;
0353 }
0354 
0355 int daq_setrolloverlimit (const int n_gb, std::ostream& os)
0356 {
0357   if ( DAQ_RUNNING ) 
0358     {
0359       os << "Run is active" << endl;
0360       return -1;
0361     }
0362   RolloverLimit = n_gb;
0363   return 0;
0364 
0365 }
0366 
0367 int daq_setmaxbuffersize (const int n_kb, std::ostream& os)
0368 {
0369   if ( DAQ_RUNNING ) 
0370     {
0371       os << "Run is active" << endl;
0372       return -1;
0373     }
0374 
0375   Buffer1.setMaxSize(n_kb*1024);
0376   Buffer2.setMaxSize(n_kb*1024);
0377   return 0;
0378 }
0379 
0380 int daq_setadaptivebuffering (const int usecs, std::ostream& os)
0381 {
0382   adaptivebuffering = usecs;
0383   return 0;
0384 }
0385 
0386 // this call was added to allow chaging the format after the 
0387 // server was started. Before this was a compile-time option.
0388 int daq_setEventFormat(const int f, std::ostream& os )
0389 {
0390   if ( daq_running() )
0391     {
0392       os << "Run is active" << endl;
0393       return -1;
0394     }
0395       
0396   if (DeviceList.size())
0397     {
0398       os << "Cannot switch format after devices are defined" << endl;
0399       return -1;
0400     }
0401   int status =  Buffer1.setEventFormat(f);
0402   status |= Buffer2.setEventFormat(f);
0403   return status;
0404 }
0405 
0406 // this is a method for the devices to obtain
0407 // the info which format we are writing
0408  
0409 int daq_getEventFormat()
0410 {
0411   return Buffer1.getEventFormat();
0412 }
0413 
0414 int daq_getRunControlMode(std::ostream& os)
0415 {
0416   os << RunControlMode << endl;
0417   return 0;
0418 }
0419 
0420 // elog server setup
0421 int daq_set_eloghandler( const char *host, const int port, const char *logname)
0422 {
0423 
0424   if ( ElogH) delete ElogH;
0425   ElogH = new ElogHandler (host, port, logname );
0426 
0427   setenv ( "DAQ_ELOGHOST", host , 1);
0428   char str [128];
0429   sprintf(str, "%d", port);
0430   setenv ( "DAQ_ELOGPORT", str , 1);
0431   setenv ( "DAQ_ELOGLOGBOOK", logname , 1);
0432 
0433   return 0;
0434 }
0435 
0436 
0437 
/// Reads exactly nbytes from fd into ptr, looping over short reads.
///
/// In the interest of having just one "readn" call, we determine whether
/// the descriptor is a socket (then recv() is used) or something else
/// (then read() is used). If fstat() fails the descriptor is treated as a
/// non-socket instead of inspecting an uninitialised stat buffer.
/// Calls interrupted by a signal (EINTR) are retried.
///
/// @param fd      descriptor to read from
/// @param ptr     destination; must have room for nbytes
/// @param nbytes  number of bytes wanted
/// @return nbytes when everything was read; otherwise the result of the
///         failing call, i.e. 0 on end-of-file and -1 on error (bytes
///         already received are not reported in that case)
int readn (int fd, char *ptr, const int nbytes)
{
  struct stat statbuf;
  const bool fd_is_socket = ( fstat(fd, &statbuf) == 0 && S_ISSOCK(statbuf.st_mode) );

  int nleft = nbytes;
  while ( nleft>0 )
    {
      ssize_t nread;
      if ( fd_is_socket) nread = recv (fd, ptr, nleft, MSG_NOSIGNAL);
      else nread = read (fd, ptr, nleft);

      // an interrupted call transferred nothing; just try again
      if ( nread < 0 && errno == EINTR ) continue;

      if ( nread <= 0 ) 
	{
	  return static_cast<int>(nread);
	}

      nleft -= static_cast<int>(nread);
      ptr += nread;
    }

  return (nbytes-nleft);
}
0466 
0467 
/// Writes exactly nbytes from ptr to fd, looping over short writes.
///
/// In the interest of having just one "writen" call, we determine whether
/// the descriptor is a socket (then send() is used) or something else
/// (then write() is used). If fstat() fails the descriptor is treated as a
/// non-socket instead of inspecting an uninitialised stat buffer.
/// Calls interrupted by a signal (EINTR) are retried, and a call that makes
/// no progress ends the loop instead of spinning forever.
///
/// @param fd      descriptor to write to
/// @param ptr     data to send
/// @param nbytes  number of bytes to send
/// @return the number of bytes written (nbytes on success); 0 if a write
///         failed, after reporting the error with perror()
int writen (int fd, char *ptr, const int nbytes)
{
  struct stat statbuf;
  const bool fd_is_socket = ( fstat(fd, &statbuf) == 0 && S_ISSOCK(statbuf.st_mode) );

  int nleft = nbytes;
  while ( nleft>0 )
    {
      ssize_t nwritten;
      if ( fd_is_socket)  nwritten = send (fd, ptr, nleft, MSG_NOSIGNAL);
      else  nwritten = write (fd, ptr, nleft);
      
      if ( nwritten <0 )
	{
	  // an interrupted call transferred nothing; just try again
	  if ( errno == EINTR ) continue;
	  perror ("writen");
	  return 0;
	}

      // no progress at all: give up and report what went out so far
      if ( nwritten == 0 ) break;
      
      nleft -= static_cast<int>(nwritten);
      ptr += nwritten;
    }
  return (nbytes-nleft);
}
0499 
0500 
0501 
0502 //-------------------------------------------------------------
0503 
0504 int open_file(const int run_number, int *fd)
0505 {
0506   
0507   if ( file_is_open) return -1;
0508 
0509 
0510   static char d[1024];
0511   sprintf( d, TheFileRule.c_str(),
0512        run_number, current_filesequence);
0513 
0514 
0515   // test if the file exists, do not overwrite
0516   int ifd =  open(d, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE , 
0517           S_IRWXU | S_IROTH | S_IRGRP );
0518   if (ifd < 0) 
0519     {
0520       pthread_mutex_lock(&M_cout);
0521       cout << " error opening file " << d << endl;
0522       perror ( d);
0523       pthread_mutex_unlock(&M_cout);
0524 
0525       *fd = 0;
0526       return -1;
0527     }
0528 
0529   *fd = ifd;
0530   CurrentFilename = d;
0531   PreviousFilename = CurrentFilename;
0532   md5_init(&md5state);
0533 
0534   file_is_open =1;
0535   
0536   // this is tricky. If we open the very first file, we latch in this event number.
0537   // but if we need to roll over, we find that out when the current buffer is written,
0538   // and the event nr is what's in this buffer, not the last written buffer.
0539   // only for the real start this is right:
0540   if (current_filesequence == 0)
0541     {
0542       Event_number_at_last_open = Event_number;
0543       Event_number_at_last_write = Event_number;
0544     }
0545 
0546 
0547   daq_generate_json (0); // generate the "new file" report
0548   
0549   return 0;
0550 }
0551 
0552 int open_file_on_server(const int run_number)
0553 {
0554   
0555   if ( file_is_open) return -1;
0556 
0557   int status = server_open_Connection();
0558   if (status) return -1;
0559 
0560   static char d[1024];
0561   sprintf( d, TheFileRule.c_str(),
0562        run_number, current_filesequence);
0563 
0564   status = server_send_beginrun_sequence(d, run_number, TheServerFD);
0565   if ( status)
0566     {
0567       return -1;
0568     }
0569 
0570   CurrentFilename = d;
0571   PreviousFilename = CurrentFilename;
0572 
0573   file_is_open =1;
0574 
0575   return 0;
0576 }
0577 
0578 
0579 // this function can be called sirectly from daq_end_interactive, 
0580 // or we can make a thread for the "immediate" option 
0581 void *daq_end_thread (void *arg)
0582 {
0583 
0584   std::ostream *os = (std::ostream *) arg;
0585 
0586   // with an operator-induced daq_end, we make the event loop terminate
0587   // and wait for it to be done.
0588 
0589   disable_trigger();
0590 
0591   // it is possible that we call daq_end before we ever started a run
0592   if (ThreadEvt)   pthread_join(ThreadEvt, NULL);
0593 
0594   daq_end(*os);
0595   DAQ_ENDREQUESTED = 0;
0596 
0597   return 0;
0598 
0599 }
0600 
0601 
0602 
0603 
0604 void *shutdown_thread (void *arg)
0605 {
0606 
0607   unsigned long *t_args = (unsigned long *) arg;
0608 
0609   
0610   pthread_mutex_lock(&M_cout);
0611   cout << "shutting down... " <<  t_args[0] << "  " <<  t_args[1] << endl;
0612   int pid_fd = t_args[2];
0613   TriggerControl = 0;
0614   if ( TriggerH) delete TriggerH;
0615   pthread_mutex_unlock(&M_cout);
0616   // unregister out service 
0617   svc_unregister ( t_args[0], t_args[1]);
0618 
0619   flock(pid_fd, LOCK_UN);
0620   unlink(pidfilename);
0621   
0622   sleep(2);
0623   exit(0);
0624 }
0625 
0626 
0627 // this thread is watching for incoming requests for
0628 // monitoring data
0629 
0630 std::queue<int> fd_queue;  // this queue holds the monitoring requests 
0631 
0632 
0633 void *monitorRequestwatcher_thread (void *arg)
0634 {
0635 
0636   struct sockaddr_in server_addr;
0637   int sockfd;
0638   
0639   if ( (sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0 ) 
0640     {
0641       pthread_mutex_lock(&M_cout);
0642       cout  << "cannot create socket" << endl;
0643       pthread_mutex_unlock(&M_cout);
0644     }
0645 
0646   bzero( (char*)&server_addr, sizeof(server_addr));
0647   server_addr.sin_family = PF_INET;
0648   server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
0649   server_addr.sin_port = htons(MONITORINGPORT + servernumber);
0650 
0651 
0652   int status = bind(sockfd, (struct sockaddr*) &server_addr, sizeof(server_addr));
0653   if (status < 0)
0654     {
0655       perror("bind");
0656     }
0657     
0658   pthread_mutex_lock(&M_cout);
0659   cout  << "Listening for monitoring requests on port " << MONITORINGPORT + servernumber << endl;
0660   pthread_mutex_unlock(&M_cout);
0661 
0662   listen(sockfd, 16);
0663 
0664   int dd_fd;
0665   struct sockaddr_in out;
0666 
0667   while (sockfd > 0)
0668     {
0669 
0670       socklen_t  len = sizeof(out); 
0671       dd_fd = accept(sockfd,  (struct sockaddr *) &out, &len);
0672       if ( dd_fd < 0 ) 
0673     {
0674       pthread_mutex_lock(&M_cout);          
0675       cout  << "error in accept socket" << endl;
0676       pthread_mutex_unlock(&M_cout);
0677     }
0678       else
0679     {
0680       char *host = new char[64]; 
0681       
0682       getnameinfo((struct sockaddr *) &out, sizeof(struct sockaddr_in), host, 64,
0683               NULL, 0, NI_NOFQDN); 
0684       
0685       // time_t x =  time(0);
0686       // pthread_mutex_lock(&M_cout);
0687       // cout << ctime(&x) << "new request for monitoring connection accepted from " << host << endl;
0688       // pthread_mutex_unlock(&M_cout);
0689 
0690       pthread_mutex_lock(&FdManagementSem);
0691       fd_queue.push(dd_fd);
0692       pthread_mutex_unlock(&FdManagementSem);
0693       
0694       //kick off "the sender of monitoring data" thread
0695       pthread_mutex_unlock(&MonitoringRequestSem);
0696     }
0697     }
0698   return 0;
0699 }
0700 
0701 void handler ( int sig)
0702 {
0703   //pthread_mutex_lock(&M_cout);
0704   // cout  << " in handler..." << endl;
0705   //pthread_mutex_unlock(&M_cout);
0706   pthread_mutex_unlock(&SendProtectSem);
0707 }  
0708 
0709 void *sendMonitorData( void *arg)
0710 {
0711 
0712   int fd;
0713   int status;
0714   
0715   while (1)
0716     {
0717       // we wait here until a monitoring request comes
0718       //pthread_mutex_lock(&MonitoringRequestSem);
0719 
0720       // when we get here, there are requestst. Now wait for a buffer
0721       // to be available:
0722 
0723       // pthread_mutex_lock(&M_cout);
0724       // cout <<  "  locking SendSem " << endl;
0725       // pthread_mutex_unlock(&M_cout);
0726 
0727       pthread_mutex_lock( &SendSem);
0728 
0729       // Now we go through all requests in the queue
0730       while ( !fd_queue.empty() )
0731     {
0732       // now we are manipulating the queue. To do that, we need to
0733       // lock-protect this:
0734       
0735       pthread_mutex_lock(&FdManagementSem);
0736       fd = fd_queue.front();
0737       fd_queue.pop();
0738 
0739       pthread_mutex_unlock(&FdManagementSem);
0740 
0741       int max_length =0;
0742           
0743       // we always wait for a controlword which tells us what to do next.
0744       if ( (status = readn (fd, (char *) &max_length, 4) ) <= 0)
0745         {
0746           max_length = 0;
0747         }
0748       else
0749         {
0750           max_length = ntohl(max_length);
0751 
0752           status = transportBuffer->sendData(fd, max_length);
0753 
0754           int reply;
0755           if ( ! status && ( status = readn (fd, (char *) &reply, 4) ) <= 0)
0756         {
0757           // pthread_mutex_lock(&M_cout);
0758           // time_t x =  time(0);
0759           // cout << ctime(&x) << "  connection was broken for fd  " << fd << endl;
0760           // pthread_mutex_unlock(&M_cout);
0761         }
0762           else
0763         {
0764           // reply = ntohl(reply);
0765           // pthread_mutex_lock(&M_cout);
0766           // cout  << " reply = " << reply << endl;
0767           // pthread_mutex_unlock(&M_cout);
0768         }
0769         }
0770       close(fd);
0771     }
0772       // pthread_mutex_lock(&M_cout);
0773       // cout <<  "  unlocking SendProtectSem " << endl;
0774       // pthread_mutex_unlock(&M_cout);
0775 
0776       pthread_mutex_unlock(&SendProtectSem);
0777       if ( end_thread) pthread_exit(0);
0778 
0779     }
0780   return 0;
0781 }
0782 
0783 void *writebuffers ( void * arg)
0784 {
0785 
0786 
0787   while(1)
0788     {
0789 
0790       pthread_mutex_lock( &WriteSem); // we wait for an unlock
0791 
0792       
0793       last_bufferwritetime  = time(0);
0794       if ( daq_open_flag &&  outfile_fd) 
0795     {
0796       unsigned int bytecount = transportBuffer->writeout(outfile_fd);
0797       NumberWritten++;
0798       BytesInThisRun += bytecount;
0799       BytesInThisFile += bytecount;
0800     }
0801       else if ( daq_server_flag &&  TheServerFD)
0802     {
0803       unsigned int bytecount = transportBuffer->sendout(TheServerFD);
0804           NumberWritten++;
0805           BytesInThisRun += bytecount;
0806       BytesInThisFile += bytecount;
0807         }
0808 
0809       if ( end_thread) pthread_exit(0);
0810       
0811       pthread_mutex_unlock(&WriteProtectSem);
0812     }
0813   return 0;
0814 }
0815 
0816 int switch_buffer()
0817 {
0818 
0819    // pthread_mutex_lock(&M_cout);
0820    // cout << __LINE__ << "  " << __FILE__ << " switching buffer" << endl;
0821    // pthread_mutex_unlock(&M_cout);
0822 
0823   daqBuffer *spare;
0824   
0825   pthread_mutex_lock(&WriteProtectSem);
0826   pthread_mutex_lock(&SendProtectSem);
0827 
0828   fillBuffer->addEoB();
0829 
0830   //switch buffers
0831   spare = transportBuffer;
0832   transportBuffer = fillBuffer;
0833   fillBuffer = spare;
0834 
0835   //+++
0836   fillBuffer->prepare_next(++Buffer_number, TheRun);
0837 
0838 
0839 
0840   // let's see if we need to roll over
0841     if ( daq_open_flag && RolloverLimit)
0842     {
0843       unsigned int blength = transportBuffer->getLength();
0844       
0845       if ( blength + BytesInThisFile > RolloverLimit * 1024 * 1024 * 1024) 
0846     {
0847 
0848       if ( daq_server_flag)
0849         {
0850 
0851           current_filesequence++;
0852           static char d[1024];
0853           sprintf( d, TheFileRule.c_str(),
0854                TheRun, current_filesequence);
0855           
0856           cout << __FILE__ << " " << __LINE__ << " rolling over " << d << endl;
0857 
0858           int status = server_send_rollover_sequence(d,TheServerFD);
0859           CurrentFilename = d;
0860         }
0861       else   // not server
0862         {
0863           close(outfile_fd);
0864           file_is_open = 0;
0865 
0866           current_filesequence++;
0867           daq_generate_json(1);
0868 
0869           Event_number_at_last_open = Event_number_at_last_write;
0870           int status = open_file ( TheRun, &outfile_fd);
0871           if (status)
0872         {
0873           cout << MyHostName << "Could not open output file - Run " << TheRun << "  file sequence " << current_filesequence<< endl;
0874         }
0875         }
0876       // cout << MyHostName << " -- Rolling output file over at "
0877       //      << transportBuffer->getLength() + BytesInThisFile
0878       //      << " sequence: " << current_filesequence
0879       //      << " limit: " << RolloverLimit
0880       //      << " now: " << CurrentFilename 
0881       //      << endl;
0882       BytesInThisFile = 0;
0883     }
0884     }
0885   
0886   Event_number_at_last_write = Event_number;
0887   pthread_mutex_unlock(&WriteSem);
0888   pthread_mutex_unlock(&SendSem);
0889   return 0;
0890 
0891 }
0892 
0893 int daq_set_runnumberfile(const char *file, const int flag)
0894 {
0895     if ( flag)
0896     {
0897       TheRunnumberfile.clear();
0898       RunnumberfileIsSet = 0;
0899       return 0;
0900     }
0901 
0902   TheRunnumberfile = file;
0903   RunnumberfileIsSet = 1;
0904   FILE *fp = fopen(TheRunnumberfile.c_str(), "r");
0905   int r = 0;
0906   if (fp)
0907     {
0908       int status = fscanf(fp, "%d", &r);
0909       if ( status != 1) r = 0; 
0910       if ( ! TheRun )
0911     {
0912       TheRun = r;
0913     }
0914       fclose(fp);
0915     }
0916 
0917   return 0;
0918 }
0919 
0920 int daq_write_runnumberfile(const int run)
0921 {
0922   if ( !RunnumberfileIsSet ) return 1;
0923   if ( RunControlMode ) return 1;
0924 
0925   FILE *fp = fopen(TheRunnumberfile.c_str(), "w");
0926   if (fp )
0927     {
0928       fprintf(fp, "%d\n", run);
0929       fclose(fp);
0930     }
0931 
0932   return 0;
0933 }
0934 
0935 
0936 int daq_set_runnumberApp(const char *file, const int flag)
0937 {
0938   if ( flag)
0939     {
0940       TheRunnumberApp.clear();
0941       RunnumberAppIsSet = 0;
0942       return 0;
0943     }
0944   
0945   TheRunnumberApp = file;
0946   RunnumberAppIsSet = 1;
0947   return 0;
0948 }
0949 
0950 int getRunNumberFromApp()
0951 {
0952   if ( ! RunnumberAppIsSet) return -1;
0953   FILE *fp = popen(TheRunnumberApp.c_str(),"r");
0954   if (fp == NULL)
0955     {
0956       std::cerr << "error running the runnumber app" << std::endl;
0957       return -1;
0958     }
0959     char in[64];
0960     int len = fread(in, 1, 64, fp);
0961     pclose(fp);
0962     
0963     std::stringstream s (in);
0964     int run;
0965     if (! (s >> run) )
0966       {
0967     return -1;
0968       }
0969 
0970   return run;
0971 }
0972 
0973 
0974 
0975 int daq_set_filerule(const char *rule , std::ostream& os)
0976 {
0977   if ( DAQ_RUNNING ) 
0978     {
0979       os << MyHostName << "Run is active" << endl;;
0980       return -1;
0981     }
0982 
0983   TheFileRule = rule;
0984   return 0;
0985 }
0986 
0987 int daq_set_name(const char *name)
0988 {
0989   MyName = name;
0990   if (servernumber)
0991     {
0992       MyName = MyName + ":" + to_string(servernumber);
0993     }
0994   request_mg_update (MG_REQUEST_NAME);
0995   return 0;
0996 }
0997 
0998 #ifdef HAVE_MOSQUITTO_H
0999 int daq_set_mqtt_host(const char * host, const int port, std::ostream& os)
1000 {
1001   std::cout << __FILE__ << "  " << __LINE__ <<  " mqtt host " << host << " port " << port << endl;
1002   if (mqtt) delete mqtt;
1003 
1004   if (strcasecmp(host, "None") == 0) // delete existing def
1005     {
1006       mqtt = 0;
1007       mqtt_host = "";
1008       mqtt_port = 0;
1009       return 0;
1010     }
1011 
1012   mqtt_host = host;
1013   mqtt = new MQTTConnection(mqtt_host, "rcdaq", port);
1014   if ( mqtt->Status())
1015     {
1016       delete mqtt;
1017       mqtt =0;
1018       mqtt_host = "";
1019       mqtt_port = 0;
1020       os << "Could not connect to host " << host << " on port " << port << endl;
1021       return 1;
1022     }
1023 
1024   mqtt_host = host;
1025   mqtt_port = port;
1026 
1027   return 0;
1028 }
1029 
1030 int daq_get_mqtt_host(std::ostream& os)
1031 {
1032   if (!mqtt)
1033     {
1034       os << " No MQTT host defined" << endl;
1035       return 1;
1036     }
1037   os << " Host " << mqtt->GetHostName() << " " << " port " << mqtt->GetPort() << endl; 
1038   return 0;
1039 }
1040 #endif
1041 
1042 
1043 
1044 // this is selecting from any of the existing run types 
1045 int daq_setruntype(const char *type, std::ostream& os )
1046 {
1047 
1048   if ( DAQ_RUNNING ) 
1049     {
1050       os << MyHostName << "Run is active" << endl;;
1051       return -1;
1052     }
1053 
1054   std::string _type = type;
1055   std::map <string,string>::const_iterator iter = RunTypes.begin();
1056   for ( ; iter != RunTypes.end(); ++iter)
1057     {
1058       if ( iter->first == _type )
1059     {
1060       TheFileRule = iter->second;
1061       TheRunType = _type;
1062       return 0;
1063     }
1064     }
1065   os << " Run type " << type << " is not defined " << endl;
1066   return 1;
1067 }
1068 
1069 int daq_getruntype(const int flag, std::ostream& os)
1070 {
1071   std::map <string,string>::const_iterator iter = RunTypes.begin();
1072   for ( ; iter != RunTypes.end(); ++iter)
1073     {
1074       if ( iter->second == TheFileRule )
1075     {
1076       if ( flag == 2) 
1077         {
1078           os << iter->first      << " - " << iter->second << endl;
1079           return 0;
1080         }
1081       else
1082         {
1083           os << iter->first << endl;
1084           return 0;
1085         }
1086     }
1087     }
1088   return 0;
1089 }
1090 
1091 // this is defining a new run type (or re-defining an old one) 
1092 int daq_define_runtype(const char *type, const char *rule)
1093 {
1094   std::string _type = type;
1095   // std::map <string,string>::const_iterator iter = RunTypes.begin();
1096   // for ( ; iter != RunTypes.end(); ++iter)
1097   //   {
1098   //     if ( iter->first == _type )
1099   //    {
1100   //      RunTypes[_type] = rule;
1101   //      return 0;
1102   //    }
1103   //   }
1104   RunTypes[_type] = rule;
1105   return 0;
1106 }
1107 
1108 int daq_status_runtypes (std::ostream& os )
1109 {
1110   os << "  -- defined Run Types: ";
1111   return daq_list_runtypes(2, os);
1112 }
1113 
1114 int daq_list_runtypes(const int flag, std::ostream& os)
1115 {
1116 
1117   if ( flag && RunTypes.size() == 0 )
1118     {
1119       os << "  (none)" <<endl;
1120       return 0;
1121     }
1122   if (flag ==2) os << endl;
1123 
1124   std::map <string,string>::const_iterator iter = RunTypes.begin();
1125   for ( ; iter != RunTypes.end(); ++iter)
1126     {
1127       if (flag)
1128     {
1129       os << "     " << setw(12) << iter->first   << " - " << iter->second << endl;
1130     }
1131       else
1132     {
1133       os << iter->first << endl;
1134     }
1135       
1136     }
1137   return 0;
1138 }    
1139 
1140 int daq_get_name (std::ostream& os )
1141 {
1142   os << MyName << endl;
1143   return 0;
1144 }
1145 
1146 std::string daq_get_myname()
1147 {
1148   return MyName;
1149 }
1150 
1151 std::string& daq_get_filerule()
1152 {
1153   return TheFileRule;
1154 }
1155 
1156 std::string& get_current_filename()
1157 {
1158   return CurrentFilename;
1159 }
1160 
1161 std::string& get_previous_filename()
1162 {
1163   return PreviousFilename;
1164 }
1165 
1166 double daq_get_mb_per_second()
1167 {
1168 
1169   time_t now_time = time(0);
1170   if ( now_time == last_volume_time) return 0;
1171   double time_delta = now_time - last_volume_time;
1172   double mb_per_second = ( run_volume - last_volume) / time_delta / 1024. /1204.;
1173   last_volume = run_volume;
1174   last_volume_time = now_time;
1175   return mb_per_second;
1176 }
1177 
1178 double daq_get_events_per_second()
1179 {
1180   time_t now_time = time(0);
1181   if ( now_time == last_speed_time) return 0;
1182   double time_delta = now_time - last_speed_time;
1183   double events_per_second = ( Event_number - last_event_nr) / time_delta;
1184   last_event_nr = Event_number;
1185   last_speed_time = now_time;
1186   return events_per_second;
1187 }
1188 
1189 
1190 void * daq_begin_thread( void *arg)
1191 {
1192   int irun = *(int*)arg;
1193   int status = daq_begin( irun, std::cout);
1194   if (status)
1195     {
1196       // not sure what to do exactly
1197       cout << __FILE__ << " " << __LINE__ << " asynchronous begin run failed" << endl;
1198     }
1199 
1200   DAQ_BEGININPROGRESS = 0;
1201 
1202   return 0;
1203 }
1204 
1205 int daq_begin_immediate(const int irun, std::ostream& os)
1206 {
1207   static unsigned int  b_arg;
1208   b_arg = irun;
1209 
1210   if ( DAQ_RUNNING ) 
1211     {
1212       os << MyHostName << "Run is active" << endl;
1213       return -1;
1214     }
1215   if ( irun )
1216     {
1217       os << MyHostName << "Run " << irun  << " begin requested" << endl;
1218     }
1219   else   // I need to think about this a bit. We let the begin_run update the run number. 
1220     {
1221       os << MyHostName << "Run " << TheRun+1 << " begin requested" << endl;
1222     }
1223   
1224   DAQ_BEGININPROGRESS = 1;
1225   
1226   pthread_t t;
1227  
1228   int status = pthread_create(&t, NULL,
1229                           daq_begin_thread,
1230                           (void *) &b_arg);
1231   if (status ) 
1232     {
1233 
1234       cout << "begin_run failed " << status << endl;
1235       os << "begin_run failed " << status << endl;
1236       return -1;
1237     }
1238   return 0;
1239 }
1240 
1241 
1242 int daq_begin(const int irun, std::ostream& os)
1243 {
1244   if ( DAQ_RUNNING ) 
1245     {
1246       os << MyHostName << "Run is already active" << endl;;
1247       return -1;
1248     }
1249 
1250   if ( RunControlMode &&  irun ==0 )
1251     {
1252       os << MyHostName << " No automatic Run Numbers in Run Control Mode" << endl;;
1253       return -1;
1254     }
1255   
1256   if ( persistentErrorCondition)
1257     {
1258       os << MyHostName << "*** Previous error with server connection" << endl;;
1259       return -1;
1260     }
1261     
1262   if (ThreadEvt) pthread_join(ThreadEvt, NULL);
1263 
1264   
1265   // set the status to "running"
1266   DAQ_RUNNING = 1;
1267   current_filesequence = 0;
1268   
1269   // if we are in run Control mode, we don't allow automatic run numbers
1270 
1271   if (  irun ==0)
1272     {
1273       if ( RunnumberAppIsSet)
1274     {
1275       int run = getRunNumberFromApp();
1276       if ( run < 0)
1277         {
1278           os << MyHostName << "Could not obtain a run number from " << TheRunnumberApp << ", run  not started" << endl;
1279           DAQ_RUNNING = 0;
1280           return -1;
1281         }
1282       TheRun = run;
1283     }
1284       else
1285     {
1286       TheRun++;
1287     }
1288     }
1289   else
1290     {
1291       TheRun = irun;
1292     }
1293 
1294   //initialize the Buffer and event number
1295   Buffer_number = 1;
1296   Event_number  = 1;
1297   Event_number_at_last_write = 0;
1298   Event_number_at_last_open = 0;
1299   Event_number_at_last_end = 0;
1300 
1301   // initialize the run/file volume
1302   BytesInThisRun = 0;    // bytes actually written
1303   BytesInThisFile = 0;
1304 
1305   
1306   if ( daq_open_flag)
1307     {
1308       if ( daq_server_flag)
1309     {
1310       
1311       int status = open_file_on_server(TheRun);
1312       if ( !status)
1313         {
1314 
1315           if (ElogH) ElogH->BegrunLog( TheRun,"RCDAQ",  
1316                        get_current_filename());
1317 
1318           daq_write_runnumberfile(TheRun);
1319           last_bufferwritetime  = time(0);  // initialize this at begin-run
1320           
1321         }
1322       else
1323         {
1324           os << MyHostName << "Could not open remote output file - Run " << TheRun << " not started" << endl;;
1325           DAQ_RUNNING = 0;
1326           return -1;
1327         }
1328     }
1329 
1330       else  // we log to a standard local file
1331     {
1332       int status = open_file ( TheRun, &outfile_fd);
1333       if ( !status)
1334         {
1335           if (ElogH) ElogH->BegrunLog( TheRun,"RCDAQ",  
1336                        get_current_filename());
1337 
1338           daq_write_runnumberfile(TheRun);
1339           last_bufferwritetime  = time(0);  // initialize this at begin-run
1340 
1341         }
1342       else
1343         {
1344           os << MyHostName << "Could not open output file - Run " << TheRun << " not started" << endl;;
1345           DAQ_RUNNING = 0;
1346           return -1;
1347         }
1348     }
1349 
1350       
1351     }
1352 
1353 
1354 
1355   // just to be safe, clear the "end requested" bit
1356   DAQ_ENDREQUESTED = 0;
1357   
1358   cout << "starting run " << TheRun << " at " << time(0) << endl; 
1359   set_eventsizes();
1360   // initialize Buffer1 to be the fill buffer
1361   //fillBuffer      = &Buffer1;
1362   //transportBuffer = &Buffer2;
1363 
1364   // a safety check: see that the buffers haven't been adjusted 
1365   // to a smaller value than the event size
1366   int wantedmaxsize = 0;
1367   for (int i = 0; i< MAXEVENTID; i++)
1368     {
1369       if ( (4*Eventsize[i] + 4*32) > fillBuffer->getMaxSize()
1370            || (4*Eventsize[i] + 4*32) > transportBuffer->getMaxSize() ) 
1371     {  
1372       int x = 4*Eventsize[i] + 4*32;   // this is now in bytes
1373       if ( x > wantedmaxsize ) wantedmaxsize = x;
1374     }
1375     }
1376   if ( wantedmaxsize)
1377     {
1378       if ( fillBuffer->setMaxSize(wantedmaxsize) || transportBuffer->setMaxSize(wantedmaxsize))
1379     { 
1380       os << MyHostName << "Cannot start run - event sizes larger than buffer, size " 
1381          <<  wantedmaxsize/1024 << " Buffer size " 
1382          << transportBuffer->getMaxSize()/1024 << endl;
1383       DAQ_RUNNING = 0;
1384       return -1;
1385     }
1386       //      os << " Buffer size increased to " << transportBuffer->getMaxSize()/1024 << " KB"<< endl;
1387       
1388     }
1389 
1390   last_volume_time = last_speed_time =  StartTime = time(0);
1391   last_event_nr = 0;
1392   last_volume = 0;
1393 
1394       
1395   // here we sucessfully start a run. So now we set the env. variables
1396   char str[128];
1397   // RUNNUMBER
1398   sprintf(str, "%d", TheRun);
1399   setenv ( "DAQ_RUNNUMBER", str , 1);
1400 
1401   // sprintf( str, "%d", Event_number);
1402   // setenv ( "DAQ_EVENTNUMBER", str , 1);
1403   
1404   setenv ( "DAQ_FILERULE", TheFileRule.c_str() , 1);
1405 
1406   sprintf( str, "%ld", StartTime);
1407   setenv ( "DAQ_STARTTIME", str , 1);
1408          
1409   if ( daq_open_flag || daq_server_flag )
1410     {
1411       setenv ( "DAQ_FILENAME", CurrentFilename.c_str() , 1);
1412     }
1413 
1414 
1415   // we are opening a new file here, so we restart the MD5 calculation 
1416   md5_init(&md5state);
1417   
1418   fillBuffer->prepare_next(Buffer_number,TheRun);
1419 
1420   run_volume = 0;
1421   
1422   device_init();
1423   
1424   // readout the begin-run event
1425   readout(BEGRUNEVENT);
1426   
1427   //now enable the interrupts and reset the deadtime
1428   enable_trigger();
1429 
1430   request_mg_update (MG_REQUEST_SPEED);
1431 
1432   os << MyHostName << "Run " << TheRun << " started" << endl;
1433 
1434   return 0;
1435 }
1436 
1437 int daq_end_immediate(std::ostream& os)
1438 {
1439   if ( ! (DAQ_RUNNING) ) 
1440     {
1441       os << MyHostName << "Run is not active" << endl;;
1442       return -1;
1443     }
1444   os << MyHostName << "Run " << TheRun << " end requested" << endl;
1445   DAQ_ENDREQUESTED = 1;
1446 
1447   pthread_t t;
1448   
1449   void * x = &os;
1450 
1451   //cout << __FILE__ << " " << __LINE__ << " calling end_thread as thread" << endl; 
1452   int status = pthread_create(&t, NULL,
1453                           daq_end_thread,
1454                           x);
1455   if (status ) 
1456     {
1457 
1458       cout << "end_run failed " << status << endl;
1459       os << "end_run failed " << status << endl;
1460     }
1461   //cout << __FILE__ << " " << __LINE__ << " done setting up  end_thread" << endl; 
1462   return 0;
1463 }
1464 
1465 
1466 // this function is to hold further interactions until a asynchronous begin-run is over 
1467 int daq_wait_for_begin_done()
1468 {
1469     while ( DAQ_BEGININPROGRESS ) usleep(10000);
1470     return 0;
1471 }
1472 
1473 // this function is to avoid a race condition with the asynchronous "end requested" feature
1474 int daq_wait_for_actual_end()
1475 {
1476 
1477   while ( DAQ_ENDREQUESTED ) 
1478     {
1479       usleep(10000);
1480     }
1481 
1482   return 0;
1483 }
1484 
1485 int daq_end_interactive(std::ostream& os)
1486 {
1487 
1488   void *x = &os;
1489   daq_end_thread (x);
1490   return 0;
1491 }
1492 
1493   
1494 
1495 int daq_end(std::ostream& os)
1496 {
1497   
1498   if ( ! (DAQ_RUNNING) ) 
1499     {
1500       os << MyHostName << "Run is not active" << endl;;
1501       return -1;
1502     }
1503   
1504   disable_trigger();
1505   device_endrun();
1506 
1507   readout(ENDRUNEVENT);
1508   switch_buffer();  // we force a buffer flush
1509         
1510   if ( file_is_open  )
1511     {
1512 
1513       pthread_mutex_lock(&WriteProtectSem);
1514       pthread_mutex_unlock(&WriteProtectSem);
1515 
1516       double v = run_volume;
1517       v /= (1024*1024);
1518       
1519       if (ElogH) ElogH->EndrunLog( TheRun,"RCDAQ", Event_number, v, StartTime);
1520       
1521       if ( daq_server_flag)
1522     {
1523       server_send_endrun_sequence(TheServerFD);
1524       if ( server_send_close_sequence(TheServerFD) )
1525         {
1526           std::cout << __FILE__ << " " << __LINE__ << " error in closing connection...   " << std::endl;
1527         }
1528       close(TheServerFD);
1529       TheServerFD = 0;
1530     }
1531       else
1532     {
1533       close (outfile_fd);
1534       daq_generate_json(1);
1535       outfile_fd = 0;
1536     }
1537       file_is_open = 0;
1538 
1539 
1540     } 
1541 
1542 
1543   
1544   unsetenv ("DAQ_RUNNUMBER");
1545   unsetenv ("DAQ_FILENAME");
1546   unsetenv ("DAQ_STARTTIME");
1547 
1548   Event_number_at_last_end = Event_number;
1549   Event_number = 0;
1550   Event_number_at_last_write = 0;
1551   run_volume = 0;    // volume in longwords 
1552   BytesInThisRun = 0;    // bytes actually written
1553   BytesInThisFile = 0;
1554   Buffer_number = 0;
1555   PreviousFilename = CurrentFilename;
1556   CurrentFilename = "";
1557   StartTime = 0;
1558   DAQ_RUNNING = 0;
1559 
1560   last_volume_time = last_speed_time = time(0);
1561   last_event_nr = 0;
1562   last_volume = 0;
1563 
1564   request_mg_update (MG_REQUEST_SPEED);
1565 
1566   DAQ_ENDREQUESTED = 0;
1567   
1568   os << MyHostName <<  "Run " << TheRun << " ended" << endl;
1569 
1570   return 0;
1571 }
1572 
1573 int daq_fake_trigger (const int n, const int waitinterval)
1574 {
1575   int i;
1576   for ( i = 0; i < n; i++)
1577     {
1578       
1579       Trigger_Todo=DAQ_READ;
1580       
1581 //       pthread_mutex_lock(&M_cout);
1582 //       cout << "trigger" << endl;
1583 //       pthread_mutex_unlock(&M_cout);
1584 
1585       usleep (200000);
1586 
1587     }
1588   return 0;
1589 }
1590 
1591 
1592 void * EventLoop( void *arg)
1593 {
1594 
1595   // pthread_mutex_lock(&M_cout);
1596   //  std::cout << __FILE__ << " " << __LINE__ << " event loop starting...   " << std::endl;
1597   // pthread_mutex_unlock(&M_cout);
1598 
1599   int rstatus;
1600   
1601   while (TriggerControl)
1602     {
1603 
1604       // let's see if we have a TriggerHelper object
1605       if (TriggerH)
1606     {
1607       CurrentEventType = TriggerH->wait_for_trigger();
1608     }
1609       else // we auto-generate a few triggers
1610     {
1611       CurrentEventType = 1;
1612       usleep (100000);
1613     }
1614       
1615       
1616       if (CurrentEventType) 
1617     {
1618       
1619       if ( DAQ_RUNNING ) 
1620         {
1621           
1622           rstatus = readout(CurrentEventType);
1623           
1624           if (  rstatus)    // we got an endrun signal
1625         {
1626           TriggerControl = 0;
1627           //reset_deadtime();
1628           cout << __LINE__ << " calling daq_end" << endl;
1629           daq_end ( std::cout);
1630         }
1631           else
1632         {
1633           rearm(DATAEVENT);
1634           reset_deadtime();
1635         }
1636         }
1637       else  // no, we are not running
1638         {
1639           cout << "Run not active" << endl;
1640           // reset todo, and the DAQ_TRIGGER bit. 
1641           TriggerControl = 0;
1642         }
1643     }
1644     }
1645 
1646   return 0;
1647   
1648 }
1649 
1650 
1651 int add_readoutdevice ( daq_device *d)
1652 {
1653   DeviceList.push_back (d);
1654   return 0;
1655 
1656 }
1657 
1658 
1659 int device_init()
1660 {
1661 
1662   deviceiterator d_it;
1663 
1664   for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1665     {
1666       //      cout << "calling init on "; 
1667       //(*d_it)->identify();
1668       (*d_it)->init();
1669     }
1670 
1671   return 0;
1672 }
1673 
1674 int device_endrun()
1675 {
1676 
1677   deviceiterator d_it;
1678 
1679   for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1680     {
1681       //      cout << "calling init on "; 
1682       //(*d_it)->identify();
1683       (*d_it)->endrun();
1684     }
1685 
1686   return 0;
1687 }
1688 
1689 
1690 int readout(const int etype)
1691 {
1692 
1693   //  pthread_mutex_lock(&M_cout);
1694   // cout << " readout etype = " << etype << endl;
1695   // pthread_mutex_unlock(&M_cout);
1696 
1697   int len = EVTHEADERLENGTH;
1698 
1699   if (etype < 0 || etype>MAXEVENTID) return 0;
1700 
1701   deviceiterator d_it;
1702 
1703   sprintf( daq_event_env_string, "DAQ_EVENTNUMBER=%d", Event_number);
1704 
1705 
1706   int status = fillBuffer->nextEvent(etype,Event_number, Eventsize[etype]);
1707   if (status != 0) 
1708     {
1709       switch_buffer();
1710       status = fillBuffer->nextEvent(etype,Event_number,  Eventsize[etype]);
1711     }
1712 
1713   for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1714     {
1715       len += fillBuffer->addSubevent ( (*d_it) );
1716     }
1717 
1718   Event_number++;
1719   Event_number_at_last_end = Event_number;
1720 
1721 
1722   run_volume += 4*len;
1723 
1724   int returncode = 0;
1725 
1726   if (  DAQ_RUNNING )
1727     {
1728       if ( etype == DATAEVENT && max_volume > 0 && run_volume >= max_volume) 
1729     {
1730       cout << " automatic end after " << max_volume /(1024*1024) << " Mb" << endl;
1731       returncode = 1;
1732     }
1733       
1734       if ( etype == DATAEVENT && max_events > 0 && Event_number >= max_events ) 
1735     {
1736       cout << " automatic end after " << max_events<< " events" << endl;
1737       returncode = 1;
1738     }
1739     }
1740 
1741   if ( adaptivebuffering  &&  time(0) - last_bufferwritetime > adaptivebuffering )
1742     {
1743       switch_buffer();
1744       //      cout << "adaptive buffer switching" << endl;
1745     }
1746 
1747   return returncode;
1748 }
1749 
1750 
1751 int rearm(const int etype)
1752 {
1753 
1754   if (etype < 0 || etype>MAXEVENTID) return 0;
1755 
1756   deviceiterator d_it;
1757 
1758   for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1759     {
1760       (*d_it)->rearm(etype);
1761     }
1762 
1763   return 0;
1764 }
1765 
1766 void set_eventsizes()
1767 {
1768   int i;
1769   int size;
1770   deviceiterator d_it;
1771 
1772   for (i = 0; i< MAXEVENTID; i++)
1773     {
1774       size = EVTHEADERLENGTH;
1775 
1776 
1777       for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1778     {
1779       size += (*d_it)->max_length(i) ;
1780     }
1781 
1782       Eventsize[i] = size;
1783       if (size>EVTHEADERLENGTH)
1784     cout << "Event id " << i << " size " << size << endl;
1785     }
1786 }
1787 
1788 int daq_open (std::ostream& os)
1789 {
1790 
1791   if ( DAQ_RUNNING ) 
1792     {
1793       os << MyHostName << "Run is active" << endl;;
1794       return -1;
1795     }
1796 
1797   // if ( daq_server_flag)
1798   //   {
1799   //     os << "Server logging is enabled" << endl;;
1800   //     return -1;
1801   //   }
1802       
1803   persistentErrorCondition = 0;
1804 
1805   daq_open_flag =1;
1806   return 0;
1807 }
1808 
1809 int daq_set_compression (const int flag, std::ostream& os)
1810 {
1811 
1812   if ( DAQ_RUNNING ) 
1813     {
1814       os << MyHostName << "Run is active" << endl;;
1815       return -1;
1816     }
1817   if (flag)
1818     {
1819       Buffer1.setCompression(1);
1820       Buffer2.setCompression(1);
1821     }
1822   else
1823     {
1824       Buffer1.setCompression(0);
1825       Buffer2.setCompression(0);
1826     }
1827 
1828   return 0;
1829 }
1830 
1831 int daq_set_md5enable (const int flag, std::ostream& os)
1832 {
1833 
1834   if ( DAQ_RUNNING ) 
1835     {
1836       os << MyHostName << "Run is active" << endl;;
1837       return -1;
1838     }
1839   if ( ! md5_allow_turnoff) 
1840     {
1841       os << MyHostName << " MD5 switchoff not enabled" << endl;;
1842       return 0;
1843     }
1844 
1845   if (flag)
1846     {
1847       Buffer1.setMD5Enabled(1);
1848       Buffer2.setMD5Enabled(1);
1849       md5_enabled = 1;
1850     }
1851   else
1852     {
1853       Buffer1.setMD5Enabled(0);
1854       Buffer2.setMD5Enabled(0);
1855       md5_enabled = 0;
1856     }
1857 
1858   return 0;
1859 }
1860 
1861 int daq_set_md5allowturnoff (const int flag, std::ostream& os)
1862 {
1863   if (flag)
1864     {
1865       md5_allow_turnoff = 1;
1866       os << MyHostName << "MD5 turnoff allowed" << endl;
1867     }
1868   else 
1869     {
1870       md5_allow_turnoff = 0;
1871       os << MyHostName << "MD5 turnoff not allowed" << endl;
1872     }
1873   return 0;
1874 }
1875 
1876 int daq_set_server (const char *hostname, const int port, std::ostream& os)
1877 {
1878 
1879   if ( DAQ_RUNNING ) 
1880     {
1881       os << MyHostName << "Run is active" << endl;;
1882       return -1;
1883     }
1884 
1885   daq_server_name = hostname;
1886   if ( daq_server_name == "None")
1887     {
1888       daq_server_flag = 0;
1889       daq_server_name = "";      
1890       daq_server_port = 0;
1891       return 0;
1892     }
1893   
1894   daq_server_flag = 1;
1895   daq_server_port = port;
1896   if ( ! daq_server_port) daq_server_port = 5001;
1897 
1898   return 0;
1899 }
1900 
1901   
1902 
1903 int server_open_Connection()
1904 {
1905     
1906   if ( !daq_server_flag)
1907     {
1908       return -1;
1909     }
1910 
1911   persistentErrorCondition = 0;
1912 
1913   int theport = daq_server_port;
1914   if ( ! theport) theport = 5001;
1915   
1916   TheServerFD = open_serverSocket(daq_server_name.c_str(), theport);
1917   if ( TheServerFD < 0)
1918     {
1919       if ( TheServerFD == -1) 
1920     {
1921       cout << __FILE__<< " " << __LINE__  << " error connecting to server " << daq_server_name << " on port " << theport << endl;
1922     }
1923       else
1924     {
1925        cout << __FILE__<< " " << __LINE__ << " error connecting to server " << daq_server_name << " on port " << theport << "  " << gai_strerror(TheServerFD) << endl;
1926     }
1927 
1928       TheServerFD = 0;
1929       persistentErrorCondition = 1;
1930       return -1;
1931     }
1932   
1933   return 0;
1934 }
1935 
1936 
1937 int daq_shutdown(const unsigned long servernumber, const unsigned long versionnumber, const int pid_fd,
1938          std::ostream& os)
1939 {
1940 
1941   if ( DAQ_RUNNING ) 
1942     {
1943       os << MyHostName << "Run is active" << endl;;
1944       return -1;
1945     }
1946 
1947   if (daq_server_flag)
1948     {
1949       daq_close(std::cout);
1950     }
1951   
1952   static unsigned long  t_args[3];
1953   t_args[0] = servernumber;
1954   t_args[1] = versionnumber;
1955   t_args[3] = pid_fd;
1956 
1957   
1958   pthread_t t;
1959 
1960   int status = pthread_create(&t, NULL, 
1961               shutdown_thread, 
1962               (void *) t_args);
1963    
1964   if (status ) 
1965     {
1966       cout << "cannot shut down " << status << endl;
1967       os << MyHostName << "cannot shut down " << status << endl;
1968       return -1;
1969     }
1970   os << " ";
1971   return 0;
1972 }
1973 
1974 int is_open()
1975 {
1976   return daq_open_flag;
1977 }
1978 
1979 int is_server_open()
1980 {
1981   return daq_server_flag;
1982 }
1983 
1984 int daq_close (std::ostream& os)
1985 {
1986 
1987   if ( DAQ_RUNNING ) 
1988     {
1989       os << MyHostName << "Run is active" << endl;;
1990       return -1;
1991     }
1992 
1993   daq_open_flag =0;
1994   persistentErrorCondition = 0;
1995 
1996   return 0;
1997 }
1998 
1999 int daq_server_close (std::ostream& os)
2000 {
2001 
2002   if ( DAQ_RUNNING ) 
2003     {
2004       os << MyHostName << "Run is active" << endl;;
2005       return -1;
2006     }
2007   if ( server_send_close_sequence(TheServerFD) )
2008     {
2009       std::cout << __FILE__ << " " << __LINE__ << " error in closing connection...   " << std::endl;
2010     }
2011   close(TheServerFD);
2012   TheServerFD = 0;
2013   
2014   daq_server_flag =0;
2015   return 0;
2016 }
2017 
2018 
2019 int daq_list_readlist(std::ostream& os)
2020 {
2021 
2022   deviceiterator d_it;
2023 
2024   for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
2025     {
2026       (*d_it)->identify(os);
2027     }
2028 
2029   return 0;
2030 
2031 }
2032 
2033 int daq_clear_readlist(std::ostream& os)
2034 {
2035 
2036   if ( DAQ_RUNNING ) 
2037     {
2038       os << MyHostName << "Run is active" << endl;;
2039       return -1;
2040     }
2041 
2042   deviceiterator d_it;
2043 
2044   for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
2045     {
2046       delete (*d_it);
2047     }
2048 
2049   DeviceList.clear();
2050   os << MyHostName << "Readlist cleared" << endl;
2051 
2052   return 0;
2053 }
2054 
2055 
2056 int rcdaq_init( const int snumber, pthread_mutex_t &M)
2057 {
2058 
2059   int status;
2060 
2061   servernumber = snumber;
2062   ThePort += servernumber;
2063     
2064   char hostname[HOST_NAME_MAX];
2065   status = gethostname(hostname, HOST_NAME_MAX);
2066   if (!status)
2067     {
2068       shortHostName = hostname;
2069       MyHostName = hostname;
2070       if (servernumber)
2071     {
2072       shortHostName = shortHostName + ":" + to_string(servernumber);
2073       MyHostName = MyHostName + ":" + to_string(servernumber);
2074     }
2075       MyHostName += " - ";
2076     }
2077   MyName = shortHostName;
2078 
2079   sprintf( daq_event_env_string, "DAQ_EVENTNUMBER=%d", -1);
2080   putenv(daq_event_env_string);
2081 
2082 
2083 
2084   //  pthread_mutex_init(&M_cout, 0); 
2085   M_cout = M;
2086 
2087   pthread_mutex_init( &WriteSem, 0);
2088   pthread_mutex_init( &WriteProtectSem, 0);
2089 
2090   
2091   pthread_mutex_init( &MonitoringRequestSem, 0);
2092   pthread_mutex_init( &SendSem, 0);
2093   pthread_mutex_init( &SendProtectSem, 0);
2094   pthread_mutex_init( &FdManagementSem,0);
2095 
2096   // pre-lock them except the "protect" ones
2097   pthread_mutex_lock( &MonitoringRequestSem);
2098   pthread_mutex_lock( &WriteSem);
2099   pthread_mutex_lock( &SendSem);
2100 
2101   ThreadEvt = 0;
2102 
2103   outfile_fd = 0;
2104 
2105   // we give the buffers our state variable
2106   Buffer1.setMD5State(&md5state);
2107   Buffer2.setMD5State(&md5state);
2108   
2109   fillBuffer = &Buffer1;
2110   transportBuffer = &Buffer2;
2111 
2112 
2113 #ifdef WRITEPRDF
2114   Buffer1.setEventFormat(DAQPRDFFORMAT);
2115   Buffer2.setEventFormat(DAQPRDFFORMAT);
2116 #endif
2117 
2118 
2119   status = pthread_create(&ThreadId, NULL, 
2120               writebuffers, 
2121               (void *) 0);
2122    
2123   if (status ) 
2124     {
2125       cout << "error in write thread create " << status << endl;
2126       exit(0);
2127     }
2128   else
2129     {
2130       pthread_mutex_lock(&M_cout); 
2131       cout << "write thread created" << endl;
2132       pthread_mutex_unlock(&M_cout);
2133     }
2134 
2135   status = pthread_create(&ThreadMon, NULL, 
2136               sendMonitorData, 
2137               (void *) 0);
2138      if (status ) 
2139     {
2140       cout << "error in send monitor data thread create " << status << endl;
2141       exit(0);
2142     }
2143   else
2144     {
2145       pthread_mutex_lock(&M_cout); 
2146       cout << "monitor thread created" << endl;
2147       pthread_mutex_unlock(&M_cout);
2148     }
2149    
2150   status = pthread_create(&ThreadMon, NULL, 
2151               monitorRequestwatcher_thread, 
2152               (void *) 0);
2153      if (status ) 
2154     {
2155       cout << "error in send monitor data thread create " << status << endl;
2156       exit(0);
2157     }
2158   else
2159     {
2160       pthread_mutex_lock(&M_cout); 
2161       cout << "monitor request thread created" << endl;
2162       pthread_mutex_unlock(&M_cout);
2163     }
2164    
2165    
2166   //  std::ostringstream outputstream;
2167   //  daq_webcontrol ( ThePort, outputstream);
2168   daq_webcontrol ( ThePort);
2169 
2170 
2171   
2172   return 0;
2173 }
2174 
2175 
2176 int get_runnumber()
2177 {
2178   if ( ! DAQ_RUNNING ) return -1; 
2179   return TheRun;
2180 }
2181 
2182 int get_oldrunnumber()  // like get_runnumber, but doesn't return -1 when stopped
2183 {
2184   return TheRun;
2185 }
2186 
2187 int get_eventnumber()
2188 {
2189   if ( !  DAQ_RUNNING ) return 0; 
2190   return Event_number;
2191 }
2192 double get_runvolume()
2193 {
2194   if ( ! DAQ_RUNNING ) return 0; 
2195   double v = run_volume;
2196   return v / (1024*1024);
2197 }
2198 int get_runduration()
2199 {
2200   if ( ! DAQ_RUNNING ) return 0; 
2201   return time(0) - StartTime;
2202 }
2203 
2204 int get_openflag()
2205 {
2206   return daq_open_flag;
2207 }
2208 int get_serverflag()
2209 {
2210   return daq_server_flag;
2211 }
2212 
2213 
2214 int daq_status( const int flag, std::ostream& os)
2215 {
2216 
2217   double volume = run_volume;
2218   volume /= (1024*1024);
2219 
2220   switch (flag)
2221     {
2222 
2223     case STATUSFORMAT_SHORT:    // "short format"
2224       
2225       if ( DAQ_RUNNING ) 
2226     {
2227       os << TheRun  << " " <<  Event_number -1 << " " 
2228          << volume << " ";
2229       os  << daq_open_flag << " ";
2230       os  << daq_server_flag << " ";
2231       os  << time(0) - StartTime << " ";
2232       os << get_current_filename() << " "
2233          << " \"" << MyName << "\"" << endl;
2234     }
2235       else
2236     {
2237       os << "-1 0 0 ";
2238       os  << daq_open_flag << " ";
2239       os  << daq_server_flag << " ";
2240       os << " 0 0"
2241          << " \"" << MyName << "\"" << endl;
2242 
2243     }
2244       
2245       break;
2246 
2247     case STATUSFORMAT_NORMAL:
2248       if ( DAQ_RUNNING ) 
2249     {
2250       os << MyHostName << "Run " << TheRun  
2251          << " Event: " << Event_number -1 
2252          << " Volume: " << volume;
2253       if ( daq_open_flag )
2254         {
2255           if ( daq_server_flag )
2256         {
2257           os << "  Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port;
2258         }
2259           else
2260         {
2261           os << "  Logging enabled";
2262           if ( Buffer1.getCompression() ) os << "  compression enabled";
2263         }
2264         }
2265       else
2266         {
2267           os << "  Logging disabled";
2268         }
2269     }
2270       else   // not running
2271     {
2272       os << MyHostName << "Stopped";
2273       if ( daq_open_flag )
2274         {
2275           if ( daq_server_flag )
2276         {
2277           os << "  Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port;
2278         }
2279           else
2280         {
2281           os << "  Logging enabled";
2282           if ( Buffer1.getCompression() ) os << "  compression enabled";
2283         }
2284         }
2285       else
2286         {
2287           if ( daq_server_flag )
2288         {
2289           os << "  Logging disabled, remote server set " << daq_server_name << " Port " << daq_server_port;
2290         }
2291           else
2292         {
2293           os << "  Logging disabled";
2294         }
2295         }         
2296     }
2297 
2298       if ( RolloverLimit)
2299     {
2300       os << "  File rollover: " << RolloverLimit << "GB";
2301     }
2302 
2303 
2304       os<< endl;
2305 
2306       break;
2307 
2308     default:  // flag 2++
2309       
2310       if ( DAQ_RUNNING ) 
2311     {
2312       os << MyHostName << " running"  << endl;
2313       //os << " " << MyHostName << ":" << endl;
2314       //os << "  Running" << endl;
2315       os << "  Run Number:    " << TheRun  << endl;
2316       os << "  Event:         " << Event_number << endl;;
2317       os << "  Run Volume:    " << volume << " MB"<< endl;
2318       os << "  Filerule:      " <<  daq_get_filerule() << endl;
2319       if ( RolloverLimit)
2320         {
2321           os << "  File rollover: " << RolloverLimit << "GB" << endl;
2322         }
2323       //else
2324       //  {
2325       //    os << "  File rollover:    disabled" << endl;
2326       //  }
2327 
2328       if ( daq_open_flag )
2329         {
2330           if ( daq_server_flag )
2331         {
2332           os << "  Filename on server: " << get_current_filename() << endl;
2333         }
2334           else
2335         {
2336           os << "  Filename:      " << get_current_filename() << endl;    
2337         }
2338         }
2339 
2340       os << "  Duration:      " <<  time(0) - StartTime << " s" <<endl;
2341 
2342       if (max_volume)
2343         {
2344           os << "  Volume Limit: " <<  max_volume /(1024 *1024) << " Mb" << endl;
2345         }
2346       if (max_events)
2347         {
2348           os << "  Event Limit:  " <<  max_events << endl;
2349         }
2350     }
2351       else  // not runnig
2352     {
2353       os << MyHostName << " Stopped"  << endl;
2354       if ( daq_open_flag )
2355         {
2356           if ( daq_server_flag )
2357         {
2358           os << "  Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port << endl;
2359         }
2360           else
2361         {
2362           os << "  Logging enabled";
2363           if ( Buffer1.getCompression() ) os << "  compression enabled";
2364           os << endl;
2365         }
2366         }
2367       else
2368         {
2369           if ( daq_server_flag )
2370         {
2371           os << "  Logging disabled, remote server set " << daq_server_name << " Port " << daq_server_port << endl;
2372         }
2373           else
2374         {
2375           os << "  Logging disabled" << endl;
2376         }
2377         }         
2378       os << "  Filerule:     " <<  daq_get_filerule() << endl;
2379       
2380       if ( RolloverLimit)
2381         {
2382           os << "  File rollover: " << RolloverLimit << "GB" << endl;
2383         }
2384       //else
2385       //  {
2386       //    os << "  File rollover:    disabled" << endl;
2387       //  }
2388 
2389       
2390       if (max_volume)
2391         {
2392           os << "  Volume Limit: " <<  max_volume /(1024 *1024) << " Mb" << endl;
2393         }
2394       if (max_events)
2395         {
2396           os << "  Event Limit:  " <<  max_events << endl;
2397         }
2398       if ( TriggerH ) os << "  have a trigger object" << endl;
2399     }
2400       if (RunControlMode)
2401     {
2402       os << "  Run Control Mode enabled  " << endl;
2403     }
2404 
2405       os << "  Buffer Sizes:     " <<  Buffer1.getMaxSize()/1024 << " KB";
2406       if ( adaptivebuffering) 
2407     {
2408       os << " adaptive buffering: " << adaptivebuffering << " s"; 
2409     }
2410       os << endl;
2411 
2412       if ( ThePort)
2413         {
2414           os << "  Web control Port:  " << ThePort <<  endl;
2415         }
2416       else
2417         {
2418           os << "  No Web control defined" << endl;
2419         }
2420 
2421       if ( daq_getEventFormat())
2422     {
2423       os << "  Writing legacy format " << endl;
2424     }
2425 
2426       if ( ElogH)
2427         {
2428           os << "  Elog:  " << ElogH->getHost() << " " << ElogH->getLogbookName() << " Port " << ElogH->getPort() <<  endl;
2429         }
2430       else
2431         {
2432           os << "  Elog: not defined" << endl;
2433         }
2434 #ifdef HAVE_MOSQUITTO_H
2435       if (mqtt)
2436     {
2437       os << "  mqtt:     " << mqtt->GetHostName() << " Port " << mqtt->GetPort() << endl;  
2438     }
2439 #endif
2440       
2441       daq_status_runtypes ( os);
2442       daq_status_plugin(flag, os);
2443 
2444       break;
2445 
2446     }
2447 
2448   return 0;
2449 }
2450 
2451 int daq_webcontrol(const int port, std::ostream& os)
2452 {
2453 
2454   if (  port ==0)
2455     {
2456       ThePort=8899 + servernumber;
2457     }
2458   else
2459     {
2460       ThePort = port;
2461     }
2462 
2463   if ( ThreadWeb) // we had this thing running already 
2464     {
2465       mg_end();
2466       pthread_join(ThreadWeb, NULL);
2467       ThreadWeb = 0;
2468     }
2469   
2470   int status = pthread_create(&ThreadWeb, NULL, 
2471               mg_server, 
2472               (void *) &ThePort);
2473    
2474   if (status ) 
2475     {
2476       os << MyHostName << "error in web service creation " << status << endl;
2477       ThePort=0;
2478       return -1;
2479     }
2480   else
2481     {
2482       os << MyHostName << "web service created" << endl;
2483       return 0;
2484     }
2485   return 0;
2486 
2487 }
2488 
2489 int daq_getlastfilename( std::ostream& os)
2490 {
2491   if (get_previous_filename().empty() ) return -1;
2492   os <<  get_previous_filename() << endl;
2493   return 0;
2494 }
2495 
2496 int daq_getlastevent_number( std::ostream& os)
2497 {
2498   os <<  Event_number_at_last_end << endl;
2499   return Event_number_at_last_end;
2500 }
2501 
2502 int daq_running()
2503 {
2504   if ( DAQ_RUNNING ) return 1;
2505   return 0;
2506 }
2507 
2508 
2509 // the routines to deal with the remote server if we are logging this way.
2510 
2511 // 1) we open a socket with            open_serverSocket
2512 // 2) make the server open a file with server_send_beginrun_sequence
2513 // 3) ... send buffers
2514 // 4) send end-run with                server_send_endrun_sequence
2515 // 5) rinse and repeat 2...4
2516 // 6) tell the server we are done with server_send_close_sequence 
2517 
/**
 * Open a connected socket to the remote file server.
 *
 * @param hostname  host name or dotted-quad address of the server.
 * @param port      port number of the server.
 * @return the connected socket descriptor (>= 0) on success, or a
 *         negative value on failure: the getaddrinfo error code if it
 *         is negative, -1 otherwise.
 *
 * Every IPv4 stream address returned by getaddrinfo is tried in turn
 * until one connects. A socket that fails to connect is closed before
 * the next candidate is tried, so no descriptor is leaked on failure.
 * The send buffer of each candidate socket is requested to be 512 KB.
 */
int open_serverSocket(const char * hostname, const int port)
{
  struct addrinfo hints;
  memset(&hints, 0, sizeof(struct addrinfo));

  hints.ai_family = AF_INET;
  // The begin-run/end-run exchange is a request/acknowledge byte stream;
  // without this getaddrinfo also returns datagram and raw entries.
  hints.ai_socktype = SOCK_STREAM;

  char port_str[16];
  snprintf(port_str, sizeof(port_str), "%d", port);

  struct addrinfo *result = NULL;
  const int status = getaddrinfo(hostname, port_str, &hints, &result);

  // getaddrinfo reports failure with any non-zero value
  if ( status != 0)
    {
      std::cout << __FILE__<< " " << __LINE__ << " " << hostname << " " << gai_strerror(status) << std::endl;
      // keep the return value negative so it can never be mistaken for a descriptor
      return ( status < 0) ? status : -1;
    }

  int sockfd = -1;          // the connected descriptor, once we have one
  int last_errno = 0;       // errno of the most recent failing call
  bool had_socket = false;  // true once socket() succeeded at least once

  for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next)
    {
      const int fd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
      if ( fd < 0)
        {
          last_errno = errno;
          continue;
        }
      had_socket = true;

      int xs = 512*1024;
      int s = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &xs, sizeof(int));
      if (s) std::cout << "setsockopt status = " << s << std::endl;

      if ( connect(fd, rp->ai_addr, rp->ai_addrlen) == 0 )
        {
          sockfd = fd;
          break;
        }

      last_errno = errno;
      close(fd);   // do not leak the descriptor of a failed attempt
    }

  freeaddrinfo(result);

  if ( sockfd < 0)
    {
      if ( had_socket)
        {
          std::cout << __FILE__ << " " << __LINE__ << " error in connect" << std::endl;
          errno = last_errno;   // the output above may have changed errno
          perror("connect");
        }
      else
        {
          std::cout << __FILE__ << " " << __LINE__ << " error in socket" << std::endl;
          errno = last_errno;
          perror("socket");
        }
      return -1;
    }

  return sockfd;
}
2578 
2579 int server_send_beginrun_sequence(const char * filename, const int runnumber, int fd)
2580 {
2581   int opcode;
2582   int status;
2583   int len;
2584   
2585   // std::cout << __FILE__ << " " << __LINE__ << " sending    " << CTRL_SENDFILENAME << std::endl ;
2586   opcode = htonl(CTRL_SENDFILENAME);
2587   status = writen(fd, (char *) &opcode, sizeof(int));
2588   if ( status != sizeof(int)) return -1;
2589 
2590   len = strlen(filename);
2591   opcode = htonl(len);
2592   //std::cout << __FILE__ << " " << __LINE__ << " sending    " << filename  << " len = " << len<< std::endl ;
2593 
2594   status = writen(fd, (char *) &opcode, sizeof(int));
2595   if ( status != sizeof(int)) return -1;
2596 
2597   status = writen(fd, (char *) filename, len);
2598   if ( status != len) return -1;
2599 
2600   status = readn (fd, (char *) &opcode, sizeof(int) );
2601   if ( status != sizeof(int)  || ntohl(opcode) != CTRL_REMOTESUCCESS )
2602     {
2603       perror("read_ack");
2604       return -1;
2605     }
2606 
2607   //std::cout << __FILE__ << " " << __LINE__ << " sending    " << CTRL_BEGINRUN  << std::endl ;
2608   opcode = htonl(CTRL_BEGINRUN);
2609   status = writen(fd, (char *) &opcode, sizeof(int));
2610   if ( status != sizeof(int)) return -1;
2611 
2612   //std::cout << __FILE__ << " " << __LINE__ << " sending    " << runnumber  << std::endl ;
2613   opcode = htonl(runnumber);
2614   status = writen(fd, (char *) &opcode, sizeof(int));
2615   if ( status != sizeof(int)) return -1;
2616   
2617   //std::cout << __FILE__ << " " << __LINE__ << " waiting for acknowledge...   " << std::endl ;
2618   status = readn (fd, (char *) &opcode, sizeof(int) );
2619   //std::cout << __FILE__ << " " << __LINE__ << " returned status " <<  ntohl(opcode) << endl;
2620   if ( status != sizeof(int)  )
2621     {
2622       perror("read_ack");
2623       return -1;
2624     }
2625   if (ntohl(opcode) != CTRL_REMOTESUCCESS )
2626     {
2627       return -1;
2628     }
2629 
2630   return 0;
2631 }
2632 
2633 int server_send_rollover_sequence(const char * filename, int fd)
2634 {
2635   int opcode;
2636   int status;
2637   int len;
2638   
2639   // std::cout << __FILE__ << " " << __LINE__ << " sending    " << CTRL_SENDFILENAME << std::endl ;
2640   opcode = htonl(CTRL_ROLLOVER);
2641   status = writen(fd, (char *) &opcode, sizeof(int));
2642   if ( status != sizeof(int)) return -1;
2643 
2644   len = strlen(filename);
2645   opcode = htonl(len);
2646   //std::cout << __FILE__ << " " << __LINE__ << " sending    " << filename  << " len = " << len<< std::endl ;
2647   status = writen(fd, (char *) &opcode, sizeof(int));
2648   if ( status != sizeof(int)) return -1;
2649 
2650   status = writen(fd, (char *) filename, len);
2651   if ( status != len) return -1;
2652 
2653   status = readn (fd, (char *) &opcode, sizeof(int) );
2654   if ( status != sizeof(int)  || ntohl(opcode) != CTRL_REMOTESUCCESS )
2655     {
2656       perror("read_ack");
2657       return -1;
2658     }
2659 
2660   return 0;
2661 }
2662 
2663 
2664 int server_send_endrun_sequence(int fd)
2665 {
2666   int opcode;
2667   int status;
2668 
2669   opcode = htonl(CTRL_ENDRUN);
2670   status = writen (fd, (char *)&opcode, sizeof(int) );
2671   if ( status != sizeof(int)) return -1;
2672   
2673   //  std::cout << __FILE__ << " " << __LINE__ << " waiting for acknowledge...   " << std::endl ;
2674   status = readn (fd, (char *) &opcode, sizeof(int) );
2675   if ( status != sizeof(int)  || ntohl(opcode) != CTRL_REMOTESUCCESS )
2676     {
2677       perror("read_ack");
2678       return -1;
2679     }
2680   //std::cout << __FILE__ << " " << __LINE__ << " ok " << std::endl;
2681 
2682   return 0;
2683 }
2684 
2685 int server_send_close_sequence(int fd)
2686 {
2687   int opcode;
2688   int status;
2689 
2690   opcode = htonl(CTRL_CLOSE);
2691   status = writen (fd, (char *)&opcode, sizeof(int) );
2692   if ( status != sizeof(int)) return -1;
2693   
2694   return 0;
2695 }
2696 
2697 // int update_fileSQLinfo()
2698 // {
2699 //   int sfd = get_sqlfd();
2700 //   md5_byte_t md5_digest[16];  
2701 //   char digest_string[33];
2702 
2703 //   if ( sfd)
2704 //     {
2705 //       md5_finish(&md5state, md5_digest);
2706 //       for ( int i=0; i< 16; i++) 
2707 //  {
2708 //    sprintf ( &digest_string[2*i], "%02x",  md5_digest[i]);
2709 //  }
2710 //       digest_string[32] = 0;
2711   
2712 //       std::ostringstream out;
2713 //       out << "update $FILETABLE set md5sum=\'" << digest_string << "\'"
2714 //    << ",lastevent=" << Event_number_at_last_write -1
2715 //    << ",events=" << Event_number_at_last_write - Event_number_at_last_open  
2716 //    << " where runnumber=" << TheRun
2717 //    << " and filename=\'" << CurrentFilename << "\';" << std::endl;
2718 //       write (sfd, out.str().c_str(), out.str().size());
2719 //     }
2720   
2721 //   return 0;
2722 // }
2723 
2724 // "what" refers to the various phases, new, update, end
2725 //int daq_generate_json (const int flag, const std::string what, const std::string type, std::ostream& os)
/**
 * Build a small JSON document describing the current file and send it
 * through the mqtt object. Does nothing when built without
 * HAVE_MOSQUITTO_H or when no mqtt object exists.
 *
 * @param flag  0 emits a "new" record (run number, host, run type,
 *              file name, file sequence, first event number, time);
 *              any other value emits an "update" record (run number,
 *              host, file name, MD5, last event number, number of
 *              events, time).
 * @return always 0.
 *
 * In an "update" record the MD5 field is the hex digest obtained from
 * md5_finish when md5_enabled is set, and 32 'f' characters otherwise.
 */
int daq_generate_json (const int flag)
{
#ifdef HAVE_MOSQUITTO_H

  if ( !  mqtt) return 0;

  std::ostringstream msg;

  // both record types share the same opening line
  msg << "{\"file\": [" << std::endl;

  if (flag != 0)   // update an entry
    {
      char md5hex[33];

      if ( md5_enabled)
        {
          md5_byte_t digest[16];
          md5_finish(&md5state, digest);
          for ( int k=0; k< 16; k++)
            {
              sprintf ( md5hex + 2*k, "%02x",  digest[k]);
            }
        }
      else
        {
          // no checksum available: fill the field with a placeholder
          for ( int k=0; k< 16; k++)
            {
              sprintf ( md5hex + 2*k, "ff");
            }
        }
      md5hex[32] = 0;

      msg << "    { \"what\":\"" << "update"
          << "\", \"runnumber\":" << TheRun << ","
          << " \"host\":\"" << shortHostName << "\","
          << " \"CurrentFileName\":\"" << CurrentFilename << "\","
          << " \"MD5\":\"" << md5hex << "\","
          << " \"LastEventNr\":" << Event_number_at_last_write -1 << ","
          << " \"NrEvents\":" <<  Event_number_at_last_write - Event_number_at_last_open << ","
          << " \"time\":" << time(0)  << " }" << std::endl;
    }
  else   // we start a new entry
    {
      msg << "    { \"what\":\"new\","
          << " \"runnumber\":" << TheRun << ","
          << " \"host\":\"" << shortHostName << "\","
          << " \"runtype\":\"" << TheRunType << "\","
          << " \"CurrentFileName\":\"" << CurrentFilename << "\","
          << " \"CurrentFileSequence\":" << current_filesequence << ","
          << " \"FirstEventNr\":" << Event_number_at_last_open << ","
          << " \"time\": " << time(0)  << " }" << std::endl;
    }

  // ... and the same closing line
  msg << "] }" << std::endl;

  mqtt->send(msg.str());

#endif

  return 0;
}
2789 
2790 int daq_set_uservalue ( const int index, const int value, std::ostream& os  )
2791 {
2792   if ( index < 0 || index > 7)
2793     {
2794       os << "index out of range. (0..7)" << endl;
2795       return -1;
2796     }
2797   uservalues[index] = value;
2798   return 0;
2799 }
2800 
2801 
2802 int daq_get_uservalue ( const int index,  std::ostream& os )
2803 {
2804   if ( index < 0 || index > 7)
2805     {
2806       os << "index out of range. (0..7)" << endl;
2807       return -1;
2808     }
2809   os << uservalues[index] << endl;
2810   return 0;
2811 
2812 }
2813 
2814 int get_uservalue ( const int index)
2815 {
2816   if ( index < 0 || index > 7)
2817     {
2818       return 0;
2819     }
2820   return uservalues[index];
2821 
2822 }
2823