LLM Integration Plan¶
Overview¶
This document outlines the integration of multiple LLM providers into the content generation system, focusing on cost optimization, flexibility, and performance. The plan prioritizes direct API integration for the MVP, with a clear upgrade path to more sophisticated routing, caching, and optimization strategies.
LLM Provider Comparison¶
Cost-Effective Options (2025 Pricing)¶
| Provider/Model | Input Cost ($/1M tokens) | Output Cost ($/1M tokens) | Latency | Best Use Case |
|---|---|---|---|---|
| OpenAI GPT-4o-mini | $0.15 | $0.60 | 0.5-1s | Default for all content types, best balance |
| Groq Llama 3.1 70B | $0.05-0.10 | $0.20-0.30 | 0.2-0.5s | Bulk social posts, fastest option |
| Google Gemini 2.0 Flash | $0.075 | $0.30 | 0.3-0.8s | Local content with geo-data |
| DeepSeek V3 | $0.10 | $0.40 | 0.6-1.2s | Educational/science content |
| Anthropic Claude 3.5 Sonnet | $3.00 | $15.00 | 1-2s | High-quality final reviews |
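As a back-of-envelope check, per-request cost is just the input and output token counts multiplied by the per-million rates in the table. A minimal sketch, where the token counts are assumptions for a typical blog-length generation:
# Rough per-request cost from the table above (rates are $ per 1M tokens)
def estimate_request_cost(input_tokens: int, output_tokens: int, input_rate: float, output_rate: float) -> float:
    return (input_tokens * input_rate + output_tokens * output_rate) / 1_000_000

# Example: assumed 800 prompt tokens in, 1,500 tokens out on GPT-4o-mini
print(estimate_request_cost(800, 1500, 0.15, 0.60))  # ~$0.001 per request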
Architecture Design¶
LLM Client Abstraction Layer¶
# llm_client.py - Enhanced version with all providers
import os
import asyncio
from typing import Optional, Dict, List, Union
from abc import ABC, abstractmethod
from dataclasses import dataclass
import logging
# Provider SDKs
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai
from groq import AsyncGroq
# Setup logging
logger = logging.getLogger(__name__)
@dataclass
class LLMResponse:
content: str
model: str
provider: str
usage: Dict[str, int]
cost: float
latency_ms: int
class BaseLLMProvider(ABC):
"""Abstract base class for LLM providers"""
@abstractmethod
async def generate(self, prompt: str, model: str, **kwargs) -> LLMResponse:
pass
@abstractmethod
def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
pass
class OpenAIProvider(BaseLLMProvider):
def __init__(self):
self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.pricing = {
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"gpt-4o": {"input": 0.005, "output": 0.015},
"gpt-4-turbo": {"input": 0.01, "output": 0.03}
}
async def generate(self, prompt: str, model: str = "gpt-4o-mini", **kwargs) -> LLMResponse:
import time
start_time = time.time()
response = await self.client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=kwargs.get("max_tokens", 4000),
temperature=kwargs.get("temperature", 0.7)
)
latency_ms = int((time.time() - start_time) * 1000)
usage = response.usage
cost = self.calculate_cost(usage.prompt_tokens, usage.completion_tokens, model)
return LLMResponse(
content=response.choices[0].message.content,
model=model,
provider="openai",
usage={"input": usage.prompt_tokens, "output": usage.completion_tokens},
cost=cost,
latency_ms=latency_ms
)
def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
prices = self.pricing.get(model, self.pricing["gpt-4o-mini"])
input_cost = (input_tokens / 1000) * prices["input"]
output_cost = (output_tokens / 1000) * prices["output"]
return round(input_cost + output_cost, 6)
class AnthropicProvider(BaseLLMProvider):
def __init__(self):
self.client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
self.pricing = {
"claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
"claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125}
}
async def generate(self, prompt: str, model: str = "claude-3-5-sonnet-20241022", **kwargs) -> LLMResponse:
import time
start_time = time.time()
response = await self.client.messages.create(
model=model,
max_tokens=kwargs.get("max_tokens", 4000),
messages=[{"role": "user", "content": prompt}]
)
latency_ms = int((time.time() - start_time) * 1000)
# Extract usage from response
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
cost = self.calculate_cost(input_tokens, output_tokens, model)
return LLMResponse(
content=response.content[0].text,
model=model,
provider="anthropic",
usage={"input": input_tokens, "output": output_tokens},
cost=cost,
latency_ms=latency_ms
)
def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
prices = self.pricing.get(model, self.pricing["claude-3-5-sonnet-20241022"])
input_cost = (input_tokens / 1000) * prices["input"]
output_cost = (output_tokens / 1000) * prices["output"]
return round(input_cost + output_cost, 6)
class GroqProvider(BaseLLMProvider):
def __init__(self):
self.client = AsyncGroq(api_key=os.getenv("GROQ_API_KEY"))
self.pricing = {
"llama-3.1-70b-versatile": {"input": 0.00005, "output": 0.00008},
"llama-3.1-8b-instant": {"input": 0.00005, "output": 0.00008},
"mixtral-8x7b-32768": {"input": 0.00005, "output": 0.00008}
}
async def generate(self, prompt: str, model: str = "llama-3.1-70b-versatile", **kwargs) -> LLMResponse:
import time
start_time = time.time()
response = await self.client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=kwargs.get("max_tokens", 4000),
temperature=kwargs.get("temperature", 0.7)
)
latency_ms = int((time.time() - start_time) * 1000)
usage = response.usage
cost = self.calculate_cost(usage.prompt_tokens, usage.completion_tokens, model)
return LLMResponse(
content=response.choices[0].message.content,
model=model,
provider="groq",
usage={"input": usage.prompt_tokens, "output": usage.completion_tokens},
cost=cost,
latency_ms=latency_ms
)
def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
prices = self.pricing.get(model, {"input": 0.00005, "output": 0.00008})
input_cost = (input_tokens / 1000) * prices["input"]
output_cost = (output_tokens / 1000) * prices["output"]
return round(input_cost + output_cost, 6)
class GoogleProvider(BaseLLMProvider):
def __init__(self):
genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
self.pricing = {
"gemini-2.0-flash-exp": {"input": 0.000075, "output": 0.0003},
"gemini-1.5-pro": {"input": 0.00125, "output": 0.005},
"gemini-1.5-flash": {"input": 0.000075, "output": 0.0003}
}
async def generate(self, prompt: str, model: str = "gemini-2.0-flash-exp", **kwargs) -> LLMResponse:
import time
start_time = time.time()
model_instance = genai.GenerativeModel(model)
response = await model_instance.generate_content_async(
prompt,
generation_config=genai.GenerationConfig(
max_output_tokens=kwargs.get("max_tokens", 4000),
temperature=kwargs.get("temperature", 0.7)
)
)
latency_ms = int((time.time() - start_time) * 1000)
        # Use exact token counts from usage_metadata when the SDK returns them;
        # otherwise fall back to a rough word-count estimate
        usage_meta = getattr(response, "usage_metadata", None)
        input_tokens = int(usage_meta.prompt_token_count) if usage_meta else int(len(prompt.split()) * 1.3)
        output_tokens = int(usage_meta.candidates_token_count) if usage_meta else int(len(response.text.split()) * 1.3)
        cost = self.calculate_cost(input_tokens, output_tokens, model)
return LLMResponse(
content=response.text,
model=model,
provider="google",
usage={"input": int(input_tokens), "output": int(output_tokens)},
cost=cost,
latency_ms=latency_ms
)
def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
prices = self.pricing.get(model, self.pricing["gemini-2.0-flash-exp"])
input_cost = (input_tokens / 1000) * prices["input"]
output_cost = (output_tokens / 1000) * prices["output"]
return round(input_cost + output_cost, 6)
class LLMClient:
"""Main client for managing multiple LLM providers"""
def __init__(self):
self.providers = {
"openai": OpenAIProvider(),
"anthropic": AnthropicProvider(),
"groq": GroqProvider(),
"google": GoogleProvider()
}
self.default_models = {
"openai": "gpt-4o-mini",
"anthropic": "claude-3-5-sonnet-20241022",
"groq": "llama-3.1-70b-versatile",
"google": "gemini-2.0-flash-exp"
}
async def generate(
self,
prompt: str,
provider: str = "openai",
model: Optional[str] = None,
**kwargs
) -> LLMResponse:
"""Generate content using specified provider and model"""
if provider not in self.providers:
raise ValueError(f"Unknown provider: {provider}")
model = model or self.default_models[provider]
try:
response = await self.providers[provider].generate(prompt, model, **kwargs)
# Log usage for monitoring
logger.info(f"LLM Generation - Provider: {provider}, Model: {model}, "
f"Cost: ${response.cost:.4f}, Latency: {response.latency_ms}ms")
return response
except Exception as e:
logger.error(f"LLM generation failed - Provider: {provider}, Error: {str(e)}")
raise
async def generate_with_fallback(
self,
prompt: str,
        providers: Optional[List[str]] = None,
        **kwargs
    ) -> LLMResponse:
        """Try multiple providers in order until one succeeds"""
        providers = providers or ["openai", "groq", "google"]
        last_error = None
for provider in providers:
try:
return await self.generate(prompt, provider, **kwargs)
except Exception as e:
last_error = e
logger.warning(f"Provider {provider} failed, trying next")
continue
        raise RuntimeError(f"All providers failed. Last error: {last_error}") from last_error
def select_provider_for_task(self, content_type: str, priority: str = "balanced") -> tuple[str, str]:
"""Select best provider/model based on task and priority"""
if priority == "cost":
# Cheapest options
if content_type == "social":
return ("groq", "llama-3.1-70b-versatile")
else:
return ("openai", "gpt-4o-mini")
elif priority == "quality":
# Highest quality
if content_type == "blog":
return ("anthropic", "claude-3-5-sonnet-20241022")
else:
return ("openai", "gpt-4o")
elif priority == "speed":
# Fastest response
return ("groq", "llama-3.1-70b-versatile")
else: # balanced
# Good balance of cost/quality/speed
if content_type == "local":
return ("google", "gemini-2.0-flash-exp")
else:
return ("openai", "gpt-4o-mini")
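A minimal usage sketch of the client above (the prompt text and the wrapper function name are illustrative):
# Example: pick a provider for a social post, then generate
async def example():
    client = LLMClient()
    provider, model = client.select_provider_for_task("social", priority="cost")
    response = await client.generate(
        "Write a two-sentence post about our spring sale.",
        provider=provider,
        model=model,
        max_tokens=200
    )
    print(response.content, response.cost, response.latency_ms)

# asyncio.run(example())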
Integration with CPM¶
# Updated generate_content function in CPM
async def generate_content(job_id: str, params: Dict):
try:
# Update job status
supabase.table("jobs").update({
"status": "in_progress",
"updated_at": datetime.utcnow().isoformat()
}).eq("id", job_id).execute()
# Get prompt from IM
prompt_data = await get_prompt_from_im(params)
# Initialize LLM client
llm_client = LLMClient()
# Select provider based on content type and priority
priority = params.get("priority", "balanced")
provider, model = llm_client.select_provider_for_task(
params["content_type"],
priority
)
# Override with user selection if provided
if params.get("llm_provider"):
provider = params["llm_provider"]
model = params.get("model")
# Generate content
response = await llm_client.generate(
prompt=prompt_data["prompt"],
provider=provider,
model=model,
temperature=0.7 if params["content_type"] == "blog" else 0.8
)
# Process result
result = {
"content": response.content,
"title": extract_title(response.content, params["content_type"]),
"metadata": {
"word_count": len(response.content.split()),
"keywords_used": params["keywords"],
"llm_provider": response.provider,
"model": response.model,
"generation_cost": response.cost,
"generation_time_ms": response.latency_ms,
"token_usage": response.usage
}
}
# Store in Supabase
supabase.table("jobs").update({
"status": "completed",
"result": result,
"updated_at": datetime.utcnow().isoformat()
}).eq("id", job_id).execute()
# Track costs
        await track_llm_usage(response, job_id)
except Exception as e:
logger.error(f"Content generation failed: {str(e)}")
supabase.table("jobs").update({
"status": "failed",
"error": str(e),
"updated_at": datetime.utcnow().isoformat()
}).eq("id", job_id).execute()
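get_prompt_from_im and extract_title are assumed to exist elsewhere in the CPM. A minimal sketch of what extract_title might look like (the heuristics here are assumptions, not the actual implementation):
def extract_title(content: str, content_type: str) -> str:
    """Pull a title from generated content: first non-empty line,
    markdown heading markers stripped, truncated for social posts."""
    for line in content.splitlines():
        line = line.strip()
        if not line:
            continue
        title = line.lstrip("# ").strip()
        return title[:80] if content_type == "social" else title
    return "Untitled"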
Cost Tracking and Monitoring¶
Database Schema for Usage Tracking¶
CREATE TABLE llm_usage (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
job_id UUID REFERENCES jobs(id),
provider TEXT NOT NULL,
model TEXT NOT NULL,
input_tokens INTEGER NOT NULL,
output_tokens INTEGER NOT NULL,
cost DECIMAL(10, 6) NOT NULL,
latency_ms INTEGER NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Indexes for analytics
CREATE INDEX idx_usage_provider ON llm_usage(provider);
CREATE INDEX idx_usage_created ON llm_usage(created_at);
CREATE INDEX idx_usage_job ON llm_usage(job_id);
-- Daily usage summary view
CREATE VIEW daily_llm_costs AS
SELECT
DATE(created_at) as date,
provider,
model,
COUNT(*) as request_count,
SUM(input_tokens) as total_input_tokens,
SUM(output_tokens) as total_output_tokens,
SUM(cost) as total_cost,
AVG(latency_ms) as avg_latency_ms
FROM llm_usage
GROUP BY DATE(created_at), provider, model;
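Example query against the view (assumes only the schema defined above):
-- Cost and request volume by provider for the last two days
SELECT date, provider, SUM(total_cost) AS cost, SUM(request_count) AS requests
FROM daily_llm_costs
WHERE date >= CURRENT_DATE - 1
GROUP BY date, provider
ORDER BY date DESC, cost DESC;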
Usage Tracking Function¶
async def track_llm_usage(response: LLMResponse, job_id: Optional[str] = None):
"""Track LLM usage for cost monitoring"""
usage_data = {
"job_id": job_id,
"provider": response.provider,
"model": response.model,
"input_tokens": response.usage["input"],
"output_tokens": response.usage["output"],
"cost": response.cost,
"latency_ms": response.latency_ms,
"created_at": datetime.utcnow().isoformat()
}
supabase.table("llm_usage").insert(usage_data).execute()
Environment Variables¶
# OpenAI
OPENAI_API_KEY=sk-...
# Anthropic
ANTHROPIC_API_KEY=sk-ant-...
# Google
GOOGLE_API_KEY=AIza...
# Groq
GROQ_API_KEY=gsk_...
# DeepSeek (if added)
DEEPSEEK_API_KEY=...
# Cost limits (optional)
DAILY_COST_LIMIT=10.00
MONTHLY_COST_LIMIT=200.00
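A small startup check can fail fast when keys are missing. A sketch in a hypothetical config.py, assuming python-dotenv is available; the variable list mirrors the providers above:
# config.py - fail fast if a configured provider has no API key
import os
from dotenv import load_dotenv

load_dotenv()

REQUIRED_KEYS = ["OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GOOGLE_API_KEY", "GROQ_API_KEY"]
missing = [k for k in REQUIRED_KEYS if not os.getenv(k)]
if missing:
    raise RuntimeError(f"Missing LLM API keys: {', '.join(missing)}")

DAILY_COST_LIMIT = float(os.getenv("DAILY_COST_LIMIT", "10.00"))
MONTHLY_COST_LIMIT = float(os.getenv("MONTHLY_COST_LIMIT", "200.00"))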
Testing Strategy¶
Unit Tests for LLM Client¶
# test_llm_client.py
# Note: provider clients read API keys in __init__, so set dummy keys
# (e.g. OPENAI_API_KEY=test) in the test environment before constructing LLMClient.
import pytest
from unittest.mock import AsyncMock, patch
from llm_client import LLMClient, LLMResponse
@pytest.mark.asyncio
async def test_openai_generation():
client = LLMClient()
with patch.object(client.providers["openai"].client.chat.completions, 'create') as mock_create:
mock_create.return_value = AsyncMock(
choices=[AsyncMock(message=AsyncMock(content="Test response"))],
usage=AsyncMock(prompt_tokens=10, completion_tokens=20)
)
response = await client.generate("Test prompt", provider="openai")
assert response.content == "Test response"
assert response.provider == "openai"
assert response.usage["input"] == 10
assert response.usage["output"] == 20
@pytest.mark.asyncio
async def test_provider_selection():
client = LLMClient()
# Test cost priority
provider, model = client.select_provider_for_task("social", "cost")
assert provider == "groq"
# Test quality priority
provider, model = client.select_provider_for_task("blog", "quality")
assert provider == "anthropic"
# Test speed priority
provider, model = client.select_provider_for_task("any", "speed")
assert provider == "groq"
@pytest.mark.asyncio
async def test_fallback_mechanism():
client = LLMClient()
# Mock first provider to fail
with patch.object(client.providers["openai"], 'generate', side_effect=Exception("API Error")):
with patch.object(client.providers["groq"], 'generate') as mock_groq:
mock_groq.return_value = LLMResponse(
content="Fallback response",
model="llama-3.1-70b",
provider="groq",
usage={"input": 10, "output": 20},
cost=0.001,
latency_ms=300
)
response = await client.generate_with_fallback("Test prompt")
assert response.provider == "groq"
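The cost math can also be covered directly, since calculate_cost is a pure function. A sketch of one more test, using values from the pricing dict above (the dummy API key is only needed because the client is constructed in __init__):
from llm_client import OpenAIProvider

def test_openai_cost_calculation(monkeypatch):
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")
    provider = OpenAIProvider()
    # 1,000 input + 1,000 output tokens on gpt-4o-mini: $0.00015 + $0.0006 = $0.00075
    assert provider.calculate_cost(1000, 1000, "gpt-4o-mini") == pytest.approx(0.00075)
    # Unknown models fall back to gpt-4o-mini pricing
    assert provider.calculate_cost(1000, 1000, "unknown-model") == pytest.approx(0.00075)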
Cost Optimization Strategies¶
1. Smart Routing Based on Content Type¶
class SmartRouter:
def __init__(self, llm_client: LLMClient, supabase_client):
self.llm_client = llm_client
self.supabase = supabase_client
async def route_request(self, params: Dict) -> tuple[str, str]:
"""Route to optimal provider based on content and current usage"""
# Check daily spend
daily_spend = await self.get_daily_spend()
if daily_spend > float(os.getenv("DAILY_COST_LIMIT", "10.00")):
# Switch to cheapest provider
return ("groq", "llama-3.1-70b-versatile")
# Content-based routing
if params["content_type"] == "social":
# Short content, use fast/cheap
return ("groq", "llama-3.1-70b-versatile")
elif params["content_type"] == "blog":
if params.get("client_id") == "PASCO":
# Science content needs accuracy
return ("openai", "gpt-4o-mini")
else:
# General blog content
return ("openai", "gpt-4o-mini")
elif params["content_type"] == "local":
# Local content benefits from Google's geo understanding
return ("google", "gemini-2.0-flash-exp")
return ("openai", "gpt-4o-mini") # Default
async def get_daily_spend(self) -> float:
"""Get today's total LLM spend"""
today = datetime.utcnow().date()
result = self.supabase.table("llm_usage").select("cost").gte(
"created_at", f"{today}T00:00:00Z"
).execute()
return sum(row["cost"] for row in result.data)
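Wiring the router into generate_content is then a small swap for select_provider_for_task. A sketch, assuming the router is constructed with the same LLM client and Supabase client used above:
# Inside generate_content, replacing the static provider selection:
router = SmartRouter(llm_client, supabase)
provider, model = await router.route_request(params)
response = await llm_client.generate(
    prompt=prompt_data["prompt"],
    provider=provider,
    model=model
)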
2. Caching Strategy¶
# cache_manager.py
import hashlib
from datetime import datetime, timedelta
from typing import Optional
class CacheManager:
def __init__(self, supabase_client):
self.supabase = supabase_client
def generate_cache_key(self, prompt: str, provider: str, model: str) -> str:
"""Generate unique cache key for prompt"""
data = f"{prompt}:{provider}:{model}"
return hashlib.sha256(data.encode()).hexdigest()
async def get_cached(self, prompt: str, provider: str, model: str) -> Optional[str]:
"""Check if we have a cached response"""
cache_key = self.generate_cache_key(prompt, provider, model)
result = self.supabase.table("llm_cache").select("*").eq(
"cache_key", cache_key
).gte(
"expires_at", datetime.utcnow().isoformat()
).execute()
if result.data:
return result.data[0]["content"]
return None
async def cache_response(
self,
prompt: str,
provider: str,
model: str,
content: str,
ttl_hours: int = 24
):
"""Cache LLM response"""
cache_key = self.generate_cache_key(prompt, provider, model)
expires_at = datetime.utcnow() + timedelta(hours=ttl_hours)
self.supabase.table("llm_cache").upsert({
"cache_key": cache_key,
"prompt_hash": hashlib.sha256(prompt.encode()).hexdigest()[:16],
"provider": provider,
"model": model,
"content": content,
"expires_at": expires_at.isoformat(),
"created_at": datetime.utcnow().isoformat()
}).execute()
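The cache code assumes an llm_cache table that is not defined above. A possible schema, offered as an assumption and mirroring the llm_usage conventions (the UNIQUE constraint on cache_key is what lets the upsert work):
CREATE TABLE llm_cache (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    cache_key TEXT UNIQUE NOT NULL,
    prompt_hash TEXT NOT NULL,
    provider TEXT NOT NULL,
    model TEXT NOT NULL,
    content TEXT NOT NULL,
    expires_at TIMESTAMPTZ NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_cache_expires ON llm_cache(expires_at);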
V2 Enhancements¶
1. Advanced Model Selection with LangChain¶
# Note: LangChain import paths change between versions; recent releases expose
# these as langchain_openai.ChatOpenAI and langchain_anthropic.ChatAnthropic
from langchain.llms import OpenAI, Anthropic
from langchain.model_laboratory import ModelLaboratory
# Compare outputs from multiple models
models = [
OpenAI(model="gpt-4o-mini"),
Anthropic(model="claude-3-haiku"),
# Add more models
]
model_lab = ModelLaboratory.from_llms(models)
model_lab.compare("Generate a blog post about solar energy")
2. Self-Hosted Model Integration¶
class OllamaProvider(BaseLLMProvider):
"""Provider for self-hosted models via Ollama"""
def __init__(self, base_url: str = "http://localhost:11434"):
self.base_url = base_url
self.pricing = {"all": {"input": 0, "output": 0}} # Free!
async def generate(self, prompt: str, model: str = "llama3", **kwargs) -> LLMResponse:
import httpx
import time
start_time = time.time()
        async with httpx.AsyncClient(timeout=120.0) as client:  # local generation can exceed httpx's 5s default timeout
response = await client.post(
f"{self.base_url}/api/generate",
json={
"model": model,
"prompt": prompt,
"stream": False
}
)
data = response.json()
latency_ms = int((time.time() - start_time) * 1000)
        return LLMResponse(
            content=data["response"],
            model=model,
            provider="ollama",
            # /api/generate reports token counts as prompt_eval_count / eval_count
            usage={
                "input": data.get("prompt_eval_count", 0),
                "output": data.get("eval_count", 0)
            },
            cost=0.0,
            latency_ms=latency_ms
        )

    def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        # Required by BaseLLMProvider; self-hosted inference has no per-token cost
        return 0.0
3. A/B Testing Framework¶
class ABTestManager:
def __init__(self, llm_client: LLMClient, supabase_client):
self.llm_client = llm_client
self.supabase = supabase_client
    async def run_ab_test(self, prompt: str, variants: List[Dict]) -> List[Dict]:
"""Run A/B test with different providers/models"""
results = []
for variant in variants:
response = await self.llm_client.generate(
prompt,
provider=variant["provider"],
model=variant["model"]
)
results.append({
"variant": variant,
"response": response,
"quality_score": await self.evaluate_quality(response.content)
})
# Store test results
self.supabase.table("ab_tests").insert({
"prompt": prompt[:100], # Store truncated prompt
"variants": variants,
"results": [
{
"provider": r["variant"]["provider"],
"model": r["variant"]["model"],
"cost": r["response"].cost,
"latency_ms": r["response"].latency_ms,
"quality_score": r["quality_score"]
} for r in results
],
"created_at": datetime.utcnow().isoformat()
}).execute()
return results
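evaluate_quality is referenced above but not defined. One option is an LLM-as-judge pass using the same client; a sketch as an additional ABTestManager method, with the rubric and score parsing as assumptions:
    # (method of ABTestManager)
    async def evaluate_quality(self, content: str) -> float:
        """Score content 0-10 using a cheap model as judge (rubric is illustrative)."""
        judge_prompt = (
            "Rate the following marketing content from 0 to 10 for clarity, accuracy, "
            "and tone. Reply with only the number.\n\n" + content[:4000]
        )
        response = await self.llm_client.generate(judge_prompt, provider="openai", model="gpt-4o-mini")
        try:
            return float(response.content.strip().split()[0])
        except (ValueError, IndexError):
            return 0.0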
Monitoring Dashboard Queries¶
-- Daily cost by provider
SELECT
provider,
DATE(created_at) as date,
SUM(cost) as daily_cost,
COUNT(*) as requests
FROM llm_usage
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY provider, DATE(created_at)
ORDER BY date DESC, daily_cost DESC;
-- Average latency by model
SELECT
provider,
model,
AVG(latency_ms) as avg_latency,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY latency_ms) as p95_latency
FROM llm_usage
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY provider, model;
-- Cost per content type
SELECT
j.params->>'content_type' as content_type,
l.provider,
COUNT(*) as requests,
SUM(l.cost) as total_cost,
AVG(l.cost) as avg_cost_per_request
FROM llm_usage l
JOIN jobs j ON l.job_id = j.id
WHERE l.created_at >= NOW() - INTERVAL '7 days'
GROUP BY content_type, l.provider;
Best Practices¶
- Always use retry logic with exponential backoff (see the sketch after this list)
- Monitor rate limits and implement proper queuing
- Cache similar requests to reduce costs
- Use streaming for real-time applications (V2)
- Implement cost alerts to prevent bill shock
- Regular model evaluation to ensure quality
- Keep API keys secure using environment variables
- Log all requests for debugging and analytics
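A minimal retry helper for the first item above; the backoff parameters are assumptions, and the wrapped call in the usage comment is illustrative:
import asyncio
import logging
import random

logger = logging.getLogger(__name__)

async def with_retries(coro_factory, max_attempts: int = 3, base_delay: float = 1.0):
    """Retry an async call with exponential backoff and jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await coro_factory()
        except Exception as e:
            if attempt == max_attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
            logger.warning(f"LLM call attempt {attempt} failed ({e}); retrying in {delay:.1f}s")
            await asyncio.sleep(delay)

# Usage: response = await with_retries(lambda: llm_client.generate(prompt, provider="openai"))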
Estimated Costs¶
MVP (1000 generations/month)¶
- 70% GPT-4o-mini: ~$15
- 20% Groq Llama: ~$2
- 10% Others: ~$3
- Total: ~$20/month
Production (10,000 generations/month)¶
- Mixed routing strategy: ~$100-150/month
- With caching (30% hit rate): ~$70-105/month
- Self-hosted addition: +$50 infrastructure, -$30 API costs
- Total: ~$90-125/month