Files
whatshooked/pkg/eventlogger/mqtt_target.go
Hein ea1209c84c
Some checks failed
CI / Test (1.22) (push) Failing after -23m51s
CI / Test (1.23) (push) Failing after -23m51s
CI / Lint (push) Has been cancelled
CI / Build (push) Has been cancelled
Release / Build and Release (push) Successful in -18m25s
mqtt
2025-12-29 23:36:22 +02:00

298 lines
8.9 KiB
Go

package eventlogger
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
"git.warky.dev/wdevs/whatshooked/pkg/config"
"git.warky.dev/wdevs/whatshooked/pkg/events"
"git.warky.dev/wdevs/whatshooked/pkg/logging"
"git.warky.dev/wdevs/whatshooked/pkg/utils"
mqtt "github.com/eclipse/paho.mqtt.golang"
"go.mau.fi/whatsmeow/types"
)
// MQTTTarget is an event-logging target that publishes events to an MQTT
// broker. When subscription is enabled in its config it also receives
// "send" commands over MQTT and forwards them to WhatsApp via waManager.
type MQTTTarget struct {
// client is the paho MQTT client; assigned by NewMQTTTarget after a successful connect.
client mqtt.Client
// config holds the MQTT settings with NewMQTTTarget's defaults applied.
config config.MQTTConfig
// waManager performs the WhatsApp sends requested through the MQTT send topic.
waManager WhatsAppManager
// eventFilter is the set of event types to publish; when empty, every event is published.
eventFilter map[string]bool
}
// NewMQTTTarget creates an MQTT logging target and connects it to the broker.
//
// cfg.Broker is required. The following defaults are applied to the local
// copy of cfg: an empty ClientID becomes "whatshooked-<unix seconds>", an
// empty TopicPrefix becomes "whatshooked", and a QoS outside 0..2 becomes 1.
// When cfg.Events is non-empty, only those event types are published by Log.
// When cfg.Subscribe is set, the target subscribes to "<TopicPrefix>/+/send"
// each time the client connects and routes those messages to
// handleSendMessage.
//
// It returns an error when the broker address is missing or the initial
// connection attempt fails.
func NewMQTTTarget(cfg config.MQTTConfig, waManager WhatsAppManager) (*MQTTTarget, error) {
	if cfg.Broker == "" {
		return nil, fmt.Errorf("MQTT broker is required")
	}

	// Set defaults.
	if cfg.ClientID == "" {
		// Kept at second resolution so the ID stays within the 23-character
		// client-id limit that older MQTT brokers enforce.
		cfg.ClientID = fmt.Sprintf("whatshooked-%d", time.Now().Unix())
	}
	if cfg.TopicPrefix == "" {
		cfg.TopicPrefix = "whatshooked"
	}
	if cfg.QoS < 0 || cfg.QoS > 2 {
		cfg.QoS = 1 // Default to QoS 1
	}

	target := &MQTTTarget{
		config:      cfg,
		waManager:   waManager,
		eventFilter: make(map[string]bool, len(cfg.Events)),
	}

	// Build event filter map for fast lookup.
	for _, eventType := range cfg.Events {
		target.eventFilter[eventType] = true
	}

	// Create MQTT client options.
	opts := mqtt.NewClientOptions()
	opts.AddBroker(cfg.Broker)
	opts.SetClientID(cfg.ClientID)
	if cfg.Username != "" {
		opts.SetUsername(cfg.Username)
	}
	if cfg.Password != "" {
		opts.SetPassword(cfg.Password)
	}
	opts.SetKeepAlive(60 * time.Second)
	opts.SetPingTimeout(10 * time.Second)
	opts.SetAutoReconnect(true)
	opts.SetMaxReconnectInterval(10 * time.Second)

	// handleSendMessage downloads media and sends WhatsApp messages
	// synchronously. In paho's default ordered mode, message handlers must not
	// block: a blocking handler stalls processing of all inbound packets.
	// Disabling ordering makes paho invoke each handler in its own goroutine;
	// the trade-off is that send commands may be processed out of order.
	opts.SetOrderMatters(false)

	// Connection lost handler.
	opts.SetConnectionLostHandler(func(_ mqtt.Client, err error) {
		logging.Error("MQTT connection lost", "error", err)
	})

	// On connect handler - subscribe to send topics if enabled.
	opts.SetOnConnectHandler(func(client mqtt.Client) {
		logging.Info("MQTT connected to broker", "broker", cfg.Broker)
		if !cfg.Subscribe {
			return
		}

		// Subscribe to the send command topic for all accounts.
		topic := fmt.Sprintf("%s/+/send", cfg.TopicPrefix)
		token := client.Subscribe(topic, byte(cfg.QoS), target.handleSendMessage)
		token.Wait()
		if err := token.Error(); err != nil {
			logging.Error("Failed to subscribe to MQTT topic", "topic", topic, "error", err)
			return
		}
		logging.Info("Subscribed to MQTT send topic", "topic", topic)
	})

	// Create and connect the client.
	client := mqtt.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return nil, fmt.Errorf("failed to connect to MQTT broker: %w", token.Error())
	}
	target.client = client

	logging.Info("MQTT target initialized", "broker", cfg.Broker, "client_id", cfg.ClientID, "subscribe", cfg.Subscribe)
	return target, nil
}
// mqttPublishTimeout bounds how long Log waits for a publish to complete.
const mqttPublishTimeout = 10 * time.Second

