Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tilebox.datasets.datasets.v1 import core_pb2 as datasets_dot_v1_dot_core__pb2


class CollectionServiceStub(object):
class CollectionServiceStub:
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
"""

Expand Down Expand Up @@ -38,7 +38,7 @@ def __init__(self, channel):
_registered_method=True)


class CollectionServiceServicer(object):
class CollectionServiceServicer:
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
"""

Expand Down Expand Up @@ -97,7 +97,7 @@ def add_CollectionServiceServicer_to_server(servicer, server):


# This class is part of an EXPERIMENTAL API.
class CollectionService(object):
class CollectionService:
"""CollectionService is the service definition for the Tilebox datasets service, which provides access to datasets
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tilebox.datasets.datasets.v1 import data_access_pb2 as datasets_dot_v1_dot_data__access__pb2


class DataAccessServiceStub(object):
class DataAccessServiceStub:
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
"""

Expand All @@ -28,7 +28,7 @@ def __init__(self, channel):
_registered_method=True)


class DataAccessServiceServicer(object):
class DataAccessServiceServicer:
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
"""

Expand Down Expand Up @@ -67,7 +67,7 @@ def add_DataAccessServiceServicer_to_server(servicer, server):


# This class is part of an EXPERIMENTAL API.
class DataAccessService(object):
class DataAccessService:
"""DataAccessService provides data access and querying capabilities for Tilebox datasets.
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from tilebox.datasets.datasets.v1 import data_ingestion_pb2 as datasets_dot_v1_dot_data__ingestion__pb2


class DataIngestionServiceStub(object):
class DataIngestionServiceStub:
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
"""

Expand All @@ -27,7 +27,7 @@ def __init__(self, channel):
_registered_method=True)


class DataIngestionServiceServicer(object):
class DataIngestionServiceServicer:
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
"""

Expand Down Expand Up @@ -64,7 +64,7 @@ def add_DataIngestionServiceServicer_to_server(servicer, server):


# This class is part of an EXPERIMENTAL API.
class DataIngestionService(object):
class DataIngestionService:
"""DataIngestionService provides data ingestion and deletion capabilities for Tilebox datasets.
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from tilebox.datasets.datasets.v1 import datasets_pb2 as datasets_dot_v1_dot_datasets__pb2


class DatasetServiceStub(object):
class DatasetServiceStub:
"""DatasetsService is the CRUD service for Tilebox datasets.
"""

Expand Down Expand Up @@ -43,7 +43,7 @@ def __init__(self, channel):
_registered_method=True)


class DatasetServiceServicer(object):
class DatasetServiceServicer:
"""DatasetsService is the CRUD service for Tilebox datasets.
"""

Expand Down Expand Up @@ -113,7 +113,7 @@ def add_DatasetServiceServicer_to_server(servicer, server):


# This class is part of an EXPERIMENTAL API.
class DatasetService(object):
class DatasetService:
"""DatasetsService is the CRUD service for Tilebox datasets.
"""

Expand Down
4 changes: 3 additions & 1 deletion tilebox-datasets/tilebox/datasets/tilebox/v1/query_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions tilebox-datasets/tilebox/datasets/tilebox/v1/query_pb2.pyi
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
from tilebox.datasets.buf.validate import validate_pb2 as _validate_pb2
from google.protobuf import timestamp_pb2 as _timestamp_pb2
from tilebox.datasets.tilebox.v1 import id_pb2 as _id_pb2
from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from collections.abc import Mapping as _Mapping
from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union

DESCRIPTOR: _descriptor.FileDescriptor

class SortDirection(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = ()
SORT_DIRECTION_UNSPECIFIED: _ClassVar[SortDirection]
SORT_DIRECTION_ASCENDING: _ClassVar[SortDirection]
SORT_DIRECTION_DESCENDING: _ClassVar[SortDirection]
SORT_DIRECTION_UNSPECIFIED: SortDirection
SORT_DIRECTION_ASCENDING: SortDirection
SORT_DIRECTION_DESCENDING: SortDirection

class TimeInterval(_message.Message):
__slots__ = ("start_time", "end_time", "start_exclusive", "end_inclusive")
START_TIME_FIELD_NUMBER: _ClassVar[int]
Expand Down
4 changes: 3 additions & 1 deletion tilebox-workflows/tests/clusters/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from tilebox.workflows.data import (
Cluster,
)
from tilebox.workflows.workflows.v1.core_pb2 import Cluster as ClusterMessage
from tilebox.workflows.workflows.v1.workflows_pb2 import (
Cluster as ClusterMessage,
)
from tilebox.workflows.workflows.v1.workflows_pb2 import (
CreateClusterRequest,
DeleteClusterRequest,
Expand Down
3 changes: 2 additions & 1 deletion tilebox-workflows/tilebox/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

from tilebox.workflows.client import Client
from tilebox.workflows.data import Job
from tilebox.workflows.runner.runner import Runner
from tilebox.workflows.task import ExecutionContext, Task

__all__ = ["Client", "ExecutionContext", "Job", "Task"]
__all__ = ["Client", "ExecutionContext", "Job", "Runner", "Task"]


def _init_logging(level: str = "INFO") -> None:
Expand Down
25 changes: 15 additions & 10 deletions tilebox-workflows/tilebox/workflows/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from uuid import uuid4

from _tilebox.grpc.channel import open_channel, parse_channel_info
from tilebox.datasets.sync.client import Client as DatasetsClient
from tilebox.workflows.automations.client import AutomationClient, AutomationService
from tilebox.workflows.cache import JobCache, NoCache
from tilebox.workflows.clusters.client import ClusterClient, ClusterSlugLike, to_cluster_slug
Expand All @@ -22,8 +21,10 @@
_create_tilebox_logger_provider,
)
from tilebox.workflows.observability.tracing import WorkflowTracer
from tilebox.workflows.runner.runner import Runner
from tilebox.workflows.runner.task_runner import TaskRunner, _LeaseRenewer
from tilebox.workflows.runner.task_service import TaskService
from tilebox.workflows.task import Task


class Client:
Expand Down Expand Up @@ -107,9 +108,10 @@ def jobs(self) -> JobClient:
def runner(
self,
cluster: ClusterSlugLike | None = None,
tasks: list[type] | None = None,
tasks: list[type[Task]] | None = None,
cache: JobCache | None = None,
context: type[RunnerContext] | None = None,
runner: Runner | None = None,
) -> TaskRunner:
"""Initialize a task runner.

