diff --git a/core/Command/TaskProcessing/WorkerCommand.php b/core/Command/TaskProcessing/WorkerCommand.php index 1f6d473f4e849..faf6f8da17f4c 100644 --- a/core/Command/TaskProcessing/WorkerCommand.php +++ b/core/Command/TaskProcessing/WorkerCommand.php @@ -165,17 +165,31 @@ private function processNextTask(OutputInterface $output, array $taskTypes = []) // Fetch the oldest scheduled task across all eligible task types in one query. // This naturally prevents starvation: regardless of how many tasks one provider // has queued, another provider's older tasks will be picked up first. - try { - $task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders)); - } catch (NotFoundException) { - return false; - } catch (Exception $e) { - $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); - return false; - } + $taskIdsToIgnore = []; + while (true) { + try { + $task = $this->taskProcessingManager->getNextScheduledTask(array_keys($eligibleProviders), $taskIdsToIgnore); + } catch (NotFoundException) { + return false; + } catch (Exception $e) { + $this->logger->error('Unknown error while retrieving scheduled TaskProcessing tasks', ['exception' => $e]); + return false; + } - $taskTypeId = $task->getTaskTypeId(); - $provider = $eligibleProviders[$taskTypeId]; + $taskTypeId = $task->getTaskTypeId(); + if (!isset($eligibleProviders[$taskTypeId])) { + $taskIdsToIgnore[] = (int)$task->getId(); + continue; + } + $provider = $eligibleProviders[$taskTypeId]; + + // Atomically claim the task; if another worker grabbed it between + // the SELECT above and now, lockTask() returns false -> skip it. + if ($this->taskProcessingManager->lockTask($task)) { + break; + } + $taskIdsToIgnore[] = (int)$task->getId(); + } $output->writeln( 'Processing task ' . $task->getId() . ' of type ' . $taskTypeId . ' with provider ' . $provider->getId(), diff --git a/lib/private/TaskProcessing/Db/TaskMapper.php b/lib/private/TaskProcessing/Db/TaskMapper.php index 4e5e1c4695af7..5818929b6172a 100644 --- a/lib/private/TaskProcessing/Db/TaskMapper.php +++ b/lib/private/TaskProcessing/Db/TaskMapper.php @@ -227,7 +227,7 @@ public function lockTask(Entity $entity): int { $qb->update($this->tableName) ->set('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_RUNNING, IQueryBuilder::PARAM_INT)) ->where($qb->expr()->eq('id', $qb->createPositionalParameter($entity->getId(), IQueryBuilder::PARAM_INT))) - ->andWhere($qb->expr()->neq('status', $qb->createPositionalParameter(2, IQueryBuilder::PARAM_INT))); + ->andWhere($qb->expr()->eq('status', $qb->createPositionalParameter(\OCP\TaskProcessing\Task::STATUS_SCHEDULED, IQueryBuilder::PARAM_INT))); try { return $qb->executeStatement(); } catch (Exception) {