From 19e469ff5460ff718dc964271e027318eb901344 Mon Sep 17 00:00:00 2001 From: Hein Date: Fri, 2 Jan 2026 20:56:39 +0200 Subject: [PATCH] feat: :tada: postgresql broker first commit of forked prototype from my original code --- .gitignore | 25 ++ Makefile | 88 +++++ README.md | 294 +++++++++++++++- broker.example.yaml | 48 +++ cmd/broker/main.go | 233 +++++++++++++ go.mod | 25 ++ go.sum | 56 +++ pkg/broker/adapter/database.go | 77 +++++ pkg/broker/adapter/logger.go | 22 ++ pkg/broker/adapter/postgres.go | 311 +++++++++++++++++ pkg/broker/adapter/slog_logger.go | 80 +++++ pkg/broker/broker.go | 127 +++++++ pkg/broker/config/config.go | 180 ++++++++++ pkg/broker/database_instance.go | 326 ++++++++++++++++++ pkg/broker/install/install.go | 246 +++++++++++++ .../install/sql/procedures/00_install.sql | 13 + .../install/sql/procedures/01_broker_get.sql | 76 ++++ .../install/sql/procedures/02_broker_run.sql | 113 ++++++ .../install/sql/procedures/03_broker_set.sql | 95 +++++ .../04_broker_register_instance.sql | 82 +++++ .../sql/procedures/05_broker_add_job.sql | 91 +++++ .../procedures/06_broker_ping_instance.sql | 98 ++++++ pkg/broker/install/sql/tables/00_install.sql | 10 + .../sql/tables/01_broker_queueinstance.sql | 31 ++ .../install/sql/tables/02_broker_jobs.sql | 62 ++++ .../install/sql/tables/03_broker_schedule.sql | 50 +++ pkg/broker/models/models.go | 73 ++++ pkg/broker/queue/queue.go | 148 ++++++++ pkg/broker/worker/worker.go | 247 +++++++++++++ 29 files changed, 3325 insertions(+), 2 deletions(-) create mode 100644 Makefile create mode 100644 broker.example.yaml create mode 100644 cmd/broker/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 pkg/broker/adapter/database.go create mode 100644 pkg/broker/adapter/logger.go create mode 100644 pkg/broker/adapter/postgres.go create mode 100644 pkg/broker/adapter/slog_logger.go create mode 100644 pkg/broker/broker.go create mode 100644 pkg/broker/config/config.go create mode 100644 pkg/broker/database_instance.go create mode 100644 pkg/broker/install/install.go create mode 100644 pkg/broker/install/sql/procedures/00_install.sql create mode 100644 pkg/broker/install/sql/procedures/01_broker_get.sql create mode 100644 pkg/broker/install/sql/procedures/02_broker_run.sql create mode 100644 pkg/broker/install/sql/procedures/03_broker_set.sql create mode 100644 pkg/broker/install/sql/procedures/04_broker_register_instance.sql create mode 100644 pkg/broker/install/sql/procedures/05_broker_add_job.sql create mode 100644 pkg/broker/install/sql/procedures/06_broker_ping_instance.sql create mode 100644 pkg/broker/install/sql/tables/00_install.sql create mode 100644 pkg/broker/install/sql/tables/01_broker_queueinstance.sql create mode 100644 pkg/broker/install/sql/tables/02_broker_jobs.sql create mode 100644 pkg/broker/install/sql/tables/03_broker_schedule.sql create mode 100644 pkg/broker/models/models.go create mode 100644 pkg/broker/queue/queue.go create mode 100644 pkg/broker/worker/worker.go diff --git a/.gitignore b/.gitignore index 5b90e79..d19aac0 100644 --- a/.gitignore +++ b/.gitignore @@ -25,3 +25,28 @@ go.work.sum # env file .env +# Binaries directory +bin/ + +# Configuration (exclude actual config, keep examples) +broker.yaml +broker.yml +broker.json +!broker.example.yaml + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Logs +*.log + +# Coverage +coverage.html diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f363d9f --- /dev/null +++ b/Makefile @@ -0,0 +1,88 @@ +.PHONY: all build clean test install deps help + +# Build variables +BINARY_NAME=pgsql-broker +BIN_DIR=bin +CMD_DIR=cmd/broker +GO=go +GOFLAGS=-v +LDFLAGS=-w -s + +# Version information +VERSION ?= $(shell git describe --tags --always --dirty 2>/dev/null || echo "dev") +BUILD_TIME=$(shell date -u '+%Y-%m-%d_%H:%M:%S') +COMMIT=$(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown") + +# Inject version info +LDFLAGS += -X 'main.Version=$(VERSION)' -X 'main.BuildTime=$(BUILD_TIME)' -X 'main.Commit=$(COMMIT)' + +all: clean deps build ## Build everything + +build: ## Build the broker binary + @echo "Building $(BINARY_NAME)..." + @mkdir -p $(BIN_DIR) + $(GO) build $(GOFLAGS) -ldflags "$(LDFLAGS)" -o $(BIN_DIR)/$(BINARY_NAME) ./$(CMD_DIR) + @echo "Built: $(BIN_DIR)/$(BINARY_NAME)" + +clean: ## Remove build artifacts + @echo "Cleaning..." + @rm -rf $(BIN_DIR) + @$(GO) clean + @echo "Clean complete" + +deps: ## Download dependencies + @echo "Downloading dependencies..." + @$(GO) mod download + @$(GO) mod tidy + @echo "Dependencies ready" + +test: ## Run tests + @echo "Running tests..." + @$(GO) test -v -race -cover ./... + +install: build ## Install the binary to GOPATH/bin + @echo "Installing to GOPATH/bin..." + @$(GO) install $(GOFLAGS) -ldflags "$(LDFLAGS)" ./$(CMD_DIR) + @echo "Installed: $(BINARY_NAME)" + +run: build ## Build and run the broker + @echo "Running $(BINARY_NAME)..." + @$(BIN_DIR)/$(BINARY_NAME) + +fmt: ## Format the code + @echo "Formatting code..." + @$(GO) fmt ./... + @echo "Formatting complete" + +vet: ## Run go vet + @echo "Running go vet..." + @$(GO) vet ./... + @echo "Vet complete" + +lint: ## Run golangci-lint (if installed) + @if command -v golangci-lint >/dev/null 2>&1; then \ + echo "Running golangci-lint..."; \ + golangci-lint run ./...; \ + else \ + echo "golangci-lint not installed, skipping"; \ + fi + +sql-install: build ## Install SQL tables and procedures using broker CLI + @echo "Installing SQL schema..." + @$(BIN_DIR)/$(BINARY_NAME) install + +sql-install-manual: ## Install SQL tables and procedures manually via psql + @echo "Installing SQL schema manually..." + @if [ -z "$$PGDATABASE" ]; then \ + echo "Error: PGDATABASE environment variable not set"; \ + exit 1; \ + fi + @psql -f pkg/broker/install/sql/tables/00_install.sql + @psql -f pkg/broker/install/sql/procedures/00_install.sql + @echo "SQL schema installed" + +help: ## Show this help message + @echo "Usage: make [target]" + @echo "" + @echo "Targets:" + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf " %-15s %s\n", $$1, $$2}' diff --git a/README.md b/README.md index f420191..2c8df75 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,293 @@ -# pgsql-broker +# PostgreSQL Broker -PostgreSQL Broker \ No newline at end of file +A robust, event-driven job processing system for PostgreSQL that uses LISTEN/NOTIFY for real-time job execution. It supports multiple queues, priority-based scheduling, and can be used both as a standalone service or as a Go library. + +## Features + +- **Multi-Database Support**: Single broker process can manage multiple database connections +- **Event-Driven**: Uses PostgreSQL LISTEN/NOTIFY for instant job notifications +- **Multiple Queues**: Support for concurrent job processing across multiple queues per database +- **Priority Scheduling**: Jobs can be prioritized for execution order +- **Job Dependencies**: Jobs can depend on other jobs being completed first +- **Adapter Pattern**: Clean interfaces for database and logging (easy to extend) +- **Standalone or Library**: Use as a CLI tool or integrate into your Go application +- **Configuration Management**: Viper-based config with support for YAML, JSON, and environment variables +- **Graceful Shutdown**: Proper cleanup and job completion on shutdown +- **Instance Tracking**: Monitor active broker instances through the database +- **Single Instance Per Database**: Enforces one broker instance per database to prevent conflicts +- **Embedded SQL Installer**: Database schema embedded in binary with built-in install command + +## Architecture + +The broker supports multi-database architecture where a single broker process can manage multiple database connections. Each database has its own instance with dedicated queues, but only ONE broker instance is allowed per database. + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Broker Process │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌────────────────────────────┐ ┌───────────────────────────┐ │ +│ │ Database Instance (DB1) │ │ Database Instance (DB2) │ │ +│ ├────────────────────────────┤ ├───────────────────────────┤ │ +│ │ ┌────┐ ┌────┐ ┌────┐ │ │ ┌────┐ ┌────┐ │ │ +│ │ │ Q1 │ │ Q2 │ │ QN │ │ │ │ Q1 │ │ Q2 │ │ │ +│ │ └────┘ └────┘ └────┘ │ │ └────┘ └────┘ │ │ +│ │ │ │ │ │ +│ │ ┌────────────────────────┐│ │ ┌────────────────────────┐│ │ +│ │ │ PostgreSQL Adapter ││ │ │ PostgreSQL Adapter ││ │ +│ │ │ - Connection Pool ││ │ │ - Connection Pool ││ │ +│ │ │ - LISTEN/NOTIFY ││ │ │ - LISTEN/NOTIFY ││ │ +│ │ └────────────────────────┘│ │ └────────────────────────┘│ │ +│ └────────────┬───────────────┘ └──────────┬────────────────┘ │ +│ │ │ │ +└───────────────┼──────────────────────────────┼───────────────────┘ + │ │ + ▼ ▼ + ┌──────────────────────┐ ┌──────────────────────┐ + │ PostgreSQL (DB1) │ │ PostgreSQL (DB2) │ + │ - broker_jobs │ │ - broker_jobs │ + │ - broker_queueinstance│ │ - broker_queueinstance│ + │ - broker_schedule │ │ - broker_schedule │ + └──────────────────────┘ └──────────────────────┘ +``` + +**Key Points**: +- One broker process can manage multiple databases +- Each database has exactly ONE active broker instance +- Each database instance has its own queues and workers +- Validation prevents multiple broker processes from connecting to the same database +- Different databases can have different queue counts + +## Installation + +### From Source + +```bash +git clone git.warky.dev/wdevs/pgsql-broker +cd pgsql-broker +make build +``` + +The binary will be available in `bin/pgsql-broker`. + +### As a Library + +```bash +go get git.warky.dev/wdevs/pgsql-broker +``` + +## Quick Start + +### 1. Setup Database + +Install the required tables and stored procedures: + +```bash +# Using the CLI (recommended) +./bin/pgsql-broker install --config broker.yaml + +# Or with make +make sql-install + +# Verify installation +./bin/pgsql-broker install --verify-only --config broker.yaml + +# Or manually with psql: +psql -f pkg/broker/install/sql/tables/00_install.sql +psql -f pkg/broker/install/sql/procedures/00_install.sql +``` + +### 2. Configure + +Create a configuration file `broker.yaml`: + +```yaml +databases: + - name: db1 + host: localhost + port: 5432 + database: broker_db1 + user: postgres + password: your_password + sslmode: disable + queue_count: 4 + + # Optional: add more databases + - name: db2 + host: localhost + port: 5432 + database: broker_db2 + user: postgres + password: your_password + sslmode: disable + queue_count: 2 + +broker: + name: pgsql-broker + enable_debug: false + +logging: + level: info + format: json +``` + +**Note**: Each database requires a unique `name` identifier and can have its own `queue_count` configuration. + +### 3. Run the Broker + +```bash +# Using the binary +./bin/pgsql-broker start --config broker.yaml + +# Or with make +make run + +# Or with custom log level +./bin/pgsql-broker start --log-level debug +``` + +### 4. Add a Job + +```sql +SELECT broker_add_job( + 'My Job', -- job_name + 'SELECT do_something()', -- execute_str + 1, -- job_queue (default: 1) + 0, -- job_priority (default: 0) + 'sql', -- job_language (default: 'sql') + NULL, -- run_as + NULL, -- user_login + NULL -- schedule_id +); +``` + +## Usage as a Library + +```go +package main + +import ( + "git.warky.dev/wdevs/pgsql-broker/pkg/broker" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/config" +) + +func main() { + // Load config + cfg, _ := config.LoadConfig("broker.yaml") + + // Create logger + logger := adapter.NewSlogLogger(slog.LevelInfo) + + // Create database adapter + dbAdapter := adapter.NewPostgresAdapter(cfg.Database.ToPostgresConfig(), logger) + + // Create and start broker + instance, _ := broker.New(cfg, dbAdapter, logger, "1.0.0") + instance.Start() + + // ... wait for shutdown signal ... + + instance.Stop() +} +``` + +See the [examples](./examples/) directory for complete examples. + +## Database Schema + +### Tables + +- **broker_queueinstance**: Tracks active broker queue instances (one per database) +- **broker_jobs**: Job queue with status tracking +- **broker_schedule**: Scheduled jobs (cron-like functionality) + +### Stored Procedures + +- **broker_get**: Fetch the next job from a queue +- **broker_run**: Execute a job +- **broker_set**: Set runtime options (user, application_name, etc.) +- **broker_add_job**: Add a new job to the queue +- **broker_register_instance**: Register a broker instance +- **broker_ping_instance**: Update instance heartbeat +- **broker_shutdown_instance**: Mark instance as shutdown + +## Configuration Reference + +See [broker.example.yaml](./broker.example.yaml) for a complete configuration example. + +### Database Settings + +The `databases` array can contain multiple database configurations. Each entry supports: + +| Setting | Description | Default | +|---------|-------------|---------| +| `name` | Unique identifier for this database | **Required** | +| `host` | PostgreSQL host | `localhost` | +| `port` | PostgreSQL port | `5432` | +| `database` | Database name | **Required** | +| `user` | Database user | **Required** | +| `password` | Database password | - | +| `sslmode` | SSL mode | `disable` | +| `max_open_conns` | Max open connections | `25` | +| `max_idle_conns` | Max idle connections | `5` | +| `conn_max_lifetime` | Connection max lifetime | `5m` | +| `conn_max_idle_time` | Connection max idle time | `10m` | +| `queue_count` | Number of queues for this database | `4` | + +### Broker Settings + +Global settings applied to all database instances: + +| Setting | Description | Default | +|---------|-------------|---------| +| `name` | Broker instance name | `pgsql-broker` | +| `fetch_query_que_size` | Jobs per fetch cycle | `100` | +| `queue_timer_sec` | Seconds between polls | `10` | +| `queue_buffer_size` | Job buffer size | `50` | +| `worker_idle_timeout_sec` | Worker idle timeout | `10` | +| `notify_retry_seconds` | NOTIFY retry interval | `30s` | +| `enable_debug` | Enable debug logging | `false` | + +## Development + +### Building + +```bash +make build # Build the binary +make clean # Clean build artifacts +make deps # Download dependencies +make fmt # Format code +make vet # Run go vet +make test # Run tests +``` + +### Project Structure + +``` +pgsql-broker/ +├── cmd/broker/ # CLI application +├── pkg/broker/ # Core broker package +│ ├── adapter/ # Database & logger interfaces +│ ├── config/ # Configuration management +│ ├── models/ # Data models +│ ├── queue/ # Queue management +│ ├── worker/ # Worker implementation +│ └── install/ # Database installer with embedded SQL +│ └── sql/ # SQL schema (embedded in binary) +│ ├── tables/ # Table definitions +│ └── procedures/ # Stored procedures +├── examples/ # Usage examples +└── Makefile # Build automation +``` + +## Contributing + +Contributions are welcome! Please ensure: +- Code is formatted with `go fmt` +- Tests pass with `go test ./...` +- Documentation is updated + +## License + +See [LICENSE](./LICENSE) file for details. \ No newline at end of file diff --git a/broker.example.yaml b/broker.example.yaml new file mode 100644 index 0000000..c4a1215 --- /dev/null +++ b/broker.example.yaml @@ -0,0 +1,48 @@ +# PostgreSQL Broker Configuration Example + +# Database connections settings +# The broker can manage multiple databases, each with its own instance and queues +# Important: Each database can only have ONE active broker instance +databases: + # First database + - name: db1 # Unique identifier for this database connection + host: localhost + port: 5432 + database: broker_db1 + user: postgres + password: your_password_here + sslmode: disable # Options: disable, require, verify-ca, verify-full + max_open_conns: 25 + max_idle_conns: 5 + conn_max_lifetime: 5m + conn_max_idle_time: 10m + queue_count: 4 # Number of concurrent queues for this database + + # Second database (optional - add as many as needed) + - name: db2 + host: localhost + port: 5432 + database: broker_db2 + user: postgres + password: your_password_here + sslmode: disable + max_open_conns: 25 + max_idle_conns: 5 + conn_max_lifetime: 5m + conn_max_idle_time: 10m + queue_count: 2 # Can have different queue count per database + +# Broker settings (applied to all database instances) +broker: + name: pgsql-broker + fetch_query_que_size: 100 # Number of jobs to process per fetch cycle + queue_timer_sec: 10 # Seconds between queue polls + queue_buffer_size: 50 # Size of job buffer per queue + worker_idle_timeout_sec: 10 # Worker idle timeout + notify_retry_seconds: 30s # LISTEN/NOTIFY retry interval + enable_debug: false # Enable debug logging + +# Logging settings +logging: + level: info # Options: debug, info, warn, error + format: json # Options: json, text diff --git a/cmd/broker/main.go b/cmd/broker/main.go new file mode 100644 index 0000000..c7e2287 --- /dev/null +++ b/cmd/broker/main.go @@ -0,0 +1,233 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + + "github.com/spf13/cobra" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/config" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/install" +) + +var ( + // Version information (injected by build) + Version = "dev" + BuildTime = "unknown" + Commit = "unknown" + + // Command line flags + cfgFile string + logLevel string + verifyOnly bool +) + +func main() { + if err := rootCmd.Execute(); err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + os.Exit(1) + } +} + +var rootCmd = &cobra.Command{ + Use: "pgsql-broker", + Short: "PostgreSQL job broker for background job processing", + Long: `PostgreSQL Broker is a job processing system that uses PostgreSQL +LISTEN/NOTIFY for event-driven job execution. It supports multiple queues, +priority-based scheduling, and horizontal scaling.`, + Version: fmt.Sprintf("%s (built %s, commit %s)", Version, BuildTime, Commit), +} + +var startCmd = &cobra.Command{ + Use: "start", + Short: "Start the broker instance", + Long: `Start the broker instance and begin processing jobs from the database queue.`, + RunE: func(cmd *cobra.Command, args []string) error { + return runBroker() + }, +} + +var versionCmd = &cobra.Command{ + Use: "version", + Short: "Print version information", + Run: func(cmd *cobra.Command, args []string) { + fmt.Printf("pgsql-broker version %s\n", Version) + fmt.Printf("Built: %s\n", BuildTime) + fmt.Printf("Commit: %s\n", Commit) + }, +} + +var installCmd = &cobra.Command{ + Use: "install", + Short: "Install database schema (tables and procedures)", + Long: `Install the required database schema including tables and stored procedures. +This command will create all necessary tables and functions in the configured database.`, + RunE: func(cmd *cobra.Command, args []string) error { + return runInstall() + }, +} + +func init() { + rootCmd.AddCommand(startCmd) + rootCmd.AddCommand(versionCmd) + rootCmd.AddCommand(installCmd) + + // Persistent flags + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is broker.yaml)") + rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "log level (debug, info, warn, error)") + + // Install command flags + installCmd.Flags().BoolVar(&verifyOnly, "verify-only", false, "only verify installation without installing") +} + +func runBroker() error { + // Load configuration + cfg, err := config.LoadConfig(cfgFile) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + // Override log level if specified + if logLevel != "" { + cfg.Logging.Level = logLevel + } + + // Setup logger + logger := createLogger(cfg.Logging) + logger.Info("starting pgsql-broker", "version", Version, "databases", len(cfg.Databases)) + + // Create broker (manages all database instances) + b, err := broker.New(cfg, logger, Version) + if err != nil { + return fmt.Errorf("failed to create broker: %w", err) + } + + // Start the broker (starts all database instances) + if err := b.Start(); err != nil { + return fmt.Errorf("failed to start broker: %w", err) + } + + // Wait for shutdown signal + waitForShutdown(b, logger) + + return nil +} + +func runInstall() error { + // Load configuration + cfg, err := config.LoadConfig(cfgFile) + if err != nil { + return fmt.Errorf("failed to load config: %w", err) + } + + // Override log level if specified + if logLevel != "" { + cfg.Logging.Level = logLevel + } + + // Setup logger + logger := createLogger(cfg.Logging) + logger.Info("pgsql-broker database installer", "version", Version, "databases", len(cfg.Databases)) + + ctx := context.Background() + + // Install/verify on all configured databases + for i, dbCfg := range cfg.Databases { + logger.Info("processing database", "index", i, "name", dbCfg.Name, "host", dbCfg.Host, "database", dbCfg.Database) + + // Create database adapter + dbAdapter := adapter.NewPostgresAdapter(dbCfg.ToPostgresConfig(), logger) + + // Connect to database + if err := dbAdapter.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect to database %s: %w", dbCfg.Name, err) + } + + // Create installer + installer := install.New(dbAdapter, logger) + + if verifyOnly { + // Only verify installation + logger.Info("verifying database schema", "database", dbCfg.Name) + if err := installer.VerifyInstallation(ctx); err != nil { + dbAdapter.Close() + logger.Error("verification failed", "database", dbCfg.Name, "error", err) + return fmt.Errorf("verification failed for %s: %w", dbCfg.Name, err) + } + logger.Info("database schema verified successfully", "database", dbCfg.Name) + } else { + // Install schema + logger.Info("installing database schema", "database", dbCfg.Name) + if err := installer.InstallSchema(ctx); err != nil { + dbAdapter.Close() + logger.Error("installation failed", "database", dbCfg.Name, "error", err) + return fmt.Errorf("installation failed for %s: %w", dbCfg.Name, err) + } + + // Verify installation + logger.Info("verifying installation", "database", dbCfg.Name) + if err := installer.VerifyInstallation(ctx); err != nil { + dbAdapter.Close() + logger.Error("verification failed", "database", dbCfg.Name, "error", err) + return fmt.Errorf("verification failed for %s: %w", dbCfg.Name, err) + } + + logger.Info("database schema installed and verified successfully", "database", dbCfg.Name) + } + + dbAdapter.Close() + } + + if verifyOnly { + logger.Info("all databases verified successfully") + } else { + logger.Info("all databases installed and verified successfully") + } + return nil +} + +func createLogger(cfg config.LoggingConfig) adapter.Logger { + // Parse log level + var level slog.Level + switch cfg.Level { + case "debug": + level = slog.LevelDebug + case "info": + level = slog.LevelInfo + case "warn": + level = slog.LevelWarn + case "error": + level = slog.LevelError + default: + level = slog.LevelInfo + } + + // Create handler based on format + var handler slog.Handler + opts := &slog.HandlerOptions{Level: level} + + if cfg.Format == "text" { + handler = slog.NewTextHandler(os.Stdout, opts) + } else { + handler = slog.NewJSONHandler(os.Stdout, opts) + } + + return adapter.NewSlogLoggerWithHandler(handler) +} + +func waitForShutdown(b *broker.Broker, logger adapter.Logger) { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT) + + sig := <-sigChan + logger.Info("received shutdown signal", "signal", sig) + + if err := b.Stop(); err != nil { + logger.Error("error during shutdown", "error", err) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..c0650b8 --- /dev/null +++ b/go.mod @@ -0,0 +1,25 @@ +module git.warky.dev/wdevs/pgsql-broker + +go 1.25.5 + +require ( + github.com/lib/pq v1.10.9 + github.com/spf13/cobra v1.10.2 + github.com/spf13/viper v1.21.0 +) + +require ( + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect + github.com/sagikazarmark/locafero v0.11.0 // indirect + github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect + github.com/spf13/afero v1.15.0 // indirect + github.com/spf13/cast v1.10.0 // indirect + github.com/spf13/pflag v1.0.10 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/sys v0.29.0 // indirect + golang.org/x/text v0.28.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3fa1763 --- /dev/null +++ b/go.sum @@ -0,0 +1,56 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= +github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8/go.mod h1:3n1Cwaq1E1/1lhQhtRK2ts/ZwZEhjcQeJQ1RuC6Q/8U= +github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I= +github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg= +github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= +github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= +github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= +github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= +github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= +github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/broker/adapter/database.go b/pkg/broker/adapter/database.go new file mode 100644 index 0000000..cbcdef9 --- /dev/null +++ b/pkg/broker/adapter/database.go @@ -0,0 +1,77 @@ +package adapter + +import ( + "context" + "database/sql" +) + +// DBAdapter defines the interface for database operations +type DBAdapter interface { + // Connect establishes a connection to the database + Connect(ctx context.Context) error + + // Close closes the database connection + Close() error + + // Ping checks if the database is reachable + Ping(ctx context.Context) error + + // Begin starts a new transaction + Begin(ctx context.Context) (DBTransaction, error) + + // Exec executes a query without returning rows + Exec(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + + // QueryRow executes a query that returns at most one row + QueryRow(ctx context.Context, query string, args ...interface{}) DBRow + + // Query executes a query that returns rows + Query(ctx context.Context, query string, args ...interface{}) (DBRows, error) + + // Listen starts listening on a PostgreSQL notification channel + Listen(ctx context.Context, channel string, handler NotificationHandler) error + + // Unlisten stops listening on a channel + Unlisten(ctx context.Context, channel string) error +} + +// DBTransaction defines the interface for database transactions +type DBTransaction interface { + // Exec executes a query within the transaction + Exec(ctx context.Context, query string, args ...interface{}) (sql.Result, error) + + // QueryRow executes a query that returns at most one row within the transaction + QueryRow(ctx context.Context, query string, args ...interface{}) DBRow + + // Query executes a query that returns rows within the transaction + Query(ctx context.Context, query string, args ...interface{}) (DBRows, error) + + // Commit commits the transaction + Commit() error + + // Rollback rolls back the transaction + Rollback() error +} + +// DBRow defines the interface for scanning a single row +type DBRow interface { + Scan(dest ...interface{}) error +} + +// DBRows defines the interface for scanning multiple rows +type DBRows interface { + Next() bool + Scan(dest ...interface{}) error + Close() error + Err() error +} + +// Notification represents a PostgreSQL NOTIFY event +type Notification struct { + Channel string + Payload string + PID int +} + +// NotificationHandler is called when a notification is received +type NotificationHandler func(notification *Notification) diff --git a/pkg/broker/adapter/logger.go b/pkg/broker/adapter/logger.go new file mode 100644 index 0000000..6bc82ad --- /dev/null +++ b/pkg/broker/adapter/logger.go @@ -0,0 +1,22 @@ +package adapter + +// Logger defines the interface for logging operations +type Logger interface { + // Debug logs a debug message + Debug(msg string, args ...interface{}) + + // Info logs an info message + Info(msg string, args ...interface{}) + + // Warn logs a warning message + Warn(msg string, args ...interface{}) + + // Error logs an error message + Error(msg string, args ...interface{}) + + // Fatal logs a fatal message and exits + Fatal(msg string, args ...interface{}) + + // With returns a new logger with additional context + With(key string, value interface{}) Logger +} diff --git a/pkg/broker/adapter/postgres.go b/pkg/broker/adapter/postgres.go new file mode 100644 index 0000000..32b7a4a --- /dev/null +++ b/pkg/broker/adapter/postgres.go @@ -0,0 +1,311 @@ +package adapter + +import ( + "context" + "database/sql" + "fmt" + "sync" + "time" + + "github.com/lib/pq" +) + +// PostgresConfig holds PostgreSQL connection configuration +type PostgresConfig struct { + Host string + Port int + Database string + User string + Password string + SSLMode string + MaxOpenConns int + MaxIdleConns int + ConnMaxLifetime time.Duration + ConnMaxIdleTime time.Duration +} + +// PostgresAdapter implements DBAdapter for PostgreSQL +type PostgresAdapter struct { + config PostgresConfig + db *sql.DB + listener *pq.Listener + logger Logger + mu sync.RWMutex +} + +// NewPostgresAdapter creates a new PostgreSQL adapter +func NewPostgresAdapter(config PostgresConfig, logger Logger) *PostgresAdapter { + return &PostgresAdapter{ + config: config, + logger: logger, + } +} + +// Connect establishes a connection to PostgreSQL +func (p *PostgresAdapter) Connect(ctx context.Context) error { + connStr := p.buildConnectionString() + + db, err := sql.Open("postgres", connStr) + if err != nil { + return fmt.Errorf("failed to open database: %w", err) + } + + // Configure connection pool + db.SetMaxOpenConns(p.config.MaxOpenConns) + db.SetMaxIdleConns(p.config.MaxIdleConns) + db.SetConnMaxLifetime(p.config.ConnMaxLifetime) + db.SetConnMaxIdleTime(p.config.ConnMaxIdleTime) + + // Test connection + if err := db.PingContext(ctx); err != nil { + db.Close() + return fmt.Errorf("failed to ping database: %w", err) + } + + p.mu.Lock() + p.db = db + p.mu.Unlock() + + p.logger.Info("PostgreSQL connection established", "host", p.config.Host, "database", p.config.Database) + return nil +} + +// Close closes the database connection +func (p *PostgresAdapter) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + + if p.listener != nil { + if err := p.listener.Close(); err != nil { + p.logger.Error("failed to close listener", "error", err) + } + p.listener = nil + } + + if p.db != nil { + if err := p.db.Close(); err != nil { + return fmt.Errorf("failed to close database: %w", err) + } + p.db = nil + } + + p.logger.Info("PostgreSQL connection closed") + return nil +} + +// Ping checks if the database is reachable +func (p *PostgresAdapter) Ping(ctx context.Context) error { + p.mu.RLock() + db := p.db + p.mu.RUnlock() + + if db == nil { + return fmt.Errorf("database connection not established") + } + + return db.PingContext(ctx) +} + +// Begin starts a new transaction +func (p *PostgresAdapter) Begin(ctx context.Context) (DBTransaction, error) { + p.mu.RLock() + db := p.db + p.mu.RUnlock() + + if db == nil { + return nil, fmt.Errorf("database connection not established") + } + + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return nil, fmt.Errorf("failed to begin transaction: %w", err) + } + + return &postgresTransaction{tx: tx}, nil +} + +// Exec executes a query without returning rows +func (p *PostgresAdapter) Exec(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + p.mu.RLock() + db := p.db + p.mu.RUnlock() + + if db == nil { + return nil, fmt.Errorf("database connection not established") + } + + return db.ExecContext(ctx, query, args...) +} + +// QueryRow executes a query that returns at most one row +func (p *PostgresAdapter) QueryRow(ctx context.Context, query string, args ...interface{}) DBRow { + p.mu.RLock() + db := p.db + p.mu.RUnlock() + + if db == nil { + return &postgresRow{err: fmt.Errorf("database connection not established")} + } + + return &postgresRow{row: db.QueryRowContext(ctx, query, args...)} +} + +// Query executes a query that returns rows +func (p *PostgresAdapter) Query(ctx context.Context, query string, args ...interface{}) (DBRows, error) { + p.mu.RLock() + db := p.db + p.mu.RUnlock() + + if db == nil { + return nil, fmt.Errorf("database connection not established") + } + + rows, err := db.QueryContext(ctx, query, args...) + if err != nil { + return nil, err + } + + return &postgresRows{rows: rows}, nil +} + +// Listen starts listening on a PostgreSQL notification channel +func (p *PostgresAdapter) Listen(ctx context.Context, channel string, handler NotificationHandler) error { + connStr := p.buildConnectionString() + + reportProblem := func(ev pq.ListenerEventType, err error) { + if err != nil { + p.logger.Error("listener problem", "event", ev, "error", err) + } + } + + minReconn := 10 * time.Second + maxReconn := 1 * time.Minute + + p.mu.Lock() + p.listener = pq.NewListener(connStr, minReconn, maxReconn, reportProblem) + listener := p.listener + p.mu.Unlock() + + if err := listener.Listen(channel); err != nil { + return fmt.Errorf("failed to listen on channel %s: %w", channel, err) + } + + p.logger.Info("listening on channel", "channel", channel) + + // Start notification handler in goroutine + go func() { + for { + select { + case n := <-listener.Notify: + if n != nil { + handler(&Notification{ + Channel: n.Channel, + Payload: n.Extra, + PID: n.BePid, + }) + } + case <-ctx.Done(): + p.logger.Info("stopping listener", "channel", channel) + return + case <-time.After(90 * time.Second): + go listener.Ping() + } + } + }() + + return nil +} + +// Unlisten stops listening on a channel +func (p *PostgresAdapter) Unlisten(ctx context.Context, channel string) error { + p.mu.RLock() + listener := p.listener + p.mu.RUnlock() + + if listener == nil { + return nil + } + + return listener.Unlisten(channel) +} + +// buildConnectionString builds a PostgreSQL connection string +func (p *PostgresAdapter) buildConnectionString() string { + sslMode := p.config.SSLMode + if sslMode == "" { + sslMode = "disable" + } + + return fmt.Sprintf( + "host=%s port=%d user=%s password=%s dbname=%s sslmode=%s", + p.config.Host, + p.config.Port, + p.config.User, + p.config.Password, + p.config.Database, + sslMode, + ) +} + +// postgresTransaction implements DBTransaction +type postgresTransaction struct { + tx *sql.Tx +} + +func (t *postgresTransaction) Exec(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + return t.tx.ExecContext(ctx, query, args...) +} + +func (t *postgresTransaction) QueryRow(ctx context.Context, query string, args ...interface{}) DBRow { + return &postgresRow{row: t.tx.QueryRowContext(ctx, query, args...)} +} + +func (t *postgresTransaction) Query(ctx context.Context, query string, args ...interface{}) (DBRows, error) { + rows, err := t.tx.QueryContext(ctx, query, args...) + if err != nil { + return nil, err + } + return &postgresRows{rows: rows}, nil +} + +func (t *postgresTransaction) Commit() error { + return t.tx.Commit() +} + +func (t *postgresTransaction) Rollback() error { + return t.tx.Rollback() +} + +// postgresRow implements DBRow +type postgresRow struct { + row *sql.Row + err error +} + +func (r *postgresRow) Scan(dest ...interface{}) error { + if r.err != nil { + return r.err + } + return r.row.Scan(dest...) +} + +// postgresRows implements DBRows +type postgresRows struct { + rows *sql.Rows +} + +func (r *postgresRows) Next() bool { + return r.rows.Next() +} + +func (r *postgresRows) Scan(dest ...interface{}) error { + return r.rows.Scan(dest...) +} + +func (r *postgresRows) Close() error { + return r.rows.Close() +} + +func (r *postgresRows) Err() error { + return r.rows.Err() +} diff --git a/pkg/broker/adapter/slog_logger.go b/pkg/broker/adapter/slog_logger.go new file mode 100644 index 0000000..93494ed --- /dev/null +++ b/pkg/broker/adapter/slog_logger.go @@ -0,0 +1,80 @@ +package adapter + +import ( + "log/slog" + "os" +) + +// SlogLogger implements Logger interface using slog +type SlogLogger struct { + logger *slog.Logger +} + +// NewSlogLogger creates a new slog-based logger +func NewSlogLogger(level slog.Level) *SlogLogger { + opts := &slog.HandlerOptions{ + Level: level, + } + handler := slog.NewJSONHandler(os.Stdout, opts) + logger := slog.New(handler) + + return &SlogLogger{ + logger: logger, + } +} + +// NewSlogLoggerWithHandler creates a new slog-based logger with a custom handler +func NewSlogLoggerWithHandler(handler slog.Handler) *SlogLogger { + return &SlogLogger{ + logger: slog.New(handler), + } +} + +// Debug logs a debug message +func (l *SlogLogger) Debug(msg string, args ...interface{}) { + l.logger.Debug(msg, l.convertArgs(args)...) +} + +// Info logs an info message +func (l *SlogLogger) Info(msg string, args ...interface{}) { + l.logger.Info(msg, l.convertArgs(args)...) +} + +// Warn logs a warning message +func (l *SlogLogger) Warn(msg string, args ...interface{}) { + l.logger.Warn(msg, l.convertArgs(args)...) +} + +// Error logs an error message +func (l *SlogLogger) Error(msg string, args ...interface{}) { + l.logger.Error(msg, l.convertArgs(args)...) +} + +// Fatal logs a fatal message and exits +func (l *SlogLogger) Fatal(msg string, args ...interface{}) { + l.logger.Error(msg, l.convertArgs(args)...) + os.Exit(1) +} + +// With returns a new logger with additional context +func (l *SlogLogger) With(key string, value interface{}) Logger { + return &SlogLogger{ + logger: l.logger.With(key, value), + } +} + +// convertArgs converts variadic args to slog.Attr pairs +func (l *SlogLogger) convertArgs(args []interface{}) []any { + if len(args) == 0 { + return nil + } + + // Convert pairs of key-value to slog format + attrs := make([]any, 0, len(args)) + for i := 0; i < len(args); i += 2 { + if i+1 < len(args) { + attrs = append(attrs, args[i], args[i+1]) + } + } + return attrs +} diff --git a/pkg/broker/broker.go b/pkg/broker/broker.go new file mode 100644 index 0000000..38d788b --- /dev/null +++ b/pkg/broker/broker.go @@ -0,0 +1,127 @@ +package broker + +import ( + "context" + "fmt" + "sync" + + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/config" +) + +// Broker manages multiple database instances +type Broker struct { + config *config.Config + logger adapter.Logger + version string + instances []*DatabaseInstance + ctx context.Context + cancel context.CancelFunc + shutdown bool + mu sync.RWMutex +} + +// New creates a new broker that manages multiple database connections +func New(cfg *config.Config, logger adapter.Logger, version string) (*Broker, error) { + ctx, cancel := context.WithCancel(context.Background()) + + broker := &Broker{ + config: cfg, + logger: logger.With("component", "broker"), + version: version, + instances: make([]*DatabaseInstance, 0, len(cfg.Databases)), + ctx: ctx, + cancel: cancel, + } + + return broker, nil +} + +// Start begins all database instances +func (b *Broker) Start() error { + b.logger.Info("starting broker", "database_count", len(b.config.Databases)) + + // Create and start an instance for each database + for i, dbCfg := range b.config.Databases { + b.logger.Info("starting database instance", "name", dbCfg.Name, "host", dbCfg.Host, "database", dbCfg.Database) + + // Create database adapter + dbAdapter := adapter.NewPostgresAdapter(dbCfg.ToPostgresConfig(), b.logger) + + // Create database instance + instance, err := NewDatabaseInstance(b.config, &dbCfg, dbAdapter, b.logger, b.version, b.ctx) + if err != nil { + // Stop any already-started instances + b.stopInstances() + return fmt.Errorf("failed to create database instance %d (%s): %w", i, dbCfg.Name, err) + } + + // Start the instance + if err := instance.Start(); err != nil { + // Stop any already-started instances + b.stopInstances() + return fmt.Errorf("failed to start database instance %d (%s): %w", i, dbCfg.Name, err) + } + + b.instances = append(b.instances, instance) + b.logger.Info("database instance started", "name", dbCfg.Name, "instance_id", instance.ID) + } + + b.logger.Info("broker started successfully", "database_instances", len(b.instances)) + return nil +} + +// Stop gracefully stops all database instances +func (b *Broker) Stop() error { + b.mu.Lock() + if b.shutdown { + b.mu.Unlock() + return nil + } + b.shutdown = true + b.mu.Unlock() + + b.logger.Info("stopping broker") + + // Cancel context + b.cancel() + + // Stop all instances + b.stopInstances() + + b.logger.Info("broker stopped") + return nil +} + +// stopInstances stops all database instances +func (b *Broker) stopInstances() { + var wg sync.WaitGroup + for _, instance := range b.instances { + wg.Add(1) + go func(inst *DatabaseInstance) { + defer wg.Done() + if err := inst.Stop(); err != nil { + b.logger.Error("failed to stop instance", "name", inst.DatabaseName, "error", err) + } + }(instance) + } + wg.Wait() +} + +// GetStats returns statistics for all database instances +func (b *Broker) GetStats() map[string]interface{} { + b.mu.RLock() + defer b.mu.RUnlock() + + stats := map[string]interface{}{ + "database_count": len(b.instances), + } + + instanceStats := make(map[string]interface{}) + for _, instance := range b.instances { + instanceStats[instance.DatabaseName] = instance.GetStats() + } + stats["instances"] = instanceStats + + return stats +} diff --git a/pkg/broker/config/config.go b/pkg/broker/config/config.go new file mode 100644 index 0000000..c1f3c5f --- /dev/null +++ b/pkg/broker/config/config.go @@ -0,0 +1,180 @@ +package config + +import ( + "fmt" + "time" + + "github.com/spf13/viper" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" +) + +// Config holds all broker configuration +type Config struct { + Databases []DatabaseConfig `mapstructure:"databases"` + Broker BrokerConfig `mapstructure:"broker"` + Logging LoggingConfig `mapstructure:"logging"` +} + +// DatabaseConfig holds database connection settings +type DatabaseConfig struct { + Name string `mapstructure:"name"` + Host string `mapstructure:"host"` + Port int `mapstructure:"port"` + Database string `mapstructure:"database"` + User string `mapstructure:"user"` + Password string `mapstructure:"password"` + SSLMode string `mapstructure:"sslmode"` + MaxOpenConns int `mapstructure:"max_open_conns"` + MaxIdleConns int `mapstructure:"max_idle_conns"` + ConnMaxLifetime time.Duration `mapstructure:"conn_max_lifetime"` + ConnMaxIdleTime time.Duration `mapstructure:"conn_max_idle_time"` + QueueCount int `mapstructure:"queue_count"` +} + +// BrokerConfig holds broker-specific settings +type BrokerConfig struct { + Name string `mapstructure:"name"` + FetchQueryQueSize int `mapstructure:"fetch_query_que_size"` + QueueTimerSec int `mapstructure:"queue_timer_sec"` + QueueBufferSize int `mapstructure:"queue_buffer_size"` + WorkerIdleTimeoutSec int `mapstructure:"worker_idle_timeout_sec"` + NotifyRetrySeconds time.Duration `mapstructure:"notify_retry_seconds"` + EnableDebug bool `mapstructure:"enable_debug"` +} + +// LoggingConfig holds logging settings +type LoggingConfig struct { + Level string `mapstructure:"level"` + Format string `mapstructure:"format"` // json or text +} + +// LoadConfig loads configuration from file and environment variables +func LoadConfig(configPath string) (*Config, error) { + v := viper.New() + + // Set defaults + setDefaults(v) + + // Config file settings + if configPath != "" { + v.SetConfigFile(configPath) + } else { + v.SetConfigName("broker") + v.SetConfigType("yaml") + v.AddConfigPath(".") + v.AddConfigPath("/etc/pgsql-broker/") + v.AddConfigPath("$HOME/.pgsql-broker/") + } + + // Read from environment variables + v.SetEnvPrefix("BROKER") + v.AutomaticEnv() + + // Read config file + if err := v.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + return nil, fmt.Errorf("failed to read config file: %w", err) + } + } + + var config Config + if err := v.Unmarshal(&config); err != nil { + return nil, fmt.Errorf("failed to unmarshal config: %w", err) + } + + // Validate configuration + if err := validateConfig(&config); err != nil { + return nil, err + } + + // Apply defaults to databases + applyDatabaseDefaults(&config) + + return &config, nil +} + +// setDefaults sets default configuration values +func setDefaults(v *viper.Viper) { + // Broker defaults + v.SetDefault("broker.name", "pgsql-broker") + v.SetDefault("broker.fetch_query_que_size", 100) + v.SetDefault("broker.queue_timer_sec", 10) + v.SetDefault("broker.queue_buffer_size", 50) + v.SetDefault("broker.worker_idle_timeout_sec", 10) + v.SetDefault("broker.notify_retry_seconds", 30*time.Second) + v.SetDefault("broker.enable_debug", false) + + // Logging defaults + v.SetDefault("logging.level", "info") + v.SetDefault("logging.format", "json") +} + +// validateConfig validates the configuration +func validateConfig(config *Config) error { + if len(config.Databases) == 0 { + return fmt.Errorf("at least one database must be configured") + } + + // Validate each database configuration + for i, db := range config.Databases { + if db.Name == "" { + return fmt.Errorf("database[%d]: name is required", i) + } + if db.Host == "" { + return fmt.Errorf("database[%d] (%s): host is required", i, db.Name) + } + if db.Database == "" { + return fmt.Errorf("database[%d] (%s): database name is required", i, db.Name) + } + if db.User == "" { + return fmt.Errorf("database[%d] (%s): user is required", i, db.Name) + } + } + + return nil +} + +// applyDatabaseDefaults applies default values to database configurations +func applyDatabaseDefaults(config *Config) { + for i := range config.Databases { + db := &config.Databases[i] + + if db.Port == 0 { + db.Port = 5432 + } + if db.SSLMode == "" { + db.SSLMode = "disable" + } + if db.MaxOpenConns == 0 { + db.MaxOpenConns = 25 + } + if db.MaxIdleConns == 0 { + db.MaxIdleConns = 5 + } + if db.ConnMaxLifetime == 0 { + db.ConnMaxLifetime = 5 * time.Minute + } + if db.ConnMaxIdleTime == 0 { + db.ConnMaxIdleTime = 10 * time.Minute + } + if db.QueueCount == 0 { + db.QueueCount = 4 + } + } +} + +// ToPostgresConfig converts DatabaseConfig to adapter.PostgresConfig +func (d *DatabaseConfig) ToPostgresConfig() adapter.PostgresConfig { + return adapter.PostgresConfig{ + Host: d.Host, + Port: d.Port, + Database: d.Database, + User: d.User, + Password: d.Password, + SSLMode: d.SSLMode, + MaxOpenConns: d.MaxOpenConns, + MaxIdleConns: d.MaxIdleConns, + ConnMaxLifetime: d.ConnMaxLifetime, + ConnMaxIdleTime: d.ConnMaxIdleTime, + } +} diff --git a/pkg/broker/database_instance.go b/pkg/broker/database_instance.go new file mode 100644 index 0000000..8283910 --- /dev/null +++ b/pkg/broker/database_instance.go @@ -0,0 +1,326 @@ +package broker + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + "time" + + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/config" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/models" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/queue" +) + +// DatabaseInstance represents a broker instance for a single database +type DatabaseInstance struct { + ID int64 + Name string + DatabaseName string + Hostname string + PID int + Version string + config *config.Config + dbConfig *config.DatabaseConfig + db adapter.DBAdapter + logger adapter.Logger + queues map[int]*queue.Queue + queuesMu sync.RWMutex + ctx context.Context + shutdown bool + shutdownMu sync.RWMutex + jobsHandled int64 + startTime time.Time +} + +// NewDatabaseInstance creates a new database instance +func NewDatabaseInstance(cfg *config.Config, dbCfg *config.DatabaseConfig, db adapter.DBAdapter, logger adapter.Logger, version string, parentCtx context.Context) (*DatabaseInstance, error) { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown" + } + + instance := &DatabaseInstance{ + Name: fmt.Sprintf("%s-%s", cfg.Broker.Name, dbCfg.Name), + DatabaseName: dbCfg.Name, + Hostname: hostname, + PID: os.Getpid(), + Version: version, + config: cfg, + dbConfig: dbCfg, + db: db, + logger: logger.With("component", "database-instance").With("database", dbCfg.Name), + queues: make(map[int]*queue.Queue), + ctx: parentCtx, + startTime: time.Now(), + } + + return instance, nil +} + +// Start begins the database instance +func (i *DatabaseInstance) Start() error { + i.logger.Info("starting database instance", "name", i.Name, "hostname", i.Hostname, "pid", i.PID) + + // Connect to database + if err := i.db.Connect(i.ctx); err != nil { + return fmt.Errorf("failed to connect to database: %w", err) + } + + // Register instance in database + if err := i.registerInstance(); err != nil { + return fmt.Errorf("failed to register instance: %w", err) + } + + i.logger.Info("database instance registered", "id", i.ID) + + // Start queues + if err := i.startQueues(); err != nil { + return fmt.Errorf("failed to start queues: %w", err) + } + + // Start listening for notifications + if err := i.startListener(); err != nil { + return fmt.Errorf("failed to start listener: %w", err) + } + + // Start ping routine + go i.pingRoutine() + + i.logger.Info("database instance started successfully") + return nil +} + +// Stop gracefully stops the database instance +func (i *DatabaseInstance) Stop() error { + i.shutdownMu.Lock() + if i.shutdown { + i.shutdownMu.Unlock() + return nil + } + i.shutdown = true + i.shutdownMu.Unlock() + + i.logger.Info("stopping database instance") + + // Stop all queues + i.queuesMu.Lock() + for num, q := range i.queues { + i.logger.Info("stopping queue", "number", num) + if err := q.Stop(); err != nil { + i.logger.Error("failed to stop queue", "number", num, "error", err) + } + } + i.queuesMu.Unlock() + + // Update instance status in database + if err := i.shutdownInstance(); err != nil { + i.logger.Error("failed to shutdown instance in database", "error", err) + } + + // Close database connection + if err := i.db.Close(); err != nil { + i.logger.Error("failed to close database", "error", err) + } + + i.logger.Info("database instance stopped") + return nil +} + +// registerInstance registers the instance in the database +func (i *DatabaseInstance) registerInstance() error { + var retval int + var errmsg string + var instanceID int64 + + err := i.db.QueryRow(i.ctx, + "SELECT p_retval, p_errmsg, p_instance_id FROM broker_register_instance($1, $2, $3, $4, $5)", + i.Name, i.Hostname, i.PID, i.Version, i.dbConfig.QueueCount, + ).Scan(&retval, &errmsg, &instanceID) + + if err != nil { + return fmt.Errorf("query error: %w", err) + } + + if retval > 0 { + return fmt.Errorf("broker_register_instance error: %s", errmsg) + } + + i.ID = instanceID + return nil +} + +// startQueues initializes and starts all queues +func (i *DatabaseInstance) startQueues() error { + i.queuesMu.Lock() + defer i.queuesMu.Unlock() + + for queueNum := 1; queueNum <= i.dbConfig.QueueCount; queueNum++ { + queueCfg := queue.Config{ + Number: queueNum, + InstanceID: i.ID, + WorkerCount: 1, // One worker per queue for now + DBAdapter: i.db, + Logger: i.logger, + BufferSize: i.config.Broker.QueueBufferSize, + TimerSeconds: i.config.Broker.QueueTimerSec, + FetchSize: i.config.Broker.FetchQueryQueSize, + } + + q := queue.New(queueCfg) + if err := q.Start(queueCfg); err != nil { + return fmt.Errorf("failed to start queue %d: %w", queueNum, err) + } + + i.queues[queueNum] = q + i.logger.Info("queue started", "number", queueNum) + } + + return nil +} + +// startListener starts listening for database notifications +func (i *DatabaseInstance) startListener() error { + handler := func(n *adapter.Notification) { + i.handleNotification(n) + } + + if err := i.db.Listen(i.ctx, "broker.event", handler); err != nil { + return fmt.Errorf("failed to start listener: %w", err) + } + + return nil +} + +// handleNotification processes incoming job notifications +func (i *DatabaseInstance) handleNotification(n *adapter.Notification) { + if i.config.Broker.EnableDebug { + i.logger.Debug("received notification", "channel", n.Channel, "payload", n.Payload) + } + + var job models.Job + if err := json.Unmarshal([]byte(n.Payload), &job); err != nil { + i.logger.Error("failed to unmarshal notification", "error", err, "payload", n.Payload) + return + } + + if job.ID <= 0 { + i.logger.Warn("notification missing job ID", "payload", n.Payload) + return + } + + if job.JobQueue <= 0 { + i.logger.Warn("notification missing queue number", "job_id", job.ID) + return + } + + // Get the queue + i.queuesMu.RLock() + q, exists := i.queues[job.JobQueue] + i.queuesMu.RUnlock() + + if !exists { + i.logger.Warn("queue not found for job", "job_id", job.ID, "queue", job.JobQueue) + return + } + + // Add job to queue + if err := q.AddJob(job); err != nil { + i.logger.Error("failed to add job to queue", "job_id", job.ID, "queue", job.JobQueue, "error", err) + } +} + +// pingRoutine periodically updates the instance status in the database +func (i *DatabaseInstance) pingRoutine() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + i.shutdownMu.RLock() + if i.shutdown { + i.shutdownMu.RUnlock() + return + } + i.shutdownMu.RUnlock() + + if err := i.ping(); err != nil { + i.logger.Error("ping failed", "error", err) + } + + case <-i.ctx.Done(): + return + } + } +} + +// ping updates the instance ping timestamp +func (i *DatabaseInstance) ping() error { + var retval int + var errmsg string + + err := i.db.QueryRow(i.ctx, + "SELECT p_retval, p_errmsg FROM broker_ping_instance($1, $2)", + i.ID, i.jobsHandled, + ).Scan(&retval, &errmsg) + + if err != nil { + return fmt.Errorf("query error: %w", err) + } + + if retval > 0 { + return fmt.Errorf("broker_ping_instance error: %s", errmsg) + } + + return nil +} + +// shutdownInstance marks the instance as shutdown in the database +func (i *DatabaseInstance) shutdownInstance() error { + var retval int + var errmsg string + + err := i.db.QueryRow(i.ctx, + "SELECT p_retval, p_errmsg FROM broker_shutdown_instance($1)", + i.ID, + ).Scan(&retval, &errmsg) + + if err != nil { + return fmt.Errorf("query error: %w", err) + } + + if retval > 0 { + return fmt.Errorf("broker_shutdown_instance error: %s", errmsg) + } + + return nil +} + +// GetStats returns instance statistics +func (i *DatabaseInstance) GetStats() map[string]interface{} { + i.queuesMu.RLock() + defer i.queuesMu.RUnlock() + + stats := map[string]interface{}{ + "id": i.ID, + "name": i.Name, + "database_name": i.DatabaseName, + "hostname": i.Hostname, + "pid": i.PID, + "version": i.Version, + "uptime": time.Since(i.startTime).String(), + "jobs_handled": i.jobsHandled, + "queue_count": len(i.queues), + } + + queueStats := make(map[int]interface{}) + for num, q := range i.queues { + queueStats[num] = q.GetStats() + } + stats["queues"] = queueStats + + return stats +} diff --git a/pkg/broker/install/install.go b/pkg/broker/install/install.go new file mode 100644 index 0000000..7e71479 --- /dev/null +++ b/pkg/broker/install/install.go @@ -0,0 +1,246 @@ +package install + +import ( + "context" + "embed" + "fmt" + "io/fs" + "sort" + "strings" + + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" +) + +//go:embed all:sql +var sqlFS embed.FS + +// Installer handles database schema installation +type Installer struct { + db adapter.DBAdapter + logger adapter.Logger +} + +// New creates a new installer +func New(db adapter.DBAdapter, logger adapter.Logger) *Installer { + return &Installer{ + db: db, + logger: logger, + } +} + +// InstallSchema installs the complete database schema +func (i *Installer) InstallSchema(ctx context.Context) error { + i.logger.Info("starting schema installation") + + // Install tables first + if err := i.installTables(ctx); err != nil { + return fmt.Errorf("failed to install tables: %w", err) + } + + // Then install procedures + if err := i.installProcedures(ctx); err != nil { + return fmt.Errorf("failed to install procedures: %w", err) + } + + i.logger.Info("schema installation completed successfully") + return nil +} + +// installTables installs all table definitions +func (i *Installer) installTables(ctx context.Context) error { + i.logger.Info("installing tables") + + files, err := sqlFS.ReadDir("sql/tables") + if err != nil { + return fmt.Errorf("failed to read tables directory: %w", err) + } + + // Filter and sort SQL files + sqlFiles := filterAndSortSQLFiles(files) + + for _, file := range sqlFiles { + // Skip install script + if file == "00_install.sql" { + continue + } + + i.logger.Info("executing table script", "file", file) + + content, err := sqlFS.ReadFile("sql/tables/" + file) + if err != nil { + return fmt.Errorf("failed to read file %s: %w", file, err) + } + + if err := i.executeSQL(ctx, string(content)); err != nil { + return fmt.Errorf("failed to execute %s: %w", file, err) + } + } + + i.logger.Info("tables installed successfully") + return nil +} + +// installProcedures installs all stored procedures +func (i *Installer) installProcedures(ctx context.Context) error { + i.logger.Info("installing procedures") + + files, err := sqlFS.ReadDir("sql/procedures") + if err != nil { + return fmt.Errorf("failed to read procedures directory: %w", err) + } + + // Filter and sort SQL files + sqlFiles := filterAndSortSQLFiles(files) + + for _, file := range sqlFiles { + // Skip install script + if file == "00_install.sql" { + continue + } + + i.logger.Info("executing procedure script", "file", file) + + content, err := sqlFS.ReadFile("sql/procedures/" + file) + if err != nil { + return fmt.Errorf("failed to read file %s: %w", file, err) + } + + if err := i.executeSQL(ctx, string(content)); err != nil { + return fmt.Errorf("failed to execute %s: %w", file, err) + } + } + + i.logger.Info("procedures installed successfully") + return nil +} + +// executeSQL executes SQL statements +func (i *Installer) executeSQL(ctx context.Context, sql string) error { + // Remove comments and split by statement + statements := splitSQLStatements(sql) + + for _, stmt := range statements { + stmt = strings.TrimSpace(stmt) + if stmt == "" { + continue + } + + // Skip psql-specific commands + if strings.HasPrefix(stmt, "\\") { + continue + } + + if _, err := i.db.Exec(ctx, stmt); err != nil { + return fmt.Errorf("failed to execute statement: %w\nStatement: %s", err, stmt) + } + } + + return nil +} + +// filterAndSortSQLFiles filters and sorts SQL files +func filterAndSortSQLFiles(files []fs.DirEntry) []string { + var sqlFiles []string + for _, file := range files { + if !file.IsDir() && strings.HasSuffix(file.Name(), ".sql") { + sqlFiles = append(sqlFiles, file.Name()) + } + } + sort.Strings(sqlFiles) + return sqlFiles +} + +// splitSQLStatements splits SQL into individual statements +func splitSQLStatements(sql string) []string { + // Simple split by semicolon + // This doesn't handle all edge cases (strings with semicolons, dollar-quoted strings, etc.) + // but works for our use case + statements := strings.Split(sql, ";") + + var result []string + var buffer string + + for _, stmt := range statements { + stmt = strings.TrimSpace(stmt) + if stmt == "" { + continue + } + + buffer += stmt + ";" + + // Check if we're inside a function definition ($$) + dollarCount := strings.Count(buffer, "$$") + if dollarCount%2 == 0 { + // Even number of $$ means we're outside function definitions + result = append(result, buffer) + buffer = "" + } else { + // Odd number means we're inside a function, keep accumulating + buffer += " " + } + } + + // Add any remaining buffered content + if buffer != "" { + result = append(result, buffer) + } + + return result +} + +// VerifyInstallation checks if the schema is properly installed +func (i *Installer) VerifyInstallation(ctx context.Context) error { + i.logger.Info("verifying installation") + + tables := []string{"broker_queueinstance", "broker_jobs", "broker_schedule"} + procedures := []string{ + "broker_get", + "broker_run", + "broker_set", + "broker_add_job", + "broker_register_instance", + "broker_ping_instance", + "broker_shutdown_instance", + } + + // Check tables + for _, table := range tables { + var exists bool + err := i.db.QueryRow(ctx, + "SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $1)", + table, + ).Scan(&exists) + + if err != nil { + return fmt.Errorf("failed to check table %s: %w", table, err) + } + + if !exists { + return fmt.Errorf("table %s does not exist", table) + } + + i.logger.Info("table verified", "table", table) + } + + // Check procedures + for _, proc := range procedures { + var exists bool + err := i.db.QueryRow(ctx, + "SELECT EXISTS (SELECT FROM pg_proc WHERE proname = $1)", + proc, + ).Scan(&exists) + + if err != nil { + return fmt.Errorf("failed to check procedure %s: %w", proc, err) + } + + if !exists { + return fmt.Errorf("procedure %s does not exist", proc) + } + + i.logger.Info("procedure verified", "procedure", proc) + } + + i.logger.Info("installation verified successfully") + return nil +} diff --git a/pkg/broker/install/sql/procedures/00_install.sql b/pkg/broker/install/sql/procedures/00_install.sql new file mode 100644 index 0000000..414831f --- /dev/null +++ b/pkg/broker/install/sql/procedures/00_install.sql @@ -0,0 +1,13 @@ +-- PostgreSQL Broker Procedures Installation Script +-- Run this script to create all required stored procedures + +\echo 'Installing PostgreSQL Broker procedures...' + +\i 01_broker_get.sql +\i 02_broker_run.sql +\i 03_broker_set.sql +\i 04_broker_register_instance.sql +\i 05_broker_add_job.sql +\i 06_broker_ping_instance.sql + +\echo 'PostgreSQL Broker procedures installed successfully!' diff --git a/pkg/broker/install/sql/procedures/01_broker_get.sql b/pkg/broker/install/sql/procedures/01_broker_get.sql new file mode 100644 index 0000000..9d6dd13 --- /dev/null +++ b/pkg/broker/install/sql/procedures/01_broker_get.sql @@ -0,0 +1,76 @@ +-- broker_get function +-- Fetches the next job from the queue for a given queue number +-- Returns: p_retval (0=success, >0=error), p_errmsg (error message), p_job_id (job ID if found) + +CREATE OR REPLACE FUNCTION broker_get( + p_queue_number INTEGER, + p_instance_id BIGINT DEFAULT NULL, + OUT p_retval INTEGER, + OUT p_errmsg TEXT, + OUT p_job_id BIGINT +) +RETURNS RECORD +LANGUAGE plpgsql +AS $$ +DECLARE + v_job_record RECORD; +BEGIN + p_retval := 0; + p_errmsg := ''; + p_job_id := NULL; + + -- Validate queue number + IF p_queue_number IS NULL OR p_queue_number <= 0 THEN + p_retval := 1; + p_errmsg := 'Invalid queue number'; + RETURN; + END IF; + + -- Find and lock the next pending job for this queue + -- Uses SKIP LOCKED to avoid blocking on jobs being processed by other workers + -- Skip jobs with pending dependencies + SELECT id_broker_jobs, job_name, job_priority, execute_str + INTO v_job_record + FROM broker_jobs + WHERE job_queue = p_queue_number + AND complete_status = 0 -- pending + AND ( + depends_on IS NULL -- no dependencies + OR depends_on = '{}' -- empty dependencies + OR NOT EXISTS ( -- all dependencies completed + SELECT 1 + FROM broker_jobs dep + WHERE dep.job_name = ANY(broker_jobs.depends_on) + AND dep.complete_status = 0 -- pending dependency + ) + ) + ORDER BY job_priority DESC, created_at ASC + LIMIT 1 + FOR UPDATE SKIP LOCKED; + + -- If no job found, return success with NULL job_id + IF NOT FOUND THEN + RETURN; + END IF; + + -- Update job status to running + UPDATE broker_jobs + SET complete_status = 1, -- running + started_at = NOW(), + rid_broker_queueinstance = p_instance_id, + updated_at = NOW() + WHERE id_broker_jobs = v_job_record.id_broker_jobs; + + -- Return the job ID + p_job_id := v_job_record.id_broker_jobs; + +EXCEPTION + WHEN OTHERS THEN + p_retval := 2; + p_errmsg := SQLERRM; + RAISE WARNING 'broker_get error: %', SQLERRM; +END; +$$; + +-- Comments +COMMENT ON FUNCTION broker_get IS 'Fetches the next pending job from the specified queue'; diff --git a/pkg/broker/install/sql/procedures/02_broker_run.sql b/pkg/broker/install/sql/procedures/02_broker_run.sql new file mode 100644 index 0000000..9c12f6d --- /dev/null +++ b/pkg/broker/install/sql/procedures/02_broker_run.sql @@ -0,0 +1,113 @@ +-- broker_run function +-- Executes a job by its ID +-- Returns: p_retval (0=success, >0=error), p_errmsg (error message) + +CREATE OR REPLACE FUNCTION broker_run( + p_job_id BIGINT, + OUT p_retval INTEGER, + OUT p_errmsg TEXT +) +RETURNS RECORD +LANGUAGE plpgsql +AS $$ +DECLARE + v_job_record RECORD; + v_execute_result TEXT; + v_error_occurred BOOLEAN := false; +BEGIN + p_retval := 0; + p_errmsg := ''; + v_execute_result := ''; + + -- Validate job ID + IF p_job_id IS NULL OR p_job_id <= 0 THEN + p_retval := 1; + p_errmsg := 'Invalid job ID'; + RETURN; + END IF; + + -- Get job details + SELECT id_broker_jobs, execute_str, job_language, run_as, complete_status + INTO v_job_record + FROM broker_jobs + WHERE id_broker_jobs = p_job_id + FOR UPDATE; + + -- Check if job exists + IF NOT FOUND THEN + p_retval := 2; + p_errmsg := 'Job not found'; + RETURN; + END IF; + + -- Check if job is in running state + IF v_job_record.complete_status != 1 THEN + p_retval := 3; + p_errmsg := format('Job is not in running state (status: %s)', v_job_record.complete_status); + RETURN; + END IF; + + -- Execute the job + BEGIN + -- For SQL/PLPGSQL jobs, execute directly + IF v_job_record.job_language IN ('sql', 'plpgsql') THEN + EXECUTE v_job_record.execute_str; + v_execute_result := 'Success'; + ELSE + -- Other languages would need external execution + p_retval := 4; + p_errmsg := format('Unsupported job language: %s', v_job_record.job_language); + v_error_occurred := true; + END IF; + + EXCEPTION + WHEN OTHERS THEN + v_error_occurred := true; + p_retval := 5; + p_errmsg := SQLERRM; + v_execute_result := format('Error: %s', SQLERRM); + END; + + -- Update job with results + IF v_error_occurred THEN + UPDATE broker_jobs + SET complete_status = 3, -- failed + error_msg = p_errmsg, + execute_result = v_execute_result, + completed_at = NOW(), + updated_at = NOW() + WHERE id_broker_jobs = p_job_id; + ELSE + UPDATE broker_jobs + SET complete_status = 2, -- completed + execute_result = v_execute_result, + error_msg = NULL, + completed_at = NOW(), + updated_at = NOW() + WHERE id_broker_jobs = p_job_id; + END IF; + +EXCEPTION + WHEN OTHERS THEN + p_retval := 6; + p_errmsg := SQLERRM; + RAISE WARNING 'broker_run error: %', SQLERRM; + + -- Try to update job status to failed + BEGIN + UPDATE broker_jobs + SET complete_status = 3, -- failed + error_msg = SQLERRM, + completed_at = NOW(), + updated_at = NOW() + WHERE id_broker_jobs = p_job_id; + EXCEPTION + WHEN OTHERS THEN + -- Ignore update errors + NULL; + END; +END; +$$; + +-- Comments +COMMENT ON FUNCTION broker_run IS 'Executes a job by its ID and updates the status'; diff --git a/pkg/broker/install/sql/procedures/03_broker_set.sql b/pkg/broker/install/sql/procedures/03_broker_set.sql new file mode 100644 index 0000000..d177f77 --- /dev/null +++ b/pkg/broker/install/sql/procedures/03_broker_set.sql @@ -0,0 +1,95 @@ +-- broker_set function +-- Sets broker runtime options and context +-- Supports: user, application_name, and custom settings +-- Returns: p_retval (0=success, >0=error), p_errmsg (error message) + +CREATE OR REPLACE FUNCTION broker_set( + p_option_name TEXT, + p_option_value TEXT, + OUT p_retval INTEGER, + OUT p_errmsg TEXT +) +RETURNS RECORD +LANGUAGE plpgsql +AS $$ +DECLARE + v_sql TEXT; +BEGIN + p_retval := 0; + p_errmsg := ''; + + -- Validate inputs + IF p_option_name IS NULL OR p_option_name = '' THEN + p_retval := 1; + p_errmsg := 'Option name is required'; + RETURN; + END IF; + + -- Handle different option types + CASE LOWER(p_option_name) + WHEN 'user' THEN + -- Set session user context + -- This is useful for audit trails and permissions + BEGIN + v_sql := format('SET SESSION AUTHORIZATION %I', p_option_value); + EXECUTE v_sql; + EXCEPTION + WHEN OTHERS THEN + p_retval := 2; + p_errmsg := format('Failed to set user: %s', SQLERRM); + RETURN; + END; + + WHEN 'application_name' THEN + -- Set application name (visible in pg_stat_activity) + BEGIN + v_sql := format('SET application_name TO %L', p_option_value); + EXECUTE v_sql; + EXCEPTION + WHEN OTHERS THEN + p_retval := 3; + p_errmsg := format('Failed to set application_name: %s', SQLERRM); + RETURN; + END; + + WHEN 'search_path' THEN + -- Set schema search path + BEGIN + v_sql := format('SET search_path TO %s', p_option_value); + EXECUTE v_sql; + EXCEPTION + WHEN OTHERS THEN + p_retval := 4; + p_errmsg := format('Failed to set search_path: %s', SQLERRM); + RETURN; + END; + + WHEN 'timezone' THEN + -- Set timezone + BEGIN + v_sql := format('SET timezone TO %L', p_option_value); + EXECUTE v_sql; + EXCEPTION + WHEN OTHERS THEN + p_retval := 5; + p_errmsg := format('Failed to set timezone: %s', SQLERRM); + RETURN; + END; + + ELSE + -- Unknown option + p_retval := 10; + p_errmsg := format('Unknown option: %s', p_option_name); + RETURN; + END CASE; + +EXCEPTION + WHEN OTHERS THEN + p_retval := 99; + p_errmsg := SQLERRM; + RAISE WARNING 'broker_set error: %', SQLERRM; +END; +$$; + +-- Comments +COMMENT ON FUNCTION broker_set IS 'Sets broker runtime options and session context (user, application_name, search_path, timezone)'; diff --git a/pkg/broker/install/sql/procedures/04_broker_register_instance.sql b/pkg/broker/install/sql/procedures/04_broker_register_instance.sql new file mode 100644 index 0000000..18049d8 --- /dev/null +++ b/pkg/broker/install/sql/procedures/04_broker_register_instance.sql @@ -0,0 +1,82 @@ +-- broker_register_instance function +-- Registers a new broker instance in the database +-- Returns: p_retval (0=success, >0=error), p_errmsg (error message), p_instance_id (new instance ID) + +CREATE OR REPLACE FUNCTION broker_register_instance( + p_name TEXT, + p_hostname TEXT, + p_pid INTEGER, + p_version TEXT, + p_queue_count INTEGER, + OUT p_retval INTEGER, + OUT p_errmsg TEXT, + OUT p_instance_id BIGINT +) +RETURNS RECORD +LANGUAGE plpgsql +AS $$ +DECLARE + v_active_count INTEGER; +BEGIN + p_retval := 0; + p_errmsg := ''; + p_instance_id := NULL; + + -- Validate inputs + IF p_name IS NULL OR p_name = '' THEN + p_retval := 1; + p_errmsg := 'Instance name is required'; + RETURN; + END IF; + + IF p_hostname IS NULL OR p_hostname = '' THEN + p_retval := 2; + p_errmsg := 'Hostname is required'; + RETURN; + END IF; + + -- Check for existing active instances + -- Only one broker instance should be active per database + SELECT COUNT(*) + INTO v_active_count + FROM broker_queueinstance + WHERE status = 'active'; + + IF v_active_count > 0 THEN + p_retval := 3; + p_errmsg := 'Another broker instance is already active in this database. Only one broker instance per database is allowed.'; + RETURN; + END IF; + + -- Insert new instance + INSERT INTO broker_queueinstance ( + name, + hostname, + pid, + version, + status, + queue_count, + started_at, + last_ping_at + ) VALUES ( + p_name, + p_hostname, + p_pid, + p_version, + 'active', + p_queue_count, + NOW(), + NOW() + ) + RETURNING id_broker_queueinstance INTO p_instance_id; + +EXCEPTION + WHEN OTHERS THEN + p_retval := 99; + p_errmsg := SQLERRM; + RAISE WARNING 'broker_register_instance error: %', SQLERRM; +END; +$$; + +-- Comments +COMMENT ON FUNCTION broker_register_instance IS 'Registers a new broker instance'; diff --git a/pkg/broker/install/sql/procedures/05_broker_add_job.sql b/pkg/broker/install/sql/procedures/05_broker_add_job.sql new file mode 100644 index 0000000..0de3fc7 --- /dev/null +++ b/pkg/broker/install/sql/procedures/05_broker_add_job.sql @@ -0,0 +1,91 @@ +-- broker_add_job function +-- Adds a new job to the broker queue and sends a notification +-- Returns: p_retval (0=success, >0=error), p_errmsg (error message), p_job_id (new job ID) + +CREATE OR REPLACE FUNCTION broker_add_job( + p_job_name TEXT, + p_execute_str TEXT, + p_job_queue INTEGER DEFAULT 1, + p_job_priority INTEGER DEFAULT 0, + p_job_language TEXT DEFAULT 'sql', + p_run_as TEXT DEFAULT NULL, + p_schedule_id BIGINT DEFAULT NULL, + p_depends_on TEXT[] DEFAULT NULL, + OUT p_retval INTEGER, + OUT p_errmsg TEXT, + OUT p_job_id BIGINT +) +RETURNS RECORD +LANGUAGE plpgsql +AS $$ +DECLARE + v_notification_payload JSON; +BEGIN + p_retval := 0; + p_errmsg := ''; + p_job_id := NULL; + + -- Validate inputs + IF p_job_name IS NULL OR p_job_name = '' THEN + p_retval := 1; + p_errmsg := 'Job name is required'; + RETURN; + END IF; + + IF p_execute_str IS NULL OR p_execute_str = '' THEN + p_retval := 2; + p_errmsg := 'Execute string is required'; + RETURN; + END IF; + + IF p_job_queue IS NULL OR p_job_queue <= 0 THEN + p_retval := 3; + p_errmsg := 'Invalid job queue number'; + RETURN; + END IF; + + -- Insert new job + INSERT INTO broker_jobs ( + job_name, + job_priority, + job_queue, + job_language, + execute_str, + run_as, + rid_broker_schedule, + depends_on, + complete_status + ) VALUES ( + p_job_name, + p_job_priority, + p_job_queue, + p_job_language, + p_execute_str, + p_run_as, + p_schedule_id, + p_depends_on, + 0 -- pending + ) + RETURNING id_broker_jobs INTO p_job_id; + + -- Create notification payload + v_notification_payload := json_build_object( + 'id', p_job_id, + 'job_name', p_job_name, + 'job_queue', p_job_queue, + 'job_priority', p_job_priority + ); + + -- Send notification to broker + PERFORM pg_notify('broker.event', v_notification_payload::text); + +EXCEPTION + WHEN OTHERS THEN + p_retval := 99; + p_errmsg := SQLERRM; + RAISE WARNING 'broker_add_job error: %', SQLERRM; +END; +$$; + +-- Comments +COMMENT ON FUNCTION broker_add_job IS 'Adds a new job to the broker queue and sends a NOTIFY event'; diff --git a/pkg/broker/install/sql/procedures/06_broker_ping_instance.sql b/pkg/broker/install/sql/procedures/06_broker_ping_instance.sql new file mode 100644 index 0000000..4037bd4 --- /dev/null +++ b/pkg/broker/install/sql/procedures/06_broker_ping_instance.sql @@ -0,0 +1,98 @@ +-- broker_ping_instance function +-- Updates the last_ping_at timestamp for a broker instance +-- Returns: p_retval (0=success, >0=error), p_errmsg (error message) + +CREATE OR REPLACE FUNCTION broker_ping_instance( + p_instance_id BIGINT, + p_jobs_handled BIGINT DEFAULT NULL, + OUT p_retval INTEGER, + OUT p_errmsg TEXT +) +RETURNS RECORD +LANGUAGE plpgsql +AS $$ +BEGIN + p_retval := 0; + p_errmsg := ''; + + -- Validate instance ID + IF p_instance_id IS NULL OR p_instance_id <= 0 THEN + p_retval := 1; + p_errmsg := 'Invalid instance ID'; + RETURN; + END IF; + + -- Update ping timestamp + IF p_jobs_handled IS NOT NULL THEN + UPDATE broker_queueinstance + SET last_ping_at = NOW(), + jobs_handled = p_jobs_handled + WHERE id_broker_queueinstance = p_instance_id; + ELSE + UPDATE broker_queueinstance + SET last_ping_at = NOW() + WHERE id_broker_queueinstance = p_instance_id; + END IF; + + -- Check if instance was found + IF NOT FOUND THEN + p_retval := 2; + p_errmsg := 'Instance not found'; + RETURN; + END IF; + +EXCEPTION + WHEN OTHERS THEN + p_retval := 99; + p_errmsg := SQLERRM; + RAISE WARNING 'broker_ping_instance error: %', SQLERRM; +END; +$$; + +-- broker_shutdown_instance function +-- Marks a broker instance as shutdown +-- Returns: p_retval (0=success, >0=error), p_errmsg (error message) + +CREATE OR REPLACE FUNCTION broker_shutdown_instance( + p_instance_id BIGINT, + OUT p_retval INTEGER, + OUT p_errmsg TEXT +) +RETURNS RECORD +LANGUAGE plpgsql +AS $$ +BEGIN + p_retval := 0; + p_errmsg := ''; + + -- Validate instance ID + IF p_instance_id IS NULL OR p_instance_id <= 0 THEN + p_retval := 1; + p_errmsg := 'Invalid instance ID'; + RETURN; + END IF; + + -- Update instance status + UPDATE broker_queueinstance + SET status = 'shutdown', + shutdown_at = NOW() + WHERE id_broker_queueinstance = p_instance_id; + + -- Check if instance was found + IF NOT FOUND THEN + p_retval := 2; + p_errmsg := 'Instance not found'; + RETURN; + END IF; + +EXCEPTION + WHEN OTHERS THEN + p_retval := 99; + p_errmsg := SQLERRM; + RAISE WARNING 'broker_shutdown_instance error: %', SQLERRM; +END; +$$; + +-- Comments +COMMENT ON FUNCTION broker_ping_instance IS 'Updates the last ping timestamp for an instance'; +COMMENT ON FUNCTION broker_shutdown_instance IS 'Marks an instance as shutdown'; diff --git a/pkg/broker/install/sql/tables/00_install.sql b/pkg/broker/install/sql/tables/00_install.sql new file mode 100644 index 0000000..09d6a74 --- /dev/null +++ b/pkg/broker/install/sql/tables/00_install.sql @@ -0,0 +1,10 @@ +-- PostgreSQL Broker Tables Installation Script +-- Run this script to create all required tables + +\echo 'Installing PostgreSQL Broker tables...' + +\i 01_broker_queueinstance.sql +\i 03_broker_schedule.sql +\i 02_broker_jobs.sql + +\echo 'PostgreSQL Broker tables installed successfully!' diff --git a/pkg/broker/install/sql/tables/01_broker_queueinstance.sql b/pkg/broker/install/sql/tables/01_broker_queueinstance.sql new file mode 100644 index 0000000..dbd1462 --- /dev/null +++ b/pkg/broker/install/sql/tables/01_broker_queueinstance.sql @@ -0,0 +1,31 @@ +-- broker_queueinstance table +-- Tracks active and historical broker queue instances + +CREATE TABLE IF NOT EXISTS broker_queueinstance ( + id_broker_queueinstance BIGSERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + hostname VARCHAR(255) NOT NULL, + pid INTEGER NOT NULL, + version VARCHAR(50) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'active', + started_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + last_ping_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + shutdown_at TIMESTAMP WITH TIME ZONE, + queue_count INTEGER NOT NULL DEFAULT 0, + jobs_handled BIGINT NOT NULL DEFAULT 0, + + CONSTRAINT broker_queueinstance_status_check CHECK (status IN ('active', 'inactive', 'shutdown')) +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_broker_queueinstance_status ON broker_queueinstance(status); +CREATE INDEX IF NOT EXISTS idx_broker_queueinstance_hostname ON broker_queueinstance(hostname); +CREATE INDEX IF NOT EXISTS idx_broker_queueinstance_last_ping ON broker_queueinstance(last_ping_at); + +-- Comments +COMMENT ON TABLE broker_queueinstance IS 'Tracks broker queue instances (active and historical)'; +COMMENT ON COLUMN broker_queueinstance.name IS 'Human-readable name of the broker instance'; +COMMENT ON COLUMN broker_queueinstance.hostname IS 'Hostname where the broker is running'; +COMMENT ON COLUMN broker_queueinstance.pid IS 'Process ID of the broker'; +COMMENT ON COLUMN broker_queueinstance.status IS 'Current status: active, inactive, or shutdown'; +COMMENT ON COLUMN broker_queueinstance.jobs_handled IS 'Total number of jobs handled by this instance'; diff --git a/pkg/broker/install/sql/tables/02_broker_jobs.sql b/pkg/broker/install/sql/tables/02_broker_jobs.sql new file mode 100644 index 0000000..8475439 --- /dev/null +++ b/pkg/broker/install/sql/tables/02_broker_jobs.sql @@ -0,0 +1,62 @@ +-- broker_jobs table +-- Stores jobs to be executed by the broker + +CREATE TABLE IF NOT EXISTS broker_jobs ( + id_broker_jobs BIGSERIAL PRIMARY KEY, + job_name VARCHAR(255) NOT NULL, + job_priority INTEGER NOT NULL DEFAULT 0, + job_queue INTEGER NOT NULL DEFAULT 1, + job_language VARCHAR(50) NOT NULL DEFAULT 'sql', + execute_str TEXT NOT NULL, + execute_result TEXT, + error_msg TEXT, + complete_status INTEGER NOT NULL DEFAULT 0, + run_as VARCHAR(100), + rid_broker_schedule BIGINT, + rid_broker_queueinstance BIGINT, + depends_on TEXT[], + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + started_at TIMESTAMP WITH TIME ZONE, + completed_at TIMESTAMP WITH TIME ZONE, + + CONSTRAINT broker_jobs_complete_status_check CHECK (complete_status IN (0, 1, 2, 3, 4)), + CONSTRAINT broker_jobs_job_queue_check CHECK (job_queue > 0), + CONSTRAINT fk_schedule FOREIGN KEY (rid_broker_schedule) REFERENCES broker_schedule(id_broker_schedule) ON DELETE SET NULL, + CONSTRAINT fk_instance FOREIGN KEY (rid_broker_queueinstance) REFERENCES broker_queueinstance(id_broker_queueinstance) ON DELETE SET NULL +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_broker_jobs_status ON broker_jobs(complete_status); +CREATE INDEX IF NOT EXISTS idx_broker_jobs_queue ON broker_jobs(job_queue, complete_status, job_priority); +CREATE INDEX IF NOT EXISTS idx_broker_jobs_schedule ON broker_jobs(rid_broker_schedule); +CREATE INDEX IF NOT EXISTS idx_broker_jobs_instance ON broker_jobs(rid_broker_queueinstance); +CREATE INDEX IF NOT EXISTS idx_broker_jobs_created ON broker_jobs(created_at); +CREATE INDEX IF NOT EXISTS idx_broker_jobs_name ON broker_jobs(job_name, complete_status); + +-- Comments +COMMENT ON TABLE broker_jobs IS 'Job queue for broker execution'; +COMMENT ON COLUMN broker_jobs.job_name IS 'Name/description of the job'; +COMMENT ON COLUMN broker_jobs.job_priority IS 'Job priority (higher = more important)'; +COMMENT ON COLUMN broker_jobs.job_queue IS 'Queue number (allows parallel processing)'; +COMMENT ON COLUMN broker_jobs.job_language IS 'Execution language (sql, plpgsql, etc.)'; +COMMENT ON COLUMN broker_jobs.execute_str IS 'SQL or code to execute'; +COMMENT ON COLUMN broker_jobs.complete_status IS '0=pending, 1=running, 2=completed, 3=failed, 4=cancelled'; +COMMENT ON COLUMN broker_jobs.run_as IS 'User context to run the job as'; +COMMENT ON COLUMN broker_jobs.rid_broker_schedule IS 'Reference to schedule if job was scheduled'; +COMMENT ON COLUMN broker_jobs.rid_broker_queueinstance IS 'Instance that processed this job'; +COMMENT ON COLUMN broker_jobs.depends_on IS 'Array of job names that must be completed before this job can run'; + +-- Trigger to update updated_at +CREATE OR REPLACE FUNCTION tf_broker_jobs_update_timestamp() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER t_broker_jobs_updated_at + BEFORE UPDATE ON broker_jobs + FOR EACH ROW + EXECUTE FUNCTION tf_broker_jobs_update_timestamp(); diff --git a/pkg/broker/install/sql/tables/03_broker_schedule.sql b/pkg/broker/install/sql/tables/03_broker_schedule.sql new file mode 100644 index 0000000..78ce5a7 --- /dev/null +++ b/pkg/broker/install/sql/tables/03_broker_schedule.sql @@ -0,0 +1,50 @@ +-- broker_schedule table +-- Stores scheduled jobs (cron-like functionality) + +CREATE TABLE IF NOT EXISTS broker_schedule ( + id_broker_schedule BIGSERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL UNIQUE, + cron_expr VARCHAR(100) NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT true, + job_name VARCHAR(255) NOT NULL, + job_priority INTEGER NOT NULL DEFAULT 0, + job_queue INTEGER NOT NULL DEFAULT 1, + job_language VARCHAR(50) NOT NULL DEFAULT 'sql', + execute_str TEXT NOT NULL, + run_as VARCHAR(100), + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + last_run_at TIMESTAMP WITH TIME ZONE, + next_run_at TIMESTAMP WITH TIME ZONE, + + CONSTRAINT broker_schedule_job_queue_check CHECK (job_queue > 0) +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_broker_schedule_enabled ON broker_schedule(enabled); +CREATE INDEX IF NOT EXISTS idx_broker_schedule_next_run ON broker_schedule(next_run_at) WHERE enabled = true; +CREATE INDEX IF NOT EXISTS idx_broker_schedule_name ON broker_schedule(name); + +-- Comments +COMMENT ON TABLE broker_schedule IS 'Scheduled jobs (cron-like functionality)'; +COMMENT ON COLUMN broker_schedule.name IS 'Unique name for the schedule'; +COMMENT ON COLUMN broker_schedule.cron_expr IS 'Cron expression for scheduling'; +COMMENT ON COLUMN broker_schedule.enabled IS 'Whether the schedule is active'; +COMMENT ON COLUMN broker_schedule.job_name IS 'Name of the job to create'; +COMMENT ON COLUMN broker_schedule.execute_str IS 'SQL or code to execute'; +COMMENT ON COLUMN broker_schedule.last_run_at IS 'Last time the job was executed'; +COMMENT ON COLUMN broker_schedule.next_run_at IS 'Next scheduled execution time'; + +-- Trigger to update updated_at +CREATE OR REPLACE FUNCTION tf_broker_schedule_update_timestamp() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = NOW(); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER t_broker_schedule_updated_at + BEFORE UPDATE ON broker_schedule + FOR EACH ROW + EXECUTE FUNCTION tf_broker_schedule_update_timestamp(); diff --git a/pkg/broker/models/models.go b/pkg/broker/models/models.go new file mode 100644 index 0000000..a413360 --- /dev/null +++ b/pkg/broker/models/models.go @@ -0,0 +1,73 @@ +package models + +import "time" + +// Job represents a broker job +type Job struct { + ID int64 `json:"id"` + JobName string `json:"job_name"` + JobPriority int32 `json:"job_priority"` + JobQueue int `json:"job_queue"` + JobLanguage string `json:"job_language"` + ExecuteStr string `json:"execute_str"` + ExecuteResult string `json:"execute_result"` + ErrorMsg string `json:"error_msg"` + CompleteStatus int `json:"complete_status"` + RunAs string `json:"run_as"` + UserLogin string `json:"user_login"` + ScheduleID int64 `json:"schedule_id"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// Instance represents a broker instance +type Instance struct { + ID int64 `json:"id"` + Name string `json:"name"` + Hostname string `json:"hostname"` + PID int `json:"pid"` + Version string `json:"version"` + Status string `json:"status"` // active, inactive, shutdown + StartedAt time.Time `json:"started_at"` + LastPingAt time.Time `json:"last_ping_at"` + ShutdownAt time.Time `json:"shutdown_at"` + QueueCount int `json:"queue_count"` + JobsHandled int64 `json:"jobs_handled"` +} + +// Schedule represents a job schedule +type Schedule struct { + ID int64 `json:"id"` + Name string `json:"name"` + CronExpr string `json:"cron_expr"` + Enabled bool `json:"enabled"` + JobName string `json:"job_name"` + JobPriority int32 `json:"job_priority"` + JobQueue int `json:"job_queue"` + ExecuteStr string `json:"execute_str"` + RunAs string `json:"run_as"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + LastRunAt time.Time `json:"last_run_at"` + NextRunAt time.Time `json:"next_run_at"` +} + +// JobStatus represents job completion statuses +type JobStatus int + +const ( + JobStatusPending JobStatus = 0 + JobStatusRunning JobStatus = 1 + JobStatusCompleted JobStatus = 2 + JobStatusFailed JobStatus = 3 + JobStatusCancelled JobStatus = 4 +) + +// InstanceStatus represents instance statuses +type InstanceStatus string + +const ( + InstanceStatusActive InstanceStatus = "active" + InstanceStatusInactive InstanceStatus = "inactive" + InstanceStatusShutdown InstanceStatus = "shutdown" +) diff --git a/pkg/broker/queue/queue.go b/pkg/broker/queue/queue.go new file mode 100644 index 0000000..fd547b8 --- /dev/null +++ b/pkg/broker/queue/queue.go @@ -0,0 +1,148 @@ +package queue + +import ( + "context" + "fmt" + "sync" + + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/models" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/worker" +) + +// Queue manages a collection of workers for a specific queue number +type Queue struct { + Number int + InstanceID int64 + workers []*worker.Worker + db adapter.DBAdapter + logger adapter.Logger + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + workerCount int +} + +// Config holds queue configuration +type Config struct { + Number int + InstanceID int64 + WorkerCount int + DBAdapter adapter.DBAdapter + Logger adapter.Logger + BufferSize int + TimerSeconds int + FetchSize int +} + +// New creates a new queue manager +func New(cfg Config) *Queue { + ctx, cancel := context.WithCancel(context.Background()) + + logger := cfg.Logger.With("queue", cfg.Number) + + return &Queue{ + Number: cfg.Number, + InstanceID: cfg.InstanceID, + workers: make([]*worker.Worker, 0, cfg.WorkerCount), + db: cfg.DBAdapter, + logger: logger, + ctx: ctx, + cancel: cancel, + workerCount: cfg.WorkerCount, + } +} + +// Start initializes and starts all workers in the queue +func (q *Queue) Start(cfg Config) error { + q.mu.Lock() + defer q.mu.Unlock() + + q.logger.Info("starting queue", "worker_count", q.workerCount) + + for i := 0; i < q.workerCount; i++ { + w := worker.New(worker.Config{ + ID: i + 1, + QueueNumber: q.Number, + InstanceID: q.InstanceID, + DBAdapter: cfg.DBAdapter, + Logger: cfg.Logger, + BufferSize: cfg.BufferSize, + TimerSeconds: cfg.TimerSeconds, + FetchSize: cfg.FetchSize, + }) + + if err := w.Start(q.ctx); err != nil { + return fmt.Errorf("failed to start worker %d: %w", i+1, err) + } + + q.workers = append(q.workers, w) + } + + q.logger.Info("queue started successfully", "workers", len(q.workers)) + return nil +} + +// Stop gracefully stops all workers in the queue +func (q *Queue) Stop() error { + q.mu.Lock() + defer q.mu.Unlock() + + q.logger.Info("stopping queue") + + // Cancel context to signal all workers + q.cancel() + + // Stop each worker + var stopErrors []error + for i, w := range q.workers { + if err := w.Stop(); err != nil { + stopErrors = append(stopErrors, fmt.Errorf("worker %d: %w", i+1, err)) + } + } + + if len(stopErrors) > 0 { + return fmt.Errorf("errors stopping workers: %v", stopErrors) + } + + q.logger.Info("queue stopped successfully") + return nil +} + +// AddJob adds a job to the least busy worker +func (q *Queue) AddJob(job models.Job) error { + q.mu.RLock() + defer q.mu.RUnlock() + + if len(q.workers) == 0 { + return fmt.Errorf("no workers available") + } + + // Simple round-robin: use first available worker + // Could be enhanced with load balancing + for _, w := range q.workers { + if err := w.AddJob(job); err == nil { + return nil + } + } + + return fmt.Errorf("all workers are busy") +} + +// GetStats returns statistics for all workers in the queue +func (q *Queue) GetStats() map[int]worker.Stats { + q.mu.RLock() + defer q.mu.RUnlock() + + stats := make(map[int]worker.Stats) + for i, w := range q.workers { + lastActivity, jobsHandled, running := w.GetStats() + stats[i+1] = worker.Stats{ + LastActivity: lastActivity, + JobsHandled: jobsHandled, + Running: running, + } + } + + return stats +} diff --git a/pkg/broker/worker/worker.go b/pkg/broker/worker/worker.go new file mode 100644 index 0000000..c57394e --- /dev/null +++ b/pkg/broker/worker/worker.go @@ -0,0 +1,247 @@ +package worker + +import ( + "context" + "fmt" + "sync" + "time" + + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/models" +) + +// Worker represents a single job processing worker +type Worker struct { + ID int + QueueNumber int + InstanceID int64 + db adapter.DBAdapter + logger adapter.Logger + jobChan chan models.Job + shutdown chan struct{} + wg *sync.WaitGroup + running bool + mu sync.RWMutex + lastActivity time.Time + jobsHandled int64 + timerSeconds int + fetchSize int +} + +// Stats holds worker statistics +type Stats struct { + LastActivity time.Time + JobsHandled int64 + Running bool +} + +// Config holds worker configuration +type Config struct { + ID int + QueueNumber int + InstanceID int64 + DBAdapter adapter.DBAdapter + Logger adapter.Logger + BufferSize int + TimerSeconds int + FetchSize int +} + +// New creates a new worker +func New(cfg Config) *Worker { + return &Worker{ + ID: cfg.ID, + QueueNumber: cfg.QueueNumber, + InstanceID: cfg.InstanceID, + db: cfg.DBAdapter, + logger: cfg.Logger.With("worker_id", cfg.ID).With("queue", cfg.QueueNumber), + jobChan: make(chan models.Job, cfg.BufferSize), + shutdown: make(chan struct{}), + wg: &sync.WaitGroup{}, + timerSeconds: cfg.TimerSeconds, + fetchSize: cfg.FetchSize, + } +} + +// Start begins the worker processing loop +func (w *Worker) Start(ctx context.Context) error { + w.mu.Lock() + if w.running { + w.mu.Unlock() + return fmt.Errorf("worker %d already running", w.ID) + } + w.running = true + w.mu.Unlock() + + w.logger.Info("worker starting") + + w.wg.Add(1) + go w.processLoop(ctx) + + return nil +} + +// Stop gracefully stops the worker +func (w *Worker) Stop() error { + w.mu.Lock() + if !w.running { + w.mu.Unlock() + return nil + } + w.mu.Unlock() + + w.logger.Info("worker stopping") + close(w.shutdown) + w.wg.Wait() + + w.mu.Lock() + w.running = false + w.mu.Unlock() + + w.logger.Info("worker stopped") + return nil +} + +// AddJob adds a job to the worker's queue +func (w *Worker) AddJob(job models.Job) error { + select { + case w.jobChan <- job: + return nil + default: + return fmt.Errorf("worker %d job channel is full", w.ID) + } +} + +// processLoop is the main worker processing loop +func (w *Worker) processLoop(ctx context.Context) { + defer w.wg.Done() + defer w.recoverPanic() + + timer := time.NewTimer(time.Duration(w.timerSeconds) * time.Second) + defer timer.Stop() + + for { + select { + case job := <-w.jobChan: + w.updateActivity() + w.processJobs(ctx, &job) + + case <-timer.C: + // Timer expired - fetch jobs from database + if w.timerSeconds > 0 { + w.updateActivity() + w.processJobs(ctx, nil) + } + timer.Reset(time.Duration(w.timerSeconds) * time.Second) + + case <-w.shutdown: + w.logger.Info("worker shutdown signal received") + return + + case <-ctx.Done(): + w.logger.Info("worker context cancelled") + return + } + } +} + +// processJobs processes jobs from the queue +func (w *Worker) processJobs(ctx context.Context, specificJob *models.Job) { + defer w.recoverPanic() + + for i := 0; i < w.fetchSize; i++ { + var jobID int64 + + if specificJob != nil && specificJob.ID > 0 { + jobID = specificJob.ID + specificJob = nil // Only process once + } else { + // Fetch next job from database + var err error + jobID, err = w.fetchNextJob(ctx) + if err != nil { + w.logger.Error("failed to fetch job", "error", err) + return + } + } + + if jobID <= 0 { + // No more jobs + return + } + + // Run the job + if err := w.runJob(ctx, jobID); err != nil { + w.logger.Error("failed to run job", "job_id", jobID, "error", err) + } else { + w.jobsHandled++ + } + } +} + +// fetchNextJob fetches the next job from the queue +func (w *Worker) fetchNextJob(ctx context.Context) (int64, error) { + var retval int + var errmsg string + var jobID int64 + + err := w.db.QueryRow(ctx, + "SELECT p_retval, p_errmsg, p_job_id FROM broker_get($1, $2)", + w.QueueNumber, w.InstanceID, + ).Scan(&retval, &errmsg, &jobID) + + if err != nil { + return 0, fmt.Errorf("query error: %w", err) + } + + if retval > 0 { + return 0, fmt.Errorf("broker_get error: %s", errmsg) + } + + return jobID, nil +} + +// runJob executes a job +func (w *Worker) runJob(ctx context.Context, jobID int64) error { + w.logger.Debug("running job", "job_id", jobID) + + var retval int + var errmsg string + + err := w.db.QueryRow(ctx, + "SELECT p_retval, p_errmsg FROM broker_run($1)", + jobID, + ).Scan(&retval, &errmsg) + + if err != nil { + return fmt.Errorf("query error: %w", err) + } + + if retval > 0 { + return fmt.Errorf("broker_run error: %s", errmsg) + } + + w.logger.Debug("job completed", "job_id", jobID) + return nil +} + +// updateActivity updates the last activity timestamp +func (w *Worker) updateActivity() { + w.mu.Lock() + w.lastActivity = time.Now() + w.mu.Unlock() +} + +// GetStats returns worker statistics +func (w *Worker) GetStats() (lastActivity time.Time, jobsHandled int64, running bool) { + w.mu.RLock() + defer w.mu.RUnlock() + return w.lastActivity, w.jobsHandled, w.running +} + +// recoverPanic recovers from panics in the worker +func (w *Worker) recoverPanic() { + if r := recover(); r != nil { + w.logger.Error("worker panic recovered", "panic", r) + } +}