Major refactor to library
This commit is contained in:
365
pkg/hooks/manager.go
Normal file
365
pkg/hooks/manager.go
Normal file
@@ -0,0 +1,365 @@
|
||||
package hooks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.warky.dev/wdevs/whatshooked/pkg/config"
|
||||
"git.warky.dev/wdevs/whatshooked/pkg/events"
|
||||
"git.warky.dev/wdevs/whatshooked/pkg/logging"
|
||||
)
|
||||
|
||||
// MediaInfo represents media attachment information
|
||||
type MediaInfo struct {
|
||||
Type string `json:"type"`
|
||||
MimeType string `json:"mime_type,omitempty"`
|
||||
Filename string `json:"filename,omitempty"`
|
||||
URL string `json:"url,omitempty"`
|
||||
Base64 string `json:"base64,omitempty"`
|
||||
}
|
||||
|
||||
// MessagePayload represents a message sent to webhooks
|
||||
type MessagePayload struct {
|
||||
AccountID string `json:"account_id"`
|
||||
MessageID string `json:"message_id"`
|
||||
From string `json:"from"`
|
||||
To string `json:"to"`
|
||||
Text string `json:"text"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
IsGroup bool `json:"is_group"`
|
||||
GroupName string `json:"group_name,omitempty"`
|
||||
SenderName string `json:"sender_name,omitempty"`
|
||||
MessageType string `json:"message_type"`
|
||||
Media *MediaInfo `json:"media,omitempty"`
|
||||
}
|
||||
|
||||
// HookResponse represents a response from a webhook
|
||||
type HookResponse struct {
|
||||
SendMessage bool `json:"send_message"`
|
||||
To string `json:"to"`
|
||||
Text string `json:"text"`
|
||||
AccountID string `json:"account_id,omitempty"`
|
||||
}
|
||||
|
||||
// Manager manages webhooks
|
||||
type Manager struct {
|
||||
hooks map[string]config.Hook
|
||||
mu sync.RWMutex
|
||||
client *http.Client
|
||||
eventBus *events.EventBus
|
||||
}
|
||||
|
||||
// NewManager creates a new hook manager
|
||||
func NewManager(eventBus *events.EventBus) *Manager {
|
||||
return &Manager{
|
||||
hooks: make(map[string]config.Hook),
|
||||
eventBus: eventBus,
|
||||
client: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins listening for events
|
||||
func (m *Manager) Start() {
|
||||
// Get all possible event types
|
||||
allEventTypes := []events.EventType{
|
||||
events.EventWhatsAppConnected,
|
||||
events.EventWhatsAppDisconnected,
|
||||
events.EventWhatsAppPairSuccess,
|
||||
events.EventWhatsAppPairFailed,
|
||||
events.EventWhatsAppQRCode,
|
||||
events.EventWhatsAppQRTimeout,
|
||||
events.EventWhatsAppQRError,
|
||||
events.EventWhatsAppPairEvent,
|
||||
events.EventMessageReceived,
|
||||
events.EventMessageSent,
|
||||
events.EventMessageFailed,
|
||||
events.EventMessageDelivered,
|
||||
events.EventMessageRead,
|
||||
}
|
||||
|
||||
// Subscribe to all event types with a generic handler
|
||||
for _, eventType := range allEventTypes {
|
||||
m.eventBus.Subscribe(eventType, m.handleEvent)
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvent processes any event and triggers relevant hooks
|
||||
func (m *Manager) handleEvent(event events.Event) {
|
||||
// Get hooks that are subscribed to this event type
|
||||
m.mu.RLock()
|
||||
relevantHooks := make([]config.Hook, 0)
|
||||
for _, hook := range m.hooks {
|
||||
if !hook.Active {
|
||||
continue
|
||||
}
|
||||
|
||||
// If hook has no events specified, subscribe to all events
|
||||
if len(hook.Events) == 0 {
|
||||
relevantHooks = append(relevantHooks, hook)
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if this hook is subscribed to this event type
|
||||
eventTypeStr := string(event.Type)
|
||||
for _, subscribedEvent := range hook.Events {
|
||||
if subscribedEvent == eventTypeStr {
|
||||
relevantHooks = append(relevantHooks, hook)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
// Trigger each relevant hook
|
||||
if len(relevantHooks) > 0 {
|
||||
m.triggerHooksForEvent(event, relevantHooks)
|
||||
}
|
||||
}
|
||||
|
||||
// triggerHooksForEvent sends event data to specific hooks
|
||||
func (m *Manager) triggerHooksForEvent(event events.Event, hooks []config.Hook) {
|
||||
ctx := event.Context
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
// Create payload based on event type
|
||||
var payload interface{}
|
||||
|
||||
// For message events, create MessagePayload
|
||||
if event.Type == events.EventMessageReceived || event.Type == events.EventMessageSent {
|
||||
messageType := getStringFromEvent(event.Data, "message_type")
|
||||
|
||||
msgPayload := MessagePayload{
|
||||
AccountID: getStringFromEvent(event.Data, "account_id"),
|
||||
MessageID: getStringFromEvent(event.Data, "message_id"),
|
||||
From: getStringFromEvent(event.Data, "from"),
|
||||
To: getStringFromEvent(event.Data, "to"),
|
||||
Text: getStringFromEvent(event.Data, "text"),
|
||||
Timestamp: getTimeFromEvent(event.Data, "timestamp"),
|
||||
IsGroup: getBoolFromEvent(event.Data, "is_group"),
|
||||
GroupName: getStringFromEvent(event.Data, "group_name"),
|
||||
SenderName: getStringFromEvent(event.Data, "sender_name"),
|
||||
MessageType: messageType,
|
||||
}
|
||||
|
||||
// Add media info if message has media content
|
||||
if messageType != "" && messageType != "text" {
|
||||
msgPayload.Media = &MediaInfo{
|
||||
Type: messageType,
|
||||
MimeType: getStringFromEvent(event.Data, "mime_type"),
|
||||
Filename: getStringFromEvent(event.Data, "filename"),
|
||||
URL: getStringFromEvent(event.Data, "media_url"),
|
||||
Base64: getStringFromEvent(event.Data, "media_base64"),
|
||||
}
|
||||
}
|
||||
|
||||
payload = msgPayload
|
||||
} else {
|
||||
// For other events, create generic payload with event type and data
|
||||
payload = map[string]interface{}{
|
||||
"event_type": string(event.Type),
|
||||
"timestamp": event.Timestamp,
|
||||
"data": event.Data,
|
||||
}
|
||||
}
|
||||
|
||||
// Send to each hook with the event type
|
||||
var wg sync.WaitGroup
|
||||
for _, hook := range hooks {
|
||||
wg.Add(1)
|
||||
go func(h config.Hook, et events.EventType) {
|
||||
defer wg.Done()
|
||||
_ = m.sendToHook(ctx, h, payload, et)
|
||||
}(hook, event.Type)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// Helper functions to extract data from event map
|
||||
func getStringFromEvent(data map[string]interface{}, key string) string {
|
||||
if val, ok := data[key].(string); ok {
|
||||
return val
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func getTimeFromEvent(data map[string]interface{}, key string) time.Time {
|
||||
if val, ok := data[key].(time.Time); ok {
|
||||
return val
|
||||
}
|
||||
return time.Time{}
|
||||
}
|
||||
|
||||
func getBoolFromEvent(data map[string]interface{}, key string) bool {
|
||||
if val, ok := data[key].(bool); ok {
|
||||
return val
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// LoadHooks loads hooks from configuration
|
||||
func (m *Manager) LoadHooks(hooks []config.Hook) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
for _, hook := range hooks {
|
||||
m.hooks[hook.ID] = hook
|
||||
}
|
||||
|
||||
logging.Info("Hooks loaded", "count", len(hooks))
|
||||
}
|
||||
|
||||
// AddHook adds a new hook
|
||||
func (m *Manager) AddHook(hook config.Hook) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.hooks[hook.ID] = hook
|
||||
logging.Info("Hook added", "id", hook.ID, "name", hook.Name)
|
||||
}
|
||||
|
||||
// RemoveHook removes a hook
|
||||
func (m *Manager) RemoveHook(id string) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if _, exists := m.hooks[id]; !exists {
|
||||
return fmt.Errorf("hook %s not found", id)
|
||||
}
|
||||
|
||||
delete(m.hooks, id)
|
||||
logging.Info("Hook removed", "id", id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetHook returns a hook by ID
|
||||
func (m *Manager) GetHook(id string) (config.Hook, bool) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
hook, exists := m.hooks[id]
|
||||
return hook, exists
|
||||
}
|
||||
|
||||
// ListHooks returns all hooks
|
||||
func (m *Manager) ListHooks() []config.Hook {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
|
||||
hooks := make([]config.Hook, 0, len(m.hooks))
|
||||
for _, hook := range m.hooks {
|
||||
hooks = append(hooks, hook)
|
||||
}
|
||||
return hooks
|
||||
}
|
||||
|
||||
// sendToHook sends any payload to a specific hook with explicit event type
|
||||
func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload interface{}, eventType events.EventType) *HookResponse {
|
||||
if ctx == nil {
|
||||
ctx = context.Background()
|
||||
}
|
||||
|
||||
// Publish hook triggered event
|
||||
m.eventBus.Publish(events.HookTriggeredEvent(ctx, hook.ID, hook.Name, hook.URL, payload))
|
||||
|
||||
data, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
logging.Error("Failed to marshal payload", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Build URL with query parameters
|
||||
parsedURL, err := url.Parse(hook.URL)
|
||||
if err != nil {
|
||||
logging.Error("Failed to parse hook URL", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Extract account_id from payload
|
||||
var accountID string
|
||||
|
||||
switch p := payload.(type) {
|
||||
case MessagePayload:
|
||||
accountID = p.AccountID
|
||||
case map[string]interface{}:
|
||||
if data, ok := p["data"].(map[string]interface{}); ok {
|
||||
if aid, ok := data["account_id"].(string); ok {
|
||||
accountID = aid
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add query parameters
|
||||
query := parsedURL.Query()
|
||||
if eventType != "" {
|
||||
query.Set("event", string(eventType))
|
||||
}
|
||||
if accountID != "" {
|
||||
query.Set("account_id", accountID)
|
||||
}
|
||||
parsedURL.RawQuery = query.Encode()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, hook.Method, parsedURL.String(), bytes.NewReader(data))
|
||||
if err != nil {
|
||||
logging.Error("Failed to create request", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
for key, value := range hook.Headers {
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
|
||||
logging.Debug("Sending to hook", "hook_id", hook.ID, "url", hook.URL)
|
||||
|
||||
resp, err := m.client.Do(req)
|
||||
if err != nil {
|
||||
logging.Error("Failed to send to hook", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||
logging.Warn("Hook returned non-success status", "hook_id", hook.ID, "status", resp.StatusCode)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, fmt.Errorf("status code %d", resp.StatusCode)))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Try to parse response
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
logging.Error("Failed to read hook response", "hook_id", hook.ID, "error", err)
|
||||
m.eventBus.Publish(events.HookFailedEvent(ctx, hook.ID, hook.Name, err))
|
||||
return nil
|
||||
}
|
||||
|
||||
if len(body) == 0 {
|
||||
m.eventBus.Publish(events.HookSuccessEvent(ctx, hook.ID, hook.Name, resp.StatusCode, nil))
|
||||
return nil
|
||||
}
|
||||
|
||||
var hookResp HookResponse
|
||||
if err := json.Unmarshal(body, &hookResp); err != nil {
|
||||
logging.Debug("Hook response not JSON", "hook_id", hook.ID)
|
||||
m.eventBus.Publish(events.HookSuccessEvent(ctx, hook.ID, hook.Name, resp.StatusCode, string(body)))
|
||||
return nil
|
||||
}
|
||||
|
||||
logging.Debug("Hook response received", "hook_id", hook.ID, "send_message", hookResp.SendMessage)
|
||||
m.eventBus.Publish(events.HookSuccessEvent(ctx, hook.ID, hook.Name, resp.StatusCode, hookResp))
|
||||
return &hookResp
|
||||
}
|
||||
Reference in New Issue
Block a user