Files
Hein c4d974d6ce
Some checks failed
CI / Test (1.23) (push) Failing after -27m1s
CI / Lint (push) Successful in -26m31s
CI / Build (push) Successful in -27m3s
CI / Test (1.22) (push) Failing after -24m58s
feat(cache): 🎉 add message caching functionality
* Implement MessageCache to store events when no webhooks are available.
* Add configuration options for enabling cache, setting data path, max age, and max events.
* Create API endpoints for managing cached events, including listing, replaying, and deleting.
* Integrate caching into the hooks manager to store events when no active webhooks are found.
* Enhance logging for better traceability of cached events and operations.
2026-01-30 16:00:34 +02:00

450 lines
12 KiB
Go

package businessapi
import (
"context"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"git.warky.dev/wdevs/whatshooked/pkg/events"
"git.warky.dev/wdevs/whatshooked/pkg/logging"
)
// HandleWebhook reads the body of an incoming WhatsApp Business API webhook
// request, decodes it into a WebhookPayload and hands every change of every
// entry to processChange.
//
// It returns an error when the body cannot be read or cannot be unmarshalled
// into a WebhookPayload. Once decoding succeeds it always returns nil.
func (c *Client) HandleWebhook(r *http.Request) error {
	raw, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		return fmt.Errorf("failed to read request body: %w", readErr)
	}

	var payload WebhookPayload
	if parseErr := json.Unmarshal(raw, &payload); parseErr != nil {
		return fmt.Errorf("failed to parse webhook payload: %w", parseErr)
	}

	entryCount := len(payload.Entry)
	logging.Info("Processing webhook payload",
		"account_id", c.id,
		"entries", entryCount)

	// Dispatch changes entry by entry, keeping a running total for the
	// summary log line below.
	dispatched := 0
	for e := range payload.Entry {
		changes := payload.Entry[e].Changes
		for _, change := range changes {
			c.processChange(change)
		}
		dispatched += len(changes)
	}

	logging.Info("Webhook payload processed",
		"account_id", c.id,
		"entries", entryCount,
		"changes", dispatched)

	return nil
}
// processChange handles a single webhook change according to its Field.
//
// "messages" changes have each message passed to processMessage and each
// status passed to processStatus. Every other recognized field is only
// logged ("account_alerts" at warning level, the rest at info level), and an
// unrecognized field is logged at debug level.
func (c *Client) processChange(change WebhookChange) {
	ctx := context.Background()
	phoneNumberID := change.Value.Metadata.PhoneNumberID

	logging.Info("Processing webhook change",
		"account_id", c.id,
		"field", change.Field,
		"phone_number_id", phoneNumberID)

	// Message traffic is the only field that produces events.
	if change.Field == "messages" {
		for _, msg := range change.Value.Messages {
			c.processMessage(ctx, msg, change.Value.Contacts)
		}
		for i := range change.Value.Statuses {
			c.processStatus(ctx, change.Value.Statuses[i])
		}
		return
	}

	// Alerts are the only notification logged at warning level.
	if change.Field == "account_alerts" {
		logging.Warn("Account alert received",
			"account_id", c.id,
			"phone_number_id", phoneNumberID)
		return
	}

	// The remaining known fields are logged for visibility only.
	var notice string
	switch change.Field {
	case "message_template_status_update":
		notice = "Message template status update received"
	case "account_update":
		notice = "Account update received"
	case "phone_number_quality_update":
		notice = "Phone number quality update received"
	case "phone_number_name_update":
		notice = "Phone number name update received"
	default:
		logging.Debug("Unknown webhook field type",
			"account_id", c.id,
			"field", change.Field)
		return
	}

	logging.Info(notice,
		"account_id", c.id,
		"phone_number_id", phoneNumberID)
}
// processMessage turns one incoming webhook message into a
// MessageReceivedEvent and publishes it on the client's event bus.
//
// The sender name is taken from the first contact whose WaID equals msg.From.
// For media messages (image, video, document, audio, sticker) the media is
// downloaded and passed to processMediaData; a failed download is logged and
// the event is still published without media data. Messages of type "unknown"
// and messages of an unrecognized type are logged and dropped without
// publishing an event.
func (c *Client) processMessage(ctx context.Context, msg WebhookMessage, contacts []WebhookContact) {
	// Resolve the sender's profile name; it stays empty when nothing matches.
	var senderName string
	for i := range contacts {
		if contacts[i].WaID == msg.From {
			senderName = contacts[i].Profile.Name
			break
		}
	}

	timestamp := c.parseTimestamp(msg.Timestamp)

	var (
		text, messageType, mimeType     string
		filename, mediaBase64, mediaURL string
	)

	// attachMedia downloads the media identified by mediaID and, on success,
	// fills in filename, mediaURL and mediaBase64. It reads mimeType, so the
	// caller must assign that before calling. On failure it only logs
	// failureMsg and leaves the media fields as they were.
	attachMedia := func(mediaID, failureMsg string) {
		data, _, err := c.downloadMedia(ctx, mediaID)
		if err != nil {
			logging.Error(failureMsg, "account_id", c.id, "media_id", mediaID, "error", err)
			return
		}
		filename, mediaURL = c.processMediaData(msg.ID, data, mimeType, &mediaBase64)
	}

	switch msg.Type {
	case "text":
		messageType = "text"
		if msg.Text != nil {
			text = msg.Text.Body
		}

	case "image":
		if img := msg.Image; img != nil {
			messageType, mimeType, text = "image", img.MimeType, img.Caption
			attachMedia(img.ID, "Failed to download image")
		}

	case "video":
		if vid := msg.Video; vid != nil {
			messageType, mimeType, text = "video", vid.MimeType, vid.Caption
			attachMedia(vid.ID, "Failed to download video")
		}

	case "document":
		if doc := msg.Document; doc != nil {
			messageType, mimeType, text = "document", doc.MimeType, doc.Caption
			// The sender-provided name is kept only when the download fails;
			// a successful download replaces it with the processed name.
			filename = doc.Filename
			attachMedia(doc.ID, "Failed to download document")
		}

	case "audio":
		if aud := msg.Audio; aud != nil {
			messageType, mimeType = "audio", aud.MimeType
			attachMedia(aud.ID, "Failed to download audio")
		}

	case "sticker":
		if stk := msg.Sticker; stk != nil {
			messageType, mimeType = "sticker", stk.MimeType
			attachMedia(stk.ID, "Failed to download sticker")
		}

	case "location":
		if loc := msg.Location; loc != nil {
			messageType = "location"
			// Locations are flattened into a human-readable text line.
			text = fmt.Sprintf("Location: %s (%s) - %.6f, %.6f",
				loc.Name, loc.Address,
				loc.Latitude, loc.Longitude)
		}

	case "contacts":
		if n := len(msg.Contacts); n > 0 {
			messageType = "contacts"
			// Shared contacts are summarized by their formatted names.
			names := make([]string, n)
			for i := range msg.Contacts {
				names[i] = msg.Contacts[i].Name.FormattedName
			}
			text = fmt.Sprintf("Shared %d contact(s): %s", n, strings.Join(names, ", "))
		}

	case "interactive":
		if reply := msg.Interactive; reply != nil {
			messageType = "interactive"
			// Only the reply kind named by Type is consulted, and only when
			// its payload is present.
			switch {
			case reply.Type == "button_reply" && reply.ButtonReply != nil:
				text = reply.ButtonReply.Title
			case reply.Type == "list_reply" && reply.ListReply != nil:
				text = reply.ListReply.Title
			case reply.Type == "nfm_reply" && reply.NfmReply != nil:
				text = reply.NfmReply.Body
			}
		}

	case "button":
		if msg.Button != nil {
			messageType = "button"
			text = msg.Button.Text
		}

	case "reaction":
		if msg.Reaction != nil {
			messageType = "reaction"
			text = msg.Reaction.Emoji
		}

	case "order":
		if order := msg.Order; order != nil {
			messageType = "order"
			text = fmt.Sprintf("Order with %d item(s): %s", len(order.ProductItems), order.Text)
		}

	case "system":
		if msg.System != nil {
			messageType = "system"
			text = msg.System.Body
		}

	case "unknown":
		logging.Warn("Received unknown message type", "account_id", c.id, "message_id", msg.ID)
		return

	default:
		logging.Warn("Unsupported message type", "account_id", c.id, "type", msg.Type)
		return
	}

	logging.Info("Message received via WhatsApp",
		"account_id", c.id,
		"message_id", msg.ID,
		"from", msg.From,
		"type", messageType)

	received := events.MessageReceivedEvent(
		ctx,
		c.id,
		msg.ID,
		msg.From,
		msg.From, // For Business API, chat is same as sender for individual messages
		text,
		timestamp,
		false, // Business API doesn't indicate groups in this webhook
		"",
		senderName,
		messageType,
		mimeType,
		filename,
		mediaBase64,
		mediaURL,
	)
	c.eventBus.Publish(received)

	logging.Debug("Message received via Business API", "account_id", c.id, "from", msg.From, "type", messageType)
}
// processStatus publishes the event matching a message status update and logs
// the transition.
//
// "sent", "delivered", "read" and "failed" each publish their corresponding
// event; any other status is logged at debug level and otherwise ignored. For
// "failed", the error text is built from the first entry of status.Errors, or
// is "unknown error" when the list is empty.
func (c *Client) processStatus(ctx context.Context, status WebhookStatus) {
	// Parsed up front for every status, even those that do not use it.
	at := c.parseTimestamp(status.Timestamp)
	messageID, recipient := status.ID, status.RecipientID

	switch state := status.Status; state {
	case "sent":
		c.eventBus.Publish(events.MessageSentEvent(ctx, c.id, messageID, recipient, ""))
		logging.Info("Message status: sent", "account_id", c.id, "message_id", messageID, "recipient", recipient)

	case "delivered":
		c.eventBus.Publish(events.MessageDeliveredEvent(ctx, c.id, messageID, recipient, at))
		logging.Info("Message status: delivered", "account_id", c.id, "message_id", messageID, "recipient", recipient)

	case "read":
		c.eventBus.Publish(events.MessageReadEvent(ctx, c.id, messageID, recipient, at))
		logging.Info("Message status: read", "account_id", c.id, "message_id", messageID, "recipient", recipient)

	case "failed":
		reason := "unknown error"
		if len(status.Errors) != 0 {
			first := status.Errors[0]
			reason = fmt.Sprintf("%s (code: %d)", first.Title, first.Code)
		}
		c.eventBus.Publish(events.MessageFailedEvent(ctx, c.id, recipient, "", fmt.Errorf("%s", reason)))
		logging.Error("Message failed", "account_id", c.id, "message_id", messageID, "error", reason)

	default:
		logging.Debug("Unknown status type", "account_id", c.id, "status", state)
	}
}
// parseTimestamp converts ts, a base-10 Unix timestamp in seconds, into a
// time.Time. When ts is not a valid 64-bit integer, a warning is logged and
// the current time is returned instead.
func (c *Client) parseTimestamp(ts string) time.Time {
	seconds, err := strconv.ParseInt(ts, 10, 64)
	if err == nil {
		return time.Unix(seconds, 0)
	}

	logging.Warn("Failed to parse timestamp", "timestamp", ts, "error", err)
	return time.Now()
}
// processMediaData prepares downloaded media according to the configured
// media mode and reports the resulting file name and URL.
//
// A file name of the form "<messageID>_<hash><ext>" is always generated, where
// hash is the hex form of the first 8 bytes of the SHA-256 of data. In
// "base64" or "both" mode the encoded data is written to *mediaBase64. In
// "link" or "both" mode the data is saved via saveMediaFile; on success the
// returned filename is the base name of the saved file and mediaURL comes from
// generateMediaURL. If saving fails the error is logged, the generated file
// name is kept and mediaURL stays empty. Any other mode only yields the
// generated file name.
func (c *Client) processMediaData(messageID string, data []byte, mimeType string, mediaBase64 *string) (filename string, mediaURL string) {
	digest := sha256.Sum256(data)
	filename = fmt.Sprintf("%s_%s%s",
		messageID,
		hex.EncodeToString(digest[:8]),
		getExtensionFromMimeType(mimeType))

	// Translate the configured mode into the two independent actions.
	var wantInline, wantLink bool
	switch c.mediaConfig.Mode {
	case "base64":
		wantInline = true
	case "link":
		wantLink = true
	case "both":
		wantInline, wantLink = true, true
	}

	if wantInline {
		*mediaBase64 = base64.StdEncoding.EncodeToString(data)
	}
	if !wantLink {
		return filename, mediaURL
	}

	storedPath, err := c.saveMediaFile(messageID, data, mimeType)
	if err != nil {
		logging.Error("Failed to save media file", "account_id", c.id, "message_id", messageID, "error", err)
		return filename, mediaURL
	}

	// Report the name actually used on disk so the URL matches the file.
	filename = filepath.Base(storedPath)
	mediaURL = c.generateMediaURL(messageID, filename)
	return filename, mediaURL
}
// saveMediaFile writes data to "<DataPath>/<account id>/<name>", creating the
// directory when needed, and returns the full path of the written file.
//
// The file name is "<messageID>_<hash><ext>", where hash is the hex form of
// the first 8 bytes of the SHA-256 of data and ext is derived from mimeType.
// Because messageID originates from the webhook payload, any path separator
// or NUL byte in it is replaced with '_' so the file always lands directly
// inside the account's media directory.
//
// An error is returned when the directory cannot be created or the file cannot
// be written.
func (c *Client) saveMediaFile(messageID string, data []byte, mimeType string) (string, error) {
	mediaDir := filepath.Join(c.mediaConfig.DataPath, c.id)
	if err := os.MkdirAll(mediaDir, 0755); err != nil {
		return "", fmt.Errorf("failed to create media directory: %w", err)
	}

	// Neutralize characters that would let the ID act as a path rather than a
	// plain file name component. Both separator styles are handled regardless
	// of the host OS.
	safeID := strings.Map(func(r rune) rune {
		switch r {
		case '/', '\\', 0:
			return '_'
		}
		return r
	}, messageID)

	hash := sha256.Sum256(data)
	filename := fmt.Sprintf("%s_%s%s",
		safeID,
		hex.EncodeToString(hash[:8]),
		getExtensionFromMimeType(mimeType))

	filePath := filepath.Join(mediaDir, filename)
	if err := os.WriteFile(filePath, data, 0644); err != nil {
		return "", fmt.Errorf("failed to write media file: %w", err)
	}

	return filePath, nil
}
// generateMediaURL builds the URL under which a stored media file is exposed:
// "<base>/api/media/<account id>/<filename>".
//
// The base is the configured BaseURL, or "http://localhost:8080" when none is
// configured. Trailing slashes on the base are dropped so that a value such as
// "https://example.com/" does not produce a "//api/..." path. messageID is not
// used when building the URL; it is kept so existing callers stay unchanged.
func (c *Client) generateMediaURL(messageID, filename string) string {
	baseURL := c.mediaConfig.BaseURL
	if baseURL == "" {
		baseURL = "http://localhost:8080"
	}
	baseURL = strings.TrimRight(baseURL, "/")

	return fmt.Sprintf("%s/api/media/%s/%s", baseURL, c.id, filename)
}
// getExtensionFromMimeType returns the file extension (including the leading
// dot) for a MIME type, or "" when the type is not recognized.
//
// Any parameters after ';' are ignored and matching is case-insensitive, so
// values such as "audio/ogg; codecs=opus" resolve the same as "audio/ogg".
func getExtensionFromMimeType(mimeType string) string {
	// Reduce "type/subtype; param=value" to a normalized "type/subtype".
	mediaType, _, _ := strings.Cut(mimeType, ";")
	mediaType = strings.ToLower(strings.TrimSpace(mediaType))

	switch mediaType {
	// Images
	case "image/jpeg":
		return ".jpg"
	case "image/png":
		return ".png"
	case "image/gif":
		return ".gif"
	case "image/webp":
		return ".webp"

	// Video
	case "video/mp4":
		return ".mp4"
	case "video/mpeg":
		return ".mpeg"
	case "video/webm":
		return ".webm"
	case "video/3gpp":
		return ".3gp"

	// Documents
	case "application/pdf":
		return ".pdf"
	case "application/msword":
		return ".doc"
	case "application/vnd.openxmlformats-officedocument.wordprocessingml.document":
		return ".docx"
	case "application/vnd.ms-excel":
		return ".xls"
	case "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
		return ".xlsx"
	case "text/plain":
		return ".txt"
	case "application/json":
		return ".json"

	// Audio
	case "audio/mpeg":
		return ".mp3"
	case "audio/mp4":
		return ".m4a"
	case "audio/ogg":
		return ".ogg"
	case "audio/amr":
		return ".amr"
	case "audio/opus":
		return ".opus"
	}

	return ""
}