diff --git a/.env.example b/.env.example deleted file mode 100644 index 1253b677..00000000 --- a/.env.example +++ /dev/null @@ -1,35 +0,0 @@ -OPENAI_BASE_URL="https://generativelanguage.googleapis.com/v1beta/openai/" -OPENAI_API_KEY="..." # Used for Open-AI compatible models, including Gemini models accessed via the OpenAI API. -GOOGLE_API_KEY="..." # Used by google-adk -ANTHROPIC_API_KEY="..." - -# Model selection (see https://ai.google.dev/gemini-api/docs/models) -# Stable: gemini-2.5-pro, gemini-2.5-flash, gemini-2.5-flash-lite -# Preview: gemini-3.1-pro-preview, gemini-3.1-flash-preview -DEFAULT_PLANNER_MODEL="gemini-2.5-pro" -DEFAULT_WORKER_MODEL="gemini-2.5-flash" -DEFAULT_EVALUATOR_MODEL="gemini-2.5-pro" - -# LangFuse for agent execution tracing and evaluations -LANGFUSE_SECRET_KEY="sk-lf-..." -LANGFUSE_PUBLIC_KEY="pk-lf-..." -LANGFUSE_HOST="https://us.cloud.langfuse.com" - -# AML Database Configuration -AML_DB__DRIVER="sqlite" -AML_DB__DATABASE="implementations/aml_investigation/data/aml_transactions.db" -AML_DB__QUERY__MODE="ro" - -# Report Generation Database Configuration -REPORT_GENERATION_DB__DRIVER="sqlite" -REPORT_GENERATION_DB__DATABASE="implementations/report_generation/data/OnlineRetail.db" -REPORT_GENERATION_DB__QUERY__MODE="ro" - -# Vertex AI Search (custom knowledge base) - no API key needed, uses ADC -# On Coder/GCE workspaces the attached service account handles auth automatically. -# Required IAM roles on the service account: roles/discoveryengine.viewer, roles/aiplatform.user -GOOGLE_CLOUD_LOCATION="us-central1" -VERTEX_AI_DATASTORE_ID="projects/{project}/locations/global/collections/default_collection/dataStores/{datastore-id}" - -# Report Generation (all optional, defaults are in implementations/report_generation/env_vars.py) -REPORT_GENERATION_OUTPUT_PATH="..." 
diff --git a/ABB-Manual-Assistant/README.md b/ABB-Manual-Assistant/README.md new file mode 100644 index 00000000..916d7e2b --- /dev/null +++ b/ABB-Manual-Assistant/README.md @@ -0,0 +1,3 @@ +# Linamar-Vector-Bootcamp + +## Applying agentic AI to robot troubleshooting. diff --git a/ABB-Manual-Assistant/Run_Eval_Info.txt b/ABB-Manual-Assistant/Run_Eval_Info.txt new file mode 100644 index 00000000..249a8aa4 --- /dev/null +++ b/ABB-Manual-Assistant/Run_Eval_Info.txt @@ -0,0 +1,5 @@ +Upload dataset command: +python3 upload_test_data_from_file.py --dataset-name "SME Test Set" --input-file SME-TestSet.csv + +Run evaluation command: +python3 run_eval_from_dataset.py --dataset-name "SME Test Set" --run-name "SME Eval Run" diff --git a/ABB-Manual-Assistant/SME-TestSet.csv b/ABB-Manual-Assistant/SME-TestSet.csv new file mode 100644 index 00000000..c3dec50d --- /dev/null +++ b/ABB-Manual-Assistant/SME-TestSet.csv @@ -0,0 +1,46 @@ +test_prompt,expected_response,safety_considerations,expected_sources,expected_trace,max_total_tokens,max_total_latency +What is Error code 10039 and possible solution?,"During startup, the system has found that data in the Serial Measurement Board (SMB) memory is not OK. All data must be OK before automatic operation is possible. Manual jogging is still possible. There are differences between data stored on the SMB and in the controller, possibly due to replacement of SMB, controller, or both. Solution: update the SMB data.",,,,, +How to fix SMB memory is not OK,Update the Serial Measurement Board (SMB) data.,,,,, +How to recover if axis computer has lost communication,"• Check cable between axis computer and Safety System is intact and connected correctly +• Verify power supply to Safety System +• Ensure no extreme electromagnetic interference near robot cabling",,,,, +What does error code 40038 mean?,LOCAL is illegal in routine variable declaration. Only program data declarations may use LOCAL. 
Remove the LOCAL attribute.,,,,, +What is a reference error,Depends on the specific reference error number. The system should prompt for more detail since there are many types.,,,,, +Why am I getting a programmed forced reduced error,Programmed tip force is too high for the tool. Requested motor torque exceeds limits and force is reduced to maximum allowable motor torque.,,,,, +SMB Data is missing. What should I do?,"If valid data exists in the cabinet, transfer it to SMB memory. If issue persists, check the communication cable to the SMB board. Replace SMB board if needed.",,,,, +We are getting a Motor phase short circuit. Where should we look?,"Possible short circuit in cables or connectors between phases or to ground, or inside the motor. Check or replace cables, connectors, and motor.",,,,, +Why am I getting a singularity problem,Depends on the exact error number. Typically relates to joint 4 or joint 6.,,,,, +Why am I getting a joint not synchronized error and how to fix it,Speed of the joint before power down or failure was too high. 
Perform a new update of the revolution counter.,,,,, +What is the payload for the IRB 140?,The payload for the IRB 140 is 6 kg.,,"Product Specification IRB 140, page 9",,150,200ms +What standards does the IRB 140 comply with?,"The robot is designed in accordance with standards such as EN ISO 10218-1, EN ISO 12100, EN ISO 13849-1, IEC 60204-1, and others related to industrial robot safety.","Safety compliance, Regulatory adherence",IRB 140 page 14-15,,300,300ms +What precautions should be taken before servicing the IRB 8700?,"Only trained personnel should perform service work, all electrical, hydraulic, and air supplies must be turned off, lockout procedures should be used, PPE should be worn, and safety regulations must be followed.","Trained personnel only, lockout/tagout, PPE, stored energy, live electrical hazards, unexpected robot motion",IRB 8700 page 20-22,Retrieve safety section -> summarize requirements,300,300ms +What is the operating temperature range for IRB 140?,"5 to 45 degrees while operating, up to 70 degrees short term range","Thermal Safety Limits, Risk of Malfunctioning when outside the optimal range",IRB 140 Manual Page 17,Orchestrator -> Search Agent -> Extract Environmental Conditions -> Units -> Output Response,150,200ms +How do you manually release the brakes on the IRB 8700?,"Agent describes location of the brake release unit, warns of unexpected movement, explains axis brake release buttons and power requirements to R1.MP connectors if controller is disconnected.","Robot may move unexpectedly, ensure nobody is near or beneath robot, correct connector wiring required","Product Manual IRB 8700: Emergency Release of Robot Arm pp. 31, Manual Brake Release pp. 
85-87",,, +What type of fire extinguisher should be used if the robot controller catches fire?,Use a carbon dioxide CO2 extinguisher.,"Electrical fire hazard, personal safety during emergency response.",Product Manual IRB 8700: Fire Extinguishing p.30,,, +"What happens if a Break instruction is executed during motion, and how does it differ from Stop, StopMove, and EXIT?","Break immediately halts program execution and robot motion without waiting for stop points, mainly for debugging. Stop gracefully stops program execution. StopMove halts only robot motion while program continues. EXIT terminates program execution completely.","Emergency Stop Behaviour, Controlled vs abrupt stopping, Avoid unsafe motion interruptions, correct debugging practices","RAPID page 34, 510, 515, 105",Orchestrator -> Search Agent -> Retrieve the needed instruction definitions -> Semantic Comparison -> Create Comparison Table -> Output Response,350,350ms +"My robot is jerking around too much when hadling parts, how can I fix this?","Use the RAPID instruction AccSet. This reduces the robot’s acceleration and deceleration, which makes the motion smoother and less aggressive. Lower acceleration helps prevent jerky movements and improves stability when handling parts. +Example: +AccSet 50, 100; +The first value sets the acceleration as a percentage of the normal value. Lower values will result in smoother motion.",Changes to motion profiles can impact collision avoidance and timing with other equipment,RAPID page 15,Orchestrator -> search agent -> retrieve relevant instruction -> summarize -> output response,300,300ms +The robot controller will not start. None of the LEDs appear to indicate normal behavior. What troubleshooting steps should I follow?,"The response should identify this as a start-up failure scenario. 
It should provide a structured and ordered troubleshooting process: (1) verify main power supply is present and within limits, (2) check main transformer connection, (3) confirm main switches are turned on, (4) verify power supply to Control Module and Drive Module, (5) direct the user to additional troubleshooting paths such as All LEDs OFF or Controller not responding if needed. The answer should emphasize systematic troubleshooting rather than guessing or replacing parts.",Electrical shock risk when checking voltage; assumption that all components may be live; proper use of measurement tools before touching components.,"Operating manual – Troubleshooting IRC5, Section 3.1 Start-up failures (pages 31–33)",Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,600,500ms +I suspect the robot brakes are not functioning correctly. Is it safe to stand near the robot while troubleshooting this issue?,The response must prioritize safety. It should explicitly state that this is a DANGER-level scenario. It should explain that a robot with faulty or disabled holding brakes can collapse under its own weight and cause severe injury or death. It must clearly instruct the user to never stand within the robot working area or beneath any axes and to secure the robot arm using external supports before performing any work.,Severe crush injury or fatality risk; gravity-driven collapse hazard; unsafe working area; mandatory exclusion zones.,"Operating manual – Safety section, Safe Trouble Shooting (page 16)",Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,400,400ms +The robot controller is completely unresponsive and cannot be operated using the FlexPendant. What are the possible causes?,"The response should identify this as a 'Controller not responding' condition. 
It should list possible causes clearly: (1) controller not connected to mains power, (2) transformer malfunction or incorrect connection, (3) fuse (Q1) tripped, (4) missing connection between Control and Drive modules. The response should also provide specific corrective actions for each cause.",Electrical hazards when inspecting fuses and power systems; assumption of live components.,Operating manual – Section 3.2 Controller not responding (page 33),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,500,500ms +We are experiencing intermittent faults that occur randomly with no clear pattern. What is the recommended troubleshooting approach?,"The response should identify this as an intermittent fault scenario. It should outline a systematic troubleshooting method: (1) inspect all cabling and connections, especially safety chains, (2) review event logs, (3) track occurrences using a historical log, (4) identify patterns, (5) evaluate environmental factors such as temperature or electrical interference. It should emphasize pattern detection and repeatability.",Unexpected robot motion; safety chain instability; potential for sporadic hazardous conditions.,Operating manual – Section 3.16 Intermittent errors (page 52),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,500,500ms +The FlexPendant is completely dead: no display and no input. What should I check?,"The response should identify a FlexPendant start failure. It should list causes in order of likelihood: (1) system not powered on, (2) FlexPendant not connected, (3) cable damage, (4) connector damage, (5) controller power supply fault. 
It should recommend actions such as inspecting the cable, reconnecting, and testing with another FlexPendant.",Low-voltage electrical risk; connector handling; equipment integrity checks.,Operating manual – Section 3.6 Problem starting FlexPendant (page 40),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,450,500ms +During robot operation we hear grinding or scraping noises coming from the joints. What could be causing this?,"The response should identify this as a mechanical noise issue. It should list root causes: worn bearings, contamination, insufficient lubrication, or overheating. It should provide actions such as locating the source of noise, inspecting bearings and lubrication, and replacing damaged components if necessary.",Risk of contact with moving parts; hot surfaces; mechanical failure hazard.,Operating manual – Section 3.13 Mechanical noise (pages 47–48),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,500,500ms +The robot controller is operational but running slowly and behaving inconsistently. What might be causing this?,"The response should identify low controller performance. It should list causes: (1) excessive logical loops in program, (2) low I/O update interval, (3) excessive communication with PLC or external systems. 
It should recommend solutions such as adding WAIT instructions, adjusting I/O poll rates, and reducing communication load.",Unpredictable robot behavior risk; potential for unintended motion.,Operating manual – Section 3.3 Low controller performance (pages 34–35),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,600,500ms +What safety precautions should always be followed when troubleshooting an industrial robot system?,"The response should summarize safety guidelines: (1) treat all electrical components as live, (2) expect unexpected robot movement, (3) recognize that safety circuits may be bypassed during troubleshooting, (4) remain outside hazardous zones where possible. The response should emphasize heightened awareness at all times.",Electrical shock hazard; unexpected motion; bypassed safety systems; general high-risk environment awareness.,Operating manual – Section 1.3 Safety during troubleshooting (page 13),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,400,400ms +There is no voltage available in the service outlet on the control module. What could be causing this?,"The response should identify this as a 'No voltage in service outlet' issue. It should list probable causes: (1) tripped circuit breaker (F5), (2) tripped earth fault protection (F4), (3) loss of mains power supply, (4) incorrectly connected transformer. It should also include corrective actions such as checking breakers, ensuring connected equipment is not overloading the outlet, and verifying transformer configuration.",Electrical hazard; potential overcurrent conditions; grounding faults.,Operating manual – Section 3.5 No voltage in service outlet (pages 38–39),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,500,500ms +The FlexPendant turns on but does not display anything and does not respond to inputs. What should I check?,"The response should identify this as 'Problem connecting FlexPendant to controller'. 
It should list causes such as Ethernet communication problems or main computer faults. It should include troubleshooting: checking cables, verifying connection, inspecting LEDs, and checking status signals on the main computer.",Electrical safety; system communication faults may result in unpredictable system state.,Operating manual – Section 3.7 Problem connecting FlexPendant (page 41),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,450,500ms +The robot cannot be jogged manually even though the system starts normally. What might be wrong?,"The response should identify 'Problem jogging the robot'. Causes include joystick malfunction or misconfiguration. Actions include switching controller to manual mode, checking FlexPendant connection, and resetting the FlexPendant.",Unexpected motion risks; incorrect mode settings.,Operating manual – Section 3.9 Problem jogging the robot (page 45),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,450,500ms +We tried to reflash firmware and the process failed. What should we look for?,"The response should identify a firmware reflashing failure. It should explain causes such as incompatibility between hardware and software versions. It should recommend checking event logs, verifying version compatibility of units and RobotWare, and consulting ABB support for compatible firmware.",System instability; risk of non-operational hardware during firmware updates.,Operating manual – Section 3.10 Reflashing firmware failure (page 46),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,500,500ms +The robot shows inconsistent path accuracy and occasionally makes noise during movement. What could be causing this?,"The response should identify inconsistent path accuracy issues. Causes include incorrect calibration, incorrectly defined TCP, damaged mechanical components, or worn bearings. 
Actions include recalibration, checking TCP, inspecting motors/bearings, and verifying brake performance.",Mechanical failure hazard; potential collision risk.,Operating manual – Section 3.11 Inconsistent path accuracy (page 47),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,550,500ms +There are oil or grease stains around the motors and gearboxes. Is this a serious issue?,"The response should explain that this indicates possible leaks. Causes include faulty seals, overfilled gearboxes, or overheating oil. It should explain risks such as brake failure causing collapse. Actions include inspecting seals, checking oil levels, and ensuring correct oil type.",Slip hazards; brake failure leading to collapse.,Operating manual – Section 3.12 Oil and grease stains (page 46),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,500,500ms +The robot collapses when powering down. What could be the issue?,"The response should identify 'Manipulator crashes on power down'. Causes include faulty brakes or faulty brake power supply. It should list actions: identify failing axis, check brake power supply, inspect motors for leaks, and replace defective components.",Severe injury risk; uncontrolled robot collapse.,Operating manual – Section 3.14 Manipulator crashes on power down (page 49–50),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,500,500ms +The robot brakes will not release when I try to operate the robot. What should I check?,"The response should identify 'Problem releasing robot brakes'. Causes include brake contactor issues, system not reaching Motors ON state, faulty brake, or missing 24V supply. 
It should recommend checking contactors, testing brakes, verifying 24V supply, and reviewing event logs.",Unexpected movement hazards during brake testing; electrical risks.,Operating manual – Section 3.15 Problem releasing robot brakes (page 50–51),Orchestrator -> Search Agent -> Search Tool -> Search Agent -> Orchestrator,550,500ms +"When installing the robot on a non-ideal foundation, how do flatness, tilt, and resonance frequency affect robot accuracy and performance, and what corrective actions are recommended?","Flatness (≤0.3 mm) ensures calibration accuracy; uneven surfaces degrade resolver accuracy. Tilt (≤5°) is allowed but can affect performance and may require recalibration. Resonance frequency (≥22 Hz) ensures system stability—lower values can amplify vibration (10–20 Hz region). Corrective actions include leveling with bolts or shims, recalibrating robot, and isolating from environmental vibrations.","Structural stability, Accuracy degradation risks, Vibration amplification hazards, Foundation-Induced failure risks",IRB 7600 Page 53-54,Orchestrator -> Search Agent -> Retrieve foundation + performance sections -> combine constraints -> implications -> synthesize -> output response,400,350ms +Prior to installing the IRB 7600 - what information should the technician know or read about,"The response should include safety information, competence requirements, liability limitations and understanding symbols/signals.",Follow safety symbols/signals and use trained personnel only otherwise risk of electrical/physical/crushing harm.,IRB 7600 Page 24-44,Orchestrator -> Search agent -> safety retrieval -> search agent returns signals/symbols/safety handling ,300,300ms +My robot isn’t moving in a straight line and is taking weird paths between points. How can I fix this?,"Use the MoveL instruction. 
This forces the robot to move in a straight line in Cartesian space instead of joint motion, resulting in predictable paths.","• Test in manual mode at low speed +• Verify clearances to fixtures +• Confirm no new collision risks from path change","RAPID, page 264",Orchestrator -> search agent -> retrieve relevant instruction -> summarize -> output response,300,300ms +What are the data types used for in RAPID based on the technical reference structure?,They define value formats and constraints for variables/parameters used by instructions and functions ,Unsafe motion logic must be avoided and validate behavior prior to production,RAPID 1083 -1238,Orchestrator -> Search agent -> Retrieve RAPID entry -> Syntax Validations -> Return constrained/summarized answer,220,500ms \ No newline at end of file diff --git a/ABB-Manual-Assistant/agent_utils/__init__.py b/ABB-Manual-Assistant/agent_utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ABB-Manual-Assistant/agent_utils/memory_store.py b/ABB-Manual-Assistant/agent_utils/memory_store.py new file mode 100644 index 00000000..a3f4c8ca --- /dev/null +++ b/ABB-Manual-Assistant/agent_utils/memory_store.py @@ -0,0 +1,67 @@ +import datetime +import os +import sqlite3 +import uuid + + +class MemoryStore: + def __init__(self, db_path="data/memory.db"): + os.makedirs(os.path.dirname(db_path), exist_ok=True) + self.conn = sqlite3.connect(db_path, check_same_thread=False) + self._init_db() + + def _init_db(self): + cur = self.conn.cursor() + cur.execute(""" + CREATE TABLE IF NOT EXISTS conversations ( + id TEXT PRIMARY KEY, + title TEXT, + created_at TEXT + ) + """) + cur.execute(""" + CREATE TABLE IF NOT EXISTS messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + conversation_id TEXT, + role TEXT, + content TEXT, + timestamp TEXT + ) + """) + self.conn.commit() + + def create_conversation(self, title=None): + cid = str(uuid.uuid4()) + if not title: + title = f"Chat {datetime.datetime.now().strftime('%Y-%m-%d 
%H:%M')}" + self.conn.execute( + "INSERT INTO conversations VALUES (?, ?, ?)", (cid, title, datetime.datetime.now().isoformat()) + ) + self.conn.commit() + return cid + + def list_conversations(self): + return self.conn.execute("SELECT id, title, created_at FROM conversations ORDER BY created_at DESC").fetchall() + + def get_history(self, conversation_id, limit=100): + rows = self.conn.execute( + "SELECT role, content FROM messages WHERE conversation_id=? ORDER BY id ASC LIMIT ?", + (conversation_id, limit), + ).fetchall() + return [{"role": r[0], "content": r[1]} for r in rows] + + def log_message(self, conversation_id, role, content): + self.conn.execute( + "INSERT INTO messages (conversation_id, role, content, timestamp) VALUES (?, ?, ?, ?)", + (conversation_id, role, content, datetime.datetime.now().isoformat()), + ) + self.conn.commit() + + def rename_conversation(self, conversation_id, new_title): + self.conn.execute("UPDATE conversations SET title=? WHERE id=?", (new_title, conversation_id)) + self.conn.commit() + + def delete_conversation(self, conversation_id): + self.conn.execute("DELETE FROM messages WHERE conversation_id=?", (conversation_id,)) + self.conn.execute("DELETE FROM conversations WHERE id=?", (conversation_id,)) + self.conn.commit() diff --git a/ABB-Manual-Assistant/configs.py b/ABB-Manual-Assistant/configs.py new file mode 100644 index 00000000..52ae2cc9 --- /dev/null +++ b/ABB-Manual-Assistant/configs.py @@ -0,0 +1,211 @@ +"""Configuration settings for agent evaluations. + +This module provides centralized configuration management using Pydantic settings, +supporting environment variables and .env file loading. 
+""" + +from typing import Any + +from pydantic import AliasChoices, BaseModel, Field, SecretStr, field_validator +from pydantic_settings import BaseSettings, SettingsConfigDict +from sqlalchemy.engine.url import URL + + +class DatabaseConfig(BaseModel): + """Database connection configuration.""" + + driver: str = Field( + ..., + description="SQLAlchemy dialect (e.g., 'sqlite', 'postgresql', 'mysql+pymysql').", + ) + username: str | None = Field( + default=None, + description="Database username. For SQLite or integrated authentication, this can be None.", + ) + host: str | None = Field(default=None, description="Database host address or file path for SQLite.") + password: SecretStr | None = Field( + default=None, + description="Database password. For SQLite or integrated authentication, this can be None.", + ) + port: int | None = Field(default=None, description="Database port number.") + database: str | None = Field(default=None, description="Database name or file path for SQLite.") + query: dict[str, Any] = Field( + default_factory=dict, + description="URL query parameters (e.g. {'mode': 'ro'} for read-only SQLite).", + ) + + def build_uri(self) -> str: + """Construct the SQLAlchemy connection URI safely using the official URL object. + + This handles special character escaping in passwords automatically. + + Returns + ------- + str + The full database connection URI. + """ + return URL.create( + drivername=self.driver, + username=self.username, + password=self.password.get_secret_value() if self.password else None, + host=self.host, + port=self.port, + database=self.database, + query=self.query, + ).render_as_string(hide_password=False) + + +class Configs(BaseSettings): + """Central configuration for all agent evaluations. + + This class automatically loads configuration values from environment variables + and a .env file. Service-specific fields are optional - agents validate + required fields at initialization. 
+ + Examples + -------- + >>> from aieng.agent_evals.configs import Configs + >>> config = Configs() + >>> print(config.default_worker_model) + 'gemini-2.5-flash' + """ + + model_config = SettingsConfigDict( + env_file=".env", + env_file_encoding="utf-8", + env_ignore_empty=True, + env_nested_delimiter="__", + extra="ignore", + ) + + aml_db: DatabaseConfig | None = Field( + default=None, + description="Anti-Money Laundering database configuration. Used by the Fraud Investigation Agent.", + ) + + report_generation_db: DatabaseConfig | None = Field( + default=None, + description="Database configuration for the the Report Generation Agent.", + ) + + # === Core LLM Settings === + openai_base_url: str = Field( + default="https://generativelanguage.googleapis.com/v1beta/openai/", + description="Base URL for OpenAI-compatible API (defaults to Gemini endpoint).", + ) + openai_api_key: SecretStr = Field( + validation_alias=AliasChoices("OPENAI_API_KEY", "GEMINI_API_KEY", "GOOGLE_API_KEY"), + description="API key for OpenAI-compatible API (accepts OPENAI_API_KEY, GEMINI_API_KEY, or GOOGLE_API_KEY).", + ) + google_api_key: SecretStr = Field( + validation_alias=AliasChoices("GEMINI_API_KEY", "GOOGLE_API_KEY"), + description="API key for Google/Gemini API (accepts GEMINI_API_KEY or GOOGLE_API_KEY).", + ) + anthropic_api_key: SecretStr | None = Field( + default=None, + validation_alias="ANTHROPIC_API_KEY", + description="API key for Anthropic API access when using LiteLLM-backed Claude models.", + ) + vector_inference_api_key: SecretStr | None = Field( + default=None, + validation_alias="VECTOR_INFERENCE_API_KEY", + description="API key for Vector's internal OpenAI-compatible inference endpoint.", + ) + default_planner_model: str = Field( + default="gemini-2.5-pro", + description="Model name for planning/complex reasoning tasks.", + ) + default_worker_model: str = Field( + default="gemini-2.5-flash", + description="Model name for worker/simple tasks.", + ) + 
default_evaluator_model: str = Field( + default="gemini-2.5-pro", + description="Model name for LLM-as-judge evaluation tasks.", + ) + default_temperature: float = Field( + default=1.0, + ge=0.0, + le=2.0, + description="Default temperature for LLM generation. Lower values (0.0-0.3) produce more consistent outputs.", + ) + default_evaluator_temperature: float = Field( + default=0.0, + ge=0.0, + le=2.0, + description="Temperature for LLM-as-judge evaluations. Default 0.0 for deterministic judging.", + ) + + # === Tracing (Langfuse) === + langfuse_public_key: str | None = Field( + default=None, + pattern=r"^pk-lf-.*$", + description="Langfuse public key for tracing (must start with 'pk-lf-').", + ) + langfuse_secret_key: SecretStr | None = Field( + default=None, + description="Langfuse secret key for tracing (must start with 'sk-lf-').", + ) + langfuse_host: str = Field( + default="https://us.cloud.langfuse.com", + validation_alias="LANGFUSE_HOST", + description="Langfuse base URL.", + ) + + # === Embedding Service === + embedding_base_url: str | None = Field(default=None, description="Base URL for embedding API service.") + embedding_api_key: SecretStr | None = Field(default=None, description="API key for embedding service.") + embedding_model_name: str = Field(default="@cf/baai/bge-m3", description="Name of the embedding model.") + + # === E2B Code Interpreter === + e2b_api_key: SecretStr | None = Field( + default=None, + description="E2B.dev API key for code interpreter (must start with 'e2b_').", + ) + default_code_interpreter_template: str | None = Field( + default="9p6favrrqijhasgkq1tv", + description="Default template name or ID for E2B.dev code interpreter.", + ) + + # === Web Search === + web_search_base_url: str | None = Field(default=None, description="Base URL for web search service.") + web_search_api_key: SecretStr | None = Field(default=None, description="API key for web search service.") + + # === Vertex AI Search (custom knowledge base) === + 
class ConversationManagerAgent:
    """
    Thin facade over ``MemoryStore`` that gives other agents/tools a stable
    interface.

    Responsibilities:
    - make sure a conversation exists
    - append user/assistant turns
    - read history
    - rename/delete/list conversations
    - keep streamed assistant partials in RAM and persist only on finalize
    """

    def __init__(self, store: Optional["MemoryStore"] = None):
        """Wrap *store*, or create a default ``MemoryStore`` when none is given."""
        # Compare against None explicitly: ``store or MemoryStore()`` would
        # silently replace an injected store that happens to be falsy
        # (for example one that defines ``__len__`` and is currently empty).
        self.store = store if store is not None else MemoryStore()
        # In-RAM buffer holding the latest assistant partial per conversation.
        self._partials: Dict[str, str] = {}

    # ---- conversation mgmt ----
    def ensure_conversation(self, conversation_id: Optional[str]) -> str:
        """Return a usable conversation id.

        A new conversation is created when the store has none. Otherwise the
        given id is returned as-is (it is not checked against the store), and
        with no id the first row of ``list_conversations()`` is used.
        """
        rows = self.store.list_conversations()
        if not rows:
            return self.store.create_conversation()
        if conversation_id:
            return conversation_id
        # default to newest (list_conversations should return DESC by created_at)
        return rows[0][0]

    def list_conversations(self) -> List[Dict[str, Any]]:
        """Return every conversation as a dict with id, title and created_at."""
        return [
            {"id": row[0], "title": row[1], "created_at": row[2]}
            for row in self.store.list_conversations()
        ]

    def rename(self, conversation_id: str, title: str) -> Dict[str, Any]:
        """Rename a conversation and echo the new title back."""
        self.store.rename_conversation(conversation_id, title)
        return {"ok": True, "conversation_id": conversation_id, "title": title}

    def delete(self, conversation_id: str) -> Dict[str, Any]:
        """Delete a conversation and drop any buffered partial for it."""
        self.store.delete_conversation(conversation_id)
        self._partials.pop(conversation_id, None)
        return {"ok": True}

    def create(self, title: Optional[str] = None) -> Dict[str, Any]:
        """Create a conversation and return its id."""
        cid = self.store.create_conversation(title)
        return {"ok": True, "conversation_id": cid}

    # ---- messages ----
    def save_user(self, conversation_id: str, content: str) -> Dict[str, Any]:
        """Persist a user turn, creating a conversation first if needed."""
        cid = self.ensure_conversation(conversation_id)
        self.store.log_message(cid, "user", content)
        return {"ok": True, "conversation_id": cid}

    def set_assistant_partial(self, conversation_id: str, partial_text: str) -> Dict[str, Any]:
        """Replace the buffered assistant text for a conversation (RAM only)."""
        cid = self.ensure_conversation(conversation_id)
        self._partials[cid] = partial_text
        return {"ok": True, "conversation_id": cid}

    def finalize_assistant(self, conversation_id: str) -> Dict[str, Any]:
        """Persist the buffered assistant text once and clear the buffer.

        When nothing was buffered, an empty assistant message is logged.
        """
        cid = self.ensure_conversation(conversation_id)
        final_text = self._partials.pop(cid, "")
        # Only persist once at the end of the stream
        self.store.log_message(cid, "assistant", final_text)
        return {"ok": True, "conversation_id": cid, "content_len": len(final_text)}

    def get_history_messages(self, conversation_id: str, limit: int = 1000) -> List[Dict[str, Any]]:
        """Return up to *limit* history entries as provided by the store."""
        cid = self.ensure_conversation(conversation_id)
        return self.store.get_history(cid, limit=limit)
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py.cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# UV +# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +#uv.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock +#poetry.toml + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python. 
+# https://pdm-project.org/en/latest/usage/project/#working-with-version-control +#pdm.lock +#pdm.toml +.pdm-python +.pdm-build/ + +# pixi +# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control. +#pixi.lock +# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one +# in the .venv directory. It is recommended not to include this directory in version control. +.pixi + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.envrc +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Max Added +.venv_temp +scrape_with_LLM/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +# Abstra +# Abstra is an AI-powered process automation framework. +# Ignore directories containing user credentials, local state, and settings. +# Learn more at https://abstra.io/docs +.abstra/ + +# Visual Studio Code +# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore +# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore +# and can be added to the global gitignore or merged into this file. 
However, if you prefer, +# you could uncomment the following to ignore the entire vscode folder +# .vscode/ + +# Ruff stuff: +.ruff_cache/ + +# PyPI configuration file +.pypirc + +# Cursor +# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to +# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data +# refer to https://docs.cursor.com/context/ignore-files +.cursorignore +.cursorindexingignore + +# Marimo +marimo/_static/ +marimo/_lsp/ +__marimo__/ diff --git a/ABB-Manual-Assistant/orchestrator_agent.py b/ABB-Manual-Assistant/orchestrator_agent.py new file mode 100644 index 00000000..c0062b14 --- /dev/null +++ b/ABB-Manual-Assistant/orchestrator_agent.py @@ -0,0 +1,62 @@ +import os + +import agents +from dotenv import load_dotenv +from openai import AsyncOpenAI +from openai.types.responses import ResponseTextDeltaEvent +from search_agent import SearchAgent +from workorder_agent import WorkorderAgent + + +load_dotenv() + + +class Orchestrator: + def __init__(self): + self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL")) + + search_agent_instance = SearchAgent() + self.search_agent_tool = search_agent_instance.search_agent.as_tool( + tool_name="search_knowledge_base", + tool_description="Agent searches ABB robot manuals for repair, troubleshooting, maintenance instructions, etc.", + ) + + workorder_agent_instance = WorkorderAgent() + self.workorder_agent_tool = workorder_agent_instance.workorder_agent.as_tool( + tool_name="workorder_agent", + tool_description="Given a conversation between the user and the orchestrator agent, the workorder agent will create a workorder.", + ) + + self.main_agent = agents.Agent( + name="Orchestrator Agent", + instructions=""" + You are a helpful assistant and organizer. + If the search agent doesn't find anything, use your own knowledge. 
+ Always present the search agent's findings at the bottom of your output inside a collapsible section. + + If the user asks you to create a workorder, then call the workorder_agent. + """, + model=agents.OpenAIChatCompletionsModel(model="gemini-2.5-pro", openai_client=self.client), + model_settings=agents.ModelSettings(tool_choice="required", temperature=0.5), + tools=[self.search_agent_tool, self.workorder_agent_tool], + ) + + async def run(self, prompt: str, history) -> str: + context = "" + + # reconstruct conversation history into a string + if history: + for i in range(0, len(history), 2): + if i + 1 < len(history): + user_msg = history[i]["content"] + bot_msg = history[i + 1]["content"] + context += f"User: {user_msg}\nAssistant: {bot_msg}\n" + + # combine history with the new prompt + full_prompt = f"{context} User: {prompt}" + + result_stream = agents.Runner.run_streamed(self.main_agent, input=full_prompt) + async for event in result_stream.stream_events(): + if event.type == "raw_response_event" and isinstance(event.data, ResponseTextDeltaEvent): + print(event.data.delta, end="", flush=True) + yield event.data.delta diff --git a/ABB-Manual-Assistant/req_temp.txt b/ABB-Manual-Assistant/req_temp.txt new file mode 100644 index 00000000..a02b90f3 --- /dev/null +++ b/ABB-Manual-Assistant/req_temp.txt @@ -0,0 +1,47 @@ +annotated-types==0.7.0 +anyio==4.10.0 +attrs==25.3.0 +Authlib==1.6.1 +certifi==2025.8.3 +cffi==1.17.1 +charset-normalizer==3.4.2 +click==8.2.1 +colorama==0.4.6 +cryptography==45.0.5 +deprecation==2.1.0 +distro==1.9.0 +griffe==1.9.0 +grpcio==1.74.0 +grpcio-health-checking==1.74.0 +h11==0.16.0 +httpcore==1.0.9 +httpx==0.28.1 +httpx-sse==0.4.1 +idna==3.10 +jiter==0.10.0 +jsonschema==4.25.0 +jsonschema-specifications==2025.4.1 +mcp==1.12.3 +openai==1.99.0 +openai-agents==0.2.4 +packaging==25.0 +protobuf==6.31.1 +pycparser==2.22 +pydantic==2.11.7 +pydantic-settings==2.10.1 +pydantic_core==2.33.2 +python-dotenv==1.1.1 +python-multipart==0.0.20 
+referencing==0.36.2 +requests==2.32.4 +rpds-py==0.26.0 +sniffio==1.3.1 +sse-starlette==3.0.2 +starlette==0.47.2 +tqdm==4.67.1 +types-requests==2.32.4.20250611 +typing-inspection==0.4.1 +typing_extensions==4.14.1 +urllib3==2.5.0 +uvicorn==0.35.0 +validators==0.35.0 diff --git a/ABB-Manual-Assistant/run_eval.py b/ABB-Manual-Assistant/run_eval.py new file mode 100644 index 00000000..acfa6bf4 --- /dev/null +++ b/ABB-Manual-Assistant/run_eval.py @@ -0,0 +1,581 @@ +""" +Run the ABB Manual Assistant against a Langfuse dataset and evaluate its +responses using an LLM judge. + +A score is uploaded only when: +1. The ABB agent returns a non-empty answer without an error. +2. The evaluator returns a valid evaluation result. +3. A Langfuse trace ID is available. + +Usage: + python run_eval.py \ + --langfuse_dataset_name "LLM_Judge_Errors" \ + --run_name "ABB_Eval_Run_01" \ + --limit 10 +""" + +import argparse +import asyncio +import inspect +import os +import traceback +from typing import Any + +from agents import Agent, OpenAIChatCompletionsModel, Runner +from dotenv import load_dotenv +from langfuse import get_client +from langfuse._client.datasets import DatasetItemClient +from openai import AsyncOpenAI +from orchestrator_agent import Orchestrator +from pydantic import BaseModel +from rich.progress import track +from openinference.instrumentation.openai_agents import OpenAIAgentsInstrumentor +from utils.langfuse.shared_client import flush_langfuse, langfuse_client + + + +# Replaced the old langfuse setup +load_dotenv() + +OpenAIAgentsInstrumentor().instrument() + +langfuse = get_client() + +openai_api_key = os.getenv("OPENAI_API_KEY") +openai_base_url = os.getenv("OPENAI_BASE_URL") + +if not openai_api_key: + raise RuntimeError( + "OPENAI_API_KEY is not set. Check your .env file or shell environment." 
+ ) + +openai_client_kwargs: dict[str, Any] = { + "api_key": openai_api_key, +} + +if openai_base_url: + openai_client_kwargs["base_url"] = openai_base_url + +async_openai_client = AsyncOpenAI(**openai_client_kwargs) + + + +EVALUATOR_INSTRUCTIONS = """ +Evaluate whether the Proposed Answer matches the Ground Truth for the Question. + +Judge factual correctness, relevance, completeness, and safety where applicable. +Return a concise explanation and a boolean correctness result. +""".strip() + +EVALUATOR_TEMPLATE = """\ +# Question +{question} + +# Ground Truth +{ground_truth} + +# Proposed Answer +{proposed_response} +""" + + +class LangFuseTracedResponse(BaseModel): + answer: str | None = None + trace_id: str | None = None + error: str | None = None + + +class EvaluatorQuery(BaseModel): + question: str + ground_truth: str + proposed_response: str + + def get_query(self) -> str: + return EVALUATOR_TEMPLATE.format(**self.model_dump()) + + +class EvaluatorResponse(BaseModel): + explanation: str + is_answer_correct: bool + + + +ANSWER_FIELD_NAMES = ( + "final_output", + "output", + "answer", + "content", + "text", + "response", +) + + +def compact_repr(value: Any, max_length: int = 1200) -> str: + """Return a bounded repr for readable debug logs.""" + value_repr = repr(value) + + if len(value_repr) <= max_length: + return value_repr + + return f"{value_repr[:max_length]}... " + + +def clean_text(value: Any) -> str | None: + """ + Return text exactly as emitted by a stream chunk. + + Do not strip individual chunks: leading spaces and newlines are meaningful + in token streaming and must survive until the final assembled answer. + """ + if not isinstance(value, str): + return None + + return value if value.strip() else None + + +def extract_agent_answer(value: Any, depth: int = 0) -> str | None: + """ + Attempt to extract actual response text from common agent result shapes. 
+ + This supports: + - Plain strings + - Dicts with output / answer / content fields + - Objects with final_output / output / answer attributes + - OpenAI-style chunk objects with choices[0].delta.content + - Tuple yields such as (answer_chunk, history) + """ + if value is None or depth > 5: + return None + + direct_text = clean_text(value) + + if direct_text is not None: + return direct_text + + # Some streaming generators yield (answer_or_chunk, history). + # Only inspect the first tuple element. Do not inspect arbitrary history. + if isinstance(value, tuple): + if not value: + return None + + return extract_agent_answer(value[0], depth + 1) + + if isinstance(value, dict): + for field_name in ANSWER_FIELD_NAMES: + if field_name not in value: + continue + + extracted = extract_agent_answer(value[field_name], depth + 1) + + if extracted is not None: + return extracted + + choices = value.get("choices") + + if isinstance(choices, (list, tuple)) and choices: + return extract_agent_answer(choices[0], depth + 1) + + delta = value.get("delta") + + if delta is not None: + return extract_agent_answer(delta, depth + 1) + + message = value.get("message") + + if message is not None: + return extract_agent_answer(message, depth + 1) + + return None + + for field_name in ANSWER_FIELD_NAMES: + attribute_value = getattr(value, field_name, None) + + if attribute_value is None: + continue + + extracted = extract_agent_answer(attribute_value, depth + 1) + + if extracted is not None: + return extracted + + choices = getattr(value, "choices", None) + + if isinstance(choices, (list, tuple)) and choices: + extracted = extract_agent_answer(choices[0], depth + 1) + + if extracted is not None: + return extracted + + delta = getattr(value, "delta", None) + + if delta is not None: + extracted = extract_agent_answer(delta, depth + 1) + + if extracted is not None: + return extracted + + message = getattr(value, "message", None) + + if message is not None: + extracted = 
extract_agent_answer(message, depth + 1) + + if extracted is not None: + return extracted + + return None + + +def merge_stream_text(current_text: str, incoming_text: str) -> str: + """ + Combine stream chunks without removing meaningful whitespace. + + Supports normal delta chunks and protects against a duplicate/final + cumulative snapshot being yielded by some streaming implementations. + """ + if not current_text: + return incoming_text + + if incoming_text == current_text: + return current_text + + if incoming_text.startswith(current_text): + return incoming_text + + if current_text.startswith(incoming_text): + return current_text + + return current_text + incoming_text + +async def consume_orchestrator_stream(stream: Any) -> str: + """ + Consume an async generator returned by Orchestrator.run() and build its + final response text from yielded chunks or final response objects. + """ + accumulated_answer = "" + event_count = 0 + first_event_repr: str | None = None + last_event_repr: str | None = None + + async for event in stream: + event_count += 1 + + event_repr = compact_repr(event) + + if first_event_repr is None: + first_event_repr = event_repr + + last_event_repr = event_repr + + + event_text = extract_agent_answer(event) + + if event_text is not None: + accumulated_answer = merge_stream_text( + accumulated_answer, + event_text, + ) + + if event_count == 0: + raise RuntimeError( + "Orchestrator.run() completed without yielding any stream events." + ) + + accumulated_answer = accumulated_answer.strip() + + if not accumulated_answer: + raise RuntimeError( + "Orchestrator.run() yielded stream events but no answer text could " + "be extracted. " + f"First event: {first_event_repr}. " + f"Last event: {last_event_repr}." 
async def run_agent_with_trace(
    orchestrator: Orchestrator,
    query: str,
) -> LangFuseTracedResponse:
    """
    Send a single dataset question to the ABB orchestrator and capture the outcome.

    Every call passes history=[] so that dataset rows are evaluated
    independently of each other. The orchestrator may hand back an async
    iterator, an awaitable, or a plain value; all three are handled. Any
    failure is logged and returned in the ``error`` field instead of being
    raised (cancellation is still propagated).
    """

    def _extract_or_raise(result: Any, log_label: str, error_label: str) -> str:
        # Shared tail of the two non-streaming paths: log what came back,
        # then pull the answer text out of it or fail loudly.
        print(
            f"Raw {log_label}orchestrator result type: "
            f"{type(result).__name__}",
            flush=True,
        )
        print(
            f"Raw {log_label}orchestrator result repr: "
            f"{compact_repr(result)}",
            flush=True,
        )

        text = extract_agent_answer(result)

        if text is None:
            raise RuntimeError(
                f"Could not extract answer text from the {error_label} "
                "Orchestrator.run() result. "
                f"Result: {compact_repr(result)}"
            )

        return text

    try:
        outcome = orchestrator.run(query, history=[])

        if hasattr(outcome, "__aiter__"):
            answer = await consume_orchestrator_stream(outcome)
        elif inspect.isawaitable(outcome):
            answer = _extract_or_raise(await outcome, "", "non-streaming")
        else:
            answer = _extract_or_raise(outcome, "synchronous ", "synchronous")

        return LangFuseTracedResponse(
            answer=answer,
            trace_id=langfuse_client.get_current_trace_id(),
        )

    except asyncio.CancelledError:
        raise

    except Exception as exc:
        failure = f"{type(exc).__name__}: {exc}"

        print("\nAGENT EXECUTION FAILED", flush=True)
        print(f"Query: {query}", flush=True)
        print(f"Error: {failure}", flush=True)
        print(traceback.format_exc(), flush=True)

        return LangFuseTracedResponse(
            answer=None,
            trace_id=langfuse_client.get_current_trace_id(),
            error=failure,
        )
def parse_args() -> argparse.Namespace:
    """Parse the command line: dataset name, run name and an optional item limit."""
    cli = argparse.ArgumentParser(
        description="Run ABB Manual Assistant evaluations from a Langfuse dataset."
    )

    # One (flag, add_argument keyword arguments) entry per option.
    option_specs = (
        (
            "--langfuse_dataset_name",
            {"required": True, "help": "Name of the Langfuse dataset to evaluate."},
        ),
        (
            "--run_name",
            {"required": True, "help": "Label for this evaluation run."},
        ),
        (
            "--limit",
            {
                "type": int,
                "default": None,
                "help": "Optional maximum number of dataset items to evaluate.",
            },
        ),
    )

    for flag, options in option_specs:
        cli.add_argument(flag, **options)

    return cli.parse_args()
def main() -> None:
    """Launch run_eval.py (located next to this script) as a child process.

    The wrapper's ``--dataset-name`` / ``--run-name`` / ``--limit`` options
    are translated to the underscore-style flags that run_eval.py expects.

    Raises:
        FileNotFoundError: run_eval.py is not next to this script.
        subprocess.CalledProcessError: the evaluation exited with a non-zero
            status (a short message is printed to stderr first).
    """
    import shlex  # local import: only needed to render the command for the log

    args = parse_args()
    base_dir = Path(__file__).resolve().parent
    run_eval_path = base_dir / "run_eval.py"

    if not run_eval_path.is_file():
        raise FileNotFoundError(
            f"Could not find run_eval.py at: {run_eval_path}"
        )

    cmd = [
        sys.executable,
        "-u",  # Unbuffered stdout/stderr so logs appear immediately.
        str(run_eval_path),
        "--langfuse_dataset_name",
        args.dataset_name,
        "--run_name",
        args.run_name,
    ]

    if args.limit is not None:
        cmd.extend(["--limit", str(args.limit)])

    # shlex.join quotes arguments containing spaces (e.g. "SME Test Set"),
    # so the logged line shows the real argument boundaries and can be
    # pasted back into a shell. The list form is still what gets executed.
    print("Running:", shlex.join(cmd), flush=True)

    try:
        subprocess.run(
            cmd,
            cwd=base_dir,
            check=True,
        )
    except subprocess.CalledProcessError as exc:
        print(
            f"\nEvaluation run failed with exit code {exc.returncode}.",
            file=sys.stderr,
            flush=True,
        )
        raise
+- Do not guess or fill in missing technical details. +- Do not include unrelated or generic text. +- If no relevant information is found, return an empty list: []. +- If the search tool fails, clearly state that retrieval failed. Do not invent + an ABB-specific answer. + +Output Format: +[ + { + "source": "", + "url": "", + "excerpt": "", + "confidence": + } +] +""".strip(), + model=agents.OpenAIChatCompletionsModel( + model="gemini-2.5-flash", + openai_client=self.client, + ), + model_settings=agents.ModelSettings( + tool_choice="required", + temperature=0, + ), + tools=[self.knowledge_tool], + ) + + @staticmethod + def _compact_repr(value: Any, max_length: int = 4000) -> str: + value_repr = repr(value) + + if len(value_repr) <= max_length: + return value_repr + + return f"{value_repr[:max_length]}... " + + @staticmethod + async def _resolve_knowledge_result(value: Any) -> Any: + """ + Normalize whatever VertexSearchTool.get_knowledge() returns. + + It may return: + - a regular result value, + - an awaitable/coroutine, + - or an async generator. + """ + if inspect.isasyncgen(value) or hasattr(value, "__aiter__"): + return [item async for item in value] + + if inspect.isawaitable(value): + return await value + + return value + + @staticmethod + async def search_knowledgebase(query: str) -> Any: + """ + Tool function registered with the Search Agent. + + This method must remain inside SearchAgent. The prior AttributeError + means it was not present on the class at runtime. 
+ """ + query = query.strip() + + if not query: + print("[SEARCH] Empty query received; returning no results.", flush=True) + return [] + + try: + print(f"\n[SEARCH] Query: {query!r}", flush=True) + + vertex_tool = VertexSearchTool() + + raw_result = vertex_tool.get_knowledge(query) + + print( + f"[SEARCH] Backend return type: {type(raw_result).__name__}", + flush=True, + ) + + resolved_result = await SearchAgent._resolve_knowledge_result( + raw_result + ) + + print( + f"[SEARCH] Resolved result type: " + f"{type(resolved_result).__name__}", + flush=True, + ) + print( + f"[SEARCH] Resolved result: " + f"{SearchAgent._compact_repr(resolved_result)}", + flush=True, + ) + + if resolved_result is None: + print("[SEARCH] Backend returned None; returning no results.", flush=True) + return [] + + return resolved_result + + except Exception as exc: + print("\n[SEARCH] VERTEX RETRIEVAL FAILED", flush=True) + print(f"[SEARCH] Query: {query!r}", flush=True) + print(f"[SEARCH] Error: {type(exc).__name__}: {exc}", flush=True) + print(traceback.format_exc(), flush=True) + + logger.exception( + "ABB manual retrieval failed. query=%r", + query, + ) + + # Re-raise so the agent/tool layer receives a real failure rather + # than silently converting it into an empty search result. + raise + + async def run(self, prompt: str) -> Any: + """ + Preserve the original return type for compatibility with Orchestrator. + + Do not return response.final_output here until we inspect how + orchestrator_agent.py consumes SearchAgent.run(). 
+ """ + response = await agents.Runner.run( + self.search_agent, + input=prompt, + ) + + return response \ No newline at end of file diff --git a/ABB-Manual-Assistant/search_tool.py b/ABB-Manual-Assistant/search_tool.py new file mode 100644 index 00000000..550d6210 --- /dev/null +++ b/ABB-Manual-Assistant/search_tool.py @@ -0,0 +1,57 @@ +import json +import os + +from dotenv import load_dotenv + +from utils.tools.vertex_search import vertex_search + + +load_dotenv() + + +class VertexSearchTool: + """Compatibility wrapper for Vertex AI Search. + + This implementation delegates to the Vertex AI Search helper and returns + results in a JSON format compatible with the previous consumer code. + """ + + def __init__(self, data_name: str | None = None): + self.data_name = data_name or os.getenv("COLLECTION_NAME") + + async def create_client(self): + return None + + async def ensure_connected(self): + return None + + async def get_knowledge(self, query: str) -> str: + try: + result = await vertex_search(query) + if result.get("status") != "success": + return f"Search error: {result.get('error', 'unknown')}" + + summary = result.get("summary", "") + sources = result.get("sources", []) + + if not sources: + return "No results found." + + formatted_results = [] + for src in sources: + formatted_results.append( + { + "Document Name": src.get("title", ""), + "URL": src.get("uri", ""), + "Page Number": "", + "Full Text": summary, + } + ) + + return json.dumps(formatted_results, indent=2, sort_keys=True) + + except Exception as e: + return f"Search error: {e}" + + async def close(self): + return None diff --git a/ABB-Manual-Assistant/test_scripts/ghazaleh_vertex_test.py b/ABB-Manual-Assistant/test_scripts/ghazaleh_vertex_test.py new file mode 100644 index 00000000..2c7df329 --- /dev/null +++ b/ABB-Manual-Assistant/test_scripts/ghazaleh_vertex_test.py @@ -0,0 +1,233 @@ +"""Integration tests for querying a real Vertex AI Search vector store. 
+ +These tests exercise the `vertex_search` helper against the datastore configured +by `VERTEX_AI_DATASTORE_ID`. If the variable is not set, a known default +datastore ID is used. +""" + +import asyncio +import base64 +import os +from pprint import pprint +from typing import Any + +import google.auth +import google.auth.transport.requests +import pytest +from aieng.agent_evals.tools.vertex_search import vertex_search +from dotenv import load_dotenv + + +DEFAULT_VERTEX_DATASTORE_ID = ( + "projects/agentic-ai-evaluation-bootcamp/locations/global/collections/default_collection/" + "dataStores/linamar-vector-bootcamp" +) + +KNOWN_GOOD_ABB_SUMMARY_QUERY = "What is ArcC used for in ABB arc welding manuals?" +KNOWN_GOOD_ABB_CITATION_QUERY = "What is ArcC used for in ABB arc welding manuals? Cite sources." + + +def configure_vertex_datastore() -> None: + """Ensure the datastore ID is available for integration tests and direct runs.""" + load_dotenv(verbose=False) + os.environ.setdefault("VERTEX_AI_DATASTORE_ID", DEFAULT_VERTEX_DATASTORE_ID) + os.environ.setdefault("GOOGLE_CLOUD_LOCATION", "us-central1") + + +@pytest.fixture(autouse=True) +def configure_vertex_datastore_fixture() -> None: + """Pytest fixture wrapper around the shared datastore setup.""" + configure_vertex_datastore() + + +async def _print_vertex_search_result(query: str = KNOWN_GOOD_ABB_SUMMARY_QUERY) -> None: + """Run a sample Vertex Search query and print the structured result.""" + configure_vertex_datastore() + result = await vertex_search(query) + print("Query:", query) + print("Status:", result.get("status")) + print("Summary:") + print((result.get("summary") or "").strip()) + print("Source count:", result.get("source_count")) + print("Sources:") + pprint(result.get("sources", [])) + print("Raw result:") + pprint(result) + + +if __name__ == "__main__": + asyncio.run(_print_vertex_search_result()) + + +def _get_datastore_document_count(datastore_resource: str) -> int: + """Return the indexed document 
count from Discovery Engine documents list API.""" + credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"]) + session = google.auth.transport.requests.AuthorizedSession(credentials) + base_url = f"https://discoveryengine.googleapis.com/v1/{datastore_resource}/branches/default_branch/documents" + + count = 0 + next_token = "" + while True: + params = {"pageSize": 100} + if next_token: + params["pageToken"] = next_token + + response = session.get(base_url, params=params, timeout=30) + response.raise_for_status() + payload = response.json() + documents = payload.get("documents", []) + count += len(documents) + + next_token = payload.get("nextPageToken", "") + if not next_token: + break + + return count + + +def _list_datastore_documents(datastore_resource: str, max_docs: int = 10) -> list[dict[str, Any]]: + """List up to ``max_docs`` documents from the configured data store.""" + credentials, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"]) + session = google.auth.transport.requests.AuthorizedSession(credentials) + base_url = f"https://discoveryengine.googleapis.com/v1/{datastore_resource}/branches/default_branch/documents" + + documents: list[dict[str, Any]] = [] + next_token = "" + while len(documents) < max_docs: + params = {"pageSize": min(100, max_docs - len(documents))} + if next_token: + params["pageToken"] = next_token + + response = session.get(base_url, params=params, timeout=30) + response.raise_for_status() + payload = response.json() + documents.extend(payload.get("documents", [])) + + next_token = payload.get("nextPageToken", "") + if not next_token: + break + + return documents[:max_docs] + + +def _extract_text_preview(document: dict[str, Any], max_chars: int = 200) -> str: + """Extract a human-readable content preview from a Discovery Engine document.""" + content = document.get("content", {}) or {} + + raw_bytes = content.get("rawBytes") + if isinstance(raw_bytes, str) and 
raw_bytes: + try: + decoded = base64.b64decode(raw_bytes).decode("utf-8", errors="ignore").strip() + if decoded: + return decoded[:max_chars] + except Exception: + pass + + uri = content.get("uri") + if isinstance(uri, str) and uri.strip(): + return uri[:max_chars] + + struct_data = document.get("structData") + if isinstance(struct_data, dict): + for key in ("text", "content", "body", "chunk_text", "chunk"): + value = struct_data.get(key) + if isinstance(value, str) and value.strip(): + return value.strip()[:max_chars] + + title = document.get("title") + if isinstance(title, str) and title.strip(): + return title[:max_chars] + + return "" + + +@pytest.mark.integration_test +def test_vertex_datastore_has_indexed_documents() -> None: + """Ensure the configured data store contains at least one indexed document.""" + datastore = os.environ["VERTEX_AI_DATASTORE_ID"] + doc_count = _get_datastore_document_count(datastore) + assert doc_count > 0, f"No indexed documents found in datastore: {datastore}" + + +@pytest.mark.integration_test +def test_vertex_datastore_stats_and_sample_documents() -> None: + """Print and validate basic datastore stats with sample document identities.""" + datastore = os.environ["VERTEX_AI_DATASTORE_ID"] + doc_count = _get_datastore_document_count(datastore) + docs = _list_datastore_documents(datastore, max_docs=5) + + print(f"Datastore: {datastore}") + print(f"Document count: {doc_count}") + print("Sample document names:") + for doc in docs: + print(f"- {doc.get('name', '')}") + + assert doc_count > 0, "Expected at least one document in datastore" + assert docs, "Expected at least one sample document to be returned" + + +@pytest.mark.integration_test +def test_vertex_datastore_chunk_previews() -> None: + """Read and print content previews from stored documents/chunks.""" + datastore = os.environ["VERTEX_AI_DATASTORE_ID"] + docs = _list_datastore_documents(datastore, max_docs=10) + previews: list[str] = [] + + for doc in docs: + preview = 
_extract_text_preview(doc) + if preview: + previews.append(preview) + + print(f"Found {len(previews)} previews from {len(docs)} sampled documents") + for idx, preview in enumerate(previews[:5], start=1): + print(f"Preview {idx}: {preview}") + + assert docs, "No documents returned from datastore" + assert previews, "Could not extract any text/URI preview from sampled datastore documents" + + +@pytest.mark.integration_test +@pytest.mark.asyncio +async def test_vertex_search_returns_success_and_schema() -> None: + """Query ABB content and validate the response shape.""" + result = await vertex_search(KNOWN_GOOD_ABB_SUMMARY_QUERY) + + print("Search query:", KNOWN_GOOD_ABB_SUMMARY_QUERY) + print("Search status:", result.get("status")) + print("Search summary:") + print((result.get("summary") or "").strip()) + print("Search source_count:", result.get("source_count")) + print("Search sources:") + for idx, source in enumerate(result.get("sources", []), start=1): + print(f"- Source {idx}: title={source.get('title', '')} | uri={source.get('uri', '')}") + + assert result["status"] == "success", f"Vertex search failed: {result.get('error')}" + assert isinstance(result.get("summary"), str) + assert result["summary"].strip(), "Expected a non-empty summary" + assert isinstance(result.get("sources"), list) + assert isinstance(result.get("source_count"), int) + assert result["source_count"] == len(result["sources"]) + + +@pytest.mark.integration_test +@pytest.mark.asyncio +async def test_vertex_search_returns_uri_citations() -> None: + """Ensure a known ABB query returns citations in Vertex format.""" + result = await vertex_search(KNOWN_GOOD_ABB_CITATION_QUERY) + + print("Citation query:", KNOWN_GOOD_ABB_CITATION_QUERY) + print("Citation status:", result.get("status")) + print("Citation summary:") + print((result.get("summary") or "").strip()) + print("Citation source_count:", result.get("source_count")) + print("Citation sources:") + for idx, source in 
enumerate(result.get("sources", []), start=1): + print(f"- Source {idx}: title={source.get('title', '')} | uri={source.get('uri', '')}") + + assert result["status"] == "success", f"Vertex search failed: {result.get('error')}" + assert isinstance(result.get("sources"), list) + assert result["source_count"] > 0, "Expected at least one citation for known in-domain ABB query" + assert result["source_count"] == len(result["sources"]) + for source in result["sources"]: + assert "uri" in source and isinstance(source["uri"], str) and source["uri"].strip() + assert "title" in source and isinstance(source["title"], str) \ No newline at end of file diff --git a/ABB-Manual-Assistant/test_scripts/test_agent_search.py b/ABB-Manual-Assistant/test_scripts/test_agent_search.py new file mode 100644 index 00000000..9030a068 --- /dev/null +++ b/ABB-Manual-Assistant/test_scripts/test_agent_search.py @@ -0,0 +1,82 @@ +# generated test file - agent using search tool + +import os +import sys + + +# Add parent directory to sys.path +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +import asyncio +import os + +import agents +from dotenv import load_dotenv +from openai import AsyncOpenAI +from search_tool import VertexSearchTool # your tool + + +load_dotenv() + +# Initialize OpenAI client +client = AsyncOpenAI( + api_key=os.getenv("OPENAI_API_KEY"), + base_url=os.getenv("OPENAI_BASE_URL"), +) + +# Initialize Vertex search class +vertex_search_tool = VertexSearchTool() + + +async def search_knowledgebase(query: str) -> str: + print(f"[TOOL] Called with query: {query}") + try: + result = await vertex_search_tool.get_knowledge(query) + print(f"[TOOL] Result length: {len(result) if result else 'None'}") + print(f"[TOOL] Full text: {result}") + return result + except Exception as e: + print(f"[TOOL] Exception: {e}") + return f"Error during search: {e}" + + +knowledge_tool = agents.function_tool(search_knowledgebase) + +agent = agents.Agent( + name="Debug 
Knowledge Agent", + instructions="You are a helpful assistant. Use the tool to get knowledge.", + tools=[knowledge_tool], + model=agents.OpenAIChatCompletionsModel( + model="gemini-2.5-pro", + openai_client=client, + ), + model_settings=agents.ModelSettings(tool_choice="required"), +) + + +async def main(): + test_queries = [ + # "10077, FTP server down", + # "What is FTP?", + # "Explain HTTP protocol", + # "What is the largest ABB robot", + # "What are the specs for the IRB 140?", + # "What is \"EN ISO 12100 -1\"", + # "What colour is the sky", + "Spot application weld error reported" + ] + + for q in test_queries: + print("\n===============================") + print(f"Running agent with prompt: {q}") + try: + response = await agents.Runner.run(agent, input=q) + print("[AGENT FINAL OUTPUT]:\n", response.final_output) + except Exception as e: + print("[AGENT] Exception during run:", e) + + await vertex_search_tool.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/ABB-Manual-Assistant/test_scripts/test_orch.py b/ABB-Manual-Assistant/test_scripts/test_orch.py new file mode 100644 index 00000000..cbdac54c --- /dev/null +++ b/ABB-Manual-Assistant/test_scripts/test_orch.py @@ -0,0 +1,35 @@ +# generated test file - orchestrator agent using search tool + +import os +import sys + + +# Add parent directory to sys.path +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +import asyncio +import logging + +#### +import warnings + +from orchestrator_agent import Orchestrator + + +# Suppress all warnings (UserWarning, DeprecationWarning, etc.) 
+warnings.filterwarnings("ignore") +# Suppress log messages from all libraries +logging.basicConfig(level=logging.CRITICAL) +for name in logging.root.manager.loggerDict: + logging.getLogger(name).setLevel(logging.CRITICAL) +#### + + +async def main(): + orchestrator = Orchestrator() + response = await orchestrator.run("How to integrate IRC5?") + print(response) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/ABB-Manual-Assistant/test_scripts/test_search.py b/ABB-Manual-Assistant/test_scripts/test_search.py new file mode 100644 index 00000000..b8a12b69 --- /dev/null +++ b/ABB-Manual-Assistant/test_scripts/test_search.py @@ -0,0 +1,27 @@ +# generated test file - only search tool + +import os +import sys + + +# Add parent directory to sys.path +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +import asyncio + +from search_tool import VertexSearchTool + + +async def test(): + try: + vertex_search_tool = VertexSearchTool() + test_query = "Spot application weld error reported" + result = await vertex_search_tool.get_knowledge(test_query) + print("[TEST RESULT]") + print(result if result else "No result returned.") + except Exception as e: + print(f"[TEST ERROR] {e}") + + +if __name__ == "__main__": + asyncio.run(test()) diff --git a/ABB-Manual-Assistant/test_scripts/test_vertex_search.py b/ABB-Manual-Assistant/test_scripts/test_vertex_search.py new file mode 100644 index 00000000..d5b1a65d --- /dev/null +++ b/ABB-Manual-Assistant/test_scripts/test_vertex_search.py @@ -0,0 +1,25 @@ +import asyncio +from pprint import pprint + +from configs import Configs +from utils.tools.vertex_search import vertex_search + + +async def main(): + print("Loading config...") + config = Configs() + + print("Config check:") + print("vertex_datastore_id:", config.vertex_datastore_id) + print("google_cloud_location:", config.google_cloud_location) + print("default_worker_model:", config.default_worker_model) + print("default_temperature:", 
config.default_temperature) + + print("\nRunning Vertex Search test...") + result = await vertex_search("What does error code 10039 mean?") + + pprint(result) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/ABB-Manual-Assistant/test_searchagent.py b/ABB-Manual-Assistant/test_searchagent.py new file mode 100644 index 00000000..75458cc4 --- /dev/null +++ b/ABB-Manual-Assistant/test_searchagent.py @@ -0,0 +1,18 @@ +import asyncio + +from search_agent import SearchAgent # Replace with actual import path + + +async def test_error_code_query(): + agent = SearchAgent() + query = "What does error code 10039 mean in ABB robot manuals?" + + print("Running test query...") + result = await agent.run(query) + + print("\n=== Test Result ===") + print(result) + + +if __name__ == "__main__": + asyncio.run(test_error_code_query()) diff --git a/ABB-Manual-Assistant/test_searchtool.py b/ABB-Manual-Assistant/test_searchtool.py new file mode 100644 index 00000000..514a0da2 --- /dev/null +++ b/ABB-Manual-Assistant/test_searchtool.py @@ -0,0 +1,13 @@ +import asyncio + +from search_tool import VertexSearchTool + + +async def main(): + search_tool = VertexSearchTool() + result = await search_tool.get_knowledge("error code 10039") + print(result) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/ABB-Manual-Assistant/ui.py b/ABB-Manual-Assistant/ui.py new file mode 100644 index 00000000..17e9403f --- /dev/null +++ b/ABB-Manual-Assistant/ui.py @@ -0,0 +1,274 @@ +from __future__ import annotations + +# Gradio powers the UI +import gradio as gr + +# Your local persistence layer (SQLite wrapper you already have) +from agent_utils.memory_store import MemoryStore + +# Our tiny “agent” that owns conversation state and history (lives in project root) +from conversation_manager import ConversationManagerAgent + +# Orchestrator remains the single “brain” that decides how to answer and streams tokens/chunks +from orchestrator_agent import 
Orchestrator + +# Your tracing setup (left intact so nothing breaks) +from utils import setup_langfuse_tracer +from utils.langfuse.shared_client import langfuse_client # noqa: F401 (imported for tracer wiring) + + +# ----------------------------------------------------------------------------- +# Storage + manager agent +# ----------------------------------------------------------------------------- +store = MemoryStore() +conv_agent = ConversationManagerAgent(store) # The app talks to the conversation manager, not raw DB + + +# ----------------------------------------------------------------------------- +# Small helpers for the dropdown and history mapping +# ----------------------------------------------------------------------------- +def _label_for(id_: str, title: str) -> str: + """Pretty labels for the chat selector dropdown: 'Title · abc123'.""" + return f"{title} · {id_[:6]}" + + +def _choices_and_maps(): + """ + Build dropdown choices and mapping between labels and conversation IDs. + Ensures at least one conversation exists. + """ + rows = store.list_conversations() + if not rows: + store.create_conversation() + rows = store.list_conversations() + choices = [_label_for(r[0], r[1]) for r in rows] + id_by_label = {_label_for(r[0], r[1]): r[0] for r in rows} + label_by_id = {r[0]: _label_for(r[0], r[1]) for r in rows} + return choices, id_by_label, label_by_id, choices[0] # default = first + + +def _history_messages(cid: str): + """ + Read messages from the ConversationManagerAgent and convert to + Chatbot(type='messages') format: [{role, content}, ...] 
+ """ + rows = conv_agent.get_history_messages(cid, limit=1000) + return [{"role": r["role"], "content": r["content"]} for r in rows] + + +# ----------------------------------------------------------------------------- +# The Gradio App +# ----------------------------------------------------------------------------- +class GradioApp: + def __init__(self): + # One Orchestrator instance for the whole app + self.orchestrator = Orchestrator() + + async def run_search_stream(self, message: str, chat_history): + """ + Delegate to Orchestrator and stream chunks back. + This is intentionally simple: the Orchestrator decides how to answer. + """ + async for chunk in self.orchestrator.run(message, chat_history): + # Each 'chunk' is a piece of the assistant's text (token/phrase/etc) + yield chunk + + def launch(self): + """ + Build and launch the Gradio UI. + """ + with gr.Blocks(title="ABB Knowledgebase Search") as demo: + # Top title + gr.Markdown("### ABB Knowledgebase Search — Multi-Chat (per-chat history)") + + # Global UI state: we store the currently selected conversation_id here + state = gr.State({}) + + with gr.Row(): + # ----------------------- Sidebar: Conversation management ----------------------- + with gr.Column(scale=1, min_width=320): + gr.Markdown("**Chats**") + + # Dropdown for selecting which conversation is active + chat_dropdown = gr.Dropdown(choices=[], value=None, label="Select chat", interactive=True) + + # Buttons/inputs for CRUD on conversations + new_title = gr.Textbox(label="New chat title", placeholder="Optional (auto if blank)") + btn_new = gr.Button("New chat") + + rename_to = gr.Textbox(label="Rename to") + btn_rename = gr.Button("Rename") + + btn_delete = gr.Button("Delete chat") + btn_refresh = gr.Button("Refresh list") + + # ----------------------- Main chat panel ----------------------- + with gr.Column(scale=3): + # Chatbot uses OpenAI-style message dicts + chatbot = gr.Chatbot(label="Chat", height=520) + + # Input row: one-line textbox 
+ Send button + with gr.Row(): + msg = gr.Textbox( + placeholder="Ask me something about ABB errors...", scale=9, label="Your Question", lines=1 + ) + send = gr.Button("Send", variant="primary", scale=1) + + # Some convenience example prompts + examples = [ + "What is Error code 10039 and possible solution?", + "What is a reference error?", + "We are getting a Motor phase short circuit. Where should we look?", + "How to set up an IRC5, and what is it?", + ] + gr.Examples(examples=examples, inputs=msg, label="Try one of these example questions:") + + # ----------------------- Callbacks ----------------------- + + # Initialize the UI on page load: + # - Populate dropdown + # - Set default conversation_id in state + # - Load that conversation's message history + def _init(): + choices, id_by_label, label_by_id, default_label = _choices_and_maps() + cid = id_by_label[default_label] + return ( + gr.update(choices=choices, value=default_label), # dropdown options + selection + {"conversation_id": cid}, # state + _history_messages(cid), # initial chat messages + ) + + demo.load(_init, inputs=None, outputs=[chat_dropdown, state, chatbot]) + + # When the user switches the dropdown, update the active conversation and show its history + def _select_chat(label, current_state): + choices, id_by_label, label_by_id, default_label = _choices_and_maps() + if not label or label not in id_by_label: + label = default_label + cid = id_by_label[label] + current_state = current_state or {} + current_state["conversation_id"] = cid + return current_state, _history_messages(cid) + + chat_dropdown.change(_select_chat, inputs=[chat_dropdown, state], outputs=[state, chatbot]) + + # The main send handler — this is an async generator that yields intermediate UI updates, + # so you see the assistant's reply grow inside the *same* chat bubble (streaming). 
+ async def _send(user_text, current_state, current_msgs): + # Guard: ignore empty messages but keep UI outputs consistent + if not user_text or not user_text.strip(): + yield current_msgs, "", (current_state or {}) + return + + # Resolve the active conversation ID (default to first if none) + choices, id_by_label, label_by_id, default_label = _choices_and_maps() + cid = (current_state or {}).get("conversation_id") or id_by_label[default_label] + current_state = {"conversation_id": cid} + + # Persist the user's message immediately + conv_agent.save_user(cid, user_text) + + # Start from canonical history for this conversation + messages = _history_messages(cid) + + # Append user's new message + messages.append({"role": "user", "content": user_text}) + + chat_history = messages + + # Append an *empty* assistant bubble that we'll fill as chunks arrive + # Seed it with a quick typing indicator so the user sees a response is coming + messages.append({"role": "assistant", "content": "…"}) + # Push user + typing indicator to UI immediately + yield messages, "", current_state + + # Now stream the assistant reply into that last message in-place + partial = "" + try: + async for chunk in self.run_search_stream(user_text, chat_history): + partial += chunk # grow the assistant's partial reply + messages[-1]["content"] = partial # update the *same* assistant bubble + conv_agent.set_assistant_partial(cid, partial) # keep partial in RAM (not DB) + yield messages, "", current_state # push incremental update to the UI + finally: + # When stream ends (or errors), persist the final assistant message once + conv_agent.finalize_assistant(cid) + + # Reload canonical history from storage (ensures what you see is exactly what we saved) + messages = _history_messages(cid) + yield messages, "", current_state + + # Wire the Send button and Enter key to the same streaming handler + send.click(_send, inputs=[msg, state, chatbot], outputs=[chatbot, msg, state]) + msg.submit(_send, inputs=[msg, 
state, chatbot], outputs=[chatbot, msg, state]) + + # Create a new conversation; select it and show an empty history + def _new_chat(title): + res = conv_agent.create(title or None) + cid = res["conversation_id"] + choices, id_by_label, label_by_id, _ = _choices_and_maps() + label = label_by_id[cid] + return ( + gr.update(choices=choices, value=label), # select new chat in dropdown + {"conversation_id": cid}, # set state + _history_messages(cid), # show empty (or fresh) history + ) + + btn_new.click(_new_chat, inputs=new_title, outputs=[chat_dropdown, state, chatbot]) + + # Rename the current conversation; update the dropdown label + def _rename_chat(new_title, current_state): + choices, id_by_label, label_by_id, default_label = _choices_and_maps() + cid = (current_state or {}).get("conversation_id") or id_by_label[default_label] + if new_title and new_title.strip(): + conv_agent.rename(cid, new_title.strip()) + choices, id_by_label, label_by_id, _ = _choices_and_maps() + label = label_by_id[cid] + return gr.update(choices=choices, value=label) + + btn_rename.click(_rename_chat, inputs=rename_to, outputs=chat_dropdown) + + # Delete the current conversation; ensure one remains and switch to it + def _delete_chat(current_state): + choices, id_by_label, label_by_id, default_label = _choices_and_maps() + cid = (current_state or {}).get("conversation_id") or id_by_label[default_label] + conv_agent.delete(cid) + + # Recompute choices; pick default; load its history + choices, id_by_label, label_by_id, default_label = _choices_and_maps() + new_cid = id_by_label[default_label] + return ( + gr.update(choices=choices, value=default_label), + {"conversation_id": new_cid}, + _history_messages(new_cid), + ) + + btn_delete.click(_delete_chat, inputs=state, outputs=[chat_dropdown, state, chatbot]) + + # Just refresh the dropdown list, keeping the same selection if still valid + def _refresh_list(current_state): + choices, id_by_label, label_by_id, default_label = 
_choices_and_maps() + cid = (current_state or {}).get("conversation_id") + if cid and cid in label_by_id: + label = label_by_id[cid] + else: + label = default_label + return gr.update(choices=choices, value=label) + + btn_refresh.click(_refresh_list, inputs=state, outputs=chat_dropdown) + + # Start the Gradio server (enable public sharing) + demo.launch(server_name="0.0.0.0", share=True) + + +# ----------------------------------------------------------------------------- +# Entry point (kept as-is for your tracing + app start) +# ----------------------------------------------------------------------------- +def main(): + setup_langfuse_tracer() + app = GradioApp() + app.launch() + + +if __name__ == "__main__": + main() diff --git a/ABB-Manual-Assistant/upload_test_data.py b/ABB-Manual-Assistant/upload_test_data.py new file mode 100644 index 00000000..badb529a --- /dev/null +++ b/ABB-Manual-Assistant/upload_test_data.py @@ -0,0 +1,74 @@ +import pandas as pd +from dotenv import load_dotenv +from langfuse import get_client +from rich.progress import track + + +# Load environment variables from .env file +load_dotenv() + +# Initialize Langfuse client +langfuse = get_client() + +# Define the dataset name +dataset_name = "LLM_Judge_Errors" + +# Define the question-answer pairs +qa_pairs = [ + ( + "What is Error code 10039 and possible solution?", + "During startup, the system has found that data in the Serial Measurement Board (SMB) memory is not OK. All data must be OK before automatic operation is possible. Manually jogging the robot is possible. There are differences between the data stored on the SMB and the data stored in the controller. This may be due to replacement of SMB, controller or both. 
Possible solution is to update the Serial Measurement Board data.", + ), + ("How to fix SMB memory is not OK", "Update the Serial Measurement Board data."), + ( + "How to recover if axis computer has lost communication.", + "1) Check cable between the axis computer and the Safety System is intact and correctly connected.\n2) Check power supply connected to the Safety System.\n3) Make sure no extreme levels of electromagnetic interference are emitted close to the robot cabling.", + ), + ( + "What does error code 40038 mean?", + "It is a LOCAL illegal in routine variable declaration. Only program data declarations may have the LOCAL attribute. Remove the LOCAL attribute.", + ), + ( + "What is a reference error.", + "System should ask to specify what reference error number they are getting to better answer the question. There are many reference error", + ), + ( + "Why am I getting a programmed forced reduced error.", + "Programmed tip force too high for tool arg. Requested motor torque (Nm)= arg. Force was reduced to max motor torque.", + ), + ( + "SMB Data is missing. What should I do?", + "If proper data exists in cabinet - transfer the data to SMB-memory. If still problem - check communication cable to SMB-board. Replace SMB-board.", + ), + ( + "We are getting a Motor phase short circuit. Where should we look?", + "You have a short circuit in cables or connectors between the phases or to Ground or a Short circuit in motor between the phases or to ground. Check/replace cables and connectors. Check/replace motor.", + ), + ( + "Why am I getting a singularity problem", + "Depending on exact error number the problem is either in joint 4 or joint 6.", + ), + ( + "Why am I getting a joint not synchronized error and how to fix it.", + "The speed of joint arg before power down/failure was too high. 
Make a new update of the revolution counter.", + ), +] + +# Convert to DataFrame +df = pd.DataFrame(qa_pairs, columns=["question", "expected_answer"]) + +# Create the dataset in Langfuse +langfuse.create_dataset( + name=dataset_name, + description="Robot error troubleshooting Q&A dataset", + metadata={"type": "benchmark", "source": "manual_upload"}, +) + +# Upload each item +for idx, row in track(df.iterrows(), total=len(df), description="Uploading to Langfuse"): + langfuse.create_dataset_item( + dataset_name=dataset_name, + input={"text": row["question"]}, + expected_output={"text": row["expected_answer"]}, + id=f"llmjudge-{idx:03}", + ) diff --git a/ABB-Manual-Assistant/upload_test_data_from_file.py b/ABB-Manual-Assistant/upload_test_data_from_file.py new file mode 100644 index 00000000..274d05cf --- /dev/null +++ b/ABB-Manual-Assistant/upload_test_data_from_file.py @@ -0,0 +1,149 @@ +import argparse +import json +from pathlib import Path +from typing import Any + +import pandas as pd +from dotenv import load_dotenv +from langfuse import get_client +from rich.progress import track + + +load_dotenv() +langfuse = get_client() + + +REQUIRED_COLUMNS = { + "test_prompt": ["Test prompt", "test prompt", "prompt", "question"], + "expected_response": ["expected response", "expected_response", "expected_answer", "answer"], + "safety_considerations": [ + "relevant safety considerations that should be addressed", + "relevant_safety_considerations", + "safety_considerations", + "safety considerations", + ], + "expected_sources": ["expected sources (manual, page)", "expected_sources", "sources", "source"], + "expected_trace": ["expected trace", "expected_trace", "trace"], + "max_total_tokens": ["max total tokens", "max_total_tokens", "max_tokens"], + "max_total_latency": ["max total latency", "max_total_latency", "max_latency"], +} + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Upload a benchmark dataset into Langfuse") + 
parser.add_argument("--dataset-name", required=True, help="Name of the Langfuse dataset") + parser.add_argument("--input-file", required=True, help="Path to a CSV, JSON, or JSONL file containing the test cases") + parser.add_argument( + "--description", + default="Robot error troubleshooting Q&A dataset", + help="Description for the Langfuse dataset", + ) + return parser.parse_args() + + +def load_rows(path: Path) -> pd.DataFrame: + suffix = path.suffix.lower() + if suffix == ".csv": + return pd.read_csv(path) + if suffix == ".json": + with path.open("r", encoding="utf-8") as fh: + data = json.load(fh) + if isinstance(data, list): + return pd.DataFrame(data) + if isinstance(data, dict) and "rows" in data: + return pd.DataFrame(data["rows"]) + raise ValueError("JSON file must contain a list of objects or a {'rows': [...]} object") + if suffix == ".jsonl": + records: list[dict[str, Any]] = [] + with path.open("r", encoding="utf-8") as fh: + for line in fh: + line = line.strip() + if line: + records.append(json.loads(line)) + return pd.DataFrame(records) + raise ValueError("Unsupported file type. Use .csv, .json, or .jsonl") + + +def normalize_columns(df: pd.DataFrame) -> pd.DataFrame: + normalized = df.copy() + for canonical, aliases in REQUIRED_COLUMNS.items(): + for alias in aliases: + if alias in normalized.columns: + normalized[canonical] = normalized[alias] + break + missing = [canonical for canonical in REQUIRED_COLUMNS if canonical not in normalized.columns] + if missing: + raise ValueError( + "Missing required columns. 
Expected one of: " + + ", ".join(f"{canonical} ({' | '.join(REQUIRED_COLUMNS[canonical])})" for canonical in REQUIRED_COLUMNS) + ) + return normalized + + +def _sanitize_value(value: Any) -> Any: + if pd.isna(value): + return None + if isinstance(value, float) and value.is_integer(): + return int(value) + return value + + +def build_expected_output(row: pd.Series) -> dict[str, Any]: + return { + "text": str(row["expected_response"]), + "safety_considerations": _sanitize_value(row.get("safety_considerations")), + "expected_sources": _sanitize_value(row.get("expected_sources")), + "expected_trace": _sanitize_value(row.get("expected_trace")), + "max_total_tokens": _sanitize_value(row.get("max_total_tokens")), + "max_total_latency": _sanitize_value(row.get("max_total_latency")), + } + + +def ensure_dataset(dataset_name: str, description: str) -> None: + try: + langfuse.get_dataset(dataset_name) + print(f"Dataset '{dataset_name}' already exists; reusing it.") + except Exception: + langfuse.create_dataset( + name=dataset_name, + description=description, + metadata={"type": "benchmark", "source": "manual_upload"}, + ) + print(f"Created dataset '{dataset_name}'.") + + +def main() -> None: + args = parse_args() + input_path = Path(args.input_file).expanduser().resolve() + if not input_path.exists(): + raise FileNotFoundError(f"Input file not found: {input_path}") + + df = load_rows(input_path) + df = normalize_columns(df) + ensure_dataset(args.dataset_name, args.description) + + for idx, row in track(df.iterrows(), total=len(df), description="Uploading to Langfuse"): + # Prepare sanitized metadata fields (move all non-input expectations into metadata) + metadata = { + "source_file": str(input_path), + "row_index": int(idx), + "safety_considerations": _sanitize_value(row.get("safety_considerations")), + "expected_sources": _sanitize_value(row.get("expected_sources")), + "expected_trace": _sanitize_value(row.get("expected_trace")), + "max_total_tokens": 
async def rate_limited(_fn: Callable[[], Awaitable[T]], semaphore: asyncio.Semaphore) -> T:
    """Await ``_fn()`` while holding one slot of ``semaphore``.

    The slot is acquired before ``_fn`` is called and released afterwards,
    whether ``_fn`` returns or raises.
    """
    await semaphore.acquire()
    try:
        outcome = await _fn()
    finally:
        semaphore.release()
    return outcome
def create_batches(
    items: list[V],
    batch_size: int,
    limit: int | None = None,
    keep_trailing: bool = True,
) -> list[list[V]]:
    """Split ``items`` into consecutive batches of ``batch_size`` elements.

    Params:
        items: the elements to batch, in order.
        batch_size: number of elements per full batch; must be at least 1.
        limit: number of items to include in total (``None`` for no limit).
        keep_trailing: if False, a final batch holding fewer than
            ``batch_size`` items is not returned.

    Return:
        List of batches. Empty when no items are selected.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    from itertools import islice  # local import: only this helper needs it

    # A non-positive size can never be filled, so reject it up front instead
    # of returning every item in one oversized batch.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")

    # A negative limit selects no items.
    stop = None if limit is None else max(limit, 0)
    selected = list(islice(items, stop))

    batches = [selected[start : start + batch_size] for start in range(0, len(selected), batch_size)]

    # Only the last batch can be short.
    if batches and not keep_trailing and len(batches[-1]) < batch_size:
        batches.pop()

    return batches
def _load_hf(dataset_info: _SourceInfo, limit: int | None = None) -> datasets.Dataset:
    """Load a Hugging Face dataset from a local directory or via ``load_dataset``.

    When no version is pinned and ``dataset_info.repo`` exists on disk,
    ``datasets.load_from_disk`` is tried first. Otherwise, or if that raises
    ``FileNotFoundError``, ``datasets.load_dataset`` is used.

    Params
    ------
        dataset_info: parsed dataset URL (repo, version, subset, split).
        limit: optional; max number of rows to include.

    Returns
    -------
        The requested split, truncated to ``limit`` rows when given.
    """
    if (dataset_info.version is None) and os.path.exists(dataset_info.repo):
        try:
            loaded = datasets.load_from_disk(dataset_info.repo)
        except FileNotFoundError:
            # The path exists but is not a saved dataset; use load_dataset below.
            loaded = None

        if loaded is not None:
            # Only a DatasetDict is keyed by split name; indexing a plain
            # Dataset with a string is a column lookup, not a split lookup.
            if isinstance(loaded, datasets.DatasetDict) and dataset_info.split is not None:
                loaded = loaded[dataset_info.split]

            # Apply the row limit here too, as the load_dataset path does.
            if limit is not None and isinstance(loaded, datasets.Dataset):
                loaded = loaded.select(range(min(limit, len(loaded))))

            return loaded  # type: ignore

    split_name = dataset_info.split
    if limit is not None:
        split_name += f"[0:{limit}]"

    # Pass the parsed commit so a pinned "@{commit}" in the URL is honoured.
    return datasets.load_dataset(  # type: ignore
        dataset_info.repo,
        name=dataset_info.subset,
        split=split_name,
        revision=dataset_info.version,
    )
def gradio_messages_to_oai_chat(
    messages: list[ChatMessage | dict],
) -> list["ChatCompletionMessageParam"]:
    """Translate Gradio chat message history to OpenAI format.

    Parameters
    ----------
    messages : list[ChatMessage | dict]
        Chat history. ``dict`` entries are passed through untouched; other
        entries are read through their ``role`` and ``content`` attributes.

    Returns
    -------
    list[ChatCompletionMessageParam]
        One ``{"role": ..., "content": ...}`` entry per input message, in the
        same order.

    Raises
    ------
    TypeError
        If a non-dict message has content that is not a string.
    """
    output: list["ChatCompletionMessageParam"] = []
    for message in messages:
        if isinstance(message, dict):
            output.append(message)  # type: ignore[arg-type]
            continue

        message_content = message.content
        # Raise explicitly: an ``assert`` would be stripped under ``python -O``
        # and let non-text content through.
        if not isinstance(message_content, str):
            raise TypeError(
                f"Only text message content is supported, got {type(message_content).__name__}: {message_content!r}"
            )
        output.append({"role": message.role, "content": message_content})  # type: ignore[arg-type,misc]

    return output
def oai_agent_stream_to_gradio_messages(
    stream_event: StreamEvent,
) -> list[ChatMessage]:
    """Convert one agent SDK stream event into Gradio chat messages.

    A raw response event carrying a ``ResponseCompletedEvent`` yields one
    message per output text part and one titled message per function tool
    call. A run-item event named ``"tool_output"`` that carries a
    ``ToolCallOutputItem`` yields one titled message with the tool output.
    Any other event yields an empty list.
    """
    if isinstance(stream_event, stream_events.RawResponsesStreamEvent):
        completed = stream_event.data
        if not isinstance(completed, ResponseCompletedEvent):
            return []

        rendered: list[ChatMessage] = []
        for entry in completed.response.output:
            if isinstance(entry, ResponseOutputMessage):
                rendered.extend(
                    ChatMessage(role="assistant", content=part.text)
                    for part in entry.content
                    if isinstance(part, ResponseOutputText)
                )
            elif isinstance(entry, ResponseFunctionToolCall):
                tool_title = f"Used tool `{entry.name}`"
                rendered.append(
                    ChatMessage(
                        role="assistant",
                        content=f"```\n{entry.arguments}\n```",
                        metadata={"title": tool_title},
                    )
                )
        return rendered

    if isinstance(stream_event, stream_events.RunItemStreamEvent):
        run_item = stream_event.item
        if stream_event.name == "tool_output" and isinstance(run_item, ToolCallOutputItem):
            return [
                ChatMessage(
                    role="assistant",
                    content=f"```\n{run_item.output}\n```",
                    metadata={"title": "*Tool call output*"},
                )
            ]

    return []
def set_up_langfuse_otlp_env_vars():
    """Set up environment variables for Langfuse OpenTelemetry integration.

    OTLP = OpenTelemetry Protocol.

    Reads the Langfuse keys and host via ``Configs.from_env_var()`` and sets
    ``OTEL_EXPORTER_OTLP_ENDPOINT`` and ``OTEL_EXPORTER_OTLP_HEADERS`` in
    ``os.environ``. The header carries HTTP Basic credentials built from
    ``public_key:secret_key``.

    Also refer to:
    langfuse.com/docs/integrations/openaiagentssdk/openai-agents
    """
    configs = Configs.from_env_var()

    credentials = f"{configs.langfuse_public_key}:{configs.langfuse_secret_key}"
    langfuse_auth = base64.b64encode(credentials.encode()).decode()

    # Drop trailing slashes so a host such as "https://host/" does not
    # produce "https://host//api/public/otel".
    host = configs.langfuse_host.rstrip("/")
    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = host + "/api/public/otel"
    os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}"

    # Lazy %-formatting: the message is only built if the record is emitted.
    logging.info("Langfuse host: %s", configs.langfuse_host)
