Files
pgsql-broker/README.md

293 lines
11 KiB
Markdown

# PostgreSQL Broker
A robust, event-driven job processing system for PostgreSQL that uses LISTEN/NOTIFY for real-time job execution. It supports multiple queues, priority-based scheduling, and can be used both as a standalone service or as a Go library.
## Features
- **Multi-Database Support**: Single broker process can manage multiple database connections
- **Event-Driven**: Uses PostgreSQL LISTEN/NOTIFY for instant job notifications
- **Multiple Queues**: Support for concurrent job processing across multiple queues per database
- **Priority Scheduling**: Jobs can be prioritized for execution order
- **Job Dependencies**: Jobs can depend on other jobs being completed first
- **Adapter Pattern**: Clean interfaces for database and logging (easy to extend)
- **Standalone or Library**: Use as a CLI tool or integrate into your Go application
- **Configuration Management**: Viper-based config with support for YAML, JSON, and environment variables
- **Graceful Shutdown**: Proper cleanup and job completion on shutdown
- **Instance Tracking**: Monitor active broker instances through the database
- **Single Instance Per Database**: Enforces one broker instance per database to prevent conflicts
- **Embedded SQL Installer**: Database schema embedded in binary with built-in install command
## Architecture
The broker supports multi-database architecture where a single broker process can manage multiple database connections. Each database has its own instance with dedicated queues, but only ONE broker instance is allowed per database.
```
┌─────────────────────────────────────────────────────────────────┐
│ Broker Process │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────────────────┐ ┌───────────────────────────┐ │
│ │ Database Instance (DB1) │ │ Database Instance (DB2) │ │
│ ├────────────────────────────┤ ├───────────────────────────┤ │
│ │ ┌────┐ ┌────┐ ┌────┐ │ │ ┌────┐ ┌────┐ │ │
│ │ │ Q1 │ │ Q2 │ │ QN │ │ │ │ Q1 │ │ Q2 │ │ │
│ │ └────┘ └────┘ └────┘ │ │ └────┘ └────┘ │ │
│ │ │ │ │ │
│ │ ┌────────────────────────┐│ │ ┌────────────────────────┐│ │
│ │ │ PostgreSQL Adapter ││ │ │ PostgreSQL Adapter ││ │
│ │ │ - Connection Pool ││ │ │ - Connection Pool ││ │
│ │ │ - LISTEN/NOTIFY ││ │ │ - LISTEN/NOTIFY ││ │
│ │ └────────────────────────┘│ │ └────────────────────────┘│ │
│ └────────────┬───────────────┘ └──────────┬────────────────┘ │
│ │ │ │
└───────────────┼──────────────────────────────┼───────────────────┘
│ │
▼ ▼
┌──────────────────────┐ ┌──────────────────────┐
│ PostgreSQL (DB1) │ │ PostgreSQL (DB2) │
│ - broker_jobs │ │ - broker_jobs │
│ - broker_queueinstance│ │ - broker_queueinstance│
│ - broker_schedule │ │ - broker_schedule │
└──────────────────────┘ └──────────────────────┘
```
**Key Points**:
- One broker process can manage multiple databases
- Each database has exactly ONE active broker instance
- Each database instance has its own queues and workers
- Validation prevents multiple broker processes from connecting to the same database
- Different databases can have different queue counts
## Installation
### From Source
```bash
git clone git.warky.dev/wdevs/pgsql-broker
cd pgsql-broker
make build
```
The binary will be available in `bin/pgsql-broker`.
### As a Library
```bash
go get git.warky.dev/wdevs/pgsql-broker
```
## Quick Start
### 1. Setup Database
Install the required tables and stored procedures:
```bash
# Using the CLI (recommended)
./bin/pgsql-broker install --config broker.yaml
# Or with make
make sql-install
# Verify installation
./bin/pgsql-broker install --verify-only --config broker.yaml
# Or manually with psql:
psql -f pkg/broker/install/sql/tables/00_install.sql
psql -f pkg/broker/install/sql/procedures/00_install.sql
```
### 2. Configure
Create a configuration file `broker.yaml`:
```yaml
databases:
- name: db1
host: localhost
port: 5432
database: broker_db1
user: postgres
password: your_password
sslmode: disable
queue_count: 4
# Optional: add more databases
- name: db2
host: localhost
port: 5432
database: broker_db2
user: postgres
password: your_password
sslmode: disable
queue_count: 2
broker:
name: pgsql-broker
enable_debug: false
logging:
level: info
format: json
```
**Note**: Each database requires a unique `name` identifier and can have its own `queue_count` configuration.
### 3. Run the Broker
```bash
# Using the binary
./bin/pgsql-broker start --config broker.yaml
# Or with make
make run
# Or with custom log level
./bin/pgsql-broker start --log-level debug
```
### 4. Add a Job
```sql
SELECT broker_add_job(
'My Job', -- job_name
'SELECT do_something()', -- execute_str
1, -- job_queue (default: 1)
0, -- job_priority (default: 0)
'sql', -- job_language (default: 'sql')
NULL, -- run_as
NULL, -- user_login
NULL -- schedule_id
);
```
## Usage as a Library
```go
package main
import (
"git.warky.dev/wdevs/pgsql-broker/pkg/broker"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter"
"git.warky.dev/wdevs/pgsql-broker/pkg/broker/config"
)
func main() {
// Load config
cfg, _ := config.LoadConfig("broker.yaml")
// Create logger
logger := adapter.NewSlogLogger(slog.LevelInfo)
// Create database adapter
dbAdapter := adapter.NewPostgresAdapter(cfg.Database.ToPostgresConfig(), logger)
// Create and start broker
instance, _ := broker.New(cfg, dbAdapter, logger, "1.0.0")
instance.Start()
// ... wait for shutdown signal ...
instance.Stop()
}
```
See the [examples](./examples/) directory for complete examples.
## Database Schema
### Tables
- **broker_queueinstance**: Tracks active broker queue instances (one per database)
- **broker_jobs**: Job queue with status tracking
- **broker_schedule**: Scheduled jobs (cron-like functionality)
### Stored Procedures
- **broker_get**: Fetch the next job from a queue
- **broker_run**: Execute a job
- **broker_set**: Set runtime options (user, application_name, etc.)
- **broker_add_job**: Add a new job to the queue
- **broker_register_instance**: Register a broker instance
- **broker_ping_instance**: Update instance heartbeat
- **broker_shutdown_instance**: Mark instance as shutdown
## Configuration Reference
See [broker.example.yaml](./broker.example.yaml) for a complete configuration example.
### Database Settings
The `databases` array can contain multiple database configurations. Each entry supports:
| Setting | Description | Default |
|---------|-------------|---------|
| `name` | Unique identifier for this database | **Required** |
| `host` | PostgreSQL host | `localhost` |
| `port` | PostgreSQL port | `5432` |
| `database` | Database name | **Required** |
| `user` | Database user | **Required** |
| `password` | Database password | - |
| `sslmode` | SSL mode | `disable` |
| `max_open_conns` | Max open connections | `25` |
| `max_idle_conns` | Max idle connections | `5` |
| `conn_max_lifetime` | Connection max lifetime | `5m` |
| `conn_max_idle_time` | Connection max idle time | `10m` |
| `queue_count` | Number of queues for this database | `4` |
### Broker Settings
Global settings applied to all database instances:
| Setting | Description | Default |
|---------|-------------|---------|
| `name` | Broker instance name | `pgsql-broker` |
| `fetch_query_que_size` | Jobs per fetch cycle | `100` |
| `queue_timer_sec` | Seconds between polls | `10` |
| `queue_buffer_size` | Job buffer size | `50` |
| `worker_idle_timeout_sec` | Worker idle timeout | `10` |
| `notify_retry_seconds` | NOTIFY retry interval | `30s` |
| `enable_debug` | Enable debug logging | `false` |
## Development
### Building
```bash
make build # Build the binary
make clean # Clean build artifacts
make deps # Download dependencies
make fmt # Format code
make vet # Run go vet
make test # Run tests
```
### Project Structure
```
pgsql-broker/
├── cmd/broker/ # CLI application
├── pkg/broker/ # Core broker package
│ ├── adapter/ # Database & logger interfaces
│ ├── config/ # Configuration management
│ ├── models/ # Data models
│ ├── queue/ # Queue management
│ ├── worker/ # Worker implementation
│ └── install/ # Database installer with embedded SQL
│ └── sql/ # SQL schema (embedded in binary)
│ ├── tables/ # Table definitions
│ └── procedures/ # Stored procedures
├── examples/ # Usage examples
└── Makefile # Build automation
```
## Contributing
Contributions are welcome! Please ensure:
- Code is formatted with `go fmt`
- Tests pass with `go test ./...`
- Documentation is updated
## License
See [LICENSE](./LICENSE) file for details.