Building AI-Powered Features

20 minpublished

Build application features that leverage AI APIs and models to deliver intelligent functionality.

Building AI-Powered Features

You've moved beyond using AI assistants to write code—now you're building features that use AI themselves. This is where vibe coding gets truly powerful: creating applications that leverage LLMs, embeddings, and other AI capabilities to deliver value to users.

Building AI-powered features requires a different mindset than traditional feature development. You're working with probabilistic systems that can surprise you, managing token budgets instead of just compute costs, and dealing with latency challenges that don't exist in typical CRUD apps. Let's explore the advanced techniques that separate functional AI features from production-ready ones.

Architecture Patterns for AI Features

The Orchestration Layer

Don't let your AI logic leak throughout your codebase. Create a dedicated orchestration layer that manages AI interactions:

from typing import Protocol, List, Dict, Any
from dataclasses import dataclass
from enum import Enum

class ModelProvider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    LOCAL = "local"

@dataclass
class AIRequest:
    prompt: str
    max_tokens: int
    temperature: float
    system_message: str | None = None
    context: Dict[str, Any] | None = None

@dataclass
class AIResponse:
    content: str
    tokens_used: int
    provider: ModelProvider
    latency_ms: int
    metadata: Dict[str, Any]

class AIOrchestrator:
    def __init__(self, primary_provider: ModelProvider, fallback_provider: ModelProvider | None = None):
        self.primary = self._init_provider(primary_provider)
        self.fallback = self._init_provider(fallback_provider) if fallback_provider else None
        self.circuit_breaker = CircuitBreaker(failure_threshold=5)
    
    async def execute(self, request: AIRequest) -> AIResponse:
        """Execute AI request with fallback and circuit breaker logic"""
        try:
            if not self.circuit_breaker.is_open():
                return await self._execute_with_provider(self.primary, request)
        except Exception as e:
            self.circuit_breaker.record_failure()
            if self.fallback:
                return await self._execute_with_provider(self.fallback, request)
            raise
        
        if self.fallback:
            return await self._execute_with_provider(self.fallback, request)
        raise CircuitBreakerOpenError("Primary provider circuit breaker is open")

This pattern gives you flexibility to swap providers, implement fallbacks, and maintain consistent error handling. When your primary provider goes down at 3 AM, you'll thank yourself for building in that fallback logic.

Streaming vs Batch Processing

Choose your processing strategy based on user experience requirements:

class ContentGenerator:
    async def generate_streaming(self, prompt: str, callback):
        """Stream tokens as they're generated - better UX for long content"""
        async for chunk in self.ai_client.stream(prompt):
            await callback(chunk)
            # User sees content appearing in real-time
    
    async def generate_batch(self, prompts: List[str]) -> List[str]:
        """Process multiple requests efficiently"""
        # Batch similar requests together
        tasks = [self.ai_client.complete(p) for p in prompts]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Handle partial failures gracefully
        return [r if not isinstance(r, Exception) else self._fallback_content() 
                for r in results]

Use streaming when users are waiting (chat interfaces, content generation). Use batch processing for background tasks (summarizing documents, analyzing data). Don't stream when you need to validate the complete response before showing it to users—see hallucination-detection for why this matters.

Prompt Engineering at Scale

Template Management

Stop hardcoding prompts. Build a system for managing them:

interface PromptTemplate {
  id: string;
  version: number;
  template: string;
  variables: string[];
  metadata: {
    purpose: string;
    avgTokens: number;
    successRate?: number;
  };
}

class PromptRegistry {
  private templates: Map<string, PromptTemplate[]> = new Map();
  
  register(template: PromptTemplate): void {
    const versions = this.templates.get(template.id) || [];
    versions.push(template);
    this.templates.set(template.id, versions);
  }
  
  get(id: string, version?: number): PromptTemplate {
    const versions = this.templates.get(id);
    if (!versions) throw new Error(`Template ${id} not found`);
    
    if (version) {
      return versions.find(t => t.version === version) 
        || versions[versions.length - 1];
    }
    
    return versions[versions.length - 1]; // Latest version
  }
  
  render(id: string, variables: Record<string, any>, version?: number): string {
    const template = this.get(id, version);
    let result = template.template;
    
    for (const varName of template.variables) {
      if (!(varName in variables)) {
        throw new Error(`Missing variable: ${varName}`);
      }
      result = result.replace(
        new RegExp(`{{${varName}}}`, 'g'), 
        variables[varName]
      );
    }
    
    return result;
  }
}

// Usage
const registry = new PromptRegistry();

registry.register({
  id: 'code-review',
  version: 2,
  template: `Review this {{language}} code for security issues:

{{code}}

Focus on: {{focus_areas}}
Output format: JSON with {issue, severity, line, suggestion}`,
  variables: ['language', 'code', 'focus_areas'],
  metadata: {
    purpose: 'Security-focused code review',
    avgTokens: 850
  }
});

const prompt = registry.render('code-review', {
  language: 'Python',
  code: userCode,
  focus_areas: 'SQL injection, XSS, authentication'
});

This lets you version prompts, A/B test variations, and track which prompts perform best. You'll need this when you realize your initial prompt needs refinement after seeing real user data.

Dynamic Context Selection

Don't dump your entire database into the context window. Select relevant information intelligently:

from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

class SmartContextBuilder:
    def __init__(self, embedding_model, max_tokens: int = 4000):
        self.embedding_model = embedding_model
        self.max_tokens = max_tokens
    
    async def build_context(self, query: str, available_docs: List[Document]) -> str:
        """Select most relevant documents that fit in token budget"""
        
        # Get embeddings
        query_embedding = await self.embedding_model.embed(query)
        doc_embeddings = await self.embedding_model.embed_batch(
            [doc.content for doc in available_docs]
        )
        
        # Calculate relevance scores
        similarities = cosine_similarity(
            [query_embedding], 
            doc_embeddings
        )[0]
        
        # Sort by relevance
        scored_docs = sorted(
            zip(available_docs, similarities),
            key=lambda x: x[1],
            reverse=True
        )
        
        # Select docs that fit in budget
        selected = []
        total_tokens = 0
        
        for doc, score in scored_docs:
            doc_tokens = self._estimate_tokens(doc.content)
            if total_tokens + doc_tokens > self.max_tokens:
                break
            
            selected.append(doc)
            total_tokens += doc_tokens
        
        return "\n\n---\n\n".join(doc.content for doc in selected)
    
    def _estimate_tokens(self, text: str) -> int:
        # Rough estimation: ~4 chars per token
        return len(text) // 4

This approach ensures you're always using the most relevant context without wasting tokens on irrelevant information. Token costs add up quickly—optimizing context selection can cut your API bills significantly.

Managing State and Memory

Conversation History Management

For chat interfaces, you need smart conversation memory:

interface Message {
  role: 'user' | 'assistant' | 'system';
  content: string;
  tokens: number;
  timestamp: Date;
}

class ConversationMemory {
  private messages: Message[] = [];
  private maxTokens: number = 8000;
  private summaryThreshold: number = 6000;
  
  async addMessage(role: Message['role'], content: string): Promise<void> {
    const tokens = this.estimateTokens(content);
    this.messages.push({ role, content, tokens, timestamp: new Date() });
    
    // Summarize old messages if we're approaching token limit
    if (this.getTotalTokens() > this.summaryThreshold) {
      await this.compressHistory();
    }
  }
  
  private async compressHistory(): Promise<void> {
    // Keep recent messages, summarize older ones
    const recentMessages = this.messages.slice(-10);
    const oldMessages = this.messages.slice(0, -10);
    
    if (oldMessages.length === 0) return;
    
    const summary = await this.summarizeMessages(oldMessages);
    
    this.messages = [
      {
        role: 'system',
        content: `Previous conversation summary: ${summary}`,
        tokens: this.estimateTokens(summary),
        timestamp: new Date()
      },
      ...recentMessages
    ];
  }
  
  getContext(): Message[] {
    return this.messages;
  }
  
  getTotalTokens(): number {
    return this.messages.reduce((sum, msg) => sum + msg.tokens, 0);
  }
  
  private estimateTokens(text: string): number {
    return Math.ceil(text.length / 4);
  }
}

This prevents conversations from exceeding token limits while preserving important context. Users can have lengthy conversations without hitting errors or losing context.

Error Handling and Resilience

Graceful Degradation

AI features should fail gracefully. Never show users raw error messages from AI providers:

from typing import Optional
from enum import Enum

class FallbackStrategy(Enum):
    CACHED_RESPONSE = "cached"
    SIMPLIFIED_PROMPT = "simplified"
    TRADITIONAL_ALGORITHM = "traditional"
    USER_NOTIFICATION = "notify"

