mirror of
https://github.com/bitechdev/ResolveSpec.git
synced 2026-01-05 11:24:26 +00:00
Event Broken Concept
This commit is contained in:
453
pkg/eventbroker/broker.go
Normal file
453
pkg/eventbroker/broker.go
Normal file
@@ -0,0 +1,453 @@
|
||||
package eventbroker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/bitechdev/ResolveSpec/pkg/logger"
|
||||
)
|
||||
|
||||
// Broker is the main interface for event publishing and subscription
|
||||
type Broker interface {
|
||||
// Publish publishes an event (mode-dependent: sync or async)
|
||||
Publish(ctx context.Context, event *Event) error
|
||||
|
||||
// PublishSync publishes an event synchronously (blocks until all handlers complete)
|
||||
PublishSync(ctx context.Context, event *Event) error
|
||||
|
||||
// PublishAsync publishes an event asynchronously (returns immediately)
|
||||
PublishAsync(ctx context.Context, event *Event) error
|
||||
|
||||
// Subscribe registers a handler for events matching the pattern
|
||||
Subscribe(pattern string, handler EventHandler) (SubscriptionID, error)
|
||||
|
||||
// Unsubscribe removes a subscription
|
||||
Unsubscribe(id SubscriptionID) error
|
||||
|
||||
// Start starts the broker (begins processing events)
|
||||
Start(ctx context.Context) error
|
||||
|
||||
// Stop stops the broker gracefully (flushes pending events)
|
||||
Stop(ctx context.Context) error
|
||||
|
||||
// Stats returns broker statistics
|
||||
Stats(ctx context.Context) (*BrokerStats, error)
|
||||
|
||||
// InstanceID returns the instance ID of this broker
|
||||
InstanceID() string
|
||||
}
|
||||
|
||||
// ProcessingMode determines how events are processed
|
||||
type ProcessingMode string
|
||||
|
||||
const (
|
||||
ProcessingModeSync ProcessingMode = "sync"
|
||||
ProcessingModeAsync ProcessingMode = "async"
|
||||
)
|
||||
|
||||
// BrokerStats contains broker statistics
|
||||
type BrokerStats struct {
|
||||
InstanceID string `json:"instance_id"`
|
||||
Mode ProcessingMode `json:"mode"`
|
||||
IsRunning bool `json:"is_running"`
|
||||
TotalPublished int64 `json:"total_published"`
|
||||
TotalProcessed int64 `json:"total_processed"`
|
||||
TotalFailed int64 `json:"total_failed"`
|
||||
ActiveSubscribers int `json:"active_subscribers"`
|
||||
QueueSize int `json:"queue_size,omitempty"` // For async mode
|
||||
ActiveWorkers int `json:"active_workers,omitempty"` // For async mode
|
||||
ProviderStats *ProviderStats `json:"provider_stats,omitempty"`
|
||||
AdditionalStats map[string]interface{} `json:"additional_stats,omitempty"`
|
||||
}
|
||||
|
||||
// EventBroker implements the Broker interface
|
||||
type EventBroker struct {
|
||||
provider Provider
|
||||
subscriptions *subscriptionManager
|
||||
mode ProcessingMode
|
||||
instanceID string
|
||||
retryPolicy *RetryPolicy
|
||||
|
||||
// Async mode fields (initialized in Phase 4)
|
||||
workerPool *workerPool
|
||||
|
||||
// Runtime state
|
||||
isRunning atomic.Bool
|
||||
stopOnce sync.Once
|
||||
stopCh chan struct{}
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Statistics
|
||||
statsPublished atomic.Int64
|
||||
statsProcessed atomic.Int64
|
||||
statsFailed atomic.Int64
|
||||
}
|
||||
|
||||
// RetryPolicy defines how failed events should be retried
|
||||
type RetryPolicy struct {
|
||||
MaxRetries int
|
||||
InitialDelay time.Duration
|
||||
MaxDelay time.Duration
|
||||
BackoffFactor float64
|
||||
}
|
||||
|
||||
// DefaultRetryPolicy returns a sensible default retry policy
|
||||
func DefaultRetryPolicy() *RetryPolicy {
|
||||
return &RetryPolicy{
|
||||
MaxRetries: 3,
|
||||
InitialDelay: 1 * time.Second,
|
||||
MaxDelay: 30 * time.Second,
|
||||
BackoffFactor: 2.0,
|
||||
}
|
||||
}
|
||||
|
||||
// Options for creating a new broker
|
||||
type Options struct {
|
||||
Provider Provider
|
||||
Mode ProcessingMode
|
||||
WorkerCount int // For async mode
|
||||
BufferSize int // For async mode
|
||||
RetryPolicy *RetryPolicy
|
||||
InstanceID string
|
||||
}
|
||||
|
||||
// NewBroker creates a new event broker with the given options
|
||||
func NewBroker(opts Options) (*EventBroker, error) {
|
||||
if opts.Provider == nil {
|
||||
return nil, fmt.Errorf("provider is required")
|
||||
}
|
||||
if opts.InstanceID == "" {
|
||||
return nil, fmt.Errorf("instance ID is required")
|
||||
}
|
||||
if opts.Mode == "" {
|
||||
opts.Mode = ProcessingModeAsync // Default to async
|
||||
}
|
||||
if opts.RetryPolicy == nil {
|
||||
opts.RetryPolicy = DefaultRetryPolicy()
|
||||
}
|
||||
|
||||
broker := &EventBroker{
|
||||
provider: opts.Provider,
|
||||
subscriptions: newSubscriptionManager(),
|
||||
mode: opts.Mode,
|
||||
instanceID: opts.InstanceID,
|
||||
retryPolicy: opts.RetryPolicy,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Worker pool will be initialized in Phase 4 for async mode
|
||||
if opts.Mode == ProcessingModeAsync {
|
||||
if opts.WorkerCount == 0 {
|
||||
opts.WorkerCount = 10 // Default
|
||||
}
|
||||
if opts.BufferSize == 0 {
|
||||
opts.BufferSize = 1000 // Default
|
||||
}
|
||||
broker.workerPool = newWorkerPool(opts.WorkerCount, opts.BufferSize, broker.processEvent)
|
||||
}
|
||||
|
||||
return broker, nil
|
||||
}
|
||||
|
||||
// Functional option pattern helpers
|
||||
func WithProvider(p Provider) func(*Options) {
|
||||
return func(o *Options) { o.Provider = p }
|
||||
}
|
||||
|
||||
func WithMode(m ProcessingMode) func(*Options) {
|
||||
return func(o *Options) { o.Mode = m }
|
||||
}
|
||||
|
||||
func WithWorkerCount(count int) func(*Options) {
|
||||
return func(o *Options) { o.WorkerCount = count }
|
||||
}
|
||||
|
||||
func WithBufferSize(size int) func(*Options) {
|
||||
return func(o *Options) { o.BufferSize = size }
|
||||
}
|
||||
|
||||
func WithRetryPolicy(policy *RetryPolicy) func(*Options) {
|
||||
return func(o *Options) { o.RetryPolicy = policy }
|
||||
}
|
||||
|
||||
func WithInstanceID(id string) func(*Options) {
|
||||
return func(o *Options) { o.InstanceID = id }
|
||||
}
|
||||
|
||||
// Start starts the broker
|
||||
func (b *EventBroker) Start(ctx context.Context) error {
|
||||
if b.isRunning.Load() {
|
||||
return fmt.Errorf("broker already running")
|
||||
}
|
||||
|
||||
b.isRunning.Store(true)
|
||||
|
||||
// Start worker pool for async mode
|
||||
if b.mode == ProcessingModeAsync && b.workerPool != nil {
|
||||
b.workerPool.Start()
|
||||
}
|
||||
|
||||
logger.Info("Event broker started (mode: %s, instance: %s)", b.mode, b.instanceID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop stops the broker gracefully
|
||||
func (b *EventBroker) Stop(ctx context.Context) error {
|
||||
var stopErr error
|
||||
|
||||
b.stopOnce.Do(func() {
|
||||
logger.Info("Stopping event broker...")
|
||||
|
||||
// Mark as not running
|
||||
b.isRunning.Store(false)
|
||||
|
||||
// Close the stop channel
|
||||
close(b.stopCh)
|
||||
|
||||
// Stop worker pool for async mode
|
||||
if b.mode == ProcessingModeAsync && b.workerPool != nil {
|
||||
if err := b.workerPool.Stop(ctx); err != nil {
|
||||
logger.Error("Error stopping worker pool: %v", err)
|
||||
stopErr = err
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all goroutines
|
||||
b.wg.Wait()
|
||||
|
||||
// Close provider
|
||||
if err := b.provider.Close(); err != nil {
|
||||
logger.Error("Error closing provider: %v", err)
|
||||
if stopErr == nil {
|
||||
stopErr = err
|
||||
}
|
||||
}
|
||||
|
||||
logger.Info("Event broker stopped")
|
||||
})
|
||||
|
||||
return stopErr
|
||||
}
|
||||
|
||||
// Publish publishes an event based on the broker's mode
|
||||
func (b *EventBroker) Publish(ctx context.Context, event *Event) error {
|
||||
if b.mode == ProcessingModeSync {
|
||||
return b.PublishSync(ctx, event)
|
||||
}
|
||||
return b.PublishAsync(ctx, event)
|
||||
}
|
||||
|
||||
// PublishSync publishes an event synchronously
|
||||
func (b *EventBroker) PublishSync(ctx context.Context, event *Event) error {
|
||||
if !b.isRunning.Load() {
|
||||
return fmt.Errorf("broker is not running")
|
||||
}
|
||||
|
||||
// Validate event
|
||||
if err := event.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid event: %w", err)
|
||||
}
|
||||
|
||||
// Store event in provider
|
||||
if err := b.provider.Publish(ctx, event); err != nil {
|
||||
return fmt.Errorf("failed to publish event: %w", err)
|
||||
}
|
||||
|
||||
b.statsPublished.Add(1)
|
||||
|
||||
// Record metrics
|
||||
recordEventPublished(event)
|
||||
|
||||
// Process event synchronously
|
||||
if err := b.processEvent(ctx, event); err != nil {
|
||||
logger.Error("Failed to process event %s: %v", event.ID, err)
|
||||
b.statsFailed.Add(1)
|
||||
return err
|
||||
}
|
||||
|
||||
b.statsProcessed.Add(1)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PublishAsync publishes an event asynchronously
|
||||
func (b *EventBroker) PublishAsync(ctx context.Context, event *Event) error {
|
||||
if !b.isRunning.Load() {
|
||||
return fmt.Errorf("broker is not running")
|
||||
}
|
||||
|
||||
// Validate event
|
||||
if err := event.Validate(); err != nil {
|
||||
return fmt.Errorf("invalid event: %w", err)
|
||||
}
|
||||
|
||||
// Store event in provider
|
||||
if err := b.provider.Publish(ctx, event); err != nil {
|
||||
return fmt.Errorf("failed to publish event: %w", err)
|
||||
}
|
||||
|
||||
b.statsPublished.Add(1)
|
||||
|
||||
// Record metrics
|
||||
recordEventPublished(event)
|
||||
|
||||
// Queue for async processing
|
||||
if b.mode == ProcessingModeAsync && b.workerPool != nil {
|
||||
// Update queue size metrics
|
||||
updateQueueSize(int64(b.workerPool.QueueSize()))
|
||||
return b.workerPool.Submit(ctx, event)
|
||||
}
|
||||
|
||||
// Fallback to sync if async not configured
|
||||
return b.processEvent(ctx, event)
|
||||
}
|
||||
|
||||
// Subscribe adds a subscription for events matching the pattern
|
||||
func (b *EventBroker) Subscribe(pattern string, handler EventHandler) (SubscriptionID, error) {
|
||||
return b.subscriptions.Subscribe(pattern, handler)
|
||||
}
|
||||
|
||||
// Unsubscribe removes a subscription
|
||||
func (b *EventBroker) Unsubscribe(id SubscriptionID) error {
|
||||
return b.subscriptions.Unsubscribe(id)
|
||||
}
|
||||
|
||||
// processEvent processes an event by calling all matching handlers
|
||||
func (b *EventBroker) processEvent(ctx context.Context, event *Event) error {
|
||||
startTime := time.Now()
|
||||
|
||||
// Get all handlers matching this event type
|
||||
handlers := b.subscriptions.GetMatching(event.Type)
|
||||
|
||||
if len(handlers) == 0 {
|
||||
logger.Debug("No handlers for event type: %s", event.Type)
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Debug("Processing event %s with %d handler(s)", event.ID, len(handlers))
|
||||
|
||||
// Mark event as processing
|
||||
event.MarkProcessing()
|
||||
if err := b.provider.UpdateStatus(ctx, event.ID, EventStatusProcessing, ""); err != nil {
|
||||
logger.Warn("Failed to update event status: %v", err)
|
||||
}
|
||||
|
||||
// Execute all handlers
|
||||
var lastErr error
|
||||
for i, handler := range handlers {
|
||||
if err := b.executeHandlerWithRetry(ctx, handler, event); err != nil {
|
||||
logger.Error("Handler %d failed for event %s: %v", i+1, event.ID, err)
|
||||
lastErr = err
|
||||
// Continue processing other handlers
|
||||
}
|
||||
}
|
||||
|
||||
// Update final status
|
||||
if lastErr != nil {
|
||||
event.MarkFailed(lastErr)
|
||||
if err := b.provider.UpdateStatus(ctx, event.ID, EventStatusFailed, lastErr.Error()); err != nil {
|
||||
logger.Warn("Failed to update event status: %v", err)
|
||||
}
|
||||
|
||||
// Record metrics
|
||||
recordEventProcessed(event, time.Since(startTime))
|
||||
|
||||
return lastErr
|
||||
}
|
||||
|
||||
event.MarkCompleted()
|
||||
if err := b.provider.UpdateStatus(ctx, event.ID, EventStatusCompleted, ""); err != nil {
|
||||
logger.Warn("Failed to update event status: %v", err)
|
||||
}
|
||||
|
||||
// Record metrics
|
||||
recordEventProcessed(event, time.Since(startTime))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// executeHandlerWithRetry executes a handler with retry logic
|
||||
func (b *EventBroker) executeHandlerWithRetry(ctx context.Context, handler EventHandler, event *Event) error {
|
||||
var lastErr error
|
||||
|
||||
for attempt := 0; attempt <= b.retryPolicy.MaxRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
// Calculate backoff delay
|
||||
delay := b.calculateBackoff(attempt)
|
||||
logger.Debug("Retrying event %s (attempt %d/%d) after %v",
|
||||
event.ID, attempt, b.retryPolicy.MaxRetries, delay)
|
||||
|
||||
select {
|
||||
case <-time.After(delay):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
event.IncrementRetry()
|
||||
}
|
||||
|
||||
// Execute handler
|
||||
if err := handler.Handle(ctx, event); err != nil {
|
||||
lastErr = err
|
||||
logger.Warn("Handler failed for event %s (attempt %d): %v", event.ID, attempt+1, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Success
|
||||
return nil
|
||||
}
|
||||
|
||||
return fmt.Errorf("handler failed after %d attempts: %w", b.retryPolicy.MaxRetries+1, lastErr)
|
||||
}
|
||||
|
||||
// calculateBackoff calculates the backoff delay for a retry attempt
|
||||
func (b *EventBroker) calculateBackoff(attempt int) time.Duration {
|
||||
delay := float64(b.retryPolicy.InitialDelay) * pow(b.retryPolicy.BackoffFactor, float64(attempt-1))
|
||||
if delay > float64(b.retryPolicy.MaxDelay) {
|
||||
delay = float64(b.retryPolicy.MaxDelay)
|
||||
}
|
||||
return time.Duration(delay)
|
||||
}
|
||||
|
||||
// pow is a simple integer power function
|
||||
func pow(base float64, exp float64) float64 {
|
||||
result := 1.0
|
||||
for i := 0.0; i < exp; i++ {
|
||||
result *= base
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Stats returns broker statistics
|
||||
func (b *EventBroker) Stats(ctx context.Context) (*BrokerStats, error) {
|
||||
providerStats, err := b.provider.Stats(ctx)
|
||||
if err != nil {
|
||||
logger.Warn("Failed to get provider stats: %v", err)
|
||||
}
|
||||
|
||||
stats := &BrokerStats{
|
||||
InstanceID: b.instanceID,
|
||||
Mode: b.mode,
|
||||
IsRunning: b.isRunning.Load(),
|
||||
TotalPublished: b.statsPublished.Load(),
|
||||
TotalProcessed: b.statsProcessed.Load(),
|
||||
TotalFailed: b.statsFailed.Load(),
|
||||
ActiveSubscribers: b.subscriptions.Count(),
|
||||
ProviderStats: providerStats,
|
||||
}
|
||||
|
||||
// Add async-specific stats
|
||||
if b.mode == ProcessingModeAsync && b.workerPool != nil {
|
||||
stats.QueueSize = b.workerPool.QueueSize()
|
||||
stats.ActiveWorkers = b.workerPool.ActiveWorkers()
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
// InstanceID returns the instance ID
|
||||
func (b *EventBroker) InstanceID() string {
|
||||
return b.instanceID
|
||||
}
|
||||
Reference in New Issue
Block a user