Stateful Pattern with Thread ID
Test agents that maintain conversation history server-side using thread identifiers. The adapter sends only the latest message plus a thread ID, and the server looks up the full history. See Blackbox Testing for the testing philosophy.
When to Use
- Server manages conversation state
- Multi-device sync (mobile, web, desktop)
- Reduced bandwidth (only send new messages)
- Backend controls conversation storage
- Database-backed chat history
Stateless vs. Stateful
Stateless (Full History)
// Client sends ALL messages every time
POST /chat
{ messages: [msg1, msg2, msg3, msg4] }
Stateful (Thread ID)
// Client sends only NEW message + thread ID
POST /chat
{ message: "latest message", threadId: "thread-123" }
// Server looks up [msg1, msg2, msg3] using threadId
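The bandwidth difference is easy to see by serializing both request bodies. This is a standalone sketch; the 50-turn conversation and the payload values are made up for illustration:

```python
import json

# Hypothetical 50-turn conversation: the stateless client resends everything.
stateless_body = {
    "messages": [{"role": "user", "content": f"message {i}"} for i in range(50)]
}

# The stateful client sends only the newest message plus a thread ID.
stateful_body = {"message": "message 49", "threadId": "thread-123"}

# The stateless payload grows with every turn; the stateful one stays constant.
print(len(json.dumps(stateless_body)), "vs", len(json.dumps(stateful_body)))
```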
Complete Working Example
This example demonstrates:
- Server maintaining conversation history in a Map
- Adapter sending only latest message + thread ID
- Real LLM using full conversation context
- Multi-turn scenario showing context retention
test_testing_remote_agents_stateful
# Source: https://github.com/langwatch/scenario/blob/main/python/examples/test_testing_remote_agents_stateful.py
"""
Example: Testing an agent that maintains stateful conversations
This test demonstrates handling agents that maintain conversation history server-side
using thread identifiers. The adapter sends only the latest message and thread ID,
while the server maintains the full conversation context.
"""
from aiohttp import web
import aiohttp
import pytest
import pytest_asyncio
import scenario
from openai import AsyncOpenAI
from typing import Dict, List, Any
# Base URL for the test server (set during server startup)
base_url = ""
class StatefulAgentAdapter(scenario.AgentAdapter):
"""
Adapter for testing stateful agents that maintain server-side conversation history.
This adapter:
1. Extracts only the latest message (not full history)
2. Sends the message along with thread ID
3. Server uses thread ID to look up and maintain full history
4. Returns the agent's response
"""
async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
# Extract the most recent user message content
last_message = input.messages[-1]
content = last_message["content"] # type: ignore[typeddict-item]
# For this example, we assume content is a string
if not isinstance(content, str):
raise ValueError("This example only handles string content")
# Send only new message + thread ID
async with aiohttp.ClientSession() as session:
async with session.post(
f"{base_url}/chat",
json={
"message": content,
"threadId": input.thread_id,
},
) as response:
result = await response.json()
return result["response"]
# OpenAI client for LLM
client = AsyncOpenAI()
# Server-side conversation storage (in production, use a database)
conversations: Dict[str, List[Any]] = {}
async def stateful_handler(request: web.Request) -> web.Response:
"""
HTTP endpoint that maintains conversation history using thread ID.
The server:
1. Receives only the latest message and thread ID
2. Looks up full conversation history using thread ID
3. Generates response with complete context
4. Stores updated history
"""
data = await request.json()
message = data["message"]
thread_id = data["threadId"]
# Retrieve or initialize conversation history
history = conversations.get(thread_id, [])
# Add user message to history
history.append({"role": "user", "content": message})
# Generate response with FULL history
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are a helpful weather assistant. Provide brief, friendly responses. Pretend like you have access to a weather API and make up the weather.",
},
*history, # Include full conversation history
],
temperature=0.7,
)
assistant_message = response.choices[0].message.content
# Add assistant response to history
if assistant_message is not None:
history.append({"role": "assistant", "content": assistant_message})
# Store updated history
conversations[thread_id] = history
# Return only the new response
return web.json_response({"response": assistant_message})
@pytest_asyncio.fixture
async def test_server():
"""
Start a test HTTP server before tests and shut it down after.
This server simulates a deployed agent endpoint with stateful conversation management.
"""
global base_url, conversations
# Clear conversations before each test
conversations.clear()
# Create web application
app = web.Application()
app.router.add_post("/chat", stateful_handler)
# Start server on random available port
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "localhost", 0)
await site.start()
# Get the actual port assigned
server = site._server
assert server is not None
port = server.sockets[0].getsockname()[1] # type: ignore[union-attr]
base_url = f"http://localhost:{port}"
yield
# Cleanup: stop server and clear conversations
await runner.cleanup()
conversations.clear()
@pytest.mark.asyncio
async def test_stateful_conversation(test_server):
"""
Test agent that maintains conversation state using thread ID.
This test verifies:
- Adapter sends only latest message + thread ID
- Server maintains full conversation history
- Agent remembers context from previous turns
- Follow-up questions work correctly
- Multi-turn conversation flows naturally
"""
result = await scenario.run(
name="Stateful weather conversation",
description="Agent remembers previous turns using thread ID",
agents=[
scenario.UserSimulatorAgent(model="openai/gpt-4o-mini"),
StatefulAgentAdapter(),
scenario.JudgeAgent(
model="openai/gpt-4o-mini",
criteria=[
"Agent should remember context from message to message",
"Agent should provide relevant follow-up information",
],
),
],
script=[
scenario.user("What's the weather like in London?"),
scenario.agent(),
scenario.user("Is that normal weather here?"),
scenario.agent(),
scenario.judge(),
],
set_id="python-examples",
)
assert result.success
Key Points
- Server stores history: Uses thread ID to maintain conversation state
- Adapter sends minimal data: Only latest message + thread ID
- LLM gets full context: Server provides complete history to LLM
- Multi-turn context: Follow-up questions work because server remembers
- Thread management: Server creates/updates history per thread ID
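The per-thread bookkeeping the server performs can be factored into a small store. This is a hypothetical helper (ThreadStore is not part of the scenario library) sketching the look-up/append/save cycle the handler runs on every request:

```python
from typing import Dict, List

class ThreadStore:
    """In-memory history keyed by thread ID (swap for a database in production)."""

    def __init__(self) -> None:
        self._conversations: Dict[str, List[dict]] = {}

    def history(self, thread_id: str) -> List[dict]:
        # Existing history, or an empty list for a new thread.
        return self._conversations.get(thread_id, [])

    def append(self, thread_id: str, role: str, content: str) -> List[dict]:
        # Look up, extend, and store back in one step.
        history = self._conversations.setdefault(thread_id, [])
        history.append({"role": role, "content": content})
        return history

store = ThreadStore()
store.append("thread-123", "user", "What's the weather like in London?")
store.append("thread-123", "assistant", "Rainy, 12°C.")
store.append("thread-123", "user", "Is that normal weather here?")

# The third turn sees the full context: three messages on this thread.
print(len(store.history("thread-123")))  # → 3
```

Keeping this logic behind a small interface makes the later swap to database storage a one-class change.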
Adapter Pattern
The adapter extracts the latest message and includes the thread ID:
const statefulAdapter: AgentAdapter = {
role: AgentRole.AGENT,
call: async (input) => {
// Extract only the latest message
const lastMessage = input.messages[input.messages.length - 1];
const content =
typeof lastMessage.content === "string"
? lastMessage.content
: (lastMessage.content.find((part) => part.type === "text") as any)
?.text || "";
// Send only new message + thread ID
const response = await fetch(`${baseUrl}/chat`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
message: content, // Only latest message
threadId: input.threadId, // Server uses this to look up history
}),
});
return (await response.json()).response;
},
};
Server Implementation Pattern
The server maintains history by thread ID:
// Server-side conversation storage
const conversations = new Map<string, CoreMessage[]>();
app.post("/chat", async (req, res) => {
const { message, threadId } = req.body;
// Retrieve or initialize conversation history
let history = conversations.get(threadId) || [];
// Add user message
history.push({ role: "user", content: message });
// Generate response with FULL history
const result = await generateText({
model: openai("gpt-4o-mini"),
messages: [systemMessage, ...history],
});
// Add assistant response
history.push({ role: "assistant", content: result.text });
// Store updated history
conversations.set(threadId, history);
res.json({ response: result.text });
});
Testing Context Retention
Stateful tests should verify the server maintains context. Use scripted simulations to control the conversation flow:
const result = await scenario.run({
agents: [
scenario.userSimulatorAgent(),
statefulAdapter,
scenario.judgeAgent({
criteria: [
"Agent should provide initial information",
"Agent should answer follow-up with context from previous answer",
"Agent should maintain conversational continuity",
],
}),
],
script: [
scenario.user("Tell me about Paris"),
scenario.agent(),
scenario.user("How long should I stay there?"), // Requires context!
scenario.agent(),
scenario.judge(),
],
});
The second question ("How long should I stay there?") only makes sense with context from the first answer.
Testing Your Own Stateful Agent
Point the adapter at your deployed endpoint and match the field names your API expects:
const myStatefulAdapter: AgentAdapter = {
role: AgentRole.AGENT,
call: async (input) => {
const lastMessage = input.messages[input.messages.length - 1];
const content =
typeof lastMessage.content === "string"
? lastMessage.content
: (lastMessage.content.find((part) => part.type === "text") as any)
?.text || "";
const response = await fetch("https://my-agent.com/chat", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${process.env.API_KEY}`,
},
body: JSON.stringify({
message: content,
sessionId: input.threadId, // Your API might call it sessionId, conversationId, etc.
}),
});
return (await response.json()).reply;
},
};
Production Considerations
Database Storage
Instead of an in-memory Map, use a database:
// Get history from database
const history = await db.conversations.find({ threadId });
// Save updated history
await db.conversations.update({ threadId }, { messages: newHistory });
Cleanup
Implement conversation expiry:
// Delete old conversations
await db.conversations.deleteMany({
updatedAt: { $lt: Date.now() - 7 * 24 * 60 * 60 * 1000 }, // 7 days
});
Authentication
Associate threads with users:
const { message, threadId } = req.body;
const userId = req.user.id; // From auth middleware
// Ensure thread belongs to user
const conversation = await db.conversations.findOne({
threadId,
userId,
});
See Also
- JSON Pattern - For stateless agents without thread management
- Streaming Pattern - For chunked transfer encoding
- SSE Pattern - For Server-Sent Events streaming
- Testing Remote Agents Overview - All HTTP adapter patterns
- Domain-Driven TDD - Building stateful systems with TDD
- Blackbox Testing - Testing philosophy
- Agent Integration - Core adapter interface
- Scripted Simulations - Controlling conversation flow