class ResilientAIFeature:
    def __init__(self):
        self.cache = ResponseCache()
        self.metrics = MetricsCollector()
    
    async def execute_with_fallbacks(
        self, 
        request: AIRequest, 
        fallback_strategy: FallbackStrategy
    ) -> AIResponse:
        
        try:
            response = await self.orchestrator.execute(request)
            await self.cache.store(request, response)
            return response
            
        except RateLimitError as e:
            self.metrics.record_error("rate_limit")
            
            if fallback_strategy == FallbackStrategy.CACHED_RESPONSE:
                cached = await self.cache.get_similar(request)
                if cached:
                    return cached.with_metadata({"source": "cache", "reason": "rate_limit"})
            
            raise UserFacingError(
                "Our AI service is experiencing high demand. Please try again in a moment.",
                retry_after=e.retry_after
            )
            
        except ContextLengthError as e:
            self.metrics.record_error("context_length")
            
            if fallback_strategy == FallbackStrategy.SIMPLIFIED_PROMPT:
                simplified = self.simplify_request(request)
                return await self.execute_with_fallbacks(
                    simplified, 
                    FallbackStrategy.USER_NOTIFICATION
                )
            
            raise UserFacingError(
                "Your request is too complex. Try breaking it into smaller parts."
            )
            
        except ModelUnavailableError as e:
            self.metrics.record_error("model_unavailable")
            
            if fallback_strategy == FallbackStrategy.TRADITIONAL_ALGORITHM:
                return await self.traditional_implementation(request)
            
            raise UserFacingError(
                "AI features are temporarily unavailable. Some functionality may be limited."
            )

Always have a plan B. Your AI provider will have outages. Your context windows will be exceeded. Build for these scenarios from day one, not after they happen in production. Check out when-not-to-use-ai for guidance on when traditional approaches are better.

Performance Optimization

Request Batching and Caching

Reduce costs and latency with smart caching:

from functools import lru_cache
import hashlib
import json

class IntelligentCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl_seconds = 3600  # 1 hour default
    
    def cache_key(self, request: AIRequest) -> str:
        """Generate deterministic cache key"""
        key_data = {
            'prompt': request.prompt,
            'max_tokens': request.max_tokens,
            'temperature': request.temperature,
            'system_message': request.system_message
        }
        serialized = json.dumps(key_data, sort_keys=True)
        return f"ai_cache:{hashlib.sha256(serialized.encode()).hexdigest()}"
    
    async def get_or_execute(
        self, 
        request: AIRequest, 
        executor
    ) -> AIResponse:
        # Only cache deterministic requests (low temperature)
        if request.temperature > 0.3:
            return await executor(request)
        
        cache_key = self.cache_key(request)
        
        # Check cache
        cached = await self.redis.get(cache_key)
        if cached:
            return AIResponse.from_json(cached)
        
        # Execute and cache
        response = await executor(request)
        await self.redis.setex(
            cache_key, 
            self.ttl_seconds, 
            response.to_json()
        )
        
        return response

Parallel Processing

Process independent requests concurrently:

class ParallelAIProcessor {
  async processDocuments(documents: Document[]): Promise<ProcessedDocument[]> {
    // Split into chunks to avoid overwhelming the API
    const chunks = this.chunkArray(documents, 10);
    const results: ProcessedDocument[] = [];
    
    for (const chunk of chunks) {
      const chunkResults = await Promise.all(
        chunk.map(doc => this.processDocument(doc))
      );
      results.push(...chunkResults);
      
      // Rate limiting - pause between chunks
      await this.sleep(1000);
    }
    
    return results;
  }
  
  private async processDocument(doc: Document): Promise<ProcessedDocument> {
    const [summary, tags, sentiment] = await Promise.all([
      this.generateSummary(doc),
      this.extractTags(doc),
      this.analyzeSentiment(doc)
    ]);
    
    return { doc, summary, tags, sentiment };
  }
  
  private chunkArray<T>(array: T[], size: number): T[][] {
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }
  
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

Monitoring and Observability

Comprehensive Metrics

Track what matters for AI features:

from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Any

@dataclass
class AIMetrics:
    request_id: str
    feature_name: str
    timestamp: datetime
    
    # Performance
    latency_ms: int
    tokens_used: int
    cost_usd: float
    
    # Quality
    user_feedback: Optional[int]  # 1-5 rating
    retry_count: int
    was_cached: bool
    
