Content Production Module (CPM) Plan

Overview

The Content Production Module is the core content generation service that creates blogs, social media posts, and local-focused content using LLMs. This document outlines the implementation plan for the MVP version with clear upgrade paths to V2.

Key Features

  • Content Generation: Produce blogs (800-2000 words), social posts (100-300 words), and local content
  • Job Queuing: Handle multiple simultaneous requests using FastAPI BackgroundTasks
  • SEO Optimization: Built-in prompts for 2025 SEO (long-tail keywords, E-E-A-T, AI snippets)
  • Error Handling: Retry logic and comprehensive error logging
  • Output Formatting: JSON responses with content and metadata

Technology Stack

  • Language: Python 3.12+
  • Framework: FastAPI
  • LLM Libraries: OpenAI SDK, Anthropic SDK, Google Generative AI
  • Async Support: asyncio, httpx
  • Database: Supabase (for job storage and status tracking)
  • Deployment: Railway

API Endpoints

POST /generate: Queue content generation
  Request: { "topic": string, "content_type": "blog" | "social" | "local", "client_id": string, "keywords": array<string>, "length": "short" | "medium" | "long" (optional), "llm_provider": string (optional), "model": string (optional) }
  Response: { "job_id": string }

GET /status/{job_id}: Check job status/result
  Request: none
  Response: { "status": string, "result": object (if done) }

GET /health: Service health check
  Request: none
  Response: { "status": "ok" }
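
For reference, a minimal client-side call sequence might look like the following, using httpx (already in the stack); the service URL and payload values are placeholders:

import asyncio
import httpx

async def main():
    base = "https://your-cpm-service.railway.app"  # placeholder service URL
    async with httpx.AsyncClient() as client:
        # Queue a blog generation job
        resp = await client.post(f"{base}/generate", json={
            "topic": "Best coffee shops in Austin",
            "content_type": "blog",
            "client_id": "demo_client",
            "keywords": ["austin coffee", "local cafes"]
        })
        job_id = resp.json()["job_id"]

        # Poll until the job reaches a terminal state
        while True:
            status = (await client.get(f"{base}/status/{job_id}")).json()
            if status["status"] in ("completed", "failed"):
                break
            await asyncio.sleep(2)

        print(status)

asyncio.run(main())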

Implementation Code Structure

Main Application (app.py)

from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import List, Optional, Dict
import uuid
import os
from datetime import datetime, timezone
from supabase import create_client, Client
import httpx
from dotenv import load_dotenv

# Load .env for local development (Railway injects env vars in production)
load_dotenv()

# Initialize FastAPI
app = FastAPI(title="Content Production Module", version="1.0.0")

# Initialize Supabase
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(supabase_url, supabase_key)

# Request Models
class GenerateRequest(BaseModel):
    topic: str
    content_type: str  # blog/social/local
    client_id: str
    keywords: List[str] = []
    length: Optional[str] = "medium"
    llm_provider: Optional[str] = "openai"
    model: Optional[str] = None

class JobStatus(BaseModel):
    job_id: str
    status: str
    result: Optional[Dict] = None
    error: Optional[str] = None
    created_at: str
    updated_at: str

# LLM Client abstraction
from llm_client import LLMClient

# Background task for content generation
async def generate_content(job_id: str, params: Dict):
    try:
        # Update job status to in_progress
        supabase.table("jobs").update({
            "status": "in_progress",
            "updated_at": datetime.utcnow().isoformat()
        }).eq("id", job_id).execute()

        # Call Instructions Module for prompt
        async with httpx.AsyncClient(timeout=30.0) as client:
            im_response = await client.post(
                f"{os.getenv('IM_SERVICE_URL')}/generate-prompt",
                json={
                    "topic": params["topic"],
                    "client_id": params["client_id"],
                    "content_type": params["content_type"],
                    "keywords": params["keywords"]
                }
            )
            im_response.raise_for_status()
            prompt_data = im_response.json()

        # Initialize LLM client
        llm = LLMClient(provider=params.get("llm_provider", "openai"))

        # Generate content
        content = await llm.generate(
            prompt=prompt_data["prompt"],
            model=params.get("model")
        )

        # Process and format content
        result = {
            "content": content,
            "title": extract_title(content, params["content_type"]),
            "metadata": {
                "word_count": len(content.split()),
                "keywords_used": params["keywords"],
                "llm_provider": params["llm_provider"],
                "model": llm.get_model_used(),
                "seo_notes": generate_seo_notes(content, params["keywords"])
            }
        }

        # Update job with result
        supabase.table("jobs").update({
            "status": "completed",
            "result": result,
            "updated_at": datetime.utcnow().isoformat()
        }).eq("id", job_id).execute()

    except Exception as e:
        # Log error and update job status
        supabase.table("jobs").update({
            "status": "failed",
            "error": str(e),
            "updated_at": datetime.utcnow().isoformat()
        }).eq("id", job_id).execute()

# API Endpoints
@app.post("/generate", response_model=Dict[str, str])
async def generate(request: GenerateRequest, background_tasks: BackgroundTasks):
    # Create job record
    job_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()
    job_data = {
        "id": job_id,
        "status": "pending",
        "client_id": request.client_id,  # required (NOT NULL) in the jobs table
        "params": request.model_dump(),
        "created_at": now,
        "updated_at": now
    }

    supabase.table("jobs").insert(job_data).execute()

    # Queue background task
    background_tasks.add_task(generate_content, job_id, request.model_dump())

    return {"job_id": job_id}

@app.get("/status/{job_id}", response_model=JobStatus)
async def get_status(job_id: str):
    result = supabase.table("jobs").select("*").eq("id", job_id).execute()

    if not result.data:
        raise HTTPException(status_code=404, detail="Job not found")

    job = result.data[0]
    return JobStatus(
        job_id=job["id"],
        status=job["status"],
        result=job.get("result"),
        error=job.get("error"),
        created_at=job["created_at"],
        updated_at=job["updated_at"]
    )

@app.get("/health")
async def health():
    return {"status": "ok", "service": "CPM", "version": "1.0.0"}

# Helper functions
def extract_title(content: str, content_type: str) -> str:
    # For blogs, prefer the first Markdown heading
    if content_type == "blog":
        for line in content.split('\n'):
            if line.strip().startswith('#'):
                return line.strip('#').strip()
    # Fallback: first line, truncated to 100 characters
    return content.split('\n')[0][:100]

def generate_seo_notes(content: str, keywords: List[str]) -> str:
    # Basic SEO analysis: report how often each target keyword appears
    notes = []
    content_lower = content.lower()

    for keyword in keywords:
        count = content_lower.count(keyword.lower())
        if count > 0:
            notes.append(f"'{keyword}' appears {count} times")
        else:
            notes.append(f"'{keyword}' is missing")

    return "; ".join(notes)

LLM Client (llm_client.py)

import os
from typing import Optional, Dict
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai

class LLMClient:
    def __init__(self, provider: str = "openai"):
        self.provider = provider
        self.model_used = None

        # Lazily initialize only the requested provider, so a deployment
        # does not need API keys for providers it never uses
        if provider == "openai":
            self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        elif provider == "anthropic":
            self.client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
        elif provider == "google":
            genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))

    async def generate(self, prompt: str, model: Optional[str] = None) -> str:
        # Default models per provider
        default_models = {
            "openai": "gpt-4o-mini",
            "anthropic": "claude-3-5-sonnet-20241022",
            "google": "gemini-2.0-flash-exp"
        }

        model = model or default_models.get(self.provider)
        self.model_used = model

        if self.provider == "openai":
            response = await self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=4000,
                temperature=0.7
            )
            return response.choices[0].message.content

        elif self.provider == "anthropic":
            response = await self.client.messages.create(
                model=model,
                max_tokens=4000,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text

        elif self.provider == "google":
            model_instance = genai.GenerativeModel(model)
            response = await model_instance.generate_content_async(prompt)
            return response.text

        else:
            raise ValueError(f"Unsupported provider: {self.provider}")

    def get_model_used(self) -> str:
        return self.model_used
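
The Key Features list promises retry logic, but the MVP client above does not implement it yet. A minimal sketch of an async wrapper with exponential backoff (generate_with_retry and its parameters are illustrative, not part of the module):

import asyncio

async def generate_with_retry(llm, prompt, model=None, retries=3, base_delay=1.0):
    # Retry transient LLM failures with exponential backoff:
    # waits 1s, 2s, 4s between attempts before giving up
    last_error = None
    for attempt in range(retries):
        try:
            return await llm.generate(prompt=prompt, model=model)
        except Exception as e:
            last_error = e
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise last_error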

Requirements (requirements.txt)

fastapi==0.111.0
uvicorn==0.30.1
pydantic==2.7.1
supabase==2.5.0
openai==1.35.0
anthropic==0.30.0
google-generativeai==0.6.0
httpx==0.27.0
python-dotenv==1.0.1

Railway Configuration (railway.json)

{
  "$schema": "https://railway.app/railway.schema.json",
  "build": {
    "builder": "NIXPACKS"
  },
  "deploy": {
    "startCommand": "uvicorn app:app --host 0.0.0.0 --port $PORT",
    "restartPolicyType": "ON_FAILURE",
    "restartPolicyMaxRetries": 3
  }
}

Database Schema (Supabase)

Jobs Table

CREATE TABLE jobs (
  id UUID PRIMARY KEY,
  status TEXT NOT NULL CHECK (status IN ('pending', 'in_progress', 'completed', 'failed')),
  params JSONB NOT NULL,
  result JSONB,
  error TEXT,
  created_at TIMESTAMPTZ NOT NULL,
  updated_at TIMESTAMPTZ NOT NULL,
  client_id TEXT NOT NULL
);

-- Indexes for performance
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_client_id ON jobs(client_id);
CREATE INDEX idx_jobs_created_at ON jobs(created_at DESC);

Environment Variables

# Supabase
SUPABASE_URL=your_supabase_url
SUPABASE_KEY=your_supabase_anon_key

# LLM Providers
OPENAI_API_KEY=your_openai_key
ANTHROPIC_API_KEY=your_anthropic_key
GOOGLE_API_KEY=your_google_key

# Service URLs
IM_SERVICE_URL=https://your-im-service.railway.app

# Railway
PORT=8000
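
Since a missing key would otherwise surface only when the first job runs, it may be worth failing fast at startup. A sketch of a simple check (variable names match the list above; the exact required set is an assumption):

import os

REQUIRED_ENV_VARS = ["SUPABASE_URL", "SUPABASE_KEY", "OPENAI_API_KEY", "IM_SERVICE_URL"]

def validate_env() -> None:
    # Raise at boot rather than failing mid-job
    missing = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")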

Testing Strategy

Unit Tests (test_app.py)

import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, AsyncMock
from app import app

client = TestClient(app)

def test_health_endpoint():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "ok"

@patch('app.supabase')
def test_generate_endpoint(mock_supabase):
    mock_supabase.table.return_value.insert.return_value.execute.return_value = None

    response = client.post("/generate", json={
        "topic": "Test Topic",
        "content_type": "blog",
        "client_id": "test_client",
        "keywords": ["test", "keyword"]
    })

    assert response.status_code == 200
    assert "job_id" in response.json()

# Add more tests for error cases, status endpoint, etc.
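
As a starting point for the status-endpoint tests mentioned above, a sketch of the not-found case with a mocked Supabase result:

@patch('app.supabase')
def test_status_endpoint_not_found(mock_supabase):
    # Simulate an empty result set from Supabase
    mock_supabase.table.return_value.select.return_value.eq.return_value.execute.return_value.data = []

    response = client.get("/status/nonexistent-job")
    assert response.status_code == 404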

Deployment Checklist

MVP Launch

  • Set up Supabase project and create jobs table
  • Configure environment variables in Railway
  • Deploy CPM service to Railway
  • Test API endpoints with curl/Postman
  • Monitor logs and performance
  • Set up error alerting

Performance Optimization

  • Implement connection pooling for Supabase
  • Add Redis caching for repeated prompts (V2)
  • Optimize LLM calls with streaming (V2)
  • Add request rate limiting (a minimal concurrency cap is sketched below)
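
For the MVP, a process-local cap on concurrent LLM calls is a cheap stopgap before real per-client rate limiting. A minimal sketch (the limit of 10 is an arbitrary assumption):

import asyncio

# Cap concurrent LLM calls within this worker process
llm_semaphore = asyncio.Semaphore(10)

async def generate_limited(llm, prompt, model=None):
    # Excess callers wait here instead of hammering the provider
    async with llm_semaphore:
        return await llm.generate(prompt=prompt, model=model)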

V2 Upgrade Path

1. Advanced Workers (Celery + Redis)

# celery_app.py
from celery import Celery
import os

celery_app = Celery(
    'cpm',
    broker=os.getenv('REDIS_URL'),
    backend=os.getenv('REDIS_URL')
)

@celery_app.task
def generate_content_task(job_id: str, params: dict):
    # Run the existing async generation logic inside the worker
    import asyncio
    from app import generate_content
    asyncio.run(generate_content(job_id, params))

2. RAG Integration

# rag_client.py
from langchain_community.vectorstores import SupabaseVectorStore
from langchain_openai import OpenAIEmbeddings

class RAGClient:
    def __init__(self, supabase_client):
        self.vectorstore = SupabaseVectorStore(
            client=supabase_client,
            embedding=OpenAIEmbeddings(),
            table_name="documents"
        )

    async def enhance_prompt(self, prompt: str, context_type: str) -> str:
        # Retrieve relevant documents and prepend them as context
        docs = await self.vectorstore.asimilarity_search(prompt, k=5)
        context = "\n\n".join(doc.page_content for doc in docs)
        return f"Context:\n{context}\n\n{prompt}"

3. Multi-Model Routing

# router.py
class ModelRouter:
    def select_model(self, content_type: str, priority: str):
        if priority == "cost":
            return ("groq", "llama-3.1-70b")
        elif priority == "quality":
            return ("anthropic", "claude-3-5-sonnet")
        else:
            return ("openai", "gpt-4o-mini")

Monitoring and Metrics

Key Metrics to Track

  • Job completion rate
  • Average generation time per content type
  • Token usage and costs per provider
  • Error rates and types
  • API response times

Logging

import logging
from pythonjsonlogger import jsonlogger

# Configure structured logging
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)
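
With this configuration, the metrics listed above can be attached as structured fields on each log record (field names and values here are illustrative):

# Example: emit per-job metrics as structured JSON fields
logger.info(
    "job_completed",
    extra={
        "job_id": "example-job-id",
        "content_type": "blog",
        "generation_seconds": 12.4,
        "llm_provider": "openai",
    },
)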

Cost Estimation

MVP (Monthly)

  • Railway hosting: ~$5-10
  • Supabase: Free tier
  • LLM costs (1,000 generations, GPT-4o-mini): ~$10-20
  • Total: ~$15-30/month

V2 Scaling (Monthly)

  • Railway (with Redis): ~$20-30
  • Supabase Pro: $25
  • LLM costs (10,000 generations): ~$100-200
  • Total: ~$145-255/month

Security Considerations

API Security

  • Implement API key authentication (see the sketch after this list)
  • Rate limiting per client
  • Input validation and sanitization
  • Secure storage of API keys
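
A minimal FastAPI dependency for the API-key check (the X-API-Key header name and CPM_API_KEY variable are assumptions, not part of the current code):

import os
from fastapi import Depends, HTTPException
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def require_api_key(api_key: str = Depends(api_key_header)):
    # Compare against a single service key stored in the environment
    if not api_key or api_key != os.getenv("CPM_API_KEY"):
        raise HTTPException(status_code=401, detail="Invalid or missing API key")

# Attach to routes, e.g.:
# @app.post("/generate", dependencies=[Depends(require_api_key)])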

Data Privacy

  • Encrypt sensitive data in Supabase
  • Implement data retention policies
  • Audit logging for all operations
  • GDPR compliance for content storage

Success Metrics

MVP Goals

  • 95% uptime
  • <30s generation time for blogs
  • <5s generation time for social posts
  • 80% content approval rate

V2 Goals

  • 99% uptime
  • Handle 100+ concurrent jobs
  • <15s generation time for blogs
  • 90% content approval rate