Agentic Optimization Strategies
Optimize agentic workflows for reliability, efficiency, and quality through advanced configuration and monitoring.
Agentic Optimization Strategies
You've built agentic workflows that work—now it's time to make them work better. As AI agents handle increasingly complex tasks in your development process, optimization becomes critical. Inefficient agents burn API tokens, waste time, and produce subpar results. This lesson reveals advanced strategies for tuning your agentic workflows to peak performance.
Understanding Agentic Performance Bottlenecks
Before optimizing, you need to identify what's slowing down your agents. Unlike traditional code optimization, agentic workflows have unique performance characteristics.
The Three Dimensions of Agentic Performance
Token Efficiency: Every prompt consumes tokens. Redundant context, verbose instructions, or unnecessary iterations directly impact costs and latency.
Execution Speed: Agents often make sequential API calls. Each round trip adds latency. The difference between a 5-second and 45-second workflow compounds over hundreds of daily operations.
Output Quality: Fast but wrong is worthless. True optimization balances speed with accuracy, reducing iterations needed to reach acceptable results.
Here's a diagnostic wrapper to measure these dimensions:
import time
from dataclasses import dataclass
from typing import Any, Callable
@dataclass
class AgentMetrics:
execution_time: float
total_tokens: int
prompt_tokens: int
completion_tokens: int
iterations: int
success: bool
class AgentProfiler:
def __init__(self):
self.metrics_history = []
def profile(self, agent_func: Callable, *args, **kwargs) -> tuple[Any, AgentMetrics]:
start_time = time.time()
total_prompt_tokens = 0
total_completion_tokens = 0
iterations = 0
# Wrap the API client to track token usage
original_create = agent_func.__globals__['client'].chat.completions.create
def tracked_create(*api_args, **api_kwargs):
nonlocal total_prompt_tokens, total_completion_tokens, iterations
iterations += 1
response = original_create(*api_args, **api_kwargs)
total_prompt_tokens += response.usage.prompt_tokens
total_completion_tokens += response.usage.completion_tokens
return response
agent_func.__globals__['client'].chat.completions.create = tracked_create
try:
result = agent_func(*args, **kwargs)
success = True
except Exception as e:
result = None
success = False
finally:
agent_func.__globals__['client'].chat.completions.create = original_create
metrics = AgentMetrics(
execution_time=time.time() - start_time,
total_tokens=total_prompt_tokens + total_completion_tokens,
prompt_tokens=total_prompt_tokens,
completion_tokens=total_completion_tokens,
iterations=iterations,
success=success
)
self.metrics_history.append(metrics)
return result, metrics
def report(self):
if not self.metrics_history:
return "No metrics collected"
avg_time = sum(m.execution_time for m in self.metrics_history) / len(self.metrics_history)
avg_tokens = sum(m.total_tokens for m in self.metrics_history) / len(self.metrics_history)
success_rate = sum(m.success for m in self.metrics_history) / len(self.metrics_history)
return f"""
Average Execution Time: {avg_time:.2f}s
Average Token Usage: {avg_tokens:.0f}
Success Rate: {success_rate:.1%}
Total Runs: {len(self.metrics_history)}
"""
Run this profiler on your existing workflows to establish baselines before optimization.
Strategy 1: Context Pruning and Compression
The most common inefficiency in agentic workflows is bloated context. Agents often receive far more information than needed.
Implement Dynamic Context Windows
Rather than passing entire file contents or conversation histories, implement intelligent context selection:
import ast
from typing import List
class ContextPruner:
def __init__(self, max_tokens: int = 4000):
self.max_tokens = max_tokens
def extract_relevant_code(self, file_path: str, target_function: str) -> str:
"""Extract only the function and its direct dependencies"""
with open(file_path, 'r') as f:
tree = ast.parse(f.read())
relevant_nodes = []
imports = []
# Collect imports and target function
for node in ast.walk(tree):
if isinstance(node, (ast.Import, ast.ImportFrom)):
imports.append(ast.unparse(node))
elif isinstance(node, ast.FunctionDef) and node.name == target_function:
relevant_nodes.append(ast.unparse(node))
# Analyze function body for called functions
for child in ast.walk(node):
if isinstance(child, ast.Call):
if isinstance(child.func, ast.Name):
# Find this function's definition
for potential_dep in ast.walk(tree):
if isinstance(potential_dep, ast.FunctionDef) and \
potential_dep.name == child.func.id:
relevant_nodes.append(ast.unparse(potential_dep))
# Combine with size limit
context = "\n".join(imports) + "\n\n" + "\n\n".join(relevant_nodes)
return self._truncate_to_token_limit(context)
def _truncate_to_token_limit(self, text: str) -> str:
# Rough estimation: 1 token ≈ 4 characters
char_limit = self.max_tokens * 4
if len(text) <= char_limit:
return text
return text[:char_limit] + "\n\n# ... truncated ..."
def compress_conversation_history(self, messages: List[dict]) -> List[dict]:
"""Keep only the most recent and most relevant messages"""
if len(messages) <= 10:
return messages
# Always keep system message and last 5 exchanges
system_msgs = [m for m in messages if m['role'] == 'system']
recent_msgs = messages[-10:]
# Keep error messages and important milestones
important_msgs = [m for m in messages[:-10]
if 'error' in m['content'].lower()
or 'complete' in m['content'].lower()]
return system_msgs + important_msgs + recent_msgs
This approach reduced token usage by 60-70% in production workflows while maintaining accuracy.
Semantic Chunking Over Naive Splitting
When you must include large documents, split them intelligently:
from typing import List
import re
class SemanticChunker:
def chunk_code_by_scope(self, code: str, max_chunk_size: int = 1000) -> List[str]:
"""Split code at natural boundaries (class/function definitions)"""
chunks = []
current_chunk = []
current_size = 0
lines = code.split('\n')
for i, line in enumerate(lines):
# Detect scope boundaries
is_boundary = bool(re.match(r'^(class |def |async def )', line.strip()))
line_size = len(line)
if is_boundary and current_size > 0 and current_size + line_size > max_chunk_size:
# Start new chunk at this boundary
chunks.append('\n'.join(current_chunk))
current_chunk = [line]
current_size = line_size
else:
current_chunk.append(line)
current_size += line_size
if current_chunk:
chunks.append('\n'.join(current_chunk))
return chunks
Strategy 2: Parallel Execution Patterns
Sequential agent calls are often unnecessary. Many tasks can run concurrently.
Identifying Parallelizable Operations
Look for these patterns in your workflows:
- Multiple independent file analyses
- Batch processing of similar items
- Validation checks that don't depend on each other
import asyncio
from typing import List, Callable, Any
class ParallelAgentExecutor:
def __init__(self, max_concurrent: int = 5):
self.semaphore = asyncio.Semaphore(max_concurrent)
async def execute_agent_task(self, agent_func: Callable, *args, **kwargs) -> Any:
async with self.semaphore:
# Rate limiting built-in
return await asyncio.to_thread(agent_func, *args, **kwargs)
async def batch_process(self, agent_func: Callable, items: List[Any]) -> List[Any]:
tasks = [self.execute_agent_task(agent_func, item) for item in items]
return await asyncio.gather(*tasks, return_exceptions=True)
# Example: Parallel code review
async def review_multiple_files(files: List[str]):
executor = ParallelAgentExecutor(max_concurrent=3)
def review_single_file(filepath: str) -> dict:
# Your existing agent code review function
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "You are a code reviewer."},
{"role": "user", "content": f"Review this file: {open(filepath).read()}"}
]
)
return {"file": filepath, "review": response.choices[0].message.content}
results = await executor.batch_process(review_single_file, files)
return [r for r in results if not isinstance(r, Exception)]
# Usage
files = ["src/auth.py", "src/db.py", "src/api.py"]
reviews = asyncio.run(review_multiple_files(files))
This pattern reduced review time from 45 seconds to 12 seconds for a 5-file changeset.
Strategy 3: Intelligent Caching Layers
AI responses for identical or similar inputs are often wasteful. Implement strategic caching.
Multi-Level Caching Architecture
import hashlib
import json
from functools import wraps
from typing import Optional, Callable
import redis
class AgentCache:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis_client = redis.from_url(redis_url)
self.local_cache = {} # Fast in-memory cache
def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
# Create deterministic hash of inputs
content = json.dumps({
'func': func_name,
'args': args,
'kwargs': kwargs
}, sort_keys=True)
return f"agent_cache:{hashlib.sha256(content.encode()).hexdigest()}"
def cached_agent(self, ttl: int = 3600, similarity_threshold: float = 0.95):
def decorator(func: Callable):
@wraps(func)
def wrapper(*args, **kwargs):
cache_key = self._generate_key(func.__name__, args, kwargs)
# Check local cache first (microseconds)
if cache_key in self.local_cache:
return self.local_cache[cache_key]
# Check Redis (milliseconds)
cached = self.redis_client.get(cache_key)
if cached:
result = json.loads(cached)
self.local_cache[cache_key] = result
return result
# Cache miss - execute agent
result = func(*args, **kwargs)
# Store in both caches
self.local_cache[cache_key] = result
self.redis_client.setex(cache_key, ttl, json.dumps(result))
return result
return wrapper
return decorator
# Usage
cache = AgentCache()
@cache.cached_agent(ttl=7200)
def analyze_function_complexity(code: str) -> dict:
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "system", "content": "Analyze code complexity and return JSON."},
{"role": "user", "content": code}
]
)
return json.loads(response.choices[0].message.content)
Semantic Similarity Caching
For more sophisticated caching, detect semantically similar requests:
from sentence_transformers import SentenceTransformer
import numpy as np
class SemanticCache:
def __init__(self, similarity_threshold: float = 0.92):
self.model = SentenceTransformer('all-MiniLM-L6-v2')
self.cache_store = {} # {embedding_key: (embedding, response)}
self.threshold = similarity_threshold
def get_similar(self, query: str) -> Optional[Any]:
if not self.cache_store:
return None
query_embedding = self.model.encode(query)
# Compare with all cached embeddings
for key, (cached_embedding, response) in self.cache_store.items():
similarity = np.dot(query_embedding, cached_embedding) / \
(np.linalg.norm(query_embedding) * np.linalg.norm(cached_embedding))
if similarity >= self.threshold:
return response
return None
def store(self, query: str, response: Any):
embedding = self.model.encode(query)
key = hashlib.sha256(query.encode()).hexdigest()
self.cache_store[key] = (embedding, response)
Semantic caching is particularly effective for documentation generation, code explanations, and similar requests phrased differently.
Strategy 4: Prompt Optimization Techniques
The prompt itself is your primary optimization lever. Small changes can dramatically improve performance.
Structured Output Enforcement
Force agents to return structured data, eliminating parsing errors and reducing tokens:
from pydantic import BaseModel
from typing import List
import json
class CodeIssue(BaseModel):
line: int
severity: str # "error", "warning", "info"
message: str
suggested_fix: str
class CodeReviewResult(BaseModel):
overall_score: int # 0-100
issues: List[CodeIssue]
summary: str
def review_code_structured(code: str) -> CodeReviewResult:
prompt = f"""Review this code and return ONLY valid JSON matching this schema:
{{
"overall_score": <0-100>,
"issues": [
{{
"line": <line_number>,
"severity": "error|warning|info",
"message": "<description>",
"suggested_fix": "<fix>"
}}
],
"summary": "<brief_summary>"
}}
Code to review:
{code}
Return JSON only, no markdown formatting:"""
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
temperature=0.3, # Lower temperature for structured output
response_format={"type": "json_object"} # If supported by your model
)
result_dict = json.loads(response.choices[0].message.content)
return CodeReviewResult(**result_dict)
Few-Shot Optimization
Provide examples strategically. More isn't always better:
def create_optimized_few_shot_prompt(task: str, examples: List[dict]) -> str:
# Select most relevant examples using embedding similarity
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('all-MiniLM-L6-v2')
task_embedding = model.encode(task)
# Rank examples by similarity
ranked_examples = sorted(
examples,
key=lambda ex: np.dot(task_embedding, model.encode(ex['input'])),
reverse=True
)[:3] # Use only top 3 most relevant
examples_text = "\n\n".join([
f"Input: {ex['input']}\nOutput: {ex['output']}"
for ex in ranked_examples
])
return f"""Examples:
{examples_text}
Now complete this task:
{task}"""
Strategy 5: Model Selection and Routing
Not every task requires your most powerful model. Route intelligently.
from enum import Enum
class TaskComplexity(Enum):
SIMPLE = "simple" # Pattern matching, formatting
MODERATE = "moderate" # Code review, documentation
COMPLEX = "complex" # Architecture design, debugging
class ModelRouter:
MODEL_MAP = {
TaskComplexity.SIMPLE: "gpt-3.5-turbo",
TaskComplexity.MODERATE: "gpt-4",
TaskComplexity.COMPLEX: "gpt-4-turbo-preview"
}
def classify_task(self, task_description: str) -> TaskComplexity:
# Use a fast classifier (could be rule-based or small ML model)
keywords_complex = ["architecture", "design", "debug", "performance"]
keywords_simple = ["format", "rename", "extract", "list"]
desc_lower = task_description.lower()
if any(kw in desc_lower for kw in keywords_complex):
return TaskComplexity.COMPLEX
elif any(kw in desc_lower for kw in keywords_simple):
return TaskComplexity.SIMPLE
else:
return TaskComplexity.MODERATE
def route_task(self, task: str, user_prompt: str) -> str:
complexity = self.classify_task(task)
model = self.MODEL_MAP[complexity]
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": user_prompt}]
)
return response.choices[0].message.content
# Usage
router = ModelRouter()
result = router.route_task(
task="Format this JSON file",
user_prompt="Format this JSON: {...}"
) # Routes to gpt-3.5-turbo automatically
Strategy 6: Incremental Processing with Checkpoints
For long-running agentic workflows, implement checkpointing to avoid complete re-runs on failure.
import pickle
from pathlib import Path
from typing import Any, Callable
class CheckpointedAgent:
def __init__(self, checkpoint_dir: str = ".agent_checkpoints"):
self.checkpoint_dir = Path(checkpoint_dir)
self.checkpoint_dir.mkdir(exist_ok=True)
def execute_with_checkpoints(self, workflow_id: str, stages: List[tuple[str, Callable]]):
results = {}
for stage_name, stage_func in stages:
checkpoint_file = self.checkpoint_dir / f"{workflow_id}_{stage_name}.pkl"
# Try to load from checkpoint
if checkpoint_file.exists():
print(f"Loading {stage_name} from checkpoint")
with open(checkpoint_file, 'rb') as f:
results[stage_name] = pickle.load(f)
continue
# Execute stage
print(f"Executing {stage_name}")
result = stage_func(results) # Pass previous results
results[stage_name] = result
# Save checkpoint
with open(checkpoint_file, 'wb') as f:
pickle.dump(result, f)
return results
def clear_checkpoints(self, workflow_id: str):
for checkpoint_file in self.checkpoint_dir.glob(f"{workflow_id}_*.pkl"):
checkpoint_file.unlink()
# Example usage
def analyze_codebase_workflow():
agent = CheckpointedAgent()
def stage1_file_discovery(prev_results):
# Discover all Python files
return list(Path("src").rglob("*.py"))
def stage2_parse_files(prev_results):
# Parse AST for each file
files = prev_results['file_discovery']
# ... parsing logic
return parsed_data
def stage3_analyze_patterns(prev_results):
# AI-powered pattern analysis
parsed = prev_results['parse_files']
# ... analysis logic
return analysis_results
return agent.execute_with_checkpoints(
workflow_id="codebase_analysis_v1",
stages=[
("file_discovery", stage1_file_discovery),
("parse_files", stage2_parse_files),
("analyze_patterns", stage3_analyze_patterns)
]
)
Measuring Optimization Impact
After implementing these strategies, quantify improvements:
class OptimizationComparison:
def __init__(self):
self.baseline_metrics = []
self.optimized_metrics = []
def compare(self):
if not self.baseline_metrics or not self.optimized_metrics:
return "Insufficient data"
baseline_avg_time = np.mean([m.execution_time for m in self.baseline_metrics])
optimized_avg_time = np.mean([m.execution_time for m in self.optimized_metrics])
baseline_avg_tokens = np.mean([m.total_tokens for m in self.baseline_metrics])
optimized_avg_tokens = np.mean([m.total_tokens for m in self.optimized_metrics])
time_improvement = ((baseline_avg_time - optimized_avg_time) / baseline_avg_time) * 100
token_improvement = ((baseline_avg_tokens - optimized_avg_tokens) / baseline_avg_tokens) * 100
return f"""
Performance Improvements:
- Execution Time: {time_improvement:.1f}% faster
- Token Usage: {token_improvement:.1f}% reduction
- Cost Savings: ~${(baseline_avg_tokens - optimized_avg_tokens) * 0.00002:.4f} per run
"""
Practical Optimization Workflow
Follow this process for any agentic workflow:
- Profile: Run with AgentProfiler to establish baseline
- Identify bottlenecks: Token usage too high? Too many iterations? Slow execution?
- Apply targeted optimizations: Context pruning for token issues, parallelization for speed, caching for repeated operations
- Measure impact: Compare before/after metrics
- Iterate: Optimization is ongoing, especially as your workflow evolves
Integration with Existing Workflows
These strategies complement lessons you've already mastered. When implementing optimizations:
- Maintain quality-control measures—speed without accuracy is worthless
- Keep hallucination-detection checks in place
- Consider security-considerations when caching sensitive data
- Apply team-workflows principles to share optimization patterns
Avoid over-reliance on optimization. If an agent task is fundamentally unsuited for AI, no amount of optimization fixes it. Review when-not-to-use-ai if optimization feels like fighting the tool.
Common Optimization Pitfalls
Over-caching: Stale cache entries can return outdated results. Implement TTLs and cache invalidation strategies.
Premature parallelization: Parallel execution adds complexity. Only parallelize proven bottlenecks.
Aggressive context pruning: Removing too much context degrades quality. Always validate output quality after pruning.
Model routing complexity: Overly complex routing logic can introduce bugs. Start simple, then refine.
Moving Forward
Agentic optimization is both art and science. Start with measurement, apply strategies incrementally, and validate each change. The techniques in this lesson have reduced operational costs by 40-70% in production environments while improving response times.
As you scale your vibe coding practice (scaling-vibe-coding), optimization becomes non-negotiable. These strategies ensure your agentic workflows remain sustainable and cost-effective at any scale.
Next steps: Apply the profiling wrapper to your most expensive workflow, identify the largest bottleneck, and implement one optimization strategy. Measure the impact before moving to the next improvement.