// Package hooks delivers event-bus events to configured webhooks and, when no
// webhook is available for an event, hands the event to the message cache so it
// can be replayed later.
package hooks

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"

	"git.warky.dev/wdevs/whatshooked/pkg/cache"
	"git.warky.dev/wdevs/whatshooked/pkg/config"
	"git.warky.dev/wdevs/whatshooked/pkg/events"
	"git.warky.dev/wdevs/whatshooked/pkg/logging"
)

// hookRequestTimeout bounds a single webhook delivery. It is used both as the
// HTTP client timeout and as the per-request context deadline so the two
// cannot drift apart.
const hookRequestTimeout = 30 * time.Second

// MediaInfo represents media attachment information.
type MediaInfo struct {
	Type     string `json:"type"`
	MimeType string `json:"mime_type,omitempty"`
	Filename string `json:"filename,omitempty"`
	URL      string `json:"url,omitempty"`
	Base64   string `json:"base64,omitempty"`
}

// MessagePayload represents a message sent to webhooks.
type MessagePayload struct {
	AccountID   string     `json:"account_id"`
	MessageID   string     `json:"message_id"`
	From        string     `json:"from"`
	To          string     `json:"to"`
	Text        string     `json:"text"`
	Timestamp   time.Time  `json:"timestamp"`
	IsGroup     bool       `json:"is_group"`
	GroupName   string     `json:"group_name,omitempty"`
	SenderName  string     `json:"sender_name,omitempty"`
	MessageType string     `json:"message_type"`
	Media       *MediaInfo `json:"media,omitempty"`
}

// HookResponse represents a response from a webhook.
type HookResponse struct {
	SendMessage bool   `json:"send_message"`
	To          string `json:"to"`
	Text        string `json:"text"`
	AccountID   string `json:"account_id,omitempty"`
}

// Manager manages webhooks: it keeps the registered hooks, listens on the
// event bus and forwards matching events over HTTP.
type Manager struct {
	hooks    map[string]config.Hook // keyed by hook ID; guarded by mu
	mu       sync.RWMutex
	client   *http.Client
	eventBus *events.EventBus
	cache    *cache.MessageCache // may be nil; checked before every use
}

// NewManager creates a new hook manager that publishes to and subscribes on
// eventBus and uses messageCache (which may be nil) for undeliverable events.
func NewManager(eventBus *events.EventBus, messageCache *cache.MessageCache) *Manager {
	return &Manager{
		hooks:    make(map[string]config.Hook),
		eventBus: eventBus,
		cache:    messageCache,
		client: &http.Client{
			Timeout: hookRequestTimeout,
		},
	}
}

// Start subscribes the manager's generic handler to every event type it
// forwards to webhooks.
func (m *Manager) Start() {
	// All event types that may be forwarded to hooks.
	allEventTypes := []events.EventType{
		events.EventWhatsAppConnected,
		events.EventWhatsAppDisconnected,
		events.EventWhatsAppPairSuccess,
		events.EventWhatsAppPairFailed,
		events.EventWhatsAppQRCode,
		events.EventWhatsAppQRTimeout,
		events.EventWhatsAppQRError,
		events.EventWhatsAppPairEvent,
		events.EventMessageReceived,
		events.EventMessageSent,
		events.EventMessageFailed,
		events.EventMessageDelivered,
		events.EventMessageRead,
	}

	// One generic handler serves all of them.
	for _, eventType := range allEventTypes {
		m.eventBus.Subscribe(eventType, m.handleEvent)
	}

	logging.Info("Hook manager started and subscribed to events", "event_types", len(allEventTypes))
}

// handleEvent processes any event and triggers the hooks subscribed to it.
//
// When no active hook matches, the event is stored in the cache (if enabled).
// When at least one hook accepted the event, a cached copy of the same event
// (same type and timestamp) is removed, which covers the replay case.
func (m *Manager) handleEvent(event events.Event) {
	logging.Debug("Hook manager received event", "event_type", event.Type)

	relevantHooks := m.matchingHooks(event.Type)
	logging.Debug("Found relevant hooks for event", "event_type", event.Type, "hook_count", len(relevantHooks))

	cacheEnabled := m.cache != nil && m.cache.IsEnabled()

	// No relevant hooks: keep the event for a later replay if we can.
	if len(relevantHooks) == 0 {
		if !cacheEnabled {
			logging.Warn("No active webhooks for event and caching is disabled", "event_type", event.Type)
			return
		}
		reason := fmt.Sprintf("No active webhooks configured for event type: %s", event.Type)
		if err := m.cache.Store(event, reason); err != nil {
			logging.Error("Failed to cache event", "event_type", event.Type, "error", err)
		} else {
			logging.Info("Event cached due to no active webhooks", "event_type", event.Type, "cache_size", m.cache.Count())
		}
		return
	}

	delivered := m.triggerHooksForEvent(event, relevantHooks)
	if !delivered || !cacheEnabled {
		return
	}

	// The event reached at least one hook. If it is a replay of a cached
	// event, drop the cached copy (matched by type and timestamp).
	for _, cached := range m.cache.List() {
		if cached.Event.Type == event.Type && cached.Event.Timestamp.Equal(event.Timestamp) {
			if err := m.cache.Remove(cached.ID); err == nil {
				logging.Info("Cached event successfully delivered and removed from cache", "event_id", cached.ID, "event_type", event.Type)
			}
			break
		}
	}
}

