Agentic Optimization Strategies

25 minpublished

Optimize agentic workflows for reliability, efficiency, and quality through advanced configuration and monitoring.

Agentic Optimization Strategies

You've built agentic workflows that work—now it's time to make them work better. As AI agents handle increasingly complex tasks in your development process, optimization becomes critical. Inefficient agents burn API tokens, waste time, and produce subpar results. This lesson reveals advanced strategies for tuning your agentic workflows to peak performance.

Understanding Agentic Performance Bottlenecks

Before optimizing, you need to identify what's slowing down your agents. Unlike traditional code optimization, agentic workflows have unique performance characteristics.

The Three Dimensions of Agentic Performance

Token Efficiency: Every prompt consumes tokens. Redundant context, verbose instructions, or unnecessary iterations directly impact costs and latency.

Execution Speed: Agents often make sequential API calls. Each round trip adds latency. The difference between a 5-second and 45-second workflow compounds over hundreds of daily operations.

Output Quality: Fast but wrong is worthless. True optimization balances speed with accuracy, reducing iterations needed to reach acceptable results.

Here's a diagnostic wrapper to measure these dimensions:

import time
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class AgentMetrics:
    execution_time: float
    total_tokens: int
    prompt_tokens: int
    completion_tokens: int
    iterations: int
    success: bool

class AgentProfiler:
    def __init__(self):
        self.metrics_history = []
    
    def profile(self, agent_func: Callable, *args, **kwargs) -> tuple[Any, AgentMetrics]:
        start_time = time.time()
        total_prompt_tokens = 0
        total_completion_tokens = 0
        iterations = 0
        
        # Wrap the API client to track token usage
        original_create = agent_func.__globals__['client'].chat.completions.create
        
        def tracked_create(*api_args, **api_kwargs):
            nonlocal total_prompt_tokens, total_completion_tokens, iterations
            iterations += 1
            response = original_create(*api_args, **api_kwargs)
            total_prompt_tokens += response.usage.prompt_tokens
            total_completion_tokens += response.usage.completion_tokens
            return response
        
        agent_func.__globals__['client'].chat.completions.create = tracked_create
        
        try:
            result = agent_func(*args, **kwargs)
            success = True
        except Exception as e:
            result = None
            success = False
        finally:
            agent_func.__globals__['client'].chat.completions.create = original_create
        
        metrics = AgentMetrics(
            execution_time=time.time() - start_time,
            total_tokens=total_prompt_tokens + total_completion_tokens,
            prompt_tokens=total_prompt_tokens,
            completion_tokens=total_completion_tokens,
            iterations=iterations,
            success=success
        )
        
        self.metrics_history.append(metrics)
        return result, metrics
    
    def report(self):
        if not self.metrics_history:
            return "No metrics collected"
        
        avg_time = sum(m.execution_time for m in self.metrics_history) / len(self.metrics_history)
        avg_tokens = sum(m.total_tokens for m in self.metrics_history) / len(self.metrics_history)
        success_rate = sum(m.success for m in self.metrics_history) / len(self.metrics_history)
        
        return f"""
        Average Execution Time: {avg_time:.2f}s
        Average Token Usage: {avg_tokens:.0f}
        Success Rate: {success_rate:.1%}
        Total Runs: {len(self.metrics_history)}
        """

Run this profiler on your existing workflows to establish baselines before optimization.

Strategy 1: Context Pruning and Compression

The most common inefficiency in agentic workflows is bloated context. Agents often receive far more information than needed.

Implement Dynamic Context Windows

Rather than passing entire file contents or conversation histories, implement intelligent context selection:

import ast
from typing import List

