HG Content Generation System - Developer Guide¶
Table of Contents¶
- Development Setup and Prerequisites
- Project Structure and Architecture
- Code Style and Conventions
- Testing Strategies and Frameworks
- Contributing Guidelines and Workflow
- Module Development Guides
- API Development and Integration
- Database Development and Migrations
- Debugging and Troubleshooting
- Performance Optimization Techniques
Development Setup and Prerequisites¶
System Requirements¶
Node.js Environment¶
- Node.js: >= 20.0.0 (LTS recommended; required for Turbo dev scripts)
- pnpm: >= 8.0.0 (install via corepack enable pnpm)
- npm: For global packages only
Python Environment¶
- Python: >= 3.12
- pip: Latest version
- venv: Virtual environment management
Database and Services¶
- Supabase CLI: For local database development
- Docker: For containerized services (optional)
- Redis: For caching and job queues
Initial Setup¶
1. Clone and Install Dependencies¶
# Clone the repository
git clone https://github.com/your-org/hg-content.git
cd hg-content
# Enable pnpm via Corepack (Node 20+)
corepack enable pnpm
# Install Node.js dependencies
pnpm install
# Set up Python environments for each service
cd apps/cpm
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
cd ../im
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
cd ../../smm
python -m venv venv
source venv/bin/activate
pip install -r requirements.txt
2. Environment Configuration¶
Create environment files for each service:
Frontend (.env.local)
# Supabase Configuration
NEXT_PUBLIC_SUPABASE_URL=http://localhost:54321
NEXT_PUBLIC_SUPABASE_ANON_KEY=your-local-anon-key
SUPABASE_SERVICE_ROLE_KEY=your-local-service-role-key
# Local service URLs
NEXT_PUBLIC_CPM_API_URL=http://localhost:8001
NEXT_PUBLIC_SMM_API_URL=http://localhost:8002
NEXT_PUBLIC_IM_API_URL=http://localhost:8003
# Development flags
NODE_ENV=development
NEXT_PUBLIC_DEBUG=true
Backend Services (.env)
# Database
SUPABASE_URL=http://localhost:54321
SUPABASE_SERVICE_ROLE_KEY=your-local-service-role-key
# Redis (optional for local development)
REDIS_URL=redis://localhost:6379
# LLM API Keys (use your own keys)
OPENAI_API_KEY=sk-your-openai-key
ANTHROPIC_API_KEY=sk-ant-your-anthropic-key
GOOGLE_API_KEY=your-google-ai-key
GROQ_API_KEY=gsk_your-groq-key
OPENROUTER_API_KEY=sk-openrouter-key
OPENROUTER_DEFAULT_MODEL=openai/gpt-4o-mini
OPENROUTER_ORIGIN=http://localhost:3000
OPENROUTER_APP_NAME="HG Content (Local)"
OPENROUTER_TIMEOUT=120
# Inter-service communication
IM_BASE_URL=http://localhost:8003
SMM_BASE_URL=http://localhost:8002
CPM_BASE_URL=http://localhost:8001
# CPM client authentication (JSON map of client_id -> plaintext key)
CPM_CLIENT_API_KEYS={"550e8400-e29b-41d4-a716-446655440000":"cpm_test_example_key"}
# Development settings
LOG_LEVEL=DEBUG
ENVIRONMENT=development
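The CPM_CLIENT_API_KEYS variable above is a JSON map of client IDs to plaintext keys. A minimal sketch of how a service might load and verify it; the helper names `load_client_keys` and `verify_client_key` are illustrative, not part of the codebase:

```python
import json
import os
import secrets

def load_client_keys() -> dict:
    """Parse the CPM_CLIENT_API_KEYS JSON map from the environment."""
    raw = os.getenv("CPM_CLIENT_API_KEYS", "{}")
    return json.loads(raw)

def verify_client_key(client_id: str, presented_key: str) -> bool:
    """Compare a presented key against the configured one in constant time."""
    keys = load_client_keys()
    expected = keys.get(client_id)
    return expected is not None and secrets.compare_digest(expected, presented_key)
```

Using `secrets.compare_digest` avoids leaking key prefixes through timing differences.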
3. Database Setup¶
# Install the Supabase CLI (the npm package is named `supabase`;
# Homebrew users can run: brew install supabase/tap/supabase)
npm install supabase --save-dev
# Start local Supabase instance
npx supabase start
# Push database schema
npx supabase db push
# Run seed data (optional)
cd scripts
python seed_database.py
4. Start Development Environment¶
# Start all services with Turbo
pnpm dev
# Or start services individually:
# Terminal 1: Frontend
cd apps/frontend && pnpm dev
# Terminal 2: CPM Service
cd apps/cpm && source venv/bin/activate && uvicorn app:app --reload --port 8001
# Terminal 3: SMM Service
cd smm && source venv/bin/activate && uvicorn app.api:app --reload --port 8002
# Terminal 4: IM Service (when available)
cd apps/im && source venv/bin/activate && uvicorn app:app --reload --port 8003
Development Tools Setup¶
VSCode Configuration¶
Create .vscode/settings.json:
{
"python.defaultInterpreterPath": "./venv/bin/python",
"typescript.preferences.importModuleSpecifier": "relative",
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll.eslint": "explicit"
},
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
},
"[typescript]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
},
"[typescriptreact]": {
"editor.defaultFormatter": "esbenp.prettier-vscode"
}
}
Git Hooks Setup¶
# Install commit tooling
npm install -g @commitlint/cli @commitlint/config-conventional
pip install pre-commit
# Set up pre-commit
pre-commit install
Project Structure and Architecture¶
Monorepo Organization¶
hg-content/
├── apps/ # Application modules
│ ├── frontend/ # Next.js web application
│ │ ├── app/ # Next.js App Router
│ │ ├── components/ # React components
│ │ ├── lib/ # Utilities and configurations
│ │ ├── hooks/ # Custom React hooks
│ │ ├── stores/ # Zustand state management
│ │ └── __tests__/ # Frontend tests
│ ├── cpm/ # Content Production Module
│ │ ├── app.py # FastAPI application
│ │ ├── content_processor.py # Core content processing
│ │ ├── llm_client.py # LLM provider clients
│ │ ├── database.py # Database operations
│ │ └── tests/ # CPM tests
│ └── im/ # Instructions Module (future)
├── smm/ # Strategy Management Module
│ ├── app/ # FastAPI application
│ ├── lib/ # Core business logic
│ ├── sql/ # Database migrations
│ └── tests/ # SMM tests
├── packages/ # Shared utilities (future)
├── supabase/ # Database schema and migrations
├── docs/ # Documentation
├── scripts/ # Development and deployment scripts
├── .github/ # CI/CD workflows
├── package.json # Root package configuration
├── turbo.json # Turborepo configuration
└── pnpm-workspace.yaml # PNPM workspace configuration
Architecture Patterns¶
1. Microservices Architecture¶
- Service Isolation: Each module runs independently
- API Communication: RESTful APIs between services
- Database Per Service: Logical separation with shared Supabase instance
- Independent Deployment: Each service can be deployed separately
2. Frontend Architecture¶
- Next.js App Router: File-based routing with server/client components
- Component Composition: Reusable UI components with Radix UI
- State Management: Zustand for global state, React Query for server state
- Type Safety: Full TypeScript coverage with strict configuration
3. Backend Architecture¶
- FastAPI: Async Python web framework
- Dependency Injection: Service layer pattern
- Database Abstraction: Supabase client with typed models
- Error Handling: Structured exception handling with proper HTTP status codes
Code Style and Conventions¶
TypeScript/JavaScript Conventions¶
1. Naming Conventions¶
// Components: PascalCase
export const ContentRequestForm = () => {};
// Functions and variables: camelCase
const handleSubmit = () => {};
const userData = {};
// Constants: SCREAMING_SNAKE_CASE
const MAX_RETRY_ATTEMPTS = 3;
// Types and interfaces: PascalCase
interface UserData {
id: string;
name: string;
}
// Enums: PascalCase with descriptive values
enum JobStatus {
Pending = 'pending',
InProgress = 'in-progress',
Completed = 'completed',
Failed = 'failed'
}
2. File Organization¶
// Component file structure
import React from 'react';
import { useState, useEffect } from 'react';
// External library imports
import { useQuery } from '@tanstack/react-query';
import { Button } from '@/components/ui/button';
// Internal imports (relative)
import { useContentStore } from '../stores/useContentStore';
import { ContentRequest } from '../types';
// Component definition
export const ContentRequestForm = ({ onSubmit }: Props) => {
// State declarations
const [isLoading, setIsLoading] = useState(false);
// Custom hooks
const { createRequest } = useContentStore();
// Effects
useEffect(() => {
// Effect logic
}, []);
// Event handlers
const handleSubmit = async (data: ContentRequest) => {
// Handler logic
};
// Render
return (
<form onSubmit={handleSubmit}>
{/* JSX */}
</form>
);
};
3. ESLint Configuration¶
{
"extends": [
"next/core-web-vitals",
"plugin:@typescript-eslint/recommended",
"prettier"
],
"rules": {
"react-hooks/exhaustive-deps": "warn",
"@typescript-eslint/no-unused-vars": "error",
"@typescript-eslint/explicit-function-return-type": "warn",
"prefer-const": "error",
"no-var": "error"
}
}
Python Conventions¶
1. Naming Conventions¶
# Classes: PascalCase
class ContentProcessor:
pass
# Functions and variables: snake_case
def process_content(request_data: dict) -> dict:
user_id = request_data.get('user_id')
return {}
# Constants: SCREAMING_SNAKE_CASE
MAX_TOKENS = 4000
DEFAULT_PROVIDER = 'openai'
# Private methods: leading underscore (defined inside a class)
def _validate_request(self, data: dict) -> bool:
pass
2. Type Hints¶
from typing import Optional, List, Dict, Any
from pydantic import BaseModel
# Function signatures
async def generate_content(
prompt: str,
provider: str = 'openai',
max_tokens: Optional[int] = None
) -> Dict[str, Any]:
pass
# Pydantic models for API
class ContentRequest(BaseModel):
topic: str
content_type: str
keywords: List[str]
client_id: str
max_tokens: Optional[int] = 1000
3. Error Handling¶
from fastapi import HTTPException
import logging
logger = logging.getLogger(__name__)
async def process_request(request: ContentRequest):
try:
result = await llm_client.generate(request.prompt)
return result
except ValueError as e:
logger.error(f"Invalid request data: {e}")
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
Documentation Standards¶
1. TypeScript Documentation¶
/**
* Hook for managing content requests with optimistic updates
* and error handling.
*
* @param clientId - The client ID to filter requests
* @returns Object containing request state and mutation functions
*
* @example
* ```tsx
* const { createRequest, isLoading, error } = useContentRequests('client-123');
*
* const handleSubmit = async (data) => {
* await createRequest(data);
* };
* ```
*/
export const useContentRequests = (clientId: string) => {
// Implementation
};
2. Python Documentation¶
async def generate_content(
prompt: str,
provider: str = 'openai',
max_tokens: Optional[int] = None
) -> Dict[str, Any]:
"""
Generate content using the specified LLM provider.
Args:
prompt: The input prompt for content generation
provider: LLM provider to use ('openai', 'anthropic', 'google', 'groq')
max_tokens: Maximum tokens to generate (provider-specific defaults apply)
Returns:
Dictionary containing generated content and metadata:
- content: Generated text content
- tokens_used: Number of tokens consumed
- cost_estimate: Estimated cost in USD
- provider_model: Specific model used
Raises:
ValueError: If prompt is empty or provider is invalid
HTTPException: If API request fails
Example:
>>> result = await generate_content("Write about AI", "openai", 1000)
>>> print(result['content'])
"""
Testing Strategies and Frameworks¶
Frontend Testing¶
1. Test Structure¶
// components/__tests__/ContentRequestForm.test.tsx
import { render, screen, fireEvent, waitFor } from '@testing-library/react';
import { QueryClient, QueryClientProvider } from '@tanstack/react-query';
import { ContentRequestForm } from '../ContentRequestForm';
const createTestQueryClient = () => new QueryClient({
defaultOptions: {
queries: { retry: false },
mutations: { retry: false },
},
});
const renderWithProviders = (component: React.ReactElement) => {
const queryClient = createTestQueryClient();
return render(
<QueryClientProvider client={queryClient}>
{component}
</QueryClientProvider>
);
};
describe('ContentRequestForm', () => {
it('should submit form with valid data', async () => {
const onSubmit = jest.fn();
renderWithProviders(<ContentRequestForm onSubmit={onSubmit} />);
// Fill form
fireEvent.change(screen.getByLabelText(/topic/i), {
target: { value: 'AI in Healthcare' }
});
// Submit
fireEvent.click(screen.getByRole('button', { name: /generate/i }));
await waitFor(() => {
expect(onSubmit).toHaveBeenCalledWith({
topic: 'AI in Healthcare',
// ... other expected data
});
});
});
});
2. Integration Tests¶
// integration/ContentRequestForm.integration.test.tsx
import { setupServer } from 'msw/node';
import { rest } from 'msw';
import { renderWithProviders } from '../test-utils';
const server = setupServer(
rest.post('/api/content/generate', (req, res, ctx) => {
return res(
ctx.json({
jobId: 'job-123',
status: 'pending'
})
);
})
);
beforeAll(() => server.listen());
afterEach(() => server.resetHandlers());
afterAll(() => server.close());
describe('Content Request Integration', () => {
it('should create content request and show job status', async () => {
// Test implementation
});
});
3. Performance Tests¶
// performance/ContentRequestForm.performance.test.tsx
import { measureRender } from '../test-utils/performance';
describe('ContentRequestForm Performance', () => {
it('should render within acceptable time limits', async () => {
const renderTime = await measureRender(<ContentRequestForm />);
expect(renderTime).toBeLessThan(100); // milliseconds
});
});
Backend Testing¶
1. FastAPI Test Setup¶
# tests/conftest.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import AsyncMock
from app import app
from database import get_database
@pytest.fixture
def client():
"""Test client for FastAPI app"""
return TestClient(app)
@pytest.fixture
def mock_database():
"""Mock database for testing"""
return AsyncMock()
@pytest.fixture(autouse=True)
def override_dependencies(mock_database):
"""Override app dependencies with mocks"""
app.dependency_overrides[get_database] = lambda: mock_database
yield
app.dependency_overrides.clear()
2. API Endpoint Tests¶
# tests/test_api.py
import pytest
from unittest.mock import patch, AsyncMock
def test_generate_content_success(client, mock_database):
"""Test successful content generation"""
# Mock database response
mock_database.create_job.return_value = {
'id': 'job-123',
'status': 'pending'
}
with patch('llm_client.OpenAIClient.generate') as mock_generate:
mock_generate.return_value = {
'content': 'Generated content',
'tokens_used': 150
}
response = client.post('/generate', json={
'topic': 'AI in Healthcare',
'content_type': 'blog_post',
'client_id': 'client-123'
})
assert response.status_code == 200
assert response.json()['status'] == 'pending'
def test_generate_content_validation_error(client):
"""Test validation error handling"""
response = client.post('/generate', json={
'topic': '', # Invalid empty topic
'content_type': 'blog_post'
})
assert response.status_code == 422
@pytest.mark.asyncio
async def test_async_content_processing():
"""Test async content processing logic"""
from content_processor import ContentProcessor
processor = ContentProcessor()
result = await processor.process_content({
'prompt': 'Test prompt',
'provider': 'openai'
})
assert 'content' in result
assert result['tokens_used'] > 0
3. Database Tests¶
# tests/test_database.py
import pytest
from database import Database
from unittest.mock import AsyncMock
@pytest.fixture
async def database():
"""Database instance for testing"""
db = Database()
# Use test database
db.client = AsyncMock()
return db
@pytest.mark.asyncio
async def test_create_job(database):
"""Test job creation"""
job_data = {
'topic': 'Test topic',
'client_id': 'client-123',
'status': 'pending'
}
database.client.table.return_value.insert.return_value.execute.return_value = {
'data': [{'id': 'job-123', **job_data}]
}
result = await database.create_job(job_data)
assert result['id'] == 'job-123'
assert result['status'] == 'pending'
Test Commands¶
# Frontend tests
cd apps/frontend
pnpm test # Run all tests
pnpm test:watch # Watch mode
pnpm test:coverage # Coverage report
# Backend tests
cd apps/cpm
source venv/bin/activate
pytest # Run all tests
pytest -v # Verbose output
pytest --cov=app # Coverage report
pytest -k "test_api" # Run specific tests
pytest --asyncio-mode=auto # Async test mode
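Rather than passing `--asyncio-mode=auto` on every run, pytest-asyncio can be configured once per service. A sketch of an illustrative config file; adjust paths to your service:

```ini
# apps/cpm/pytest.ini (illustrative)
[pytest]
asyncio_mode = auto
testpaths = tests
addopts = --cov=app
```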
Contributing Guidelines and Workflow¶
Git Workflow¶
1. Branch Naming Convention¶
# Feature branches
feature/add-authentication-system
feature/improve-content-quality
# Bug fixes
bugfix/fix-job-status-updates
bugfix/handle-api-timeout-errors
# Hotfixes
hotfix/critical-security-patch
# Documentation
docs/update-api-documentation
docs/add-deployment-guide
2. Commit Message Format¶
Follow Conventional Commits specification:
# Format: <type>[optional scope]: <description>
# Examples:
feat(cpm): add support for Anthropic Claude models
fix(frontend): resolve job status polling issue
docs(api): update authentication endpoint documentation
test(smm): add integration tests for strategy management
refactor(database): optimize query performance
chore(deps): update dependencies to latest versions
# Breaking changes:
feat(api)!: change authentication to OAuth 2.0
# Multi-line for complex changes:
feat(cpm): implement batch content generation
- Add support for processing multiple requests
- Implement queue-based batch processing
- Add cost optimization for batch operations
- Update API documentation
Closes #123
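To enforce this format with the commitlint packages installed during Git Hooks Setup, a minimal config file at the repository root is enough (illustrative):

```js
// commitlint.config.js
module.exports = { extends: ['@commitlint/config-conventional'] };
```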
3. Pull Request Process¶
PR Title Format:
<type>: <brief description>
Examples:
feat: Add real-time job status updates
fix: Resolve authentication token refresh issue
docs: Update developer setup guide
PR Description Template:
## Description
Brief description of changes made.
## Type of Change
- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] Documentation update
- [ ] Performance improvement
- [ ] Code refactoring
## How Has This Been Tested?
- [ ] Unit tests pass
- [ ] Integration tests pass
- [ ] Manual testing completed
- [ ] Performance testing (if applicable)
## Testing Steps
1. Step one
2. Step two
3. Step three
## Screenshots/Videos (if applicable)
Add screenshots or videos to help explain your changes.
## Checklist
- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
## Related Issues
Closes #123
Relates to #456
4. Code Review Guidelines¶
For Reviewers:
- Review within 24 hours
- Focus on logic, security, performance, and maintainability
- Provide constructive feedback with examples
- Approve only when all concerns are addressed
Review Checklist:
- [ ] Code follows project conventions
- [ ] Tests are comprehensive and pass
- [ ] Documentation is updated
- [ ] Security implications considered
- [ ] Performance impact assessed
- [ ] Backward compatibility maintained
Development Workflow¶
1. Setting Up for New Features¶
# Start from main branch
git checkout main
git pull origin main
# Create feature branch
git checkout -b feature/add-new-feature
# Make changes and commit regularly
git add .
git commit -m "feat: implement core functionality"
# Push and create PR
git push origin feature/add-new-feature
2. Code Quality Checks¶
# Run before committing
pnpm lint # Check linting issues
pnpm format # Format code
pnpm test # Run tests
pnpm build # Test build process
# Python services
black . # Format Python code
pylint app/ # Check Python linting
pytest --cov=app # Run tests with coverage
3. Pre-commit Hooks¶
Create .pre-commit-config.yaml:
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.4.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
language_version: python3
- repo: https://github.com/pre-commit/mirrors-eslint
rev: v8.44.0
hooks:
- id: eslint
files: \.(js|ts|tsx)$
types: [file]
Module Development Guides¶
Content Production Module (CPM) Development¶
1. Architecture Overview¶
# app.py - FastAPI application entry point
from fastapi import FastAPI, BackgroundTasks
from content_processor import ContentProcessor
from llm_client import LLMClientFactory
from database import Database
from models import ContentRequest  # pydantic request model (module name illustrative)
app = FastAPI(title="Content Production Module")
processor = ContentProcessor()
db = Database()
@app.post("/generate")
async def generate_content(
request: ContentRequest,
background_tasks: BackgroundTasks
):
# Queue processing job
job = await db.create_job(request.dict())
background_tasks.add_task(processor.process_job, job['id'])
return {"job_id": job['id'], "status": "queued"}
2. Adding New LLM Providers¶
# llm_client.py
from abc import ABC, abstractmethod
from typing import Dict, Any
class BaseLLMClient(ABC):
"""Base class for all LLM providers"""
@abstractmethod
async def generate(self, prompt: str, **kwargs) -> Dict[str, Any]:
"""Generate content using the provider's API"""
pass
@abstractmethod
def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
"""Calculate estimated cost for the request"""
pass
class NewProviderClient(BaseLLMClient):
def __init__(self, api_key: str):
self.api_key = api_key
self.client = NewProviderSDK(api_key=api_key)
async def generate(self, prompt: str, **kwargs) -> Dict[str, Any]:
response = await self.client.completions.create(
prompt=prompt,
max_tokens=kwargs.get('max_tokens', 1000)
)
return {
'content': response.text,
'input_tokens': response.usage.prompt_tokens,
'output_tokens': response.usage.completion_tokens,
'model': response.model,
'cost_estimate': self.calculate_cost(
response.usage.prompt_tokens,
response.usage.completion_tokens
)
}
def calculate_cost(self, input_tokens: int, output_tokens: int) -> float:
# Provider-specific pricing
input_cost = (input_tokens / 1000) * 0.001 # $0.001 per 1K tokens
output_cost = (output_tokens / 1000) * 0.002 # $0.002 per 1K tokens
return input_cost + output_cost
# Register new provider
class LLMClientFactory:
@staticmethod
def create_client(provider: str, api_key: str) -> BaseLLMClient:
clients = {
'openai': OpenAIClient,
'anthropic': AnthropicClient,
'google': GoogleClient,
'groq': GroqClient,
'newprovider': NewProviderClient # Add new provider
}
if provider not in clients:
raise ValueError(f"Unsupported provider: {provider}")
return clients[provider](api_key)
3. Content Processing Pipeline¶
# content_processor.py
import asyncio
import os
from typing import Dict, Any
import structlog
logger = structlog.get_logger()
class ContentProcessor:
def __init__(self):
self.llm_factory = LLMClientFactory()
self.db = Database()
async def process_job(self, job_id: str):
"""Process a content generation job"""
try:
# Update job status
await self.db.update_job_status(job_id, 'in-progress')
# Get job details
job = await self.db.get_job(job_id)
# Get client strategy (via SMM)
strategy = await self._get_client_strategy(job['client_id'])
# Generate optimized prompt (via IM)
prompt = await self._generate_prompt(job, strategy)
# Generate content
result = await self._generate_content(prompt, job['llm_provider'])
# Post-process content
processed_result = await self._post_process_content(result, strategy)
# Save results
await self.db.update_job_result(job_id, processed_result)
await self.db.update_job_status(job_id, 'completed')
logger.info("Job completed successfully", job_id=job_id)
except Exception as e:
logger.error("Job processing failed", job_id=job_id, error=str(e))
await self.db.update_job_status(job_id, 'failed', error_message=str(e))
async def _generate_content(self, prompt: str, provider: str) -> Dict[str, Any]:
"""Generate content using specified provider"""
api_key = os.getenv(f"{provider.upper()}_API_KEY")
client = self.llm_factory.create_client(provider, api_key)
# Implement retry logic
for attempt in range(3):
try:
return await client.generate(prompt)
except Exception as e:
if attempt == 2: # Last attempt
raise
await asyncio.sleep(2 ** attempt) # Exponential backoff
Instructions Module (IM) Development¶
1. Prompt Template System¶
# lib/prompt_manager.py
from jinja2 import Template
from typing import Dict, Any, List
class PromptManager:
def __init__(self, db: Database):
self.db = db
async def generate_prompt(
self,
client_id: str,
content_type: str,
context: Dict[str, Any]
) -> str:
"""Generate dynamic prompt based on client strategy and context"""
# Get client-specific template
template = await self.db.get_prompt_template(client_id, content_type)
if not template:
# Fall back to default template
template = await self.db.get_default_template(content_type)
# Render template with context
jinja_template = Template(template['template'])
return jinja_template.render(**context)
async def create_template(
self,
client_id: str,
name: str,
content_type: str,
template: str,
variables: List[str]
) -> Dict[str, Any]:
"""Create new prompt template"""
# Validate template syntax
try:
Template(template).render(**{var: f"test_{var}" for var in variables})
except Exception as e:
raise ValueError(f"Invalid template syntax: {e}")
return await self.db.create_prompt_template({
'client_id': client_id,
'name': name,
'content_type': content_type,
'template': template,
'variables': variables
})
# Example template usage
BLOG_POST_TEMPLATE = """
Write a comprehensive blog post about {{ topic }}.
Target Audience: {{ target_audience | default("general audience") }}
Tone: {{ tone | default("professional") }}
Word Count: {{ word_count | default("800-1200 words") }}
{% if keywords %}
SEO Keywords to include naturally:
{% for keyword in keywords %}
- {{ keyword }}
{% endfor %}
{% endif %}
{% if brand_guidelines %}
Brand Guidelines:
{{ brand_guidelines }}
{% endif %}
Structure the post with:
1. Engaging introduction
2. Main content sections with subheadings
3. Conclusion with call-to-action
Ensure the content is:
- Original and engaging
- SEO-optimized
- Factually accurate
- Aligned with brand voice
"""
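The template above renders with Jinja2, the library PromptManager already uses. A trimmed, self-contained version of the keyword section shows how the `default` filter and the `keywords` loop behave:

```python
from jinja2 import Template  # third-party: pip install jinja2

# Trimmed copy of BLOG_POST_TEMPLATE, just enough to show rendering behavior
template = Template(
    "Write a comprehensive blog post about {{ topic }}.\n"
    "Tone: {{ tone | default('professional') }}\n"
    "{% if keywords %}Keywords: {% for k in keywords %}{{ k }} {% endfor %}{% endif %}"
)

# tone is omitted, so the default filter supplies 'professional'
prompt = template.render(topic="AI in Healthcare", keywords=["diagnosis", "triage"])
```

Variables left out of the context fall back to their `default` filters instead of raising, which is why template validation in `create_template` renders with placeholder values.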
Strategy Management Module (SMM) Development¶
1. Client Hierarchy Management¶
# lib/client_manager.py
from typing import List, Dict, Any, Optional
class ClientManager:
def __init__(self, db):  # async database wrapper (Supabase client)
self.db = db
async def create_client(
self,
name: str,
client_type: str,
parent_id: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""Create new client with proper hierarchy"""
# Validate hierarchy rules
if client_type == 'root' and parent_id:
raise ValueError("Root clients cannot have parents")
if client_type == 'customer' and not parent_id:
raise ValueError("Customer clients must have an agency parent")
# Create client
client = await self.db.create_client({
'name': name,
'slug': self._generate_slug(name),
'type': client_type,
'parent_id': parent_id,
'metadata': metadata or {}
})
# Set up default strategies
await self._create_default_strategies(client['id'])
return client
async def get_client_hierarchy(self, client_id: str) -> Dict[str, Any]:
"""Get complete client hierarchy"""
client = await self.db.get_client(client_id)
hierarchy = {
'client': client,
'parent': None,
'children': [],
'ancestors': [],
'descendants': []
}
# Get parent chain
if client['parent_id']:
hierarchy['ancestors'] = await self._get_ancestors(client_id)
hierarchy['parent'] = hierarchy['ancestors'][-1] if hierarchy['ancestors'] else None
# Get children
hierarchy['children'] = await self.db.get_client_children(client_id)
# Get all descendants
hierarchy['descendants'] = await self._get_descendants(client_id)
return hierarchy
async def _create_default_strategies(self, client_id: str):
"""Create default strategies for new client"""
default_strategies = [
{
'name': 'Default Blog Strategy',
'content_type': 'blog_post',
'configuration': {
'tone': 'professional',
'target_audience': 'business professionals',
'word_count_range': [800, 1200],
'seo_focus': True,
'include_cta': True
}
},
{
'name': 'Default Social Media Strategy',
'content_type': 'social_media',
'configuration': {
'platforms': ['linkedin', 'twitter'],
'character_limits': {'twitter': 280, 'linkedin': 3000},
'hashtag_count': {'twitter': 2, 'linkedin': 5},
'tone': 'engaging'
}
}
]
for strategy in default_strategies:
strategy['client_id'] = client_id
await self.db.create_strategy(strategy)
2. Strategy Configuration System¶
# lib/strategy_config.py
from pydantic import BaseModel, validator
from typing import Dict, Any, List, Optional
from enum import Enum
class ContentType(str, Enum):
BLOG_POST = 'blog_post'
SOCIAL_MEDIA = 'social_media'
EMAIL_CAMPAIGN = 'email_campaign'
PRESS_RELEASE = 'press_release'
PRODUCT_DESCRIPTION = 'product_description'
class ToneStyle(str, Enum):
PROFESSIONAL = 'professional'
CASUAL = 'casual'
FRIENDLY = 'friendly'
AUTHORITATIVE = 'authoritative'
CONVERSATIONAL = 'conversational'
class StrategyConfiguration(BaseModel):
"""Base strategy configuration model"""
content_type: ContentType
tone: ToneStyle
target_audience: str
brand_guidelines: Optional[str] = None
seo_keywords: List[str] = []
class Config:
use_enum_values = True
class BlogPostStrategy(StrategyConfiguration):
"""Blog post specific strategy configuration"""
word_count_min: int = 500
word_count_max: int = 2000
include_introduction: bool = True
include_conclusion: bool = True
include_cta: bool = True
subheading_count: int = 3
meta_description_length: int = 160
@validator('word_count_max')
def validate_word_count(cls, v, values):
if 'word_count_min' in values and v <= values['word_count_min']:
raise ValueError('Max word count must be greater than min word count')
return v
class SocialMediaStrategy(StrategyConfiguration):
"""Social media specific strategy configuration"""
platforms: List[str]
character_limits: Dict[str, int]
hashtag_count: Dict[str, int] = {}
include_emoji: bool = True
post_timing: Optional[Dict[str, str]] = None
@validator('platforms')
def validate_platforms(cls, v):
valid_platforms = ['twitter', 'linkedin', 'facebook', 'instagram']
for platform in v:
if platform not in valid_platforms:
raise ValueError(f'Invalid platform: {platform}')
return v
# Strategy factory
class StrategyFactory:
@staticmethod
def create_strategy(content_type: str, config: Dict[str, Any]) -> StrategyConfiguration:
strategy_classes = {
'blog_post': BlogPostStrategy,
'social_media': SocialMediaStrategy,
# Add more strategy types as needed
}
strategy_class = strategy_classes.get(content_type, StrategyConfiguration)
return strategy_class(**config)
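The `word_count_max` validator above rejects inconsistent configurations at construction time. A self-contained sketch using a trimmed copy of BlogPostStrategy, so the example runs on its own:

```python
from pydantic import BaseModel, ValidationError, validator

class BlogPostStrategy(BaseModel):
    """Trimmed copy of the strategy model above, enough to show validation."""
    word_count_min: int = 500
    word_count_max: int = 2000

    @validator('word_count_max')
    def validate_word_count(cls, v, values):
        if 'word_count_min' in values and v <= values['word_count_min']:
            raise ValueError('Max word count must be greater than min word count')
        return v

ok = BlogPostStrategy(word_count_min=800, word_count_max=1200)

try:
    BlogPostStrategy(word_count_min=1200, word_count_max=800)  # max below min
    raised = False
except ValidationError:
    raised = True
```

Validation at the model boundary means the factory never hands an inconsistent strategy to downstream content generation.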
Frontend Development Guide¶
1. Component Development Pattern¶
// components/content/ContentRequestForm.tsx
import { useState } from 'react';
import { useForm } from 'react-hook-form';
import { zodResolver } from '@hookform/resolvers/zod';
import { z } from 'zod';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Select, SelectTrigger, SelectValue, SelectContent, SelectItem } from '@/components/ui/select';
import { Form, FormField, FormItem, FormLabel, FormControl, FormMessage } from '@/components/ui/form';
import { toast } from 'sonner'; // or your project's toast utility
import { useCreateContentRequest } from '@/hooks/useCreateContentRequest';
import { useContentStore } from '@/stores/useContentStore';
// Validation schema
const contentRequestSchema = z.object({
topic: z.string().min(1, 'Topic is required').max(200, 'Topic too long'),
contentType: z.enum(['blog_post', 'social_media', 'email_campaign']),
keywords: z.array(z.string()).min(1, 'At least one keyword is required'),
targetAudience: z.string().optional(),
tone: z.enum(['professional', 'casual', 'friendly']).default('professional'),
wordCount: z.number().min(100).max(5000).optional(),
});
type ContentRequestFormData = z.infer<typeof contentRequestSchema>;
interface ContentRequestFormProps {
clientId: string;
onSubmit?: (data: ContentRequestFormData) => void;
onCancel?: () => void;
}
export const ContentRequestForm = ({ clientId, onSubmit, onCancel }: ContentRequestFormProps) => {
const [isSubmitting, setIsSubmitting] = useState(false);
const { createRequest, isLoading } = useCreateContentRequest();
const { selectedClient } = useContentStore();
const form = useForm<ContentRequestFormData>({
resolver: zodResolver(contentRequestSchema),
defaultValues: {
topic: '',
contentType: 'blog_post',
keywords: [],
tone: 'professional',
},
});
const handleSubmit = async (data: ContentRequestFormData) => {
try {
setIsSubmitting(true);
const request = {
...data,
clientId,
createdBy: selectedClient?.id,
};
const result = await createRequest(request);
// Handle success
onSubmit?.(data);
form.reset();
// Show success notification
toast.success(`Content request created. Job ID: ${result.jobId}`);
} catch (error) {
console.error('Failed to create content request:', error);
toast.error('Failed to create content request. Please try again.');
} finally {
setIsSubmitting(false);
}
};
return (
<Form {...form}>
<form onSubmit={form.handleSubmit(handleSubmit)} className="space-y-6">
<FormField
control={form.control}
name="topic"
render={({ field }) => (
<FormItem>
<FormLabel>Topic</FormLabel>
<FormControl>
<Input
placeholder="Enter content topic..."
{...field}
/>
</FormControl>
<FormMessage />
</FormItem>
)}
/>
<FormField
control={form.control}
name="contentType"
render={({ field }) => (
<FormItem>
<FormLabel>Content Type</FormLabel>
<Select onValueChange={field.onChange} defaultValue={field.value}>
<FormControl>
<SelectTrigger>
<SelectValue placeholder="Select content type" />
</SelectTrigger>
</FormControl>
<SelectContent>
<SelectItem value="blog_post">Blog Post</SelectItem>
<SelectItem value="social_media">Social Media</SelectItem>
<SelectItem value="email_campaign">Email Campaign</SelectItem>
</SelectContent>
</Select>
<FormMessage />
</FormItem>
)}
/>
<div className="flex gap-4">
<Button
type="submit"
disabled={isSubmitting || isLoading}
className="flex-1"
>
{isSubmitting ? 'Creating...' : 'Generate Content'}
</Button>
{onCancel && (
<Button
type="button"
variant="outline"
onClick={onCancel}
disabled={isSubmitting}
>
Cancel
</Button>
)}
</div>
</form>
</Form>
);
};
2. Custom Hook Development¶
// hooks/useCreateContentRequest.ts
import { useMutation, useQueryClient } from '@tanstack/react-query';
import { createContentRequest } from '@/lib/api/content';
import { ContentRequest, ContentRequestResponse } from '@/types';
export const useCreateContentRequest = () => {
const queryClient = useQueryClient();
return useMutation<ContentRequestResponse, Error, ContentRequest>({
mutationFn: createContentRequest,
onSuccess: (data, variables) => {
// Invalidate and refetch jobs list
queryClient.invalidateQueries({ queryKey: ['jobs', variables.clientId] });
// Seed the cache with the newly created job so the list updates immediately
queryClient.setQueryData(['jobs', variables.clientId], (old: any) => ({
...old,
data: [
{
id: data.jobId,
status: 'pending',
topic: variables.topic,
contentType: variables.contentType,
createdAt: new Date().toISOString(),
},
...(old?.data || []),
],
}));
},
onError: (error) => {
console.error('Failed to create content request:', error);
},
});
};
3. State Management with Zustand¶
// stores/useContentStore.ts
import { create } from 'zustand';
import { immer } from 'zustand/middleware/immer';
import { persist } from 'zustand/middleware';
import type { ContentRequest } from '@/types';
interface Client {
id: string;
name: string;
type: 'root' | 'agency' | 'customer';
parentId?: string;
}
interface ContentState {
selectedClient: Client | null;
recentRequests: ContentRequest[];
preferences: {
defaultContentType: string;
defaultTone: string;
autoSave: boolean;
};
}
interface ContentActions {
setSelectedClient: (client: Client | null) => void;
addRecentRequest: (request: ContentRequest) => void;
updatePreferences: (preferences: Partial<ContentState['preferences']>) => void;
clearRecentRequests: () => void;
}
export const useContentStore = create<ContentState & ContentActions>()(
persist(
immer((set) => ({
// Initial state
selectedClient: null,
recentRequests: [],
preferences: {
defaultContentType: 'blog_post',
defaultTone: 'professional',
autoSave: true,
},
// Actions
setSelectedClient: (client) =>
set((state) => {
state.selectedClient = client;
}),
addRecentRequest: (request) =>
set((state) => {
state.recentRequests.unshift(request);
// Keep only last 10 requests
if (state.recentRequests.length > 10) {
state.recentRequests = state.recentRequests.slice(0, 10);
}
}),
updatePreferences: (newPreferences) =>
set((state) => {
Object.assign(state.preferences, newPreferences);
}),
clearRecentRequests: () =>
set((state) => {
state.recentRequests = [];
}),
})),
{
name: 'content-store',
partialize: (state) => ({
selectedClient: state.selectedClient,
preferences: state.preferences,
}),
}
)
);
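The `addRecentRequest` action above keeps the list capped at ten entries, newest first. That invariant is easy to isolate as a pure function and check directly (a sketch; `ContentRequest` is reduced to a minimal shape here):

```typescript
// Pure version of the addRecentRequest invariant: prepend, then trim to the cap.
type RecentRequest = { topic: string };

function pushRecent(list: RecentRequest[], request: RecentRequest, max = 10): RecentRequest[] {
  return [request, ...list].slice(0, max);
}

// Twelve inserts leave exactly ten entries, most recent at index 0.
let recent: RecentRequest[] = [];
for (let i = 0; i < 12; i++) {
  recent = pushRecent(recent, { topic: `topic-${i}` });
}
console.log(recent.length, recent[0].topic); // → 10 topic-11
```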
API Development and Integration¶
API Design Principles¶
1. RESTful API Structure¶
GET /api/clients # List all clients
POST /api/clients # Create new client
GET /api/clients/{id} # Get specific client
PUT /api/clients/{id} # Update client
DELETE /api/clients/{id} # Delete client
GET /api/clients/{id}/strategies # Get client strategies
POST /api/clients/{id}/strategies # Create strategy for client
GET /api/jobs # List jobs (with filtering)
POST /api/jobs # Create new job
GET /api/jobs/{id} # Get job details
PUT /api/jobs/{id} # Update job
DELETE /api/jobs/{id} # Cancel job
POST /api/content/generate # Generate content
GET /api/content/templates # Get prompt templates
2. Request/Response Standards¶
// Standard API response format
interface ApiResponse<T = any> {
success: boolean;
data?: T;
error?: {
code: string;
message: string;
details?: any;
};
pagination?: {
page: number;
pageSize: number;
total: number;
totalPages: number;
};
}
// Error response format
interface ApiError {
code: string;
message: string;
details?: {
field?: string;
value?: any;
constraint?: string;
}[];
}
// Example usage
const response: ApiResponse<Job[]> = {
success: true,
data: [
{
id: 'job-123',
status: 'completed',
topic: 'AI in Healthcare',
result: { content: '...' }
}
],
pagination: {
page: 1,
pageSize: 20,
total: 150,
totalPages: 8
}
};
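A small helper keeps the pagination block consistent with the format above by deriving `totalPages` instead of storing it (a sketch against the `ApiResponse` interface shown here; the helper name is illustrative):

```typescript
interface Pagination { page: number; pageSize: number; total: number; totalPages: number; }
interface ApiResponse<T> { success: boolean; data?: T; pagination?: Pagination; }

// Wrap one page of rows in the standard envelope, computing totalPages from total/pageSize.
function paginated<T>(data: T[], page: number, pageSize: number, total: number): ApiResponse<T[]> {
  return {
    success: true,
    data,
    pagination: { page, pageSize, total, totalPages: Math.ceil(total / pageSize) },
  };
}

// 150 rows at 20 per page yields 8 pages, matching the example response above.
const res = paginated([{ id: 'job-123' }], 1, 20, 150);
console.log(res.pagination?.totalPages); // → 8
```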
3. Authentication & Authorization¶
// middleware/auth.ts
import { NextRequest, NextResponse } from 'next/server';
import { createServerComponentClient } from '@supabase/auth-helpers-nextjs';
export async function authMiddleware(request: NextRequest) {
const supabase = createServerComponentClient({ cookies: () => request.cookies });
try {
const { data: { session }, error } = await supabase.auth.getSession();
if (error || !session) {
return NextResponse.json(
{ success: false, error: { code: 'UNAUTHORIZED', message: 'Authentication required' } },
{ status: 401 }
);
}
// Add user context to request
const requestHeaders = new Headers(request.headers);
requestHeaders.set('x-user-id', session.user.id);
requestHeaders.set('x-user-email', session.user.email || '');
return NextResponse.next({
request: {
headers: requestHeaders,
},
});
} catch (error) {
return NextResponse.json(
{ success: false, error: { code: 'AUTH_ERROR', message: 'Authentication failed' } },
{ status: 500 }
);
}
}
// Usage in API routes
export async function GET(request: NextRequest) {
const userId = request.headers.get('x-user-id');
// Handle authenticated request
}
LLM Provider Integration (OpenRouter)¶
The CPM production pipeline now runs through OpenRouter as the primary aggregator. When issuing `POST /generate` requests from trusted services:
- Set `llm_provider` to `openrouter`.
- Supply a `model` string sourced from `/api/providers/status` (default: `openai/gpt-4o-mini`).
- Attach the calling user context via `user_id` and `provider_key_id` so the CPM can validate ownership of the stored OpenRouter key.
Example payload accepted by `apps/cpm/app.py`:
{
"content_type": "blog",
"prompt": "Explain how AI assistants help marketing teams increase productivity.",
"client_id": "550e8400-e29b-41d4-a716-446655440000",
"llm_provider": "openrouter",
"model": "deepseek/deepseek-r1-0528-qwen3-8b:free",
"keywords": ["ai assistants", "marketing", "automation"],
"length": "short",
"user_id": "4d9b1c88-62eb-4b88-92b5-ca03d8bdb616",
"provider_key_id": "1f1fe2c5-20ca-4602-a285-182443edcbe7"
}
Requests missing the provider key (or referencing a disabled key) receive a `503` response (`OpenRouter API key is not configured`).
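Because a missing or disabled key only surfaces as a 503 at the CPM, it can be worth checking the payload client-side before dispatch. A minimal sketch (the `REQUIRED_FIELDS` tuple and helper name are illustrative, not part of the CPM):

```python
# Fields the OpenRouter path expects on POST /generate payloads (per the example above).
REQUIRED_FIELDS = ("content_type", "prompt", "client_id", "llm_provider", "user_id", "provider_key_id")

def missing_openrouter_fields(payload: dict) -> list[str]:
    """Return the required fields that are absent or empty, in declaration order."""
    return [field for field in REQUIRED_FIELDS if not payload.get(field)]

payload = {
    "content_type": "blog",
    "prompt": "Explain how AI assistants help marketing teams.",
    "client_id": "550e8400-e29b-41d4-a716-446655440000",
    "llm_provider": "openrouter",
    "user_id": "4d9b1c88-62eb-4b88-92b5-ca03d8bdb616",
}
print(missing_openrouter_fields(payload))  # → ['provider_key_id']
```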
The provider catalogue is available at runtime via `GET /api/providers/status`. This endpoint exposes real-time pricing, context lengths, and a `requires_user_key` flag used by the frontend to guide users through key setup.
Inter-Service Communication¶
1. Service Client Pattern¶
// lib/services/BaseServiceClient.ts
import axios, { AxiosInstance, AxiosRequestConfig } from 'axios';
export abstract class BaseServiceClient {
protected client: AxiosInstance;
constructor(baseURL: string, timeout: number = 10000) {
this.client = axios.create({
baseURL,
timeout,
headers: {
'Content-Type': 'application/json',
},
});
this.setupInterceptors();
}
private setupInterceptors() {
// Request interceptor for auth
this.client.interceptors.request.use(
(config) => {
const token = this.getAuthToken();
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
},
(error) => Promise.reject(error)
);
// Response interceptor for error handling
this.client.interceptors.response.use(
(response) => response,
(error) => {
if (error.response?.status === 401) {
// Handle authentication errors
this.handleAuthError();
}
return Promise.reject(error);
}
);
}
protected abstract getAuthToken(): string | null;
protected abstract handleAuthError(): void;
protected async request<T>(config: AxiosRequestConfig): Promise<T> {
try {
const response = await this.client.request<T>(config);
return response.data;
} catch (error) {
this.handleRequestError(error);
throw error;
}
}
private handleRequestError(error: any) {
// Log error details
console.error('Service request failed:', {
url: error.config?.url,
method: error.config?.method,
status: error.response?.status,
message: error.message,
});
}
}
// lib/services/CPMServiceClient.ts
export class CPMServiceClient extends BaseServiceClient {
constructor() {
super(process.env.NEXT_PUBLIC_CPM_API_URL!);
}
protected getAuthToken(): string | null {
// Get token from Supabase session or other auth store
return null; // Implement based on your auth strategy
}
protected handleAuthError(): void {
// Redirect to login or refresh token
}
async generateContent(request: ContentGenerationRequest): Promise<ContentGenerationResponse> {
return this.request({
method: 'POST',
url: '/generate',
data: request,
});
}
async getJobStatus(jobId: string): Promise<JobStatusResponse> {
return this.request({
method: 'GET',
url: `/jobs/${jobId}`,
});
}
async listJobs(params: ListJobsParams): Promise<ListJobsResponse> {
return this.request({
method: 'GET',
url: '/jobs',
params,
});
}
}
2. Error Handling and Retry Logic¶
// lib/utils/apiUtils.ts
import { AxiosError } from 'axios';
export interface RetryConfig {
maxRetries: number;
baseDelay: number;
maxDelay: number;
backoffFactor: number;
}
const DEFAULT_RETRY_CONFIG: RetryConfig = {
maxRetries: 3,
baseDelay: 1000,
maxDelay: 10000,
backoffFactor: 2,
};
export async function withRetry<T>(
operation: () => Promise<T>,
config: Partial<RetryConfig> = {}
): Promise<T> {
const { maxRetries, baseDelay, maxDelay, backoffFactor } = {
...DEFAULT_RETRY_CONFIG,
...config,
};
let lastError: Error;
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await operation();
} catch (error) {
lastError = error as Error;
// Don't retry on client errors (4xx)
if (error instanceof AxiosError && error.response?.status && error.response.status < 500) {
throw error;
}
if (attempt === maxRetries) {
break;
}
// Calculate delay with exponential backoff
const delay = Math.min(baseDelay * Math.pow(backoffFactor, attempt), maxDelay);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
throw lastError!;
}
// Usage example
export const cpmService = {
async generateContent(request: ContentGenerationRequest) {
return withRetry(
() => cpmClient.generateContent(request),
{ maxRetries: 2, baseDelay: 500 }
);
},
};
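The delay formula inside `withRetry` (base × factor^attempt, capped at `maxDelay`) produces a fixed schedule; extracting it as a pure function makes the cap easy to verify:

```typescript
// Delay before retry `attempt` (0-based), mirroring the formula used in withRetry.
function backoffDelay(attempt: number, baseDelay = 1000, backoffFactor = 2, maxDelay = 10000): number {
  return Math.min(baseDelay * Math.pow(backoffFactor, attempt), maxDelay);
}

// With the defaults: 1s, 2s, 4s, 8s, then capped at 10s.
const schedule = [0, 1, 2, 3, 4].map((a) => backoffDelay(a));
console.log(schedule); // → [ 1000, 2000, 4000, 8000, 10000 ]
```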
API Documentation Generation¶
1. FastAPI Documentation¶
# app.py
from fastapi import FastAPI, HTTPException, Depends
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, Field
from typing import List, Optional
import uvicorn
app = FastAPI(
title="Content Production Module API",
description="API for AI-powered content generation",
version="1.0.0",
contact={
"name": "HG Content Team",
"email": "dev@hgcontent.com",
},
license_info={
"name": "MIT",
"url": "https://opensource.org/licenses/MIT",
},
)
# Request/Response models with detailed documentation
class ContentGenerationRequest(BaseModel):
"""Request model for content generation"""
topic: str = Field(..., description="Main topic for content generation", max_length=200)
content_type: str = Field(..., description="Type of content to generate", regex="^(blog_post|social_media|email_campaign)$")  # Pydantic v1; use pattern= under Pydantic v2
keywords: List[str] = Field(..., description="SEO keywords to include", min_items=1, max_items=10)
client_id: str = Field(..., description="Client identifier")
llm_provider: Optional[str] = Field("openai", description="LLM provider to use")
max_tokens: Optional[int] = Field(1000, description="Maximum tokens to generate", ge=100, le=4000)
class Config:
schema_extra = {
"example": {
"topic": "Benefits of AI in Healthcare",
"content_type": "blog_post",
"keywords": ["artificial intelligence", "healthcare", "medical technology"],
"client_id": "client-123",
"llm_provider": "openai",
"max_tokens": 1500
}
}
class ContentGenerationResponse(BaseModel):
"""Response model for content generation"""
job_id: str = Field(..., description="Unique identifier for the generation job")
status: str = Field(..., description="Current status of the job")
estimated_completion: Optional[str] = Field(None, description="ISO timestamp of estimated completion")
@app.post(
"/generate",
response_model=ContentGenerationResponse,
summary="Generate content",
description="Queue a new content generation job using AI/LLM providers",
responses={
200: {"description": "Job created successfully"},
400: {"description": "Invalid request data"},
429: {"description": "Rate limit exceeded"},
500: {"description": "Internal server error"}
},
tags=["Content Generation"]
)
async def generate_content(request: ContentGenerationRequest):
"""
Generate content using AI/LLM providers.
This endpoint queues a content generation job and returns immediately with a job ID.
Use the `/jobs/{job_id}` endpoint to check the status and retrieve results.
**Supported Content Types:**
- `blog_post`: Full-length blog articles (500-2000 words)
- `social_media`: Social media posts (platform-specific formatting)
- `email_campaign`: Marketing email content
**Supported LLM Providers:**
- `openai`: GPT-4, GPT-3.5-turbo
- `anthropic`: Claude 3.5 Sonnet, Claude 3 Haiku
- `google`: Gemini 1.5 Pro, Gemini 1.5 Flash
- `groq`: Llama 3.1 70B (high-speed inference)
"""
# Implementation
pass
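The `content_type` pattern on the request model can be exercised directly; anything outside the three literals is rejected before the endpoint body runs:

```python
import re

# Same pattern used by the ContentGenerationRequest content_type field.
CONTENT_TYPE_PATTERN = r"^(blog_post|social_media|email_campaign)$"

def is_valid_content_type(value: str) -> bool:
    """True when the value matches one of the supported content types exactly."""
    return re.fullmatch(CONTENT_TYPE_PATTERN, value) is not None

print(is_valid_content_type("blog_post"), is_valid_content_type("blog"))  # → True False
```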
2. OpenAPI Schema Generation¶
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = get_openapi(
title="HG Content Generation API",
version="1.0.0",
description="Comprehensive API for AI-powered content generation with multi-client support",
routes=app.routes,
)
# Add custom schema information
openapi_schema["info"]["x-logo"] = {
"url": "https://hgcontent.com/logo.png"
}
# Add server information
openapi_schema["servers"] = [
{"url": "https://api.hgcontent.com", "description": "Production server"},
{"url": "https://staging-api.hgcontent.com", "description": "Staging server"},
{"url": "http://localhost:8001", "description": "Development server"}
]
# Add authentication schemes
openapi_schema["components"]["securitySchemes"] = {
"bearerAuth": {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT"
}
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi
Database Development and Migrations¶
Migration Strategy¶
1. Supabase Migration Management¶
-- supabase/migrations/20240801000001_create_core_tables.sql
-- Create core tables for the HG Content system
-- Enable UUID extension
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Create custom types
CREATE TYPE client_type AS ENUM ('root', 'agency', 'customer');
CREATE TYPE job_status AS ENUM ('pending', 'in_progress', 'completed', 'failed', 'cancelled');
CREATE TYPE content_type AS ENUM ('blog_post', 'social_media', 'email_campaign', 'press_release', 'product_description');
-- Clients table
CREATE TABLE clients (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
name VARCHAR(255) NOT NULL,
slug VARCHAR(255) UNIQUE NOT NULL,
type client_type NOT NULL,
parent_id UUID REFERENCES clients(id) ON DELETE CASCADE,
metadata JSONB DEFAULT '{}',
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
-- Constraints
CONSTRAINT valid_hierarchy CHECK (
(type = 'root' AND parent_id IS NULL) OR
(type != 'root' AND parent_id IS NOT NULL)
)
);
-- Jobs table
CREATE TABLE jobs (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
topic VARCHAR(500) NOT NULL,
content_type content_type NOT NULL,
prompt TEXT NOT NULL,
llm_provider VARCHAR(50) NOT NULL DEFAULT 'openai',
status job_status NOT NULL DEFAULT 'pending',
parameters JSONB DEFAULT '{}',
result JSONB DEFAULT NULL,
error_message TEXT DEFAULT NULL,
-- Cost tracking
input_tokens INTEGER DEFAULT NULL,
output_tokens INTEGER DEFAULT NULL,
total_tokens INTEGER DEFAULT NULL,
estimated_cost DECIMAL(10,6) DEFAULT NULL,
provider_model VARCHAR(100) DEFAULT NULL,
-- Timestamps
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
started_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
completed_at TIMESTAMP WITH TIME ZONE DEFAULT NULL,
-- User context
created_by UUID REFERENCES auth.users(id) ON DELETE SET NULL
);
-- Strategies table
CREATE TABLE strategies (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
content_type content_type NOT NULL,
configuration JSONB NOT NULL,
is_active BOOLEAN DEFAULT true,
version INTEGER DEFAULT 1,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by UUID REFERENCES auth.users(id) ON DELETE SET NULL
);
-- Prompt templates table
CREATE TABLE prompt_templates (
id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
name VARCHAR(255) NOT NULL,
content_type content_type NOT NULL,
template TEXT NOT NULL,
variables JSONB DEFAULT '{}',
version INTEGER DEFAULT 1,
is_active BOOLEAN DEFAULT true,
is_default BOOLEAN DEFAULT false,
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
created_by UUID REFERENCES auth.users(id) ON DELETE SET NULL
);
-- Add indexes
CREATE INDEX idx_clients_parent_id ON clients(parent_id);
CREATE INDEX idx_clients_type ON clients(type);
CREATE INDEX idx_clients_slug ON clients(slug);
CREATE INDEX idx_jobs_client_id ON jobs(client_id);
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_created_at ON jobs(created_at);
CREATE INDEX idx_jobs_content_type ON jobs(content_type);
CREATE INDEX idx_jobs_llm_provider ON jobs(llm_provider);
CREATE INDEX idx_strategies_client_id ON strategies(client_id);
CREATE INDEX idx_strategies_content_type ON strategies(content_type);
CREATE INDEX idx_strategies_is_active ON strategies(is_active);
CREATE INDEX idx_prompt_templates_client_id ON prompt_templates(client_id);
CREATE INDEX idx_prompt_templates_content_type ON prompt_templates(content_type);
CREATE INDEX idx_prompt_templates_is_active ON prompt_templates(is_active);
-- Add triggers for updated_at
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_clients_updated_at BEFORE UPDATE ON clients
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_jobs_updated_at BEFORE UPDATE ON jobs
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_strategies_updated_at BEFORE UPDATE ON strategies
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
CREATE TRIGGER update_prompt_templates_updated_at BEFORE UPDATE ON prompt_templates
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();
2. Row-Level Security (RLS) Setup¶
-- supabase/migrations/20240801000002_enable_rls.sql
-- Enable Row-Level Security for multi-tenant data isolation
-- Enable RLS on all tables
ALTER TABLE clients ENABLE ROW LEVEL SECURITY;
ALTER TABLE jobs ENABLE ROW LEVEL SECURITY;
ALTER TABLE strategies ENABLE ROW LEVEL SECURITY;
ALTER TABLE prompt_templates ENABLE ROW LEVEL SECURITY;
-- Helper function to get user's accessible client IDs
CREATE OR REPLACE FUNCTION get_user_client_ids(p_user_id UUID)
RETURNS UUID[] AS $$
DECLARE
client_ids UUID[];
BEGIN
-- Get all client IDs the user has access to
-- This includes clients they directly belong to and their children
-- (parameter named p_user_id to avoid ambiguity with the uc.user_id column)
SELECT ARRAY(
WITH RECURSIVE client_hierarchy AS (
-- Base case: clients directly associated with user
SELECT c.id, c.parent_id, c.type
FROM clients c
JOIN user_clients uc ON c.id = uc.client_id
WHERE uc.user_id = p_user_id
UNION
-- Recursive case: child clients
SELECT c.id, c.parent_id, c.type
FROM clients c
JOIN client_hierarchy ch ON c.parent_id = ch.id
)
SELECT id FROM client_hierarchy
) INTO client_ids;
RETURN client_ids;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- RLS Policies for clients table
CREATE POLICY "Users can view accessible clients" ON clients
FOR SELECT
USING (id = ANY(get_user_client_ids(auth.uid())));
CREATE POLICY "Users can update their clients" ON clients
FOR UPDATE
USING (id = ANY(get_user_client_ids(auth.uid())));
-- RLS Policies for jobs table
CREATE POLICY "Users can view jobs for their clients" ON jobs
FOR SELECT
USING (client_id = ANY(get_user_client_ids(auth.uid())));
CREATE POLICY "Users can create jobs for their clients" ON jobs
FOR INSERT
WITH CHECK (client_id = ANY(get_user_client_ids(auth.uid())));
CREATE POLICY "Users can update jobs for their clients" ON jobs
FOR UPDATE
USING (client_id = ANY(get_user_client_ids(auth.uid())));
-- RLS Policies for strategies table
CREATE POLICY "Users can view strategies for their clients" ON strategies
FOR SELECT
USING (client_id = ANY(get_user_client_ids(auth.uid())));
CREATE POLICY "Users can manage strategies for their clients" ON strategies
FOR ALL
USING (client_id = ANY(get_user_client_ids(auth.uid())));
-- RLS Policies for prompt_templates table
CREATE POLICY "Users can view templates for their clients" ON prompt_templates
FOR SELECT
USING (client_id = ANY(get_user_client_ids(auth.uid())));
CREATE POLICY "Users can manage templates for their clients" ON prompt_templates
FOR ALL
USING (client_id = ANY(get_user_client_ids(auth.uid())));
3. Database Seeding¶
# scripts/seed_database.py
import asyncio
import os
from supabase import create_client, Client
from typing import Dict, Any, List
import json
async def seed_database():
"""Seed the database with initial data"""
# Initialize Supabase client
supabase: Client = create_client(
os.getenv("SUPABASE_URL"),
os.getenv("SUPABASE_SERVICE_ROLE_KEY")
)
# Seed clients
clients = await seed_clients(supabase)
# Seed strategies
await seed_strategies(supabase, clients)
# Seed prompt templates
await seed_prompt_templates(supabase, clients)
print("Database seeding completed successfully!")
async def seed_clients(supabase: Client) -> Dict[str, str]:
"""Create initial client hierarchy"""
clients_data = [
{
"name": "HG Content",
"slug": "hg-content",
"type": "root",
"parent_id": None,
"metadata": {
"description": "Root client for HG Content system",
"features": ["all"]
}
},
{
"name": "PASCO Scientific",
"slug": "pasco-scientific",
"type": "agency",
"parent_id": None, # Will be updated with root client ID
"metadata": {
"industry": "education",
"target_audience": "educators, students",
"brand_colors": ["#0066CC", "#FF6600"]
}
},
{
"name": "Heaviside Digital",
"slug": "heaviside-digital",
"type": "agency",
"parent_id": None, # Will be updated with root client ID
"metadata": {
"industry": "marketing_agency",
"specialization": "local_businesses",
"service_areas": ["ohio", "kentucky", "indiana"]
}
}
]
created_clients = {}
# Create root client first
root_client_data = clients_data[0]
result = supabase.table("clients").insert(root_client_data).execute()
root_client_id = result.data[0]["id"]
created_clients["root"] = root_client_id
# Update parent_id for other clients
for client_data in clients_data[1:]:
client_data["parent_id"] = root_client_id
result = supabase.table("clients").insert(client_data).execute()
created_clients[client_data["slug"]] = result.data[0]["id"]
# Create customer clients for Heaviside Digital
customer_clients = [
{
"name": "Cincinnati Electrician Co.",
"slug": "cincinnati-electrician",
"type": "customer",
"parent_id": created_clients["heaviside-digital"],
"metadata": {
"industry": "electrical_services",
"location": "cincinnati_oh",
"services": ["residential", "commercial", "emergency"]
}
},
{
"name": "Ohio Paving Contractors",
"slug": "ohio-paving",
"type": "customer",
"parent_id": created_clients["heaviside-digital"],
"metadata": {
"industry": "construction",
"location": "columbus_oh",
"services": ["asphalt_paving", "concrete", "sealcoating"]
}
}
]
for customer_data in customer_clients:
result = supabase.table("clients").insert(customer_data).execute()
created_clients[customer_data["slug"]] = result.data[0]["id"]
return created_clients
async def seed_strategies(supabase: Client, clients: Dict[str, str]):
"""Create default strategies for each client"""
strategies_data = []
# PASCO Scientific strategies
pasco_strategies = [
{
"client_id": clients["pasco-scientific"],
"name": "Educational Blog Strategy",
"content_type": "blog_post",
"configuration": {
"tone": "educational",
"target_audience": "high school and college educators",
"word_count_range": [800, 1500],
"include_experiments": True,
"include_diagrams": True,
"technical_level": "intermediate",
"seo_focus": True,
"brand_mentions": "PASCO Scientific equipment"
},
"is_active": True
},
{
"client_id": clients["pasco-scientific"],
"name": "Social Media Strategy",
"content_type": "social_media",
"configuration": {
"platforms": ["linkedin", "twitter", "facebook"],
"tone": "informative",
"hashtags": ["#STEMEducation", "#PhysicsEducation", "#PASCOScientific"],
"include_visuals": True,
"post_frequency": "daily",
"engagement_focus": True
},
"is_active": True
}
]
# Local business strategies for Heaviside customers
local_business_strategies = [
{
"client_id": clients["cincinnati-electrician"],
"name": "Local SEO Blog Strategy",
"content_type": "blog_post",
"configuration": {
"tone": "professional",
"target_audience": "homeowners and business owners",
"location_focus": "Cincinnati, Ohio",
"word_count_range": [600, 1000],
"include_local_keywords": True,
"include_service_areas": True,
"call_to_action": "contact for quote",
"seo_focus": True
},
"is_active": True
},
{
"client_id": clients["ohio-paving"],
"name": "Construction Industry Strategy",
"content_type": "blog_post",
"configuration": {
"tone": "authoritative",
"target_audience": "property managers and contractors",
"location_focus": "Ohio",
"word_count_range": [800, 1200],
"technical_focus": "asphalt and concrete",
"seasonal_content": True,
"include_case_studies": True,
"seo_focus": True
},
"is_active": True
}
]
strategies_data.extend(pasco_strategies)
strategies_data.extend(local_business_strategies)
# Insert all strategies
result = supabase.table("strategies").insert(strategies_data).execute()
print(f"Created {len(result.data)} strategies")
if __name__ == "__main__":
asyncio.run(seed_database())
Database Performance Optimization¶
1. Query Optimization¶
-- Performance monitoring queries
-- Find slow queries (PostgreSQL 13+ column names; older versions use total_time / mean_time)
SELECT
query,
calls,
total_exec_time,
mean_exec_time,
rows
FROM pg_stat_statements
WHERE mean_exec_time > 100 -- Queries taking more than 100ms on average
ORDER BY mean_exec_time DESC
LIMIT 10;
-- Check index usage
SELECT
schemaname,
tablename,
indexname,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
WHERE idx_scan = 0 -- Unused indexes
ORDER BY schemaname, tablename;
-- Find missing indexes
SELECT
schemaname,
tablename,
seq_scan,
seq_tup_read,
seq_tup_read / seq_scan AS avg_seq_read
FROM pg_stat_user_tables
WHERE seq_scan > 0
AND seq_tup_read / seq_scan > 1000 -- Tables doing large sequential scans
ORDER BY seq_tup_read DESC;
2. Composite Indexes¶
-- Optimized indexes for common query patterns
-- Jobs queries often filter by client_id, status, and created_at
CREATE INDEX CONCURRENTLY idx_jobs_client_status_created
ON jobs(client_id, status, created_at DESC);
-- Analytics queries often aggregate by date ranges and providers
CREATE INDEX CONCURRENTLY idx_jobs_analytics
ON jobs(created_at, llm_provider, estimated_cost)
WHERE status = 'completed';
-- Strategy lookups by client and content type
CREATE INDEX CONCURRENTLY idx_strategies_client_content_active
ON strategies(client_id, content_type, is_active)
WHERE is_active = true;
-- Template searches
CREATE INDEX CONCURRENTLY idx_templates_client_content_active
ON prompt_templates(client_id, content_type, is_active)
WHERE is_active = true;
-- Full-text search on job topics and results
CREATE INDEX CONCURRENTLY idx_jobs_topic_search
ON jobs USING gin(to_tsvector('english', topic));
CREATE INDEX CONCURRENTLY idx_jobs_content_search
ON jobs USING gin(to_tsvector('english', (result->>'content')))
WHERE result IS NOT NULL;
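For the planner to use these GIN indexes, queries must repeat the exact `to_tsvector` expression from the index definition. A representative search against the topic index (a sketch):

```sql
-- Uses idx_jobs_topic_search because the to_tsvector expression matches the index.
SELECT id, topic, created_at
FROM jobs
WHERE to_tsvector('english', topic) @@ plainto_tsquery('english', 'healthcare ai')
ORDER BY created_at DESC
LIMIT 20;
```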
3. Partitioning Strategy¶
-- Partition jobs table by month for better performance
-- Create partitioned table
CREATE TABLE jobs_partitioned (
LIKE jobs INCLUDING ALL
) PARTITION BY RANGE (created_at);
-- Create monthly partitions
CREATE TABLE jobs_2024_08 PARTITION OF jobs_partitioned
FOR VALUES FROM ('2024-08-01') TO ('2024-09-01');
CREATE TABLE jobs_2024_09 PARTITION OF jobs_partitioned
FOR VALUES FROM ('2024-09-01') TO ('2024-10-01');
-- Create function to automatically create new partitions
CREATE OR REPLACE FUNCTION create_monthly_partition(table_name text, start_date date)
RETURNS void AS $$
DECLARE
partition_name text;
end_date date;
BEGIN
partition_name := table_name || '_' || to_char(start_date, 'YYYY_MM');
end_date := start_date + interval '1 month';
EXECUTE format('CREATE TABLE %I PARTITION OF %I FOR VALUES FROM (%L) TO (%L)',
partition_name, table_name, start_date, end_date);
END;
$$ LANGUAGE plpgsql;
-- Set up automatic partition creation
CREATE OR REPLACE FUNCTION create_next_partition()
RETURNS void AS $$
BEGIN
PERFORM create_monthly_partition('jobs_partitioned', date_trunc('month', CURRENT_DATE + interval '1 month'));
END;
$$ LANGUAGE plpgsql;
-- Schedule monthly partition creation (using pg_cron if available)
-- SELECT cron.schedule('create-partition', '0 0 1 * *', 'SELECT create_next_partition();');
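The `table_YYYY_MM` naming and month arithmetic in `create_monthly_partition` are easy to mirror, and sanity-check, in Python:

```python
from datetime import date

def monthly_partition(table_name: str, start_date: date) -> tuple[str, date, date]:
    """Mirror the SQL function: partition name plus its [start, end) date range."""
    partition_name = f"{table_name}_{start_date:%Y_%m}"
    # First day of the following month, handling the December rollover.
    if start_date.month == 12:
        end_date = date(start_date.year + 1, 1, 1)
    else:
        end_date = date(start_date.year, start_date.month + 1, 1)
    return partition_name, start_date, end_date

print(monthly_partition("jobs_partitioned", date(2024, 8, 1))[0])  # → jobs_partitioned_2024_08
```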
Debugging and Troubleshooting¶
Debugging Tools and Techniques¶
1. Frontend Debugging¶
React DevTools Configuration:
// lib/debug.ts
import React from 'react';
export const isDevelopment = process.env.NODE_ENV === 'development';
export const debugConfig = {
enableReactQueryDevtools: isDevelopment,
enableReduxDevtools: isDevelopment,
logLevel: isDevelopment ? 'debug' : 'error',
enablePerformanceLogging: isDevelopment,
};
// Enable debugging features conditionally
if (isDevelopment) {
// Enable why-did-you-render
import('@welldone-software/why-did-you-render').then((whyDidYouRender) => {
whyDidYouRender.default(React, {
trackAllPureComponents: true,
trackHooks: true,
});
});
}
Error Boundary Implementation:
// components/ErrorBoundary.tsx
import React, { Component, ErrorInfo, ReactNode } from 'react';
import { Button } from '@/components/ui/button';
import { Alert, AlertDescription, AlertTitle } from '@/components/ui/alert';
interface Props {
children: ReactNode;
fallback?: ReactNode;
onError?: (error: Error, errorInfo: ErrorInfo) => void;
}
interface State {
hasError: boolean;
error?: Error;
errorInfo?: ErrorInfo;
}
export class ErrorBoundary extends Component<Props, State> {
constructor(props: Props) {
super(props);
this.state = { hasError: false };
}
static getDerivedStateFromError(error: Error): State {
return { hasError: true, error };
}
componentDidCatch(error: Error, errorInfo: ErrorInfo) {
this.setState({ errorInfo });
// Log error to monitoring service
console.error('Error boundary caught an error:', error, errorInfo);
// Report to error tracking service (e.g., Sentry)
if (typeof window !== 'undefined' && (window as any).Sentry) {
(window as any).Sentry.captureException(error, {
contexts: {
react: {
componentStack: errorInfo.componentStack,
},
},
});
}
this.props.onError?.(error, errorInfo);
}
handleRetry = () => {
this.setState({ hasError: false, error: undefined, errorInfo: undefined });
};
render() {
if (this.state.hasError) {
if (this.props.fallback) {
return this.props.fallback;
}
return (
<div className="min-h-screen flex items-center justify-center p-4">
<div className="max-w-md w-full">
<Alert variant="destructive">
<AlertTitle>Something went wrong</AlertTitle>
<AlertDescription className="mt-2">
{this.state.error?.message || 'An unexpected error occurred'}
</AlertDescription>
</Alert>
<div className="mt-4 space-y-2">
<Button onClick={this.handleRetry} className="w-full">
Try Again
</Button>
<Button
variant="outline"
onClick={() => window.location.reload()}
className="w-full"
>
Reload Page
</Button>
</div>
{process.env.NODE_ENV === 'development' && this.state.errorInfo && (
<details className="mt-4">
<summary className="cursor-pointer text-sm font-medium">
Error Details (Development)
</summary>
<pre className="mt-2 p-2 bg-gray-100 rounded text-xs overflow-auto">
{this.state.error?.stack}
{this.state.errorInfo.componentStack}
</pre>
</details>
)}
</div>
</div>
);
}
return this.props.children;
}
}
Performance Monitoring:
// hooks/usePerformanceMonitoring.ts
import { useEffect, useRef } from 'react';
export const usePerformanceMonitoring = (componentName: string) => {
const startTime = useRef<number>(0);
const renderCount = useRef(0);
// Capture the timestamp in the hook body, which runs during render;
// capturing it inside an effect would only time the gap between commits,
// not the render itself
if (process.env.NODE_ENV === 'development') {
startTime.current = performance.now();
renderCount.current += 1;
}
useEffect(() => {
if (process.env.NODE_ENV === 'development' && startTime.current) {
const renderTime = performance.now() - startTime.current;
console.log(`[Performance] ${componentName}:`, {
renderTime: `${renderTime.toFixed(2)}ms`,
renderCount: renderCount.current,
});
// Log slow renders
if (renderTime > 16) { // 60fps threshold
console.warn(`[Performance Warning] ${componentName} took ${renderTime.toFixed(2)}ms to render`);
}
}
});
};
// Usage in components
export const ContentRequestForm = () => {
usePerformanceMonitoring('ContentRequestForm');
// Component implementation
};
2. Backend Debugging¶
Structured Logging Setup:
# lib/logging_config.py
import structlog
import logging
import sys
from typing import Any, Dict
def configure_logging(level: str = "INFO", format_json: bool = True):
"""Configure structured logging for the application"""
# Configure standard library logging
logging.basicConfig(
format="%(message)s",
stream=sys.stdout,
level=getattr(logging, level.upper()),
)
# Configure structlog
if format_json:
processors = [
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
add_request_id,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
]
else:
processors = [
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
add_request_id,
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.dev.ConsoleRenderer()
]
structlog.configure(
processors=processors,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
from contextvars import ContextVar
request_id_contextvar: ContextVar[str | None] = ContextVar('request_id', default=None)
def add_request_id(logger, method_name, event_dict):
"""Add the current request ID to log entries"""
# Read the shared ContextVar set by the middleware below; constructing a
# fresh ContextVar on every call would always return its default
request_id = request_id_contextvar.get()
if request_id:
event_dict['request_id'] = request_id
return event_dict
# Usage in FastAPI app
import uuid
from fastapi import FastAPI, Request
@app.middleware("http")
async def add_request_id_middleware(request: Request, call_next):
request_id = str(uuid.uuid4())
request_id_contextvar.set(request_id)
# Add to response headers for debugging
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
Database Query Debugging:
# lib/database_debug.py
import time
import structlog
from functools import wraps
from typing import Any, Callable
logger = structlog.get_logger()
def log_database_queries(func: Callable) -> Callable:
"""Decorator to log database queries with timing"""
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
query_name = func.__name__
try:
logger.debug("Database query started", query=query_name, args=args, kwargs=kwargs)
result = await func(*args, **kwargs)
duration = time.time() - start_time
logger.info(
"Database query completed",
query=query_name,
duration=f"{duration:.3f}s",
result_count=len(result) if isinstance(result, list) else 1
)
# Warn on slow queries
if duration > 1.0: # 1 second threshold
logger.warning(
"Slow database query detected",
query=query_name,
duration=f"{duration:.3f}s"
)
return result
except Exception as e:
duration = time.time() - start_time
logger.error(
"Database query failed",
query=query_name,
duration=f"{duration:.3f}s",
error=str(e),
error_type=type(e).__name__
)
raise
return wrapper
# Usage
class Database:
@log_database_queries
async def get_jobs(self, client_id: str, status: str | None = None):
# Database query implementation
pass
API Request/Response Logging:
# middleware/logging_middleware.py
import time
import json
from fastapi import Request, Response
from fastapi.middleware.base import BaseHTTPMiddleware
import structlog
logger = structlog.get_logger()
class LoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
start_time = time.time()
# Log request
request_body = None
if request.method in ["POST", "PUT", "PATCH"]:
body = await request.body()
if body:
try:
request_body = json.loads(body.decode())
# Remove sensitive data
if isinstance(request_body, dict):
request_body = self._sanitize_data(request_body)
except json.JSONDecodeError:
request_body = body.decode()[:1000] # First 1000 chars
logger.info(
"Request started",
method=request.method,
url=str(request.url),
headers=dict(request.headers),
body=request_body,
client_ip=request.client.host if request.client else None
)
# Process request
try:
response = await call_next(request)
duration = time.time() - start_time
logger.info(
"Request completed",
method=request.method,
url=str(request.url),
status_code=response.status_code,
duration=f"{duration:.3f}s"
)
return response
except Exception as e:
duration = time.time() - start_time
logger.error(
"Request failed",
method=request.method,
url=str(request.url),
duration=f"{duration:.3f}s",
error=str(e),
error_type=type(e).__name__
)
raise
def _sanitize_data(self, data: dict) -> dict:
"""Remove sensitive information from logged data"""
sensitive_keys = ['password', 'api_key', 'token', 'secret']
sanitized = data.copy()
for key in sensitive_keys:
if key in sanitized:
sanitized[key] = "[REDACTED]"
return sanitized
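Note that `_sanitize_data` above only redacts top-level keys; sensitive values inside nested objects or lists pass through to the logs unredacted. A recursive variant is a small extension (sketch):

```python
# Recursive sanitizer: redacts sensitive keys at any nesting depth.
SENSITIVE_KEYS = {'password', 'api_key', 'token', 'secret'}

def sanitize(data):
    """Return a copy of `data` with sensitive values replaced by a marker."""
    if isinstance(data, dict):
        return {
            key: "[REDACTED]" if key in SENSITIVE_KEYS else sanitize(value)
            for key, value in data.items()
        }
    if isinstance(data, list):
        return [sanitize(item) for item in data]
    return data
```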
3. Common Issues and Solutions¶
Memory Leaks in React:
// Common issue: Not cleaning up subscriptions
// ❌ Bad
const MyComponent = () => {
const [data, setData] = useState(null);
useEffect(() => {
const subscription = websocket.subscribe(setData);
// Missing cleanup!
}, []);
return <div>{data}</div>;
};
// ✅ Good
const MyComponent = () => {
const [data, setData] = useState(null);
useEffect(() => {
const subscription = websocket.subscribe(setData);
return () => {
subscription.unsubscribe(); // Cleanup
};
}, []);
return <div>{data}</div>;
};
// Memory leak detector hook
const useMemoryLeakDetector = (componentName: string) => {
useEffect(() => {
// performance.memory is a non-standard, Chromium-only API, so guard and cast
const startMemory = (performance as any).memory?.usedJSHeapSize;
return () => {
if (process.env.NODE_ENV === 'development' && startMemory !== undefined) {
const endMemory = (performance as any).memory.usedJSHeapSize;
const memoryDiff = endMemory - startMemory;
if (memoryDiff > 1000000) { // 1MB threshold
console.warn(`[Memory Leak Warning] Component ${componentName} may have leaked ${(memoryDiff / 1024 / 1024).toFixed(2)}MB`);
}
}
};
}, [componentName]);
};
Database Connection Issues:
# Connection pool debugging
import asyncio
from contextlib import asynccontextmanager
class DatabaseConnectionDebugger:
def __init__(self, supabase_client):
self.client = supabase_client
self.active_connections = 0
self.max_connections = 0
@asynccontextmanager
async def get_connection(self):
"""Debug connection usage"""
self.active_connections += 1
self.max_connections = max(self.max_connections, self.active_connections)
logger.debug(
"Database connection acquired",
active_connections=self.active_connections,
max_connections=self.max_connections
)
try:
yield self.client
finally:
self.active_connections -= 1
logger.debug(
"Database connection released",
active_connections=self.active_connections
)
# Warn if too many connections
if self.active_connections > 10:
logger.warning(
"High number of active database connections",
active_connections=self.active_connections
)
# Usage
db_debugger = DatabaseConnectionDebugger(supabase_client)
async def get_jobs():
async with db_debugger.get_connection() as db:
return await db.from_('jobs').select('*').execute()
Debugging Commands and Tools¶
1. Development Scripts¶
# package.json scripts for debugging
{
"scripts": {
"dev:debug": "NODE_ENV=development DEBUG=* next dev",
"dev:inspect": "NODE_OPTIONS='--inspect' next dev",
"test:debug": "jest --detectOpenHandles --forceExit",
"build:analyze": "ANALYZE=true next build",
"db:debug": "supabase db reset --debug",
"logs:frontend": "vercel logs --project=hg-content-frontend",
"logs:backend": "railway logs --service=cpm"
}
}
# Python debugging commands
# Start with debugger
python -m pdb app.py
# Profile memory usage
python -m memory_profiler app.py
# Profile performance
python -m cProfile -o profile.stats app.py
python -c "import pstats; p = pstats.Stats('profile.stats'); p.sort_stats('cumulative').print_stats(10)"
2. Environment-Specific Debugging¶
# Development environment
export DEBUG=true
export LOG_LEVEL=DEBUG
export DATABASE_DEBUG=true
# Production debugging (careful!)
export LOG_LEVEL=INFO
export SENTRY_DEBUG=false
export PERFORMANCE_MONITORING=true
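These flags can be read once at service startup and translated into config values. A minimal sketch assuming the variable names above (`DEBUG`, `LOG_LEVEL`, `DATABASE_DEBUG`); the `settings_from_env` helper is hypothetical:

```python
import logging
import os

def settings_from_env() -> dict:
    """Translate debug-related environment variables into config values."""
    truthy = ("1", "true", "yes")
    level_name = os.getenv("LOG_LEVEL", "INFO").upper()
    return {
        "debug": os.getenv("DEBUG", "false").lower() in truthy,
        # Fall back to INFO if LOG_LEVEL holds an unknown name
        "log_level": getattr(logging, level_name, logging.INFO),
        "database_debug": os.getenv("DATABASE_DEBUG", "false").lower() in truthy,
    }
```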
Performance Optimization Techniques¶
Frontend Optimization¶
1. Code Splitting and Lazy Loading¶
// app/layout.tsx - Strategic code splitting
import { Suspense, lazy } from 'react';
import { LoadingSpinner } from '@/components/ui/loading-spinner';
// Lazy load heavy components
const AnalyticsDashboard = lazy(() => import('@/components/analytics/AnalyticsDashboard'));
const StrategyEditor = lazy(() => import('@/components/strategies/StrategyEditor'));
export default function Layout({ children }: { children: React.ReactNode }) {
return (
<html lang="en">
<body>
<Suspense fallback={<LoadingSpinner />}>
{children}
</Suspense>
</body>
</html>
);
}
// Route-based code splitting
// app/(dashboard)/analytics/page.tsx
import dynamic from 'next/dynamic';
import { Skeleton } from '@/components/ui/skeleton';
const AnalyticsDashboard = dynamic(
() => import('@/components/analytics/AnalyticsDashboard'),
{
loading: () => (
<div className="space-y-4">
<Skeleton className="h-8 w-64" />
<div className="grid grid-cols-1 md:grid-cols-3 gap-4">
{Array.from({ length: 6 }).map((_, i) => (
<Skeleton key={i} className="h-32" />
))}
</div>
</div>
),
ssr: false, // Client-side only for heavy charts
}
);
export default function AnalyticsPage() {
return <AnalyticsDashboard />;
}
2. React Query Optimization¶
// lib/queryClient.ts
import { QueryClient } from '@tanstack/react-query';
export const queryClient = new QueryClient({
defaultOptions: {
queries: {
// Cache for 5 minutes
staleTime: 5 * 60 * 1000,
// Keep unused data in cache for 10 minutes (renamed gcTime in React Query v5)
cacheTime: 10 * 60 * 1000,
// Retry failed requests 2 times
retry: 2,
// Retry with exponential backoff
retryDelay: (attemptIndex) => Math.min(1000 * 2 ** attemptIndex, 30000),
// Refetch on window focus for critical data only
refetchOnWindowFocus: false,
// Background refetch for stale data
refetchOnMount: true,
},
mutations: {
retry: 1,
},
},
});
// Optimized query hooks
export const useJobs = (clientId: string, options: { enabled?: boolean } = {}) => {
return useQuery({
queryKey: ['jobs', clientId],
queryFn: () => jobsApi.getJobs(clientId),
enabled: !!clientId && options.enabled !== false,
// Optimize for frequent updates
refetchInterval: 30000, // 30 seconds
// Use previous data while refetching
keepPreviousData: true,
// Select only needed fields
select: (data) => data.map(job => ({
id: job.id,
status: job.status,
topic: job.topic,
createdAt: job.createdAt,
// Exclude heavy result data from list view
})),
});
};
// Prefetch related data
export const usePrefetchJobDetails = () => {
const queryClient = useQueryClient();
return useCallback((jobId: string) => {
queryClient.prefetchQuery({
queryKey: ['job', jobId],
queryFn: () => jobsApi.getJob(jobId),
staleTime: 2 * 60 * 1000, // 2 minutes
});
}, [queryClient]);
};
3. Component Optimization¶
// Memoization strategies
import { memo, useMemo, useCallback, useState, useEffect } from 'react';
// Memoize expensive calculations
const JobAnalytics = memo(({ jobs }: { jobs: Job[] }) => {
const analytics = useMemo(() => {
// Expensive calculation
return {
totalJobs: jobs.length,
completedJobs: jobs.filter(job => job.status === 'completed').length,
// Guard against division by zero when the job list is empty
averageCost: jobs.length ? jobs.reduce((sum, job) => sum + (job.estimatedCost || 0), 0) / jobs.length : 0,
successRate: jobs.length ? (jobs.filter(job => job.status === 'completed').length / jobs.length) * 100 : 0,
};
}, [jobs]);
return (
<div className="grid grid-cols-2 md:grid-cols-4 gap-4">
<div>Total Jobs: {analytics.totalJobs}</div>
<div>Completed: {analytics.completedJobs}</div>
<div>Avg Cost: ${analytics.averageCost.toFixed(4)}</div>
<div>Success Rate: {analytics.successRate.toFixed(1)}%</div>
</div>
);
});
// Virtualized lists for large datasets
import { FixedSizeList as List } from 'react-window';
const VirtualizedJobList = ({ jobs }: { jobs: Job[] }) => {
const Row = useCallback(({ index, style }: { index: number; style: React.CSSProperties }) => (
<div style={style} className="p-2 border-b">
<JobItem job={jobs[index]} />
</div>
), [jobs]);
return (
<List
height={600} // Fixed height
itemCount={jobs.length}
itemSize={80} // Height of each row
width="100%"
>
{Row}
</List>
);
};
// Debounced search
const useDebounce = <T,>(value: T, delay: number): T => {
const [debouncedValue, setDebouncedValue] = useState<T>(value);
useEffect(() => {
const handler = setTimeout(() => {
setDebouncedValue(value);
}, delay);
return () => {
clearTimeout(handler);
};
}, [value, delay]);
return debouncedValue;
};
const SearchableJobList = ({ clientId }: { clientId: string }) => {
const [searchTerm, setSearchTerm] = useState('');
const debouncedSearchTerm = useDebounce(searchTerm, 300);
const { data: jobs } = useJobs(clientId, { enabled: !!clientId });
const filteredJobs = useMemo(() => {
if (!debouncedSearchTerm) return jobs || [];
return jobs?.filter(job =>
job.topic.toLowerCase().includes(debouncedSearchTerm.toLowerCase())
) || [];
}, [jobs, debouncedSearchTerm]);
return (
<div>
<input
type="text"
value={searchTerm}
onChange={(e) => setSearchTerm(e.target.value)}
placeholder="Search jobs..."
/>
<VirtualizedJobList jobs={filteredJobs} />
</div>
);
};
Backend Optimization¶
1. Database Query Optimization¶
# Efficient database queries
from typing import List, Optional
import asyncio
class OptimizedDatabase:
def __init__(self, supabase_client):
self.client = supabase_client
async def get_jobs_with_analytics(
self,
client_id: str,
status: Optional[str] = None,
limit: int = 50,
offset: int = 0
) -> dict:
"""Get jobs with aggregated analytics in a single query"""
# Use raw SQL for complex queries
query = """
WITH job_data AS (
SELECT
id, topic, status, created_at, estimated_cost, llm_provider,
ROW_NUMBER() OVER (ORDER BY created_at DESC) as row_num
FROM jobs
WHERE client_id = %(client_id)s
AND (%(status)s IS NULL OR status = %(status)s)
),
job_analytics AS (
SELECT
COUNT(*) as total_jobs,
COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs,
COUNT(*) FILTER (WHERE status = 'failed') as failed_jobs,
AVG(estimated_cost) FILTER (WHERE estimated_cost IS NOT NULL) as avg_cost,
SUM(estimated_cost) FILTER (WHERE estimated_cost IS NOT NULL) as total_cost
FROM jobs
WHERE client_id = %(client_id)s
AND (%(status)s IS NULL OR status = %(status)s)
)
SELECT
json_build_object(
'jobs', json_agg(
json_build_object(
'id', jd.id,
'topic', jd.topic,
'status', jd.status,
'created_at', jd.created_at,
'estimated_cost', jd.estimated_cost,
'llm_provider', jd.llm_provider
)
) FILTER (WHERE jd.row_num > %(offset)s AND jd.row_num <= %(offset)s + %(limit)s),
'analytics', json_build_object(
'total_jobs', ja.total_jobs,
'completed_jobs', ja.completed_jobs,
'failed_jobs', ja.failed_jobs,
'success_rate',
CASE
WHEN ja.total_jobs > 0 THEN (ja.completed_jobs::float / ja.total_jobs * 100)
ELSE 0
END,
'avg_cost', ja.avg_cost,
'total_cost', ja.total_cost
),
'pagination', json_build_object(
'total', ja.total_jobs,
'limit', %(limit)s,
'offset', %(offset)s,
'has_more', ja.total_jobs > %(offset)s + %(limit)s
)
) as result
FROM job_data jd
CROSS JOIN job_analytics ja
GROUP BY ja.total_jobs, ja.completed_jobs, ja.failed_jobs, ja.avg_cost, ja.total_cost
"""
result = await self.client.rpc('execute_sql', {
'sql': query,
'params': {
'client_id': client_id,
'status': status,
'limit': limit,
'offset': offset
}
}).execute()
return result.data[0]['result'] if result.data else {}
async def batch_update_job_statuses(self, updates: List[dict]) -> List[dict]:
"""Batch update multiple job statuses efficiently"""
# Prepare batch update
update_cases = []
job_ids = []
for update in updates:
job_ids.append(update['id'])
update_cases.append(f"WHEN '{update['id']}' THEN '{update['status']}'")
if not update_cases:
return []
query = f"""
UPDATE jobs
SET
status = CASE id {' '.join(update_cases)} END,
updated_at = NOW()
WHERE id IN ({','.join([f"'{id}'" for id in job_ids])})
RETURNING id, status, updated_at
"""
result = await self.client.rpc('execute_sql', {'sql': query}).execute()
return result.data
# Connection pooling and caching
import asyncio
# Note: functools.lru_cache cannot cache coroutines -- it stores the
# coroutine object itself, which can only be awaited once. Use an explicit
# dict keyed on the arguments instead.
_strategy_cache: dict = {}
async def get_client_strategy_cached(self, client_id: str, content_type: str) -> dict:
"""Cache client strategies to reduce database hits"""
key = (client_id, content_type)
if key not in self._strategy_cache:
result = await self.client.table('strategies').select('*').eq(
'client_id', client_id
).eq('content_type', content_type).eq('is_active', True).execute()
self._strategy_cache[key] = result.data[0] if result.data else {}
return self._strategy_cache[key]
async def bulk_create_jobs(self, jobs_data: List[dict]) -> List[dict]:
"""Efficiently create multiple jobs"""
# Use batch insert instead of individual inserts
result = await self.client.table('jobs').insert(jobs_data).execute()
# Process in background
job_ids = [job['id'] for job in result.data]
asyncio.create_task(self._process_jobs_background(job_ids))
return result.data
async def _process_jobs_background(self, job_ids: List[str]):
"""Background job processing"""
for job_id in job_ids:
try:
await self._process_single_job(job_id)
except Exception as e:
logger.error(f"Background job processing failed: {e}", job_id=job_id)
2. Caching Strategies¶
# Redis caching implementation
import redis.asyncio as redis
import json
import pickle
from typing import Any, Optional
from functools import wraps
class CacheManager:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
async def get(self, key: str) -> Optional[Any]:
"""Get value from cache"""
try:
value = await self.redis.get(key)
if value:
return pickle.loads(value)
except Exception as e:
logger.warning(f"Cache get failed: {e}", key=key)
return None
async def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
"""Set value in cache with TTL"""
try:
serialized = pickle.dumps(value)
await self.redis.setex(key, ttl, serialized)
return True
except Exception as e:
logger.warning(f"Cache set failed: {e}", key=key)
return False
async def delete(self, key: str) -> bool:
"""Delete key from cache"""
try:
await self.redis.delete(key)
return True
except Exception as e:
logger.warning(f"Cache delete failed: {e}", key=key)
return False
async def invalidate_pattern(self, pattern: str):
"""Invalidate all keys matching pattern"""
try:
keys = await self.redis.keys(pattern)
if keys:
await self.redis.delete(*keys)
except Exception as e:
logger.warning(f"Cache invalidation failed: {e}", pattern=pattern)
# Cache decorator
def cached(ttl: int = 3600, key_prefix: str = ""):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Generate cache key
cache_key = f"{key_prefix}:{func.__name__}:{hash(str(args) + str(kwargs))}"
# Try to get from cache
cached_result = await cache_manager.get(cache_key)
if cached_result is not None:
logger.debug(f"Cache hit: {cache_key}")
return cached_result
# Execute function
result = await func(*args, **kwargs)
# Store in cache
await cache_manager.set(cache_key, result, ttl)
logger.debug(f"Cache set: {cache_key}")
return result
return wrapper
return decorator
# Usage examples
import os
cache_manager = CacheManager(os.getenv("REDIS_URL", "redis://localhost:6379"))
@cached(ttl=1800, key_prefix="client_strategy") # 30 minutes
async def get_client_strategy(client_id: str, content_type: str) -> dict:
# Database query
pass
@cached(ttl=300, key_prefix="job_analytics") # 5 minutes
async def get_job_analytics(client_id: str) -> dict:
# Heavy analytics calculation
pass
# Cache invalidation on updates
async def update_client_strategy(client_id: str, strategy_data: dict):
# Update database
result = await database.update_strategy(client_id, strategy_data)
# Invalidate related cache. The hashed cache key does not expose client_id,
# so a per-client pattern cannot match; clear the whole prefix instead
await cache_manager.invalidate_pattern("client_strategy:*")
return result
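One caveat with the `cached` decorator above: Python's built-in `hash()` is salted per process for strings, so hash-based keys differ across workers and restarts, which defeats a shared Redis cache. A deterministic key can be derived with hashlib (sketch; `stable_cache_key` is a hypothetical helper):

```python
import hashlib
import json

def stable_cache_key(prefix: str, func_name: str, *args, **kwargs) -> str:
    """Deterministic cache key, unlike str hash() which is salted per process."""
    # Canonical serialization: sorted kwargs, compact separators
    payload = json.dumps([args, sorted(kwargs.items())], default=str, separators=(",", ":"))
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()[:16]
    return f"{prefix}:{func_name}:{digest}"
```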
3. Async Processing and Queue Management¶
# Background job processing with Celery alternative
import asyncio
from typing import Dict, Any, Callable
import structlog
from enum import Enum
logger = structlog.get_logger()
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class AsyncTaskQueue:
def __init__(self, max_workers: int = 10):
self.max_workers = max_workers
self.queue = asyncio.Queue()
self.workers = []
self.tasks: Dict[str, Dict[str, Any]] = {}
self.running = False
async def start(self):
"""Start the task queue workers"""
self.running = True
self.workers = [
asyncio.create_task(self._worker(f"worker-{i}"))
for i in range(self.max_workers)
]
logger.info(f"Started {self.max_workers} task queue workers")
async def stop(self):
"""Stop the task queue workers"""
self.running = False
# Cancel all workers
for worker in self.workers:
worker.cancel()
# Wait for workers to finish
await asyncio.gather(*self.workers, return_exceptions=True)
logger.info("Task queue workers stopped")
async def enqueue(self, task_id: str, func: Callable, *args, **kwargs) -> str:
"""Enqueue a task for processing"""
task_data = {
'id': task_id,
'func': func,
'args': args,
'kwargs': kwargs,
'status': TaskStatus.PENDING,
'created_at': asyncio.get_event_loop().time(),
'result': None,
'error': None
}
self.tasks[task_id] = task_data
await self.queue.put(task_data)
logger.info(f"Task enqueued: {task_id}")
return task_id
async def get_task_status(self, task_id: str) -> Dict[str, Any]:
"""Get task status and result"""
return self.tasks.get(task_id, {})
async def _worker(self, worker_name: str):
"""Worker coroutine to process tasks"""
logger.info(f"Worker {worker_name} started")
while self.running:
try:
# Get task from queue with timeout
task_data = await asyncio.wait_for(
self.queue.get(),
timeout=1.0
)
await self._process_task(worker_name, task_data)
self.queue.task_done()
except asyncio.TimeoutError:
continue # Check if still running
except Exception as e:
logger.error(f"Worker {worker_name} error: {e}")
async def _process_task(self, worker_name: str, task_data: Dict[str, Any]):
"""Process a single task"""
task_id = task_data['id']
try:
# Update status to running
task_data['status'] = TaskStatus.RUNNING
task_data['started_at'] = asyncio.get_event_loop().time()
logger.info(f"Worker {worker_name} processing task: {task_id}")
# Execute the task
func = task_data['func']
args = task_data['args']
kwargs = task_data['kwargs']
if asyncio.iscoroutinefunction(func):
result = await func(*args, **kwargs)
else:
result = func(*args, **kwargs)
# Update task with result
task_data['status'] = TaskStatus.COMPLETED
task_data['result'] = result
task_data['completed_at'] = asyncio.get_event_loop().time()
logger.info(f"Task completed: {task_id}")
except Exception as e:
# Update task with error
task_data['status'] = TaskStatus.FAILED
task_data['error'] = str(e)
task_data['failed_at'] = asyncio.get_event_loop().time()
logger.error(f"Task failed: {task_id}, error: {e}")
# Usage in FastAPI application
task_queue = AsyncTaskQueue(max_workers=5)
@app.on_event("startup")
async def startup_event():
await task_queue.start()
@app.on_event("shutdown")
async def shutdown_event():
await task_queue.stop()
# Content generation with queue
async def process_content_generation(job_id: str, request_data: dict):
"""Background content generation task"""
try:
# Process the content generation
result = await content_processor.generate_content(request_data)
# Save result to database
await database.update_job_result(job_id, result)
return result
except Exception as e:
await database.update_job_status(job_id, 'failed', error_message=str(e))
raise
@app.post("/generate")
async def generate_content_endpoint(request: ContentRequest):
# Create job record
job = await database.create_job(request.dict())
# Queue background processing
await task_queue.enqueue(
task_id=job['id'],
func=process_content_generation,
job_id=job['id'],
request_data=request.dict()
)
return {"job_id": job['id'], "status": "queued"}
@app.get("/jobs/{job_id}/status")
async def get_job_status(job_id: str):
# Get from queue first (for real-time status)
queue_status = await task_queue.get_task_status(job_id)
if queue_status:
return {
"job_id": job_id,
"status": queue_status['status'].value,
"result": queue_status.get('result'),
"error": queue_status.get('error')
}
# Fall back to database
job = await database.get_job(job_id)
return {
"job_id": job_id,
"status": job['status'],
"result": job.get('result'),
"error": job.get('error_message')
}
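In the queue above, a failed task simply ends in `FAILED` with no retry. A common extension is a capped exponential backoff schedule; this sketch (the `backoff_schedule` helper and any retry-count field on the task are assumptions) shows the delay computation only:

```python
def backoff_schedule(max_retries: int, base: float = 1.0, cap: float = 30.0) -> list:
    """Delays in seconds for each retry attempt: base * 2^n, capped at `cap`."""
    return [min(cap, base * (2 ** attempt)) for attempt in range(max_retries)]
```

The worker's `except` branch could then sleep for `backoff_schedule(...)[attempt]` seconds via `asyncio.sleep` and re-enqueue the task until the schedule is exhausted.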
Performance Monitoring¶
1. Application Metrics¶
# Performance monitoring setup
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time
from functools import wraps
# Metrics
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration')
ACTIVE_CONNECTIONS = Gauge('active_connections', 'Active database connections')
JOB_PROCESSING_TIME = Histogram('job_processing_duration_seconds', 'Job processing time', ['provider'])
CACHE_HIT_RATE = Counter('cache_requests_total', 'Cache requests', ['type'])
def monitor_endpoint(func):
"""Decorator to monitor endpoint performance"""
@wraps(func)
async def wrapper(request, *args, **kwargs):
start_time = time.time()
status_code = 200
try:
response = await func(request, *args, **kwargs)
if hasattr(response, 'status_code'):
status_code = response.status_code
return response
except Exception as e:
status_code = 500
raise
finally:
# Record metrics
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=status_code
).inc()
REQUEST_DURATION.observe(duration)
return wrapper
# Start Prometheus metrics server
start_http_server(8000)
# Usage in FastAPI
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
# Record metrics
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code
).inc()
REQUEST_DURATION.observe(duration)
return response
2. Performance Testing¶
# Load testing with locust
from locust import HttpUser, task, between
import random
class ContentGenerationUser(HttpUser):
wait_time = between(1, 3)
def on_start(self):
"""Set up test data"""
self.client_id = "test-client"
self.topics = [
"AI in Healthcare",
"Future of Remote Work",
"Sustainable Energy Solutions",
"Digital Marketing Trends",
"Cloud Computing Benefits"
]
@task(3)
def generate_content(self):
"""Test content generation endpoint"""
payload = {
"topic": random.choice(self.topics),
"content_type": "blog_post",
"keywords": ["technology", "innovation"],
"client_id": self.client_id,
"llm_provider": "openai"
}
with self.client.post("/generate", json=payload, catch_response=True) as response:
if response.status_code == 200:
job_id = response.json().get("job_id")
if job_id:
response.success()
# Check job status
self.check_job_status(job_id)
else:
response.failure("No job_id in response")
else:
response.failure(f"Request failed with status {response.status_code}")
@task(1)
def check_job_status(self, job_id: str = None):
"""Test job status endpoint"""
if not job_id:
# Use a dummy job ID for testing
job_id = "test-job-123"
with self.client.get(f"/jobs/{job_id}", catch_response=True) as response:
if response.status_code == 200:
response.success()
elif response.status_code == 404:
response.success() # Expected for dummy job IDs
else:
response.failure(f"Unexpected status code: {response.status_code}")
@task(1)
def list_jobs(self):
"""Test job listing endpoint"""
with self.client.get(f"/jobs?client_id={self.client_id}", catch_response=True) as response:
if response.status_code == 200:
jobs = response.json().get("data", [])
response.success()
else:
response.failure(f"Request failed with status {response.status_code}")
# Run load test
# locust -f load_test.py --host=http://localhost:8001
This Developer Guide is a living document that evolves with the HG Content Generation System. For the latest updates and additional resources, refer to the project repository and internal documentation.
Last Updated: August 2025
Version: 1.0.0
Maintainer: HG Content Development Team