diff --git a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py index 3a13b310..be29e8a2 100644 --- a/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py +++ b/sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py @@ -232,9 +232,66 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext: "emergency_patch": emergency_patch_parsed, "enable_liveness_probe": context.get("enable_liveness_probe", True), "container_name": context.get("container_name", "pipeline-consumer"), + "with_canary": bool(context.get("with_canary", False)), } +def _build_merged_pipeline_deployment( + *, + base_deployment: dict[str, Any], + deployment_template: dict[str, Any], + emergency_patch: dict[str, Any], + deployment_name: str, + replica_count: int, + step_labels: dict[str, Any], + container: dict[str, Any], + volumes: list[dict[str, Any]], +) -> dict[str, Any]: + """ + Assembles a k8s deployment by layering these structures on top of the base deployment + manifest: + 1. deployment_template: provided by the user + 2. the streaming platform specific additions (including the container) + 3. emergency_patch: if provided, it overrides all other layers + """ + + pipeline_additions: dict[str, Any] = { + "metadata": { + "name": deployment_name, + "labels": step_labels, + }, + "spec": { + "replicas": replica_count, + "selector": { + "matchLabels": step_labels, + }, + "template": { + "metadata": { + "labels": step_labels, + }, + "spec": { + "containers": [container], + "volumes": volumes, + }, + }, + }, + } + try: + deepmerge(deployment_template, pipeline_additions, fail_on_scalar_overwrite=True) + except ScalarOverwriteError as e: + raise ScalarOverwriteError( + f"{e}\n\n" + f"This field is automatically set by PipelineStep and conflicts with your deployment_template. " + f"Note: Lists and dicts can be provided (they get merged), but scalar values cannot be overridden." + ) from e + + deployment = deepmerge(base_deployment, deployment_template) + deployment = deepmerge(deployment, pipeline_additions) + if emergency_patch: + deployment = deepmerge(deployment, emergency_patch) + return deployment + + class PipelineStepContext(TypedDict): """Context dictionary for PipelineStep macro.""" @@ -253,6 +310,7 @@ class PipelineStepContext(TypedDict): emergency_patch: NotRequired[dict[str, Any]] enable_liveness_probe: NotRequired[bool] container_name: NotRequired[str] + with_canary: NotRequired[bool] class PipelineStep(ExternalMacro): @@ -297,6 +355,7 @@ class PipelineStep(ExternalMacro): "cpu_per_process": 1000, "memory_per_process": 512, "replicas": 3, + "with_canary": True, } ) }} @@ -351,7 +410,11 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: 2. Merge pipeline-specific configuration onto the result Returns: - Dictionary with 'deployment' and 'configmap' keys + Dictionary with 'deployment' and 'configmap' keys. When canary + splitting is active (``with_canary`` and ``replicas`` > 1), also + includes ``canary_deployment``. In that case the main deployment's + pods use ``env: primary`` and the canary uses ``env: canary`` so + selector ``matchLabels`` do not overlap. """ ctx = parse_context(context) @@ -432,48 +495,46 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: } ) - pipeline_additions = { - "metadata": { - "name": make_k8s_name(f"{service_name}-pipeline-{pipeline_name}-{segment_id}"), - "labels": labels, - }, - "spec": { - "replicas": replicas, - "selector": { - "matchLabels": labels, - }, - "template": { - "metadata": { - "labels": labels, - }, - "spec": { - "containers": [container], - "volumes": volumes, - }, - }, - }, - } + add_canary = ctx.get("with_canary", False) and replicas > 1 + main_deployment_name = make_k8s_name( + f"{service_name}-pipeline-{pipeline_name}-{segment_id}" + ) + canary_deployment_name = make_k8s_name( + f"{service_name}-pipeline-{pipeline_name}-{segment_id}-canary" + ) - # Check for scalar conflicts between user template and pipeline additions - # This ensures pipeline additions don't override user-provided values - # while still allowing both to override base template defaults - try: - # Perform a test merge to detect conflicts - deepmerge(deployment_template, pipeline_additions, fail_on_scalar_overwrite=True) - except ScalarOverwriteError as e: - raise ScalarOverwriteError( - f"{e}\n\n" - f"This field is automatically set by PipelineStep and conflicts with your deployment_template. " - f"Note: Lists and dicts can be provided (they get merged), but scalar values cannot be overridden." - ) from e - - # No conflicts found, proceed with merging - # Both user template and pipeline additions can override base template - deployment = deepmerge(base_deployment, deployment_template) - deployment = deepmerge(deployment, pipeline_additions) - - if emergency_patch: - deployment = deepmerge(deployment, emergency_patch) + if add_canary: + deployment = _build_merged_pipeline_deployment( + base_deployment=base_deployment, + deployment_template=deployment_template, + emergency_patch=emergency_patch, + deployment_name=main_deployment_name, + replica_count=replicas - 1, + step_labels={**labels, "env": "primary"}, + container=container, + volumes=volumes, + ) + canary_deployment = _build_merged_pipeline_deployment( + base_deployment=base_deployment, + deployment_template=deployment_template, + emergency_patch=emergency_patch, + deployment_name=canary_deployment_name, + replica_count=1, + step_labels={**labels, "env": "canary"}, + container=container, + volumes=volumes, + ) + else: + deployment = _build_merged_pipeline_deployment( + base_deployment=base_deployment, + deployment_template=deployment_template, + emergency_patch=emergency_patch, + deployment_name=main_deployment_name, + replica_count=replicas, + step_labels={**labels, "env": "primary"}, + container=container, + volumes=volumes, + ) configmap = { "apiVersion": "v1", @@ -491,7 +552,10 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]: metadata = cast(dict[str, Any], configmap["metadata"]) metadata["namespace"] = deployment["metadata"]["namespace"] - return { + result: dict[str, Any] = { "deployment": deployment, "configmap": configmap, } + if add_canary: + result["canary_deployment"] = canary_deployment + return result diff --git a/sentry_streams_k8s/tests/test_pipeline_step.py b/sentry_streams_k8s/tests/test_pipeline_step.py index ea390aed..a884610d 100644 --- a/sentry_streams_k8s/tests/test_pipeline_step.py +++ b/sentry_streams_k8s/tests/test_pipeline_step.py @@ -85,6 +85,11 @@ def test_parse_context() -> None: assert parsed_context["segment_id"] == 0 assert parsed_context["log_level"] == "INFO" assert parsed_context["replicas"] == 2 + assert parsed_context.get("with_canary") is False + + context["with_canary"] = True + assert parse_context(context)["with_canary"] is True + del context["with_canary"] context["deployment_template"] = yaml.dump(context["deployment_template"]) context["container_template"] = yaml.dump(context["container_template"]) @@ -111,6 +116,7 @@ def test_parse_context() -> None: } assert parsed_context["log_level"] == "DEBUG" assert parsed_context["replicas"] == 2 + assert parsed_context.get("with_canary") is False def test_build_container() -> None: @@ -426,6 +432,7 @@ def test_run_generates_complete_manifests() -> None: # Validate return structure assert "deployment" in result assert "configmap" in result + assert "canary_deployment" not in result assert isinstance(result["deployment"], dict) assert isinstance(result["configmap"], dict) @@ -478,6 +485,73 @@ def test_run_generates_complete_manifests() -> None: assert parsed_config == context["pipeline_config"] +def _minimal_pipeline_context(**overrides: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "service_name": "my-service", + "pipeline_name": "profiles", + "deployment_template": {}, + "container_template": {}, + "pipeline_config": { + "env": {}, + "pipeline": { + "segments": [ + { + "steps_config": { + "myinput": { + "starts_segment": True, + "bootstrap_servers": ["127.0.0.1:9092"], + } + } + } + ] + }, + }, + "pipeline_module": "sbc.profiles", + "image_name": "my-image:latest", + "cpu_per_process": 1000, + "memory_per_process": 512, + "segment_id": 0, + "replicas": 3, + } + base.update(overrides) + return base + + +def test_run_with_canary_emits_main_and_canary_deployments() -> None: + """When with_canary is True and replicas > 1, main has env=primary and canary env=canary.""" + result = PipelineStep().run( + _minimal_pipeline_context(with_canary=True, replicas=3), + ) + assert "canary_deployment" in result + main = result["deployment"] + canary = result["canary_deployment"] + assert main["spec"]["replicas"] == 2 + assert canary["spec"]["replicas"] == 1 + assert main["metadata"]["name"] == "my-service-pipeline-profiles-0" + assert canary["metadata"]["name"] == "my-service-pipeline-profiles-0-canary" + assert main["spec"]["selector"]["matchLabels"]["env"] == "primary" + assert main["metadata"]["labels"]["env"] == "primary" + assert main["spec"]["template"]["metadata"]["labels"]["env"] == "primary" + assert canary["spec"]["selector"]["matchLabels"]["env"] == "canary" + assert canary["metadata"]["labels"]["env"] == "canary" + assert canary["spec"]["template"]["metadata"]["labels"]["env"] == "canary" + cm_name = "my-service-pipeline-profiles" + for dep in (main, canary): + vols = dep["spec"]["template"]["spec"]["volumes"] + pc = next(v for v in vols if v["name"] == "pipeline-config") + assert pc["configMap"]["name"] == cm_name + assert result["configmap"]["metadata"]["name"] == cm_name + + +def test_run_with_canary_single_replica_skips_canary_deployment() -> None: + """with_canary with replicas=1 yields a single deployment with full replica count.""" + result = PipelineStep().run( + _minimal_pipeline_context(with_canary=True, replicas=1), + ) + assert "canary_deployment" not in result + assert result["deployment"]["spec"]["replicas"] == 1 + + def test_run_includes_liveness_probe_when_enabled() -> None: """Test that run() includes liveness probe and liveness-health volume when enabled, and omits them when disabled.""" base_context: dict[str, Any] = {