Back to home

03 — AGENT ARCHITECTURE

Event streaming with AgentMux

14 min read

Your agent runs a tool. The user waits. Thirty seconds pass. Still waiting. Did it freeze? Is it working? They have no idea.

Without event streaming, users are blind. They don't know if the agent is reading files, writing code, or stuck in an error loop. They just see a loading spinner and hope for the best.

Event streaming solves this. Tools emit progress updates in real-time. The frontend shows what's happening as it happens. Users see "Reading src/app.ts", "Writing updated code", "File saved successfully". They have context. They can tell if something's wrong before it fails.

What AgentMux Is

AgentMux is a message queue that tools use to broadcast events. It's based on Node's EventEmitter and provides the same API as Python's asyncio.Queue for cross-language compatibility.

Here's the complete implementation:

import { EventEmitter } from 'events';
 
export class AgentMux {
  private emitter: EventEmitter;
 
  constructor() {
    this.emitter = new EventEmitter();
    this.emitter.setMaxListeners(0); // Prevent memory leaks
  }
 
  async put(event: any): Promise<void> {
    this.emitter.emit('event', event);
  }
 
  subscribe(handler: (event: any) => void | Promise<void>): () => void {
    this.emitter.on('event', handler);
    return () => this.emitter.off('event', handler);
  }
 
  empty(): boolean {
    return this.emitter.listenerCount('event') === 0;
  }
 
  clear(): void {
    this.emitter.removeAllListeners();
  }
 
  listenerCount(): number {
    return this.emitter.listenerCount('event');
  }
}

Key characteristics:

  • Non-blocking: Events are emitted immediately, tools don't wait for consumers
  • Multi-subscriber: Multiple handlers can listen to the same event stream
  • Simple API: Just put() to emit and subscribe() to listen
  • Cleanup-friendly: Subscribe returns an unsubscribe function

Initializing AgentMux

Create one AgentMux instance per conversation and pass it to your agent:

export class CodingAgent {
  private mux: AgentMux;
 
  constructor(options: CodingAgentOptions) {
    // Allow external mux for testing or shared event streams
    this.mux = options.externalMux || new AgentMux();
  }
 
  getMux(): AgentMux {
    return this.mux;
  }
}

The mux is passed to tools via RunContext:

const agentContext: AgentContext = {
  projectId: this.projectId,
  sandboxClient: this.sandboxClient,
  mux: this.mux, // Every tool gets access to the same mux
  // ... other context
};
 
const stream = await run(this.agent, inputMessages, {
  context: agentContext,
  maxTurns: 100,
  stream: true,
});

Every tool execution in this conversation uses the same mux instance. Events from all tools flow through one stream.

Event Structure

Events are plain objects with a type and optional data:

await mux.put({
  type: 'coding_agent.create_file.started',
  data: { file_path: 'src/app.ts' }
});

The type follows a naming convention: <agent>.<operation>.<lifecycle_stage>

  • Agent: coding_agent, database_agent, design_system_agent
  • Operation: create_file, edit_file, step, studio
  • Lifecycle: started, chunk, completed, error

Examples:

  • coding_agent.read_file.started
  • coding_agent.edit_file.chunk
  • database_agent.step.completed
  • design_system_agent.generate_design_system.done

This naming makes it easy to filter events on the frontend and handle specific operations.

Basic Event Lifecycle

Most operations follow a three-phase lifecycle: started, work, completed.

export const readTool = tool({
  name: 'Read',
  description: 'Reads a file from the filesystem',
  parameters: READ_PARAMETERS,
  async execute(input: any, runContext?: RunContext<AgentContext>) {
    const { file_path } = parseResult.data;
    const sandboxClient = runContext?.context.sandboxClient;
    const mux = runContext?.context.mux;
 
    // Phase 1: Started
    await mux?.put({
      type: 'coding_agent.read_file.started',
      data: { file_path }
    });
 
    try {
      // Phase 2: Work
      const content = await sandboxClient.readFile(file_path);
 
      if (content === null) {
        await mux?.put({
          type: 'coding_agent.read_file.error',
          data: { file_path, error: 'File not found' }
        });
        return `Error: Could not read file ${file_path}`;
      }
 
      // Phase 3: Completed
      await mux?.put({
        type: 'coding_agent.read_file.completed',
        data: { file_path, length: content.length }
      });
 
      return content;
 
    } catch (error) {
      const errorMsg = error instanceof Error ? error.message : String(error);
 
      await mux?.put({
        type: 'coding_agent.read_file.error',
        data: { file_path, error: errorMsg }
      });
 
      return `Error reading file: ${errorMsg}`;
    }
  }
});

The frontend can show:

  1. "Reading src/app.ts..." (started event)
  2. "Read 1,234 characters from src/app.ts" (completed event)
  3. Or "Error: File not found" (error event)

Users know exactly what's happening at each stage.

Chunk Events for Progress

For long-running operations, emit chunk events to show incremental progress:

export const writeTool = tool({
  name: 'Write',
  description: 'Writes a file to the filesystem',
  parameters: WRITE_PARAMETERS,
  async execute(input: any, runContext?: RunContext<AgentContext>) {
    const { file_path, content } = parseResult.data;
    const sandboxClient = runContext?.context.sandboxClient;
    const mux = runContext?.context.mux;
 
    await mux?.put({
      type: 'coding_agent.create_file.started',
      data: { file_path }
    });
 
    try {
      // Emit chunk event with the content
      await mux?.put({
        type: 'coding_agent.create_file.chunk',
        data: { file_path, chunk: content }
      });
 
      // Perform the write
      await sandboxClient.writeFile(file_path, content);
 
      await mux?.put({
        type: 'coding_agent.create_file.completed',
        data: { file_path }
      });
 
      return `Successfully created ${file_path}`;
 
    } catch (error) {
      const errorMsg = error instanceof Error ? error.message : String(error);
 
      await mux?.put({
        type: 'coding_agent.create_file.error',
        data: { file_path, error: errorMsg }
      });
 
      return `Error creating file: ${errorMsg}`;
    }
  }
});

The frontend can display the chunk content in real-time, showing users the exact code being written before the operation completes.

Streaming Code Generation

When generating code with an LLM, stream chunks as they arrive:

// Started event
await mux.put({
  type: 'coding_agent.Write.content.started',
  data: { file_path }
});
 
// Stream chunks from LLM
for await (const chunk of stream) {
  if (chunk.type === 'content_block_delta' && chunk.delta.type === 'text_delta') {
    await mux.put({
      type: 'coding_agent.Write.content.chunk',
      data: {
        text: chunk.delta.text,
        file_path
      }
    });
  }
}
 
// Completion
await mux.put({
  type: 'coding_agent.Write.content.completed',
  data: { file_path }
});

Users see code appearing character by character, just like typing in an editor. This makes generation feel fast even when it takes seconds.

Error Events

Always emit error events when operations fail. The frontend needs to know what went wrong:

try {
  const result = await sandboxClient.runCommand(command);
 
  await mux?.put({
    type: 'coding_agent.run_terminal_command.completed',
    data: { command, output: result.stdout }
  });
 
  return result.stdout;
 
} catch (error) {
  const errorMsg = error instanceof Error ? error.message : String(error);
 
  // Emit error event
  await mux?.put({
    type: 'coding_agent.run_terminal_command.error',
    data: { command, error: errorMsg }
  });
 
  // Also emit completed event with error flag
  await mux?.put({
    type: 'coding_agent.run_terminal_command.completed',
    data: { command, error: errorMsg }
  });
 
  return `Error executing command: ${errorMsg}`;
}

Some frontends listen for *.error events to show error notifications. Others listen for *.completed and check for an error field. Emitting both ensures compatibility.

Multi-Phase Operations

Complex operations have multiple sub-phases. Emit events for each phase:

async function cloneWebsite(url: string, mux: AgentMux) {
  // Overall operation started
  await mux.put({
    type: 'clone_website.started',
    data: { url }
  });
 
  try {
    // Phase 1: Screenshots
    await mux.put({
      type: 'clone_website.screenshots.started',
      data: { url }
    });
 
    const screenshots = await captureScreenshots(url);
 
    for (const screenshot of screenshots) {
      await mux.put({
        type: 'clone_website.scroll_image',
        data: {
          url,
          image_url: screenshot.url,
          image_type: 'scroll',
          chunk_name: screenshot.name
        }
      });
    }
 
    await mux.put({
      type: 'clone_website.screenshots.done',
      data: { url }
    });
 
    // Phase 2: HTML and Styles
    await mux.put({
      type: 'clone_website.html_styles.started',
      data: { url }
    });
 
    const html = await extractHTML(url);
    const styles = await extractStyles(url);
 
    await mux.put({
      type: 'clone_website.html_styles.done',
      data: { url, html_length: html.length, style_count: styles.length }
    });
 
    // Phase 3: Assets
    await mux.put({
      type: 'clone_website.assets.started',
      data: { url }
    });
 
    const assets = await downloadAssets(url);
 
    await mux.put({
      type: 'clone_website.assets.done',
      data: { url, asset_count: assets.length }
    });
 
    // Overall completion
    await mux.put({
      type: 'clone_website.done',
      data: { url }
    });
 
  } catch (error) {
    await mux.put({
      type: 'clone_website.error',
      data: { url, error: error.message }
    });
  }
}

