Data Flow and Integration Plan
Overview
This document details the complete data flow through the content generation system, showing how all modules interact and exchange information. It covers API contracts, data formats, error handling, and integration patterns between the frontend (Next.js), backend services (CPM/IM), and database (Supabase).
System Architecture Overview
graph TB
    UI[Next.js Frontend]
    CPM[Content Production Module]
    IM[Instructions Module]
    DB[(Supabase Database)]
    LLM[LLM Providers]

    UI -->|1. Create Request| CPM
    CPM -->|2. Get Prompt| IM
    IM -->|3. Fetch Strategy| DB
    IM -->|4. Return Prompt| CPM
    CPM -->|5. Generate Content| LLM
    CPM -->|6. Store Result| DB
    UI -->|7. Poll Status| DB
    DB -->|8. Real-time Updates| UI
Detailed Data Flow
1. Content Generation Request Flow
Step 1: User Initiates Request (UI → CPM)
Trigger: User fills form and clicks "Generate"
Data:
- topic: string
- content_type: blog|social|local
- client_id: UUID
- keywords: string[]
- length: short|medium|long
- priority: cost|quality|speed|balanced
- llm_provider?: string
- model?: string
Step 2: CPM Queues Job
Action: Create job record, return job_id
Response: { job_id: UUID }
Side Effect: Background task started
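A minimal sketch of Steps 1-2, assuming a FastAPI-based CPM and the async Supabase client used elsewhere in this document; ContentRequest is the request model from the Input Validation section, while process_content_request is the pipeline coroutine shown under Structured Logging:
# Hypothetical CPM endpoint: persist a pending job, then hand off to a background task
from uuid import uuid4
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

@app.post("/generate")
async def generate(request: ContentRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid4())
    # Store the job as pending so the UI can start polling immediately
    await supabase.table('jobs').insert({
        'id': job_id,
        'status': 'pending',
        'params': request.dict(),  # assumed JSON-serializable
        'client_id': str(request.client_id),
    }).execute()
    # Run the generation pipeline without blocking the HTTP response
    background_tasks.add_task(process_content_request, job_id, request.dict())
    return {"job_id": job_id}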
Step 3: CPM Requests Prompt (CPM → IM)
Request:
- topic: string
- content_type: string
- client_id: UUID
- keywords: string[]
Response:
- prompt: string
- metadata: object
Step 4: IM Loads Strategy (IM → DB)
Queries:
- strategies WHERE client_id = X AND active = true
- seo_rules WHERE client_id = X
- content_preferences WHERE client_id = X
- prompt_templates WHERE client_id = X AND content_type = Y
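A sketch of Step 4 using the async Supabase client, assuming the table and column names listed above; load_client_context and the single-active-strategy assumption are illustrative:
# Hypothetical IM helper: gather everything needed to build a prompt
async def load_client_context(client_id: str, content_type: str) -> dict:
    strategy = await supabase.table('strategies').select('*') \
        .eq('client_id', client_id).eq('active', True).execute()
    seo_rules = await supabase.table('seo_rules').select('*') \
        .eq('client_id', client_id).execute()
    preferences = await supabase.table('content_preferences').select('*') \
        .eq('client_id', client_id).execute()
    templates = await supabase.table('prompt_templates').select('*') \
        .eq('client_id', client_id).eq('content_type', content_type).execute()
    return {
        'strategy': strategy.data[0] if strategy.data else None,
        'seo_rules': seo_rules.data,
        'preferences': preferences.data,
        'templates': templates.data,
    }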
Step 5: CPM Generates Content (CPM → LLM)
Request:
- prompt: string (from IM)
- provider: string
- model: string
- parameters: object
Response:
- content: string
- usage: tokens object
- cost: float
- latency_ms: int
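A sketch of Step 5 against an OpenAI-compatible provider using the openai Python SDK (v1+); the per-token pricing table is a placeholder, since real rates vary by model and provider:
# Hypothetical LLM call returning the response shape described above
import time
from openai import AsyncOpenAI

PRICE_PER_1K = {"gpt-4o": {"input": 0.0025, "output": 0.01}}  # placeholder rates

async def call_llm(prompt: str, model: str = "gpt-4o") -> dict:
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    start = time.monotonic()
    response = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
    )
    latency_ms = int((time.monotonic() - start) * 1000)
    usage = response.usage
    rates = PRICE_PER_1K.get(model, {"input": 0.0, "output": 0.0})
    return {
        "content": response.choices[0].message.content,
        "usage": {"input": usage.prompt_tokens, "output": usage.completion_tokens},
        "cost": (usage.prompt_tokens * rates["input"]
                 + usage.completion_tokens * rates["output"]) / 1000,
        "latency_ms": latency_ms,
    }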
Step 6: CPM Stores Result (CPM → DB)
Updates:
- jobs table: status = completed, result = data
- llm_usage table: cost tracking record
Step 7: UI Monitors Progress (UI → DB)
Method 1: Polling
- GET /api/content/status/{job_id}
- Interval: 1-2 seconds
Method 2: Real-time (Supabase)
- Subscribe to jobs table changes
- Filter: id = job_id
2. Real-time Update Flow
// Frontend subscription
const subscription = supabase
  .channel(`job-updates-${jobId}`)
  .on(
    'postgres_changes',
    {
      event: 'UPDATE',
      schema: 'public',
      table: 'jobs',
      filter: `id=eq.${jobId}`
    },
    (payload) => {
      // Update UI with new job status
      updateJobStatus(payload.new)
    }
  )
  .subscribe()
// Backend trigger (in CPM) - supabase-js builders are thenable, so awaiting
// the chain runs the query (the JS client has no .execute() method)
await supabase
  .from('jobs')
  .update({
    status: 'in_progress',
    updated_at: new Date().toISOString()
  })
  .eq('id', jobId)
API Contracts
CPM API
POST /generate

Request:
  Content-Type: application/json
  Body:
    topic: string (required, min: 5 chars)
    content_type: enum[blog, social, local] (required)
    client_id: UUID (required)
    keywords: string[] (optional, max: 10)
    length: enum[short, medium, long] (optional, default: medium)
    priority: enum[cost, quality, speed, balanced] (optional, default: balanced)
    llm_provider: string (optional)
    model: string (optional)

Response 200:
  Content-Type: application/json
  Body:
    job_id: UUID

Response 400:
  Body:
    error: string
    details: object
GET /status/{job_id}

Response 200:
  Body:
    job_id: UUID
    status: enum[pending, in_progress, completed, failed]
    result?: {
      content: string
      title: string
      metadata: {
        word_count: number
        keywords_used: string[]
        llm_provider: string
        model: string
        generation_cost: number
        generation_time_ms: number
        token_usage: {
          input: number
          output: number
        }
      }
    }
    error?: string
    created_at: ISO8601
    updated_at: ISO8601

Response 404:
  Body:
    error: "Job not found"
IM API
POST /generate-prompt

Request:
  Body:
    topic: string (required)
    content_type: enum[blog, social, local] (required)
    client_id: UUID (required)
    keywords: string[] (optional)

Response 200:
  Body:
    job_id: UUID

GET /prompt/{job_id}

Response 200 (completed):
  Body:
    prompt: string
    metadata: {
      client_id: UUID
      content_type: string
      keywords: string[]
      template_version: string
      refined: boolean
      strategy_version: number
    }

Response 200 (pending/in_progress/failed):
  Body:
    status: string
    error?: string (present when status = failed)
Data Models
Database Schema Relationships
-- Core relationships
jobs
├── client_id -> clients.id
└── llm_usage (1:many via job_id)
clients
├── parent_id -> clients.id (self-referential)
├── strategies (1:many)
├── seo_rules (1:many)
├── content_preferences (1:many)
└── prompt_templates (1:many)
strategies
├── client_id -> clients.id
└── strategy_history (1:many via strategy_id)
prompt_jobs
└── (standalone, referenced by job_id)
llm_usage
└── job_id -> jobs.id
TypeScript Types (Shared)
// types/content.ts
export interface ContentRequest {
  topic: string
  content_type: 'blog' | 'social' | 'local'
  client_id: string
  keywords?: string[]
  length?: 'short' | 'medium' | 'long'
  priority?: 'cost' | 'quality' | 'speed' | 'balanced'
  llm_provider?: string
  model?: string
}

export interface Job {
  id: string
  status: 'pending' | 'in_progress' | 'completed' | 'failed'
  params: ContentRequest
  result?: ContentResult
  error?: string
  created_at: string
  updated_at: string
}

export interface ContentResult {
  content: string
  title: string
  metadata: {
    word_count: number
    keywords_used: string[]
    llm_provider: string
    model: string
    generation_cost: number
    generation_time_ms: number
    token_usage: {
      input: number
      output: number
    }
    seo_notes?: string
  }
}

export interface Strategy {
  id: string
  client_id: string
  name: string
  config: {
    content_guidelines: {
      min_words: number
      max_words: number
      required_sections: string[]
      citation_required: boolean
    }
    seo_focus: {
      primary_keywords: string[]
      keyword_density: number
      meta_description_length: number
    }
    tone: {
      formality: string
      voice: string
      engagement: string
    }
  }
  version: number
  active: boolean
}
Integration Patterns
1. Service Communication Pattern
# Base service client
import asyncio
from typing import Dict, Optional

import aiohttp

class ServiceError(Exception):
    """Raised when a downstream service call fails."""
    def __init__(self, message: str, status: int = 500, data: Optional[Dict] = None):
        super().__init__(message)
        self.message = message
        self.status = status
        self.data = data or {}

class ServiceClient:
    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url
        self.timeout = timeout
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def request(self, method: str, endpoint: str, **kwargs):
        url = f"{self.base_url}{endpoint}"
        try:
            async with self.session.request(method, url, **kwargs) as response:
                data = await response.json()
                if response.status >= 400:
                    raise ServiceError(
                        f"{method} {url} failed: {data.get('error', 'Unknown error')}",
                        status=response.status,
                        data=data
                    )
                return data
        except asyncio.TimeoutError:
            raise ServiceError(f"Request to {url} timed out", status=504)
        except aiohttp.ClientError as e:
            raise ServiceError(f"Network error: {str(e)}", status=503)

# IM client implementation
class IMClient(ServiceClient):
    async def generate_prompt(self, params: Dict) -> str:
        # Start generation
        result = await self.request('POST', '/generate-prompt', json=params)
        job_id = result['job_id']
        # Poll for completion
        max_attempts = 30
        for _ in range(max_attempts):
            status = await self.request('GET', f'/prompt/{job_id}')
            if 'prompt' in status:
                return status['prompt']
            elif status.get('status') == 'failed':
                raise ServiceError(f"Prompt generation failed: {status.get('error')}")
            await asyncio.sleep(1)
        raise ServiceError("Prompt generation timed out", status=504)
2. Error Handling Pattern
# Centralized error handling
import asyncio
import logging
from typing import Dict

logger = logging.getLogger(__name__)

class ErrorHandler:
    @staticmethod
    def handle_service_error(error: Exception, context: str) -> Dict:
        if isinstance(error, ServiceError):
            return {
                'error': f"{context}: {error.message}",
                'status_code': error.status,
                'details': error.data
            }
        elif isinstance(error, asyncio.TimeoutError):
            return {
                'error': f"{context}: Request timed out",
                'status_code': 504
            }
        else:
            logger.error(f"Unexpected error in {context}: {str(error)}")
            return {
                'error': f"{context}: Internal server error",
                'status_code': 500
            }

# Usage in CPM (inside the job-processing coroutine)
try:
    prompt = await im_client.generate_prompt(params)
except Exception as e:
    error_response = ErrorHandler.handle_service_error(e, "Prompt generation")
    # Update job with error
    await supabase.table('jobs').update({
        'status': 'failed',
        'error': error_response['error']
    }).eq('id', job_id).execute()
    return
3. Retry Logic
import asyncio

import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class RetryableClient:
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError))
    )
    async def call_with_retry(self, func, *args, **kwargs):
        return await func(*args, **kwargs)

# Usage
client = RetryableClient()
result = await client.call_with_retry(
    llm_client.generate,
    prompt=prompt,
    provider='openai'
)
Caching Strategy
1. Prompt Caching
# Redis-based caching for prompts (redis.asyncio, so cache calls don't block the event loop)
import hashlib
import json
import os
from typing import Dict, Optional

import redis.asyncio as redis

class PromptCache:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.ttl = 3600  # 1 hour

    def _generate_key(self, params: Dict) -> str:
        # Create deterministic key from parameters
        sorted_params = json.dumps(params, sort_keys=True)
        return f"prompt:{hashlib.sha256(sorted_params.encode()).hexdigest()}"

    async def get(self, params: Dict) -> Optional[str]:
        key = self._generate_key(params)
        value = await self.redis.get(key)
        return value.decode() if value else None

    async def set(self, params: Dict, prompt: str):
        key = self._generate_key(params)
        await self.redis.setex(key, self.ttl, prompt)

# Integration in IM
cache = PromptCache(os.getenv('REDIS_URL'))

async def generate_prompt_with_cache(params: Dict) -> str:
    # Try cache first
    cached = await cache.get(params)
    if cached:
        return cached
    # Generate if not cached
    prompt = await build_prompt(params)
    await cache.set(params, prompt)
    return prompt
2. Strategy Caching
# In-memory caching with TTL
from datetime import datetime, timedelta
from typing import Dict, Tuple, Optional

class StrategyCache:
    def __init__(self, ttl_minutes: int = 5):
        self.cache: Dict[str, Tuple[Dict, datetime]] = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def get(self, client_id: str) -> Optional[Dict]:
        if client_id in self.cache:
            data, timestamp = self.cache[client_id]
            if datetime.utcnow() - timestamp < self.ttl:
                return data
            else:
                del self.cache[client_id]
        return None

    def set(self, client_id: str, data: Dict):
        self.cache[client_id] = (data, datetime.utcnow())

    def invalidate(self, client_id: str):
        self.cache.pop(client_id, None)
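How the cache might wrap the Step 4 strategy query in a cache-aside pattern; get_strategy is an illustrative helper, and invalidate should be called from whatever path updates strategies:
# Hypothetical cache-aside wiring for strategy lookups
strategy_cache = StrategyCache(ttl_minutes=5)

async def get_strategy(client_id: str) -> Optional[Dict]:
    cached = strategy_cache.get(client_id)
    if cached is not None:
        return cached
    resp = await supabase.table('strategies').select('*') \
        .eq('client_id', client_id).eq('active', True).execute()
    strategy = resp.data[0] if resp.data else None
    if strategy is not None:
        strategy_cache.set(client_id, strategy)
    return strategy

# On strategy update: strategy_cache.invalidate(client_id)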
Monitoring and Observability
1. Structured Logging
import structlog
from typing import Dict

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage in data flow
async def process_content_request(job_id: str, params: Dict):
    logger.info("content_request_started",
        job_id=job_id,
        client_id=params['client_id'],
        content_type=params['content_type']
    )
    try:
        # Process request
        result = await generate_content(params)
        logger.info("content_request_completed",
            job_id=job_id,
            word_count=result['metadata']['word_count'],
            cost=result['metadata']['generation_cost'],
            duration_ms=result['metadata']['generation_time_ms']
        )
    except Exception as e:
        logger.error("content_request_failed",
            job_id=job_id,
            error=str(e),
            exc_info=True
        )
        raise
2. Metrics Collection
from typing import Dict

from prometheus_client import Counter, Histogram, Gauge

# Define metrics; the client_id label assumes a bounded client count,
# since high-cardinality labels are expensive in Prometheus
content_requests = Counter('content_requests_total', 'Total content requests', ['client_id', 'content_type'])
content_errors = Counter('content_errors_total', 'Total content generation errors', ['client_id', 'error_type'])
generation_duration = Histogram('generation_duration_seconds', 'Content generation duration')
active_jobs = Gauge('active_jobs', 'Number of active jobs')

# Usage in flow
@generation_duration.time()
async def generate_with_metrics(params: Dict):
    content_requests.labels(
        client_id=params['client_id'],
        content_type=params['content_type']
    ).inc()
    active_jobs.inc()
    try:
        result = await generate_content(params)
        return result
    except Exception as e:
        content_errors.labels(
            client_id=params['client_id'],
            error_type=type(e).__name__
        ).inc()
        raise
    finally:
        active_jobs.dec()
Security Considerations
1. API Authentication
# JWT validation middleware (PyJWT)
import os

import jwt
from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    token = credentials.credentials
    try:
        payload = jwt.decode(
            token,
            os.getenv('JWT_SECRET'),
            algorithms=['HS256']
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# Apply to endpoints
@app.post("/generate", dependencies=[Security(verify_token)])
async def generate_content(request: ContentRequest):
    # Process request
    pass
2. Input Validation
# Pydantic v1-style request model
from typing import Literal
from uuid import UUID

from pydantic import BaseModel, validator, constr, conlist

class SecureContentRequest(BaseModel):
    topic: constr(min_length=5, max_length=1000, strip_whitespace=True)
    content_type: Literal['blog', 'social', 'local']
    client_id: UUID
    keywords: conlist(str, max_items=10) = []

    @validator('topic')
    def sanitize_topic(cls, v):
        # Reject potential injection attempts
        forbidden_patterns = ['<script', 'javascript:', 'onerror=']
        for pattern in forbidden_patterns:
            if pattern.lower() in v.lower():
                raise ValueError("Invalid content in topic")
        return v

    @validator('keywords', each_item=True)
    def validate_keywords(cls, v):
        if len(v) > 50:
            raise ValueError("Keyword too long")
        return v.strip()
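Validation failures surface as a pydantic ValidationError before any LLM call is made; for example (placeholder UUID):
# Rejecting a topic that contains an injection pattern
from pydantic import ValidationError

try:
    SecureContentRequest(
        topic="<script>alert(1)</script> solar savings",
        content_type="blog",
        client_id="8f14e45f-ceea-467f-a0e6-8a0c6b7f3e21",  # placeholder
    )
except ValidationError as e:
    print(e)  # reports the sanitize_topic failure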
Performance Optimization
1. Connection Pooling
# Supabase client reuse - supabase-py does not expose pool settings directly;
# Postgres-side pooling is handled by Supabase's pooler (e.g. pgBouncer/Supavisor)
import os
from functools import lru_cache

from supabase import create_client

@lru_cache(maxsize=1)
def get_supabase_client():
    # One client per process; callers share its underlying HTTP connections
    return create_client(
        os.getenv('SUPABASE_URL'),
        os.getenv('SUPABASE_KEY')
    )

# HTTP connection pool for service calls
import aiohttp

connector = aiohttp.TCPConnector(
    limit=100,         # Total connection pool size
    limit_per_host=30  # Per-host limit
)
session = aiohttp.ClientSession(connector=connector)
2. Batch Processing
# Batch job processing for efficiency
import asyncio
from typing import List

async def process_batch_requests(requests: List[ContentRequest]):
    # Group by similar parameters for potential caching
    grouped = {}
    for req in requests:
        key = (req.client_id, req.content_type)
        if key not in grouped:
            grouped[key] = []
        grouped[key].append(req)

    # Process groups in parallel
    tasks = []
    for (client_id, content_type), group_requests in grouped.items():
        # Fetch strategy once per group
        strategy = await get_strategy(client_id)
        for req in group_requests:
            task = process_with_strategy(req, strategy)
            tasks.append(task)

    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results
Disaster Recovery
1. Job Recovery
-- Fail stuck jobs (in_progress for > 5 minutes) so they become eligible for retry
UPDATE jobs
SET status = 'failed',
    error = 'Job timeout - marked for retry',
    updated_at = NOW()
WHERE status = 'in_progress'
  AND updated_at < NOW() - INTERVAL '5 minutes';

-- Retry failed jobs by re-queuing fresh copies
INSERT INTO jobs (id, status, params, created_at, updated_at, client_id)
SELECT
    gen_random_uuid(),
    'pending',
    params,
    NOW(),
    NOW(),
    client_id
FROM jobs
WHERE status = 'failed'
  AND error LIKE '%timeout%'
  AND created_at > NOW() - INTERVAL '1 hour';
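These statements need something to run them on a schedule; a minimal sketch using asyncpg against the same Postgres database, where the DSN and interval are assumptions:
# Hypothetical recovery loop run as a background task in the CPM
import asyncio
import asyncpg

RECOVERY_SQL = """
    UPDATE jobs
    SET status = 'failed',
        error = 'Job timeout - marked for retry',
        updated_at = NOW()
    WHERE status = 'in_progress'
      AND updated_at < NOW() - INTERVAL '5 minutes'
"""

async def recovery_loop(dsn: str, interval_s: int = 60):
    conn = await asyncpg.connect(dsn)
    try:
        while True:
            await conn.execute(RECOVERY_SQL)
            await asyncio.sleep(interval_s)
    finally:
        await conn.close()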
2. Circuit Breaker Pattern
# Assumes the third-party circuitbreaker package; other libraries (e.g. pybreaker)
# expose a similar but not identical API
from circuitbreaker import CircuitBreaker

class ServiceCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.breaker = CircuitBreaker(
            failure_threshold=failure_threshold,
            recovery_timeout=recovery_timeout
        )

    async def call_service(self, func, *args, **kwargs):
        # The shared breaker instance tracks failures across calls
        @self.breaker
        async def wrapped():
            return await func(*args, **kwargs)
        try:
            return await wrapped()
        except Exception:
            if self.breaker.state == 'open':
                # Use fallback while the breaker is open
                return await self.fallback_response(*args, **kwargs)
            raise

    async def fallback_response(self, *args, **kwargs):
        # Return cached or default response
        return {"error": "Service temporarily unavailable", "fallback": True}
Testing Strategy
1. Integration Tests
# Test complete flow
import asyncio

import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_content_generation_flow():
    async with AsyncClient(app=app, base_url="http://test") as client:
        # Step 1: Create request
        response = await client.post("/generate", json={
            "topic": "Solar energy benefits",
            "content_type": "blog",
            "client_id": "test-client-id",
            "keywords": ["solar", "renewable"]
        })
        assert response.status_code == 200
        job_id = response.json()["job_id"]

        # Step 2: Poll for completion
        max_attempts = 10
        for _ in range(max_attempts):
            status_response = await client.get(f"/status/{job_id}")
            status_data = status_response.json()
            if status_data["status"] == "completed":
                assert "content" in status_data["result"]
                assert len(status_data["result"]["content"]) > 100
                break
            await asyncio.sleep(1)
        else:
            pytest.fail("Job did not complete in time")
2. Load Testing
# Locust load test
import random

from locust import HttpUser, task, between

class ContentGenerationUser(HttpUser):
    wait_time = between(1, 3)

    @task
    def create_blog_content(self):
        self.client.post("/generate", json={
            "topic": f"Test topic {random.randint(1, 1000)}",
            "content_type": "blog",
            "client_id": "load-test-client",
            "keywords": ["test", "load"]
        })

    @task
    def check_status(self):
        # Assume we have some job IDs from previous requests
        if hasattr(self, 'job_ids') and self.job_ids:
            job_id = random.choice(self.job_ids)
            self.client.get(f"/status/{job_id}")
V2 Enhancements
1. Event-Driven Architecture
# Kafka integration for event streaming
import json
from datetime import datetime
from typing import Dict

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

class EventBus:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )

    async def start(self):
        # aiokafka producers must be started before the first send
        await self.producer.start()

    async def publish(self, topic: str, event: Dict):
        await self.producer.send(topic, value={
            'timestamp': datetime.utcnow().isoformat(),
            'event': event
        })

    async def subscribe(self, topic: str, handler):
        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda v: json.loads(v.decode())
        )
        await consumer.start()
        async for msg in consumer:
            await handler(msg.value)
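A hedged example of how the CPM might publish job lifecycle events so downstream services react without polling the database; the topic name and environment variable are assumptions:
# Hypothetical publisher wiring
import os

bus = EventBus(os.getenv('KAFKA_BOOTSTRAP_SERVERS', 'localhost:9092'))
# Call `await bus.start()` once at application startup

async def on_job_completed(job_id: str, result: dict):
    await bus.publish('content.jobs', {
        'type': 'job_completed',
        'job_id': job_id,
        'cost': result['metadata']['generation_cost'],
    })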
2. GraphQL API
type Query {
  job(id: ID!): Job
  jobs(clientId: ID!, status: JobStatus, limit: Int = 10): [Job!]!
  analytics(clientId: ID!, dateRange: DateRange!): Analytics!
}

type Mutation {
  generateContent(input: ContentInput!): GenerateContentResponse!
  retryJob(id: ID!): Job!
  cancelJob(id: ID!): Job!
}

type Subscription {
  jobUpdates(id: ID!): Job!
  clientActivity(clientId: ID!): ActivityEvent!
}

type Job {
  id: ID!
  status: JobStatus!
  params: ContentParams!
  result: ContentResult
  error: String
  createdAt: DateTime!
  updatedAt: DateTime!
}
3. Distributed Tracing
# OpenTelemetry integration
from typing import Dict

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

async def traced_content_generation(job_id: str, params: Dict):
    with tracer.start_as_current_span("content_generation") as span:
        span.set_attribute("job.id", job_id)
        span.set_attribute("client.id", params['client_id'])
        span.set_attribute("content.type", params['content_type'])
        try:
            # Get prompt
            with tracer.start_as_current_span("get_prompt"):
                prompt = await get_prompt(params)
            # Generate content
            with tracer.start_as_current_span("llm_generation"):
                content = await generate_content(prompt)
            span.set_status(Status(StatusCode.OK))
            return content
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise