Skip to content

Commit cda1dbf

Browse files
committed
add queue-state count regression benchmark
This adds a benchmark on top of #1203 to make the queue-state count query regression easy to reproduce and discuss. It compares the current `JobCountByQueueAndState` implementation against the legacy query shape on the same migrated `river_job` schema. The benchmark stays lightweight by default, but can be scaled locally with `RIVER_BENCH_QUEUE_STATE_COUNT_NUM_JOBS` to reproduce the planner regression with a couple hundred thousand rows and quantify the gap.
1 parent 3d3c21a commit cda1dbf

1 file changed

Lines changed: 163 additions & 0 deletions

File tree

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package riverpgxv5_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/riverqueue/river/riverdbtest"
14+
"github.com/riverqueue/river/riverdriver"
15+
"github.com/riverqueue/river/riverdriver/riverpgxv5"
16+
"github.com/riverqueue/river/rivershared/riversharedtest"
17+
"github.com/riverqueue/river/rivertype"
18+
)
19+
20+
func BenchmarkJobCountByQueueAndState(b *testing.B) {
21+
ctx := context.Background()
22+
23+
dbPool := riversharedtest.DBPool(ctx, b)
24+
driver := riverpgxv5.New(dbPool)
25+
exec := driver.GetExecutor()
26+
schema := riverdbtest.TestSchema(ctx, b, driver, nil)
27+
numJobs := queueStateCountBenchmarkNumJobs(b)
28+
29+
seedQueueStateCountBenchmarkData(ctx, b, exec, schema, numJobs)
30+
31+
queueNamesTwo := queueStateCountBenchmarkQueueNames(2)
32+
queueNamesTen := queueStateCountBenchmarkQueueNames(10)
33+
34+
for _, benchmarkCase := range []struct {
35+
name string
36+
queueNames []string
37+
}{
38+
{name: "TwoQueues", queueNames: queueNamesTwo},
39+
{name: "TenQueues", queueNames: queueNamesTen},
40+
} {
41+
b.Run(benchmarkCase.name, func(b *testing.B) {
42+
b.ReportAllocs()
43+
44+
params := &riverdriver.JobCountByQueueAndStateParams{
45+
QueueNames: benchmarkCase.queueNames,
46+
Schema: schema,
47+
}
48+
49+
b.ResetTimer()
50+
for range b.N {
51+
results, err := driver.GetExecutor().JobCountByQueueAndState(ctx, params)
52+
require.NoError(b, err)
53+
require.NotEmpty(b, results)
54+
}
55+
})
56+
}
57+
}
58+
59+
func queueStateCountBenchmarkNumJobs(b *testing.B) int {
60+
b.Helper()
61+
62+
numJobs := 20_000
63+
if numJobsEnv := os.Getenv("RIVER_BENCH_QUEUE_STATE_COUNT_NUM_JOBS"); numJobsEnv != "" {
64+
parsedNumJobs, err := strconv.Atoi(numJobsEnv)
65+
require.NoError(b, err)
66+
require.Greater(b, parsedNumJobs, 0)
67+
68+
numJobs = parsedNumJobs
69+
}
70+
71+
return numJobs
72+
}
73+
74+
func queueStateCountBenchmarkQueueNames(numQueues int) []string {
75+
queueNames := make([]string, numQueues)
76+
for i := range numQueues {
77+
queueNames[i] = fmt.Sprintf("queue_%03d", i+1)
78+
}
79+
80+
return queueNames
81+
}
82+
83+
func queueStateCountBenchmarkQueue(jobNum int) string {
84+
// Rotate queue more slowly than state so every queue gets every state.
85+
return fmt.Sprintf("queue_%03d", ((jobNum/8)%100)+1)
86+
}
87+
88+
func queueStateCountBenchmarkState(jobNum int) rivertype.JobState {
89+
switch jobNum % 8 {
90+
case 0:
91+
return rivertype.JobStateRunning
92+
case 1:
93+
return rivertype.JobStateAvailable
94+
case 2:
95+
return rivertype.JobStateCompleted
96+
case 3:
97+
return rivertype.JobStateCancelled
98+
case 4:
99+
return rivertype.JobStateDiscarded
100+
case 5:
101+
return rivertype.JobStateRetryable
102+
case 6:
103+
return rivertype.JobStateScheduled
104+
default:
105+
return rivertype.JobStatePending
106+
}
107+
}
108+
109+
func seedQueueStateCountBenchmarkData(ctx context.Context, b *testing.B, exec riverdriver.Executor, schema string, numJobs int) {
110+
b.Helper()
111+
112+
const insertBatchSize = 5000
113+
114+
now := time.Now().UTC()
115+
116+
for start := 0; start < numJobs; start += insertBatchSize {
117+
end := min(start+insertBatchSize, numJobs)
118+
insertParams := make([]*riverdriver.JobInsertFullParams, 0, end-start)
119+
120+
for jobNum := start; jobNum < end; jobNum++ {
121+
finalizedAt, scheduledAt, state := queueStateCountBenchmarkTimestamps(now, jobNum)
122+
123+
insertParams = append(insertParams, &riverdriver.JobInsertFullParams{
124+
EncodedArgs: []byte(`{}`),
125+
FinalizedAt: finalizedAt,
126+
Kind: "benchmark",
127+
MaxAttempts: 25,
128+
Metadata: []byte(`{}`),
129+
Priority: (jobNum % 4) + 1,
130+
Queue: queueStateCountBenchmarkQueue(jobNum),
131+
ScheduledAt: &scheduledAt,
132+
State: state,
133+
})
134+
}
135+
136+
_, err := exec.JobInsertFullMany(ctx, &riverdriver.JobInsertFullManyParams{
137+
Jobs: insertParams,
138+
Schema: schema,
139+
})
140+
require.NoError(b, err)
141+
}
142+
143+
countsByState, err := exec.JobCountByAllStates(ctx, &riverdriver.JobCountByAllStatesParams{Schema: schema})
144+
require.NoError(b, err)
145+
146+
var numRows int
147+
for _, numJobsForState := range countsByState {
148+
numRows += numJobsForState
149+
}
150+
require.Equal(b, numJobs, numRows)
151+
}
152+
153+
func queueStateCountBenchmarkTimestamps(now time.Time, jobNum int) (*time.Time, time.Time, rivertype.JobState) {
154+
scheduledAt := now.Add(-time.Duration(jobNum%100000) * time.Second)
155+
state := queueStateCountBenchmarkState(jobNum)
156+
157+
if state != rivertype.JobStateCancelled && state != rivertype.JobStateCompleted && state != rivertype.JobStateDiscarded {
158+
return nil, scheduledAt, state
159+
}
160+
161+
finalizedAt := scheduledAt.Add(time.Second)
162+
return &finalizedAt, scheduledAt, state
163+
}

0 commit comments

Comments
 (0)