feat(whatsapp): ✨ Enhance webhook message handling
* Add support for new message types: audio, sticker, location, contacts, interactive, button, reaction, order, system, and unknown. * Implement logging for various webhook events for better visibility. * Update WebhookMessage struct to include new fields for enhanced message processing.
This commit is contained in:
@@ -90,20 +90,26 @@ func (m *Manager) Start() {
|
||||
for _, eventType := range allEventTypes {
|
||||
m.eventBus.Subscribe(eventType, m.handleEvent)
|
||||
}
|
||||
|
||||
logging.Info("Hook manager started and subscribed to events", "event_types", len(allEventTypes))
|
||||
}
|
||||
|
||||
// handleEvent processes any event and triggers relevant hooks
|
||||
func (m *Manager) handleEvent(event events.Event) {
|
||||
logging.Debug("Hook manager received event", "event_type", event.Type)
|
||||
|
||||
// Get hooks that are subscribed to this event type
|
||||
m.mu.RLock()
|
||||
relevantHooks := make([]config.Hook, 0)
|
||||
for _, hook := range m.hooks {
|
||||
if !hook.Active {
|
||||
logging.Debug("Skipping inactive hook", "hook_id", hook.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
// If hook has no events specified, subscribe to all events
|
||||
if len(hook.Events) == 0 {
|
||||
logging.Debug("Hook subscribes to all events", "hook_id", hook.ID)
|
||||
relevantHooks = append(relevantHooks, hook)
|
||||
continue
|
||||
}
|
||||
@@ -112,6 +118,7 @@ func (m *Manager) handleEvent(event events.Event) {
|
||||
eventTypeStr := string(event.Type)
|
||||
for _, subscribedEvent := range hook.Events {
|
||||
if subscribedEvent == eventTypeStr {
|
||||
logging.Debug("Hook matches event", "hook_id", hook.ID, "event_type", eventTypeStr)
|
||||
relevantHooks = append(relevantHooks, hook)
|
||||
break
|
||||
}
|
||||
@@ -119,6 +126,8 @@ func (m *Manager) handleEvent(event events.Event) {
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
logging.Debug("Found relevant hooks for event", "event_type", event.Type, "hook_count", len(relevantHooks))
|
||||
|
||||
// Trigger each relevant hook
|
||||
if len(relevantHooks) > 0 {
|
||||
m.triggerHooksForEvent(event, relevantHooks)
|
||||
@@ -265,17 +274,24 @@ func (m *Manager) ListHooks() []config.Hook {
|
||||
|
||||
// sendToHook sends any payload to a specific hook with explicit event type
|
||||
func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload interface{}, eventType events.EventType) *HookResponse {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
// Create a new context detached from the incoming context to prevent cancellation
|
||||
// when the original HTTP request completes. Use a 30-second timeout to match client timeout.
|
||||
hookCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Use the original context for event publishing (if available)
|
||||
eventCtx := ctx
|
||||
if eventCtx == nil {
|
||||
eventCtx = context.Background()
|
||||
}
|
||||
|
||||
// Publish hook triggered event
|
||||
m.eventBus.Publish(events.HookTriggeredEvent(ctx, hook.ID, hook.Name, hook.URL, payload))
|
||||
m.eventBus.Publish(events.HookTriggeredEvent(eventCtx, hook.ID, hook.Name, hook.URL, payload))
|
||||
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
logging.Error("Failed to marshal payload", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
m.eventBus.Publish(events.HookFailedEvent(eventCtx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -283,7 +299,7 @@ func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload inte
|
||||
parsedURL, err := url.Parse(hook.URL)
|
||||
if err != nil {
|
||||
logging.Error("Failed to parse hook URL", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
m.eventBus.Publish(events.HookFailedEvent(eventCtx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -311,10 +327,10 @@ func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload inte
|
||||
}
|
||||
parsedURL.RawQuery = query.Encode()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, hook.Method, parsedURL.String(), bytes.NewReader(data))
|
||||
req, err := http.NewRequestWithContext(hookCtx, hook.Method, parsedURL.String(), bytes.NewReader(data))
|
||||
if err != nil {
|
||||
logging.Error("Failed to create request", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
m.eventBus.Publish(events.HookFailedEvent(eventCtx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -328,14 +344,14 @@ func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload inte
|
||||
resp, err := m.client.Do(req)
|
||||
if err != nil {
|
||||
logging.Error("Failed to send to hook", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
m.eventBus.Publish(events.HookFailedEvent(eventCtx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
logging.Warn("Hook returned non-success status", "hook_id", hook.ID, "status", resp.StatusCode)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, fmt.Errorf("status code %d", resp.StatusCode)))
|
||||
m.eventBus.Publish(events.HookFailedEvent(eventCtx, hook.ID, hook.Name, fmt.Errorf("status code %d", resp.StatusCode)))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -343,23 +359,23 @@ func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload inte
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logging.Error("Failed to read hook response", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
m.eventBus.Publish(events.HookFailedEvent(eventCtx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(body) == 0 {
|
||||
m.eventBus.Publish(events.HookSuccessEvent(ctx, hook.ID, hook.Name, resp.StatusCode, nil))
|
||||
m.eventBus.Publish(events.HookSuccessEvent(eventCtx, hook.ID, hook.Name, resp.StatusCode, nil))
|
||||
return nil
|
||||
}
|
||||
|
||||
var hookResp HookResponse
|
||||
if err := json.Unmarshal(body, &hookResp); err != nil {
|
||||
logging.Debug("Hook response not JSON", "hook_id", hook.ID)
|
||||
m.eventBus.Publish(events.HookSuccessEvent(ctx, hook.ID, hook.Name, resp.StatusCode, string(body)))
|
||||
m.eventBus.Publish(events.HookSuccessEvent(eventCtx, hook.ID, hook.Name, resp.StatusCode, string(body)))
|
||||
return nil
|
||||
}
|
||||
|
||||
logging.Debug("Hook response received", "hook_id", hook.ID, "send_message", hookResp.SendMessage)
|
||||
m.eventBus.Publish(events.HookSuccessEvent(ctx, hook.ID, hook.Name, resp.StatusCode, hookResp))
|
||||
m.eventBus.Publish(events.HookSuccessEvent(eventCtx, hook.ID, hook.Name, resp.StatusCode, hookResp))
|
||||
return &hookResp
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user