# Content Production Module (CPM) Plan

## Overview

The Content Production Module is the core content generation service that creates blogs, social media posts, and local-focused content using LLMs. This document outlines the implementation plan for the MVP version, with clear upgrade paths to V2.
## Key Features

- Content Generation: Produce blogs (800-2000 words), social posts (100-300 words), and local content
- Job Queuing: Handle multiple simultaneous requests using FastAPI BackgroundTasks
- SEO Optimization: Built-in prompts for 2025 SEO (long-tail keywords, E-E-A-T, AI search snippets)
- Error Handling: Retry logic and comprehensive error logging
- Output Formatting: JSON responses with content and metadata
## Technology Stack
- Language: Python 3.12+
- Framework: FastAPI
- LLM Libraries: OpenAI SDK, Anthropic SDK, Google Generative AI
- Async Support: asyncio, aiohttp
- Database: Supabase (for job storage and status tracking)
- Deployment: Railway
## API Endpoints

| Endpoint | Method | Description | Request Body | Response |
|---|---|---|---|---|
| `/generate` | POST | Queue content generation | `{"topic": string, "content_type": "blog/social/local", "client_id": string, "keywords": array<string>, "length": "short/medium/long", "llm_provider": string (optional), "model": string (optional)}` | `{"job_id": string}` |
| `/status/{job_id}` | GET | Check job status/result | None | `{"status": string, "result": object (if done)}` |
| `/health` | GET | Service health check | None | `{"status": "ok"}` |
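
A minimal end-to-end usage sketch with httpx (already in the stack), assuming a local deployment at `http://localhost:8000` — the topic and client values are illustrative:

```python
import time
import httpx

BASE_URL = "http://localhost:8000"  # hypothetical local deployment

# Queue a blog generation job
resp = httpx.post(f"{BASE_URL}/generate", json={
    "topic": "Best coffee shops in Austin",
    "content_type": "blog",
    "client_id": "client_123",
    "keywords": ["austin coffee", "local coffee shops"],
    "length": "medium",
})
job_id = resp.json()["job_id"]

# Poll until the job finishes or fails
while True:
    status = httpx.get(f"{BASE_URL}/status/{job_id}").json()
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(2)

print(status.get("result") or status.get("error"))
```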
## Implementation Code Structure

### Main Application (app.py)

```python
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import List, Optional, Dict
import uuid
import os
from datetime import datetime, timezone
from supabase import create_client, Client
import httpx

# LLM Client abstraction
from llm_client import LLMClient

# Initialize FastAPI
app = FastAPI(title="Content Production Module", version="1.0.0")

# Initialize Supabase
supabase_url = os.getenv("SUPABASE_URL")
supabase_key = os.getenv("SUPABASE_KEY")
supabase: Client = create_client(supabase_url, supabase_key)


def utc_now() -> str:
    # datetime.utcnow() is deprecated as of Python 3.12
    return datetime.now(timezone.utc).isoformat()


# Request Models
class GenerateRequest(BaseModel):
    topic: str
    content_type: str  # blog/social/local
    client_id: str
    keywords: List[str] = []
    length: Optional[str] = "medium"
    llm_provider: Optional[str] = "openai"
    model: Optional[str] = None


class JobStatus(BaseModel):
    job_id: str
    status: str
    result: Optional[Dict] = None
    error: Optional[str] = None
    created_at: str
    updated_at: str


# Background task for content generation
async def generate_content(job_id: str, params: Dict):
    try:
        # Update job status to in_progress
        supabase.table("jobs").update({
            "status": "in_progress",
            "updated_at": utc_now()
        }).eq("id", job_id).execute()

        # Call Instructions Module for prompt
        async with httpx.AsyncClient() as client:
            im_response = await client.post(
                f"{os.getenv('IM_SERVICE_URL')}/generate-prompt",
                json={
                    "topic": params["topic"],
                    "client_id": params["client_id"],
                    "content_type": params["content_type"],
                    "keywords": params["keywords"]
                }
            )
            im_response.raise_for_status()  # surface IM failures as job errors
            prompt_data = im_response.json()

        # Initialize LLM client and generate content
        llm = LLMClient(provider=params.get("llm_provider", "openai"))
        content = await llm.generate(
            prompt=prompt_data["prompt"],
            model=params.get("model")
        )

        # Process and format content
        result = {
            "content": content,
            "title": extract_title(content, params["content_type"]),
            "metadata": {
                "word_count": len(content.split()),
                "keywords_used": params["keywords"],
                "llm_provider": params.get("llm_provider", "openai"),
                "model": llm.get_model_used(),
                "seo_notes": generate_seo_notes(content, params["keywords"])
            }
        }

        # Update job with result
        supabase.table("jobs").update({
            "status": "completed",
            "result": result,
            "updated_at": utc_now()
        }).eq("id", job_id).execute()
    except Exception as e:
        # Log error and update job status
        supabase.table("jobs").update({
            "status": "failed",
            "error": str(e),
            "updated_at": utc_now()
        }).eq("id", job_id).execute()


# API Endpoints
@app.post("/generate", response_model=Dict[str, str])
async def generate(request: GenerateRequest, background_tasks: BackgroundTasks):
    # Create job record; client_id gets its own column to satisfy the
    # NOT NULL constraint in the schema and support per-client queries
    job_id = str(uuid.uuid4())
    now = utc_now()
    job_data = {
        "id": job_id,
        "status": "pending",
        "params": request.model_dump(),  # Pydantic v2 replaces .dict()
        "client_id": request.client_id,
        "created_at": now,
        "updated_at": now
    }
    supabase.table("jobs").insert(job_data).execute()

    # Queue background task
    background_tasks.add_task(generate_content, job_id, request.model_dump())
    return {"job_id": job_id}


@app.get("/status/{job_id}", response_model=JobStatus)
async def get_status(job_id: str):
    result = supabase.table("jobs").select("*").eq("id", job_id).execute()
    if not result.data:
        raise HTTPException(status_code=404, detail="Job not found")
    job = result.data[0]
    return JobStatus(
        job_id=job["id"],
        status=job["status"],
        result=job.get("result"),
        error=job.get("error"),
        created_at=job["created_at"],
        updated_at=job["updated_at"]
    )


@app.get("/health")
async def health():
    return {"status": "ok", "service": "CPM", "version": "1.0.0"}


# Helper functions
def extract_title(content: str, content_type: str) -> str:
    # For blogs, use the first markdown heading if one exists
    if content_type == "blog":
        for line in content.split('\n'):
            if line.strip().startswith('#'):
                return line.strip('#').strip()
    # Fallback for all content types: truncate the first line
    return content.split('\n')[0][:100]


def generate_seo_notes(content: str, keywords: List[str]) -> str:
    # Basic SEO analysis: report keyword frequency
    notes = []
    content_lower = content.lower()
    for keyword in keywords:
        count = content_lower.count(keyword.lower())
        if count > 0:
            notes.append(f"'{keyword}' appears {count} times")
    return "; ".join(notes)
```
### LLM Client (llm_client.py)

```python
import os
from typing import Optional
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai


class LLMClient:
    # Default models per provider
    DEFAULT_MODELS = {
        "openai": "gpt-4o-mini",
        "anthropic": "claude-3-5-sonnet-20241022",
        "google": "gemini-2.0-flash-exp"
    }

    def __init__(self, provider: str = "openai"):
        self.provider = provider
        self.model_used = None
        # Initialize only the selected provider's client so a missing
        # API key for an unused provider does not break startup
        if provider == "openai":
            self.client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        elif provider == "anthropic":
            self.client = AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
        elif provider == "google":
            genai.configure(api_key=os.getenv("GOOGLE_API_KEY"))
        else:
            raise ValueError(f"Unsupported provider: {provider}")

    async def generate(self, prompt: str, model: Optional[str] = None) -> str:
        model = model or self.DEFAULT_MODELS[self.provider]
        self.model_used = model

        if self.provider == "openai":
            response = await self.client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=4000,
                temperature=0.7
            )
            return response.choices[0].message.content
        elif self.provider == "anthropic":
            response = await self.client.messages.create(
                model=model,
                max_tokens=4000,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
        else:  # google
            model_instance = genai.GenerativeModel(model)
            response = await model_instance.generate_content_async(prompt)
            return response.text

    def get_model_used(self) -> str:
        return self.model_used
```
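
The Key Features list promises retry logic, which neither module implements yet. A minimal sketch of an exponential-backoff wrapper that `generate_content` could call in place of `llm.generate` (attempt counts and delays are placeholder values, not decided anywhere in this plan):

```python
import asyncio


async def generate_with_retries(llm, prompt: str, model: str = None,
                                max_attempts: int = 3,
                                base_delay: float = 2.0) -> str:
    # Retry transient LLM failures: waits 2s, then 4s, between attempts
    last_error = None
    for attempt in range(max_attempts):
        try:
            return await llm.generate(prompt=prompt, model=model)
        except Exception as e:  # narrow to provider-specific errors in practice
            last_error = e
            if attempt < max_attempts - 1:
                await asyncio.sleep(base_delay * (2 ** attempt))
    raise last_error
```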
### Requirements (requirements.txt)

```text
fastapi==0.111.0
uvicorn==0.30.1
pydantic==2.7.1
supabase==2.5.0
openai==1.35.0
anthropic==0.30.0
google-generativeai==0.6.0
httpx==0.27.0
python-dotenv==1.0.1
```
### Railway Configuration (railway.json)

```json
{
  "$schema": "https://railway.app/railway.schema.json",
  "build": {
    "builder": "NIXPACKS"
  },
  "deploy": {
    "startCommand": "uvicorn app:app --host 0.0.0.0 --port $PORT",
    "restartPolicyType": "ON_FAILURE",
    "restartPolicyMaxRetries": 3
  }
}
```
## Database Schema (Supabase)

### Jobs Table

```sql
CREATE TABLE jobs (
    id UUID PRIMARY KEY,
    status TEXT NOT NULL CHECK (status IN ('pending', 'in_progress', 'completed', 'failed')),
    params JSONB NOT NULL,
    result JSONB,
    error TEXT,
    created_at TIMESTAMPTZ NOT NULL,
    updated_at TIMESTAMPTZ NOT NULL,
    client_id TEXT NOT NULL
);

-- Indexes for performance
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_client_id ON jobs(client_id);
CREATE INDEX idx_jobs_created_at ON jobs(created_at DESC);
```
## Environment Variables

```bash
# Supabase
SUPABASE_URL=your_supabase_url
SUPABASE_KEY=your_supabase_anon_key

# LLM Providers
OPENAI_API_KEY=your_openai_key
ANTHROPIC_API_KEY=your_anthropic_key
GOOGLE_API_KEY=your_google_key

# Service URLs
IM_SERVICE_URL=https://your-im-service.railway.app

# Railway
PORT=8000
```
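
python-dotenv is already pinned in requirements.txt; for local development, the variables above can live in a `.env` file and be loaded before the first `os.getenv()` call (on Railway they are injected, so this is a no-op):

```python
from dotenv import load_dotenv

# Must run before app.py reads SUPABASE_URL etc. at import time
load_dotenv()
```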
## Testing Strategy

### Unit Tests (test_app.py)

```python
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch
from app import app

client = TestClient(app)


def test_health_endpoint():
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json()["status"] == "ok"


@patch('app.supabase')
def test_generate_endpoint(mock_supabase):
    mock_supabase.table.return_value.insert.return_value.execute.return_value = None
    response = client.post("/generate", json={
        "topic": "Test Topic",
        "content_type": "blog",
        "client_id": "test_client",
        "keywords": ["test", "keyword"]
    })
    assert response.status_code == 200
    assert "job_id" in response.json()

# Add more tests for error cases, status endpoint, etc.
```
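
Extending the placeholder comment above, one example covering the 404 path of `/status` (the mock mirrors the shape of an empty Supabase result):

```python
@patch('app.supabase')
def test_status_not_found(mock_supabase):
    # Simulate a query that matches no rows
    query = mock_supabase.table.return_value.select.return_value.eq.return_value
    query.execute.return_value.data = []
    response = client.get("/status/nonexistent-job-id")
    assert response.status_code == 404
```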
## Deployment Checklist

### MVP Launch

- Set up Supabase project and create jobs table
- Configure environment variables in Railway
- Deploy CPM service to Railway
- Test API endpoints with curl/Postman
- Monitor logs and performance
- Set up error alerting
### Performance Optimization

- Implement connection pooling for Supabase
- Add Redis caching for repeated prompts (V2)
- Optimize LLM calls with streaming (V2)
- Add request rate limiting (see the sketch below)
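
Rate limiting is listed without an implementation choice; one option (an assumption, not part of the current stack) is the slowapi package, which attaches per-caller limits to FastAPI routes with a decorator:

```python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)  # keyed by client IP here
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# slowapi requires the raw Request in the endpoint signature, so the
# existing `request: GenerateRequest` parameter would need renaming
@app.post("/generate")
@limiter.limit("10/minute")  # placeholder limit
async def generate(request: Request, body: GenerateRequest,
                   background_tasks: BackgroundTasks):
    ...
```

A per-`client_id` key function would fit this service better once API key authentication (see Security Considerations) is in place.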
## V2 Upgrade Path

### 1. Advanced Workers (Celery + Redis)

```python
# celery_app.py
from celery import Celery
import os

celery_app = Celery(
    'cpm',
    broker=os.getenv('REDIS_URL'),
    backend=os.getenv('REDIS_URL')
)


@celery_app.task
def generate_content_task(job_id: str, params: dict):
    # Move generation logic here
    pass
```
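
The `/generate` endpoint would then dispatch with `.delay()` instead of BackgroundTasks; because Celery tasks are synchronous, the existing async generation logic would need to run under `asyncio.run()` inside the task body:

```python
# In app.py's /generate endpoint, replacing background_tasks.add_task(...):
generate_content_task.delay(job_id, request.model_dump())
```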
### 2. RAG Integration

```python
# rag_client.py
from langchain.vectorstores import SupabaseVectorStore
from langchain.embeddings import OpenAIEmbeddings


class RAGClient:
    def __init__(self, supabase_client):
        # Pass the Supabase client in explicitly rather than relying on a global
        self.vectorstore = SupabaseVectorStore(
            client=supabase_client,
            embedding=OpenAIEmbeddings(),
            table_name="documents"
        )

    async def enhance_prompt(self, prompt: str, context_type: str) -> str:
        # Retrieve relevant documents (asimilarity_search is the async variant)
        docs = await self.vectorstore.asimilarity_search(prompt, k=5)
        # Prepend the retrieved context to the original prompt
        context = "\n\n".join(doc.page_content for doc in docs)
        return f"Context:\n{context}\n\n{prompt}"
```
### 3. Multi-Model Routing

```python
# router.py
class ModelRouter:
    def select_model(self, content_type: str, priority: str):
        if priority == "cost":
            return ("groq", "llama-3.1-70b")
        elif priority == "quality":
            return ("anthropic", "claude-3-5-sonnet")
        else:
            return ("openai", "gpt-4o-mini")
```
## Monitoring and Metrics

### Key Metrics to Track
- Job completion rate
- Average generation time per content type
- Token usage and costs per provider
- Error rates and types
- API response times
### Logging

```python
import logging
from pythonjsonlogger import jsonlogger

# Configure structured logging
logHandler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter()
logHandler.setFormatter(formatter)

logger = logging.getLogger()
logger.addHandler(logHandler)
logger.setLevel(logging.INFO)
```
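
One caveat: python-json-logger is not yet pinned in requirements.txt and would need to be added. Once configured, structured fields travel in `extra`, so the metrics above stay machine-parseable:

```python
# Example call from inside generate_content (values are illustrative)
logger.info("job_completed", extra={
    "job_id": job_id,
    "content_type": params["content_type"],
    "word_count": result["metadata"]["word_count"]
})
```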
## Cost Estimation

### MVP (Monthly)

- Railway hosting: ~$5-10
- Supabase: Free tier
- LLM costs (1,000 generations):
  - GPT-4o-mini: ~$10-20
- Total: ~$15-30/month
### V2 Scaling (Monthly)
- Railway (with Redis): ~$20-30
- Supabase Pro: $25
- LLM costs (10,000 generations): ~$100-200
- Total: ~$145-255/month
## Security Considerations

### API Security

- Implement API key authentication (see the sketch after this list)
- Rate limiting per client
- Input validation and sanitization
- Secure storage of API keys
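
A minimal sketch of the API key check as a FastAPI dependency; `CPM_API_KEY` is a hypothetical variable, not part of the environment list above:

```python
import os

from fastapi import Depends, HTTPException, Security
from fastapi.security import APIKeyHeader

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


async def require_api_key(api_key: str = Security(api_key_header)):
    # Single shared key from the environment for the MVP; per-client
    # keys stored in Supabase would be the natural next step
    if not api_key or api_key != os.getenv("CPM_API_KEY"):
        raise HTTPException(status_code=401, detail="Invalid or missing API key")


# Applied per endpoint, e.g.:
# @app.post("/generate", dependencies=[Depends(require_api_key)])
```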
### Data Privacy
- Encrypt sensitive data in Supabase
- Implement data retention policies
- Audit logging for all operations
- GDPR compliance for content storage
## Success Metrics

### MVP Goals
- 95% uptime
- <30s generation time for blogs
- <5s generation time for social posts
- 80% content approval rate
### V2 Goals
- 99% uptime
- Handle 100+ concurrent jobs
- <15s generation time for blogs
- 90% content approval rate