feat: 🎉 postgresql broker first commit of forked prototype from my original code

This commit is contained in:
2026-01-02 20:56:39 +02:00
parent e90e5902cd
commit 19e469ff54
29 changed files with 3325 additions and 2 deletions

25
.gitignore vendored
View File

@@ -25,3 +25,28 @@ go.work.sum
# env file # env file
.env .env
# Binaries directory
bin/
# Configuration (exclude actual config, keep examples)
broker.yaml
broker.yml
broker.json
!broker.example.yaml
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
Thumbs.db
# Logs
*.log
# Coverage
coverage.html

88
Makefile Normal file
View File

@@ -0,0 +1,88 @@
# Declare every command target as phony so make never skips one because a
# file of the same name exists (the original list omitted run, fmt, vet,
# lint and the sql-install targets).
.PHONY: all build clean test install deps run fmt vet lint sql-install sql-install-manual help

# Build variables
BINARY_NAME=pgsql-broker
BIN_DIR=bin
CMD_DIR=cmd/broker
GO=go
GOFLAGS=-v
LDFLAGS=-w -s

# Version information
VERSION ?= $(shell git describe --tags --always --dirty 2>/dev/null || echo "dev")
BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S')
COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")

# Inject version info (consumed by main.Version / main.BuildTime / main.Commit)
LDFLAGS += -X 'main.Version=$(VERSION)' -X 'main.BuildTime=$(BUILD_TIME)' -X 'main.Commit=$(COMMIT)'

all: clean deps build ## Build everything

build: ## Build the broker binary
	@echo "Building $(BINARY_NAME)..."
	@mkdir -p $(BIN_DIR)
	$(GO) build $(GOFLAGS) -ldflags "$(LDFLAGS)" -o $(BIN_DIR)/$(BINARY_NAME) ./$(CMD_DIR)
	@echo "Built: $(BIN_DIR)/$(BINARY_NAME)"

clean: ## Remove build artifacts
	@echo "Cleaning..."
	@rm -rf $(BIN_DIR)
	@$(GO) clean
	@echo "Clean complete"

deps: ## Download dependencies
	@echo "Downloading dependencies..."
	@$(GO) mod download
	@$(GO) mod tidy
	@echo "Dependencies ready"

test: ## Run tests
	@echo "Running tests..."
	@$(GO) test -v -race -cover ./...

install: build ## Install the binary to GOPATH/bin
	@echo "Installing to GOPATH/bin..."
	@$(GO) install $(GOFLAGS) -ldflags "$(LDFLAGS)" ./$(CMD_DIR)
	@echo "Installed: $(BINARY_NAME)"

run: build ## Build and run the broker
	@echo "Running $(BINARY_NAME)..."
	@$(BIN_DIR)/$(BINARY_NAME)

fmt: ## Format the code
	@echo "Formatting code..."
	@$(GO) fmt ./...
	@echo "Formatting complete"

vet: ## Run go vet
	@echo "Running go vet..."
	@$(GO) vet ./...
	@echo "Vet complete"

lint: ## Run golangci-lint (if installed)
	@if command -v golangci-lint >/dev/null 2>&1; then \
		echo "Running golangci-lint..."; \
		golangci-lint run ./...; \
	else \
		echo "golangci-lint not installed, skipping"; \
	fi

sql-install: build ## Install SQL tables and procedures using broker CLI
	@echo "Installing SQL schema..."
	@$(BIN_DIR)/$(BINARY_NAME) install

sql-install-manual: ## Install SQL tables and procedures manually via psql
	@echo "Installing SQL schema manually..."
	@if [ -z "$$PGDATABASE" ]; then \
		echo "Error: PGDATABASE environment variable not set"; \
		exit 1; \
	fi
	@psql -f pkg/broker/install/sql/tables/00_install.sql
	@psql -f pkg/broker/install/sql/procedures/00_install.sql
	@echo "SQL schema installed"

help: ## Show this help message
	@echo "Usage: make [target]"
	@echo ""
	@echo "Targets:"
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "  %-15s %s\n", $$1, $$2}'

294
README.md
View File

@@ -1,3 +1,293 @@
# pgsql-broker # PostgreSQL Broker
PostgreSQL Broker A robust, event-driven job processing system for PostgreSQL that uses LISTEN/NOTIFY for real-time job execution. It supports multiple queues, priority-based scheduling, and can be used both as a standalone service or as a Go library.
## Features
- **Multi-Database Support**: Single broker process can manage multiple database connections
- **Event-Driven**: Uses PostgreSQL LISTEN/NOTIFY for instant job notifications
- **Multiple Queues**: Support for concurrent job processing across multiple queues per database
- **Priority Scheduling**: Jobs can be prioritized for execution order
- **Job Dependencies**: Jobs can depend on other jobs being completed first
- **Adapter Pattern**: Clean interfaces for database and logging (easy to extend)
- **Standalone or Library**: Use as a CLI tool or integrate into your Go application
- **Configuration Management**: Viper-based config with support for YAML, JSON, and environment variables
- **Graceful Shutdown**: Proper cleanup and job completion on shutdown
- **Instance Tracking**: Monitor active broker instances through the database
- **Single Instance Per Database**: Enforces one broker instance per database to prevent conflicts
- **Embedded SQL Installer**: Database schema embedded in binary with built-in install command
## Architecture
The broker supports multi-database architecture where a single broker process can manage multiple database connections. Each database has its own instance with dedicated queues, but only ONE broker instance is allowed per database.
```
┌─────────────────────────────────────────────────────────────────┐
│ Broker Process │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────┐ ┌───────────────────────────┐ │
│ │ Database Instance (DB1) │ │ Database Instance (DB2) │ │
│ ├────────────────────────────┤ ├───────────────────────────┤ │
│ │ ┌────┐ ┌────┐ ┌────┐ │ │ ┌────┐ ┌────┐ │ │
│ │ │ Q1 │ │ Q2 │ │ QN │ │ │ │ Q1 │ │ Q2 │ │ │
│ │ └────┘ └────┘ └────┘ │ │ └────┘ └────┘ │ │
│ │ │ │ │ │
│ │ ┌────────────────────────┐│ │ ┌────────────────────────┐│ │
│ │ │ PostgreSQL Adapter ││ │ │ PostgreSQL Adapter ││ │
│ │ │ - Connection Pool ││ │ │ - Connection Pool ││ │
│ │ │ - LISTEN/NOTIFY ││ │ │ - LISTEN/NOTIFY ││ │
│ │ └────────────────────────┘│ │ └────────────────────────┘│ │
│ └────────────┬───────────────┘ └──────────┬────────────────┘ │
│ │ │ │
└───────────────┼──────────────────────────────┼───────────────────┘
│ │
▼ ▼
┌──────────────────────┐ ┌──────────────────────┐
│ PostgreSQL (DB1) │ │ PostgreSQL (DB2) │
│ - broker_jobs │ │ - broker_jobs │
│ - broker_queueinstance│ │ - broker_queueinstance│
│ - broker_schedule │ │ - broker_schedule │
└──────────────────────┘ └──────────────────────┘
```
**Key Points**:
- One broker process can manage multiple databases
- Each database has exactly ONE active broker instance
- Each database instance has its own queues and workers
- Validation prevents multiple broker processes from connecting to the same database
- Different databases can have different queue counts
## Installation
### From Source
```bash
git clone https://git.warky.dev/wdevs/pgsql-broker.git
cd pgsql-broker
make build
```
The binary will be available in `bin/pgsql-broker`.
### As a Library
```bash
go get git.warky.dev/wdevs/pgsql-broker
```
## Quick Start
### 1. Setup Database
Install the required tables and stored procedures:
```bash
# Using the CLI (recommended)
./bin/pgsql-broker install --config broker.yaml
# Or with make
make sql-install
# Verify installation
./bin/pgsql-broker install --verify-only --config broker.yaml
# Or manually with psql:
psql -f pkg/broker/install/sql/tables/00_install.sql
psql -f pkg/broker/install/sql/procedures/00_install.sql
```
### 2. Configure
Create a configuration file `broker.yaml`:
```yaml
databases:
- name: db1
host: localhost
port: 5432
database: broker_db1
user: postgres
password: your_password
sslmode: disable
queue_count: 4
# Optional: add more databases
- name: db2
host: localhost
port: 5432
database: broker_db2
user: postgres
password: your_password
sslmode: disable
queue_count: 2
broker:
name: pgsql-broker
enable_debug: false
logging:
level: info
format: json
```
**Note**: Each database requires a unique `name` identifier and can have its own `queue_count` configuration.
### 3. Run the Broker
```bash
# Using the binary
./bin/pgsql-broker start --config broker.yaml
# Or with make
make run
# Or with custom log level
./bin/pgsql-broker start --log-level debug
```
### 4. Add a Job
```sql
SELECT broker_add_job(
'My Job', -- job_name
'SELECT do_something()', -- execute_str
1, -- job_queue (default: 1)
0, -- job_priority (default: 0)
'sql', -- job_language (default: 'sql')
NULL, -- run_as
NULL, -- user_login
NULL -- schedule_id
);
```
## Usage as a Library
```go
package main

import (
	"log/slog"

	"git.warky.dev/wdevs/pgsql-broker/pkg/broker"
	"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
	"git.warky.dev/wdevs/pgsql-broker/pkg/broker/config"
)

func main() {
	// Load config
	cfg, _ := config.LoadConfig("broker.yaml")

	// Create logger
	logger := adapter.NewSlogLogger(slog.LevelInfo)

	// Create and start broker (database adapters are created internally,
	// one per configured database)
	instance, _ := broker.New(cfg, logger, "1.0.0")
	instance.Start()

	// ... wait for shutdown signal ...
	instance.Stop()
}
```
See the [examples](./examples/) directory for complete examples.
## Database Schema
### Tables
- **broker_queueinstance**: Tracks active broker queue instances (one per database)
- **broker_jobs**: Job queue with status tracking
- **broker_schedule**: Scheduled jobs (cron-like functionality)
### Stored Procedures
- **broker_get**: Fetch the next job from a queue
- **broker_run**: Execute a job
- **broker_set**: Set runtime options (user, application_name, etc.)
- **broker_add_job**: Add a new job to the queue
- **broker_register_instance**: Register a broker instance
- **broker_ping_instance**: Update instance heartbeat
- **broker_shutdown_instance**: Mark instance as shutdown
## Configuration Reference
See [broker.example.yaml](./broker.example.yaml) for a complete configuration example.
### Database Settings
The `databases` array can contain multiple database configurations. Each entry supports:
| Setting | Description | Default |
|---------|-------------|---------|
| `name` | Unique identifier for this database | **Required** |
| `host` | PostgreSQL host | `localhost` |
| `port` | PostgreSQL port | `5432` |
| `database` | Database name | **Required** |
| `user` | Database user | **Required** |
| `password` | Database password | - |
| `sslmode` | SSL mode | `disable` |
| `max_open_conns` | Max open connections | `25` |
| `max_idle_conns` | Max idle connections | `5` |
| `conn_max_lifetime` | Connection max lifetime | `5m` |
| `conn_max_idle_time` | Connection max idle time | `10m` |
| `queue_count` | Number of queues for this database | `4` |
### Broker Settings
Global settings applied to all database instances:
| Setting | Description | Default |
|---------|-------------|---------|
| `name` | Broker instance name | `pgsql-broker` |
| `fetch_query_que_size` | Jobs per fetch cycle | `100` |
| `queue_timer_sec` | Seconds between polls | `10` |
| `queue_buffer_size` | Job buffer size | `50` |
| `worker_idle_timeout_sec` | Worker idle timeout | `10` |
| `notify_retry_seconds` | NOTIFY retry interval | `30s` |
| `enable_debug` | Enable debug logging | `false` |
## Development
### Building
```bash
make build # Build the binary
make clean # Clean build artifacts
make deps # Download dependencies
make fmt # Format code
make vet # Run go vet
make test # Run tests
```
### Project Structure
```
pgsql-broker/
├── cmd/broker/ # CLI application
├── pkg/broker/ # Core broker package
│ ├── adapter/ # Database & logger interfaces
│ ├── config/ # Configuration management
│ ├── models/ # Data models
│ ├── queue/ # Queue management
│ ├── worker/ # Worker implementation
│ └── install/ # Database installer with embedded SQL
│ └── sql/ # SQL schema (embedded in binary)
│ ├── tables/ # Table definitions
│ └── procedures/ # Stored procedures
├── examples/ # Usage examples
└── Makefile # Build automation
```
## Contributing
Contributions are welcome! Please ensure:
- Code is formatted with `go fmt`
- Tests pass with `go test ./...`
- Documentation is updated
## License
See [LICENSE](./LICENSE) file for details.

48
broker.example.yaml Normal file
View File

@@ -0,0 +1,48 @@
# PostgreSQL Broker Configuration Example
# Database connections settings
# The broker can manage multiple databases, each with its own instance and queues
# Important: Each database can only have ONE active broker instance
databases:
# First database
- name: db1 # Unique identifier for this database connection
host: localhost
port: 5432
database: broker_db1
user: postgres
password: your_password_here
sslmode: disable # Options: disable, require, verify-ca, verify-full
max_open_conns: 25
max_idle_conns: 5
conn_max_lifetime: 5m
conn_max_idle_time: 10m
queue_count: 4 # Number of concurrent queues for this database
# Second database (optional - add as many as needed)
- name: db2
host: localhost
port: 5432
database: broker_db2
user: postgres
password: your_password_here
sslmode: disable
max_open_conns: 25
max_idle_conns: 5
conn_max_lifetime: 5m
conn_max_idle_time: 10m
queue_count: 2 # Can have different queue count per database
# Broker settings (applied to all database instances)
broker:
name: pgsql-broker
fetch_query_que_size: 100 # Number of jobs to process per fetch cycle
queue_timer_sec: 10 # Seconds between queue polls
queue_buffer_size: 50 # Size of job buffer per queue
worker_idle_timeout_sec: 10 # Worker idle timeout
notify_retry_seconds: 30s # LISTEN/NOTIFY retry interval
enable_debug: false # Enable debug logging
# Logging settings
logging:
level: info # Options: debug, info, warn, error
format: json # Options: json, text

233
cmd/broker/main.go Normal file
View File

@@ -0,0 +1,233 @@
package main
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"github.com/spf13/cobra"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/config"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/install"
)
var (
	// Version information, injected at build time by the linker
	// (see the Makefile: -X 'main.Version=...' etc.). Defaults are
	// used for plain `go build` / `go run`.
	Version   = "dev"
	BuildTime = "unknown"
	Commit    = "unknown"

	// Command line flags, bound to cobra flags in init().
	cfgFile    string // --config: path to the config file
	logLevel   string // --log-level: overrides the configured log level
	verifyOnly bool   // --verify-only: install command verifies without installing
)
// main is the CLI entry point: it dispatches to the cobra root command
// and exits with a non-zero status when the command fails.
func main() {
	err := rootCmd.Execute()
	if err == nil {
		return
	}
	fmt.Fprintf(os.Stderr, "Error: %v\n", err)
	os.Exit(1)
}
// rootCmd is the top-level CLI command; subcommands (start, version,
// install) are attached in init().
var rootCmd = &cobra.Command{
	Use:   "pgsql-broker",
	Short: "PostgreSQL job broker for background job processing",
	Long: `PostgreSQL Broker is a job processing system that uses PostgreSQL
LISTEN/NOTIFY for event-driven job execution. It supports multiple queues,
priority-based scheduling, and horizontal scaling.`,
	// Enables `pgsql-broker --version` with build metadata.
	Version: fmt.Sprintf("%s (built %s, commit %s)", Version, BuildTime, Commit),
}
// startCmd runs the broker until a shutdown signal is received.
var startCmd = &cobra.Command{
	Use:   "start",
	Short: "Start the broker instance",
	Long:  `Start the broker instance and begin processing jobs from the database queue.`,
	RunE: func(cmd *cobra.Command, args []string) error {
		return runBroker()
	},
}
// versionCmd prints the build-time version metadata to stdout.
var versionCmd = &cobra.Command{
	Use:   "version",
	Short: "Print version information",
	Run: func(cmd *cobra.Command, args []string) {
		// Same three lines as before, emitted with a single format string.
		fmt.Printf("pgsql-broker version %s\nBuilt: %s\nCommit: %s\n",
			Version, BuildTime, Commit)
	},
}
// installCmd installs (or, with --verify-only, verifies) the database
// schema on every configured database.
var installCmd = &cobra.Command{
	Use:   "install",
	Short: "Install database schema (tables and procedures)",
	Long: `Install the required database schema including tables and stored procedures.
This command will create all necessary tables and functions in the configured database.`,
	RunE: func(cmd *cobra.Command, args []string) error {
		return runInstall()
	},
}
func init() {
rootCmd.AddCommand(startCmd)
rootCmd.AddCommand(versionCmd)
rootCmd.AddCommand(installCmd)
// Persistent flags
rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is broker.yaml)")
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "log level (debug, info, warn, error)")
// Install command flags
installCmd.Flags().BoolVar(&verifyOnly, "verify-only", false, "only verify installation without installing")
}
// runBroker loads configuration, builds the logger and broker, starts
// all database instances, and blocks until a shutdown signal arrives.
func runBroker() error {
	cfg, err := config.LoadConfig(cfgFile)
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}

	// The --log-level flag wins over the configured level.
	if logLevel != "" {
		cfg.Logging.Level = logLevel
	}

	logger := createLogger(cfg.Logging)
	logger.Info("starting pgsql-broker", "version", Version, "databases", len(cfg.Databases))

	// The broker manages one instance per configured database.
	instance, err := broker.New(cfg, logger, Version)
	if err != nil {
		return fmt.Errorf("failed to create broker: %w", err)
	}
	if err = instance.Start(); err != nil {
		return fmt.Errorf("failed to start broker: %w", err)
	}

	// Block here until SIGINT/SIGTERM, then stop every instance.
	waitForShutdown(instance, logger)
	return nil
}
// runInstall loads configuration and installs (or, with --verify-only,
// just verifies) the broker schema on every configured database.
// Processing stops at the first database that fails.
func runInstall() error {
	cfg, err := config.LoadConfig(cfgFile)
	if err != nil {
		return fmt.Errorf("failed to load config: %w", err)
	}

	// The --log-level flag wins over the configured level.
	if logLevel != "" {
		cfg.Logging.Level = logLevel
	}

	logger := createLogger(cfg.Logging)
	logger.Info("pgsql-broker database installer", "version", Version, "databases", len(cfg.Databases))

	ctx := context.Background()

	// Install/verify on all configured databases.
	for i, dbCfg := range cfg.Databases {
		logger.Info("processing database", "index", i, "name", dbCfg.Name, "host", dbCfg.Host, "database", dbCfg.Database)

		// Each database is handled in a function literal so one deferred
		// Close covers every exit path (the original repeated
		// dbAdapter.Close() before each error return).
		err := func() error {
			dbAdapter := adapter.NewPostgresAdapter(dbCfg.ToPostgresConfig(), logger)
			if err := dbAdapter.Connect(ctx); err != nil {
				return fmt.Errorf("failed to connect to database %s: %w", dbCfg.Name, err)
			}
			defer dbAdapter.Close()

			installer := install.New(dbAdapter, logger)

			if verifyOnly {
				logger.Info("verifying database schema", "database", dbCfg.Name)
			} else {
				logger.Info("installing database schema", "database", dbCfg.Name)
				if err := installer.InstallSchema(ctx); err != nil {
					logger.Error("installation failed", "database", dbCfg.Name, "error", err)
					return fmt.Errorf("installation failed for %s: %w", dbCfg.Name, err)
				}
				logger.Info("verifying installation", "database", dbCfg.Name)
			}

			// Both modes finish with a verification pass.
			if err := installer.VerifyInstallation(ctx); err != nil {
				logger.Error("verification failed", "database", dbCfg.Name, "error", err)
				return fmt.Errorf("verification failed for %s: %w", dbCfg.Name, err)
			}

			if verifyOnly {
				logger.Info("database schema verified successfully", "database", dbCfg.Name)
			} else {
				logger.Info("database schema installed and verified successfully", "database", dbCfg.Name)
			}
			return nil
		}()
		if err != nil {
			return err
		}
	}

	if verifyOnly {
		logger.Info("all databases verified successfully")
	} else {
		logger.Info("all databases installed and verified successfully")
	}
	return nil
}
// createLogger builds a slog-backed Logger from the logging config:
// cfg.Level selects the minimum level (unknown values fall back to
// info) and cfg.Format selects text or JSON output on stdout.
func createLogger(cfg config.LoggingConfig) adapter.Logger {
	// Unrecognized (or empty) level names default to info.
	level := slog.LevelInfo
	switch cfg.Level {
	case "debug":
		level = slog.LevelDebug
	case "warn":
		level = slog.LevelWarn
	case "error":
		level = slog.LevelError
	}

	opts := &slog.HandlerOptions{Level: level}

	// JSON is the default format; "text" opts into the text handler.
	var handler slog.Handler = slog.NewJSONHandler(os.Stdout, opts)
	if cfg.Format == "text" {
		handler = slog.NewTextHandler(os.Stdout, opts)
	}
	return adapter.NewSlogLoggerWithHandler(handler)
}
// waitForShutdown blocks until an interrupt/termination signal is
// delivered, then stops the broker, logging any shutdown error.
func waitForShutdown(b *broker.Broker, logger adapter.Logger) {
	signals := make(chan os.Signal, 1)
	// os.Interrupt and syscall.SIGINT name the same signal; registering
	// both is harmless.
	signal.Notify(signals, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)

	received := <-signals
	logger.Info("received shutdown signal", "signal", received)

	if err := b.Stop(); err != nil {
		logger.Error("error during shutdown", "error", err)
	}
}

25
go.mod Normal file
View File

@@ -0,0 +1,25 @@
module git.warky.dev/wdevs/pgsql-broker
go 1.25.5
require (
github.com/lib/pq v1.10.9
github.com/spf13/cobra v1.10.2
github.com/spf13/viper v1.21.0
)
require (
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/sagikazarmark/locafero v0.11.0 // indirect
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect
github.com/spf13/afero v1.15.0 // indirect
github.com/spf13/cast v1.10.0 // indirect
github.com/spf13/pflag v1.0.10 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.28.0 // indirect
)

56
go.sum Normal file
View File

@@ -0,0 +1,56 @@
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k=
github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4=
github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc=
github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik=
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw=
github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8/go.mod h1:3n1Cwaq1E1/1lhQhtRK2ts/ZwZEhjcQeJQ1RuC6Q/8U=
github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I=
github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg=
github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY=
github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo=
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4=
github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk=
github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU=
github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,77 @@
package adapter
import (
"context"
"database/sql"
)
// DBAdapter defines the interface for database operations. Implementations
// (e.g. PostgresAdapter) wrap a concrete driver so the broker core stays
// database-agnostic.
type DBAdapter interface {
	// Connect establishes a connection to the database.
	Connect(ctx context.Context) error

	// Close closes the database connection.
	Close() error

	// Ping checks if the database is reachable.
	Ping(ctx context.Context) error

	// Begin starts a new transaction.
	Begin(ctx context.Context) (DBTransaction, error)

	// Exec executes a query without returning rows.
	Exec(ctx context.Context, query string, args ...any) (sql.Result, error)

	// QueryRow executes a query that returns at most one row.
	QueryRow(ctx context.Context, query string, args ...any) DBRow

	// Query executes a query that returns rows.
	Query(ctx context.Context, query string, args ...any) (DBRows, error)

	// Listen starts listening on a PostgreSQL notification channel and
	// invokes handler for each notification received.
	Listen(ctx context.Context, channel string, handler NotificationHandler) error

	// Unlisten stops listening on a channel.
	Unlisten(ctx context.Context, channel string) error
}

// DBTransaction defines the interface for database transactions.
type DBTransaction interface {
	// Exec executes a query within the transaction.
	Exec(ctx context.Context, query string, args ...any) (sql.Result, error)

	// QueryRow executes a query that returns at most one row within the transaction.
	QueryRow(ctx context.Context, query string, args ...any) DBRow

	// Query executes a query that returns rows within the transaction.
	Query(ctx context.Context, query string, args ...any) (DBRows, error)

	// Commit commits the transaction.
	Commit() error

	// Rollback rolls back the transaction.
	Rollback() error
}

// DBRow defines the interface for scanning a single row.
type DBRow interface {
	Scan(dest ...any) error
}

// DBRows defines the interface for iterating over and scanning multiple rows.
type DBRows interface {
	Next() bool
	Scan(dest ...any) error
	Close() error
	Err() error
}

// Notification represents a PostgreSQL NOTIFY event.
type Notification struct {
	Channel string // channel the notification was delivered on
	Payload string // optional payload supplied to NOTIFY
	PID     int    // backend process ID of the notifying session
}

// NotificationHandler is called when a notification is received.
type NotificationHandler func(notification *Notification)

View File

@@ -0,0 +1,22 @@
package adapter
// Logger defines the interface for logging operations. Args are
// alternating key/value pairs, following the log/slog convention used
// by the rest of the broker.
type Logger interface {
	// Debug logs a debug message.
	Debug(msg string, args ...any)

	// Info logs an info message.
	Info(msg string, args ...any)

	// Warn logs a warning message.
	Warn(msg string, args ...any)

	// Error logs an error message.
	Error(msg string, args ...any)

	// Fatal logs a fatal message and exits the process.
	Fatal(msg string, args ...any)

	// With returns a new logger with additional context attached.
	With(key string, value any) Logger
}

View File

