Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1e0ccf7
ci: compile main sources in coverage_report job
sonus21 May 2, 2026
65dfd14
Merge branch 'master' of github.com:/sonus21/rqueue
sonus21 May 3, 2026
5186137
nats-web: implement pause / soft-delete admin ops and capability-awar…
sonus21 May 5, 2026
42eb61e
nats-web: capability-aware nav / charts and stream-based peek
sonus21 May 5, 2026
2c15390
nats-web: backend-aware data-type labels and Limits-aware queue size
sonus21 May 5, 2026
81b4470
nats-web: position-based pending estimate for Limits streams, render …
sonus21 May 5, 2026
0d040b1
nats-web: per-consumer pending breakdown for Limits-retention streams
sonus21 May 5, 2026
c2dbfc2
nats-web: consumer-level Subscribers + Terminal Storage redesign
sonus21 May 5, 2026
60f6f33
nats-web: queue-detail redesign — hero, chip strip, subscriber & term…
sonus21 May 5, 2026
a6a9e69
fix: reuse single consumer for workqueue streams
sonus21 May 5, 2026
a7a3054
Revert "fix: reuse single consumer for workqueue streams"
sonus21 May 5, 2026
4fdd11b
nats-web: tighten queue-detail layout, add play/pause action button
sonus21 May 5, 2026
21ecfb5
fix: use single consumer-name suffix in resolvedConsumerName
sonus21 May 5, 2026
7bcc30d
nats-web: consumer-aware peek for Limits-retention streams
sonus21 May 5, 2026
0a4eb5c
nats-web: peek from ackFloor, not delivered.streamSeq
sonus21 May 5, 2026
524fc5c
fix: ack/nack target wrong NATS Message under multi-consumer fan-out
sonus21 May 5, 2026
f83a821
nats: regression test for inFlight key-collision under fan-out
sonus21 May 5, 2026
ec99465
nats-web: align Pending column with explorer + add Workers column
sonus21 May 5, 2026
deb1040
nats: tests for consumer-aware peek + adapt QueueModeIT to new contract
sonus21 May 5, 2026
1fc9693
nats-web: keep Pending column as numPending (yet-to-deliver only)
sonus21 May 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions nats-task-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ All v1 items are done and 360 unit tests pass. Branch `nats-backend` is ready to

## v2 pending items

### 1. Web dashboard — NATS gaps
### 1. Web dashboard — NATS gaps *(IN PROGRESS — pause/delete/explore landed)*

Controllers are no longer Redis-gated but several operations throw `BackendCapabilityException` (HTTP 501) on NATS. The front-end should hide unsupported panels proactively instead of relying on 501s.

- Expose `GET /rqueue/api/capabilities` returning the `Capabilities` record so the UI can conditionally hide panels.
- Extend `Capabilities` with dashboard-op flags: `supportsCharts`, `supportsMessageBrowse`, `supportsAdminMove`.
- Wire the flags into Pebble templates (scheduled panel, cron jobs panel, chart panel already have `hideScheduledPanel` / `hideCronJobs` hooks in `DataViewResponse`).
- ✅ `GET /rqueue/api/capabilities` already returns the `Capabilities` record so the UI can conditionally hide panels.
- ✅ `RqueueQDetailServiceImpl.getRunningTasks()` / `getScheduledTasks()` now return header-only tables on NATS instead of zero rows / 501s. Pending queue browsing routes through `MessageBroker.peek()`.
- ✅ `NatsRqueueUtilityService` implements `pauseUnpauseQueue` (persists flag + notifies local `RqueueMessageListenerContainer`), soft `deleteMessage` (KV metadata flag), `getDataType` (returns `"STREAM"`), `aggregateDataCounter`. 20 unit tests cover the path.
- ⏳ Pause-event multi-instance fan-out: `RqueueInternalPubSubChannel` is Redis-only. NATS bridge follow-up: subscribe to `rqueue.internal.<broker>` via `MessageBroker.subscribe/publish` and rebroadcast pause requests across worker JVMs.
- ⏳ Extend `Capabilities` with dashboard-op flags: `supportsCharts`, `supportsMessageBrowse`, `supportsAdminMove` (not yet — current flags suffice for the panels we hide today).
- ⏳ Pebble templates: `hideScheduledPanel` / `hideCronJobs` already wired into `DataViewResponse`. Front-end hides those panels; chart and message-browse hides still TBD.

Affected services that throw on NATS today:
- `RqueueDashboardChartServiceImpl` — time-series charts (no equivalent in JetStream)
- `RqueueUtilityServiceImpl` — move/enqueue admin ops
- `NatsMessageBrowsingRepository.viewData` — positional message browse
- `RqueueDashboardChartServiceImpl` — time-series charts (no equivalent in JetStream) — still pending
- `RqueueUtilityServiceImpl` — move/enqueue admin ops — `moveMessage`, `enqueueMessage`, `makeEmpty` deliberately remain `notSupported` (no JetStream primitive); `pauseUnpauseQueue` and `deleteMessage` now implemented
- `NatsMessageBrowsingRepository.viewData` — positional message browse (Redis-only by design)

### 2. Reactive listener container

Expand Down
11 changes: 9 additions & 2 deletions nats-task.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,16 @@ Then re-run:
./gradlew :rqueue-spring-boot-starter:test --tests "com.github.sonus21.rqueue.spring.boot.integration.NatsBackendEndToEndIT"
```

### Web-layer NATS dashboard gap (new follow-up)
### Web-layer NATS dashboard gap (new follow-up) *(PARTIAL — admin write ops landing)*

All 4 controllers and the 5 web service impls (`RqueueDashboardChartService*`, `RqueueQDetailService*`, `RqueueJobService*`, `RqueueSystemManagerService*`, `RqueueUtilityService*`) are still gated `@Conditional(RedisBackendCondition)`. On NATS the dashboard reports broker-derived sizes only; no charts, no message browse, no admin ops. Plan to fix:
All 4 controllers and the 5 web service impls (`RqueueDashboardChartService*`, `RqueueQDetailService*`, `RqueueJobService*`, `RqueueSystemManagerService*`, `RqueueUtilityService*`) are still gated `@Conditional(RedisBackendCondition)`. On NATS the dashboard reports broker-derived sizes only; no charts, no message browse, no admin ops.

Status:
- ✅ `NatsRqueueUtilityService` (rqueue-nats `@Conditional(NatsBackendCondition)`) replaces the all-stub impl: `pauseUnpauseQueue`, soft `deleteMessage`, `getDataType`, `aggregateDataCounter` work end-to-end. `moveMessage` / `enqueueMessage` / `makeEmpty` are deliberately `notSupported` (no JetStream equivalent).
- ✅ `RqueueQDetailServiceImpl` returns header-only tables for `getRunningTasks` / `getScheduledTasks` when the broker capabilities suppress those sections, instead of rendering 0-rows / 501s.
- ⏳ Charts (`RqueueDashboardChartService`), message browse, and `moveMessage` on NATS — still pending.

Plan to fix the rest:

1. Introduce repository interfaces in `rqueue-core/repository/` for the few storage primitives the web services share (queue browsing, time-series counters, atomic move). Web service impls move into core / `rqueue-web` and depend only on the repos.
2. Redis impls of the repos stay in `rqueue-redis`; NATS impls go in `rqueue-nats` and throw `BackendCapabilityException("nats", "operation", "reason")` for primitives JetStream can't model (positional message moves, time-bucket charts).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,27 @@ public static int getActiveQueueCount() {
public static int getRegisteredQueueCount() {
return registry.size();
}

/**
* Returns every {@link QueueDetail} registered under the given queue name, including all
* {@code @RqueueListener} methods that share the same backing storage. Used by the
* dashboard to render one subscriber row per handler. Returns an empty list when no
* detail is registered.
*/
public static List<QueueDetail> getAllForQueue(String queueName) {
if (queueName == null) {
return new ArrayList<>();
}
synchronized (lock) {
List<QueueDetail> matches = registry.values().stream()
.filter(qd -> queueName.equals(qd.getName()))
.sorted(Comparator.comparing(qd -> {
String cn = qd.getConsumerName();
return cn == null ? "" : cn;
}))
.collect(Collectors.toList());
lock.notifyAll();
return matches;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,18 @@ default List<RqueueMessage> pop(

List<RqueueMessage> peek(QueueDetail q, long offset, long count);

/**
* Consumer-aware peek overload. When {@code consumerName} is non-null and the backend has
* per-consumer offsets (e.g. NATS Limits-retention streams), the implementation starts
* pagination from that consumer's next undelivered sequence so the dashboard shows messages
* still pending for that specific subscriber instead of the entire retained window. The
* default delegates to {@link #peek(QueueDetail, long, long)} for backends with a single
* shared pool.
*/
default List<RqueueMessage> peek(QueueDetail q, String consumerName, long offset, long count) {
return peek(q, offset, count);
}

/**
* Remove {@code old} from the processing store and re-enqueue {@code updated} for retry.
* {@code delayMs <= 0} means immediate; {@code delayMs > 0} means schedule after that delay.
Expand Down Expand Up @@ -198,6 +210,85 @@ default String dlqStorageDisplayName(QueueDetail q) {
return null;
}

/**
* Indicates whether {@link #size(QueueDetail)} returns an exact count or an approximation
* for the given queue. Brokers that compute pending from per-consumer position math (e.g.
* NATS JetStream Limits-retention streams) return {@code true} so the dashboard renders
* the figure with a "~" prefix instead of presenting it as authoritative. Defaults to
* {@code false} (the historical Redis behavior — exact list / sorted-set lengths).
*/
default boolean isSizeApproximate(QueueDetail q) {
return false;
}

/**
* Per-consumer pending breakdown for queues whose backend has multiple independent
* subscribers — e.g. JetStream Limits-retention streams where each durable consumer
* progresses at its own pace. Returns an ordered map of {@code consumerName -> pending}
* so the dashboard can render one row per consumer instead of a single aggregate.
*
* <p>The default returns {@code null}, signalling that the queue has a single shared pool
* (Redis lists, NATS WorkQueue streams) and the caller should fall back to
* {@link #size(QueueDetail)}. Empty / null also means "no consumers attached".
*
* @deprecated superseded by {@link #subscribers(QueueDetail)} which returns a richer view
* (consumer + pending + in-flight + shared flag). Retained for one release so
* downstream callers keep compiling.
*/
@Deprecated
default java.util.Map<String, Long> consumerPendingSizes(QueueDetail q) {
return null;
}

/**
* Per-subscriber breakdown for the queue-detail dashboard. Each entry represents one
* logical handler attached to the queue:
*
* <ul>
* <li><b>Redis</b> — one entry per {@code @RqueueListener} method that registered for
* the queue. {@code pending} is the shared list size on every row
* ({@code pendingShared = true}); {@code inFlight} is the shared processing-ZSET
* size.
* <li><b>NATS JetStream</b> — one entry per durable consumer. For WorkQueue retention
* {@code pending} is the shared stream {@code msgCount} ({@code pendingShared = true});
* for Limits retention it is the exact per-consumer {@code numPending}
* ({@code pendingShared = false}). {@code inFlight} is the consumer's
* {@code numAckPending} in both cases.
* </ul>
*
* <p>The default returns a single anonymous row backed by {@link #size(QueueDetail)}, so
* brokers that don't track named subscribers still render a working table.
*/
default java.util.List<SubscriberView> subscribers(QueueDetail q) {
long pending;
try {
pending = size(q);
} catch (RuntimeException e) {
pending = 0L;
}
return java.util.Collections.singletonList(
new SubscriberView(q.resolvedConsumerName(), pending, 0L, true));
}

/**
* Backend-aware human-readable label for the given Redis-shaped {@code DataType} on the given
* dashboard tab. Surfaces in the queue-detail page's "Data Type" column so NATS deployments
* can show "Queue (Stream)" instead of "LIST".
*
* <p>The default returns {@code null}, which the dashboard interprets as "fall back to
* {@code DataType.name()}" (the historical Redis behavior).
*
* @param tab the dashboard nav tab the row corresponds to (PENDING, RUNNING, SCHEDULED, DEAD,
* COMPLETED, etc.). May be {@code null} when called in a context without a tab.
* @param type Redis-shaped data type used by the dashboard's table rendering.
* @return display label, or {@code null} to fall through to the default rendering.
*/
default String dataTypeLabel(
com.github.sonus21.rqueue.models.enums.NavTab tab,
com.github.sonus21.rqueue.models.enums.DataType type) {
return null;
}

AutoCloseable subscribe(String channel, Consumer<String> handler);

void publish(String channel, String payload);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2026 Sonu Kumar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.github.sonus21.rqueue.core.spi;

/**
* Per-subscriber row surfaced by {@link MessageBroker#subscribers} for the queue-detail
* dashboard. Each entry corresponds to one logical consumer attached to the queue:
*
* <ul>
* <li>On Redis, one entry per {@code @RqueueListener} method (handlers all share the
* same backing list, so {@code pending} is the same on every row and
* {@code pendingShared} is {@code true}).
* <li>On NATS JetStream, one entry per durable consumer. {@code pending} is the
* per-consumer {@code numPending} for Limits retention (exact, divergent across
* rows) and the shared {@code msgCount} for WorkQueue retention (same on every
* row, {@code pendingShared = true}).
* </ul>
*
* @param consumerName logical consumer / handler name (from {@code @RqueueListener.consumerName}
* when set, otherwise a backend-derived name like {@code rqueue-<queue>}).
* @param pending messages waiting to be processed by this subscriber.
* @param inFlight messages this subscriber has received but not yet acknowledged.
* @param pendingShared {@code true} when {@code pending} is a queue-wide aggregate rather
* than this subscriber's exclusive backlog. The dashboard renders these with a
* "(shared)" hint so it's clear the figure is not per-consumer.
*/
public record SubscriberView(
String consumerName, long pending, long inFlight, boolean pendingShared) {

public SubscriberView(String consumerName, long pending, long inFlight) {
this(consumerName, pending, inFlight, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,50 @@ public long size(QueueDetail q) {
return size == null ? 0L : size;
}

/**
* Per-subscriber rows for the Redis backend. Walks the {@link
* com.github.sonus21.rqueue.core.EndpointRegistry} for every {@code @RqueueListener}
* registered against this queue, then reports the shared list size and processing-ZSET
* size on each row. {@code pendingShared = true} on every row because Redis listeners
* compete on the same backing list — the figure is identical across rows but surfacing
* each handler still tells the operator which methods are subscribed and (joined with
* the worker registry by the dashboard) when each was last active.
*/
@Override
public java.util.List<com.github.sonus21.rqueue.core.spi.SubscriberView> subscribers(
QueueDetail q) {
long sharedPending;
try {
sharedPending = size(q);
} catch (RuntimeException e) {
sharedPending = 0L;
}
long sharedInFlight;
try {
RedisTemplate<String, RqueueMessage> rt = template.getTemplate();
Long zsetSize = rt.opsForZSet().size(q.getProcessingQueueName());
sharedInFlight = zsetSize == null ? 0L : zsetSize;
} catch (RuntimeException e) {
sharedInFlight = 0L;
}
java.util.List<QueueDetail> registered =
com.github.sonus21.rqueue.core.EndpointRegistry.getAllForQueue(q.getName());
if (registered.isEmpty()) {
// Queue is registered as primary only (no secondary handlers) — fall through to a
// single row using the queue's own consumer name so the table still renders.
return java.util.Collections.singletonList(
new com.github.sonus21.rqueue.core.spi.SubscriberView(
q.resolvedConsumerName(), sharedPending, sharedInFlight, true));
}
java.util.List<com.github.sonus21.rqueue.core.spi.SubscriberView> out =
new java.util.ArrayList<>(registered.size());
for (QueueDetail qd : registered) {
out.add(new com.github.sonus21.rqueue.core.spi.SubscriberView(
qd.resolvedConsumerName(), sharedPending, sharedInFlight, true));
}
return out;
}

@Override
public AutoCloseable subscribe(String channel, Consumer<String> handler) {
if (pubSubContainer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,19 +174,23 @@ public Duration visibilityDuration() {
}

/**
* Returns the effective JetStream consumer name for this queue. When {@link #consumerName} is
* explicitly set it is returned as-is. Otherwise a default is derived from the queue name:
* primary (non-system-generated) queues get {@code {name}-consumer-primary}; system-generated
* priority sub-queues get {@code {name}-consumer}. The name is sanitized so that characters
* Returns the effective broker-level consumer name for this queue. When {@link #consumerName} is
* explicitly set on the {@code @RqueueListener} it is returned as-is. Otherwise the default is
* derived from the queue name as {@code {name}-consumer}. The name is sanitized so characters
* outside {@code [A-Za-z0-9_-]} (e.g. the {@code ::} priority suffix separator) are replaced
* with {@code -}, producing a valid NATS consumer name in all cases.
*
* <p>A single suffix is used regardless of {@code systemGenerated} so the bootstrap validator
* and the runtime poller agree on the consumer name. NATS workqueue streams reject multiple
* non-filtered consumers (error 10099); using two different suffixes would cause the poller to
* try creating a second consumer with a different name and fail.
*/
public String resolvedConsumerName() {
if (consumerName != null && !consumerName.isEmpty()) {
return consumerName;
}
String sanitized = name.replaceAll("[^A-Za-z0-9_-]", "-");
return systemGenerated ? sanitized + "-consumer" : sanitized + "-consumer-primary";
return sanitized + "-consumer";
}

public boolean isDoNotRetryError(Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,12 @@ public class QueueExploreRequest extends SerializableBase {

@JsonProperty("count")
private int itemPerPage = 20;

/**
* Optional consumer / subscriber name. When set on Limits-retention streams the broker
* starts the peek from that consumer's next undelivered sequence instead of the stream's
* first sequence, so the explorer shows messages that are still pending for this specific
* consumer rather than the entire retained window.
*/
private String consumerName;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,30 @@ public class RedisDataDetail extends SerializableBase {
private String name;
private DataType type;
private long size;

/**
* Backend-aware human-readable label for the {@link #type}. The legacy templates surface
* this directly so deployments can display "Queue" / "Stream" on NATS instead of Redis-shaped
* "LIST" / "ZSET" tokens. When unset, templates default to {@code type.name()}.
*/
private String typeLabel;

/**
* Indicates that {@link #size} is an approximation rather than an exact count. NATS
* Limits-retention streams compute pending size from {@code lastSeq - min(delivered.streamSeq)}
* across durable consumers, which is approximate when filter subjects or per-consumer
* positions diverge. Templates render the size with a {@code ~} prefix when this is set.
*/
private boolean approximate;

/**
* Optional consumer name when the row represents a single subscriber's view of a shared
* stream (e.g. NATS Limits-retention streams). Renders next to the stream name in the
* "Name" column so each durable consumer's lag is visible separately.
*/
private String consumerName;

public RedisDataDetail(String name, DataType type, long size) {
this(name, type, size, null, false, null);
}
}
Loading
Loading