Files
whatshooked/pkg/eventlogger/postgres_target.go
2025-12-29 09:51:16 +02:00

121 lines
2.8 KiB
Go

package eventlogger
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"sync"
"time"
"git.warky.dev/wdevs/whatshooked/pkg/config"
"git.warky.dev/wdevs/whatshooked/pkg/events"
_ "github.com/lib/pq" // PostgreSQL driver
)
// PostgresTarget logs events to a PostgreSQL database.
//
// Instances are created with NewPostgresTarget and must be released with
// Close. The struct embeds a sync.Mutex by value, so it must be used through
// a pointer and never copied.
type PostgresTarget struct {
// db is the database handle every statement is executed on.
db *sql.DB
// tableName is the event table; it is interpolated verbatim (unquoted)
// into the CREATE TABLE, CREATE INDEX and INSERT statements.
tableName string
// mu is held by Log for the duration of each marshal + insert.
mu sync.Mutex
}
// NewPostgresTarget opens the PostgreSQL database described by dbConfig,
// verifies the connection with a ping bounded to 5 seconds, and makes sure the
// event table named tableName (and its index) exists.
//
// tableName is interpolated into SQL unquoted by createTable, so it must be a
// trusted, valid identifier.
//
// On any failure the database handle is closed and a nil target is returned
// together with a wrapped error.
func NewPostgresTarget(dbConfig config.DatabaseConfig, tableName string) (*PostgresTarget, error) {
	// Every string value is single-quoted and escaped. In the key=value
	// format an unquoted empty value (e.g. no password) makes the driver
	// consume the following "dbname=..." token as the value, and unquoted
	// whitespace, quotes or backslashes corrupt the string.
	connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
		quoteConnValue(dbConfig.Host),
		dbConfig.Port,
		quoteConnValue(dbConfig.Username),
		quoteConnValue(dbConfig.Password),
		quoteConnValue(dbConfig.Database),
	)

	// sql.Open may only validate its arguments; the ping below is what
	// actually proves the server is reachable.
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		return nil, fmt.Errorf("failed to open PostgreSQL database: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := db.PingContext(ctx); err != nil {
		// Best-effort cleanup: the ping error is the one worth reporting.
		_ = db.Close()
		return nil, fmt.Errorf("failed to connect to PostgreSQL: %w", err)
	}

	target := &PostgresTarget{
		db:        db,
		tableName: tableName,
	}

	// Create the table and index if they do not exist yet.
	if err := target.createTable(); err != nil {
		// Best-effort cleanup: the schema error is the one worth reporting.
		_ = db.Close()
		return nil, err
	}
	return target, nil
}

// quoteConnValue renders v as a single-quoted value for a key=value
// PostgreSQL connection string, backslash-escaping embedded single quotes and
// backslashes. The empty string becomes ''.
func quoteConnValue(v string) string {
	quoted := make([]byte, 0, len(v)+2)
	quoted = append(quoted, '\'')
	for i := 0; i < len(v); i++ {
		if c := v[i]; c == '\'' || c == '\\' {
			quoted = append(quoted, '\\')
		}
		quoted = append(quoted, v[i])
	}
	quoted = append(quoted, '\'')
	return string(quoted)
}
// createTable creates the event log table and its (event_type, timestamp)
// index if they do not already exist.
//
// pt.tableName is interpolated into the statements unquoted, so it must be a
// trusted identifier (optionally schema-qualified or already quoted by the
// caller). It returns a wrapped error if either statement fails.
func (pt *PostgresTarget) createTable() error {
	query := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s (
id SERIAL PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
timestamp TIMESTAMP NOT NULL,
data JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
`, pt.tableName)
	if _, err := pt.db.Exec(query); err != nil {
		return fmt.Errorf("failed to create table: %w", err)
	}

	// An index name cannot carry a schema qualifier or quoting, so the table
	// name is reduced to identifier-safe characters before being embedded in
	// it. Plain table names produce exactly the same index name as before.
	indexQuery := fmt.Sprintf(`
CREATE INDEX IF NOT EXISTS idx_%s_type_timestamp
ON %s(event_type, timestamp)
`, indexNameFragment(pt.tableName), pt.tableName)
	if _, err := pt.db.Exec(indexQuery); err != nil {
		return fmt.Errorf("failed to create index: %w", err)
	}
	return nil
}

// indexNameFragment returns tableName with every byte that is not valid inside
// an unquoted PostgreSQL identifier (ASCII letters, digits, '_', '$', or any
// non-ASCII byte) replaced by '_'.
func indexNameFragment(tableName string) string {
	frag := []byte(tableName)
	for i, c := range frag {
		switch {
		case c >= 'a' && c <= 'z', c >= 'A' && c <= 'Z', c >= '0' && c <= '9':
			// ASCII alphanumerics are kept as-is.
		case c == '_', c == '$', c >= 0x80:
			// Also legal inside an unquoted identifier.
		default:
			frag[i] = '_'
		}
	}
	return string(frag)
}
// Log serializes event.Data to JSON and inserts one row (event type,
// timestamp, JSON payload) into the target table.
//
// The target's mutex is held for the whole call, so concurrent Log calls on
// the same target run one at a time. A wrapped error is returned when
// marshalling or the insert fails.
func (pt *PostgresTarget) Log(event events.Event) error {
	pt.mu.Lock()
	defer pt.mu.Unlock()

	// Encode the payload first; nothing is sent to the database on failure.
	payload, marshalErr := json.Marshal(event.Data)
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal event data: %w", marshalErr)
	}

	// Only the table name is interpolated; the values travel as parameters.
	insertSQL := fmt.Sprintf(`
INSERT INTO %s (event_type, timestamp, data)
VALUES ($1, $2, $3)
`, pt.tableName)

	if _, execErr := pt.db.Exec(insertSQL, string(event.Type), event.Timestamp, string(payload)); execErr != nil {
		return fmt.Errorf("failed to insert event: %w", execErr)
	}
	return nil
}
// Close releases the underlying PostgreSQL database handle and returns
// whatever error that close operation reports.
func (pt *PostgresTarget) Close() error {
	closeErr := pt.db.Close()
	return closeErr
}