@@ -0,0 +1,311 @@
package adapter
import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/lib/pq"
)
// PostgresConfig holds PostgreSQL connection configuration used to build
// the libpq connection string and size the database/sql pool.
type PostgresConfig struct {
	Host     string // server host name or address
	Port     int    // server TCP port
	Database string // database name (dbname)
	User     string // login role
	Password string // login password
	SSLMode  string // libpq sslmode; empty is treated as "disable"
	// Connection-pool tuning, applied in Connect.
	MaxOpenConns    int
	MaxIdleConns    int
	ConnMaxLifetime time.Duration // maximum lifetime of a pooled connection
	ConnMaxIdleTime time.Duration // maximum idle time before a pooled connection is closed
}
// PostgresAdapter implements DBAdapter for PostgreSQL on top of
// database/sql with the lib/pq driver.
type PostgresAdapter struct {
	config   PostgresConfig
	db       *sql.DB      // nil until Connect succeeds; guarded by mu
	listener *pq.Listener // created by Listen for LISTEN/NOTIFY; guarded by mu
	logger   Logger
	mu       sync.RWMutex // protects db and listener
}
// NewPostgresAdapter creates a new PostgreSQL adapter. The returned
// adapter holds no connection; call Connect before use.
func NewPostgresAdapter(config PostgresConfig, logger Logger) *PostgresAdapter {
	a := PostgresAdapter{
		config: config,
		logger: logger,
	}
	return &a
}
// Connect opens the PostgreSQL connection pool, applies the configured
// pool limits, and verifies connectivity with a ping before publishing
// the pool on the adapter.
func (p *PostgresAdapter) Connect(ctx context.Context) error {
	pool, err := sql.Open("postgres", p.buildConnectionString())
	if err != nil {
		return fmt.Errorf("failed to open database: %w", err)
	}

	// Apply pool sizing and lifetime limits from configuration.
	pool.SetMaxOpenConns(p.config.MaxOpenConns)
	pool.SetMaxIdleConns(p.config.MaxIdleConns)
	pool.SetConnMaxLifetime(p.config.ConnMaxLifetime)
	pool.SetConnMaxIdleTime(p.config.ConnMaxIdleTime)

	// sql.Open is lazy; ping so connection problems surface here.
	if err = pool.PingContext(ctx); err != nil {
		pool.Close()
		return fmt.Errorf("failed to ping database: %w", err)
	}

	p.mu.Lock()
	p.db = pool
	p.mu.Unlock()

	p.logger.Info("PostgreSQL connection established", "host", p.config.Host, "database", p.config.Database)
	return nil
}
// Close shuts down the notification listener (if one was started) and
// then the connection pool. A listener close failure is logged but does
// not abort closing the pool; a pool close failure is returned. Both
// references are cleared so repeated calls are safe.
func (p *PostgresAdapter) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Stop LISTEN/NOTIFY first so no notifications arrive mid-close.
	if p.listener != nil {
		if err := p.listener.Close(); err != nil {
			p.logger.Error("failed to close listener", "error", err)
		}
		p.listener = nil
	}

	if p.db != nil {
		if err := p.db.Close(); err != nil {
			// p.db intentionally stays set on failure.
			return fmt.Errorf("failed to close database: %w", err)
		}
		p.db = nil
	}

	// NOTE(review): this logs even when nothing was open — harmless,
	// but confirm that is intended.
	p.logger.Info("PostgreSQL connection closed")
	return nil
}
// Ping reports whether the database is reachable through the current
// pool; it fails immediately if Connect has not succeeded.
func (p *PostgresAdapter) Ping(ctx context.Context) error {
	p.mu.RLock()
	pool := p.db
	p.mu.RUnlock()

	if pool == nil {
		return fmt.Errorf("database connection not established")
	}
	return pool.PingContext(ctx)
}
// Begin starts a new transaction with default isolation options and
// wraps it in the DBTransaction interface.
func (p *PostgresAdapter) Begin(ctx context.Context) (DBTransaction, error) {
	p.mu.RLock()
	pool := p.db
	p.mu.RUnlock()

	if pool == nil {
		return nil, fmt.Errorf("database connection not established")
	}

	tx, err := pool.BeginTx(ctx, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to begin transaction: %w", err)
	}
	return &postgresTransaction{tx: tx}, nil
}
// Exec executes a query without returning rows.
func (p *PostgresAdapter) Exec(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	p.mu.RLock()
	pool := p.db
	p.mu.RUnlock()

	if pool == nil {
		return nil, fmt.Errorf("database connection not established")
	}
	return pool.ExecContext(ctx, query, args...)
}
// QueryRow executes a query that returns at most one row. When the
// adapter is not connected, the returned row reports the error from
// its Scan method, mirroring database/sql's deferred-error style.
func (p *PostgresAdapter) QueryRow(ctx context.Context, query string, args ...interface{}) DBRow {
	p.mu.RLock()
	pool := p.db
	p.mu.RUnlock()

	if pool == nil {
		return &postgresRow{err: fmt.Errorf("database connection not established")}
	}
	return &postgresRow{row: pool.QueryRowContext(ctx, query, args...)}
}
// Query executes a query that returns rows. The caller is responsible
// for closing the returned DBRows.
func (p *PostgresAdapter) Query(ctx context.Context, query string, args ...interface{}) (DBRows, error) {
	p.mu.RLock()
	pool := p.db
	p.mu.RUnlock()

	if pool == nil {
		return nil, fmt.Errorf("database connection not established")
	}

	rows, err := pool.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	return &postgresRows{rows: rows}, nil
}
// Listen starts listening on a PostgreSQL notification channel and
// dispatches each received notification to handler from a background
// goroutine. The goroutine exits when ctx is cancelled.
//
// NOTE(review): each call creates a fresh pq.Listener and overwrites
// p.listener without closing any previous one — calling Listen twice
// appears to leak the earlier listener's connection; confirm the
// intended usage is a single Listen per adapter.
func (p *PostgresAdapter) Listen(ctx context.Context, channel string, handler NotificationHandler) error {
	// LISTEN/NOTIFY needs its own connection, separate from the pool.
	connStr := p.buildConnectionString()

	// pq invokes this callback on listener state changes; only failures
	// are logged.
	reportProblem := func(ev pq.ListenerEventType, err error) {
		if err != nil {
			p.logger.Error("listener problem", "event", ev, "error", err)
		}
	}

	// Reconnect back-off bounds for the underlying pq.Listener.
	minReconn := 10 * time.Second
	maxReconn := 1 * time.Minute

	p.mu.Lock()
	p.listener = pq.NewListener(connStr, minReconn, maxReconn, reportProblem)
	listener := p.listener
	p.mu.Unlock()

	if err := listener.Listen(channel); err != nil {
		return fmt.Errorf("failed to listen on channel %s: %w", channel, err)
	}

	p.logger.Info("listening on channel", "channel", channel)

	// Start notification handler in goroutine
	go func() {
		for {
			select {
			case n := <-listener.Notify:
				// pq delivers nil on reconnect events; skip those.
				if n != nil {
					handler(&Notification{
						Channel: n.Channel,
						Payload: n.Extra,
						PID:     n.BePid,
					})
				}
			case <-ctx.Done():
				p.logger.Info("stopping listener", "channel", channel)
				return
			case <-time.After(90 * time.Second):
				// Periodic keepalive; run in its own goroutine so a slow
				// ping cannot stall notification delivery.
				// NOTE(review): the Ping error is discarded — consider
				// logging it, since reportProblem only sees listener events.
				go listener.Ping()
			}
		}
	}()

	return nil
}
// Unlisten stops receiving notifications for the given channel. It is a
// no-op when Listen was never called.
func (p *PostgresAdapter) Unlisten(ctx context.Context, channel string) error {
	p.mu.RLock()
	l := p.listener
	p.mu.RUnlock()
	if l == nil {
		return nil
	}
	return l.Unlisten(channel)
}
// buildConnectionString builds a libpq key=value DSN from the adapter config.
// String values are single-quoted with backslash/quote escaping so that
// credentials containing spaces or quotes do not corrupt the DSN.
func (p *PostgresAdapter) buildConnectionString() string {
	sslMode := p.config.SSLMode
	if sslMode == "" {
		sslMode = "disable"
	}
	// quote escapes backslashes and single quotes and wraps the value in
	// single quotes, per the libpq connection-string quoting rules.
	quote := func(v string) string {
		out := make([]byte, 0, len(v)+2)
		out = append(out, '\'')
		for i := 0; i < len(v); i++ {
			if v[i] == '\\' || v[i] == '\'' {
				out = append(out, '\\')
			}
			out = append(out, v[i])
		}
		out = append(out, '\'')
		return string(out)
	}
	return fmt.Sprintf(
		"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
		quote(p.config.Host),
		p.config.Port,
		quote(p.config.User),
		quote(p.config.Password),
		quote(p.config.Database),
		quote(sslMode),
	)
}
// postgresTransaction adapts *sql.Tx to the DBTransaction interface.
type postgresTransaction struct {
	tx *sql.Tx
}

// Exec runs a statement inside the transaction.
func (t *postgresTransaction) Exec(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
	return t.tx.ExecContext(ctx, query, args...)
}

// QueryRow runs a single-row query inside the transaction; errors are
// deferred to the returned row's Scan.
func (t *postgresTransaction) QueryRow(ctx context.Context, query string, args ...interface{}) DBRow {
	row := t.tx.QueryRowContext(ctx, query, args...)
	return &postgresRow{row: row}
}

// Query runs a multi-row query inside the transaction; the caller closes the
// returned DBRows.
func (t *postgresTransaction) Query(ctx context.Context, query string, args ...interface{}) (DBRows, error) {
	r, err := t.tx.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	return &postgresRows{rows: r}, nil
}

// Commit finalizes the transaction.
func (t *postgresTransaction) Commit() error { return t.tx.Commit() }

// Rollback aborts the transaction.
func (t *postgresTransaction) Rollback() error { return t.tx.Rollback() }
// postgresRow adapts *sql.Row to DBRow. It can also carry a
// construction-time error (e.g. no connection) that surfaces on Scan.
type postgresRow struct {
	row *sql.Row
	err error
}

// Scan copies the row's column values into dest, or returns the deferred
// construction error if one was recorded.
func (r *postgresRow) Scan(dest ...interface{}) error {
	if r.err != nil {
		return r.err
	}
	return r.row.Scan(dest...)
}
// postgresRows adapts *sql.Rows to the DBRows interface.
type postgresRows struct {
	rows *sql.Rows
}

// Next advances to the next row, reporting whether one is available.
func (r *postgresRows) Next() bool { return r.rows.Next() }

// Scan copies the current row's column values into dest.
func (r *postgresRows) Scan(dest ...interface{}) error { return r.rows.Scan(dest...) }

// Close releases the row set's resources.
func (r *postgresRows) Close() error { return r.rows.Close() }

// Err returns any error encountered during iteration.
func (r *postgresRows) Err() error { return r.rows.Err() }

View File

@@ -0,0 +1,80 @@
package adapter
import (
"log/slog"
"os"
)
// SlogLogger implements the Logger interface using the standard library's
// structured logging package (log/slog).
type SlogLogger struct {
	logger *slog.Logger // underlying slog logger; set by the constructors
}
// NewSlogLogger creates a logger that writes JSON records to stdout at the
// given minimum level.
func NewSlogLogger(level slog.Level) *SlogLogger {
	handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: level})
	return &SlogLogger{logger: slog.New(handler)}
}
// NewSlogLoggerWithHandler wraps an arbitrary slog.Handler (text, custom
// sink, test recorder, ...) in a SlogLogger.
func NewSlogLoggerWithHandler(handler slog.Handler) *SlogLogger {
	logger := slog.New(handler)
	return &SlogLogger{logger: logger}
}
// Debug logs a message at debug level with optional key/value pairs.
func (l *SlogLogger) Debug(msg string, args ...interface{}) {
	attrs := l.convertArgs(args)
	l.logger.Debug(msg, attrs...)
}

// Info logs a message at info level with optional key/value pairs.
func (l *SlogLogger) Info(msg string, args ...interface{}) {
	attrs := l.convertArgs(args)
	l.logger.Info(msg, attrs...)
}

// Warn logs a message at warning level with optional key/value pairs.
func (l *SlogLogger) Warn(msg string, args ...interface{}) {
	attrs := l.convertArgs(args)
	l.logger.Warn(msg, attrs...)
}

// Error logs a message at error level with optional key/value pairs.
func (l *SlogLogger) Error(msg string, args ...interface{}) {
	attrs := l.convertArgs(args)
	l.logger.Error(msg, attrs...)
}

// Fatal logs at error level (slog has no fatal level) and then terminates
// the process with exit code 1. Deferred functions do NOT run.
func (l *SlogLogger) Fatal(msg string, args ...interface{}) {
	attrs := l.convertArgs(args)
	l.logger.Error(msg, attrs...)
	os.Exit(1)
}
// With returns a child logger that includes the given key/value pair on
// every subsequent record. The receiver is not modified.
func (l *SlogLogger) With(key string, value interface{}) Logger {
	child := l.logger.With(key, value)
	return &SlogLogger{logger: child}
}
// convertArgs normalizes variadic key/value arguments for slog.
// Arguments are expected as alternating key/value pairs. A dangling trailing
// argument (odd count) is forwarded unchanged so slog can surface it as
// !BADKEY, instead of being silently dropped as before.
func (l *SlogLogger) convertArgs(args []interface{}) []any {
	if len(args) == 0 {
		return nil
	}
	attrs := make([]any, 0, len(args))
	for i := 0; i+1 < len(args); i += 2 {
		attrs = append(attrs, args[i], args[i+1])
	}
	if len(args)%2 != 0 {
		// Odd trailing argument: pass it through rather than lose the data.
		attrs = append(attrs, args[len(args)-1])
	}
	return attrs
}

127
pkg/broker/broker.go Normal file
View File

@@ -0,0 +1,127 @@
package broker
import (
"context"
"fmt"
"sync"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/config"
)
// Broker manages multiple database instances, one per configured database.
type Broker struct {
	config *config.Config // loaded broker configuration
	logger adapter.Logger // logger scoped with component=broker
	version string // broker version string reported to each database
	instances []*DatabaseInstance // running instances; appended during Start
	ctx context.Context // root context cancelled on Stop
	cancel context.CancelFunc // cancels ctx
	shutdown bool // true once Stop has run; guarded by mu
	mu sync.RWMutex // guards shutdown (and reads of instances in GetStats)
}
// New creates a broker that will manage one database instance per configured
// database. Instances are not created or started until Start is called.
func New(cfg *config.Config, logger adapter.Logger, version string) (*Broker, error) {
	ctx, cancel := context.WithCancel(context.Background())
	return &Broker{
		config:    cfg,
		logger:    logger.With("component", "broker"),
		version:   version,
		instances: make([]*DatabaseInstance, 0, len(cfg.Databases)),
		ctx:       ctx,
		cancel:    cancel,
	}, nil
}
// Start creates and starts one DatabaseInstance per configured database.
// If any instance fails to create or start, all previously started instances
// are stopped and the error is returned.
func (b *Broker) Start() error {
	b.logger.Info("starting broker", "database_count", len(b.config.Databases))
	for i := range b.config.Databases {
		// Take a pointer to the slice element itself rather than to a loop
		// variable: before Go 1.22 a `for _, dbCfg := range` variable is
		// reused across iterations, so &dbCfg would leave every instance
		// pointing at the last database's configuration.
		dbCfg := &b.config.Databases[i]
		b.logger.Info("starting database instance", "name", dbCfg.Name, "host", dbCfg.Host, "database", dbCfg.Database)
		// Create database adapter
		dbAdapter := adapter.NewPostgresAdapter(dbCfg.ToPostgresConfig(), b.logger)
		// Create database instance
		instance, err := NewDatabaseInstance(b.config, dbCfg, dbAdapter, b.logger, b.version, b.ctx)
		if err != nil {
			// Stop any already-started instances
			b.stopInstances()
			return fmt.Errorf("failed to create database instance %d (%s): %w", i, dbCfg.Name, err)
		}
		// Start the instance
		if err := instance.Start(); err != nil {
			// Stop any already-started instances
			b.stopInstances()
			return fmt.Errorf("failed to start database instance %d (%s): %w", i, dbCfg.Name, err)
		}
		// Guard the append: GetStats and Stop read b.instances under b.mu.
		b.mu.Lock()
		b.instances = append(b.instances, instance)
		b.mu.Unlock()
		b.logger.Info("database instance started", "name", dbCfg.Name, "instance_id", instance.ID)
	}
	b.logger.Info("broker started successfully", "database_instances", len(b.instances))
	return nil
}
// Stop gracefully stops all database instances. It is idempotent: calls
// after the first return nil immediately.
func (b *Broker) Stop() error {
	b.mu.Lock()
	alreadyStopped := b.shutdown
	b.shutdown = true
	b.mu.Unlock()
	if alreadyStopped {
		return nil
	}
	b.logger.Info("stopping broker")
	// Cancel the root context so all background goroutines wind down.
	b.cancel()
	b.stopInstances()
	b.logger.Info("broker stopped")
	return nil
}
// stopInstances stops every database instance concurrently and waits for all
// of them to finish; per-instance failures are logged, not returned.
func (b *Broker) stopInstances() {
	var wg sync.WaitGroup
	for _, instance := range b.instances {
		inst := instance // capture per iteration (pre-Go-1.22 semantics)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := inst.Stop(); err != nil {
				b.logger.Error("failed to stop instance", "name", inst.DatabaseName, "error", err)
			}
		}()
	}
	wg.Wait()
}
// GetStats returns aggregate statistics for the broker plus a per-instance
// breakdown keyed by database name.
func (b *Broker) GetStats() map[string]interface{} {
	b.mu.RLock()
	defer b.mu.RUnlock()
	perInstance := make(map[string]interface{}, len(b.instances))
	for _, inst := range b.instances {
		perInstance[inst.DatabaseName] = inst.GetStats()
	}
	return map[string]interface{}{
		"database_count": len(b.instances),
		"instances":      perInstance,
	}
}