// matchingHooks returns a snapshot of the active hooks subscribed to
// eventType. A hook with an empty Events list is subscribed to everything.
func (m *Manager) matchingHooks(eventType events.EventType) []config.Hook {
	eventTypeStr := string(eventType)

	m.mu.RLock()
	defer m.mu.RUnlock()

	relevantHooks := make([]config.Hook, 0)
	for _, hook := range m.hooks {
		if !hook.Active {
			logging.Debug("Skipping inactive hook", "hook_id", hook.ID)
			continue
		}

		// If hook has no events specified, subscribe to all events.
		if len(hook.Events) == 0 {
			logging.Debug("Hook subscribes to all events", "hook_id", hook.ID)
			relevantHooks = append(relevantHooks, hook)
			continue
		}

		for _, subscribedEvent := range hook.Events {
			if subscribedEvent == eventTypeStr {
				logging.Debug("Hook matches event", "hook_id", hook.ID, "event_type", eventTypeStr)
				relevantHooks = append(relevantHooks, hook)
				break
			}
		}
	}
	return relevantHooks
}

// triggerHooksForEvent sends the event to every given hook concurrently, waits
// for all deliveries to finish and reports whether at least one hook accepted
// the event (answered with a 2xx status).
func (m *Manager) triggerHooksForEvent(event events.Event, hooks []config.Hook) bool {
	ctx := event.Context
	if ctx == nil {
		ctx = context.Background()
	}

	// The payload is built once and only read by the delivery goroutines.
	payload := buildPayload(event)

	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		delivered int
	)
	for _, hook := range hooks {
		wg.Add(1)
		go func(h config.Hook, et events.EventType) {
			defer wg.Done()
			// Count only real deliveries. A nil *HookResponse is not a
			// failure signal (an empty or non-JSON body is still a success),
			// so the explicit ok flag is what decides.
			if _, ok := m.deliverToHook(ctx, h, payload, et); ok {
				mu.Lock()
				delivered++
				mu.Unlock()
			}
		}(hook, event.Type)
	}
	wg.Wait()

	return delivered > 0
}

// buildPayload creates the webhook body for an event: a MessagePayload for
// received/sent messages, and a generic map with "event_type", "timestamp"
// and "data" for everything else.
func buildPayload(event events.Event) interface{} {
	if event.Type != events.EventMessageReceived && event.Type != events.EventMessageSent {
		return map[string]interface{}{
			"event_type": string(event.Type),
			"timestamp":  event.Timestamp,
			"data":       event.Data,
		}
	}

	messageType := getStringFromEvent(event.Data, "message_type")
	msgPayload := MessagePayload{
		AccountID:   getStringFromEvent(event.Data, "account_id"),
		MessageID:   getStringFromEvent(event.Data, "message_id"),
		From:        getStringFromEvent(event.Data, "from"),
		To:          getStringFromEvent(event.Data, "to"),
		Text:        getStringFromEvent(event.Data, "text"),
		Timestamp:   getTimeFromEvent(event.Data, "timestamp"),
		IsGroup:     getBoolFromEvent(event.Data, "is_group"),
		GroupName:   getStringFromEvent(event.Data, "group_name"),
		SenderName:  getStringFromEvent(event.Data, "sender_name"),
		MessageType: messageType,
	}

	// Add media info if the message has media content.
	if messageType != "" && messageType != "text" {
		msgPayload.Media = &MediaInfo{
			Type:     messageType,
			MimeType: getStringFromEvent(event.Data, "mime_type"),
			Filename: getStringFromEvent(event.Data, "filename"),
			URL:      getStringFromEvent(event.Data, "media_url"),
			Base64:   getStringFromEvent(event.Data, "media_base64"),
		}
	}
	return msgPayload
}

// getStringFromEvent returns data[key] if it is a string, otherwise "".
func getStringFromEvent(data map[string]interface{}, key string) string {
	if val, ok := data[key].(string); ok {
		return val
	}
	return ""
}

// getTimeFromEvent returns data[key] if it is a time.Time, otherwise the zero
// time.
func getTimeFromEvent(data map[string]interface{}, key string) time.Time {
	if val, ok := data[key].(time.Time); ok {
		return val
	}
	return time.Time{}
}