    # Context
    user_id: str
    provider: str
    model: str
    prompt_version: int

class MetricsCollector:
    def __init__(self, analytics_backend):
        self.backend = analytics_backend
    
    async def record_request(self, metrics: AIMetrics):
        """Record detailed metrics for each AI request"""
        await self.backend.write({
            **metrics.__dict__,
            
            # Add derived metrics
            'tokens_per_second': metrics.tokens_used / (metrics.latency_ms / 1000),
            'cost_per_token': metrics.cost_usd / metrics.tokens_used if metrics.tokens_used > 0 else 0,
        })
    
    async def get_feature_health(self, feature_name: str, hours: int = 24) -> Dict[str, Any]:
        """Aggregate health metrics for a feature"""
        metrics = await self.backend.query(
            feature=feature_name,
            since=datetime.now() - timedelta(hours=hours)
        )
        
        return {
            'total_requests': len(metrics),
            'avg_latency_ms': sum(m.latency_ms for m in metrics) / len(metrics),
            'total_cost_usd': sum(m.cost_usd for m in metrics),
            'cache_hit_rate': sum(1 for m in metrics if m.was_cached) / len(metrics),
            'avg_user_rating': sum(m.user_feedback for m in metrics if m.user_feedback) / 
                              sum(1 for m in metrics if m.user_feedback),
            'error_rate': sum(m.retry_count > 0 for m in metrics) / len(metrics)
        }

These metrics help you understand both technical performance and business impact. You'll spot issues before users complain and can quantify the value your AI features provide.

Testing AI Features

Deterministic Testing

AI features are probabilistic, but your tests shouldn't be:

import pytest
from unittest.mock import Mock

class TestAIFeature:
    @pytest.fixture
    def mock_ai_client(self):
        """Use mocked responses for deterministic tests"""
        client = Mock()
        client.complete.return_value = AIResponse(
            content="This is a test summary of the document.",
            tokens_used=15,
            provider=ModelProvider.OPENAI,
            latency_ms=234,
            metadata={}
        )
        return client
    
    async def test_summary_generation(self, mock_ai_client):
        feature = SummaryFeature(ai_client=mock_ai_client)
        result = await feature.generate_summary("Long document text...")
        
        # Test your logic, not the AI model
        assert result.summary is not None
        assert len(result.summary) > 0
        assert result.tokens_used == 15
        
        # Verify correct prompt was used
        call_args = mock_ai_client.complete.call_args
        assert "summarize" in call_args[0][0].lower()
    
    async def test_handles_rate_limiting(self, mock_ai_client):
        mock_ai_client.complete.side_effect = RateLimitError(retry_after=60)
        
        feature = SummaryFeature(ai_client=mock_ai_client)
        
        with pytest.raises(UserFacingError) as exc_info:
            await feature.generate_summary("Text")
        
        assert "high demand" in str(exc_info.value).lower()
        assert exc_info.value.retry_after == 60

Integration Testing with Real Models

Run integration tests against real models in CI/CD, but carefully:

describe('AI Feature Integration Tests', () => {
  // Only run these in CI or with explicit flag
  const shouldRunIntegration = process.env.RUN_INTEGRATION_TESTS === 'true';
  
  (shouldRunIntegration ? it : it.skip)('generates accurate code summaries', async () => {
    const feature = new CodeSummaryFeature(realAIClient);
    
    const testCode = `
      function calculateTotal(items) {
        return items.reduce((sum, item) => sum + item.price, 0);
      }
    `;
    
    const summary = await feature.summarize(testCode);
    
    // Test for expected concepts, not exact wording
    expect(summary.toLowerCase()).toContain('calculate');
    expect(summary.toLowerCase()).toContain('total');
    expect(summary.toLowerCase()).toMatch(/sum|add/);
  }, 30000); // Longer timeout for real API calls
});

See quality-control for more on testing strategies.

Security Considerations

Never trust AI output blindly:

class SecureAIFeature:
    def __init__(self):
        self.output_validator = OutputValidator()
        self.sanitizer = ContentSanitizer()
    
    async def generate_user_content(self, prompt: str, user_id: str) -> str:
        # Sanitize input
        clean_prompt = self.sanitizer.sanitize_input(prompt)
        
        # Add safety instructions
        safe_prompt = f"""
        {clean_prompt}
        
        IMPORTANT: Do not include:
        - Personal information
        - Harmful content
        - Code that could be malicious
        """
        