180
pkg/broker/config/config.go Normal file
View File

@@ -0,0 +1,180 @@
package config
import (
	"fmt"
	"strings"
	"time"

	"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
	"github.com/spf13/viper"
)
// Config holds all broker configuration loaded from file/env via viper.
type Config struct {
	Databases []DatabaseConfig `mapstructure:"databases"` // one broker instance is started per entry
	Broker BrokerConfig `mapstructure:"broker"` // broker-wide tuning
	Logging LoggingConfig `mapstructure:"logging"` // log level and output format
}
// DatabaseConfig holds connection settings for a single database.
// Zero values are filled in by applyDatabaseDefaults after validation.
type DatabaseConfig struct {
	Name string `mapstructure:"name"` // logical name; required, used in logs and instance naming
	Host string `mapstructure:"host"` // required
	Port int `mapstructure:"port"` // default 5432
	Database string `mapstructure:"database"` // required
	User string `mapstructure:"user"` // required
	Password string `mapstructure:"password"`
	SSLMode string `mapstructure:"sslmode"` // default "disable"
	MaxOpenConns int `mapstructure:"max_open_conns"` // default 25
	MaxIdleConns int `mapstructure:"max_idle_conns"` // default 5
	ConnMaxLifetime time.Duration `mapstructure:"conn_max_lifetime"` // default 5m
	ConnMaxIdleTime time.Duration `mapstructure:"conn_max_idle_time"` // default 10m
	QueueCount int `mapstructure:"queue_count"` // job queues per instance; default 4
}
// BrokerConfig holds broker-specific settings shared by all instances.
type BrokerConfig struct {
	Name string `mapstructure:"name"` // prefix for instance names
	FetchQueryQueSize int `mapstructure:"fetch_query_que_size"` // jobs fetched per poll
	QueueTimerSec int `mapstructure:"queue_timer_sec"` // queue poll interval in seconds
	QueueBufferSize int `mapstructure:"queue_buffer_size"` // in-memory job buffer per queue
	WorkerIdleTimeoutSec int `mapstructure:"worker_idle_timeout_sec"`
	NotifyRetrySeconds time.Duration `mapstructure:"notify_retry_seconds"`
	EnableDebug bool `mapstructure:"enable_debug"` // enables per-notification debug logging
}
// LoggingConfig holds logging settings.
type LoggingConfig struct {
	Level string `mapstructure:"level"` // debug|info|warn|error
	Format string `mapstructure:"format"` // json or text
}
// LoadConfig loads configuration from file and environment variables.
// Precedence (highest first): environment (BROKER_*), config file, defaults.
// When configPath is empty, "broker.yaml" is searched in ., /etc/pgsql-broker/
// and $HOME/.pgsql-broker/. A missing config file is not an error; everything
// can come from defaults and environment.
func LoadConfig(configPath string) (*Config, error) {
	v := viper.New()
	// Set defaults
	setDefaults(v)
	// Config file settings
	if configPath != "" {
		v.SetConfigFile(configPath)
	} else {
		v.SetConfigName("broker")
		v.SetConfigType("yaml")
		v.AddConfigPath(".")
		v.AddConfigPath("/etc/pgsql-broker/")
		v.AddConfigPath("$HOME/.pgsql-broker/")
	}
	// Environment overrides: map nested keys such as "logging.level" to
	// BROKER_LOGGING_LEVEL. Without the key replacer, AutomaticEnv can never
	// match nested keys (it would look for "BROKER_LOGGING.LEVEL", which is
	// not a usable environment variable name).
	v.SetEnvPrefix("BROKER")
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	v.AutomaticEnv()
	// Read config file; only a real read error is fatal, absence is fine.
	if err := v.ReadInConfig(); err != nil {
		if _, ok := err.(viper.ConfigFileNotFoundError); !ok {
			return nil, fmt.Errorf("failed to read config file: %w", err)
		}
	}
	var config Config
	if err := v.Unmarshal(&config); err != nil {
		return nil, fmt.Errorf("failed to unmarshal config: %w", err)
	}
	// Validate configuration (required fields), then fill in defaults.
	if err := validateConfig(&config); err != nil {
		return nil, err
	}
	applyDatabaseDefaults(&config)
	return &config, nil
}
// setDefaults registers fallback values used when neither the config file
// nor the environment provides a setting.
func setDefaults(v *viper.Viper) {
	defaults := map[string]interface{}{
		// Broker defaults
		"broker.name":                    "pgsql-broker",
		"broker.fetch_query_que_size":    100,
		"broker.queue_timer_sec":         10,
		"broker.queue_buffer_size":       50,
		"broker.worker_idle_timeout_sec": 10,
		"broker.notify_retry_seconds":    30 * time.Second,
		"broker.enable_debug":            false,
		// Logging defaults
		"logging.level":  "info",
		"logging.format": "json",
	}
	for key, value := range defaults {
		v.SetDefault(key, value)
	}
}
// validateConfig checks that at least one database is configured and that
// every database entry carries the fields required to connect.
func validateConfig(config *Config) error {
	if len(config.Databases) == 0 {
		return fmt.Errorf("at least one database must be configured")
	}
	for i, db := range config.Databases {
		switch {
		case db.Name == "":
			return fmt.Errorf("database[%d]: name is required", i)
		case db.Host == "":
			return fmt.Errorf("database[%d] (%s): host is required", i, db.Name)
		case db.Database == "":
			return fmt.Errorf("database[%d] (%s): database name is required", i, db.Name)
		case db.User == "":
			return fmt.Errorf("database[%d] (%s): user is required", i, db.Name)
		}
	}
	return nil
}
// applyDatabaseDefaults fills unset (zero-valued) database settings with
// sensible defaults. Called after validation, which only checks required
// string fields.
func applyDatabaseDefaults(config *Config) {
	// Helpers: assign the default only when the field is still zero.
	intDefault := func(field *int, value int) {
		if *field == 0 {
			*field = value
		}
	}
	durDefault := func(field *time.Duration, value time.Duration) {
		if *field == 0 {
			*field = value
		}
	}
	for i := range config.Databases {
		db := &config.Databases[i]
		intDefault(&db.Port, 5432)
		if db.SSLMode == "" {
			db.SSLMode = "disable"
		}
		intDefault(&db.MaxOpenConns, 25)
		intDefault(&db.MaxIdleConns, 5)
		durDefault(&db.ConnMaxLifetime, 5*time.Minute)
		durDefault(&db.ConnMaxIdleTime, 10*time.Minute)
		intDefault(&db.QueueCount, 4)
	}
}
// ToPostgresConfig converts this database configuration into the adapter's
// connection settings structure.
func (d *DatabaseConfig) ToPostgresConfig() adapter.PostgresConfig {
	cfg := adapter.PostgresConfig{
		Host:     d.Host,
		Port:     d.Port,
		Database: d.Database,
		User:     d.User,
		Password: d.Password,
		SSLMode:  d.SSLMode,
	}
	cfg.MaxOpenConns = d.MaxOpenConns
	cfg.MaxIdleConns = d.MaxIdleConns
	cfg.ConnMaxLifetime = d.ConnMaxLifetime
	cfg.ConnMaxIdleTime = d.ConnMaxIdleTime
	return cfg
}

View File

@@ -0,0 +1,326 @@
package broker
import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"time"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/config"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/models"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/queue"
)
// DatabaseInstance represents a broker instance for a single database.
type DatabaseInstance struct {
	ID int64 // instance ID assigned by broker_register_instance
	Name string // "<broker name>-<database name>"
	DatabaseName string // configured logical database name
	Hostname string // host this broker runs on ("unknown" if undeterminable)
	PID int // broker process ID
	Version string // broker version string reported to the database
	config *config.Config // full broker configuration
	dbConfig *config.DatabaseConfig // configuration for this database
	db adapter.DBAdapter // database connection adapter
	logger adapter.Logger // logger scoped with component and database name
	queues map[int]*queue.Queue // job queues keyed by queue number
	queuesMu sync.RWMutex // guards queues
	ctx context.Context // parent context; cancellation stops background work
	shutdown bool // true once Stop has run
	shutdownMu sync.RWMutex // guards shutdown
	jobsHandled int64 // running count reported via broker_ping_instance
	startTime time.Time // used for the uptime statistic
}
// NewDatabaseInstance builds (but does not start) a broker instance bound to
// a single database. The hostname falls back to "unknown" when it cannot be
// determined.
func NewDatabaseInstance(cfg *config.Config, dbCfg *config.DatabaseConfig, db adapter.DBAdapter, logger adapter.Logger, version string, parentCtx context.Context) (*DatabaseInstance, error) {
	host, err := os.Hostname()
	if err != nil {
		host = "unknown"
	}
	return &DatabaseInstance{
		Name:         fmt.Sprintf("%s-%s", cfg.Broker.Name, dbCfg.Name),
		DatabaseName: dbCfg.Name,
		Hostname:     host,
		PID:          os.Getpid(),
		Version:      version,
		config:       cfg,
		dbConfig:     dbCfg,
		db:           db,
		logger:       logger.With("component", "database-instance").With("database", dbCfg.Name),
		queues:       make(map[int]*queue.Queue),
		ctx:          parentCtx,
		startTime:    time.Now(),
	}, nil
}
// Start connects to the database, registers this instance, starts the job
// queues and the NOTIFY listener, and launches the background ping loop.
// On any failure after the connection succeeds, the connection is closed:
// a failed instance is never appended to the broker's instance list, so
// nothing else would ever close it.
func (i *DatabaseInstance) Start() error {
	i.logger.Info("starting database instance", "name", i.Name, "hostname", i.Hostname, "pid", i.PID)
	// Connect to database
	if err := i.db.Connect(i.ctx); err != nil {
		return fmt.Errorf("failed to connect to database: %w", err)
	}
	// cleanup releases the connection when a later startup step fails.
	cleanup := func() {
		if cerr := i.db.Close(); cerr != nil {
			i.logger.Error("failed to close database after startup failure", "error", cerr)
		}
	}
	// Register instance in database
	if err := i.registerInstance(); err != nil {
		cleanup()
		return fmt.Errorf("failed to register instance: %w", err)
	}
	i.logger.Info("database instance registered", "id", i.ID)
	// Start queues
	if err := i.startQueues(); err != nil {
		cleanup()
		return fmt.Errorf("failed to start queues: %w", err)
	}
	// Start listening for notifications
	if err := i.startListener(); err != nil {
		cleanup()
		return fmt.Errorf("failed to start listener: %w", err)
	}
	// Start ping routine
	go i.pingRoutine()
	i.logger.Info("database instance started successfully")
	return nil
}
// Stop gracefully shuts the instance down: queues first, then the database
// registration, then the connection. Idempotent; per-step failures are
// logged but do not abort the shutdown sequence.
func (i *DatabaseInstance) Stop() error {
	i.shutdownMu.Lock()
	wasShutdown := i.shutdown
	i.shutdown = true
	i.shutdownMu.Unlock()
	if wasShutdown {
		return nil
	}
	i.logger.Info("stopping database instance")
	// Stop all queues.
	i.queuesMu.Lock()
	for num, q := range i.queues {
		i.logger.Info("stopping queue", "number", num)
		if err := q.Stop(); err != nil {
			i.logger.Error("failed to stop queue", "number", num, "error", err)
		}
	}
	i.queuesMu.Unlock()
	// Mark the instance as shut down in the database.
	if err := i.shutdownInstance(); err != nil {
		i.logger.Error("failed to shutdown instance in database", "error", err)
	}
	// Finally release the connection pool.
	if err := i.db.Close(); err != nil {
		i.logger.Error("failed to close database", "error", err)
	}
	i.logger.Info("database instance stopped")
	return nil
}
// registerInstance records this instance via the broker_register_instance
// stored function and stores the returned instance ID on success.
func (i *DatabaseInstance) registerInstance() error {
	var (
		retval     int
		errmsg     string
		instanceID int64
	)
	row := i.db.QueryRow(i.ctx,
		"SELECT p_retval, p_errmsg, p_instance_id FROM broker_register_instance($1, $2, $3, $4, $5)",
		i.Name, i.Hostname, i.PID, i.Version, i.dbConfig.QueueCount,
	)
	if err := row.Scan(&retval, &errmsg, &instanceID); err != nil {
		return fmt.Errorf("query error: %w", err)
	}
	// retval > 0 signals an application-level failure reported by the function.
	if retval > 0 {
		return fmt.Errorf("broker_register_instance error: %s", errmsg)
	}
	i.ID = instanceID
	return nil
}
// startQueues creates and starts one queue (with a single worker) for each
// configured queue number. If any queue fails to start, the queues already
// started by this call are stopped so none are leaked — a failed instance is
// never registered with the broker, so Stop would not be called for it.
func (i *DatabaseInstance) startQueues() error {
	i.queuesMu.Lock()
	defer i.queuesMu.Unlock()
	for queueNum := 1; queueNum <= i.dbConfig.QueueCount; queueNum++ {
		queueCfg := queue.Config{
			Number:       queueNum,
			InstanceID:   i.ID,
			WorkerCount:  1, // One worker per queue for now
			DBAdapter:    i.db,
			Logger:       i.logger,
			BufferSize:   i.config.Broker.QueueBufferSize,
			TimerSeconds: i.config.Broker.QueueTimerSec,
			FetchSize:    i.config.Broker.FetchQueryQueSize,
		}
		q := queue.New(queueCfg)
		if err := q.Start(queueCfg); err != nil {
			// Roll back queues started earlier in this loop.
			for num, started := range i.queues {
				if serr := started.Stop(); serr != nil {
					i.logger.Error("failed to stop queue during rollback", "number", num, "error", serr)
				}
				delete(i.queues, num)
			}
			return fmt.Errorf("failed to start queue %d: %w", queueNum, err)
		}
		i.queues[queueNum] = q
		i.logger.Info("queue started", "number", queueNum)
	}
	return nil
}
// startListener subscribes to the "broker.event" NOTIFY channel; incoming
// payloads are dispatched to handleNotification.
func (i *DatabaseInstance) startListener() error {
	// Method values satisfy the NotificationHandler signature directly.
	if err := i.db.Listen(i.ctx, "broker.event", i.handleNotification); err != nil {
		return fmt.Errorf("failed to start listener: %w", err)
	}
	return nil
}
// handleNotification decodes a NOTIFY payload into a Job and routes it to
// the matching queue. Malformed or unroutable payloads are logged and
// dropped; they are never fatal.
func (i *DatabaseInstance) handleNotification(n *adapter.Notification) {
	if i.config.Broker.EnableDebug {
		i.logger.Debug("received notification", "channel", n.Channel, "payload", n.Payload)
	}
	var job models.Job
	if err := json.Unmarshal([]byte(n.Payload), &job); err != nil {
		i.logger.Error("failed to unmarshal notification", "error", err, "payload", n.Payload)
		return
	}
	// Sanity-check the decoded payload before routing.
	switch {
	case job.ID <= 0:
		i.logger.Warn("notification missing job ID", "payload", n.Payload)
		return
	case job.JobQueue <= 0:
		i.logger.Warn("notification missing queue number", "job_id", job.ID)
		return
	}
	// Look up the target queue under the read lock.
	i.queuesMu.RLock()
	target, ok := i.queues[job.JobQueue]
	i.queuesMu.RUnlock()
	if !ok {
		i.logger.Warn("queue not found for job", "job_id", job.ID, "queue", job.JobQueue)
		return
	}
	if err := target.AddJob(job); err != nil {
		i.logger.Error("failed to add job to queue", "job_id", job.ID, "queue", job.JobQueue, "error", err)
	}
}
// pingRoutine periodically refreshes this instance's liveness record in the
// database until the instance shuts down or its context is cancelled.
func (i *DatabaseInstance) pingRoutine() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-i.ctx.Done():
			return
		case <-ticker.C:
			i.shutdownMu.RLock()
			stopping := i.shutdown
			i.shutdownMu.RUnlock()
			if stopping {
				return
			}
			// A failed ping is logged and retried on the next tick.
			if err := i.ping(); err != nil {
				i.logger.Error("ping failed", "error", err)
			}
		}
	}
}
// ping calls broker_ping_instance to refresh the liveness timestamp and
// report the number of jobs handled so far.
func (i *DatabaseInstance) ping() error {
	var (
		retval int
		errmsg string
	)
	row := i.db.QueryRow(i.ctx,
		"SELECT p_retval, p_errmsg FROM broker_ping_instance($1, $2)",
		i.ID, i.jobsHandled,
	)
	if err := row.Scan(&retval, &errmsg); err != nil {
		return fmt.Errorf("query error: %w", err)
	}
	if retval > 0 {
		return fmt.Errorf("broker_ping_instance error: %s", errmsg)
	}
	return nil
}
// shutdownInstance marks this instance as shut down in the database via the
// broker_shutdown_instance stored function.
func (i *DatabaseInstance) shutdownInstance() error {
	var (
		retval int
		errmsg string
	)
	row := i.db.QueryRow(i.ctx,
		"SELECT p_retval, p_errmsg FROM broker_shutdown_instance($1)",
		i.ID,
	)
	if err := row.Scan(&retval, &errmsg); err != nil {
		return fmt.Errorf("query error: %w", err)
	}
	if retval > 0 {
		return fmt.Errorf("broker_shutdown_instance error: %s", errmsg)
	}
	return nil
}
// GetStats returns a snapshot of instance-level metrics plus per-queue
// statistics keyed by queue number.
func (i *DatabaseInstance) GetStats() map[string]interface{} {
	i.queuesMu.RLock()
	defer i.queuesMu.RUnlock()
	perQueue := make(map[int]interface{}, len(i.queues))
	for num, q := range i.queues {
		perQueue[num] = q.GetStats()
	}
	return map[string]interface{}{
		"id":            i.ID,
		"name":          i.Name,
		"database_name": i.DatabaseName,
		"hostname":      i.Hostname,
		"pid":           i.PID,
		"version":       i.Version,
		"uptime":        time.Since(i.startTime).String(),
		"jobs_handled":  i.jobsHandled,
		"queue_count":   len(i.queues),
		"queues":        perQueue,
	}
}

