Files
pgsql-broker/pkg/broker/database_instance.go
Hein 3e64f7ae2a
Some checks failed
Integration Tests / integration-test (push) Failing after -23m59s
feat(testing): add full integration test suite
This commit introduces a comprehensive integration test suite for the pgsql-broker.

The test suite includes:
- A Docker/Podman environment for running a PostgreSQL database.
- Integration tests that cover the broker's lifecycle, including job creation, execution, and instance management.
- A GitHub Actions workflow to automate the execution of all tests on push and pull requests.
- A dedicated test configuration file and helper test files.

refactor(worker): fix job processing transaction
- The worker's job processing now uses a single transaction to fetch and run a job, resolving a race condition where jobs were not in the 'running' state when being executed.
- The broker's database instance registration is now more robust, handling cases where another instance is already active.

The Makefile has been significantly updated to orchestrate the entire test flow, including setting up the database, starting/stopping the broker, and running unit and integration tests separately.
2026-01-02 23:08:17 +02:00

373 lines
10 KiB
Go

package broker
import (
"context"
"database/sql" // Import sql package
"encoding/json"
"fmt"
"os"
"sync"
"time"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/config"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/models"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/queue"
)
// DatabaseInstance is the broker's runtime state for a single database: its
// identity as registered in that database, the adapter used to reach it, and
// the job queues it feeds from notifications.
type DatabaseInstance struct {
// ID is the instance ID assigned during registration; zero until
// registerInstance succeeds.
ID int64
// Name is "<broker name>-<database name>", built in NewDatabaseInstance.
Name string
// DatabaseName is the configured name of the database this instance serves.
DatabaseName string
// Hostname is the OS hostname, or "unknown" when it cannot be determined.
Hostname string
// PID is the process ID of the broker process.
PID int
// Version is the broker version string supplied by the caller.
Version string
// config is the broker-wide configuration.
config *config.Config
// dbConfig is the configuration of this particular database.
dbConfig *config.DatabaseConfig
// db is the adapter used for every database call made by this instance.
db adapter.DBAdapter
// logger is tagged with the component and database name.
logger adapter.Logger
// queues maps a queue number to its running queue.
queues map[int]*queue.Queue
// queuesMu guards queues.
queuesMu sync.RWMutex
// ctx is the parent context; it is passed to database calls and its
// cancellation ends the ping routine.
ctx context.Context
// shutdown is set once by Stop.
shutdown bool
// shutdownMu guards shutdown.
shutdownMu sync.RWMutex
// jobsHandled is reported by ping and GetStats.
// NOTE(review): nothing in this file increments it — confirm where it is updated.
jobsHandled int64
// startTime is the construction time, used to compute uptime.
startTime time.Time
}
// NewDatabaseInstance builds an instance for the database described by dbCfg.
// Nothing is connected or registered here; call Start for that. The instance
// name is "<broker name>-<database name>", and the hostname falls back to
// "unknown" when the OS cannot report one. The returned error is always nil.
func NewDatabaseInstance(cfg *config.Config, dbCfg *config.DatabaseConfig, db adapter.DBAdapter, logger adapter.Logger, version string, parentCtx context.Context) (*DatabaseInstance, error) {
	host, hostErr := os.Hostname()
	if hostErr != nil {
		host = "unknown"
	}

	// Tag every log line with the component and the database it belongs to.
	scopedLogger := logger.With("component", "database-instance").With("database", dbCfg.Name)

	return &DatabaseInstance{
		// Identity reported to the database on registration.
		Name:         fmt.Sprintf("%s-%s", cfg.Broker.Name, dbCfg.Name),
		DatabaseName: dbCfg.Name,
		Hostname:     host,
		PID:          os.Getpid(),
		Version:      version,

		// Collaborators and runtime state.
		config:    cfg,
		dbConfig:  dbCfg,
		db:        db,
		logger:    scopedLogger,
		queues:    make(map[int]*queue.Queue),
		ctx:       parentCtx,
		startTime: time.Now(),
	}, nil
}
// Start brings the instance up in order: connect to the database, register
// the instance, start the queues, subscribe to notifications, and launch the
// background ping goroutine.
//
// If a step after the connection fails, whatever was acquired so far is
// released again (see abortStart), so a failed Start does not leave an open
// connection, a registered instance row, or running queues behind. The
// returned error wraps the error of the step that failed.
func (i *DatabaseInstance) Start() error {
	i.logger.Info("starting database instance", "name", i.Name, "hostname", i.Hostname, "pid", i.PID)

	// Connect to database
	if err := i.db.Connect(i.ctx); err != nil {
		return fmt.Errorf("failed to connect to database: %w", err)
	}

	// Register instance in database
	if err := i.registerInstance(); err != nil {
		i.abortStart(false)
		return fmt.Errorf("failed to register instance: %w", err)
	}
	i.logger.Info("database instance registered", "id", i.ID)

	// Start queues
	if err := i.startQueues(); err != nil {
		i.abortStart(true)
		return fmt.Errorf("failed to start queues: %w", err)
	}

	// Start listening for notifications
	if err := i.startListener(); err != nil {
		i.abortStart(true)
		return fmt.Errorf("failed to start listener: %w", err)
	}

	// Start ping routine
	go i.pingRoutine()

	i.logger.Info("database instance started successfully")
	return nil
}

