mirror of
https://github.com/bitechdev/ResolveSpec.git
synced 2025-12-31 00:34:25 +00:00
fix: liniting issues and events dev
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -25,3 +25,4 @@ go.work.sum
|
||||
.env
|
||||
bin/
|
||||
test.db
|
||||
testserver
|
||||
|
||||
45
Makefile
45
Makefile
@@ -13,6 +13,51 @@ test-integration:
|
||||
# Run all tests (unit + integration)
|
||||
test: test-unit test-integration
|
||||
|
||||
release-version: ## Create and push a release with specific version (use: make release-version VERSION=v1.2.3)
|
||||
@if [ -z "$(VERSION)" ]; then \
|
||||
echo "Error: VERSION is required. Usage: make release-version VERSION=v1.2.3"; \
|
||||
exit 1; \
|
||||
fi
|
||||
@version="$(VERSION)"; \
|
||||
if ! echo "$$version" | grep -q "^v"; then \
|
||||
version="v$$version"; \
|
||||
fi; \
|
||||
echo "Creating release: $$version"; \
|
||||
latest_tag=$$(git describe --tags --abbrev=0 2>/dev/null || echo ""); \
|
||||
if [ -z "$$latest_tag" ]; then \
|
||||
commit_logs=$$(git log --pretty=format:"- %s" --no-merges); \
|
||||
else \
|
||||
commit_logs=$$(git log "$${latest_tag}..HEAD" --pretty=format:"- %s" --no-merges); \
|
||||
fi; \
|
||||
if [ -z "$$commit_logs" ]; then \
|
||||
tag_message="Release $$version"; \
|
||||
else \
|
||||
tag_message="Release $$version\n\n$$commit_logs"; \
|
||||
fi; \
|
||||
git tag -a "$$version" -m "$$tag_message"; \
|
||||
git push origin "$$version"; \
|
||||
echo "Tag $$version created and pushed to remote repository."
|
||||
|
||||
|
||||
lint: ## Run linter
|
||||
@echo "Running linter..."
|
||||
@if command -v golangci-lint > /dev/null; then \
|
||||
golangci-lint run --config=.golangci.json; \
|
||||
else \
|
||||
echo "golangci-lint not installed. Install with: go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest"; \
|
||||
exit 1; \
|
||||
fi
|
||||
|
||||
lintfix: ## Run linter
|
||||
@echo "Running linter..."
|
||||
@if command -v golangci-lint > /dev/null; then \
|
||||
golangci-lint run --config=.golangci.json --fix; \
|
||||
else \
|
||||
echo "golangci-lint not installed. Install with: go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest"; \
|
||||
exit 1; \
|
||||
fi
|
||||
|
||||
|
||||
# Start PostgreSQL for integration tests
|
||||
docker-up:
|
||||
@echo "Starting PostgreSQL container..."
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
@@ -67,9 +67,36 @@ func main() {
|
||||
// Setup routes using new SetupMuxRoutes function (without authentication)
|
||||
resolvespec.SetupMuxRoutes(r, handler, nil)
|
||||
|
||||
// Create graceful server with configuration
|
||||
srv := server.NewGracefulServer(server.Config{
|
||||
Addr: cfg.Server.Addr,
|
||||
// Create server manager
|
||||
mgr := server.NewManager()
|
||||
|
||||
// Parse host and port from addr
|
||||
host := ""
|
||||
port := 8080
|
||||
if cfg.Server.Addr != "" {
|
||||
// Parse addr (format: ":8080" or "localhost:8080")
|
||||
if cfg.Server.Addr[0] == ':' {
|
||||
// Just port
|
||||
_, err := fmt.Sscanf(cfg.Server.Addr, ":%d", &port)
|
||||
if err != nil {
|
||||
logger.Error("Invalid server address: %s", cfg.Server.Addr)
|
||||
os.Exit(1)
|
||||
}
|
||||
} else {
|
||||
// Host and port
|
||||
_, err := fmt.Sscanf(cfg.Server.Addr, "%[^:]:%d", &host, &port)
|
||||
if err != nil {
|
||||
logger.Error("Invalid server address: %s", cfg.Server.Addr)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add server instance
|
||||
_, err = mgr.Add(server.Config{
|
||||
Name: "api",
|
||||
Host: host,
|
||||
Port: port,
|
||||
Handler: r,
|
||||
ShutdownTimeout: cfg.Server.ShutdownTimeout,
|
||||
DrainTimeout: cfg.Server.DrainTimeout,
|
||||
@@ -77,11 +104,15 @@ func main() {
|
||||
WriteTimeout: cfg.Server.WriteTimeout,
|
||||
IdleTimeout: cfg.Server.IdleTimeout,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("Failed to add server: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Start server with graceful shutdown
|
||||
logger.Info("Starting server on %s", cfg.Server.Addr)
|
||||
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
||||
logger.Error("Server failed to start: %v", err)
|
||||
if err := mgr.ServeWithGracefulShutdown(); err != nil {
|
||||
logger.Error("Server failed: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
9
go.mod
9
go.mod
@@ -7,6 +7,7 @@ toolchain go1.24.6
|
||||
require (
|
||||
github.com/DATA-DOG/go-sqlmock v1.5.2
|
||||
github.com/bradfitz/gomemcache v0.0.0-20250403215159-8d39553ac7cf
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.1
|
||||
github.com/getsentry/sentry-go v0.40.0
|
||||
github.com/glebarez/sqlite v1.11.0
|
||||
github.com/google/uuid v1.6.0
|
||||
@@ -14,6 +15,8 @@ require (
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/jackc/pgx/v5 v5.6.0
|
||||
github.com/klauspost/compress v1.18.0
|
||||
github.com/mochi-mqtt/server/v2 v2.7.9
|
||||
github.com/nats-io/nats.go v1.48.0
|
||||
github.com/prometheus/client_golang v1.23.2
|
||||
github.com/redis/go-redis/v9 v9.17.1
|
||||
github.com/spf13/viper v1.21.0
|
||||
@@ -34,6 +37,7 @@ require (
|
||||
golang.org/x/crypto v0.43.0
|
||||
golang.org/x/time v0.14.0
|
||||
gorm.io/driver/postgres v1.6.0
|
||||
gorm.io/driver/sqlite v1.6.0
|
||||
gorm.io/gorm v1.30.0
|
||||
)
|
||||
|
||||
@@ -58,7 +62,6 @@ require (
|
||||
github.com/docker/go-units v0.5.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/ebitengine/purego v0.8.4 // indirect
|
||||
github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
github.com/fsnotify/fsnotify v1.9.0 // indirect
|
||||
github.com/glebarez/go-sqlite v1.21.2 // indirect
|
||||
@@ -83,9 +86,10 @@ require (
|
||||
github.com/moby/sys/user v0.4.0 // indirect
|
||||
github.com/moby/sys/userns v0.1.0 // indirect
|
||||
github.com/moby/term v0.5.0 // indirect
|
||||
github.com/mochi-mqtt/server/v2 v2.7.9 // indirect
|
||||
github.com/morikuni/aec v1.0.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/ncruces/go-strftime v1.0.0 // indirect
|
||||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/opencontainers/image-spec v1.1.1 // indirect
|
||||
@@ -133,7 +137,6 @@ require (
|
||||
google.golang.org/grpc v1.75.0 // indirect
|
||||
google.golang.org/protobuf v1.36.8 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
gorm.io/driver/sqlite v1.6.0 // indirect
|
||||
modernc.org/libc v1.67.0 // indirect
|
||||
modernc.org/mathutil v1.7.1 // indirect
|
||||
modernc.org/memory v1.11.0 // indirect
|
||||
|
||||
10
go.sum
10
go.sum
@@ -101,6 +101,8 @@ github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
|
||||
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
|
||||
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
|
||||
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
|
||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
|
||||
@@ -144,6 +146,12 @@ github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/nats-io/nats.go v1.48.0 h1:pSFyXApG+yWU/TgbKCjmm5K4wrHu86231/w84qRVR+U=
|
||||
github.com/nats-io/nats.go v1.48.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
|
||||
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
|
||||
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
|
||||
@@ -314,8 +322,6 @@ gorm.io/driver/postgres v1.6.0 h1:2dxzU8xJ+ivvqTRph34QX+WrRaJlmfyPqXmoGVjMBa4=
|
||||
gorm.io/driver/postgres v1.6.0/go.mod h1:vUw0mrGgrTK+uPHEhAdV4sfFELrByKVGnaVRkXDhtWo=
|
||||
gorm.io/driver/sqlite v1.6.0 h1:WHRRrIiulaPiPFmDcod6prc4l2VGVWHz80KspNsxSfQ=
|
||||
gorm.io/driver/sqlite v1.6.0/go.mod h1:AO9V1qIQddBESngQUKWL9yoH93HIeA1X6V633rBwyT8=
|
||||
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
|
||||
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
|
||||
gorm.io/gorm v1.30.0 h1:qbT5aPv1UH8gI99OsRlvDToLxW5zR7FzS9acZDOZcgs=
|
||||
gorm.io/gorm v1.30.0/go.mod h1:8Z33v652h4//uMA76KjeDH8mJXPm1QNCYrMeatR0DOE=
|
||||
gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q=
|
||||
|
||||
@@ -486,9 +486,10 @@ func extractTableAndColumn(cond string) (table string, column string) {
|
||||
return "", ""
|
||||
}
|
||||
|
||||
// extractUnqualifiedColumnName extracts the column name from an unqualified condition
|
||||
// Unused: extractUnqualifiedColumnName extracts the column name from an unqualified condition
|
||||
// For example: "rid_parentmastertaskitem is null" returns "rid_parentmastertaskitem"
|
||||
// "status = 'active'" returns "status"
|
||||
// nolint:unused
|
||||
func extractUnqualifiedColumnName(cond string) string {
|
||||
// Common SQL operators
|
||||
operators := []string{" = ", " != ", " <> ", " > ", " >= ", " < ", " <= ", " LIKE ", " like ", " IN ", " in ", " IS ", " is ", " NOT ", " not "}
|
||||
|
||||
353
pkg/eventbroker/IMPLEMENTATION_PLAN.md
Normal file
353
pkg/eventbroker/IMPLEMENTATION_PLAN.md
Normal file
@@ -0,0 +1,353 @@
|
||||
# Event Broker System Implementation Plan
|
||||
|
||||
## Overview
|
||||
Implement a comprehensive event handler/broker system for ResolveSpec that follows existing architectural patterns (Provider interface, Hook system, Config management, Graceful shutdown).
|
||||
|
||||
## Requirements Met
|
||||
- ✅ Events with sources (database, websocket, frontend, system)
|
||||
- ✅ Event statuses (pending, processing, completed, failed)
|
||||
- ✅ Timestamps, JSON payloads, user IDs, session IDs
|
||||
- ✅ Program instance IDs for tracking server instances
|
||||
- ✅ Both sync and async processing modes
|
||||
- ✅ Multiple provider backends (in-memory, Redis, NATS, database)
|
||||
- ✅ Cross-instance pub/sub support
|
||||
|
||||
## Architecture
|
||||
|
||||
### Core Components
|
||||
|
||||
**Event Structure** (with full metadata):
|
||||
```go
|
||||
type Event struct {
|
||||
ID string // UUID
|
||||
Source EventSource // database, websocket, system, frontend
|
||||
Type string // Pattern: schema.entity.operation
|
||||
Status EventStatus // pending, processing, completed, failed
|
||||
Payload json.RawMessage // JSON payload
|
||||
UserID int
|
||||
SessionID string
|
||||
InstanceID string // Server instance identifier
|
||||
Schema string
|
||||
Entity string
|
||||
Operation string // create, update, delete, read
|
||||
CreatedAt time.Time
|
||||
ProcessedAt *time.Time
|
||||
CompletedAt *time.Time
|
||||
Error string
|
||||
Metadata map[string]interface{}
|
||||
RetryCount int
|
||||
}
|
||||
```
|
||||
|
||||
**Provider Pattern** (like cache.Provider):
|
||||
```go
|
||||
type Provider interface {
|
||||
Store(ctx context.Context, event *Event) error
|
||||
Get(ctx context.Context, id string) (*Event, error)
|
||||
List(ctx context.Context, filter *EventFilter) ([]*Event, error)
|
||||
UpdateStatus(ctx context.Context, id string, status EventStatus, error string) error
|
||||
Stream(ctx context.Context, pattern string) (<-chan *Event, error)
|
||||
Publish(ctx context.Context, event *Event) error
|
||||
Close() error
|
||||
Stats(ctx context.Context) (*ProviderStats, error)
|
||||
}
|
||||
```
|
||||
|
||||
**Broker Interface**:
|
||||
```go
|
||||
type Broker interface {
|
||||
Publish(ctx context.Context, event *Event) error // Mode-dependent
|
||||
PublishSync(ctx context.Context, event *Event) error // Blocks
|
||||
PublishAsync(ctx context.Context, event *Event) error // Non-blocking
|
||||
Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)
|
||||
Unsubscribe(id SubscriptionID) error
|
||||
Start(ctx context.Context) error
|
||||
Stop(ctx context.Context) error
|
||||
Stats(ctx context.Context) (*BrokerStats, error)
|
||||
}
|
||||
```
|
||||
|
||||
## Implementation Steps
|
||||
|
||||
### Phase 1: Core Foundation (Files: 1-5)
|
||||
|
||||
**1. Create `pkg/eventbroker/event.go`**
|
||||
- Event struct with all required fields (status, timestamps, user, instance ID, etc.)
|
||||
- EventSource enum (database, websocket, frontend, system, internal)
|
||||
- EventStatus enum (pending, processing, completed, failed)
|
||||
- Helper: `EventType(schema, entity, operation string) string`
|
||||
- Helper: `NewEvent()` constructor with UUID generation
|
||||
|
||||
**2. Create `pkg/eventbroker/provider.go`**
|
||||
- Provider interface definition
|
||||
- EventFilter struct for queries
|
||||
- ProviderStats struct
|
||||
|
||||
**3. Create `pkg/eventbroker/handler.go`**
|
||||
- EventHandler interface
|
||||
- EventHandlerFunc adapter type
|
||||
|
||||
**4. Create `pkg/eventbroker/broker.go`**
|
||||
- Broker interface definition
|
||||
- EventBroker struct implementation
|
||||
- ProcessingMode enum (sync, async)
|
||||
- Options struct with functional options (WithProvider, WithMode, WithWorkerCount, etc.)
|
||||
- NewBroker() constructor
|
||||
- Sync processing implementation
|
||||
|
||||
**5. Create `pkg/eventbroker/subscription.go`**
|
||||
- Pattern matching using glob syntax (e.g., "public.users.*", "*.*.create")
|
||||
- subscriptionManager struct
|
||||
- SubscriptionID type
|
||||
- Subscribe/Unsubscribe logic
|
||||
|
||||
### Phase 2: Configuration & Integration (Files: 6-8)
|
||||
|
||||
**6. Create `pkg/eventbroker/config.go`**
|
||||
- EventBrokerConfig struct
|
||||
- RedisConfig, NATSConfig, DatabaseConfig structs
|
||||
- RetryPolicyConfig struct
|
||||
|
||||
**7. Update `pkg/config/config.go`**
|
||||
- Add `EventBroker EventBrokerConfig` field to Config struct
|
||||
|
||||
**8. Update `pkg/config/manager.go`**
|
||||
- Add event broker defaults to `setDefaults()`:
|
||||
```go
|
||||
v.SetDefault("event_broker.enabled", false)
|
||||
v.SetDefault("event_broker.provider", "memory")
|
||||
v.SetDefault("event_broker.mode", "async")
|
||||
v.SetDefault("event_broker.worker_count", 10)
|
||||
v.SetDefault("event_broker.buffer_size", 1000)
|
||||
```
|
||||
|
||||
### Phase 3: Memory Provider (Files: 9)
|
||||
|
||||
**9. Create `pkg/eventbroker/provider_memory.go`**
|
||||
- MemoryProvider struct with mutex-protected map
|
||||
- In-memory event storage
|
||||
- Pattern matching for subscriptions
|
||||
- Channel-based streaming for real-time events
|
||||
- LRU eviction when max size reached
|
||||
- Cleanup goroutine for old completed events
|
||||
- **Note**: Single-instance only (no cross-instance pub/sub)
|
||||
|
||||
### Phase 4: Async Processing (Update File: 4)
|
||||
|
||||
**10. Update `pkg/eventbroker/broker.go`** (add async support)
|
||||
- workerPool struct with configurable worker count
|
||||
- Buffered channel for event queue
|
||||
- Worker goroutines that process events
|
||||
- PublishAsync() queues to channel
|
||||
- Graceful shutdown: stop accepting events, drain queue, wait for workers
|
||||
- Retry logic with exponential backoff
|
||||
|
||||
### Phase 5: Hook Integration (Files: 11)
|
||||
|
||||
**11. Create `pkg/eventbroker/hooks.go`**
|
||||
- `RegisterCRUDHooks(broker Broker, hookRegistry *restheadspec.HookRegistry)`
|
||||
- Registers AfterCreate, AfterUpdate, AfterDelete, AfterRead hooks
|
||||
- Extracts UserContext from hook context
|
||||
- Creates Event with proper metadata
|
||||
- Calls `broker.PublishAsync()` to not block CRUD operations
|
||||
|
||||
### Phase 6: Global Singleton & Factory (Files: 12-13)
|
||||
|
||||
**12. Create `pkg/eventbroker/eventbroker.go`**
|
||||
- Global `defaultBroker` variable
|
||||
- `Initialize(config *config.Config) error` - creates broker from config
|
||||
- `SetDefaultBroker(broker Broker)`
|
||||
- `GetDefaultBroker() Broker`
|
||||
- Helper functions: `Publish()`, `PublishAsync()`, `PublishSync()`, `Subscribe()`
|
||||
- `RegisterShutdown(broker Broker)` - registers with server.RegisterShutdownCallback()
|
||||
|
||||
**13. Create `pkg/eventbroker/factory.go`**
|
||||
- `NewProviderFromConfig(config EventBrokerConfig) (Provider, error)`
|
||||
- Provider selection logic (memory, redis, nats, database)
|
||||
- Returns appropriate provider based on config
|
||||
|
||||
### Phase 7: Redis Provider (Files: 14)
|
||||
|
||||
**14. Create `pkg/eventbroker/provider_redis.go`**
|
||||
- RedisProvider using Redis Streams (XADD, XREAD, XGROUP)
|
||||
- Consumer group for distributed processing
|
||||
- Cross-instance pub/sub support
|
||||
- Stream(pattern) subscribes to consumer group
|
||||
- Publish() uses XADD to append to stream
|
||||
- Graceful shutdown: acknowledge pending messages
|
||||
|
||||
**Dependencies**: `github.com/redis/go-redis/v9`
|
||||
|
||||
### Phase 8: NATS Provider (Files: 15)
|
||||
|
||||
**15. Create `pkg/eventbroker/provider_nats.go`**
|
||||
- NATSProvider using NATS JetStream
|
||||
- Subject-based routing: `events.{source}.{type}`
|
||||
- Wildcard subscriptions support
|
||||
- Durable consumers for replay
|
||||
- At-least-once delivery semantics
|
||||
|
||||
**Dependencies**: `github.com/nats-io/nats.go`
|
||||
|
||||
### Phase 9: Database Provider (Files: 16)
|
||||
|
||||
**16. Create `pkg/eventbroker/provider_database.go`**
|
||||
- DatabaseProvider using `common.Database` interface
|
||||
- Table schema creation (events table with indexes)
|
||||
- Polling-based event consumption (configurable interval)
|
||||
- Full SQL query support via List(filter)
|
||||
- Transaction support for atomic operations
|
||||
- Good for audit trails and debugging
|
||||
|
||||
### Phase 10: Metrics Integration (Files: 17)
|
||||
|
||||
**17. Create `pkg/eventbroker/metrics.go`**
|
||||
- Integrate with existing `metrics.Provider`
|
||||
- Record metrics:
|
||||
- `eventbroker_events_published_total{source, type}`
|
||||
- `eventbroker_events_processed_total{source, type, status}`
|
||||
- `eventbroker_event_processing_duration_seconds{source, type}`
|
||||
- `eventbroker_queue_size`
|
||||
- `eventbroker_workers_active`
|
||||
|
||||
**18. Update `pkg/metrics/interfaces.go`**
|
||||
- Add methods to Provider interface:
|
||||
```go
|
||||
RecordEventPublished(source, eventType string)
|
||||
RecordEventProcessed(source, eventType, status string, duration time.Duration)
|
||||
UpdateEventQueueSize(size int64)
|
||||
```
|
||||
|
||||
### Phase 11: Testing & Examples (Files: 19-20)
|
||||
|
||||
**19. Create `pkg/eventbroker/eventbroker_test.go`**
|
||||
- Unit tests for Event marshaling
|
||||
- Pattern matching tests
|
||||
- MemoryProvider tests
|
||||
- Sync vs async mode tests
|
||||
- Concurrent publish/subscribe tests
|
||||
- Graceful shutdown tests
|
||||
|
||||
**20. Create `pkg/eventbroker/example_usage.go`**
|
||||
- Basic publish example
|
||||
- Subscribe with patterns example
|
||||
- Hook integration example
|
||||
- Multiple handlers example
|
||||
- Error handling example
|
||||
|
||||
## Integration Points
|
||||
|
||||
### Hook System Integration
|
||||
```go
|
||||
// In application initialization (e.g., main.go)
|
||||
eventbroker.RegisterCRUDHooks(broker, handler.Hooks())
|
||||
```
|
||||
|
||||
This automatically publishes events for all CRUD operations:
|
||||
- `schema.entity.create` after inserts
|
||||
- `schema.entity.update` after updates
|
||||
- `schema.entity.delete` after deletes
|
||||
- `schema.entity.read` after reads
|
||||
|
||||
### Shutdown Integration
|
||||
```go
|
||||
// In application initialization
|
||||
eventbroker.RegisterShutdown(broker)
|
||||
```
|
||||
|
||||
Ensures event broker flushes pending events before shutdown.
|
||||
|
||||
### Configuration Example
|
||||
```yaml
|
||||
event_broker:
|
||||
enabled: true
|
||||
provider: redis # memory, redis, nats, database
|
||||
mode: async # sync, async
|
||||
worker_count: 10
|
||||
buffer_size: 1000
|
||||
instance_id: "${HOSTNAME}"
|
||||
|
||||
redis:
|
||||
stream_name: "resolvespec:events"
|
||||
consumer_group: "resolvespec-workers"
|
||||
host: "localhost"
|
||||
port: 6379
|
||||
```
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Publishing Custom Events
|
||||
```go
|
||||
// WebSocket event
|
||||
event := &eventbroker.Event{
|
||||
Source: eventbroker.EventSourceWebSocket,
|
||||
Type: "chat.message",
|
||||
Payload: json.RawMessage(`{"room": "lobby", "msg": "Hello"}`),
|
||||
UserID: userID,
|
||||
SessionID: sessionID,
|
||||
}
|
||||
eventbroker.PublishAsync(ctx, event)
|
||||
```
|
||||
|
||||
### Subscribing to Events
|
||||
```go
|
||||
// Subscribe to all user creation events
|
||||
eventbroker.Subscribe("public.users.create", eventbroker.EventHandlerFunc(
|
||||
func(ctx context.Context, event *eventbroker.Event) error {
|
||||
log.Printf("New user created: %s", event.Payload)
|
||||
// Send welcome email, update cache, etc.
|
||||
return nil
|
||||
},
|
||||
))
|
||||
|
||||
// Subscribe to all events from database
|
||||
eventbroker.Subscribe("*", eventbroker.EventHandlerFunc(
|
||||
func(ctx context.Context, event *eventbroker.Event) error {
|
||||
if event.Source == eventbroker.EventSourceDatabase {
|
||||
// Audit logging
|
||||
}
|
||||
return nil
|
||||
},
|
||||
))
|
||||
```
|
||||
|
||||
## Critical Files Reference
|
||||
|
||||
**Patterns to Follow**:
|
||||
- `pkg/cache/provider.go` - Provider interface pattern
|
||||
- `pkg/restheadspec/hooks.go` - Hook system integration
|
||||
- `pkg/config/manager.go` - Configuration pattern
|
||||
- `pkg/server/shutdown.go` - Shutdown callbacks
|
||||
|
||||
**Files to Modify**:
|
||||
- `pkg/config/config.go` - Add EventBroker field
|
||||
- `pkg/config/manager.go` - Add defaults
|
||||
- `pkg/metrics/interfaces.go` - Add event broker metrics
|
||||
|
||||
**New Package**:
|
||||
- `pkg/eventbroker/` (20 files total)
|
||||
|
||||
## Provider Feature Matrix
|
||||
|
||||
| Feature | Memory | Redis | NATS | Database |
|
||||
|---------|--------|-------|------|----------|
|
||||
| Persistence | ❌ | ✅ | ✅ | ✅ |
|
||||
| Cross-instance | ❌ | ✅ | ✅ | ✅ |
|
||||
| Real-time | ✅ | ✅ | ✅ | ⚠️ (polling) |
|
||||
| Query history | Limited | Limited | ✅ (replay) | ✅ (SQL) |
|
||||
| External deps | None | Redis | NATS | None |
|
||||
| Complexity | Low | Medium | Medium | Low |
|
||||
|
||||
## Implementation Order Priority
|
||||
|
||||
1. **Core + Memory Provider** (Phase 1-3) - Functional in-process event system
|
||||
2. **Async + Hooks** (Phase 4-5) - Non-blocking event dispatch integrated with CRUD
|
||||
3. **Config + Singleton** (Phase 6) - Easy initialization and usage
|
||||
4. **Redis Provider** (Phase 7) - Production-ready distributed events
|
||||
5. **Metrics** (Phase 10) - Observability
|
||||
6. **NATS/Database** (Phase 8-9) - Alternative backends
|
||||
7. **Tests + Examples** (Phase 11) - Documentation and reliability
|
||||
|
||||
## Next Steps
|
||||
|
||||
After approval, implement in order of phases. Each phase builds on previous phases and can be tested independently.
|
||||
@@ -172,12 +172,13 @@ event_broker:
|
||||
provider: memory
|
||||
```
|
||||
|
||||
### Redis Provider (Future)
|
||||
### Redis Provider
|
||||
|
||||
Best for: Production, multi-instance deployments
|
||||
|
||||
- **Pros**: Persistent, cross-instance pub/sub, reliable
|
||||
- **Cons**: Requires Redis
|
||||
- **Pros**: Persistent, cross-instance pub/sub, reliable, Redis Streams support
|
||||
- **Cons**: Requires Redis server
|
||||
- **Status**: ✅ Implemented
|
||||
|
||||
```yaml
|
||||
event_broker:
|
||||
@@ -185,16 +186,20 @@ event_broker:
|
||||
redis:
|
||||
stream_name: "resolvespec:events"
|
||||
consumer_group: "resolvespec-workers"
|
||||
max_len: 10000
|
||||
host: "localhost"
|
||||
port: 6379
|
||||
password: ""
|
||||
db: 0
|
||||
```
|
||||
|
||||
### NATS Provider (Future)
|
||||
### NATS Provider
|
||||
|
||||
Best for: High-performance, low-latency requirements
|
||||
|
||||
- **Pros**: Very fast, built-in clustering, durable
|
||||
- **Pros**: Very fast, built-in clustering, durable, JetStream support
|
||||
- **Cons**: Requires NATS server
|
||||
- **Status**: ✅ Implemented
|
||||
|
||||
```yaml
|
||||
event_broker:
|
||||
@@ -202,14 +207,17 @@ event_broker:
|
||||
nats:
|
||||
url: "nats://localhost:4222"
|
||||
stream_name: "RESOLVESPEC_EVENTS"
|
||||
storage: "file" # or "memory"
|
||||
max_age: "24h"
|
||||
```
|
||||
|
||||
### Database Provider (Future)
|
||||
### Database Provider
|
||||
|
||||
Best for: Audit trails, event replay, SQL queries
|
||||
|
||||
- **Pros**: No additional infrastructure, full SQL query support, PostgreSQL NOTIFY for real-time
|
||||
- **Cons**: Slower than Redis/NATS
|
||||
- **Cons**: Slower than Redis/NATS, requires database connection
|
||||
- **Status**: ✅ Implemented
|
||||
|
||||
```yaml
|
||||
event_broker:
|
||||
@@ -217,6 +225,7 @@ event_broker:
|
||||
database:
|
||||
table_name: "events"
|
||||
channel: "resolvespec_events"
|
||||
poll_interval: "1s"
|
||||
```
|
||||
|
||||
## Processing Modes
|
||||
@@ -314,14 +323,25 @@ See `example_usage.go` for comprehensive examples including:
|
||||
└─────────────────┘
|
||||
```
|
||||
|
||||
## Implemented Features
|
||||
|
||||
- [x] Memory Provider (in-process, single-instance)
|
||||
- [x] Redis Streams Provider (distributed, persistent)
|
||||
- [x] NATS JetStream Provider (distributed, high-performance)
|
||||
- [x] Database Provider with PostgreSQL NOTIFY (SQL-queryable, audit-friendly)
|
||||
- [x] Sync and Async processing modes
|
||||
- [x] Pattern-based subscriptions
|
||||
- [x] Hook integration for automatic CRUD events
|
||||
- [x] Retry policy with exponential backoff
|
||||
- [x] Graceful shutdown
|
||||
|
||||
## Future Enhancements
|
||||
|
||||
- [ ] Database Provider with PostgreSQL NOTIFY
|
||||
- [ ] Redis Streams Provider
|
||||
- [ ] NATS JetStream Provider
|
||||
- [ ] Event replay functionality
|
||||
- [ ] Dead letter queue
|
||||
- [ ] Event filtering at provider level
|
||||
- [ ] Batch publishing
|
||||
- [ ] Event compression
|
||||
- [ ] Schema versioning
|
||||
- [ ] Event replay functionality from specific timestamp
|
||||
- [ ] Dead letter queue for failed events
|
||||
- [ ] Event filtering at provider level for performance
|
||||
- [ ] Batch publishing support
|
||||
- [ ] Event compression for large payloads
|
||||
- [ ] Schema versioning and migration
|
||||
- [ ] Event streaming to external systems (Kafka, RabbitMQ)
|
||||
- [ ] Event aggregation and analytics
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
|
||||
"github.com/bitechdev/ResolveSpec/pkg/config"
|
||||
"github.com/bitechdev/ResolveSpec/pkg/logger"
|
||||
"github.com/bitechdev/ResolveSpec/pkg/server"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -69,9 +68,6 @@ func Initialize(cfg config.EventBrokerConfig) error {
|
||||
// Set as default
|
||||
SetDefaultBroker(broker)
|
||||
|
||||
// Register shutdown callback
|
||||
RegisterShutdown(broker)
|
||||
|
||||
logger.Info("Event broker initialized successfully (provider: %s, mode: %s, instance: %s)",
|
||||
cfg.Provider, cfg.Mode, opts.InstanceID)
|
||||
|
||||
@@ -151,10 +147,12 @@ func Stats(ctx context.Context) (*BrokerStats, error) {
|
||||
return broker.Stats(ctx)
|
||||
}
|
||||
|
||||
// RegisterShutdown registers the broker's shutdown with the server shutdown callbacks
|
||||
func RegisterShutdown(broker Broker) {
|
||||
server.RegisterShutdownCallback(func(ctx context.Context) error {
|
||||
// RegisterShutdown registers the broker's shutdown with a server manager
|
||||
// Call this from your application initialization code
|
||||
// Example: serverMgr.RegisterShutdownCallback(eventbroker.MakeShutdownCallback(broker))
|
||||
func MakeShutdownCallback(broker Broker) func(context.Context) error {
|
||||
return func(ctx context.Context) error {
|
||||
logger.Info("Shutting down event broker...")
|
||||
return broker.Stop(ctx)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,16 +24,34 @@ func NewProviderFromConfig(cfg config.EventBrokerConfig) (Provider, error) {
|
||||
}), nil
|
||||
|
||||
case "redis":
|
||||
// Redis provider will be implemented in Phase 8
|
||||
return nil, fmt.Errorf("redis provider not yet implemented")
|
||||
return NewRedisProvider(RedisProviderConfig{
|
||||
Host: cfg.Redis.Host,
|
||||
Port: cfg.Redis.Port,
|
||||
Password: cfg.Redis.Password,
|
||||
DB: cfg.Redis.DB,
|
||||
StreamName: cfg.Redis.StreamName,
|
||||
ConsumerGroup: cfg.Redis.ConsumerGroup,
|
||||
ConsumerName: getInstanceID(cfg.InstanceID),
|
||||
InstanceID: getInstanceID(cfg.InstanceID),
|
||||
MaxLen: cfg.Redis.MaxLen,
|
||||
})
|
||||
|
||||
case "nats":
|
||||
// NATS provider will be implemented in Phase 9
|
||||
return nil, fmt.Errorf("nats provider not yet implemented")
|
||||
// NATS provider initialization
|
||||
// Note: Requires github.com/nats-io/nats.go dependency
|
||||
return NewNATSProvider(NATSProviderConfig{
|
||||
URL: cfg.NATS.URL,
|
||||
StreamName: cfg.NATS.StreamName,
|
||||
SubjectPrefix: "events",
|
||||
InstanceID: getInstanceID(cfg.InstanceID),
|
||||
MaxAge: cfg.NATS.MaxAge,
|
||||
Storage: cfg.NATS.Storage, // "file" or "memory"
|
||||
})
|
||||
|
||||
case "database":
|
||||
// Database provider will be implemented in Phase 7
|
||||
return nil, fmt.Errorf("database provider not yet implemented")
|
||||
// Database provider requires a database connection
|
||||
// This should be provided externally
|
||||
return nil, fmt.Errorf("database provider requires a database connection to be configured separately")
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown provider: %s", cfg.Provider)
|
||||
|
||||
653
pkg/eventbroker/provider_database.go
Normal file
653
pkg/eventbroker/provider_database.go
Normal file
@@ -0,0 +1,653 @@
|
||||
package eventbroker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/bitechdev/ResolveSpec/pkg/common"
|
||||
"github.com/bitechdev/ResolveSpec/pkg/logger"
|
||||
)
|
||||
|
||||
// DatabaseProvider implements Provider interface using SQL database
|
||||
// Features:
|
||||
// - Persistent event storage in database table
|
||||
// - Full SQL query support for event history
|
||||
// - PostgreSQL NOTIFY/LISTEN for real-time updates (optional)
|
||||
// - Polling-based consumption with configurable interval
|
||||
// - Good for audit trails and event replay
|
||||
type DatabaseProvider struct {
|
||||
db common.Database
|
||||
tableName string
|
||||
channel string // PostgreSQL NOTIFY channel name
|
||||
pollInterval time.Duration
|
||||
instanceID string
|
||||
useNotify bool // Whether to use PostgreSQL NOTIFY
|
||||
|
||||
// Subscriptions
|
||||
mu sync.RWMutex
|
||||
subscribers map[string]*dbSubscription
|
||||
|
||||
// Statistics
|
||||
stats DatabaseProviderStats
|
||||
|
||||
// Lifecycle
|
||||
stopPolling chan struct{}
|
||||
wg sync.WaitGroup
|
||||
isRunning atomic.Bool
|
||||
}
|
||||
|
||||
// DatabaseProviderStats contains statistics for the database provider
|
||||
type DatabaseProviderStats struct {
|
||||
TotalEvents atomic.Int64
|
||||
EventsPublished atomic.Int64
|
||||
EventsConsumed atomic.Int64
|
||||
ActiveSubscribers atomic.Int32
|
||||
PollErrors atomic.Int64
|
||||
}
|
||||
|
||||
// dbSubscription represents a single database subscription
|
||||
type dbSubscription struct {
|
||||
pattern string
|
||||
ch chan *Event
|
||||
lastSeenID string
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// DatabaseProviderConfig configures the database provider
|
||||
type DatabaseProviderConfig struct {
|
||||
DB common.Database
|
||||
TableName string
|
||||
Channel string // PostgreSQL NOTIFY channel (optional)
|
||||
PollInterval time.Duration
|
||||
InstanceID string
|
||||
UseNotify bool // Enable PostgreSQL NOTIFY/LISTEN
|
||||
}
|
||||
|
||||
// NewDatabaseProvider creates a new database event provider
|
||||
func NewDatabaseProvider(cfg DatabaseProviderConfig) (*DatabaseProvider, error) {
|
||||
// Apply defaults
|
||||
if cfg.TableName == "" {
|
||||
cfg.TableName = "events"
|
||||
}
|
||||
if cfg.Channel == "" {
|
||||
cfg.Channel = "resolvespec_events"
|
||||
}
|
||||
if cfg.PollInterval == 0 {
|
||||
cfg.PollInterval = 1 * time.Second
|
||||
}
|
||||
|
||||
dp := &DatabaseProvider{
|
||||
db: cfg.DB,
|
||||
tableName: cfg.TableName,
|
||||
channel: cfg.Channel,
|
||||
pollInterval: cfg.PollInterval,
|
||||
instanceID: cfg.InstanceID,
|
||||
useNotify: cfg.UseNotify,
|
||||
subscribers: make(map[string]*dbSubscription),
|
||||
stopPolling: make(chan struct{}),
|
||||
}
|
||||
|
||||
dp.isRunning.Store(true)
|
||||
|
||||
// Create table if it doesn't exist
|
||||
ctx := context.Background()
|
||||
if err := dp.createTable(ctx); err != nil {
|
||||
return nil, fmt.Errorf("failed to create events table: %w", err)
|
||||
}
|
||||
|
||||
// Start polling goroutine for subscriptions
|
||||
dp.wg.Add(1)
|
||||
go dp.pollLoop()
|
||||
|
||||
logger.Info("Database provider initialized (table: %s, poll_interval: %v, notify: %v)",
|
||||
cfg.TableName, cfg.PollInterval, cfg.UseNotify)
|
||||
|
||||
return dp, nil
|
||||
}
|
||||
|
||||
// Store stores an event
|
||||
func (dp *DatabaseProvider) Store(ctx context.Context, event *Event) error {
|
||||
// Marshal metadata to JSON
|
||||
metadataJSON, err := json.Marshal(event.Metadata)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal metadata: %w", err)
|
||||
}
|
||||
|
||||
// Insert event
|
||||
query := fmt.Sprintf(`
|
||||
INSERT INTO %s (
|
||||
id, source, type, status, retry_count, error,
|
||||
payload, user_id, session_id, instance_id,
|
||||
schema, entity, operation,
|
||||
created_at, processed_at, completed_at, metadata
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6,
|
||||
$7, $8, $9, $10,
|
||||
$11, $12, $13,
|
||||
$14, $15, $16, $17
|
||||
)
|
||||
`, dp.tableName)
|
||||
|
||||
_, err = dp.db.Exec(ctx, query,
|
||||
event.ID, event.Source, event.Type, event.Status, event.RetryCount, event.Error,
|
||||
event.Payload, event.UserID, event.SessionID, event.InstanceID,
|
||||
event.Schema, event.Entity, event.Operation,
|
||||
event.CreatedAt, event.ProcessedAt, event.CompletedAt, metadataJSON,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert event: %w", err)
|
||||
}
|
||||
|
||||
dp.stats.TotalEvents.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves an event by ID
|
||||
func (dp *DatabaseProvider) Get(ctx context.Context, id string) (*Event, error) {
|
||||
event := &Event{}
|
||||
var metadataJSON []byte
|
||||
var processedAt, completedAt sql.NullTime
|
||||
|
||||
// Query into individual fields
|
||||
query := fmt.Sprintf(`
|
||||
SELECT id, source, type, status, retry_count, error,
|
||||
payload, user_id, session_id, instance_id,
|
||||
schema, entity, operation,
|
||||
created_at, processed_at, completed_at, metadata
|
||||
FROM %s
|
||||
WHERE id = $1
|
||||
`, dp.tableName)
|
||||
|
||||
var source, eventType, status, operation string
|
||||
|
||||
// Execute raw query
|
||||
rows, err := dp.db.GetUnderlyingDB().(interface {
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
}).QueryContext(ctx, query, id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query event: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
return nil, fmt.Errorf("event not found: %s", id)
|
||||
}
|
||||
|
||||
if err := rows.Scan(
|
||||
&event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error,
|
||||
&event.Payload, &event.UserID, &event.SessionID, &event.InstanceID,
|
||||
&event.Schema, &event.Entity, &operation,
|
||||
&event.CreatedAt, &processedAt, &completedAt, &metadataJSON,
|
||||
); err != nil {
|
||||
return nil, fmt.Errorf("failed to scan event: %w", err)
|
||||
}
|
||||
|
||||
// Set enum values
|
||||
event.Source = EventSource(source)
|
||||
event.Type = eventType
|
||||
event.Status = EventStatus(status)
|
||||
event.Operation = operation
|
||||
|
||||
// Handle nullable timestamps
|
||||
if processedAt.Valid {
|
||||
event.ProcessedAt = &processedAt.Time
|
||||
}
|
||||
if completedAt.Valid {
|
||||
event.CompletedAt = &completedAt.Time
|
||||
}
|
||||
|
||||
// Unmarshal metadata
|
||||
if len(metadataJSON) > 0 {
|
||||
if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil {
|
||||
logger.Warn("Failed to unmarshal metadata: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return event, nil
|
||||
}
|
||||
|
||||
// List lists events with optional filters
|
||||
func (dp *DatabaseProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error) {
|
||||
query := fmt.Sprintf("SELECT id, source, type, status, retry_count, error, "+
|
||||
"payload, user_id, session_id, instance_id, "+
|
||||
"schema, entity, operation, "+
|
||||
"created_at, processed_at, completed_at, metadata "+
|
||||
"FROM %s WHERE 1=1", dp.tableName)
|
||||
|
||||
args := []interface{}{}
|
||||
argNum := 1
|
||||
|
||||
// Build WHERE clause
|
||||
if filter != nil {
|
||||
if filter.Source != nil {
|
||||
query += fmt.Sprintf(" AND source = $%d", argNum)
|
||||
args = append(args, string(*filter.Source))
|
||||
argNum++
|
||||
}
|
||||
if filter.Status != nil {
|
||||
query += fmt.Sprintf(" AND status = $%d", argNum)
|
||||
args = append(args, string(*filter.Status))
|
||||
argNum++
|
||||
}
|
||||
if filter.UserID != nil {
|
||||
query += fmt.Sprintf(" AND user_id = $%d", argNum)
|
||||
args = append(args, *filter.UserID)
|
||||
argNum++
|
||||
}
|
||||
if filter.Schema != "" {
|
||||
query += fmt.Sprintf(" AND schema = $%d", argNum)
|
||||
args = append(args, filter.Schema)
|
||||
argNum++
|
||||
}
|
||||
if filter.Entity != "" {
|
||||
query += fmt.Sprintf(" AND entity = $%d", argNum)
|
||||
args = append(args, filter.Entity)
|
||||
argNum++
|
||||
}
|
||||
if filter.Operation != "" {
|
||||
query += fmt.Sprintf(" AND operation = $%d", argNum)
|
||||
args = append(args, filter.Operation)
|
||||
argNum++
|
||||
}
|
||||
if filter.InstanceID != "" {
|
||||
query += fmt.Sprintf(" AND instance_id = $%d", argNum)
|
||||
args = append(args, filter.InstanceID)
|
||||
argNum++
|
||||
}
|
||||
if filter.StartTime != nil {
|
||||
query += fmt.Sprintf(" AND created_at >= $%d", argNum)
|
||||
args = append(args, *filter.StartTime)
|
||||
argNum++
|
||||
}
|
||||
if filter.EndTime != nil {
|
||||
query += fmt.Sprintf(" AND created_at <= $%d", argNum)
|
||||
args = append(args, *filter.EndTime)
|
||||
argNum++
|
||||
}
|
||||
}
|
||||
|
||||
// Add ORDER BY
|
||||
query += " ORDER BY created_at DESC"
|
||||
|
||||
// Add LIMIT and OFFSET
|
||||
if filter != nil {
|
||||
if filter.Limit > 0 {
|
||||
query += fmt.Sprintf(" LIMIT $%d", argNum)
|
||||
args = append(args, filter.Limit)
|
||||
argNum++
|
||||
}
|
||||
if filter.Offset > 0 {
|
||||
query += fmt.Sprintf(" OFFSET $%d", argNum)
|
||||
args = append(args, filter.Offset)
|
||||
}
|
||||
}
|
||||
|
||||
// Execute query
|
||||
rows, err := dp.db.GetUnderlyingDB().(interface {
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
}).QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query events: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var results []*Event
|
||||
for rows.Next() {
|
||||
event := &Event{}
|
||||
var source, eventType, status, operation string
|
||||
var metadataJSON []byte
|
||||
var processedAt, completedAt sql.NullTime
|
||||
|
||||
err := rows.Scan(
|
||||
&event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error,
|
||||
&event.Payload, &event.UserID, &event.SessionID, &event.InstanceID,
|
||||
&event.Schema, &event.Entity, &operation,
|
||||
&event.CreatedAt, &processedAt, &completedAt, &metadataJSON,
|
||||
)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to scan event: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Set enum values
|
||||
event.Source = EventSource(source)
|
||||
event.Type = eventType
|
||||
event.Status = EventStatus(status)
|
||||
event.Operation = operation
|
||||
|
||||
// Handle nullable timestamps
|
||||
if processedAt.Valid {
|
||||
event.ProcessedAt = &processedAt.Time
|
||||
}
|
||||
if completedAt.Valid {
|
||||
event.CompletedAt = &completedAt.Time
|
||||
}
|
||||
|
||||
// Unmarshal metadata
|
||||
if len(metadataJSON) > 0 {
|
||||
if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil {
|
||||
logger.Warn("Failed to unmarshal metadata: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
results = append(results, event)
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// UpdateStatus updates the status of an event
|
||||
func (dp *DatabaseProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error {
|
||||
query := fmt.Sprintf(`
|
||||
UPDATE %s
|
||||
SET status = $1, error = $2
|
||||
WHERE id = $3
|
||||
`, dp.tableName)
|
||||
|
||||
_, err := dp.db.Exec(ctx, query, string(status), errorMsg, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update status: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes an event by ID
|
||||
func (dp *DatabaseProvider) Delete(ctx context.Context, id string) error {
|
||||
query := fmt.Sprintf("DELETE FROM %s WHERE id = $1", dp.tableName)
|
||||
|
||||
_, err := dp.db.Exec(ctx, query, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete event: %w", err)
|
||||
}
|
||||
|
||||
dp.stats.TotalEvents.Add(-1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stream returns a channel of events for real-time consumption
|
||||
func (dp *DatabaseProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error) {
|
||||
ch := make(chan *Event, 100)
|
||||
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
sub := &dbSubscription{
|
||||
pattern: pattern,
|
||||
ch: ch,
|
||||
lastSeenID: "",
|
||||
ctx: subCtx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
dp.mu.Lock()
|
||||
dp.subscribers[pattern] = sub
|
||||
dp.stats.ActiveSubscribers.Add(1)
|
||||
dp.mu.Unlock()
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
// Publish publishes an event to all subscribers
|
||||
func (dp *DatabaseProvider) Publish(ctx context.Context, event *Event) error {
|
||||
// Store the event first
|
||||
if err := dp.Store(ctx, event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dp.stats.EventsPublished.Add(1)
|
||||
|
||||
// If using PostgreSQL NOTIFY, send notification
|
||||
if dp.useNotify {
|
||||
if err := dp.notify(ctx, event.ID); err != nil {
|
||||
logger.Warn("Failed to send NOTIFY: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the provider and releases resources
|
||||
func (dp *DatabaseProvider) Close() error {
|
||||
if !dp.isRunning.Load() {
|
||||
return nil
|
||||
}
|
||||
|
||||
dp.isRunning.Store(false)
|
||||
|
||||
// Cancel all subscriptions
|
||||
dp.mu.Lock()
|
||||
for _, sub := range dp.subscribers {
|
||||
sub.cancel()
|
||||
}
|
||||
dp.mu.Unlock()
|
||||
|
||||
// Stop polling
|
||||
close(dp.stopPolling)
|
||||
|
||||
// Wait for goroutines
|
||||
dp.wg.Wait()
|
||||
|
||||
logger.Info("Database provider closed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats returns provider statistics
|
||||
func (dp *DatabaseProvider) Stats(ctx context.Context) (*ProviderStats, error) {
|
||||
// Get counts by status
|
||||
query := fmt.Sprintf(`
|
||||
SELECT
|
||||
COUNT(*) FILTER (WHERE status = 'pending') as pending,
|
||||
COUNT(*) FILTER (WHERE status = 'processing') as processing,
|
||||
COUNT(*) FILTER (WHERE status = 'completed') as completed,
|
||||
COUNT(*) FILTER (WHERE status = 'failed') as failed,
|
||||
COUNT(*) as total
|
||||
FROM %s
|
||||
`, dp.tableName)
|
||||
|
||||
var pending, processing, completed, failed, total int64
|
||||
|
||||
rows, err := dp.db.GetUnderlyingDB().(interface {
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
}).QueryContext(ctx, query)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to get stats: %v", err)
|
||||
} else {
|
||||
defer rows.Close()
|
||||
if rows.Next() {
|
||||
if err := rows.Scan(&pending, &processing, &completed, &failed, &total); err != nil {
|
||||
logger.Warn("Failed to scan stats: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &ProviderStats{
|
||||
ProviderType: "database",
|
||||
TotalEvents: total,
|
||||
PendingEvents: pending,
|
||||
ProcessingEvents: processing,
|
||||
CompletedEvents: completed,
|
||||
FailedEvents: failed,
|
||||
EventsPublished: dp.stats.EventsPublished.Load(),
|
||||
EventsConsumed: dp.stats.EventsConsumed.Load(),
|
||||
ActiveSubscribers: int(dp.stats.ActiveSubscribers.Load()),
|
||||
ProviderSpecific: map[string]interface{}{
|
||||
"table_name": dp.tableName,
|
||||
"poll_interval": dp.pollInterval.String(),
|
||||
"use_notify": dp.useNotify,
|
||||
"poll_errors": dp.stats.PollErrors.Load(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// pollLoop periodically polls for new events
|
||||
func (dp *DatabaseProvider) pollLoop() {
|
||||
defer dp.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(dp.pollInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
dp.pollEvents()
|
||||
case <-dp.stopPolling:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pollEvents polls for new events and delivers to subscribers
|
||||
func (dp *DatabaseProvider) pollEvents() {
|
||||
dp.mu.RLock()
|
||||
subscribers := make([]*dbSubscription, 0, len(dp.subscribers))
|
||||
for _, sub := range dp.subscribers {
|
||||
subscribers = append(subscribers, sub)
|
||||
}
|
||||
dp.mu.RUnlock()
|
||||
|
||||
for _, sub := range subscribers {
|
||||
// Query for new events since last seen
|
||||
query := fmt.Sprintf(`
|
||||
SELECT id, source, type, status, retry_count, error,
|
||||
payload, user_id, session_id, instance_id,
|
||||
schema, entity, operation,
|
||||
created_at, processed_at, completed_at, metadata
|
||||
FROM %s
|
||||
WHERE id > $1
|
||||
ORDER BY created_at ASC
|
||||
LIMIT 100
|
||||
`, dp.tableName)
|
||||
|
||||
lastSeenID := sub.lastSeenID
|
||||
if lastSeenID == "" {
|
||||
lastSeenID = "00000000-0000-0000-0000-000000000000"
|
||||
}
|
||||
|
||||
rows, err := dp.db.GetUnderlyingDB().(interface {
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
}).QueryContext(sub.ctx, query, lastSeenID)
|
||||
if err != nil {
|
||||
dp.stats.PollErrors.Add(1)
|
||||
logger.Warn("Failed to poll events: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
for rows.Next() {
|
||||
event := &Event{}
|
||||
var source, eventType, status, operation string
|
||||
var metadataJSON []byte
|
||||
var processedAt, completedAt sql.NullTime
|
||||
|
||||
err := rows.Scan(
|
||||
&event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error,
|
||||
&event.Payload, &event.UserID, &event.SessionID, &event.InstanceID,
|
||||
&event.Schema, &event.Entity, &operation,
|
||||
&event.CreatedAt, &processedAt, &completedAt, &metadataJSON,
|
||||
)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to scan event: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Set enum values
|
||||
event.Source = EventSource(source)
|
||||
event.Type = eventType
|
||||
event.Status = EventStatus(status)
|
||||
event.Operation = operation
|
||||
|
||||
// Handle nullable timestamps
|
||||
if processedAt.Valid {
|
||||
event.ProcessedAt = &processedAt.Time
|
||||
}
|
||||
if completedAt.Valid {
|
||||
event.CompletedAt = &completedAt.Time
|
||||
}
|
||||
|
||||
// Unmarshal metadata
|
||||
if len(metadataJSON) > 0 {
|
||||
if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil {
|
||||
logger.Warn("Failed to unmarshal metadata: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Check if event matches pattern
|
||||
if matchPattern(sub.pattern, event.Type) {
|
||||
select {
|
||||
case sub.ch <- event:
|
||||
dp.stats.EventsConsumed.Add(1)
|
||||
sub.lastSeenID = event.ID
|
||||
case <-sub.ctx.Done():
|
||||
rows.Close()
|
||||
return
|
||||
default:
|
||||
// Channel full, skip
|
||||
logger.Warn("Subscriber channel full for pattern: %s", sub.pattern)
|
||||
}
|
||||
}
|
||||
|
||||
sub.lastSeenID = event.ID
|
||||
}
|
||||
|
||||
rows.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// notify sends a PostgreSQL NOTIFY message
|
||||
func (dp *DatabaseProvider) notify(ctx context.Context, eventID string) error {
|
||||
query := fmt.Sprintf("NOTIFY %s, '%s'", dp.channel, eventID)
|
||||
_, err := dp.db.Exec(ctx, query)
|
||||
return err
|
||||
}
|
||||
|
||||
// createTable creates the events table if it doesn't exist
|
||||
func (dp *DatabaseProvider) createTable(ctx context.Context) error {
|
||||
query := fmt.Sprintf(`
|
||||
CREATE TABLE IF NOT EXISTS %s (
|
||||
id VARCHAR(255) PRIMARY KEY,
|
||||
source VARCHAR(50) NOT NULL,
|
||||
type VARCHAR(255) NOT NULL,
|
||||
status VARCHAR(50) NOT NULL,
|
||||
retry_count INTEGER DEFAULT 0,
|
||||
error TEXT,
|
||||
payload JSONB,
|
||||
user_id INTEGER,
|
||||
session_id VARCHAR(255),
|
||||
instance_id VARCHAR(255),
|
||||
schema VARCHAR(255),
|
||||
entity VARCHAR(255),
|
||||
operation VARCHAR(50),
|
||||
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
|
||||
processed_at TIMESTAMP,
|
||||
completed_at TIMESTAMP,
|
||||
metadata JSONB
|
||||
)
|
||||
`, dp.tableName)
|
||||
|
||||
if _, err := dp.db.Exec(ctx, query); err != nil {
|
||||
return fmt.Errorf("failed to create table: %w", err)
|
||||
}
|
||||
|
||||
// Create indexes
|
||||
indexes := []string{
|
||||
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_source ON %s(source)", dp.tableName, dp.tableName),
|
||||
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_type ON %s(type)", dp.tableName, dp.tableName),
|
||||
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_status ON %s(status)", dp.tableName, dp.tableName),
|
||||
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_created_at ON %s(created_at)", dp.tableName, dp.tableName),
|
||||
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_instance_id ON %s(instance_id)", dp.tableName, dp.tableName),
|
||||
}
|
||||
|
||||
for _, indexQuery := range indexes {
|
||||
if _, err := dp.db.Exec(ctx, indexQuery); err != nil {
|
||||
logger.Warn("Failed to create index: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
565
pkg/eventbroker/provider_nats.go
Normal file
565
pkg/eventbroker/provider_nats.go
Normal file
@@ -0,0 +1,565 @@
|
||||
package eventbroker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/jetstream"
|
||||
|
||||
"github.com/bitechdev/ResolveSpec/pkg/logger"
|
||||
)
|
||||
|
||||
// NATSProvider implements Provider interface using NATS JetStream
|
||||
// Features:
|
||||
// - Persistent event storage using JetStream
|
||||
// - Cross-instance pub/sub using NATS subjects
|
||||
// - Wildcard subscription support
|
||||
// - Durable consumers for event replay
|
||||
// - At-least-once delivery semantics
|
||||
type NATSProvider struct {
|
||||
nc *nats.Conn
|
||||
js jetstream.JetStream
|
||||
stream jetstream.Stream
|
||||
streamName string
|
||||
subjectPrefix string
|
||||
instanceID string
|
||||
maxAge time.Duration
|
||||
|
||||
// Subscriptions
|
||||
mu sync.RWMutex
|
||||
subscribers map[string]*natsSubscription
|
||||
|
||||
// Statistics
|
||||
stats NATSProviderStats
|
||||
|
||||
// Lifecycle
|
||||
wg sync.WaitGroup
|
||||
isRunning atomic.Bool
|
||||
}
|
||||
|
||||
// NATSProviderStats contains statistics for the NATS provider
|
||||
type NATSProviderStats struct {
|
||||
TotalEvents atomic.Int64
|
||||
EventsPublished atomic.Int64
|
||||
EventsConsumed atomic.Int64
|
||||
ActiveSubscribers atomic.Int32
|
||||
ConsumerErrors atomic.Int64
|
||||
}
|
||||
|
||||
// natsSubscription represents a single NATS subscription
|
||||
type natsSubscription struct {
|
||||
pattern string
|
||||
consumer jetstream.Consumer
|
||||
ch chan *Event
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NATSProviderConfig configures the NATS provider
|
||||
type NATSProviderConfig struct {
|
||||
URL string
|
||||
StreamName string
|
||||
SubjectPrefix string // e.g., "events"
|
||||
InstanceID string
|
||||
MaxAge time.Duration // How long to keep events
|
||||
Storage string // "file" or "memory"
|
||||
}
|
||||
|
||||
// NewNATSProvider creates a new NATS event provider
|
||||
func NewNATSProvider(cfg NATSProviderConfig) (*NATSProvider, error) {
|
||||
// Apply defaults
|
||||
if cfg.URL == "" {
|
||||
cfg.URL = nats.DefaultURL
|
||||
}
|
||||
if cfg.StreamName == "" {
|
||||
cfg.StreamName = "RESOLVESPEC_EVENTS"
|
||||
}
|
||||
if cfg.SubjectPrefix == "" {
|
||||
cfg.SubjectPrefix = "events"
|
||||
}
|
||||
if cfg.MaxAge == 0 {
|
||||
cfg.MaxAge = 7 * 24 * time.Hour // 7 days
|
||||
}
|
||||
if cfg.Storage == "" {
|
||||
cfg.Storage = "file"
|
||||
}
|
||||
|
||||
// Connect to NATS
|
||||
nc, err := nats.Connect(cfg.URL,
|
||||
nats.Name("resolvespec-eventbroker-"+cfg.InstanceID),
|
||||
nats.Timeout(5*time.Second),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to NATS: %w", err)
|
||||
}
|
||||
|
||||
// Create JetStream context
|
||||
js, err := jetstream.New(nc)
|
||||
if err != nil {
|
||||
nc.Close()
|
||||
return nil, fmt.Errorf("failed to create JetStream context: %w", err)
|
||||
}
|
||||
|
||||
np := &NATSProvider{
|
||||
nc: nc,
|
||||
js: js,
|
||||
streamName: cfg.StreamName,
|
||||
subjectPrefix: cfg.SubjectPrefix,
|
||||
instanceID: cfg.InstanceID,
|
||||
maxAge: cfg.MaxAge,
|
||||
subscribers: make(map[string]*natsSubscription),
|
||||
}
|
||||
|
||||
np.isRunning.Store(true)
|
||||
|
||||
// Create or update stream
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Determine storage type
|
||||
var storage jetstream.StorageType
|
||||
if cfg.Storage == "memory" {
|
||||
storage = jetstream.MemoryStorage
|
||||
} else {
|
||||
storage = jetstream.FileStorage
|
||||
}
|
||||
|
||||
if err := np.ensureStream(ctx, storage); err != nil {
|
||||
nc.Close()
|
||||
return nil, fmt.Errorf("failed to create stream: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("NATS provider initialized (stream: %s, subject: %s.*, url: %s)",
|
||||
cfg.StreamName, cfg.SubjectPrefix, cfg.URL)
|
||||
|
||||
return np, nil
|
||||
}
|
||||
|
||||
// Store stores an event
|
||||
func (np *NATSProvider) Store(ctx context.Context, event *Event) error {
|
||||
// Marshal event to JSON
|
||||
data, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal event: %w", err)
|
||||
}
|
||||
|
||||
// Publish to NATS subject
|
||||
// Subject format: events.{source}.{schema}.{entity}.{operation}
|
||||
subject := np.buildSubject(event)
|
||||
|
||||
msg := &nats.Msg{
|
||||
Subject: subject,
|
||||
Data: data,
|
||||
Header: nats.Header{
|
||||
"Event-ID": []string{event.ID},
|
||||
"Event-Type": []string{event.Type},
|
||||
"Event-Source": []string{string(event.Source)},
|
||||
"Event-Status": []string{string(event.Status)},
|
||||
"Instance-ID": []string{event.InstanceID},
|
||||
},
|
||||
}
|
||||
|
||||
if _, err := np.js.PublishMsg(ctx, msg); err != nil {
|
||||
return fmt.Errorf("failed to publish event: %w", err)
|
||||
}
|
||||
|
||||
np.stats.TotalEvents.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves an event by ID
|
||||
// Note: This is inefficient with JetStream - consider using a separate KV store for lookups
|
||||
func (np *NATSProvider) Get(ctx context.Context, id string) (*Event, error) {
|
||||
// We need to scan messages which is not ideal
|
||||
// For production, consider using NATS KV store for fast lookups
|
||||
consumer, err := np.stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Name: "get-" + id,
|
||||
FilterSubject: np.subjectPrefix + ".>",
|
||||
DeliverPolicy: jetstream.DeliverAllPolicy,
|
||||
AckPolicy: jetstream.AckExplicitPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create consumer: %w", err)
|
||||
}
|
||||
|
||||
// Fetch messages in batches
|
||||
msgs, err := consumer.Fetch(1000, jetstream.FetchMaxWait(5*time.Second))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch messages: %w", err)
|
||||
}
|
||||
|
||||
for msg := range msgs.Messages() {
|
||||
if msg.Headers().Get("Event-ID") == id {
|
||||
var event Event
|
||||
if err := json.Unmarshal(msg.Data(), &event); err != nil {
|
||||
_ = msg.Nak()
|
||||
continue
|
||||
}
|
||||
_ = msg.Ack()
|
||||
|
||||
// Delete temporary consumer
|
||||
_ = np.stream.DeleteConsumer(ctx, "get-"+id)
|
||||
|
||||
return &event, nil
|
||||
}
|
||||
_ = msg.Ack()
|
||||
}
|
||||
|
||||
// Delete temporary consumer
|
||||
_ = np.stream.DeleteConsumer(ctx, "get-"+id)
|
||||
|
||||
return nil, fmt.Errorf("event not found: %s", id)
|
||||
}
|
||||
|
||||
// List lists events with optional filters
|
||||
func (np *NATSProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error) {
|
||||
var results []*Event
|
||||
|
||||
// Create temporary consumer
|
||||
consumer, err := np.stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Name: fmt.Sprintf("list-%d", time.Now().UnixNano()),
|
||||
FilterSubject: np.subjectPrefix + ".>",
|
||||
DeliverPolicy: jetstream.DeliverAllPolicy,
|
||||
AckPolicy: jetstream.AckExplicitPolicy,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create consumer: %w", err)
|
||||
}
|
||||
|
||||
defer func() { _ = np.stream.DeleteConsumer(ctx, consumer.CachedInfo().Name) }()
|
||||
|
||||
// Fetch messages in batches
|
||||
msgs, err := consumer.Fetch(1000, jetstream.FetchMaxWait(5*time.Second))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to fetch messages: %w", err)
|
||||
}
|
||||
|
||||
for msg := range msgs.Messages() {
|
||||
var event Event
|
||||
if err := json.Unmarshal(msg.Data(), &event); err != nil {
|
||||
logger.Warn("Failed to unmarshal event: %v", err)
|
||||
_ = msg.Nak()
|
||||
continue
|
||||
}
|
||||
|
||||
if np.matchesFilter(&event, filter) {
|
||||
results = append(results, &event)
|
||||
}
|
||||
|
||||
_ = msg.Ack()
|
||||
}
|
||||
|
||||
// Apply limit and offset
|
||||
if filter != nil {
|
||||
if filter.Offset > 0 && filter.Offset < len(results) {
|
||||
results = results[filter.Offset:]
|
||||
}
|
||||
if filter.Limit > 0 && filter.Limit < len(results) {
|
||||
results = results[:filter.Limit]
|
||||
}
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// UpdateStatus updates the status of an event
|
||||
// Note: NATS streams are append-only, so we publish a status update event
|
||||
func (np *NATSProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error {
|
||||
// Publish a status update message
|
||||
subject := fmt.Sprintf("%s.status.%s", np.subjectPrefix, id)
|
||||
|
||||
statusUpdate := map[string]interface{}{
|
||||
"event_id": id,
|
||||
"status": string(status),
|
||||
"error": errorMsg,
|
||||
"updated_at": time.Now(),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(statusUpdate)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal status update: %w", err)
|
||||
}
|
||||
|
||||
if _, err := np.js.Publish(ctx, subject, data); err != nil {
|
||||
return fmt.Errorf("failed to publish status update: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes an event by ID
|
||||
// Note: NATS streams don't support deletion - this just marks it in a separate subject
|
||||
func (np *NATSProvider) Delete(ctx context.Context, id string) error {
|
||||
subject := fmt.Sprintf("%s.deleted.%s", np.subjectPrefix, id)
|
||||
|
||||
deleteMsg := map[string]interface{}{
|
||||
"event_id": id,
|
||||
"deleted_at": time.Now(),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(deleteMsg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal delete message: %w", err)
|
||||
}
|
||||
|
||||
if _, err := np.js.Publish(ctx, subject, data); err != nil {
|
||||
return fmt.Errorf("failed to publish delete message: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stream returns a channel of events for real-time consumption
|
||||
func (np *NATSProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error) {
|
||||
ch := make(chan *Event, 100)
|
||||
|
||||
// Convert glob pattern to NATS subject pattern
|
||||
natsSubject := np.patternToSubject(pattern)
|
||||
|
||||
// Create durable consumer
|
||||
consumerName := fmt.Sprintf("consumer-%s-%d", np.instanceID, time.Now().UnixNano())
|
||||
consumer, err := np.stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
|
||||
Name: consumerName,
|
||||
FilterSubject: natsSubject,
|
||||
DeliverPolicy: jetstream.DeliverNewPolicy,
|
||||
AckPolicy: jetstream.AckExplicitPolicy,
|
||||
AckWait: 30 * time.Second,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create consumer: %w", err)
|
||||
}
|
||||
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
sub := &natsSubscription{
|
||||
pattern: pattern,
|
||||
consumer: consumer,
|
||||
ch: ch,
|
||||
ctx: subCtx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
np.mu.Lock()
|
||||
np.subscribers[pattern] = sub
|
||||
np.stats.ActiveSubscribers.Add(1)
|
||||
np.mu.Unlock()
|
||||
|
||||
// Start consumer goroutine
|
||||
np.wg.Add(1)
|
||||
go np.consumeMessages(sub)
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
// Publish publishes an event to all subscribers
|
||||
func (np *NATSProvider) Publish(ctx context.Context, event *Event) error {
|
||||
// Store the event first
|
||||
if err := np.Store(ctx, event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
np.stats.EventsPublished.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the provider and releases resources
|
||||
func (np *NATSProvider) Close() error {
|
||||
if !np.isRunning.Load() {
|
||||
return nil
|
||||
}
|
||||
|
||||
np.isRunning.Store(false)
|
||||
|
||||
// Cancel all subscriptions
|
||||
np.mu.Lock()
|
||||
for _, sub := range np.subscribers {
|
||||
sub.cancel()
|
||||
}
|
||||
np.mu.Unlock()
|
||||
|
||||
// Wait for goroutines
|
||||
np.wg.Wait()
|
||||
|
||||
// Close NATS connection
|
||||
np.nc.Close()
|
||||
|
||||
logger.Info("NATS provider closed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats returns provider statistics
|
||||
func (np *NATSProvider) Stats(ctx context.Context) (*ProviderStats, error) {
|
||||
streamInfo, err := np.stream.Info(ctx)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to get stream info: %v", err)
|
||||
}
|
||||
|
||||
stats := &ProviderStats{
|
||||
ProviderType: "nats",
|
||||
TotalEvents: np.stats.TotalEvents.Load(),
|
||||
EventsPublished: np.stats.EventsPublished.Load(),
|
||||
EventsConsumed: np.stats.EventsConsumed.Load(),
|
||||
ActiveSubscribers: int(np.stats.ActiveSubscribers.Load()),
|
||||
ProviderSpecific: map[string]interface{}{
|
||||
"stream_name": np.streamName,
|
||||
"subject_prefix": np.subjectPrefix,
|
||||
"max_age": np.maxAge.String(),
|
||||
"consumer_errors": np.stats.ConsumerErrors.Load(),
|
||||
},
|
||||
}
|
||||
|
||||
if streamInfo != nil {
|
||||
stats.ProviderSpecific["messages"] = streamInfo.State.Msgs
|
||||
stats.ProviderSpecific["bytes"] = streamInfo.State.Bytes
|
||||
stats.ProviderSpecific["consumers"] = streamInfo.State.Consumers
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// ensureStream creates or updates the JetStream stream
|
||||
func (np *NATSProvider) ensureStream(ctx context.Context, storage jetstream.StorageType) error {
|
||||
streamConfig := jetstream.StreamConfig{
|
||||
Name: np.streamName,
|
||||
Subjects: []string{np.subjectPrefix + ".>"},
|
||||
MaxAge: np.maxAge,
|
||||
Storage: storage,
|
||||
Retention: jetstream.LimitsPolicy,
|
||||
Discard: jetstream.DiscardOld,
|
||||
}
|
||||
|
||||
stream, err := np.js.CreateStream(ctx, streamConfig)
|
||||
if err != nil {
|
||||
// Try to update if already exists
|
||||
stream, err = np.js.UpdateStream(ctx, streamConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create/update stream: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
np.stream = stream
|
||||
return nil
|
||||
}
|
||||
|
||||
// consumeMessages consumes messages from NATS for a subscription
|
||||
func (np *NATSProvider) consumeMessages(sub *natsSubscription) {
|
||||
defer np.wg.Done()
|
||||
defer close(sub.ch)
|
||||
defer func() {
|
||||
np.mu.Lock()
|
||||
delete(np.subscribers, sub.pattern)
|
||||
np.stats.ActiveSubscribers.Add(-1)
|
||||
np.mu.Unlock()
|
||||
}()
|
||||
|
||||
logger.Debug("Starting NATS consumer for pattern: %s", sub.pattern)
|
||||
|
||||
// Consume messages
|
||||
cc, err := sub.consumer.Consume(func(msg jetstream.Msg) {
|
||||
var event Event
|
||||
if err := json.Unmarshal(msg.Data(), &event); err != nil {
|
||||
logger.Warn("Failed to unmarshal event: %v", err)
|
||||
_ = msg.Nak()
|
||||
return
|
||||
}
|
||||
|
||||
// Check if event matches pattern (additional filtering)
|
||||
if matchPattern(sub.pattern, event.Type) {
|
||||
select {
|
||||
case sub.ch <- &event:
|
||||
np.stats.EventsConsumed.Add(1)
|
||||
_ = msg.Ack()
|
||||
case <-sub.ctx.Done():
|
||||
_ = msg.Nak()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
_ = msg.Ack()
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
np.stats.ConsumerErrors.Add(1)
|
||||
logger.Error("Failed to start consumer: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for context cancellation
|
||||
<-sub.ctx.Done()
|
||||
|
||||
// Stop consuming
|
||||
cc.Stop()
|
||||
|
||||
logger.Debug("NATS consumer stopped for pattern: %s", sub.pattern)
|
||||
}
|
||||
|
||||
// buildSubject creates a NATS subject from an event
|
||||
// Format: events.{source}.{schema}.{entity}.{operation}
|
||||
func (np *NATSProvider) buildSubject(event *Event) string {
|
||||
return fmt.Sprintf("%s.%s.%s.%s.%s",
|
||||
np.subjectPrefix,
|
||||
event.Source,
|
||||
event.Schema,
|
||||
event.Entity,
|
||||
event.Operation,
|
||||
)
|
||||
}
|
||||
|
||||
// patternToSubject converts a glob pattern to NATS subject pattern
|
||||
// Examples:
|
||||
// - "*" -> "events.>"
|
||||
// - "public.users.*" -> "events.*.public.users.*"
|
||||
// - "public.*.*" -> "events.*.public.*.*"
|
||||
func (np *NATSProvider) patternToSubject(pattern string) string {
|
||||
if pattern == "*" {
|
||||
return np.subjectPrefix + ".>"
|
||||
}
|
||||
|
||||
// For specific patterns, we need to match the event type structure
|
||||
// Event type: schema.entity.operation
|
||||
// NATS subject: events.{source}.{schema}.{entity}.{operation}
|
||||
// We use wildcard for source since pattern doesn't include it
|
||||
return fmt.Sprintf("%s.*.%s", np.subjectPrefix, pattern)
|
||||
}
|
||||
|
||||
// matchesFilter checks if an event matches the filter criteria
|
||||
func (np *NATSProvider) matchesFilter(event *Event, filter *EventFilter) bool {
|
||||
if filter == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if filter.Source != nil && event.Source != *filter.Source {
|
||||
return false
|
||||
}
|
||||
if filter.Status != nil && event.Status != *filter.Status {
|
||||
return false
|
||||
}
|
||||
if filter.UserID != nil && event.UserID != *filter.UserID {
|
||||
return false
|
||||
}
|
||||
if filter.Schema != "" && event.Schema != filter.Schema {
|
||||
return false
|
||||
}
|
||||
if filter.Entity != "" && event.Entity != filter.Entity {
|
||||
return false
|
||||
}
|
||||
if filter.Operation != "" && event.Operation != filter.Operation {
|
||||
return false
|
||||
}
|
||||
if filter.InstanceID != "" && event.InstanceID != filter.InstanceID {
|
||||
return false
|
||||
}
|
||||
if filter.StartTime != nil && event.CreatedAt.Before(*filter.StartTime) {
|
||||
return false
|
||||
}
|
||||
if filter.EndTime != nil && event.CreatedAt.After(*filter.EndTime) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
541
pkg/eventbroker/provider_redis.go
Normal file
541
pkg/eventbroker/provider_redis.go
Normal file
@@ -0,0 +1,541 @@
|
||||
package eventbroker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
"github.com/bitechdev/ResolveSpec/pkg/logger"
|
||||
)
|
||||
|
||||
// RedisProvider implements Provider interface using Redis Streams
|
||||
// Features:
|
||||
// - Persistent event storage using Redis Streams
|
||||
// - Cross-instance pub/sub using consumer groups
|
||||
// - Pattern-based subscription routing
|
||||
// - Automatic stream trimming to prevent unbounded growth
|
||||
type RedisProvider struct {
|
||||
client *redis.Client
|
||||
streamName string
|
||||
consumerGroup string
|
||||
consumerName string
|
||||
instanceID string
|
||||
maxLen int64
|
||||
|
||||
// Subscriptions
|
||||
mu sync.RWMutex
|
||||
subscribers map[string]*redisSubscription
|
||||
|
||||
// Statistics
|
||||
stats RedisProviderStats
|
||||
|
||||
// Lifecycle
|
||||
stopListeners chan struct{}
|
||||
wg sync.WaitGroup
|
||||
isRunning atomic.Bool
|
||||
}
|
||||
|
||||
// RedisProviderStats contains statistics for the Redis provider
|
||||
type RedisProviderStats struct {
|
||||
TotalEvents atomic.Int64
|
||||
EventsPublished atomic.Int64
|
||||
EventsConsumed atomic.Int64
|
||||
ActiveSubscribers atomic.Int32
|
||||
ConsumerErrors atomic.Int64
|
||||
}
|
||||
|
||||
// redisSubscription represents a single subscription
|
||||
type redisSubscription struct {
|
||||
pattern string
|
||||
ch chan *Event
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// RedisProviderConfig configures the Redis provider
|
||||
type RedisProviderConfig struct {
|
||||
Host string
|
||||
Port int
|
||||
Password string
|
||||
DB int
|
||||
StreamName string
|
||||
ConsumerGroup string
|
||||
ConsumerName string
|
||||
InstanceID string
|
||||
MaxLen int64 // Maximum stream length (0 = unlimited)
|
||||
}
|
||||
|
||||
// NewRedisProvider creates a new Redis event provider
|
||||
func NewRedisProvider(cfg RedisProviderConfig) (*RedisProvider, error) {
|
||||
// Apply defaults
|
||||
if cfg.Host == "" {
|
||||
cfg.Host = "localhost"
|
||||
}
|
||||
if cfg.Port == 0 {
|
||||
cfg.Port = 6379
|
||||
}
|
||||
if cfg.StreamName == "" {
|
||||
cfg.StreamName = "resolvespec:events"
|
||||
}
|
||||
if cfg.ConsumerGroup == "" {
|
||||
cfg.ConsumerGroup = "resolvespec-workers"
|
||||
}
|
||||
if cfg.ConsumerName == "" {
|
||||
cfg.ConsumerName = cfg.InstanceID
|
||||
}
|
||||
if cfg.MaxLen == 0 {
|
||||
cfg.MaxLen = 10000 // Default max stream length
|
||||
}
|
||||
|
||||
// Create Redis client
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port),
|
||||
Password: cfg.Password,
|
||||
DB: cfg.DB,
|
||||
PoolSize: 10,
|
||||
})
|
||||
|
||||
// Test connection
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := client.Ping(ctx).Err(); err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to Redis: %w", err)
|
||||
}
|
||||
|
||||
rp := &RedisProvider{
|
||||
client: client,
|
||||
streamName: cfg.StreamName,
|
||||
consumerGroup: cfg.ConsumerGroup,
|
||||
consumerName: cfg.ConsumerName,
|
||||
instanceID: cfg.InstanceID,
|
||||
maxLen: cfg.MaxLen,
|
||||
subscribers: make(map[string]*redisSubscription),
|
||||
stopListeners: make(chan struct{}),
|
||||
}
|
||||
|
||||
rp.isRunning.Store(true)
|
||||
|
||||
// Create consumer group if it doesn't exist
|
||||
if err := rp.ensureConsumerGroup(ctx); err != nil {
|
||||
logger.Warn("Failed to create consumer group: %v (may already exist)", err)
|
||||
}
|
||||
|
||||
logger.Info("Redis provider initialized (stream: %s, consumer_group: %s, consumer: %s)",
|
||||
cfg.StreamName, cfg.ConsumerGroup, cfg.ConsumerName)
|
||||
|
||||
return rp, nil
|
||||
}
|
||||
|
||||
// Store stores an event
|
||||
func (rp *RedisProvider) Store(ctx context.Context, event *Event) error {
|
||||
// Marshal event to JSON
|
||||
data, err := json.Marshal(event)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal event: %w", err)
|
||||
}
|
||||
|
||||
// Store in Redis Stream
|
||||
args := &redis.XAddArgs{
|
||||
Stream: rp.streamName,
|
||||
MaxLen: rp.maxLen,
|
||||
Approx: true, // Use approximate trimming for better performance
|
||||
Values: map[string]interface{}{
|
||||
"event": data,
|
||||
"id": event.ID,
|
||||
"type": event.Type,
|
||||
"source": string(event.Source),
|
||||
"status": string(event.Status),
|
||||
"instance_id": event.InstanceID,
|
||||
},
|
||||
}
|
||||
|
||||
if _, err := rp.client.XAdd(ctx, args).Result(); err != nil {
|
||||
return fmt.Errorf("failed to add event to stream: %w", err)
|
||||
}
|
||||
|
||||
rp.stats.TotalEvents.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves an event by ID
|
||||
// Note: This scans the stream which can be slow for large streams
|
||||
// Consider using a separate hash for fast lookups if needed
|
||||
func (rp *RedisProvider) Get(ctx context.Context, id string) (*Event, error) {
|
||||
// Scan stream for event with matching ID
|
||||
args := &redis.XReadArgs{
|
||||
Streams: []string{rp.streamName, "0"},
|
||||
Count: 1000, // Read in batches
|
||||
}
|
||||
|
||||
for {
|
||||
streams, err := rp.client.XRead(ctx, args).Result()
|
||||
if err == redis.Nil {
|
||||
return nil, fmt.Errorf("event not found: %s", id)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read stream: %w", err)
|
||||
}
|
||||
|
||||
if len(streams) == 0 {
|
||||
return nil, fmt.Errorf("event not found: %s", id)
|
||||
}
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, message := range stream.Messages {
|
||||
// Check if this is the event we're looking for
|
||||
if eventID, ok := message.Values["id"].(string); ok && eventID == id {
|
||||
// Parse event
|
||||
if eventData, ok := message.Values["event"].(string); ok {
|
||||
var event Event
|
||||
if err := json.Unmarshal([]byte(eventData), &event); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal event: %w", err)
|
||||
}
|
||||
return &event, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we've read messages, update start position for next iteration
|
||||
if len(stream.Messages) > 0 {
|
||||
args.Streams[1] = stream.Messages[len(stream.Messages)-1].ID
|
||||
} else {
|
||||
// No more messages
|
||||
return nil, fmt.Errorf("event not found: %s", id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// List lists events with optional filters
|
||||
// Note: This scans the entire stream which can be slow
|
||||
// Consider using time-based or ID-based ranges for better performance
|
||||
func (rp *RedisProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error) {
|
||||
var results []*Event
|
||||
|
||||
// Read from stream
|
||||
args := &redis.XReadArgs{
|
||||
Streams: []string{rp.streamName, "0"},
|
||||
Count: 1000,
|
||||
}
|
||||
|
||||
for {
|
||||
streams, err := rp.client.XRead(ctx, args).Result()
|
||||
if err == redis.Nil {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read stream: %w", err)
|
||||
}
|
||||
|
||||
if len(streams) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, message := range stream.Messages {
|
||||
if eventData, ok := message.Values["event"].(string); ok {
|
||||
var event Event
|
||||
if err := json.Unmarshal([]byte(eventData), &event); err != nil {
|
||||
logger.Warn("Failed to unmarshal event: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if rp.matchesFilter(&event, filter) {
|
||||
results = append(results, &event)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Update start position for next iteration
|
||||
if len(stream.Messages) > 0 {
|
||||
args.Streams[1] = stream.Messages[len(stream.Messages)-1].ID
|
||||
} else {
|
||||
// No more messages
|
||||
goto done
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
done:
|
||||
// Apply limit and offset
|
||||
if filter != nil {
|
||||
if filter.Offset > 0 && filter.Offset < len(results) {
|
||||
results = results[filter.Offset:]
|
||||
}
|
||||
if filter.Limit > 0 && filter.Limit < len(results) {
|
||||
results = results[:filter.Limit]
|
||||
}
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// UpdateStatus updates the status of an event
|
||||
// Note: Redis Streams are append-only, so we need to store status updates separately
|
||||
// This uses a separate hash for status tracking
|
||||
func (rp *RedisProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error {
|
||||
statusKey := fmt.Sprintf("%s:status:%s", rp.streamName, id)
|
||||
|
||||
fields := map[string]interface{}{
|
||||
"status": string(status),
|
||||
"updated_at": time.Now().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
if errorMsg != "" {
|
||||
fields["error"] = errorMsg
|
||||
}
|
||||
|
||||
if err := rp.client.HSet(ctx, statusKey, fields).Err(); err != nil {
|
||||
return fmt.Errorf("failed to update status: %w", err)
|
||||
}
|
||||
|
||||
// Set TTL on status key to prevent unbounded growth
|
||||
rp.client.Expire(ctx, statusKey, 7*24*time.Hour) // 7 days
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete deletes an event by ID
|
||||
// Note: Redis Streams don't support deletion by field value
|
||||
// This marks the event as deleted in a separate set
|
||||
func (rp *RedisProvider) Delete(ctx context.Context, id string) error {
|
||||
deletedKey := fmt.Sprintf("%s:deleted", rp.streamName)
|
||||
|
||||
if err := rp.client.SAdd(ctx, deletedKey, id).Err(); err != nil {
|
||||
return fmt.Errorf("failed to mark event as deleted: %w", err)
|
||||
}
|
||||
|
||||
// Also delete the status hash if it exists
|
||||
statusKey := fmt.Sprintf("%s:status:%s", rp.streamName, id)
|
||||
rp.client.Del(ctx, statusKey)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stream returns a channel of events for real-time consumption
|
||||
// Uses Redis Streams consumer group for distributed processing
|
||||
func (rp *RedisProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error) {
|
||||
ch := make(chan *Event, 100)
|
||||
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
sub := &redisSubscription{
|
||||
pattern: pattern,
|
||||
ch: ch,
|
||||
ctx: subCtx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
rp.mu.Lock()
|
||||
rp.subscribers[pattern] = sub
|
||||
rp.stats.ActiveSubscribers.Add(1)
|
||||
rp.mu.Unlock()
|
||||
|
||||
// Start consumer goroutine
|
||||
rp.wg.Add(1)
|
||||
go rp.consumeStream(sub)
|
||||
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
// Publish publishes an event to all subscribers (cross-instance)
|
||||
func (rp *RedisProvider) Publish(ctx context.Context, event *Event) error {
|
||||
// Store the event first
|
||||
if err := rp.Store(ctx, event); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rp.stats.EventsPublished.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the provider and releases resources
|
||||
func (rp *RedisProvider) Close() error {
|
||||
if !rp.isRunning.Load() {
|
||||
return nil
|
||||
}
|
||||
|
||||
rp.isRunning.Store(false)
|
||||
|
||||
// Cancel all subscriptions
|
||||
rp.mu.Lock()
|
||||
for _, sub := range rp.subscribers {
|
||||
sub.cancel()
|
||||
}
|
||||
rp.mu.Unlock()
|
||||
|
||||
// Stop listeners
|
||||
close(rp.stopListeners)
|
||||
|
||||
// Wait for goroutines
|
||||
rp.wg.Wait()
|
||||
|
||||
// Close Redis client
|
||||
if err := rp.client.Close(); err != nil {
|
||||
return fmt.Errorf("failed to close Redis client: %w", err)
|
||||
}
|
||||
|
||||
logger.Info("Redis provider closed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats returns provider statistics
|
||||
func (rp *RedisProvider) Stats(ctx context.Context) (*ProviderStats, error) {
|
||||
// Get stream info
|
||||
streamInfo, err := rp.client.XInfoStream(ctx, rp.streamName).Result()
|
||||
if err != nil && err != redis.Nil {
|
||||
logger.Warn("Failed to get stream info: %v", err)
|
||||
}
|
||||
|
||||
stats := &ProviderStats{
|
||||
ProviderType: "redis",
|
||||
TotalEvents: rp.stats.TotalEvents.Load(),
|
||||
EventsPublished: rp.stats.EventsPublished.Load(),
|
||||
EventsConsumed: rp.stats.EventsConsumed.Load(),
|
||||
ActiveSubscribers: int(rp.stats.ActiveSubscribers.Load()),
|
||||
ProviderSpecific: map[string]interface{}{
|
||||
"stream_name": rp.streamName,
|
||||
"consumer_group": rp.consumerGroup,
|
||||
"consumer_name": rp.consumerName,
|
||||
"max_len": rp.maxLen,
|
||||
"consumer_errors": rp.stats.ConsumerErrors.Load(),
|
||||
},
|
||||
}
|
||||
|
||||
if streamInfo != nil {
|
||||
stats.ProviderSpecific["stream_length"] = streamInfo.Length
|
||||
stats.ProviderSpecific["first_entry_id"] = streamInfo.FirstEntry.ID
|
||||
stats.ProviderSpecific["last_entry_id"] = streamInfo.LastEntry.ID
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// consumeStream consumes events from the Redis Stream for a subscription
|
||||
func (rp *RedisProvider) consumeStream(sub *redisSubscription) {
|
||||
defer rp.wg.Done()
|
||||
defer close(sub.ch)
|
||||
defer func() {
|
||||
rp.mu.Lock()
|
||||
delete(rp.subscribers, sub.pattern)
|
||||
rp.stats.ActiveSubscribers.Add(-1)
|
||||
rp.mu.Unlock()
|
||||
}()
|
||||
|
||||
logger.Debug("Starting stream consumer for pattern: %s", sub.pattern)
|
||||
|
||||
// Use consumer group for distributed processing
|
||||
for {
|
||||
select {
|
||||
case <-sub.ctx.Done():
|
||||
logger.Debug("Stream consumer stopped for pattern: %s", sub.pattern)
|
||||
return
|
||||
default:
|
||||
// Read from consumer group
|
||||
args := &redis.XReadGroupArgs{
|
||||
Group: rp.consumerGroup,
|
||||
Consumer: rp.consumerName,
|
||||
Streams: []string{rp.streamName, ">"},
|
||||
Count: 10,
|
||||
Block: 1 * time.Second,
|
||||
}
|
||||
|
||||
streams, err := rp.client.XReadGroup(sub.ctx, args).Result()
|
||||
if err == redis.Nil {
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
if sub.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
rp.stats.ConsumerErrors.Add(1)
|
||||
logger.Warn("Failed to read from consumer group: %v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, stream := range streams {
|
||||
for _, message := range stream.Messages {
|
||||
if eventData, ok := message.Values["event"].(string); ok {
|
||||
var event Event
|
||||
if err := json.Unmarshal([]byte(eventData), &event); err != nil {
|
||||
logger.Warn("Failed to unmarshal event: %v", err)
|
||||
// Acknowledge message anyway to prevent redelivery
|
||||
rp.client.XAck(sub.ctx, rp.streamName, rp.consumerGroup, message.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if event matches pattern
|
||||
if matchPattern(sub.pattern, event.Type) {
|
||||
select {
|
||||
case sub.ch <- &event:
|
||||
rp.stats.EventsConsumed.Add(1)
|
||||
// Acknowledge message
|
||||
rp.client.XAck(sub.ctx, rp.streamName, rp.consumerGroup, message.ID)
|
||||
case <-sub.ctx.Done():
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Acknowledge message even if it doesn't match pattern
|
||||
rp.client.XAck(sub.ctx, rp.streamName, rp.consumerGroup, message.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ensureConsumerGroup creates the consumer group if it doesn't exist
|
||||
func (rp *RedisProvider) ensureConsumerGroup(ctx context.Context) error {
|
||||
// Try to create the stream and consumer group
|
||||
// MKSTREAM creates the stream if it doesn't exist
|
||||
err := rp.client.XGroupCreateMkStream(ctx, rp.streamName, rp.consumerGroup, "0").Err()
|
||||
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// matchesFilter checks if an event matches the filter criteria
|
||||
func (rp *RedisProvider) matchesFilter(event *Event, filter *EventFilter) bool {
|
||||
if filter == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if filter.Source != nil && event.Source != *filter.Source {
|
||||
return false
|
||||
}
|
||||
if filter.Status != nil && event.Status != *filter.Status {
|
||||
return false
|
||||
}
|
||||
if filter.UserID != nil && event.UserID != *filter.UserID {
|
||||
return false
|
||||
}
|
||||
if filter.Schema != "" && event.Schema != filter.Schema {
|
||||
return false
|
||||
}
|
||||
if filter.Entity != "" && event.Entity != filter.Entity {
|
||||
return false
|
||||
}
|
||||
if filter.Operation != "" && event.Operation != filter.Operation {
|
||||
return false
|
||||
}
|
||||
if filter.InstanceID != "" && event.InstanceID != filter.InstanceID {
|
||||
return false
|
||||
}
|
||||
if filter.StartTime != nil && event.CreatedAt.Before(*filter.StartTime) {
|
||||
return false
|
||||
}
|
||||
if filter.EndTime != nil && event.CreatedAt.After(*filter.EndTime) {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
@@ -78,8 +78,8 @@ func CloseErrorTracking() error {
|
||||
// extractContext attempts to find a context.Context in the given arguments.
|
||||
// It returns the found context (or context.Background() if not found) and
|
||||
// the remaining arguments without the context.
|
||||
func extractContext(args ...interface{}) (context.Context, []interface{}) {
|
||||
ctx := context.Background()
|
||||
func extractContext(args ...interface{}) (ctx context.Context, filteredArgs []interface{}) {
|
||||
ctx = context.Background()
|
||||
var newArgs []interface{}
|
||||
found := false
|
||||
|
||||
|
||||
@@ -93,7 +93,7 @@ func (h *Handler) Start() error {
|
||||
// Subscribe to all request topics: spec/+/request
|
||||
requestTopic := fmt.Sprintf("%s/+/request", h.config.Topics.Prefix)
|
||||
if err := h.broker.Subscribe(requestTopic, h.config.QoS.Request, h.handleIncomingMessage); err != nil {
|
||||
h.broker.Stop(h.ctx)
|
||||
_ = h.broker.Stop(h.ctx)
|
||||
return fmt.Errorf("failed to subscribe to request topic: %w", err)
|
||||
}
|
||||
|
||||
@@ -130,14 +130,14 @@ func (h *Handler) Shutdown() error {
|
||||
"mqtt_client": client,
|
||||
},
|
||||
}
|
||||
h.hooks.Execute(BeforeDisconnect, hookCtx)
|
||||
_ = h.hooks.Execute(BeforeDisconnect, hookCtx)
|
||||
h.clientManager.Unregister(client.ID)
|
||||
h.hooks.Execute(AfterDisconnect, hookCtx)
|
||||
_ = h.hooks.Execute(AfterDisconnect, hookCtx)
|
||||
}
|
||||
|
||||
// Unsubscribe from request topic
|
||||
requestTopic := fmt.Sprintf("%s/+/request", h.config.Topics.Prefix)
|
||||
h.broker.Unsubscribe(requestTopic)
|
||||
_ = h.broker.Unsubscribe(requestTopic)
|
||||
|
||||
// Stop broker
|
||||
if err := h.broker.Stop(h.ctx); err != nil {
|
||||
@@ -223,7 +223,7 @@ func (h *Handler) handleIncomingMessage(topic string, payload []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
h.hooks.Execute(AfterConnect, hookCtx)
|
||||
_ = h.hooks.Execute(AfterConnect, hookCtx)
|
||||
}
|
||||
|
||||
// Route message by type
|
||||
@@ -498,7 +498,7 @@ func (h *Handler) handleSubscribe(client *Client, msg *Message) {
|
||||
client.AddSubscription(sub)
|
||||
|
||||
// Execute after hook
|
||||
h.hooks.Execute(AfterSubscribe, hookCtx)
|
||||
_ = h.hooks.Execute(AfterSubscribe, hookCtx)
|
||||
|
||||
// Send response
|
||||
h.sendResponse(client.ID, msg.ID, map[string]interface{}{
|
||||
@@ -541,7 +541,7 @@ func (h *Handler) handleUnsubscribe(client *Client, msg *Message) {
|
||||
client.RemoveSubscription(subID)
|
||||
|
||||
// Execute after hook
|
||||
h.hooks.Execute(AfterUnsubscribe, hookCtx)
|
||||
_ = h.hooks.Execute(AfterUnsubscribe, hookCtx)
|
||||
|
||||
// Send response
|
||||
h.sendResponse(client.ID, msg.ID, map[string]interface{}{
|
||||
@@ -562,7 +562,7 @@ func (h *Handler) handlePing(client *Client, msg *Message) {
|
||||
|
||||
payload, _ := json.Marshal(pong)
|
||||
topic := h.getResponseTopic(client.ID)
|
||||
h.broker.Publish(topic, h.config.QoS.Response, payload)
|
||||
_ = h.broker.Publish(topic, h.config.QoS.Response, payload)
|
||||
}
|
||||
|
||||
// notifySubscribers sends notifications to subscribers
|
||||
@@ -625,7 +625,7 @@ func (h *Handler) sendError(clientID, msgID, code, message string) {
|
||||
|
||||
payload, _ := json.Marshal(errResp)
|
||||
topic := h.getResponseTopic(clientID)
|
||||
h.broker.Publish(topic, h.config.QoS.Response, payload)
|
||||
_ = h.broker.Publish(topic, h.config.QoS.Response, payload)
|
||||
}
|
||||
|
||||
// Topic helpers
|
||||
@@ -669,8 +669,8 @@ func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) {
|
||||
|
||||
// Apply preloads (simplified)
|
||||
if hookCtx.Options != nil {
|
||||
for _, preload := range hookCtx.Options.Preload {
|
||||
query = query.PreloadRelation(preload.Relation)
|
||||
for i := range hookCtx.Options.Preload {
|
||||
query = query.PreloadRelation(hookCtx.Options.Preload[i].Relation)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -683,7 +683,7 @@ func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) {
|
||||
}
|
||||
|
||||
// readMultiple reads multiple records
|
||||
func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]interface{}, error) {
|
||||
func (h *Handler) readMultiple(hookCtx *HookContext) (data interface{}, metadata map[string]interface{}, err error) {
|
||||
query := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName)
|
||||
|
||||
// Apply options
|
||||
@@ -711,8 +711,8 @@ func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]in
|
||||
}
|
||||
|
||||
// Apply preloads
|
||||
for _, preload := range hookCtx.Options.Preload {
|
||||
query = query.PreloadRelation(preload.Relation)
|
||||
for i := range hookCtx.Options.Preload {
|
||||
query = query.PreloadRelation(hookCtx.Options.Preload[i].Relation)
|
||||
}
|
||||
|
||||
// Apply columns
|
||||
@@ -727,7 +727,7 @@ func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]in
|
||||
}
|
||||
|
||||
// Get count
|
||||
metadata := make(map[string]interface{})
|
||||
metadata = make(map[string]interface{})
|
||||
countQuery := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName)
|
||||
if hookCtx.Options != nil {
|
||||
for _, filter := range hookCtx.Options.Filters {
|
||||
|
||||
@@ -13,9 +13,10 @@ import (
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/klauspost/compress/gzhttp"
|
||||
|
||||
"github.com/bitechdev/ResolveSpec/pkg/logger"
|
||||
"github.com/bitechdev/ResolveSpec/pkg/middleware"
|
||||
"github.com/klauspost/compress/gzhttp"
|
||||
)
|
||||
|
||||
// gracefulServer wraps http.Server with graceful shutdown capabilities (internal type)
|
||||
@@ -320,9 +321,9 @@ func (sm *serverManager) RestartAll() error {
|
||||
|
||||
// Retry starting all servers with exponential backoff instead of a fixed sleep.
|
||||
const (
|
||||
maxAttempts = 5
|
||||
initialBackoff = 100 * time.Millisecond
|
||||
maxBackoff = 2 * time.Second
|
||||
maxAttempts = 5
|
||||
initialBackoff = 100 * time.Millisecond
|
||||
maxBackoff = 2 * time.Second
|
||||
)
|
||||
|
||||
var lastErr error
|
||||
@@ -428,7 +429,7 @@ func newInstance(cfg Config) (*serverInstance, error) {
|
||||
}
|
||||
|
||||
addr := fmt.Sprintf("%s:%d", cfg.Host, cfg.Port)
|
||||
var handler http.Handler = cfg.Handler
|
||||
var handler = cfg.Handler
|
||||
|
||||
// Wrap with GZIP handler if enabled
|
||||
if cfg.GZIP {
|
||||
|
||||
@@ -102,14 +102,14 @@ func getCertDirectory() (string, error) {
|
||||
// Fallback to current directory if cache dir is not available
|
||||
cacheDir = "."
|
||||
}
|
||||
|
||||
|
||||
certDir := filepath.Join(cacheDir, "resolvespec", "certs")
|
||||
|
||||
|
||||
// Create directory if it doesn't exist
|
||||
if err := os.MkdirAll(certDir, 0700); err != nil {
|
||||
return "", fmt.Errorf("failed to create certificate directory: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return certDir, nil
|
||||
}
|
||||
|
||||
@@ -120,31 +120,31 @@ func isCertificateValid(certFile string) bool {
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
// Parse certificate
|
||||
block, _ := pem.Decode(certData)
|
||||
if block == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
cert, err := x509.ParseCertificate(block.Bytes)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
// Check if certificate is expired or will expire in the next 30 days
|
||||
now := time.Now()
|
||||
expiryThreshold := now.Add(30 * 24 * time.Hour)
|
||||
|
||||
|
||||
if now.Before(cert.NotBefore) || now.After(cert.NotAfter) {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
// Renew if expiring soon
|
||||
if expiryThreshold.After(cert.NotAfter) {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -156,24 +156,24 @@ func saveCertToFiles(certPEM, keyPEM []byte, host string) (certFile, keyFile str
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
|
||||
// Sanitize hostname for safe file naming
|
||||
safeHost := sanitizeHostname(host)
|
||||
|
||||
|
||||
// Use consistent file names based on host
|
||||
certFile = filepath.Join(certDir, fmt.Sprintf("%s-cert.pem", safeHost))
|
||||
keyFile = filepath.Join(certDir, fmt.Sprintf("%s-key.pem", safeHost))
|
||||
|
||||
|
||||
// Write certificate
|
||||
if err := os.WriteFile(certFile, certPEM, 0600); err != nil {
|
||||
return "", "", fmt.Errorf("failed to write certificate: %w", err)
|
||||
}
|
||||
|
||||
|
||||
// Write key
|
||||
if err := os.WriteFile(keyFile, keyPEM, 0600); err != nil {
|
||||
return "", "", fmt.Errorf("failed to write private key: %w", err)
|
||||
}
|
||||
|
||||
|
||||
return certFile, keyFile, nil
|
||||
}
|
||||
|
||||
@@ -196,10 +196,10 @@ func setupAutoTLS(domains []string, email, cacheDir string) (*tls.Config, error)
|
||||
|
||||
// Create autocert manager
|
||||
m := &autocert.Manager{
|
||||
Prompt: autocert.AcceptTOS,
|
||||
Cache: autocert.DirCache(cacheDir),
|
||||
HostPolicy: autocert.HostWhitelist(domains...),
|
||||
Email: email,
|
||||
Prompt: autocert.AcceptTOS,
|
||||
Cache: autocert.DirCache(cacheDir),
|
||||
HostPolicy: autocert.HostWhitelist(domains...),
|
||||
Email: email,
|
||||
}
|
||||
|
||||
// Create TLS config
|
||||
@@ -211,7 +211,7 @@ func setupAutoTLS(domains []string, email, cacheDir string) (*tls.Config, error)
|
||||
|
||||
// configureTLS configures TLS for the server based on the provided configuration.
|
||||
// Returns the TLS config and certificate/key file paths (if applicable).
|
||||
func configureTLS(cfg Config) (*tls.Config, string, string, error) {
|
||||
func configureTLS(cfg Config) (tlsConfig *tls.Config, certFile string, keyFile string, err error) {
|
||||
// Option 1: Certificate files provided
|
||||
if cfg.SSLCert != "" && cfg.SSLKey != "" {
|
||||
// Validate that files exist
|
||||
|
||||
@@ -209,9 +209,9 @@ func (c *Connection) ReadPump() {
|
||||
}()
|
||||
|
||||
// Configure read parameters
|
||||
c.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
_ = c.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
c.ws.SetPongHandler(func(string) error {
|
||||
c.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
_ = c.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -240,10 +240,10 @@ func (c *Connection) WritePump() {
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-c.send:
|
||||
c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
_ = c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if !ok {
|
||||
// Channel closed
|
||||
c.ws.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
_ = c.ws.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -251,13 +251,13 @@ func (c *Connection) WritePump() {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Write(message)
|
||||
_, _ = w.Write(message)
|
||||
|
||||
// Write any queued messages
|
||||
n := len(c.send)
|
||||
for i := 0; i < n; i++ {
|
||||
w.Write([]byte{'\n'})
|
||||
w.Write(<-c.send)
|
||||
_, _ = w.Write([]byte{'\n'})
|
||||
_, _ = w.Write(<-c.send)
|
||||
}
|
||||
|
||||
if err := w.Close(); err != nil {
|
||||
@@ -265,7 +265,7 @@ func (c *Connection) WritePump() {
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
_ = c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
||||
if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil {
|
||||
return
|
||||
}
|
||||
@@ -364,14 +364,14 @@ func (c *Connection) handleMessage(data []byte) {
|
||||
if err != nil {
|
||||
logger.Error("[WebSocketSpec] Failed to parse message: %v", err)
|
||||
errResp := NewErrorResponse("", "invalid_message", "Failed to parse message")
|
||||
c.SendJSON(errResp)
|
||||
_ = c.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
if !msg.IsValid() {
|
||||
logger.Error("[WebSocketSpec] Invalid message received")
|
||||
errResp := NewErrorResponse(msg.ID, "invalid_message", "Message validation failed")
|
||||
c.SendJSON(errResp)
|
||||
_ = c.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -22,7 +21,6 @@ type Handler struct {
|
||||
db common.Database
|
||||
registry common.ModelRegistry
|
||||
hooks *HookRegistry
|
||||
nestedProcessor *common.NestedCUDProcessor
|
||||
connManager *ConnectionManager
|
||||
subscriptionManager *SubscriptionManager
|
||||
upgrader websocket.Upgrader
|
||||
@@ -49,9 +47,6 @@ func NewHandler(db common.Database, registry common.ModelRegistry) *Handler {
|
||||
ctx: ctx,
|
||||
}
|
||||
|
||||
// Initialize nested processor (nil for now, can be added later if needed)
|
||||
// handler.nestedProcessor = common.NewNestedCUDProcessor(db, registry, handler)
|
||||
|
||||
// Start connection manager
|
||||
go handler.connManager.Run()
|
||||
|
||||
@@ -110,7 +105,7 @@ func (h *Handler) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
h.connManager.Register(conn)
|
||||
|
||||
// Execute after connect hook
|
||||
h.hooks.Execute(AfterConnect, hookCtx)
|
||||
_ = h.hooks.Execute(AfterConnect, hookCtx)
|
||||
|
||||
// Start read/write pumps
|
||||
go conn.WritePump()
|
||||
@@ -130,7 +125,7 @@ func (h *Handler) HandleMessage(conn *Connection, msg *Message) {
|
||||
h.handlePing(conn, msg)
|
||||
default:
|
||||
errResp := NewErrorResponse(msg.ID, "invalid_message_type", fmt.Sprintf("Unknown message type: %s", msg.Type))
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,7 +142,7 @@ func (h *Handler) handleRequest(conn *Connection, msg *Message) {
|
||||
if err != nil {
|
||||
logger.Error("[WebSocketSpec] Model not found for %s.%s: %v", schema, entity, err)
|
||||
errResp := NewErrorResponse(msg.ID, "model_not_found", fmt.Sprintf("Model not found: %s.%s", schema, entity))
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -156,7 +151,7 @@ func (h *Handler) handleRequest(conn *Connection, msg *Message) {
|
||||
if err != nil {
|
||||
logger.Error("[WebSocketSpec] Model validation failed for %s.%s: %v", schema, entity, err)
|
||||
errResp := NewErrorResponse(msg.ID, "invalid_model", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -195,7 +190,7 @@ func (h *Handler) handleRequest(conn *Connection, msg *Message) {
|
||||
h.handleMeta(conn, msg, hookCtx)
|
||||
default:
|
||||
errResp := NewErrorResponse(msg.ID, "invalid_operation", fmt.Sprintf("Unknown operation: %s", msg.Operation))
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -205,7 +200,7 @@ func (h *Handler) handleRead(conn *Connection, msg *Message, hookCtx *HookContex
|
||||
if err := h.hooks.Execute(BeforeRead, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] BeforeRead hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -226,7 +221,7 @@ func (h *Handler) handleRead(conn *Connection, msg *Message, hookCtx *HookContex
|
||||
if err != nil {
|
||||
logger.Error("[WebSocketSpec] Read operation failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "read_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -237,14 +232,14 @@ func (h *Handler) handleRead(conn *Connection, msg *Message, hookCtx *HookContex
|
||||
if err := h.hooks.Execute(AfterRead, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] AfterRead hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
// Send response
|
||||
resp := NewResponseMessage(msg.ID, true, hookCtx.Result)
|
||||
resp.Metadata = metadata
|
||||
conn.SendJSON(resp)
|
||||
_ = conn.SendJSON(resp)
|
||||
}
|
||||
|
||||
// handleCreate processes a create operation
|
||||
@@ -253,7 +248,7 @@ func (h *Handler) handleCreate(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err := h.hooks.Execute(BeforeCreate, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] BeforeCreate hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -262,7 +257,7 @@ func (h *Handler) handleCreate(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err != nil {
|
||||
logger.Error("[WebSocketSpec] Create operation failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "create_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -273,13 +268,13 @@ func (h *Handler) handleCreate(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err := h.hooks.Execute(AfterCreate, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] AfterCreate hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
// Send response
|
||||
resp := NewResponseMessage(msg.ID, true, hookCtx.Result)
|
||||
conn.SendJSON(resp)
|
||||
_ = conn.SendJSON(resp)
|
||||
|
||||
// Notify subscribers
|
||||
h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationCreate, data)
|
||||
@@ -291,7 +286,7 @@ func (h *Handler) handleUpdate(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err := h.hooks.Execute(BeforeUpdate, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] BeforeUpdate hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -300,7 +295,7 @@ func (h *Handler) handleUpdate(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err != nil {
|
||||
logger.Error("[WebSocketSpec] Update operation failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "update_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -311,13 +306,13 @@ func (h *Handler) handleUpdate(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err := h.hooks.Execute(AfterUpdate, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] AfterUpdate hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
// Send response
|
||||
resp := NewResponseMessage(msg.ID, true, hookCtx.Result)
|
||||
conn.SendJSON(resp)
|
||||
_ = conn.SendJSON(resp)
|
||||
|
||||
// Notify subscribers
|
||||
h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationUpdate, data)
|
||||
@@ -329,7 +324,7 @@ func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err := h.hooks.Execute(BeforeDelete, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] BeforeDelete hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -338,7 +333,7 @@ func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err != nil {
|
||||
logger.Error("[WebSocketSpec] Delete operation failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "delete_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -346,13 +341,13 @@ func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
if err := h.hooks.Execute(AfterDelete, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] AfterDelete hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
// Send response
|
||||
resp := NewResponseMessage(msg.ID, true, map[string]interface{}{"deleted": true})
|
||||
conn.SendJSON(resp)
|
||||
_ = conn.SendJSON(resp)
|
||||
|
||||
// Notify subscribers
|
||||
h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationDelete, map[string]interface{}{"id": hookCtx.ID})
|
||||
@@ -362,7 +357,7 @@ func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookCont
|
||||
func (h *Handler) handleMeta(conn *Connection, msg *Message, hookCtx *HookContext) {
|
||||
metadata := h.getMetadata(hookCtx.Schema, hookCtx.Entity, hookCtx.Model)
|
||||
resp := NewResponseMessage(msg.ID, true, metadata)
|
||||
conn.SendJSON(resp)
|
||||
_ = conn.SendJSON(resp)
|
||||
}
|
||||
|
||||
// handleSubscription processes subscription messages
|
||||
@@ -374,7 +369,7 @@ func (h *Handler) handleSubscription(conn *Connection, msg *Message) {
|
||||
h.handleUnsubscribe(conn, msg)
|
||||
default:
|
||||
errResp := NewErrorResponse(msg.ID, "invalid_subscription_operation", fmt.Sprintf("Unknown subscription operation: %s", msg.Operation))
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -399,7 +394,7 @@ func (h *Handler) handleSubscribe(conn *Connection, msg *Message) {
|
||||
if err := h.hooks.Execute(BeforeSubscribe, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] BeforeSubscribe hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -411,7 +406,7 @@ func (h *Handler) handleSubscribe(conn *Connection, msg *Message) {
|
||||
hookCtx.Subscription = sub
|
||||
|
||||
// Execute after hook
|
||||
h.hooks.Execute(AfterSubscribe, hookCtx)
|
||||
_ = h.hooks.Execute(AfterSubscribe, hookCtx)
|
||||
|
||||
// Send response
|
||||
resp := NewResponseMessage(msg.ID, true, map[string]interface{}{
|
||||
@@ -419,7 +414,7 @@ func (h *Handler) handleSubscribe(conn *Connection, msg *Message) {
|
||||
"schema": msg.Schema,
|
||||
"entity": msg.Entity,
|
||||
})
|
||||
conn.SendJSON(resp)
|
||||
_ = conn.SendJSON(resp)
|
||||
|
||||
logger.Info("[WebSocketSpec] Subscription created: %s for %s.%s (conn: %s)", subID, msg.Schema, msg.Entity, conn.ID)
|
||||
}
|
||||
@@ -429,7 +424,7 @@ func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) {
|
||||
subID := msg.SubscriptionID
|
||||
if subID == "" {
|
||||
errResp := NewErrorResponse(msg.ID, "missing_subscription_id", "Subscription ID is required for unsubscribe")
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -437,7 +432,7 @@ func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) {
|
||||
sub, exists := conn.GetSubscription(subID)
|
||||
if !exists {
|
||||
errResp := NewErrorResponse(msg.ID, "subscription_not_found", fmt.Sprintf("Subscription not found: %s", subID))
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -455,7 +450,7 @@ func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) {
|
||||
if err := h.hooks.Execute(BeforeUnsubscribe, hookCtx); err != nil {
|
||||
logger.Error("[WebSocketSpec] BeforeUnsubscribe hook failed: %v", err)
|
||||
errResp := NewErrorResponse(msg.ID, "hook_error", err.Error())
|
||||
conn.SendJSON(errResp)
|
||||
_ = conn.SendJSON(errResp)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -464,14 +459,14 @@ func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) {
|
||||
conn.RemoveSubscription(subID)
|
||||
|
||||
// Execute after hook
|
||||
h.hooks.Execute(AfterUnsubscribe, hookCtx)
|
||||
_ = h.hooks.Execute(AfterUnsubscribe, hookCtx)
|
||||
|
||||
// Send response
|
||||
resp := NewResponseMessage(msg.ID, true, map[string]interface{}{
|
||||
"unsubscribed": true,
|
||||
"subscription_id": subID,
|
||||
})
|
||||
conn.SendJSON(resp)
|
||||
_ = conn.SendJSON(resp)
|
||||
}
|
||||
|
||||
// handlePing responds to ping messages
|
||||
@@ -481,7 +476,7 @@ func (h *Handler) handlePing(conn *Connection, msg *Message) {
|
||||
Type: MessageTypePong,
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
conn.SendJSON(pong)
|
||||
_ = conn.SendJSON(pong)
|
||||
}
|
||||
|
||||
// notifySubscribers sends notifications to all subscribers of an entity
|
||||
@@ -527,8 +522,8 @@ func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) {
|
||||
|
||||
// Apply preloads (simplified for now)
|
||||
if hookCtx.Options != nil {
|
||||
for _, preload := range hookCtx.Options.Preload {
|
||||
query = query.PreloadRelation(preload.Relation)
|
||||
for i := range hookCtx.Options.Preload {
|
||||
query = query.PreloadRelation(hookCtx.Options.Preload[i].Relation)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -540,7 +535,7 @@ func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) {
|
||||
return hookCtx.ModelPtr, nil
|
||||
}
|
||||
|
||||
func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]interface{}, error) {
|
||||
func (h *Handler) readMultiple(hookCtx *HookContext) (data interface{}, metadata map[string]interface{}, err error) {
|
||||
query := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName)
|
||||
|
||||
// Apply options (simplified implementation)
|
||||
@@ -568,8 +563,8 @@ func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]in
|
||||
}
|
||||
|
||||
// Apply preloads
|
||||
for _, preload := range hookCtx.Options.Preload {
|
||||
query = query.PreloadRelation(preload.Relation)
|
||||
for i := range hookCtx.Options.Preload {
|
||||
query = query.PreloadRelation(hookCtx.Options.Preload[i].Relation)
|
||||
}
|
||||
|
||||
// Apply columns
|
||||
@@ -584,7 +579,7 @@ func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]in
|
||||
}
|
||||
|
||||
// Get count
|
||||
metadata := make(map[string]interface{})
|
||||
metadata = make(map[string]interface{})
|
||||
countQuery := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName)
|
||||
if hookCtx.Options != nil {
|
||||
for _, filter := range hookCtx.Options.Filters {
|
||||
@@ -740,8 +735,3 @@ func (h *Handler) BroadcastMessage(message interface{}, filter func(*Connection)
|
||||
func (h *Handler) GetConnection(id string) (*Connection, bool) {
|
||||
return h.connManager.GetConnection(id)
|
||||
}
|
||||
|
||||
// Helper to convert string ID to int64
|
||||
func parseID(id string) (int64, error) {
|
||||
return strconv.ParseInt(id, 10, 64)
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ func ExampleWithGORM(db *gorm.DB) {
|
||||
handler := NewHandlerWithGORM(db)
|
||||
|
||||
// Register models
|
||||
handler.Registry().RegisterModel("public.users", &struct{}{})
|
||||
_ = handler.Registry().RegisterModel("public.users", &struct{}{})
|
||||
|
||||
// Register hooks (optional)
|
||||
handler.Hooks().RegisterBefore(OperationRead, func(ctx *HookContext) error {
|
||||
@@ -131,7 +131,7 @@ func ExampleWithBun(bunDB *bun.DB) {
|
||||
handler := NewHandlerWithBun(bunDB)
|
||||
|
||||
// Register models
|
||||
handler.Registry().RegisterModel("public.users", &struct{}{})
|
||||
_ = handler.Registry().RegisterModel("public.users", &struct{}{})
|
||||
|
||||
// Setup WebSocket endpoint
|
||||
// http.HandleFunc("/ws", handler.HandleWebSocket)
|
||||
|
||||
Reference in New Issue
Block a user