From c92c4736f54336ef996e67966a7a5e8a023457cf Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 23 Jun 2026 13:32:00 -0700 Subject: [PATCH 1/7] GML-2132 Add knowledge graph compatibility check and repair - Scan an existing graph for installed queries that have drifted from the shipped version or are missing - Repair them in place without rebuilding the knowledge graph - Refuse repair while a rebuild is in progress and run it under the per-graph lock --- common/db/migrate.py | 224 +++++++++++++++ graphrag-ui/src/pages/setup/KGAdmin.tsx | 349 ++++++++++++++++++++++- graphrag/app/routers/ui.py | 364 +++++++++++++++++++++++- 3 files changed, 930 insertions(+), 7 deletions(-) create mode 100644 common/db/migrate.py diff --git a/common/db/migrate.py b/common/db/migrate.py new file mode 100644 index 0000000..c76864d --- /dev/null +++ b/common/db/migrate.py @@ -0,0 +1,224 @@ +"""Generic migration helpers for upgrading existing graphs to the +current release's GSQL queries and schema. + +The release-cut workflow that motivates this module: + +* On an existing graph (created against an older version), the customer + upgrades graphrag. The new release may ship modified GSQL query + bodies or expanded vertex/edge attributes. Without an automatic + migration step the old, stale objects keep serving requests — leading + to surprising behavior that's hard to attribute. + +* ``check_and_reinstall_queries`` compares each shipped ``.gsql`` file + against the body currently installed on TigerGraph and re-creates + + re-installs only the ones whose body has actually drifted. + +* ``check_and_apply_schema`` (still a stub — see TODO below) is the + schema counterpart: detect missing attributes on existing vertex / + edge types and emit ``ALTER VERTEX ... ADD ATTRIBUTE …`` statements. + +Designed to be importable from both the graphrag FastAPI app (sync TG +connection via pyTigerGraph) and the ECC worker (async connection). +The sync entry points wrap the same comparison logic. +""" + +from __future__ import annotations + +import hashlib +import logging +import os +import re +from typing import Iterable + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# GSQL body normalization +# --------------------------------------------------------------------------- +# +# We compare the local ``.gsql`` text with TG's ``SHOW QUERY`` output. +# TG may canonicalize comments and whitespace differently from what was +# CREATE-d, so a literal byte-compare is noisy. Normalize both sides +# (strip comments, collapse whitespace) before hashing. +_LINE_COMMENT_RE = re.compile(r"//[^\n]*") +_BLOCK_COMMENT_RE = re.compile(r"/\*.*?\*/", re.DOTALL) +_WHITESPACE_RE = re.compile(r"\s+") + + +def _normalize_gsql(body: str) -> str: + body = _BLOCK_COMMENT_RE.sub("", body) + body = _LINE_COMMENT_RE.sub("", body) + body = _WHITESPACE_RE.sub(" ", body).strip() + return body + + +def _gsql_hash(body: str) -> str: + return hashlib.sha256(_normalize_gsql(body).encode()).hexdigest()[:16] + + +# Pull the ``CREATE … QUERY (…) { … }`` block out of TG's +# ``SHOW QUERY `` output. The output also carries a status banner +# we don't want to fold into the hash. +_QUERY_BLOCK_RE = re.compile( + r"(CREATE\s+(?:OR\s+REPLACE\s+)?(?:DISTRIBUTED\s+)?QUERY\s+\w+.*)", + re.DOTALL, +) + + +def _extract_query_body(show_query_output: str) -> str: + m = _QUERY_BLOCK_RE.search(show_query_output) + return m.group(1) if m else "" + + +def _query_name_from_path(query_path: str) -> str: + """``common/gsql/graphrag/StreamIds.gsql`` → ``StreamIds``.""" + base = os.path.basename(query_path) + return base[:-5] if base.endswith(".gsql") else base + + +def _read_local_query(query_path: str) -> str | None: + try: + with open(query_path, "r") as f: + return f.read() + except FileNotFoundError: + logger.warning(f"Local query file missing: {query_path}") + return None + + +# --------------------------------------------------------------------------- +# Sync API (used by graphrag/app/supportai/supportai.py init_supportai) +# --------------------------------------------------------------------------- + +def query_needs_update_sync(conn, graphname: str, query_path: str) -> bool: + """Return True when the local ``.gsql`` body differs from what's + installed on TG (or when the query is missing on TG). + + Wraps a synchronous ``conn.gsql()`` call. Errors fetching the + installed body are treated as "needs update" so the caller falls + back to re-installation rather than silently skipping. + """ + local_body = _read_local_query(query_path) + if local_body is None: + return False # nothing local to reinstall + + q_name = _query_name_from_path(query_path) + local_hash = _gsql_hash(local_body) + + try: + installed_text = conn.gsql(f"USE GRAPH {graphname}\nSHOW QUERY {q_name}") + except Exception as e: + logger.warning(f"SHOW QUERY {q_name} failed ({e}); will reinstall.") + return True + + installed_body = _extract_query_body(str(installed_text)) + if not installed_body: + logger.info(f"Query '{q_name}' not installed yet; will install.") + return True + + installed_hash = _gsql_hash(installed_body) + drifted = local_hash != installed_hash + if drifted: + logger.info( + f"Query '{q_name}' body has drifted from local ({installed_hash} != " + f"{local_hash}); will reinstall." + ) + return drifted + + +def filter_queries_needing_update_sync( + conn, + graphname: str, + query_paths: Iterable[str], +) -> list[str]: + """Return the subset of ``query_paths`` whose local body differs + from TG's installed body. Use to skip unnecessary CREATE OR REPLACE + + INSTALL QUERY ALL roundtrips on warm graphs. + """ + return [p for p in query_paths if query_needs_update_sync(conn, graphname, p)] + + +# --------------------------------------------------------------------------- +# Async API (used by ecc/app/graphrag/util.py install_queries) +# --------------------------------------------------------------------------- + +async def query_needs_update_async(conn, query_path: str) -> bool: + """Async variant of :func:`query_needs_update_sync` for the ECC + worker's ``AsyncTigerGraphConnection``. Reads ``conn.graphname`` + rather than taking it as a separate arg, matching how the rest of + the ECC code threads the connection. + """ + local_body = _read_local_query(query_path) + if local_body is None: + return False + + q_name = _query_name_from_path(query_path) + local_hash = _gsql_hash(local_body) + + try: + installed_text = await conn.gsql( + f"USE GRAPH {conn.graphname}\nSHOW QUERY {q_name}" + ) + except Exception as e: + logger.warning(f"SHOW QUERY {q_name} failed ({e}); will reinstall.") + return True + + installed_body = _extract_query_body(str(installed_text)) + if not installed_body: + logger.info(f"Query '{q_name}' not installed yet; will install.") + return True + + installed_hash = _gsql_hash(installed_body) + drifted = local_hash != installed_hash + if drifted: + logger.info( + f"Query '{q_name}' body has drifted from local ({installed_hash} != " + f"{local_hash}); will reinstall." + ) + return drifted + + +async def filter_queries_needing_update_async( + conn, + query_paths: Iterable[str], +) -> list[str]: + out: list[str] = [] + for p in query_paths: + if await query_needs_update_async(conn, p): + out.append(p) + return out + + +# --------------------------------------------------------------------------- +# Schema migration — TODO +# --------------------------------------------------------------------------- +# +# Goal: detect attributes that exist in the shipped schema ``.gsql`` +# files but are missing on the live graph, and emit +# ``ALTER VERTEX ADD ATTRIBUTE [DEFAULT …]`` +# statements wrapped in a ``CREATE SCHEMA_CHANGE JOB`` so the operator +# never has to run them by hand. +# +# Outline (deferred implementation): +# 1. Parse each shipped ``SupportAI_Schema*.gsql`` (and any other +# schema-relevant .gsql) to build the expected +# ``{vertex_type: {attr: tg_type}}`` map. +# 2. Query live schema via ``conn.getSchema()`` (or parse ``ls`` +# output) to build the same map for the running graph. +# 3. For each declared type, compute ``expected - current``. +# 4. Emit one ``CREATE SCHEMA_CHANGE JOB`` that ADDs the missing +# attributes with their declared defaults. +# 5. ``RUN SCHEMA_CHANGE JOB`` and drop it. +# +# v1.4.2 doesn't add any new attributes on existing vertex types +# (the ``Document.name`` / ``Image.name`` from v2.0 A3 were +# intentionally skipped), so this is a no-op for the current release. +# Stub kept here so future migrations have a place to land. + +def check_and_apply_schema(conn, graphname: str) -> dict: + """Compare expected vertex/edge attributes against the live schema + and apply any missing additions. Returns a summary dict. + + Stubbed for v1.4.2 — no attribute additions ship in this release. + """ + return {"applied": [], "skipped_reason": "no schema deltas in this release"} diff --git a/graphrag-ui/src/pages/setup/KGAdmin.tsx b/graphrag-ui/src/pages/setup/KGAdmin.tsx index b84490e..7be4492 100644 --- a/graphrag-ui/src/pages/setup/KGAdmin.tsx +++ b/graphrag-ui/src/pages/setup/KGAdmin.tsx @@ -2,7 +2,7 @@ import React, { useState, useEffect, useRef } from "react"; import { Button } from "@/components/ui/button"; import { Input } from "@/components/ui/input"; import { TagInput, TypeHint } from "@/components/ui/tag-input"; -import { Database, Loader2, RefreshCw, Upload } from "lucide-react"; +import { Database, Loader2, RefreshCw, Upload, Wrench } from "lucide-react"; import { pauseIdleTimer, resumeIdleTimer, pingIdleTimer } from "@/hooks/useIdleTimeout"; import { Dialog, @@ -35,6 +35,28 @@ const INPUT_CLIP_FIX: React.CSSProperties = { lineHeight: "1.5", }; +/** + * Returns a human-readable error string when a graph name violates naming rules, + * or null when the name is valid. + * Rules: must start with a letter or underscore; remaining chars must be + * letters, digits, or underscores (hyphens and other special characters are not allowed). + */ +function getGraphNameError(name: string): string | null { + const firstChar = name[0]; + if (/[0-9]/.test(firstChar)) { + return `Invalid graph name — cannot start with a number ('${firstChar}'). Use a letter or underscore as the first character.`; + } + if (!/^[A-Za-z_]/.test(name)) { + return `Invalid graph name — cannot start with '${firstChar}'. Use a letter or underscore as the first character.`; + } + const invalidChars = [...new Set(name.slice(1).replace(/[A-Za-z0-9_]/g, "").split(""))].filter(Boolean); + if (invalidChars.length > 0) { + const listed = invalidChars.map((c) => `'${c}'`).join(", "); + return `Invalid graph name — ${listed} ${invalidChars.length === 1 ? "is" : "are"} not allowed. Only letters, numbers, and underscores are permitted.`; + } + return null; +} + const KGAdmin = () => { const [confirm, confirmDialog, isConfirmDialogOpen] = useConfirm(); const [showAlert, alertDialog] = useAlert(); @@ -45,6 +67,23 @@ const KGAdmin = () => { const [initializeDialogOpen, setInitializeDialogOpen] = useState(false); const [refreshDialogOpen, setRefreshDialogOpen] = useState(false); const [ingestDialogOpen, setIngestDialogOpen] = useState(false); + const [migrationDialogOpen, setMigrationDialogOpen] = useState(false); + + // Migration Assistant state + const [migrationGraph, setMigrationGraph] = useState(""); + const [migrationStatus, setMigrationStatus] = useState<{ + queries?: { + outdated: string[]; + up_to_date: string[]; + not_installed: string[]; + missing_files: string[]; + }; + needs_repair?: boolean; + } | null>(null); + const [migrationChecking, setMigrationChecking] = useState(false); + const [migrationApplying, setMigrationApplying] = useState(false); + const [migrationMessage, setMigrationMessage] = useState(""); + const [migrationApplyNotInstalled, setMigrationApplyNotInstalled] = useState(false); // Reset states when dialogs close const handleInitializeDialogChange = (open: boolean) => { if (!open && isConfirmDialogOpen) { @@ -82,6 +121,96 @@ const KGAdmin = () => { setCollectedEdgeDescs({}); }; + const runMigrationCheck = async (graph: string) => { + if (!graph.trim()) { + setMigrationMessage("Pick a graph to check."); + return; + } + const creds = sessionStorage.getItem("auth"); + if (!creds) { + setMigrationMessage("Not authenticated."); + return; + } + setMigrationChecking(true); + setMigrationMessage(""); + setMigrationStatus(null); + try { + const resp = await fetch(`/ui/${graph}/migration/status`, { + headers: { Authorization: creds }, + }); + const data = await resp.json(); + if (!resp.ok) { + setMigrationMessage( + data?.detail || `Check failed: ${resp.statusText}` + ); + return; + } + setMigrationStatus(data); + if (!data.needs_repair) { + setMigrationMessage("✅ Graph is up to date — no repairs needed."); + } else { + const out = data.queries?.outdated?.length || 0; + const miss = data.queries?.not_installed?.length || 0; + setMigrationMessage( + `Found ${out} outdated query(s) and ${miss} not installed.` + ); + } + } catch (err: any) { + setMigrationMessage(`Check failed: ${err.message || err}`); + } finally { + setMigrationChecking(false); + } + }; + + const runMigrationApply = async () => { + if (!migrationGraph) return; + const creds = sessionStorage.getItem("auth"); + if (!creds) { + setMigrationMessage("Not authenticated."); + return; + } + setMigrationApplying(true); + setMigrationMessage("Applying repairs…"); + try { + const resp = await fetch(`/ui/${migrationGraph}/migration/apply`, { + method: "POST", + headers: { + Authorization: creds, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + apply_outdated: true, + apply_not_installed: migrationApplyNotInstalled, + }), + }); + const data = await resp.json(); + if (!resp.ok) { + setMigrationMessage( + data?.detail || `Apply failed: ${resp.statusText}` + ); + return; + } + const reinst = data.queries_reinstalled?.length || 0; + const newInst = data.queries_installed_new?.length || 0; + const errs = data.errors?.length || 0; + if (errs > 0) { + setMigrationMessage( + `⚠️ Repaired ${reinst} outdated, installed ${newInst} new — ${errs} error(s).` + ); + } else { + setMigrationMessage( + `✅ Repaired ${reinst} outdated; installed ${newInst} new query(s).` + ); + } + // Re-run the check so the user sees the updated state. + await runMigrationCheck(migrationGraph); + } catch (err: any) { + setMigrationMessage(`Apply failed: ${err.message || err}`); + } finally { + setMigrationApplying(false); + } + }; + const handleRefreshDialogChange = (open: boolean) => { if (!open && isConfirmDialogOpen) { return; @@ -615,8 +744,9 @@ const KGAdmin = () => { setPrecheckMessage("Please enter a graph name first."); return; } - if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(graphName)) { - setPrecheckMessage("Invalid graph name — must start with a letter or underscore."); + const nameError = getGraphNameError(graphName); + if (nameError) { + setPrecheckMessage(nameError); return; } setPrecheckRunning(true); @@ -712,8 +842,9 @@ const KGAdmin = () => { return; } - if (!/^[A-Za-z_][A-Za-z0-9_]*$/.test(graphName)) { - setStatusMessage("Invalid graph name. Must start with a letter or underscore, followed by letters, digits, or underscores."); + const nameError = getGraphNameError(graphName); + if (nameError) { + setStatusMessage(nameError); setStatusType("error"); return; } @@ -1197,6 +1328,34 @@ const KGAdmin = () => { + {/* Compatibility Check Card */} +
+
+
+ +
+

