
HG Content Generation System - Developer Guide

Table of Contents

  1. Development Setup and Prerequisites
  2. Project Structure and Architecture
  3. Code Style and Conventions
  4. Testing Strategies and Frameworks
  5. Contributing Guidelines and Workflow
  6. Module Development Guides
  7. API Development and Integration
  8. Database Development and Migrations
  9. Debugging and Troubleshooting
  10. Performance Optimization Techniques

Development Setup and Prerequisites

System Requirements

Node.js Environment

  • Node.js: >= 20.0.0 (LTS recommended; required for Turbo dev scripts)
  • pnpm: >= 8.0.0 (install via corepack enable pnpm)
  • npm: For global packages only

Python Environment

  • Python: >= 3.12
  • pip: Latest version
  • venv: Virtual environment management

Database and Services

  • Supabase CLI: For local database development
  • Docker: For containerized services (optional)
  • Redis: For caching and job queues

Initial Setup

1. Clone and Install Dependencies

# Clone the repository
git clone https://github.com/your-org/hg-content.git
cd hg-content

# Enable pnpm via Corepack (Node 20+)
corepack enable pnpm

# Install Node.js dependencies
pnpm install

# Set up Python environments for each service
cd apps/cpm
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
pip install -r requirements.txt

cd ../im
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

cd ../../smm
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt

2. Environment Configuration

Create environment files for each service:

Frontend (.env.local)

# Supabase Configuration
NEXT_PUBLIC_SUPABASE_URL=http://localhost:54321
NEXT_PUBLIC_SUPABASE_ANON_KEY=your-local-anon-key
SUPABASE_SERVICE_ROLE_KEY=your-local-service-role-key

# Local service URLs
NEXT_PUBLIC_CPM_API_URL=http://localhost:8001
NEXT_PUBLIC_SMM_API_URL=http://localhost:8002
NEXT_PUBLIC_IM_API_URL=http://localhost:8003

# Development flags
NODE_ENV=development
NEXT_PUBLIC_DEBUG=true

Backend Services (.env)

# Database
SUPABASE_URL=http://localhost:54321
SUPABASE_SERVICE_ROLE_KEY=your-local-service-role-key

# Redis (optional for local development)
REDIS_URL=redis://localhost:6379

# LLM API Keys (use your own keys)
OPENAI_API_KEY=sk-your-openai-key
ANTHROPIC_API_KEY=sk-ant-your-anthropic-key
GOOGLE_API_KEY=your-google-ai-key
GROQ_API_KEY=gsk_your-groq-key
OPENROUTER_API_KEY=sk-or-your-openrouter-key
OPENROUTER_DEFAULT_MODEL=openai/gpt-4o-mini
OPENROUTER_ORIGIN=http://localhost:3000
OPENROUTER_APP_NAME="HG Content (Local)"
OPENROUTER_TIMEOUT=120

# Inter-service communication
IM_BASE_URL=http://localhost:8003
SMM_BASE_URL=http://localhost:8002
CPM_BASE_URL=http://localhost:8001

# CPM client authentication (JSON map of client_id -> plaintext key)
CPM_CLIENT_API_KEYS={"550e8400-e29b-41d4-a716-446655440000":"cpm_test_example_key"}

# Development settings
LOG_LEVEL=DEBUG
ENVIRONMENT=development
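
For reference, here is a minimal sketch of how a backend service might load and check the CPM_CLIENT_API_KEYS map at startup. Only the variable name comes from the .env above; the helper names are illustrative, not the actual CPM implementation:

# Hypothetical sketch: parse CPM_CLIENT_API_KEYS from the environment.
import hmac
import json
import os

def load_client_api_keys() -> dict[str, str]:
    """Parse CPM_CLIENT_API_KEYS into a {client_id: api_key} dict."""
    raw = os.getenv("CPM_CLIENT_API_KEYS", "{}")
    try:
        keys = json.loads(raw)
    except json.JSONDecodeError as e:
        raise RuntimeError(f"CPM_CLIENT_API_KEYS is not valid JSON: {e}")
    if not isinstance(keys, dict):
        raise RuntimeError("CPM_CLIENT_API_KEYS must be a JSON object")
    return keys

def is_authorized(client_id: str, api_key: str) -> bool:
    """Compare a presented key against the stored map in constant time."""
    expected = load_client_api_keys().get(client_id)
    return expected is not None and hmac.compare_digest(expected, api_key)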

3. Database Setup

# Install Supabase CLI
npm install -g supabase

# Start local Supabase instance
supabase start

# Push database schema
supabase db push

# Run seed data (optional)
cd scripts
python seed_database.py

4. Start Development Environment

# Start all services with Turbo
pnpm dev

# Or start services individually:
# Terminal 1: Frontend
cd apps/frontend && pnpm dev

# Terminal 2: CPM Service
cd apps/cpm && source venv/bin/activate && uvicorn app:app --reload --port 8001

# Terminal 3: SMM Service
cd smm && source venv/bin/activate && uvicorn app.api:app --reload --port 8002

# Terminal 4: IM Service (when available)
cd apps/im && source venv/bin/activate && uvicorn app:app --reload --port 8003

Development Tools Setup

VSCode Configuration

Create .vscode/settings.json:

{
  "python.defaultInterpreterPath": "./venv/bin/python",
  "python.formatting.provider": "black",
  "python.linting.enabled": true,
  "python.linting.pylintEnabled": true,
  "typescript.preferences.importModuleSpecifier": "relative",
  "editor.formatOnSave": true,
  "editor.codeActionsOnSave": {
    "source.organizeImports": true,
    "source.fixAll.eslint": true
  },
  "[python]": {
    "editor.defaultFormatter": "ms-python.black-formatter"
  },
  "[typescript]": {
    "editor.defaultFormatter": "esbenp.prettier-vscode"
  },
  "[typescriptreact]": {
    "editor.defaultFormatter": "esbenp.prettier-vscode"
  }
}

Git Hooks Setup

# Install commit linting and pre-commit tooling
npm install -g @commitlint/cli @commitlint/config-conventional
pip install pre-commit

# Set up pre-commit
pre-commit install

Project Structure and Architecture

Monorepo Organization

hg-content/
├── apps/                          # Application modules
│   ├── frontend/                  # Next.js web application
│   │   ├── app/                   # Next.js App Router
│   │   ├── components/            # React components
│   │   ├── lib/                   # Utilities and configurations
│   │   ├── hooks/                 # Custom React hooks
│   │   ├── stores/                # Zustand state management
│   │   └── __tests__/             # Frontend tests
│   ├── cpm/                       # Content Production Module
│   │   ├── app.py                 # FastAPI application
│   │   ├── content_processor.py   # Core content processing
│   │   ├── llm_client.py          # LLM provider clients
│   │   ├── database.py            # Database operations
│   │   └── tests/                 # CPM tests
│   └── im/                        # Instructions Module (future)
├── smm/                           # Strategy Management Module
│   ├── app/                       # FastAPI application
│   ├── lib/                       # Core business logic
│   ├── sql/                       # Database migrations
│   └── tests/                     # SMM tests
├── packages/                      # Shared utilities (future)
├── supabase/                      # Database schema and migrations
├── docs/                          # Documentation
├── scripts/                       # Development and deployment scripts
├── .github/                       # CI/CD workflows
├── package.json                   # Root package configuration
├── turbo.json                     # Turborepo configuration
└── pnpm-workspace.yaml            # pnpm workspace configuration

Architecture Patterns

1. Microservices Architecture

  • Service Isolation: Each module runs independently
  • API Communication: RESTful APIs between services (see the sketch after this list)
  • Database Per Service: Logical separation with shared Supabase instance
  • Independent Deployment: Each service can be deployed separately
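
As a sketch of what those RESTful service-to-service calls look like in practice, here is a hypothetical CPM-side helper that fetches a client strategy from the SMM over HTTP. Only SMM_BASE_URL comes from the environment configuration above; the route shape and helper name are assumptions:

# Illustrative inter-service call: CPM asking SMM for a client strategy.
import os
import httpx

async def fetch_client_strategy(client_id: str) -> dict:
    base_url = os.getenv("SMM_BASE_URL", "http://localhost:8002")
    async with httpx.AsyncClient(base_url=base_url, timeout=10.0) as client:
        # Hypothetical SMM route; see the SMM section for the real strategy APIs
        response = await client.get(f"/clients/{client_id}/strategies")
        response.raise_for_status()
        return response.json()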

2. Frontend Architecture

  • Next.js App Router: File-based routing with server/client components
  • Component Composition: Reusable UI components with Radix UI
  • State Management: Zustand for global state, React Query for server state
  • Type Safety: Full TypeScript coverage with strict configuration

3. Backend Architecture

  • FastAPI: Async Python web framework
  • Dependency Injection: Service layer pattern (see the sketch after this list)
  • Database Abstraction: Supabase client with typed models
  • Error Handling: Structured exception handling with proper HTTP status codes
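
A minimal sketch of the dependency-injection pattern as FastAPI expresses it: handlers receive services via Depends() rather than constructing them, which is also what makes the dependency_overrides trick in the testing section possible. Names here are illustrative:

# Dependency injection with FastAPI: the handler asks for a JobService
# and FastAPI's Depends() supplies it (or test code overrides it).
from fastapi import Depends, FastAPI

app = FastAPI()

class JobService:
    async def get_job(self, job_id: str) -> dict:
        return {"id": job_id, "status": "pending"}

def get_job_service() -> JobService:
    # Swap this provider out in tests via app.dependency_overrides
    return JobService()

@app.get("/jobs/{job_id}")
async def read_job(job_id: str, service: JobService = Depends(get_job_service)):
    return await service.get_job(job_id)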

Code Style and Conventions

TypeScript/JavaScript Conventions

1. Naming Conventions

// Components: PascalCase
export const ContentRequestForm = () => {};

// Functions and variables: camelCase
const handleSubmit = () => {};
const userData = {};

// Constants: SCREAMING_SNAKE_CASE
const MAX_RETRY_ATTEMPTS = 3;

// Types and interfaces: PascalCase
interface UserData {
  id: string;
  name: string;
}

// Enums: PascalCase with descriptive values
enum JobStatus {
  Pending = 'pending',
  InProgress = 'in-progress',
  Completed = 'completed',
  Failed = 'failed'
}

2. File Organization

// Component file structure
// React imports
import { useState, useEffect } from 'react';

// External library imports
import { useQuery } from '@tanstack/react-query';

// Internal imports (alias paths, then relative)
import { Button } from '@/components/ui/button';
import { useContentStore } from '../stores/useContentStore';
import { ContentRequest } from '../types';

// Component definition
export const ContentRequestForm = ({ onSubmit }: Props) => {
  // State declarations
  const [isLoading, setIsLoading] = useState(false);

  // Custom hooks
  const { createRequest } = useContentStore();

  // Effects
  useEffect(() => {
    // Effect logic
  }, []);

  // Event handlers
  const handleSubmit = async (data: ContentRequest) => {
    // Handler logic
  };

  // Render
  return (
    <form onSubmit={handleSubmit}>
      {/* JSX */}
    </form>
  );
};

3. ESLint Configuration

{
  "extends": [
    "next/core-web-vitals",
    "@typescript-eslint/recommended",
    "prettier"
  ],
  "rules": {
    "react-hooks/exhaustive-deps": "warn",
    "@typescript-eslint/no-unused-vars": "error",
    "@typescript-eslint/explicit-function-return-type": "warn",
    "prefer-const": "error",
    "no-var": "error"
  }
}

Python Conventions

1. Naming Conventions

# Classes: PascalCase
class ContentProcessor:
    pass

# Functions and variables: snake_case
def process_content(request_data: dict) -> dict:
    user_id = request_data.get('user_id')
    return {}

# Constants: SCREAMING_SNAKE_CASE
MAX_TOKENS = 4000
DEFAULT_PROVIDER = 'openai'

# Private methods (inside a class): leading underscore
class RequestValidator:
    def _validate_request(self, data: dict) -> bool:
        return bool(data)

2. Type Hints

from typing import Optional, List, Dict, Any
from pydantic import BaseModel

# Function signatures
async def generate_content(
    prompt: str,
    provider: str = 'openai',
    max_tokens: Optional[int] = None
) -> Dict[str, Any]:
    pass

# Pydantic models for API
class ContentRequest(BaseModel):
    topic: str
    content_type: str
    keywords: List[str]
    client_id: str
    max_tokens: Optional[int] = 1000

3. Error Handling

from fastapi import HTTPException
import logging

logger = logging.getLogger(__name__)

async def process_request(request: ContentRequest):
    try:
        result = await llm_client.generate(request.prompt)
        return result
    except ValueError as e:
        logger.error(f"Invalid request data: {e}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

Documentation Standards

1. TypeScript Documentation

/**
 * Hook for managing content requests with optimistic updates
 * and error handling.
 * 
 * @param clientId - The client ID to filter requests
 * @returns Object containing request state and mutation functions
 * 
 * @example
 * ```tsx
 * const { createRequest, isLoading, error } = useContentRequests('client-123');
 * 
 * const handleSubmit = async (data) => {
 *   await createRequest(data);
 * };
 * ```
 */
export const useContentRequests = (clientId: string) => {
  // Implementation
};

2. Python Documentation

async def generate_content(
    prompt: str,
    provider: str = 'openai',
    max_tokens: Optional[int] = None
) -> Dict[str, Any]:
    """
    Generate content using the specified LLM provider.

    Args:
        prompt: The input prompt for content generation
        provider: LLM provider to use ('openai', 'anthropic', 'google', 'groq')
        max_tokens: Maximum tokens to generate (provider-specific defaults apply)

    Returns:
        Dictionary containing generated content and metadata:
        - content: Generated text content
        - tokens_used: Number of tokens consumed
        - cost_estimate: Estimated cost in USD
        - provider_model: Specific model used

    Raises:
        ValueError: If prompt is empty or provider is invalid
        HTTPException: If API request fails

    Example:
        >>> result = await generate_content("Write about AI", "openai", 1000)
        >>> print(result['content'])
    """

Testing Strategies and Frameworks

Frontend Testing

1. Test Structure

// components/__tests__/ContentRequestForm.test.tsx
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import { QueryClient, QueryClientProvider } from '@tanstack/react-query';
import { ContentRequestForm } from '../ContentRequestForm';

const createTestQueryClient = () => new QueryClient({
  defaultOptions: {
    queries: { retry: false },
    mutations: { retry: false },
  },
});

const renderWithProviders = (component: React.ReactElement) => {
  const queryClient = createTestQueryClient();
  return render(
    <QueryClientProvider client={queryClient}>
      {component}
    </QueryClientProvider>
  );
};

describe('ContentRequestForm', () => {
  it('should submit form with valid data', async () => {
    const onSubmit = jest.fn();
    renderWithProviders(<ContentRequestForm onSubmit={onSubmit} />);

    // Fill form
    fireEvent.change(screen.getByLabelText(/topic/i), {
      target: { value: 'AI in Healthcare' }
    });

    // Submit
    fireEvent.click(screen.getByRole('button', { name: /generate/i }));

    await waitFor(() => {
      expect(onSubmit).toHaveBeenCalledWith({
        topic: 'AI in Healthcare',
        // ... other expected data
      });
    });
  });
});

2. Integration Tests

// integration/ContentRequestForm.integration.test.tsx
import { setupServer } from 'msw/node';
import { rest } from 'msw';
import { renderWithProviders } from '../test-utils';

const server = setupServer(
  rest.post('/api/content/generate', (req, res, ctx) => {
    return res(
      ctx.json({
        jobId: 'job-123',
        status: 'pending'
      })
    );
  })
);

beforeAll(() => server.listen());
afterEach(() => server.resetHandlers());
afterAll(() => server.close());

describe('Content Request Integration', () => {
  it('should create content request and show job status', async () => {
    // Test implementation
  });
});

3. Performance Tests

// performance/ContentRequestForm.performance.test.tsx
import { measureRender } from '../test-utils/performance';

describe('ContentRequestForm Performance', () => {
  it('should render within acceptable time limits', async () => {
    const renderTime = await measureRender(<ContentRequestForm />);
    expect(renderTime).toBeLessThan(100); // milliseconds
  });
});

Backend Testing

1. FastAPI Test Setup

# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock
from app import app
from database import get_database

@pytest.fixture
def client():
    """Test client for FastAPI app"""
    return TestClient(app)

@pytest.fixture
def mock_database():
    """Mock database for testing"""
    return AsyncMock()

@pytest.fixture(autouse=True)
def override_dependencies(mock_database):
    """Override app dependencies with mocks"""
    app.dependency_overrides[get_database] = lambda: mock_database
    yield
    app.dependency_overrides.clear()

2. API Endpoint Tests

# tests/test_api.py
import pytest
from unittest.mock import patch, AsyncMock

def test_generate_content_success(client, mock_database):
    """Test successful content generation"""
    # Mock database response
    mock_database.create_job.return_value = {
        'id': 'job-123',
        'status': 'pending'
    }

    with patch('llm_client.OpenAIClient.generate') as mock_generate:
        mock_generate.return_value = {
            'content': 'Generated content',
            'tokens_used': 150
        }

        response = client.post('/generate', json={
            'topic': 'AI in Healthcare',
            'content_type': 'blog_post',
            'client_id': 'client-123'
        })

        assert response.status_code == 200
        assert response.json()['status'] == 'pending'

def test_generate_content_validation_error(client):
    """Test validation error handling"""
    response = client.post('/generate', json={
        'topic': '',  # Invalid empty topic
        'content_type': 'blog_post'
    })

    assert response.status_code == 422

@pytest.mark.asyncio
async def test_async_content_processing():
    """Test async content processing logic"""
    from content_processor import ContentProcessor

    processor = ContentProcessor()
    result = await processor.process_content({
        'prompt': 'Test prompt',
        'provider': 'openai'
    })

    assert 'content' in result
    assert result['tokens_used'] > 0

3. Database Tests

# tests/test_database.py
import pytest
from database import Database
from unittest.mock import AsyncMock

@pytest.fixture
def database():
    """Database instance for testing"""
    db = Database()
    # Replace the Supabase client with a mock so tests stay isolated
    db.client = AsyncMock()
    return db

@pytest.mark.asyncio
async def test_create_job(database):
    """Test job creation"""
    job_data = {
        'topic': 'Test topic',
        'client_id': 'client-123',
        'status': 'pending'
    }

    database.client.table.return_value.insert.return_value.execute.return_value = {
        'data': [{'id': 'job-123', **job_data}]
    }

    result = await database.create_job(job_data)
    assert result['id'] == 'job-123'
    assert result['status'] == 'pending'

Test Commands

# Frontend tests
cd apps/frontend
pnpm test                    # Run all tests
pnpm test:watch             # Watch mode
pnpm test:coverage          # Coverage report

# Backend tests
cd apps/cpm
source venv/bin/activate
pytest                      # Run all tests
pytest -v                   # Verbose output
pytest --cov=app           # Coverage report
pytest -k "test_api"       # Run specific tests
pytest --asyncio-mode=auto # Async test mode

Contributing Guidelines and Workflow

Git Workflow

1. Branch Naming Convention

# Feature branches
feature/add-authentication-system
feature/improve-content-quality

# Bug fixes
bugfix/fix-job-status-updates
bugfix/handle-api-timeout-errors

# Hotfixes
hotfix/critical-security-patch

# Documentation
docs/update-api-documentation
docs/add-deployment-guide

2. Commit Message Format

Follow Conventional Commits specification:

# Format: <type>[optional scope]: <description>

# Examples:
feat(cpm): add support for Anthropic Claude models
fix(frontend): resolve job status polling issue
docs(api): update authentication endpoint documentation
test(smm): add integration tests for strategy management
refactor(database): optimize query performance
chore(deps): update dependencies to latest versions

# Breaking changes:
feat(api)!: change authentication to OAuth 2.0

# Multi-line for complex changes:
feat(cpm): implement batch content generation

- Add support for processing multiple requests
- Implement queue-based batch processing
- Add cost optimization for batch operations
- Update API documentation

Closes #123

3. Pull Request Process

PR Title Format:

<type>: <brief description>

Examples:
feat: Add real-time job status updates
fix: Resolve authentication token refresh issue
docs: Update developer setup guide

PR Description Template:

## Description
Brief description of changes made.

## Type of Change
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Documentation update
- [ ] Performance improvement
- [ ] Code refactoring

## How Has This Been Tested?
- [ ] Unit tests pass
- [ ] Integration tests pass
- [ ] Manual testing completed
- [ ] Performance testing (if applicable)

## Testing Steps
1. Step one
2. Step two
3. Step three

## Screenshots/Videos (if applicable)
Add screenshots or videos to help explain your changes.

## Checklist
- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes

## Related Issues
Closes #123
Relates to #456

4. Code Review Guidelines

For Reviewers:

  • Review within 24 hours
  • Focus on logic, security, performance, and maintainability
  • Provide constructive feedback with examples
  • Approve only when all concerns are addressed

Review Checklist:

- [ ] Code follows project conventions
- [ ] Tests are comprehensive and pass
- [ ] Documentation is updated
- [ ] Security implications considered
- [ ] Performance impact assessed
- [ ] Backward compatibility maintained

Development Workflow

1. Setting Up for New Features

# Start from main branch
git checkout main
git pull origin main

# Create feature branch
git checkout -b feature/add-new-feature

# Make changes and commit regularly
git add .
git commit -m "feat: implement core functionality"

# Push and create PR
git push origin feature/add-new-feature

2. Code Quality Checks

# Run before committing
pnpm lint              # Check linting issues
pnpm format            # Format code
pnpm test              # Run tests
pnpm build             # Test build process

# Python services
black .                # Format Python code
pylint app/           # Check Python linting
pytest --cov=app      # Run tests with coverage

3. Pre-commit Hooks

Create .pre-commit-config.yaml:

repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files

  - repo: https://github.com/psf/black
    rev: 22.3.0
    hooks:
      - id: black
        language_version: python3

  - repo: https://github.com/pre-commit/mirrors-eslint
    rev: v8.44.0
    hooks:
      - id: eslint
        files: \.(js|ts|tsx)$
        types: [file]

Module Development Guides

Content Production Module (CPM) Development

1. Architecture Overview

# app.py - FastAPI application entry point
from fastapi import FastAPI, BackgroundTasks
from content_processor import ContentProcessor
from llm_client import LLMClientFactory
from database import Database

app = FastAPI(title="Content Production Module")
processor = ContentProcessor()
db = Database()

@app.post("/generate")
async def generate_content(
    request: ContentRequest,
    background_tasks: BackgroundTasks
):
    # Queue processing job
    job = await db.create_job(request.dict())
    background_tasks.add_task(processor.process_job, job['id'])
    return {"job_id": job['id'], "status": "queued"}

2. Adding New LLM Providers

# llm_client.py
from abc import ABC, abstractmethod
from typing import Dict, Any

class BaseLLMClient(ABC):
    """Base class for all LLM providers"""

    @abstractmethod
    async def generate(self, prompt: str, **kwargs) -> Dict[str, Any]:
        """Generate content using the provider's API"""
        pass

    @abstractmethod
    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Calculate estimated cost for the request"""
        pass

class NewProviderClient(BaseLLMClient):
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.client = NewProviderSDK(api_key=api_key)

    async def generate(self, prompt: str, **kwargs) -> Dict[str, Any]:
        response = await self.client.completions.create(
            prompt=prompt,
            max_tokens=kwargs.get('max_tokens', 1000)
        )

        return {
            'content': response.text,
            'input_tokens': response.usage.prompt_tokens,
            'output_tokens': response.usage.completion_tokens,
            'model': response.model,
            'cost_estimate': self.calculate_cost(
                response.usage.prompt_tokens,
                response.usage.completion_tokens
            )
        }

    def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
        # Provider-specific pricing
        input_cost = (input_tokens / 1000) * 0.001  # $0.001 per 1K tokens
        output_cost = (output_tokens / 1000) * 0.002  # $0.002 per 1K tokens
        return input_cost + output_cost

# Register new provider
class LLMClientFactory:
    @staticmethod
    def create_client(provider: str, api_key: str) -> BaseLLMClient:
        clients = {
            'openai': OpenAIClient,
            'anthropic': AnthropicClient,
            'google': GoogleClient,
            'groq': GroqClient,
            'newprovider': NewProviderClient  # Add new provider
        }

        if provider not in clients:
            raise ValueError(f"Unsupported provider: {provider}")

        return clients[provider](api_key)
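
A hedged usage sketch for the factory (assuming the provider key is already exported, as in the .env above):

# Illustrative only: create a client via the factory and generate once.
import asyncio
import os

async def main():
    api_key = os.environ["OPENAI_API_KEY"]
    client = LLMClientFactory.create_client("openai", api_key)
    result = await client.generate("Write a two-sentence product blurb.", max_tokens=200)
    print(result["content"], result["cost_estimate"])

asyncio.run(main())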

3. Content Processing Pipeline

# content_processor.py
import asyncio
import os
from typing import Dict, Any

import structlog

from database import Database
from llm_client import LLMClientFactory

logger = structlog.get_logger()

class ContentProcessor:
    def __init__(self):
        self.llm_factory = LLMClientFactory()
        self.db = Database()

    async def process_job(self, job_id: str):
        """Process a content generation job"""
        try:
            # Update job status
            await self.db.update_job_status(job_id, 'in_progress')

            # Get job details
            job = await self.db.get_job(job_id)

            # Get client strategy (via SMM)
            strategy = await self._get_client_strategy(job['client_id'])

            # Generate optimized prompt (via IM)
            prompt = await self._generate_prompt(job, strategy)

            # Generate content
            result = await self._generate_content(prompt, job['llm_provider'])

            # Post-process content
            processed_result = await self._post_process_content(result, strategy)

            # Save results
            await self.db.update_job_result(job_id, processed_result)
            await self.db.update_job_status(job_id, 'completed')

            logger.info("Job completed successfully", job_id=job_id)

        except Exception as e:
            logger.error("Job processing failed", job_id=job_id, error=str(e))
            await self.db.update_job_status(job_id, 'failed', error_message=str(e))

    async def _generate_content(self, prompt: str, provider: str) -> Dict[str, Any]:
        """Generate content using specified provider"""
        api_key = os.getenv(f"{provider.upper()}_API_KEY")
        client = self.llm_factory.create_client(provider, api_key)

        # Implement retry logic
        for attempt in range(3):
            try:
                return await client.generate(prompt)
            except Exception as e:
                if attempt == 2:  # Last attempt
                    raise
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

Instructions Module (IM) Development

1. Prompt Template System

# lib/prompt_manager.py
from jinja2 import Template
from typing import Dict, Any, List

class PromptManager:
    def __init__(self, db: Database):
        self.db = db

    async def generate_prompt(
        self,
        client_id: str,
        content_type: str,
        context: Dict[str, Any]
    ) -> str:
        """Generate dynamic prompt based on client strategy and context"""

        # Get client-specific template
        template = await self.db.get_prompt_template(client_id, content_type)

        if not template:
            # Fall back to default template
            template = await self.db.get_default_template(content_type)

        # Render template with context
        jinja_template = Template(template['template'])
        return jinja_template.render(**context)

    async def create_template(
        self,
        client_id: str,
        name: str,
        content_type: str,
        template: str,
        variables: List[str]
    ) -> Dict[str, Any]:
        """Create new prompt template"""

        # Validate template syntax
        try:
            Template(template).render(**{var: f"test_{var}" for var in variables})
        except Exception as e:
            raise ValueError(f"Invalid template syntax: {e}")

        return await self.db.create_prompt_template({
            'client_id': client_id,
            'name': name,
            'content_type': content_type,
            'template': template,
            'variables': variables
        })

# Example template usage
BLOG_POST_TEMPLATE = """
Write a comprehensive blog post about {{ topic }}.

Target Audience: {{ target_audience | default("general audience") }}
Tone: {{ tone | default("professional") }}
Word Count: {{ word_count | default("800-1200 words") }}

{% if keywords %}
SEO Keywords to include naturally:
{% for keyword in keywords %}
- {{ keyword }}
{% endfor %}
{% endif %}

{% if brand_guidelines %}
Brand Guidelines:
{{ brand_guidelines }}
{% endif %}

Structure the post with:
1. Engaging introduction
2. Main content sections with subheadings
3. Conclusion with call-to-action

Ensure the content is:
- Original and engaging
- SEO-optimized
- Factually accurate
- Aligned with brand voice
"""

Strategy Management Module (SMM) Development

1. Client Hierarchy Management

# lib/client_manager.py
from typing import List, Dict, Any, Optional

from database import Database

class ClientManager:
    def __init__(self, db: Database):
        self.db = db

    async def create_client(
        self,
        name: str,
        client_type: str,
        parent_id: Optional[str] = None,
        metadata: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Create new client with proper hierarchy"""

        # Validate hierarchy rules
        if client_type == 'root' and parent_id:
            raise ValueError("Root clients cannot have parents")

        if client_type == 'customer' and not parent_id:
            raise ValueError("Customer clients must have an agency parent")

        # Create client
        client = await self.db.create_client({
            'name': name,
            'slug': self._generate_slug(name),
            'type': client_type,
            'parent_id': parent_id,
            'metadata': metadata or {}
        })

        # Set up default strategies
        await self._create_default_strategies(client['id'])

        return client

    async def get_client_hierarchy(self, client_id: str) -> Dict[str, Any]:
        """Get complete client hierarchy"""
        client = await self.db.get_client(client_id)

        hierarchy = {
            'client': client,
            'parent': None,
            'children': [],
            'ancestors': [],
            'descendants': []
        }

        # Get parent chain
        if client['parent_id']:
            hierarchy['ancestors'] = await self._get_ancestors(client_id)
            hierarchy['parent'] = hierarchy['ancestors'][-1] if hierarchy['ancestors'] else None

        # Get children
        hierarchy['children'] = await self.db.get_client_children(client_id)

        # Get all descendants
        hierarchy['descendants'] = await self._get_descendants(client_id)

        return hierarchy

    async def _create_default_strategies(self, client_id: str):
        """Create default strategies for new client"""
        default_strategies = [
            {
                'name': 'Default Blog Strategy',
                'content_type': 'blog_post',
                'configuration': {
                    'tone': 'professional',
                    'target_audience': 'business professionals',
                    'word_count_range': [800, 1200],
                    'seo_focus': True,
                    'include_cta': True
                }
            },
            {
                'name': 'Default Social Media Strategy',
                'content_type': 'social_media',
                'configuration': {
                    'platforms': ['linkedin', 'twitter'],
                    'character_limits': {'twitter': 280, 'linkedin': 3000},
                    'hashtag_count': {'twitter': 2, 'linkedin': 5},
                    'tone': 'engaging'
                }
            }
        ]

        for strategy in default_strategies:
            strategy['client_id'] = client_id
            await self.db.create_strategy(strategy)

2. Strategy Configuration System

# lib/strategy_config.py
from pydantic import BaseModel, validator
from typing import Dict, Any, List, Optional
from enum import Enum

class ContentType(str, Enum):
    BLOG_POST = 'blog_post'
    SOCIAL_MEDIA = 'social_media'
    EMAIL_CAMPAIGN = 'email_campaign'
    PRESS_RELEASE = 'press_release'
    PRODUCT_DESCRIPTION = 'product_description'

class ToneStyle(str, Enum):
    PROFESSIONAL = 'professional'
    CASUAL = 'casual'
    FRIENDLY = 'friendly'
    AUTHORITATIVE = 'authoritative'
    CONVERSATIONAL = 'conversational'

class StrategyConfiguration(BaseModel):
    """Base strategy configuration model"""
    content_type: ContentType
    tone: ToneStyle
    target_audience: str
    brand_guidelines: Optional[str] = None
    seo_keywords: List[str] = []

    class Config:
        use_enum_values = True

class BlogPostStrategy(StrategyConfiguration):
    """Blog post specific strategy configuration"""
    word_count_min: int = 500
    word_count_max: int = 2000
    include_introduction: bool = True
    include_conclusion: bool = True
    include_cta: bool = True
    subheading_count: int = 3
    meta_description_length: int = 160

    @validator('word_count_max')
    def validate_word_count(cls, v, values):
        if 'word_count_min' in values and v <= values['word_count_min']:
            raise ValueError('Max word count must be greater than min word count')
        return v

class SocialMediaStrategy(StrategyConfiguration):
    """Social media specific strategy configuration"""
    platforms: List[str]
    character_limits: Dict[str, int]
    hashtag_count: Dict[str, int] = {}
    include_emoji: bool = True
    post_timing: Optional[Dict[str, str]] = None

    @validator('platforms')
    def validate_platforms(cls, v):
        valid_platforms = ['twitter', 'linkedin', 'facebook', 'instagram']
        for platform in v:
            if platform not in valid_platforms:
                raise ValueError(f'Invalid platform: {platform}')
        return v

# Strategy factory
class StrategyFactory:
    @staticmethod
    def create_strategy(content_type: str, config: Dict[str, Any]) -> StrategyConfiguration:
        strategy_classes = {
            'blog_post': BlogPostStrategy,
            'social_media': SocialMediaStrategy,
            # Add more strategy types as needed
        }

        strategy_class = strategy_classes.get(content_type, StrategyConfiguration)
        return strategy_class(**config)
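
A short usage example: build and validate a blog post strategy from a raw configuration dict (invalid values raise a pydantic ValidationError):

# Illustrative usage of the strategy factory.
config = {
    'content_type': 'blog_post',
    'tone': 'professional',
    'target_audience': 'marketing managers',
    'seo_keywords': ['content automation'],
    'word_count_min': 800,
    'word_count_max': 1200,
}
strategy = StrategyFactory.create_strategy('blog_post', config)
print(strategy.word_count_max)  # 1200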

Frontend Development Guide

1. Component Development Pattern

// components/content/ContentRequestForm.tsx
import { useState } from 'react';
import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';
import { z } from 'zod';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Select, SelectContent, SelectItem, SelectTrigger, SelectValue } from '@/components/ui/select';
import { Form, FormField, FormItem, FormLabel, FormControl, FormMessage } from '@/components/ui/form';
import { toast } from 'sonner'; // adjust to your project's toast utility
import { useCreateContentRequest } from '@/hooks/useCreateContentRequest';
import { useContentStore } from '@/stores/useContentStore';

// Validation schema
const contentRequestSchema = z.object({
  topic: z.string().min(1, 'Topic is required').max(200, 'Topic too long'),
  contentType: z.enum(['blog_post', 'social_media', 'email_campaign']),
  keywords: z.array(z.string()).min(1, 'At least one keyword is required'),
  targetAudience: z.string().optional(),
  tone: z.enum(['professional', 'casual', 'friendly']).default('professional'),
  wordCount: z.number().min(100).max(5000).optional(),
});

type ContentRequestFormData = z.infer<typeof contentRequestSchema>;

interface ContentRequestFormProps {
  clientId: string;
  onSubmit?: (data: ContentRequestFormData) => void;
  onCancel?: () => void;
}

export const ContentRequestForm = ({ clientId, onSubmit, onCancel }: ContentRequestFormProps) => {
  const [isSubmitting, setIsSubmitting] = useState(false);
  const { createRequest, isLoading } = useCreateContentRequest();
  const { selectedClient } = useContentStore();

  const form = useForm<ContentRequestFormData>({
    resolver: zodResolver(contentRequestSchema),
    defaultValues: {
      topic: '',
      contentType: 'blog_post',
      keywords: [],
      tone: 'professional',
    },
  });

  const handleSubmit = async (data: ContentRequestFormData) => {
    try {
      setIsSubmitting(true);

      const request = {
        ...data,
        clientId,
        createdBy: selectedClient?.id,
      };

      const result = await createRequest(request);

      // Handle success
      onSubmit?.(data);
      form.reset();

      // Show success notification
      toast.success(`Content request created. Job ID: ${result.jobId}`);

    } catch (error) {
      console.error('Failed to create content request:', error);
      toast.error('Failed to create content request. Please try again.');
    } finally {
      setIsSubmitting(false);
    }
  };

  return (
    <Form {...form}>
      <form onSubmit={form.handleSubmit(handleSubmit)} className="space-y-6">
        <FormField
          control={form.control}
          name="topic"
          render={({ field }) => (
            <FormItem>
              <FormLabel>Topic</FormLabel>
              <FormControl>
                <Input 
                  placeholder="Enter content topic..." 
                  {...field} 
                />
              </FormControl>
              <FormMessage />
            </FormItem>
          )}
        />

        <FormField
          control={form.control}
          name="contentType"
          render={({ field }) => (
            <FormItem>
              <FormLabel>Content Type</FormLabel>
              <Select onValueChange={field.onChange} defaultValue={field.value}>
                <FormControl>
                  <SelectTrigger>
                    <SelectValue placeholder="Select content type" />
                  </SelectTrigger>
                </FormControl>
                <SelectContent>
                  <SelectItem value="blog_post">Blog Post</SelectItem>
                  <SelectItem value="social_media">Social Media</SelectItem>
                  <SelectItem value="email_campaign">Email Campaign</SelectItem>
                </SelectContent>
              </Select>
              <FormMessage />
            </FormItem>
          )}
        />

        <div className="flex gap-4">
          <Button 
            type="submit" 
            disabled={isSubmitting || isLoading}
            className="flex-1"
          >
            {isSubmitting ? 'Creating...' : 'Generate Content'}
          </Button>

          {onCancel && (
            <Button 
              type="button" 
              variant="outline" 
              onClick={onCancel}
              disabled={isSubmitting}
            >
              Cancel
            </Button>
          )}
        </div>
      </form>
    </Form>
  );
};

2. Custom Hook Development

// hooks/useCreateContentRequest.ts
import { useMutation, useQueryClient } from '@tanstack/react-query';
import { createContentRequest } from '@/lib/api/content';
import { ContentRequest, ContentRequestResponse } from '@/types';

export const useCreateContentRequest = () => {
  const queryClient = useQueryClient();

  return useMutation<ContentRequestResponse, Error, ContentRequest>({
    mutationFn: createContentRequest,
    onSuccess: (data, variables) => {
      // Invalidate and refetch jobs list
      queryClient.invalidateQueries({ queryKey: ['jobs', variables.clientId] });

      // Seed the cache with the new job so it appears immediately
      queryClient.setQueryData(['jobs', variables.clientId], (old: any) => ({
        ...old,
        data: [
          {
            id: data.jobId,
            status: 'pending',
            topic: variables.topic,
            contentType: variables.contentType,
            createdAt: new Date().toISOString(),
          },
          ...(old?.data || []),
        ],
      }));
    },
    onError: (error) => {
      console.error('Failed to create content request:', error);
    },
  });
};

3. State Management with Zustand

// stores/useContentStore.ts
import { create } from 'zustand';
import { immer } from 'zustand/middleware/immer';
import { persist } from 'zustand/middleware';
import type { ContentRequest } from '@/types';

interface Client {
  id: string;
  name: string;
  type: 'root' | 'agency' | 'customer';
  parentId?: string;
}

interface ContentState {
  selectedClient: Client | null;
  recentRequests: ContentRequest[];
  preferences: {
    defaultContentType: string;
    defaultTone: string;
    autoSave: boolean;
  };
}

interface ContentActions {
  setSelectedClient: (client: Client | null) => void;
  addRecentRequest: (request: ContentRequest) => void;
  updatePreferences: (preferences: Partial<ContentState['preferences']>) => void;
  clearRecentRequests: () => void;
}

export const useContentStore = create<ContentState & ContentActions>()(
  persist(
    immer((set) => ({
      // Initial state
      selectedClient: null,
      recentRequests: [],
      preferences: {
        defaultContentType: 'blog_post',
        defaultTone: 'professional',
        autoSave: true,
      },

      // Actions
      setSelectedClient: (client) =>
        set((state) => {
          state.selectedClient = client;
        }),

      addRecentRequest: (request) =>
        set((state) => {
          state.recentRequests.unshift(request);
          // Keep only last 10 requests
          if (state.recentRequests.length > 10) {
            state.recentRequests = state.recentRequests.slice(0, 10);
          }
        }),

      updatePreferences: (newPreferences) =>
        set((state) => {
          Object.assign(state.preferences, newPreferences);
        }),

      clearRecentRequests: () =>
        set((state) => {
          state.recentRequests = [];
        }),
    })),
    {
      name: 'content-store',
      partialize: (state) => ({
        selectedClient: state.selectedClient,
        preferences: state.preferences,
      }),
    }
  )
);

API Development and Integration

API Design Principles

1. RESTful API Structure

GET    /api/clients                    # List all clients
POST   /api/clients                    # Create new client
GET    /api/clients/{id}               # Get specific client
PUT    /api/clients/{id}               # Update client
DELETE /api/clients/{id}               # Delete client

GET    /api/clients/{id}/strategies    # Get client strategies
POST   /api/clients/{id}/strategies    # Create strategy for client

GET    /api/jobs                       # List jobs (with filtering)
POST   /api/jobs                       # Create new job
GET    /api/jobs/{id}                  # Get job details
PUT    /api/jobs/{id}                  # Update job
DELETE /api/jobs/{id}                  # Cancel job

POST   /api/content/generate           # Generate content
GET    /api/content/templates          # Get prompt templates
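
As a sketch of how one of these routes can be implemented on the FastAPI side, using the pagination envelope standardized in the next subsection (the db.list_jobs helper and its return shape are assumptions):

# Hypothetical handler for GET /api/jobs with filtering and pagination.
from fastapi import APIRouter, Query
from database import Database

router = APIRouter(prefix="/api")
db = Database()

@router.get("/jobs")
async def list_jobs(
    client_id: str | None = None,
    status: str | None = Query(None, regex="^(pending|in_progress|completed|failed|cancelled)$"),
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
):
    # Assumed helper returning (rows, total_count) for the filter window
    jobs, total = await db.list_jobs(client_id=client_id, status=status,
                                     offset=(page - 1) * page_size, limit=page_size)
    return {
        "success": True,
        "data": jobs,
        "pagination": {
            "page": page,
            "pageSize": page_size,
            "total": total,
            "totalPages": -(-total // page_size),  # ceiling division
        },
    }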

2. Request/Response Standards

// Standard API response format
interface ApiResponse<T = any> {
  success: boolean;
  data?: T;
  error?: {
    code: string;
    message: string;
    details?: any;
  };
  pagination?: {
    page: number;
    pageSize: number;
    total: number;
    totalPages: number;
  };
}

// Error response format
interface ApiError {
  code: string;
  message: string;
  details?: {
    field?: string;
    value?: any;
    constraint?: string;
  }[];
}

// Example usage
const response: ApiResponse<Job[]> = {
  success: true,
  data: [
    {
      id: 'job-123',
      status: 'completed',
      topic: 'AI in Healthcare',
      result: { content: '...' }
    }
  ],
  pagination: {
    page: 1,
    pageSize: 20,
    total: 150,
    totalPages: 8
  }
};

3. Authentication & Authorization

// middleware/auth.ts
import { NextRequest, NextResponse } from 'next/server';
import { createMiddlewareClient } from '@supabase/auth-helpers-nextjs';

export async function authMiddleware(request: NextRequest) {
  // In middleware, use the middleware-specific Supabase client so auth
  // cookies can be read from the request (and refreshed on the response).
  const response = NextResponse.next();
  const supabase = createMiddlewareClient({ req: request, res: response });

  try {
    const { data: { session }, error } = await supabase.auth.getSession();

    if (error || !session) {
      return NextResponse.json(
        { success: false, error: { code: 'UNAUTHORIZED', message: 'Authentication required' } },
        { status: 401 }
      );
    }

    // Add user context to request
    const requestHeaders = new Headers(request.headers);
    requestHeaders.set('x-user-id', session.user.id);
    requestHeaders.set('x-user-email', session.user.email || '');

    return NextResponse.next({
      request: {
        headers: requestHeaders,
      },
    });

  } catch (error) {
    return NextResponse.json(
      { success: false, error: { code: 'AUTH_ERROR', message: 'Authentication failed' } },
      { status: 500 }
    );
  }
}

// Usage in API routes
export async function GET(request: NextRequest) {
  const userId = request.headers.get('x-user-id');
  // Handle authenticated request
}

LLM Provider Integration (OpenRouter)

The CPM production pipeline now runs through OpenRouter as the primary aggregator. When issuing POST /generate requests from trusted services:

  • Set llm_provider to openrouter.
  • Supply a model string sourced from /api/providers/status (default: openai/gpt-4o-mini).
  • Attach the calling user context via user_id and provider_key_id so the CPM can validate ownership of the stored OpenRouter key.

Example payload accepted by apps/cpm/app.py:

{
  "content_type": "blog",
  "prompt": "Explain how AI assistants help marketing teams increase productivity.",
  "client_id": "550e8400-e29b-41d4-a716-446655440000",
  "llm_provider": "openrouter",
  "model": "deepseek/deepseek-r1-0528-qwen3-8b:free",
  "keywords": ["ai assistants", "marketing", "automation"],
  "length": "short",
  "user_id": "4d9b1c88-62eb-4b88-92b5-ca03d8bdb616",
  "provider_key_id": "1f1fe2c5-20ca-4602-a285-182443edcbe7"
}

Requests missing the provider key (or referencing a disabled key) receive a 503 response with the detail message OpenRouter API key is not configured.

The provider catalogue is available at runtime via:

curl -fsS https://hgcontent.com/api/providers/status | jq '.providers.openrouter'

This endpoint exposes real-time pricing, context lengths, and a requires_user_key flag used by the frontend to guide users through key setup.
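
A short sketch of consuming this endpoint from a trusted service to pick a model before calling POST /generate. The payload fields mirror the example above; the default_model key and selection logic are assumptions:

# Illustrative: read the provider catalogue, then submit a generation job.
import os
import httpx

def pick_openrouter_model() -> str:
    status = httpx.get("https://hgcontent.com/api/providers/status", timeout=10.0).json()
    openrouter = status["providers"]["openrouter"]
    # Assumed field; fall back to the documented default model.
    return openrouter.get("default_model", "openai/gpt-4o-mini")

payload = {
    "content_type": "blog",
    "prompt": "Explain how AI assistants help marketing teams.",
    "client_id": "550e8400-e29b-41d4-a716-446655440000",
    "llm_provider": "openrouter",
    "model": pick_openrouter_model(),
    "user_id": "4d9b1c88-62eb-4b88-92b5-ca03d8bdb616",
    "provider_key_id": "1f1fe2c5-20ca-4602-a285-182443edcbe7",
}
response = httpx.post(f"{os.getenv('CPM_BASE_URL', 'http://localhost:8001')}/generate",
                      json=payload, timeout=120.0)
response.raise_for_status()  # 503 here means the OpenRouter key is missing or disabled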


Inter-Service Communication

1. Service Client Pattern

// lib/services/BaseServiceClient.ts
import axios, { AxiosInstance, AxiosRequestConfig } from 'axios';

export abstract class BaseServiceClient {
  protected client: AxiosInstance;

  constructor(baseURL: string, timeout: number = 10000) {
    this.client = axios.create({
      baseURL,
      timeout,
      headers: {
        'Content-Type': 'application/json',
      },
    });

    this.setupInterceptors();
  }

  private setupInterceptors() {
    // Request interceptor for auth
    this.client.interceptors.request.use(
      (config) => {
        const token = this.getAuthToken();
        if (token) {
          config.headers.Authorization = `Bearer ${token}`;
        }
        return config;
      },
      (error) => Promise.reject(error)
    );

    // Response interceptor for error handling
    this.client.interceptors.response.use(
      (response) => response,
      (error) => {
        if (error.response?.status === 401) {
          // Handle authentication errors
          this.handleAuthError();
        }
        return Promise.reject(error);
      }
    );
  }

  protected abstract getAuthToken(): string | null;
  protected abstract handleAuthError(): void;

  protected async request<T>(config: AxiosRequestConfig): Promise<T> {
    try {
      const response = await this.client.request<T>(config);
      return response.data;
    } catch (error) {
      this.handleRequestError(error);
      throw error;
    }
  }

  private handleRequestError(error: any) {
    // Log error details
    console.error('Service request failed:', {
      url: error.config?.url,
      method: error.config?.method,
      status: error.response?.status,
      message: error.message,
    });
  }
}

// lib/services/CPMServiceClient.ts
export class CPMServiceClient extends BaseServiceClient {
  constructor() {
    super(process.env.NEXT_PUBLIC_CPM_API_URL!);
  }

  protected getAuthToken(): string | null {
    // Get token from Supabase session or other auth store
    return null; // Implement based on your auth strategy
  }

  protected handleAuthError(): void {
    // Redirect to login or refresh token
  }

  async generateContent(request: ContentGenerationRequest): Promise<ContentGenerationResponse> {
    return this.request({
      method: 'POST',
      url: '/generate',
      data: request,
    });
  }

  async getJobStatus(jobId: string): Promise<JobStatusResponse> {
    return this.request({
      method: 'GET',
      url: `/jobs/${jobId}`,
    });
  }

  async listJobs(params: ListJobsParams): Promise<ListJobsResponse> {
    return this.request({
      method: 'GET',
      url: '/jobs',
      params,
    });
  }
}

2. Error Handling and Retry Logic

// lib/utils/apiUtils.ts
import { AxiosError } from 'axios';

export interface RetryConfig {
  maxRetries: number;
  baseDelay: number;
  maxDelay: number;
  backoffFactor: number;
}

const DEFAULT_RETRY_CONFIG: RetryConfig = {
  maxRetries: 3,
  baseDelay: 1000,
  maxDelay: 10000,
  backoffFactor: 2,
};

export async function withRetry<T>(
  operation: () => Promise<T>,
  config: Partial<RetryConfig> = {}
): Promise<T> {
  const { maxRetries, baseDelay, maxDelay, backoffFactor } = {
    ...DEFAULT_RETRY_CONFIG,
    ...config,
  };

  let lastError: Error;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await operation();
    } catch (error) {
      lastError = error as Error;

      // Don't retry on client errors (4xx)
      if (error instanceof AxiosError && error.response?.status && error.response.status < 500) {
        throw error;
      }

      if (attempt === maxRetries) {
        break;
      }

      // Calculate delay with exponential backoff
      const delay = Math.min(baseDelay * Math.pow(backoffFactor, attempt), maxDelay);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }

  throw lastError!;
}

// Usage example
export const cpmService = {
  async generateContent(request: ContentGenerationRequest) {
    return withRetry(
      () => cpmClient.generateContent(request),
      { maxRetries: 2, baseDelay: 500 }
    );
  },
};

API Documentation Generation

1. FastAPI Documentation

# app.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, Field
from typing import List, Optional
import uvicorn

app = FastAPI(
    title="Content Production Module API",
    description="API for AI-powered content generation",
    version="1.0.0",
    contact={
        "name": "HG Content Team",
        "email": "dev@hgcontent.com",
    },
    license_info={
        "name": "MIT",
        "url": "https://opensource.org/licenses/MIT",
    },
)

# Request/Response models with detailed documentation
class ContentGenerationRequest(BaseModel):
    """Request model for content generation"""
    topic: str = Field(..., description="Main topic for content generation", max_length=200)
    content_type: str = Field(..., description="Type of content to generate", regex="^(blog_post|social_media|email_campaign)$")
    keywords: List[str] = Field(..., description="SEO keywords to include", min_items=1, max_items=10)
    client_id: str = Field(..., description="Client identifier")
    llm_provider: Optional[str] = Field("openai", description="LLM provider to use")
    max_tokens: Optional[int] = Field(1000, description="Maximum tokens to generate", ge=100, le=4000)

    class Config:
        schema_extra = {
            "example": {
                "topic": "Benefits of AI in Healthcare",
                "content_type": "blog_post",
                "keywords": ["artificial intelligence", "healthcare", "medical technology"],
                "client_id": "client-123",
                "llm_provider": "openai",
                "max_tokens": 1500
            }
        }

class ContentGenerationResponse(BaseModel):
    """Response model for content generation"""
    job_id: str = Field(..., description="Unique identifier for the generation job")
    status: str = Field(..., description="Current status of the job")
    estimated_completion: Optional[str] = Field(None, description="ISO timestamp of estimated completion")

@app.post(
    "/generate",
    response_model=ContentGenerationResponse,
    summary="Generate content",
    description="Queue a new content generation job using AI/LLM providers",
    responses={
        200: {"description": "Job created successfully"},
        400: {"description": "Invalid request data"},
        429: {"description": "Rate limit exceeded"},
        500: {"description": "Internal server error"}
    },
    tags=["Content Generation"]
)
async def generate_content(request: ContentGenerationRequest):
    """
    Generate content using AI/LLM providers.

    This endpoint queues a content generation job and returns immediately with a job ID.
    Use the `/jobs/{job_id}` endpoint to check the status and retrieve results.

    **Supported Content Types:**
    - `blog_post`: Full-length blog articles (500-2000 words)
    - `social_media`: Social media posts (platform-specific formatting)
    - `email_campaign`: Marketing email content

    **Supported LLM Providers:**
    - `openai`: GPT-4, GPT-3.5-turbo
    - `anthropic`: Claude 3.5 Sonnet, Claude 3 Haiku
    - `google`: Gemini 1.5 Pro, Gemini 1.5 Flash
    - `groq`: Llama 3.1 70B (high-speed inference)
    """
    # Implementation
    pass

2. OpenAPI Schema Generation

def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema

    openapi_schema = get_openapi(
        title="HG Content Generation API",
        version="1.0.0",
        description="Comprehensive API for AI-powered content generation with multi-client support",
        routes=app.routes,
    )

    # Add custom schema information
    openapi_schema["info"]["x-logo"] = {
        "url": "https://hgcontent.com/logo.png"
    }

    # Add server information
    openapi_schema["servers"] = [
        {"url": "https://api.hgcontent.com", "description": "Production server"},
        {"url": "https://staging-api.hgcontent.com", "description": "Staging server"},
        {"url": "http://localhost:8001", "description": "Development server"}
    ]

    # Add authentication schemes
    openapi_schema["components"]["securitySchemes"] = {
        "bearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "JWT"
        }
    }

    app.openapi_schema = openapi_schema
    return app.openapi_schema

app.openapi = custom_openapi

Database Development and Migrations

Migration Strategy

1. Supabase Migration Management

-- supabase/migrations/20240801000001_create_core_tables.sql
-- Create core tables for the HG Content system

-- Enable UUID extension
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Create custom types
CREATE TYPE client_type AS ENUM ('root', 'agency', 'customer');
CREATE TYPE job_status AS ENUM ('pending', 'in_progress', 'completed', 'failed', 'cancelled');
CREATE TYPE content_type AS ENUM ('blog_post', 'social_media', 'email_campaign', 'press_release', 'product_description');

-- Clients table
CREATE TABLE clients (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(255) UNIQUE NOT NULL,
    type client_type NOT NULL,
    parent_id UUID REFERENCES clients(id) ON DELETE CASCADE,
    metadata JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Constraints
    CONSTRAINT valid_hierarchy CHECK (
        (type = 'root' AND parent_id IS NULL) OR
        (type != 'root' AND parent_id IS NOT NULL)
    )
);

-- Jobs table
CREATE TABLE jobs (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
    topic VARCHAR(500) NOT NULL,
    content_type content_type NOT NULL,
    prompt TEXT NOT NULL,
    llm_provider VARCHAR(50) NOT NULL DEFAULT 'openai',
    status job_status NOT NULL DEFAULT 'pending',
    parameters JSONB DEFAULT '{}',
    result JSONB DEFAULT NULL,
    error_message TEXT DEFAULT NULL,

    -- Cost tracking
    input_tokens INTEGER DEFAULT NULL,
    output_tokens INTEGER DEFAULT NULL,
    total_tokens INTEGER DEFAULT NULL,
    estimated_cost DECIMAL(10,6) DEFAULT NULL,
    provider_model VARCHAR(100) DEFAULT NULL,

    -- Timestamps
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
    completed_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,

    -- User context
    created_by UUID REFERENCES auth.users(id) ON DELETE SET NULL
);

-- Strategies table
CREATE TABLE strategies (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    content_type content_type NOT NULL,
    configuration JSONB NOT NULL,
    is_active BOOLEAN DEFAULT true,
    version INTEGER DEFAULT 1,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by UUID REFERENCES auth.users(id) ON DELETE SET NULL
);

-- Prompt templates table
CREATE TABLE prompt_templates (
    id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
    name VARCHAR(255) NOT NULL,
    content_type content_type NOT NULL,
    template TEXT NOT NULL,
    variables JSONB DEFAULT '{}',
    version INTEGER DEFAULT 1,
    is_active BOOLEAN DEFAULT true,
    is_default BOOLEAN DEFAULT false,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    created_by UUID REFERENCES auth.users(id) ON DELETE SET NULL
);

-- Add indexes
CREATE INDEX idx_clients_parent_id ON clients(parent_id);
CREATE INDEX idx_clients_type ON clients(type);
CREATE INDEX idx_clients_slug ON clients(slug);

CREATE INDEX idx_jobs_client_id ON jobs(client_id);
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_created_at ON jobs(created_at);
CREATE INDEX idx_jobs_content_type ON jobs(content_type);
CREATE INDEX idx_jobs_llm_provider ON jobs(llm_provider);

CREATE INDEX idx_strategies_client_id ON strategies(client_id);
CREATE INDEX idx_strategies_content_type ON strategies(content_type);
CREATE INDEX idx_strategies_is_active ON strategies(is_active);

CREATE INDEX idx_prompt_templates_client_id ON prompt_templates(client_id);
CREATE INDEX idx_prompt_templates_content_type ON prompt_templates(content_type);
CREATE INDEX idx_prompt_templates_is_active ON prompt_templates(is_active);

-- Add triggers for updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_clients_updated_at BEFORE UPDATE ON clients
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_jobs_updated_at BEFORE UPDATE ON jobs
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_strategies_updated_at BEFORE UPDATE ON strategies
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

CREATE TRIGGER update_prompt_templates_updated_at BEFORE UPDATE ON prompt_templates
    FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

2. Row-Level Security (RLS) Setup

-- supabase/migrations/20240801000002_enable_rls.sql
-- Enable Row-Level Security for multi-tenant data isolation

-- Enable RLS on all tables
ALTER TABLE clients ENABLE ROW LEVEL SECURITY;
ALTER TABLE jobs ENABLE ROW LEVEL SECURITY;
ALTER TABLE strategies ENABLE ROW LEVEL SECURITY;
ALTER TABLE prompt_templates ENABLE ROW LEVEL SECURITY;

-- Helper function to get user's accessible client IDs
CREATE OR REPLACE FUNCTION get_user_client_ids(p_user_id UUID)
RETURNS UUID[] AS $$
DECLARE
    client_ids UUID[];
BEGIN
    -- Get all client IDs the user has access to: clients they directly
    -- belong to plus their children. The parameter is prefixed p_ so it
    -- does not collide with the user_id column, which PL/pgSQL would
    -- otherwise treat as an ambiguous reference.
    SELECT ARRAY(
        WITH RECURSIVE client_hierarchy AS (
            -- Base case: clients directly associated with the user
            SELECT c.id, c.parent_id, c.type
            FROM clients c
            JOIN user_clients uc ON c.id = uc.client_id
            WHERE uc.user_id = p_user_id

            UNION

            -- Recursive case: child clients
            SELECT c.id, c.parent_id, c.type
            FROM clients c
            JOIN client_hierarchy ch ON c.parent_id = ch.id
        )
        SELECT id FROM client_hierarchy
    ) INTO client_ids;

    RETURN client_ids;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- RLS Policies for clients table
CREATE POLICY "Users can view accessible clients" ON clients
    FOR SELECT
    USING (id = ANY(get_user_client_ids(auth.uid())));

CREATE POLICY "Users can update their clients" ON clients
    FOR UPDATE
    USING (id = ANY(get_user_client_ids(auth.uid())));

-- RLS Policies for jobs table
CREATE POLICY "Users can view jobs for their clients" ON jobs
    FOR SELECT
    USING (client_id = ANY(get_user_client_ids(auth.uid())));

CREATE POLICY "Users can create jobs for their clients" ON jobs
    FOR INSERT
    WITH CHECK (client_id = ANY(get_user_client_ids(auth.uid())));

CREATE POLICY "Users can update jobs for their clients" ON jobs
    FOR UPDATE
    USING (client_id = ANY(get_user_client_ids(auth.uid())));

-- RLS Policies for strategies table
CREATE POLICY "Users can view strategies for their clients" ON strategies
    FOR SELECT
    USING (client_id = ANY(get_user_client_ids(auth.uid())));

CREATE POLICY "Users can manage strategies for their clients" ON strategies
    FOR ALL
    USING (client_id = ANY(get_user_client_ids(auth.uid())));

-- RLS Policies for prompt_templates table
CREATE POLICY "Users can view templates for their clients" ON prompt_templates
    FOR SELECT
    USING (client_id = ANY(get_user_client_ids(auth.uid())));

CREATE POLICY "Users can manage templates for their clients" ON prompt_templates
    FOR ALL
    USING (client_id = ANY(get_user_client_ids(auth.uid())));
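
To verify the policies locally, compare what the service role sees against what an authenticated user sees. A minimal sketch using supabase-py; the test credentials and the SUPABASE_ANON_KEY variable are assumptions for local development:

# Sketch: confirm RLS isolation. The service-role key bypasses RLS, while
# the anon-key client only sees rows permitted by get_user_client_ids().
import os
from supabase import create_client

url = os.environ["SUPABASE_URL"]

service = create_client(url, os.environ["SUPABASE_SERVICE_ROLE_KEY"])
anon = create_client(url, os.environ["SUPABASE_ANON_KEY"])
anon.auth.sign_in_with_password({
    "email": "dev@example.com",      # assumed seeded test user
    "password": "test-password",
})

all_clients = service.table("clients").select("id").execute().data
visible_clients = anon.table("clients").select("id").execute().data
print(f"service role sees {len(all_clients)}, user sees {len(visible_clients)}")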

3. Database Seeding

# scripts/seed_database.py
import asyncio
import os
from typing import Dict

from supabase import create_client, Client

async def seed_database():
    """Seed the database with initial data"""

    # Initialize Supabase client
    supabase: Client = create_client(
        os.getenv("SUPABASE_URL"),
        os.getenv("SUPABASE_SERVICE_ROLE_KEY")
    )

    # Seed clients
    clients = await seed_clients(supabase)

    # Seed strategies
    await seed_strategies(supabase, clients)

    # Seed prompt templates
    await seed_prompt_templates(supabase, clients)

    print("Database seeding completed successfully!")

async def seed_clients(supabase: Client) -> Dict[str, str]:
    """Create initial client hierarchy"""

    clients_data = [
        {
            "name": "HG Content",
            "slug": "hg-content",
            "type": "root",
            "parent_id": None,
            "metadata": {
                "description": "Root client for HG Content system",
                "features": ["all"]
            }
        },
        {
            "name": "PASCO Scientific",
            "slug": "pasco-scientific",
            "type": "agency",
            "parent_id": None,  # Will be updated with root client ID
            "metadata": {
                "industry": "education",
                "target_audience": "educators, students",
                "brand_colors": ["#0066CC", "#FF6600"]
            }
        },
        {
            "name": "Heaviside Digital",
            "slug": "heaviside-digital",
            "type": "agency",
            "parent_id": None,  # Will be updated with root client ID
            "metadata": {
                "industry": "marketing_agency",
                "specialization": "local_businesses",
                "service_areas": ["ohio", "kentucky", "indiana"]
            }
        }
    ]

    created_clients = {}

    # Create root client first
    root_client_data = clients_data[0]
    result = supabase.table("clients").insert(root_client_data).execute()
    root_client_id = result.data[0]["id"]
    created_clients["root"] = root_client_id

    # Update parent_id for other clients
    for client_data in clients_data[1:]:
        client_data["parent_id"] = root_client_id
        result = supabase.table("clients").insert(client_data).execute()
        created_clients[client_data["slug"]] = result.data[0]["id"]

    # Create customer clients for Heaviside Digital
    customer_clients = [
        {
            "name": "Cincinnati Electrician Co.",
            "slug": "cincinnati-electrician",
            "type": "customer",
            "parent_id": created_clients["heaviside-digital"],
            "metadata": {
                "industry": "electrical_services",
                "location": "cincinnati_oh",
                "services": ["residential", "commercial", "emergency"]
            }
        },
        {
            "name": "Ohio Paving Contractors",
            "slug": "ohio-paving",
            "type": "customer", 
            "parent_id": created_clients["heaviside-digital"],
            "metadata": {
                "industry": "construction",
                "location": "columbus_oh",
                "services": ["asphalt_paving", "concrete", "sealcoating"]
            }
        }
    ]

    for customer_data in customer_clients:
        result = supabase.table("clients").insert(customer_data).execute()
        created_clients[customer_data["slug"]] = result.data[0]["id"]

    return created_clients

async def seed_strategies(supabase: Client, clients: Dict[str, str]):
    """Create default strategies for each client"""

    strategies_data = []

    # PASCO Scientific strategies
    pasco_strategies = [
        {
            "client_id": clients["pasco-scientific"],
            "name": "Educational Blog Strategy",
            "content_type": "blog_post",
            "configuration": {
                "tone": "educational",
                "target_audience": "high school and college educators",
                "word_count_range": [800, 1500],
                "include_experiments": True,
                "include_diagrams": True,
                "technical_level": "intermediate",
                "seo_focus": True,
                "brand_mentions": "PASCO Scientific equipment"
            },
            "is_active": True
        },
        {
            "client_id": clients["pasco-scientific"],
            "name": "Social Media Strategy",
            "content_type": "social_media",
            "configuration": {
                "platforms": ["linkedin", "twitter", "facebook"],
                "tone": "informative",
                "hashtags": ["#STEMEducation", "#PhysicsEducation", "#PASCOScientific"],
                "include_visuals": True,
                "post_frequency": "daily",
                "engagement_focus": True
            },
            "is_active": True
        }
    ]

    # Local business strategies for Heaviside customers
    local_business_strategies = [
        {
            "client_id": clients["cincinnati-electrician"],
            "name": "Local SEO Blog Strategy",
            "content_type": "blog_post",
            "configuration": {
                "tone": "professional",
                "target_audience": "homeowners and business owners",
                "location_focus": "Cincinnati, Ohio",
                "word_count_range": [600, 1000],
                "include_local_keywords": True,
                "include_service_areas": True,
                "call_to_action": "contact for quote",
                "seo_focus": True
            },
            "is_active": True
        },
        {
            "client_id": clients["ohio-paving"],
            "name": "Construction Industry Strategy",
            "content_type": "blog_post",
            "configuration": {
                "tone": "authoritative",
                "target_audience": "property managers and contractors",
                "location_focus": "Ohio",
                "word_count_range": [800, 1200],
                "technical_focus": "asphalt and concrete",
                "seasonal_content": True,
                "include_case_studies": True,
                "seo_focus": True
            },
            "is_active": True
        }
    ]

    strategies_data.extend(pasco_strategies)
    strategies_data.extend(local_business_strategies)

    # Insert all strategies
    result = supabase.table("strategies").insert(strategies_data).execute()
    print(f"Created {len(result.data)} strategies")

async def seed_prompt_templates(supabase: Client, clients: Dict[str, str]):
    """Create default prompt templates for each client.

    Template definitions are omitted here for brevity; see the
    prompt_templates schema above for the expected fields.
    """
    pass

if __name__ == "__main__":
    asyncio.run(seed_database())

Database Performance Optimization

1. Query Optimization

-- Performance monitoring queries
-- Find slow queries
SELECT 
    query,
    calls,
    total_exec_time,
    mean_exec_time,
    rows
FROM pg_stat_statements
WHERE mean_exec_time > 100  -- Queries averaging more than 100ms (use total_time/mean_time before PostgreSQL 13)
ORDER BY mean_exec_time DESC
LIMIT 10;

-- Check index usage
SELECT 
    schemaname,
    tablename,
    indexname,
    idx_scan,
    idx_tup_read,
    idx_tup_fetch
FROM pg_stat_user_indexes
WHERE idx_scan = 0  -- Unused indexes
ORDER BY schemaname, tablename;

-- Find missing indexes
SELECT 
    schemaname,
    tablename,
    seq_scan,
    seq_tup_read,
    seq_tup_read / seq_scan AS avg_seq_read
FROM pg_stat_user_tables
WHERE seq_scan > 0
  AND seq_tup_read / seq_scan > 1000  -- Tables doing large sequential scans
ORDER BY seq_tup_read DESC;

2. Composite Indexes

-- Optimized indexes for common query patterns
-- Note: CREATE INDEX CONCURRENTLY cannot run inside a transaction block,
-- so apply these statements outside a transactional migration
-- Jobs queries often filter by client_id, status, and created_at
CREATE INDEX CONCURRENTLY idx_jobs_client_status_created 
ON jobs(client_id, status, created_at DESC);

-- Analytics queries often aggregate by date ranges and providers
CREATE INDEX CONCURRENTLY idx_jobs_analytics 
ON jobs(created_at, llm_provider, estimated_cost) 
WHERE status = 'completed';

-- Strategy lookups by client and content type
CREATE INDEX CONCURRENTLY idx_strategies_client_content_active 
ON strategies(client_id, content_type, is_active) 
WHERE is_active = true;

-- Template searches
CREATE INDEX CONCURRENTLY idx_templates_client_content_active 
ON prompt_templates(client_id, content_type, is_active) 
WHERE is_active = true;

-- Full-text search on job topics and results
CREATE INDEX CONCURRENTLY idx_jobs_topic_search 
ON jobs USING gin(to_tsvector('english', topic));

CREATE INDEX CONCURRENTLY idx_jobs_content_search 
ON jobs USING gin(to_tsvector('english', (result->>'content'))) 
WHERE result IS NOT NULL;

3. Partitioning Strategy

-- Partition jobs table by month for better performance
-- Create partitioned table. Copy defaults and check constraints only:
-- INCLUDING ALL would also copy the (id) primary key, which is invalid here
-- because unique indexes on a partitioned table must include the partition key.
CREATE TABLE jobs_partitioned (
    LIKE jobs INCLUDING DEFAULTS INCLUDING CONSTRAINTS
) PARTITION BY RANGE (created_at);

ALTER TABLE jobs_partitioned ADD PRIMARY KEY (id, created_at);

-- Create monthly partitions
CREATE TABLE jobs_2024_08 PARTITION OF jobs_partitioned
    FOR VALUES FROM ('2024-08-01') TO ('2024-09-01');

CREATE TABLE jobs_2024_09 PARTITION OF jobs_partitioned
    FOR VALUES FROM ('2024-09-01') TO ('2024-10-01');

-- Create function to automatically create new partitions
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name text, start_date date)
RETURNS void AS $$
DECLARE
    partition_name text;
    end_date date;
BEGIN
    partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');
    end_date := start_date + interval '1 month';

    EXECUTE format('CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
                   partition_name, table_name, start_date, end_date);
END;
$$ LANGUAGE plpgsql;

-- Set up automatic partition creation
CREATE OR REPLACE FUNCTION create_next_partition()
RETURNS void AS $$
BEGIN
    PERFORM create_monthly_partition('jobs_partitioned', date_trunc('month', CURRENT_DATE + interval '1 month'));
END;
$$ LANGUAGE plpgsql;

-- Schedule monthly partition creation (using pg_cron if available)
-- SELECT cron.schedule('create-partition', '0 0 1 * *', 'SELECT create_next_partition();');
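
When pg_cron is not available, the same function can be driven by an external scheduler. A minimal sketch using asyncpg; the DATABASE_URL variable is an assumption for this script:

# scripts/create_partition.py -- sketch for running partition maintenance
# from an external scheduler (e.g., monthly cron) when pg_cron is unavailable
import asyncio
import os

import asyncpg

async def main():
    conn = await asyncpg.connect(os.environ["DATABASE_URL"])
    try:
        # Creates next month's partition via the SQL function defined above
        await conn.execute("SELECT create_next_partition();")
    finally:
        await conn.close()

if __name__ == "__main__":
    asyncio.run(main())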

Debugging and Troubleshooting

Debugging Tools and Techniques

1. Frontend Debugging

React DevTools Configuration:

// lib/debug.ts
import React from 'react';

export const isDevelopment = process.env.NODE_ENV === 'development';

export const debugConfig = {
  enableReactQueryDevtools: isDevelopment,
  enableReduxDevtools: isDevelopment,
  logLevel: isDevelopment ? 'debug' : 'error',
  enablePerformanceLogging: isDevelopment,
};

// Enable debugging features conditionally
if (isDevelopment) {
  // Enable why-did-you-render
  import('@welldone-software/why-did-you-render').then((whyDidYouRender) => {
    whyDidYouRender.default(React, {
      trackAllPureComponents: true,
      trackHooks: true,
    });
  });
}

Error Boundary Implementation:

// components/ErrorBoundary.tsx
import React, { Component, ErrorInfo, ReactNode } from 'react';
import { Button } from '@/components/ui/button';
import { Alert, AlertDescription, AlertTitle } from '@/components/ui/alert';

interface Props {
  children: ReactNode;
  fallback?: ReactNode;
  onError?: (error: Error, errorInfo: ErrorInfo) => void;
}

interface State {
  hasError: boolean;
  error?: Error;
  errorInfo?: ErrorInfo;
}

export class ErrorBoundary extends Component<Props, State> {
  constructor(props: Props) {
    super(props);
    this.state = { hasError: false };
  }

  static getDerivedStateFromError(error: Error): State {
    return { hasError: true, error };
  }

  componentDidCatch(error: Error, errorInfo: ErrorInfo) {
    this.setState({ errorInfo });

    // Log error to monitoring service
    console.error('Error boundary caught an error:', error, errorInfo);

    // Report to error tracking service (e.g., Sentry). window.Sentry is
    // injected at runtime, so cast to satisfy TypeScript.
    const sentry = typeof window !== 'undefined' ? (window as any).Sentry : undefined;
    if (sentry) {
      sentry.captureException(error, {
        contexts: {
          react: {
            componentStack: errorInfo.componentStack,
          },
        },
      });
    }

    this.props.onError?.(error, errorInfo);
  }

  handleRetry = () => {
    this.setState({ hasError: false, error: undefined, errorInfo: undefined });
  };

  render() {
    if (this.state.hasError) {
      if (this.props.fallback) {
        return this.props.fallback;
      }

      return (
        <div className="min-h-screen flex items-center justify-center p-4">
          <div className="max-w-md w-full">
            <Alert variant="destructive">
              <AlertTitle>Something went wrong</AlertTitle>
              <AlertDescription className="mt-2">
                {this.state.error?.message || 'An unexpected error occurred'}
              </AlertDescription>
            </Alert>

            <div className="mt-4 space-y-2">
              <Button onClick={this.handleRetry} className="w-full">
                Try Again
              </Button>
              <Button 
                variant="outline" 
                onClick={() => window.location.reload()} 
                className="w-full"
              >
                Reload Page
              </Button>
            </div>

            {process.env.NODE_ENV === 'development' && this.state.errorInfo && (
              <details className="mt-4">
                <summary className="cursor-pointer text-sm font-medium">
                  Error Details (Development)
                </summary>
                <pre className="mt-2 p-2 bg-gray-100 rounded text-xs overflow-auto">
                  {this.state.error?.stack}
                  {this.state.errorInfo.componentStack}
                </pre>
              </details>
            )}
          </div>
        </div>
      );
    }

    return this.props.children;
  }
}

Performance Monitoring:

// hooks/usePerformanceMonitoring.ts
import { useEffect, useRef } from 'react';

export const usePerformanceMonitoring = (componentName: string) => {
  const renderCount = useRef(0);
  // Capture the timestamp during render; effects run after the commit, so
  // start-to-effect approximates render + commit time. (Setting the start
  // time inside a separate effect would always measure ~0ms.)
  const renderStart = performance.now();

  useEffect(() => {
    if (process.env.NODE_ENV === 'development') {
      renderCount.current += 1;
      const renderTime = performance.now() - renderStart;

      console.log(`[Performance] ${componentName}:`, {
        renderTime: `${renderTime.toFixed(2)}ms`,
        renderCount: renderCount.current,
      });

      // Log slow renders
      if (renderTime > 16) { // one 60fps frame budget
        console.warn(`[Performance Warning] ${componentName} took ${renderTime.toFixed(2)}ms to render`);
      }
    }
  });
};

// Usage in components
export const ContentRequestForm = () => {
  usePerformanceMonitoring('ContentRequestForm');
  // Component implementation
};

2. Backend Debugging

Structured Logging Setup:

# lib/logging_config.py
import structlog
import logging
import sys
from typing import Any, Dict

def configure_logging(level: str = "INFO", format_json: bool = True):
    """Configure structured logging for the application"""

    # Configure standard library logging
    logging.basicConfig(
        format="%(message)s",
        stream=sys.stdout,
        level=getattr(logging, level.upper()),
    )

    # Configure structlog
    if format_json:
        processors = [
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            add_request_id,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.processors.JSONRenderer()
        ]
    else:
        processors = [
            structlog.stdlib.filter_by_level,
            structlog.stdlib.add_logger_name,
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            add_request_id,
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.UnicodeDecoder(),
            structlog.dev.ConsoleRenderer()
        ]

    structlog.configure(
        processors=processors,
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

def add_request_id(logger, method_name, event_dict):
    """Add request ID to log entries"""
    # Read the module-level ContextVar set by the middleware below; creating
    # a fresh ContextVar here would always return its default.
    request_id = request_id_contextvar.get(None)
    if request_id:
        event_dict['request_id'] = request_id
    return event_dict

# Usage in FastAPI app
import uuid
from fastapi import FastAPI, Request
from contextvars import ContextVar

request_id_contextvar: ContextVar[str] = ContextVar('request_id')

@app.middleware("http")
async def add_request_id_middleware(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request_id_contextvar.set(request_id)

    # Add to response headers for debugging
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response
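
Putting it together, logging should be configured once at startup so every subsequent log line runs through the processors above; a minimal sketch:

# Sketch: wire up logging at service startup, before any requests are served
import os
import structlog

configure_logging(level=os.getenv("LOG_LEVEL", "INFO"), format_json=True)
logger = structlog.get_logger()

logger.info("service_started", service="cpm")
# Once the middleware above is registered, every log line emitted inside a
# request also carries the request_id field added by add_request_id().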

Database Query Debugging:

# lib/database_debug.py
import time
import structlog
from functools import wraps
from typing import Any, Callable, Optional

logger = structlog.get_logger()

def log_database_queries(func: Callable) -> Callable:
    """Decorator to log database queries with timing"""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.time()
        query_name = func.__name__

        try:
            logger.debug("Database query started", query=query_name, args=args, kwargs=kwargs)
            result = await func(*args, **kwargs)

            duration = time.time() - start_time
            logger.info(
                "Database query completed",
                query=query_name,
                duration=f"{duration:.3f}s",
                result_count=len(result) if isinstance(result, list) else 1
            )

            # Warn on slow queries
            if duration > 1.0:  # 1 second threshold
                logger.warning(
                    "Slow database query detected",
                    query=query_name,
                    duration=f"{duration:.3f}s"
                )

            return result

        except Exception as e:
            duration = time.time() - start_time
            logger.error(
                "Database query failed",
                query=query_name,
                duration=f"{duration:.3f}s",
                error=str(e),
                error_type=type(e).__name__
            )
            raise

    return wrapper

# Usage
class Database:
    @log_database_queries
    async def get_jobs(self, client_id: str, status: Optional[str] = None):
        # Database query implementation
        pass

API Request/Response Logging:

# middleware/logging_middleware.py
import time
import json
from fastapi import Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
import structlog

logger = structlog.get_logger()

class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()

        # Log request
        request_body = None
        if request.method in ["POST", "PUT", "PATCH"]:
            body = await request.body()
            if body:
                try:
                    request_body = json.loads(body.decode())
                    # Remove sensitive data
                    if isinstance(request_body, dict):
                        request_body = self._sanitize_data(request_body)
                except json.JSONDecodeError:
                    request_body = body.decode()[:1000]  # First 1000 chars

        # Drop known-sensitive headers before logging
        safe_headers = {
            k: v for k, v in request.headers.items()
            if k.lower() not in ("authorization", "cookie", "x-api-key")
        }

        logger.info(
            "Request started",
            method=request.method,
            url=str(request.url),
            headers=safe_headers,
            body=request_body,
            client_ip=request.client.host if request.client else None
        )

        # Process request
        try:
            response = await call_next(request)
            duration = time.time() - start_time

            logger.info(
                "Request completed",
                method=request.method,
                url=str(request.url),
                status_code=response.status_code,
                duration=f"{duration:.3f}s"
            )

            return response

        except Exception as e:
            duration = time.time() - start_time

            logger.error(
                "Request failed",
                method=request.method,
                url=str(request.url),
                duration=f"{duration:.3f}s",
                error=str(e),
                error_type=type(e).__name__
            )
            raise

    def _sanitize_data(self, data: dict) -> dict:
        """Remove sensitive information from logged data"""
        sensitive_keys = ['password', 'api_key', 'token', 'secret']
        sanitized = data.copy()

        for key in sensitive_keys:
            if key in sanitized:
                sanitized[key] = "[REDACTED]"

        return sanitized
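
Registering the middleware is a one-liner on the application object; note that middleware added last runs outermost:

# Sketch: attach the middleware to the FastAPI app
from fastapi import FastAPI

app = FastAPI()
app.add_middleware(LoggingMiddleware)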

3. Common Issues and Solutions

Memory Leaks in React:

// Common issue: Not cleaning up subscriptions
// ❌ Bad
const MyComponent = () => {
  const [data, setData] = useState(null);

  useEffect(() => {
    const subscription = websocket.subscribe(setData);
    // Missing cleanup!
  }, []);

  return <div>{data}</div>;
};

// ✅ Good
const MyComponent = () => {
  const [data, setData] = useState(null);

  useEffect(() => {
    const subscription = websocket.subscribe(setData);

    return () => {
      subscription.unsubscribe(); // Cleanup
    };
  }, []);

  return <div>{data}</div>;
};

// Memory leak detector hook (performance.memory is non-standard and
// Chrome-only, hence the casts)
const useMemoryLeakDetector = (componentName: string) => {
  useEffect(() => {
    const startMemory = (performance as any).memory?.usedJSHeapSize;

    return () => {
      const endMemory = (performance as any).memory?.usedJSHeapSize;
      if (
        process.env.NODE_ENV === 'development' &&
        startMemory !== undefined &&
        endMemory !== undefined
      ) {
        const memoryDiff = endMemory - startMemory;

        if (memoryDiff > 1_000_000) { // 1MB threshold
          console.warn(`[Memory Leak Warning] Component ${componentName} may have leaked ${(memoryDiff / 1024 / 1024).toFixed(2)}MB`);
        }
      }
    };
  }, [componentName]);
};

Database Connection Issues:

# Connection pool debugging
from contextlib import asynccontextmanager

import structlog

logger = structlog.get_logger()

class DatabaseConnectionDebugger:
    def __init__(self, supabase_client):
        self.client = supabase_client
        self.active_connections = 0
        self.max_connections = 0

    @asynccontextmanager
    async def get_connection(self):
        """Debug connection usage"""
        self.active_connections += 1
        self.max_connections = max(self.max_connections, self.active_connections)

        logger.debug(
            "Database connection acquired",
            active_connections=self.active_connections,
            max_connections=self.max_connections
        )

        try:
            yield self.client
        finally:
            self.active_connections -= 1
            logger.debug(
                "Database connection released",
                active_connections=self.active_connections
            )

            # Warn if too many connections
            if self.active_connections > 10:
                logger.warning(
                    "High number of active database connections",
                    active_connections=self.active_connections
                )

# Usage
db_debugger = DatabaseConnectionDebugger(supabase_client)

async def get_jobs():
    async with db_debugger.get_connection() as db:
        # supabase-py's sync client is not awaitable; call execute() directly
        return db.table('jobs').select('*').execute()

Debugging Commands and Tools

1. Development Scripts

# package.json scripts for debugging
{
  "scripts": {
    "dev:debug": "NODE_ENV=development DEBUG=* next dev",
    "dev:inspect": "NODE_OPTIONS='--inspect' next dev",
    "test:debug": "jest --detectOpenHandles --forceExit",
    "build:analyze": "ANALYZE=true next build",
    "db:debug": "supabase db reset --debug",
    "logs:frontend": "vercel logs --project=hg-content-frontend",
    "logs:backend": "railway logs --service=cpm"
  }
}

# Python debugging commands
# Start with debugger
python -m pdb app.py

# Profile memory usage
python -m memory_profiler app.py

# Profile performance
python -m cProfile -o profile.stats app.py
python -c "import pstats; p = pstats.Stats('profile.stats'); p.sort_stats('cumulative').print_stats(10)"

2. Environment-Specific Debugging

# Development environment
export DEBUG=true
export LOG_LEVEL=DEBUG
export DATABASE_DEBUG=true

# Production debugging (careful!)
export LOG_LEVEL=INFO
export SENTRY_DEBUG=false
export PERFORMANCE_MONITORING=true

Performance Optimization Techniques

Frontend Optimization

1. Code Splitting and Lazy Loading

// app/layout.tsx - Strategic code splitting
import { Suspense, lazy } from 'react';
import { LoadingSpinner } from '@/components/ui/loading-spinner';

// Lazy load heavy components (rendered by child routes; declared here for illustration)
const AnalyticsDashboard = lazy(() => import('@/components/analytics/AnalyticsDashboard'));
const StrategyEditor = lazy(() => import('@/components/strategies/StrategyEditor'));

export default function Layout({ children }: { children: React.ReactNode }) {
  return (
    <html lang="en">
      <body>
        <Suspense fallback={<LoadingSpinner />}>
          {children}
        </Suspense>
      </body>
    </html>
  );
}

// Route-based code splitting
// app/(dashboard)/analytics/page.tsx
import dynamic from 'next/dynamic';
import { Skeleton } from '@/components/ui/skeleton';

const AnalyticsDashboard = dynamic(
  () => import('@/components/analytics/AnalyticsDashboard'),
  {
    loading: () => (
      <div className="space-y-4">
        <Skeleton className="h-8 w-64" />
        <div className="grid grid-cols-1 md:grid-cols-3 gap-4">
          {Array.from({ length: 6 }).map((_, i) => (
            <Skeleton key={i} className="h-32" />
          ))}
        </div>
      </div>
    ),
    ssr: false, // Client-side only for heavy charts
  }
);

export default function AnalyticsPage() {
  return <AnalyticsDashboard />;
}

2. React Query Optimization

// lib/queryClient.ts
import { QueryClient, useQuery, useQueryClient } from '@tanstack/react-query';
import { useCallback } from 'react';

export const queryClient = new QueryClient({
  defaultOptions: {
    queries: {
      // Treat data as fresh for 5 minutes (no background refetch until stale)
      staleTime: 5 * 60 * 1000,
      // Keep unused entries cached for 10 minutes (renamed gcTime in v5)
      cacheTime: 10 * 60 * 1000,
      // Retry failed requests 2 times
      retry: 2,
      // Retry with exponential backoff
      retryDelay: (attemptIndex) => Math.min(1000 * 2 ** attemptIndex, 30000),
      // Refetch on window focus for critical data only
      refetchOnWindowFocus: false,
      // Background refetch for stale data
      refetchOnMount: true,
    },
    mutations: {
      retry: 1,
    },
  },
});

// Optimized query hooks
export const useJobs = (clientId: string, options: { enabled?: boolean } = {}) => {
  return useQuery({
    queryKey: ['jobs', clientId],
    queryFn: () => jobsApi.getJobs(clientId),
    enabled: !!clientId && options.enabled !== false,
    // Optimize for frequent updates
    refetchInterval: 30000, // 30 seconds
    // Use previous data while refetching
    keepPreviousData: true,
    // Select only needed fields
    select: (data) => data.map(job => ({
      id: job.id,
      status: job.status,
      topic: job.topic,
      createdAt: job.createdAt,
      // Exclude heavy result data from list view
    })),
  });
};

// Prefetch related data
export const usePrefetchJobDetails = () => {
  const queryClient = useQueryClient();

  return useCallback((jobId: string) => {
    queryClient.prefetchQuery({
      queryKey: ['job', jobId],
      queryFn: () => jobsApi.getJob(jobId),
      staleTime: 2 * 60 * 1000, // 2 minutes
    });
  }, [queryClient]);
};

3. Component Optimization

// Memoization strategies
import { memo, useMemo, useCallback, useState, useEffect } from 'react';

// Memoize expensive calculations
const JobAnalytics = memo(({ jobs }: { jobs: Job[] }) => {
  const analytics = useMemo(() => {
    // Expensive calculation; guard against division by zero on empty lists
    const completed = jobs.filter(job => job.status === 'completed').length;
    return {
      totalJobs: jobs.length,
      completedJobs: completed,
      averageCost: jobs.length
        ? jobs.reduce((sum, job) => sum + (job.estimatedCost || 0), 0) / jobs.length
        : 0,
      successRate: jobs.length ? (completed / jobs.length) * 100 : 0,
    };
  }, [jobs]);

  return (
    <div className="grid grid-cols-2 md:grid-cols-4 gap-4">
      <div>Total Jobs: {analytics.totalJobs}</div>
      <div>Completed: {analytics.completedJobs}</div>
      <div>Avg Cost: ${analytics.averageCost.toFixed(4)}</div>
      <div>Success Rate: {analytics.successRate.toFixed(1)}%</div>
    </div>
  );
});

// Virtualized lists for large datasets
import { FixedSizeList as List } from 'react-window';

const VirtualizedJobList = ({ jobs }: { jobs: Job[] }) => {
  const Row = useCallback(({ index, style }: { index: number; style: React.CSSProperties }) => (
    <div style={style} className="p-2 border-b">
      <JobItem job={jobs[index]} />
    </div>
  ), [jobs]);

  return (
    <List
      height={600} // Fixed height
      itemCount={jobs.length}
      itemSize={80} // Height of each row
      width="100%"
    >
      {Row}
    </List>
  );
};

// Debounced search
const useDebounce = <T,>(value: T, delay: number): T => {
  const [debouncedValue, setDebouncedValue] = useState<T>(value);

  useEffect(() => {
    const handler = setTimeout(() => {
      setDebouncedValue(value);
    }, delay);

    return () => {
      clearTimeout(handler);
    };
  }, [value, delay]);

  return debouncedValue;
};

const SearchableJobList = ({ clientId }: { clientId: string }) => {
  const [searchTerm, setSearchTerm] = useState('');
  const debouncedSearchTerm = useDebounce(searchTerm, 300);

  // Fetch once per client; the debounced term only drives client-side filtering
  const { data: jobs } = useJobs(clientId);

  const filteredJobs = useMemo(() => {
    if (!debouncedSearchTerm) return jobs || [];
    return jobs?.filter(job => 
      job.topic.toLowerCase().includes(debouncedSearchTerm.toLowerCase())
    ) || [];
  }, [jobs, debouncedSearchTerm]);

  return (
    <div>
      <input
        type="text"
        value={searchTerm}
        onChange={(e) => setSearchTerm(e.target.value)}
        placeholder="Search jobs..."
      />
      <VirtualizedJobList jobs={filteredJobs} />
    </div>
  );
};

Backend Optimization

1. Database Query Optimization

# Efficient database queries
from typing import List, Optional
import asyncio

class OptimizedDatabase:
    def __init__(self, supabase_client):
        self.client = supabase_client

    async def get_jobs_with_analytics(
        self, 
        client_id: str, 
        status: Optional[str] = None,
        limit: int = 50,
        offset: int = 0
    ) -> dict:
        """Get jobs with aggregated analytics in a single query"""

        # Use raw SQL for complex queries
        query = """
        WITH job_data AS (
            SELECT 
                id, topic, status, created_at, estimated_cost, llm_provider,
                ROW_NUMBER() OVER (ORDER BY created_at DESC) as row_num
            FROM jobs 
            WHERE client_id = %(client_id)s
            AND (%(status)s IS NULL OR status = %(status)s)
        ),
        job_analytics AS (
            SELECT 
                COUNT(*) as total_jobs,
                COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs,
                COUNT(*) FILTER (WHERE status = 'failed') as failed_jobs,
                AVG(estimated_cost) FILTER (WHERE estimated_cost IS NOT NULL) as avg_cost,
                SUM(estimated_cost) FILTER (WHERE estimated_cost IS NOT NULL) as total_cost
            FROM jobs 
            WHERE client_id = %(client_id)s
            AND (%(status)s IS NULL OR status = %(status)s)
        )
        SELECT 
            json_build_object(
                'jobs', json_agg(
                    json_build_object(
                        'id', jd.id,
                        'topic', jd.topic,
                        'status', jd.status,
                        'created_at', jd.created_at,
                        'estimated_cost', jd.estimated_cost,
                        'llm_provider', jd.llm_provider
                    )
                ) FILTER (WHERE jd.row_num <= %(limit)s AND jd.row_num > %(offset)s),
                'analytics', json_build_object(
                    'total_jobs', ja.total_jobs,
                    'completed_jobs', ja.completed_jobs,
                    'failed_jobs', ja.failed_jobs,
                    'success_rate', 
                        CASE 
                            WHEN ja.total_jobs > 0 THEN (ja.completed_jobs::float / ja.total_jobs * 100)
                            ELSE 0
                        END,
                    'avg_cost', ja.avg_cost,
                    'total_cost', ja.total_cost
                ),
                'pagination', json_build_object(
                    'total', ja.total_jobs,
                    'limit', %(limit)s,
                    'offset', %(offset)s,
                    'has_more', ja.total_jobs > %(offset)s + %(limit)s
                )
            ) as result
        FROM job_data jd
        CROSS JOIN job_analytics ja
        GROUP BY ja.total_jobs, ja.completed_jobs, ja.failed_jobs, ja.avg_cost, ja.total_cost
        """

        # Assumes an `execute_sql(sql, params)` Postgres function exposed via
        # RPC; Supabase does not ship one by default, and the %(name)s
        # placeholders must be bound server-side.
        result = await self.client.rpc('execute_sql', {
            'sql': query,
            'params': {
                'client_id': client_id,
                'status': status,
                'limit': limit,
                'offset': offset
            }
        }).execute()

        return result.data[0]['result'] if result.data else {}

    async def batch_update_job_statuses(self, updates: List[dict]) -> List[dict]:
        """Batch update multiple job statuses efficiently"""

        # Prepare batch update
        update_cases = []
        job_ids = []

        for update in updates:
            job_ids.append(update['id'])
            update_cases.append(f"WHEN '{update['id']}' THEN '{update['status']}'")

        if not update_cases:
            return []

        # Values are interpolated inline for brevity; in production, bind
        # them as parameters to avoid SQL injection.
        query = f"""
        UPDATE jobs 
        SET 
            status = CASE id {' '.join(update_cases)} END,
            updated_at = NOW()
        WHERE id IN ({','.join([f"'{id}'" for id in job_ids])})
        RETURNING id, status, updated_at
        """

        result = await self.client.rpc('execute_sql', {'sql': query}).execute()
        return result.data

    # Caching: functools.lru_cache does not work on coroutines (it would
    # cache the coroutine object itself, not its result), so use a plain
    # dict keyed on the arguments instead.
    _strategy_cache: dict = {}

    async def get_client_strategy_cached(self, client_id: str, content_type: str) -> dict:
        """Cache client strategies to reduce database hits"""
        key = (client_id, content_type)
        if key not in self._strategy_cache:
            result = await self.client.table('strategies').select('*').eq(
                'client_id', client_id
            ).eq('content_type', content_type).eq('is_active', True).execute()
            self._strategy_cache[key] = result.data[0] if result.data else {}
        return self._strategy_cache[key]

    async def bulk_create_jobs(self, jobs_data: List[dict]) -> List[dict]:
        """Efficiently create multiple jobs"""
        # Use batch insert instead of individual inserts
        result = await self.client.table('jobs').insert(jobs_data).execute()

        # Process in background
        job_ids = [job['id'] for job in result.data]
        asyncio.create_task(self._process_jobs_background(job_ids))

        return result.data

    async def _process_jobs_background(self, job_ids: List[str]):
        """Background job processing"""
        for job_id in job_ids:
            try:
                await self._process_single_job(job_id)
            except Exception as e:
                logger.error(f"Background job processing failed: {e}", job_id=job_id)

2. Caching Strategies

# Redis caching implementation
import hashlib
import pickle
from functools import wraps
from typing import Any, Optional

import redis.asyncio as redis

class CacheManager:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)

    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        try:
            value = await self.redis.get(key)
            if value:
                return pickle.loads(value)
        except Exception as e:
            logger.warning(f"Cache get failed: {e}", key=key)
        return None

    async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
        """Set value in cache with TTL"""
        try:
            serialized = pickle.dumps(value)
            await self.redis.setex(key, ttl, serialized)
            return True
        except Exception as e:
            logger.warning(f"Cache set failed: {e}", key=key)
            return False

    async def delete(self, key: str) -> bool:
        """Delete key from cache"""
        try:
            await self.redis.delete(key)
            return True
        except Exception as e:
            logger.warning(f"Cache delete failed: {e}", key=key)
            return False

    async def invalidate_pattern(self, pattern: str):
        """Invalidate all keys matching pattern"""
        try:
            keys = await self.redis.keys(pattern)
            if keys:
                await self.redis.delete(*keys)
        except Exception as e:
            logger.warning(f"Cache invalidation failed: {e}", pattern=pattern)

# Cache decorator
def cached(ttl: int = 3600, key_prefix: str = ""):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate a deterministic cache key; built-in hash() is salted
            # per process, so use a stable digest instead
            raw = f"{args!r}:{kwargs!r}".encode()
            cache_key = f"{key_prefix}:{func.__name__}:{hashlib.sha256(raw).hexdigest()}"

            # Try to get from cache
            cached_result = await cache_manager.get(cache_key)
            if cached_result is not None:
                logger.debug(f"Cache hit: {cache_key}")
                return cached_result

            # Execute function
            result = await func(*args, **kwargs)

            # Store in cache
            await cache_manager.set(cache_key, result, ttl)
            logger.debug(f"Cache set: {cache_key}")

            return result
        return wrapper
    return decorator

# Usage examples
cache_manager = CacheManager(os.getenv("REDIS_URL"))

@cached(ttl=1800, key_prefix="client_strategy")  # 30 minutes
async def get_client_strategy(client_id: str, content_type: str) -> dict:
    # Database query
    pass

@cached(ttl=300, key_prefix="job_analytics")  # 5 minutes
async def get_job_analytics(client_id: str) -> dict:
    # Heavy analytics calculation
    pass

# Cache invalidation on updates
async def update_client_strategy(client_id: str, strategy_data: dict):
    # Update database
    result = await database.update_strategy(client_id, strategy_data)

    # Invalidate related cache. Keys embed an opaque hash, so per-client
    # matching is not possible with this scheme; clear the whole prefix.
    await cache_manager.invalidate_pattern("client_strategy:*")

    return result

3. Async Processing and Queue Management

# Background job processing with Celery alternative
import asyncio
from typing import Dict, Any, Callable
import structlog
from enum import Enum

logger = structlog.get_logger()

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running" 
    COMPLETED = "completed"
    FAILED = "failed"

class AsyncTaskQueue:
    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        self.queue = asyncio.Queue()
        self.workers = []
        self.tasks: Dict[str, Dict[str, Any]] = {}
        self.running = False

    async def start(self):
        """Start the task queue workers"""
        self.running = True
        self.workers = [
            asyncio.create_task(self._worker(f"worker-{i}"))
            for i in range(self.max_workers)
        ]
        logger.info(f"Started {self.max_workers} task queue workers")

    async def stop(self):
        """Stop the task queue workers"""
        self.running = False

        # Cancel all workers
        for worker in self.workers:
            worker.cancel()

        # Wait for workers to finish
        await asyncio.gather(*self.workers, return_exceptions=True)
        logger.info("Task queue workers stopped")

    async def enqueue(self, task_id: str, func: Callable, *args, **kwargs) -> str:
        """Enqueue a task for processing"""
        task_data = {
            'id': task_id,
            'func': func,
            'args': args,
            'kwargs': kwargs,
            'status': TaskStatus.PENDING,
            'created_at': asyncio.get_event_loop().time(),
            'result': None,
            'error': None
        }

        self.tasks[task_id] = task_data
        await self.queue.put(task_data)

        logger.info(f"Task enqueued: {task_id}")
        return task_id

    async def get_task_status(self, task_id: str) -> Dict[str, Any]:
        """Get task status and result"""
        return self.tasks.get(task_id, {})

    async def _worker(self, worker_name: str):
        """Worker coroutine to process tasks"""
        logger.info(f"Worker {worker_name} started")

        while self.running:
            try:
                # Get task from queue with timeout
                task_data = await asyncio.wait_for(
                    self.queue.get(), 
                    timeout=1.0
                )

                await self._process_task(worker_name, task_data)
                self.queue.task_done()

            except asyncio.TimeoutError:
                continue  # Check if still running
            except Exception as e:
                logger.error(f"Worker {worker_name} error: {e}")

    async def _process_task(self, worker_name: str, task_data: Dict[str, Any]):
        """Process a single task"""
        task_id = task_data['id']

        try:
            # Update status to running
            task_data['status'] = TaskStatus.RUNNING
            task_data['started_at'] = asyncio.get_event_loop().time()

            logger.info(f"Worker {worker_name} processing task: {task_id}")

            # Execute the task
            func = task_data['func']
            args = task_data['args']
            kwargs = task_data['kwargs']

            if asyncio.iscoroutinefunction(func):
                result = await func(*args, **kwargs)
            else:
                result = func(*args, **kwargs)

            # Update task with result
            task_data['status'] = TaskStatus.COMPLETED
            task_data['result'] = result
            task_data['completed_at'] = asyncio.get_event_loop().time()

            logger.info(f"Task completed: {task_id}")

        except Exception as e:
            # Update task with error
            task_data['status'] = TaskStatus.FAILED
            task_data['error'] = str(e)
            task_data['failed_at'] = asyncio.get_event_loop().time()

            logger.error(f"Task failed: {task_id}, error: {e}")

# Usage in FastAPI application
task_queue = AsyncTaskQueue(max_workers=5)

@app.on_event("startup")
async def startup_event():
    await task_queue.start()

@app.on_event("shutdown") 
async def shutdown_event():
    await task_queue.stop()

# Content generation with queue
async def process_content_generation(job_id: str, request_data: dict):
    """Background content generation task"""
    try:
        # Process the content generation
        result = await content_processor.generate_content(request_data)

        # Save result to database
        await database.update_job_result(job_id, result)

        return result

    except Exception as e:
        await database.update_job_status(job_id, 'failed', error_message=str(e))
        raise

@app.post("/generate")
async def generate_content_endpoint(request: ContentRequest):
    # Create job record
    job = await database.create_job(request.dict())

    # Queue background processing
    await task_queue.enqueue(
        task_id=job['id'],
        func=process_content_generation,
        job_id=job['id'],
        request_data=request.dict()
    )

    return {"job_id": job['id'], "status": "queued"}

@app.get("/jobs/{job_id}/status")
async def get_job_status(job_id: str):
    # Get from queue first (for real-time status)
    queue_status = await task_queue.get_task_status(job_id)

    if queue_status:
        return {
            "job_id": job_id,
            "status": queue_status['status'].value,
            "result": queue_status.get('result'),
            "error": queue_status.get('error')
        }

    # Fall back to database
    job = await database.get_job(job_id)
    return {
        "job_id": job_id,
        "status": job['status'],
        "result": job.get('result'),
        "error": job.get('error_message')
    }

Performance Monitoring

1. Application Metrics

# Performance monitoring setup
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps

# Metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')
ACTIVE_CONNECTIONS = Gauge('active_connections', 'Active database connections')
JOB_PROCESSING_TIME = Histogram('job_processing_duration_seconds', 'Job processing time', ['provider'])
CACHE_HIT_RATE = Counter('cache_requests_total', 'Cache requests', ['type'])

def monitor_endpoint(func):
    """Decorator to monitor endpoint performance"""
    @wraps(func)
    async def wrapper(request, *args, **kwargs):
        start_time = time.time()
        status_code = 200

        try:
            response = await func(request, *args, **kwargs)
            if hasattr(response, 'status_code'):
                status_code = response.status_code
            return response
        except Exception as e:
            status_code = 500
            raise
        finally:
            # Record metrics
            duration = time.time() - start_time
            REQUEST_COUNT.labels(
                method=request.method,
                endpoint=request.url.path,
                status=status_code
            ).inc()
            REQUEST_DURATION.observe(duration)

    return wrapper

# Start Prometheus metrics server
start_http_server(8000)

# Usage in FastAPI
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)

    # Record metrics
    duration = time.time() - start_time
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()
    REQUEST_DURATION.observe(duration)

    return response
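
JOB_PROCESSING_TIME and CACHE_HIT_RATE above are declared but not yet exercised; a sketch of recording them at the point of work (content_processor is the generation component used elsewhere in this guide):

# Sketch: record the remaining metrics where the work actually happens
async def generate_with_metrics(provider: str, request_data: dict) -> dict:
    # Histogram's time() context manager observes the elapsed time on exit
    with JOB_PROCESSING_TIME.labels(provider=provider).time():
        result = await content_processor.generate_content(request_data)
    return result

def record_cache_lookup(hit: bool) -> None:
    # Counter labeled by outcome; hit rate = hit / (hit + miss) in queries
    CACHE_HIT_RATE.labels(type="hit" if hit else "miss").inc()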

2. Performance Testing

# Load testing with locust
from locust import HttpUser, task, between
import random

class ContentGenerationUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        """Set up test data"""
        self.client_id = "test-client"
        self.topics = [
            "AI in Healthcare",
            "Future of Remote Work", 
            "Sustainable Energy Solutions",
            "Digital Marketing Trends",
            "Cloud Computing Benefits"
        ]

    @task(3)
    def generate_content(self):
        """Test content generation endpoint"""
        payload = {
            "topic": random.choice(self.topics),
            "content_type": "blog_post",
            "keywords": ["technology", "innovation"],
            "client_id": self.client_id,
            "llm_provider": "openai"
        }

        with self.client.post("/generate", json=payload, catch_response=True) as response:
            if response.status_code == 200:
                job_id = response.json().get("job_id")
                if job_id:
                    response.success()
                    # Check job status
                    self.check_job_status(job_id)
                else:
                    response.failure("No job_id in response")
            else:
                response.failure(f"Request failed with status {response.status_code}")

    @task(1)
    def check_job_status(self, job_id: str = None):
        """Test job status endpoint"""
        if not job_id:
            # Use a dummy job ID for testing
            job_id = "test-job-123"

        with self.client.get(f"/jobs/{job_id}", catch_response=True) as response:
            if response.status_code == 200:
                response.success()
            elif response.status_code == 404:
                response.success()  # Expected for dummy job IDs
            else:
                response.failure(f"Unexpected status code: {response.status_code}")

    @task(1)
    def list_jobs(self):
        """Test job listing endpoint"""
        with self.client.get(f"/jobs?client_id={self.client_id}", catch_response=True) as response:
            if response.status_code == 200:
                jobs = response.json().get("data", [])
                response.success()
            else:
                response.failure(f"Request failed with status {response.status_code}")

# Run load test
# locust -f load_test.py --host=http://localhost:8001

This Developer Guide is a living document that evolves with the HG Content Generation System. For the latest updates and additional resources, refer to the project repository and internal documentation.

Last Updated: August 2025
Version: 1.0.0
Maintainer: HG Content Development Team