The frontend shows a progress breakdown:

  • ✓ Capturing screenshots (3/3)
  • ✓ Extracting HTML and styles
  • ⏳ Downloading assets (12/45)

Each phase emits its own started/done events, giving users fine-grained visibility.

Step-Based Events

For agents that work through explicit steps, use a step event pattern:

export class DatabaseHelpers {
  constructor(private mux: AgentMux) {}
 
  async emitStepStarted(stepTitle: string, stepType: string = 'generic', details?: any) {
    await this.mux.put({
      type: 'database_agent.step.started',
      data: {
        title: stepTitle,
        status: 'pending',
        step_type: stepType,
        details
      }
    });
  }
 
  async emitStepCompleted(stepTitle: string, stepType: string = 'generic', details?: any) {
    await this.mux.put({
      type: 'database_agent.step.completed',
      data: {
        title: stepTitle,
        status: 'completed',
        step_type: stepType,
        details
      }
    });
  }
 
  async emitStepFailed(stepTitle: string, error: string, stepType: string = 'generic') {
    await this.mux.put({
      type: 'database_agent.step.failed',
      data: {
        title: stepTitle,
        status: 'failed',
        error,
        step_type: stepType
      }
    });
  }
}

Use these helpers in tools:

export const manageTableSchemaTool = tool({
  name: 'manage_table_schema',
  async execute({ diff }, runContext?: RunContext<DatabaseAgentContext>) {
    const { helpers } = runContext?.context || {};
 
    try {
      await helpers.emitStepStarted('Setting up tables', 'schema');
 
      // Apply schema changes
      await applySchemaChanges(diff);
 
      // Run drizzle commands
      const result = await runDrizzleCommands(['generate', 'push']);
 
      if (result.includes('❌')) {
        await helpers.emitStepFailed('Set up tables', result, 'schema');
        return `Failed: ${result}`;
      }
 
      await helpers.emitStepCompleted('Set up tables', 'schema');
      return '✅ Successfully modified tables';
 
    } catch (error) {
      await helpers.emitStepFailed('Set up tables', error.message, 'schema');
      return `Error: ${error.message}`;
    }
  }
});

The frontend shows a step list that updates in real-time:

  1. ✓ Installing packages
  2. ✓ Setting up database
  3. ⏳ Setting up tables
  4. ⏳ Starting Drizzle Studio

Streaming Events to Frontend

Connect the mux to an SSE (Server-Sent Events) stream so the frontend receives events in real-time:

import { streamSSE } from 'hono/streaming';
 
agentRoutes.post('/:projectId/chat', async (c) => {
  const projectId = c.req.param('projectId');
  const { message } = await c.req.json();
 
  const agent = new CodingAgent({ projectId, sandboxClient });
  const mux = agent.getMux();
 
  return streamSSE(c, async (stream) => {
    // Subscribe to mux events
    const unsubscribe = mux.subscribe((event: any) => {
      stream.writeSSE({
        data: JSON.stringify(event),
      });
    });
 
    try {
      // Initial event
      await stream.writeSSE({
        data: JSON.stringify({ type: 'init', message: 'Starting agent...' }),
      });
 
      // Run agent (this will emit events through the mux)
      await agent.processMessage(message);
 
      // Completion event
      await stream.writeSSE({
        data: JSON.stringify({ type: 'complete', data: 'Agent finished' }),
      });
 
    } finally {
      // Always cleanup subscription
      unsubscribe();
    }
  });
});

The frontend connects to this endpoint and listens for events:

const eventSource = new EventSource(`/api/agents/${projectId}/chat`);
 
eventSource.onmessage = (e) => {
  const event = JSON.parse(e.data);
 
  switch (event.type) {
    case 'coding_agent.create_file.started':
      showNotification(`Creating ${event.data.file_path}...`);
      break;
 
    case 'coding_agent.create_file.chunk':
      appendCodeToEditor(event.data.chunk);
      break;
 
    case 'coding_agent.create_file.completed':
      showNotification(`Created ${event.data.file_path}`);
      break;
 
    case 'coding_agent.create_file.error':
      showError(event.data.error);
      break;
  }
};

Each event updates the UI immediately. No polling, no waiting for completion—users see progress as it happens.

Metadata in Events

Include useful metadata in event data so the frontend can make decisions:

await mux.put({
  type: 'coding_agent.edit_file.completed',
  data: {
    file_path: target_file,
    result: `File ${target_file} updated successfully`,
    old_code: originalCode,
    new_code: updatedCode,
    additions: 15,
    deletions: 3,
    has_syntax_errors: false
  }
});

The frontend can use this metadata to:

  • Show a diff view (old_code vs new_code)
  • Display statistics (+15, -3)
  • Highlight files with syntax errors
  • Filter events by file type

More metadata means smarter UI decisions without additional requests.

Event Filtering Patterns

Frontend code often filters events by type prefix:

eventSource.onmessage = (e) => {
  const event = JSON.parse(e.data);
 
  // Handle all file operations
  if (event.type.startsWith('coding_agent.') && event.type.includes('file')) {
    updateFileTree(event);
  }
 
  // Handle step-based operations
  if (event.type.includes('step.')) {
    updateStepList(event);
  }
 
  // Handle errors from any tool
  if (event.type.endsWith('.error')) {
    showErrorNotification(event.data.error);
  }
 
  // Handle completion events
  if (event.type.endsWith('.completed') || event.type.endsWith('.done')) {
    markOperationComplete(event);
  }
};

Consistent naming conventions make filtering reliable.

Optional Chaining for Mux

Not all contexts provide a mux (like in tests). Use optional chaining so tools work regardless:

const mux = runContext?.context.mux;
 
// This works whether mux exists or not
await mux?.put({
  type: 'coding_agent.read_file.started',
  data: { file_path }
});

If mux is undefined, mux?.put() is a no-op. The tool continues working, just without event emission. This makes testing easier—you can test tool logic without mocking event streams.

Testing Event Emission

In tests, provide a test mux to verify events are emitted correctly:

test('emits events during file creation', async () => {
  const events: any[] = [];
  const testMux = new AgentMux();
 
  testMux.subscribe((event) => {
    events.push(event);
  });
 
  const context: AgentContext = {
    projectId: 'test',
    sandboxClient: mockSandboxClient,
    mux: testMux,
  };
 
  await writeTool.execute(
    { file_path: 'test.ts', content: 'const x = 1;' },
    { context }
  );
 
  expect(events).toHaveLength(3);
  expect(events[0].type).toBe('coding_agent.create_file.started');
  expect(events[1].type).toBe('coding_agent.create_file.chunk');
  expect(events[2].type).toBe('coding_agent.create_file.completed');
});

This verifies the event lifecycle without testing the actual UI consumption.

Common Patterns

Pattern 1: Pre-validation Events

Emit started events before validation so the frontend knows the tool was invoked:

await mux?.put({
  type: 'coding_agent.bash.started',
  data: { command }
});
 
// Now validate
const isProhibited = prohibitedCommands.some((pattern) => pattern.test(command));
 
if (isProhibited) {
  await mux?.put({
    type: 'coding_agent.bash.error',
    data: { command, error: 'Command not allowed' }
  });
 
  return 'Error: This command is not allowed';
}

Users see "Running command..." even if the command is rejected. Without this, the UI would show nothing.

Pattern 2: Progress Percentage

For operations with known steps, include progress percentage:

const totalSteps = 5;
let currentStep = 0;
 
for (const step of steps) {
  currentStep++;
 
  await mux.put({
    type: 'database_agent.migration.progress',
    data: {
      step: step.name,
      progress: (currentStep / totalSteps) * 100,
      current: currentStep,
      total: totalSteps
    }
  });
 
  await executeStep(step);
}

The frontend can render a progress bar: "Step 3 of 5 (60%)"

Pattern 3: Contextual Events

Include context from the agent state in events:

await mux.put({
  type: 'coding_agent.todo_write.completed',
  data: {
    todos: updatedTodos,
    total_tasks: updatedTodos.length,
    pending_tasks: updatedTodos.filter(t => t.status === 'pending').length,
    completed_tasks: updatedTodos.filter(t => t.status === 'completed').length
  }
});

The frontend can update multiple UI elements from one event: task list, progress counters, status indicators.

What We're Skipping

Event replay, event persistence, complex pub/sub patterns, event sourcing, CQRS. These add architectural complexity you don't need initially.

AgentMux is intentionally simple—emit events, subscribe to them, stream to frontend. This handles 95% of use cases. Start here. Add sophistication when your requirements demand it.

What's Next

Your tools now emit events that stream to the frontend in real-time. Users see what's happening, operations feel responsive, and errors are visible immediately.

Next, we'll look at how the agent orchestrates tools—managing conversation history, handling streaming responses, and coordinating multi-turn interactions. That's where individual tools become part of a cohesive system that can handle complex, long-running tasks.