// Package cache keeps events in memory, mirrored to a database table or to
// JSON files on disk, until they are removed or expire.
package cache

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"sync"
	"time"

	"git.warky.dev/wdevs/whatshooked/pkg/events"
	"git.warky.dev/wdevs/whatshooked/pkg/logging"
	"github.com/uptrace/bun"
)

// Recognised values for Config.Storage.
const (
	storageDatabase = "database"
	storageDisk     = "disk"
)

// CachedEvent represents an event stored in cache.
type CachedEvent struct {
	// ID is the key under which the event is held in the cache; it is
	// assigned by MessageCache.Store.
	ID string `json:"id"`

	// Event is the original event as handed to MessageCache.Store.
	Event events.Event `json:"event"`

	// AccountID, FromNumber, ToNumber and MessageID are copied out of
	// Event.Data when the event is cached; each is empty when Event.Data
	// carries none of the keys that are probed for it.
	AccountID string `json:"account_id,omitempty"`

	FromNumber string `json:"from_number,omitempty"`

	ToNumber string `json:"to_number,omitempty"`

	MessageID string `json:"message_id,omitempty"`

	// Timestamp is the time the event was cached. Expiry (Config.MaxAge)
	// and eviction of the oldest entry are both based on it.
	Timestamp time.Time `json:"timestamp"`

	// Reason is the free-form reason string passed to MessageCache.Store.
	Reason string `json:"reason"`

	// Attempts is incremented by MessageCache.IncrementAttempts.
	Attempts int `json:"attempts"`

	// LastAttempt is the time of the most recent IncrementAttempts call, or
	// nil if there has been none.
	LastAttempt *time.Time `json:"last_attempt,omitempty"`
}

// MessageCache manages cached events when no webhooks are available.
type MessageCache struct {
	// events holds the cached entries keyed by CachedEvent.ID. The exported
	// methods hold mu while they read or modify it.
	events map[string]*CachedEvent
	mu     sync.RWMutex

	// Settings copied from Config by NewMessageCache. db may also be set
	// later through ConfigureDatabase.
	dataPath string
	enabled  bool
	storage  string
	dbType   string
	db       *bun.DB

	maxAge    time.Duration // Maximum age before events are purged
	maxEvents int           // Maximum number of events to keep
}

// Config holds cache configuration.
type Config struct {
	// Enabled switches the cache on. NewMessageCache returns a disabled cache
	// when it is false.
	Enabled bool `json:"enabled"`

	// Storage selects the persistence backend: "database" or "disk"
	// (case-insensitive). Empty means "database"; any other value falls back
	// to "disk" with a warning.
	Storage string `json:"storage"`

	// DataPath is the directory used by the disk backend.
	// Default: "./data/cache".
	DataPath string `json:"data_path"`

	// DBType selects the SQL dialect for the database backend. "postgres" and
	// "postgresql" use the PostgreSQL schema; every other value uses the
	// SQLite schema.
	DBType string `json:"db_type"`

	// DB is the database handle for the database backend. It may be nil here
	// and supplied later through MessageCache.ConfigureDatabase.
	DB *bun.DB `json:"-"`

	// MaxAge is the maximum age of a cached event. Zero selects the default
	// of 7 days.
	MaxAge time.Duration `json:"max_age"`

	// MaxEvents is the maximum number of events kept in the cache. Zero
	// selects the default of 10000.
	MaxEvents int `json:"max_events"`
}

// cacheDBRow is the persistence representation for database-backed cache entries.
// EventData holds the JSON encoding of the whole events.Event.
type cacheDBRow struct {
	ID          string     `bun:"id"`
	AccountID   string     `bun:"account_id"`
	EventType   string     `bun:"event_type"`
	EventData   string     `bun:"event_data"`
	MessageID   string     `bun:"message_id"`
	FromNumber  string     `bun:"from_number"`
	ToNumber    string     `bun:"to_number"`
	Reason      string     `bun:"reason"`
	Attempts    int        `bun:"attempts"`
	Timestamp   time.Time  `bun:"timestamp"`
	LastAttempt *time.Time `bun:"last_attempt"`
}

// TableName returns the name of the table that stores cacheDBRow entries.
// NOTE(review): the queries in this file also name the table explicitly;
// confirm whether bun actually consults this method.
func (cacheDBRow) TableName() string {
	return "message_cache"
}

