diff --git a/.changeset/clever-bears-accept.md b/.changeset/clever-bears-accept.md new file mode 100644 index 00000000..80474ba7 --- /dev/null +++ b/.changeset/clever-bears-accept.md @@ -0,0 +1,5 @@ +--- +'@e2b/code-interpreter-template': patch +--- + +interrupt kernel execution on client disconnect diff --git a/js/tests/interrupt.test.ts b/js/tests/interrupt.test.ts new file mode 100644 index 00000000..be4084b0 --- /dev/null +++ b/js/tests/interrupt.test.ts @@ -0,0 +1,25 @@ +import { expect } from 'vitest' + +import { sandboxTest, wait } from './setup' + +sandboxTest( + 'subsequent execution works after client timeout', + async ({ sandbox }) => { + // Start a long-running execution with a short timeout. + // This simulates a client disconnect: the SDK aborts the connection, + // which should trigger the server to interrupt the kernel (#213). + await expect( + sandbox.runCode('import time; time.sleep(300)', { timeoutMs: 3_000 }) + ).rejects.toThrow() + + // Wait for the server to detect the disconnect (via keepalive write + // failure) and interrupt the kernel. + await wait(5_000) + + // Run a simple execution. Without the kernel interrupt fix, this would + // block behind the still-running sleep(30) and time out. + const result = await sandbox.runCode('1 + 1', { timeoutMs: 10_000 }) + expect(result.text).toEqual('2') + }, + 60_000 +) diff --git a/python/tests/async/test_async_interrupt.py b/python/tests/async/test_async_interrupt.py new file mode 100644 index 00000000..ad8a9a0e --- /dev/null +++ b/python/tests/async/test_async_interrupt.py @@ -0,0 +1,25 @@ +import asyncio + +import pytest + +from e2b import TimeoutException +from e2b_code_interpreter.code_interpreter_async import AsyncSandbox + + +async def test_subsequent_execution_works_after_client_timeout( + async_sandbox: AsyncSandbox, +): + # Start a long-running execution with a short timeout. + # This simulates a client disconnect: the SDK closes the connection, + # which should trigger the server to interrupt the kernel (#213). + with pytest.raises(TimeoutException): + await async_sandbox.run_code("import time; time.sleep(300)", timeout=3) + + # Wait for the server to detect the disconnect (via keepalive write + # failure) and interrupt the kernel. + await asyncio.sleep(5) + + # Run a simple execution. Without the kernel interrupt fix, this would + # block behind the still-running sleep(30) and time out. + result = await async_sandbox.run_code("1 + 1", timeout=10) + assert result.text == "2" diff --git a/python/tests/sync/test_interrupt.py b/python/tests/sync/test_interrupt.py new file mode 100644 index 00000000..d4c5ff15 --- /dev/null +++ b/python/tests/sync/test_interrupt.py @@ -0,0 +1,23 @@ +import time + +import pytest + +from e2b import TimeoutException +from e2b_code_interpreter.code_interpreter_sync import Sandbox + + +def test_subsequent_execution_works_after_client_timeout(sandbox: Sandbox): + # Start a long-running execution with a short timeout. + # This simulates a client disconnect: the SDK closes the connection, + # which should trigger the server to interrupt the kernel (#213). + with pytest.raises(TimeoutException): + sandbox.run_code("import time; time.sleep(300)", timeout=3) + + # Wait for the server to detect the disconnect (via keepalive write + # failure) and interrupt the kernel. + time.sleep(5) + + # Run a simple execution. Without the kernel interrupt fix, this would + # block behind the still-running sleep(30) and time out. + result = sandbox.run_code("1 + 1", timeout=10) + assert result.text == "2" diff --git a/template/server/messaging.py b/template/server/messaging.py index 0b151f8e..d2492797 100644 --- a/template/server/messaging.py +++ b/template/server/messaging.py @@ -4,6 +4,8 @@ import uuid import asyncio +import httpx + from asyncio import Queue from typing import ( Dict, @@ -26,6 +28,7 @@ OutputType, UnexpectedEndOfExecution, ) +from consts import JUPYTER_BASE_URL from errors import ExecutionError from envs import get_envs @@ -33,6 +36,7 @@ MAX_RECONNECT_RETRIES = 3 PING_TIMEOUT = 30 +KEEPALIVE_INTERVAL = 5 # seconds between keepalive pings during streaming class Execution: @@ -97,6 +101,22 @@ async def connect(self): name="receive_message", ) + async def interrupt(self): + """Interrupt the current kernel execution via the Jupyter REST API.""" + try: + async with httpx.AsyncClient() as client: + response = await client.post( + f"{JUPYTER_BASE_URL}/api/kernels/{self.context_id}/interrupt" + ) + if response.is_success: + logger.info(f"Kernel {self.context_id} interrupted successfully") + else: + logger.error( + f"Failed to interrupt kernel {self.context_id}: {response.status_code}" + ) + except Exception as e: + logger.error(f"Error interrupting kernel {self.context_id}: {e}") + def _get_execute_request( self, msg_id: str, code: Union[str, StrictStr], background: bool ) -> str: @@ -238,8 +258,24 @@ async def _cleanup_env_vars(self, env_vars: Dict[StrictStr, str]): async def _wait_for_result(self, message_id: str): queue = self._executions[message_id].queue + # Use a timeout on queue.get() to periodically send keepalives. + # Without keepalives, the generator blocks indefinitely waiting for + # kernel output. If the client silently disappears (e.g. network + # failure), uvicorn can only detect the broken connection when it + # tries to write — so we force a write every KEEPALIVE_INTERVAL + # seconds. This ensures timely disconnect detection and kernel + # interrupt for abandoned executions (see #213). while True: - output = await queue.get() + try: + output = await asyncio.wait_for(queue.get(), timeout=KEEPALIVE_INTERVAL) + except asyncio.TimeoutError: + # Yield a keepalive so Starlette writes to the socket. + # If the client has disconnected, the write fails and + # uvicorn delivers http.disconnect, which cancels this + # generator via CancelledError. + yield {"type": "keepalive"} + continue + if output.type == OutputType.END_OF_EXECUTION: break @@ -362,11 +398,26 @@ async def execute( ) await execution.queue.put(UnexpectedEndOfExecution()) - # Stream the results - async for item in self._wait_for_result(message_id): - yield item - - del self._executions[message_id] + # Stream the results. + # If the client disconnects (Starlette cancels the task), we + # interrupt the kernel so the next execution isn't blocked (#213). + try: + async for item in self._wait_for_result(message_id): + yield item + except (asyncio.CancelledError, GeneratorExit): + logger.warning( + f"Client disconnected during execution ({message_id}), interrupting kernel" + ) + # Shield the interrupt from the ongoing cancellation so + # the HTTP request to the kernel actually completes. + try: + await asyncio.shield(self.interrupt()) + except asyncio.CancelledError: + pass + raise + finally: + if message_id in self._executions: + del self._executions[message_id] # Clean up env vars in a separate request after the main code has run if env_vars: