Files
common-skills/scripts/orchestrate.py
T
Team c0d14c6ac1 chore: restructure skills repo with new agents and skill bundles
- Add new skills: deep-dive, docs-rag, meta-creator, ppt-maker, sdlc
- Add agent configs: g-assistent, meta-creator, sdlc with prompt files
- Add reference docs for custom agents and skills specification
- Add utility scripts: install-agents.sh, orchestrate.py, puml2svg.sh
- Update README and commit-message skill config
- Remove deprecated skills: codereview, python, testing, typescript
- Add .gitignore
2026-04-18 13:07:46 +08:00

394 lines
15 KiB
Python

#!/usr/bin/env python3
"""
orchestrate.py — Agent & Skill Orchestrator for common-skills
Dynamically discovers agents (.kiro/agents/*.json) and skills (skills/*/SKILL.md),
then routes user requests through the appropriate agent+skill pipeline with full
observability: structured logging, timing, token estimation, and trace output.
Usage:
python scripts/orchestrate.py "review my code for bugs"
python scripts/orchestrate.py "write a commit message" --agent commit-message
python scripts/orchestrate.py --list
python scripts/orchestrate.py --list-skills
python scripts/orchestrate.py "..." --dry-run
python scripts/orchestrate.py "..." --trace
python scripts/orchestrate.py "..." --log-file run.jsonl
"""
import argparse
import json
import re
import subprocess
import sys
import time
import uuid
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# ─── Paths ────────────────────────────────────────────────────────────────────
# Repository root: two directory levels above this script file.
REPO_ROOT = Path(__file__).parent.parent
# Directory scanned for agent configuration files (*.json).
AGENTS_DIR = REPO_ROOT.joinpath(".kiro", "agents")
# Directory scanned for skill folders, each holding a SKILL.md.
SKILLS_DIR = REPO_ROOT.joinpath("skills")
# ─── Data models ──────────────────────────────────────────────────────────────
@dataclass
class SkillMeta:
    """Metadata describing one skill discovered on disk."""

    # Skill name (taken from the frontmatter, or the directory name).
    name: str
    # Free-text description from the frontmatter; may be empty.
    description: str
    # Location of the skill's SKILL.md file.
    path: Path
    # Raw key/value pairs parsed from the SKILL.md frontmatter.
    frontmatter: dict = field(default_factory=dict)

    def summary(self) -> str:
        """Return a one-line ``[skill:<name>] <description>`` label."""
        return "[skill:{}] {}".format(self.name, self.description)
@dataclass
class AgentMeta:
    """Metadata describing one agent loaded from a JSON config file."""

    # Agent name as declared in the config.
    name: str
    # Free-text description; may be empty.
    description: str
    # Location of the agent's JSON config file.
    path: Path
    # Prompt text declared in the config; may be empty.
    prompt: str
    # Tool names the agent declares.
    tools: list[str]
    # Resource references (e.g. skill file URIs) the agent declares.
    resources: list[str]
    # The complete parsed JSON document, kept for reference.
    raw: dict = field(default_factory=dict)

    def summary(self) -> str:
        """Return a one-line ``[agent:<name>] <description>`` label."""
        return "[agent:{}] {}".format(self.name, self.description)
@dataclass
class TraceEvent:
    """A single structured record in the execution trace."""

    trace_id: str
    timestamp: str
    event: str  # e.g. "route", "invoke", "grade", "error"
    agent: Optional[str] = None
    skill: Optional[str] = None
    detail: Optional[dict] = None

    def to_json(self) -> str:
        """Serialise this event to a single-line JSON string.

        Non-ASCII characters are written verbatim rather than escaped.
        """
        payload = asdict(self)
        return json.dumps(payload, ensure_ascii=False)
