diff --git a/packages/das/src/queue/constants.ts b/packages/das/src/queue/constants.ts index fd77f4e..04046ea 100644 --- a/packages/das/src/queue/constants.ts +++ b/packages/das/src/queue/constants.ts @@ -4,6 +4,7 @@ export const FETCH_JOBS = { PR_METADATA: "fetch-pr-metadata", PR_FILES: "fetch-pr-files", BACKFILL_REPO: "backfill-repo", + ISSUE_CLOSURE: "fetch-issue-closure", } as const; export const DEFAULT_BACKFILL_DAYS = 40; diff --git a/packages/das/src/queue/fetch.processor.ts b/packages/das/src/queue/fetch.processor.ts index 4809baf..08e1e6d 100644 --- a/packages/das/src/queue/fetch.processor.ts +++ b/packages/das/src/queue/fetch.processor.ts @@ -1,7 +1,7 @@ import { Processor, WorkerHost, InjectQueue } from "@nestjs/bullmq"; import { Logger } from "@nestjs/common"; import { InjectRepository } from "@nestjs/typeorm"; -import { In, IsNull, Repository } from "typeorm"; +import { IsNull, Repository } from "typeorm"; import { Job, Queue } from "bullmq"; import { Issue, PullRequest } from "../entities"; import { GitHubFetcherService } from "../webhook/github-fetcher.service"; @@ -29,12 +29,21 @@ export interface BackfillRepoJobData { days?: number; } +export interface IssueClosureJobData { + repoFullName: string; + issueNumber: number; +} + interface PrFilesGeneration { headSha: string | null; baseSha: string | null; } -type JobData = PrMetadataJobData | PrFilesJobData | BackfillRepoJobData; +type JobData = + | PrMetadataJobData + | PrFilesJobData + | BackfillRepoJobData + | IssueClosureJobData; @Processor(FETCH_QUEUE, { concurrency: 5 }) export class FetchProcessor extends WorkerHost { @@ -68,22 +77,52 @@ export class FetchProcessor extends WorkerHost { await this.handleBackfill(repoFullName, days ?? DEFAULT_BACKFILL_DAYS); break; } + case FETCH_JOBS.ISSUE_CLOSURE: { + const { repoFullName, issueNumber } = job.data as IssueClosureJobData; + await this.handleIssueClosure(repoFullName, issueNumber); + break; + } default: this.logger.warn(`Unknown job name: ${job.name}`); } } + private async handleIssueClosure( + repoFullName: string, + issueNumber: number, + ): Promise { + this.logger.log(`Resolving closer for ${repoFullName}#${issueNumber}`); + + const issue = await this.issueRepo.findOneBy({ + repoFullName, + issueNumber, + }); + if (!issue) return; + + // Reopens already null out solvedByPr in the webhook handler; never + // re-fetch for an open issue. + if (issue.state !== "CLOSED") { + await this.issueRepo.update( + { repoFullName, issueNumber }, + { solvedByPr: null }, + ); + return; + } + + const solvedByPr = await this.fetcher.fetchIssueClosingPr( + repoFullName, + issueNumber, + ); + + await this.issueRepo.update({ repoFullName, issueNumber }, { solvedByPr }); + } + private async handlePrMetadata( repoFullName: string, prNumber: number, ): Promise { this.logger.log(`Fetching PR metadata for ${repoFullName}#${prNumber}`); - const previousPr = await this.prRepo.findOneBy({ repoFullName, prNumber }); - const previousClosingIssueNumbers = this.uniqueIssueNumbers( - previousPr?.closingIssueNumbers ?? [], - ); - const { closingIssueNumbers, body, lastEditedAt } = await this.fetcher.fetchPrMetadata(repoFullName, prNumber); const currentClosingIssueNumbers = @@ -98,14 +137,10 @@ export class FetchProcessor extends WorkerHost { }, ); - const pr = await this.prRepo.findOneBy({ repoFullName, prNumber }); - await this.reconcileSolvedIssueLinks( - repoFullName, - prNumber, - previousClosingIssueNumbers, - currentClosingIssueNumbers, - pr?.state === "MERGED", - ); + // Issue solver attribution is closure-driven (ISSUE_CLOSURE jobs read + // ClosedEvent.closer). PR metadata only refreshes the PR-side text view + // of which issues this PR claims to close — it never writes + // issues.solved_by_pr. } private async handlePrFiles(data: PrFilesJobData): Promise { @@ -230,46 +265,6 @@ export class FetchProcessor extends WorkerHost { }; } - private async reconcileSolvedIssueLinks( - repoFullName: string, - prNumber: number, - previousIssueNumbers: number[], - currentIssueNumbers: number[], - isMerged: boolean, - ): Promise { - const currentIssueNumberSet = new Set(currentIssueNumbers); - const staleIssueNumbers = previousIssueNumbers.filter( - (issueNumber) => !currentIssueNumberSet.has(issueNumber), - ); - - const clearIssueNumbers = isMerged - ? staleIssueNumbers - : this.uniqueIssueNumbers([ - ...previousIssueNumbers, - ...currentIssueNumbers, - ]); - - if (clearIssueNumbers.length > 0) { - await this.issueRepo.update( - { - repoFullName, - issueNumber: In(clearIssueNumbers), - solvedByPr: prNumber, - }, - { solvedByPr: null }, - ); - } - - if (!isMerged || currentIssueNumbers.length === 0) { - return; - } - - await this.issueRepo.update( - { repoFullName, issueNumber: In(currentIssueNumbers) }, - { solvedByPr: prNumber }, - ); - } - private uniqueIssueNumbers(issueNumbers: number[]): number[] { return [...new Set(issueNumbers)]; } diff --git a/packages/das/src/webhook/github-fetcher.service.ts b/packages/das/src/webhook/github-fetcher.service.ts index 8a0526e..ba8a666 100644 --- a/packages/das/src/webhook/github-fetcher.service.ts +++ b/packages/das/src/webhook/github-fetcher.service.ts @@ -360,6 +360,126 @@ export class GitHubFetcherService implements OnModuleInit { .filter((number): number is number => typeof number === "number"); } + // --- GraphQL: issue closure (which PR caused the current close) --- + + /** + * Resolve the PR responsible for an issue's current closed state. + * + * Reads `ClosedEvent.closer` from the issue timeline and anchors to the + * issue's current `closedAt`, so reopen-then-reclose cycles attribute to + * the latest closer, not whichever PR first declared `Closes #N` in its + * body. Returns the PR number when the closer is a merged same-repo PR; + * `null` for manual closes, non-PR closers (commits, projects), or + * `NOT_PLANNED` closures. + * + * Source of truth for `issues.solved_by_pr`. Issue discovery and the + * issue-bounty solver lookup both read from this field, so they stay 1:1. + */ + async fetchIssueClosingPr( + repoFullName: string, + issueNumber: number, + ): Promise { + const [owner, repo] = repoFullName.split("/"); + const token = await this.getTokenForRepo(repoFullName); + + const query = ` + query($owner: String!, $repo: String!, $issue: Int!) { + repository(owner: $owner, name: $repo) { + issue(number: $issue) { + closedAt + timelineItems(itemTypes: [CLOSED_EVENT], last: 20) { + nodes { + ... on ClosedEvent { + createdAt + stateReason + closer { + __typename + ... on PullRequest { + number + merged + state + baseRepository { nameWithOwner } + } + } + } + } + } + } + } + } + `; + + const res = await this.githubFetch("https://api.github.com/graphql", { + method: "POST", + headers: { + Authorization: `Bearer ${token}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + query, + variables: { owner, repo, issue: issueNumber }, + }), + }); + + if (!res.ok) { + throw new Error( + `GraphQL issue closure fetch failed: ${res.status} ${await res.text()}`, + ); + } + + const body: any = await res.json(); + this.assertNoGraphQLErrors(body, "Issue closure fetch"); + + const issue = body.data?.repository?.issue; + if (!issue) return null; + + return this.selectClosingPrFromTimeline(repoFullName, issue); + } + + private selectClosingPrFromTimeline( + repoFullName: string, + issue: { + closedAt: string | null; + timelineItems?: { nodes?: any[] }; + }, + ): number | null { + const closedAt = issue.closedAt; + if (!closedAt) return null; + + const expectedRepo = repoFullName.toLowerCase(); + const nodes = issue.timelineItems?.nodes ?? []; + + // Walk newest to oldest, find the close event matching the issue's + // current closedAt. NOT_PLANNED closures (and anything else non-COMPLETED) + // don't attribute a solver. + for (let i = nodes.length - 1; i >= 0; i--) { + const ev = nodes[i]; + if (!ev) continue; + const stateReason = ev.stateReason; + if ( + stateReason != null && + String(stateReason).toUpperCase() !== "COMPLETED" + ) { + continue; + } + if (ev.createdAt !== closedAt) continue; + const closer = ev.closer; + if (!closer || closer.__typename !== "PullRequest") return null; + if ( + (closer.baseRepository?.nameWithOwner ?? "").toLowerCase() !== + expectedRepo + ) { + return null; + } + const merged = + closer.merged === true || + String(closer.state ?? "").toUpperCase() === "MERGED"; + if (!merged) return null; + return typeof closer.number === "number" ? closer.number : null; + } + return null; + } + // --- PR files + contents (REST for list, batched GraphQL for contents) --- /** @@ -952,6 +1072,26 @@ export class GitHubFetcherService implements OnModuleInit { } } } + closureTimeline: timelineItems( + itemTypes: [CLOSED_EVENT] + last: 20 + ) { + nodes { + ... on ClosedEvent { + createdAt + stateReason + closer { + __typename + ... on PullRequest { + number + merged + state + baseRepository { nameWithOwner } + } + } + } + } + } } } } @@ -1024,9 +1164,13 @@ export class GitHubFetcherService implements OnModuleInit { issueData.isTransferred = true; } - if (issue.state === "OPEN") { - issueData.solvedByPr = null; - } + issueData.solvedByPr = + issue.state === "CLOSED" + ? this.selectClosingPrFromTimeline(repoFullName, { + closedAt: issue.closedAt ?? null, + timelineItems: { nodes: issue.closureTimeline?.nodes ?? [] }, + }) + : null; await this.issueRepo.upsert(issueData, ["repoFullName", "issueNumber"]); diff --git a/packages/das/src/webhook/handlers/issue.handler.ts b/packages/das/src/webhook/handlers/issue.handler.ts index fb91029..a25ba8b 100644 --- a/packages/das/src/webhook/handlers/issue.handler.ts +++ b/packages/das/src/webhook/handlers/issue.handler.ts @@ -1,8 +1,11 @@ /* eslint-disable @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */ import { Injectable } from "@nestjs/common"; import { InjectRepository } from "@nestjs/typeorm"; +import { InjectQueue } from "@nestjs/bullmq"; import { Repository } from "typeorm"; +import { Queue } from "bullmq"; import { Issue, Repo } from "../../entities"; +import { FETCH_QUEUE, FETCH_JOBS } from "../../queue/constants"; @Injectable() export class IssueHandler { @@ -11,6 +14,8 @@ export class IssueHandler { private readonly issueRepo: Repository, @InjectRepository(Repo) private readonly repoRepo: Repository, + @InjectQueue(FETCH_QUEUE) + private readonly fetchQueue: Queue, ) {} async handle(payload: Record): Promise { @@ -54,6 +59,23 @@ export class IssueHandler { await this.issueRepo.upsert(data, ["repoFullName", "issueNumber"]); + // Resolve solver attribution from the issue's ClosedEvent timeline. + // The same primitive feeds issue discovery and the issue-bounty solver + // lookup so the two paths stay 1:1. + if (payload.action === "closed") { + await this.fetchQueue.add( + FETCH_JOBS.ISSUE_CLOSURE, + { repoFullName, issueNumber: issue.number }, + { + jobId: `closure-${repoFullName}-${issue.number}`, + removeOnComplete: true, + removeOnFail: true, + attempts: 3, + backoff: { type: "exponential", delay: 5000 }, + }, + ); + } + const repoUpdate: Partial = { lastEventAt: new Date().toISOString(), };