package embedclient

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/Warky-Devs/vecna.git/pkg/metrics"
)

// RouterConfig holds tuning parameters for a TargetRouter.
type RouterConfig struct {
	// TargetName is the target label passed to every metrics call.
	TargetName string
	// TimeoutSecs is the per-request timeout applied by Embed, in seconds.
	TimeoutSecs int
	// CooldownSecs is how long, in seconds, a slot is passed over by pick
	// after its most recent failure.
	CooldownSecs int
	// PriorityDecay is subtracted from a slot's priority on each failure.
	PriorityDecay int
	// PriorityRecovery is the number of successes required to win back one
	// point of priority; values <= 0 disable recovery.
	PriorityRecovery int
}

// endpointSlot is the router's bookkeeping for one endpoint.
type endpointSlot struct {
	client          Client
	url             string
	initialPriority int // ceiling that priority may recover to

	mu           sync.Mutex // guards the fields below
	priority     int
	inflight     int
	successCount int
	lastFail     time.Time // zero until the first failure
}

// TargetRouter implements Client by routing requests across multiple endpoint slots
// using a busyness-based priority algorithm.
type TargetRouter struct {
	slots   []*endpointSlot
	cfg     RouterConfig
	metrics *metrics.Registry
}

// NewTargetRouter builds a TargetRouter with one endpoint slot per RouterSlot
// entry, each starting at its configured priority with nothing in flight.
//
// It returns an error when slots is empty. reg may be nil; when it is not,
// the initial priority and inflight gauges are published for every endpoint.
func NewTargetRouter(slots []RouterSlot, cfg RouterConfig, reg *metrics.Registry) (*TargetRouter, error) {
	if len(slots) == 0 {
		return nil, fmt.Errorf("NewTargetRouter: at least one slot required")
	}

	endpoints := make([]*endpointSlot, 0, len(slots))
	for idx := range slots {
		entry := slots[idx]
		endpoints = append(endpoints, &endpointSlot{
			client:          entry.Client,
			url:             entry.URL,
			initialPriority: entry.Priority,
			priority:        entry.Priority,
		})
		if reg == nil {
			continue
		}
		reg.SetEndpointPriority(cfg.TargetName, entry.URL, float64(entry.Priority))
		reg.SetEndpointInflight(cfg.TargetName, entry.URL, 0)
	}

	router := &TargetRouter{slots: endpoints, cfg: cfg, metrics: reg}
	return router, nil
}

// RouterSlot is a single endpoint entry for NewTargetRouter.
type RouterSlot struct { Client Client URL string Priority int } func (r *TargetRouter) Embed(ctx context.Context, req Request) (Response, error) { slot := r.pick() slot.mu.Lock() slot.inflight++ if r.metrics != nil { r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight)) } slot.mu.Unlock() defer func() { slot.mu.Lock() slot.inflight-- if r.metrics != nil { r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight)) } slot.mu.Unlock() }() timeout := time.Duration(r.cfg.TimeoutSecs) * time.Second ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() resp, err := slot.client.Embed(ctx, req) if err != nil { r.onFailure(slot, err) return Response{}, fmt.Errorf("router embed [%s]: %w", slot.url, err) } r.onSuccess(slot) return resp, nil } // pick selects the best available slot. func (r *TargetRouter) pick() *endpointSlot { cooldown := time.Duration(r.cfg.CooldownSecs) * time.Second now := time.Now() var best *endpointSlot bestScore := -1 << 30 for _, s := range r.slots { s.mu.Lock() inCooldown := !s.lastFail.IsZero() && now.Sub(s.lastFail) < cooldown score := s.priority - s.inflight lastFail := s.lastFail s.mu.Unlock() if inCooldown { continue } if best == nil || score > bestScore { best = s bestScore = score _ = lastFail } } // All in cooldown — fall back to oldest failure if best == nil { var oldest time.Time for _, s := range r.slots { s.mu.Lock() lf := s.lastFail s.mu.Unlock() if best == nil || lf.Before(oldest) { best = s oldest = lf } } } return best } func (r *TargetRouter) onSuccess(s *endpointSlot) { s.mu.Lock() defer s.mu.Unlock() s.successCount++ if r.cfg.PriorityRecovery > 0 && s.successCount%r.cfg.PriorityRecovery == 0 { if s.priority < s.initialPriority { s.priority++ } } if r.metrics != nil { r.metrics.SetEndpointPriority(r.cfg.TargetName, s.url, float64(s.priority)) } } func (r *TargetRouter) onFailure(s *endpointSlot, err error) { s.mu.Lock() defer s.mu.Unlock() s.lastFail = time.Now() 
s.priority -= r.cfg.PriorityDecay if s.priority < 1 { s.priority = 1 } errType := "error" if ctx := context.Background(); ctx.Err() != nil { errType = "timeout" } if r.metrics != nil { r.metrics.SetEndpointPriority(r.cfg.TargetName, s.url, float64(s.priority)) r.metrics.IncEndpointErrors(r.cfg.TargetName, s.url, errType) } _ = err }