View File

@@ -0,0 +1,246 @@
package install
import (
"context"
"embed"
"fmt"
"io/fs"
"sort"
"strings"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
)
//go:embed all:sql
var sqlFS embed.FS
// Installer handles database schema installation from the embedded SQL files.
type Installer struct {
	db adapter.DBAdapter // connection used to execute the schema scripts
	logger adapter.Logger // destination for progress logging
}
// New returns an Installer that executes the embedded schema scripts through
// the given database adapter.
func New(db adapter.DBAdapter, logger adapter.Logger) *Installer {
	return &Installer{db: db, logger: logger}
}
// InstallSchema installs the complete database schema: tables first (the
// procedures reference them), then the stored procedures.
func (i *Installer) InstallSchema(ctx context.Context) error {
	i.logger.Info("starting schema installation")
	if err := i.installTables(ctx); err != nil {
		return fmt.Errorf("failed to install tables: %w", err)
	}
	if err := i.installProcedures(ctx); err != nil {
		return fmt.Errorf("failed to install procedures: %w", err)
	}
	i.logger.Info("schema installation completed successfully")
	return nil
}
// installTables installs all table definitions embedded under sql/tables.
func (i *Installer) installTables(ctx context.Context) error {
	i.logger.Info("installing tables")
	if err := i.installSQLDir(ctx, "sql/tables", "table"); err != nil {
		return err
	}
	i.logger.Info("tables installed successfully")
	return nil
}
// installProcedures installs all stored procedures embedded under sql/procedures.
func (i *Installer) installProcedures(ctx context.Context) error {
	i.logger.Info("installing procedures")
	if err := i.installSQLDir(ctx, "sql/procedures", "procedure"); err != nil {
		return err
	}
	i.logger.Info("procedures installed successfully")
	return nil
}
// installSQLDir executes every .sql file in the embedded directory dir in
// lexical order, skipping the psql-only 00_install.sql driver script.
// kind ("table" or "procedure") is used for log and error messages only.
// This is the single implementation shared by installTables and
// installProcedures, which previously duplicated it.
func (i *Installer) installSQLDir(ctx context.Context, dir, kind string) error {
	files, err := sqlFS.ReadDir(dir)
	if err != nil {
		return fmt.Errorf("failed to read %ss directory: %w", kind, err)
	}
	for _, file := range filterAndSortSQLFiles(files) {
		// Skip install script (psql \i driver, not executable via Exec).
		if file == "00_install.sql" {
			continue
		}
		i.logger.Info("executing "+kind+" script", "file", file)
		content, err := sqlFS.ReadFile(dir + "/" + file)
		if err != nil {
			return fmt.Errorf("failed to read file %s: %w", file, err)
		}
		if err := i.executeSQL(ctx, string(content)); err != nil {
			return fmt.Errorf("failed to execute %s: %w", file, err)
		}
	}
	return nil
}
// executeSQL splits a script into statements and executes each one, skipping
// blanks and psql meta-commands (statements starting with a backslash),
// which are only meaningful inside the psql client.
func (i *Installer) executeSQL(ctx context.Context, sql string) error {
	for _, stmt := range splitSQLStatements(sql) {
		trimmed := strings.TrimSpace(stmt)
		if trimmed == "" || strings.HasPrefix(trimmed, "\\") {
			continue
		}
		if _, err := i.db.Exec(ctx, trimmed); err != nil {
			return fmt.Errorf("failed to execute statement: %w\nStatement: %s", err, trimmed)
		}
	}
	return nil
}
// filterAndSortSQLFiles returns the names of all regular *.sql entries in
// lexical order, which also orders the numeric "NN_" filename prefixes.
func filterAndSortSQLFiles(files []fs.DirEntry) []string {
	var sqlFiles []string
	for _, entry := range files {
		if entry.IsDir() {
			continue
		}
		if strings.HasSuffix(entry.Name(), ".sql") {
			sqlFiles = append(sqlFiles, entry.Name())
		}
	}
	sort.Strings(sqlFiles)
	return sqlFiles
}
// splitSQLStatements splits a SQL script into executable statements.
// Statements are delimited by ';', but pieces are re-joined (with a space
// restoring the separator) while inside a dollar-quoted ($$) body so that
// plpgsql function definitions survive intact. String literals containing
// ';' are not handled; the embedded scripts avoid them.
func splitSQLStatements(sql string) []string {
	var (
		result  []string
		pending string
	)
	for _, piece := range strings.Split(sql, ";") {
		piece = strings.TrimSpace(piece)
		if piece == "" {
			continue
		}
		pending += piece + ";"
		// An odd number of "$$" markers means we are inside a dollar-quoted
		// function body: keep accumulating.
		if strings.Count(pending, "$$")%2 != 0 {
			pending += " "
			continue
		}
		result = append(result, pending)
		pending = ""
	}
	// Flush any trailing partial statement.
	if pending != "" {
		result = append(result, pending)
	}
	return result
}
// VerifyInstallation confirms that every expected table and stored function
// exists in the connected database, failing on the first missing object.
func (i *Installer) VerifyInstallation(ctx context.Context) error {
	i.logger.Info("verifying installation")
	requiredTables := []string{"broker_queueinstance", "broker_jobs", "broker_schedule"}
	requiredProcs := []string{
		"broker_get",
		"broker_run",
		"broker_set",
		"broker_add_job",
		"broker_register_instance",
		"broker_ping_instance",
		"broker_shutdown_instance",
	}
	// Tables: presence checked via information_schema.
	for _, table := range requiredTables {
		var exists bool
		err := i.db.QueryRow(ctx,
			"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)",
			table,
		).Scan(&exists)
		if err != nil {
			return fmt.Errorf("failed to check table %s: %w", table, err)
		}
		if !exists {
			return fmt.Errorf("table %s does not exist", table)
		}
		i.logger.Info("table verified", "table", table)
	}
	// Functions: presence checked via pg_proc.
	for _, proc := range requiredProcs {
		var exists bool
		err := i.db.QueryRow(ctx,
			"SELECT EXISTS (SELECT FROM pg_proc WHERE proname = $1)",
			proc,
		).Scan(&exists)
		if err != nil {
			return fmt.Errorf("failed to check procedure %s: %w", proc, err)
		}
		if !exists {
			return fmt.Errorf("procedure %s does not exist", proc)
		}
		i.logger.Info("procedure verified", "procedure", proc)
	}
	i.logger.Info("installation verified successfully")
	return nil
}

View File

@@ -0,0 +1,13 @@
-- PostgreSQL Broker Procedures Installation Script
-- Run this script to create all required stored procedures
\echo 'Installing PostgreSQL Broker procedures...'
\i 01_broker_get.sql
\i 02_broker_run.sql
\i 03_broker_set.sql
\i 04_broker_register_instance.sql
\i 05_broker_add_job.sql
\i 06_broker_ping_instance.sql
-- broker_shutdown_instance is called by the Go broker on shutdown and checked
-- by VerifyInstallation; without this include a manual psql install is
-- incomplete. NOTE(review): confirm the script's actual filename/number.
\i 07_broker_shutdown_instance.sql
\echo 'PostgreSQL Broker procedures installed successfully!'

View File

@@ -0,0 +1,76 @@
-- broker_get function
-- Fetches the next job from the queue for a given queue number
-- Status codes used below: 0=pending, 1=running, 2=completed, 3=failed
-- Returns: p_retval (0=success, >0=error), p_errmsg (error message), p_job_id (job ID if found)
CREATE OR REPLACE FUNCTION broker_get(
    p_queue_number INTEGER,
    p_instance_id BIGINT DEFAULT NULL,
    OUT p_retval INTEGER,
    OUT p_errmsg TEXT,
    OUT p_job_id BIGINT
)
RETURNS RECORD
LANGUAGE plpgsql
AS $$
DECLARE
    v_job_record RECORD;
BEGIN
    p_retval := 0;
    p_errmsg := '';
    p_job_id := NULL;
    -- Validate queue number
    IF p_queue_number IS NULL OR p_queue_number <= 0 THEN
        p_retval := 1;
        p_errmsg := 'Invalid queue number';
        RETURN;
    END IF;
    -- Find and lock the next pending job for this queue
    -- Uses SKIP LOCKED to avoid blocking on jobs being processed by other workers
    -- Skip jobs with unfinished dependencies
    SELECT id_broker_jobs, job_name, job_priority, execute_str
    INTO v_job_record
    FROM broker_jobs
    WHERE job_queue = p_queue_number
      AND complete_status = 0 -- pending
      AND (
          depends_on IS NULL -- no dependencies
          OR depends_on = '{}' -- empty dependencies
          OR NOT EXISTS ( -- no dependency is still in flight
              SELECT 1
              FROM broker_jobs dep
              WHERE dep.job_name = ANY(broker_jobs.depends_on)
                -- Block while a dependency is pending (0) OR running (1).
                -- Previously only pending deps blocked, so a job could start
                -- while its dependency was still executing. Failed (3)
                -- dependencies intentionally do not block, matching the
                -- previous behavior for failures.
                AND dep.complete_status IN (0, 1)
          )
      )
    ORDER BY job_priority DESC, created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED;
    -- If no job found, return success with NULL job_id
    IF NOT FOUND THEN
        RETURN;
    END IF;
    -- Update job status to running and record the claiming instance
    UPDATE broker_jobs
    SET complete_status = 1, -- running
        started_at = NOW(),
        rid_broker_queueinstance = p_instance_id,
        updated_at = NOW()
    WHERE id_broker_jobs = v_job_record.id_broker_jobs;
    -- Return the job ID
    p_job_id := v_job_record.id_broker_jobs;
