Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 56 additions & 9 deletions common/lib/links/filters.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,77 @@
import json
import random
from typing import Literal, Optional, TypedDict
from lib.logging_utils import init_logger
from vcon import Vcon

logger = init_logger(__name__)


def is_included(options, _vcon):
class OnlyIfFilter(TypedDict, total=False):
    """``only_if`` clause inside :class:`FilterOptions`.

    ``type`` and ``purpose`` are aliases for the identifier to match, so a
    config should supply exactly one of them. ``purpose`` is the
    draft-ietf-vcon-vcon-core-02 spelling for attachments; ``type`` is the
    pre-0.4.0 name (still canonical for analysis entries). Either key works
    against either section, so existing configs keep matching while
    spec-current configs can use ``purpose``.

    Keys:
        section: vCon array to scan -- ``"attachments"`` or ``"analysis"``.
        type / purpose: identifier to match on each element.
        includes: substring/membership token to look for inside the body.
    """

    section: Literal["attachments", "analysis"]
    # ``type`` mirrors the config key name, so the builtin-shadowing field
    # name is intentional and must not be renamed.
    type: str
    purpose: str
    includes: str


class FilterOptions(TypedDict, total=False):
    """Options envelope accepted by :func:`is_included`.

    An absent or empty ``only_if`` means "include everything", which is why
    both ``options`` and ``options.only_if`` are optional.
    """

    # The filter clause; omitted entirely when no filtering is wanted.
    only_if: OnlyIfFilter


def is_included(options: Optional[FilterOptions], _vcon) -> bool:
if not options:
return True
if not options.get("only_if"):
return True
filter = options["only_if"]
section = filter["section"]
type = filter["type"]
# Accept either the spec-current ``purpose`` or legacy ``type`` key
# as the target identifier. They're treated as aliases regardless of
# section so configs migrate at their own pace.
target = filter.get("purpose") or filter.get("type")
includes = filter["includes"]

try:
for element in getattr(_vcon, section):
body_as_string = json.dumps(element["body"]) if element["encoding"] == "json" else element["body"]
if not element["type"] == type:
# draft-ietf-vcon-vcon-core-02 renamed attachment ``type`` →
# ``purpose``. Accept either on attachments so configs written
# against the legacy shape keep working alongside spec-current
# writers. Analysis kept the ``type`` field.
if section == "attachments":
if element.get("purpose") != target and element.get("type") != target:
continue
elif element.get("type") != target:
continue
if type == "tags":
tags = element["body"]
if includes in tags:
if target == "tags":
# Tags body is a JSON-encoded list of "name:value" strings —
# the one case where we have to decode before checking.
tags = Vcon.decoded_body(element)
if isinstance(tags, list) and includes in tags:
return True
elif includes in body_as_string:
continue
# Per spec §2.3.2 ``body`` is always a String regardless of
# encoding, so substring-match directly without any decode.
body = element.get("body")
if isinstance(body, str) and includes in body:
return True
except Exception as e:
logger.error(f"Error checking inclusion: {e}")
Expand Down
19 changes: 12 additions & 7 deletions common/storage/milvus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from lib.logging_utils import init_logger
from lib.metrics import record_histogram, increment_counter
from lib.vcon_redis import VconRedis
from vcon import Vcon

try:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
Expand Down Expand Up @@ -131,21 +132,25 @@ def extract_text_from_vcon(vcon: dict) -> str:
if "analysis" in vcon:
transcript_analyses = [a for a in vcon["analysis"] if a.get("type") == "transcript"]
for analysis in transcript_analyses:
if "body" in analysis and "text" in analysis["body"]:
text += analysis["body"]["text"] + " "
# Decode through the Vcon helper so a JSON-stringified body
# (post spec-enforcement on store) still surfaces as a dict.
body = Vcon.decoded_body(analysis)
if isinstance(body, dict) and "text" in body:
text += body["text"] + " "
extracted_components.append(f"transcript analysis")
logger.debug(f"Extracted transcript analysis from vCon {vcon_id}")

# Extract summary analysis
if "analysis" in vcon:
summary_analyses = [a for a in vcon["analysis"] if a.get("type") == "summary"]
for analysis in summary_analyses:
if "body" in analysis and isinstance(analysis["body"], str):
text += analysis["body"] + " "
body = Vcon.decoded_body(analysis)
if isinstance(body, str):
text += body + " "
extracted_components.append(f"summary analysis")
logger.debug(f"Extracted summary analysis from vCon {vcon_id}")
elif "body" in analysis and isinstance(analysis["body"], dict) and "text" in analysis["body"]:
text += analysis["body"]["text"] + " "
elif isinstance(body, dict) and "text" in body:
text += body["text"] + " "
extracted_components.append(f"summary analysis")
logger.debug(f"Extracted summary analysis from vCon {vcon_id}")

Expand Down
105 changes: 82 additions & 23 deletions common/vcon.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,28 +42,74 @@ def build_new(cls):
def tags(self):
    """Return the attachment whose purpose is ``"tags"``.

    Delegates to ``find_attachment_by_purpose``; the result is whatever
    that lookup returns for ``"tags"``.
    """
    return self.find_attachment_by_purpose("tags")

@staticmethod
def decoded_body(entry):
    """Return an attachment/analysis ``body`` as a live Python value.

    Per draft-ietf-vcon-vcon-core-02 §2.3.2 ``body`` is *always* a String;
    the ``encoding`` tells you how to interpret it:

    - ``json`` -> body is a JSON-encoded object/array, parsed with
      ``json.loads``.
    - ``base64url`` -> body is base64url-encoded bytes, returned verbatim
      (binary decoding is caller-specific).
    - ``none`` -> body is a freeform string, returned verbatim.

    For backwards compatibility with legacy writers that placed a raw
    dict/list under ``body``, any non-string body is returned as-is, even
    when ``encoding`` is ``json``.

    Args:
        entry: attachment or analysis mapping; may be ``None`` or empty.

    Returns:
        The decoded body, or ``None`` if ``entry`` is falsy or has no
        ``body`` key.

    Raises:
        json.JSONDecodeError: if ``encoding`` is ``json`` and the string
            body is not valid JSON.
    """
    if not entry:
        return None
    body = entry.get("body")
    # Only a *string* body is parsed: a legacy dict/list body is already
    # the live value and must not be fed to json.loads.
    if entry.get("encoding") == "json" and isinstance(body, str):
        return json.loads(body)
    return body

@staticmethod
def with_decoded_body(entry):
    """Return a shallow copy of an attachment/analysis entry with body decoded.

    The copy is identical to ``entry`` except that ``body`` is replaced
    with the value produced by :meth:`decoded_body`. Useful when a caller
    wants to navigate into ``body`` with dict syntax (e.g. via a dot-path
    navigator) without having to know whether the body arrived as a
    JSON-encoded string from storage. ``entry`` itself is not modified.

    Args:
        entry: attachment or analysis mapping; may be ``None`` or empty.

    Returns:
        A new dict, or ``None`` if ``entry`` is falsy.
    """
    if not entry:
        return None
    # The later "body" key wins over the one spread in from ``entry``.
    return {**entry, "body": Vcon.decoded_body(entry)}

def get_tag(self, tag_name):
    """Return the value of the first ``tag_name:value`` tag, or ``None``.

    The tags attachment is looked up by purpose and its body is decoded via
    ``decoded_body`` so that both a JSON-encoded string body and a legacy
    raw list body are handled.

    Args:
        tag_name: name part of the ``name:value`` tag to look for.

    Returns:
        The text after the first ``:`` of the first matching tag, or
        ``None`` when there is no tags attachment, its decoded body is not
        a list, or no tag has that name.
    """
    tags_attachment = self.find_attachment_by_purpose("tags")
    if not tags_attachment:
        return None
    tags = self.decoded_body(tags_attachment)
    if not isinstance(tags, list):
        return None
    prefix = f"{tag_name}:"
    for tag in tags:
        if isinstance(tag, str) and tag.startswith(prefix):
            # Strip only the leading "name:" so a value that itself
            # contains ":" (URLs, timestamps) is returned whole.
            return tag[len(prefix):]
    return None

def add_tag(self, tag_name, tag_value):
    """Append a ``tag_name:tag_value`` tag to the tags attachment.

    If no tags attachment exists, one is created with ``purpose: "tags"``
    and appended to ``vcon_dict["attachments"]``. The existing body is
    decoded via ``decoded_body`` first, so a previously JSON-encoded body
    (or a legacy raw list) is extended, not overwritten. The result is
    written back as ``encoding: "json"`` with a JSON-stringified list, per
    draft-ietf-vcon-vcon-core-02 §2.3.2 (body is always a String).

    Args:
        tag_name: name part of the tag.
        tag_value: value part of the tag.
    """
    tags_attachment = self.find_attachment_by_purpose("tags")
    if tags_attachment is None:
        # Spec 0.4.0 renamed attachment ``type`` -> ``purpose``. New
        # writes use the spec-current key; lookup tolerates both.
        tags_attachment = {"purpose": "tags"}
        self.vcon_dict["attachments"].append(tags_attachment)
    # A brand-new attachment has no body yet, hence the ``or []``.
    tags = self.decoded_body(tags_attachment) or []
    tags.append(f"{tag_name}:{tag_value}")
    tags_attachment["body"] = json.dumps(tags)
    tags_attachment["encoding"] = "json"

