标签:
JavaMCPStdio进程通信JSON-RPCj-langchain
前置阅读:MCP 协议通信详解:从握手到工具调用的完整流程
适合人群:希望深入理解 MCP Stdio 传输机制、或需要排查本地 MCP 服务器问题的 Java 开发者
一、Stdio 是什么,适合什么场景
MCP 支持三种传输方式:Stdio、SSE、HTTP。Stdio(标准输入输出)是其中最简单直接的一种,通过进程间通信实现客户端和服务器的双向通信:
┌─────────────┐ stdin → ┌─────────────┐ │ 客户端进程 │ │ 服务器进程 │ │ │ ← stdout │ │ └─────────────┘ ← stderr └─────────────┘ (日志)客户端启动服务器进程后,通过stdin发送 JSON-RPC 请求,从stdout读取响应,stderr用于接收服务器日志。
这种方式天然适合本地工具:npx启动的官方 MCP 服务器(server-filesystem、server-memory等)、本地 Python/Go 脚本封装的工具,都走 Stdio 通信。不需要任何网络配置,进程级别天然隔离。
代价是:不支持跨网络,并发模型是同步单线程,进程的启动和销毁需要自己管理。
二、完整对接流程
Stdio 模式的通信流程与协议层完全一致(握手三步不变),差异只在传输细节上:
客户端 服务器进程 │ │ │──── 1. ProcessBuilder.start() ────────> │(启动进程) │──── 2. 获取 stdin/stdout/stderr ──────> │(建立 I/O 流) │──── 3. 启动 stderr 监听线程 ──────────> │(必须,防止缓冲区死锁) │ │ │──── 4. 写入 initialize 到 stdin ──────> │ │<─── 5. 从 stdout 读取响应 ───────────── │ │──── 6. 写入 initialized 通知到 stdin ──> │ ← 握手完成 │ │ │──── 7. 正常工具调用 ───────────────────> │ │<─── 8. 读取工具结果 ──────────────────── │ │ │ │──── 9. 关闭 stdin → 等待退出 ──────────> │(优雅关闭)三、逐步实现
第一步:启动进程
List<String>command=newArrayList<>();// npx 启动官方 MCP 服务器command.add("npx");command.add("-y");command.add("@modelcontextprotocol/server-memory");// 或者 uv 启动 Python MCP 服务器// command.add("uv");// command.add("--directory"); command.add("/path/to/mcp");// command.add("run"); command.add("mcp-server");ProcessBuilderpb=newProcessBuilder(command);// 注入环境变量(按需)pb.environment().put("NODE_ENV","production");pb.environment().put("DEBUG","false");ProcessserverProcess=pb.start();命令列表中每个元素是独立的参数,不要把多个参数合并成一个字符串传入,否则会被当成单个带空格的命令名执行失败。
第二步:建立 I/O 流
// 明确指定 UTF-8,避免中文乱码BufferedWriterstdin=newBufferedWriter(newOutputStreamWriter(serverProcess.getOutputStream(),StandardCharsets.UTF_8));BufferedReaderstdout=newBufferedReader(newInputStreamReader(serverProcess.getInputStream(),StandardCharsets.UTF_8));BufferedReaderstderr=newBufferedReader(newInputStreamReader(serverProcess.getErrorStream(),StandardCharsets.UTF_8));名称容易让人困惑:getOutputStream()返回的是写入到服务器 stdin 的流,getInputStream()返回的是从服务器 stdout 读取的流。
第三步:启动 stderr 监听线程(必须)
这一步很容易被忽略,但它是防止死锁的关键。
ThreadstderrThread=newThread(()->{try{Stringline;while((line=stderr.readLine())!=null){if(line.contains("ERROR")||line.contains("error")){log.error("[{}] {}",serverName,line);lastError=line;}else{log.info("[{}] {}",serverName,line);}}}catch(IOExceptione){if(connected){log.error("[{}] stderr 读取异常:{}",serverName,e.getMessage());}}});stderrThread.setDaemon(true);// 守护线程,主线程退出时自动结束stderrThread.setName("mcp-stderr-"+serverName);stderrThread.start();为什么必须用独立线程?操作系统为进程间通信分配了有限的缓冲区。如果不持续读取stderr,缓冲区满后服务器进程会阻塞在写日志上,无法继续处理请求,最终导致整个通信死锁。
第四步:发送请求和接收响应
Stdio 的请求响应是严格同步的:写一条,读一条。
publicsynchronizedMcpResponsesendRequest(Stringmethod,Objectparams)throwsException{if(!connected){thrownewIllegalStateException("未建立连接");}// 构建 JSON-RPC 请求McpRequestrequest=newMcpRequest();request.id=nextRequestId();request.method=method;request.params=params;StringrequestJson=mapper.writeValueAsString(request);log.debug("[{}] → {}",serverName,requestJson);// 写入 stdin,每条消息以换行符结尾stdin.write(requestJson+"\n");stdin.flush();// 必须 flush,否则数据留在缓冲区不会发送// 从 stdout 读取响应(阻塞直到有数据)StringresponseJson=stdout.readLine();if(responseJson==null){thrownewIOException("服务器已关闭连接");}log.debug("[{}] ← {}",serverName,responseJson);McpResponseresponse=mapper.readValue(responseJson,McpResponse.class);if(response.error!=null){thrownewMcpException(response.error.code,response.error.message,response.error.data);}returnresponse;}synchronized是必要的,原因是 Stdio 通信是严格的一问一答:必须等上一个请求的响应读取完毕,才能发下一个请求,否则多线程并发写入会导致 JSON 数据交错,readLine()读到的响应也会对不上请求。
第五步:发送通知
通知没有id,也不等待响应(initialized握手通知就是这种):
protectedvoidsendNotification(Stringmethod,Objectparams)throwsException{// 用 Map 手动构建,确保序列化结果中没有 id 字段Map<String,Object>notification=newHashMap<>();notification.put("jsonrpc","2.0");notification.put("method",method);notification.put("params",params);Stringjson=mapper.writeValueAsString(notification);stdin.write(json+"\n");stdin.flush();// 不读取响应}第六步:优雅关闭
关闭顺序很重要,错误的顺序可能导致进程无法退出:
publicvoidclose(){connected=false;try{// 1. 先关闭 stdin:服务器收到 EOF 后会知道客户端断开,开始清理if(stdin!=null)stdin.close();if(stdout!=null)stdout.close();if(stderr!=null)stderr.close();// 2. 等待进程正常退出(给服务器时间做清理)if(serverProcess!=null){booleanexited=serverProcess.waitFor(5,TimeUnit.SECONDS);if(!exited){log.warn("[{}] 进程未正常退出,发送 SIGTERM",serverName);serverProcess.destroy();if(!serverProcess.waitFor(2,TimeUnit.SECONDS)){log.warn("[{}] 进程仍未退出,发送 SIGKILL",serverName);serverProcess.destroyForcibly();}}log.info("[{}] 进程退出码:{}",serverName,serverProcess.exitValue());}}catch(Exceptione){log.error("[{}] 关闭连接异常:{}",serverName,e.getMessage());}}destroy()发送 SIGTERM(允许进程做清理),destroyForcibly()发送 SIGKILL(强制终止)。先发 SIGTERM,超时后再 SIGKILL,是标准的优雅关闭流程。
四、三个容易踩坑的地方
坑一:不读 stderr 导致死锁
症状:请求发出后程序永久阻塞,无响应,CPU 占用低。
原因:服务器输出了大量日志到 stderr,缓冲区满后服务器阻塞,无法处理请求,stdout.readLine()永远等不到响应。
解决:stderr必须在单独线程中持续读取,且要在发送任何请求之前启动该线程。
坑二:忘记 flush 导致请求不发送
症状:程序卡在stdout.readLine(),服务器没有收到任何请求。
原因:BufferedWriter有内部缓冲区,写入的数据未必立即发送到操作系统管道。不调用flush()的话,数据可能在缓冲区里待很久才发出,甚至等到缓冲区满了才发。
解决:每次stdin.write(...)之后必须紧跟stdin.flush()。
坑三:进程无法退出变成僵尸进程
症状:serverProcess.waitFor()永久阻塞,进程列表中能看到该进程还在运行。
原因:服务器进程在等待 stdin 输入(因为 stdin 未关闭),或者服务器启动了子进程而父进程退出后子进程没有一起退出。
解决:关闭时先stdin.close()让服务器感知到 EOF,再waitFor(超时),超时后destroyForcibly()。
五、超时处理
stdout.readLine()默认无限阻塞,生产环境需要加超时:
// 方案一:用 CompletableFuture 加超时publicMcpResponsesendRequestWithTimeout(Stringmethod,Objectparams,longtimeoutMs)throwsException{CompletableFuture<McpResponse>future=CompletableFuture.supplyAsync(()->{try{returnsendRequest(method,params);}catch(Exceptione){thrownewCompletionException(e);}});returnfuture.get(timeoutMs,TimeUnit.MILLISECONDS);}// 方案二:轮询检查(适合简单场景)publicStringreadLineWithTimeout(longtimeoutMs)throwsIOException,TimeoutException{longdeadline=System.currentTimeMillis()+timeoutMs;while(System.currentTimeMillis()<deadline){if(stdout.ready()){returnstdout.readLine();}Thread.sleep(10);}thrownewTimeoutException("读取响应超时");}CompletableFuture方案更干净,但要注意超时后线程仍可能阻塞在readLine()——此时需要关闭流来中断阻塞,或者接受线程泄漏(在连接关闭时才会释放)。
六、与 SSE、HTTP 的横向对比
| 维度 | Stdio | SSE | HTTP |
|---|---|---|---|
| 适用场景 | 本地工具、npx 服务器 | 远程服务、需要推送 | 简单 HTTP 接口 |
| 跨网络 | ❌ | ✅ | ✅ |
| 并发模型 | 同步单线程 | 异步多路复用 | 同步(连接池) |
| 进程管理 | 需要自己管理 | 无需管理 | 无需管理 |
| 实现复杂度 | 低(线性读写) | 高(异步匹配) | 低(请求响应) |
| 启动延迟 | 高(启动进程) | 中(建立连接) | 低 |
| 调试难度 | 中 | 高 | 低 |
核心代码复杂度的差异体现在请求/响应的匹配方式上:
// Stdio:同步,写完直接读,一一对应stdin.write(request+"\n");stdin.flush();Stringresponse=stdout.readLine();// SSE:异步,用 CompletableFuture 按 id 匹配pendingResponses.put(requestId,newCompletableFuture<>());sendHttpPost(request);returnpendingResponses.get(requestId).get(30,TimeUnit.SECONDS);// HTTP:同步,但响应可能是 SSE 格式需额外解析ResponsehttpResp=httpClient.newCall(request).execute();Stringjson=parseSseFormatIfNeeded(httpResp.body().string());Stdio 的同步模型反而是最好理解和调试的,代价是无法并发。对于本地工具调用,这个代价通常完全可以接受。
七、总结
Stdio 对接的七个关键点:
- 命令参数逐个拆分,不要合并成带空格的字符串
- 明确指定 UTF-8 编码,避免中文乱码
- 必须在发请求前启动 stderr 监听线程,防止缓冲区死锁
- 每次写入后立即 flush,确保数据发送
- 用 synchronized 串行化请求,保证请求响应一一对应
- 关闭顺序:先关 stdin → waitFor → destroy → destroyForcibly
- 生产环境加超时,避免 readLine() 无限阻塞
在 j-langchain 中,这些细节全部封装在McpServerConnection内部。通过McpConnectionFactory.createConnection("name", config)创建连接,connect()完成启动和握手,后续直接调用listTools()和callTool()即可,不需要手动处理任何管道细节。
📎 相关资源
- MCP 官方规范:https://modelcontextprotocol.io
- JSON-RPC 2.0 规范:https://www.jsonrpc.org/specification
- j-langchain GitHub:https://github.com/flower-trees/j-langchain
- j-langchain Gitee 镜像:https://gitee.com/flower-trees-z/j-langchain