# Building AI-Powered Features
You've moved beyond using AI assistants to write code—now you're building features that *use* AI themselves. This is where vibe coding gets truly powerful: creating applications that leverage LLMs, embeddings, and other AI capabilities to deliver value to users.
Building AI-powered features requires a different mindset than traditional feature development. You're working with probabilistic systems that can surprise you, managing token budgets instead of just compute costs, and dealing with latency challenges that don't exist in typical CRUD apps. Let's explore the advanced techniques that separate functional AI features from production-ready ones.
## Architecture Patterns for AI Features
### The Orchestration Layer
Don't let your AI logic leak throughout your codebase. Create a dedicated orchestration layer that manages AI interactions:
```python
from typing import Dict, Any
from dataclasses import dataclass
from enum import Enum


class ModelProvider(Enum):
    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    LOCAL = "local"


@dataclass
class AIRequest:
    prompt: str
    max_tokens: int
    temperature: float
    system_message: str | None = None
    context: Dict[str, Any] | None = None


@dataclass
class AIResponse:
    content: str
    tokens_used: int
    provider: ModelProvider
    latency_ms: int
    metadata: Dict[str, Any]


class AIOrchestrator:
    def __init__(self, primary_provider: ModelProvider, fallback_provider: ModelProvider | None = None):
        self.primary = self._init_provider(primary_provider)
        self.fallback = self._init_provider(fallback_provider) if fallback_provider else None
        self.circuit_breaker = CircuitBreaker(failure_threshold=5)

    async def execute(self, request: AIRequest) -> AIResponse:
        """Execute AI request with fallback and circuit breaker logic."""
        try:
            if not self.circuit_breaker.is_open():
                return await self._execute_with_provider(self.primary, request)
        except Exception:
            self.circuit_breaker.record_failure()
            if self.fallback:
                return await self._execute_with_provider(self.fallback, request)
            raise
        # Circuit breaker is open: skip the primary entirely
        if self.fallback:
            return await self._execute_with_provider(self.fallback, request)
        raise CircuitBreakerOpenError("Primary provider circuit breaker is open")
```
This pattern gives you flexibility to swap providers, implement fallbacks, and maintain consistent error handling. When your primary provider goes down at 3 AM, you'll thank yourself for building in that fallback logic.
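The `CircuitBreaker` the orchestrator instantiates is assumed rather than shown. A minimal sketch might look like the following; the `reset_timeout_s` cooldown and the half-open behavior are illustrative choices, not a prescribed implementation:

```python
import time


class CircuitBreaker:
    """Minimal circuit breaker: opens after N consecutive failures,
    then allows a trial request again once a cooldown period passes."""

    def __init__(self, failure_threshold: int = 5, reset_timeout_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self.failure_count = 0
        self.opened_at: float | None = None

    def is_open(self) -> bool:
        if self.opened_at is None:
            return False
        # Half-open: let requests through again after the cooldown expires
        if time.monotonic() - self.opened_at >= self.reset_timeout_s:
            self.opened_at = None
            self.failure_count = 0
            return False
        return True

    def record_failure(self) -> None:
        self.failure_count += 1
        if self.failure_count >= self.failure_threshold:
            self.opened_at = time.monotonic()

    def record_success(self) -> None:
        # Any success resets the breaker
        self.failure_count = 0
        self.opened_at = None
```

Call `record_success()` after each successful provider call so a flaky provider doesn't trip the breaker on scattered, non-consecutive failures.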
### Streaming vs Batch Processing
Choose your processing strategy based on user experience requirements:
```python
import asyncio
from typing import List


class ContentGenerator:
    async def generate_streaming(self, prompt: str, callback):
        """Stream tokens as they're generated - better UX for long content."""
        async for chunk in self.ai_client.stream(prompt):
            await callback(chunk)
            # User sees content appearing in real-time

    async def generate_batch(self, prompts: List[str]) -> List[str]:
        """Process multiple requests efficiently."""
        # Batch similar requests together
        tasks = [self.ai_client.complete(p) for p in prompts]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Handle partial failures gracefully
        return [r if not isinstance(r, Exception) else self._fallback_content()
                for r in results]
```
Use streaming when users are waiting (chat interfaces, content generation). Use batch processing for background tasks (summarizing documents, analyzing data). Don't stream when you need to validate the complete response before showing it to users—see [hallucination-detection](/lessons/hallucination-detection) for why this matters.
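When validation has to see the whole response, one option is to buffer the stream and check it before anything reaches the user: you pay the full latency but can reject bad output wholesale. A sketch, where the `validate` callable stands in for whatever checks you run:

```python
async def generate_validated(stream, validate) -> str:
    """Buffer an async token stream, validate the complete text,
    then return it - nothing is shown until the checks pass."""
    chunks = []
    async for chunk in stream:
        chunks.append(chunk)
    full_text = "".join(chunks)
    if not validate(full_text):
        raise ValueError("Generated content failed validation")
    return full_text
```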
## Prompt Engineering at Scale
### Template Management
Stop hardcoding prompts. Build a system for managing them:
```typescript
interface PromptTemplate {
  id: string;
  version: number;
  template: string;
  variables: string[];
  metadata: {
    purpose: string;
    avgTokens: number;
    successRate?: number;
  };
}

class PromptRegistry {
  private templates: Map<string, PromptTemplate[]> = new Map();

  register(template: PromptTemplate): void {
    const versions = this.templates.get(template.id) || [];
    versions.push(template);
    this.templates.set(template.id, versions);
  }

  get(id: string, version?: number): PromptTemplate {
    const versions = this.templates.get(id);
    if (!versions) throw new Error(`Template ${id} not found`);
    if (version) {
      return versions.find(t => t.version === version)
        || versions[versions.length - 1];
    }
    return versions[versions.length - 1]; // Latest version
  }

  render(id: string, variables: Record<string, string>, version?: number): string {
    const template = this.get(id, version);
    let result = template.template;
    for (const varName of template.variables) {
      if (!(varName in variables)) {
        throw new Error(`Missing variable: ${varName}`);
      }
      result = result.replace(
        new RegExp(`{{${varName}}}`, 'g'),
        variables[varName]
      );
    }
    return result;
  }
}

// Usage
const registry = new PromptRegistry();

registry.register({
  id: 'code-review',
  version: 2,
  template: `Review this {{language}} code for security issues:

{{code}}

Focus on: {{focus_areas}}

Output format: JSON with {issue, severity, line, suggestion}`,
  variables: ['language', 'code', 'focus_areas'],
  metadata: {
    purpose: 'Security-focused code review',
    avgTokens: 850
  }
});

const prompt = registry.render('code-review', {
  language: 'Python',
  code: userCode,
  focus_areas: 'SQL injection, XSS, authentication'
});
```
This lets you version prompts, A/B test variations, and track which prompts perform best. You'll need this when you realize your initial prompt needs refinement after seeing real user data.
### Dynamic Context Selection
Don't dump your entire database into the context window. Select relevant information intelligently:
```python
from typing import List

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class SmartContextBuilder:
    def __init__(self, embedding_model, max_tokens: int = 4000):
        self.embedding_model = embedding_model
        self.max_tokens = max_tokens

    async def build_context(self, query: str, available_docs: List[Document]) -> str:
        """Select most relevant documents that fit in the token budget."""
        # Get embeddings
        query_embedding = await self.embedding_model.embed(query)
        doc_embeddings = await self.embedding_model.embed_batch(
            [doc.content for doc in available_docs]
        )

        # Calculate relevance scores
        similarities = cosine_similarity(
            [query_embedding],
            doc_embeddings
        )[0]

        # Sort by relevance
        scored_docs = sorted(
            zip(available_docs, similarities),
            key=lambda x: x[1],
            reverse=True
        )

        # Select docs that fit in budget
        selected = []
        total_tokens = 0
        for doc, score in scored_docs:
            doc_tokens = self._estimate_tokens(doc.content)
            if total_tokens + doc_tokens > self.max_tokens:
                break
            selected.append(doc)
            total_tokens += doc_tokens

        return "\n\n---\n\n".join(doc.content for doc in selected)

    def _estimate_tokens(self, text: str) -> int:
        # Rough estimation: ~4 chars per token
        return len(text) // 4
```
This approach ensures you're always using the most relevant context without wasting tokens on irrelevant information. Token costs add up quickly—optimizing context selection can cut your API bills significantly.
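To make "cut your API bills significantly" concrete, here is a back-of-envelope cost model. The per-1K-token prices are placeholders; substitute your provider's actual rates:

```python
def estimate_monthly_cost(
    requests_per_day: int,
    avg_input_tokens: int,
    avg_output_tokens: int,
    input_price_per_1k: float,
    output_price_per_1k: float,
) -> float:
    """Rough monthly spend: (input + output cost per request) * volume * 30 days."""
    per_request = (avg_input_tokens / 1000) * input_price_per_1k \
                + (avg_output_tokens / 1000) * output_price_per_1k
    return per_request * requests_per_day * 30


# At 10k requests/day with hypothetical $0.01/$0.03 per 1K token pricing,
# trimming average context from 3,000 to 1,500 input tokens cuts the bill by a third:
full = estimate_monthly_cost(10_000, 3_000, 500, 0.01, 0.03)     # 13,500.0 USD/month
trimmed = estimate_monthly_cost(10_000, 1_500, 500, 0.01, 0.03)  # 9,000.0 USD/month
```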
## Managing State and Memory
### Conversation History Management
For chat interfaces, you need smart conversation memory:
```typescript
interface Message {
  role: 'user' | 'assistant' | 'system';
  content: string;
  tokens: number;
  timestamp: Date;
}

class ConversationMemory {
  private messages: Message[] = [];
  private maxTokens: number = 8000;
  private summaryThreshold: number = 6000;

  async addMessage(role: Message['role'], content: string): Promise<void> {
    const tokens = this.estimateTokens(content);
    this.messages.push({ role, content, tokens, timestamp: new Date() });

    // Summarize old messages if we're approaching token limit
    if (this.getTotalTokens() > this.summaryThreshold) {
      await this.compressHistory();
    }
  }

  private async compressHistory(): Promise<void> {
    // Keep recent messages, summarize older ones
    const recentMessages = this.messages.slice(-10);
    const oldMessages = this.messages.slice(0, -10);

    if (oldMessages.length === 0) return;

    const summary = await this.summarizeMessages(oldMessages);

    this.messages = [
      {
        role: 'system',
        content: `Previous conversation summary: ${summary}`,
        tokens: this.estimateTokens(summary),
        timestamp: new Date()
      },
      ...recentMessages
    ];
  }

  getContext(): Message[] {
    return this.messages;
  }

  getTotalTokens(): number {
    return this.messages.reduce((sum, msg) => sum + msg.tokens, 0);
  }

  private estimateTokens(text: string): number {
    return Math.ceil(text.length / 4);
  }
}
```
This prevents conversations from exceeding token limits while preserving important context. Users can have lengthy conversations without hitting errors or losing context.
## Error Handling and Resilience
### Graceful Degradation
AI features should fail gracefully. Never show users raw error messages from AI providers:
```python
from enum import Enum


class FallbackStrategy(Enum):
    CACHED_RESPONSE = "cached"
    SIMPLIFIED_PROMPT = "simplified"
    TRADITIONAL_ALGORITHM = "traditional"
    USER_NOTIFICATION = "notify"


class ResilientAIFeature:
    def __init__(self, orchestrator: AIOrchestrator):
        self.orchestrator = orchestrator
        self.cache = ResponseCache()
        self.metrics = MetricsCollector()

    async def execute_with_fallbacks(
        self,
        request: AIRequest,
        fallback_strategy: FallbackStrategy
    ) -> AIResponse:
        try:
            response = await self.orchestrator.execute(request)
            await self.cache.store(request, response)
            return response

        except RateLimitError as e:
            self.metrics.record_error("rate_limit")
            if fallback_strategy == FallbackStrategy.CACHED_RESPONSE:
                cached = await self.cache.get_similar(request)
                if cached:
                    return cached.with_metadata({"source": "cache", "reason": "rate_limit"})
            raise UserFacingError(
                "Our AI service is experiencing high demand. Please try again in a moment.",
                retry_after=e.retry_after
            )

        except ContextLengthError:
            self.metrics.record_error("context_length")
            if fallback_strategy == FallbackStrategy.SIMPLIFIED_PROMPT:
                simplified = self.simplify_request(request)
                return await self.execute_with_fallbacks(
                    simplified,
                    FallbackStrategy.USER_NOTIFICATION
                )
            raise UserFacingError(
                "Your request is too complex. Try breaking it into smaller parts."
            )

        except ModelUnavailableError:
            self.metrics.record_error("model_unavailable")
            if fallback_strategy == FallbackStrategy.TRADITIONAL_ALGORITHM:
                return await self.traditional_implementation(request)
            raise UserFacingError(
                "AI features are temporarily unavailable. Some functionality may be limited."
            )
```
Always have a plan B. Your AI provider *will* have outages. Your context windows *will* be exceeded. Build for these scenarios from day one, not after they happen in production. Check out [when-not-to-use-ai](/lessons/when-not-to-use-ai) for guidance on when traditional approaches are better.
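For transient failures (timeouts, brief rate-limit blips) a retry with exponential backoff complements the fallback strategies above. A sketch; the delays and retry counts are illustrative:

```python
import asyncio
import random


async def with_backoff(fn, max_retries: int = 3, base_delay_s: float = 1.0):
    """Retry an async callable with exponential backoff plus jitter.
    Suited to transient errors, not sustained outages - those are the
    circuit breaker's job."""
    for attempt in range(max_retries + 1):
        try:
            return await fn()
        except Exception:
            if attempt == max_retries:
                raise  # Out of retries: surface the error to fallback handling
            delay = base_delay_s * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```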
## Performance Optimization
### Request Batching and Caching
Reduce costs and latency with smart caching:
```python
import hashlib
import json


class IntelligentCache:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl_seconds = 3600  # 1 hour default

    def cache_key(self, request: AIRequest) -> str:
        """Generate deterministic cache key."""
        key_data = {
            'prompt': request.prompt,
            'max_tokens': request.max_tokens,
            'temperature': request.temperature,
            'system_message': request.system_message
        }
        serialized = json.dumps(key_data, sort_keys=True)
        return f"ai_cache:{hashlib.sha256(serialized.encode()).hexdigest()}"

    async def get_or_execute(
        self,
        request: AIRequest,
        executor
    ) -> AIResponse:
        # Only cache deterministic requests (low temperature)
        if request.temperature > 0.3:
            return await executor(request)

        cache_key = self.cache_key(request)

        # Check cache
        cached = await self.redis.get(cache_key)
        if cached:
            return AIResponse.from_json(cached)

        # Execute and cache
        response = await executor(request)
        await self.redis.setex(
            cache_key,
            self.ttl_seconds,
            response.to_json()
        )
        return response
```
### Parallel Processing
Process independent requests concurrently:
```typescript
class ParallelAIProcessor {
  async processDocuments(documents: Document[]): Promise<ProcessedDocument[]> {
    // Split into chunks to avoid overwhelming the API
    const chunks = this.chunkArray(documents, 10);
    const results: ProcessedDocument[] = [];

    for (const chunk of chunks) {
      const chunkResults = await Promise.all(
        chunk.map(doc => this.processDocument(doc))
      );
      results.push(...chunkResults);

      // Rate limiting - pause between chunks
      await this.sleep(1000);
    }
    return results;
  }

  private async processDocument(doc: Document): Promise<ProcessedDocument> {
    const [summary, tags, sentiment] = await Promise.all([
      this.generateSummary(doc),
      this.extractTags(doc),
      this.analyzeSentiment(doc)
    ]);
    return { doc, summary, tags, sentiment };
  }

  private chunkArray<T>(array: T[], size: number): T[][] {
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
      chunks.push(array.slice(i, i + size));
    }
    return chunks;
  }

  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
```
## Monitoring and Observability
### Comprehensive Metrics
Track what matters for AI features:
```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Any, Optional


@dataclass
class AIMetrics:
    request_id: str
    feature_name: str
    timestamp: datetime
    # Performance
    latency_ms: int
    tokens_used: int
    cost_usd: float
    # Quality
    user_feedback: Optional[int]  # 1-5 rating
    retry_count: int
    was_cached: bool
    # Context
    user_id: str
    provider: str
    model: str
    prompt_version: int


class MetricsCollector:
    def __init__(self, analytics_backend):
        self.backend = analytics_backend

    async def record_request(self, metrics: AIMetrics):
        """Record detailed metrics for each AI request."""
        await self.backend.write({
            **metrics.__dict__,
            # Add derived metrics
            'tokens_per_second': metrics.tokens_used / (metrics.latency_ms / 1000),
            'cost_per_token': metrics.cost_usd / metrics.tokens_used if metrics.tokens_used > 0 else 0,
        })

    async def get_feature_health(self, feature_name: str, hours: int = 24) -> Dict[str, Any]:
        """Aggregate health metrics for a feature."""
        metrics = await self.backend.query(
            feature=feature_name,
            since=datetime.now() - timedelta(hours=hours)
        )
        if not metrics:
            return {'total_requests': 0}
        rated = [m.user_feedback for m in metrics if m.user_feedback]
        return {
            'total_requests': len(metrics),
            'avg_latency_ms': sum(m.latency_ms for m in metrics) / len(metrics),
            'total_cost_usd': sum(m.cost_usd for m in metrics),
            'cache_hit_rate': sum(1 for m in metrics if m.was_cached) / len(metrics),
            'avg_user_rating': sum(rated) / len(rated) if rated else None,
            'error_rate': sum(m.retry_count > 0 for m in metrics) / len(metrics)
        }
```
These metrics help you understand both technical performance and business impact. You'll spot issues before users complain and can quantify the value your AI features provide.
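One way to act on the `get_feature_health` output is a simple threshold check that feeds your alerting. A sketch; the threshold defaults are illustrative:

```python
def check_feature_health(health: dict, thresholds: dict) -> list[str]:
    """Compare aggregated health metrics against alert thresholds
    and return a list of alert messages (empty means healthy)."""
    alerts = []
    if health.get("error_rate", 0) > thresholds.get("max_error_rate", 0.05):
        alerts.append(f"error rate {health['error_rate']:.1%} above threshold")
    if health.get("avg_latency_ms", 0) > thresholds.get("max_latency_ms", 2000):
        alerts.append(f"avg latency {health['avg_latency_ms']}ms above threshold")
    if health.get("total_cost_usd", 0) > thresholds.get("max_daily_cost_usd", 50):
        alerts.append(f"cost ${health['total_cost_usd']:.2f} above daily budget")
    return alerts
```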
## Testing AI Features
### Deterministic Testing
AI features are probabilistic, but your tests shouldn't be:
```python
import pytest
from unittest.mock import AsyncMock


class TestAIFeature:
    @pytest.fixture
    def mock_ai_client(self):
        """Use mocked responses for deterministic tests."""
        client = AsyncMock()  # AsyncMock so the awaited complete() call works
        client.complete.return_value = AIResponse(
            content="This is a test summary of the document.",
            tokens_used=15,
            provider=ModelProvider.OPENAI,
            latency_ms=234,
            metadata={}
        )
        return client

    async def test_summary_generation(self, mock_ai_client):
        feature = SummaryFeature(ai_client=mock_ai_client)
        result = await feature.generate_summary("Long document text...")

        # Test your logic, not the AI model
        assert result.summary is not None
        assert len(result.summary) > 0
        assert result.tokens_used == 15

        # Verify correct prompt was used
        call_args = mock_ai_client.complete.call_args
        assert "summarize" in call_args[0][0].lower()

    async def test_handles_rate_limiting(self, mock_ai_client):
        mock_ai_client.complete.side_effect = RateLimitError(retry_after=60)
        feature = SummaryFeature(ai_client=mock_ai_client)

        with pytest.raises(UserFacingError) as exc_info:
            await feature.generate_summary("Text")

        assert "high demand" in str(exc_info.value).lower()
        assert exc_info.value.retry_after == 60
```
### Integration Testing with Real Models
Run integration tests against real models in CI/CD, but carefully:
```typescript
describe('AI Feature Integration Tests', () => {
  // Only run these in CI or with explicit flag
  const shouldRunIntegration = process.env.RUN_INTEGRATION_TESTS === 'true';

  (shouldRunIntegration ? it : it.skip)('generates accurate code summaries', async () => {
    const feature = new CodeSummaryFeature(realAIClient);
    const testCode = `
      function calculateTotal(items) {
        return items.reduce((sum, item) => sum + item.price, 0);
      }
    `;

    const summary = await feature.summarize(testCode);

    // Test for expected concepts, not exact wording
    expect(summary.toLowerCase()).toContain('calculate');
    expect(summary.toLowerCase()).toContain('total');
    expect(summary.toLowerCase()).toMatch(/sum|add/);
  }, 30000); // Longer timeout for real API calls
});
```
See [quality-control](/lessons/quality-control) for more on testing strategies.
## Security Considerations
Never trust AI output blindly:
```python
class SecureAIFeature:
    def __init__(self):
        self.output_validator = OutputValidator()
        self.sanitizer = ContentSanitizer()

    async def generate_user_content(self, prompt: str, user_id: str) -> str:
        # Sanitize input
        clean_prompt = self.sanitizer.sanitize_input(prompt)

        # Add safety instructions
        safe_prompt = f"""
        {clean_prompt}

        IMPORTANT: Do not include:
        - Personal information
        - Harmful content
        - Code that could be malicious
        """

        response = await self.ai_client.complete(safe_prompt)

        # Validate output
        if not self.output_validator.is_safe(response.content):
            await self.log_unsafe_output(response, user_id)
            raise UnsafeContentError("Generated content failed safety checks")

        # Sanitize output before returning to user
        return self.sanitizer.sanitize_output(response.content)
```
For detailed security practices, review [security-considerations](/lessons/security-considerations).
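The `OutputValidator` above is assumed; a minimal pattern-based sketch is below. Real systems would layer provider moderation endpoints and schema validation on top; the patterns here are illustrative, not exhaustive:

```python
import re


class OutputValidator:
    """Minimal sketch: flag output containing likely script injection,
    leaked key material, or possible card numbers."""

    BLOCKED_PATTERNS = [
        re.compile(r"<script\b", re.IGNORECASE),           # HTML script injection
        re.compile(r"-----BEGIN [A-Z ]*PRIVATE KEY-----"),  # leaked key material
        re.compile(r"\b(?:\d[ -]?){13,16}\b"),              # possible card numbers
    ]

    def is_safe(self, content: str) -> bool:
        return not any(p.search(content) for p in self.BLOCKED_PATTERNS)
```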
## Cost Management
### Budget Enforcement
Implement hard limits to prevent runaway costs:
```python
from datetime import datetime


class BudgetEnforcer:
    def __init__(self, daily_budget_usd: float, redis_client):
        self.daily_budget = daily_budget_usd
        self.redis = redis_client

    async def check_budget(self, estimated_cost: float) -> bool:
        today = datetime.now().strftime('%Y-%m-%d')
        key = f"ai_spending:{today}"
        current_spending = float(await self.redis.get(key) or 0)

        if current_spending + estimated_cost > self.daily_budget:
            await self.notify_budget_exceeded(current_spending, estimated_cost)
            return False
        return True

    async def record_spending(self, cost: float):
        today = datetime.now().strftime('%Y-%m-%d')
        key = f"ai_spending:{today}"
        await self.redis.incrbyfloat(key, cost)
        await self.redis.expire(key, 86400 * 7)  # Keep for 7 days
```
## Real-World Example: Document Analysis Pipeline
Here's how these techniques come together:
```python
import time
from datetime import datetime


class DocumentAnalysisPipeline:
    def __init__(self):
        self.orchestrator = AIOrchestrator(
            primary_provider=ModelProvider.OPENAI,
            fallback_provider=ModelProvider.ANTHROPIC
        )
        self.cache = IntelligentCache(redis_client)
        self.metrics = MetricsCollector(analytics_backend)
        self.budget = BudgetEnforcer(daily_budget_usd=100, redis_client=redis_client)
        self.context_builder = SmartContextBuilder(embedding_model)

    async def analyze_document(self, doc: Document, user_id: str) -> Analysis:
        request_id = generate_request_id()
        start_time = time.time()

        try:
            # Check budget
            estimated_cost = self.estimate_cost(doc)
            if not await self.budget.check_budget(estimated_cost):
                raise BudgetExceededError("Daily AI budget exceeded")

            # Build smart context
            context = await self.context_builder.build_context(
                doc.content,
                available_docs=doc.related_documents
            )

            # Create request
            ai_request = AIRequest(
                prompt=self.build_analysis_prompt(doc, context),
                max_tokens=1000,
                temperature=0.3,
                system_message="You are an expert document analyst."
            )

            # Execute with caching
            response = await self.cache.get_or_execute(
                ai_request,
                lambda req: self.orchestrator.execute(req)
            )

            # Record metrics
            await self.metrics.record_request(AIMetrics(
                request_id=request_id,
                feature_name="document_analysis",
                timestamp=datetime.now(),
                latency_ms=int((time.time() - start_time) * 1000),
                tokens_used=response.tokens_used,
                cost_usd=response.tokens_used * 0.00002,  # Example rate
                user_feedback=None,
                retry_count=0,
                was_cached=response.metadata.get('source') == 'cache',
                user_id=user_id,
                provider=response.provider.value,
                model="gpt-4",
                prompt_version=2
            ))

            await self.budget.record_spending(response.tokens_used * 0.00002)
            return Analysis.from_ai_response(response)

        except Exception as e:
            await self.metrics.record_error(str(e))
            raise
```
This example demonstrates orchestration, caching, budgeting, metrics, smart context selection, and error handling working together. This is production-ready AI feature code.
## Key Takeaways
Building AI-powered features requires thinking beyond traditional software patterns. You're managing probabilistic systems with unique characteristics—token budgets, context windows, latency, and costs that scale with usage rather than infrastructure.
The techniques we've covered—orchestration layers, smart context selection, conversation memory, graceful degradation, caching, monitoring, and budget enforcement—aren't optional extras. They're foundational patterns that separate hobby projects from production systems.
Start with solid architecture. Build in resilience from day one. Monitor everything. And remember: AI features should enhance your application, not become a single point of failure. When your AI provider goes down, your app should degrade gracefully, not crash completely.
The developers who master these advanced techniques will build the AI-powered features that users love and that scale reliably to millions of requests. The ones who skip these patterns will spend their time firefighting production incidents and explaining to management why the AI bill exploded.
Which developer will you be?