Data Flow Architecture
Comprehensive overview of data flow patterns including request/response flow, real-time data synchronization, and service communication.
Data Flow Architecture
Understanding how data flows through the system is crucial for maintaining performance, reliability, and debugging issues.
Request/Response Flow
Standard HTTP Request Flow
graph TD
A[User Request] --> B[Next.js Middleware]
B --> C[Authentication Check]
C --> D{Authenticated?}
D -->|No| E[Redirect to Login]
D -->|Yes| F[Route Handler/Server Component]
F --> G[Business Logic Service]
G --> H[Data Validation]
H --> I[Supabase Client]
I --> J[PostgreSQL Database]
J --> K[Query Results]
K --> L[Data Processing]
L --> M[Response Formatting]
M --> N[Client Receives Data]
E --> O[Login Page]
Detailed Request Processing
1. Request Initiation
// Client-side request initiation
/**
 * Fetches products from the internal API, optionally filtered.
 *
 * @param filters - optional category/search constraints; omitted keys are not sent
 * @returns the parsed JSON body of the products endpoint
 * @throws Error when the response status is not 2xx
 */
export async function fetchProducts(filters?: ProductFilters) {
  const searchParams = new URLSearchParams()
  if (filters?.category) {
    searchParams.set('category', filters.category)
  }
  if (filters?.search) {
    searchParams.set('search', filters.search)
  }
  const response = await fetch(`/api/products?${searchParams}`)
  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`)
  }
  // BUG FIX: response.json() returns a Promise — the original asserted the
  // Promise itself as ProductsResponse, lying to the type system. Await the
  // body first, then assert the payload type.
  return (await response.json()) as ProductsResponse
}
2. Middleware Processing
// middleware.ts
export function middleware(request: NextRequest) {
const startTime = Date.now()
// CORS handling
if (request.method === 'OPTIONS') {
return corsResponse()
}
// Authentication check
const token = request.cookies.get('auth-token')
const isProtectedRoute = protectedRoutes.some(route =>
request.nextUrl.pathname.startsWith(route)
)
if (isProtectedRoute && !token) {
return NextResponse.redirect(new URL('/login', request.url))
}
// Add request tracking
const response = NextResponse.next()
response.headers.set('X-Request-ID', generateRequestId())
response.headers.set('X-Response-Time', `${Date.now() - startTime}ms`)
return response
}
3. Route Handler Processing
// app/api/products/route.ts
/**
 * GET /api/products — lists products with optional category/search filters
 * and limit/offset paging.
 *
 * Responds 200 with { success, data, meta }, 400 when query parameters fail
 * validation, and 500 on unexpected failures.
 */
export async function GET(request: Request) {
  const requestId = request.headers.get('X-Request-ID')
  try {
    // Extract query parameters
    const { searchParams } = new URL(request.url)
    const filters = {
      category: searchParams.get('category'),
      search: searchParams.get('search'),
      // Explicit radix avoids surprises with exotic numeric strings.
      limit: parseInt(searchParams.get('limit') || '50', 10),
      offset: parseInt(searchParams.get('offset') || '0', 10)
    }
    // Validate parameters
    const validatedFilters = ProductFiltersSchema.parse(filters)
    // Business logic
    const products = await productService.getProducts(validatedFilters)
    // Response formatting
    return NextResponse.json({
      success: true,
      data: products,
      meta: {
        total: products.length,
        requestId
      }
    })
  } catch (error) {
    // BUG FIX: invalid query parameters are a client error, not a server
    // fault — report 400 for schema failures instead of a blanket 500.
    // (Assumes ProductFiltersSchema is zod, whose errors carry
    // name === 'ZodError' — TODO confirm.)
    const status = error instanceof Error && error.name === 'ZodError' ? 400 : 500
    logger.error('Failed to fetch products', { error, requestId })
    return NextResponse.json({
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error',
      requestId
    }, { status })
  }
}
4. Service Layer Processing
// services/productService.ts
/**
 * Read-side service for products backed by Supabase/PostgREST.
 */
export class ProductService {
  /**
   * Fetches a page of products with their category and inventory joins.
   *
   * @param filters - category/search constraints plus limit/offset paging
   * @returns matching products (empty array when none)
   * @throws DatabaseError when the underlying query fails
   */
  async getProducts(filters: ProductFilters): Promise<Product[]> {
    const startTime = Date.now()
    try {
      // Build query
      let query = this.supabase
        .from('products')
        .select(`
          id,
          name,
          price,
          sku,
          category:categories(id, name),
          inventory:inventory_items(quantity, reserved)
        `)
      // Apply filters
      if (filters.category) {
        query = query.eq('category_id', filters.category)
      }
      if (filters.search) {
        // SECURITY FIX: the search term is interpolated into a PostgREST
        // `.or()` filter string, where commas, parentheses, quotes and
        // backslashes are structural syntax — a crafted term could inject
        // additional filter clauses. Strip those metacharacters before
        // interpolation.
        const term = filters.search.replace(/[,()"'\\]/g, '')
        query = query.or(`name.ilike.%${term}%,sku.ilike.%${term}%`)
      }
      // Execute query with paging and a stable name sort.
      const { data, error } = await query
        .range(filters.offset, filters.offset + filters.limit - 1)
        .order('name')
      if (error) {
        throw new DatabaseError(error.message)
      }
      // Log performance
      const duration = Date.now() - startTime
      logger.info('Products fetched', {
        count: data?.length || 0,
        duration,
        filters
      })
      return data || []
    } catch (error) {
      logger.error('Failed to fetch products from database', { error, filters })
      throw error
    }
  }
}
Real-time Data Flow
WebSocket Connection Flow
graph TD
A[Database Change] --> B[PostgreSQL Trigger]
B --> C[Supabase Realtime Engine]
C --> D[WebSocket Broadcast]
D --> E[Client WebSocket Connection]
E --> F[Subscription Handler]
F --> G[React State Update]
G --> H[Component Re-render]
H --> I[UI Update]
Real-time Implementation
1. Database Triggers
-- Database trigger for real-time updates
-- Publishes every product row change on the 'product_changes' channel via
-- pg_notify so external listeners (e.g. a realtime engine) can react.
CREATE OR REPLACE FUNCTION notify_product_change()
RETURNS trigger AS $$
BEGIN
-- Notify realtime engine of changes
-- NOTE(review): on DELETE, NEW is NULL so 'record' serializes as null and
-- 'old_record' carries the row; on INSERT the reverse. Consumers must
-- handle both shapes.
-- NOTE(review): pg_notify payloads are size-capped (~8 KB) — very wide
-- product rows could exceed this; confirm acceptable.
PERFORM pg_notify('product_changes', json_build_object(
'operation', TG_OP,
'record', row_to_json(NEW),
'old_record', row_to_json(OLD),
'timestamp', now()
)::text);
-- Return the surviving row: NEW for INSERT/UPDATE, OLD for DELETE.
RETURN COALESCE(NEW, OLD);
END;
$$ LANGUAGE plpgsql;
-- Attach trigger to products table (fires once per affected row).
CREATE TRIGGER product_realtime_trigger
AFTER INSERT OR UPDATE OR DELETE ON products
FOR EACH ROW EXECUTE FUNCTION notify_product_change();
2. Client-side Subscription
// hooks/useRealtimeProducts.ts
/**
 * React hook that loads the product list once and then keeps it in sync via
 * a Supabase realtime subscription on the `products` table.
 *
 * @param filters - optional constraints applied to the initial fetch; only
 *                  `category` is pushed down as a realtime filter
 * @returns { products, loading, error }
 */
export function useRealtimeProducts(filters?: ProductFilters) {
  const [products, setProducts] = useState<Product[]>([])
  const [loading, setLoading] = useState(true)
  const [error, setError] = useState<string | null>(null)
  useEffect(() => {
    let channel: RealtimeChannel
    const setupSubscription = async () => {
      try {
        // Initial data fetch
        const initialProducts = await productService.getProducts(filters)
        setProducts(initialProducts)
        setLoading(false)
        // Setup realtime subscription
        // NOTE(review): `search` is not reflected in the realtime filter, so
        // live events ignore it — confirm this is intended.
        channel = supabase
          .channel('products-changes')
          .on('postgres_changes', {
            event: '*',
            schema: 'public',
            table: 'products',
            filter: filters?.category ? `category_id=eq.${filters.category}` : undefined
          }, (payload) => {
            handleRealtimeUpdate(payload)
          })
          .subscribe((status) => {
            if (status === 'SUBSCRIBED') {
              logger.info('Subscribed to product changes')
            } else if (status === 'CHANNEL_ERROR') {
              setError('Failed to subscribe to updates')
            }
          })
      } catch (err) {
        setError(err instanceof Error ? err.message : 'Failed to load products')
        setLoading(false)
      }
    }
    // Applies one change event to local state via a functional update.
    // NOTE(review): an INSERT event for a row already present from the
    // initial fetch would create a duplicate entry — confirm whether a
    // dedupe-by-id is needed here.
    const handleRealtimeUpdate = (payload: RealtimePostgresChangesPayload<Product>) => {
      setProducts(currentProducts => {
        switch (payload.eventType) {
          case 'INSERT':
            return [...currentProducts, payload.new]
          case 'UPDATE':
            return currentProducts.map(product =>
              product.id === payload.new.id ? payload.new : product
            )
          case 'DELETE':
            return currentProducts.filter(product => product.id !== payload.old.id)
          default:
            return currentProducts
        }
      })
    }
    setupSubscription()
    // Tear down the channel when filters change or the component unmounts.
    return () => {
      if (channel) {
        supabase.removeChannel(channel)
      }
    }
    // NOTE(review): `filters` is compared by reference — an inline object at
    // the call site re-runs this effect (and resubscribes) on every render;
    // memoize filters or depend on its primitive fields instead.
  }, [filters])
  return { products, loading, error }
}
3. Optimistic Updates
// Optimistic updates for better UX
/**
 * Extends useRealtimeProducts with optimistic product creation: a temporary
 * row appears in the list immediately and is removed once the request
 * settles (on success the realtime stream delivers the persisted row).
 */
export function useOptimisticProducts() {
  const { products, loading, error } = useRealtimeProducts()
  const [pendingProducts, setPendingProducts] = useState<Product[]>([])

  /**
   * Creates a product with immediate UI feedback.
   * Rethrows any service failure after rolling the placeholder back.
   */
  const createProduct = async (productData: CreateProductData) => {
    const placeholderId = `temp-${Date.now()}`
    const placeholder: Product = {
      ...productData,
      id: placeholderId,
      created_at: new Date().toISOString(),
      updated_at: new Date().toISOString()
    }

    // Show the placeholder right away for instant feedback.
    setPendingProducts(current => [...current, placeholder])

    // Drops the placeholder regardless of how the request settled.
    const discardPlaceholder = () => {
      setPendingProducts(current => current.filter(p => p.id !== placeholderId))
    }

    try {
      const createdProduct = await productService.createProduct(productData)
      // The realtime subscription delivers the persisted row.
      discardPlaceholder()
      return createdProduct
    } catch (error) {
      // Roll back the failed optimistic insert before propagating.
      discardPlaceholder()
      throw error
    }
  }

  return { products: [...products, ...pendingProducts], loading, error, createProduct }
}
Service Communication
Internal Service Communication
1. Direct Function Calls
// Synchronous service communication
/**
 * Order creation workflow: validates the customer and every line item,
 * prices items from current product data, persists the order, and reserves
 * inventory for it.
 */
export class OrderService {
  constructor(
    private productService: ProductService,
    private inventoryService: InventoryService,
    private customerService: CustomerService
  ) {}

  /**
   * Creates an order from raw order data.
   *
   * @param orderData - customer id plus requested items/quantities
   * @returns the persisted order
   * @throws BusinessError when the customer or a product is unknown, or when
   *         a requested quantity exceeds available inventory
   */
  async createOrder(orderData: CreateOrderData): Promise<Order> {
    // Validate customer
    const customer = await this.customerService.getCustomer(orderData.customerId)
    if (!customer) {
      throw new BusinessError('Customer not found')
    }
    // Validate products and check inventory
    // All line items are validated concurrently; the first failure rejects
    // the whole batch.
    const orderItems = await Promise.all(
      orderData.items.map(async (item) => {
        const product = await this.productService.getProduct(item.productId)
        if (!product) {
          throw new BusinessError(`Product ${item.productId} not found`)
        }
        // NOTE(review): this availability check and the reservation further
        // below are not atomic — stock could be claimed by a concurrent
        // order in between. Confirm reserveInventory re-validates.
        const availability = await this.inventoryService.checkAvailability(
          item.productId,
          item.quantity
        )
        if (!availability.available) {
          throw new BusinessError(
            `Insufficient inventory for ${product.name}. Available: ${availability.quantity}`
          )
        }
        // Price the line from the current product record.
        return {
          ...item,
          unitPrice: product.price,
          totalPrice: product.price * item.quantity
        }
      })
    )
    // Create order
    const order = await this.createOrderRecord({
      ...orderData,
      items: orderItems,
      totalAmount: orderItems.reduce((sum, item) => sum + item.totalPrice, 0)
    })
    // Reserve inventory
    // NOTE(review): if reservation fails here, the order record is not
    // rolled back — confirm whether compensation happens elsewhere
    // (e.g. the inventory.reservation.failed event handler).
    await this.inventoryService.reserveInventory(
      order.id,
      orderItems.map(item => ({
        productId: item.productId,
        quantity: item.quantity
      }))
    )
    return order
  }
}
2. Event-Driven Communication
// Event-driven service communication
/**
 * Minimal in-process pub/sub bus. Handlers are async and run in parallel;
 * one failing handler never blocks or fails the others.
 */
export class EventBus {
  // event name -> list of subscribed handlers (duplicates allowed)
  private eventHandlers: Map<string, Array<(data: any) => Promise<void>>> = new Map()

  /**
   * Invokes every handler registered for `eventName` in parallel and waits
   * for all of them to settle. Never rejects on handler failure.
   */
  async emit(eventName: string, data: any) {
    const handlers = this.eventHandlers.get(eventName) || []
    // Execute handlers in parallel
    const results = await Promise.allSettled(
      handlers.map(handler => handler(data))
    )
    // BUG FIX: allSettled previously swallowed handler rejections silently.
    // Surface each failure so broken subscribers are visible.
    // (console.error avoids a new dependency — swap for the app logger.)
    for (const result of results) {
      if (result.status === 'rejected') {
        console.error(`Event handler failed for '${eventName}':`, result.reason)
      }
    }
  }

  /** Registers a handler for `eventName`; registration order is preserved. */
  on(eventName: string, handler: (data: any) => Promise<void>) {
    const handlers = this.eventHandlers.get(eventName) || []
    this.eventHandlers.set(eventName, [...handlers, handler])
  }
}
// Service event handlers
/**
 * Subscribes inventory-side reactions to order/product lifecycle events
 * on the shared event bus.
 */
export class InventoryEventHandlers {
  constructor(private inventoryService: InventoryService) {
    this.setupEventHandlers()
  }

  // Wires each domain event to its handler.
  private setupEventHandlers() {
    eventBus.on('order.created', async (order: Order) => {
      await this.handleOrderCreated(order)
    })
    eventBus.on('order.cancelled', async (order: Order) => {
      await this.handleOrderCancelled(order)
    })
    eventBus.on('product.discontinued', async (product: Product) => {
      await this.handleProductDiscontinued(product)
    })
  }

  // Reserves stock for a freshly created order; on failure emits a
  // compensating event instead of throwing into the event bus.
  private async handleOrderCreated(order: Order) {
    try {
      // Reserve inventory for order items
      await this.inventoryService.reserveInventory(
        order.id,
        order.items.map(item => ({
          productId: item.productId,
          quantity: item.quantity
        }))
      )
      logger.info('Inventory reserved for order', { orderId: order.id })
    } catch (error) {
      logger.error('Failed to reserve inventory for order', {
        orderId: order.id,
        error
      })
      // Emit failure event
      await eventBus.emit('inventory.reservation.failed', { order, error })
    }
  }

  // BUG FIX: this handler was subscribed in setupEventHandlers but never
  // defined, which does not compile. Minimal implementation logs the event.
  // TODO(review): release the reservation taken in handleOrderCreated —
  // confirm the InventoryService API for releasing reserved stock.
  private async handleOrderCancelled(order: Order) {
    logger.info('Order cancelled; inventory release pending implementation', {
      orderId: order.id
    })
  }

  // BUG FIX: subscribed but missing from the class, same as above.
  // TODO(review): decide how a discontinued product affects open reservations.
  private async handleProductDiscontinued(product: Product) {
    logger.info('Product discontinued event received', { productId: product.id })
  }
}
External Service Communication
1. HTTP API Integration
// External API service wrapper
/**
 * Thin client for the external product-catalog API.
 * Lookup failures degrade gracefully: unknown barcodes (404) and network
 * errors both yield null instead of failing the caller.
 */
export class ExternalProductService {
  private httpClient: HttpClient

  constructor() {
    this.httpClient = new HttpClient({
      baseURL: process.env.EXTERNAL_API_URL,
      timeout: 30000,
      retries: 3
    })
  }

  /**
   * Looks up product details by barcode.
   *
   * @returns the external record, or null when unknown or unreachable
   * @throws ExternalServiceError on non-404 HTTP failures
   */
  async getProductInfo(barcode: string): Promise<ExternalProductInfo | null> {
    const requestHeaders = {
      'Authorization': `Bearer ${process.env.EXTERNAL_API_KEY}`,
      'Content-Type': 'application/json'
    }
    try {
      const response = await this.httpClient.get(`/products/${barcode}`, {
        headers: requestHeaders
      })
      // A 404 simply means the catalog has no entry for this barcode.
      if (response.status === 404) {
        return null
      }
      if (!response.ok) {
        throw new ExternalServiceError(
          `External API error: ${response.status} ${response.statusText}`
        )
      }
      return response.data
    } catch (error) {
      // Network-level problems degrade to "not found" rather than erroring.
      if (error instanceof NetworkError) {
        logger.warn('External API network error', { barcode, error })
        return null
      }
      throw error
    }
  }
}
2. Message Queue Integration
// Queue-based communication for async operations
/**
 * Redis-backed FIFO message queue: producers LPUSH onto the head and the
 * consumer BRPOPs from the tail.
 */
export class MessageQueueService {
  private redis: Redis

  constructor() {
    // NOTE(review): assumes REDIS_URL is set — confirm startup validation.
    this.redis = new Redis(process.env.REDIS_URL)
  }

  /**
   * Wraps `event` in an envelope (generated id + ISO timestamp) and pushes
   * it onto the named queue.
   */
  async publishEvent(queueName: string, event: any) {
    const message = {
      id: generateId(),
      timestamp: new Date().toISOString(),
      data: event
    }
    await this.redis.lpush(queueName, JSON.stringify(message))
    logger.info('Event published to queue', { queueName, eventId: message.id })
  }

  /**
   * Blocking consumer loop: pops one message at a time and hands its payload
   * to `handler`. Runs forever by design.
   * NOTE(review): there is no shutdown mechanism — the loop can only stop
   * with the process; consider an AbortSignal for graceful shutdown.
   */
  async consumeEvents(queueName: string, handler: (event: any) => Promise<void>) {
    while (true) {
      try {
        const result = await this.redis.brpop(queueName, 5) // 5 second timeout
        if (result) {
          const [, messageStr] = result
          // NOTE(review): the parsed payload is not schema-validated before
          // being handed to the handler — confirm producers are trusted.
          const message = JSON.parse(messageStr)
          await handler(message.data)
          logger.info('Event processed', { queueName, eventId: message.id })
        }
      } catch (error) {
        // NOTE(review): a handler/parse failure drops the message (it was
        // already popped) — confirm at-most-once delivery is acceptable.
        logger.error('Error processing queue event', { queueName, error })
        await new Promise(resolve => setTimeout(resolve, 1000)) // Backoff
      }
    }
  }
}
Data Flow Monitoring
Request Tracing
// Request tracing for debugging
/**
 * Wraps an async operation with start/completion/failure log lines that
 * share a generated trace id and report wall-clock duration. The wrapped
 * result (or error) passes through unchanged.
 */
export class RequestTracer {
  static trace<T>(operation: string, fn: () => Promise<T>): Promise<T> {
    const traceId = generateTraceId()
    const startedAt = Date.now()
    const elapsed = () => Date.now() - startedAt

    logger.info('Operation started', { operation, traceId })

    return fn()
      .then(result => {
        logger.info('Operation completed', {
          operation,
          traceId,
          duration: elapsed(),
          success: true
        })
        return result
      })
      .catch(error => {
        logger.error('Operation failed', {
          operation,
          traceId,
          duration: elapsed(),
          error: error.message,
          success: false
        })
        // Rethrow so callers still observe the failure.
        throw error
      })
  }
}
// Usage
/**
 * Traced entry point for product listing: delegates to the service layer
 * while RequestTracer records start/finish logs and duration.
 */
export async function getProducts(filters: ProductFilters): Promise<Product[]> {
  return RequestTracer.trace('getProducts', () => productService.getProducts(filters))
}
Performance Monitoring
// Performance monitoring for data flow
/**
 * In-memory rolling latency statistics, keyed by operation name.
 * Each operation keeps only its 100 most recent duration samples.
 */
export class PerformanceMonitor {
  // operation name -> recent duration samples (ms), oldest first
  private static metrics: Map<string, number[]> = new Map()

  /** Appends one duration sample, evicting the oldest beyond 100. */
  static recordMetric(operation: string, duration: number) {
    const samples = this.metrics.get(operation) ?? []
    samples.push(duration)
    if (samples.length > 100) {
      samples.shift()
    }
    this.metrics.set(operation, samples)
  }

  /**
   * Summary statistics (count/avg/min/max/percentiles) for an operation,
   * or null when no samples have been recorded.
   */
  static getMetrics(operation: string) {
    const samples = this.metrics.get(operation) ?? []
    if (!samples.length) {
      return null
    }
    const ranked = [...samples].sort((a, b) => a - b)
    // Nearest-rank (floor) percentile over the sorted samples.
    const percentile = (q: number) => ranked[Math.floor(ranked.length * q)]
    let total = 0
    for (const value of samples) {
      total += value
    }
    return {
      count: samples.length,
      avg: total / samples.length,
      min: ranked[0],
      max: ranked[ranked.length - 1],
      p50: percentile(0.5),
      p95: percentile(0.95),
      p99: percentile(0.99)
    }
  }
}
This data flow architecture ensures reliable, performant, and observable data movement throughout the system while maintaining flexibility for future enhancements.