Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rust/operator-binary/src/crd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ impl v1alpha1::KafkaCluster {
Ok(pod_descriptors)
}

fn extract_rolegroup_replicas(
pub fn extract_rolegroup_replicas(
&self,
kafka_role: &KafkaRole,
) -> Result<BTreeMap<String, u16>, Error> {
Expand Down
181 changes: 176 additions & 5 deletions rust/operator-binary/src/kafka_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{collections::HashMap, str::FromStr, sync::Arc};

use const_format::concatcp;
use product_config::{ProductConfigManager, types::PropertyNameKind};
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt as _, ResultExt as _, Snafu};
use stackable_operator::{
cli::OperatorEnvironmentOptions,
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
Expand All @@ -13,12 +13,14 @@ use stackable_operator::{
rbac::build_rbac_resources,
},
crd::listener,
k8s_openapi::api::apps::v1::StatefulSet,
kube::{
Resource,
Resource, ResourceExt as _,
api::DynamicObject,
core::{DeserializeGuard, error_boundary},
runtime::{controller::Action, reflector::ObjectRef},
},
kvp::consts::{K8S_APP_COMPONENT_KEY, K8S_APP_INSTANCE_KEY, K8S_APP_NAME_KEY},
logging::controller::ReconcilerError,
product_config_utils::{
ValidatedRoleConfigByPropertyKind, transform_all_roles_to_config,
Expand Down Expand Up @@ -58,6 +60,10 @@ use crate::{
pub const KAFKA_CONTROLLER_NAME: &str = "kafkacluster";
pub const KAFKA_FULL_CONTROLLER_NAME: &str = concatcp!(KAFKA_CONTROLLER_NAME, '.', OPERATOR_NAME);

/// Requeue interval (10 seconds) returned by the reconciler while a sequenced
/// shutdown (brokers before controllers) is still waiting for the broker pods
/// to terminate, so that the controllers can be stopped on a later reconciliation.
const SEQUENCED_SHUTDOWN_REQUEUE: Duration = Duration::from_secs(10);

pub struct Ctx {
pub client: stackable_operator::client::Client,
pub product_config: ProductConfigManager,
Expand Down Expand Up @@ -211,6 +217,14 @@ pub enum Error {

#[snafu(display("object defines no namespace"))]
GetOpaConfig { source: authorization::Error },

#[snafu(display("object has no namespace"))]
ObjectHasNoNamespace,

#[snafu(display("failed to check broker shutdown status"))]
CheckBrokerShutdownStatus {
source: stackable_operator::client::Error,
},
}
type Result<T, E = Error> = std::result::Result<T, E>;

Expand Down Expand Up @@ -252,10 +266,37 @@ impl ReconcilerError for Error {
Error::InvalidKafkaListeners { .. } => None,
Error::BuildPodDescriptors { .. } => None,
Error::GetOpaConfig { .. } => None,
Error::ObjectHasNoNamespace => None,
Error::CheckBrokerShutdownStatus { .. } => None,
}
}
}

/// Reports whether every broker StatefulSet of this cluster has no replicas left.
///
/// Lists the StatefulSets in the cluster's namespace that carry the app name,
/// instance (cluster name) and component (broker role) labels, and returns
/// `true` only if none of them reports a non-zero `status.replicas`. An empty
/// list yields `true`, and so does a StatefulSet that has no `status` at all.
///
/// NOTE(review): a StatefulSet without a `status` is counted as stopped here;
/// confirm this is intended for freshly created StatefulSets.
///
/// # Errors
///
/// - [`Error::ObjectHasNoNamespace`] if the `KafkaCluster` has no namespace.
/// - [`Error::CheckBrokerShutdownStatus`] if listing the StatefulSets fails.
async fn broker_pods_stopped(
    client: &stackable_operator::client::Client,
    kafka: &v1alpha1::KafkaCluster,
) -> Result<bool> {
    let cluster_namespace = kafka.namespace().context(ObjectHasNoNamespaceSnafu)?;
    let cluster_name = kafka.name_any();
    let broker_role = KafkaRole::Broker;

    // Select only the broker StatefulSets that belong to this cluster instance.
    let selector = format!(
        "{K8S_APP_NAME_KEY}={APP_NAME},{K8S_APP_INSTANCE_KEY}={cluster_name},{K8S_APP_COMPONENT_KEY}={broker_role}"
    );
    let params = stackable_operator::kube::api::ListParams::default().labels(&selector);

    let statefulsets = client
        .list::<StatefulSet>(cluster_namespace.as_str(), &params)
        .await
        .context(CheckBrokerShutdownStatusSnafu)?;

    // StatefulSets without a status are skipped, i.e. they count as stopped.
    let any_replicas_left = statefulsets
        .iter()
        .filter_map(|sts| sts.status.as_ref())
        .any(|status| status.replicas != 0);

    Ok(!any_replicas_left)
}

pub async fn reconcile_kafka(
kafka: Arc<DeserializeGuard<v1alpha1::KafkaCluster>>,
ctx: Arc<Ctx>,
Expand All @@ -280,12 +321,68 @@ pub async fn reconcile_kafka(
)
.context(ResolveProductImageSnafu)?;

// When KRaft controllers are being scaled to 0, we must ensure brokers stop first,
// because brokers depend on controllers for the controlled shutdown protocol.
// Without this, brokers hang waiting for controller responses until
// terminationGracePeriodSeconds expires.
//
// This applies both when using clusterOperation.stopped and when manually setting
// controller replicas to 0. In either case, we prevent controllers from scaling
// down until all broker pods are gone.
//
// TODO: A validating webhook would provide better UX by rejecting invalid
// configurations (e.g. controllers=0 while brokers>0) at admission time
// rather than silently keeping controllers running.
let has_controllers = kafka.spec.controllers.is_some();
let stopped = kafka.spec.cluster_operation.stopped;

let all_controllers_zero = has_controllers
&& kafka
.extract_rolegroup_replicas(&KafkaRole::Controller)
.unwrap_or_default()
.values()
.all(|r| *r == 0);
let all_brokers_zero = kafka
.extract_rolegroup_replicas(&KafkaRole::Broker)
.unwrap_or_default()
.values()
.all(|r| *r == 0);

// Sequenced shutdown is needed when controllers are going to 0, either via
// clusterOperation.stopped or by manually setting all controller replicas to 0.
let sequenced_shutdown = has_controllers && (stopped || all_controllers_zero);

// If controllers are set to 0 but brokers aren't, warn and keep controllers
// running to avoid breaking the cluster.
if all_controllers_zero && !all_brokers_zero && !stopped {
tracing::warn!(
"Controller replicas are set to 0 but broker replicas are non-zero. \
Controllers will not be scaled down until all brokers are stopped."
);
}

// Use Default instead of ClusterStopped because ClusterStopped zeroes all
// StatefulSet replicas unconditionally via maybe_mutate(). We need to control
// replica counts per role to sequence the shutdown (brokers before controllers),
// so we set replicas to 0 ourselves on each StatefulSet as needed.
let apply_strategy = if sequenced_shutdown {
ClusterResourceApplyStrategy::Default
} else {
ClusterResourceApplyStrategy::from(&kafka.spec.cluster_operation)
};

let brokers_stopped = if sequenced_shutdown {
broker_pods_stopped(client, kafka).await?
} else {
false
};

let mut cluster_resources = ClusterResources::new(
APP_NAME,
OPERATOR_NAME,
KAFKA_CONTROLLER_NAME,
&kafka.object_ref(&()),
ClusterResourceApplyStrategy::from(&kafka.spec.cluster_operation),
apply_strategy,
&kafka.spec.object_overrides,
)
.context(CreateClusterResourcesSnafu)?;
Expand Down Expand Up @@ -361,6 +458,27 @@ pub async fn reconcile_kafka(
for (rolegroup_name, rolegroup_config) in role_config.iter() {
let rolegroup_ref = kafka.rolegroup_ref(&kafka_role, rolegroup_name);

// Skip rolegroups with 0 replicas. There are no pods to configure, and
// building configmaps would fail when all controller replicas are 0
// (no quorum voters to reference).
// NOTE: This also means the rolegroup's resources (services, configmaps,
// statefulsets) won't be added to cluster_resources, so they will be
// deleted as orphans by delete_orphaned_resources(). They get recreated
// when replicas go back above 0.
//
// Exception: during sequenced shutdown, controller rolegroups must keep
// running until all brokers are gone, even if their spec says 0 replicas.
let replicas = kafka_role
.replicas(kafka, &rolegroup_ref.role_group)
.context(FailedToResolveConfigSnafu)?
.unwrap_or(0);
let keep_controllers_running = sequenced_shutdown
&& kafka_role == KafkaRole::Controller
&& !brokers_stopped;
if replicas == 0 && !keep_controllers_running {
continue;
}

let merged_config = kafka_role
.merged_config(kafka, &rolegroup_ref.role_group)
.context(FailedToResolveConfigSnafu)?;
Expand Down Expand Up @@ -393,6 +511,15 @@ pub async fn reconcile_kafka(
)
.context(BuildPodDescriptorsSnafu)?;

tracing::warn!(
rolegroup = %rolegroup_ref,
role = %kafka_role,
total_pod_descriptors = pod_descriptors.len(),
controller_descriptors = pod_descriptors.iter().filter(|pd| pd.role == KafkaRole::Controller.to_string()).count(),
broker_descriptors = pod_descriptors.iter().filter(|pd| pd.role == KafkaRole::Broker.to_string()).count(),
"Building configmap for rolegroup",
);

let rg_configmap = build_rolegroup_config_map(
kafka,
&resolved_product_image,
Expand All @@ -406,7 +533,7 @@ pub async fn reconcile_kafka(
)
.context(BuildConfigMapSnafu)?;

let rg_statefulset = match kafka_role {
let mut rg_statefulset = match kafka_role {
KafkaRole::Broker => build_broker_rolegroup_statefulset(
kafka,
&kafka_role,
Expand All @@ -433,6 +560,44 @@ pub async fn reconcile_kafka(
.context(BuildStatefulsetSnafu)?,
};

if sequenced_shutdown {
match kafka_role {
KafkaRole::Broker => {
// Always stop brokers first.
if let Some(spec) = rg_statefulset.spec.as_mut() {
spec.replicas = Some(0);
}
}
KafkaRole::Controller => {
if brokers_stopped {
// All brokers are gone, safe to stop controllers now.
if let Some(spec) = rg_statefulset.spec.as_mut() {
spec.replicas = Some(0);
}
} else if replicas == 0 {
// Controller replicas are set to 0 in the spec, but
// brokers are still running. Preserve the current
// running replica count so controllers stay up.
let current = client
.get::<StatefulSet>(
&rg_statefulset.name_any(),
rg_statefulset
.namespace()
.as_deref()
.unwrap_or_default(),
)
.await
.ok()
.and_then(|sts| sts.spec)
.and_then(|spec| spec.replicas);
if let Some(spec) = rg_statefulset.spec.as_mut() {
spec.replicas = current;
}
}
}
}
}

if let AnyConfig::Broker(broker_config) = merged_config {
let rg_bootstrap_listener = build_broker_rolegroup_bootstrap_listener(
kafka,
Expand Down Expand Up @@ -524,7 +689,13 @@ pub async fn reconcile_kafka(
.await
.context(ApplyStatusSnafu)?;

Ok(Action::await_change())
// During sequenced shutdown, requeue until brokers are fully stopped so we can
// proceed to stop controllers on the next reconciliation.
if sequenced_shutdown && !brokers_stopped {
Ok(Action::requeue(*SEQUENCED_SHUTDOWN_REQUEUE))
} else {
Ok(Action::await_change())
}
}

pub fn error_policy(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
---
# Scale Kafka down before kuttl deletes the namespace.
# Without this, ZooKeeper and Kafka are terminated simultaneously,
# and Kafka hangs on controlled-shutdown ZooKeeper timeouts.
# Stop the KafkaCluster before kuttl deletes the namespace.
# Without this, ZooKeeper/controllers and brokers are terminated simultaneously,
# and brokers hang on controlled-shutdown timeouts.
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: |
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}'
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}'
- script: |
kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s
11 changes: 11 additions & 0 deletions tests/templates/kuttl/configuration/90-shutdown-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# Stop the KafkaCluster before kuttl deletes the namespace.
# Without this, ZooKeeper/controllers and brokers are terminated simultaneously,
# and brokers hang on controlled-shutdown timeouts.
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: |
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}'
- script: |
kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s
9 changes: 4 additions & 5 deletions tests/templates/kuttl/delete-rolegroup/90-shutdown-kafka.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
---
# Scale Kafka down before kuttl deletes the namespace.
# Without this, ZooKeeper and Kafka are terminated simultaneously,
# and Kafka hangs on controlled-shutdown ZooKeeper timeouts.
# Stop the KafkaCluster before kuttl deletes the namespace.
# Without this, ZooKeeper/controllers and brokers are terminated simultaneously,
# and brokers hang on controlled-shutdown timeouts.
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
# Note: By the time this script runs, the secondary RoleGroup has already been deleted, therefore we don't scale it down.
- script: |
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}'
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}'
- script: |
kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s
8 changes: 4 additions & 4 deletions tests/templates/kuttl/kerberos/90-shutdown-kafka.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
---
# Scale Kafka down before kuttl deletes the namespace.
# Without this, ZooKeeper and Kafka are terminated simultaneously,
# and Kafka hangs on controlled-shutdown ZooKeeper timeouts.
# Stop the KafkaCluster before kuttl deletes the namespace.
# Without this, ZooKeeper/controllers and brokers are terminated simultaneously,
# and brokers hang on controlled-shutdown timeouts.
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: |
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}'
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}'
- script: |
kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s
8 changes: 4 additions & 4 deletions tests/templates/kuttl/logging/90-shutdown-kafka.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
---
# Scale Kafka down before kuttl deletes the namespace.
# Without this, ZooKeeper and Kafka are terminated simultaneously,
# and Kafka hangs on controlled-shutdown ZooKeeper timeouts.
# Stop the KafkaCluster before kuttl deletes the namespace.
# Without this, ZooKeeper/controllers and brokers are terminated simultaneously,
# and brokers hang on controlled-shutdown timeouts.
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: |
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"automatic-log-config":{"replicas":0}, "custom-log-config":{"replicas":0}}}}}'
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}'
- script: |
kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s
8 changes: 4 additions & 4 deletions tests/templates/kuttl/opa/90-shutdown-kafka.yaml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
---
# Scale Kafka down before kuttl deletes the namespace.
# Without this, ZooKeeper and Kafka are terminated simultaneously,
# and Kafka hangs on controlled-shutdown ZooKeeper timeouts.
# Stop the KafkaCluster before kuttl deletes the namespace.
# Without this, ZooKeeper/controllers and brokers are terminated simultaneously,
# and brokers hang on controlled-shutdown timeouts.
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: |
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"brokers":{"roleGroups":{"default":{"replicas":0}}}}}'
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}'
- script: |
kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s
11 changes: 11 additions & 0 deletions tests/templates/kuttl/operations-kraft/90-shutdown-kafka.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# Stop the KafkaCluster before kuttl deletes the namespace.
# Without this, ZooKeeper/controllers and brokers are terminated simultaneously,
# and brokers hang on controlled-shutdown timeouts.
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: |
kubectl patch kafkacluster test-kafka -n $NAMESPACE --type merge -p '{"spec":{"clusterOperation":{"stopped":true}}}'
- script: |
kubectl wait --for=delete pod -l app.kubernetes.io/instance=test-kafka -n $NAMESPACE --timeout=300s
Loading