class IgnoreOpenAI401Filter(logging.Filter):
    """
    Logging filter that drops one specific OpenAI tracing error message.

    Suppressed record: 'ERROR:openai.agents:[non-fatal] Tracing client error 401'
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return False only for the ERROR-level ``openai.agents`` 401 tracing message."""
        rendered = record.getMessage()

        # Anything that is not an ERROR from the ``openai.agents`` logger passes.
        if record.levelname != "ERROR":
            return True
        if record.name != "openai.agents":
            return True

        return "[non-fatal] Tracing client error 401" not in rendered
various tools for LLM agents. + +```bash +# Tool for getting a list of recent news headlines from enwiki +uv run -m src.utils.tools.news_events +``` diff --git a/ABB-Manual-Assistant/utils/tools/__init__.py b/ABB-Manual-Assistant/utils/tools/__init__.py new file mode 100644 index 00000000..974f12ea --- /dev/null +++ b/ABB-Manual-Assistant/utils/tools/__init__.py @@ -0,0 +1 @@ +from .news_events import get_news_events diff --git a/ABB-Manual-Assistant/utils/tools/code_interpreter.py b/ABB-Manual-Assistant/utils/tools/code_interpreter.py new file mode 100644 index 00000000..957d9c44 --- /dev/null +++ b/ABB-Manual-Assistant/utils/tools/code_interpreter.py @@ -0,0 +1,121 @@ +"""Code interpreter tool.""" + +from pathlib import Path +from typing import Sequence + +from e2b_code_interpreter import AsyncSandbox +from pydantic import BaseModel + +from ..async_utils import gather_with_progress + + +class _CodeInterpreterOutputError(BaseModel): + """Error from code interpreter.""" + + name: str + value: str + traceback: str + + +class CodeInterpreterOutput(BaseModel): + """Output from code interpreter.""" + + stdout: list[str] + stderr: list[str] + error: _CodeInterpreterOutputError | None = None + + def __init__(self, stdout: list[str], stderr: list[str], **kwargs): + """Split lines in stdout and stderr.""" + stdout_processed = [] + for _line in stdout: + stdout_processed.extend(_line.splitlines()) + + stderr_processed = [] + for _line in stderr: + stderr_processed.extend(_line.splitlines()) + + super().__init__(stdout=stdout_processed, stderr=stderr_processed, **kwargs) + + +async def _upload_file(sandbox: "AsyncSandbox", local_path: "str | Path") -> str: + """Upload file to sandbox. + + Returns + ------- + str, denoting the remote path. 
async def _upload_files(sandbox: "AsyncSandbox", paths: Sequence[Path | str]) -> list[str]:
    """Upload several local files to the sandbox concurrently.

    Parameters
    ----------
    paths: Sequence[pathlib.Path | str]
        Local files to upload. An empty sequence uploads nothing.

    Returns
    -------
    list[str]
        Remote paths, one per input file, in input order.
    """
    uploaded: list[str] = []
    if paths:
        pending = []
        for local_path in paths:
            pending.append(_upload_file(sandbox, local_path))

        label = f"Uploading {len(paths)} to sandbox"
        uploaded = list(await gather_with_progress(pending, description=label))

    return uploaded
async def _fetch_current_events_html() -> str:
    """
    Retrieve the HTML of one Wikipedia Current Events day page.

    The day is drawn with a fixed seed from 2025-01-01 through 2025-05-20, so
    every call requests the same page.

    Returns
    -------
        Raw HTML string of the parsed page.

    Raises
    ------
        httpx.HTTPStatusError: if the API responds with an error status.
    """
    # pick a day between January 1 and May 20, 2025
    # (the knowledge base is not updated after May 30, 2025)
    # A private fixed-seed generator keeps the choice reproducible without
    # reseeding the process-wide ``random`` state on every call.
    rng = random.Random(42)
    first_day = date(2025, 1, 1)
    last_day = date(2025, 5, 20)
    random_date = first_day + timedelta(days=rng.randint(0, (last_day - first_day).days))

    # convert to Year_Month_day format (example: 2025_May_6); the day is
    # written unpadded, whereas ``%d`` would emit "06".
    date_str = f"{random_date.year}_{random_date.strftime('%B')}_{random_date.day}"

    api_url = "https://en.wikipedia.org/w/api.php"
    params = {
        "action": "parse",
        "page": f"Portal:Current_events/{date_str}",
        "prop": "text",
        "format": "json",
    }

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        TimeElapsedColumn(),
    ) as progress:
        progress.add_task("GET wikipedia/Portal:Current_events...")
        # The context manager closes the client's connections after the request.
        async with httpx.AsyncClient() as client:
            resp = await client.get(api_url, params=params)

    resp.raise_for_status()
    data = resp.json()
    return data["parse"]["text"]["*"]
    contains the list of events for this category + ul = p_tag.find_next_sibling(lambda tag: tag.name == "ul") + if not ul: + continue + + # Iterate top-level list items as individual events + for li in ul.find_all("li", recursive=False): + # Join all text fragments for a clean description + description = " ".join(li.stripped_strings) + events_by_category[category].append( + NewsEvent( + date=date.fromisoformat(date_str), + category=category, + description=description, + ) + ) + + return events_by_category + + +async def get_news_events() -> CurrentEvents: + """Return a list of current news events from the English Wikipedia. + + Returns + ------- + dict mapping category of news events to list of news headlines. + """ + html = await _fetch_current_events_html() + events_dict = _parse_current_events(html) + + return CurrentEvents.model_validate(events_dict) + + +async def main() -> None: + """Fetch, parse, and output events as JSON.""" + parser = argparse.ArgumentParser(description="Fetch and parse Wikipedia Current Events into structured JSON.") + parser.add_argument("--output", "-o", help="Output JSON file path (default: stdout)") + args = parser.parse_args() + + news_events = await get_news_events() + output = news_events.model_dump_json(indent=2) + + if args.output: + with open(args.output, "w", encoding="utf-8") as f: + f.write(output) + else: + print(output) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/ABB-Manual-Assistant/utils/tools/vertex_search.py b/ABB-Manual-Assistant/utils/tools/vertex_search.py new file mode 100644 index 00000000..14b15ad4 --- /dev/null +++ b/ABB-Manual-Assistant/utils/tools/vertex_search.py @@ -0,0 +1,253 @@ +"""Vertex AI Search tool for knowledge-grounded QA using a custom data store. + +This module provides a search tool that queries a Vertex AI Search data store, +returning grounded summaries with document citations. 
