Skip to content

Commit e4e5fc2

Browse files
committed
fix: add new source
1 parent 1175e84 commit e4e5fc2

9 files changed

Lines changed: 319 additions & 76 deletions

File tree

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
import { listDatasets, processDataset } from './activities/activities'
1+
import { listDatasets, listSources, processDataset } from './activities/activities'
22

3-
export { listDatasets, processDataset }
3+
export { listDatasets, listSources, processDataset }

services/apps/automatic_projects_discovery_worker/src/activities/activities.ts

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,17 @@ import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
66
import { getServiceLogger } from '@crowd/logging'
77

88
import { svc } from '../main'
9-
import { getSource } from '../sources/registry'
9+
import { getAvailableSourceNames, getSource } from '../sources/registry'
1010
import { IDatasetDescriptor } from '../sources/types'
1111

1212
const log = getServiceLogger()
1313

1414
const BATCH_SIZE = 5000
1515

16+
export async function listSources(): Promise<string[]> {
17+
return getAvailableSourceNames()
18+
}
19+
1620
export async function listDatasets(sourceName: string): Promise<IDatasetDescriptor[]> {
1721
const source = getSource(sourceName)
1822
const datasets = await source.listAvailableDatasets()
@@ -32,40 +36,41 @@ export async function processDataset(
3236
log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...')
3337

3438
const source = getSource(sourceName)
39+
const stream = await source.fetchDatasetStream(dataset)
3540

36-
// We use streaming (not full download) because each CSV is ~119MB / ~750K rows.
37-
// Streaming keeps memory usage low (only one batch in memory at a time) and leverages
38-
// Node.js backpressure: if DB writes are slow, the HTTP stream pauses automatically.
39-
const httpStream = await source.fetchDatasetStream(dataset)
40-
41-
httpStream.on('error', (err: Error) => {
42-
log.error({ datasetId: dataset.id, error: err.message }, 'HTTP stream error.')
41+
stream.on('error', (err: Error) => {
42+
log.error({ datasetId: dataset.id, error: err.message }, 'Stream error.')
4343
})
4444

45-
// Pipe the raw HTTP response directly into csv-parse.
46-
// Data flows as: HTTP response → csv-parse → for-await → batch → DB
47-
const parser = httpStream.pipe(
48-
parse({
49-
columns: true,
50-
skip_empty_lines: true,
51-
trim: true,
52-
}),
53-
)
54-
55-
parser.on('error', (err) => {
56-
log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.')
57-
})
45+
// For CSV sources: pipe through csv-parse to get Record<string, string> objects.
46+
// For JSON sources: the stream already emits pre-parsed objects in object mode.
47+
const records =
48+
source.format === 'json'
49+
? stream
50+
: stream.pipe(
51+
parse({
52+
columns: true,
53+
skip_empty_lines: true,
54+
trim: true,
55+
}),
56+
)
57+
58+
if (source.format !== 'json') {
59+
;(records as ReturnType<typeof parse>).on('error', (err) => {
60+
log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.')
61+
})
62+
}
5863

5964
let batch: IDbProjectCatalogCreate[] = []
6065
let totalProcessed = 0
6166
let totalSkipped = 0
6267
let batchNumber = 0
6368
let totalRows = 0
6469

65-
for await (const rawRow of parser) {
70+
for await (const rawRow of records) {
6671
totalRows++
6772

68-
const parsed = source.parseRow(rawRow)
73+
const parsed = source.parseRow(rawRow as Record<string, unknown>)
6974
if (!parsed) {
7075
totalSkipped++
7176
continue
@@ -75,7 +80,8 @@ export async function processDataset(
7580
projectSlug: parsed.projectSlug,
7681
repoName: parsed.repoName,
7782
repoUrl: parsed.repoUrl,
78-
criticalityScore: parsed.criticalityScore,
83+
ossfCriticalityScore: parsed.ossfCriticalityScore,
84+
lfCriticalityScore: parsed.lfCriticalityScore,
7985
})
8086

8187
if (batch.length >= BATCH_SIZE) {
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
import http from 'http'
2+
import https from 'https'
3+
import { Readable } from 'stream'
4+
5+
import { getServiceLogger } from '@crowd/logging'
6+
7+
import { IDatasetDescriptor, IDiscoverySource, IDiscoverySourceRow } from '../types'
8+
9+
const log = getServiceLogger()
10+
11+
const DEFAULT_API_URL = 'https://hypervascular-nonduplicative-vern.ngrok-free.dev'
12+
const PAGE_SIZE = 100
13+
14+
interface LfApiResponse {
15+
page: number
16+
pageSize: number
17+
total: number
18+
totalPages: number
19+
data: LfApiRow[]
20+
}
21+
22+
interface LfApiRow {
23+
runDate: string
24+
repoUrl: string
25+
owner: string
26+
repoName: string
27+
contributors: number
28+
organizations: number
29+
sizeSloc: number
30+
lastUpdated: number
31+
age: number
32+
commitFreq: number
33+
score: number
34+
}
35+
36+
function getApiBaseUrl(): string {
37+
return (process.env.LF_CRITICALITY_SCORE_API_URL ?? DEFAULT_API_URL).replace(/\/$/, '')
38+
}
39+
40+
async function fetchPage(
41+
baseUrl: string,
42+
startDate: string,
43+
endDate: string,
44+
page: number,
45+
): Promise<LfApiResponse> {
46+
const url = `${baseUrl}/projects/scores?startDate=${startDate}&endDate=${endDate}&page=${page}&pageSize=${PAGE_SIZE}`
47+
48+
return new Promise((resolve, reject) => {
49+
const client = url.startsWith('https://') ? https : http
50+
51+
const req = client.get(url, (res) => {
52+
if (res.statusCode !== 200) {
53+
reject(new Error(`LF Criticality Score API returned status ${res.statusCode} for ${url}`))
54+
res.resume()
55+
return
56+
}
57+
58+
const chunks: Uint8Array[] = []
59+
res.on('data', (chunk: Uint8Array) => chunks.push(chunk))
60+
res.on('end', () => {
61+
try {
62+
resolve(JSON.parse(Buffer.concat(chunks).toString('utf8')) as LfApiResponse)
63+
} catch (err) {
64+
reject(new Error(`Failed to parse LF Criticality Score API response: ${err}`))
65+
}
66+
})
67+
res.on('error', reject)
68+
})
69+
70+
req.on('error', reject)
71+
req.end()
72+
})
73+
}
74+
75+
/**
76+
 * Returns the first and last day (UTC, formatted "YYYY-MM-DD") of the month at the given offset.
77+
* monthOffset = 0 → current month, -1 → previous month, etc.
78+
*/
79+
function monthRange(monthOffset: number): { startDate: string; endDate: string } {
80+
const now = new Date()
81+
const year = now.getUTCFullYear()
82+
const month = now.getUTCMonth() + monthOffset // can be negative; Date handles rollover
83+
84+
const first = new Date(Date.UTC(year, month, 1))
85+
const last = new Date(Date.UTC(year, month + 1, 0)) // last day of month
86+
87+
const pad = (n: number) => String(n).padStart(2, '0')
88+
const fmt = (d: Date) =>
89+
`${d.getUTCFullYear()}-${pad(d.getUTCMonth() + 1)}-${pad(d.getUTCDate())}`
90+
91+
return { startDate: fmt(first), endDate: fmt(last) }
92+
}
93+
94+
export class LfCriticalityScoreSource implements IDiscoverySource {
95+
public readonly name = 'lf-criticality-score'
96+
public readonly format = 'json' as const
97+
98+
async listAvailableDatasets(): Promise<IDatasetDescriptor[]> {
99+
const baseUrl = getApiBaseUrl()
100+
101+
// Return one dataset per month for the last 12 months (newest first)
102+
const datasets: IDatasetDescriptor[] = []
103+
104+
for (let offset = 0; offset >= -11; offset--) {
105+
const { startDate, endDate } = monthRange(offset)
106+
const id = startDate.slice(0, 7) // e.g. "2026-02"
107+
108+
datasets.push({
109+
id,
110+
date: startDate,
111+
url: `${baseUrl}/projects/scores?startDate=${startDate}&endDate=${endDate}`,
112+
})
113+
}
114+
115+
return datasets
116+
}
117+
118+
/**
119+
* Returns an object-mode Readable that fetches all pages from the API
120+
* and pushes each row as a plain object. Activities.ts iterates this
121+
 * directly (no csv-parse) because format === 'json'. NOTE(review): the producer
 * loop pushes rows without awaiting consumer demand, so a slow consumer can cause
 * rows to buffer in the stream's internal queue — confirm dataset sizes make this acceptable.
122+
*/
123+
async fetchDatasetStream(dataset: IDatasetDescriptor): Promise<Readable> {
124+
const baseUrl = getApiBaseUrl()
125+
126+
// Extract startDate and endDate from the stored URL
127+
const parsed = new URL(dataset.url)
128+
const startDate = parsed.searchParams.get('startDate') ?? ''
129+
const endDate = parsed.searchParams.get('endDate') ?? ''
130+
131+
const stream = new Readable({ objectMode: true, read() {} })
132+
133+
// Fetch pages asynchronously and push rows into the stream
134+
;(async () => {
135+
try {
136+
let page = 1
137+
let totalPages = 1
138+
139+
do {
140+
const response = await fetchPage(baseUrl, startDate, endDate, page)
141+
totalPages = response.totalPages
142+
143+
for (const row of response.data) {
144+
stream.push(row)
145+
}
146+
147+
log.debug(
148+
{ datasetId: dataset.id, page, totalPages, rowsInPage: response.data.length },
149+
'LF Criticality Score page fetched.',
150+
)
151+
152+
page++
153+
} while (page <= totalPages)
154+
155+
stream.push(null) // signal end of stream
156+
} catch (err) {
157+
stream.destroy(err instanceof Error ? err : new Error(String(err)))
158+
}
159+
})()
160+
161+
return stream
162+
}
163+
164+
parseRow(rawRow: Record<string, unknown>): IDiscoverySourceRow | null {
165+
const repoUrl = rawRow['repoUrl'] as string | undefined
166+
if (!repoUrl) {
167+
return null
168+
}
169+
170+
let repoName = ''
171+
let projectSlug = ''
172+
173+
try {
174+
const urlPath = new URL(repoUrl).pathname.replace(/^\//, '').replace(/\/$/, '')
175+
projectSlug = urlPath
176+
repoName = urlPath.split('/').pop() || ''
177+
} catch {
178+
const parts = repoUrl.replace(/\/$/, '').split('/')
179+
projectSlug = parts.slice(-2).join('/')
180+
repoName = parts.pop() || ''
181+
}
182+
183+
if (!projectSlug || !repoName) {
184+
return null
185+
}
186+
187+
const score = rawRow['score']
188+
const lfCriticalityScore = typeof score === 'number' ? score : parseFloat(score as string)
189+
190+
return {
191+
projectSlug,
192+
repoName,
193+
repoUrl,
194+
lfCriticalityScore: Number.isNaN(lfCriticalityScore) ? undefined : lfCriticalityScore,
195+
}
196+
}
197+
}

services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ export class OssfCriticalityScoreSource implements IDiscoverySource {
3939
}
4040

4141
// CSV column names may use dot notation for nested fields (e.g. "repo.url"); flat fields keep plain names (e.g. "default_score")
42-
parseRow(rawRow: Record<string, string>): IDiscoverySourceRow | null {
43-
const repoUrl = rawRow['repo.url']
42+
parseRow(rawRow: Record<string, unknown>): IDiscoverySourceRow | null {
43+
const repoUrl = rawRow['repo.url'] as string | undefined
4444
if (!repoUrl) {
4545
return null
4646
}
@@ -62,14 +62,14 @@ export class OssfCriticalityScoreSource implements IDiscoverySource {
6262
return null
6363
}
6464

65-
const criticalityScoreRaw = rawRow['default_score']
66-
const criticalityScore = criticalityScoreRaw ? parseFloat(criticalityScoreRaw) : undefined
65+
const scoreRaw = rawRow['default_score']
66+
const ossfCriticalityScore = scoreRaw ? parseFloat(scoreRaw as string) : undefined
6767

6868
return {
6969
projectSlug,
7070
repoName,
7171
repoUrl,
72-
criticalityScore: Number.isNaN(criticalityScore) ? undefined : criticalityScore,
72+
ossfCriticalityScore: Number.isNaN(ossfCriticalityScore) ? undefined : ossfCriticalityScore,
7373
}
7474
}
7575
}

services/apps/automatic_projects_discovery_worker/src/sources/registry.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1+
import { LfCriticalityScoreSource } from './lf-criticality-score/source'
12
import { OssfCriticalityScoreSource } from './ossf-criticality-score/source'
23
import { IDiscoverySource } from './types'
34

45
// To add a new source: instantiate it here.
5-
const sources: IDiscoverySource[] = [new OssfCriticalityScoreSource()]
6+
const sources: IDiscoverySource[] = [
7+
new OssfCriticalityScoreSource(),
8+
new LfCriticalityScoreSource(),
9+
]
610

711
export function getSource(name: string): IDiscoverySource {
812
const source = sources.find((s) => s.name === name)

services/apps/automatic_projects_discovery_worker/src/sources/types.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,20 @@ export interface IDatasetDescriptor {
88

99
export interface IDiscoverySource {
1010
name: string
11+
/**
12+
* 'csv' (default): fetchDatasetStream returns a raw text stream, piped through csv-parse.
13+
* 'json': fetchDatasetStream returns an object-mode Readable that emits pre-parsed records.
14+
*/
15+
format?: 'csv' | 'json'
1116
listAvailableDatasets(): Promise<IDatasetDescriptor[]>
1217
fetchDatasetStream(dataset: IDatasetDescriptor): Promise<Readable>
13-
parseRow(rawRow: Record<string, string>): IDiscoverySourceRow | null
18+
parseRow(rawRow: Record<string, unknown>): IDiscoverySourceRow | null
1419
}
1520

1621
export interface IDiscoverySourceRow {
1722
projectSlug: string
1823
repoName: string
1924
repoUrl: string
20-
criticalityScore?: number
25+
ossfCriticalityScore?: number
26+
lfCriticalityScore?: number
2127
}

0 commit comments

Comments
 (0)