Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions etc/scripts/dataset_pipeline/build_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
# Extract required phrases (the {{ }} marked spans) from .RULE files and
# write them out as train/val/test JSONL datasets for NER model training.
import hashlib
import json
import re
import unicodedata
from collections import Counter
from pathlib import Path
import click

from licensedcode.models import Rule
from licensedcode.models import rules_data_dir as default_rules_data_dir
from licensedcode.required_phrases import get_required_phrase_verbatim
from licensedcode.tokenize import required_phrase_splitter


def normalize_phrase(phrase):
    """Clean a raw marker phrase for training.

    Steps, applied in order:

    1. decode the basic HTML entities (quot, amp, lt, gt);
    2. drop XML/HTML-like tags such as <name> or </license>, keeping URLs
       that are wrapped in angle brackets;
    3. remove markdown backticks;
    4. collapse runs of whitespace to one space and trim;
    5. trim leading/trailing punctuation that carries no meaning.

    Return the cleaned string; it may be empty when nothing meaningful is
    left (for example a phrase that was only a tag or only punctuation).
    """
    entity_chars = {'quot': '"', 'amp': '&', 'lt': '<', 'gt': '>'}
    # Decode every entity in one pass: chained str.replace() calls would
    # decode an escaped ampersand first and then decode its remainder again.
    result = re.sub(
        r'&(quot|amp|lt|gt);',
        lambda match: entity_chars[match.group(1)],
        phrase,
    )
    # strip xml tags like <name>,</license> but keep urls in angle brackets
    result = re.sub(r'<(?![a-zA-Z]+://)[^>]+>', '', result)
    # remove markdown backticks
    result = result.replace('`', '')
    # collapse whitespace and trim
    result = re.sub(r'\s+', ' ', result).strip()
    # Strip leading/trailing punctuation that is not meaningful, then any
    # whitespace this exposes (as in "MIT ." -> "MIT").
    result = result.strip('.,;:<>').strip()
    return result


def get_rule_type(rule):
    """Return the name of the first truthy is_* flag on ``rule``.

    Flags are checked in a fixed order; a missing attribute counts as unset.
    Return 'unknown' when none of the flags is set.
    """
    known_flags = (
        'is_license_text',
        'is_license_notice',
        'is_license_reference',
        'is_license_tag',
        'is_license_intro',
        'is_license_clue',
        'is_false_positive',
    )
    set_flags = (name for name in known_flags if getattr(rule, name, False))
    return next(set_flags, 'unknown')


def tag_tokens(text):
    """Split ``text`` into word tokens and give each one a BIOES label.

    The ``{{`` and ``}}`` markers yielded by ``required_phrase_splitter`` are
    consumed and never appear in the returned tokens. Tokens between an
    opening and a closing marker are labelled B-REQ / I-REQ / E-REQ, or S-REQ
    when the phrase is a single token; all other tokens are labelled O.

    Return a ``(tokens, labels)`` tuple of two lists of equal length.
    """
    tokens, labels = [], []
    inside = False
    # number of word tokens seen since the last opening marker
    seen = 0

    for piece in required_phrase_splitter(text):
        if piece == '{{':
            inside, seen = True, 0
        elif piece == '}}':
            # Re-label the last token of the phrase being closed; an empty
            # or unopened phrase leaves the labels untouched.
            if inside and seen > 0:
                labels[-1] = 'E-REQ' if seen != 1 else 'S-REQ'
            inside, seen = False, 0
        else:
            tokens.append(piece)
            if not inside:
                labels.append('O')
                continue
            labels.append('I-REQ' if seen else 'B-REQ')
            seen += 1

    return tokens, labels


def assign_splits(results, threshold=50):
    """Plan an 80/10/10 train/val/test split keyed on license expression.

    Expressions with at least ``threshold`` entries are "heavy": they are only
    reported back, and the caller splits them entry by entry. Every other
    expression is kept whole and mapped to a single split.

    Return ``(heavy, assignment)`` where ``heavy`` is a set of expressions and
    ``assignment`` maps each remaining expression to 'train', 'val' or 'test'.
    """
    per_expr = Counter(entry['license_expression'] for entry in results)
    heavy = {expr for expr, n in per_expr.items() if n >= threshold}

    # Rare expressions, largest first, with the name as a stable tie-break.
    light = [expr for expr in per_expr if expr not in heavy]
    light.sort(key=lambda expr: (-per_expr[expr], expr))
    light_total = sum(per_expr[expr] for expr in light)

    shares = (('train', 0.8), ('val', 0.1), ('test', 0.1))
    targets = {split: share * light_total for split, share in shares}
    filled = dict.fromkeys(targets, 0)

    def fill_ratio(split):
        # How full a split is relative to its target (floored at 1).
        return filled[split] / max(targets[split], 1)

    assignment = {}
    for expr in light:
        # Greedy: give the expression to the relatively emptiest split;
        # ties resolve in train, val, test order.
        emptiest = min(targets, key=fill_ratio)
        assignment[expr] = emptiest
        filled[emptiest] += per_expr[expr]

    return heavy, assignment


@click.command()
@click.option('--rules-dir', type=click.Path(exists=True), default=None,
              help='Path to rules directory (defaults to repo rules dir)')
@click.option('--output-dir', default='dataset-output',
              help='Output directory for train/val/test JSONL files')
def main(rules_dir, output_dir):
    """Extract required phrases from rule files for NER training"""
    # NOTE: the docstring above doubles as the click --help text, so the
    # detailed description lives in these comments instead.
    #
    # Pipeline: load every *.RULE file, keep the ones carrying {{ }} required
    # phrase markers, build one record per rule (marker-free text, tokens,
    # BIOES labels, raw + normalized phrases), split the records into
    # train/val/test and write one JSONL file per split into output_dir.
    if not rules_dir:
        # Prefer the rules of the checkout this script sits in, falling back
        # to the rules directory of the importable licensedcode package.
        repo_rules = Path(__file__).resolve().parents[3] / 'src' / 'licensedcode' / 'data' / 'rules'
        rules_dir = str(repo_rules) if repo_rules.is_dir() else default_rules_data_dir

    rules_path = Path(rules_dir)
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    total_rules = 0
    annotated = 0
    total_phrases = 0
    load_failures = 0
    results = []

    click.echo(f'scanning rules from: {rules_path}')
    for rf in sorted(rules_path.glob('*.RULE')):
        try:
            rule = Rule.from_file(rule_file=str(rf))
        except Exception as exc:
            # Still best-effort: one broken rule must not abort the run, but
            # report it so a shrinking dataset does not go unnoticed.
            load_failures += 1
            click.echo(f'warning: could not load {rf.name}: {exc}', err=True)
            continue
        total_rules += 1

        # is_required_phrase rules don't need {{ }}: the flag covers them
        if getattr(rule, 'is_required_phrase', False):
            continue

        text = rule.text or ''
        if not text:
            continue

        # normalize line endings and unicode
        text = text.replace('\r\n', '\n').replace('\r', '\n')
        text = unicodedata.normalize('NFKC', text)

        phrases = list(get_required_phrase_verbatim(text))
        if not phrases:
            continue

        # word tokens + BIOES labels (computed before stripping markers)
        tokens, bioes_labels = tag_tokens(text)

        # strip out the {{ }} markers
        text = text.replace('{{', '').replace('}}', '')

        valid_phrases = [
            {'phrase': p, 'phrase_normalized': normalize_phrase(p)}
            for p in phrases
        ]

        annotated += 1
        total_phrases += len(valid_phrases)
        results.append({
            'identifier': rule.identifier,
            'license_expression': rule.license_expression or '',
            'rule_type': get_rule_type(rule),
            'text': text,
            'tokens': tokens,
            'bioes_labels': bioes_labels,
            'required_phrases': valid_phrases,
        })

    # split by license expression and write
    heavy, assignment = assign_splits(results)
    splits = {'train': [], 'val': [], 'test': []}
    for entry in results:
        expr = entry['license_expression']
        if expr in heavy:
            # common expressions: hash the rule identifier into 0..99 so the
            # 80/10/10 placement is the same on every run
            bucket = int(hashlib.md5(entry['identifier'].encode('utf-8')).hexdigest(), 16) % 100
            if bucket < 80:
                splits['train'].append(entry)
            elif bucket < 90:
                splits['val'].append(entry)
            else:
                splits['test'].append(entry)
        else:
            # rare expressions: all rules of one expression share a split
            splits[assignment[expr]].append(entry)

    for name, records in splits.items():
        path = out_dir / f'{name}.jsonl'
        with open(path, 'w', encoding='utf-8') as f:
            for entry in records:
                f.write(json.dumps(entry, ensure_ascii=False) + '\n')

    click.echo('\ndone')
    click.echo(f'  rules scanned: {total_rules}')
    if load_failures:
        click.echo(f'  rules failed to load: {load_failures}')
    click.echo(f'  annotated: {annotated}')
    click.echo(f'  phrases extracted: {total_phrases}')
    click.echo(f'  train: {len(splits["train"])}  val: {len(splits["val"])}  test: {len(splits["test"])}')
    click.echo(f'  output: {out_dir}')


# Run the click command only when this file is executed as a script, so that
# importing the module (as the tests do) has no side effects.
if __name__ == '__main__':
    main()
101 changes: 101 additions & 0 deletions etc/scripts/dataset_pipeline/test_build_dataset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# tests for build_dataset.py
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).parent))
from build_dataset import normalize_phrase, tag_tokens, assign_splits


class TestNormalizePhrase:
    """Checks for each cleaning step of normalize_phrase()."""

    def test_html_entities(self):
        # raw phrase -> expected cleaned phrase
        cases = (
            ('&lt;b&gt;MIT&lt;/b&gt;', 'MIT'),
            ('the &quot;License&quot;', 'the "License"'),
            ('foo &amp; bar', 'foo & bar'),
        )
        for raw, cleaned in cases:
            assert normalize_phrase(raw) == cleaned

    def test_preserves_urls_in_angle_brackets(self):
        # the brackets go, the URL itself stays
        assert normalize_phrase('<http://example.com/LICENSE>') == 'http://example.com/LICENSE'

    def test_strips_xml_tags(self):
        cases = (
            ('<name>Apache 2.0</name>', 'Apache 2.0'),
            ('<license>MIT</license>', 'MIT'),
        )
        for raw, cleaned in cases:
            assert normalize_phrase(raw) == cleaned

    def test_strips_backticks(self):
        cleaned = normalize_phrase('`MIT License`')
        assert cleaned == 'MIT License'

    def test_collapses_whitespace(self):
        cleaned = normalize_phrase('GNU General\n Public License')
        assert cleaned == 'GNU General Public License'

    def test_strips_trailing_punct(self):
        cases = (
            ('Apache 2.0.', 'Apache 2.0'),
            (',MIT,', 'MIT'),
        )
        for raw, cleaned in cases:
            assert normalize_phrase(raw) == cleaned

    def test_empty_after_strip(self):
        # inputs with nothing meaningful left normalize to the empty string
        for raw in ('<foo>', '...'):
            assert normalize_phrase(raw) == ''


class TestTagTokens:
    """Checks for the BIOES labelling done by tag_tokens()."""

    def test_single_phrase(self):
        tokens, labels = tag_tokens('under the {{Apache License}} terms')
        assert (tokens, labels) == (
            ['under', 'the', 'Apache', 'License', 'terms'],
            ['O', 'O', 'B-REQ', 'E-REQ', 'O'],
        )

    def test_single_word_phrase(self):
        tokens, labels = tag_tokens('use {{MIT}} license')
        assert (tokens, labels) == (
            ['use', 'MIT', 'license'],
            ['O', 'S-REQ', 'O'],
        )

    def test_multiple_phrases(self):
        tokens, labels = tag_tokens('{{Apache}} and {{MIT}} stuff')
        assert (tokens, labels) == (
            ['Apache', 'and', 'MIT', 'stuff'],
            ['S-REQ', 'O', 'S-REQ', 'O'],
        )

    def test_long_phrase(self):
        tokens, labels = tag_tokens('{{GNU General Public License}}')
        assert (tokens, labels) == (
            ['GNU', 'General', 'Public', 'License'],
            ['B-REQ', 'I-REQ', 'I-REQ', 'E-REQ'],
        )

    def test_no_markers(self):
        tokens, labels = tag_tokens('released under the license')
        assert (tokens, labels) == (
            ['released', 'under', 'the', 'license'],
            ['O', 'O', 'O', 'O'],
        )

    def test_alignment(self):
        # one label per token, whatever the number of phrases
        tokens, labels = tag_tokens('licensed under {{Apache License}} or {{MIT}}')
        assert len(labels) == len(tokens)

    def test_empty_input(self):
        tokens, labels = tag_tokens('')
        assert (tokens, labels) == ([], [])

    def test_empty_markers_ignored(self):
        tokens, labels = tag_tokens('licensed under {{}} the GPL')
        assert (tokens, labels) == (
            ['licensed', 'under', 'the', 'GPL'],
            ['O', 'O', 'O', 'O'],
        )


class TestAssignSplits:
    """Checks for the split planning done by assign_splits()."""

    def test_light_expressions_no_leakage(self):
        # 5 expressions x 10 rules each: all below the default threshold
        results = [
            {'license_expression': f'license-{i}', 'identifier': f'rule_{i}_{j}.RULE'}
            for i in range(5)
            for j in range(10)
        ]

        heavy, assignment = assign_splits(results)
        assert len(heavy) == 0
        assert len(assignment) == 5
        for split in assignment.values():
            assert split in ('train', 'val', 'test')

    def test_heavy_expressions_detected(self):
        # 100 rules for one expression, a single rule for another
        results = [
            {'license_expression': 'mit', 'identifier': f'mit_{i}.RULE'}
            for i in range(100)
        ]
        results.append({'license_expression': 'rare-1.0', 'identifier': 'rare_1.RULE'})

        heavy, assignment = assign_splits(results)
        assert 'mit' in heavy
        assert 'rare-1.0' not in heavy
        assert 'rare-1.0' in assignment
Loading