Unlike the Google Search +tool, content is retrieved by the grounding mechanism — no separate fetch step +is required and no API key is needed (authentication uses ADC). +""" + +import logging +from typing import Any + +from configs import Configs + +from google.adk.tools.function_tool import FunctionTool +from google.genai import Client, types + + +logger = logging.getLogger(__name__) + + +def _parse_project_from_datastore_id(datastore_id: str) -> str | None: + """Parse GCP project ID from a Vertex AI Search data store resource name. + + Parameters + ---------- + datastore_id : str + Full resource name, e.g. + ``projects/my-project/locations/global/collections/default_collection/dataStores/my-store``. + + Returns + ------- + str or None + The project ID, or None if the resource name is not in the expected format. + """ + parts = datastore_id.split("/") + if len(parts) >= 2 and parts[0] == "projects": + return parts[1] + return None + + +def _extract_datastore_sources(response: Any) -> list[dict[str, str]]: + """Extract grounding sources from a Vertex AI Search grounded response. + + Vertex AI Search returns ``retrieved_context`` chunks (not ``web`` chunks). + Each chunk has a ``uri`` (GCS path or document resource name) and an + optional ``title``. + + Parameters + ---------- + response : Any + The Gemini API response object from a Vertex AI Search grounded call. + + Returns + ------- + list[dict[str, str]] + List of source dictionaries with ``'title'`` and ``'uri'`` keys. + Sources with an empty URI are excluded. 
async def _vertex_search_async(
    query: str,
    model: str,
    datastore_id: str,
    location: str,
    temperature: float = 1.0,
) -> dict[str, Any]:
    """Query a Vertex AI Search data store with grounding enabled.

    The underlying SDK call is synchronous, so it is executed in a worker
    thread to keep the event loop free while the request is in flight.

    Parameters
    ----------
    query : str
        The search query.
    model : str
        The Gemini model to use (accessed via the Vertex AI endpoint).
    datastore_id : str
        Full resource name of the Vertex AI Search data store.
    location : str
        GCP region for the Vertex AI model call (e.g. ``'us-central1'``).
        This is the *compute* region and may differ from the data store's
        ``global`` location.
    temperature : float, default=1.0
        Temperature for generation.

    Returns
    -------
    dict
        Search results with the following keys:

        - **status** (str): ``"success"`` or ``"error"``
        - **summary** (str): Grounded text answer drawn from the data store
        - **sources** (list[dict]): Each entry has:
            - **title** (str): Document title
            - **uri** (str): GCS path or Vertex AI document resource name
        - **source_count** (int): Number of sources cited (success case only)
        - **error** (str): Error message (error case only)
    """
    # Function-scope import keeps this block self-contained.
    import asyncio

    project = _parse_project_from_datastore_id(datastore_id)
    client = Client(vertexai=True, project=project, location=location)
    try:
        # generate_content is a blocking call; awaiting it through a worker
        # thread prevents it from stalling every other task on the loop.
        response = await asyncio.to_thread(
            client.models.generate_content,
            model=model,
            contents=query,
            config=types.GenerateContentConfig(
                tools=[
                    types.Tool(retrieval=types.Retrieval(vertex_ai_search=types.VertexAISearch(datastore=datastore_id)))
                ],
                temperature=temperature,
            ),
        )

        # Only the first candidate is used; parts without text are skipped.
        parts = []
        if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
            parts = response.candidates[0].content.parts
        summary = "".join(text for part in parts if (text := getattr(part, "text", None)))

        sources = _extract_datastore_sources(response)
        return {
            "status": "success",
            "summary": summary,
            "sources": sources,
            "source_count": len(sources),
        }

    except Exception as e:
        # Tool boundary: report the failure to the caller as data instead of
        # raising, after logging the full traceback.
        logger.exception("Vertex AI Search failed: %s", e)
        return {
            "status": "error",
            "error": str(e),
            "summary": "",
            "sources": [],
        }
    finally:
        client.close()