class ContextPruner:
    def __init__(self, max_tokens: int = 4000):
        self.max_tokens = max_tokens
    
    def extract_relevant_code(self, file_path: str, target_function: str) -> str:
        """Extract only the function and its direct dependencies"""
        with open(file_path, 'r') as f:
            tree = ast.parse(f.read())
        
        relevant_nodes = []
        imports = []
        
        # Collect imports and target function
        for node in ast.walk(tree):
            if isinstance(node, (ast.Import, ast.ImportFrom)):
                imports.append(ast.unparse(node))
            elif isinstance(node, ast.FunctionDef) and node.name == target_function:
                relevant_nodes.append(ast.unparse(node))
                # Analyze function body for called functions
                for child in ast.walk(node):
                    if isinstance(child, ast.Call):
                        if isinstance(child.func, ast.Name):
                            # Find this function's definition
                            for potential_dep in ast.walk(tree):
                                if isinstance(potential_dep, ast.FunctionDef) and \
                                   potential_dep.name == child.func.id:
                                    relevant_nodes.append(ast.unparse(potential_dep))
        
        # Combine with size limit
        context = "\n".join(imports) + "\n\n" + "\n\n".join(relevant_nodes)
        return self._truncate_to_token_limit(context)
    
    def _truncate_to_token_limit(self, text: str) -> str:
        # Rough estimation: 1 token ≈ 4 characters
        char_limit = self.max_tokens * 4
        if len(text) <= char_limit:
            return text
        return text[:char_limit] + "\n\n# ... truncated ..."
    
    def compress_conversation_history(self, messages: List[dict]) -> List[dict]:
        """Keep only the most recent and most relevant messages"""
        if len(messages) <= 10:
            return messages
        
        # Always keep system message and last 5 exchanges
        system_msgs = [m for m in messages if m['role'] == 'system']
        recent_msgs = messages[-10:]
        
        # Keep error messages and important milestones
        important_msgs = [m for m in messages[:-10] 
                         if 'error' in m['content'].lower() 
                         or 'complete' in m['content'].lower()]
        
        return system_msgs + important_msgs + recent_msgs

This approach reduced token usage by 60-70% in production workflows while maintaining accuracy.

Semantic Chunking Over Naive Splitting

When you must include large documents, split them intelligently:

from typing import List
import re

class SemanticChunker:
    def chunk_code_by_scope(self, code: str, max_chunk_size: int = 1000) -> List[str]:
        """Split code at natural boundaries (class/function definitions)"""
        chunks = []
        current_chunk = []
        current_size = 0
        
        lines = code.split('\n')
        
        for i, line in enumerate(lines):
            # Detect scope boundaries
            is_boundary = bool(re.match(r'^(class |def |async def )', line.strip()))
            line_size = len(line)
            
            if is_boundary and current_size > 0 and current_size + line_size > max_chunk_size:
                # Start new chunk at this boundary
                chunks.append('\n'.join(current_chunk))
                current_chunk = [line]
                current_size = line_size
            else:
                current_chunk.append(line)
                current_size += line_size
        
        if current_chunk:
            chunks.append('\n'.join(current_chunk))
        
        return chunks

Strategy 2: Parallel Execution Patterns

Sequential agent calls are often unnecessary. Many tasks can run concurrently.

Identifying Parallelizable Operations

Look for these patterns in your workflows:

  • Multiple independent file analyses
  • Batch processing of similar items
  • Validation checks that don't depend on each other
import asyncio
from typing import List, Callable, Any

class ParallelAgentExecutor:
    def __init__(self, max_concurrent: int = 5):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def execute_agent_task(self, agent_func: Callable, *args, **kwargs) -> Any:
        async with self.semaphore:
            # Rate limiting built-in
            return await asyncio.to_thread(agent_func, *args, **kwargs)
    
    async def batch_process(self, agent_func: Callable, items: List[Any]) -> List[Any]:
        tasks = [self.execute_agent_task(agent_func, item) for item in items]
        return await asyncio.gather(*tasks, return_exceptions=True)

# Example: Parallel code review
async def review_multiple_files(files: List[str]):
    executor = ParallelAgentExecutor(max_concurrent=3)
    
    def review_single_file(filepath: str) -> dict:
        # Your existing agent code review function
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": "You are a code reviewer."},
                {"role": "user", "content": f"Review this file: {open(filepath).read()}"}
            ]
        )
        return {"file": filepath, "review": response.choices[0].message.content}
    
    results = await executor.batch_process(review_single_file, files)
    return [r for r in results if not isinstance(r, Exception)]

