From ef9d735fa83b3d024d7615e6db8b2b5888d52a3f Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Wed, 6 Nov 2024 16:49:04 -0800 Subject: [PATCH 1/9] Initial simulator stepping implementation. --- src/scenic/core/simulators.py | 351 +++++++++++++++++++++------------- 1 file changed, 223 insertions(+), 128 deletions(-) diff --git a/src/scenic/core/simulators.py b/src/scenic/core/simulators.py index 832b03632..18c1fa5c8 100644 --- a/src/scenic/core/simulators.py +++ b/src/scenic/core/simulators.py @@ -11,7 +11,8 @@ """ import abc -from collections import defaultdict +from collections import OrderedDict, defaultdict +from contextlib import contextmanager import enum import math import numbers @@ -135,9 +136,8 @@ def simulate( (rarely) and its security implications. Returns: - A `Simulation` object representing the completed simulation, or `None` if no - simulation satisfying the requirements could be found within - **maxIterations** iterations. + An initialized simulation, or `None` if no simulation satisfying + the requirements could be found within **maxIterations** iterations. Raises: SimulationCreationError: if an error occurred while trying to run a @@ -191,6 +191,53 @@ def simulate( ) return simulation + @contextmanager + def simulateStepped( + self, + scene, + maxSteps=None, + *, + name="SteppedSimulation", + timestep=None, + verbosity=None, + replay=None, + enableReplay=True, + enableDivergenceCheck=False, + divergenceTolerance=0, + continueAfterDivergence=False, + allowPickle=False, + ): + if self._destroyed: + raise RuntimeError( + "simulator cannot run additional simulations " + "(the destroy() method has already been called)" + ) + if verbosity is None: + verbosity = errors.verbosityLevel + + simulation = self.createSimulation( + scene, + maxSteps=maxSteps, + name=name, + verbosity=verbosity, + timestep=timestep, + replay=replay, + enableReplay=enableReplay, + enableDivergenceCheck=enableDivergenceCheck, + divergenceTolerance=divergenceTolerance, + continueAfterDivergence=continueAfterDivergence, + allowPickle=allowPickle, + ) + try: + yield simulation + except (RejectSimulationException, RejectionException, GuardViolation) as e: + # This simulation will be thrown out, but attach it to the exception + # to aid in debugging. + e.simulation = self + raise + finally: + simulation.cleanup() + def replay(self, scene, replay, **kwargs): """Replay a simulation. @@ -207,13 +254,15 @@ def _runSingleSimulation( if verbosity >= 2: print(f" Starting simulation {name}...") try: - simulation = self.createSimulation( + with self.simulateStepped( scene, maxSteps=maxSteps, name=name, verbosity=verbosity, **kwargs, - ) + ) as simulation: + simulation._run() + except (RejectSimulationException, RejectionException, GuardViolation) as e: if verbosity >= 2: print( @@ -339,11 +388,14 @@ def __init__( self.currentTime = 0 self.timestep = 1 if timestep is None else float(timestep) self.verbosity = verbosity + self.maxSteps = maxSteps self.name = name self.worker_num = 0 self.actionSequence = [] + self._cleaned = False + # Prepare to save or load a replay. self.initializeReplay(replay, enableReplay, enableDivergenceCheck, allowPickle) self.divergenceTolerance = divergenceTolerance @@ -356,153 +408,192 @@ def __init__( import scenic.syntax.veneer as veneer veneer.beginSimulation(self) - dynamicScenario = self.scene.dynamicScenario + self.dynamicScenario = self.scene.dynamicScenario # Create objects and perform simulator-specific initialization. self.setup() # Initialize the top-level dynamic scenario. - dynamicScenario._start() + self.dynamicScenario._start() # Update all objects in case the simulator has adjusted any dynamic # properties during setup. self.updateObjects() - # Run the simulation. - terminationType, terminationReason = self._run(dynamicScenario, maxSteps) + # Set terminationType and terminationReason to default None + self.terminationType = None + self.terminationReason = None - # Stop all remaining scenarios. - # (and reject if some 'require eventually' condition was never satisfied) - for scenario in tuple(reversed(veneer.runningScenarios)): - scenario._stop("simulation terminated") - - # Record finally-recorded values. - values = dynamicScenario._evaluateRecordedExprs(RequirementType.recordFinal) - for name, val in values.items(): - self.records[name] = val - - # Package up simulation results into a compact object. - result = SimulationResult( - self.trajectory, - self.actionSequence, - terminationType, - terminationReason, - self.records, - ) - self.result = result except (RejectSimulationException, RejectionException, GuardViolation) as e: # This simulation will be thrown out, but attach it to the exception # to aid in debugging. + self.cleanup() e.simulation = self raise - finally: - self.destroy() - for obj in self.objects: - disableDynamicProxyFor(obj) - for agent in self.agents: - if agent.behavior and agent.behavior._isRunning: - agent.behavior._stop() - # If the simulation was terminated by an exception (including rejections), - # some scenarios may still be running; we need to clean them up without - # checking their requirements, which could raise rejection exceptions. - for scenario in tuple(reversed(veneer.runningScenarios)): - scenario._stop("exception", quiet=True) - veneer.endSimulation(self) - - def _run(self, dynamicScenario, maxSteps): + + def _run(self): assert self.currentTime == 0 while True: - if self.verbosity >= 3: - print(f" Time step {self.currentTime}:") - - # Run compose blocks of compositional scenarios - # (and check if any requirements defined therein fail) - # N.B. if the top-level scenario completes, we don't immediately end - # the simulation since we need to check if any monitors reject first. - terminationReason = dynamicScenario._step() - terminationType = TerminationType.scenarioComplete - - # Record current state of the simulation - self.recordCurrentState() - - # Run monitors - newReason = dynamicScenario._runMonitors() - if newReason is not None: - terminationReason = newReason - terminationType = TerminationType.terminatedByMonitor - - # "Always" and scenario-level requirements have been checked; - # now safe to terminate if the top-level scenario has finished, - # a monitor requested termination, or we've hit the timeout - if terminationReason is not None: - return terminationType, terminationReason - terminationReason = dynamicScenario._checkSimulationTerminationConditions() - if terminationReason is not None: - return TerminationType.simulationTerminationCondition, terminationReason - if maxSteps and self.currentTime >= maxSteps: - return TerminationType.timeLimit, f"reached time limit ({maxSteps} steps)" - - # Clear lastActions for all objects - for obj in self.objects: - obj.lastActions = tuple() - - # Update agents with any objects that now have behaviors (and are not already agents) - self.agents += [ - obj for obj in self.objects if obj.behavior and obj not in self.agents - ] - - # Compute the actions of the agents in this time step - allActions = defaultdict(tuple) - schedule = self.scheduleForAgents() - if not set(self.agents) == set(schedule): - raise RuntimeError("Simulator schedule does not contain all agents") - for agent in schedule: - # If agent doesn't have a behavior right now, continue - if not agent.behavior: - continue - - # Run the agent's behavior to get its actions - actions = agent.behavior._step() - - # Handle pseudo-actions marking the end of a simulation/scenario - if isinstance(actions, _EndSimulationAction): - return TerminationType.terminatedByBehavior, str(actions) - elif isinstance(actions, _EndScenarioAction): - scenario = actions.scenario - if scenario._isRunning: - scenario._stop(actions) - if scenario is dynamicScenario: - # Top-level scenario was terminated, so whole simulation will end. - return TerminationType.terminatedByBehavior, str(actions) - actions = () - - # Check ordinary actions for compatibility - assert isinstance(actions, tuple) - if len(actions) == 1 and isinstance(actions[0], (list, tuple)): - actions = tuple(actions[0]) - if not self.actionsAreCompatible(agent, actions): - raise InvalidScenarioError( - f"agent {agent} tried incompatible action(s) {actions}" + self.advance() + + if self.terminationType: + return + + def advance(self): + if self.terminationType or self._cleaned: + raise TerminatedSimulationException() + + if self.verbosity >= 3: + print(f" Time step {self.currentTime}:") + + # Run compose blocks of compositional scenarios + # (and check if any requirements defined therein fail) + # N.B. if the top-level scenario completes, we don't immediately end + # the simulation since we need to check if any monitors reject first. + terminationReason = self.dynamicScenario._step() + terminationType = TerminationType.scenarioComplete + + # Record current state of the simulation + self.recordCurrentState() + + # Run monitors + newReason = self.dynamicScenario._runMonitors() + if newReason is not None: + terminationReason = newReason + terminationType = TerminationType.terminatedByMonitor + + # "Always" and scenario-level requirements have been checked; + # now safe to terminate if the top-level scenario has finished, + # a monitor requested termination, or we've hit the timeout + if terminationReason is not None: + return self.terminateSimulation(terminationType, terminationReason) + terminationReason = self.dynamicScenario._checkSimulationTerminationConditions() + if terminationReason is not None: + return self.terminateSimulation( + TerminationType.simulationTerminationCondition, terminationReason + ) + if self.maxSteps and self.currentTime >= self.maxSteps: + return self.terminateSimulation( + TerminationType.timeLimit, f"reached time limit ({self.maxSteps} steps)" + ) + + # Clear lastActions for all objects + for obj in self.objects: + obj.lastActions = tuple() + + # Update agents with any objects that now have behaviors (and are not already agents) + self.agents += [ + obj for obj in self.objects if obj.behavior and obj not in self.agents + ] + + # Compute the actions of the agents in this time step + allActions = defaultdict(tuple) + schedule = self.scheduleForAgents() + if not set(self.agents) == set(schedule): + raise RuntimeError("Simulator schedule does not contain all agents") + for agent in schedule: + # If agent doesn't have a behavior right now, continue + if not agent.behavior: + continue + # Run the agent's behavior to get its actions + actions = agent.behavior._step() + + # Handle pseudo-actions marking the end of a simulation/scenario + if isinstance(actions, _EndSimulationAction): + return self.terminateSimulation( + TerminationType.terminatedByBehavior, str(actions) + ) + elif isinstance(actions, _EndScenarioAction): + scenario = actions.scenario + if scenario._isRunning: + scenario._stop(actions) + if scenario is self.dynamicScenario: + # Top-level scenario was terminated, so whole simulation will end. + return self.terminateSimulation( + TerminationType.terminatedByBehavior, str(actions) ) + actions = () + + # Check ordinary actions for compatibility + assert isinstance(actions, tuple) + if len(actions) == 1 and isinstance(actions[0], (list, tuple)): + actions = tuple(actions[0]) + if not self.actionsAreCompatible(agent, actions): + raise InvalidScenarioError( + f"agent {agent} tried incompatible action(s) {actions}" + ) - # Save actions for execution below - allActions[agent] = actions + # Save actions for execution below + allActions[agent] = actions - # Log lastActions + # Log lastActions + agent.lastActions = actions + + # Execute the actions + if self.verbosity >= 3: + for agent, actions in allActions.items(): + print(f" Agent {agent} takes action(s) {actions}") agent.lastActions = actions + self.actionSequence.append(allActions) + self.executeActions(allActions) - # Execute the actions - if self.verbosity >= 3: - for agent, actions in allActions.items(): - print(f" Agent {agent} takes action(s) {actions}") - self.actionSequence.append(allActions) - self.executeActions(allActions) + # Run the simulation for a single step and read its state back into Scenic + self.step() + self.currentTime += 1 + self.updateObjects() - # Run the simulation for a single step and read its state back into Scenic - self.step() - self.currentTime += 1 - self.updateObjects() + def terminateSimulation(self, terimnationType, terminationReason): + import scenic.syntax.veneer as veneer + + # Log terminationType and terminationReason + self.terminationType = terimnationType + self.terminationReason = terminationReason + + # Stop all remaining scenarios. + # (and reject if some 'require eventually' condition was never satisfied) + for scenario in tuple(reversed(veneer.runningScenarios)): + scenario._stop("simulation terminated") + + # Record finally-recorded values. + values = self.dynamicScenario._evaluateRecordedExprs(RequirementType.recordFinal) + for name, val in values.items(): + self.records[name] = val + + # Package up simulation results into a compact object. + result = SimulationResult( + self.trajectory, + self.actionSequence, + self.terminationType, + self.terminationReason, + self.records, + ) + self.result = result + + self.cleanup() + + def cleanup(self): + # No need to repeat cleanup if we've already done it + if self._cleaned: + return + + # Remember that we have cleaned up. + self._cleaned = True + + import scenic.syntax.veneer as veneer + + self.destroy() + for obj in self.objects: + disableDynamicProxyFor(obj) + for agent in self.agents: + if agent.behavior and agent.behavior._isRunning: + agent.behavior._stop() + # If the simulation was terminated by an exception (including rejections), + # some scenarios may still be running; we need to clean them up without + # checking their requirements, which could raise rejection exceptions. + for scenario in tuple(reversed(veneer.runningScenarios)): + scenario._stop("exception", quiet=True) + veneer.endSimulation(self) def setup(self): """Set up the simulation to run in the simulator. @@ -911,3 +1002,7 @@ def __init__(self, trajectory, actions, terminationType, terminationReason, reco self.terminationType = terminationType self.terminationReason = str(terminationReason) self.records = dict(records) + + +class TerminatedSimulationException(Exception): + pass From a452067a6131eb27a9ff97d8b1248d01f7f68c35 Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Wed, 6 Nov 2024 17:07:06 -0800 Subject: [PATCH 2/9] Added stepped simulation test. --- src/scenic/core/simulators.py | 18 +++++------------- tests/core/test_simulators.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/src/scenic/core/simulators.py b/src/scenic/core/simulators.py index 18c1fa5c8..57574767d 100644 --- a/src/scenic/core/simulators.py +++ b/src/scenic/core/simulators.py @@ -420,10 +420,6 @@ def __init__( # properties during setup. self.updateObjects() - # Set terminationType and terminationReason to default None - self.terminationType = None - self.terminationReason = None - except (RejectSimulationException, RejectionException, GuardViolation) as e: # This simulation will be thrown out, but attach it to the exception # to aid in debugging. @@ -437,11 +433,11 @@ def _run(self): while True: self.advance() - if self.terminationType: + if self.result: return def advance(self): - if self.terminationType or self._cleaned: + if self.result or self._cleaned: raise TerminatedSimulationException() if self.verbosity >= 3: @@ -543,13 +539,9 @@ def advance(self): self.currentTime += 1 self.updateObjects() - def terminateSimulation(self, terimnationType, terminationReason): + def terminateSimulation(self, terminationType, terminationReason): import scenic.syntax.veneer as veneer - # Log terminationType and terminationReason - self.terminationType = terimnationType - self.terminationReason = terminationReason - # Stop all remaining scenarios. # (and reject if some 'require eventually' condition was never satisfied) for scenario in tuple(reversed(veneer.runningScenarios)): @@ -564,8 +556,8 @@ def terminateSimulation(self, terimnationType, terminationReason): result = SimulationResult( self.trajectory, self.actionSequence, - self.terminationType, - self.terminationReason, + terminationType, + terminationReason, self.records, ) self.result = result diff --git a/tests/core/test_simulators.py b/tests/core/test_simulators.py index 149c1cad1..5358a8cec 100644 --- a/tests/core/test_simulators.py +++ b/tests/core/test_simulators.py @@ -1,6 +1,11 @@ import pytest -from scenic.core.simulators import DummySimulation, DummySimulator, Simulation +from scenic.core.simulators import ( + DummySimulation, + DummySimulator, + Simulation, + TerminatedSimulationException, +) from tests.utils import compileScenic, sampleResultFromScene, sampleSceneFrom @@ -35,6 +40,29 @@ def test_simulator_destruction(): assert "destroy() called twice" in str(e) +def test_simulator_stepped(): + simulator = DummySimulator() + scene = sampleSceneFrom("ego = new Object") + + with simulator.simulateStepped(scene, maxSteps=5) as simulation: + while simulation.result is None: + simulation.advance() + + assert simulation.result is not None + assert simulation.currentTime == 5 + + # advance() should do nothing but raise an exception + # if the simulation is already terminated + with pytest.raises(TerminatedSimulationException): + simulation.advance() + + assert simulation.currentTime == 5 + + # Ensure all values are preserved after leaving the context manager + assert simulation.result is not None + assert simulation.currentTime == 5 + + def test_simulator_set_property(): class TestSimulation(DummySimulation): def createObjectInSimulator(self, obj): From 07e773b6380ca8b8732ff00d616df2f7defa238f Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Wed, 19 Nov 2025 18:30:50 -0800 Subject: [PATCH 3/9] Scene sampling parallelization prototype. --- src/scenic/__init__.py | 1 + src/scenic/__main__.py | 3 +- src/scenic/core/scenarios.py | 87 ++++++++++++++++++++++++++------- src/scenic/core/utils.py | 38 ++++++++++++++ src/scenic/syntax/translator.py | 17 ++++++- 5 files changed, 124 insertions(+), 22 deletions(-) diff --git a/src/scenic/__init__.py b/src/scenic/__init__.py index ac15ea073..fe8fbeb72 100644 --- a/src/scenic/__init__.py +++ b/src/scenic/__init__.py @@ -2,6 +2,7 @@ import scenic.core.errors as _errors from scenic.core.errors import setDebuggingOptions +from scenic.core.utils import setSeed from scenic.syntax.translator import scenarioFromFile, scenarioFromString _errors.showInternalBacktrace = False # see comment in errors module diff --git a/src/scenic/__main__.py b/src/scenic/__main__.py index 05f527fba..076ace8c9 100644 --- a/src/scenic/__main__.py +++ b/src/scenic/__main__.py @@ -185,8 +185,7 @@ if args.verbosity >= 1: print(f"Using random seed = {args.seed}") - random.seed(args.seed) - numpy.random.seed(args.seed) + scenic.setSeed(args.seed) # Load scenario from file if args.verbosity >= 1: diff --git a/src/scenic/core/scenarios.py b/src/scenic/core/scenarios.py index fa93d454b..7bf3a6e06 100644 --- a/src/scenic/core/scenarios.py +++ b/src/scenic/core/scenarios.py @@ -3,9 +3,11 @@ import dataclasses import io import itertools +import multiprocessing import random import sys import time +import warnings import numpy import trimesh @@ -38,6 +40,7 @@ ) from scenic.core.sample_checking import BasicChecker, WeightedAcceptanceChecker from scenic.core.serialization import Serializer, dumpAsScenicCode +from scenic.core.utils import generateInnerBatchHelper from scenic.core.vectors import Vector # Global params @@ -315,6 +318,7 @@ def __init__( self.dependencies = ( self._instances + paramDeps + tuple(requirementDeps) + tuple(behaviorDeps) ) + self._scenarioCreationData = None # Setup the default checker self.defaultRequirements = self.generateDefaultRequirements() @@ -400,11 +404,16 @@ def generate(self, maxIterations=2000, verbosity=0, feedback=None): Raises: `RejectionException`: if no valid sample is found in **maxIterations** iterations. """ - scenes, iterations = self.generateBatch(1, maxIterations, verbosity, feedback) - return scenes[0], iterations + return next(self.generateBatch(1, maxIterations, verbosity, feedback)) def generateBatch( - self, numScenes, maxIterations=float("inf"), verbosity=0, feedback=None + self, + numScenes, + maxIterations=float("inf"), + verbosity=0, + feedback=None, + numWorkers=0, + mute=True, ): """Sample several `Scene` objects from this scenario. @@ -416,29 +425,71 @@ def generateBatch( verbosity (int): Verbosity level. feedback (float): Feedback to pass to external samplers doing active sampling. See :mod:`scenic.core.external_params`. + numWorkers (int): The number of workers to be used when generating scenes. If numWorkers + is 0, scenes will be generated in the main process. + mute (bool): Whether or not to mute stdOut and stdErr in the worker processes. Returns: - A pair with a list of the sampled `Scene` objects and the total number - of iterations used. + An iterable of pairs with a sampled `Scene` and the number of iterations used for that scene. Raises: `RejectionException`: if not enough valid samples are found in **maxIterations** iterations. """ - totalIterations = 0 - scenes = [] + if numWorkers == 0: + totalIterations = 0 + + for _ in range(numScenes): + try: + remainingIts = maxIterations - totalIterations + scene, iterations = self._generateInner( + remainingIts, verbosity, feedback + ) + totalIterations += iterations + yield (scene, iterations) + except RejectionException: + raise RejectionException( + f"failed to generate scenario in {maxIterations} iterations" + ) + else: + if maxIterations != float("inf"): + raise RuntimeError("maxIterations not supported for parallel sampling.") - for _ in range(numScenes): - try: - remainingIts = maxIterations - totalIterations - scene, iterations = self._generateInner(remainingIts, verbosity, feedback) - scenes.append(scene) - totalIterations += iterations - except RejectionException: - raise RejectionException( - f"failed to generate scenario in {maxIterations} iterations" - ) + if feedback is not None: + raise RuntimeError("Feedback not supported for parallel sampling.") + + if verbosity > 0: + warnings.warn("Verbosity > 0 ignored during parallel sampling") + + # Initialize queues and lock + seedQueue = multiprocessing.Queue() + for _ in range(numScenes): + seedQueue.put(random.getrandbits(32)) - return scenes, totalIterations + sceneQueue = multiprocessing.Queue() + + # Initialize processes + params = (self._scenarioCreationData, seedQueue, sceneQueue, mute) + processes = [ + multiprocessing.Process(target=generateInnerBatchHelper, args=params) + for _ in range(numWorkers) + ] + try: + # Prepare process pool + for process in processes: + process.start() + + for _ in range(numScenes): + sceneBytes, iterations = sceneQueue.get() + scene = self.sceneFromBytes(sceneBytes, verify=False) + yield (scene, iterations) + + finally: + # Close processes and queues + for process in processes: + process.terminate() + + seedQueue.close() + sceneQueue.close() def _generateInner(self, maxIterations, verbosity, feedback): # choose which custom requirements will be enforced for this sample diff --git a/src/scenic/core/utils.py b/src/scenic/core/utils.py index 9817843f5..63cee18d1 100644 --- a/src/scenic/core/utils.py +++ b/src/scenic/core/utils.py @@ -4,9 +4,11 @@ import collections from contextlib import contextmanager import functools +import io import itertools import math import os +import random import signal from subprocess import CalledProcessError import sys @@ -393,3 +395,39 @@ def get_type_hints(obj, globalns=None, localns=None): wrapped = wrapped.__wrapped__ globalns = getattr(wrapped, "__globals__", {}) return typing.get_type_hints(obj, globalns, localns) + + +def setSeed(seed): + random.seed(seed) + numpy.random.seed(seed) + + +def generateInnerBatchHelper(scenarioCreationData, seedQueue, sceneQueue, mute): + if mute: + sys.stdout = open(os.devnull, "w") + sys.stderr = open(os.devnull, "w") + + from scenic.syntax.translator import _scenarioFromStream + + stream = io.BytesIO(scenarioCreationData["streamLines"]) + + scenario = _scenarioFromStream( + stream=stream, + compileOptions=scenarioCreationData["compileOptions"], + filename=scenarioCreationData["filename"], + scenario=scenarioCreationData["scenario"], + path=scenarioCreationData["path"], + _cacheImports=False, + ) + + while True: + seed = seedQueue.get() + + setSeed(seed) + + scene, iterations = scenario._generateInner( + maxIterations=float("inf"), verbosity=0, feedback=None + ) + sceneBytes = scenario.sceneToBytes(scene) + + sceneQueue.put((sceneBytes, iterations)) diff --git a/src/scenic/syntax/translator.py b/src/scenic/syntax/translator.py index 994e65b8b..e563e6fa8 100644 --- a/src/scenic/syntax/translator.py +++ b/src/scenic/syntax/translator.py @@ -77,7 +77,7 @@ def hash(self): if isinstance(value, (int, float, str)): stream.write(str(value).encode()) else: - stream.write([0]) + stream.write(str([0]).encode()) if self.scenario: stream.write(self.scenario.encode()) # We can't use `hash` because it is not deterministic @@ -165,6 +165,17 @@ def _scenarioFromStream( behavior as importing a Python module. See `purgeModulesUnsafeToCache` for a more detailed discussion of the internals behind this. """ + # Backup stream and parameters + streamLines = stream.read() + scenarioCreationData = { + "streamLines": streamLines, + "compileOptions": compileOptions, + "filename": filename, + "scenario": scenario, + "path": path, + } + stream = io.BytesIO(streamLines) + # Compile the code as if it were a top-level module oldModules = list(sys.modules.keys()) try: @@ -174,7 +185,9 @@ def _scenarioFromStream( if not _cacheImports: purgeModulesUnsafeToCache(oldModules) # Construct a Scenario from the resulting namespace - return constructScenarioFrom(namespace, scenario) + scenario = constructScenarioFrom(namespace, scenario) + scenario._scenarioCreationData = scenarioCreationData + return scenario @contextmanager From 99f4ac0a3932c79300554a92e261956ba48f1de2 Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Fri, 21 Nov 2025 12:50:54 -0800 Subject: [PATCH 4/9] Scene sampling parallelization. --- src/scenic/core/scenarios.py | 116 +++++++++++++- src/scenic/core/utils.py | 9 +- tests/core/test_scenarios.py | 62 ++++++++ .../benchmark_parallelization.py | 84 ++++++++++ .../benchmarks/adjacentOpposingPair.scenic | 6 + .../benchmarks/badlyParkedCarPullingIn.scenic | 30 ++++ .../benchmarks/bypassing_03.scenic | 106 +++++++++++++ .../benchmarks/city_intersection.scenic | 131 +++++++++++++++ .../benchmarks/enclosed_occluded.scenic | 20 +++ .../benchmarks/enclosed_visible.scenic | 22 +++ .../benchmarks/fully_occluded.scenic | 15 ++ .../benchmarks/fully_visible.scenic | 12 ++ .../benchmarks/narrowGoalNew.scenic | 102 ++++++++++++ .../benchmarks/narrowGoalOld.scenic | 83 ++++++++++ .../benchmarks/pedestrian_02.scenic | 89 +++++++++++ .../parallelization/benchmarks/vacuum.scenic | 149 ++++++++++++++++++ 16 files changed, 1024 insertions(+), 12 deletions(-) create mode 100644 tools/benchmarking/parallelization/benchmark_parallelization.py create mode 100644 tools/benchmarking/parallelization/benchmarks/adjacentOpposingPair.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/badlyParkedCarPullingIn.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/bypassing_03.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/city_intersection.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/enclosed_occluded.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/enclosed_visible.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/fully_occluded.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/fully_visible.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/narrowGoalNew.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/narrowGoalOld.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/pedestrian_02.scenic create mode 100644 tools/benchmarking/parallelization/benchmarks/vacuum.scenic diff --git a/src/scenic/core/scenarios.py b/src/scenic/core/scenarios.py index 7bf3a6e06..362aaa715 100644 --- a/src/scenic/core/scenarios.py +++ b/src/scenic/core/scenarios.py @@ -404,7 +404,16 @@ def generate(self, maxIterations=2000, verbosity=0, feedback=None): Raises: `RejectionException`: if no valid sample is found in **maxIterations** iterations. """ - return next(self.generateBatch(1, maxIterations, verbosity, feedback)) + scenes, totalIterations = self.generateBatch( + numScenes=1, + maxIterations=maxIterations, + verbosity=verbosity, + feedback=feedback, + numWorkers=0, + mute=False, + ) + assert len(scenes) == 1 + return (scenes[0], totalIterations) def generateBatch( self, @@ -414,6 +423,57 @@ def generateBatch( feedback=None, numWorkers=0, mute=True, + deterministic=True, + serialized=False, + ): + """Sample several `Scene` objects from this scenario. + + For a description of how scene generation is done, see `scene generation`. + + Args: + numScenes (int): Number of scenes to generate. + maxIterations (int): Maximum number of rejection sampling iterations (over all scenes). + verbosity (int): Verbosity level. + feedback (float): Feedback to pass to external samplers doing active sampling. + See :mod:`scenic.core.external_params`. + numWorkers (int): The number of workers to be used when generating scenes. If numWorkers + is 0, scenes will be generated in the main process. + mute (bool): Whether or not to mute stdOut and stdErr in the worker processes. + deterministic (bool): Whether or not scenes will be returned in a deterministic order. + serialized (bool): Whether or not to return scenes in a serialized format. + + Returns: + A pair with a list of the sampled `Scene` objects and the total number + of iterations used. + + Raises: + `RejectionException`: if not enough valid samples are found in **maxIterations** iterations. + """ + stream = self.generateStream( + numScenes=numScenes, + maxIterations=maxIterations, + verbosity=verbosity, + feedback=feedback, + numWorkers=numWorkers, + mute=mute, + deterministic=deterministic, + serialized=serialized, + ) + results_list = list(stream) + scenes = tuple(r[0] for r in results_list) + totalIterations = sum([r[1] for r in results_list]) + return (scenes, totalIterations) + + def generateStream( + self, + numScenes, + maxIterations=float("inf"), + verbosity=0, + feedback=None, + numWorkers=0, + mute=True, + deterministic=False, + serialized=False, ): """Sample several `Scene` objects from this scenario. @@ -428,6 +488,10 @@ def generateBatch( numWorkers (int): The number of workers to be used when generating scenes. If numWorkers is 0, scenes will be generated in the main process. mute (bool): Whether or not to mute stdOut and stdErr in the worker processes. + deterministic (bool): Whether or not scenes will be returned in a deterministic order. + NOTE: Setting this to True may increase latency when waiting for the next `Scene` in + the stream. + serialized (bool): Whether or not to return scenes in a serialized format. Returns: An iterable of pairs with a sampled `Scene` and the number of iterations used for that scene. @@ -457,31 +521,67 @@ def generateBatch( if feedback is not None: raise RuntimeError("Feedback not supported for parallel sampling.") - if verbosity > 0: - warnings.warn("Verbosity > 0 ignored during parallel sampling") + # Initialize results tracking data + resultsList = [] + returnedResults = 0 # Initialize queues and lock seedQueue = multiprocessing.Queue() + seedHistory = {} + + def putSeed(): + newSeed = random.getrandbits(32) + seedQueue.put(newSeed) + seedHistory[newSeed] = len(seedHistory) + if deterministic: + resultsList.append(None) + for _ in range(numScenes): - seedQueue.put(random.getrandbits(32)) + putSeed() sceneQueue = multiprocessing.Queue() # Initialize processes - params = (self._scenarioCreationData, seedQueue, sceneQueue, mute) + params = (self._scenarioCreationData, seedQueue, sceneQueue, verbosity, mute) processes = [ multiprocessing.Process(target=generateInnerBatchHelper, args=params) for _ in range(numWorkers) ] + + # Initialized result management functions + def getResult(): + sceneBytes, resultIterations, resultSeed = sceneQueue.get() + resultScene = ( + sceneBytes + if serialized + else self.sceneFromBytes(sceneBytes, verify=False) + ) + return (resultScene, resultIterations), resultSeed + + def getNextResult(): + if not deterministic: + return getResult()[0] + + while True: + assert len(resultsList) > 0 + + if resultsList[0] is not None: + return resultsList.pop(0) + + result, resultSeed = getResult() + resultIndex = seedHistory[resultSeed] - returnedResults + assert resultsList[resultIndex] is None + resultsList[resultIndex] = result + + # Start sampling processes and yield samples try: # Prepare process pool for process in processes: process.start() for _ in range(numScenes): - sceneBytes, iterations = sceneQueue.get() - scene = self.sceneFromBytes(sceneBytes, verify=False) - yield (scene, iterations) + yield getNextResult() + returnedResults += 1 finally: # Close processes and queues diff --git a/src/scenic/core/utils.py b/src/scenic/core/utils.py index 63cee18d1..632557641 100644 --- a/src/scenic/core/utils.py +++ b/src/scenic/core/utils.py @@ -402,7 +402,9 @@ def setSeed(seed): numpy.random.seed(seed) -def generateInnerBatchHelper(scenarioCreationData, seedQueue, sceneQueue, mute): +def generateInnerBatchHelper( + scenarioCreationData, seedQueue, sceneQueue, verbosity, mute +): if mute: sys.stdout = open(os.devnull, "w") sys.stderr = open(os.devnull, "w") @@ -422,12 +424,11 @@ def generateInnerBatchHelper(scenarioCreationData, seedQueue, sceneQueue, mute): while True: seed = seedQueue.get() - setSeed(seed) scene, iterations = scenario._generateInner( - maxIterations=float("inf"), verbosity=0, feedback=None + maxIterations=float("inf"), verbosity=verbosity, feedback=None ) sceneBytes = scenario.sceneToBytes(scene) - sceneQueue.put((sceneBytes, iterations)) + sceneQueue.put((sceneBytes, iterations, seed)) diff --git a/tests/core/test_scenarios.py b/tests/core/test_scenarios.py index 548d5d146..e4365d462 100644 --- a/tests/core/test_scenarios.py +++ b/tests/core/test_scenarios.py @@ -1,6 +1,10 @@ +import random + import pytest from scenic.core.distributions import Range +from scenic.core.scenarios import Scene +from scenic.core.utils import setSeed from tests.utils import compileScenic @@ -59,3 +63,61 @@ def test_condition_scenario_params_2(): assert all(0.5 <= x <= 0.51 for x in xs) assert any(0.505 <= x for x in xs) assert any(x < 0.505 for x in xs) + + +def test_generateBatch(): + scenario = compileScenic( + """ + ego = new Object facing Range(0, 1) + require ego.heading > 0.5 + """ + ) + scenes, _ = scenario.generateBatch(2, numWorkers=2) + + assert all(isinstance(scene, Scene) for scene in scenes) + assert all(0 <= scene.objects[0].heading <= 1 for scene in scenes) + assert scenes[0].objects[0].heading != scenes[1].objects[0].heading + + +def test_generateBatch_serialized(): + scenario = compileScenic( + """ + ego = new Object facing Range(0, 1) + require ego.heading > 0.5 + """ + ) + scenesBytes, _ = scenario.generateBatch(2, numWorkers=2, serialized=True) + assert all(isinstance(b, bytes) for b in scenesBytes) + + scenes = [scenario.sceneFromBytes(b, verify=True) for b in scenesBytes] + assert all(isinstance(scene, Scene) for scene in scenes) + assert all(0 <= scene.objects[0].heading <= 1 for scene in scenes) + assert scenes[0].objects[0].heading != scenes[1].objects[0].heading + + +def test_generateStream_deterministic(): + seed = random.getrandbits(32) + + scenario = compileScenic( + """ + ego = new Object facing Range(0, 1) + require ego.heading > 0.5 + """ + ) + setSeed(seed) + streamA = tuple(scenario.generateStream(8, numWorkers=2, serialized=True)) + setSeed(seed) + streamB = tuple(scenario.generateStream(8, numWorkers=2, serialized=True)) + bytesSetA = {result[0] for result in streamA} + bytesSetB = {result[0] for result in streamB} + assert bytesSetA == bytesSetB + + setSeed(seed) + streamA = tuple( + scenario.generateStream(8, numWorkers=2, serialized=True, deterministic=True) + ) + setSeed(seed) + streamB = tuple( + scenario.generateStream(8, numWorkers=2, serialized=True, deterministic=True) + ) + assert streamA == streamB diff --git a/tools/benchmarking/parallelization/benchmark_parallelization.py b/tools/benchmarking/parallelization/benchmark_parallelization.py new file mode 100644 index 000000000..fec0f3191 --- /dev/null +++ b/tools/benchmarking/parallelization/benchmark_parallelization.py @@ -0,0 +1,84 @@ +from pathlib import Path +import time +import warnings + +import scenic + +NUM_WORKERS = 8 +BENCHMARKS_BASE_PATH = (Path(__file__).resolve().parent / "benchmarks").resolve() +MAP_PATH = ( + Path(__file__).resolve().parent.parent.parent.parent + / "assets" + / "maps" + / "CARLA" + / "Town05.xodr" +).resolve() +MESH_BASE_PATH = ( + Path(__file__).resolve().parent.parent.parent.parent / "assets" / "meshes" +).resolve() + +BENCHMARKS = [ + ("adjacentOpposingPair.scenic", {"mode2D": True, "map": MAP_PATH}), + ("badlyParkedCarPullingIn.scenic", {"mode2D": True, "map": MAP_PATH}), + ("bypassing_03.scenic", {"mode2D": True, "map": MAP_PATH}), + ("city_intersection.scenic", {"meshBasePath": MESH_BASE_PATH}), + # ("enclosed_occluded.scenic", {}), + ("enclosed_visible.scenic", {}), + ("fully_occluded.scenic", {"meshBasePath": MESH_BASE_PATH}), + ("fully_visible.scenic", {"meshBasePath": MESH_BASE_PATH}), + ("narrowGoalNew.scenic", {"meshBasePath": MESH_BASE_PATH}), + ("narrowGoalOld.scenic", {"mode2D": True, "map": MAP_PATH}), + ("pedestrian_02.scenic", {"mode2D": True, "map": MAP_PATH}), + ("vacuum.scenic", {"numToys": 0, "meshBasePath": MESH_BASE_PATH}), + ("vacuum.scenic", {"numToys": 1, "meshBasePath": MESH_BASE_PATH}), + ("vacuum.scenic", {"numToys": 2, "meshBasePath": MESH_BASE_PATH}), + ("vacuum.scenic", {"numToys": 4, "meshBasePath": MESH_BASE_PATH}), + ("vacuum.scenic", {"numToys": 8, "meshBasePath": MESH_BASE_PATH}), + # ("vacuum.scenic", {"numToys": 16, "meshBasePath": MESH_BASE_PATH}), +] + +NUM_SAMPLES = 128 + + +def run_benchmark(path, params): + scenario = scenic.scenarioFromFile( + BENCHMARKS_BASE_PATH / path, params=params, mode2D=params.get("mode2D", False) + ) + for _ in range(NUM_SAMPLES): + scenario.generate(maxIterations=float("inf")) + + +def run_benchmark_parallel(path, params, *, numWorkers): + scenario = scenic.scenarioFromFile( + BENCHMARKS_BASE_PATH / path, params=params, mode2D=params.get("mode2D", False) + ) + scenario.generateBatch(NUM_SAMPLES, maxIterations=float("inf"), numWorkers=numWorkers) + + +if __name__ == "__main__": + print("Base Performance (`generate`, numWorkers=0):") + for benchmark in BENCHMARKS: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + start = time.time() + run_benchmark(*benchmark) + trial_time = time.time() - start + print(f"{trial_time: 7.2f} | {benchmark}") + print() + print("Base + Overhead Performance (`generateBatch`, numWorkers=1):") + for benchmark in BENCHMARKS: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + start = time.time() + run_benchmark_parallel(*benchmark, numWorkers=1) + trial_time = time.time() - start + print(f"{trial_time: 7.2f} | {benchmark}") + print() + print("Batch Performance (`generateBatch`, numWorkers=8):") + for benchmark in BENCHMARKS: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + start = time.time() + run_benchmark_parallel(*benchmark, numWorkers=8) + trial_time = time.time() - start + print(f"{trial_time: 7.2f} | {benchmark}") diff --git a/tools/benchmarking/parallelization/benchmarks/adjacentOpposingPair.scenic b/tools/benchmarking/parallelization/benchmarks/adjacentOpposingPair.scenic new file mode 100644 index 000000000..055bc93fe --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/adjacentOpposingPair.scenic @@ -0,0 +1,6 @@ +model scenic.simulators.carla.model + +ego = new Car with visibleDistance 20 +c2 = new Car visible +c3 = new Car at c2 offset by Range(-10, 1) @ 0 +require abs(relative heading of c3 from c2) >= 150 deg \ No newline at end of file diff --git a/tools/benchmarking/parallelization/benchmarks/badlyParkedCarPullingIn.scenic b/tools/benchmarking/parallelization/benchmarks/badlyParkedCarPullingIn.scenic new file mode 100644 index 000000000..9e1b15b18 --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/badlyParkedCarPullingIn.scenic @@ -0,0 +1,30 @@ +param time_step = 1.0/10 + +model scenic.domains.driving.model + +behavior PullIntoRoad(): + while (distance from self to ego) > 15: + wait + do FollowLaneBehavior(laneToFollow=ego.lane) + +ego = new Car with behavior DriveAvoidingCollisions(avoidance_threshold=5) + +rightCurb = ego.laneGroup.curb +spot = new OrientedPoint on visible rightCurb +badAngle = Uniform(1.0, -1.0) * Range(10, 20) deg +parkedCar = new Car left of spot by 0.5, + facing badAngle relative to roadDirection, + with behavior PullIntoRoad + +require (distance to parkedCar) > 20 + +monitor StopAfterInteraction(): + for i in range(50): + wait + while ego.speed > 2: + wait + for i in range(50): + wait + terminate +require monitor StopAfterInteraction() +terminate after 15 seconds # in case ego never breaks diff --git a/tools/benchmarking/parallelization/benchmarks/bypassing_03.scenic b/tools/benchmarking/parallelization/benchmarks/bypassing_03.scenic new file mode 100644 index 000000000..ebbab8023 --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/bypassing_03.scenic @@ -0,0 +1,106 @@ +""" +TITLE: Bypassing 03 +AUTHOR: Francis Indaheng, findaheng@berkeley.edu +DESCRIPTION: Ego vehicle performs a lane change to bypass a slow +adversary vehicle but cannot return to its original lane because +the adversary accelerates. Ego vehicle must then slow down to avoid +collision with leading vehicle in new lane. +SOURCE: NHSTA, #16 +""" + +################################# +# MAP AND MODEL # +################################# + +model scenic.simulators.carla.model + +################################# +# CONSTANTS # +################################# + +MODEL = 'vehicle.lincoln.mkz2017' + +param EGO_SPEED = VerifaiRange(7, 10) +param EGO_BRAKE = VerifaiRange(0.7, 1.0) + +param ADV_DIST = VerifaiRange(10, 15) +param ADV_INIT_SPEED = VerifaiRange(2, 4) +param ADV_END_SPEED = 2 * VerifaiRange(7, 10) +ADV_BUFFER_TIME = 5 + +LEAD_DIST = globalParameters.ADV_DIST + 10 +LEAD_SPEED = globalParameters.EGO_SPEED - 4 + +BYPASS_DIST = [15, 10] +SAFE_DIST = 15 +INIT_DIST = 50 +TERM_DIST = 70 +TERM_TIME = 10 + +################################# +# AGENT BEHAVIORS # +################################# + +behavior DecelerateBehavior(brake): + take SetBrakeAction(brake) + +behavior EgoBehavior(): + try: + do FollowLaneBehavior(target_speed=globalParameters.EGO_SPEED) + interrupt when (distance to adversary) < BYPASS_DIST[0]: + fasterLaneSec = self.laneSection.fasterLane + do LaneChangeBehavior( + laneSectionToSwitch=fasterLaneSec, + target_speed=globalParameters.EGO_SPEED) + try: + do FollowLaneBehavior( + target_speed=globalParameters.EGO_SPEED, + laneToFollow=fasterLaneSec.lane) \ + until (distance to adversary) > BYPASS_DIST[1] + interrupt when (distance to lead) < SAFE_DIST: + try: + do DecelerateBehavior(globalParameters.EGO_BRAKE) + interrupt when (distance to lead) > SAFE_DIST: + do FollowLaneBehavior(target_speed=LEAD_SPEED) for TERM_TIME seconds + terminate + +behavior AdversaryBehavior(): + do FollowLaneBehavior(target_speed=globalParameters.ADV_INIT_SPEED) \ + until self.lane is not ego.lane + do FollowLaneBehavior(target_speed=globalParameters.ADV_END_SPEED) + +behavior LeadBehavior(): + fasterLaneSec = self.laneSection.fasterLane + do LaneChangeBehavior( + laneSectionToSwitch=fasterLaneSec, + target_speed=LEAD_SPEED) + do FollowLaneBehavior(target_speed=LEAD_SPEED) + +################################# +# SPATIAL RELATIONS # +################################# + +initLane = Uniform(*network.lanes) +egoSpawnPt = new OrientedPoint in initLane.centerline + +################################# +# SCENARIO SPECIFICATION # +################################# + +ego = new Car at egoSpawnPt, + with blueprint MODEL, + with behavior EgoBehavior() + +adversary = new Car following roadDirection for globalParameters.ADV_DIST, + with blueprint MODEL, + with behavior AdversaryBehavior() + +lead = new Car following roadDirection for LEAD_DIST, + with blueprint MODEL, + with behavior LeadBehavior() + +require (distance to intersection) > INIT_DIST +require (distance from adversary to intersection) > INIT_DIST +require (distance from lead to intersection) > INIT_DIST +require always (adversary.laneSection._fasterLane is not None) +terminate when (distance to egoSpawnPt) > TERM_DIST diff --git a/tools/benchmarking/parallelization/benchmarks/city_intersection.scenic b/tools/benchmarking/parallelization/benchmarks/city_intersection.scenic new file mode 100644 index 000000000..6afe51efc --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/city_intersection.scenic @@ -0,0 +1,131 @@ +""" +Generate a city intersection driving scenario, an intersection +of two 2-lane one way roads in a city. +""" + +model scenic.simulators.webots.model + +import shapely +import time +import shutil +import os +from pathlib import Path + +class EgoCar(WebotsObject): + webotsName: "EGO" + shape: MeshShape.fromFile(globalParameters.meshBasePath / "bmwx5_hull.obj.bz2", initial_rotation=(90 deg, 0, 0)) + positionOffset: Vector(-1.43580750, 0, -0.557354985).rotatedBy(Orientation.fromEuler(*self.orientationOffset)) + cameraOffset: Vector(-1.43580750, 0, -0.557354985) + Vector(1.72, 0, 1.4) + orientationOffset: (90 deg, 0, 0) + viewAngles: (1.5, 60 deg) + visibleDistance: 100 + rayDensity: 10 + +class Car(EgoCar): + webotsName: "CAR" + +class CommercialBuilding(WebotsObject): + webotsType: "BUILDING_COMMERCIAL" + width: 22 + length: 22 + height: 100 + yaw: Uniform(1, 2, 3) * 90 deg + +class ResidentialBuilding(WebotsObject): + webotsType: "BUILDING_RESIDENTIAL" + width: 14.275 + length: 57.4 + height: 40 + yaw: 90 deg + +class GlassBuilding(WebotsObject): + webotsType: "BUILDING_GLASS" + width: 14.1 + length: 8.1 + height: 112 + yaw: Uniform(1, 2, 3) * 90 deg + +class LogImageAction(Action): + def __init__(self, visible: bool, path: str, count: int): + self.visible = visible + self.path = path + self.count = count + + def applyTo(self, obj, sim): + print("Other Car Visible:", self.visible) + + target_path = self.path + "/" + target_path += "visible" if self.visible else "invisible" + + if not os.path.exists(target_path): + os.makedirs(target_path) + + target_path += "/" + str(self.count) + ".jpeg" + + print("IMG Path:", target_path) + + # Wait for other controller to write image + time.sleep(0.001) + attempts = 0 + while not os.path.exists(localPath("images/live_img.jpeg")): + print("Waiting for image...") + attempts += 1 + time.sleep(0.001) + + if attempts > 10: + print("Could not move image...") + return + + shutil.move(localPath("images/live_img.jpeg"), target_path) + +behavior LogCamera(path): + count = 0 + while True: + visible = ego can see car + take LogImageAction(visible, path, count) + count += 1 + +# Create a region that represents both lanes of the crossing road. +crossing_road_lane = RectangularRegion((0,0,0.02), 0, 160, 5) + +car = new Car facing 90 deg, on crossing_road_lane, with regionContainedIn crossing_road_lane +require car.x > 10 + +# Create a region that represents both lanes of the bottom road. +bottom_road_lane = RectangularRegion((0,-55,0.02), 0, 5, 80) + +# Place the ego car in one of the lanes, and ensure it is fully contained. +ego = new EgoCar on bottom_road_lane, with regionContainedIn bottom_road_lane, with behavior LogCamera(localPath(f"images/{time.time_ns()}")) + +# Create a region composed of all 4 quadrants around the road +top_right_quadrant = RectangularRegion(56@56, 0, 100, 100) +top_left_quadrant = RectangularRegion(-56@56, 0, 100, 100) +bottom_right_quadrant = RectangularRegion(56@-56, 0, 100, 100) +bottom_left_quadrant = RectangularRegion(-56@-56, 0, 100, 100) + +building_region = top_right_quadrant.union(top_left_quadrant) + +# Add buildings, some randomly, some designed to block visibility of the center road +for _ in range(1): + new CommercialBuilding in building_region, with regionContainedIn building_region + +for _ in range(2): + new ResidentialBuilding in building_region, with regionContainedIn building_region + +for _ in range(2): + new GlassBuilding in building_region, with regionContainedIn building_region + +new ResidentialBuilding at (-36, -21, 0) +new CommercialBuilding at (18 + Range(-1,1), -20 + Range(-1,1), 0), facing Range(-5,5) deg +new CommercialBuilding at (50 + Range(-1,1), -22 + Range(-1,1), 0), facing Range(-5,5) deg + +# Terminate the simulation after the ego has passed through the intersection or a timeout is reached +terminate when ego.position.y > 0 +terminate after 60 seconds + +# Require that the ego can eventually see the crossing car, but not until it gets close. +require eventually (ego can see car) +require (not ego can see car) until (distance from ego to car < 75) + +# Require that the cars do not crash +require always distance to car > 2 diff --git a/tools/benchmarking/parallelization/benchmarks/enclosed_occluded.scenic b/tools/benchmarking/parallelization/benchmarks/enclosed_occluded.scenic new file mode 100644 index 000000000..0856487cf --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/enclosed_occluded.scenic @@ -0,0 +1,20 @@ +""" Tests visibility calculation time on an object completely enclosed in two spheres, +trying to view the outside sphere. +""" + +from scipy.spatial.transform import Rotation +from pathlib import Path + +workspace = Workspace(everywhere) + +ego = new Object facing Orientation(Rotation.random()), at (Range(-0.1,0.1),Range(-0.1,0.1),Range(-0.1,0.1)) + +def get_shape(hole_size): + return MeshShape( + SpheroidRegion(dimensions=(1,1,1)).difference( + SpheroidRegion(dimensions=(0.9,0.9,0.9))).difference( + BoxRegion(dimensions=(hole_size,0.5,hole_size), position=(0,0.5,0))).mesh + ) + +occluding_sphere = new Object at (0,0,0), with shape get_shape(0.1), with width 3, with length 3, with height 3 +target_sphere = new Object at (0,0,0), with shape get_shape(0.3), with width 5, with length 5, with height 5, not visible diff --git a/tools/benchmarking/parallelization/benchmarks/enclosed_visible.scenic b/tools/benchmarking/parallelization/benchmarks/enclosed_visible.scenic new file mode 100644 index 000000000..52cc2b4d9 --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/enclosed_visible.scenic @@ -0,0 +1,22 @@ +""" Tests visibility calculation time on an object almost completely enclosed in two spheres, +trying to view the outside sphere. +""" + +from scipy.spatial.transform import Rotation +from pathlib import Path + +workspace = Workspace(everywhere) + +ego = new Object facing Orientation(Rotation.random()), at (Range(-0.1,0.1),Range(-0.1,0.1),Range(-0.1,0.1)) + +hollow_sphere_shape = MeshShape( + SpheroidRegion(dimensions=(5,5,5)).difference(SpheroidRegion(dimensions=(4.8,4.8,4.8))).mesh + ) + +hollow_sphere_shape_with_hole = MeshShape( + SpheroidRegion(dimensions=(5.2,5.2,5.2)).difference(SpheroidRegion(dimensions=(5.01,5.01,5.01))).difference( + BoxRegion(dimensions=(0.1,0.1,1), position=(0,0,2.5))).mesh + ) + +occluding_sphere = new Object at (0,0,0), with shape hollow_sphere_shape_with_hole +target_sphere = new Object at (0,0,0), with shape hollow_sphere_shape, visible diff --git a/tools/benchmarking/parallelization/benchmarks/fully_occluded.scenic b/tools/benchmarking/parallelization/benchmarks/fully_occluded.scenic new file mode 100644 index 000000000..b61ade734 --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/fully_occluded.scenic @@ -0,0 +1,15 @@ +""" Tests visibility calculation time on a complex mesh, completely occluded by another complex mesh""" + +from pathlib import Path + +workspace = Workspace(everywhere) + +ego = new Object + +chair_shape = MeshShape.fromFile(path=globalParameters.meshBasePath / "chair.obj.bz2", initial_rotation=(0,90 deg,0)) + +obscuring_chair = new Object with shape chair_shape, at (0,5,0), + with pitch -90 deg, with width 5, with length 5, with height 5 + +target_chair = new Object with shape chair_shape, at (0,10,0), + with width 3, with length 3, with height 3, not visible diff --git a/tools/benchmarking/parallelization/benchmarks/fully_visible.scenic b/tools/benchmarking/parallelization/benchmarks/fully_visible.scenic new file mode 100644 index 000000000..59d2935f9 --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/fully_visible.scenic @@ -0,0 +1,12 @@ +""" Tests visibility calculation time on a complex mesh that doesn't contain its center """ + +from pathlib import Path + +workspace = Workspace(everywhere) + +ego = new Object + +chair_shape = MeshShape.fromFile(path=globalParameters.meshBasePath / "chair.obj.bz2", initial_rotation=(0,90 deg,0)) + +target_chair = new Object with shape chair_shape, at (0,10,0), + with width 3, with length 3, with height 3, visible diff --git a/tools/benchmarking/parallelization/benchmarks/narrowGoalNew.scenic b/tools/benchmarking/parallelization/benchmarks/narrowGoalNew.scenic new file mode 100644 index 000000000..2199d91e0 --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/narrowGoalNew.scenic @@ -0,0 +1,102 @@ +model scenic.simulators.webots.model + +from pathlib import Path + +# Set up workspace +width = 10 +length = 10 +workspace = Workspace(RectangularRegion(0 @ 0, 0, width, length)) + +# types of objects + +class MarsGround(Ground): + width: width + length: length + gridSize: 20 + +class MarsHill(Hill): + position: new Point in workspace + width: Range(1,2) + length: Range(1,2) + height: Range(0.1, 0.3) + spread: Range(0.2, 0.3) + regionContainedIn: everywhere + +class Goal(WebotsObject): + """Flag indicating the goal location.""" + width: 0.1 + length: 0.1 + webotsType: 'GOAL' + +class Rover(WebotsObject): + """Mars rover.""" + width: 0.5 + length: 0.7 + height: 0.4 + webotsType: 'ROVER' + rotationOffset: (90 deg, 0, 0) + +class Debris(WebotsObject): + """Abstract class for debris scattered randomly in the workspace.""" + # Recess things into the ground slightly by default + baseOffset: (0, 0, -self.height/3) + +class BigRock(Debris): + """Large rock.""" + shape: MeshShape.fromFile(globalParameters.meshBasePath / "webots_rock_large.obj.bz2") + yaw: Range(0, 360 deg) + webotsType: 'ROCK_BIG' + positionOffset: Vector(0,0, -self.height/2) + +class Rock(Debris): + """Small rock.""" + shape: MeshShape.fromFile(globalParameters.meshBasePath / "webots_rock_small.obj.bz2") + yaw: Range(0, 360 deg) + webotsType: 'ROCK_SMALL' + positionOffset: Vector(0,0, -self.height/2) + +class Pipe(Debris): + """Pipe with variable length.""" + width: 0.2 + length: Range(0.5, 1.5) + height: self.width + shape: CylinderShape(initial_rotation=(90 deg, 0, 90 deg)) + yaw: Range(0, 360 deg) + webotsType: 'PIPE' + rotationOffset: (90 deg, 0, 90 deg) + + def startDynamicSimulation(self): + # Apply variable length + self.webotsObject.getField('height').setSFFloat(self.length) + +# Ground with random gaussian hills +ground = new MarsGround on (0,0,0), with terrain [new MarsHill for _ in range(60)] + +# Ego and goal on ground +ego = new Rover at (0, -3), on ground, with controller 'sojourner' +goal = new Goal at (Range(-2, 2), Range(2, 3)), on ground, facing (0,0,0) + +# Bottleneck made of two pipes with a rock in between +bottleneck = new OrientedPoint at ego offset by Range(-1.5, 1.5) @ Range(0.5, 1.5), facing Range(-30, 30) deg +require abs((angle to goal) - (angle to bottleneck)) <= 10 deg +new BigRock at bottleneck, on ground + +gap = 1.2 * ego.width +halfGap = gap / 2 + +leftEdge = new OrientedPoint left of bottleneck by halfGap, + facing Range(60, 120) deg relative to bottleneck.heading +rightEdge = new OrientedPoint right of bottleneck by halfGap, + facing Range(-120, -60) deg relative to bottleneck.heading + +new Pipe ahead of leftEdge, with length Range(1, 2), on ground, facing leftEdge, with parentOrientation 0 +new Pipe ahead of rightEdge, with length Range(1, 2), on ground, facing rightEdge, with parentOrientation 0 + +# Other junk because why not? + +new Pipe on ground, with parentOrientation 0 +new BigRock beyond bottleneck by Range(0.25, 0.75) @ Range(0.75, 1), on ground +new BigRock beyond bottleneck by Range(-0.75, -0.25) @ Range(0.75, 1), on ground +new Rock on ground +new Rock on ground +new Rock on ground diff --git a/tools/benchmarking/parallelization/benchmarks/narrowGoalOld.scenic b/tools/benchmarking/parallelization/benchmarks/narrowGoalOld.scenic new file mode 100644 index 000000000..aec4524bd --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/narrowGoalOld.scenic @@ -0,0 +1,83 @@ +from scenic.simulators.webots.model import WebotsObject + +# Set up workspace +width = 5 +length = 5 +workspace = Workspace(RectangularRegion(0 @ 0, 0, width, length)) + +# types of objects + +class Goal(WebotsObject): + """Flag indicating the goal location.""" + width: 0.3 + length: 0.3 + webotsType: 'GOAL' + +class Rover(WebotsObject): + """Mars rover.""" + width: 0.5 + length: 0.7 + webotsType: 'ROVER' + rotationOffset: 90 deg + +class Debris(WebotsObject): + """Abstract class for debris scattered randomly in the workspace.""" + position: new Point in workspace + heading: Range(0, 360) deg + +class BigRock(Debris): + """Large rock.""" + width: 0.17 + length: 0.17 + webotsType: 'ROCK_BIG' + +class Rock(Debris): + """Small rock.""" + width: 0.10 + length: 0.10 + webotsType: 'ROCK_SMALL' + +class Pipe(Debris): + """Pipe with variable length.""" + width: 0.2 + length: Range(0.5, 1.5) + webotsType: 'PIPE' + + def startDynamicSimulation(self): + # Apply variable length + self.webotsObject.getField('height').setSFFloat(self.length) + # Apply 3D rotation to make pipes lie flat on surface + rotation = [cos(self.heading), sin(self.heading), 0, 90 deg] + self.webotsObject.getField('rotation').setSFRotation(rotation) + +ego = new Rover at 0 @ -2 + +goal = new Goal at Range(-2, 2) @ Range(2, 2.5) + +# Bottleneck made of two pipes with a rock in between + +gap = 1.2 * ego.width +halfGap = gap / 2 + +bottleneck = new OrientedPoint offset by Range(-1.5, 1.5) @ Range(0.5, 1.5), facing Range(-30, 30) deg + +require abs((angle to goal) - (angle to bottleneck)) <= 10 deg + +new BigRock at bottleneck + +leftEdge = new OrientedPoint at bottleneck offset by -halfGap @ 0, + facing Range(60, 120) deg relative to bottleneck.heading +rightEdge = new OrientedPoint at bottleneck offset by halfGap @ 0, + facing Range(-120, -60) deg relative to bottleneck.heading + +new Pipe ahead of leftEdge, with length Range(1, 2) +new Pipe ahead of rightEdge, with length Range(1, 2) + +# Other junk because why not? + +new Pipe +new BigRock beyond bottleneck by Range(-0.5, 0.5) @ Range(0.5, 1) +new BigRock beyond bottleneck by Range(-0.5, 0.5) @ Range(0.5, 1) +new Rock +new Rock +new Rock \ No newline at end of file diff --git a/tools/benchmarking/parallelization/benchmarks/pedestrian_02.scenic b/tools/benchmarking/parallelization/benchmarks/pedestrian_02.scenic new file mode 100644 index 000000000..5d2fc408e --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/pedestrian_02.scenic @@ -0,0 +1,89 @@ +""" +TITLE: Pedestrian 02 +AUTHOR: Francis Indaheng, findaheng@berkeley.edu +DESCRIPTION: Both ego and adversary vehicles must suddenly stop to avoid +collision when pedestrian crosses the road unexpectedly. +SOURCE: Carla Challenge, #03 +""" + +################################# +# MAP AND MODEL # +################################# + +model scenic.simulators.carla.model + +################################# +# CONSTANTS # +################################# + +MODEL = 'vehicle.lincoln.mkz2017' + +param EGO_INIT_DIST = VerifaiRange(-30, -20) +param EGO_SPEED = VerifaiRange(7, 10) +EGO_BRAKE = 1.0 + +param ADV_INIT_DIST = VerifaiRange(40, 50) +param ADV_SPEED = VerifaiRange(7, 10) +ADV_BRAKE = 1.0 + +PED_MIN_SPEED = 1.0 +PED_THRESHOLD = 20 + +param SAFETY_DIST = VerifaiRange(10, 15) +BUFFER_DIST = 75 +CRASH_DIST = 5 +TERM_DIST = 50 + +################################# +# AGENT BEHAVIORS # +################################# + +behavior EgoBehavior(): + try: + do FollowLaneBehavior(target_speed=globalParameters.EGO_SPEED) + interrupt when withinDistanceToObjsInLane(self, globalParameters.SAFETY_DIST) and (ped in network.drivableRegion): + take SetBrakeAction(EGO_BRAKE) + interrupt when withinDistanceToAnyObjs(self, CRASH_DIST): + terminate + +behavior AdvBehavior(): + try: + do FollowLaneBehavior(target_speed=globalParameters.ADV_SPEED) + interrupt when (withinDistanceToObjsInLane(self, globalParameters.SAFETY_DIST) or (distance from adv to ped) < 10) and (ped in network.drivableRegion): + take SetBrakeAction(ADV_BRAKE) + interrupt when withinDistanceToAnyObjs(self, CRASH_DIST): + terminate + +################################# +# SPATIAL RELATIONS # +################################# + +road = Uniform(*filter(lambda r: len(r.forwardLanes.lanes) == len(r.backwardLanes.lanes) == 1, network.roads)) +egoLane = Uniform(road.forwardLanes.lanes)[0] +spawnPt = new OrientedPoint on egoLane.centerline +advSpawnPt = new OrientedPoint following roadDirection from spawnPt for globalParameters.ADV_INIT_DIST + +################################# +# SCENARIO SPECIFICATION # +################################# + +ego = new Car following roadDirection from spawnPt for globalParameters.EGO_INIT_DIST, + with blueprint MODEL, + with behavior EgoBehavior() + +ped = new Pedestrian right of spawnPt by 3, + with heading 90 deg relative to spawnPt.heading, + with regionContainedIn None, + with behavior CrossingBehavior(ego, PED_MIN_SPEED, PED_THRESHOLD) + +adv = new Car left of advSpawnPt by 3, + with blueprint MODEL, + with heading 180 deg relative to spawnPt.heading, + with behavior AdvBehavior() + +require (distance from spawnPt to intersection) > BUFFER_DIST +require always (ego.laneSection._slowerLane is None) +require always (ego.laneSection._fasterLane is None) +require always (adv.laneSection._slowerLane is None) +require always (adv.laneSection._fasterLane is None) +terminate when (distance to spawnPt) > TERM_DIST diff --git a/tools/benchmarking/parallelization/benchmarks/vacuum.scenic b/tools/benchmarking/parallelization/benchmarks/vacuum.scenic new file mode 100644 index 000000000..0b11fe6db --- /dev/null +++ b/tools/benchmarking/parallelization/benchmarks/vacuum.scenic @@ -0,0 +1,149 @@ +""" +Generate a room for the i-roomba create vacuum +""" +model scenic.simulators.webots.model + +import numpy as np +import trimesh +import random +from pathlib import Path + +param numToys = 0 +param duration = 10 + +## Class Definitions ## + +class Vacuum(WebotsObject): + webotsName: "IROBOT_CREATE" + shape: CylinderShape() + width: 0.335 + length: 0.335 + height: 0.07 + customData: str(random.getrandbits(32)) # Random seed for robot controller + +# Floor uses builtin Webots floor to keep Vacuum Sensors from breaking +# Not actually linked to WebotsObject because Webots floor is 2D +class Floor(Object): + width: 5 + length: 5 + height: 0.01 + position: (0,0,-0.005) + +class Wall(WebotsObject): + webotsAdhoc: {'physics': False} + width: 5 + length: 0.04 + height: 0.5 + +class DiningTable(WebotsObject): + webotsAdhoc: {'physics': True} + shape: MeshShape.fromFile(globalParameters.meshBasePath / "dining_table.obj.bz2") + width: Range(0.7, 1.5) + length: Range(0.7, 1.5) + height: 0.75 + density: 670 # Density of solid birch + +class DiningChair(WebotsObject): + webotsAdhoc: {'physics': True} + shape: MeshShape.fromFile(globalParameters.meshBasePath / "dining_chair.obj.bz2", initial_rotation=(180 deg, 0, 0)) + width: 0.4 + length: 0.4 + height: 1 + density: 670 # Density of solid birch + positionStdDev: (0.05, 0.05 ,0) + orientationStdDev: (10 deg, 0, 0) + +class Couch(WebotsObject): + webotsAdhoc: {'physics': False} + shape: MeshShape.fromFile(globalParameters.meshBasePath / "couch.obj.bz2", initial_rotation=(-90 deg, 0, 0)) + width: 2 + length: 0.75 + height: 0.75 + positionStdDev: (0.05, 0.5 ,0) + orientationStdDev: (5 deg, 0, 0) + +class CoffeeTable(WebotsObject): + webotsAdhoc: {'physics': False} + shape: MeshShape.fromFile(globalParameters.meshBasePath / "coffee_table.obj.bz2") + width: 1.5 + length: 0.5 + height: 0.4 + positionStdDev: (0.05, 0.05 ,0) + orientationStdDev: (5 deg, 0, 0) + +class Toy(WebotsObject): + webotsAdhoc: {'physics': True} + shape: Uniform(BoxShape(), CylinderShape(), ConeShape(), SpheroidShape()) + width: 0.1 + length: 0.1 + height: 0.1 + density: 100 + +class BlockToy(Toy): + shape: BoxShape() + +## Scene Layout ## + +# Create room region and set it as the workspace +room_region = RectangularRegion(0 @ 0, 0, 5.09, 5.09) +workspace = Workspace(room_region) + +# Create floor and walls +floor = new Floor +wall_offset = floor.width/2 + 0.04/2 + 1e-4 +right_wall = new Wall at (wall_offset, 0, 0.25), facing toward floor +left_wall = new Wall at (-wall_offset, 0, 0.25), facing toward floor +front_wall = new Wall at (0, wall_offset, 0.25), facing toward floor +back_wall = new Wall at (0, -wall_offset, 0.25), facing toward floor + +# Place vacuum on floor +ego = new Vacuum on floor + +# Create a "safe zone" around the vacuum so that it does not start stuck +safe_zone = CircularRegion(ego.position, radius=1) + +# Create a dining room region where we will place dining room furniture +dining_room_region = RectangularRegion(1.25 @ 0, 0, 2.5, 5).difference(safe_zone) + +# Place a table with 3 chairs around it, and one knocked over on the floor +dining_table = new DiningTable contained in dining_room_region, on floor, + facing Range(0, 360 deg) + +chair_1 = new DiningChair behind dining_table by -0.1, on floor, + facing toward dining_table, with regionContainedIn dining_room_region +chair_2 = new DiningChair ahead of dining_table by -0.1, on floor, + facing toward dining_table, with regionContainedIn dining_room_region +chair_3 = new DiningChair left of dining_table by -0.1, on floor, + facing toward dining_table, with regionContainedIn dining_room_region + +fallen_orientation = Uniform((0, -90 deg, 0), (0, 90 deg, 0), (0, 0, -90 deg), (0, 0, 90 deg)) + +chair_4 = new DiningChair contained in dining_room_region, facing fallen_orientation, + on floor, with baseOffset(0,0,-0.2) + +# Add some noise to the positions and yaw of the chairs around the table +mutate chair_1, chair_2, chair_3 + +# Create a living room region where we will place living room furniture +living_room_region = RectangularRegion(-1.25 @ 0, 0, 2.5, 5).difference(safe_zone) + +couch = new Couch ahead of left_wall by 0.335, + on floor, facing away from left_wall + +coffee_table = new CoffeeTable ahead of couch by 0.336, + on floor, facing away from couch + +# Add some noise to the positions of the couch and coffee table +mutate couch, coffee_table + +toy_stack = new BlockToy on floor +toy_stack = new BlockToy on toy_stack +toy_stack = new BlockToy on toy_stack + +# Spawn some toys +for _ in range(globalParameters.numToys): + new Toy on floor + +## Simulation Setup ## +terminate after globalParameters.duration * 60 seconds +record (ego.x, ego.y) as VacuumPosition From cf3c6479f530ebcd2dba37e9fe1508424b24fe60 Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Mon, 1 Dec 2025 20:04:33 -0800 Subject: [PATCH 5/9] Simulation parallelization --- src/scenic/core/scenarios.py | 145 ++++++++--- src/scenic/core/simulators.py | 236 +++++++++++++++++- src/scenic/core/utils.py | 33 +-- tests/core/test_scenarios.py | 3 + ...ization.py => benchmark_scene_parallel.py} | 38 +-- .../parallelization/benchmark_sim_parallel.py | 89 +++++++ .../benchmarks/badlyParkedCarPullingIn.scenic | 2 +- .../benchmarks/bypassing_03.scenic | 3 +- 8 files changed, 458 insertions(+), 91 deletions(-) rename tools/benchmarking/parallelization/{benchmark_parallelization.py => benchmark_scene_parallel.py} (73%) create mode 100644 tools/benchmarking/parallelization/benchmark_sim_parallel.py diff --git a/src/scenic/core/scenarios.py b/src/scenic/core/scenarios.py index 362aaa715..852680ada 100644 --- a/src/scenic/core/scenarios.py +++ b/src/scenic/core/scenarios.py @@ -4,6 +4,7 @@ import io import itertools import multiprocessing +import os import random import sys import time @@ -40,7 +41,7 @@ ) from scenic.core.sample_checking import BasicChecker, WeightedAcceptanceChecker from scenic.core.serialization import Serializer, dumpAsScenicCode -from scenic.core.utils import generateInnerBatchHelper +from scenic.core.utils import setSeed from scenic.core.vectors import Vector # Global params @@ -423,7 +424,6 @@ def generateBatch( feedback=None, numWorkers=0, mute=True, - deterministic=True, serialized=False, ): """Sample several `Scene` objects from this scenario. @@ -439,7 +439,6 @@ def generateBatch( numWorkers (int): The number of workers to be used when generating scenes. If numWorkers is 0, scenes will be generated in the main process. mute (bool): Whether or not to mute stdOut and stdErr in the worker processes. - deterministic (bool): Whether or not scenes will be returned in a deterministic order. serialized (bool): Whether or not to return scenes in a serialized format. Returns: @@ -456,8 +455,9 @@ def generateBatch( feedback=feedback, numWorkers=numWorkers, mute=mute, - deterministic=deterministic, serialized=serialized, + deterministic=True, + iterationCount=True, ) results_list = list(stream) scenes = tuple(r[0] for r in results_list) @@ -471,38 +471,69 @@ def generateStream( verbosity=0, feedback=None, numWorkers=0, + bufferSize=None, mute=True, - deterministic=False, serialized=False, + deterministic=True, + iterationCount=False, ): - """Sample several `Scene` objects from this scenario. + """Sample a stream of `Scene` objects from this scenario. - For a description of how scene generation is done, see `scene generation`. + For a description of how scene generation is done, see `scene generation`. This function can produce + both finite and infinite streams (depending on the value of `numScenes`). + + .. note:: + NOTE: The deterministic parameter is by default set to True, meaning that scenes + will be returned in a fixed order for a given Scenic seed. Setting this to False + means scenes will be returned in a possibly non-deterministic order, but with possibly + decreased latency. When deterministic is set to False, the ordering of the returned scenes + is not fully random, with scenes that are easier to generate being more likely to be returned + earlier in the stream. Despite this, the overall distribution of the returned scenes still + matches the scenario. When generating an infinite stream, `deterministic` must be set to True. Args: - numScenes (int): Number of scenes to generate. + numScenes (int): Number of scenes to generate, or `float('inf')` to sample an infinite stream + of Scenes. maxIterations (int): Maximum number of rejection sampling iterations (over all scenes). verbosity (int): Verbosity level. feedback (float): Feedback to pass to external samplers doing active sampling. See :mod:`scenic.core.external_params`. numWorkers (int): The number of workers to be used when generating scenes. If numWorkers is 0, scenes will be generated in the main process. + bufferSize (int): The number of scenes to have available at any given time, or `None` to + use default values. If set to `None` and `numScenes` is finite, all scenes are generated + in a greedy fashon. If set to `None and `numScenes` is infinite, set to a default + value of `2*numWorkers`. mute (bool): Whether or not to mute stdOut and stdErr in the worker processes. - deterministic (bool): Whether or not scenes will be returned in a deterministic order. - NOTE: Setting this to True may increase latency when waiting for the next `Scene` in - the stream. serialized (bool): Whether or not to return scenes in a serialized format. + deterministic (bool): Whether or not scenes will be returned in a deterministic order. This + must be set to `True` when generating an infinite stream. + iterationCount (bool): Whether or not to return the number of iterations used to generate each + scene. If this is set to `False`, the return type is simply an iterable of `Scene` objects. Returns: - An iterable of pairs with a sampled `Scene` and the number of iterations used for that scene. - - Raises: - `RejectionException`: if not enough valid samples are found in **maxIterations** iterations. + An iterable of pairs with a sampled `Scene` and the number of iterations used for that scene + (if iterationCount is set to True). """ + if numScenes <= 0: + raise ValueError("`numScenes` must be at least 1.") + + if not isinstance(numScenes, int) and numScenes != float("inf"): + raise ValueError("`numScenes` must be either an `int` or `float('inf')`.") + + if numScenes == float("inf") and not deterministic: + raise ValueError( + "`deterministic` must be set to `True` when generating an infinite stream." + ) + + if bufferSize is None and numScenes == float("inf"): + bufferSize = 2 * numWorkers + if numWorkers == 0: totalIterations = 0 + returnedResultCount = 0 - for _ in range(numScenes): + while returnedResultCount < numScenes: try: remainingIts = maxIterations - totalIterations scene, iterations = self._generateInner( @@ -510,33 +541,39 @@ def generateStream( ) totalIterations += iterations yield (scene, iterations) + returnedResultCount += 1 except RejectionException: raise RejectionException( f"failed to generate scenario in {maxIterations} iterations" ) else: if maxIterations != float("inf"): - raise RuntimeError("maxIterations not supported for parallel sampling.") + raise ValueError("maxIterations not supported for parallel sampling.") if feedback is not None: - raise RuntimeError("Feedback not supported for parallel sampling.") + raise ValueError("Feedback not supported for parallel sampling.") # Initialize results tracking data - resultsList = [] - returnedResults = 0 + returnedResultCount = 0 + + # Initialize random generator + rand_generator = numpy.random.default_rng(random.getrandbits(32)) - # Initialize queues and lock + # Initialize queues seedQueue = multiprocessing.Queue() - seedHistory = {} + seedHistory = [] + putSeedCount = 0 def putSeed(): - newSeed = random.getrandbits(32) + nonlocal putSeedCount + newSeed = int(rand_generator.integers(2**32)) seedQueue.put(newSeed) - seedHistory[newSeed] = len(seedHistory) if deterministic: - resultsList.append(None) + seedHistory.append(newSeed) + putSeedCount += 1 - for _ in range(numScenes): + initialSeedCount = numScenes if bufferSize is None else bufferSize + for _ in range(initialSeedCount): putSeed() sceneQueue = multiprocessing.Queue() @@ -549,6 +586,8 @@ def putSeed(): ] # Initialized result management functions + resultsDict = {} + def getResult(): sceneBytes, resultIterations, resultSeed = sceneQueue.get() resultScene = ( @@ -560,28 +599,30 @@ def getResult(): def getNextResult(): if not deterministic: - return getResult()[0] + nextResult = getResult()[0] + return nextResult if iterationCount else nextResult[0] while True: - assert len(resultsList) > 0 - - if resultsList[0] is not None: - return resultsList.pop(0) + if seedHistory[0] in resultsDict: + nextResult = resultsDict[seedHistory.pop(0)] + return nextResult if iterationCount else nextResult[0] result, resultSeed = getResult() - resultIndex = seedHistory[resultSeed] - returnedResults - assert resultsList[resultIndex] is None - resultsList[resultIndex] = result + resultsDict[resultSeed] = result # Start sampling processes and yield samples try: - # Prepare process pool + # Start processes for process in processes: process.start() - for _ in range(numScenes): + # Retrieve results + while returnedResultCount < numScenes: yield getNextResult() - returnedResults += 1 + returnedResultCount += 1 + + if putSeedCount < numWorkers: + putSeed() finally: # Close processes and queues @@ -904,3 +945,33 @@ def simulationFromBytes( data = io.BytesIO(data) scene = self.sceneFromBytes(data, verify=verify, allowPickle=allowPickle) return simulator.simulate(scene, replay=data, **kwargs) + + +def generateInnerBatchHelper( + scenarioCreationData, seedQueue, sceneQueue, verbosity, mute +): + if mute: + sys.stdout = open(os.devnull, "w") + sys.stderr = open(os.devnull, "w") + + from scenic.syntax.translator import _scenarioFromStream + + scenario = _scenarioFromStream( + stream=io.BytesIO(scenarioCreationData["streamLines"]), + compileOptions=scenarioCreationData["compileOptions"], + filename=scenarioCreationData["filename"], + scenario=scenarioCreationData["scenario"], + path=scenarioCreationData["path"], + _cacheImports=False, + ) + + while True: + seed = seedQueue.get() + setSeed(seed) + + scene, iterations = scenario._generateInner( + maxIterations=float("inf"), verbosity=verbosity, feedback=None + ) + sceneBytes = scenario.sceneToBytes(scene) + + sceneQueue.put((sceneBytes, iterations, seed)) diff --git a/src/scenic/core/simulators.py b/src/scenic/core/simulators.py index 3e9c0308f..aa550ed1c 100644 --- a/src/scenic/core/simulators.py +++ b/src/scenic/core/simulators.py @@ -13,10 +13,18 @@ import abc from collections import defaultdict import enum +import io import math +import multiprocessing import numbers +import os +import random +import sys import time import types +import warnings + +import numpy from scenic.core.distributions import RejectionException from scenic.core.dynamics import GuardViolation, RejectSimulationException @@ -31,6 +39,7 @@ ) from scenic.core.requirements import RequirementType from scenic.core.serialization import Serializer +from scenic.core.utils import setSeed from scenic.core.vectors import Vector @@ -393,6 +402,7 @@ def __init__( # Package up simulation results into a compact object. result = SimulationResult( + self.name, self.trajectory, self.actionSequence, terminationType, @@ -922,6 +932,7 @@ class SimulationResult: """Result of running a simulation. Attributes: + name: Name of the simulation, if any trajectory: A tuple giving for each time step the simulation's 'state': by default the positions of every object. See `Simulation.currentState`. finalState: The last 'state' of the simulation, as above. @@ -934,7 +945,10 @@ class SimulationResult: values its expression took during the simulation. """ - def __init__(self, trajectory, actions, terminationType, terminationReason, records): + def __init__( + self, name, trajectory, actions, terminationType, terminationReason, records + ): + self.name = name self.trajectory = tuple(trajectory) assert self.trajectory self.finalState = self.trajectory[-1] @@ -942,3 +956,223 @@ def __init__(self, trajectory, actions, terminationType, terminationReason, reco self.terminationType = terminationType self.terminationReason = str(terminationReason) self.records = dict(records) + + +class SimulatorGroup: + def __init__( + self, + numWorkers, + simulatorClass, + simulatorParams=None, + bufferSize=None, + mute=True, + returnFinalState=False, + returnTrajectory=False, + ): + if numWorkers <= 0: + raise ValueError("`numWorkers` must be at least 1.") + self.numWorkers = numWorkers + self.simulatorClass = simulatorClass + simulatorParams = simulatorParams if simulatorParams else dict() + if isinstance(simulatorParams, dict): + self.simulatorParams = [simulatorParams for _ in range(numWorkers)] + elif isinstance(simulatorParams, collections.abc.Iterable): + if len(simulatorParams) != numWorkers: + raise ValueError( + "Length of `simulatorParams` does not match `numWorkers`." + ) + self.simulatorParams = tuple(simulatorParams) + else: + raise ValueError("`simulatorParams` is not a dict or iterable of dicts.") + self.bufferSize = 2 * numWorkers if bufferSize is None else bufferSize + if self.bufferSize <= 1: + raise ValueError("`bufferSize` must be at least 1.") + self.mute = mute + self.returnFinalState = returnFinalState + self.returnTrajectory = returnTrajectory + + def _jobName(self, jobId): + return f"Scene{jobId}" + + def _serializeScene(self, scene, scenario, serialized): + from scenic.core.scenarios import Scene + + if serialized: + if not isinstance(scene, bytes): + raise ValueError( + f"Scene provided has type `{type(scene)}` instead of type `bytes`, but serialized was set to True." + ) + return scene + else: + if not isinstance(scene, Scene): + raise ValueError( + f"Scene provided has type `{type(scene)}` instead of type `Scene`, but serialized was set to False." + ) + return scenario.sceneFromBytes(scene, verify=True) + + def _prepareJob( + self, scene, simulateParams, jobId, scenario, serialized, rand_generator + ): + serializedScene = self._serializeScene(scene, scenario, serialized) + jobParams = simulateParams.copy() + + if "name" in jobParams: + warnings.warn( + "`name` in `simulateParams` is ignored and overwritten by custom name when using `SimulatorGroup`." + ) + jobName = self._jobName(jobId) + jobParams["name"] = jobName + + seed = int(rand_generator.integers(2**32)) + + return (jobId, serializedScene, jobParams, seed) + + def simulateBatch(self, scenario, scenes, simulateParams=None, serialized=True): + return tuple( + v[1] + for v in self.simulateStream( + scenario, scenes, simulateParams, serialized, deterministic=True + ) + ) + + def simulateStream( + self, scenario, scenes, simulateParams=None, serialized=True, deterministic=False + ): + simulateParams = simulateParams if simulateParams else dict() + + # Create helper parameters + scenarioCreationData = scenario._scenarioCreationData + jobQueue = multiprocessing.Queue() + resultQueue = multiprocessing.Queue() + + # Initialize random generator + rand_generator = numpy.random.default_rng(random.getrandbits(32)) + + # Initialize processes + processes = [] + for simulatorParams in self.simulatorParams: + params = ( + scenarioCreationData, + self.simulatorClass, + simulatorParams, + jobQueue, + resultQueue, + self.mute, + self.returnFinalState, + self.returnTrajectory, + ) + processes.append( + multiprocessing.Process(target=simulatorGroupHelper, args=params) + ) + + # Job creation utilities + remainingJobs = 0 + jobId = 0 + + def putJob(scene): + nonlocal jobId + nonlocal remainingJobs + preparedJob = self._prepareJob( + scene, + simulateParams, + jobId, + scenario, + serialized, + rand_generator=rand_generator, + ) + jobQueue.put(preparedJob) + jobId += 1 + remainingJobs += 1 + + # Initialized result management functions + lastReturnedJob = 0 + resultsDict = {} + + def getNextResult(): + if not deterministic: + return resultQueue.get() + + nonlocal lastReturnedJob + while True: + if lastReturnedJob in resultsDict: + returnResult = (lastReturnedJob, resultsDict.pop(lastReturnedJob)) + lastReturnedJob += 1 + return returnResult + + jobId, nextResult = resultQueue.get() + resultsDict[jobId] = nextResult + + try: + # Start processes + for process in processes: + process.start() + + # Initially saturate job buffer + for _ in range(self.bufferSize): + if scene := next(scenes, None): + putJob(scene) + + # Retrieve results and replenish buffer + while remainingJobs: + simulationResult = getNextResult() + remainingJobs -= 1 + + if scene := next(scenes, None): + putJob(scene) + + yield simulationResult + + finally: + # Close processes and queues + for process in processes: + process.terminate() + + jobQueue.close() + resultQueue.close() + + +def simulatorGroupHelper( + scenarioCreationData, + simulatorClass, + simulatorParams, + jobQueue, + resultQueue, + mute, + returnFinalState, + returnTrajectory, +): + if mute: + sys.stdout = open(os.devnull, "w") + sys.stderr = open(os.devnull, "w") + + from scenic.syntax.translator import _scenarioFromStream + + scenario = _scenarioFromStream( + stream=io.BytesIO(scenarioCreationData["streamLines"]), + compileOptions=scenarioCreationData["compileOptions"], + filename=scenarioCreationData["filename"], + scenario=scenarioCreationData["scenario"], + path=scenarioCreationData["path"], + _cacheImports=False, + ) + + simulator = simulatorClass(**simulatorParams) + + while True: + jobId, serializedScene, simulateParams, seed = jobQueue.get() + setSeed(seed) + + scene = scenario.sceneFromBytes(serializedScene, verify=False) + simulation = simulator.simulate(scene, **simulateParams) + + if simulation: + simulationResult = simulation.result + simulationResult.actions = None + if not returnFinalState: + simulationResult.finalState = None + if not returnTrajectory: + simulationResult.trajectory = None + else: + simulationResult = None + + resultQueue.put((jobId, simulationResult)) diff --git a/src/scenic/core/utils.py b/src/scenic/core/utils.py index 632557641..5266af78d 100644 --- a/src/scenic/core/utils.py +++ b/src/scenic/core/utils.py @@ -7,6 +7,7 @@ import io import itertools import math +import multiprocessing import os import random import signal @@ -400,35 +401,3 @@ def get_type_hints(obj, globalns=None, localns=None): def setSeed(seed): random.seed(seed) numpy.random.seed(seed) - - -def generateInnerBatchHelper( - scenarioCreationData, seedQueue, sceneQueue, verbosity, mute -): - if mute: - sys.stdout = open(os.devnull, "w") - sys.stderr = open(os.devnull, "w") - - from scenic.syntax.translator import _scenarioFromStream - - stream = io.BytesIO(scenarioCreationData["streamLines"]) - - scenario = _scenarioFromStream( - stream=stream, - compileOptions=scenarioCreationData["compileOptions"], - filename=scenarioCreationData["filename"], - scenario=scenarioCreationData["scenario"], - path=scenarioCreationData["path"], - _cacheImports=False, - ) - - while True: - seed = seedQueue.get() - setSeed(seed) - - scene, iterations = scenario._generateInner( - maxIterations=float("inf"), verbosity=verbosity, feedback=None - ) - sceneBytes = scenario.sceneToBytes(scene) - - sceneQueue.put((sceneBytes, iterations, seed)) diff --git a/tests/core/test_scenarios.py b/tests/core/test_scenarios.py index e4365d462..d77c133a0 100644 --- a/tests/core/test_scenarios.py +++ b/tests/core/test_scenarios.py @@ -74,6 +74,7 @@ def test_generateBatch(): ) scenes, _ = scenario.generateBatch(2, numWorkers=2) + assert len(scenes) == 2 assert all(isinstance(scene, Scene) for scene in scenes) assert all(0 <= scene.objects[0].heading <= 1 for scene in scenes) assert scenes[0].objects[0].heading != scenes[1].objects[0].heading @@ -87,6 +88,7 @@ def test_generateBatch_serialized(): """ ) scenesBytes, _ = scenario.generateBatch(2, numWorkers=2, serialized=True) + assert len(scenesBytes) == 2 assert all(isinstance(b, bytes) for b in scenesBytes) scenes = [scenario.sceneFromBytes(b, verify=True) for b in scenesBytes] @@ -108,6 +110,7 @@ def test_generateStream_deterministic(): streamA = tuple(scenario.generateStream(8, numWorkers=2, serialized=True)) setSeed(seed) streamB = tuple(scenario.generateStream(8, numWorkers=2, serialized=True)) + assert len(streamA) == len(streamB) == 8 bytesSetA = {result[0] for result in streamA} bytesSetB = {result[0] for result in streamB} assert bytesSetA == bytesSetB diff --git a/tools/benchmarking/parallelization/benchmark_parallelization.py b/tools/benchmarking/parallelization/benchmark_scene_parallel.py similarity index 73% rename from tools/benchmarking/parallelization/benchmark_parallelization.py rename to tools/benchmarking/parallelization/benchmark_scene_parallel.py index fec0f3191..27b2b64fb 100644 --- a/tools/benchmarking/parallelization/benchmark_parallelization.py +++ b/tools/benchmarking/parallelization/benchmark_scene_parallel.py @@ -52,28 +52,28 @@ def run_benchmark_parallel(path, params, *, numWorkers): scenario = scenic.scenarioFromFile( BENCHMARKS_BASE_PATH / path, params=params, mode2D=params.get("mode2D", False) ) - scenario.generateBatch(NUM_SAMPLES, maxIterations=float("inf"), numWorkers=numWorkers) + scenario.generateBatch(NUM_SAMPLES, numWorkers=numWorkers) if __name__ == "__main__": - print("Base Performance (`generate`, numWorkers=0):") - for benchmark in BENCHMARKS: - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - start = time.time() - run_benchmark(*benchmark) - trial_time = time.time() - start - print(f"{trial_time: 7.2f} | {benchmark}") - print() - print("Base + Overhead Performance (`generateBatch`, numWorkers=1):") - for benchmark in BENCHMARKS: - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - start = time.time() - run_benchmark_parallel(*benchmark, numWorkers=1) - trial_time = time.time() - start - print(f"{trial_time: 7.2f} | {benchmark}") - print() + # print("Base Performance (`generate`, numWorkers=0):") + # for benchmark in BENCHMARKS: + # with warnings.catch_warnings(): + # warnings.simplefilter("ignore") + # start = time.time() + # run_benchmark(*benchmark) + # trial_time = time.time() - start + # print(f"{trial_time: 7.2f} | {benchmark}") + # print() + # print("Base + Overhead Performance (`generateBatch`, numWorkers=1):") + # for benchmark in BENCHMARKS: + # with warnings.catch_warnings(): + # warnings.simplefilter("ignore") + # start = time.time() + # run_benchmark_parallel(*benchmark, numWorkers=1) + # trial_time = time.time() - start + # print(f"{trial_time: 7.2f} | {benchmark}") + # print() print("Batch Performance (`generateBatch`, numWorkers=8):") for benchmark in BENCHMARKS: with warnings.catch_warnings(): diff --git a/tools/benchmarking/parallelization/benchmark_sim_parallel.py b/tools/benchmarking/parallelization/benchmark_sim_parallel.py new file mode 100644 index 000000000..4d373b013 --- /dev/null +++ b/tools/benchmarking/parallelization/benchmark_sim_parallel.py @@ -0,0 +1,89 @@ +from pathlib import Path +import time +import warnings + +import scenic +from scenic.core.utils import SimulatorGroup +from scenic.simulators.metadrive.simulator import MetaDriveSimulator + +NUM_WORKERS = 8 +BENCHMARKS_BASE_PATH = (Path(__file__).resolve().parent / "benchmarks").resolve() +MAP_PATH = ( + Path(__file__).resolve().parent.parent.parent.parent + / "assets" + / "maps" + / "CARLA" + / "Town05.xodr" +).resolve() +SUMO_MAP_PATH = ( + Path(__file__).resolve().parent.parent.parent.parent + / "assets" + / "maps" + / "CARLA" + / "Town05.net.xml" +).resolve() +MESH_BASE_PATH = ( + Path(__file__).resolve().parent.parent.parent.parent / "assets" / "meshes" +).resolve() + +BENCHMARKS = [ + ("badlyParkedCarPullingIn.scenic", {"mode2D": True, "map": MAP_PATH}), + ("bypassing_03.scenic", {"mode2D": True, "map": MAP_PATH}), +] + +NUM_SAMPLES = 128 + + +def run_benchmark(path, params): + simulator = MetaDriveSimulator(sumo_map=SUMO_MAP_PATH, render=False, real_time=False) + scenario = scenic.scenarioFromFile( + BENCHMARKS_BASE_PATH / path, params=params, mode2D=params.get("mode2D", False) + ) + for _ in range(NUM_SAMPLES): + scene, _ = scenario.generate(maxIterations=float("inf")) + simulator.simulate(scene) + + +def run_benchmark_parallel(path, params, *, numWorkers): + scenario = scenic.scenarioFromFile( + BENCHMARKS_BASE_PATH / path, params=params, mode2D=params.get("mode2D", False) + ) + scene_stream = scenario.generateStream( + NUM_SAMPLES, numWorkers=numWorkers, serialized=True + ) + sim_group = SimulatorGroup( + numWorkers=numWorkers, + simulatorClass=MetaDriveSimulator, + simulatorParams={"sumo_map": SUMO_MAP_PATH, "render": False, "real_time": False}, + mute=False, + ) + sim_group.simulateBatch(scenario=scenario, scenes=scene_stream) + + +if __name__ == "__main__": + print("Base Performance (`generate`, numWorkers=0):") + for benchmark in BENCHMARKS: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + start = time.time() + run_benchmark(*benchmark) + trial_time = time.time() - start + print(f"{trial_time: 7.2f} | {benchmark}") + print() + print("Base + Overhead Performance (`generateBatch`, numWorkers=1):") + for benchmark in BENCHMARKS: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + start = time.time() + run_benchmark_parallel(*benchmark, numWorkers=1) + trial_time = time.time() - start + print(f"{trial_time: 7.2f} | {benchmark}") + print() + print("Batch Performance (`generateBatch`, numWorkers=8):") + for benchmark in BENCHMARKS: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + start = time.time() + run_benchmark_parallel(*benchmark, numWorkers=8) + trial_time = time.time() - start + print(f"{trial_time: 7.2f} | {benchmark}") diff --git a/tools/benchmarking/parallelization/benchmarks/badlyParkedCarPullingIn.scenic b/tools/benchmarking/parallelization/benchmarks/badlyParkedCarPullingIn.scenic index 9e1b15b18..ba566dc73 100644 --- a/tools/benchmarking/parallelization/benchmarks/badlyParkedCarPullingIn.scenic +++ b/tools/benchmarking/parallelization/benchmarks/badlyParkedCarPullingIn.scenic @@ -1,6 +1,6 @@ param time_step = 1.0/10 -model scenic.domains.driving.model +model scenic.simulators.metadrive.model behavior PullIntoRoad(): while (distance from self to ego) > 15: diff --git a/tools/benchmarking/parallelization/benchmarks/bypassing_03.scenic b/tools/benchmarking/parallelization/benchmarks/bypassing_03.scenic index ebbab8023..647f90ed5 100644 --- a/tools/benchmarking/parallelization/benchmarks/bypassing_03.scenic +++ b/tools/benchmarking/parallelization/benchmarks/bypassing_03.scenic @@ -12,7 +12,7 @@ SOURCE: NHSTA, #16 # MAP AND MODEL # ################################# -model scenic.simulators.carla.model +model scenic.simulators.metadrive.model ################################# # CONSTANTS # @@ -104,3 +104,4 @@ require (distance from adversary to intersection) > INIT_DIST require (distance from lead to intersection) > INIT_DIST require always (adversary.laneSection._fasterLane is not None) terminate when (distance to egoSpawnPt) > TERM_DIST +terminate after 60 seconds From 349727ffacde8a8055e3efbae0e2f686ddffd850 Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Wed, 24 Dec 2025 20:55:48 -0800 Subject: [PATCH 6/9] Added documentation for setSeed. --- docs/api.rst | 7 +++++++ src/scenic/core/utils.py | 1 + 2 files changed, 8 insertions(+) diff --git a/docs/api.rst b/docs/api.rst index 440f450f2..d6d2b504b 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -40,6 +40,13 @@ the sampled values for all the global parameters and objects in the scene from t ego has foo = 2.083099362726706 +Utilities +--------- + +Scenic provides top level utility functions for functions like setting Scenic's random seed. + +.. autofunction:: scenic.setSeed + Running Dynamic Simulations --------------------------- diff --git a/src/scenic/core/utils.py b/src/scenic/core/utils.py index 5266af78d..f01d58b2c 100644 --- a/src/scenic/core/utils.py +++ b/src/scenic/core/utils.py @@ -399,5 +399,6 @@ def get_type_hints(obj, globalns=None, localns=None): def setSeed(seed): + """Set the random seed used by Scenic""" random.seed(seed) numpy.random.seed(seed) From 335b437756c32cf3c5a70aa8712e05de24c08a17 Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Thu, 25 Dec 2025 11:14:52 -0800 Subject: [PATCH 7/9] Added documentation and tests for simulation parallelization. --- src/scenic/core/scenarios.py | 24 ++++---- src/scenic/core/simulators.py | 39 ++++++++++++- tests/core/test_simulators.py | 105 +++++++++++++++++++++++++++++++++- 3 files changed, 156 insertions(+), 12 deletions(-) diff --git a/src/scenic/core/scenarios.py b/src/scenic/core/scenarios.py index 852680ada..2476a4653 100644 --- a/src/scenic/core/scenarios.py +++ b/src/scenic/core/scenarios.py @@ -480,19 +480,19 @@ def generateStream( """Sample a stream of `Scene` objects from this scenario. For a description of how scene generation is done, see `scene generation`. This function can produce - both finite and infinite streams (depending on the value of `numScenes`). + both finite and infinite streams (depending on the value of ``numScenes``). .. note:: - NOTE: The deterministic parameter is by default set to True, meaning that scenes + NOTE: The ``deterministic`` parameter is by default set to True, meaning that scenes will be returned in a fixed order for a given Scenic seed. Setting this to False means scenes will be returned in a possibly non-deterministic order, but with possibly - decreased latency. When deterministic is set to False, the ordering of the returned scenes + decreased latency. When ``deterministic`` is set to ``False``, the ordering of the returned scenes is not fully random, with scenes that are easier to generate being more likely to be returned earlier in the stream. Despite this, the overall distribution of the returned scenes still - matches the scenario. When generating an infinite stream, `deterministic` must be set to True. + matches the scenario. When generating an infinite stream, ``deterministic`` must be set to ``True``. Args: - numScenes (int): Number of scenes to generate, or `float('inf')` to sample an infinite stream + numScenes (int): Number of scenes to generate, or ``float('inf')`` to sample an infinite stream of Scenes. maxIterations (int): Maximum number of rejection sampling iterations (over all scenes). verbosity (int): Verbosity level. @@ -501,9 +501,9 @@ def generateStream( numWorkers (int): The number of workers to be used when generating scenes. If numWorkers is 0, scenes will be generated in the main process. bufferSize (int): The number of scenes to have available at any given time, or `None` to - use default values. If set to `None` and `numScenes` is finite, all scenes are generated - in a greedy fashon. If set to `None and `numScenes` is infinite, set to a default - value of `2*numWorkers`. + use default values. If set to ``None`` and ``numScenes`` is finite, all scenes are generated + in a greedy fashon. If set to ``None`` and ``numScenes`` is infinite, set to a default + value of ``2*numWorkers``. mute (bool): Whether or not to mute stdOut and stdErr in the worker processes. serialized (bool): Whether or not to return scenes in a serialized format. deterministic (bool): Whether or not scenes will be returned in a deterministic order. This @@ -536,11 +536,15 @@ def generateStream( while returnedResultCount < numScenes: try: remainingIts = maxIterations - totalIterations - scene, iterations = self._generateInner( + rawScene, iterations = self._generateInner( remainingIts, verbosity, feedback ) + scene = self.sceneToBytes(rawScene) if serialized else rawScene totalIterations += iterations - yield (scene, iterations) + if iterationCount: + yield (scene, iterations) + else: + yield scene returnedResultCount += 1 except RejectionException: raise RejectionException( diff --git a/src/scenic/core/simulators.py b/src/scenic/core/simulators.py index aa550ed1c..1076c5634 100644 --- a/src/scenic/core/simulators.py +++ b/src/scenic/core/simulators.py @@ -959,6 +959,23 @@ def __init__( class SimulatorGroup: + """A group of simulators for running parallel simulations. + + Args: + numWorkers: Number of workers in this group. + simulatorClass: The simulator class this group is composed of. + simulatorParams: An optional single or list of kwarg dictionaries to be passed as parameters + when creating the simulators in this group. If simulatorParams is a list of dicts, + ``len(simulatorParams)`` should equal ``numWorkers``. + bufferSize: An optional integer indicating the size of the job buffer. If ``None``, the value is + set to ``2 * numWorkers``. + mute: Whether or not to mute stdOut and stdErr in the worker processes. + returnFinalState: Whether or not returned `SimulationResult` objects should contain the ``finalState`` + property. Set to ``False`` by default to minimize overhead. + returnTrajectory: Whether or not returned `SimulationResult` objects should contain the ``trajectry`` + property. Set to ``False`` by default to minimize overhead. + """ + def __init__( self, numWorkers, @@ -1008,7 +1025,7 @@ def _serializeScene(self, scene, scenario, serialized): raise ValueError( f"Scene provided has type `{type(scene)}` instead of type `Scene`, but serialized was set to False." ) - return scenario.sceneFromBytes(scene, verify=True) + return scenario.sceneToBytes(scene) def _prepareJob( self, scene, simulateParams, jobId, scenario, serialized, rand_generator @@ -1028,6 +1045,14 @@ def _prepareJob( return (jobId, serializedScene, jobParams, seed) def simulateBatch(self, scenario, scenes, simulateParams=None, serialized=True): + """Simulate and return a batch of `SimulationResult` objects. + + Args: + scenario: The scenario that the scenes are sampled from. + scenes: An iterator of `Scene` objects sampled from ``scenario``. + simulateParams: An optional dictionary of params to be passed to simulate internally. + serialized: Whether or not ``scenes`` contains serialized scenes. + """ return tuple( v[1] for v in self.simulateStream( @@ -1038,6 +1063,18 @@ def simulateBatch(self, scenario, scenes, simulateParams=None, serialized=True): def simulateStream( self, scenario, scenes, simulateParams=None, serialized=True, deterministic=False ): + """Generate a stream of `SimulationResult` objects. + + Args: + scenario: The scenario that the scenes are sampled from. + scenes: An iterator of `Scene` objects sampled from ``scenario``. + simulateParams: An optional dictionary of params to be passed to simulate internally. + serialized: Whether or not ``scenes`` contains serialized scenes. + deterministic: Whether or not results should be returned in a deterministic order. Setting + this to ``False`` can result in decreased latency when accessing results, but the order + will not be fixed. + """ + scenes = iter(scenes) simulateParams = simulateParams if simulateParams else dict() # Create helper parameters diff --git a/tests/core/test_simulators.py b/tests/core/test_simulators.py index 149c1cad1..1b22aad5e 100644 --- a/tests/core/test_simulators.py +++ b/tests/core/test_simulators.py @@ -1,6 +1,15 @@ +import itertools +import random + import pytest -from scenic.core.simulators import DummySimulation, DummySimulator, Simulation +import scenic +from scenic.core.simulators import ( + DummySimulation, + DummySimulator, + Simulation, + SimulatorGroup, +) from tests.utils import compileScenic, sampleResultFromScene, sampleSceneFrom @@ -94,3 +103,97 @@ class TestObj: simulator = TestSimulator() with pytest.raises(RuntimeError): result = simulator.simulate(scene, maxSteps=2) + + +@pytest.mark.slow +def test_simulator_group(): + scenario = compileScenic( + """ + behavior Foo(): + while True: + require Range(0,1) < 0.99 + wait + + new Object with behavior Foo() + """ + ) + + for numWorkers, serialized, scene_stream, sim_stream in itertools.product( + [1, 2], [True, False], [True, False], [True, False] + ): + if scene_stream: + scenes = scenario.generateStream( + 200, numWorkers=numWorkers, serialized=serialized, iterationCount=False + ) + else: + scenes, _ = scenario.generateBatch( + 200, numWorkers=numWorkers, serialized=serialized + ) + + sim_group = SimulatorGroup( + numWorkers=2, simulatorClass=DummySimulator, mute=False + ) + + simulate_params = {"maxSteps": 10} + + if sim_stream: + results = tuple( + result + for _, result in sim_group.simulateStream( + scenario, + scenes, + simulateParams=simulate_params, + serialized=serialized, + ) + ) + else: + results = sim_group.simulateBatch( + scenario, scenes, simulateParams=simulate_params, serialized=serialized + ) + + assert any(val is None for val in results) + assert any(val is not None for val in results) + + +def test_simulator_group_deterministic(): + scenario = compileScenic( + """ + behavior Foo(): + while True: + require Range(0,1) < 0.99 + wait + + new Object with behavior Foo() + """ + ) + + seed = random.getrandbits(32) + + scenic.setSeed(seed) + scenes, _ = scenario.generateBatch(200, serialized=True) + + sim_group = SimulatorGroup(numWorkers=4, simulatorClass=DummySimulator, mute=False) + simulate_params = {"maxSteps": 10} + + results1 = tuple( + result + for _, result in sim_group.simulateStream( + scenario, scenes, simulateParams=simulate_params, deterministic=True + ) + ) + + scenic.setSeed(seed) + scenes, _ = scenario.generateBatch(200, serialized=True) + + sim_group = SimulatorGroup(numWorkers=4, simulatorClass=DummySimulator, mute=False) + simulate_params = {"maxSteps": 10} + + results2 = tuple( + result + for _, result in sim_group.simulateStream( + scenario, scenes, simulateParams=simulate_params, deterministic=True + ) + ) + + assert len(results1) == len(results2) + assert all((v1 is None) == (v2 is None) for v1, v2 in zip(results1, results2)) From f673bb466ea25c9a0acf82c35ce0eedc0127d06f Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Sat, 18 Apr 2026 13:09:36 -0700 Subject: [PATCH 8/9] Minor documentation and test additions. --- docs/api.rst | 4 ++-- docs/options.rst | 2 ++ tests/syntax/test_basic.py | 13 +++++++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index d6d2b504b..33fbbed77 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -30,8 +30,8 @@ the sampled values for all the global parameters and objects in the scene from t .. testcode:: - import random, scenic - random.seed(12345) + import scenic + scenic.setSeed(12345) scenario = scenic.scenarioFromString('ego = new Object with foo Range(0, 5)') scene, numIterations = scenario.generate() print(f'ego has foo = {scene.egoObject.foo}') diff --git a/docs/options.rst b/docs/options.rst index 1df985596..6a4d7ab2c 100644 --- a/docs/options.rst +++ b/docs/options.rst @@ -48,6 +48,8 @@ General Scenario Control (although :mod:`random` and :mod:`numpy.random` should not be used in place of Scenic's own sampling constructs in Scenic code). + The seed can be set programatically using `scenic.setSeed`. + .. option:: --scenario If the given Scenic file defines multiple scenarios, select which one to run. diff --git a/tests/syntax/test_basic.py b/tests/syntax/test_basic.py index dd022b208..55283d56a 100644 --- a/tests/syntax/test_basic.py +++ b/tests/syntax/test_basic.py @@ -330,3 +330,16 @@ class Foo(Bar): obj = sampleEgoFrom(program, mode2D=True) assert obj.heading == obj.parentOrientation.yaw == 0.56 + + +def test_setSeed(): + scenario = compileScenic("ego = new Object with foo Range(0,1)") + + scenic.setSeed(10) + s1, _ = scenario.generate() + scenic.setSeed(10) + s2, _ = scenario.generate() + s3, _ = scenario.generate() + + assert s1.objects[0].foo == s2.objects[0].foo + assert s1.objects[0].foo != s3.objects[0].foo From 24e5ee934e5b4f0df5fce771db95f5c6f3255333 Mon Sep 17 00:00:00 2001 From: Eric Vin Date: Sat, 18 Apr 2026 13:11:23 -0700 Subject: [PATCH 9/9] Simplified test. --- tests/syntax/test_basic.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/tests/syntax/test_basic.py b/tests/syntax/test_basic.py index 55283d56a..24694cc38 100644 --- a/tests/syntax/test_basic.py +++ b/tests/syntax/test_basic.py @@ -333,13 +333,11 @@ class Foo(Bar): def test_setSeed(): - scenario = compileScenic("ego = new Object with foo Range(0,1)") - scenic.setSeed(10) - s1, _ = scenario.generate() + p1 = sampleParamPFrom("param p = Range(0, 1)") scenic.setSeed(10) - s2, _ = scenario.generate() - s3, _ = scenario.generate() + p2 = sampleParamPFrom("param p = Range(0, 1)") + p3 = sampleParamPFrom("param p = Range(0, 1)") - assert s1.objects[0].foo == s2.objects[0].foo - assert s1.objects[0].foo != s3.objects[0].foo + assert p1 == p2 + assert p1 != p3