package broker import ( "context" "database/sql" // Import sql package "encoding/json" "fmt" "os" "sync" "time" "git.warky.dev/wdevs/pgsql-broker/pkg/broker/adapter" "git.warky.dev/wdevs/pgsql-broker/pkg/broker/config" "git.warky.dev/wdevs/pgsql-broker/pkg/broker/models" "git.warky.dev/wdevs/pgsql-broker/pkg/broker/queue" ) // DatabaseInstance represents a broker instance for a single database type DatabaseInstance struct { ID int64 Name string DatabaseName string Hostname string PID int Version string config *config.Config dbConfig *config.DatabaseConfig db adapter.DBAdapter logger adapter.Logger queues map[int]*queue.Queue queuesMu sync.RWMutex ctx context.Context shutdown bool shutdownMu sync.RWMutex jobsHandled int64 startTime time.Time } // NewDatabaseInstance creates a new database instance func NewDatabaseInstance(cfg *config.Config, dbCfg *config.DatabaseConfig, db adapter.DBAdapter, logger adapter.Logger, version string, parentCtx context.Context) (*DatabaseInstance, error) { hostname, err := os.Hostname() if err != nil { hostname = "unknown" } instance := &DatabaseInstance{ Name: fmt.Sprintf("%s-%s", cfg.Broker.Name, dbCfg.Name), DatabaseName: dbCfg.Name, Hostname: hostname, PID: os.Getpid(), Version: version, config: cfg, dbConfig: dbCfg, db: db, logger: logger.With("component", "database-instance").With("database", dbCfg.Name), queues: make(map[int]*queue.Queue), ctx: parentCtx, startTime: time.Now(), } return instance, nil } // Start begins the database instance func (i *DatabaseInstance) Start() error { i.logger.Info("starting database instance", "name", i.Name, "hostname", i.Hostname, "pid", i.PID) // Connect to database if err := i.db.Connect(i.ctx); err != nil { return fmt.Errorf("failed to connect to database: %w", err) } // Register instance in database if err := i.registerInstance(); err != nil { return fmt.Errorf("failed to register instance: %w", err) } i.logger.Info("database instance registered", "id", i.ID) // Start queues if err := i.startQueues(); err != nil { return fmt.Errorf("failed to start queues: %w", err) } // Start listening for notifications if err := i.startListener(); err != nil { return fmt.Errorf("failed to start listener: %w", err) } // Start ping routine go i.pingRoutine() i.logger.Info("database instance started successfully") return nil } // Stop gracefully stops the database instance func (i *DatabaseInstance) Stop() error { i.shutdownMu.Lock() if i.shutdown { i.shutdownMu.Unlock() return nil } i.shutdown = true i.shutdownMu.Unlock() i.logger.Info("stopping database instance") // Stop all queues i.queuesMu.Lock() for num, q := range i.queues { i.logger.Info("stopping queue", "number", num) if err := q.Stop(); err != nil { i.logger.Error("failed to stop queue", "number", num, "error", err) } } i.queuesMu.Unlock() // Update instance status in database if err := i.shutdownInstance(); err != nil { i.logger.Error("failed to shutdown instance in database", "error", err) } // Close database connection if err := i.db.Close(); err != nil { i.logger.Error("failed to close database", "error", err) } i.logger.Info("database instance stopped") return nil } // registerInstance registers the instance in the database func (i *DatabaseInstance) registerInstance() error { var retval int var errmsg string var nullableInstanceID sql.NullInt64 // Change to nullable type i.logger.Debug("registering instance", "name", i.Name, "hostname", i.Hostname, "pid", i.PID, "version", i.Version, "queue_count", i.dbConfig.QueueCount) err := i.db.QueryRow(i.ctx, "SELECT p_retval, p_errmsg, p_instance_id FROM broker_register_instance($1, $2, $3, $4, $5)", i.Name, i.Hostname, i.PID, i.Version, i.dbConfig.QueueCount, ).Scan(&retval, &errmsg, &nullableInstanceID) if err != nil { i.logger.Error("query error during instance registration", "error", err) return fmt.Errorf("query error: %w", err) } if retval == 3 { i.logger.Warn("another broker instance is already active, attempting to retrieve ID", "error", errmsg) // Try to retrieve the ID of the active instance var activeID int64 err := i.db.QueryRow(i.ctx, "SELECT id_broker_queueinstance FROM broker_queueinstance WHERE name = $1 AND hostname = $2 AND status = 'active' ORDER BY started_at DESC LIMIT 1", i.Name, i.Hostname, ).Scan(&activeID) if err != nil { i.logger.Error("failed to retrieve ID of active instance", "error", err) return fmt.Errorf("failed to retrieve ID of active instance: %w", err) } i.ID = activeID i.logger.Info("retrieved active instance ID", "id", i.ID) return nil } else if retval > 0 { i.logger.Error("broker_register_instance error", "retval", retval, "errmsg", errmsg) return fmt.Errorf("broker_register_instance error: %s", errmsg) } // If successfully registered, nullableInstanceID.Valid will be true if nullableInstanceID.Valid { i.ID = nullableInstanceID.Int64 i.logger.Info("registered new instance", "id", i.ID) // Debug logging: Retrieve all entries from broker_queueinstance rows, err := i.db.Query(i.ctx, "SELECT id_broker_queueinstance, name, hostname, status FROM broker_queueinstance") if err != nil { i.logger.Error("debug query failed", "error", err) } else { defer rows.Close() for rows.Next() { var id int64 var name, hostname, status string if err := rows.Scan(&id, &name, &hostname, &status); err != nil { i.logger.Error("debug scan failed", "error", err) break } i.logger.Debug("broker_queueinstance entry", "id", id, "name", name, "hostname", hostname, "status", status) } } } else { // This case should ideally not happen if retval is 0 (success) // but if it does, it means p_instance_id was NULL despite success. // This would be an unexpected scenario. i.logger.Error("broker_register_instance returned success but no instance ID", "retval", retval, "errmsg", errmsg) return fmt.Errorf("broker_register_instance returned success but no instance ID") } return nil } // startQueues initializes and starts all queues func (i *DatabaseInstance) startQueues() error { i.queuesMu.Lock() defer i.queuesMu.Unlock() for queueNum := 1; queueNum <= i.dbConfig.QueueCount; queueNum++ { queueCfg := queue.Config{ Number: queueNum, InstanceID: i.ID, WorkerCount: 1, // One worker per queue for now DBAdapter: i.db, Logger: i.logger, BufferSize: i.config.Broker.QueueBufferSize, TimerSeconds: i.config.Broker.QueueTimerSec, FetchSize: i.config.Broker.FetchQueryQueSize, } q := queue.New(queueCfg) if err := q.Start(queueCfg); err != nil { return fmt.Errorf("failed to start queue %d: %w", queueNum, err) } i.queues[queueNum] = q i.logger.Info("queue started", "number", queueNum) } return nil } // startListener starts listening for database notifications func (i *DatabaseInstance) startListener() error { handler := func(n *adapter.Notification) { i.handleNotification(n) } if err := i.db.Listen(i.ctx, "broker.event", handler); err != nil { return fmt.Errorf("failed to start listener: %w", err) } return nil } // handleNotification processes incoming job notifications func (i *DatabaseInstance) handleNotification(n *adapter.Notification) { if i.config.Broker.EnableDebug { i.logger.Debug("received notification", "channel", n.Channel, "payload", n.Payload) } var job models.Job if err := json.Unmarshal([]byte(n.Payload), &job); err != nil { i.logger.Error("failed to unmarshal notification", "error", err, "payload", n.Payload) return } if job.ID <= 0 { i.logger.Warn("notification missing job ID", "payload", n.Payload) return } if job.JobQueue <= 0 { i.logger.Warn("notification missing queue number", "job_id", job.ID) return } // Get the queue i.queuesMu.RLock() q, exists := i.queues[job.JobQueue] i.queuesMu.RUnlock() if !exists { i.logger.Warn("queue not found for job", "job_id", job.ID, "queue", job.JobQueue) return } // Add job to queue if err := q.AddJob(job); err != nil { i.logger.Error("failed to add job to queue", "job_id", job.ID, "queue", job.JobQueue, "error", err) } } // pingRoutine periodically updates the instance status in the database func (i *DatabaseInstance) pingRoutine() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: i.shutdownMu.RLock() if i.shutdown { i.shutdownMu.RUnlock() return } i.shutdownMu.RUnlock() if err := i.ping(); err != nil { i.logger.Error("ping failed", "error", err) } case <-i.ctx.Done(): return } } } // ping updates the instance ping timestamp func (i *DatabaseInstance) ping() error { var retval int var errmsg string err := i.db.QueryRow(i.ctx, "SELECT p_retval, p_errmsg FROM broker_ping_instance($1, $2)", i.ID, i.jobsHandled, ).Scan(&retval, &errmsg) if err != nil { return fmt.Errorf("query error: %w", err) } if retval > 0 { return fmt.Errorf("broker_ping_instance error: %s", errmsg) } return nil } // shutdownInstance marks the instance as shutdown in the database func (i *DatabaseInstance) shutdownInstance() error { var retval int var errmsg string err := i.db.QueryRow(i.ctx, "SELECT p_retval, p_errmsg FROM broker_shutdown_instance($1)", i.ID, ).Scan(&retval, &errmsg) if err != nil { return fmt.Errorf("query error: %w", err) } if retval > 0 { return fmt.Errorf("broker_shutdown_instance error: %s", errmsg) } return nil } // GetStats returns instance statistics func (i *DatabaseInstance) GetStats() map[string]interface{} { i.queuesMu.RLock() defer i.queuesMu.RUnlock() stats := map[string]interface{}{ "id": i.ID, "name": i.Name, "database_name": i.DatabaseName, "hostname": i.Hostname, "pid": i.PID, "version": i.Version, "uptime": time.Since(i.startTime).String(), "jobs_handled": i.jobsHandled, "queue_count": len(i.queues), } queueStats := make(map[int]interface{}) for num, q := range i.queues { queueStats[num] = q.GetStats() } stats["queues"] = queueStats return stats }