# Usage
files = ["src/auth.py", "src/db.py", "src/api.py"]
reviews = asyncio.run(review_multiple_files(files))

This pattern reduced review time from 45 seconds to 12 seconds for a 5-file changeset.

Strategy 3: Intelligent Caching Layers

AI responses for identical or similar inputs are often wasteful. Implement strategic caching.

Multi-Level Caching Architecture

import hashlib
import json
from functools import wraps
from typing import Optional, Callable
import redis

class AgentCache:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_client = redis.from_url(redis_url)
        self.local_cache = {}  # Fast in-memory cache
    
    def _generate_key(self, func_name: str, args: tuple, kwargs: dict) -> str:
        # Create deterministic hash of inputs
        content = json.dumps({
            'func': func_name,
            'args': args,
            'kwargs': kwargs
        }, sort_keys=True)
        return f"agent_cache:{hashlib.sha256(content.encode()).hexdigest()}"
    
    def cached_agent(self, ttl: int = 3600, similarity_threshold: float = 0.95):
        def decorator(func: Callable):
            @wraps(func)
            def wrapper(*args, **kwargs):
                cache_key = self._generate_key(func.__name__, args, kwargs)
                
                # Check local cache first (microseconds)
                if cache_key in self.local_cache:
                    return self.local_cache[cache_key]
                
                # Check Redis (milliseconds)
                cached = self.redis_client.get(cache_key)
                if cached:
                    result = json.loads(cached)
                    self.local_cache[cache_key] = result
                    return result
                
                # Cache miss - execute agent
                result = func(*args, **kwargs)
                
                # Store in both caches
                self.local_cache[cache_key] = result
                self.redis_client.setex(cache_key, ttl, json.dumps(result))
                
                return result
            return wrapper
        return decorator

# Usage
cache = AgentCache()

@cache.cached_agent(ttl=7200)
def analyze_function_complexity(code: str) -> dict:
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "Analyze code complexity and return JSON."},
            {"role": "user", "content": code}
        ]
    )
    return json.loads(response.choices[0].message.content)

Semantic Similarity Caching

For more sophisticated caching, detect semantically similar requests:

from sentence_transformers import SentenceTransformer
import numpy as np

class SemanticCache:
    def __init__(self, similarity_threshold: float = 0.92):
        self.model = SentenceTransformer('all-MiniLM-L6-v2')
        self.cache_store = {}  # {embedding_key: (embedding, response)}
        self.threshold = similarity_threshold
    
    def get_similar(self, query: str) -> Optional[Any]:
        if not self.cache_store:
            return None
        
        query_embedding = self.model.encode(query)
        
        # Compare with all cached embeddings
        for key, (cached_embedding, response) in self.cache_store.items():
            similarity = np.dot(query_embedding, cached_embedding) / \
                        (np.linalg.norm(query_embedding) * np.linalg.norm(cached_embedding))
            
            if similarity >= self.threshold:
                return response
        
        return None
    
    def store(self, query: str, response: Any):
        embedding = self.model.encode(query)
        key = hashlib.sha256(query.encode()).hexdigest()
        self.cache_store[key] = (embedding, response)

Semantic caching is particularly effective for documentation generation, code explanations, and similar requests phrased differently.

Strategy 4: Prompt Optimization Techniques

The prompt itself is your primary optimization lever. Small changes can dramatically improve performance.

Structured Output Enforcement

Force agents to return structured data, eliminating parsing errors and reducing tokens:

from pydantic import BaseModel
from typing import List
import json

class CodeIssue(BaseModel):
    line: int
    severity: str  # "error", "warning", "info"
    message: str
    suggested_fix: str

class CodeReviewResult(BaseModel):
    overall_score: int  # 0-100
    issues: List[CodeIssue]
    summary: str

def review_code_structured(code: str) -> CodeReviewResult:
    prompt = f"""Review this code and return ONLY valid JSON matching this schema:
{{
  "overall_score": <0-100>,
  "issues": [
    {{
      "line": <line_number>,
      "severity": "error|warning|info",
      "message": "<description>",
      "suggested_fix": "<fix>"
    }}
  ],
  "summary": "<brief_summary>"
}}

Code to review:

{code}


Return JSON only, no markdown formatting:"""

    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,  # Lower temperature for structured output
        response_format={"type": "json_object"}  # If supported by your model
    )
    
    result_dict = json.loads(response.choices[0].message.content)
    return CodeReviewResult(**result_dict)

Few-Shot Optimization

Provide examples strategically. More isn't always better:

def create_optimized_few_shot_prompt(task: str, examples: List[dict]) -> str:
    # Select most relevant examples using embedding similarity
    from sentence_transformers import SentenceTransformer
    
    model = SentenceTransformer('all-MiniLM-L6-v2')
    task_embedding = model.encode(task)
    
    # Rank examples by similarity
    ranked_examples = sorted(
        examples,
        key=lambda ex: np.dot(task_embedding, model.encode(ex['input'])),
        reverse=True
    )[:3]  # Use only top 3 most relevant
    
    examples_text = "\n\n".join([
        f"Input: {ex['input']}\nOutput: {ex['output']}"
        for ex in ranked_examples
    ])
    
    return f"""Examples:
{examples_text}

Now complete this task:
{task}"""

Strategy 5: Model Selection and Routing

Not every task requires your most powerful model. Route intelligently.

from enum import Enum

class TaskComplexity(Enum):
    SIMPLE = "simple"      # Pattern matching, formatting
    MODERATE = "moderate"  # Code review, documentation
    COMPLEX = "complex"    # Architecture design, debugging

class ModelRouter:
    MODEL_MAP = {
        TaskComplexity.SIMPLE: "gpt-3.5-turbo",
        TaskComplexity.MODERATE: "gpt-4",
        TaskComplexity.COMPLEX: "gpt-4-turbo-preview"
    }
    
    def classify_task(self, task_description: str) -> TaskComplexity:
        # Use a fast classifier (could be rule-based or small ML model)
        keywords_complex = ["architecture", "design", "debug", "performance"]
        keywords_simple = ["format", "rename", "extract", "list"]
        
        desc_lower = task_description.lower()
        
        if any(kw in desc_lower for kw in keywords_complex):
            return TaskComplexity.COMPLEX
        elif any(kw in desc_lower for kw in keywords_simple):
            return TaskComplexity.SIMPLE
        else:
            return TaskComplexity.MODERATE
    
    def route_task(self, task: str, user_prompt: str) -> str:
        complexity = self.classify_task(task)
        model = self.MODEL_MAP[complexity]
        
        response = client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": user_prompt}]
        )
        
        return response.choices[0].message.content

# Usage
router = ModelRouter()
result = router.route_task(
    task="Format this JSON file",
    user_prompt="Format this JSON: {...}"
)  # Routes to gpt-3.5-turbo automatically

Strategy 6: Incremental Processing with Checkpoints

For long-running agentic workflows, implement checkpointing to avoid complete re-runs on failure.

import pickle
from pathlib import Path
from typing import Any, Callable

