Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ kotlinxCoroutines = "1.10.2"
kurrentClient = "1.0.1"
orgReactiveStream = "1.0.4"
mongoDriver = "5.5.0"
testContainers = "1.21.0"
testContainers = "1.21.4"
exposed = "0.59.0"
postgresql = "42.7.3"
slf4j = "2.0.9"
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.13-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-9.5.0-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dev.helight.krescent.supervisor

import dev.helight.krescent.model.ReadModelBase
import dev.helight.krescent.model.ReadModelBase.Extension.strategy
import dev.helight.krescent.model.ReadModelBase.Extension.stream
import dev.helight.krescent.source.StreamingEventSource
import dev.helight.krescent.source.strategy.StreamingSourcingStrategy
import org.slf4j.LoggerFactory
Expand All @@ -12,8 +11,8 @@ import kotlin.time.Duration
import kotlin.time.DurationUnit
import kotlin.time.toDuration

class BalancedReadModelJob(
val modelSupplier: () -> ReadModelBase,
class BalancedReadModelJob<T: ReadModelBase>(
val modelSupplier: () -> T,
val source: StreamingEventSource? = null,
val retryAttempts: Int = 3,
val retryDelay: Duration = 1.toDuration(DurationUnit.SECONDS),
Expand All @@ -36,6 +35,8 @@ class BalancedReadModelJob(
private var currentAttempt = 0
private var timeout: Long = 0L
private var lastStart: Long = 0L
override var current: T? = null
override var ready: Boolean = false

override suspend fun condition(supervisor: ModelSupervisor): Boolean {
return !supervisor.startupMutex.isLocked && System.currentTimeMillis() > timeout
Expand All @@ -48,27 +49,34 @@ class BalancedReadModelJob(

override suspend fun run(supervisor: ModelSupervisor) {
val source = sourceSupplier()
val model = modelSupplier()
current = modelSupplier()
if (preventParallelCatchup) {
model.strategy(source, StreamingSourcingStrategy {
current!!.strategy(source, StreamingSourcingStrategy {
supervisor.startupMutex.unlock(this)
ready = true
})
} else {
model.stream(source)
current!!.strategy(source, StreamingSourcingStrategy {
ready = true
})
}
}

override suspend fun onExited(supervisor: ModelSupervisor) {
if (supervisor.startupMutex.holdsLock(this)) {
supervisor.startupMutex.unlock(this)
}
current = null
ready = false
}

@Suppress("ConvertTwoComparisonsToRangeCheck")
override suspend fun onFailed(supervisor: ModelSupervisor, error: Throwable) {
if (supervisor.startupMutex.holdsLock(this)) {
supervisor.startupMutex.unlock(this)
}
current = null
ready = false

if (System.currentTimeMillis() - lastStart >= retryResetDelay.toLong(DurationUnit.MILLISECONDS)) {
currentAttempt = 0
Expand All @@ -85,6 +93,11 @@ class BalancedReadModelJob(
}
}

override fun toString(): String {
return "BalancedReadModelJob(ready=$ready, current=$current, lastStart=$lastStart, timeout=$timeout, currentAttempt=$currentAttempt, source=$source)"
}


companion object {
private val logger = LoggerFactory.getLogger(BalancedReadModelJob::class.java)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package dev.helight.krescent.supervisor

import dev.helight.krescent.model.ReadModelBase

interface ModelJob {
val current: ReadModelBase?
val ready: Boolean
get() = true

suspend fun condition(supervisor: ModelSupervisor) = !supervisor.startupMutex.isLocked
suspend fun run(supervisor: ModelSupervisor)
suspend fun onBefore(supervisor: ModelSupervisor) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,37 @@ package dev.helight.krescent.supervisor

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select
import kotlinx.coroutines.sync.Mutex
import org.slf4j.LoggerFactory
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration.Companion.milliseconds

class ModelSupervisor(
val keepAliveChecks: Long = 100L,
) {

private val jobs = mutableListOf<ScheduledJob>()
private val notifications = Channel<Unit>(Channel.CONFLATED)
private val logger = LoggerFactory.getLogger(ModelSupervisor::class.java)
private var started = false
private var currentJob: CoroutineScope? = null
private var panic: Exception? = null

private val stateFlow: MutableStateFlow<SupervisorState> = MutableStateFlow(SupervisorState.STOPPED)
val state: StateFlow<SupervisorState> get() = stateFlow

val jobs = mutableListOf<ScheduledJob>()
val startupMutex = Mutex()

inline fun <reified T> modelOf(): T? {
return jobs.map { it.given.current }.filterIsInstance<T>().firstOrNull()
}

fun register(job: ModelJob) {
if (started) error("Cannot register new jobs after the supervisor has started.")
if (!state.value.startable) error("Cannot register new jobs while supervisor is not stopped.")
jobs.add(ScheduledJob(job))
}

Expand All @@ -39,31 +50,61 @@ class ModelSupervisor(
}

suspend fun execute() {
if (started) error("Supervisor is already running.")
if (!state.value.startable) error("Supervisor is already running.")
supervisor()
}

suspend fun launch(context: CoroutineScope) = coroutineScope {
if (!state.value.startable) error("Supervisor is already running.")
stateFlow.value = SupervisorState.STOPPED
val awaitable = async {
state.first {
check(it != SupervisorState.PANIC) { "Supervisor encountered a panic." }
it == SupervisorState.RUNNING
}
}
context.launch { supervisor() }
awaitable.await()
}

@OptIn(ExperimentalCoroutinesApi::class)
private suspend fun supervisor() = coroutineScope {
currentJob = this
panic = null
val supervision = SupervisorJob()
val context = supervision + Dispatchers.Default
try {
started = true
stateFlow.value = SupervisorState.STARTING
logger.debug("Starting model supervisor.")
while (isActive) {
panic?.let { throw it }
panic?.let {
stateFlow.value = SupervisorState.PANIC
throw it
}
var allReady = true
for (scheduled in jobs) {
if (!scheduled.isActive || !scheduled.given.ready) allReady = false
if (scheduled.isActive) continue
if (!scheduled.given.condition(this@ModelSupervisor)) continue
scheduled.given.onBefore(this@ModelSupervisor)
recreateJob(scheduled, context)
panic?.let { throw it }
panic?.let {
stateFlow.value = SupervisorState.PANIC
throw it
}
}

val currentState = stateFlow.value
if (allReady && currentState != SupervisorState.RUNNING) {
stateFlow.value = SupervisorState.RUNNING
logger.debug("Model supervisor is now running.")
} else if (!allReady && currentState == SupervisorState.RUNNING) {
stateFlow.value = SupervisorState.RECOVERING
}

select {
notifications.onReceive {}
onTimeout(keepAliveChecks) {}
onTimeout(keepAliveChecks.milliseconds) {}
}
}
} finally {
Expand All @@ -73,7 +114,7 @@ class ModelSupervisor(
scheduled.error = null
scheduled.job = null
}
started = false
stateFlow.value = SupervisorState.STOPPED
logger.debug("Model supervisor has exited.")
}
}
Expand All @@ -100,14 +141,28 @@ class ModelSupervisor(
logger.debug("Model job '{}' has been (re)started.", scheduled.given)
}

private class ScheduledJob(
class ScheduledJob(
val given: ModelJob,
) {
var job: Job? = null
var error: Throwable? = null

val isActive: Boolean
get() = job?.isActive == true && error == null

override fun toString(): String {
return "ScheduledJob(given=$given, job=$job, error=$error, isActive=$isActive)"
}


}

enum class SupervisorState(val startable: Boolean = false) {
STOPPED(true),
PANIC(true),
STARTING,
RUNNING,
RECOVERING,
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,29 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import kotlin.test.assertContentEquals
import kotlin.time.Duration.Companion.milliseconds

class CatchupFlowTest {
@Test
fun `Create catchup flow deduplication`() = runBlocking {
val firstFlow = flow {
emit(1)
delay(80)
delay(80.milliseconds)
emit(2)
delay(80)
delay(80.milliseconds)
emit(3)
delay(80)
delay(80.milliseconds)
emit(4)
}

val secondFlow = flow {
delay(10)
delay(10.milliseconds)
emit(3)
delay(10)
delay(10.milliseconds)
emit(4)
delay(10)
delay(10.milliseconds)
emit(5)
delay(50)
delay(50.milliseconds)
emit(6)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.milliseconds

/**
* Test class for the CoroutineUtilsKt class focusing on the 'joinSequentialFlows' function.
Expand Down Expand Up @@ -42,18 +43,18 @@ class JoinSequentialFlowTest {
fun `test joinSequentialFlows with delay between emissions`() = runBlocking {
val firstFlow = flow {
emit("X")
delay(100)
delay(100.milliseconds)
emit("Y")
delay(100)
delay(100.milliseconds)
emit("Z")
}

val secondFlow = flow {
delay(50)
delay(50.milliseconds)
emit("7")
delay(50)
delay(50.milliseconds)
emit("8")
delay(50)
delay(50.milliseconds)
emit("9")
}

Expand Down Expand Up @@ -134,14 +135,14 @@ class JoinSequentialFlowTest {
fun `test joinSequentialFlows with second flow emitting after delay`() = runBlocking {
val firstFlow = flow {
emit("Alpha")
delay(200)
delay(200.milliseconds)
emit("Beta")
}

val secondFlow = flow {
delay(300)
delay(300.milliseconds)
emit("1")
delay(100)
delay(100.milliseconds)
emit("2")
}

Expand All @@ -158,14 +159,14 @@ class JoinSequentialFlowTest {
fun `test joinSequentialFlows with overlapping emissions`() = runBlocking {
val firstFlow = flow {
emit("Start-1A")
delay(80)
delay(80.milliseconds)
emit("Start-1B")
}

val secondFlow = flow {
delay(50)
delay(50.milliseconds)
emit("Live-2A")
delay(100)
delay(100.milliseconds)
emit("Live-2B")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.time.Duration.Companion.milliseconds

class BookCountReadModelTest {

Expand Down Expand Up @@ -80,7 +81,7 @@ class BookCountReadModelTest {
useCheckpoints(checkpointStorage, MinimizedCheckpointStrategy())
}.stream(source)
}
delay(200)
delay(200.milliseconds)
assertNotNull(checkpointStorage.getLatestCheckpoint("books.counts"))
job.cancelAndJoin()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.milliseconds

class BookWriteModelTest {

Expand Down Expand Up @@ -74,7 +75,7 @@ class BookWriteModelTest {
wasWellBehaved = false
}
isWriting = true
delay(50)
delay(50.milliseconds)
removeCopies(1)
isWriting = false
}
Expand Down
Loading
Loading