Skip to content
25 changes: 24 additions & 1 deletion MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ doc2 = Document(content="Berlin is the capital of Germany.", meta={"lang": "en",
assert doc1.id == doc2.id
```

It is possible to migrate an existing index without rerunning your indexing pipeline, for example to avoid recalculating embeddings. To do that, read stored documents, regenerate their IDs using Haystack 3.0, write the updated documents, and delete the documents stored under their old IDs.
It is possible to migrate an existing index without rerunning your indexing pipeline, for example to avoid recalculating embeddings. To do that, read stored documents, regenerate their IDs using Haystack 3.0, write the updated documents, and delete the documents stored under their old IDs.

```python
from dataclasses import replace
Expand Down Expand Up @@ -870,3 +870,26 @@ store.write_documents(new_documents, policy=DuplicatePolicy.OVERWRITE)
new_ids = {doc.id for doc in new_documents}
store.delete_documents([doc.id for doc in old_documents if doc.id not in new_ids])
```

### Components now resolve API keys at warm-up

**What changed:** Components that use external services now create their resources (such as API clients) during `warm_up()` instead of in `__init__`. As a consequence, a missing API key (for example, an unset environment variable behind a `Secret.from_env_var` default) is now reported at warm-up or first run rather than at construction. This affects OpenAI and Azure OpenAI components.

**Why:** Creating resources in `warm_up()` / `warm_up_async()` and releasing them in `close()` / `close_async()` gives components and pipelines a single, predictable resource lifecycle.

**How to migrate:** If you relied on construction failing for a missing API key, expect the same error at `warm_up()` (or the first `run`) instead.

Before (v2.x), with `OPENAI_API_KEY` unset:
```python
from haystack.components.embedders import OpenAITextEmbedder

embedder = OpenAITextEmbedder() # raised here
```

After (v3.0), with `OPENAI_API_KEY` unset:
```python
from haystack.components.embedders import OpenAITextEmbedder

embedder = OpenAITextEmbedder() # no error at construction
embedder.warm_up() # raised here
```
46 changes: 33 additions & 13 deletions haystack/components/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ def __init__(
self.tool_concurrency_limit = tool_concurrency_limit
self.tool_streaming_callback_passthrough = tool_streaming_callback_passthrough
self._confirmation_strategies = confirmation_strategies or {}
self._is_warmed_up = False
self._tools_warmed_up = False

# --- State schema ---
# shallow copy is sufficient: we only add a top-level "messages" key, never mutate nested values
Expand Down Expand Up @@ -574,16 +574,38 @@ def _register_prompt_variables(self) -> None:
else:
component.set_input_type(self, name=var_name, type=Any, default=None)

def warm_up(self) -> None:
"""
Warm up the Agent.
"""
if not self._is_warmed_up:
if hasattr(self.chat_generator, "warm_up"):
self.chat_generator.warm_up()
def _warm_up_tools(self) -> None:
"""Warm up the configured tools once."""
if not self._tools_warmed_up:
if self.tools:
warm_up_tools(self.tools)
Comment thread
sjrl marked this conversation as resolved.
self._is_warmed_up = True
self._tools_warmed_up = True

def warm_up(self) -> None:
"""Warm up the tools and the underlying chat generator."""
self._warm_up_tools()
if hasattr(self.chat_generator, "warm_up"):
self.chat_generator.warm_up()

async def warm_up_async(self) -> None:
"""Warm up the tools and the underlying chat generator on the serving event loop."""
self._warm_up_tools()
if hasattr(self.chat_generator, "warm_up_async"):
await self.chat_generator.warm_up_async()
elif hasattr(self.chat_generator, "warm_up"):
self.chat_generator.warm_up()

def close(self) -> None:
"""Release the underlying chat generator's resources."""
if hasattr(self.chat_generator, "close"):
self.chat_generator.close()

async def close_async(self) -> None:
"""Release the underlying chat generator's async resources."""
if hasattr(self.chat_generator, "close_async"):
await self.chat_generator.close_async()
elif hasattr(self.chat_generator, "close"):
self.chat_generator.close()

def to_dict(self) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -828,8 +850,7 @@ def run(
- Any additional keys defined in the `state_schema`.
"""
agent_inputs = {"messages": messages, "streaming_callback": streaming_callback, **kwargs}
if not self._is_warmed_up:
self.warm_up()
self.warm_up()

exe_context = self._initialize_fresh_execution(
messages=messages,
Expand Down Expand Up @@ -903,8 +924,7 @@ async def run_async(
- Any additional keys defined in the `state_schema`.
"""
agent_inputs = {"messages": messages, "streaming_callback": streaming_callback, **kwargs}
if not self._is_warmed_up:
self.warm_up()
await self.warm_up_async()

exe_context = self._initialize_fresh_execution(
messages=messages,
Expand Down
60 changes: 48 additions & 12 deletions haystack/components/audio/whisper_remote.py
Comment thread
anakin87 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,50 @@ def __init__(
)
whisper_params["response_format"] = "json"
self.whisper_params = whisper_params
self.client = OpenAI(
api_key=api_key.resolve_value(),
organization=organization,
base_url=api_base_url,
http_client=init_http_client(self.http_client_kwargs, async_client=False),
)
self.async_client = AsyncOpenAI(
api_key=api_key.resolve_value(),
organization=organization,
base_url=api_base_url,
http_client=init_http_client(self.http_client_kwargs, async_client=True),
)

self.client: OpenAI | None = None
self.async_client: AsyncOpenAI | None = None

def _client_kwargs(self) -> dict[str, Any]:
return {
"api_key": self.api_key.resolve_value(),
"organization": self.organization,
"base_url": self.api_base_url,
}

def warm_up(self) -> None:
"""
Initializes the synchronous OpenAI client.
"""
if self.client is None:
self.client = OpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=False), **self._client_kwargs()
)

async def warm_up_async(self) -> None: # noqa: RUF029
"""
Initializes the asynchronous OpenAI client on the serving event loop.
"""
if self.async_client is None:
self.async_client = AsyncOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=True), **self._client_kwargs()
)

def close(self) -> None:
"""
Releases the synchronous OpenAI client.
"""
if self.client is not None:
self.client.close()
self.client = None

async def close_async(self) -> None:
"""
Releases the asynchronous OpenAI client.
"""
if self.async_client is not None:
await self.async_client.close()
self.async_client = None

def to_dict(self) -> dict[str, Any]:
"""
Expand Down Expand Up @@ -152,6 +184,7 @@ def run(self, sources: list[str | Path | ByteStream]) -> dict[str, Any]:
- `documents`: A list of documents, one document for each file.
The content of each document is the transcribed text.
"""
self.warm_up()
documents = []

for source in sources:
Expand All @@ -163,6 +196,7 @@ def run(self, sources: list[str | Path | ByteStream]) -> dict[str, Any]:
file = io.BytesIO(source.data)
file.name = str(source.meta["file_path"]) if "file_path" in source.meta else "__fallback__.wav"

assert self.client is not None # mypy: client is built by warm_up above
content = self.client.audio.transcriptions.create(file=file, model=self.model, **self.whisper_params)
doc = Document(content=content.text, meta=source.meta)
documents.append(doc)
Expand All @@ -184,6 +218,7 @@ async def run_async(self, sources: list[str | Path | ByteStream]) -> dict[str, A
- `documents`: A list of documents, one document for each file.
The content of each document is the transcribed text.
"""
await self.warm_up_async()
documents = []

for source in sources:
Expand All @@ -195,6 +230,7 @@ async def run_async(self, sources: list[str | Path | ByteStream]) -> dict[str, A
file = io.BytesIO(source.data)
file.name = str(source.meta["file_path"]) if "file_path" in source.meta else "__fallback__.wav"

assert self.async_client is not None # mypy: async_client is built by warm_up_async above
content = await self.async_client.audio.transcriptions.create(
file=file, model=self.model, **self.whisper_params
)
Expand Down
71 changes: 53 additions & 18 deletions haystack/components/embedders/azure_document_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,32 +137,67 @@ def __init__( # noqa: PLR0913 (too-many-arguments)
self.progress_bar = progress_bar
self.meta_fields_to_embed = meta_fields_to_embed or []
self.embedding_separator = embedding_separator
self.timeout = timeout if timeout is not None else float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
self.max_retries = max_retries if max_retries is not None else int(os.environ.get("OPENAI_MAX_RETRIES", "5"))
self.timeout = timeout
self.max_retries = max_retries
self.default_headers = default_headers or {}
self.azure_ad_token_provider = azure_ad_token_provider
self.http_client_kwargs = http_client_kwargs
self.raise_on_failure = raise_on_failure

client_args: dict[str, Any] = {
"api_version": api_version,
"azure_endpoint": azure_endpoint,
"azure_deployment": azure_deployment,
"azure_ad_token_provider": azure_ad_token_provider,
"api_key": api_key.resolve_value() if api_key is not None else None,
"azure_ad_token": azure_ad_token.resolve_value() if azure_ad_token is not None else None,
"organization": organization,
"timeout": self.timeout,
"max_retries": self.max_retries,
self.client: AzureOpenAI | None = None
self.async_client: AsyncAzureOpenAI | None = None

def _client_kwargs(self) -> dict[str, Any]:
timeout = self.timeout if self.timeout is not None else float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
max_retries = (
self.max_retries if self.max_retries is not None else int(os.environ.get("OPENAI_MAX_RETRIES", "5"))
)
return {
"api_version": self.api_version,
"azure_endpoint": self.azure_endpoint,
"azure_deployment": self.azure_deployment,
"azure_ad_token_provider": self.azure_ad_token_provider,
"api_key": self.api_key.resolve_value() if self.api_key is not None else None,
"azure_ad_token": self.azure_ad_token.resolve_value() if self.azure_ad_token is not None else None,
"organization": self.organization,
"timeout": timeout,
"max_retries": max_retries,
"default_headers": self.default_headers,
}

self.client = AzureOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_args
)
self.async_client = AsyncAzureOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_args
)
def warm_up(self) -> None:
"""
Initializes the synchronous AzureOpenAI client.
"""
if self.client is None:
self.client = AzureOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=False), **self._client_kwargs()
)

async def warm_up_async(self) -> None: # noqa: RUF029
"""
Initializes the asynchronous AzureOpenAI client on the serving event loop.
"""
if self.async_client is None:
self.async_client = AsyncAzureOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=True), **self._client_kwargs()
)

def close(self) -> None:
"""
Releases the synchronous AzureOpenAI client.
"""
if self.client is not None:
self.client.close()
self.client = None

async def close_async(self) -> None:
"""
Releases the asynchronous AzureOpenAI client.
"""
if self.async_client is not None:
await self.async_client.close()
self.async_client = None

def to_dict(self) -> dict[str, Any]:
"""
Expand Down
71 changes: 53 additions & 18 deletions haystack/components/embedders/azure_text_embedder.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,33 +117,68 @@ def __init__( # noqa: PLR0913
self.model = azure_deployment
self.dimensions = dimensions
self.organization = organization
self.timeout = timeout if timeout is not None else float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
self.max_retries = max_retries if max_retries is not None else int(os.environ.get("OPENAI_MAX_RETRIES", "5"))
self.timeout = timeout
self.max_retries = max_retries
self.prefix = prefix
self.suffix = suffix
self.default_headers = default_headers or {}
self.azure_ad_token_provider = azure_ad_token_provider
self.http_client_kwargs = http_client_kwargs

client_kwargs: dict[str, Any] = {
"api_version": api_version,
"azure_endpoint": azure_endpoint,
"azure_deployment": azure_deployment,
"azure_ad_token_provider": azure_ad_token_provider,
"api_key": api_key.resolve_value() if api_key is not None else None,
"azure_ad_token": azure_ad_token.resolve_value() if azure_ad_token is not None else None,
"organization": organization,
"timeout": self.timeout,
"max_retries": self.max_retries,
self.client: AzureOpenAI | None = None
self.async_client: AsyncAzureOpenAI | None = None

def _client_kwargs(self) -> dict[str, Any]:
timeout = self.timeout if self.timeout is not None else float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
max_retries = (
self.max_retries if self.max_retries is not None else int(os.environ.get("OPENAI_MAX_RETRIES", "5"))
)
return {
"api_version": self.api_version,
"azure_endpoint": self.azure_endpoint,
"azure_deployment": self.azure_deployment,
"azure_ad_token_provider": self.azure_ad_token_provider,
"api_key": self.api_key.resolve_value() if self.api_key is not None else None,
"azure_ad_token": self.azure_ad_token.resolve_value() if self.azure_ad_token is not None else None,
"organization": self.organization,
"timeout": timeout,
"max_retries": max_retries,
"default_headers": self.default_headers,
}

self.client = AzureOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs
)
self.async_client = AsyncAzureOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
)
def warm_up(self) -> None:
"""
Initializes the synchronous Azure OpenAI client.
"""
if self.client is None:
self.client = AzureOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=False), **self._client_kwargs()
)

async def warm_up_async(self) -> None: # noqa: RUF029
"""
Initializes the asynchronous Azure OpenAI client on the serving event loop.
"""
if self.async_client is None:
self.async_client = AsyncAzureOpenAI(
http_client=init_http_client(self.http_client_kwargs, async_client=True), **self._client_kwargs()
)

def close(self) -> None:
"""
Releases the synchronous Azure OpenAI client.
"""
if self.client is not None:
self.client.close()
self.client = None

async def close_async(self) -> None:
"""
Releases the asynchronous Azure OpenAI client.
"""
if self.async_client is not None:
await self.async_client.close()
self.async_client = None

def to_dict(self) -> dict[str, Any]:
"""
Expand Down
Loading
Loading