diff --git a/apis/workflows/v1/worker.proto b/apis/workflows/v1/worker.proto index ce1c112..317659e 100644 --- a/apis/workflows/v1/worker.proto +++ b/apis/workflows/v1/worker.proto @@ -10,29 +10,24 @@ import "workflows/v1/core.proto"; import "workflows/v1/task.proto"; import "workflows/v1/workflows.proto"; -// HandshakeRequest is sent by the runner to a worker runtime to discover its capabilities. -message HandshakeRequest { - RunnerMetadata metadata = 1; - // The cluster that the worker runtime is running on. - workflows.v1.Cluster cluster = 2; - // The storage locations that the worker runtime can access, for triggered automations. - repeated StorageLocation locations = 3; -} - -// Some metadata about the runner that this worker runtime is connected to, to be able to include it in logs and traces. -message RunnerMetadata { +// InitializeRunnerRequest is sent by the runner to a worker runtime to initialize it with metadata about the runner. +message InitializeRunnerRequest { // The id of the runner that this worker runtime is connected to. tilebox.v1.ID runner_id = 1; - // The traceparent that should be used for any traces emitted by this worker runtime (outside of task execution, - // which has its own traceparent propagation from jobs). This allows us to correlate any logs and traces emitted + // The trace_parent that should be used for any traces emitted by this worker runtime (outside of task execution, + // which has its own trace_parent propagation from jobs). This allows us to correlate any logs and traces emitted // by the worker runtime with the runner that it's connected to. - string traceparent = 2; + string trace_parent = 2; + // The cluster that the worker runtime is running on. + workflows.v1.Cluster cluster = 3; + // The workflow (and the release) that this worker was created from. + workflows.v1.Workflow workflow = 4; + // The storage locations that the worker runtime can access, for triggered automations. 
+ repeated StorageLocation locations = 5; } // HandshakeResponse is the response to a handshake request, containing the task identifiers that the worker can execute. -message HandshakeResponse { - workflows.v1.TaskIdentifiers task_identifiers = 1; -} +message InitializeRunnerResponse {} // ExecuteTaskResponse is the response of a worker runtime to a request to execute a task, after the task has // been executed. @@ -53,7 +48,20 @@ message ExecuteTaskResponse { // WorkerService defines the RPC interface between a runner and a a worker runtime that can execute workflow tasks. service WorkerService { - rpc Handshake(HandshakeRequest) returns (HandshakeResponse) {} + // ListRegisteredTasks is called by the runner to discover the task identifiers that a worker runtime can execute. + rpc ListRegisteredTasks(google.protobuf.Empty) returns (workflows.v1.TaskIdentifiers); + + // InitializeWorker is called by the runner to initialize a worker runtime with some known metadata about the + // runner. This allows us to add that metadata to any logs and traces emitted by the worker. + rpc InitializeWorker(InitializeRunnerRequest) returns (InitializeRunnerResponse) {} + + // ExecuteTask is called by the runner to execute a task on the worker runtime. The worker runtime will execute the + // task, and then respond with the result of the task execution, which can either be a computed task, or a failed + // task. If the worker runtime crashes or becomes unreachable during task execution, the runner will treat the task + // as failed with an unknown error. rpc ExecuteTask(workflows.v1.Task) returns (ExecuteTaskResponse) {} - rpc Shutdown(google.protobuf.Empty) returns (google.protobuf.Empty) {} + + // Gracefully shuts down the worker runtime. After receiving this request, the worker runtime will + // cleanly shut down. 
+ rpc ShutdownWorker(google.protobuf.Empty) returns (google.protobuf.Empty) {} } diff --git a/apis/workflows/v1/workflows.proto b/apis/workflows/v1/workflows.proto index 7434f47..c783969 100644 --- a/apis/workflows/v1/workflows.proto +++ b/apis/workflows/v1/workflows.proto @@ -114,10 +114,19 @@ message Artifact { // the list of task identifiers, and the command to start a worker runtime for executing the tasks in the release. // The fingerprint is a hash of the release content. message ReleaseContent { + // A fingerprint of the release content, which is a hash of the task identifiers, files, runner object path, + // and command override. Used to determine if two releases have the same content, for idempotent builds. string fingerprint = 1 [(buf.validate.field).string.pattern = "^[a-f0-9]{64}$"]; + // List of task identifiers included in the release artifact. repeated TaskIdentifier tasks = 2 [(buf.validate.field).repeated.min_items = 1]; - Path root = 3; - repeated string command = 4; + // List of files/directories included in the release artifact. + repeated Path files = 3; + // The python module/object path to the runner instance, e.g. "my_module.my_runner:runner". NOTE(review): field number 4 previously held `repeated string command`; confirm no serialized ReleaseContent with the old layout is still read. + string runner_object_path = 4; + // A custom command override for starting a worker runtime. This is optional, and if not set defaults to + // "uv run python -m tilebox.workflows.runner ". This allows users to specify a custom command + // for starting the worker runtime, e.g. to set environment variables or use a different entrypoint. + repeated string command_override = 5; } // Path represents a file or directory path in the release content. If it is a directory, it can have child paths.