
async_api jobs killed by transient Redis errors during update_state: RedisError and "state actually missing" are conflated into a single fatal path #1219

@mihow

Description

Note on scope

This ticket is the code-path half of the bug: a single transient Redis error
kills an async job irrecoverably because of how AsyncJobStateManager.update_state()
and its caller handle RedisError. There is a sibling config-path bug (missing
socket_keepalive on the Django Redis cache connection) captured separately in
2026-04-10-antenna-new-issue-redis-cache-keepalive.md. Fixing the config bug
reduces the frequency of the transient; fixing this code-path bug ensures the
remaining transients don't destroy user data.

Both should be fixed. Either one alone is insufficient.

Summary

In ami/ml/orchestration/async_job_state.py, AsyncJobStateManager.update_state() returns None for two very different conditions:

  1. redis.exceptions.RedisError raised during the pipeline call (transient: connection reset, timeout, broker restart)
  2. total_raw is None — the job:<id>:pending_images_total key genuinely doesn't exist (state was actually cleaned up)
```python
# async_job_state.py
try:
    ...
    results = pipe.execute()
except RedisError as e:
    logger.error(f"Redis error updating job {self.job_id} state: {e}")
    return None  # ← transient

...
if total_raw is None:
    return None  # ← state actually gone
```
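The conflation can be shown in miniature: both outcomes collapse into the same falsy return value, so the caller cannot tell a network blip from genuinely missing state. A self-contained sketch (the `RedisError` class here is a stand-in for `redis.exceptions.RedisError`, and the return values are illustrative):

```python
# Minimal stand-in for redis.exceptions.RedisError, for illustration only.
class RedisError(Exception):
    pass

def update_state(redis_up: bool, state_exists: bool):
    """Mirrors the two return-None paths above (simplified)."""
    try:
        if not redis_up:
            raise RedisError("Connection reset by peer")
    except RedisError:
        return None  # transient infrastructure blip
    if not state_exists:
        return None  # state genuinely cleaned up
    return {"processed": 507, "total": 840}

# Both failure modes are indistinguishable to the caller:
assert update_state(redis_up=False, state_exists=True) is None
assert update_state(redis_up=True, state_exists=False) is None
```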

The caller in ami/jobs/tasks.py:process_nats_pipeline_result() treats both the same way — no retry, no distinguishing log, immediate call to _fail_job:

```python
# tasks.py, first call (before save_results)
progress_info = state_manager.update_state(processed_image_ids, stage="process", ...)
if not progress_info:
    _ack_task_via_nats(reply_subject, logger)
    _fail_job(job_id, "Redis state missing for job")
    return

# tasks.py, second call (after save_results + NATS ack)
progress_info = state_manager.update_state(processed_image_ids, stage="results")
if not progress_info:
    _fail_job(job_id, "Redis state missing for job")
    return
```

_fail_job sets job status to FAILURE, marks finished_at, and calls cleanup_async_job_resources, which deletes the Redis state AND the NATS stream + consumer. So a single 153ms connection blip to Redis doesn't just incorrectly fail the job — it destroys the job's work queue, making recovery impossible without re-running the whole job from scratch.

Observed production incident

An async_api job with 840 images was processing normally:

  • Both stages (process, results) advancing together
  • 507 / 840 images completed, 0 failures
  • Thousands of detections and classifications already saved to the DB
  • Celery worker logs show every task returning succeeded in 0.06s, NATS acks going through cleanly

A single process_nats_pipeline_result task hit a Redis connection reset:

```
20:39:51,279 ERROR/ForkPoolWorker-37 Redis error updating job <id> state:
  Error while reading from redis:6379 : (104, 'Connection reset by peer')
20:39:51,432 ERROR/ForkPoolWorker-37 Job <id> marked as FAILURE: Redis state missing for job
```

Only 153ms elapsed between the Redis error and the fatal _fail_job call. Other concurrent process_nats_pipeline_result tasks on the same Celery worker, for the same job, reached Redis without trouble both before and after this event — a classic transient. Yet the job was marked FAILURE, its NATS stream + consumer were deleted, and its Redis state was wiped.

The user's only evidence of what happened is a single line in the job log:

ERROR Job 2403 marked as FAILURE: Redis state missing for job

Which is actively misleading — the state wasn't missing, the connection was briefly reset.

Why this is hard to notice

  • The confusing log message suggests a cleanup race or state eviction, not a transient network issue
  • The underlying Redis error updating job <id> state: ... is logged at ERROR level by async_job_state.py but only to the module logger — it doesn't surface in the job's own logs (which is what users read from the UI)
  • Other concurrent tasks in the same worker succeed, so the failure doesn't look like a Redis outage
  • FAILURE_THRESHOLD doesn't apply here (0 failures recorded in progress), so users see a job in FAILURE state with 0 failed images — confusing
  • Celery's automatic retry isn't configured for process_nats_pipeline_result (the task has no autoretry_for=(RedisError,)), so there's no resilience

Proposed fix

A layered defence:

1. Stop conflating transient and terminal states in update_state

Option A — raise transients, let the caller handle retries:

```python
def update_state(self, processed_image_ids, stage, failed_image_ids=None) -> JobStateProgress | None:
    redis = self._get_redis()
    pending_key = self._get_pending_key(stage)

    # Let RedisError propagate — don't swallow it as a None return.
    with redis.pipeline() as pipe:
        ...
        results = pipe.execute()

    ...
    if total_raw is None:
        return None  # State genuinely gone (cleaned up / expired)
    ...
```

Option B — return a typed result that distinguishes the three cases:

```python
@dataclass
class UpdateStateResult:
    progress: JobStateProgress | None
    state_missing: bool = False  # total_raw was None
    transient_error: Exception | None = None

def update_state(...) -> UpdateStateResult:
    ...
```

Option A is smaller and composes better with Celery's retry mechanism.

2. Make process_nats_pipeline_result retry on transient Redis errors

```python
@shared_task(
    bind=True,
    autoretry_for=(RedisError, ConnectionError),
    retry_backoff=True,
    retry_backoff_max=30,
    retry_jitter=True,
    max_retries=5,
)
def process_nats_pipeline_result(self, job_id, result_data, reply_subject):
    ...
```

With option A in place, Celery's built-in retry handles this cleanly: the NATS message is not yet acked (ack happens later in the function), so on retry either the task completes successfully or eventually reaches max_retries and we call _fail_job — at which point we've genuinely exhausted retries and the failure is real.
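The retry semantics this buys can be sketched without Celery itself — an illustration of `autoretry_for` plus exponential `retry_backoff`, not the real task (`RedisError` and `flaky_task` are stand-ins):

```python
class RedisError(Exception):
    """Stand-in for redis.exceptions.RedisError."""

def backoff_schedule(max_retries: int = 5, backoff_max: int = 30) -> list[int]:
    # retry_backoff=True doubles the delay each retry (1s, 2s, 4s, ...),
    # capped by retry_backoff_max; jitter omitted for clarity.
    return [min(2 ** n, backoff_max) for n in range(max_retries)]

def run_with_retries(task, max_retries: int = 5):
    """Transient RedisErrors are retried; only exhaustion is a real failure."""
    for attempt in range(max_retries + 1):
        try:
            return task(attempt)
        except RedisError:
            if attempt == max_retries:
                raise  # retries genuinely exhausted -> _fail_job is justified

def flaky_task(attempt: int) -> str:
    # Simulates a single connection blip on the first attempt only.
    if attempt == 0:
        raise RedisError("Connection reset by peer")
    return "ok"

assert backoff_schedule() == [1, 2, 4, 8, 16]
assert run_with_retries(flaky_task) == "ok"  # one blip no longer kills the job
```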

3. Only call _fail_job when the state is genuinely gone

Reserve the "Redis state missing for job" path for the total_raw is None case. If we reach it, either the job was cleaned up concurrently (a legitimate race) or the keys really expired (the 7-day TTL should make this essentially impossible). Reword the log message for clarity:

```python
if not progress_info:
    _ack_task_via_nats(reply_subject, logger)
    _fail_job(job_id, "Job state keys not found in Redis (job may have been cleaned up concurrently)")
    return
```

4. Surface the transient Redis error to the job logger

async_job_state.py uses logger = logging.getLogger(__name__) — the module logger, not the job logger. Users reading the job's log in the UI never see the real cause. Either pass a job_logger into the AsyncJobStateManager or have the caller log the exception against the job logger before retrying/failing.

Scope

This is not a duplicate of #1168 (zombie consumers that Django never learns about). That one is about workers never posting results; this one is about results being posted successfully but Django incorrectly killing the job on the result-handling side due to a transient infra blip.

It's also not a duplicate of #1174 (fail-fast on NATS unreachable). That one is about outbound NATS errors; this one is about inbound Redis errors during the result-processing code path.

How to reproduce

  1. Start a moderately-sized async_api job (hundreds of images)
  2. Inject a single Redis connection drop partway through — options:
    • redis-cli CLIENT KILL TYPE normal on the broker
    • Brief Redis container restart (< 5s)
    • iptables drop of a single TCP FIN
  3. Observe: job is marked FAILURE with "Redis state missing" despite other concurrent process_nats_pipeline_result tasks succeeding both before and after the drop
