Data Flow Architecture

Comprehensive overview of data flow patterns including request/response flow, real-time data synchronization, and service communication.

Data Flow Architecture

Understanding how data flows through the system is crucial for maintaining performance and reliability, and for debugging issues.

Request/Response Flow

Standard HTTP Request Flow

graph TD
    A[User Request] --> B[Next.js Middleware]
    B --> C[Authentication Check]
    C --> D{Authenticated?}
    D -->|No| E[Redirect to Login]
    D -->|Yes| F[Route Handler/Server Component]
    F --> G[Business Logic Service]
    G --> H[Data Validation]
    H --> I[Supabase Client]
    I --> J[PostgreSQL Database]
    J --> K[Query Results]
    K --> L[Data Processing]
    L --> M[Response Formatting]
    M --> N[Client Receives Data]
    E --> O[Login Page]

Detailed Request Processing

1. Request Initiation

// Client-side request initiation
/**
 * Fetches products from the `/api/products` endpoint.
 *
 * Only the `category` and `search` filters are forwarded as query parameters,
 * and only when they are truthy.
 *
 * @param filters - Optional filters used to narrow the result set.
 * @returns The parsed JSON body of the response.
 * @throws Error when the response status is outside the 2xx range.
 */
export async function fetchProducts(filters?: ProductFilters): Promise<ProductsResponse> {
  const searchParams = new URLSearchParams()

  if (filters?.category) {
    searchParams.set('category', filters.category)
  }
  if (filters?.search) {
    searchParams.set('search', filters.search)
  }

  const response = await fetch(`/api/products?${searchParams}`)

  if (!response.ok) {
    throw new Error(`HTTP error! status: ${response.status}`)
  }

  // `response.json()` returns a promise: await it first so the assertion is
  // applied to the payload rather than to the promise wrapping it.
  // NOTE: the body is not validated at runtime; the assertion is unchecked.
  return (await response.json()) as ProductsResponse
}

2. Middleware Processing

// middleware.ts
/**
 * Next.js middleware that answers CORS preflight requests, guards protected
 * routes, and tags each request with a request ID.
 *
 * - `OPTIONS` requests short-circuit with `corsResponse()`.
 * - A request to a path starting with any entry of `protectedRoutes` is
 *   redirected to `/login` when the `auth-token` cookie is absent. Only the
 *   cookie's presence is checked here, not its validity.
 * - Every other request continues with an `X-Request-ID` header attached to
 *   both the forwarded request and the outgoing response.
 *
 * @param request - The incoming request.
 * @returns A CORS response, a redirect to `/login`, or a pass-through response.
 */
export function middleware(request: NextRequest) {
  const startTime = Date.now()

  // CORS handling
  if (request.method === 'OPTIONS') {
    return corsResponse()
  }

  // Authentication check
  const token = request.cookies.get('auth-token')
  const isProtectedRoute = protectedRoutes.some(route =>
    request.nextUrl.pathname.startsWith(route)
  )

  if (isProtectedRoute && !token) {
    return NextResponse.redirect(new URL('/login', request.url))
  }

  // Add request tracking. Headers set on the response are not visible to the
  // downstream handler, so the ID is also written onto a copy of the request
  // headers and forwarded; handlers can then read it from `request.headers`.
  const requestId = generateRequestId()
  const forwardedHeaders = new Headers(request.headers)
  forwardedHeaders.set('X-Request-ID', requestId)

  const response = NextResponse.next({ request: { headers: forwardedHeaders } })
  response.headers.set('X-Request-ID', requestId)
  // Measures only the time spent inside this middleware function; the
  // downstream handler has not run yet at this point.
  response.headers.set('X-Response-Time', `${Date.now() - startTime}ms`)

  return response
}

3. Route Handler Processing

// app/api/products/route.ts
/**
 * Handles `GET /api/products`.
 *
 * Reads `category`, `search`, `limit` (default 50) and `offset` (default 0)
 * from the query string, validates them with `ProductFiltersSchema`, and
 * returns the products produced by `productService.getProducts`.
 *
 * Responses:
 * - 200 `{ success: true, data, meta: { total, requestId } }`, where `total`
 *   is the number of products in this response, not the overall match count.
 * - 400 `{ success: false, error, requestId }` when validation fails.
 * - 500 `{ success: false, error, requestId }` for any other failure.
 *
 * @param request - The incoming request. `X-Request-ID` is read from its
 *   headers and is `null` when nothing upstream has set it.
 */
