From 4bfe1812d8ed55586754736774be8c3a378f0828 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Wed, 6 May 2026 17:37:52 -0700 Subject: [PATCH 1/4] feat(k8s): Add optional canary Deployment to PipelineStep Emit a second Deployment when with_canary is set and replicas > 1, sharing the same ConfigMap and applying emergency_patch to both manifests. Co-authored-by: Cursor --- .../sentry_streams_k8s/pipeline_step.py | 140 ++++++++++++------ .../tests/test_pipeline_step.py | 72 +++++++++ 2 files changed, 169 insertions(+), 43 deletions(-) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index 3a13b310..ed98a556 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -232,9 +232,58 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext: "emergency_patch": emergency_patch_parsed, "enable_liveness_probe": context.get("enable_liveness_probe", True), "container_name": context.get("container_name", "pipeline-consumer"), + "with_canary": bool(context.get("with_canary", False)), } +def _build_merged_pipeline_deployment( + *, + base_deployment: dict[str, Any], + deployment_template: dict[str, Any], + emergency_patch: dict[str, Any], + deployment_name: str, + replica_count: int, + step_labels: dict[str, Any], + container: dict[str, Any], + volumes: list[dict[str, Any]], +) -> dict[str, Any]: + pipeline_additions: dict[str, Any] = { + "metadata": { + "name": deployment_name, + "labels": step_labels, + }, + "spec": { + "replicas": replica_count, + "selector": { + "matchLabels": step_labels, + }, + "template": { + "metadata": { + "labels": step_labels, + }, + "spec": { + "containers": [container], + "volumes": volumes, + }, + }, + }, + } + try: + deepmerge(deployment_template, pipeline_additions, fail_on_scalar_overwrite=True) + except ScalarOverwriteError as e: + raise ScalarOverwriteError( + f"{e}\n\n" + f"This field is automatically set by PipelineStep and conflicts with your deployment_template. " + f"Note: Lists and dicts can be provided (they get merged), but scalar values cannot be overridden." + ) from e + + deployment = deepmerge(base_deployment, deployment_template) + deployment = deepmerge(deployment, pipeline_additions) + if emergency_patch: + deployment = deepmerge(deployment, emergency_patch) + return deployment + + class PipelineStepContext(TypedDict): """Context dictionary for PipelineStep macro.""" @@ -253,6 +302,7 @@ class PipelineStepContext(TypedDict): emergency_patch: NotRequired[dict[str, Any]] enable_liveness_probe: NotRequired[bool] container_name: NotRequired[str] + with_canary: NotRequired[bool] class PipelineStep(ExternalMacro): @@ -297,6 +347,7 @@ class PipelineStep(ExternalMacro): "cpu_per_process": 1000, "memory_per_process": 512, "replicas": 3, + "with_canary": True, } ) }} @@ -351,7 +402,9 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: 2. Merge pipeline-specific configuration onto the result Returns: - Dictionary with 'deployment' and 'configmap' keys + Dictionary with 'deployment' and 'configmap' keys. When canary + splitting is active (``with_canary`` and ``replicas`` > 1), also + includes ``canary_deployment``. """ ctx = parse_context(context) @@ -432,48 +485,46 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: } ) - pipeline_additions = { - "metadata": { - "name": make_k8s_name(f"{service_name}-pipeline-{pipeline_name}-{segment_id}"), - "labels": labels, - }, - "spec": { - "replicas": replicas, - "selector": { - "matchLabels": labels, - }, - "template": { - "metadata": { - "labels": labels, - }, - "spec": { - "containers": [container], - "volumes": volumes, - }, - }, - }, - } + effective_canary = ctx.get("with_canary", False) and replicas > 1 + main_deployment_name = make_k8s_name( + f"{service_name}-pipeline-{pipeline_name}-{segment_id}" + ) + canary_deployment_name = make_k8s_name( + f"{service_name}-pipeline-{pipeline_name}-{segment_id}-canary" + ) - # Check for scalar conflicts between user template and pipeline additions - # This ensures pipeline additions don't override user-provided values - # while still allowing both to override base template defaults - try: - # Perform a test merge to detect conflicts - deepmerge(deployment_template, pipeline_additions, fail_on_scalar_overwrite=True) - except ScalarOverwriteError as e: - raise ScalarOverwriteError( - f"{e}\n\n" - f"This field is automatically set by PipelineStep and conflicts with your deployment_template. " - f"Note: Lists and dicts can be provided (they get merged), but scalar values cannot be overridden." - ) from e - - # No conflicts found, proceed with merging - # Both user template and pipeline additions can override base template - deployment = deepmerge(base_deployment, deployment_template) - deployment = deepmerge(deployment, pipeline_additions) - - if emergency_patch: - deployment = deepmerge(deployment, emergency_patch) + if effective_canary: + deployment = _build_merged_pipeline_deployment( + base_deployment=base_deployment, + deployment_template=deployment_template, + emergency_patch=emergency_patch, + deployment_name=main_deployment_name, + replica_count=replicas - 1, + step_labels=labels, + container=container, + volumes=volumes, + ) + canary_deployment = _build_merged_pipeline_deployment( + base_deployment=base_deployment, + deployment_template=deployment_template, + emergency_patch=emergency_patch, + deployment_name=canary_deployment_name, + replica_count=1, + step_labels={**labels, "env": "canary"}, + container=container, + volumes=volumes, + ) + else: + deployment = _build_merged_pipeline_deployment( + base_deployment=base_deployment, + deployment_template=deployment_template, + emergency_patch=emergency_patch, + deployment_name=main_deployment_name, + replica_count=replicas, + step_labels=labels, + container=container, + volumes=volumes, + ) configmap = { "apiVersion": "v1", @@ -491,7 +542,10 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: metadata = cast(dict[str, Any], configmap["metadata"]) metadata["namespace"] = deployment["metadata"]["namespace"] - return { + result: dict[str, Any] = { "deployment": deployment, "configmap": configmap, } + if effective_canary: + result["canary_deployment"] = canary_deployment + return result diff --git a/sentry_streams_k8s/tests/test_pipeline_step.py b/sentry_streams_k8s/tests/test_pipeline_step.py index ea390aed..01c62cc9 100644 --- a/sentry_streams_k8s/tests/test_pipeline_step.py +++ b/sentry_streams_k8s/tests/test_pipeline_step.py @@ -85,6 +85,11 @@ def test_parse_context() -> None: assert parsed_context["segment_id"] == 0 assert parsed_context["log_level"] == "INFO" assert parsed_context["replicas"] == 2 + assert parsed_context.get("with_canary") is False + + context["with_canary"] = True + assert parse_context(context)["with_canary"] is True + del context["with_canary"] context["deployment_template"] = yaml.dump(context["deployment_template"]) context["container_template"] = yaml.dump(context["container_template"]) @@ -111,6 +116,7 @@ def test_parse_context() -> None: } assert parsed_context["log_level"] == "DEBUG" assert parsed_context["replicas"] == 2 + assert parsed_context.get("with_canary") is False def test_build_container() -> None: @@ -426,6 +432,7 @@ def test_run_generates_complete_manifests() -> None: # Validate return structure assert "deployment" in result assert "configmap" in result + assert "canary_deployment" not in result assert isinstance(result["deployment"], dict) assert isinstance(result["configmap"], dict) @@ -478,6 +485,71 @@ def test_run_generates_complete_manifests() -> None: assert parsed_config == context["pipeline_config"] +def _minimal_pipeline_context(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "service_name": "my-service", + "pipeline_name": "profiles", + "deployment_template": {}, + "container_template": {}, + "pipeline_config": { + "env": {}, + "pipeline": { + "segments": [ + { + "steps_config": { + "myinput": { + "starts_segment": True, + "bootstrap_servers": ["127.0.0.1:9092"], + } + } + } + ] + }, + }, + "pipeline_module": "sbc.profiles", + "image_name": "my-image:latest", + "cpu_per_process": 1000, + "memory_per_process": 512, + "segment_id": 0, + "replicas": 3, + } + base.update(overrides) + return base + + +def test_run_with_canary_emits_main_and_canary_deployments() -> None: + """When with_canary is True and replicas > 1, main has replicas-1 and canary has env=canary.""" + result = PipelineStep().run( + _minimal_pipeline_context(with_canary=True, replicas=3), + ) + assert "canary_deployment" in result + main = result["deployment"] + canary = result["canary_deployment"] + assert main["spec"]["replicas"] == 2 + assert canary["spec"]["replicas"] == 1 + assert main["metadata"]["name"] == "my-service-pipeline-profiles-0" + assert canary["metadata"]["name"] == "my-service-pipeline-profiles-0-canary" + assert "env" not in main["spec"]["selector"]["matchLabels"] + assert canary["spec"]["selector"]["matchLabels"]["env"] == "canary" + assert canary["metadata"]["labels"]["env"] == "canary" + assert canary["spec"]["template"]["metadata"]["labels"]["env"] == "canary" + cm_name = "my-service-pipeline-profiles" + for dep in (main, canary): + vols = dep["spec"]["template"]["spec"]["volumes"] + pc = next(v for v in vols if v["name"] == "pipeline-config") + assert pc["configMap"]["name"] == cm_name + assert result["configmap"]["metadata"]["name"] == cm_name + + +def test_run_with_canary_single_replica_skips_canary_deployment() -> None: + """with_canary with replicas=1 yields a single deployment with full replica count.""" + result = PipelineStep().run( + _minimal_pipeline_context(with_canary=True, replicas=1), + ) + assert "canary_deployment" not in result + assert result["deployment"]["spec"]["replicas"] == 1 + + def test_run_includes_liveness_probe_when_enabled() -> None: """Test that run() includes liveness probe and liveness-health volume when enabled, and omits them when disabled.""" base_context: dict[str, Any] = { From b410c1a41b542318e6ae388bfb034357f1906334 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Thu, 7 May 2026 17:10:31 -0700 Subject: [PATCH 2/4] Add some docs --- .../sentry_streams_k8s/pipeline_step.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index ed98a556..655c1922 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -247,6 +247,14 @@ def _build_merged_pipeline_deployment( container: dict[str, Any], volumes: list[dict[str, Any]], ) -> dict[str, Any]: + """ + Assembles a k8s deployment by layering these structures on top of the base deployment + manifest: + 1. deployment_template: provided by the user + 2. the steraming platform specific additions (including the container) + 3. emergency_patch: if provided, it overrides all other layers + """ + pipeline_additions: dict[str, Any] = { "metadata": { "name": deployment_name, @@ -485,7 +493,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: } ) - effective_canary = ctx.get("with_canary", False) and replicas > 1 + add_canary = ctx.get("with_canary", False) and replicas > 1 main_deployment_name = make_k8s_name( f"{service_name}-pipeline-{pipeline_name}-{segment_id}" ) @@ -493,7 +501,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: f"{service_name}-pipeline-{pipeline_name}-{segment_id}-canary" ) - if effective_canary: + if add_canary: deployment = _build_merged_pipeline_deployment( base_deployment=base_deployment, deployment_template=deployment_template, @@ -546,6 +554,6 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: "deployment": deployment, "configmap": configmap, } - if effective_canary: + if add_canary: result["canary_deployment"] = canary_deployment return result From 8bebd295637bf580ab1ad2dc5d12be7adbacc690 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 8 May 2026 11:33:29 -0700 Subject: [PATCH 3/4] fix(k8s): Add env:primary to main deployment when canary split is on Use disjoint matchLabels (primary vs canary) to avoid overlapping selectors. Also fix a docstring typo ("streaming"). Co-authored-by: Cursor --- sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py | 8 +++++--- sentry_streams_k8s/tests/test_pipeline_step.py | 6 ++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index 655c1922..7d00a7a4 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -251,7 +251,7 @@ def _build_merged_pipeline_deployment( Assembles a k8s deployment by layering these structures on top of the base deployment manifest: 1. deployment_template: provided by the user - 2. the steraming platform specific additions (including the container) + 2. the streaming platform specific additions (including the container) 3. emergency_patch: if provided, it overrides all other layers """ @@ -412,7 +412,9 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: Returns: Dictionary with 'deployment' and 'configmap' keys. When canary splitting is active (``with_canary`` and ``replicas`` > 1), also - includes ``canary_deployment``. + includes ``canary_deployment``. In that case the main deployment's + pods use ``env: primary`` and the canary uses ``env: canary`` so + selector ``matchLabels`` do not overlap. """ ctx = parse_context(context) @@ -508,7 +510,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: emergency_patch=emergency_patch, deployment_name=main_deployment_name, replica_count=replicas - 1, - step_labels=labels, + step_labels={**labels, "env": "primary"}, container=container, volumes=volumes, ) diff --git a/sentry_streams_k8s/tests/test_pipeline_step.py b/sentry_streams_k8s/tests/test_pipeline_step.py index 01c62cc9..a884610d 100644 --- a/sentry_streams_k8s/tests/test_pipeline_step.py +++ b/sentry_streams_k8s/tests/test_pipeline_step.py @@ -518,7 +518,7 @@ def _minimal_pipeline_context(**overrides: Any) -> dict[str, Any]: def test_run_with_canary_emits_main_and_canary_deployments() -> None: - """When with_canary is True and replicas > 1, main has replicas-1 and canary has env=canary.""" + """When with_canary is True and replicas > 1, main has env=primary and canary env=canary.""" result = PipelineStep().run( _minimal_pipeline_context(with_canary=True, replicas=3), ) @@ -529,7 +529,9 @@ def test_run_with_canary_emits_main_and_canary_deployments() -> None: assert canary["spec"]["replicas"] == 1 assert main["metadata"]["name"] == "my-service-pipeline-profiles-0" assert canary["metadata"]["name"] == "my-service-pipeline-profiles-0-canary" - assert "env" not in main["spec"]["selector"]["matchLabels"] + assert main["spec"]["selector"]["matchLabels"]["env"] == "primary" + assert main["metadata"]["labels"]["env"] == "primary" + assert main["spec"]["template"]["metadata"]["labels"]["env"] == "primary" assert canary["spec"]["selector"]["matchLabels"]["env"] == "canary" assert canary["metadata"]["labels"]["env"] == "canary" assert canary["spec"]["template"]["metadata"]["labels"]["env"] == "canary" From 337de6a15aba439738c637cfa24ac021c2bf5068 Mon Sep 17 00:00:00 2001 From: Filippo Pacifici Date: Fri, 8 May 2026 11:52:36 -0700 Subject: [PATCH 4/4] Add env when needed --- sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index 7d00a7a4..be29e8a2 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -531,7 +531,7 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: emergency_patch=emergency_patch, deployment_name=main_deployment_name, replica_count=replicas, - step_labels=labels, + step_labels={**labels, "env": "primary"}, container=container, volumes=volumes, )