@@ -19,6 +19,30 @@ function base(op: QueueMetricsRawV1Input["op"], queue: string): QueueMetricsRawV
1919 } ;
2020}
2121
22+ // Cumulative counters: each op keeps a monotonic per-(queue,op) odometer, so a counter row
23+ // carries the running total in `cumulative`. deltaSumTimestamp reconstructs the increase
24+ // (last - first) from a seeded cum=0 baseline; order_key orders readings within an op.
25+ let orderKey = 0 ;
26+ function counter (
27+ op : QueueMetricsRawV1Input [ "op" ] ,
28+ queue : string ,
29+ total : number ,
30+ waits ?: number [ ]
31+ ) : QueueMetricsRawV1Input [ ] {
32+ const rows : QueueMetricsRawV1Input [ ] = [
33+ { ...base ( op , queue ) , cumulative : 0 , order_key : orderKey ++ } ,
34+ ] ;
35+ for ( let cum = 1 ; cum <= total ; cum ++ ) {
36+ rows . push ( {
37+ ...base ( op , queue ) ,
38+ cumulative : cum ,
39+ order_key : orderKey ++ ,
40+ ...( waits ? { wait_ms : waits [ cum - 1 ] } : { } ) ,
41+ } ) ;
42+ }
43+ return rows ;
44+ }
45+
2246const aggregatedRow = z . object ( {
2347 enqueue_count : z . coerce . number ( ) ,
2448 started_count : z . coerce . number ( ) ,
@@ -44,11 +68,11 @@ function readAggregated(ch: ClickHouse) {
4468 return ch . reader . query ( {
4569 name : "read-queue-metrics-aggregated" ,
4670 query : `SELECT
47- sum(enqueue_count ) AS enqueue_count,
48- sum(started_count ) AS started_count,
49- sum(ack_count ) AS ack_count,
50- sum(nack_count ) AS nack_count,
51- sum(dlq_count ) AS dlq_count,
71+ deltaSumTimestampMerge(enqueue_delta ) AS enqueue_count,
72+ deltaSumTimestampMerge(started_delta ) AS started_count,
73+ deltaSumTimestampMerge(ack_delta ) AS ack_count,
74+ deltaSumTimestampMerge(nack_delta ) AS nack_count,
75+ deltaSumTimestampMerge(dlq_delta ) AS dlq_count,
5276 sum(throttled_count) AS throttled_count,
5377 max(max_running) AS max_running,
5478 max(max_queued) AS max_queued,
@@ -82,14 +106,11 @@ describe("queue_metrics_v1", () => {
82106 const queue = "queue-a" ;
83107
84108 const rows : QueueMetricsRawV1Input [ ] = [
85- ...Array . from ( { length : 3 } , ( ) => base ( "enqueue" , queue ) ) ,
86- ...[ 100 , 200 , 300 , 400 , 500 , 600 , 700 , 800 , 900 , 1000 ] . map ( ( wait_ms ) => ( {
87- ...base ( "started" , queue ) ,
88- wait_ms,
89- } ) ) ,
90- ...Array . from ( { length : 2 } , ( ) => base ( "ack" , queue ) ) ,
91- base ( "nack" , queue ) ,
92- base ( "dlq" , queue ) ,
109+ ...counter ( "enqueue" , queue , 3 ) ,
110+ ...counter ( "started" , queue , 10 , [ 100 , 200 , 300 , 400 , 500 , 600 , 700 , 800 , 900 , 1000 ] ) ,
111+ ...counter ( "ack" , queue , 2 ) ,
112+ ...counter ( "nack" , queue , 1 ) ,
113+ ...counter ( "dlq" , queue , 1 ) ,
93114 {
94115 ...base ( "gauge" , queue ) ,
95116 running : 8 ,
@@ -155,12 +176,24 @@ describe("queue_metrics_v1", () => {
155176 const ch = new ClickHouse ( { url : clickhouseContainer . getConnectionUrl ( ) , name : "test" } ) ;
156177 const queue = "queue-b" ;
157178
158- const block = ( waits : number [ ] ) =>
159- waits . map ( ( wait_ms ) => ( { ...base ( "started" , queue ) , wait_ms } ) ) ;
160-
161- const [ e1 ] = await ch . queueMetrics . insertRaw ( block ( [ 100 , 200 , 300 , 400 , 500 ] ) , SYNC ) ;
179+ // Cumulative odometer continues across the two insert blocks (baseline 0, then 1..10);
180+ // deltaSumTimestamp state and quantile state merge across the parts into one bucket.
181+ const startedRow = ( cum : number , wait_ms ?: number ) : QueueMetricsRawV1Input => ( {
182+ ...base ( "started" , queue ) ,
183+ cumulative : cum ,
184+ order_key : orderKey ++ ,
185+ ...( wait_ms !== undefined ? { wait_ms } : { } ) ,
186+ } ) ;
187+
188+ const [ e1 ] = await ch . queueMetrics . insertRaw (
189+ [ startedRow ( 0 ) , ...[ 100 , 200 , 300 , 400 , 500 ] . map ( ( w , i ) => startedRow ( i + 1 , w ) ) ] ,
190+ SYNC
191+ ) ;
162192 expect ( e1 ) . toBeNull ( ) ;
163- const [ e2 ] = await ch . queueMetrics . insertRaw ( block ( [ 600 , 700 , 800 , 900 , 1000 ] ) , SYNC ) ;
193+ const [ e2 ] = await ch . queueMetrics . insertRaw (
194+ [ 600 , 700 , 800 , 900 , 1000 ] . map ( ( w , i ) => startedRow ( i + 6 , w ) ) ,
195+ SYNC
196+ ) ;
164197 expect ( e2 ) . toBeNull ( ) ;
165198
166199 const [ queryError , result ] = await readAggregated ( ch ) ( { queueName : queue } ) ;
0 commit comments