// getBoolFromEvent returns data[key] if it is a bool, otherwise false.
func getBoolFromEvent(data map[string]interface{}, key string) bool {
	if val, ok := data[key].(bool); ok {
		return val
	}
	return false
}

// LoadHooks loads hooks from configuration. Hooks are added to (or overwrite
// by ID) the ones already registered; existing hooks are not cleared.
func (m *Manager) LoadHooks(hooks []config.Hook) {
	m.mu.Lock()
	defer m.mu.Unlock()

	for _, hook := range hooks {
		m.hooks[hook.ID] = hook
	}

	logging.Info("Hooks loaded", "count", len(hooks))
}

// AddHook adds a new hook, replacing any existing hook with the same ID.
func (m *Manager) AddHook(hook config.Hook) {
	m.mu.Lock()
	defer m.mu.Unlock()

	m.hooks[hook.ID] = hook
	logging.Info("Hook added", "id", hook.ID, "name", hook.Name)
}

// RemoveHook removes a hook by ID. It returns an error if no such hook is
// registered.
func (m *Manager) RemoveHook(id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if _, exists := m.hooks[id]; !exists {
		return fmt.Errorf("hook %s not found", id)
	}

	delete(m.hooks, id)
	logging.Info("Hook removed", "id", id)
	return nil
}

// GetHook returns a hook by ID and whether it exists.
func (m *Manager) GetHook(id string) (config.Hook, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	hook, exists := m.hooks[id]
	return hook, exists
}

// ListHooks returns a copy of all registered hooks, in no particular order.
func (m *Manager) ListHooks() []config.Hook {
	m.mu.RLock()
	defer m.mu.RUnlock()

	hooks := make([]config.Hook, 0, len(m.hooks))
	for _, hook := range m.hooks {
		hooks = append(hooks, hook)
	}
	return hooks
}

// sendToHook sends any payload to a specific hook with an explicit event type
// and returns the hook's parsed JSON response.
//
// A nil result does NOT imply failure: it is also returned when the hook
// answered successfully with an empty or non-JSON body. Callers that need to
// know whether delivery succeeded should use deliverToHook.
func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload interface{}, eventType events.EventType) *HookResponse {
	resp, _ := m.deliverToHook(ctx, hook, payload, eventType)
	return resp
}

