package eventlogger import ( "context" "encoding/json" "fmt" "strings" "time" "git.warky.dev/wdevs/whatshooked/pkg/config" "git.warky.dev/wdevs/whatshooked/pkg/events" "git.warky.dev/wdevs/whatshooked/pkg/logging" "git.warky.dev/wdevs/whatshooked/pkg/utils" mqtt "github.com/eclipse/paho.mqtt.golang" "go.mau.fi/whatsmeow/types" ) // MQTTTarget represents an MQTT logging target type MQTTTarget struct { client mqtt.Client config config.MQTTConfig waManager WhatsAppManager eventFilter map[string]bool } // NewMQTTTarget creates a new MQTT target func NewMQTTTarget(cfg config.MQTTConfig, waManager WhatsAppManager) (*MQTTTarget, error) { if cfg.Broker == "" { return nil, fmt.Errorf("MQTT broker is required") } // Set defaults if cfg.ClientID == "" { cfg.ClientID = fmt.Sprintf("whatshooked-%d", time.Now().Unix()) } if cfg.TopicPrefix == "" { cfg.TopicPrefix = "whatshooked" } if cfg.QoS < 0 || cfg.QoS > 2 { cfg.QoS = 1 // Default to QoS 1 } target := &MQTTTarget{ config: cfg, waManager: waManager, eventFilter: make(map[string]bool), } // Build event filter map for fast lookup if len(cfg.Events) > 0 { for _, eventType := range cfg.Events { target.eventFilter[eventType] = true } } // Create MQTT client options opts := mqtt.NewClientOptions() opts.AddBroker(cfg.Broker) opts.SetClientID(cfg.ClientID) if cfg.Username != "" { opts.SetUsername(cfg.Username) } if cfg.Password != "" { opts.SetPassword(cfg.Password) } opts.SetKeepAlive(60 * time.Second) opts.SetPingTimeout(10 * time.Second) opts.SetAutoReconnect(true) opts.SetMaxReconnectInterval(10 * time.Second) // Connection lost handler opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { logging.Error("MQTT connection lost", "error", err) }) // On connect handler - subscribe to send topics if enabled opts.SetOnConnectHandler(func(client mqtt.Client) { logging.Info("MQTT connected to broker", "broker", cfg.Broker) if cfg.Subscribe { // Subscribe to send command topic for all accounts topic := fmt.Sprintf("%s/+/send", cfg.TopicPrefix) if token := client.Subscribe(topic, byte(cfg.QoS), target.handleSendMessage); token.Wait() && token.Error() != nil { logging.Error("Failed to subscribe to MQTT topic", "topic", topic, "error", token.Error()) } else { logging.Info("Subscribed to MQTT send topic", "topic", topic) } } }) // Create and connect the client client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { return nil, fmt.Errorf("failed to connect to MQTT broker: %w", token.Error()) } target.client = client logging.Info("MQTT target initialized", "broker", cfg.Broker, "client_id", cfg.ClientID, "subscribe", cfg.Subscribe) return target, nil } // Log publishes an event to MQTT func (m *MQTTTarget) Log(event events.Event) error { // Check if we should filter this event if len(m.eventFilter) > 0 { if !m.eventFilter[string(event.Type)] { // Event is filtered out return nil } } // Extract account_id from event data accountID := "unknown" if id, ok := event.Data["account_id"].(string); ok && id != "" { accountID = id } // Build the topic: whatshooked/accountid/eventtype topic := fmt.Sprintf("%s/%s/%s", m.config.TopicPrefix, accountID, event.Type) // Marshal event to JSON payload, err := json.Marshal(event) if err != nil { return fmt.Errorf("failed to marshal event: %w", err) } // Publish to MQTT token := m.client.Publish(topic, byte(m.config.QoS), m.config.Retained, payload) token.Wait() if token.Error() != nil { return fmt.Errorf("failed to publish to MQTT: %w", token.Error()) } logging.Debug("Event published to MQTT", "topic", topic, "event_type", event.Type) return nil } // handleSendMessage handles incoming MQTT messages for sending WhatsApp messages func (m *MQTTTarget) handleSendMessage(client mqtt.Client, msg mqtt.Message) { logging.Debug("MQTT send message received", "topic", msg.Topic(), "payload", string(msg.Payload())) // Parse topic: whatshooked/accountid/send parts := strings.Split(msg.Topic(), "/") if len(parts) < 3 { logging.Error("Invalid MQTT send topic format", "topic", msg.Topic()) return } accountID := parts[len(parts)-2] // Parse message payload var sendReq struct { Type string `json:"type"` // Message type: "text", "image", "video", "document" To string `json:"to"` // Phone number or JID Text string `json:"text"` // Message text (for text messages) Caption string `json:"caption"` // Optional caption for media MimeType string `json:"mime_type"` // MIME type for media Filename string `json:"filename"` // Filename for documents // Media can be provided as either base64 or URL Base64 string `json:"base64"` // Base64 encoded media data URL string `json:"url"` // URL to download media from } if err := json.Unmarshal(msg.Payload(), &sendReq); err != nil { logging.Error("Failed to parse MQTT send message", "error", err, "payload", string(msg.Payload())) return } if sendReq.To == "" { logging.Error("Missing required field 'to' in MQTT send message", "to", sendReq.To) return } // Default to text message if type not specified if sendReq.Type == "" { sendReq.Type = "text" } // Parse JID jid, err := types.ParseJID(sendReq.To) if err != nil { logging.Error("Failed to parse JID", "to", sendReq.To, "error", err) return } ctx := context.Background() // Handle different message types switch sendReq.Type { case "text": if sendReq.Text == "" { logging.Error("Missing required field 'text' for text message", "account_id", accountID) return } if err := m.waManager.SendTextMessage(ctx, accountID, jid, sendReq.Text); err != nil { logging.Error("Failed to send text message via MQTT", "account_id", accountID, "to", sendReq.To, "error", err) } else { logging.Info("Text message sent via MQTT", "account_id", accountID, "to", sendReq.To) } case "image": mediaData, err := m.getMediaData(sendReq.Base64, sendReq.URL) if err != nil { logging.Error("Failed to get image data", "account_id", accountID, "error", err) return } // Default MIME type if not specified if sendReq.MimeType == "" { sendReq.MimeType = "image/jpeg" } if err := m.waManager.SendImage(ctx, accountID, jid, mediaData, sendReq.MimeType, sendReq.Caption); err != nil { logging.Error("Failed to send image via MQTT", "account_id", accountID, "to", sendReq.To, "error", err) } else { logging.Info("Image sent via MQTT", "account_id", accountID, "to", sendReq.To, "size", len(mediaData)) } case "video": mediaData, err := m.getMediaData(sendReq.Base64, sendReq.URL) if err != nil { logging.Error("Failed to get video data", "account_id", accountID, "error", err) return } // Default MIME type if not specified if sendReq.MimeType == "" { sendReq.MimeType = "video/mp4" } if err := m.waManager.SendVideo(ctx, accountID, jid, mediaData, sendReq.MimeType, sendReq.Caption); err != nil { logging.Error("Failed to send video via MQTT", "account_id", accountID, "to", sendReq.To, "error", err) } else { logging.Info("Video sent via MQTT", "account_id", accountID, "to", sendReq.To, "size", len(mediaData)) } case "document": mediaData, err := m.getMediaData(sendReq.Base64, sendReq.URL) if err != nil { logging.Error("Failed to get document data", "account_id", accountID, "error", err) return } // Filename is required for documents if sendReq.Filename == "" { sendReq.Filename = "document" } // Default MIME type if not specified if sendReq.MimeType == "" { sendReq.MimeType = "application/pdf" } if err := m.waManager.SendDocument(ctx, accountID, jid, mediaData, sendReq.MimeType, sendReq.Filename, sendReq.Caption); err != nil { logging.Error("Failed to send document via MQTT", "account_id", accountID, "to", sendReq.To, "error", err) } else { logging.Info("Document sent via MQTT", "account_id", accountID, "to", sendReq.To, "filename", sendReq.Filename, "size", len(mediaData)) } default: logging.Error("Unknown message type", "type", sendReq.Type, "account_id", accountID) } } // getMediaData retrieves media data from either base64 string or URL func (m *MQTTTarget) getMediaData(base64Data, url string) ([]byte, error) { if base64Data != "" { return utils.DecodeBase64(base64Data) } if url != "" { return utils.DownloadMedia(url) } return nil, fmt.Errorf("either 'base64' or 'url' must be provided for media") } // Close disconnects from the MQTT broker func (m *MQTTTarget) Close() error { if m.client != nil && m.client.IsConnected() { // Unsubscribe if subscribed if m.config.Subscribe { topic := fmt.Sprintf("%s/+/send", m.config.TopicPrefix) if token := m.client.Unsubscribe(topic); token.Wait() && token.Error() != nil { logging.Error("Failed to unsubscribe from MQTT topic", "topic", topic, "error", token.Error()) } } m.client.Disconnect(250) logging.Info("MQTT target closed") } return nil }