Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions common/config/src/main/resources/kubernetes.conf
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ kubernetes {
max-num-of-running-computing-units-per-user = 10
max-num-of-running-computing-units-per-user = ${?MAX_NUM_OF_RUNNING_COMPUTING_UNITS_PER_USER}

# Terminate Kubernetes CUs whose latest workflow execution is older than this.
computing-unit-idle-timeout-minutes = 1440
computing-unit-idle-timeout-minutes = ${?KUBERNETES_COMPUTING_UNIT_IDLE_TIMEOUT_MINUTES}

computing-unit-idle-check-interval-minutes = 60
computing-unit-idle-check-interval-minutes = ${?KUBERNETES_COMPUTING_UNIT_IDLE_CHECK_INTERVAL_MINUTES}

computing-unit-cpu-limit-options = "1,2,4"
computing-unit-cpu-limit-options = ${?KUBERNETES_COMPUTING_UNIT_CPU_LIMIT_OPTIONS}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ object KubernetesConfig {
val maxNumOfRunningComputingUnitsPerUser: Int =
conf.getInt("kubernetes.max-num-of-running-computing-units-per-user")

val computingUnitIdleTimeoutMinutes: Long =
conf.getLong("kubernetes.computing-unit-idle-timeout-minutes")

val computingUnitIdleCheckIntervalMinutes: Long =
conf.getLong("kubernetes.computing-unit-idle-check-interval-minutes")

val cpuLimitOptions: List[String] =
conf
.getString("kubernetes.computing-unit-cpu-limit-options")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import io.dropwizard.configuration.{EnvironmentVariableSubstitutor, SubstitutingSourceProvider}
import io.dropwizard.core.Application
import io.dropwizard.core.setup.{Bootstrap, Environment}
import org.apache.texera.common.config.StorageConfig
import org.apache.texera.common.config.{KubernetesConfig, StorageConfig}
import org.apache.texera.auth.{AuthFeatures, RequestLoggingFilter, RoleAnnotationEnforcer}
import org.apache.texera.dao.SqlServer
import org.apache.texera.service.resource.{
ComputingUnitAccessResource,
ComputingUnitManagingResource,
HealthCheckResource
}
import org.slf4j.LoggerFactory
import java.nio.file.Path
import java.util.concurrent.TimeUnit

class ComputingUnitManagingService extends Application[ComputingUnitManagingServiceConfiguration] {
private val logger = LoggerFactory.getLogger(classOf[ComputingUnitManagingService])

override def initialize(
bootstrap: Bootstrap[ComputingUnitManagingServiceConfiguration]
Expand Down Expand Up @@ -72,6 +75,30 @@ class ComputingUnitManagingService extends Application[ComputingUnitManagingServ
"ComputingUnitManagingService"
)

if (
KubernetesConfig.kubernetesComputingUnitEnabled &&
KubernetesConfig.computingUnitIdleTimeoutMinutes > 0
) {
environment.lifecycle
.scheduledExecutorService("idle-computing-unit-terminator")
.threads(1)
.build()
.scheduleWithFixedDelay(
() =>
try {
val terminated = ComputingUnitManagingResource.terminateIdleKubernetesComputingUnits()
if (terminated > 0) {
logger.info(s"Terminated $terminated idle Kubernetes computing unit(s)")
}
} catch {
case t: Throwable => logger.warn("Failed to terminate idle Kubernetes computing units", t)
},
KubernetesConfig.computingUnitIdleCheckIntervalMinutes,
KubernetesConfig.computingUnitIdleCheckIntervalMinutes,
TimeUnit.MINUTES
)
}

// Route request logs through SLF4J, controlled by TEXERA_SERVICE_LOG_LEVEL
RequestLoggingFilter.register(environment.getApplicationContext)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.texera.common.config.{
}
import org.apache.texera.dao.SqlServer
import org.apache.texera.dao.SqlServer.withTransaction
import org.apache.texera.dao.jooq.generated.Tables.{WORKFLOW_COMPUTING_UNIT, WORKFLOW_EXECUTIONS}
import org.apache.texera.dao.jooq.generated.enums.{PrivilegeEnum, WorkflowComputingUnitTypeEnum}
import org.apache.texera.dao.jooq.generated.tables.daos.{
ComputingUnitUserAccessDao,
Expand Down Expand Up @@ -69,6 +70,73 @@ object ComputingUnitManagingResource {
.getInstance()
.createDSLContext()

def terminateIdleKubernetesComputingUnits(): Int = {
if (
!KubernetesConfig.kubernetesComputingUnitEnabled ||
KubernetesConfig.computingUnitIdleTimeoutMinutes <= 0
) {
return 0
}

val now = new Timestamp(System.currentTimeMillis())
val cutoff = new Timestamp(
now.getTime - KubernetesConfig.computingUnitIdleTimeoutMinutes * 60 * 1000
)
val activeStatuses = Seq(Short.box(0), Short.box(1), Short.box(2))

withTransaction(context) { ctx =>
val cuDao = new WorkflowComputingUnitDao(ctx.configuration())
ctx
.selectFrom(WORKFLOW_COMPUTING_UNIT)
.where(
WORKFLOW_COMPUTING_UNIT.TYPE
.eq(WorkflowComputingUnitTypeEnum.kubernetes)
.and(WORKFLOW_COMPUTING_UNIT.TERMINATE_TIME.isNull)
)
.fetchInto(classOf[WorkflowComputingUnit])
.asScala
.count { unit =>
val cuid = unit.getCuid
val hasActiveExecution = ctx.fetchExists(
ctx
.selectOne()
.from(WORKFLOW_EXECUTIONS)
.where(
WORKFLOW_EXECUTIONS.CUID
.eq(cuid)
.and(WORKFLOW_EXECUTIONS.STATUS.in(activeStatuses: _*))
)
)
val latestUpdateTime = ctx
.select(org.jooq.impl.DSL.max(WORKFLOW_EXECUTIONS.LAST_UPDATE_TIME))
.from(WORKFLOW_EXECUTIONS)
.where(WORKFLOW_EXECUTIONS.CUID.eq(cuid))
.fetchOne(0, classOf[Timestamp])
val latestStartTime = ctx
.select(org.jooq.impl.DSL.max(WORKFLOW_EXECUTIONS.STARTING_TIME))
.from(WORKFLOW_EXECUTIONS)
.where(WORKFLOW_EXECUTIONS.CUID.eq(cuid))
.fetchOne(0, classOf[Timestamp])
val lastExecutionTime = Seq(
Option(latestUpdateTime),
Option(latestStartTime),
Some(unit.getCreationTime)
).flatten.maxBy(_.getTime)

if (!hasActiveExecution && lastExecutionTime.before(cutoff)) {
if (KubernetesClient.podExists(cuid)) {
KubernetesClient.deletePod(cuid)
}
unit.setTerminateTime(now)
cuDao.update(unit)
true
} else {
false
}
}
}
}

private def icebergEnvironmentVariables: Map[String, Any] = {
val base = Map[String, Any](
EnvironmentalVariable.ENV_ICEBERG_CATALOG_TYPE -> StorageConfig.icebergCatalogType
Expand Down
Loading