Complete Guide to Codex SDK Console Message Parsing
Complete Guide to Codex SDK Console Message Parsing
Section titled “Complete Guide to Codex SDK Console Message Parsing”This article explains the Codex SDK event stream mechanism, message type parsing, and best practices in real projects, helping developers quickly master the core skills behind AI execution services.
Background
Section titled “Background”When building an AI execution service based on the Codex SDK, we inevitably run into a practical question: how should we handle the streamed event messages returned by Codex? These messages contain important information such as execution status, output content, and error details, so they deserve careful handling.
As part of the HagiCode project, we needed a reliable executor for AI coding assistant scenarios. That is exactly why we decided to study the Codex SDK event stream mechanism in depth. After all, only by understanding how the underlying messages work can we build a truly enterprise-grade AI execution platform.
The Codex SDK is a programming-assistance SDK released by OpenAI. It returns execution results through an Event Stream. Unlike the traditional request-response model, Codex uses streamed events so that we can:
- Get execution progress in real time
- Handle errors promptly
- Obtain detailed token usage statistics
- Support long-running complex tasks
Understanding these event types and parsing them correctly is essential for implementing a fully capable AI executor. In the end, nobody wants to work with a black box.
About HagiCode
Section titled “About HagiCode”The solution shared in this article comes from our practical experience in the HagiCode project. HagiCode is an open-source AI coding assistant project dedicated to providing developers with intelligent coding support. During development, we needed to build a reliable AI execution service to handle user code execution requests, which is the direct reason we introduced the Codex SDK.
As an AI coding assistant, HagiCode needs to deal with a variety of complex code execution scenarios: getting execution progress in real time, handling errors promptly, and collecting detailed token usage statistics. By deeply understanding the Codex SDK event stream mechanism, we can build an executor that meets production environment requirements. Ultimately, whether it is software or real life, everything benefits from steady accumulation and refinement.
Event Stream Mechanism
Section titled “Event Stream Mechanism”Basic Concepts
Section titled “Basic Concepts”The Codex SDK uses the thread.runStreamed() method to return an asynchronous event iterator:
import { Codex } from '@openai/codex-sdk';
const client = new Codex({ apiKey: process.env.CODEX_API_KEY, baseUrl: process.env.CODEX_BASE_URL,});
const thread = client.startThread({ workingDirectory: '/path/to/project', skipGitRepoCheck: false,});
const { events } = await thread.runStreamed('your prompt here', { outputSchema: { type: 'object', properties: { output: { type: 'string' }, status: { type: 'string', enum: ['ok', 'action_required'] }, }, required: ['output', 'status'], },});
for await (const event of events) { // Handle each event}Detailed Explanation of Event Types
Section titled “Detailed Explanation of Event Types”| Event Type | Description | Key Data |
|---|---|---|
thread.started | Thread started successfully | thread_id |
item.updated | Message content updated | item.text |
item.completed | Message completed | item.text |
turn.completed | Execution completed | usage (token usage) |
turn.failed | Execution failed | error.message |
error | Error event | message |
In real projects, HagiCode’s executor component is built on top of these event types. We need to handle each kind of event carefully to ensure a smooth user experience. Good systems are built by taking details seriously.
Message Parsing Implementation
Section titled “Message Parsing Implementation”Extracting Message Content
Section titled “Extracting Message Content”Message content is extracted through an event handler:
private handleThreadEvent(event: ThreadEvent, onMessage: (content: string) => void): void { // Only handle message update and completion events if (event.type !== 'item.updated' && event.type !== 'item.completed') { return; }
// Only handle agent message content if (event.item.type !== 'agent_message') { return; }
// Extract text content onMessage(event.item.text);}Key points:
- Only handle
item.updatedanditem.completedevents - Only handle content of type
agent_message - The message content is in the
event.item.textfield
Structured Output Parsing
Section titled “Structured Output Parsing”Codex supports JSON structured output. You can specify the return format through the outputSchema parameter:
const DEFAULT_OUTPUT_SCHEMA = { type: 'object', properties: { output: { type: 'string' }, status: { type: 'string', enum: ['ok', 'action_required'] }, }, required: ['output', 'status'], additionalProperties: false,} as const;The parsing function attempts to parse JSON, and if that fails it falls back to the raw text.
function toStructuredOutput(raw: string): StructuredOutput { try { const parsed = JSON.parse(raw) as Partial<StructuredOutput>; if (typeof parsed.output === 'string') { return { output: parsed.output, status: parsed.status === 'action_required' ? 'action_required' : 'ok', }; } } catch { // JSON parsing failed, fall back to the raw text }
return { output: raw, status: 'ok', };}Complete Event Handling Flow
Section titled “Complete Event Handling Flow”private async runWithStreaming( thread: Thread, input: CodexStageExecutionInput): Promise<{ output: string; usage: Usage | null }> { const abortController = new AbortController(); const timeoutHandle = setTimeout(() => { abortController.abort(); }, Math.max(1000, input.timeoutMs));
let latestMessage = ''; let usage: Usage | null = null; let emittedLength = 0;
try { const { events } = await thread.runStreamed(input.prompt, { outputSchema: DEFAULT_OUTPUT_SCHEMA, signal: abortController.signal, });
for await (const event of events) { // Handle message content this.handleThreadEvent(event, (nextContent) => { const delta = nextContent.slice(emittedLength); if (delta.length > 0) { emittedLength = nextContent.length; input.callbacks?.onChunk?.(delta); // Streaming callback } latestMessage = nextContent; });
// Process different data based on the event type if (event.type === 'thread.started') { this.threadId = event.thread_id; } else if (event.type === 'turn.completed') { usage = event.usage; } else if (event.type === 'turn.failed') { throw new CodexExecutorError('gateway_unavailable', event.error.message, true); } else if (event.type === 'error') { throw new CodexExecutorError('gateway_unavailable', event.message, true); } } } catch (error) { if (abortController.signal.aborted) { throw new CodexExecutorError( 'upstream_timeout', `Codex stage timed out after ${input.timeoutMs}ms`, true ); } throw error; } finally { clearTimeout(timeoutHandle); }
const structured = toStructuredOutput(latestMessage); return { output: structured.output, usage };}Error Handling Strategy
Section titled “Error Handling Strategy”Error Code Mapping
Section titled “Error Code Mapping”Map specific error patterns to concrete error codes so the upper layers can handle them more easily:
function mapError(error: unknown): CodexExecutorError { if (error instanceof CodexExecutorError) { return error; }
const message = error instanceof Error ? error.message : String(error); const normalized = message.toLowerCase();
// Authentication errors - not retryable if (normalized.includes('401') || normalized.includes('403') || normalized.includes('api key') || normalized.includes('auth')) { return new CodexExecutorError('auth_invalid', message, false); }
// Rate limit errors - retryable if (normalized.includes('429') || normalized.includes('rate limit')) { return new CodexExecutorError('rate_limited', message, true); }
// Timeout errors - retryable if (normalized.includes('timeout') || normalized.includes('aborted')) { return new CodexExecutorError('upstream_timeout', message, true); }
// Default error return new CodexExecutorError('gateway_unavailable', message, true);}Error Type Definitions
Section titled “Error Type Definitions”export type CodexErrorCode = | 'auth_invalid' // Authentication failure | 'upstream_timeout' // Upstream timeout | 'rate_limited' // Rate limited | 'gateway_unavailable'; // Gateway unavailable
export class CodexExecutorError extends Error { readonly code: CodexErrorCode; readonly retryable: boolean;
constructor(code: CodexErrorCode, message: string, retryable: boolean) { super(message); this.name = 'CodexExecutorError'; this.code = code; this.retryable = retryable; }}Working Directory and Environment Configuration
Section titled “Working Directory and Environment Configuration”Working Directory Validation
Section titled “Working Directory Validation”The Codex SDK requires the working directory to be a valid Git repository.
export function validateWorkingDirectory( workingDirectory: string, skipGitRepoCheck: boolean): void { const resolvedWorkingDirectory = path.resolve(workingDirectory);
if (!existsSync(resolvedWorkingDirectory)) { throw new CodexExecutorError( 'gateway_unavailable', 'Working directory does not exist.', false ); }
if (!statSync(resolvedWorkingDirectory).isDirectory()) { throw new CodexExecutorError( 'gateway_unavailable', 'Working directory is not a directory.', false ); }
if (skipGitRepoCheck) { return; }
const gitDir = path.join(resolvedWorkingDirectory, '.git'); if (!existsSync(gitDir)) { throw new CodexExecutorError( 'gateway_unavailable', 'Working directory is not a git repository.', false ); }}Loading Environment Variables
Section titled “Loading Environment Variables”The Codex SDK needs to load environment variables from the login shell so the AI Agent can access system commands:
function parseEnvironmentOutput(output: Buffer): Record<string, string> { const parsed: Record<string, string> = {};
for (const entry of output.toString('utf8').split('\0')) { if (!entry) continue;
const separatorIndex = entry.indexOf('='); if (separatorIndex <= 0) continue;
const key = entry.slice(0, separatorIndex); const value = entry.slice(separatorIndex + 1); if (key.length > 0) { parsed[key] = value; } }
return parsed;}
function tryLoadEnvironmentFromShell(shellPath: string): Record<string, string> | null { const result = spawnSync(shellPath, ['-ilc', 'env -0'], { env: process.env, stdio: ['ignore', 'pipe', 'pipe'], timeout: 5000, });
if (result.error || result.status !== 0) { return null; }
return parseEnvironmentOutput(result.stdout);}
export function createExecutorEnvironment( envOverrides: Record<string, string> = {}): Record<string, string> { // Load environment variables from the login shell const consoleEnv = loadConsoleEnvironmentFromShell();
return { ...process.env, ...consoleEnv, ...envOverrides, };}Complete Usage Example
Section titled “Complete Usage Example”Basic Usage
Section titled “Basic Usage”In the HagiCode project, we use the following approach to initialize the Codex client and execute tasks:
import { Codex } from '@openai/codex-sdk';
async function executeWithCodex(prompt: string, workingDir: string) { const client = new Codex({ apiKey: process.env.CODEX_API_KEY, env: { PATH: process.env.PATH }, });
const thread = client.startThread({ workingDirectory: workingDir, });
const { events } = await thread.runStreamed(prompt);
let result = ''; for await (const event of events) { if (event.type === 'item.updated' && event.item.type === 'agent_message') { result = event.item.text; } if (event.type === 'turn.completed') { console.log('Token usage:', event.usage); } }
// Try to parse JSON output try { const parsed = JSON.parse(result); return parsed.output; } catch { return result; }}Complete Implementation with Retries
Section titled “Complete Implementation with Retries”export class CodexSdkExecutor { private readonly config: CodexRuntimeConfig; private readonly client: Codex; private threadId: string | null = null;
async executeStage(input: CodexStageExecutionInput): Promise<CodexStageExecutionResult> { const maxAttempts = Math.max(1, this.config.retryCount + 1); let attempt = 0; let lastError: CodexExecutorError | null = null;
while (attempt < maxAttempts) { attempt += 1;
try { const thread = this.getThread(input.workingDirectory); const { output, usage } = await this.runWithStreaming(thread, input);
return { output, usage, threadId: this.threadId!, attempts: attempt, latencyMs: Date.now() - startedAt, }; } catch (error) { const mappedError = mapError(error); lastError = mappedError;
// Non-retryable error or max retry attempts reached if (!mappedError.retryable || attempt >= maxAttempts) { throw mappedError; }
// Wait before retrying await new Promise(resolve => setTimeout(resolve, 1000 * attempt)); } }
throw lastError!; }}Best Practices
Section titled “Best Practices”1. Working Directory Requirements
Section titled “1. Working Directory Requirements”- Make sure the working directory is a valid Git repository
- Use the
PROJECT_ROOTenvironment variable to specify it explicitly - During debugging, you can set
CODEX_SKIP_GIT_REPO_CHECK=trueto skip the check
2. Environment Variable Configuration
Section titled “2. Environment Variable Configuration”- Pass only the required environment variables through a whitelist mechanism
- Use the login shell to load the full environment
- Avoid passing sensitive information
3. Timeout and Retry Strategy
Section titled “3. Timeout and Retry Strategy”- Set reasonable timeouts based on task complexity
- Implement exponential backoff for retryable errors
- Record retry counts and reasons
4. Error Handling
Section titled “4. Error Handling”- Distinguish between retryable and non-retryable errors
- Provide clear error messages and suggestions
- Use unified error codes so upper layers can handle them consistently
5. Streaming Output
Section titled “5. Streaming Output”- Implement incremental output callbacks to improve user experience
- Correctly handle incremental message updates
- Record token usage for cost analysis
In the actual production environment of the HagiCode project, we have already verified the effectiveness of the best practices above. This approach has helped us build a stable and reliable AI execution service. In the end, practical validation matters more than theory alone.
Conclusion
Section titled “Conclusion”The Codex SDK event stream mechanism provides strong capabilities for building AI execution services. By correctly parsing different kinds of events, we can:
- Get execution status and output in real time
- Implement reliable error handling and retry mechanisms
- Obtain detailed execution statistics
- Build a full-featured AI execution platform
The core concepts and code samples introduced in this article can be applied directly in real projects, helping developers get started quickly with Codex SDK integration. If you find this approach valuable, it also reflects the strength of HagiCode’s engineering practice and makes HagiCode itself worth following.
References
Section titled “References”Thank you for reading. If you found this article helpful, please click the like button below so more people can discover it.
This content was created with AI-assisted collaboration, reviewed by the author, and reflects the author’s own views and positions.
- Author: newbe36524
- Article link: https://docs.hagicode.com/blog/2026-03-09-codex-sdk-console-message-parsing/
- Copyright notice: Unless otherwise stated, all articles on this blog are licensed under BY-NC-SA. Please cite the source when reprinting.