|
| 1 | +"""Combined Presidio REST service: analyzer + anonymizer on one port. |
| 2 | +
|
| 3 | +Constructs one warm AnalyzerEngine (with a native check-digit VIN recognizer) |
| 4 | +and one AnonymizerEngine at startup, exposing stock-compatible endpoints so a |
| 5 | +single PRESIDIO_URL serves both. English only. |
| 6 | +""" |
| 7 | + |
| 8 | +from typing import Any |
| 9 | + |
| 10 | +from fastapi import Body, FastAPI |
| 11 | +from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer, RecognizerResult |
| 12 | +from presidio_analyzer.predefined_recognizers import ( |
| 13 | + AuAbnRecognizer, |
| 14 | + AuAcnRecognizer, |
| 15 | + AuMedicareRecognizer, |
| 16 | + AuTfnRecognizer, |
| 17 | + InAadhaarRecognizer, |
| 18 | + InPanRecognizer, |
| 19 | + InPassportRecognizer, |
| 20 | + InVehicleRegistrationRecognizer, |
| 21 | + InVoterRecognizer, |
| 22 | + SgFinRecognizer, |
| 23 | + SgUenRecognizer, |
| 24 | + UkNinoRecognizer, |
| 25 | +) |
| 26 | +from presidio_anonymizer import AnonymizerEngine |
| 27 | +from presidio_anonymizer.entities import OperatorConfig |
| 28 | + |
# Predefined recognizers that Presidio ships but does NOT load by default and
# that work on English text (UK_NINO, AU_*, IN_*, SG_*). The es/it/pl/fi/th/ko
# recognizers are language-locked and deliberately left out — this image is
# English only. Each entry is a class; it is instantiated with its defaults
# when the analyzer is built.
EXTRA_RECOGNIZERS = [
    # United Kingdom
    UkNinoRecognizer,
    # Australia
    AuAbnRecognizer,
    AuAcnRecognizer,
    AuTfnRecognizer,
    AuMedicareRecognizer,
    # India
    InPanRecognizer,
    InAadhaarRecognizer,
    InVehicleRegistrationRecognizer,
    InVoterRecognizer,
    InPassportRecognizer,
    # Singapore
    SgFinRecognizer,
    SgUenRecognizer,
]
| 46 | + |
| 47 | + |
class VinRecognizer(PatternRecognizer):
    """Pattern recognizer for vehicle identification numbers (VINs).

    A VIN is 17 characters drawn from A-Z and 0-9, excluding I, O and Q.
    Candidates are additionally checked against the ISO 3779 check digit in
    position 9, which makes accidental hits on arbitrary 17-character codes
    (request ids, SKUs, tokens) extremely unlikely. Some non-North-American
    VINs do not carry a check digit and are therefore rejected — an
    intentional bias toward precision over recall.
    """

    # Numeric value of each legal VIN character for the check-digit formula.
    # I, O and Q are absent on purpose: a lookup miss marks the candidate as
    # invalid.
    _TRANSLIT = {
        **{str(d): d for d in range(10)},
        "A": 1, "B": 2, "C": 3, "D": 4, "E": 5, "F": 6, "G": 7, "H": 8,
        "J": 1, "K": 2, "L": 3, "M": 4, "N": 5, "P": 7, "R": 9,
        "S": 2, "T": 3, "U": 4, "V": 5, "W": 6, "X": 7, "Y": 8, "Z": 9,
    }
    # Positional multipliers. Index 8 (position 9) is the check digit itself,
    # so it contributes nothing to the sum.
    _WEIGHTS = [8, 7, 6, 5, 4, 3, 2, 10, 0, 9, 8, 7, 6, 5, 4, 3, 2]

    def validate_result(self, pattern_text: str):
        """Return True when ``pattern_text`` carries a valid VIN check digit.

        The text is upper-cased before checking. The result is False when the
        text is not exactly 17 characters long, contains a character outside
        the transliteration table, or has a ninth character that differs from
        the computed check digit.
        """
        candidate = pattern_text.upper()
        if len(candidate) != 17:
            return False

        weighted_sum = 0
        for char, weight in zip(candidate, self._WEIGHTS):
            value = self._TRANSLIT.get(char)
            if value is None:
                # Character is not legal in a VIN (e.g. I, O, Q, punctuation).
                return False
            weighted_sum += value * weight

        remainder = weighted_sum % 11
        # A remainder of 10 is written as the letter X.
        check_char = str(remainder) if remainder != 10 else "X"
        return candidate[8] == check_char
| 75 | + |
| 76 | + |
def build_analyzer() -> AnalyzerEngine:
    """Create the AnalyzerEngine used by this service.

    Starts from a default ``AnalyzerEngine`` and registers, in order, the
    check-digit-validated VIN recognizer followed by one default-constructed
    instance of every class in ``EXTRA_RECOGNIZERS``.

    Returns:
        The configured engine.
    """
    engine = AnalyzerEngine()
    registry = engine.registry

    vin_recognizer = VinRecognizer(
        supported_entity="VIN",
        patterns=[
            Pattern(name="vin", regex=r"\b[A-HJ-NPR-Z0-9]{17}\b", score=0.7),
        ],
        context=["vin", "vehicle", "chassis"],
    )
    registry.add_recognizer(vin_recognizer)

    for extra_cls in EXTRA_RECOGNIZERS:
        registry.add_recognizer(extra_cls())

    return engine
| 90 | + |
| 91 | + |
# Both engines are constructed once, at import time, and shared by every
# request handler below.
analyzer = build_analyzer()
anonymizer = AnonymizerEngine()

# docs_url / redoc_url are set to None, so no interactive documentation
# routes are configured for this app.
app = FastAPI(
    title="Sim Presidio",
    docs_url=None,
    redoc_url=None,
)
| 96 | + |
| 97 | + |
@app.get("/health")
def health() -> dict[str, str]:
    """Liveness probe: always answers with a fixed ``{"status": "ok"}`` body."""
    status_body = {"status": "ok"}
    return status_body
| 101 | + |
| 102 | + |
@app.get("/supportedentities")
def supported_entities(language: str = "en") -> list[str]:
    """List the entity types the shared analyzer reports for ``language``.

    Args:
        language: Language code taken from the query string; defaults to
            ``"en"``.

    Returns:
        Whatever ``analyzer.get_supported_entities`` returns for that language.
    """
    entity_names = analyzer.get_supported_entities(language)
    return entity_names
| 106 | + |
| 107 | + |
@app.post("/analyze")
def analyze(payload: dict[str, Any] = Body(...)) -> list[dict[str, Any]]:
    """Run the shared analyzer over ``payload["text"]``.

    Recognised payload keys:
        text: Required. The text to scan.
        language: Optional language code. Missing, null, or empty values
            fall back to ``"en"``.
        entities: Optional list of entity types to look for. Missing, null,
            or empty means no entity filter is passed to the engine.
        score_threshold: Optional minimum score, passed through unchanged.
        return_decision_process: Optional flag, defaults to ``False``.

    Returns:
        One ``to_dict()`` dictionary per recognizer result.

    Raises:
        KeyError: If ``text`` is absent from the payload.
    """
    results = analyzer.analyze(
        text=payload["text"],
        # `or` rather than a .get() default, so an explicit JSON null is
        # treated like a missing key instead of reaching the engine as None.
        language=payload.get("language") or "en",
        # An empty list is normalised to None ("no filter") as well.
        entities=payload.get("entities") or None,
        score_threshold=payload.get("score_threshold"),
        return_decision_process=payload.get("return_decision_process", False),
    )
    return [r.to_dict() for r in results]
| 119 | + |
| 120 | + |
@app.post("/anonymize")
def anonymize(payload: dict[str, Any] = Body(...)) -> dict[str, Any]:
    """Anonymize ``payload["text"]`` using caller-supplied analyzer results.

    Recognised payload keys:
        text: Required. The text to anonymize.
        analyzer_results: Optional list of objects with ``entity_type``,
            ``start``, ``end`` and an optional ``score`` (default ``1.0``).
            Missing, null, or empty means no spans.
        anonymizers / operators: Optional mapping of entity type to an
            operator config object. Its ``type`` key names the operator and
            the remaining keys become the operator's parameters.
            ``anonymizers`` takes precedence when both are non-empty. When
            neither is given, ``operators=None`` is passed to the engine.

    Returns:
        A dict with the anonymized ``text`` and an ``items`` list describing
        each applied operator (``operator``, ``entity_type``, ``start``,
        ``end``, ``text``).

    Raises:
        KeyError: If ``text`` is absent, if a result lacks ``entity_type``,
            ``start`` or ``end``, or if an operator config lacks ``type``.
    """
    # `or []` so an explicit JSON null behaves like a missing key instead of
    # failing with "NoneType is not iterable".
    raw_results = payload.get("analyzer_results") or []
    analyzer_results = [
        RecognizerResult(
            entity_type=r["entity_type"],
            start=r["start"],
            end=r["end"],
            score=r.get("score", 1.0),
        )
        for r in raw_results
    ]

    raw_operators = payload.get("anonymizers") or payload.get("operators")
    operators = None
    if raw_operators:
        operators = {}
        for entity, cfg in raw_operators.items():
            # Copy before popping so the request payload is not mutated.
            params = dict(cfg)
            operators[entity] = OperatorConfig(params.pop("type"), params)

    result = anonymizer.anonymize(
        text=payload["text"],
        analyzer_results=analyzer_results,
        operators=operators,
    )
    return {
        "text": result.text,
        "items": [
            {
                "operator": item.operator,
                "entity_type": item.entity_type,
                "start": item.start,
                "end": item.end,
                "text": item.text,
            }
            for item in result.items
        ],
    }
0 commit comments