def create_vertex_search_tool(config: Configs | None = None) -> FunctionTool:
    """Build an ADK ``FunctionTool`` that wraps ``vertex_search``.

    The configuration is used here only to verify that a data store is
    configured; it is not passed on to the returned tool.

    Parameters
    ----------
    config : Configs, optional
        Configuration settings. A default ``Configs()`` is created when
        omitted. Must have ``vertex_datastore_id`` set.

    Returns
    -------
    FunctionTool
        Tool whose function is ``vertex_search``.

    Raises
    ------
    ValueError
        If ``vertex_datastore_id`` is empty or unset on the configuration.
    """
    settings = Configs() if config is None else config  # type: ignore[call-arg]

    if settings.vertex_datastore_id:
        return FunctionTool(func=vertex_search)

    raise ValueError(
        "VERTEX_AI_DATASTORE_ID must be set to use create_vertex_search_tool. "
        "Set it in your .env file or as an environment variable."
    )
class WorkorderAgent:
    """Agent that turns an ABB manual-assistant conversation into a workorder.

    Parameters
    ----------
    model : str, default="gemini-2.5-flash"
        Model name passed to ``agents.OpenAIChatCompletionsModel``.
    temperature : float, default=0.5
        Sampling temperature passed to ``agents.ModelSettings``.
    """

    def __init__(self, model: str = "gemini-2.5-flash", temperature: float = 0.5) -> None:
        # Credentials and endpoint come from the environment.
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"))

        self.workorder_agent = agents.Agent(
            name="Workorder Agent",
            instructions="""
            Given a conversation with an ABB robot manual assistant agent you should create a workorder that states:

            1) Workorder Title: A descriptive title for the workorder.
            2) Error/Issue: the error or issue that occured.
            3) Work completed: the work/action that the user has taken or will take to resolve the error.

            Only use information from the user's conversation with the ABB robot assistant agent to complete the workorder.
            Your response should include only the created workorder and nothing else. Provide as much detail as possible.
            """,
            model=agents.OpenAIChatCompletionsModel(model=model, openai_client=self.client),
            model_settings=agents.ModelSettings(temperature=temperature),
        )

    async def run(self, prompt: str) -> "agents.RunResult":
        """Run the workorder agent on a conversation transcript.

        Parameters
        ----------
        prompt : str
            Text handed to the agent as its input.

        Returns
        -------
        agents.RunResult
            The object returned by ``agents.Runner.run``, unchanged. It is
            not a plain string; callers read the generated workorder text
            from its ``final_output`` attribute.
        """
        return await agents.Runner.run(self.workorder_agent, input=prompt)