File indexing completed on 2025-12-17 09:19:47
0001 #include "SingleInttPoolInput.h"
0002 #include "intt_pool.h"
0003
0004 #include "Fun4AllStreamingInputManager.h"
0005 #include "InputManagerType.h"
0006
0007 #include <ffarawobjects/InttRawHitContainerv2.h>
0008 #include <ffarawobjects/InttRawHitv2.h>
0009
0010 #include <fun4all/DBInterface.h>
0011
0012 #include <phool/RunnumberRange.h>
0013 #include <phool/PHCompositeNode.h>
0014 #include <phool/PHIODataNode.h> // for PHIODataNode
0015 #include <phool/PHNodeIterator.h> // for PHNodeIterator
0016 #include <phool/getClass.h>
0017 #include <phool/recoConsts.h>
0018
0019 #include <Event/Event.h>
0020 #include <Event/EventTypes.h>
0021 #include <Event/Eventiterator.h>
0022
0023 #include <TSystem.h>
0024
0025 #include <odbc++/resultset.h>
0026 #include <odbc++/statement.h>
0027
0028 #include <algorithm> // for max
0029 #include <cstdint> // for uint64_t
0030 #include <cstdlib> // for exit
0031 #include <iostream> // for operator<<, basic_o...
0032 #include <memory>
0033 #include <set>
0034 #include <utility> // for pair
0035
0036 SingleInttPoolInput::SingleInttPoolInput(const std::string &name)
0037 : SingleStreamingInput(name)
0038 {
0039 SubsystemEnum(InputManagerType::INTT);
0040 m_rawHitContainerName = "INTTRAWHIT";
0041 }
0042
0043 SingleInttPoolInput::~SingleInttPoolInput()
0044 {
0045 for (auto iter : poolmap)
0046 {
0047 if (Verbosity() > 2)
0048 {
0049 std::cout << "deleting intt pool for id " << (iter.second)->getIdentifier() << std::endl;
0050 }
0051 delete (iter.second);
0052 }
0053 }
0054
0055 void SingleInttPoolInput::FillPool(const uint64_t minBCO)
0056 {
0057 if (AllDone())
0058 {
0059 return;
0060 }
0061 while (GetEventiterator() == nullptr)
0062 {
0063 if (!OpenNextFile())
0064 {
0065 AllDone(1);
0066 return;
0067 }
0068 }
0069
0070
0071 while (GetSomeMoreEvents(0))
0072 {
0073 Event *evt = GetEventiterator()->getNextEvent();
0074 while (!evt)
0075 {
0076 fileclose();
0077 if (!OpenNextFile())
0078 {
0079 AllDone(1);
0080 return;
0081 }
0082 evt = GetEventiterator()->getNextEvent();
0083 }
0084 if (Verbosity() > 2)
0085 {
0086 std::cout << "Fetching next Event" << evt->getEvtSequence() << std::endl;
0087 }
0088 RunNumber(evt->getRunNumber());
0089 if (m_SavedRunNumber != RunNumber())
0090 {
0091 if (GetVerbosity() > 1)
0092 {
0093 std::cout << "setting streaming mode for run " << RunNumber() << std::endl;
0094 }
0095 streamingMode(IsStreaming(RunNumber()));
0096 m_SavedRunNumber = RunNumber();
0097 ConfigureStreamingInputManagerLocal(m_SavedRunNumber);
0098 }
0099
0100 if (GetVerbosity() > 1)
0101 {
0102 evt->identify();
0103 }
0104
0105 if (evt->getEvtType() != DATAEVENT)
0106 {
0107 m_NumSpecialEvents++;
0108 if (evt->getEvtType() == ENDRUNEVENT)
0109 {
0110 std::cout << "End run flag for INTT found, remaining INTT data is corrupted" << std::endl;
0111 delete evt;
0112 AllDone(1);
0113 return;
0114 }
0115 delete evt;
0116 continue;
0117 }
0118
0119 int EventSequence = evt->getEvtSequence();
0120 std::vector<Packet *> pktvec = evt->getPacketVector();
0121 if (m_SkipEarlyEvents)
0122 {
0123 for (Packet *pkt : pktvec)
0124 {
0125 int numBCOs = pkt->iValue(0, "NR_BCOS");
0126 for (int j = 0; j < numBCOs; j++)
0127 {
0128 uint64_t bco = pkt->lValue(j, "BCOLIST");
0129 if (bco < minBCO)
0130 {
0131 continue;
0132 }
0133 m_SkipEarlyEvents = false;
0134 }
0135 }
0136 }
0137 if (m_SkipEarlyEvents)
0138 {
0139 for (Packet *pkt : pktvec)
0140 {
0141 delete pkt;
0142 }
0143 delete evt;
0144 continue;
0145 }
0146 for (Packet *pkt : pktvec)
0147 {
0148 if (Verbosity() > 2)
0149 {
0150 pkt->identify();
0151 }
0152
0153 if (!poolmap.contains(pkt->getIdentifier()))
0154 {
0155 if (Verbosity() > 1)
0156 {
0157 std::cout << "starting new intt pool for packet " << pkt->getIdentifier() << std::endl;
0158 }
0159 poolmap[pkt->getIdentifier()] = new intt_pool(1000, 100);
0160 poolmap[pkt->getIdentifier()]->Verbosity(Verbosity());
0161 poolmap[pkt->getIdentifier()]->Name(std::to_string(pkt->getIdentifier()));
0162 }
0163 poolmap[pkt->getIdentifier()]->addPacket(pkt);
0164
0165 delete pkt;
0166 }
0167
0168 delete evt;
0169
0170 for (auto iter : poolmap)
0171 {
0172 intt_pool *pool = iter.second;
0173 auto packet_id = pool->getIdentifier();
0174 if (pool->depth_ok())
0175 {
0176 int num_hits = pool->iValue(0, "NR_HITS");
0177 if (Verbosity() > 1)
0178 {
0179 std::cout << "Number of Hits: " << num_hits << " for packet "
0180 << pool->getIdentifier() << std::endl;
0181 }
0182 if (Verbosity() > 2)
0183 {
0184 if (IsStandaloneMode())
0185 {
0186 std::cout << "INTT Pool in Standalone mode " << std::endl;
0187 }
0188 else
0189 {
0190 std::cout << "INTT Pool with GL1 BCO " << std::endl;
0191 }
0192 }
0193 int numBCOs = pool->iValue(0, "NR_BCOS");
0194 uint64_t largest_bco = 0;
0195 bool skipthis{true};
0196
0197 for (int j = 0; j < numBCOs; j++)
0198 {
0199 uint64_t bco = pool->lValue(j, "BCOLIST");
0200 largest_bco = std::max(largest_bco, bco);
0201 if (bco < minBCO)
0202 {
0203 continue;
0204 }
0205 if (!IsStandaloneMode() && bco > minBCO * 2)
0206 {
0207 continue;
0208 }
0209 skipthis = false;
0210 m_BclkStack.insert(bco);
0211 m_BclkStackPacketMap[packet_id].insert(bco);
0212 }
0213
0214 int nFEEs = pool->iValue(0, "UNIQUE_FEES");
0215 for (int j = 0; j < nFEEs; j++)
0216 {
0217 int fee = pool->iValue(j, "FEE_ID");
0218 int nbcos = pool->iValue(fee, "FEE_BCOS");
0219 for (int k = 0; k < nbcos; k++)
0220 {
0221 uint64_t bco = pool->lValue(fee, k, "BCOVAL");
0222 if (bco < minBCO)
0223 {
0224 continue;
0225 }
0226 if (!IsStandaloneMode() && bco > minBCO * 2)
0227 {
0228 continue;
0229 }
0230 m_FeeGTML1BCOMap[fee].insert(bco);
0231 }
0232 }
0233 if (skipthis)
0234 {
0235 if (Verbosity() > 1)
0236 {
0237 std::cout << "largest bco: 0x" << std::hex << largest_bco << ", minbco 0x" << minBCO
0238 << std::dec << ", evtno: " << EventSequence << std::endl;
0239 }
0240 }
0241 else
0242 {
0243 for (int j = 0; j < num_hits; j++)
0244 {
0245 uint64_t gtm_bco = pool->lValue(j, "BCO");
0246
0247 bool found{false};
0248 static uint64_t const header = 0xcadead;
0249 static uint64_t const footer = 0x80cafe;
0250 static uint64_t const projection = 0xffffff;
0251 for (unsigned int shift = 0; shift < 36; shift+=4)
0252 {
0253 if ((gtm_bco & (projection << shift)) == (header << shift)) {
0254 if (1 < Verbosity()) {
0255 std::cout << std::hex
0256 << " Header found in BCO!"
0257 << " bco: 0x" << gtm_bco
0258 << " projection: 0x" << (header << shift)
0259 << std::dec << std::endl;
0260 }
0261 found = true;
0262 break;
0263 }
0264 if ((gtm_bco & (projection << shift)) == (footer << shift)) {
0265 if (1 < Verbosity()) {
0266 std::cout << std::hex
0267 << " Footer found in BCO!"
0268 << " bco: 0x" << gtm_bco
0269 << " projection: 0x" << (footer << shift)
0270 << std::dec << std::endl;
0271 }
0272 found = true;
0273 break;
0274 }
0275 }
0276 if (found) { continue; }
0277
0278 if (gtm_bco < minBCO)
0279 {
0280
0281
0282
0283 continue;
0284 }
0285 if (!IsStandaloneMode() && gtm_bco > minBCO * 2)
0286 {
0287 continue;
0288 }
0289 auto newhit = std::make_unique<InttRawHitv2>();
0290 int FEE = pool->iValue(j, "FEE");
0291 newhit->set_packetid(pool->getIdentifier());
0292 newhit->set_fee(FEE);
0293 newhit->set_bco(gtm_bco);
0294 newhit->set_adc(pool->iValue(j, "ADC"));
0295 newhit->set_amplitude(pool->iValue(j, "AMPLITUDE"));
0296 newhit->set_chip_id(pool->iValue(j, "CHIP_ID"));
0297 newhit->set_channel_id(pool->iValue(j, "CHANNEL_ID"));
0298 newhit->set_word(pool->iValue(j, "DATAWORD"));
0299 newhit->set_FPHX_BCO(pool->iValue(j, "FPHX_BCO"));
0300 newhit->set_full_FPHX(pool->iValue(j, "FULL_FPHX"));
0301 newhit->set_full_ROC(pool->iValue(j, "FULL_ROC"));
0302 newhit->set_event_counter(pool->iValue(j, "EVENT_COUNTER"));
0303 gtm_bco += m_Rollover[FEE];
0304
0305 if (gtm_bco < m_PreviousClock[FEE])
0306 {
0307 m_Rollover[FEE] += 0x10000000000;
0308 gtm_bco += 0x10000000000;
0309 }
0310 m_PreviousClock[FEE] = gtm_bco;
0311 m_BeamClockFEE[gtm_bco].insert(FEE);
0312 m_FEEBclkMap[FEE] = gtm_bco;
0313 if (Verbosity() > 2)
0314 {
0315 std::cout << "evtno: " << EventSequence
0316 << ", hits: " << j
0317 << ", nr_hits: " << num_hits
0318 << ", FEE: " << FEE
0319 << ", bco: 0x" << std::hex << gtm_bco << std::dec
0320 << ", min bco: 0x" << std::hex << minBCO << std::dec
0321 << ", channel: " << newhit->get_channel_id()
0322 << ", evt_counter: " << newhit->get_event_counter() << std::endl;
0323 }
0324 if (StreamingInputManager())
0325 {
0326 StreamingInputManager()->AddInttRawHit(gtm_bco, newhit.get());
0327 }
0328 m_InttRawHitMap[gtm_bco].push_back(newhit.release());
0329 }
0330 }
0331
0332 }
0333 pool->next();
0334 }
0335 }
0336 }
0337
0338 void SingleInttPoolInput::Print(const std::string &what) const
0339 {
0340 if (what == "ALL" || what == "FEE")
0341 {
0342 for (const auto &bcliter : m_BeamClockFEE)
0343 {
0344 std::cout << "Beam clock 0x" << std::hex << bcliter.first << std::dec << std::endl;
0345 for (auto feeiter : bcliter.second)
0346 {
0347 std::cout << "FEM: " << feeiter << std::endl;
0348 }
0349 }
0350 }
0351 if (what == "ALL" || what == "FEEBCLK")
0352 {
0353 std::cout << "Printing last beamclock for every FEE" << std::endl;
0354 for (auto bcliter : m_FEEBclkMap)
0355 {
0356 std::cout << "FEE" << bcliter.first << " bclk: 0x"
0357 << std::hex << bcliter.second << std::dec << std::endl;
0358 }
0359 }
0360 if (what == "ALL" || what == "STORAGE")
0361 {
0362 for (const auto &bcliter : m_InttRawHitMap)
0363 {
0364 std::cout << "Beam clock 0x" << std::hex << bcliter.first << std::dec << std::endl;
0365 for (auto *feeiter : bcliter.second)
0366 {
0367 std::cout << "fee: " << feeiter->get_fee()
0368 << " at " << std::hex << feeiter << std::dec << std::endl;
0369 }
0370 }
0371 }
0372 if (what == "ALL" || what == "STACK")
0373 {
0374 for (const auto &[packetid, bclkstack] : m_BclkStackPacketMap)
0375 {
0376 for (const auto &bclk : bclkstack)
0377 {
0378 std::cout << "stacked bclk: 0x" << std::hex << bclk << std::dec << std::endl;
0379 }
0380 }
0381 for (auto iter : m_BclkStack)
0382 {
0383 std::cout << "stacked bclk: 0x" << std::hex << iter << std::dec << std::endl;
0384 }
0385 }
0386 }
0387
0388 void SingleInttPoolInput::CleanupUsedPackets(const uint64_t bclk)
0389 {
0390 m_BclkStack.erase(m_BclkStack.begin(), m_BclkStack.upper_bound(bclk));
0391 m_BeamClockFEE.erase(m_BeamClockFEE.begin(), m_BeamClockFEE.upper_bound(bclk));
0392 for (auto it = m_InttRawHitMap.begin(); it != m_InttRawHitMap.end() && (it->first <= bclk); it = m_InttRawHitMap.erase(it))
0393 {
0394 for (const auto &rawhit : it->second)
0395 {
0396 delete rawhit;
0397 }
0398 }
0399 m_InttRawHitMap.erase(m_InttRawHitMap.begin(), m_InttRawHitMap.upper_bound(bclk));
0400 }
0401
0402 bool SingleInttPoolInput::CheckPoolDepth(const uint64_t bclk)
0403 {
0404 for (auto iter : m_FEEBclkMap)
0405 {
0406 if (Verbosity() > 2)
0407 {
0408 std::cout << "my bclk 0x" << std::hex << iter.second
0409 << " req: 0x" << bclk << std::dec << std::endl;
0410 }
0411 if (iter.second < bclk)
0412 {
0413 if (Verbosity() > 1)
0414 {
0415 std::cout << "FEE " << iter.first << " beamclock 0x" << std::hex << iter.second
0416 << " smaller than req bclk: 0x" << bclk << std::dec << std::endl;
0417 }
0418 return false;
0419 }
0420 }
0421 return true;
0422 }
0423
0424 void SingleInttPoolInput::ClearCurrentEvent()
0425 {
0426
0427 uint64_t currentbclk = *(m_BclkStackPacketMap.begin()->second).begin();
0428
0429 CleanupUsedPackets(currentbclk);
0430
0431
0432 return;
0433 }
0434
0435 bool SingleInttPoolInput::GetSomeMoreEvents(const uint64_t ibclk)
0436 {
0437 if (AllDone())
0438 {
0439 return false;
0440 }
0441 if (poolmap.empty())
0442 {
0443
0444 return true;
0445 }
0446
0447
0448
0449
0450
0451
0452
0453
0454 uint64_t localbclk = ibclk;
0455 if (ibclk == 0)
0456 {
0457 if (m_InttRawHitMap.empty())
0458 {
0459
0460 return true;
0461 }
0462 localbclk = m_InttRawHitMap.begin()->first;
0463 }
0464
0465 std::set<int> toerase;
0466 for (auto bcliter : m_FEEBclkMap)
0467 {
0468 if (bcliter.second <= localbclk)
0469 {
0470 uint64_t highest_bclk = m_InttRawHitMap.rbegin()->first;
0471 if ((highest_bclk - m_InttRawHitMap.begin()->first) < MaxBclkDiff())
0472 {
0473
0474
0475
0476 return true;
0477 }
0478
0479 std::cout << PHWHERE << Name() << ": erasing FEE " << bcliter.first
0480 << " with stuck bclk: 0x" << std::hex << bcliter.second
0481 << " current bco range: 0x" << m_InttRawHitMap.begin()->first
0482 << ", to: 0x" << highest_bclk << ", delta: " << std::dec
0483 << (highest_bclk - m_InttRawHitMap.begin()->first)
0484 << std::dec << std::endl;
0485 toerase.insert(bcliter.first);
0486 }
0487 }
0488 for (auto iter : toerase)
0489 {
0490 m_FEEBclkMap.erase(iter);
0491 }
0492
0493 return false;
0494 }
0495
0496 void SingleInttPoolInput::CreateDSTNode(PHCompositeNode *topNode)
0497 {
0498 PHNodeIterator iter(topNode);
0499 PHCompositeNode *dstNode = dynamic_cast<PHCompositeNode *>(iter.findFirst("PHCompositeNode", "DST"));
0500 if (!dstNode)
0501 {
0502 dstNode = new PHCompositeNode("DST");
0503 topNode->addNode(dstNode);
0504 }
0505 PHNodeIterator iterDst(dstNode);
0506 PHCompositeNode *detNode = dynamic_cast<PHCompositeNode *>(iterDst.findFirst("PHCompositeNode", "INTT"));
0507 if (!detNode)
0508 {
0509 detNode = new PHCompositeNode("INTT");
0510 dstNode->addNode(detNode);
0511 }
0512 InttRawHitContainer *intthitcont = findNode::getClass<InttRawHitContainer>(detNode, m_rawHitContainerName);
0513 if (!intthitcont)
0514 {
0515 intthitcont = new InttRawHitContainerv2();
0516 PHIODataNode<PHObject> *newNode = new PHIODataNode<PHObject>(intthitcont, m_rawHitContainerName, "PHObject");
0517 detNode->addNode(newNode);
0518 }
0519 }
0520
0521
0522 void SingleInttPoolInput::ConfigureStreamingInputManagerLocal(const int runnumber)
0523 {
0524 if (StreamingInputManager())
0525 {
0526
0527 if (runnumber > 58677 && m_BcoRange < 5)
0528 {
0529 SetBcoRange(3);
0530 if (GetVerbosity() > 2)
0531 {
0532 std::cout << "INTT changed to triggered event combining with range [-"
0533 << m_NegativeBco << "," << m_BcoRange << "]" << std::endl;
0534 }
0535 }
0536 switch (m_StreamingFlag)
0537 {
0538 case InttStreamingMode::TRIGGERED:
0539 std::cout << PHWHERE << " INTT triggered event combining with range [-"
0540 << m_NegativeBco << "," << m_BcoRange << "]" << std::endl;
0541 break;
0542 case InttStreamingMode::STREAMING:
0543 std::cout << PHWHERE << " INTT streaming event combining with range [-"
0544 << m_NegativeBco << "," << m_BcoRange << "]" << std::endl;
0545 break;
0546 case InttStreamingMode::UNDEFINED:
0547 std::cout << PHWHERE << " INTT undefined streaming mode, combining with range [-"
0548 << m_NegativeBco << "," << m_BcoRange << "]" << std::endl;
0549 break;
0550 default:
0551 std::cout << PHWHERE << " Unknown INTT streaming mode: "
0552 << m_StreamingFlag << " combining with range [-"
0553 << m_NegativeBco << "," << m_BcoRange << "]" << std::endl;
0554 break;
0555 }
0556 StreamingInputManager()->SetInttBcoRange(GetBcoRange());
0557 StreamingInputManager()->SetInttNegativeBco(GetNegativeBco());
0558 }
0559 }
0560
0561 void SingleInttPoolInput::streamingMode(const bool isStreaming)
0562 {
0563 if (isStreaming)
0564 {
0565 if(RunNumber() > RunnumberRange::RUN3PP_FIRST)
0566 {
0567 SetNegativeBco(120 - 24);
0568 }
0569 else
0570 {
0571 SetNegativeBco(120 - 23);
0572 }
0573 SetBcoRange(500);
0574 if (GetVerbosity() > 2)
0575 {
0576 std::cout << "INTT set to streaming event combining" << std::endl;
0577 }
0578 return;
0579 }
0580
0581 SetNegativeBco(1);
0582 SetBcoRange(2);
0583 }
0584
0585 bool SingleInttPoolInput::IsStreaming(int runnumber)
0586 {
0587 odbc::Statement *statement = DBInterface::instance()->getStatement("daq");
0588 std::string sched_data;
0589 std::string sql = "SELECT sched_data FROM gtm_scheduler WHERE vgtm=1 AND sched_entry = 1 AND runnumber = " + std::to_string(runnumber) + ";";
0590 std::unique_ptr<odbc::ResultSet> result_set(statement->executeQuery(sql));
0591 if (result_set && result_set->next())
0592 {
0593 sched_data = result_set->getString("sched_data");
0594 }
0595 bool m_is_streaming;
0596 if (std::string{"{17,55,24,54}"} == sched_data)
0597 {
0598 m_is_streaming = true;
0599 m_StreamingFlag = 1;
0600 }
0601 else if (std::string{"{0,54,91,53}"} == sched_data)
0602 {
0603
0604 m_is_streaming = false;
0605 m_StreamingFlag = -1;
0606 }
0607 else
0608 {
0609 std::cout << PHWHERE << "Unexpected value for sched_data: '" << sched_data << "'" << std::endl;
0610 gSystem->Exit(1);
0611 exit(1);
0612 }
0613 return m_is_streaming;
0614 }