// Log publishes an event to MQTT as JSON on the topic
// "<TopicPrefix>/<account_id>/<event type>".
//
// Events whose type is not in the configured filter are dropped silently
// (nil is returned); an empty filter lets every event through. The account ID
// is read from event.Data["account_id"] and falls back to "unknown" when it
// is absent, empty, or not a string.
//
// It returns an error when the event cannot be marshalled, when the publish
// does not complete within mqttPublishTimeout, or when the client reports a
// publish error. On timeout the MQTT client may still deliver the message
// later; the timeout only stops this call from blocking its caller.
func (m *MQTTTarget) Log(event events.Event) error {
	// Check if we should filter this event.
	if len(m.eventFilter) > 0 && !m.eventFilter[string(event.Type)] {
		return nil
	}

	// Extract account_id from event data.
	accountID := "unknown"
	if id, ok := event.Data["account_id"].(string); ok && id != "" {
		accountID = id
	}

	// Build the topic: whatshooked/accountid/eventtype
	topic := fmt.Sprintf("%s/%s/%s", m.config.TopicPrefix, accountID, event.Type)

	payload, err := json.Marshal(event)
	if err != nil {
		return fmt.Errorf("failed to marshal event: %w", err)
	}

	// An unbounded Wait can block for as long as the broker is unreachable,
	// because QoS 1/2 publishes are queued while the client reconnects.
	token := m.client.Publish(topic, byte(m.config.QoS), m.config.Retained, payload)
	if !token.WaitTimeout(mqttPublishTimeout) {
		return fmt.Errorf("failed to publish to MQTT: timed out after %s", mqttPublishTimeout)
	}
	if err := token.Error(); err != nil {
		return fmt.Errorf("failed to publish to MQTT: %w", err)
	}

	logging.Debug("Event published to MQTT", "topic", topic, "event_type", event.Type)
	return nil
}
// handleSendMessage handles incoming MQTT messages for sending WhatsApp messages
func (m *MQTTTarget) handleSendMessage(client mqtt.Client, msg mqtt.Message) {
logging.Debug("MQTT send message received", "topic", msg.Topic(), "payload", string(msg.Payload()))
// Parse topic: whatshooked/accountid/send
parts := strings.Split(msg.Topic(), "/")
if len(parts) < 3 {
logging.Error("Invalid MQTT send topic format", "topic", msg.Topic())
return
}
accountID := parts[len(parts)-2]
// Parse message payload
var sendReq struct {
Type string `json:"type"` // Message type: "text", "image", "video", "document"
To string `json:"to"` // Phone number or JID
Text string `json:"text"` // Message text (for text messages)
Caption string `json:"caption"` // Optional caption for media
MimeType string `json:"mime_type"` // MIME type for media
Filename string `json:"filename"` // Filename for documents
// Media can be provided as either base64 or URL
Base64 string `json:"base64"` // Base64 encoded media data
URL string `json:"url"` // URL to download media from
}
if err := json.Unmarshal(msg.Payload(), &sendReq); err != nil {
logging.Error("Failed to parse MQTT send message", "error", err, "payload", string(msg.Payload()))
return
}
if sendReq.To == "" {
logging.Error("Missing required field 'to' in MQTT send message", "to", sendReq.To)
return
}
// Default to text message if type not specified
if sendReq.Type == "" {
sendReq.Type = "text"
}
// Parse JID
jid, err := types.ParseJID(sendReq.To)
if err != nil {
logging.Error("Failed to parse JID", "to", sendReq.To, "error", err)
return
}
ctx := context.Background()
// Handle different message types
switch sendReq.Type {
case "text":
if sendReq.Text == "" {
logging.Error("Missing required field 'text' for text message", "account_id", accountID)
return
}
if err := m.waManager.SendTextMessage(ctx, accountID, jid, sendReq.Text); err != nil {
logging.Error("Failed to send text message via MQTT", "account_id", accountID, "to", sendReq.To, "error", err)
} else {
logging.Info("Text message sent via MQTT", "account_id", accountID, "to", sendReq.To)
}
case "image":
mediaData, err := m.getMediaData(sendReq.Base64, sendReq.URL)
if err != nil {
logging.Error("Failed to get image data", "account_id", accountID, "error", err)
return
}
// Default MIME type if not specified
if sendReq.MimeType == "" {
sendReq.MimeType = "image/jpeg"
}
if err := m.waManager.SendImage(ctx, accountID, jid, mediaData, sendReq.MimeType, sendReq.Caption); err != nil {
logging.Error("Failed to send image via MQTT", "account_id", accountID, "to", sendReq.To, "error", err)
} else {
logging.Info("Image sent via MQTT", "account_id", accountID, "to", sendReq.To, "size", len(mediaData))
}
case "video":
mediaData, err := m.getMediaData(sendReq.Base64, sendReq.URL)
if err != nil {
logging.Error("Failed to get video data", "account_id", accountID, "error", err)
return
}
// Default MIME type if not specified
if sendReq.MimeType == "" {
sendReq.MimeType = "video/mp4"
}
if err := m.waManager.SendVideo(ctx, accountID, jid, mediaData, sendReq.MimeType, sendReq.Caption); err != nil {
logging.Error("Failed to send video via MQTT", "account_id", accountID, "to", sendReq.To, "error", err)
} else {
logging.Info("Video sent via MQTT", "account_id", accountID, "to", sendReq.To, "size", len(mediaData))
}
case "document":
mediaData, err := m.getMediaData(sendReq.Base64, sendReq.URL)
if err != nil {
logging.Error("Failed to get document data", "account_id", accountID, "error", err)
return
}
// Filename is required for documents
if sendReq.Filename == "" {
sendReq.Filename = "document"
}
// Default MIME type if not specified
if sendReq.MimeType == "" {
sendReq.MimeType = "application/pdf"
}
if err := m.waManager.SendDocument(ctx, accountID, jid, mediaData, sendReq.MimeType, sendReq.Filename, sendReq.Caption); err != nil {
logging.Error("Failed to send document via MQTT", "account_id", accountID, "to", sendReq.To, "error", err)
} else {
logging.Info("Document sent via MQTT", "account_id", accountID, "to", sendReq.To, "filename", sendReq.Filename, "size", len(mediaData))
}
default:
logging.Error("Unknown message type", "type", sendReq.Type, "account_id", accountID)
}
}
// getMediaData resolves the media bytes for a send request. Inline base64
// data takes precedence over a URL; when neither is supplied an error is
// returned.
func (m *MQTTTarget) getMediaData(base64Data, url string) ([]byte, error) {
	switch {
	case base64Data != "":
		// Inline payload: decode it directly.
		return utils.DecodeBase64(base64Data)
	case url != "":
		// No inline payload: fetch the media from the given URL.
		return utils.DownloadMedia(url)
	default:
		return nil, fmt.Errorf("either 'base64' or 'url' must be provided for media")
	}
}
// Close shuts the target down: it unsubscribes from the send topic when
// subscription was enabled and then disconnects from the broker. It does
// nothing when there is no client or the client reports it is not connected.
// The returned error is always nil; an unsubscribe failure is only logged.
func (m *MQTTTarget) Close() error {
	if m.client == nil || !m.client.IsConnected() {
		return nil
	}

	if m.config.Subscribe {
		sendTopic := fmt.Sprintf("%s/+/send", m.config.TopicPrefix)
		unsub := m.client.Unsubscribe(sendTopic)
		unsub.Wait()
		if err := unsub.Error(); err != nil {
			logging.Error("Failed to unsubscribe from MQTT topic", "topic", sendTopic, "error", err)
		}
	}

	// 250 is the quiesce argument passed to the client's Disconnect.
	m.client.Disconnect(250)
	logging.Info("MQTT target closed")
	return nil
}