Expand All @@ -118,12 +120,17 @@ def runner(
tasks: A list of task the runner is able to execute.
cache: The cache to share between tasks.
context: The type of the runner context to use. Defaults to RunnerContext.
runner: A runner definition containing tasks, cache and context configuration.

Returns:
A task runner.
"""
if runner is not None and (tasks is not None or cache is not None or context is not None):
raise ValueError("Pass either runner or tasks/cache/context, not both.")

runner_definition = runner or Runner(tasks=tasks, cache=cache, context=context)
if cache is None:
cache = NoCache() # a no-op cache that will raise an error if it's used
cache = runner_definition.cache or NoCache() # a no-op cache that will raise an error if it's used

found_cluster = self.clusters().find(to_cluster_slug(cluster or ""))

Expand All @@ -134,14 +141,13 @@ def runner(
# lets refactor this to a lazy loading mechanism in the future
storage_locations = []

runner_context_type = context or RunnerContext
runner_context_type = runner_definition.context or RunnerContext
runner_context = runner_context_type(
self._tracer,
datasets_client=DatasetsClient(**self._auth), # ty: ignore[invalid-argument-type]
storage_locations=storage_locations,
)

runner = TaskRunner(
task_runner = TaskRunner(
TaskService(self._channel),
found_cluster.slug,
cache,
Expand All @@ -152,11 +158,10 @@ def runner(
runner_logger=StructuredLogger(self._runner_logger, {}),
)

if tasks is not None:
for task in tasks:
runner.register(task)
for task in runner_definition.tasks_by_identifier.values():
task_runner.register(task)

return runner
return task_runner

def clusters(self) -> ClusterClient:
"""
Expand Down
74 changes: 66 additions & 8 deletions tilebox-workflows/tilebox/workflows/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@
except ModuleNotFoundError:
from typing import Any as S3Client

from tilebox.datasets.sync.client import Client as DatasetsClient
from tilebox.workflows.observability.tracing import NoopWorkflowTracer, WorkflowTracer
from tilebox.workflows.workflows.v1 import automation_pb2 as automation_pb
from tilebox.workflows.workflows.v1 import core_pb2, job_pb2, task_pb2
from tilebox.workflows.workflows.v1 import core_pb2, job_pb2, task_pb2, workflows_pb2

_VERSION_PATTERN = re.compile(r"^v(\d+)\.(\d+)$") # matches a version string in the format "v3.2"

Expand Down Expand Up @@ -323,19 +322,35 @@ class Cluster:
deletable: bool

@classmethod # lets use typing.Self once we require python >= 3.11
def from_message(cls, cluster: core_pb2.Cluster) -> "Cluster":
def from_message(cls, cluster: workflows_pb2.Cluster) -> "Cluster":
"""Convert a Cluster protobuf message to a Cluster object."""
return cls(slug=cluster.slug, display_name=cluster.display_name, deletable=cluster.deletable)

def to_message(self) -> core_pb2.Cluster:
def to_message(self) -> workflows_pb2.Cluster:
"""Convert a Cluster object to a Cluster protobuf message."""
return core_pb2.Cluster(slug=self.slug, display_name=self.display_name, deletable=self.deletable)
return workflows_pb2.Cluster(slug=self.slug, display_name=self.display_name, deletable=self.deletable)


@dataclass(order=True, frozen=True)
class Workflow:
slug: str
name: str
description: str

@classmethod
def from_message(cls, workflow: workflows_pb2.Workflow) -> "Workflow":
"""Convert a Workflow protobuf message to a Workflow object."""
return cls(slug=workflow.slug, name=workflow.name, description=workflow.description)

def to_message(self) -> workflows_pb2.Workflow:
"""Convert a Workflow object to a Workflow protobuf message."""
return workflows_pb2.Workflow(slug=self.slug, name=self.name, description=self.description)


@dataclass
class NextTaskToRun:
cluster_slug: str
identifiers: dict[TaskIdentifier, type]
identifiers: dict[TaskIdentifier, type[Any]]

# from message not needed, as we never return this from the server

Expand Down Expand Up @@ -474,6 +489,51 @@ def to_message(self) -> task_pb2.ComputedTask:
)


