diff --git a/src/openai/lib/streaming/_assistants.py b/src/openai/lib/streaming/_assistants.py index 6efb3ca3f1..377a1dd0ca 100644 --- a/src/openai/lib/streaming/_assistants.py +++ b/src/openai/lib/streaming/_assistants.py @@ -977,14 +977,48 @@ def accumulate_event( return current_message_snapshot, new_content +def _merge_indexed_list(items: list[object]) -> list[object]: + """Merge list entries that share the same ``index`` value. + + Streaming chunks may contain multiple entries with the same logical + ``index`` (e.g. a tool-call name and its first argument fragment in one + SSE event). When such a list is stored for the first time it must be + collapsed by logical index so that later chunks merge correctly. + """ + if not items or not is_dict(items[0]) or "index" not in items[0]: # type: ignore[arg-type] + return items + + merged: dict[int, object] = {} + order: list[int] = [] + for item in items: + if not is_dict(item): + continue + idx = item.get("index") # type: ignore[union-attr] + if not isinstance(idx, int): + continue + if idx in merged: + existing = merged[idx] + if is_dict(existing): + merged[idx] = accumulate_delta(existing, item) # type: ignore[arg-type] + else: + order.append(idx) + merged[idx] = item + + return [merged[i] for i in order] + + def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]: for key, delta_value in delta.items(): if key not in acc: + if is_list(delta_value): + delta_value = _merge_indexed_list(delta_value) acc[key] = delta_value continue acc_value = acc[key] if acc_value is None: + if is_list(delta_value): + delta_value = _merge_indexed_list(delta_value) acc[key] = delta_value continue diff --git a/src/openai/lib/streaming/_deltas.py b/src/openai/lib/streaming/_deltas.py index a5e1317612..2f2dea8b89 100644 --- a/src/openai/lib/streaming/_deltas.py +++ b/src/openai/lib/streaming/_deltas.py @@ -3,14 +3,48 @@ from ..._utils import is_dict, is_list +def _merge_indexed_list(items: list[object]) -> list[object]: + """Merge list entries that share the same ``index`` value. + + Streaming chunks may contain multiple entries with the same logical + ``index`` (e.g. a tool-call name and its first argument fragment in one + SSE event). When such a list is stored for the first time it must be + collapsed by logical index so that later chunks merge correctly. + """ + if not items or not is_dict(items[0]) or "index" not in items[0]: # type: ignore[arg-type] + return items + + merged: dict[int, object] = {} + order: list[int] = [] + for item in items: + if not is_dict(item): + continue + idx = item.get("index") # type: ignore[union-attr] + if not isinstance(idx, int): + continue + if idx in merged: + existing = merged[idx] + if is_dict(existing): + merged[idx] = accumulate_delta(existing, item) # type: ignore[arg-type] + else: + order.append(idx) + merged[idx] = item + + return [merged[i] for i in order] + + def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> dict[object, object]: for key, delta_value in delta.items(): if key not in acc: + if is_list(delta_value): + delta_value = _merge_indexed_list(delta_value) acc[key] = delta_value continue acc_value = acc[key] if acc_value is None: + if is_list(delta_value): + delta_value = _merge_indexed_list(delta_value) acc[key] = delta_value continue