# Agentic Optimization Strategies

You've built agentic workflows that work—now it's time to make them work *better*. As AI agents handle increasingly complex tasks in your development process, optimization becomes critical. Inefficient agents burn API tokens, waste time, and produce subpar results. This lesson reveals advanced strategies for tuning your agentic workflows to peak performance.

## Understanding Agentic Performance Bottlenecks

Before optimizing, you need to identify what's slowing down your agents. Unlike traditional code optimization, agentic workflows have unique performance characteristics.

### The Three Dimensions of Agentic Performance

**Token Efficiency**: Every prompt consumes tokens. Redundant context, verbose instructions, and unnecessary iterations directly impact costs and latency.

**Execution Speed**: Agents often make sequential API calls. Each round trip adds latency. The difference between a 5-second and a 45-second workflow compounds over hundreds of daily operations.

**Output Quality**: Fast but wrong is worthless. True optimization balances speed with accuracy, reducing the iterations needed to reach acceptable results.
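Token counts translate directly into dollars, so the token-efficiency dimension is easy to quantify. A minimal sketch, using placeholder per-token prices — the rates and the `estimate_cost` helper are assumptions for illustration; check your provider's current rate card:

```python
# Rough cost estimate from token counts. The prices below are
# illustrative placeholders, not any provider's actual pricing.
PROMPT_PRICE_PER_1K = 0.01       # assumed $ per 1K prompt tokens
COMPLETION_PRICE_PER_1K = 0.03   # assumed $ per 1K completion tokens

def estimate_cost(prompt_tokens: int, completion_tokens: int,
                  runs_per_day: int = 1) -> dict:
    """Estimate per-run and daily cost for a workflow."""
    per_run = (prompt_tokens / 1000) * PROMPT_PRICE_PER_1K \
        + (completion_tokens / 1000) * COMPLETION_PRICE_PER_1K
    return {"per_run": per_run, "per_day": per_run * runs_per_day}

costs = estimate_cost(prompt_tokens=3000, completion_tokens=500, runs_per_day=200)
print(f"${costs['per_run']:.4f}/run, ${costs['per_day']:.2f}/day")
```

A workflow that looks cheap per run can dominate your bill at a few hundred runs a day, which is why the profiler below tracks tokens alongside wall-clock time.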
Here's a diagnostic wrapper to measure these dimensions:

```python
import time
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class AgentMetrics:
    execution_time: float
    total_tokens: int
    prompt_tokens: int
    completion_tokens: int
    iterations: int
    success: bool

class AgentProfiler:
    def __init__(self):
        self.metrics_history = []

    def profile(self, agent_func: Callable, *args, **kwargs) -> tuple[Any, AgentMetrics]:
        start_time = time.time()
        total_prompt_tokens = 0
        total_completion_tokens = 0
        iterations = 0

        # Wrap the API client to track token usage
        original_create = agent_func.__globals__['client'].chat.completions.create

        def tracked_create(*api_args, **api_kwargs):
            nonlocal total_prompt_tokens, total_completion_tokens, iterations
            iterations += 1
            response = original_create(*api_args, **api_kwargs)
            total_prompt_tokens += response.usage.prompt_tokens
            total_completion_tokens += response.usage.completion_tokens
            return response

        agent_func.__globals__['client'].chat.completions.create = tracked_create
        try:
            result = agent_func(*args, **kwargs)
            success = True
        except Exception:
            result = None
            success = False
        finally:
            # Always restore the original client method
            agent_func.__globals__['client'].chat.completions.create = original_create

        metrics = AgentMetrics(
            execution_time=time.time() - start_time,
            total_tokens=total_prompt_tokens + total_completion_tokens,
            prompt_tokens=total_prompt_tokens,
            completion_tokens=total_completion_tokens,
            iterations=iterations,
            success=success
        )
        self.metrics_history.append(metrics)
        return result, metrics

    def report(self):
        if not self.metrics_history:
            return "No metrics collected"

        avg_time = sum(m.execution_time for m in self.metrics_history) / len(self.metrics_history)
        avg_tokens = sum(m.total_tokens for m in self.metrics_history) / len(self.metrics_history)
        success_rate = sum(m.success for m in self.metrics_history) / len(self.metrics_history)

        return f"""
Average Execution Time: {avg_time:.2f}s
Average Token Usage: {avg_tokens:.0f}
Success Rate: {success_rate:.1%}
Total Runs: {len(self.metrics_history)}
"""
```

Run this profiler on your existing workflows to establish baselines before optimization.

## Strategy 1: Context Pruning and Compression

The most common inefficiency in agentic workflows is bloated context. Agents often receive far more information than they need.

### Implement Dynamic Context Windows

Rather than passing entire file contents or conversation histories, implement intelligent context selection:

```python
import ast
from typing import List

class ContextPruner:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens

    def extract_relevant_code(self, file_path: str, target_function: str) -> str:
        """Extract only the function and its direct dependencies"""
        with open(file_path, 'r') as f:
            tree = ast.parse(f.read())

        relevant_nodes = []
        imports = []

        # Collect imports and the target function
        for node in ast.walk(tree):
            if isinstance(node, (ast.Import, ast.ImportFrom)):
                imports.append(ast.unparse(node))
            elif isinstance(node, ast.FunctionDef) and node.name == target_function:
                relevant_nodes.append(ast.unparse(node))
                # Analyze the function body for called functions
                for child in ast.walk(node):
                    if isinstance(child, ast.Call) and isinstance(child.func, ast.Name):
                        # Find this function's definition
                        for potential_dep in ast.walk(tree):
                            if isinstance(potential_dep, ast.FunctionDef) and \
                                    potential_dep.name == child.func.id:
                                relevant_nodes.append(ast.unparse(potential_dep))

        # Combine with a size limit
        context = "\n".join(imports) + "\n\n" + "\n\n".join(relevant_nodes)
        return self._truncate_to_token_limit(context)

    def _truncate_to_token_limit(self, text: str) -> str:
        # Rough estimation: 1 token ≈ 4 characters
        char_limit = self.max_tokens * 4
        if len(text) <= char_limit:
            return text
        return text[:char_limit] + "\n\n# ... truncated ..."

    def compress_conversation_history(self, messages: List[dict]) -> List[dict]:
        """Keep only the most recent and most relevant messages"""
        if len(messages) <= 10:
            return messages

        # Always keep system messages plus the last 10 messages (~5 exchanges)
        system_msgs = [m for m in messages if m['role'] == 'system']
        recent_msgs = [m for m in messages[-10:] if m['role'] != 'system']

        # Keep error messages and important milestones from the older history
        important_msgs = [m for m in messages[:-10]
                          if m['role'] != 'system'
                          and ('error' in m['content'].lower()
                               or 'complete' in m['content'].lower())]

        return system_msgs + important_msgs + recent_msgs
```

This approach reduced token usage by 60-70% in production workflows while maintaining accuracy.

### Semantic Chunking Over Naive Splitting

When you must include large documents, split them intelligently:

```python
import re
from typing import List

class SemanticChunker:
    def chunk_code_by_scope(self, code: str, max_chunk_size: int = 1000) -> List[str]:
        """Split code at natural boundaries (class/function definitions)"""
        chunks = []
        current_chunk = []
        current_size = 0

        for line in code.split('\n'):
            # Detect scope boundaries
            is_boundary = bool(re.match(r'^(class |def |async def )', line.strip()))
            line_size = len(line)

            if is_boundary and current_size > 0 and current_size + line_size > max_chunk_size:
                # Start a new chunk at this boundary
                chunks.append('\n'.join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size

        if current_chunk:
            chunks.append('\n'.join(current_chunk))

        return chunks
```

## Strategy 2: Parallel Execution Patterns

Sequential agent calls are often unnecessary. Many tasks can run concurrently.
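The payoff of running independent calls concurrently is easy to demonstrate with a toy stand-in for an agent call — here `asyncio.sleep` simulates API latency, and `fake_agent_call` is an illustrative placeholder, not a real client call:

```python
import asyncio
import time

async def fake_agent_call(task_id: int) -> int:
    """Stand-in for an agent API call: ~0.2s of simulated latency."""
    await asyncio.sleep(0.2)
    return task_id

async def main() -> float:
    start = time.perf_counter()
    # Five independent "calls" run concurrently, not back-to-back
    results = await asyncio.gather(*(fake_agent_call(i) for i in range(5)))
    elapsed = time.perf_counter() - start
    assert results == [0, 1, 2, 3, 4]  # gather preserves input order
    return elapsed

elapsed = asyncio.run(main())
print(f"{elapsed:.2f}s")  # roughly 0.2s, not the ~1.0s a sequential loop would take
```

Five 0.2-second "calls" finish in roughly 0.2 seconds of wall-clock time instead of one second — the same effect the full executor pattern exploits at scale.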
### Identifying Parallelizable Operations

Look for these patterns in your workflows:

- Multiple independent file analyses
- Batch processing of similar items
- Validation checks that don't depend on each other

```python
import asyncio
from typing import Any, Callable, List

class ParallelAgentExecutor:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def execute_agent_task(self, agent_func: Callable, *args, **kwargs) -> Any:
        async with self.semaphore:  # Rate limiting built in
            return await asyncio.to_thread(agent_func, *args, **kwargs)

    async def batch_process(self, agent_func: Callable, items: List[Any]) -> List[Any]:
        tasks = [self.execute_agent_task(agent_func, item) for item in items]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Example: parallel code review
async def review_multiple_files(files: List[str]):
    executor = ParallelAgentExecutor(max_concurrent=3)

    def review_single_file(filepath: str) -> dict:
        # Your existing agent code review function
        with open(filepath) as f:
            code = f.read()
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a code reviewer."},
                {"role": "user", "content": f"Review this file: {code}"}
            ]
        )
        return {"file": filepath, "review": response.choices[0].message.content}

    results = await executor.batch_process(review_single_file, files)
    return [r for r in results if not isinstance(r, Exception)]

# Usage
files = ["src/auth.py", "src/db.py", "src/api.py"]
reviews = asyncio.run(review_multiple_files(files))
```

This pattern reduced review time from 45 seconds to 12 seconds for a 5-file changeset.

## Strategy 3: Intelligent Caching Layers

Regenerating AI responses for identical or similar inputs is wasteful. Implement strategic caching.
### Multi-Level Caching Architecture

```python
import hashlib
import json
from functools import wraps
from typing import Callable

import redis

class AgentCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.local_cache = {}  # Fast in-memory cache

    def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        # Create a deterministic hash of the inputs
        content = json.dumps({
            'func': func_name,
            'args': args,
            'kwargs': kwargs
        }, sort_keys=True)
        return f"agent_cache:{hashlib.sha256(content.encode()).hexdigest()}"

    def cached_agent(self, ttl: int = 3600):
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                cache_key = self._generate_key(func.__name__, args, kwargs)

                # Check the local cache first (microseconds)
                if cache_key in self.local_cache:
                    return self.local_cache[cache_key]

                # Check Redis (milliseconds)
                cached = self.redis_client.get(cache_key)
                if cached:
                    result = json.loads(cached)
                    self.local_cache[cache_key] = result
                    return result

                # Cache miss - execute the agent
                result = func(*args, **kwargs)

                # Store in both caches
                self.local_cache[cache_key] = result
                self.redis_client.setex(cache_key, ttl, json.dumps(result))
                return result
            return wrapper
        return decorator

# Usage
cache = AgentCache()

@cache.cached_agent(ttl=7200)
def analyze_function_complexity(code: str) -> dict:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Analyze code complexity and return JSON."},
            {"role": "user", "content": code}
        ]
    )
    return json.loads(response.choices[0].message.content)
```

### Semantic Similarity Caching

For more sophisticated caching, detect semantically similar requests:

```python
import hashlib
from typing import Any, Optional

import numpy as np
from sentence_transformers import SentenceTransformer

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.92):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.cache_store = {}  # {embedding_key: (embedding, response)}
        self.threshold = similarity_threshold

    def get_similar(self, query: str) -> Optional[Any]:
        if not self.cache_store:
            return None

        query_embedding = self.model.encode(query)

        # Compare with all cached embeddings (cosine similarity)
        for key, (cached_embedding, response) in self.cache_store.items():
            similarity = np.dot(query_embedding, cached_embedding) / \
                (np.linalg.norm(query_embedding) * np.linalg.norm(cached_embedding))
            if similarity >= self.threshold:
                return response

        return None

    def store(self, query: str, response: Any):
        embedding = self.model.encode(query)
        key = hashlib.sha256(query.encode()).hexdigest()
        self.cache_store[key] = (embedding, response)
```

Semantic caching is particularly effective for documentation generation, code explanations, and similar requests phrased differently.

## Strategy 4: Prompt Optimization Techniques

The prompt itself is your primary optimization lever. Small changes can dramatically improve performance.

### Structured Output Enforcement

Force agents to return structured data, eliminating parsing errors and reducing tokens:

```python
import json
from typing import List

from pydantic import BaseModel

class CodeIssue(BaseModel):
    line: int
    severity: str  # "error", "warning", "info"
    message: str
    suggested_fix: str

class CodeReviewResult(BaseModel):
    overall_score: int  # 0-100
    issues: List[CodeIssue]
    summary: str

def review_code_structured(code: str) -> CodeReviewResult:
    prompt = f"""Review this code and return ONLY valid JSON matching this schema:
{{
  "overall_score": <0-100>,
  "issues": [
    {{
      "line": <line number>,
      "severity": "error|warning|info",
      "message": "",
      "suggested_fix": ""
    }}
  ],
  "summary": ""
}}

Code to review:
{code}

Return JSON only, no markdown formatting:"""

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,  # Lower temperature for structured output
        response_format={"type": "json_object"}  # If supported by your model
    )
    result_dict = json.loads(response.choices[0].message.content)
    return CodeReviewResult(**result_dict)
```

### Few-Shot Optimization

Provide examples strategically. More isn't always better:

```python
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer

def create_optimized_few_shot_prompt(task: str, examples: List[dict]) -> str:
    # Select the most relevant examples using embedding similarity
    model = SentenceTransformer('all-MiniLM-L6-v2')
    task_embedding = model.encode(task)

    # Rank examples by similarity and use only the top 3
    ranked_examples = sorted(
        examples,
        key=lambda ex: np.dot(task_embedding, model.encode(ex['input'])),
        reverse=True
    )[:3]

    examples_text = "\n\n".join([
        f"Input: {ex['input']}\nOutput: {ex['output']}"
        for ex in ranked_examples
    ])

    return f"""Examples:
{examples_text}

Now complete this task: {task}"""
```

## Strategy 5: Model Selection and Routing

Not every task requires your most powerful model. Route intelligently.

```python
from enum import Enum

class TaskComplexity(Enum):
    SIMPLE = "simple"      # Pattern matching, formatting
    MODERATE = "moderate"  # Code review, documentation
    COMPLEX = "complex"    # Architecture design, debugging

class ModelRouter:
    MODEL_MAP = {
        TaskComplexity.SIMPLE: "gpt-3.5-turbo",
        TaskComplexity.MODERATE: "gpt-4",
        TaskComplexity.COMPLEX: "gpt-4-turbo-preview"
    }

    def classify_task(self, task_description: str) -> TaskComplexity:
        # Use a fast classifier (could be rule-based or a small ML model)
        keywords_complex = ["architecture", "design", "debug", "performance"]
        keywords_simple = ["format", "rename", "extract", "list"]

        desc_lower = task_description.lower()
        if any(kw in desc_lower for kw in keywords_complex):
            return TaskComplexity.COMPLEX
        elif any(kw in desc_lower for kw in keywords_simple):
            return TaskComplexity.SIMPLE
        else:
            return TaskComplexity.MODERATE

    def route_task(self, task: str, user_prompt: str) -> str:
        complexity = self.classify_task(task)
        model = self.MODEL_MAP[complexity]
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": user_prompt}]
        )
        return response.choices[0].message.content

# Usage
router = ModelRouter()
result = router.route_task(
    task="Format this JSON file",
    user_prompt="Format this JSON: {...}"
)  # Routes to gpt-3.5-turbo automatically
```

## Strategy 6: Incremental Processing with Checkpoints

For long-running agentic workflows, implement checkpointing to avoid complete re-runs on failure.

```python
import pickle
from pathlib import Path
from typing import Callable, List

class CheckpointedAgent:
    def __init__(self, checkpoint_dir: str = ".agent_checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)

    def execute_with_checkpoints(self, workflow_id: str,
                                 stages: List[tuple[str, Callable]]):
        results = {}
        for stage_name, stage_func in stages:
            checkpoint_file = self.checkpoint_dir / f"{workflow_id}_{stage_name}.pkl"

            # Try to load from a checkpoint
            if checkpoint_file.exists():
                print(f"Loading {stage_name} from checkpoint")
                with open(checkpoint_file, 'rb') as f:
                    results[stage_name] = pickle.load(f)
                continue

            # Execute the stage
            print(f"Executing {stage_name}")
            result = stage_func(results)  # Pass previous results
            results[stage_name] = result

            # Save a checkpoint
            with open(checkpoint_file, 'wb') as f:
                pickle.dump(result, f)

        return results

    def clear_checkpoints(self, workflow_id: str):
        for checkpoint_file in self.checkpoint_dir.glob(f"{workflow_id}_*.pkl"):
            checkpoint_file.unlink()

# Example usage
def analyze_codebase_workflow():
    agent = CheckpointedAgent()

    def stage1_file_discovery(prev_results):
        # Discover all Python files
        return list(Path("src").rglob("*.py"))

    def stage2_parse_files(prev_results):
        # Parse the AST for each file
        files = prev_results['file_discovery']
        # ... parsing logic
        return parsed_data

    def stage3_analyze_patterns(prev_results):
        # AI-powered pattern analysis
        parsed = prev_results['parse_files']
        # ... analysis logic
        return analysis_results

    return agent.execute_with_checkpoints(
        workflow_id="codebase_analysis_v1",
        stages=[
            ("file_discovery", stage1_file_discovery),
            ("parse_files", stage2_parse_files),
            ("analyze_patterns", stage3_analyze_patterns)
        ]
    )
```

## Measuring Optimization Impact

After implementing these strategies, quantify improvements:

```python
import numpy as np

class OptimizationComparison:
    def __init__(self):
        self.baseline_metrics = []   # AgentMetrics collected before optimization
        self.optimized_metrics = []  # AgentMetrics collected after

    def compare(self):
        if not self.baseline_metrics or not self.optimized_metrics:
            return "Insufficient data"

        baseline_avg_time = np.mean([m.execution_time for m in self.baseline_metrics])
        optimized_avg_time = np.mean([m.execution_time for m in self.optimized_metrics])
        baseline_avg_tokens = np.mean([m.total_tokens for m in self.baseline_metrics])
        optimized_avg_tokens = np.mean([m.total_tokens for m in self.optimized_metrics])

        time_improvement = ((baseline_avg_time - optimized_avg_time) / baseline_avg_time) * 100
        token_improvement = ((baseline_avg_tokens - optimized_avg_tokens) / baseline_avg_tokens) * 100

        return f"""
Performance Improvements:
- Execution Time: {time_improvement:.1f}% faster
- Token Usage: {token_improvement:.1f}% reduction
- Cost Savings: ~${(baseline_avg_tokens - optimized_avg_tokens) * 0.00002:.4f} per run
"""
```

## Practical Optimization Workflow

Follow this process for any agentic workflow:

1. **Profile**: Run with `AgentProfiler` to establish a baseline
2. **Identify bottlenecks**: Token usage too high? Too many iterations? Slow execution?
3. **Apply targeted optimizations**: Context pruning for token issues, parallelization for speed, caching for repeated operations
4. **Measure impact**: Compare before/after metrics
5. **Iterate**: Optimization is ongoing, especially as your workflow evolves

## Integration with Existing Workflows

These strategies complement lessons you've already mastered.
When implementing optimizations:

- Maintain [quality-control](/lessons/quality-control) measures—speed without accuracy is worthless
- Keep [hallucination-detection](/lessons/hallucination-detection) checks in place
- Consider [security-considerations](/lessons/security-considerations) when caching sensitive data
- Apply [team-workflows](/lessons/team-workflows) principles to share optimization patterns

Avoid [over-reliance](/lessons/over-reliance) on optimization. If an agent task is fundamentally unsuited for AI, no amount of optimization fixes it. Review [when-not-to-use-ai](/lessons/when-not-to-use-ai) if optimization feels like fighting the tool.

## Common Optimization Pitfalls

**Over-caching**: Stale cache entries can return outdated results. Implement TTLs and cache invalidation strategies.

**Premature parallelization**: Parallel execution adds complexity. Only parallelize proven bottlenecks.

**Aggressive context pruning**: Removing too much context degrades quality. Always validate output quality after pruning.

**Model routing complexity**: Overly complex routing logic can introduce bugs. Start simple, then refine.

## Moving Forward

Agentic optimization is both art and science. Start with measurement, apply strategies incrementally, and validate each change. The techniques in this lesson have reduced operational costs by 40-70% in production environments while improving response times.

As you scale your vibe coding practice ([scaling-vibe-coding](/lessons/scaling-vibe-coding)), optimization becomes non-negotiable. These strategies ensure your agentic workflows remain sustainable and cost-effective at any scale.

Next steps: Apply the profiling wrapper to your most expensive workflow, identify the largest bottleneck, and implement one optimization strategy. Measure the impact before moving to the next improvement.