feat(dbmanager): Database connection Manager

This commit is contained in:
Hein
2026-01-02 16:19:33 +02:00
parent 443a672fcb
commit 6f05f15ff6
23 changed files with 4436 additions and 26 deletions

View File

@@ -0,0 +1,319 @@
# PostgreSQL NOTIFY/LISTEN Support
The `dbmanager` package provides built-in support for PostgreSQL's NOTIFY/LISTEN functionality through the `PostgresListener` type.
## Overview
PostgreSQL NOTIFY/LISTEN is a simple pub/sub mechanism that allows database clients to:
- **LISTEN** on named channels to receive notifications
- **NOTIFY** channels to send messages to all listeners
- Receive asynchronous notifications without polling
## Features
- ✅ Subscribe to multiple channels simultaneously
- ✅ Callback-based notification handling
- ✅ Automatic reconnection on connection loss
- ✅ Automatic resubscription after reconnection
- ✅ Thread-safe operations
- ✅ Panic recovery in notification handlers
- ✅ Dedicated connection for listening (doesn't interfere with queries)
## Usage
### Basic Usage
```go
package main
import (
"context"
"fmt"
"time"
"github.com/bitechdev/ResolveSpec/pkg/dbmanager/providers"
)
func main() {
// Create PostgreSQL provider
cfg := &providers.Config{
Name: "primary",
Type: "postgres",
Host: "localhost",
Port: 5432,
User: "postgres",
Password: "password",
Database: "myapp",
}
provider := providers.NewPostgresProvider()
ctx := context.Background()
if err := provider.Connect(ctx, cfg); err != nil {
panic(err)
}
defer provider.Close()
// Get listener
listener, err := provider.GetListener(ctx)
if err != nil {
panic(err)
}
// Subscribe to a channel
err = listener.Listen("events", func(channel, payload string) {
fmt.Printf("Received on %s: %s\n", channel, payload)
})
if err != nil {
panic(err)
}
// Send a notification
err = listener.Notify(ctx, "events", "Hello, World!")
if err != nil {
panic(err)
}
// Keep the program running
time.Sleep(1 * time.Second)
}
```
### Multiple Channels
```go
listener, _ := provider.GetListener(ctx)
// Listen to different channels with different handlers
listener.Listen("user_events", func(channel, payload string) {
fmt.Printf("User event: %s\n", payload)
})
listener.Listen("order_events", func(channel, payload string) {
fmt.Printf("Order event: %s\n", payload)
})
listener.Listen("payment_events", func(channel, payload string) {
fmt.Printf("Payment event: %s\n", payload)
})
```
### Unsubscribing
```go
// Stop listening to a specific channel
err := listener.Unlisten("user_events")
if err != nil {
fmt.Printf("Failed to unlisten: %v\n", err)
}
```
### Checking Active Channels
```go
// Get list of channels currently being listened to
channels := listener.Channels()
fmt.Printf("Listening to: %v\n", channels)
```
### Checking Connection Status
```go
if listener.IsConnected() {
fmt.Println("Listener is connected")
} else {
fmt.Println("Listener is disconnected")
}
```
## Integration with DBManager
When using the DBManager, you can access the listener through the PostgreSQL provider:
```go
// Initialize DBManager
mgr, err := dbmanager.NewManager(dbmanager.FromConfig(cfg.DBManager))
mgr.Connect(ctx)
defer mgr.Close()
// Get PostgreSQL connection
conn, err := mgr.Get("primary")
// Note: You'll need to cast to the underlying provider type
// This requires exposing the provider through the Connection interface
// or providing a helper method
```
## Use Cases
### Cache Invalidation
```go
listener.Listen("cache_invalidation", func(channel, payload string) {
// Parse the payload to determine what to invalidate
cache.Invalidate(payload)
})
```
### Real-time Updates
```go
listener.Listen("data_updates", func(channel, payload string) {
// Broadcast update to WebSocket clients
websocketBroadcast(payload)
})
```
### Configuration Reload
```go
listener.Listen("config_reload", func(channel, payload string) {
// Reload application configuration
config.Reload()
})
```
### Distributed Locking
```go
listener.Listen("lock_released", func(channel, payload string) {
// Attempt to acquire the lock
tryAcquireLock(payload)
})
```
## Automatic Reconnection
The listener automatically handles connection failures:
1. When a connection error is detected, the listener initiates reconnection
2. Once reconnected, it automatically resubscribes to all previous channels
3. Notification handlers remain active throughout the reconnection process
No manual intervention is required for reconnection.
## Error Handling
### Handler Panics
If a notification handler panics, the panic is recovered and logged. The listener continues to function normally:
```go
listener.Listen("events", func(channel, payload string) {
defer func() {
if r := recover(); r != nil {
log.Printf("Handler panic: %v", r)
}
}()
// Your event processing logic
processEvent(payload)
})
```
### Connection Errors
Connection errors trigger automatic reconnection. Check logs for reconnection events when `EnableLogging` is true.
## Thread Safety
All `PostgresListener` methods are thread-safe and can be called concurrently from multiple goroutines.
## Performance Considerations
1. **Dedicated Connection**: The listener uses a dedicated PostgreSQL connection separate from the query connection pool
2. **Asynchronous Handlers**: Notification handlers run in separate goroutines to avoid blocking
3. **Lightweight**: NOTIFY/LISTEN has minimal overhead compared to polling
## Comparison with Polling
| Feature | NOTIFY/LISTEN | Polling |
|---------|---------------|---------|
| Latency | Low (near real-time) | High (depends on poll interval) |
| Database Load | Minimal | High (constant queries) |
| Scalability | Excellent | Poor |
| Complexity | Simple | Moderate |
## Limitations
1. **PostgreSQL Only**: This feature is specific to PostgreSQL and not available for other databases
2. **No Message Persistence**: Notifications are not stored; if no listener is connected, the message is lost
3. **Payload Limit**: Notification payload is limited to 8000 bytes in PostgreSQL
4. **No Guaranteed Delivery**: If a listener disconnects, in-flight notifications may be lost
## Best Practices
1. **Keep Handlers Fast**: Notification handlers should be quick; for heavy processing, send work to a queue
2. **Use JSON Payloads**: Encode structured data as JSON for easy parsing
3. **Handle Errors Gracefully**: Always recover from panics in handlers
4. **Close Properly**: Always close the provider to ensure the listener is properly shut down
5. **Monitor Connection Status**: Use `IsConnected()` for health checks
## Example: Real-World Application
```go
// Subscribe to various application events
listener, _ := provider.GetListener(ctx)
// User registration events
listener.Listen("user_registered", func(channel, payload string) {
var event UserRegisteredEvent
json.Unmarshal([]byte(payload), &event)
// Send welcome email
sendWelcomeEmail(event.UserID)
// Invalidate user count cache
cache.Delete("user_count")
})
// Order placement events
listener.Listen("order_placed", func(channel, payload string) {
var event OrderPlacedEvent
json.Unmarshal([]byte(payload), &event)
// Notify warehouse system
warehouse.ProcessOrder(event.OrderID)
// Update inventory cache
cache.Invalidate("inventory:" + event.ProductID)
})
// Configuration changes
listener.Listen("config_updated", func(channel, payload string) {
// Reload configuration from database
appConfig.Reload()
})
```
## Triggering Notifications from SQL
You can trigger notifications directly from PostgreSQL triggers or functions:
```sql
-- Example trigger to notify on new user
CREATE OR REPLACE FUNCTION notify_user_registered()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('user_registered',
json_build_object(
'user_id', NEW.id,
'email', NEW.email,
'timestamp', NOW()
)::text
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER user_registered_trigger
AFTER INSERT ON users
FOR EACH ROW
EXECUTE FUNCTION notify_user_registered();
```
## Additional Resources
- [PostgreSQL NOTIFY Documentation](https://www.postgresql.org/docs/current/sql-notify.html)
- [PostgreSQL LISTEN Documentation](https://www.postgresql.org/docs/current/sql-listen.html)
- [pgx Driver Documentation](https://github.com/jackc/pgx)

View File

@@ -0,0 +1,214 @@
package providers
import (
"context"
"database/sql"
"fmt"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"github.com/bitechdev/ResolveSpec/pkg/logger"
)
// MongoProvider implements Provider for MongoDB databases
type MongoProvider struct {
client *mongo.Client
config ConnectionConfig
}
// NewMongoProvider creates a new MongoDB provider
func NewMongoProvider() *MongoProvider {
return &MongoProvider{}
}
// Connect establishes a MongoDB connection
func (p *MongoProvider) Connect(ctx context.Context, cfg ConnectionConfig) error {
// Build DSN
dsn, err := cfg.BuildDSN()
if err != nil {
return fmt.Errorf("failed to build DSN: %w", err)
}
// Create client options
clientOpts := options.Client().ApplyURI(dsn)
// Set connection pool size
if cfg.GetMaxOpenConns() != nil {
maxPoolSize := uint64(*cfg.GetMaxOpenConns())
clientOpts.SetMaxPoolSize(maxPoolSize)
}
if cfg.GetMaxIdleConns() != nil {
minPoolSize := uint64(*cfg.GetMaxIdleConns())
clientOpts.SetMinPoolSize(minPoolSize)
}
// Set timeouts
clientOpts.SetConnectTimeout(cfg.GetConnectTimeout())
if cfg.GetQueryTimeout() > 0 {
clientOpts.SetTimeout(cfg.GetQueryTimeout())
}
// Set read preference if specified
if cfg.GetReadPreference() != "" {
rp, err := parseReadPreference(cfg.GetReadPreference())
if err != nil {
return fmt.Errorf("invalid read preference: %w", err)
}
clientOpts.SetReadPreference(rp)
}
// Connect with retry logic
var client *mongo.Client
var lastErr error
retryAttempts := 3
retryDelay := 1 * time.Second
for attempt := 0; attempt < retryAttempts; attempt++ {
if attempt > 0 {
delay := calculateBackoff(attempt, retryDelay, 10*time.Second)
if cfg.GetEnableLogging() {
logger.Info("Retrying MongoDB connection: attempt=%d/%d, delay=%v", attempt+1, retryAttempts, delay)
}
select {
case <-time.After(delay):
case <-ctx.Done():
return ctx.Err()
}
}
// Create MongoDB client
client, err = mongo.Connect(ctx, clientOpts)
if err != nil {
lastErr = err
if cfg.GetEnableLogging() {
logger.Warn("Failed to connect to MongoDB", "error", err)
}
continue
}
// Ping the database to verify connection
pingCtx, cancel := context.WithTimeout(ctx, cfg.GetConnectTimeout())
err = client.Ping(pingCtx, readpref.Primary())
cancel()
if err != nil {
lastErr = err
_ = client.Disconnect(ctx)
if cfg.GetEnableLogging() {
logger.Warn("Failed to ping MongoDB", "error", err)
}
continue
}
// Connection successful
break
}
if err != nil {
return fmt.Errorf("failed to connect after %d attempts: %w", retryAttempts, lastErr)
}
p.client = client
p.config = cfg
if cfg.GetEnableLogging() {
logger.Info("MongoDB connection established: name=%s, host=%s, database=%s", cfg.GetName(), cfg.GetHost(), cfg.GetDatabase())
}
return nil
}
// Close closes the MongoDB connection
func (p *MongoProvider) Close() error {
if p.client == nil {
return nil
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err := p.client.Disconnect(ctx)
if err != nil {
return fmt.Errorf("failed to close MongoDB connection: %w", err)
}
if p.config.GetEnableLogging() {
logger.Info("MongoDB connection closed: name=%s", p.config.GetName())
}
p.client = nil
return nil
}
// HealthCheck verifies the MongoDB connection is alive
func (p *MongoProvider) HealthCheck(ctx context.Context) error {
if p.client == nil {
return fmt.Errorf("MongoDB client is nil")
}
// Use a short timeout for health checks
healthCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := p.client.Ping(healthCtx, readpref.Primary()); err != nil {
return fmt.Errorf("health check failed: %w", err)
}
return nil
}
// GetNative returns an error for MongoDB (not a SQL database)
func (p *MongoProvider) GetNative() (*sql.DB, error) {
return nil, ErrNotSQLDatabase
}
// GetMongo returns the MongoDB client
func (p *MongoProvider) GetMongo() (*mongo.Client, error) {
if p.client == nil {
return nil, fmt.Errorf("MongoDB client is not initialized")
}
return p.client, nil
}
// Stats returns connection statistics for MongoDB
func (p *MongoProvider) Stats() *ConnectionStats {
if p.client == nil {
return &ConnectionStats{
Name: p.config.GetName(),
Type: "mongodb",
Connected: false,
}
}
// MongoDB doesn't expose detailed connection pool stats like sql.DB
// We return basic stats
return &ConnectionStats{
Name: p.config.GetName(),
Type: "mongodb",
Connected: true,
}
}
// parseReadPreference parses a read preference string into a readpref.ReadPref
func parseReadPreference(rp string) (*readpref.ReadPref, error) {
switch rp {
case "primary":
return readpref.Primary(), nil
case "primaryPreferred":
return readpref.PrimaryPreferred(), nil
case "secondary":
return readpref.Secondary(), nil
case "secondaryPreferred":
return readpref.SecondaryPreferred(), nil
case "nearest":
return readpref.Nearest(), nil
default:
return nil, fmt.Errorf("unknown read preference: %s", rp)
}
}

View File

@@ -0,0 +1,184 @@
package providers
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/microsoft/go-mssqldb" // MSSQL driver
"go.mongodb.org/mongo-driver/mongo"
"github.com/bitechdev/ResolveSpec/pkg/logger"
)
// MSSQLProvider implements Provider for Microsoft SQL Server databases
type MSSQLProvider struct {
db *sql.DB
config ConnectionConfig
}
// NewMSSQLProvider creates a new MSSQL provider
func NewMSSQLProvider() *MSSQLProvider {
return &MSSQLProvider{}
}
// Connect establishes a MSSQL connection
func (p *MSSQLProvider) Connect(ctx context.Context, cfg ConnectionConfig) error {
// Build DSN
dsn, err := cfg.BuildDSN()
if err != nil {
return fmt.Errorf("failed to build DSN: %w", err)
}
// Connect with retry logic
var db *sql.DB
var lastErr error
retryAttempts := 3 // Default retry attempts
retryDelay := 1 * time.Second
for attempt := 0; attempt < retryAttempts; attempt++ {
if attempt > 0 {
delay := calculateBackoff(attempt, retryDelay, 10*time.Second)
if cfg.GetEnableLogging() {
logger.Info("Retrying MSSQL connection: attempt=%d/%d, delay=%v", attempt+1, retryAttempts, delay)
}
select {
case <-time.After(delay):
case <-ctx.Done():
return ctx.Err()
}
}
// Open database connection
db, err = sql.Open("sqlserver", dsn)
if err != nil {
lastErr = err
if cfg.GetEnableLogging() {
logger.Warn("Failed to open MSSQL connection", "error", err)
}
continue
}
// Test the connection with context timeout
connectCtx, cancel := context.WithTimeout(ctx, cfg.GetConnectTimeout())
err = db.PingContext(connectCtx)
cancel()
if err != nil {
lastErr = err
db.Close()
if cfg.GetEnableLogging() {
logger.Warn("Failed to ping MSSQL database", "error", err)
}
continue
}
// Connection successful
break
}
if err != nil {
return fmt.Errorf("failed to connect after %d attempts: %w", retryAttempts, lastErr)
}
// Configure connection pool
if cfg.GetMaxOpenConns() != nil {
db.SetMaxOpenConns(*cfg.GetMaxOpenConns())
}
if cfg.GetMaxIdleConns() != nil {
db.SetMaxIdleConns(*cfg.GetMaxIdleConns())
}
if cfg.GetConnMaxLifetime() != nil {
db.SetConnMaxLifetime(*cfg.GetConnMaxLifetime())
}
if cfg.GetConnMaxIdleTime() != nil {
db.SetConnMaxIdleTime(*cfg.GetConnMaxIdleTime())
}
p.db = db
p.config = cfg
if cfg.GetEnableLogging() {
logger.Info("MSSQL connection established: name=%s, host=%s, database=%s", cfg.GetName(), cfg.GetHost(), cfg.GetDatabase())
}
return nil
}
// Close closes the MSSQL connection
func (p *MSSQLProvider) Close() error {
if p.db == nil {
return nil
}
err := p.db.Close()
if err != nil {
return fmt.Errorf("failed to close MSSQL connection: %w", err)
}
if p.config.GetEnableLogging() {
logger.Info("MSSQL connection closed: name=%s", p.config.GetName())
}
p.db = nil
return nil
}
// HealthCheck verifies the MSSQL connection is alive
func (p *MSSQLProvider) HealthCheck(ctx context.Context) error {
if p.db == nil {
return fmt.Errorf("database connection is nil")
}
// Use a short timeout for health checks
healthCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := p.db.PingContext(healthCtx); err != nil {
return fmt.Errorf("health check failed: %w", err)
}
return nil
}
// GetNative returns the native *sql.DB connection
func (p *MSSQLProvider) GetNative() (*sql.DB, error) {
if p.db == nil {
return nil, fmt.Errorf("database connection is not initialized")
}
return p.db, nil
}
// GetMongo returns an error for MSSQL (not a MongoDB connection)
func (p *MSSQLProvider) GetMongo() (*mongo.Client, error) {
return nil, ErrNotMongoDB
}
// Stats returns connection pool statistics
func (p *MSSQLProvider) Stats() *ConnectionStats {
if p.db == nil {
return &ConnectionStats{
Name: p.config.GetName(),
Type: "mssql",
Connected: false,
}
}
stats := p.db.Stats()
return &ConnectionStats{
Name: p.config.GetName(),
Type: "mssql",
Connected: true,
OpenConnections: stats.OpenConnections,
InUse: stats.InUse,
Idle: stats.Idle,
WaitCount: stats.WaitCount,
WaitDuration: stats.WaitDuration,
MaxIdleClosed: stats.MaxIdleClosed,
MaxLifetimeClosed: stats.MaxLifetimeClosed,
}
}

View File

@@ -0,0 +1,231 @@
package providers
import (
"context"
"database/sql"
"fmt"
"math"
"sync"
"time"
_ "github.com/jackc/pgx/v5/stdlib" // PostgreSQL driver
"go.mongodb.org/mongo-driver/mongo"
"github.com/bitechdev/ResolveSpec/pkg/logger"
)
// PostgresProvider implements Provider for PostgreSQL databases
type PostgresProvider struct {
db *sql.DB
config ConnectionConfig
listener *PostgresListener
mu sync.Mutex
}
// NewPostgresProvider creates a new PostgreSQL provider
func NewPostgresProvider() *PostgresProvider {
return &PostgresProvider{}
}
// Connect establishes a PostgreSQL connection
func (p *PostgresProvider) Connect(ctx context.Context, cfg ConnectionConfig) error {
// Build DSN
dsn, err := cfg.BuildDSN()
if err != nil {
return fmt.Errorf("failed to build DSN: %w", err)
}
// Connect with retry logic
var db *sql.DB
var lastErr error
retryAttempts := 3 // Default retry attempts
retryDelay := 1 * time.Second
for attempt := 0; attempt < retryAttempts; attempt++ {
if attempt > 0 {
delay := calculateBackoff(attempt, retryDelay, 10*time.Second)
if cfg.GetEnableLogging() {
logger.Info("Retrying PostgreSQL connection: attempt=%d/%d, delay=%v", attempt+1, retryAttempts, delay)
}
select {
case <-time.After(delay):
case <-ctx.Done():
return ctx.Err()
}
}
// Open database connection
db, err = sql.Open("pgx", dsn)
if err != nil {
lastErr = err
if cfg.GetEnableLogging() {
logger.Warn("Failed to open PostgreSQL connection", "error", err)
}
continue
}
// Test the connection with context timeout
connectCtx, cancel := context.WithTimeout(ctx, cfg.GetConnectTimeout())
err = db.PingContext(connectCtx)
cancel()
if err != nil {
lastErr = err
db.Close()
if cfg.GetEnableLogging() {
logger.Warn("Failed to ping PostgreSQL database", "error", err)
}
continue
}
// Connection successful
break
}
if err != nil {
return fmt.Errorf("failed to connect after %d attempts: %w", retryAttempts, lastErr)
}
// Configure connection pool
if cfg.GetMaxOpenConns() != nil {
db.SetMaxOpenConns(*cfg.GetMaxOpenConns())
}
if cfg.GetMaxIdleConns() != nil {
db.SetMaxIdleConns(*cfg.GetMaxIdleConns())
}
if cfg.GetConnMaxLifetime() != nil {
db.SetConnMaxLifetime(*cfg.GetConnMaxLifetime())
}
if cfg.GetConnMaxIdleTime() != nil {
db.SetConnMaxIdleTime(*cfg.GetConnMaxIdleTime())
}
p.db = db
p.config = cfg
if cfg.GetEnableLogging() {
logger.Info("PostgreSQL connection established: name=%s, host=%s, database=%s", cfg.GetName(), cfg.GetHost(), cfg.GetDatabase())
}
return nil
}
// Close closes the PostgreSQL connection
func (p *PostgresProvider) Close() error {
// Close listener if it exists
p.mu.Lock()
if p.listener != nil {
if err := p.listener.Close(); err != nil {
p.mu.Unlock()
return fmt.Errorf("failed to close listener: %w", err)
}
p.listener = nil
}
p.mu.Unlock()
if p.db == nil {
return nil
}
err := p.db.Close()
if err != nil {
return fmt.Errorf("failed to close PostgreSQL connection: %w", err)
}
if p.config.GetEnableLogging() {
logger.Info("PostgreSQL connection closed: name=%s", p.config.GetName())
}
p.db = nil
return nil
}
// HealthCheck verifies the PostgreSQL connection is alive
func (p *PostgresProvider) HealthCheck(ctx context.Context) error {
if p.db == nil {
return fmt.Errorf("database connection is nil")
}
// Use a short timeout for health checks
healthCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := p.db.PingContext(healthCtx); err != nil {
return fmt.Errorf("health check failed: %w", err)
}
return nil
}
// GetNative returns the native *sql.DB connection
func (p *PostgresProvider) GetNative() (*sql.DB, error) {
if p.db == nil {
return nil, fmt.Errorf("database connection is not initialized")
}
return p.db, nil
}
// GetMongo returns an error for PostgreSQL (not a MongoDB connection)
func (p *PostgresProvider) GetMongo() (*mongo.Client, error) {
return nil, ErrNotMongoDB
}
// Stats returns connection pool statistics
func (p *PostgresProvider) Stats() *ConnectionStats {
if p.db == nil {
return &ConnectionStats{
Name: p.config.GetName(),
Type: "postgres",
Connected: false,
}
}
stats := p.db.Stats()
return &ConnectionStats{
Name: p.config.GetName(),
Type: "postgres",
Connected: true,
OpenConnections: stats.OpenConnections,
InUse: stats.InUse,
Idle: stats.Idle,
WaitCount: stats.WaitCount,
WaitDuration: stats.WaitDuration,
MaxIdleClosed: stats.MaxIdleClosed,
MaxLifetimeClosed: stats.MaxLifetimeClosed,
}
}
// GetListener returns a PostgreSQL listener for NOTIFY/LISTEN functionality
// The listener is lazily initialized on first call and reused for subsequent calls
func (p *PostgresProvider) GetListener(ctx context.Context) (*PostgresListener, error) {
p.mu.Lock()
defer p.mu.Unlock()
// Return existing listener if already created
if p.listener != nil {
return p.listener, nil
}
// Create new listener
listener := NewPostgresListener(p.config)
// Connect the listener
if err := listener.Connect(ctx); err != nil {
return nil, fmt.Errorf("failed to connect listener: %w", err)
}
p.listener = listener
return p.listener, nil
}
// calculateBackoff calculates exponential backoff delay
func calculateBackoff(attempt int, initial, maxDelay time.Duration) time.Duration {
delay := initial * time.Duration(math.Pow(2, float64(attempt)))
if delay > maxDelay {
delay = maxDelay
}
return delay
}

View File

@@ -0,0 +1,401 @@
package providers
import (
"context"
"fmt"
"sync"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/bitechdev/ResolveSpec/pkg/logger"
)
// NotificationHandler is called when a notification is received
type NotificationHandler func(channel string, payload string)
// PostgresListener manages PostgreSQL LISTEN/NOTIFY functionality
type PostgresListener struct {
config ConnectionConfig
conn *pgx.Conn
// Channel subscriptions
channels map[string]NotificationHandler
mu sync.RWMutex
// Lifecycle management
ctx context.Context
cancel context.CancelFunc
closed bool
closeMu sync.Mutex
reconnectC chan struct{}
}
// NewPostgresListener creates a new PostgreSQL listener
func NewPostgresListener(cfg ConnectionConfig) *PostgresListener {
ctx, cancel := context.WithCancel(context.Background())
return &PostgresListener{
config: cfg,
channels: make(map[string]NotificationHandler),
ctx: ctx,
cancel: cancel,
reconnectC: make(chan struct{}, 1),
}
}
// Connect establishes a dedicated connection for listening
func (l *PostgresListener) Connect(ctx context.Context) error {
dsn, err := l.config.BuildDSN()
if err != nil {
return fmt.Errorf("failed to build DSN: %w", err)
}
// Parse connection config
connConfig, err := pgx.ParseConfig(dsn)
if err != nil {
return fmt.Errorf("failed to parse connection config: %w", err)
}
// Connect with retry logic
var conn *pgx.Conn
var lastErr error
retryAttempts := 3
retryDelay := 1 * time.Second
for attempt := 0; attempt < retryAttempts; attempt++ {
if attempt > 0 {
delay := calculateBackoff(attempt, retryDelay, 10*time.Second)
if l.config.GetEnableLogging() {
logger.Info("Retrying PostgreSQL listener connection: attempt=%d/%d, delay=%v", attempt+1, retryAttempts, delay)
}
select {
case <-time.After(delay):
case <-ctx.Done():
return ctx.Err()
}
}
conn, err = pgx.ConnectConfig(ctx, connConfig)
if err != nil {
lastErr = err
if l.config.GetEnableLogging() {
logger.Warn("Failed to connect PostgreSQL listener", "error", err)
}
continue
}
// Test the connection
if err = conn.Ping(ctx); err != nil {
lastErr = err
conn.Close(ctx)
if l.config.GetEnableLogging() {
logger.Warn("Failed to ping PostgreSQL listener", "error", err)
}
continue
}
// Connection successful
break
}
if err != nil {
return fmt.Errorf("failed to connect listener after %d attempts: %w", retryAttempts, lastErr)
}
l.mu.Lock()
l.conn = conn
l.mu.Unlock()
// Start notification handler
go l.handleNotifications()
// Start reconnection handler
go l.handleReconnection()
if l.config.GetEnableLogging() {
logger.Info("PostgreSQL listener connected: name=%s", l.config.GetName())
}
return nil
}
// Listen subscribes to a PostgreSQL notification channel
func (l *PostgresListener) Listen(channel string, handler NotificationHandler) error {
l.closeMu.Lock()
if l.closed {
l.closeMu.Unlock()
return fmt.Errorf("listener is closed")
}
l.closeMu.Unlock()
l.mu.Lock()
defer l.mu.Unlock()
if l.conn == nil {
return fmt.Errorf("listener connection is not initialized")
}
// Execute LISTEN command
_, err := l.conn.Exec(l.ctx, fmt.Sprintf("LISTEN %s", pgx.Identifier{channel}.Sanitize()))
if err != nil {
return fmt.Errorf("failed to listen on channel %s: %w", channel, err)
}
// Store the handler
l.channels[channel] = handler
if l.config.GetEnableLogging() {
logger.Info("Listening on channel: name=%s, channel=%s", l.config.GetName(), channel)
}
return nil
}
// Unlisten unsubscribes from a PostgreSQL notification channel
func (l *PostgresListener) Unlisten(channel string) error {
l.closeMu.Lock()
if l.closed {
l.closeMu.Unlock()
return fmt.Errorf("listener is closed")
}
l.closeMu.Unlock()
l.mu.Lock()
defer l.mu.Unlock()
if l.conn == nil {
return fmt.Errorf("listener connection is not initialized")
}
// Execute UNLISTEN command
_, err := l.conn.Exec(l.ctx, fmt.Sprintf("UNLISTEN %s", pgx.Identifier{channel}.Sanitize()))
if err != nil {
return fmt.Errorf("failed to unlisten from channel %s: %w", channel, err)
}
// Remove the handler
delete(l.channels, channel)
if l.config.GetEnableLogging() {
logger.Info("Unlistened from channel: name=%s, channel=%s", l.config.GetName(), channel)
}
return nil
}
// Notify sends a notification to a PostgreSQL channel
func (l *PostgresListener) Notify(ctx context.Context, channel string, payload string) error {
l.closeMu.Lock()
if l.closed {
l.closeMu.Unlock()
return fmt.Errorf("listener is closed")
}
l.closeMu.Unlock()
l.mu.RLock()
conn := l.conn
l.mu.RUnlock()
if conn == nil {
return fmt.Errorf("listener connection is not initialized")
}
// Execute NOTIFY command
_, err := conn.Exec(ctx, "SELECT pg_notify($1, $2)", channel, payload)
if err != nil {
return fmt.Errorf("failed to notify channel %s: %w", channel, err)
}
return nil
}
// Close closes the listener and all subscriptions
func (l *PostgresListener) Close() error {
l.closeMu.Lock()
if l.closed {
l.closeMu.Unlock()
return nil
}
l.closed = true
l.closeMu.Unlock()
// Cancel context to stop background goroutines
l.cancel()
l.mu.Lock()
defer l.mu.Unlock()
if l.conn == nil {
return nil
}
// Unlisten from all channels
for channel := range l.channels {
_, _ = l.conn.Exec(context.Background(), fmt.Sprintf("UNLISTEN %s", pgx.Identifier{channel}.Sanitize()))
}
// Close connection
err := l.conn.Close(context.Background())
if err != nil {
return fmt.Errorf("failed to close listener connection: %w", err)
}
l.conn = nil
l.channels = make(map[string]NotificationHandler)
if l.config.GetEnableLogging() {
logger.Info("PostgreSQL listener closed: name=%s", l.config.GetName())
}
return nil
}
// handleNotifications processes incoming notifications
func (l *PostgresListener) handleNotifications() {
for {
select {
case <-l.ctx.Done():
return
default:
}
l.mu.RLock()
conn := l.conn
l.mu.RUnlock()
if conn == nil {
// Connection not available, wait for reconnection
time.Sleep(100 * time.Millisecond)
continue
}
// Wait for notification with timeout
ctx, cancel := context.WithTimeout(l.ctx, 5*time.Second)
notification, err := conn.WaitForNotification(ctx)
cancel()
if err != nil {
// Check if context was cancelled
if l.ctx.Err() != nil {
return
}
// Check if it's a connection error
if pgconn.Timeout(err) {
// Timeout is normal, continue waiting
continue
}
// Connection error, trigger reconnection
if l.config.GetEnableLogging() {
logger.Warn("Notification error, triggering reconnection", "error", err)
}
select {
case l.reconnectC <- struct{}{}:
default:
}
time.Sleep(1 * time.Second)
continue
}
// Process notification
l.mu.RLock()
handler, exists := l.channels[notification.Channel]
l.mu.RUnlock()
if exists && handler != nil {
// Call handler in a goroutine to avoid blocking
go func(ch, payload string) {
defer func() {
if r := recover(); r != nil {
if l.config.GetEnableLogging() {
logger.Error("Notification handler panic: channel=%s, error=%v", ch, r)
}
}
}()
handler(ch, payload)
}(notification.Channel, notification.Payload)
}
}
}
// handleReconnection manages automatic reconnection
func (l *PostgresListener) handleReconnection() {
for {
select {
case <-l.ctx.Done():
return
case <-l.reconnectC:
if l.config.GetEnableLogging() {
logger.Info("Attempting to reconnect listener: name=%s", l.config.GetName())
}
// Close existing connection
l.mu.Lock()
if l.conn != nil {
l.conn.Close(context.Background())
l.conn = nil
}
// Save current subscriptions
channels := make(map[string]NotificationHandler)
for ch, handler := range l.channels {
channels[ch] = handler
}
l.mu.Unlock()
// Attempt reconnection
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
err := l.Connect(ctx)
cancel()
if err != nil {
if l.config.GetEnableLogging() {
logger.Error("Failed to reconnect listener: name=%s, error=%v", l.config.GetName(), err)
}
// Retry after delay
time.Sleep(5 * time.Second)
select {
case l.reconnectC <- struct{}{}:
default:
}
continue
}
// Resubscribe to all channels
for channel, handler := range channels {
if err := l.Listen(channel, handler); err != nil {
if l.config.GetEnableLogging() {
logger.Error("Failed to resubscribe to channel: name=%s, channel=%s, error=%v", l.config.GetName(), channel, err)
}
}
}
if l.config.GetEnableLogging() {
logger.Info("Listener reconnected successfully: name=%s", l.config.GetName())
}
}
}
}
// IsConnected returns true if the listener is connected
func (l *PostgresListener) IsConnected() bool {
l.mu.RLock()
defer l.mu.RUnlock()
return l.conn != nil
}
// Channels returns the list of channels currently being listened to
func (l *PostgresListener) Channels() []string {
l.mu.RLock()
defer l.mu.RUnlock()
channels := make([]string, 0, len(l.channels))
for ch := range l.channels {
channels = append(channels, ch)
}
return channels
}

View File

@@ -0,0 +1,228 @@
package providers_test
import (
"context"
"fmt"
"time"
"github.com/bitechdev/ResolveSpec/pkg/dbmanager"
"github.com/bitechdev/ResolveSpec/pkg/dbmanager/providers"
)
// ExamplePostgresListener_basic demonstrates basic LISTEN/NOTIFY usage
func ExamplePostgresListener_basic() {
// Create a connection config
cfg := &dbmanager.ConnectionConfig{
Name: "example",
Type: dbmanager.DatabaseTypePostgreSQL,
Host: "localhost",
Port: 5432,
User: "postgres",
Password: "password",
Database: "testdb",
ConnectTimeout: 10 * time.Second,
EnableLogging: true,
}
// Create and connect PostgreSQL provider
provider := providers.NewPostgresProvider()
ctx := context.Background()
if err := provider.Connect(ctx, cfg); err != nil {
panic(fmt.Sprintf("Failed to connect: %v", err))
}
defer provider.Close()
// Get listener
listener, err := provider.GetListener(ctx)
if err != nil {
panic(fmt.Sprintf("Failed to get listener: %v", err))
}
// Subscribe to a channel with a handler
err = listener.Listen("user_events", func(channel, payload string) {
fmt.Printf("Received notification on %s: %s\n", channel, payload)
})
if err != nil {
panic(fmt.Sprintf("Failed to listen: %v", err))
}
// Send a notification
err = listener.Notify(ctx, "user_events", `{"event":"user_created","user_id":123}`)
if err != nil {
panic(fmt.Sprintf("Failed to notify: %v", err))
}
// Wait for notification to be processed
time.Sleep(100 * time.Millisecond)
// Unsubscribe from the channel
if err := listener.Unlisten("user_events"); err != nil {
panic(fmt.Sprintf("Failed to unlisten: %v", err))
}
}
// ExamplePostgresListener_multipleChannels demonstrates listening to multiple channels
func ExamplePostgresListener_multipleChannels() {
cfg := &dbmanager.ConnectionConfig{
Name: "example",
Type: dbmanager.DatabaseTypePostgreSQL,
Host: "localhost",
Port: 5432,
User: "postgres",
Password: "password",
Database: "testdb",
ConnectTimeout: 10 * time.Second,
EnableLogging: false,
}
provider := providers.NewPostgresProvider()
ctx := context.Background()
if err := provider.Connect(ctx, cfg); err != nil {
panic(fmt.Sprintf("Failed to connect: %v", err))
}
defer provider.Close()
listener, err := provider.GetListener(ctx)
if err != nil {
panic(fmt.Sprintf("Failed to get listener: %v", err))
}
// Listen to multiple channels
channels := []string{"orders", "payments", "notifications"}
for _, ch := range channels {
channel := ch // Capture for closure
err := listener.Listen(channel, func(ch, payload string) {
fmt.Printf("[%s] %s\n", ch, payload)
})
if err != nil {
panic(fmt.Sprintf("Failed to listen on %s: %v", channel, err))
}
}
// Send notifications to different channels
listener.Notify(ctx, "orders", "New order #12345")
listener.Notify(ctx, "payments", "Payment received $99.99")
listener.Notify(ctx, "notifications", "Welcome email sent")
// Wait for notifications
time.Sleep(200 * time.Millisecond)
// Check active channels
activeChannels := listener.Channels()
fmt.Printf("Listening to %d channels: %v\n", len(activeChannels), activeChannels)
}
// ExamplePostgresListener_withDBManager demonstrates usage with DBManager
func ExamplePostgresListener_withDBManager() {
// This example shows how to use the listener with the full DBManager
// Assume we have a DBManager instance and get a connection
// conn, _ := dbMgr.Get("primary")
// Get the underlying provider (this would need to be exposed via the Connection interface)
// For now, this is a conceptual example
ctx := context.Background()
// Create provider directly for demonstration
cfg := &dbmanager.ConnectionConfig{
Name: "primary",
Type: dbmanager.DatabaseTypePostgreSQL,
Host: "localhost",
Port: 5432,
User: "postgres",
Password: "password",
Database: "myapp",
ConnectTimeout: 10 * time.Second,
}
provider := providers.NewPostgresProvider()
if err := provider.Connect(ctx, cfg); err != nil {
panic(err)
}
defer provider.Close()
// Get listener
listener, err := provider.GetListener(ctx)
if err != nil {
panic(err)
}
// Subscribe to application events
listener.Listen("cache_invalidation", func(channel, payload string) {
fmt.Printf("Cache invalidation request: %s\n", payload)
// Handle cache invalidation logic here
})
listener.Listen("config_reload", func(channel, payload string) {
fmt.Printf("Configuration reload request: %s\n", payload)
// Handle configuration reload logic here
})
// Simulate receiving notifications
listener.Notify(ctx, "cache_invalidation", "user:123")
listener.Notify(ctx, "config_reload", "database")
time.Sleep(100 * time.Millisecond)
}
// ExamplePostgresListener_errorHandling demonstrates error handling and reconnection
func ExamplePostgresListener_errorHandling() {
cfg := &dbmanager.ConnectionConfig{
Name: "example",
Type: dbmanager.DatabaseTypePostgreSQL,
Host: "localhost",
Port: 5432,
User: "postgres",
Password: "password",
Database: "testdb",
ConnectTimeout: 10 * time.Second,
EnableLogging: true,
}
provider := providers.NewPostgresProvider()
ctx := context.Background()
if err := provider.Connect(ctx, cfg); err != nil {
panic(fmt.Sprintf("Failed to connect: %v", err))
}
defer provider.Close()
listener, err := provider.GetListener(ctx)
if err != nil {
panic(fmt.Sprintf("Failed to get listener: %v", err))
}
// The listener automatically reconnects if the connection is lost
// Subscribe with error handling in the callback
err = listener.Listen("critical_events", func(channel, payload string) {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Handler panic recovered: %v\n", r)
}
}()
// Process the event
fmt.Printf("Processing critical event: %s\n", payload)
// If processing fails, the panic will be caught by the defer above
// The listener will continue to function normally
})
if err != nil {
fmt.Printf("Failed to listen: %v\n", err)
return
}
// Check if listener is connected
if listener.IsConnected() {
fmt.Println("Listener is connected and ready")
}
// Send a notification
listener.Notify(ctx, "critical_events", "system_alert")
time.Sleep(100 * time.Millisecond)
}

View File

@@ -0,0 +1,83 @@
package providers
import (
"context"
"database/sql"
"errors"
"time"
"go.mongodb.org/mongo-driver/mongo"
)
// Common errors
var (
// ErrNotSQLDatabase is returned when attempting SQL operations on a non-SQL database
ErrNotSQLDatabase = errors.New("not a SQL database")
// ErrNotMongoDB is returned when attempting MongoDB operations on a non-MongoDB connection
ErrNotMongoDB = errors.New("not a MongoDB connection")
)
// ConnectionStats contains statistics about a database connection
type ConnectionStats struct {
Name string
Type string // Database type as string to avoid circular dependency
Connected bool
LastHealthCheck time.Time
HealthCheckStatus string
// SQL connection pool stats
OpenConnections int
InUse int
Idle int
WaitCount int64
WaitDuration time.Duration
MaxIdleClosed int64
MaxLifetimeClosed int64
}
// ConnectionConfig is a minimal interface for configuration
// The actual implementation is in dbmanager package
type ConnectionConfig interface {
BuildDSN() (string, error)
GetName() string
GetType() string
GetHost() string
GetPort() int
GetUser() string
GetPassword() string
GetDatabase() string
GetFilePath() string
GetConnectTimeout() time.Duration
GetQueryTimeout() time.Duration
GetEnableLogging() bool
GetEnableMetrics() bool
GetMaxOpenConns() *int
GetMaxIdleConns() *int
GetConnMaxLifetime() *time.Duration
GetConnMaxIdleTime() *time.Duration
GetReadPreference() string
}
// Provider creates and manages the underlying database connection
type Provider interface {
// Connect establishes the database connection
Connect(ctx context.Context, cfg ConnectionConfig) error
// Close closes the connection
Close() error
// HealthCheck verifies the connection is alive
HealthCheck(ctx context.Context) error
// GetNative returns the native *sql.DB (SQL databases only)
// Returns an error for non-SQL databases
GetNative() (*sql.DB, error)
// GetMongo returns the MongoDB client (MongoDB only)
// Returns an error for non-MongoDB databases
GetMongo() (*mongo.Client, error)
// Stats returns connection statistics
Stats() *ConnectionStats
}

View File

@@ -0,0 +1,177 @@
package providers
import (
"context"
"database/sql"
"fmt"
"time"
_ "github.com/glebarez/sqlite" // Pure Go SQLite driver
"go.mongodb.org/mongo-driver/mongo"
"github.com/bitechdev/ResolveSpec/pkg/logger"
)
// SQLiteProvider implements Provider for SQLite databases
type SQLiteProvider struct {
db *sql.DB
config ConnectionConfig
}
// NewSQLiteProvider creates a new SQLite provider
func NewSQLiteProvider() *SQLiteProvider {
return &SQLiteProvider{}
}
// Connect establishes a SQLite connection
func (p *SQLiteProvider) Connect(ctx context.Context, cfg ConnectionConfig) error {
// Build DSN
dsn, err := cfg.BuildDSN()
if err != nil {
return fmt.Errorf("failed to build DSN: %w", err)
}
// Open database connection
db, err := sql.Open("sqlite", dsn)
if err != nil {
return fmt.Errorf("failed to open SQLite connection: %w", err)
}
// Test the connection with context timeout
connectCtx, cancel := context.WithTimeout(ctx, cfg.GetConnectTimeout())
err = db.PingContext(connectCtx)
cancel()
if err != nil {
db.Close()
return fmt.Errorf("failed to ping SQLite database: %w", err)
}
// Configure connection pool
// Note: SQLite works best with MaxOpenConns=1 for write operations
// but can handle multiple readers
if cfg.GetMaxOpenConns() != nil {
db.SetMaxOpenConns(*cfg.GetMaxOpenConns())
} else {
// Default to 1 for SQLite to avoid "database is locked" errors
db.SetMaxOpenConns(1)
}
if cfg.GetMaxIdleConns() != nil {
db.SetMaxIdleConns(*cfg.GetMaxIdleConns())
}
if cfg.GetConnMaxLifetime() != nil {
db.SetConnMaxLifetime(*cfg.GetConnMaxLifetime())
}
if cfg.GetConnMaxIdleTime() != nil {
db.SetConnMaxIdleTime(*cfg.GetConnMaxIdleTime())
}
// Enable WAL mode for better concurrent access
_, err = db.ExecContext(ctx, "PRAGMA journal_mode=WAL")
if err != nil {
if cfg.GetEnableLogging() {
logger.Warn("Failed to enable WAL mode for SQLite", "error", err)
}
// Don't fail connection if WAL mode cannot be enabled
}
// Set busy timeout to handle locked database
_, err = db.ExecContext(ctx, "PRAGMA busy_timeout=5000")
if err != nil {
if cfg.GetEnableLogging() {
logger.Warn("Failed to set busy timeout for SQLite", "error", err)
}
}
p.db = db
p.config = cfg
if cfg.GetEnableLogging() {
logger.Info("SQLite connection established: name=%s, filepath=%s", cfg.GetName(), cfg.GetFilePath())
}
return nil
}
// Close closes the SQLite connection
func (p *SQLiteProvider) Close() error {
if p.db == nil {
return nil
}
err := p.db.Close()
if err != nil {
return fmt.Errorf("failed to close SQLite connection: %w", err)
}
if p.config.GetEnableLogging() {
logger.Info("SQLite connection closed: name=%s", p.config.GetName())
}
p.db = nil
return nil
}
// HealthCheck verifies the SQLite connection is alive
func (p *SQLiteProvider) HealthCheck(ctx context.Context) error {
if p.db == nil {
return fmt.Errorf("database connection is nil")
}
// Use a short timeout for health checks
healthCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// Execute a simple query to verify the database is accessible
var result int
err := p.db.QueryRowContext(healthCtx, "SELECT 1").Scan(&result)
if err != nil {
return fmt.Errorf("health check failed: %w", err)
}
if result != 1 {
return fmt.Errorf("health check returned unexpected result: %d", result)
}
return nil
}
// GetNative returns the native *sql.DB connection
func (p *SQLiteProvider) GetNative() (*sql.DB, error) {
if p.db == nil {
return nil, fmt.Errorf("database connection is not initialized")
}
return p.db, nil
}
// GetMongo returns an error for SQLite (not a MongoDB connection)
func (p *SQLiteProvider) GetMongo() (*mongo.Client, error) {
return nil, ErrNotMongoDB
}
// Stats returns connection pool statistics
func (p *SQLiteProvider) Stats() *ConnectionStats {
if p.db == nil {
return &ConnectionStats{
Name: p.config.GetName(),
Type: "sqlite",
Connected: false,
}
}
stats := p.db.Stats()
return &ConnectionStats{
Name: p.config.GetName(),
Type: "sqlite",
Connected: true,
OpenConnections: stats.OpenConnections,
InUse: stats.InUse,
Idle: stats.Idle,
WaitCount: stats.WaitCount,
WaitDuration: stats.WaitDuration,
MaxIdleClosed: stats.MaxIdleClosed,
MaxLifetimeClosed: stats.MaxLifetimeClosed,
}
}