Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 88 additions & 41 deletions src/assets/__tests__/__snapshots__/assets.snapshot.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -3417,21 +3417,39 @@ agent = Agent(
)


# Session and Runner
async def setup_session_and_runner(user_id, session_id):
ensure_credentials_loaded()
session_service = InMemorySessionService()
session = await session_service.create_session(
# Module-level session service and runner (preserves history across invocations)
_session_service = InMemorySessionService()
_runner = None


def get_or_create_runner():
global _runner
if _runner is None:
ensure_credentials_loaded()
_runner = Runner(
agent=agent,
app_name=APP_NAME,
session_service=_session_service,
)
return _runner


async def get_or_create_session(user_id, session_id):
session = await _session_service.get_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
runner = Runner(agent=agent, app_name=APP_NAME, session_service=session_service)
return session, runner
if session is None:
session = await _session_service.create_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
return session


# Agent Interaction
async def call_agent_async(query, user_id, session_id):
content = types.Content(role="user", parts=[types.Part(text=query)])
session, runner = await setup_session_and_runner(user_id, session_id)
runner = get_or_create_runner()
session = await get_or_create_session(user_id, session_id)
events = runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
)
Expand Down Expand Up @@ -3718,6 +3736,7 @@ exports[`Assets Directory Snapshots > Python framework assets > python/python/ht
from typing import Any

from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}}
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
from langchain.tools import tool
{{#if hasConfigBundle}}
Expand Down Expand Up @@ -3765,6 +3784,9 @@ def add_numbers(a: int, b: int) -> int:
# Define a collection of tools used by the model
tools = [add_numbers]

# Module-level checkpointer preserves conversation history across invocations
_checkpointer = InMemorySaver()

{{#if sessionStorageMountPath}}
SESSION_STORAGE_PATH = "{{sessionStorageMountPath}}"

Expand Down Expand Up @@ -3857,29 +3879,44 @@ async def invoke(payload, context):
if mcp_client:
mcp_tools = await mcp_client.get_tools()

# Define the agent using create_react_agent
# Define the agent using create_react_agent (checkpointer is shared across invocations)
{{#if hasConfigBundle}}
graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT)
graph = create_react_agent(
get_or_create_model(),
tools=mcp_tools + tools,
prompt=DEFAULT_SYSTEM_PROMPT,
checkpointer=_checkpointer,
)
callback = ConfigBundleCallback()

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")
log.info(f"Agent input: {prompt}")

# Run the agent with config bundle callback
# Run the agent with config bundle callback (checkpointer auto-loads/saves history per session)
result = await graph.ainvoke(
{"messages": [HumanMessage(content=prompt)]},
config={"callbacks": [callback]},
config={"callbacks": [callback], "configurable": {"thread_id": session_id}},
)
{{else}}
graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT)
graph = create_react_agent(
get_or_create_model(),
tools=mcp_tools + tools,
prompt=DEFAULT_SYSTEM_PROMPT,
checkpointer=_checkpointer,
)

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")
log.info(f"Agent input: {prompt}")

# Run the agent
result = await graph.ainvoke({"messages": [HumanMessage(content=prompt)]})
# Run the agent (checkpointer auto-loads/saves history per session)
result = await graph.ainvoke(
{"messages": [HumanMessage(content=prompt)]},
config={"configurable": {"thread_id": session_id}},
)
{{/if}}

# Return result
Expand Down Expand Up @@ -4242,7 +4279,8 @@ Thumbs.db

exports[`Assets Directory Snapshots > Python framework assets > python/python/http/openaiagents/base/main.py should match snapshot 1`] = `
"import os
from agents import Agent, Runner, function_tool
from functools import lru_cache
from agents import Agent, Runner, SQLiteSession, function_tool
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from model.load import load_model
{{#if hasGateway}}
Expand Down Expand Up @@ -4340,8 +4378,17 @@ You have persistent storage at {{sessionStorageMountPath}}. Use file tools to re
{{/if}}
"""


# Caches up to 128 active sessions; LRU eviction silently resets history for
# the oldest session. For production use, replace with a durable session store
# (e.g. SQLiteSession with a file path).
@lru_cache(maxsize=128)
def get_session(session_id):
return SQLiteSession(session_id)


# Define the agent execution
async def main(query):
async def main(query, session):
ensure_credentials_loaded()
try:
{{#if hasGateway}}
Expand All @@ -4353,7 +4400,7 @@ async def main(query):
mcp_servers=mcp_servers,
tools=tools
)
result = await Runner.run(agent, query)
result = await Runner.run(agent, query, session=session)
return result
else:
agent = Agent(
Expand All @@ -4363,7 +4410,7 @@ async def main(query):
mcp_servers=[],
tools=tools
)
result = await Runner.run(agent, query)
result = await Runner.run(agent, query, session=session)
return result
{{else}}
if mcp_servers:
Expand All @@ -4376,7 +4423,7 @@ async def main(query):
mcp_servers=active_servers,
tools=tools
)
result = await Runner.run(agent, query)
result = await Runner.run(agent, query, session=session)
return result
else:
agent = Agent(
Expand All @@ -4386,7 +4433,7 @@ async def main(query):
mcp_servers=[],
tools=tools
)
result = await Runner.run(agent, query)
result = await Runner.run(agent, query, session=session)
return result
{{/if}}
except Exception as e:
Expand All @@ -4400,9 +4447,11 @@ async def invoke(payload, context):

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")
session = get_session(session_id)

# Run the agent
result = await main(prompt)
# Run the agent (session automatically loads/saves conversation history)
result = await main(prompt, session)

# Return result
return {"result": result.final_output}
Expand Down Expand Up @@ -4655,7 +4704,8 @@ Thumbs.db"
`;

exports[`Assets Directory Snapshots > Python framework assets > python/python/http/strands/base/main.py should match snapshot 1`] = `
"from typing import Any
"from functools import lru_cache
from typing import Any

from strands import Agent, tool
{{#if hasConfigBundle}}
Expand Down Expand Up @@ -4821,26 +4871,26 @@ def agent_factory():
return get_or_create_agent
get_or_create_agent = agent_factory()
{{else}}
# Caches up to 128 active sessions; LRU eviction silently resets history for
# the oldest session. For production use, replace with a durable session store
# (e.g. Strands FileSessionManager).
{{#if hasConfigBundle}}
def create_agent():
@lru_cache(maxsize=128)
def get_or_create_agent(session_id):
return Agent(
model=load_model(),
system_prompt=DEFAULT_SYSTEM_PROMPT,
tools=tools,
hooks=[ConfigBundleHook()],
)
{{else}}
_agent = None

def get_or_create_agent():
global _agent
if _agent is None:
_agent = Agent(
model=load_model(),
system_prompt=DEFAULT_SYSTEM_PROMPT,
tools=tools
)
return _agent
@lru_cache(maxsize=128)
def get_or_create_agent(session_id):
return Agent(
model=load_model(),
system_prompt=DEFAULT_SYSTEM_PROMPT,
tools=tools
)
{{/if}}
{{/if}}

Expand All @@ -4854,11 +4904,8 @@ async def invoke(payload, context):
user_id = getattr(context, 'user_id', 'default-user')
agent = get_or_create_agent(session_id, user_id)
{{else}}
{{#if hasConfigBundle}}
agent = create_agent()
{{else}}
agent = get_or_create_agent()
{{/if}}
session_id = getattr(context, 'session_id', 'default-session')
agent = get_or_create_agent(session_id)
{{/if}}

# Execute and format response
Expand Down
34 changes: 26 additions & 8 deletions src/assets/python/http/googleadk/base/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,39 @@ def ensure_credentials_loaded():
)


# Session and Runner
async def setup_session_and_runner(user_id, session_id):
ensure_credentials_loaded()
session_service = InMemorySessionService()
session = await session_service.create_session(
# Module-level session service and runner (preserves history across invocations)
_session_service = InMemorySessionService()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbounded in-memory session growth (DoS / OOM in long-lived containers).

InMemorySessionService is a plain in-memory store with no built-in eviction. With this PR it is now process-lifetime scoped, and every distinct (user_id, session_id) invocation creates a new entry that is never freed. An attacker (or a noisy client) can rotate session_ids and grow the process's heap until it OOMs — the runtime container typically has a fixed memory budget, so this is a denial-of-service vector.

The sibling Strands and OpenAIAgents templates in this same PR explicitly cap their session caches with @lru_cache(maxsize=128) and call out LRU eviction in a comment. The googleadk template (and the langchain template, with InMemorySaver()) should apply the same mitigation, or call out durable-store guidance just as prominently. As written, these two templates are noticeably less safe to ship into production than the other two.

_runner = None


def get_or_create_runner():
global _runner
if _runner is None:
ensure_credentials_loaded()
_runner = Runner(
agent=agent,
app_name=APP_NAME,
session_service=_session_service,
)
return _runner


async def get_or_create_session(user_id, session_id):
session = await _session_service.get_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
runner = Runner(agent=agent, app_name=APP_NAME, session_service=session_service)
return session, runner
if session is None:
session = await _session_service.create_session(
app_name=APP_NAME, user_id=user_id, session_id=session_id
)
return session


# Agent Interaction
async def call_agent_async(query, user_id, session_id):
content = types.Content(role="user", parts=[types.Part(text=query)])
session, runner = await setup_session_and_runner(user_id, session_id)
runner = get_or_create_runner()
session = await get_or_create_session(user_id, session_id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cross-user conversation access via client-supplied user_id.

user_id here flows from the entrypoint, where it is read from payload (caller-controlled): user_id = payload.get("user_id", "default_user"). Before this change that was harmless — every invocation built a fresh InMemorySessionService — but the new module-level _session_service now persists history keyed by (app_name, user_id, session_id) across invocations.

That means any caller can now set user_id to another user's identifier and, with a known/guessable session_id, attach to that user's stored conversation: reading prior messages on the next turn and appending new ones. The Strands template avoids this by sourcing user_id from context (getattr(context, 'user_id', 'default-user')); this template should do the same — read user_id from context, not payload, before it is used to scope persistent session state.

events = runner.run_async(
user_id=user_id, session_id=session.id, new_message=content
)
Expand Down
33 changes: 26 additions & 7 deletions src/assets/python/http/langchain_langgraph/base/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Any

from langchain_core.messages import HumanMessage{{#if hasConfigBundle}}, SystemMessage{{/if}}
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.prebuilt import create_react_agent
from langchain.tools import tool
{{#if hasConfigBundle}}
Expand Down Expand Up @@ -49,6 +50,9 @@ def add_numbers(a: int, b: int) -> int:
# Define a collection of tools used by the model
tools = [add_numbers]

# Module-level checkpointer preserves conversation history across invocations
_checkpointer = InMemorySaver()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbounded in-memory checkpoint growth (DoS / OOM in long-lived containers).

InMemorySaver is a process-lifetime, unbounded dict-style store: every distinct thread_id (which this PR derives from context.session_id) creates checkpoint entries that are never evicted. In a long-running runtime container, a client that rotates session_id (or any caller producing many sessions over time) can grow the heap until the process OOMs — a denial-of-service vector against the runtime.

The sibling Strands and OpenAIAgents templates in this same PR explicitly cap their session caches with @lru_cache(maxsize=128) and document the trade-off in a comment. This template should either bound _checkpointer similarly (or wrap session→checkpointer mapping in an LRU), or include the same prominent guidance pointing users at a durable backend (e.g. SqliteSaver/PostgresSaver) before deploying. As written, this template is noticeably less safe than its peers in this PR.


{{#if sessionStorageMountPath}}
SESSION_STORAGE_PATH = "{{sessionStorageMountPath}}"

Expand Down Expand Up @@ -141,29 +145,44 @@ async def invoke(payload, context):
if mcp_client:
mcp_tools = await mcp_client.get_tools()

# Define the agent using create_react_agent
# Define the agent using create_react_agent (checkpointer is shared across invocations)
{{#if hasConfigBundle}}
graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT)
graph = create_react_agent(
get_or_create_model(),
tools=mcp_tools + tools,
prompt=DEFAULT_SYSTEM_PROMPT,
checkpointer=_checkpointer,
)
callback = ConfigBundleCallback()

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")
log.info(f"Agent input: {prompt}")

# Run the agent with config bundle callback
# Run the agent with config bundle callback (checkpointer auto-loads/saves history per session)
result = await graph.ainvoke(
{"messages": [HumanMessage(content=prompt)]},
config={"callbacks": [callback]},
config={"callbacks": [callback], "configurable": {"thread_id": session_id}},
)
{{else}}
graph = create_react_agent(get_or_create_model(), tools=mcp_tools + tools, prompt=DEFAULT_SYSTEM_PROMPT)
graph = create_react_agent(
get_or_create_model(),
tools=mcp_tools + tools,
prompt=DEFAULT_SYSTEM_PROMPT,
checkpointer=_checkpointer,
)

# Process the user prompt
prompt = payload.get("prompt", "What can you help me with?")
session_id = getattr(context, "session_id", "default-session")
log.info(f"Agent input: {prompt}")

# Run the agent
result = await graph.ainvoke({"messages": [HumanMessage(content=prompt)]})
# Run the agent (checkpointer auto-loads/saves history per session)
result = await graph.ainvoke(
{"messages": [HumanMessage(content=prompt)]},
config={"configurable": {"thread_id": session_id}},
)
{{/if}}

# Return result
Expand Down
Loading
Loading