#181 Add support for DynamoDB as the storage of bookkeeping, schemas, locks, journal and offset information (#730)
Walkthrough: This pull request adds comprehensive DynamoDB support to Pramen's bookkeeping subsystem. It introduces DynamoDB-backed implementations for bookkeeping, offset management, journal, metadata storage, and distributed locking. Core traits are updated to extend `AutoCloseable`.
Sequence Diagram

```mermaid
sequenceDiagram
    participant App as Application
    participant BookConfig as BookkeeperConfig
    participant Bookkeeper as Bookkeeper Factory
    participant OffsetMgr as OffsetManagerDynamoDb
    participant Journal as JournalDynamoDB
    participant TokenLock as TokenLockFactoryDynamoDb
    participant DynamoDB as AWS DynamoDB

    App->>BookConfig: Load config with dynamoDbRegion
    BookConfig->>BookConfig: Parse DynamoDB settings
    App->>Bookkeeper: fromConfig(config)
    Bookkeeper->>Bookkeeper: Check hasBookkeepingDynamoDb
    Bookkeeper->>OffsetMgr: Build DynamoDB instance
    OffsetMgr->>DynamoDB: DescribeTable (offsets)
    DynamoDB-->>OffsetMgr: Table not found
    OffsetMgr->>DynamoDB: CreateTable (offsets)
    OffsetMgr->>DynamoDB: Poll until ACTIVE
    DynamoDB-->>OffsetMgr: Table ready
    Bookkeeper->>Journal: Build DynamoDB instance
    Journal->>DynamoDB: DescribeTable (journal)
    DynamoDB-->>Journal: Table not found
    Journal->>DynamoDB: CreateTable (journal)
    Journal->>DynamoDB: Poll until ACTIVE
    DynamoDB-->>Journal: Table ready
    Bookkeeper->>TokenLock: Build DynamoDB instance
    TokenLock->>DynamoDB: DescribeTable (locks)
    DynamoDB-->>TokenLock: Table not found
    TokenLock->>DynamoDB: CreateTable (locks)
    TokenLock->>DynamoDB: Poll until ACTIVE
    DynamoDB-->>TokenLock: Table ready
    Bookkeeper-->>App: Bookkeeper instance (all components initialized)
    Note over OffsetMgr,TokenLock: All extend AutoCloseable
    App->>Bookkeeper: close()
    Bookkeeper->>OffsetMgr: close()
    Bookkeeper->>Journal: close()
    Bookkeeper->>TokenLock: close()
    OffsetMgr->>DynamoDB: Close client
    Journal->>DynamoDB: Close client
    TokenLock->>DynamoDB: Close client
```
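The DescribeTable → CreateTable → poll-until-ACTIVE handshake that each component performs in the diagram can be sketched as pure logic. This is an illustrative sketch only: `describeStatus` and `createTable` are hypothetical stand-ins for the SDK calls, not names from the PR.

```scala
// Sketch of the ensure-table handshake from the diagram: check whether the
// table exists, create it if not, then poll until it reports ACTIVE.
// `describeStatus` abstracts the DynamoDB DescribeTable call (hypothetical):
// it returns None when the table does not exist, Some(status) otherwise.
def ensureTableActive(describeStatus: () => Option[String],
                      createTable: () => Unit,
                      maxPolls: Int = 30): Boolean = {
  if (describeStatus().isEmpty) createTable() // "Table not found" branch
  // Poll until the table reports ACTIVE or we give up after maxPolls attempts.
  Iterator.continually(describeStatus())
    .take(maxPolls)
    .exists(_.contains("ACTIVE"))
}
```

In the real implementations a failed wait would presumably surface as an error rather than a boolean, but the control flow is the same.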
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
Actionable comments posted: 15
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala (1)
36-92: 🛠️ Refactor suggestion | 🟠 Major: Don't hide offset-manager cleanup behind a trait-level no-op.

`OffsetManagerDynamoDb` now has real shutdown logic, while `OffsetManagerJdbc` will silently inherit this default. Keeping `close()` abstract here, or forcing explicit no-op overrides where ownership is delegated, makes resource semantics intentional instead of accidental.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala` around lines 36 - 92, The trait OffsetManager currently provides a no-op default close() implementation which hides required cleanup; remove the default `override def close(): Unit = {}` from the OffsetManager trait so close() is abstract, and then implement explicit close() methods in concrete classes (e.g., OffsetManagerDynamoDb and OffsetManagerJdbc) to ensure resources are cleaned up or intentionally no-oped; update the implementations to compile by adding close() where missing.

pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala (1)
21-74: 🛠️ Refactor suggestion | 🟠 Major: Keep `close()` explicit on resource-owning implementations.

Making `close()` a concrete no-op here removes the compiler check that concrete managers must opt into their lifecycle semantics. With `pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerBase.scala:1-50` sitting under both JDBC and DynamoDB managers, a missing override now silently degrades into "do nothing" instead of failing fast at compile time.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala` around lines 21 - 74, The trait MetadataManager currently provides a concrete no-op close() which prevents the compiler from forcing resource-owning implementations to declare lifecycle behavior; change the signature in MetadataManager from "override def close(): Unit = {}" to an abstract "def close(): Unit" (i.e. remove the default body/override) so concrete classes must explicitly implement/override close; also update any implementations (e.g. classes extending MetadataManagerBase under JDBC/DynamoDB) that rely on the implicit no-op to add an explicit close implementation (even if it's a no-op) to make lifecycle intent explicit.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala (1)
236-243: ⚠️ Potential issue | 🔴 Critical: Add `bookkeeper.close()` to the aggregate closer.

The `closable` block at lines 236-243 should also call `bookkeeper.close()`. Concrete implementations like `BookkeeperDynamoDb` and `BookkeeperJdbc` have meaningful `close()` logic that manages critical resources (DynamoDB client, database connections). Without this call, these resources will leak when `closable.close()` is invoked.

Relevant code

```scala
val closable = new AutoCloseable {
  override def close(): Unit = {
    mongoDbConnection.foreach(_.close())
    dbOpt.foreach(_.close())
    tokenFactory.close()
    journal.close()
    metadataManager.close()
    bookkeeper.close() // Add this line
  }
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala` around lines 236 - 243, The closable AutoCloseable currently omits calling bookkeeper.close(), which leaks resources for implementations like BookkeeperDynamoDb and BookkeeperJdbc; update the anonymous AutoCloseable implementation (closable) to invoke bookkeeper.close() inside its close() method after closing mongoDbConnection and dbOpt and before/after tokenFactory.close(), journal.close(), and metadataManager.close(), ensuring bookkeeper's concrete resources are released.
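Across these `close()`-related findings, the recommended shape is to keep `close()` abstract on resource-owning traits so every implementation declares its lifecycle intent. A minimal sketch with illustrative class names (not Pramen's actual types):

```scala
// Sketch: keep close() abstract on the trait so every implementation must
// declare its lifecycle intent explicitly. Names here are hypothetical.
trait OffsetManagerLike extends AutoCloseable {
  def close(): Unit // abstract: no silent no-op inherited by implementations
}

// An implementation that owns a client must release it...
class DynamoDbBackedManager(client: AutoCloseable) extends OffsetManagerLike {
  override def close(): Unit = client.close()
}

// ...while one that delegates ownership opts into a no-op on purpose.
class JdbcBackedManager extends OffsetManagerLike {
  override def close(): Unit = {} // intentional: connection owned elsewhere
}
```

With the abstract member, forgetting to implement `close()` becomes a compile error rather than a silently leaked resource.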
🧹 Nitpick comments (5)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala (1)
25-97: These tests only prove the fluent API compiles.

Each positive case asserts `isInstanceOf[MetadataManagerDynamoDbBuilder]`, which the method signatures already guarantee. Because none of them call `build()`, malformed values like the ARN on Line 43 still pass and the default-prefix / endpoint / credentials wiring stays untested.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala` around lines 25 - 97, The tests only check that the fluent builder methods compile; fix them to exercise build() and assert resulting instance configuration: call MetadataManagerDynamoDb.builder, chain the relevant setters (withRegion, withTableArn, withTablePrefix, withCredentialsProvider, withEndpoint) then call build() and assert the built MetadataManagerDynamoDb (or its accessible config) contains the expected region, tableArn/tablePrefix, endpoint and credentials provider values; keep the existing negative test that intercepts build() without region but change the positive cases to call build() and verify actual wiring rather than isInstanceOf checks.

pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala (1)
105-112: Use the lock factory's own default prefix constant here.

Line 106 is building `TokenLockFactoryDynamoDb` but falls back to `BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX`. If those defaults ever diverge, the lock backend will quietly start using a different default naming convention from the one defined by the lock implementation itself.

Suggested fix

```diff
- val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX)
+ val tablePrefix = bookkeepingConfig.dynamoDbTablePrefix.getOrElse(TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX)
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala` around lines 105 - 112, The code currently falls back to BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX when building TokenLockFactoryDynamoDb, which can diverge from the lock factory's own default; change the fallback to use TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX instead (i.e. replace the getOrElse(BookkeeperDynamoDb.DEFAULT_TABLE_PREFIX) usage on bookkeepingConfig.dynamoDbTablePrefix with getOrElse(TokenLockFactoryDynamoDb.DEFAULT_TABLE_PREFIX)) so the TokenLockFactoryDynamoDb builder uses its native default prefix.

pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala (1)
25-97: These assertions don't exercise the journal builder.

The positive cases only check the static return type, so they won't catch broken default-prefix wiring or malformed inputs such as the ARN on Line 43. Prefer one or two end-to-end `build()` smoke tests and keep the negative validation case.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala` around lines 25 - 97, Replace the superficial type-only assertions with one or two end-to-end smoke tests that call JournalDynamoDB.builder...build() to verify real behavior: e.g. construct a builder via JournalDynamoDB.builder.withRegion(...) (and optionally withTableArn/withTablePrefix/withEndpoint/withCredentialsProvider), call build(), then assert the resulting JournalDynamoDB instance has the expected tablePrefix (default when not provided), correctly parsed tableArn handling, and any endpoint/credentials set; keep the existing negative test that intercepts IllegalArgumentException from builder.build() when region is missing. Use the builder methods (withRegion, withTableArn, withTablePrefix, withCredentialsProvider, withEndpoint) and the build() method to locate where to change tests.

pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala (2)
142-164: Consider adding owner verification in updateTicket.

The `updateTicket` method updates the expiration without verifying ownership. If another process has taken over the lock (e.g., after expiration), this could inadvertently extend the other process's lock. Adding a condition expression would make this safer:

🛡️ Suggested improvement

```diff
 val updateRequest = UpdateItemRequest.builder()
   .tableName(tableName)
   .key(Map(
     ATTR_TOKEN -> AttributeValue.builder().s(escapedToken).build()
   ).asJava)
   .updateExpression(s"SET $ATTR_EXPIRES = :expires")
+  .conditionExpression(s"$ATTR_OWNER = :owner")
   .expressionAttributeValues(Map(
-    ":expires" -> AttributeValue.builder().n(newTicket.toString).build()
+    ":expires" -> AttributeValue.builder().n(newTicket.toString).build(),
+    ":owner" -> AttributeValue.builder().s(owner).build()
   ).asJava)
   .build()
```

This prevents a rare race condition where a lock expires, is taken by another process, and the original holder's watcher thread updates the expiration.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala` around lines 142 - 164, The updateTicket method currently unconditionally sets ATTR_EXPIRES which can extend another process's lock; modify updateTicket (and the UpdateItemRequest built there) to include a condition expression that verifies ownership (e.g., compare ATTR_OWNER or the token holder attribute to the current owner/id) so the update only succeeds if the lock is still held by this process, and add handling for the conditional-failure case (do not treat it as success; log or ignore the ConditionalCheckFailedException) so the watcher doesn't silently extend someone else's lock; refer to updateTicket, getNewTicket, escapedToken, ATTR_TOKEN and ATTR_EXPIRES to locate the change.
178-185: Add null-safety check for required DynamoDB item attributes.

The code assumes `ATTR_EXPIRES` and `ATTR_OWNER` will always be present in the item. If the DynamoDB item is malformed (e.g., created by a different version or manually edited), accessing these attributes could throw `NullPointerException`.

Consider adding defensive null checks:

🛡️ Proposed defensive fix

```diff
 if (response.hasItem && !response.item().isEmpty) {
   val item = response.item()
+  val tokenAttr = item.get(ATTR_TOKEN)
+  val ownerAttr = item.get(ATTR_OWNER)
+  val expiresAttr = item.get(ATTR_EXPIRES)
+
+  if (tokenAttr == null || ownerAttr == null || expiresAttr == null) {
+    log.warn(s"Malformed lock item for $escapedToken - missing required attributes")
+    return None
+  }
+
   Some(LockTicket(
-    token = item.get(ATTR_TOKEN).s(),
-    owner = item.get(ATTR_OWNER).s(),
-    expires = item.get(ATTR_EXPIRES).n().toLong,
+    token = tokenAttr.s(),
+    owner = ownerAttr.s(),
+    expires = expiresAttr.n().toLong,
     createdAt = Option(item.get(ATTR_CREATED_AT)).map(_.n().toLong)
   ))
```
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala` around lines 178 - 185, The DynamoDB item handling in TokenLockDynamoDb assumes ATTR_EXPIRES, ATTR_OWNER and ATTR_TOKEN are always present; add defensive null-safety checks before constructing the LockTicket: verify item.containsKey or wrap item.get(ATTR_...) in Option/nullable checks for ATTR_TOKEN, ATTR_OWNER and ATTR_EXPIRES and only construct Some(LockTicket(...)) if those required attributes are present and non-empty; treat ATTR_CREATED_AT as optional (keep current Option mapping), and if required attrs are missing log a warning and return None (or otherwise fail gracefully) instead of calling .s()/.n() on null.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pramen/core/pom.xml`:
- Around line 147-151: The Maven dependency for the AWS SDK DynamoDB
(groupId=software.amazon.awssdk, artifactId=dynamodb) is missing a scope and
defaults to compile, causing Maven builds to bundle the SDK; update that
dependency entry to include <scope>provided</scope> so it matches the SBT
Dependencies.scala Provided scope and prevents packaging the AWS SDK into the
artifact.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`:
- Around line 495-499: The BookkeeperDynamoDb.deleteTable currently throws
UnsupportedOperationException which violates the Bookkeeper contract; implement
wildcard deletion inside BookkeeperDynamoDb.deleteTable by scanning the DynamoDB
bookkeeper table for items whose table/name attribute matches the provided
wildcard (convert wildcard to a prefix/regex or Dynamo-compatible
FilterExpression), paginate through Scan results, delete matching items using
DeleteItem or BatchWriteItem for efficiency, collect and return the deleted item
names as Seq[String], and ensure proper error handling and logging;
alternatively, if you prefer to disallow wildcards for DynamoDB, move this check
to the BookkeeperDynamoDb constructor/factory and throw a configuration-time
IllegalArgumentException with a clear message so callers fail fast.
- Around line 66-68: The init() method currently creates/checks tables using
bookkeepingTableBaseName and schemaTableBaseName which mismatches the resolved
names used by all CRUDs; update init() to use the resolved private vals
bookkeepingTableName and schemaTableName instead (replace usages of
bookkeepingTableBaseName and schemaTableBaseName in init()), ensuring any calls
that pass names into ensureTable/describe/createTable use bookkeepingTableName
and schemaTableName so the created tables match the identifiers used by the CRUD
methods.
- Around line 141-165: The createBookkeepingTable function currently builds a
DynamoDB key schema with ATTR_TABLE_NAME (HASH) and ATTR_INFO_DATE (RANGE),
which enforces one item per table/date and causes overwrites; modify
createBookkeepingTable to include ATTR_BATCH_ID as an additional sort key
component (or change the sort key to a composite that includes ATTR_INFO_DATE
and ATTR_BATCH_ID) so the primary key uniquely identifies (tableName, infoDate,
batchId). Update attributeDefinitions to include ATTR_BATCH_ID with
ScalarAttributeType.S and adjust any code that assumes the old two-part key
accordingly (references: createBookkeepingTable, ATTR_TABLE_NAME,
ATTR_INFO_DATE, ATTR_BATCH_ID).
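The composite sort key suggested in the item above can be sketched as plain string logic. The `#` separator is an assumption for illustration; any character that cannot appear in a date works.

```scala
// Sketch: a composite sort key that uniquely identifies (infoDate, batchId),
// so two batches written for the same info date no longer overwrite each
// other under a (tableName, infoDate) primary key.
def makeSortKey(infoDate: String, batchId: Long): String =
  s"$infoDate#$batchId"

// Inverse operation, splitting on the last separator.
def parseSortKey(sortKey: String): (String, Long) = {
  val sep = sortKey.lastIndexOf('#')
  (sortKey.substring(0, sep), sortKey.substring(sep + 1).toLong)
}
```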
- Around line 838-848: The getFullTableName method builds incorrect ARNs by
checking "/table/" and appending "table/" when the ARN already ends with a
slash; update the logic in getFullTableName to use the DynamoDB standard token
":table/" (not "/table/"), and handle three cases: if arn contains ":table/" and
endsWith("/") then return arn + tableName (append only the name), if arn
contains ":table/" and does not endWith("/") then assume it already includes the
table name and return arn as-is, otherwise append ":table/" + tableName to the
provided arn; update the matching branches in getFullTableName accordingly to
avoid producing "table/table" sequences.
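The three-case ARN resolution described in the item above can be sketched as follows. This is a hypothetical reimplementation of the suggested fix, not the PR's actual `getFullTableName`:

```scala
// Sketch of the corrected ARN resolution: key off the DynamoDB ":table/"
// token (not "/table/") and never produce "table/table" sequences.
def getFullTableName(arn: String, tableName: String): String =
  if (arn.contains(":table/") && arn.endsWith("/")) arn + tableName // append only the name
  else if (arn.contains(":table/")) arn                             // ARN already names a table
  else s"$arn:table/$tableName"                                     // bare ARN: add the token
```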
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala`:
- Around line 123-126: The composite primary key built in startWriteOffsets uses
createdAt.toEpochMilli (createdAtMilli) which can collide for concurrent writes;
change the key generation so it cannot repeat within the same millisecond—for
example, stop reducing Instant to millis and instead incorporate the full
Instant string (createdAt.toString which includes nanoseconds) or append a short
random/UUID suffix to compositeKey (e.g., use createdAt and a UUID) so
compositeKey is unique for concurrent calls to startWriteOffsets and prevents
silent overwrites/races.
- Around line 301-317: getOffsetRecords and getMaximumInfoDate currently query
DynamoDB without handling pagination and use a filterExpression on
ATTR_INFO_DATE, which can miss results when a partition >1MB; update both
methods (getOffsetRecords, getMaximumInfoDate) to iterate pages using the
QueryResponse.lastEvaluatedKey and repeatedly call dynamoDbClient.query until
lastEvaluatedKey is empty, accumulating result.items(); also change the query
keyConditionExpression to use begins_with on the composite sort key (e.g.,
compositeKey begins_with "<infoDate>#") instead of filtering by ATTR_INFO_DATE
to limit data scanned (use expressionAttributeValues for the begins_with prefix)
and remove the post-query filter. Ensure QueryRequest.builder() is built
per-page with ExclusiveStartKey when present and that itemToOffsetRecord is
applied to all accumulated items.
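The pagination loop requested here (and in the metadata manager comment below) has the same shape wherever it appears: keep issuing the query with the previous page's `lastEvaluatedKey` until none is returned. A sketch with the DynamoDB call abstracted away; `fetchPage` is a hypothetical stand-in for `dynamoDbClient.query`, taking an optional exclusive start key and returning the page's items plus the next start key:

```scala
import scala.annotation.tailrec

// Sketch: accumulate items across all pages of a DynamoDB-style query.
// fetchPage(startKey) returns (items, lastEvaluatedKey); a None key means
// this was the final page.
def queryAllPages[K, I](fetchPage: Option[K] => (Seq[I], Option[K])): Seq[I] = {
  @tailrec
  def loop(startKey: Option[K], acc: Seq[I]): Seq[I] = {
    val (items, nextKey) = fetchPage(startKey)
    nextKey match {
      case Some(_) => loop(nextKey, acc ++ items) // more pages: continue
      case None    => acc ++ items                // final page: done
    }
  }
  loop(None, Seq.empty)
}
```

In the real code each page would rebuild the `QueryRequest` with `exclusiveStartKey` set when present, exactly as the prompt describes.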
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala`:
- Around line 78-80: The current JournalDynamoDB primary key uses
(ATTR_JOB_NAME, ATTR_FINISHED_AT) causing collisions and preventing efficient
time-range reads; change the write path that builds items (where itemBuilder is
populated and putItem is called) to include a unique per-entry identifier (e.g.,
UUID or nano-timestamp) as a new attribute and include that identifier in the
primary key so simultaneous finishes do not overwrite each other; add a Global
Secondary Index on ATTR_FINISHED_AT (or a numeric finishedAt attribute) so
getEntries can perform a Query over finishedAt ranges instead of a full table
Scan; where getEntries currently performs an unpaginated ScanRequest, replace it
with either a paginated Scan loop that uses ExclusiveStartKey/LastEvaluatedKey
to fetch all pages or — preferably — change getEntries to Query the new GSI with
KeyConditionExpression for the finishedAt range and handle Query pagination
similarly.
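The unique per-entry identifier suggested for the journal (and, similarly, for the offset manager's composite key) could be built like this. The key layout and `#` separator are assumptions for illustration:

```scala
import java.time.Instant
import java.util.UUID

// Sketch: a collision-free sort key for journal/offset items. Combining the
// full Instant (nanosecond precision) with a random UUID means two entries
// written in the same millisecond still get distinct keys, so simultaneous
// finishes cannot overwrite each other.
def uniqueEntryKey(partition: String, createdAt: Instant): String =
  s"$partition#${createdAt.toString}#${UUID.randomUUID()}"
```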
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala`:
- Around line 51-53: TokenLockFactoryDynamoDb builds locksTableName via
BookkeeperDynamoDb.getFullTableName(tableArn, locksTableBaseName) but subsequent
DynamoDB calls still use locksTableBaseName, which breaks cross-account/region
behavior; update all usages in TokenLockFactoryDynamoDb (including table
creation/reads/writes around the init/ensure block and the sections currently
referencing locksTableBaseName at lines ~58–60 and ~83–85) to use locksTableName
instead so that all table operations consistently use the resolved identifier
returned by BookkeeperDynamoDb.getFullTableName.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`:
- Around line 83-102: getMetadataFromStorage and deleteMetadataFromStorage
currently run a single QueryRequest and will miss items if DynamoDB returns a
paginated result; update both methods in MetadataManagerDynamoDb to loop on
QueryResponse.lastEvaluatedKey (use ExclusiveStartKey on subsequent
QueryRequest) until null, aggregating all items before building the final Map in
getMetadataFromStorage; for deleteMetadataFromStorage, collect all keys across
pages and perform deletes for every page (or aggregate and use BatchWriteItem in
batches of 25) until lastEvaluatedKey is empty, ensuring you still read
ATTR_COMPOSITE_KEY, ATTR_METADATA_KEY, ATTR_METADATA_VALUE, ATTR_LAST_UPDATED
and use the same AttributeValue shaping as the existing QueryRequest/Item
deletions.
In `@pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf`:
- Around line 125-132: The IAM policy Resource array is missing several
Pramen-created tables; update the Resource list (the commented ARN entries like
"arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
"pramen_production_schemas", "pramen_production_locks") to also include the
journal, metadata and offsets tables (e.g., add
"arn:aws:dynamodb:*:*:table/pramen_production_journal",
"arn:aws:dynamodb:*:*:table/pramen_production_metadata",
"arn:aws:dynamodb:*:*:table/pramen_production_offsets") or replace with a safe
prefix/wildcard such as "arn:aws:dynamodb:*:*:table/pramen_production_*" so all
tables created by Pramen are covered and permission errors are avoided.
In `@pramen/examples/dynamodb_bookkeeping/README.md`:
- Line 437: The README references a non-existent example file
TokenLockFactoryDynamoDbExample.scala; update the link to point to the actual
implementation TokenLockFactoryDynamoDb.scala (or remove the reference
entirely). Open pramen/examples/dynamodb_bookkeeping/README.md, find the line
mentioning
core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDbExample.scala
and replace it with
core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
(or delete the sentence if no example reference is desired) so the documentation
points to an existing symbol/class.
In `@pramen/project/Versions.scala`:
- Line 40: The awsSdkVersion constant in Versions.scala is set to a non-existent
Maven version; update the value of awsSdkVersion to a valid released AWS SDK v2
artifact (e.g., "2.34.0" or a later available version) and ensure any places
that reference Versions.awsSdkVersion (dependency declarations or build files)
will pick up the new value so dependency resolution succeeds.
In `@README.md`:
- Around line 2592-2598: The Maven snippet uses the placeholder
${aws.sdk.version} which is defined only in this repo's POM and will break if
users copy-paste; update the README dependency block by replacing
${aws.sdk.version} with the actual AWS SDK version used in this project (or
alternatively add the corresponding <properties> block showing aws.sdk.version)
so the example is copy-pasteable; refer to the dependency XML fragment and the
${aws.sdk.version} token when making the change.
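For the wildcard-deletion prompt on `BookkeeperDynamoDb.deleteTable` above, the wildcard-to-pattern conversion can be done client-side before issuing deletes. A sketch of that matching step only (`wildcardToRegex` is a hypothetical helper; the scan/delete plumbing is omitted):

```scala
import java.util.regex.Pattern

// Sketch: translate a shell-style wildcard (e.g. "sales_*") into an anchored
// regex, quoting everything except the "*" so dots and other metacharacters
// in table names are matched literally.
def wildcardToRegex(wildcard: String): Pattern = {
  val escaped = wildcard.split("\\*", -1).map(Pattern.quote).mkString(".*")
  Pattern.compile(s"^$escaped$$")
}

// Filter scanned item names client-side; matches feed the delete calls.
def matchingTables(allTableNames: Seq[String], wildcard: String): Seq[String] = {
  val p = wildcardToRegex(wildcard)
  allTableNames.filter(n => p.matcher(n).matches())
}
```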
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a93de128-e535-4c03-9bcd-8d89e08cbee7
📒 Files selected for processing (30)
- README.md
- pramen/api/src/main/scala/za/co/absa/pramen/api/MetadataManager.scala
- pramen/api/src/main/scala/za/co/absa/pramen/api/lock/TokenLockFactory.scala
- pramen/core/pom.xml
- pramen/core/src/main/scala/za/co/absa/pramen/core/app/AppContext.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/app/config/BookkeeperConfig.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/Bookkeeper.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManager.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerCached.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/OffsetManagerDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/Journal.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/BookkeepingConfigFactory.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/OffsetManagerDynamoDbBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/JournalDynamoDBBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/lock/TokenLockFactoryDynamoDbBuilderSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/tests/metadata/MetadataManagerDynamoDbBuilderSuite.scala
- pramen/examples/dynamodb_bookkeeping/README.md
- pramen/examples/dynamodb_bookkeeping/dynamodb_bookkeeping.conf
- pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf
- pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/mq/SingleMessageProducer.scala
- pramen/pom.xml
- pramen/project/Dependencies.scala
- pramen/project/Versions.scala
```xml
<!-- AWS SDK for DynamoDB -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>dynamodb</artifactId>
</dependency>
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Check if parent pom.xml defines scope for DynamoDB in dependencyManagement
echo "=== Check parent pom.xml for DynamoDB dependency management ==="
fd -t f "pom.xml" --exec grep -l "dependencyManagement" {} \; | head -5 | xargs -I {} sh -c 'echo "=== {} ===" && grep -A5 "dynamodb" {}'
```

Repository: AbsaOSS/pramen

🏁 Script executed:

```shell
# Find and inspect Dependencies.scala
fd -t f "Dependencies.scala" -x cat -n {}
```

🏁 Script executed:

```shell
# Check parent pom.xml for dependencyManagement section and AWS SDK scopes
fd -t f "pom.xml" -path "*/pramen/pom.xml" -x head -200 {} | grep -A 100 "dependencyManagement"
```

🏁 Script executed:

```shell
# Search for scope definitions in pom files for AWS SDK dependencies
rg "aws.*sdk|scope>" -A 1 -B 1 --type xml
```
Add <scope>provided</scope> to align Maven and SBT configurations.
In Dependencies.scala, DynamoDB is scoped as Provided (line 32), matching other framework dependencies like Spark. However, this Maven dependency lacks an explicit scope, defaulting to compile. This causes Maven-built artifacts to bundle AWS SDK while SBT-built ones do not. Add the scope to align configurations:
Suggested fix:

```diff
 <!-- AWS SDK for DynamoDB -->
 <dependency>
     <groupId>software.amazon.awssdk</groupId>
     <artifactId>dynamodb</artifactId>
+    <scope>provided</scope>
 </dependency>
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

```xml
<!-- AWS SDK for DynamoDB -->
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>dynamodb</artifactId>
    <scope>provided</scope>
</dependency>
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pramen/core/pom.xml` around lines 147 - 151, The Maven dependency for the AWS
SDK DynamoDB (groupId=software.amazon.awssdk, artifactId=dynamodb) is missing a
scope and defaults to compile, causing Maven builds to bundle the SDK; update
that dependency entry to include <scope>provided</scope> so it matches the SBT
Dependencies.scala Provided scope and prevents packaging the AWS SDK into the
artifact.
```scala
// Full table names/ARNs
private val bookkeepingTableName = getFullTableName(tableArn, bookkeepingTableBaseName)
private val schemaTableName = getFullTableName(tableArn, schemaTableBaseName)
```
🧩 Analysis chain
🏁 Script executed:

```shell
head -120 pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala | tail -70
```

🏁 Script executed:

```shell
sed -n '60,110p' pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
sed -n '838,850p' pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
grep -n "bookkeepingTableName\|schemaTableName" pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala | head -50
```

🏁 Script executed:

```shell
sed -n '130,180p' pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```
Initialize using the resolved table names that CRUD operations use.
Lines 67-68 define bookkeepingTableName and schemaTableName by resolving base names through getFullTableName(tableArn, ...). However, the init() method at lines 89-104 checks and creates tables using only the base names. All CRUD operations (lines 252, 296, 328, 425, 445, 464, 486, 506, 543, 617) use the resolved names. When tableArn is provided, these are different identifiers—initialization will set up the wrong tables while read/write operations target entirely different ones.
Update the init() method to use bookkeepingTableName and schemaTableName instead of the base names.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 66 - 68, The init() method currently creates/checks tables using
bookkeepingTableBaseName and schemaTableBaseName which mismatches the resolved
names used by all CRUDs; update init() to use the resolved private vals
bookkeepingTableName and schemaTableName instead (replace usages of
bookkeepingTableBaseName and schemaTableBaseName in init()), ensuring any calls
that pass names into ensureTable/describe/createTable use bookkeepingTableName
and schemaTableName so the created tables match the identifiers used by the CRUD
methods.
```scala
private def createBookkeepingTable(tableName: String): Unit = {
  val createTableRequest = CreateTableRequest.builder()
    .tableName(tableName)
    .keySchema(
      KeySchemaElement.builder()
        .attributeName(ATTR_TABLE_NAME)
        .keyType(KeyType.HASH)
        .build(),
      KeySchemaElement.builder()
        .attributeName(ATTR_INFO_DATE)
        .keyType(KeyType.RANGE)
        .build()
    )
    .attributeDefinitions(
      AttributeDefinition.builder()
        .attributeName(ATTR_TABLE_NAME)
        .attributeType(ScalarAttributeType.S)
        .build(),
      AttributeDefinition.builder()
        .attributeName(ATTR_INFO_DATE)
        .attributeType(ScalarAttributeType.S)
        .build()
    )
    .billingMode(BillingMode.PAY_PER_REQUEST) // On-demand billing
    .build()
```
🧩 Analysis chain
🏁 Script executed:

```shell
find . -name "BookkeeperDynamoDb.scala" -type f
```

🏁 Script executed:

```shell
find . -path "*bookkeeper*" -name "*.scala" -type f | head -20
```

🏁 Script executed:

```shell
wc -l ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
sed -n '141,165p' ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
sed -n '327,355p' ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
sed -n '349,352p' ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
sed -n '457,483p' ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
sed -n '141,180p' ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
rg -n "def putItem|putItem.*=" ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala -A 30 | head -80
```

🏁 Script executed:

```shell
rg -n "def saveChunk|def save|putItem\|PutItem" ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala -B 2 -A 20 | head -100
```

🏁 Script executed:

```shell
rg -n "ATTR_BATCH_ID|ATTR_JOB_FINISHED|ATTR_" ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala | head -30
```

🏁 Script executed:

```shell
sed -n '1,50p' ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
rg -n "dataChunkToItem|itemToDataChunk|ATTR_BATCH_ID\|ATTR_JOB_FINISHED" ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala -B 2 -A 10 | head -120
```

🏁 Script executed:

```shell
sed -n '586,605p' ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```

🏁 Script executed:

```shell
sed -n '409,432p' ./pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala
```
Add a uniqueness attribute to the key schema to support multiple chunks per day.
The current composite key (tableName, infoDate) enforces one item per day, but the code at lines 327-355 and 457-483 assumes multiple chunks (differentiated by batchId or jobFinished) can coexist for the same table/date. Each subsequent putItem() call for the same table/date overwrites the previous chunk, causing immediate data loss during reruns and preventing cleanup from removing stale batches.
Suggested fix: Add ATTR_BATCH_ID to the key schema as a range key component (or create a composite sort key), so the primary key becomes (tableName, infoDate, batchId). Alternatively, use a different uniqueness strategy such as a timestamp-based sort key component.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 141 - 165, The createBookkeepingTable function currently builds a
DynamoDB key schema with ATTR_TABLE_NAME (HASH) and ATTR_INFO_DATE (RANGE),
which enforces one item per table/date and causes overwrites; modify
createBookkeepingTable to include ATTR_BATCH_ID as an additional sort key
component (or change the sort key to a composite that includes ATTR_INFO_DATE
and ATTR_BATCH_ID) so the primary key uniquely identifies (tableName, infoDate,
batchId). Update attributeDefinitions to include ATTR_BATCH_ID with
ScalarAttributeType.S and adjust any code that assumes the old two-part key
accordingly (references: createBookkeepingTable, ATTR_TABLE_NAME,
ATTR_INFO_DATE, ATTR_BATCH_ID).
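Why the third key component matters can be sketched with a small in-memory model of DynamoDB's put semantics (a put with an existing primary key silently replaces the stored item). The `Chunk`, `putTwoPartKey`, and `putThreePartKey` names below are hypothetical, not the actual Pramen types:

```scala
// Hypothetical model of DynamoDB put semantics: a put with an existing
// primary key silently replaces the stored item.
case class Chunk(tableName: String, infoDate: String, batchId: Long, records: Long)

// Two-part key (tableName, infoDate): a rerun's chunk replaces the first one.
def putTwoPartKey(store: Map[(String, String), Chunk], chunk: Chunk): Map[(String, String), Chunk] =
  store + ((chunk.tableName, chunk.infoDate) -> chunk)

// Three-part key (tableName, infoDate, batchId): chunks from reruns coexist,
// mirroring a DynamoDB sort key that also carries the batch id.
def putThreePartKey(store: Map[(String, String, Long), Chunk], chunk: Chunk): Map[(String, String, Long), Chunk] =
  store + ((chunk.tableName, chunk.infoDate, chunk.batchId) -> chunk)
```

With two writes for the same table/date but different batch ids, the two-part key keeps only one item while the three-part key keeps both, which is the behavior the surrounding CRUD code expects.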
```scala
override def deleteTable(tableWithWildcard: String): Seq[String] = {
  // DynamoDB implementation for wildcard deletion
  // This would require scanning and deleting matching items
  throw new UnsupportedOperationException("deleteTable with wildcards is not yet implemented for DynamoDB bookkeeper")
}
```
deleteTable still breaks the Bookkeeper contract in Dynamo mode.
The Bookkeeper interface exposes this as a supported operation, so throwing UnsupportedOperationException here will break callers as soon as they switch backends. This needs a DynamoDB implementation or an earlier configuration-time rejection.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 495 - 499, The BookkeeperDynamoDb.deleteTable currently throws
UnsupportedOperationException which violates the Bookkeeper contract; implement
wildcard deletion inside BookkeeperDynamoDb.deleteTable by scanning the DynamoDB
bookkeeper table for items whose table/name attribute matches the provided
wildcard (convert wildcard to a prefix/regex or Dynamo-compatible
FilterExpression), paginate through Scan results, delete matching items using
DeleteItem or BatchWriteItem for efficiency, collect and return the deleted item
names as Seq[String], and ensure proper error handling and logging;
alternatively, if you prefer to disallow wildcards for DynamoDB, move this check
to the BookkeeperDynamoDb constructor/factory and throw a configuration-time
IllegalArgumentException with a clear message so callers fail fast.
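The wildcard-matching part of the suggested implementation can be sketched in isolation. The helper names (`wildcardToRegex`, `matchingTables`) are hypothetical, and the Scan/BatchWriteItem plumbing is deliberately left out:

```scala
import java.util.regex.Pattern

// Hypothetical helper: turn a '*'/'?' wildcard into an anchored regex,
// quoting every other character so dots in table names stay literal.
def wildcardToRegex(wildcard: String): Pattern = {
  val body = wildcard.flatMap {
    case '*' => ".*"
    case '?' => "."
    case c   => Pattern.quote(c.toString)
  }
  Pattern.compile("^" + body + "$")
}

// Names returned by a Scan that match the wildcard; a real implementation
// would then delete these with DeleteItem or BatchWriteItem (max 25 per batch).
def matchingTables(scannedNames: Seq[String], wildcard: String): Seq[String] = {
  val pattern = wildcardToRegex(wildcard)
  scannedNames.filter(name => pattern.matcher(name).matches())
}
```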
```scala
def getFullTableName(tableArn: Option[String], tableName: String): String = {
  tableArn match {
    case Some(arn) if arn.nonEmpty =>
      // If ARN ends with table/, append the table name, otherwise append /table/tableName
      if (arn.endsWith("/")) {
        s"${arn}table/$tableName"
      } else if (arn.contains("/table/")) {
        arn // ARN already includes table path
      } else {
        s"$arn/table/$tableName"
      }
```
🧩 Analysis chain
🏁 Script executed:

```shell
find . -name "BookkeeperDynamoDb.scala" -type f
```

🏁 Script executed:

```shell
find . -name "*test*" -o -name "*Test*" | grep -i dynamo | head -20
```

🏁 Script executed:

```shell
cd pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper && head -n 860 BookkeeperDynamoDb.scala | tail -n 50
```

🌐 Web query: DynamoDB ARN format specification AWS

💡 Result: The ARN format for DynamoDB resources follows the general AWS ARN structure `arn:partition:dynamodb:region:account-id:resource-type/resource-id`. Key DynamoDB resource types and their formats:

- Table: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}` — example: `arn:aws:dynamodb:us-east-1:123456789012:table/MusicCollection`
- Stream: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}/stream/{StreamLabel}` — StreamLabel is a timestamp in ISO 8601 format (YYYY-MM-DDTHH:MM:SS.mmm)
- Index: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}/index/{IndexName}`
- Backup: `arn:{Partition}:dynamodb:{Region}:{Account}:table/{TableName}/backup/{BackupName}`
- Global Table: supported via `AWS::DynamoDB::GlobalTable` in CloudFormation, following a similar table ARN pattern

Partition is typically `aws` (standard), `aws-cn` (China), or `aws-us-gov` (GovCloud). Region, account-id (12 digits without hyphens), and resource identifiers are specific to the resource. These formats are used consistently in IAM policies, API calls, and resource references across AWS docs.
Citations:
- 1: https://docs.aws.amazon.com/AWSCloudFormation/latest/TemplateReference/aws-resource-dynamodb-table.html
- 2: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html
- 3: https://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html
- 4: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/rbac-examples.html
- 5: https://docs.aws.amazon.com/r53recovery/latest/dg/recovery-readiness.resource-types-arns.html
- 6: https://docs.aws.amazon.com/service-authorization/latest/reference/list_amazondynamodb.html
- 7: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html
🏁 Script executed:

```shell
find . -path "*test*" -name "*BookkeeperDynamoDb*" -o -path "*test*" -name "*Bookkeeper*Dynamo*"
```

🏁 Script executed:

```shell
rg "getFullTableName" -n
```

🏁 Script executed:

```shell
rg "tableArn" -n -A 3 -B 3 | head -100
```

🏁 Script executed:

```shell
sed -n '130,155p' pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala
```

🏁 Script executed:

```shell
cat pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDynamoDbBuilderSuite.scala | sed -n '130,160p'
```

🏁 Script executed:

```shell
rg "locksTableName|journalTableFullName|metadataTableFullName|bookkeepingTableName|schemaTableName" -A 5 -B 2 | head -150
```

🏁 Script executed:

```shell
sed -n '100,120p' pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalDynamoDB.scala
```

🏁 Script executed:

```shell
sed -n '45,75p' pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
```
getFullTableName builds invalid DynamoDB ARNs.
The method's logic is flawed for handling DynamoDB ARN prefixes. When the input ends with `/` (e.g., `arn:aws:dynamodb:us-east-1:123456789012:table/`), it appends `table/`, producing the malformed result `arn:aws:dynamodb:us-east-1:123456789012:table/table/<name>`. Additionally, the check for the `/table/` substring does not align with the standard DynamoDB ARN format, which uses `:table/` (colon-slash), making the logic fragile and error-prone.
This breaks ARN-based configuration for every manager (MetadataManagerDynamoDb, TokenLockFactoryDynamoDb, JournalDynamoDB, OffsetManagerDynamoDb) that depends on this helper to construct valid table identifiers for DynamoDB API calls.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDynamoDb.scala`
around lines 838 - 848, The getFullTableName method builds incorrect ARNs by
checking "/table/" and appending "table/" when the ARN already ends with a
slash; update the logic in getFullTableName to use the DynamoDB standard token
":table/" (not "/table/"), and handle three cases: if arn contains ":table/" and
endsWith("/") then return arn + tableName (append only the name), if arn
contains ":table/" and does not endWith("/") then assume it already includes the
table name and return arn as-is, otherwise append ":table/" + tableName to the
provided arn; update the matching branches in getFullTableName accordingly to
avoid producing "table/table" sequences.
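The corrected branching described above can be sketched as a pure function. This is a hypothetical reconstruction following the `:table/` rules from the ARN spec, not the committed implementation; the fallback to the bare table name when no ARN is configured is also an assumption:

```scala
// Hypothetical corrected helper: DynamoDB table ARNs use the ":table/"
// token (arn:aws:dynamodb:REGION:ACCOUNT:table/NAME), never "/table/".
def getFullTableName(tableArn: Option[String], tableName: String): String =
  tableArn match {
    case Some(arn) if arn.nonEmpty =>
      if (arn.contains(":table/")) {
        // ARN already has the table segment; append the name only if it ends open.
        if (arn.endsWith("/")) arn + tableName else arn
      } else {
        s"$arn:table/$tableName"
      }
    case _ =>
      tableName // no ARN configured: plain table name (assumed fallback)
  }
```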
```scala
override def getMetadataFromStorage(tableName: String, infoDate: LocalDate): Map[String, MetadataValue] = {
  try {
    val compositeKey = s"$tableName#$infoDate"

    val queryRequest = QueryRequest.builder()
      .tableName(metadataTableFullName)
      .keyConditionExpression(s"${MetadataManagerDynamoDb.ATTR_COMPOSITE_KEY} = :composite_key")
      .expressionAttributeValues(Map(
        ":composite_key" -> AttributeValue.builder().s(compositeKey).build()
      ).asJava)
      .build()

    val result = dynamoDbClient.query(queryRequest)

    result.items().asScala.map { item =>
      val key = item.get(MetadataManagerDynamoDb.ATTR_METADATA_KEY).s()
      val value = item.get(MetadataManagerDynamoDb.ATTR_METADATA_VALUE).s()
      val lastUpdated = Instant.ofEpochSecond(item.get(MetadataManagerDynamoDb.ATTR_LAST_UPDATED).n().toLong)
      key -> MetadataValue(value, lastUpdated)
    }.toMap
```
🧩 Analysis chain
🏁 Script executed:

```shell
cat -n pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala | sed -n '80,170p'
```

🏁 Script executed:

```shell
rg -n "getMetadataFromStorage|deleteMetadataFromStorage" pramen/core/src/main/scala/ -A 3 -B 1 | head -80
```
Add pagination to getMetadataFromStorage and deleteMetadataFromStorage.
The query at line 95 in getMetadataFromStorage(tableName, infoDate) and the query at line 164 in deleteMetadataFromStorage(tableName, infoDate) do not loop on lastEvaluatedKey. If a (tableName, infoDate) partition exceeds DynamoDB's 1 MB default page size, getMetadataFromStorage returns incomplete metadata and deleteMetadataFromStorage leaves stale items without warning.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metadata/MetadataManagerDynamoDb.scala`
around lines 83 - 102, getMetadataFromStorage and deleteMetadataFromStorage
currently run a single QueryRequest and will miss items if DynamoDB returns a
paginated result; update both methods in MetadataManagerDynamoDb to loop on
QueryResponse.lastEvaluatedKey (use ExclusiveStartKey on subsequent
QueryRequest) until null, aggregating all items before building the final Map in
getMetadataFromStorage; for deleteMetadataFromStorage, collect all keys across
pages and perform deletes for every page (or aggregate and use BatchWriteItem in
batches of 25) until lastEvaluatedKey is empty, ensuring you still read
ATTR_COMPOSITE_KEY, ATTR_METADATA_KEY, ATTR_METADATA_VALUE, ATTR_LAST_UPDATED
and use the same AttributeValue shaping as the existing QueryRequest/Item
deletions.
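The pagination loop described above can be sketched generically, abstracting the DynamoDB client away. Here `fetchPage` is a hypothetical stand-in for `dynamoDbClient.query(...)`: it takes the previous page's `lastEvaluatedKey` (passed back as `ExclusiveStartKey`) and returns the page's items plus the next key, or `None` when the result set is exhausted:

```scala
import scala.annotation.tailrec

// Hypothetical generic page loop mirroring DynamoDB Query pagination:
// keep issuing requests with the previous lastEvaluatedKey until it is empty,
// accumulating all items across pages.
def fetchAllPages[K, A](fetchPage: Option[K] => (Seq[A], Option[K])): Seq[A] = {
  @tailrec
  def loop(startKey: Option[K], acc: Vector[A]): Vector[A] = {
    val (items, lastKey) = fetchPage(startKey)
    val combined = acc ++ items
    lastKey match {
      case Some(_) => loop(lastKey, combined)
      case None    => combined
    }
  }
  loop(None, Vector.empty)
}
```

Both `getMetadataFromStorage` (aggregate then build the map) and `deleteMetadataFromStorage` (aggregate keys then delete in batches) would wrap their query in such a loop.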
| # "Resource": [ | ||
| # "arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping", | ||
| # "arn:aws:dynamodb:*:*:table/pramen_production_schemas", | ||
| # "arn:aws:dynamodb:*:*:table/pramen_production_locks" | ||
| # ] | ||
| # } | ||
| # ] | ||
| # } |
IAM policy example is incomplete - missing tables.
The IAM policy Resource list only includes 3 tables (bookkeeping, schemas, locks) but the documentation above mentions 5+ tables that are automatically created: journal, metadata, and offsets. Users following this example may encounter permission errors.
Consider updating to include all tables:
"Resource": [
"arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
"arn:aws:dynamodb:*:*:table/pramen_production_schemas",
- "arn:aws:dynamodb:*:*:table/pramen_production_locks"
+ "arn:aws:dynamodb:*:*:table/pramen_production_locks",
+ "arn:aws:dynamodb:*:*:table/pramen_production_journal",
+ "arn:aws:dynamodb:*:*:table/pramen_production_metadata",
+ "arn:aws:dynamodb:*:*:table/pramen_production_offsets"
]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pramen/examples/dynamodb_bookkeeping/dynamodb_with_locks.conf` around lines
125 - 132, The IAM policy Resource array is missing several Pramen-created
tables; update the Resource list (the commented ARN entries like
"arn:aws:dynamodb:*:*:table/pramen_production_bookkeeping",
"pramen_production_schemas", "pramen_production_locks") to also include the
journal, metadata and offsets tables (e.g., add
"arn:aws:dynamodb:*:*:table/pramen_production_journal",
"arn:aws:dynamodb:*:*:table/pramen_production_metadata",
"arn:aws:dynamodb:*:*:table/pramen_production_offsets") or replace with a safe
prefix/wildcard such as "arn:aws:dynamodb:*:*:table/pramen_production_*" so all
tables created by Pramen are covered and permission errors are avoided.
````markdown
    .build()
```

See `core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDbExample.scala` for more examples.
````
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Check if the referenced example file exists
fd -i "TokenLockFactoryDynamoDbExample.scala" --type f
# Also search for any example files related to TokenLockFactory
rg -l "TokenLockFactoryDynamoDb" --type scala -g '*Example*'
```

🏁 Script executed:

```shell
# Search for any TokenLockFactory files (not just examples)
rg "TokenLockFactory" --type scala -l
# Check if the lock directory exists
fd -i "lock" -type d | head -20
# Find scala files in any lock-related directories
fd -i "lock.*\.scala$" --type f | head -20
# Search for files containing DynamoDb and Example
rg "DynamoDb.*Example|Example.*DynamoDb" --type scala -l
```

🏁 Script executed:

```shell
# Find all files in lock directory
fd . "pramen/core/src/main/scala/za/co/absa/pramen/core/lock/" --type f
# Check if any example files exist anywhere with DynamoDb
find . -name "*Example*" -type f | grep -i dynamodb
# Look at the TokenLockFactoryDynamoDb file to see if it has examples
head -100 pramen/core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
```
Remove or update the reference to non-existent example file.
The documentation references core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDbExample.scala at line 437, but this file does not exist in the codebase. Either update the reference to point to the actual TokenLockFactoryDynamoDb.scala implementation file or remove the reference entirely.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pramen/examples/dynamodb_bookkeeping/README.md` at line 437, The README
references a non-existent example file TokenLockFactoryDynamoDbExample.scala;
update the link to point to the actual implementation
TokenLockFactoryDynamoDb.scala (or remove the reference entirely). Open
pramen/examples/dynamodb_bookkeeping/README.md, find the line mentioning
core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDbExample.scala
and replace it with
core/src/main/scala/za/co/absa/pramen/core/lock/TokenLockFactoryDynamoDb.scala
(or delete the sentence if no example reference is desired) so the documentation
points to an existing symbol/class.
```scala
val scalatestVersion = "3.2.14"
val mockitoVersion = "2.28.2"
val httpClientVersion = "4.5.14"
val awsSdkVersion = "2.42.23"
```
🧩 Analysis chain
🏁 Script executed:

```shell
#!/bin/bash
# Check if AWS SDK version 2.42.23 exists on Maven Central
echo "=== Checking AWS SDK DynamoDB version on Maven Central ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=5&wt=json" | jq '.response.docs[] | {version: .latestVersion}'
echo ""
echo "=== Check for security advisories ==="
gh api graphql -f query='
{
  securityVulnerabilities(first: 5, ecosystem: MAVEN, package: "software.amazon.awssdk:dynamodb") {
    nodes {
      advisory {
        summary
        severity
        publishedAt
      }
      vulnerableVersionRange
      firstPatchedVersion {
        identifier
      }
    }
  }
}'
```

🏁 Script executed:

```shell
#!/bin/bash
# Query for specific version 2.42.23 and surrounding versions
echo "=== Checking if AWS SDK DynamoDB 2.42.23 exists ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb+AND+v:2.42.23&rows=1&wt=json" | jq '.response'
echo ""
echo "=== Listing all versions in 2.40+ range ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb+AND+v:[2.40+TO+2.50]&rows=20&wt=json" | jq '.response.docs[] | {version: .v}' | sort -u
echo ""
echo "=== Latest 10 versions available ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=10&wt=json" | jq -r '.response.docs[] | .v' | sort -V | tail -10
```

🏁 Script executed:

```shell
#!/bin/bash
# Get actual available versions
echo "=== Latest 15 AWS SDK DynamoDB versions ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=100&wt=json" | jq -r '.response.docs | sort_by(.v) | reverse | .[0:15] | .[] | .v'
```

🏁 Script executed:

```shell
#!/bin/bash
# Get the complete JSON response first
echo "=== AWS SDK DynamoDB versions from Maven Central ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=50&wt=json" | jq '.response.docs[0:15] | map({version: .v, timestamp: .timestamp}) | sort_by(.version) | reverse'
```

🏁 Script executed:

```shell
#!/bin/bash
# Get raw response structure
echo "=== Raw Maven Central response structure ==="
curl -s "https://search.maven.org/solrsearch/select?q=g:software.amazon.awssdk+AND+a:dynamodb&rows=5&wt=json" | jq '.response.docs[0]'
```

🏁 Script executed:

```shell
#!/bin/bash
# Verify the actual version in the Versions.scala file
cat pramen/project/Versions.scala | grep -A 2 -B 2 "awsSdkVersion"
```
AWS SDK version 2.42.23 does not exist in Maven Central.
The specified version does not exist; the latest available version is 2.34.0. This will cause a build failure when the dependency cannot be resolved. Update to an available version, such as 2.34.0 or later.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pramen/project/Versions.scala` at line 40, The awsSdkVersion constant in
Versions.scala is set to a non-existent Maven version; update the value of
awsSdkVersion to a valid released AWS SDK v2 artifact (e.g., "2.34.0" or a later
available version) and ensure any places that reference Versions.awsSdkVersion
(dependency declarations or build files) will pick up the new value so
dependency resolution succeeds.
```xml
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>dynamodb</artifactId>
    <version>${aws.sdk.version}</version>
</dependency>
```
Make the Maven example copy-pasteable.
${aws.sdk.version} is defined in this repository’s POM, not in a downstream user’s project, so the snippet on Line 2596 will fail as pasted. Either inline the version here or show the matching <properties> block.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@README.md` around lines 2592 - 2598, The Maven snippet uses the placeholder
${aws.sdk.version} which is defined only in this repo's POM and will break if
users copy-paste; update the README dependency block by replacing
${aws.sdk.version} with the actual AWS SDK version used in this project (or
alternatively add the corresponding <properties> block showing aws.sdk.version)
so the example is copy-pasteable; refer to the dependency XML fragment and the
${aws.sdk.version} token when making the change.
Closes #181