class CheckpointedAgent:
    def __init__(self, checkpoint_dir: str = ".agent_checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
    
    def execute_with_checkpoints(self, workflow_id: str, stages: List[tuple[str, Callable]]):
        results = {}
        
        for stage_name, stage_func in stages:
            checkpoint_file = self.checkpoint_dir / f"{workflow_id}_{stage_name}.pkl"
            
            # Try to load from checkpoint
            if checkpoint_file.exists():
                print(f"Loading {stage_name} from checkpoint")
                with open(checkpoint_file, 'rb') as f:
                    results[stage_name] = pickle.load(f)
                continue
            
            # Execute stage
            print(f"Executing {stage_name}")
            result = stage_func(results)  # Pass previous results
            results[stage_name] = result
            
            # Save checkpoint
            with open(checkpoint_file, 'wb') as f:
                pickle.dump(result, f)
        
        return results
    
    def clear_checkpoints(self, workflow_id: str):
        for checkpoint_file in self.checkpoint_dir.glob(f"{workflow_id}_*.pkl"):
            checkpoint_file.unlink()

# Example usage
def analyze_codebase_workflow():
    agent = CheckpointedAgent()
    
    def stage1_file_discovery(prev_results):
        # Discover all Python files
        return list(Path("src").rglob("*.py"))
    
    def stage2_parse_files(prev_results):
        # Parse AST for each file
        files = prev_results['file_discovery']
        # ... parsing logic
        return parsed_data
    
    def stage3_analyze_patterns(prev_results):
        # AI-powered pattern analysis
        parsed = prev_results['parse_files']
        # ... analysis logic
        return analysis_results
    
    return agent.execute_with_checkpoints(
        workflow_id="codebase_analysis_v1",
        stages=[
            ("file_discovery", stage1_file_discovery),
            ("parse_files", stage2_parse_files),
            ("analyze_patterns", stage3_analyze_patterns)
        ]
    )

Measuring Optimization Impact

After implementing these strategies, quantify improvements:

class OptimizationComparison:
    def __init__(self):
        self.baseline_metrics = []
        self.optimized_metrics = []
    
    def compare(self):
        if not self.baseline_metrics or not self.optimized_metrics:
            return "Insufficient data"
        
        baseline_avg_time = np.mean([m.execution_time for m in self.baseline_metrics])
        optimized_avg_time = np.mean([m.execution_time for m in self.optimized_metrics])
        
        baseline_avg_tokens = np.mean([m.total_tokens for m in self.baseline_metrics])
        optimized_avg_tokens = np.mean([m.total_tokens for m in self.optimized_metrics])
        
        time_improvement = ((baseline_avg_time - optimized_avg_time) / baseline_avg_time) * 100
        token_improvement = ((baseline_avg_tokens - optimized_avg_tokens) / baseline_avg_tokens) * 100
        
        return f"""
        Performance Improvements:
        - Execution Time: {time_improvement:.1f}% faster
        - Token Usage: {token_improvement:.1f}% reduction
        - Cost Savings: ~${(baseline_avg_tokens - optimized_avg_tokens) * 0.00002:.4f} per run
        """

Practical Optimization Workflow

Follow this process for any agentic workflow:

  1. Profile: Run with AgentProfiler to establish baseline
  2. Identify bottlenecks: Token usage too high? Too many iterations? Slow execution?
  3. Apply targeted optimizations: Context pruning for token issues, parallelization for speed, caching for repeated operations
  4. Measure impact: Compare before/after metrics
  5. Iterate: Optimization is ongoing, especially as your workflow evolves

Integration with Existing Workflows

These strategies complement lessons you've already mastered. When implementing optimizations:

Avoid over-reliance on optimization. If an agent task is fundamentally unsuited for AI, no amount of optimization fixes it. Review when-not-to-use-ai if optimization feels like fighting the tool.

Common Optimization Pitfalls

Over-caching: Stale cache entries can return outdated results. Implement TTLs and cache invalidation strategies.

Premature parallelization: Parallel execution adds complexity. Only parallelize proven bottlenecks.

Aggressive context pruning: Removing too much context degrades quality. Always validate output quality after pruning.

Model routing complexity: Overly complex routing logic can introduce bugs. Start simple, then refine.

Moving Forward

Agentic optimization is both art and science. Start with measurement, apply strategies incrementally, and validate each change. The techniques in this lesson have reduced operational costs by 40-70% in production environments while improving response times.

As you scale your vibe coding practice (scaling-vibe-coding), optimization becomes non-negotiable. These strategies ensure your agentic workflows remain sustainable and cost-effective at any scale.

Next steps: Apply the profiling wrapper to your most expensive workflow, identify the largest bottleneck, and implement one optimization strategy. Measure the impact before moving to the next improvement.