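"""Integration tests for the ACTS Python example scripts.

Each test drives one of the Examples/Scripts/Python workflows (either in
process or as a subprocess) and checks the ROOT and CSV output it produces
against file-size, entry-count, and reference-hash expectations.
"""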
from pathlib import Path
import os
import json
import functools
import tarfile
import urllib.request
import subprocess
import sys
import re
import collections

import pytest

from helpers import (
    geant4Enabled,
    dd4hepEnabled,
    hepmc3Enabled,
    pythia8Enabled,
    exatrkxEnabled,
    onnxEnabled,
    AssertCollectionExistsAlg,
    failure_threshold,
)

import acts
from acts.examples import (
    Sequencer,
    GenericDetector,
    AlignedDetector,
)
from acts.examples.odd import getOpenDataDetector


u = acts.UnitConstants


@pytest.fixture
def field():
    return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))


@pytest.fixture
def seq():
    return Sequencer(events=10, numThreads=1)

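# Assert that at least one "*<stem>.csv" file was written to csv_path and
# that none of the CSV files there are trivially small.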
def assert_csv_output(csv_path, stem):
    __tracebackhide__ = True
    # print(list(csv_path.iterdir()))
    assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_path.iterdir()])

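# Assert that a tree with the given name exists in a ROOT file and,
# optionally, that it holds exactly `exp` entries or at least one entry.
# ROOT is imported inside the function so that merely importing this module
# does not require PyROOT.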
def assert_entries(root_file, tree_name, exp=None, non_zero=False):
    __tracebackhide__ = True
    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)

    rf = ROOT.TFile.Open(str(root_file))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert tree_name in keys
    print("Entries:", rf.Get(tree_name).GetEntries())
    if non_zero:
        assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
    if exp is not None:
        assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"


def assert_has_entries(root_file, tree_name):
    __tracebackhide__ = True
    assert_entries(root_file, tree_name, non_zero=True)

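# Generate events with the Pythia8 example and validate the particle output
# written to ROOT and CSV.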
@pytest.mark.slow
@pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
def test_pythia8(tmp_path, seq, assert_root_hash):
    from pythia8 import runPythia8

    (tmp_path / "csv").mkdir()

    assert not (tmp_path / "pythia8_particles.root").exists()
    assert len(list((tmp_path / "csv").iterdir())) == 0

    events = seq.config.events

    runPythia8(str(tmp_path), outputRoot=True, outputCsv=True, s=seq).run()

    del seq

    fp = tmp_path / "pythia8_particles.root"
    assert fp.exists()
    assert fp.stat().st_size > 2**10 * 50
    assert_entries(fp, "particles", events)
    assert_root_hash(fp.name, fp)

    assert len(list((tmp_path / "csv").iterdir())) > 0
    assert_csv_output(tmp_path / "csv", "particles")

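# Run the Fatras fast-simulation example and check the simulated particle
# and hit output in both ROOT and CSV form.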
def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
    from fatras import runFatras

    csv = tmp_path / "csv"
    csv.mkdir()

    nevents = 10

    root_files = [
        (
            "particles_simulation.root",
            "particles",
        ),
        (
            "hits.root",
            "hits",
        ),
    ]

    assert len(list(csv.iterdir())) == 0
    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    seq = Sequencer(events=nevents)
    runFatras(trk_geo, field, str(tmp_path), s=seq).run()

    del seq

    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")
    assert_csv_output(csv, "hits")
    for f, tn in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_has_entries(rfp, tn)
        assert_root_hash(f, rfp)

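# Run the geant4.py example script in a subprocess and validate the
# simulation output it writes into the working directory.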
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_geant4(tmp_path, assert_root_hash):
    # This test literally only ensures that the Geant4 example can run without erroring out
    getOpenDataDetector()  # just to make sure it can build

    csv = tmp_path / "csv"
    csv.mkdir()

    root_files = [
        "particles_simulation.root",
        "hits.root",
    ]

    assert len(list(csv.iterdir())) == 0
    for rf in root_files:
        assert not (tmp_path / rf).exists()

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "geant4.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output may be None
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")
    assert_csv_output(csv, "hits")
    for f in root_files:
        rfp = tmp_path / f
        assert rfp.exists()
        assert rfp.stat().st_size > 2**10 * 10

        assert_root_hash(f, rfp)

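# Run the seeding example with the default seed finder and validate the
# estimated-parameter, performance, and particle output.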
def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()

    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")

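# Same as test_seeding above, but exercising the orthogonal seed finder.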
def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
    from seeding import runSeeding, SeedingAlgorithm

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    runSeeding(
        trk_geo,
        field,
        outputDir=str(tmp_path),
        s=seq,
        seedingAlgorithm=SeedingAlgorithm.Orthogonal,
    ).run()

    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")

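# Build a particle gun -> Fatras -> digitization -> seeding chain inline,
# using the ITk seeding algorithm configuration for pixel space points.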
def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))

    csv = tmp_path / "csv"
    csv.mkdir()

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        (
            "estimatedparams.root",
            "estimatedparams",
        ),
        (
            "performance_seeding.root",
            None,
        ),
        (
            "particles.root",
            "particles",
        ),
        (
            "particles_simulation.root",
            "particles",
        ),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(csv.iterdir())) == 0

    rnd = acts.examples.RandomNumbers(seed=42)

    from acts.examples.simulation import (
        addParticleGun,
        EtaConfig,
        MomentumConfig,
        ParticleConfig,
        addFatras,
        addDigitization,
    )

    addParticleGun(
        seq,
        MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
        EtaConfig(-4.0, 4.0, True),
        ParticleConfig(1, acts.PdgParticle.eMuon, True),
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    addFatras(
        seq,
        trk_geo,
        field,
        outputDirCsv=tmp_path / "csv",
        outputDirRoot=str(tmp_path),
        rnd=rnd,
    )

    srcdir = Path(__file__).resolve().parent.parent.parent.parent
    addDigitization(
        seq,
        trk_geo,
        field,
        digiConfigFile=srcdir
        / "Examples/Algorithms/Digitization/share/default-smearing-config-generic.json",
        rnd=rnd,
    )

    from acts.examples.reconstruction import (
        addSeeding,
        TruthSeedRanges,
    )
    from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType

    addSeeding(
        seq,
        trk_geo,
        field,
        TruthSeedRanges(pt=(1.0 * u.GeV, None), eta=(-4, 4), nHits=(9, None)),
        *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
        acts.logging.VERBOSE,
        geoSelectionConfigFile=srcdir
        / "Examples/Algorithms/TrackFinding/share/geoSelection-genericDetector.json",
        inputParticles="particles_final",  # use this to reproduce the original root_file_hashes.txt - remove to fix
        outputDirRoot=str(tmp_path),
    )

    seq.run()

    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 100

        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    assert_csv_output(csv, "particles")
    assert_csv_output(csv, "particles_final")
    assert_csv_output(csv, "particles_initial")

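# Run the propagation example and check the recorded propagation steps as
# well as the OBJ output.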
@pytest.mark.slow
def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
    from propagation import runPropagation

    obj = tmp_path / "obj"
    obj.mkdir()

    root_files = [
        (
            "propagation_steps.root",
            "propagation_steps",
            10000,
        )
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    assert len(list(obj.iterdir())) == 0

    runPropagation(trk_geo, field, str(tmp_path), s=seq).run()

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)

    assert len(list(obj.iterdir())) > 0

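# Validate the Geant4 material tracks produced by the material_recording
# fixture.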
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_recording(tmp_path, material_recording, assert_root_hash):
    root_files = [
        (
            "geant4_material_tracks.root",
            "material-tracks",
            200,
        )
    ]

    for fn, tn, ee in root_files:
        fp = material_recording / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 50
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)

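# Run the event_recording.py example in a subprocess, then read the HepMC3
# output back in and count the recorded events.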
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not hepmc3Enabled, reason="HepMC3 plugin not available")
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
def test_event_recording(tmp_path):
    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "event_recording.py"
    )
    assert script.exists()

    env = os.environ.copy()
    env["NEVENTS"] = "1"
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
    try:
        subprocess.check_call(
            [sys.executable, str(script)],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output may be None
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    from acts.examples.hepmc3 import HepMC3AsciiReader

    out_path = tmp_path / "hepmc3"
    # out_path.mkdir()

    assert len([f for f in out_path.iterdir() if f.name.endswith("events.hepmc3")]) > 0
    assert all([f.stat().st_size > 100 for f in out_path.iterdir()])

    s = Sequencer(numThreads=1)

    s.addReader(
        HepMC3AsciiReader(
            level=acts.logging.INFO,
            inputDir=str(out_path),
            inputStem="events",
            outputEvents="hepmc-events",
        )
    )

    alg = AssertCollectionExistsAlg(
        "hepmc-events", name="check_alg", level=acts.logging.INFO
    )
    s.addAlgorithm(alg)

    s.run()

    assert alg.events_seen == 1

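# Truth tracking with the Kalman Filter, parametrized over the reverse
# filtering momentum threshold and direct navigation.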
@pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
@pytest.mark.parametrize("directNavigation", [False, True])
def test_truth_tracking_kalman(
    tmp_path, assert_root_hash, revFiltMomThresh, directNavigation, detector_config
):
    from truth_tracking_kalman import runTruthTrackingKalman

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(events=10, numThreads=1)

    root_files = [
        ("trackstates_fitter.root", "trackstates", 19),
        ("tracksummary_fitter.root", "tracksummary", 10),
        ("performance_track_fitter.root", None, -1),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runTruthTrackingKalman(
        trackingGeometry=detector_config.trackingGeometry,
        field=field,
        digiConfigFile=detector_config.digiConfigFile,
        outputDir=tmp_path,
        reverseFilteringMomThreshold=revFiltMomThresh,
        directNavigation=directNavigation,
        s=seq,
    )

    seq.run()

    del seq

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_has_entries(fp, tn)
            assert_root_hash(fn, fp)

    import ROOT

    ROOT.PyConfig.IgnoreCommandLineOptions = True
    ROOT.gROOT.SetBatch(True)
    rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_fitter.root"))
    keys = [k.GetName() for k in rf.GetListOfKeys()]
    assert "tracksummary" in keys
    for entry in rf.Get("tracksummary"):
        assert entry.hasFittedParams

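# Truth tracking with the Gaussian Sum Filter. A known floating-point
# underflow in GsfUtils.hpp is masked via the sequencer's fpeMasks.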
def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
    from truth_tracking_gsf import runTruthTrackingGsf

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))

    seq = Sequencer(
        events=10,
        numThreads=1,
        fpeMasks=[
            (
                "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
                acts.FpeType.FLTUND,
                1,
            ),
        ],
    )

    root_files = [
        ("trackstates_gsf.root", "trackstates"),
        ("tracksummary_gsf.root", "tracksummary"),
    ]

    for fn, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runTruthTrackingGsf(
        trackingGeometry=detector_config.trackingGeometry,
        decorators=detector_config.decorators,
        field=field,
        digiConfigFile=detector_config.digiConfigFile,
        outputDir=tmp_path,
        s=seq,
    )

    # See https://github.com/acts-project/acts/issues/1300
    with failure_threshold(acts.logging.FATAL):
        seq.run()

    del seq

    for fn, tn in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 1024
        if tn is not None:
            assert_root_hash(fn, fp)

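# Run the particle gun example and validate its CSV and ROOT particle
# output.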
def test_particle_gun(tmp_path, assert_root_hash):
    from particle_gun import runParticleGun

    s = Sequencer(events=20, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "particles.root"

    assert not csv_dir.exists()
    assert not root_file.exists()

    runParticleGun(str(tmp_path), s=s).run()

    assert csv_dir.exists()
    assert root_file.exists()

    assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
    assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])

    assert root_file.stat().st_size > 200
    assert_entries(root_file, "particles", 20)
    assert_root_hash(root_file.name, root_file)

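# Map material onto the Open Data Detector from the recorded material
# tracks, then reload the detector with the resulting map and validate it
# by propagation.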
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_material_mapping(material_recording, tmp_path, assert_root_hash):
    map_file = tmp_path / "material-map_tracks.root"
    assert not map_file.exists()

    s = Sequencer(numThreads=1)

    detector, trackingGeometry, decorators = getOpenDataDetector()

    from material_mapping import runMaterialMapping

    runMaterialMapping(
        trackingGeometry,
        decorators,
        outputDir=str(tmp_path),
        inputDir=material_recording,
        mappingStep=1,
        s=s,
    )

    s.run()

    # The MaterialMapping algorithm only writes its output on destruction.
    # See https://github.com/acts-project/acts/issues/881
    del s

    mat_file = tmp_path / "material-map.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-material.root"
    assert not val_file.exists()

    # test the validation as well

    # we need to destroy the ODD to reload with material
    del trackingGeometry
    del detector

    detector, trackingGeometry, decorators = getOpenDataDetector(
        mdecorator=acts.IMaterialDecorator.fromFile(mat_file),
    )

    from material_validation import runMaterialValidation

    s = Sequencer(events=10, numThreads=1)

    field = acts.NullBField()

    runMaterialValidation(
        trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
    )

    s.run()

    assert val_file.exists()
    assert_entries(val_file, "material-tracks", 10000)
    assert_root_hash(val_file.name, val_file)

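# Same workflow as test_material_mapping, but mapping onto volumes using a
# dedicated geometry-volume-map.json input.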
@pytest.mark.slow
@pytest.mark.odd
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
    map_file = tmp_path / "material-map-volume_tracks.root"
    assert not map_file.exists()

    s = Sequencer(numThreads=1)

    geo_map = Path(__file__).parent / "geometry-volume-map.json"
    assert geo_map.exists()
    assert geo_map.stat().st_size > 10
    with geo_map.open() as fh:
        assert json.load(fh)

    detector, trackingGeometry, decorators = getOpenDataDetector(
        mdecorator=acts.IMaterialDecorator.fromFile(geo_map),
    )

    from material_mapping import runMaterialMapping

    runMaterialMapping(
        trackingGeometry,
        decorators,
        mapName="material-map-volume",
        outputDir=str(tmp_path),
        inputDir=material_recording,
        mappingStep=1,
        s=s,
    )

    s.run()

    # The MaterialMapping algorithm only writes its output on destruction.
    # See https://github.com/acts-project/acts/issues/881
    del s

    mat_file = tmp_path / "material-map-volume.json"

    assert mat_file.exists()
    assert mat_file.stat().st_size > 10

    with mat_file.open() as fh:
        assert json.load(fh)

    assert map_file.exists()
    assert_entries(map_file, "material-tracks", 200)
    assert_root_hash(map_file.name, map_file)

    val_file = tmp_path / "propagation-volume-material.root"
    assert not val_file.exists()

    # test the validation as well

    # we need to destroy the ODD to reload with material
    del trackingGeometry
    del detector

    detector, trackingGeometry, decorators = getOpenDataDetector(
        mdecorator=acts.IMaterialDecorator.fromFile(mat_file),
    )

    from material_validation import runMaterialValidation

    s = Sequencer(events=10, numThreads=1)

    field = acts.NullBField()

    runMaterialValidation(
        trackingGeometry,
        decorators,
        field,
        outputDir=str(tmp_path),
        outputName="propagation-volume-material",
        s=s,
    )

    s.run()

    assert val_file.exists()
    assert_root_hash(val_file.name, val_file)

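# Run the geometry example for several detector factories and check the
# number and consistency of the written OBJ, CSV, and JSON files.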
@pytest.mark.parametrize(
    "geoFactory,nobj",
    [
        (GenericDetector.create, 450),
        pytest.param(
            getOpenDataDetector,
            540,
            marks=[
                pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up"),
                pytest.mark.slow,
                pytest.mark.odd,
            ],
        ),
        (functools.partial(AlignedDetector.create, iovSize=1), 450),
    ],
)
@pytest.mark.slow
def test_geometry_example(geoFactory, nobj, tmp_path):
    detector, trackingGeometry, decorators = geoFactory()

    from geometry import runGeometry

    json_dir = tmp_path / "json"
    csv_dir = tmp_path / "csv"
    obj_dir = tmp_path / "obj"

    for d in (json_dir, csv_dir, obj_dir):
        d.mkdir()

    events = 5

    kwargs = dict(
        trackingGeometry=trackingGeometry,
        decorators=decorators,
        events=events,
        outputDir=str(tmp_path),
    )

    runGeometry(outputJson=True, **kwargs)
    runGeometry(outputJson=False, **kwargs)

    assert len(list(obj_dir.iterdir())) == nobj
    assert all(f.stat().st_size > 200 for f in obj_dir.iterdir())

    assert len(list(csv_dir.iterdir())) == 3 * events
    assert all(f.stat().st_size > 200 for f in csv_dir.iterdir())

    detector_files = [csv_dir / f"event{i:>09}-detectors.csv" for i in range(events)]
    for detector_file in detector_files:
        assert detector_file.exists()
        assert detector_file.stat().st_size > 200

    contents = [f.read_text() for f in detector_files]
    ref = contents[0]
    for c in contents[1:]:
        if isinstance(detector, AlignedDetector):
            assert c != ref, "Detector writeout is expected to be different"
        else:
            assert c == ref, "Detector writeout is expected to be identical"

    if not isinstance(detector, AlignedDetector):
        for f in [json_dir / f"event{i:>09}-detector.json" for i in range(events)]:
            assert f.exists()
            with f.open() as fh:
                data = json.load(fh)
                assert data
        material_file = tmp_path / "geometry-map.json"
        assert material_file.exists()
        assert material_file.stat().st_size > 200


DIGI_SHARE_DIR = (
    Path(__file__).parent.parent.parent.parent
    / "Examples/Algorithms/Digitization/share"
)

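# Run the digitization example with a smearing and a geometric
# configuration and check which volume trees end up populated.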
@pytest.mark.parametrize(
    "digi_config_file",
    [
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
    from digitization import runDigitization

    s = Sequencer(events=10, numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * s.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_entries(root_file, "vol9", 0)
    assert_entries(root_file, "vol14", 0)

    if "smearing" in digi_config_file.name:
        filled_entries = [f"vol{tn}" for tn in (8, 12, 13, 16, 17, 18)]
    else:
        # fmt: off
        filled_entries = [
            'vol8', 'vol8_lay2', 'vol12_lay8_mod117', 'vol12_lay10', 'vol12_lay10_mod154',
            'vol12_lay10_mod163', 'vol12_lay12', 'vol12_lay12_mod150', 'vol13',
            'vol13_lay2', 'vol16_lay2_mod53', 'vol16_lay4', 'vol16_lay6', 'vol16_lay8',
            'vol16_lay10', 'vol16_lay12', 'vol17', 'vol17_lay2', 'vol18_lay2',
            'vol18_lay2_mod1', 'vol18_lay2_mod49', 'vol18_lay2_mod86', 'vol18_lay4',
        ]
        # fmt: on

    for entry in filled_entries:
        assert_has_entries(root_file, entry)

    assert_root_hash(root_file.name, root_file)

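# Same digitization workflow, but reading particles generated beforehand by
# the particle gun instead of generating them in the same sequence.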
@pytest.mark.parametrize(
    "digi_config_file",
    [
        DIGI_SHARE_DIR / "default-smearing-config-generic.json",
        DIGI_SHARE_DIR / "default-geometric-config-generic.json",
    ],
    ids=["smeared", "geometric"],
)
def test_digitization_example_input(
    trk_geo, tmp_path, assert_root_hash, digi_config_file
):
    from particle_gun import runParticleGun
    from digitization import runDigitization

    ptcl_dir = tmp_path / "ptcl"
    ptcl_dir.mkdir()
    pgs = Sequencer(events=20, numThreads=-1)
    runParticleGun(str(ptcl_dir), s=pgs)
    pgs.run()

    s = Sequencer(numThreads=-1)

    csv_dir = tmp_path / "csv"
    root_file = tmp_path / "measurements.root"

    assert not root_file.exists()
    assert not csv_dir.exists()

    assert_root_hash(
        "particles.root",
        ptcl_dir / "particles.root",
    )

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    runDigitization(
        trk_geo,
        field,
        outputDir=tmp_path,
        digiConfigFile=digi_config_file,
        particlesInput=ptcl_dir / "particles.root",
        s=s,
        doMerge=True,
    )

    s.run()

    assert root_file.exists()
    assert csv_dir.exists()

    assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
    assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())

    assert_entries(root_file, "vol7", 0)
    assert_entries(root_file, "vol9", 0)

    if "smearing" in digi_config_file.name:
        filled_entries = [f"vol{tn}" for tn in (8, 12, 13, 16, 17, 18)]
    else:
        # fmt: off
        filled_entries = [
            "vol8", "vol8_lay2", "vol12_lay8_mod120", "vol12_lay10_mod120",
            "vol12_lay10_mod144", "vol12_lay12", "vol12_lay12_mod111",
            "vol12_lay12_mod137", "vol12_lay12_mod170", "vol13", "vol13_lay2",
            "vol14_lay2_mod93", "vol14_lay2_mod102", "vol14_lay2_mod112",
            "vol14_lay2_mod118", "vol14_lay4_mod112", "vol14_lay4_mod118",
            "vol14_lay4_mod152", "vol14_lay4_mod161", "vol16_lay4", "vol16_lay6",
            "vol16_lay8", "vol16_lay10", "vol16_lay12", "vol17", "vol17_lay2",
            "vol18_lay2", "vol18_lay2_mod71", "vol18_lay4", "vol18_lay6",
            "vol18_lay8", "vol18_lay10"
        ]
        # fmt: on

    for entry in filled_entries:
        assert_has_entries(root_file, entry)

    assert_root_hash(root_file.name, root_file)

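# Run the digitization-configuration example and check the structure of the
# JSON it produces.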
def test_digitization_config_example(trk_geo, tmp_path):
    from digitization_config import runDigitizationConfig

    out_file = tmp_path / "output.json"
    assert not out_file.exists()

    config_file = (
        Path(__file__).parent
        / "../../../Examples/Algorithms/Digitization/share/default-smearing-config-generic.json"
    )
    assert config_file.exists(), config_file.resolve()

    runDigitizationConfig(trk_geo, input=config_file, output=out_file)

    assert out_file.exists()

    with out_file.open() as fh:
        data = json.load(fh)
    assert len(data.keys()) == 2
    assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
    assert (
        data["acts-geometry-hierarchy-map"]["value-identifier"]
        == "digitization-configuration"
    )
    assert len(data["entries"]) == 27

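# Run the CKF tracking example for the different seeding strategies (full
# seeding, truth-estimated, truth-smeared) and validate its output.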
@pytest.mark.parametrize(
    "truthSmeared,truthEstimated",
    [
        [False, False],
        [False, True],
        [True, False],
    ],
    ids=["full_seeding", "truth_estimated", "truth_smeared"],
)
@pytest.mark.slow
def test_ckf_tracks_example(
    tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
):
    csv = tmp_path / "csv"

    assert not csv.exists()

    field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
    events = 100
    s = Sequencer(events=events, numThreads=1)  # Digitization is not thread-safe

    root_files = [
        (
            "performance_ckf.root",
            None,
        ),
        (
            "trackstates_ckf.root",
            "trackstates",
        ),
        (
            "tracksummary_ckf.root",
            "tracksummary",
        ),
    ]

    if not truthSmeared:
        root_files += [
            (
                "performance_seeding.root",
                None,
            ),
        ]

    for rf, _ in root_files:
        assert not (tmp_path / rf).exists()

    from ckf_tracks import runCKFTracks

    runCKFTracks(
        detector_config.trackingGeometry,
        detector_config.decorators,
        field=field,
        outputCsv=True,
        outputDir=tmp_path,
        geometrySelection=detector_config.geometrySelection,
        digiConfigFile=detector_config.digiConfigFile,
        truthSmearedSeeded=truthSmeared,
        truthEstimatedSeeded=truthEstimated,
        s=s,
    )

    s.run()

    del s  # files are closed in destructors, not great

    assert csv.exists()
    for rf, tn in root_files:
        rp = tmp_path / rf
        assert rp.exists()
        if tn is not None:
            assert_root_hash(rf, rp)

    assert (
        len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
    )
    assert all([f.stat().st_size > 300 for f in csv.iterdir()])

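# Run the full ODD chain script in a subprocess for a single event, with
# the log failure threshold set to ERROR.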
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.odd
@pytest.mark.slow
def test_full_chain_odd_example(tmp_path):
    # This test literally only ensures that the full chain example can run without erroring out
    getOpenDataDetector()  # just to make sure it can build

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output may be None
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

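# Full ODD chain with Pythia8 ttbar events (pile-up 50) and Geant4
# simulation; the captured stdout is scanned for ERROR lines, which must be
# absent.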
@pytest.mark.skipif(
    not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
)
@pytest.mark.slow
def test_full_chain_odd_example_pythia_geant4(tmp_path):
    # This test literally only ensures that the full chain example can run without erroring out
    getOpenDataDetector()  # just to make sure it can build

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        stdout = subprocess.check_output(
            [
                sys.executable,
                str(script),
                "-n1",
                "--geant4",
                "--ttbar",
                "--ttbar-pu",
                "50",
            ],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
        stdout = stdout.decode("utf-8")
    except subprocess.CalledProcessError as e:
        print(e.output.decode("utf-8"))
        raise

    # collect and compare known errors
    errors = []
    error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
    for match in error_regex.finditer(stdout):
        (algo,) = match.groups()
        errors.append(algo)
    errors = collections.Counter(errors)
    assert dict(errors) == {}, stdout

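# Full ODD chain with the ML-based ambiguity solver enabled; validates the
# resulting performance file against its reference hash.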
@pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
@pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
@pytest.mark.slow
def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
    root_file = "performance_ambiML.root"
    output_dir = "odd_output"
    assert not (tmp_path / root_file).exists()
    # This test literally only ensures that the full chain example can run without erroring out
    getOpenDataDetector()  # just to make sure it can build

    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "full_chain_odd.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
    try:
        subprocess.check_call(
            [sys.executable, str(script), "-n1", "--MLSolver"],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output may be None
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / output_dir / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)

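# Run the B-field writing example and validate both written solenoid field
# map files.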
def test_bfield_writing(tmp_path, seq, assert_root_hash):
    from bfield_writing import runBFieldWriting

    root_files = [
        ("solenoid.root", "solenoid", 100),
        ("solenoid2.root", "solenoid", 100),
    ]

    for fn, _, _ in root_files:
        fp = tmp_path / fn
        assert not fp.exists()

    runBFieldWriting(outputDir=tmp_path, rewrites=1)

    for fn, tn, ee in root_files:
        fp = tmp_path / fn
        assert fp.exists()
        assert fp.stat().st_size > 2**10 * 2
        assert_entries(fp, tn, ee)
        assert_root_hash(fn, fp)

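# Run the Exa.TrkX track finding example for each backend/hardware
# combination, downloading the required model files first.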
@pytest.mark.parametrize("backend", ["onnx", "torch"])
@pytest.mark.parametrize("hardware", ["cpu", "gpu"])
@pytest.mark.skipif(not exatrkxEnabled, reason="ExaTrkX environment not set up")
def test_exatrkx(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
    if backend == "onnx" and hardware == "cpu":
        pytest.skip("Combination of ONNX and CPU not yet supported")

    root_file = "performance_track_finding.root"
    assert not (tmp_path / root_file).exists()

    if backend == "onnx":
        url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
    else:
        url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"

    tarfile_name = tmp_path / "models.tar"
    urllib.request.urlretrieve(url, tarfile_name)
    tarfile.open(tarfile_name).extractall(tmp_path)
    script = (
        Path(__file__).parent.parent.parent.parent
        / "Examples"
        / "Scripts"
        / "Python"
        / "exatrkx.py"
    )
    assert script.exists()
    env = os.environ.copy()
    env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"

    if hardware == "cpu":
        env["CUDA_VISIBLE_DEVICES"] = ""

    try:
        subprocess.check_call(
            [sys.executable, str(script), backend],
            cwd=tmp_path,
            env=env,
            stderr=subprocess.STDOUT,
        )
    except subprocess.CalledProcessError as e:
        # check_call does not capture output, so e.output may be None
        if e.output is not None:
            print(e.output.decode("utf-8"))
        raise

    rfp = tmp_path / root_file
    assert rfp.exists()

    assert_root_hash(root_file, rfp)