From 335e4ddb94b65265e6f7f23e2726cc9e98899cf0 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 18 May 2026 17:09:43 -0400 Subject: [PATCH 01/22] Unfolder topologies no longer pull in no-op Folder --- src/libraries/JANA/Topology/JArrow.cc | 2 +- .../JANA/Topology/JTopologyBuilder.cc | 23 +++++++++++-------- .../JANA/Topology/JTopologyBuilder.h | 2 +- src/libraries/JANA/Topology/JUnfoldArrow.h | 12 ++++++---- .../Topology/MultiLevelTopologyTests.cc | 7 ++---- 5 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/libraries/JANA/Topology/JArrow.cc b/src/libraries/JANA/Topology/JArrow.cc index 5783c92aa..4f1b2f22c 100644 --- a/src/libraries/JANA/Topology/JArrow.cc +++ b/src/libraries/JANA/Topology/JArrow.cc @@ -43,7 +43,7 @@ void JArrow::Push(OutputData& outputs, size_t output_count, size_t location_id) port.GetPool()->Ingest(event, location_id); } else { - throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index); + throw JException("Arrow %s: Port %s not wired!", m_name.c_str(), port.GetName().c_str()); } } } diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index af621a0e1..8b7e8381e 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -149,7 +149,7 @@ void JTopologyBuilder::CreateTopology() { LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << PrintTopology() << LOG_END; } else { - AttachLevel(JEventLevel::Run, nullptr, nullptr); + AttachLevel(JEventLevel::Run, nullptr, nullptr, nullptr); LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END; } for (auto* arrow : arrows) { @@ -321,7 +321,7 @@ std::pair JTopologyBuilder::CreateTapChain(std::vector 1) { throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str()); } @@ -399,6 +396,10 @@ void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* pare LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events[current_level]; JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events[current_level], m_location_count, current_level); pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology + if (parent_pool != nullptr) { + LOG_DEBUG(GetLogger()) << "Attaching " << current_level << " pool to parent pool"; + pool_at_level->AttachForwardingPool(parent_pool); + } LOG_INFO(GetLogger()) << "Finished creating event pool"; // -------------------------- @@ -446,7 +447,8 @@ void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* pare bool need_unfold = have_unfolder; if (need_unfold) { unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]); - unfold_arrow->GetPort(JUnfoldArrow::REJECTED_PARENT_OUT).Attach(pool_at_level); + unfold_arrow->GetPort(JUnfoldArrow::PARENT_IN).Attach(pool_at_level); + unfold_arrow->GetPort(JUnfoldArrow::PARENT_OUT).Attach(pool_at_level); arrows.push_back(unfold_arrow); } @@ -454,7 +456,7 @@ void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* pare // 4. Fold // -------------------------- JFoldArrow* fold_arrow = nullptr; - bool need_fold = have_unfolder; + bool need_fold = false; // No folders_at_level for now if(need_fold) { fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel()); arrows.push_back(fold_arrow); @@ -495,6 +497,7 @@ void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* pare // -------------------------- if (parent_unfolder != nullptr) { parent_unfolder->GetPort(JUnfoldArrow::CHILD_IN).Attach(pool_at_level); + parent_unfolder->GetPort(JUnfoldArrow::CHILD_OUT).Attach(pool_at_level); ConnectToFirstAvailable(parent_unfolder, JUnfoldArrow::CHILD_OUT, {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } @@ -507,7 +510,7 @@ void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* pare {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } if (unfold_arrow != nullptr) { - ConnectToFirstAvailable(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT, + ConnectToFirstAvailable(unfold_arrow, JUnfoldArrow::PARENT_OUT, {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); } if (fold_arrow != nullptr) { @@ -529,7 +532,7 @@ void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* pare // Finally, we recur over lower levels! if (need_unfold) { auto next_level = unfolders_at_level[0]->GetChildLevel(); - AttachLevel(next_level, unfold_arrow, fold_arrow); + AttachLevel(next_level, unfold_arrow, fold_arrow, pool_at_level); } else { // This is the lowest level diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index 744d2ec72..0742a742f 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -76,7 +76,7 @@ class JTopologyBuilder : public JService { const JProcessorMapping& GetProcessorMapping() { return mapping; }; private: - void AttachLevel(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder); + void AttachLevel(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder, JEventPool* parent_pool); void ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port_id, std::vector> downstreams); void Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id); std::pair CreateTapChain(std::vector& procs, std::string name); diff --git a/src/libraries/JANA/Topology/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h index 7d69b4c6d..6639b5ce5 100644 --- a/src/libraries/JANA/Topology/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -8,7 +8,7 @@ class JUnfoldArrow : public JArrow { public: - enum PortIndex {PARENT_IN=0, CHILD_IN=1, CHILD_OUT=2, REJECTED_PARENT_OUT=3}; + enum PortIndex {PARENT_IN=0, CHILD_IN=1, CHILD_OUT=2, PARENT_OUT=3}; private: JEventUnfolder* m_unfolder = nullptr; @@ -26,7 +26,7 @@ class JUnfoldArrow : public JArrow { // Just in case there's a folder that needs this. // establishes_ordering is cheap; enforces_ordering is the expensive one - AddPort("rejected_parent_out", parent_level); + AddPort("parent_out", parent_level); m_next_input_port = GetPortIndex("parent_in"); } @@ -89,7 +89,7 @@ class JUnfoldArrow : public JArrow { if (result == JEventUnfolder::Result::KeepChildNextParent) { // KeepChildNextParent is a little more complicated because we have to handle the case of the parent having no children. - // In this case the parent obviously doesn't get shared among any children, and instead it is sent to the REJECTED_PARENT_OUT port. + // In this case the parent obviously doesn't get shared among any children, and instead it is sent to the PARENT_OUT port. int child_count = m_parent_event->ReleaseRefToSelf(); // Decrement the reference count so that this can be recycled LOG_DEBUG(m_logger) << "Unfold finished with parent event = " << m_parent_event->GetEventNumber() << " (" << child_count << " children emitted)"; @@ -104,7 +104,7 @@ class JUnfoldArrow : public JArrow { else { // Parent has NO children output_count = 1; - outputs[0] = {m_parent_event, REJECTED_PARENT_OUT}; + outputs[0] = {m_parent_event, PARENT_OUT}; m_parent_event = nullptr; m_next_input_port = PARENT_IN; status = JArrow::FireResult::KeepGoing; @@ -123,8 +123,10 @@ class JUnfoldArrow : public JArrow { else if (result == JEventUnfolder::Result::NextChildNextParent) { m_child_event->SetParent(m_parent_event); m_parent_event->ReleaseRefToSelf(); // Decrement the reference count so that this can be recycled + // TODO: Get rid of RefToSelf mechanism outputs[0] = {m_child_event, CHILD_OUT}; - output_count = 1; + outputs[1] = {m_parent_event, PARENT_OUT}; + output_count = 2; LOG_DEBUG(m_logger) << "Unfold finished with parent event = " << m_parent_event->GetEventNumber() << LOG_END; m_child_event = nullptr; m_parent_event = nullptr; diff --git a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc index d5f934399..7c88ac1b6 100644 --- a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc +++ b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc @@ -31,7 +31,7 @@ TEST_CASE("TimeslicesTests_FineGrained") { app.Initialize(); auto ee = app.GetService(); auto top = app.GetService(); - enum ArrowId {TS_SRC=0, TS_MAP=1, TS_UNF=2, TS_FLD=3, PH_MAP=4, PH_TAP=5}; + enum ArrowId {TS_SRC=0, TS_MAP=1, TS_UNF=2, PH_MAP=3, PH_TAP=4}; JArrow::FireResult result = JArrow::FireResult::NotRunYet; result = ee->Fire(TS_SRC, 0); @@ -61,9 +61,6 @@ TEST_CASE("TimeslicesTests_FineGrained") { result = ee->Fire(PH_TAP, 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - result = ee->Fire(TS_FLD, 0); - REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->GetPools()[0]->GetSize(0) == 1); // Unfolder still has parent REQUIRE(top->GetPools()[1]->GetSize(0) == 4); // Child returned to pool @@ -94,6 +91,7 @@ TEST_CASE("TimeslicesTests_NoEvtProcs") { JApplication app; app.SetParameterValue("jana:nevents", "5"); + app.SetParameterValue("jana:loglevel", "debug"); app.Add(new MyTimesliceSource); app.Add(new MyTimesliceUnfolder); @@ -133,7 +131,6 @@ TEST_CASE("MultilevelSource_Trivial") { } - } // namespace multilevel_source_tests } // namespce jana From b8abcbbc1cfb31c39f3a99d837caecd71c816ff6 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 18 May 2026 18:04:27 -0400 Subject: [PATCH 02/22] Remove JEvent RefToSelf mechanism This mechanism is both flawed and very confusing. It fails in one particular case: When all child events are processed by a JEventFolder before the parent event gets released from the JEventUnfolder --- src/libraries/JANA/JEvent.cc | 13 ------------- src/libraries/JANA/JEvent.h | 3 --- src/libraries/JANA/Topology/JUnfoldArrow.h | 5 +---- 3 files changed, 1 insertion(+), 20 deletions(-) diff --git a/src/libraries/JANA/JEvent.cc b/src/libraries/JANA/JEvent.cc index 8e4c69e4f..6bd8ebc01 100644 --- a/src/libraries/JANA/JEvent.cc +++ b/src/libraries/JANA/JEvent.cc @@ -144,19 +144,6 @@ std::vector JEvent::ReleaseAllParents() { return released_parents; } -void JEvent::TakeRefToSelf() { - mReferenceCount++; -} - -int JEvent::ReleaseRefToSelf() { - int remaining_refs = mReferenceCount.fetch_sub(1); - remaining_refs -= 1; // fetch_sub post increments - if (remaining_refs < 0) { - throw JException("JEvent's own refcount has gone negative!"); - } - return remaining_refs; -} - int JEvent::GetChildCount() { return mReferenceCount; } diff --git a/src/libraries/JANA/JEvent.h b/src/libraries/JANA/JEvent.h index 75e1c367c..2ab8e7241 100644 --- a/src/libraries/JANA/JEvent.h +++ b/src/libraries/JANA/JEvent.h @@ -96,9 +96,6 @@ class JEvent : public std::enable_shared_from_this { uint64_t GetParentNumber(JEventLevel level) const; void SetParentNumber(JEventLevel level, uint64_t number); - void TakeRefToSelf(); - int ReleaseRefToSelf(); - // Lifecycle void Clear(bool processed_successfully=true); void Finish(); diff --git a/src/libraries/JANA/Topology/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h index 6639b5ce5..bbc987212 100644 --- a/src/libraries/JANA/Topology/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -46,7 +46,6 @@ class JUnfoldArrow : public JArrow { if (this->m_next_input_port == PARENT_IN) { assert(m_parent_event == nullptr); m_parent_event = event; - m_parent_event->TakeRefToSelf(); } else if (this->m_next_input_port == CHILD_IN) { assert(m_child_event == nullptr); @@ -90,7 +89,7 @@ class JUnfoldArrow : public JArrow { if (result == JEventUnfolder::Result::KeepChildNextParent) { // KeepChildNextParent is a little more complicated because we have to handle the case of the parent having no children. // In this case the parent obviously doesn't get shared among any children, and instead it is sent to the PARENT_OUT port. - int child_count = m_parent_event->ReleaseRefToSelf(); // Decrement the reference count so that this can be recycled + int child_count = m_parent_event->GetChildCount(); LOG_DEBUG(m_logger) << "Unfold finished with parent event = " << m_parent_event->GetEventNumber() << " (" << child_count << " children emitted)"; if (child_count > 0) { @@ -122,8 +121,6 @@ class JUnfoldArrow : public JArrow { } else if (result == JEventUnfolder::Result::NextChildNextParent) { m_child_event->SetParent(m_parent_event); - m_parent_event->ReleaseRefToSelf(); // Decrement the reference count so that this can be recycled - // TODO: Get rid of RefToSelf mechanism outputs[0] = {m_child_event, CHILD_OUT}; outputs[1] = {m_parent_event, PARENT_OUT}; output_count = 2; From 91e53354178be5889f580ad02f0766cd059b5068 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Tue, 19 May 2026 19:21:55 -0400 Subject: [PATCH 03/22] Fix overloaded virtual warnings in JOmniFactory (again) --- src/libraries/JANA/Components/JOmniFactory.h | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/libraries/JANA/Components/JOmniFactory.h b/src/libraries/JANA/Components/JOmniFactory.h index 1e5f4c32d..112a889b9 100644 --- a/src/libraries/JANA/Components/JOmniFactory.h +++ b/src/libraries/JANA/Components/JOmniFactory.h @@ -43,10 +43,9 @@ class JOmniFactory : public JFactory { } // This is more hackery to suppress the overloaded-virtual warning - // Has to virtual simply because EICrecon already declares it as override + // Has to be virtual simply because EICrecon already declares it as override using JFactory::ChangeRun; - void ChangeRun(int32_t) {}; - + virtual void ChangeRun(int32_t) {}; using ConfigType = ConfigT; From 0639e263774839803b8f84b5b2b2976f4880c5fd Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Wed, 20 May 2026 14:58:37 -0400 Subject: [PATCH 04/22] Change JEventSource::SetEventLevels() => SetParentLevels() This reduces confusion (vs JEventSource::SetLevel), is more technically correct, and lets JTopologyBuilder clearly distinguish between "parent sources" and "child sources" --- src/libraries/JANA/JEventSource.h | 6 +++--- src/libraries/JANA/Topology/JMultilevelSourceArrow.cc | 6 ++++-- .../unit_tests/Topology/JTopologyBuilderTests.cc | 10 +++++++--- .../unit_tests/Topology/MultiLevelTopologyTests.cc | 2 +- .../unit_tests/Topology/MultiLevelTopologyTests.h | 3 ++- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index fab50c664..03fa2a4b1 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -41,7 +41,7 @@ class JEventSource : public jana::components::JComponent, bool m_enable_process_parallel = false; Status m_status = Status::Unopened; - std::vector m_event_levels; + std::vector m_parent_levels; JEventLevel m_next_level = JEventLevel::None; @@ -148,7 +148,7 @@ class JEventSource : public jana::components::JComponent, uint64_t GetSkippedEventCount() const { return m_events_skipped; }; uint64_t GetProcessedEventCount() const { return m_events_processed; }; - const std::vector GetEventLevels() { return m_event_levels; } + const std::vector GetParentLevels() { return m_parent_levels; } bool IsGetObjectsEnabled() const { return m_enable_get_objects; } bool IsFinishEventEnabled() const { return m_enable_finish_event; } @@ -181,7 +181,7 @@ class JEventSource : public jana::components::JComponent, void SetNSkip(uint64_t nskip) { m_nskip = nskip; }; void SetNextEventLevel(JEventLevel level) { m_next_level = level; } - void SetEventLevels(std::vector levels) { m_event_levels = levels; } + void SetParentLevels(std::vector levels) { m_parent_levels = levels; } JEventLevel GetNextInputLevel() const { return m_next_level; } diff --git a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc index 82abea323..fdb28e3b0 100644 --- a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc +++ b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc @@ -6,8 +6,10 @@ void JMultilevelSourceArrow::SetEventSource(JEventSource* source) { m_source = source; - m_levels = source->GetEventLevels(); - m_child_event_level = m_levels.back(); + m_levels = source->GetParentLevels(); + m_child_event_level = source->GetLevel(); + m_levels.push_back(m_child_event_level); + m_next_input_port = 0; size_t input_port_count = 0; diff --git a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc index 20534bb2c..7ffaa87a5 100644 --- a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc +++ b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc @@ -25,12 +25,16 @@ class MyMultiSource : public JEventSource { public: MyMultiSource() { SetCallbackStyle(CallbackStyle::ExpertMode); - SetEventLevels({JEventLevel::Run, JEventLevel::SlowControls, JEventLevel::PhysicsEvent}); + SetParentLevels({JEventLevel::Run, JEventLevel::SlowControls}); + SetLevel(JEventLevel::PhysicsEvent); } Result Emit(JEvent& event) override { auto count = GetEmittedEventCount(); - const auto& levels = GetEventLevels(); - SetNextEventLevel(levels.at((count+1) % levels.size())); + switch (count % 3) { + case 0: SetNextEventLevel(JEventLevel::SlowControls); break; + case 1: SetNextEventLevel(JEventLevel::PhysicsEvent); break; + case 2: SetNextEventLevel(JEventLevel::Run); break; + } LOG_INFO(GetLogger()) << "Emitting " << event.GetEventStamp(); return Result::Success; // Assume that source can peek ahead to request a different level } diff --git a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc index 7c88ac1b6..c6bc8af5d 100644 --- a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc +++ b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc @@ -121,7 +121,7 @@ TEST_CASE("MultilevelSource_Trivial") { auto* source = new MyMultilevelSource; auto* proc = new MyMultilevelProcessor; - source->SetEventLevels({JEventLevel::PhysicsEvent}); + source->SetLevel(JEventLevel::PhysicsEvent); source->data_stream = {{JEventLevel::PhysicsEvent, 4}, {JEventLevel::PhysicsEvent, 5}, {JEventLevel::PhysicsEvent, 6}}; proc->expected_data_stream = {{-1,-1,4}, {-1,-1,5}, {-1,-1,6}}; diff --git a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h index 84faa3466..abfe6e66e 100644 --- a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h +++ b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h @@ -181,7 +181,8 @@ struct MyMultilevelSource : public JEventSource { MyMultilevelSource() { SetCallbackStyle(CallbackStyle::ExpertMode); - SetEventLevels({JEventLevel::Run, JEventLevel::SlowControls, JEventLevel::PhysicsEvent}); + SetParentLevels({JEventLevel::Run, JEventLevel::SlowControls}); + SetLevel(JEventLevel::PhysicsEvent); m_calibs_out.SetLevel(JEventLevel::Run); m_controls_out.SetLevel(JEventLevel::SlowControls); From 1641b7ec0a0d0081fa85c93de8b9e8cc818efec9 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sun, 24 May 2026 01:35:19 -0400 Subject: [PATCH 05/22] Add JEventLevelHierarchy --- src/libraries/JANA/Utils/JEventLevel.h | 59 ++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/src/libraries/JANA/Utils/JEventLevel.h b/src/libraries/JANA/Utils/JEventLevel.h index 4d65f4398..6925b94ea 100644 --- a/src/libraries/JANA/Utils/JEventLevel.h +++ b/src/libraries/JANA/Utils/JEventLevel.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include enum class JEventLevel { Run, Subrun, SlowControls, Timeslice, Block, PhysicsEvent, Subevent, Task, None }; @@ -70,3 +72,60 @@ inline JEventLevel next_level(JEventLevel current_level) { default: return JEventLevel::None; } } + +class JEventLevelHierarchy { + std::vector m_bottom_levels; + std::vector m_parent_levels; + std::vector m_all_levels; + std::map> parents; + std::map> children; + +private: + void ToString(std::stringstream& ss, JEventLevel current) { + ss << current; + auto it = parents.find(current); + if (it != parents.end()) { + ss << "("; + size_t parent_count = it->second.size(); + for (size_t i=0; isecond[i]); + if (i != parent_count-1) { + ss << ", "; + } + } + ss << ")"; + } + } + +public: + + const std::vector& GetAllLevels() const { + return m_all_levels; + } + const std::vector& GetParentLevels() const { + return m_parent_levels; + } + const std::vector& GetBottomLevels() const { + return m_bottom_levels; + } + void AddBottomLevel(JEventLevel level) { + m_all_levels.push_back(level); + m_bottom_levels.push_back(level); + } + void AddParentLevel(JEventLevel child, JEventLevel parent) { + m_all_levels.push_back(parent); + m_parent_levels.push_back(parent); + parents[child].push_back(parent); + children[parent].push_back(child); + } + std::string ToString() { + std::stringstream ss; + for (size_t i=0; i Date: Sun, 24 May 2026 01:45:28 -0400 Subject: [PATCH 06/22] Bugfix: MapArrow handles multiple parallel sources correctly --- src/libraries/JANA/Topology/JMapArrow.cc | 13 ++++++++----- src/libraries/JANA/Topology/JMapArrow.h | 4 ++-- src/libraries/JANA/Topology/JTopologyBuilder.cc | 2 +- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/libraries/JANA/Topology/JMapArrow.cc b/src/libraries/JANA/Topology/JMapArrow.cc index 2dcbae4dd..7ee41fd71 100644 --- a/src/libraries/JANA/Topology/JMapArrow.cc +++ b/src/libraries/JANA/Topology/JMapArrow.cc @@ -17,8 +17,8 @@ JMapArrow::JMapArrow(std::string name, JEventLevel level) { AddPort("out", level); } -void JMapArrow::AddSource(JEventSource* source) { - m_sources.push_back(source); +void JMapArrow::SetParallelSource(bool is_parallel) { + m_parallel_source = is_parallel; } void JMapArrow::AddUnfolder(JEventUnfolder* unfolder) { @@ -32,9 +32,12 @@ void JMapArrow::AddProcessor(JEventProcessor* processor) { void JMapArrow::Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) { LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << " for event# " << event->GetEventNumber() << LOG_END; - for (JEventSource* source : m_sources) { - JCallGraphEntryMaker cg_entry(*event->GetJCallGraphRecorder(), source->GetTypeName()); // times execution until this goes out of scope - source->ProcessParallel(*event); + if (m_parallel_source) { + auto* source = event->GetJEventSource(); + if (source != nullptr) { + JCallGraphEntryMaker cg_entry(*event->GetJCallGraphRecorder(), source->GetTypeName()); // times execution until this goes out of scope + event->GetJEventSource()->ProcessParallel(*event); + } } for (JEventUnfolder* unfolder : m_unfolders) { JCallGraphEntryMaker cg_entry(*event->GetJCallGraphRecorder(), unfolder->GetTypeName()); // times execution until this goes out of scope diff --git a/src/libraries/JANA/Topology/JMapArrow.h b/src/libraries/JANA/Topology/JMapArrow.h index 72c7e1b21..c64958f07 100644 --- a/src/libraries/JANA/Topology/JMapArrow.h +++ b/src/libraries/JANA/Topology/JMapArrow.h @@ -18,14 +18,14 @@ class JMapArrow : public JArrow { enum PortIndex {EVENT_IN=0, EVENT_OUT=1}; private: - std::vector m_sources; + bool m_parallel_source = false; std::vector m_unfolders; std::vector m_procs; public: JMapArrow(std::string name, JEventLevel level); - void AddSource(JEventSource* source); + void SetParallelSource(bool is_parallel); void AddUnfolder(JEventUnfolder* unfolder); void AddProcessor(JEventProcessor* proc); diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 8b7e8381e..c27029f55 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -429,7 +429,7 @@ void JTopologyBuilder::AttachLevel(JEventLevel current_level, JUnfoldArrow* pare map1_arrow = new JMapArrow(level_str+"Map1", current_level); for (JEventSource* source: sources_at_level) { if (source->IsProcessParallelEnabled()) { - map1_arrow->AddSource(source); + map1_arrow->SetParallelSource(true); } } for (JEventUnfolder* unf: unfolders_at_level) { From d68747e8bb31de86869c8eafc19e5d71f143781a Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Sun, 24 May 2026 02:46:20 -0400 Subject: [PATCH 07/22] WIP: JTopologyBuilder::CreateTopologyFromScratch --- .../JANA/Topology/JTopologyBuilder.cc | 97 +++++++++++++++++++ .../JANA/Topology/JTopologyBuilder.h | 1 + 2 files changed, 98 insertions(+) diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index c27029f55..ff3aa95ae 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -7,6 +7,7 @@ #include #include +#include "JANA/Topology/JArrow.h" #include "JANA/Utils/JEventLevel.h" #include "JSourceArrow.h" #include "JMapArrow.h" @@ -176,6 +177,102 @@ void JTopologyBuilder::CreateTopology() { } } +void JTopologyBuilder::CreateTopologyFromScratch() { + + enum class Column { Source=0, UnfoldAbove=1, BatchBefore=2, UnfoldBelow=3, FoldBelow=4, BatchAfter=5, Tap=6, FoldAbove=7}; + struct Cell { + JArrow* start = nullptr; + JArrow* end = nullptr; + }; + + std::map, Cell> grid; + + // ----------------------------- + // Phase 1: Iterate over all components, adding the corresponding arrows to the grid + // ----------------------------- + + int map_counter = 0; + + // Place all sources on grid + // ----------------------------- + std::map> sources; + for (JEventSource* source : m_components->get_evt_srces()) { + if (source->IsEnabled()) { + sources[source->GetLevel()].push_back(source); + } + } + for (auto& it : sources) { + auto level = it.first; + auto level_str = toString(level); + auto* src_arrow = new JSourceArrow(level_str+"Source", level, it.second); + arrows.push_back(src_arrow); + grid[{level, Column::Source}] = {src_arrow, src_arrow}; + } + + // Place all unfolders on grid + // ----------------------------- + for (auto* unfolder: m_components->get_unfolders()) { + + if (!unfolder->IsEnabled()) continue; + + // Create unfold arrow + // Publish at _each_ grid location + auto parent_level = unfolder->GetLevel(); + auto child_level = unfolder->GetChildLevel(); + + auto* map_arrow = new JMapArrow(toString(parent_level)+"Map"+std::to_string(map_counter++), parent_level); + auto* unfold_arrow = new JUnfoldArrow(toString(child_level)+"Unfold", unfolder); + map_arrow->AddUnfolder(unfolder); + arrows.push_back(map_arrow); + arrows.push_back(unfold_arrow); + Connect(map_arrow, map_arrow->EVENT_OUT, unfold_arrow, unfold_arrow->PARENT_IN); + + grid[{parent_level, Column::UnfoldBelow}] = {map_arrow, unfold_arrow}; + grid[{child_level, Column::UnfoldAbove}] = {unfold_arrow, unfold_arrow}; + } + + // Place all processors on grid + // ----------------------------- + std::map> mappable_processors; + std::map> tappable_processors; + for (auto* proc : m_components->get_evt_procs()) { + if (proc->IsEnabled()) { + + if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) { + throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str()); + } + mappable_processors[proc->GetLevel()].push_back(proc); + if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) { + tappable_processors[proc->GetLevel()].push_back(proc); + } + } + } + for (auto it : mappable_processors) { + auto level = it.first; + auto level_str = toString(level); + auto* map_arrow = new JMapArrow(level_str+"Map"+std::to_string(map_counter++), level); + for (JEventProcessor* proc : it.second) { + map_arrow->AddProcessor(proc); + } + arrows.push_back(map_arrow); + + auto tappable_procs_it = tappable_processors.find(level); + if (tappable_procs_it != tappable_processors.end()) { + JArrow* first_tap_arrow = nullptr; + JArrow* last_tap_arrow = nullptr; + std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(it.second, level_str); + Connect(map_arrow, map_arrow->EVENT_OUT, first_tap_arrow, JTapArrow::EVENT_IN); + grid[{level, Column::Tap}] = {map_arrow, last_tap_arrow}; + } + else { + // ONLY legacy processors, no tap chain + grid[{level, Column::Tap}] = {map_arrow, map_arrow}; + } + } + + // Phase 2: Iterate over all rows and all adjacent occupied column pairs, wiring horizontally +} + void JTopologyBuilder::Init() { diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index 0742a742f..6e4b6095b 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -67,6 +67,7 @@ class JTopologyBuilder : public JService { void SetConfigureFn(std::function configure_fn); void CreateTopology(); + void CreateTopologyFromScratch(); std::string PrintTopology(); From fa1be988a270ff53a6019f4f5a5e6ce4af34e956 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 01:53:22 -0400 Subject: [PATCH 08/22] JArrow::AddPort now requires PortDirection --- src/libraries/JANA/Topology/JArrow.cc | 49 ++++++++++++++++++- src/libraries/JANA/Topology/JArrow.h | 20 +++----- src/libraries/JANA/Topology/JFoldArrow.h | 6 +-- src/libraries/JANA/Topology/JMapArrow.cc | 4 +- .../JANA/Topology/JMultilevelSourceArrow.cc | 4 +- src/libraries/JANA/Topology/JSourceArrow.cc | 4 +- src/libraries/JANA/Topology/JTapArrow.cc | 4 +- src/libraries/JANA/Topology/JUnfoldArrow.h | 8 +-- .../integration_tests/BatchedArrow.cc | 4 +- .../integration_tests/SimpleOffloading.cc | 8 +-- .../unit_tests/Topology/JArrowTests.cc | 4 +- 11 files changed, 78 insertions(+), 37 deletions(-) diff --git a/src/libraries/JANA/Topology/JArrow.cc b/src/libraries/JANA/Topology/JArrow.cc index 4f1b2f22c..864c7b18f 100644 --- a/src/libraries/JANA/Topology/JArrow.cc +++ b/src/libraries/JANA/Topology/JArrow.cc @@ -2,7 +2,7 @@ #include -JArrow::Port& JArrow::AddPort(std::string name, JEventLevel level) { +JArrow::Port& JArrow::AddPort(std::string name, JEventLevel level, PortDirection direction) { if (m_port_lookup.find(name) != m_port_lookup.end()) { throw JException("Port with name '%s' already exists", name.c_str()); } @@ -10,6 +10,16 @@ JArrow::Port& JArrow::AddPort(std::string name, JEventLevel level) { auto port_raw_ptr = port.get(); m_ports.push_back(std::move(port)); m_port_lookup[name] = m_ports.size()-1; + + auto it = m_auto_port_lookup.find({level, direction}); + if (it != m_auto_port_lookup.end()) { + // There's a conflict! Multiple ports with the same level, direction + // Handle this case by disabling the lookup + it->second = -1; + } + else { + m_auto_port_lookup[{level, direction}] = m_ports.size()-1; + } return *port_raw_ptr; } @@ -91,5 +101,42 @@ std::string ToString(JArrow::FireResult r) { } } +std::string ToString(JArrow::PortDirection d) { + switch (d) { + case JArrow::PortDirection::In: return "In"; + case JArrow::PortDirection::Out: return "Out"; + default: return "Unknown"; + } +} + +int JArrow::GetPortIndex(JEventLevel level, PortDirection direction) { + auto it = m_auto_port_lookup.find({level, direction}); + if (it == m_auto_port_lookup.end()) { + throw JException("Unable to find port with (level=%s, direction=%s) on arrow '%s'", + toString(level).c_str(), + ToString(direction).c_str(), + GetName().c_str()); + } + else if (it->second == -1) { + throw JException("Ambiguous port with (level=%s, direction=%s) on arrow '%s'", + toString(level).c_str(), + ToString(direction).c_str(), + GetName().c_str()); + } + return it->second; +} + +int JArrow::GetPortIndex(const std::string& port_name) { + auto it = m_port_lookup.find(port_name); + if (it == m_port_lookup.end()) { + LOG_FATAL(GetLogger()) << "Unable to find port_name '" << port_name << "' on arrow '" << GetName() << "'. Valid port names are:"; + for (auto& port : m_ports) { + LOG_FATAL(GetLogger()) << " " << port->GetName(); + } + throw JException("Unable to find port_name '%s' on arrow '%s'", port_name.c_str(), GetName().c_str()); + } + return it->second; +} + diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index 2cc4747dc..1829d9a63 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -18,6 +18,7 @@ class JArrow { public: using OutputData = std::array, 2>; enum class FireResult {NotRunYet, KeepGoing, ComeBackLater, Finished}; + enum class PortDirection { In, Out }; class Port { std::string m_name; @@ -83,6 +84,7 @@ class JArrow { clock_t::time_point m_next_visit_time=clock_t::now(); std::vector> m_ports; std::map m_port_lookup; + std::map, int> m_auto_port_lookup; JLogger m_logger; public: @@ -117,23 +119,15 @@ class JArrow { void SetIsSource(bool is_source) { m_is_source = is_source; } void SetIsSink(bool is_sink) { m_is_sink = is_sink; } - Port& AddPort(std::string port_name, JEventLevel level); - Port& AddPort(std::string port_name, std::vector levels); + Port& AddPort(std::string port_name, JEventLevel level, PortDirection direction); Port& GetPort(size_t port_index) { return *m_ports.at(port_index); } - int GetPortIndex(const std::string& port_name) { - auto it = m_port_lookup.find(port_name); - if (it == m_port_lookup.end()) { - LOG_FATAL(GetLogger()) << "Unable to find port_name '" << port_name << "' on arrow '" << GetName() << "'. Valid port names are:"; - for (auto& port : m_ports) { - LOG_FATAL(GetLogger()) << " " << port->GetName(); - } - throw JException("Unable to find port_name '%s' on arrow '%s'", port_name.c_str(), GetName().c_str()); - } - return it->second; - } + + int GetPortIndex(JEventLevel level, PortDirection direction); + int GetPortIndex(const std::string& port_name); void SetNextPortIndex(int input_port) { m_next_input_port = input_port; } }; std::string ToString(JArrow::FireResult r); +std::string ToString(JArrow::PortDirection d); diff --git a/src/libraries/JANA/Topology/JFoldArrow.h b/src/libraries/JANA/Topology/JFoldArrow.h index 71d729b89..ac167a197 100644 --- a/src/libraries/JANA/Topology/JFoldArrow.h +++ b/src/libraries/JANA/Topology/JFoldArrow.h @@ -26,9 +26,9 @@ class JFoldArrow : public JArrow { m_child_level(child_level) { SetName(name); - AddPort("child_in", child_level).SetEnforcesOrdering(true); - AddPort("child_out", child_level); - AddPort("parent_out", parent_level); + AddPort("child_in", child_level, PortDirection::In).SetEnforcesOrdering(true); + AddPort("child_out", child_level, PortDirection::Out); + AddPort("parent_out", parent_level, PortDirection::Out); m_next_input_port = CHILD_IN; } diff --git a/src/libraries/JANA/Topology/JMapArrow.cc b/src/libraries/JANA/Topology/JMapArrow.cc index 7ee41fd71..6531e8c53 100644 --- a/src/libraries/JANA/Topology/JMapArrow.cc +++ b/src/libraries/JANA/Topology/JMapArrow.cc @@ -13,8 +13,8 @@ JMapArrow::JMapArrow(std::string name, JEventLevel level) { SetName(name); SetIsParallel(true); - AddPort("in", level); - AddPort("out", level); + AddPort("in", level, PortDirection::In); + AddPort("out", level, PortDirection::Out); } void JMapArrow::SetParallelSource(bool is_parallel) { diff --git a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc index fdb28e3b0..785abaeab 100644 --- a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc +++ b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc @@ -15,11 +15,11 @@ void JMultilevelSourceArrow::SetEventSource(JEventSource* source) { size_t input_port_count = 0; size_t output_port_count = 0; for (auto level : m_levels) { - AddPort(toString(level) + "In", level).SetSkipFinishEvent(true); + AddPort(toString(level) + "In", level, PortDirection::In).SetSkipFinishEvent(true); m_port_lookup[{level, Direction::In}] = input_port_count++; } for (auto level : m_levels) { - AddPort(toString(level) + "Out", level); + AddPort(toString(level) + "Out", level, PortDirection::Out); m_port_lookup[{level, Direction::Out}] = input_port_count + output_port_count++; } } diff --git a/src/libraries/JANA/Topology/JSourceArrow.cc b/src/libraries/JANA/Topology/JSourceArrow.cc index a9ae317db..c22f904a5 100644 --- a/src/libraries/JANA/Topology/JSourceArrow.cc +++ b/src/libraries/JANA/Topology/JSourceArrow.cc @@ -13,8 +13,8 @@ JSourceArrow::JSourceArrow(std::string name, JEventLevel level, std::vectorGetLevel(); auto child_level = unfolder->GetChildLevel(); - AddPort("parent_in", parent_level); - AddPort("child_in", child_level); - AddPort("child_out", child_level).SetEstablishesOrdering(true); + AddPort("parent_in", parent_level, PortDirection::In); + AddPort("child_in", child_level, PortDirection::In); + AddPort("child_out", child_level, PortDirection::Out).SetEstablishesOrdering(true); // Just in case there's a folder that needs this. // establishes_ordering is cheap; enforces_ordering is the expensive one - AddPort("parent_out", parent_level); + AddPort("parent_out", parent_level, PortDirection::Out); m_next_input_port = GetPortIndex("parent_in"); } diff --git a/src/programs/integration_tests/BatchedArrow.cc b/src/programs/integration_tests/BatchedArrow.cc index 14dbb5873..f5e92c069 100644 --- a/src/programs/integration_tests/BatchedArrow.cc +++ b/src/programs/integration_tests/BatchedArrow.cc @@ -16,8 +16,8 @@ class BatchedArrow : public JArrow { BatchedArrow(JEventLevel level) { SetName("BatchedArrow"); SetIsParallel(false); - AddPort("in", level); - AddPort("out", level); + AddPort("in", level, PortDirection::In); + AddPort("out", level, PortDirection::Out); } void SetBatchSize(int batch_size) { m_batch_size = batch_size; } diff --git a/src/programs/integration_tests/SimpleOffloading.cc b/src/programs/integration_tests/SimpleOffloading.cc index e27ae0fa7..27c1123b7 100644 --- a/src/programs/integration_tests/SimpleOffloading.cc +++ b/src/programs/integration_tests/SimpleOffloading.cc @@ -74,8 +74,8 @@ struct TriggerFactoryInputsArrow : public JArrow { TriggerFactoryInputsArrow(JEventLevel level) { SetName("trigger"); SetIsParallel(true); - AddPort("in", level); - AddPort("out", level); + AddPort("in", level, PortDirection::In); + AddPort("out", level, PortDirection::Out); } void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override { @@ -98,8 +98,8 @@ struct OffloadArrow : public JArrow { OffloadArrow(JEventLevel level) { SetName("offload"); SetIsParallel(false); - AddPort("in", level); - AddPort("out", level); + AddPort("in", level, PortDirection::In); + AddPort("out", level, PortDirection::Out); } ~OffloadArrow() override {} diff --git a/src/programs/unit_tests/Topology/JArrowTests.cc b/src/programs/unit_tests/Topology/JArrowTests.cc index 7726c5fda..cc17b4a54 100644 --- a/src/programs/unit_tests/Topology/JArrowTests.cc +++ b/src/programs/unit_tests/Topology/JArrowTests.cc @@ -9,8 +9,8 @@ struct TestData { int x; }; struct BasicParallelArrow : public JArrow { BasicParallelArrow() { - AddPort("in", JEventLevel::PhysicsEvent); - AddPort("out", JEventLevel::PhysicsEvent); + AddPort("in", JEventLevel::PhysicsEvent, PortDirection::In); + AddPort("out", JEventLevel::PhysicsEvent, PortDirection::Out); SetIsParallel(true); } From 3fa5460e70e385849971324678ffd27e291712ee Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 11:31:41 -0400 Subject: [PATCH 09/22] WIP: JTopologyBuilder::CreateTopologyFromScratch part 2 --- .../JANA/Topology/JTopologyBuilder.cc | 63 ++++++++++++++++--- 1 file changed, 56 insertions(+), 7 deletions(-) diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index ff3aa95ae..38f27d5b2 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -150,7 +150,8 @@ void JTopologyBuilder::CreateTopology() { LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << PrintTopology() << LOG_END; } else { - AttachLevel(JEventLevel::Run, nullptr, nullptr, nullptr); + CreateTopologyFromScratch(); + //AttachLevel(JEventLevel::Run, nullptr, nullptr, nullptr); LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END; } for (auto* arrow : arrows) { @@ -179,7 +180,8 @@ void JTopologyBuilder::CreateTopology() { void JTopologyBuilder::CreateTopologyFromScratch() { - enum class Column { Source=0, UnfoldAbove=1, BatchBefore=2, UnfoldBelow=3, FoldBelow=4, BatchAfter=5, Tap=6, FoldAbove=7}; + enum class Column { Source, UnfoldAbove, BatchBefore, UnfoldBelow, FoldBelow, BatchAfter, Tap, FoldAbove}; + std::vector columns = { Column::Source, Column::UnfoldAbove, Column::BatchBefore, Column::UnfoldBelow, Column::FoldBelow, Column::BatchAfter, Column::Tap, Column::FoldAbove}; struct Cell { JArrow* start = nullptr; JArrow* end = nullptr; @@ -192,6 +194,7 @@ void JTopologyBuilder::CreateTopologyFromScratch() { // ----------------------------- int map_counter = 0; + std::set levels_present; // Place all sources on grid // ----------------------------- @@ -199,13 +202,14 @@ void JTopologyBuilder::CreateTopologyFromScratch() { for (JEventSource* source : m_components->get_evt_srces()) { if (source->IsEnabled()) { sources[source->GetLevel()].push_back(source); + levels_present.insert(source->GetLevel()); } } for (auto& it : sources) { auto level = it.first; auto level_str = toString(level); auto* src_arrow = new JSourceArrow(level_str+"Source", level, it.second); - arrows.push_back(src_arrow); + AddArrow(src_arrow); grid[{level, Column::Source}] = {src_arrow, src_arrow}; } @@ -219,12 +223,14 @@ void JTopologyBuilder::CreateTopologyFromScratch() { // Publish at _each_ grid location auto parent_level = unfolder->GetLevel(); auto child_level = unfolder->GetChildLevel(); + levels_present.insert(parent_level); + levels_present.insert(child_level); auto* map_arrow = new JMapArrow(toString(parent_level)+"Map"+std::to_string(map_counter++), parent_level); auto* unfold_arrow = new JUnfoldArrow(toString(child_level)+"Unfold", unfolder); map_arrow->AddUnfolder(unfolder); - arrows.push_back(map_arrow); - arrows.push_back(unfold_arrow); + AddArrow(map_arrow); + AddArrow(unfold_arrow); Connect(map_arrow, map_arrow->EVENT_OUT, unfold_arrow, unfold_arrow->PARENT_IN); grid[{parent_level, Column::UnfoldBelow}] = {map_arrow, unfold_arrow}; @@ -237,7 +243,7 @@ void JTopologyBuilder::CreateTopologyFromScratch() { std::map> tappable_processors; for (auto* proc : m_components->get_evt_procs()) { if (proc->IsEnabled()) { - + levels_present.insert(proc->GetLevel()); if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) { throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str()); } @@ -254,7 +260,7 @@ void JTopologyBuilder::CreateTopologyFromScratch() { for (JEventProcessor* proc : it.second) { map_arrow->AddProcessor(proc); } - arrows.push_back(map_arrow); + AddArrow(map_arrow); auto tappable_procs_it = tappable_processors.find(level); if (tappable_procs_it != tappable_processors.end()) { @@ -270,7 +276,50 @@ void JTopologyBuilder::CreateTopologyFromScratch() { } } + // ----------------------------- // Phase 2: Iterate over all rows and all adjacent occupied column pairs, wiring horizontally + // ----------------------------- + + for (JEventLevel level : levels_present) { + + auto* pool = GetOrCreatePool(level); + JArrow* last_arrow = nullptr; + for (auto column : columns) { + auto it = grid.find({level, column}); + if (it == grid.end()) { continue; } + + JArrow* current_arrow = it->second.start; + if (last_arrow == nullptr) { + // This is the first arrow we've found, so connect the pool here + auto port_index = current_arrow->GetPortIndex(level, JArrow::PortDirection::In); + current_arrow->GetPort(port_index).Attach(pool); + } + else { + Connect(last_arrow, + last_arrow->GetPortIndex(level, JArrow::PortDirection::Out), + current_arrow, + current_arrow->GetPortIndex(level, JArrow::PortDirection::In)); + } + last_arrow = it->second.end; + + } + // Connect last_arrow to pool + auto port_index = last_arrow->GetPortIndex(level, JArrow::PortDirection::Out); + last_arrow->GetPort(port_index).Attach(pool); + } + + // ----------------------------- + // Phase 3: Traverse event hierarchy and attach levels accordingly + // ----------------------------- + // Because we haven't fully implemented the event hierarchy yet, let's just go with the fully connected graph + + for (auto outer_level : levels_present) { + for (auto inner_level : levels_present) { + if (outer_level != inner_level) { + ConnectPool(outer_level, inner_level); + } + } + } } From 80e1837626debdc328c24dc58162a74578fee94a Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 13:38:04 -0400 Subject: [PATCH 10/22] Test cases pass again --- src/libraries/JANA/Topology/JArrow.h | 4 ++ .../JANA/Topology/JTopologyBuilder.cc | 39 ++++++++++++- .../JANA/Topology/JTopologyBuilder.h | 3 +- .../Engine/JExecutionEngineTests.cc | 2 +- .../Topology/MultiLevelTopologyTests.cc | 57 +++++++++++++------ 5 files changed, 84 insertions(+), 21 deletions(-) diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index 1829d9a63..fcf00d973 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -73,6 +73,7 @@ class JArrow { private: std::string m_name; // Used for human understanding + int m_id; // Used internally bool m_is_parallel = false; // Whether or not it is safe to parallelize bool m_is_source = false; // Whether or not this arrow should activate/drain the topology bool m_is_sink = false; // Whether or not tnis arrow contributes to the final event count @@ -107,6 +108,7 @@ class JArrow { const std::string& GetName() { return m_name; } + int GetId() { return m_id; } JLogger& GetLogger() { return m_logger; } bool IsParallel() { return m_is_parallel; } bool IsSource() { return m_is_source; } @@ -114,6 +116,7 @@ class JArrow { int GetNextPortIndex() { return m_next_input_port; } void SetName(std::string name) { m_name = name; } + void SetId(int id) { m_id = id; } void SetLogger(JLogger logger) { m_logger = logger; } void SetIsParallel(bool is_parallel) { m_is_parallel = is_parallel; } void SetIsSource(bool is_source) { m_is_source = is_source; } @@ -121,6 +124,7 @@ class JArrow { Port& AddPort(std::string port_name, JEventLevel level, PortDirection direction); Port& GetPort(size_t port_index) { return *m_ports.at(port_index); } + Port& GetPort(JEventLevel level, PortDirection direction) { return *m_ports.at(m_auto_port_lookup.at({level, direction})); } int GetPortIndex(JEventLevel level, PortDirection direction); int GetPortIndex(const std::string& port_name); diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 38f27d5b2..c99e11aa2 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -37,6 +37,7 @@ JTopologyBuilder::~JTopologyBuilder() { void JTopologyBuilder::AddArrow(JArrow* arrow) { arrows.push_back(arrow); + arrow->SetId(arrows.size()-1); auto it = arrow_lookup.find(arrow->GetName()); if (it != arrow_lookup.end()) { throw JException("AddArrow(): Arrow with name '%s' has already been added", arrow->GetName().c_str()); @@ -44,6 +45,13 @@ void JTopologyBuilder::AddArrow(JArrow* arrow) { arrow_lookup[arrow->GetName()] = arrow; } +JArrow* JTopologyBuilder::GetArrow(const std::string& arrow_name) { + auto it = arrow_lookup.find(arrow_name); + if (it == arrow_lookup.end()) { + return nullptr; + } + return it->second; +} JEventPool* JTopologyBuilder::GetOrCreatePool(JEventLevel level) { auto pool_it = pool_lookup.find(level); @@ -193,7 +201,7 @@ void JTopologyBuilder::CreateTopologyFromScratch() { // Phase 1: Iterate over all components, adding the corresponding arrows to the grid // ----------------------------- - int map_counter = 0; + int map_counter = 1; std::set levels_present; // Place all sources on grid @@ -208,9 +216,25 @@ void JTopologyBuilder::CreateTopologyFromScratch() { for (auto& it : sources) { auto level = it.first; auto level_str = toString(level); + bool need_map = false; + for (auto* source : it.second) { + if (source->IsProcessParallelEnabled()) { + need_map = true; + } + } auto* src_arrow = new JSourceArrow(level_str+"Source", level, it.second); AddArrow(src_arrow); - grid[{level, Column::Source}] = {src_arrow, src_arrow}; + + if (need_map) { + auto* map_arrow = new JMapArrow(toString(level)+"Map"+std::to_string(map_counter++), level); + map_arrow->SetParallelSource(true); + AddArrow(map_arrow); + Connect(src_arrow, 1, map_arrow, 0); + grid[{level, Column::Source}] = {src_arrow, map_arrow}; + } + else { + grid[{level, Column::Source}] = {src_arrow, src_arrow}; + } } // Place all unfolders on grid @@ -233,6 +257,12 @@ void JTopologyBuilder::CreateTopologyFromScratch() { AddArrow(unfold_arrow); Connect(map_arrow, map_arrow->EVENT_OUT, unfold_arrow, unfold_arrow->PARENT_IN); + if (grid.find({parent_level, Column::UnfoldBelow}) != grid.end()) { + throw JException("Only one unfolder allowed for parent level=%s", toString(parent_level)); + } + if (grid.find({child_level, Column::UnfoldAbove}) != grid.end()) { + throw JException("Only one unfolder allowed for child level=%s", toString(child_level)); + } grid[{parent_level, Column::UnfoldBelow}] = {map_arrow, unfold_arrow}; grid[{child_level, Column::UnfoldAbove}] = {unfold_arrow, unfold_arrow}; } @@ -305,6 +335,9 @@ void JTopologyBuilder::CreateTopologyFromScratch() { } // Connect last_arrow to pool auto port_index = last_arrow->GetPortIndex(level, JArrow::PortDirection::Out); + if (level == JEventLevel::PhysicsEvent) { + last_arrow->SetIsSink(true); + } last_arrow->GetPort(port_index).Attach(pool); } @@ -454,7 +487,7 @@ std::pair JTopologyBuilder::CreateTapChain(std::vectorGetLevel()); current->AddProcessor(proc); - arrows.push_back(current); + AddArrow(current); if (first == nullptr) { first = current; } diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index 6e4b6095b..88bbe0ef1 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -53,6 +53,8 @@ class JTopologyBuilder : public JService { void Init() override; void AddArrow(JArrow* arrow); + JArrow* GetArrow(const std::string& arrow_name); + JEventPool* GetOrCreatePool(JEventLevel level); void ConnectQueue(std::string upstream_arrow_name, std::string upstream_port_name, std::string downstream_arrow_name, std::string downstream_port_name); @@ -81,7 +83,6 @@ class JTopologyBuilder : public JService { void ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port_id, std::vector> downstreams); void Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id); std::pair CreateTapChain(std::vector& procs, std::string name); - JEventPool* GetOrCreatePool(JEventLevel level); }; diff --git a/src/programs/unit_tests/Engine/JExecutionEngineTests.cc b/src/programs/unit_tests/Engine/JExecutionEngineTests.cc index 1f0585cce..1d0660c69 100644 --- a/src/programs/unit_tests/Engine/JExecutionEngineTests.cc +++ b/src/programs/unit_tests/Engine/JExecutionEngineTests.cc @@ -138,7 +138,7 @@ TEST_CASE("JExecutionEngine_ExternalWorkers") { sut->ExchangeTask(task, worker.worker_id); REQUIRE(task.arrow != nullptr); - REQUIRE(task.arrow->GetName() == "PhysicsEventMap2"); + REQUIRE(task.arrow->GetName() == "PhysicsEventMap1"); REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining); REQUIRE(sut->GetPerf().event_count == 0); diff --git a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc index c6bc8af5d..d949ec9c5 100644 --- a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc +++ b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc @@ -1,6 +1,7 @@ #include "MultiLevelTopologyTests.h" #include "JANA/Engine/JExecutionEngine.h" #include "JANA/JApplicationFwd.h" +#include "JANA/JEvent.h" #include "JANA/JException.h" #include "JANA/Topology/JArrow.h" #include "JANA/Topology/JTopologyBuilder.h" @@ -31,38 +32,62 @@ TEST_CASE("TimeslicesTests_FineGrained") { app.Initialize(); auto ee = app.GetService(); auto top = app.GetService(); - enum ArrowId {TS_SRC=0, TS_MAP=1, TS_UNF=2, PH_MAP=3, PH_TAP=4}; + + auto src_arrow = top->GetArrow("TimesliceSource"); + auto ts_map_arrow = top->GetArrow("TimesliceMap1"); + auto unfold_arrow = top->GetArrow("PhysicsEventUnfold"); + auto pe_map_arrow = top->GetArrow("PhysicsEventMap2"); + auto pe_tap_arrow = top->GetArrow("PhysicsEventTap"); + + auto ts_pool = top->GetOrCreatePool(JEventLevel::Timeslice); + auto pe_pool = top->GetOrCreatePool(JEventLevel::PhysicsEvent); + + auto ts_map_queue = ts_map_arrow->GetPort(0).GetQueue(); + auto unfold_queue = unfold_arrow->GetPort(0).GetQueue(); + auto pe_map_queue = pe_map_arrow->GetPort(0).GetQueue(); + auto pe_tap_queue = pe_tap_arrow->GetPort(0).GetQueue(); + + // Test connectivity + REQUIRE(src_arrow->GetPort(0).GetPool() == ts_pool); + REQUIRE(src_arrow->GetPort(1).GetQueue() == ts_map_queue); + REQUIRE(ts_map_arrow->GetPort(1).GetQueue() == unfold_queue); + REQUIRE(unfold_queue == unfold_arrow->GetPort(JEventLevel::Timeslice, JArrow::PortDirection::In).GetQueue()); + REQUIRE(pe_pool == unfold_arrow->GetPort(JEventLevel::PhysicsEvent, JArrow::PortDirection::In).GetPool()); + REQUIRE(unfold_arrow->GetPort(JEventLevel::Timeslice, JArrow::PortDirection::Out).GetPool() == ts_pool); + REQUIRE(unfold_arrow->GetPort(JEventLevel::PhysicsEvent, JArrow::PortDirection::Out).GetQueue() == pe_map_queue); + REQUIRE(pe_map_arrow->GetPort(1).GetQueue() == pe_tap_queue); + REQUIRE(pe_tap_arrow->GetPort(1).GetPool() == pe_pool); + JArrow::FireResult result = JArrow::FireResult::NotRunYet; - result = ee->Fire(TS_SRC, 0); + result = ee->Fire(src_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->GetArrows()[TS_SRC]->GetPort(1).GetQueue() == top->GetQueues()[0]); - REQUIRE(top->GetPools()[0]->GetCapacity() == 2); - REQUIRE(top->GetPools()[0]->GetSize(0) == 1); - REQUIRE(top->GetQueues()[0]->GetSize(0) == 1); + REQUIRE(ts_pool->GetCapacity() == 2); + REQUIRE(ts_pool->GetSize(0) == 1); + REQUIRE(ts_map_queue->GetSize(0) == 1); - result = ee->Fire(TS_MAP, 0); + result = ee->Fire(ts_map_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->GetQueues()[0]->GetSize(0) == 0); - REQUIRE(top->GetQueues()[1]->GetSize(0) == 1); - + REQUIRE(ts_map_queue->GetSize(0) == 0); + REQUIRE(unfold_queue->GetSize(0) == 1); + // Parent - result = ee->Fire(TS_UNF, 0); + result = ee->Fire(unfold_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); // Child - result = ee->Fire(TS_UNF, 0); + result = ee->Fire(unfold_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - result = ee->Fire(PH_MAP, 0); + result = ee->Fire(pe_map_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - result = ee->Fire(PH_TAP, 0); + result = ee->Fire(pe_tap_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->GetPools()[0]->GetSize(0) == 1); // Unfolder still has parent - REQUIRE(top->GetPools()[1]->GetSize(0) == 4); // Child returned to pool + REQUIRE(ts_pool->GetSize(0) == 1); // Unfolder still has parent + REQUIRE(pe_pool->GetSize(0) == 4); // Child returned to pool } From dc2f38d5d7012e1afc024d84a5e76ae381e36c7a Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 13:53:26 -0400 Subject: [PATCH 11/22] Remove JTopologyBuilder::AttachLevel Replaced by CreateTopologyFromScratch() which is capable of working with multilevel sources/folders/unfolders --- .../JANA/Topology/JTopologyBuilder.cc | 243 ------------------ .../JANA/Topology/JTopologyBuilder.h | 2 - 2 files changed, 245 deletions(-) diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index c99e11aa2..dc6cf6734 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -159,7 +159,6 @@ void JTopologyBuilder::CreateTopology() { } else { CreateTopologyFromScratch(); - //AttachLevel(JEventLevel::Run, nullptr, nullptr, nullptr); LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END; } for (auto* arrow : arrows) { @@ -465,15 +464,6 @@ void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow } -void JTopologyBuilder::ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port, std::vector> downstreams) { - for (auto& [downstream, downstream_port_id] : downstreams) { - if (downstream != nullptr) { - Connect(upstream, upstream_port, downstream, downstream_port_id); - return; - } - } -} - std::pair JTopologyBuilder::CreateTapChain(std::vector& procs, std::string level) { JTapArrow* first = nullptr; @@ -500,236 +490,3 @@ std::pair JTopologyBuilder::CreateTapChain(std::vector sources_at_level; - for (JEventSource* source : m_components->get_evt_srces()) { - if (source->GetLevel() == current_level && source->IsEnabled()) { - sources_at_level.push_back(source); - } - } - - // Find all unfolders at this level - std::vector unfolders_at_level; - for (JEventUnfolder* unfolder : m_components->get_unfolders()) { - if (unfolder->GetLevel() == current_level && unfolder->IsEnabled()) { - unfolders_at_level.push_back(unfolder); - } - } - - // Find all processors at this level - std::vector mappable_procs_at_level; - std::vector tappable_procs_at_level; - - for (JEventProcessor* proc : m_components->get_evt_procs()) { - - if (proc->GetLevel() == current_level && proc->IsEnabled()) { - - // This may be a weird place to do it, but let's quickly validate that users aren't - // trying to enable ordering on a legacy event processor. We don't do this in the constructor - // because we don't want to put constraints on the order in which setters can be called, apart from "before Init()" - - if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) { - throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str()); - } - - mappable_procs_at_level.push_back(proc); - if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) { - tappable_procs_at_level.push_back(proc); - } - } - } - - - bool is_top_level = (parent_unfolder == nullptr); - if (is_top_level && sources_at_level.size() == 0) { - // Skip level entirely when no source is present. - LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END; - JEventLevel next = next_level(current_level); - if (next == JEventLevel::None) { - LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END; - return; - } - return AttachLevel(next, nullptr, nullptr, nullptr); - } - - // Enforce constraints on what our builder will accept (at least for now) - if (!is_top_level && !sources_at_level.empty()) { - throw JException("Topology forbids event sources at lower event levels in the topology"); - } - if (unfolders_at_level.size() > 1) { - throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str()); - } - // Another constraint is that the highest level of the topology has an event sources, but this is automatically handled by - // the level-skipping logic above - - - // Fill out arrow grid from components at this event level - // -------------------------- - // 0. Pool - // -------------------------- - LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events[current_level]; - JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events[current_level], m_location_count, current_level); - pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology - if (parent_pool != nullptr) { - LOG_DEBUG(GetLogger()) << "Attaching " << current_level << " pool to parent pool"; - pool_at_level->AttachForwardingPool(parent_pool); - } - LOG_INFO(GetLogger()) << "Finished creating event pool"; - - // -------------------------- - // 1. Source - // -------------------------- - JSourceArrow* src_arrow = nullptr; - bool need_source = !sources_at_level.empty(); - if (need_source) { - src_arrow = new JSourceArrow(level_str+"Source", current_level, sources_at_level); - src_arrow->GetPort(JSourceArrow::EVENT_IN).Attach(pool_at_level); - src_arrow->GetPort(JSourceArrow::EVENT_OUT).Attach(pool_at_level); - arrows.push_back(src_arrow); - } - - // -------------------------- - // 2. Map1 - // -------------------------- - bool have_parallel_sources = false; - for (JEventSource* source: sources_at_level) { - have_parallel_sources |= source->IsProcessParallelEnabled(); - } - bool have_unfolder = !unfolders_at_level.empty(); - JMapArrow* map1_arrow = nullptr; - bool need_map1 = (have_parallel_sources || have_unfolder); - - if (need_map1) { - map1_arrow = new JMapArrow(level_str+"Map1", current_level); - for (JEventSource* source: sources_at_level) { - if (source->IsProcessParallelEnabled()) { - map1_arrow->SetParallelSource(true); - } - } - for (JEventUnfolder* unf: unfolders_at_level) { - map1_arrow->AddUnfolder(unf); - } - map1_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level); - map1_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level); - arrows.push_back(map1_arrow); - } - - // -------------------------- - // 3. Unfold - // -------------------------- - JUnfoldArrow* unfold_arrow = nullptr; - bool need_unfold = have_unfolder; - if (need_unfold) { - unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]); - unfold_arrow->GetPort(JUnfoldArrow::PARENT_IN).Attach(pool_at_level); - unfold_arrow->GetPort(JUnfoldArrow::PARENT_OUT).Attach(pool_at_level); - arrows.push_back(unfold_arrow); - } - - // -------------------------- - // 4. Fold - // -------------------------- - JFoldArrow* fold_arrow = nullptr; - bool need_fold = false; // No folders_at_level for now - if(need_fold) { - fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel()); - arrows.push_back(fold_arrow); - fold_arrow->GetPort(JFoldArrow::PARENT_OUT).Attach(pool_at_level); - } - - // -------------------------- - // 5. Map2 - // -------------------------- - JMapArrow* map2_arrow = nullptr; - bool need_map2 = !mappable_procs_at_level.empty(); - if (need_map2) { - map2_arrow = new JMapArrow(level_str+"Map2", current_level); - for (JEventProcessor* proc : mappable_procs_at_level) { - map2_arrow->AddProcessor(proc); - map2_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level); - map2_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level); - } - arrows.push_back(map2_arrow); - } - - // -------------------------- - // 6. Tap - // -------------------------- - JTapArrow* first_tap_arrow = nullptr; - JTapArrow* last_tap_arrow = nullptr; - bool need_tap = !tappable_procs_at_level.empty(); - if (need_tap) { - std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(tappable_procs_at_level, level_str); - first_tap_arrow->GetPort(JTapArrow::EVENT_IN).Attach(pool_at_level); - last_tap_arrow->GetPort(JTapArrow::EVENT_OUT).Attach(pool_at_level); - } - - - // Now that we've set up our component grid, we can do wiring! - // -------------------------- - // 1. Source - // -------------------------- - if (parent_unfolder != nullptr) { - parent_unfolder->GetPort(JUnfoldArrow::CHILD_IN).Attach(pool_at_level); - parent_unfolder->GetPort(JUnfoldArrow::CHILD_OUT).Attach(pool_at_level); - ConnectToFirstAvailable(parent_unfolder, JUnfoldArrow::CHILD_OUT, - {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (src_arrow != nullptr) { - ConnectToFirstAvailable(src_arrow, JSourceArrow::EVENT_OUT, - {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (map1_arrow != nullptr) { - ConnectToFirstAvailable(map1_arrow, JMapArrow::EVENT_OUT, - {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (unfold_arrow != nullptr) { - ConnectToFirstAvailable(unfold_arrow, JUnfoldArrow::PARENT_OUT, - {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (fold_arrow != nullptr) { - ConnectToFirstAvailable(fold_arrow, JFoldArrow::PARENT_OUT, - {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (map2_arrow != nullptr) { - ConnectToFirstAvailable(map2_arrow, JMapArrow::EVENT_OUT, - {{first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (last_tap_arrow != nullptr) { - ConnectToFirstAvailable(last_tap_arrow, JTapArrow::EVENT_OUT, - {{parent_folder, JFoldArrow::CHILD_IN}}); - } - if (parent_folder != nullptr) { - parent_folder->GetPort(JFoldArrow::CHILD_OUT).Attach(pool_at_level); - } - - // Finally, we recur over lower levels! - if (need_unfold) { - auto next_level = unfolders_at_level[0]->GetChildLevel(); - AttachLevel(next_level, unfold_arrow, fold_arrow, pool_at_level); - } - else { - // This is the lowest level - // TODO: Improve logic for determining event counts for multilevel topologies - if (last_tap_arrow != nullptr) { - last_tap_arrow->SetIsSink(true); - } - else if (map2_arrow != nullptr) { - map2_arrow->SetIsSink(true); - } - else if (map1_arrow != nullptr) { - map1_arrow->SetIsSink(true); - } - else if (src_arrow != nullptr) { - src_arrow->SetIsSink(true); - } - } -} - - - diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index 88bbe0ef1..ccfa1bc5f 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -79,8 +79,6 @@ class JTopologyBuilder : public JService { const JProcessorMapping& GetProcessorMapping() { return mapping; }; private: - void AttachLevel(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder, JEventPool* parent_pool); - void ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port_id, std::vector> downstreams); void Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id); std::pair CreateTapChain(std::vector& procs, std::string name); }; From d3393c5eae4b32a869982056253604602df9cee7 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 14:51:36 -0400 Subject: [PATCH 12/22] Print wiring service status after JANA splash screen --- src/libraries/JANA/JApplication.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index a4a83ec1b..77e9cb457 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -115,10 +115,6 @@ void JApplication::Initialize() { // We trigger initialization m_service_locator->get(); - auto component_manager = m_service_locator->get(); - auto plugin_loader = m_service_locator->get(); - auto topology_builder = m_service_locator->get(); - auto wiring_service = m_service_locator->get(); // Set logger on JApplication itself m_logger = m_params->GetLogger("jana"); @@ -136,6 +132,10 @@ void JApplication::Initialize() { JVersion::PrintVersionDescription(oss); LOG_INFO(m_logger) << oss.str() << LOG_END; } + auto component_manager = m_service_locator->get(); + auto plugin_loader = m_service_locator->get(); + auto topology_builder = m_service_locator->get(); + auto wiring_service = m_service_locator->get(); // Attach all plugins for (const auto& plugin_name : wiring_service->GetPluginNames()) { From a3c5e3be95a0219dbcbb5430bc473688902464e2 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 14:52:23 -0400 Subject: [PATCH 13/22] Add "Order" column to topology table --- .../JANA/Topology/JTopologyBuilder.cc | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index dc6cf6734..d407e10f1 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -101,6 +101,7 @@ std::string JTopologyBuilder::PrintTopology() { t.AddColumn("Port", JTablePrinter::Justify::Left, 0); t.AddColumn("Place", JTablePrinter::Justify::Left, 0); t.AddColumn("ID", JTablePrinter::Justify::Left, 0); + t.AddColumn("Order", JTablePrinter::Justify::Left, 0); // Build index lookup for queues int i = 0; @@ -135,6 +136,18 @@ std::string JTopologyBuilder::PrintTopology() { t | port->GetName(); t | ((port->GetQueue() != nullptr) ? "Queue ": "Pool"); t | place_index; + if (port->GetEnforcesOrdering() && port->GetEstablishesOrdering()) { + t | "Both"; + } + else if (port->GetEnforcesOrdering()) { + t | "Enf"; + } + else if (port->GetEstablishesOrdering()) { + t | "Est"; + } + else { + t | ""; + } } } return t.Render(); @@ -155,11 +168,10 @@ void JTopologyBuilder::CreateTopology() { if (m_configure_topology) { m_configure_topology(*this, *m_components); - LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << PrintTopology() << LOG_END; + LOG_INFO(GetLogger()) << "Using custom topology configuration function" << LOG_END; } else { CreateTopologyFromScratch(); - LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END; } for (auto* arrow : arrows) { arrow->SetLogger(GetLogger()); @@ -179,10 +191,7 @@ void JTopologyBuilder::CreateTopology() { queue->SetEstablishesOrdering(false); } } - size_t i = 0; - for (auto* queue: queues) { - LOG_DEBUG(GetLogger()) << "Queue " << i++ << ": establishes_ordering: " << queue->GetEstablishesOrdering() << ", enforces_ordering: " << queue->GetEnforcesOrdering(); - } + LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END; } void JTopologyBuilder::CreateTopologyFromScratch() { @@ -470,8 +479,8 @@ std::pair JTopologyBuilder::CreateTapChain(std::vector 1) { arrow_name += std::to_string(i++); } From d72efa0e36d4b616cbfd94b2dd787e438fa39d5c Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 16:12:24 -0400 Subject: [PATCH 14/22] Update JMultilevelSourceArrow to use new Port machinery --- .../JANA/Topology/JMultilevelSourceArrow.cc | 30 +++++++------------ .../JANA/Topology/JMultilevelSourceArrow.h | 12 +------- 2 files changed, 11 insertions(+), 31 deletions(-) diff --git a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc index 785abaeab..6be36f353 100644 --- a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc +++ b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc @@ -4,23 +4,17 @@ #include -void JMultilevelSourceArrow::SetEventSource(JEventSource* source) { +JMultilevelSourceArrow::JMultilevelSourceArrow(const std::string& name, JEventSource* source) { + SetIsSource(true); + SetName(name); m_source = source; m_levels = source->GetParentLevels(); m_child_event_level = source->GetLevel(); m_levels.push_back(m_child_event_level); - m_next_input_port = 0; - - size_t input_port_count = 0; - size_t output_port_count = 0; for (auto level : m_levels) { AddPort(toString(level) + "In", level, PortDirection::In).SetSkipFinishEvent(true); - m_port_lookup[{level, Direction::In}] = input_port_count++; - } - for (auto level : m_levels) { AddPort(toString(level) + "Out", level, PortDirection::Out); - m_port_lookup[{level, Direction::Out}] = input_port_count + output_port_count++; } } @@ -28,10 +22,6 @@ const std::vector& JMultilevelSourceArrow::GetLevels() const { return m_levels; } -size_t JMultilevelSourceArrow::GetPortIndex(JEventLevel level, Direction direction) const { - return m_port_lookup.at({level, direction}); -}; - void JMultilevelSourceArrow::Initialize() { // We initialize everything immediately, but don't open any resources until we absolutely have to; see process(): source->DoNext() m_source->DoInit(); @@ -53,7 +43,7 @@ void JMultilevelSourceArrow::EvictNextParent(OutputData& outputs, size_t& output if (it != m_pending_parents.end()) { if (it->second.first != nullptr) { // There IS an old parent - size_t parent_output_port = GetPortIndex(m_next_input_level, Direction::Out); + size_t parent_output_port = GetPortIndex(m_next_input_level, PortDirection::Out); LOG_DEBUG(GetLogger()) << "JMultilevelSourceArrow: Evicting parent " << it->second.first->GetEventStamp() << " to port " << parent_output_port; outputs.at(output_count++) = {it->second.first, parent_output_port}; it->second.first = nullptr; @@ -68,7 +58,7 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << LOG_END; auto result = m_source->DoNext(input->shared_from_this()); m_next_input_level = m_source->GetNextInputLevel(); - m_next_input_port = GetPortIndex(m_next_input_level, Direction::In); + m_next_input_port = GetPortIndex(m_next_input_level, PortDirection::In); LOG_DEBUG(GetLogger()) << "JMultilevelSourceArrow: Returned from DoNext(" << toString(input->GetLevel()) << "). Next input level is " << toString(m_next_input_level); @@ -86,7 +76,7 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou input->SetParent(parent_pair.first); } } - outputs.at(output_count++) = {input, GetPortIndex(m_child_event_level, Direction::Out)}; + outputs.at(output_count++) = {input, GetPortIndex(m_child_event_level, PortDirection::Out)}; if (m_next_input_level != m_child_event_level) { // We have to evict the parent AFTER the successful child because the child still needs the references to that parent @@ -126,7 +116,7 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou EvictNextParent(outputs, output_count); } // Return this event to the pool with no further action - outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)}; + outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), PortDirection::In)}; status = JArrow::FireResult::ComeBackLater; return; } @@ -135,14 +125,14 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou EvictNextParent(outputs, output_count); } // Return this input event to the pool - outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)}; + outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), PortDirection::In)}; status = JArrow::FireResult::KeepGoing; return; } else if (result == JEventSource::Result::FailureFinished) { // Return this input event to the pool - outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)}; + outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), PortDirection::In)}; m_finish_in_progress = true; // Fall-through to if (finish_in_progress) below } @@ -157,7 +147,7 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou // Found a parent auto parent = it->second.first; if (parent != nullptr) { - outputs.at(output_count++) = {parent, GetPortIndex(parent->GetLevel(), Direction::Out)}; + outputs.at(output_count++) = {parent, GetPortIndex(parent->GetLevel(), PortDirection::Out)}; } m_pending_parents.erase(it); } diff --git a/src/libraries/JANA/Topology/JMultilevelSourceArrow.h b/src/libraries/JANA/Topology/JMultilevelSourceArrow.h index 7f2bbb509..c2d828c1d 100644 --- a/src/libraries/JANA/Topology/JMultilevelSourceArrow.h +++ b/src/libraries/JANA/Topology/JMultilevelSourceArrow.h @@ -3,14 +3,10 @@ class JMultilevelSourceArrow : public JArrow { -public: - enum class Direction { In, Out }; private: JEventSource* m_source = nullptr; - std::vector m_levels; - std::map, size_t> m_port_lookup; JEventLevel m_child_event_level = JEventLevel::None; JEventLevel m_next_input_level; @@ -21,14 +17,8 @@ class JMultilevelSourceArrow : public JArrow { void EvictNextParent(OutputData& outputs, size_t& output_count); public: - JMultilevelSourceArrow() { - SetIsSource(true); - } - + JMultilevelSourceArrow(const std::string& arrow_name, JEventSource* source); const std::vector& GetLevels() const; - size_t GetPortIndex(JEventLevel level, Direction direction) const; - - void SetEventSource(JEventSource* source); void Initialize() override; void Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override; From f1b5f53cb414f4ae4be62e6073caca23a261d0d2 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 17:04:35 -0400 Subject: [PATCH 15/22] JTopologyBuilder autowires multi event sources --- src/libraries/JANA/JEventSource.cc | 7 +--- .../JANA/Topology/JTopologyBuilder.cc | 34 +++++++++++++------ .../Topology/JTopologyBuilderTests.cc | 4 +-- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/src/libraries/JANA/JEventSource.cc b/src/libraries/JANA/JEventSource.cc index bd5429982..f3790fd35 100644 --- a/src/libraries/JANA/JEventSource.cc +++ b/src/libraries/JANA/JEventSource.cc @@ -175,13 +175,8 @@ JEventSource::Result JEventSource::DoNext(std::shared_ptr event) { DoClose(false); return Result::FailureFinished; } - else if (result == Result::FailureTryAgain) { - // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet - // or if we polled the socket, found no new messages, but still expect messages later - return Result::FailureTryAgain; - } else { - throw JException("Invalid JEventSource::Result value!"); + return result; } } else { // status == Closed diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index d407e10f1..1b5773e62 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -10,10 +10,10 @@ #include "JANA/Topology/JArrow.h" #include "JANA/Utils/JEventLevel.h" #include "JSourceArrow.h" +#include "JMultilevelSourceArrow.h" #include "JMapArrow.h" #include "JTapArrow.h" #include "JUnfoldArrow.h" -#include "JFoldArrow.h" #include #include #include @@ -106,15 +106,15 @@ std::string JTopologyBuilder::PrintTopology() { // Build index lookup for queues int i = 0; std::map lookup; - for (JEventQueue* queue : queues) { - lookup[queue] = i; - i += 1; - } // Build index lookup for pools for (JEventPool* pool : pools) { lookup[pool] = i; i += 1; } + for (JEventQueue* queue : queues) { + lookup[queue] = i; + i += 1; + } // Build table bool show_row = true; @@ -225,12 +225,28 @@ void JTopologyBuilder::CreateTopologyFromScratch() { auto level = it.first; auto level_str = toString(level); bool need_map = false; + bool need_multi_arrow = false; for (auto* source : it.second) { - if (source->IsProcessParallelEnabled()) { - need_map = true; + need_map |= source->IsProcessParallelEnabled(); + need_multi_arrow |= (source->GetParentLevels().size() > 0); + } + + if (need_multi_arrow && sources.size() > 1) { + throw JException("Multiple multilevel JEventSources not supported yet"); + } + + JArrow* src_arrow; + if (need_multi_arrow) { + src_arrow = new JMultilevelSourceArrow(level_str+"MultiSource", it.second.at(0)); + // Add parent levels now. Child level is added further below. + for (auto parent_level: it.second.at(0)->GetParentLevels()) { + levels_present.insert(parent_level); + grid[{parent_level, Column::Source}] = {src_arrow, src_arrow}; } } - auto* src_arrow = new JSourceArrow(level_str+"Source", level, it.second); + else { + src_arrow = new JSourceArrow(level_str+"Source", level, it.second); + } AddArrow(src_arrow); if (need_map) { @@ -460,8 +476,6 @@ void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow downstream_port.Attach(queue); } - - upstream_port.Attach(queue); if (downstream_port.GetEnforcesOrdering()) { diff --git a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc index 7ffaa87a5..e50b3324b 100644 --- a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc +++ b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc @@ -42,9 +42,7 @@ class MyMultiSource : public JEventSource { void configure_multisource_topology(JTopologyBuilder& builder, JComponentManager& components) { - auto* src_arrow = new JMultilevelSourceArrow; - src_arrow->SetName("src"); - src_arrow->SetEventSource(components.get_evt_srces().at(0)); + auto* src_arrow = new JMultilevelSourceArrow("src", components.get_evt_srces().at(0)); JTapArrow* tap_arrow = new JTapArrow("tap"); for (auto proc : components.get_evt_procs()) { From aa52d9a88d7af3afefd5f9533b52908ea588c3fc Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 19:10:40 -0400 Subject: [PATCH 16/22] JTopologyBuilder autowires event folders --- src/libraries/JANA/JApplication.cc | 5 +++ src/libraries/JANA/JApplicationFwd.h | 2 ++ .../JANA/Services/JComponentManager.cc | 18 +++++++++++ .../JANA/Services/JComponentManager.h | 20 +++++++----- src/libraries/JANA/Topology/JMapArrow.cc | 4 +++ src/libraries/JANA/Topology/JMapArrow.h | 3 ++ .../JANA/Topology/JTopologyBuilder.cc | 32 +++++++++++++++++++ 7 files changed, 76 insertions(+), 8 deletions(-) diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index 77e9cb457..bdc20a02f 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -100,6 +100,11 @@ void JApplication::Add(JEventUnfolder* unfolder) { m_component_manager->add(unfolder); } +void JApplication::Add(JEventFolder* folder) { + /// Adds the given JEventFolder to the JANA context. Ownership is passed to JComponentManager. + m_component_manager->add(folder); +} + void JApplication::Initialize() { diff --git a/src/libraries/JANA/JApplicationFwd.h b/src/libraries/JANA/JApplicationFwd.h index 6cac6e403..c2d93f62d 100644 --- a/src/libraries/JANA/JApplicationFwd.h +++ b/src/libraries/JANA/JApplicationFwd.h @@ -19,6 +19,7 @@ class JComponentManager; class JPluginLoader; class JExecutionEngine; class JEventUnfolder; +class JEventFolder; class JServiceLocator; class JParameter; class JParameterManager; @@ -66,6 +67,7 @@ class JApplication { void Add(JEventSource* event_source); void Add(JEventProcessor* processor); void Add(JEventUnfolder* unfolder); + void Add(JEventFolder* folder); // Controlling processing diff --git a/src/libraries/JANA/Services/JComponentManager.cc b/src/libraries/JANA/Services/JComponentManager.cc index 21c0b774e..588c45fbe 100644 --- a/src/libraries/JANA/Services/JComponentManager.cc +++ b/src/libraries/JANA/Services/JComponentManager.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include JComponentManager::JComponentManager() { @@ -31,6 +32,10 @@ JComponentManager::~JComponentManager() { LOG_TRACE(GetLogger()) << "Destroying unfolder with type=" << unfolder->GetTypeName(); delete unfolder; } + for (auto* folder : m_folders) { + LOG_TRACE(GetLogger()) << "Destroying folder with type=" << folder->GetTypeName(); + delete folder; + } // The order of deletion here sadly matters, because GlueX likes to fill // histograms inside component destructors. Thus event processors must be destroyed after @@ -104,6 +109,9 @@ void JComponentManager::preinitialize_components() { for (auto* unfolder : m_unfolders) { unfolder->Wire(GetApplication()); } + for (auto* folder : m_folders) { + folder->Wire(GetApplication()); + } } void JComponentManager::initialize_components() { @@ -128,6 +136,11 @@ void JComponentManager::initialize_components() { unfolder->Summarize(m_summary); } + // Unfolders + for (auto * folder : m_folders) { + folder->Summarize(m_summary); + } + JFactorySet dummy_fac_set; for (auto* fac_gen : m_fac_gens) { // TODO: Get rid of this @@ -186,6 +199,11 @@ void JComponentManager::add(JEventUnfolder* unfolder) { m_unfolders.push_back(unfolder); } +void JComponentManager::add(JEventFolder* folder) { + folder->SetPluginName(m_current_plugin_name); + m_folders.push_back(folder); +} + void JComponentManager::configure_event(JEvent& event) { auto* factory_set = event.GetFactorySet(); for (auto gen : m_fac_gens) { diff --git a/src/libraries/JANA/Services/JComponentManager.h b/src/libraries/JANA/Services/JComponentManager.h index 3f6a501e7..99bd1a958 100644 --- a/src/libraries/JANA/Services/JComponentManager.h +++ b/src/libraries/JANA/Services/JComponentManager.h @@ -14,6 +14,7 @@ class JEventProcessor; class JEventUnfolder; +class JEventFolder; class JComponentManager : public JService { public: @@ -23,17 +24,18 @@ class JComponentManager : public JService { void Init() override; // Called during plugin loading - void next_plugin(std::string plugin_name); + void NextPlugin(std::string plugin_name); - void add(std::string event_source_name); - void add(JEventSourceGenerator* source_generator); - void add(JFactoryGenerator* factory_generator); - void add(JEventSource* event_source); - void add(JEventProcessor* processor); - void add(JEventUnfolder* unfolder); + void Add(std::string event_source_name); + void Add(JEventSourceGenerator* source_generator); + void Add(JFactoryGenerator* factory_generator); + void Add(JEventSource* event_source); + void Add(JEventProcessor* processor); + void Add(JEventUnfolder* unfolder); + void Add(JEventFolder* folder); // Called after plugin loading - void configure_components(); + void ConfigureComponents(); // Helpers void preinitialize_components(); @@ -50,6 +52,7 @@ class JComponentManager : public JService { std::vector& get_evt_procs(); std::vector& get_fac_gens(); std::vector& get_unfolders(); + std::vector& get_folders(); void configure_event(JEvent& event); @@ -64,6 +67,7 @@ class JComponentManager : public JService { std::vector m_evt_srces; std::vector m_evt_procs; std::vector m_unfolders; + std::vector m_folders; std::map m_default_tags; bool m_enable_call_graph_recording = false; diff --git a/src/libraries/JANA/Topology/JMapArrow.cc b/src/libraries/JANA/Topology/JMapArrow.cc index 6531e8c53..ccfc643d7 100644 --- a/src/libraries/JANA/Topology/JMapArrow.cc +++ b/src/libraries/JANA/Topology/JMapArrow.cc @@ -25,6 +25,10 @@ void JMapArrow::AddUnfolder(JEventUnfolder* unfolder) { m_unfolders.push_back(unfolder); } +void JMapArrow::AddFolder(JEventFolder* folder) { + m_folders.push_back(folder); +} + void JMapArrow::AddProcessor(JEventProcessor* processor) { m_procs.push_back(processor); } diff --git a/src/libraries/JANA/Topology/JMapArrow.h b/src/libraries/JANA/Topology/JMapArrow.h index c64958f07..ea996015f 100644 --- a/src/libraries/JANA/Topology/JMapArrow.h +++ b/src/libraries/JANA/Topology/JMapArrow.h @@ -8,6 +8,7 @@ class JEventPool; class JEventSource; class JEventUnfolder; +class JEventFolder; class JEventProcessor; class JEvent; @@ -20,6 +21,7 @@ class JMapArrow : public JArrow { private: bool m_parallel_source = false; std::vector m_unfolders; + std::vector m_folders; std::vector m_procs; public: @@ -27,6 +29,7 @@ class JMapArrow : public JArrow { void SetParallelSource(bool is_parallel); void AddUnfolder(JEventUnfolder* unfolder); + void AddFolder(JEventFolder* folder); void AddProcessor(JEventProcessor* proc); void Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status); diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 1b5773e62..2449a3e0e 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -14,6 +14,7 @@ #include "JMapArrow.h" #include "JTapArrow.h" #include "JUnfoldArrow.h" +#include "JFoldArrow.h" #include #include #include @@ -291,6 +292,37 @@ void JTopologyBuilder::CreateTopologyFromScratch() { grid[{child_level, Column::UnfoldAbove}] = {unfold_arrow, unfold_arrow}; } + // Place all folders on grid + // ----------------------------- + for (auto* folder: m_components->get_folders()) { + + if (!folder->IsEnabled()) continue; + + // Create unfold arrow + // Publish at _each_ grid location + auto parent_level = folder->GetLevel(); + auto child_level = folder->GetChildLevel(); + levels_present.insert(parent_level); + levels_present.insert(child_level); + + auto* map_arrow = new JMapArrow(toString(child_level)+"Map"+std::to_string(map_counter++), parent_level); + auto* fold_arrow = new JFoldArrow(toString(parent_level)+"Fold", parent_level, child_level); + fold_arrow->SetFolder(folder); + map_arrow->AddFolder(folder); + AddArrow(map_arrow); + AddArrow(fold_arrow); + Connect(map_arrow, map_arrow->EVENT_OUT, fold_arrow, fold_arrow->CHILD_IN); + + if (grid.find({parent_level, Column::FoldBelow}) != grid.end()) { + throw JException("Only one folder allowed for parent level=%s", toString(parent_level)); + } + if (grid.find({child_level, Column::FoldAbove}) != grid.end()) { + throw JException("Only one folder allowed for child level=%s", toString(child_level)); + } + grid[{parent_level, Column::FoldBelow}] = {fold_arrow, fold_arrow}; + grid[{child_level, Column::FoldAbove}] = {map_arrow, fold_arrow}; + } + // Place all processors on grid // ----------------------------- std::map> mappable_processors; From c6755ef4eaa6b2d6b9bf890663bf8987dc9314ce Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 19:29:01 -0400 Subject: [PATCH 17/22] Convert JComponentManager methods to PascalCase --- src/libraries/JANA/JApplication.cc | 18 +++--- src/libraries/JANA/JEvent.cc | 2 +- .../JANA/Services/JComponentManager.cc | 56 ++++++++++--------- .../JANA/Services/JComponentManager.h | 26 ++++----- src/libraries/JANA/Services/JPluginLoader.cc | 2 +- src/libraries/JANA/Topology/JEventPool.cc | 4 +- .../JANA/Topology/JTopologyBuilder.cc | 8 +-- .../integration_tests/BatchedArrow.cc | 4 +- .../integration_tests/SimpleOffloading.cc | 6 +- .../Topology/JTopologyBuilderTests.cc | 4 +- 10 files changed, 67 insertions(+), 63 deletions(-) diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index bdc20a02f..be319d56d 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -71,38 +71,38 @@ void JApplication::AddPluginPath(std::string path) { void JApplication::Add(JEventSource* event_source) { /// Adds the given JEventSource to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(event_source); + m_component_manager->Add(event_source); } void JApplication::Add(JEventSourceGenerator *source_generator) { /// Adds the given JEventSourceGenerator to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(source_generator); + m_component_manager->Add(source_generator); } void JApplication::Add(JFactoryGenerator *factory_generator) { /// Adds the given JFactoryGenerator to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(factory_generator); + m_component_manager->Add(factory_generator); } void JApplication::Add(JEventProcessor* processor) { /// Adds the given JEventProcessor to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(processor); + m_component_manager->Add(processor); } void JApplication::Add(std::string event_source_name) { /// Adds the event source name (e.g. a file or socket name) to the JANA context. JANA will instantiate /// the corresponding JEventSource using a user-provided JEventSourceGenerator. - m_component_manager->add(event_source_name); + m_component_manager->Add(event_source_name); } void JApplication::Add(JEventUnfolder* unfolder) { /// Adds the given JEventUnfolder to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(unfolder); + m_component_manager->Add(unfolder); } void JApplication::Add(JEventFolder* folder) { /// Adds the given JEventFolder to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(folder); + m_component_manager->Add(folder); } @@ -149,7 +149,7 @@ void JApplication::Initialize() { plugin_loader->attach_plugins(component_manager.get()); // Resolve and initialize all components - component_manager->configure_components(); + component_manager->ConfigureComponents(); // Once all components have been wired in jcm::configure_components(), there shouldn't be any unused wirings wiring_service->CheckAllWiringsAreUsed(); @@ -302,7 +302,7 @@ int JApplication::GetExitCode() { const JComponentSummary& JApplication::GetComponentSummary() { /// Returns a data object describing all components currently running - return m_component_manager->get_component_summary(); + return m_component_manager->GetComponentSummary(); } // Performance/status monitoring diff --git a/src/libraries/JANA/JEvent.cc b/src/libraries/JANA/JEvent.cc index 6bd8ebc01..d8086cf83 100644 --- a/src/libraries/JANA/JEvent.cc +++ b/src/libraries/JANA/JEvent.cc @@ -12,7 +12,7 @@ JEvent::JEvent() : mInspector(this){ JEvent::JEvent(JApplication* app) : mInspector(this) { // Furnish the JEvent with the parameter values and factory generators provided to the JApplication app->Initialize(); - app->GetService()->configure_event(*this); + app->GetService()->ConfigureEvent(*this); } JEvent::~JEvent() { diff --git a/src/libraries/JANA/Services/JComponentManager.cc b/src/libraries/JANA/Services/JComponentManager.cc index 588c45fbe..2511367e6 100644 --- a/src/libraries/JANA/Services/JComponentManager.cc +++ b/src/libraries/JANA/Services/JComponentManager.cc @@ -57,7 +57,7 @@ void JComponentManager::Init() { // We handle parameters in configure_components() instead. } -void JComponentManager::configure_components() { +void JComponentManager::ConfigureComponents() { m_params->SetDefaultParameter("event_source_type", m_user_evt_src_typename, "Manually specifies which JEventSource should open the input file"); m_params->SetDefaultParameter("record_call_stack", @@ -81,17 +81,17 @@ void JComponentManager::configure_components() { } // Give all components a JApplication pointer and a logger - preinitialize_components(); + PreinitializeComponents(); // Resolve all event sources now that all plugins have been loaded // (Note that we need to wire the event source generators beforehand) - resolve_event_sources(); + ResolveEventSources(); // Call Summarize() and Init() in order to populate JComponentSummary and JParameterManager, respectively - initialize_components(); + InitializeComponents(); } -void JComponentManager::preinitialize_components() { +void JComponentManager::PreinitializeComponents() { for (auto* src : m_evt_srces) { src->Wire(GetApplication()); } @@ -114,7 +114,7 @@ void JComponentManager::preinitialize_components() { } } -void JComponentManager::initialize_components() { +void JComponentManager::InitializeComponents() { // For now, this only computes the summary for all components except factories. // However, we are likely to eventually want summaries to access information only // available after component initialization, specifically parameters. In this case, @@ -165,46 +165,46 @@ void JComponentManager::initialize_components() { } } -void JComponentManager::next_plugin(std::string plugin_name) { +void JComponentManager::NextPlugin(std::string plugin_name) { // We defer resolving event sources until we have finished loading all plugins m_current_plugin_name = plugin_name; } -void JComponentManager::add(std::string event_source_name) { +void JComponentManager::Add(std::string event_source_name) { m_src_names.push_back(event_source_name); } -void JComponentManager::add(JEventSourceGenerator *source_generator) { +void JComponentManager::Add(JEventSourceGenerator *source_generator) { source_generator->SetPluginName(m_current_plugin_name); m_src_gens.push_back(source_generator); } -void JComponentManager::add(JFactoryGenerator *factory_generator) { +void JComponentManager::Add(JFactoryGenerator *factory_generator) { factory_generator->SetPluginName(m_current_plugin_name); m_fac_gens.push_back(factory_generator); } -void JComponentManager::add(JEventSource *event_source) { +void JComponentManager::Add(JEventSource *event_source) { event_source->SetPluginName(m_current_plugin_name); m_evt_srces.push_back(event_source); } -void JComponentManager::add(JEventProcessor *processor) { +void JComponentManager::Add(JEventProcessor *processor) { processor->SetPluginName(m_current_plugin_name); m_evt_procs.push_back(processor); } -void JComponentManager::add(JEventUnfolder* unfolder) { +void JComponentManager::Add(JEventUnfolder* unfolder) { unfolder->SetPluginName(m_current_plugin_name); m_unfolders.push_back(unfolder); } -void JComponentManager::add(JEventFolder* folder) { +void JComponentManager::Add(JEventFolder* folder) { folder->SetPluginName(m_current_plugin_name); m_folders.push_back(folder); } -void JComponentManager::configure_event(JEvent& event) { +void JComponentManager::ConfigureEvent(JEvent& event) { auto* factory_set = event.GetFactorySet(); for (auto gen : m_fac_gens) { gen->GenerateFactories(factory_set); @@ -216,11 +216,11 @@ void JComponentManager::configure_event(JEvent& event) { -void JComponentManager::resolve_event_sources() { +void JComponentManager::ResolveEventSources() { - m_user_evt_src_gen = resolve_user_event_source_generator(); + m_user_evt_src_gen = ResolveUserEventSourceGenerator(); for (auto& source_name : m_src_names) { - auto* generator = resolve_event_source(source_name); + auto* generator = ResolveEventSource(source_name); auto source = generator->MakeJEventSource(source_name); source->SetPluginName(generator->GetPluginName()); source->Wire(GetApplication()); @@ -236,7 +236,7 @@ void JComponentManager::resolve_event_sources() { } } -JEventSourceGenerator *JComponentManager::resolve_event_source(std::string source_name) const { +JEventSourceGenerator *JComponentManager::ResolveEventSource(std::string source_name) const { // Always use the user override if they provided one if (m_user_evt_src_gen != nullptr) { @@ -264,7 +264,7 @@ JEventSourceGenerator *JComponentManager::resolve_event_source(std::string sourc } -JEventSourceGenerator* JComponentManager::resolve_user_event_source_generator() const { +JEventSourceGenerator* JComponentManager::ResolveUserEventSourceGenerator() const { // If the user didn't specify an EVENT_SOURCE_TYPE, do nothing if (m_user_evt_src_typename == "") return nullptr; @@ -290,27 +290,31 @@ JEventSourceGenerator* JComponentManager::resolve_user_event_source_generator() } -std::vector& JComponentManager::get_evt_src_gens() { +std::vector& JComponentManager::GetSourceGenerators() { return m_src_gens; } -std::vector& JComponentManager::get_evt_srces() { +std::vector& JComponentManager::GetSources() { return m_evt_srces; } -std::vector& JComponentManager::get_evt_procs() { +std::vector& JComponentManager::GetProcessors() { return m_evt_procs; } -std::vector& JComponentManager::get_fac_gens() { +std::vector& JComponentManager::GetFactoryGenerators() { return m_fac_gens; } -std::vector& JComponentManager::get_unfolders() { +std::vector& JComponentManager::GetUnfolders() { return m_unfolders; } -const JComponentSummary& JComponentManager::get_component_summary() { +std::vector& JComponentManager::GetFolders() { + return m_folders; +} + +const JComponentSummary& JComponentManager::GetComponentSummary() { return m_summary; } diff --git a/src/libraries/JANA/Services/JComponentManager.h b/src/libraries/JANA/Services/JComponentManager.h index 99bd1a958..d27456197 100644 --- a/src/libraries/JANA/Services/JComponentManager.h +++ b/src/libraries/JANA/Services/JComponentManager.h @@ -38,23 +38,23 @@ class JComponentManager : public JService { void ConfigureComponents(); // Helpers - void preinitialize_components(); - void resolve_event_sources(); - void initialize_components(); - JEventSourceGenerator* resolve_user_event_source_generator() const; - JEventSourceGenerator* resolve_event_source(std::string source_name) const; + void PreinitializeComponents(); + void ResolveEventSources(); + void InitializeComponents(); + JEventSourceGenerator* ResolveUserEventSourceGenerator() const; + JEventSourceGenerator* ResolveEventSource(std::string source_name) const; // Called after JApplication::Initialize() finishes - const JComponentSummary& get_component_summary(); + const JComponentSummary& GetComponentSummary(); - std::vector& get_evt_src_gens(); - std::vector& get_evt_srces(); - std::vector& get_evt_procs(); - std::vector& get_fac_gens(); - std::vector& get_unfolders(); - std::vector& get_folders(); + std::vector& GetSourceGenerators(); + std::vector& GetSources(); + std::vector& GetProcessors(); + std::vector& GetFactoryGenerators(); + std::vector& GetUnfolders(); + std::vector& GetFolders(); - void configure_event(JEvent& event); + void ConfigureEvent(JEvent& event); private: diff --git a/src/libraries/JANA/Services/JPluginLoader.cc b/src/libraries/JANA/Services/JPluginLoader.cc index 1ea7b38a1..568ec9839 100644 --- a/src/libraries/JANA/Services/JPluginLoader.cc +++ b/src/libraries/JANA/Services/JPluginLoader.cc @@ -156,7 +156,7 @@ void JPluginLoader::attach_plugins(JComponentManager* jcm) { // see an extremely difficult-to-debug error (usually a segfault) stemming from the binary incompatibility // between the host application and the plugin. - jcm->next_plugin(name); + jcm->NextPlugin(name); attach_plugin(name, path); // Throws JException on failure } } diff --git a/src/libraries/JANA/Topology/JEventPool.cc b/src/libraries/JANA/Topology/JEventPool.cc index fc0d5442e..9389960c9 100644 --- a/src/libraries/JANA/Topology/JEventPool.cc +++ b/src/libraries/JANA/Topology/JEventPool.cc @@ -22,7 +22,7 @@ JEventPool::JEventPool(std::shared_ptr component_manager, m_owned_events.push_back(std::make_shared()); auto evt = &m_owned_events.back(); (*evt)->SetLevel(m_level); // Level needs to be set before factories get added in configure_event - m_component_manager->configure_event(**evt); + m_component_manager->ConfigureEvent(**evt); Push(evt->get(), evt_idx % location_count); } } @@ -59,7 +59,7 @@ void JEventPool::Scale(size_t capacity) { m_owned_events.push_back(std::make_shared()); auto evt = &m_owned_events.back(); (*evt)->SetLevel(m_level); // Level needs to be set before factories get added in configure_event - m_component_manager->configure_event(**evt); + m_component_manager->ConfigureEvent(**evt); Push(evt->get(), evt_idx % GetLocationCount()); } } diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 2449a3e0e..09eb2fa31 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -216,7 +216,7 @@ void JTopologyBuilder::CreateTopologyFromScratch() { // Place all sources on grid // ----------------------------- std::map> sources; - for (JEventSource* source : m_components->get_evt_srces()) { + for (JEventSource* source : m_components->GetSources()) { if (source->IsEnabled()) { sources[source->GetLevel()].push_back(source); levels_present.insert(source->GetLevel()); @@ -264,7 +264,7 @@ void JTopologyBuilder::CreateTopologyFromScratch() { // Place all unfolders on grid // ----------------------------- - for (auto* unfolder: m_components->get_unfolders()) { + for (auto* unfolder: m_components->GetUnfolders()) { if (!unfolder->IsEnabled()) continue; @@ -294,7 +294,7 @@ void JTopologyBuilder::CreateTopologyFromScratch() { // Place all folders on grid // ----------------------------- - for (auto* folder: m_components->get_folders()) { + for (auto* folder: m_components->GetFolders()) { if (!folder->IsEnabled()) continue; @@ -327,7 +327,7 @@ void JTopologyBuilder::CreateTopologyFromScratch() { // ----------------------------- std::map> mappable_processors; std::map> tappable_processors; - for (auto* proc : m_components->get_evt_procs()) { + for (auto* proc : m_components->GetProcessors()) { if (proc->IsEnabled()) { levels_present.insert(proc->GetLevel()); if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) { diff --git a/src/programs/integration_tests/BatchedArrow.cc b/src/programs/integration_tests/BatchedArrow.cc index f5e92c069..c76f4d232 100644 --- a/src/programs/integration_tests/BatchedArrow.cc +++ b/src/programs/integration_tests/BatchedArrow.cc @@ -103,12 +103,12 @@ struct BatchedProc : public JEventProcessor { void configure_batched_topology(JTopologyBuilder& builder, JComponentManager& component_manager) { - auto* src_arrow = new JSourceArrow("PhysicsEventSource", JEventLevel::PhysicsEvent, component_manager.get_evt_srces()); + auto* src_arrow = new JSourceArrow("PhysicsEventSource", JEventLevel::PhysicsEvent, component_manager.GetSources()); BatchedArrow* batched_arrow = new BatchedArrow(JEventLevel::PhysicsEvent); JTapArrow* tap_arrow = new JTapArrow("PhysicsEventTap", JEventLevel::PhysicsEvent); - for (auto proc : component_manager.get_evt_procs()) { + for (auto proc : component_manager.GetProcessors()) { tap_arrow->AddProcessor(proc); } diff --git a/src/programs/integration_tests/SimpleOffloading.cc b/src/programs/integration_tests/SimpleOffloading.cc index 27c1123b7..785a62f72 100644 --- a/src/programs/integration_tests/SimpleOffloading.cc +++ b/src/programs/integration_tests/SimpleOffloading.cc @@ -118,7 +118,7 @@ struct OffloadArrow : public JArrow { void configure_topology(JTopologyBuilder& builder, JComponentManager& components) { - auto* src_arrow = new JSourceArrow("src", JEventLevel::PhysicsEvent, components.get_evt_srces()); + auto* src_arrow = new JSourceArrow("src", JEventLevel::PhysicsEvent, components.GetSources()); TriggerFactoryInputsArrow* trigger_inputs_arrow = new TriggerFactoryInputsArrow(JEventLevel::PhysicsEvent); trigger_inputs_arrow->unique_name = "B"; @@ -127,12 +127,12 @@ void configure_topology(JTopologyBuilder& builder, JComponentManager& components offload_arrow->unique_name = "B"; JMapArrow* map_arrow = new JMapArrow("map", JEventLevel::PhysicsEvent); - for (auto proc : components.get_evt_procs()) { + for (auto proc : components.GetProcessors()) { map_arrow->AddProcessor(proc); } JTapArrow* tap_arrow = new JTapArrow("tap", JEventLevel::PhysicsEvent); - for (auto proc : components.get_evt_procs()) { + for (auto proc : components.GetProcessors()) { tap_arrow->AddProcessor(proc); } diff --git a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc index e50b3324b..17c67dae6 100644 --- a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc +++ b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc @@ -42,10 +42,10 @@ class MyMultiSource : public JEventSource { void configure_multisource_topology(JTopologyBuilder& builder, JComponentManager& components) { - auto* src_arrow = new JMultilevelSourceArrow("src", components.get_evt_srces().at(0)); + auto* src_arrow = new JMultilevelSourceArrow("src", components.GetSources().at(0)); JTapArrow* tap_arrow = new JTapArrow("tap"); - for (auto proc : components.get_evt_procs()) { + for (auto proc : components.GetProcessors()) { tap_arrow->AddProcessor(proc); } From 83ef629f94b3a21a18d82faddad9f4382756c95e Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 20:36:21 -0400 Subject: [PATCH 18/22] Small fix --- src/libraries/JANA/Topology/JTopologyBuilder.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.cc b/src/libraries/JANA/Topology/JTopologyBuilder.cc index 09eb2fa31..a38854cf7 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.cc +++ b/src/libraries/JANA/Topology/JTopologyBuilder.cc @@ -283,10 +283,10 @@ void JTopologyBuilder::CreateTopologyFromScratch() { Connect(map_arrow, map_arrow->EVENT_OUT, unfold_arrow, unfold_arrow->PARENT_IN); if (grid.find({parent_level, Column::UnfoldBelow}) != grid.end()) { - throw JException("Only one unfolder allowed for parent level=%s", toString(parent_level)); + throw JException("Only one unfolder allowed for parent level=%s", toString(parent_level).c_str()); } if (grid.find({child_level, Column::UnfoldAbove}) != grid.end()) { - throw JException("Only one unfolder allowed for child level=%s", toString(child_level)); + throw JException("Only one unfolder allowed for child level=%s", toString(child_level).c_str()); } grid[{parent_level, Column::UnfoldBelow}] = {map_arrow, unfold_arrow}; grid[{child_level, Column::UnfoldAbove}] = {unfold_arrow, unfold_arrow}; @@ -314,10 +314,10 @@ void JTopologyBuilder::CreateTopologyFromScratch() { Connect(map_arrow, map_arrow->EVENT_OUT, fold_arrow, fold_arrow->CHILD_IN); if (grid.find({parent_level, Column::FoldBelow}) != grid.end()) { - throw JException("Only one folder allowed for parent level=%s", toString(parent_level)); + throw JException("Only one folder allowed for parent level=%s", toString(parent_level).c_str()); } if (grid.find({child_level, Column::FoldAbove}) != grid.end()) { - throw JException("Only one folder allowed for child level=%s", toString(child_level)); + throw JException("Only one folder allowed for child level=%s", toString(child_level).c_str()); } grid[{parent_level, Column::FoldBelow}] = {fold_arrow, fold_arrow}; grid[{child_level, Column::FoldAbove}] = {map_arrow, fold_arrow}; From ccbd4714b4047796250070d53048ec2305a31dd8 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 23:07:07 -0400 Subject: [PATCH 19/22] Update TimesliceExample TimesliceExample requires an additional flag now to decide whether to include the unfolder or not. Previously, specifying an Unfolder with no parent source would lead to the unfolder being ignored. Now, however, specifying an Unfolder with a child source and no parent source is a valid and useful topology. --- .../misc/TimesliceExample/CMakeLists.txt | 4 +- .../misc/TimesliceExample/TimesliceExample.cc | 39 +++++++++++-------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/src/examples/misc/TimesliceExample/CMakeLists.txt b/src/examples/misc/TimesliceExample/CMakeLists.txt index efd750b94..683c1c2b7 100644 --- a/src/examples/misc/TimesliceExample/CMakeLists.txt +++ b/src/examples/misc/TimesliceExample/CMakeLists.txt @@ -7,10 +7,10 @@ if (USE_PODIO) target_link_libraries(TimesliceExample PUBLIC PodioDatamodel PodioDatamodelDict podio::podioRootIO) add_test(NAME jana-example-timeslices-simple-tests - COMMAND ${CMAKE_INSTALL_PREFIX}/bin/jana -Pplugins=TimesliceExample -Pjana:nevents=10 events.root) + COMMAND ${CMAKE_INSTALL_PREFIX}/bin/jana -Pplugins=TimesliceExample -Puse_timeslices=0 -Pjana:nevents=10 events.root) add_test(NAME jana-example-timeslices-complex-tests - COMMAND ${CMAKE_INSTALL_PREFIX}/bin/jana -Pplugins=TimesliceExample -Pjana:nevents=10 timeslices.root) + COMMAND ${CMAKE_INSTALL_PREFIX}/bin/jana -Pplugins=TimesliceExample -Puse_timeslices=1 -Pjana:nevents=10 timeslices.root) else() message(STATUS "Skipping examples/TimesliceExample because USE_PODIO=Off") diff --git a/src/examples/misc/TimesliceExample/TimesliceExample.cc b/src/examples/misc/TimesliceExample/TimesliceExample.cc index 587d8bbab..f64d6164f 100644 --- a/src/examples/misc/TimesliceExample/TimesliceExample.cc +++ b/src/examples/misc/TimesliceExample/TimesliceExample.cc @@ -26,23 +26,28 @@ void InitPlugin(JApplication *app) { // Event processor that writes events (and timeslices, if they are present) to file app->Add(new MyFileWriter()); - // Unfolder that takes timeslices and splits them into physics events. - app->Add(new MyTimesliceSplitter()); - - // Factory that produces timeslice-level protoclusters from timeslice-level hits - app->Add(new JOmniFactoryGeneratorT( - { .tag = "timeslice_protoclusterizer", - .level = JEventLevel::Timeslice, - .input_names = {"hits"}, - .output_names = {"ts_protoclusters"} - })); - - // Factory that produces event-level protoclusters from event-level hits - app->Add(new JOmniFactoryGeneratorT( - { .tag = "event_protoclusterizer", - .input_names = {"hits"}, - .output_names = {"evt_protoclusters"}} - )); + bool use_timeslices = app->RegisterParameter("use_timeslices", true); + + if (use_timeslices) { + // Unfolder that takes timeslices and splits them into physics events. + app->Add(new MyTimesliceSplitter()); + + // Factory that produces timeslice-level protoclusters from timeslice-level hits + app->Add(new JOmniFactoryGeneratorT( + { .tag = "timeslice_protoclusterizer", + .level = JEventLevel::Timeslice, + .input_names = {"hits"}, + .output_names = {"ts_protoclusters"} + })); + } + else { + // Factory that produces event-level protoclusters from event-level hits + app->Add(new JOmniFactoryGeneratorT( + { .tag = "event_protoclusterizer", + .input_names = {"hits"}, + .output_names = {"evt_protoclusters"}} + )); + } // Factory that produces event-level clusters from event-level protoclusters app->Add(new JOmniFactoryGeneratorT( From d3d9898d5a2d6996adf093293cd283939cea5039 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 23:25:02 -0400 Subject: [PATCH 20/22] Bring back JCM::get_evt_src_gens This is used in one place by halld_recon, apparently. --- src/libraries/JANA/Services/JComponentManager.cc | 4 ++++ src/libraries/JANA/Services/JComponentManager.h | 3 +++ 2 files changed, 7 insertions(+) diff --git a/src/libraries/JANA/Services/JComponentManager.cc b/src/libraries/JANA/Services/JComponentManager.cc index 2511367e6..ea404867a 100644 --- a/src/libraries/JANA/Services/JComponentManager.cc +++ b/src/libraries/JANA/Services/JComponentManager.cc @@ -290,6 +290,10 @@ JEventSourceGenerator* JComponentManager::ResolveUserEventSourceGenerator() cons } +std::vector& JComponentManager::get_evt_src_gens() { + return m_src_gens; +} + std::vector& JComponentManager::GetSourceGenerators() { return m_src_gens; } diff --git a/src/libraries/JANA/Services/JComponentManager.h b/src/libraries/JANA/Services/JComponentManager.h index d27456197..ef5408175 100644 --- a/src/libraries/JANA/Services/JComponentManager.h +++ b/src/libraries/JANA/Services/JComponentManager.h @@ -47,6 +47,9 @@ class JComponentManager : public JService { // Called after JApplication::Initialize() finishes const JComponentSummary& GetComponentSummary(); + // TODO: Deprecate + std::vector& get_evt_src_gens(); + std::vector& GetSourceGenerators(); std::vector& GetSources(); std::vector& GetProcessors(); From 38ebe9aa80ba390103056b7cafaf18fdc1c4e6e9 Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 23:32:47 -0400 Subject: [PATCH 21/22] Small CI fix --- .github/workflows/test_integration_epic.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/test_integration_epic.yml b/.github/workflows/test_integration_epic.yml index 8b9ded597..b216632a6 100644 --- a/.github/workflows/test_integration_epic.yml +++ b/.github/workflows/test_integration_epic.yml @@ -94,6 +94,7 @@ jobs: $GITHUB_WORKSPACE/bin/jana \ -Pplugins=TimesliceExample \ -Pjana:nevents=100 \ + -Puse_timeslices=0 \ events.root - name: Run TimesliceExample with complex (timeslice) topology From fbc0d779be8ee9bc09a0295eaed338c18950635b Mon Sep 17 00:00:00 2001 From: Nathan Brei Date: Mon, 25 May 2026 23:59:20 -0400 Subject: [PATCH 22/22] Another small fix --- src/libraries/JANA/Services/JComponentManager.cc | 4 ++++ src/libraries/JANA/Services/JComponentManager.h | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/libraries/JANA/Services/JComponentManager.cc b/src/libraries/JANA/Services/JComponentManager.cc index ea404867a..77f294b8d 100644 --- a/src/libraries/JANA/Services/JComponentManager.cc +++ b/src/libraries/JANA/Services/JComponentManager.cc @@ -294,6 +294,10 @@ std::vector& JComponentManager::get_evt_src_gens() { return m_src_gens; } +std::vector& JComponentManager::get_evt_srces() { + return m_evt_srces; +} + std::vector& JComponentManager::GetSourceGenerators() { return m_src_gens; } diff --git a/src/libraries/JANA/Services/JComponentManager.h b/src/libraries/JANA/Services/JComponentManager.h index ef5408175..b108da38a 100644 --- a/src/libraries/JANA/Services/JComponentManager.h +++ b/src/libraries/JANA/Services/JComponentManager.h @@ -47,8 +47,10 @@ class JComponentManager : public JService { // Called after JApplication::Initialize() finishes const JComponentSummary& GetComponentSummary(); - // TODO: Deprecate + [[deprecated]] std::vector& get_evt_src_gens(); + [[deprecated]] + std::vector& get_evt_srces(); std::vector& GetSourceGenerators(); std::vector& GetSources();