// abortStart unwinds a partially completed Start: it stops and forgets every
// queue currently held, marks the instance as shut down in the database when
// registered is true, and closes the database connection. Cleanup failures are
// logged rather than returned so the caller still sees the original error.
func (i *DatabaseInstance) abortStart(registered bool) {
	i.queuesMu.Lock()
	for num, q := range i.queues {
		if err := q.Stop(); err != nil {
			i.logger.Error("failed to stop queue", "number", num, "error", err)
		}
		// Drop the entry so a later Stop does not stop the same queue twice.
		delete(i.queues, num)
	}
	i.queuesMu.Unlock()

	if registered {
		if err := i.shutdownInstance(); err != nil {
			i.logger.Error("failed to shutdown instance in database", "error", err)
		}
	}

	if err := i.db.Close(); err != nil {
		i.logger.Error("failed to close database", "error", err)
	}
}
// Stop shuts the instance down. Only the first call does any work; later
// calls return immediately. Queues are stopped first, then the instance is
// marked as shut down in the database, and finally the connection is closed.
// Failures along the way are logged and do not abort the remaining steps;
// the returned error is always nil.
func (i *DatabaseInstance) Stop() error {
	// Claim the shutdown: whoever flips the flag performs the teardown.
	alreadyStopped := func() bool {
		i.shutdownMu.Lock()
		defer i.shutdownMu.Unlock()
		if i.shutdown {
			return true
		}
		i.shutdown = true
		return false
	}()
	if alreadyStopped {
		return nil
	}

	i.logger.Info("stopping database instance")

	// Stop every queue while holding the write lock on the queue map.
	func() {
		i.queuesMu.Lock()
		defer i.queuesMu.Unlock()
		for number, q := range i.queues {
			i.logger.Info("stopping queue", "number", number)
			stopErr := q.Stop()
			if stopErr != nil {
				i.logger.Error("failed to stop queue", "number", number, "error", stopErr)
			}
		}
	}()

	// Record the shutdown in the database.
	dbErr := i.shutdownInstance()
	if dbErr != nil {
		i.logger.Error("failed to shutdown instance in database", "error", dbErr)
	}

	// Release the database connection.
	closeErr := i.db.Close()
	if closeErr != nil {
		i.logger.Error("failed to close database", "error", closeErr)
	}

	i.logger.Info("database instance stopped")
	return nil
}
// registerInstance calls broker_register_instance and stores the resulting
// instance ID in i.ID.
//
// Outcomes, by the function's p_retval:
//   - 3: another instance is reported active; the ID of the most recently
//     started active row with the same name and hostname is adopted instead.
//   - any other positive value: registration failed; p_errmsg is returned.
//   - otherwise: p_instance_id is used, and a NULL ID is treated as an error.
//
// When debug mode is enabled in the broker configuration, the contents of
// broker_queueinstance are additionally written to the debug log.
func (i *DatabaseInstance) registerInstance() error {
	var (
		retval int
		errmsg string
		// Scanned as nullable so a NULL p_instance_id does not fail the Scan.
		instanceID sql.NullInt64
	)

	i.logger.Debug("registering instance", "name", i.Name, "hostname", i.Hostname, "pid", i.PID, "version", i.Version, "queue_count", i.dbConfig.QueueCount)

	err := i.db.QueryRow(i.ctx,
		"SELECT p_retval, p_errmsg, p_instance_id FROM broker_register_instance($1, $2, $3, $4, $5)",
		i.Name, i.Hostname, i.PID, i.Version, i.dbConfig.QueueCount,
	).Scan(&retval, &errmsg, &instanceID)
	if err != nil {
		i.logger.Error("query error during instance registration", "error", err)
		return fmt.Errorf("query error: %w", err)
	}

	switch {
	case retval == 3:
		return i.adoptActiveInstance(errmsg)
	case retval > 0:
		i.logger.Error("broker_register_instance error", "retval", retval, "errmsg", errmsg)
		return fmt.Errorf("broker_register_instance error: %s", errmsg)
	case !instanceID.Valid:
		// Success was reported but no ID came back; without an ID the
		// instance cannot ping or shut down, so treat it as a failure.
		i.logger.Error("broker_register_instance returned success but no instance ID", "retval", retval, "errmsg", errmsg)
		return fmt.Errorf("broker_register_instance returned success but no instance ID")
	}

	i.ID = instanceID.Int64
	i.logger.Info("registered new instance", "id", i.ID)

	// The table dump is a diagnostic aid only; skip the extra query unless
	// debug mode is switched on.
	if i.config.Broker.EnableDebug {
		i.logRegisteredInstances()
	}
	return nil
}