EXCEPTION
    WHEN OTHERS THEN
        p_retval := 2;
        p_errmsg := SQLERRM;
        RAISE WARNING 'broker_get error: %', SQLERRM;
END;
$$;
-- Comments
COMMENT ON FUNCTION broker_get IS 'Fetches the next pending job from the specified queue';

View File

@@ -0,0 +1,113 @@
-- broker_run function
-- Executes a job by its ID
-- Job lifecycle status codes: 0=pending, 1=running, 2=completed, 3=failed.
-- The job must already be in the running state (claimed via broker_get).
-- Returns: p_retval (0=success, >0=error), p_errmsg (error message)
CREATE OR REPLACE FUNCTION broker_run(
    p_job_id BIGINT,
    OUT p_retval INTEGER,
    OUT p_errmsg TEXT
)
RETURNS RECORD
LANGUAGE plpgsql
AS $$
DECLARE
    v_job_record RECORD;
    v_execute_result TEXT;
    v_error_occurred BOOLEAN := false;
BEGIN
    p_retval := 0;
    p_errmsg := '';
    v_execute_result := '';
    -- Validate job ID
    IF p_job_id IS NULL OR p_job_id <= 0 THEN
        p_retval := 1;
        p_errmsg := 'Invalid job ID';
        RETURN;
    END IF;
    -- Get job details
    -- NOTE(review): run_as is fetched but never applied below; the job
    -- currently executes with the caller's privileges — confirm intent.
    SELECT id_broker_jobs, execute_str, job_language, run_as, complete_status
    INTO v_job_record
    FROM broker_jobs
    WHERE id_broker_jobs = p_job_id
    FOR UPDATE; -- row lock so no other worker updates this job concurrently
    -- Check if job exists
    IF NOT FOUND THEN
        p_retval := 2;
        p_errmsg := 'Job not found';
        RETURN;
    END IF;
    -- Check if job is in running state
    IF v_job_record.complete_status != 1 THEN
        p_retval := 3;
        p_errmsg := format('Job is not in running state (status: %s)', v_job_record.complete_status);
        RETURN;
    END IF;
    -- Execute the job; the inner block converts execution failures into a
    -- failed-status update instead of aborting the whole function.
    BEGIN
        -- For SQL/PLPGSQL jobs, execute directly
        IF v_job_record.job_language IN ('sql', 'plpgsql') THEN
            EXECUTE v_job_record.execute_str;
            v_execute_result := 'Success';
        ELSE
            -- Other languages would need external execution
            p_retval := 4;
            p_errmsg := format('Unsupported job language: %s', v_job_record.job_language);
            v_error_occurred := true;
        END IF;
    EXCEPTION
        WHEN OTHERS THEN
            v_error_occurred := true;
            p_retval := 5;
            p_errmsg := SQLERRM;
            v_execute_result := format('Error: %s', SQLERRM);
    END;
    -- Update job with results
    IF v_error_occurred THEN
        UPDATE broker_jobs
        SET complete_status = 3, -- failed
            error_msg = p_errmsg,
            execute_result = v_execute_result,
            completed_at = NOW(),
            updated_at = NOW()
        WHERE id_broker_jobs = p_job_id;
    ELSE
        UPDATE broker_jobs
        SET complete_status = 2, -- completed
            execute_result = v_execute_result,
            error_msg = NULL,
            completed_at = NOW(),
            updated_at = NOW()
        WHERE id_broker_jobs = p_job_id;
    END IF;
EXCEPTION
    WHEN OTHERS THEN
        -- Unexpected failure outside the inner block (e.g. the result UPDATE
        -- itself failed): report it and best-effort mark the job as failed.
        p_retval := 6;
        p_errmsg := SQLERRM;
        RAISE WARNING 'broker_run error: %', SQLERRM;
        -- Try to update job status to failed
        BEGIN
            UPDATE broker_jobs
            SET complete_status = 3, -- failed
                error_msg = SQLERRM,
                completed_at = NOW(),
                updated_at = NOW()
            WHERE id_broker_jobs = p_job_id;
        EXCEPTION
            WHEN OTHERS THEN
                -- Ignore update errors
                NULL;
        END;
END;
$$;
-- Comments
COMMENT ON FUNCTION broker_run IS 'Executes a job by its ID and updates the status';

View File

@@ -0,0 +1,95 @@
-- broker_set function
-- Sets broker runtime options and context
-- Supports: user, application_name, search_path, and timezone
-- Returns: p_retval (0=success, >0=error), p_errmsg (error message)
CREATE OR REPLACE FUNCTION broker_set(
    p_option_name TEXT,
    p_option_value TEXT,
    OUT p_retval INTEGER,
    OUT p_errmsg TEXT
)
RETURNS RECORD
LANGUAGE plpgsql
AS $$
DECLARE
    v_sql TEXT;
BEGIN
    p_retval := 0;
    p_errmsg := '';
    -- Validate inputs
    IF p_option_name IS NULL OR p_option_name = '' THEN
        p_retval := 1;
        p_errmsg := 'Option name is required';
        RETURN;
    END IF;
    -- Handle different option types
    CASE LOWER(p_option_name)
        WHEN 'user' THEN
            -- Set session user context
            -- This is useful for audit trails and permissions
            BEGIN
                v_sql := format('SET SESSION AUTHORIZATION %I', p_option_value);
                EXECUTE v_sql;
            EXCEPTION
                WHEN OTHERS THEN
                    p_retval := 2;
                    p_errmsg := format('Failed to set user: %s', SQLERRM);
                    RETURN;
            END;
        WHEN 'application_name' THEN
            -- Set application name (visible in pg_stat_activity)
            BEGIN
                v_sql := format('SET application_name TO %L', p_option_value);
                EXECUTE v_sql;
            EXCEPTION
                WHEN OTHERS THEN
                    p_retval := 3;
                    p_errmsg := format('Failed to set application_name: %s', SQLERRM);
                    RETURN;
            END;
        WHEN 'search_path' THEN
            -- Set schema search path.
            -- Uses set_config() rather than EXECUTE format('... %s') so the
            -- caller-supplied value is treated strictly as a setting value and
            -- cannot inject arbitrary SQL (%s interpolated the raw text).
            BEGIN
                PERFORM set_config('search_path', p_option_value, false);
            EXCEPTION
                WHEN OTHERS THEN
                    p_retval := 4;
                    p_errmsg := format('Failed to set search_path: %s', SQLERRM);
                    RETURN;
            END;
        WHEN 'timezone' THEN
            -- Set timezone
            BEGIN
                v_sql := format('SET timezone TO %L', p_option_value);
                EXECUTE v_sql;
            EXCEPTION
                WHEN OTHERS THEN
                    p_retval := 5;
                    p_errmsg := format('Failed to set timezone: %s', SQLERRM);
                    RETURN;
            END;
        ELSE
            -- Unknown option
            p_retval := 10;
            p_errmsg := format('Unknown option: %s', p_option_name);
            RETURN;
    END CASE;
EXCEPTION
    WHEN OTHERS THEN
        p_retval := 99;
        p_errmsg := SQLERRM;
        RAISE WARNING 'broker_set error: %', SQLERRM;
END;
$$;
-- Comments
COMMENT ON FUNCTION broker_set IS 'Sets broker runtime options and session context (user, application_name, search_path, timezone)';

View File

@@ -0,0 +1,82 @@
-- broker_register_instance function
-- Registers a new broker instance in the database
-- Returns: p_retval (0=success, >0=error), p_errmsg (error message), p_instance_id (new instance ID)
CREATE OR REPLACE FUNCTION broker_register_instance(
    p_name TEXT,
    p_hostname TEXT,
    p_pid INTEGER,
    p_version TEXT,
    p_queue_count INTEGER,
    OUT p_retval INTEGER,
    OUT p_errmsg TEXT,
    OUT p_instance_id BIGINT
)
RETURNS RECORD
LANGUAGE plpgsql
AS $$
DECLARE
    v_active_count INTEGER;
BEGIN
    p_retval := 0;
    p_errmsg := '';
    p_instance_id := NULL;
    -- Validate inputs
    IF p_name IS NULL OR p_name = '' THEN
        p_retval := 1;
        p_errmsg := 'Instance name is required';
        RETURN;
    END IF;
    IF p_hostname IS NULL OR p_hostname = '' THEN
        p_retval := 2;
        p_errmsg := 'Hostname is required';
        RETURN;
    END IF;
    -- Serialize registration: without a lock, two brokers starting at the
    -- same moment could both see zero active rows (check-then-insert race)
    -- and both register. SHARE ROW EXCLUSIVE blocks concurrent writers
    -- while still allowing plain reads.
    LOCK TABLE broker_queueinstance IN SHARE ROW EXCLUSIVE MODE;
    -- Check for existing active instances
    -- Only one broker instance should be active per database
    SELECT COUNT(*)
    INTO v_active_count
    FROM broker_queueinstance
    WHERE status = 'active';
    IF v_active_count > 0 THEN
        p_retval := 3;
        p_errmsg := 'Another broker instance is already active in this database. Only one broker instance per database is allowed.';
        RETURN;
    END IF;
    -- Insert new instance
    INSERT INTO broker_queueinstance (
        name,
        hostname,
        pid,
        version,
        status,
        queue_count,
        started_at,
        last_ping_at
    ) VALUES (
        p_name,
        p_hostname,
        p_pid,
        p_version,
        'active',
        p_queue_count,
        NOW(),
        NOW()
    )
    RETURNING id_broker_queueinstance INTO p_instance_id;
EXCEPTION
    WHEN OTHERS THEN
        p_retval := 99;
        p_errmsg := SQLERRM;
        RAISE WARNING 'broker_register_instance error: %', SQLERRM;
END;
$$;
-- Comments
COMMENT ON FUNCTION broker_register_instance IS 'Registers a new broker instance';

View File

@@ -0,0 +1,91 @@
-- broker_add_job function
-- Adds a new job to the broker queue and sends a notification
-- Returns: p_retval (0=success, >0=error), p_errmsg (error message), p_job_id (new job ID)
CREATE OR REPLACE FUNCTION broker_add_job(
    p_job_name TEXT,
    p_execute_str TEXT,
    p_job_queue INTEGER DEFAULT 1,
    p_job_priority INTEGER DEFAULT 0,
    p_job_language TEXT DEFAULT 'sql',
    p_run_as TEXT DEFAULT NULL,
    p_schedule_id BIGINT DEFAULT NULL,
    p_depends_on TEXT[] DEFAULT NULL,
    OUT p_retval INTEGER,
    OUT p_errmsg TEXT,
    OUT p_job_id BIGINT
)
RETURNS RECORD
LANGUAGE plpgsql
AS $$
BEGIN
    p_retval := 0;
    p_errmsg := '';
    p_job_id := NULL;
    -- Reject missing required fields; each failure keeps its own code.
    IF p_job_name IS NULL OR p_job_name = '' THEN
        p_retval := 1;
        p_errmsg := 'Job name is required';
        RETURN;
    END IF;
    IF p_execute_str IS NULL OR p_execute_str = '' THEN
        p_retval := 2;
        p_errmsg := 'Execute string is required';
        RETURN;
    END IF;
    IF p_job_queue IS NULL OR p_job_queue <= 0 THEN
        p_retval := 3;
        p_errmsg := 'Invalid job queue number';
        RETURN;
    END IF;
    -- Queue the job in pending state (complete_status = 0).
    INSERT INTO broker_jobs (
        job_name,
        execute_str,
        job_queue,
        job_priority,
        job_language,
        run_as,
        rid_broker_schedule,
        depends_on,
        complete_status
    ) VALUES (
        p_job_name,
        p_execute_str,
        p_job_queue,
        p_job_priority,
        p_job_language,
        p_run_as,
        p_schedule_id,
        p_depends_on,
        0
    )
    RETURNING id_broker_jobs INTO p_job_id;
    -- Wake up the broker. NOTIFY is delivered on commit, so listeners
    -- only ever see jobs whose INSERT actually committed.
    PERFORM pg_notify('broker.event', json_build_object(
        'id', p_job_id,
        'job_name', p_job_name,
        'job_queue', p_job_queue,
        'job_priority', p_job_priority
    )::text);
EXCEPTION
    WHEN OTHERS THEN
        p_retval := 99;
        p_errmsg := SQLERRM;
        RAISE WARNING 'broker_add_job error: %', SQLERRM;
END;
$$;
-- Comments
COMMENT ON FUNCTION broker_add_job IS 'Adds a new job to the broker queue and sends a NOTIFY event';

View File

@@ -0,0 +1,98 @@
-- broker_ping_instance function
-- Updates the last_ping_at timestamp for a broker instance
-- Returns: p_retval (0=success, >0=error), p_errmsg (error message)
CREATE OR REPLACE FUNCTION broker_ping_instance(
    p_instance_id BIGINT,
    p_jobs_handled BIGINT DEFAULT NULL,
    OUT p_retval INTEGER,
    OUT p_errmsg TEXT
)
RETURNS RECORD
LANGUAGE plpgsql
AS $$
BEGIN
    p_retval := 0;
    p_errmsg := '';
    -- Validate instance ID
    IF p_instance_id IS NULL OR p_instance_id <= 0 THEN
        p_retval := 1;
        p_errmsg := 'Invalid instance ID';
        RETURN;
    END IF;
    -- Update ping timestamp. COALESCE keeps the existing jobs_handled
    -- when no new count is supplied, so one statement replaces the
    -- previous duplicated IF/ELSE UPDATE pair.
    UPDATE broker_queueinstance
       SET last_ping_at = NOW(),
           jobs_handled = COALESCE(p_jobs_handled, jobs_handled)
     WHERE id_broker_queueinstance = p_instance_id;
    -- Check if instance was found
    IF NOT FOUND THEN
        p_retval := 2;
        p_errmsg := 'Instance not found';
        RETURN;
    END IF;
EXCEPTION
    WHEN OTHERS THEN
        p_retval := 99;
        p_errmsg := SQLERRM;
        RAISE WARNING 'broker_ping_instance error: %', SQLERRM;
END;
$$;
-- broker_shutdown_instance function
-- Marks a broker instance as shutdown
-- Returns: p_retval (0=success, >0=error), p_errmsg (error message)
CREATE OR REPLACE FUNCTION broker_shutdown_instance(
    p_instance_id BIGINT,
    OUT p_retval INTEGER,
    OUT p_errmsg TEXT
)
RETURNS RECORD
LANGUAGE plpgsql
AS $$
DECLARE
    v_rows INTEGER;
BEGIN
    p_retval := 0;
    p_errmsg := '';
    -- Reject missing or non-positive IDs up front.
    IF p_instance_id IS NULL OR p_instance_id <= 0 THEN
        p_retval := 1;
        p_errmsg := 'Invalid instance ID';
        RETURN;
    END IF;
    -- Flag the instance as shut down and stamp the shutdown time.
    UPDATE broker_queueinstance
       SET status = 'shutdown',
           shutdown_at = NOW()
     WHERE id_broker_queueinstance = p_instance_id;
    -- Zero affected rows means no such instance exists.
    GET DIAGNOSTICS v_rows = ROW_COUNT;
    IF v_rows = 0 THEN
        p_retval := 2;
        p_errmsg := 'Instance not found';
        RETURN;
    END IF;
EXCEPTION
    WHEN OTHERS THEN
        p_retval := 99;
        p_errmsg := SQLERRM;
        RAISE WARNING 'broker_shutdown_instance error: %', SQLERRM;
END;
$$;
-- Comments
COMMENT ON FUNCTION broker_ping_instance IS 'Updates the last ping timestamp for an instance';
COMMENT ON FUNCTION broker_shutdown_instance IS 'Marks an instance as shutdown';

View File

@@ -0,0 +1,10 @@
-- PostgreSQL Broker Tables Installation Script
-- Run this script to create all required tables
\echo 'Installing PostgreSQL Broker tables...'
\i 01_broker_queueinstance.sql
-- 03 (schedule) is deliberately loaded before 02 (jobs): broker_jobs has a
-- foreign key to broker_schedule, so the schedule table must exist first.
\i 03_broker_schedule.sql
\i 02_broker_jobs.sql
\echo 'PostgreSQL Broker tables installed successfully!'

View File

@@ -0,0 +1,31 @@
-- broker_queueinstance table
-- Tracks active and historical broker queue instances
-- One row per broker process; last_ping_at is the heartbeat updated by
-- broker_ping_instance, shutdown_at is set by broker_shutdown_instance.
CREATE TABLE IF NOT EXISTS broker_queueinstance (
    id_broker_queueinstance BIGSERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL,
    hostname VARCHAR(255) NOT NULL,
    pid INTEGER NOT NULL,
    version VARCHAR(50) NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'active',
    started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    last_ping_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    shutdown_at TIMESTAMP WITH TIME ZONE,
    queue_count INTEGER NOT NULL DEFAULT 0,
    jobs_handled BIGINT NOT NULL DEFAULT 0,
    -- Restrict status to the values the broker state machine uses.
    CONSTRAINT broker_queueinstance_status_check CHECK (status IN ('active', 'inactive', 'shutdown'))
);
-- Indexes
-- status supports the single-active-instance check; last_ping supports
-- stale-instance sweeps.
CREATE INDEX IF NOT EXISTS idx_broker_queueinstance_status ON broker_queueinstance(status);
CREATE INDEX IF NOT EXISTS idx_broker_queueinstance_hostname ON broker_queueinstance(hostname);
CREATE INDEX IF NOT EXISTS idx_broker_queueinstance_last_ping ON broker_queueinstance(last_ping_at);
-- Comments
COMMENT ON TABLE broker_queueinstance IS 'Tracks broker queue instances (active and historical)';
COMMENT ON COLUMN broker_queueinstance.name IS 'Human-readable name of the broker instance';
COMMENT ON COLUMN broker_queueinstance.hostname IS 'Hostname where the broker is running';
COMMENT ON COLUMN broker_queueinstance.pid IS 'Process ID of the broker';
COMMENT ON COLUMN broker_queueinstance.status IS 'Current status: active, inactive, or shutdown';
COMMENT ON COLUMN broker_queueinstance.jobs_handled IS 'Total number of jobs handled by this instance';

View File

@@ -0,0 +1,62 @@
-- broker_jobs table
-- Stores jobs to be executed by the broker
CREATE TABLE IF NOT EXISTS broker_jobs (
    id_broker_jobs BIGSERIAL PRIMARY KEY,
    job_name VARCHAR(255) NOT NULL,
    job_priority INTEGER NOT NULL DEFAULT 0,
    job_queue INTEGER NOT NULL DEFAULT 1,
    job_language VARCHAR(50) NOT NULL DEFAULT 'sql',
    execute_str TEXT NOT NULL,
    execute_result TEXT,
    error_msg TEXT,
    complete_status INTEGER NOT NULL DEFAULT 0,
    run_as VARCHAR(100),
    rid_broker_schedule BIGINT,
    rid_broker_queueinstance BIGINT,
    depends_on TEXT[],
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    CONSTRAINT broker_jobs_complete_status_check CHECK (complete_status IN (0, 1, 2, 3, 4)),
    CONSTRAINT broker_jobs_job_queue_check CHECK (job_queue > 0),
    CONSTRAINT fk_schedule FOREIGN KEY (rid_broker_schedule) REFERENCES broker_schedule(id_broker_schedule) ON DELETE SET NULL,
    CONSTRAINT fk_instance FOREIGN KEY (rid_broker_queueinstance) REFERENCES broker_queueinstance(id_broker_queueinstance) ON DELETE SET NULL
);
-- Indexes
CREATE INDEX IF NOT EXISTS idx_broker_jobs_status ON broker_jobs(complete_status);
CREATE INDEX IF NOT EXISTS idx_broker_jobs_queue ON broker_jobs(job_queue, complete_status, job_priority);
CREATE INDEX IF NOT EXISTS idx_broker_jobs_schedule ON broker_jobs(rid_broker_schedule);
CREATE INDEX IF NOT EXISTS idx_broker_jobs_instance ON broker_jobs(rid_broker_queueinstance);
CREATE INDEX IF NOT EXISTS idx_broker_jobs_created ON broker_jobs(created_at);
CREATE INDEX IF NOT EXISTS idx_broker_jobs_name ON broker_jobs(job_name, complete_status);
-- Comments
COMMENT ON TABLE broker_jobs IS 'Job queue for broker execution';
COMMENT ON COLUMN broker_jobs.job_name IS 'Name/description of the job';
COMMENT ON COLUMN broker_jobs.job_priority IS 'Job priority (higher = more important)';
COMMENT ON COLUMN broker_jobs.job_queue IS 'Queue number (allows parallel processing)';
COMMENT ON COLUMN broker_jobs.job_language IS 'Execution language (sql, plpgsql, etc.)';
COMMENT ON COLUMN broker_jobs.execute_str IS 'SQL or code to execute';
COMMENT ON COLUMN broker_jobs.complete_status IS '0=pending, 1=running, 2=completed, 3=failed, 4=cancelled';
COMMENT ON COLUMN broker_jobs.run_as IS 'User context to run the job as';
COMMENT ON COLUMN broker_jobs.rid_broker_schedule IS 'Reference to schedule if job was scheduled';
COMMENT ON COLUMN broker_jobs.rid_broker_queueinstance IS 'Instance that processed this job';
COMMENT ON COLUMN broker_jobs.depends_on IS 'Array of job names that must be completed before this job can run';
-- Trigger to update updated_at
CREATE OR REPLACE FUNCTION tf_broker_jobs_update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Drop first so this script is idempotent: tables use IF NOT EXISTS and
-- the function uses OR REPLACE, but a plain CREATE TRIGGER would error
-- on a second run.
DROP TRIGGER IF EXISTS t_broker_jobs_updated_at ON broker_jobs;
CREATE TRIGGER t_broker_jobs_updated_at
    BEFORE UPDATE ON broker_jobs
    FOR EACH ROW
    EXECUTE FUNCTION tf_broker_jobs_update_timestamp();

View File

@@ -0,0 +1,50 @@
-- broker_schedule table
-- Stores scheduled jobs (cron-like functionality)
CREATE TABLE IF NOT EXISTS broker_schedule (
    id_broker_schedule BIGSERIAL PRIMARY KEY,
    name VARCHAR(255) NOT NULL UNIQUE,
    cron_expr VARCHAR(100) NOT NULL,
    enabled BOOLEAN NOT NULL DEFAULT true,
    job_name VARCHAR(255) NOT NULL,
    job_priority INTEGER NOT NULL DEFAULT 0,
    job_queue INTEGER NOT NULL DEFAULT 1,
    job_language VARCHAR(50) NOT NULL DEFAULT 'sql',
    execute_str TEXT NOT NULL,
    run_as VARCHAR(100),
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    last_run_at TIMESTAMP WITH TIME ZONE,
    next_run_at TIMESTAMP WITH TIME ZONE,
    CONSTRAINT broker_schedule_job_queue_check CHECK (job_queue > 0)
);
-- Indexes
CREATE INDEX IF NOT EXISTS idx_broker_schedule_enabled ON broker_schedule(enabled);
CREATE INDEX IF NOT EXISTS idx_broker_schedule_next_run ON broker_schedule(next_run_at) WHERE enabled = true;
CREATE INDEX IF NOT EXISTS idx_broker_schedule_name ON broker_schedule(name);
-- Comments
COMMENT ON TABLE broker_schedule IS 'Scheduled jobs (cron-like functionality)';
COMMENT ON COLUMN broker_schedule.name IS 'Unique name for the schedule';
COMMENT ON COLUMN broker_schedule.cron_expr IS 'Cron expression for scheduling';
COMMENT ON COLUMN broker_schedule.enabled IS 'Whether the schedule is active';
COMMENT ON COLUMN broker_schedule.job_name IS 'Name of the job to create';
COMMENT ON COLUMN broker_schedule.execute_str IS 'SQL or code to execute';
COMMENT ON COLUMN broker_schedule.last_run_at IS 'Last time the job was executed';
COMMENT ON COLUMN broker_schedule.next_run_at IS 'Next scheduled execution time';
-- Trigger to update updated_at
CREATE OR REPLACE FUNCTION tf_broker_schedule_update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Drop first so this script is idempotent: a plain CREATE TRIGGER would
-- error when the script is re-run against an existing database.
DROP TRIGGER IF EXISTS t_broker_schedule_updated_at ON broker_schedule;
CREATE TRIGGER t_broker_schedule_updated_at
    BEFORE UPDATE ON broker_schedule
    FOR EACH ROW
    EXECUTE FUNCTION tf_broker_schedule_update_timestamp();

View File

@@ -0,0 +1,73 @@
package models
import "time"
// Job represents a broker job
// (a row of broker_jobs serialized for transport as JSON).
type Job struct {
	ID int64 `json:"id"`
	JobName string `json:"job_name"`
	JobPriority int32 `json:"job_priority"`
	JobQueue int `json:"job_queue"` // queue number; queues allow parallel processing
	JobLanguage string `json:"job_language"` // execution language, e.g. "sql"
	ExecuteStr string `json:"execute_str"` // SQL or code to execute
	ExecuteResult string `json:"execute_result"`
	ErrorMsg string `json:"error_msg"`
	CompleteStatus int `json:"complete_status"` // see JobStatus constants (0=pending .. 4=cancelled)
	RunAs string `json:"run_as"` // user context to run the job as
	UserLogin string `json:"user_login"` // NOTE(review): no matching broker_jobs column visible — confirm where this is populated
	ScheduleID int64 `json:"schedule_id"` // rid_broker_schedule; 0 when the job was not scheduled
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
}
// Instance represents a broker instance
// (a row of broker_queueinstance).
type Instance struct {
	ID int64 `json:"id"`
	Name string `json:"name"`
	Hostname string `json:"hostname"`
	PID int `json:"pid"`
	Version string `json:"version"`
	Status string `json:"status"` // active, inactive, shutdown
	StartedAt time.Time `json:"started_at"`
	LastPingAt time.Time `json:"last_ping_at"` // heartbeat updated by broker_ping_instance
	ShutdownAt time.Time `json:"shutdown_at"` // NOTE(review): DB column is nullable — zero value means "not shut down"; confirm NULL scanning, consider *time.Time
	QueueCount int `json:"queue_count"`
	JobsHandled int64 `json:"jobs_handled"`
}
// Schedule represents a job schedule
// (a row of broker_schedule; cron-like functionality).
type Schedule struct {
	ID int64 `json:"id"`
	Name string `json:"name"` // unique schedule name
	CronExpr string `json:"cron_expr"` // cron expression driving execution
	Enabled bool `json:"enabled"`
	JobName string `json:"job_name"` // name given to jobs this schedule creates
	JobPriority int32 `json:"job_priority"`
	JobQueue int `json:"job_queue"`
	ExecuteStr string `json:"execute_str"` // SQL or code to execute
	RunAs string `json:"run_as"`
	CreatedAt time.Time `json:"created_at"`
	UpdatedAt time.Time `json:"updated_at"`
	LastRunAt time.Time `json:"last_run_at"` // NOTE(review): nullable column — zero value means "never run"; confirm NULL scanning
	NextRunAt time.Time `json:"next_run_at"` // NOTE(review): nullable column — confirm NULL scanning
}
// JobStatus represents job completion statuses
// as persisted in broker_jobs.complete_status.
type JobStatus int

