Skip to content

MODDATAIMP-1278: Fix broken RxJava transaction lifecycle in streaming methods#727

Merged
KaterynaSenchenko merged 4 commits into
masterfrom
MODDATAIMP-1278-fix-streaming-transaction-lifecycle
Apr 15, 2026
Merged

MODDATAIMP-1278: Fix broken RxJava transaction lifecycle in streaming methods#727
KaterynaSenchenko merged 4 commits into
masterfrom
MODDATAIMP-1278-fix-streaming-transaction-lifecycle

Conversation

@okolawole-ebsco
Copy link
Copy Markdown
Contributor

@okolawole-ebsco okolawole-ebsco commented Mar 20, 2026

Purpose

Fix broken transaction lifecycle in the three streaming DAO methods (streamRecords, streamMarcRecordIds, streamSourceRecords) that caused connection pool leaks. The old code committed after rollback on error paths, double-closed connections, and silently discarded cleanup Futures.

return getCachedPool(tenantId)
  .rxGetConnection()
  .flatMapPublisher(conn -> conn.rxBegin()
    .flatMapPublisher(tx -> conn.rxPrepare(sql)
      .flatMapPublisher(pq -> pq.createStream(1)
        .toFlowable()
        .map(this::toRow)
        .map(this::toRecord))
      .doAfterTerminate(tx::commit)       // A: fires on BOTH success AND error
      .doOnError(error -> {               // B: rollback + close
          tx.rollback();
          conn.close();
      })
      .doFinally(conn::close)));          // C: close connection again

Approach

A simple fix could look like so:

return getCachedPool(tenantId)
  .rxGetConnection()
  .flatMapPublisher(conn -> conn.rxBegin()
    .flatMapPublisher(tx -> conn.rxPrepare(sql)
      .flatMapPublisher(pq -> pq.createStream(1)
        .toFlowable()
        .map(this::toRow)
        .map(this::toRecord))
      .doOnComplete(() -> tx.commit())
      .doOnError(error -> tx.rollback())
      .doFinally(conn::close)));

But its missing the following:

  • tx.commit()andtx.rollback()` still return Futures that are silently discarded. If commit fails, the error is lost.
  • It never closes the PreparedStatement (pq).
  • When a subscriber cancels mid-stream (e.g., firstOrError(), take(n)), neither doOnComplete nor doOnError fires. Only doFinally fires. So the transaction is never explicitly committed or rolled back.
  • The stream cursor should be closed before rollback to avoid issuing rollback while a fetch is in-flight.

For a full solution, consolidate the three duplicated streaming patterns into a shared streamInTransaction() helper with a StreamContext inner class. The new lifecycle ensures: commit only on success, rollback only on error/cancel, stream cursor quiesced before rollback, exactly-once cleanup via AtomicBoolean guard, and cancel-path errors logged instead of swallowed. Includes a test that exercises 25 rapid cancel cycles to prove the pool is not exhausted.

Refs: https://folio-org.atlassian.net/browse/MODDATAIMP-1278

… methods

The three streaming methods (streamRecords, streamMarcRecordIds,
streamSourceRecords) had multiple transaction lifecycle bugs:

- doAfterTerminate(tx::commit) fired on both success AND error, issuing
  a commit after a rollback
- conn.close() was called twice (doOnError + doFinally)
- Cleanup Future return values were silently discarded

Consolidate into a shared streamInTransaction() helper with a
StreamContext inner class that ensures:

- Commit only on successful stream completion
- Rollback only on error or cancellation
- Stream cursor quiesced before rollback (prevents in-flight fetch race)
- Exactly-once cleanup via AtomicBoolean guard
- Cancel-path errors logged instead of silently swallowed

Add test that exercises 25 rapid cancel-after-first-row cycles followed
by a full stream read, proving the connection pool is not exhausted.
Exercise the prepare-failure error path (invalid SQL) to cover
rollbackTransactionAndClose and the outer onErrorResumeNext handlers.
Runs 5 failed streams then a valid stream to verify pool is not
exhausted by leaked connections on the error path.
@sonarqubecloud
Copy link
Copy Markdown

@KaterynaSenchenko KaterynaSenchenko merged commit 76fa5ac into master Apr 15, 2026
28 of 29 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants