Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/das/src/queue/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export const FETCH_JOBS = {
PR_METADATA: "fetch-pr-metadata",
PR_FILES: "fetch-pr-files",
BACKFILL_REPO: "backfill-repo",
ISSUE_CLOSURE: "fetch-issue-closure",
} as const;

export const DEFAULT_BACKFILL_DAYS = 40;
Expand Down
105 changes: 50 additions & 55 deletions packages/das/src/queue/fetch.processor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Processor, WorkerHost, InjectQueue } from "@nestjs/bullmq";
import { Logger } from "@nestjs/common";
import { InjectRepository } from "@nestjs/typeorm";
import { In, IsNull, Repository } from "typeorm";
import { IsNull, Repository } from "typeorm";
import { Job, Queue } from "bullmq";
import { Issue, PullRequest } from "../entities";
import { GitHubFetcherService } from "../webhook/github-fetcher.service";
Expand Down Expand Up @@ -29,12 +29,21 @@ export interface BackfillRepoJobData {
days?: number;
}

export interface IssueClosureJobData {
repoFullName: string;
issueNumber: number;
}

interface PrFilesGeneration {
headSha: string | null;
baseSha: string | null;
}

type JobData = PrMetadataJobData | PrFilesJobData | BackfillRepoJobData;
type JobData =
| PrMetadataJobData
| PrFilesJobData
| BackfillRepoJobData
| IssueClosureJobData;

@Processor(FETCH_QUEUE, { concurrency: 5 })
export class FetchProcessor extends WorkerHost {
Expand Down Expand Up @@ -68,22 +77,52 @@ export class FetchProcessor extends WorkerHost {
await this.handleBackfill(repoFullName, days ?? DEFAULT_BACKFILL_DAYS);
break;
}
case FETCH_JOBS.ISSUE_CLOSURE: {
const { repoFullName, issueNumber } = job.data as IssueClosureJobData;
await this.handleIssueClosure(repoFullName, issueNumber);
break;
}
default:
this.logger.warn(`Unknown job name: ${job.name}`);
}
}

private async handleIssueClosure(
repoFullName: string,
issueNumber: number,
): Promise<void> {
this.logger.log(`Resolving closer for ${repoFullName}#${issueNumber}`);

const issue = await this.issueRepo.findOneBy({
repoFullName,
issueNumber,
});
if (!issue) return;

// Reopens already null out solvedByPr in the webhook handler; never
// re-fetch for an open issue.
if (issue.state !== "CLOSED") {
await this.issueRepo.update(
{ repoFullName, issueNumber },
{ solvedByPr: null },
);
return;
}

const solvedByPr = await this.fetcher.fetchIssueClosingPr(
repoFullName,
issueNumber,
);

await this.issueRepo.update({ repoFullName, issueNumber }, { solvedByPr });
}

private async handlePrMetadata(
repoFullName: string,
prNumber: number,
): Promise<void> {
this.logger.log(`Fetching PR metadata for ${repoFullName}#${prNumber}`);

const previousPr = await this.prRepo.findOneBy({ repoFullName, prNumber });
const previousClosingIssueNumbers = this.uniqueIssueNumbers(
previousPr?.closingIssueNumbers ?? [],
);

const { closingIssueNumbers, body, lastEditedAt } =
await this.fetcher.fetchPrMetadata(repoFullName, prNumber);
const currentClosingIssueNumbers =
Expand All @@ -98,14 +137,10 @@ export class FetchProcessor extends WorkerHost {
},
);

const pr = await this.prRepo.findOneBy({ repoFullName, prNumber });
await this.reconcileSolvedIssueLinks(
repoFullName,
prNumber,
previousClosingIssueNumbers,
currentClosingIssueNumbers,
pr?.state === "MERGED",
);
// Issue solver attribution is closure-driven (ISSUE_CLOSURE jobs read
// ClosedEvent.closer). PR metadata only refreshes the PR-side text view
// of which issues this PR claims to close — it never writes
// issues.solved_by_pr.
}

private async handlePrFiles(data: PrFilesJobData): Promise<void> {
Expand Down Expand Up @@ -230,46 +265,6 @@ export class FetchProcessor extends WorkerHost {
};
}

private async reconcileSolvedIssueLinks(
repoFullName: string,
prNumber: number,
previousIssueNumbers: number[],
currentIssueNumbers: number[],
isMerged: boolean,
): Promise<void> {
const currentIssueNumberSet = new Set(currentIssueNumbers);
const staleIssueNumbers = previousIssueNumbers.filter(
(issueNumber) => !currentIssueNumberSet.has(issueNumber),
);

const clearIssueNumbers = isMerged
? staleIssueNumbers
: this.uniqueIssueNumbers([
...previousIssueNumbers,
...currentIssueNumbers,
]);

if (clearIssueNumbers.length > 0) {
await this.issueRepo.update(
{
repoFullName,
issueNumber: In(clearIssueNumbers),
solvedByPr: prNumber,
},
{ solvedByPr: null },
);
}

if (!isMerged || currentIssueNumbers.length === 0) {
return;
}

await this.issueRepo.update(
{ repoFullName, issueNumber: In(currentIssueNumbers) },
{ solvedByPr: prNumber },
);
}

private uniqueIssueNumbers(issueNumbers: number[]): number[] {
return [...new Set(issueNumbers)];
}
Expand Down
150 changes: 147 additions & 3 deletions packages/das/src/webhook/github-fetcher.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,126 @@ export class GitHubFetcherService implements OnModuleInit {
.filter((number): number is number => typeof number === "number");
}

// --- GraphQL: issue closure (which PR caused the current close) ---

/**
* Resolve the PR responsible for an issue's current closed state.
*
* Reads `ClosedEvent.closer` from the issue timeline and anchors to the
* issue's current `closedAt`, so reopen-then-reclose cycles attribute to
* the latest closer, not whichever PR first declared `Closes #N` in its
* body. Returns the PR number when the closer is a merged same-repo PR;
* `null` for manual closes, non-PR closers (commits, projects), or
* `NOT_PLANNED` closures.
*
* Source of truth for `issues.solved_by_pr`. Issue discovery and the
* issue-bounty solver lookup both read from this field, so they stay 1:1.
*/
async fetchIssueClosingPr(
repoFullName: string,
issueNumber: number,
): Promise<number | null> {
const [owner, repo] = repoFullName.split("/");
const token = await this.getTokenForRepo(repoFullName);

const query = `
query($owner: String!, $repo: String!, $issue: Int!) {
repository(owner: $owner, name: $repo) {
issue(number: $issue) {
closedAt
timelineItems(itemTypes: [CLOSED_EVENT], last: 20) {
nodes {
... on ClosedEvent {
createdAt
stateReason
closer {
__typename
... on PullRequest {
number
merged
state
baseRepository { nameWithOwner }
}
}
}
}
}
}
}
}
`;

const res = await this.githubFetch("https://api.github.com/graphql", {
method: "POST",
headers: {
Authorization: `Bearer ${token}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
query,
variables: { owner, repo, issue: issueNumber },
}),
});

if (!res.ok) {
throw new Error(
`GraphQL issue closure fetch failed: ${res.status} ${await res.text()}`,
);
}

const body: any = await res.json();
this.assertNoGraphQLErrors(body, "Issue closure fetch");

const issue = body.data?.repository?.issue;
if (!issue) return null;

return this.selectClosingPrFromTimeline(repoFullName, issue);
}

private selectClosingPrFromTimeline(
repoFullName: string,
issue: {
closedAt: string | null;
timelineItems?: { nodes?: any[] };
},
): number | null {
const closedAt = issue.closedAt;
if (!closedAt) return null;

const expectedRepo = repoFullName.toLowerCase();
const nodes = issue.timelineItems?.nodes ?? [];

// Walk newest to oldest, find the close event matching the issue's
// current closedAt. NOT_PLANNED closures (and anything else non-COMPLETED)
// don't attribute a solver.
for (let i = nodes.length - 1; i >= 0; i--) {
const ev = nodes[i];
if (!ev) continue;
const stateReason = ev.stateReason;
if (
stateReason != null &&
String(stateReason).toUpperCase() !== "COMPLETED"
) {
continue;
}
if (ev.createdAt !== closedAt) continue;
const closer = ev.closer;
if (!closer || closer.__typename !== "PullRequest") return null;
if (
(closer.baseRepository?.nameWithOwner ?? "").toLowerCase() !==
expectedRepo
) {
return null;
}
const merged =
closer.merged === true ||
String(closer.state ?? "").toUpperCase() === "MERGED";
if (!merged) return null;
return typeof closer.number === "number" ? closer.number : null;
}
return null;
}

// --- PR files + contents (REST for list, batched GraphQL for contents) ---

/**
Expand Down Expand Up @@ -952,6 +1072,26 @@ export class GitHubFetcherService implements OnModuleInit {
}
}
}
closureTimeline: timelineItems(
itemTypes: [CLOSED_EVENT]
last: 20
) {
nodes {
... on ClosedEvent {
createdAt
stateReason
closer {
__typename
... on PullRequest {
number
merged
state
baseRepository { nameWithOwner }
}
}
}
}
}
}
}
}
Expand Down Expand Up @@ -1024,9 +1164,13 @@ export class GitHubFetcherService implements OnModuleInit {
issueData.isTransferred = true;
}

if (issue.state === "OPEN") {
issueData.solvedByPr = null;
}
issueData.solvedByPr =
issue.state === "CLOSED"
? this.selectClosingPrFromTimeline(repoFullName, {
closedAt: issue.closedAt ?? null,
timelineItems: { nodes: issue.closureTimeline?.nodes ?? [] },
})
: null;

await this.issueRepo.upsert(issueData, ["repoFullName", "issueNumber"]);

Expand Down
22 changes: 22 additions & 0 deletions packages/das/src/webhook/handlers/issue.handler.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
/* eslint-disable @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-argument, @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-explicit-any */
import { Injectable } from "@nestjs/common";
import { InjectRepository } from "@nestjs/typeorm";
import { InjectQueue } from "@nestjs/bullmq";
import { Repository } from "typeorm";
import { Queue } from "bullmq";
import { Issue, Repo } from "../../entities";
import { FETCH_QUEUE, FETCH_JOBS } from "../../queue/constants";

@Injectable()
export class IssueHandler {
Expand All @@ -11,6 +14,8 @@ export class IssueHandler {
private readonly issueRepo: Repository<Issue>,
@InjectRepository(Repo)
private readonly repoRepo: Repository<Repo>,
@InjectQueue(FETCH_QUEUE)
private readonly fetchQueue: Queue,
) {}

async handle(payload: Record<string, any>): Promise<void> {
Expand Down Expand Up @@ -54,6 +59,23 @@ export class IssueHandler {

await this.issueRepo.upsert(data, ["repoFullName", "issueNumber"]);

// Resolve solver attribution from the issue's ClosedEvent timeline.
// The same primitive feeds issue discovery and the issue-bounty solver
// lookup so the two paths stay 1:1.
if (payload.action === "closed") {
await this.fetchQueue.add(
FETCH_JOBS.ISSUE_CLOSURE,
{ repoFullName, issueNumber: issue.number },
{
jobId: `closure-${repoFullName}-${issue.number}`,
removeOnComplete: true,
removeOnFail: true,
attempts: 3,
backoff: { type: "exponential", delay: 5000 },
},
);
}

const repoUpdate: Partial<Repo> = {
lastEventAt: new Date().toISOString(),
};
Expand Down
Loading