Complete Guide to Codex SDK Console Message Parsing

This article explains the Codex SDK event stream mechanism, message type parsing, and best practices in real projects, helping developers quickly master the core skills behind AI execution services.

When building an AI execution service based on the Codex SDK, we inevitably run into a practical question: how should we handle the streamed event messages returned by Codex? These messages contain important information such as execution status, output content, and error details, so they deserve careful handling.

As part of the HagiCode project, we needed a reliable executor for AI coding assistant scenarios. That is exactly why we decided to study the Codex SDK event stream mechanism in depth. After all, only by understanding how the underlying messages work can we build a truly enterprise-grade AI execution platform.

The Codex SDK is a programming-assistance SDK released by OpenAI. It returns execution results through an Event Stream. Unlike the traditional request-response model, Codex uses streamed events so that we can:

  • Get execution progress in real time
  • Handle errors promptly
  • Obtain detailed token usage statistics
  • Support long-running complex tasks

Understanding these event types and parsing them correctly is essential for implementing a fully capable AI executor. In the end, nobody wants to work with a black box.

The solution shared in this article comes from our practical experience in the HagiCode project. HagiCode is an open-source AI coding assistant project dedicated to providing developers with intelligent coding support. During development, we needed to build a reliable AI execution service to handle user code execution requests, which is the direct reason we introduced the Codex SDK.

As an AI coding assistant, HagiCode has to cover the same ground in production: reporting execution progress as it happens, surfacing errors promptly, and collecting token usage statistics. Understanding the Codex SDK event stream is what lets us build an executor that meets those requirements.

The thread.runStreamed() method resolves to an object whose events property is an asynchronous iterator, which we consume with for await:

import { Codex } from '@openai/codex-sdk';

const client = new Codex({
  apiKey: process.env.CODEX_API_KEY,
  baseUrl: process.env.CODEX_BASE_URL,
});

const thread = client.startThread({
  workingDirectory: '/path/to/project',
  skipGitRepoCheck: false,
});

const { events } = await thread.runStreamed('your prompt here', {
  outputSchema: {
    type: 'object',
    properties: {
      output: { type: 'string' },
      status: { type: 'string', enum: ['ok', 'action_required'] },
    },
    required: ['output', 'status'],
  },
});

for await (const event of events) {
  // Handle each event
}

Event Type     | Description                 | Key Data
thread.started | Thread started successfully | thread_id
item.updated   | Message content updated     | item.text
item.completed | Message completed           | item.text
turn.completed | Execution completed         | usage (token usage)
turn.failed    | Execution failed            | error.message
error          | Error event                 | message
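
One way to keep a handler in step with the table above is to model the events as a discriminated union and switch on the type field. The sketch below uses hypothetical local types (StreamEvent, Item, Usage) that only mirror the table; the SDK's own ThreadEvent type may carry more variants and fields, so treat the shapes here as assumptions.

```typescript
// Hypothetical local types mirroring the event table; not the SDK's exports.
type Usage = { input_tokens: number; output_tokens: number };
type Item = { type: string; text?: string };

type StreamEvent =
  | { type: 'thread.started'; thread_id: string }
  | { type: 'item.updated'; item: Item }
  | { type: 'item.completed'; item: Item }
  | { type: 'turn.completed'; usage: Usage }
  | { type: 'turn.failed'; error: { message: string } }
  | { type: 'error'; message: string };

// Turn one event into a short log line. The `never` assignment in the default
// branch makes the compiler flag any variant this switch forgets to handle.
function describeEvent(event: StreamEvent): string {
  switch (event.type) {
    case 'thread.started':
      return `thread started: ${event.thread_id}`;
    case 'item.updated':
    case 'item.completed':
      return `${event.type}: ${event.item.text ?? ''}`;
    case 'turn.completed':
      return `turn completed: ${event.usage.input_tokens} in / ${event.usage.output_tokens} out`;
    case 'turn.failed':
      return `turn failed: ${event.error.message}`;
    case 'error':
      return `stream error: ${event.message}`;
    default: {
      const unreachable: never = event;
      return `unknown event: ${JSON.stringify(unreachable)}`;
    }
  }
}
```

With this shape, adding a new variant to StreamEvent without a matching case becomes a compile-time error rather than a silently ignored event.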

In real projects, HagiCode’s executor component is built on top of these event types. We need to handle each kind of event carefully to ensure a smooth user experience. Good systems are built by taking details seriously.

Message content is extracted through an event handler:

private handleThreadEvent(event: ThreadEvent, onMessage: (content: string) => void): void {
  // Only handle message update and completion events
  if (event.type !== 'item.updated' && event.type !== 'item.completed') {
    return;
  }
  // Only handle agent message content
  if (event.item.type !== 'agent_message') {
    return;
  }
  // Extract text content
  onMessage(event.item.text);
}

Key points:

  • Only handle item.updated and item.completed events
  • Only handle content of type agent_message
  • The message content is in the event.item.text field

Codex supports JSON structured output. You can specify the return format through the outputSchema parameter:

const DEFAULT_OUTPUT_SCHEMA = {
  type: 'object',
  properties: {
    output: { type: 'string' },
    status: { type: 'string', enum: ['ok', 'action_required'] },
  },
  required: ['output', 'status'],
  additionalProperties: false,
} as const;

The parsing function attempts to parse JSON, and if that fails it falls back to the raw text.

function toStructuredOutput(raw: string): StructuredOutput {
  try {
    const parsed = JSON.parse(raw) as Partial<StructuredOutput>;
    if (typeof parsed.output === 'string') {
      return {
        output: parsed.output,
        status: parsed.status === 'action_required' ? 'action_required' : 'ok',
      };
    }
  } catch {
    // JSON parsing failed, fall back to the raw text
  }
  return {
    output: raw,
    status: 'ok',
  };
}
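
The fallback above is silent, so a caller cannot tell a schema-conforming reply from raw text that was wrapped as a last resort. A variant that reports which path was taken might look like the sketch below. The StructuredOutput shape is inferred from DEFAULT_OUTPUT_SCHEMA, and ParseResult, isStructuredOutput, and parseStructuredOutput are hypothetical names introduced only for illustration.

```typescript
// Hypothetical shape inferred from DEFAULT_OUTPUT_SCHEMA; not taken from the SDK.
type StructuredOutput = { output: string; status: 'ok' | 'action_required' };

type ParseResult =
  | { kind: 'structured'; value: StructuredOutput }
  | { kind: 'fallback'; value: StructuredOutput; reason: string };

// Type guard: checks both required fields, including the status enum.
function isStructuredOutput(value: unknown): value is StructuredOutput {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Record<string, unknown>;
  return (
    typeof candidate.output === 'string' &&
    (candidate.status === 'ok' || candidate.status === 'action_required')
  );
}

// Parses the raw message and records why the fallback was used, if it was.
function parseStructuredOutput(raw: string): ParseResult {
  const fallback: StructuredOutput = { output: raw, status: 'ok' };
  let parsed: unknown;
  try {
    parsed = JSON.parse(raw);
  } catch {
    return { kind: 'fallback', value: fallback, reason: 'invalid JSON' };
  }
  if (!isStructuredOutput(parsed)) {
    return { kind: 'fallback', value: fallback, reason: 'schema mismatch' };
  }
  return { kind: 'structured', value: parsed };
}
```

Logging the reason field on fallbacks is a cheap way to notice when the model stops honoring the output schema.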

private async runWithStreaming(
  thread: Thread,
  input: CodexStageExecutionInput
): Promise<{ output: string; usage: Usage | null }> {
  const abortController = new AbortController();
  const timeoutHandle = setTimeout(() => {
    abortController.abort();
  }, Math.max(1000, input.timeoutMs));

  let latestMessage = '';
  let usage: Usage | null = null;
  let emittedLength = 0;

  try {
    const { events } = await thread.runStreamed(input.prompt, {
      outputSchema: DEFAULT_OUTPUT_SCHEMA,
      signal: abortController.signal,
    });

    for await (const event of events) {
      // Handle message content
      this.handleThreadEvent(event, (nextContent) => {
        const delta = nextContent.slice(emittedLength);
        if (delta.length > 0) {
          emittedLength = nextContent.length;
          input.callbacks?.onChunk?.(delta); // Streaming callback
        }
        latestMessage = nextContent;
      });

      // Process different data based on the event type
      if (event.type === 'thread.started') {
        this.threadId = event.thread_id;
      } else if (event.type === 'turn.completed') {
        usage = event.usage;
      } else if (event.type === 'turn.failed') {
        throw new CodexExecutorError('gateway_unavailable', event.error.message, true);
      } else if (event.type === 'error') {
        throw new CodexExecutorError('gateway_unavailable', event.message, true);
      }
    }
  } catch (error) {
    if (abortController.signal.aborted) {
      throw new CodexExecutorError(
        'upstream_timeout',
        `Codex stage timed out after ${input.timeoutMs}ms`,
        true
      );
    }
    throw error;
  } finally {
    clearTimeout(timeoutHandle);
  }

  const structured = toStructuredOutput(latestMessage);
  return { output: structured.output, usage };
}
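
The loop above tracks a single emittedLength for the whole turn, which works as long as every snapshot extends the previous one. If a turn produced more than one agent message, or a snapshot were rewritten instead of appended to, slicing by the old length would drop or garble text. A possible refinement, assuming each item carries a stable identifier (an assumption of this sketch, as is the DeltaTracker name), is to track emitted text per item:

```typescript
// Tracks the last snapshot seen per item so each update yields only the new
// suffix. Keying by an item id is an assumption made for this sketch.
class DeltaTracker {
  private readonly emitted = new Map<string, string>();

  // Returns the text not yet emitted for `itemId`, or '' when nothing is new.
  next(itemId: string, snapshot: string): string {
    const previous = this.emitted.get(itemId) ?? '';
    this.emitted.set(itemId, snapshot);
    if (snapshot.startsWith(previous)) {
      return snapshot.slice(previous.length);
    }
    // The snapshot was rewritten rather than extended: re-emit it in full.
    return snapshot;
  }
}

const tracker = new DeltaTracker();
tracker.next('msg-1', 'Hel');   // 'Hel'
tracker.next('msg-1', 'Hello'); // 'lo'
tracker.next('msg-2', 'Hi');    // 'Hi' (a different item starts from scratch)
```

Whether re-emitting a rewritten snapshot in full is the right behavior depends on the consumer; a UI that replaces text per message would handle it, while an append-only log would need a different policy.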

Map specific error patterns to concrete error codes so the upper layers can handle them more easily:

function mapError(error: unknown): CodexExecutorError {
  if (error instanceof CodexExecutorError) {
    return error;
  }
  const message = error instanceof Error ? error.message : String(error);
  const normalized = message.toLowerCase();

  // Authentication errors - not retryable
  if (normalized.includes('401') ||
      normalized.includes('403') ||
      normalized.includes('api key') ||
      normalized.includes('auth')) {
    return new CodexExecutorError('auth_invalid', message, false);
  }
  // Rate limit errors - retryable
  if (normalized.includes('429') || normalized.includes('rate limit')) {
    return new CodexExecutorError('rate_limited', message, true);
  }
  // Timeout errors - retryable
  if (normalized.includes('timeout') || normalized.includes('aborted')) {
    return new CodexExecutorError('upstream_timeout', message, true);
  }
  // Default error
  return new CodexExecutorError('gateway_unavailable', message, true);
}

export type CodexErrorCode =
  | 'auth_invalid'         // Authentication failure
  | 'upstream_timeout'     // Upstream timeout
  | 'rate_limited'         // Rate limited
  | 'gateway_unavailable'; // Gateway unavailable

export class CodexExecutorError extends Error {
  readonly code: CodexErrorCode;
  readonly retryable: boolean;

  constructor(code: CodexErrorCode, message: string, retryable: boolean) {
    super(message);
    this.name = 'CodexExecutorError';
    this.code = code;
    this.retryable = retryable;
  }
}
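
Plain substring checks are easy to read but can over-match: 'auth' also appears in "author", and '429' appears inside longer numbers. If that becomes a problem, one option is a small ordered rule table with word-boundary patterns. The sketch below is an illustration under that assumption, not the mapping HagiCode uses; ErrorCode, Rule, RULES, and classify are names made up for it, and the patterns would need tuning against real error messages.

```typescript
type ErrorCode = 'auth_invalid' | 'rate_limited' | 'upstream_timeout' | 'gateway_unavailable';

interface Rule {
  code: ErrorCode;
  retryable: boolean;
  pattern: RegExp;
}

// Order matters: the first matching rule wins.
const RULES: readonly Rule[] = [
  {
    code: 'auth_invalid',
    retryable: false,
    pattern: /\b(401|403)\b|\bapi key\b|\bunauthorized\b|\bauthentication\b/i,
  },
  { code: 'rate_limited', retryable: true, pattern: /\b429\b|\brate limit/i },
  { code: 'upstream_timeout', retryable: true, pattern: /\btimed? ?out\b|\baborted\b/i },
];

// Classifies a message; anything unmatched is treated as a retryable gateway error.
function classify(message: string): { code: ErrorCode; retryable: boolean } {
  for (const rule of RULES) {
    if (rule.pattern.test(message)) {
      return { code: rule.code, retryable: rule.retryable };
    }
  }
  return { code: 'gateway_unavailable', retryable: true };
}
```

Where the upstream error exposes a numeric status or a typed code, matching on that field is more robust than matching on message text at all.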

Working Directory and Environment Configuration

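By default, the Codex SDK requires the working directory to be a Git repository; setting skipGitRepoCheck on the thread bypasses that check. Our executor validates the directory up front so that a bad path fails early with a clear error: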

import { existsSync, statSync } from 'node:fs';
import * as path from 'node:path';

export function validateWorkingDirectory(
  workingDirectory: string,
  skipGitRepoCheck: boolean
): void {
  const resolvedWorkingDirectory = path.resolve(workingDirectory);

  if (!existsSync(resolvedWorkingDirectory)) {
    throw new CodexExecutorError(
      'gateway_unavailable',
      'Working directory does not exist.',
      false
    );
  }
  if (!statSync(resolvedWorkingDirectory).isDirectory()) {
    throw new CodexExecutorError(
      'gateway_unavailable',
      'Working directory is not a directory.',
      false
    );
  }
  if (skipGitRepoCheck) {
    return;
  }

  // Only the directory itself is checked for a .git entry, not its parents
  const gitDir = path.join(resolvedWorkingDirectory, '.git');
  if (!existsSync(gitDir)) {
    throw new CodexExecutorError(
      'gateway_unavailable',
      'Working directory is not a git repository.',
      false
    );
  }
}
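
The check above looks for .git only in the working directory itself, so a subdirectory of a repository would be rejected. If subdirectories should be accepted, a helper that walks up toward the filesystem root is one option. The sketch below is a hypothetical addition (findGitRoot is not part of the SDK or of the code above), and the exists parameter is injectable purely so the walk can be tried without touching disk.

```typescript
import { existsSync } from 'node:fs';
import * as path from 'node:path';

// Walks from `startDir` toward the filesystem root and returns the first
// directory containing a `.git` entry, or null if none is found.
function findGitRoot(
  startDir: string,
  exists: (candidate: string) => boolean = existsSync
): string | null {
  let current = path.resolve(startDir);
  while (true) {
    if (exists(path.join(current, '.git'))) {
      return current;
    }
    const parent = path.dirname(current);
    if (parent === current) {
      return null; // Reached the filesystem root without finding a repository.
    }
    current = parent;
  }
}
```

Checking for existence rather than for a directory also covers Git worktrees and submodules, where .git is a file.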

Our executor loads environment variables from the user's login shell and passes them on, so that the AI Agent can find the same system commands a developer would have in a terminal:

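import { spawnSync } from 'node:child_process';

// Parse the NUL-separated KEY=VALUE output of `env -0`
function parseEnvironmentOutput(output: Buffer): Record<string, string> {
  const parsed: Record<string, string> = {};
  for (const entry of output.toString('utf8').split('\0')) {
    if (!entry) continue;
    const separatorIndex = entry.indexOf('=');
    if (separatorIndex <= 0) continue;
    const key = entry.slice(0, separatorIndex);
    const value = entry.slice(separatorIndex + 1);
    if (key.length > 0) {
      parsed[key] = value;
    }
  }
  return parsed;
}

// Run an interactive login shell and capture its environment; null on failure
function tryLoadEnvironmentFromShell(shellPath: string): Record<string, string> | null {
  const result = spawnSync(shellPath, ['-ilc', 'env -0'], {
    env: process.env,
    stdio: ['ignore', 'pipe', 'pipe'],
    timeout: 5000,
  });
  if (result.error || result.status !== 0) {
    return null;
  }
  return parseEnvironmentOutput(result.stdout);
}

// This helper is not shown in the original listing. Minimal stand-in: try the
// user's shell, with /bin/sh as an assumed fallback when SHELL is unset.
function loadConsoleEnvironmentFromShell(): Record<string, string> {
  return tryLoadEnvironmentFromShell(process.env.SHELL ?? '/bin/sh') ?? {};
}

export function createExecutorEnvironment(
  envOverrides: Record<string, string> = {}
): Record<string, string> {
  // Load environment variables from the login shell
  const consoleEnv = loadConsoleEnvironmentFromShell();
  // Drop undefined entries from process.env so the result is a plain string map
  const baseEnv: Record<string, string> = {};
  for (const [key, value] of Object.entries(process.env)) {
    if (value !== undefined) baseEnv[key] = value;
  }
  return {
    ...baseEnv,
    ...consoleEnv,
    ...envOverrides,
  };
}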
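
The merged environment above passes every variable through to the agent. When only a known subset should be visible, a small allowlist filter can be applied to the merged map before it is handed over. The following is a minimal sketch; pickEnvironment and DEFAULT_ALLOWED_KEYS are hypothetical names, and the default key list is illustrative rather than a recommendation from the SDK.

```typescript
// Illustrative default; adjust to what the agent actually needs.
const DEFAULT_ALLOWED_KEYS: readonly string[] = ['PATH', 'HOME', 'LANG', 'SHELL'];

// Returns a new map containing only allowlisted keys that have string values.
function pickEnvironment(
  source: Record<string, string | undefined>,
  allowedKeys: readonly string[] = DEFAULT_ALLOWED_KEYS
): Record<string, string> {
  const picked: Record<string, string> = {};
  for (const key of allowedKeys) {
    const value = source[key];
    if (typeof value === 'string') {
      picked[key] = value;
    }
  }
  return picked;
}
```

Iterating over the allowlist rather than over the source keeps the cost proportional to the list and makes it obvious that anything not named is dropped.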

In the HagiCode project, we use the following approach to initialize the Codex client and execute tasks:

import { Codex } from '@openai/codex-sdk';

async function executeWithCodex(prompt: string, workingDir: string) {
  const client = new Codex({
    apiKey: process.env.CODEX_API_KEY,
    // PATH can be undefined in process.env, so fall back to an empty string
    env: { PATH: process.env.PATH ?? '' },
  });
  const thread = client.startThread({
    workingDirectory: workingDir,
  });
  const { events } = await thread.runStreamed(prompt);

  let result = '';
  for await (const event of events) {
    // Keep the latest agent message text, including the final completed one
    if (
      (event.type === 'item.updated' || event.type === 'item.completed') &&
      event.item.type === 'agent_message'
    ) {
      result = event.item.text;
    }
    if (event.type === 'turn.completed') {
      console.log('Token usage:', event.usage);
    }
  }

  // Try to parse JSON output; fall back to the raw text otherwise
  try {
    const parsed = JSON.parse(result);
    return typeof parsed?.output === 'string' ? parsed.output : result;
  } catch {
    return result;
  }
}

export class CodexSdkExecutor {
  private readonly config: CodexRuntimeConfig;
  private readonly client: Codex;
  private threadId: string | null = null;

  // Constructor, getThread, and runWithStreaming are omitted from this excerpt

  async executeStage(input: CodexStageExecutionInput): Promise<CodexStageExecutionResult> {
    const startedAt = Date.now(); // Needed for latencyMs below
    const maxAttempts = Math.max(1, this.config.retryCount + 1);
    let attempt = 0;
    let lastError: CodexExecutorError | null = null;

    while (attempt < maxAttempts) {
      attempt += 1;
      try {
        const thread = this.getThread(input.workingDirectory);
        const { output, usage } = await this.runWithStreaming(thread, input);
        return {
          output,
          usage,
          threadId: this.threadId!,
          attempts: attempt,
          latencyMs: Date.now() - startedAt,
        };
      } catch (error) {
        const mappedError = mapError(error);
        lastError = mappedError;
        // Non-retryable error or max retry attempts reached
        if (!mappedError.retryable || attempt >= maxAttempts) {
          throw mappedError;
        }
        // Wait before retrying (delay grows linearly with the attempt number)
        await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
      }
    }
    throw lastError!;
  }
}

Working directory:

  • Make sure the working directory is a valid Git repository
  • Use the PROJECT_ROOT environment variable to specify it explicitly
  • During debugging, you can set CODEX_SKIP_GIT_REPO_CHECK=true to skip the check

Environment variables:

  • Pass only the required environment variables through a whitelist mechanism
  • Use the login shell to load the full environment
  • Avoid passing sensitive information

Timeouts and retries:

  • Set reasonable timeouts based on task complexity
  • Implement exponential backoff for retryable errors
  • Record retry counts and reasons

Error handling:

  • Distinguish between retryable and non-retryable errors
  • Provide clear error messages and suggestions
  • Use unified error codes so upper layers can handle them consistently

Streaming output:

  • Implement incremental output callbacks to improve user experience
  • Correctly handle incremental message updates
  • Record token usage for cost analysis
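
The executeStage loop shown earlier waits 1000 * attempt milliseconds, which grows linearly. For the exponential backoff mentioned in the list, one common formulation doubles a ceiling on each attempt, caps it, and picks a random delay below the ceiling ("full jitter"). The sketch below illustrates that formulation; BackoffOptions, backoffDelayMs, and retryWithBackoff are hypothetical names, and the concrete base and cap values are left to the caller.

```typescript
interface BackoffOptions {
  baseMs: number;          // ceiling for the delay before the first retry
  maxMs: number;           // upper bound on the ceiling for any retry
  random?: () => number;   // injectable for tests; defaults to Math.random
}

// Exponential backoff with full jitter: the ceiling doubles on each attempt
// (capped at maxMs) and the actual delay is a random fraction of that ceiling.
function backoffDelayMs(attempt: number, options: BackoffOptions): number {
  const random = options.random ?? Math.random;
  const ceiling = Math.min(options.maxMs, options.baseMs * 2 ** (attempt - 1));
  return Math.floor(random() * ceiling);
}

// Retries `task` while attempts remain and `isRetryable` accepts the error.
async function retryWithBackoff<T>(
  task: () => Promise<T>,
  isRetryable: (error: unknown) => boolean,
  maxAttempts: number,
  options: BackoffOptions
): Promise<T> {
  for (let attempt = 1; ; attempt += 1) {
    try {
      return await task();
    } catch (error) {
      if (attempt >= maxAttempts || !isRetryable(error)) {
        throw error;
      }
      const delay = backoffDelayMs(attempt, options);
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

The jitter spreads retries from concurrent executors apart, which matters most for rate-limit errors where synchronized retries would hit the limit again together.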

In the actual production environment of the HagiCode project, we have already verified the effectiveness of the best practices above. This approach has helped us build a stable and reliable AI execution service. In the end, practical validation matters more than theory alone.

The Codex SDK event stream mechanism provides strong capabilities for building AI execution services. By correctly parsing different kinds of events, we can:

  • Get execution status and output in real time
  • Implement reliable error handling and retry mechanisms
  • Obtain detailed execution statistics
  • Build a full-featured AI execution platform

The core concepts and code samples introduced in this article can be applied directly in real projects, helping developers get started quickly with Codex SDK integration. If you find this approach valuable, it also reflects the strength of HagiCode’s engineering practice and makes HagiCode itself worth following.


This content was created with AI-assisted collaboration, reviewed by the author, and reflects the author’s own views and positions.