# Building AI-Powered Features
You've moved beyond using AI assistants to write code—now you're building features that *use* AI themselves. This is where vibe coding gets truly powerful: creating applications that leverage LLMs, embeddings, and other AI capabilities to deliver value to users.
Building AI-powered features requires a different mindset than traditional feature development. You're working with probabilistic systems that can surprise you, managing token budgets instead of just compute costs, and dealing with latency challenges that don't exist in typical CRUD apps. Let's explore the advanced techniques that separate functional AI features from production-ready ones.
## Architecture Patterns for AI Features
### The Orchestration Layer
Don't let your AI logic leak throughout your codebase. Create a dedicated orchestration layer that manages AI interactions:
```python
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum


class ModelProvider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    LOCAL = "local"


@dataclass
class AIRequest:
    prompt: str
    max_tokens: int
    temperature: float
    system_message: str | None = None
    context: Dict[str, Any] | None = None


@dataclass
class AIResponse:
    content: str
    tokens_used: int
    provider: ModelProvider
    latency_ms: int
    metadata: Dict[str, Any]


class AIOrchestrator:
    def __init__(self, primary_provider: ModelProvider, fallback_provider: ModelProvider | None = None):
        self.primary = self._init_provider(primary_provider)
        self.fallback = self._init_provider(fallback_provider) if fallback_provider else None
        self.circuit_breaker = CircuitBreaker(failure_threshold=5)

    async def execute(self, request: AIRequest) -> AIResponse:
        """Execute AI request with fallback and circuit breaker logic."""
        try:
            if not self.circuit_breaker.is_open():
                return await self._execute_with_provider(self.primary, request)
        except Exception:
            self.circuit_breaker.record_failure()
            if self.fallback:
                return await self._execute_with_provider(self.fallback, request)
            raise
        # Circuit breaker is open: skip the primary entirely
        if self.fallback:
            return await self._execute_with_provider(self.fallback, request)
        raise CircuitBreakerOpenError("Primary provider circuit breaker is open")
```
This pattern gives you flexibility to swap providers, implement fallbacks, and maintain consistent error handling. When your primary provider goes down at 3 AM, you'll thank yourself for building in that fallback logic.
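The `CircuitBreaker` the orchestrator instantiates is assumed rather than shown. A minimal sketch might look like the following; the `reset_timeout_s` cooldown and the half-open behavior are illustrative choices, not a prescribed implementation:

```python
import time


class CircuitBreaker:
    """Minimal circuit breaker: opens after N consecutive failures,
    then allows a trial request again once a cooldown period passes."""

    def __init__(self, failure_threshold: int = 5, reset_timeout_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.failure_count = 0
        self.opened_at: float | None = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        # Half-open: let requests through again after the cooldown expires
        if time.monotonic() - self.opened_at >= self.reset_timeout_s:
            self.opened_at = None
            self.failure_count = 0
            return False
        return True

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = time.monotonic()

    def record_success(self) -> None:
        # Any success resets the breaker
        self.failure_count = 0
        self.opened_at = None
```

Call `record_success()` after each successful provider call so a flaky provider doesn't trip the breaker on scattered, non-consecutive failures.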
### Streaming vs Batch Processing
Choose your processing strategy based on user experience requirements:
```python
import asyncio
from typing import List


class ContentGenerator:
    async def generate_streaming(self, prompt: str, callback):
        """Stream tokens as they're generated - better UX for long content."""
        async for chunk in self.ai_client.stream(prompt):
            await callback(chunk)
            # User sees content appearing in real-time

    async def generate_batch(self, prompts: List[str]) -> List[str]:
        """Process multiple requests efficiently."""
        # Batch similar requests together
        tasks = [self.ai_client.complete(p) for p in prompts]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Handle partial failures gracefully
        return [r if not isinstance(r, Exception) else self._fallback_content()
                for r in results]
```
Use streaming when users are waiting (chat interfaces, content generation). Use batch processing for background tasks (summarizing documents, analyzing data). Don't stream when you need to validate the complete response before showing it to users—see [hallucination-detection](/lessons/hallucination-detection) for why this matters.
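When validation has to see the whole response, one option is to buffer the stream and check it before anything reaches the user: you pay the full latency but can reject bad output wholesale. A sketch, where the `validate` callable stands in for whatever checks you run:

```python
async def generate_validated(stream, validate) -> str:
    """Buffer an async token stream, validate the complete text,
    then return it - nothing is shown until the checks pass."""
    chunks = []
    async for chunk in stream:
        chunks.append(chunk)
    full_text = "".join(chunks)
    if not validate(full_text):
        raise ValueError("Generated content failed validation")
    return full_text
```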
## Prompt Engineering at Scale
### Template Management
Stop hardcoding prompts. Build a system for managing them:
```typescript
interface PromptTemplate {
  id: string;
  version: number;
  template: string;
  variables: string[];
  metadata: {
    purpose: string;
    avgTokens: number;
    successRate?: number;
  };
}

class PromptRegistry {
  private templates: Map<string, PromptTemplate[]> = new Map();

  register(template: PromptTemplate): void {
    const versions = this.templates.get(template.id) || [];
    versions.push(template);
    this.templates.set(template.id, versions);
  }

  get(id: string, version?: number): PromptTemplate {
    const versions = this.templates.get(id);
    if (!versions) throw new Error(`Template ${id} not found`);
    if (version) {
      return versions.find(t => t.version === version)
        || versions[versions.length - 1];
    }
    return versions[versions.length - 1]; // Latest version
  }

  render(id: string, variables: Record<string, string>, version?: number): string {
    const template = this.get(id, version);
    let result = template.template;
    for (const varName of template.variables) {
      if (!(varName in variables)) {
        throw new Error(`Missing variable: ${varName}`);
      }
      result = result.replace(
        new RegExp(`{{${varName}}}`, 'g'),
        variables[varName]
      );
    }
    return result;
  }
}

// Usage
const registry = new PromptRegistry();

registry.register({
  id: 'code-review',
  version: 2,
  template: `Review this {{language}} code for security issues:

{{code}}

Focus on: {{focus_areas}}

Output format: JSON with {issue, severity, line, suggestion}`,
  variables: ['language', 'code', 'focus_areas'],
  metadata: {
    purpose: 'Security-focused code review',
    avgTokens: 850
  }
});

const prompt = registry.render('code-review', {
  language: 'Python',
  code: userCode,
  focus_areas: 'SQL injection, XSS, authentication'
});
```
This lets you version prompts, A/B test variations, and track which prompts perform best. You'll need this when you realize your initial prompt needs refinement after seeing real user data.
### Dynamic Context Selection
Don't dump your entire database into the context window. Select relevant information intelligently:
```python
from typing import List

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class SmartContextBuilder:
    def __init__(self, embedding_model, max_tokens: int = 4000):
        self.embedding_model = embedding_model
        self.max_tokens = max_tokens

    async def build_context(self, query: str, available_docs: List[Document]) -> str:
        """Select most relevant documents that fit in the token budget."""
        # Get embeddings
        query_embedding = await self.embedding_model.embed(query)
        doc_embeddings = await self.embedding_model.embed_batch(
            [doc.content for doc in available_docs]
        )

        # Calculate relevance scores
        similarities = cosine_similarity(
            [query_embedding],
            doc_embeddings
        )[0]

        # Sort by relevance
        scored_docs = sorted(
            zip(available_docs, similarities),
            key=lambda x: x[1],
            reverse=True
        )

        # Select docs that fit in budget
        selected = []
        total_tokens = 0
        for doc, score in scored_docs:
            doc_tokens = self._estimate_tokens(doc.content)
            if total_tokens + doc_tokens > self.max_tokens:
                break
            selected.append(doc)
            total_tokens += doc_tokens

        return "\n\n---\n\n".join(doc.content for doc in selected)

    def _estimate_tokens(self, text: str) -> int:
        # Rough estimation: ~4 chars per token
        return len(text) // 4
```
This approach ensures you're always using the most relevant context without wasting tokens on irrelevant information. Token costs add up quickly—optimizing context selection can cut your API bills significantly.
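To make "cut your API bills significantly" concrete, here is a back-of-envelope cost model. The per-1K-token prices are placeholders; substitute your provider's actual rates:

```python
def estimate_monthly_cost(
    requests_per_day: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    input_price_per_1k: float,
    output_price_per_1k: float,
) -> float:
    """Rough monthly spend: (input + output cost per request) * volume * 30 days."""
    per_request = (avg_input_tokens / 1000) * input_price_per_1k \
                + (avg_output_tokens / 1000) * output_price_per_1k
    return per_request * requests_per_day * 30


# At 10k requests/day with hypothetical $0.01/$0.03 per 1K token pricing,
# trimming average context from 3,000 to 1,500 input tokens cuts the bill by a third:
full = estimate_monthly_cost(10_000, 3_000, 500, 0.01, 0.03)     # 13,500.0 USD/month
trimmed = estimate_monthly_cost(10_000, 1_500, 500, 0.01, 0.03)  # 9,000.0 USD/month
```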
## Managing State and Memory
### Conversation History Management
For chat interfaces, you need smart conversation memory:
```typescript
interface Message {
  role: 'user' | 'assistant' | 'system';
  content: string;
  tokens: number;
  timestamp: Date;
}

class ConversationMemory {
  private messages: Message[] = [];
  private maxTokens: number = 8000;
  private summaryThreshold: number = 6000;

  async addMessage(role: Message['role'], content: string): Promise<void> {
    const tokens = this.estimateTokens(content);
    this.messages.push({ role, content, tokens, timestamp: new Date() });

    // Summarize old messages if we're approaching token limit
    if (this.getTotalTokens() > this.summaryThreshold) {
      await this.compressHistory();
    }
  }

  private async compressHistory(): Promise<void> {
    // Keep recent messages, summarize older ones
    const recentMessages = this.messages.slice(-10);
    const oldMessages = this.messages.slice(0, -10);

    if (oldMessages.length === 0) return;

    const summary = await this.summarizeMessages(oldMessages);

    this.messages = [
      {
        role: 'system',
        content: `Previous conversation summary: ${summary}`,
        tokens: this.estimateTokens(summary),
        timestamp: new Date()
      },
      ...recentMessages
    ];
  }

  getContext(): Message[] {
    return this.messages;
  }

  getTotalTokens(): number {
    return this.messages.reduce((sum, msg) => sum + msg.tokens, 0);
  }

  private estimateTokens(text: string): number {
    return Math.ceil(text.length / 4);
  }
}
```
This prevents conversations from exceeding token limits while preserving important context. Users can have lengthy conversations without hitting errors or losing context.
## Error Handling and Resilience
### Graceful Degradation
AI features should fail gracefully. Never show users raw error messages from AI providers:
```python
from enum import Enum


class FallbackStrategy(Enum):
    CACHED_RESPONSE = "cached"
    SIMPLIFIED_PROMPT = "simplified"
    TRADITIONAL_ALGORITHM = "traditional"
    USER_NOTIFICATION = "notify"


class ResilientAIFeature:
    def __init__(self, orchestrator: AIOrchestrator):
        self.orchestrator = orchestrator
        self.cache = ResponseCache()
        self.metrics = MetricsCollector()

    async def execute_with_fallbacks(
        self,
        request: AIRequest,
        fallback_strategy: FallbackStrategy
    ) -> AIResponse:
        try:
            response = await self.orchestrator.execute(request)
            await self.cache.store(request, response)
            return response

        except RateLimitError as e:
            self.metrics.record_error("rate_limit")
            if fallback_strategy == FallbackStrategy.CACHED_RESPONSE:
                cached = await self.cache.get_similar(request)
                if cached:
                    return cached.with_metadata({"source": "cache", "reason": "rate_limit"})
            raise UserFacingError(
                "Our AI service is experiencing high demand. Please try again in a moment.",
                retry_after=e.retry_after
            )

        except ContextLengthError:
            self.metrics.record_error("context_length")
            if fallback_strategy == FallbackStrategy.SIMPLIFIED_PROMPT:
                simplified = self.simplify_request(request)
                return await self.execute_with_fallbacks(
                    simplified,
                    FallbackStrategy.USER_NOTIFICATION
                )
            raise UserFacingError(
                "Your request is too complex. Try breaking it into smaller parts."
            )

        except ModelUnavailableError:
            self.metrics.record_error("model_unavailable")
            if fallback_strategy == FallbackStrategy.TRADITIONAL_ALGORITHM:
                return await self.traditional_implementation(request)
            raise UserFacingError(
                "AI features are temporarily unavailable. Some functionality may be limited."
            )
```
Always have a plan B. Your AI provider *will* have outages. Your context windows *will* be exceeded. Build for these scenarios from day one, not after they happen in production. Check out [when-not-to-use-ai](/lessons/when-not-to-use-ai) for guidance on when traditional approaches are better.
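For transient failures (timeouts, brief rate-limit blips) a retry with exponential backoff complements the fallback strategies above. A sketch; the delays and retry counts are illustrative:

```python
import asyncio
import random


async def with_backoff(fn, max_retries: int = 3, base_delay_s: float = 1.0):
    """Retry an async callable with exponential backoff plus jitter.
    Suited to transient errors, not sustained outages - those are the
    circuit breaker's job."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries:
                raise  # Out of retries: surface the error to fallback handling
            delay = base_delay_s * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```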
## Performance Optimization
### Request Batching and Caching
Reduce costs and latency with smart caching:
```python
import hashlib
import json


class IntelligentCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl_seconds = 3600  # 1 hour default

    def cache_key(self, request: AIRequest) -> str:
        """Generate deterministic cache key."""
        key_data = {
            'prompt': request.prompt,
            'max_tokens': request.max_tokens,
            'temperature': request.temperature,
            'system_message': request.system_message
        }
        serialized = json.dumps(key_data, sort_keys=True)
        return f"ai_cache:{hashlib.sha256(serialized.encode()).hexdigest()}"

    async def get_or_execute(
        self,
        request: AIRequest,
        executor
    ) -> AIResponse:
        # Only cache deterministic requests (low temperature)
        if request.temperature > 0.3:
            return await executor(request)

        cache_key = self.cache_key(request)

        # Check cache
        cached = await self.redis.get(cache_key)
        if cached:
            return AIResponse.from_json(cached)

        # Execute and cache
        response = await executor(request)
        await self.redis.setex(
            cache_key,
            self.ttl_seconds,
            response.to_json()
        )
        return response
```
### Parallel Processing
Process independent requests concurrently:
```typescript
class ParallelAIProcessor {
  async processDocuments(documents: Document[]): Promise<ProcessedDocument[]> {
    // Split into chunks to avoid overwhelming the API
    const chunks = this.chunkArray(documents, 10);
    const results: ProcessedDocument[] = [];

    for (const chunk of chunks) {
      const chunkResults = await Promise.all(
        chunk.map(doc => this.processDocument(doc))
      );
      results.push(...chunkResults);

      // Rate limiting - pause between chunks
      await this.sleep(1000);
    }
    return results;
  }

  private async processDocument(doc: Document): Promise<ProcessedDocument> {
    const [summary, tags, sentiment] = await Promise.all([
      this.generateSummary(doc),
      this.extractTags(doc),
      this.analyzeSentiment(doc)
    ]);
    return { doc, summary, tags, sentiment };
  }

  private chunkArray<T>(array: T[], size: number): T[][] {
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
```
## Monitoring and Observability
### Comprehensive Metrics
Track what matters for AI features:
```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, Optional


@dataclass
class AIMetrics:
    request_id: str
    feature_name: str
    timestamp: datetime
    # Performance
    latency_ms: int
    tokens_used: int
    cost_usd: float
    # Quality
    user_feedback: Optional[int]  # 1-5 rating
    retry_count: int
    was_cached: bool
    # Context
    user_id: str
    provider: str
    model: str
    prompt_version: int


class MetricsCollector:
    def __init__(self, analytics_backend):
        self.backend = analytics_backend

    async def record_request(self, metrics: AIMetrics):
        """Record detailed metrics for each AI request."""
        await self.backend.write({
            **metrics.__dict__,
            # Add derived metrics
            'tokens_per_second': metrics.tokens_used / (metrics.latency_ms / 1000),
            'cost_per_token': metrics.cost_usd / metrics.tokens_used if metrics.tokens_used > 0 else 0,
        })

    async def get_feature_health(self, feature_name: str, hours: int = 24) -> Dict[str, Any]:
        """Aggregate health metrics for a feature."""
        metrics = await self.backend.query(
            feature=feature_name,
            since=datetime.now() - timedelta(hours=hours)
        )
        if not metrics:
            return {'total_requests': 0}
        rated = [m.user_feedback for m in metrics if m.user_feedback]
        return {
            'total_requests': len(metrics),
            'avg_latency_ms': sum(m.latency_ms for m in metrics) / len(metrics),
            'total_cost_usd': sum(m.cost_usd for m in metrics),
            'cache_hit_rate': sum(1 for m in metrics if m.was_cached) / len(metrics),
            'avg_user_rating': sum(rated) / len(rated) if rated else None,
            'error_rate': sum(m.retry_count > 0 for m in metrics) / len(metrics)
        }
```
These metrics help you understand both technical performance and business impact. You'll spot issues before users complain and can quantify the value your AI features provide.
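One way to act on the `get_feature_health` output is a simple threshold check that feeds your alerting. A sketch; the threshold defaults are illustrative:

```python
def check_feature_health(health: dict, thresholds: dict) -> list[str]:
    """Compare aggregated health metrics against alert thresholds
    and return a list of alert messages (empty means healthy)."""
    alerts = []
    if health.get("error_rate", 0) > thresholds.get("max_error_rate", 0.05):
        alerts.append(f"error rate {health['error_rate']:.1%} above threshold")
    if health.get("avg_latency_ms", 0) > thresholds.get("max_latency_ms", 2000):
        alerts.append(f"avg latency {health['avg_latency_ms']}ms above threshold")
    if health.get("total_cost_usd", 0) > thresholds.get("max_daily_cost_usd", 50):
        alerts.append(f"cost ${health['total_cost_usd']:.2f} above daily budget")
    return alerts
```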
## Testing AI Features
### Deterministic Testing
AI features are probabilistic, but your tests shouldn't be:
```python
import pytest
from unittest.mock import AsyncMock


class TestAIFeature:
    @pytest.fixture
    def mock_ai_client(self):
        """Use mocked responses for deterministic tests."""
        client = AsyncMock()  # AsyncMock so the awaited complete() call works
        client.complete.return_value = AIResponse(
            content="This is a test summary of the document.",
            tokens_used=15,
            provider=ModelProvider.OPENAI,
            latency_ms=234,
            metadata={}
        )
        return client

    async def test_summary_generation(self, mock_ai_client):
        feature = SummaryFeature(ai_client=mock_ai_client)
        result = await feature.generate_summary("Long document text...")

        # Test your logic, not the AI model
        assert result.summary is not None
        assert len(result.summary) > 0
        assert result.tokens_used == 15

        # Verify correct prompt was used
        call_args = mock_ai_client.complete.call_args
        assert "summarize" in call_args[0][0].lower()

    async def test_handles_rate_limiting(self, mock_ai_client):
        mock_ai_client.complete.side_effect = RateLimitError(retry_after=60)
        feature = SummaryFeature(ai_client=mock_ai_client)

        with pytest.raises(UserFacingError) as exc_info:
            await feature.generate_summary("Text")

        assert "high demand" in str(exc_info.value).lower()
        assert exc_info.value.retry_after == 60
```
### Integration Testing with Real Models
Run integration tests against real models in CI/CD, but carefully:
```typescript
describe('AI Feature Integration Tests', () => {
  // Only run these in CI or with explicit flag
  const shouldRunIntegration = process.env.RUN_INTEGRATION_TESTS === 'true';

  (shouldRunIntegration ? it : it.skip)('generates accurate code summaries', async () => {
    const feature = new CodeSummaryFeature(realAIClient);
    const testCode = `
      function calculateTotal(items) {
        return items.reduce((sum, item) => sum + item.price, 0);
      }
    `;

    const summary = await feature.summarize(testCode);

    // Test for expected concepts, not exact wording
    expect(summary.toLowerCase()).toContain('calculate');
    expect(summary.toLowerCase()).toContain('total');
    expect(summary.toLowerCase()).toMatch(/sum|add/);
  }, 30000); // Longer timeout for real API calls
});
```
See [quality-control](/lessons/quality-control) for more on testing strategies.
## Security Considerations
Never trust AI output blindly:
```python
class SecureAIFeature:
    def __init__(self):
        self.output_validator = OutputValidator()
        self.sanitizer = ContentSanitizer()

    async def generate_user_content(self, prompt: str, user_id: str) -> str:
        # Sanitize input
        clean_prompt = self.sanitizer.sanitize_input(prompt)

        # Add safety instructions
        safe_prompt = f"""
        {clean_prompt}

        IMPORTANT: Do not include:
        - Personal information
        - Harmful content
        - Code that could be malicious
        """

        response = await self.ai_client.complete(safe_prompt)

        # Validate output
        if not self.output_validator.is_safe(response.content):
            await self.log_unsafe_output(response, user_id)
            raise UnsafeContentError("Generated content failed safety checks")

        # Sanitize output before returning to user
        return self.sanitizer.sanitize_output(response.content)
```
For detailed security practices, review [security-considerations](/lessons/security-considerations).
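The `OutputValidator` above is assumed; a minimal pattern-based sketch is below. Real systems would layer provider moderation endpoints and schema validation on top; the patterns here are illustrative, not exhaustive:

```python
import re


class OutputValidator:
    """Minimal sketch: flag output containing likely script injection,
    leaked key material, or possible card numbers."""

    BLOCKED_PATTERNS = [
        re.compile(r"<script\b", re.IGNORECASE),           # HTML script injection
        re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),  # leaked key material
        re.compile(r"\b(?:\d[ -]?){13,16}\b"),              # possible card numbers
    ]

    def is_safe(self, content: str) -> bool:
        return not any(p.search(content) for p in self.BLOCKED_PATTERNS)
```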
## Cost Management
### Budget Enforcement
Implement hard limits to prevent runaway costs:
```python
from datetime import datetime


class BudgetEnforcer:
    def __init__(self, daily_budget_usd: float, redis_client):
        self.daily_budget = daily_budget_usd
        self.redis = redis_client

    async def check_budget(self, estimated_cost: float) -> bool:
        today = datetime.now().strftime('%Y-%m-%d')
        key = f"ai_spending:{today}"
        current_spending = float(await self.redis.get(key) or 0)

        if current_spending + estimated_cost > self.daily_budget:
            await self.notify_budget_exceeded(current_spending, estimated_cost)
            return False
        return True

    async def record_spending(self, cost: float):
        today = datetime.now().strftime('%Y-%m-%d')
        key = f"ai_spending:{today}"
        await self.redis.incrbyfloat(key, cost)
        await self.redis.expire(key, 86400 * 7)  # Keep for 7 days
```
## Real-World Example: Document Analysis Pipeline
Here's how these techniques come together:
```python
import time
from datetime import datetime


class DocumentAnalysisPipeline:
    def __init__(self):
        self.orchestrator = AIOrchestrator(
            primary_provider=ModelProvider.OPENAI,
            fallback_provider=ModelProvider.ANTHROPIC
        )
        self.cache = IntelligentCache(redis_client)
        self.metrics = MetricsCollector(analytics_backend)
        self.budget = BudgetEnforcer(daily_budget_usd=100, redis_client=redis_client)
        self.context_builder = SmartContextBuilder(embedding_model)

    async def analyze_document(self, doc: Document, user_id: str) -> Analysis:
        request_id = generate_request_id()
        start_time = time.time()

        try:
            # Check budget
            estimated_cost = self.estimate_cost(doc)
            if not await self.budget.check_budget(estimated_cost):
                raise BudgetExceededError("Daily AI budget exceeded")

            # Build smart context
            context = await self.context_builder.build_context(
                doc.content,
                available_docs=doc.related_documents
            )

            # Create request
            ai_request = AIRequest(
                prompt=self.build_analysis_prompt(doc, context),
                max_tokens=1000,
                temperature=0.3,
                system_message="You are an expert document analyst."
            )

            # Execute with caching
            response = await self.cache.get_or_execute(
                ai_request,
                lambda req: self.orchestrator.execute(req)
            )

            # Record metrics
            await self.metrics.record_request(AIMetrics(
                request_id=request_id,
                feature_name="document_analysis",
                timestamp=datetime.now(),
                latency_ms=int((time.time() - start_time) * 1000),
                tokens_used=response.tokens_used,
                cost_usd=response.tokens_used * 0.00002,  # Example rate
                user_feedback=None,
                retry_count=0,
                was_cached=response.metadata.get('source') == 'cache',
                user_id=user_id,
                provider=response.provider.value,
                model="gpt-4",
                prompt_version=2
            ))

            await self.budget.record_spending(response.tokens_used * 0.00002)
            return Analysis.from_ai_response(response)

        except Exception as e:
            await self.metrics.record_error(str(e))
            raise
```
This example demonstrates orchestration, caching, budgeting, metrics, smart context selection, and error handling working together. This is production-ready AI feature code.
## Key Takeaways
Building AI-powered features requires thinking beyond traditional software patterns. You're managing probabilistic systems with unique characteristics—token budgets, context windows, latency, and costs that scale with usage rather than infrastructure.
The techniques we've covered—orchestration layers, smart context selection, conversation memory, graceful degradation, caching, monitoring, and budget enforcement—aren't optional extras. They're foundational patterns that separate hobby projects from production systems.
Start with solid architecture. Build in resilience from day one. Monitor everything. And remember: AI features should enhance your application, not become a single point of failure. When your AI provider goes down, your app should degrade gracefully, not crash completely.
The developers who master these advanced techniques will build the AI-powered features that users love and that scale reliably to millions of requests. The ones who skip these patterns will spend their time firefighting production incidents and explaining to management why the AI bill exploded.
Which developer will you be?