Files
pgsql-broker/pkg/broker/install/sql/tables/03_broker_jobs.sql
Hein 3e64f7ae2a
Some checks failed
Integration Tests / integration-test (push) Failing after -23m59s
feat(testing): add full integration test suite
This commit introduces a comprehensive integration test suite for the pgsql-broker.

The test suite includes:
- A Docker/Podman environment for running a PostgreSQL database.
- Integration tests that cover the broker's lifecycle, including job creation, execution, and instance management.
- A GitHub Actions workflow to automate the execution of all tests on push and pull requests.
- A dedicated test configuration file and helper test files.

refactor(worker): fix job processing transaction
- The worker's job processing now uses a single transaction to fetch and run a job, resolving a race condition where jobs were not in the 'running' state when being executed.
- The broker's database instance registration is now more robust, handling cases where another instance is already active.

The Makefile has been significantly updated to orchestrate the entire test flow, including setting up the database, starting/stopping the broker, and running unit and integration tests separately.
2026-01-02 23:08:17 +02:00

63 lines
3.0 KiB
PL/PgSQL

-- broker_jobs: queue of jobs to be executed by the broker.
-- One row per job. complete_status carries the lifecycle state:
--   0=pending, 1=running, 2=completed, 3=failed, 4=cancelled.
CREATE TABLE IF NOT EXISTS broker_jobs (
    -- Surrogate key
    id_broker_jobs            BIGSERIAL     PRIMARY KEY,

    -- Job definition
    job_name                  VARCHAR(255)  NOT NULL,
    job_priority              INTEGER       NOT NULL DEFAULT 0,      -- higher = more important
    job_queue                 INTEGER       NOT NULL DEFAULT 1,      -- must be > 0 (see check below)
    job_language              VARCHAR(50)   NOT NULL DEFAULT 'sql',
    execute_str               TEXT          NOT NULL,                -- SQL or code to execute

    -- Outcome
    execute_result            TEXT,
    error_msg                 TEXT,
    complete_status           INTEGER       NOT NULL DEFAULT 0,      -- 0 = pending

    -- Execution context and references
    run_as                    VARCHAR(100),
    rid_broker_schedule       BIGINT,                                -- schedule that produced the job, if any
    rid_broker_queueinstance  BIGINT,                                -- instance that processed the job
    depends_on                TEXT[],                                -- job names that must complete first

    -- Timestamps (all timezone-aware)
    created_at                TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    updated_at                TIMESTAMPTZ   NOT NULL DEFAULT NOW(),
    started_at                TIMESTAMPTZ,
    completed_at              TIMESTAMPTZ,

    -- Value checks
    CONSTRAINT broker_jobs_complete_status_check
        CHECK (complete_status IN (0, 1, 2, 3, 4)),
    CONSTRAINT broker_jobs_job_queue_check
        CHECK (job_queue > 0),

    -- Foreign keys: deleting the referenced schedule or instance keeps the
    -- job row and clears the reference instead of deleting the job.
    CONSTRAINT fk_schedule
        FOREIGN KEY (rid_broker_schedule)
        REFERENCES broker_schedule(id_broker_schedule)
        ON DELETE SET NULL,
    CONSTRAINT fk_instance
        FOREIGN KEY (rid_broker_queueinstance)
        REFERENCES broker_queueinstance(id_broker_queueinstance)
        ON DELETE SET NULL
);
-- Secondary indexes on broker_jobs.
-- Every statement uses IF NOT EXISTS so the script can be re-run safely.

-- Lookup by lifecycle state alone.
CREATE INDEX IF NOT EXISTS idx_broker_jobs_status
    ON broker_jobs (complete_status);

-- Composite: queue number, then state, then priority.
CREATE INDEX IF NOT EXISTS idx_broker_jobs_queue
    ON broker_jobs (job_queue, complete_status, job_priority);

-- Foreign-key columns.
CREATE INDEX IF NOT EXISTS idx_broker_jobs_schedule
    ON broker_jobs (rid_broker_schedule);

CREATE INDEX IF NOT EXISTS idx_broker_jobs_instance
    ON broker_jobs (rid_broker_queueinstance);

-- Creation time.
CREATE INDEX IF NOT EXISTS idx_broker_jobs_created
    ON broker_jobs (created_at);

-- Job name combined with state.
CREATE INDEX IF NOT EXISTS idx_broker_jobs_name
    ON broker_jobs (job_name, complete_status);
-- Catalog comments (visible via \d+ and pg_description).

-- Table
COMMENT ON TABLE broker_jobs
    IS 'Job queue for broker execution';

-- Job definition columns
COMMENT ON COLUMN broker_jobs.job_name
    IS 'Name/description of the job';
COMMENT ON COLUMN broker_jobs.job_priority
    IS 'Job priority (higher = more important)';
COMMENT ON COLUMN broker_jobs.job_queue
    IS 'Queue number (allows parallel processing)';
COMMENT ON COLUMN broker_jobs.job_language
    IS 'Execution language (sql, plpgsql, etc.)';
COMMENT ON COLUMN broker_jobs.execute_str
    IS 'SQL or code to execute';

-- State and execution context columns
COMMENT ON COLUMN broker_jobs.complete_status
    IS '0=pending, 1=running, 2=completed, 3=failed, 4=cancelled';
COMMENT ON COLUMN broker_jobs.run_as
    IS 'User context to run the job as';

-- Reference and dependency columns
COMMENT ON COLUMN broker_jobs.rid_broker_schedule
    IS 'Reference to schedule if job was scheduled';
COMMENT ON COLUMN broker_jobs.rid_broker_queueinstance
    IS 'Instance that processed this job';
COMMENT ON COLUMN broker_jobs.depends_on
    IS 'Array of job names that must be completed before this job can run';
-- Trigger function: stamps updated_at on every row update.
-- Overwrites whatever value the UPDATE statement supplied for updated_at,
-- using NOW() (the start time of the current transaction).
CREATE OR REPLACE FUNCTION tf_broker_jobs_update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- CREATE TRIGGER has no IF NOT EXISTS form, so a second run of this script
-- would fail with "trigger ... already exists". Drop any earlier definition
-- first so the script is re-runnable like the rest of the file.
DROP TRIGGER IF EXISTS t_broker_jobs_updated_at ON broker_jobs;

CREATE TRIGGER t_broker_jobs_updated_at
    BEFORE UPDATE ON broker_jobs
    FOR EACH ROW
    EXECUTE FUNCTION tf_broker_jobs_update_timestamp();