Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 107 additions & 43 deletions sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,66 @@ def parse_context(context: dict[str, Any]) -> PipelineStepContext:
"emergency_patch": emergency_patch_parsed,
"enable_liveness_probe": context.get("enable_liveness_probe", True),
"container_name": context.get("container_name", "pipeline-consumer"),
"with_canary": bool(context.get("with_canary", False)),
}


def _build_merged_pipeline_deployment(
    *,
    base_deployment: dict[str, Any],
    deployment_template: dict[str, Any],
    emergency_patch: dict[str, Any],
    deployment_name: str,
    replica_count: int,
    step_labels: dict[str, Any],
    container: dict[str, Any],
    volumes: list[dict[str, Any]],
) -> dict[str, Any]:
    """
    Assembles a k8s deployment by layering these structures on top of the base deployment
    manifest:
    1. deployment_template: provided by the user
    2. the streaming platform specific additions (including the container)
    3. emergency_patch: if provided, it overrides all other layers

    Args:
        base_deployment: the baseline Deployment manifest all layers merge onto.
        deployment_template: user-supplied overrides, merged before the
            platform additions.
        emergency_patch: final override layer; applied only when non-empty
            (truthy).
        deployment_name: value for ``metadata.name`` of the produced manifest.
        replica_count: value for ``spec.replicas``.
        step_labels: labels applied to ``metadata.labels``,
            ``spec.selector.matchLabels`` and the pod template labels.
        container: the single container spec placed in the pod template.
        volumes: volume definitions placed in the pod template.

    Returns:
        The fully merged Deployment manifest as a plain dict.

    Raises:
        ScalarOverwriteError: when the user template sets a scalar field that
            the platform additions also set (lists/dicts merge; scalars may not
            be overridden).
    """

    # Platform-owned fields; these win over the user template in the final
    # merge below, which is why the scalar-conflict probe is needed first.
    pipeline_additions: dict[str, Any] = {
        "metadata": {
            "name": deployment_name,
            "labels": step_labels,
        },
        "spec": {
            "replicas": replica_count,
            "selector": {
                "matchLabels": step_labels,
            },
            "template": {
                "metadata": {
                    "labels": step_labels,
                },
                "spec": {
                    "containers": [container],
                    "volumes": volumes,
                },
            },
        },
    }
    # Probe merge: detect user-template scalars that would be silently
    # clobbered by the platform additions, and fail loudly instead.
    # NOTE(review): the probe's return value is discarded — this assumes
    # deepmerge(..., fail_on_scalar_overwrite=True) does not mutate
    # `deployment_template`; confirm, since the caller may invoke this helper
    # twice (primary + canary) with the same template dict.
    try:
        deepmerge(deployment_template, pipeline_additions, fail_on_scalar_overwrite=True)
    except ScalarOverwriteError as e:
        # Re-raise with guidance so users know which layer set the field.
        raise ScalarOverwriteError(
            f"{e}\n\n"
            f"This field is automatically set by PipelineStep and conflicts with your deployment_template. "
            f"Note: Lists and dicts can be provided (they get merged), but scalar values cannot be overridden."
        ) from e

    # Layering order: base <- user template <- platform additions <- patch.
    deployment = deepmerge(base_deployment, deployment_template)
    deployment = deepmerge(deployment, pipeline_additions)
    if emergency_patch:
        deployment = deepmerge(deployment, emergency_patch)
    return deployment


class PipelineStepContext(TypedDict):
"""Context dictionary for PipelineStep macro."""

Expand All @@ -253,6 +310,7 @@ class PipelineStepContext(TypedDict):
emergency_patch: NotRequired[dict[str, Any]]
enable_liveness_probe: NotRequired[bool]
container_name: NotRequired[str]
with_canary: NotRequired[bool]


class PipelineStep(ExternalMacro):
Expand Down Expand Up @@ -297,6 +355,7 @@ class PipelineStep(ExternalMacro):
"cpu_per_process": 1000,
"memory_per_process": 512,
"replicas": 3,
"with_canary": True,
}
)
}}
Expand Down Expand Up @@ -351,7 +410,11 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]:
2. Merge pipeline-specific configuration onto the result