        response = await self.ai_client.complete(safe_prompt)
        
        # Validate output
        if not self.output_validator.is_safe(response.content):
            await self.log_unsafe_output(response, user_id)
            raise UnsafeContentError("Generated content failed safety checks")
        
        # Sanitize output before returning to user
        return self.sanitizer.sanitize_output(response.content)

For detailed security practices, review security-considerations.

Cost Management

Budget Enforcement

Implement hard limits to prevent runaway costs:

class BudgetEnforcer:
    def __init__(self, daily_budget_usd: float):
        self.daily_budget = daily_budget_usd
        self.redis = redis_client
    
    async def check_budget(self, estimated_cost: float) -> bool:
        today = datetime.now().strftime('%Y-%m-%d')
        key = f"ai_spending:{today}"
        
        current_spending = float(await self.redis.get(key) or 0)
        
        if current_spending + estimated_cost > self.daily_budget:
            await self.notify_budget_exceeded(current_spending, estimated_cost)
            return False
        
        return True
    
    async def record_spending(self, cost: float):
        today = datetime.now().strftime('%Y-%m-%d')
        key = f"ai_spending:{today}"
        
        await self.redis.incrbyfloat(key, cost)
        await self.redis.expire(key, 86400 * 7)  # Keep for 7 days

Real-World Example: Document Analysis Pipeline

Here's how these techniques come together:

class DocumentAnalysisPipeline:
    def __init__(self):
        self.orchestrator = AIOrchestrator(
            primary_provider=ModelProvider.OPENAI,
            fallback_provider=ModelProvider.ANTHROPIC
        )
        self.cache = IntelligentCache(redis_client)
        self.metrics = MetricsCollector(analytics_backend)
        self.budget = BudgetEnforcer(daily_budget_usd=100)
        self.context_builder = SmartContextBuilder(embedding_model)
    
    async def analyze_document(self, doc: Document, user_id: str) -> Analysis:
        request_id = generate_request_id()
        start_time = time.time()
        
        try:
            # Check budget
            estimated_cost = self.estimate_cost(doc)
            if not await self.budget.check_budget(estimated_cost):
                raise BudgetExceededError("Daily AI budget exceeded")
            
            # Build smart context
            context = await self.context_builder.build_context(
                doc.content,
                available_docs=doc.related_documents
            )
            
            # Create request
            ai_request = AIRequest(
                prompt=self.build_analysis_prompt(doc, context),
                max_tokens=1000,
                temperature=0.3,
                system_message="You are an expert document analyst."
            )
            
            # Execute with caching
            response = await self.cache.get_or_execute(
                ai_request,
                lambda req: self.orchestrator.execute(req)
            )
            
            # Record metrics
            await self.metrics.record_request(AIMetrics(
                request_id=request_id,
                feature_name="document_analysis",
                timestamp=datetime.now(),
                latency_ms=int((time.time() - start_time) * 1000),
                tokens_used=response.tokens_used,
                cost_usd=response.tokens_used * 0.00002,  # Example rate
                user_feedback=None,
                retry_count=0,
                was_cached=response.metadata.get('source') == 'cache',
                user_id=user_id,
                provider=response.provider.value,
                model="gpt-4",
                prompt_version=2
            ))
            
            await self.budget.record_spending(response.tokens_used * 0.00002)
            
            return Analysis.from_ai_response(response)
            
        except Exception as e:
            await self.metrics.record_error(str(e))
            raise

This example demonstrates orchestration, caching, budgeting, metrics, smart context selection, and error handling working together. This is production-ready AI feature code.

Key Takeaways

Building AI-powered features requires thinking beyond traditional software patterns. You're managing probabilistic systems with unique characteristics—token budgets, context windows, latency, and costs that scale with usage rather than infrastructure.

The techniques we've covered—orchestration layers, smart context selection, conversation memory, graceful degradation, caching, monitoring, and budget enforcement—aren't optional extras. They're foundational patterns that separate hobby projects from production systems.

Start with solid architecture. Build in resilience from day one. Monitor everything. And remember: AI features should enhance your application, not become a single point of failure. When your AI provider goes down, your app should degrade gracefully, not crash completely.

The developers who master these advanced techniques will build the AI-powered features that users love and that scale reliably to millions of requests. The ones who skip these patterns will spend their time firefighting production incidents and explaining to management why the AI bill exploded.

Which developer will you be?