File indexing completed on 2025-08-02 08:21:08
0001
0002 #include <string.h>
0003 #include <sys/types.h>
0004 #include <sys/stat.h>
0005 #include <fcntl.h>
0006
0007 #include <sys/time.h>
0008 #include <unistd.h>
0009 #include <stdlib.h>
0010 #include <errno.h>
0011 #include <unistd.h>
0012 #include <signal.h>
0013 #include <dlfcn.h>
0014 #include <sys/ipc.h>
0015
0016 #include <pthread.h>
0017
0018 #ifdef HAVE_BSTRING_H
0019 #include <bstring.h>
0020 #else
0021 #include <strings.h>
0022 #endif
0023
0024 #include <sys/time.h>
0025
0026 #include <sys/socket.h>
0027 #include <net/if.h>
0028 #include <netdb.h>
0029
0030 #include <sys/wait.h>
0031 #include <sys/resource.h>
0032
0033
0034 #include <sys/types.h>
0035
0036 #ifdef SunOS
0037 #include <sys/filio.h>
0038 #endif
0039
0040 #include <sys/stat.h>
0041
0042 #include <netinet/in.h>
0043 #include <arpa/inet.h>
0044 #include <netdb.h>
0045 #include <sys/ioctl.h>
0046
0047 #include <stdio.h>
0048 #include <iostream>
0049 #include <iomanip>
0050
0051 #ifdef HAVE_GETOPT_H
0052 #include <getopt.h>
0053 #endif
0054
0055
0056 typedef unsigned int PHDWORD;
0057 typedef unsigned short SWORD;
0058 typedef unsigned char BYTE;
0059 typedef unsigned int UINT;
0060
0061 #define BUFFERBLOCKSIZE 8192U
0062
0063
0064 #define CTRL_BEGINRUN 1
0065 #define CTRL_ENDRUN 2
0066 #define CTRL_DATA 3
0067 #define CTRL_CLOSE 4
0068 #define CTRL_SENDFILENAME 5
0069 #define CTRL_ROLLOVER 6
0070
0071 #define CTRL_REMOTESUCCESS 100
0072 #define CTRL_REMOTEFAIL 101
0073
0074 #define ROLE_RECEIVED 0
0075 #define ROLE_WRITTEN 1
0076
0077
0078
0079
0080 #define INITIAL_SIZE (4*1024*1024)
0081
0082
0083 typedef struct
0084 {
0085 int dirty;
0086 int role;
0087 int bytecount;
0088 int buffersize;
0089 PHDWORD *bf;
0090 } bufferstructure;
0091
0092 int run_number = 0;
0093 int verbose = 0;
0094 int sockfd = 0;
0095 int dd_fd = 0;
0096
0097
0098 pthread_mutex_t M_cout;
0099
0100 pthread_mutex_t M_write;
0101 pthread_mutex_t M_done;
0102
0103
0104 pthread_t ThreadId;
0105 pthread_t tid;
0106 int output_fd = -1;
0107
0108 int the_port = 5001;
0109
0110 int do_not_write = 0;
0111
0112 int RunIsActive = 0;
0113 int NumberWritten = 0;
0114 int file_open = 0;
0115
0116
0117 int readn(int , char *, int);
0118 int writen(int , char *, int);
0119
0120 #if defined(SunOS) || defined(Linux)
0121 void sig_handler(int);
0122 #else
0123 void sig_handler(...);
0124 #endif
0125
0126 void *writebuffers ( void * arg);
0127 int handle_this_child( pid_t pid, const std::string &host);
0128 in_addr_t find_address_from_interface(const char *);
0129
0130 void cleanup(const int exitstatus);
0131
0132
0133 bufferstructure *bf_being_received;
0134 bufferstructure *bf_being_written;
0135
0136
0137 using namespace std;
0138
0139
0140 void exitmsg()
0141 {
0142 cout << "** This is the Super Fast Server :-)." << endl;
0143 cout << "** usage: sfs " << endl;
0144 cout << " -d disable database logging [ db not yet implemented ]" << endl;
0145 cout << " -b interface bind only to this interface" << endl;
0146 cout << " -p number use this port (default 5001)" << endl;
0147 cout << " -v increase verbosity" << endl;
0148 cout << " -x do not write data to disk (testing)" << endl;
0149 cout << " Examples:" << endl;
0150 cout << " sfs -b ens801f0 -- listen only on that interface" << endl;
0151 cout << " sfs -p 5002 -- listen on port 5002" << endl;
0152
0153 exit(0);
0154 }
0155
0156
0157
0158
0159
0160
0161
0162
0163
0164
0165 std::string listen_interface;
0166
0167 int main( int argc, char* argv[])
0168 {
0169
0170
0171 #if defined(SunOS) || defined(Linux)
0172 struct sockaddr client_addr;
0173 #else
0174 struct sockaddr_in client_addr;
0175 #endif
0176 struct sockaddr_in server_addr;
0177
0178 #if defined(SunOS)
0179 int client_addr_len = sizeof(client_addr);
0180 #else
0181 unsigned int client_addr_len = sizeof(client_addr);
0182 #endif
0183
0184 pthread_mutex_init(&M_cout, 0);
0185 pthread_mutex_init(&M_write, 0);
0186 pthread_mutex_init(&M_done, 0);
0187
0188 pthread_mutex_lock(&M_write);
0189
0190
0191 in_addr_t listen_address = htonl(INADDR_ANY);
0192
0193 char c;
0194
0195 while ((c = getopt(argc, argv, "hvdxb:p:")) != EOF)
0196 {
0197 switch (c)
0198 {
0199
0200 case 'v':
0201 verbose++;
0202 break;
0203
0204 case 'h':
0205 exitmsg();
0206 break;
0207
0208 case 'b':
0209 listen_address = find_address_from_interface(optarg);
0210 listen_interface = optarg;
0211 break;
0212
0213 case 'p':
0214 if ( !sscanf(optarg, "%d", &the_port) ) exitmsg();
0215 break;
0216
0217 case 'd':
0218
0219
0220 break;
0221
0222 case 'x':
0223 do_not_write = 1;
0224 break;
0225
0226
0227 }
0228 }
0229
0230
0231
0232 if (argc > optind)
0233 {
0234 sscanf (argv[optind],"%d", &the_port);
0235 }
0236
0237
0238
0239
0240 if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0 )
0241 {
0242 cleanup(1);
0243 }
0244
0245 int xs = 1024*1024;
0246
0247 int s = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF,
0248 &xs, sizeof(xs));
0249
0250 if (s)
0251 {
0252 perror("Setsockopt:");
0253 }
0254
0255 memset( &server_addr, 0, sizeof(server_addr));
0256 server_addr.sin_family = AF_INET;
0257 server_addr.sin_addr.s_addr = listen_address;
0258 server_addr.sin_port = htons(the_port);
0259
0260 int i;
0261
0262 if ( ( i = bind(sockfd, (struct sockaddr*) &server_addr, sizeof(server_addr))) < 0 )
0263 {
0264 perror(" bind ");
0265 cleanup(1);
0266 }
0267
0268 if ( ( i = listen(sockfd, 100) ) )
0269 {
0270 perror(" listen ");
0271 cleanup(1);
0272 }
0273
0274 if (verbose)
0275 {
0276 cout << " listening on port " << the_port;
0277 if (! listen_interface.empty() ) cout << " on interface " << listen_interface;
0278 cout << endl;
0279 }
0280
0281 signal(SIGCHLD, SIG_IGN);
0282
0283 pid_t pid;
0284 struct sockaddr_in out;
0285
0286 std::string host;
0287
0288 while (sockfd > 0)
0289 {
0290
0291 client_addr_len = sizeof(out);
0292 dd_fd = accept(sockfd, (struct sockaddr *) &out, &client_addr_len);
0293 if ( dd_fd < 0 )
0294 {
0295 pthread_mutex_lock(&M_cout);
0296 cout << "error in accept socket" << endl;
0297 pthread_mutex_unlock(&M_cout);
0298 perror (" accept" );
0299 cleanup(1);
0300 exit(1);
0301 }
0302
0303
0304 char h[512];
0305 getnameinfo((struct sockaddr *) &out, sizeof(struct sockaddr_in), h, 511,
0306 NULL, 0, NI_NOFQDN);
0307 host = h;
0308
0309 if (verbose)
0310 {
0311 cout << " new connection from " << host << " at " << time(0) << endl;
0312 }
0313
0314
0315 if ( (pid = fork()) == 0 )
0316 {
0317 close(sockfd);
0318 return handle_this_child( pid, host);
0319 }
0320 else
0321 {
0322 close (dd_fd);
0323 }
0324 }
0325 }
0326
0327 int handle_this_child( pid_t pid, const std::string &host)
0328 {
0329
0330
0331 int controlword;
0332 int local_runnr = 0;
0333
0334 bufferstructure B0;
0335 bufferstructure B1;
0336
0337 bf_being_received = &B0;
0338 bf_being_written = &B1;
0339 bufferstructure *bf_temp;
0340
0341 B0.bf = new PHDWORD[INITIAL_SIZE];
0342 B0.buffersize = INITIAL_SIZE;
0343 B0.dirty = 0;
0344 B0.role= ROLE_RECEIVED;
0345 B0.bytecount = 0;
0346
0347 B1.bf = new PHDWORD[INITIAL_SIZE];
0348 B1.buffersize = INITIAL_SIZE;
0349 B1.dirty = 0;
0350 B1.role= ROLE_RECEIVED;
0351 B1.bytecount = 0;
0352
0353 int i;
0354
0355
0356 i = pthread_create(&ThreadId, NULL,
0357 writebuffers,
0358 (void *) 0);
0359 if (i )
0360 {
0361 cout << "error in thread create " << i << endl;
0362 perror("Thread ");
0363 cleanup(1);
0364 }
0365
0366
0367
0368
0369
0370
0371
0372
0373 i = ioctl (dd_fd, FIONBIO, 0);
0374
0375
0376
0377
0378 int status;
0379 int xx;
0380
0381 int go_on = 1;
0382 while ( go_on)
0383 {
0384 if ( (status = readn (dd_fd, (char *) &xx, sizeof(xx)) ) <= 0)
0385 {
0386 cout << "error in read from socket" << endl;
0387 perror ("read " );
0388 cleanup(1);
0389 }
0390
0391 controlword = ntohl(xx);
0392
0393
0394
0395
0396
0397
0398 char *p;
0399 char filename[1024];
0400 int value, len;
0401
0402 switch (controlword)
0403 {
0404
0405 case CTRL_SENDFILENAME:
0406
0407 readn (dd_fd, (char *) &len, sizeof(int));
0408 len = ntohl(len);
0409
0410
0411 if ( len >= 1023)
0412 {
0413 i = htonl(CTRL_REMOTEFAIL);
0414 writen (dd_fd, (char *)&i, sizeof(i));
0415 break;
0416 }
0417 value = readn (dd_fd, filename, len);
0418 filename[value] = 0;
0419
0420
0421 i = htonl(CTRL_REMOTESUCCESS);
0422 writen (dd_fd, (char *)&i, sizeof(i));
0423
0424 break;
0425
0426 case CTRL_ROLLOVER:
0427
0428
0429
0430
0431
0432
0433
0434 readn (dd_fd, (char *) &len, sizeof(int));
0435 len = ntohl(len);
0436
0437
0438 if ( len >= 1023)
0439 {
0440 i = htonl(CTRL_REMOTEFAIL);
0441 writen (dd_fd, (char *)&i, sizeof(i));
0442 break;
0443 }
0444 value = readn (dd_fd, filename, len);
0445 filename[value] = 0;
0446
0447
0448 if (! do_not_write)
0449 {
0450 close ( output_fd);
0451 output_fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL | O_LARGEFILE ,
0452 S_IRWXU | S_IROTH | S_IRGRP );
0453 if (output_fd < 0)
0454 {
0455 cerr << "file " << filename << " exists, I will not overwrite " << endl;
0456 i = htonl(CTRL_REMOTEFAIL);
0457 writen (dd_fd, (char *)&i, sizeof(i));
0458 break;
0459 }
0460 if (verbose)
0461 {
0462 cout << " opened new rollover file " << filename << endl;
0463 }
0464 }
0465
0466 i = htonl(CTRL_REMOTESUCCESS);
0467 writen (dd_fd, (char *)&i, sizeof(i));
0468
0469 break;
0470
0471 case CTRL_BEGINRUN:
0472
0473 readn (dd_fd, (char *) &local_runnr, sizeof(local_runnr) );
0474 local_runnr = ntohl(local_runnr);
0475
0476
0477 if (! do_not_write)
0478 {
0479 output_fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL | O_LARGEFILE ,
0480 S_IRWXU | S_IROTH | S_IRGRP );
0481 if (output_fd < 0)
0482 {
0483 cerr << "file " << filename << " exists, I will not overwrite " << endl;
0484 i = htonl(CTRL_REMOTEFAIL);
0485 writen (dd_fd, (char *)&i, sizeof(i));
0486 break;
0487 }
0488 if (verbose)
0489 {
0490 cout << " opened new file " << filename << endl;
0491 }
0492 }
0493
0494 bf_being_received = &B0;
0495 bf_being_written = &B1;
0496
0497 bf_being_received->role = ROLE_RECEIVED;
0498 bf_being_received->dirty=0;
0499
0500 bf_being_written->role = ROLE_WRITTEN;
0501 bf_being_written->dirty = 0;
0502
0503 i = htonl(CTRL_REMOTESUCCESS);
0504 writen (dd_fd, (char *)&i, sizeof(i));
0505
0506 break;
0507
0508
0509
0510
0511
0512
0513 case CTRL_DATA:
0514 status = readn (dd_fd, (char *) &len, sizeof(i));
0515 len = ntohl(len);
0516
0517
0518 bf_being_received->bytecount = len;
0519 if ( (len+sizeof(int)-1)/sizeof(int) > bf_being_received->buffersize)
0520 {
0521 delete [] bf_being_received->bf;
0522 bf_being_received->buffersize = (len + sizeof(int)-1)/sizeof(int) + 2048;
0523
0524 pthread_mutex_lock(&M_cout);
0525
0526 pthread_mutex_unlock(&M_cout);
0527
0528 bf_being_received->bf = new PHDWORD[ bf_being_received->buffersize];
0529 }
0530
0531 p = (char *) bf_being_received->bf;
0532 status = readn (dd_fd, p, len );
0533 if ( len != status)
0534 {
0535
0536 pthread_mutex_lock(&M_cout);
0537
0538
0539 perror("ctrl_data");
0540 pthread_mutex_unlock(&M_cout);
0541 i = htonl(CTRL_REMOTEFAIL);
0542 }
0543
0544 bf_being_received->bytecount = len;
0545 bf_being_received->dirty = 1;
0546
0547
0548 pthread_mutex_lock(&M_done);
0549
0550 bf_temp = bf_being_received;
0551 bf_being_received = bf_being_written;
0552 bf_being_written = bf_temp;
0553 bf_being_written->role = ROLE_WRITTEN;
0554 bf_being_received->role = ROLE_RECEIVED;
0555
0556
0557 pthread_mutex_unlock(&M_write);
0558
0559 i = htonl(CTRL_REMOTESUCCESS);
0560 writen (dd_fd, (char *)&i, sizeof(int));
0561
0562 break;
0563
0564 case CTRL_ENDRUN:
0565
0566
0567 pthread_mutex_lock(&M_done);
0568 if (! do_not_write)
0569 {
0570 close (output_fd);
0571 if (verbose)
0572 {
0573 cout << " closed file " << filename << " at " << time(0) << endl;
0574 }
0575 }
0576
0577 i = htonl(CTRL_REMOTESUCCESS);
0578 writen (dd_fd, (char *)&i, sizeof(i));
0579 pthread_mutex_unlock(&M_done);
0580
0581 break;
0582
0583 case CTRL_CLOSE:
0584 close ( dd_fd);
0585
0586
0587 go_on = 0;
0588 if (verbose == 1)
0589 {
0590 cout << " closed connection from " << host << endl;
0591 }
0592 else if (verbose > 1)
0593 {
0594 cout << " closed connection from " << host << " at " << time(0) << endl;
0595 }
0596 break;
0597
0598 }
0599
0600 }
0601
0602 if (verbose > 1) cout << " ending thread " << " at " << time(0) << endl;
0603
0604 return 0;
0605 }
0606
0607
0608
0609 void *writebuffers ( void * arg)
0610 {
0611
0612 while(1)
0613 {
0614
0615 pthread_mutex_lock(&M_write);
0616
0617 if (! do_not_write)
0618 {
0619 pthread_mutex_lock(&M_cout);
0620
0621 pthread_mutex_unlock(&M_cout);
0622
0623 int blockcount = ( bf_being_written->bytecount + BUFFERBLOCKSIZE -1)/BUFFERBLOCKSIZE;
0624 int bytecount = blockcount*BUFFERBLOCKSIZE;
0625
0626 pthread_mutex_lock(&M_cout);
0627
0628 pthread_mutex_unlock(&M_cout);
0629
0630 int bytes = writen ( output_fd, (char *) bf_being_written->bf , bytecount );
0631 if ( bytes != bytecount)
0632 {
0633 pthread_mutex_lock(&M_cout);
0634 cout << __LINE__ << " " << __FILE__ << " write error " << bytes << " " << bytecount << endl;
0635 pthread_mutex_unlock(&M_cout);
0636 bf_being_written->dirty = -1;
0637 }
0638 else
0639 {
0640
0641 bf_being_written->dirty = 0;
0642 }
0643 }
0644 else
0645 {
0646 bf_being_written->dirty = 0;
0647 }
0648 pthread_mutex_unlock(&M_done);
0649
0650 }
0651 return 0;
0652 }
0653
0654
0655
0656
0657 int readn (int fd, char *ptr, int nbytes)
0658 {
0659
0660 int nread, nleft;
0661
0662 nleft = nbytes;
0663 while ( nleft>0 )
0664 {
0665 nread = read (fd, ptr, nleft);
0666 if ( nread <= 0 )
0667 {
0668 return nread;
0669 }
0670
0671 #ifdef FRAGMENTMONITORING
0672 history[hpos++] = nread;
0673 #endif
0674 nleft -= nread;
0675 ptr += nread;
0676 }
0677
0678 #ifdef FRAGMENTMONITORING
0679 if ( hpos >1 )
0680 {
0681 cout << "Fragmented transfer of " << nbytes << "bytes: ";
0682 for ( int i=0; i<hpos; i++)
0683 {
0684 cout << " " << history[i]<< ",";
0685 }
0686 cout << endl;
0687 }
0688 #endif
0689 return (nbytes-nleft);
0690 }
0691
0692 int writen (int fd, char *ptr, int nbytes)
0693 {
0694 int nleft, nwritten;
0695 nleft = nbytes;
0696 while ( nleft>0 )
0697 {
0698 nwritten = write (fd, ptr, nleft);
0699 if ( nwritten < 0 )
0700 return nwritten;
0701
0702 nleft -= nwritten;
0703 ptr += nwritten;
0704 }
0705 return (nbytes-nleft);
0706 }
0707
0708
0709 in_addr_t find_address_from_interface(const char *interface)
0710 {
0711 int fd;
0712 struct ifreq ifr;
0713
0714
0715 if ( strlen(interface) >= IFNAMSIZ)
0716 {
0717 cerr << " Interface name too long, ignoring "<< endl;
0718 return htonl(INADDR_ANY);
0719 }
0720
0721 fd = socket(AF_INET, SOCK_DGRAM, 0);
0722
0723
0724 ifr.ifr_addr.sa_family = AF_INET;
0725
0726
0727 strncpy(ifr.ifr_name, interface, IFNAMSIZ-1);
0728
0729 ioctl(fd, SIOCGIFADDR, &ifr);
0730
0731 close(fd);
0732 in_addr_t a = ((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr.s_addr;
0733
0734 if (verbose) cout << " binding to address " << inet_ntoa( ((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr ) << endl;
0735 return a;
0736 }
0737
0738
0739
0740 void cleanup( const int exitstatus)
0741 {
0742 close (dd_fd);
0743
0744 exit(exitstatus);
0745 }
0746