# ─── Registry ─────────────────────────────────────────────────────────────────
class Registry:
    """Dynamically discovers all agents and skills from disk.

    Agents are read from ``AGENTS_DIR/*.json`` and keyed by their declared
    ``name``. Skills are read from ``SKILLS_DIR/<dir>/SKILL.md`` and keyed by
    the directory name. Files that cannot be loaded are reported on stderr
    and skipped, so one broken file does not prevent discovery of the rest.
    """

    def __init__(self):
        self.agents: dict[str, AgentMeta] = {}
        self.skills: dict[str, SkillMeta] = {}
        self._load_agents()
        self._load_skills()

    def _load_agents(self):
        """Populate ``self.agents`` from every JSON file in ``AGENTS_DIR``."""
        if not AGENTS_DIR.exists():
            return
        for f in sorted(AGENTS_DIR.glob("*.json")):
            try:
                # Decode explicitly: the platform default encoding is not
                # UTF-8 everywhere, and "utf-8-sig" also tolerates a BOM.
                raw = json.loads(f.read_text(encoding="utf-8-sig"))
                self.agents[raw["name"]] = AgentMeta(
                    name=raw["name"],
                    description=raw.get("description", ""),
                    path=f,
                    prompt=raw.get("prompt", ""),
                    tools=raw.get("tools", raw.get("allowedTools", [])),
                    resources=raw.get("resources", []),
                    raw=raw,
                )
            except Exception as e:
                # Deliberately broad: discovery is best-effort, warn and go on.
                print(f"⚠️ Could not load agent {f.name}: {e}", file=sys.stderr)

    def _load_skills(self):
        """Populate ``self.skills`` from every ``SKILL.md`` under ``SKILLS_DIR``."""
        if not SKILLS_DIR.exists():
            return
        for skill_dir in sorted(SKILLS_DIR.iterdir()):
            skill_md = skill_dir / "SKILL.md"
            if not skill_md.exists():
                continue
            try:
                content = skill_md.read_text(encoding="utf-8-sig")
                fm = _parse_frontmatter(content)
                name = fm.get("name", skill_dir.name)
                # Keyed by directory name (not the frontmatter name) so that
                # lookups by path component resolve.
                self.skills[skill_dir.name] = SkillMeta(
                    name=name,
                    description=fm.get("description", ""),
                    path=skill_md,
                    frontmatter=fm,
                )
            except Exception as e:
                # Deliberately broad: discovery is best-effort, warn and go on.
                print(f"⚠️ Could not load skill {skill_dir.name}: {e}", file=sys.stderr)

    def list_agents(self):
        """Print a summary line plus tools and resources for each agent."""
        for a in self.agents.values():
            print(f" {a.summary()}")
            print(f" tools : {', '.join(a.tools)}")
            print(f" resources : {', '.join(a.resources)}")

    def list_skills(self):
        """Print a summary line plus repo-relative path for each skill."""
        for s in self.skills.values():
            print(f" {s.summary()}")
            print(f" path : {s.path.relative_to(REPO_ROOT)}")
def _parse_frontmatter(content: str) -> dict:
"""Extract YAML-like frontmatter between --- delimiters."""
m = re.match(r"^---\s*\n(.*?)\n---", content, re.DOTALL)
if not m:
return {}
result = {}
for line in m.group(1).splitlines():
if ":" in line:
k, _, v = line.partition(":")
result[k.strip()] = v.strip()
return result
# ─── Router ───────────────────────────────────────────────────────────────────
class Router:
    """
    Routes a user prompt to the best agent.

    Strategy (in order):
    1. Explicit --agent flag → use that agent directly
    2. Keyword match against KEYWORD_HINTS; a hint is used only if its
       skill is registered or some agent references it
    3. Fallback to 'g-assistent' (general assistant) or 'main' if present
    4. Fallback to first available agent
    """

    # Simple keyword → skill hints extracted from g-assistent routing rules.
    # Keywords are matched as lowercase substrings of the prompt, in order.
    # A hinted skill may not exist on disk; route() skips such hints.
    KEYWORD_HINTS: list[tuple[list[str], str]] = [
        (["commit", "git commit", "提交"], "commit-message"),
        (["review", "bug", "anti-pattern", "代码审查"], "codereview"),
        (["python", "py "], "python"),
        (["typescript", "ts ", ".ts"], "typescript"),
        (["test", "测试", "unit test"], "testing"),
        (["doc", "search doc", "文档"], "docs-rag"),
        (["deep dive", "分析", "解释", "how does", "understand"], "deep-dive"),
        (["build", "design", "sdlc", "需求", "系统设计", "任务分解"], "sdlc"),
    ]

    def __init__(self, registry: Registry):
        self.registry = registry

    def route(self, prompt: str, agent_override: Optional[str] = None) -> tuple[AgentMeta, Optional[SkillMeta]]:
        """Return (agent, skill_hint) for the given prompt.

        Args:
            prompt: The user's request text.
            agent_override: Name of an agent to use unconditionally.

        Returns:
            The chosen agent and the associated skill, or ``None`` for the
            skill when none could be determined.

        Raises:
            ValueError: ``agent_override`` names an unknown agent.
            RuntimeError: No agents are registered at all.
        """
        # 1. Explicit override
        if agent_override:
            agent = self.registry.agents.get(agent_override)
            if not agent:
                raise ValueError(f"Agent '{agent_override}' not found. Available: {list(self.registry.agents)}")
            skill = self._skill_for_agent(agent)
            return agent, skill

        # 2. Keyword routing
        prompt_lower = prompt.lower()
        for keywords, skill_name in self.KEYWORD_HINTS:
            if not any(kw in prompt_lower for kw in keywords):
                continue
            agent = self._agent_for_skill(skill_name)
            skill = self.registry.skills.get(skill_name)
            if agent is None and skill is None:
                # Stale hint: nothing on disk backs this skill. Keep looking
                # so a later hint that does resolve can still win.
                continue
            return (agent if agent is not None else self._default_agent()), skill

        # 3. Default agent
        return self._default_agent(), None

    def _agent_for_skill(self, skill_name: str) -> Optional[AgentMeta]:
        """Return the first agent with a resource string containing ``skill_name``."""
        for agent in self.registry.agents.values():
            if any(skill_name in r for r in agent.resources):
                return agent
        return None

    def _skill_for_agent(self, agent: AgentMeta) -> Optional[SkillMeta]:
        """Return the first registered skill named in the agent's resources."""
        for resource in agent.resources:
            # e.g. "file://.kiro/skills/commit-message/SKILL.md"
            m = re.search(r"skills/([^/\"*]+)/", resource)
            if m:
                skill_name = m.group(1)
                if skill_name in self.registry.skills:
                    return self.registry.skills[skill_name]
        return None

    def _default_agent(self) -> AgentMeta:
        """Return the fallback agent, raising RuntimeError if none exist."""
        for name in ("g-assistent", "main"):
            if name in self.registry.agents:
                return self.registry.agents[name]
        if self.registry.agents:
            return next(iter(self.registry.agents.values()))
        raise RuntimeError("No agents found in .kiro/agents/")
