mirror of
https://github.com/Warky-Devs/vecna.git
synced 2026-05-05 01:26:58 +00:00
204 lines
4.6 KiB
Go
204 lines
4.6 KiB
Go
package embedclient
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/Warky-Devs/vecna.git/pkg/metrics"
)
|
|
|
|
// RouterConfig holds tuning parameters for a TargetRouter.
type RouterConfig struct {
	// TargetName is the target label passed to every metrics call.
	TargetName string

	// TimeoutSecs is the timeout, in seconds, applied to each individual
	// endpoint attempt made by Embed.
	TimeoutSecs int

	// CooldownSecs is how long, in seconds, slot selection avoids an endpoint
	// after its most recent failure (unless every candidate is cooling down).
	CooldownSecs int

	// PriorityDecay is subtracted from an endpoint's priority on each failure.
	PriorityDecay int

	// PriorityRecovery is the number of successes needed to win back one
	// point of priority; a value <= 0 disables recovery.
	PriorityRecovery int
}
|
|
|
|
type endpointSlot struct {
|
|
client Client
|
|
url string
|
|
initialPriority int
|
|
|
|
mu sync.Mutex
|
|
priority int
|
|
inflight int
|
|
successCount int
|
|
lastFail time.Time
|
|
}
|
|
|
|
// TargetRouter implements Client by routing requests across multiple endpoint slots
|
|
// using a busyness-based priority algorithm.
|
|
type TargetRouter struct {
|
|
slots []*endpointSlot
|
|
cfg RouterConfig
|
|
metrics *metrics.Registry
|
|
}
|
|
|
|
// NewTargetRouter constructs a TargetRouter from a slice of (client, url, initialPriority) tuples.
|
|
func NewTargetRouter(slots []RouterSlot, cfg RouterConfig, reg *metrics.Registry) (*TargetRouter, error) {
|
|
if len(slots) == 0 {
|
|
return nil, fmt.Errorf("NewTargetRouter: at least one slot required")
|
|
}
|
|
es := make([]*endpointSlot, len(slots))
|
|
for i, s := range slots {
|
|
es[i] = &endpointSlot{
|
|
client: s.Client,
|
|
url: s.URL,
|
|
initialPriority: s.Priority,
|
|
priority: s.Priority,
|
|
}
|
|
if reg != nil {
|
|
reg.SetEndpointPriority(cfg.TargetName, s.URL, float64(s.Priority))
|
|
reg.SetEndpointInflight(cfg.TargetName, s.URL, 0)
|
|
}
|
|
}
|
|
return &TargetRouter{slots: es, cfg: cfg, metrics: reg}, nil
|
|
}
|
|
|
|
// RouterSlot is a single endpoint entry for NewTargetRouter.
|
|
type RouterSlot struct {
|
|
Client Client
|
|
URL string
|
|
Priority int
|
|
}
|
|
|
|
func (r *TargetRouter) Embed(ctx context.Context, req Request) (Response, error) {
|
|
tried := make(map[*endpointSlot]bool, len(r.slots))
|
|
var lastErr error
|
|
|
|
for range r.slots {
|
|
if ctx.Err() != nil {
|
|
return Response{}, ctx.Err()
|
|
}
|
|
|
|
slot := r.pickExcluding(tried)
|
|
if slot == nil {
|
|
break
|
|
}
|
|
tried[slot] = true
|
|
|
|
slot.mu.Lock()
|
|
slot.inflight++
|
|
if r.metrics != nil {
|
|
r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight))
|
|
}
|
|
slot.mu.Unlock()
|
|
|
|
timeout := time.Duration(r.cfg.TimeoutSecs) * time.Second
|
|
reqCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
resp, err := slot.client.Embed(reqCtx, req)
|
|
cancel()
|
|
|
|
slot.mu.Lock()
|
|
slot.inflight--
|
|
if r.metrics != nil {
|
|
r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight))
|
|
}
|
|
slot.mu.Unlock()
|
|
|
|
if err != nil {
|
|
r.onFailure(slot, err)
|
|
lastErr = fmt.Errorf("router embed [%s]: %w", slot.url, err)
|
|
continue
|
|
}
|
|
|
|
r.onSuccess(slot)
|
|
return resp, nil
|
|
}
|
|
|
|
if lastErr != nil {
|
|
return Response{}, lastErr
|
|
}
|
|
return Response{}, fmt.Errorf("router embed: no endpoints available")
|
|
}
|
|
|
|
// pickExcluding selects the best available slot not in the excluded set.
|
|
// On the first call (empty excluded map) it behaves like the original pick().
|
|
// During retry loops, already-tried slots are excluded so each attempt uses a fresh endpoint.
|
|
func (r *TargetRouter) pickExcluding(excluded map[*endpointSlot]bool) *endpointSlot {
|
|
cooldown := time.Duration(r.cfg.CooldownSecs) * time.Second
|
|
now := time.Now()
|
|
|
|
var best *endpointSlot
|
|
bestScore := -1 << 30
|
|
|
|
for _, s := range r.slots {
|
|
if excluded[s] {
|
|
continue
|
|
}
|
|
s.mu.Lock()
|
|
inCooldown := !s.lastFail.IsZero() && now.Sub(s.lastFail) < cooldown
|
|
score := s.priority - s.inflight
|
|
s.mu.Unlock()
|
|
|
|
if inCooldown {
|
|
continue
|
|
}
|
|
if best == nil || score > bestScore {
|
|
best = s
|
|
bestScore = score
|
|
}
|
|
}
|
|
|
|
// All non-excluded slots are in cooldown — fall back to the one with the oldest failure.
|
|
if best == nil {
|
|
var oldest time.Time
|
|
for _, s := range r.slots {
|
|
if excluded[s] {
|
|
continue
|
|
}
|
|
s.mu.Lock()
|
|
lf := s.lastFail
|
|
s.mu.Unlock()
|
|
if best == nil || lf.Before(oldest) {
|
|
best = s
|
|
oldest = lf
|
|
}
|
|
}
|
|
}
|
|
|
|
return best
|
|
}
|
|
|
|
func (r *TargetRouter) onSuccess(s *endpointSlot) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.successCount++
|
|
if r.cfg.PriorityRecovery > 0 && s.successCount%r.cfg.PriorityRecovery == 0 {
|
|
if s.priority < s.initialPriority {
|
|
s.priority++
|
|
}
|
|
}
|
|
if r.metrics != nil {
|
|
r.metrics.SetEndpointPriority(r.cfg.TargetName, s.url, float64(s.priority))
|
|
}
|
|
}
|
|
|
|
func (r *TargetRouter) onFailure(s *endpointSlot, err error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.lastFail = time.Now()
|
|
s.priority -= r.cfg.PriorityDecay
|
|
if s.priority < 1 {
|
|
s.priority = 1
|
|
}
|
|
|
|
errType := "error"
|
|
if ctx := context.Background(); ctx.Err() != nil {
|
|
errType = "timeout"
|
|
}
|
|
|
|
if r.metrics != nil {
|
|
r.metrics.SetEndpointPriority(r.cfg.TargetName, s.url, float64(s.priority))
|
|
r.metrics.IncEndpointErrors(r.cfg.TargetName, s.url, errType)
|
|
}
|
|
|
|
_ = err
|
|
}
|