// adoptActiveInstance handles the "already active" registration result by
// looking up the newest active row with this instance's name and hostname and
// taking over its ID. errmsg is the message returned by the database function.
func (i *DatabaseInstance) adoptActiveInstance(errmsg string) error {
	i.logger.Warn("another broker instance is already active, attempting to retrieve ID", "error", errmsg)

	var activeID int64
	err := i.db.QueryRow(i.ctx,
		"SELECT id_broker_queueinstance FROM broker_queueinstance WHERE name = $1 AND hostname = $2 AND status = 'active' ORDER BY started_at DESC LIMIT 1",
		i.Name, i.Hostname,
	).Scan(&activeID)
	if err != nil {
		i.logger.Error("failed to retrieve ID of active instance", "error", err)
		return fmt.Errorf("failed to retrieve ID of active instance: %w", err)
	}

	i.ID = activeID
	i.logger.Info("retrieved active instance ID", "id", i.ID)
	return nil
}

// logRegisteredInstances writes every row of broker_queueinstance to the
// debug log. Failures are logged and otherwise ignored: this is diagnostics
// only and must not affect registration.
func (i *DatabaseInstance) logRegisteredInstances() {
	rows, err := i.db.Query(i.ctx, "SELECT id_broker_queueinstance, name, hostname, status FROM broker_queueinstance")
	if err != nil {
		i.logger.Error("debug query failed", "error", err)
		return
	}
	defer rows.Close()

	for rows.Next() {
		var id int64
		var name, hostname, status string
		if err := rows.Scan(&id, &name, &hostname, &status); err != nil {
			i.logger.Error("debug scan failed", "error", err)
			return
		}
		i.logger.Debug("broker_queueinstance entry", "id", id, "name", name, "hostname", hostname, "status", status)
	}
}
// startQueues creates and starts queues numbered 1..QueueCount (from the
// database configuration) and records each one in i.queues.
//
// The operation is all-or-nothing: if a queue fails to start, the queues
// started earlier in this call are stopped and removed from i.queues before
// the error is returned, so no queue keeps running after a reported failure.
func (i *DatabaseInstance) startQueues() error {
	i.queuesMu.Lock()
	defer i.queuesMu.Unlock()

	// Queue numbers started by this call, kept for rollback on failure.
	var started []int

	for queueNum := 1; queueNum <= i.dbConfig.QueueCount; queueNum++ {
		queueCfg := queue.Config{
			Number:       queueNum,
			InstanceID:   i.ID,
			WorkerCount:  1, // One worker per queue for now
			DBAdapter:    i.db,
			Logger:       i.logger,
			BufferSize:   i.config.Broker.QueueBufferSize,
			TimerSeconds: i.config.Broker.QueueTimerSec,
			FetchSize:    i.config.Broker.FetchQueryQueSize,
		}

		q := queue.New(queueCfg)
		if err := q.Start(queueCfg); err != nil {
			// Roll back: stop what this call already started.
			for _, num := range started {
				if stopErr := i.queues[num].Stop(); stopErr != nil {
					i.logger.Error("failed to stop queue", "number", num, "error", stopErr)
				}
				delete(i.queues, num)
			}
			return fmt.Errorf("failed to start queue %d: %w", queueNum, err)
		}

		i.queues[queueNum] = q
		started = append(started, queueNum)
		i.logger.Info("queue started", "number", queueNum)
	}

	return nil
}
// startListener subscribes to the "broker.event" notification channel and
// routes every notification to handleNotification.
func (i *DatabaseInstance) startListener() error {
	// The method value is bound to this instance, so no wrapper closure is needed.
	err := i.db.Listen(i.ctx, "broker.event", i.handleNotification)
	if err != nil {
		return fmt.Errorf("failed to start listener: %w", err)
	}
	return nil
}
// handleNotification decodes a notification payload as a JSON job and hands
// the job to the queue named by its JobQueue field. Payloads that cannot be
// decoded, lack a positive job ID or queue number, or reference a queue this
// instance does not run are logged and dropped.
func (i *DatabaseInstance) handleNotification(n *adapter.Notification) {
	if i.config.Broker.EnableDebug {
		i.logger.Debug("received notification", "channel", n.Channel, "payload", n.Payload)
	}

	var job models.Job
	decodeErr := json.Unmarshal([]byte(n.Payload), &job)
	if decodeErr != nil {
		i.logger.Error("failed to unmarshal notification", "error", decodeErr, "payload", n.Payload)
		return
	}

	// Reject jobs that do not carry the fields needed for routing.
	switch {
	case job.ID <= 0:
		i.logger.Warn("notification missing job ID", "payload", n.Payload)
		return
	case job.JobQueue <= 0:
		i.logger.Warn("notification missing queue number", "job_id", job.ID)
		return
	}

	// Look up the target queue; the lock is held only for the map read.
	i.queuesMu.RLock()
	target, found := i.queues[job.JobQueue]
	i.queuesMu.RUnlock()

	if !found {
		i.logger.Warn("queue not found for job", "job_id", job.ID, "queue", job.JobQueue)
		return
	}

	addErr := target.AddJob(job)
	if addErr != nil {
		i.logger.Error("failed to add job to queue", "job_id", job.ID, "queue", job.JobQueue, "error", addErr)
	}
}
// pingRoutine pings the database every 30 seconds until the instance context
// is cancelled or a tick finds the shutdown flag set. Ping failures are logged
// and the loop keeps running.
func (i *DatabaseInstance) pingRoutine() {
	heartbeat := time.NewTicker(30 * time.Second)
	defer heartbeat.Stop()

	// stopping reports whether Stop has already been called.
	stopping := func() bool {
		i.shutdownMu.RLock()
		defer i.shutdownMu.RUnlock()
		return i.shutdown
	}

	for {
		select {
		case <-i.ctx.Done():
			return
		case <-heartbeat.C:
			if stopping() {
				return
			}
			if pingErr := i.ping(); pingErr != nil {
				i.logger.Error("ping failed", "error", pingErr)
			}
		}
	}
}
// ping calls broker_ping_instance with this instance's ID and its
// jobs-handled counter. It returns an error when the query fails or when the
// database function reports a positive return code.
func (i *DatabaseInstance) ping() error {
	const query = "SELECT p_retval, p_errmsg FROM broker_ping_instance($1, $2)"

	var (
		code int
		msg  string
	)

	row := i.db.QueryRow(i.ctx, query, i.ID, i.jobsHandled)
	if scanErr := row.Scan(&code, &msg); scanErr != nil {
		return fmt.Errorf("query error: %w", scanErr)
	}

	if code > 0 {
		return fmt.Errorf("broker_ping_instance error: %s", msg)
	}
	return nil
}
// shutdownInstance calls broker_shutdown_instance for this instance's ID. It
// returns an error when the query fails or when the database function reports
// a positive return code.
//
// The call deliberately does not use i.ctx: that context may already be
// cancelled by the time shutdown runs, which would make the query fail
// immediately and leave the instance marked as running in the database.
// A detached context with its own timeout is used instead.
func (i *DatabaseInstance) shutdownInstance() error {
	// Upper bound for the shutdown bookkeeping query.
	const timeout = 10 * time.Second

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	var retval int
	var errmsg string

	err := i.db.QueryRow(ctx,
		"SELECT p_retval, p_errmsg FROM broker_shutdown_instance($1)",
		i.ID,
	).Scan(&retval, &errmsg)
	if err != nil {
		return fmt.Errorf("query error: %w", err)
	}

	if retval > 0 {
		return fmt.Errorf("broker_shutdown_instance error: %s", errmsg)
	}

	return nil
}
// GetStats returns a snapshot of the instance: identity fields, uptime since
// construction, the jobs-handled counter, the number of queues, and under
// "queues" the per-queue statistics keyed by queue number. The queue map is
// read-locked for the duration of the call.
func (i *DatabaseInstance) GetStats() map[string]interface{} {
	i.queuesMu.RLock()
	defer i.queuesMu.RUnlock()

	// Collect per-queue statistics first so the result can be built in one literal.
	perQueue := make(map[int]interface{}, len(i.queues))
	for number, q := range i.queues {
		perQueue[number] = q.GetStats()
	}

	return map[string]interface{}{
		"id":            i.ID,
		"name":          i.Name,
		"database_name": i.DatabaseName,
		"hostname":      i.Hostname,
		"pid":           i.PID,
		"version":       i.Version,
		"uptime":        time.Since(i.startTime).String(),
		"jobs_handled":  i.jobsHandled,
		"queue_count":   len(i.queues),
		"queues":        perQueue,
	}
}