export async function GET(request: Request) {
  const requestId = request.headers.get('X-Request-ID')

  try {
    // Extract query parameters
    const { searchParams } = new URL(request.url)
    const filters = {
      category: searchParams.get('category'),
      search: searchParams.get('search'),
      // Explicit radix so the value is always parsed as decimal. Non-numeric
      // input yields NaN, which is left for the schema to reject.
      limit: parseInt(searchParams.get('limit') || '50', 10),
      offset: parseInt(searchParams.get('offset') || '0', 10)
    }

    // Validate parameters
    const validatedFilters = ProductFiltersSchema.parse(filters)

    // Business logic
    const products = await productService.getProducts(validatedFilters)

    // Response formatting
    return NextResponse.json({
      success: true,
      data: products,
      meta: {
        total: products.length,
        requestId
      }
    })

  } catch (error) {
    // NOTE(review): assumes ProductFiltersSchema is a Zod schema, whose
    // `.parse` throws an error named 'ZodError' — confirm. Matching on the
    // name avoids importing the error class into this handler.
    const isValidationError =
      typeof error === 'object' &&
      error !== null &&
      (error as { name?: unknown }).name === 'ZodError'

    if (isValidationError) {
      // Bad input is the caller's fault: report 400 instead of 500.
      logger.warn('Invalid product query parameters', { error, requestId })
    } else {
      logger.error('Failed to fetch products', { error, requestId })
    }

    return NextResponse.json({
      success: false,
      error: error instanceof Error ? error.message : 'Unknown error',
      requestId
    }, { status: isValidationError ? 400 : 500 })
  }
}

4. Service Layer Processing

// services/productService.ts
/**
 * Data-access service for products, backed by `this.supabase`.
 */
export class ProductService {
  /** Page size used when the caller does not supply `limit`. */
  private static readonly DEFAULT_LIMIT = 50

  /**
   * Wraps a value in double quotes for use inside a PostgREST logical filter
   * (`or=(...)`), backslash-escaping embedded quotes and backslashes. Without
   * this, commas or parentheses in the value would be parsed as filter syntax.
   */
  private static quoteFilterValue(value: string): string {
    return `"${value.replace(/[\\"]/g, '\\$&')}"`
  }

  /**
   * Fetches one page of products ordered by name.
   *
   * @param filters - `category` restricts by `category_id`; `search` matches
   *   `name` or `sku` case-insensitively as a substring; `offset` and `limit`
   *   select the page (defaults: 0 and 50).
   * @returns The matching rows, or an empty array when the query returns no data.
   * @throws DatabaseError when the query reports an error.
   */
  async getProducts(filters: ProductFilters): Promise<Product[]> {
    const startTime = Date.now()

    try {
      // Build query
      let query = this.supabase
        .from('products')
        .select(`
          id,
          name,
          price,
          sku,
          category:categories(id, name),
          inventory:inventory_items(quantity, reserved)
        `)

      // Apply filters
      if (filters.category) {
        query = query.eq('category_id', filters.category)
      }

      if (filters.search) {
        // The search term is user input, so quote it before embedding it in
        // the filter expression. LIKE wildcards (% and _) typed by the user
        // are intentionally left as-is.
        const pattern = ProductService.quoteFilterValue(`%${filters.search}%`)
        query = query.or(`name.ilike.${pattern},sku.ilike.${pattern}`)
      }

      // Fall back to the first page when pagination is not supplied; an
      // undefined offset or limit would otherwise produce a NaN range.
      const offset = filters.offset ?? 0
      const limit = filters.limit ?? ProductService.DEFAULT_LIMIT

      // Execute query
      const { data, error } = await query
        .range(offset, offset + limit - 1)
        .order('name')

      if (error) {
        throw new DatabaseError(error.message)
      }

      // Log performance
      const duration = Date.now() - startTime
      logger.info('Products fetched', {
        count: data?.length || 0,
        duration,
        filters
      })

      return data || []

    } catch (error) {
      logger.error('Failed to fetch products from database', { error, filters })
      throw error
    }
  }
}

Real-time Data Flow

WebSocket Connection Flow

graph TD
    A[Database Change] --> B[PostgreSQL Trigger]
    B --> C[Supabase Realtime Engine]
    C --> D[WebSocket Broadcast]
    D --> E[Client WebSocket Connection]
    E --> F[Subscription Handler]
    F --> G[React State Update]
    G --> H[Component Re-render]
    H --> I[UI Update]

Real-time Implementation

1. Database Triggers

-- Database trigger for real-time updates
-- Trigger function: publishes a JSON description of the row change on the
-- 'product_changes' notification channel.
-- NOTE(review): NOTIFY payloads are size-limited by PostgreSQL (under 8000
-- bytes in the default configuration); confirm product rows always fit.
CREATE OR REPLACE FUNCTION notify_product_change()
RETURNS trigger
LANGUAGE plpgsql
AS $$
DECLARE
  change_payload text;
BEGIN
  -- Describe the change: the operation name plus the new and old row images.
  -- NEW is expected to be null for DELETE and OLD null for INSERT, so the
  -- corresponding JSON field is null in those cases.
  change_payload := json_build_object(
    'operation', TG_OP,
    'record', row_to_json(NEW),
    'old_record', row_to_json(OLD),
    'timestamp', now()
  )::text;

  -- Notify listeners of the change
  PERFORM pg_notify('product_changes', change_payload);

  -- Return whichever row image exists for this operation.
  RETURN COALESCE(NEW, OLD);
END;
$$;

-- Attach trigger to products table
-- Row-level trigger: calls notify_product_change() once per affected row,
-- after the INSERT, UPDATE or DELETE on products has been applied.
-- NOTE(review): the EXECUTE FUNCTION spelling needs PostgreSQL 11 or newer;
-- confirm the target server version.
CREATE TRIGGER product_realtime_trigger
  AFTER INSERT OR UPDATE OR DELETE ON products
  FOR EACH ROW EXECUTE FUNCTION notify_product_change();

2. Client-side Subscription

// hooks/useRealtimeProducts.ts
/**
 * React hook that loads products once and then keeps the list in sync with
 * realtime `postgres_changes` events on the `public.products` table.
 *
 * The realtime subscription is filtered by `filters.category` only; other
 * filters (such as `search`) apply to the initial fetch but not to incoming
 * change events.
 *
 * @param filters - Optional filters for the initial fetch. The effect re-runs
 *   when the filters' serialized content changes, not when only the object
 *   identity changes.
 * @returns `products`, a `loading` flag for the initial fetch, and an `error`
 *   message (or `null`).
 */
export function useRealtimeProducts(filters?: ProductFilters) {
  const [products, setProducts] = useState<Product[]>([])
  const [loading, setLoading] = useState(true)
  const [error, setError] = useState<string | null>(null)

  // Callers commonly pass a fresh object literal on every render. Keying the
  // effect on the serialized content avoids tearing down and re-creating the
  // subscription each time the component re-renders.
  const filtersKey = JSON.stringify(filters ?? null)

  useEffect(() => {
    // Set by the cleanup function so that work finishing after unmount (or
    // after a filter change) neither touches state nor opens a channel.
    let cancelled = false
    let channel: RealtimeChannel | undefined

    const handleRealtimeUpdate = (payload: RealtimePostgresChangesPayload<Product>) => {
      setProducts(currentProducts => {
        switch (payload.eventType) {
          case 'INSERT':
            // The row may already be present (e.g. it was part of the initial
            // fetch); replace it rather than appending a duplicate.
            return currentProducts.some(product => product.id === payload.new.id)
              ? currentProducts.map(product =>
                  product.id === payload.new.id ? payload.new : product
                )
              : [...currentProducts, payload.new]

          case 'UPDATE':
            return currentProducts.map(product =>
              product.id === payload.new.id ? payload.new : product
            )

          case 'DELETE':
            return currentProducts.filter(product => product.id !== payload.old.id)

          default:
            return currentProducts
        }
      })
    }

    const setupSubscription = async () => {
      try {
        // Initial data fetch. The service dereferences its argument, so pass
        // an empty filter object when the caller supplied none.
        const initialProducts = await productService.getProducts(filters ?? {})
        if (cancelled) {
          return
        }
        setProducts(initialProducts)
        setLoading(false)

        // Setup realtime subscription
        channel = supabase
          .channel('products-changes')
          .on('postgres_changes', {
            event: '*',
            schema: 'public',
            table: 'products',
            filter: filters?.category ? `category_id=eq.${filters.category}` : undefined
          }, (payload) => {
            handleRealtimeUpdate(payload)
          })
          .subscribe((status) => {
            if (status === 'SUBSCRIBED') {
              logger.info('Subscribed to product changes')
            } else if (status === 'CHANNEL_ERROR') {
              setError('Failed to subscribe to updates')
            }
          })

      } catch (err) {
        if (cancelled) {
          return
        }
        setError(err instanceof Error ? err.message : 'Failed to load products')
        setLoading(false)
      }
    }

    // Reset status for this run so stale state from previous filters is not shown.
    setLoading(true)
    setError(null)
    void setupSubscription()

    return () => {
      cancelled = true
      if (channel) {
        supabase.removeChannel(channel)
      }
    }
    // `filters` is intentionally tracked through `filtersKey` (see above).
    // eslint-disable-next-line react-hooks/exhaustive-deps
  }, [filtersKey])

  return { products, loading, error }
}

