From 479f7578c0bf00800267d64ce7affc665b486ad4 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 10 Apr 2026 12:43:25 +0200 Subject: [PATCH 1/7] test: Add final step to scale down kafka when using KRaft This ensures Kafka brokers don't hang when KRaft controllers are shut down on namespace deletion by kuttl --- .../kuttl/configuration/90-shutdown-kafka.yaml | 16 ++++++++++++++++ .../operations-kraft/90-shutdown-kafka.yaml | 16 ++++++++++++++++ .../kuttl/smoke-kraft/90-shutdown-kafka.yaml | 16 ++++++++++++++++ .../kuttl/upgrade/90-shutdown-kafka.yaml | 16 ++++++++++++++++ 4 files changed, 64 insertions(+) create mode 100644 tests/templates/kuttl/configuration/90-shutdown-kafka.yaml create mode 100644 tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml create mode 100644 tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml create mode 100644 tests/templates/kuttl/upgrade/90-shutdown-kafka.yaml diff --git a/tests/templates/kuttl/configuration/90-shutdown-kafka.yaml b/tests/templates/kuttl/configuration/90-shutdown-kafka.yaml new file mode 100644 index 00000000..97ed2393 --- /dev/null +++ b/tests/templates/kuttl/configuration/90-shutdown-kafka.yaml @@ -0,0 +1,16 @@ +--- +# Scale Kafka down before kuttl deletes the namespace. +# Without this, Kafka pods may hang during shutdown. +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # Scale down brokers first, since they depend on controllers. + - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s + # Then scale down controllers once all brokers are gone. 
+ - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"default":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml b/tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml new file mode 100644 index 00000000..97ed2393 --- /dev/null +++ b/tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml @@ -0,0 +1,16 @@ +--- +# Scale Kafka down before kuttl deletes the namespace. +# Without this, Kafka pods may hang during shutdown. +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # Scale down brokers first, since they depend on controllers. + - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s + # Then scale down controllers once all brokers are gone. + - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"default":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml b/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml new file mode 100644 index 00000000..8d201b64 --- /dev/null +++ b/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml @@ -0,0 +1,16 @@ +--- +# Scale Kafka down before kuttl deletes the namespace. +# Without this, Kafka pods may hang during shutdown. +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # Scale down brokers first, since they depend on controllers. 
+ - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0},"automatic-log-config":{"replicas":0},"custom-log-config":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s + # Then scale down controllers once all brokers are gone. + - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"automatic-log-config":{"replicas":0},"custom-log-config":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/upgrade/90-shutdown-kafka.yaml b/tests/templates/kuttl/upgrade/90-shutdown-kafka.yaml new file mode 100644 index 00000000..97ed2393 --- /dev/null +++ b/tests/templates/kuttl/upgrade/90-shutdown-kafka.yaml @@ -0,0 +1,16 @@ +--- +# Scale Kafka down before kuttl deletes the namespace. +# Without this, Kafka pods may hang during shutdown. +apiVersion: kuttl.dev/v1beta1 +kind: TestStep +commands: + # Scale down brokers first, since they depend on controllers. + - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s + # Then scale down controllers once all brokers are gone. 
+ - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"default":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s From 05789d2880c5b834d6cce5e6760f9e73e3897e4c Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 8 May 2026 12:14:05 +0200 Subject: [PATCH 2/7] fix(kraft): Shutdown brokers before controllers --- rust/operator-binary/src/kafka_controller.rs | 95 ++++++++++++++++++-- 1 file changed, 90 insertions(+), 5 deletions(-) diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index c62cf520..0273d78b 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -4,7 +4,7 @@ use std::{collections::HashMap, str::FromStr, sync::Arc}; use const_format::concatcp; use product_config::{ProductConfigManager, types::PropertyNameKind}; -use snafu::{ResultExt, Snafu}; +use snafu::{OptionExt as _, ResultExt as _, Snafu}; use stackable_operator::{ cli::OperatorEnvironmentOptions, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, @@ -13,12 +13,14 @@ use stackable_operator::{ rbac::build_rbac_resources, }, crd::listener, + k8s_openapi::api::apps::v1::StatefulSet, kube::{ - Resource, + Resource, ResourceExt as _, api::DynamicObject, core::{DeserializeGuard, error_boundary}, runtime::{controller::Action, reflector::ObjectRef}, }, + kvp::consts::{K8S_APP_COMPONENT_KEY, K8S_APP_INSTANCE_KEY, K8S_APP_NAME_KEY}, logging::controller::ReconcilerError, product_config_utils::{ ValidatedRoleConfigByPropertyKind, transform_all_roles_to_config, @@ -58,6 +60,10 @@ use crate::{ pub const KAFKA_CONTROLLER_NAME: &str = "kafkacluster"; pub const KAFKA_FULL_CONTROLLER_NAME: &str = concatcp!(KAFKA_CONTROLLER_NAME, '.', OPERATOR_NAME); +/// How long to wait between reconciliation attempts while waiting for broker pods +/// 
to terminate during a sequenced shutdown (brokers before controllers). +const SEQUENCED_SHUTDOWN_REQUEUE: Duration = Duration::from_secs(10); + pub struct Ctx { pub client: stackable_operator::client::Client, pub product_config: ProductConfigManager, @@ -211,6 +217,14 @@ pub enum Error { #[snafu(display("object defines no namespace"))] GetOpaConfig { source: authorization::Error }, + + #[snafu(display("object has no namespace"))] + ObjectHasNoNamespace, + + #[snafu(display("failed to check broker shutdown status"))] + CheckBrokerShutdownStatus { + source: stackable_operator::client::Error, + }, } type Result = std::result::Result; @@ -252,10 +266,37 @@ impl ReconcilerError for Error { Error::InvalidKafkaListeners { .. } => None, Error::BuildPodDescriptors { .. } => None, Error::GetOpaConfig { .. } => None, + Error::ObjectHasNoNamespace => None, + Error::CheckBrokerShutdownStatus { .. } => None, } } } +/// Returns `true` if all broker StatefulSets have been fully scaled down (0 running replicas). 
+async fn broker_pods_stopped( + client: &stackable_operator::client::Client, + kafka: &v1alpha1::KafkaCluster, +) -> Result { + let namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?; + let label_selector = format!( + "{K8S_APP_NAME_KEY}={APP_NAME},{K8S_APP_INSTANCE_KEY}={},{K8S_APP_COMPONENT_KEY}={}", + kafka.name_any(), + KafkaRole::Broker, + ); + let list_params = stackable_operator::kube::api::ListParams { + label_selector: Some(label_selector), + ..Default::default() + }; + let broker_statefulsets: Vec = client + .list(namespace.as_str(), &list_params) + .await + .context(CheckBrokerShutdownStatusSnafu)?; + + Ok(broker_statefulsets + .iter() + .all(|sts| sts.status.as_ref().is_none_or(|s| s.replicas == 0))) +} + pub async fn reconcile_kafka( kafka: Arc>, ctx: Arc, @@ -280,12 +321,36 @@ pub async fn reconcile_kafka( ) .context(ResolveProductImageSnafu)?; + // When the cluster is being stopped and has KRaft controllers, we need to sequence + // the shutdown: brokers must stop before controllers, because brokers depend on + // controllers for the controlled shutdown protocol. Without this, brokers hang + // waiting for controller responses until terminationGracePeriodSeconds expires. + let stopped = kafka.spec.cluster_operation.stopped; + let has_controllers = kafka.spec.controllers.is_some(); + let sequenced_shutdown = stopped && has_controllers; + + // Use Default instead of ClusterStopped because ClusterStopped zeroes all + // StatefulSet replicas unconditionally via maybe_mutate(). We need to control + // replica counts per role to sequence the shutdown (brokers before controllers), + // so we set replicas to 0 ourselves on each StatefulSet as needed. + let apply_strategy = if sequenced_shutdown { + ClusterResourceApplyStrategy::Default + } else { + ClusterResourceApplyStrategy::from(&kafka.spec.cluster_operation) + }; + + let brokers_stopped = if sequenced_shutdown { + broker_pods_stopped(client, kafka).await? 
+ } else { + false + }; + let mut cluster_resources = ClusterResources::new( APP_NAME, OPERATOR_NAME, KAFKA_CONTROLLER_NAME, &kafka.object_ref(&()), - ClusterResourceApplyStrategy::from(&kafka.spec.cluster_operation), + apply_strategy, &kafka.spec.object_overrides, ) .context(CreateClusterResourcesSnafu)?; @@ -406,7 +471,7 @@ pub async fn reconcile_kafka( ) .context(BuildConfigMapSnafu)?; - let rg_statefulset = match kafka_role { + let mut rg_statefulset = match kafka_role { KafkaRole::Broker => build_broker_rolegroup_statefulset( kafka, &kafka_role, @@ -433,6 +498,20 @@ pub async fn reconcile_kafka( .context(BuildStatefulsetSnafu)?, }; + if sequenced_shutdown { + let should_stop = match kafka_role { + // Always stop brokers first. + KafkaRole::Broker => true, + // Only stop controllers after all brokers are gone. + KafkaRole::Controller => brokers_stopped, + }; + if should_stop { + if let Some(spec) = rg_statefulset.spec.as_mut() { + spec.replicas = Some(0); + } + } + } + if let AnyConfig::Broker(broker_config) = merged_config { let rg_bootstrap_listener = build_broker_rolegroup_bootstrap_listener( kafka, @@ -524,7 +603,13 @@ pub async fn reconcile_kafka( .await .context(ApplyStatusSnafu)?; - Ok(Action::await_change()) + // During sequenced shutdown, requeue until brokers are fully stopped so we can + // proceed to stop controllers on the next reconciliation. 
+ if sequenced_shutdown && !brokers_stopped { + Ok(Action::requeue(*SEQUENCED_SHUTDOWN_REQUEUE)) + } else { + Ok(Action::await_change()) + } } pub fn error_policy( From 92f8e020888fc8c272a41ba6ea7a456426f24002 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Mon, 11 May 2026 14:35:22 +0200 Subject: [PATCH 3/7] test: Replace role scaledowns with clusterOperation.stopped --- .../kuttl/cluster-operation/90-shutdown-kafka.yaml | 8 ++++---- .../kuttl/configuration/90-shutdown-kafka.yaml | 13 ++++--------- .../kuttl/delete-rolegroup/90-shutdown-kafka.yaml | 9 ++++----- .../templates/kuttl/kerberos/90-shutdown-kafka.yaml | 8 ++++---- .../templates/kuttl/logging/90-shutdown-kafka.yaml | 8 ++++---- tests/templates/kuttl/opa/90-shutdown-kafka.yaml | 8 ++++---- .../kuttl/operations-kraft/90-shutdown-kafka.yaml | 13 ++++--------- .../kuttl/smoke-kraft/90-shutdown-kafka.yaml | 13 ++++--------- tests/templates/kuttl/smoke/90-shutdown-kafka.yaml | 8 ++++---- tests/templates/kuttl/tls/90-shutdown-kafka.yaml | 8 ++++---- .../templates/kuttl/upgrade/90-shutdown-kafka.yaml | 13 ++++--------- 11 files changed, 44 insertions(+), 65 deletions(-) diff --git a/tests/templates/kuttl/cluster-operation/90-shutdown-kafka.yaml b/tests/templates/kuttl/cluster-operation/90-shutdown-kafka.yaml index acce2cee..11ddd3d7 100644 --- a/tests/templates/kuttl/cluster-operation/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/cluster-operation/90-shutdown-kafka.yaml @@ -1,11 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, ZooKeeper and Kafka are terminated simultaneously, -# and Kafka hangs on controlled-shutdown ZooKeeper timeouts. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. 
apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/configuration/90-shutdown-kafka.yaml b/tests/templates/kuttl/configuration/90-shutdown-kafka.yaml index 97ed2393..11ddd3d7 100644 --- a/tests/templates/kuttl/configuration/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/configuration/90-shutdown-kafka.yaml @@ -1,16 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, Kafka pods may hang during shutdown. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - # Scale down brokers first, since they depend on controllers. - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' - - script: | - kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s - # Then scale down controllers once all brokers are gone. 
- - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/delete-rolegroup/90-shutdown-kafka.yaml b/tests/templates/kuttl/delete-rolegroup/90-shutdown-kafka.yaml index 3339f085..11ddd3d7 100644 --- a/tests/templates/kuttl/delete-rolegroup/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/delete-rolegroup/90-shutdown-kafka.yaml @@ -1,12 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, ZooKeeper and Kafka are terminated simultaneously, -# and Kafka hangs on controlled-shutdown ZooKeeper timeouts. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - # Note: By the time this script runs, the secondary RoleGroup has already been deleted, therefore we don't scale it down. 
- script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/kerberos/90-shutdown-kafka.yaml b/tests/templates/kuttl/kerberos/90-shutdown-kafka.yaml index acce2cee..11ddd3d7 100644 --- a/tests/templates/kuttl/kerberos/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/kerberos/90-shutdown-kafka.yaml @@ -1,11 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, ZooKeeper and Kafka are terminated simultaneously, -# and Kafka hangs on controlled-shutdown ZooKeeper timeouts. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/logging/90-shutdown-kafka.yaml b/tests/templates/kuttl/logging/90-shutdown-kafka.yaml index c072e08c..11ddd3d7 100644 --- a/tests/templates/kuttl/logging/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/logging/90-shutdown-kafka.yaml @@ -1,11 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, ZooKeeper and Kafka are terminated simultaneously, -# and Kafka hangs on controlled-shutdown ZooKeeper timeouts. 
+# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"automatic-log-config":{"replicas":0}, "custom-log-config":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/opa/90-shutdown-kafka.yaml b/tests/templates/kuttl/opa/90-shutdown-kafka.yaml index acce2cee..11ddd3d7 100644 --- a/tests/templates/kuttl/opa/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/opa/90-shutdown-kafka.yaml @@ -1,11 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, ZooKeeper and Kafka are terminated simultaneously, -# and Kafka hangs on controlled-shutdown ZooKeeper timeouts. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. 
apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml b/tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml index 97ed2393..11ddd3d7 100644 --- a/tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml @@ -1,16 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, Kafka pods may hang during shutdown. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - # Scale down brokers first, since they depend on controllers. - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' - - script: | - kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s - # Then scale down controllers once all brokers are gone. 
- - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml b/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml index 8d201b64..11ddd3d7 100644 --- a/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml @@ -1,16 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, Kafka pods may hang during shutdown. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - # Scale down brokers first, since they depend on controllers. - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0},"automatic-log-config":{"replicas":0},"custom-log-config":{"replicas":0}}}}}' - - script: | - kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s - # Then scale down controllers once all brokers are gone. 
- - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"automatic-log-config":{"replicas":0},"custom-log-config":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/smoke/90-shutdown-kafka.yaml b/tests/templates/kuttl/smoke/90-shutdown-kafka.yaml index acce2cee..11ddd3d7 100644 --- a/tests/templates/kuttl/smoke/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/smoke/90-shutdown-kafka.yaml @@ -1,11 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, ZooKeeper and Kafka are terminated simultaneously, -# and Kafka hangs on controlled-shutdown ZooKeeper timeouts. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/tls/90-shutdown-kafka.yaml b/tests/templates/kuttl/tls/90-shutdown-kafka.yaml index acce2cee..11ddd3d7 100644 --- a/tests/templates/kuttl/tls/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/tls/90-shutdown-kafka.yaml @@ -1,11 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, ZooKeeper and Kafka are terminated simultaneously, -# and Kafka hangs on controlled-shutdown ZooKeeper timeouts. 
+# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s diff --git a/tests/templates/kuttl/upgrade/90-shutdown-kafka.yaml b/tests/templates/kuttl/upgrade/90-shutdown-kafka.yaml index 97ed2393..11ddd3d7 100644 --- a/tests/templates/kuttl/upgrade/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/upgrade/90-shutdown-kafka.yaml @@ -1,16 +1,11 @@ --- -# Scale Kafka down before kuttl deletes the namespace. -# Without this, Kafka pods may hang during shutdown. +# Stop the KafkaCluster before kuttl deletes the namespace. +# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, +# and brokers hang on controlled-shutdown timeouts. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: - # Scale down brokers first, since they depend on controllers. - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}' - - script: | - kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s - # Then scale down controllers once all brokers are gone. 
- - script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"default":{"replicas":0}}}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s From 56eefbb07fe9ce610a5f7a0cbb619cf87350eb64 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Mon, 11 May 2026 14:41:37 +0200 Subject: [PATCH 4/7] test: Use uniquely named cluster resources kuttl seems to track cluster scoped resources and deletes them, leaving other tests running in parallel to fail. --- .../templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 | 8 ++++---- tests/templates/kuttl/smoke-kraft/70_test-tls-job.yaml.j2 | 2 +- tests/templates/kuttl/tls/40-install-kafka.yaml.j2 | 8 ++++---- tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 | 8 ++++---- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 b/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 index 95d85da6..7bbd4b43 100644 --- a/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/smoke-kraft/30-install-kafka.yaml.j2 @@ -36,16 +36,16 @@ data: apiVersion: authentication.stackable.tech/v1alpha1 kind: AuthenticationClass metadata: - name: test-kafka-client-auth-tls + name: test-kafka-client-auth-tls-{{ NAMESPACE }} spec: provider: tls: - clientCertSecretClass: test-kafka-client-auth-tls + clientCertSecretClass: test-kafka-client-auth-tls-{{ NAMESPACE }} --- apiVersion: secrets.stackable.tech/v1alpha1 kind: SecretClass metadata: - name: test-kafka-client-auth-tls + name: test-kafka-client-auth-tls-{{ NAMESPACE }} spec: backend: autoTls: @@ -71,7 +71,7 @@ spec: clusterConfig: metadataManager: kraft authentication: - - authenticationClass: test-kafka-client-auth-tls + - authenticationClass: 
test-kafka-client-auth-tls-{{ NAMESPACE }} tls: serverSecretClass: tls vectorAggregatorConfigMapName: vector-aggregator-discovery diff --git a/tests/templates/kuttl/smoke-kraft/70_test-tls-job.yaml.j2 b/tests/templates/kuttl/smoke-kraft/70_test-tls-job.yaml.j2 index f9da65e1..d48b8bf5 100644 --- a/tests/templates/kuttl/smoke-kraft/70_test-tls-job.yaml.j2 +++ b/tests/templates/kuttl/smoke-kraft/70_test-tls-job.yaml.j2 @@ -42,7 +42,7 @@ spec: volumeClaimTemplate: metadata: annotations: - secrets.stackable.tech/class: test-kafka-client-auth-tls + secrets.stackable.tech/class: test-kafka-client-auth-tls-{{ NAMESPACE }} secrets.stackable.tech/format: tls-pkcs12 secrets.stackable.tech/scope: pod,node spec: diff --git a/tests/templates/kuttl/tls/40-install-kafka.yaml.j2 b/tests/templates/kuttl/tls/40-install-kafka.yaml.j2 index ba0278a4..a722d865 100644 --- a/tests/templates/kuttl/tls/40-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/tls/40-install-kafka.yaml.j2 @@ -11,16 +11,16 @@ spec: apiVersion: authentication.stackable.tech/v1alpha1 kind: AuthenticationClass metadata: - name: test-kafka-client-auth-tls + name: test-kafka-client-auth-tls-{{ NAMESPACE }} spec: provider: tls: - clientCertSecretClass: test-kafka-client-auth-tls + clientCertSecretClass: test-kafka-client-auth-tls-{{ NAMESPACE }} --- apiVersion: secrets.stackable.tech/v1alpha1 kind: SecretClass metadata: - name: test-kafka-client-auth-tls + name: test-kafka-client-auth-tls-{{ NAMESPACE }} spec: backend: autoTls: @@ -47,7 +47,7 @@ spec: clusterConfig: {% if test_scenario['values']['use-client-auth-tls'] == 'true' %} authentication: - - authenticationClass: test-kafka-client-auth-tls + - authenticationClass: test-kafka-client-auth-tls-{{ NAMESPACE }} {% endif %} tls: {% if test_scenario['values']['use-client-tls'] == 'true' %} diff --git a/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 b/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 index 93e1d415..40dbfcdd 100644 --- 
a/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 +++ b/tests/templates/kuttl/upgrade/02-install-kafka.yaml.j2 @@ -3,16 +3,16 @@ apiVersion: authentication.stackable.tech/v1alpha1 kind: AuthenticationClass metadata: - name: test-kafka-client-auth-tls + name: test-kafka-client-auth-tls-{{ NAMESPACE }} spec: provider: tls: - clientCertSecretClass: test-kafka-client-auth-tls + clientCertSecretClass: test-kafka-client-auth-tls-{{ NAMESPACE }} --- apiVersion: secrets.stackable.tech/v1alpha1 kind: SecretClass metadata: - name: test-kafka-client-auth-tls + name: test-kafka-client-auth-tls-{{ NAMESPACE }} spec: backend: autoTls: @@ -37,7 +37,7 @@ spec: metadataManager: kraft {% if test_scenario['values']['use-client-auth-tls'] == 'true' %} authentication: - - authenticationClass: test-kafka-client-auth-tls + - authenticationClass: test-kafka-client-auth-tls-{{ NAMESPACE }} {% endif %} tls: {% if test_scenario['values']['use-client-tls'] == 'true' %} From 4126156792ce40b1bdbd28db78d90de6091d62d4 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Mon, 11 May 2026 15:01:04 +0200 Subject: [PATCH 5/7] fix: Allow brokers to be scaled to 0 Without this, scaling brokers to 0 blocks reconciliation (so changes such as setting controller replicas to 0 hang) --- rust/operator-binary/src/kafka_controller.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 0273d78b..712a722b 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -426,6 +426,21 @@ pub async fn reconcile_kafka( for (rolegroup_name, rolegroup_config) in role_config.iter() { let rolegroup_ref = kafka.rolegroup_ref(&kafka_role, rolegroup_name); + // Skip rolegroups with 0 replicas. There are no pods to configure, and + // building configmaps would fail when all controller replicas are 0 + // (no quorum voters to reference). 
+ // NOTE: This also means the rolegroup's resources (services, configmaps, + // statefulsets) won't be added to cluster_resources, so they will be + // deleted as orphans by delete_orphaned_resources(). They get recreated + // when replicas go back above 0. + let replicas = kafka_role + .replicas(kafka, &rolegroup_ref.role_group) + .context(FailedToResolveConfigSnafu)? + .unwrap_or(0); + if replicas == 0 { + continue; + } + let merged_config = kafka_role .merged_config(kafka, &rolegroup_ref.role_group) .context(FailedToResolveConfigSnafu)?; From a0d64c13779713a3aeb9d98305ab0de17fc2a995 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 3 Jul 2026 11:27:19 +0200 Subject: [PATCH 6/7] WIP: fix kraft shutdown bug --- rust/operator-binary/src/crd/mod.rs | 2 +- rust/operator-binary/src/kafka_controller.rs | 103 ++++++++++++++++--- 2 files changed, 88 insertions(+), 17 deletions(-) diff --git a/rust/operator-binary/src/crd/mod.rs b/rust/operator-binary/src/crd/mod.rs index cd6a1046..c8accdce 100644 --- a/rust/operator-binary/src/crd/mod.rs +++ b/rust/operator-binary/src/crd/mod.rs @@ -465,7 +465,7 @@ impl v1alpha1::KafkaCluster { Ok(pod_descriptors) } - fn extract_rolegroup_replicas( + pub fn extract_rolegroup_replicas( &self, kafka_role: &KafkaRole, ) -> Result, Error> { diff --git a/rust/operator-binary/src/kafka_controller.rs b/rust/operator-binary/src/kafka_controller.rs index 712a722b..cdb81ac3 100644 --- a/rust/operator-binary/src/kafka_controller.rs +++ b/rust/operator-binary/src/kafka_controller.rs @@ -321,13 +321,45 @@ pub async fn reconcile_kafka( ) .context(ResolveProductImageSnafu)?; - // When the cluster is being stopped and has KRaft controllers, we need to sequence - // the shutdown: brokers must stop before controllers, because brokers depend on - // controllers for the controlled shutdown protocol. Without this, brokers hang - // waiting for controller responses until terminationGracePeriodSeconds expires. 
- let stopped = kafka.spec.cluster_operation.stopped; + // When KRaft controllers are being scaled to 0, we must ensure brokers stop first, + // because brokers depend on controllers for the controlled shutdown protocol. + // Without this, brokers hang waiting for controller responses until + // terminationGracePeriodSeconds expires. + // + // This applies both when using clusterOperation.stopped and when manually setting + // controller replicas to 0. In either case, we prevent controllers from scaling + // down until all broker pods are gone. + // + // TODO: A validating webhook would provide better UX by rejecting invalid + // configurations (e.g. controllers=0 while brokers>0) at admission time + // rather than silently keeping controllers running. let has_controllers = kafka.spec.controllers.is_some(); - let sequenced_shutdown = stopped && has_controllers; + let stopped = kafka.spec.cluster_operation.stopped; + + let all_controllers_zero = has_controllers + && kafka + .extract_rolegroup_replicas(&KafkaRole::Controller) + .unwrap_or_default() + .values() + .all(|r| *r == 0); + let all_brokers_zero = kafka + .extract_rolegroup_replicas(&KafkaRole::Broker) + .unwrap_or_default() + .values() + .all(|r| *r == 0); + + // Sequenced shutdown is needed when controllers are going to 0, either via + // clusterOperation.stopped or by manually setting all controller replicas to 0. + let sequenced_shutdown = has_controllers && (stopped || all_controllers_zero); + + // If controllers are set to 0 but brokers aren't, warn and keep controllers + // running to avoid breaking the cluster. + if all_controllers_zero && !all_brokers_zero && !stopped { + tracing::warn!( + "Controller replicas are set to 0 but broker replicas are non-zero. \ + Controllers will not be scaled down until all brokers are stopped." + ); + } // Use Default instead of ClusterStopped because ClusterStopped zeroes all // StatefulSet replicas unconditionally via maybe_mutate(). 
We need to control @@ -433,11 +465,17 @@ pub async fn reconcile_kafka( // statefulsets) won't be added to cluster_resources, so they will be // deleted as orphans by delete_orphaned_resources(). They get recreated // when replicas go back above 0. + // + // Exception: during sequenced shutdown, controller rolegroups must keep + // running until all brokers are gone, even if their spec says 0 replicas. let replicas = kafka_role .replicas(kafka, &rolegroup_ref.role_group) .context(FailedToResolveConfigSnafu)? .unwrap_or(0); - if replicas == 0 { + let keep_controllers_running = sequenced_shutdown + && kafka_role == KafkaRole::Controller + && !brokers_stopped; + if replicas == 0 && !keep_controllers_running { continue; } @@ -473,6 +511,15 @@ pub async fn reconcile_kafka( ) .context(BuildPodDescriptorsSnafu)?; + tracing::warn!( + rolegroup = %rolegroup_ref, + role = %kafka_role, + total_pod_descriptors = pod_descriptors.len(), + controller_descriptors = pod_descriptors.iter().filter(|pd| pd.role == KafkaRole::Controller.to_string()).count(), + broker_descriptors = pod_descriptors.iter().filter(|pd| pd.role == KafkaRole::Broker.to_string()).count(), + "Building configmap for rolegroup", + ); + let rg_configmap = build_rolegroup_config_map( kafka, &resolved_product_image, @@ -514,15 +561,39 @@ pub async fn reconcile_kafka( }; if sequenced_shutdown { - let should_stop = match kafka_role { - // Always stop brokers first. - KafkaRole::Broker => true, - // Only stop controllers after all brokers are gone. - KafkaRole::Controller => brokers_stopped, - }; - if should_stop { - if let Some(spec) = rg_statefulset.spec.as_mut() { - spec.replicas = Some(0); + match kafka_role { + KafkaRole::Broker => { + // Always stop brokers first. + if let Some(spec) = rg_statefulset.spec.as_mut() { + spec.replicas = Some(0); + } + } + KafkaRole::Controller => { + if brokers_stopped { + // All brokers are gone, safe to stop controllers now. 
+ if let Some(spec) = rg_statefulset.spec.as_mut() { + spec.replicas = Some(0); + } + } else if replicas == 0 { + // Controller replicas are set to 0 in the spec, but + // brokers are still running. Preserve the current + // running replica count so controllers stay up. + let current = client + .get::( + &rg_statefulset.name_any(), + rg_statefulset + .namespace() + .as_deref() + .unwrap_or_default(), + ) + .await + .ok() + .and_then(|sts| sts.spec) + .and_then(|spec| spec.replicas); + if let Some(spec) = rg_statefulset.spec.as_mut() { + spec.replicas = current; + } + } + } } } } From 0752c5bce41b8c7f851c2be68ca0a224e2c7e995 Mon Sep 17 00:00:00 2001 From: Nick Larsen Date: Fri, 3 Jul 2026 11:28:38 +0200 Subject: [PATCH 7/7] WIP: change a test to scale down brokers instead of setting clusterOperation.stopped. This is just to check that the code is working --- .../kuttl/smoke-kraft/90-shutdown-kafka.yaml | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml b/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml index 11ddd3d7..8d201b64 100644 --- a/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml +++ b/tests/templates/kuttl/smoke-kraft/90-shutdown-kafka.yaml @@ -1,11 +1,16 @@ --- -# Stop the KafkaCluster before kuttl deletes the namespace. -# Without this, ZooKeeper/controllers and brokers are terminated simultaneously, -# and brokers hang on controlled-shutdown timeouts. +# Scale Kafka down before kuttl deletes the namespace. +# Without this, Kafka pods may hang during shutdown. apiVersion: kuttl.dev/v1beta1 kind: TestStep commands: + # Scale down brokers first, since they depend on controllers. 
- script: | - kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}' + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0},"automatic-log-config":{"replicas":0},"custom-log-config":{"replicas":0}}}}}' + - script: | + kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka,app.kubernetes.io/component=broker -n $NAMESPACE --timeout=300s + # Then scale down controllers once all brokers are gone. + - script: | + kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"controllers":{"roleGroups":{"automatic-log-config":{"replicas":0},"custom-log-config":{"replicas":0}}}}}' - script: | kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s