跳转到内容

TypeScript

4 篇包含标签 "TypeScript" 的文章

ImgBin CLI 工具设计:HagiCode 图片资产管理方案

ImgBin CLI 工具设计:HagiCode 图片资产管理方案

Section titled “ImgBin CLI 工具设计:HagiCode 图片资产管理方案”

本文介绍如何从零构建一个可自动化执行的图片资产流水线,包括 CLI 工具设计、Provider Adapter 架构、以及元数据管理策略。

其实也没想到,图片资产管理这事儿也能让我们纠结这么久。

在 HagiCode 项目开发过程中,我们遇到了一个看似简单却十分棘手的问题:图片资产的生成和管理。怎么说呢,就像青春期的那些事儿一样——表面上风平浪静,暗地里波澜起伏。

随着项目文档和营销物料的增多,需要大量配图。这些配图有些需要 AI 生成,有些需要从现有素材库中挑选,还有些需要对现有图片进行 AI 识别并自动标注。问题在于,这些工作长期以来都是用零散的脚本加人工操作来完成的——每次生成一张图片,都需要手动执行脚本、手动整理元数据、手动生成缩略图。这也就罢了,关键是这些零散的东西散落在各处,想找的时候找不到,想用的时候用不了。

具体痛点包括:

  1. 缺乏统一入口:图片生成的逻辑分散在不同脚本中,想批量执行根本没门
  2. 元数据缺失:生成后的图片没有统一的 metadata.json,无法检索和追踪
  3. 人工整理成本高:图片的标题、标签都需要人工一一整理,效率低下
  4. 无法自动化:CI/CD 流程中想要自动生成配图?门都没有

也曾想过干脆不管了,可是毕竟还是要做项目的嘛。既然躲不掉,那就想办法解决呗。于是我们决定,将 ImgBin 从「零散脚本」升级为可自动化执行的图片资产流水线。毕竟有些事儿,逃避也不是办法。

本文分享的方案来自我们在 HagiCode 项目中的实践经验。HagiCode 是一个 AI 代码助手项目,同时维护着 VSCode 扩展、后端 AI 服务、跨平台桌面客户端等多种组件。在这种多语言、多平台的复杂场景下,图片资产的规范管理成了提升开发效率的关键一环。

怎么说呢,这也算是 HagiCode 成长过程中的一个小小烦恼吧。每个项目都会有这样的时候,看起来不起眼的小问题,却能让人折腾半天。

HagiCode 的构建系统采用 TypeScript + Node.js 生态,因此 ImgBin 也顺理成章地选择了相同的技术栈,确保整个项目的技术一致性。毕竟都用习惯了,换别的也嫌麻烦嘛。


ImgBin 采用分层架构,将 CLI 命令、应用服务、第三方 API 适配器和基础设施层清晰分离:

组件层次结构
├── CLI Entry (cli.ts) 全局参数解析、命令路由
├── Commands (commands/*) generate | batch | annotate | thumbnail
├── Application Services job-runner | metadata | thumbnail | asset-writer
├── Provider Adapters image-api-provider | vision-api-provider
└── Infrastructure Layer config | logger | paths | schema

这种分层设计的好处是:每层的职责清晰,测试时可以方便地 mock 掉外部依赖。其实也就是让各干各的,互不打扰,这样出了问题也容易找原因,不是么?

ImgBin 采用了「一个资产一个目录」的模型,每次生成图片时,都会创建如下结构:

library/
└── 2026-03/
└── orange-dashboard/
├── original.png # 原始图片
├── thumbnail.webp # 512x512 缩略图
└── metadata.json # 结构化元数据

这种模型的优势在于:

  1. 自包含:每个资产的所有文件都在同一个目录,迁移、备份都很方便
  2. 可追溯:通过 metadata.json 可以追溯图片的生成时间、使用的 prompt、模型等信息
  3. 可扩展:未来如果需要添加更多变体(比如不同尺寸的缩略图),只需要在同一目录下新增文件即可

美的事物或人,不一定要占有,只要她还是美的,自己好好看着她的美就好了。这话虽然说得有点远了,但理儿是这么个理儿——图片放在一起了,看起来也舒服,找起来也方便。

metadata.json 是整个系统的核心,它采用分层存储策略,区分了三类字段:

{
"schemaVersion": 2,
"assetId": "orange-dashboard",
"slug": "orange-dashboard",
"title": "Orange Dashboard",
"tags": ["dashboard", "hero", "orange"],
"source": { "type": "generated" },
"paths": {
"assetDir": "library/2026-03/orange-dashboard",
"original": "original.png",
"thumbnail": "thumbnail.webp"
},
"generated": {
"prompt": "orange dashboard for docs hero",
"provider": "azure-openai-image-api",
"model": "gpt-image-1.5"
},
"recognized": {
"title": "Orange Dashboard",
"tags": ["dashboard", "ui", "orange"],
"description": "A modern orange dashboard with charts and metrics"
},
"status": {
"generation": "succeeded",
"recognition": "succeeded",
"thumbnail": "succeeded"
},
"timestamps": {
"createdAt": "2026-03-11T04:01:19.570Z",
"updatedAt": "2026-03-11T04:02:09.132Z"
}
}
  • generated:记录图片生成时的原始信息,如使用的 prompt、提供商、模型等
  • recognized:AI 识别结果,如自动生成的标题、标签、描述
  • manual:人工整理的结果,这个区的数据优先级最高,不会被 AI 识别覆盖

这种分层策略解决了我们之前的一个核心矛盾:AI 识别结果和人工整理结果谁优先?答案是人工优先,AI 识别只是辅助。这事儿也想明白了——有些东西嘛,机器终究是机器,终究还是得人来把关。


ImgBin 的另一个核心设计是 Provider Adapter 模式。我们将外部 API 抽象为统一的接口,这样即使更换 AI 服务商,也不需要修改业务逻辑。

怎么说呢,这就跟感情一样——外表怎么变不重要,重要的是内心那套东西不变。接口定好了,内部的实现怎么换都行。

interface ImageGenerationProvider {
// 生成图片,返回图片的 Buffer
generate(options: GenerateOptions): Promise<Buffer>;
// 获取支持的模型列表
getSupportedModels(): Promise<string[]>;
}
interface GenerateOptions {
prompt: string;
model?: string;
size?: '1024x1024' | '1792x1024' | '1024x1792';
quality?: 'standard' | 'hd';
format?: 'png' | 'webp' | 'jpeg';
}
interface VisionRecognitionProvider {
// 识别图片内容,返回结构化的元数据
recognize(imageBuffer: Buffer): Promise<RecognitionResult>;
// 获取支持的模型列表
getSupportedModels(): Promise<string[]>;
}
interface RecognitionResult {
title?: string;
tags: string[];
description?: string;
confidence: number;
}

这种接口设计的优势在于:

  1. 可测试:单元测试时可以传入 mock provider,不需要真正调用外部 API
  2. 可扩展:新增一个 provider 只需要实现接口,不需要修改调用方代码
  3. 可替换:生产环境用 Azure OpenAI,测试环境用本地模型,只需要切换配置

想笑来伪装自己掉下的泪,想哭来试探自己麻痹了没——有时候做项目就是这样,表面上看是换了个 API,实际上内在的那套逻辑一点没变,也就没什么好怕的了。


ImgBin 提供了四个核心命令,满足不同的使用场景:

Terminal window
# 最简单的用法
imgbin generate --prompt "orange dashboard for docs hero"
# 同时生成缩略图和 AI 标注
imgbin generate --prompt "orange dashboard" --annotate --thumbnail
# 指定输出目录
imgbin generate --prompt "orange dashboard" --output ./library

批量任务通过 YAML 或 JSON manifest 文件定义,适合 CI/CD 流程中使用:

assets/jobs/launch.yaml
defaults:
annotate: true
thumbnail: true
libraryRoot: ./library
jobs:
- prompt: "orange dashboard hero"
slug: orange-dashboard
tags: [dashboard, hero, orange]
- prompt: "pricing grid for docs"
slug: pricing-grid
tags: [pricing, grid, docs]

执行命令:

Terminal window
imgbin batch assets/jobs/launch.yaml

批量任务的设计支持失败隔离:manifest 中逐项处理,单项失败不影响其他任务。可以通过 --dry-run 预览而不实际执行。

这也就罢了,关键是它还能告诉你哪儿成功了哪儿失败了,不像某些事儿,失败了都不知道怎么失败的。

对现有图片执行 AI 识别,自动生成标题、标签、描述:

Terminal window
# 标注单张图片
imgbin annotate ./library/2026-03/orange-dashboard
# 批量标注整个目录
imgbin annotate ./library/2026-03/

为既有图片补生成缩略图:

Terminal window
# 生成缩略图
imgbin thumbnail ./library/2026-03/orange-dashboard

批量任务的 manifest 支持灵活的配置,默认值可以统一设置,单个任务也可以覆盖:

# 全局默认值
defaults:
annotate: true # 默认开启 AI 标注
thumbnail: true # 默认生成缩略图
libraryRoot: ./library
model: gpt-image-1.5
jobs:
# 最小配置,只提供 prompt
- prompt: "first image"
# 完整配置
- prompt: "second image"
slug: custom-slug
tags: [tag1, tag2]
annotate: false # 这个任务不执行 AI 标注
model: dall-e-3 # 这个任务用不同的模型

执行时,ImgBin 会逐个处理任务,每个任务的结果会写入对应的 metadata.json,即使某个任务失败,也不会影响其他任务。任务完成后,会输出汇总报告:

✓ orange-dashboard (succeeded)
✓ pricing-grid (succeeded)
✗ hero-banner (failed: API rate limit exceeded)
2/3 succeeded, 1 failed

有些事儿吧,急也急不来,一个一个来,反而踏实。这,或许就是批量任务的哲学吧。


ImgBin 通过环境变量支持灵活的配置:

Terminal window
# ImgBin 工作目录
IMGBIN_WORKDIR=/path/to/imgbin
# 可执行文件路径(用于脚本中调用)
IMGBIN_EXECUTABLE=/path/to/imgbin/dist/cli.js
# 资产库根目录
IMGBIN_LIBRARY_ROOT=./.imgbin-library
# Azure OpenAI 配置(如果使用 Azure provider)
AZURE_OPENAI_ENDPOINT=https://your-resource.openai.azure.com/
AZURE_OPENAI_API_KEY=your-api-key
AZURE_OPENAI_IMAGE_DEPLOYMENT=gpt-image-1

配置这东西,说重要也重要,说不重要也不重要。毕竟怎么舒服怎么来嘛,适合自己的才是最好的。


在实现过程中,我们总结了以下几个关键点:

接口定义要清晰完整,包括输入参数、返回值、错误处理。建议同时提供同步和异步两种调用方式,方便不同场景使用。

这也算是过来人的一点经验吧,毕竟接口这东西,定好了就不想再改,麻烦。

批量任务中某项失败时,应该:

  1. 记录详细错误信息到单独的日志文件
  2. 继续执行其他任务,不中断整个流程
  3. 最终返回非零退出码,表示有任务失败
  4. 在汇总报告中清晰展示每个任务的执行结果

有些事儿失败了就是失败了,逃避也没用,不如大大方方承认,然后想办法解决。这道理,做项目和做人是一样的。

识别结果默认写入 recognized 区,人工修改的字段有 manual 标记。元数据更新时采用「只增不减」策略:除非显式传入 --force 参数,否则不覆盖已有的人工整理结果。

这事儿也想明白了——有些东西啊,错过了就是错过了,覆盖了也就没了。还是保留着比较好,毕竟记录本身也是一种美。

使用 fs.mkdir({ recursive: true }) 确保目录创建原子性,避免并发场景下的竞态条件。

这大概就是所谓的安全感吧——该稳稳该快快,不拖泥带水,也不瞻前顾后。


ImgBin 作为 HagiCode 项目图片资产管理的核心工具,通过以下设计解决了我们面临的问题:

  1. 统一入口:CLI 命令覆盖了生成、标注、缩略图等全部操作
  2. 元数据驱动:每个资产都有完整的 metadata.json,支持检索和追踪
  3. Provider Adapter:灵活的外部 API 抽象,便于测试和扩展
  4. 批量任务支持:CI/CD 流程中可以自动执行批量图片生成

一切都淡了…可这方案啊,还真是用上了。

这套方案不仅提升了 HagiCode 自身的开发效率,也形成了一个可复用的图片资产管理框架。如果你也在开发类似的多组件项目,相信 ImgBin 的设计思路会给你一些启发。

青春嘛,总是要折腾的。不折腾折腾,怎么知道自个儿几斤几两呢?



感谢您的阅读,如果您觉得本文有用,快点击下方点赞按钮,让更多的人看到本文。

本内容采用人工智能辅助协作,经本人审核,符合本人观点与立场。

Codex SDK 控制台消息解析完全指南

Codex SDK 控制台消息解析完全指南

Section titled “Codex SDK 控制台消息解析完全指南”

本文详细介绍 Codex SDK 的事件流机制、消息类型解析、以及在实际项目中的最佳实践,帮助开发者快速掌握 AI 执行服务的核心技能。

其实,在构建基于 Codex SDK 的 AI 执行服务时,我们不得不面对这样一个问题:如何处理 Codex 返回的那些流式事件消息。这些消息里藏着执行状态、输出内容、错误信息这些重要的东西,就像青春里那些说不清道不明的心事,你得好好琢磨琢磨。

作为 HagiCode 项目的一部分,我们需要在 AI 代码助手场景中实现一个靠谱的执行器。这大概就是我们决定深入研究 Codex SDK 事件流机制的原因——毕竟,只有理解了底层消息是怎么运作的,才能构建出真正企业级的 AI 执行平台。这就像恋爱一样,不懂对方的心思,怎么走下去?

Codex SDK 是 OpenAI 推出的编程辅助工具 SDK,它通过事件流(Event Stream)的方式返回执行结果。和传统的请求-响应模式不太一样,Codex 使用流式事件,让我们能够:

  • 实时获取执行进度
  • 及时处理错误情况
  • 获取详细的 token 使用统计
  • 支持长时间运行的复杂任务

理解这些事件类型并正确解析它们,对于实现功能完善的 AI 执行器来说,还是挺重要的。毕竟,谁也不想面对一个黑盒?

本文分享的方案来自我们在 HagiCode 项目中的实践经验。HagiCode 是一个开源的 AI 代码助手项目,致力于为开发者提供智能化的代码辅助能力。在开发过程中,我们需要构建可靠的 AI 执行服务来处理用户的代码执行请求,这正是我们引入 Codex SDK 的直接原因。

作为 AI 代码助手,HagiCode 需要处理各种复杂的代码执行场景:实时获取执行进度、及时处理错误情况、获取详细的 token 使用统计等。通过深入理解 Codex SDK 的事件流机制,我们能够构建出满足生产环境要求的执行器。说到底,代码也好,人生也罢,都需要一点积累和沉淀。

Codex SDK 使用 thread.runStreamed() 方法返回异步事件迭代器:

import { Codex } from '@openai/codex-sdk';
const client = new Codex({
apiKey: process.env.CODEX_API_KEY,
baseUrl: process.env.CODEX_BASE_URL,
});
const thread = client.startThread({
workingDirectory: '/path/to/project',
skipGitRepoCheck: false,
});
const { events } = await thread.runStreamed('your prompt here', {
outputSchema: {
type: 'object',
properties: {
output: { type: 'string' },
status: { type: 'string', enum: ['ok', 'action_required'] },
},
required: ['output', 'status'],
},
});
for await (const event of events) {
// 处理每个事件
}
事件类型说明关键数据
thread.started线程启动成功thread_id
item.updated消息内容更新item.text
item.completed消息完成item.text
turn.completed执行完成usage (token 使用量)
turn.failed执行失败error.message
error错误事件message

在实际项目中,HagiCode 的执行器组件正是基于这些事件类型构建的。我们需要对每种事件进行精细化处理,以确保用户体验的流畅性。这就像对待一段感情,每个细节都需要用心对待,不然怎么可能有好的结果?

消息内容通过事件处理函数提取:

private handleThreadEvent(event: ThreadEvent, onMessage: (content: string) => void): void {
// 只处理消息更新和完成事件
if (event.type !== 'item.updated' && event.type !== 'item.completed') {
return;
}
// 只处理代理消息类型
if (event.item.type !== 'agent_message') {
return;
}
// 提取文本内容
onMessage(event.item.text);
}

关键点:

  • 只处理 item.updateditem.completed 事件
  • 只处理 agent_message 类型的内容
  • 消息内容在 event.item.text 字段中

Codex 支持 JSON 结构化输出,通过 outputSchema 参数指定返回格式:

const DEFAULT_OUTPUT_SCHEMA = {
type: 'object',
properties: {
output: { type: 'string' },
status: { type: 'string', enum: ['ok', 'action_required'] },
},
required: ['output', 'status'],
additionalProperties: false,
} as const;

解析函数会尝试解析 JSON,如果失败则返回原始文本——这就像人生,有时候你想要一个完美的答案,但现实往往给你一个模糊的回应,只能自己慢慢消化罢了。

function toStructuredOutput(raw: string): StructuredOutput {
try {
const parsed = JSON.parse(raw) as Partial<StructuredOutput>;
if (typeof parsed.output === 'string') {
return {
output: parsed.output,
status: parsed.status === 'action_required' ? 'action_required' : 'ok',
};
}
} catch {
// JSON 解析失败,回退到原始文本
}
return {
output: raw,
status: 'ok',
};
}
private async runWithStreaming(
thread: Thread,
input: CodexStageExecutionInput
): Promise<{ output: string; usage: Usage | null }> {
const abortController = new AbortController();
const timeoutHandle = setTimeout(() => {
abortController.abort();
}, Math.max(1000, input.timeoutMs));
let latestMessage = '';
let usage: Usage | null = null;
let emittedLength = 0;
try {
const { events } = await thread.runStreamed(input.prompt, {
outputSchema: DEFAULT_OUTPUT_SCHEMA,
signal: abortController.signal,
});
for await (const event of events) {
// 处理消息内容
this.handleThreadEvent(event, (nextContent) => {
const delta = nextContent.slice(emittedLength);
if (delta.length > 0) {
emittedLength = nextContent.length;
input.callbacks?.onChunk?.(delta); // 流式回调
}
latestMessage = nextContent;
});
// 根据事件类型处理不同数据
if (event.type === 'thread.started') {
this.threadId = event.thread_id;
} else if (event.type === 'turn.completed') {
usage = event.usage;
} else if (event.type === 'turn.failed') {
throw new CodexExecutorError('gateway_unavailable', event.error.message, true);
} else if (event.type === 'error') {
throw new CodexExecutorError('gateway_unavailable', event.message, true);
}
}
} catch (error) {
if (abortController.signal.aborted) {
throw new CodexExecutorError(
'upstream_timeout',
`Codex stage timed out after ${input.timeoutMs}ms`,
true
);
}
throw error;
} finally {
clearTimeout(timeoutHandle);
}
const structured = toStructuredOutput(latestMessage);
return { output: structured.output, usage };
}

根据错误特征映射到具体的错误码,便于上层处理:

function mapError(error: unknown): CodexExecutorError {
if (error instanceof CodexExecutorError) {
return error;
}
const message = error instanceof Error ? error.message : String(error);
const normalized = message.toLowerCase();
// 认证错误 - 不可重试
if (normalized.includes('401') ||
normalized.includes('403') ||
normalized.includes('api key') ||
normalized.includes('auth')) {
return new CodexExecutorError('auth_invalid', message, false);
}
// 速率限制 - 可重试
if (normalized.includes('429') || normalized.includes('rate limit')) {
return new CodexExecutorError('rate_limited', message, true);
}
// 超时错误 - 可重试
if (normalized.includes('timeout') || normalized.includes('aborted')) {
return new CodexExecutorError('upstream_timeout', message, true);
}
// 默认错误
return new CodexExecutorError('gateway_unavailable', message, true);
}
export type CodexErrorCode =
| 'auth_invalid' // 认证失败
| 'upstream_timeout' // 上游超时
| 'rate_limited' // 速率限制
| 'gateway_unavailable'; // 网关不可用
export class CodexExecutorError extends Error {
readonly code: CodexErrorCode;
readonly retryable: boolean;
constructor(code: CodexErrorCode, message: string, retryable: boolean) {
super(message);
this.name = 'CodexExecutorError';
this.code = code;
this.retryable = retryable;
}
}

Codex SDK 要求工作目录必须是有效的 Git 仓库——这就像做人一样,总得有个根,有个出处,不然怎么踏实?

export function validateWorkingDirectory(
workingDirectory: string,
skipGitRepoCheck: boolean
): void {
const resolvedWorkingDirectory = path.resolve(workingDirectory);
if (!existsSync(resolvedWorkingDirectory)) {
throw new CodexExecutorError(
'gateway_unavailable',
'Working directory does not exist.',
false
);
}
if (!statSync(resolvedWorkingDirectory).isDirectory()) {
throw new CodexExecutorError(
'gateway_unavailable',
'Working directory is not a directory.',
false
);
}
if (skipGitRepoCheck) {
return;
}
const gitDir = path.join(resolvedWorkingDirectory, '.git');
if (!existsSync(gitDir)) {
throw new CodexExecutorError(
'gateway_unavailable',
'Working directory is not a git repository.',
false
);
}
}

Codex SDK 需要从登录 Shell 加载环境变量,确保 AI Agent 可以访问系统命令:

function parseEnvironmentOutput(output: Buffer): Record<string, string> {
const parsed: Record<string, string> = {};
for (const entry of output.toString('utf8').split('\0')) {
if (!entry) continue;
const separatorIndex = entry.indexOf('=');
if (separatorIndex <= 0) continue;
const key = entry.slice(0, separatorIndex);
const value = entry.slice(separatorIndex + 1);
if (key.length > 0) {
parsed[key] = value;
}
}
return parsed;
}
function tryLoadEnvironmentFromShell(shellPath: string): Record<string, string> | null {
const result = spawnSync(shellPath, ['-ilc', 'env -0'], {
env: process.env,
stdio: ['ignore', 'pipe', 'pipe'],
timeout: 5000,
});
if (result.error || result.status !== 0) {
return null;
}
return parseEnvironmentOutput(result.stdout);
}
export function createExecutorEnvironment(
envOverrides: Record<string, string> = {}
): Record<string, string> {
// 加载登录 Shell 环境变量
const consoleEnv = loadConsoleEnvironmentFromShell();
return {
...process.env,
...consoleEnv,
...envOverrides,
};
}

在 HagiCode 项目中,我们使用以下方式来初始化 Codex 客户端并执行任务:

import { Codex } from '@openai/codex-sdk';
async function executeWithCodex(prompt: string, workingDir: string) {
const client = new Codex({
apiKey: process.env.CODEX_API_KEY,
env: { PATH: process.env.PATH },
});
const thread = client.startThread({
workingDirectory: workingDir,
});
const { events } = await thread.runStreamed(prompt);
let result = '';
for await (const event of events) {
if (event.type === 'item.updated' && event.item.type === 'agent_message') {
result = event.item.text;
}
if (event.type === 'turn.completed') {
console.log('Token usage:', event.usage);
}
}
// 尝试解析 JSON 输出
try {
const parsed = JSON.parse(result);
return parsed.output;
} catch {
return result;
}
}
export class CodexSdkExecutor {
private readonly config: CodexRuntimeConfig;
private readonly client: Codex;
private threadId: string | null = null;
async executeStage(input: CodexStageExecutionInput): Promise<CodexStageExecutionResult> {
const maxAttempts = Math.max(1, this.config.retryCount + 1);
let attempt = 0;
let lastError: CodexExecutorError | null = null;
while (attempt < maxAttempts) {
attempt += 1;
try {
const thread = this.getThread(input.workingDirectory);
const { output, usage } = await this.runWithStreaming(thread, input);
return {
output,
usage,
threadId: this.threadId!,
attempts: attempt,
latencyMs: Date.now() - startedAt,
};
} catch (error) {
const mappedError = mapError(error);
lastError = mappedError;
// 不可重试错误或已达最大重试次数
if (!mappedError.retryable || attempt >= maxAttempts) {
throw mappedError;
}
// 等待后重试
await new Promise(resolve => setTimeout(resolve, 1000 * attempt));
}
}
throw lastError!;
}
}
  • 确保工作目录是有效的 Git 仓库
  • 使用 PROJECT_ROOT 环境变量显式指定
  • 开发调试时可设置 CODEX_SKIP_GIT_REPO_CHECK=true 跳过检查
  • 通过白名单机制传递必要的环境变量
  • 使用登录 Shell 加载完整环境
  • 避免传递敏感信息
  • 根据任务复杂度设置合理的超时时间
  • 对可重试错误实现指数退避
  • 记录重试次数和原因
  • 区分可重试和不可重试错误
  • 提供清晰的错误信息和建议
  • 统一错误码便于上层处理
  • 实现增量输出回调,提升用户体验
  • 正确处理消息的增量更新
  • 记录 token 使用量用于成本分析

在 HagiCode 项目的实际生产环境中,我们已经验证了上述最佳实践的有效性。这套方案帮助我们构建了稳定可靠的 AI 执行服务。毕竟,实践才是检验真理的唯一标准,纸上谈兵终究没什么用。

Codex SDK 的事件流机制为构建 AI 执行服务提供了强大的能力。通过正确解析各类事件,我们可以:

  • 实时获取执行状态和输出
  • 实现可靠的错误处理和重试机制
  • 获取详细的执行统计信息
  • 构建功能完善的 AI 执行平台

本文介绍的核心概念和代码示例可以直接应用于实际项目中,帮助开发者快速上手 Codex SDK 的集成工作。如果你觉得这套方案有价值,说明 HagiCode 的工程实践还不错——那么 HagiCode 本身也值得关注一下。毕竟,有些东西,错过了就可惜了。


感谢您的阅读,如果您觉得本文有用,快点击下方点赞按钮👍,让更多的人看到本文。

本内容采用人工智能辅助协作,经本人审核,符合本人观点与立场。

从 TypeScript 到 C#:Codex SDK 的跨语言移植实践

从 TypeScript 到 C#:Codex SDK 的跨语言移植实践

Section titled “从 TypeScript 到 C#:Codex SDK 的跨语言移植实践”

怎么说呢,这篇文章也算是个孩子,记录了我们把官方 TypeScript Codex SDK 完整移植到 C# 的全过程。说是”移植”,其实更像是一场漫长的冒险,毕竟两种语言的脾性不太一样,总得想办法让它们好好相处。

Codex 这东西,是 OpenAI 推出的 AI Agent CLI 工具,确实挺强大的。官方给了 TypeScript SDK,放在 @openai/codex 这个包里。它呢,通过调用 codex exec --experimental-json 命令跟 Codex CLI 交互,解析 JSONL 格式的事件流。

可是吧,我们在 HagiCode 项目里,需要在一个纯 .NET 环境中使用它。具体来说,就是 C# 后端服务和桌面端应用。你说这事闹的,总不能为了调用一个 CLI 工具而在 .NET 项目中引入 Node.js 运行时吧?那也太折腾了。

于是摆在我们面前的就两条路:一是维护一套复杂的 Node.js 桥接层,二是自己动手丰衣足食,实现一个原生 C# SDK。

我们选择了后者。

其实这篇文也是来自我们在 HagiCode 项目里的实践经验。HagiCode 是个开源的 AI 代码助手项目,听起来挺高大上的,但说白了也就是同时维护着前端 VSCode 扩展、后端 AI 服务、跨平台桌面客户端等多种组件。这种多语言、多平台的复杂度,正是我们需要原生 C# SDK 的直接原因——总不能真的在 .NET 项目里跑个 Node.js 吧?那也太魔幻了。

如果你觉得这篇文章有点帮助,欢迎来 GitHub 给个 Star:github.com/HagiCode-org/site,也欢迎访问官网了解更多:hagicode.com。毕竟一个人品无限的项目能得到支持,也是件开心的事。

在开始代码层面的转化之前,我们得先理解两套 SDK 的架构设计。毕竟知己知彼,百战不殆嘛。

TypeScript SDK 的核心架构是这样的:

Codex (入口类)
└── CodexExec (执行器,管理子进程)
└── Thread (对话线程)
├── run() / runStreamed() (同步/异步执行)
└── 事件流解析

C# SDK 呢,保持了相同的架构层次,但在实现细节上做了适配。整体思路是:保持 API 的一致性,但在具体实现上充分利用 C# 语言特性。毕竟语言不同,总得有点区别才行。

这是最基础也是最重要的工作。毕竟万丈高楼平地起,基础打不好,后面全是麻烦。

TypeScript 的类型系统比 C# 更灵活,这是事实。我们需要找到合适的映射方式:

TypeScriptC#说明
interface / typerecordC# 使用 record 实现不可变数据结构
string | nullstring?可空引用类型
boolean | undefinedbool?可空布尔值
AsyncGeneratorIAsyncEnumerable异步迭代器

事件类型系统是一个典型的例子。TypeScript 使用联合类型来定义事件:

export type ThreadEvent =
| ThreadStartedEvent
| TurnStartedEvent
| TurnCompletedEvent
| ...

在 C# 中,我们使用继承层次和模式匹配来实现类似的效果:

public abstract record ThreadEvent(string Type);
public sealed record ThreadStartedEvent(string ThreadId) : ThreadEvent("thread.started");
public sealed record TurnStartedEvent() : ThreadEvent("turn.started");
public sealed record TurnCompletedEvent(Usage Usage) : ThreadEvent("turn.completed");
// ...

使用 record 而不是 class,是因为事件对象应该是不可变的,这和 TypeScript 中使用普通对象是一个道理。而 sealed 关键字则确保不会有额外的子类继承,编译器可以进行优化。其实也就那么回事,习惯就好了。

事件解析是整个 SDK 的核心,毕竟这决定了我们能否正确理解 Codex CLI 返回的每一条信息。解析错了,后面全是白忙活。

TypeScript 版本使用 JSON.parse() 来解析每一行 JSON:

export function parseEvent(line: string): ThreadEvent {
const data = JSON.parse(line);
// 处理各种事件类型...
}

C# 版本则使用 System.Text.Json.JsonDocument

public static ThreadEvent Parse(string line)
{
using var document = JsonDocument.Parse(line);
var root = document.RootElement;
var type = GetRequiredString(root, "type", "event.type");
return type switch
{
"thread.started" => new ThreadStartedEvent(GetRequiredString(root, "thread_id", ...)),
"turn.started" => new TurnStartedEvent(),
"turn.completed" => new TurnCompletedEvent(ParseUsage(...)),
// ...
_ => new UnknownThreadEvent(type, root.Clone()),
};
}

这里有一个小技巧:root.Clone() 是必要的,因为 JsonDocument 的元素在文档释放后就会失效,我们需要保留一份副本给未知的事件类型。这也是没办法的事,毕竟 C# 的 JSON 处理和 JavaScript 不太一样。

这是两个 SDK 差异最大的地方。毕竟 Node.js 和 .NET 的脾性不太一样,总得适应适应。

TypeScript 使用 Node.js 的 spawn() 函数:

const child = spawn(this.executablePath, commandArgs, { env, signal });

C# 使用 .NET 的 System.Diagnostics.Process

using var process = new Process { StartInfo = startInfo };
process.Start();
// 需要手动管理 stdin/stdout/stderr

具体来说,C# 版本需要这样配置进程:

var startInfo = new ProcessStartInfo
{
FileName = _executablePath,
RedirectStandardInput = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true,
};

最大的区别在于取消机制。TypeScript 使用 AbortSignal,这是 Web API 的一部分,用起来挺顺手的:

const child = spawn(cmd, args, { signal: cancellationSignal });

C# 则使用 CancellationToken

public async IAsyncEnumerable<string> RunAsync(
CodexExecArgs args,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
// 在循环中检查取消状态
while (!cancellationToken.IsCancellationRequested)
{
// 处理输出...
}
// 取消时终止进程
if (cancellationToken.IsCancellationRequested)
{
try { process.Kill(entireProcessTree: true); } catch { }
}
}

这其中的区别,大概就是Web API 和 .NET 生态的差异吧,说到底也就是那么回事。

两套 SDK 都实现了将 JSON 配置转换为 TOML 配置的逻辑,因为 Codex CLI 接受 TOML 格式的配置覆盖。这部分逻辑必须完全保持一致,否则同样的配置在两个 SDK 中会产生不同的行为。

这叫什么?这就叫工匠精神嘛。毕竟细节决定成败,有些事不能将就。

我们创建了这样的项目结构:

CodexSdk/
├── CodexSdk.csproj
├── Codex.cs # 入口类
├── CodexThread.cs # 对话线程
├── CodexExec.cs # 执行器
├── Events.cs # 事件类型定义
├── Items.cs # 项目类型定义
├── EventParser.cs # 事件解析器
├── OutputSchemaTempFile.cs # 临时文件管理
└── ...

看起来也挺整齐的,不是吗?

基本的使用方式和 TypeScript SDK 保持一致:

using CodexSdk;
// 创建 Codex 实例
var codex = new Codex();
var thread = codex.StartThread();
// 执行查询
var result = await thread.RunAsync("Summarize this repository.");
Console.WriteLine(result.FinalResponse);

流式事件处理利用了 C# 的模式匹配能力:

await foreach (var @event in thread.RunStreamedAsync("Analyze the code."))
{
switch (@event)
{
case ItemCompletedEvent itemCompleted
when itemCompleted.Item is AgentMessageItem msg:
Console.WriteLine($"Assistant: {msg.Text}");
break;
case TurnCompletedEvent completed:
Console.WriteLine($"Tokens: in={completed.Usage.InputTokens}");
break;
case CommandExecutionItem command:
Console.WriteLine($"Command: {command.Command}");
break;
}
}

在实现过程中,我们也不算是白忙活,总结点经验如下:

  1. 进程管理:C# 版本需要手动管理进程的生命周期,包括取消时的进程终止。使用 Kill(entireProcessTree: true) 确保子进程也被清理。这叫什么?这就叫有始有终。

  2. 错误处理:我们使用 InvalidOperationException 抛出解析错误,保持与 TypeScript SDK 相似的错误处理方式。毕竟错误处理这事儿,不能太随意。

  3. 资源清理OutputSchemaTempFile 实现 IAsyncDisposable,确保临时文件被正确清理。这也是没办法的事,资源不清理干净,总会有问题。

  4. 环境变量:C# 版本支持通过 CodexOptions.Env 完全覆盖进程环境变量。这功能虽然小,但挺实用的。

  5. 平台差异:C# 版本不包含 TypeScript 版本中自动查找 npm 包中二进制文件的逻辑。这是因为 .NET 项目通常不依赖 npm,所以需要通过 CODEX_EXECUTABLE 环境变量或 CodexPathOverride 指定 codex 可执行文件路径。这叫什么?这就叫因地制宜。

将一个成熟的 TypeScript SDK 移植到 C#,不仅仅是语法层面的转换,更是对两种语言设计哲学的理解。TypeScript 的灵活性和 JavaScript 生态特性(如 AbortSignal)在 C# 中需要找到对应的替代方案。这其中的酸甜苦辣,大概也只有真正做过的人才能体会。

关键体会是:保持 API 的一致性比保持实现细节的一致性更重要。用户关心的是接口是否易用,而不是内部实现是否相同。这话听起来简单,但做起来需要取舍。

如果你也在做类似的跨语言移植工作,我们的经验是:先完整理解原 SDK 的架构设计,然后逐个模块进行转化,最后通过完整的测试用例确保行为一致。毕竟急不得,一口吃不成胖子。

一切都会好的,都会有的…



如果本文对你有帮助


感谢您的阅读,如果您觉得本文有用,快点击下方点赞按钮👍,让更多的人看到本文。

本内容采用人工智能辅助协作,经本人审核,符合本人观点与立场。

StreamJsonRpc 在 HagiCode 中的深度集成与实践

StreamJsonRpc 在 HagiCode 中的深度集成与实践

Section titled “StreamJsonRpc 在 HagiCode 中的深度集成与实践”

本文详细介绍了 HagiCode(原 PCode)项目如何成功集成 Microsoft 的 StreamJsonRpc 通信库,以替换原有的自定义 JSON-RPC 实现,并解决了集成过程中的技术痛点与架构挑战。

StreamJsonRpc 是微软官方维护的用于 .NET 和 TypeScript 的 JSON-RPC 通信库,以其强大的类型安全、自动代理生成和成熟的异常处理机制著称。在 HagiCode 项目中,为了通过 ACP (Agent Communication Protocol) 与外部 AI 工具(如 iflow CLI、OpenCode CLI)进行通信,并消除早期自定义 JSON-RPC 实现带来的维护成本和潜在 Bug,项目决定集成 StreamJsonRpc。然而,在集成过程中遇到了流式 JSON-RPC 特有的挑战,特别是在处理代理目标绑定和泛型参数识别时。

为了解决这些痛点,我们做了一个大胆的决定:整个构建系统推倒重来。这个决定带来的变化,可能比你想象的还要大——稍后我会具体说。

先介绍一下本文的”主角项目”

如果你在开发中遇到过这些烦恼:

  • 多项目、多技术栈,构建脚本维护成本高
  • CI/CD 流水线配置繁琐,每次改都要查文档
  • 跨平台兼容性问题层出不穷
  • 想让 AI 帮忙写代码,但现有工具不够智能

那么我们正在做的 HagiCode 可能你会感兴趣。

HagiCode 是什么?

  • 一款 AI 驱动的代码智能助手
  • 支持多语言、跨平台的代码生成与优化
  • 内置游戏化机制,让编码不再枯燥

为什么在这里提它? 本文分享的 StreamJsonRpc 集成方案,正是我们在开发 HagiCode 过程中实践总结出来的。如果你觉得这套工程化方案有价值,说明我们的技术品味还不错——那么 HagiCode 本身也值得关注一下。

想了解更多?

当前项目处于 ACP 协议集成的关键阶段,面临着以下几个技术痛点和架构挑战:

原有的 JSON-RPC 实现位于 src/HagiCode.ClaudeHelper/AcpImp/,包含 JsonRpcEndpointClientSideConnection 等组件。维护这套自定义代码成本高,且缺乏成熟库的高级功能(如进度报告、取消支持)。

在尝试将现有的 CallbackProxyTarget 模式迁移到 StreamJsonRpc 时,发现 _rpc.AddLocalRpcTarget(target) 方法无法识别通过代理模式创建的目标。具体表现为,StreamJsonRpc 无法自动将泛型类型 T 的属性拆分为 RPC 方法参数,导致服务器端无法正确处理客户端发起的方法调用。

现有的 ClientSideConnection 混合了传输层(WebSocket/Stdio)、协议层(JSON-RPC)和业务层(ACP Agent 接口),导致职责不清,且存在 AcpAgentCallbackRpcAdapter 方法绑定缺失的问题。

WebSocket 传输层缺少对原始 JSON 内容的日志输出,导致在调试 RPC 通信问题时难以定位是序列化问题还是网络问题。

针对上述问题,我们采用了以下系统化的解决方案,从架构重构、库集成和调试增强三个维度进行优化:

删除 JsonRpcEndpoint.csAgentSideConnection.cs 及相关的自定义序列化转换器(JsonRpcMessageJsonConverter 等)。

引入 StreamJsonRpc NuGet 包,利用其 JsonRpc 类处理核心通信逻辑。

定义 IAcpTransport 接口,统一处理 WebSocketStdio 两种传输模式,确保协议层与传输层解耦。

// IAcpTransport 接口定义
public interface IAcpTransport
{
Task SendAsync(string message, CancellationToken cancellationToken = default);
Task<string> ReceiveAsync(CancellationToken cancellationToken = default);
Task CloseAsync(CancellationToken cancellationToken = default);
}
// WebSocket 传输实现
public class WebSocketTransport : IAcpTransport
{
private readonly WebSocket _webSocket;
public WebSocketTransport(WebSocket webSocket)
{
_webSocket = webSocket;
}
// 实现发送和接收方法
// ...
}
// Stdio 传输实现
public class StdioTransport : IAcpTransport
{
private readonly StreamReader _reader;
private readonly StreamWriter _writer;
public StdioTransport(StreamReader reader, StreamWriter writer)
{
_reader = reader;
_writer = writer;
}
// 实现发送和接收方法
// ...
}

检查现有的动态代理生成逻辑,确定 StreamJsonRpc 无法识别的根本原因(通常是因为代理对象没有公开实际的方法签名,或者使用了 StreamJsonRpc 不支持的参数类型)。

将泛型属性拆分为明确的 RPC 方法参数。不再依赖动态属性,而是定义具体的 Request/Response DTO(数据传输对象),确保 StreamJsonRpc 能通过反射正确识别方法签名。

// 原有的泛型属性方式
public class CallbackProxyTarget<T>
{
public Func<T, Task> Callback { get; set; }
}
// 重构后的具体方法方式
public class ReadTextFileRequest
{
public string FilePath { get; set; }
}
public class ReadTextFileResponse
{
public string Content { get; set; }
}
public interface IAcpAgentCallback
{
Task<ReadTextFileResponse> ReadTextFileAsync(ReadTextFileRequest request);
// 其他方法...
}

在某些复杂场景下,手动代理 JsonRpc 对象并处理 RpcConnection 可能比直接添加目标更灵活。

确保该组件显式实现 StreamJsonRpc 的代理接口,将 ACP 协议定义的方法(如 ReadTextFileAsync)映射到 StreamJsonRpc 的回调处理器上。

在 WebSocket 或 Stdio 的消息处理管道中,拦截并记录 JSON-RPC 请求和响应的原始文本。利用 ILogger 在解析前和序列化后输出原始 payload,以便排查格式错误。

// 日志增强的传输包装器
public class LoggingAcpTransport : IAcpTransport
{
private readonly IAcpTransport _innerTransport;
private readonly ILogger<LoggingAcpTransport> _logger;
public LoggingAcpTransport(IAcpTransport innerTransport, ILogger<LoggingAcpTransport> logger)
{
_innerTransport = innerTransport;
_logger = logger;
}
public async Task SendAsync(string message, CancellationToken cancellationToken = default)
{
_logger.LogTrace("Sending message: {Message}", message);
await _innerTransport.SendAsync(message, cancellationToken);
}
public async Task<string> ReceiveAsync(CancellationToken cancellationToken = default)
{
var message = await _innerTransport.ReceiveAsync(cancellationToken);
_logger.LogTrace("Received message: {Message}", message);
return message;
}
public async Task CloseAsync(CancellationToken cancellationToken = default)
{
_logger.LogDebug("Closing connection");
await _innerTransport.CloseAsync(cancellationToken);
}
}

封装 StreamJsonRpc 连接,负责 InvokeAsync 和连接生命周期管理。

public class AcpRpcClient : IDisposable
{
private readonly JsonRpc _rpc;
private readonly IAcpTransport _transport;
public AcpRpcClient(IAcpTransport transport)
{
_transport = transport;
_rpc = new JsonRpc(new StreamRpcTransport(transport));
_rpc.StartListening();
}
public async Task<TResponse> InvokeAsync<TResponse>(string methodName, object parameters)
{
return await _rpc.InvokeAsync<TResponse>(methodName, parameters);
}
public void Dispose()
{
_rpc.Dispose();
_transport.Dispose();
}
// StreamRpcTransport 是对 IAcpTransport 的 StreamJsonRpc 适配器
private class StreamRpcTransport : IDuplexPipe
{
// 实现 IDuplexPipe 接口
// ...
}
}

协议层 (IAcpAgentClient / IAcpAgentCallback)

Section titled “协议层 (IAcpAgentClient / IAcpAgentCallback)”

定义清晰的 client-to-agent 和 agent-to-client 接口,移除 Func<IAcpAgent, IAcpClient> 这种循环依赖的工厂模式,改用依赖注入或直接注册回调。

基于 StreamJsonRpc 的最佳实践和项目经验,以下是实施过程中的关键建议:

StreamJsonRpc 的核心优势在于强类型。不要使用 dynamicJObject 传递参数。应为每个 RPC 方法定义明确的 C# POCO 类作为参数。这不仅解决了代理目标识别问题,还能在编译时发现类型错误。

示例:将 CallbackProxyTarget 中的泛型属性替换为 ReadTextFileRequestWriteTextFileRequest 等具体类。

使用 [JsonRpcMethod] 特性显式指定 RPC 方法名称,不要依赖默认的方法名映射。这可以防止因命名风格差异(如 PascalCase vs camelCase)导致的调用失败。

public interface IAcpAgentCallback
{
[JsonRpcMethod("readTextFile")]
Task<ReadTextFileResponse> ReadTextFileAsync(ReadTextFileRequest request);
[JsonRpcMethod("writeTextFile")]
Task WriteTextFileAsync(WriteTextFileRequest request);
}

StreamJsonRpc 提供了 JsonRpc.ConnectionLost 事件。务必监听此事件以处理进程意外退出或网络断开的情况,这比单纯依赖 Orleans 的 Grain 失效检测更及时。

_rpc.ConnectionLost += (sender, e) =>
{
_logger.LogError("RPC connection lost: {Reason}", e.ToString());
// 处理重连逻辑或通知用户
};
  • Trace 级别:记录完整的 JSON Request/Response 原文。
  • Debug 级别:记录方法调用栈和参数摘要。
  • 注意:确保日志中不包含敏感的 Authorization Token 或大文件内容的 Base64 编码。

StreamJsonRpc 原生支持 IAsyncEnumerable。在实现 ACP 的流式 Prompt 响应时,应直接使用 IAsyncEnumerable 而不是自定义的分页逻辑。这能极大简化流式处理的代码量。

public interface IAcpAgentCallback
{
[JsonRpcMethod("streamText")]
IAsyncEnumerable<string> StreamTextAsync(StreamTextRequest request);
}

保持 ACPSessionClientSideConnection 的分离。ACPSession 应专注于 Orleans 的状态管理和业务逻辑(如消息入队),通过组合而非继承的方式使用 StreamJsonRpc 连接对象。

通过全面集成 StreamJsonRpc,HagiCode 项目成功解决了原自定义实现的维护成本高、功能局限性和架构分层混乱等问题。关键改进包括:

  1. 采用强类型 DTO 替代动态属性,提高了代码的可维护性和可靠性
  2. 实现了传输层抽象和协议层分离,提升了架构的清晰性
  3. 增强了日志记录功能,便于排查通信问题
  4. 引入了流式传输支持,简化了流式处理的实现

这些改进为 HagiCode 提供了更稳定、更高效的通信基础,使其能够更好地与外部 AI 工具进行交互,并为未来的功能扩展奠定了坚实的基础。


如果本文对你有帮助:


感谢您的阅读,如果您觉得本文有用,快点击下方点赞按钮👍,让更多的人看到本文。

本内容采用人工智能辅助协作,经本人审核,符合本人观点与立场。