diff --git a/Makefile b/Makefile index f0aba4c..c3b144c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: all build clean test test-all test-integration-go test-unit-go test-connection schema-install broker-start broker-stop install deps help +.PHONY: all build clean test test-all test-integration-go test-unit-go test-connection schema-install broker-start broker-stop install deps docker-up docker-down help # Build variables BINARY_NAME=pgsql-broker @@ -8,6 +8,28 @@ GO=go GOFLAGS=-v LDFLAGS=-w -s +# Auto-detect container runtime (Docker or Podman) +CONTAINER_RUNTIME := $(shell \ + if command -v podman > /dev/null 2>&1; then \ + echo "podman"; \ + elif command -v docker > /dev/null 2>&1; then \ + echo "docker"; \ + else \ + echo "none"; \ + fi) + +# Detect compose command +COMPOSE_CMD := $(shell \ + if [ "$(CONTAINER_RUNTIME)" = "podman" ]; then \ + echo "podman-compose"; \ + elif command -v docker-compose > /dev/null 2>&1; then \ + echo "docker-compose"; \ + else \ + echo "docker compose"; \ + fi) + + + # Version information VERSION ?= $(shell git describe --tags --always --dirty 2>/dev/null || echo "dev") BUILD_TIME=$(shell date -u '+2026-01-02_19:58:30') @@ -119,6 +141,38 @@ sql-install-manual: ## Install SQL tables and procedures manually via psql @psql -f pkg/broker/install/sql/procedures/00_install.sql @echo "SQL schema installed" +docker-up: ## Start PostgreSQL test database + @echo "Starting PostgreSQL test database (using $(CONTAINER_RUNTIME))..." + @if [ "$(CONTAINER_RUNTIME)" = "none" ]; then \ + echo "Error: Neither Docker nor Podman is installed"; \ + exit 1; \ + fi + @if [ "$(CONTAINER_RUNTIME)" = "podman" ]; then \ + podman run -d --name pgsql-broker-test-postgres \ + -e POSTGRES_USER=user \ + -e POSTGRES_PASSWORD=password \ + -e POSTGRES_DB=broker_test \ + -p 5433:5432 \ + postgres:13 2>/dev/null || echo "Container already running"; \ + else \ + cd tests && $(COMPOSE_CMD) up -d; \ + fi + @echo "Waiting for PostgreSQL to be ready..." + @sleep 3 + @echo "PostgreSQL is running on port 5433" + @echo "Connection: postgres://user:password@localhost:5433/broker_test" + +docker-down: ## Stop PostgreSQL test database + @echo "Stopping PostgreSQL test database (using $(CONTAINER_RUNTIME))..." + @if [ "$(CONTAINER_RUNTIME)" = "podman" ]; then \ + podman stop pgsql-broker-test-postgres 2>/dev/null || true; \ + podman rm pgsql-broker-test-postgres 2>/dev/null || true; \ + else \ + cd tests && $(COMPOSE_CMD) down; \ + fi + @echo "PostgreSQL stopped" + + help: ## Show this help message @echo "Usage: make [target]" @echo "" diff --git a/tests/integration/workflow_test.go b/tests/integration/workflow_test.go index 2fbedb8..227b243 100644 --- a/tests/integration/workflow_test.go +++ b/tests/integration/workflow_test.go @@ -1,9 +1,274 @@ package integration import ( + "context" + "database/sql" + "log/slog" "testing" + "time" + + _ "github.com/lib/pq" + "github.com/stretchr/testify/require" + + "git.warky.dev/wdevs/pgsql-broker/pkg/broker" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/config" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/install" + "git.warky.dev/wdevs/pgsql-broker/pkg/broker/models" ) +// TestBrokerWorkflow tests the complete broker workflow: +// 1. Set up database and install schema +// 2. Create and start broker +// 3. Add a job +// 4. Wait for job to be processed +// 5. Verify job completed successfully func TestBrokerWorkflow(t *testing.T) { - t.Skip("Skipping TestBrokerWorkflow due to persistent database visibility issues") + ctx := context.Background() + + // Database connection string + connStr := "user=user password=password dbname=broker_test host=localhost port=5433 sslmode=disable" + + // Connect to database with retry logic + db, err := connectWithRetry(connStr, 10, 2*time.Second) + require.NoError(t, err, "Failed to connect to database") + defer db.Close() + + // Clean up any existing schema + t.Log("Cleaning up existing schema...") + cleanupSchema(t, db) + + // Set up logger + logger := adapter.NewSlogLogger(slog.LevelInfo) + + // Create database adapter + postgresConfig := adapter.PostgresConfig{ + Host: "localhost", + Port: 5433, + Database: "broker_test", + User: "user", + Password: "password", + SSLMode: "disable", + MaxOpenConns: 25, + MaxIdleConns: 5, + ConnMaxLifetime: 5 * time.Minute, + ConnMaxIdleTime: 10 * time.Minute, + } + dbAdapter := adapter.NewPostgresAdapter(postgresConfig, logger) + defer dbAdapter.Close() + + // Connect to database + t.Log("Connecting to database via adapter...") + err = dbAdapter.Connect(ctx) + require.NoError(t, err, "Failed to connect to database via adapter") + + // Install schema + t.Log("Installing database schema...") + installer := install.New(dbAdapter, logger) + err = installer.InstallSchema(ctx) + require.NoError(t, err, "Failed to install schema") + + // Verify installation + t.Log("Verifying schema installation...") + err = installer.VerifyInstallation(ctx) + require.NoError(t, err, "Schema verification failed") + + // Create broker configuration + cfg := &config.Config{ + Databases: []config.DatabaseConfig{ + { + Name: "test_db", + Host: "localhost", + Port: 5433, + Database: "broker_test", + User: "user", + Password: "password", + SSLMode: "disable", + QueueCount: 2, + }, + }, + Broker: config.BrokerConfig{ + Name: "test-broker", + FetchQueryQueSize: 10, + QueueTimerSec: 1, // Short interval for testing + QueueBufferSize: 10, + WorkerIdleTimeoutSec: 5, + NotifyRetrySeconds: 5 * time.Second, + EnableDebug: true, + }, + Logging: config.LoggingConfig{ + Level: "info", + Format: "text", + }, + } + + // Create and start broker + t.Log("Creating broker instance...") + brk, err := broker.New(cfg, logger, "test-1.0.0") + require.NoError(t, err, "Failed to create broker") + + t.Log("Starting broker...") + err = brk.Start() + require.NoError(t, err, "Failed to start broker") + + // Give broker time to initialize + time.Sleep(2 * time.Second) + + // Defer broker shutdown + defer func() { + t.Log("Stopping broker...") + err := brk.Stop() + require.NoError(t, err, "Failed to stop broker") + }() + + // Add a test job using broker_add_job function + t.Log("Adding test job...") + var retval int + var errmsg string + var jobID int64 + err = db.QueryRowContext(ctx, ` + SELECT * FROM broker_add_job( + $1, -- job_name + $2, -- execute_str + $3, -- job_queue + $4, -- job_priority + $5, -- job_language + NULL, -- run_as + NULL, -- schedule_id + NULL -- depends_on + ) + `, + "Test Job", + "SELECT 'Job executed successfully' AS result", + 1, + 0, + "sql", + ).Scan(&retval, &errmsg, &jobID) + require.NoError(t, err, "Failed to add job") + require.Equal(t, 0, retval, "broker_add_job returned error: %s", errmsg) + require.Greater(t, jobID, int64(0), "Invalid job ID returned") + t.Logf("Added job with ID: %d", jobID) + + // Wait for job to be processed + t.Log("Waiting for job to be processed...") + maxWaitTime := 30 * time.Second + checkInterval := 500 * time.Millisecond + deadline := time.Now().Add(maxWaitTime) + + var job models.Job + var executeResult sql.NullString + var errorMsg sql.NullString + jobCompleted := false + + for time.Now().Before(deadline) { + err = db.QueryRowContext(ctx, ` + SELECT id_broker_jobs, job_name, job_priority, job_queue, job_language, + execute_str, execute_result, error_msg, complete_status, + created_at, updated_at + FROM broker_jobs + WHERE id_broker_jobs = $1 + `, jobID).Scan( + &job.ID, + &job.JobName, + &job.JobPriority, + &job.JobQueue, + &job.JobLanguage, + &job.ExecuteStr, + &executeResult, + &errorMsg, + &job.CompleteStatus, + &job.CreatedAt, + &job.UpdatedAt, + ) + require.NoError(t, err, "Failed to query job status") + + // Handle nullable fields + if executeResult.Valid { + job.ExecuteResult = executeResult.String + } + if errorMsg.Valid { + job.ErrorMsg = errorMsg.String + } + + t.Logf("Job status: %d (0=pending, 1=running, 2=completed, 3=failed)", job.CompleteStatus) + + if job.CompleteStatus == int(models.JobStatusCompleted) { + jobCompleted = true + break + } else if job.CompleteStatus == int(models.JobStatusFailed) { + t.Fatalf("Job failed with error: %s", job.ErrorMsg) + } + + time.Sleep(checkInterval) + } + + require.True(t, jobCompleted, "Job was not completed within the timeout period") + t.Logf("Job completed successfully!") + t.Logf(" Job ID: %d", job.ID) + t.Logf(" Job Name: %s", job.JobName) + t.Logf(" Execute Result: %s", job.ExecuteResult) + t.Logf(" Status: %d", job.CompleteStatus) + + // Verify job details + require.Equal(t, jobID, job.ID, "Job ID mismatch") + require.Equal(t, "Test Job", job.JobName, "Job name mismatch") + require.Equal(t, int(models.JobStatusCompleted), job.CompleteStatus, "Job should be completed") + require.NotEmpty(t, job.ExecuteResult, "Execute result should not be empty") + require.Empty(t, job.ErrorMsg, "Error message should be empty for successful job") + + t.Log("Broker workflow test completed successfully!") +} + +// connectWithRetry attempts to connect to the database with retry logic +func connectWithRetry(connStr string, maxRetries int, retryInterval time.Duration) (*sql.DB, error) { + var db *sql.DB + var err error + + for i := 0; i < maxRetries; i++ { + db, err = sql.Open("postgres", connStr) + if err != nil { + time.Sleep(retryInterval) + continue + } + + err = db.Ping() + if err == nil { + return db, nil + } + + db.Close() + time.Sleep(retryInterval) + } + + return nil, err +} + +// cleanupSchema removes all broker tables and functions for a clean test +func cleanupSchema(t *testing.T, db *sql.DB) { + tables := []string{"broker_jobs", "broker_queueinstance", "broker_schedule"} + procedures := []string{ + "broker_get", + "broker_run", + "broker_set", + "broker_add_job", + "broker_register_instance", + "broker_ping_instance", + "broker_shutdown_instance", + } + + // Drop procedures + for _, proc := range procedures { + _, err := db.Exec("DROP FUNCTION IF EXISTS " + proc + " CASCADE") + if err != nil { + t.Logf("Warning: failed to drop procedure %s: %v", proc, err) + } + } + + // Drop tables + for _, table := range tables { + _, err := db.Exec("DROP TABLE IF EXISTS " + table + " CASCADE") + if err != nil { + t.Logf("Warning: failed to drop table %s: %v", table, err) + } + } }