File indexing completed on 2025-08-05 08:10:07
0001 from pathlib import Path
0002 import os
0003 import json
0004 import functools
0005 import tarfile
0006 import urllib.request
0007 import subprocess
0008 import sys
0009 import re
0010 import collections
0011
0012 import pytest
0013
0014 from helpers import (
0015 geant4Enabled,
0016 dd4hepEnabled,
0017 hepmc3Enabled,
0018 pythia8Enabled,
0019 exatrkxEnabled,
0020 onnxEnabled,
0021 AssertCollectionExistsAlg,
0022 failure_threshold,
0023 )
0024
0025 import acts
0026 from acts.examples import (
0027 Sequencer,
0028 GenericDetector,
0029 AlignedDetector,
0030 )
0031 from acts.examples.odd import getOpenDataDetector
0032
0033
0034 u = acts.UnitConstants
0035
0036
0037 @pytest.fixture
0038 def field():
0039 return acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0040
0041
0042 @pytest.fixture
0043 def seq():
0044 return Sequencer(events=10, numThreads=1)
0045
0046
0047 def assert_csv_output(csv_path, stem):
0048 __tracebackhide__ = True
0049
0050 assert len([f for f in csv_path.iterdir() if f.name.endswith(stem + ".csv")]) > 0
0051 assert all([f.stat().st_size > 100 for f in csv_path.iterdir()])
0052
0053
0054 def assert_entries(root_file, tree_name, exp=None, non_zero=False):
0055 __tracebackhide__ = True
0056 import ROOT
0057
0058 ROOT.PyConfig.IgnoreCommandLineOptions = True
0059 ROOT.gROOT.SetBatch(True)
0060
0061 rf = ROOT.TFile.Open(str(root_file))
0062 keys = [k.GetName() for k in rf.GetListOfKeys()]
0063 assert tree_name in keys
0064 print("Entries:", rf.Get(tree_name).GetEntries())
0065 if non_zero:
0066 assert rf.Get(tree_name).GetEntries() > 0, f"{root_file}:{tree_name}"
0067 if exp is not None:
0068 assert rf.Get(tree_name).GetEntries() == exp, f"{root_file}:{tree_name}"
0069
0070
0071 def assert_has_entries(root_file, tree_name):
0072 __tracebackhide__ = True
0073 assert_entries(root_file, tree_name, non_zero=True)
0074
0075
0076 @pytest.mark.slow
0077 @pytest.mark.skipif(not pythia8Enabled, reason="Pythia8 not set up")
0078 def test_pythia8(tmp_path, seq, assert_root_hash):
0079 from pythia8 import runPythia8
0080
0081 (tmp_path / "csv").mkdir()
0082
0083 assert not (tmp_path / "pythia8_particles.root").exists()
0084 assert len(list((tmp_path / "csv").iterdir())) == 0
0085
0086 events = seq.config.events
0087
0088 runPythia8(str(tmp_path), outputRoot=True, outputCsv=True, s=seq).run()
0089
0090 del seq
0091
0092 fp = tmp_path / "pythia8_particles.root"
0093 assert fp.exists()
0094 assert fp.stat().st_size > 2**10 * 50
0095 assert_entries(fp, "particles", events)
0096 assert_root_hash(fp.name, fp)
0097
0098 assert len(list((tmp_path / "csv").iterdir())) > 0
0099 assert_csv_output(tmp_path / "csv", "particles")
0100
0101
0102 def test_fatras(trk_geo, tmp_path, field, assert_root_hash):
0103 from fatras import runFatras
0104
0105 csv = tmp_path / "csv"
0106 csv.mkdir()
0107
0108 nevents = 10
0109
0110 root_files = [
0111 (
0112 "particles_simulation.root",
0113 "particles",
0114 ),
0115 (
0116 "hits.root",
0117 "hits",
0118 ),
0119 ]
0120
0121 assert len(list(csv.iterdir())) == 0
0122 for rf, _ in root_files:
0123 assert not (tmp_path / rf).exists()
0124
0125 seq = Sequencer(events=nevents)
0126 runFatras(trk_geo, field, str(tmp_path), s=seq).run()
0127
0128 del seq
0129
0130 assert_csv_output(csv, "particles_final")
0131 assert_csv_output(csv, "particles_initial")
0132 assert_csv_output(csv, "hits")
0133 for f, tn in root_files:
0134 rfp = tmp_path / f
0135 assert rfp.exists()
0136 assert rfp.stat().st_size > 2**10 * 10
0137
0138 assert_has_entries(rfp, tn)
0139 assert_root_hash(f, rfp)
0140
0141
0142 @pytest.mark.slow
0143 @pytest.mark.odd
0144 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0145 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0146 def test_geant4(tmp_path, assert_root_hash):
0147
0148 getOpenDataDetector()
0149
0150 csv = tmp_path / "csv"
0151 csv.mkdir()
0152
0153 root_files = [
0154 "particles_simulation.root",
0155 "hits.root",
0156 ]
0157
0158 assert len(list(csv.iterdir())) == 0
0159 for rf in root_files:
0160 assert not (tmp_path / rf).exists()
0161
0162 script = (
0163 Path(__file__).parent.parent.parent.parent
0164 / "Examples"
0165 / "Scripts"
0166 / "Python"
0167 / "geant4.py"
0168 )
0169 assert script.exists()
0170 env = os.environ.copy()
0171 env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
0172 try:
0173 subprocess.check_call(
0174 [sys.executable, str(script)],
0175 cwd=tmp_path,
0176 env=env,
0177 stderr=subprocess.STDOUT,
0178 )
0179 except subprocess.CalledProcessError as e:
0180 print(e.output.decode("utf-8"))
0181 raise
0182
0183 assert_csv_output(csv, "particles_final")
0184 assert_csv_output(csv, "particles_initial")
0185 assert_csv_output(csv, "hits")
0186 for f in root_files:
0187 rfp = tmp_path / f
0188 assert rfp.exists()
0189 assert rfp.stat().st_size > 2**10 * 10
0190
0191 assert_root_hash(f, rfp)
0192
0193
0194 def test_seeding(tmp_path, trk_geo, field, assert_root_hash):
0195 from seeding import runSeeding
0196
0197 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0198
0199 csv = tmp_path / "csv"
0200 csv.mkdir()
0201
0202 seq = Sequencer(events=10, numThreads=1)
0203
0204 root_files = [
0205 (
0206 "estimatedparams.root",
0207 "estimatedparams",
0208 ),
0209 (
0210 "performance_seeding.root",
0211 None,
0212 ),
0213 (
0214 "particles.root",
0215 "particles",
0216 ),
0217 (
0218 "particles_simulation.root",
0219 "particles",
0220 ),
0221 ]
0222
0223 for fn, _ in root_files:
0224 fp = tmp_path / fn
0225 assert not fp.exists()
0226
0227 assert len(list(csv.iterdir())) == 0
0228
0229 runSeeding(trk_geo, field, outputDir=str(tmp_path), s=seq).run()
0230
0231 del seq
0232
0233 for fn, tn in root_files:
0234 fp = tmp_path / fn
0235 assert fp.exists()
0236 assert fp.stat().st_size > 100
0237
0238 if tn is not None:
0239 assert_has_entries(fp, tn)
0240 assert_root_hash(fn, fp)
0241
0242 assert_csv_output(csv, "particles")
0243 assert_csv_output(csv, "particles_final")
0244 assert_csv_output(csv, "particles_initial")
0245
0246
0247 def test_seeding_orthogonal(tmp_path, trk_geo, field, assert_root_hash):
0248 from seeding import runSeeding, SeedingAlgorithm
0249
0250 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0251
0252 csv = tmp_path / "csv"
0253 csv.mkdir()
0254
0255 seq = Sequencer(events=10, numThreads=1)
0256
0257 root_files = [
0258 (
0259 "estimatedparams.root",
0260 "estimatedparams",
0261 ),
0262 (
0263 "performance_seeding.root",
0264 None,
0265 ),
0266 (
0267 "particles.root",
0268 "particles",
0269 ),
0270 (
0271 "particles_simulation.root",
0272 "particles",
0273 ),
0274 ]
0275
0276 for fn, _ in root_files:
0277 fp = tmp_path / fn
0278 assert not fp.exists()
0279
0280 assert len(list(csv.iterdir())) == 0
0281
0282 runSeeding(
0283 trk_geo,
0284 field,
0285 outputDir=str(tmp_path),
0286 s=seq,
0287 seedingAlgorithm=SeedingAlgorithm.Orthogonal,
0288 ).run()
0289
0290 del seq
0291
0292 for fn, tn in root_files:
0293 fp = tmp_path / fn
0294 assert fp.exists()
0295 assert fp.stat().st_size > 100
0296
0297 if tn is not None:
0298 assert_has_entries(fp, tn)
0299 assert_root_hash(fn, fp)
0300
0301 assert_csv_output(csv, "particles")
0302 assert_csv_output(csv, "particles_final")
0303 assert_csv_output(csv, "particles_initial")
0304
0305
0306 def test_itk_seeding(tmp_path, trk_geo, field, assert_root_hash):
0307 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * acts.UnitConstants.T))
0308
0309 csv = tmp_path / "csv"
0310 csv.mkdir()
0311
0312 seq = Sequencer(events=10, numThreads=1)
0313
0314 root_files = [
0315 (
0316 "estimatedparams.root",
0317 "estimatedparams",
0318 ),
0319 (
0320 "performance_seeding.root",
0321 None,
0322 ),
0323 (
0324 "particles.root",
0325 "particles",
0326 ),
0327 (
0328 "particles_simulation.root",
0329 "particles",
0330 ),
0331 ]
0332
0333 for fn, _ in root_files:
0334 fp = tmp_path / fn
0335 assert not fp.exists()
0336
0337 assert len(list(csv.iterdir())) == 0
0338
0339 rnd = acts.examples.RandomNumbers(seed=42)
0340
0341 from acts.examples.simulation import (
0342 addParticleGun,
0343 EtaConfig,
0344 MomentumConfig,
0345 ParticleConfig,
0346 addFatras,
0347 addDigitization,
0348 )
0349
0350 addParticleGun(
0351 seq,
0352 MomentumConfig(1.0 * u.GeV, 10.0 * u.GeV, True),
0353 EtaConfig(-4.0, 4.0, True),
0354 ParticleConfig(1, acts.PdgParticle.eMuon, True),
0355 outputDirCsv=tmp_path / "csv",
0356 outputDirRoot=str(tmp_path),
0357 rnd=rnd,
0358 )
0359
0360 addFatras(
0361 seq,
0362 trk_geo,
0363 field,
0364 outputDirCsv=tmp_path / "csv",
0365 outputDirRoot=str(tmp_path),
0366 rnd=rnd,
0367 )
0368
0369 srcdir = Path(__file__).resolve().parent.parent.parent.parent
0370 addDigitization(
0371 seq,
0372 trk_geo,
0373 field,
0374 digiConfigFile=srcdir
0375 / "Examples/Algorithms/Digitization/share/default-smearing-config-generic.json",
0376 rnd=rnd,
0377 )
0378
0379 from acts.examples.reconstruction import (
0380 addSeeding,
0381 TruthSeedRanges,
0382 )
0383 from acts.examples.itk import itkSeedingAlgConfig, InputSpacePointsType
0384
0385 addSeeding(
0386 seq,
0387 trk_geo,
0388 field,
0389 TruthSeedRanges(pt=(1.0 * u.GeV, None), eta=(-4, 4), nHits=(9, None)),
0390 *itkSeedingAlgConfig(InputSpacePointsType.PixelSpacePoints),
0391 acts.logging.VERBOSE,
0392 geoSelectionConfigFile=srcdir
0393 / "Examples/Algorithms/TrackFinding/share/geoSelection-genericDetector.json",
0394 inputParticles="particles_final",
0395 outputDirRoot=str(tmp_path),
0396 )
0397
0398 seq.run()
0399
0400 del seq
0401
0402 for fn, tn in root_files:
0403 fp = tmp_path / fn
0404 assert fp.exists()
0405 assert fp.stat().st_size > 100
0406
0407 if tn is not None:
0408 assert_has_entries(fp, tn)
0409 assert_root_hash(fn, fp)
0410
0411 assert_csv_output(csv, "particles")
0412 assert_csv_output(csv, "particles_final")
0413 assert_csv_output(csv, "particles_initial")
0414
0415
0416 @pytest.mark.slow
0417 def test_propagation(tmp_path, trk_geo, field, seq, assert_root_hash):
0418 from propagation import runPropagation
0419
0420 obj = tmp_path / "obj"
0421 obj.mkdir()
0422
0423 root_files = [
0424 (
0425 "propagation_steps.root",
0426 "propagation_steps",
0427 10000,
0428 )
0429 ]
0430
0431 for fn, _, _ in root_files:
0432 fp = tmp_path / fn
0433 assert not fp.exists()
0434
0435 assert len(list(obj.iterdir())) == 0
0436
0437 runPropagation(trk_geo, field, str(tmp_path), s=seq).run()
0438
0439 for fn, tn, ee in root_files:
0440 fp = tmp_path / fn
0441 assert fp.exists()
0442 assert fp.stat().st_size > 2**10 * 50
0443 assert_entries(fp, tn, ee)
0444 assert_root_hash(fn, fp)
0445
0446 assert len(list(obj.iterdir())) > 0
0447
0448
0449 @pytest.mark.slow
0450 @pytest.mark.odd
0451 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0452 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0453 def test_material_recording(tmp_path, material_recording, assert_root_hash):
0454 root_files = [
0455 (
0456 "geant4_material_tracks.root",
0457 "material-tracks",
0458 200,
0459 )
0460 ]
0461
0462 for fn, tn, ee in root_files:
0463 fp = material_recording / fn
0464 assert fp.exists()
0465 assert fp.stat().st_size > 2**10 * 50
0466 assert_entries(fp, tn, ee)
0467 assert_root_hash(fn, fp)
0468
0469
0470 @pytest.mark.slow
0471 @pytest.mark.odd
0472 @pytest.mark.skipif(not hepmc3Enabled, reason="HepMC3 plugin not available")
0473 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0474 @pytest.mark.skipif(not geant4Enabled, reason="Geant4 not set up")
0475 def test_event_recording(tmp_path):
0476 script = (
0477 Path(__file__).parent.parent.parent.parent
0478 / "Examples"
0479 / "Scripts"
0480 / "Python"
0481 / "event_recording.py"
0482 )
0483 assert script.exists()
0484
0485 env = os.environ.copy()
0486 env["NEVENTS"] = "1"
0487 env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
0488 try:
0489 subprocess.check_call(
0490 [sys.executable, str(script)],
0491 cwd=tmp_path,
0492 env=env,
0493 stderr=subprocess.STDOUT,
0494 )
0495 except subprocess.CalledProcessError as e:
0496 print(e.output.decode("utf-8"))
0497 raise
0498
0499 from acts.examples.hepmc3 import HepMC3AsciiReader
0500
0501 out_path = tmp_path / "hepmc3"
0502
0503
0504 assert len([f for f in out_path.iterdir() if f.name.endswith("events.hepmc3")]) > 0
0505 assert all([f.stat().st_size > 100 for f in out_path.iterdir()])
0506
0507 s = Sequencer(numThreads=1)
0508
0509 s.addReader(
0510 HepMC3AsciiReader(
0511 level=acts.logging.INFO,
0512 inputDir=str(out_path),
0513 inputStem="events",
0514 outputEvents="hepmc-events",
0515 )
0516 )
0517
0518 alg = AssertCollectionExistsAlg(
0519 "hepmc-events", name="check_alg", level=acts.logging.INFO
0520 )
0521 s.addAlgorithm(alg)
0522
0523 s.run()
0524
0525 assert alg.events_seen == 1
0526
0527
0528 @pytest.mark.parametrize("revFiltMomThresh", [0 * u.GeV, 1 * u.TeV])
0529 @pytest.mark.parametrize("directNavigation", [False, True])
0530 def test_truth_tracking_kalman(
0531 tmp_path, assert_root_hash, revFiltMomThresh, directNavigation, detector_config
0532 ):
0533 from truth_tracking_kalman import runTruthTrackingKalman
0534
0535 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0536
0537 seq = Sequencer(events=10, numThreads=1)
0538
0539 root_files = [
0540 ("trackstates_fitter.root", "trackstates", 19),
0541 ("tracksummary_fitter.root", "tracksummary", 10),
0542 ("performance_track_fitter.root", None, -1),
0543 ]
0544
0545 for fn, _, _ in root_files:
0546 fp = tmp_path / fn
0547 assert not fp.exists()
0548
0549 runTruthTrackingKalman(
0550 trackingGeometry=detector_config.trackingGeometry,
0551 field=field,
0552 digiConfigFile=detector_config.digiConfigFile,
0553 outputDir=tmp_path,
0554 reverseFilteringMomThreshold=revFiltMomThresh,
0555 directNavigation=directNavigation,
0556 s=seq,
0557 )
0558
0559 seq.run()
0560
0561 del seq
0562
0563 for fn, tn, ee in root_files:
0564 fp = tmp_path / fn
0565 assert fp.exists()
0566 assert fp.stat().st_size > 1024
0567 if tn is not None:
0568 assert_has_entries(fp, tn)
0569 assert_root_hash(fn, fp)
0570
0571 import ROOT
0572
0573 ROOT.PyConfig.IgnoreCommandLineOptions = True
0574 ROOT.gROOT.SetBatch(True)
0575 rf = ROOT.TFile.Open(str(tmp_path / "tracksummary_fitter.root"))
0576 keys = [k.GetName() for k in rf.GetListOfKeys()]
0577 assert "tracksummary" in keys
0578 for entry in rf.Get("tracksummary"):
0579 assert entry.hasFittedParams
0580
0581
0582 def test_truth_tracking_gsf(tmp_path, assert_root_hash, detector_config):
0583 from truth_tracking_gsf import runTruthTrackingGsf
0584
0585 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0586
0587 seq = Sequencer(
0588 events=10,
0589 numThreads=1,
0590 fpeMasks=[
0591 (
0592 "Core/include/Acts/TrackFitting/detail/GsfUtils.hpp:197",
0593 acts.FpeType.FLTUND,
0594 1,
0595 ),
0596 ],
0597 )
0598
0599 root_files = [
0600 ("trackstates_gsf.root", "trackstates"),
0601 ("tracksummary_gsf.root", "tracksummary"),
0602 ]
0603
0604 for fn, _ in root_files:
0605 fp = tmp_path / fn
0606 assert not fp.exists()
0607
0608 runTruthTrackingGsf(
0609 trackingGeometry=detector_config.trackingGeometry,
0610 decorators=detector_config.decorators,
0611 field=field,
0612 digiConfigFile=detector_config.digiConfigFile,
0613 outputDir=tmp_path,
0614 s=seq,
0615 )
0616
0617
0618 with failure_threshold(acts.logging.FATAL):
0619 seq.run()
0620
0621 del seq
0622
0623 for fn, tn in root_files:
0624 fp = tmp_path / fn
0625 assert fp.exists()
0626 assert fp.stat().st_size > 1024
0627 if tn is not None:
0628 assert_root_hash(fn, fp)
0629
0630
0631 def test_particle_gun(tmp_path, assert_root_hash):
0632 from particle_gun import runParticleGun
0633
0634 s = Sequencer(events=20, numThreads=-1)
0635
0636 csv_dir = tmp_path / "csv"
0637 root_file = tmp_path / "particles.root"
0638
0639 assert not csv_dir.exists()
0640 assert not root_file.exists()
0641
0642 runParticleGun(str(tmp_path), s=s).run()
0643
0644 assert csv_dir.exists()
0645 assert root_file.exists()
0646
0647 assert len([f for f in csv_dir.iterdir() if f.name.endswith("particles.csv")]) > 0
0648 assert all([f.stat().st_size > 100 for f in csv_dir.iterdir()])
0649
0650 assert root_file.stat().st_size > 200
0651 assert_entries(root_file, "particles", 20)
0652 assert_root_hash(root_file.name, root_file)
0653
0654
0655 @pytest.mark.slow
0656 @pytest.mark.odd
0657 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0658 def test_material_mapping(material_recording, tmp_path, assert_root_hash):
0659 map_file = tmp_path / "material-map_tracks.root"
0660 assert not map_file.exists()
0661
0662 s = Sequencer(numThreads=1)
0663
0664 detector, trackingGeometry, decorators = getOpenDataDetector()
0665
0666 from material_mapping import runMaterialMapping
0667
0668 runMaterialMapping(
0669 trackingGeometry,
0670 decorators,
0671 outputDir=str(tmp_path),
0672 inputDir=material_recording,
0673 mappingStep=1,
0674 s=s,
0675 )
0676
0677 s.run()
0678
0679
0680
0681 del s
0682
0683 mat_file = tmp_path / "material-map.json"
0684
0685 assert mat_file.exists()
0686 assert mat_file.stat().st_size > 10
0687
0688 with mat_file.open() as fh:
0689 assert json.load(fh)
0690
0691 assert map_file.exists()
0692 assert_entries(map_file, "material-tracks", 200)
0693 assert_root_hash(map_file.name, map_file)
0694
0695 val_file = tmp_path / "propagation-material.root"
0696 assert not val_file.exists()
0697
0698
0699
0700
0701 del trackingGeometry
0702 del detector
0703
0704 detector, trackingGeometry, decorators = getOpenDataDetector(
0705 mdecorator=acts.IMaterialDecorator.fromFile(mat_file),
0706 )
0707
0708 from material_validation import runMaterialValidation
0709
0710 s = Sequencer(events=10, numThreads=1)
0711
0712 field = acts.NullBField()
0713
0714 runMaterialValidation(
0715 trackingGeometry, decorators, field, outputDir=str(tmp_path), s=s
0716 )
0717
0718 s.run()
0719
0720 assert val_file.exists()
0721 assert_entries(val_file, "material-tracks", 10000)
0722 assert_root_hash(val_file.name, val_file)
0723
0724
0725 @pytest.mark.slow
0726 @pytest.mark.odd
0727 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
0728 def test_volume_material_mapping(material_recording, tmp_path, assert_root_hash):
0729 map_file = tmp_path / "material-map-volume_tracks.root"
0730 assert not map_file.exists()
0731
0732 s = Sequencer(numThreads=1)
0733
0734 geo_map = Path(__file__).parent / "geometry-volume-map.json"
0735 assert geo_map.exists()
0736 assert geo_map.stat().st_size > 10
0737 with geo_map.open() as fh:
0738 assert json.load(fh)
0739
0740 detector, trackingGeometry, decorators = getOpenDataDetector(
0741 mdecorator=acts.IMaterialDecorator.fromFile(geo_map),
0742 )
0743
0744 from material_mapping import runMaterialMapping
0745
0746 runMaterialMapping(
0747 trackingGeometry,
0748 decorators,
0749 mapName="material-map-volume",
0750 outputDir=str(tmp_path),
0751 inputDir=material_recording,
0752 mappingStep=1,
0753 s=s,
0754 )
0755
0756 s.run()
0757
0758
0759
0760 del s
0761
0762 mat_file = tmp_path / "material-map-volume.json"
0763
0764 assert mat_file.exists()
0765 assert mat_file.stat().st_size > 10
0766
0767 with mat_file.open() as fh:
0768 assert json.load(fh)
0769
0770 assert map_file.exists()
0771 assert_entries(map_file, "material-tracks", 200)
0772 assert_root_hash(map_file.name, map_file)
0773
0774 val_file = tmp_path / "propagation-volume-material.root"
0775 assert not val_file.exists()
0776
0777
0778
0779
0780 del trackingGeometry
0781 del detector
0782
0783 detector, trackingGeometry, decorators = getOpenDataDetector(
0784 mdecorator=acts.IMaterialDecorator.fromFile(mat_file),
0785 )
0786
0787 from material_validation import runMaterialValidation
0788
0789 s = Sequencer(events=10, numThreads=1)
0790
0791 field = acts.NullBField()
0792
0793 runMaterialValidation(
0794 trackingGeometry,
0795 decorators,
0796 field,
0797 outputDir=str(tmp_path),
0798 outputName="propagation-volume-material",
0799 s=s,
0800 )
0801
0802 s.run()
0803
0804 assert val_file.exists()
0805 assert_root_hash(val_file.name, val_file)
0806
0807
0808 @pytest.mark.parametrize(
0809 "geoFactory,nobj",
0810 [
0811 (GenericDetector.create, 450),
0812 pytest.param(
0813 getOpenDataDetector,
0814 540,
0815 marks=[
0816 pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up"),
0817 pytest.mark.slow,
0818 pytest.mark.odd,
0819 ],
0820 ),
0821 (functools.partial(AlignedDetector.create, iovSize=1), 450),
0822 ],
0823 )
0824 @pytest.mark.slow
0825 def test_geometry_example(geoFactory, nobj, tmp_path):
0826 detector, trackingGeometry, decorators = geoFactory()
0827
0828 from geometry import runGeometry
0829
0830 json_dir = tmp_path / "json"
0831 csv_dir = tmp_path / "csv"
0832 obj_dir = tmp_path / "obj"
0833
0834 for d in (json_dir, csv_dir, obj_dir):
0835 d.mkdir()
0836
0837 events = 5
0838
0839 kwargs = dict(
0840 trackingGeometry=trackingGeometry,
0841 decorators=decorators,
0842 events=events,
0843 outputDir=str(tmp_path),
0844 )
0845
0846 runGeometry(outputJson=True, **kwargs)
0847 runGeometry(outputJson=False, **kwargs)
0848
0849 assert len(list(obj_dir.iterdir())) == nobj
0850 assert all(f.stat().st_size > 200 for f in obj_dir.iterdir())
0851
0852 assert len(list(csv_dir.iterdir())) == 3 * events
0853 assert all(f.stat().st_size > 200 for f in csv_dir.iterdir())
0854
0855 detector_files = [csv_dir / f"event{i:>09}-detectors.csv" for i in range(events)]
0856 for detector_file in detector_files:
0857 assert detector_file.exists()
0858 assert detector_file.stat().st_size > 200
0859
0860 contents = [f.read_text() for f in detector_files]
0861 ref = contents[0]
0862 for c in contents[1:]:
0863 if isinstance(detector, AlignedDetector):
0864 assert c != ref, "Detector writeout is expected to be different"
0865 else:
0866 assert c == ref, "Detector writeout is expected to be identical"
0867
0868 if not isinstance(detector, AlignedDetector):
0869 for f in [json_dir / f"event{i:>09}-detector.json" for i in range(events)]:
0870 assert detector_file.exists()
0871 with f.open() as fh:
0872 data = json.load(fh)
0873 assert data
0874 material_file = tmp_path / "geometry-map.json"
0875 assert material_file.exists()
0876 assert material_file.stat().st_size > 200
0877
0878
0879 DIGI_SHARE_DIR = (
0880 Path(__file__).parent.parent.parent.parent
0881 / "Examples/Algorithms/Digitization/share"
0882 )
0883
0884
0885 @pytest.mark.parametrize(
0886 "digi_config_file",
0887 [
0888 DIGI_SHARE_DIR / "default-smearing-config-generic.json",
0889 DIGI_SHARE_DIR / "default-geometric-config-generic.json",
0890 ],
0891 ids=["smeared", "geometric"],
0892 )
0893 def test_digitization_example(trk_geo, tmp_path, assert_root_hash, digi_config_file):
0894 from digitization import runDigitization
0895
0896 s = Sequencer(events=10, numThreads=-1)
0897
0898 csv_dir = tmp_path / "csv"
0899 root_file = tmp_path / "measurements.root"
0900
0901 assert not root_file.exists()
0902 assert not csv_dir.exists()
0903
0904 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0905 runDigitization(
0906 trk_geo, field, outputDir=tmp_path, digiConfigFile=digi_config_file, s=s
0907 )
0908
0909 s.run()
0910
0911 assert root_file.exists()
0912 assert csv_dir.exists()
0913
0914 assert len(list(csv_dir.iterdir())) == 3 * s.config.events
0915 assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
0916
0917 assert_entries(root_file, "vol9", 0)
0918 assert_entries(root_file, "vol14", 0)
0919
0920 if "smearing" in digi_config_file.name:
0921 filled_entries = [f"vol{tn}" for tn in (8, 12, 13, 16, 17, 18)]
0922 else:
0923
0924 filled_entries = [
0925 'vol8', 'vol8_lay2', 'vol12_lay8_mod117', 'vol12_lay10', 'vol12_lay10_mod154',
0926 'vol12_lay10_mod163', 'vol12_lay12', 'vol12_lay12_mod150', 'vol13',
0927 'vol13_lay2', 'vol16_lay2_mod53', 'vol16_lay4', 'vol16_lay6', 'vol16_lay8',
0928 'vol16_lay10', 'vol16_lay12', 'vol17', 'vol17_lay2', 'vol18_lay2',
0929 'vol18_lay2_mod1', 'vol18_lay2_mod49', 'vol18_lay2_mod86', 'vol18_lay4',
0930 ]
0931
0932
0933 for entry in filled_entries:
0934 assert_has_entries(root_file, entry)
0935
0936 assert_root_hash(root_file.name, root_file)
0937
0938
0939 @pytest.mark.parametrize(
0940 "digi_config_file",
0941 [
0942 DIGI_SHARE_DIR / "default-smearing-config-generic.json",
0943 DIGI_SHARE_DIR / "default-geometric-config-generic.json",
0944 ],
0945 ids=["smeared", "geometric"],
0946 )
0947 def test_digitization_example_input(
0948 trk_geo, tmp_path, assert_root_hash, digi_config_file
0949 ):
0950 from particle_gun import runParticleGun
0951 from digitization import runDigitization
0952
0953 ptcl_dir = tmp_path / "ptcl"
0954 ptcl_dir.mkdir()
0955 pgs = Sequencer(events=20, numThreads=-1)
0956 runParticleGun(str(ptcl_dir), s=pgs)
0957 pgs.run()
0958
0959 s = Sequencer(numThreads=-1)
0960
0961 csv_dir = tmp_path / "csv"
0962 root_file = tmp_path / "measurements.root"
0963
0964 assert not root_file.exists()
0965 assert not csv_dir.exists()
0966
0967 assert_root_hash(
0968 "particles.root",
0969 ptcl_dir / "particles.root",
0970 )
0971
0972 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
0973 runDigitization(
0974 trk_geo,
0975 field,
0976 outputDir=tmp_path,
0977 digiConfigFile=digi_config_file,
0978 particlesInput=ptcl_dir / "particles.root",
0979 s=s,
0980 doMerge=True,
0981 )
0982
0983 s.run()
0984
0985 assert root_file.exists()
0986 assert csv_dir.exists()
0987
0988 assert len(list(csv_dir.iterdir())) == 3 * pgs.config.events
0989 assert all(f.stat().st_size > 50 for f in csv_dir.iterdir())
0990
0991 assert_entries(root_file, "vol7", 0)
0992 assert_entries(root_file, "vol9", 0)
0993
0994 if "smearing" in digi_config_file.name:
0995 filled_entries = [f"vol{tn}" for tn in (8, 12, 13, 16, 17, 18)]
0996 else:
0997
0998 filled_entries = [
0999 "vol8", "vol8_lay2", "vol12_lay8_mod120", "vol12_lay10_mod120",
1000 "vol12_lay10_mod144", "vol12_lay12", "vol12_lay12_mod111",
1001 "vol12_lay12_mod137", "vol12_lay12_mod170", "vol13", "vol13_lay2",
1002 "vol14_lay2_mod93", "vol14_lay2_mod102", "vol14_lay2_mod112",
1003 "vol14_lay2_mod118", "vol14_lay4_mod112", "vol14_lay4_mod118",
1004 "vol14_lay4_mod152", "vol14_lay4_mod161", "vol16_lay4", "vol16_lay6",
1005 "vol16_lay8", "vol16_lay10", "vol16_lay12", "vol17", "vol17_lay2",
1006 "vol18_lay2", "vol18_lay2_mod71", "vol18_lay4", "vol18_lay6",
1007 "vol18_lay8", "vol18_lay10"
1008 ]
1009
1010
1011 for entry in filled_entries:
1012 assert_has_entries(root_file, entry)
1013
1014 assert_root_hash(root_file.name, root_file)
1015
1016
1017 def test_digitization_config_example(trk_geo, tmp_path):
1018 from digitization_config import runDigitizationConfig
1019
1020 out_file = tmp_path / "output.json"
1021 assert not out_file.exists()
1022
1023 input = (
1024 Path(__file__).parent
1025 / "../../../Examples/Algorithms/Digitization/share/default-smearing-config-generic.json"
1026 )
1027 assert input.exists(), input.resolve()
1028
1029 runDigitizationConfig(trk_geo, input=input, output=out_file)
1030
1031 assert out_file.exists()
1032
1033 with out_file.open() as fh:
1034 data = json.load(fh)
1035 assert len(data.keys()) == 2
1036 assert data["acts-geometry-hierarchy-map"]["format-version"] == 0
1037 assert (
1038 data["acts-geometry-hierarchy-map"]["value-identifier"]
1039 == "digitization-configuration"
1040 )
1041 assert len(data["entries"]) == 27
1042
1043
1044 @pytest.mark.parametrize(
1045 "truthSmeared,truthEstimated",
1046 [
1047 [False, False],
1048 [False, True],
1049 [True, False],
1050 ],
1051 ids=["full_seeding", "truth_estimated", "truth_smeared"],
1052 )
1053 @pytest.mark.slow
1054 def test_ckf_tracks_example(
1055 tmp_path, assert_root_hash, truthSmeared, truthEstimated, detector_config
1056 ):
1057 csv = tmp_path / "csv"
1058
1059 assert not csv.exists()
1060
1061 field = acts.ConstantBField(acts.Vector3(0, 0, 2 * u.T))
1062 events = 100
1063 s = Sequencer(events=events, numThreads=1)
1064
1065 root_files = [
1066 (
1067 "performance_ckf.root",
1068 None,
1069 ),
1070 (
1071 "trackstates_ckf.root",
1072 "trackstates",
1073 ),
1074 (
1075 "tracksummary_ckf.root",
1076 "tracksummary",
1077 ),
1078 ]
1079
1080 if not truthSmeared:
1081 root_files += [
1082 (
1083 "performance_seeding.root",
1084 None,
1085 ),
1086 ]
1087
1088 for rf, _ in root_files:
1089 assert not (tmp_path / rf).exists()
1090
1091 from ckf_tracks import runCKFTracks
1092
1093 runCKFTracks(
1094 detector_config.trackingGeometry,
1095 detector_config.decorators,
1096 field=field,
1097 outputCsv=True,
1098 outputDir=tmp_path,
1099 geometrySelection=detector_config.geometrySelection,
1100 digiConfigFile=detector_config.digiConfigFile,
1101 truthSmearedSeeded=truthSmeared,
1102 truthEstimatedSeeded=truthEstimated,
1103 s=s,
1104 )
1105
1106 s.run()
1107
1108 del s
1109
1110 assert csv.exists()
1111 for rf, tn in root_files:
1112 rp = tmp_path / rf
1113 assert rp.exists()
1114 if tn is not None:
1115 assert_root_hash(rf, rp)
1116
1117 assert (
1118 len([f for f in csv.iterdir() if f.name.endswith("tracks_ckf.csv")]) == events
1119 )
1120 assert all([f.stat().st_size > 300 for f in csv.iterdir()])
1121
1122
1123 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1124 @pytest.mark.odd
1125 @pytest.mark.slow
1126 def test_full_chain_odd_example(tmp_path):
1127
1128 getOpenDataDetector()
1129
1130 script = (
1131 Path(__file__).parent.parent.parent.parent
1132 / "Examples"
1133 / "Scripts"
1134 / "Python"
1135 / "full_chain_odd.py"
1136 )
1137 assert script.exists()
1138 env = os.environ.copy()
1139 env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1140 try:
1141 subprocess.check_call(
1142 [sys.executable, str(script), "-n1"],
1143 cwd=tmp_path,
1144 env=env,
1145 stderr=subprocess.STDOUT,
1146 )
1147 except subprocess.CalledProcessError as e:
1148 print(e.output.decode("utf-8"))
1149 raise
1150
1151
1152 @pytest.mark.skipif(
1153 not dd4hepEnabled or not geant4Enabled, reason="DD4hep and/or Geant4 not set up"
1154 )
1155 @pytest.mark.slow
1156 def test_full_chain_odd_example_pythia_geant4(tmp_path):
1157
1158 getOpenDataDetector()
1159
1160 script = (
1161 Path(__file__).parent.parent.parent.parent
1162 / "Examples"
1163 / "Scripts"
1164 / "Python"
1165 / "full_chain_odd.py"
1166 )
1167 assert script.exists()
1168 env = os.environ.copy()
1169 env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1170 try:
1171 stdout = subprocess.check_output(
1172 [
1173 sys.executable,
1174 str(script),
1175 "-n1",
1176 "--geant4",
1177 "--ttbar",
1178 "--ttbar-pu",
1179 "50",
1180 ],
1181 cwd=tmp_path,
1182 env=env,
1183 stderr=subprocess.STDOUT,
1184 )
1185 stdout = stdout.decode("utf-8")
1186 except subprocess.CalledProcessError as e:
1187 print(e.output.decode("utf-8"))
1188 raise
1189
1190
1191 errors = []
1192 error_regex = re.compile(r"^\d\d:\d\d:\d\d\s+(\w+)\s+ERROR\s+", re.MULTILINE)
1193 for match in error_regex.finditer(stdout):
1194 (algo,) = match.groups()
1195 errors.append(algo)
1196 errors = collections.Counter(errors)
1197 assert dict(errors) == {}, stdout
1198
1199
1200 @pytest.mark.skipif(not dd4hepEnabled, reason="DD4hep not set up")
1201 @pytest.mark.skipif(not onnxEnabled, reason="ONNX plugin not enabled")
1202 @pytest.mark.slow
1203 def test_ML_Ambiguity_Solver(tmp_path, assert_root_hash):
1204 root_file = "performance_ambiML.root"
1205 output_dir = "odd_output"
1206 assert not (tmp_path / root_file).exists()
1207
1208 getOpenDataDetector()
1209
1210 script = (
1211 Path(__file__).parent.parent.parent.parent
1212 / "Examples"
1213 / "Scripts"
1214 / "Python"
1215 / "full_chain_odd.py"
1216 )
1217 assert script.exists()
1218 env = os.environ.copy()
1219 env["ACTS_LOG_FAILURE_THRESHOLD"] = "ERROR"
1220 try:
1221 subprocess.check_call(
1222 [sys.executable, str(script), "-n1", "--MLSolver"],
1223 cwd=tmp_path,
1224 env=env,
1225 stderr=subprocess.STDOUT,
1226 )
1227 except subprocess.CalledProcessError as e:
1228 print(e.output.decode("utf-8"))
1229 raise
1230
1231 rfp = tmp_path / output_dir / root_file
1232 assert rfp.exists()
1233
1234 assert_root_hash(root_file, rfp)
1235
1236
1237 def test_bfield_writing(tmp_path, seq, assert_root_hash):
1238 from bfield_writing import runBFieldWriting
1239
1240 root_files = [
1241 ("solenoid.root", "solenoid", 100),
1242 ("solenoid2.root", "solenoid", 100),
1243 ]
1244
1245 for fn, _, _ in root_files:
1246 fp = tmp_path / fn
1247 assert not fp.exists()
1248
1249 runBFieldWriting(outputDir=tmp_path, rewrites=1)
1250
1251 for fn, tn, ee in root_files:
1252 fp = tmp_path / fn
1253 assert fp.exists()
1254 assert fp.stat().st_size > 2**10 * 2
1255 assert_entries(fp, tn, ee)
1256 assert_root_hash(fn, fp)
1257
1258
1259 @pytest.mark.parametrize("backend", ["onnx", "torch"])
1260 @pytest.mark.parametrize("hardware", ["cpu", "gpu"])
1261 @pytest.mark.skipif(not exatrkxEnabled, reason="ExaTrkX environment not set up")
1262 def test_exatrkx(tmp_path, trk_geo, field, assert_root_hash, backend, hardware):
1263 if backend == "onnx" and hardware == "cpu":
1264 pytest.skip("Combination of ONNX and CPU not yet supported")
1265
1266 root_file = "performance_track_finding.root"
1267 assert not (tmp_path / root_file).exists()
1268
1269 if backend == "onnx":
1270 url = "https://acts.web.cern.ch/ci/exatrkx/onnx_models_v01.tar"
1271 else:
1272 url = "https://acts.web.cern.ch/ci/exatrkx/torchscript_models_v01.tar"
1273
1274 tarfile_name = tmp_path / "models.tar"
1275 urllib.request.urlretrieve(url, tarfile_name)
1276 tarfile.open(tarfile_name).extractall(tmp_path)
1277 script = (
1278 Path(__file__).parent.parent.parent.parent
1279 / "Examples"
1280 / "Scripts"
1281 / "Python"
1282 / "exatrkx.py"
1283 )
1284 assert script.exists()
1285 env = os.environ.copy()
1286 env["ACTS_LOG_FAILURE_THRESHOLD"] = "WARNING"
1287
1288 if hardware == "cpu":
1289 env["CUDA_VISIBLE_DEVICES"] = ""
1290
1291 try:
1292 subprocess.check_call(
1293 [sys.executable, str(script), backend],
1294 cwd=tmp_path,
1295 env=env,
1296 stderr=subprocess.STDOUT,
1297 )
1298 except subprocess.CalledProcessError as e:
1299 print(e.output.decode("utf-8"))
1300 raise
1301
1302 rfp = tmp_path / root_file
1303 assert rfp.exists()
1304
1305 assert_root_hash(root_file, rfp)