
LLM Integration Plan

Overview

This document outlines the integration of multiple LLM providers into the content generation system, focusing on cost optimization, flexibility, and performance. The plan prioritizes direct API integration for the MVP, with clear upgrade paths to more sophisticated routing and optimization strategies.

LLM Provider Comparison

Cost-Effective Options (2025 Pricing)

| Provider/Model | Input Cost ($/1M tokens) | Output Cost ($/1M tokens) | Latency | Best Use Case |
|---|---|---|---|---|
| OpenAI GPT-4o-mini | $0.15 | $0.60 | 0.5-1s | Default for all content types, best balance |
| Groq Llama 3.1 70B | $0.05-0.10 | $0.20-0.30 | 0.2-0.5s | Bulk social posts, fastest option |
| Google Gemini 2.0 Flash | $0.075 | $0.30 | 0.3-0.8s | Local content with geo-data |
| DeepSeek V3 | $0.10 | $0.40 | 0.6-1.2s | Educational/science content |
| Anthropic Claude 3.5 Sonnet | $3.00 | $15.00 | 1-2s | High-quality final reviews |
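
The table lists prices per 1M tokens, while the provider classes below store per-1K rates and divide token counts by 1,000 when computing cost. A quick sanity check of that conversion, using GPT-4o-mini and a hypothetical 1,000-input / 1,500-output-token generation:

# Per-1M list price -> per-1K rate used in the provider pricing dicts below
PRICE_PER_1M = {"input": 0.15, "output": 0.60}  # GPT-4o-mini, from the table above
price_per_1k = {k: v / 1000 for k, v in PRICE_PER_1M.items()}
assert price_per_1k == {"input": 0.00015, "output": 0.0006}

# Cost of a typical generation: 1,000 input + 1,500 output tokens
cost = (1000 / 1000) * price_per_1k["input"] + (1500 / 1000) * price_per_1k["output"]
print(f"${cost:.6f}")  # $0.001050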

Architecture Design

LLM Client Abstraction Layer

# llm_client.py - Enhanced version with all providers
import os
import asyncio
import time
from typing import Optional, Dict, List, Union
from abc import ABC, abstractmethod
from dataclasses import dataclass
import logging

# Provider SDKs
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai
from groq import AsyncGroq

# Setup logging
logger = logging.getLogger(__name__)

@dataclass
class LLMResponse:
    content: str
    model: str
    provider: str
    usage: Dict[str, int]
    cost: float
    latency_ms: int

class BaseLLMProvider(ABC):
    """Abstract base class for LLM providers"""

    @abstractmethod
    async def generate(self, prompt: str, model: str, **kwargs) -> LLMResponse:
        pass

    @abstractmethod
    def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        pass

class OpenAIProvider(BaseLLMProvider):
    def __init__(self):
        self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.pricing = {
            "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
            "gpt-4o": {"input": 0.005, "output": 0.015},
            "gpt-4-turbo": {"input": 0.01, "output": 0.03}
        }

    async def generate(self, prompt: str, model: str = "gpt-4o-mini", **kwargs) -> LLMResponse:
        start_time = time.time()

        response = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=kwargs.get("max_tokens", 4000),
            temperature=kwargs.get("temperature", 0.7)
        )

        latency_ms = int((time.time() - start_time) * 1000)
        usage = response.usage
        cost = self.calculate_cost(usage.prompt_tokens, usage.completion_tokens, model)

        return LLMResponse(
            content=response.choices[0].message.content,
            model=model,
            provider="openai",
            usage={"input": usage.prompt_tokens, "output": usage.completion_tokens},
            cost=cost,
            latency_ms=latency_ms
        )

    def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        prices = self.pricing.get(model, self.pricing["gpt-4o-mini"])
        input_cost = (input_tokens / 1000) * prices["input"]
        output_cost = (output_tokens / 1000) * prices["output"]
        return round(input_cost + output_cost, 6)

class AnthropicProvider(BaseLLMProvider):
    def __init__(self):
        self.client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
        self.pricing = {
            "claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
            "claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125}
        }

    async def generate(self, prompt: str, model: str = "claude-3-5-sonnet-20241022", **kwargs) -> LLMResponse:
        start_time = time.time()

        response = await self.client.messages.create(
            model=model,
            max_tokens=kwargs.get("max_tokens", 4000),
            temperature=kwargs.get("temperature", 0.7),
            messages=[{"role": "user", "content": prompt}]
        )

        latency_ms = int((time.time() - start_time) * 1000)

        # Extract usage from response
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        cost = self.calculate_cost(input_tokens, output_tokens, model)

        return LLMResponse(
            content=response.content[0].text,
            model=model,
            provider="anthropic",
            usage={"input": input_tokens, "output": output_tokens},
            cost=cost,
            latency_ms=latency_ms
        )

    def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        prices = self.pricing.get(model, self.pricing["claude-3-5-sonnet-20241022"])
        input_cost = (input_tokens / 1000) * prices["input"]
        output_cost = (output_tokens / 1000) * prices["output"]
        return round(input_cost + output_cost, 6)

class GroqProvider(BaseLLMProvider):
    def __init__(self):
        self.client = AsyncGroq(api_key=os.getenv("GROQ_API_KEY"))
        self.pricing = {
            "llama-3.1-70b-versatile": {"input": 0.00005, "output": 0.00008},
            "llama-3.1-8b-instant": {"input": 0.00005, "output": 0.00008},
            "mixtral-8x7b-32768": {"input": 0.00005, "output": 0.00008}
        }

    async def generate(self, prompt: str, model: str = "llama-3.1-70b-versatile", **kwargs) -> LLMResponse:
        start_time = time.time()

        response = await self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=kwargs.get("max_tokens", 4000),
            temperature=kwargs.get("temperature", 0.7)
        )

        latency_ms = int((time.time() - start_time) * 1000)
        usage = response.usage
        cost = self.calculate_cost(usage.prompt_tokens, usage.completion_tokens, model)

        return LLMResponse(
            content=response.choices[0].message.content,
            model=model,
            provider="groq",
            usage={"input": usage.prompt_tokens, "output": usage.completion_tokens},
            cost=cost,
            latency_ms=latency_ms
        )

    def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        prices = self.pricing.get(model, {"input": 0.00005, "output": 0.00008})
        input_cost = (input_tokens / 1000) * prices["input"]
        output_cost = (output_tokens / 1000) * prices["output"]
        return round(input_cost + output_cost, 6)

class GoogleProvider(BaseLLMProvider):
    def __init__(self):
        genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
        self.pricing = {
            "gemini-2.0-flash-exp": {"input": 0.000075, "output": 0.0003},
            "gemini-1.5-pro": {"input": 0.00125, "output": 0.005},
            "gemini-1.5-flash": {"input": 0.000075, "output": 0.0003}
        }

    async def generate(self, prompt: str, model: str = "gemini-2.0-flash-exp", **kwargs) -> LLMResponse:
        start_time = time.time()

        model_instance = genai.GenerativeModel(model)
        response = await model_instance.generate_content_async(
            prompt,
            generation_config=genai.GenerationConfig(
                max_output_tokens=kwargs.get("max_tokens", 4000),
                temperature=kwargs.get("temperature", 0.7)
            )
        )

        latency_ms = int((time.time() - start_time) * 1000)

        # Use exact token counts when the SDK returns usage_metadata
        # (available in recent google-generativeai versions); otherwise
        # fall back to a rough word-based estimate.
        usage_metadata = getattr(response, "usage_metadata", None)
        if usage_metadata:
            input_tokens = usage_metadata.prompt_token_count
            output_tokens = usage_metadata.candidates_token_count
        else:
            input_tokens = int(len(prompt.split()) * 1.3)
            output_tokens = int(len(response.text.split()) * 1.3)
        cost = self.calculate_cost(input_tokens, output_tokens, model)

        return LLMResponse(
            content=response.text,
            model=model,
            provider="google",
            usage={"input": int(input_tokens), "output": int(output_tokens)},
            cost=cost,
            latency_ms=latency_ms
        )

    def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        prices = self.pricing.get(model, self.pricing["gemini-2.0-flash-exp"])
        input_cost = (input_tokens / 1000) * prices["input"]
        output_cost = (output_tokens / 1000) * prices["output"]
        return round(input_cost + output_cost, 6)

class LLMClient:
    """Main client for managing multiple LLM providers"""

    def __init__(self):
        self.providers = {
            "openai": OpenAIProvider(),
            "anthropic": AnthropicProvider(),
            "groq": GroqProvider(),
            "google": GoogleProvider()
        }
        self.default_models = {
            "openai": "gpt-4o-mini",
            "anthropic": "claude-3-5-sonnet-20241022",
            "groq": "llama-3.1-70b-versatile",
            "google": "gemini-2.0-flash-exp"
        }

    async def generate(
        self, 
        prompt: str, 
        provider: str = "openai", 
        model: Optional[str] = None,
        **kwargs
    ) -> LLMResponse:
        """Generate content using specified provider and model"""

        if provider not in self.providers:
            raise ValueError(f"Unknown provider: {provider}")

        model = model or self.default_models[provider]

        try:
            response = await self.providers[provider].generate(prompt, model, **kwargs)

            # Log usage for monitoring
            logger.info(f"LLM Generation - Provider: {provider}, Model: {model}, "
                       f"Cost: ${response.cost:.4f}, Latency: {response.latency_ms}ms")

            return response

        except Exception as e:
            logger.error(f"LLM generation failed - Provider: {provider}, Error: {str(e)}")
            raise

    async def generate_with_fallback(
        self,
        prompt: str,
        providers: Optional[List[str]] = None,
        **kwargs
    ) -> LLMResponse:
        """Try multiple providers in order until one succeeds"""

        # Avoid a mutable default argument; fall back to the standard order
        providers = providers or ["openai", "groq", "google"]
        last_error = None
        for provider in providers:
            try:
                return await self.generate(prompt, provider, **kwargs)
            except Exception as e:
                last_error = e
                logger.warning(f"Provider {provider} failed, trying next")
                continue

        raise Exception(f"All providers failed. Last error: {last_error}")

    def select_provider_for_task(self, content_type: str, priority: str = "balanced") -> tuple[str, str]:
        """Select best provider/model based on task and priority"""

        if priority == "cost":
            # Cheapest options
            if content_type == "social":
                return ("groq", "llama-3.1-70b-versatile")
            else:
                return ("openai", "gpt-4o-mini")

        elif priority == "quality":
            # Highest quality
            if content_type == "blog":
                return ("anthropic", "claude-3-5-sonnet-20241022")
            else:
                return ("openai", "gpt-4o")

        elif priority == "speed":
            # Fastest response
            return ("groq", "llama-3.1-70b-versatile")

        else:  # balanced
            # Good balance of cost/quality/speed
            if content_type == "local":
                return ("google", "gemini-2.0-flash-exp")
            else:
                return ("openai", "gpt-4o-mini")

Integration with CPM

# Updated generate_content function in CPM
async def generate_content(job_id: str, params: Dict):
    try:
        # Update job status
        supabase.table("jobs").update({
            "status": "in_progress",
            "updated_at": datetime.utcnow().isoformat()
        }).eq("id", job_id).execute()

        # Get prompt from IM
        prompt_data = await get_prompt_from_im(params)

        # Initialize LLM client
        llm_client = LLMClient()

        # Select provider based on content type and priority
        priority = params.get("priority", "balanced")
        provider, model = llm_client.select_provider_for_task(
            params["content_type"], 
            priority
        )

        # Override with user selection if provided
        if params.get("llm_provider"):
            provider = params["llm_provider"]
            model = params.get("model")

        # Generate content
        response = await llm_client.generate(
            prompt=prompt_data["prompt"],
            provider=provider,
            model=model,
            temperature=0.7 if params["content_type"] == "blog" else 0.8
        )

        # Process result
        result = {
            "content": response.content,
            "title": extract_title(response.content, params["content_type"]),
            "metadata": {
                "word_count": len(response.content.split()),
                "keywords_used": params["keywords"],
                "llm_provider": response.provider,
                "model": response.model,
                "generation_cost": response.cost,
                "generation_time_ms": response.latency_ms,
                "token_usage": response.usage
            }
        }

        # Store in Supabase
        supabase.table("jobs").update({
            "status": "completed",
            "result": result,
            "updated_at": datetime.utcnow().isoformat()
        }).eq("id", job_id).execute()

        # Track costs (pass job_id so usage rows can be joined back to jobs)
        await track_llm_usage(response, job_id)

    except Exception as e:
        logger.error(f"Content generation failed: {str(e)}")
        supabase.table("jobs").update({
            "status": "failed",
            "error": str(e),
            "updated_at": datetime.utcnow().isoformat()
        }).eq("id", job_id).execute()

Cost Tracking and Monitoring

Database Schema for Usage Tracking

CREATE TABLE llm_usage (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_id UUID REFERENCES jobs(id),
    provider TEXT NOT NULL,
    model TEXT NOT NULL,
    input_tokens INTEGER NOT NULL,
    output_tokens INTEGER NOT NULL,
    cost DECIMAL(10, 6) NOT NULL,
    latency_ms INTEGER NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Indexes for analytics
CREATE INDEX idx_usage_provider ON llm_usage(provider);
CREATE INDEX idx_usage_created ON llm_usage(created_at);
CREATE INDEX idx_usage_job ON llm_usage(job_id);

-- Daily usage summary view
CREATE VIEW daily_llm_costs AS
SELECT 
    DATE(created_at) as date,
    provider,
    model,
    COUNT(*) as request_count,
    SUM(input_tokens) as total_input_tokens,
    SUM(output_tokens) as total_output_tokens,
    SUM(cost) as total_cost,
    AVG(latency_ms) as avg_latency_ms
FROM llm_usage
GROUP BY DATE(created_at), provider, model;

Usage Tracking Function

async def track_llm_usage(response: LLMResponse, job_id: Optional[str] = None):
    """Track LLM usage for cost monitoring"""
    usage_data = {
        "job_id": job_id,
        "provider": response.provider,
        "model": response.model,
        "input_tokens": response.usage["input"],
        "output_tokens": response.usage["output"],
        "cost": response.cost,
        "latency_ms": response.latency_ms,
        "created_at": datetime.utcnow().isoformat()
    }

    supabase.table("llm_usage").insert(usage_data).execute()

Environment Variables

# OpenAI
OPENAI_API_KEY=sk-...

# Anthropic
ANTHROPIC_API_KEY=sk-ant-...

# Google
GOOGLE_API_KEY=AIza...

# Groq
GROQ_API_KEY=gsk_...

# DeepSeek (if added)
DEEPSEEK_API_KEY=...

# Cost limits (optional)
DAILY_COST_LIMIT=10.00
MONTHLY_COST_LIMIT=200.00
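
A small startup check can fail fast when keys are missing (a sketch; trim the required list to the providers actually enabled):

# config_check.py - illustrative startup validation
import os

REQUIRED_KEYS = ["OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GROQ_API_KEY", "GOOGLE_API_KEY"]

def validate_llm_config() -> None:
    missing = [key for key in REQUIRED_KEYS if not os.getenv(key)]
    if missing:
        raise RuntimeError(f"Missing LLM API keys: {', '.join(missing)}")

    # Cost limits are optional; defaults mirror the values documented above
    daily_limit = float(os.getenv("DAILY_COST_LIMIT", "10.00"))
    monthly_limit = float(os.getenv("MONTHLY_COST_LIMIT", "200.00"))
    assert 0 < daily_limit <= monthly_limit, "Cost limits look misconfigured"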

Testing Strategy

Unit Tests for LLM Client

# test_llm_client.py
import pytest
from unittest.mock import AsyncMock, patch
from llm_client import LLMClient, LLMResponse

@pytest.fixture(autouse=True)
def dummy_api_keys(monkeypatch):
    """Provider SDK clients require API keys at construction; use dummies in tests."""
    for var in ("OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GROQ_API_KEY", "GOOGLE_API_KEY"):
        monkeypatch.setenv(var, "test-key")

@pytest.mark.asyncio
async def test_openai_generation():
    client = LLMClient()

    with patch.object(client.providers["openai"].client.chat.completions, 'create') as mock_create:
        mock_create.return_value = AsyncMock(
            choices=[AsyncMock(message=AsyncMock(content="Test response"))],
            usage=AsyncMock(prompt_tokens=10, completion_tokens=20)
        )

        response = await client.generate("Test prompt", provider="openai")

        assert response.content == "Test response"
        assert response.provider == "openai"
        assert response.usage["input"] == 10
        assert response.usage["output"] == 20

@pytest.mark.asyncio
async def test_provider_selection():
    client = LLMClient()

    # Test cost priority
    provider, model = client.select_provider_for_task("social", "cost")
    assert provider == "groq"

    # Test quality priority
    provider, model = client.select_provider_for_task("blog", "quality")
    assert provider == "anthropic"

    # Test speed priority
    provider, model = client.select_provider_for_task("any", "speed")
    assert provider == "groq"

@pytest.mark.asyncio
async def test_fallback_mechanism():
    client = LLMClient()

    # Mock first provider to fail
    with patch.object(client.providers["openai"], 'generate', side_effect=Exception("API Error")):
        with patch.object(client.providers["groq"], 'generate') as mock_groq:
            mock_groq.return_value = LLMResponse(
                content="Fallback response",
                model="llama-3.1-70b",
                provider="groq",
                usage={"input": 10, "output": 20},
                cost=0.001,
                latency_ms=300
            )

            response = await client.generate_with_fallback("Test prompt")
            assert response.provider == "groq"
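
A unit test for the cost arithmetic itself is also cheap to add; 1,000 input and 1,000 output tokens on gpt-4o-mini at $0.15/$0.60 per 1M should cost $0.00075:

def test_openai_cost_calculation(monkeypatch):
    # Provider construction needs an API key; a dummy value is enough here
    monkeypatch.setenv("OPENAI_API_KEY", "test-key")
    from llm_client import OpenAIProvider

    provider = OpenAIProvider()
    assert provider.calculate_cost(1000, 1000, "gpt-4o-mini") == pytest.approx(0.00075)
    # Unknown models fall back to gpt-4o-mini pricing
    assert provider.calculate_cost(1000, 1000, "unknown-model") == pytest.approx(0.00075)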

Cost Optimization Strategies

1. Smart Routing Based on Content Type

class SmartRouter:
    def __init__(self, llm_client: LLMClient, supabase_client):
        self.llm_client = llm_client
        self.supabase = supabase_client

    async def route_request(self, params: Dict) -> tuple[str, str]:
        """Route to optimal provider based on content and current usage"""

        # Check daily spend
        daily_spend = await self.get_daily_spend()

        if daily_spend > float(os.getenv("DAILY_COST_LIMIT", "10.00")):
            # Switch to cheapest provider
            return ("groq", "llama-3.1-70b-versatile")

        # Content-based routing
        if params["content_type"] == "social":
            # Short content, use fast/cheap
            return ("groq", "llama-3.1-70b-versatile")

        elif params["content_type"] == "blog":
            if params.get("client_id") == "PASCO":
                # Science content needs accuracy
                return ("openai", "gpt-4o-mini")
            else:
                # General blog content
                return ("openai", "gpt-4o-mini")

        elif params["content_type"] == "local":
            # Local content benefits from Google's geo understanding
            return ("google", "gemini-2.0-flash-exp")

        return ("openai", "gpt-4o-mini")  # Default

    async def get_daily_spend(self) -> float:
        """Get today's total LLM spend"""
        today = datetime.utcnow().date()
        result = self.supabase.table("llm_usage").select("cost").gte(
            "created_at", f"{today}T00:00:00Z"
        ).execute()

        return sum(row["cost"] for row in result.data)

2. Caching Strategy

# cache_manager.py
import hashlib
from datetime import datetime, timedelta
from typing import Optional

class CacheManager:
    def __init__(self, supabase_client):
        self.supabase = supabase_client

    def generate_cache_key(self, prompt: str, provider: str, model: str) -> str:
        """Generate unique cache key for prompt"""
        data = f"{prompt}:{provider}:{model}"
        return hashlib.sha256(data.encode()).hexdigest()

    async def get_cached(self, prompt: str, provider: str, model: str) -> Optional[str]:
        """Check if we have a cached response"""
        cache_key = self.generate_cache_key(prompt, provider, model)

        result = self.supabase.table("llm_cache").select("*").eq(
            "cache_key", cache_key
        ).gte(
            "expires_at", datetime.utcnow().isoformat()
        ).execute()

        if result.data:
            return result.data[0]["content"]
        return None

    async def cache_response(
        self, 
        prompt: str, 
        provider: str, 
        model: str, 
        content: str,
        ttl_hours: int = 24
    ):
        """Cache LLM response"""
        cache_key = self.generate_cache_key(prompt, provider, model)
        expires_at = datetime.utcnow() + timedelta(hours=ttl_hours)

        self.supabase.table("llm_cache").upsert({
            "cache_key": cache_key,
            "prompt_hash": hashlib.sha256(prompt.encode()).hexdigest()[:16],
            "provider": provider,
            "model": model,
            "content": content,
            "expires_at": expires_at.isoformat(),
            "created_at": datetime.utcnow().isoformat()
        }).execute()
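
A sketch of how the cache would sit in front of generation (this assumes an llm_cache table with the columns used above: cache_key, prompt_hash, provider, model, content, expires_at, created_at):

async def generate_with_cache(
    llm_client: LLMClient,
    cache: CacheManager,
    prompt: str,
    provider: str,
    model: str,
    **kwargs
) -> str:
    """Check the cache first; on a miss, generate and store the result."""
    cached = await cache.get_cached(prompt, provider, model)
    if cached is not None:
        return cached

    response = await llm_client.generate(prompt, provider=provider, model=model, **kwargs)
    await cache.cache_response(prompt, provider, model, response.content)
    return response.content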

V2 Enhancements

1. Advanced Model Selection with LangChain

# Illustrative sketch using the langchain-openai / langchain-anthropic chat wrappers
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain.model_laboratory import ModelLaboratory

# Compare outputs from multiple models
models = [
    ChatOpenAI(model="gpt-4o-mini"),
    ChatAnthropic(model="claude-3-haiku-20240307"),
    # Add more models
]

model_lab = ModelLaboratory.from_llms(models)
model_lab.compare("Generate a blog post about solar energy")

2. Self-Hosted Model Integration

class OllamaProvider(BaseLLMProvider):
    """Provider for self-hosted models via Ollama"""

    def __init__(self, base_url: str = "http://localhost:11434"):
        self.base_url = base_url
        self.pricing = {"all": {"input": 0, "output": 0}}  # Free!

    async def generate(self, prompt: str, model: str = "llama3", **kwargs) -> LLMResponse:
        import httpx
        import time

        start_time = time.time()

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": model,
                    "prompt": prompt,
                    "stream": False
                }
            )

        data = response.json()
        latency_ms = int((time.time() - start_time) * 1000)

        # Ollama's non-streaming response reports token counts in
        # prompt_eval_count / eval_count (fall back to 0 if absent)
        return LLMResponse(
            content=data["response"],
            model=model,
            provider="ollama",
            usage={
                "input": data.get("prompt_eval_count", 0),
                "output": data.get("eval_count", 0)
            },
            cost=0.0,
            latency_ms=latency_ms
        )

    def calculate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        # Self-hosted: no per-token API cost
        return 0.0
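
Registering the self-hosted provider alongside the API-based ones is then a small extension (sketch):

# Wire Ollama into the existing client
llm_client = LLMClient()
llm_client.providers["ollama"] = OllamaProvider()
llm_client.default_models["ollama"] = "llama3"

# response = await llm_client.generate(prompt, provider="ollama")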

3. A/B Testing Framework

class ABTestManager:
    def __init__(self, llm_client: LLMClient, supabase_client):
        self.llm_client = llm_client
        self.supabase = supabase_client

    async def run_ab_test(self, prompt: str, variants: List[Dict]) -> List[Dict]:
        """Run A/B test with different providers/models"""
        results = []

        for variant in variants:
            response = await self.llm_client.generate(
                prompt,
                provider=variant["provider"],
                model=variant["model"]
            )

            results.append({
                "variant": variant,
                "response": response,
                "quality_score": await self.evaluate_quality(response.content)
            })

        # Store test results
        self.supabase.table("ab_tests").insert({
            "prompt": prompt[:100],  # Store truncated prompt
            "variants": variants,
            "results": [
                {
                    "provider": r["variant"]["provider"],
                    "model": r["variant"]["model"],
                    "cost": r["response"].cost,
                    "latency_ms": r["response"].latency_ms,
                    "quality_score": r["quality_score"]
                } for r in results
            ],
            "created_at": datetime.utcnow().isoformat()
        }).execute()

        return results
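
    async def evaluate_quality(self, content: str) -> float:
        # Placeholder scorer (an assumption, not part of the original plan):
        # rewards reasonable length and basic paragraph structure until an
        # LLM-as-judge or human review step replaces it.
        words = content.split()
        length_score = min(len(words) / 500, 1.0)             # saturates at ~500 words
        structure_score = 1.0 if "\n\n" in content else 0.5   # has paragraphs?
        return round(0.7 * length_score + 0.3 * structure_score, 2)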

Monitoring Dashboard Queries

-- Daily cost by provider
SELECT 
    provider,
    DATE(created_at) as date,
    SUM(cost) as daily_cost,
    COUNT(*) as requests
FROM llm_usage
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY provider, DATE(created_at)
ORDER BY date DESC, daily_cost DESC;

-- Average latency by model
SELECT 
    provider,
    model,
    AVG(latency_ms) as avg_latency,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY latency_ms) as p95_latency
FROM llm_usage
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY provider, model;

-- Cost per content type
SELECT 
    j.params->>'content_type' as content_type,
    l.provider,
    COUNT(*) as requests,
    SUM(l.cost) as total_cost,
    AVG(l.cost) as avg_cost_per_request
FROM llm_usage l
JOIN jobs j ON l.job_id = j.id
WHERE l.created_at >= NOW() - INTERVAL '7 days'
GROUP BY content_type, l.provider;

Best Practices

  1. Always use retry logic with exponential backoff (see the sketch after this list)
  2. Monitor rate limits and implement proper queuing
  3. Cache similar requests to reduce costs
  4. Use streaming for real-time applications (V2)
  5. Implement cost alerts to prevent bill shock
  6. Regular model evaluation to ensure quality
  7. Keep API keys secure using environment variables
  8. Log all requests for debugging and analytics
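
For item 1, a minimal retry wrapper around the client might look like the following (a sketch; in practice you would catch provider-specific rate-limit and timeout exceptions rather than bare Exception):

import asyncio
import logging
import random

logger = logging.getLogger(__name__)

async def generate_with_retry(llm_client, prompt: str, max_retries: int = 3, **kwargs):
    """Retry transient failures with exponential backoff plus jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await llm_client.generate(prompt, **kwargs)
        except Exception as e:
            if attempt == max_retries:
                raise
            delay = 2 ** attempt + random.uniform(0, 0.5)
            logger.warning(f"LLM call failed ({e}); retry {attempt + 1} in {delay:.1f}s")
            await asyncio.sleep(delay)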

Estimated Costs

MVP (1000 generations/month)

  • 70% GPT-4o-mini: ~$15
  • 20% Groq Llama: ~$2
  • 10% Others: ~$3
  • Total: ~$20/month

Production (10,000 generations/month)

  • Mixed routing strategy: ~$100-150/month
  • With caching (30% hit rate): ~$70-105/month
  • Self-hosted addition: +$50 infrastructure, -$30 API costs
  • Total: ~$90-125/month