From 31ad2178181faad0569102f5333951d7e57375db Mon Sep 17 00:00:00 2001 From: Hein Date: Fri, 12 Dec 2025 09:23:54 +0200 Subject: [PATCH] Event Broken Concept --- pkg/common/adapters/database/bun.go | 8 + pkg/common/adapters/database/gorm.go | 4 + pkg/common/adapters/database/pgsql.go | 8 + pkg/common/interfaces.go | 6 + pkg/config/config.go | 50 +++ pkg/config/manager.go | 35 ++ pkg/eventbroker/README.md | 327 +++++++++++++++ pkg/eventbroker/broker.go | 453 ++++++++++++++++++++ pkg/eventbroker/broker_test.go | 524 ++++++++++++++++++++++++ pkg/eventbroker/event.go | 175 ++++++++ pkg/eventbroker/event_test.go | 314 ++++++++++++++ pkg/eventbroker/eventbroker.go | 160 ++++++++ pkg/eventbroker/example_usage.go | 266 ++++++++++++ pkg/eventbroker/factory.go | 56 +++ pkg/eventbroker/handler.go | 17 + pkg/eventbroker/hooks.go | 137 +++++++ pkg/eventbroker/metrics.go | 28 ++ pkg/eventbroker/provider.go | 70 ++++ pkg/eventbroker/provider_memory.go | 446 ++++++++++++++++++++ pkg/eventbroker/provider_memory_test.go | 419 +++++++++++++++++++ pkg/eventbroker/subscription.go | 140 +++++++ pkg/eventbroker/subscription_test.go | 270 ++++++++++++ pkg/eventbroker/worker_pool.go | 141 +++++++ pkg/funcspec/function_api_test.go | 4 + pkg/metrics/interfaces.go | 19 +- 25 files changed, 4074 insertions(+), 3 deletions(-) create mode 100644 pkg/eventbroker/README.md create mode 100644 pkg/eventbroker/broker.go create mode 100644 pkg/eventbroker/broker_test.go create mode 100644 pkg/eventbroker/event.go create mode 100644 pkg/eventbroker/event_test.go create mode 100644 pkg/eventbroker/eventbroker.go create mode 100644 pkg/eventbroker/example_usage.go create mode 100644 pkg/eventbroker/factory.go create mode 100644 pkg/eventbroker/handler.go create mode 100644 pkg/eventbroker/hooks.go create mode 100644 pkg/eventbroker/metrics.go create mode 100644 pkg/eventbroker/provider.go create mode 100644 pkg/eventbroker/provider_memory.go create mode 100644 pkg/eventbroker/provider_memory_test.go create mode 100644 pkg/eventbroker/subscription.go create mode 100644 pkg/eventbroker/subscription_test.go create mode 100644 pkg/eventbroker/worker_pool.go diff --git a/pkg/common/adapters/database/bun.go b/pkg/common/adapters/database/bun.go index fe08bc5..a83b945 100644 --- a/pkg/common/adapters/database/bun.go +++ b/pkg/common/adapters/database/bun.go @@ -196,6 +196,10 @@ func (b *BunAdapter) RunInTransaction(ctx context.Context, fn func(common.Databa }) } +func (b *BunAdapter) GetUnderlyingDB() interface{} { + return b.db +} + // BunSelectQuery implements SelectQuery for Bun type BunSelectQuery struct { query *bun.SelectQuery @@ -1208,3 +1212,7 @@ func (b *BunTxAdapter) RollbackTx(ctx context.Context) error { func (b *BunTxAdapter) RunInTransaction(ctx context.Context, fn func(common.Database) error) error { return fn(b) // Already in transaction } + +func (b *BunTxAdapter) GetUnderlyingDB() interface{} { + return b.tx +} diff --git a/pkg/common/adapters/database/gorm.go b/pkg/common/adapters/database/gorm.go index e201814..4bf1cf6 100644 --- a/pkg/common/adapters/database/gorm.go +++ b/pkg/common/adapters/database/gorm.go @@ -102,6 +102,10 @@ func (g *GormAdapter) RunInTransaction(ctx context.Context, fn func(common.Datab }) } +func (g *GormAdapter) GetUnderlyingDB() interface{} { + return g.db +} + // GormSelectQuery implements SelectQuery for GORM type GormSelectQuery struct { db *gorm.DB diff --git a/pkg/common/adapters/database/pgsql.go b/pkg/common/adapters/database/pgsql.go index f71bf26..4b81204 100644 --- a/pkg/common/adapters/database/pgsql.go +++ b/pkg/common/adapters/database/pgsql.go @@ -137,6 +137,10 @@ func (p *PgSQLAdapter) RunInTransaction(ctx context.Context, fn func(common.Data return fn(adapter) } +func (p *PgSQLAdapter) GetUnderlyingDB() interface{} { + return p.db +} + // preloadConfig represents a relationship to be preloaded type preloadConfig struct { relation string @@ -897,6 +901,10 @@ func (p *PgSQLTxAdapter) RunInTransaction(ctx context.Context, fn func(common.Da return fn(p) } +func (p *PgSQLTxAdapter) GetUnderlyingDB() interface{} { + return p.tx +} + // applyJoinPreloads adds JOINs for relationships that should use JOIN strategy func (p *PgSQLSelectQuery) applyJoinPreloads() { for _, preload := range p.preloads { diff --git a/pkg/common/interfaces.go b/pkg/common/interfaces.go index 9d53c07..57dd78f 100644 --- a/pkg/common/interfaces.go +++ b/pkg/common/interfaces.go @@ -24,6 +24,12 @@ type Database interface { CommitTx(ctx context.Context) error RollbackTx(ctx context.Context) error RunInTransaction(ctx context.Context, fn func(Database) error) error + + // GetUnderlyingDB returns the underlying database connection + // For GORM, this returns *gorm.DB + // For Bun, this returns *bun.DB + // This is useful for provider-specific features like PostgreSQL NOTIFY/LISTEN + GetUnderlyingDB() interface{} } // SelectQuery interface for building SELECT queries (compatible with both GORM and Bun) diff --git a/pkg/config/config.go b/pkg/config/config.go index af1d9ad..7c18231 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -12,6 +12,7 @@ type Config struct { Middleware MiddlewareConfig `mapstructure:"middleware"` CORS CORSConfig `mapstructure:"cors"` Database DatabaseConfig `mapstructure:"database"` + EventBroker EventBrokerConfig `mapstructure:"event_broker"` } // ServerConfig holds server-related configuration @@ -91,3 +92,52 @@ type ErrorTrackingConfig struct { SampleRate float64 `mapstructure:"sample_rate"` // Error sample rate (0.0-1.0) TracesSampleRate float64 `mapstructure:"traces_sample_rate"` // Traces sample rate (0.0-1.0) } + +// EventBrokerConfig contains configuration for the event broker +type EventBrokerConfig struct { + Enabled bool `mapstructure:"enabled"` + Provider string `mapstructure:"provider"` // memory, redis, nats, database + Mode string `mapstructure:"mode"` // sync, async + WorkerCount int `mapstructure:"worker_count"` + BufferSize int `mapstructure:"buffer_size"` + InstanceID string `mapstructure:"instance_id"` + Redis EventBrokerRedisConfig `mapstructure:"redis"` + NATS EventBrokerNATSConfig `mapstructure:"nats"` + Database EventBrokerDatabaseConfig `mapstructure:"database"` + RetryPolicy EventBrokerRetryPolicyConfig `mapstructure:"retry_policy"` +} + +// EventBrokerRedisConfig contains Redis-specific configuration +type EventBrokerRedisConfig struct { + StreamName string `mapstructure:"stream_name"` + ConsumerGroup string `mapstructure:"consumer_group"` + MaxLen int64 `mapstructure:"max_len"` + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` + Password string `mapstructure:"password"` + DB int `mapstructure:"db"` +} + +// EventBrokerNATSConfig contains NATS-specific configuration +type EventBrokerNATSConfig struct { + URL string `mapstructure:"url"` + StreamName string `mapstructure:"stream_name"` + Subjects []string `mapstructure:"subjects"` + Storage string `mapstructure:"storage"` // file, memory + MaxAge time.Duration `mapstructure:"max_age"` +} + +// EventBrokerDatabaseConfig contains database provider configuration +type EventBrokerDatabaseConfig struct { + TableName string `mapstructure:"table_name"` + Channel string `mapstructure:"channel"` // PostgreSQL NOTIFY channel name + PollInterval time.Duration `mapstructure:"poll_interval"` +} + +// EventBrokerRetryPolicyConfig contains retry policy configuration +type EventBrokerRetryPolicyConfig struct { + MaxRetries int `mapstructure:"max_retries"` + InitialDelay time.Duration `mapstructure:"initial_delay"` + MaxDelay time.Duration `mapstructure:"max_delay"` + BackoffFactor float64 `mapstructure:"backoff_factor"` +} diff --git a/pkg/config/manager.go b/pkg/config/manager.go index 45b15d9..016c6bf 100644 --- a/pkg/config/manager.go +++ b/pkg/config/manager.go @@ -165,4 +165,39 @@ func setDefaults(v *viper.Viper) { // Database defaults v.SetDefault("database.url", "") + + // Event Broker defaults + v.SetDefault("event_broker.enabled", false) + v.SetDefault("event_broker.provider", "memory") + v.SetDefault("event_broker.mode", "async") + v.SetDefault("event_broker.worker_count", 10) + v.SetDefault("event_broker.buffer_size", 1000) + v.SetDefault("event_broker.instance_id", "") + + // Event Broker - Redis defaults + v.SetDefault("event_broker.redis.stream_name", "resolvespec:events") + v.SetDefault("event_broker.redis.consumer_group", "resolvespec-workers") + v.SetDefault("event_broker.redis.max_len", 10000) + v.SetDefault("event_broker.redis.host", "localhost") + v.SetDefault("event_broker.redis.port", 6379) + v.SetDefault("event_broker.redis.password", "") + v.SetDefault("event_broker.redis.db", 0) + + // Event Broker - NATS defaults + v.SetDefault("event_broker.nats.url", "nats://localhost:4222") + v.SetDefault("event_broker.nats.stream_name", "RESOLVESPEC_EVENTS") + v.SetDefault("event_broker.nats.subjects", []string{"events.>"}) + v.SetDefault("event_broker.nats.storage", "file") + v.SetDefault("event_broker.nats.max_age", "24h") + + // Event Broker - Database defaults + v.SetDefault("event_broker.database.table_name", "events") + v.SetDefault("event_broker.database.channel", "resolvespec_events") + v.SetDefault("event_broker.database.poll_interval", "1s") + + // Event Broker - Retry Policy defaults + v.SetDefault("event_broker.retry_policy.max_retries", 3) + v.SetDefault("event_broker.retry_policy.initial_delay", "1s") + v.SetDefault("event_broker.retry_policy.max_delay", "30s") + v.SetDefault("event_broker.retry_policy.backoff_factor", 2.0) } diff --git a/pkg/eventbroker/README.md b/pkg/eventbroker/README.md new file mode 100644 index 0000000..aed4861 --- /dev/null +++ b/pkg/eventbroker/README.md @@ -0,0 +1,327 @@ +# Event Broker System + +A comprehensive event handler/broker system for ResolveSpec that provides real-time event publishing, subscription, and cross-instance communication. + +## Features + +- **Multiple Sources**: Events from database, websockets, frontend, system, and internal sources +- **Event Status Tracking**: Pending, processing, completed, failed states with timestamps +- **Rich Metadata**: User IDs, session IDs, instance IDs, JSON payloads, and custom metadata +- **Sync & Async Modes**: Choose between synchronous or asynchronous event processing +- **Pattern Matching**: Subscribe to events using glob-style patterns +- **Multiple Providers**: In-memory, Redis Streams, NATS JetStream, PostgreSQL with NOTIFY +- **Hook Integration**: Automatic CRUD event capture via restheadspec hooks +- **Retry Logic**: Configurable retry policy with exponential backoff +- **Metrics**: Prometheus-compatible metrics for monitoring +- **Graceful Shutdown**: Proper cleanup and event flushing on shutdown + +## Quick Start + +### 1. Configuration + +Add to your `config.yaml`: + +```yaml +event_broker: + enabled: true + provider: memory # memory, redis, nats, database + mode: async # sync, async + worker_count: 10 + buffer_size: 1000 + instance_id: "${HOSTNAME}" +``` + +### 2. Initialize + +```go +import ( + "github.com/bitechdev/ResolveSpec/pkg/config" + "github.com/bitechdev/ResolveSpec/pkg/eventbroker" +) + +func main() { + // Load configuration + cfgMgr := config.NewManager() + cfg, _ := cfgMgr.GetConfig() + + // Initialize event broker + if err := eventbroker.Initialize(cfg.EventBroker); err != nil { + log.Fatal(err) + } +} +``` + +### 3. Subscribe to Events + +```go +// Subscribe to specific events +eventbroker.Subscribe("public.users.create", eventbroker.EventHandlerFunc( + func(ctx context.Context, event *eventbroker.Event) error { + log.Printf("New user created: %s", event.Payload) + // Send welcome email, update cache, etc. + return nil + }, +)) + +// Subscribe with patterns +eventbroker.Subscribe("*.*.delete", eventbroker.EventHandlerFunc( + func(ctx context.Context, event *eventbroker.Event) error { + log.Printf("Deleted: %s.%s", event.Schema, event.Entity) + return nil + }, +)) +``` + +### 4. Publish Events + +```go +// Create and publish an event +event := eventbroker.NewEvent(eventbroker.EventSourceDatabase, "public.users.update") +event.InstanceID = eventbroker.GetDefaultBroker().InstanceID() +event.UserID = 123 +event.SessionID = "session-456" +event.Schema = "public" +event.Entity = "users" +event.Operation = "update" + +event.SetPayload(map[string]interface{}{ + "id": 123, + "name": "John Doe", +}) + +// Async (non-blocking) +eventbroker.PublishAsync(ctx, event) + +// Sync (blocking) +eventbroker.PublishSync(ctx, event) +``` + +## Automatic CRUD Event Capture + +Automatically capture database CRUD operations: + +```go +import ( + "github.com/bitechdev/ResolveSpec/pkg/eventbroker" + "github.com/bitechdev/ResolveSpec/pkg/restheadspec" +) + +func setupHooks(handler *restheadspec.Handler) { + broker := eventbroker.GetDefaultBroker() + + // Configure which operations to capture + config := eventbroker.DefaultCRUDHookConfig() + config.EnableRead = false // Disable read events for performance + + // Register hooks + eventbroker.RegisterCRUDHooks(broker, handler.Hooks(), config) + + // Now all create/update/delete operations automatically publish events! +} +``` + +## Event Structure + +Every event contains: + +```go +type Event struct { + ID string // UUID + Source EventSource // database, websocket, system, frontend, internal + Type string // Pattern: schema.entity.operation + Status EventStatus // pending, processing, completed, failed + Payload json.RawMessage // JSON payload + UserID int // User who triggered the event + SessionID string // Session identifier + InstanceID string // Server instance identifier + Schema string // Database schema + Entity string // Database entity/table + Operation string // create, update, delete, read + CreatedAt time.Time // When event was created + ProcessedAt *time.Time // When processing started + CompletedAt *time.Time // When processing completed + Error string // Error message if failed + Metadata map[string]interface{} // Additional context + RetryCount int // Number of retry attempts +} +``` + +## Pattern Matching + +Subscribe to events using glob-style patterns: + +| Pattern | Matches | Example | +|---------|---------|---------| +| `*` | All events | Any event | +| `public.users.*` | All user operations | `public.users.create`, `public.users.update` | +| `*.*.create` | All create operations | `public.users.create`, `auth.sessions.create` | +| `public.*.*` | All events in public schema | `public.users.create`, `public.posts.delete` | +| `public.users.create` | Exact match | Only `public.users.create` | + +## Providers + +### Memory Provider (Default) + +Best for: Development, single-instance deployments + +- **Pros**: Fast, no dependencies, simple +- **Cons**: Events lost on restart, single-instance only + +```yaml +event_broker: + provider: memory +``` + +### Redis Provider (Future) + +Best for: Production, multi-instance deployments + +- **Pros**: Persistent, cross-instance pub/sub, reliable +- **Cons**: Requires Redis + +```yaml +event_broker: + provider: redis + redis: + stream_name: "resolvespec:events" + consumer_group: "resolvespec-workers" + host: "localhost" + port: 6379 +``` + +### NATS Provider (Future) + +Best for: High-performance, low-latency requirements + +- **Pros**: Very fast, built-in clustering, durable +- **Cons**: Requires NATS server + +```yaml +event_broker: + provider: nats + nats: + url: "nats://localhost:4222" + stream_name: "RESOLVESPEC_EVENTS" +``` + +### Database Provider (Future) + +Best for: Audit trails, event replay, SQL queries + +- **Pros**: No additional infrastructure, full SQL query support, PostgreSQL NOTIFY for real-time +- **Cons**: Slower than Redis/NATS + +```yaml +event_broker: + provider: database + database: + table_name: "events" + channel: "resolvespec_events" +``` + +## Processing Modes + +### Async Mode (Recommended) + +Events are queued and processed by worker pool: + +- Non-blocking event publishing +- Configurable worker count +- Better throughput +- Events may be processed out of order + +```yaml +event_broker: + mode: async + worker_count: 10 + buffer_size: 1000 +``` + +### Sync Mode + +Events are processed immediately: + +- Blocking event publishing +- Guaranteed ordering +- Immediate error feedback +- Lower throughput + +```yaml +event_broker: + mode: sync +``` + +## Retry Policy + +Configure automatic retries for failed handlers: + +```yaml +event_broker: + retry_policy: + max_retries: 3 + initial_delay: 1s + max_delay: 30s + backoff_factor: 2.0 # Exponential backoff +``` + +## Metrics + +The event broker exposes Prometheus metrics: + +- `eventbroker_events_published_total{source, type}` - Total events published +- `eventbroker_events_processed_total{source, type, status}` - Total events processed +- `eventbroker_event_processing_duration_seconds{source, type}` - Event processing duration +- `eventbroker_queue_size` - Current queue size (async mode) + +## Best Practices + +1. **Use Async Mode**: For better performance, use async mode in production +2. **Disable Read Events**: Read events can be high volume; disable if not needed +3. **Pattern Matching**: Use specific patterns to avoid processing unnecessary events +4. **Error Handling**: Always handle errors in event handlers; they won't fail the original operation +5. **Idempotency**: Make handlers idempotent as events may be retried +6. **Payload Size**: Keep payloads reasonable; avoid large objects +7. **Monitoring**: Monitor metrics to detect issues early + +## Examples + +See `example_usage.go` for comprehensive examples including: +- Basic event publishing and subscription +- Hook integration +- Error handling +- Configuration +- Pattern matching + +## Architecture + +``` +┌─────────────────┐ +│ Application │ +└────────┬────────┘ + │ + ├─ Publish Events + │ +┌────────▼────────┐ ┌──────────────┐ +│ Event Broker │◄────►│ Subscribers │ +└────────┬────────┘ └──────────────┘ + │ + ├─ Store Events + │ +┌────────▼────────┐ +│ Provider │ +│ (Memory/Redis │ +│ /NATS/DB) │ +└─────────────────┘ +``` + +## Future Enhancements + +- [ ] Database Provider with PostgreSQL NOTIFY +- [ ] Redis Streams Provider +- [ ] NATS JetStream Provider +- [ ] Event replay functionality +- [ ] Dead letter queue +- [ ] Event filtering at provider level +- [ ] Batch publishing +- [ ] Event compression +- [ ] Schema versioning diff --git a/pkg/eventbroker/broker.go b/pkg/eventbroker/broker.go new file mode 100644 index 0000000..c903f7a --- /dev/null +++ b/pkg/eventbroker/broker.go @@ -0,0 +1,453 @@ +package eventbroker + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// Broker is the main interface for event publishing and subscription +type Broker interface { + // Publish publishes an event (mode-dependent: sync or async) + Publish(ctx context.Context, event *Event) error + + // PublishSync publishes an event synchronously (blocks until all handlers complete) + PublishSync(ctx context.Context, event *Event) error + + // PublishAsync publishes an event asynchronously (returns immediately) + PublishAsync(ctx context.Context, event *Event) error + + // Subscribe registers a handler for events matching the pattern + Subscribe(pattern string, handler EventHandler) (SubscriptionID, error) + + // Unsubscribe removes a subscription + Unsubscribe(id SubscriptionID) error + + // Start starts the broker (begins processing events) + Start(ctx context.Context) error + + // Stop stops the broker gracefully (flushes pending events) + Stop(ctx context.Context) error + + // Stats returns broker statistics + Stats(ctx context.Context) (*BrokerStats, error) + + // InstanceID returns the instance ID of this broker + InstanceID() string +} + +// ProcessingMode determines how events are processed +type ProcessingMode string + +const ( + ProcessingModeSync ProcessingMode = "sync" + ProcessingModeAsync ProcessingMode = "async" +) + +// BrokerStats contains broker statistics +type BrokerStats struct { + InstanceID string `json:"instance_id"` + Mode ProcessingMode `json:"mode"` + IsRunning bool `json:"is_running"` + TotalPublished int64 `json:"total_published"` + TotalProcessed int64 `json:"total_processed"` + TotalFailed int64 `json:"total_failed"` + ActiveSubscribers int `json:"active_subscribers"` + QueueSize int `json:"queue_size,omitempty"` // For async mode + ActiveWorkers int `json:"active_workers,omitempty"` // For async mode + ProviderStats *ProviderStats `json:"provider_stats,omitempty"` + AdditionalStats map[string]interface{} `json:"additional_stats,omitempty"` +} + +// EventBroker implements the Broker interface +type EventBroker struct { + provider Provider + subscriptions *subscriptionManager + mode ProcessingMode + instanceID string + retryPolicy *RetryPolicy + + // Async mode fields (initialized in Phase 4) + workerPool *workerPool + + // Runtime state + isRunning atomic.Bool + stopOnce sync.Once + stopCh chan struct{} + wg sync.WaitGroup + + // Statistics + statsPublished atomic.Int64 + statsProcessed atomic.Int64 + statsFailed atomic.Int64 +} + +// RetryPolicy defines how failed events should be retried +type RetryPolicy struct { + MaxRetries int + InitialDelay time.Duration + MaxDelay time.Duration + BackoffFactor float64 +} + +// DefaultRetryPolicy returns a sensible default retry policy +func DefaultRetryPolicy() *RetryPolicy { + return &RetryPolicy{ + MaxRetries: 3, + InitialDelay: 1 * time.Second, + MaxDelay: 30 * time.Second, + BackoffFactor: 2.0, + } +} + +// Options for creating a new broker +type Options struct { + Provider Provider + Mode ProcessingMode + WorkerCount int // For async mode + BufferSize int // For async mode + RetryPolicy *RetryPolicy + InstanceID string +} + +// NewBroker creates a new event broker with the given options +func NewBroker(opts Options) (*EventBroker, error) { + if opts.Provider == nil { + return nil, fmt.Errorf("provider is required") + } + if opts.InstanceID == "" { + return nil, fmt.Errorf("instance ID is required") + } + if opts.Mode == "" { + opts.Mode = ProcessingModeAsync // Default to async + } + if opts.RetryPolicy == nil { + opts.RetryPolicy = DefaultRetryPolicy() + } + + broker := &EventBroker{ + provider: opts.Provider, + subscriptions: newSubscriptionManager(), + mode: opts.Mode, + instanceID: opts.InstanceID, + retryPolicy: opts.RetryPolicy, + stopCh: make(chan struct{}), + } + + // Worker pool will be initialized in Phase 4 for async mode + if opts.Mode == ProcessingModeAsync { + if opts.WorkerCount == 0 { + opts.WorkerCount = 10 // Default + } + if opts.BufferSize == 0 { + opts.BufferSize = 1000 // Default + } + broker.workerPool = newWorkerPool(opts.WorkerCount, opts.BufferSize, broker.processEvent) + } + + return broker, nil +} + +// Functional option pattern helpers +func WithProvider(p Provider) func(*Options) { + return func(o *Options) { o.Provider = p } +} + +func WithMode(m ProcessingMode) func(*Options) { + return func(o *Options) { o.Mode = m } +} + +func WithWorkerCount(count int) func(*Options) { + return func(o *Options) { o.WorkerCount = count } +} + +func WithBufferSize(size int) func(*Options) { + return func(o *Options) { o.BufferSize = size } +} + +func WithRetryPolicy(policy *RetryPolicy) func(*Options) { + return func(o *Options) { o.RetryPolicy = policy } +} + +func WithInstanceID(id string) func(*Options) { + return func(o *Options) { o.InstanceID = id } +} + +// Start starts the broker +func (b *EventBroker) Start(ctx context.Context) error { + if b.isRunning.Load() { + return fmt.Errorf("broker already running") + } + + b.isRunning.Store(true) + + // Start worker pool for async mode + if b.mode == ProcessingModeAsync && b.workerPool != nil { + b.workerPool.Start() + } + + logger.Info("Event broker started (mode: %s, instance: %s)", b.mode, b.instanceID) + return nil +} + +// Stop stops the broker gracefully +func (b *EventBroker) Stop(ctx context.Context) error { + var stopErr error + + b.stopOnce.Do(func() { + logger.Info("Stopping event broker...") + + // Mark as not running + b.isRunning.Store(false) + + // Close the stop channel + close(b.stopCh) + + // Stop worker pool for async mode + if b.mode == ProcessingModeAsync && b.workerPool != nil { + if err := b.workerPool.Stop(ctx); err != nil { + logger.Error("Error stopping worker pool: %v", err) + stopErr = err + } + } + + // Wait for all goroutines + b.wg.Wait() + + // Close provider + if err := b.provider.Close(); err != nil { + logger.Error("Error closing provider: %v", err) + if stopErr == nil { + stopErr = err + } + } + + logger.Info("Event broker stopped") + }) + + return stopErr +} + +// Publish publishes an event based on the broker's mode +func (b *EventBroker) Publish(ctx context.Context, event *Event) error { + if b.mode == ProcessingModeSync { + return b.PublishSync(ctx, event) + } + return b.PublishAsync(ctx, event) +} + +// PublishSync publishes an event synchronously +func (b *EventBroker) PublishSync(ctx context.Context, event *Event) error { + if !b.isRunning.Load() { + return fmt.Errorf("broker is not running") + } + + // Validate event + if err := event.Validate(); err != nil { + return fmt.Errorf("invalid event: %w", err) + } + + // Store event in provider + if err := b.provider.Publish(ctx, event); err != nil { + return fmt.Errorf("failed to publish event: %w", err) + } + + b.statsPublished.Add(1) + + // Record metrics + recordEventPublished(event) + + // Process event synchronously + if err := b.processEvent(ctx, event); err != nil { + logger.Error("Failed to process event %s: %v", event.ID, err) + b.statsFailed.Add(1) + return err + } + + b.statsProcessed.Add(1) + return nil +} + +// PublishAsync publishes an event asynchronously +func (b *EventBroker) PublishAsync(ctx context.Context, event *Event) error { + if !b.isRunning.Load() { + return fmt.Errorf("broker is not running") + } + + // Validate event + if err := event.Validate(); err != nil { + return fmt.Errorf("invalid event: %w", err) + } + + // Store event in provider + if err := b.provider.Publish(ctx, event); err != nil { + return fmt.Errorf("failed to publish event: %w", err) + } + + b.statsPublished.Add(1) + + // Record metrics + recordEventPublished(event) + + // Queue for async processing + if b.mode == ProcessingModeAsync && b.workerPool != nil { + // Update queue size metrics + updateQueueSize(int64(b.workerPool.QueueSize())) + return b.workerPool.Submit(ctx, event) + } + + // Fallback to sync if async not configured + return b.processEvent(ctx, event) +} + +// Subscribe adds a subscription for events matching the pattern +func (b *EventBroker) Subscribe(pattern string, handler EventHandler) (SubscriptionID, error) { + return b.subscriptions.Subscribe(pattern, handler) +} + +// Unsubscribe removes a subscription +func (b *EventBroker) Unsubscribe(id SubscriptionID) error { + return b.subscriptions.Unsubscribe(id) +} + +// processEvent processes an event by calling all matching handlers +func (b *EventBroker) processEvent(ctx context.Context, event *Event) error { + startTime := time.Now() + + // Get all handlers matching this event type + handlers := b.subscriptions.GetMatching(event.Type) + + if len(handlers) == 0 { + logger.Debug("No handlers for event type: %s", event.Type) + return nil + } + + logger.Debug("Processing event %s with %d handler(s)", event.ID, len(handlers)) + + // Mark event as processing + event.MarkProcessing() + if err := b.provider.UpdateStatus(ctx, event.ID, EventStatusProcessing, ""); err != nil { + logger.Warn("Failed to update event status: %v", err) + } + + // Execute all handlers + var lastErr error + for i, handler := range handlers { + if err := b.executeHandlerWithRetry(ctx, handler, event); err != nil { + logger.Error("Handler %d failed for event %s: %v", i+1, event.ID, err) + lastErr = err + // Continue processing other handlers + } + } + + // Update final status + if lastErr != nil { + event.MarkFailed(lastErr) + if err := b.provider.UpdateStatus(ctx, event.ID, EventStatusFailed, lastErr.Error()); err != nil { + logger.Warn("Failed to update event status: %v", err) + } + + // Record metrics + recordEventProcessed(event, time.Since(startTime)) + + return lastErr + } + + event.MarkCompleted() + if err := b.provider.UpdateStatus(ctx, event.ID, EventStatusCompleted, ""); err != nil { + logger.Warn("Failed to update event status: %v", err) + } + + // Record metrics + recordEventProcessed(event, time.Since(startTime)) + + return nil +} + +// executeHandlerWithRetry executes a handler with retry logic +func (b *EventBroker) executeHandlerWithRetry(ctx context.Context, handler EventHandler, event *Event) error { + var lastErr error + + for attempt := 0; attempt <= b.retryPolicy.MaxRetries; attempt++ { + if attempt > 0 { + // Calculate backoff delay + delay := b.calculateBackoff(attempt) + logger.Debug("Retrying event %s (attempt %d/%d) after %v", + event.ID, attempt, b.retryPolicy.MaxRetries, delay) + + select { + case <-time.After(delay): + case <-ctx.Done(): + return ctx.Err() + } + + event.IncrementRetry() + } + + // Execute handler + if err := handler.Handle(ctx, event); err != nil { + lastErr = err + logger.Warn("Handler failed for event %s (attempt %d): %v", event.ID, attempt+1, err) + continue + } + + // Success + return nil + } + + return fmt.Errorf("handler failed after %d attempts: %w", b.retryPolicy.MaxRetries+1, lastErr) +} + +// calculateBackoff calculates the backoff delay for a retry attempt +func (b *EventBroker) calculateBackoff(attempt int) time.Duration { + delay := float64(b.retryPolicy.InitialDelay) * pow(b.retryPolicy.BackoffFactor, float64(attempt-1)) + if delay > float64(b.retryPolicy.MaxDelay) { + delay = float64(b.retryPolicy.MaxDelay) + } + return time.Duration(delay) +} + +// pow is a simple integer power function +func pow(base float64, exp float64) float64 { + result := 1.0 + for i := 0.0; i < exp; i++ { + result *= base + } + return result +} + +// Stats returns broker statistics +func (b *EventBroker) Stats(ctx context.Context) (*BrokerStats, error) { + providerStats, err := b.provider.Stats(ctx) + if err != nil { + logger.Warn("Failed to get provider stats: %v", err) + } + + stats := &BrokerStats{ + InstanceID: b.instanceID, + Mode: b.mode, + IsRunning: b.isRunning.Load(), + TotalPublished: b.statsPublished.Load(), + TotalProcessed: b.statsProcessed.Load(), + TotalFailed: b.statsFailed.Load(), + ActiveSubscribers: b.subscriptions.Count(), + ProviderStats: providerStats, + } + + // Add async-specific stats + if b.mode == ProcessingModeAsync && b.workerPool != nil { + stats.QueueSize = b.workerPool.QueueSize() + stats.ActiveWorkers = b.workerPool.ActiveWorkers() + } + + return stats, nil +} + +// InstanceID returns the instance ID +func (b *EventBroker) InstanceID() string { + return b.instanceID +} diff --git a/pkg/eventbroker/broker_test.go b/pkg/eventbroker/broker_test.go new file mode 100644 index 0000000..9217ac8 --- /dev/null +++ b/pkg/eventbroker/broker_test.go @@ -0,0 +1,524 @@ +package eventbroker + +import ( + "context" + "errors" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestNewBroker(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + MaxEvents: 1000, + }) + + tests := []struct { + name string + opts Options + wantError bool + }{ + { + name: "valid options", + opts: Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeSync, + }, + wantError: false, + }, + { + name: "missing provider", + opts: Options{ + InstanceID: "test-instance", + }, + wantError: true, + }, + { + name: "missing instance ID", + opts: Options{ + Provider: provider, + }, + wantError: true, + }, + { + name: "async mode with defaults", + opts: Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeAsync, + }, + wantError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + broker, err := NewBroker(tt.opts) + if (err != nil) != tt.wantError { + t.Errorf("NewBroker() error = %v, wantError %v", err, tt.wantError) + } + if err == nil && broker == nil { + t.Error("Expected non-nil broker") + } + }) + } +} + +func TestBrokerStartStop(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, err := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeSync, + }) + if err != nil { + t.Fatalf("Failed to create broker: %v", err) + } + + // Test Start + if err := broker.Start(context.Background()); err != nil { + t.Fatalf("Failed to start broker: %v", err) + } + + // Test double start (should fail) + if err := broker.Start(context.Background()); err == nil { + t.Error("Expected error on double start") + } + + // Test Stop + if err := broker.Stop(context.Background()); err != nil { + t.Fatalf("Failed to stop broker: %v", err) + } + + // Test double stop (should not fail) + if err := broker.Stop(context.Background()); err != nil { + t.Error("Double stop should not fail") + } +} + +func TestBrokerPublishSync(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeSync, + }) + broker.Start(context.Background()) + defer broker.Stop(context.Background()) + + // Subscribe to events + called := false + var receivedEvent *Event + broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + called = true + receivedEvent = event + return nil + })) + + // Publish event + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + err := broker.PublishSync(context.Background(), event) + if err != nil { + t.Fatalf("PublishSync failed: %v", err) + } + + // Verify handler was called + if !called { + t.Error("Expected handler to be called") + } + if receivedEvent == nil || receivedEvent.ID != event.ID { + t.Error("Expected to receive the published event") + } + + // Verify event status + if event.Status != EventStatusCompleted { + t.Errorf("Expected status %s, got %s", EventStatusCompleted, event.Status) + } +} + +func TestBrokerPublishAsync(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeAsync, + WorkerCount: 2, + BufferSize: 10, + }) + broker.Start(context.Background()) + defer broker.Stop(context.Background()) + + // Subscribe to events + var callCount atomic.Int32 + broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + callCount.Add(1) + return nil + })) + + // Publish multiple events + for i := 0; i < 5; i++ { + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + if err := broker.PublishAsync(context.Background(), event); err != nil { + t.Fatalf("PublishAsync failed: %v", err) + } + } + + // Wait for events to be processed + time.Sleep(100 * time.Millisecond) + + if callCount.Load() != 5 { + t.Errorf("Expected 5 handler calls, got %d", callCount.Load()) + } +} + +func TestBrokerPublishBeforeStart(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + }) + + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + err := broker.Publish(context.Background(), event) + if err == nil { + t.Error("Expected error when publishing before start") + } +} + +func TestBrokerHandlerError(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeSync, + RetryPolicy: &RetryPolicy{ + MaxRetries: 2, + InitialDelay: 10 * time.Millisecond, + MaxDelay: 100 * time.Millisecond, + BackoffFactor: 2.0, + }, + }) + broker.Start(context.Background()) + defer broker.Stop(context.Background()) + + // Subscribe with failing handler + var callCount atomic.Int32 + broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + callCount.Add(1) + return errors.New("handler error") + })) + + // Publish event + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + err := broker.PublishSync(context.Background(), event) + + // Should fail after retries + if err == nil { + t.Error("Expected error from handler") + } + + // Should have been called MaxRetries+1 times (initial + retries) + if callCount.Load() != 3 { + t.Errorf("Expected 3 calls (1 initial + 2 retries), got %d", callCount.Load()) + } + + // Event should be marked as failed + if event.Status != EventStatusFailed { + t.Errorf("Expected status %s, got %s", EventStatusFailed, event.Status) + } +} + +func TestBrokerMultipleHandlers(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeSync, + }) + broker.Start(context.Background()) + defer broker.Stop(context.Background()) + + // Subscribe multiple handlers + var called1, called2, called3 bool + broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + called1 = true + return nil + })) + broker.Subscribe("test.event", EventHandlerFunc(func(ctx context.Context, event *Event) error { + called2 = true + return nil + })) + broker.Subscribe("*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + called3 = true + return nil + })) + + // Publish event + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + broker.PublishSync(context.Background(), event) + + // All handlers should be called + if !called1 || !called2 || !called3 { + t.Error("Expected all handlers to be called") + } +} + +func TestBrokerUnsubscribe(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeSync, + }) + broker.Start(context.Background()) + defer broker.Stop(context.Background()) + + // Subscribe + called := false + id, _ := broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + called = true + return nil + })) + + // Unsubscribe + if err := broker.Unsubscribe(id); err != nil { + t.Fatalf("Unsubscribe failed: %v", err) + } + + // Publish event + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + broker.PublishSync(context.Background(), event) + + // Handler should not be called + if called { + t.Error("Expected handler not to be called after unsubscribe") + } +} + +func TestBrokerStats(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeSync, + }) + broker.Start(context.Background()) + defer broker.Stop(context.Background()) + + // Subscribe + broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + return nil + })) + + // Publish events + for i := 0; i < 3; i++ { + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + broker.PublishSync(context.Background(), event) + } + + // Get stats + stats, err := broker.Stats(context.Background()) + if err != nil { + t.Fatalf("Stats failed: %v", err) + } + + if stats.InstanceID != "test-instance" { + t.Errorf("Expected instance ID 'test-instance', got %s", stats.InstanceID) + } + if stats.TotalPublished != 3 { + t.Errorf("Expected 3 published events, got %d", stats.TotalPublished) + } + if stats.TotalProcessed != 3 { + t.Errorf("Expected 3 processed events, got %d", stats.TotalProcessed) + } + if stats.ActiveSubscribers != 1 { + t.Errorf("Expected 1 active subscriber, got %d", stats.ActiveSubscribers) + } +} + +func TestBrokerInstanceID(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "my-instance", + }) + + if broker.InstanceID() != "my-instance" { + t.Errorf("Expected instance ID 'my-instance', got %s", broker.InstanceID()) + } +} + +func TestBrokerConcurrentPublish(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeAsync, + WorkerCount: 5, + BufferSize: 100, + }) + broker.Start(context.Background()) + defer broker.Stop(context.Background()) + + var callCount atomic.Int32 + broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + callCount.Add(1) + return nil + })) + + // Publish concurrently + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + broker.PublishAsync(context.Background(), event) + }() + } + + wg.Wait() + time.Sleep(200 * time.Millisecond) // Wait for async processing + + if callCount.Load() != 50 { + t.Errorf("Expected 50 handler calls, got %d", callCount.Load()) + } +} + +func TestBrokerGracefulShutdown(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: ProcessingModeAsync, + WorkerCount: 2, + BufferSize: 10, + }) + broker.Start(context.Background()) + + var processedCount atomic.Int32 + broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + time.Sleep(50 * time.Millisecond) // Simulate work + processedCount.Add(1) + return nil + })) + + // Publish events + for i := 0; i < 5; i++ { + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + broker.PublishAsync(context.Background(), event) + } + + // Stop broker (should wait for events to be processed) + if err := broker.Stop(context.Background()); err != nil { + t.Fatalf("Stop failed: %v", err) + } + + // All events should be processed + if processedCount.Load() != 5 { + t.Errorf("Expected 5 processed events, got %d", processedCount.Load()) + } +} + +func TestBrokerDefaultRetryPolicy(t *testing.T) { + policy := DefaultRetryPolicy() + + if policy.MaxRetries != 3 { + t.Errorf("Expected MaxRetries 3, got %d", policy.MaxRetries) + } + if policy.InitialDelay != 1*time.Second { + t.Errorf("Expected InitialDelay 1s, got %v", policy.InitialDelay) + } + if policy.BackoffFactor != 2.0 { + t.Errorf("Expected BackoffFactor 2.0, got %f", policy.BackoffFactor) + } +} + +func TestBrokerProcessingModes(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + tests := []struct { + name string + mode ProcessingMode + }{ + {"sync mode", ProcessingModeSync}, + {"async mode", ProcessingModeAsync}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + broker, _ := NewBroker(Options{ + Provider: provider, + InstanceID: "test-instance", + Mode: tt.mode, + }) + broker.Start(context.Background()) + defer broker.Stop(context.Background()) + + called := false + broker.Subscribe("test.*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + called = true + return nil + })) + + event := NewEvent(EventSourceSystem, "test.event") + event.InstanceID = "test-instance" + broker.Publish(context.Background(), event) + + if tt.mode == ProcessingModeAsync { + time.Sleep(50 * time.Millisecond) + } + + if !called { + t.Error("Expected handler to be called") + } + }) + } +} diff --git a/pkg/eventbroker/event.go b/pkg/eventbroker/event.go new file mode 100644 index 0000000..4cdb927 --- /dev/null +++ b/pkg/eventbroker/event.go @@ -0,0 +1,175 @@ +package eventbroker + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" +) + +// EventSource represents where an event originated from +type EventSource string + +const ( + EventSourceDatabase EventSource = "database" + EventSourceWebSocket EventSource = "websocket" + EventSourceFrontend EventSource = "frontend" + EventSourceSystem EventSource = "system" + EventSourceInternal EventSource = "internal" +) + +// EventStatus represents the current state of an event +type EventStatus string + +const ( + EventStatusPending EventStatus = "pending" + EventStatusProcessing EventStatus = "processing" + EventStatusCompleted EventStatus = "completed" + EventStatusFailed EventStatus = "failed" +) + +// Event represents a single event in the system with complete metadata +type Event struct { + // Identification + ID string `json:"id" db:"id"` + + // Source & Classification + Source EventSource `json:"source" db:"source"` + Type string `json:"type" db:"type"` // Pattern: schema.entity.operation + + // Status Tracking + Status EventStatus `json:"status" db:"status"` + RetryCount int `json:"retry_count" db:"retry_count"` + Error string `json:"error,omitempty" db:"error"` + + // Payload + Payload json.RawMessage `json:"payload" db:"payload"` + + // Context Information + UserID int `json:"user_id" db:"user_id"` + SessionID string `json:"session_id" db:"session_id"` + InstanceID string `json:"instance_id" db:"instance_id"` + + // Database Context + Schema string `json:"schema" db:"schema"` + Entity string `json:"entity" db:"entity"` + Operation string `json:"operation" db:"operation"` // create, update, delete, read + + // Timestamps + CreatedAt time.Time `json:"created_at" db:"created_at"` + ProcessedAt *time.Time `json:"processed_at,omitempty" db:"processed_at"` + CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"` + + // Extensibility + Metadata map[string]interface{} `json:"metadata" db:"metadata"` +} + +// NewEvent creates a new event with defaults +func NewEvent(source EventSource, eventType string) *Event { + return &Event{ + ID: uuid.New().String(), + Source: source, + Type: eventType, + Status: EventStatusPending, + CreatedAt: time.Now(), + Metadata: make(map[string]interface{}), + RetryCount: 0, + } +} + +// EventType generates a type string from schema, entity, and operation +// Pattern: schema.entity.operation (e.g., "public.users.create") +func EventType(schema, entity, operation string) string { + return fmt.Sprintf("%s.%s.%s", schema, entity, operation) +} + +// MarkProcessing marks the event as being processed +func (e *Event) MarkProcessing() { + e.Status = EventStatusProcessing + now := time.Now() + e.ProcessedAt = &now +} + +// MarkCompleted marks the event as successfully completed +func (e *Event) MarkCompleted() { + e.Status = EventStatusCompleted + now := time.Now() + e.CompletedAt = &now +} + +// MarkFailed marks the event as failed with an error message +func (e *Event) MarkFailed(err error) { + e.Status = EventStatusFailed + e.Error = err.Error() + now := time.Now() + e.CompletedAt = &now +} + +// IncrementRetry increments the retry counter +func (e *Event) IncrementRetry() { + e.RetryCount++ +} + +// SetPayload sets the event payload from any value by marshaling to JSON +func (e *Event) SetPayload(v interface{}) error { + data, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("failed to marshal payload: %w", err) + } + e.Payload = data + return nil +} + +// GetPayload unmarshals the payload into the provided value +func (e *Event) GetPayload(v interface{}) error { + if len(e.Payload) == 0 { + return fmt.Errorf("payload is empty") + } + if err := json.Unmarshal(e.Payload, v); err != nil { + return fmt.Errorf("failed to unmarshal payload: %w", err) + } + return nil +} + +// Clone creates a deep copy of the event +func (e *Event) Clone() *Event { + clone := *e + + // Deep copy metadata + if e.Metadata != nil { + clone.Metadata = make(map[string]interface{}) + for k, v := range e.Metadata { + clone.Metadata[k] = v + } + } + + // Deep copy timestamps + if e.ProcessedAt != nil { + t := *e.ProcessedAt + clone.ProcessedAt = &t + } + if e.CompletedAt != nil { + t := *e.CompletedAt + clone.CompletedAt = &t + } + + return &clone +} + +// Validate performs basic validation on the event +func (e *Event) Validate() error { + if e.ID == "" { + return fmt.Errorf("event ID is required") + } + if e.Source == "" { + return fmt.Errorf("event source is required") + } + if e.Type == "" { + return fmt.Errorf("event type is required") + } + if e.InstanceID == "" { + return fmt.Errorf("instance ID is required") + } + return nil +} diff --git a/pkg/eventbroker/event_test.go b/pkg/eventbroker/event_test.go new file mode 100644 index 0000000..eeb761f --- /dev/null +++ b/pkg/eventbroker/event_test.go @@ -0,0 +1,314 @@ +package eventbroker + +import ( + "encoding/json" + "errors" + "testing" + "time" +) + +func TestNewEvent(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + + if event.ID == "" { + t.Error("Expected event ID to be generated") + } + if event.Source != EventSourceDatabase { + t.Errorf("Expected source %s, got %s", EventSourceDatabase, event.Source) + } + if event.Type != "public.users.create" { + t.Errorf("Expected type 'public.users.create', got %s", event.Type) + } + if event.Status != EventStatusPending { + t.Errorf("Expected status %s, got %s", EventStatusPending, event.Status) + } + if event.CreatedAt.IsZero() { + t.Error("Expected CreatedAt to be set") + } + if event.Metadata == nil { + t.Error("Expected Metadata to be initialized") + } +} + +func TestEventType(t *testing.T) { + tests := []struct { + schema string + entity string + operation string + expected string + }{ + {"public", "users", "create", "public.users.create"}, + {"admin", "roles", "update", "admin.roles.update"}, + {"", "system", "start", ".system.start"}, // Empty schema results in leading dot + } + + for _, tt := range tests { + result := EventType(tt.schema, tt.entity, tt.operation) + if result != tt.expected { + t.Errorf("EventType(%q, %q, %q) = %q, expected %q", + tt.schema, tt.entity, tt.operation, result, tt.expected) + } + } +} + +func TestEventValidate(t *testing.T) { + tests := []struct { + name string + event *Event + wantError bool + }{ + { + name: "valid event", + event: func() *Event { + e := NewEvent(EventSourceDatabase, "public.users.create") + e.InstanceID = "test-instance" + return e + }(), + wantError: false, + }, + { + name: "missing ID", + event: &Event{ + Source: EventSourceDatabase, + Type: "public.users.create", + Status: EventStatusPending, + }, + wantError: true, + }, + { + name: "missing source", + event: &Event{ + ID: "test-id", + Type: "public.users.create", + Status: EventStatusPending, + }, + wantError: true, + }, + { + name: "missing type", + event: &Event{ + ID: "test-id", + Source: EventSourceDatabase, + Status: EventStatusPending, + }, + wantError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.event.Validate() + if (err != nil) != tt.wantError { + t.Errorf("Event.Validate() error = %v, wantError %v", err, tt.wantError) + } + }) + } +} + +func TestEventSetPayload(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + + payload := map[string]interface{}{ + "id": 1, + "name": "John Doe", + "email": "john@example.com", + } + + err := event.SetPayload(payload) + if err != nil { + t.Fatalf("SetPayload failed: %v", err) + } + + if event.Payload == nil { + t.Fatal("Expected payload to be set") + } + + // Verify payload can be unmarshaled + var result map[string]interface{} + if err := json.Unmarshal(event.Payload, &result); err != nil { + t.Fatalf("Failed to unmarshal payload: %v", err) + } + + if result["name"] != "John Doe" { + t.Errorf("Expected name 'John Doe', got %v", result["name"]) + } +} + +func TestEventGetPayload(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + + payload := map[string]interface{}{ + "id": float64(1), // JSON unmarshals numbers as float64 + "name": "John Doe", + } + + if err := event.SetPayload(payload); err != nil { + t.Fatalf("SetPayload failed: %v", err) + } + + var result map[string]interface{} + if err := event.GetPayload(&result); err != nil { + t.Fatalf("GetPayload failed: %v", err) + } + + if result["name"] != "John Doe" { + t.Errorf("Expected name 'John Doe', got %v", result["name"]) + } +} + +func TestEventMarkProcessing(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + event.MarkProcessing() + + if event.Status != EventStatusProcessing { + t.Errorf("Expected status %s, got %s", EventStatusProcessing, event.Status) + } + if event.ProcessedAt == nil { + t.Error("Expected ProcessedAt to be set") + } +} + +func TestEventMarkCompleted(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + event.MarkCompleted() + + if event.Status != EventStatusCompleted { + t.Errorf("Expected status %s, got %s", EventStatusCompleted, event.Status) + } + if event.CompletedAt == nil { + t.Error("Expected CompletedAt to be set") + } +} + +func TestEventMarkFailed(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + testErr := errors.New("test error") + event.MarkFailed(testErr) + + if event.Status != EventStatusFailed { + t.Errorf("Expected status %s, got %s", EventStatusFailed, event.Status) + } + if event.Error != "test error" { + t.Errorf("Expected error %q, got %q", "test error", event.Error) + } + if event.CompletedAt == nil { + t.Error("Expected CompletedAt to be set") + } +} + +func TestEventIncrementRetry(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + + initialCount := event.RetryCount + event.IncrementRetry() + + if event.RetryCount != initialCount+1 { + t.Errorf("Expected retry count %d, got %d", initialCount+1, event.RetryCount) + } +} + +func TestEventJSONMarshaling(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + event.UserID = 123 + event.SessionID = "session-123" + event.InstanceID = "instance-1" + event.Schema = "public" + event.Entity = "users" + event.Operation = "create" + event.SetPayload(map[string]interface{}{"name": "Test"}) + + // Marshal to JSON + data, err := json.Marshal(event) + if err != nil { + t.Fatalf("Failed to marshal event: %v", err) + } + + // Unmarshal back + var decoded Event + if err := json.Unmarshal(data, &decoded); err != nil { + t.Fatalf("Failed to unmarshal event: %v", err) + } + + // Verify fields + if decoded.ID != event.ID { + t.Errorf("Expected ID %s, got %s", event.ID, decoded.ID) + } + if decoded.Source != event.Source { + t.Errorf("Expected source %s, got %s", event.Source, decoded.Source) + } + if decoded.UserID != event.UserID { + t.Errorf("Expected UserID %d, got %d", event.UserID, decoded.UserID) + } +} + +func TestEventStatusString(t *testing.T) { + statuses := []EventStatus{ + EventStatusPending, + EventStatusProcessing, + EventStatusCompleted, + EventStatusFailed, + } + + for _, status := range statuses { + if string(status) == "" { + t.Errorf("EventStatus %v has empty string representation", status) + } + } +} + +func TestEventSourceString(t *testing.T) { + sources := []EventSource{ + EventSourceDatabase, + EventSourceWebSocket, + EventSourceFrontend, + EventSourceSystem, + EventSourceInternal, + } + + for _, source := range sources { + if string(source) == "" { + t.Errorf("EventSource %v has empty string representation", source) + } + } +} + +func TestEventMetadata(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + + // Test setting metadata + event.Metadata["key1"] = "value1" + event.Metadata["key2"] = 123 + + if event.Metadata["key1"] != "value1" { + t.Errorf("Expected metadata key1 to be 'value1', got %v", event.Metadata["key1"]) + } + if event.Metadata["key2"] != 123 { + t.Errorf("Expected metadata key2 to be 123, got %v", event.Metadata["key2"]) + } +} + +func TestEventTimestamps(t *testing.T) { + event := NewEvent(EventSourceDatabase, "public.users.create") + createdAt := event.CreatedAt + + // Wait a tiny bit to ensure timestamps differ + time.Sleep(time.Millisecond) + + event.MarkProcessing() + if event.ProcessedAt == nil { + t.Fatal("ProcessedAt should be set") + } + if !event.ProcessedAt.After(createdAt) { + t.Error("ProcessedAt should be after CreatedAt") + } + + time.Sleep(time.Millisecond) + + event.MarkCompleted() + if event.CompletedAt == nil { + t.Fatal("CompletedAt should be set") + } + if !event.CompletedAt.After(*event.ProcessedAt) { + t.Error("CompletedAt should be after ProcessedAt") + } +} diff --git a/pkg/eventbroker/eventbroker.go b/pkg/eventbroker/eventbroker.go new file mode 100644 index 0000000..d7e9ee2 --- /dev/null +++ b/pkg/eventbroker/eventbroker.go @@ -0,0 +1,160 @@ +package eventbroker + +import ( + "context" + "fmt" + "sync" + + "github.com/bitechdev/ResolveSpec/pkg/config" + "github.com/bitechdev/ResolveSpec/pkg/logger" + "github.com/bitechdev/ResolveSpec/pkg/server" +) + +var ( + defaultBroker Broker + brokerMu sync.RWMutex +) + +// Initialize initializes the global event broker from configuration +func Initialize(cfg config.EventBrokerConfig) error { + if !cfg.Enabled { + logger.Info("Event broker is disabled") + return nil + } + + // Create provider + provider, err := NewProviderFromConfig(cfg) + if err != nil { + return fmt.Errorf("failed to create provider: %w", err) + } + + // Parse mode + mode := ProcessingModeAsync + if cfg.Mode == "sync" { + mode = ProcessingModeSync + } + + // Convert retry policy + retryPolicy := &RetryPolicy{ + MaxRetries: cfg.RetryPolicy.MaxRetries, + InitialDelay: cfg.RetryPolicy.InitialDelay, + MaxDelay: cfg.RetryPolicy.MaxDelay, + BackoffFactor: cfg.RetryPolicy.BackoffFactor, + } + if retryPolicy.MaxRetries == 0 { + retryPolicy = DefaultRetryPolicy() + } + + // Create broker options + opts := Options{ + Provider: provider, + Mode: mode, + WorkerCount: cfg.WorkerCount, + BufferSize: cfg.BufferSize, + RetryPolicy: retryPolicy, + InstanceID: getInstanceID(cfg.InstanceID), + } + + // Create broker + broker, err := NewBroker(opts) + if err != nil { + return fmt.Errorf("failed to create broker: %w", err) + } + + // Start broker + if err := broker.Start(context.Background()); err != nil { + return fmt.Errorf("failed to start broker: %w", err) + } + + // Set as default + SetDefaultBroker(broker) + + // Register shutdown callback + RegisterShutdown(broker) + + logger.Info("Event broker initialized successfully (provider: %s, mode: %s, instance: %s)", + cfg.Provider, cfg.Mode, opts.InstanceID) + + return nil +} + +// SetDefaultBroker sets the default global broker +func SetDefaultBroker(broker Broker) { + brokerMu.Lock() + defer brokerMu.Unlock() + defaultBroker = broker +} + +// GetDefaultBroker returns the default global broker +func GetDefaultBroker() Broker { + brokerMu.RLock() + defer brokerMu.RUnlock() + return defaultBroker +} + +// IsInitialized returns true if the default broker is initialized +func IsInitialized() bool { + return GetDefaultBroker() != nil +} + +// Publish publishes an event using the default broker +func Publish(ctx context.Context, event *Event) error { + broker := GetDefaultBroker() + if broker == nil { + return fmt.Errorf("event broker not initialized") + } + return broker.Publish(ctx, event) +} + +// PublishSync publishes an event synchronously using the default broker +func PublishSync(ctx context.Context, event *Event) error { + broker := GetDefaultBroker() + if broker == nil { + return fmt.Errorf("event broker not initialized") + } + return broker.PublishSync(ctx, event) +} + +// PublishAsync publishes an event asynchronously using the default broker +func PublishAsync(ctx context.Context, event *Event) error { + broker := GetDefaultBroker() + if broker == nil { + return fmt.Errorf("event broker not initialized") + } + return broker.PublishAsync(ctx, event) +} + +// Subscribe subscribes to events using the default broker +func Subscribe(pattern string, handler EventHandler) (SubscriptionID, error) { + broker := GetDefaultBroker() + if broker == nil { + return "", fmt.Errorf("event broker not initialized") + } + return broker.Subscribe(pattern, handler) +} + +// Unsubscribe unsubscribes from events using the default broker +func Unsubscribe(id SubscriptionID) error { + broker := GetDefaultBroker() + if broker == nil { + return fmt.Errorf("event broker not initialized") + } + return broker.Unsubscribe(id) +} + +// Stats returns statistics from the default broker +func Stats(ctx context.Context) (*BrokerStats, error) { + broker := GetDefaultBroker() + if broker == nil { + return nil, fmt.Errorf("event broker not initialized") + } + return broker.Stats(ctx) +} + +// RegisterShutdown registers the broker's shutdown with the server shutdown callbacks +func RegisterShutdown(broker Broker) { + server.RegisterShutdownCallback(func(ctx context.Context) error { + logger.Info("Shutting down event broker...") + return broker.Stop(ctx) + }) +} diff --git a/pkg/eventbroker/example_usage.go b/pkg/eventbroker/example_usage.go new file mode 100644 index 0000000..7f4ace9 --- /dev/null +++ b/pkg/eventbroker/example_usage.go @@ -0,0 +1,266 @@ +// nolint +package eventbroker + +import ( + "context" + "fmt" + "time" + + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// Example demonstrates basic usage of the event broker +func Example() { + // 1. Create a memory provider + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "example-instance", + MaxEvents: 1000, + CleanupInterval: 5 * time.Minute, + MaxAge: 1 * time.Hour, + }) + + // 2. Create a broker + broker, err := NewBroker(Options{ + Provider: provider, + Mode: ProcessingModeAsync, + WorkerCount: 5, + BufferSize: 100, + RetryPolicy: DefaultRetryPolicy(), + InstanceID: "example-instance", + }) + if err != nil { + logger.Error("Failed to create broker: %v", err) + return + } + + // 3. Start the broker + if err := broker.Start(context.Background()); err != nil { + logger.Error("Failed to start broker: %v", err) + return + } + defer func() { + err := broker.Stop(context.Background()) + if err != nil { + logger.Error("Failed to stop broker: %v", err) + } + }() + + // 4. Subscribe to events + broker.Subscribe("public.users.*", EventHandlerFunc( + func(ctx context.Context, event *Event) error { + logger.Info("User event: %s (operation: %s)", event.Type, event.Operation) + return nil + }, + )) + + broker.Subscribe("*.*.create", EventHandlerFunc( + func(ctx context.Context, event *Event) error { + logger.Info("Create event: %s.%s", event.Schema, event.Entity) + return nil + }, + )) + + // 5. Publish events + ctx := context.Background() + + // Database event + dbEvent := NewEvent(EventSourceDatabase, EventType("public", "users", "create")) + dbEvent.InstanceID = "example-instance" + dbEvent.UserID = 123 + dbEvent.SessionID = "session-456" + dbEvent.Schema = "public" + dbEvent.Entity = "users" + dbEvent.Operation = "create" + dbEvent.SetPayload(map[string]interface{}{ + "id": 123, + "name": "John Doe", + "email": "john@example.com", + }) + + if err := broker.PublishAsync(ctx, dbEvent); err != nil { + logger.Error("Failed to publish event: %v", err) + } + + // WebSocket event + wsEvent := NewEvent(EventSourceWebSocket, "chat.message") + wsEvent.InstanceID = "example-instance" + wsEvent.UserID = 123 + wsEvent.SessionID = "session-456" + wsEvent.SetPayload(map[string]interface{}{ + "room": "general", + "message": "Hello, World!", + }) + + if err := broker.PublishAsync(ctx, wsEvent); err != nil { + logger.Error("Failed to publish event: %v", err) + } + + // 6. Get statistics + time.Sleep(1 * time.Second) // Wait for processing + stats, _ := broker.Stats(ctx) + logger.Info("Broker stats: %d published, %d processed", stats.TotalPublished, stats.TotalProcessed) +} + +// ExampleWithHooks demonstrates integration with the hook system +func ExampleWithHooks() { + // This would typically be called in your main.go or initialization code + // after setting up your restheadspec.Handler + + // Pseudo-code (actual implementation would use real handler): + /* + broker := eventbroker.GetDefaultBroker() + hookRegistry := handler.Hooks() + + // Register CRUD hooks + config := eventbroker.DefaultCRUDHookConfig() + config.EnableRead = false // Disable read events for performance + + if err := eventbroker.RegisterCRUDHooks(broker, hookRegistry, config); err != nil { + logger.Error("Failed to register CRUD hooks: %v", err) + } + + // Now all CRUD operations will automatically publish events + */ +} + +// ExampleSubscriptionPatterns demonstrates different subscription patterns +func ExampleSubscriptionPatterns() { + broker := GetDefaultBroker() + if broker == nil { + return + } + + // Pattern 1: Subscribe to all events from a specific entity + broker.Subscribe("public.users.*", EventHandlerFunc( + func(ctx context.Context, event *Event) error { + fmt.Printf("User event: %s\n", event.Operation) + return nil + }, + )) + + // Pattern 2: Subscribe to a specific operation across all entities + broker.Subscribe("*.*.create", EventHandlerFunc( + func(ctx context.Context, event *Event) error { + fmt.Printf("Create event: %s.%s\n", event.Schema, event.Entity) + return nil + }, + )) + + // Pattern 3: Subscribe to all events in a schema + broker.Subscribe("public.*.*", EventHandlerFunc( + func(ctx context.Context, event *Event) error { + fmt.Printf("Public schema event: %s.%s\n", event.Entity, event.Operation) + return nil + }, + )) + + // Pattern 4: Subscribe to everything (use with caution) + broker.Subscribe("*", EventHandlerFunc( + func(ctx context.Context, event *Event) error { + fmt.Printf("Any event: %s\n", event.Type) + return nil + }, + )) +} + +// ExampleErrorHandling demonstrates error handling in event handlers +func ExampleErrorHandling() { + broker := GetDefaultBroker() + if broker == nil { + return + } + + // Handler that may fail + broker.Subscribe("public.users.create", EventHandlerFunc( + func(ctx context.Context, event *Event) error { + // Simulate processing + var user struct { + ID int `json:"id"` + Email string `json:"email"` + } + + if err := event.GetPayload(&user); err != nil { + return fmt.Errorf("invalid payload: %w", err) + } + + // Validate + if user.Email == "" { + return fmt.Errorf("email is required") + } + + // Process (e.g., send email) + logger.Info("Sending welcome email to %s", user.Email) + + return nil + }, + )) +} + +// ExampleConfiguration demonstrates initializing from configuration +func ExampleConfiguration() { + // This would typically be in your main.go + + // Pseudo-code: + /* + // Load configuration + cfgMgr := config.NewManager() + if err := cfgMgr.Load(); err != nil { + logger.Fatal("Failed to load config: %v", err) + } + + cfg, err := cfgMgr.GetConfig() + if err != nil { + logger.Fatal("Failed to get config: %v", err) + } + + // Initialize event broker + if err := eventbroker.Initialize(cfg.EventBroker); err != nil { + logger.Fatal("Failed to initialize event broker: %v", err) + } + + // Use the default broker + eventbroker.Subscribe("*.*.create", eventbroker.EventHandlerFunc( + func(ctx context.Context, event *eventbroker.Event) error { + logger.Info("Created: %s.%s", event.Schema, event.Entity) + return nil + }, + )) + */ +} + +// ExampleYAMLConfiguration shows example YAML configuration +const ExampleYAMLConfiguration = ` +event_broker: + enabled: true + provider: memory # memory, redis, nats, database + mode: async # sync, async + worker_count: 10 + buffer_size: 1000 + instance_id: "${HOSTNAME}" + + # Memory provider is default, no additional config needed + + # Redis provider (when provider: redis) + redis: + stream_name: "resolvespec:events" + consumer_group: "resolvespec-workers" + host: "localhost" + port: 6379 + + # NATS provider (when provider: nats) + nats: + url: "nats://localhost:4222" + stream_name: "RESOLVESPEC_EVENTS" + + # Database provider (when provider: database) + database: + table_name: "events" + channel: "resolvespec_events" + + # Retry policy + retry_policy: + max_retries: 3 + initial_delay: 1s + max_delay: 30s + backoff_factor: 2.0 +` diff --git a/pkg/eventbroker/factory.go b/pkg/eventbroker/factory.go new file mode 100644 index 0000000..df35219 --- /dev/null +++ b/pkg/eventbroker/factory.go @@ -0,0 +1,56 @@ +package eventbroker + +import ( + "fmt" + "os" + "time" + + "github.com/bitechdev/ResolveSpec/pkg/config" +) + +// NewProviderFromConfig creates a provider based on configuration +func NewProviderFromConfig(cfg config.EventBrokerConfig) (Provider, error) { + switch cfg.Provider { + case "memory": + cleanupInterval := 5 * time.Minute + if cfg.Database.PollInterval > 0 { + cleanupInterval = cfg.Database.PollInterval + } + + return NewMemoryProvider(MemoryProviderOptions{ + InstanceID: getInstanceID(cfg.InstanceID), + MaxEvents: 10000, + CleanupInterval: cleanupInterval, + }), nil + + case "redis": + // Redis provider will be implemented in Phase 8 + return nil, fmt.Errorf("redis provider not yet implemented") + + case "nats": + // NATS provider will be implemented in Phase 9 + return nil, fmt.Errorf("nats provider not yet implemented") + + case "database": + // Database provider will be implemented in Phase 7 + return nil, fmt.Errorf("database provider not yet implemented") + + default: + return nil, fmt.Errorf("unknown provider: %s", cfg.Provider) + } +} + +// getInstanceID returns the instance ID, defaulting to hostname if not specified +func getInstanceID(configID string) string { + if configID != "" { + return configID + } + + // Try to get hostname + if hostname, err := os.Hostname(); err == nil { + return hostname + } + + // Fallback to a default + return "resolvespec-instance" +} diff --git a/pkg/eventbroker/handler.go b/pkg/eventbroker/handler.go new file mode 100644 index 0000000..3520046 --- /dev/null +++ b/pkg/eventbroker/handler.go @@ -0,0 +1,17 @@ +package eventbroker + +import "context" + +// EventHandler processes an event +type EventHandler interface { + Handle(ctx context.Context, event *Event) error +} + +// EventHandlerFunc is a function adapter for EventHandler +// This allows using regular functions as event handlers +type EventHandlerFunc func(ctx context.Context, event *Event) error + +// Handle implements EventHandler +func (f EventHandlerFunc) Handle(ctx context.Context, event *Event) error { + return f(ctx, event) +} diff --git a/pkg/eventbroker/hooks.go b/pkg/eventbroker/hooks.go new file mode 100644 index 0000000..232835e --- /dev/null +++ b/pkg/eventbroker/hooks.go @@ -0,0 +1,137 @@ +package eventbroker + +import ( + "encoding/json" + "fmt" + + "github.com/bitechdev/ResolveSpec/pkg/logger" + "github.com/bitechdev/ResolveSpec/pkg/restheadspec" + "github.com/bitechdev/ResolveSpec/pkg/security" +) + +// CRUDHookConfig configures which CRUD operations should trigger events +type CRUDHookConfig struct { + EnableCreate bool + EnableRead bool + EnableUpdate bool + EnableDelete bool +} + +// DefaultCRUDHookConfig returns default configuration (all enabled) +func DefaultCRUDHookConfig() *CRUDHookConfig { + return &CRUDHookConfig{ + EnableCreate: true, + EnableRead: false, // Typically disabled for performance + EnableUpdate: true, + EnableDelete: true, + } +} + +// RegisterCRUDHooks registers event hooks for CRUD operations +// This integrates with the restheadspec.HookRegistry to automatically +// capture database events +func RegisterCRUDHooks(broker Broker, hookRegistry *restheadspec.HookRegistry, config *CRUDHookConfig) error { + if broker == nil { + return fmt.Errorf("broker cannot be nil") + } + if hookRegistry == nil { + return fmt.Errorf("hookRegistry cannot be nil") + } + if config == nil { + config = DefaultCRUDHookConfig() + } + + // Create hook handler factory + createHookHandler := func(operation string) restheadspec.HookFunc { + return func(hookCtx *restheadspec.HookContext) error { + // Get user context from Go context + userCtx, ok := security.GetUserContext(hookCtx.Context) + if !ok || userCtx == nil { + logger.Debug("No user context found in hook") + userCtx = &security.UserContext{} // Empty user context + } + + // Create event + event := NewEvent(EventSourceDatabase, EventType(hookCtx.Schema, hookCtx.Entity, operation)) + event.InstanceID = broker.InstanceID() + event.UserID = userCtx.UserID + event.SessionID = userCtx.SessionID + event.Schema = hookCtx.Schema + event.Entity = hookCtx.Entity + event.Operation = operation + + // Set payload based on operation + var payload interface{} + switch operation { + case "create": + payload = hookCtx.Result + case "read": + payload = hookCtx.Result + case "update": + payload = map[string]interface{}{ + "id": hookCtx.ID, + "data": hookCtx.Data, + } + case "delete": + payload = map[string]interface{}{ + "id": hookCtx.ID, + } + } + + if payload != nil { + if err := event.SetPayload(payload); err != nil { + logger.Error("Failed to set event payload: %v", err) + payload = map[string]interface{}{"error": "failed to serialize payload"} + event.Payload, _ = json.Marshal(payload) + } + } + + // Add metadata + if userCtx.UserName != "" { + event.Metadata["user_name"] = userCtx.UserName + } + if userCtx.Email != "" { + event.Metadata["user_email"] = userCtx.Email + } + if len(userCtx.Roles) > 0 { + event.Metadata["user_roles"] = userCtx.Roles + } + event.Metadata["table_name"] = hookCtx.TableName + + // Publish asynchronously to not block CRUD operation + if err := broker.PublishAsync(hookCtx.Context, event); err != nil { + logger.Error("Failed to publish %s event for %s.%s: %v", + operation, hookCtx.Schema, hookCtx.Entity, err) + // Don't fail the CRUD operation if event publishing fails + return nil + } + + logger.Debug("Published %s event for %s.%s (ID: %s)", + operation, hookCtx.Schema, hookCtx.Entity, event.ID) + return nil + } + } + + // Register hooks based on configuration + if config.EnableCreate { + hookRegistry.Register(restheadspec.AfterCreate, createHookHandler("create")) + logger.Info("Registered event hook for CREATE operations") + } + + if config.EnableRead { + hookRegistry.Register(restheadspec.AfterRead, createHookHandler("read")) + logger.Info("Registered event hook for READ operations") + } + + if config.EnableUpdate { + hookRegistry.Register(restheadspec.AfterUpdate, createHookHandler("update")) + logger.Info("Registered event hook for UPDATE operations") + } + + if config.EnableDelete { + hookRegistry.Register(restheadspec.AfterDelete, createHookHandler("delete")) + logger.Info("Registered event hook for DELETE operations") + } + + return nil +} diff --git a/pkg/eventbroker/metrics.go b/pkg/eventbroker/metrics.go new file mode 100644 index 0000000..9f70e94 --- /dev/null +++ b/pkg/eventbroker/metrics.go @@ -0,0 +1,28 @@ +package eventbroker + +import ( + "time" + + "github.com/bitechdev/ResolveSpec/pkg/metrics" +) + +// recordEventPublished records an event publication metric +func recordEventPublished(event *Event) { + if mp := metrics.GetProvider(); mp != nil { + mp.RecordEventPublished(string(event.Source), event.Type) + } +} + +// recordEventProcessed records an event processing metric +func recordEventProcessed(event *Event, duration time.Duration) { + if mp := metrics.GetProvider(); mp != nil { + mp.RecordEventProcessed(string(event.Source), event.Type, string(event.Status), duration) + } +} + +// updateQueueSize updates the event queue size metric +func updateQueueSize(size int64) { + if mp := metrics.GetProvider(); mp != nil { + mp.UpdateEventQueueSize(size) + } +} diff --git a/pkg/eventbroker/provider.go b/pkg/eventbroker/provider.go new file mode 100644 index 0000000..59951c9 --- /dev/null +++ b/pkg/eventbroker/provider.go @@ -0,0 +1,70 @@ +package eventbroker + +import ( + "context" + "time" +) + +// Provider defines the storage backend interface for events +// Implementations: MemoryProvider, RedisProvider, NATSProvider, DatabaseProvider +type Provider interface { + // Store stores an event + Store(ctx context.Context, event *Event) error + + // Get retrieves an event by ID + Get(ctx context.Context, id string) (*Event, error) + + // List lists events with optional filters + List(ctx context.Context, filter *EventFilter) ([]*Event, error) + + // UpdateStatus updates the status of an event + UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error + + // Delete deletes an event by ID + Delete(ctx context.Context, id string) error + + // Stream returns a channel of events for real-time consumption + // Used for cross-instance pub/sub + // The channel is closed when the context is canceled or an error occurs + Stream(ctx context.Context, pattern string) (<-chan *Event, error) + + // Publish publishes an event to all subscribers (for distributed providers) + // For in-memory provider, this is the same as Store + // For Redis/NATS/Database, this triggers cross-instance delivery + Publish(ctx context.Context, event *Event) error + + // Close closes the provider and releases resources + Close() error + + // Stats returns provider statistics + Stats(ctx context.Context) (*ProviderStats, error) +} + +// EventFilter defines filter criteria for listing events +type EventFilter struct { + Source *EventSource + Status *EventStatus + UserID *int + Schema string + Entity string + Operation string + InstanceID string + StartTime *time.Time + EndTime *time.Time + Limit int + Offset int +} + +// ProviderStats contains statistics about the provider +type ProviderStats struct { + ProviderType string `json:"provider_type"` + TotalEvents int64 `json:"total_events"` + PendingEvents int64 `json:"pending_events"` + ProcessingEvents int64 `json:"processing_events"` + CompletedEvents int64 `json:"completed_events"` + FailedEvents int64 `json:"failed_events"` + EventsPublished int64 `json:"events_published"` + EventsConsumed int64 `json:"events_consumed"` + ActiveSubscribers int `json:"active_subscribers"` + ProviderSpecific map[string]interface{} `json:"provider_specific,omitempty"` +} diff --git a/pkg/eventbroker/provider_memory.go b/pkg/eventbroker/provider_memory.go new file mode 100644 index 0000000..a6e5215 --- /dev/null +++ b/pkg/eventbroker/provider_memory.go @@ -0,0 +1,446 @@ +package eventbroker + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// MemoryProvider implements Provider interface using in-memory storage +// Features: +// - Thread-safe event storage with RW mutex +// - LRU eviction when max events reached +// - In-process pub/sub (not cross-instance) +// - Automatic cleanup of old completed events +type MemoryProvider struct { + mu sync.RWMutex + events map[string]*Event + eventOrder []string // For LRU tracking + subscribers map[string][]chan *Event + instanceID string + maxEvents int + cleanupInterval time.Duration + maxAge time.Duration + + // Statistics + stats MemoryProviderStats + + // Lifecycle + stopCleanup chan struct{} + wg sync.WaitGroup + isRunning atomic.Bool +} + +// MemoryProviderStats contains statistics for the memory provider +type MemoryProviderStats struct { + TotalEvents atomic.Int64 + PendingEvents atomic.Int64 + ProcessingEvents atomic.Int64 + CompletedEvents atomic.Int64 + FailedEvents atomic.Int64 + EventsPublished atomic.Int64 + EventsConsumed atomic.Int64 + ActiveSubscribers atomic.Int32 + Evictions atomic.Int64 +} + +// MemoryProviderOptions configures the memory provider +type MemoryProviderOptions struct { + InstanceID string + MaxEvents int + CleanupInterval time.Duration + MaxAge time.Duration +} + +// NewMemoryProvider creates a new in-memory event provider +func NewMemoryProvider(opts MemoryProviderOptions) *MemoryProvider { + if opts.MaxEvents == 0 { + opts.MaxEvents = 10000 // Default + } + if opts.CleanupInterval == 0 { + opts.CleanupInterval = 5 * time.Minute // Default + } + if opts.MaxAge == 0 { + opts.MaxAge = 24 * time.Hour // Default: keep events for 24 hours + } + + mp := &MemoryProvider{ + events: make(map[string]*Event), + eventOrder: make([]string, 0), + subscribers: make(map[string][]chan *Event), + instanceID: opts.InstanceID, + maxEvents: opts.MaxEvents, + cleanupInterval: opts.CleanupInterval, + maxAge: opts.MaxAge, + stopCleanup: make(chan struct{}), + } + + mp.isRunning.Store(true) + + // Start cleanup goroutine + mp.wg.Add(1) + go mp.cleanupLoop() + + logger.Info("Memory provider initialized (max_events: %d, cleanup: %v, max_age: %v)", + opts.MaxEvents, opts.CleanupInterval, opts.MaxAge) + + return mp +} + +// Store stores an event +func (mp *MemoryProvider) Store(ctx context.Context, event *Event) error { + mp.mu.Lock() + defer mp.mu.Unlock() + + // Check if we need to evict oldest events + if len(mp.events) >= mp.maxEvents { + mp.evictOldestLocked() + } + + // Store event + mp.events[event.ID] = event.Clone() + mp.eventOrder = append(mp.eventOrder, event.ID) + + // Update statistics + mp.stats.TotalEvents.Add(1) + mp.updateStatusCountsLocked(event.Status, 1) + + return nil +} + +// Get retrieves an event by ID +func (mp *MemoryProvider) Get(ctx context.Context, id string) (*Event, error) { + mp.mu.RLock() + defer mp.mu.RUnlock() + + event, exists := mp.events[id] + if !exists { + return nil, fmt.Errorf("event not found: %s", id) + } + + return event.Clone(), nil +} + +// List lists events with optional filters +func (mp *MemoryProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error) { + mp.mu.RLock() + defer mp.mu.RUnlock() + + var results []*Event + + for _, event := range mp.events { + if mp.matchesFilter(event, filter) { + results = append(results, event.Clone()) + } + } + + // Apply limit and offset + if filter != nil { + if filter.Offset > 0 && filter.Offset < len(results) { + results = results[filter.Offset:] + } + if filter.Limit > 0 && filter.Limit < len(results) { + results = results[:filter.Limit] + } + } + + return results, nil +} + +// UpdateStatus updates the status of an event +func (mp *MemoryProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error { + mp.mu.Lock() + defer mp.mu.Unlock() + + event, exists := mp.events[id] + if !exists { + return fmt.Errorf("event not found: %s", id) + } + + // Update status counts + mp.updateStatusCountsLocked(event.Status, -1) + mp.updateStatusCountsLocked(status, 1) + + // Update event + event.Status = status + if errorMsg != "" { + event.Error = errorMsg + } + + return nil +} + +// Delete deletes an event by ID +func (mp *MemoryProvider) Delete(ctx context.Context, id string) error { + mp.mu.Lock() + defer mp.mu.Unlock() + + event, exists := mp.events[id] + if !exists { + return fmt.Errorf("event not found: %s", id) + } + + // Update counts + mp.stats.TotalEvents.Add(-1) + mp.updateStatusCountsLocked(event.Status, -1) + + // Delete event + delete(mp.events, id) + + // Remove from order tracking + for i, eid := range mp.eventOrder { + if eid == id { + mp.eventOrder = append(mp.eventOrder[:i], mp.eventOrder[i+1:]...) + break + } + } + + return nil +} + +// Stream returns a channel of events for real-time consumption +// Note: This is in-process only, not cross-instance +func (mp *MemoryProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error) { + mp.mu.Lock() + defer mp.mu.Unlock() + + // Create buffered channel for events + ch := make(chan *Event, 100) + + // Store subscriber + mp.subscribers[pattern] = append(mp.subscribers[pattern], ch) + mp.stats.ActiveSubscribers.Add(1) + + // Goroutine to clean up on context cancellation + mp.wg.Add(1) + go func() { + defer mp.wg.Done() + <-ctx.Done() + + mp.mu.Lock() + defer mp.mu.Unlock() + + // Remove subscriber + subs := mp.subscribers[pattern] + for i, subCh := range subs { + if subCh == ch { + mp.subscribers[pattern] = append(subs[:i], subs[i+1:]...) + break + } + } + + mp.stats.ActiveSubscribers.Add(-1) + close(ch) + }() + + logger.Debug("Stream created for pattern: %s", pattern) + return ch, nil +} + +// Publish publishes an event to all subscribers +func (mp *MemoryProvider) Publish(ctx context.Context, event *Event) error { + // Store the event first + if err := mp.Store(ctx, event); err != nil { + return err + } + + mp.stats.EventsPublished.Add(1) + + // Notify subscribers + mp.mu.RLock() + defer mp.mu.RUnlock() + + for pattern, channels := range mp.subscribers { + if matchPattern(pattern, event.Type) { + for _, ch := range channels { + select { + case ch <- event.Clone(): + mp.stats.EventsConsumed.Add(1) + default: + // Channel full, skip + logger.Warn("Subscriber channel full for pattern: %s", pattern) + } + } + } + } + + return nil +} + +// Close closes the provider and releases resources +func (mp *MemoryProvider) Close() error { + if !mp.isRunning.Load() { + return nil + } + + mp.isRunning.Store(false) + + // Stop cleanup loop + close(mp.stopCleanup) + + // Wait for goroutines + mp.wg.Wait() + + // Close all subscriber channels + mp.mu.Lock() + for _, channels := range mp.subscribers { + for _, ch := range channels { + close(ch) + } + } + mp.subscribers = make(map[string][]chan *Event) + mp.mu.Unlock() + + logger.Info("Memory provider closed") + return nil +} + +// Stats returns provider statistics +func (mp *MemoryProvider) Stats(ctx context.Context) (*ProviderStats, error) { + return &ProviderStats{ + ProviderType: "memory", + TotalEvents: mp.stats.TotalEvents.Load(), + PendingEvents: mp.stats.PendingEvents.Load(), + ProcessingEvents: mp.stats.ProcessingEvents.Load(), + CompletedEvents: mp.stats.CompletedEvents.Load(), + FailedEvents: mp.stats.FailedEvents.Load(), + EventsPublished: mp.stats.EventsPublished.Load(), + EventsConsumed: mp.stats.EventsConsumed.Load(), + ActiveSubscribers: int(mp.stats.ActiveSubscribers.Load()), + ProviderSpecific: map[string]interface{}{ + "max_events": mp.maxEvents, + "cleanup_interval": mp.cleanupInterval.String(), + "max_age": mp.maxAge.String(), + "evictions": mp.stats.Evictions.Load(), + }, + }, nil +} + +// cleanupLoop periodically cleans up old completed events +func (mp *MemoryProvider) cleanupLoop() { + defer mp.wg.Done() + + ticker := time.NewTicker(mp.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + mp.cleanup() + case <-mp.stopCleanup: + return + } + } +} + +// cleanup removes old completed/failed events +func (mp *MemoryProvider) cleanup() { + mp.mu.Lock() + defer mp.mu.Unlock() + + cutoff := time.Now().Add(-mp.maxAge) + removed := 0 + + for id, event := range mp.events { + // Only clean up completed or failed events that are old + if (event.Status == EventStatusCompleted || event.Status == EventStatusFailed) && + event.CreatedAt.Before(cutoff) { + + delete(mp.events, id) + mp.stats.TotalEvents.Add(-1) + mp.updateStatusCountsLocked(event.Status, -1) + + // Remove from order tracking + for i, eid := range mp.eventOrder { + if eid == id { + mp.eventOrder = append(mp.eventOrder[:i], mp.eventOrder[i+1:]...) + break + } + } + + removed++ + } + } + + if removed > 0 { + logger.Debug("Cleanup removed %d old events", removed) + } +} + +// evictOldestLocked evicts the oldest event (LRU) +// Caller must hold write lock +func (mp *MemoryProvider) evictOldestLocked() { + if len(mp.eventOrder) == 0 { + return + } + + // Get oldest event ID + oldestID := mp.eventOrder[0] + mp.eventOrder = mp.eventOrder[1:] + + // Remove event + if event, exists := mp.events[oldestID]; exists { + delete(mp.events, oldestID) + mp.stats.TotalEvents.Add(-1) + mp.updateStatusCountsLocked(event.Status, -1) + mp.stats.Evictions.Add(1) + + logger.Debug("Evicted oldest event: %s", oldestID) + } +} + +// matchesFilter checks if an event matches the filter criteria +func (mp *MemoryProvider) matchesFilter(event *Event, filter *EventFilter) bool { + if filter == nil { + return true + } + + if filter.Source != nil && event.Source != *filter.Source { + return false + } + if filter.Status != nil && event.Status != *filter.Status { + return false + } + if filter.UserID != nil && event.UserID != *filter.UserID { + return false + } + if filter.Schema != "" && event.Schema != filter.Schema { + return false + } + if filter.Entity != "" && event.Entity != filter.Entity { + return false + } + if filter.Operation != "" && event.Operation != filter.Operation { + return false + } + if filter.InstanceID != "" && event.InstanceID != filter.InstanceID { + return false + } + if filter.StartTime != nil && event.CreatedAt.Before(*filter.StartTime) { + return false + } + if filter.EndTime != nil && event.CreatedAt.After(*filter.EndTime) { + return false + } + + return true +} + +// updateStatusCountsLocked updates status statistics +// Caller must hold write lock +func (mp *MemoryProvider) updateStatusCountsLocked(status EventStatus, delta int64) { + switch status { + case EventStatusPending: + mp.stats.PendingEvents.Add(delta) + case EventStatusProcessing: + mp.stats.ProcessingEvents.Add(delta) + case EventStatusCompleted: + mp.stats.CompletedEvents.Add(delta) + case EventStatusFailed: + mp.stats.FailedEvents.Add(delta) + } +} diff --git a/pkg/eventbroker/provider_memory_test.go b/pkg/eventbroker/provider_memory_test.go new file mode 100644 index 0000000..ffce7db --- /dev/null +++ b/pkg/eventbroker/provider_memory_test.go @@ -0,0 +1,419 @@ +package eventbroker + +import ( + "context" + "testing" + "time" +) + +func TestNewMemoryProvider(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + MaxEvents: 100, + CleanupInterval: 1 * time.Minute, + }) + + if provider == nil { + t.Fatal("Expected non-nil provider") + } + + stats, err := provider.Stats(context.Background()) + if err != nil { + t.Fatalf("Stats failed: %v", err) + } + + if stats.ProviderType != "memory" { + t.Errorf("Expected provider type 'memory', got %s", stats.ProviderType) + } +} + +func TestMemoryProviderPublishAndGet(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + event := NewEvent(EventSourceDatabase, "public.users.create") + event.UserID = 123 + + // Publish event + if err := provider.Publish(context.Background(), event); err != nil { + t.Fatalf("Publish failed: %v", err) + } + + // Get event + retrieved, err := provider.Get(context.Background(), event.ID) + if err != nil { + t.Fatalf("Get failed: %v", err) + } + + if retrieved.ID != event.ID { + t.Errorf("Expected event ID %s, got %s", event.ID, retrieved.ID) + } + if retrieved.UserID != 123 { + t.Errorf("Expected user ID 123, got %d", retrieved.UserID) + } +} + +func TestMemoryProviderGetNonExistent(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + _, err := provider.Get(context.Background(), "non-existent-id") + if err == nil { + t.Error("Expected error when getting non-existent event") + } +} + +func TestMemoryProviderUpdateStatus(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + event := NewEvent(EventSourceDatabase, "public.users.create") + provider.Publish(context.Background(), event) + + // Update status to processing + err := provider.UpdateStatus(context.Background(), event.ID, EventStatusProcessing, "") + if err != nil { + t.Fatalf("UpdateStatus failed: %v", err) + } + + retrieved, _ := provider.Get(context.Background(), event.ID) + if retrieved.Status != EventStatusProcessing { + t.Errorf("Expected status %s, got %s", EventStatusProcessing, retrieved.Status) + } + + // Update status to failed with error + err = provider.UpdateStatus(context.Background(), event.ID, EventStatusFailed, "test error") + if err != nil { + t.Fatalf("UpdateStatus failed: %v", err) + } + + retrieved, _ = provider.Get(context.Background(), event.ID) + if retrieved.Status != EventStatusFailed { + t.Errorf("Expected status %s, got %s", EventStatusFailed, retrieved.Status) + } + if retrieved.Error != "test error" { + t.Errorf("Expected error 'test error', got %s", retrieved.Error) + } +} + +func TestMemoryProviderList(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + // Publish multiple events + for i := 0; i < 5; i++ { + event := NewEvent(EventSourceDatabase, "public.users.create") + provider.Publish(context.Background(), event) + } + + // List all events + events, err := provider.List(context.Background(), &EventFilter{}) + if err != nil { + t.Fatalf("List failed: %v", err) + } + + if len(events) != 5 { + t.Errorf("Expected 5 events, got %d", len(events)) + } +} + +func TestMemoryProviderListWithFilter(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + // Publish events with different types + event1 := NewEvent(EventSourceDatabase, "public.users.create") + provider.Publish(context.Background(), event1) + + event2 := NewEvent(EventSourceDatabase, "public.roles.create") + provider.Publish(context.Background(), event2) + + event3 := NewEvent(EventSourceWebSocket, "chat.message") + provider.Publish(context.Background(), event3) + + // Filter by source + source := EventSourceDatabase + events, err := provider.List(context.Background(), &EventFilter{ + Source: &source, + }) + if err != nil { + t.Fatalf("List failed: %v", err) + } + + if len(events) != 2 { + t.Errorf("Expected 2 events with database source, got %d", len(events)) + } + + // Filter by status + status := EventStatusPending + events, err = provider.List(context.Background(), &EventFilter{ + Status: &status, + }) + if err != nil { + t.Fatalf("List failed: %v", err) + } + + if len(events) != 3 { + t.Errorf("Expected 3 events with pending status, got %d", len(events)) + } +} + +func TestMemoryProviderListWithLimit(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + // Publish multiple events + for i := 0; i < 10; i++ { + event := NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), event) + } + + // List with limit + events, err := provider.List(context.Background(), &EventFilter{ + Limit: 5, + }) + if err != nil { + t.Fatalf("List failed: %v", err) + } + + if len(events) != 5 { + t.Errorf("Expected 5 events (limited), got %d", len(events)) + } +} + +func TestMemoryProviderDelete(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + event := NewEvent(EventSourceDatabase, "public.users.create") + provider.Publish(context.Background(), event) + + // Delete event + err := provider.Delete(context.Background(), event.ID) + if err != nil { + t.Fatalf("Delete failed: %v", err) + } + + // Verify deleted + _, err = provider.Get(context.Background(), event.ID) + if err == nil { + t.Error("Expected error when getting deleted event") + } +} + +func TestMemoryProviderLRUEviction(t *testing.T) { + // Create provider with small max events + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + MaxEvents: 3, + }) + + // Publish 5 events + events := make([]*Event, 5) + for i := 0; i < 5; i++ { + events[i] = NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), events[i]) + } + + // First 2 events should be evicted + _, err := provider.Get(context.Background(), events[0].ID) + if err == nil { + t.Error("Expected first event to be evicted") + } + + _, err = provider.Get(context.Background(), events[1].ID) + if err == nil { + t.Error("Expected second event to be evicted") + } + + // Last 3 events should still exist + for i := 2; i < 5; i++ { + _, err := provider.Get(context.Background(), events[i].ID) + if err != nil { + t.Errorf("Expected event %d to still exist", i) + } + } +} + +func TestMemoryProviderCleanup(t *testing.T) { + // Create provider with short cleanup interval + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + CleanupInterval: 100 * time.Millisecond, + MaxAge: 200 * time.Millisecond, + }) + + // Publish and complete an event + event := NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), event) + provider.UpdateStatus(context.Background(), event.ID, EventStatusCompleted, "") + + // Wait for cleanup to run + time.Sleep(400 * time.Millisecond) + + // Event should be cleaned up + _, err := provider.Get(context.Background(), event.ID) + if err == nil { + t.Error("Expected event to be cleaned up") + } + + provider.Close() +} + +func TestMemoryProviderStats(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + MaxEvents: 100, + }) + + // Publish events + for i := 0; i < 5; i++ { + event := NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), event) + } + + stats, err := provider.Stats(context.Background()) + if err != nil { + t.Fatalf("Stats failed: %v", err) + } + + if stats.ProviderType != "memory" { + t.Errorf("Expected provider type 'memory', got %s", stats.ProviderType) + } + if stats.TotalEvents != 5 { + t.Errorf("Expected 5 total events, got %d", stats.TotalEvents) + } +} + +func TestMemoryProviderClose(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + CleanupInterval: 100 * time.Millisecond, + }) + + // Publish event + event := NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), event) + + // Close provider + err := provider.Close() + if err != nil { + t.Fatalf("Close failed: %v", err) + } + + // Cleanup goroutine should be stopped + time.Sleep(200 * time.Millisecond) +} + +func TestMemoryProviderConcurrency(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + // Concurrent publish + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func() { + defer func() { done <- true }() + event := NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), event) + }() + } + + // Wait for all goroutines + for i := 0; i < 10; i++ { + <-done + } + + // Verify all events were stored + events, _ := provider.List(context.Background(), &EventFilter{}) + if len(events) != 10 { + t.Errorf("Expected 10 events, got %d", len(events)) + } +} + +func TestMemoryProviderStream(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + // Stream is implemented for memory provider (in-process pub/sub) + ch, err := provider.Stream(context.Background(), "test.*") + if err != nil { + t.Fatalf("Stream failed: %v", err) + } + if ch == nil { + t.Error("Expected non-nil channel") + } +} + +func TestMemoryProviderTimeRangeFilter(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + // Publish events at different times + event1 := NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), event1) + + time.Sleep(10 * time.Millisecond) + + event2 := NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), event2) + + time.Sleep(10 * time.Millisecond) + + event3 := NewEvent(EventSourceDatabase, "test.event") + provider.Publish(context.Background(), event3) + + // Filter by time range + startTime := event2.CreatedAt.Add(-1 * time.Millisecond) + events, err := provider.List(context.Background(), &EventFilter{ + StartTime: &startTime, + }) + if err != nil { + t.Fatalf("List failed: %v", err) + } + + // Should get events 2 and 3 + if len(events) != 2 { + t.Errorf("Expected 2 events after start time, got %d", len(events)) + } +} + +func TestMemoryProviderInstanceIDFilter(t *testing.T) { + provider := NewMemoryProvider(MemoryProviderOptions{ + InstanceID: "test-instance", + }) + + // Publish events with different instance IDs + event1 := NewEvent(EventSourceDatabase, "test.event") + event1.InstanceID = "instance-1" + provider.Publish(context.Background(), event1) + + event2 := NewEvent(EventSourceDatabase, "test.event") + event2.InstanceID = "instance-2" + provider.Publish(context.Background(), event2) + + // Filter by instance ID + events, err := provider.List(context.Background(), &EventFilter{ + InstanceID: "instance-1", + }) + if err != nil { + t.Fatalf("List failed: %v", err) + } + + if len(events) != 1 { + t.Errorf("Expected 1 event with instance-1, got %d", len(events)) + } + if events[0].InstanceID != "instance-1" { + t.Errorf("Expected instance ID 'instance-1', got %s", events[0].InstanceID) + } +} diff --git a/pkg/eventbroker/subscription.go b/pkg/eventbroker/subscription.go new file mode 100644 index 0000000..370fc15 --- /dev/null +++ b/pkg/eventbroker/subscription.go @@ -0,0 +1,140 @@ +package eventbroker + +import ( + "fmt" + "strings" + "sync" + "sync/atomic" + + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// SubscriptionID uniquely identifies a subscription +type SubscriptionID string + +// subscription represents a single subscription with its handler and pattern +type subscription struct { + id SubscriptionID + pattern string + handler EventHandler +} + +// subscriptionManager manages event subscriptions and pattern matching +type subscriptionManager struct { + mu sync.RWMutex + subscriptions map[SubscriptionID]*subscription + nextID atomic.Uint64 +} + +// newSubscriptionManager creates a new subscription manager +func newSubscriptionManager() *subscriptionManager { + return &subscriptionManager{ + subscriptions: make(map[SubscriptionID]*subscription), + } +} + +// Subscribe adds a new subscription +func (sm *subscriptionManager) Subscribe(pattern string, handler EventHandler) (SubscriptionID, error) { + if pattern == "" { + return "", fmt.Errorf("pattern cannot be empty") + } + if handler == nil { + return "", fmt.Errorf("handler cannot be nil") + } + + id := SubscriptionID(fmt.Sprintf("sub-%d", sm.nextID.Add(1))) + + sm.mu.Lock() + sm.subscriptions[id] = &subscription{ + id: id, + pattern: pattern, + handler: handler, + } + sm.mu.Unlock() + + logger.Info("Subscribed to pattern '%s' with ID: %s", pattern, id) + return id, nil +} + +// Unsubscribe removes a subscription +func (sm *subscriptionManager) Unsubscribe(id SubscriptionID) error { + sm.mu.Lock() + defer sm.mu.Unlock() + + if _, exists := sm.subscriptions[id]; !exists { + return fmt.Errorf("subscription not found: %s", id) + } + + delete(sm.subscriptions, id) + logger.Info("Unsubscribed: %s", id) + return nil +} + +// GetMatching returns all handlers that match the event type +func (sm *subscriptionManager) GetMatching(eventType string) []EventHandler { + sm.mu.RLock() + defer sm.mu.RUnlock() + + var handlers []EventHandler + for _, sub := range sm.subscriptions { + if matchPattern(sub.pattern, eventType) { + handlers = append(handlers, sub.handler) + } + } + + return handlers +} + +// Count returns the number of active subscriptions +func (sm *subscriptionManager) Count() int { + sm.mu.RLock() + defer sm.mu.RUnlock() + return len(sm.subscriptions) +} + +// Clear removes all subscriptions +func (sm *subscriptionManager) Clear() { + sm.mu.Lock() + defer sm.mu.Unlock() + sm.subscriptions = make(map[SubscriptionID]*subscription) + logger.Info("Cleared all subscriptions") +} + +// matchPattern implements glob-style pattern matching for event types +// Patterns: +// - "*" matches any single segment +// - "a.b.c" matches exactly "a.b.c" +// - "a.*.c" matches "a.anything.c" +// - "a.b.*" matches any operation on a.b +// - "*" matches everything +// +// Event type format: schema.entity.operation (e.g., "public.users.create") +func matchPattern(pattern, eventType string) bool { + // Wildcard matches everything + if pattern == "*" { + return true + } + + // Exact match + if pattern == eventType { + return true + } + + // Split pattern and event type by dots + patternParts := strings.Split(pattern, ".") + eventParts := strings.Split(eventType, ".") + + // Different number of parts can only match if pattern has wildcards + if len(patternParts) != len(eventParts) { + return false + } + + // Match each part + for i := range patternParts { + if patternParts[i] != "*" && patternParts[i] != eventParts[i] { + return false + } + } + + return true +} diff --git a/pkg/eventbroker/subscription_test.go b/pkg/eventbroker/subscription_test.go new file mode 100644 index 0000000..3317f3e --- /dev/null +++ b/pkg/eventbroker/subscription_test.go @@ -0,0 +1,270 @@ +package eventbroker + +import ( + "context" + "testing" +) + +func TestMatchPattern(t *testing.T) { + tests := []struct { + pattern string + eventType string + expected bool + }{ + // Exact matches + {"public.users.create", "public.users.create", true}, + {"public.users.create", "public.users.update", false}, + + // Wildcard matches + {"*", "public.users.create", true}, + {"*", "anything", true}, + {"public.*", "public.users", true}, + {"public.*", "public.users.create", false}, // Different number of parts + {"public.*", "admin.users", false}, + {"*.users.create", "public.users.create", true}, + {"*.users.create", "admin.users.create", true}, + {"*.users.create", "public.roles.create", false}, + {"public.*.create", "public.users.create", true}, + {"public.*.create", "public.roles.create", true}, + {"public.*.create", "public.users.update", false}, + + // Multiple wildcards + {"*.*", "public.users", true}, + {"*.*", "public.users.create", false}, // Different number of parts + {"*.*.create", "public.users.create", true}, + {"*.*.create", "admin.roles.create", true}, + {"*.*.create", "public.users.update", false}, + + // Edge cases + {"", "", true}, + {"", "something", false}, + {"something", "", false}, + } + + for _, tt := range tests { + t.Run(tt.pattern+"_vs_"+tt.eventType, func(t *testing.T) { + result := matchPattern(tt.pattern, tt.eventType) + if result != tt.expected { + t.Errorf("matchPattern(%q, %q) = %v, expected %v", + tt.pattern, tt.eventType, result, tt.expected) + } + }) + } +} + +func TestSubscriptionManager(t *testing.T) { + manager := newSubscriptionManager() + + // Create test handler + called := false + handler := EventHandlerFunc(func(ctx context.Context, event *Event) error { + called = true + return nil + }) + + // Test Subscribe + id, err := manager.Subscribe("public.users.*", handler) + if err != nil { + t.Fatalf("Subscribe failed: %v", err) + } + if id == "" { + t.Fatal("Expected non-empty subscription ID") + } + + // Test GetMatching + handlers := manager.GetMatching("public.users.create") + if len(handlers) != 1 { + t.Fatalf("Expected 1 handler, got %d", len(handlers)) + } + + // Test handler execution + event := NewEvent(EventSourceDatabase, "public.users.create") + if err := handlers[0].Handle(context.Background(), event); err != nil { + t.Fatalf("Handler execution failed: %v", err) + } + if !called { + t.Error("Expected handler to be called") + } + + // Test Count + if manager.Count() != 1 { + t.Errorf("Expected count 1, got %d", manager.Count()) + } + + // Test Unsubscribe + if err := manager.Unsubscribe(id); err != nil { + t.Fatalf("Unsubscribe failed: %v", err) + } + + // Verify unsubscribed + handlers = manager.GetMatching("public.users.create") + if len(handlers) != 0 { + t.Errorf("Expected 0 handlers after unsubscribe, got %d", len(handlers)) + } + if manager.Count() != 0 { + t.Errorf("Expected count 0 after unsubscribe, got %d", manager.Count()) + } +} + +func TestSubscriptionManagerMultipleHandlers(t *testing.T) { + manager := newSubscriptionManager() + + called1 := false + handler1 := EventHandlerFunc(func(ctx context.Context, event *Event) error { + called1 = true + return nil + }) + + called2 := false + handler2 := EventHandlerFunc(func(ctx context.Context, event *Event) error { + called2 = true + return nil + }) + + // Subscribe multiple handlers + id1, _ := manager.Subscribe("public.users.*", handler1) + id2, _ := manager.Subscribe("*.users.*", handler2) + + // Both should match + handlers := manager.GetMatching("public.users.create") + if len(handlers) != 2 { + t.Fatalf("Expected 2 handlers, got %d", len(handlers)) + } + + // Execute all handlers + event := NewEvent(EventSourceDatabase, "public.users.create") + for _, h := range handlers { + h.Handle(context.Background(), event) + } + + if !called1 || !called2 { + t.Error("Expected both handlers to be called") + } + + // Unsubscribe one + manager.Unsubscribe(id1) + handlers = manager.GetMatching("public.users.create") + if len(handlers) != 1 { + t.Errorf("Expected 1 handler after unsubscribe, got %d", len(handlers)) + } + + // Unsubscribe remaining + manager.Unsubscribe(id2) + if manager.Count() != 0 { + t.Errorf("Expected count 0 after all unsubscribe, got %d", manager.Count()) + } +} + +func TestSubscriptionManagerConcurrency(t *testing.T) { + manager := newSubscriptionManager() + + handler := EventHandlerFunc(func(ctx context.Context, event *Event) error { + return nil + }) + + // Subscribe and unsubscribe concurrently + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func() { + defer func() { done <- true }() + id, _ := manager.Subscribe("test.*", handler) + manager.GetMatching("test.event") + manager.Unsubscribe(id) + }() + } + + // Wait for all goroutines + for i := 0; i < 10; i++ { + <-done + } + + // Should have no subscriptions left + if manager.Count() != 0 { + t.Errorf("Expected count 0 after concurrent operations, got %d", manager.Count()) + } +} + +func TestSubscriptionManagerUnsubscribeNonExistent(t *testing.T) { + manager := newSubscriptionManager() + + // Try to unsubscribe a non-existent ID + err := manager.Unsubscribe("non-existent-id") + if err == nil { + t.Error("Expected error when unsubscribing non-existent ID") + } +} + +func TestSubscriptionIDGeneration(t *testing.T) { + manager := newSubscriptionManager() + + handler := EventHandlerFunc(func(ctx context.Context, event *Event) error { + return nil + }) + + // Subscribe multiple times and ensure unique IDs + ids := make(map[SubscriptionID]bool) + for i := 0; i < 100; i++ { + id, _ := manager.Subscribe("test.*", handler) + if ids[id] { + t.Fatalf("Duplicate subscription ID: %s", id) + } + ids[id] = true + } +} + +func TestEventHandlerFunc(t *testing.T) { + called := false + var receivedEvent *Event + + handler := EventHandlerFunc(func(ctx context.Context, event *Event) error { + called = true + receivedEvent = event + return nil + }) + + event := NewEvent(EventSourceDatabase, "test.event") + err := handler.Handle(context.Background(), event) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if !called { + t.Error("Expected handler to be called") + } + if receivedEvent != event { + t.Error("Expected to receive the same event") + } +} + +func TestSubscriptionManagerPatternPriority(t *testing.T) { + manager := newSubscriptionManager() + + // More specific patterns should still match + specificCalled := false + genericCalled := false + + manager.Subscribe("public.users.create", EventHandlerFunc(func(ctx context.Context, event *Event) error { + specificCalled = true + return nil + })) + + manager.Subscribe("*", EventHandlerFunc(func(ctx context.Context, event *Event) error { + genericCalled = true + return nil + })) + + handlers := manager.GetMatching("public.users.create") + if len(handlers) != 2 { + t.Fatalf("Expected 2 matching handlers, got %d", len(handlers)) + } + + // Execute all handlers + event := NewEvent(EventSourceDatabase, "public.users.create") + for _, h := range handlers { + h.Handle(context.Background(), event) + } + + if !specificCalled || !genericCalled { + t.Error("Expected both specific and generic handlers to be called") + } +} diff --git a/pkg/eventbroker/worker_pool.go b/pkg/eventbroker/worker_pool.go new file mode 100644 index 0000000..f3e5952 --- /dev/null +++ b/pkg/eventbroker/worker_pool.go @@ -0,0 +1,141 @@ +package eventbroker + +import ( + "context" + "sync" + "sync/atomic" + + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// workerPool manages a pool of workers for async event processing +type workerPool struct { + workerCount int + bufferSize int + eventQueue chan *Event + processor func(context.Context, *Event) error + + activeWorkers atomic.Int32 + isRunning atomic.Bool + stopCh chan struct{} + wg sync.WaitGroup +} + +// newWorkerPool creates a new worker pool +func newWorkerPool(workerCount, bufferSize int, processor func(context.Context, *Event) error) *workerPool { + return &workerPool{ + workerCount: workerCount, + bufferSize: bufferSize, + eventQueue: make(chan *Event, bufferSize), + processor: processor, + stopCh: make(chan struct{}), + } +} + +// Start starts the worker pool +func (wp *workerPool) Start() { + if wp.isRunning.Load() { + return + } + + wp.isRunning.Store(true) + + // Start workers + for i := 0; i < wp.workerCount; i++ { + wp.wg.Add(1) + go wp.worker(i) + } + + logger.Info("Worker pool started with %d workers", wp.workerCount) +} + +// Stop stops the worker pool gracefully +func (wp *workerPool) Stop(ctx context.Context) error { + if !wp.isRunning.Load() { + return nil + } + + wp.isRunning.Store(false) + + // Close event queue to signal workers + close(wp.eventQueue) + + // Wait for workers to finish with context timeout + done := make(chan struct{}) + go func() { + wp.wg.Wait() + close(done) + }() + + select { + case <-done: + logger.Info("Worker pool stopped gracefully") + return nil + case <-ctx.Done(): + logger.Warn("Worker pool stop timed out, some events may be lost") + return ctx.Err() + } +} + +// Submit submits an event to the queue +func (wp *workerPool) Submit(ctx context.Context, event *Event) error { + if !wp.isRunning.Load() { + return ErrWorkerPoolStopped + } + + select { + case wp.eventQueue <- event: + return nil + case <-ctx.Done(): + return ctx.Err() + default: + return ErrQueueFull + } +} + +// worker is a worker goroutine that processes events from the queue +func (wp *workerPool) worker(id int) { + defer wp.wg.Done() + + logger.Debug("Worker %d started", id) + + for event := range wp.eventQueue { + wp.activeWorkers.Add(1) + + // Process event with background context (detached from original request) + ctx := context.Background() + if err := wp.processor(ctx, event); err != nil { + logger.Error("Worker %d failed to process event %s: %v", id, event.ID, err) + } + + wp.activeWorkers.Add(-1) + } + + logger.Debug("Worker %d stopped", id) +} + +// QueueSize returns the current queue size +func (wp *workerPool) QueueSize() int { + return len(wp.eventQueue) +} + +// ActiveWorkers returns the number of currently active workers +func (wp *workerPool) ActiveWorkers() int { + return int(wp.activeWorkers.Load()) +} + +// Error definitions +var ( + ErrWorkerPoolStopped = &BrokerError{Code: "worker_pool_stopped", Message: "worker pool is stopped"} + ErrQueueFull = &BrokerError{Code: "queue_full", Message: "event queue is full"} +) + +// BrokerError represents an error from the event broker +type BrokerError struct { + Code string + Message string +} + +func (e *BrokerError) Error() string { + return e.Message +} diff --git a/pkg/funcspec/function_api_test.go b/pkg/funcspec/function_api_test.go index c653a43..defc30a 100644 --- a/pkg/funcspec/function_api_test.go +++ b/pkg/funcspec/function_api_test.go @@ -70,6 +70,10 @@ func (m *MockDatabase) RunInTransaction(ctx context.Context, fn func(common.Data return fn(m) } +func (m *MockDatabase) GetUnderlyingDB() interface{} { + return m +} + // MockResult implements common.Result interface for testing type MockResult struct { rows int64 diff --git a/pkg/metrics/interfaces.go b/pkg/metrics/interfaces.go index 4c8b62c..9d11586 100644 --- a/pkg/metrics/interfaces.go +++ b/pkg/metrics/interfaces.go @@ -30,6 +30,15 @@ type Provider interface { // UpdateCacheSize updates the cache size metric UpdateCacheSize(provider string, size int64) + // RecordEventPublished records an event publication + RecordEventPublished(source, eventType string) + + // RecordEventProcessed records an event processing with its status + RecordEventProcessed(source, eventType, status string, duration time.Duration) + + // UpdateEventQueueSize updates the event queue size metric + UpdateEventQueueSize(size int64) + // Handler returns an HTTP handler for exposing metrics (e.g., /metrics endpoint) Handler() http.Handler } @@ -59,9 +68,13 @@ func (n *NoOpProvider) IncRequestsInFlight() func (n *NoOpProvider) DecRequestsInFlight() {} func (n *NoOpProvider) RecordDBQuery(operation, table string, duration time.Duration, err error) { } -func (n *NoOpProvider) RecordCacheHit(provider string) {} -func (n *NoOpProvider) RecordCacheMiss(provider string) {} -func (n *NoOpProvider) UpdateCacheSize(provider string, size int64) {} +func (n *NoOpProvider) RecordCacheHit(provider string) {} +func (n *NoOpProvider) RecordCacheMiss(provider string) {} +func (n *NoOpProvider) UpdateCacheSize(provider string, size int64) {} +func (n *NoOpProvider) RecordEventPublished(source, eventType string) {} +func (n *NoOpProvider) RecordEventProcessed(source, eventType, status string, duration time.Duration) { +} +func (n *NoOpProvider) UpdateEventQueueSize(size int64) {} func (n *NoOpProvider) Handler() http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNotFound)