- Introduced MessageCachePage for browsing and managing cached webhook events. - Enhanced DashboardPage to display runtime stats and message cache information. - Added new API types for message cache events and system stats. - Integrated SwaggerPage for API documentation and live request testing.
488 lines
15 KiB
Go
488 lines
15 KiB
Go
package storage
|
|
|
|
import (
	"context"
	"database/sql"
	"fmt"
	"net/url"
	"time"

	"git.warky.dev/wdevs/whatshooked/pkg/config"
	"git.warky.dev/wdevs/whatshooked/pkg/models"
	"github.com/uptrace/bun"
	"github.com/uptrace/bun/dialect/pgdialect"
	"github.com/uptrace/bun/dialect/sqlitedialect"
	"github.com/uptrace/bun/driver/pgdriver"
	"github.com/uptrace/bun/driver/sqliteshim"
	"github.com/uptrace/bun/extra/bundebug"
)
|
|
|
|
// DB is the global database instance
|
|
var DB *bun.DB
|
|
var dbType string // Store the database type for later use
|
|
|
|
// Initialize sets up the database connection based on configuration
|
|
func Initialize(cfg *config.DatabaseConfig) error {
|
|
var sqldb *sql.DB
|
|
var err error
|
|
|
|
dbType = cfg.Type
|
|
switch cfg.Type {
|
|
case "postgres", "postgresql":
|
|
dsn := fmt.Sprintf("postgres://%s:%s@%s:%d/%s?sslmode=disable",
|
|
cfg.Username, cfg.Password, cfg.Host, cfg.Port, cfg.Database)
|
|
sqldb = sql.OpenDB(pgdriver.NewConnector(pgdriver.WithDSN(dsn)))
|
|
DB = bun.NewDB(sqldb, pgdialect.New())
|
|
|
|
case "sqlite":
|
|
sqldb, err = sql.Open(sqliteshim.ShimName, cfg.SQLitePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open sqlite database: %w", err)
|
|
}
|
|
DB = bun.NewDB(sqldb, sqlitedialect.New())
|
|
|
|
default:
|
|
return fmt.Errorf("unsupported database type: %s", cfg.Type)
|
|
}
|
|
|
|
// Add query hook for debugging (optional, can be removed in production)
|
|
DB.AddQueryHook(bundebug.NewQueryHook(
|
|
bundebug.WithVerbose(true),
|
|
bundebug.FromEnv("BUNDEBUG"),
|
|
))
|
|
|
|
// Set connection pool settings
|
|
sqldb.SetMaxIdleConns(10)
|
|
sqldb.SetMaxOpenConns(100)
|
|
sqldb.SetConnMaxLifetime(time.Hour)
|
|
|
|
// Test the connection
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
if err := sqldb.PingContext(ctx); err != nil {
|
|
return fmt.Errorf("failed to ping database: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreateTables creates database tables based on BUN models
|
|
func CreateTables(ctx context.Context) error {
|
|
if DB == nil {
|
|
return fmt.Errorf("database not initialized")
|
|
}
|
|
|
|
// For SQLite, use raw SQL with compatible syntax
|
|
if dbType == "sqlite" {
|
|
return createTablesSQLite(ctx)
|
|
}
|
|
|
|
// For PostgreSQL, use BUN's auto-generation
|
|
models := []interface{}{
|
|
(*models.ModelPublicUsers)(nil),
|
|
(*models.ModelPublicAPIKey)(nil),
|
|
(*models.ModelPublicHook)(nil),
|
|
(*models.ModelPublicWhatsappAccount)(nil),
|
|
(*models.ModelPublicEventLog)(nil),
|
|
(*models.ModelPublicSession)(nil),
|
|
}
|
|
|
|
for _, model := range models {
|
|
_, err := DB.NewCreateTable().Model(model).IfNotExists().Exec(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create table: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := ensureMessageCacheTable(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// createTablesSQLite creates tables using SQLite-compatible SQL
|
|
func createTablesSQLite(ctx context.Context) error {
|
|
tables := []string{
|
|
// Users table
|
|
`CREATE TABLE IF NOT EXISTS users (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
username VARCHAR(255) NOT NULL UNIQUE,
|
|
email VARCHAR(255) NOT NULL UNIQUE,
|
|
password VARCHAR(255) NOT NULL,
|
|
full_name VARCHAR(255),
|
|
role VARCHAR(50) NOT NULL DEFAULT 'user',
|
|
active BOOLEAN NOT NULL DEFAULT 1,
|
|
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
deleted_at TIMESTAMP
|
|
)`,
|
|
|
|
// API Keys table
|
|
`CREATE TABLE IF NOT EXISTS api_keys (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
user_id VARCHAR(36) NOT NULL,
|
|
name VARCHAR(255) NOT NULL,
|
|
key VARCHAR(255) NOT NULL UNIQUE,
|
|
key_prefix VARCHAR(20),
|
|
permissions TEXT,
|
|
active BOOLEAN NOT NULL DEFAULT 1,
|
|
last_used_at TIMESTAMP,
|
|
expires_at TIMESTAMP,
|
|
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
deleted_at TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
)`,
|
|
|
|
// Hooks table
|
|
`CREATE TABLE IF NOT EXISTS hooks (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
user_id VARCHAR(36) NOT NULL,
|
|
name VARCHAR(255) NOT NULL,
|
|
url TEXT NOT NULL,
|
|
method VARCHAR(10) NOT NULL DEFAULT 'POST',
|
|
headers TEXT,
|
|
events TEXT,
|
|
active BOOLEAN NOT NULL DEFAULT 1,
|
|
allow_insecure BOOLEAN NOT NULL DEFAULT 0,
|
|
retry_count INTEGER NOT NULL DEFAULT 3,
|
|
timeout INTEGER NOT NULL DEFAULT 30,
|
|
secret VARCHAR(255),
|
|
description TEXT,
|
|
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
deleted_at TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
)`,
|
|
|
|
// WhatsApp Accounts table
|
|
`CREATE TABLE IF NOT EXISTS whatsapp_account (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
user_id VARCHAR(36) NOT NULL,
|
|
phone_number VARCHAR(20) NOT NULL UNIQUE,
|
|
display_name VARCHAR(255),
|
|
account_type VARCHAR(50) NOT NULL DEFAULT 'whatsmeow',
|
|
config TEXT,
|
|
active BOOLEAN NOT NULL DEFAULT 1,
|
|
status VARCHAR(50) NOT NULL DEFAULT 'disconnected',
|
|
session_path TEXT,
|
|
last_connected_at TIMESTAMP,
|
|
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
deleted_at TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
)`,
|
|
|
|
// Event Logs table
|
|
`CREATE TABLE IF NOT EXISTS event_logs (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
user_id VARCHAR(36),
|
|
account_id VARCHAR(36),
|
|
event_type VARCHAR(100) NOT NULL,
|
|
event_data TEXT,
|
|
from_number VARCHAR(20),
|
|
to_number VARCHAR(20),
|
|
message_id VARCHAR(255),
|
|
status VARCHAR(50),
|
|
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL,
|
|
FOREIGN KEY (account_id) REFERENCES whatsapp_account(id) ON DELETE SET NULL
|
|
)`,
|
|
|
|
// Sessions table
|
|
`CREATE TABLE IF NOT EXISTS sessions (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
user_id VARCHAR(36) NOT NULL,
|
|
token VARCHAR(500) NOT NULL UNIQUE,
|
|
expires_at TIMESTAMP NOT NULL,
|
|
ip_address VARCHAR(45),
|
|
user_agent TEXT,
|
|
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
|
)`,
|
|
|
|
// Message Cache table
|
|
`CREATE TABLE IF NOT EXISTS message_cache (
|
|
id VARCHAR(128) PRIMARY KEY,
|
|
account_id VARCHAR(64) NOT NULL DEFAULT '',
|
|
event_type VARCHAR(100) NOT NULL,
|
|
event_data TEXT NOT NULL,
|
|
message_id VARCHAR(255) NOT NULL DEFAULT '',
|
|
from_number VARCHAR(64) NOT NULL DEFAULT '',
|
|
to_number VARCHAR(64) NOT NULL DEFAULT '',
|
|
reason TEXT NOT NULL DEFAULT '',
|
|
attempts INTEGER NOT NULL DEFAULT 0,
|
|
timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
last_attempt TIMESTAMP,
|
|
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
|
|
)`,
|
|
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_timestamp ON message_cache(timestamp DESC)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_event_type ON message_cache(event_type)`,
|
|
}
|
|
|
|
for _, sql := range tables {
|
|
if _, err := DB.ExecContext(ctx, sql); err != nil {
|
|
return fmt.Errorf("failed to create table: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := migrateLegacyWhatsAppAccountsTable(ctx); err != nil {
|
|
return err
|
|
}
|
|
if err := ensureWhatsAppAccountColumnsSQLite(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := ensureMessageCacheTable(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func migrateLegacyWhatsAppAccountsTable(ctx context.Context) error {
|
|
if DB == nil {
|
|
return fmt.Errorf("database not initialized")
|
|
}
|
|
|
|
var legacyCount int
|
|
if err := DB.NewSelect().
|
|
Table("sqlite_master").
|
|
ColumnExpr("COUNT(1)").
|
|
Where("type = 'table' AND name = 'whatsapp_accounts'").
|
|
Scan(ctx, &legacyCount); err != nil {
|
|
return fmt.Errorf("failed to inspect legacy whatsapp_accounts table: %w", err)
|
|
}
|
|
|
|
var currentCount int
|
|
if err := DB.NewSelect().
|
|
Table("sqlite_master").
|
|
ColumnExpr("COUNT(1)").
|
|
Where("type = 'table' AND name = 'whatsapp_account'").
|
|
Scan(ctx, ¤tCount); err != nil {
|
|
return fmt.Errorf("failed to inspect whatsapp_account table: %w", err)
|
|
}
|
|
|
|
if legacyCount > 0 {
|
|
if currentCount == 0 {
|
|
if _, err := DB.ExecContext(ctx, `ALTER TABLE whatsapp_accounts RENAME TO whatsapp_account`); err != nil {
|
|
return fmt.Errorf("failed to migrate table whatsapp_accounts -> whatsapp_account: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
mergeSQL := `INSERT OR IGNORE INTO whatsapp_account
|
|
(id, user_id, phone_number, display_name, account_type, config, active, status, session_path, last_connected_at, created_at, updated_at, deleted_at)
|
|
SELECT
|
|
id,
|
|
user_id,
|
|
phone_number,
|
|
'' AS display_name,
|
|
COALESCE(account_type, 'whatsmeow') AS account_type,
|
|
COALESCE(business_api_config, '') AS config,
|
|
COALESCE(active, 1) AS active,
|
|
CASE
|
|
WHEN COALESCE(active, 1) = 0 THEN 'disconnected'
|
|
WHEN COALESCE(connected, 0) = 1 THEN 'connected'
|
|
ELSE 'disconnected'
|
|
END AS status,
|
|
'' AS session_path,
|
|
last_connected_at,
|
|
created_at,
|
|
updated_at,
|
|
deleted_at
|
|
FROM whatsapp_accounts`
|
|
if _, err := DB.ExecContext(ctx, mergeSQL); err != nil {
|
|
return fmt.Errorf("failed to merge legacy whatsapp_accounts data: %w", err)
|
|
}
|
|
if _, err := DB.ExecContext(ctx, `DROP TABLE whatsapp_accounts`); err != nil {
|
|
return fmt.Errorf("failed to drop legacy whatsapp_accounts table: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func ensureWhatsAppAccountColumnsSQLite(ctx context.Context) error {
|
|
if err := ensureSQLiteColumn(ctx, "whatsapp_account", "display_name", "VARCHAR(255)"); err != nil {
|
|
return err
|
|
}
|
|
if err := ensureSQLiteColumn(ctx, "whatsapp_account", "config", "TEXT"); err != nil {
|
|
return err
|
|
}
|
|
if err := ensureSQLiteColumn(ctx, "whatsapp_account", "status", "VARCHAR(50) NOT NULL DEFAULT 'disconnected'"); err != nil {
|
|
return err
|
|
}
|
|
if err := ensureSQLiteColumn(ctx, "whatsapp_account", "session_path", "TEXT"); err != nil {
|
|
return err
|
|
}
|
|
if err := ensureSQLiteColumn(ctx, "whatsapp_account", "updated_at", "TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP"); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Backfill config/status from legacy columns if they still exist on this table.
|
|
if hasBusinessAPIConfig, err := sqliteColumnExists(ctx, "whatsapp_account", "business_api_config"); err != nil {
|
|
return err
|
|
} else if hasBusinessAPIConfig {
|
|
if _, err := DB.ExecContext(ctx, `UPDATE whatsapp_account SET config = COALESCE(config, business_api_config, '')`); err != nil {
|
|
return fmt.Errorf("failed to backfill config from business_api_config: %w", err)
|
|
}
|
|
}
|
|
|
|
if hasConnected, err := sqliteColumnExists(ctx, "whatsapp_account", "connected"); err != nil {
|
|
return err
|
|
} else if hasConnected {
|
|
if _, err := DB.ExecContext(ctx, `UPDATE whatsapp_account
|
|
SET status = CASE
|
|
WHEN COALESCE(active, 1) = 0 THEN 'disconnected'
|
|
WHEN COALESCE(connected, 0) = 1 THEN 'connected'
|
|
WHEN status = '' OR status IS NULL THEN 'disconnected'
|
|
ELSE status
|
|
END`); err != nil {
|
|
return fmt.Errorf("failed to backfill status from connected column: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func sqliteColumnExists(ctx context.Context, table, column string) (bool, error) {
|
|
rows, err := DB.QueryContext(ctx, fmt.Sprintf("PRAGMA table_info(%s)", table))
|
|
if err != nil {
|
|
return false, fmt.Errorf("failed to inspect sqlite table %s: %w", table, err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var cid int
|
|
var colName string
|
|
var colType string
|
|
var notNull int
|
|
var defaultValue any
|
|
var pk int
|
|
if err := rows.Scan(&cid, &colName, &colType, ¬Null, &defaultValue, &pk); err != nil {
|
|
return false, fmt.Errorf("failed to scan sqlite table info for %s: %w", table, err)
|
|
}
|
|
if colName == column {
|
|
return true, nil
|
|
}
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return false, fmt.Errorf("failed reading sqlite table info for %s: %w", table, err)
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
func ensureSQLiteColumn(ctx context.Context, table, name, definition string) error {
|
|
exists, err := sqliteColumnExists(ctx, table, name)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if exists {
|
|
return nil
|
|
}
|
|
|
|
query := fmt.Sprintf("ALTER TABLE %s ADD COLUMN %s %s", table, name, definition)
|
|
if _, err := DB.ExecContext(ctx, query); err != nil {
|
|
return fmt.Errorf("failed to add sqlite column %s.%s: %w", table, name, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func ensureMessageCacheTable(ctx context.Context) error {
|
|
if DB == nil {
|
|
return fmt.Errorf("database not initialized")
|
|
}
|
|
|
|
if dbType == "postgres" || dbType == "postgresql" {
|
|
queries := []string{
|
|
`CREATE TABLE IF NOT EXISTS message_cache (
|
|
id VARCHAR(128) PRIMARY KEY,
|
|
account_id VARCHAR(64) NOT NULL DEFAULT '',
|
|
event_type VARCHAR(100) NOT NULL,
|
|
event_data JSONB NOT NULL,
|
|
message_id VARCHAR(255) NOT NULL DEFAULT '',
|
|
from_number VARCHAR(64) NOT NULL DEFAULT '',
|
|
to_number VARCHAR(64) NOT NULL DEFAULT '',
|
|
reason TEXT NOT NULL DEFAULT '',
|
|
attempts INTEGER NOT NULL DEFAULT 0,
|
|
timestamp TIMESTAMPTZ NOT NULL,
|
|
last_attempt TIMESTAMPTZ,
|
|
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
|
)`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS account_id VARCHAR(64) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS event_type VARCHAR(100) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS event_data JSONB NOT NULL DEFAULT '{}'::jsonb`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS message_id VARCHAR(255) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS from_number VARCHAR(64) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS to_number VARCHAR(64) NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS reason TEXT NOT NULL DEFAULT ''`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS attempts INTEGER NOT NULL DEFAULT 0`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS last_attempt TIMESTAMPTZ`,
|
|
`ALTER TABLE message_cache ADD COLUMN IF NOT EXISTS created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_timestamp ON message_cache (timestamp DESC)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_event_type ON message_cache (event_type)`,
|
|
}
|
|
for _, query := range queries {
|
|
if _, err := DB.ExecContext(ctx, query); err != nil {
|
|
return fmt.Errorf("failed to ensure postgres message_cache table: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
queries := []string{
|
|
`CREATE TABLE IF NOT EXISTS message_cache (
|
|
id TEXT PRIMARY KEY,
|
|
account_id TEXT NOT NULL DEFAULT '',
|
|
event_type TEXT NOT NULL,
|
|
event_data TEXT NOT NULL,
|
|
message_id TEXT NOT NULL DEFAULT '',
|
|
from_number TEXT NOT NULL DEFAULT '',
|
|
to_number TEXT NOT NULL DEFAULT '',
|
|
reason TEXT NOT NULL DEFAULT '',
|
|
attempts INTEGER NOT NULL DEFAULT 0,
|
|
timestamp DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
last_attempt DATETIME,
|
|
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
|
)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_timestamp ON message_cache(timestamp DESC)`,
|
|
`CREATE INDEX IF NOT EXISTS idx_message_cache_event_type ON message_cache(event_type)`,
|
|
}
|
|
for _, query := range queries {
|
|
if _, err := DB.ExecContext(ctx, query); err != nil {
|
|
return fmt.Errorf("failed to ensure sqlite message_cache table: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Close closes the database connection
|
|
func Close() error {
|
|
if DB == nil {
|
|
return nil
|
|
}
|
|
|
|
return DB.Close()
|
|
}
|
|
|
|
// GetDB returns the database instance
|
|
func GetDB() *bun.DB {
|
|
return DB
|
|
}
|
|
|
|
// HealthCheck checks if the database connection is healthy
|
|
func HealthCheck() error {
|
|
if DB == nil {
|
|
return fmt.Errorf("database not initialized")
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer cancel()
|
|
|
|
return DB.PingContext(ctx)
|
|
}
|