Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Python
__pycache__/
*.pyc
.coverage

# Playwright
/test-results/
/playwright-report/
/blob-report/
/e2e/
/e2e/.auth/state.json
/playwright.config.ts

# Node
/node_modules/
/package.json
/package-lock.json

# Release docs
/RELEASE_NOTES.md
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# Changelog

## [1.4.2]

### Added
- **Graph compatibility check and repair.** The *Knowledge Graph Admin* page can now scan an existing graph for installed GSQL queries whose body has drifted from the shipped version — or that are missing entirely — and repair them in place, without rebuilding the knowledge graph. This makes it safe to pick up query fixes from a GraphRAG upgrade on graphs that already hold data. Repair runs under the per-graph lock and is refused while a rebuild is in progress. New endpoints: `GET /ui/{graphname}/migration/status` and `POST /ui/{graphname}/migration/apply`.

### Changed
- **Documents whose filenames contain spaces or mixed case now ingest reliably.** Every ingest path normalizes vertex IDs the same way, so a document is stored under one consistent id instead of diverging between paths — it becomes retrievable and participates in entity extraction instead of being silently skipped or duplicated.
- **Interrupted knowledge-graph rebuilds recover cleanly on the next run.** Chunks left unfinished by a crashed or cancelled run are reconciled before new documents are processed, and each chunk is written together with its content in one step — so rebuilds no longer leave chunks without content or emit spurious "missing content" warnings.
- **Shipped query fixes apply automatically on existing graphs.** At initialization, any query whose installed body differs from the shipped version is re-created, so improvements in the bundled queries take effect after an upgrade without a manual reinstall.
- **The document ingestion dialog reports per-file upload failures.** When some uploads fail, the dialog names the affected files and the reason instead of failing the batch opaquely.

## [1.4.1]

### Added
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
---