// NewMessageCache creates a new message cache.
//
// A disabled configuration yields a disabled cache and no error. Otherwise
// the configuration is normalised as follows before use:
//   - Storage is lower-cased; empty selects the database backend, and any
//     unrecognised value falls back to the disk backend with a warning.
//   - DataPath defaults to "./data/cache".
//   - MaxAge defaults to 7 days and MaxEvents to 10000 when they are zero or
//     negative (a negative value would otherwise expire or evict every
//     event immediately).
//
// For the disk backend the data directory is created if needed. Previously
// persisted events are then loaded, and a goroutine running cleanupLoop is
// started; that goroutine is never stopped.
//
// An error is returned when the cache directory cannot be created or when
// loading persisted events fails.
func NewMessageCache(cfg Config) (*MessageCache, error) {
	if !cfg.Enabled {
		return &MessageCache{enabled: false}, nil
	}

	cfg.Storage = strings.ToLower(cfg.Storage)
	switch cfg.Storage {
	case "":
		cfg.Storage = storageDatabase
	case storageDatabase, storageDisk:
		// Recognised backend; keep as is.
	default:
		logging.Warn("Unknown message cache storage backend, defaulting to disk", "storage", cfg.Storage)
		cfg.Storage = storageDisk
	}

	if cfg.DataPath == "" {
		cfg.DataPath = "./data/cache"
	}
	if cfg.MaxAge <= 0 {
		cfg.MaxAge = 7 * 24 * time.Hour // 7 days
	}
	if cfg.MaxEvents <= 0 {
		cfg.MaxEvents = 10000
	}

	if cfg.Storage == storageDisk {
		if err := os.MkdirAll(cfg.DataPath, 0755); err != nil {
			return nil, fmt.Errorf("failed to create cache directory: %w", err)
		}
	}

	cache := &MessageCache{
		events:    make(map[string]*CachedEvent),
		dataPath:  cfg.DataPath,
		enabled:   true,
		storage:   cfg.Storage,
		dbType:    strings.ToLower(cfg.DBType),
		db:        cfg.DB,
		maxAge:    cfg.MaxAge,
		maxEvents: cfg.MaxEvents,
	}

	if err := cache.loadPersistedEvents(); err != nil {
		return nil, err
	}

	// Start cleanup goroutine
	go cache.cleanupLoop()

	logging.Info("Message cache initialized",
		"enabled", cfg.Enabled,
		"storage", cache.storage,
		"data_path", cfg.DataPath,
		"max_age", cfg.MaxAge,
		"max_events", cfg.MaxEvents)

	return cache, nil
}