@dataclass
class FailedTask:
task_id: UUID
display: str | None
was_workflow_error: bool
progress_updates: list[ProgressIndicator]

@classmethod
def from_message(cls, failed_task: task_pb2.TaskFailedRequest) -> "FailedTask":
"""Convert a TaskFailedRequest protobuf message to a FailedTask object."""
return cls(
task_id=uuid_message_to_uuid(failed_task.task_id),
display=failed_task.display,
was_workflow_error=failed_task.was_workflow_error,
progress_updates=[ProgressIndicator.from_message(progress) for progress in failed_task.progress_updates],
)

@classmethod
def from_task_error(
cls,
task: Task,
error: Exception,
was_workflow_error: bool,
progress_updates: list[ProgressIndicator],
) -> "FailedTask":
# job output is limited to 1KB, so truncate the error message if necessary
error_message = repr(error)[: (1024 - len(task.display or "None") - 1)]
display = f"{task.display}" if error_message == "" else f"{task.display}\n{error_message}"
return cls(
task_id=task.id,
display=display,
was_workflow_error=was_workflow_error,
progress_updates=progress_updates,
)

def to_message(self) -> task_pb2.TaskFailedRequest:
"""Convert a FailedTask object to a TaskFailedRequest protobuf message."""
return task_pb2.TaskFailedRequest(
task_id=uuid_to_uuid_message(self.task_id),
display=self.display,
was_workflow_error=self.was_workflow_error,
progress_updates=[progress.to_message() for progress in self.progress_updates],
)


def _parse_version(version: str) -> tuple[int, int]:
"""
Parse the major and minor version from a string in the format "vMajor.Minor" and returns them as tuple of ints.
Expand Down Expand Up @@ -924,13 +984,11 @@ class RunnerContext:
def __init__(
self,
tracer: WorkflowTracer | None = None,
datasets_client: DatasetsClient | None = None,
storage_locations: list[StorageLocation] | None = None,
) -> None:
if tracer is None:
tracer = NoopWorkflowTracer()
self.tracer = tracer
self.datasets_client = datasets_client
self.storage_locations = {
sl.id: sl._with_runner_context(self) # noqa: SLF001
for sl in storage_locations or []
Expand Down
Loading
Loading