- Introduced MessageCachePage for browsing and managing cached webhook events. - Enhanced DashboardPage to display runtime stats and message cache information. - Added new API types for message cache events and system stats. - Integrated SwaggerPage for API documentation and live request testing.
878 lines
22 KiB
Go
878 lines
22 KiB
Go
package cache
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.warky.dev/wdevs/whatshooked/pkg/events"
|
|
"git.warky.dev/wdevs/whatshooked/pkg/logging"
|
|
"github.com/uptrace/bun"
|
|
)
|
|
|
|
// Names of the persistence backends the message cache understands.
const (
	// storageDatabase persists cached events in the message_cache table.
	storageDatabase = "database"
	// storageDisk persists each cached event as a JSON file under the data path.
	storageDisk = "disk"
)
|
|
|
|
// CachedEvent represents an event stored in cache
|
|
type CachedEvent struct {
|
|
ID string `json:"id"`
|
|
Event events.Event `json:"event"`
|
|
AccountID string `json:"account_id,omitempty"`
|
|
FromNumber string `json:"from_number,omitempty"`
|
|
ToNumber string `json:"to_number,omitempty"`
|
|
MessageID string `json:"message_id,omitempty"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Reason string `json:"reason"`
|
|
Attempts int `json:"attempts"`
|
|
LastAttempt *time.Time `json:"last_attempt,omitempty"`
|
|
}
|
|
|
|
// MessageCache manages cached events when no webhooks are available
|
|
type MessageCache struct {
|
|
events map[string]*CachedEvent
|
|
mu sync.RWMutex
|
|
dataPath string
|
|
enabled bool
|
|
storage string
|
|
dbType string
|
|
db *bun.DB
|
|
maxAge time.Duration // Maximum age before events are purged
|
|
maxEvents int // Maximum number of events to keep
|
|
}
|
|
|
|
// Config holds cache configuration
|
|
type Config struct {
|
|
Enabled bool `json:"enabled"`
|
|
Storage string `json:"storage"`
|
|
DataPath string `json:"data_path"`
|
|
DBType string `json:"db_type"`
|
|
DB *bun.DB `json:"-"`
|
|
MaxAge time.Duration `json:"max_age"` // Default: 7 days
|
|
MaxEvents int `json:"max_events"` // Default: 10000
|
|
}
|
|
|
|
// cacheDBRow mirrors one row of the message_cache table. The event itself is
// stored as a JSON document in EventData.
type cacheDBRow struct {
	ID        string `bun:"id"`
	AccountID string `bun:"account_id"`

	// EventType duplicates the event's type so it can be recovered when the
	// JSON document does not carry one.
	EventType string `bun:"event_type"`
	// EventData is the JSON-encoded events.Event.
	EventData string `bun:"event_data"`

	MessageID  string `bun:"message_id"`
	FromNumber string `bun:"from_number"`
	ToNumber   string `bun:"to_number"`

	Reason   string `bun:"reason"`
	Attempts int    `bun:"attempts"`

	Timestamp   time.Time  `bun:"timestamp"`
	LastAttempt *time.Time `bun:"last_attempt"`
}
|
|
|
|
// TableName tells bun which table to use for cacheDBRow.
|
|
func (cacheDBRow) TableName() string {
|
|
return "message_cache"
|
|
}
|
|
|
|
// NewMessageCache creates a new message cache
|
|
func NewMessageCache(cfg Config) (*MessageCache, error) {
|
|
if !cfg.Enabled {
|
|
return &MessageCache{enabled: false}, nil
|
|
}
|
|
|
|
if cfg.Storage == "" {
|
|
cfg.Storage = storageDatabase
|
|
}
|
|
cfg.Storage = strings.ToLower(cfg.Storage)
|
|
if cfg.Storage != storageDatabase && cfg.Storage != storageDisk {
|
|
logging.Warn("Unknown message cache storage backend, defaulting to disk", "storage", cfg.Storage)
|
|
cfg.Storage = storageDisk
|
|
}
|
|
|
|
if cfg.DataPath == "" {
|
|
cfg.DataPath = "./data/cache"
|
|
}
|
|
if cfg.MaxAge == 0 {
|
|
cfg.MaxAge = 7 * 24 * time.Hour // 7 days
|
|
}
|
|
if cfg.MaxEvents == 0 {
|
|
cfg.MaxEvents = 10000
|
|
}
|
|
|
|
if cfg.Storage == storageDisk {
|
|
if err := os.MkdirAll(cfg.DataPath, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create cache directory: %w", err)
|
|
}
|
|
}
|
|
|
|
cache := &MessageCache{
|
|
events: make(map[string]*CachedEvent),
|
|
dataPath: cfg.DataPath,
|
|
enabled: true,
|
|
storage: cfg.Storage,
|
|
dbType: strings.ToLower(cfg.DBType),
|
|
db: cfg.DB,
|
|
maxAge: cfg.MaxAge,
|
|
maxEvents: cfg.MaxEvents,
|
|
}
|
|
|
|
if err := cache.loadPersistedEvents(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Start cleanup goroutine
|
|
go cache.cleanupLoop()
|
|
|
|
logging.Info("Message cache initialized",
|
|
"enabled", cfg.Enabled,
|
|
"storage", cache.storage,
|
|
"data_path", cfg.DataPath,
|
|
"max_age", cfg.MaxAge,
|
|
"max_events", cfg.MaxEvents)
|
|
|
|
return cache, nil
|
|
}
|
|
|
|
// ConfigureDatabase attaches a database to the cache and loads persisted entries.
|
|
func (c *MessageCache) ConfigureDatabase(db *bun.DB) error {
|
|
if !c.enabled || c.storage != storageDatabase {
|
|
return nil
|
|
}
|
|
if db == nil {
|
|
return fmt.Errorf("database handle is nil")
|
|
}
|
|
|
|
c.mu.Lock()
|
|
c.db = db
|
|
c.mu.Unlock()
|
|
|
|
return c.loadPersistedEvents()
|
|
}
|
|
|
|
// Store adds an event to the cache
|
|
func (c *MessageCache) Store(event events.Event, reason string) error {
|
|
if !c.enabled {
|
|
return nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
// Check if we're at capacity
|
|
if len(c.events) >= c.maxEvents {
|
|
// Remove oldest event
|
|
c.removeOldest()
|
|
}
|
|
|
|
// Generate unique ID
|
|
id := fmt.Sprintf("%d-%s", time.Now().UnixNano(), event.Type)
|
|
|
|
cached := &CachedEvent{
|
|
ID: id,
|
|
Event: event,
|
|
AccountID: stringFromEventData(event.Data, "account_id", "accountID"),
|
|
FromNumber: stringFromEventData(
|
|
event.Data,
|
|
"from",
|
|
"from_number",
|
|
"fromNumber",
|
|
),
|
|
ToNumber: stringFromEventData(
|
|
event.Data,
|
|
"to",
|
|
"to_number",
|
|
"toNumber",
|
|
),
|
|
MessageID: stringFromEventData(event.Data, "message_id", "messageId"),
|
|
Timestamp: time.Now(),
|
|
Reason: reason,
|
|
Attempts: 0,
|
|
}
|
|
|
|
c.events[id] = cached
|
|
|
|
// Persist asynchronously
|
|
go c.persistCachedEvent(cached)
|
|
|
|
logging.Debug("Event cached",
|
|
"event_id", id,
|
|
"event_type", event.Type,
|
|
"reason", reason,
|
|
"cache_size", len(c.events))
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get retrieves a cached event by ID
|
|
func (c *MessageCache) Get(id string) (*CachedEvent, bool) {
|
|
if !c.enabled {
|
|
return nil, false
|
|
}
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
event, exists := c.events[id]
|
|
return event, exists
|
|
}
|
|
|
|
// List returns all cached events
|
|
func (c *MessageCache) List() []*CachedEvent {
|
|
if !c.enabled {
|
|
return nil
|
|
}
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
result := make([]*CachedEvent, 0, len(c.events))
|
|
for _, event := range c.events {
|
|
result = append(result, event)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// ListByEventType returns cached events filtered by event type
|
|
func (c *MessageCache) ListByEventType(eventType events.EventType) []*CachedEvent {
|
|
if !c.enabled {
|
|
return nil
|
|
}
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
result := make([]*CachedEvent, 0)
|
|
for _, cached := range c.events {
|
|
if cached.Event.Type == eventType {
|
|
result = append(result, cached)
|
|
}
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// ListPaged returns cached events filtered by event type and paged by limit/offset.
|
|
// Events are sorted by cache timestamp (newest first).
|
|
// A limit <= 0 returns all events from offset onward.
|
|
func (c *MessageCache) ListPaged(eventType events.EventType, limit, offset int) ([]*CachedEvent, int) {
|
|
if !c.enabled {
|
|
return nil, 0
|
|
}
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
if offset < 0 {
|
|
offset = 0
|
|
}
|
|
|
|
filtered := make([]*CachedEvent, 0, len(c.events))
|
|
for _, cached := range c.events {
|
|
if eventType != "" && cached.Event.Type != eventType {
|
|
continue
|
|
}
|
|
filtered = append(filtered, cached)
|
|
}
|
|
|
|
sort.Slice(filtered, func(i, j int) bool {
|
|
if filtered[i].Timestamp.Equal(filtered[j].Timestamp) {
|
|
return filtered[i].ID > filtered[j].ID
|
|
}
|
|
return filtered[i].Timestamp.After(filtered[j].Timestamp)
|
|
})
|
|
|
|
total := len(filtered)
|
|
if offset >= total {
|
|
return []*CachedEvent{}, total
|
|
}
|
|
|
|
if limit <= 0 {
|
|
return filtered[offset:], total
|
|
}
|
|
|
|
end := offset + limit
|
|
if end > total {
|
|
end = total
|
|
}
|
|
|
|
return filtered[offset:end], total
|
|
}
|
|
|
|
// Remove deletes an event from the cache
|
|
func (c *MessageCache) Remove(id string) error {
|
|
if !c.enabled {
|
|
return nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if _, exists := c.events[id]; !exists {
|
|
return fmt.Errorf("cached event not found: %s", id)
|
|
}
|
|
|
|
delete(c.events, id)
|
|
|
|
// Remove persisted record asynchronously
|
|
go c.removePersistedEvent(id)
|
|
|
|
logging.Debug("Event removed from cache", "event_id", id)
|
|
|
|
return nil
|
|
}
|
|
|
|
// IncrementAttempts increments the delivery attempt counter
|
|
func (c *MessageCache) IncrementAttempts(id string) error {
|
|
if !c.enabled {
|
|
return nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
cached, exists := c.events[id]
|
|
if !exists {
|
|
return fmt.Errorf("cached event not found: %s", id)
|
|
}
|
|
|
|
now := time.Now()
|
|
cached.Attempts++
|
|
cached.LastAttempt = &now
|
|
|
|
// Persist asynchronously
|
|
go c.persistCachedEvent(cached)
|
|
|
|
return nil
|
|
}
|
|
|
|
// Clear removes all cached events
|
|
func (c *MessageCache) Clear() error {
|
|
if !c.enabled {
|
|
return nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
c.events = make(map[string]*CachedEvent)
|
|
|
|
// Clear persisted cache asynchronously
|
|
go c.clearPersistedEvents()
|
|
|
|
logging.Info("Message cache cleared")
|
|
|
|
return nil
|
|
}
|
|
|
|
// Count returns the number of cached events
|
|
func (c *MessageCache) Count() int {
|
|
if !c.enabled {
|
|
return 0
|
|
}
|
|
|
|
c.mu.RLock()
|
|
defer c.mu.RUnlock()
|
|
|
|
return len(c.events)
|
|
}
|
|
|
|
// IsEnabled returns whether the cache is enabled
|
|
func (c *MessageCache) IsEnabled() bool {
|
|
return c.enabled
|
|
}
|
|
|
|
// removeOldest removes the oldest event from the cache
|
|
func (c *MessageCache) removeOldest() {
|
|
var oldestID string
|
|
var oldestTime time.Time
|
|
|
|
for id, cached := range c.events {
|
|
if oldestID == "" || cached.Timestamp.Before(oldestTime) {
|
|
oldestID = id
|
|
oldestTime = cached.Timestamp
|
|
}
|
|
}
|
|
|
|
if oldestID != "" {
|
|
delete(c.events, oldestID)
|
|
go c.removePersistedEvent(oldestID)
|
|
logging.Debug("Removed oldest cached event due to capacity", "event_id", oldestID)
|
|
}
|
|
}
|
|
|
|
// cleanupLoop periodically removes expired events
|
|
func (c *MessageCache) cleanupLoop() {
|
|
ticker := time.NewTicker(1 * time.Hour)
|
|
defer ticker.Stop()
|
|
|
|
for range ticker.C {
|
|
c.cleanup()
|
|
}
|
|
}
|
|
|
|
// cleanup removes expired events
|
|
func (c *MessageCache) cleanup() {
|
|
if !c.enabled {
|
|
return
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
now := time.Now()
|
|
expiredIDs := make([]string, 0)
|
|
|
|
for id, cached := range c.events {
|
|
if now.Sub(cached.Timestamp) > c.maxAge {
|
|
expiredIDs = append(expiredIDs, id)
|
|
}
|
|
}
|
|
|
|
for _, id := range expiredIDs {
|
|
delete(c.events, id)
|
|
go c.removePersistedEvent(id)
|
|
}
|
|
|
|
if len(expiredIDs) > 0 {
|
|
logging.Info("Cleaned up expired cached events", "count", len(expiredIDs))
|
|
}
|
|
}
|
|
|
|
// loadPersistedEvents loads events from the configured persistence backend.
|
|
func (c *MessageCache) loadPersistedEvents() error {
|
|
switch c.storage {
|
|
case storageDatabase:
|
|
if c.db == nil {
|
|
logging.Warn("Message cache database storage selected but database is not available yet")
|
|
return nil
|
|
}
|
|
if err := c.ensureDatabaseTable(); err != nil {
|
|
return err
|
|
}
|
|
if err := c.loadFromDatabase(); err != nil {
|
|
return err
|
|
}
|
|
case storageDisk:
|
|
if err := c.loadFromDisk(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *MessageCache) persistCachedEvent(cached *CachedEvent) {
|
|
switch c.storage {
|
|
case storageDatabase:
|
|
c.saveToDatabase(cached)
|
|
case storageDisk:
|
|
c.saveToDisk(cached)
|
|
}
|
|
}
|
|
|
|
func (c *MessageCache) removePersistedEvent(id string) {
|
|
switch c.storage {
|
|
case storageDatabase:
|
|
c.removeFromDatabase(id)
|
|
case storageDisk:
|
|
c.removeFromDisk(id)
|
|
}
|
|
}
|
|
|
|
func (c *MessageCache) clearPersistedEvents() {
|
|
switch c.storage {
|
|
case storageDatabase:
|
|
c.clearDatabase()
|
|
case storageDisk:
|
|
c.clearDisk()
|
|
}
|
|
}
|
|
|
|
// ensureDatabaseTable creates/updates the message_cache table shape for cache events.
|
|
func (c *MessageCache) ensureDatabaseTable() error {
|
|
if c.db == nil {
|
|
return fmt.Errorf("database handle not set for message cache")
|
|
}
|
|
|
|
ctx := context.Background()
|
|
switch c.dbType {
|
|
case "postgres", "postgresql":
|
|
queries := []string{
|
|
`CREATE TABLE IF NOT EXISTS message_cache (
|
|
id VARCHAR(128) PRIMARY KEY,
|
|
account_id VARCHAR(64) NOT NULL DEFAULT '',
|
|
event_type VARCHAR(100) NOT NULL,
|
|
event_data JSONB NOT NULL,
|
|
message_id VARCHAR(255) NOT NULL DEFAULT '',
|
|
from_number VARCHAR(64) NOT NULL DEFAULT '',
|
|
to_number VARCHAR(64) NOT NULL DEFAULT '',
|
|
reason TEXT NOT NULL DEFAULT '',
|
|
attempts INTEGER NOT NULL DEFAULT 0,
|
|
timestamp TIMESTAMPTZ NOT NULL,
|
|
last_attempt TIMESTAMPTZ,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS account_id VARCHAR(64) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS event_type VARCHAR(100) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS event_data JSONB NOT NULL DEFAULT '{}'::jsonb`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS message_id VARCHAR(255) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS from_number VARCHAR(64) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS to_number VARCHAR(64) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS reason TEXT NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS attempts INTEGER NOT NULL DEFAULT 0`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS last_attempt TIMESTAMPTZ`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_timestamp ON message_cache (timestamp DESC)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_event_type ON message_cache (event_type)`,
|
|
}
|
|
for _, q := range queries {
|
|
if _, err := c.db.ExecContext(ctx, q); err != nil {
|
|
return fmt.Errorf("failed to ensure message_cache table (postgres): %w", err)
|
|
}
|
|
}
|
|
default:
|
|
queries := []string{
|
|
`CREATE TABLE IF NOT EXISTS message_cache (
|
|
id TEXT PRIMARY KEY,
|
|
account_id TEXT NOT NULL DEFAULT '',
|
|
event_type TEXT NOT NULL,
|
|
event_data TEXT NOT NULL,
|
|
message_id TEXT NOT NULL DEFAULT '',
|
|
from_number TEXT NOT NULL DEFAULT '',
|
|
to_number TEXT NOT NULL DEFAULT '',
|
|
reason TEXT NOT NULL DEFAULT '',
|
|
attempts INTEGER NOT NULL DEFAULT 0,
|
|
timestamp DATETIME NOT NULL,
|
|
last_attempt DATETIME,
|
|
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_timestamp ON message_cache (timestamp DESC)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_event_type ON message_cache (event_type)`,
|
|
}
|
|
for _, q := range queries {
|
|
if _, err := c.db.ExecContext(ctx, q); err != nil {
|
|
return fmt.Errorf("failed to ensure message_cache table (sqlite): %w", err)
|
|
}
|
|
}
|
|
if err := c.ensureSQLiteColumn("account_id", "TEXT NOT NULL DEFAULT ''"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("event_type", "TEXT NOT NULL DEFAULT ''"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("event_data", "TEXT NOT NULL DEFAULT '{}'"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("message_id", "TEXT NOT NULL DEFAULT ''"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("from_number", "TEXT NOT NULL DEFAULT ''"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("to_number", "TEXT NOT NULL DEFAULT ''"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("reason", "TEXT NOT NULL DEFAULT ''"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("attempts", "INTEGER NOT NULL DEFAULT 0"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("timestamp", "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("last_attempt", "DATETIME"); err != nil {
|
|
return err
|
|
}
|
|
if err := c.ensureSQLiteColumn("created_at", "DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP"); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *MessageCache) ensureSQLiteColumn(name, definition string) error {
|
|
rows, err := c.db.QueryContext(context.Background(), `PRAGMA table_info(message_cache)`)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to inspect sqlite message_cache table: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
exists := false
|
|
for rows.Next() {
|
|
var cid int
|
|
var colName string
|
|
var colType string
|
|
var notNull int
|
|
var defaultValue any
|
|
var pk int
|
|
if err := rows.Scan(&cid, &colName, &colType, ¬Null, &defaultValue, &pk); err != nil {
|
|
return fmt.Errorf("failed to scan sqlite table info: %w", err)
|
|
}
|
|
if strings.EqualFold(colName, name) {
|
|
exists = true
|
|
break
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return fmt.Errorf("failed reading sqlite table info: %w", err)
|
|
}
|
|
if exists {
|
|
return nil
|
|
}
|
|
|
|
query := fmt.Sprintf("ALTER TABLE message_cache ADD COLUMN %s %s", name, definition)
|
|
if _, err := c.db.ExecContext(context.Background(), query); err != nil {
|
|
return fmt.Errorf("failed to add sqlite message_cache column %s: %w", name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *MessageCache) loadFromDatabase() error {
|
|
if c.db == nil {
|
|
return nil
|
|
}
|
|
|
|
ctx := context.Background()
|
|
rows := make([]cacheDBRow, 0)
|
|
if err := c.db.NewSelect().
|
|
Model(&rows).
|
|
Table("message_cache").
|
|
Column("id", "account_id", "event_type", "event_data", "message_id", "from_number", "to_number", "reason", "attempts", "timestamp", "last_attempt").
|
|
Scan(ctx); err != nil {
|
|
return fmt.Errorf("failed to load cached events from database: %w", err)
|
|
}
|
|
|
|
loaded := 0
|
|
now := time.Now()
|
|
expiredIDs := make([]string, 0)
|
|
|
|
c.mu.Lock()
|
|
for _, row := range rows {
|
|
var evt events.Event
|
|
if err := json.Unmarshal([]byte(row.EventData), &evt); err != nil {
|
|
logging.Warn("Failed to unmarshal cached event row", "event_id", row.ID, "error", err)
|
|
continue
|
|
}
|
|
if evt.Type == "" {
|
|
evt.Type = events.EventType(row.EventType)
|
|
}
|
|
if evt.Timestamp.IsZero() {
|
|
evt.Timestamp = row.Timestamp
|
|
}
|
|
|
|
cached := &CachedEvent{
|
|
ID: row.ID,
|
|
Event: evt,
|
|
AccountID: firstNonEmpty(row.AccountID, stringFromEventData(evt.Data, "account_id", "accountID")),
|
|
FromNumber: firstNonEmpty(row.FromNumber, stringFromEventData(evt.Data, "from", "from_number", "fromNumber")),
|
|
ToNumber: firstNonEmpty(row.ToNumber, stringFromEventData(evt.Data, "to", "to_number", "toNumber")),
|
|
MessageID: firstNonEmpty(row.MessageID, stringFromEventData(evt.Data, "message_id", "messageId")),
|
|
Timestamp: row.Timestamp,
|
|
Reason: row.Reason,
|
|
Attempts: row.Attempts,
|
|
LastAttempt: row.LastAttempt,
|
|
}
|
|
if now.Sub(cached.Timestamp) > c.maxAge {
|
|
expiredIDs = append(expiredIDs, row.ID)
|
|
continue
|
|
}
|
|
c.events[cached.ID] = cached
|
|
loaded++
|
|
}
|
|
c.mu.Unlock()
|
|
|
|
for _, id := range expiredIDs {
|
|
go c.removeFromDatabase(id)
|
|
}
|
|
|
|
if loaded > 0 {
|
|
logging.Info("Loaded cached events from database", "count", loaded)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *MessageCache) saveToDatabase(cached *CachedEvent) {
|
|
if c.db == nil {
|
|
return
|
|
}
|
|
|
|
eventData, err := json.Marshal(cached.Event)
|
|
if err != nil {
|
|
logging.Error("Failed to marshal cached event", "event_id", cached.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
row := cacheDBRow{
|
|
ID: cached.ID,
|
|
AccountID: cached.AccountID,
|
|
EventType: string(cached.Event.Type),
|
|
EventData: string(eventData),
|
|
MessageID: cached.MessageID,
|
|
FromNumber: cached.FromNumber,
|
|
ToNumber: cached.ToNumber,
|
|
Reason: cached.Reason,
|
|
Attempts: cached.Attempts,
|
|
Timestamp: cached.Timestamp,
|
|
LastAttempt: cached.LastAttempt,
|
|
}
|
|
|
|
_, err = c.db.NewInsert().
|
|
Model(&row).
|
|
Table("message_cache").
|
|
On("CONFLICT (id) DO UPDATE").
|
|
Set("account_id = EXCLUDED.account_id").
|
|
Set("event_type = EXCLUDED.event_type").
|
|
Set("event_data = EXCLUDED.event_data").
|
|
Set("message_id = EXCLUDED.message_id").
|
|
Set("from_number = EXCLUDED.from_number").
|
|
Set("to_number = EXCLUDED.to_number").
|
|
Set("reason = EXCLUDED.reason").
|
|
Set("attempts = EXCLUDED.attempts").
|
|
Set("timestamp = EXCLUDED.timestamp").
|
|
Set("last_attempt = EXCLUDED.last_attempt").
|
|
Exec(context.Background())
|
|
if err != nil {
|
|
logging.Error("Failed to persist cached event to database", "event_id", cached.ID, "error", err)
|
|
}
|
|
}
|
|
|
|
func (c *MessageCache) removeFromDatabase(id string) {
|
|
if c.db == nil {
|
|
return
|
|
}
|
|
|
|
if _, err := c.db.NewDelete().
|
|
Table("message_cache").
|
|
Where("id = ?", id).
|
|
Exec(context.Background()); err != nil {
|
|
logging.Error("Failed to remove cached event from database", "event_id", id, "error", err)
|
|
}
|
|
}
|
|
|
|
func (c *MessageCache) clearDatabase() {
|
|
if c.db == nil {
|
|
return
|
|
}
|
|
|
|
if _, err := c.db.NewDelete().
|
|
Table("message_cache").
|
|
Where("1 = 1").
|
|
Exec(context.Background()); err != nil {
|
|
logging.Error("Failed to clear cached events from database", "error", err)
|
|
}
|
|
}
|
|
|
|
// saveToDisk saves a cached event to disk
|
|
func (c *MessageCache) saveToDisk(cached *CachedEvent) {
|
|
filePath := filepath.Join(c.dataPath, fmt.Sprintf("%s.json", cached.ID))
|
|
|
|
data, err := json.MarshalIndent(cached, "", " ")
|
|
if err != nil {
|
|
logging.Error("Failed to marshal cached event", "event_id", cached.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
if err := os.WriteFile(filePath, data, 0644); err != nil {
|
|
logging.Error("Failed to save cached event to disk", "event_id", cached.ID, "error", err)
|
|
}
|
|
}
|
|
|
|
// loadFromDisk loads all cached events from disk
|
|
func (c *MessageCache) loadFromDisk() error {
|
|
files, err := filepath.Glob(filepath.Join(c.dataPath, "*.json"))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to list cache files: %w", err)
|
|
}
|
|
|
|
loaded := 0
|
|
for _, file := range files {
|
|
data, err := os.ReadFile(file)
|
|
if err != nil {
|
|
logging.Warn("Failed to read cache file", "file", file, "error", err)
|
|
continue
|
|
}
|
|
|
|
var cached CachedEvent
|
|
if err := json.Unmarshal(data, &cached); err != nil {
|
|
logging.Warn("Failed to unmarshal cache file", "file", file, "error", err)
|
|
continue
|
|
}
|
|
|
|
// Skip expired events
|
|
if time.Since(cached.Timestamp) > c.maxAge {
|
|
os.Remove(file)
|
|
continue
|
|
}
|
|
|
|
c.events[cached.ID] = &cached
|
|
loaded++
|
|
}
|
|
|
|
if loaded > 0 {
|
|
logging.Info("Loaded cached events from disk", "count", loaded)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// removeFromDisk removes a cached event file from disk
|
|
func (c *MessageCache) removeFromDisk(id string) {
|
|
filePath := filepath.Join(c.dataPath, fmt.Sprintf("%s.json", id))
|
|
if err := os.Remove(filePath); err != nil && !os.IsNotExist(err) {
|
|
logging.Error("Failed to remove cached event from disk", "event_id", id, "error", err)
|
|
}
|
|
}
|
|
|
|
// clearDisk removes all cache files from disk
|
|
func (c *MessageCache) clearDisk() {
|
|
files, err := filepath.Glob(filepath.Join(c.dataPath, "*.json"))
|
|
if err != nil {
|
|
logging.Error("Failed to list cache files for clearing", "error", err)
|
|
return
|
|
}
|
|
|
|
for _, file := range files {
|
|
if err := os.Remove(file); err != nil {
|
|
logging.Error("Failed to remove cache file", "file", file, "error", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// stringFromEventData returns the first usable value found in data under the
// given keys, tried in order, rendered as a string. Missing keys, nil values
// and empty strings are skipped; "" is returned when nothing usable is found.
// A nil map is handled like an empty one.
//
// Whole-number float64 values (which is how encoding/json decodes JSON
// numbers into map[string]any) are rendered as plain integers, because the
// default %v formatting would print e.g. a phone number 27821234567 as
// "2.7821234567e+10". All other non-string values use %v.
func stringFromEventData(data map[string]any, keys ...string) string {
	// Largest magnitude at which every whole number is exactly representable
	// as a float64 and safely convertible to int64.
	const maxExactInt = 1 << 53

	for _, key := range keys {
		value, ok := data[key]
		if !ok || value == nil {
			continue
		}
		switch typed := value.(type) {
		case string:
			if typed != "" {
				return typed
			}
		case float64:
			if typed >= -maxExactInt && typed <= maxExactInt && typed == float64(int64(typed)) {
				return fmt.Sprintf("%d", int64(typed))
			}
			// Fractional, huge, NaN or infinite: keep the default rendering.
			return fmt.Sprintf("%v", typed)
		default:
			asString := fmt.Sprintf("%v", typed)
			if asString != "" && asString != "<nil>" {
				return asString
			}
		}
	}
	return ""
}
|
|
|
|
// firstNonEmpty returns the first argument that is not the empty string, or
// "" when there is none (including when called without arguments).
func firstNonEmpty(values ...string) string {
	for i := 0; i < len(values); i++ {
		if candidate := values[i]; len(candidate) > 0 {
			return candidate
		}
	}
	return ""
}
|