def find_attachment_by_purpose(self, purpose):
# IETF vCon spec 0.4.0 attachment lookup. Matches `purpose` first
Expand Down Expand Up @@ -97,55 +143,68 @@ def find_attachment_by_type(self, type):
def add_attachment(self, *, body: Union[dict, list, str], type: str, encoding="none"):
    """Validate an attachment and append it to ``vcon_dict["attachments"]``.

    Per draft-ietf-vcon-vcon-core-02 §2.3.2 ``body`` is always a String.
    A dict/list passed as a convenience is JSON-encoded immediately (and
    ``encoding`` forced to ``"json"``) so any reader that touches the
    attachment between now and storage sees the spec-correct shape.

    Args:
        body: attachment payload; a dict/list is stringified with
            ``json.dumps``.
        type: attachment type identifier, stored under the ``"type"`` key.
        encoding: one of ``"json"``, ``"none"`` or ``"base64url"``.

    Raises:
        Exception: if ``encoding`` is not one of the accepted values, or
            the body fails to parse under the declared encoding. The
            underlying error is chained as ``__cause__``.
    """
    if encoding not in ("json", "none", "base64url"):
        raise Exception("Invalid encoding")

    if isinstance(body, (dict, list)):
        body = json.dumps(body)
        encoding = "json"

    if encoding == "json":
        try:
            json.loads(body)
        except Exception as e:
            raise Exception(f"Invalid JSON body: {e}") from e

    if encoding == "base64url":
        try:
            base64.urlsafe_b64decode(body)
        except Exception as e:
            raise Exception(f"Invalid base64url body: {e}") from e

    self.vcon_dict["attachments"].append({
        "type": type,
        "body": body,
        "encoding": encoding,
    })

def find_analysis_by_type(self, type):  # TODO fix to search for specific dialog id if it's passed
    """Return the first analysis entry whose ``type`` equals ``type``.

    Entries without a ``type`` key are skipped; a lookup never fails on
    them.

    Returns:
        The matching analysis dict, or ``None`` if there is none.
    """
    return next(
        (a for a in self.vcon_dict["analysis"] if a.get("type") == type),
        None,
    )

def add_analysis(self, *, type: str, dialog: Union[list, int], vendor: str, body: Union[dict, list, str], encoding="none", extra=None):
    """Validate an analysis entry and append it to ``vcon_dict["analysis"]``.

    Per draft-ietf-vcon-vcon-core-02 §2.3.2 ``body`` is always a String.
    A dict/list passed as a convenience is JSON-encoded immediately (and
    ``encoding`` forced to ``"json"``) so any reader that touches the
    analysis between now and storage sees the spec-correct shape.

    Args:
        type: analysis type identifier.
        dialog: dialog index, or list of indexes, the analysis refers to.
        vendor: name of the producer of the analysis.
        body: analysis payload; a dict/list is stringified with
            ``json.dumps``.
        encoding: one of ``"json"``, ``"none"`` or ``"base64url"``.
        extra: optional mapping of additional keys merged into the entry
            last, so it can override the standard keys. ``None`` means no
            extra keys.

    Raises:
        Exception: if ``encoding`` is not one of the accepted values, or
            the body fails to parse under the declared encoding. The
            underlying error is chained as ``__cause__``.
    """
    if encoding not in ("json", "none", "base64url"):
        raise Exception("Invalid encoding")

    if isinstance(body, (dict, list)):
        body = json.dumps(body)
        encoding = "json"

    if encoding == "json":
        try:
            json.loads(body)
        except Exception as e:
            raise Exception(f"Invalid JSON body: {e}") from e

    if encoding == "base64url":
        try:
            base64.urlsafe_b64decode(body)
        except Exception as e:
            raise Exception(f"Invalid base64url body: {e}") from e

    self.vcon_dict["analysis"].append({
        "type": type,
        "dialog": dialog,
        "vendor": vendor,
        "body": body,
        "encoding": encoding,
        # ``extra`` defaults to None, not a shared mutable ``{}``.
        **(extra or {}),
    })

