File indexing completed on 2025-08-02 08:21:07
0001
0002
0003 #include <stdio.h>
0004 #include <unistd.h>
0005 #include <fcntl.h>
0006
0007 #include <stdlib.h>
0008 #include <sys/types.h>
0009 #include <sys/stat.h>
0010 #include <signal.h>
0011 #include <limits.h>
0012 #include <errno.h>
0013 #include <sys/stat.h>
0014 #include <sys/file.h>
0015
0016 #include <pthread.h>
0017
0018 #include <iostream>
0019 #include <iomanip>
0020 #include <sstream>
0021
0022 #include <errno.h>
0023 #include <string.h>
0024 #include <unistd.h>
0025 #include <time.h>
0026 #include <sys/types.h>
0027 #include <sys/socket.h>
0028 #include <arpa/inet.h>
0029 #include <netinet/if_ether.h>
0030 #include <netinet/in.h>
0031 #include <netinet/ip.h>
0032 #include <net/if.h>
0033 #include <sys/ioctl.h>
0034 #include <netpacket/packet.h>
0035 #include <sys/socket.h>
0036 #include <netdb.h>
0037
0038 #include <vector>
0039 #include <map>
0040 #include <queue>
0041
0042
0043
0044 #include "getopt.h"
0045
0046 #include "daq_device.h"
0047 #include "daqBuffer.h"
0048 #include "eloghandler.h"
0049 #include "TriggerHandler.h"
0050
0051 #include "rcdaq.h"
0052 #include "rcdaq_rpc.h"
0053 #include "md5.h"
0054
0055 #ifdef HAVE_MOSQUITTO_H
0056 #include "MQTTConnection.h"
0057 #endif
0058
0059
0060
0061 int open_file_on_server(const int run_number);
0062 int server_open_Connection();
0063 int open_serverSocket(const char * host_name, const int port);
0064 int server_send_beginrun_sequence(const char * filename, const int runnumber, int fd);
0065 int server_send_rollover_sequence(const char * filename, int fd);
0066 int server_send_endrun_sequence(int fd);
0067 int server_send_close_sequence(int fd);
0068
0069 void * mg_server (void *arg);
0070 int mg_end();
0071 int request_mg_update(const int what);
0072
0073
0074 pthread_mutex_t WriteSem;
0075 pthread_mutex_t WriteProtectSem;
0076
0077 pthread_mutex_t MonitoringRequestSem;
0078 pthread_mutex_t SendSem;
0079 pthread_mutex_t SendProtectSem;
0080
0081 pthread_mutex_t FdManagementSem;
0082
0083 char pidfilename[128];
0084 char daq_event_env_string[128];
0085
0086
0087
0088
0089 int servernumber = 0;
0090
0091 int uservalues[8] = {0};
0092
0093 #define DAQ_TRIGGER 0x01
0094 #define DAQ_COMMAND 0x02
0095 #define DAQ_SPECIAL 0x04
0096
0097
0098
0099 #define COMMAND_INIT 1
0100 #define COMMAND_BEGIN 2
0101 #define COMMAND_END 3
0102 #define COMMAND_FINISH 4
0103 #define COMMAND_OPENP 5
0104
0105
0106
0107 #define DAQ_INIT 1
0108 #define DAQ_READ 2
0109
0110 using namespace std;
0111
0112 static std::map<string,string> RunTypes;
0113
0114 static std::string TheFileRule = "rcdaq-%08d-%04d.evt";
0115 static std::string TheRunType = " ";
0116 static std::string CurrentFilename = "";
0117 static std::string PreviousFilename = "";
0118
0119 static std::string TheRunnumberfile;
0120 static int RunnumberfileIsSet = 0;
0121
0122 static std::string TheRunnumberApp;
0123 static int RunnumberAppIsSet = 0;
0124
0125 static std::string MyName = "";
0126
0127 static int daq_open_flag = 0;
0128 static int daq_server_flag = 0;
0129 static int daq_server_port = 0;
0130 static std::string daq_server_name = "";
0131
0132 static int file_is_open = 0;
0133 static int server_is_open = 0;
0134 static int current_filesequence = 0;
0135 static int outfile_fd;
0136
0137 static int md5_enabled =1;
0138 static int md5_allow_turnoff = 0;
0139
0140 #ifdef HAVE_MOSQUITTO_H
0141 MQTTConnection *mqtt = 0;
0142 std::string mqtt_host;
0143 int mqtt_port = 0;
0144 #endif
0145
0146 static md5_state_t md5state;
0147
0148
0149 static int RunControlMode = 0;
0150 static int CurrentEventType = 0;
0151
0152 static ElogHandler *ElogH =0;
0153 static TriggerHandler *TriggerH =0;
0154
0155
0156 typedef std::vector<daq_device *> devicevector;
0157 typedef devicevector::iterator deviceiterator;
0158
0159 #define MAXEVENTID 32
0160 static int Eventsize[MAXEVENTID];
0161
0162
0163
0164
0165
0166
0167
0168
0169
0170
0171
0172
0173 int DAQ_RUNNING = 0;
0174 int DAQ_ENDREQUESTED = 0;
0175 int DAQ_BEGININPROGRESS =0;
0176
0177 static daqBuffer Buffer1;
0178 static daqBuffer Buffer2;
0179
0180 int Trigger_Todo;
0181 int Command_Todo;
0182
0183 int TheRun = 0;
0184 time_t StartTime = 0;
0185 int Buffer_number;
0186
0187 int Event_number;
0188 int Event_number_at_last_open = 0;
0189 int Event_number_at_last_write = 0;
0190 int Event_number_at_last_end = 0;
0191
0192
0193 int update_delta;
0194
0195 static int TriggerControl = 0;
0196
0197 int ThePort=8899;
0198
0199 int TheServerFD = 0;
0200
0201 daqBuffer *fillBuffer, *transportBuffer;
0202
0203 pthread_t ThreadId;
0204 pthread_t ThreadMon;
0205 pthread_t ThreadEvt;
0206 pthread_t ThreadWeb = 0;
0207
0208 int *thread_arg;
0209
0210 int end_thread = 0;
0211
0212 pthread_mutex_t M_cout;
0213
0214
0215 struct sockaddr_in si_mine;
0216
0217 #define MONITORINGPORT 9930
0218
0219
0220 devicevector DeviceList;
0221
0222
0223
0224 int NumberWritten = 0;
0225 unsigned long long BytesInThisRun = 0;
0226 unsigned long long BytesInThisFile = 0;
0227 unsigned long long RolloverLimit = 0;
0228
0229 unsigned long long run_volume, max_volume;
0230 int max_events;
0231
0232 time_t last_speed_time = 0;
0233 int last_event_nr = 0;
0234
0235 static time_t last_volume_time = 0;
0236 double last_volume = 0;
0237
0238 int EvtId = 0;
0239
0240
0241 int max_seconds = 0;
0242 int verbose = 0;
0243 int runnumber=1;
0244 int packetid = 1001;
0245 int max_buffers=0;
0246
0247 int adaptivebuffering = 15;
0248 int last_bufferwritetime = 0;
0249
0250 int persistentErrorCondition = 0;
0251
0252 std::string MyHostName;
0253 std::string shortHostName;
0254
0255 char *obtain_pidfilename()
0256 {
0257 return pidfilename;
0258 }
0259
0260 int registerTriggerHandler ( TriggerHandler *th)
0261 {
0262
0263 if ( TriggerH ) return -1;
0264 TriggerH = th;
0265 return 0;
0266 }
0267
0268 int clearTriggerHandler ()
0269 {
0270 TriggerH = 0;
0271 return 0;
0272 }
0273
0274
0275 void sig_handler(int i)
0276 {
0277 if (verbose)
0278 {
0279 pthread_mutex_lock(&M_cout);
0280 cout << "**interrupt " << endl;
0281 pthread_mutex_unlock(&M_cout);
0282 }
0283
0284 TriggerControl = 0;
0285 }
0286
0287
0288 int reset_deadtime()
0289 {
0290 if ( TriggerH) TriggerH->rearm();
0291 return 0;
0292 }
0293
0294 int enable_trigger()
0295 {
0296
0297 TriggerControl=1;
0298 if ( TriggerH) TriggerH->enable();
0299
0300 int status = pthread_create(&ThreadEvt, NULL,
0301 EventLoop,
0302 (void *) 0);
0303
0304 if (status )
0305 {
0306 cout << "error in event thread create " << status << endl;
0307 exit(0);
0308 }
0309 else
0310 {
0311 if ( TriggerH) TriggerH->rearm();
0312 }
0313
0314 return 0;
0315
0316 }
0317
0318 int disable_trigger()
0319 {
0320 TriggerControl=0;
0321 if ( TriggerH) TriggerH->disable();
0322
0323 return 0;
0324 }
0325
0326
0327 int daq_setmaxevents (const int n, std::ostream& os)
0328 {
0329
0330 max_events =n;
0331 return 0;
0332
0333 }
0334
0335 int daq_setmaxvolume (const int n_mb, std::ostream& os)
0336 {
0337 unsigned long long x = n_mb;
0338 max_volume =x * 1024 *1024;
0339 return 0;
0340
0341 }
0342
0343 int daq_setRunControlMode(const int flag, std::ostream& os)
0344 {
0345 if ( DAQ_RUNNING )
0346 {
0347 os << "Run is active" << endl;
0348 return -1;
0349 }
0350 if (flag) RunControlMode = 1;
0351 else RunControlMode = 0;
0352 return 0;
0353 }
0354
0355 int daq_setrolloverlimit (const int n_gb, std::ostream& os)
0356 {
0357 if ( DAQ_RUNNING )
0358 {
0359 os << "Run is active" << endl;
0360 return -1;
0361 }
0362 RolloverLimit = n_gb;
0363 return 0;
0364
0365 }
0366
0367 int daq_setmaxbuffersize (const int n_kb, std::ostream& os)
0368 {
0369 if ( DAQ_RUNNING )
0370 {
0371 os << "Run is active" << endl;
0372 return -1;
0373 }
0374
0375 Buffer1.setMaxSize(n_kb*1024);
0376 Buffer2.setMaxSize(n_kb*1024);
0377 return 0;
0378 }
0379
0380 int daq_setadaptivebuffering (const int usecs, std::ostream& os)
0381 {
0382 adaptivebuffering = usecs;
0383 return 0;
0384 }
0385
0386
0387
0388 int daq_setEventFormat(const int f, std::ostream& os )
0389 {
0390 if ( daq_running() )
0391 {
0392 os << "Run is active" << endl;
0393 return -1;
0394 }
0395
0396 if (DeviceList.size())
0397 {
0398 os << "Cannot switch format after devices are defined" << endl;
0399 return -1;
0400 }
0401 int status = Buffer1.setEventFormat(f);
0402 status |= Buffer2.setEventFormat(f);
0403 return status;
0404 }
0405
0406
0407
0408
0409 int daq_getEventFormat()
0410 {
0411 return Buffer1.getEventFormat();
0412 }
0413
0414 int daq_getRunControlMode(std::ostream& os)
0415 {
0416 os << RunControlMode << endl;
0417 return 0;
0418 }
0419
0420
0421 int daq_set_eloghandler( const char *host, const int port, const char *logname)
0422 {
0423
0424 if ( ElogH) delete ElogH;
0425 ElogH = new ElogHandler (host, port, logname );
0426
0427 setenv ( "DAQ_ELOGHOST", host , 1);
0428 char str [128];
0429 sprintf(str, "%d", port);
0430 setenv ( "DAQ_ELOGPORT", str , 1);
0431 setenv ( "DAQ_ELOGLOGBOOK", logname , 1);
0432
0433 return 0;
0434 }
0435
0436
0437
/// Reads exactly nbytes from fd into ptr, looping over short reads.
///
/// Sockets are read with recv(..., MSG_NOSIGNAL), everything else with
/// read(). A call interrupted by a signal (EINTR) is retried instead of
/// aborting the transfer.
///
/// @param fd      descriptor to read from
/// @param ptr     destination buffer of at least nbytes bytes
/// @param nbytes  number of bytes wanted
/// @return nbytes on success; if end-of-file or an error occurs first, the
///         result of the failing call (0 or -1) - bytes already read are
///         not reported.
int readn (int fd, char *ptr, const int nbytes)
{
  // If fstat fails we must not look at statbuf; treat fd as a plain
  // descriptor and let read() report the real error.
  struct stat statbuf;
  const bool fd_is_socket = ( fstat(fd, &statbuf) == 0 ) && S_ISSOCK(statbuf.st_mode);

  int nleft = nbytes;
  while ( nleft > 0 )
    {
      int nread;
      if ( fd_is_socket ) nread = recv (fd, ptr, nleft, MSG_NOSIGNAL);
      else nread = read (fd, ptr, nleft);

      if ( nread < 0 && errno == EINTR )
	{
	  continue;   // interrupted before any data arrived - try again
	}

      if ( nread <= 0 )
	{
	  return nread;
	}

      nleft -= nread;
      ptr += nread;
    }

  return (nbytes - nleft);
}
0466
0467
/// Writes exactly nbytes from ptr to fd, looping over short writes.
///
/// Sockets are written with send(..., MSG_NOSIGNAL), everything else with
/// write(). A call interrupted by a signal (EINTR) is retried instead of
/// aborting the transfer.
///
/// @param fd      descriptor to write to
/// @param ptr     source buffer holding at least nbytes bytes
/// @param nbytes  number of bytes to write
/// @return nbytes on success; 0 if a write error occurred (the error is
///         reported through perror).
int writen (int fd, char *ptr, const int nbytes)
{
  // If fstat fails we must not look at statbuf; treat fd as a plain
  // descriptor and let write() report the real error.
  struct stat statbuf;
  const bool fd_is_socket = ( fstat(fd, &statbuf) == 0 ) && S_ISSOCK(statbuf.st_mode);

  int nleft = nbytes;
  while ( nleft > 0 )
    {
      int nwritten;
      if ( fd_is_socket ) nwritten = send (fd, ptr, nleft, MSG_NOSIGNAL);
      else nwritten = write (fd, ptr, nleft);

      if ( nwritten < 0 )
	{
	  if ( errno == EINTR ) continue;   // interrupted - try again

	  perror ("writen");
	  return 0;
	}

      nleft -= nwritten;
      ptr += nwritten;
    }

  return (nbytes - nleft);
}
0499
0500
0501
0502
0503
0504 int open_file(const int run_number, int *fd)
0505 {
0506
0507 if ( file_is_open) return -1;
0508
0509
0510 static char d[1024];
0511 sprintf( d, TheFileRule.c_str(),
0512 run_number, current_filesequence);
0513
0514
0515
0516 int ifd = open(d, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
0517 S_IRWXU | S_IROTH | S_IRGRP );
0518 if (ifd < 0)
0519 {
0520 pthread_mutex_lock(&M_cout);
0521 cout << " error opening file " << d << endl;
0522 perror ( d);
0523 pthread_mutex_unlock(&M_cout);
0524
0525 *fd = 0;
0526 return -1;
0527 }
0528
0529 *fd = ifd;
0530 CurrentFilename = d;
0531 PreviousFilename = CurrentFilename;
0532 md5_init(&md5state);
0533
0534 file_is_open =1;
0535
0536
0537
0538
0539
0540 if (current_filesequence == 0)
0541 {
0542 Event_number_at_last_open = Event_number;
0543 Event_number_at_last_write = Event_number;
0544 }
0545
0546
0547 daq_generate_json (0);
0548
0549 return 0;
0550 }
0551
0552 int open_file_on_server(const int run_number)
0553 {
0554
0555 if ( file_is_open) return -1;
0556
0557 int status = server_open_Connection();
0558 if (status) return -1;
0559
0560 static char d[1024];
0561 sprintf( d, TheFileRule.c_str(),
0562 run_number, current_filesequence);
0563
0564 status = server_send_beginrun_sequence(d, run_number, TheServerFD);
0565 if ( status)
0566 {
0567 return -1;
0568 }
0569
0570 CurrentFilename = d;
0571 PreviousFilename = CurrentFilename;
0572
0573 file_is_open =1;
0574
0575 return 0;
0576 }
0577
0578
0579
0580
0581 void *daq_end_thread (void *arg)
0582 {
0583
0584 std::ostream *os = (std::ostream *) arg;
0585
0586
0587
0588
0589 disable_trigger();
0590
0591
0592 if (ThreadEvt) pthread_join(ThreadEvt, NULL);
0593
0594 daq_end(*os);
0595 DAQ_ENDREQUESTED = 0;
0596
0597 return 0;
0598
0599 }
0600
0601
0602
0603
0604 void *shutdown_thread (void *arg)
0605 {
0606
0607 unsigned long *t_args = (unsigned long *) arg;
0608
0609
0610 pthread_mutex_lock(&M_cout);
0611 cout << "shutting down... " << t_args[0] << " " << t_args[1] << endl;
0612 int pid_fd = t_args[2];
0613 TriggerControl = 0;
0614 if ( TriggerH) delete TriggerH;
0615 pthread_mutex_unlock(&M_cout);
0616
0617 svc_unregister ( t_args[0], t_args[1]);
0618
0619 flock(pid_fd, LOCK_UN);
0620 unlink(pidfilename);
0621
0622 sleep(2);
0623 exit(0);
0624 }
0625
0626
0627
0628
0629
0630 std::queue<int> fd_queue;
0631
0632
0633 void *monitorRequestwatcher_thread (void *arg)
0634 {
0635
0636 struct sockaddr_in server_addr;
0637 int sockfd;
0638
0639 if ( (sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0 )
0640 {
0641 pthread_mutex_lock(&M_cout);
0642 cout << "cannot create socket" << endl;
0643 pthread_mutex_unlock(&M_cout);
0644 }
0645
0646 bzero( (char*)&server_addr, sizeof(server_addr));
0647 server_addr.sin_family = PF_INET;
0648 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
0649 server_addr.sin_port = htons(MONITORINGPORT + servernumber);
0650
0651
0652 int status = bind(sockfd, (struct sockaddr*) &server_addr, sizeof(server_addr));
0653 if (status < 0)
0654 {
0655 perror("bind");
0656 }
0657
0658 pthread_mutex_lock(&M_cout);
0659 cout << "Listening for monitoring requests on port " << MONITORINGPORT + servernumber << endl;
0660 pthread_mutex_unlock(&M_cout);
0661
0662 listen(sockfd, 16);
0663
0664 int dd_fd;
0665 struct sockaddr_in out;
0666
0667 while (sockfd > 0)
0668 {
0669
0670 socklen_t len = sizeof(out);
0671 dd_fd = accept(sockfd, (struct sockaddr *) &out, &len);
0672 if ( dd_fd < 0 )
0673 {
0674 pthread_mutex_lock(&M_cout);
0675 cout << "error in accept socket" << endl;
0676 pthread_mutex_unlock(&M_cout);
0677 }
0678 else
0679 {
0680 char *host = new char[64];
0681
0682 getnameinfo((struct sockaddr *) &out, sizeof(struct sockaddr_in), host, 64,
0683 NULL, 0, NI_NOFQDN);
0684
0685
0686
0687
0688
0689
0690 pthread_mutex_lock(&FdManagementSem);
0691 fd_queue.push(dd_fd);
0692 pthread_mutex_unlock(&FdManagementSem);
0693
0694
0695 pthread_mutex_unlock(&MonitoringRequestSem);
0696 }
0697 }
0698 return 0;
0699 }
0700
0701 void handler ( int sig)
0702 {
0703
0704
0705
0706 pthread_mutex_unlock(&SendProtectSem);
0707 }
0708
0709 void *sendMonitorData( void *arg)
0710 {
0711
0712 int fd;
0713 int status;
0714
0715 while (1)
0716 {
0717
0718
0719
0720
0721
0722
0723
0724
0725
0726
0727 pthread_mutex_lock( &SendSem);
0728
0729
0730 while ( !fd_queue.empty() )
0731 {
0732
0733
0734
0735 pthread_mutex_lock(&FdManagementSem);
0736 fd = fd_queue.front();
0737 fd_queue.pop();
0738
0739 pthread_mutex_unlock(&FdManagementSem);
0740
0741 int max_length =0;
0742
0743
0744 if ( (status = readn (fd, (char *) &max_length, 4) ) <= 0)
0745 {
0746 max_length = 0;
0747 }
0748 else
0749 {
0750 max_length = ntohl(max_length);
0751
0752 status = transportBuffer->sendData(fd, max_length);
0753
0754 int reply;
0755 if ( ! status && ( status = readn (fd, (char *) &reply, 4) ) <= 0)
0756 {
0757
0758
0759
0760
0761 }
0762 else
0763 {
0764
0765
0766
0767
0768 }
0769 }
0770 close(fd);
0771 }
0772
0773
0774
0775
0776 pthread_mutex_unlock(&SendProtectSem);
0777 if ( end_thread) pthread_exit(0);
0778
0779 }
0780 return 0;
0781 }
0782
0783 void *writebuffers ( void * arg)
0784 {
0785
0786
0787 while(1)
0788 {
0789
0790 pthread_mutex_lock( &WriteSem);
0791
0792
0793 last_bufferwritetime = time(0);
0794 if ( daq_open_flag && outfile_fd)
0795 {
0796 unsigned int bytecount = transportBuffer->writeout(outfile_fd);
0797 NumberWritten++;
0798 BytesInThisRun += bytecount;
0799 BytesInThisFile += bytecount;
0800 }
0801 else if ( daq_server_flag && TheServerFD)
0802 {
0803 unsigned int bytecount = transportBuffer->sendout(TheServerFD);
0804 NumberWritten++;
0805 BytesInThisRun += bytecount;
0806 BytesInThisFile += bytecount;
0807 }
0808
0809 if ( end_thread) pthread_exit(0);
0810
0811 pthread_mutex_unlock(&WriteProtectSem);
0812 }
0813 return 0;
0814 }
0815
0816 int switch_buffer()
0817 {
0818
0819
0820
0821
0822
0823 daqBuffer *spare;
0824
0825 pthread_mutex_lock(&WriteProtectSem);
0826 pthread_mutex_lock(&SendProtectSem);
0827
0828 fillBuffer->addEoB();
0829
0830
0831 spare = transportBuffer;
0832 transportBuffer = fillBuffer;
0833 fillBuffer = spare;
0834
0835
0836 fillBuffer->prepare_next(++Buffer_number, TheRun);
0837
0838
0839
0840
0841 if ( daq_open_flag && RolloverLimit)
0842 {
0843 unsigned int blength = transportBuffer->getLength();
0844
0845 if ( blength + BytesInThisFile > RolloverLimit * 1024 * 1024 * 1024)
0846 {
0847
0848 if ( daq_server_flag)
0849 {
0850
0851 current_filesequence++;
0852 static char d[1024];
0853 sprintf( d, TheFileRule.c_str(),
0854 TheRun, current_filesequence);
0855
0856 cout << __FILE__ << " " << __LINE__ << " rolling over " << d << endl;
0857
0858 int status = server_send_rollover_sequence(d,TheServerFD);
0859 CurrentFilename = d;
0860 }
0861 else
0862 {
0863 close(outfile_fd);
0864 file_is_open = 0;
0865
0866 current_filesequence++;
0867 daq_generate_json(1);
0868
0869 Event_number_at_last_open = Event_number_at_last_write;
0870 int status = open_file ( TheRun, &outfile_fd);
0871 if (status)
0872 {
0873 cout << MyHostName << "Could not open output file - Run " << TheRun << " file sequence " << current_filesequence<< endl;
0874 }
0875 }
0876
0877
0878
0879
0880
0881
0882 BytesInThisFile = 0;
0883 }
0884 }
0885
0886 Event_number_at_last_write = Event_number;
0887 pthread_mutex_unlock(&WriteSem);
0888 pthread_mutex_unlock(&SendSem);
0889 return 0;
0890
0891 }
0892
0893 int daq_set_runnumberfile(const char *file, const int flag)
0894 {
0895 if ( flag)
0896 {
0897 TheRunnumberfile.clear();
0898 RunnumberfileIsSet = 0;
0899 return 0;
0900 }
0901
0902 TheRunnumberfile = file;
0903 RunnumberfileIsSet = 1;
0904 FILE *fp = fopen(TheRunnumberfile.c_str(), "r");
0905 int r = 0;
0906 if (fp)
0907 {
0908 int status = fscanf(fp, "%d", &r);
0909 if ( status != 1) r = 0;
0910 if ( ! TheRun )
0911 {
0912 TheRun = r;
0913 }
0914 fclose(fp);
0915 }
0916
0917 return 0;
0918 }
0919
0920 int daq_write_runnumberfile(const int run)
0921 {
0922 if ( !RunnumberfileIsSet ) return 1;
0923 if ( RunControlMode ) return 1;
0924
0925 FILE *fp = fopen(TheRunnumberfile.c_str(), "w");
0926 if (fp )
0927 {
0928 fprintf(fp, "%d\n", run);
0929 fclose(fp);
0930 }
0931
0932 return 0;
0933 }
0934
0935
0936 int daq_set_runnumberApp(const char *file, const int flag)
0937 {
0938 if ( flag)
0939 {
0940 TheRunnumberApp.clear();
0941 RunnumberAppIsSet = 0;
0942 return 0;
0943 }
0944
0945 TheRunnumberApp = file;
0946 RunnumberAppIsSet = 1;
0947 return 0;
0948 }
0949
0950 int getRunNumberFromApp()
0951 {
0952 if ( ! RunnumberAppIsSet) return -1;
0953 FILE *fp = popen(TheRunnumberApp.c_str(),"r");
0954 if (fp == NULL)
0955 {
0956 std::cerr << "error running the runnumber app" << std::endl;
0957 return -1;
0958 }
0959 char in[64];
0960 int len = fread(in, 1, 64, fp);
0961 pclose(fp);
0962
0963 std::stringstream s (in);
0964 int run;
0965 if (! (s >> run) )
0966 {
0967 return -1;
0968 }
0969
0970 return run;
0971 }
0972
0973
0974
0975 int daq_set_filerule(const char *rule , std::ostream& os)
0976 {
0977 if ( DAQ_RUNNING )
0978 {
0979 os << MyHostName << "Run is active" << endl;;
0980 return -1;
0981 }
0982
0983 TheFileRule = rule;
0984 return 0;
0985 }
0986
0987 int daq_set_name(const char *name)
0988 {
0989 MyName = name;
0990 if (servernumber)
0991 {
0992 MyName = MyName + ":" + to_string(servernumber);
0993 }
0994 request_mg_update (MG_REQUEST_NAME);
0995 return 0;
0996 }
0997
0998 #ifdef HAVE_MOSQUITTO_H
0999 int daq_set_mqtt_host(const char * host, const int port, std::ostream& os)
1000 {
1001 std::cout << __FILE__ << " " << __LINE__ << " mqtt host " << host << " port " << port << endl;
1002 if (mqtt) delete mqtt;
1003
1004 if (strcasecmp(host, "None") == 0)
1005 {
1006 mqtt = 0;
1007 mqtt_host = "";
1008 mqtt_port = 0;
1009 return 0;
1010 }
1011
1012 mqtt_host = host;
1013 mqtt = new MQTTConnection(mqtt_host, "rcdaq", port);
1014 if ( mqtt->Status())
1015 {
1016 delete mqtt;
1017 mqtt =0;
1018 mqtt_host = "";
1019 mqtt_port = 0;
1020 os << "Could not connect to host " << host << " on port " << port << endl;
1021 return 1;
1022 }
1023
1024 mqtt_host = host;
1025 mqtt_port = port;
1026
1027 return 0;
1028 }
1029
1030 int daq_get_mqtt_host(std::ostream& os)
1031 {
1032 if (!mqtt)
1033 {
1034 os << " No MQTT host defined" << endl;
1035 return 1;
1036 }
1037 os << " Host " << mqtt->GetHostName() << " " << " port " << mqtt->GetPort() << endl;
1038 return 0;
1039 }
1040 #endif
1041
1042
1043
1044
1045 int daq_setruntype(const char *type, std::ostream& os )
1046 {
1047
1048 if ( DAQ_RUNNING )
1049 {
1050 os << MyHostName << "Run is active" << endl;;
1051 return -1;
1052 }
1053
1054 std::string _type = type;
1055 std::map <string,string>::const_iterator iter = RunTypes.begin();
1056 for ( ; iter != RunTypes.end(); ++iter)
1057 {
1058 if ( iter->first == _type )
1059 {
1060 TheFileRule = iter->second;
1061 TheRunType = _type;
1062 return 0;
1063 }
1064 }
1065 os << " Run type " << type << " is not defined " << endl;
1066 return 1;
1067 }
1068
1069 int daq_getruntype(const int flag, std::ostream& os)
1070 {
1071 std::map <string,string>::const_iterator iter = RunTypes.begin();
1072 for ( ; iter != RunTypes.end(); ++iter)
1073 {
1074 if ( iter->second == TheFileRule )
1075 {
1076 if ( flag == 2)
1077 {
1078 os << iter->first << " - " << iter->second << endl;
1079 return 0;
1080 }
1081 else
1082 {
1083 os << iter->first << endl;
1084 return 0;
1085 }
1086 }
1087 }
1088 return 0;
1089 }
1090
1091
1092 int daq_define_runtype(const char *type, const char *rule)
1093 {
1094 std::string _type = type;
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104 RunTypes[_type] = rule;
1105 return 0;
1106 }
1107
1108 int daq_status_runtypes (std::ostream& os )
1109 {
1110 os << " -- defined Run Types: ";
1111 return daq_list_runtypes(2, os);
1112 }
1113
1114 int daq_list_runtypes(const int flag, std::ostream& os)
1115 {
1116
1117 if ( flag && RunTypes.size() == 0 )
1118 {
1119 os << " (none)" <<endl;
1120 return 0;
1121 }
1122 if (flag ==2) os << endl;
1123
1124 std::map <string,string>::const_iterator iter = RunTypes.begin();
1125 for ( ; iter != RunTypes.end(); ++iter)
1126 {
1127 if (flag)
1128 {
1129 os << " " << setw(12) << iter->first << " - " << iter->second << endl;
1130 }
1131 else
1132 {
1133 os << iter->first << endl;
1134 }
1135
1136 }
1137 return 0;
1138 }
1139
1140 int daq_get_name (std::ostream& os )
1141 {
1142 os << MyName << endl;
1143 return 0;
1144 }
1145
1146 std::string daq_get_myname()
1147 {
1148 return MyName;
1149 }
1150
1151 std::string& daq_get_filerule()
1152 {
1153 return TheFileRule;
1154 }
1155
1156 std::string& get_current_filename()
1157 {
1158 return CurrentFilename;
1159 }
1160
1161 std::string& get_previous_filename()
1162 {
1163 return PreviousFilename;
1164 }
1165
1166 double daq_get_mb_per_second()
1167 {
1168
1169 time_t now_time = time(0);
1170 if ( now_time == last_volume_time) return 0;
1171 double time_delta = now_time - last_volume_time;
1172 double mb_per_second = ( run_volume - last_volume) / time_delta / 1024. /1204.;
1173 last_volume = run_volume;
1174 last_volume_time = now_time;
1175 return mb_per_second;
1176 }
1177
1178 double daq_get_events_per_second()
1179 {
1180 time_t now_time = time(0);
1181 if ( now_time == last_speed_time) return 0;
1182 double time_delta = now_time - last_speed_time;
1183 double events_per_second = ( Event_number - last_event_nr) / time_delta;
1184 last_event_nr = Event_number;
1185 last_speed_time = now_time;
1186 return events_per_second;
1187 }
1188
1189
1190 void * daq_begin_thread( void *arg)
1191 {
1192 int irun = *(int*)arg;
1193 int status = daq_begin( irun, std::cout);
1194 if (status)
1195 {
1196
1197 cout << __FILE__ << " " << __LINE__ << " asynchronous begin run failed" << endl;
1198 }
1199
1200 DAQ_BEGININPROGRESS = 0;
1201
1202 return 0;
1203 }
1204
1205 int daq_begin_immediate(const int irun, std::ostream& os)
1206 {
1207 static unsigned int b_arg;
1208 b_arg = irun;
1209
1210 if ( DAQ_RUNNING )
1211 {
1212 os << MyHostName << "Run is active" << endl;
1213 return -1;
1214 }
1215 if ( irun )
1216 {
1217 os << MyHostName << "Run " << irun << " begin requested" << endl;
1218 }
1219 else
1220 {
1221 os << MyHostName << "Run " << TheRun+1 << " begin requested" << endl;
1222 }
1223
1224 DAQ_BEGININPROGRESS = 1;
1225
1226 pthread_t t;
1227
1228 int status = pthread_create(&t, NULL,
1229 daq_begin_thread,
1230 (void *) &b_arg);
1231 if (status )
1232 {
1233
1234 cout << "begin_run failed " << status << endl;
1235 os << "begin_run failed " << status << endl;
1236 return -1;
1237 }
1238 return 0;
1239 }
1240
1241
1242 int daq_begin(const int irun, std::ostream& os)
1243 {
1244 if ( DAQ_RUNNING )
1245 {
1246 os << MyHostName << "Run is already active" << endl;;
1247 return -1;
1248 }
1249
1250 if ( RunControlMode && irun ==0 )
1251 {
1252 os << MyHostName << " No automatic Run Numbers in Run Control Mode" << endl;;
1253 return -1;
1254 }
1255
1256 if ( persistentErrorCondition)
1257 {
1258 os << MyHostName << "*** Previous error with server connection" << endl;;
1259 return -1;
1260 }
1261
1262 if (ThreadEvt) pthread_join(ThreadEvt, NULL);
1263
1264
1265
1266 DAQ_RUNNING = 1;
1267 current_filesequence = 0;
1268
1269
1270
1271 if ( irun ==0)
1272 {
1273 if ( RunnumberAppIsSet)
1274 {
1275 int run = getRunNumberFromApp();
1276 if ( run < 0)
1277 {
1278 os << MyHostName << "Could not obtain a run number from " << TheRunnumberApp << ", run not started" << endl;
1279 DAQ_RUNNING = 0;
1280 return -1;
1281 }
1282 TheRun = run;
1283 }
1284 else
1285 {
1286 TheRun++;
1287 }
1288 }
1289 else
1290 {
1291 TheRun = irun;
1292 }
1293
1294
1295 Buffer_number = 1;
1296 Event_number = 1;
1297 Event_number_at_last_write = 0;
1298 Event_number_at_last_open = 0;
1299 Event_number_at_last_end = 0;
1300
1301
1302 BytesInThisRun = 0;
1303 BytesInThisFile = 0;
1304
1305
1306 if ( daq_open_flag)
1307 {
1308 if ( daq_server_flag)
1309 {
1310
1311 int status = open_file_on_server(TheRun);
1312 if ( !status)
1313 {
1314
1315 if (ElogH) ElogH->BegrunLog( TheRun,"RCDAQ",
1316 get_current_filename());
1317
1318 daq_write_runnumberfile(TheRun);
1319 last_bufferwritetime = time(0);
1320
1321 }
1322 else
1323 {
1324 os << MyHostName << "Could not open remote output file - Run " << TheRun << " not started" << endl;;
1325 DAQ_RUNNING = 0;
1326 return -1;
1327 }
1328 }
1329
1330 else
1331 {
1332 int status = open_file ( TheRun, &outfile_fd);
1333 if ( !status)
1334 {
1335 if (ElogH) ElogH->BegrunLog( TheRun,"RCDAQ",
1336 get_current_filename());
1337
1338 daq_write_runnumberfile(TheRun);
1339 last_bufferwritetime = time(0);
1340
1341 }
1342 else
1343 {
1344 os << MyHostName << "Could not open output file - Run " << TheRun << " not started" << endl;;
1345 DAQ_RUNNING = 0;
1346 return -1;
1347 }
1348 }
1349
1350
1351 }
1352
1353
1354
1355
1356 DAQ_ENDREQUESTED = 0;
1357
1358 cout << "starting run " << TheRun << " at " << time(0) << endl;
1359 set_eventsizes();
1360
1361
1362
1363
1364
1365
1366 int wantedmaxsize = 0;
1367 for (int i = 0; i< MAXEVENTID; i++)
1368 {
1369 if ( (4*Eventsize[i] + 4*32) > fillBuffer->getMaxSize()
1370 || (4*Eventsize[i] + 4*32) > transportBuffer->getMaxSize() )
1371 {
1372 int x = 4*Eventsize[i] + 4*32;
1373 if ( x > wantedmaxsize ) wantedmaxsize = x;
1374 }
1375 }
1376 if ( wantedmaxsize)
1377 {
1378 if ( fillBuffer->setMaxSize(wantedmaxsize) || transportBuffer->setMaxSize(wantedmaxsize))
1379 {
1380 os << MyHostName << "Cannot start run - event sizes larger than buffer, size "
1381 << wantedmaxsize/1024 << " Buffer size "
1382 << transportBuffer->getMaxSize()/1024 << endl;
1383 DAQ_RUNNING = 0;
1384 return -1;
1385 }
1386
1387
1388 }
1389
1390 last_volume_time = last_speed_time = StartTime = time(0);
1391 last_event_nr = 0;
1392 last_volume = 0;
1393
1394
1395
1396 char str[128];
1397
1398 sprintf(str, "%d", TheRun);
1399 setenv ( "DAQ_RUNNUMBER", str , 1);
1400
1401
1402
1403
1404 setenv ( "DAQ_FILERULE", TheFileRule.c_str() , 1);
1405
1406 sprintf( str, "%ld", StartTime);
1407 setenv ( "DAQ_STARTTIME", str , 1);
1408
1409 if ( daq_open_flag || daq_server_flag )
1410 {
1411 setenv ( "DAQ_FILENAME", CurrentFilename.c_str() , 1);
1412 }
1413
1414
1415
1416 md5_init(&md5state);
1417
1418 fillBuffer->prepare_next(Buffer_number,TheRun);
1419
1420 run_volume = 0;
1421
1422 device_init();
1423
1424
1425 readout(BEGRUNEVENT);
1426
1427
1428 enable_trigger();
1429
1430 request_mg_update (MG_REQUEST_SPEED);
1431
1432 os << MyHostName << "Run " << TheRun << " started" << endl;
1433
1434 return 0;
1435 }
1436
1437 int daq_end_immediate(std::ostream& os)
1438 {
1439 if ( ! (DAQ_RUNNING) )
1440 {
1441 os << MyHostName << "Run is not active" << endl;;
1442 return -1;
1443 }
1444 os << MyHostName << "Run " << TheRun << " end requested" << endl;
1445 DAQ_ENDREQUESTED = 1;
1446
1447 pthread_t t;
1448
1449 void * x = &os;
1450
1451
1452 int status = pthread_create(&t, NULL,
1453 daq_end_thread,
1454 x);
1455 if (status )
1456 {
1457
1458 cout << "end_run failed " << status << endl;
1459 os << "end_run failed " << status << endl;
1460 }
1461
1462 return 0;
1463 }
1464
1465
1466
1467 int daq_wait_for_begin_done()
1468 {
1469 while ( DAQ_BEGININPROGRESS ) usleep(10000);
1470 return 0;
1471 }
1472
1473
1474 int daq_wait_for_actual_end()
1475 {
1476
1477 while ( DAQ_ENDREQUESTED )
1478 {
1479 usleep(10000);
1480 }
1481
1482 return 0;
1483 }
1484
1485 int daq_end_interactive(std::ostream& os)
1486 {
1487
1488 void *x = &os;
1489 daq_end_thread (x);
1490 return 0;
1491 }
1492
1493
1494
1495 int daq_end(std::ostream& os)
1496 {
1497
1498 if ( ! (DAQ_RUNNING) )
1499 {
1500 os << MyHostName << "Run is not active" << endl;;
1501 return -1;
1502 }
1503
1504 disable_trigger();
1505 device_endrun();
1506
1507 readout(ENDRUNEVENT);
1508 switch_buffer();
1509
1510 if ( file_is_open )
1511 {
1512
1513 pthread_mutex_lock(&WriteProtectSem);
1514 pthread_mutex_unlock(&WriteProtectSem);
1515
1516 double v = run_volume;
1517 v /= (1024*1024);
1518
1519 if (ElogH) ElogH->EndrunLog( TheRun,"RCDAQ", Event_number, v, StartTime);
1520
1521 if ( daq_server_flag)
1522 {
1523 server_send_endrun_sequence(TheServerFD);
1524 if ( server_send_close_sequence(TheServerFD) )
1525 {
1526 std::cout << __FILE__ << " " << __LINE__ << " error in closing connection... " << std::endl;
1527 }
1528 close(TheServerFD);
1529 TheServerFD = 0;
1530 }
1531 else
1532 {
1533 close (outfile_fd);
1534 daq_generate_json(1);
1535 outfile_fd = 0;
1536 }
1537 file_is_open = 0;
1538
1539
1540 }
1541
1542
1543
1544 unsetenv ("DAQ_RUNNUMBER");
1545 unsetenv ("DAQ_FILENAME");
1546 unsetenv ("DAQ_STARTTIME");
1547
1548 Event_number_at_last_end = Event_number;
1549 Event_number = 0;
1550 Event_number_at_last_write = 0;
1551 run_volume = 0;
1552 BytesInThisRun = 0;
1553 BytesInThisFile = 0;
1554 Buffer_number = 0;
1555 PreviousFilename = CurrentFilename;
1556 CurrentFilename = "";
1557 StartTime = 0;
1558 DAQ_RUNNING = 0;
1559
1560 last_volume_time = last_speed_time = time(0);
1561 last_event_nr = 0;
1562 last_volume = 0;
1563
1564 request_mg_update (MG_REQUEST_SPEED);
1565
1566 DAQ_ENDREQUESTED = 0;
1567
1568 os << MyHostName << "Run " << TheRun << " ended" << endl;
1569
1570 return 0;
1571 }
1572
1573 int daq_fake_trigger (const int n, const int waitinterval)
1574 {
1575 int i;
1576 for ( i = 0; i < n; i++)
1577 {
1578
1579 Trigger_Todo=DAQ_READ;
1580
1581
1582
1583
1584
1585 usleep (200000);
1586
1587 }
1588 return 0;
1589 }
1590
1591
1592 void * EventLoop( void *arg)
1593 {
1594
1595
1596
1597
1598
1599 int rstatus;
1600
1601 while (TriggerControl)
1602 {
1603
1604
1605 if (TriggerH)
1606 {
1607 CurrentEventType = TriggerH->wait_for_trigger();
1608 }
1609 else
1610 {
1611 CurrentEventType = 1;
1612 usleep (100000);
1613 }
1614
1615
1616 if (CurrentEventType)
1617 {
1618
1619 if ( DAQ_RUNNING )
1620 {
1621
1622 rstatus = readout(CurrentEventType);
1623
1624 if ( rstatus)
1625 {
1626 TriggerControl = 0;
1627
1628 cout << __LINE__ << " calling daq_end" << endl;
1629 daq_end ( std::cout);
1630 }
1631 else
1632 {
1633 rearm(DATAEVENT);
1634 reset_deadtime();
1635 }
1636 }
1637 else
1638 {
1639 cout << "Run not active" << endl;
1640
1641 TriggerControl = 0;
1642 }
1643 }
1644 }
1645
1646 return 0;
1647
1648 }
1649
1650
1651 int add_readoutdevice ( daq_device *d)
1652 {
1653 DeviceList.push_back (d);
1654 return 0;
1655
1656 }
1657
1658
1659 int device_init()
1660 {
1661
1662 deviceiterator d_it;
1663
1664 for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1665 {
1666
1667
1668 (*d_it)->init();
1669 }
1670
1671 return 0;
1672 }
1673
1674 int device_endrun()
1675 {
1676
1677 deviceiterator d_it;
1678
1679 for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1680 {
1681
1682
1683 (*d_it)->endrun();
1684 }
1685
1686 return 0;
1687 }
1688
1689
1690 int readout(const int etype)
1691 {
1692
1693
1694
1695
1696
1697 int len = EVTHEADERLENGTH;
1698
1699 if (etype < 0 || etype>MAXEVENTID) return 0;
1700
1701 deviceiterator d_it;
1702
1703 sprintf( daq_event_env_string, "DAQ_EVENTNUMBER=%d", Event_number);
1704
1705
1706 int status = fillBuffer->nextEvent(etype,Event_number, Eventsize[etype]);
1707 if (status != 0)
1708 {
1709 switch_buffer();
1710 status = fillBuffer->nextEvent(etype,Event_number, Eventsize[etype]);
1711 }
1712
1713 for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1714 {
1715 len += fillBuffer->addSubevent ( (*d_it) );
1716 }
1717
1718 Event_number++;
1719 Event_number_at_last_end = Event_number;
1720
1721
1722 run_volume += 4*len;
1723
1724 int returncode = 0;
1725
1726 if ( DAQ_RUNNING )
1727 {
1728 if ( etype == DATAEVENT && max_volume > 0 && run_volume >= max_volume)
1729 {
1730 cout << " automatic end after " << max_volume /(1024*1024) << " Mb" << endl;
1731 returncode = 1;
1732 }
1733
1734 if ( etype == DATAEVENT && max_events > 0 && Event_number >= max_events )
1735 {
1736 cout << " automatic end after " << max_events<< " events" << endl;
1737 returncode = 1;
1738 }
1739 }
1740
1741 if ( adaptivebuffering && time(0) - last_bufferwritetime > adaptivebuffering )
1742 {
1743 switch_buffer();
1744
1745 }
1746
1747 return returncode;
1748 }
1749
1750
1751 int rearm(const int etype)
1752 {
1753
1754 if (etype < 0 || etype>MAXEVENTID) return 0;
1755
1756 deviceiterator d_it;
1757
1758 for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1759 {
1760 (*d_it)->rearm(etype);
1761 }
1762
1763 return 0;
1764 }
1765
1766 void set_eventsizes()
1767 {
1768 int i;
1769 int size;
1770 deviceiterator d_it;
1771
1772 for (i = 0; i< MAXEVENTID; i++)
1773 {
1774 size = EVTHEADERLENGTH;
1775
1776
1777 for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1778 {
1779 size += (*d_it)->max_length(i) ;
1780 }
1781
1782 Eventsize[i] = size;
1783 if (size>EVTHEADERLENGTH)
1784 cout << "Event id " << i << " size " << size << endl;
1785 }
1786 }
1787
1788 int daq_open (std::ostream& os)
1789 {
1790
1791 if ( DAQ_RUNNING )
1792 {
1793 os << MyHostName << "Run is active" << endl;;
1794 return -1;
1795 }
1796
1797
1798
1799
1800
1801
1802
1803 persistentErrorCondition = 0;
1804
1805 daq_open_flag =1;
1806 return 0;
1807 }
1808
1809 int daq_set_compression (const int flag, std::ostream& os)
1810 {
1811
1812 if ( DAQ_RUNNING )
1813 {
1814 os << MyHostName << "Run is active" << endl;;
1815 return -1;
1816 }
1817 if (flag)
1818 {
1819 Buffer1.setCompression(1);
1820 Buffer2.setCompression(1);
1821 }
1822 else
1823 {
1824 Buffer1.setCompression(0);
1825 Buffer2.setCompression(0);
1826 }
1827
1828 return 0;
1829 }
1830
1831 int daq_set_md5enable (const int flag, std::ostream& os)
1832 {
1833
1834 if ( DAQ_RUNNING )
1835 {
1836 os << MyHostName << "Run is active" << endl;;
1837 return -1;
1838 }
1839 if ( ! md5_allow_turnoff)
1840 {
1841 os << MyHostName << " MD5 switchoff not enabled" << endl;;
1842 return 0;
1843 }
1844
1845 if (flag)
1846 {
1847 Buffer1.setMD5Enabled(1);
1848 Buffer2.setMD5Enabled(1);
1849 md5_enabled = 1;
1850 }
1851 else
1852 {
1853 Buffer1.setMD5Enabled(0);
1854 Buffer2.setMD5Enabled(0);
1855 md5_enabled = 0;
1856 }
1857
1858 return 0;
1859 }
1860
1861 int daq_set_md5allowturnoff (const int flag, std::ostream& os)
1862 {
1863 if (flag)
1864 {
1865 md5_allow_turnoff = 1;
1866 os << MyHostName << "MD5 turnoff allowed" << endl;
1867 }
1868 else
1869 {
1870 md5_allow_turnoff = 0;
1871 os << MyHostName << "MD5 turnoff not allowed" << endl;
1872 }
1873 return 0;
1874 }
1875
1876 int daq_set_server (const char *hostname, const int port, std::ostream& os)
1877 {
1878
1879 if ( DAQ_RUNNING )
1880 {
1881 os << MyHostName << "Run is active" << endl;;
1882 return -1;
1883 }
1884
1885 daq_server_name = hostname;
1886 if ( daq_server_name == "None")
1887 {
1888 daq_server_flag = 0;
1889 daq_server_name = "";
1890 daq_server_port = 0;
1891 return 0;
1892 }
1893
1894 daq_server_flag = 1;
1895 daq_server_port = port;
1896 if ( ! daq_server_port) daq_server_port = 5001;
1897
1898 return 0;
1899 }
1900
1901
1902
1903 int server_open_Connection()
1904 {
1905
1906 if ( !daq_server_flag)
1907 {
1908 return -1;
1909 }
1910
1911 persistentErrorCondition = 0;
1912
1913 int theport = daq_server_port;
1914 if ( ! theport) theport = 5001;
1915
1916 TheServerFD = open_serverSocket(daq_server_name.c_str(), theport);
1917 if ( TheServerFD < 0)
1918 {
1919 if ( TheServerFD == -1)
1920 {
1921 cout << __FILE__<< " " << __LINE__ << " error connecting to server " << daq_server_name << " on port " << theport << endl;
1922 }
1923 else
1924 {
1925 cout << __FILE__<< " " << __LINE__ << " error connecting to server " << daq_server_name << " on port " << theport << " " << gai_strerror(TheServerFD) << endl;
1926 }
1927
1928 TheServerFD = 0;
1929 persistentErrorCondition = 1;
1930 return -1;
1931 }
1932
1933 return 0;
1934 }
1935
1936
1937 int daq_shutdown(const unsigned long servernumber, const unsigned long versionnumber, const int pid_fd,
1938 std::ostream& os)
1939 {
1940
1941 if ( DAQ_RUNNING )
1942 {
1943 os << MyHostName << "Run is active" << endl;;
1944 return -1;
1945 }
1946
1947 if (daq_server_flag)
1948 {
1949 daq_close(std::cout);
1950 }
1951
1952 static unsigned long t_args[3];
1953 t_args[0] = servernumber;
1954 t_args[1] = versionnumber;
1955 t_args[3] = pid_fd;
1956
1957
1958 pthread_t t;
1959
1960 int status = pthread_create(&t, NULL,
1961 shutdown_thread,
1962 (void *) t_args);
1963
1964 if (status )
1965 {
1966 cout << "cannot shut down " << status << endl;
1967 os << MyHostName << "cannot shut down " << status << endl;
1968 return -1;
1969 }
1970 os << " ";
1971 return 0;
1972 }
1973
1974 int is_open()
1975 {
1976 return daq_open_flag;
1977 }
1978
1979 int is_server_open()
1980 {
1981 return daq_server_flag;
1982 }
1983
1984 int daq_close (std::ostream& os)
1985 {
1986
1987 if ( DAQ_RUNNING )
1988 {
1989 os << MyHostName << "Run is active" << endl;;
1990 return -1;
1991 }
1992
1993 daq_open_flag =0;
1994 persistentErrorCondition = 0;
1995
1996 return 0;
1997 }
1998
1999 int daq_server_close (std::ostream& os)
2000 {
2001
2002 if ( DAQ_RUNNING )
2003 {
2004 os << MyHostName << "Run is active" << endl;;
2005 return -1;
2006 }
2007 if ( server_send_close_sequence(TheServerFD) )
2008 {
2009 std::cout << __FILE__ << " " << __LINE__ << " error in closing connection... " << std::endl;
2010 }
2011 close(TheServerFD);
2012 TheServerFD = 0;
2013
2014 daq_server_flag =0;
2015 return 0;
2016 }
2017
2018
2019 int daq_list_readlist(std::ostream& os)
2020 {
2021
2022 deviceiterator d_it;
2023
2024 for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
2025 {
2026 (*d_it)->identify(os);
2027 }
2028
2029 return 0;
2030
2031 }
2032
2033 int daq_clear_readlist(std::ostream& os)
2034 {
2035
2036 if ( DAQ_RUNNING )
2037 {
2038 os << MyHostName << "Run is active" << endl;;
2039 return -1;
2040 }
2041
2042 deviceiterator d_it;
2043
2044 for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
2045 {
2046 delete (*d_it);
2047 }
2048
2049 DeviceList.clear();
2050 os << MyHostName << "Readlist cleared" << endl;
2051
2052 return 0;
2053 }
2054
2055
2056 int rcdaq_init( const int snumber, pthread_mutex_t &M)
2057 {
2058
2059 int status;
2060
2061 servernumber = snumber;
2062 ThePort += servernumber;
2063
2064 char hostname[HOST_NAME_MAX];
2065 status = gethostname(hostname, HOST_NAME_MAX);
2066 if (!status)
2067 {
2068 shortHostName = hostname;
2069 MyHostName = hostname;
2070 if (servernumber)
2071 {
2072 shortHostName = shortHostName + ":" + to_string(servernumber);
2073 MyHostName = MyHostName + ":" + to_string(servernumber);
2074 }
2075 MyHostName += " - ";
2076 }
2077 MyName = shortHostName;
2078
2079 sprintf( daq_event_env_string, "DAQ_EVENTNUMBER=%d", -1);
2080 putenv(daq_event_env_string);
2081
2082
2083
2084
2085 M_cout = M;
2086
2087 pthread_mutex_init( &WriteSem, 0);
2088 pthread_mutex_init( &WriteProtectSem, 0);
2089
2090
2091 pthread_mutex_init( &MonitoringRequestSem, 0);
2092 pthread_mutex_init( &SendSem, 0);
2093 pthread_mutex_init( &SendProtectSem, 0);
2094 pthread_mutex_init( &FdManagementSem,0);
2095
2096
2097 pthread_mutex_lock( &MonitoringRequestSem);
2098 pthread_mutex_lock( &WriteSem);
2099 pthread_mutex_lock( &SendSem);
2100
2101 ThreadEvt = 0;
2102
2103 outfile_fd = 0;
2104
2105
2106 Buffer1.setMD5State(&md5state);
2107 Buffer2.setMD5State(&md5state);
2108
2109 fillBuffer = &Buffer1;
2110 transportBuffer = &Buffer2;
2111
2112
2113 #ifdef WRITEPRDF
2114 Buffer1.setEventFormat(DAQPRDFFORMAT);
2115 Buffer2.setEventFormat(DAQPRDFFORMAT);
2116 #endif
2117
2118
2119 status = pthread_create(&ThreadId, NULL,
2120 writebuffers,
2121 (void *) 0);
2122
2123 if (status )
2124 {
2125 cout << "error in write thread create " << status << endl;
2126 exit(0);
2127 }
2128 else
2129 {
2130 pthread_mutex_lock(&M_cout);
2131 cout << "write thread created" << endl;
2132 pthread_mutex_unlock(&M_cout);
2133 }
2134
2135 status = pthread_create(&ThreadMon, NULL,
2136 sendMonitorData,
2137 (void *) 0);
2138 if (status )
2139 {
2140 cout << "error in send monitor data thread create " << status << endl;
2141 exit(0);
2142 }
2143 else
2144 {
2145 pthread_mutex_lock(&M_cout);
2146 cout << "monitor thread created" << endl;
2147 pthread_mutex_unlock(&M_cout);
2148 }
2149
2150 status = pthread_create(&ThreadMon, NULL,
2151 monitorRequestwatcher_thread,
2152 (void *) 0);
2153 if (status )
2154 {
2155 cout << "error in send monitor data thread create " << status << endl;
2156 exit(0);
2157 }
2158 else
2159 {
2160 pthread_mutex_lock(&M_cout);
2161 cout << "monitor request thread created" << endl;
2162 pthread_mutex_unlock(&M_cout);
2163 }
2164
2165
2166
2167
2168 daq_webcontrol ( ThePort);
2169
2170
2171
2172 return 0;
2173 }
2174
2175
2176 int get_runnumber()
2177 {
2178 if ( ! DAQ_RUNNING ) return -1;
2179 return TheRun;
2180 }
2181
2182 int get_oldrunnumber()
2183 {
2184 return TheRun;
2185 }
2186
2187 int get_eventnumber()
2188 {
2189 if ( ! DAQ_RUNNING ) return 0;
2190 return Event_number;
2191 }
2192 double get_runvolume()
2193 {
2194 if ( ! DAQ_RUNNING ) return 0;
2195 double v = run_volume;
2196 return v / (1024*1024);
2197 }
2198 int get_runduration()
2199 {
2200 if ( ! DAQ_RUNNING ) return 0;
2201 return time(0) - StartTime;
2202 }
2203
2204 int get_openflag()
2205 {
2206 return daq_open_flag;
2207 }
2208 int get_serverflag()
2209 {
2210 return daq_server_flag;
2211 }
2212
2213
2214 int daq_status( const int flag, std::ostream& os)
2215 {
2216
2217 double volume = run_volume;
2218 volume /= (1024*1024);
2219
2220 switch (flag)
2221 {
2222
2223 case STATUSFORMAT_SHORT:
2224
2225 if ( DAQ_RUNNING )
2226 {
2227 os << TheRun << " " << Event_number -1 << " "
2228 << volume << " ";
2229 os << daq_open_flag << " ";
2230 os << daq_server_flag << " ";
2231 os << time(0) - StartTime << " ";
2232 os << get_current_filename() << " "
2233 << " \"" << MyName << "\"" << endl;
2234 }
2235 else
2236 {
2237 os << "-1 0 0 ";
2238 os << daq_open_flag << " ";
2239 os << daq_server_flag << " ";
2240 os << " 0 0"
2241 << " \"" << MyName << "\"" << endl;
2242
2243 }
2244
2245 break;
2246
2247 case STATUSFORMAT_NORMAL:
2248 if ( DAQ_RUNNING )
2249 {
2250 os << MyHostName << "Run " << TheRun
2251 << " Event: " << Event_number -1
2252 << " Volume: " << volume;
2253 if ( daq_open_flag )
2254 {
2255 if ( daq_server_flag )
2256 {
2257 os << " Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port;
2258 }
2259 else
2260 {
2261 os << " Logging enabled";
2262 if ( Buffer1.getCompression() ) os << " compression enabled";
2263 }
2264 }
2265 else
2266 {
2267 os << " Logging disabled";
2268 }
2269 }
2270 else
2271 {
2272 os << MyHostName << "Stopped";
2273 if ( daq_open_flag )
2274 {
2275 if ( daq_server_flag )
2276 {
2277 os << " Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port;
2278 }
2279 else
2280 {
2281 os << " Logging enabled";
2282 if ( Buffer1.getCompression() ) os << " compression enabled";
2283 }
2284 }
2285 else
2286 {
2287 if ( daq_server_flag )
2288 {
2289 os << " Logging disabled, remote server set " << daq_server_name << " Port " << daq_server_port;
2290 }
2291 else
2292 {
2293 os << " Logging disabled";
2294 }
2295 }
2296 }
2297
2298 if ( RolloverLimit)
2299 {
2300 os << " File rollover: " << RolloverLimit << "GB";
2301 }
2302
2303
2304 os<< endl;
2305
2306 break;
2307
2308 default:
2309
2310 if ( DAQ_RUNNING )
2311 {
2312 os << MyHostName << " running" << endl;
2313
2314
2315 os << " Run Number: " << TheRun << endl;
2316 os << " Event: " << Event_number << endl;;
2317 os << " Run Volume: " << volume << " MB"<< endl;
2318 os << " Filerule: " << daq_get_filerule() << endl;
2319 if ( RolloverLimit)
2320 {
2321 os << " File rollover: " << RolloverLimit << "GB" << endl;
2322 }
2323
2324
2325
2326
2327
2328 if ( daq_open_flag )
2329 {
2330 if ( daq_server_flag )
2331 {
2332 os << " Filename on server: " << get_current_filename() << endl;
2333 }
2334 else
2335 {
2336 os << " Filename: " << get_current_filename() << endl;
2337 }
2338 }
2339
2340 os << " Duration: " << time(0) - StartTime << " s" <<endl;
2341
2342 if (max_volume)
2343 {
2344 os << " Volume Limit: " << max_volume /(1024 *1024) << " Mb" << endl;
2345 }
2346 if (max_events)
2347 {
2348 os << " Event Limit: " << max_events << endl;
2349 }
2350 }
2351 else
2352 {
2353 os << MyHostName << " Stopped" << endl;
2354 if ( daq_open_flag )
2355 {
2356 if ( daq_server_flag )
2357 {
2358 os << " Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port << endl;
2359 }
2360 else
2361 {
2362 os << " Logging enabled";
2363 if ( Buffer1.getCompression() ) os << " compression enabled";
2364 os << endl;
2365 }
2366 }
2367 else
2368 {
2369 if ( daq_server_flag )
2370 {
2371 os << " Logging disabled, remote server set " << daq_server_name << " Port " << daq_server_port << endl;
2372 }
2373 else
2374 {
2375 os << " Logging disabled" << endl;
2376 }
2377 }
2378 os << " Filerule: " << daq_get_filerule() << endl;
2379
2380 if ( RolloverLimit)
2381 {
2382 os << " File rollover: " << RolloverLimit << "GB" << endl;
2383 }
2384
2385
2386
2387
2388
2389
2390 if (max_volume)
2391 {
2392 os << " Volume Limit: " << max_volume /(1024 *1024) << " Mb" << endl;
2393 }
2394 if (max_events)
2395 {
2396 os << " Event Limit: " << max_events << endl;
2397 }
2398 if ( TriggerH ) os << " have a trigger object" << endl;
2399 }
2400 if (RunControlMode)
2401 {
2402 os << " Run Control Mode enabled " << endl;
2403 }
2404
2405 os << " Buffer Sizes: " << Buffer1.getMaxSize()/1024 << " KB";
2406 if ( adaptivebuffering)
2407 {
2408 os << " adaptive buffering: " << adaptivebuffering << " s";
2409 }
2410 os << endl;
2411
2412 if ( ThePort)
2413 {
2414 os << " Web control Port: " << ThePort << endl;
2415 }
2416 else
2417 {
2418 os << " No Web control defined" << endl;
2419 }
2420
2421 if ( daq_getEventFormat())
2422 {
2423 os << " Writing legacy format " << endl;
2424 }
2425
2426 if ( ElogH)
2427 {
2428 os << " Elog: " << ElogH->getHost() << " " << ElogH->getLogbookName() << " Port " << ElogH->getPort() << endl;
2429 }
2430 else
2431 {
2432 os << " Elog: not defined" << endl;
2433 }
2434 #ifdef HAVE_MOSQUITTO_H
2435 if (mqtt)
2436 {
2437 os << " mqtt: " << mqtt->GetHostName() << " Port " << mqtt->GetPort() << endl;
2438 }
2439 #endif
2440
2441 daq_status_runtypes ( os);
2442 daq_status_plugin(flag, os);
2443
2444 break;
2445
2446 }
2447
2448 return 0;
2449 }
2450
2451 int daq_webcontrol(const int port, std::ostream& os)
2452 {
2453
2454 if ( port ==0)
2455 {
2456 ThePort=8899 + servernumber;
2457 }
2458 else
2459 {
2460 ThePort = port;
2461 }
2462
2463 if ( ThreadWeb)
2464 {
2465 mg_end();
2466 pthread_join(ThreadWeb, NULL);
2467 ThreadWeb = 0;
2468 }
2469
2470 int status = pthread_create(&ThreadWeb, NULL,
2471 mg_server,
2472 (void *) &ThePort);
2473
2474 if (status )
2475 {
2476 os << MyHostName << "error in web service creation " << status << endl;
2477 ThePort=0;
2478 return -1;
2479 }
2480 else
2481 {
2482 os << MyHostName << "web service created" << endl;
2483 return 0;
2484 }
2485 return 0;
2486
2487 }
2488
2489 int daq_getlastfilename( std::ostream& os)
2490 {
2491 if (get_previous_filename().empty() ) return -1;
2492 os << get_previous_filename() << endl;
2493 return 0;
2494 }
2495
2496 int daq_getlastevent_number( std::ostream& os)
2497 {
2498 os << Event_number_at_last_end << endl;
2499 return Event_number_at_last_end;
2500 }
2501
2502 int daq_running()
2503 {
2504 if ( DAQ_RUNNING ) return 1;
2505 return 0;
2506 }
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
// Resolve hostname and open a connected IPv4 stream socket to it.
//
// hostname - name or address of the server
// port     - port to connect to
//
// Every address returned by getaddrinfo() is tried in turn until one
// connects. The send buffer size of the socket is set to 512 KB before
// connecting.
//
// Returns the connected socket descriptor on success. On failure returns a
// negative value: the (negative) getaddrinfo() error code if the name could
// not be resolved, -1 otherwise. No descriptor is left open on failure.
int open_serverSocket(const char * hostname, const int port)
{
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET;
  // only stream sockets; otherwise datagram/raw entries are returned as well
  hints.ai_socktype = SOCK_STREAM;

  char port_str[16];
  snprintf(port_str, sizeof(port_str), "%d", port);

  struct addrinfo *result = NULL;
  const int status = getaddrinfo(hostname, port_str,
				 &hints,
				 &result);

  // getaddrinfo reports failure with any non-zero value
  if ( status != 0)
    {
      std::cout << __FILE__<< " " << __LINE__ << " " << ( hostname ? hostname : "(null)" ) << " " << gai_strerror(status) << std::endl;
      // callers treat only negative values as errors
      return ( status < 0 ) ? status : -1;
    }

  int sockfd = -1;
  const char *failed_call = "socket";   // which call failed last
  int saved_errno = 0;

  for (struct addrinfo *rp = result; rp != NULL; rp = rp->ai_next)
    {
      sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
      if ( sockfd < 0 )
	{
	  failed_call = "socket";
	  saved_errno = errno;
	  continue;
	}

      const int xs = 512*1024;
      const int s = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &xs, sizeof(xs));
      if (s) std::cout << "setsockopt status = " << s << std::endl;

      if ( connect(sockfd, rp->ai_addr, rp->ai_addrlen) == 0 )
	{
	  break;
	}

      // this address did not work: release the socket before trying the next
      failed_call = "connect";
      saved_errno = errno;
      close(sockfd);
      sockfd = -1;
    }

  freeaddrinfo(result);

  if ( sockfd < 0)
    {
      std::cout << __FILE__ << " " << __LINE__ << " error in " << failed_call << std::endl;
      errno = saved_errno;
      perror(failed_call);
      return -1;
    }

  return sockfd;
}
2578
2579 int server_send_beginrun_sequence(const char * filename, const int runnumber, int fd)
2580 {
2581 int opcode;
2582 int status;
2583 int len;
2584
2585
2586 opcode = htonl(CTRL_SENDFILENAME);
2587 status = writen(fd, (char *) &opcode, sizeof(int));
2588 if ( status != sizeof(int)) return -1;
2589
2590 len = strlen(filename);
2591 opcode = htonl(len);
2592
2593
2594 status = writen(fd, (char *) &opcode, sizeof(int));
2595 if ( status != sizeof(int)) return -1;
2596
2597 status = writen(fd, (char *) filename, len);
2598 if ( status != len) return -1;
2599
2600 status = readn (fd, (char *) &opcode, sizeof(int) );
2601 if ( status != sizeof(int) || ntohl(opcode) != CTRL_REMOTESUCCESS )
2602 {
2603 perror("read_ack");
2604 return -1;
2605 }
2606
2607
2608 opcode = htonl(CTRL_BEGINRUN);
2609 status = writen(fd, (char *) &opcode, sizeof(int));
2610 if ( status != sizeof(int)) return -1;
2611
2612
2613 opcode = htonl(runnumber);
2614 status = writen(fd, (char *) &opcode, sizeof(int));
2615 if ( status != sizeof(int)) return -1;
2616
2617
2618 status = readn (fd, (char *) &opcode, sizeof(int) );
2619
2620 if ( status != sizeof(int) )
2621 {
2622 perror("read_ack");
2623 return -1;
2624 }
2625 if (ntohl(opcode) != CTRL_REMOTESUCCESS )
2626 {
2627 return -1;
2628 }
2629
2630 return 0;
2631 }
2632
2633 int server_send_rollover_sequence(const char * filename, int fd)
2634 {
2635 int opcode;
2636 int status;
2637 int len;
2638
2639
2640 opcode = htonl(CTRL_ROLLOVER);
2641 status = writen(fd, (char *) &opcode, sizeof(int));
2642 if ( status != sizeof(int)) return -1;
2643
2644 len = strlen(filename);
2645 opcode = htonl(len);
2646
2647 status = writen(fd, (char *) &opcode, sizeof(int));
2648 if ( status != sizeof(int)) return -1;
2649
2650 status = writen(fd, (char *) filename, len);
2651 if ( status != len) return -1;
2652
2653 status = readn (fd, (char *) &opcode, sizeof(int) );
2654 if ( status != sizeof(int) || ntohl(opcode) != CTRL_REMOTESUCCESS )
2655 {
2656 perror("read_ack");
2657 return -1;
2658 }
2659
2660 return 0;
2661 }
2662
2663
2664 int server_send_endrun_sequence(int fd)
2665 {
2666 int opcode;
2667 int status;
2668
2669 opcode = htonl(CTRL_ENDRUN);
2670 status = writen (fd, (char *)&opcode, sizeof(int) );
2671 if ( status != sizeof(int)) return -1;
2672
2673
2674 status = readn (fd, (char *) &opcode, sizeof(int) );
2675 if ( status != sizeof(int) || ntohl(opcode) != CTRL_REMOTESUCCESS )
2676 {
2677 perror("read_ack");
2678 return -1;
2679 }
2680
2681
2682 return 0;
2683 }
2684
2685 int server_send_close_sequence(int fd)
2686 {
2687 int opcode;
2688 int status;
2689
2690 opcode = htonl(CTRL_CLOSE);
2691 status = writen (fd, (char *)&opcode, sizeof(int) );
2692 if ( status != sizeof(int)) return -1;
2693
2694 return 0;
2695 }
2696
2697
2698
2699
2700
2701
2702
2703
2704
2705
2706
2707
2708
2709
2710
2711
2712
2713
2714
2715
2716
2717
2718
2719
2720
2721
2722
2723
2724
2725
// Publish a JSON description of the current file via MQTT.
//
// flag == 0 : a "new" record (run number, host, run type, file name, file
//             sequence, first event number, time)
// flag != 0 : an "update" record (run number, host, file name, MD5 digest,
//             last event number, number of events, time); with MD5
//             disabled the digest is reported as 32 'f' characters
//
// Does nothing unless built with HAVE_MOSQUITTO_H and an mqtt connection
// object exists. Always returns 0.
int daq_generate_json (const int flag)
{
#ifdef HAVE_MOSQUITTO_H

  if ( ! mqtt) return 0;

  std::ostringstream out;

  if (flag != 0)
    {
      // 32 hex characters plus terminator
      char digest_string[33];

      if ( md5_enabled)
	{
	  md5_byte_t md5_digest[16];
	  md5_finish(&md5state, md5_digest);

	  char *cursor = digest_string;
	  for ( int i=0; i< 16; i++, cursor += 2)
	    {
	      sprintf ( cursor, "%02x", md5_digest[i]);
	    }
	}
      else
	{
	  memset ( digest_string, 'f', 32);
	}
      digest_string[32] = 0;

      out << "{\"file\": [" << endl;
      out << " { \"what\":\"update\", \"runnumber\":" << TheRun << ","
	  << " \"host\":\"" << shortHostName << "\","
	  << " \"CurrentFileName\":\"" << CurrentFilename << "\","
	  << " \"MD5\":\"" << digest_string << "\","
	  << " \"LastEventNr\":" << Event_number_at_last_write -1 << ","
	  << " \"NrEvents\":" << Event_number_at_last_write - Event_number_at_last_open << ","
	  << " \"time\":" << time(0) << " }" << endl;
      out << "] }" << endl;
    }
  else
    {
      out << "{\"file\": [" << endl;
      out << " { \"what\":\"new\","
	  << " \"runnumber\":" << TheRun << ","
	  << " \"host\":\"" << shortHostName << "\","
	  << " \"runtype\":\"" << TheRunType << "\","
	  << " \"CurrentFileName\":\"" << CurrentFilename << "\","
	  << " \"CurrentFileSequence\":" << current_filesequence << ","
	  << " \"FirstEventNr\":" << Event_number_at_last_open << ","
	  << " \"time\": " << time(0) << " }" << endl;
      out << "] }" << endl;
    }

  mqtt->send(out.str());

#endif

  return 0;
}
2789
2790 int daq_set_uservalue ( const int index, const int value, std::ostream& os )
2791 {
2792 if ( index < 0 || index > 7)
2793 {
2794 os << "index out of range. (0..7)" << endl;
2795 return -1;
2796 }
2797 uservalues[index] = value;
2798 return 0;
2799 }
2800
2801
2802 int daq_get_uservalue ( const int index, std::ostream& os )
2803 {
2804 if ( index < 0 || index > 7)
2805 {
2806 os << "index out of range. (0..7)" << endl;
2807 return -1;
2808 }
2809 os << uservalues[index] << endl;
2810 return 0;
2811
2812 }
2813
2814 int get_uservalue ( const int index)
2815 {
2816 if ( index < 0 || index > 7)
2817 {
2818 return 0;
2819 }
2820 return uservalues[index];
2821
2822 }
2823