// ConfigureDatabase attaches a database to the cache and loads persisted entries.
func (c *MessageCache) ConfigureDatabase(db *bun.DB) error { if !c.enabled || c.storage != storageDatabase { return nil } if db == nil { return fmt.Errorf("database handle is nil") } c.mu.Lock() c.db = db c.mu.Unlock() return c.loadPersistedEvents() } // Store adds an event to the cache func (c *MessageCache) Store(event events.Event, reason string) error { if !c.enabled { return nil } c.mu.Lock() defer c.mu.Unlock() // Check if we're at capacity if len(c.events) >= c.maxEvents { // Remove oldest event c.removeOldest() } // Generate unique ID id := fmt.Sprintf("%d-%s", time.Now().UnixNano(), event.Type) cached := &CachedEvent{ ID: id, Event: event, AccountID: stringFromEventData(event.Data, "account_id", "accountID"), FromNumber: stringFromEventData( event.Data, "from", "from_number", "fromNumber", ), ToNumber: stringFromEventData( event.Data, "to", "to_number", "toNumber", ), MessageID: stringFromEventData(event.Data, "message_id", "messageId"), Timestamp: time.Now(), Reason: reason, Attempts: 0, } c.events[id] = cached // Persist asynchronously go c.persistCachedEvent(cached) logging.Debug("Event cached", "event_id", id, "event_type", event.Type, "reason", reason, "cache_size", len(c.events)) return nil } // Get retrieves a cached event by ID func (c *MessageCache) Get(id string) (*CachedEvent, bool) { if !c.enabled { return nil, false } c.mu.RLock() defer c.mu.RUnlock() event, exists := c.events[id] return event, exists } // List returns all cached events func (c *MessageCache) List() []*CachedEvent { if !c.enabled { return nil } c.mu.RLock() defer c.mu.RUnlock() result := make([]*CachedEvent, 0, len(c.events)) for _, event := range c.events { result = append(result, event) } return result } // ListByEventType returns cached events filtered by event type func (c *MessageCache) ListByEventType(eventType events.EventType) []*CachedEvent { if !c.enabled { return nil } c.mu.RLock() defer c.mu.RUnlock() result := make([]*CachedEvent, 0) for _, cached := range 
c.events { if cached.Event.Type == eventType { result = append(result, cached) } } return result } // ListPaged returns cached events filtered by event type and paged by limit/offset. // Events are sorted by cache timestamp (newest first). // A limit <= 0 returns all events from offset onward. func (c *MessageCache) ListPaged(eventType events.EventType, limit, offset int) ([]*CachedEvent, int) { if !c.enabled { return nil, 0 } c.mu.RLock() defer c.mu.RUnlock() if offset < 0 { offset = 0 } filtered := make([]*CachedEvent, 0, len(c.events)) for _, cached := range c.events { if eventType != "" && cached.Event.Type != eventType { continue } filtered = append(filtered, cached) } sort.Slice(filtered, func(i, j int) bool { if filtered[i].Timestamp.Equal(filtered[j].Timestamp) { return filtered[i].ID > filtered[j].ID } return filtered[i].Timestamp.After(filtered[j].Timestamp) }) total := len(filtered) if offset >= total { return []*CachedEvent{}, total } if limit <= 0 { return filtered[offset:], total } end := offset + limit if end > total { end = total } return filtered[offset:end], total } // Remove deletes an event from the cache func (c *MessageCache) Remove(id string) error { if !c.enabled { return nil } c.mu.Lock() defer c.mu.Unlock() if _, exists := c.events[id]; !exists { return fmt.Errorf("cached event not found: %s", id) } delete(c.events, id) // Remove persisted record asynchronously go c.removePersistedEvent(id) logging.Debug("Event removed from cache", "event_id", id) return nil } // IncrementAttempts increments the delivery attempt counter func (c *MessageCache) IncrementAttempts(id string) error { if !c.enabled { return nil } c.mu.Lock() defer c.mu.Unlock() cached, exists := c.events[id] if !exists { return fmt.Errorf("cached event not found: %s", id) } now := time.Now() cached.Attempts++ cached.LastAttempt = &now // Persist asynchronously go c.persistCachedEvent(cached) return nil } // Clear removes all cached events func (c *MessageCache) Clear() error { 
if !c.enabled { return nil } c.mu.Lock() defer c.mu.Unlock() c.events = make(map[string]*CachedEvent) // Clear persisted cache asynchronously go c.clearPersistedEvents() logging.Info("Message cache cleared") return nil } // Count returns the number of cached events func (c *MessageCache) Count() int { if !c.enabled { return 0 } c.mu.RLock() defer c.mu.RUnlock() return len(c.events) } // IsEnabled returns whether the cache is enabled func (c *MessageCache) IsEnabled() bool { return c.enabled } // removeOldest removes the oldest event from the cache func (c *MessageCache) removeOldest() { var oldestID string var oldestTime time.Time for id, cached := range c.events { if oldestID == "" || cached.Timestamp.Before(oldestTime) { oldestID = id oldestTime = cached.Timestamp } } if oldestID != "" { delete(c.events, oldestID) go c.removePersistedEvent(oldestID) logging.Debug("Removed oldest cached event due to capacity", "event_id", oldestID) } } // cleanupLoop periodically removes expired events func (c *MessageCache) cleanupLoop() { ticker := time.NewTicker(1 * time.Hour) defer ticker.Stop() for range ticker.C { c.cleanup() } } // cleanup removes expired events func (c *MessageCache) cleanup() { if !c.enabled { return } c.mu.Lock() defer c.mu.Unlock() now := time.Now() expiredIDs := make([]string, 0) for id, cached := range c.events { if now.Sub(cached.Timestamp) > c.maxAge { expiredIDs = append(expiredIDs, id) } } for _, id := range expiredIDs { delete(c.events, id) go c.removePersistedEvent(id) } if len(expiredIDs) > 0 { logging.Info("Cleaned up expired cached events", "count", len(expiredIDs)) } } // loadPersistedEvents loads events from the configured persistence backend. 
func (c *MessageCache) loadPersistedEvents() error { switch c.storage { case storageDatabase: if c.db == nil { logging.Warn("Message cache database storage selected but database is not available yet") return nil } if err := c.ensureDatabaseTable(); err != nil { return err } if err := c.loadFromDatabase(); err != nil { return err } case storageDisk: if err := c.loadFromDisk(); err != nil { return err } } return nil } func (c *MessageCache) persistCachedEvent(cached *CachedEvent) { switch c.storage { case storageDatabase: c.saveToDatabase(cached) case storageDisk: c.saveToDisk(cached) } } func (c *MessageCache) removePersistedEvent(id string) { switch c.storage { case storageDatabase: c.removeFromDatabase(id) case storageDisk: c.removeFromDisk(id) } } func (c *MessageCache) clearPersistedEvents() { switch c.storage { case storageDatabase: c.clearDatabase() case storageDisk: c.clearDisk() } } // ensureDatabaseTable creates/updates the message_cache table shape for cache events. func (c *MessageCache) ensureDatabaseTable() error { if c.db == nil { return fmt.Errorf("database handle not set for message cache") } ctx := context.Background() switch c.dbType { case "postgres", "postgresql": queries := []string{ `CREATE TABLE IF NOT EXISTS message_cache ( id VARCHAR(128) PRIMARY KEY, account_id VARCHAR(64) NOT NULL DEFAULT '', event_type VARCHAR(100) NOT NULL, event_data JSONB NOT NULL, message_id VARCHAR(255) NOT NULL DEFAULT '', from_number VARCHAR(64) NOT NULL DEFAULT '', to_number VARCHAR(64) NOT NULL DEFAULT '', reason TEXT NOT NULL DEFAULT '', attempts INTEGER NOT NULL DEFAULT 0, timestamp TIMESTAMPTZ NOT NULL, last_attempt TIMESTAMPTZ, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() )`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS account_id VARCHAR(64) NOT NULL DEFAULT ''`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS event_type VARCHAR(100) NOT NULL DEFAULT ''`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS event_data JSONB NOT NULL DEFAULT 
'{}'::jsonb`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS message_id VARCHAR(255) NOT NULL DEFAULT ''`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS from_number VARCHAR(64) NOT NULL DEFAULT ''`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS to_number VARCHAR(64) NOT NULL DEFAULT ''`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS reason TEXT NOT NULL DEFAULT ''`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS attempts INTEGER NOT NULL DEFAULT 0`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS last_attempt TIMESTAMPTZ`, `ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()`, `CREATE INDEX IF NOT EXISTS idx_message_cache_timestamp ON message_cache (timestamp DESC)`, `CREATE INDEX IF NOT EXISTS idx_message_cache_event_type ON message_cache (event_type)`, } for _, q := range queries { if _, err := c.db.ExecContext(ctx, q); err != nil { return fmt.Errorf("failed to ensure message_cache table (postgres): %w", err) } } default: queries := []string{ `CREATE TABLE IF NOT EXISTS message_cache ( id TEXT PRIMARY KEY, account_id TEXT NOT NULL DEFAULT '', event_type TEXT NOT NULL, event_data TEXT NOT NULL, message_id TEXT NOT NULL DEFAULT '', from_number TEXT NOT NULL DEFAULT '', to_number TEXT NOT NULL DEFAULT '', reason TEXT NOT NULL DEFAULT '', attempts INTEGER NOT NULL DEFAULT 0, timestamp DATETIME NOT NULL, last_attempt DATETIME, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP )`, `CREATE INDEX IF NOT EXISTS idx_message_cache_timestamp ON message_cache (timestamp DESC)`, `CREATE INDEX IF NOT EXISTS idx_message_cache_event_type ON message_cache (event_type)`, } for _, q := range queries { if _, err := c.db.ExecContext(ctx, q); err != nil { return fmt.Errorf("failed to ensure message_cache table (sqlite): %w", err) } } if err := c.ensureSQLiteColumn("account_id", "TEXT NOT NULL DEFAULT ''"); 
err != nil { return err } if err := c.ensureSQLiteColumn("event_type", "TEXT NOT NULL DEFAULT ''"); err != nil { return err } if err := c.ensureSQLiteColumn("event_data", "TEXT NOT NULL DEFAULT '{}'"); err != nil { return err } if err := c.ensureSQLiteColumn("message_id", "TEXT NOT NULL DEFAULT ''"); err != nil { return err } if err := c.ensureSQLiteColumn("from_number", "TEXT NOT NULL DEFAULT ''"); err != nil { return err } if err := c.ensureSQLiteColumn("to_number", "TEXT NOT NULL DEFAULT ''"); err != nil { return err } if err := c.ensureSQLiteColumn("reason", "TEXT NOT NULL DEFAULT ''"); err != nil { return err } if err := c.ensureSQLiteColumn("attempts", "INTEGER NOT NULL DEFAULT 0"); err != nil { return err } if err := c.ensureSQLiteColumn("timestamp", "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP"); err != nil { return err } if err := c.ensureSQLiteColumn("last_attempt", "DATETIME"); err != nil { return err } if err := c.ensureSQLiteColumn("created_at", "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP"); err != nil { return err } } return nil } func (c *MessageCache) ensureSQLiteColumn(name, definition string) error { rows, err := c.db.QueryContext(context.Background(), `PRAGMA table_info(message_cache)`) if err != nil { return fmt.Errorf("failed to inspect sqlite message_cache table: %w", err) } defer rows.Close() exists := false for rows.Next() { var cid int var colName string var colType string var notNull int var defaultValue any var pk int if err := rows.Scan(&cid, &colName, &colType, ¬Null, &defaultValue, &pk); err != nil { return fmt.Errorf("failed to scan sqlite table info: %w", err) } if strings.EqualFold(colName, name) { exists = true break } } if err := rows.Err(); err != nil { return fmt.Errorf("failed reading sqlite table info: %w", err) } if exists { return nil } query := fmt.Sprintf("ALTER TABLE message_cache ADD COLUMN %s %s", name, definition) if _, err := c.db.ExecContext(context.Background(), query); err != nil { return fmt.Errorf("failed to add 
sqlite message_cache column %s: %w", name, err) } return nil } func (c *MessageCache) loadFromDatabase() error { if c.db == nil { return nil } ctx := context.Background() rows := make([]cacheDBRow, 0) if err := c.db.NewSelect(). Model(&rows). Table("message_cache"). Column("id", "account_id", "event_type", "event_data", "message_id", "from_number", "to_number", "reason", "attempts", "timestamp", "last_attempt"). Scan(ctx); err != nil { return fmt.Errorf("failed to load cached events from database: %w", err) } loaded := 0 now := time.Now() expiredIDs := make([]string, 0) c.mu.Lock() for _, row := range rows { var evt events.Event if err := json.Unmarshal([]byte(row.EventData), &evt); err != nil { logging.Warn("Failed to unmarshal cached event row", "event_id", row.ID, "error", err) continue } if evt.Type == "" { evt.Type = events.EventType(row.EventType) } if evt.Timestamp.IsZero() { evt.Timestamp = row.Timestamp } cached := &CachedEvent{ ID: row.ID, Event: evt, AccountID: firstNonEmpty(row.AccountID, stringFromEventData(evt.Data, "account_id", "accountID")), FromNumber: firstNonEmpty(row.FromNumber, stringFromEventData(evt.Data, "from", "from_number", "fromNumber")), ToNumber: firstNonEmpty(row.ToNumber, stringFromEventData(evt.Data, "to", "to_number", "toNumber")), MessageID: firstNonEmpty(row.MessageID, stringFromEventData(evt.Data, "message_id", "messageId")), Timestamp: row.Timestamp, Reason: row.Reason, Attempts: row.Attempts, LastAttempt: row.LastAttempt, } if now.Sub(cached.Timestamp) > c.maxAge { expiredIDs = append(expiredIDs, row.ID) continue } c.events[cached.ID] = cached loaded++ } c.mu.Unlock() for _, id := range expiredIDs { go c.removeFromDatabase(id) } if loaded > 0 { logging.Info("Loaded cached events from database", "count", loaded) } return nil } func (c *MessageCache) saveToDatabase(cached *CachedEvent) { if c.db == nil { return } eventData, err := json.Marshal(cached.Event) if err != nil { logging.Error("Failed to marshal cached event", 
"event_id", cached.ID, "error", err) return } row := cacheDBRow{ ID: cached.ID, AccountID: cached.AccountID, EventType: string(cached.Event.Type), EventData: string(eventData), MessageID: cached.MessageID, FromNumber: cached.FromNumber, ToNumber: cached.ToNumber, Reason: cached.Reason, Attempts: cached.Attempts, Timestamp: cached.Timestamp, LastAttempt: cached.LastAttempt, } _, err = c.db.NewInsert(). Model(&row). Table("message_cache"). On("CONFLICT (id) DO UPDATE"). Set("account_id = EXCLUDED.account_id"). Set("event_type = EXCLUDED.event_type"). Set("event_data = EXCLUDED.event_data"). Set("message_id = EXCLUDED.message_id"). Set("from_number = EXCLUDED.from_number"). Set("to_number = EXCLUDED.to_number"). Set("reason = EXCLUDED.reason"). Set("attempts = EXCLUDED.attempts"). Set("timestamp = EXCLUDED.timestamp"). Set("last_attempt = EXCLUDED.last_attempt"). Exec(context.Background()) if err != nil { logging.Error("Failed to persist cached event to database", "event_id", cached.ID, "error", err) } } func (c *MessageCache) removeFromDatabase(id string) { if c.db == nil { return } if _, err := c.db.NewDelete(). Table("message_cache"). Where("id = ?", id). Exec(context.Background()); err != nil { logging.Error("Failed to remove cached event from database", "event_id", id, "error", err) } } func (c *MessageCache) clearDatabase() { if c.db == nil { return } if _, err := c.db.NewDelete(). Table("message_cache"). Where("1 = 1"). 
Exec(context.Background()); err != nil { logging.Error("Failed to clear cached events from database", "error", err) } } // saveToDisk saves a cached event to disk func (c *MessageCache) saveToDisk(cached *CachedEvent) { filePath := filepath.Join(c.dataPath, fmt.Sprintf("%s.json", cached.ID)) data, err := json.MarshalIndent(cached, "", " ") if err != nil { logging.Error("Failed to marshal cached event", "event_id", cached.ID, "error", err) return } if err := os.WriteFile(filePath, data, 0644); err != nil { logging.Error("Failed to save cached event to disk", "event_id", cached.ID, "error", err) } } // loadFromDisk loads all cached events from disk func (c *MessageCache) loadFromDisk() error { files, err := filepath.Glob(filepath.Join(c.dataPath, "*.json")) if err != nil { return fmt.Errorf("failed to list cache files: %w", err) } loaded := 0 for _, file := range files { data, err := os.ReadFile(file) if err != nil { logging.Warn("Failed to read cache file", "file", file, "error", err) continue } var cached CachedEvent if err := json.Unmarshal(data, &cached); err != nil { logging.Warn("Failed to unmarshal cache file", "file", file, "error", err) continue } // Skip expired events if time.Since(cached.Timestamp) > c.maxAge { os.Remove(file) continue } c.events[cached.ID] = &cached loaded++ } if loaded > 0 { logging.Info("Loaded cached events from disk", "count", loaded) } return nil } // removeFromDisk removes a cached event file from disk func (c *MessageCache) removeFromDisk(id string) { filePath := filepath.Join(c.dataPath, fmt.Sprintf("%s.json", id)) if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) { logging.Error("Failed to remove cached event from disk", "event_id", id, "error", err) } } // clearDisk removes all cache files from disk func (c *MessageCache) clearDisk() { files, err := filepath.Glob(filepath.Join(c.dataPath, "*.json")) if err != nil { logging.Error("Failed to list cache files for clearing", "error", err) return } for _, file := 
range files { if err := os.Remove(file); err != nil { logging.Error("Failed to remove cache file", "file", file, "error", err) } } } func stringFromEventData(data map[string]any, keys ...string) string { for _, key := range keys { value, ok := data[key] if !ok || value == nil { continue } switch typed := value.(type) { case string: if typed != "" { return typed } default: asString := fmt.Sprintf("%v", typed) if asString != "" && asString != "" { return asString } } } return "" } func firstNonEmpty(values ...string) string { for _, value := range values { if value != "" { return value } } return "" }