// deliverToHook performs one webhook delivery.
//
// It publishes a hook-triggered event, sends the JSON-encoded payload to the
// hook URL (with "event" and "account_id" query parameters added when known)
// and publishes a hook-success or hook-failed event for the outcome.
//
// The bool result is true when the hook answered with a 2xx status and its
// body could be read. The *HookResponse is non-nil only when that body was
// valid JSON for HookResponse.
func (m *Manager) deliverToHook(ctx context.Context, hook config.Hook, payload interface{}, eventType events.EventType) (*HookResponse, bool) {
	// Use a context detached from the incoming one so the delivery is not
	// cancelled when the originating request completes.
	hookCtx, cancel := context.WithTimeout(context.Background(), hookRequestTimeout)
	defer cancel()

	// The original context is still used for the events we publish.
	eventCtx := ctx
	if eventCtx == nil {
		eventCtx = context.Background()
	}

	// fail publishes the failure event and yields the failure result.
	fail := func(err error) (*HookResponse, bool) {
		m.eventBus.Publish(events.HookFailedEvent(eventCtx, hook.ID, hook.Name, err))
		return nil, false
	}

	m.eventBus.Publish(events.HookTriggeredEvent(eventCtx, hook.ID, hook.Name, hook.URL, payload))

	data, err := json.Marshal(payload)
	if err != nil {
		logging.Error("Failed to marshal payload", "hook_id", hook.ID, "error", err)
		return fail(err)
	}

	parsedURL, err := url.Parse(hook.URL)
	if err != nil {
		logging.Error("Failed to parse hook URL", "hook_id", hook.ID, "error", err)
		return fail(err)
	}

	// Add query parameters, keeping any the hook URL already carries.
	query := parsedURL.Query()
	if eventType != "" {
		query.Set("event", string(eventType))
	}
	if accountID := accountIDFromPayload(payload); accountID != "" {
		query.Set("account_id", accountID)
	}
	parsedURL.RawQuery = query.Encode()

	req, err := http.NewRequestWithContext(hookCtx, hook.Method, parsedURL.String(), bytes.NewReader(data))
	if err != nil {
		logging.Error("Failed to create request", "hook_id", hook.ID, "error", err)
		return fail(err)
	}

	// Hook-specific headers are applied last so they can override the default
	// Content-Type.
	req.Header.Set("Content-Type", "application/json")
	for key, value := range hook.Headers {
		req.Header.Set(key, value)
	}

	logging.Debug("Sending to hook", "hook_id", hook.ID, "url", hook.URL)

	resp, err := m.client.Do(req)
	if err != nil {
		logging.Error("Failed to send to hook", "hook_id", hook.ID, "error", err)
		return fail(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		logging.Warn("Hook returned non-success status", "hook_id", hook.ID, "status", resp.StatusCode)
		return fail(fmt.Errorf("status code %d", resp.StatusCode))
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		logging.Error("Failed to read hook response", "hook_id", hook.ID, "error", err)
		return fail(err)
	}

	// Empty body: delivered, nothing to parse.
	if len(body) == 0 {
		m.eventBus.Publish(events.HookSuccessEvent(eventCtx, hook.ID, hook.Name, resp.StatusCode, nil))
		return nil, true
	}

	// Non-JSON body: still delivered; the raw text is reported as-is.
	var hookResp HookResponse
	if err := json.Unmarshal(body, &hookResp); err != nil {
		logging.Debug("Hook response not JSON", "hook_id", hook.ID)
		m.eventBus.Publish(events.HookSuccessEvent(eventCtx, hook.ID, hook.Name, resp.StatusCode, string(body)))
		return nil, true
	}

	logging.Debug("Hook response received", "hook_id", hook.ID, "send_message", hookResp.SendMessage)
	m.eventBus.Publish(events.HookSuccessEvent(eventCtx, hook.ID, hook.Name, resp.StatusCode, hookResp))
	return &hookResp, true
}

// accountIDFromPayload extracts the account ID from a MessagePayload, or from
// the "data" map of a generic payload. It returns "" when none is present.
func accountIDFromPayload(payload interface{}) string {
	switch p := payload.(type) {
	case MessagePayload:
		return p.AccountID
	case map[string]interface{}:
		if eventData, ok := p["data"].(map[string]interface{}); ok {
			if aid, ok := eventData["account_id"].(string); ok {
				return aid
			}
		}
	}
	return ""
}

// ReplayCachedEvents attempts to replay all cached events. It returns the
// number of events that were delivered (and therefore left the cache), the
// number that are still cached, and an error if the cache is not enabled.
func (m *Manager) ReplayCachedEvents() (successCountResult int, failCountResult int, err error) {
	if m.cache == nil || !m.cache.IsEnabled() {
		return 0, 0, fmt.Errorf("message cache is not enabled")
	}

	cachedEvents := m.cache.List()
	if len(cachedEvents) == 0 {
		return 0, 0, nil
	}

	logging.Info("Replaying cached events", "count", len(cachedEvents))

	successCount := 0
	failCount := 0

	for _, cached := range cachedEvents {
		// Try to process the event again; handleEvent removes it from the
		// cache when at least one hook accepts it.
		m.handleEvent(cached.Event)

		// NOTE(review): handleEvent waits for all deliveries before
		// returning, so this pause only matters if cache removal is
		// asynchronous - confirm against the cache package and drop if not.
		time.Sleep(100 * time.Millisecond)

		if _, exists := m.cache.Get(cached.ID); !exists {
			successCount++
			logging.Debug("Cached event successfully replayed", "event_id", cached.ID)
			continue
		}

		// Still cached: record the failed attempt. The counter is only
		// touched while the entry exists, so a delivered (removed) event
		// does not produce a spurious increment error.
		failCount++
		if err := m.cache.IncrementAttempts(cached.ID); err != nil {
			logging.Error("Failed to increment attempt counter", "event_id", cached.ID, "error", err)
		}
	}

	logging.Info("Cached event replay complete", "success", successCount, "failed", failCount, "remaining_cached", m.cache.Count())

	return successCount, failCount, nil
}

// ReplayCachedEvent attempts to replay a single cached event by ID. It returns
// an error if the cache is not enabled or the event is not cached; a failed
// delivery is not reported as an error, the event simply stays cached with its
// attempt counter incremented.
func (m *Manager) ReplayCachedEvent(id string) error {
	if m.cache == nil || !m.cache.IsEnabled() {
		return fmt.Errorf("message cache is not enabled")
	}

	cached, exists := m.cache.Get(id)
	if !exists {
		return fmt.Errorf("cached event not found: %s", id)
	}

	logging.Info("Replaying cached event", "event_id", id, "event_type", cached.Event.Type)

	// Process the event; on successful delivery handleEvent removes it from
	// the cache.
	m.handleEvent(cached.Event)

	// Only count the attempt while the entry is still cached.
	if _, stillCached := m.cache.Get(id); stillCached {
		if err := m.cache.IncrementAttempts(id); err != nil {
			logging.Error("Failed to increment attempt counter", "event_id", id, "error", err)
		}
	}

	return nil
}

// GetCache returns the message cache (for external access). It may be nil.
func (m *Manager) GetCache() *cache.MessageCache {
	return m.cache
}