Returns:
Dictionary with 'deployment' and 'configmap' keys
Dictionary with 'deployment' and 'configmap' keys. When canary
splitting is active (``with_canary`` and ``replicas`` > 1), also
includes ``canary_deployment``. In that case the main deployment's
pods use ``env: primary`` and the canary uses ``env: canary`` so
selector ``matchLabels`` do not overlap.
"""

ctx = parse_context(context)
Expand Down Expand Up @@ -432,48 +495,46 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]:
}
)

pipeline_additions = {
"metadata": {
"name": make_k8s_name(f"{service_name}-pipeline-{pipeline_name}-{segment_id}"),
"labels": labels,
},
"spec": {
"replicas": replicas,
"selector": {
"matchLabels": labels,
},
"template": {
"metadata": {
"labels": labels,
},
"spec": {
"containers": [container],
"volumes": volumes,
},
},
},
}
add_canary = ctx.get("with_canary", False) and replicas > 1
main_deployment_name = make_k8s_name(
f"{service_name}-pipeline-{pipeline_name}-{segment_id}"
)
canary_deployment_name = make_k8s_name(
f"{service_name}-pipeline-{pipeline_name}-{segment_id}-canary"
)

# Check for scalar conflicts between user template and pipeline additions
# This ensures pipeline additions don't override user-provided values
# while still allowing both to override base template defaults
try:
# Perform a test merge to detect conflicts
deepmerge(deployment_template, pipeline_additions, fail_on_scalar_overwrite=True)
except ScalarOverwriteError as e:
raise ScalarOverwriteError(
f"{e}\n\n"
f"This field is automatically set by PipelineStep and conflicts with your deployment_template. "
f"Note: Lists and dicts can be provided (they get merged), but scalar values cannot be overridden."
) from e

# No conflicts found, proceed with merging
# Both user template and pipeline additions can override base template
deployment = deepmerge(base_deployment, deployment_template)
deployment = deepmerge(deployment, pipeline_additions)

if emergency_patch:
deployment = deepmerge(deployment, emergency_patch)
if add_canary:
deployment = _build_merged_pipeline_deployment(
base_deployment=base_deployment,
deployment_template=deployment_template,
emergency_patch=emergency_patch,
deployment_name=main_deployment_name,
replica_count=replicas - 1,
step_labels={**labels, "env": "primary"},
container=container,
volumes=volumes,
)
Comment thread
sentry[bot] marked this conversation as resolved.
canary_deployment = _build_merged_pipeline_deployment(
base_deployment=base_deployment,
deployment_template=deployment_template,
emergency_patch=emergency_patch,
deployment_name=canary_deployment_name,
replica_count=1,
step_labels={**labels, "env": "canary"},
container=container,
volumes=volumes,
)
Comment thread
cursor[bot] marked this conversation as resolved.
else:
deployment = _build_merged_pipeline_deployment(
base_deployment=base_deployment,
deployment_template=deployment_template,
emergency_patch=emergency_patch,
deployment_name=main_deployment_name,
replica_count=replicas,
step_labels={**labels, "env": "primary"},
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The env: primary label is unconditionally added to Kubernetes deployments, which will break updates for existing non-canary deployments due to an immutable field.
Severity: HIGH

Suggested Fix

Remove the addition of the env: primary label from the else block at line 534. The step_labels dictionary should only be assigned labels when add_canary is False. This will ensure the label is only added for canary deployments as intended. Consider adding a test to assert the env label is absent when canaries are disabled.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: sentry_streams_k8s/sentry_streams_k8s/pipeline_step.py#L534

Potential issue: The `env: primary` label is unconditionally added to the
`spec.selector.matchLabels`, `metadata.labels`, and `spec.template.metadata.labels` of
all Kubernetes deployments, even when the `with_canary` flag is `False`. Because the
`spec.selector.matchLabels` field is immutable in Kubernetes, attempting to apply this
updated configuration to any existing deployment will be rejected. This forces users who
are not using the canary feature to manually delete and recreate their deployments,
causing a service interruption. The logic in the `else` branch at line 534 incorrectly
adds this label when `add_canary` is `False`.

container=container,
volumes=volumes,
)
Comment thread
cursor[bot] marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New selector label breaks existing non-canary deployments

High Severity

The else branch (non-canary path) unconditionally adds "env": "primary" to step_labels, which propagates into spec.selector.matchLabels. Before this PR, the selector only contained pipeline-app, pipeline, and service. Since Kubernetes Deployment selectors are immutable after creation, any existing deployment that was created without the env label will fail to update — kubectl apply (or equivalent) will be rejected by the API server. This affects all deployments, not just those opting into canary. The env label in the selector is only needed when canary splitting is active to differentiate primary from canary pods; for non-canary deployments, the original label set suffices.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 337de6a. Configure here.


configmap = {
"apiVersion": "v1",
Expand All @@ -491,7 +552,10 @@ def run(self, context: dict[str, Any]) -> dict[str, Any]:
metadata = cast(dict[str, Any], configmap["metadata"])
metadata["namespace"] = deployment["metadata"]["namespace"]

return {
result: dict[str, Any] = {
"deployment": deployment,
"configmap": configmap,
}
if add_canary:
result["canary_deployment"] = canary_deployment
return result
74 changes: 74 additions & 0 deletions sentry_streams_k8s/tests/test_pipeline_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def test_parse_context() -> None:
assert parsed_context["segment_id"] == 0
assert parsed_context["log_level"] == "INFO"
assert parsed_context["replicas"] == 2
assert parsed_context.get("with_canary") is False

context["with_canary"] = True
assert parse_context(context)["with_canary"] is True
del context["with_canary"]

context["deployment_template"] = yaml.dump(context["deployment_template"])
context["container_template"] = yaml.dump(context["container_template"])
Expand All @@ -111,6 +116,7 @@ def test_parse_context() -> None:
}
assert parsed_context["log_level"] == "DEBUG"
assert parsed_context["replicas"] == 2
assert parsed_context.get("with_canary") is False


def test_build_container() -> None:
Expand Down Expand Up @@ -426,6 +432,7 @@ def test_run_generates_complete_manifests() -> None:
# Validate return structure
assert "deployment" in result
assert "configmap" in result
assert "canary_deployment" not in result
assert isinstance(result["deployment"], dict)
assert isinstance(result["configmap"], dict)

Expand Down Expand Up @@ -478,6 +485,73 @@ def test_run_generates_complete_manifests() -> None:
assert parsed_config == context["pipeline_config"]


def _minimal_pipeline_context(**overrides: Any) -> dict[str, Any]:
base: dict[str, Any] = {
"service_name": "my-service",
"pipeline_name": "profiles",
"deployment_template": {},
"container_template": {},
"pipeline_config": {
"env": {},
"pipeline": {
"segments": [
{
"steps_config": {
"myinput": {
"starts_segment": True,
"bootstrap_servers": ["127.0.0.1:9092"],
}
}
}
]
},
},
"pipeline_module": "sbc.profiles",
"image_name": "my-image:latest",
"cpu_per_process": 1000,
"memory_per_process": 512,
"segment_id": 0,
"replicas": 3,
}
base.update(overrides)
return base


def test_run_with_canary_emits_main_and_canary_deployments() -> None:
    """With with_canary=True and replicas > 1, run() splits one replica into a canary deployment."""
    result = PipelineStep().run(
        _minimal_pipeline_context(with_canary=True, replicas=3),
    )

    assert "canary_deployment" in result
    main = result["deployment"]
    canary = result["canary_deployment"]

    # env label, replica split, and name are checked per-deployment.
    expectations = [
        (main, "primary", 2, "my-service-pipeline-profiles-0"),
        (canary, "canary", 1, "my-service-pipeline-profiles-0-canary"),
    ]
    for dep, env_value, replica_count, dep_name in expectations:
        assert dep["spec"]["replicas"] == replica_count
        assert dep["metadata"]["name"] == dep_name
        assert dep["metadata"]["labels"]["env"] == env_value
        assert dep["spec"]["selector"]["matchLabels"]["env"] == env_value
        assert dep["spec"]["template"]["metadata"]["labels"]["env"] == env_value

    # Both deployments mount the one shared configmap.
    cm_name = "my-service-pipeline-profiles"
    assert result["configmap"]["metadata"]["name"] == cm_name
    for dep, _, _, _ in expectations:
        vols = dep["spec"]["template"]["spec"]["volumes"]
        pc = next(v for v in vols if v["name"] == "pipeline-config")
        assert pc["configMap"]["name"] == cm_name


def test_run_with_canary_single_replica_skips_canary_deployment() -> None:
    """with_canary with replicas=1 yields a single deployment with full replica count."""
    context = _minimal_pipeline_context(with_canary=True, replicas=1)
    result = PipelineStep().run(context)

    # A lone replica cannot be split, so no canary manifest is emitted.
    assert "canary_deployment" not in result
    assert result["deployment"]["spec"]["replicas"] == 1


def test_run_includes_liveness_probe_when_enabled() -> None:
"""Test that run() includes liveness probe and liveness-health volume when enabled, and omits them when disabled."""
base_context: dict[str, Any] = {
Expand Down
Loading