3. Optimistic Updates

// Optimistic updates for better UX
/**
 * React hook that layers optimistic "pending" products on top of the realtime
 * product list from `useRealtimeProducts`.
 *
 * @returns The realtime products followed by any pending optimistic entries,
 *   the `loading` / `error` state of the underlying hook, and `createProduct`.
 */
export function useOptimisticProducts() {
  const { products, loading, error } = useRealtimeProducts()
  const [optimisticProducts, setOptimisticProducts] = useState<Product[]>([])

  /**
   * Creates a product, showing a temporary placeholder while the request is
   * in flight.
   *
   * @param productData - Fields of the product to create.
   * @returns The product returned by `productService.createProduct`.
   * @throws Re-throws whatever `productService.createProduct` rejects with.
   */
  const createProduct = async (productData: CreateProductData) => {
    // The random suffix keeps IDs distinct when two products are created in
    // the same millisecond; otherwise removing one placeholder would remove both.
    const tempId = `temp-${Date.now()}-${Math.random().toString(36).slice(2)}`
    const timestamp = new Date().toISOString()
    const optimisticProduct: Product = {
      ...productData,
      id: tempId,
      created_at: timestamp,
      updated_at: timestamp
    }

    // Optimistically add to UI
    setOptimisticProducts(prev => [...prev, optimisticProduct])

    try {
      return await productService.createProduct(productData)
    } finally {
      // Drop the placeholder once the request settles, whether it succeeded or
      // failed. On success the persisted row is expected to appear in
      // `products` through the realtime subscription.
      setOptimisticProducts(prev =>
        prev.filter(p => p.id !== tempId)
      )
    }
  }

  const allProducts = [...products, ...optimisticProducts]

  return { products: allProducts, loading, error, createProduct }
}

Service Communication

Internal Service Communication

1. Direct Function Calls

// Direct service communication: services call each other in-process and await the result
/**
 * Creates orders by calling the product, inventory and customer services
 * directly.
 */
export class OrderService {
  constructor(
    private productService: ProductService,
    private inventoryService: InventoryService,
    private customerService: CustomerService
  ) {}

  /**
   * Validates an order request, persists the order, and reserves inventory
   * for its items.
   *
   * NOTE: the order record is created before inventory is reserved and there
   * is no compensation step here; if `reserveInventory` rejects, the order
   * record already exists.
   *
   * @param orderData - Customer ID and the requested items.
   * @returns The created order.
   * @throws BusinessError when the order has no items, an item quantity is
   *   not positive, the customer or a product is not found, or inventory is
   *   insufficient.
   */
  async createOrder(orderData: CreateOrderData): Promise<Order> {
    if (orderData.items.length === 0) {
      throw new BusinessError('Order must contain at least one item')
    }

    // Total quantity requested per product. The same product may appear on
    // several lines; availability must be checked against the combined amount,
    // otherwise each line could pass on its own while the order as a whole
    // exceeds stock.
    const requestedByProduct = new Map<
      CreateOrderData['items'][number]['productId'],
      number
    >()
    for (const item of orderData.items) {
      // Written as a negated comparison so that NaN is rejected as well.
      if (!(item.quantity > 0)) {
        throw new BusinessError(`Invalid quantity for product ${item.productId}`)
      }
      requestedByProduct.set(
        item.productId,
        (requestedByProduct.get(item.productId) ?? 0) + item.quantity
      )
    }

    // Validate customer
    const customer = await this.customerService.getCustomer(orderData.customerId)
    if (!customer) {
      throw new BusinessError('Customer not found')
    }

    // Validate products and check inventory
    const orderItems = await Promise.all(
      orderData.items.map(async (item) => {
        const product = await this.productService.getProduct(item.productId)
        if (!product) {
          throw new BusinessError(`Product ${item.productId} not found`)
        }

        const availability = await this.inventoryService.checkAvailability(
          item.productId,
          requestedByProduct.get(item.productId) ?? item.quantity
        )
        if (!availability.available) {
          throw new BusinessError(
            `Insufficient inventory for ${product.name}. Available: ${availability.quantity}`
          )
        }

        return {
          ...item,
          unitPrice: product.price,
          totalPrice: product.price * item.quantity
        }
      })
    )

    // Create order
    const order = await this.createOrderRecord({
      ...orderData,
      items: orderItems,
      totalAmount: orderItems.reduce((sum, item) => sum + item.totalPrice, 0)
    })

    // Reserve inventory
    await this.inventoryService.reserveInventory(
      order.id,
      orderItems.map(item => ({
        productId: item.productId,
        quantity: item.quantity
      }))
    )

    return order
  }
}