# ─── Executor ─────────────────────────────────────────────────────────────────
class Executor:
    """Invokes kiro-cli and captures structured output.

    Every step is recorded as a TraceEvent. Events are kept in memory,
    optionally echoed to stderr as they happen (``trace=True``), and
    optionally appended as JSON lines to ``log_file``.
    """

    # Horizontal rule framing the summary header.
    _RULE = "─" * 60

    def __init__(self, trace_id: str, trace: bool = False, log_file: Optional[Path] = None):
        self.trace_id = trace_id
        self.trace = trace
        self.log_file = log_file
        self._events: list[TraceEvent] = []

    def emit(self, event: str, agent: Optional[str] = None, skill: Optional[str] = None,
             detail: Optional[dict] = None):
        """Record one trace event and forward it to the enabled sinks."""
        e = TraceEvent(
            trace_id=self.trace_id,
            timestamp=datetime.now(timezone.utc).isoformat(),
            event=event,
            agent=agent,
            skill=skill,
            detail=detail or {},
        )
        self._events.append(e)
        if self.trace:
            print(f" TRACE {e.to_json()}", file=sys.stderr)
        if self.log_file:
            # Events are serialised with non-ASCII text intact, so the file
            # must be UTF-8 regardless of the platform default encoding.
            with open(self.log_file, "a", encoding="utf-8") as f:
                f.write(e.to_json() + "\n")

    def run(self, prompt: str, agent: AgentMeta, skill: Optional[SkillMeta], dry_run: bool = False) -> dict:
        """Run ``prompt`` through ``agent`` via kiro-cli and return a result dict.

        Args:
            prompt: The user's request, passed to kiro-cli as one argument.
            agent: The agent to invoke.
            skill: The routed skill, recorded in the trace only.
            dry_run: When true, record the routing decision and return
                without invoking kiro-cli.

        Returns:
            A dict with trace_id, agent, skill, elapsed_s, token_estimate
            and exit_code; a real run also carries prompt, response and
            stderr, a dry run carries ``dry_run: True`` instead.

        Raises:
            subprocess.TimeoutExpired: kiro-cli ran longer than 120 seconds.
            OSError: kiro-cli could not be started (e.g. not installed).
        """
        skill_name = skill.name if skill else None
        self.emit("route", agent=agent.name, skill=skill_name, detail={
            "prompt_preview": prompt[:120],
            "agent_tools": agent.tools,
            "skill_path": str(skill.path.relative_to(REPO_ROOT)) if skill else None,
        })
        if dry_run:
            self.emit("dry_run", agent=agent.name, skill=skill_name)
            return {
                "dry_run": True, "trace_id": self.trace_id,
                "agent": agent.name, "skill": skill_name,
                "elapsed_s": 0, "token_estimate": 0, "exit_code": None,
            }

        cmd = ["kiro-cli", "chat", "--agent", agent.name, "--no-interactive", prompt]
        self.emit("invoke", agent=agent.name, skill=skill_name, detail={"cmd": cmd})
        start = time.perf_counter()
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        except subprocess.TimeoutExpired:
            self.emit("error", agent=agent.name, skill=skill_name, detail={"reason": "timeout"})
            raise
        except OSError as exc:
            # Missing or non-executable binary: trace it before propagating.
            self.emit("error", agent=agent.name, skill=skill_name,
                      detail={"reason": "spawn_failed", "error": str(exc)})
            raise
        elapsed = round(time.perf_counter() - start, 3)
        response = _strip_ansi(result.stdout).strip()
        stderr = _strip_ansi(result.stderr).strip()
        # Estimate tokens (rough: 1 token ≈ 4 chars)
        token_est = len(prompt) // 4 + len(response) // 4
        self.emit("response", agent=agent.name, skill=skill_name, detail={
            "elapsed_s": elapsed,
            "exit_code": result.returncode,
            "response_chars": len(response),
            "token_estimate": token_est,
            "has_stderr": bool(stderr),
        })
        return {
            "trace_id": self.trace_id,
            "agent": agent.name,
            "skill": skill_name,
            "prompt": prompt,
            "response": response,
            "stderr": stderr,
            "elapsed_s": elapsed,
            "token_estimate": token_est,
            "exit_code": result.returncode,
        }

    def print_summary(self, result: dict):
        """Human-readable execution summary."""
        print("\n" + self._RULE)
        print(f" trace_id : {result.get('trace_id')}")
        print(f" agent : {result.get('agent')}")
        print(f" skill : {result.get('skill') or '(none)'}")
        print(f" elapsed : {result.get('elapsed_s')}s")
        print(f" tokens~ : {result.get('token_estimate')}")
        print(f" exit : {result.get('exit_code')}")
        print(self._RULE)
        if result.get("dry_run"):
            print(" [dry-run] No invocation made.")
            return
        print("\n" + result.get("response", ""))
        if result.get("stderr"):
            print(f"\n[stderr]\n{result['stderr']}", file=sys.stderr)

    def print_trace(self):
        """Print all trace events as a timeline."""
        print("\n── Execution Trace ──────────────────────────────────────")
        for e in self._events:
            ts = e.timestamp[11:23]  # HH:MM:SS.mmm
            detail_str = json.dumps(e.detail, ensure_ascii=False) if e.detail else ""
            print(f" {ts} [{e.event:<10}] agent={e.agent or '-':20} skill={e.skill or '-':20} {detail_str}")
def _strip_ansi(text: str) -> str:
return re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", text)
# ─── CLI ──────────────────────────────────────────────────────────────────────
def main():
    """Command-line entry point: parse arguments, route, execute, report.

    Exit status: 0 on success, for the listing modes and for a dry run;
    1 when routing fails or kiro-cli cannot be run or times out;
    otherwise kiro-cli's own non-zero exit code is passed through.
    """
    parser = argparse.ArgumentParser(
        description="Orchestrate kiro agents and skills",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("prompt", nargs="?", help="User prompt to route and execute")
    parser.add_argument("--agent", help="Force a specific agent by name")
    parser.add_argument("--dry-run", action="store_true", help="Show routing decision without invoking")
    parser.add_argument("--trace", action="store_true", help="Print trace events to stderr in real time")
    parser.add_argument("--log-file", type=Path, help="Append JSONL trace events to this file")
    parser.add_argument("--list", action="store_true", help="List all discovered agents")
    parser.add_argument("--list-skills", action="store_true", help="List all discovered skills")
    args = parser.parse_args()

    registry = Registry()
    if args.list:
        print(f"\nAgents ({len(registry.agents)}) — from {AGENTS_DIR.relative_to(REPO_ROOT)}")
        registry.list_agents()
        return
    if args.list_skills:
        print(f"\nSkills ({len(registry.skills)}) — from {SKILLS_DIR.relative_to(REPO_ROOT)}")
        registry.list_skills()
        return
    if not args.prompt:
        parser.print_help()
        return

    trace_id = uuid.uuid4().hex[:12]
    router = Router(registry)
    executor = Executor(trace_id, trace=args.trace, log_file=args.log_file)
    try:
        agent, skill = router.route(args.prompt, agent_override=args.agent)
    except (ValueError, RuntimeError) as e:
        print(f"❌ Routing error: {e}", file=sys.stderr)
        sys.exit(1)

    # Show routing decision
    print(f"\n→ agent : {agent.name}")
    print(f"→ skill : {skill.name if skill else '(none — general assistant)'}")
    if skill:
        print(f"→ skill description: {skill.description}")

    # Turn the two expected launch failures into a clean message instead of
    # a traceback: the CLI exceeding its timeout, or not being runnable.
    try:
        result = executor.run(args.prompt, agent, skill, dry_run=args.dry_run)
    except subprocess.TimeoutExpired as e:
        print(f"❌ Execution error: kiro-cli timed out after {e.timeout}s", file=sys.stderr)
        sys.exit(1)
    except OSError as e:
        print(f"❌ Execution error: could not run kiro-cli: {e}", file=sys.stderr)
        sys.exit(1)

    executor.print_summary(result)
    if args.trace or args.dry_run:
        executor.print_trace()

    # Propagate failure so shell callers and CI can detect it. A dry run has
    # exit_code None; a negative code (killed by signal) is reported as 1.
    exit_code = result.get("exit_code")
    if exit_code:
        sys.exit(exit_code if exit_code > 0 else 1)
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()