def add_party(self, party: dict):
self.vcon_dict["parties"].append(party)
Expand Down
5 changes: 4 additions & 1 deletion conserver/links/check_and_tag/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from lib.metrics import record_histogram, increment_counter
import time
from lib.links.filters import is_included, randomly_execute_with_sampling
from vcon import Vcon

logger = init_logger(__name__)

Expand Down Expand Up @@ -128,7 +129,9 @@ def run(
if not source:
logger.warning("No %s found for vCon: %s", source_type, vCon.uuid)
continue
source_text = navigate_dict(source, text_location)
# Decode body so a dotted ``text_location`` like ``body.transcript``
# can drill through a JSON-encoded body (spec-current shape).
source_text = navigate_dict(Vcon.with_decoded_body(source), text_location)
if not source_text:
logger.warning("No source_text found at %s for vCon: %s", text_location, vCon.uuid)
continue
Expand Down
17 changes: 15 additions & 2 deletions conserver/links/check_and_tag/tests/test_check_and_tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ def test_navigate_dict():
assert navigate_dict({"body": {"text": "hello"}}, "body.missing") is None


def test_navigate_dict_drills_into_json_encoded_body_via_helper():
    """A dotted path reaches into a JSON-encoded body once it is decoded."""
    # Spec-current shape: encoding=json, body is a stringified dict.
    # navigate_dict cannot drill into a string, so the link feeds it the
    # output of Vcon.with_decoded_body first.
    source = {
        "type": "transcript",
        "body": json.dumps({"transcript": "the actual text"}),
        "encoding": "json",
    }
    decoded = Vcon.with_decoded_body(source)
    assert navigate_dict(decoded, "body.transcript") == "the actual text"


def test_run_requires_tag_name():
with pytest.raises(ValueError, match="tag_name is required"):
run(
Expand Down Expand Up @@ -81,7 +93,8 @@ def test_run_applies_tag_when_evaluation_is_positive(
assert result == "test-uuid"
assert sample_vcon.get_tag("topic") == "billing"
analysis = get_analysis_for_type(sample_vcon, 0, "tag_evaluation")
assert analysis["body"]["applies"] is True
# add_analysis JSON-encodes a dict body at the boundary (spec).
assert Vcon.decoded_body(analysis)["applies"] is True
mock_instance.store_vcon.assert_called_once_with(sample_vcon)
mock_increment_counter.assert_any_call(
"conserver.link.openai.tags_applied",
Expand Down Expand Up @@ -130,7 +143,7 @@ def test_run_records_negative_evaluation_without_applying_tag(
assert result == "test-uuid"
assert sample_vcon.get_tag("topic") is None
analysis = get_analysis_for_type(sample_vcon, 0, "tag_evaluation")
assert analysis["body"]["applies"] is False
assert Vcon.decoded_body(analysis)["applies"] is False
mock_instance.store_vcon.assert_called_once_with(sample_vcon)
mock_increment_counter.assert_not_called()
mock_record_histogram.assert_called_once()
Expand Down
5 changes: 4 additions & 1 deletion conserver/links/detect_engagement/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from lib.metrics import record_histogram, increment_counter
import time
from lib.links.filters import is_included, randomly_execute_with_sampling
from vcon import Vcon
import os
logger = init_logger(__name__)

Expand Down Expand Up @@ -97,7 +98,9 @@ def run(
logger.warning("No %s found for vCon: %s", source_type, vCon.uuid)
continue

source_text = navigate_dict(source, text_location)
# Decode body so a dotted ``text_location`` like ``body.transcript``
# can drill through a JSON-encoded body (spec-current shape).
source_text = navigate_dict(Vcon.with_decoded_body(source), text_location)
if not source_text:
logger.warning("No source_text found at %s for vCon: %s", text_location, vCon.uuid)
continue
Expand Down
8 changes: 7 additions & 1 deletion conserver/links/hugging_llm_link/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from lib.error_tracking import init_error_tracker
from lib.metrics import record_histogram, increment_counter
from lib.vcon_redis import VconRedis
from vcon import Vcon

# Initialize services
init_error_tracker()
Expand Down Expand Up @@ -229,7 +230,12 @@ def _get_transcript_text(self, vcon) -> str:
transcript_text = ""
for analysis in vcon.analysis:
if analysis["type"] == "transcript":
transcript = analysis["body"].get("transcript", "")
# Decode body so a JSON-encoded transcript (post-spec-normalize)
# still surfaces as a dict here.
body = Vcon.decoded_body(analysis)
if not isinstance(body, dict):
continue
transcript = body.get("transcript", "")
if transcript:
transcript_text += transcript + "\n"
return transcript_text.strip()
Expand Down
Loading
Loading