feat(cache): 🎉 add message caching functionality
* Implement MessageCache to store events when no webhooks are available. * Add configuration options for enabling cache, setting data path, max age, and max events. * Create API endpoints for managing cached events, including listing, replaying, and deleting. * Integrate caching into the hooks manager to store events when no active webhooks are found. * Enhance logging for better traceability of cached events and operations.
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.warky.dev/wdevs/whatshooked/pkg/cache"
|
||||
"git.warky.dev/wdevs/whatshooked/pkg/config"
|
||||
"git.warky.dev/wdevs/whatshooked/pkg/events"
|
||||
"git.warky.dev/wdevs/whatshooked/pkg/logging"
|
||||
@@ -54,13 +55,15 @@ type Manager struct {
|
||||
mu sync.RWMutex
|
||||
client *http.Client
|
||||
eventBus *events.EventBus
|
||||
cache *cache.MessageCache
|
||||
}
|
||||
|
||||
// NewManager creates a new hook manager
|
||||
func NewManager(eventBus *events.EventBus) *Manager {
|
||||
func NewManager(eventBus *events.EventBus, messageCache *cache.MessageCache) *Manager {
|
||||
return &Manager{
|
||||
hooks: make(map[string]config.Hook),
|
||||
eventBus: eventBus,
|
||||
cache: messageCache,
|
||||
client: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
@@ -128,14 +131,48 @@ func (m *Manager) handleEvent(event events.Event) {
|
||||
|
||||
logging.Debug("Found relevant hooks for event", "event_type", event.Type, "hook_count", len(relevantHooks))
|
||||
|
||||
// If no relevant hooks found, cache the event
|
||||
if len(relevantHooks) == 0 {
|
||||
if m.cache != nil && m.cache.IsEnabled() {
|
||||
reason := fmt.Sprintf("No active webhooks configured for event type: %s", event.Type)
|
||||
if err := m.cache.Store(event, reason); err != nil {
|
||||
logging.Error("Failed to cache event", "event_type", event.Type, "error", err)
|
||||
} else {
|
||||
logging.Info("Event cached due to no active webhooks",
|
||||
"event_type", event.Type,
|
||||
"cache_size", m.cache.Count())
|
||||
}
|
||||
} else {
|
||||
logging.Warn("No active webhooks for event and caching is disabled",
|
||||
"event_type", event.Type)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Trigger each relevant hook
|
||||
if len(relevantHooks) > 0 {
|
||||
m.triggerHooksForEvent(event, relevantHooks)
|
||||
success := m.triggerHooksForEvent(event, relevantHooks)
|
||||
|
||||
// If event was successfully delivered and it was previously cached, remove it from cache
|
||||
if success && m.cache != nil && m.cache.IsEnabled() {
|
||||
// Try to find and remove this event from cache
|
||||
// (This handles the case where a cached event is being replayed)
|
||||
cachedEvents := m.cache.List()
|
||||
for _, cached := range cachedEvents {
|
||||
if cached.Event.Type == event.Type &&
|
||||
cached.Event.Timestamp.Equal(event.Timestamp) {
|
||||
if err := m.cache.Remove(cached.ID); err == nil {
|
||||
logging.Info("Cached event successfully delivered and removed from cache",
|
||||
"event_id", cached.ID,
|
||||
"event_type", event.Type)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// triggerHooksForEvent sends event data to specific hooks
|
||||
func (m *Manager) triggerHooksForEvent(event events.Event, hooks []config.Hook) {
|
||||
// triggerHooksForEvent sends event data to specific hooks and returns success status
|
||||
func (m *Manager) triggerHooksForEvent(event events.Event, hooks []config.Hook) bool {
|
||||
ctx := event.Context
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
@@ -184,14 +221,26 @@ func (m *Manager) triggerHooksForEvent(event events.Event, hooks []config.Hook)
|
||||
|
||||
// Send to each hook with the event type
|
||||
var wg sync.WaitGroup
|
||||
successCount := 0
|
||||
mu := sync.Mutex{}
|
||||
|
||||
for _, hook := range hooks {
|
||||
wg.Add(1)
|
||||
go func(h config.Hook, et events.EventType) {
|
||||
defer wg.Done()
|
||||
_ = m.sendToHook(ctx, h, payload, et)
|
||||
resp := m.sendToHook(ctx, h, payload, et)
|
||||
if resp != nil || ctx.Err() == nil {
|
||||
// Count as success if we got a response or context is still valid
|
||||
mu.Lock()
|
||||
successCount++
|
||||
mu.Unlock()
|
||||
}
|
||||
}(hook, event.Type)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// Return true if at least one hook was successfully triggered
|
||||
return successCount > 0
|
||||
}
|
||||
|
||||
// Helper functions to extract data from event map
|
||||
@@ -379,3 +428,77 @@ func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload inte
|
||||
m.eventBus.Publish(events.HookSuccessEvent(eventCtx, hook.ID, hook.Name, resp.StatusCode, hookResp))
|
||||
return &hookResp
|
||||
}
|
||||
|
||||
// ReplayCachedEvents attempts to replay all cached events
|
||||
func (m *Manager) ReplayCachedEvents() (successCountResult int, failCountResult int, err error) {
|
||||
if m.cache == nil || !m.cache.IsEnabled() {
|
||||
return 0, 0, fmt.Errorf("message cache is not enabled")
|
||||
}
|
||||
|
||||
cachedEvents := m.cache.List()
|
||||
if len(cachedEvents) == 0 {
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
logging.Info("Replaying cached events", "count", len(cachedEvents))
|
||||
|
||||
successCount := 0
|
||||
failCount := 0
|
||||
|
||||
for _, cached := range cachedEvents {
|
||||
// Try to process the event again
|
||||
m.handleEvent(cached.Event)
|
||||
|
||||
// Increment attempt counter
|
||||
if err := m.cache.IncrementAttempts(cached.ID); err != nil {
|
||||
logging.Error("Failed to increment attempt counter", "event_id", cached.ID, "error", err)
|
||||
}
|
||||
|
||||
// Check if event was successfully delivered by seeing if it's still cached
|
||||
// (handleEvent will remove it from cache if successfully delivered)
|
||||
time.Sleep(100 * time.Millisecond) // Give time for async delivery
|
||||
|
||||
if _, exists := m.cache.Get(cached.ID); !exists {
|
||||
successCount++
|
||||
logging.Debug("Cached event successfully replayed", "event_id", cached.ID)
|
||||
} else {
|
||||
failCount++
|
||||
}
|
||||
}
|
||||
|
||||
logging.Info("Cached event replay complete",
|
||||
"success", successCount,
|
||||
"failed", failCount,
|
||||
"remaining_cached", m.cache.Count())
|
||||
|
||||
return successCount, failCount, nil
|
||||
}
|
||||
|
||||
// ReplayCachedEvent attempts to replay a single cached event by ID
|
||||
func (m *Manager) ReplayCachedEvent(id string) error {
|
||||
if m.cache == nil || !m.cache.IsEnabled() {
|
||||
return fmt.Errorf("message cache is not enabled")
|
||||
}
|
||||
|
||||
cached, exists := m.cache.Get(id)
|
||||
if !exists {
|
||||
return fmt.Errorf("cached event not found: %s", id)
|
||||
}
|
||||
|
||||
logging.Info("Replaying cached event", "event_id", id, "event_type", cached.Event.Type)
|
||||
|
||||
// Process the event
|
||||
m.handleEvent(cached.Event)
|
||||
|
||||
// Increment attempt counter
|
||||
if err := m.cache.IncrementAttempts(id); err != nil {
|
||||
logging.Error("Failed to increment attempt counter", "event_id", id, "error", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetCache returns the message cache (for external access)
|
||||
func (m *Manager) GetCache() *cache.MessageCache {
|
||||
return m.cache
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user