From c4d974d6ce2f999c62f709e92e89f68025a3cc88 Mon Sep 17 00:00:00 2001 From: Hein Date: Fri, 30 Jan 2026 16:00:34 +0200 Subject: [PATCH] =?UTF-8?q?feat(cache):=20=F0=9F=8E=89=20add=20message=20c?= =?UTF-8?q?aching=20functionality?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implement MessageCache to store events when no webhooks are available. * Add configuration options for enabling cache, setting data path, max age, and max events. * Create API endpoints for managing cached events, including listing, replaying, and deleting. * Integrate caching into the hooks manager to store events when no active webhooks are found. * Enhance logging for better traceability of cached events and operations. --- WHATSAPP_BUSINESS.md | 658 ++++++++++++++++++++++++++++- pkg/cache/message_cache.go | 394 +++++++++++++++++ pkg/config/config.go | 34 +- pkg/handlers/businessapi.go | 11 + pkg/handlers/cache.go | 254 +++++++++++ pkg/hooks/manager.go | 135 +++++- pkg/whatsapp/businessapi/events.go | 32 +- pkg/whatshooked/server.go | 10 + pkg/whatshooked/whatshooked.go | 37 +- 9 files changed, 1535 insertions(+), 30 deletions(-) create mode 100644 pkg/cache/message_cache.go create mode 100644 pkg/handlers/cache.go diff --git a/WHATSAPP_BUSINESS.md b/WHATSAPP_BUSINESS.md index 735bd2d..0c4a10f 100644 --- a/WHATSAPP_BUSINESS.md +++ b/WHATSAPP_BUSINESS.md @@ -169,7 +169,8 @@ Update your `config.json` with the Business API configuration: "phone_number_id": "123456789012345", "access_token": "EAAxxxxxxxxxxxx_your_permanent_token_here", "business_account_id": "987654321098765", - "api_version": "v21.0" + "api_version": "v21.0", + "verify_token": "your_secure_random_token_here" } } ] @@ -187,6 +188,21 @@ Update your `config.json` with the Business API configuration: | `access_token` | Yes | Permanent access token (from Step 4) | | `business_account_id` | No | WhatsApp Business Account ID (optional, for reference) | | `api_version` | No | Graph API version (defaults to `"v21.0"`) | +| `verify_token` | Yes | Random string for webhook verification (see Step 8a) | + +### Step 8a: Generate Verify Token + +The verify token is used by Meta to verify your webhook endpoint. Generate a secure random string: + +```bash +# Generate a random token +openssl rand -hex 32 + +# Or use any secure random string like: +# "my_secure_verify_token_abc123xyz789" +``` + +Add this token to your `config.json` (see above) and save it - you'll need it for webhook configuration in Step 10. ## Step 9: Start WhatsHooked @@ -197,6 +213,7 @@ Update your `config.json` with the Business API configuration: You should see: ``` INFO Business API client connected account_id=business phone=+1234567890 +INFO Hook manager started and subscribed to events event_types=13 ``` If you see `Failed to connect client`, check the error message and verify: @@ -205,8 +222,647 @@ If you see `Failed to connect client`, check the error message and verify: 3. Access token hasn't expired 4. Business Account has WhatsApp API access enabled +## Step 10: Configure Webhook in Meta Developer Console + +WhatsHooked provides a webhook endpoint to receive incoming messages and status updates from WhatsApp. + +### 10.1: Webhook URL Format + +Your webhook URL should be: +``` +https://your-domain.com/webhooks/whatsapp/{account_id} +``` + +Where `{account_id}` matches the `id` field in your config (e.g., "business"). + +**Example**: If your domain is `api.example.com` and account ID is `business`: +``` +https://api.example.com/webhooks/whatsapp/business +``` + +### 10.2: Configure in Meta Developer Console + +1. Go to [Meta Developers](https://developers.facebook.com/) +2. Select your app +3. Navigate to **WhatsApp** → **Configuration** +4. Under "Webhook", click **Edit** +5. Enter: + - **Callback URL**: `https://your-domain.com/webhooks/whatsapp/business` + - **Verify Token**: The same token from your `config.json` (`verify_token` field) +6. Click **Verify and Save** + +Meta will send a GET request to verify your endpoint. If verification succeeds, you'll see a green checkmark. + +### 10.3: Subscribe to Webhook Events + +After verification, subscribe to these webhook fields: +- ✅ **messages** - Incoming messages and message status updates +- ✅ **message_template_status_update** - Template approval/rejection (optional) +- ✅ **account_update** - Account changes (optional) +- ✅ **phone_number_quality_update** - Quality rating changes (optional) + +Click **Subscribe** for each field you want to receive. + +## Supported Webhook Events + +WhatsHooked supports all WhatsApp Business API webhook events and message types: + +### Message Types + +| Type | Supported | Downloads Media | Description | +|------|-----------|-----------------|-------------| +| `text` | ✅ | N/A | Text messages | +| `image` | ✅ | ✅ | Images with optional caption | +| `video` | ✅ | ✅ | Videos with optional caption | +| `document` | ✅ | ✅ | PDFs, docs, etc. with filename | +| `audio` | ✅ | ✅ | Voice messages and audio files | +| `sticker` | ✅ | ✅ | Animated and static stickers | +| `location` | ✅ | N/A | GPS coordinates with name/address | +| `contacts` | ✅ | N/A | Shared contact cards (vCard) | +| `interactive` | ✅ | N/A | Button/list/flow replies | +| `button` | ✅ | N/A | Quick reply button responses | +| `reaction` | ✅ | N/A | Emoji reactions to messages | +| `order` | ✅ | N/A | Catalog/commerce orders | +| `system` | ✅ | N/A | System notifications | + +### Status Updates + +| Status | Event | Description | +|--------|-------|-------------| +| `sent` | `message.sent` | Message sent from your number | +| `delivered` | `message.delivered` | Message delivered to recipient | +| `read` | `message.read` | Message read by recipient | +| `failed` | `message.failed` | Message delivery failed | + +### Webhook Notification Types + +| Field | Description | Events Published | +|-------|-------------|------------------| +| `messages` | Message events | `message.received`, message status updates | +| `message_template_status_update` | Template changes | Logged to console | +| `account_update` | Account config changes | Logged to console | +| `phone_number_quality_update` | Quality rating changes | Logged to console | +| `phone_number_name_update` | Display name changes | Logged to console | +| `account_alerts` | Important alerts | Logged to console | + +## Webhook Security + +WhatsHooked implements proper webhook security: + +1. **Verification**: Uses the `verify_token` to verify Meta's webhook setup request +2. **Account isolation**: Each account has its own webhook endpoint path +3. **No authentication required**: Meta's webhooks don't support custom auth headers +4. **Validation**: Verifies webhook payload structure + +### Webhook Verification Flow + +``` +Meta sends: GET /webhooks/whatsapp/business?hub.mode=subscribe&hub.verify_token=YOUR_TOKEN&hub.challenge=CHALLENGE + ↓ + WhatsHooked verifies token + ↓ + Returns CHALLENGE (200 OK) if valid + 403 Forbidden if invalid +``` + +### Receiving Messages + +``` +Meta sends: POST /webhooks/whatsapp/business + ↓ + WhatsHooked processes webhook + ↓ + Downloads media (if present) + ↓ + Publishes to event bus + ↓ + Triggers your configured hooks + ↓ + Returns 200 OK +``` + +## Testing Webhooks + +### Test with Meta's Test Button + +1. In WhatsApp Configuration → Webhooks +2. Click **Test** next to "messages" +3. Select a sample event (e.g., "Text Message") +4. Click **Send to My Server** +5. Check WhatsHooked logs for the received event + +### Test with Real Messages + +1. Send a message to your WhatsApp Business number +2. Check WhatsHooked logs (set `"log_level": "debug"` for details): + +``` +DEBUG Publishing message received event account_id=business message_id=wamid.xxx from=1234567890 type=text +DEBUG Hook manager received event event_type=message.received +DEBUG Hook matches event hook_id=message_hook event_type=message.received +DEBUG Found relevant hooks for event event_type=message.received hook_count=1 +DEBUG Sending to hook hook_id=message_hook url=https://your-webhook.com/messages +``` + +3. Your webhook should receive the payload + +### Webhook Payload Example + +```json +{ + "account_id": "business", + "message_id": "wamid.HBgNMTIzNDU2Nzg5MAUCABEYEjQyMzRGRDhENzk5MkY5OUFBMQA", + "from": "1234567890", + "to": "1234567890", + "text": "Hello World", + "timestamp": "2026-01-30T12:00:00Z", + "is_group": false, + "sender_name": "John Doe", + "message_type": "text" +} +``` + +## Step 11: Configure Your Webhooks + +## Step 11: Configure Your Webhooks + +WhatsHooked forwards events to your own webhook URLs. Configure them in `config.json`: + +```json +{ + "hooks": [ + { + "id": "message_hook", + "name": "Message Handler", + "url": "https://your-app.com/api/whatsapp/messages", + "method": "POST", + "headers": { + "Authorization": "Bearer your-app-token" + }, + "active": true, + "events": [ + "message.received", + "message.sent", + "message.delivered", + "message.read" + ], + "description": "Receives all message events" + } + ] +} +``` + +### Hook Configuration Fields + +| Field | Required | Description | +|-------|----------|-------------| +| `id` | Yes | Unique identifier for this hook | +| `name` | Yes | Human-readable name | +| `url` | Yes | Your webhook URL to receive events | +| `method` | Yes | HTTP method (usually "POST") | +| `headers` | No | Custom headers (for authentication, etc.) | +| `active` | Yes | Enable/disable this hook | +| `events` | No | Event types to receive (empty = all events) | +| `description` | No | Description for documentation | + +### Available Event Types + +**Message Events:** +- `message.received` - Incoming messages +- `message.sent` - Outgoing messages +- `message.delivered` - Delivery confirmations +- `message.read` - Read receipts +- `message.failed` - Delivery failures + +**Connection Events:** +- `whatsapp.connected` - Account connected +- `whatsapp.disconnected` - Account disconnected + +**QR Code Events** (whatsmeow only): +- `whatsapp.qr.code` - QR code for pairing +- `whatsapp.qr.timeout` - QR code expired +- `whatsapp.qr.error` - QR code error + +**Hook Events:** +- `hook.triggered` - Hook was called +- `hook.success` - Hook responded successfully +- `hook.failed` - Hook call failed + +### Query Parameters + +WhatsHooked automatically adds query parameters to your webhook URL: + +``` +https://your-app.com/api/whatsapp/messages?event=message.received&account_id=business +``` + +- `event` - The event type +- `account_id` - The WhatsApp account that triggered the event + +## Message Cache System + +WhatsHooked includes a message cache that stores events when no active webhooks are configured. This ensures zero message loss. + +### Enable Message Cache + +Add to your `config.json`: + +```json +{ + "message_cache": { + "enabled": true, + "data_path": "./data/message_cache", + "max_age_days": 7, + "max_events": 10000 + } +} +``` + +### When Events Are Cached + +Events are automatically cached when: +- No webhooks are configured for the event type +- All webhooks are inactive (`"active": false`) +- No webhooks match the event in their `events` array + +### Cache Management API + +**List cached events:** +```bash +curl -u username:password http://localhost:8080/api/cache +``` + +**Get cache statistics:** +```bash +curl -u username:password http://localhost:8080/api/cache/stats +``` + +**Replay all cached events:** +```bash +curl -X POST -u username:password http://localhost:8080/api/cache/replay +``` + +**Replay specific event:** +```bash +curl -X POST -u username:password \ + "http://localhost:8080/api/cache/event/replay?id=EVENT_ID" +``` + +**Delete cached event:** +```bash +curl -X DELETE -u username:password \ + "http://localhost:8080/api/cache/event/delete?id=EVENT_ID" +``` + +**Clear all cache:** +```bash +curl -X DELETE -u username:password \ + "http://localhost:8080/api/cache/clear?confirm=true" +``` + +### Cache Workflow Example + +1. **Disable webhooks** → New messages get cached +2. **Configure/enable webhooks** → Future messages delivered immediately +3. **Call replay API** → Cached messages delivered to webhooks +4. **Successful delivery** → Events removed from cache automatically + ## Troubleshooting +### Webhooks Not Receiving Events + +**Check these items:** + +1. **Verify token is correct** in both `config.json` and Meta Developer Console +2. **Check webhook is active** in Meta console (green checkmark) +3. **Verify URL is accessible** from internet (Meta needs to reach it) +4. **Check logs** with `"log_level": "debug"`: + ``` + DEBUG Publishing message received event account_id=business + DEBUG Hook manager received event event_type=message.received + DEBUG Hook matches event hook_id=message_hook + ``` +5. **Test with curl**: + ```bash + # Send test message to your WhatsApp Business number + # Check if webhook receives it + ``` + +### Webhook Verification Fails + +**Error**: "The callback URL or verify token couldn't be validated" + +**Causes**: +- `verify_token` mismatch between config.json and Meta console +- WhatsHooked server not running +- Firewall blocking Meta's IP ranges +- Wrong webhook URL format + +**Fix**: +1. Ensure server is running: `./bin/whatshook-server -config config.json` +2. Check logs for verification attempt +3. Verify token matches exactly (case-sensitive) +4. Test URL is accessible: `curl https://your-domain.com/webhooks/whatsapp/business` + +### Messages Not Cached + +**Check**: +1. `message_cache.enabled` is `true` in config +2. Hooks are actually inactive or not matching events +3. Check cache stats: `curl -u user:pass http://localhost:8080/api/cache/stats` + +### No Hooks Configured Error + +If events are being cached but you have hooks configured, check: +- Hook `"active"` is `true` +- Hook `"events"` array includes the event type (or is empty for all events) +- Hook URL is reachable and responding with 2xx status + +Enable debug logging to trace the issue: +```json +{ + "log_level": "debug" +} +``` + +## Webhook Payload Examples + +### Text Message + +```json +{ + "account_id": "business", + "message_id": "wamid.HBgNMTIzNDU2Nzg5MAUCABEYEjQyMzRGRDhENzk5MkY5OUFBMQA", + "from": "1234567890", + "to": "1234567890", + "text": "Hello, how can I help?", + "timestamp": "2026-01-30T12:00:00Z", + "is_group": false, + "sender_name": "John Doe", + "message_type": "text" +} +``` + +### Image Message (with media) + +```json +{ + "account_id": "business", + "message_id": "wamid.xxx", + "from": "1234567890", + "to": "1234567890", + "text": "Check this out!", + "timestamp": "2026-01-30T12:00:00Z", + "is_group": false, + "sender_name": "John Doe", + "message_type": "image", + "media": { + "type": "image", + "mime_type": "image/jpeg", + "filename": "wamid.xxx_a1b2c3d4.jpg", + "url": "http://localhost:8080/api/media/business/wamid.xxx_a1b2c3d4.jpg", + "base64": "..." // Only if media.mode is "base64" or "both" + } +} +``` + +### Location Message + +```json +{ + "account_id": "business", + "message_id": "wamid.xxx", + "from": "1234567890", + "to": "1234567890", + "text": "Location: Office (123 Main St) - 40.712800, -74.006000", + "timestamp": "2026-01-30T12:00:00Z", + "is_group": false, + "sender_name": "John Doe", + "message_type": "location" +} +``` + +### Button Reply (Interactive) + +```json +{ + "account_id": "business", + "message_id": "wamid.xxx", + "from": "1234567890", + "to": "1234567890", + "text": "Yes, I'm interested", + "timestamp": "2026-01-30T12:00:00Z", + "is_group": false, + "sender_name": "John Doe", + "message_type": "interactive" +} +``` + +### Delivery Status + +```json +{ + "event_type": "message.delivered", + "timestamp": "2026-01-30T12:00:05Z", + "data": { + "account_id": "business", + "message_id": "wamid.xxx", + "from": "1234567890", + "timestamp": "2026-01-30T12:00:05Z" + } +} +``` + +## Complete Configuration Example + +Here's a complete `config.json` with all Business API features: + +```json +{ + "server": { + "host": "0.0.0.0", + "port": 8080, + "default_country_code": "1", + "username": "admin", + "password": "secure_password", + "auth_key": "optional_api_key" + }, + "whatsapp": [ + { + "id": "business", + "type": "business-api", + "phone_number": "+1234567890", + "business_api": { + "phone_number_id": "123456789012345", + "access_token": "EAAxxxxxxxxxxxx", + "business_account_id": "987654321098765", + "api_version": "v21.0", + "verify_token": "my_secure_random_token_abc123" + } + } + ], + "hooks": [ + { + "id": "message_hook", + "name": "Message Handler", + "url": "https://your-app.com/api/whatsapp/messages", + "method": "POST", + "headers": { + "Authorization": "Bearer your-app-secret-token", + "X-Custom-Header": "value" + }, + "active": true, + "events": [ + "message.received", + "message.sent", + "message.delivered", + "message.read" + ], + "description": "Handles all message events" + }, + { + "id": "status_hook", + "name": "Connection Monitor", + "url": "https://your-app.com/api/whatsapp/status", + "method": "POST", + "active": true, + "events": [ + "whatsapp.connected", + "whatsapp.disconnected" + ], + "description": "Monitors connection status" + } + ], + "media": { + "data_path": "./data/media", + "mode": "link", + "base_url": "https://your-domain.com" + }, + "message_cache": { + "enabled": true, + "data_path": "./data/message_cache", + "max_age_days": 7, + "max_events": 10000 + }, + "event_logger": { + "enabled": true, + "targets": ["file", "sqlite"], + "file_dir": "./data/events", + "table_name": "event_logs" + }, + "log_level": "info" +} +``` + +## Advanced Features + +### Media Handling Modes + +WhatsHooked supports three media delivery modes: + +**1. Link Mode** (default, recommended) +```json +{ + "media": { + "mode": "link", + "base_url": "https://your-domain.com" + } +} +``` +- Downloads media and stores locally +- Webhooks receive URL: `https://your-domain.com/api/media/business/filename.jpg` +- Efficient for large media files + +**2. Base64 Mode** +```json +{ + "media": { + "mode": "base64" + } +} +``` +- Encodes media as base64 in webhook payload +- No separate download needed +- Good for small files, increases payload size + +**3. Both Mode** +```json +{ + "media": { + "mode": "both" + } +} +``` +- Provides both URL and base64 +- Maximum flexibility, largest payloads + +### Event Logger + +Track all events to file and/or database: + +```json +{ + "event_logger": { + "enabled": true, + "targets": ["file", "sqlite", "postgres"], + "file_dir": "./data/events", + "table_name": "event_logs" + }, + "database": { + "type": "postgres", + "host": "localhost", + "port": 5432, + "username": "whatshooked", + "password": "password", + "database": "whatshooked" + } +} +``` + +Logged events include: +- All message events +- Connection status changes +- Hook success/failure +- Webhook triggers + +### Two-Way Communication + +Your webhooks can respond to trigger outgoing messages: + +**Webhook Response Format:** +```json +{ + "send_message": true, + "to": "1234567890", + "text": "Thanks for your message!", + "account_id": "business" +} +``` + +This sends a reply immediately when your webhook receives an event. + +## Production Deployment Checklist + +Before going live: + +- [ ] Use a System User token (not personal user token) +- [ ] Set `verify_token` to a secure random string (32+ characters) +- [ ] Configure webhooks in Meta Developer Console +- [ ] Subscribe to required webhook fields (messages, etc.) +- [ ] Test webhook verification succeeds +- [ ] Enable HTTPS for production (required by Meta) +- [ ] Set up firewall rules to allow Meta's webhook IPs +- [ ] Configure authentication (`username`/`password` or `auth_key`) +- [ ] Enable message cache for reliability +- [ ] Set up event logging for audit trail +- [ ] Test sending and receiving messages +- [ ] Monitor logs for errors +- [ ] Set up log rotation for production +- [ ] Document your webhook endpoints +- [ ] Set up monitoring/alerts for webhook failures + +## Troubleshooting Common Issues + ### Error: "Object with ID does not exist" (error_subcode: 33) **Cause**: One of the following: diff --git a/pkg/cache/message_cache.go b/pkg/cache/message_cache.go new file mode 100644 index 0000000..9f9da0c --- /dev/null +++ b/pkg/cache/message_cache.go @@ -0,0 +1,394 @@ +package cache + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + "git.warky.dev/wdevs/whatshooked/pkg/events" + "git.warky.dev/wdevs/whatshooked/pkg/logging" +) + +// CachedEvent represents an event stored in cache +type CachedEvent struct { + ID string `json:"id"` + Event events.Event `json:"event"` + Timestamp time.Time `json:"timestamp"` + Reason string `json:"reason"` + Attempts int `json:"attempts"` + LastAttempt *time.Time `json:"last_attempt,omitempty"` +} + +// MessageCache manages cached events when no webhooks are available +type MessageCache struct { + events map[string]*CachedEvent + mu sync.RWMutex + dataPath string + enabled bool + maxAge time.Duration // Maximum age before events are purged + maxEvents int // Maximum number of events to keep +} + +// Config holds cache configuration +type Config struct { + Enabled bool `json:"enabled"` + DataPath string `json:"data_path"` + MaxAge time.Duration `json:"max_age"` // Default: 7 days + MaxEvents int `json:"max_events"` // Default: 10000 +} + +// NewMessageCache creates a new message cache +func NewMessageCache(cfg Config) (*MessageCache, error) { + if !cfg.Enabled { + return &MessageCache{ + enabled: false, + }, nil + } + + if cfg.DataPath == "" { + cfg.DataPath = "./data/cache" + } + if cfg.MaxAge == 0 { + cfg.MaxAge = 7 * 24 * time.Hour // 7 days + } + if cfg.MaxEvents == 0 { + cfg.MaxEvents = 10000 + } + + // Create cache directory + if err := os.MkdirAll(cfg.DataPath, 0755); err != nil { + return nil, fmt.Errorf("failed to create cache directory: %w", err) + } + + cache := &MessageCache{ + events: make(map[string]*CachedEvent), + dataPath: cfg.DataPath, + enabled: true, + maxAge: cfg.MaxAge, + maxEvents: cfg.MaxEvents, + } + + // Load existing cached events + if err := cache.loadFromDisk(); err != nil { + logging.Warn("Failed to load cached events from disk", "error", err) + } + + // Start cleanup goroutine + go cache.cleanupLoop() + + logging.Info("Message cache initialized", + "enabled", cfg.Enabled, + "data_path", cfg.DataPath, + "max_age", cfg.MaxAge, + "max_events", cfg.MaxEvents) + + return cache, nil +} + +// Store adds an event to the cache +func (c *MessageCache) Store(event events.Event, reason string) error { + if !c.enabled { + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + + // Check if we're at capacity + if len(c.events) >= c.maxEvents { + // Remove oldest event + c.removeOldest() + } + + // Generate unique ID + id := fmt.Sprintf("%d-%s", time.Now().UnixNano(), event.Type) + + cached := &CachedEvent{ + ID: id, + Event: event, + Timestamp: time.Now(), + Reason: reason, + Attempts: 0, + } + + c.events[id] = cached + + // Save to disk asynchronously + go c.saveToDisk(cached) + + logging.Debug("Event cached", + "event_id", id, + "event_type", event.Type, + "reason", reason, + "cache_size", len(c.events)) + + return nil +} + +// Get retrieves a cached event by ID +func (c *MessageCache) Get(id string) (*CachedEvent, bool) { + if !c.enabled { + return nil, false + } + + c.mu.RLock() + defer c.mu.RUnlock() + + event, exists := c.events[id] + return event, exists +} + +// List returns all cached events +func (c *MessageCache) List() []*CachedEvent { + if !c.enabled { + return nil + } + + c.mu.RLock() + defer c.mu.RUnlock() + + result := make([]*CachedEvent, 0, len(c.events)) + for _, event := range c.events { + result = append(result, event) + } + + return result +} + +// ListByEventType returns cached events filtered by event type +func (c *MessageCache) ListByEventType(eventType events.EventType) []*CachedEvent { + if !c.enabled { + return nil + } + + c.mu.RLock() + defer c.mu.RUnlock() + + result := make([]*CachedEvent, 0) + for _, cached := range c.events { + if cached.Event.Type == eventType { + result = append(result, cached) + } + } + + return result +} + +// Remove deletes an event from the cache +func (c *MessageCache) Remove(id string) error { + if !c.enabled { + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + + if _, exists := c.events[id]; !exists { + return fmt.Errorf("cached event not found: %s", id) + } + + delete(c.events, id) + + // Remove from disk + go c.removeFromDisk(id) + + logging.Debug("Event removed from cache", "event_id", id) + + return nil +} + +// IncrementAttempts increments the delivery attempt counter +func (c *MessageCache) IncrementAttempts(id string) error { + if !c.enabled { + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + + cached, exists := c.events[id] + if !exists { + return fmt.Errorf("cached event not found: %s", id) + } + + now := time.Now() + cached.Attempts++ + cached.LastAttempt = &now + + // Update on disk + go c.saveToDisk(cached) + + return nil +} + +// Clear removes all cached events +func (c *MessageCache) Clear() error { + if !c.enabled { + return nil + } + + c.mu.Lock() + defer c.mu.Unlock() + + c.events = make(map[string]*CachedEvent) + + // Clear disk cache + go c.clearDisk() + + logging.Info("Message cache cleared") + + return nil +} + +// Count returns the number of cached events +func (c *MessageCache) Count() int { + if !c.enabled { + return 0 + } + + c.mu.RLock() + defer c.mu.RUnlock() + + return len(c.events) +} + +// IsEnabled returns whether the cache is enabled +func (c *MessageCache) IsEnabled() bool { + return c.enabled +} + +// removeOldest removes the oldest event from the cache +func (c *MessageCache) removeOldest() { + var oldestID string + var oldestTime time.Time + + for id, cached := range c.events { + if oldestID == "" || cached.Timestamp.Before(oldestTime) { + oldestID = id + oldestTime = cached.Timestamp + } + } + + if oldestID != "" { + delete(c.events, oldestID) + go c.removeFromDisk(oldestID) + logging.Debug("Removed oldest cached event due to capacity", "event_id", oldestID) + } +} + +// cleanupLoop periodically removes expired events +func (c *MessageCache) cleanupLoop() { + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + + for range ticker.C { + c.cleanup() + } +} + +// cleanup removes expired events +func (c *MessageCache) cleanup() { + if !c.enabled { + return + } + + c.mu.Lock() + defer c.mu.Unlock() + + now := time.Now() + expiredIDs := make([]string, 0) + + for id, cached := range c.events { + if now.Sub(cached.Timestamp) > c.maxAge { + expiredIDs = append(expiredIDs, id) + } + } + + for _, id := range expiredIDs { + delete(c.events, id) + go c.removeFromDisk(id) + } + + if len(expiredIDs) > 0 { + logging.Info("Cleaned up expired cached events", "count", len(expiredIDs)) + } +} + +// saveToDisk saves a cached event to disk +func (c *MessageCache) saveToDisk(cached *CachedEvent) { + filePath := filepath.Join(c.dataPath, fmt.Sprintf("%s.json", cached.ID)) + + data, err := json.MarshalIndent(cached, "", " ") + if err != nil { + logging.Error("Failed to marshal cached event", "event_id", cached.ID, "error", err) + return + } + + if err := os.WriteFile(filePath, data, 0644); err != nil { + logging.Error("Failed to save cached event to disk", "event_id", cached.ID, "error", err) + } +} + +// loadFromDisk loads all cached events from disk +func (c *MessageCache) loadFromDisk() error { + files, err := filepath.Glob(filepath.Join(c.dataPath, "*.json")) + if err != nil { + return fmt.Errorf("failed to list cache files: %w", err) + } + + loaded := 0 + for _, file := range files { + data, err := os.ReadFile(file) + if err != nil { + logging.Warn("Failed to read cache file", "file", file, "error", err) + continue + } + + var cached CachedEvent + if err := json.Unmarshal(data, &cached); err != nil { + logging.Warn("Failed to unmarshal cache file", "file", file, "error", err) + continue + } + + // Skip expired events + if time.Since(cached.Timestamp) > c.maxAge { + os.Remove(file) + continue + } + + c.events[cached.ID] = &cached + loaded++ + } + + if loaded > 0 { + logging.Info("Loaded cached events from disk", "count", loaded) + } + + return nil +} + +// removeFromDisk removes a cached event file from disk +func (c *MessageCache) removeFromDisk(id string) { + filePath := filepath.Join(c.dataPath, fmt.Sprintf("%s.json", id)) + if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { + logging.Error("Failed to remove cached event from disk", "event_id", id, "error", err) + } +} + +// clearDisk removes all cache files from disk +func (c *MessageCache) clearDisk() { + files, err := filepath.Glob(filepath.Join(c.dataPath, "*.json")) + if err != nil { + logging.Error("Failed to list cache files for clearing", "error", err) + return + } + + for _, file := range files { + if err := os.Remove(file); err != nil { + logging.Error("Failed to remove cache file", "file", file, "error", err) + } + } +} diff --git a/pkg/config/config.go b/pkg/config/config.go index deaf775..ca7f64d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -7,13 +7,14 @@ import ( // Config represents the application configuration type Config struct { - Server ServerConfig `json:"server"` - WhatsApp []WhatsAppConfig `json:"whatsapp"` - Hooks []Hook `json:"hooks"` - Database DatabaseConfig `json:"database,omitempty"` - Media MediaConfig `json:"media"` - EventLogger EventLoggerConfig `json:"event_logger,omitempty"` - LogLevel string `json:"log_level"` + Server ServerConfig `json:"server"` + WhatsApp []WhatsAppConfig `json:"whatsapp"` + Hooks []Hook `json:"hooks"` + Database DatabaseConfig `json:"database,omitempty"` + Media MediaConfig `json:"media"` + EventLogger EventLoggerConfig `json:"event_logger,omitempty"` + MessageCache MessageCacheConfig `json:"message_cache,omitempty"` + LogLevel string `json:"log_level"` } // ServerConfig holds server-specific configuration @@ -122,6 +123,14 @@ type MQTTConfig struct { Subscribe bool `json:"subscribe,omitempty"` // Enable subscription for sending messages } +// MessageCacheConfig holds message cache configuration +type MessageCacheConfig struct { + Enabled bool `json:"enabled"` // Enable message caching + DataPath string `json:"data_path,omitempty"` // Directory to store cached events + MaxAgeDays int `json:"max_age_days,omitempty"` // Maximum age in days before purging (default: 7) + MaxEvents int `json:"max_events,omitempty"` // Maximum number of events to cache (default: 10000) +} + // Load reads configuration from a file func Load(path string) (*Config, error) { data, err := os.ReadFile(path) @@ -186,6 +195,17 @@ func Load(path string) (*Config, error) { } } + // Set message cache defaults + if cfg.MessageCache.DataPath == "" { + cfg.MessageCache.DataPath = "./data/message_cache" + } + if cfg.MessageCache.MaxAgeDays == 0 { + cfg.MessageCache.MaxAgeDays = 7 + } + if cfg.MessageCache.MaxEvents == 0 { + cfg.MessageCache.MaxEvents = 10000 + } + return &cfg, nil } diff --git a/pkg/handlers/businessapi.go b/pkg/handlers/businessapi.go index 495e475..86263f8 100644 --- a/pkg/handlers/businessapi.go +++ b/pkg/handlers/businessapi.go @@ -10,16 +10,21 @@ import ( // BusinessAPIWebhook handles both verification (GET) and webhook events (POST) func (h *Handlers) BusinessAPIWebhook(w http.ResponseWriter, r *http.Request) { + accountID := extractAccountIDFromPath(r.URL.Path) + if r.Method == http.MethodGet { + logging.Info("WhatsApp webhook verification request", "account_id", accountID, "method", "GET") h.businessAPIWebhookVerify(w, r) return } if r.Method == http.MethodPost { + logging.Info("WhatsApp webhook event received", "account_id", accountID, "method", "POST") h.businessAPIWebhookEvent(w, r) return } + logging.Warn("WhatsApp webhook invalid method", "account_id", accountID, "method", r.Method) http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } @@ -29,6 +34,7 @@ func (h *Handlers) businessAPIWebhookVerify(w http.ResponseWriter, r *http.Reque // Extract account ID from URL path accountID := extractAccountIDFromPath(r.URL.Path) if accountID == "" { + logging.Warn("WhatsApp webhook verification missing account ID") http.Error(w, "Account ID required in path", http.StatusBadRequest) return } @@ -94,10 +100,13 @@ func (h *Handlers) businessAPIWebhookEvent(w http.ResponseWriter, r *http.Reques // Extract account ID from URL path accountID := extractAccountIDFromPath(r.URL.Path) if accountID == "" { + logging.Warn("WhatsApp webhook event missing account ID") http.Error(w, "Account ID required in path", http.StatusBadRequest) return } + logging.Info("WhatsApp webhook processing started", "account_id", accountID) + // Get the client from the manager client, exists := h.whatsappMgr.GetClient(accountID) if !exists { @@ -128,6 +137,8 @@ func (h *Handlers) businessAPIWebhookEvent(w http.ResponseWriter, r *http.Reques return } + logging.Info("WhatsApp webhook processed successfully", "account_id", accountID) + // Return 200 OK to acknowledge receipt w.WriteHeader(http.StatusOK) writeBytes(w, []byte("OK")) diff --git a/pkg/handlers/cache.go b/pkg/handlers/cache.go new file mode 100644 index 0000000..b59388e --- /dev/null +++ b/pkg/handlers/cache.go @@ -0,0 +1,254 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "strconv" + + "git.warky.dev/wdevs/whatshooked/pkg/events" + "git.warky.dev/wdevs/whatshooked/pkg/logging" +) + +// GetCachedEvents returns all cached events +// GET /api/cache +func (h *Handlers) GetCachedEvents(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + cache := h.hookMgr.GetCache() + if cache == nil || !cache.IsEnabled() { + http.Error(w, "Message cache is not enabled", http.StatusServiceUnavailable) + return + } + + // Optional event_type filter + eventType := r.URL.Query().Get("event_type") + + var cachedEvents interface{} + if eventType != "" { + cachedEvents = cache.ListByEventType(events.EventType(eventType)) + } else { + cachedEvents = cache.List() + } + + writeJSON(w, map[string]interface{}{ + "cached_events": cachedEvents, + "count": cache.Count(), + }) +} + +// GetCachedEvent returns a specific cached event by ID +// GET /api/cache/{id} +func (h *Handlers) GetCachedEvent(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + cache := h.hookMgr.GetCache() + if cache == nil || !cache.IsEnabled() { + http.Error(w, "Message cache is not enabled", http.StatusServiceUnavailable) + return + } + + // Extract ID from path + id := r.URL.Query().Get("id") + if id == "" { + http.Error(w, "Event ID required", http.StatusBadRequest) + return + } + + cached, exists := cache.Get(id) + if !exists { + http.Error(w, "Cached event not found", http.StatusNotFound) + return + } + + writeJSON(w, cached) +} + +// ReplayCachedEvents replays all cached events +// POST /api/cache/replay +func (h *Handlers) ReplayCachedEvents(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + cache := h.hookMgr.GetCache() + if cache == nil || !cache.IsEnabled() { + http.Error(w, "Message cache is not enabled", http.StatusServiceUnavailable) + return + } + + logging.Info("Replaying all cached events via API") + + successCount, failCount, err := h.hookMgr.ReplayCachedEvents() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + writeJSON(w, map[string]interface{}{ + "success": true, + "replayed": successCount + failCount, + "delivered": successCount, + "failed": failCount, + "remaining_cached": cache.Count(), + }) +} + +// ReplayCachedEvent replays a specific cached event +// POST /api/cache/replay/{id} +func (h *Handlers) ReplayCachedEvent(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + cache := h.hookMgr.GetCache() + if cache == nil || !cache.IsEnabled() { + http.Error(w, "Message cache is not enabled", http.StatusServiceUnavailable) + return + } + + // Extract ID from request body or query param + var req struct { + ID string `json:"id"` + } + + // Try query param first + id := r.URL.Query().Get("id") + if id == "" { + // Try JSON body + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + id = req.ID + } + + if id == "" { + http.Error(w, "Event ID required", http.StatusBadRequest) + return + } + + logging.Info("Replaying cached event via API", "event_id", id) + + if err := h.hookMgr.ReplayCachedEvent(id); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + writeJSON(w, map[string]interface{}{ + "success": true, + "event_id": id, + "message": "Event replayed successfully", + }) +} + +// DeleteCachedEvent removes a specific cached event +// DELETE /api/cache/{id} +func (h *Handlers) DeleteCachedEvent(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + cache := h.hookMgr.GetCache() + if cache == nil || !cache.IsEnabled() { + http.Error(w, "Message cache is not enabled", http.StatusServiceUnavailable) + return + } + + id := r.URL.Query().Get("id") + if id == "" { + http.Error(w, "Event ID required", http.StatusBadRequest) + return + } + + logging.Info("Deleting cached event via API", "event_id", id) + + if err := cache.Remove(id); err != nil { + http.Error(w, err.Error(), http.StatusNotFound) + return + } + + writeJSON(w, map[string]interface{}{ + "success": true, + "event_id": id, + "message": "Cached event deleted successfully", + }) +} + +// ClearCache removes all cached events +// DELETE /api/cache +func (h *Handlers) ClearCache(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + cache := h.hookMgr.GetCache() + if cache == nil || !cache.IsEnabled() { + http.Error(w, "Message cache is not enabled", http.StatusServiceUnavailable) + return + } + + // Optional confirmation parameter + confirm := r.URL.Query().Get("confirm") + confirmInt, _ := strconv.ParseBool(confirm) + + if !confirmInt { + http.Error(w, "Add ?confirm=true to confirm cache clearing", http.StatusBadRequest) + return + } + + count := cache.Count() + logging.Warn("Clearing all cached events via API", "count", count) + + if err := cache.Clear(); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + writeJSON(w, map[string]interface{}{ + "success": true, + "cleared": count, + "message": "Cache cleared successfully", + }) +} + +// GetCacheStats returns cache statistics +// GET /api/cache/stats +func (h *Handlers) GetCacheStats(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + cache := h.hookMgr.GetCache() + if cache == nil || !cache.IsEnabled() { + writeJSON(w, map[string]interface{}{ + "enabled": false, + "count": 0, + }) + return + } + + // Group by event type + cachedEvents := cache.List() + eventTypeCounts := make(map[string]int) + + for _, cached := range cachedEvents { + eventTypeCounts[string(cached.Event.Type)]++ + } + + writeJSON(w, map[string]interface{}{ + "enabled": true, + "total_count": cache.Count(), + "by_event_type": eventTypeCounts, + }) +} diff --git a/pkg/hooks/manager.go b/pkg/hooks/manager.go index 5d9b3dd..c92034e 100644 --- a/pkg/hooks/manager.go +++ b/pkg/hooks/manager.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "git.warky.dev/wdevs/whatshooked/pkg/cache" "git.warky.dev/wdevs/whatshooked/pkg/config" "git.warky.dev/wdevs/whatshooked/pkg/events" "git.warky.dev/wdevs/whatshooked/pkg/logging" @@ -54,13 +55,15 @@ type Manager struct { mu sync.RWMutex client *http.Client eventBus *events.EventBus + cache *cache.MessageCache } // NewManager creates a new hook manager -func NewManager(eventBus *events.EventBus) *Manager { +func NewManager(eventBus *events.EventBus, messageCache *cache.MessageCache) *Manager { return &Manager{ hooks: make(map[string]config.Hook), eventBus: eventBus, + cache: messageCache, client: &http.Client{ Timeout: 30 * time.Second, }, @@ -128,14 +131,48 @@ func (m *Manager) handleEvent(event events.Event) { logging.Debug("Found relevant hooks for event", "event_type", event.Type, "hook_count", len(relevantHooks)) + // If no relevant hooks found, cache the event + if len(relevantHooks) == 0 { + if m.cache != nil && m.cache.IsEnabled() { + reason := fmt.Sprintf("No active webhooks configured for event type: %s", event.Type) + if err := m.cache.Store(event, reason); err != nil { + logging.Error("Failed to cache event", "event_type", event.Type, "error", err) + } else { + logging.Info("Event cached due to no active webhooks", + "event_type", event.Type, + "cache_size", m.cache.Count()) + } + } else { + logging.Warn("No active webhooks for event and caching is disabled", + "event_type", event.Type) + } + return + } + // Trigger each relevant hook - if len(relevantHooks) > 0 { - m.triggerHooksForEvent(event, relevantHooks) + success := m.triggerHooksForEvent(event, relevantHooks) + + // If event was successfully delivered and it was previously cached, remove it from cache + if success && m.cache != nil && m.cache.IsEnabled() { + // Try to find and remove this event from cache + // (This handles the case where a cached event is being replayed) + cachedEvents := m.cache.List() + for _, cached := range cachedEvents { + if cached.Event.Type == event.Type && + cached.Event.Timestamp.Equal(event.Timestamp) { + if err := m.cache.Remove(cached.ID); err == nil { + logging.Info("Cached event successfully delivered and removed from cache", + "event_id", cached.ID, + "event_type", event.Type) + } + break + } + } } } -// triggerHooksForEvent sends event data to specific hooks -func (m *Manager) triggerHooksForEvent(event events.Event, hooks []config.Hook) { +// triggerHooksForEvent sends event data to specific hooks and returns success status +func (m *Manager) triggerHooksForEvent(event events.Event, hooks []config.Hook) bool { ctx := event.Context if ctx == nil { ctx = context.Background() @@ -184,14 +221,26 @@ func (m *Manager) triggerHooksForEvent(event events.Event, hooks []config.Hook) // Send to each hook with the event type var wg sync.WaitGroup + successCount := 0 + mu := sync.Mutex{} + for _, hook := range hooks { wg.Add(1) go func(h config.Hook, et events.EventType) { defer wg.Done() - _ = m.sendToHook(ctx, h, payload, et) + resp := m.sendToHook(ctx, h, payload, et) + if resp != nil || ctx.Err() == nil { + // Count as success if we got a response or context is still valid + mu.Lock() + successCount++ + mu.Unlock() + } }(hook, event.Type) } wg.Wait() + + // Return true if at least one hook was successfully triggered + return successCount > 0 } // Helper functions to extract data from event map @@ -379,3 +428,77 @@ func (m *Manager) sendToHook(ctx context.Context, hook config.Hook, payload inte m.eventBus.Publish(events.HookSuccessEvent(eventCtx, hook.ID, hook.Name, resp.StatusCode, hookResp)) return &hookResp } + +// ReplayCachedEvents attempts to replay all cached events +func (m *Manager) ReplayCachedEvents() (successCountResult int, failCountResult int, err error) { + if m.cache == nil || !m.cache.IsEnabled() { + return 0, 0, fmt.Errorf("message cache is not enabled") + } + + cachedEvents := m.cache.List() + if len(cachedEvents) == 0 { + return 0, 0, nil + } + + logging.Info("Replaying cached events", "count", len(cachedEvents)) + + successCount := 0 + failCount := 0 + + for _, cached := range cachedEvents { + // Try to process the event again + m.handleEvent(cached.Event) + + // Increment attempt counter + if err := m.cache.IncrementAttempts(cached.ID); err != nil { + logging.Error("Failed to increment attempt counter", "event_id", cached.ID, "error", err) + } + + // Check if event was successfully delivered by seeing if it's still cached + // (handleEvent will remove it from cache if successfully delivered) + time.Sleep(100 * time.Millisecond) // Give time for async delivery + + if _, exists := m.cache.Get(cached.ID); !exists { + successCount++ + logging.Debug("Cached event successfully replayed", "event_id", cached.ID) + } else { + failCount++ + } + } + + logging.Info("Cached event replay complete", + "success", successCount, + "failed", failCount, + "remaining_cached", m.cache.Count()) + + return successCount, failCount, nil +} + +// ReplayCachedEvent attempts to replay a single cached event by ID +func (m *Manager) ReplayCachedEvent(id string) error { + if m.cache == nil || !m.cache.IsEnabled() { + return fmt.Errorf("message cache is not enabled") + } + + cached, exists := m.cache.Get(id) + if !exists { + return fmt.Errorf("cached event not found: %s", id) + } + + logging.Info("Replaying cached event", "event_id", id, "event_type", cached.Event.Type) + + // Process the event + m.handleEvent(cached.Event) + + // Increment attempt counter + if err := m.cache.IncrementAttempts(id); err != nil { + logging.Error("Failed to increment attempt counter", "event_id", id, "error", err) + } + + return nil +} + +// GetCache returns the message cache (for external access) +func (m *Manager) GetCache() *cache.MessageCache { + return m.cache +} diff --git a/pkg/whatsapp/businessapi/events.go b/pkg/whatsapp/businessapi/events.go index fb89993..680a891 100644 --- a/pkg/whatsapp/businessapi/events.go +++ b/pkg/whatsapp/businessapi/events.go @@ -31,13 +31,24 @@ func (c *Client) HandleWebhook(r *http.Request) error { return fmt.Errorf("failed to parse webhook payload: %w", err) } + logging.Info("Processing webhook payload", + "account_id", c.id, + "entries", len(payload.Entry)) + // Process each entry + changeCount := 0 for _, entry := range payload.Entry { + changeCount += len(entry.Changes) for i := range entry.Changes { c.processChange(entry.Changes[i]) } } + logging.Info("Webhook payload processed", + "account_id", c.id, + "entries", len(payload.Entry), + "changes", changeCount) + return nil } @@ -45,11 +56,17 @@ func (c *Client) HandleWebhook(r *http.Request) error { func (c *Client) processChange(change WebhookChange) { ctx := context.Background() + logging.Info("Processing webhook change", + "account_id", c.id, + "field", change.Field, + "phone_number_id", change.Value.Metadata.PhoneNumberID) + // Handle different field types switch change.Field { case "messages": // Process messages - for _, msg := range change.Value.Messages { + for i := range change.Value.Messages { + msg := change.Value.Messages[i] c.processMessage(ctx, msg, change.Value.Contacts) } @@ -212,7 +229,8 @@ func (c *Client) processMessage(ctx context.Context, msg WebhookMessage, contact messageType = "contacts" // Format contacts as text var contactNames []string - for _, contact := range msg.Contacts { + for i := range msg.Contacts { + contact := msg.Contacts[i] contactNames = append(contactNames, contact.Name.FormattedName) } text = fmt.Sprintf("Shared %d contact(s): %s", len(msg.Contacts), strings.Join(contactNames, ", ")) @@ -262,7 +280,7 @@ func (c *Client) processMessage(ctx context.Context, msg WebhookMessage, contact } case "unknown": - messageType = "unknown" + // messageType = "unknown" logging.Warn("Received unknown message type", "account_id", c.id, "message_id", msg.ID) return @@ -272,7 +290,7 @@ func (c *Client) processMessage(ctx context.Context, msg WebhookMessage, contact } // Publish message received event - logging.Debug("Publishing message received event", + logging.Info("Message received via WhatsApp", "account_id", c.id, "message_id", msg.ID, "from", msg.From, @@ -306,15 +324,15 @@ func (c *Client) processStatus(ctx context.Context, status WebhookStatus) { switch status.Status { case "sent": c.eventBus.Publish(events.MessageSentEvent(ctx, c.id, status.ID, status.RecipientID, "")) - logging.Debug("Message sent status", "account_id", c.id, "message_id", status.ID) + logging.Info("Message status: sent", "account_id", c.id, "message_id", status.ID, "recipient", status.RecipientID) case "delivered": c.eventBus.Publish(events.MessageDeliveredEvent(ctx, c.id, status.ID, status.RecipientID, timestamp)) - logging.Debug("Message delivered", "account_id", c.id, "message_id", status.ID) + logging.Info("Message status: delivered", "account_id", c.id, "message_id", status.ID, "recipient", status.RecipientID) case "read": c.eventBus.Publish(events.MessageReadEvent(ctx, c.id, status.ID, status.RecipientID, timestamp)) - logging.Debug("Message read", "account_id", c.id, "message_id", status.ID) + logging.Info("Message status: read", "account_id", c.id, "message_id", status.ID, "recipient", status.RecipientID) case "failed": errMsg := "unknown error" diff --git a/pkg/whatshooked/server.go b/pkg/whatshooked/server.go index fcdebfb..3194854 100644 --- a/pkg/whatshooked/server.go +++ b/pkg/whatshooked/server.go @@ -232,11 +232,21 @@ func (s *Server) setupRoutes() *http.ServeMux { // Business API webhooks (no auth - Meta validates via verify_token) mux.HandleFunc("/webhooks/whatsapp/", h.BusinessAPIWebhook) + // Message cache management (with auth) + mux.HandleFunc("/api/cache", h.Auth(h.GetCachedEvents)) // GET - list cached events + mux.HandleFunc("/api/cache/stats", h.Auth(h.GetCacheStats)) // GET - cache statistics + mux.HandleFunc("/api/cache/replay", h.Auth(h.ReplayCachedEvents)) // POST - replay all + mux.HandleFunc("/api/cache/event", h.Auth(h.GetCachedEvent)) // GET with ?id= + mux.HandleFunc("/api/cache/event/replay", h.Auth(h.ReplayCachedEvent)) // POST with ?id= + mux.HandleFunc("/api/cache/event/delete", h.Auth(h.DeleteCachedEvent)) // DELETE with ?id= + mux.HandleFunc("/api/cache/clear", h.Auth(h.ClearCache)) // DELETE with ?confirm=true + logging.Info("HTTP server endpoints configured", "health", "/health", "hooks", "/api/hooks", "accounts", "/api/accounts", "send", "/api/send", + "cache", "/api/cache", "qr", "/api/qr") return mux diff --git a/pkg/whatshooked/whatshooked.go b/pkg/whatshooked/whatshooked.go index 1ec72ac..37066aa 100644 --- a/pkg/whatshooked/whatshooked.go +++ b/pkg/whatshooked/whatshooked.go @@ -2,7 +2,9 @@ package whatshooked import ( "context" + "time" + "git.warky.dev/wdevs/whatshooked/pkg/cache" "git.warky.dev/wdevs/whatshooked/pkg/config" "git.warky.dev/wdevs/whatshooked/pkg/eventlogger" "git.warky.dev/wdevs/whatshooked/pkg/events" @@ -14,14 +16,15 @@ import ( // WhatsHooked is the main library instance type WhatsHooked struct { - config *config.Config - configPath string - eventBus *events.EventBus - whatsappMgr *whatsapp.Manager - hookMgr *hooks.Manager - eventLogger *eventlogger.Logger - handlers *handlers.Handlers - server *Server // Optional built-in server + config *config.Config + configPath string + eventBus *events.EventBus + whatsappMgr *whatsapp.Manager + hookMgr *hooks.Manager + eventLogger *eventlogger.Logger + messageCache *cache.MessageCache + handlers *handlers.Handlers + server *Server // Optional built-in server } // NewFromFile creates a WhatsHooked instance from a config file @@ -84,8 +87,24 @@ func newWithConfig(cfg *config.Config, configPath string) (*WhatsHooked, error) }, ) + // Initialize message cache + cacheConfig := cache.Config{ + Enabled: cfg.MessageCache.Enabled, + DataPath: cfg.MessageCache.DataPath, + MaxAge: time.Duration(cfg.MessageCache.MaxAgeDays) * 24 * time.Hour, + MaxEvents: cfg.MessageCache.MaxEvents, + } + + messageCache, err := cache.NewMessageCache(cacheConfig) + if err != nil { + logging.Error("Failed to initialize message cache", "error", err) + // Continue without cache rather than failing + messageCache = &cache.MessageCache{} + } + wh.messageCache = messageCache + // Initialize hook manager - wh.hookMgr = hooks.NewManager(wh.eventBus) + wh.hookMgr = hooks.NewManager(wh.eventBus, wh.messageCache) wh.hookMgr.LoadHooks(cfg.Hooks) wh.hookMgr.Start()