Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ build/
!**/src/test/**/build/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
.idea/
*.iws
*.iml
*.ipr
Expand Down
12 changes: 0 additions & 12 deletions .idea/.gitignore

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/AndroidProjectSystem.xml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/copilot.data.migration.agent.xml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/copilot.data.migration.edit.xml

This file was deleted.

17 changes: 0 additions & 17 deletions .idea/dataSources.xml

This file was deleted.

14 changes: 0 additions & 14 deletions .idea/dictionaries/project.xml

This file was deleted.

37 changes: 0 additions & 37 deletions .idea/gradle.xml

This file was deleted.

7 changes: 0 additions & 7 deletions .idea/kotlinc.xml

This file was deleted.

17 changes: 0 additions & 17 deletions .idea/misc.xml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/sqldialects.xml

This file was deleted.

6 changes: 0 additions & 6 deletions .idea/vcs.xml

This file was deleted.

8 changes: 4 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
[versions]
kotlin = "2.1.20"
kotlin = "2.3.0"
kotlinxDatetime = "0.6.1"
kotlinxSerialization = "1.7.3"
kotlinxCoroutines = "1.10.2"
kurrentClient = "1.0.1"
orgReactiveStream = "1.0.4"
mongoDriver = "5.5.0"
testContainers = "1.21.4"
exposed = "0.59.0"
postgresql = "42.7.3"
testContainers = "1.21.0"
exposed = "1.3.0"
postgresql = "42.7.11"
slf4j = "2.0.9"
redisson = "3.52.0"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package dev.helight.krescent.exposed
import dev.helight.krescent.checkpoint.CheckpointBucket
import dev.helight.krescent.checkpoint.CheckpointStorage
import dev.helight.krescent.checkpoint.StoredCheckpoint
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import kotlinx.datetime.toDeprecatedInstant
import kotlinx.datetime.toStdlibInstant
import org.jetbrains.exposed.v1.core.eq
import org.jetbrains.exposed.v1.jdbc.*
import kotlin.time.ExperimentalTime

class ExposedCheckpointStorage(
val database: Database,
Expand All @@ -15,23 +18,25 @@ class ExposedCheckpointStorage(
table.create(database)
}

@OptIn(ExperimentalTime::class)
override suspend fun storeCheckpoint(checkpoint: StoredCheckpoint): Unit = jdbcSuspendTransaction(database) {
table.upsert(keys = arrayOf(table.namespace)) {
it[namespace] = checkpoint.namespace
it[position] = checkpoint.position
it[timestamp] = checkpoint.timestamp
it[timestamp] = checkpoint.timestamp.toStdlibInstant()
it[version] = checkpoint.version
it[data] = checkpoint.data.encodeToByteArray()
}
}

@OptIn(ExperimentalTime::class)
override suspend fun getLatestCheckpoint(namespace: String): StoredCheckpoint? = jdbcSuspendTransaction(database) {
table.selectAll().where { table.namespace eq namespace }.firstOrNull()?.let {
StoredCheckpoint(
namespace = it[table.namespace],
position = it[table.position],
version = it[table.version],
timestamp = it[table.timestamp],
timestamp = it[table.timestamp].toDeprecatedInstant(),
data = CheckpointBucket.fromByteArray(it[table.data])
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,26 @@ package dev.helight.krescent.exposed

import dev.helight.krescent.event.EventMessage
import dev.helight.krescent.source.EventPublisher
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.insert
import java.util.*
import kotlinx.datetime.toStdlibInstant
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.insert
import kotlin.time.ExperimentalTime
import kotlin.uuid.ExperimentalUuidApi
import kotlin.uuid.Uuid

class ExposedEventPublisher(
val database: Database,
val streamId: String,
val table: KrescentEventLogTable = KrescentEventLogTable(),
) : EventPublisher {
@OptIn(ExperimentalTime::class)
@OptIn(ExperimentalTime::class, ExperimentalUuidApi::class)
override suspend fun publish(event: EventMessage) {
jdbcSuspendTransaction(database) {
table.insert {
it[uid] = UUID.fromString(event.id)
it[uid] = Uuid.parse(event.id)
it[streamId] = this@ExposedEventPublisher.streamId
it[type] = event.type
it[timestamp] = event.timestamp
it[timestamp] = event.timestamp.toStdlibInstant()
it[data] = event.payload
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@ import dev.helight.krescent.source.StoredEventSource
import dev.helight.krescent.source.StreamingToken
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq
import org.jetbrains.exposed.sql.SqlExpressionBuilder.like
import org.jetbrains.exposed.sql.SqlExpressionBuilder.regexp
import org.jetbrains.exposed.sql.json.contains
import kotlinx.datetime.toDeprecatedInstant
import org.jetbrains.exposed.v1.core.ResultRow
import org.jetbrains.exposed.v1.core.SortOrder
import org.jetbrains.exposed.v1.core.eq
import org.jetbrains.exposed.v1.core.inList
import org.jetbrains.exposed.v1.core.like
import org.jetbrains.exposed.v1.core.regexp
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.Query
import org.jetbrains.exposed.v1.jdbc.andWhere
import org.jetbrains.exposed.v1.jdbc.select
import org.jetbrains.exposed.v1.json.contains
import kotlin.math.min
import kotlin.time.ExperimentalTime
import kotlin.uuid.ExperimentalUuidApi

@OptIn(ExperimentalTime::class)
class ExposedEventSource(
Expand Down Expand Up @@ -67,12 +75,13 @@ class ExposedEventSource(
}
}

@OptIn(ExperimentalUuidApi::class)
private fun mapRowToPair(row: ResultRow): Pair<EventMessage, ExposedStreamingToken> {
val event = EventMessage(
id = row[table.uid].toString(),
type = row[table.type],
timestamp = row[table.timestamp],
payload = row[table.data],
timestamp = row[table.timestamp].toDeprecatedInstant(),
payload = row[table.data] ,
)
val token = ExposedStreamingToken.PositionToken(row[table.id].value)
return event to token
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package dev.helight.krescent.exposed

import dev.helight.krescent.source.StreamingToken
import org.jetbrains.exposed.sql.Query
import org.jetbrains.exposed.sql.andWhere
import org.jetbrains.exposed.sql.selectAll
import org.jetbrains.exposed.v1.core.greater
import org.jetbrains.exposed.v1.jdbc.Query
import org.jetbrains.exposed.v1.jdbc.andWhere
import org.jetbrains.exposed.v1.jdbc.selectAll

sealed class ExposedStreamingToken : StreamingToken<ExposedStreamingToken> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.decodeFromJsonElement
import kotlinx.serialization.json.encodeToJsonElement
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.v1.core.Table
import org.jetbrains.exposed.v1.core.Transaction
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.SchemaUtils
import org.jetbrains.exposed.v1.jdbc.exists
import java.util.logging.Logger

class ExposedTableProjector(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,27 @@ package dev.helight.krescent.exposed

import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonElement
import org.jetbrains.exposed.dao.id.LongIdTable
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.json.json
import org.jetbrains.exposed.sql.json.jsonb
import org.jetbrains.exposed.sql.kotlin.datetime.KotlinInstantColumnType
import org.jetbrains.exposed.sql.upsert
import org.jetbrains.exposed.v1.core.Table
import org.jetbrains.exposed.v1.core.dao.id.LongIdTable
import org.jetbrains.exposed.v1.core.eq
import org.jetbrains.exposed.v1.datetime.KotlinInstantColumnType
import org.jetbrains.exposed.v1.datetime.timestamp
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.SchemaUtils
import org.jetbrains.exposed.v1.jdbc.select
import org.jetbrains.exposed.v1.jdbc.upsert
import org.jetbrains.exposed.v1.json.json
import org.jetbrains.exposed.v1.json.jsonb
import kotlin.time.ExperimentalTime
import kotlin.uuid.ExperimentalUuidApi

@OptIn(ExperimentalTime::class)
class KrescentEventLogTable(tableName: String = "krescent") : LongIdTable(tableName) {
@OptIn(ExperimentalUuidApi::class)
val uid = uuid("uuid").uniqueIndex()
val streamId = text("streamId").index()
val type = text("type").index()
val timestamp = registerColumn("timestamp", KotlinInstantColumnType()).index()
val timestamp = timestamp("timestamp").index()

val data = jsonb("data", {
Json.encodeToString(it)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package dev.helight.krescent.exposed

import kotlinx.coroutines.coroutineScope
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
import org.jetbrains.exposed.v1.core.Transaction
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.transactions.TransactionManager
import org.jetbrains.exposed.v1.jdbc.transactions.suspendTransaction

internal suspend fun <T> jdbcSuspendTransaction(
database: Database,
Expand All @@ -16,7 +16,7 @@ internal suspend fun <T> jdbcSuspendTransaction(
?.takeIf { it.db == database }
?.let { return@coroutineScope statement.invoke(it) }

return@coroutineScope newSuspendedTransaction(db = database) {
statement.invoke(this@newSuspendedTransaction)
return@coroutineScope suspendTransaction(db = database) {
statement.invoke(this@suspendTransaction)
}
}
Loading
Loading