From 1b66a976dfd5a552893811130d06a180c6749104 Mon Sep 17 00:00:00 2001 From: zaoduyuan Date: Wed, 1 Jul 2026 14:22:24 +0800 Subject: [PATCH] fix(kubernetes): terminate idle computing units --- .../config/src/main/resources/kubernetes.conf | 7 ++ .../common/config/KubernetesConfig.scala | 6 ++ .../ComputingUnitManagingService.scala | 29 +++++++- .../ComputingUnitManagingResource.scala | 68 +++++++++++++++++++ 4 files changed, 109 insertions(+), 1 deletion(-) diff --git a/common/config/src/main/resources/kubernetes.conf b/common/config/src/main/resources/kubernetes.conf index e85924e570c..b45c6ffbfeb 100644 --- a/common/config/src/main/resources/kubernetes.conf +++ b/common/config/src/main/resources/kubernetes.conf @@ -41,6 +41,13 @@ kubernetes { max-num-of-running-computing-units-per-user = 10 max-num-of-running-computing-units-per-user = ${?MAX_NUM_OF_RUNNING_COMPUTING_UNITS_PER_USER} + # Terminate Kubernetes CUs whose latest workflow execution is older than this. + computing-unit-idle-timeout-minutes = 1440 + computing-unit-idle-timeout-minutes = ${?KUBERNETES_COMPUTING_UNIT_IDLE_TIMEOUT_MINUTES} + + computing-unit-idle-check-interval-minutes = 60 + computing-unit-idle-check-interval-minutes = ${?KUBERNETES_COMPUTING_UNIT_IDLE_CHECK_INTERVAL_MINUTES} + computing-unit-cpu-limit-options = "1,2,4" computing-unit-cpu-limit-options = ${?KUBERNETES_COMPUTING_UNIT_CPU_LIMIT_OPTIONS} diff --git a/common/config/src/main/scala/org/apache/texera/common/config/KubernetesConfig.scala b/common/config/src/main/scala/org/apache/texera/common/config/KubernetesConfig.scala index f6294767365..65203666399 100644 --- a/common/config/src/main/scala/org/apache/texera/common/config/KubernetesConfig.scala +++ b/common/config/src/main/scala/org/apache/texera/common/config/KubernetesConfig.scala @@ -38,6 +38,12 @@ object KubernetesConfig { val maxNumOfRunningComputingUnitsPerUser: Int = conf.getInt("kubernetes.max-num-of-running-computing-units-per-user") + val computingUnitIdleTimeoutMinutes: Long = + conf.getLong("kubernetes.computing-unit-idle-timeout-minutes") + + val computingUnitIdleCheckIntervalMinutes: Long = + conf.getLong("kubernetes.computing-unit-idle-check-interval-minutes") + val cpuLimitOptions: List[String] = conf .getString("kubernetes.computing-unit-cpu-limit-options") diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/ComputingUnitManagingService.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/ComputingUnitManagingService.scala index db63bbf2eb2..7c753559a8b 100644 --- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/ComputingUnitManagingService.scala +++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/ComputingUnitManagingService.scala @@ -23,7 +23,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import io.dropwizard.configuration.{EnvironmentVariableSubstitutor, SubstitutingSourceProvider} import io.dropwizard.core.Application import io.dropwizard.core.setup.{Bootstrap, Environment} -import org.apache.texera.common.config.StorageConfig +import org.apache.texera.common.config.{KubernetesConfig, StorageConfig} import org.apache.texera.auth.{AuthFeatures, RequestLoggingFilter, RoleAnnotationEnforcer} import org.apache.texera.dao.SqlServer import org.apache.texera.service.resource.{ @@ -31,9 +31,12 @@ import org.apache.texera.service.resource.{ ComputingUnitManagingResource, HealthCheckResource } +import org.slf4j.LoggerFactory import java.nio.file.Path +import java.util.concurrent.TimeUnit class ComputingUnitManagingService extends Application[ComputingUnitManagingServiceConfiguration] { + private val logger = LoggerFactory.getLogger(classOf[ComputingUnitManagingService]) override def initialize( bootstrap: Bootstrap[ComputingUnitManagingServiceConfiguration] @@ -72,6 +75,30 @@ class ComputingUnitManagingService extends Application[ComputingUnitManagingServ "ComputingUnitManagingService" ) + if ( + KubernetesConfig.kubernetesComputingUnitEnabled && + KubernetesConfig.computingUnitIdleTimeoutMinutes > 0 + ) { + environment.lifecycle + .scheduledExecutorService("idle-computing-unit-terminator") + .threads(1) + .build() + .scheduleWithFixedDelay( + () => + try { + val terminated = ComputingUnitManagingResource.terminateIdleKubernetesComputingUnits() + if (terminated > 0) { + logger.info(s"Terminated $terminated idle Kubernetes computing unit(s)") + } + } catch { + case t: Throwable => logger.warn("Failed to terminate idle Kubernetes computing units", t) + }, + KubernetesConfig.computingUnitIdleCheckIntervalMinutes, + KubernetesConfig.computingUnitIdleCheckIntervalMinutes, + TimeUnit.MINUTES + ) + } + // Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL RequestLoggingFilter.register(environment.getApplicationContext) } diff --git a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala index aa02f73387e..fe3f21deac9 100644 --- a/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala +++ b/computing-unit-managing-service/src/main/scala/org/apache/texera/service/resource/ComputingUnitManagingResource.scala @@ -42,6 +42,7 @@ import org.apache.texera.common.config.{ } import org.apache.texera.dao.SqlServer import org.apache.texera.dao.SqlServer.withTransaction +import org.apache.texera.dao.jooq.generated.Tables.{WORKFLOW_COMPUTING_UNIT, WORKFLOW_EXECUTIONS} import org.apache.texera.dao.jooq.generated.enums.{PrivilegeEnum, WorkflowComputingUnitTypeEnum} import org.apache.texera.dao.jooq.generated.tables.daos.{ ComputingUnitUserAccessDao, @@ -69,6 +70,73 @@ object ComputingUnitManagingResource { .getInstance() .createDSLContext() + def terminateIdleKubernetesComputingUnits(): Int = { + if ( + !KubernetesConfig.kubernetesComputingUnitEnabled || + KubernetesConfig.computingUnitIdleTimeoutMinutes <= 0 + ) { + return 0 + } + + val now = new Timestamp(System.currentTimeMillis()) + val cutoff = new Timestamp( + now.getTime - KubernetesConfig.computingUnitIdleTimeoutMinutes * 60 * 1000 + ) + val activeStatuses = Seq(Short.box(0), Short.box(1), Short.box(2)) + + withTransaction(context) { ctx => + val cuDao = new WorkflowComputingUnitDao(ctx.configuration()) + ctx + .selectFrom(WORKFLOW_COMPUTING_UNIT) + .where( + WORKFLOW_COMPUTING_UNIT.TYPE + .eq(WorkflowComputingUnitTypeEnum.kubernetes) + .and(WORKFLOW_COMPUTING_UNIT.TERMINATE_TIME.isNull) + ) + .fetchInto(classOf[WorkflowComputingUnit]) + .asScala + .count { unit => + val cuid = unit.getCuid + val hasActiveExecution = ctx.fetchExists( + ctx + .selectOne() + .from(WORKFLOW_EXECUTIONS) + .where( + WORKFLOW_EXECUTIONS.CUID + .eq(cuid) + .and(WORKFLOW_EXECUTIONS.STATUS.in(activeStatuses: _*)) + ) + ) + val latestUpdateTime = ctx + .select(org.jooq.impl.DSL.max(WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME)) + .from(WORKFLOW_EXECUTIONS) + .where(WORKFLOW_EXECUTIONS.CUID.eq(cuid)) + .fetchOne(0, classOf[Timestamp]) + val latestStartTime = ctx + .select(org.jooq.impl.DSL.max(WORKFLOW_EXECUTIONS.STARTING_TIME)) + .from(WORKFLOW_EXECUTIONS) + .where(WORKFLOW_EXECUTIONS.CUID.eq(cuid)) + .fetchOne(0, classOf[Timestamp]) + val lastExecutionTime = Seq( + Option(latestUpdateTime), + Option(latestStartTime), + Some(unit.getCreationTime) + ).flatten.maxBy(_.getTime) + + if (!hasActiveExecution && lastExecutionTime.before(cutoff)) { + if (KubernetesClient.podExists(cuid)) { + KubernetesClient.deletePod(cuid) + } + unit.setTerminateTime(now) + cuDao.update(unit) + true + } else { + false + } + } + } + } + private def icebergEnvironmentVariables: Map[String, Any] = { val base = Map[String, Any]( EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE -> StorageConfig.icebergCatalogType