pgsql-broker/pkg/broker/worker/worker.go
Hein 3e64f7ae2a
feat(testing): add full integration test suite
This commit introduces a comprehensive integration test suite for the pgsql-broker.

The test suite includes:
- A Docker/Podman environment for running a PostgreSQL database, managed via a .
- Integration tests that cover the broker's lifecycle, including job creation, execution, and instance management.
- A GitHub Actions workflow to automate the execution of all tests on push and pull requests.
- A dedicated test configuration file () and helper test files.

refactor(worker): fix job processing transaction
- The worker's job processing now fetches and runs each job inside a single transaction, resolving a race condition in which a job could be executed before it was in the 'running' state (see the condensed sketch after this list).
- The broker's database instance registration is now more robust, handling cases where another instance is already active.
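
In condensed form, the new flow is: begin a transaction, claim a job with broker_get, execute it with broker_run, and only then commit, so the claim and the execution can no longer drift apart. A simplified sketch of that path follows (processOneTx is only a name for this sketch; the real method is processJobs in the file below, which also handles pushed jobs, batching, and logging):

func (w *Worker) processOneTx(ctx context.Context) error {
	tx, err := w.db.Begin(ctx)
	if err != nil {
		return err
	}
	jobID, err := w.fetchNextJobTx(ctx, tx) // SELECT ... FROM broker_get($1, $2)
	if err != nil || jobID <= 0 {
		tx.Rollback() // claim failed or the queue is empty
		return err
	}
	if err := w.runJobTx(ctx, tx, jobID); err != nil { // SELECT ... FROM broker_run($1)
		tx.Rollback()
		return err
	}
	return tx.Commit() // the claim and the execution commit together
}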

The Makefile has been significantly updated to orchestrate the entire test flow, including setting up the database, starting/stopping the broker, and running unit and integration tests separately.

package worker

import (
	"context"
	"database/sql"
	"fmt"
	"sync"
	"time"

	"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
	"git.warky.dev/wdevs/pgsql-broker/pkg/broker/models"
)

// Worker represents a single job processing worker
type Worker struct {
	ID           int
	QueueNumber  int
	InstanceID   int64
	db           adapter.DBAdapter
	logger       adapter.Logger
	jobChan      chan models.Job
	shutdown     chan struct{}
	wg           *sync.WaitGroup
	running      bool
	mu           sync.RWMutex // guards running, lastActivity, and jobsHandled
	lastActivity time.Time
	jobsHandled  int64
	timerSeconds int
	fetchSize    int
}

// Stats holds worker statistics
type Stats struct {
	LastActivity time.Time
	JobsHandled  int64
	Running      bool
}

// Config holds worker configuration
type Config struct {
	ID           int
	QueueNumber  int
	InstanceID   int64
	DBAdapter    adapter.DBAdapter
	Logger       adapter.Logger
	BufferSize   int
	TimerSeconds int
	FetchSize    int
}

// New creates a new worker
func New(cfg Config) *Worker {
	return &Worker{
		ID:           cfg.ID,
		QueueNumber:  cfg.QueueNumber,
		InstanceID:   cfg.InstanceID,
		db:           cfg.DBAdapter,
		logger:       cfg.Logger.With("worker_id", cfg.ID).With("queue", cfg.QueueNumber),
		jobChan:      make(chan models.Job, cfg.BufferSize),
		shutdown:     make(chan struct{}),
		wg:           &sync.WaitGroup{},
		timerSeconds: cfg.TimerSeconds,
		fetchSize:    cfg.FetchSize,
	}
}

// Start begins the worker processing loop
func (w *Worker) Start(ctx context.Context) error {
	w.mu.Lock()
	if w.running {
		w.mu.Unlock()
		return fmt.Errorf("worker %d already running", w.ID)
	}
	w.running = true
	w.mu.Unlock()

	w.logger.Info("worker starting")
	w.wg.Add(1)
	go w.processLoop(ctx)
	return nil
}

// Stop gracefully stops the worker
func (w *Worker) Stop() error {
	w.mu.Lock()
	if !w.running {
		w.mu.Unlock()
		return nil
	}
	// Flip the flag while still holding the lock so a concurrent Stop
	// cannot reach close(w.shutdown) a second time.
	w.running = false
	w.mu.Unlock()

	w.logger.Info("worker stopping")
	close(w.shutdown)
	w.wg.Wait()
	w.logger.Info("worker stopped")
	return nil
}

// AddJob adds a job to the worker's queue
func (w *Worker) AddJob(job models.Job) error {
	select {
	case w.jobChan <- job:
		return nil
	default:
		return fmt.Errorf("worker %d job channel is full", w.ID)
	}
}

// processLoop is the main worker processing loop
func (w *Worker) processLoop(ctx context.Context) {
	defer w.wg.Done()
	defer w.recoverPanic()

	timer := time.NewTimer(time.Duration(w.timerSeconds) * time.Second)
	defer timer.Stop()

	for {
		select {
		case job := <-w.jobChan:
			w.updateActivity()
			w.processJobs(ctx, &job)
		case <-timer.C:
			// Timer expired - poll the database for queued jobs. Only re-arm
			// the timer when polling is enabled; otherwise a zero interval
			// would make this case spin in a tight loop.
			if w.timerSeconds > 0 {
				w.updateActivity()
				w.processJobs(ctx, nil)
				timer.Reset(time.Duration(w.timerSeconds) * time.Second)
			}
		case <-w.shutdown:
			w.logger.Info("worker shutdown signal received")
			return
		case <-ctx.Done():
			w.logger.Info("worker context cancelled")
			return
		}
	}
}

// processJobs claims and runs jobs from the queue, each inside its own transaction
func (w *Worker) processJobs(ctx context.Context, specificJob *models.Job) {
	defer w.recoverPanic()

	for i := 0; i < w.fetchSize; i++ {
		tx, err := w.db.Begin(ctx)
		if err != nil {
			w.logger.Error("failed to begin transaction", "error", err)
			return
		}

		var jobID int64
		if specificJob != nil && specificJob.ID > 0 {
			jobID = specificJob.ID
			specificJob = nil // Only process the pushed job once, then fall back to fetching
		} else {
			jobID, err = w.fetchNextJobTx(ctx, tx)
			if err != nil {
				tx.Rollback()
				w.logger.Error("failed to fetch job", "error", err)
				return
			}
		}

		if jobID <= 0 {
			tx.Rollback() // No job found
			return        // No more jobs
		}

		// Run the job inside the same transaction that claimed it
		if err := w.runJobTx(ctx, tx, jobID); err != nil {
			tx.Rollback()
			w.logger.Error("failed to run job", "job_id", jobID, "error", err)
			continue
		}
		if err := tx.Commit(); err != nil {
			w.logger.Error("failed to commit transaction", "job_id", jobID, "error", err)
			continue
		}

		w.mu.Lock() // jobsHandled is read by GetStats under the same mutex
		w.jobsHandled++
		w.mu.Unlock()
	}
}

// fetchNextJobTx fetches the next job ID from the queue within a transaction
func (w *Worker) fetchNextJobTx(ctx context.Context, tx adapter.DBTransaction) (int64, error) {
	var retval int
	var errmsg string
	var nullableJobID sql.NullInt64

	err := tx.QueryRow(ctx,
		"SELECT p_retval, p_errmsg, p_job_id FROM broker_get($1, $2)",
		w.QueueNumber, w.InstanceID,
	).Scan(&retval, &errmsg, &nullableJobID)
	if err != nil {
		return 0, fmt.Errorf("query error: %w", err)
	}
	if retval > 0 {
		return 0, fmt.Errorf("broker_get error: %s", errmsg)
	}
	if !nullableJobID.Valid {
		// A NULL job ID means the queue is currently empty
		return 0, nil
	}
	return nullableJobID.Int64, nil
}

// runJobTx executes a job within a transaction
func (w *Worker) runJobTx(ctx context.Context, tx adapter.DBTransaction, jobID int64) error {
	w.logger.Debug("running job", "job_id", jobID)

	var retval int
	var errmsg string
	err := tx.QueryRow(ctx,
		"SELECT p_retval, p_errmsg FROM broker_run($1)",
		jobID,
	).Scan(&retval, &errmsg)
	if err != nil {
		return fmt.Errorf("query error: %w", err)
	}
	if retval > 0 {
		return fmt.Errorf("broker_run error: %s", errmsg)
	}

	w.logger.Debug("job completed", "job_id", jobID)
	return nil
}

// updateActivity updates the last activity timestamp
func (w *Worker) updateActivity() {
	w.mu.Lock()
	w.lastActivity = time.Now()
	w.mu.Unlock()
}

// GetStats returns worker statistics
func (w *Worker) GetStats() (lastActivity time.Time, jobsHandled int64, running bool) {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.lastActivity, w.jobsHandled, w.running
}

// recoverPanic recovers from panics in the worker
func (w *Worker) recoverPanic() {
	if r := recover(); r != nil {
		w.logger.Error("worker panic recovered", "panic", r)
	}
}
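
For reference, here is a minimal usage sketch of this API. The function name runWorker, the configuration values, and the idea that the caller already holds adapter.DBAdapter and adapter.Logger implementations are assumptions for illustration, not code taken from the repository.

package example

import (
	"context"

	"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
	"git.warky.dev/wdevs/pgsql-broker/pkg/broker/worker"
)

// runWorker wires up a single worker against whatever DBAdapter and Logger
// implementations the broker provides (hypothetical here), then blocks until
// the context is cancelled.
func runWorker(ctx context.Context, db adapter.DBAdapter, logger adapter.Logger) error {
	w := worker.New(worker.Config{
		ID:           1,
		QueueNumber:  1,
		InstanceID:   42, // instance ID obtained from the broker's registration step
		DBAdapter:    db,
		Logger:       logger,
		BufferSize:   16,
		TimerSeconds: 5, // poll the queue every 5 seconds when idle
		FetchSize:    10,
	})

	if err := w.Start(ctx); err != nil {
		return err
	}

	// Jobs reach the worker either through the polling timer or by pushing
	// them onto its channel with w.AddJob(...).
	<-ctx.Done()
	return w.Stop()
}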