diff --git a/.gitignore b/.gitignore index a37e8ec..8d2f757 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,4 @@ go.work.sum .env bin/ test.db +testserver diff --git a/Makefile b/Makefile index de7dd55..6c0942f 100644 --- a/Makefile +++ b/Makefile @@ -13,6 +13,51 @@ test-integration: # Run all tests (unit + integration) test: test-unit test-integration +release-version: ## Create and push a release with specific version (use: make release-version VERSION=v1.2.3) + @if [ -z "$(VERSION)" ]; then \ + echo "Error: VERSION is required. Usage: make release-version VERSION=v1.2.3"; \ + exit 1; \ + fi + @version="$(VERSION)"; \ + if ! echo "$$version" | grep -q "^v"; then \ + version="v$$version"; \ + fi; \ + echo "Creating release: $$version"; \ + latest_tag=$$(git describe --tags --abbrev=0 2>/dev/null || echo ""); \ + if [ -z "$$latest_tag" ]; then \ + commit_logs=$$(git log --pretty=format:"- %s" --no-merges); \ + else \ + commit_logs=$$(git log "$${latest_tag}..HEAD" --pretty=format:"- %s" --no-merges); \ + fi; \ + if [ -z "$$commit_logs" ]; then \ + tag_message="Release $$version"; \ + else \ + tag_message="Release $$version\n\n$$commit_logs"; \ + fi; \ + git tag -a "$$version" -m "$$tag_message"; \ + git push origin "$$version"; \ + echo "Tag $$version created and pushed to remote repository." + + +lint: ## Run linter + @echo "Running linter..." + @if command -v golangci-lint > /dev/null; then \ + golangci-lint run --config=.golangci.json; \ + else \ + echo "golangci-lint not installed. Install with: go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest"; \ + exit 1; \ + fi + +lintfix: ## Run linter + @echo "Running linter..." + @if command -v golangci-lint > /dev/null; then \ + golangci-lint run --config=.golangci.json --fix; \ + else \ + echo "golangci-lint not installed. Install with: go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest"; \ + exit 1; \ + fi + + # Start PostgreSQL for integration tests docker-up: @echo "Starting PostgreSQL container..." diff --git a/cmd/testserver/main.go b/cmd/testserver/main.go index 02574b3..4d53334 100644 --- a/cmd/testserver/main.go +++ b/cmd/testserver/main.go @@ -1,8 +1,8 @@ package main import ( + "fmt" "log" - "net/http" "os" "time" @@ -67,9 +67,36 @@ func main() { // Setup routes using new SetupMuxRoutes function (without authentication) resolvespec.SetupMuxRoutes(r, handler, nil) - // Create graceful server with configuration - srv := server.NewGracefulServer(server.Config{ - Addr: cfg.Server.Addr, + // Create server manager + mgr := server.NewManager() + + // Parse host and port from addr + host := "" + port := 8080 + if cfg.Server.Addr != "" { + // Parse addr (format: ":8080" or "localhost:8080") + if cfg.Server.Addr[0] == ':' { + // Just port + _, err := fmt.Sscanf(cfg.Server.Addr, ":%d", &port) + if err != nil { + logger.Error("Invalid server address: %s", cfg.Server.Addr) + os.Exit(1) + } + } else { + // Host and port + _, err := fmt.Sscanf(cfg.Server.Addr, "%[^:]:%d", &host, &port) + if err != nil { + logger.Error("Invalid server address: %s", cfg.Server.Addr) + os.Exit(1) + } + } + } + + // Add server instance + _, err = mgr.Add(server.Config{ + Name: "api", + Host: host, + Port: port, Handler: r, ShutdownTimeout: cfg.Server.ShutdownTimeout, DrainTimeout: cfg.Server.DrainTimeout, @@ -77,11 +104,15 @@ func main() { WriteTimeout: cfg.Server.WriteTimeout, IdleTimeout: cfg.Server.IdleTimeout, }) + if err != nil { + logger.Error("Failed to add server: %v", err) + os.Exit(1) + } // Start server with graceful shutdown logger.Info("Starting server on %s", cfg.Server.Addr) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { - logger.Error("Server failed to start: %v", err) + if err := mgr.ServeWithGracefulShutdown(); err != nil { + logger.Error("Server failed: %v", err) os.Exit(1) } } diff --git a/go.mod b/go.mod index 2033ea6..153ad93 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.24.6 require ( github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/bradfitz/gomemcache v0.0.0-20250403215159-8d39553ac7cf + github.com/eclipse/paho.mqtt.golang v1.5.1 github.com/getsentry/sentry-go v0.40.0 github.com/glebarez/sqlite v1.11.0 github.com/google/uuid v1.6.0 @@ -14,6 +15,8 @@ require ( github.com/gorilla/websocket v1.5.3 github.com/jackc/pgx/v5 v5.6.0 github.com/klauspost/compress v1.18.0 + github.com/mochi-mqtt/server/v2 v2.7.9 + github.com/nats-io/nats.go v1.48.0 github.com/prometheus/client_golang v1.23.2 github.com/redis/go-redis/v9 v9.17.1 github.com/spf13/viper v1.21.0 @@ -34,6 +37,7 @@ require ( golang.org/x/crypto v0.43.0 golang.org/x/time v0.14.0 gorm.io/driver/postgres v1.6.0 + gorm.io/driver/sqlite v1.6.0 gorm.io/gorm v1.30.0 ) @@ -58,7 +62,6 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.8.4 // indirect - github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/glebarez/go-sqlite v1.21.2 // indirect @@ -83,9 +86,10 @@ require ( github.com/moby/sys/user v0.4.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect - github.com/mochi-mqtt/server/v2 v2.7.9 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect @@ -133,7 +137,6 @@ require ( google.golang.org/grpc v1.75.0 // indirect google.golang.org/protobuf v1.36.8 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gorm.io/driver/sqlite v1.6.0 // indirect modernc.org/libc v1.67.0 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect diff --git a/go.sum b/go.sum index bb8ab68..7c0571a 100644 --- a/go.sum +++ b/go.sum @@ -101,6 +101,8 @@ github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= +github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= @@ -144,6 +146,12 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U= +github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -314,8 +322,6 @@ gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4= gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo= gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ= gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8= -gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= -gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs= gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE= gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= diff --git a/pkg/common/sql_helpers.go b/pkg/common/sql_helpers.go index 2036dfb..6730db6 100644 --- a/pkg/common/sql_helpers.go +++ b/pkg/common/sql_helpers.go @@ -486,9 +486,10 @@ func extractTableAndColumn(cond string) (table string, column string) { return "", "" } -// extractUnqualifiedColumnName extracts the column name from an unqualified condition +// Unused: extractUnqualifiedColumnName extracts the column name from an unqualified condition // For example: "rid_parentmastertaskitem is null" returns "rid_parentmastertaskitem" // "status = 'active'" returns "status" +// nolint:unused func extractUnqualifiedColumnName(cond string) string { // Common SQL operators operators := []string{" = ", " != ", " <> ", " > ", " >= ", " < ", " <= ", " LIKE ", " like ", " IN ", " in ", " IS ", " is ", " NOT ", " not "} diff --git a/pkg/eventbroker/IMPLEMENTATION_PLAN.md b/pkg/eventbroker/IMPLEMENTATION_PLAN.md new file mode 100644 index 0000000..1633a7c --- /dev/null +++ b/pkg/eventbroker/IMPLEMENTATION_PLAN.md @@ -0,0 +1,353 @@ +# Event Broker System Implementation Plan + +## Overview +Implement a comprehensive event handler/broker system for ResolveSpec that follows existing architectural patterns (Provider interface, Hook system, Config management, Graceful shutdown). + +## Requirements Met +- ✅ Events with sources (database, websocket, frontend, system) +- ✅ Event statuses (pending, processing, completed, failed) +- ✅ Timestamps, JSON payloads, user IDs, session IDs +- ✅ Program instance IDs for tracking server instances +- ✅ Both sync and async processing modes +- ✅ Multiple provider backends (in-memory, Redis, NATS, database) +- ✅ Cross-instance pub/sub support + +## Architecture + +### Core Components + +**Event Structure** (with full metadata): +```go +type Event struct { + ID string // UUID + Source EventSource // database, websocket, system, frontend + Type string // Pattern: schema.entity.operation + Status EventStatus // pending, processing, completed, failed + Payload json.RawMessage // JSON payload + UserID int + SessionID string + InstanceID string // Server instance identifier + Schema string + Entity string + Operation string // create, update, delete, read + CreatedAt time.Time + ProcessedAt *time.Time + CompletedAt *time.Time + Error string + Metadata map[string]interface{} + RetryCount int +} +``` + +**Provider Pattern** (like cache.Provider): +```go +type Provider interface { + Store(ctx context.Context, event *Event) error + Get(ctx context.Context, id string) (*Event, error) + List(ctx context.Context, filter *EventFilter) ([]*Event, error) + UpdateStatus(ctx context.Context, id string, status EventStatus, error string) error + Stream(ctx context.Context, pattern string) (<-chan *Event, error) + Publish(ctx context.Context, event *Event) error + Close() error + Stats(ctx context.Context) (*ProviderStats, error) +} +``` + +**Broker Interface**: +```go +type Broker interface { + Publish(ctx context.Context, event *Event) error // Mode-dependent + PublishSync(ctx context.Context, event *Event) error // Blocks + PublishAsync(ctx context.Context, event *Event) error // Non-blocking + Subscribe(pattern string, handler EventHandler) (SubscriptionID, error) + Unsubscribe(id SubscriptionID) error + Start(ctx context.Context) error + Stop(ctx context.Context) error + Stats(ctx context.Context) (*BrokerStats, error) +} +``` + +## Implementation Steps + +### Phase 1: Core Foundation (Files: 1-5) + +**1. Create `pkg/eventbroker/event.go`** +- Event struct with all required fields (status, timestamps, user, instance ID, etc.) +- EventSource enum (database, websocket, frontend, system, internal) +- EventStatus enum (pending, processing, completed, failed) +- Helper: `EventType(schema, entity, operation string) string` +- Helper: `NewEvent()` constructor with UUID generation + +**2. Create `pkg/eventbroker/provider.go`** +- Provider interface definition +- EventFilter struct for queries +- ProviderStats struct + +**3. Create `pkg/eventbroker/handler.go`** +- EventHandler interface +- EventHandlerFunc adapter type + +**4. Create `pkg/eventbroker/broker.go`** +- Broker interface definition +- EventBroker struct implementation +- ProcessingMode enum (sync, async) +- Options struct with functional options (WithProvider, WithMode, WithWorkerCount, etc.) +- NewBroker() constructor +- Sync processing implementation + +**5. Create `pkg/eventbroker/subscription.go`** +- Pattern matching using glob syntax (e.g., "public.users.*", "*.*.create") +- subscriptionManager struct +- SubscriptionID type +- Subscribe/Unsubscribe logic + +### Phase 2: Configuration & Integration (Files: 6-8) + +**6. Create `pkg/eventbroker/config.go`** +- EventBrokerConfig struct +- RedisConfig, NATSConfig, DatabaseConfig structs +- RetryPolicyConfig struct + +**7. Update `pkg/config/config.go`** +- Add `EventBroker EventBrokerConfig` field to Config struct + +**8. Update `pkg/config/manager.go`** +- Add event broker defaults to `setDefaults()`: + ```go + v.SetDefault("event_broker.enabled", false) + v.SetDefault("event_broker.provider", "memory") + v.SetDefault("event_broker.mode", "async") + v.SetDefault("event_broker.worker_count", 10) + v.SetDefault("event_broker.buffer_size", 1000) + ``` + +### Phase 3: Memory Provider (Files: 9) + +**9. Create `pkg/eventbroker/provider_memory.go`** +- MemoryProvider struct with mutex-protected map +- In-memory event storage +- Pattern matching for subscriptions +- Channel-based streaming for real-time events +- LRU eviction when max size reached +- Cleanup goroutine for old completed events +- **Note**: Single-instance only (no cross-instance pub/sub) + +### Phase 4: Async Processing (Update File: 4) + +**10. Update `pkg/eventbroker/broker.go`** (add async support) +- workerPool struct with configurable worker count +- Buffered channel for event queue +- Worker goroutines that process events +- PublishAsync() queues to channel +- Graceful shutdown: stop accepting events, drain queue, wait for workers +- Retry logic with exponential backoff + +### Phase 5: Hook Integration (Files: 11) + +**11. Create `pkg/eventbroker/hooks.go`** +- `RegisterCRUDHooks(broker Broker, hookRegistry *restheadspec.HookRegistry)` +- Registers AfterCreate, AfterUpdate, AfterDelete, AfterRead hooks +- Extracts UserContext from hook context +- Creates Event with proper metadata +- Calls `broker.PublishAsync()` to not block CRUD operations + +### Phase 6: Global Singleton & Factory (Files: 12-13) + +**12. Create `pkg/eventbroker/eventbroker.go`** +- Global `defaultBroker` variable +- `Initialize(config *config.Config) error` - creates broker from config +- `SetDefaultBroker(broker Broker)` +- `GetDefaultBroker() Broker` +- Helper functions: `Publish()`, `PublishAsync()`, `PublishSync()`, `Subscribe()` +- `RegisterShutdown(broker Broker)` - registers with server.RegisterShutdownCallback() + +**13. Create `pkg/eventbroker/factory.go`** +- `NewProviderFromConfig(config EventBrokerConfig) (Provider, error)` +- Provider selection logic (memory, redis, nats, database) +- Returns appropriate provider based on config + +### Phase 7: Redis Provider (Files: 14) + +**14. Create `pkg/eventbroker/provider_redis.go`** +- RedisProvider using Redis Streams (XADD, XREAD, XGROUP) +- Consumer group for distributed processing +- Cross-instance pub/sub support +- Stream(pattern) subscribes to consumer group +- Publish() uses XADD to append to stream +- Graceful shutdown: acknowledge pending messages + +**Dependencies**: `github.com/redis/go-redis/v9` + +### Phase 8: NATS Provider (Files: 15) + +**15. Create `pkg/eventbroker/provider_nats.go`** +- NATSProvider using NATS JetStream +- Subject-based routing: `events.{source}.{type}` +- Wildcard subscriptions support +- Durable consumers for replay +- At-least-once delivery semantics + +**Dependencies**: `github.com/nats-io/nats.go` + +### Phase 9: Database Provider (Files: 16) + +**16. Create `pkg/eventbroker/provider_database.go`** +- DatabaseProvider using `common.Database` interface +- Table schema creation (events table with indexes) +- Polling-based event consumption (configurable interval) +- Full SQL query support via List(filter) +- Transaction support for atomic operations +- Good for audit trails and debugging + +### Phase 10: Metrics Integration (Files: 17) + +**17. Create `pkg/eventbroker/metrics.go`** +- Integrate with existing `metrics.Provider` +- Record metrics: + - `eventbroker_events_published_total{source, type}` + - `eventbroker_events_processed_total{source, type, status}` + - `eventbroker_event_processing_duration_seconds{source, type}` + - `eventbroker_queue_size` + - `eventbroker_workers_active` + +**18. Update `pkg/metrics/interfaces.go`** +- Add methods to Provider interface: + ```go + RecordEventPublished(source, eventType string) + RecordEventProcessed(source, eventType, status string, duration time.Duration) + UpdateEventQueueSize(size int64) + ``` + +### Phase 11: Testing & Examples (Files: 19-20) + +**19. Create `pkg/eventbroker/eventbroker_test.go`** +- Unit tests for Event marshaling +- Pattern matching tests +- MemoryProvider tests +- Sync vs async mode tests +- Concurrent publish/subscribe tests +- Graceful shutdown tests + +**20. Create `pkg/eventbroker/example_usage.go`** +- Basic publish example +- Subscribe with patterns example +- Hook integration example +- Multiple handlers example +- Error handling example + +## Integration Points + +### Hook System Integration +```go +// In application initialization (e.g., main.go) +eventbroker.RegisterCRUDHooks(broker, handler.Hooks()) +``` + +This automatically publishes events for all CRUD operations: +- `schema.entity.create` after inserts +- `schema.entity.update` after updates +- `schema.entity.delete` after deletes +- `schema.entity.read` after reads + +### Shutdown Integration +```go +// In application initialization +eventbroker.RegisterShutdown(broker) +``` + +Ensures event broker flushes pending events before shutdown. + +### Configuration Example +```yaml +event_broker: + enabled: true + provider: redis # memory, redis, nats, database + mode: async # sync, async + worker_count: 10 + buffer_size: 1000 + instance_id: "${HOSTNAME}" + + redis: + stream_name: "resolvespec:events" + consumer_group: "resolvespec-workers" + host: "localhost" + port: 6379 +``` + +## Usage Examples + +### Publishing Custom Events +```go +// WebSocket event +event := &eventbroker.Event{ + Source: eventbroker.EventSourceWebSocket, + Type: "chat.message", + Payload: json.RawMessage(`{"room": "lobby", "msg": "Hello"}`), + UserID: userID, + SessionID: sessionID, +} +eventbroker.PublishAsync(ctx, event) +``` + +### Subscribing to Events +```go +// Subscribe to all user creation events +eventbroker.Subscribe("public.users.create", eventbroker.EventHandlerFunc( + func(ctx context.Context, event *eventbroker.Event) error { + log.Printf("New user created: %s", event.Payload) + // Send welcome email, update cache, etc. + return nil + }, +)) + +// Subscribe to all events from database +eventbroker.Subscribe("*", eventbroker.EventHandlerFunc( + func(ctx context.Context, event *eventbroker.Event) error { + if event.Source == eventbroker.EventSourceDatabase { + // Audit logging + } + return nil + }, +)) +``` + +## Critical Files Reference + +**Patterns to Follow**: +- `pkg/cache/provider.go` - Provider interface pattern +- `pkg/restheadspec/hooks.go` - Hook system integration +- `pkg/config/manager.go` - Configuration pattern +- `pkg/server/shutdown.go` - Shutdown callbacks + +**Files to Modify**: +- `pkg/config/config.go` - Add EventBroker field +- `pkg/config/manager.go` - Add defaults +- `pkg/metrics/interfaces.go` - Add event broker metrics + +**New Package**: +- `pkg/eventbroker/` (20 files total) + +## Provider Feature Matrix + +| Feature | Memory | Redis | NATS | Database | +|---------|--------|-------|------|----------| +| Persistence | ❌ | ✅ | ✅ | ✅ | +| Cross-instance | ❌ | ✅ | ✅ | ✅ | +| Real-time | ✅ | ✅ | ✅ | ⚠️ (polling) | +| Query history | Limited | Limited | ✅ (replay) | ✅ (SQL) | +| External deps | None | Redis | NATS | None | +| Complexity | Low | Medium | Medium | Low | + +## Implementation Order Priority + +1. **Core + Memory Provider** (Phase 1-3) - Functional in-process event system +2. **Async + Hooks** (Phase 4-5) - Non-blocking event dispatch integrated with CRUD +3. **Config + Singleton** (Phase 6) - Easy initialization and usage +4. **Redis Provider** (Phase 7) - Production-ready distributed events +5. **Metrics** (Phase 10) - Observability +6. **NATS/Database** (Phase 8-9) - Alternative backends +7. **Tests + Examples** (Phase 11) - Documentation and reliability + +## Next Steps + +After approval, implement in order of phases. Each phase builds on previous phases and can be tested independently. diff --git a/pkg/eventbroker/README.md b/pkg/eventbroker/README.md index aed4861..333315c 100644 --- a/pkg/eventbroker/README.md +++ b/pkg/eventbroker/README.md @@ -172,12 +172,13 @@ event_broker: provider: memory ``` -### Redis Provider (Future) +### Redis Provider Best for: Production, multi-instance deployments -- **Pros**: Persistent, cross-instance pub/sub, reliable -- **Cons**: Requires Redis +- **Pros**: Persistent, cross-instance pub/sub, reliable, Redis Streams support +- **Cons**: Requires Redis server +- **Status**: ✅ Implemented ```yaml event_broker: @@ -185,16 +186,20 @@ event_broker: redis: stream_name: "resolvespec:events" consumer_group: "resolvespec-workers" + max_len: 10000 host: "localhost" port: 6379 + password: "" + db: 0 ``` -### NATS Provider (Future) +### NATS Provider Best for: High-performance, low-latency requirements -- **Pros**: Very fast, built-in clustering, durable +- **Pros**: Very fast, built-in clustering, durable, JetStream support - **Cons**: Requires NATS server +- **Status**: ✅ Implemented ```yaml event_broker: @@ -202,14 +207,17 @@ event_broker: nats: url: "nats://localhost:4222" stream_name: "RESOLVESPEC_EVENTS" + storage: "file" # or "memory" + max_age: "24h" ``` -### Database Provider (Future) +### Database Provider Best for: Audit trails, event replay, SQL queries - **Pros**: No additional infrastructure, full SQL query support, PostgreSQL NOTIFY for real-time -- **Cons**: Slower than Redis/NATS +- **Cons**: Slower than Redis/NATS, requires database connection +- **Status**: ✅ Implemented ```yaml event_broker: @@ -217,6 +225,7 @@ event_broker: database: table_name: "events" channel: "resolvespec_events" + poll_interval: "1s" ``` ## Processing Modes @@ -314,14 +323,25 @@ See `example_usage.go` for comprehensive examples including: └─────────────────┘ ``` +## Implemented Features + +- [x] Memory Provider (in-process, single-instance) +- [x] Redis Streams Provider (distributed, persistent) +- [x] NATS JetStream Provider (distributed, high-performance) +- [x] Database Provider with PostgreSQL NOTIFY (SQL-queryable, audit-friendly) +- [x] Sync and Async processing modes +- [x] Pattern-based subscriptions +- [x] Hook integration for automatic CRUD events +- [x] Retry policy with exponential backoff +- [x] Graceful shutdown + ## Future Enhancements -- [ ] Database Provider with PostgreSQL NOTIFY -- [ ] Redis Streams Provider -- [ ] NATS JetStream Provider -- [ ] Event replay functionality -- [ ] Dead letter queue -- [ ] Event filtering at provider level -- [ ] Batch publishing -- [ ] Event compression -- [ ] Schema versioning +- [ ] Event replay functionality from specific timestamp +- [ ] Dead letter queue for failed events +- [ ] Event filtering at provider level for performance +- [ ] Batch publishing support +- [ ] Event compression for large payloads +- [ ] Schema versioning and migration +- [ ] Event streaming to external systems (Kafka, RabbitMQ) +- [ ] Event aggregation and analytics diff --git a/pkg/eventbroker/eventbroker.go b/pkg/eventbroker/eventbroker.go index d7e9ee2..3d9428b 100644 --- a/pkg/eventbroker/eventbroker.go +++ b/pkg/eventbroker/eventbroker.go @@ -7,7 +7,6 @@ import ( "github.com/bitechdev/ResolveSpec/pkg/config" "github.com/bitechdev/ResolveSpec/pkg/logger" - "github.com/bitechdev/ResolveSpec/pkg/server" ) var ( @@ -69,9 +68,6 @@ func Initialize(cfg config.EventBrokerConfig) error { // Set as default SetDefaultBroker(broker) - // Register shutdown callback - RegisterShutdown(broker) - logger.Info("Event broker initialized successfully (provider: %s, mode: %s, instance: %s)", cfg.Provider, cfg.Mode, opts.InstanceID) @@ -151,10 +147,12 @@ func Stats(ctx context.Context) (*BrokerStats, error) { return broker.Stats(ctx) } -// RegisterShutdown registers the broker's shutdown with the server shutdown callbacks -func RegisterShutdown(broker Broker) { - server.RegisterShutdownCallback(func(ctx context.Context) error { +// RegisterShutdown registers the broker's shutdown with a server manager +// Call this from your application initialization code +// Example: serverMgr.RegisterShutdownCallback(eventbroker.MakeShutdownCallback(broker)) +func MakeShutdownCallback(broker Broker) func(context.Context) error { + return func(ctx context.Context) error { logger.Info("Shutting down event broker...") return broker.Stop(ctx) - }) + } } diff --git a/pkg/eventbroker/factory.go b/pkg/eventbroker/factory.go index df35219..560cbc4 100644 --- a/pkg/eventbroker/factory.go +++ b/pkg/eventbroker/factory.go @@ -24,16 +24,34 @@ func NewProviderFromConfig(cfg config.EventBrokerConfig) (Provider, error) { }), nil case "redis": - // Redis provider will be implemented in Phase 8 - return nil, fmt.Errorf("redis provider not yet implemented") + return NewRedisProvider(RedisProviderConfig{ + Host: cfg.Redis.Host, + Port: cfg.Redis.Port, + Password: cfg.Redis.Password, + DB: cfg.Redis.DB, + StreamName: cfg.Redis.StreamName, + ConsumerGroup: cfg.Redis.ConsumerGroup, + ConsumerName: getInstanceID(cfg.InstanceID), + InstanceID: getInstanceID(cfg.InstanceID), + MaxLen: cfg.Redis.MaxLen, + }) case "nats": - // NATS provider will be implemented in Phase 9 - return nil, fmt.Errorf("nats provider not yet implemented") + // NATS provider initialization + // Note: Requires github.com/nats-io/nats.go dependency + return NewNATSProvider(NATSProviderConfig{ + URL: cfg.NATS.URL, + StreamName: cfg.NATS.StreamName, + SubjectPrefix: "events", + InstanceID: getInstanceID(cfg.InstanceID), + MaxAge: cfg.NATS.MaxAge, + Storage: cfg.NATS.Storage, // "file" or "memory" + }) case "database": - // Database provider will be implemented in Phase 7 - return nil, fmt.Errorf("database provider not yet implemented") + // Database provider requires a database connection + // This should be provided externally + return nil, fmt.Errorf("database provider requires a database connection to be configured separately") default: return nil, fmt.Errorf("unknown provider: %s", cfg.Provider) diff --git a/pkg/eventbroker/provider_database.go b/pkg/eventbroker/provider_database.go new file mode 100644 index 0000000..0379100 --- /dev/null +++ b/pkg/eventbroker/provider_database.go @@ -0,0 +1,653 @@ +package eventbroker + +import ( + "context" + "database/sql" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/bitechdev/ResolveSpec/pkg/common" + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// DatabaseProvider implements Provider interface using SQL database +// Features: +// - Persistent event storage in database table +// - Full SQL query support for event history +// - PostgreSQL NOTIFY/LISTEN for real-time updates (optional) +// - Polling-based consumption with configurable interval +// - Good for audit trails and event replay +type DatabaseProvider struct { + db common.Database + tableName string + channel string // PostgreSQL NOTIFY channel name + pollInterval time.Duration + instanceID string + useNotify bool // Whether to use PostgreSQL NOTIFY + + // Subscriptions + mu sync.RWMutex + subscribers map[string]*dbSubscription + + // Statistics + stats DatabaseProviderStats + + // Lifecycle + stopPolling chan struct{} + wg sync.WaitGroup + isRunning atomic.Bool +} + +// DatabaseProviderStats contains statistics for the database provider +type DatabaseProviderStats struct { + TotalEvents atomic.Int64 + EventsPublished atomic.Int64 + EventsConsumed atomic.Int64 + ActiveSubscribers atomic.Int32 + PollErrors atomic.Int64 +} + +// dbSubscription represents a single database subscription +type dbSubscription struct { + pattern string + ch chan *Event + lastSeenID string + ctx context.Context + cancel context.CancelFunc +} + +// DatabaseProviderConfig configures the database provider +type DatabaseProviderConfig struct { + DB common.Database + TableName string + Channel string // PostgreSQL NOTIFY channel (optional) + PollInterval time.Duration + InstanceID string + UseNotify bool // Enable PostgreSQL NOTIFY/LISTEN +} + +// NewDatabaseProvider creates a new database event provider +func NewDatabaseProvider(cfg DatabaseProviderConfig) (*DatabaseProvider, error) { + // Apply defaults + if cfg.TableName == "" { + cfg.TableName = "events" + } + if cfg.Channel == "" { + cfg.Channel = "resolvespec_events" + } + if cfg.PollInterval == 0 { + cfg.PollInterval = 1 * time.Second + } + + dp := &DatabaseProvider{ + db: cfg.DB, + tableName: cfg.TableName, + channel: cfg.Channel, + pollInterval: cfg.PollInterval, + instanceID: cfg.InstanceID, + useNotify: cfg.UseNotify, + subscribers: make(map[string]*dbSubscription), + stopPolling: make(chan struct{}), + } + + dp.isRunning.Store(true) + + // Create table if it doesn't exist + ctx := context.Background() + if err := dp.createTable(ctx); err != nil { + return nil, fmt.Errorf("failed to create events table: %w", err) + } + + // Start polling goroutine for subscriptions + dp.wg.Add(1) + go dp.pollLoop() + + logger.Info("Database provider initialized (table: %s, poll_interval: %v, notify: %v)", + cfg.TableName, cfg.PollInterval, cfg.UseNotify) + + return dp, nil +} + +// Store stores an event +func (dp *DatabaseProvider) Store(ctx context.Context, event *Event) error { + // Marshal metadata to JSON + metadataJSON, err := json.Marshal(event.Metadata) + if err != nil { + return fmt.Errorf("failed to marshal metadata: %w", err) + } + + // Insert event + query := fmt.Sprintf(` + INSERT INTO %s ( + id, source, type, status, retry_count, error, + payload, user_id, session_id, instance_id, + schema, entity, operation, + created_at, processed_at, completed_at, metadata + ) VALUES ( + $1, $2, $3, $4, $5, $6, + $7, $8, $9, $10, + $11, $12, $13, + $14, $15, $16, $17 + ) + `, dp.tableName) + + _, err = dp.db.Exec(ctx, query, + event.ID, event.Source, event.Type, event.Status, event.RetryCount, event.Error, + event.Payload, event.UserID, event.SessionID, event.InstanceID, + event.Schema, event.Entity, event.Operation, + event.CreatedAt, event.ProcessedAt, event.CompletedAt, metadataJSON, + ) + + if err != nil { + return fmt.Errorf("failed to insert event: %w", err) + } + + dp.stats.TotalEvents.Add(1) + return nil +} + +// Get retrieves an event by ID +func (dp *DatabaseProvider) Get(ctx context.Context, id string) (*Event, error) { + event := &Event{} + var metadataJSON []byte + var processedAt, completedAt sql.NullTime + + // Query into individual fields + query := fmt.Sprintf(` + SELECT id, source, type, status, retry_count, error, + payload, user_id, session_id, instance_id, + schema, entity, operation, + created_at, processed_at, completed_at, metadata + FROM %s + WHERE id = $1 + `, dp.tableName) + + var source, eventType, status, operation string + + // Execute raw query + rows, err := dp.db.GetUnderlyingDB().(interface { + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + }).QueryContext(ctx, query, id) + if err != nil { + return nil, fmt.Errorf("failed to query event: %w", err) + } + defer rows.Close() + + if !rows.Next() { + return nil, fmt.Errorf("event not found: %s", id) + } + + if err := rows.Scan( + &event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error, + &event.Payload, &event.UserID, &event.SessionID, &event.InstanceID, + &event.Schema, &event.Entity, &operation, + &event.CreatedAt, &processedAt, &completedAt, &metadataJSON, + ); err != nil { + return nil, fmt.Errorf("failed to scan event: %w", err) + } + + // Set enum values + event.Source = EventSource(source) + event.Type = eventType + event.Status = EventStatus(status) + event.Operation = operation + + // Handle nullable timestamps + if processedAt.Valid { + event.ProcessedAt = &processedAt.Time + } + if completedAt.Valid { + event.CompletedAt = &completedAt.Time + } + + // Unmarshal metadata + if len(metadataJSON) > 0 { + if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil { + logger.Warn("Failed to unmarshal metadata: %v", err) + } + } + + return event, nil +} + +// List lists events with optional filters +func (dp *DatabaseProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error) { + query := fmt.Sprintf("SELECT id, source, type, status, retry_count, error, "+ + "payload, user_id, session_id, instance_id, "+ + "schema, entity, operation, "+ + "created_at, processed_at, completed_at, metadata "+ + "FROM %s WHERE 1=1", dp.tableName) + + args := []interface{}{} + argNum := 1 + + // Build WHERE clause + if filter != nil { + if filter.Source != nil { + query += fmt.Sprintf(" AND source = $%d", argNum) + args = append(args, string(*filter.Source)) + argNum++ + } + if filter.Status != nil { + query += fmt.Sprintf(" AND status = $%d", argNum) + args = append(args, string(*filter.Status)) + argNum++ + } + if filter.UserID != nil { + query += fmt.Sprintf(" AND user_id = $%d", argNum) + args = append(args, *filter.UserID) + argNum++ + } + if filter.Schema != "" { + query += fmt.Sprintf(" AND schema = $%d", argNum) + args = append(args, filter.Schema) + argNum++ + } + if filter.Entity != "" { + query += fmt.Sprintf(" AND entity = $%d", argNum) + args = append(args, filter.Entity) + argNum++ + } + if filter.Operation != "" { + query += fmt.Sprintf(" AND operation = $%d", argNum) + args = append(args, filter.Operation) + argNum++ + } + if filter.InstanceID != "" { + query += fmt.Sprintf(" AND instance_id = $%d", argNum) + args = append(args, filter.InstanceID) + argNum++ + } + if filter.StartTime != nil { + query += fmt.Sprintf(" AND created_at >= $%d", argNum) + args = append(args, *filter.StartTime) + argNum++ + } + if filter.EndTime != nil { + query += fmt.Sprintf(" AND created_at <= $%d", argNum) + args = append(args, *filter.EndTime) + argNum++ + } + } + + // Add ORDER BY + query += " ORDER BY created_at DESC" + + // Add LIMIT and OFFSET + if filter != nil { + if filter.Limit > 0 { + query += fmt.Sprintf(" LIMIT $%d", argNum) + args = append(args, filter.Limit) + argNum++ + } + if filter.Offset > 0 { + query += fmt.Sprintf(" OFFSET $%d", argNum) + args = append(args, filter.Offset) + } + } + + // Execute query + rows, err := dp.db.GetUnderlyingDB().(interface { + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + }).QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("failed to query events: %w", err) + } + defer rows.Close() + + var results []*Event + for rows.Next() { + event := &Event{} + var source, eventType, status, operation string + var metadataJSON []byte + var processedAt, completedAt sql.NullTime + + err := rows.Scan( + &event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error, + &event.Payload, &event.UserID, &event.SessionID, &event.InstanceID, + &event.Schema, &event.Entity, &operation, + &event.CreatedAt, &processedAt, &completedAt, &metadataJSON, + ) + if err != nil { + logger.Warn("Failed to scan event: %v", err) + continue + } + + // Set enum values + event.Source = EventSource(source) + event.Type = eventType + event.Status = EventStatus(status) + event.Operation = operation + + // Handle nullable timestamps + if processedAt.Valid { + event.ProcessedAt = &processedAt.Time + } + if completedAt.Valid { + event.CompletedAt = &completedAt.Time + } + + // Unmarshal metadata + if len(metadataJSON) > 0 { + if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil { + logger.Warn("Failed to unmarshal metadata: %v", err) + } + } + + results = append(results, event) + } + + return results, nil +} + +// UpdateStatus updates the status of an event +func (dp *DatabaseProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error { + query := fmt.Sprintf(` + UPDATE %s + SET status = $1, error = $2 + WHERE id = $3 + `, dp.tableName) + + _, err := dp.db.Exec(ctx, query, string(status), errorMsg, id) + if err != nil { + return fmt.Errorf("failed to update status: %w", err) + } + + return nil +} + +// Delete deletes an event by ID +func (dp *DatabaseProvider) Delete(ctx context.Context, id string) error { + query := fmt.Sprintf("DELETE FROM %s WHERE id = $1", dp.tableName) + + _, err := dp.db.Exec(ctx, query, id) + if err != nil { + return fmt.Errorf("failed to delete event: %w", err) + } + + dp.stats.TotalEvents.Add(-1) + return nil +} + +// Stream returns a channel of events for real-time consumption +func (dp *DatabaseProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error) { + ch := make(chan *Event, 100) + + subCtx, cancel := context.WithCancel(ctx) + + sub := &dbSubscription{ + pattern: pattern, + ch: ch, + lastSeenID: "", + ctx: subCtx, + cancel: cancel, + } + + dp.mu.Lock() + dp.subscribers[pattern] = sub + dp.stats.ActiveSubscribers.Add(1) + dp.mu.Unlock() + + return ch, nil +} + +// Publish publishes an event to all subscribers +func (dp *DatabaseProvider) Publish(ctx context.Context, event *Event) error { + // Store the event first + if err := dp.Store(ctx, event); err != nil { + return err + } + + dp.stats.EventsPublished.Add(1) + + // If using PostgreSQL NOTIFY, send notification + if dp.useNotify { + if err := dp.notify(ctx, event.ID); err != nil { + logger.Warn("Failed to send NOTIFY: %v", err) + } + } + + return nil +} + +// Close closes the provider and releases resources +func (dp *DatabaseProvider) Close() error { + if !dp.isRunning.Load() { + return nil + } + + dp.isRunning.Store(false) + + // Cancel all subscriptions + dp.mu.Lock() + for _, sub := range dp.subscribers { + sub.cancel() + } + dp.mu.Unlock() + + // Stop polling + close(dp.stopPolling) + + // Wait for goroutines + dp.wg.Wait() + + logger.Info("Database provider closed") + return nil +} + +// Stats returns provider statistics +func (dp *DatabaseProvider) Stats(ctx context.Context) (*ProviderStats, error) { + // Get counts by status + query := fmt.Sprintf(` + SELECT + COUNT(*) FILTER (WHERE status = 'pending') as pending, + COUNT(*) FILTER (WHERE status = 'processing') as processing, + COUNT(*) FILTER (WHERE status = 'completed') as completed, + COUNT(*) FILTER (WHERE status = 'failed') as failed, + COUNT(*) as total + FROM %s + `, dp.tableName) + + var pending, processing, completed, failed, total int64 + + rows, err := dp.db.GetUnderlyingDB().(interface { + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + }).QueryContext(ctx, query) + if err != nil { + logger.Warn("Failed to get stats: %v", err) + } else { + defer rows.Close() + if rows.Next() { + if err := rows.Scan(&pending, &processing, &completed, &failed, &total); err != nil { + logger.Warn("Failed to scan stats: %v", err) + } + } + } + + return &ProviderStats{ + ProviderType: "database", + TotalEvents: total, + PendingEvents: pending, + ProcessingEvents: processing, + CompletedEvents: completed, + FailedEvents: failed, + EventsPublished: dp.stats.EventsPublished.Load(), + EventsConsumed: dp.stats.EventsConsumed.Load(), + ActiveSubscribers: int(dp.stats.ActiveSubscribers.Load()), + ProviderSpecific: map[string]interface{}{ + "table_name": dp.tableName, + "poll_interval": dp.pollInterval.String(), + "use_notify": dp.useNotify, + "poll_errors": dp.stats.PollErrors.Load(), + }, + }, nil +} + +// pollLoop periodically polls for new events +func (dp *DatabaseProvider) pollLoop() { + defer dp.wg.Done() + + ticker := time.NewTicker(dp.pollInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + dp.pollEvents() + case <-dp.stopPolling: + return + } + } +} + +// pollEvents polls for new events and delivers to subscribers +func (dp *DatabaseProvider) pollEvents() { + dp.mu.RLock() + subscribers := make([]*dbSubscription, 0, len(dp.subscribers)) + for _, sub := range dp.subscribers { + subscribers = append(subscribers, sub) + } + dp.mu.RUnlock() + + for _, sub := range subscribers { + // Query for new events since last seen + query := fmt.Sprintf(` + SELECT id, source, type, status, retry_count, error, + payload, user_id, session_id, instance_id, + schema, entity, operation, + created_at, processed_at, completed_at, metadata + FROM %s + WHERE id > $1 + ORDER BY created_at ASC + LIMIT 100 + `, dp.tableName) + + lastSeenID := sub.lastSeenID + if lastSeenID == "" { + lastSeenID = "00000000-0000-0000-0000-000000000000" + } + + rows, err := dp.db.GetUnderlyingDB().(interface { + QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) + }).QueryContext(sub.ctx, query, lastSeenID) + if err != nil { + dp.stats.PollErrors.Add(1) + logger.Warn("Failed to poll events: %v", err) + continue + } + + for rows.Next() { + event := &Event{} + var source, eventType, status, operation string + var metadataJSON []byte + var processedAt, completedAt sql.NullTime + + err := rows.Scan( + &event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error, + &event.Payload, &event.UserID, &event.SessionID, &event.InstanceID, + &event.Schema, &event.Entity, &operation, + &event.CreatedAt, &processedAt, &completedAt, &metadataJSON, + ) + if err != nil { + logger.Warn("Failed to scan event: %v", err) + continue + } + + // Set enum values + event.Source = EventSource(source) + event.Type = eventType + event.Status = EventStatus(status) + event.Operation = operation + + // Handle nullable timestamps + if processedAt.Valid { + event.ProcessedAt = &processedAt.Time + } + if completedAt.Valid { + event.CompletedAt = &completedAt.Time + } + + // Unmarshal metadata + if len(metadataJSON) > 0 { + if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil { + logger.Warn("Failed to unmarshal metadata: %v", err) + } + } + + // Check if event matches pattern + if matchPattern(sub.pattern, event.Type) { + select { + case sub.ch <- event: + dp.stats.EventsConsumed.Add(1) + sub.lastSeenID = event.ID + case <-sub.ctx.Done(): + rows.Close() + return + default: + // Channel full, skip + logger.Warn("Subscriber channel full for pattern: %s", sub.pattern) + } + } + + sub.lastSeenID = event.ID + } + + rows.Close() + } +} + +// notify sends a PostgreSQL NOTIFY message +func (dp *DatabaseProvider) notify(ctx context.Context, eventID string) error { + query := fmt.Sprintf("NOTIFY %s, '%s'", dp.channel, eventID) + _, err := dp.db.Exec(ctx, query) + return err +} + +// createTable creates the events table if it doesn't exist +func (dp *DatabaseProvider) createTable(ctx context.Context) error { + query := fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + id VARCHAR(255) PRIMARY KEY, + source VARCHAR(50) NOT NULL, + type VARCHAR(255) NOT NULL, + status VARCHAR(50) NOT NULL, + retry_count INTEGER DEFAULT 0, + error TEXT, + payload JSONB, + user_id INTEGER, + session_id VARCHAR(255), + instance_id VARCHAR(255), + schema VARCHAR(255), + entity VARCHAR(255), + operation VARCHAR(50), + created_at TIMESTAMP NOT NULL DEFAULT NOW(), + processed_at TIMESTAMP, + completed_at TIMESTAMP, + metadata JSONB + ) + `, dp.tableName) + + if _, err := dp.db.Exec(ctx, query); err != nil { + return fmt.Errorf("failed to create table: %w", err) + } + + // Create indexes + indexes := []string{ + fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_source ON %s(source)", dp.tableName, dp.tableName), + fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_type ON %s(type)", dp.tableName, dp.tableName), + fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_status ON %s(status)", dp.tableName, dp.tableName), + fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_created_at ON %s(created_at)", dp.tableName, dp.tableName), + fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_instance_id ON %s(instance_id)", dp.tableName, dp.tableName), + } + + for _, indexQuery := range indexes { + if _, err := dp.db.Exec(ctx, indexQuery); err != nil { + logger.Warn("Failed to create index: %v", err) + } + } + + return nil +} diff --git a/pkg/eventbroker/provider_nats.go b/pkg/eventbroker/provider_nats.go new file mode 100644 index 0000000..c2a4bd8 --- /dev/null +++ b/pkg/eventbroker/provider_nats.go @@ -0,0 +1,565 @@ +package eventbroker + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// NATSProvider implements Provider interface using NATS JetStream +// Features: +// - Persistent event storage using JetStream +// - Cross-instance pub/sub using NATS subjects +// - Wildcard subscription support +// - Durable consumers for event replay +// - At-least-once delivery semantics +type NATSProvider struct { + nc *nats.Conn + js jetstream.JetStream + stream jetstream.Stream + streamName string + subjectPrefix string + instanceID string + maxAge time.Duration + + // Subscriptions + mu sync.RWMutex + subscribers map[string]*natsSubscription + + // Statistics + stats NATSProviderStats + + // Lifecycle + wg sync.WaitGroup + isRunning atomic.Bool +} + +// NATSProviderStats contains statistics for the NATS provider +type NATSProviderStats struct { + TotalEvents atomic.Int64 + EventsPublished atomic.Int64 + EventsConsumed atomic.Int64 + ActiveSubscribers atomic.Int32 + ConsumerErrors atomic.Int64 +} + +// natsSubscription represents a single NATS subscription +type natsSubscription struct { + pattern string + consumer jetstream.Consumer + ch chan *Event + ctx context.Context + cancel context.CancelFunc +} + +// NATSProviderConfig configures the NATS provider +type NATSProviderConfig struct { + URL string + StreamName string + SubjectPrefix string // e.g., "events" + InstanceID string + MaxAge time.Duration // How long to keep events + Storage string // "file" or "memory" +} + +// NewNATSProvider creates a new NATS event provider +func NewNATSProvider(cfg NATSProviderConfig) (*NATSProvider, error) { + // Apply defaults + if cfg.URL == "" { + cfg.URL = nats.DefaultURL + } + if cfg.StreamName == "" { + cfg.StreamName = "RESOLVESPEC_EVENTS" + } + if cfg.SubjectPrefix == "" { + cfg.SubjectPrefix = "events" + } + if cfg.MaxAge == 0 { + cfg.MaxAge = 7 * 24 * time.Hour // 7 days + } + if cfg.Storage == "" { + cfg.Storage = "file" + } + + // Connect to NATS + nc, err := nats.Connect(cfg.URL, + nats.Name("resolvespec-eventbroker-"+cfg.InstanceID), + nats.Timeout(5*time.Second), + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to NATS: %w", err) + } + + // Create JetStream context + js, err := jetstream.New(nc) + if err != nil { + nc.Close() + return nil, fmt.Errorf("failed to create JetStream context: %w", err) + } + + np := &NATSProvider{ + nc: nc, + js: js, + streamName: cfg.StreamName, + subjectPrefix: cfg.SubjectPrefix, + instanceID: cfg.InstanceID, + maxAge: cfg.MaxAge, + subscribers: make(map[string]*natsSubscription), + } + + np.isRunning.Store(true) + + // Create or update stream + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Determine storage type + var storage jetstream.StorageType + if cfg.Storage == "memory" { + storage = jetstream.MemoryStorage + } else { + storage = jetstream.FileStorage + } + + if err := np.ensureStream(ctx, storage); err != nil { + nc.Close() + return nil, fmt.Errorf("failed to create stream: %w", err) + } + + logger.Info("NATS provider initialized (stream: %s, subject: %s.*, url: %s)", + cfg.StreamName, cfg.SubjectPrefix, cfg.URL) + + return np, nil +} + +// Store stores an event +func (np *NATSProvider) Store(ctx context.Context, event *Event) error { + // Marshal event to JSON + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + + // Publish to NATS subject + // Subject format: events.{source}.{schema}.{entity}.{operation} + subject := np.buildSubject(event) + + msg := &nats.Msg{ + Subject: subject, + Data: data, + Header: nats.Header{ + "Event-ID": []string{event.ID}, + "Event-Type": []string{event.Type}, + "Event-Source": []string{string(event.Source)}, + "Event-Status": []string{string(event.Status)}, + "Instance-ID": []string{event.InstanceID}, + }, + } + + if _, err := np.js.PublishMsg(ctx, msg); err != nil { + return fmt.Errorf("failed to publish event: %w", err) + } + + np.stats.TotalEvents.Add(1) + return nil +} + +// Get retrieves an event by ID +// Note: This is inefficient with JetStream - consider using a separate KV store for lookups +func (np *NATSProvider) Get(ctx context.Context, id string) (*Event, error) { + // We need to scan messages which is not ideal + // For production, consider using NATS KV store for fast lookups + consumer, err := np.stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Name: "get-" + id, + FilterSubject: np.subjectPrefix + ".>", + DeliverPolicy: jetstream.DeliverAllPolicy, + AckPolicy: jetstream.AckExplicitPolicy, + }) + if err != nil { + return nil, fmt.Errorf("failed to create consumer: %w", err) + } + + // Fetch messages in batches + msgs, err := consumer.Fetch(1000, jetstream.FetchMaxWait(5*time.Second)) + if err != nil { + return nil, fmt.Errorf("failed to fetch messages: %w", err) + } + + for msg := range msgs.Messages() { + if msg.Headers().Get("Event-ID") == id { + var event Event + if err := json.Unmarshal(msg.Data(), &event); err != nil { + _ = msg.Nak() + continue + } + _ = msg.Ack() + + // Delete temporary consumer + _ = np.stream.DeleteConsumer(ctx, "get-"+id) + + return &event, nil + } + _ = msg.Ack() + } + + // Delete temporary consumer + _ = np.stream.DeleteConsumer(ctx, "get-"+id) + + return nil, fmt.Errorf("event not found: %s", id) +} + +// List lists events with optional filters +func (np *NATSProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error) { + var results []*Event + + // Create temporary consumer + consumer, err := np.stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Name: fmt.Sprintf("list-%d", time.Now().UnixNano()), + FilterSubject: np.subjectPrefix + ".>", + DeliverPolicy: jetstream.DeliverAllPolicy, + AckPolicy: jetstream.AckExplicitPolicy, + }) + if err != nil { + return nil, fmt.Errorf("failed to create consumer: %w", err) + } + + defer func() { _ = np.stream.DeleteConsumer(ctx, consumer.CachedInfo().Name) }() + + // Fetch messages in batches + msgs, err := consumer.Fetch(1000, jetstream.FetchMaxWait(5*time.Second)) + if err != nil { + return nil, fmt.Errorf("failed to fetch messages: %w", err) + } + + for msg := range msgs.Messages() { + var event Event + if err := json.Unmarshal(msg.Data(), &event); err != nil { + logger.Warn("Failed to unmarshal event: %v", err) + _ = msg.Nak() + continue + } + + if np.matchesFilter(&event, filter) { + results = append(results, &event) + } + + _ = msg.Ack() + } + + // Apply limit and offset + if filter != nil { + if filter.Offset > 0 && filter.Offset < len(results) { + results = results[filter.Offset:] + } + if filter.Limit > 0 && filter.Limit < len(results) { + results = results[:filter.Limit] + } + } + + return results, nil +} + +// UpdateStatus updates the status of an event +// Note: NATS streams are append-only, so we publish a status update event +func (np *NATSProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error { + // Publish a status update message + subject := fmt.Sprintf("%s.status.%s", np.subjectPrefix, id) + + statusUpdate := map[string]interface{}{ + "event_id": id, + "status": string(status), + "error": errorMsg, + "updated_at": time.Now(), + } + + data, err := json.Marshal(statusUpdate) + if err != nil { + return fmt.Errorf("failed to marshal status update: %w", err) + } + + if _, err := np.js.Publish(ctx, subject, data); err != nil { + return fmt.Errorf("failed to publish status update: %w", err) + } + + return nil +} + +// Delete deletes an event by ID +// Note: NATS streams don't support deletion - this just marks it in a separate subject +func (np *NATSProvider) Delete(ctx context.Context, id string) error { + subject := fmt.Sprintf("%s.deleted.%s", np.subjectPrefix, id) + + deleteMsg := map[string]interface{}{ + "event_id": id, + "deleted_at": time.Now(), + } + + data, err := json.Marshal(deleteMsg) + if err != nil { + return fmt.Errorf("failed to marshal delete message: %w", err) + } + + if _, err := np.js.Publish(ctx, subject, data); err != nil { + return fmt.Errorf("failed to publish delete message: %w", err) + } + + return nil +} + +// Stream returns a channel of events for real-time consumption +func (np *NATSProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error) { + ch := make(chan *Event, 100) + + // Convert glob pattern to NATS subject pattern + natsSubject := np.patternToSubject(pattern) + + // Create durable consumer + consumerName := fmt.Sprintf("consumer-%s-%d", np.instanceID, time.Now().UnixNano()) + consumer, err := np.stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{ + Name: consumerName, + FilterSubject: natsSubject, + DeliverPolicy: jetstream.DeliverNewPolicy, + AckPolicy: jetstream.AckExplicitPolicy, + AckWait: 30 * time.Second, + }) + if err != nil { + return nil, fmt.Errorf("failed to create consumer: %w", err) + } + + subCtx, cancel := context.WithCancel(ctx) + + sub := &natsSubscription{ + pattern: pattern, + consumer: consumer, + ch: ch, + ctx: subCtx, + cancel: cancel, + } + + np.mu.Lock() + np.subscribers[pattern] = sub + np.stats.ActiveSubscribers.Add(1) + np.mu.Unlock() + + // Start consumer goroutine + np.wg.Add(1) + go np.consumeMessages(sub) + + return ch, nil +} + +// Publish publishes an event to all subscribers +func (np *NATSProvider) Publish(ctx context.Context, event *Event) error { + // Store the event first + if err := np.Store(ctx, event); err != nil { + return err + } + + np.stats.EventsPublished.Add(1) + return nil +} + +// Close closes the provider and releases resources +func (np *NATSProvider) Close() error { + if !np.isRunning.Load() { + return nil + } + + np.isRunning.Store(false) + + // Cancel all subscriptions + np.mu.Lock() + for _, sub := range np.subscribers { + sub.cancel() + } + np.mu.Unlock() + + // Wait for goroutines + np.wg.Wait() + + // Close NATS connection + np.nc.Close() + + logger.Info("NATS provider closed") + return nil +} + +// Stats returns provider statistics +func (np *NATSProvider) Stats(ctx context.Context) (*ProviderStats, error) { + streamInfo, err := np.stream.Info(ctx) + if err != nil { + logger.Warn("Failed to get stream info: %v", err) + } + + stats := &ProviderStats{ + ProviderType: "nats", + TotalEvents: np.stats.TotalEvents.Load(), + EventsPublished: np.stats.EventsPublished.Load(), + EventsConsumed: np.stats.EventsConsumed.Load(), + ActiveSubscribers: int(np.stats.ActiveSubscribers.Load()), + ProviderSpecific: map[string]interface{}{ + "stream_name": np.streamName, + "subject_prefix": np.subjectPrefix, + "max_age": np.maxAge.String(), + "consumer_errors": np.stats.ConsumerErrors.Load(), + }, + } + + if streamInfo != nil { + stats.ProviderSpecific["messages"] = streamInfo.State.Msgs + stats.ProviderSpecific["bytes"] = streamInfo.State.Bytes + stats.ProviderSpecific["consumers"] = streamInfo.State.Consumers + } + + return stats, nil +} + +// ensureStream creates or updates the JetStream stream +func (np *NATSProvider) ensureStream(ctx context.Context, storage jetstream.StorageType) error { + streamConfig := jetstream.StreamConfig{ + Name: np.streamName, + Subjects: []string{np.subjectPrefix + ".>"}, + MaxAge: np.maxAge, + Storage: storage, + Retention: jetstream.LimitsPolicy, + Discard: jetstream.DiscardOld, + } + + stream, err := np.js.CreateStream(ctx, streamConfig) + if err != nil { + // Try to update if already exists + stream, err = np.js.UpdateStream(ctx, streamConfig) + if err != nil { + return fmt.Errorf("failed to create/update stream: %w", err) + } + } + + np.stream = stream + return nil +} + +// consumeMessages consumes messages from NATS for a subscription +func (np *NATSProvider) consumeMessages(sub *natsSubscription) { + defer np.wg.Done() + defer close(sub.ch) + defer func() { + np.mu.Lock() + delete(np.subscribers, sub.pattern) + np.stats.ActiveSubscribers.Add(-1) + np.mu.Unlock() + }() + + logger.Debug("Starting NATS consumer for pattern: %s", sub.pattern) + + // Consume messages + cc, err := sub.consumer.Consume(func(msg jetstream.Msg) { + var event Event + if err := json.Unmarshal(msg.Data(), &event); err != nil { + logger.Warn("Failed to unmarshal event: %v", err) + _ = msg.Nak() + return + } + + // Check if event matches pattern (additional filtering) + if matchPattern(sub.pattern, event.Type) { + select { + case sub.ch <- &event: + np.stats.EventsConsumed.Add(1) + _ = msg.Ack() + case <-sub.ctx.Done(): + _ = msg.Nak() + return + } + } else { + _ = msg.Ack() + } + }) + + if err != nil { + np.stats.ConsumerErrors.Add(1) + logger.Error("Failed to start consumer: %v", err) + return + } + + // Wait for context cancellation + <-sub.ctx.Done() + + // Stop consuming + cc.Stop() + + logger.Debug("NATS consumer stopped for pattern: %s", sub.pattern) +} + +// buildSubject creates a NATS subject from an event +// Format: events.{source}.{schema}.{entity}.{operation} +func (np *NATSProvider) buildSubject(event *Event) string { + return fmt.Sprintf("%s.%s.%s.%s.%s", + np.subjectPrefix, + event.Source, + event.Schema, + event.Entity, + event.Operation, + ) +} + +// patternToSubject converts a glob pattern to NATS subject pattern +// Examples: +// - "*" -> "events.>" +// - "public.users.*" -> "events.*.public.users.*" +// - "public.*.*" -> "events.*.public.*.*" +func (np *NATSProvider) patternToSubject(pattern string) string { + if pattern == "*" { + return np.subjectPrefix + ".>" + } + + // For specific patterns, we need to match the event type structure + // Event type: schema.entity.operation + // NATS subject: events.{source}.{schema}.{entity}.{operation} + // We use wildcard for source since pattern doesn't include it + return fmt.Sprintf("%s.*.%s", np.subjectPrefix, pattern) +} + +// matchesFilter checks if an event matches the filter criteria +func (np *NATSProvider) matchesFilter(event *Event, filter *EventFilter) bool { + if filter == nil { + return true + } + + if filter.Source != nil && event.Source != *filter.Source { + return false + } + if filter.Status != nil && event.Status != *filter.Status { + return false + } + if filter.UserID != nil && event.UserID != *filter.UserID { + return false + } + if filter.Schema != "" && event.Schema != filter.Schema { + return false + } + if filter.Entity != "" && event.Entity != filter.Entity { + return false + } + if filter.Operation != "" && event.Operation != filter.Operation { + return false + } + if filter.InstanceID != "" && event.InstanceID != filter.InstanceID { + return false + } + if filter.StartTime != nil && event.CreatedAt.Before(*filter.StartTime) { + return false + } + if filter.EndTime != nil && event.CreatedAt.After(*filter.EndTime) { + return false + } + + return true +} diff --git a/pkg/eventbroker/provider_redis.go b/pkg/eventbroker/provider_redis.go new file mode 100644 index 0000000..78ed3b1 --- /dev/null +++ b/pkg/eventbroker/provider_redis.go @@ -0,0 +1,541 @@ +package eventbroker + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/redis/go-redis/v9" + + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// RedisProvider implements Provider interface using Redis Streams +// Features: +// - Persistent event storage using Redis Streams +// - Cross-instance pub/sub using consumer groups +// - Pattern-based subscription routing +// - Automatic stream trimming to prevent unbounded growth +type RedisProvider struct { + client *redis.Client + streamName string + consumerGroup string + consumerName string + instanceID string + maxLen int64 + + // Subscriptions + mu sync.RWMutex + subscribers map[string]*redisSubscription + + // Statistics + stats RedisProviderStats + + // Lifecycle + stopListeners chan struct{} + wg sync.WaitGroup + isRunning atomic.Bool +} + +// RedisProviderStats contains statistics for the Redis provider +type RedisProviderStats struct { + TotalEvents atomic.Int64 + EventsPublished atomic.Int64 + EventsConsumed atomic.Int64 + ActiveSubscribers atomic.Int32 + ConsumerErrors atomic.Int64 +} + +// redisSubscription represents a single subscription +type redisSubscription struct { + pattern string + ch chan *Event + ctx context.Context + cancel context.CancelFunc +} + +// RedisProviderConfig configures the Redis provider +type RedisProviderConfig struct { + Host string + Port int + Password string + DB int + StreamName string + ConsumerGroup string + ConsumerName string + InstanceID string + MaxLen int64 // Maximum stream length (0 = unlimited) +} + +// NewRedisProvider creates a new Redis event provider +func NewRedisProvider(cfg RedisProviderConfig) (*RedisProvider, error) { + // Apply defaults + if cfg.Host == "" { + cfg.Host = "localhost" + } + if cfg.Port == 0 { + cfg.Port = 6379 + } + if cfg.StreamName == "" { + cfg.StreamName = "resolvespec:events" + } + if cfg.ConsumerGroup == "" { + cfg.ConsumerGroup = "resolvespec-workers" + } + if cfg.ConsumerName == "" { + cfg.ConsumerName = cfg.InstanceID + } + if cfg.MaxLen == 0 { + cfg.MaxLen = 10000 // Default max stream length + } + + // Create Redis client + client := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), + Password: cfg.Password, + DB: cfg.DB, + PoolSize: 10, + }) + + // Test connection + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := client.Ping(ctx).Err(); err != nil { + return nil, fmt.Errorf("failed to connect to Redis: %w", err) + } + + rp := &RedisProvider{ + client: client, + streamName: cfg.StreamName, + consumerGroup: cfg.ConsumerGroup, + consumerName: cfg.ConsumerName, + instanceID: cfg.InstanceID, + maxLen: cfg.MaxLen, + subscribers: make(map[string]*redisSubscription), + stopListeners: make(chan struct{}), + } + + rp.isRunning.Store(true) + + // Create consumer group if it doesn't exist + if err := rp.ensureConsumerGroup(ctx); err != nil { + logger.Warn("Failed to create consumer group: %v (may already exist)", err) + } + + logger.Info("Redis provider initialized (stream: %s, consumer_group: %s, consumer: %s)", + cfg.StreamName, cfg.ConsumerGroup, cfg.ConsumerName) + + return rp, nil +} + +// Store stores an event +func (rp *RedisProvider) Store(ctx context.Context, event *Event) error { + // Marshal event to JSON + data, err := json.Marshal(event) + if err != nil { + return fmt.Errorf("failed to marshal event: %w", err) + } + + // Store in Redis Stream + args := &redis.XAddArgs{ + Stream: rp.streamName, + MaxLen: rp.maxLen, + Approx: true, // Use approximate trimming for better performance + Values: map[string]interface{}{ + "event": data, + "id": event.ID, + "type": event.Type, + "source": string(event.Source), + "status": string(event.Status), + "instance_id": event.InstanceID, + }, + } + + if _, err := rp.client.XAdd(ctx, args).Result(); err != nil { + return fmt.Errorf("failed to add event to stream: %w", err) + } + + rp.stats.TotalEvents.Add(1) + return nil +} + +// Get retrieves an event by ID +// Note: This scans the stream which can be slow for large streams +// Consider using a separate hash for fast lookups if needed +func (rp *RedisProvider) Get(ctx context.Context, id string) (*Event, error) { + // Scan stream for event with matching ID + args := &redis.XReadArgs{ + Streams: []string{rp.streamName, "0"}, + Count: 1000, // Read in batches + } + + for { + streams, err := rp.client.XRead(ctx, args).Result() + if err == redis.Nil { + return nil, fmt.Errorf("event not found: %s", id) + } + if err != nil { + return nil, fmt.Errorf("failed to read stream: %w", err) + } + + if len(streams) == 0 { + return nil, fmt.Errorf("event not found: %s", id) + } + + for _, stream := range streams { + for _, message := range stream.Messages { + // Check if this is the event we're looking for + if eventID, ok := message.Values["id"].(string); ok && eventID == id { + // Parse event + if eventData, ok := message.Values["event"].(string); ok { + var event Event + if err := json.Unmarshal([]byte(eventData), &event); err != nil { + return nil, fmt.Errorf("failed to unmarshal event: %w", err) + } + return &event, nil + } + } + } + + // If we've read messages, update start position for next iteration + if len(stream.Messages) > 0 { + args.Streams[1] = stream.Messages[len(stream.Messages)-1].ID + } else { + // No more messages + return nil, fmt.Errorf("event not found: %s", id) + } + } + } +} + +// List lists events with optional filters +// Note: This scans the entire stream which can be slow +// Consider using time-based or ID-based ranges for better performance +func (rp *RedisProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error) { + var results []*Event + + // Read from stream + args := &redis.XReadArgs{ + Streams: []string{rp.streamName, "0"}, + Count: 1000, + } + + for { + streams, err := rp.client.XRead(ctx, args).Result() + if err == redis.Nil { + break + } + if err != nil { + return nil, fmt.Errorf("failed to read stream: %w", err) + } + + if len(streams) == 0 { + break + } + + for _, stream := range streams { + for _, message := range stream.Messages { + if eventData, ok := message.Values["event"].(string); ok { + var event Event + if err := json.Unmarshal([]byte(eventData), &event); err != nil { + logger.Warn("Failed to unmarshal event: %v", err) + continue + } + + if rp.matchesFilter(&event, filter) { + results = append(results, &event) + } + } + } + + // Update start position for next iteration + if len(stream.Messages) > 0 { + args.Streams[1] = stream.Messages[len(stream.Messages)-1].ID + } else { + // No more messages + goto done + } + } + } + +done: + // Apply limit and offset + if filter != nil { + if filter.Offset > 0 && filter.Offset < len(results) { + results = results[filter.Offset:] + } + if filter.Limit > 0 && filter.Limit < len(results) { + results = results[:filter.Limit] + } + } + + return results, nil +} + +// UpdateStatus updates the status of an event +// Note: Redis Streams are append-only, so we need to store status updates separately +// This uses a separate hash for status tracking +func (rp *RedisProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error { + statusKey := fmt.Sprintf("%s:status:%s", rp.streamName, id) + + fields := map[string]interface{}{ + "status": string(status), + "updated_at": time.Now().Format(time.RFC3339), + } + + if errorMsg != "" { + fields["error"] = errorMsg + } + + if err := rp.client.HSet(ctx, statusKey, fields).Err(); err != nil { + return fmt.Errorf("failed to update status: %w", err) + } + + // Set TTL on status key to prevent unbounded growth + rp.client.Expire(ctx, statusKey, 7*24*time.Hour) // 7 days + + return nil +} + +// Delete deletes an event by ID +// Note: Redis Streams don't support deletion by field value +// This marks the event as deleted in a separate set +func (rp *RedisProvider) Delete(ctx context.Context, id string) error { + deletedKey := fmt.Sprintf("%s:deleted", rp.streamName) + + if err := rp.client.SAdd(ctx, deletedKey, id).Err(); err != nil { + return fmt.Errorf("failed to mark event as deleted: %w", err) + } + + // Also delete the status hash if it exists + statusKey := fmt.Sprintf("%s:status:%s", rp.streamName, id) + rp.client.Del(ctx, statusKey) + + return nil +} + +// Stream returns a channel of events for real-time consumption +// Uses Redis Streams consumer group for distributed processing +func (rp *RedisProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error) { + ch := make(chan *Event, 100) + + subCtx, cancel := context.WithCancel(ctx) + + sub := &redisSubscription{ + pattern: pattern, + ch: ch, + ctx: subCtx, + cancel: cancel, + } + + rp.mu.Lock() + rp.subscribers[pattern] = sub + rp.stats.ActiveSubscribers.Add(1) + rp.mu.Unlock() + + // Start consumer goroutine + rp.wg.Add(1) + go rp.consumeStream(sub) + + return ch, nil +} + +// Publish publishes an event to all subscribers (cross-instance) +func (rp *RedisProvider) Publish(ctx context.Context, event *Event) error { + // Store the event first + if err := rp.Store(ctx, event); err != nil { + return err + } + + rp.stats.EventsPublished.Add(1) + return nil +} + +// Close closes the provider and releases resources +func (rp *RedisProvider) Close() error { + if !rp.isRunning.Load() { + return nil + } + + rp.isRunning.Store(false) + + // Cancel all subscriptions + rp.mu.Lock() + for _, sub := range rp.subscribers { + sub.cancel() + } + rp.mu.Unlock() + + // Stop listeners + close(rp.stopListeners) + + // Wait for goroutines + rp.wg.Wait() + + // Close Redis client + if err := rp.client.Close(); err != nil { + return fmt.Errorf("failed to close Redis client: %w", err) + } + + logger.Info("Redis provider closed") + return nil +} + +// Stats returns provider statistics +func (rp *RedisProvider) Stats(ctx context.Context) (*ProviderStats, error) { + // Get stream info + streamInfo, err := rp.client.XInfoStream(ctx, rp.streamName).Result() + if err != nil && err != redis.Nil { + logger.Warn("Failed to get stream info: %v", err) + } + + stats := &ProviderStats{ + ProviderType: "redis", + TotalEvents: rp.stats.TotalEvents.Load(), + EventsPublished: rp.stats.EventsPublished.Load(), + EventsConsumed: rp.stats.EventsConsumed.Load(), + ActiveSubscribers: int(rp.stats.ActiveSubscribers.Load()), + ProviderSpecific: map[string]interface{}{ + "stream_name": rp.streamName, + "consumer_group": rp.consumerGroup, + "consumer_name": rp.consumerName, + "max_len": rp.maxLen, + "consumer_errors": rp.stats.ConsumerErrors.Load(), + }, + } + + if streamInfo != nil { + stats.ProviderSpecific["stream_length"] = streamInfo.Length + stats.ProviderSpecific["first_entry_id"] = streamInfo.FirstEntry.ID + stats.ProviderSpecific["last_entry_id"] = streamInfo.LastEntry.ID + } + + return stats, nil +} + +// consumeStream consumes events from the Redis Stream for a subscription +func (rp *RedisProvider) consumeStream(sub *redisSubscription) { + defer rp.wg.Done() + defer close(sub.ch) + defer func() { + rp.mu.Lock() + delete(rp.subscribers, sub.pattern) + rp.stats.ActiveSubscribers.Add(-1) + rp.mu.Unlock() + }() + + logger.Debug("Starting stream consumer for pattern: %s", sub.pattern) + + // Use consumer group for distributed processing + for { + select { + case <-sub.ctx.Done(): + logger.Debug("Stream consumer stopped for pattern: %s", sub.pattern) + return + default: + // Read from consumer group + args := &redis.XReadGroupArgs{ + Group: rp.consumerGroup, + Consumer: rp.consumerName, + Streams: []string{rp.streamName, ">"}, + Count: 10, + Block: 1 * time.Second, + } + + streams, err := rp.client.XReadGroup(sub.ctx, args).Result() + if err == redis.Nil { + continue + } + if err != nil { + if sub.ctx.Err() != nil { + return + } + rp.stats.ConsumerErrors.Add(1) + logger.Warn("Failed to read from consumer group: %v", err) + time.Sleep(1 * time.Second) + continue + } + + for _, stream := range streams { + for _, message := range stream.Messages { + if eventData, ok := message.Values["event"].(string); ok { + var event Event + if err := json.Unmarshal([]byte(eventData), &event); err != nil { + logger.Warn("Failed to unmarshal event: %v", err) + // Acknowledge message anyway to prevent redelivery + rp.client.XAck(sub.ctx, rp.streamName, rp.consumerGroup, message.ID) + continue + } + + // Check if event matches pattern + if matchPattern(sub.pattern, event.Type) { + select { + case sub.ch <- &event: + rp.stats.EventsConsumed.Add(1) + // Acknowledge message + rp.client.XAck(sub.ctx, rp.streamName, rp.consumerGroup, message.ID) + case <-sub.ctx.Done(): + return + } + } else { + // Acknowledge message even if it doesn't match pattern + rp.client.XAck(sub.ctx, rp.streamName, rp.consumerGroup, message.ID) + } + } + } + } + } + } +} + +// ensureConsumerGroup creates the consumer group if it doesn't exist +func (rp *RedisProvider) ensureConsumerGroup(ctx context.Context) error { + // Try to create the stream and consumer group + // MKSTREAM creates the stream if it doesn't exist + err := rp.client.XGroupCreateMkStream(ctx, rp.streamName, rp.consumerGroup, "0").Err() + if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" { + return err + } + return nil +} + +// matchesFilter checks if an event matches the filter criteria +func (rp *RedisProvider) matchesFilter(event *Event, filter *EventFilter) bool { + if filter == nil { + return true + } + + if filter.Source != nil && event.Source != *filter.Source { + return false + } + if filter.Status != nil && event.Status != *filter.Status { + return false + } + if filter.UserID != nil && event.UserID != *filter.UserID { + return false + } + if filter.Schema != "" && event.Schema != filter.Schema { + return false + } + if filter.Entity != "" && event.Entity != filter.Entity { + return false + } + if filter.Operation != "" && event.Operation != filter.Operation { + return false + } + if filter.InstanceID != "" && event.InstanceID != filter.InstanceID { + return false + } + if filter.StartTime != nil && event.CreatedAt.Before(*filter.StartTime) { + return false + } + if filter.EndTime != nil && event.CreatedAt.After(*filter.EndTime) { + return false + } + + return true +} diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index d1c7705..b58082f 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -78,8 +78,8 @@ func CloseErrorTracking() error { // extractContext attempts to find a context.Context in the given arguments. // It returns the found context (or context.Background() if not found) and // the remaining arguments without the context. -func extractContext(args ...interface{}) (context.Context, []interface{}) { - ctx := context.Background() +func extractContext(args ...interface{}) (ctx context.Context, filteredArgs []interface{}) { + ctx = context.Background() var newArgs []interface{} found := false diff --git a/pkg/mqttspec/handler.go b/pkg/mqttspec/handler.go index 53757ef..d90aae9 100644 --- a/pkg/mqttspec/handler.go +++ b/pkg/mqttspec/handler.go @@ -93,7 +93,7 @@ func (h *Handler) Start() error { // Subscribe to all request topics: spec/+/request requestTopic := fmt.Sprintf("%s/+/request", h.config.Topics.Prefix) if err := h.broker.Subscribe(requestTopic, h.config.QoS.Request, h.handleIncomingMessage); err != nil { - h.broker.Stop(h.ctx) + _ = h.broker.Stop(h.ctx) return fmt.Errorf("failed to subscribe to request topic: %w", err) } @@ -130,14 +130,14 @@ func (h *Handler) Shutdown() error { "mqtt_client": client, }, } - h.hooks.Execute(BeforeDisconnect, hookCtx) + _ = h.hooks.Execute(BeforeDisconnect, hookCtx) h.clientManager.Unregister(client.ID) - h.hooks.Execute(AfterDisconnect, hookCtx) + _ = h.hooks.Execute(AfterDisconnect, hookCtx) } // Unsubscribe from request topic requestTopic := fmt.Sprintf("%s/+/request", h.config.Topics.Prefix) - h.broker.Unsubscribe(requestTopic) + _ = h.broker.Unsubscribe(requestTopic) // Stop broker if err := h.broker.Stop(h.ctx); err != nil { @@ -223,7 +223,7 @@ func (h *Handler) handleIncomingMessage(topic string, payload []byte) { return } - h.hooks.Execute(AfterConnect, hookCtx) + _ = h.hooks.Execute(AfterConnect, hookCtx) } // Route message by type @@ -498,7 +498,7 @@ func (h *Handler) handleSubscribe(client *Client, msg *Message) { client.AddSubscription(sub) // Execute after hook - h.hooks.Execute(AfterSubscribe, hookCtx) + _ = h.hooks.Execute(AfterSubscribe, hookCtx) // Send response h.sendResponse(client.ID, msg.ID, map[string]interface{}{ @@ -541,7 +541,7 @@ func (h *Handler) handleUnsubscribe(client *Client, msg *Message) { client.RemoveSubscription(subID) // Execute after hook - h.hooks.Execute(AfterUnsubscribe, hookCtx) + _ = h.hooks.Execute(AfterUnsubscribe, hookCtx) // Send response h.sendResponse(client.ID, msg.ID, map[string]interface{}{ @@ -562,7 +562,7 @@ func (h *Handler) handlePing(client *Client, msg *Message) { payload, _ := json.Marshal(pong) topic := h.getResponseTopic(client.ID) - h.broker.Publish(topic, h.config.QoS.Response, payload) + _ = h.broker.Publish(topic, h.config.QoS.Response, payload) } // notifySubscribers sends notifications to subscribers @@ -625,7 +625,7 @@ func (h *Handler) sendError(clientID, msgID, code, message string) { payload, _ := json.Marshal(errResp) topic := h.getResponseTopic(clientID) - h.broker.Publish(topic, h.config.QoS.Response, payload) + _ = h.broker.Publish(topic, h.config.QoS.Response, payload) } // Topic helpers @@ -669,8 +669,8 @@ func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) { // Apply preloads (simplified) if hookCtx.Options != nil { - for _, preload := range hookCtx.Options.Preload { - query = query.PreloadRelation(preload.Relation) + for i := range hookCtx.Options.Preload { + query = query.PreloadRelation(hookCtx.Options.Preload[i].Relation) } } @@ -683,7 +683,7 @@ func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) { } // readMultiple reads multiple records -func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]interface{}, error) { +func (h *Handler) readMultiple(hookCtx *HookContext) (data interface{}, metadata map[string]interface{}, err error) { query := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) // Apply options @@ -711,8 +711,8 @@ func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]in } // Apply preloads - for _, preload := range hookCtx.Options.Preload { - query = query.PreloadRelation(preload.Relation) + for i := range hookCtx.Options.Preload { + query = query.PreloadRelation(hookCtx.Options.Preload[i].Relation) } // Apply columns @@ -727,7 +727,7 @@ func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]in } // Get count - metadata := make(map[string]interface{}) + metadata = make(map[string]interface{}) countQuery := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) if hookCtx.Options != nil { for _, filter := range hookCtx.Options.Filters { diff --git a/pkg/server/manager.go b/pkg/server/manager.go index a211dc3..451fbc3 100644 --- a/pkg/server/manager.go +++ b/pkg/server/manager.go @@ -13,9 +13,10 @@ import ( "syscall" "time" + "github.com/klauspost/compress/gzhttp" + "github.com/bitechdev/ResolveSpec/pkg/logger" "github.com/bitechdev/ResolveSpec/pkg/middleware" - "github.com/klauspost/compress/gzhttp" ) // gracefulServer wraps http.Server with graceful shutdown capabilities (internal type) @@ -320,9 +321,9 @@ func (sm *serverManager) RestartAll() error { // Retry starting all servers with exponential backoff instead of a fixed sleep. const ( - maxAttempts = 5 - initialBackoff = 100 * time.Millisecond - maxBackoff = 2 * time.Second + maxAttempts = 5 + initialBackoff = 100 * time.Millisecond + maxBackoff = 2 * time.Second ) var lastErr error @@ -428,7 +429,7 @@ func newInstance(cfg Config) (*serverInstance, error) { } addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port) - var handler http.Handler = cfg.Handler + var handler = cfg.Handler // Wrap with GZIP handler if enabled if cfg.GZIP { diff --git a/pkg/server/tls.go b/pkg/server/tls.go index a2a308d..1890774 100644 --- a/pkg/server/tls.go +++ b/pkg/server/tls.go @@ -102,14 +102,14 @@ func getCertDirectory() (string, error) { // Fallback to current directory if cache dir is not available cacheDir = "." } - + certDir := filepath.Join(cacheDir, "resolvespec", "certs") - + // Create directory if it doesn't exist if err := os.MkdirAll(certDir, 0700); err != nil { return "", fmt.Errorf("failed to create certificate directory: %w", err) } - + return certDir, nil } @@ -120,31 +120,31 @@ func isCertificateValid(certFile string) bool { if err != nil { return false } - + // Parse certificate block, _ := pem.Decode(certData) if block == nil { return false } - + cert, err := x509.ParseCertificate(block.Bytes) if err != nil { return false } - + // Check if certificate is expired or will expire in the next 30 days now := time.Now() expiryThreshold := now.Add(30 * 24 * time.Hour) - + if now.Before(cert.NotBefore) || now.After(cert.NotAfter) { return false } - + // Renew if expiring soon if expiryThreshold.After(cert.NotAfter) { return false } - + return true } @@ -156,24 +156,24 @@ func saveCertToFiles(certPEM, keyPEM []byte, host string) (certFile, keyFile str if err != nil { return "", "", err } - + // Sanitize hostname for safe file naming safeHost := sanitizeHostname(host) - + // Use consistent file names based on host certFile = filepath.Join(certDir, fmt.Sprintf("%s-cert.pem", safeHost)) keyFile = filepath.Join(certDir, fmt.Sprintf("%s-key.pem", safeHost)) - + // Write certificate if err := os.WriteFile(certFile, certPEM, 0600); err != nil { return "", "", fmt.Errorf("failed to write certificate: %w", err) } - + // Write key if err := os.WriteFile(keyFile, keyPEM, 0600); err != nil { return "", "", fmt.Errorf("failed to write private key: %w", err) } - + return certFile, keyFile, nil } @@ -196,10 +196,10 @@ func setupAutoTLS(domains []string, email, cacheDir string) (*tls.Config, error) // Create autocert manager m := &autocert.Manager{ - Prompt: autocert.AcceptTOS, - Cache: autocert.DirCache(cacheDir), - HostPolicy: autocert.HostWhitelist(domains...), - Email: email, + Prompt: autocert.AcceptTOS, + Cache: autocert.DirCache(cacheDir), + HostPolicy: autocert.HostWhitelist(domains...), + Email: email, } // Create TLS config @@ -211,7 +211,7 @@ func setupAutoTLS(domains []string, email, cacheDir string) (*tls.Config, error) // configureTLS configures TLS for the server based on the provided configuration. // Returns the TLS config and certificate/key file paths (if applicable). -func configureTLS(cfg Config) (*tls.Config, string, string, error) { +func configureTLS(cfg Config) (tlsConfig *tls.Config, certFile string, keyFile string, err error) { // Option 1: Certificate files provided if cfg.SSLCert != "" && cfg.SSLKey != "" { // Validate that files exist diff --git a/pkg/websocketspec/connection.go b/pkg/websocketspec/connection.go index f3e4c17..06fdaf9 100644 --- a/pkg/websocketspec/connection.go +++ b/pkg/websocketspec/connection.go @@ -209,9 +209,9 @@ func (c *Connection) ReadPump() { }() // Configure read parameters - c.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + _ = c.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) c.ws.SetPongHandler(func(string) error { - c.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + _ = c.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) @@ -240,10 +240,10 @@ func (c *Connection) WritePump() { for { select { case message, ok := <-c.send: - c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + _ = c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if !ok { // Channel closed - c.ws.WriteMessage(websocket.CloseMessage, []byte{}) + _ = c.ws.WriteMessage(websocket.CloseMessage, []byte{}) return } @@ -251,13 +251,13 @@ func (c *Connection) WritePump() { if err != nil { return } - w.Write(message) + _, _ = w.Write(message) // Write any queued messages n := len(c.send) for i := 0; i < n; i++ { - w.Write([]byte{'\n'}) - w.Write(<-c.send) + _, _ = w.Write([]byte{'\n'}) + _, _ = w.Write(<-c.send) } if err := w.Close(); err != nil { @@ -265,7 +265,7 @@ func (c *Connection) WritePump() { } case <-ticker.C: - c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + _ = c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil { return } @@ -364,14 +364,14 @@ func (c *Connection) handleMessage(data []byte) { if err != nil { logger.Error("[WebSocketSpec] Failed to parse message: %v", err) errResp := NewErrorResponse("", "invalid_message", "Failed to parse message") - c.SendJSON(errResp) + _ = c.SendJSON(errResp) return } if !msg.IsValid() { logger.Error("[WebSocketSpec] Invalid message received") errResp := NewErrorResponse(msg.ID, "invalid_message", "Message validation failed") - c.SendJSON(errResp) + _ = c.SendJSON(errResp) return } diff --git a/pkg/websocketspec/handler.go b/pkg/websocketspec/handler.go index 757401d..e7f25cd 100644 --- a/pkg/websocketspec/handler.go +++ b/pkg/websocketspec/handler.go @@ -6,7 +6,6 @@ import ( "fmt" "net/http" "reflect" - "strconv" "time" "github.com/google/uuid" @@ -22,7 +21,6 @@ type Handler struct { db common.Database registry common.ModelRegistry hooks *HookRegistry - nestedProcessor *common.NestedCUDProcessor connManager *ConnectionManager subscriptionManager *SubscriptionManager upgrader websocket.Upgrader @@ -49,9 +47,6 @@ func NewHandler(db common.Database, registry common.ModelRegistry) *Handler { ctx: ctx, } - // Initialize nested processor (nil for now, can be added later if needed) - // handler.nestedProcessor = common.NewNestedCUDProcessor(db, registry, handler) - // Start connection manager go handler.connManager.Run() @@ -110,7 +105,7 @@ func (h *Handler) HandleWebSocket(w http.ResponseWriter, r *http.Request) { h.connManager.Register(conn) // Execute after connect hook - h.hooks.Execute(AfterConnect, hookCtx) + _ = h.hooks.Execute(AfterConnect, hookCtx) // Start read/write pumps go conn.WritePump() @@ -130,7 +125,7 @@ func (h *Handler) HandleMessage(conn *Connection, msg *Message) { h.handlePing(conn, msg) default: errResp := NewErrorResponse(msg.ID, "invalid_message_type", fmt.Sprintf("Unknown message type: %s", msg.Type)) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) } } @@ -147,7 +142,7 @@ func (h *Handler) handleRequest(conn *Connection, msg *Message) { if err != nil { logger.Error("[WebSocketSpec] Model not found for %s.%s: %v", schema, entity, err) errResp := NewErrorResponse(msg.ID, "model_not_found", fmt.Sprintf("Model not found: %s.%s", schema, entity)) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -156,7 +151,7 @@ func (h *Handler) handleRequest(conn *Connection, msg *Message) { if err != nil { logger.Error("[WebSocketSpec] Model validation failed for %s.%s: %v", schema, entity, err) errResp := NewErrorResponse(msg.ID, "invalid_model", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -195,7 +190,7 @@ func (h *Handler) handleRequest(conn *Connection, msg *Message) { h.handleMeta(conn, msg, hookCtx) default: errResp := NewErrorResponse(msg.ID, "invalid_operation", fmt.Sprintf("Unknown operation: %s", msg.Operation)) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) } } @@ -205,7 +200,7 @@ func (h *Handler) handleRead(conn *Connection, msg *Message, hookCtx *HookContex if err := h.hooks.Execute(BeforeRead, hookCtx); err != nil { logger.Error("[WebSocketSpec] BeforeRead hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -226,7 +221,7 @@ func (h *Handler) handleRead(conn *Connection, msg *Message, hookCtx *HookContex if err != nil { logger.Error("[WebSocketSpec] Read operation failed: %v", err) errResp := NewErrorResponse(msg.ID, "read_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -237,14 +232,14 @@ func (h *Handler) handleRead(conn *Connection, msg *Message, hookCtx *HookContex if err := h.hooks.Execute(AfterRead, hookCtx); err != nil { logger.Error("[WebSocketSpec] AfterRead hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } // Send response resp := NewResponseMessage(msg.ID, true, hookCtx.Result) resp.Metadata = metadata - conn.SendJSON(resp) + _ = conn.SendJSON(resp) } // handleCreate processes a create operation @@ -253,7 +248,7 @@ func (h *Handler) handleCreate(conn *Connection, msg *Message, hookCtx *HookCont if err := h.hooks.Execute(BeforeCreate, hookCtx); err != nil { logger.Error("[WebSocketSpec] BeforeCreate hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -262,7 +257,7 @@ func (h *Handler) handleCreate(conn *Connection, msg *Message, hookCtx *HookCont if err != nil { logger.Error("[WebSocketSpec] Create operation failed: %v", err) errResp := NewErrorResponse(msg.ID, "create_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -273,13 +268,13 @@ func (h *Handler) handleCreate(conn *Connection, msg *Message, hookCtx *HookCont if err := h.hooks.Execute(AfterCreate, hookCtx); err != nil { logger.Error("[WebSocketSpec] AfterCreate hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } // Send response resp := NewResponseMessage(msg.ID, true, hookCtx.Result) - conn.SendJSON(resp) + _ = conn.SendJSON(resp) // Notify subscribers h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationCreate, data) @@ -291,7 +286,7 @@ func (h *Handler) handleUpdate(conn *Connection, msg *Message, hookCtx *HookCont if err := h.hooks.Execute(BeforeUpdate, hookCtx); err != nil { logger.Error("[WebSocketSpec] BeforeUpdate hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -300,7 +295,7 @@ func (h *Handler) handleUpdate(conn *Connection, msg *Message, hookCtx *HookCont if err != nil { logger.Error("[WebSocketSpec] Update operation failed: %v", err) errResp := NewErrorResponse(msg.ID, "update_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -311,13 +306,13 @@ func (h *Handler) handleUpdate(conn *Connection, msg *Message, hookCtx *HookCont if err := h.hooks.Execute(AfterUpdate, hookCtx); err != nil { logger.Error("[WebSocketSpec] AfterUpdate hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } // Send response resp := NewResponseMessage(msg.ID, true, hookCtx.Result) - conn.SendJSON(resp) + _ = conn.SendJSON(resp) // Notify subscribers h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationUpdate, data) @@ -329,7 +324,7 @@ func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookCont if err := h.hooks.Execute(BeforeDelete, hookCtx); err != nil { logger.Error("[WebSocketSpec] BeforeDelete hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -338,7 +333,7 @@ func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookCont if err != nil { logger.Error("[WebSocketSpec] Delete operation failed: %v", err) errResp := NewErrorResponse(msg.ID, "delete_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -346,13 +341,13 @@ func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookCont if err := h.hooks.Execute(AfterDelete, hookCtx); err != nil { logger.Error("[WebSocketSpec] AfterDelete hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } // Send response resp := NewResponseMessage(msg.ID, true, map[string]interface{}{"deleted": true}) - conn.SendJSON(resp) + _ = conn.SendJSON(resp) // Notify subscribers h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationDelete, map[string]interface{}{"id": hookCtx.ID}) @@ -362,7 +357,7 @@ func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookCont func (h *Handler) handleMeta(conn *Connection, msg *Message, hookCtx *HookContext) { metadata := h.getMetadata(hookCtx.Schema, hookCtx.Entity, hookCtx.Model) resp := NewResponseMessage(msg.ID, true, metadata) - conn.SendJSON(resp) + _ = conn.SendJSON(resp) } // handleSubscription processes subscription messages @@ -374,7 +369,7 @@ func (h *Handler) handleSubscription(conn *Connection, msg *Message) { h.handleUnsubscribe(conn, msg) default: errResp := NewErrorResponse(msg.ID, "invalid_subscription_operation", fmt.Sprintf("Unknown subscription operation: %s", msg.Operation)) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) } } @@ -399,7 +394,7 @@ func (h *Handler) handleSubscribe(conn *Connection, msg *Message) { if err := h.hooks.Execute(BeforeSubscribe, hookCtx); err != nil { logger.Error("[WebSocketSpec] BeforeSubscribe hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -411,7 +406,7 @@ func (h *Handler) handleSubscribe(conn *Connection, msg *Message) { hookCtx.Subscription = sub // Execute after hook - h.hooks.Execute(AfterSubscribe, hookCtx) + _ = h.hooks.Execute(AfterSubscribe, hookCtx) // Send response resp := NewResponseMessage(msg.ID, true, map[string]interface{}{ @@ -419,7 +414,7 @@ func (h *Handler) handleSubscribe(conn *Connection, msg *Message) { "schema": msg.Schema, "entity": msg.Entity, }) - conn.SendJSON(resp) + _ = conn.SendJSON(resp) logger.Info("[WebSocketSpec] Subscription created: %s for %s.%s (conn: %s)", subID, msg.Schema, msg.Entity, conn.ID) } @@ -429,7 +424,7 @@ func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) { subID := msg.SubscriptionID if subID == "" { errResp := NewErrorResponse(msg.ID, "missing_subscription_id", "Subscription ID is required for unsubscribe") - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -437,7 +432,7 @@ func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) { sub, exists := conn.GetSubscription(subID) if !exists { errResp := NewErrorResponse(msg.ID, "subscription_not_found", fmt.Sprintf("Subscription not found: %s", subID)) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -455,7 +450,7 @@ func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) { if err := h.hooks.Execute(BeforeUnsubscribe, hookCtx); err != nil { logger.Error("[WebSocketSpec] BeforeUnsubscribe hook failed: %v", err) errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) - conn.SendJSON(errResp) + _ = conn.SendJSON(errResp) return } @@ -464,14 +459,14 @@ func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) { conn.RemoveSubscription(subID) // Execute after hook - h.hooks.Execute(AfterUnsubscribe, hookCtx) + _ = h.hooks.Execute(AfterUnsubscribe, hookCtx) // Send response resp := NewResponseMessage(msg.ID, true, map[string]interface{}{ "unsubscribed": true, "subscription_id": subID, }) - conn.SendJSON(resp) + _ = conn.SendJSON(resp) } // handlePing responds to ping messages @@ -481,7 +476,7 @@ func (h *Handler) handlePing(conn *Connection, msg *Message) { Type: MessageTypePong, Timestamp: time.Now(), } - conn.SendJSON(pong) + _ = conn.SendJSON(pong) } // notifySubscribers sends notifications to all subscribers of an entity @@ -527,8 +522,8 @@ func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) { // Apply preloads (simplified for now) if hookCtx.Options != nil { - for _, preload := range hookCtx.Options.Preload { - query = query.PreloadRelation(preload.Relation) + for i := range hookCtx.Options.Preload { + query = query.PreloadRelation(hookCtx.Options.Preload[i].Relation) } } @@ -540,7 +535,7 @@ func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) { return hookCtx.ModelPtr, nil } -func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]interface{}, error) { +func (h *Handler) readMultiple(hookCtx *HookContext) (data interface{}, metadata map[string]interface{}, err error) { query := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) // Apply options (simplified implementation) @@ -568,8 +563,8 @@ func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]in } // Apply preloads - for _, preload := range hookCtx.Options.Preload { - query = query.PreloadRelation(preload.Relation) + for i := range hookCtx.Options.Preload { + query = query.PreloadRelation(hookCtx.Options.Preload[i].Relation) } // Apply columns @@ -584,7 +579,7 @@ func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]in } // Get count - metadata := make(map[string]interface{}) + metadata = make(map[string]interface{}) countQuery := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) if hookCtx.Options != nil { for _, filter := range hookCtx.Options.Filters { @@ -740,8 +735,3 @@ func (h *Handler) BroadcastMessage(message interface{}, filter func(*Connection) func (h *Handler) GetConnection(id string) (*Connection, bool) { return h.connManager.GetConnection(id) } - -// Helper to convert string ID to int64 -func parseID(id string) (int64, error) { - return strconv.ParseInt(id, 10, 64) -} diff --git a/pkg/websocketspec/websocketspec.go b/pkg/websocketspec/websocketspec.go index 5830dde..fc9497d 100644 --- a/pkg/websocketspec/websocketspec.go +++ b/pkg/websocketspec/websocketspec.go @@ -110,7 +110,7 @@ func ExampleWithGORM(db *gorm.DB) { handler := NewHandlerWithGORM(db) // Register models - handler.Registry().RegisterModel("public.users", &struct{}{}) + _ = handler.Registry().RegisterModel("public.users", &struct{}{}) // Register hooks (optional) handler.Hooks().RegisterBefore(OperationRead, func(ctx *HookContext) error { @@ -131,7 +131,7 @@ func ExampleWithBun(bunDB *bun.DB) { handler := NewHandlerWithBun(bunDB) // Register models - handler.Registry().RegisterModel("public.users", &struct{}{}) + _ = handler.Registry().RegisterModel("public.users", &struct{}{}) // Setup WebSocket endpoint // http.HandleFunc("/ws", handler.HandleWebSocket)