Data Flow and Integration Plan

Overview

This document details the complete data flow through the content generation system, showing how all modules interact and exchange information. It covers API contracts, data formats, error handling, and integration patterns between the frontend (Next.js), backend services (CPM/IM), and database (Supabase).

System Architecture Overview

graph TB
    UI[Next.js Frontend]
    CPM[Content Production Module]
    IM[Instructions Module]
    DB[(Supabase Database)]
    LLM[LLM Providers]

    UI -->|1. Create Request| CPM
    CPM -->|2. Get Prompt| IM
    IM -->|3. Fetch Strategy| DB
    IM -->|4. Return Prompt| CPM
    CPM -->|5. Generate Content| LLM
    CPM -->|6. Store Result| DB
    UI -->|7. Poll Status| DB
    DB -->|8. Real-time Updates| UI

Detailed Data Flow

1. Content Generation Request Flow

Step 1: User Initiates Request (UI → CPM)
  Trigger: User fills form and clicks "Generate"
  Data:
    - topic: string
    - content_type: blog|social|local
    - client_id: UUID
    - keywords: string[]
    - length: short|medium|long
    - priority: cost|quality|speed|balanced
    - llm_provider?: string
    - model?: string

Step 2: CPM Queues Job
  Action: Create job record, return job_id
  Response: { job_id: UUID }
  Side Effect: Background task started

Step 3: CPM Requests Prompt (CPM → IM)
  Request:
    - topic: string
    - content_type: string
    - client_id: UUID
    - keywords: string[]
  Response (returned by GET /prompt/{job_id} once the IM job completes):
    - prompt: string
    - metadata: object

Step 4: IM Loads Strategy (IM → DB)
  Queries:
    - strategies WHERE client_id = X AND active = true
    - seo_rules WHERE client_id = X
    - content_preferences WHERE client_id = X
    - prompt_templates WHERE client_id = X AND content_type = Y

Step 5: CPM Generates Content (CPM → LLM)
  Request:
    - prompt: string (from IM)
    - provider: string
    - model: string
    - parameters: object
  Response:
    - content: string
    - usage: tokens object
    - cost: float
    - latency_ms: int

Step 6: CPM Stores Result (CPM → DB)
  Updates:
    - jobs table: status = completed, result = data
    - llm_usage table: cost tracking record

Step 7: UI Monitors Progress (UI → DB)
  Method 1: Polling
    - GET /api/content/status/{job_id}
    - Interval: 1-2 seconds
  Method 2: Real-time (Supabase)
    - Subscribe to jobs table changes
    - Filter: id = job_id

2. Real-time Update Flow

// Frontend subscription
const subscription = supabase
  .channel(`job-updates-${jobId}`)
  .on(
    'postgres_changes',
    {
      event: 'UPDATE',
      schema: 'public',
      table: 'jobs',
      filter: `id=eq.${jobId}`
    },
    (payload) => {
      // Update UI with new job status
      updateJobStatus(payload.new)
    }
  )
  .subscribe()

// Backend trigger (in CPM); supabase-js runs the query when the builder is awaited
await supabase
  .from('jobs')
  .update({
    status: 'in_progress',
    updated_at: new Date().toISOString()
  })
  .eq('id', jobId)

API Contracts

CPM API

POST /generate
  Request:
    Content-Type: application/json
    Body:
      topic: string (required, min: 5 chars)
      content_type: enum[blog, social, local] (required)
      client_id: UUID (required)
      keywords: string[] (optional, max: 10)
      length: enum[short, medium, long] (optional, default: medium)
      priority: enum[cost, quality, speed, balanced] (optional, default: balanced)
      llm_provider: string (optional)
      model: string (optional)

  Response 200:
    Content-Type: application/json
    Body:
      job_id: UUID

  Response 400:
    Body:
      error: string
      details: object

GET /status/{job_id}
  Response 200:
    Body:
      job_id: UUID
      status: enum[pending, in_progress, completed, failed]
      result?: {
        content: string
        title: string
        metadata: {
          word_count: number
          keywords_used: string[]
          llm_provider: string
          model: string
          generation_cost: number
          generation_time_ms: number
          token_usage: {
            input: number
            output: number
          }
        }
      }
      error?: string
      created_at: ISO8601
      updated_at: ISO8601

  Response 404:
    Body:
      error: "Job not found"
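
The request rules for POST /generate can be checked with plain Python before any framework is involved. The sketch below uses only the standard library; the function name `validate_generate_body` and the error message wording are illustrative assumptions, while the field rules and defaults come from the contract above.

```python
import uuid
from typing import Any, Dict, List

CONTENT_TYPES = {"blog", "social", "local"}
LENGTHS = {"short", "medium", "long"}
PRIORITIES = {"cost", "quality", "speed", "balanced"}


def validate_generate_body(body: Dict[str, Any]) -> Dict[str, Any]:
    """Return a normalized body, or raise ValueError listing every violation."""
    errors: List[str] = []

    topic = body.get("topic")
    if not isinstance(topic, str) or len(topic.strip()) < 5:
        errors.append("topic: required, min 5 chars")

    if body.get("content_type") not in CONTENT_TYPES:
        errors.append("content_type: must be blog, social, or local")

    client_id = None
    try:
        client_id = str(uuid.UUID(str(body.get("client_id"))))
    except ValueError:
        errors.append("client_id: must be a UUID")

    keywords = body.get("keywords", [])
    if not isinstance(keywords, list) or len(keywords) > 10:
        errors.append("keywords: at most 10 strings")

    length = body.get("length", "medium")        # contract default
    if length not in LENGTHS:
        errors.append("length: must be short, medium, or long")

    priority = body.get("priority", "balanced")  # contract default
    if priority not in PRIORITIES:
        errors.append("priority: must be cost, quality, speed, or balanced")

    if errors:
        raise ValueError("; ".join(errors))

    return {
        "topic": topic.strip(),
        "content_type": body["content_type"],
        "client_id": client_id,
        "keywords": keywords,
        "length": length,
        "priority": priority,
        "llm_provider": body.get("llm_provider"),
        "model": body.get("model"),
    }
```

Collecting all violations before raising lets the 400 response report every problem in `details` at once instead of one per round trip.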

IM API

POST /generate-prompt
  Request:
    Body:
      topic: string (required)
      content_type: enum[blog, social, local] (required)
      client_id: UUID (required)
      keywords: string[] (optional)

  Response 200:
    Body:
      job_id: UUID

GET /prompt/{job_id}
  Response 200 (completed):
    Body:
      prompt: string
      metadata: {
        client_id: UUID
        content_type: string
        keywords: string[]
        template_version: string
        refined: boolean
        strategy_version: number
      }

  Response 200 (pending/in_progress/failed):
    Body:
      status: string
      error?: string
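
Because GET /prompt/{job_id} returns two different 200 bodies, a caller has to branch on the body shape rather than the status code. A minimal sketch of that branching follows; the function name and the three state labels are assumptions made for illustration.

```python
from typing import Any, Dict, Optional, Tuple


def interpret_prompt_response(body: Dict[str, Any]) -> Tuple[str, Optional[str]]:
    """Map a GET /prompt/{job_id} body to (state, payload).

    state is "done" (payload is the prompt), "failed" (payload is the
    error message), or "waiting" (payload is None).
    """
    if "prompt" in body:
        return "done", body["prompt"]
    if body.get("status") == "failed":
        return "failed", body.get("error", "unknown error")
    return "waiting", None
```

A caller would keep polling while the state is `"waiting"` and stop on either of the other two.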

Data Models

Database Schema Relationships

-- Core relationships
jobs
  ├── client_id -> clients.id
  └── llm_usage (1:many via job_id)

clients
  ├── parent_id -> clients.id (self-referential)
  ├── strategies (1:many)
  ├── seo_rules (1:many)
  ├── content_preferences (1:many)
  └── prompt_templates (1:many)

strategies
  ├── client_id -> clients.id
  └── strategy_history (1:many via strategy_id)

prompt_jobs
  └── (standalone, referenced by job_id)

llm_usage
  └── job_id -> jobs.id
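
The foreign keys in this tree can be exercised locally with SQLite. The sketch below covers only `clients`, `jobs`, and `llm_usage`; the `status` and `cost` columns and the helper names are assumptions for illustration, and the production schema lives in Supabase (PostgreSQL), not SQLite.

```python
import sqlite3

DDL = """
CREATE TABLE clients (
    id        TEXT PRIMARY KEY,
    parent_id TEXT REFERENCES clients(id)      -- self-referential hierarchy
);
CREATE TABLE jobs (
    id        TEXT PRIMARY KEY,
    client_id TEXT NOT NULL REFERENCES clients(id),
    status    TEXT NOT NULL DEFAULT 'pending'
);
CREATE TABLE llm_usage (
    id     INTEGER PRIMARY KEY,
    job_id TEXT NOT NULL REFERENCES jobs(id),  -- many usage rows per job
    cost   REAL NOT NULL
);
"""


def build_demo_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
    conn.executescript(DDL)
    conn.execute("INSERT INTO clients (id) VALUES ('c1')")
    conn.execute("INSERT INTO jobs (id, client_id) VALUES ('j1', 'c1')")
    conn.executemany(
        "INSERT INTO llm_usage (job_id, cost) VALUES (?, ?)",
        [("j1", 0.25), ("j1", 0.5)],
    )
    return conn


def total_cost(conn: sqlite3.Connection, job_id: str) -> float:
    """Sum the usage rows attached to one job (the 1:many relationship)."""
    row = conn.execute(
        "SELECT COALESCE(SUM(cost), 0) FROM llm_usage WHERE job_id = ?",
        (job_id,),
    ).fetchone()
    return row[0]
```

Inserting an `llm_usage` row for a job that does not exist raises `sqlite3.IntegrityError`, which is the behavior the `job_id -> jobs.id` arrow implies.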

TypeScript Types (Shared)

// types/content.ts
export interface ContentRequest {
  topic: string
  content_type: 'blog' | 'social' | 'local'
  client_id: string
  keywords?: string[]
  length?: 'short' | 'medium' | 'long'
  priority?: 'cost' | 'quality' | 'speed' | 'balanced'
  llm_provider?: string
  model?: string
}

export interface Job {
  id: string
  status: 'pending' | 'in_progress' | 'completed' | 'failed'
  params: ContentRequest
  result?: ContentResult
  error?: string
  created_at: string
  updated_at: string
}

export interface ContentResult {
  content: string
  title: string
  metadata: {
    word_count: number
    keywords_used: string[]
    llm_provider: string
    model: string
    generation_cost: number
    generation_time_ms: number
    token_usage: {
      input: number
      output: number
    }
    seo_notes?: string
  }
}

export interface Strategy {
  id: string
  client_id: string
  name: string
  config: {
    content_guidelines: {
      min_words: number
      max_words: number
      required_sections: string[]
      citation_required: boolean
    }
    seo_focus: {
      primary_keywords: string[]
      keyword_density: number
      meta_description_length: number
    }
    tone: {
      formality: string
      voice: string
      engagement: string
    }
  }
  version: number
  active: boolean
}

Integration Patterns

1. Service Communication Pattern

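import asyncio
from typing import Any, Dict, Optional

import aiohttp


class ServiceError(Exception):
    """Error raised for failed service calls; carries an HTTP-style status."""

    def __init__(self, message: str, status: int = 500, data: Optional[Dict[str, Any]] = None):
        super().__init__(message)
        self.message = message
        self.status = status
        self.data = data or {}


# Base service client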
class ServiceClient:
    def __init__(self, base_url: str, timeout: int = 30):
        self.base_url = base_url
        self.timeout = timeout
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()

    async def request(self, method: str, endpoint: str, **kwargs):
        url = f"{self.base_url}{endpoint}"

        try:
            async with self.session.request(method, url, **kwargs) as response:
                data = await response.json()

                if response.status >= 400:
                    raise ServiceError(
                        f"{method} {url} failed: {data.get('error', 'Unknown error')}",
                        status=response.status,
                        data=data
                    )

                return data

        except asyncio.TimeoutError:
            raise ServiceError(f"Request to {url} timed out", status=504)
        except aiohttp.ClientError as e:
            raise ServiceError(f"Network error: {str(e)}", status=503)

# IM client implementation
class IMClient(ServiceClient):
    async def generate_prompt(self, params: Dict) -> str:
        # Start generation
        result = await self.request('POST', '/generate-prompt', json=params)
        job_id = result['job_id']

        # Poll for completion
        max_attempts = 30
        for _ in range(max_attempts):
            status = await self.request('GET', f'/prompt/{job_id}')

            if 'prompt' in status:
                return status['prompt']
            elif status.get('status') == 'failed':
                raise ServiceError(f"Prompt generation failed: {status.get('error')}")

            await asyncio.sleep(1)

        raise ServiceError("Prompt generation timed out", status=504)

2. Error Handling Pattern

# Centralized error handling
import asyncio
import logging
from typing import Dict

logger = logging.getLogger(__name__)

class ErrorHandler:
    @staticmethod
    def handle_service_error(error: Exception, context: str) -> Dict:
        if isinstance(error, ServiceError):
            return {
                'error': f"{context}: {error.message}",
                'status_code': error.status,
                'details': error.data
            }
        elif isinstance(error, asyncio.TimeoutError):
            return {
                'error': f"{context}: Request timed out",
                'status_code': 504
            }
        else:
            logger.error(f"Unexpected error in {context}: {str(error)}")
            return {
                'error': f"{context}: Internal server error",
                'status_code': 500
            }

# Usage in CPM
try:
    prompt = await im_client.generate_prompt(params)
except Exception as e:
    error_response = ErrorHandler.handle_service_error(e, "Prompt generation")

    # Update job with error
    await supabase.table('jobs').update({
        'status': 'failed',
        'error': error_response['error']
    }).eq('id', job_id).execute()

    return

3. Retry Logic

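import asyncio

import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class RetryableClient:
    # Note: ServiceClient.request wraps aiohttp and timeout errors in ServiceError,
    # so this predicate only fires for callables that let the raw exceptions through.
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError)),
        reraise=True,  # surface the original exception instead of tenacity.RetryError
    )
    async def call_with_retry(self, func, *args, **kwargs):
        return await func(*args, **kwargs)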

# Usage
client = RetryableClient()
result = await client.call_with_retry(
    llm_client.generate,
    prompt=prompt,
    provider='openai'
)
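
To make the retry behavior concrete, here is a standard-library sketch of the same idea: a bounded number of attempts with a capped exponential delay between them. It is not tenacity's implementation, and the exact wait schedule tenacity produces may differ; `retry_async` and its parameters are names chosen for this illustration.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple, Type, TypeVar

T = TypeVar("T")


async def retry_async(
    func: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 2.0,
    max_delay: float = 10.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call func until it succeeds or `attempts` calls have failed."""
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await func()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles after each failure, capped at max_delay.
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            await sleep(delay)
    raise RuntimeError("unreachable")


def demo() -> Tuple[str, List[float]]:
    calls = {"n": 0}
    delays: List[float] = []

    async def flaky() -> str:
        # Fails on the first two calls, succeeds on the third.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("transient")
        return "ok"

    async def fake_sleep(seconds: float) -> None:
        delays.append(seconds)  # record instead of actually waiting

    result = asyncio.run(retry_async(flaky, (ConnectionError,), sleep=fake_sleep))
    return result, delays
```

Injecting `sleep` keeps the example fast to run and makes the delay schedule observable; exceptions outside `retry_on` propagate immediately without a retry.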

Caching Strategy

1. Prompt Caching

# Redis-based caching for prompts
import hashlib
import json
import os
from typing import Dict, Optional

import redis.asyncio as redis  # async client, so cache calls do not block the event loop

class PromptCache:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.ttl = 3600  # 1 hour

    def _generate_key(self, params: Dict) -> str:
        # Create deterministic key from parameters (default=str covers UUID values)
        sorted_params = json.dumps(params, sort_keys=True, default=str)
        return f"prompt:{hashlib.sha256(sorted_params.encode()).hexdigest()}"

    async def get(self, params: Dict) -> Optional[str]:
        key = self._generate_key(params)
        value = await self.redis.get(key)
        return value.decode() if value else None

    async def set(self, params: Dict, prompt: str):
        key = self._generate_key(params)
        await self.redis.setex(key, self.ttl, prompt)

# Integration in IM
cache = PromptCache(os.getenv('REDIS_URL'))

async def generate_prompt_with_cache(params: Dict) -> str:
    # Try cache first
    cached = await cache.get(params)
    if cached:
        return cached

    # Generate if not cached
    prompt = await build_prompt(params)
    await cache.set(params, prompt)

    return prompt
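
The cache only helps if equivalent requests map to the same key. The standalone sketch below reproduces the key derivation with the standard library and shows what does and does not count as equivalent; `prompt_cache_key` is a name chosen for this illustration.

```python
import hashlib
import json
from typing import Any, Dict


def prompt_cache_key(params: Dict[str, Any]) -> str:
    """Derive a deterministic cache key from request parameters."""
    # sort_keys makes dict ordering irrelevant; default=str covers UUID values.
    canonical = json.dumps(params, sort_keys=True, default=str)
    return "prompt:" + hashlib.sha256(canonical.encode()).hexdigest()


key_a = prompt_cache_key({"topic": "Solar", "keywords": ["solar", "renewable"]})
key_b = prompt_cache_key({"keywords": ["solar", "renewable"], "topic": "Solar"})
key_c = prompt_cache_key({"topic": "Solar", "keywords": ["renewable", "solar"]})

print(key_a == key_b)  # same fields in a different order
print(key_a == key_c)  # same keywords in a different list order
```

Dictionary key order does not affect the key, but list order does, so `key_a` and `key_c` differ. If keyword order should not matter for prompt generation, sorting `keywords` before hashing would raise the hit rate; that is a design choice rather than something the source specifies.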

2. Strategy Caching

# In-memory caching with TTL
from datetime import datetime, timedelta, timezone
from typing import Dict, Tuple, Optional

class StrategyCache:
    def __init__(self, ttl_minutes: int = 5):
        self.cache: Dict[str, Tuple[Dict, datetime]] = {}
        self.ttl = timedelta(minutes=ttl_minutes)

    def get(self, client_id: str) -> Optional[Dict]:
        if client_id in self.cache:
            data, timestamp = self.cache[client_id]
            # Timezone-aware now(); datetime.utcnow() is deprecated since Python 3.12
            if datetime.now(timezone.utc) - timestamp < self.ttl:
                return data
            else:
                del self.cache[client_id]
        return None

    def set(self, client_id: str, data: Dict):
        self.cache[client_id] = (data, datetime.now(timezone.utc))

    def invalidate(self, client_id: str):
        self.cache.pop(client_id, None)

Monitoring and Observability

1. Structured Logging

import structlog
from typing import Dict

# Configure structured logging
structlog.configure(
    processors=[
        structlog.stdlib.filter_by_level,
        structlog.stdlib.add_logger_name,
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.UnicodeDecoder(),
        structlog.processors.JSONRenderer()
    ],
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()

# Usage in data flow
async def process_content_request(job_id: str, params: Dict):
    logger.info("content_request_started", 
        job_id=job_id,
        client_id=params['client_id'],
        content_type=params['content_type']
    )

    try:
        # Process request
        result = await generate_content(params)

        logger.info("content_request_completed",
            job_id=job_id,
            word_count=result['metadata']['word_count'],
            cost=result['metadata']['generation_cost'],
            duration_ms=result['metadata']['generation_time_ms']
        )

    except Exception as e:
        logger.error("content_request_failed",
            job_id=job_id,
            error=str(e),
            exc_info=True
        )
        raise

2. Metrics Collection

import time
from typing import Dict

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
# Note: a client_id label creates one time series per client; keep the client count bounded.
content_requests = Counter('content_requests_total', 'Total content requests', ['client_id', 'content_type'])
content_errors = Counter('content_errors_total', 'Total content generation errors', ['client_id', 'error_type'])
generation_duration = Histogram('generation_duration_seconds', 'Content generation duration')
active_jobs = Gauge('active_jobs', 'Number of active jobs')

# Usage in flow
async def generate_with_metrics(params: Dict):
    content_requests.labels(
        client_id=params['client_id'],
        content_type=params['content_type']
    ).inc()

    active_jobs.inc()
    # Time the awaited call explicitly; wrapping a coroutine function with
    # Histogram.time() may record only coroutine creation, not its execution.
    start = time.perf_counter()
    try:
        return await generate_content(params)
    except Exception as e:
        content_errors.labels(
            client_id=params['client_id'],
            error_type=type(e).__name__
        ).inc()
        raise
    finally:
        generation_duration.observe(time.perf_counter() - start)
        active_jobs.dec()

Security Considerations

1. API Authentication

# JWT validation middleware
import os

from fastapi import HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

security = HTTPBearer()

async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    token = credentials.credentials

    try:
        payload = jwt.decode(
            token,
            os.getenv('JWT_SECRET'),
            algorithms=['HS256']
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

# Apply to endpoints
@app.post("/generate", dependencies=[Security(verify_token)])
async def generate_content(request: ContentRequest):
    # Process request
    pass

2. Input Validation

# Written against the Pydantic v1 API (validator, max_items)
from typing import Literal
from uuid import UUID

from pydantic import BaseModel, validator, constr, conlist

class SecureContentRequest(BaseModel):
    topic: constr(min_length=5, max_length=1000, strip_whitespace=True)
    content_type: Literal['blog', 'social', 'local']
    client_id: UUID
    keywords: conlist(str, max_items=10) = []

    @validator('topic')
    def sanitize_topic(cls, v):
        # Reject values containing common script-injection markers
        forbidden_patterns = ['<script', 'javascript:', 'onerror=']
        lowered = v.lower()
        for pattern in forbidden_patterns:
            if pattern in lowered:
                raise ValueError("Invalid content in topic")
        return v

    @validator('keywords', each_item=True)
    def validate_keywords(cls, v):
        # Strip first so surrounding whitespace does not count toward the limit
        v = v.strip()
        if len(v) > 50:
            raise ValueError("Keyword too long")
        return v

Performance Optimization

1. Connection Pooling

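# Shared Supabase client (one instance per process)
import os
from functools import lru_cache

from supabase import create_client

@lru_cache(maxsize=1)
def get_supabase_client():
    # supabase-py talks to Supabase over HTTP, so there is no client-side
    # database pool to size here; reusing one client keeps its HTTP
    # connections alive. Database-side pooling is configured in Supabase itself.
    return create_client(
        os.getenv('SUPABASE_URL'),
        os.getenv('SUPABASE_KEY'),
    )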

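# HTTP connection pool for service calls
import aiohttp

def create_http_session() -> aiohttp.ClientSession:
    # Call from within a running event loop (for example at application startup)
    connector = aiohttp.TCPConnector(
        limit=100,  # Total connection pool size
        limit_per_host=30  # Per-host limit
    )
    return aiohttp.ClientSession(connector=connector)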

2. Batch Processing

# Batch job processing for efficiency
import asyncio
from collections import defaultdict
from typing import List

async def process_batch_requests(requests: List[ContentRequest]):
    # Group requests by client so each strategy is fetched once
    grouped = defaultdict(list)
    for req in requests:
        grouped[req.client_id].append(req)

    # Fetch one strategy per client, concurrently rather than one await per group
    client_ids = list(grouped)
    strategies = await asyncio.gather(*(get_strategy(cid) for cid in client_ids))

    # Process all requests in parallel
    tasks = [
        process_with_strategy(req, strategy)
        for cid, strategy in zip(client_ids, strategies)
        for req in grouped[cid]
    ]

    # return_exceptions=True keeps one failure from aborting the rest of the batch
    return await asyncio.gather(*tasks, return_exceptions=True)

Disaster Recovery

1. Job Recovery

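-- Mark stuck jobs as failed (in_progress for > 5 minutes)
UPDATE jobs 
SET status = 'failed',
    error = 'Job timeout - marked for retry',
    updated_at = NOW()
WHERE status = 'in_progress' 
  AND updated_at < NOW() - INTERVAL '5 minutes';

-- Retry timed-out jobs once: relabel the original row in the same statement
-- so that re-running this query does not enqueue duplicates
WITH to_retry AS (
    UPDATE jobs
    SET error = 'Job timeout - retried',
        updated_at = NOW()
    WHERE status = 'failed'
      AND error = 'Job timeout - marked for retry'
      AND created_at > NOW() - INTERVAL '1 hour'
    RETURNING params, client_id
)
INSERT INTO jobs (id, status, params, created_at, updated_at, client_id)
SELECT 
    gen_random_uuid(),
    'pending',
    params,
    NOW(),
    NOW(),
    client_id
FROM to_retry;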

2. Circuit Breaker Pattern

# Assumes the `circuitbreaker` package (pip install circuitbreaker),
# in a version that supports decorating async functions
from circuitbreaker import CircuitBreaker

class ServiceCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.breaker = CircuitBreaker(
            failure_threshold=failure_threshold,
            recovery_timeout=recovery_timeout
        )

    async def call_service(self, func, *args, **kwargs):
        @self.breaker
        async def wrapped():
            return await func(*args, **kwargs)

        try:
            return await wrapped()
        except Exception:
            # Covers both the failure that trips the breaker and calls
            # rejected while it is open
            if self.breaker.opened:
                # Use fallback
                return await self.fallback_response(*args, **kwargs)
            raise

    async def fallback_response(self, *args, **kwargs):
        # Return cached or default response
        return {"error": "Service temporarily unavailable", "fallback": True}
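
For readers unfamiliar with the pattern, the state machine behind a circuit breaker fits in a few lines. The sketch below is a synchronous, standard-library illustration of the closed, open, and half-open states, not the library used above; `SimpleCircuitBreaker`, `CircuitOpenError`, and the injectable `clock` are assumptions made for the example.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class SimpleCircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.recovery_timeout:
            return "half_open"  # recovery window elapsed: allow a trial call
        return "open"

    def call(self, func: Callable[[], T]) -> T:
        if self.state == "open":
            raise CircuitOpenError("circuit open; call rejected")
        try:
            result = func()
        except Exception:
            self._failures += 1
            # A failed trial call, or reaching the threshold, (re)opens the circuit.
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            raise
        # Any success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

While the circuit is open, calls fail fast without touching the downstream service, which is the point at which a fallback response like the one above would be returned.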

Testing Strategy

1. Integration Tests

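# Test complete flow
# `app` is the CPM FastAPI application, imported from wherever it is defined
import asyncio

import pytest
from httpx import ASGITransport, AsyncClient

@pytest.mark.asyncio
async def test_content_generation_flow():
    # Recent httpx releases drop the `app=` shortcut; pass an ASGITransport instead
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        # Step 1: Create request
        response = await client.post("/generate", json={
            "topic": "Solar energy benefits",
            "content_type": "blog",
            # Placeholder UUID; the contract requires client_id to be a UUID
            "client_id": "00000000-0000-0000-0000-000000000001",
            "keywords": ["solar", "renewable"]
        })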

        assert response.status_code == 200
        job_id = response.json()["job_id"]

        # Step 2: Poll for completion
        max_attempts = 10
        for _ in range(max_attempts):
            status_response = await client.get(f"/status/{job_id}")
            status_data = status_response.json()

            if status_data["status"] == "completed":
                assert "content" in status_data["result"]
                assert len(status_data["result"]["content"]) > 100
                break

            await asyncio.sleep(1)
        else:
            pytest.fail("Job did not complete in time")

2. Load Testing

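# Locust load test
import random

from locust import HttpUser, task, between

class ContentGenerationUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        # Job IDs collected from /generate responses, used by check_status
        self.job_ids = []

    @task
    def create_blog_content(self):
        response = self.client.post("/generate", json={
            "topic": f"Test topic {random.randint(1, 1000)}",
            "content_type": "blog",
            # Placeholder UUID; the contract requires client_id to be a UUID
            "client_id": "00000000-0000-0000-0000-000000000002",
            "keywords": ["test", "load"]
        })
        if response.status_code == 200:
            self.job_ids.append(response.json()["job_id"])

    @task
    def check_status(self):
        if self.job_ids:
            job_id = random.choice(self.job_ids)
            # Group all status URLs under one name in the Locust statistics
            self.client.get(f"/status/{job_id}", name="/status/[job_id]")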

V2 Enhancements

1. Event-Driven Architecture

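# Kafka integration for event streaming
import json
from datetime import datetime, timezone
from typing import Dict

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer

class EventBus:
    def __init__(self, bootstrap_servers: str):
        self.bootstrap_servers = bootstrap_servers  # also needed by subscribe()
        self.producer = AIOKafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )

    async def start(self):
        # The producer must be started before the first send
        await self.producer.start()

    async def stop(self):
        await self.producer.stop()

    async def publish(self, topic: str, event: Dict):
        # send_and_wait returns once the broker has acknowledged the message
        await self.producer.send_and_wait(topic, value={
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'event': event
        })

    async def subscribe(self, topic: str, handler):
        consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            value_deserializer=lambda v: json.loads(v.decode())
        )

        await consumer.start()
        try:
            async for msg in consumer:
                await handler(msg.value)
        finally:
            # Always leave the consumer group cleanly
            await consumer.stop()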

2. GraphQL API

type Query {
  job(id: ID!): Job
  jobs(clientId: ID!, status: JobStatus, limit: Int = 10): [Job!]!
  analytics(clientId: ID!, dateRange: DateRange!): Analytics!
}

type Mutation {
  generateContent(input: ContentInput!): GenerateContentResponse!
  retryJob(id: ID!): Job!
  cancelJob(id: ID!): Job!
}

type Subscription {
  jobUpdates(id: ID!): Job!
  clientActivity(clientId: ID!): ActivityEvent!
}

type Job {
  id: ID!
  status: JobStatus!
  params: ContentParams!
  result: ContentResult
  error: String
  createdAt: DateTime!
  updatedAt: DateTime!
}

3. Distributed Tracing

# OpenTelemetry integration
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

async def traced_content_generation(job_id: str, params: Dict):
    with tracer.start_as_current_span("content_generation") as span:
        span.set_attribute("job.id", job_id)
        span.set_attribute("client.id", params['client_id'])
        span.set_attribute("content.type", params['content_type'])

        try:
            # Get prompt
            with tracer.start_as_current_span("get_prompt"):
                prompt = await get_prompt(params)

            # Generate content
            with tracer.start_as_current_span("llm_generation"):
                content = await generate_content(prompt)

            span.set_status(Status(StatusCode.OK))
            return content

        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise