# Server-Sent Events (SSE) Pattern

Test agents that stream responses using the Server-Sent Events format. SSE uses `data:`-prefixed lines and ends with `data: [DONE]`. See Blackbox Testing for the testing philosophy.
## When to Use
- OpenAI-style streaming APIs
- EventSource-compatible endpoints
- Legacy systems using SSE
- Specific frameworks that chose SSE over chunked transfer
## SSE Format

Server-Sent Events send data in this format:

```
data: {"content": "Hello"}

data: {"content": " world"}

data: [DONE]
```

Each line starts with `data: `, followed by JSON or the completion marker `[DONE]`. Events are separated by blank lines, which is why the servers below write `\n\n` after every event.
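As a minimal sketch of producing this framing (framework-agnostic; the `sse_frame` helper is illustrative, not part of any library):

```python
import json


def sse_frame(payload: dict) -> bytes:
    """Frame one JSON payload as an SSE event: a "data: " line plus a blank line."""
    return f"data: {json.dumps(payload)}\n\n".encode("utf-8")


# sse_frame({"content": "Hello"}) == b'data: {"content": "Hello"}\n\n'
# The stream ends with the literal completion marker:
DONE_MARKER = b"data: [DONE]\n\n"
```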
## Complete Working Example

This example demonstrates:

- Creating a server that streams in SSE format with a real LLM
- Building an adapter that parses the SSE protocol
- Proper line buffering for incomplete chunks
- Handling the `[DONE]` completion marker
`test_testing_remote_agents_sse`

```python
# Source: https://github.com/langwatch/scenario/blob/main/python/examples/test_testing_remote_agents_sse.py
"""
Example: Testing an agent that streams OpenAI responses via SSE
The handler forwards OpenAI's native chunk format directly.
The adapter parses the SSE stream and extracts content from OpenAI chunks.
"""
import asyncio
import json
from aiohttp import web
import aiohttp
import pytest
import pytest_asyncio
import scenario
from openai import AsyncOpenAI
# Base URL for the test server (set during server startup)
base_url = ""
class SSEAgentAdapter(scenario.AgentAdapter):
"""
Adapter for testing agents that stream OpenAI responses via SSE.
Parses SSE stream, extracts content from OpenAI chunk format, and returns complete response.
"""
async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
# Request SSE stream from your agent
async with aiohttp.ClientSession() as session:
async with session.post(
f"{base_url}/chat/sse",
headers={
"Accept": "text/event-stream", # Indicate we expect SSE format
"Content-Type": "application/json",
},
json={"messages": input.messages},
) as response:
full_response = ""
buffer = ""
# Read stream chunk by chunk
async for chunk in response.content.iter_any():
# Decode chunk and add to buffer
buffer += chunk.decode("utf-8")
# Process complete lines
lines = buffer.split("\n")
buffer = (
lines[-1] if lines else ""
) # Keep incomplete line in buffer
# Parse SSE format: "data: {...}\n"
for line in lines[:-1]:
if line.startswith("data: "):
data = line[6:] # Remove "data: " prefix
if data != "[DONE]":
try:
# Parse OpenAI chunk structure
                                    parsed = json.loads(data)
                                    content = (
                                        parsed.get("choices", [{}])[0]
                                        .get("delta", {})
                                        .get("content")
                                    )
if content:
full_response += content
except (json.JSONDecodeError, KeyError, IndexError):
pass
# Return complete response after stream ends
return full_response
# OpenAI client for LLM
client = AsyncOpenAI()
async def sse_handler(request: web.Request) -> web.StreamResponse:
"""
HTTP endpoint that forwards OpenAI streaming chunks in SSE format.
"""
data = await request.json()
messages = data["messages"]
# Determine last user message content
last_msg = messages[-1]
content = last_msg["content"]
if not isinstance(content, str):
content = ""
# Set up SSE response headers
response = web.StreamResponse()
response.headers["Content-Type"] = "text/event-stream"
response.headers["Cache-Control"] = "no-cache"
response.headers["Connection"] = "keep-alive"
await response.prepare(request)
# Stream response using real LLM
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are a helpful weather assistant. Provide brief, friendly responses. Pretend you have access to weather data. Pretend like you have access to a weather API and make up the weather.",
},
{"role": "user", "content": content},
],
temperature=0.7,
stream=True,
)
# Forward OpenAI chunks in SSE format
async for chunk in stream:
chunk_dict = chunk.model_dump()
await response.write(f"data: {json.dumps(chunk_dict)}\n\n".encode("utf-8"))
# Send completion marker
await response.write(b"data: [DONE]\n\n")
await response.write_eof()
return response
@pytest_asyncio.fixture
async def test_server():
"""
Start a test HTTP server before tests and shut it down after.
This server simulates a deployed agent endpoint with SSE format.
"""
global base_url
# Create web application
app = web.Application()
app.router.add_post("/chat/sse", sse_handler)
# Start server on random available port
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "localhost", 0)
await site.start()
# Get the actual port assigned
server = site._server
assert server is not None
port = server.sockets[0].getsockname()[1] # type: ignore[union-attr]
base_url = f"http://localhost:{port}"
yield
# Cleanup: stop server
await runner.cleanup()
@pytest.mark.asyncio
async def test_sse_response(test_server):
"""
Test agent that streams OpenAI responses via SSE.
Verifies adapter parses OpenAI chunks and extracts complete response.
"""
result = await scenario.run(
name="SSE weather response",
description="User asks about weather and receives SSE-formatted stream",
agents=[
scenario.UserSimulatorAgent(model="openai/gpt-4o-mini"),
SSEAgentAdapter(),
scenario.JudgeAgent(
model="openai/gpt-4o-mini",
criteria=[
"Agent should provide weather information",
"Response should be complete and coherent",
],
),
],
script=[
scenario.user("What's the weather like in Tokyo today?"),
scenario.agent(),
scenario.judge(),
],
set_id="python-examples",
)
    assert result.success
```
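For reference, each streamed OpenAI chunk that the adapter parses has roughly this shape (abbreviated; consult OpenAI's streaming documentation for the full schema):

```python
# Abbreviated shape of one OpenAI streaming chunk (most fields omitted):
chunk = {
    "object": "chat.completion.chunk",
    "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": None}],
}

# The adapter's extraction path:
content = chunk.get("choices", [{}])[0].get("delta", {}).get("content")  # "Hello"
```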
## Key Points

- SSE headers: `Content-Type: text/event-stream` and `Cache-Control: no-cache`
- Line buffering: handle incomplete lines across chunks
- Parse the `data: ` prefix: extract the JSON after the prefix
- `[DONE]` marker: signals the end of the stream
- Error handling: skip malformed JSON lines gracefully
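To eyeball the raw stream while debugging, you can print it directly. A minimal sketch, assuming the example server above is running at `base_url` (the `dump_sse` helper is illustrative, not part of the scenario library):

```python
import aiohttp


async def dump_sse(base_url: str) -> None:
    """Print raw SSE lines from the example endpoint."""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"{base_url}/chat/sse",
            headers={"Accept": "text/event-stream"},
            json={"messages": [{"role": "user", "content": "Weather in Tokyo?"}]},
        ) as response:
            async for raw_line in response.content:  # aiohttp yields line by line
                print(raw_line.decode("utf-8"), end="")
```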
## Adapter Pattern

The adapter parses the SSE format with proper buffering:

```typescript
const sseAdapter: AgentAdapter = {
role: AgentRole.AGENT,
call: async (input) => {
const response = await fetch(`${baseUrl}/chat/sse`, {
method: "POST",
headers: {
Accept: "text/event-stream", // Request SSE format
"Content-Type": "application/json",
},
body: JSON.stringify({ messages: input.messages }),
});
let fullResponse = "";
const reader = response.body?.getReader();
const decoder = new TextDecoder();
let buffer = ""; // Buffer for incomplete lines
if (reader) {
while (true) {
const { done, value } = await reader.read();
if (done) break;
// Add to buffer
buffer += decoder.decode(value, { stream: true });
// Process complete lines
const lines = buffer.split("\n");
buffer = lines.pop() || ""; // Keep incomplete line
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data !== "[DONE]") {
try {
                const parsed = JSON.parse(data);
                if (parsed.content) fullResponse += parsed.content;
} catch (e) {
// Skip malformed JSON
}
}
}
}
}
}
return fullResponse;
},
};
```
## Line Buffering

SSE chunks may split lines mid-way. The adapter handles this by:

- Maintaining a buffer: incomplete lines stay in the buffer
- Splitting on newlines: only complete lines are processed
- Keeping the last element: `pop()` leaves the incomplete tail for the next chunk

Without buffering, you might try to parse a partial line:

```
Chunk 1: "data: {\"content\""
Chunk 2: ": \"Hello\"}\n"
```

With buffering:

```
Buffer after chunk 1: "data: {\"content\""
Buffer after chunk 2: "data: {\"content\": \"Hello\"}\n" → complete line parsed correctly
```
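The same logic in isolation, as a minimal Python sketch (the `feed` helper is illustrative, not part of the scenario library):

```python
def feed(buffer: str, chunk: str) -> tuple[str, list[str]]:
    """Append a chunk to the buffer; return (remaining buffer, complete lines)."""
    buffer += chunk
    lines = buffer.split("\n")
    return lines[-1], lines[:-1]  # the last element may be an incomplete line


buffer = ""
buffer, lines = feed(buffer, 'data: {"content"')  # lines == [] — nothing complete yet
buffer, lines = feed(buffer, ': "Hello"}\n')      # lines == ['data: {"content": "Hello"}']
```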
## Testing Your Own SSE Agent

```typescript
const mySSEAdapter: AgentAdapter = {
role: AgentRole.AGENT,
call: async (input) => {
const response = await fetch("https://my-agent.com/sse", {
method: "POST",
headers: { Accept: "text/event-stream" },
body: JSON.stringify({ query: input.messages }),
});
let result = "";
const reader = response.body?.getReader();
const decoder = new TextDecoder();
let buffer = "";
if (reader) {
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (line.startsWith("data: ")) {
const data = line.slice(6);
if (data !== "[DONE]") {
              try {
                const parsed = JSON.parse(data);
                // Adjust the field names to match your API's chunk shape
                result += parsed.text ?? parsed.content ?? parsed.delta ?? "";
              } catch {
                // Skip malformed JSON
              }
}
}
}
}
}
return result;
},
};
```
## See Also
- Streaming Pattern - For chunked transfer encoding without SSE format
- JSON Pattern - For complete non-streaming responses
- Stateful Pattern - For server-side conversation state
- Testing Remote Agents Overview - All HTTP adapter patterns
- Blackbox Testing - Testing philosophy
- Agent Integration - Core adapter interface