2. Event-Driven Communication

// Event-driven service communication
/**
 * Minimal in-process publish/subscribe bus with asynchronous handlers.
 */
export class EventBus {
  // Payloads are `any` so that callers can register handlers with a specific
  // parameter type (e.g. `(order: Order) => ...`) without casting.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private eventHandlers: Map<string, Array<(data: any) => Promise<void>>> = new Map()

  /**
   * Invokes every handler registered for `eventName` in parallel and waits
   * for all of them to settle. A failing handler never prevents the others
   * from running and never makes `emit` reject.
   *
   * @param eventName - Name of the event to publish.
   * @param data - Payload passed to each handler.
   * @returns One settled result per handler, in registration order, so the
   *   caller can inspect failures instead of having them silently dropped.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  async emit(eventName: string, data: any): Promise<PromiseSettledResult<void>[]> {
    const handlers = this.eventHandlers.get(eventName) ?? []

    // Execute handlers in parallel. The async wrapper turns a handler that
    // throws synchronously into a rejected promise, so it cannot abort the
    // `map` before the remaining handlers have been started.
    return Promise.allSettled(
      handlers.map(async handler => handler(data))
    )
  }

  /**
   * Registers a handler for `eventName`.
   *
   * @param eventName - Name of the event to listen for.
   * @param handler - Callback invoked with the event payload.
   * @returns A function that removes this registration when called.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  on(eventName: string, handler: (data: any) => Promise<void>): () => void {
    const handlers = this.eventHandlers.get(eventName) ?? []
    this.eventHandlers.set(eventName, [...handlers, handler])

    return () => {
      const current = this.eventHandlers.get(eventName) ?? []
      const index = current.indexOf(handler)
      if (index !== -1) {
        // Remove a single registration; the same function may be registered
        // more than once.
        this.eventHandlers.set(eventName, [
          ...current.slice(0, index),
          ...current.slice(index + 1)
        ])
      }
    }
  }
}

// Service event handlers
/**
 * Subscribes inventory-related reactions to events on the shared `eventBus`.
 * Subscriptions are registered as a side effect of construction.
 */
export class InventoryEventHandlers {
  constructor(private inventoryService: InventoryService) {
    this.setupEventHandlers()
  }

  /** Registers one listener per event this class reacts to. */
  private setupEventHandlers() {
    eventBus.on('order.created', async (createdOrder: Order) => {
      await this.handleOrderCreated(createdOrder)
    })

    eventBus.on('order.cancelled', async (cancelledOrder: Order) => {
      await this.handleOrderCancelled(cancelledOrder)
    })

    eventBus.on('product.discontinued', async (discontinuedProduct: Product) => {
      await this.handleProductDiscontinued(discontinuedProduct)
    })
  }

  /**
   * Reserves inventory for every item of a newly created order.
   *
   * Failures are not re-thrown: they are logged and reported by emitting
   * `inventory.reservation.failed` with the order and the error.
   */
  private async handleOrderCreated(order: Order) {
    try {
      // One reservation entry per order line.
      const reservations = order.items.map(({ productId, quantity }) => ({
        productId,
        quantity
      }))
      await this.inventoryService.reserveInventory(order.id, reservations)

      logger.info('Inventory reserved for order', { orderId: order.id })
    } catch (error) {
      logger.error('Failed to reserve inventory for order', {
        orderId: order.id,
        error
      })

      // Let interested parties know the reservation did not go through.
      await eventBus.emit('inventory.reservation.failed', { order, error })
    }
  }
}

External Service Communication

1. HTTP API Integration

// External API service wrapper
/**
 * Wrapper around an external product-information HTTP API.
 *
 * The client is configured from `process.env.EXTERNAL_API_URL`, and each
 * request authenticates with `process.env.EXTERNAL_API_KEY` as a bearer token.
 */
export class ExternalProductService {
  private httpClient: HttpClient

  constructor() {
    this.httpClient = new HttpClient({
      baseURL: process.env.EXTERNAL_API_URL,
      timeout: 30000,
      retries: 3
    })
  }

  /**
   * Looks up product information by barcode.
   *
   * @param barcode - Barcode to look up. It is URL-encoded before being
   *   placed in the request path.
   * @returns The response data, or `null` when the API answers 404 or the
   *   request fails with a `NetworkError` (graceful degradation).
   * @throws ExternalServiceError for any other non-OK response; other errors
   *   are re-thrown unchanged.
   */
  async getProductInfo(barcode: string): Promise<ExternalProductInfo | null> {
    try {
      // Encode the barcode so characters such as '/', '?' or '#' cannot alter
      // the request path or query.
      const response = await this.httpClient.get(`/products/${encodeURIComponent(barcode)}`, {
        headers: {
          'Authorization': `Bearer ${process.env.EXTERNAL_API_KEY}`,
          'Content-Type': 'application/json'
        }
      })

      if (response.status === 404) {
        return null // Product not found
      }

      if (!response.ok) {
        throw new ExternalServiceError(
          `External API error: ${response.status} ${response.statusText}`
        )
      }

      return response.data

    } catch (error) {
      if (error instanceof NetworkError) {
        logger.warn('External API network error', { barcode, error })
        return null // Graceful degradation
      }

      throw error
    }
  }
}

2. Message Queue Integration

// Queue-based communication for async operations
/**
 * Queue-based messaging on top of Redis lists: producers `LPUSH`, consumers
 * `BRPOP`, which yields first-in-first-out delivery per queue.
 *
 * NOTE(review): assumes `Redis` is the ioredis client (the `duplicate()` and
 * `disconnect()` calls below rely on that) — confirm.
 */
export class MessageQueueService {
  private redis: Redis

  constructor() {
    this.redis = new Redis(process.env.REDIS_URL)
  }

