Files
ResolveSpec/pkg/eventbroker/provider_database.go
2025-12-30 14:40:45 +02:00

654 lines
17 KiB
Go

package eventbroker
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/bitechdev/ResolveSpec/pkg/common"
"github.com/bitechdev/ResolveSpec/pkg/logger"
)
// DatabaseProvider implements Provider interface using SQL database
// Features:
// - Persistent event storage in database table
// - Full SQL query support for event history
// - PostgreSQL NOTIFY/LISTEN for real-time updates (optional)
// - Polling-based consumption with configurable interval
// - Good for audit trails and event replay
type DatabaseProvider struct {
db common.Database
tableName string
channel string // PostgreSQL NOTIFY channel name
pollInterval time.Duration
instanceID string
useNotify bool // Whether to use PostgreSQL NOTIFY
// Subscriptions
mu sync.RWMutex
subscribers map[string]*dbSubscription
// Statistics
stats DatabaseProviderStats
// Lifecycle
stopPolling chan struct{}
wg sync.WaitGroup
isRunning atomic.Bool
}
// DatabaseProviderStats contains statistics for the database provider
type DatabaseProviderStats struct {
TotalEvents atomic.Int64
EventsPublished atomic.Int64
EventsConsumed atomic.Int64
ActiveSubscribers atomic.Int32
PollErrors atomic.Int64
}
// dbSubscription represents a single database subscription
type dbSubscription struct {
pattern string
ch chan *Event
lastSeenID string
ctx context.Context
cancel context.CancelFunc
}
// DatabaseProviderConfig configures the database provider
type DatabaseProviderConfig struct {
DB common.Database
TableName string
Channel string // PostgreSQL NOTIFY channel (optional)
PollInterval time.Duration
InstanceID string
UseNotify bool // Enable PostgreSQL NOTIFY/LISTEN
}
// NewDatabaseProvider creates a new database event provider
func NewDatabaseProvider(cfg DatabaseProviderConfig) (*DatabaseProvider, error) {
// Apply defaults
if cfg.TableName == "" {
cfg.TableName = "events"
}
if cfg.Channel == "" {
cfg.Channel = "resolvespec_events"
}
if cfg.PollInterval == 0 {
cfg.PollInterval = 1 * time.Second
}
dp := &DatabaseProvider{
db: cfg.DB,
tableName: cfg.TableName,
channel: cfg.Channel,
pollInterval: cfg.PollInterval,
instanceID: cfg.InstanceID,
useNotify: cfg.UseNotify,
subscribers: make(map[string]*dbSubscription),
stopPolling: make(chan struct{}),
}
dp.isRunning.Store(true)
// Create table if it doesn't exist
ctx := context.Background()
if err := dp.createTable(ctx); err != nil {
return nil, fmt.Errorf("failed to create events table: %w", err)
}
// Start polling goroutine for subscriptions
dp.wg.Add(1)
go dp.pollLoop()
logger.Info("Database provider initialized (table: %s, poll_interval: %v, notify: %v)",
cfg.TableName, cfg.PollInterval, cfg.UseNotify)
return dp, nil
}
// Store stores an event
func (dp *DatabaseProvider) Store(ctx context.Context, event *Event) error {
// Marshal metadata to JSON
metadataJSON, err := json.Marshal(event.Metadata)
if err != nil {
return fmt.Errorf("failed to marshal metadata: %w", err)
}
// Insert event
query := fmt.Sprintf(`
INSERT INTO %s (
id, source, type, status, retry_count, error,
payload, user_id, session_id, instance_id,
schema, entity, operation,
created_at, processed_at, completed_at, metadata
) VALUES (
$1, $2, $3, $4, $5, $6,
$7, $8, $9, $10,
$11, $12, $13,
$14, $15, $16, $17
)
`, dp.tableName)
_, err = dp.db.Exec(ctx, query,
event.ID, event.Source, event.Type, event.Status, event.RetryCount, event.Error,
event.Payload, event.UserID, event.SessionID, event.InstanceID,
event.Schema, event.Entity, event.Operation,
event.CreatedAt, event.ProcessedAt, event.CompletedAt, metadataJSON,
)
if err != nil {
return fmt.Errorf("failed to insert event: %w", err)
}
dp.stats.TotalEvents.Add(1)
return nil
}
// Get retrieves an event by ID
func (dp *DatabaseProvider) Get(ctx context.Context, id string) (*Event, error) {
event := &Event{}
var metadataJSON []byte
var processedAt, completedAt sql.NullTime
// Query into individual fields
query := fmt.Sprintf(`
SELECT id, source, type, status, retry_count, error,
payload, user_id, session_id, instance_id,
schema, entity, operation,
created_at, processed_at, completed_at, metadata
FROM %s
WHERE id = $1
`, dp.tableName)
var source, eventType, status, operation string
// Execute raw query
rows, err := dp.db.GetUnderlyingDB().(interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}).QueryContext(ctx, query, id)
if err != nil {
return nil, fmt.Errorf("failed to query event: %w", err)
}
defer rows.Close()
if !rows.Next() {
return nil, fmt.Errorf("event not found: %s", id)
}
if err := rows.Scan(
&event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error,
&event.Payload, &event.UserID, &event.SessionID, &event.InstanceID,
&event.Schema, &event.Entity, &operation,
&event.CreatedAt, &processedAt, &completedAt, &metadataJSON,
); err != nil {
return nil, fmt.Errorf("failed to scan event: %w", err)
}
// Set enum values
event.Source = EventSource(source)
event.Type = eventType
event.Status = EventStatus(status)
event.Operation = operation
// Handle nullable timestamps
if processedAt.Valid {
event.ProcessedAt = &processedAt.Time
}
if completedAt.Valid {
event.CompletedAt = &completedAt.Time
}
// Unmarshal metadata
if len(metadataJSON) > 0 {
if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil {
logger.Warn("Failed to unmarshal metadata: %v", err)
}
}
return event, nil
}
// List lists events with optional filters
func (dp *DatabaseProvider) List(ctx context.Context, filter *EventFilter) ([]*Event, error) {
query := fmt.Sprintf("SELECT id, source, type, status, retry_count, error, "+
"payload, user_id, session_id, instance_id, "+
"schema, entity, operation, "+
"created_at, processed_at, completed_at, metadata "+
"FROM %s WHERE 1=1", dp.tableName)
args := []interface{}{}
argNum := 1
// Build WHERE clause
if filter != nil {
if filter.Source != nil {
query += fmt.Sprintf(" AND source = $%d", argNum)
args = append(args, string(*filter.Source))
argNum++
}
if filter.Status != nil {
query += fmt.Sprintf(" AND status = $%d", argNum)
args = append(args, string(*filter.Status))
argNum++
}
if filter.UserID != nil {
query += fmt.Sprintf(" AND user_id = $%d", argNum)
args = append(args, *filter.UserID)
argNum++
}
if filter.Schema != "" {
query += fmt.Sprintf(" AND schema = $%d", argNum)
args = append(args, filter.Schema)
argNum++
}
if filter.Entity != "" {
query += fmt.Sprintf(" AND entity = $%d", argNum)
args = append(args, filter.Entity)
argNum++
}
if filter.Operation != "" {
query += fmt.Sprintf(" AND operation = $%d", argNum)
args = append(args, filter.Operation)
argNum++
}
if filter.InstanceID != "" {
query += fmt.Sprintf(" AND instance_id = $%d", argNum)
args = append(args, filter.InstanceID)
argNum++
}
if filter.StartTime != nil {
query += fmt.Sprintf(" AND created_at >= $%d", argNum)
args = append(args, *filter.StartTime)
argNum++
}
if filter.EndTime != nil {
query += fmt.Sprintf(" AND created_at <= $%d", argNum)
args = append(args, *filter.EndTime)
argNum++
}
}
// Add ORDER BY
query += " ORDER BY created_at DESC"
// Add LIMIT and OFFSET
if filter != nil {
if filter.Limit > 0 {
query += fmt.Sprintf(" LIMIT $%d", argNum)
args = append(args, filter.Limit)
argNum++
}
if filter.Offset > 0 {
query += fmt.Sprintf(" OFFSET $%d", argNum)
args = append(args, filter.Offset)
}
}
// Execute query
rows, err := dp.db.GetUnderlyingDB().(interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}).QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("failed to query events: %w", err)
}
defer rows.Close()
var results []*Event
for rows.Next() {
event := &Event{}
var source, eventType, status, operation string
var metadataJSON []byte
var processedAt, completedAt sql.NullTime
err := rows.Scan(
&event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error,
&event.Payload, &event.UserID, &event.SessionID, &event.InstanceID,
&event.Schema, &event.Entity, &operation,
&event.CreatedAt, &processedAt, &completedAt, &metadataJSON,
)
if err != nil {
logger.Warn("Failed to scan event: %v", err)
continue
}
// Set enum values
event.Source = EventSource(source)
event.Type = eventType
event.Status = EventStatus(status)
event.Operation = operation
// Handle nullable timestamps
if processedAt.Valid {
event.ProcessedAt = &processedAt.Time
}
if completedAt.Valid {
event.CompletedAt = &completedAt.Time
}
// Unmarshal metadata
if len(metadataJSON) > 0 {
if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil {
logger.Warn("Failed to unmarshal metadata: %v", err)
}
}
results = append(results, event)
}
return results, nil
}
// UpdateStatus updates the status of an event
func (dp *DatabaseProvider) UpdateStatus(ctx context.Context, id string, status EventStatus, errorMsg string) error {
query := fmt.Sprintf(`
UPDATE %s
SET status = $1, error = $2
WHERE id = $3
`, dp.tableName)
_, err := dp.db.Exec(ctx, query, string(status), errorMsg, id)
if err != nil {
return fmt.Errorf("failed to update status: %w", err)
}
return nil
}
// Delete deletes an event by ID
func (dp *DatabaseProvider) Delete(ctx context.Context, id string) error {
query := fmt.Sprintf("DELETE FROM %s WHERE id = $1", dp.tableName)
_, err := dp.db.Exec(ctx, query, id)
if err != nil {
return fmt.Errorf("failed to delete event: %w", err)
}
dp.stats.TotalEvents.Add(-1)
return nil
}
// Stream returns a channel of events for real-time consumption
func (dp *DatabaseProvider) Stream(ctx context.Context, pattern string) (<-chan *Event, error) {
ch := make(chan *Event, 100)
subCtx, cancel := context.WithCancel(ctx)
sub := &dbSubscription{
pattern: pattern,
ch: ch,
lastSeenID: "",
ctx: subCtx,
cancel: cancel,
}
dp.mu.Lock()
dp.subscribers[pattern] = sub
dp.stats.ActiveSubscribers.Add(1)
dp.mu.Unlock()
return ch, nil
}
// Publish publishes an event to all subscribers
func (dp *DatabaseProvider) Publish(ctx context.Context, event *Event) error {
// Store the event first
if err := dp.Store(ctx, event); err != nil {
return err
}
dp.stats.EventsPublished.Add(1)
// If using PostgreSQL NOTIFY, send notification
if dp.useNotify {
if err := dp.notify(ctx, event.ID); err != nil {
logger.Warn("Failed to send NOTIFY: %v", err)
}
}
return nil
}
// Close closes the provider and releases resources
func (dp *DatabaseProvider) Close() error {
if !dp.isRunning.Load() {
return nil
}
dp.isRunning.Store(false)
// Cancel all subscriptions
dp.mu.Lock()
for _, sub := range dp.subscribers {
sub.cancel()
}
dp.mu.Unlock()
// Stop polling
close(dp.stopPolling)
// Wait for goroutines
dp.wg.Wait()
logger.Info("Database provider closed")
return nil
}
// Stats returns provider statistics
func (dp *DatabaseProvider) Stats(ctx context.Context) (*ProviderStats, error) {
// Get counts by status
query := fmt.Sprintf(`
SELECT
COUNT(*) FILTER (WHERE status = 'pending') as pending,
COUNT(*) FILTER (WHERE status = 'processing') as processing,
COUNT(*) FILTER (WHERE status = 'completed') as completed,
COUNT(*) FILTER (WHERE status = 'failed') as failed,
COUNT(*) as total
FROM %s
`, dp.tableName)
var pending, processing, completed, failed, total int64
rows, err := dp.db.GetUnderlyingDB().(interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}).QueryContext(ctx, query)
if err != nil {
logger.Warn("Failed to get stats: %v", err)
} else {
defer rows.Close()
if rows.Next() {
if err := rows.Scan(&pending, &processing, &completed, &failed, &total); err != nil {
logger.Warn("Failed to scan stats: %v", err)
}
}
}
return &ProviderStats{
ProviderType: "database",
TotalEvents: total,
PendingEvents: pending,
ProcessingEvents: processing,
CompletedEvents: completed,
FailedEvents: failed,
EventsPublished: dp.stats.EventsPublished.Load(),
EventsConsumed: dp.stats.EventsConsumed.Load(),
ActiveSubscribers: int(dp.stats.ActiveSubscribers.Load()),
ProviderSpecific: map[string]interface{}{
"table_name": dp.tableName,
"poll_interval": dp.pollInterval.String(),
"use_notify": dp.useNotify,
"poll_errors": dp.stats.PollErrors.Load(),
},
}, nil
}
// pollLoop periodically polls for new events
func (dp *DatabaseProvider) pollLoop() {
defer dp.wg.Done()
ticker := time.NewTicker(dp.pollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
dp.pollEvents()
case <-dp.stopPolling:
return
}
}
}
// pollEvents polls for new events and delivers to subscribers
func (dp *DatabaseProvider) pollEvents() {
dp.mu.RLock()
subscribers := make([]*dbSubscription, 0, len(dp.subscribers))
for _, sub := range dp.subscribers {
subscribers = append(subscribers, sub)
}
dp.mu.RUnlock()
for _, sub := range subscribers {
// Query for new events since last seen
query := fmt.Sprintf(`
SELECT id, source, type, status, retry_count, error,
payload, user_id, session_id, instance_id,
schema, entity, operation,
created_at, processed_at, completed_at, metadata
FROM %s
WHERE id > $1
ORDER BY created_at ASC
LIMIT 100
`, dp.tableName)
lastSeenID := sub.lastSeenID
if lastSeenID == "" {
lastSeenID = "00000000-0000-0000-0000-000000000000"
}
rows, err := dp.db.GetUnderlyingDB().(interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}).QueryContext(sub.ctx, query, lastSeenID)
if err != nil {
dp.stats.PollErrors.Add(1)
logger.Warn("Failed to poll events: %v", err)
continue
}
for rows.Next() {
event := &Event{}
var source, eventType, status, operation string
var metadataJSON []byte
var processedAt, completedAt sql.NullTime
err := rows.Scan(
&event.ID, &source, &eventType, &status, &event.RetryCount, &event.Error,
&event.Payload, &event.UserID, &event.SessionID, &event.InstanceID,
&event.Schema, &event.Entity, &operation,
&event.CreatedAt, &processedAt, &completedAt, &metadataJSON,
)
if err != nil {
logger.Warn("Failed to scan event: %v", err)
continue
}
// Set enum values
event.Source = EventSource(source)
event.Type = eventType
event.Status = EventStatus(status)
event.Operation = operation
// Handle nullable timestamps
if processedAt.Valid {
event.ProcessedAt = &processedAt.Time
}
if completedAt.Valid {
event.CompletedAt = &completedAt.Time
}
// Unmarshal metadata
if len(metadataJSON) > 0 {
if err := json.Unmarshal(metadataJSON, &event.Metadata); err != nil {
logger.Warn("Failed to unmarshal metadata: %v", err)
}
}
// Check if event matches pattern
if matchPattern(sub.pattern, event.Type) {
select {
case sub.ch <- event:
dp.stats.EventsConsumed.Add(1)
sub.lastSeenID = event.ID
case <-sub.ctx.Done():
rows.Close()
return
default:
// Channel full, skip
logger.Warn("Subscriber channel full for pattern: %s", sub.pattern)
}
}
sub.lastSeenID = event.ID
}
rows.Close()
}
}
// notify sends a PostgreSQL NOTIFY message
func (dp *DatabaseProvider) notify(ctx context.Context, eventID string) error {
query := fmt.Sprintf("NOTIFY %s, '%s'", dp.channel, eventID)
_, err := dp.db.Exec(ctx, query)
return err
}
// createTable creates the events table if it doesn't exist
func (dp *DatabaseProvider) createTable(ctx context.Context) error {
query := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id VARCHAR(255) PRIMARY KEY,
source VARCHAR(50) NOT NULL,
type VARCHAR(255) NOT NULL,
status VARCHAR(50) NOT NULL,
retry_count INTEGER DEFAULT 0,
error TEXT,
payload JSONB,
user_id INTEGER,
session_id VARCHAR(255),
instance_id VARCHAR(255),
schema VARCHAR(255),
entity VARCHAR(255),
operation VARCHAR(50),
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
processed_at TIMESTAMP,
completed_at TIMESTAMP,
metadata JSONB
)
`, dp.tableName)
if _, err := dp.db.Exec(ctx, query); err != nil {
return fmt.Errorf("failed to create table: %w", err)
}
// Create indexes
indexes := []string{
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_source ON %s(source)", dp.tableName, dp.tableName),
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_type ON %s(type)", dp.tableName, dp.tableName),
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_status ON %s(status)", dp.tableName, dp.tableName),
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_created_at ON %s(created_at)", dp.tableName, dp.tableName),
fmt.Sprintf("CREATE INDEX IF NOT EXISTS idx_%s_instance_id ON %s(instance_id)", dp.tableName, dp.tableName),
}
for _, indexQuery := range indexes {
if _, err := dp.db.Exec(ctx, indexQuery); err != nil {
logger.Warn("Failed to create index: %v", err)
}
}
return nil
}