Event logging
This commit is contained in:
120
internal/eventlogger/postgres_target.go
Normal file
120
internal/eventlogger/postgres_target.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package eventlogger
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.warky.dev/wdevs/whatshooked/internal/config"
|
||||
"git.warky.dev/wdevs/whatshooked/internal/events"
|
||||
|
||||
_ "github.com/lib/pq" // PostgreSQL driver
|
||||
)
|
||||
|
||||
// PostgresTarget logs events to PostgreSQL database
|
||||
type PostgresTarget struct {
|
||||
db *sql.DB
|
||||
tableName string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// NewPostgresTarget creates a new PostgreSQL logging target
|
||||
func NewPostgresTarget(dbConfig config.DatabaseConfig, tableName string) (*PostgresTarget, error) {
|
||||
// Build connection string
|
||||
connStr := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
|
||||
dbConfig.Host,
|
||||
dbConfig.Port,
|
||||
dbConfig.Username,
|
||||
dbConfig.Password,
|
||||
dbConfig.Database,
|
||||
)
|
||||
|
||||
// Open PostgreSQL connection
|
||||
db, err := sql.Open("postgres", connStr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open PostgreSQL database: %w", err)
|
||||
}
|
||||
|
||||
// Test connection
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := db.PingContext(ctx); err != nil {
|
||||
db.Close()
|
||||
return nil, fmt.Errorf("failed to connect to PostgreSQL: %w", err)
|
||||
}
|
||||
|
||||
target := &PostgresTarget{
|
||||
db: db,
|
||||
tableName: tableName,
|
||||
}
|
||||
|
||||
// Create table if it doesn't exist
|
||||
if err := target.createTable(); err != nil {
|
||||
db.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return target, nil
|
||||
}
|
||||
|
||||
// createTable creates the event logs table if it doesn't exist
|
||||
func (pt *PostgresTarget) createTable() error {
|
||||
query := fmt.Sprintf(`
|
||||
CREATE TABLE IF NOT EXISTS %s (
|
||||
id SERIAL PRIMARY KEY,
|
||||
event_type VARCHAR(100) NOT NULL,
|
||||
timestamp TIMESTAMP NOT NULL,
|
||||
data JSONB NOT NULL,
|
||||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
`, pt.tableName)
|
||||
|
||||
if _, err := pt.db.Exec(query); err != nil {
|
||||
return fmt.Errorf("failed to create table: %w", err)
|
||||
}
|
||||
|
||||
// Create index on event_type and timestamp
|
||||
indexQuery := fmt.Sprintf(`
|
||||
CREATE INDEX IF NOT EXISTS idx_%s_type_timestamp
|
||||
ON %s(event_type, timestamp)
|
||||
`, pt.tableName, pt.tableName)
|
||||
|
||||
if _, err := pt.db.Exec(indexQuery); err != nil {
|
||||
return fmt.Errorf("failed to create index: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Log writes an event to PostgreSQL database
|
||||
func (pt *PostgresTarget) Log(event events.Event) error {
|
||||
pt.mu.Lock()
|
||||
defer pt.mu.Unlock()
|
||||
|
||||
// Marshal event data to JSON
|
||||
data, err := json.Marshal(event.Data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal event data: %w", err)
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
INSERT INTO %s (event_type, timestamp, data)
|
||||
VALUES ($1, $2, $3)
|
||||
`, pt.tableName)
|
||||
|
||||
_, err = pt.db.Exec(query, string(event.Type), event.Timestamp, string(data))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to insert event: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the PostgreSQL database connection
|
||||
func (pt *PostgresTarget) Close() error {
|
||||
return pt.db.Close()
|
||||
}
|
||||
Reference in New Issue
Block a user