// Job lifecycle states, in persistence order.
const (
	JobStatusPending   JobStatus = iota // 0: queued, awaiting a worker
	JobStatusRunning                    // 1: currently executing
	JobStatusCompleted                  // 2: finished successfully
	JobStatusFailed                     // 3: finished with an error
	JobStatusCancelled                  // 4: cancelled before completion
)
// InstanceStatus represents instance statuses
// as persisted in broker_queueinstance.status.
type InstanceStatus string

// Valid broker instance states.
const (
	InstanceStatusActive   = InstanceStatus("active")
	InstanceStatusInactive = InstanceStatus("inactive")
	InstanceStatusShutdown = InstanceStatus("shutdown")
)

148
pkg/broker/queue/queue.go Normal file
View File

@@ -0,0 +1,148 @@
package queue
import (
"context"
"fmt"
"sync"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/models"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/worker"
)
// Queue manages a collection of workers for a specific queue number
// and owns the context used to stop them as a group.
type Queue struct {
	Number int // queue number served by these workers
	InstanceID int64 // owning broker instance ID
	workers []*worker.Worker // populated by Start, drained by Stop
	db adapter.DBAdapter
	logger adapter.Logger // scoped with "queue" in New
	ctx context.Context // cancelled by Stop to signal all workers
	cancel context.CancelFunc
	mu sync.RWMutex // guards workers
	workerCount int // number of workers Start launches
}
// Config holds queue configuration
// consumed by New and Start (worker settings are forwarded to worker.Config).
type Config struct {
	Number int // queue number to serve
	InstanceID int64 // owning broker instance ID
	WorkerCount int // workers to launch for this queue
	DBAdapter adapter.DBAdapter
	Logger adapter.Logger
	BufferSize int // per-worker job channel capacity
	TimerSeconds int // per-worker database poll interval, seconds
	FetchSize int // max jobs a worker processes per wake-up
}
// New creates a new queue manager from cfg. The queue is idle until
// Start is called; its logger is scoped with the queue number.
func New(cfg Config) *Queue {
	qCtx, qCancel := context.WithCancel(context.Background())
	return &Queue{
		Number:      cfg.Number,
		InstanceID:  cfg.InstanceID,
		workers:     make([]*worker.Worker, 0, cfg.WorkerCount),
		db:          cfg.DBAdapter,
		logger:      cfg.Logger.With("queue", cfg.Number),
		ctx:         qCtx,
		cancel:      qCancel,
		workerCount: cfg.WorkerCount,
	}
}
// Start initializes and starts all workers in the queue.
// cfg supplies per-worker settings (adapter, logger, buffer size, poll
// timer, fetch size). If any worker fails to start, the workers that
// already launched are stopped again so a failed Start does not leak
// goroutines (the original left them running). After such a failure the
// queue's context is cancelled and the queue must be recreated with New.
func (q *Queue) Start(cfg Config) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.logger.Info("starting queue", "worker_count", q.workerCount)
	for i := 0; i < q.workerCount; i++ {
		w := worker.New(worker.Config{
			ID:           i + 1,
			QueueNumber:  q.Number,
			InstanceID:   q.InstanceID,
			DBAdapter:    cfg.DBAdapter,
			Logger:       cfg.Logger,
			BufferSize:   cfg.BufferSize,
			TimerSeconds: cfg.TimerSeconds,
			FetchSize:    cfg.FetchSize,
		})
		if err := w.Start(q.ctx); err != nil {
			// Roll back the partial start: cancel the shared context and
			// stop every worker that already launched.
			q.cancel()
			for _, started := range q.workers {
				_ = started.Stop() // best-effort cleanup; the start error is reported below
			}
			q.workers = q.workers[:0]
			return fmt.Errorf("failed to start worker %d: %w", i+1, err)
		}
		q.workers = append(q.workers, w)
	}
	q.logger.Info("queue started successfully", "workers", len(q.workers))
	return nil
}
// Stop gracefully stops all workers in the queue.
// The shared context is cancelled first to signal every worker, then
// each worker is stopped in turn; stop failures are collected and
// reported as one aggregate error.
func (q *Queue) Stop() error {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.logger.Info("stopping queue")
	// Signal all workers via the shared context.
	q.cancel()
	var failures []error
	for idx, wk := range q.workers {
		if err := wk.Stop(); err != nil {
			failures = append(failures, fmt.Errorf("worker %d: %w", idx+1, err))
		}
	}
	if len(failures) > 0 {
		return fmt.Errorf("errors stopping workers: %v", failures)
	}
	q.logger.Info("queue stopped successfully")
	return nil
}
// AddJob dispatches a job to the first worker with free buffer space.
// It fails when the queue has no workers or every worker's channel is full.
// NOTE(review): the original comment called this "round-robin", but each
// call scans from worker 0 — it is a first-fit assignment.
func (q *Queue) AddJob(job models.Job) error {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if len(q.workers) == 0 {
		return fmt.Errorf("no workers available")
	}
	// First-fit: try workers in order; use the first whose buffered
	// channel accepts the job. Could be enhanced with load balancing.
	for _, w := range q.workers {
		if err := w.AddJob(job); err == nil {
			return nil
		}
	}
	return fmt.Errorf("all workers are busy")
}
// GetStats returns per-worker statistics for the queue, keyed by the
// 1-based worker number.
func (q *Queue) GetStats() map[int]worker.Stats {
	q.mu.RLock()
	defer q.mu.RUnlock()
	out := make(map[int]worker.Stats, len(q.workers))
	for idx, wk := range q.workers {
		activity, handled, active := wk.GetStats()
		out[idx+1] = worker.Stats{
			LastActivity: activity,
			JobsHandled:  handled,
			Running:      active,
		}
	}
	return out
}

247
pkg/broker/worker/worker.go Normal file
View File

@@ -0,0 +1,247 @@
package worker
import (
"context"
"fmt"
"sync"
"time"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/models"
)
// Worker represents a single job processing worker
// that drains a buffered job channel and periodically polls the
// database for queued work.
type Worker struct {
	ID int // 1-based worker number within its queue
	QueueNumber int
	InstanceID int64
	db adapter.DBAdapter
	logger adapter.Logger // scoped with worker_id and queue in New
	jobChan chan models.Job // buffered hand-off from Queue.AddJob
	shutdown chan struct{} // closed by Stop to end processLoop
	wg *sync.WaitGroup // lets Stop wait for processLoop to exit
	running bool // guarded by mu
	mu sync.RWMutex // guards running and lastActivity
	lastActivity time.Time // guarded by mu
	jobsHandled int64 // NOTE(review): incremented in processJobs without mu while GetStats reads it under RLock — data race; confirm and guard
	timerSeconds int // database poll interval in seconds
	fetchSize int // max jobs processed per wake-up
}
// Stats holds worker statistics
// (a snapshot of the values returned by Worker.GetStats).
type Stats struct {
	LastActivity time.Time // last time the worker woke up to process
	JobsHandled int64 // jobs run successfully so far
	Running bool // whether the process loop is active
}
// Config holds worker configuration
// consumed by New.
type Config struct {
	ID int // 1-based worker number within the queue
	QueueNumber int
	InstanceID int64
	DBAdapter adapter.DBAdapter
	Logger adapter.Logger
	BufferSize int // job channel capacity
	TimerSeconds int // database poll interval, seconds
	FetchSize int // max jobs processed per wake-up
}
// New creates a new worker configured from cfg.
// The worker is idle until Start is called; its logger is scoped with
// the worker ID and queue number.
func New(cfg Config) *Worker {
	log := cfg.Logger.With("worker_id", cfg.ID).With("queue", cfg.QueueNumber)
	wk := &Worker{
		ID:           cfg.ID,
		QueueNumber:  cfg.QueueNumber,
		InstanceID:   cfg.InstanceID,
		db:           cfg.DBAdapter,
		logger:       log,
		jobChan:      make(chan models.Job, cfg.BufferSize),
		shutdown:     make(chan struct{}),
		wg:           &sync.WaitGroup{},
		timerSeconds: cfg.TimerSeconds,
		fetchSize:    cfg.FetchSize,
	}
	return wk
}
// Start begins the worker processing loop
// in a new goroutine tied to ctx. It errors if the worker is already
// running.
// NOTE(review): after Stop, the shutdown channel is already closed, so a
// restarted worker's loop would exit immediately — confirm workers are
// intended to be one-shot.
func (w *Worker) Start(ctx context.Context) error {
	w.mu.Lock()
	if w.running {
		w.mu.Unlock()
		return fmt.Errorf("worker %d already running", w.ID)
	}
	w.running = true
	w.mu.Unlock()
	w.logger.Info("worker starting")
	w.wg.Add(1)
	go w.processLoop(ctx)
	return nil
}
// Stop gracefully stops the worker and blocks until the process loop
// has returned. It is safe to call repeatedly, including concurrently:
// the running flag is flipped under the mutex *before* the shutdown
// channel is closed, so only one caller can reach close(). The original
// cleared the flag only after wg.Wait(), letting two concurrent Stops
// both close the channel and panic.
func (w *Worker) Stop() error {
	w.mu.Lock()
	if !w.running {
		w.mu.Unlock()
		return nil
	}
	// Mark stopped while holding the lock so a concurrent Stop observes
	// !running and returns instead of closing shutdown a second time.
	w.running = false
	w.mu.Unlock()
	w.logger.Info("worker stopping")
	close(w.shutdown)
	w.wg.Wait()
	w.logger.Info("worker stopped")
	return nil
}
// AddJob hands a job to this worker without blocking; it fails when the
// worker's buffered channel is already full.
func (w *Worker) AddJob(job models.Job) error {
	select {
	case w.jobChan <- job:
	default:
		return fmt.Errorf("worker %d job channel is full", w.ID)
	}
	return nil
}
// processLoop is the main worker processing loop.
// It drains jobs pushed via AddJob, polls the database on a periodic
// timer when timerSeconds > 0, and exits on shutdown or context
// cancellation.
// Fix: the original always created the timer, so timerSeconds <= 0
// produced a zero-duration timer that fired immediately, skipped the
// fetch, and was reset to 0 again — a busy loop. Here the timer is only
// armed when polling is enabled; a nil channel blocks forever in select.
func (w *Worker) processLoop(ctx context.Context) {
	defer w.wg.Done()
	defer w.recoverPanic()
	var timer *time.Timer
	var tick <-chan time.Time
	if w.timerSeconds > 0 {
		timer = time.NewTimer(time.Duration(w.timerSeconds) * time.Second)
		defer timer.Stop()
		tick = timer.C
	}
	for {
		select {
		case job := <-w.jobChan:
			w.updateActivity()
			w.processJobs(ctx, &job)
		case <-tick:
			// Timer expired - fetch jobs from database
			w.updateActivity()
			w.processJobs(ctx, nil)
			timer.Reset(time.Duration(w.timerSeconds) * time.Second)
		case <-w.shutdown:
			w.logger.Info("worker shutdown signal received")
			return
		case <-ctx.Done():
			w.logger.Info("worker context cancelled")
			return
		}
	}
}
// processJobs runs up to fetchSize jobs. When specificJob carries an ID
// it is processed first; subsequent iterations pull job IDs from the
// database via fetchNextJob. The loop stops early when the database has
// no more work or fetching fails.
// Fix: jobsHandled is now incremented under the mutex — GetStats reads
// it under RLock, so the original bare increment was a data race.
func (w *Worker) processJobs(ctx context.Context, specificJob *models.Job) {
	defer w.recoverPanic()
	for i := 0; i < w.fetchSize; i++ {
		var jobID int64
		if specificJob != nil && specificJob.ID > 0 {
			jobID = specificJob.ID
			specificJob = nil // Only process once
		} else {
			// Fetch next job from database
			var err error
			jobID, err = w.fetchNextJob(ctx)
			if err != nil {
				w.logger.Error("failed to fetch job", "error", err)
				return
			}
		}
		if jobID <= 0 {
			// No more jobs
			return
		}
		// Run the job
		if err := w.runJob(ctx, jobID); err != nil {
			w.logger.Error("failed to run job", "job_id", jobID, "error", err)
		} else {
			w.mu.Lock()
			w.jobsHandled++
			w.mu.Unlock()
		}
	}
}
// fetchNextJob asks broker_get for the next runnable job in this
// worker's queue. A returned ID of 0 means no work is pending.
func (w *Worker) fetchNextJob(ctx context.Context) (int64, error) {
	var (
		code int
		msg  string
		id   int64
	)
	row := w.db.QueryRow(ctx,
		"SELECT p_retval, p_errmsg, p_job_id FROM broker_get($1, $2)",
		w.QueueNumber, w.InstanceID,
	)
	if err := row.Scan(&code, &msg, &id); err != nil {
		return 0, fmt.Errorf("query error: %w", err)
	}
	if code > 0 {
		return 0, fmt.Errorf("broker_get error: %s", msg)
	}
	return id, nil
}
// runJob executes one job by ID via broker_run and reports any
// database-level or broker-level failure.
func (w *Worker) runJob(ctx context.Context, jobID int64) error {
	w.logger.Debug("running job", "job_id", jobID)
	var (
		code int
		msg  string
	)
	row := w.db.QueryRow(ctx, "SELECT p_retval, p_errmsg FROM broker_run($1)", jobID)
	if err := row.Scan(&code, &msg); err != nil {
		return fmt.Errorf("query error: %w", err)
	}
	if code > 0 {
		return fmt.Errorf("broker_run error: %s", msg)
	}
	w.logger.Debug("job completed", "job_id", jobID)
	return nil
}
// updateActivity records the current time as the worker's last activity.
func (w *Worker) updateActivity() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.lastActivity = time.Now()
}
// GetStats returns a consistent snapshot of the worker's last activity
// time, processed-job count, and running flag.
func (w *Worker) GetStats() (lastActivity time.Time, jobsHandled int64, running bool) {
	w.mu.RLock()
	lastActivity, jobsHandled, running = w.lastActivity, w.jobsHandled, w.running
	w.mu.RUnlock()
	return
}
// recoverPanic recovers from panics in the worker.
// It must be installed via defer; it logs and swallows the panic so a
// single bad job cannot take down the process.
func (w *Worker) recoverPanic() {
	r := recover()
	if r == nil {
		return
	}
	w.logger.Error("worker panic recovered", "panic", r)
}