  /**
   * Wraps `event` in an envelope (`id`, `timestamp`, `data`) and pushes it
   * onto the queue.
   *
   * @param queueName - Redis list key used as the queue.
   * @param event - Payload to publish; it must be JSON-serializable.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  async publishEvent(queueName: string, event: any) {
    const message = {
      id: generateId(),
      timestamp: new Date().toISOString(),
      data: event
    }

    await this.redis.lpush(queueName, JSON.stringify(message))
    logger.info('Event published to queue', { queueName, eventId: message.id })
  }

  /**
   * Consumes events from the queue until `signal` is aborted, or forever when
   * no signal is given.
   *
   * Delivery is at-most-once: a message is removed from the queue before the
   * handler runs, so a message whose handler fails (or that cannot be parsed)
   * is logged and dropped, not retried.
   *
   * @param queueName - Redis list key used as the queue.
   * @param handler - Called with the `data` field of each message.
   * @param signal - Optional abort signal used to stop the loop. It is checked
   *   between iterations, so stopping can take up to the 5 second pop timeout
   *   plus the time the current handler needs.
   */
  async consumeEvents(
    queueName: string,
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    handler: (event: any) => Promise<void>,
    signal?: AbortSignal
  ) {
    // Blocking pops occupy their connection while they wait. Use a dedicated
    // connection so publishEvent() on the shared one is not stalled behind them.
    const consumer = this.redis.duplicate()

    try {
      while (!signal?.aborted) {
        try {
          const result = await consumer.brpop(queueName, 5) // 5 second timeout

          if (result) {
            const [, messageStr] = result
            const message = JSON.parse(messageStr)

            await handler(message.data)
            logger.info('Event processed', { queueName, eventId: message.id })
          }

        } catch (error) {
          logger.error('Error processing queue event', { queueName, error })
          await new Promise(resolve => setTimeout(resolve, 1000)) // Backoff
        }
      }
    } finally {
      // Release the dedicated connection once the loop ends.
      consumer.disconnect()
    }
  }
}

Data Flow Monitoring

Request Tracing

// Request tracing for debugging
/**
 * Wraps asynchronous operations with start / completion / failure log entries
 * that share a trace ID and report the elapsed time.
 */
export class RequestTracer {
  /**
   * Runs `fn`, logging when it starts and how it finished.
   *
   * @param operation - Name recorded in every log entry.
   * @param fn - The operation to run.
   * @returns Whatever `fn` resolves to.
   * @throws Re-throws the original error after logging it. An error thrown
   *   synchronously by `fn` is logged too and surfaces as a rejection.
   */
  static async trace<T>(operation: string, fn: () => Promise<T>): Promise<T> {
    const traceId = generateTraceId()
    const startTime = Date.now()

    logger.info('Operation started', { operation, traceId })

    try {
      // Calling `fn` inside the try block means a synchronous throw is logged
      // as a failure exactly like a rejected promise.
      const result = await fn()

      logger.info('Operation completed', {
        operation,
        traceId,
        duration: Date.now() - startTime,
        success: true
      })
      return result
    } catch (error: unknown) {
      logger.error('Operation failed', {
        operation,
        traceId,
        duration: Date.now() - startTime,
        // Anything can be thrown; only Error instances carry `.message`.
        error: error instanceof Error ? error.message : String(error),
        success: false
      })
      throw error
    }
  }
}

// Usage
/**
 * Fetches products through `productService`, wrapped in a `RequestTracer`
 * trace named 'getProducts'.
 *
 * @param filters - Filters forwarded unchanged to `productService.getProducts`.
 * @returns The products resolved by the service.
 */
export async function getProducts(filters: ProductFilters): Promise<Product[]> {
  const loadProducts = async (): Promise<Product[]> =>
    productService.getProducts(filters)

  return RequestTracer.trace('getProducts', loadProducts)
}

Performance Monitoring

// Performance monitoring for data flow
/**
 * In-memory collector of per-operation duration samples with summary
 * statistics. Samples live in a static map shared by the whole process.
 */
export class PerformanceMonitor {
  /** Maximum number of samples retained per operation. */
  private static readonly MAX_SAMPLES = 100

  private static metrics: Map<string, number[]> = new Map()

  /**
   * Records one duration sample for `operation`, keeping only the most recent
   * 100 samples.
   *
   * @param operation - Name the sample is stored under.
   * @param duration - Measured duration. Non-finite values (NaN, ±Infinity)
   *   are ignored because they would corrupt the average and the sort order.
   */
  static recordMetric(operation: string, duration: number) {
    if (!Number.isFinite(duration)) {
      return
    }

    const samples = this.metrics.get(operation) ?? []
    samples.push(duration)

    // Keep only last 100 measurements
    if (samples.length > PerformanceMonitor.MAX_SAMPLES) {
      samples.shift()
    }

    this.metrics.set(operation, samples)
  }

  /**
   * Summarizes the retained samples for `operation`.
   *
   * Percentiles use the nearest-rank method: the p-th percentile is the
   * smallest sample such that at least p% of the samples are less than or
   * equal to it.
   *
   * @param operation - Name the samples were recorded under.
   * @returns `count`, `avg`, `min`, `max`, `p50`, `p95` and `p99`, or `null`
   *   when no samples have been recorded.
   */
  static getMetrics(operation: string) {
    const measurements = this.metrics.get(operation) ?? []

    if (measurements.length === 0) {
      return null
    }

    const sorted = [...measurements].sort((a, b) => a - b)

    // Nearest-rank index: ceil(p/100 * n) - 1. Integer percentages keep the
    // arithmetic exact, avoiding floating-point results such as
    // 7.000000000000001 that would push the rank up by one.
    const percentile = (percent: number): number =>
      sorted[Math.max(0, Math.ceil((sorted.length * percent) / 100) - 1)]

    return {
      count: measurements.length,
      avg: measurements.reduce((sum, val) => sum + val, 0) / measurements.length,
      min: sorted[0],
      max: sorted[sorted.length - 1],
      p50: percentile(50),
      p95: percentile(95),
      p99: percentile(99)
    }
  }
}

This data flow architecture ensures reliable, performant, and observable data movement throughout the system while maintaining flexibility for future enhancements.