作者:逆境不可逃
技术永无止境
希望我的内容可以帮助到你!!!!
大家吼 ! 我是 逆境不可逃 今天给大家带来文章《【与我学 ClaudeCode】协作篇 之 Worktree + Task Isolation :目录隔离的并行执行通道》.
Learn-Claude-Code 官方地址 :shareAI-lab/learn-claude-code: Bash is all you need - A nano claude code–like 「agent harness」, built from 0 to 1
上一篇文章:
【与我学 ClaudeCode】协作篇 之 Autonomous Agents :自组织任务认领与空闲治理-CSDN博客
Worktree + Task Isolation 是迭代的第 12 个版本(s12),核心解决多 Agent 并行修改同一文件导致的冲突问题。它在 s11 自组织任务认领的基础上,引入了 Git Worktree 实现目录级隔离,构建了「共享任务板 + 隔离执行通道」的双层架构,让每个任务都在独立目录中执行,彻底避免编辑冲突和 Git 状态污染。
学习路线:s01 > s02 > s03 >s04> s05 > s06| s07>s08 > s09 > s10 > s11 >s12
一、问题根源:为什么共享目录撑不起并行任务?
到 s11 版本,Agent 已经能自主认领和完成任务,但所有任务共享一个工作目录:
- 两个 Agent 同时重构不同模块,A 修改
config.py,B 也修改config.py,未提交的改动互相污染 - 任务回滚困难,修改混在一起,无法干净撤销单个任务的变更
- 并行执行的 Git 状态混乱,无法区分不同任务的提交和变更
真正的并行任务需要:
- 每个任务有独立的执行目录,修改互不干扰
- 任务与目录强绑定,状态可追踪、可恢复
- 全局可见性与局部隔离性兼顾,既知道谁在做什么,又避免冲突
二、六大核心设计决策
Worktree + Task Isolation 通过六个关键设计,构建了一个简单、可靠、可恢复的并行任务执行系统。
1. 共享任务板 + 隔离执行通道
核心设计:任务板继续集中在.tasks/,而文件改动发生在按任务划分的 worktree 目录中。这样既保留了全局可见性(谁在做什么、完成到哪),又避免所有人同时写同一目录导致的冲突。协调层简单(一个任务板),执行层安全(多条隔离通道)。
替代方案的致命缺陷:
单个共享工作区实现更简单,但会导致编辑冲突和混乱的 Git 状态;每个任务完全独立的存储目录可以避免冲突,但会失去团队级可见性,让规划变得更困难。
2. 显式 worktree 生命周期索引
核心设计:.worktrees/index.json记录每个 worktree 的名称、路径、分支、task_id与状态。即使上下文压缩或进程重启,这些生命周期状态仍可检查和恢复。它也为list/status/remove提供了确定性的本地数据源。
替代方案的致命缺陷:
仅依赖
git worktree list可以维护本地记录,但会丢失任务绑定元数据和自定义生命周期状态;仅在内存中保存所有状态代码更简单,但会破坏可恢复性。
3. 按通道cwd路由 + 禁止重入
核心设计:命令通过worktree_run(name, command)使用cwd参数路由到 worktree 目录。重入保护避免了在已激活的 worktree 上下文中意外二次进入,保持生命周期归属清晰。
替代方案的致命缺陷:
全局
cwd变量修改容易实现,但会在并行工作中泄漏上下文;允许静默重入会让生命周期归属变得模糊,并使清理行为复杂化。
4. 追加式生命周期事件流
核心设计:生命周期事件写入.worktrees/events.jsonl(如worktree.create.*、worktree.remove.*、task.completed)。这样状态迁移可查询、可追踪,失败也会以*.failed显式暴露,而不是静默丢失。
替代方案的致命缺陷:
仅依赖控制台日志更轻量,但在长时间会话中很脆弱,且难以审计;完整的事件总线基础设施功能强大,但对于这个教学基线来说过于笨重。
5. 任务与工作区一起收尾
核心设计:worktree_remove(..., complete_task=True)允许在一个动作里完成收尾:删除隔离目录并把绑定任务标记为completed。收尾保持为显式工具驱动迁移(worktree_keep/worktree_remove),而不是隐藏的自动清理。这样可减少状态悬挂(任务已完成但临时工作区仍活跃,或反过来)。
替代方案的致命缺陷:
完全手动收尾提供了灵活性,但会增加操作偏差;每次完成时自动删除工作区,存在在最终审查前误删工作区的风险。
6. 事件流是观测旁路,不是状态机替身
核心设计:生命周期事件提升可审计性,但真实状态源仍是任务 / 工作区状态文件。事件更适合做迁移轨迹,而不是替代主状态机。
替代方案的致命缺陷:
仅使用日志会隐藏结构化的状态迁移;仅使用事件作为状态源,在重放 / 修复语义未定义时容易出现状态漂移。
三、系统整体架构与工作原理
1. 双层架构:控制平面 + 执行平面
Control plane (.tasks/) Execution plane (.worktrees/) +------------------+ +------------------------+ | task_1.json | | auth-refactor/ | | status: in_progress <------> branch: wt/auth-refactor | worktree: "auth-refactor" | task_id: 1 | +------------------+ +------------------------+ | task_2.json | | ui-login/ | | status: pending <------> branch: wt/ui-login | worktree: "ui-login" | task_id: 2 | +------------------+ +------------------------+ | index.json (worktree registry) events.jsonl (lifecycle log)- 控制平面:
.tasks/目录下的 JSON 文件,记录任务的目标、状态、owner 和绑定的 worktree 名称 - 执行平面:
.worktrees/目录下的独立 Git 工作区,每个 worktree 对应一个任务,拥有独立的分支和目录
2. 关键组件与实现细节
(1) TaskManager:共享任务板
class TaskManager: def __init__(self, tasks_dir: Path): self.dir = tasks_dir self.dir.mkdir(parents=True, exist_ok=True) self._next_id = self._max_id() + 1 def create(self, subject: str, description: str = "") -> str: """创建新任务,状态为 pending""" task = { "id": self._next_id, "subject": subject, "description": description, "status": "pending", "owner": "", "worktree": "", "blockedBy": [], "created_at": time.time(), "updated_at": time.time(), } self._save(task) self._next_id += 1 return json.dumps(task, indent=2) def bind_worktree(self, task_id: int, worktree: str, owner: str = "") -> str: """绑定任务到 worktree,状态推进为 in_progress""" task = self._load(task_id) task["worktree"] = worktree if owner: task["owner"] = owner if task["status"] == "pending": task["status"] = "in_progress" task["updated_at"] = time.time() self._save(task) return json.dumps(task, indent=2)(2) WorktreeManager:隔离执行通道管理
class WorktreeManager: def __init__(self, repo_root: Path, tasks: TaskManager, events: EventBus): self.repo_root = repo_root self.tasks = tasks self.events = events self.dir = repo_root / ".worktrees" self.dir.mkdir(parents=True, exist_ok=True) self.index_path = self.dir / "index.json" if not self.index_path.exists(): self.index_path.write_text(json.dumps({"worktrees": []}, indent=2)) self.git_available = self._is_git_repo() def create(self, name: str, task_id: int = None, base_ref: str = "HEAD") -> str: """创建 worktree 并绑定任务""" self._validate_name(name) if self._find(name): raise ValueError(f"Worktree '{name}' already exists in index") if task_id is not None and not self.tasks.exists(task_id): raise ValueError(f"Task {task_id} not found") path = self.dir / name branch = f"wt/{name}" # 发出创建前事件 self.events.emit("worktree.create.before", task={"id": task_id}, worktree={"name": name, "base_ref": base_ref}) try: # 创建 Git worktree self._run_git(["worktree", "add", "-b", branch, str(path), base_ref]) # 更新索引 entry = { "name": name, "path": str(path), "branch": branch, "task_id": task_id, "status": "active", "created_at": time.time(), } idx = self._load_index() idx["worktrees"].append(entry) self._save_index(idx) # 绑定任务 if task_id is not None: self.tasks.bind_worktree(task_id, name) # 发出创建后事件 self.events.emit("worktree.create.after", task={"id": task_id}, worktree=entry) return json.dumps(entry, indent=2) except Exception as e: self.events.emit("worktree.create.failed", task={"id": task_id}, worktree={"name": name, "base_ref": base_ref}, error=str(e)) raise def run(self, name: str, command: str) -> str: """在指定 worktree 中执行命令,自动路由 cwd""" wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" path = Path(wt["path"]) if not path.exists(): return f"Error: Worktree path missing: {path}" try: # 命令在 worktree 目录中执行,不影响其他任务 r = subprocess.run( command, shell=True, cwd=path, capture_output=True, text=True, timeout=300, ) out = (r.stdout + r.stderr).strip() return out[:50000] if out else "(no output)" except subprocess.TimeoutExpired: return "Error: Timeout (300s)" def remove(self, name: str, force: bool = False, complete_task: bool = False) -> str: """删除 worktree,可选择同时完成绑定任务""" wt = self._find(name) if not wt: return f"Error: Unknown worktree '{name}'" self.events.emit("worktree.remove.before", task={"id": wt.get("task_id")}, worktree={"name": name, "path": wt.get("path")}) try: # 删除 Git worktree args = ["worktree", "remove"] if force: args.append("--force") args.append(wt["path"]) self._run_git(args) # 完成绑定任务 if complete_task and wt.get("task_id") is not None: task_id = wt["task_id"] before = json.loads(self.tasks.get(task_id)) self.tasks.update(task_id, status="completed") self.tasks.unbind_worktree(task_id) self.events.emit("task.completed", task={"id": task_id, "subject": before.get("subject", ""), "status": "completed"}, worktree={"name": name}) # 更新索引状态 idx = self._load_index() for item in idx.get("worktrees", []): if item.get("name") == name: item["status"] = "removed" item["removed_at"] = time.time() self._save_index(idx) self.events.emit("worktree.remove.after", task={"id": wt.get("task_id")}, worktree={"name": name, "path": wt.get("path"), "status": "removed"}) return f"Removed worktree '{name}'" except Exception as e: self.events.emit("worktree.remove.failed", task={"id": wt.get("task_id")}, worktree={"name": name, "path": wt.get("path")}, error=str(e)) raise(3) EventBus:生命周期事件流
class EventBus: def __init__(self, event_log_path: Path): self.path = event_log_path self.path.parent.mkdir(parents=True, exist_ok=True) if not self.path.exists(): self.path.write_text("") def emit(self, event: str, task: dict | None = None, worktree: dict | None = None, error: str | None = None): """追加写入事件流,记录状态迁移""" payload = { "event": event, "ts": time.time(), "task": task or {}, "worktree": worktree or {}, } if error: payload["error"] = error with self.path.open("a", encoding="utf-8") as f: f.write(json.dumps(payload) + "\n") def list_recent(self, limit: int = 20) -> str: """读取最近的事件,用于审计和调试""" n = max(1, min(int(limit or 20), 200)) lines = self.path.read_text(encoding="utf-8").splitlines() recent = lines[-n:] items = [] for line in recent: try: items.append(json.loads(line)) except Exception: items.append({"event": "parse_error", "raw": line}) return json.dumps(items, indent=2)(4) 新增工具集
TOOL_HANDLERS = { # 基础工具 "bash": lambda **kw: run_bash(kw["command"]), "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")), "write_file": lambda **kw: run_write(kw["path"], kw["content"]), "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]), # 任务管理工具 "task_create": lambda **kw: TASKS.create(kw["subject"], kw.get("description", "")), "task_list": lambda **kw: TASKS.list_all(), "task_get": lambda **kw: TASKS.get(kw["task_id"]), "task_update": lambda **kw: TASKS.update(kw["task_id"], kw.get("status"), kw.get("owner")), "task_bind_worktree": lambda **kw: TASKS.bind_worktree(kw["task_id"], kw["worktree"], kw.get("owner", "")), # Worktree 管理工具 "worktree_create": lambda **kw: WORKTREES.create(kw["name"], kw.get("task_id"), kw.get("base_ref", "HEAD")), "worktree_list": lambda **kw: WORKTREES.list_all(), "worktree_status": lambda **kw: WORKTREES.status(kw["name"]), "worktree_run": lambda **kw: WORKTREES.run(kw["name"], kw["command"]), "worktree_keep": lambda **kw: WORKTREES.keep(kw["name"]), "worktree_remove": lambda **kw: WORKTREES.remove(kw["name"], kw.get("force", False), kw.get("complete_task", False)), "worktree_events": lambda **kw: EVENTS.list_recent(kw.get("limit", 20)), }(5) 执行流程
四、与 Autonomous Agents(s11)的关键变更对比
| 组件 | 之前(s11 Autonomous Agents) | 之后(s12 Worktree + Task Isolation) |
|---|---|---|
| 协调机制 | 任务板(owner/status) | 任务板 + worktree 显式绑定 |
| 执行范围 | 共享目录 | 每个任务独立目录(Git Worktree) |
| 可恢复性 | 仅任务状态 | 任务状态 + worktree 索引 + 事件流 |
| 收尾流程 | 任务完成(隐式) | 任务完成 + 显式keep/remove操作 |
| 生命周期可见性 | 隐式日志 | .worktrees/events.jsonl显式事件流 |
| 冲突防护 | 无(共享目录易冲突) | 目录级隔离,修改互不干扰 |
五、核心优势与创新点
- 目录级隔离,彻底避免冲突:每个任务在独立的 Git Worktree 中执行,修改互不干扰,解决了并行任务的文件编辑冲突问题
- 任务与目录强绑定,状态可追踪:通过
task_id关联任务和 worktree,任务状态与目录状态同步,崩溃后可通过.tasks/和.worktrees/index.json重建现场 - 可恢复的生命周期管理:worktree 索引和事件流记录了完整的状态迁移过程,进程重启后可恢复所有任务和 worktree 状态
- 显式收尾流程,避免状态悬挂:
worktree_remove可同时删除目录并完成任务,worktree_keep可保留目录供后续使用,避免任务完成但目录未清理或反之的状态不一致问题 - 可审计的事件流:追加式事件流记录了所有 worktree 和任务的生命周期事件,便于调试和审计失败场景
六、运行示例:并行任务隔离执行流程
- 创建任务:领导调用
task_create("Implement auth refactor"),创建任务#1,状态为pending - 创建 worktree 并绑定:调用
worktree_create("auth-refactor", task_id=1),创建 Git Worktree 目录,任务状态推进为in_progress,worktree 索引记录状态为active - 在隔离目录中执行命令:调用
worktree_run("auth-refactor", "python auth.py"),命令在.worktrees/auth-refactor目录中执行,不影响其他任务 - 并行执行其他任务:同时创建任务
#2(UI 登录优化),绑定 worktreeui-login,两个任务在各自目录中并行执行,修改互不干扰 - 任务收尾:任务
#1完成后,调用worktree_remove("auth-refactor", complete_task=True),删除 worktree 目录,任务状态更新为completed,同时发出task.completed和worktree.remove.after事件
七、可扩展方向
- Worktree 权限控制:为不同角色的 Agent 分配不同的 worktree 访问权限,实现更细粒度的安全控制
- Worktree 分支管理:支持为 worktree 指定不同的 Git 分支,实现基于分支的任务隔离
- 任务依赖与 worktree 复用:支持任务完成后保留 worktree,供后续依赖任务复用,减少重复创建开销
- 事件流告警机制:为特定事件(如
worktree.create.failed)添加告警,实时通知系统异常 - 多仓库支持:扩展 worktree 管理,支持多个 Git 仓库的任务隔离执行
学习路线:s01 > s02 > s03 >s04> s05 > s06| s07>s08>s09 >s10>s11> s12