diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 064b140..25264bd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -6,7 +6,7 @@ kotlinxCoroutines = "1.10.2" kurrentClient = "1.0.1" orgReactiveStream = "1.0.4" mongoDriver = "5.5.0" -testContainers = "1.21.0" +testContainers = "1.21.4" exposed = "0.59.0" postgresql = "42.7.3" slf4j = "2.0.9" diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 37f853b..1a70468 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.13-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.5.0-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJob.kt b/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJob.kt index 5ae2bd7..0521a5d 100644 --- a/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJob.kt +++ b/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJob.kt @@ -2,7 +2,6 @@ package dev.helight.krescent.supervisor import dev.helight.krescent.model.ReadModelBase import dev.helight.krescent.model.ReadModelBase.Extension.strategy -import dev.helight.krescent.model.ReadModelBase.Extension.stream import dev.helight.krescent.source.StreamingEventSource import dev.helight.krescent.source.strategy.StreamingSourcingStrategy import org.slf4j.LoggerFactory @@ -12,8 +11,8 @@ import kotlin.time.Duration import kotlin.time.DurationUnit import kotlin.time.toDuration -class BalancedReadModelJob( - val modelSupplier: () -> ReadModelBase, +class BalancedReadModelJob( + val modelSupplier: () -> T, val source: StreamingEventSource? = null, val retryAttempts: Int = 3, val retryDelay: Duration = 1.toDuration(DurationUnit.SECONDS), @@ -36,6 +35,8 @@ class BalancedReadModelJob( private var currentAttempt = 0 private var timeout: Long = 0L private var lastStart: Long = 0L + override var current: T? = null + override var ready: Boolean = false override suspend fun condition(supervisor: ModelSupervisor): Boolean { return !supervisor.startupMutex.isLocked && System.currentTimeMillis() > timeout @@ -48,13 +49,16 @@ class BalancedReadModelJob( override suspend fun run(supervisor: ModelSupervisor) { val source = sourceSupplier() - val model = modelSupplier() + current = modelSupplier() if (preventParallelCatchup) { - model.strategy(source, StreamingSourcingStrategy { + current!!.strategy(source, StreamingSourcingStrategy { supervisor.startupMutex.unlock(this) + ready = true }) } else { - model.stream(source) + current!!.strategy(source, StreamingSourcingStrategy { + ready = true + }) } } @@ -62,6 +66,8 @@ class BalancedReadModelJob( if (supervisor.startupMutex.holdsLock(this)) { supervisor.startupMutex.unlock(this) } + current = null + ready = false } @Suppress("ConvertTwoComparisonsToRangeCheck") @@ -69,6 +75,8 @@ class BalancedReadModelJob( if (supervisor.startupMutex.holdsLock(this)) { supervisor.startupMutex.unlock(this) } + current = null + ready = false if (System.currentTimeMillis() - lastStart >= retryResetDelay.toLong(DurationUnit.MILLISECONDS)) { currentAttempt = 0 @@ -85,6 +93,11 @@ class BalancedReadModelJob( } } + override fun toString(): String { + return "BalancedReadModelJob(ready=$ready, current=$current, lastStart=$lastStart, timeout=$timeout, currentAttempt=$currentAttempt, source=$source)" + } + + companion object { private val logger = LoggerFactory.getLogger(BalancedReadModelJob::class.java) diff --git a/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/ModelJob.kt b/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/ModelJob.kt index 6655b35..7de5110 100644 --- a/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/ModelJob.kt +++ b/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/ModelJob.kt @@ -1,6 +1,12 @@ package dev.helight.krescent.supervisor +import dev.helight.krescent.model.ReadModelBase + interface ModelJob { + val current: ReadModelBase? + val ready: Boolean + get() = true + suspend fun condition(supervisor: ModelSupervisor) = !supervisor.startupMutex.isLocked suspend fun run(supervisor: ModelSupervisor) suspend fun onBefore(supervisor: ModelSupervisor) {} diff --git a/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/ModelSupervisor.kt b/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/ModelSupervisor.kt index 5a02332..7fdc0cc 100644 --- a/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/ModelSupervisor.kt +++ b/krescent-core/src/main/kotlin/dev/helight/krescent/supervisor/ModelSupervisor.kt @@ -2,26 +2,37 @@ package dev.helight.krescent.supervisor import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.first import kotlinx.coroutines.selects.onTimeout import kotlinx.coroutines.selects.select import kotlinx.coroutines.sync.Mutex import org.slf4j.LoggerFactory import kotlin.coroutines.CoroutineContext +import kotlin.time.Duration.Companion.milliseconds class ModelSupervisor( val keepAliveChecks: Long = 100L, ) { - private val jobs = mutableListOf() private val notifications = Channel(Channel.CONFLATED) private val logger = LoggerFactory.getLogger(ModelSupervisor::class.java) - private var started = false private var currentJob: CoroutineScope? = null private var panic: Exception? = null + + private val stateFlow: MutableStateFlow = MutableStateFlow(SupervisorState.STOPPED) + val state: StateFlow get() = stateFlow + + val jobs = mutableListOf() val startupMutex = Mutex() + inline fun modelOf(): T? { + return jobs.map { it.given.current }.filterIsInstance().firstOrNull() + } + fun register(job: ModelJob) { - if (started) error("Cannot register new jobs after the supervisor has started.") + if (!state.value.startable) error("Cannot register new jobs while supervisor is not stopped.") jobs.add(ScheduledJob(job)) } @@ -39,10 +50,23 @@ class ModelSupervisor( } suspend fun execute() { - if (started) error("Supervisor is already running.") + if (!state.value.startable) error("Supervisor is already running.") supervisor() } + suspend fun launch(context: CoroutineScope) = coroutineScope { + if (!state.value.startable) error("Supervisor is already running.") + stateFlow.value = SupervisorState.STOPPED + val awaitable = async { + state.first { + check(it != SupervisorState.PANIC) { "Supervisor encountered a panic." } + it == SupervisorState.RUNNING + } + } + context.launch { supervisor() } + awaitable.await() + } + @OptIn(ExperimentalCoroutinesApi::class) private suspend fun supervisor() = coroutineScope { currentJob = this @@ -50,20 +74,37 @@ class ModelSupervisor( val supervision = SupervisorJob() val context = supervision + Dispatchers.Default try { - started = true + stateFlow.value = SupervisorState.STARTING logger.debug("Starting model supervisor.") while (isActive) { - panic?.let { throw it } + panic?.let { + stateFlow.value = SupervisorState.PANIC + throw it + } + var allReady = true for (scheduled in jobs) { + if (!scheduled.isActive || !scheduled.given.ready) allReady = false if (scheduled.isActive) continue if (!scheduled.given.condition(this@ModelSupervisor)) continue scheduled.given.onBefore(this@ModelSupervisor) recreateJob(scheduled, context) - panic?.let { throw it } + panic?.let { + stateFlow.value = SupervisorState.PANIC + throw it + } } + + val currentState = stateFlow.value + if (allReady && currentState != SupervisorState.RUNNING) { + stateFlow.value = SupervisorState.RUNNING + logger.debug("Model supervisor is now running.") + } else if (!allReady && currentState == SupervisorState.RUNNING) { + stateFlow.value = SupervisorState.RECOVERING + } + select { notifications.onReceive {} - onTimeout(keepAliveChecks) {} + onTimeout(keepAliveChecks.milliseconds) {} } } } finally { @@ -73,7 +114,7 @@ class ModelSupervisor( scheduled.error = null scheduled.job = null } - started = false + stateFlow.value = SupervisorState.STOPPED logger.debug("Model supervisor has exited.") } } @@ -100,7 +141,7 @@ class ModelSupervisor( logger.debug("Model job '{}' has been (re)started.", scheduled.given) } - private class ScheduledJob( + class ScheduledJob( val given: ModelJob, ) { var job: Job? = null @@ -108,6 +149,20 @@ class ModelSupervisor( val isActive: Boolean get() = job?.isActive == true && error == null + + override fun toString(): String { + return "ScheduledJob(given=$given, job=$job, error=$error, isActive=$isActive)" + } + + + } + + enum class SupervisorState(val startable: Boolean = false) { + STOPPED(true), + PANIC(true), + STARTING, + RUNNING, + RECOVERING, } } diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/CatchupFlowTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/CatchupFlowTest.kt index 2fb1c9b..e9844ac 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/CatchupFlowTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/CatchupFlowTest.kt @@ -5,28 +5,29 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import kotlin.test.assertContentEquals +import kotlin.time.Duration.Companion.milliseconds class CatchupFlowTest { @Test fun `Create catchup flow deduplication`() = runBlocking { val firstFlow = flow { emit(1) - delay(80) + delay(80.milliseconds) emit(2) - delay(80) + delay(80.milliseconds) emit(3) - delay(80) + delay(80.milliseconds) emit(4) } val secondFlow = flow { - delay(10) + delay(10.milliseconds) emit(3) - delay(10) + delay(10.milliseconds) emit(4) - delay(10) + delay(10.milliseconds) emit(5) - delay(50) + delay(50.milliseconds) emit(6) } diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/JoinSequentialFlowTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/JoinSequentialFlowTest.kt index 53ab8bc..d038da1 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/JoinSequentialFlowTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/JoinSequentialFlowTest.kt @@ -5,6 +5,7 @@ import kotlinx.coroutines.flow.flow import kotlinx.coroutines.runBlocking import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.milliseconds /** * Test class for the CoroutineUtilsKt class focusing on the 'joinSequentialFlows' function. @@ -42,18 +43,18 @@ class JoinSequentialFlowTest { fun `test joinSequentialFlows with delay between emissions`() = runBlocking { val firstFlow = flow { emit("X") - delay(100) + delay(100.milliseconds) emit("Y") - delay(100) + delay(100.milliseconds) emit("Z") } val secondFlow = flow { - delay(50) + delay(50.milliseconds) emit("7") - delay(50) + delay(50.milliseconds) emit("8") - delay(50) + delay(50.milliseconds) emit("9") } @@ -134,14 +135,14 @@ class JoinSequentialFlowTest { fun `test joinSequentialFlows with second flow emitting after delay`() = runBlocking { val firstFlow = flow { emit("Alpha") - delay(200) + delay(200.milliseconds) emit("Beta") } val secondFlow = flow { - delay(300) + delay(300.milliseconds) emit("1") - delay(100) + delay(100.milliseconds) emit("2") } @@ -158,14 +159,14 @@ class JoinSequentialFlowTest { fun `test joinSequentialFlows with overlapping emissions`() = runBlocking { val firstFlow = flow { emit("Start-1A") - delay(80) + delay(80.milliseconds) emit("Start-1B") } val secondFlow = flow { - delay(50) + delay(50.milliseconds) emit("Live-2A") - delay(100) + delay(100.milliseconds) emit("Live-2B") } diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookCountReadModelTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookCountReadModelTest.kt index 9e9c437..cd61dd7 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookCountReadModelTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookCountReadModelTest.kt @@ -20,6 +20,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertNull +import kotlin.time.Duration.Companion.milliseconds class BookCountReadModelTest { @@ -80,7 +81,7 @@ class BookCountReadModelTest { useCheckpoints(checkpointStorage, MinimizedCheckpointStrategy()) }.stream(source) } - delay(200) + delay(200.milliseconds) assertNotNull(checkpointStorage.getLatestCheckpoint("books.counts")) job.cancelAndJoin() } diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookWriteModelTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookWriteModelTest.kt index 238682a..d2a6270 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookWriteModelTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/BookWriteModelTest.kt @@ -27,6 +27,7 @@ import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFalse import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.milliseconds class BookWriteModelTest { @@ -74,7 +75,7 @@ class BookWriteModelTest { wasWellBehaved = false } isWriting = true - delay(50) + delay(50.milliseconds) removeCopies(1) isWriting = false } diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/Events.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/Events.kt index ee9b700..a888e42 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/Events.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/Events.kt @@ -113,4 +113,4 @@ val finalBookStates = mapOf( ) ) -val bookOneEvtCount = 5 \ No newline at end of file +const val bookOneEvtCount = 5 \ No newline at end of file diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/ListedBooksReadModelTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/ListedBooksReadModelTest.kt index 6876d86..eaa3f55 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/ListedBooksReadModelTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/bookstore/ListedBooksReadModelTest.kt @@ -14,19 +14,18 @@ import kotlinx.serialization.json.Json import kotlinx.serialization.json.decodeFromJsonElement import kotlinx.serialization.json.encodeToJsonElement import kotlin.test.* +import kotlin.time.Duration.Companion.milliseconds class ListedBooksReadModelTest { @Test fun `Test single large catchup`() = runBlocking { val buffer = mutableMapOf() - buffer.put( - "3", BookState( - title = "Left Overs", - author = "", - price = 1.0, - copies = 1 - ) + buffer["3"] = BookState( + title = "Left Overs", + author = "", + price = 1.0, + copies = 1 ) val source = InMemoryEventStore(bookstoreSimulatedEventStream.toMutableList()) source.buildEventModel("books.listed", 1, bookstoreEventCatalog) { @@ -42,13 +41,11 @@ class ListedBooksReadModelTest { handler { when (it) { - is BookAddedEvent -> view.put( - it.bookId, BookState( - title = it.title, - author = it.author, - price = it.price, - copies = it.copies - ) + is BookAddedEvent -> view[it.bookId] = BookState( + title = it.title, + author = it.author, + price = it.price, + copies = it.copies ) is BookRemovedEvent -> view.remove(it.bookId) @@ -70,13 +67,11 @@ class ListedBooksReadModelTest { @Test fun `Test delayed catchup receives new elements`() = runBlocking { val buffer = mutableMapOf() - buffer.put( - "3", BookState( - title = "Left Overs", - author = "", - price = 1.0, - copies = 1 - ) + buffer["3"] = BookState( + title = "Left Overs", + author = "", + price = 1.0, + copies = 1 ) val source = InMemoryEventStore() source.publish(bookstoreSimulatedEventStream[0]) @@ -95,13 +90,11 @@ class ListedBooksReadModelTest { handler { when (it) { - is BookAddedEvent -> view.put( - it.bookId, BookState( - title = it.title, - author = it.author, - price = it.price, - copies = it.copies - ) + is BookAddedEvent -> view[it.bookId] = BookState( + title = it.title, + author = it.author, + price = it.price, + copies = it.copies ) is BookRemovedEvent -> view.remove(it.bookId) @@ -131,13 +124,11 @@ class ListedBooksReadModelTest { @Test fun `Test streamed event source working as expected`() = runBlocking { val buffer = mutableMapOf() - buffer.put( - "3", BookState( - title = "Left Overs", - author = "", - price = 1.0, - copies = 1 - ) + buffer["3"] = BookState( + title = "Left Overs", + author = "", + price = 1.0, + copies = 1 ) val source = InMemoryEventStore() source.publish(bookstoreSimulatedEventStream[0]) @@ -156,13 +147,11 @@ class ListedBooksReadModelTest { handler { when (it) { - is BookAddedEvent -> view.put( - it.bookId, BookState( - title = it.title, - author = it.author, - price = it.price, - copies = it.copies - ) + is BookAddedEvent -> view[it.bookId] = BookState( + title = it.title, + author = it.author, + price = it.price, + copies = it.copies ) is BookRemovedEvent -> view.remove(it.bookId) @@ -179,15 +168,15 @@ class ListedBooksReadModelTest { val streamingJob = launch { model.stream() } - delay(50) + delay(50.milliseconds) assertEquals(1, buffer.size) source.publish(bookstoreSimulatedEventStream[1]) - delay(50) + delay(50.milliseconds) assertEquals(2, buffer.size) source.publishAll(bookstoreSimulatedEventStream.drop(2)) - delay(50) + delay(50.milliseconds) assertEquals(2, buffer.size) streamingJob.cancelAndJoin() @@ -196,13 +185,11 @@ class ListedBooksReadModelTest { @Test fun `Test function based event model catchup with checkpoints`() = runBlocking { val buffer = mutableMapOf() - buffer.put( - "3", BookState( - title = "Left Overs", - author = "", - price = 1.0, - copies = 1 - ) + buffer["3"] = BookState( + title = "Left Overs", + author = "", + price = 1.0, + copies = 1 ) val source = InMemoryEventStore() val checkpointStorage = InMemoryCheckpointStorage() @@ -226,13 +213,11 @@ class ListedBooksReadModelTest { handler { when (it) { - is BookAddedEvent -> view.put( - it.bookId, BookState( - title = it.title, - author = it.author, - price = it.price, - copies = it.copies - ) + is BookAddedEvent -> view[it.bookId] = BookState( + title = it.title, + author = it.author, + price = it.price, + copies = it.copies ) is BookRemovedEvent -> view.remove(it.bookId) @@ -250,20 +235,20 @@ class ListedBooksReadModelTest { var streamingJob = launch { model.stream() } - delay(50) + delay(50.milliseconds) assertEquals(1, buffer.size) assertNull(checkpointStorage.getLatestCheckpoint("books.listed")) // Checkpoint and normal event processing manualCheckpointStrategy.mark() // Checkpoint after the next event source.publish(bookstoreSimulatedEventStream[1]) - delay(50) + delay(50.milliseconds) assertEquals(2, buffer.size) assertNotNull(checkpointStorage.getLatestCheckpoint("books.listed")) // Publish the rest of the event stream, no checkpoint should be created here source.publishAll(bookstoreSimulatedEventStream.drop(2)) - delay(50) + delay(50.milliseconds) assertEquals(2, buffer.size) assertEquals(buffer["1"], finalBookStates["1"]) assertEquals(buffer["2"], finalBookStates["2"]) @@ -271,33 +256,29 @@ class ListedBooksReadModelTest { streamingJob.cancelAndJoin() // Check if the restored state matches the expected non-final state - buffer.put( - "3", BookState( - title = "Left Overs", - author = "", - price = 1.0, - copies = 1 - ) + buffer["3"] = BookState( + title = "Left Overs", + author = "", + price = 1.0, + copies = 1 ) model.restore() - delay(50) + delay(50.milliseconds) assertEquals(2, buffer.size) assertNotEquals(buffer["1"], finalBookStates["1"]) // Check if streaming from the restored state results in the final state - buffer.put( - "3", BookState( - title = "Left Overs", - author = "", - price = 1.0, - copies = 1 - ) + buffer["3"] = BookState( + title = "Left Overs", + author = "", + price = 1.0, + copies = 1 ) streamingJob = launch { model.stream() } - delay(50) + delay(50.milliseconds) assertEquals(2, buffer.size) assertEquals(buffer["1"], finalBookStates["1"]) assertEquals(buffer["2"], finalBookStates["2"]) diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/situations/CheckpointingSituations.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/situations/CheckpointingSituations.kt index 526e6ec..8eed20a 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/situations/CheckpointingSituations.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/situations/CheckpointingSituations.kt @@ -14,6 +14,7 @@ import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.assertThrows import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.milliseconds class CheckpointingSituations { @@ -62,7 +63,7 @@ class CheckpointingSituations { } store.publish(bookstoreEventCatalog.create(BookAddedEvent("1", "", "", 1.0, 1))) store.publish(bookstoreEventCatalog.create(BookAddedEvent("1", "", "", 1.0, 1))) - delay(50) + delay(50.milliseconds) job.cancelAndJoin() store.publish(bookstoreEventCatalog.create(BookAddedEvent("1", "", "", 1.0, 1))) assertEquals(counter, 2) diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJobTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJobTest.kt index c8f5ba8..dc11f2f 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJobTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/BalancedReadModelJobTest.kt @@ -8,6 +8,7 @@ import org.junit.jupiter.api.assertThrows import java.util.concurrent.atomic.AtomicInteger import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.milliseconds import kotlin.time.DurationUnit import kotlin.time.toDuration @@ -15,7 +16,7 @@ class BalancedReadModelJobTest { @Test fun `BalancedReadModelJob processes bookstore events`() = runBlocking { - withTimeout(1000) { + withTimeout(1000.milliseconds) { val source = InMemoryEventStore(bookstoreSimulatedEventStream.toMutableList()) val target = mutableMapOf() @@ -36,7 +37,7 @@ class BalancedReadModelJobTest { val supJob = async { supervisor.execute() } // Give the supervisor time to perform catchup and start streaming - delay(500) + delay(500.milliseconds) // Stop the supervisor and ensure it finished supJob.cancelAndJoin() @@ -49,7 +50,7 @@ class BalancedReadModelJobTest { @Test fun `BalancedReadModelJob panics after many crashes`(): Unit = runBlocking { - withTimeout(1000) { + withTimeout(1000.milliseconds) { val source = InMemoryEventStore(bookstoreSimulatedEventStream.toMutableList()) val target = mutableMapOf() diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/ModelSupervisorTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/ModelSupervisorTest.kt index 61ebb09..ebb211f 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/ModelSupervisorTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/supervisor/ModelSupervisorTest.kt @@ -1,20 +1,70 @@ package dev.helight.krescent.supervisor +import dev.helight.krescent.bookstore.BooksAvailableReadModel +import dev.helight.krescent.bookstore.bookstoreSimulatedEventStream +import dev.helight.krescent.event.logging.ConsoleLoggingEventStreamProcessor.Companion.useConsoleLogging +import dev.helight.krescent.model.EventModelBase.Extension.withConfiguration +import dev.helight.krescent.model.ReadModelBase +import dev.helight.krescent.source.impl.InMemoryEventStore +import dev.helight.krescent.source.impl.SimulatedDelayStreamingEventSource import kotlinx.coroutines.* import kotlinx.coroutines.sync.Mutex +import java.util.concurrent.atomic.AtomicInteger import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.milliseconds class ModelSupervisorTest { + @Test + fun `Test the new awaiting mechanism`() = runBlocking { + val supervisor = ModelSupervisor() + val job = CrashingModelJob(timesToCrash = 1) + supervisor.register(job) + + val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + supervisor.launch(scope) + assertEquals(0, job.timesToCrash) + assertEquals(1, job.timesCompleted) + assertEquals(1, job.timesFailCaught) + scope.cancel() + } + + @Test + fun `Test with bookstore ReadModel`() = runBlocking { + val supervisor = ModelSupervisor() + val crashCount = AtomicInteger(1) + supervisor.register( + BalancedReadModelJob( + modelSupplier = { + BooksAvailableReadModel(crashCount = crashCount).withConfiguration { + useConsoleLogging() + } + }, + sourceSupplier = { + SimulatedDelayStreamingEventSource( + 100, 1, + InMemoryEventStore(bookstoreSimulatedEventStream.toMutableList()), + ) + + } + ) + ) + val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default) + supervisor.launch(scope) + val model = supervisor.modelOf()!! + assertEquals(9, model.target["1"]) + + } + @Test fun `Test restarting after a crash`() = runBlocking { - withTimeout(500) { + withTimeout(500.milliseconds) { val supervisor = ModelSupervisor() val job = CrashingModelJob(timesToCrash = 1) supervisor.register(job) val supervisorJob = async { supervisor.execute() } - delay(100) + delay(100.milliseconds) supervisorJob.cancelAndJoin() assertEquals(0, job.timesToCrash) assertEquals(1, job.timesCompleted) @@ -24,18 +74,18 @@ class ModelSupervisorTest { @Test fun `Test preconditions using a locking job`() = runBlocking { - withTimeout(1000) { + withTimeout(1000.milliseconds) { val supervisor = ModelSupervisor(1) val mutex = Mutex() val jobs = List(5) { MutexClaimingJob(mutex) } supervisor.register(jobs) val supervisorJob = async { supervisor.execute() } - delay(25) + delay(25.milliseconds) assertEquals(1, jobs.count { it.hasLocked }) assertEquals(0, jobs.count { it.hasCompleted }) - delay(50) + delay(50.milliseconds) assertEquals(1, jobs.count { it.hasCompleted }) - delay(300) + delay(300.milliseconds) supervisorJob.cancelAndJoin() assertEquals(5, jobs.count { it.hasCompleted }) assertEquals(5, jobs.count { it.hasLocked }) @@ -57,13 +107,16 @@ class CrashingModelJob( throw RuntimeException("Crash!") } timesCompleted++ - delay(1000) - println() + delay(1000.milliseconds) + println("Exit") } override suspend fun onFailed(supervisor: ModelSupervisor, error: Throwable) { timesFailCaught++ } + + override val current: ReadModelBase? + get() = null } class MutexClaimingJob( @@ -78,9 +131,12 @@ class MutexClaimingJob( override suspend fun run(supervisor: ModelSupervisor) { hasLocked = true - delay(50) + delay(50.milliseconds) mutex.unlock() hasCompleted = true - delay(1000) + delay(1000.milliseconds) } + + override val current: ReadModelBase? + get() = null } \ No newline at end of file diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/synchronization/LocalLockProviderTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/synchronization/LocalLockProviderTest.kt index 83a26e8..9453c1e 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/synchronization/LocalLockProviderTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/synchronization/LocalLockProviderTest.kt @@ -8,6 +8,7 @@ import kotlinx.coroutines.runBlocking import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.milliseconds class LocalLockProviderTest : KrescentLockProviderContract { @@ -36,13 +37,13 @@ class LocalLockProviderTest : KrescentLockProviderContract { async { val lock = provider.getLock("test-identity") lock.runGuarded { - delay(50) + delay(50.milliseconds) } }, async { val lock = provider.getLock("another-identity") lock.runGuarded { - delay(50) + delay(50.milliseconds) } } ).awaitAll() @@ -50,7 +51,7 @@ class LocalLockProviderTest : KrescentLockProviderContract { assertTrue(timeAfter - timeStart < 95) System.gc() - delay(100) + delay(100.milliseconds) provider.cleanup() assertEquals(provider.locks.size, 1) diff --git a/krescent-core/src/test/kotlin/dev/helight/krescent/synchronization/LocalSharedLockProviderTest.kt b/krescent-core/src/test/kotlin/dev/helight/krescent/synchronization/LocalSharedLockProviderTest.kt index ba77b49..b9a4061 100644 --- a/krescent-core/src/test/kotlin/dev/helight/krescent/synchronization/LocalSharedLockProviderTest.kt +++ b/krescent-core/src/test/kotlin/dev/helight/krescent/synchronization/LocalSharedLockProviderTest.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.junit.jupiter.api.Test import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.milliseconds class LocalSharedLockProviderTest { @@ -18,13 +19,13 @@ class LocalSharedLockProviderTest { async { val lock = provider.getLock("test-identity") lock.runGuarded { - delay(50) + delay(50.milliseconds) } }, async { val lock = provider.getLock("test-identity") lock.runGuarded { - delay(50) + delay(50.milliseconds) } } ).awaitAll() @@ -40,13 +41,13 @@ class LocalSharedLockProviderTest { async { val lock = provider.getLock("test-identity") lock.runGuarded { - delay(50) + delay(50.milliseconds) } }, async { val lock = provider.getLock("another-identity") lock.runGuarded { - delay(50) + delay(50.milliseconds) } } ).awaitAll() @@ -62,13 +63,13 @@ class LocalSharedLockProviderTest { async { val lock = provider.getMultiLock("a", "b") lock.runGuarded { - delay(50) + delay(50.milliseconds) } }, async { val lock = provider.getMultiLock("x", "y") lock.runGuarded { - delay(50) + delay(50.milliseconds) } } ).awaitAll()