diff --git a/.github/workflows/test_integration_epic.yml b/.github/workflows/test_integration_epic.yml index 8b9ded597..b216632a6 100644 --- a/.github/workflows/test_integration_epic.yml +++ b/.github/workflows/test_integration_epic.yml @@ -94,6 +94,7 @@ jobs: $GITHUB_WORKSPACE/bin/jana \ -Pplugins=TimesliceExample \ -Pjana:nevents=100 \ + -Puse_timeslices=0 \ events.root - name: Run TimesliceExample with complex (timeslice) topology diff --git a/src/examples/misc/TimesliceExample/CMakeLists.txt b/src/examples/misc/TimesliceExample/CMakeLists.txt index efd750b94..683c1c2b7 100644 --- a/src/examples/misc/TimesliceExample/CMakeLists.txt +++ b/src/examples/misc/TimesliceExample/CMakeLists.txt @@ -7,10 +7,10 @@ if (USE_PODIO) target_link_libraries(TimesliceExample PUBLIC PodioDatamodel PodioDatamodelDict podio::podioRootIO) add_test(NAME jana-example-timeslices-simple-tests - COMMAND ${CMAKE_INSTALL_PREFIX}/bin/jana -Pplugins=TimesliceExample -Pjana:nevents=10 events.root) + COMMAND ${CMAKE_INSTALL_PREFIX}/bin/jana -Pplugins=TimesliceExample -Puse_timeslices=0 -Pjana:nevents=10 events.root) add_test(NAME jana-example-timeslices-complex-tests - COMMAND ${CMAKE_INSTALL_PREFIX}/bin/jana -Pplugins=TimesliceExample -Pjana:nevents=10 timeslices.root) + COMMAND ${CMAKE_INSTALL_PREFIX}/bin/jana -Pplugins=TimesliceExample -Puse_timeslices=1 -Pjana:nevents=10 timeslices.root) else() message(STATUS "Skipping examples/TimesliceExample because USE_PODIO=Off") diff --git a/src/examples/misc/TimesliceExample/TimesliceExample.cc b/src/examples/misc/TimesliceExample/TimesliceExample.cc index 587d8bbab..f64d6164f 100644 --- a/src/examples/misc/TimesliceExample/TimesliceExample.cc +++ b/src/examples/misc/TimesliceExample/TimesliceExample.cc @@ -26,23 +26,28 @@ void InitPlugin(JApplication *app) { // Event processor that writes events (and timeslices, if they are present) to file app->Add(new MyFileWriter()); - // Unfolder that takes timeslices and splits them into physics events. - app->Add(new MyTimesliceSplitter()); - - // Factory that produces timeslice-level protoclusters from timeslice-level hits - app->Add(new JOmniFactoryGeneratorT( - { .tag = "timeslice_protoclusterizer", - .level = JEventLevel::Timeslice, - .input_names = {"hits"}, - .output_names = {"ts_protoclusters"} - })); - - // Factory that produces event-level protoclusters from event-level hits - app->Add(new JOmniFactoryGeneratorT( - { .tag = "event_protoclusterizer", - .input_names = {"hits"}, - .output_names = {"evt_protoclusters"}} - )); + bool use_timeslices = app->RegisterParameter("use_timeslices", true); + + if (use_timeslices) { + // Unfolder that takes timeslices and splits them into physics events. + app->Add(new MyTimesliceSplitter()); + + // Factory that produces timeslice-level protoclusters from timeslice-level hits + app->Add(new JOmniFactoryGeneratorT( + { .tag = "timeslice_protoclusterizer", + .level = JEventLevel::Timeslice, + .input_names = {"hits"}, + .output_names = {"ts_protoclusters"} + })); + } + else { + // Factory that produces event-level protoclusters from event-level hits + app->Add(new JOmniFactoryGeneratorT( + { .tag = "event_protoclusterizer", + .input_names = {"hits"}, + .output_names = {"evt_protoclusters"}} + )); + } // Factory that produces event-level clusters from event-level protoclusters app->Add(new JOmniFactoryGeneratorT( diff --git a/src/libraries/JANA/Components/JOmniFactory.h b/src/libraries/JANA/Components/JOmniFactory.h index 1e5f4c32d..112a889b9 100644 --- a/src/libraries/JANA/Components/JOmniFactory.h +++ b/src/libraries/JANA/Components/JOmniFactory.h @@ -43,10 +43,9 @@ class JOmniFactory : public JFactory { } // This is more hackery to suppress the overloaded-virtual warning - // Has to virtual simply because EICrecon already declares it as override + // Has to be virtual simply because EICrecon already declares it as override using JFactory::ChangeRun; - void ChangeRun(int32_t) {}; - + virtual void ChangeRun(int32_t) {}; using ConfigType = ConfigT; diff --git a/src/libraries/JANA/JApplication.cc b/src/libraries/JANA/JApplication.cc index a4a83ec1b..be319d56d 100644 --- a/src/libraries/JANA/JApplication.cc +++ b/src/libraries/JANA/JApplication.cc @@ -71,33 +71,38 @@ void JApplication::AddPluginPath(std::string path) { void JApplication::Add(JEventSource* event_source) { /// Adds the given JEventSource to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(event_source); + m_component_manager->Add(event_source); } void JApplication::Add(JEventSourceGenerator *source_generator) { /// Adds the given JEventSourceGenerator to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(source_generator); + m_component_manager->Add(source_generator); } void JApplication::Add(JFactoryGenerator *factory_generator) { /// Adds the given JFactoryGenerator to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(factory_generator); + m_component_manager->Add(factory_generator); } void JApplication::Add(JEventProcessor* processor) { /// Adds the given JEventProcessor to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(processor); + m_component_manager->Add(processor); } void JApplication::Add(std::string event_source_name) { /// Adds the event source name (e.g. a file or socket name) to the JANA context. JANA will instantiate /// the corresponding JEventSource using a user-provided JEventSourceGenerator. - m_component_manager->add(event_source_name); + m_component_manager->Add(event_source_name); } void JApplication::Add(JEventUnfolder* unfolder) { /// Adds the given JEventUnfolder to the JANA context. Ownership is passed to JComponentManager. - m_component_manager->add(unfolder); + m_component_manager->Add(unfolder); +} + +void JApplication::Add(JEventFolder* folder) { + /// Adds the given JEventFolder to the JANA context. Ownership is passed to JComponentManager. + m_component_manager->Add(folder); } @@ -115,10 +120,6 @@ void JApplication::Initialize() { // We trigger initialization m_service_locator->get(); - auto component_manager = m_service_locator->get(); - auto plugin_loader = m_service_locator->get(); - auto topology_builder = m_service_locator->get(); - auto wiring_service = m_service_locator->get(); // Set logger on JApplication itself m_logger = m_params->GetLogger("jana"); @@ -136,6 +137,10 @@ void JApplication::Initialize() { JVersion::PrintVersionDescription(oss); LOG_INFO(m_logger) << oss.str() << LOG_END; } + auto component_manager = m_service_locator->get(); + auto plugin_loader = m_service_locator->get(); + auto topology_builder = m_service_locator->get(); + auto wiring_service = m_service_locator->get(); // Attach all plugins for (const auto& plugin_name : wiring_service->GetPluginNames()) { @@ -144,7 +149,7 @@ void JApplication::Initialize() { plugin_loader->attach_plugins(component_manager.get()); // Resolve and initialize all components - component_manager->configure_components(); + component_manager->ConfigureComponents(); // Once all components have been wired in jcm::configure_components(), there shouldn't be any unused wirings wiring_service->CheckAllWiringsAreUsed(); @@ -297,7 +302,7 @@ int JApplication::GetExitCode() { const JComponentSummary& JApplication::GetComponentSummary() { /// Returns a data object describing all components currently running - return m_component_manager->get_component_summary(); + return m_component_manager->GetComponentSummary(); } // Performance/status monitoring diff --git a/src/libraries/JANA/JApplicationFwd.h b/src/libraries/JANA/JApplicationFwd.h index 6cac6e403..c2d93f62d 100644 --- a/src/libraries/JANA/JApplicationFwd.h +++ b/src/libraries/JANA/JApplicationFwd.h @@ -19,6 +19,7 @@ class JComponentManager; class JPluginLoader; class JExecutionEngine; class JEventUnfolder; +class JEventFolder; class JServiceLocator; class JParameter; class JParameterManager; @@ -66,6 +67,7 @@ class JApplication { void Add(JEventSource* event_source); void Add(JEventProcessor* processor); void Add(JEventUnfolder* unfolder); + void Add(JEventFolder* folder); // Controlling processing diff --git a/src/libraries/JANA/JEvent.cc b/src/libraries/JANA/JEvent.cc index 8e4c69e4f..d8086cf83 100644 --- a/src/libraries/JANA/JEvent.cc +++ b/src/libraries/JANA/JEvent.cc @@ -12,7 +12,7 @@ JEvent::JEvent() : mInspector(this){ JEvent::JEvent(JApplication* app) : mInspector(this) { // Furnish the JEvent with the parameter values and factory generators provided to the JApplication app->Initialize(); - app->GetService()->configure_event(*this); + app->GetService()->ConfigureEvent(*this); } JEvent::~JEvent() { @@ -144,19 +144,6 @@ std::vector JEvent::ReleaseAllParents() { return released_parents; } -void JEvent::TakeRefToSelf() { - mReferenceCount++; -} - -int JEvent::ReleaseRefToSelf() { - int remaining_refs = mReferenceCount.fetch_sub(1); - remaining_refs -= 1; // fetch_sub post increments - if (remaining_refs < 0) { - throw JException("JEvent's own refcount has gone negative!"); - } - return remaining_refs; -} - int JEvent::GetChildCount() { return mReferenceCount; } diff --git a/src/libraries/JANA/JEvent.h b/src/libraries/JANA/JEvent.h index 75e1c367c..2ab8e7241 100644 --- a/src/libraries/JANA/JEvent.h +++ b/src/libraries/JANA/JEvent.h @@ -96,9 +96,6 @@ class JEvent : public std::enable_shared_from_this { uint64_t GetParentNumber(JEventLevel level) const; void SetParentNumber(JEventLevel level, uint64_t number); - void TakeRefToSelf(); - int ReleaseRefToSelf(); - // Lifecycle void Clear(bool processed_successfully=true); void Finish(); diff --git a/src/libraries/JANA/JEventSource.cc b/src/libraries/JANA/JEventSource.cc index bd5429982..f3790fd35 100644 --- a/src/libraries/JANA/JEventSource.cc +++ b/src/libraries/JANA/JEventSource.cc @@ -175,13 +175,8 @@ JEventSource::Result JEventSource::DoNext(std::shared_ptr event) { DoClose(false); return Result::FailureFinished; } - else if (result == Result::FailureTryAgain) { - // We end up here if we tried to read an entry in a file but it is on a tape drive and isn't ready yet - // or if we polled the socket, found no new messages, but still expect messages later - return Result::FailureTryAgain; - } else { - throw JException("Invalid JEventSource::Result value!"); + return result; } } else { // status == Closed diff --git a/src/libraries/JANA/JEventSource.h b/src/libraries/JANA/JEventSource.h index fab50c664..03fa2a4b1 100644 --- a/src/libraries/JANA/JEventSource.h +++ b/src/libraries/JANA/JEventSource.h @@ -41,7 +41,7 @@ class JEventSource : public jana::components::JComponent, bool m_enable_process_parallel = false; Status m_status = Status::Unopened; - std::vector m_event_levels; + std::vector m_parent_levels; JEventLevel m_next_level = JEventLevel::None; @@ -148,7 +148,7 @@ class JEventSource : public jana::components::JComponent, uint64_t GetSkippedEventCount() const { return m_events_skipped; }; uint64_t GetProcessedEventCount() const { return m_events_processed; }; - const std::vector GetEventLevels() { return m_event_levels; } + const std::vector GetParentLevels() { return m_parent_levels; } bool IsGetObjectsEnabled() const { return m_enable_get_objects; } bool IsFinishEventEnabled() const { return m_enable_finish_event; } @@ -181,7 +181,7 @@ class JEventSource : public jana::components::JComponent, void SetNSkip(uint64_t nskip) { m_nskip = nskip; }; void SetNextEventLevel(JEventLevel level) { m_next_level = level; } - void SetEventLevels(std::vector levels) { m_event_levels = levels; } + void SetParentLevels(std::vector levels) { m_parent_levels = levels; } JEventLevel GetNextInputLevel() const { return m_next_level; } diff --git a/src/libraries/JANA/Services/JComponentManager.cc b/src/libraries/JANA/Services/JComponentManager.cc index 21c0b774e..77f294b8d 100644 --- a/src/libraries/JANA/Services/JComponentManager.cc +++ b/src/libraries/JANA/Services/JComponentManager.cc @@ -8,6 +8,7 @@ #include #include #include +#include #include JComponentManager::JComponentManager() { @@ -31,6 +32,10 @@ JComponentManager::~JComponentManager() { LOG_TRACE(GetLogger()) << "Destroying unfolder with type=" << unfolder->GetTypeName(); delete unfolder; } + for (auto* folder : m_folders) { + LOG_TRACE(GetLogger()) << "Destroying folder with type=" << folder->GetTypeName(); + delete folder; + } // The order of deletion here sadly matters, because GlueX likes to fill // histograms inside component destructors. Thus event processors must be destroyed after @@ -52,7 +57,7 @@ void JComponentManager::Init() { // We handle parameters in configure_components() instead. } -void JComponentManager::configure_components() { +void JComponentManager::ConfigureComponents() { m_params->SetDefaultParameter("event_source_type", m_user_evt_src_typename, "Manually specifies which JEventSource should open the input file"); m_params->SetDefaultParameter("record_call_stack", @@ -76,17 +81,17 @@ void JComponentManager::configure_components() { } // Give all components a JApplication pointer and a logger - preinitialize_components(); + PreinitializeComponents(); // Resolve all event sources now that all plugins have been loaded // (Note that we need to wire the event source generators beforehand) - resolve_event_sources(); + ResolveEventSources(); // Call Summarize() and Init() in order to populate JComponentSummary and JParameterManager, respectively - initialize_components(); + InitializeComponents(); } -void JComponentManager::preinitialize_components() { +void JComponentManager::PreinitializeComponents() { for (auto* src : m_evt_srces) { src->Wire(GetApplication()); } @@ -104,9 +109,12 @@ void JComponentManager::preinitialize_components() { for (auto* unfolder : m_unfolders) { unfolder->Wire(GetApplication()); } + for (auto* folder : m_folders) { + folder->Wire(GetApplication()); + } } -void JComponentManager::initialize_components() { +void JComponentManager::InitializeComponents() { // For now, this only computes the summary for all components except factories. // However, we are likely to eventually want summaries to access information only // available after component initialization, specifically parameters. In this case, @@ -128,6 +136,11 @@ void JComponentManager::initialize_components() { unfolder->Summarize(m_summary); } + // Unfolders + for (auto * folder : m_folders) { + folder->Summarize(m_summary); + } + JFactorySet dummy_fac_set; for (auto* fac_gen : m_fac_gens) { // TODO: Get rid of this @@ -152,41 +165,46 @@ void JComponentManager::initialize_components() { } } -void JComponentManager::next_plugin(std::string plugin_name) { +void JComponentManager::NextPlugin(std::string plugin_name) { // We defer resolving event sources until we have finished loading all plugins m_current_plugin_name = plugin_name; } -void JComponentManager::add(std::string event_source_name) { +void JComponentManager::Add(std::string event_source_name) { m_src_names.push_back(event_source_name); } -void JComponentManager::add(JEventSourceGenerator *source_generator) { +void JComponentManager::Add(JEventSourceGenerator *source_generator) { source_generator->SetPluginName(m_current_plugin_name); m_src_gens.push_back(source_generator); } -void JComponentManager::add(JFactoryGenerator *factory_generator) { +void JComponentManager::Add(JFactoryGenerator *factory_generator) { factory_generator->SetPluginName(m_current_plugin_name); m_fac_gens.push_back(factory_generator); } -void JComponentManager::add(JEventSource *event_source) { +void JComponentManager::Add(JEventSource *event_source) { event_source->SetPluginName(m_current_plugin_name); m_evt_srces.push_back(event_source); } -void JComponentManager::add(JEventProcessor *processor) { +void JComponentManager::Add(JEventProcessor *processor) { processor->SetPluginName(m_current_plugin_name); m_evt_procs.push_back(processor); } -void JComponentManager::add(JEventUnfolder* unfolder) { +void JComponentManager::Add(JEventUnfolder* unfolder) { unfolder->SetPluginName(m_current_plugin_name); m_unfolders.push_back(unfolder); } -void JComponentManager::configure_event(JEvent& event) { +void JComponentManager::Add(JEventFolder* folder) { + folder->SetPluginName(m_current_plugin_name); + m_folders.push_back(folder); +} + +void JComponentManager::ConfigureEvent(JEvent& event) { auto* factory_set = event.GetFactorySet(); for (auto gen : m_fac_gens) { gen->GenerateFactories(factory_set); @@ -198,11 +216,11 @@ void JComponentManager::configure_event(JEvent& event) { -void JComponentManager::resolve_event_sources() { +void JComponentManager::ResolveEventSources() { - m_user_evt_src_gen = resolve_user_event_source_generator(); + m_user_evt_src_gen = ResolveUserEventSourceGenerator(); for (auto& source_name : m_src_names) { - auto* generator = resolve_event_source(source_name); + auto* generator = ResolveEventSource(source_name); auto source = generator->MakeJEventSource(source_name); source->SetPluginName(generator->GetPluginName()); source->Wire(GetApplication()); @@ -218,7 +236,7 @@ void JComponentManager::resolve_event_sources() { } } -JEventSourceGenerator *JComponentManager::resolve_event_source(std::string source_name) const { +JEventSourceGenerator *JComponentManager::ResolveEventSource(std::string source_name) const { // Always use the user override if they provided one if (m_user_evt_src_gen != nullptr) { @@ -246,7 +264,7 @@ JEventSourceGenerator *JComponentManager::resolve_event_source(std::string sourc } -JEventSourceGenerator* JComponentManager::resolve_user_event_source_generator() const { +JEventSourceGenerator* JComponentManager::ResolveUserEventSourceGenerator() const { // If the user didn't specify an EVENT_SOURCE_TYPE, do nothing if (m_user_evt_src_typename == "") return nullptr; @@ -280,19 +298,31 @@ std::vector& JComponentManager::get_evt_srces() { return m_evt_srces; } -std::vector& JComponentManager::get_evt_procs() { +std::vector& JComponentManager::GetSourceGenerators() { + return m_src_gens; +} + +std::vector& JComponentManager::GetSources() { + return m_evt_srces; +} + +std::vector& JComponentManager::GetProcessors() { return m_evt_procs; } -std::vector& JComponentManager::get_fac_gens() { +std::vector& JComponentManager::GetFactoryGenerators() { return m_fac_gens; } -std::vector& JComponentManager::get_unfolders() { +std::vector& JComponentManager::GetUnfolders() { return m_unfolders; } -const JComponentSummary& JComponentManager::get_component_summary() { +std::vector& JComponentManager::GetFolders() { + return m_folders; +} + +const JComponentSummary& JComponentManager::GetComponentSummary() { return m_summary; } diff --git a/src/libraries/JANA/Services/JComponentManager.h b/src/libraries/JANA/Services/JComponentManager.h index 3f6a501e7..b108da38a 100644 --- a/src/libraries/JANA/Services/JComponentManager.h +++ b/src/libraries/JANA/Services/JComponentManager.h @@ -14,6 +14,7 @@ class JEventProcessor; class JEventUnfolder; +class JEventFolder; class JComponentManager : public JService { public: @@ -23,35 +24,42 @@ class JComponentManager : public JService { void Init() override; // Called during plugin loading - void next_plugin(std::string plugin_name); + void NextPlugin(std::string plugin_name); - void add(std::string event_source_name); - void add(JEventSourceGenerator* source_generator); - void add(JFactoryGenerator* factory_generator); - void add(JEventSource* event_source); - void add(JEventProcessor* processor); - void add(JEventUnfolder* unfolder); + void Add(std::string event_source_name); + void Add(JEventSourceGenerator* source_generator); + void Add(JFactoryGenerator* factory_generator); + void Add(JEventSource* event_source); + void Add(JEventProcessor* processor); + void Add(JEventUnfolder* unfolder); + void Add(JEventFolder* folder); // Called after plugin loading - void configure_components(); + void ConfigureComponents(); // Helpers - void preinitialize_components(); - void resolve_event_sources(); - void initialize_components(); - JEventSourceGenerator* resolve_user_event_source_generator() const; - JEventSourceGenerator* resolve_event_source(std::string source_name) const; + void PreinitializeComponents(); + void ResolveEventSources(); + void InitializeComponents(); + JEventSourceGenerator* ResolveUserEventSourceGenerator() const; + JEventSourceGenerator* ResolveEventSource(std::string source_name) const; // Called after JApplication::Initialize() finishes - const JComponentSummary& get_component_summary(); + const JComponentSummary& GetComponentSummary(); + [[deprecated]] std::vector& get_evt_src_gens(); + [[deprecated]] std::vector& get_evt_srces(); - std::vector& get_evt_procs(); - std::vector& get_fac_gens(); - std::vector& get_unfolders(); - void configure_event(JEvent& event); + std::vector& GetSourceGenerators(); + std::vector& GetSources(); + std::vector& GetProcessors(); + std::vector& GetFactoryGenerators(); + std::vector& GetUnfolders(); + std::vector& GetFolders(); + + void ConfigureEvent(JEvent& event); private: @@ -64,6 +72,7 @@ class JComponentManager : public JService { std::vector m_evt_srces; std::vector m_evt_procs; std::vector m_unfolders; + std::vector m_folders; std::map m_default_tags; bool m_enable_call_graph_recording = false; diff --git a/src/libraries/JANA/Services/JPluginLoader.cc b/src/libraries/JANA/Services/JPluginLoader.cc index 1ea7b38a1..568ec9839 100644 --- a/src/libraries/JANA/Services/JPluginLoader.cc +++ b/src/libraries/JANA/Services/JPluginLoader.cc @@ -156,7 +156,7 @@ void JPluginLoader::attach_plugins(JComponentManager* jcm) { // see an extremely difficult-to-debug error (usually a segfault) stemming from the binary incompatibility // between the host application and the plugin. - jcm->next_plugin(name); + jcm->NextPlugin(name); attach_plugin(name, path); // Throws JException on failure } } diff --git a/src/libraries/JANA/Topology/JArrow.cc b/src/libraries/JANA/Topology/JArrow.cc index 5783c92aa..864c7b18f 100644 --- a/src/libraries/JANA/Topology/JArrow.cc +++ b/src/libraries/JANA/Topology/JArrow.cc @@ -2,7 +2,7 @@ #include -JArrow::Port& JArrow::AddPort(std::string name, JEventLevel level) { +JArrow::Port& JArrow::AddPort(std::string name, JEventLevel level, PortDirection direction) { if (m_port_lookup.find(name) != m_port_lookup.end()) { throw JException("Port with name '%s' already exists", name.c_str()); } @@ -10,6 +10,16 @@ JArrow::Port& JArrow::AddPort(std::string name, JEventLevel level) { auto port_raw_ptr = port.get(); m_ports.push_back(std::move(port)); m_port_lookup[name] = m_ports.size()-1; + + auto it = m_auto_port_lookup.find({level, direction}); + if (it != m_auto_port_lookup.end()) { + // There's a conflict! Multiple ports with the same level, direction + // Handle this case by disabling the lookup + it->second = -1; + } + else { + m_auto_port_lookup[{level, direction}] = m_ports.size()-1; + } return *port_raw_ptr; } @@ -43,7 +53,7 @@ void JArrow::Push(OutputData& outputs, size_t output_count, size_t location_id) port.GetPool()->Ingest(event, location_id); } else { - throw JException("Arrow %s: Port %d not wired!", m_name.c_str(), port_index); + throw JException("Arrow %s: Port %s not wired!", m_name.c_str(), port.GetName().c_str()); } } } @@ -91,5 +101,42 @@ std::string ToString(JArrow::FireResult r) { } } +std::string ToString(JArrow::PortDirection d) { + switch (d) { + case JArrow::PortDirection::In: return "In"; + case JArrow::PortDirection::Out: return "Out"; + default: return "Unknown"; + } +} + +int JArrow::GetPortIndex(JEventLevel level, PortDirection direction) { + auto it = m_auto_port_lookup.find({level, direction}); + if (it == m_auto_port_lookup.end()) { + throw JException("Unable to find port with (level=%s, direction=%s) on arrow '%s'", + toString(level).c_str(), + ToString(direction).c_str(), + GetName().c_str()); + } + else if (it->second == -1) { + throw JException("Ambiguous port with (level=%s, direction=%s) on arrow '%s'", + toString(level).c_str(), + ToString(direction).c_str(), + GetName().c_str()); + } + return it->second; +} + +int JArrow::GetPortIndex(const std::string& port_name) { + auto it = m_port_lookup.find(port_name); + if (it == m_port_lookup.end()) { + LOG_FATAL(GetLogger()) << "Unable to find port_name '" << port_name << "' on arrow '" << GetName() << "'. Valid port names are:"; + for (auto& port : m_ports) { + LOG_FATAL(GetLogger()) << " " << port->GetName(); + } + throw JException("Unable to find port_name '%s' on arrow '%s'", port_name.c_str(), GetName().c_str()); + } + return it->second; +} + diff --git a/src/libraries/JANA/Topology/JArrow.h b/src/libraries/JANA/Topology/JArrow.h index 2cc4747dc..fcf00d973 100644 --- a/src/libraries/JANA/Topology/JArrow.h +++ b/src/libraries/JANA/Topology/JArrow.h @@ -18,6 +18,7 @@ class JArrow { public: using OutputData = std::array, 2>; enum class FireResult {NotRunYet, KeepGoing, ComeBackLater, Finished}; + enum class PortDirection { In, Out }; class Port { std::string m_name; @@ -72,6 +73,7 @@ class JArrow { private: std::string m_name; // Used for human understanding + int m_id; // Used internally bool m_is_parallel = false; // Whether or not it is safe to parallelize bool m_is_source = false; // Whether or not this arrow should activate/drain the topology bool m_is_sink = false; // Whether or not tnis arrow contributes to the final event count @@ -83,6 +85,7 @@ class JArrow { clock_t::time_point m_next_visit_time=clock_t::now(); std::vector> m_ports; std::map m_port_lookup; + std::map, int> m_auto_port_lookup; JLogger m_logger; public: @@ -105,6 +108,7 @@ class JArrow { const std::string& GetName() { return m_name; } + int GetId() { return m_id; } JLogger& GetLogger() { return m_logger; } bool IsParallel() { return m_is_parallel; } bool IsSource() { return m_is_source; } @@ -112,28 +116,22 @@ class JArrow { int GetNextPortIndex() { return m_next_input_port; } void SetName(std::string name) { m_name = name; } + void SetId(int id) { m_id = id; } void SetLogger(JLogger logger) { m_logger = logger; } void SetIsParallel(bool is_parallel) { m_is_parallel = is_parallel; } void SetIsSource(bool is_source) { m_is_source = is_source; } void SetIsSink(bool is_sink) { m_is_sink = is_sink; } - Port& AddPort(std::string port_name, JEventLevel level); - Port& AddPort(std::string port_name, std::vector levels); + Port& AddPort(std::string port_name, JEventLevel level, PortDirection direction); Port& GetPort(size_t port_index) { return *m_ports.at(port_index); } - int GetPortIndex(const std::string& port_name) { - auto it = m_port_lookup.find(port_name); - if (it == m_port_lookup.end()) { - LOG_FATAL(GetLogger()) << "Unable to find port_name '" << port_name << "' on arrow '" << GetName() << "'. Valid port names are:"; - for (auto& port : m_ports) { - LOG_FATAL(GetLogger()) << " " << port->GetName(); - } - throw JException("Unable to find port_name '%s' on arrow '%s'", port_name.c_str(), GetName().c_str()); - } - return it->second; - } + Port& GetPort(JEventLevel level, PortDirection direction) { return *m_ports.at(m_auto_port_lookup.at({level, direction})); } + + int GetPortIndex(JEventLevel level, PortDirection direction); + int GetPortIndex(const std::string& port_name); void SetNextPortIndex(int input_port) { m_next_input_port = input_port; } }; std::string ToString(JArrow::FireResult r); +std::string ToString(JArrow::PortDirection d); diff --git a/src/libraries/JANA/Topology/JEventPool.cc b/src/libraries/JANA/Topology/JEventPool.cc index fc0d5442e..9389960c9 100644 --- a/src/libraries/JANA/Topology/JEventPool.cc +++ b/src/libraries/JANA/Topology/JEventPool.cc @@ -22,7 +22,7 @@ JEventPool::JEventPool(std::shared_ptr component_manager, m_owned_events.push_back(std::make_shared()); auto evt = &m_owned_events.back(); (*evt)->SetLevel(m_level); // Level needs to be set before factories get added in configure_event - m_component_manager->configure_event(**evt); + m_component_manager->ConfigureEvent(**evt); Push(evt->get(), evt_idx % location_count); } } @@ -59,7 +59,7 @@ void JEventPool::Scale(size_t capacity) { m_owned_events.push_back(std::make_shared()); auto evt = &m_owned_events.back(); (*evt)->SetLevel(m_level); // Level needs to be set before factories get added in configure_event - m_component_manager->configure_event(**evt); + m_component_manager->ConfigureEvent(**evt); Push(evt->get(), evt_idx % GetLocationCount()); } } diff --git a/src/libraries/JANA/Topology/JFoldArrow.h b/src/libraries/JANA/Topology/JFoldArrow.h index 71d729b89..ac167a197 100644 --- a/src/libraries/JANA/Topology/JFoldArrow.h +++ b/src/libraries/JANA/Topology/JFoldArrow.h @@ -26,9 +26,9 @@ class JFoldArrow : public JArrow { m_child_level(child_level) { SetName(name); - AddPort("child_in", child_level).SetEnforcesOrdering(true); - AddPort("child_out", child_level); - AddPort("parent_out", parent_level); + AddPort("child_in", child_level, PortDirection::In).SetEnforcesOrdering(true); + AddPort("child_out", child_level, PortDirection::Out); + AddPort("parent_out", parent_level, PortDirection::Out); m_next_input_port = CHILD_IN; } diff --git a/src/libraries/JANA/Topology/JMapArrow.cc b/src/libraries/JANA/Topology/JMapArrow.cc index 2dcbae4dd..ccfc643d7 100644 --- a/src/libraries/JANA/Topology/JMapArrow.cc +++ b/src/libraries/JANA/Topology/JMapArrow.cc @@ -13,18 +13,22 @@ JMapArrow::JMapArrow(std::string name, JEventLevel level) { SetName(name); SetIsParallel(true); - AddPort("in", level); - AddPort("out", level); + AddPort("in", level, PortDirection::In); + AddPort("out", level, PortDirection::Out); } -void JMapArrow::AddSource(JEventSource* source) { - m_sources.push_back(source); +void JMapArrow::SetParallelSource(bool is_parallel) { + m_parallel_source = is_parallel; } void JMapArrow::AddUnfolder(JEventUnfolder* unfolder) { m_unfolders.push_back(unfolder); } +void JMapArrow::AddFolder(JEventFolder* folder) { + m_folders.push_back(folder); +} + void JMapArrow::AddProcessor(JEventProcessor* processor) { m_procs.push_back(processor); } @@ -32,9 +36,12 @@ void JMapArrow::AddProcessor(JEventProcessor* processor) { void JMapArrow::Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) { LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << " for event# " << event->GetEventNumber() << LOG_END; - for (JEventSource* source : m_sources) { - JCallGraphEntryMaker cg_entry(*event->GetJCallGraphRecorder(), source->GetTypeName()); // times execution until this goes out of scope - source->ProcessParallel(*event); + if (m_parallel_source) { + auto* source = event->GetJEventSource(); + if (source != nullptr) { + JCallGraphEntryMaker cg_entry(*event->GetJCallGraphRecorder(), source->GetTypeName()); // times execution until this goes out of scope + event->GetJEventSource()->ProcessParallel(*event); + } } for (JEventUnfolder* unfolder : m_unfolders) { JCallGraphEntryMaker cg_entry(*event->GetJCallGraphRecorder(), unfolder->GetTypeName()); // times execution until this goes out of scope diff --git a/src/libraries/JANA/Topology/JMapArrow.h b/src/libraries/JANA/Topology/JMapArrow.h index 72c7e1b21..ea996015f 100644 --- a/src/libraries/JANA/Topology/JMapArrow.h +++ b/src/libraries/JANA/Topology/JMapArrow.h @@ -8,6 +8,7 @@ class JEventPool; class JEventSource; class JEventUnfolder; +class JEventFolder; class JEventProcessor; class JEvent; @@ -18,15 +19,17 @@ class JMapArrow : public JArrow { enum PortIndex {EVENT_IN=0, EVENT_OUT=1}; private: - std::vector m_sources; + bool m_parallel_source = false; std::vector m_unfolders; + std::vector m_folders; std::vector m_procs; public: JMapArrow(std::string name, JEventLevel level); - void AddSource(JEventSource* source); + void SetParallelSource(bool is_parallel); void AddUnfolder(JEventUnfolder* unfolder); + void AddFolder(JEventFolder* folder); void AddProcessor(JEventProcessor* proc); void Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status); diff --git a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc index 82abea323..6be36f353 100644 --- a/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc +++ b/src/libraries/JANA/Topology/JMultilevelSourceArrow.cc @@ -4,21 +4,17 @@ #include -void JMultilevelSourceArrow::SetEventSource(JEventSource* source) { +JMultilevelSourceArrow::JMultilevelSourceArrow(const std::string& name, JEventSource* source) { + SetIsSource(true); + SetName(name); m_source = source; - m_levels = source->GetEventLevels(); - m_child_event_level = m_levels.back(); + m_levels = source->GetParentLevels(); + m_child_event_level = source->GetLevel(); + m_levels.push_back(m_child_event_level); m_next_input_port = 0; - - size_t input_port_count = 0; - size_t output_port_count = 0; - for (auto level : m_levels) { - AddPort(toString(level) + "In", level).SetSkipFinishEvent(true); - m_port_lookup[{level, Direction::In}] = input_port_count++; - } for (auto level : m_levels) { - AddPort(toString(level) + "Out", level); - m_port_lookup[{level, Direction::Out}] = input_port_count + output_port_count++; + AddPort(toString(level) + "In", level, PortDirection::In).SetSkipFinishEvent(true); + AddPort(toString(level) + "Out", level, PortDirection::Out); } } @@ -26,10 +22,6 @@ const std::vector& JMultilevelSourceArrow::GetLevels() const { return m_levels; } -size_t JMultilevelSourceArrow::GetPortIndex(JEventLevel level, Direction direction) const { - return m_port_lookup.at({level, direction}); -}; - void JMultilevelSourceArrow::Initialize() { // We initialize everything immediately, but don't open any resources until we absolutely have to; see process(): source->DoNext() m_source->DoInit(); @@ -51,7 +43,7 @@ void JMultilevelSourceArrow::EvictNextParent(OutputData& outputs, size_t& output if (it != m_pending_parents.end()) { if (it->second.first != nullptr) { // There IS an old parent - size_t parent_output_port = GetPortIndex(m_next_input_level, Direction::Out); + size_t parent_output_port = GetPortIndex(m_next_input_level, PortDirection::Out); LOG_DEBUG(GetLogger()) << "JMultilevelSourceArrow: Evicting parent " << it->second.first->GetEventStamp() << " to port " << parent_output_port; outputs.at(output_count++) = {it->second.first, parent_output_port}; it->second.first = nullptr; @@ -66,7 +58,7 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou LOG_DEBUG(m_logger) << "Executing arrow " << GetName() << LOG_END; auto result = m_source->DoNext(input->shared_from_this()); m_next_input_level = m_source->GetNextInputLevel(); - m_next_input_port = GetPortIndex(m_next_input_level, Direction::In); + m_next_input_port = GetPortIndex(m_next_input_level, PortDirection::In); LOG_DEBUG(GetLogger()) << "JMultilevelSourceArrow: Returned from DoNext(" << toString(input->GetLevel()) << "). Next input level is " << toString(m_next_input_level); @@ -84,7 +76,7 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou input->SetParent(parent_pair.first); } } - outputs.at(output_count++) = {input, GetPortIndex(m_child_event_level, Direction::Out)}; + outputs.at(output_count++) = {input, GetPortIndex(m_child_event_level, PortDirection::Out)}; if (m_next_input_level != m_child_event_level) { // We have to evict the parent AFTER the successful child because the child still needs the references to that parent @@ -124,7 +116,7 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou EvictNextParent(outputs, output_count); } // Return this event to the pool with no further action - outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)}; + outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), PortDirection::In)}; status = JArrow::FireResult::ComeBackLater; return; } @@ -133,14 +125,14 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou EvictNextParent(outputs, output_count); } // Return this input event to the pool - outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)}; + outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), PortDirection::In)}; status = JArrow::FireResult::KeepGoing; return; } else if (result == JEventSource::Result::FailureFinished) { // Return this input event to the pool - outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), Direction::In)}; + outputs.at(output_count++) = {input, GetPortIndex(input->GetLevel(), PortDirection::In)}; m_finish_in_progress = true; // Fall-through to if (finish_in_progress) below } @@ -155,7 +147,7 @@ void JMultilevelSourceArrow::Fire(JEvent* input, OutputData& outputs, size_t& ou // Found a parent auto parent = it->second.first; if (parent != nullptr) { - outputs.at(output_count++) = {parent, GetPortIndex(parent->GetLevel(), Direction::Out)}; + outputs.at(output_count++) = {parent, GetPortIndex(parent->GetLevel(), PortDirection::Out)}; } m_pending_parents.erase(it); } diff --git a/src/libraries/JANA/Topology/JMultilevelSourceArrow.h b/src/libraries/JANA/Topology/JMultilevelSourceArrow.h index 7f2bbb509..c2d828c1d 100644 --- a/src/libraries/JANA/Topology/JMultilevelSourceArrow.h +++ b/src/libraries/JANA/Topology/JMultilevelSourceArrow.h @@ -3,14 +3,10 @@ class JMultilevelSourceArrow : public JArrow { -public: - enum class Direction { In, Out }; private: JEventSource* m_source = nullptr; - std::vector m_levels; - std::map, size_t> m_port_lookup; JEventLevel m_child_event_level = JEventLevel::None; JEventLevel m_next_input_level; @@ -21,14 +17,8 @@ class JMultilevelSourceArrow : public JArrow { void EvictNextParent(OutputData& outputs, size_t& output_count); public: - JMultilevelSourceArrow() { - SetIsSource(true); - } - + JMultilevelSourceArrow(const std::string& arrow_name, JEventSource* source); const std::vector& GetLevels() const; - size_t GetPortIndex(JEventLevel level, Direction direction) const; - - void SetEventSource(JEventSource* source); void Initialize() override; void Fire(JEvent* input, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override; diff --git a/src/libraries/JANA/Topology/JSourceArrow.cc b/src/libraries/JANA/Topology/JSourceArrow.cc index a9ae317db..c22f904a5 100644 --- a/src/libraries/JANA/Topology/JSourceArrow.cc +++ b/src/libraries/JANA/Topology/JSourceArrow.cc @@ -13,8 +13,8 @@ JSourceArrow::JSourceArrow(std::string name, JEventLevel level, std::vector #include +#include "JANA/Topology/JArrow.h" #include "JANA/Utils/JEventLevel.h" #include "JSourceArrow.h" +#include "JMultilevelSourceArrow.h" #include "JMapArrow.h" #include "JTapArrow.h" #include "JUnfoldArrow.h" @@ -36,6 +38,7 @@ JTopologyBuilder::~JTopologyBuilder() { void JTopologyBuilder::AddArrow(JArrow* arrow) { arrows.push_back(arrow); + arrow->SetId(arrows.size()-1); auto it = arrow_lookup.find(arrow->GetName()); if (it != arrow_lookup.end()) { throw JException("AddArrow(): Arrow with name '%s' has already been added", arrow->GetName().c_str()); @@ -43,6 +46,13 @@ void JTopologyBuilder::AddArrow(JArrow* arrow) { arrow_lookup[arrow->GetName()] = arrow; } +JArrow* JTopologyBuilder::GetArrow(const std::string& arrow_name) { + auto it = arrow_lookup.find(arrow_name); + if (it == arrow_lookup.end()) { + return nullptr; + } + return it->second; +} JEventPool* JTopologyBuilder::GetOrCreatePool(JEventLevel level) { auto pool_it = pool_lookup.find(level); @@ -92,19 +102,20 @@ std::string JTopologyBuilder::PrintTopology() { t.AddColumn("Port", JTablePrinter::Justify::Left, 0); t.AddColumn("Place", JTablePrinter::Justify::Left, 0); t.AddColumn("ID", JTablePrinter::Justify::Left, 0); + t.AddColumn("Order", JTablePrinter::Justify::Left, 0); // Build index lookup for queues int i = 0; std::map lookup; - for (JEventQueue* queue : queues) { - lookup[queue] = i; - i += 1; - } // Build index lookup for pools for (JEventPool* pool : pools) { lookup[pool] = i; i += 1; } + for (JEventQueue* queue : queues) { + lookup[queue] = i; + i += 1; + } // Build table bool show_row = true; @@ -126,6 +137,18 @@ std::string JTopologyBuilder::PrintTopology() { t | port->GetName(); t | ((port->GetQueue() != nullptr) ? "Queue ": "Pool"); t | place_index; + if (port->GetEnforcesOrdering() && port->GetEstablishesOrdering()) { + t | "Both"; + } + else if (port->GetEnforcesOrdering()) { + t | "Enf"; + } + else if (port->GetEstablishesOrdering()) { + t | "Est"; + } + else { + t | ""; + } } } return t.Render(); @@ -146,11 +169,10 @@ void JTopologyBuilder::CreateTopology() { if (m_configure_topology) { m_configure_topology(*this, *m_components); - LOG_WARN(GetLogger()) << "Found custom topology configurator! Modified arrow topology is: \n" << PrintTopology() << LOG_END; + LOG_INFO(GetLogger()) << "Using custom topology configuration function" << LOG_END; } else { - AttachLevel(JEventLevel::Run, nullptr, nullptr); - LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END; + CreateTopologyFromScratch(); } for (auto* arrow : arrows) { arrow->SetLogger(GetLogger()); @@ -170,9 +192,222 @@ void JTopologyBuilder::CreateTopology() { queue->SetEstablishesOrdering(false); } } - size_t i = 0; - for (auto* queue: queues) { - LOG_DEBUG(GetLogger()) << "Queue " << i++ << ": establishes_ordering: " << queue->GetEstablishesOrdering() << ", enforces_ordering: " << queue->GetEnforcesOrdering(); + LOG_INFO(GetLogger()) << "Arrow topology is:\n" << PrintTopology() << LOG_END; +} + +void JTopologyBuilder::CreateTopologyFromScratch() { + + enum class Column { Source, UnfoldAbove, BatchBefore, UnfoldBelow, FoldBelow, BatchAfter, Tap, FoldAbove}; + std::vector columns = { Column::Source, Column::UnfoldAbove, Column::BatchBefore, Column::UnfoldBelow, Column::FoldBelow, Column::BatchAfter, Column::Tap, Column::FoldAbove}; + struct Cell { + JArrow* start = nullptr; + JArrow* end = nullptr; + }; + + std::map, Cell> grid; + + // ----------------------------- + // Phase 1: Iterate over all components, adding the corresponding arrows to the grid + // ----------------------------- + + int map_counter = 1; + std::set levels_present; + + // Place all sources on grid + // ----------------------------- + std::map> sources; + for (JEventSource* source : m_components->GetSources()) { + if (source->IsEnabled()) { + sources[source->GetLevel()].push_back(source); + levels_present.insert(source->GetLevel()); + } + } + for (auto& it : sources) { + auto level = it.first; + auto level_str = toString(level); + bool need_map = false; + bool need_multi_arrow = false; + for (auto* source : it.second) { + need_map |= source->IsProcessParallelEnabled(); + need_multi_arrow |= (source->GetParentLevels().size() > 0); + } + + if (need_multi_arrow && sources.size() > 1) { + throw JException("Multiple multilevel JEventSources not supported yet"); + } + + JArrow* src_arrow; + if (need_multi_arrow) { + src_arrow = new JMultilevelSourceArrow(level_str+"MultiSource", it.second.at(0)); + // Add parent levels now. Child level is added further below. + for (auto parent_level: it.second.at(0)->GetParentLevels()) { + levels_present.insert(parent_level); + grid[{parent_level, Column::Source}] = {src_arrow, src_arrow}; + } + } + else { + src_arrow = new JSourceArrow(level_str+"Source", level, it.second); + } + AddArrow(src_arrow); + + if (need_map) { + auto* map_arrow = new JMapArrow(toString(level)+"Map"+std::to_string(map_counter++), level); + map_arrow->SetParallelSource(true); + AddArrow(map_arrow); + Connect(src_arrow, 1, map_arrow, 0); + grid[{level, Column::Source}] = {src_arrow, map_arrow}; + } + else { + grid[{level, Column::Source}] = {src_arrow, src_arrow}; + } + } + + // Place all unfolders on grid + // ----------------------------- + for (auto* unfolder: m_components->GetUnfolders()) { + + if (!unfolder->IsEnabled()) continue; + + // Create unfold arrow + // Publish at _each_ grid location + auto parent_level = unfolder->GetLevel(); + auto child_level = unfolder->GetChildLevel(); + levels_present.insert(parent_level); + levels_present.insert(child_level); + + auto* map_arrow = new JMapArrow(toString(parent_level)+"Map"+std::to_string(map_counter++), parent_level); + auto* unfold_arrow = new JUnfoldArrow(toString(child_level)+"Unfold", unfolder); + map_arrow->AddUnfolder(unfolder); + AddArrow(map_arrow); + AddArrow(unfold_arrow); + Connect(map_arrow, map_arrow->EVENT_OUT, unfold_arrow, unfold_arrow->PARENT_IN); + + if (grid.find({parent_level, Column::UnfoldBelow}) != grid.end()) { + throw JException("Only one unfolder allowed for parent level=%s", toString(parent_level).c_str()); + } + if (grid.find({child_level, Column::UnfoldAbove}) != grid.end()) { + throw JException("Only one unfolder allowed for child level=%s", toString(child_level).c_str()); + } + grid[{parent_level, Column::UnfoldBelow}] = {map_arrow, unfold_arrow}; + grid[{child_level, Column::UnfoldAbove}] = {unfold_arrow, unfold_arrow}; + } + + // Place all folders on grid + // ----------------------------- + for (auto* folder: m_components->GetFolders()) { + + if (!folder->IsEnabled()) continue; + + // Create unfold arrow + // Publish at _each_ grid location + auto parent_level = folder->GetLevel(); + auto child_level = folder->GetChildLevel(); + levels_present.insert(parent_level); + levels_present.insert(child_level); + + auto* map_arrow = new JMapArrow(toString(child_level)+"Map"+std::to_string(map_counter++), parent_level); + auto* fold_arrow = new JFoldArrow(toString(parent_level)+"Fold", parent_level, child_level); + fold_arrow->SetFolder(folder); + map_arrow->AddFolder(folder); + AddArrow(map_arrow); + AddArrow(fold_arrow); + Connect(map_arrow, map_arrow->EVENT_OUT, fold_arrow, fold_arrow->CHILD_IN); + + if (grid.find({parent_level, Column::FoldBelow}) != grid.end()) { + throw JException("Only one folder allowed for parent level=%s", toString(parent_level).c_str()); + } + if (grid.find({child_level, Column::FoldAbove}) != grid.end()) { + throw JException("Only one folder allowed for child level=%s", toString(child_level).c_str()); + } + grid[{parent_level, Column::FoldBelow}] = {fold_arrow, fold_arrow}; + grid[{child_level, Column::FoldAbove}] = {map_arrow, fold_arrow}; + } + + // Place all processors on grid + // ----------------------------- + std::map> mappable_processors; + std::map> tappable_processors; + for (auto* proc : m_components->GetProcessors()) { + if (proc->IsEnabled()) { + levels_present.insert(proc->GetLevel()); + if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) { + throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str()); + } + mappable_processors[proc->GetLevel()].push_back(proc); + if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) { + tappable_processors[proc->GetLevel()].push_back(proc); + } + } + } + for (auto it : mappable_processors) { + auto level = it.first; + auto level_str = toString(level); + auto* map_arrow = new JMapArrow(level_str+"Map"+std::to_string(map_counter++), level); + for (JEventProcessor* proc : it.second) { + map_arrow->AddProcessor(proc); + } + AddArrow(map_arrow); + + auto tappable_procs_it = tappable_processors.find(level); + if (tappable_procs_it != tappable_processors.end()) { + JArrow* first_tap_arrow = nullptr; + JArrow* last_tap_arrow = nullptr; + std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(it.second, level_str); + Connect(map_arrow, map_arrow->EVENT_OUT, first_tap_arrow, JTapArrow::EVENT_IN); + grid[{level, Column::Tap}] = {map_arrow, last_tap_arrow}; + } + else { + // ONLY legacy processors, no tap chain + grid[{level, Column::Tap}] = {map_arrow, map_arrow}; + } + } + + // ----------------------------- + // Phase 2: Iterate over all rows and all adjacent occupied column pairs, wiring horizontally + // ----------------------------- + + for (JEventLevel level : levels_present) { + + auto* pool = GetOrCreatePool(level); + JArrow* last_arrow = nullptr; + for (auto column : columns) { + auto it = grid.find({level, column}); + if (it == grid.end()) { continue; } + + JArrow* current_arrow = it->second.start; + if (last_arrow == nullptr) { + // This is the first arrow we've found, so connect the pool here + auto port_index = current_arrow->GetPortIndex(level, JArrow::PortDirection::In); + current_arrow->GetPort(port_index).Attach(pool); + } + else { + Connect(last_arrow, + last_arrow->GetPortIndex(level, JArrow::PortDirection::Out), + current_arrow, + current_arrow->GetPortIndex(level, JArrow::PortDirection::In)); + } + last_arrow = it->second.end; + + } + // Connect last_arrow to pool + auto port_index = last_arrow->GetPortIndex(level, JArrow::PortDirection::Out); + if (level == JEventLevel::PhysicsEvent) { + last_arrow->SetIsSink(true); + } + last_arrow->GetPort(port_index).Attach(pool); + } + + // ----------------------------- + // Phase 3: Traverse event hierarchy and attach levels accordingly + // ----------------------------- + // Because we haven't fully implemented the event hierarchy yet, let's just go with the fully connected graph + + for (auto outer_level : levels_present) { + for (auto inner_level : levels_present) { + if (outer_level != inner_level) { + ConnectPool(outer_level, inner_level); + } + } } } @@ -273,8 +508,6 @@ void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow downstream_port.Attach(queue); } - - upstream_port.Attach(queue); if (downstream_port.GetEnforcesOrdering()) { @@ -286,29 +519,20 @@ void JTopologyBuilder::Connect(JArrow* upstream, size_t upstream_port_id, JArrow } -void JTopologyBuilder::ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port, std::vector> downstreams) { - for (auto& [downstream, downstream_port_id] : downstreams) { - if (downstream != nullptr) { - Connect(upstream, upstream_port, downstream, downstream_port_id); - return; - } - } -} - std::pair JTopologyBuilder::CreateTapChain(std::vector& procs, std::string level) { JTapArrow* first = nullptr; JTapArrow* last = nullptr; int i=1; - std::string arrow_name = level + "Tap"; for (JEventProcessor* proc : procs) { + std::string arrow_name = level + "Tap"; if (procs.size() > 1) { arrow_name += std::to_string(i++); } JTapArrow* current = new JTapArrow(arrow_name, proc->GetLevel()); current->AddProcessor(proc); - arrows.push_back(current); + AddArrow(current); if (first == nullptr) { first = current; } @@ -321,233 +545,3 @@ std::pair JTopologyBuilder::CreateTapChain(std::vector sources_at_level; - for (JEventSource* source : m_components->get_evt_srces()) { - if (source->GetLevel() == current_level && source->IsEnabled()) { - sources_at_level.push_back(source); - } - } - - // Find all unfolders at this level - std::vector unfolders_at_level; - for (JEventUnfolder* unfolder : m_components->get_unfolders()) { - if (unfolder->GetLevel() == current_level && unfolder->IsEnabled()) { - unfolders_at_level.push_back(unfolder); - } - } - - // Find all processors at this level - std::vector mappable_procs_at_level; - std::vector tappable_procs_at_level; - - for (JEventProcessor* proc : m_components->get_evt_procs()) { - - if (proc->GetLevel() == current_level && proc->IsEnabled()) { - - // This may be a weird place to do it, but let's quickly validate that users aren't - // trying to enable ordering on a legacy event processor. We don't do this in the constructor - // because we don't want to put constraints on the order in which setters can be called, apart from "before Init()" - - if (proc->GetCallbackStyle() == JEventProcessor::CallbackStyle::LegacyMode && proc->IsOrderingEnabled()) { - throw JException("%s: Ordering can only be used with non-legacy JEventProcessors", proc->GetTypeName().c_str()); - } - - mappable_procs_at_level.push_back(proc); - if (proc->GetCallbackStyle() != JEventProcessor::CallbackStyle::LegacyMode) { - tappable_procs_at_level.push_back(proc); - } - } - } - - - bool is_top_level = (parent_unfolder == nullptr); - if (is_top_level && sources_at_level.size() == 0) { - // Skip level entirely when no source is present. - LOG_TRACE(GetLogger()) << "JTopologyBuilder: No sources found at level " << current_level << ", skipping" << LOG_END; - JEventLevel next = next_level(current_level); - if (next == JEventLevel::None) { - LOG_WARN(GetLogger()) << "No sources found: Processing topology will be empty." << LOG_END; - return; - } - return AttachLevel(next, nullptr, nullptr); - } - - // Enforce constraints on what our builder will accept (at least for now) - if (!is_top_level && !sources_at_level.empty()) { - throw JException("Topology forbids event sources at lower event levels in the topology"); - } - if ((parent_unfolder == nullptr && parent_folder != nullptr) || (parent_unfolder != nullptr && parent_folder == nullptr)) { - throw JException("Topology requires matching unfolder/folder arrow pairs"); - } - if (unfolders_at_level.size() > 1) { - throw JException("Multiple JEventUnfolders provided for level %s", level_str.c_str()); - } - // Another constraint is that the highest level of the topology has an event sources, but this is automatically handled by - // the level-skipping logic above - - - // Fill out arrow grid from components at this event level - // -------------------------- - // 0. Pool - // -------------------------- - LOG_INFO(GetLogger()) << "Creating event pool with level=" << current_level << " and size=" << m_max_inflight_events[current_level]; - JEventPool* pool_at_level = new JEventPool(m_components, m_max_inflight_events[current_level], m_location_count, current_level); - pools.push_back(pool_at_level); // Hand over ownership of the pool to the topology - LOG_INFO(GetLogger()) << "Finished creating event pool"; - - // -------------------------- - // 1. Source - // -------------------------- - JSourceArrow* src_arrow = nullptr; - bool need_source = !sources_at_level.empty(); - if (need_source) { - src_arrow = new JSourceArrow(level_str+"Source", current_level, sources_at_level); - src_arrow->GetPort(JSourceArrow::EVENT_IN).Attach(pool_at_level); - src_arrow->GetPort(JSourceArrow::EVENT_OUT).Attach(pool_at_level); - arrows.push_back(src_arrow); - } - - // -------------------------- - // 2. Map1 - // -------------------------- - bool have_parallel_sources = false; - for (JEventSource* source: sources_at_level) { - have_parallel_sources |= source->IsProcessParallelEnabled(); - } - bool have_unfolder = !unfolders_at_level.empty(); - JMapArrow* map1_arrow = nullptr; - bool need_map1 = (have_parallel_sources || have_unfolder); - - if (need_map1) { - map1_arrow = new JMapArrow(level_str+"Map1", current_level); - for (JEventSource* source: sources_at_level) { - if (source->IsProcessParallelEnabled()) { - map1_arrow->AddSource(source); - } - } - for (JEventUnfolder* unf: unfolders_at_level) { - map1_arrow->AddUnfolder(unf); - } - map1_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level); - map1_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level); - arrows.push_back(map1_arrow); - } - - // -------------------------- - // 3. Unfold - // -------------------------- - JUnfoldArrow* unfold_arrow = nullptr; - bool need_unfold = have_unfolder; - if (need_unfold) { - unfold_arrow = new JUnfoldArrow(level_str+"Unfold", unfolders_at_level[0]); - unfold_arrow->GetPort(JUnfoldArrow::REJECTED_PARENT_OUT).Attach(pool_at_level); - arrows.push_back(unfold_arrow); - } - - // -------------------------- - // 4. Fold - // -------------------------- - JFoldArrow* fold_arrow = nullptr; - bool need_fold = have_unfolder; - if(need_fold) { - fold_arrow = new JFoldArrow(level_str+"Fold", current_level, unfolders_at_level[0]->GetChildLevel()); - arrows.push_back(fold_arrow); - fold_arrow->GetPort(JFoldArrow::PARENT_OUT).Attach(pool_at_level); - } - - // -------------------------- - // 5. Map2 - // -------------------------- - JMapArrow* map2_arrow = nullptr; - bool need_map2 = !mappable_procs_at_level.empty(); - if (need_map2) { - map2_arrow = new JMapArrow(level_str+"Map2", current_level); - for (JEventProcessor* proc : mappable_procs_at_level) { - map2_arrow->AddProcessor(proc); - map2_arrow->GetPort(JMapArrow::EVENT_IN).Attach(pool_at_level); - map2_arrow->GetPort(JMapArrow::EVENT_OUT).Attach(pool_at_level); - } - arrows.push_back(map2_arrow); - } - - // -------------------------- - // 6. Tap - // -------------------------- - JTapArrow* first_tap_arrow = nullptr; - JTapArrow* last_tap_arrow = nullptr; - bool need_tap = !tappable_procs_at_level.empty(); - if (need_tap) { - std::tie(first_tap_arrow, last_tap_arrow) = CreateTapChain(tappable_procs_at_level, level_str); - first_tap_arrow->GetPort(JTapArrow::EVENT_IN).Attach(pool_at_level); - last_tap_arrow->GetPort(JTapArrow::EVENT_OUT).Attach(pool_at_level); - } - - - // Now that we've set up our component grid, we can do wiring! - // -------------------------- - // 1. Source - // -------------------------- - if (parent_unfolder != nullptr) { - parent_unfolder->GetPort(JUnfoldArrow::CHILD_IN).Attach(pool_at_level); - ConnectToFirstAvailable(parent_unfolder, JUnfoldArrow::CHILD_OUT, - {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (src_arrow != nullptr) { - ConnectToFirstAvailable(src_arrow, JSourceArrow::EVENT_OUT, - {{map1_arrow, JMapArrow::EVENT_IN}, {unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (map1_arrow != nullptr) { - ConnectToFirstAvailable(map1_arrow, JMapArrow::EVENT_OUT, - {{unfold_arrow, JUnfoldArrow::PARENT_IN}, {map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (unfold_arrow != nullptr) { - ConnectToFirstAvailable(unfold_arrow, JUnfoldArrow::REJECTED_PARENT_OUT, - {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (fold_arrow != nullptr) { - ConnectToFirstAvailable(fold_arrow, JFoldArrow::PARENT_OUT, - {{map2_arrow, JMapArrow::EVENT_IN}, {first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (map2_arrow != nullptr) { - ConnectToFirstAvailable(map2_arrow, JMapArrow::EVENT_OUT, - {{first_tap_arrow, JTapArrow::EVENT_IN}, {parent_folder, JFoldArrow::CHILD_IN}}); - } - if (last_tap_arrow != nullptr) { - ConnectToFirstAvailable(last_tap_arrow, JTapArrow::EVENT_OUT, - {{parent_folder, JFoldArrow::CHILD_IN}}); - } - if (parent_folder != nullptr) { - parent_folder->GetPort(JFoldArrow::CHILD_OUT).Attach(pool_at_level); - } - - // Finally, we recur over lower levels! - if (need_unfold) { - auto next_level = unfolders_at_level[0]->GetChildLevel(); - AttachLevel(next_level, unfold_arrow, fold_arrow); - } - else { - // This is the lowest level - // TODO: Improve logic for determining event counts for multilevel topologies - if (last_tap_arrow != nullptr) { - last_tap_arrow->SetIsSink(true); - } - else if (map2_arrow != nullptr) { - map2_arrow->SetIsSink(true); - } - else if (map1_arrow != nullptr) { - map1_arrow->SetIsSink(true); - } - else if (src_arrow != nullptr) { - src_arrow->SetIsSink(true); - } - } -} - - - diff --git a/src/libraries/JANA/Topology/JTopologyBuilder.h b/src/libraries/JANA/Topology/JTopologyBuilder.h index 744d2ec72..ccfa1bc5f 100644 --- a/src/libraries/JANA/Topology/JTopologyBuilder.h +++ b/src/libraries/JANA/Topology/JTopologyBuilder.h @@ -53,6 +53,8 @@ class JTopologyBuilder : public JService { void Init() override; void AddArrow(JArrow* arrow); + JArrow* GetArrow(const std::string& arrow_name); + JEventPool* GetOrCreatePool(JEventLevel level); void ConnectQueue(std::string upstream_arrow_name, std::string upstream_port_name, std::string downstream_arrow_name, std::string downstream_port_name); @@ -67,6 +69,7 @@ class JTopologyBuilder : public JService { void SetConfigureFn(std::function configure_fn); void CreateTopology(); + void CreateTopologyFromScratch(); std::string PrintTopology(); @@ -76,11 +79,8 @@ class JTopologyBuilder : public JService { const JProcessorMapping& GetProcessorMapping() { return mapping; }; private: - void AttachLevel(JEventLevel current_level, JUnfoldArrow* parent_unfolder, JFoldArrow* parent_folder); - void ConnectToFirstAvailable(JArrow* upstream, size_t upstream_port_id, std::vector> downstreams); void Connect(JArrow* upstream, size_t upstream_port_id, JArrow* downstream, size_t downstream_port_id); std::pair CreateTapChain(std::vector& procs, std::string name); - JEventPool* GetOrCreatePool(JEventLevel level); }; diff --git a/src/libraries/JANA/Topology/JUnfoldArrow.h b/src/libraries/JANA/Topology/JUnfoldArrow.h index 7d69b4c6d..dafcf4258 100644 --- a/src/libraries/JANA/Topology/JUnfoldArrow.h +++ b/src/libraries/JANA/Topology/JUnfoldArrow.h @@ -8,7 +8,7 @@ class JUnfoldArrow : public JArrow { public: - enum PortIndex {PARENT_IN=0, CHILD_IN=1, CHILD_OUT=2, REJECTED_PARENT_OUT=3}; + enum PortIndex {PARENT_IN=0, CHILD_IN=1, CHILD_OUT=2, PARENT_OUT=3}; private: JEventUnfolder* m_unfolder = nullptr; @@ -20,13 +20,13 @@ class JUnfoldArrow : public JArrow { SetName(name); auto parent_level = unfolder->GetLevel(); auto child_level = unfolder->GetChildLevel(); - AddPort("parent_in", parent_level); - AddPort("child_in", child_level); - AddPort("child_out", child_level).SetEstablishesOrdering(true); + AddPort("parent_in", parent_level, PortDirection::In); + AddPort("child_in", child_level, PortDirection::In); + AddPort("child_out", child_level, PortDirection::Out).SetEstablishesOrdering(true); // Just in case there's a folder that needs this. // establishes_ordering is cheap; enforces_ordering is the expensive one - AddPort("rejected_parent_out", parent_level); + AddPort("parent_out", parent_level, PortDirection::Out); m_next_input_port = GetPortIndex("parent_in"); } @@ -46,7 +46,6 @@ class JUnfoldArrow : public JArrow { if (this->m_next_input_port == PARENT_IN) { assert(m_parent_event == nullptr); m_parent_event = event; - m_parent_event->TakeRefToSelf(); } else if (this->m_next_input_port == CHILD_IN) { assert(m_child_event == nullptr); @@ -89,8 +88,8 @@ class JUnfoldArrow : public JArrow { if (result == JEventUnfolder::Result::KeepChildNextParent) { // KeepChildNextParent is a little more complicated because we have to handle the case of the parent having no children. - // In this case the parent obviously doesn't get shared among any children, and instead it is sent to the REJECTED_PARENT_OUT port. - int child_count = m_parent_event->ReleaseRefToSelf(); // Decrement the reference count so that this can be recycled + // In this case the parent obviously doesn't get shared among any children, and instead it is sent to the PARENT_OUT port. + int child_count = m_parent_event->GetChildCount(); LOG_DEBUG(m_logger) << "Unfold finished with parent event = " << m_parent_event->GetEventNumber() << " (" << child_count << " children emitted)"; if (child_count > 0) { @@ -104,7 +103,7 @@ class JUnfoldArrow : public JArrow { else { // Parent has NO children output_count = 1; - outputs[0] = {m_parent_event, REJECTED_PARENT_OUT}; + outputs[0] = {m_parent_event, PARENT_OUT}; m_parent_event = nullptr; m_next_input_port = PARENT_IN; status = JArrow::FireResult::KeepGoing; @@ -122,9 +121,9 @@ class JUnfoldArrow : public JArrow { } else if (result == JEventUnfolder::Result::NextChildNextParent) { m_child_event->SetParent(m_parent_event); - m_parent_event->ReleaseRefToSelf(); // Decrement the reference count so that this can be recycled outputs[0] = {m_child_event, CHILD_OUT}; - output_count = 1; + outputs[1] = {m_parent_event, PARENT_OUT}; + output_count = 2; LOG_DEBUG(m_logger) << "Unfold finished with parent event = " << m_parent_event->GetEventNumber() << LOG_END; m_child_event = nullptr; m_parent_event = nullptr; diff --git a/src/libraries/JANA/Utils/JEventLevel.h b/src/libraries/JANA/Utils/JEventLevel.h index 4d65f4398..6925b94ea 100644 --- a/src/libraries/JANA/Utils/JEventLevel.h +++ b/src/libraries/JANA/Utils/JEventLevel.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include enum class JEventLevel { Run, Subrun, SlowControls, Timeslice, Block, PhysicsEvent, Subevent, Task, None }; @@ -70,3 +72,60 @@ inline JEventLevel next_level(JEventLevel current_level) { default: return JEventLevel::None; } } + +class JEventLevelHierarchy { + std::vector m_bottom_levels; + std::vector m_parent_levels; + std::vector m_all_levels; + std::map> parents; + std::map> children; + +private: + void ToString(std::stringstream& ss, JEventLevel current) { + ss << current; + auto it = parents.find(current); + if (it != parents.end()) { + ss << "("; + size_t parent_count = it->second.size(); + for (size_t i=0; isecond[i]); + if (i != parent_count-1) { + ss << ", "; + } + } + ss << ")"; + } + } + +public: + + const std::vector& GetAllLevels() const { + return m_all_levels; + } + const std::vector& GetParentLevels() const { + return m_parent_levels; + } + const std::vector& GetBottomLevels() const { + return m_bottom_levels; + } + void AddBottomLevel(JEventLevel level) { + m_all_levels.push_back(level); + m_bottom_levels.push_back(level); + } + void AddParentLevel(JEventLevel child, JEventLevel parent) { + m_all_levels.push_back(parent); + m_parent_levels.push_back(parent); + parents[child].push_back(parent); + children[parent].push_back(child); + } + std::string ToString() { + std::stringstream ss; + for (size_t i=0; iAddProcessor(proc); } diff --git a/src/programs/integration_tests/SimpleOffloading.cc b/src/programs/integration_tests/SimpleOffloading.cc index e27ae0fa7..785a62f72 100644 --- a/src/programs/integration_tests/SimpleOffloading.cc +++ b/src/programs/integration_tests/SimpleOffloading.cc @@ -74,8 +74,8 @@ struct TriggerFactoryInputsArrow : public JArrow { TriggerFactoryInputsArrow(JEventLevel level) { SetName("trigger"); SetIsParallel(true); - AddPort("in", level); - AddPort("out", level); + AddPort("in", level, PortDirection::In); + AddPort("out", level, PortDirection::Out); } void Fire(JEvent* event, OutputData& outputs, size_t& output_count, JArrow::FireResult& status) override { @@ -98,8 +98,8 @@ struct OffloadArrow : public JArrow { OffloadArrow(JEventLevel level) { SetName("offload"); SetIsParallel(false); - AddPort("in", level); - AddPort("out", level); + AddPort("in", level, PortDirection::In); + AddPort("out", level, PortDirection::Out); } ~OffloadArrow() override {} @@ -118,7 +118,7 @@ struct OffloadArrow : public JArrow { void configure_topology(JTopologyBuilder& builder, JComponentManager& components) { - auto* src_arrow = new JSourceArrow("src", JEventLevel::PhysicsEvent, components.get_evt_srces()); + auto* src_arrow = new JSourceArrow("src", JEventLevel::PhysicsEvent, components.GetSources()); TriggerFactoryInputsArrow* trigger_inputs_arrow = new TriggerFactoryInputsArrow(JEventLevel::PhysicsEvent); trigger_inputs_arrow->unique_name = "B"; @@ -127,12 +127,12 @@ void configure_topology(JTopologyBuilder& builder, JComponentManager& components offload_arrow->unique_name = "B"; JMapArrow* map_arrow = new JMapArrow("map", JEventLevel::PhysicsEvent); - for (auto proc : components.get_evt_procs()) { + for (auto proc : components.GetProcessors()) { map_arrow->AddProcessor(proc); } JTapArrow* tap_arrow = new JTapArrow("tap", JEventLevel::PhysicsEvent); - for (auto proc : components.get_evt_procs()) { + for (auto proc : components.GetProcessors()) { tap_arrow->AddProcessor(proc); } diff --git a/src/programs/unit_tests/Engine/JExecutionEngineTests.cc b/src/programs/unit_tests/Engine/JExecutionEngineTests.cc index 1f0585cce..1d0660c69 100644 --- a/src/programs/unit_tests/Engine/JExecutionEngineTests.cc +++ b/src/programs/unit_tests/Engine/JExecutionEngineTests.cc @@ -138,7 +138,7 @@ TEST_CASE("JExecutionEngine_ExternalWorkers") { sut->ExchangeTask(task, worker.worker_id); REQUIRE(task.arrow != nullptr); - REQUIRE(task.arrow->GetName() == "PhysicsEventMap2"); + REQUIRE(task.arrow->GetName() == "PhysicsEventMap1"); REQUIRE(sut->GetRunStatus() == JExecutionEngine::RunStatus::Draining); REQUIRE(sut->GetPerf().event_count == 0); diff --git a/src/programs/unit_tests/Topology/JArrowTests.cc b/src/programs/unit_tests/Topology/JArrowTests.cc index 7726c5fda..cc17b4a54 100644 --- a/src/programs/unit_tests/Topology/JArrowTests.cc +++ b/src/programs/unit_tests/Topology/JArrowTests.cc @@ -9,8 +9,8 @@ struct TestData { int x; }; struct BasicParallelArrow : public JArrow { BasicParallelArrow() { - AddPort("in", JEventLevel::PhysicsEvent); - AddPort("out", JEventLevel::PhysicsEvent); + AddPort("in", JEventLevel::PhysicsEvent, PortDirection::In); + AddPort("out", JEventLevel::PhysicsEvent, PortDirection::Out); SetIsParallel(true); } diff --git a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc index 20534bb2c..17c67dae6 100644 --- a/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc +++ b/src/programs/unit_tests/Topology/JTopologyBuilderTests.cc @@ -25,12 +25,16 @@ class MyMultiSource : public JEventSource { public: MyMultiSource() { SetCallbackStyle(CallbackStyle::ExpertMode); - SetEventLevels({JEventLevel::Run, JEventLevel::SlowControls, JEventLevel::PhysicsEvent}); + SetParentLevels({JEventLevel::Run, JEventLevel::SlowControls}); + SetLevel(JEventLevel::PhysicsEvent); } Result Emit(JEvent& event) override { auto count = GetEmittedEventCount(); - const auto& levels = GetEventLevels(); - SetNextEventLevel(levels.at((count+1) % levels.size())); + switch (count % 3) { + case 0: SetNextEventLevel(JEventLevel::SlowControls); break; + case 1: SetNextEventLevel(JEventLevel::PhysicsEvent); break; + case 2: SetNextEventLevel(JEventLevel::Run); break; + } LOG_INFO(GetLogger()) << "Emitting " << event.GetEventStamp(); return Result::Success; // Assume that source can peek ahead to request a different level } @@ -38,12 +42,10 @@ class MyMultiSource : public JEventSource { void configure_multisource_topology(JTopologyBuilder& builder, JComponentManager& components) { - auto* src_arrow = new JMultilevelSourceArrow; - src_arrow->SetName("src"); - src_arrow->SetEventSource(components.get_evt_srces().at(0)); + auto* src_arrow = new JMultilevelSourceArrow("src", components.GetSources().at(0)); JTapArrow* tap_arrow = new JTapArrow("tap"); - for (auto proc : components.get_evt_procs()) { + for (auto proc : components.GetProcessors()) { tap_arrow->AddProcessor(proc); } diff --git a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc index d5f934399..d949ec9c5 100644 --- a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc +++ b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.cc @@ -1,6 +1,7 @@ #include "MultiLevelTopologyTests.h" #include "JANA/Engine/JExecutionEngine.h" #include "JANA/JApplicationFwd.h" +#include "JANA/JEvent.h" #include "JANA/JException.h" #include "JANA/Topology/JArrow.h" #include "JANA/Topology/JTopologyBuilder.h" @@ -31,41 +32,62 @@ TEST_CASE("TimeslicesTests_FineGrained") { app.Initialize(); auto ee = app.GetService(); auto top = app.GetService(); - enum ArrowId {TS_SRC=0, TS_MAP=1, TS_UNF=2, TS_FLD=3, PH_MAP=4, PH_TAP=5}; + + auto src_arrow = top->GetArrow("TimesliceSource"); + auto ts_map_arrow = top->GetArrow("TimesliceMap1"); + auto unfold_arrow = top->GetArrow("PhysicsEventUnfold"); + auto pe_map_arrow = top->GetArrow("PhysicsEventMap2"); + auto pe_tap_arrow = top->GetArrow("PhysicsEventTap"); + + auto ts_pool = top->GetOrCreatePool(JEventLevel::Timeslice); + auto pe_pool = top->GetOrCreatePool(JEventLevel::PhysicsEvent); + + auto ts_map_queue = ts_map_arrow->GetPort(0).GetQueue(); + auto unfold_queue = unfold_arrow->GetPort(0).GetQueue(); + auto pe_map_queue = pe_map_arrow->GetPort(0).GetQueue(); + auto pe_tap_queue = pe_tap_arrow->GetPort(0).GetQueue(); + + // Test connectivity + REQUIRE(src_arrow->GetPort(0).GetPool() == ts_pool); + REQUIRE(src_arrow->GetPort(1).GetQueue() == ts_map_queue); + REQUIRE(ts_map_arrow->GetPort(1).GetQueue() == unfold_queue); + REQUIRE(unfold_queue == unfold_arrow->GetPort(JEventLevel::Timeslice, JArrow::PortDirection::In).GetQueue()); + REQUIRE(pe_pool == unfold_arrow->GetPort(JEventLevel::PhysicsEvent, JArrow::PortDirection::In).GetPool()); + REQUIRE(unfold_arrow->GetPort(JEventLevel::Timeslice, JArrow::PortDirection::Out).GetPool() == ts_pool); + REQUIRE(unfold_arrow->GetPort(JEventLevel::PhysicsEvent, JArrow::PortDirection::Out).GetQueue() == pe_map_queue); + REQUIRE(pe_map_arrow->GetPort(1).GetQueue() == pe_tap_queue); + REQUIRE(pe_tap_arrow->GetPort(1).GetPool() == pe_pool); + JArrow::FireResult result = JArrow::FireResult::NotRunYet; - result = ee->Fire(TS_SRC, 0); + result = ee->Fire(src_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->GetArrows()[TS_SRC]->GetPort(1).GetQueue() == top->GetQueues()[0]); - REQUIRE(top->GetPools()[0]->GetCapacity() == 2); - REQUIRE(top->GetPools()[0]->GetSize(0) == 1); - REQUIRE(top->GetQueues()[0]->GetSize(0) == 1); + REQUIRE(ts_pool->GetCapacity() == 2); + REQUIRE(ts_pool->GetSize(0) == 1); + REQUIRE(ts_map_queue->GetSize(0) == 1); - result = ee->Fire(TS_MAP, 0); + result = ee->Fire(ts_map_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - REQUIRE(top->GetQueues()[0]->GetSize(0) == 0); - REQUIRE(top->GetQueues()[1]->GetSize(0) == 1); - + REQUIRE(ts_map_queue->GetSize(0) == 0); + REQUIRE(unfold_queue->GetSize(0) == 1); + // Parent - result = ee->Fire(TS_UNF, 0); + result = ee->Fire(unfold_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); // Child - result = ee->Fire(TS_UNF, 0); + result = ee->Fire(unfold_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - result = ee->Fire(PH_MAP, 0); + result = ee->Fire(pe_map_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - result = ee->Fire(PH_TAP, 0); + result = ee->Fire(pe_tap_arrow->GetId(), 0); REQUIRE(result == JArrow::FireResult::KeepGoing); - result = ee->Fire(TS_FLD, 0); - REQUIRE(result == JArrow::FireResult::KeepGoing); - - REQUIRE(top->GetPools()[0]->GetSize(0) == 1); // Unfolder still has parent - REQUIRE(top->GetPools()[1]->GetSize(0) == 4); // Child returned to pool + REQUIRE(ts_pool->GetSize(0) == 1); // Unfolder still has parent + REQUIRE(pe_pool->GetSize(0) == 4); // Child returned to pool } @@ -94,6 +116,7 @@ TEST_CASE("TimeslicesTests_NoEvtProcs") { JApplication app; app.SetParameterValue("jana:nevents", "5"); + app.SetParameterValue("jana:loglevel", "debug"); app.Add(new MyTimesliceSource); app.Add(new MyTimesliceUnfolder); @@ -123,7 +146,7 @@ TEST_CASE("MultilevelSource_Trivial") { auto* source = new MyMultilevelSource; auto* proc = new MyMultilevelProcessor; - source->SetEventLevels({JEventLevel::PhysicsEvent}); + source->SetLevel(JEventLevel::PhysicsEvent); source->data_stream = {{JEventLevel::PhysicsEvent, 4}, {JEventLevel::PhysicsEvent, 5}, {JEventLevel::PhysicsEvent, 6}}; proc->expected_data_stream = {{-1,-1,4}, {-1,-1,5}, {-1,-1,6}}; @@ -133,7 +156,6 @@ TEST_CASE("MultilevelSource_Trivial") { } - } // namespace multilevel_source_tests } // namespce jana diff --git a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h index 84faa3466..abfe6e66e 100644 --- a/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h +++ b/src/programs/unit_tests/Topology/MultiLevelTopologyTests.h @@ -181,7 +181,8 @@ struct MyMultilevelSource : public JEventSource { MyMultilevelSource() { SetCallbackStyle(CallbackStyle::ExpertMode); - SetEventLevels({JEventLevel::Run, JEventLevel::SlowControls, JEventLevel::PhysicsEvent}); + SetParentLevels({JEventLevel::Run, JEventLevel::SlowControls}); + SetLevel(JEventLevel::PhysicsEvent); m_calibs_out.SetLevel(JEventLevel::Run); m_controls_out.SetLevel(JEventLevel::SlowControls);