
Stateful Pattern with Thread ID

Test agents that maintain conversation history server-side using thread identifiers. The adapter sends only the latest message plus a thread ID, and the server looks up the full history. See Blackbox Testing for the testing philosophy.

When to Use

  • Server manages conversation state
  • Multi-device sync (mobile, web, desktop)
  • Reduced bandwidth (only send new messages)
  • Backend controls conversation storage
  • Database-backed chat history

Stateless vs. Stateful

Stateless (Full History)

// Client sends ALL messages every time
POST /chat
{ messages: [msg1, msg2, msg3, msg4] }

Stateful (Thread ID)

// Client sends only NEW message + thread ID
POST /chat
{ message: "latest message", threadId: "thread-123" }
// Server looks up [msg1, msg2, msg3] using threadId
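
The difference shows up directly in the request bodies. The following is a minimal Python sketch that builds both payloads. The helper names `build_stateless_payload` and `build_stateful_payload` are illustrative and not part of Scenario; the field names follow the `/chat` requests shown above.

```python
from typing import Any, Dict, List

Message = Dict[str, Any]


def build_stateless_payload(messages: List[Message]) -> Dict[str, Any]:
    """Stateless: the client resends the whole conversation on every turn."""
    return {"messages": list(messages)}


def build_stateful_payload(messages: List[Message], thread_id: str) -> Dict[str, Any]:
    """Stateful: the client sends only the newest message plus the thread ID."""
    if not messages:
        raise ValueError("at least one message is required")
    return {"message": messages[-1]["content"], "threadId": thread_id}


history = [
    {"role": "user", "content": "What's the weather like in London?"},
    {"role": "assistant", "content": "Cloudy with light rain."},
    {"role": "user", "content": "Is that normal?"},
]

stateless = build_stateless_payload(history)  # grows with every turn
stateful = build_stateful_payload(history, "thread-123")  # constant size
```

The stateless body grows with each turn, while the stateful body stays the same size no matter how long the conversation gets.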

Complete Working Example

This example demonstrates:

  • Server maintaining conversation history in an in-memory dictionary keyed by thread ID
  • Adapter sending only latest message + thread ID
  • Real LLM using full conversation context
  • Multi-turn scenario showing context retention
# Source: https://github.com/langwatch/scenario/blob/main/python/examples/test_testing_remote_agents_stateful.py
"""
Example: Testing an agent that maintains stateful conversations
 
This test demonstrates handling agents that maintain conversation history server-side
using thread identifiers. The adapter sends only the latest message and thread ID,
while the server maintains the full conversation context.
"""
 
from typing import Any, Dict, List

import aiohttp
import pytest
import pytest_asyncio
import scenario
from aiohttp import web
from openai import AsyncOpenAI
 
# Base URL for the test server (set during server startup)
base_url = ""
 
 
class StatefulAgentAdapter(scenario.AgentAdapter):
    """
    Adapter for testing stateful agents that maintain server-side conversation history.
 
    This adapter:
    1. Extracts only the latest message (not full history)
    2. Sends the message along with thread ID
    3. Server uses thread ID to look up and maintain full history
    4. Returns the agent's response
    """
 
    async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes:
        # Extract the most recent user message content
        last_message = input.messages[-1]
        content = last_message["content"]  # type: ignore[typeddict-item]
 
        # For this example, we assume content is a string
        if not isinstance(content, str):
            raise ValueError("This example only handles string content")
 
        # Send only new message + thread ID
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{base_url}/chat",
                json={
                    "message": content,
                    "threadId": input.thread_id,
                },
            ) as response:
                # Fail fast on HTTP errors instead of parsing an error body
                response.raise_for_status()
                result = await response.json()
                return result["response"]
 
 
# OpenAI client for LLM
client = AsyncOpenAI()
 
# Server-side conversation storage (in production, use a database)
conversations: Dict[str, List[Any]] = {}
 
 
async def stateful_handler(request: web.Request) -> web.Response:
    """
    HTTP endpoint that maintains conversation history using thread ID.
 
    The server:
    1. Receives only the latest message and thread ID
    2. Looks up full conversation history using thread ID
    3. Generates response with complete context
    4. Stores updated history
    """
    data = await request.json()
    message = data["message"]
    thread_id = data["threadId"]
 
    # Retrieve or initialize conversation history
    history = conversations.get(thread_id, [])
 
    # Add user message to history
    history.append({"role": "user", "content": message})
 
    # Generate response with FULL history
    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are a helpful weather assistant. Provide brief, friendly responses. Pretend like you have access to a weather API and make up the weather.",
            },
            *history,  # Include full conversation history
        ],
        temperature=0.7,
    )
 
    assistant_message = response.choices[0].message.content
 
    # Add assistant response to history
    if assistant_message is not None:
        history.append({"role": "assistant", "content": assistant_message})
 
    # Store updated history
    conversations[thread_id] = history
 
    # Return only the new response
    return web.json_response({"response": assistant_message})
 
 
@pytest_asyncio.fixture
async def test_server():
    """
    Start a test HTTP server before tests and shut it down after.
 
    This server simulates a deployed agent endpoint with stateful conversation management.
    """
    global base_url, conversations
 
    # Clear conversations before each test
    conversations.clear()
 
    # Create web application
    app = web.Application()
    app.router.add_post("/chat", stateful_handler)
 
    # Start server on random available port
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "localhost", 0)
    await site.start()
 
    # Get the actual port assigned (runner.addresses lists the bound socket addresses)
    port = runner.addresses[0][1]
    base_url = f"http://localhost:{port}"
 
    yield
 
    # Cleanup: stop server and clear conversations
    await runner.cleanup()
    conversations.clear()
 
 
@pytest.mark.asyncio
async def test_stateful_conversation(test_server):
    """
    Test agent that maintains conversation state using thread ID.
 
    This test verifies:
    - Adapter sends only latest message + thread ID
    - Server maintains full conversation history
    - Agent remembers context from previous turns
    - Follow-up questions work correctly
    - Multi-turn conversation flows naturally
    """
    result = await scenario.run(
        name="Stateful weather conversation",
        description="Agent remembers previous turns using thread ID",
        agents=[
            scenario.UserSimulatorAgent(model="openai/gpt-4o-mini"),
            StatefulAgentAdapter(),
            scenario.JudgeAgent(
                model="openai/gpt-4o-mini",
                criteria=[
                    "Agent should remember context from message to message",
                    "Agent should provide relevant follow-up information",
                ],
            ),
        ],
        script=[
            scenario.user("What's the weather like in London?"),
            scenario.agent(),
            scenario.user("Is that normal weather here?"),
            scenario.agent(),
            scenario.judge(),
        ],
        set_id="python-examples",
    )
 
    assert result.success

Key Points

  1. Server stores history: Uses thread ID to maintain conversation state
  2. Adapter sends minimal data: Only latest message + thread ID
  3. LLM gets full context: Server provides complete history to LLM
  4. Multi-turn context: Follow-up questions work because server remembers
  5. Thread management: Server creates/updates history per thread ID
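
These points can be exercised without a network or an LLM. The sketch below assumes a hypothetical `ThreadStore` helper (not part of Scenario) that mirrors the handler's retrieve, append, generate, store cycle, with a stand-in reply function in place of the model call.

```python
from typing import Callable, Dict, List

Message = Dict[str, str]


class ThreadStore:
    """In-memory history per thread ID (hypothetical helper for illustration)."""

    def __init__(self, generate_reply: Callable[[List[Message]], str]) -> None:
        self._histories: Dict[str, List[Message]] = {}
        self._generate_reply = generate_reply

    def handle(self, thread_id: str, message: str) -> str:
        # Retrieve or initialize the history for this thread
        history = self._histories.setdefault(thread_id, [])
        history.append({"role": "user", "content": message})
        # The reply function sees the FULL history, not just the new message
        reply = self._generate_reply(history)
        history.append({"role": "assistant", "content": reply})
        return reply

    def history(self, thread_id: str) -> List[Message]:
        # Return a copy so callers cannot mutate stored state
        return list(self._histories.get(thread_id, []))


def count_user_turns(history: List[Message]) -> str:
    """Stand-in for the LLM: reports how many user turns it can see."""
    turns = sum(1 for m in history if m["role"] == "user")
    return f"user turns so far: {turns}"


store = ThreadStore(count_user_turns)
first = store.handle("thread-a", "What's the weather like in London?")
second = store.handle("thread-a", "Is that normal?")
other = store.handle("thread-b", "Hello")
```

Because the second reply on `thread-a` sees two user turns while `thread-b` sees only one, the sketch shows both context retention within a thread and isolation between threads.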

Adapter Pattern

The adapter extracts the latest message and includes the thread ID:

const statefulAdapter: AgentAdapter = {
  role: AgentRole.AGENT,
  call: async (input) => {
    // Extract only the latest message
    const lastMessage = input.messages[input.messages.length - 1];
    const content =
      typeof lastMessage.content === "string"
        ? lastMessage.content
        : (lastMessage.content.find((part) => part.type === "text") as any)
            ?.text || "";
 
    // Send only new message + thread ID
    const response = await fetch(`${baseUrl}/chat`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({
        message: content, // Only latest message
        threadId: input.threadId, // Server uses this to look up history
      }),
    });
 
    return (await response.json()).response;
  },
};
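
The Python example earlier raises on non-string content, whereas the adapter above falls back to the first text part. A comparable Python helper might look like the sketch below; `extract_text` is an illustrative name, and the `{"type": "text", "text": ...}` part shape is assumed from the TypeScript snippet rather than taken from the Scenario API.

```python
from typing import Any, Dict, List, Union

Content = Union[str, List[Dict[str, Any]]]


def extract_text(content: Content) -> str:
    """Return the message text whether content is a string or a list of parts."""
    if isinstance(content, str):
        return content
    # Multi-part content: use the first part tagged as text
    for part in content:
        if part.get("type") == "text":
            return str(part.get("text", ""))
    # No text part found (for example, image-only content)
    return ""
```

Inside the Python adapter, `extract_text(last_message["content"])` could replace the `isinstance` check if multi-part messages need to be supported.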

Server Implementation Pattern

The server maintains history by thread ID:

// Server-side conversation storage
const conversations = new Map<string, CoreMessage[]>();
 
app.post("/chat", async (req, res) => {
  const { message, threadId } = req.body;
 
  // Retrieve or initialize conversation history
  let history = conversations.get(threadId) || [];
 
  // Add user message
  history.push({ role: "user", content: message });
 
  // Generate response with FULL history
  const result = await generateText({
    model: openai("gpt-4o-mini"),
    messages: [systemMessage, ...history],
  });
 
  // Add assistant response
  history.push({ role: "assistant", content: result.text });
 
  // Store updated history
  conversations.set(threadId, history);
 
  res.json({ response: result.text });
});

Testing Context Retention

Stateful tests should verify the server maintains context. Use scripted simulations to control the conversation flow:

const result = await scenario.run({
  agents: [
    scenario.userSimulatorAgent(),
    statefulAdapter,
    scenario.judgeAgent({
      criteria: [
        "Agent should provide initial information",
        "Agent should answer follow-up with context from previous answer",
        "Agent should maintain conversational continuity",
      ],
    }),
  ],
  script: [
    scenario.user("Tell me about Paris"),
    scenario.agent(),
    scenario.user("How long should I stay there?"), // Requires context!
    scenario.agent(),
    scenario.judge(),
  ],
});

The second question ("How long should I stay there?") only makes sense with context from the first answer.

Testing Your Own Stateful Agent

const myStatefulAdapter: AgentAdapter = {
  role: AgentRole.AGENT,
  call: async (input) => {
    const lastMessage = input.messages[input.messages.length - 1];
    const content =
      typeof lastMessage.content === "string"
        ? lastMessage.content
        : lastMessage.content[0].text;
 
    const response = await fetch("https://my-agent.com/chat", {
      method: "POST",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${process.env.API_KEY}`,
      },
      body: JSON.stringify({
        message: content,
        sessionId: input.threadId, // Your API might call it sessionId, conversationId, etc.
      }),
    });
 
    return (await response.json()).reply;
  },
};

Production Considerations

Database Storage

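Instead of an in-memory Map, use a database:

// Get history from database (empty for a new thread)
const conversation = await db.conversations.findOne({ threadId });
const history = conversation?.messages ?? [];
 
// ...append the user message and the assistant response to history...
 
// Save updated history
await db.conversations.update({ threadId }, { messages: history });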
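
For the Python server, the same idea can be sketched with the standard-library `sqlite3` module. The table layout and the helper names (`init_db`, `load_history`, `save_history`) are assumptions made for illustration; a production service would more likely use its existing database and schema.

```python
import json
import sqlite3
from typing import Dict, List

Message = Dict[str, str]


def init_db(conn: sqlite3.Connection) -> None:
    """Create the conversations table if it does not exist yet."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS conversations ("
        "thread_id TEXT PRIMARY KEY, messages TEXT NOT NULL)"
    )


def load_history(conn: sqlite3.Connection, thread_id: str) -> List[Message]:
    """Return the stored history, or an empty list for a new thread."""
    row = conn.execute(
        "SELECT messages FROM conversations WHERE thread_id = ?", (thread_id,)
    ).fetchone()
    return json.loads(row[0]) if row else []


def save_history(conn: sqlite3.Connection, thread_id: str, history: List[Message]) -> None:
    """Insert or overwrite the history for a thread."""
    conn.execute(
        "INSERT OR REPLACE INTO conversations (thread_id, messages) VALUES (?, ?)",
        (thread_id, json.dumps(history)),
    )
    conn.commit()


conn = sqlite3.connect(":memory:")  # use a file path or real database in practice
init_db(conn)
history = load_history(conn, "thread-123")
history.append({"role": "user", "content": "What's the weather like in London?"})
save_history(conn, "thread-123", history)
```

Storing the history as a JSON column keeps the sketch short; a row per message is a common alternative when histories get long.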

Cleanup

Implement conversation expiry:

// Delete old conversations
await db.conversations.deleteMany({
  updatedAt: { $lt: Date.now() - 7 * 24 * 60 * 60 * 1000 }, // 7 days
});
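
For an in-memory store like the one in the Python example, expiry can be sketched as below. This assumes the server also tracks a last-updated timestamp per thread, which the example above does not do; `purge_expired` and its parameters are illustrative names.

```python
import time
from typing import Dict, List, Optional

SEVEN_DAYS_SECONDS = 7 * 24 * 60 * 60


def purge_expired(
    last_updated: Dict[str, float],
    max_age_seconds: float = SEVEN_DAYS_SECONDS,
    now: Optional[float] = None,
) -> List[str]:
    """Remove threads not updated within max_age_seconds and return their IDs."""
    current = time.time() if now is None else now
    cutoff = current - max_age_seconds
    # Collect first, then delete, to avoid mutating the dict while iterating
    expired = [thread_id for thread_id, ts in last_updated.items() if ts < cutoff]
    for thread_id in expired:
        del last_updated[thread_id]
    return expired
```

The returned IDs can then be used to drop the matching entries from the history store. Passing `now` explicitly keeps the function easy to test.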

Authentication

Associate threads with users:

const { message, threadId } = req.body;
const userId = req.user.id; // From auth middleware
 
// Ensure thread belongs to user
const conversation = await db.conversations.findOne({
  threadId,
  userId,
});
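
One way to act on the lookup result is sketched below in Python. It assumes a simple policy that the source does not spell out: the first user to write to a thread claims it, and any other user is rejected. `ThreadAccessError` and `authorize_thread` are hypothetical names.

```python
from typing import Dict


class ThreadAccessError(Exception):
    """Raised when a user tries to use a thread owned by someone else."""


def authorize_thread(owners: Dict[str, str], thread_id: str, user_id: str) -> None:
    """Claim a new thread for user_id, or verify ownership of an existing one."""
    owner = owners.get(thread_id)
    if owner is None:
        # Assumed policy: first use of a thread ID assigns it to the caller
        owners[thread_id] = user_id
        return
    if owner != user_id:
        raise ThreadAccessError(f"thread {thread_id!r} does not belong to this user")
```

A request handler would call `authorize_thread` before reading or updating history and translate `ThreadAccessError` into a 403 or 404 response.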

See Also