+ Compatibility Check +

+

+ Check an existing graph against the current release and repair any drifted queries. +

+
+
+ +
+
+ {/* Initialize Dialog */} @@ -2512,7 +2671,7 @@ const KGAdmin = () => { + + + + {isRebuildRunning && ( +
+ ⚠️ A rebuild is currently in progress. Migration check and repair are disabled + until the rebuild completes — they would conflict on TG's catalog locks. +
+ )} + + {migrationStatus && ( +
+ {(migrationStatus.queries?.outdated?.length ?? 0) > 0 && ( +
+
+ Outdated ({migrationStatus.queries!.outdated.length}) — body drifted from current release: +
+
+ {migrationStatus.queries!.outdated.join(", ")} +
+
+ )} + {(migrationStatus.queries?.not_installed?.length ?? 0) > 0 && ( +
+
+ Not installed ({migrationStatus.queries!.not_installed.length}): +
+
+ {migrationStatus.queries!.not_installed.join(", ")} +
+
+ )} + {(migrationStatus.queries?.up_to_date?.length ?? 0) > 0 && ( +
+
+ Up to date ({migrationStatus.queries!.up_to_date.length}). +
+
+ )} +
+ )} + + {migrationStatus?.needs_repair && (migrationStatus.queries?.not_installed?.length ?? 0) > 0 && ( + + )} + + {migrationMessage && ( +
+ {migrationMessage} +
+ )} + + + + + {migrationStatus?.needs_repair && ( + + )} + + + {confirmDialog} {alertDialog} diff --git a/graphrag/app/routers/ui.py b/graphrag/app/routers/ui.py index c0d5426..cf42e1c 100644 --- a/graphrag/app/routers/ui.py +++ b/graphrag/app/routers/ui.py @@ -1183,6 +1183,356 @@ def suggest_type_descriptions( } +def _get_domain_schema_for_render(conn, graphname: str): + """Snapshot the live domain schema for templated-retriever rendering. + + Returns ``(domain_vts, domain_edges, include_entity)`` sorted for + deterministic rendering. Returns empty lists on any failure — the + caller falls back to comparing the unrendered template (acceptable + for graphs without dynamic schema). + """ + try: + from common.db.schema_utils import read_existing_schema, is_structural_type + snapshot = read_existing_schema(conn) + domain_vts = sorted( + v for v in snapshot.vertex_types if not is_structural_type(v) + ) + domain_edges = sorted( + e for e in snapshot.edge_pairs.keys() + if not is_structural_type(e) and not e.startswith("reverse_") + ) + # Always include Entity in the community-walk start set, matching + # what install_retrievers does at init. + return domain_vts, domain_edges, True + except Exception as e: + logger.warning( + f"_get_domain_schema_for_render({graphname}) failed: {e}; " + f"templated retrievers will be checked against unrendered template" + ) + return [], [], False + + +def _local_query_hash(q_path: str, q_name: str, + domain_vts: list, domain_edges: list, include_entity: bool): + """Return the normalized hash for the LOCAL query body. + + For templated retrievers, render the template with the graph's live + domain VTs / edges first — otherwise the local template (no domain + types injected) won't match the installed body (which has them + baked in at install time). + """ + from common.db.migrate import _gsql_hash, _read_local_query + from common.db.retriever_render import TEMPLATED_RETRIEVERS, render_retriever_body + + body = _read_local_query(q_path) + if body is None: + return None + if q_name in TEMPLATED_RETRIEVERS: + body = render_retriever_body( + body, + domain_vts=domain_vts, + domain_edges=domain_edges, + include_entity=include_entity, + ) + return _gsql_hash(body) + + +# DISTRIBUTED QUERY bodies the migration assistant checks for drift, +# spanning the ECC and supportai namespaces and the shipped retrievers. +# Loading-job and schema-change .gsql files are excluded — they aren't +# queries and SHOW QUERY can't introspect them; schema-change tracking +# lives in ``common.db.migrate.check_and_apply_schema``. +_MIGRATION_QUERY_PATHS = [ + # graphrag namespace (ECC REQUIRED_QUERIES) + "common/gsql/graphrag/StreamIds.gsql", + "common/gsql/graphrag/StreamDocContent.gsql", + "common/gsql/graphrag/StreamChunkContent.gsql", + "common/gsql/graphrag/SetEpochProcessing.gsql", + "common/gsql/graphrag/get_vertices_or_remove.gsql", + # graphrag namespace (COMMUNITY_QUERIES) + "common/gsql/graphrag/get_community_children.gsql", + "common/gsql/graphrag/communities_have_desc.gsql", + "common/gsql/graphrag/graphrag_delete_all_communities.gsql", + "common/gsql/graphrag/graphrag_stream_entity_community_pairs.gsql", + "common/gsql/graphrag/graphrag_stream_all_ids.gsql", + "common/gsql/graphrag/louvain/graphrag_louvain_init.gsql", + "common/gsql/graphrag/louvain/graphrag_louvain_communities.gsql", + "common/gsql/graphrag/louvain/modularity.gsql", + "common/gsql/graphrag/louvain/stream_community.gsql", + # supportai (ECC checker + init_supportai) + "common/gsql/supportai/Scan_For_Updates.gsql", + "common/gsql/supportai/Update_Vertices_Processing_Status.gsql", + "common/gsql/supportai/Selected_Set_Display.gsql", + "common/gsql/supportai/ECC_Status.gsql", + "common/gsql/supportai/Check_Nonexistent_Vertices.gsql", + # supportai retrievers — only the vector variants and Display queries + # init_supportai actually installs on vector-enabled graphs. The legacy + # non-vector retrievers are excluded so they aren't flagged as missing. + "common/gsql/supportai/retrievers/Chunk_Sibling_Vector_Search.gsql", + "common/gsql/supportai/retrievers/Content_Similarity_Vector_Search.gsql", + "common/gsql/supportai/retrievers/GraphRAG_Community_Search_Display.gsql", + "common/gsql/supportai/retrievers/GraphRAG_Community_Vector_Search.gsql", + "common/gsql/supportai/retrievers/GraphRAG_Hybrid_Search_Display.gsql", + "common/gsql/supportai/retrievers/GraphRAG_Hybrid_Vector_Search.gsql", +] + + +@router.get(route_prefix + "/{graphname}/migration/status") +def migration_status( + graphname: ValidGraphName, + creds: Annotated[tuple[list[str], HTTPBasicCredentials], Depends(ui_basic_auth)], +): + """Compatibility check for an existing graph. Reports each shipped + GSQL query as ``up_to_date``, ``outdated`` (installed but body + drifted from local), or ``not_installed`` (query expected by the + current release but absent on TG). + + Read-only — does NOT modify the graph. Pair with + ``POST /migration/apply`` to actually repair. + + Schema-attribute drift is reported as ``{}`` for now; the detection + is stubbed in ``common.db.migrate.check_and_apply_schema``. + """ + from common.db.migrate import _gsql_hash, _extract_query_body + + import os.path + + cred_obj = creds[1] + conn = get_db_connection_pwd_manual(graphname, cred_obj.username, cred_obj.password) + + # Read live domain schema once for templated-retriever rendering. + domain_vts, domain_edges, include_entity = _get_domain_schema_for_render(conn, graphname) + + outdated: list[str] = [] + up_to_date: list[str] = [] + not_installed: list[str] = [] + missing_files: list[str] = [] + + # Use SHOW QUERY as the single source of truth: a query that + # doesn't exist on TG returns output with no extractable CREATE + # block (or an explicit error string). Avoids the extra + # ``getEndpoints`` round-trip and its token-auth requirement. + for q_path in _MIGRATION_QUERY_PATHS: + if not os.path.exists(q_path): + missing_files.append(q_path) + continue + q_name = os.path.splitext(os.path.basename(q_path))[0] + local_hash = _local_query_hash( + q_path, q_name, domain_vts, domain_edges, include_entity + ) + if local_hash is None: + missing_files.append(q_path) + continue + try: + show_out = conn.gsql(f"USE GRAPH {graphname}\nSHOW QUERY {q_name}") + except Exception as e: + logger.warning(f"migration_status: SHOW QUERY {q_name} failed: {e}") + not_installed.append(q_name) + continue + s = str(show_out) + installed_body = _extract_query_body(s) + if not installed_body: + not_installed.append(q_name) + continue + if _gsql_hash(installed_body) != local_hash: + outdated.append(q_name) + else: + up_to_date.append(q_name) + + return { + "graphname": graphname, + "queries": { + "outdated": outdated, + "up_to_date": up_to_date, + "not_installed": not_installed, + "missing_files": missing_files, + }, + "schema": { + # Populated from common.db.migrate.check_and_apply_schema once + # attribute additions are tracked; empty until then. + "missing_attributes": {}, + "schema_change_required": False, + }, + "needs_repair": bool(outdated) or bool(not_installed), + } + + +@router.post(route_prefix + "/{graphname}/migration/apply") +def migration_apply( + graphname: ValidGraphName, + creds: Annotated[tuple[list[str], HTTPBasicCredentials], Depends(ui_basic_auth)], + payload: Annotated[dict | None, Body()] = None, +): + """Apply repairs reported by ``GET /migration/status``. + + Request body (all optional): + + { + "apply_outdated": true, # re-create installed queries whose body has drifted + "apply_not_installed": false, # install expected queries that are missing on TG + "apply_schema": false # stubbed — no-op until check_and_apply_schema is implemented + } + + Default behavior: repair only drifted queries. The operator must + opt in to installing missing queries — some are conditional on + vector schema and may be missing intentionally. + + Acquires the per-graph lock for the duration of the repair so that + a concurrent ingest / rebuild / schema-extraction on the same graph + cannot race against the CREATE OR REPLACE + INSTALL QUERY ALL + sequence. Also rejects upfront if any rebuild is in flight + (rebuilds hold their own catalog locks on TG and would deadlock). + """ + from common.db.migrate import _gsql_hash, _extract_query_body, _read_local_query, check_and_apply_schema + + import os.path + + body = payload or {} + apply_outdated = bool(body.get("apply_outdated", True)) + apply_not_installed = bool(body.get("apply_not_installed", False)) + apply_schema = bool(body.get("apply_schema", False)) + + # Pre-flight: reject if any rebuild is in flight. The rebuild's + # INSTALL QUERY ALL holds TG catalog locks that would cause our + # CREATE OR REPLACE calls to time-out, leaving the graph half- + # migrated. + currently_rebuilding = get_rebuilding_graph() + if currently_rebuilding: + raise HTTPException( + status_code=409, + detail=f"Graph '{currently_rebuilding}' is currently being rebuilt. " + f"Migration repair cannot run while a rebuild is in flight." + ) + + # Acquire the per-graph lock so concurrent create_ingest / ingest / + # schema_extraction can't race against our query reinstalls. They + # all use the same lock and will 409 until we release. + if not acquire_graph_lock(graphname, "migration"): + current = get_current_operation(graphname) or "another operation" + raise HTTPException( + status_code=409, + detail=f"Graph '{graphname}' is currently busy with '{current}'. " + f"Wait for it to finish before running migration repair." + ) + + try: + cred_obj = creds[1] + conn = get_db_connection_pwd_manual(graphname, cred_obj.username, cred_obj.password) + return _migration_apply_inner( + graphname, conn, + apply_outdated=apply_outdated, + apply_not_installed=apply_not_installed, + apply_schema=apply_schema, + ) + finally: + release_graph_lock(graphname, "migration") + + +def _migration_apply_inner( + graphname: str, + conn, + apply_outdated: bool, + apply_not_installed: bool, + apply_schema: bool, +): + """Body of migration_apply, separated so the outer wrapper handles + the graph-lock acquire/release boilerplate. + """ + from common.db.migrate import _gsql_hash, _extract_query_body, check_and_apply_schema + import os.path + + # Read live domain schema for templated-retriever rendering. + domain_vts, domain_edges, include_entity = _get_domain_schema_for_render(conn, graphname) + + reinstalled: list[str] = [] + installed_new: list[str] = [] + errors: list[dict] = [] + + # SHOW QUERY is the single source of truth: empty / missing body + # means the query isn't installed; non-empty body that differs from + # local (after rendering for templated retrievers) means drift. + paths_to_create: list[tuple[str, bool]] = [] # (path, was_installed) + for q_path in _MIGRATION_QUERY_PATHS: + if not os.path.exists(q_path): + continue + q_name = os.path.splitext(os.path.basename(q_path))[0] + local_hash = _local_query_hash( + q_path, q_name, domain_vts, domain_edges, include_entity + ) + if local_hash is None: + continue + try: + show_out = conn.gsql(f"USE GRAPH {graphname}\nSHOW QUERY {q_name}") + except Exception as e: + errors.append({"query": q_name, "phase": "detect", "error": str(e)}) + continue + installed_body = _extract_query_body(str(show_out)) + if not installed_body: + if apply_not_installed: + paths_to_create.append((q_path, False)) + continue + if not apply_outdated: + continue + if _gsql_hash(installed_body) != local_hash: + paths_to_create.append((q_path, True)) + + # Pass 1: re-create each drifted/missing query body (CREATE OR REPLACE). + # Templated retrievers get rendered with the live domain schema before + # being sent — same as install_retrievers does at init time. Otherwise + # we'd push the un-templated body and the next rebuild's hybrid/ + # community walks wouldn't traverse domain edges. + from common.db.retriever_render import TEMPLATED_RETRIEVERS, render_retriever_body + for q_path, was_installed in paths_to_create: + q_name = os.path.splitext(os.path.basename(q_path))[0] + try: + with open(q_path, "r") as f: + q_body = f.read() + if q_name in TEMPLATED_RETRIEVERS: + q_body = render_retriever_body( + q_body, + domain_vts=domain_vts, + domain_edges=domain_edges, + include_entity=include_entity, + ) + res = conn.gsql( + f"USE GRAPH {graphname}\nBEGIN\n{q_body}\nEND\n" + ) + logger.info(f"Migration: created/updated '{q_name}' ({str(res)[:120]})") + if was_installed: + reinstalled.append(q_name) + else: + installed_new.append(q_name) + except Exception as e: + logger.error(f"Migration: failed to create '{q_name}': {e}", exc_info=True) + errors.append({"query": q_name, "phase": "create", "error": str(e)}) + + # Pass 2: a single INSTALL QUERY ALL covers everything just re-created. + if reinstalled or installed_new: + try: + install_res = conn.gsql(f"USE GRAPH {graphname}\nINSTALL QUERY ALL\n") + logger.info(f"Migration: INSTALL QUERY ALL returned {str(install_res)[:200]}") + except Exception as e: + logger.error(f"Migration: INSTALL QUERY ALL failed: {e}", exc_info=True) + errors.append({"query": "*", "phase": "install", "error": str(e)}) + + schema_result = {"applied": [], "skipped_reason": "skipped by request"} + if apply_schema: + try: + schema_result = check_and_apply_schema(conn, graphname) + except Exception as e: + logger.error(f"Migration: schema repair failed: {e}", exc_info=True) + errors.append({"query": "*", "phase": "schema", "error": str(e)}) + + return { + "graphname": graphname, + "queries_reinstalled": reinstalled, + "queries_installed_new": installed_new, + "schema_result": schema_result, + "errors": errors, + "success": not errors, + } + + @router.post(route_prefix + "/{graphname}/initialize_graph") def init_graph( graphname: ValidGraphName, @@ -1770,7 +2120,19 @@ async def forceupdate( status_code=409, detail=f"Graph '{currently_rebuilding}' is currently being rebuilt. Only one rebuild allowed at a time." ) - + + # Reject if a per-graph operation (migration repair, schema + # extraction, ingest job creation) is currently holding the graph + # lock — running INSTALL QUERY ALL concurrently with a rebuild's + # query install would deadlock on TG's catalog lock. + current_op = get_current_operation(graphname) + if current_op and current_op not in ("rebuild",): + raise HTTPException( + status_code=409, + detail=f"Graph '{graphname}' is currently busy with '{current_op}'. " + f"Wait for it to finish before triggering a rebuild." + ) + # Try to acquire global rebuild lock (async, non-blocking) if not await acquire_rebuild_lock(graphname): currently_rebuilding = get_rebuilding_graph() From 5236794874815fdaf80b2aa1df99772c7fb38313 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 23 Jun 2026 13:32:00 -0700 Subject: [PATCH 2/7] GML-2133 Harden document ingestion and unify vertex ID handling - Reconcile chunks left unfinished by an interrupted run before processing new documents - Write each chunk together with its content so cancellation can't leave chunks without content - Normalize vertex IDs the same way across every ingest path so documents with spaces or mixed case in filenames stay consistent - Re-create installed queries that have drifted from the shipped version on initialization - Surface ingestion failures as clear errors instead of failing silently --- common/gsql/graphrag/StreamIds.gsql | 21 +- ecc/app/eventual_consistency_checker.py | 16 +- ecc/app/graphrag/graph_rag.py | 266 +++++++++++++-------- ecc/app/graphrag/util.py | 124 +++++++++- ecc/app/graphrag/workers.py | 62 +++-- ecc/app/supportai/util.py | 5 +- ecc/app/supportai/workers.py | 9 +- graphrag/app/routers/supportai.py | 18 +- graphrag/app/supportai/supportai.py | 1 + graphrag/app/supportai/supportai_ingest.py | 47 +++- 10 files changed, 390 insertions(+), 179 deletions(-) diff --git a/common/gsql/graphrag/StreamIds.gsql b/common/gsql/graphrag/StreamIds.gsql index 892d8c3..78d896a 100644 --- a/common/gsql/graphrag/StreamIds.gsql +++ b/common/gsql/graphrag/StreamIds.gsql @@ -1,16 +1,25 @@ CREATE OR REPLACE DISTRIBUTED QUERY StreamIds(INT current_batch, INT ttl_batches, STRING v_type) { /* - * Get the IDs of entitiess that have not already been processed - * (one batch at a time) + * Return one batch of unprocessed vertex IDs, partitioned by + * ``getvid(v) % ttl_batches``. Use ``getvid`` rather than + * ``vertex_to_int`` — only ``getvid`` is stable for STRING-primary-id + * vertex types. + * + * POST-ACCUM (not WHERE) gates on ``epoch_processed == 0`` so that + * collecting the id and stamping the claim (``epoch_processed = now()``) + * happen in one per-vertex step: a vertex can't be claimed without + * being returned, or returned without being claimed. */ ListAccum @@ids; Verts = {v_type}; Verts = SELECT v FROM Verts:v - WHERE vertex_to_int(v) % ttl_batches == current_batch - AND v.epoch_processed == 0 - ACCUM @@ids += v.id - POST-ACCUM v.epoch_processed = datetime_to_epoch(now()); // set the processing time + WHERE getvid(v) % ttl_batches == current_batch + POST-ACCUM + IF v.epoch_processed == 0 THEN + @@ids += v.id, + v.epoch_processed = datetime_to_epoch(now()) + END; PRINT @@ids; } diff --git a/ecc/app/eventual_consistency_checker.py b/ecc/app/eventual_consistency_checker.py index fb501fe..df3ae55 100644 --- a/ecc/app/eventual_consistency_checker.py +++ b/ecc/app/eventual_consistency_checker.py @@ -23,6 +23,7 @@ from common.metrics.tg_proxy import TigerGraphConnectionProxy from common.chunkers import BaseChunker from common.extractors import BaseExtractor +from supportai.util import process_id logger = logging.getLogger(__name__) @@ -112,13 +113,14 @@ def _upsert_chunk(self, doc_id, chunk_id, chunk): "DocumentChunk", chunk_id, "HAS_CONTENT", "Content", chunk_id ) self.conn.upsertEdge("Document", doc_id, "HAS_CHILD", "DocumentChunk", chunk_id) - if int(chunk_id.split("_")[-1]) > 0: + idx = int(chunk_id.split("_")[-1]) + if idx > 0: self.conn.upsertEdge( "DocumentChunk", chunk_id, "IS_AFTER", "DocumentChunk", - doc_id + "_chunk_" + str(int(chunk_id.split("_")[-1]) - 1), + process_id(f"{doc_id}_chunk_{idx - 1}"), ) # TODO: Change to loading job for all entities in document at once @@ -127,7 +129,7 @@ def _upsert_entities(self, src_id, src_type, entities): self.conn.upsertVertices( "Entity", [ - (x["id"], {"definition": x["definition"], "epoch_added": date_added}) + (process_id(x["id"]), {"definition": x["definition"], "epoch_added": date_added}) for x in entities ], ) @@ -135,7 +137,7 @@ def _upsert_entities(self, src_id, src_type, entities): src_type, "CONTAINS_ENTITY", "Entity", - [(src_id, x["id"], {}) for x in entities], + [(src_id, process_id(x["id"]), {}) for x in entities], ) # TODO: Change to loading job for all relationships in document at once @@ -145,7 +147,7 @@ def _upsert_rels(self, src_id, src_type, relationships): "RelationshipType", [ ( - x["source"] + ":" + x["type"] + ":" + x["target"], + process_id(x["source"] + ":" + x["type"] + ":" + x["target"]), { "definition": x["definition"], "short_name": x["type"], @@ -164,7 +166,7 @@ def _upsert_rels(self, src_id, src_type, relationships): "MENTIONS_RELATIONSHIP", "RelationshipType", [ - (src_id, x["source"] + ":" + x["type"] + ":" + x["target"], {}) + (src_id, process_id(x["source"] + ":" + x["type"] + ":" + x["target"]), {}) for x in relationships ], ) @@ -219,7 +221,7 @@ def _process_document_content(self, v_type, vertex_id, content): LogWriter.info(f"Chunking the content from vertex type: {v_type}") chunks = self._chunk_document(content) for i, chunk in enumerate(chunks): - self._upsert_chunk(vertex_id, f"{vertex_id}_chunk_{i}", chunk) + self._upsert_chunk(vertex_id, process_id(f"{vertex_id}_chunk_{i}"), chunk) def _extract_and_upsert_entities(self, v_type, vertex_id, content): LogWriter.info(f"Extracting and upserting entities from the content from vertex type: {v_type}") diff --git a/ecc/app/graphrag/graph_rag.py b/ecc/app/graphrag/graph_rag.py index d2bee46..9115fa7 100644 --- a/ecc/app/graphrag/graph_rag.py +++ b/ecc/app/graphrag/graph_rag.py @@ -17,7 +17,7 @@ import logging import time import traceback -from collections import defaultdict +from collections import Counter, defaultdict import httpx from aiochannel import Channel, ChannelClosed @@ -54,39 +54,48 @@ async def stream_docs( progress=None, ): """ - Streams the document contents into the docs_chan. + Stream unprocessed Documents (epoch_processed == 0) into docs_chan. + + Single-probe scan: call StreamIds with ttl_batches=1 to fetch ALL + unprocessed Document ids in one round-trip. Same rationale as + stream_chunks — partitioning iterated the vertex space even when + nothing was to do; on this TG build that's not the bottleneck. + + ``ttl_batches`` is preserved for caller-compatibility but ignored. *progress* (optional) is a callable invoked once when document streaming completes — runtime hands the rebuild status forward from "Chunking documents" to "Extracting entities and relationships" at that boundary. """ - logger.info(f"streaming docs ({ttl_batches} batches)") + _ = ttl_batches # intentionally unused — see docstring + logger.info("streaming docs (single-probe scan)") + probe = await stream_ids(conn, "Document", 0, 1) n_docs = 0 - for i in range(ttl_batches): - doc_ids = await stream_ids(conn, "Document", i, ttl_batches) - if doc_ids["error"]: - # continue to the next batch. - # These docs will not be marked as processed, so the ecc will process it eventually. - continue - - for d in doc_ids["ids"]: - try: - async with tg_sem: - res = await conn.runInstalledQuery( - "StreamDocContent", - # 1-tuple form for VERTEX params; see - # stream_chunks for the deprecation context. - params={"doc": (d,)}, - ) - # Demoted from INFO — ``d`` is a user document ID. - logger.debug(f"stream_docs writes {d} to docs") - await docs_chan.put(res[0]["DocContent"][0]) - n_docs += 1 - except Exception as e: - exc = traceback.format_exc() - logger.error(f"Error retrieving doc: {d} --> {e}\n{exc}") - continue # try retrieving the next doc + if probe.get("error"): + logger.warning("stream_docs: StreamIds probe failed; nothing to stream") + else: + doc_ids_all = probe.get("ids") or [] + if not doc_ids_all: + logger.info("stream_docs: no unprocessed Documents (epoch_processed == 0)") + else: + logger.info( + f"stream_docs: {len(doc_ids_all)} unprocessed Document(s) to stream" + ) + for d in doc_ids_all: + try: + async with tg_sem: + res = await conn.runInstalledQuery( + "StreamDocContent", + params={"doc": (d,)}, + ) + logger.debug(f"stream_docs writes {d} to docs") + await docs_chan.put(res[0]["DocContent"][0]) + n_docs += 1 + except Exception as e: + exc = traceback.format_exc() + logger.error(f"Error retrieving doc: {d} --> {e}\n{exc}") + continue logger.info(f"stream_docs done: {n_docs} document(s) streamed") # close the docs chan -- this function is the only sender @@ -105,61 +114,74 @@ async def stream_chunks( ttl_batches: int = 10, ): """ - Streams the chunk contents into the extract_chan and embed_chan + Stream residual unprocessed DocumentChunks into extract_chan + embed_chan. + + Single-probe scan: call StreamIds with ttl_batches=1 (no partitioning) + to get the full set of unprocessed DocumentChunk ids in one round-trip. + StreamIds' POST-ACCUM atomically claims and marks the chunks in the same + query. The original 100-batch loop iterated the partition space even + when there was nothing to do — this collapses to one query, which is + the common case under the post-back-port ordering where stream_chunks + runs first and only sees residual orphans from prior crashed ECC runs. + + ``ttl_batches`` is preserved for caller-compatibility but ignored; the + partitioning was a hedge against scanning huge vertex sets in one + query and that's not the bottleneck on this TG build. """ - logger.info(f"streaming chunks ({ttl_batches} batches)") + _ = ttl_batches # intentionally unused — see docstring + logger.info("streaming chunks (single-probe scan)") + probe = await stream_ids(conn, "DocumentChunk", 0, 1) + if probe.get("error"): + logger.warning("stream_chunks: StreamIds probe failed; nothing to process") + logger.info("stream_chunks done: 0 chunk(s) streamed") + await extract_chan.put(None) + return + chunk_ids_all = probe.get("ids") or [] + if not chunk_ids_all: + logger.info("stream_chunks: no residual chunks (epoch_processed == 0)") + logger.info("stream_chunks done: 0 chunk(s) streamed") + await extract_chan.put(None) + return + logger.info( + f"stream_chunks: {len(chunk_ids_all)} residual chunk(s) to process" + ) n_chunks = 0 - for i in range(ttl_batches): - chunk_ids = await stream_ids(conn, "DocumentChunk", i, ttl_batches) - if chunk_ids["error"]: - continue - - for c in chunk_ids["ids"]: - try: - # Retry briefly when ChunkContent is empty — that - # happens when stream_ids surfaced a DocumentChunk - # vertex but its HAS_CONTENT edge upsert hasn't - # flushed yet (the loader runs in batches). Without - # the retry the chunk gets silently dropped and - # extracted only on the next ECC sweep. - chunk_rows = [] - for attempt in range(3): - async with tg_sem: - res = await conn.runInstalledQuery( - "StreamChunkContent", - # 1-tuple form is the supported shape for - # VERTEX params in current pyTigerGraph; - # the plain-value form raises a deprecation - # warning and falls back to a slower GET. - params={"chunk": (c,)}, - ) - chunk_rows = (res[0] if res else {}).get("ChunkContent") or [] - if chunk_rows: - break - # Back off and try again — the loader's batch - # interval is a few seconds. - await asyncio.sleep(2 * (attempt + 1)) - if not chunk_rows: - logger.warning( - f"No content row for chunk {c} after retries; skipping" + for c in chunk_ids_all: + try: + # stream_chunks runs before chunk_docs writes new chunks, so an + # empty ChunkContent here means a genuine orphan from a crashed + # prior run (DocumentChunk exists but no Content). Retry briefly + # in case of a transient read. + chunk_rows = [] + for attempt in range(3): + async with tg_sem: + res = await conn.runInstalledQuery( + "StreamChunkContent", + params={"chunk": (c,)}, ) - continue - content = chunk_rows[0]["attributes"]["text"].encode( - 'raw_unicode_escape' - ).decode('unicode_escape') - logger.debug("chunk writes to extract_chan") - await extract_chan.put((content, c)) - - # send chunks to be embedded - logger.debug("chunk writes to embed_chan") - await embed_chan.put((c, content, "DocumentChunk")) - n_chunks += 1 - if n_chunks % 100 == 0: - logger.info(f"streaming chunks: {n_chunks} streamed") - except Exception as e: - exc = traceback.format_exc() - logger.error(f"Error retrieving chunk: {c} --> {e}\n{exc}") - continue # try retrieving the next doc + chunk_rows = (res[0] if res else {}).get("ChunkContent") or [] + if chunk_rows: + break + await asyncio.sleep(2 * (attempt + 1)) + if not chunk_rows: + logger.warning( + f"No content row for chunk {c} after retries; skipping" + ) + continue + content = chunk_rows[0]["attributes"]["text"].encode( + 'raw_unicode_escape' + ).decode('unicode_escape') + logger.debug("chunk writes to extract_chan") + await extract_chan.put((content, c)) + logger.debug("chunk writes to embed_chan") + await embed_chan.put((c, content, "DocumentChunk")) + n_chunks += 1 + if n_chunks % 100 == 0: + logger.info(f"streaming chunks: {n_chunks} streamed") + except Exception as e: + exc = traceback.format_exc() + logger.error(f"Error retrieving chunk: {c} --> {e}\n{exc}") + continue logger.info(f"stream_chunks done: {n_chunks} chunk(s) streamed") logger.info("closing extract_chan") @@ -241,6 +263,7 @@ async def load(conn: AsyncTigerGraphConnection): graph_cfg = get_graphrag_config(conn.graphname) batch_size = graph_cfg.get("load_batch_size", 500) upsert_delay = graph_cfg.get("upsert_delay", 0) + batch_seq = 0 # while the load q is still open or has contents while not load_q.closed() or not load_q.empty(): if load_q.closed(): @@ -256,6 +279,8 @@ async def load(conn: AsyncTigerGraphConnection): } n_verts = 0 n_edges = 0 + vt_counts: Counter = Counter() + et_counts: Counter = Counter() size = ( load_q.qsize() if load_q.closed() or load_q.should_flush() @@ -270,30 +295,59 @@ async def load(conn: AsyncTigerGraphConnection): match t: case "vertices": vt, v_id, attr = elem - batch[t][vt][v_id] = attr + batch["vertices"][vt][v_id] = attr n_verts += 1 + vt_counts[vt] += 1 case "edges": src_v_type, src_v_id, edge_type, tgt_v_type, tgt_v_id, attrs = ( elem ) - batch[t][src_v_type][src_v_id][edge_type][tgt_v_type][ + batch["edges"][src_v_type][src_v_id][edge_type][tgt_v_type][ tgt_v_id ] = attrs n_edges += 1 + et_counts[edge_type] += 1 + case "group": + # Atomic multi-vertex + multi-edge bundle from + # ``upsert_group``. Producers enqueue all related + # ops as one item so cancellation can't split + # them; unpack each into the same batch dict so + # they reach TG in one upsertData call. + for vt, v_id, attr in elem.get("vertices", []): + batch["vertices"][vt][v_id] = attr + n_verts += 1 + vt_counts[vt] += 1 + for (src_v_type, src_v_id, edge_type, tgt_v_type, tgt_v_id, attrs) in elem.get("edges", []): + batch["edges"][src_v_type][src_v_id][edge_type][tgt_v_type][tgt_v_id] = attrs + n_edges += 1 + et_counts[edge_type] += 1 case _: logger.debug(f"Unexpected data {t} -> {elem} in load_q") - data = json.dumps(batch) - logger.info( - f"Upserting batch size of {size}. ({n_verts} verts | {n_edges} edges. {len(data.encode())/1000:,} kb)" - ) - + batch_seq += 1 if n_verts > 0 or n_edges > 0: + data = json.dumps(batch) + vt_brief = ", ".join(f"{k}={n}" for k, n in vt_counts.most_common()) or "-" + et_brief = ", ".join(f"{k}={n}" for k, n in et_counts.most_common()) or "-" + logger.info( + f"Upserting batch #{batch_seq}: size {size}. " + f"({n_verts} verts [{vt_brief}] | {n_edges} edges [{et_brief}]. " + f"{len(data.encode())/1000:,} kb)" + ) loading_event.clear() - await upsert_batch(conn, data) + await upsert_batch( + conn, data, + expected_vertices=n_verts, expected_edges=n_edges, + batch_seq=batch_seq, + ) loading_event.set() if upsert_delay > 0: await asyncio.sleep(upsert_delay) + else: + logger.info( + f"Skipping empty batch #{batch_seq}: size {size} " + f"(no vertices or edges to send)" + ) else: await asyncio.sleep(1) @@ -575,21 +629,37 @@ def _report(msg: str) -> None: num_chunk_senders = 2 async with asyncio.TaskGroup() as grp: - # get docs - grp.create_task(stream_docs(conn, docs_chan, 100, progress=progress)) - # process docs - grp.create_task( - chunk_docs(conn, docs_chan, embed_chan, upsert_chan, extract_chan) - ) - # process existing chunks - grp.create_task(stream_chunks(conn, extract_chan, embed_chan, 100)) + # PHASE 1 — residual chunk sweep + # stream_chunks is the gatekeeper for chunks that exist in TG + # but weren't fully processed (e.g. crashed prior ECC run, or + # DocumentChunk/Content vertices loaded by external means). + # It MUST complete before stream_docs/chunk_docs run, otherwise + # the concurrent upsertData visibility window (vertex visible + # in StreamIds query before its HAS_CONTENT edge commits) makes + # stream_chunks claim freshly-being-written chunks and emit + # spurious "No content row for chunk" warnings. + sc_task = grp.create_task(stream_chunks(conn, extract_chan, embed_chan, 100)) + + # PHASE 2 — new-doc pipeline (stream_docs + chunk_docs) + # Deferred behind stream_chunks completion. chunk_docs feeds + # extract_chan with chunk text directly from Python memory, so + # the extract worker doesn't need any TG round-trip for new + # chunks. With the residual sweep already done, there's no + # remaining producer racing chunk_docs' load_q writes. + async def new_doc_pipeline(): + await sc_task + async with asyncio.TaskGroup() as inner: + inner.create_task( + stream_docs(conn, docs_chan, 100, progress=progress) + ) + inner.create_task( + chunk_docs(conn, docs_chan, embed_chan, upsert_chan, extract_chan) + ) + grp.create_task(new_doc_pipeline()) - # upsert chunks grp.create_task(upsert(upsert_chan)) grp.create_task(load(conn)) - # embed grp.create_task(embed(embed_chan, embedding_store, graphname)) - # extract entities grp.create_task( extract(extract_chan, upsert_chan, embed_chan, extractor, conn, num_chunk_senders) ) diff --git a/ecc/app/graphrag/util.py b/ecc/app/graphrag/util.py index 897853f..c388f48 100644 --- a/ecc/app/graphrag/util.py +++ b/ecc/app/graphrag/util.py @@ -81,9 +81,12 @@ async def install_queries( requried_queries: list[str], conn: AsyncTigerGraphConnection, ): + from common.db.migrate import query_needs_update_async + installed_queries = [q.split("/")[-1] for q in await conn.getEndpoints(dynamic=True) if f"/{conn.graphname}/" in q] required_names = set() + drift_detected = False for q in requried_queries: q_name = q.split("/")[-1] required_names.add(q_name) @@ -92,8 +95,18 @@ async def install_queries( if res["error"]: raise Exception(res["message"]) logger.info(f"Successfully created query '{q_name}'.") + continue + # Already installed — check whether the shipped body has drifted + # from what's on TG. If so, re-create so the new body actually + # takes effect after a graphrag version upgrade. + if await query_needs_update_async(conn, f"{q}.gsql"): + res = await workers.install_query(conn, q, False) + if res["error"]: + raise Exception(res["message"]) + logger.info(f"Re-installed '{q_name}' (body drift detected).") + drift_detected = True - if required_names.issubset(set(installed_queries)): + if not drift_detected and required_names.issubset(set(installed_queries)): logger.info("All required queries already installed, skipping INSTALL QUERY ALL.") return @@ -239,7 +252,7 @@ def process_id(v_id: str): has_func = re.compile(r"(.*)\(").findall(v_id) if len(has_func) > 0: v_id = has_func[0] - v_id = v_id.replace(" ", "-").lower().replace("/", "_").replace("(", "").replace(")", "") + v_id = v_id.replace(" ", "_").lower().replace("/", "_").replace("(", "").replace(")", "") if v_id == "''" or v_id == '""': return "" @@ -259,7 +272,7 @@ def normalize_type_name(name: str) -> str: Applies in order: - 1. ``process_id`` (lowercase, whitespace → ``-``, strip parens). + 1. ``process_id`` (lowercase, whitespace → ``_``, strip parens). 2. Strip a single trailing semantic-suffix from :data:`_TYPE_SUFFIXES` (e.g. ``company_type`` → ``company``). 3. Singularize trailing ``ies`` → ``y`` (``companies`` → @@ -295,6 +308,32 @@ def normalize_type_name(name: str) -> str: return base +async def upsert_group( + conn: AsyncTigerGraphConnection, + vertices: list, + edges: list, +): + """Enqueue a bundle of related vertices + edges as a single load_q + item, so the flusher batches them together and cancellation between + individual upserts cannot leave half-applied state. + + ``vertices``: list of ``(vertex_type, vertex_id, attributes_dict)`` + ``edges``: list of ``(src_v_type, src_v_id, edge_type, tgt_v_type, + tgt_v_id, attributes_dict)`` + + A single ``load_q.put`` is one suspension point — either the whole + bundle lands in the queue or nothing does. + """ + packed_vertices = [ + (vt, str(v_id), map_attrs(attrs)) for vt, v_id, attrs in vertices + ] + packed_edges = [ + (s_t, str(s_id), e_t, t_t, str(t_id), map_attrs(attrs) if attrs else {}) + for (s_t, s_id, e_t, t_t, t_id, attrs) in edges + ] + await load_q.put(("group", {"vertices": packed_vertices, "edges": packed_edges})) + + async def upsert_vertex( conn: AsyncTigerGraphConnection, vertex_type: str, @@ -302,7 +341,6 @@ async def upsert_vertex( attributes: dict, ): logger.debug(f"Upsert vertex: {vertex_id} as {vertex_type}") - vertex_id = vertex_id.replace(" ", "_") attrs = map_attrs(attributes) await load_q.put(("vertices", (vertex_type, vertex_id, attrs))) @@ -460,14 +498,84 @@ def _coerce_value(value, tg_type: str): return None -async def upsert_batch(conn: AsyncTigerGraphConnection, data: str): +async def upsert_batch( + conn: AsyncTigerGraphConnection, + data: str, + expected_vertices: int | None = None, + expected_edges: int | None = None, + batch_seq: int | None = None, +): + """Send a batched JSON upsert to TG and log the response. + + ``expected_vertices`` / ``expected_edges`` are the pre-send batch + counts, logged alongside the TG response so an "expected != accepted + + skipped" gap is visible. + + ``batch_seq`` is the per-batch counter from the flusher; echoed on + every log line this function emits. + """ + seq_tag = f"#{batch_seq}" if batch_seq is not None else "" async with tg_sem: try: res = await conn.upsertData(data) - logger.info(f"Upsert res: {res}") + acc_v = (res or {}).get("accepted_vertices", 0) if isinstance(res, dict) else 0 + sk_v = (res or {}).get("skipped_vertices", 0) if isinstance(res, dict) else 0 + acc_e = (res or {}).get("accepted_edges", 0) if isinstance(res, dict) else 0 + sk_e = (res or {}).get("skipped_edges", 0) if isinstance(res, dict) else 0 + ev = expected_vertices if expected_vertices is not None else "?" + ee = expected_edges if expected_edges is not None else "?" + untracked_v = (expected_vertices - acc_v - sk_v) if expected_vertices is not None else None + untracked_e = (expected_edges - acc_e - sk_e) if expected_edges is not None else None + vfit = ("OK" if untracked_v == 0 else f"GAP={untracked_v}") if untracked_v is not None else "" + efit = ("OK" if untracked_e == 0 else f"GAP={untracked_e}") if untracked_e is not None else "" + logger.info( + f"Upsert res {seq_tag}: vertices sent={ev} accepted={acc_v} skipped={sk_v} {vfit} | " + f"edges sent={ee} accepted={acc_e} skipped={sk_e} {efit}" + ) + # Diagnostic: TG can silently skip vertices/edges (schema + # mismatch, primary-id conflict, etc.) and only surface a + # count. When that happens, log the type/id breakdown of what + # was in the batch so the missing ones can be identified, and + # verify which vertex IDs actually landed in TG. + if sk_v or sk_e: + try: + payload = json.loads(data) + except Exception: + payload = {} + v_section = payload.get("vertices", {}) or {} + logger.warning( + f"[SKIP-DIAG {seq_tag}] batch had skipped_vertices={sk_v} " + f"skipped_edges={sk_e}; sent vertex types: " + f"{ {vt: len(vids) for vt, vids in v_section.items()} }" + ) + from urllib.parse import quote + for vt, vids in v_section.items(): + sent = list(vids.keys())[:200] + missing = [] + for vid in sent: + try: + url = ( + conn.restppUrl + "/graph/" + conn.graphname + + "/vertices/" + vt + "/" + quote(vid, safe="") + ) + r = await conn._req("GET", url) + body = r if isinstance(r, dict) else {} + if not (body.get("results") or []): + missing.append(vid) + except Exception: + missing.append(vid) + if missing: + logger.warning( + f"[SKIP-DIAG {seq_tag}] type={vt}: {len(missing)}/{len(sent)} " + f"missing after upsert" + ) + logger.debug( + f"[SKIP-DIAG {seq_tag}] type={vt} first 10 missing ids: {missing[:10]}" + ) except Exception as e: err = traceback.format_exc() - logger.error(f"Upsert err with {data}:\n{err}") + logger.error(f"Upsert err {seq_tag}:\n{err}") + logger.debug(f"Upsert err {seq_tag} payload: {data}") return {"error": True, "message": str(e)} @@ -502,8 +610,6 @@ async def upsert_edge( else: attrs = map_attrs(attributes) logger.debug(f"Upsert edge: {src_v_id} -[{edge_type}]-> {tgt_v_id}") - src_v_id = src_v_id.replace(" ", "_") - tgt_v_id = tgt_v_id.replace(" ", "_") await load_q.put( ( "edges", diff --git a/ecc/app/graphrag/workers.py b/ecc/app/graphrag/workers.py index 0f692b8..d3959b8 100644 --- a/ecc/app/graphrag/workers.py +++ b/ecc/app/graphrag/workers.py @@ -92,11 +92,7 @@ async def chunk_doc( else: chunker_type = "" - v_id = util.process_id(doc["v_id"]) - if v_id != doc["v_id"]: - # v_id is a sanitized form of a user document ID — DEBUG. - logger.debug(f"""Cloning doc/content {doc["v_id"]} -> {v_id}""") - await upsert_chan.put((upsert_doc, (conn, v_id, chunker_type, doc["attributes"]["text"]))) + v_id = doc["v_id"].lower() # Use get_chunker for all types (including images) # For images, get_chunker returns SingleChunker which preserves markdown image references @@ -107,7 +103,7 @@ async def chunk_doc( # v_id / chunk_id derive from user document content. logger.debug(f"Chunking {v_id} into {len(chunks)} chunk(s)") for i, chunk in enumerate(chunks): - chunk_id = f"{v_id}_chunk_{i}" + chunk_id = util.process_id(f"{v_id}_chunk_{i}") logger.debug(f"Processing chunk {chunk_id}") # send chunks to be upserted (func, args) @@ -146,33 +142,30 @@ async def upsert_doc(conn: AsyncTigerGraphConnection, doc_id, ctype, content_tex async def upsert_chunk(conn: AsyncTigerGraphConnection, doc_id, chunk_id, chunk): logger.debug(f"Upserting chunk {chunk_id}") date_added = int(time.time()) - await util.upsert_vertex( - conn, - "DocumentChunk", - chunk_id, - attributes={"epoch_added": date_added, "epoch_processed": date_added, "idx": int(chunk_id.split("_")[-1])}, - ) - await util.upsert_vertex( - conn, - "Content", - chunk_id, - attributes={"text": chunk, "epoch_added": date_added}, - ) - await util.upsert_edge( - conn, "DocumentChunk", chunk_id, "HAS_CONTENT", "Content", chunk_id - ) - await util.upsert_edge( - conn, "Document", doc_id, "HAS_CHILD", "DocumentChunk", chunk_id - ) - if int(chunk_id.split("_")[-1]) > 0: - await util.upsert_edge( - conn, - "DocumentChunk", - chunk_id, - "IS_AFTER", - "DocumentChunk", - doc_id + "_chunk_" + str(int(chunk_id.split("_")[-1]) - 1), - ) + # Build the chunk's full vertex + edge bundle and enqueue atomically. + # Three separate ``await util.upsert_vertex/edge`` calls would let + # asyncio cancellation split DocumentChunk from its sibling Content, + # producing the "chunk exists but Content missing" pattern that + # surfaces as repeated "No content row for chunk" warnings. + vertices = [ + ("DocumentChunk", chunk_id, { + "epoch_added": date_added, + "epoch_processed": date_added, + "idx": int(chunk_id.split("_")[-1]), + }), + ("Content", chunk_id, {"text": chunk, "epoch_added": date_added}), + ] + edges = [ + ("DocumentChunk", chunk_id, "HAS_CONTENT", "Content", chunk_id, None), + ("Document", doc_id, "HAS_CHILD", "DocumentChunk", chunk_id, None), + ] + idx = int(chunk_id.split("_")[-1]) + if idx > 0: + edges.append(( + "DocumentChunk", chunk_id, "IS_AFTER", + "DocumentChunk", util.process_id(f"{doc_id}_chunk_{idx - 1}"), None, + )) + await util.upsert_group(conn, vertices, edges) embed_sem = asyncio.Semaphore(util._worker_concurrency) @@ -757,7 +750,8 @@ async def process_community( await util.loading_event.wait() async with comm_sem: - logger.info(f"Processing Community: {comm_id}") + logger.info(f"Processing community at layer {i}") + logger.debug(f"Processing Community: {comm_id}") # get the children of the community children = await util.get_commuinty_children(conn, i, comm_id) comm_id = util.process_id(comm_id) diff --git a/ecc/app/supportai/util.py b/ecc/app/supportai/util.py index 1b1328a..0d62c66 100644 --- a/ecc/app/supportai/util.py +++ b/ecc/app/supportai/util.py @@ -185,7 +185,7 @@ def process_id(v_id: str): has_func = re.compile(r"(.*)\(").findall(v_id) if len(has_func) > 0: v_id = has_func[0] - v_id = v_id.replace(" ", "-").lower().replace("/", "_").replace("(", "").replace(")", "") + v_id = v_id.replace(" ", "_").lower().replace("/", "_").replace("(", "").replace(")", "") if v_id == "''" or v_id == '""': return "" @@ -199,7 +199,6 @@ async def upsert_vertex( attributes: dict, ): logger.info(f"Upsert vertex: {vertex_type} {vertex_id}") - vertex_id = vertex_id.replace(" ", "_") attrs = map_attrs(attributes) data = json.dumps({"vertices": {vertex_type: {vertex_id: attrs}}}) headers = make_headers(conn) @@ -244,8 +243,6 @@ async def upsert_edge( attrs = {} else: attrs = map_attrs(attributes) - src_v_id = src_v_id.replace(" ", "_") - tgt_v_id = tgt_v_id.replace(" ", "_") data = json.dumps( { "edges": { diff --git a/ecc/app/supportai/workers.py b/ecc/app/supportai/workers.py index ce85274..b80cb21 100644 --- a/ecc/app/supportai/workers.py +++ b/ecc/app/supportai/workers.py @@ -82,7 +82,7 @@ async def chunk_doc( else: chunker_type = "" - v_id = util.process_id(doc["v_id"]) + v_id = doc["v_id"].lower() # Use markdown chunker for all documents # Image descriptions wrapped in headers will naturally become single chunks @@ -93,7 +93,7 @@ async def chunk_doc( # to DEBUG so the steady-state log doesn't carry data identifiers. logger.debug(f"Chunking {v_id} into {len(chunks)} chunk(s)") for i, chunk in enumerate(chunks): - chunk_id = f"{v_id}_chunk_{i}" + chunk_id = util.process_id(f"{v_id}_chunk_{i}") # send chunks to be upserted (func, args) logger.debug("chunk writes to upsert_chan") await upsert_chan.put((upsert_chunk, (conn, v_id, chunk_id, chunk))) @@ -130,14 +130,15 @@ async def upsert_chunk(conn: TigerGraphConnection, doc_id, chunk_id, chunk): await util.upsert_edge( conn, "Document", doc_id, "HAS_CHILD", "DocumentChunk", chunk_id ) - if int(chunk_id.split("_")[-1]) > 0: + idx = int(chunk_id.split("_")[-1]) + if idx > 0: await util.upsert_edge( conn, "DocumentChunk", chunk_id, "IS_AFTER", "DocumentChunk", - doc_id + "_chunk_" + str(int(chunk_id.split("_")[-1]) - 1), + util.process_id(f"{doc_id}_chunk_{idx - 1}"), ) diff --git a/graphrag/app/routers/supportai.py b/graphrag/app/routers/supportai.py index 97266d9..bb27805 100644 --- a/graphrag/app/routers/supportai.py +++ b/graphrag/app/routers/supportai.py @@ -91,8 +91,13 @@ def create_ingest( credentials: Annotated[HTTPBase, Depends(security)], ): conn = conn.state.conn - - return supportai.create_ingest(graphname, cfg, conn) + try: + return supportai.create_ingest(graphname, cfg, conn) + except HTTPException: + raise + except Exception as e: + logger.error(f"create_ingest failed for graph '{graphname}': {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Ingest preparation failed: {str(e)}") @router.post("/{graphname}/graphrag/ingest") @@ -104,8 +109,13 @@ def ingest( credentials: Annotated[HTTPBase, Depends(security)], ): conn = conn.state.conn - - return supportai.ingest(graphname, loader_info, conn) + try: + return supportai.ingest(graphname, loader_info, conn) + except HTTPException: + raise + except Exception as e: + logger.error(f"ingest failed for graph '{graphname}': {e}", exc_info=True) + raise HTTPException(status_code=500, detail=f"Ingestion failed: {str(e)}") @router.post("/{graphname}/graphrag/search") diff --git a/graphrag/app/supportai/supportai.py b/graphrag/app/supportai/supportai.py index df09942..799dc20 100644 --- a/graphrag/app/supportai/supportai.py +++ b/graphrag/app/supportai/supportai.py @@ -528,6 +528,7 @@ def create_ingest( res["data_path"] = "in_temp_storage" res["data_source_id"] = res_ingest_config except Exception as e: + logger.error(f"Server folder processing failed for graph '{graphname}', path '{data_path}': {e}", exc_info=True) raise Exception(f"Error during server folder processing: {e}") elif ingest_config.data_source.lower() == "local": res["data_source_id"] = "DocumentContent" diff --git a/graphrag/app/supportai/supportai_ingest.py b/graphrag/app/supportai/supportai_ingest.py index 4d29729..c159777 100644 --- a/graphrag/app/supportai/supportai_ingest.py +++ b/graphrag/app/supportai/supportai_ingest.py @@ -5,6 +5,8 @@ from common.py_schemas import BatchDocumentIngest, Document, DocumentChunk, KnowledgeGraph from typing import List, Union import json +import logging +import re from datetime import datetime from common.status import Status, IngestionProgress from common.extractors import LLMEntityRelationshipExtractor @@ -12,6 +14,18 @@ from langchain.prompts import ChatPromptTemplate from langchain.output_parsers import PydanticOutputParser +logger = logging.getLogger(__name__) + + +def _process_id(v_id: str) -> str: + has_func = re.compile(r"(.*)\(").findall(v_id) + if has_func: + v_id = has_func[0] + v_id = v_id.replace(" ", "_").lower().replace("/", "_").replace("(", "").replace(")", "") + if v_id in ("''", '""'): + return "" + return v_id + class BaseIngestion: def __init__( @@ -70,7 +84,7 @@ def chunk_document(self, document, chunker, chunker_params): chunks = chunker(document.text) chunks = [ DocumentChunk( - document_chunk_id=f"{document.document_id}_chunk_{i}", text=chunk + document_chunk_id=_process_id(f"{document.document_id}_chunk_{i}"), text=chunk ) for i, chunk in enumerate(chunks) ] @@ -92,7 +106,7 @@ def upsert_chunk(self, chunk: DocumentChunk): now = datetime.now() date_added = now.strftime("%Y-%m-%d %H:%M:%S") chunk_id = chunk.document_chunk_id - doc_id = chunk.document_chunk_id.split("_")[0] + doc_id = chunk_id.rsplit("_chunk_", 1)[0] self.status.progress.chunk_failures[chunk_id] = [] try: self.conn.upsertVertex( @@ -115,15 +129,17 @@ def upsert_chunk(self, chunk: DocumentChunk): self.conn.upsertEdge( "Document", doc_id, "HAS_CHILD", "DocumentChunk", chunk_id ) - if int(chunk_id.split("_")[-1]) > 0: + idx = int(chunk_id.split("_")[-1]) + if idx > 0: self.conn.upsertEdge( "DocumentChunk", chunk_id, "IS_AFTER", "DocumentChunk", - doc_id + "_chunk_" + str(int(chunk_id.split("_")[-1]) - 1), + _process_id(f"{doc_id}_chunk_{idx - 1}"), ) except Exception as e: + logger.error(f"Failed to upsert chunk '{chunk_id}' (doc '{doc_id}'): {e}", exc_info=True) self.status.progress.chunk_failures[chunk_id].append(e) if chunk.entities != []: @@ -132,7 +148,7 @@ def upsert_chunk(self, chunk: DocumentChunk): "Entity", [ ( - x["id"], + _process_id(x["id"]), { "definition": x["definition"], "date_added": date_added, @@ -145,9 +161,10 @@ def upsert_chunk(self, chunk: DocumentChunk): "DocumentChunk", "CONTAINS_ENTITY", "Entity", - [(chunk_id, x["id"], {}) for x in chunk.entities], + [(chunk_id, _process_id(x["id"]), {}) for x in chunk.entities], ) except Exception as e: + logger.error(f"Failed to upsert entities for chunk '{chunk_id}': {e}", exc_info=True) self.status.progress.chunk_failures[chunk_id].append(e) if chunk.relationships != []: @@ -156,7 +173,7 @@ def upsert_chunk(self, chunk: DocumentChunk): "RelationshipType", [ ( - x["source"] + ":" + x["type"] + ":" + x["target"], + _process_id(x["source"] + ":" + x["type"] + ":" + x["target"]), { "definition": x["definition"], "short_name": x["type"], @@ -179,19 +196,20 @@ def upsert_chunk(self, chunk: DocumentChunk): [ ( chunk_id, - x["source"] + ":" + x["type"] + ":" + x["target"], + _process_id(x["source"] + ":" + x["type"] + ":" + x["target"]), {}, ) for x in chunk.relationships ], ) except Exception as e: + logger.error(f"Failed to upsert relationships for chunk '{chunk_id}': {e}", exc_info=True) self.status.progress.chunk_failures[chunk_id].append(e) def upsert_document(self, document: Document): now = datetime.now() date_added = now.strftime("%Y-%m-%d %H:%M:%S") - doc_id = document.document_id + doc_id = _process_id(document.document_id) doc_collection = document.document_collection self.status.progress.doc_failures[doc_id] = [] try: @@ -207,6 +225,7 @@ def upsert_document(self, document: Document): ) self.conn.upsertEdge("Document", doc_id, "HAS_CONTENT", "Content", doc_id) except Exception as e: + logger.error(f"Failed to upsert document '{doc_id}': {e}", exc_info=True) self.status.progress.doc_failures[doc_id].append(e) if document.entities != []: @@ -215,7 +234,7 @@ def upsert_document(self, document: Document): "Entity", [ ( - x["id"], + _process_id(x["id"]), { "definition": x["definition"], "date_added": date_added, @@ -228,9 +247,10 @@ def upsert_document(self, document: Document): "Document", "CONTAINS_ENTITY", "Entity", - [(doc_id, x["id"], {}) for x in document.entities], + [(doc_id, _process_id(x["id"]), {}) for x in document.entities], ) except Exception as e: + logger.error(f"Failed to upsert entities for document '{doc_id}': {e}", exc_info=True) self.status.progress.doc_failures[doc_id].append(e) if document.relationships != []: @@ -239,7 +259,7 @@ def upsert_document(self, document: Document): "RelationshipType", [ ( - x["source"] + ":" + x["type"] + ":" + x["target"], + _process_id(x["source"] + ":" + x["type"] + ":" + x["target"]), { "definition": x["definition"], "short_name": x["type"], @@ -259,11 +279,12 @@ def upsert_document(self, document: Document): "MENTIONS_RELATIONSHIP", "RelationshipType", [ - (doc_id, x["source"] + ":" + x["type"] + ":" + x["target"], {}) + (doc_id, _process_id(x["source"] + ":" + x["type"] + ":" + x["target"]), {}) for x in document.relationships ], ) except Exception as e: + logger.error(f"Failed to upsert relationships for document '{doc_id}': {e}", exc_info=True) self.status.progress.doc_failures[doc_id].append(e) From 7f077f75fb990786dd4dd3afd8e3328c835bf952 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 23 Jun 2026 13:32:00 -0700 Subject: [PATCH 3/7] GML-2134 Report per-file upload failures in the ingestion dialog --- graphrag-ui/src/pages/setup/IngestGraph.tsx | 77 ++++++++++++--------- 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/graphrag-ui/src/pages/setup/IngestGraph.tsx b/graphrag-ui/src/pages/setup/IngestGraph.tsx index feb4bfe..c276eb2 100644 --- a/graphrag-ui/src/pages/setup/IngestGraph.tsx +++ b/graphrag-ui/src/pages/setup/IngestGraph.tsx @@ -21,6 +21,7 @@ import { import { useConfirm } from "@/hooks/useConfirm"; import { pingIdleTimer } from "@/hooks/useIdleTimeout"; import { resolveUploadConflicts } from "@/utils/uploadConflicts"; +import { safeJson } from "@/utils/safeJson"; interface IngestGraphProps { isModal?: boolean; @@ -100,7 +101,7 @@ const IngestGraph: React.FC = ({ isModal = false }) => { const response = await fetch(`/ui/${ingestGraphName}/uploads/list`, { headers: { Authorization: creds! }, }); - const data = await response.json(); + const data = await safeJson(response); setUploadedFiles(data.files || []); } catch (error) { console.error("Error fetching files:", error); @@ -179,11 +180,11 @@ const IngestGraph: React.FC = ({ isModal = false }) => { ); if (!response.ok) { - const errorData = await response.json(); + const errorData = await safeJson(response); throw new Error(errorData.detail || `Upload failed: ${response.statusText}`); } - const data = await response.json(); + const data = await safeJson(response); if (data.status === "success") { const uploadedCount = selectedFiles?.length || 0; setUploadMessage("✅ Successfully uploaded the files. Processing..."); @@ -233,7 +234,7 @@ const IngestGraph: React.FC = ({ isModal = false }) => { return; } let uploadedCount = 0; - let failedCount = 0; + const failedFiles: string[] = []; const totalFiles = filesArray.length; // Upload files one at a time to avoid 413 errors @@ -261,27 +262,35 @@ const IngestGraph: React.FC = ({ isModal = false }) => { ); if (!response.ok) { - throw new Error(`Upload failed with status ${response.status}`); + const errData = await safeJson(response); + const reason = errData.detail || `HTTP ${response.status}`; + console.error(`File "${file.name}" failed: ${reason}`); + failedFiles.push(`${file.name} (${reason})`); + continue; } - const data = await response.json(); + const data = await safeJson(response); if (data.status === "success") { uploadedCount++; } else { - failedCount++; - console.error(`File ${file.name} failed:`, data); + const reason = data.message || "unexpected response"; + console.error(`File "${file.name}" failed:`, data); + failedFiles.push(`${file.name} (${reason})`); } - } catch (err) { - console.error(`File ${file.name} error:`, err); - failedCount++; + } catch (err: any) { + console.error(`File "${file.name}" error:`, err); + failedFiles.push(`${file.name} (${err.message || "unknown error"})`); } } // Show final result - if (failedCount === 0) { + if (failedFiles.length === 0) { setUploadMessage(`✅ Successfully uploaded all ${uploadedCount} files. Processing...`); } else { - setUploadMessage(`⚠️ Uploaded ${uploadedCount} files successfully, ${failedCount} failed. Processing...`); + const failedList = failedFiles.join(", "); + setUploadMessage( + `⚠️ Uploaded ${uploadedCount}/${totalFiles} files. Failed: ${failedList}` + ); } setSelectedFiles(null); @@ -318,7 +327,7 @@ const IngestGraph: React.FC = ({ isModal = false }) => { headers: { Authorization: creds! }, } ); - const data = await response.json(); + const data = await safeJson(response); setUploadMessage(`✅ ${data.message}`); await fetchUploadedFiles(); } catch (error: any) { @@ -341,7 +350,7 @@ const IngestGraph: React.FC = ({ isModal = false }) => { method: "DELETE", headers: { Authorization: creds! }, }); - const data = await response.json(); + const data = await safeJson(response); setUploadMessage(`✅ ${data.message}`); await fetchUploadedFiles(); } catch (error: any) { @@ -358,7 +367,7 @@ const IngestGraph: React.FC = ({ isModal = false }) => { const response = await fetch(`/ui/${ingestGraphName}/cloud/list`, { headers: { Authorization: creds! }, }); - const data = await response.json(); + const data = await safeJson(response); setDownloadedFiles(data.files || []); } catch (error) { console.error("Error fetching downloaded files:", error); @@ -433,10 +442,10 @@ const IngestGraph: React.FC = ({ isModal = false }) => { }); if (!response.ok) { - const errorData = await response.json(); + const errorData = await safeJson(response); throw new Error(errorData.detail || `Download failed: ${response.statusText}`); } - const data = await response.json(); + const data = await safeJson(response); if (data.status === "success") { const downloadCount = data.downloaded_files?.length || downloadedFiles.length; setDownloadMessage("✅ Successfully downloaded the files. Processing..."); @@ -478,7 +487,7 @@ const IngestGraph: React.FC = ({ isModal = false }) => { headers: { Authorization: creds! }, } ); - const data = await response.json(); + const data = await safeJson(response); setDownloadMessage(`✅ ${data.message}`); await fetchDownloadedFiles(); } catch (error: any) { @@ -501,7 +510,7 @@ const IngestGraph: React.FC = ({ isModal = false }) => { method: "DELETE", headers: { Authorization: creds! }, }); - const data = await response.json(); + const data = await safeJson(response); setDownloadMessage(`✅ ${data.message}`); await fetchDownloadedFiles(); } catch (error: any) { @@ -546,12 +555,12 @@ const IngestGraph: React.FC = ({ isModal = false }) => { } ); if (!createResp.ok) { - const err = await createResp.json(); + const err = await safeJson(createResp); throw new Error( err.detail || `Failed to create ingest job: ${createResp.statusText}` ); } - const createData = await createResp.json(); + const createData = await safeJson(createResp); jobData = { load_job_id: createData.load_job_id, data_source_id: createData.data_source_id, @@ -575,11 +584,11 @@ const IngestGraph: React.FC = ({ isModal = false }) => { }); if (!ingestResponse.ok) { - const errorData = await ingestResponse.json(); + const errorData = await safeJson(ingestResponse); throw new Error(errorData.detail || `Failed to ingest: ${ingestResponse.statusText}`); } - const ingestData = await ingestResponse.json(); + const ingestData = await safeJson(ingestResponse); console.log("Ingest response:", ingestData); setIngestMessage(`✅ Ingestion completed successfully!`); @@ -627,11 +636,11 @@ const IngestGraph: React.FC = ({ isModal = false }) => { }); if (!createResponse.ok) { - const errorData = await createResponse.json(); + const errorData = await safeJson(createResponse); throw new Error(errorData.detail || `Failed to create ingest job: ${createResponse.statusText}`); } - const createData = await createResponse.json(); + const createData = await safeJson(createResponse); console.log("Create ingest response:", createData); // Store ingest job data for later use @@ -662,11 +671,11 @@ const IngestGraph: React.FC = ({ isModal = false }) => { }); if (!ingestResponse.ok) { - const errorData = await ingestResponse.json(); + const errorData = await safeJson(ingestResponse); throw new Error(errorData.detail || `Failed to run ingest: ${ingestResponse.statusText}`); } - const ingestData = await ingestResponse.json(); + const ingestData = await safeJson(ingestResponse); console.log("Ingest response:", ingestData); setIngestMessage(`✅ Data ingested successfully! Processed documents from ${folderPath}/`); @@ -719,12 +728,12 @@ const IngestGraph: React.FC = ({ isModal = false }) => { console.log("create_ingest response status:", createResponse.status); if (!createResponse.ok) { - const errorData = await createResponse.json(); + const errorData = await safeJson(createResponse); console.error("create_ingest error:", errorData); throw new Error(errorData.detail || `Failed to create ingest job: ${createResponse.statusText}`); } - const createData = await createResponse.json(); + const createData = await safeJson(createResponse); console.log("create_ingest response data:", createData); setIngestJobData({ @@ -870,14 +879,14 @@ const IngestGraph: React.FC = ({ isModal = false }) => { ); if (!createResponse.ok) { - const errorData = await createResponse.json(); + const errorData = await safeJson(createResponse); throw new Error( errorData.detail || `Failed to create ingest job: ${createResponse.statusText}` ); } - const createData = await createResponse.json(); + const createData = await safeJson(createResponse); // Step 2: Run ingest loadingInfo = { @@ -904,13 +913,13 @@ const IngestGraph: React.FC = ({ isModal = false }) => { }); if (!ingestResponse.ok) { - const errorData = await ingestResponse.json(); + const errorData = await safeJson(ingestResponse); throw new Error( errorData.detail || `Failed to run ingest: ${ingestResponse.statusText}` ); } - const ingestData = await ingestResponse.json(); + const ingestData = await safeJson(ingestResponse); const filesIngested = ingestData.summary.map((file: any) => file.file_path); setIngestMessage( From 50ddb5576a5f158b8b6c942ea2044ca6a5579f6e Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 23 Jun 2026 13:32:14 -0700 Subject: [PATCH 4/7] GML-2135 Release 1.4.2 - Bump version to 1.4.2 - Update CHANGELOG and README releases - Add .gitignore - GML-2131 Honor configured GS/RESTPP ports at login --- .gitignore | 20 ++++++++++++++++++++ CHANGELOG.md | 11 +++++++++++ README.md | 2 ++ VERSION | 2 +- graphrag/app/routers/ui.py | 4 ++++ 5 files changed, 38 insertions(+), 1 deletion(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..fbb5958 --- /dev/null +++ b/.gitignore @@ -0,0 +1,20 @@ +# Python +__pycache__/ +*.pyc +.coverage + +# Playwright +/test-results/ +/playwright-report/ +/blob-report/ +/e2e/ +/e2e/.auth/state.json +/playwright.config.ts + +# Node +/node_modules/ +/package.json +/package-lock.json + +# Release docs +/RELEASE_NOTES.md diff --git a/CHANGELOG.md b/CHANGELOG.md index a50cf0b..1d48d1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Changelog +## [1.4.2] + +### Added +- **Graph compatibility check and repair.** The *Knowledge Graph Admin* page can now scan an existing graph for installed GSQL queries whose body has drifted from the shipped version — or that are missing entirely — and repair them in place, without rebuilding the knowledge graph. This makes it safe to pick up query fixes from a GraphRAG upgrade on graphs that already hold data. Repair runs under the per-graph lock and is refused while a rebuild is in progress. New endpoints: `GET /ui/{graphname}/migration/status` and `POST /ui/{graphname}/migration/apply`. + +### Changed +- **Documents whose filenames contain spaces or mixed case now ingest reliably.** Every ingest path normalizes vertex IDs the same way, so a document is stored under one consistent id instead of diverging between paths — it becomes retrievable and participates in entity extraction instead of being silently skipped or duplicated. +- **Interrupted knowledge-graph rebuilds recover cleanly on the next run.** Chunks left unfinished by a crashed or cancelled run are reconciled before new documents are processed, and each chunk is written together with its content in one step — so rebuilds no longer leave chunks without content or emit spurious "missing content" warnings. +- **Shipped query fixes apply automatically on existing graphs.** At initialization, any query whose installed body differs from the shipped version is re-created, so improvements in the bundled queries take effect after an upgrade without a manual reinstall. +- **The document ingestion dialog reports per-file upload failures.** When some uploads fail, the dialog names the affected files and the reason instead of failing the batch opaquely. + ## [1.4.1] ### Added diff --git a/README.md b/README.md index 4e85aff..f4d9409 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,8 @@ --- ## Releases +* **6/23/2026**: GraphRAG v1.4.2 released. Added a knowledge graph compatibility check and repair tool to pick up shipped query fixes on existing graphs, along with more reliable ingestion for documents with spaces in their filenames and other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.4.2) for details. +* **5/30/2026**: GraphRAG v1.4.1 released. Added token-based login and a pre-flight upload conflict check, along with more resilient chat when vector search is unavailable and other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.4.1) for details. * **5/16/2026**: GraphRAG v1.4.0 released. Added schema-aware knowledge graphs, auto retrieval method selection, and a Trace Logs UI, along with many other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.4.0) for details. * **4/10/2026**: GraphRAG v1.3.0 released. Added an admin configuration UI with role-based access and per-graph chatbot LLM override, along with many other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.3.0) for details. * **2/28/2026**: GraphRAG v1.2.0 released. Added Admin UI for graph initialization, document ingestion, and knowledge graph rebuild, along with many other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.2.0) for details. diff --git a/VERSION b/VERSION index 347f583..9df886c 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.4.1 +1.4.2 diff --git a/graphrag/app/routers/ui.py b/graphrag/app/routers/ui.py index cf42e1c..0820782 100644 --- a/graphrag/app/routers/ui.py +++ b/graphrag/app/routers/ui.py @@ -497,11 +497,15 @@ def auth(usr: str, password: str, conn=None) -> tuple[list[str], TigerGraphConne conn = TigerGraphConnection( host=db_config["hostname"], graphname="", apiToken=password, + gsPort=db_config.get("gsPort"), + restppPort=db_config.get("restppPort"), ) else: conn = TigerGraphConnection( host=db_config["hostname"], graphname="", username=usr, password=password, + gsPort=db_config.get("gsPort"), + restppPort=db_config.get("restppPort"), ) try: From f9b27b6a72c22fcbdac6420e7c9c6ab6e6e2ba50 Mon Sep 17 00:00:00 2001 From: omarAyoubi <94470413+OmarAyo1@users.noreply.github.com> Date: Wed, 24 Jun 2026 00:42:18 +0400 Subject: [PATCH 5/7] GML-2131: Respect configured TigerGraph ports during UI login (#44) --- graphrag/app/routers/ui.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/graphrag/app/routers/ui.py b/graphrag/app/routers/ui.py index c0d5426..22d4b3d 100644 --- a/graphrag/app/routers/ui.py +++ b/graphrag/app/routers/ui.py @@ -493,15 +493,25 @@ def auth(usr: str, password: str, conn=None) -> tuple[list[str], TigerGraphConne # * ``__GSQL__secret:`` → TigerGraph's native secret # convention; pyTigerGraph already understands it when sent # as plain username/password, so no special handling here. + connection_kwargs = { + "host": db_config["hostname"], + "graphname": "", + } + if db_config.get("gsPort") is not None: + connection_kwargs["gsPort"] = db_config["gsPort"] + if db_config.get("restppPort") is not None: + connection_kwargs["restppPort"] = db_config["restppPort"] + if usr == _UI_TOKEN_SENTINEL: conn = TigerGraphConnection( - host=db_config["hostname"], graphname="", + **connection_kwargs, apiToken=password, ) else: conn = TigerGraphConnection( - host=db_config["hostname"], graphname="", - username=usr, password=password, + **connection_kwargs, + username=usr, + password=password, ) try: From efff2609fbb05bf537537d15e40da907012aeff3 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 23 Jun 2026 13:48:19 -0700 Subject: [PATCH 6/7] GML-2131 Guard configured ports in role-resolution connection - Omit gsPort/restppPort unless configured, matching the auth() path, so absent config falls back to pyTigerGraph defaults --- graphrag/app/routers/ui.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/graphrag/app/routers/ui.py b/graphrag/app/routers/ui.py index 1acd2d0..de38351 100644 --- a/graphrag/app/routers/ui.py +++ b/graphrag/app/routers/ui.py @@ -259,22 +259,25 @@ def _get_user_role_details( # (``__GSQL__secret``) and classic user/password both go through # the username/password slots (pyTigerGraph routes the secret # case natively). + connection_kwargs = { + "host": db_config.get("hostname"), + "graphname": "", + } + if db_config.get("gsPort") is not None: + connection_kwargs["gsPort"] = db_config["gsPort"] + if db_config.get("restppPort") is not None: + connection_kwargs["restppPort"] = db_config["restppPort"] + if username == _UI_TOKEN_SENTINEL: conn = TigerGraphConnection( - host=db_config.get("hostname"), - gsPort=db_config.get("gsPort"), - restppPort=db_config.get("restppPort"), - graphname="", + **connection_kwargs, apiToken=password, ) else: conn = TigerGraphConnection( - host=db_config.get("hostname"), + **connection_kwargs, username=username, password=password, - gsPort=db_config.get("gsPort"), - restppPort=db_config.get("restppPort"), - graphname="", ) # Transient GSQL hiccups when the role-cache TTL expires were From a1d6ff856c6f915b82764f86f2dc95aa2c1491b1 Mon Sep 17 00:00:00 2001 From: Chengbiao Jin Date: Tue, 23 Jun 2026 15:15:43 -0700 Subject: [PATCH 7/7] GML-2133 Refine ingestion changes per review - Restore partitioned batch streaming for documents and chunks - Check vertex existence through the TigerGraph client instead of hand-built requests - Use lowercase document ids consistently across the batch ingest path --- ecc/app/graphrag/graph_rag.py | 164 ++++++++------------- ecc/app/graphrag/util.py | 21 +-- graphrag/app/supportai/supportai_ingest.py | 12 +- 3 files changed, 79 insertions(+), 118 deletions(-) diff --git a/ecc/app/graphrag/graph_rag.py b/ecc/app/graphrag/graph_rag.py index 9115fa7..9950c0d 100644 --- a/ecc/app/graphrag/graph_rag.py +++ b/ecc/app/graphrag/graph_rag.py @@ -54,48 +54,34 @@ async def stream_docs( progress=None, ): """ - Stream unprocessed Documents (epoch_processed == 0) into docs_chan. - - Single-probe scan: call StreamIds with ttl_batches=1 to fetch ALL - unprocessed Document ids in one round-trip. Same rationale as - stream_chunks — partitioning iterated the vertex space even when - nothing was to do; on this TG build that's not the bottleneck. - - ``ttl_batches`` is preserved for caller-compatibility but ignored. + Streams the document contents into the docs_chan, over ``ttl_batches`` + partitions (each StreamIds call claims and returns one partition). *progress* (optional) is a callable invoked once when document streaming completes — runtime hands the rebuild status forward from "Chunking documents" to "Extracting entities and relationships" at that boundary. """ - _ = ttl_batches # intentionally unused — see docstring - logger.info("streaming docs (single-probe scan)") - probe = await stream_ids(conn, "Document", 0, 1) + logger.info(f"streaming docs ({ttl_batches} batches)") n_docs = 0 - if probe.get("error"): - logger.warning("stream_docs: StreamIds probe failed; nothing to stream") - else: - doc_ids_all = probe.get("ids") or [] - if not doc_ids_all: - logger.info("stream_docs: no unprocessed Documents (epoch_processed == 0)") - else: - logger.info( - f"stream_docs: {len(doc_ids_all)} unprocessed Document(s) to stream" - ) - for d in doc_ids_all: - try: - async with tg_sem: - res = await conn.runInstalledQuery( - "StreamDocContent", - params={"doc": (d,)}, - ) - logger.debug(f"stream_docs writes {d} to docs") - await docs_chan.put(res[0]["DocContent"][0]) - n_docs += 1 - except Exception as e: - exc = traceback.format_exc() - logger.error(f"Error retrieving doc: {d} --> {e}\n{exc}") - continue + for i in range(ttl_batches): + doc_ids = await stream_ids(conn, "Document", i, ttl_batches) + if doc_ids["error"]: + continue + for d in doc_ids["ids"]: + try: + async with tg_sem: + res = await conn.runInstalledQuery( + "StreamDocContent", + params={"doc": (d,)}, + ) + logger.debug(f"stream_docs writes {d} to docs") + await docs_chan.put(res[0]["DocContent"][0]) + n_docs += 1 + except Exception as e: + exc = traceback.format_exc() + logger.error(f"Error retrieving doc: {d} --> {e}\n{exc}") + continue logger.info(f"stream_docs done: {n_docs} document(s) streamed") # close the docs chan -- this function is the only sender @@ -114,74 +100,54 @@ async def stream_chunks( ttl_batches: int = 10, ): """ - Stream residual unprocessed DocumentChunks into extract_chan + embed_chan. - - Single-probe scan: call StreamIds with ttl_batches=1 (no partitioning) - to get the full set of unprocessed DocumentChunk ids in one round-trip. - StreamIds' POST-ACCUM atomically claims and marks the chunks in the same - query. The original 100-batch loop iterated the partition space even - when there was nothing to do — this collapses to one query, which is - the common case under the post-back-port ordering where stream_chunks - runs first and only sees residual orphans from prior crashed ECC runs. - - ``ttl_batches`` is preserved for caller-compatibility but ignored; the - partitioning was a hedge against scanning huge vertex sets in one - query and that's not the bottleneck on this TG build. + Stream residual unprocessed DocumentChunks into extract_chan + embed_chan, + over ``ttl_batches`` partitions (each StreamIds call claims and returns one + partition via ``getvid % ttl_batches``). Under the current ordering + stream_chunks runs before chunk_docs writes new chunks, so it normally only + sees residual orphans from a prior crashed ECC run. """ - _ = ttl_batches # intentionally unused — see docstring - logger.info("streaming chunks (single-probe scan)") - probe = await stream_ids(conn, "DocumentChunk", 0, 1) - if probe.get("error"): - logger.warning("stream_chunks: StreamIds probe failed; nothing to process") - logger.info("stream_chunks done: 0 chunk(s) streamed") - await extract_chan.put(None) - return - chunk_ids_all = probe.get("ids") or [] - if not chunk_ids_all: - logger.info("stream_chunks: no residual chunks (epoch_processed == 0)") - logger.info("stream_chunks done: 0 chunk(s) streamed") - await extract_chan.put(None) - return - logger.info( - f"stream_chunks: {len(chunk_ids_all)} residual chunk(s) to process" - ) + logger.info(f"streaming chunks ({ttl_batches} batches)") n_chunks = 0 - for c in chunk_ids_all: - try: - # stream_chunks runs before chunk_docs writes new chunks, so an - # empty ChunkContent here means a genuine orphan from a crashed - # prior run (DocumentChunk exists but no Content). Retry briefly - # in case of a transient read. - chunk_rows = [] - for attempt in range(3): - async with tg_sem: - res = await conn.runInstalledQuery( - "StreamChunkContent", - params={"chunk": (c,)}, + for i in range(ttl_batches): + chunk_ids = await stream_ids(conn, "DocumentChunk", i, ttl_batches) + if chunk_ids["error"]: + continue + for c in chunk_ids["ids"]: + try: + # stream_chunks runs before chunk_docs writes new chunks, so an + # empty ChunkContent here means a genuine orphan from a crashed + # prior run (DocumentChunk exists but no Content). Retry briefly + # in case of a transient read. + chunk_rows = [] + for attempt in range(3): + async with tg_sem: + res = await conn.runInstalledQuery( + "StreamChunkContent", + params={"chunk": (c,)}, + ) + chunk_rows = (res[0] if res else {}).get("ChunkContent") or [] + if chunk_rows: + break + await asyncio.sleep(2 * (attempt + 1)) + if not chunk_rows: + logger.warning( + f"No content row for chunk {c} after retries; skipping" ) - chunk_rows = (res[0] if res else {}).get("ChunkContent") or [] - if chunk_rows: - break - await asyncio.sleep(2 * (attempt + 1)) - if not chunk_rows: - logger.warning( - f"No content row for chunk {c} after retries; skipping" - ) + continue + content = chunk_rows[0]["attributes"]["text"].encode( + 'raw_unicode_escape' + ).decode('unicode_escape') + logger.debug("chunk writes to extract_chan") + await extract_chan.put((content, c)) + logger.debug("chunk writes to embed_chan") + await embed_chan.put((c, content, "DocumentChunk")) + n_chunks += 1 + if n_chunks % 100 == 0: + logger.info(f"streaming chunks: {n_chunks} streamed") + except Exception as e: + exc = traceback.format_exc() + logger.error(f"Error retrieving chunk: {c} --> {e}\n{exc}") continue - content = chunk_rows[0]["attributes"]["text"].encode( - 'raw_unicode_escape' - ).decode('unicode_escape') - logger.debug("chunk writes to extract_chan") - await extract_chan.put((content, c)) - logger.debug("chunk writes to embed_chan") - await embed_chan.put((c, content, "DocumentChunk")) - n_chunks += 1 - if n_chunks % 100 == 0: - logger.info(f"streaming chunks: {n_chunks} streamed") - except Exception as e: - exc = traceback.format_exc() - logger.error(f"Error retrieving chunk: {c} --> {e}\n{exc}") - continue logger.info(f"stream_chunks done: {n_chunks} chunk(s) streamed") logger.info("closing extract_chan") diff --git a/ecc/app/graphrag/util.py b/ecc/app/graphrag/util.py index c388f48..107346e 100644 --- a/ecc/app/graphrag/util.py +++ b/ecc/app/graphrag/util.py @@ -534,9 +534,9 @@ async def upsert_batch( ) # Diagnostic: TG can silently skip vertices/edges (schema # mismatch, primary-id conflict, etc.) and only surface a - # count. When that happens, log the type/id breakdown of what - # was in the batch so the missing ones can be identified, and - # verify which vertex IDs actually landed in TG. + # count. When that happens, check which of the sent vertex + # ids actually landed in TG so the missing ones can be + # identified. if sk_v or sk_e: try: payload = json.loads(data) @@ -548,19 +548,13 @@ async def upsert_batch( f"skipped_edges={sk_e}; sent vertex types: " f"{ {vt: len(vids) for vt, vids in v_section.items()} }" ) - from urllib.parse import quote for vt, vids in v_section.items(): sent = list(vids.keys())[:200] missing = [] for vid in sent: try: - url = ( - conn.restppUrl + "/graph/" + conn.graphname - + "/vertices/" + vt + "/" + quote(vid, safe="") - ) - r = await conn._req("GET", url) - body = r if isinstance(r, dict) else {} - if not (body.get("results") or []): + found = await conn.getVerticesById(vt, vid) + if not found: missing.append(vid) except Exception: missing.append(vid) @@ -582,10 +576,7 @@ async def upsert_batch( async def check_vertex_exists(conn, v_id: str): async with tg_sem: try: - from urllib.parse import quote - url = (conn.restppUrl + "/graph/" + conn.graphname - + "/vertices/Entity/" + quote(v_id, safe="")) - res = await conn._req("GET", url, params={"select": "description"}) + res = await conn.getVerticesById("Entity", v_id, select="description") except Exception as e: if "is not a valid vertex id" not in str(e): diff --git a/graphrag/app/supportai/supportai_ingest.py b/graphrag/app/supportai/supportai_ingest.py index c159777..ae19697 100644 --- a/graphrag/app/supportai/supportai_ingest.py +++ b/graphrag/app/supportai/supportai_ingest.py @@ -102,11 +102,13 @@ def upsert_documents(self, documents: List[Document]): for doc in documents: self.upsert_document(doc) - def upsert_chunk(self, chunk: DocumentChunk): + def upsert_chunk(self, chunk: DocumentChunk, doc_id: str): now = datetime.now() date_added = now.strftime("%Y-%m-%d %H:%M:%S") chunk_id = chunk.document_chunk_id - doc_id = chunk_id.rsplit("_chunk_", 1)[0] + # ``doc_id`` is the caller's already-lowercased Document id + # (lowercase-only, no normalization); chunk ids stay process_id- + # normalized. Use it for the Document endpoints, not the chunk prefix. self.status.progress.chunk_failures[chunk_id] = [] try: self.conn.upsertVertex( @@ -209,7 +211,7 @@ def upsert_chunk(self, chunk: DocumentChunk): def upsert_document(self, document: Document): now = datetime.now() date_added = now.strftime("%Y-%m-%d %H:%M:%S") - doc_id = _process_id(document.document_id) + doc_id = document.document_id.lower() doc_collection = document.document_collection self.status.progress.doc_failures[doc_id] = [] try: @@ -309,6 +311,8 @@ def _ingest(self, documents: List[Document], chunker, chunker_params): doc.document_id: len(doc.document_chunks) for doc in documents } for doc in documents: + # Document id is lowercase-only; compute once per document. + doc_id = doc.document_id.lower() res = self.document_er_extraction(doc) doc.entities = res["nodes"] doc.relationships = res["rels"] @@ -320,7 +324,7 @@ def _ingest(self, documents: List[Document], chunker, chunker_params): res = self.document_er_extraction(chunk) chunk.entities = res["nodes"] chunk.relationships = res["rels"] - self.upsert_chunk(chunk) + self.upsert_chunk(chunk, doc_id) self.upsert_document(doc) self.status.progress.num_docs_ingested += 1 self.status.status = "complete"