Your agent runs a tool. The user waits. Thirty seconds pass. Still waiting. Did it freeze? Is it working? They have no idea.
Without event streaming, users are blind. They don't know if the agent is reading files, writing code, or stuck in an error loop. They just see a loading spinner and hope for the best.
Event streaming solves this. Tools emit progress updates in real-time. The frontend shows what's happening as it happens. Users see "Reading src/app.ts", "Writing updated code", "File saved successfully". They have context. They can tell if something's wrong before it fails.
What AgentMux Is
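AgentMux is a small in-process event bus that tools use to broadcast events. It's built on Node's EventEmitter and borrows method names such as put() and empty() from Python's asyncio.Queue so the API feels familiar across languages. Unlike a real queue, it does not buffer anything: an event is delivered only to the handlers subscribed at the moment it is emitted.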
Here's the complete implementation:
import { EventEmitter } from 'events';
export class AgentMux {
private emitter: EventEmitter;
constructor() {
this.emitter = new EventEmitter();
this.emitter.setMaxListeners(0); // 0 = unlimited listeners; silences Node's max-listeners warning (it does not prevent leaks by itself)
}
async put(event: any): Promise<void> {
this.emitter.emit('event', event);
}
subscribe(handler: (event: any) => void | Promise<void>): () => void {
this.emitter.on('event', handler);
return () => this.emitter.off('event', handler);
}
empty(): boolean {
return this.emitter.listenerCount('event') === 0;
}
clear(): void {
this.emitter.removeAllListeners();
}
listenerCount(): number {
return this.emitter.listenerCount('event');
}
}
Key characteristics:
- Fire-and-forget: put() hands the event to every current subscriber and returns; it does not wait for asynchronous handlers to finish, and events emitted while nobody is subscribed are dropped
- Multi-subscriber: Multiple handlers can listen to the same event stream
- Simple API: Just put() to emit and subscribe() to listen
- Cleanup-friendly: subscribe() returns an unsubscribe function
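Because nothing is buffered, the order of subscribing and emitting matters. The sketch below illustrates this with a hypothetical `MiniMux` stand-in (not the AgentMux class itself, only the same `put()`/`subscribe()` shape), showing multi-subscriber delivery, a late subscriber, and the unsubscribe function.

```typescript
type MuxEvent = { type: string; data?: unknown };
type Handler = (event: MuxEvent) => void;

// Hypothetical stand-in with the same put()/subscribe() shape as AgentMux.
class MiniMux {
  private handlers: Handler[] = [];

  put(event: MuxEvent): void {
    // Deliver to whoever is attached right now; nothing is stored.
    for (const handler of this.handlers.slice()) handler(event);
  }

  subscribe(handler: Handler): () => void {
    this.handlers.push(handler);
    return () => {
      this.handlers = this.handlers.filter((h) => h !== handler);
    };
  }
}

function demoDelivery(): { early: string[]; late: string[] } {
  const mux = new MiniMux();
  const early: string[] = [];
  const late: string[] = [];

  const stopEarly = mux.subscribe((e) => { early.push(e.type); });
  mux.put({ type: 'a.started' });               // only `early` is attached
  mux.subscribe((e) => { late.push(e.type); }); // attaches after the first event
  mux.put({ type: 'a.chunk' });                 // both handlers receive this
  stopEarly();                                  // `early` stops listening
  mux.put({ type: 'a.completed' });             // only `late` receives this

  return { early, late };
}
```

In practice this means the subscriber (for example, the SSE route) should be attached before the agent starts running, or the first events will be missed.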
Initializing AgentMux
Create one AgentMux instance per conversation and pass it to your agent:
export class CodingAgent {
private mux: AgentMux;
constructor(options: CodingAgentOptions) {
// Allow external mux for testing or shared event streams
this.mux = options.externalMux || new AgentMux();
}
getMux(): AgentMux {
return this.mux;
}
}
The mux is passed to tools via RunContext:
const agentContext: AgentContext = {
projectId: this.projectId,
sandboxClient: this.sandboxClient,
mux: this.mux, // Every tool gets access to the same mux
// ... other context
};
const stream = await run(this.agent, inputMessages, {
context: agentContext,
maxTurns: 100,
stream: true,
});
Every tool execution in this conversation uses the same mux instance. Events from all tools flow through one stream.
Event Structure
Events are plain objects with a type and optional data:
await mux.put({
type: 'coding_agent.create_file.started',
data: { file_path: 'src/app.ts' }
});
The type follows a naming convention: <agent>.<operation>.<lifecycle_stage>
- Agent: coding_agent, database_agent, design_system_agent
- Operation: create_file, edit_file, step, studio
- Lifecycle: started, chunk, completed, error
Examples:
- coding_agent.read_file.started
- coding_agent.edit_file.chunk
- database_agent.step.completed
- design_system_agent.generate_design_system.done
This naming makes it easy to filter events on the frontend and handle specific operations.
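As a rough illustration of why the convention helps, a type string can be split back into its parts. `parseEventType` below is a hypothetical helper, not part of AgentMux. It treats the first segment as the agent, the last as the lifecycle stage, and anything in between as the operation, so types with extra or missing middle segments still parse.

```typescript
interface ParsedEventType {
  agent: string;
  operation: string; // may be empty for two-segment types
  stage: string;
}

// Hypothetical helper: split "<agent>.<operation>.<lifecycle_stage>" into parts.
function parseEventType(type: string): ParsedEventType | null {
  const parts = type.split('.');
  // Reject strings with fewer than two segments or with empty segments.
  if (parts.length < 2 || parts.some((p) => p.length === 0)) return null;
  return {
    agent: parts[0],
    operation: parts.slice(1, -1).join('.'),
    stage: parts[parts.length - 1],
  };
}
```

A frontend could then branch on `stage` or group events by `agent` without string matching scattered through the UI code.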
Basic Event Lifecycle
Most operations follow a three-phase lifecycle: started, work, completed.
export const readTool = tool({
name: 'Read',
description: 'Reads a file from the filesystem',
parameters: READ_PARAMETERS,
async execute(input: any, runContext?: RunContext<AgentContext>) {
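// Validate the input first (assumes READ_PARAMETERS is a Zod schema)
const parseResult = READ_PARAMETERS.safeParse(input);
if (!parseResult.success) {
return `Error: Invalid parameters: ${parseResult.error.message}`;
}
const { file_path } = parseResult.data;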
const sandboxClient = runContext?.context.sandboxClient;
const mux = runContext?.context.mux;
// Phase 1: Started
await mux?.put({
type: 'coding_agent.read_file.started',
data: { file_path }
});
try {
// Phase 2: Work
const content = await sandboxClient.readFile(file_path);
if (content === null) {
await mux?.put({
type: 'coding_agent.read_file.error',
data: { file_path, error: 'File not found' }
});
return `Error: Could not read file ${file_path}`;
}
// Phase 3: Completed
await mux?.put({
type: 'coding_agent.read_file.completed',
data: { file_path, length: content.length }
});
return content;
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
await mux?.put({
type: 'coding_agent.read_file.error',
data: { file_path, error: errorMsg }
});
return `Error reading file: ${errorMsg}`;
}
}
});
The frontend can show:
- "Reading src/app.ts..." (started event)
- "Read 1,234 characters from src/app.ts" (completed event)
- Or "Error: File not found" (error event)
Users know exactly what's happening at each stage.
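Since the started/completed/error shape repeats in every tool, it can be factored into a wrapper. The following is a minimal sketch, assuming a hypothetical `withLifecycle` helper and a minimal `EventSink` interface that matches the `put()` method shown earlier; neither name comes from the original code.

```typescript
interface EventSink {
  put(event: { type: string; data?: Record<string, unknown> }): Promise<void>;
}

// Hypothetical helper: wraps `work` with started/completed/error events.
async function withLifecycle<T>(
  mux: EventSink | undefined,
  baseType: string, // e.g. 'coding_agent.read_file'
  data: Record<string, unknown>,
  work: () => Promise<T>,
): Promise<T> {
  await mux?.put({ type: `${baseType}.started`, data });
  try {
    const result = await work();
    await mux?.put({ type: `${baseType}.completed`, data });
    return result;
  } catch (error) {
    const message = error instanceof Error ? error.message : String(error);
    await mux?.put({ type: `${baseType}.error`, data: { ...data, error: message } });
    throw error; // rethrow so the caller decides what to return to the model
  }
}
```

A tool would call it as `withLifecycle(mux, 'coding_agent.read_file', { file_path }, () => sandboxClient.readFile(file_path))` and convert a thrown error into its own return string.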
Chunk Events for Progress
For long-running operations, emit chunk events to show incremental progress:
export const writeTool = tool({
name: 'Write',
description: 'Writes a file to the filesystem',
parameters: WRITE_PARAMETERS,
async execute(input: any, runContext?: RunContext<AgentContext>) {
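// Validate the input first (assumes WRITE_PARAMETERS is a Zod schema)
const parseResult = WRITE_PARAMETERS.safeParse(input);
if (!parseResult.success) {
return `Error: Invalid parameters: ${parseResult.error.message}`;
}
const { file_path, content } = parseResult.data;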
const sandboxClient = runContext?.context.sandboxClient;
const mux = runContext?.context.mux;
await mux?.put({
type: 'coding_agent.create_file.started',
data: { file_path }
});
try {
// Emit chunk event with the content
await mux?.put({
type: 'coding_agent.create_file.chunk',
data: { file_path, chunk: content }
});
// Perform the write
await sandboxClient.writeFile(file_path, content);
await mux?.put({
type: 'coding_agent.create_file.completed',
data: { file_path }
});
return `Successfully created ${file_path}`;
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
await mux?.put({
type: 'coding_agent.create_file.error',
data: { file_path, error: errorMsg }
});
return `Error creating file: ${errorMsg}`;
}
}
});
The frontend can display the chunk content in real-time, showing users the exact code being written before the operation completes.
Streaming Code Generation
When generating code with an LLM, stream chunks as they arrive:
// Started event
await mux.put({
type: 'coding_agent.Write.content.started',
data: { file_path }
});
// Stream chunks from LLM
for await (const chunk of stream) {
if (chunk.type === 'content_block_delta' && chunk.delta.type === 'text_delta') {
await mux.put({
type: 'coding_agent.Write.content.chunk',
data: {
text: chunk.delta.text,
file_path
}
});
}
}
// Completion
await mux.put({
type: 'coding_agent.Write.content.completed',
data: { file_path }
});
Users see code appear incrementally as each chunk arrives, much like watching someone type. This makes generation feel responsive even when it takes several seconds.
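On the receiving side, the chunks have to be stitched back together per file. Here is a minimal sketch of that accumulation, assuming a hypothetical `applyContentEvent` helper and events shaped like the ones above (`file_path` plus an optional `text`).

```typescript
interface ContentChunkEvent {
  type: string;
  data: { file_path: string; text?: string };
}

// Hypothetical frontend helper: rebuilds each file's content from chunk events.
function applyContentEvent(
  buffers: Map<string, string>,
  event: ContentChunkEvent,
): Map<string, string> {
  const { file_path, text } = event.data;
  if (event.type.endsWith('.started')) {
    buffers.set(file_path, ''); // reset when a new stream begins
  } else if (event.type.endsWith('.chunk') && text !== undefined) {
    buffers.set(file_path, (buffers.get(file_path) ?? '') + text);
  }
  // Other stages (completed, error) leave the buffer untouched.
  return buffers;
}
```

Keying the buffer by `file_path` keeps concurrent writes to different files from interleaving in the editor view.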
Error Events
Always emit error events when operations fail. The frontend needs to know what went wrong:
try {
const result = await sandboxClient.runCommand(command);
await mux?.put({
type: 'coding_agent.run_terminal_command.completed',
data: { command, output: result.stdout }
});
return result.stdout;
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
// Emit error event
await mux?.put({
type: 'coding_agent.run_terminal_command.error',
data: { command, error: errorMsg }
});
// Also emit completed event with error flag
await mux?.put({
type: 'coding_agent.run_terminal_command.completed',
data: { command, error: errorMsg }
});
return `Error executing command: ${errorMsg}`;
}
Some frontends listen for *.error events to show error notifications. Others listen for *.completed and check for an error field. Emitting both ensures compatibility, but a frontend that handles both should take care not to report the same failure twice.
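To keep the two failure events consistent with each other, they can be built from one payload. This is a small sketch using a hypothetical `failureEvents` helper; the name and the `ToolEvent` type are illustrative assumptions.

```typescript
interface ToolEvent {
  type: string;
  data: Record<string, unknown>;
}

// Hypothetical helper: builds the `.error` event plus a `.completed` event
// carrying the same `error` field, so both styles of frontend are covered.
function failureEvents(
  baseType: string, // e.g. 'coding_agent.run_terminal_command'
  data: Record<string, unknown>,
  error: unknown,
): ToolEvent[] {
  const message = error instanceof Error ? error.message : String(error);
  const payload = { ...data, error: message };
  return [
    { type: `${baseType}.error`, data: payload },
    { type: `${baseType}.completed`, data: payload },
  ];
}
```

Inside a catch block this would be used as `for (const event of failureEvents(base, { command }, error)) await mux?.put(event);`.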
Multi-Phase Operations
Complex operations have multiple sub-phases. Emit events for each phase:
async function cloneWebsite(url: string, mux: AgentMux) {
// Overall operation started
await mux.put({
type: 'clone_website.started',
data: { url }
});
try {
// Phase 1: Screenshots
await mux.put({
type: 'clone_website.screenshots.started',
data: { url }
});
const screenshots = await captureScreenshots(url);
for (const screenshot of screenshots) {
await mux.put({
type: 'clone_website.scroll_image',
data: {
url,
image_url: screenshot.url,
image_type: 'scroll',
chunk_name: screenshot.name
}
});
}
await mux.put({
type: 'clone_website.screenshots.done',
data: { url }
});
// Phase 2: HTML and Styles
await mux.put({
type: 'clone_website.html_styles.started',
data: { url }
});
const html = await extractHTML(url);
const styles = await extractStyles(url);
await mux.put({
type: 'clone_website.html_styles.done',
data: { url, html_length: html.length, style_count: styles.length }
});
// Phase 3: Assets
await mux.put({
type: 'clone_website.assets.started',
data: { url }
});
const assets = await downloadAssets(url);
await mux.put({
type: 'clone_website.assets.done',
data: { url, asset_count: assets.length }
});
// Overall completion
await mux.put({
type: 'clone_website.done',
data: { url }
});
} catch (error) {
await mux.put({
type: 'clone_website.error',
data: { url, error: error instanceof Error ? error.message : String(error) }
});
}
}
The frontend shows a progress breakdown:
- ✓ Capturing screenshots (3/3)
- ✓ Extracting HTML and styles
- ⏳ Downloading assets (12/45)
Each phase emits its own started/done events, giving users fine-grained visibility.
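One way a frontend might turn these events into that breakdown is a small reducer over the event types. The sketch below assumes a hypothetical `reducePhases` function and only looks at three-segment types of the form `clone_website.<phase>.started` or `clone_website.<phase>.done`; everything else is ignored.

```typescript
type PhaseStatus = 'running' | 'done';

// Hypothetical frontend reducer: tracks each sub-phase of clone_website.
function reducePhases(
  state: Record<string, PhaseStatus>,
  eventType: string,
): Record<string, PhaseStatus> {
  const parts = eventType.split('.');
  // Ignore overall events (two segments) and events from other operations.
  if (parts.length !== 3 || parts[0] !== 'clone_website') return state;
  const [, phase, stage] = parts;
  const next: Record<string, PhaseStatus> = { ...state };
  if (stage === 'started') next[phase] = 'running';
  else if (stage === 'done') next[phase] = 'done';
  else return state;
  return next;
}
```

Feeding every incoming event type through the reducer yields a map such as `{ screenshots: 'done', html_styles: 'done', assets: 'running' }`, which maps directly onto the checklist.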
Step-Based Events
For agents that work through explicit steps, use a step event pattern:
export class DatabaseHelpers {
constructor(private mux: AgentMux) {}
async emitStepStarted(stepTitle: string, stepType: string = 'generic', details?: any) {
await this.mux.put({
type: 'database_agent.step.started',
data: {
title: stepTitle,
status: 'pending',
step_type: stepType,
details
}
});
}
async emitStepCompleted(stepTitle: string, stepType: string = 'generic', details?: any) {
await this.mux.put({
type: 'database_agent.step.completed',
data: {
title: stepTitle,
status: 'completed',
step_type: stepType,
details
}
});
}
async emitStepFailed(stepTitle: string, error: string, stepType: string = 'generic') {
await this.mux.put({
type: 'database_agent.step.failed',
data: {
title: stepTitle,
status: 'failed',
error,
step_type: stepType
}
});
}
}
Use these helpers in tools:
export const manageTableSchemaTool = tool({
name: 'manage_table_schema',
async execute({ diff }, runContext?: RunContext<DatabaseAgentContext>) {
// helpers may be undefined (for example in tests), so call it with optional chaining
const helpers = runContext?.context.helpers;
try {
await helpers?.emitStepStarted('Setting up tables', 'schema');
// Apply schema changes
await applySchemaChanges(diff);
// Run drizzle commands
const result = await runDrizzleCommands(['generate', 'push']);
if (result.includes('❌')) {
await helpers?.emitStepFailed('Set up tables', result, 'schema');
return `Failed: ${result}`;
}
await helpers?.emitStepCompleted('Set up tables', 'schema');
return '✅ Successfully modified tables';
} catch (error) {
// `error` is `unknown` in strict TypeScript, so narrow it before reading `.message`
const errorMsg = error instanceof Error ? error.message : String(error);
await helpers?.emitStepFailed('Set up tables', errorMsg, 'schema');
return `Error: ${errorMsg}`;
}
}
});
The frontend shows a step list that updates in real-time:
- ✓ Installing packages
- ✓ Setting up database
- ⏳ Setting up tables
- ⏳ Starting Drizzle Studio
Streaming Events to Frontend
Connect the mux to an SSE (Server-Sent Events) stream so the frontend receives events in real-time:
import { streamSSE } from 'hono/streaming';
agentRoutes.post('/:projectId/chat', async (c) => {
const projectId = c.req.param('projectId');
const { message } = await c.req.json();
const agent = new CodingAgent({ projectId, sandboxClient });
const mux = agent.getMux();
return streamSSE(c, async (stream) => {
// Subscribe to mux events
const unsubscribe = mux.subscribe((event: any) => {
stream.writeSSE({
data: JSON.stringify(event),
});
});
try {
// Initial event
await stream.writeSSE({
data: JSON.stringify({ type: 'init', message: 'Starting agent...' }),
});
// Run agent (this will emit events through the mux)
await agent.processMessage(message);
// Completion event
await stream.writeSSE({
data: JSON.stringify({ type: 'complete', data: 'Agent finished' }),
});
} finally {
// Always cleanup subscription
unsubscribe();
}
});
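});
The frontend connects to this endpoint and listens for events:
// EventSource can only issue GET requests, so this snippet assumes the chat route is also exposed over GET
const eventSource = new EventSource(`/api/agents/${projectId}/chat`);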
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
switch (event.type) {
case 'coding_agent.create_file.started':
showNotification(`Creating ${event.data.file_path}...`);
break;
case 'coding_agent.create_file.chunk':
appendCodeToEditor(event.data.chunk);
break;
case 'coding_agent.create_file.completed':
showNotification(`Created ${event.data.file_path}`);
break;
case 'coding_agent.create_file.error':
showError(event.data.error);
break;
}
};
Each event updates the UI immediately. There is no polling and no waiting for completion; users see progress as it happens.
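The browser's EventSource API only sends GET requests, while the route above is registered with POST. If the endpoint stays a POST, one option is to read the response body with `fetch` and parse the SSE frames by hand. The sketch below covers only the parsing step, using a hypothetical `parseSSEBuffer` helper, and assumes frames use `\n` line endings with JSON payloads on `data:` lines, as in the server code above.

```typescript
interface ParsedFrames {
  events: unknown[]; // JSON-decoded payloads of complete frames
  rest: string;      // trailing partial frame to keep for the next read
}

// Hypothetical helper: extracts complete SSE frames ("data: ...\n\n") from a buffer.
function parseSSEBuffer(buffer: string): ParsedFrames {
  const frames = buffer.split('\n\n');
  const rest = frames.pop() ?? ''; // the last piece may be incomplete
  const events: unknown[] = [];
  for (const frame of frames) {
    const data = frame
      .split('\n')
      .filter((line) => line.startsWith('data:'))
      .map((line) => line.slice(5).replace(/^ /, '')) // drop "data:" and one leading space
      .join('\n');
    if (data.length > 0) events.push(JSON.parse(data));
  }
  return { events, rest };
}
```

A client would append each decoded network chunk to a string buffer, call `parseSSEBuffer`, dispatch the returned `events`, and carry `rest` forward into the next read.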
Metadata in Events
Include useful metadata in event data so the frontend can make decisions:
await mux.put({
type: 'coding_agent.edit_file.completed',
data: {
file_path: target_file,
result: `File ${target_file} updated successfully`,
old_code: originalCode,
new_code: updatedCode,
additions: 15,
deletions: 3,
has_syntax_errors: false
}
});
The frontend can use this metadata to:
- Show a diff view (old_code vs new_code)
- Display statistics (+15, -3)
- Highlight files with syntax errors
- Filter events by file type
More metadata means smarter UI decisions without additional requests.
Event Filtering Patterns
Frontend code often filters events by type prefix:
eventSource.onmessage = (e) => {
const event = JSON.parse(e.data);
// Handle all file operations
if (event.type.startsWith('coding_agent.') && event.type.includes('file')) {
updateFileTree(event);
}
// Handle step-based operations
if (event.type.includes('step.')) {
updateStepList(event);
}
// Handle errors from any tool
if (event.type.endsWith('.error')) {
showErrorNotification(event.data.error);
}
// Handle completion events
if (event.type.endsWith('.completed') || event.type.endsWith('.done')) {
markOperationComplete(event);
}
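};
Consistent naming conventions make filtering reliable. The reverse also holds: the step helpers shown earlier emit database_agent.step.failed rather than an .error suffix, so the endsWith('.error') check above will not catch those events unless you standardize the suffix or add a second check.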
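If the string checks start to multiply, patterns like `*.error` can be matched with one function. This is a minimal sketch of a hypothetical `matchesEventPattern` helper in which `*` stands for any run of characters and every other character is matched literally.

```typescript
// Hypothetical helper: matches an event type against a wildcard pattern,
// e.g. "*.error" or "coding_agent.*".
function matchesEventPattern(type: string, pattern: string): boolean {
  const escaped = pattern
    .split('*')
    // Escape regex metacharacters so "." in a pattern only matches a literal dot.
    .map((piece) => piece.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${escaped}$`).test(type);
}
```

With it, the error branch becomes `matchesEventPattern(event.type, '*.error')`, and the same function can drive a table of pattern-to-handler mappings.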
Optional Chaining for Mux
Not all contexts provide a mux (like in tests). Use optional chaining so tools work regardless:
const mux = runContext?.context.mux;
// This works whether mux exists or not
await mux?.put({
type: 'coding_agent.read_file.started',
data: { file_path }
});
If mux is undefined, mux?.put() is a no-op. The tool continues working, just without event emission. This makes testing easier: you can test tool logic without mocking event streams.
Testing Event Emission
In tests, provide a test mux to verify events are emitted correctly:
test('emits events during file creation', async () => {
const events: any[] = [];
const testMux = new AgentMux();
testMux.subscribe((event) => {
events.push(event);
});
const context: AgentContext = {
projectId: 'test',
sandboxClient: mockSandboxClient,
mux: testMux,
};
await writeTool.execute(
{ file_path: 'test.ts', content: 'const x = 1;' },
{ context }
);
expect(events).toHaveLength(3);
expect(events[0].type).toBe('coding_agent.create_file.started');
expect(events[1].type).toBe('coding_agent.create_file.chunk');
expect(events[2].type).toBe('coding_agent.create_file.completed');
});
This verifies the event lifecycle without testing the actual UI consumption.
Common Patterns
Pattern 1: Pre-validation Events
Emit started events before validation so the frontend knows the tool was invoked:
await mux?.put({
type: 'coding_agent.bash.started',
data: { command }
});
// Now validate
const isProhibited = prohibitedCommands.some((pattern) => pattern.test(command));
if (isProhibited) {
await mux?.put({
type: 'coding_agent.bash.error',
data: { command, error: 'Command not allowed' }
});
return 'Error: This command is not allowed';
}
Users see "Running command..." even if the command is rejected. Without this, the UI would show nothing.
Pattern 2: Progress Percentage
For operations with known steps, include progress percentage:
const totalSteps = steps.length; // derive the total from the steps actually being run
let currentStep = 0;
for (const step of steps) {
currentStep++;
await mux.put({
type: 'database_agent.migration.progress',
data: {
step: step.name,
progress: (currentStep / totalSteps) * 100,
current: currentStep,
total: totalSteps
}
});
await executeStep(step);
}
The frontend can render a progress bar: "Step 3 of 5 (60%)"
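A raw `(current / total) * 100` yields long fractions for some step counts (1 of 3 is 33.333...) and divides by zero when there are no steps. A small sketch of a hypothetical `progressData` helper that rounds and clamps the value before it goes into the event:

```typescript
interface ProgressData {
  current: number;
  total: number;
  progress: number; // whole-number percentage between 0 and 100
}

// Hypothetical helper: computes a rounded, clamped percentage.
function progressData(current: number, total: number): ProgressData {
  // Avoid dividing by zero when there is nothing to do.
  if (total <= 0) return { current, total, progress: 0 };
  const raw = (current / total) * 100;
  const progress = Math.min(100, Math.max(0, Math.round(raw)));
  return { current, total, progress };
}
```

The result can be spread into the event payload, for example `data: { step: step.name, ...progressData(currentStep, totalSteps) }`.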
Pattern 3: Contextual Events
Include context from the agent state in events:
await mux.put({
type: 'coding_agent.todo_write.completed',
data: {
todos: updatedTodos,
total_tasks: updatedTodos.length,
pending_tasks: updatedTodos.filter(t => t.status === 'pending').length,
completed_tasks: updatedTodos.filter(t => t.status === 'completed').length
}
});
The frontend can update multiple UI elements from one event: task list, progress counters, status indicators.
What We're Skipping
Event replay, event persistence, complex pub/sub patterns, event sourcing, CQRS. These add architectural complexity you don't need initially.
AgentMux is intentionally simple: emit events, subscribe to them, stream to the frontend. This covers most use cases. Start here. Add sophistication when your requirements demand it.
What's Next
Your tools now emit events that stream to the frontend in real-time. Users see what's happening, operations feel responsive, and errors are visible immediately.
Next, we'll look at how the agent orchestrates tools—managing conversation history, handling streaming responses, and coordinating multi-turn interactions. That's where individual tools become part of a cohesive system that can handle complex, long-running tasks.