Skip to content

Commit eb0b985

Browse files
authored
Introduce Client.JobUpdate function that can store output incrementally (#1098)
Here, take a stab at a solution for #1064. We introduce a new client function `Client.JobUpdate` that takes any output currently recorded in context and stores it to the given job row. `JobUpdate` currently only sets output, but the idea is that it could be expanded in the future in case it's useful to do so. The reason that we don't have a `river.PersistOutput` in line with `river.RecordOutput` is that once we're talking about storing data, we're dealing with the usual persistence semantics. We need a client instance with access to an executor, and we probably want to have a `*Tx` variant like we have for all other functions like this one. Fixes #1064.
1 parent 00d39fc commit eb0b985

18 files changed

Lines changed: 577 additions & 30 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010
### Added
1111

1212
- Basic stuck detection after a job's exceeded its timeout and still not returned after the executor's initiated context cancellation and waited a short margin for the cancellation to take effect. [PR #1097](https://github.com/riverqueue/river/pull/1097).
13+
- Added `Client.JobUpdate` which can be used to persist job output partway through a running work function instead of having to wait until the job is completed. [PR #1098](https://github.com/riverqueue/river/pull/1098).
1314
- Add a little more error flavor for when encountering a deadline exceeded error on leadership election suggesting that the user may want to try increasing their database pool size. [PR #1101](https://github.com/riverqueue/river/pull/1101).
1415

1516
## [0.29.0-rc.1] - 2025-12-04
1617

18+
### Added
19+
1720
- Added `HookPeriodicJobsStart` that can be used to run custom logic when a periodic job enqueuer starts up on a new leader. [PR #1084](https://github.com/riverqueue/river/pull/1084).
1821
- Added `Client.Notify().RequestResign` and `Client.Notify().RequestResignTx` functions allowing any client to request that the current leader resign. [PR #1085](https://github.com/riverqueue/river/pull/1085).
1922

client.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/riverqueue/river/internal/dbunique"
1818
"github.com/riverqueue/river/internal/hooklookup"
1919
"github.com/riverqueue/river/internal/jobcompleter"
20+
"github.com/riverqueue/river/internal/jobexecutor"
2021
"github.com/riverqueue/river/internal/leadership"
2122
"github.com/riverqueue/river/internal/maintenance"
2223
"github.com/riverqueue/river/internal/middlewarelookup"
@@ -1514,6 +1515,97 @@ func (c *Client[TTx]) jobRetry(ctx context.Context, exec riverdriver.Executor, i
15141515
})
15151516
}
15161517

1518+
// JobUpdateParams contains parameters for Client.JobUpdate and Client.JobUpdateTx.
1519+
type JobUpdateParams struct {
1520+
// Output is a new output value for a job.
1521+
//
1522+
// If not set, and a job is updated from inside a work function, the job's
1523+
// output is set based on output recorded so far using RecordOutput.
1524+
Output any
1525+
}
1526+
1527+
// JobUpdate updates the job with the given ID.
1528+
//
1529+
// If JobUpdateParams.Output is not set, this function may be used inside a job
1530+
// work function to set a job's output based on output recorded so far using
1531+
// RecordOutput.
1532+
func (c *Client[TTx]) JobUpdate(ctx context.Context, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
1533+
return c.jobUpdate(ctx, c.driver.GetExecutor(), id, params)
1534+
}
1535+
1536+
// JobUpdateTx updates the job with the given ID.
1537+
//
1538+
// If JobUpdateParams.Output is not set, this function may be used inside a job
1539+
// work function to set a job's output based on output recorded so far using
1540+
// RecordOutput.
1541+
//
1542+
// This variant updates the job inside of a transaction.
1543+
func (c *Client[TTx]) JobUpdateTx(ctx context.Context, tx TTx, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
1544+
return c.jobUpdate(ctx, c.driver.UnwrapExecutor(tx), id, params)
1545+
}
1546+
1547+
func (c *Client[TTx]) jobUpdate(ctx context.Context, exec riverdriver.Executor, id int64, params *JobUpdateParams) (*rivertype.JobRow, error) {
1548+
if params == nil {
1549+
params = &JobUpdateParams{}
1550+
}
1551+
1552+
outputFromWorkContext := func() json.RawMessage {
1553+
metadataUpdates, hasMetadataUpdates := jobexecutor.MetadataUpdatesFromWorkContext(ctx)
1554+
if !hasMetadataUpdates {
1555+
return nil
1556+
}
1557+
1558+
if val, ok := metadataUpdates[rivertype.MetadataKeyOutput]; ok {
1559+
return val.(json.RawMessage) //nolint:forcetypeassert
1560+
}
1561+
1562+
return nil
1563+
}()
1564+
1565+
var (
1566+
metadataDoMerge bool
1567+
metadataUpdatesBytes = []byte("{}") // even in the event of no update, still valid jsonb
1568+
)
1569+
if outputFromWorkContext != nil || params.Output != nil {
1570+
metadataDoMerge = true
1571+
1572+
var outputBytes json.RawMessage
1573+
1574+
switch {
1575+
// comes first because params takes precedence over context output
1576+
case params.Output != nil:
1577+
var err error
1578+
outputBytes, err = json.Marshal(params.Output)
1579+
if err != nil {
1580+
return nil, err
1581+
}
1582+
1583+
if err := checkOutputSize(outputBytes); err != nil {
1584+
return nil, err
1585+
}
1586+
1587+
case outputFromWorkContext != nil:
1588+
// no size check necessary here because it's already been checked in RecordOutput
1589+
outputBytes = outputFromWorkContext
1590+
}
1591+
1592+
var err error
1593+
metadataUpdatesBytes, err = json.Marshal(map[string]json.RawMessage{
1594+
rivertype.MetadataKeyOutput: outputBytes,
1595+
})
1596+
if err != nil {
1597+
return nil, fmt.Errorf("error marshaling metadata updates to JSON: %w", err)
1598+
}
1599+
}
1600+
1601+
return exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
1602+
ID: id,
1603+
MetadataDoMerge: metadataDoMerge,
1604+
Metadata: metadataUpdatesBytes,
1605+
Schema: c.config.Schema,
1606+
})
1607+
}
1608+
15171609
// ID returns the unique ID of this client as set in its config or
15181610
// auto-generated if not specified.
15191611
func (c *Client[TTx]) ID() string {

client_test.go

Lines changed: 185 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/tidwall/sjson"
2525

2626
"github.com/riverqueue/river/internal/dbunique"
27+
"github.com/riverqueue/river/internal/jobexecutor"
2728
"github.com/riverqueue/river/internal/maintenance"
2829
"github.com/riverqueue/river/internal/middlewarelookup"
2930
"github.com/riverqueue/river/internal/notifier"
@@ -4534,6 +4535,187 @@ func Test_Client_JobRetry(t *testing.T) {
45344535
})
45354536
}
45364537

4538+
func Test_Client_JobUpdate(t *testing.T) {
4539+
t.Parallel()
4540+
4541+
ctx := context.Background()
4542+
4543+
type testBundle struct {
4544+
dbPool *pgxpool.Pool
4545+
}
4546+
4547+
setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
4548+
t.Helper()
4549+
4550+
var (
4551+
dbPool = riversharedtest.DBPool(ctx, t)
4552+
driver = riverpgxv5.New(dbPool)
4553+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
4554+
config = newTestConfig(t, schema)
4555+
client = newTestClient(t, dbPool, config)
4556+
)
4557+
4558+
return client, &testBundle{dbPool: dbPool}
4559+
}
4560+
4561+
t.Run("AllParams", func(t *testing.T) {
4562+
t.Parallel()
4563+
4564+
client, _ := setup(t)
4565+
4566+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
4567+
require.NoError(t, err)
4568+
4569+
job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{
4570+
Output: "my job output",
4571+
})
4572+
require.NoError(t, err)
4573+
require.Equal(t, `"my job output"`, string(job.Output()))
4574+
4575+
updatedJob, err := client.JobGet(ctx, job.ID)
4576+
require.NoError(t, err)
4577+
require.Equal(t, `"my job output"`, string(updatedJob.Output()))
4578+
})
4579+
4580+
t.Run("NoParams", func(t *testing.T) {
4581+
t.Parallel()
4582+
4583+
client, _ := setup(t)
4584+
4585+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
4586+
require.NoError(t, err)
4587+
4588+
_, err = client.JobUpdate(ctx, insertRes.Job.ID, nil)
4589+
require.NoError(t, err)
4590+
})
4591+
4592+
t.Run("OutputFromContext", func(t *testing.T) {
4593+
t.Parallel()
4594+
4595+
client, _ := setup(t)
4596+
4597+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
4598+
require.NoError(t, err)
4599+
4600+
ctx := context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, map[string]any{})
4601+
require.NoError(t, RecordOutput(ctx, "my job output from context"))
4602+
4603+
job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{})
4604+
require.NoError(t, err)
4605+
require.Equal(t, `"my job output from context"`, string(job.Output()))
4606+
4607+
updatedJob, err := client.JobGet(ctx, job.ID)
4608+
require.NoError(t, err)
4609+
require.Equal(t, `"my job output from context"`, string(updatedJob.Output()))
4610+
})
4611+
4612+
t.Run("ParamOutputTakesPrecedenceOverContextOutput", func(t *testing.T) {
4613+
t.Parallel()
4614+
4615+
client, _ := setup(t)
4616+
4617+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
4618+
require.NoError(t, err)
4619+
4620+
ctx := context.WithValue(ctx, jobexecutor.ContextKeyMetadataUpdates, map[string]any{})
4621+
require.NoError(t, RecordOutput(ctx, "my job output from context"))
4622+
4623+
job, err := client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{
4624+
Output: "my job output from params",
4625+
})
4626+
require.NoError(t, err)
4627+
require.Equal(t, `"my job output from params"`, string(job.Output()))
4628+
4629+
updatedJob, err := client.JobGet(ctx, job.ID)
4630+
require.NoError(t, err)
4631+
require.Equal(t, `"my job output from params"`, string(updatedJob.Output()))
4632+
})
4633+
4634+
t.Run("ParamOutputTooLarge", func(t *testing.T) {
4635+
t.Parallel()
4636+
4637+
client, _ := setup(t)
4638+
4639+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
4640+
require.NoError(t, err)
4641+
4642+
_, err = client.JobUpdate(ctx, insertRes.Job.ID, &JobUpdateParams{
4643+
Output: strings.Repeat("x", maxOutputSizeBytes+1),
4644+
})
4645+
require.ErrorContains(t, err, "output is too large")
4646+
})
4647+
}
4648+
4649+
func Test_Client_JobUpdateTx(t *testing.T) {
4650+
t.Parallel()
4651+
4652+
ctx := context.Background()
4653+
4654+
type testBundle struct {
4655+
dbPool *pgxpool.Pool
4656+
executorTx riverdriver.ExecutorTx
4657+
tx pgx.Tx
4658+
}
4659+
4660+
setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
4661+
t.Helper()
4662+
4663+
var (
4664+
dbPool = riversharedtest.DBPool(ctx, t)
4665+
driver = riverpgxv5.New(dbPool)
4666+
schema = riverdbtest.TestSchema(ctx, t, driver, nil)
4667+
config = newTestConfig(t, schema)
4668+
client = newTestClient(t, dbPool, config)
4669+
)
4670+
4671+
tx, err := dbPool.Begin(ctx)
4672+
require.NoError(t, err)
4673+
t.Cleanup(func() { tx.Rollback(ctx) })
4674+
4675+
return client, &testBundle{
4676+
dbPool: dbPool,
4677+
executorTx: client.driver.UnwrapExecutor(tx),
4678+
tx: tx,
4679+
}
4680+
}
4681+
4682+
t.Run("AllParams", func(t *testing.T) {
4683+
t.Parallel()
4684+
4685+
client, bundle := setup(t)
4686+
4687+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
4688+
require.NoError(t, err)
4689+
4690+
job, err := client.JobUpdateTx(ctx, bundle.tx, insertRes.Job.ID, &JobUpdateParams{
4691+
Output: "my job output",
4692+
})
4693+
require.NoError(t, err)
4694+
require.Equal(t, `"my job output"`, string(job.Output()))
4695+
4696+
updatedJob, err := client.JobGetTx(ctx, bundle.tx, job.ID)
4697+
require.NoError(t, err)
4698+
require.Equal(t, `"my job output"`, string(updatedJob.Output()))
4699+
4700+
// Outside of transaction shows original
4701+
updatedJob, err = client.JobGet(ctx, job.ID)
4702+
require.NoError(t, err)
4703+
require.Empty(t, string(updatedJob.Output()))
4704+
})
4705+
4706+
t.Run("NoParams", func(t *testing.T) {
4707+
t.Parallel()
4708+
4709+
client, bundle := setup(t)
4710+
4711+
insertRes, err := client.Insert(ctx, noOpArgs{}, &InsertOpts{})
4712+
require.NoError(t, err)
4713+
4714+
_, err = client.JobUpdateTx(ctx, bundle.tx, insertRes.Job.ID, nil)
4715+
require.NoError(t, err)
4716+
})
4717+
}
4718+
45374719
func Test_Client_ErrorHandler(t *testing.T) {
45384720
t.Parallel()
45394721

@@ -5859,7 +6041,7 @@ func Test_Client_RetryPolicy(t *testing.T) {
58596041
// regression protection to ensure we're testing the right number of jobs:
58606042
require.Equal(t, rivercommon.MaxAttemptsDefault, insertRes.Job.MaxAttempts)
58616043

5862-
updatedJob, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{
6044+
updatedJob, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
58636045
ID: insertRes.Job.ID,
58646046
AttemptedAtDoUpdate: true,
58656047
AttemptedAt: &now, // we want a value here, but it'll be overwritten as jobs are locked by the producer
@@ -6647,7 +6829,7 @@ func Test_Client_JobCompletion(t *testing.T) {
66476829
}
66486830

66496831
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
6650-
_, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{
6832+
_, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
66516833
ID: job.ID,
66526834
FinalizedAtDoUpdate: true,
66536835
FinalizedAt: &now,
@@ -6753,7 +6935,7 @@ func Test_Client_JobCompletion(t *testing.T) {
67536935
}
67546936

67556937
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
6756-
_, err := client.driver.GetExecutor().JobUpdate(ctx, &riverdriver.JobUpdateParams{
6938+
_, err := client.driver.GetExecutor().JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
67576939
ID: job.ID,
67586940
ErrorsDoUpdate: true,
67596941
Errors: [][]byte{[]byte("{\"error\": \"oops\"}")},

internal/jobexecutor/job_executor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ func TestJobExecutor_Execute(t *testing.T) {
318318
require.Equal(t, rivertype.JobStateAvailable, job.State)
319319
}
320320

321-
_, err := bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
321+
_, err := bundle.exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
322322
ID: bundle.jobRow.ID,
323323
StateDoUpdate: true,
324324
State: rivertype.JobStateRunning,
@@ -373,7 +373,7 @@ func TestJobExecutor_Execute(t *testing.T) {
373373

374374
// add a unique key so we can verify it's cleared
375375
var err error
376-
bundle.jobRow, err = bundle.exec.JobUpdate(ctx, &riverdriver.JobUpdateParams{
376+
bundle.jobRow, err = bundle.exec.JobUpdateFull(ctx, &riverdriver.JobUpdateFullParams{
377377
ID: bundle.jobRow.ID,
378378
State: rivertype.JobStateAvailable, // required for encoding but ignored
379379
})

0 commit comments

Comments
 (0)