## Releases
* **6/23/2026**: GraphRAG v1.4.2 released. Added a knowledge graph compatibility check and repair tool to pick up shipped query fixes on existing graphs, along with more reliable ingestion for documents with spaces in their filenames and other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.4.2) for details.
* **5/30/2026**: GraphRAG v1.4.1 released. Added token-based login and a pre-flight upload conflict check, along with more resilient chat when vector search is unavailable and other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.4.1) for details.
* **5/16/2026**: GraphRAG v1.4.0 released. Added schema-aware knowledge graphs, auto retrieval method selection, and a Trace Logs UI, along with many other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.4.0) for details.
* **4/10/2026**: GraphRAG v1.3.0 released. Added an admin configuration UI with role-based access and per-graph chatbot LLM override, along with many other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.3.0) for details.
* **2/28/2026**: GraphRAG v1.2.0 released. Added Admin UI for graph initialization, document ingestion, and knowledge graph rebuild, along with many other improvements and bug fixes. See [Release Notes](https://github.com/tigergraph/graphrag/releases/tag/v1.2.0) for details.
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.4.1
1.4.2
224 changes: 224 additions & 0 deletions common/db/migrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""Generic migration helpers for upgrading existing graphs to the
current release's GSQL queries and schema.

The release-cut workflow that motivates this module:

* On an existing graph (created against an older version), the customer
upgrades graphrag. The new release may ship modified GSQL query
bodies or expanded vertex/edge attributes. Without an automatic
migration step the old, stale objects keep serving requests — leading
to surprising behavior that's hard to attribute.

* ``check_and_reinstall_queries`` compares each shipped ``.gsql`` file
against the body currently installed on TigerGraph and re-creates +
re-installs only the ones whose body has actually drifted.

* ``check_and_apply_schema`` (still a stub — see TODO below) is the
schema counterpart: detect missing attributes on existing vertex /
edge types and emit ``ALTER VERTEX ... ADD ATTRIBUTE …`` statements.

Designed to be importable from both the graphrag FastAPI app (sync TG
connection via pyTigerGraph) and the ECC worker (async connection).
The sync entry points wrap the same comparison logic.
"""

from __future__ import annotations

import hashlib
import logging
import os
import re
from typing import Iterable

logger = logging.getLogger(__name__)


# ---------------------------------------------------------------------------
# GSQL body normalization
# ---------------------------------------------------------------------------
#
# We compare the local ``.gsql`` text with TG's ``SHOW QUERY`` output.
# TG may canonicalize comments and whitespace differently from what was
# CREATE-d, so a literal byte-compare is noisy. Normalize both sides
# (strip comments, collapse whitespace) before hashing.
_LINE_COMMENT_RE = re.compile(r"//[^\n]*")
_BLOCK_COMMENT_RE = re.compile(r"/\*.*?\*/", re.DOTALL)
_WHITESPACE_RE = re.compile(r"\s+")


def _normalize_gsql(body: str) -> str:
body = _BLOCK_COMMENT_RE.sub("", body)
body = _LINE_COMMENT_RE.sub("", body)
body = _WHITESPACE_RE.sub(" ", body).strip()
return body


def _gsql_hash(body: str) -> str:
return hashlib.sha256(_normalize_gsql(body).encode()).hexdigest()[:16]


# Pull the ``CREATE … QUERY <name>(…) { … }`` block out of TG's
# ``SHOW QUERY <name>`` output. The output also carries a status banner
# we don't want to fold into the hash.
_QUERY_BLOCK_RE = re.compile(
r"(CREATE\s+(?:OR\s+REPLACE\s+)?(?:DISTRIBUTED\s+)?QUERY\s+\w+.*)",
re.DOTALL,
)


def _extract_query_body(show_query_output: str) -> str:
m = _QUERY_BLOCK_RE.search(show_query_output)
return m.group(1) if m else ""


def _query_name_from_path(query_path: str) -> str:
"""``common/gsql/graphrag/StreamIds.gsql`` → ``StreamIds``."""
base = os.path.basename(query_path)
return base[:-5] if base.endswith(".gsql") else base


def _read_local_query(query_path: str) -> str | None:
try:
with open(query_path, "r") as f:
return f.read()
except FileNotFoundError:
logger.warning(f"Local query file missing: {query_path}")
return None


# ---------------------------------------------------------------------------
# Sync API (used by graphrag/app/supportai/supportai.py init_supportai)
# ---------------------------------------------------------------------------

def query_needs_update_sync(conn, graphname: str, query_path: str) -> bool:
"""Return True when the local ``.gsql`` body differs from what's
installed on TG (or when the query is missing on TG).

Wraps a synchronous ``conn.gsql()`` call. Errors fetching the
installed body are treated as "needs update" so the caller falls
back to re-installation rather than silently skipping.
"""
local_body = _read_local_query(query_path)
if local_body is None:
return False # nothing local to reinstall

q_name = _query_name_from_path(query_path)
local_hash = _gsql_hash(local_body)

try:
installed_text = conn.gsql(f"USE GRAPH {graphname}\nSHOW QUERY {q_name}")
except Exception as e:
logger.warning(f"SHOW QUERY {q_name} failed ({e}); will reinstall.")
return True

installed_body = _extract_query_body(str(installed_text))
if not installed_body:
logger.info(f"Query '{q_name}' not installed yet; will install.")
return True

installed_hash = _gsql_hash(installed_body)
drifted = local_hash != installed_hash
if drifted:
logger.info(
f"Query '{q_name}' body has drifted from local ({installed_hash} != "
f"{local_hash}); will reinstall."
)
return drifted


def filter_queries_needing_update_sync(
conn,
graphname: str,
query_paths: Iterable[str],
) -> list[str]:
"""Return the subset of ``query_paths`` whose local body differs
from TG's installed body. Use to skip unnecessary CREATE OR REPLACE
+ INSTALL QUERY ALL roundtrips on warm graphs.
"""
return [p for p in query_paths if query_needs_update_sync(conn, graphname, p)]


# ---------------------------------------------------------------------------
# Async API (used by ecc/app/graphrag/util.py install_queries)
# ---------------------------------------------------------------------------

async def query_needs_update_async(conn, query_path: str) -> bool:
"""Async variant of :func:`query_needs_update_sync` for the ECC
worker's ``AsyncTigerGraphConnection``. Reads ``conn.graphname``
rather than taking it as a separate arg, matching how the rest of
the ECC code threads the connection.
"""
local_body = _read_local_query(query_path)
if local_body is None:
return False

q_name = _query_name_from_path(query_path)
local_hash = _gsql_hash(local_body)

try:
installed_text = await conn.gsql(
f"USE GRAPH {conn.graphname}\nSHOW QUERY {q_name}"
)
except Exception as e:
logger.warning(f"SHOW QUERY {q_name} failed ({e}); will reinstall.")
return True

installed_body = _extract_query_body(str(installed_text))
if not installed_body:
logger.info(f"Query '{q_name}' not installed yet; will install.")
return True

installed_hash = _gsql_hash(installed_body)
drifted = local_hash != installed_hash
if drifted:
logger.info(
f"Query '{q_name}' body has drifted from local ({installed_hash} != "
f"{local_hash}); will reinstall."
)
return drifted


async def filter_queries_needing_update_async(
conn,
query_paths: Iterable[str],
) -> list[str]:
out: list[str] = []
for p in query_paths:
if await query_needs_update_async(conn, p):
out.append(p)
return out


# ---------------------------------------------------------------------------
# Schema migration — TODO
# ---------------------------------------------------------------------------
#
# Goal: detect attributes that exist in the shipped schema ``.gsql``
# files but are missing on the live graph, and emit
# ``ALTER VERTEX <T> ADD ATTRIBUTE <name> <type> [DEFAULT …]``
# statements wrapped in a ``CREATE SCHEMA_CHANGE JOB`` so the operator
# never has to run them by hand.
#
# Outline (deferred implementation):
# 1. Parse each shipped ``SupportAI_Schema*.gsql`` (and any other
# schema-relevant .gsql) to build the expected
# ``{vertex_type: {attr: tg_type}}`` map.
# 2. Query live schema via ``conn.getSchema()`` (or parse ``ls``
# output) to build the same map for the running graph.
# 3. For each declared type, compute ``expected - current``.
# 4. Emit one ``CREATE SCHEMA_CHANGE JOB`` that ADDs the missing
# attributes with their declared defaults.
# 5. ``RUN SCHEMA_CHANGE JOB`` and drop it.
#
# v1.4.2 doesn't add any new attributes on existing vertex types
# (the ``Document.name`` / ``Image.name`` from v2.0 A3 were
# intentionally skipped), so this is a no-op for the current release.
# Stub kept here so future migrations have a place to land.

def check_and_apply_schema(conn, graphname: str) -> dict:
"""Compare expected vertex/edge attributes against the live schema
and apply any missing additions. Returns a summary dict.

Stubbed for v1.4.2 — no attribute additions ship in this release.
"""
return {"applied": [], "skipped_reason": "no schema deltas in this release"}
21 changes: 15 additions & 6 deletions common/gsql/graphrag/StreamIds.gsql
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
CREATE OR REPLACE DISTRIBUTED QUERY StreamIds(INT current_batch, INT ttl_batches, STRING v_type) {
/*
* Get the IDs of entitiess that have not already been processed
* (one batch at a time)
* Return one batch of unprocessed vertex IDs, partitioned by
* ``getvid(v) % ttl_batches``. Use ``getvid`` rather than
* ``vertex_to_int`` — only ``getvid`` is stable for STRING-primary-id
* vertex types.
*
* POST-ACCUM (not WHERE) gates on ``epoch_processed == 0`` so that
* collecting the id and stamping the claim (``epoch_processed = now()``)
* happen in one per-vertex step: a vertex can't be claimed without
* being returned, or returned without being claimed.
*/
ListAccum<STRING> @@ids;
Verts = {v_type};

Verts = SELECT v FROM Verts:v
WHERE vertex_to_int(v) % ttl_batches == current_batch
AND v.epoch_processed == 0
ACCUM @@ids += v.id
POST-ACCUM v.epoch_processed = datetime_to_epoch(now()); // set the processing time
WHERE getvid(v) % ttl_batches == current_batch
POST-ACCUM
IF v.epoch_processed == 0 THEN
@@ids += v.id,
v.epoch_processed = datetime_to_epoch(now())
END;

PRINT @@ids;
}
16 changes: 9 additions & 7 deletions ecc/app/eventual_consistency_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from common.metrics.tg_proxy import TigerGraphConnectionProxy
from common.chunkers import BaseChunker
from common.extractors import BaseExtractor
from supportai.util import process_id

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -112,13 +113,14 @@ def _upsert_chunk(self, doc_id, chunk_id, chunk):
"DocumentChunk", chunk_id, "HAS_CONTENT", "Content", chunk_id
)
self.conn.upsertEdge("Document", doc_id, "HAS_CHILD", "DocumentChunk", chunk_id)
if int(chunk_id.split("_")[-1]) > 0:
idx = int(chunk_id.split("_")[-1])
if idx > 0:
self.conn.upsertEdge(
"DocumentChunk",
chunk_id,
"IS_AFTER",
"DocumentChunk",
doc_id + "_chunk_" + str(int(chunk_id.split("_")[-1]) - 1),
process_id(f"{doc_id}_chunk_{idx - 1}"),
)

# TODO: Change to loading job for all entities in document at once
Expand All @@ -127,15 +129,15 @@ def _upsert_entities(self, src_id, src_type, entities):
self.conn.upsertVertices(
"Entity",
[
(x["id"], {"definition": x["definition"], "epoch_added": date_added})
(process_id(x["id"]), {"definition": x["definition"], "epoch_added": date_added})
for x in entities
],
)
self.conn.upsertEdges(
src_type,
"CONTAINS_ENTITY",
"Entity",
[(src_id, x["id"], {}) for x in entities],
[(src_id, process_id(x["id"]), {}) for x in entities],
)

# TODO: Change to loading job for all relationships in document at once
Expand All @@ -145,7 +147,7 @@ def _upsert_rels(self, src_id, src_type, relationships):
"RelationshipType",
[
(
x["source"] + ":" + x["type"] + ":" + x["target"],
process_id(x["source"] + ":" + x["type"] + ":" + x["target"]),
{
"definition": x["definition"],
"short_name": x["type"],
Expand All @@ -164,7 +166,7 @@ def _upsert_rels(self, src_id, src_type, relationships):
"MENTIONS_RELATIONSHIP",
"RelationshipType",
[
(src_id, x["source"] + ":" + x["type"] + ":" + x["target"], {})
(src_id, process_id(x["source"] + ":" + x["type"] + ":" + x["target"]), {})
for x in relationships
],
)
Expand Down Expand Up @@ -219,7 +221,7 @@ def _process_document_content(self, v_type, vertex_id, content):
LogWriter.info(f"Chunking the content from vertex type: {v_type}")
chunks = self._chunk_document(content)
for i, chunk in enumerate(chunks):
self._upsert_chunk(vertex_id, f"{vertex_id}_chunk_{i}", chunk)
self._upsert_chunk(vertex_id, process_id(f"{vertex_id}_chunk_{i}"), chunk)

def _extract_and_upsert_entities(self, v_type, vertex_id, content):
LogWriter.info(f"Extracting and upserting entities from the content from vertex type: {v_type}")
Expand Down
Loading
Loading