
Event-Driven Logging Architecture

This directory contains the event-driven logging infrastructure that decouples usage tracking from the gateway service using NATS JetStream.

Architecture

┌─────────┐       ┌──────────────┐       ┌──────────────┐       ┌────────────┐
│ Gateway │──────>│     NATS     │──────>│     Log      │──────>│ PostgreSQL │
│         │ pub   │  JetStream   │ sub   │  Consumer    │ batch │            │
└─────────┘       └──────────────┘       └──────────────┘       └────────────┘
   Tool              Persistent             Batching              Audit
 Execution             Events              Processing              Logs

Components

1. Event Publisher (pkg/events/)

  • types.go: Event schemas (ToolExecutionEvent, TunnelStatusEvent, PolicyBlockEvent)
  • publisher.go: NATS JetStream publisher with automatic reconnection
  • mock.go: Mock publisher for testing
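
To make the schema idea concrete, here is a minimal sketch of how a ToolExecutionEvent might be encoded before publishing and decoded by a subscriber. The field names and JSON tags are assumptions for illustration; the actual definitions live in types.go and may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// ToolExecutionEvent is a hypothetical shape for the event of the same name
// in types.go. Every field here is an assumption, not the real schema.
type ToolExecutionEvent struct {
	EventID    string    `json:"event_id"`
	ToolName   string    `json:"tool_name"`
	DurationMS int64     `json:"duration_ms"`
	Success    bool      `json:"success"`
	Timestamp  time.Time `json:"timestamp"`
}

// encodeEvent serializes an event into the JSON payload a publisher would send.
func encodeEvent(e ToolExecutionEvent) ([]byte, error) {
	return json.Marshal(e)
}

// decodeEvent parses a payload back into an event, as a subscriber would.
func decodeEvent(data []byte) (ToolExecutionEvent, error) {
	var e ToolExecutionEvent
	err := json.Unmarshal(data, &e)
	return e, err
}

func main() {
	payload, err := encodeEvent(ToolExecutionEvent{
		EventID:    "evt-1",
		ToolName:   "search",
		DurationMS: 42,
		Success:    true,
		Timestamp:  time.Now().UTC(),
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(string(payload))
}
```

Keeping encode and decode next to the type means publisher and consumer share one definition of the wire format.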

2. Gateway (internal/gateway/usage_tracker.go)

  • Publishes tool execution events to NATS asynchronously
  • Non-blocking, fire-and-forget pattern
  • No local batching (moved to consumer)
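
The non-blocking, fire-and-forget behavior can be sketched roughly as follows. The Publisher interface, the usageTracker type, the recordingPublisher stand-in, and the subject name are all hypothetical; the real publisher in pkg/events wraps NATS JetStream and its API may look different.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"sync"
)

// Publisher is an assumed minimal interface; the real one is in pkg/events.
type Publisher interface {
	Publish(ctx context.Context, subject string, data []byte) error
}

// recordingPublisher is a stand-in that stores payloads in memory,
// similar in spirit to a mock publisher used in tests.
type recordingPublisher struct {
	mu   sync.Mutex
	msgs [][]byte
}

func (p *recordingPublisher) Publish(_ context.Context, _ string, data []byte) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.msgs = append(p.msgs, data)
	return nil
}

func (p *recordingPublisher) count() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.msgs)
}

// usageTracker publishes each event in its own goroutine so the caller
// (the request path) never waits on the broker.
type usageTracker struct {
	pub     Publisher
	subject string // hypothetical subject name
	wg      sync.WaitGroup
}

// Track returns immediately; publish errors are logged, not returned.
func (t *usageTracker) Track(payload []byte) {
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		if err := t.pub.Publish(context.Background(), t.subject, payload); err != nil {
			log.Printf("usage event not published: %v", err)
		}
	}()
}

// Wait blocks until in-flight publishes finish (useful on shutdown and in tests).
func (t *usageTracker) Wait() { t.wg.Wait() }

func main() {
	pub := &recordingPublisher{}
	tracker := &usageTracker{pub: pub, subject: "usage.tool_execution"}
	tracker.Track([]byte(`{"tool_name":"search"}`))
	tracker.Wait()
	fmt.Println("published events:", pub.count())
}
```

The trade-off of this pattern is that a failed publish is only logged, so the caller never learns that an event was dropped.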

3. Log Consumer Service (cmd/log-consumer/, internal/logconsumer/)

  • Subscribes to NATS usage log stream
  • Batches events (100 entries or 5 seconds)
  • Writes to PostgreSQL using existing database methods
  • Durable consumer with manual acknowledgment
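
The "100 entries or 5 seconds" rule can be illustrated with a small size-or-interval batcher. This is a sketch, not the code in internal/logconsumer: runBatcher and its flush callback are hypothetical names, and events are plain strings to keep the example self-contained.

```go
package main

import (
	"fmt"
	"time"
)

// runBatcher collects events from in and calls flush when the batch reaches
// maxSize, or when interval elapses with a non-empty batch. When in is
// closed it flushes any remainder and returns.
func runBatcher(in <-chan string, maxSize int, interval time.Duration, flush func([]string)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	batch := make([]string, 0, maxSize)
	emit := func() {
		if len(batch) == 0 {
			return
		}
		flush(batch)
		// Start a fresh slice so the flushed batch is not overwritten.
		batch = make([]string, 0, maxSize)
	}

	for {
		select {
		case ev, ok := <-in:
			if !ok {
				emit()
				return
			}
			batch = append(batch, ev)
			if len(batch) >= maxSize {
				emit()
			}
		case <-ticker.C:
			emit()
		}
	}
}

func main() {
	in := make(chan string, 5)
	for i := 0; i < 5; i++ {
		in <- fmt.Sprintf("event-%d", i)
	}
	close(in)

	// With the defaults from this README: 100 entries or 5 seconds.
	runBatcher(in, 100, 5*time.Second, func(b []string) {
		fmt.Printf("flushing %d events\n", len(b))
	})
}
```

In a consumer with manual acknowledgment, the flush callback would typically write the batch to PostgreSQL first and acknowledge the messages only after the write succeeds, so a failed write leads to redelivery rather than loss.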

Benefits

  • ✅ Reliability: Events persist in NATS even if gateway crashes
  • ✅ Decoupling: Gateway and logging are independent services
  • ✅ Scalability: Consumer can be horizontally scaled
  • ✅ Zero Data Loss: Durable consumer with acknowledgments
  • ✅ Observability: NATS monitoring endpoint

Quick Start

# Start all services
docker-compose up --build

# View logs
docker-compose logs -f gateway
docker-compose logs -f log-consumer

Local Development

# Terminal 1: Start infrastructure
docker-compose up postgres nats

# Terminal 2: Start gateway
make run-gateway

# Terminal 3: Start log-consumer
make run-log-consumer

Configuration

Gateway

NATS_URL=nats://localhost:4222
DATABASE_URL=postgres://postgres:postgres@localhost:5432/nitaq?sslmode=disable
HTTP_PORT=8080
TUNNEL_PORT=7000

Log Consumer

NATS_URL=nats://localhost:4222
DATABASE_URL=postgres://postgres:postgres@localhost:5432/nitaq?sslmode=disable
BATCH_SIZE=100
FLUSH_INTERVAL=5s
DEBUG=true
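
As an illustration of how the log consumer might read these variables, here is a minimal sketch. The consumerConfig type and loadConfig function are hypothetical, and treating the values above as defaults (with DATABASE_URL required) is an assumption, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strconv"
	"time"
)

// consumerConfig mirrors the Log Consumer variables listed above.
type consumerConfig struct {
	NATSURL       string
	DatabaseURL   string
	BatchSize     int
	FlushInterval time.Duration
	Debug         bool
}

// loadConfig reads settings through getenv (pass os.Getenv in production).
// Defaults are assumed to match the example values in this README.
func loadConfig(getenv func(string) string) (consumerConfig, error) {
	cfg := consumerConfig{
		NATSURL:       "nats://localhost:4222",
		BatchSize:     100,
		FlushInterval: 5 * time.Second,
	}
	if v := getenv("NATS_URL"); v != "" {
		cfg.NATSURL = v
	}
	cfg.DatabaseURL = getenv("DATABASE_URL")
	if cfg.DatabaseURL == "" {
		return cfg, errors.New("DATABASE_URL is required")
	}
	if v := getenv("BATCH_SIZE"); v != "" {
		n, err := strconv.Atoi(v)
		if err != nil || n <= 0 {
			return cfg, fmt.Errorf("invalid BATCH_SIZE %q", v)
		}
		cfg.BatchSize = n
	}
	if v := getenv("FLUSH_INTERVAL"); v != "" {
		d, err := time.ParseDuration(v) // accepts values such as "5s" or "500ms"
		if err != nil || d <= 0 {
			return cfg, fmt.Errorf("invalid FLUSH_INTERVAL %q", v)
		}
		cfg.FlushInterval = d
	}
	if v := getenv("DEBUG"); v != "" {
		b, err := strconv.ParseBool(v)
		if err != nil {
			return cfg, fmt.Errorf("invalid DEBUG %q", v)
		}
		cfg.Debug = b
	}
	return cfg, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		fmt.Println("config error:", err)
		return
	}
	fmt.Printf("batch size %d, flush interval %s, debug %t\n",
		cfg.BatchSize, cfg.FlushInterval, cfg.Debug)
}
```

Passing getenv as a parameter keeps the loader testable without touching the process environment.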

Testing

# Unit tests
go test ./internal/gateway -v -run TestUsageTracker
go test ./pkg/events -v

# Integration test
./test-integration.sh

Monitoring

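NATS Monitoring

The NATS monitoring endpoint listens on port 8222; JetStream statistics are available at http://localhost:8222/jsz.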

JetStream Streams

  • AGENTICOS_USAGE - Tool usage logs
  • AGENTICOS_TUNNEL - Tunnel state changes (future)
  • AGENTICOS_POLICY - Policy blocks (future)

Future Extensions

This architecture is ready for:
  1. Alert Engine - Subscribe to events, evaluate OPA rules, trigger alerts
  2. Notification Worker - Route alerts to Slack, PagerDuty, webhooks
  3. Prometheus Metrics - Track throughput, lag, error rates
  4. Horizontal Scaling - Run multiple log-consumer instances

Troubleshooting

Gateway can’t connect to NATS

# Check NATS is running
docker ps | grep nats

# Check NATS logs
docker logs nitaq-nats

Events not reaching database

# Check log-consumer logs
docker logs nitaq-log-consumer

# Check NATS stream
curl http://localhost:8222/jsz
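
The same check can be scripted. The sketch below fetches /jsz and reads a few top-level counters. The field names (streams, consumers, messages, bytes) reflect the JSON the NATS server is understood to return from this endpoint, so verify them against your server version. The jszSummary, parseJSZ, and fetchJSZ names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// jszSummary holds a subset of the /jsz response; other fields are ignored.
type jszSummary struct {
	Streams   int    `json:"streams"`
	Consumers int    `json:"consumers"`
	Messages  uint64 `json:"messages"`
	Bytes     uint64 `json:"bytes"`
}

// parseJSZ decodes the JSON body returned by the monitoring endpoint.
func parseJSZ(data []byte) (jszSummary, error) {
	var s jszSummary
	err := json.Unmarshal(data, &s)
	return s, err
}

// fetchJSZ requests the given URL and parses the response body.
func fetchJSZ(url string) (jszSummary, error) {
	client := &http.Client{Timeout: 3 * time.Second}
	resp, err := client.Get(url)
	if err != nil {
		return jszSummary{}, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return jszSummary{}, fmt.Errorf("unexpected status %s", resp.Status)
	}
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return jszSummary{}, err
	}
	return parseJSZ(body)
}

func main() {
	s, err := fetchJSZ("http://localhost:8222/jsz")
	if err != nil {
		fmt.Println("jsz check failed:", err)
		return
	}
	fmt.Printf("streams=%d consumers=%d messages=%d bytes=%d\n",
		s.Streams, s.Consumers, s.Messages, s.Bytes)
}
```

A message count that keeps growing while the consumer count is zero would suggest the log-consumer is not subscribed.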

Performance tuning

# Increase batch size for higher throughput
BATCH_SIZE=500 make run-log-consumer

# Decrease flush interval for lower latency
FLUSH_INTERVAL=1s make run-log-consumer

Migration from Old Architecture

The old in-memory channel approach has been completely replaced. Key changes:
  • ❌ Removed: worker(), flush(), Start(), Stop() methods
  • ✅ Added: NATS publisher, event schemas, log-consumer service
  • ✅ Unchanged: Database schema and API (backward compatible)
See IMPLEMENTATION_SUMMARY.md for full details.