Multi-Agent Orchestration
Coordinate multiple AI agents working together to accomplish complex development objectives efficiently.
# Multi-Agent Orchestration: Coordinating AI Teams for Complex Workflows
You've mastered working with a single AI assistant. You've even experimented with multiple agents handling different tasks. Now it's time to level up: orchestrating multiple AI agents working together like a well-coordinated development team.
Multi-agent orchestration isn't about throwing more AI at a problem—it's about creating systems where specialized agents collaborate, communicate, and coordinate to solve problems beyond the capability of any single agent. Think of it as building your own AI development team, where each member has specific expertise and responsibilities.
## Understanding Multi-Agent Orchestration
Multi-agent orchestration involves coordinating multiple AI agents with different roles, capabilities, and contexts. Unlike simple multi-agent workflows where agents work sequentially or in parallel without much interaction, orchestration creates intelligent collaboration patterns.
### When You Need Orchestration

Orchestration becomes valuable when:

- **Complex problem decomposition**: a task naturally splits into specialized subtasks (architecture, implementation, testing, security review)
- **Iterative refinement**: multiple perspectives improve output quality (code review, optimization suggestions, security audits)
- **Context management**: different agents need different context depths, avoiding over-reliance on bloated prompts
- **Parallel processing**: independent subtasks can run simultaneously, then merge results
- **Quality gates**: each agent validates or transforms the previous agent's outputs
### The Orchestration Patterns

Before diving into code, understand these fundamental patterns:

- **Sequential pipeline**: agents work in order, each refining the previous agent's output
- **Parallel processing**: multiple agents work simultaneously on different aspects
- **Hierarchical**: a coordinator agent delegates to specialist agents
- **Adversarial**: agents challenge each other's outputs (reviewer vs. implementer)
- **Consensus**: multiple agents vote or agree on solutions
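The sequential pipeline is the simplest to sketch. Here is a minimal, runnable illustration with trivial stand-in functions in place of real API-backed agents (the stage functions are hypothetical):

```python
# Sequential pipeline sketch: each stage is a plain callable that refines
# the previous stage's output. The stage functions here are hypothetical
# stand-ins for real API-backed agents.
def draft(spec: str) -> str:
    return f"draft({spec})"

def refine(text: str) -> str:
    return f"refine({text})"

def run_pipeline(spec, stages):
    output = spec
    for stage in stages:
        output = stage(output)
    return output

result = run_pipeline("login feature", [draft, refine])
# result == "refine(draft(login feature))"
```

The other patterns build on this shape: parallel processing fans stages out, hierarchical adds a coordinator choosing which stages to run.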
## Building Your First Orchestration System
Let's build a practical orchestration system for a common scenario: implementing a new feature with quality controls.
### The Architecture Agent

````python
import anthropic
import json
from typing import Dict, List, Any


class ArchitectAgent:
    """Designs system architecture and creates an implementation plan."""

    def __init__(self, client: anthropic.Anthropic):
        self.client = client
        self.role = "software architect"

    def design(self, feature_spec: str, codebase_context: str) -> Dict[str, Any]:
        """Create an architectural design for a feature."""
        prompt = f"""You are a software architect. Design the implementation plan for this feature.

Feature Specification:
{feature_spec}

Codebase Context:
{codebase_context}

Provide:
1. Component breakdown
2. Interface definitions
3. Data flow
4. Integration points
5. Potential risks

Format as JSON with keys: components, interfaces, dataflow, integration_points, risks"""

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4000,
            messages=[{"role": "user", "content": prompt}],
        )

        # Extract JSON from the response, whether fenced in markdown or raw
        content = response.content[0].text
        if "```json" in content:
            json_str = content.split("```json")[1].split("```")[0]
        else:
            json_str = content
        return json.loads(json_str)
````
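The markdown-fence JSON extraction at the end of `design` recurs in every agent in this chapter; one way to avoid the repetition is a small shared helper (a sketch of that refactor, not part of the original code):

````python
import json

# Shared helper for pulling JSON out of a model response, whether the
# model wrapped it in a ```json fence or returned it raw.
def extract_json(content: str):
    if "```json" in content:
        content = content.split("```json")[1].split("```")[0]
    return json.loads(content.strip())
````

Each agent's parsing block then collapses to `return extract_json(response.content[0].text)`.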
### The Implementation Agent

```python
class ImplementationAgent:
    """Implements code based on the architectural design."""

    def __init__(self, client: anthropic.Anthropic):
        self.client = client
        self.role = "implementation engineer"

    def implement(self, architecture: Dict[str, Any], component: str) -> str:
        """Implement a specific component from the architecture."""
        prompt = f"""You are an implementation engineer. Write production-ready code for this component.

Architectural Design:
{json.dumps(architecture, indent=2)}

Component to Implement: {component}

Requirements:
- Follow the interface definitions exactly
- Include error handling
- Add docstrings and type hints
- Write clean, maintainable code
- Consider edge cases

Provide only the code, no explanations."""

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=8000,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text
```
### The Security Reviewer Agent

````python
class SecurityReviewerAgent:
    """Reviews code for security vulnerabilities."""

    def __init__(self, client: anthropic.Anthropic):
        self.client = client
        self.role = "security reviewer"

    def review(self, code: str, architecture: Dict[str, Any]) -> Dict[str, Any]:
        """Perform a security review on implemented code."""
        prompt = f"""You are a security expert reviewing code. Identify security issues and provide fixes.

Code to Review:
```python
{code}
```

Architectural Context:
{json.dumps(architecture, indent=2)}

Analyze for:
- Injection vulnerabilities
- Authentication/authorization issues
- Data exposure risks
- Input validation gaps
- Cryptographic weaknesses

Provide JSON with: {{"issues": [{{"severity": "high|medium|low", "description": "...", "fix": "..."}}], "approved": boolean}}"""

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4000,
            messages=[{"role": "user", "content": prompt}],
        )
        content = response.content[0].text
        if "```json" in content:
            json_str = content.split("```json")[1].split("```")[0]
        else:
            json_str = content
        return json.loads(json_str)
````
## The Orchestrator: Bringing It All Together
Now we build the orchestrator that coordinates these agents:
```python
class FeatureOrchestrator:
    """Orchestrates a multi-agent feature development workflow."""

    def __init__(self, client: anthropic.Anthropic):
        self.client = client
        self.architect = ArchitectAgent(client)
        self.implementer = ImplementationAgent(client)
        self.security_reviewer = SecurityReviewerAgent(client)
        self.workflow_state = {}

    def develop_feature(self, feature_spec: str, codebase_context: str) -> Dict[str, Any]:
        """Execute the full feature development workflow."""
        print("🏗️ Phase 1: Architecture Design")
        architecture = self.architect.design(feature_spec, codebase_context)
        self.workflow_state['architecture'] = architecture

        implementations = {}
        security_reviews = {}

        # Implement each component
        for component in architecture.get('components', []):
            # Components may come back as dicts or bare strings
            component_name = component.get('name') if isinstance(component, dict) else component
            print(f"\n⚙️ Phase 2: Implementing {component_name}")
            code = self.implementer.implement(architecture, component_name)
            implementations[component_name] = code

            # Security review for each component
            print(f"🔒 Phase 3: Security Review for {component_name}")
            review = self.security_reviewer.review(code, architecture)
            security_reviews[component_name] = review

            # Handle security issues
            if not review.get('approved', False):
                high_severity_issues = [
                    issue for issue in review.get('issues', [])
                    if issue.get('severity') == 'high'
                ]
                if high_severity_issues:
                    print(f"❌ High severity issues found in {component_name}")
                    print("🔄 Re-implementing with security fixes...")
                    # Fold the required fixes into the component description
                    fixes_context = "\n".join(
                        f"- {issue['description']}: {issue['fix']}"
                        for issue in high_severity_issues
                    )
                    enhanced_component = f"{component_name} (with security fixes: {fixes_context})"

                    # Re-implement with the security context, then re-review
                    code = self.implementer.implement(architecture, enhanced_component)
                    implementations[component_name] = code
                    review = self.security_reviewer.review(code, architecture)
                    security_reviews[component_name] = review

        self.workflow_state['implementations'] = implementations
        self.workflow_state['security_reviews'] = security_reviews
        return self._generate_summary()

    def _generate_summary(self) -> Dict[str, Any]:
        """Generate a final summary of the orchestration workflow."""
        total_components = len(self.workflow_state.get('implementations', {}))
        security_issues = []
        for component, review in self.workflow_state.get('security_reviews', {}).items():
            for issue in review.get('issues', []):
                security_issues.append({
                    'component': component,
                    'severity': issue.get('severity'),
                    'description': issue.get('description'),
                })
        return {
            'architecture': self.workflow_state.get('architecture'),
            'total_components': total_components,
            'implementations': self.workflow_state.get('implementations'),
            'security_summary': {
                'total_issues': len(security_issues),
                'high_severity': len([i for i in security_issues if i['severity'] == 'high']),
                'issues': security_issues,
            },
            'status': 'complete',
        }


# Usage
client = anthropic.Anthropic(api_key="your-api-key")
orchestrator = FeatureOrchestrator(client)

feature_spec = """
Implement a user authentication system with:
- JWT token generation
- Password hashing with bcrypt
- Rate limiting for login attempts
- Session management
"""

codebase_context = """
FastAPI application
PostgreSQL database
Existing User model with id, email, hashed_password fields
"""

result = orchestrator.develop_feature(feature_spec, codebase_context)
print(json.dumps(result['security_summary'], indent=2))
```
## Advanced Orchestration Techniques
### Dynamic Agent Selection

Not every task needs every agent. Implement intelligent agent selection:

````python
class DynamicOrchestrator:
    """Orchestrator that dynamically selects agents based on task requirements."""

    def __init__(self, client: anthropic.Anthropic):
        self.client = client
        # PerformanceReviewerAgent and TestGeneratorAgent are assumed to
        # follow the same pattern as the agents defined earlier.
        self.available_agents = {
            'architect': ArchitectAgent(client),
            'implementer': ImplementationAgent(client),
            'security': SecurityReviewerAgent(client),
            'performance': PerformanceReviewerAgent(client),
            'tester': TestGeneratorAgent(client),
        }

    def select_agents(self, task_description: str) -> List[str]:
        """Use an AI call to determine which agents are needed."""
        prompt = f"""Analyze this development task and determine which specialist agents are needed.

Task: {task_description}

Available agents:
- architect: Design system architecture
- implementer: Write code
- security: Security review
- performance: Performance analysis
- tester: Generate test cases

Return JSON array of required agent names in execution order: ["agent1", "agent2", ...]"""

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1000,
            messages=[{"role": "user", "content": prompt}],
        )
        content = response.content[0].text
        if "```json" in content:
            json_str = content.split("```json")[1].split("```")[0]
        else:
            json_str = content
        return json.loads(json_str)

    def execute_workflow(self, task_description: str, context: str) -> Dict[str, Any]:
        """Execute a workflow with dynamically selected agents."""
        selected_agents = self.select_agents(task_description)
        print(f"Selected agents: {', '.join(selected_agents)}")

        results = {}
        current_output = context
        for agent_name in selected_agents:
            agent = self.available_agents[agent_name]
            print(f"\n🤖 Executing {agent_name}...")
            # Each agent processes the accumulated context.
            # This is simplified - you'd customize per agent type.
            result = self._execute_agent(agent, task_description, current_output)
            results[agent_name] = result
            current_output = result  # Chain outputs
        return results

    def _execute_agent(self, agent, task, context):
        # Route to the appropriate agent method based on agent type.
        # Simplified for the example.
        return f"Output from {agent.role}"
````
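`_execute_agent` is deliberately stubbed out above. One way to flesh it out is a dispatch table mapping agent names to small adapter functions, so each agent's distinct method signature is normalized in one place. The handlers below are hypothetical placeholders, not part of the original code:

```python
# Dispatch-table routing: each adapter normalizes one agent's interface
# to a common (task, context) -> result shape. Handlers are placeholders.
def run_architect(task, context):
    return {"design": f"architecture for {task}"}

def run_security(task, context):
    return {"issues": [], "approved": True}

DISPATCH = {
    "architect": run_architect,
    "security": run_security,
}

def execute_agent(agent_name, task, context):
    handler = DISPATCH.get(agent_name)
    if handler is None:
        raise ValueError(f"unknown agent: {agent_name}")
    return handler(task, context)
```

The dispatch table keeps the orchestrator loop generic while isolating per-agent quirks in the adapters.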
### Consensus and Voting Mechanisms

For critical decisions, use multiple agents to vote or reach consensus:

````python
class ConsensusOrchestrator:
    """Uses multiple agents to reach consensus on solutions."""

    def __init__(self, client: anthropic.Anthropic):
        self.client = client

    def get_consensus_solution(self, problem: str, num_agents: int = 3) -> Dict[str, Any]:
        """Generate multiple solutions and find consensus."""
        solutions = []

        # Generate solutions from multiple agents with slight variation
        for i in range(num_agents):
            prompt = f"""Solve this problem. Be thorough and consider edge cases.

Problem: {problem}

Approach this from perspective #{i+1}, considering different trade-offs.

Provide: {{"solution": "...", "pros": [...], "cons": [...], "confidence": 0-100}}"""

            response = self.client.messages.create(
                model="claude-3-5-sonnet-20241022",
                max_tokens=3000,
                temperature=0.7 + (i * 0.1),  # Slight temperature variation
                messages=[{"role": "user", "content": prompt}],
            )
            content = response.content[0].text
            if "```json" in content:
                json_str = content.split("```json")[1].split("```")[0]
            else:
                json_str = content
            solutions.append(json.loads(json_str))

        # Use another agent to synthesize consensus
        synthesis_prompt = f"""Review these {num_agents} solutions and synthesize the best approach.

Solutions:
{json.dumps(solutions, indent=2)}

Provide:
1. Consensus solution combining best elements
2. Key disagreements between solutions
3. Recommended approach with justification

Format as JSON."""

        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=4000,
            messages=[{"role": "user", "content": synthesis_prompt}],
        )
        return {
            'individual_solutions': solutions,
            'consensus': response.content[0].text,
        }
````
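When full LLM synthesis is overkill, consensus can be as simple as counting votes over the structured solutions and falling back to confidence on a tie. A sketch using the solution shape from the prompt above:

```python
from collections import Counter

# Majority vote over solution dicts; ties resolved by highest confidence.
def majority_vote(solutions):
    counts = Counter(s["solution"] for s in solutions)
    winner, votes = counts.most_common(1)[0]
    if votes > 1:
        return winner
    return max(solutions, key=lambda s: s["confidence"])["solution"]

solutions = [
    {"solution": "use a queue", "confidence": 80},
    {"solution": "use a queue", "confidence": 70},
    {"solution": "use polling", "confidence": 90},
]
# majority_vote(solutions) == "use a queue"
```

This only works when agents return comparable, structured answers; free-text solutions still need the synthesis step.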
## Managing Orchestration Complexity

### State Management

As workflows grow complex, robust state management becomes critical:

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


@dataclass
class TaskResult:
    agent: str
    output: Any
    timestamp: datetime
    duration_seconds: float
    status: TaskStatus
    error: Optional[str] = None


@dataclass
class WorkflowState:
    workflow_id: str
    current_phase: str
    task_results: Dict[str, TaskResult] = field(default_factory=dict)
    shared_context: Dict[str, Any] = field(default_factory=dict)
    errors: List[str] = field(default_factory=list)

    def add_result(self, task_name: str, result: TaskResult):
        self.task_results[task_name] = result

    def get_result(self, task_name: str) -> Optional[TaskResult]:
        return self.task_results.get(task_name)

    def update_context(self, key: str, value: Any):
        self.shared_context[key] = value

    def is_blocked(self) -> bool:
        return any(
            result.status == TaskStatus.BLOCKED
            for result in self.task_results.values()
        )
```
### Error Handling and Recovery

Orchestration workflows need robust error handling:

```python
import time


class ResilientOrchestrator:
    """Orchestrator with retry logic and fallback strategies."""

    def __init__(self, client: anthropic.Anthropic, max_retries: int = 3):
        self.client = client
        self.max_retries = max_retries

    def execute_with_retry(self, agent_func, *args, **kwargs) -> Any:
        """Execute an agent function with retry logic."""
        for attempt in range(self.max_retries):
            try:
                result = agent_func(*args, **kwargs)
                # Validate result quality
                if self._validate_result(result):
                    return result
                print(f"Result validation failed, attempt {attempt + 1}")
                if attempt == self.max_retries - 1:
                    return self._fallback_strategy(agent_func, *args, **kwargs)
            except Exception as e:
                print(f"Error on attempt {attempt + 1}: {e}")
                if attempt == self.max_retries - 1:
                    return self._fallback_strategy(agent_func, *args, **kwargs)
                time.sleep(2 ** attempt)  # Exponential backoff

    def _validate_result(self, result: Any) -> bool:
        """Validate that a result meets quality standards."""
        # Check for hallucination markers, incomplete responses, etc.
        # See the hallucination-detection lesson for detailed techniques.
        if isinstance(result, str):
            return len(result) > 50 and "error" not in result.lower()
        return result is not None

    def _fallback_strategy(self, agent_func, *args, **kwargs):
        """Fall back when the primary approach fails."""
        print("⚠️ Primary approach failed, using fallback")
        # Could use a simpler model, a different prompt, or flag for manual review
        return {"status": "requires_manual_review", "context": args}
```
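The `time.sleep(2 ** attempt)` line gives plain exponential backoff. Adding random jitter (an extension, not in the original code) helps avoid synchronized retries when many workflows fail at the same moment. A helper that just computes the delay schedule:

```python
import random

# Exponential backoff delays with optional jitter. With jitter=0 this
# reproduces the 1s, 2s, 4s, ... schedule used by the retry loop above.
def backoff_delays(retries: int, base: float = 1.0, jitter: float = 0.0):
    return [base * (2 ** attempt) + random.uniform(0, jitter)
            for attempt in range(retries)]

# backoff_delays(3) == [1.0, 2.0, 4.0]
```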
## Real-World Orchestration Patterns

### The Review-Revision Loop

Use adversarial agents to improve code quality:

````python
def review_revision_loop(code: str, max_iterations: int = 3) -> str:
    """Iteratively improve code through review-revision cycles."""
    # Assumes module-level `client` and `security_reviewer` instances,
    # as created in the earlier examples.
    current_code = code
    for iteration in range(max_iterations):
        print(f"\n🔄 Iteration {iteration + 1}")

        # Review phase
        review = security_reviewer.review(current_code, {})
        if review.get('approved') and len(review.get('issues', [])) == 0:
            print("✅ Code approved!")
            break

        # Revision phase
        revision_prompt = f"""Improve this code based on review feedback.

Current Code:
```python
{current_code}
```

Review Feedback:
{json.dumps(review.get('issues', []), indent=2)}

Provide improved code addressing all issues."""

        response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=8000,
            messages=[{"role": "user", "content": revision_prompt}],
        )
        current_code = response.content[0].text
        print(f"Addressed {len(review.get('issues', []))} issues")
    return current_code
````
## Performance and Cost Optimization
Orchestration can get expensive fast. Optimize strategically:
### Selective Agent Invocation
```python
import concurrent.futures


class CostOptimizedOrchestrator:
    """Minimizes API calls while maintaining quality."""

    def should_invoke_agent(self, agent_type: str, context: Dict) -> bool:
        """Determine whether an agent invocation is necessary."""
        # Skip security review for low-risk changes
        if agent_type == "security":
            if context.get('change_type') == 'documentation':
                return False
            if context.get('lines_changed', 0) < 10:
                return False
        # Skip performance review for non-critical paths
        if agent_type == "performance":
            if not context.get('is_hot_path', False):
                return False
        return True

    def parallel_execution(self, independent_tasks: List[Dict]) -> List[Any]:
        """Execute independent agent tasks in parallel."""
        def execute_task(task):
            agent = self.available_agents[task['agent']]
            return agent.execute(task['input'])

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            futures = [executor.submit(execute_task, task) for task in independent_tasks]
            # Note: as_completed yields results in completion order,
            # not in the order the tasks were submitted
            return [future.result() for future in concurrent.futures.as_completed(futures)]
```
For more cost optimization techniques, see performance-optimization and scaling-vibe-coding.
## Integration with Development Workflows
Orchestration should enhance, not disrupt, your team's workflow. See team-workflows for deeper integration patterns.
### CI/CD Integration

```yaml
# Example GitHub Actions integration
# .github/workflows/ai-review.yml
name: AI Multi-Agent Review
on: [pull_request]
jobs:
  ai-review:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Run Multi-Agent Review
        env:
          ANTHROPIC_API_KEY: ${{ secrets.ANTHROPIC_API_KEY }}
        run: |
          python orchestrator.py review \
            --pr-number ${{ github.event.pull_request.number }} \
            --agents security,performance,quality
```
## Common Pitfalls and Solutions

Avoid these orchestration mistakes:

- **Over-orchestration**: not every task needs multiple agents. Simple tasks should use simple approaches. See when-not-to-use-ai.
- **Context explosion**: passing full context to every agent wastes tokens and degrades performance. Give each agent only what it needs.
- **Ignoring failures**: one failed agent shouldn't crash the entire workflow. Implement graceful degradation.
- **No quality gates**: without validation between agents, errors compound. Always validate outputs.
- **Blind trust in consensus**: multiple agents agreeing doesn't guarantee correctness. Maintain quality-control standards.
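The context-explosion point can be made concrete with a small scoping helper: each agent declares the context keys it needs, and the orchestrator passes only those. The agent and key names here are illustrative:

```python
# Per-agent context scoping: agents receive only the keys they declare.
AGENT_CONTEXT_KEYS = {
    "security": {"code", "auth_model"},
    "tester": {"code", "interfaces"},
}

def scoped_context(agent_name, full_context):
    keys = AGENT_CONTEXT_KEYS.get(agent_name, set())
    return {k: v for k, v in full_context.items() if k in keys}

full = {"code": "...", "auth_model": "JWT", "interfaces": [], "history": "long chat log"}
# scoped_context("security", full) == {"code": "...", "auth_model": "JWT"}
```

An agent with no declared keys gets an empty context, which makes accidental over-sharing visible immediately.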
## Measuring Orchestration Success

Track these metrics:

```python
from collections import defaultdict


class OrchestrationMetrics:
    def __init__(self):
        self.metrics = {
            'total_workflows': 0,
            'successful_workflows': 0,
            'agent_invocations': defaultdict(int),
            'average_duration': 0,
            'cost_per_workflow': 0,
            'quality_scores': [],
        }

    def record_workflow(self, workflow_state: WorkflowState, cost: float):
        self.metrics['total_workflows'] += 1
        if not workflow_state.is_blocked():
            self.metrics['successful_workflows'] += 1
        # Track per-agent usage
        for task_name, result in workflow_state.task_results.items():
            self.metrics['agent_invocations'][result.agent] += 1
        # Running average of cost per workflow
        self.metrics['cost_per_workflow'] = (
            (self.metrics['cost_per_workflow'] * (self.metrics['total_workflows'] - 1) + cost)
            / self.metrics['total_workflows']
        )

    def print_summary(self):
        total = self.metrics['total_workflows']
        if total == 0:
            print("No workflows recorded yet")
            return
        print(f"Success Rate: {self.metrics['successful_workflows'] / total * 100:.1f}%")
        print(f"Average Cost: ${self.metrics['cost_per_workflow']:.2f}")
        print(f"Agent Usage: {dict(self.metrics['agent_invocations'])}")
```
## Next Steps
You now have the foundation for multi-agent orchestration. To continue advancing:
- Build a simple orchestrator for your specific use case
- Start with 2-3 agents and expand as you identify needs
- Implement comprehensive logging and monitoring
- Review security-considerations for production deployments
- Explore mcp-development for more advanced agent capabilities
Remember: orchestration is about smart coordination, not complexity for its own sake. Start simple, measure results, and add sophistication only where it delivers clear value.
The best orchestration systems are those that solve real problems efficiently while remaining maintainable by your team. Build yours with that principle in mind.