Files
vecna/pkg/embedclient/router.go
Hein 4009a54e39 feat: 🎉 Vectors na Vectors, the beginning
Translate 1536 <-> 768, 3072 <-> 2048
2026-04-11 18:05:05 +02:00

181 lines
4.0 KiB
Go

package embedclient
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/Warky-Devs/vecna.git/pkg/metrics"
)
// RouterConfig holds tuning parameters for a TargetRouter.
type RouterConfig struct {
	// TargetName is passed to the metrics registry, together with each
	// endpoint URL, on every metric update the router makes.
	TargetName string

	// TimeoutSecs is the per-call deadline, in seconds, applied to each
	// endpoint request.
	TimeoutSecs int

	// CooldownSecs is how long, in seconds, an endpoint is skipped by the
	// selector after its most recent failure.
	CooldownSecs int

	// PriorityDecay is subtracted from an endpoint's priority on every
	// failure; the priority is never allowed to drop below 1.
	PriorityDecay int

	// PriorityRecovery makes every PriorityRecovery-th success (counted over
	// the endpoint's lifetime) raise its priority by one, capped at the
	// endpoint's initial priority. Values <= 0 disable recovery.
	PriorityRecovery int
}
// endpointSlot is the router's per-endpoint bookkeeping.
type endpointSlot struct {
	// Populated by NewTargetRouter; the router's methods never modify these.
	client          Client // backend that receives the Embed calls
	url             string // endpoint identifier used in metrics and error messages
	initialPriority int    // starting priority and the ceiling recovery climbs back to

	// mu guards every field declared below it.
	mu           sync.Mutex
	priority     int       // current routing priority (decays on failure, recovers on success)
	inflight     int       // number of calls currently running against this endpoint
	successCount int       // successful calls over the slot's lifetime
	lastFail     time.Time // moment of the most recent failure; zero value means "never failed"
}
// TargetRouter implements Client by routing requests across multiple endpoint slots
// using a busyness-based priority algorithm: each call goes to the endpoint
// that is not cooling down after a failure and has the highest priority minus
// its current in-flight count.
type TargetRouter struct {
	// slots is built once by NewTargetRouter; per-slot mutable state is
	// protected by each slot's own mutex.
	slots []*endpointSlot
	// cfg is a by-value copy of the tuning parameters.
	cfg RouterConfig
	// metrics receives priority, in-flight and error updates; nil disables reporting.
	metrics *metrics.Registry
}
// NewTargetRouter constructs a TargetRouter over the given endpoint slots.
//
// Each slot starts at its configured Priority, which is also the ceiling that
// successful calls recover towards. When reg is non-nil, every endpoint's
// priority and an in-flight count of zero are published immediately.
//
// An error is returned when slots is empty or when any slot has a nil Client;
// in both cases nothing is published to reg.
func NewTargetRouter(slots []RouterSlot, cfg RouterConfig, reg *metrics.Registry) (*TargetRouter, error) {
	if len(slots) == 0 {
		return nil, fmt.Errorf("NewTargetRouter: at least one slot required")
	}
	// Validate everything before touching the registry so a rejected
	// configuration leaves no partial metric series behind. A nil client
	// would otherwise only surface as a panic on the first routed Embed.
	for i, s := range slots {
		if s.Client == nil {
			return nil, fmt.Errorf("NewTargetRouter: slot %d (%q) has a nil client", i, s.URL)
		}
	}

	es := make([]*endpointSlot, len(slots))
	for i, s := range slots {
		es[i] = &endpointSlot{
			client:          s.Client,
			url:             s.URL,
			initialPriority: s.Priority,
			priority:        s.Priority,
		}
		if reg != nil {
			reg.SetEndpointPriority(cfg.TargetName, s.URL, float64(s.Priority))
			reg.SetEndpointInflight(cfg.TargetName, s.URL, 0)
		}
	}
	return &TargetRouter{slots: es, cfg: cfg, metrics: reg}, nil
}
// RouterSlot is a single endpoint entry for NewTargetRouter.
type RouterSlot struct {
	// Client is the backend that receives the routed Embed calls.
	Client Client
	// URL identifies the endpoint in metrics and in wrapped error messages.
	URL string
	// Priority is the endpoint's starting priority; successful calls never
	// recover it above this value.
	Priority int
}
// Embed forwards req to the endpoint chosen by pick and returns its response.
//
// The chosen endpoint's in-flight counter is incremented for the duration of
// the call (and mirrored to the metrics registry when one is configured).
// Afterwards the endpoint's priority is adjusted via onSuccess or onFailure.
// When RouterConfig.TimeoutSecs is positive the call is bounded by that
// deadline; otherwise only ctx's own deadline and cancellation apply.
// Errors from the endpoint are wrapped with its URL and remain unwrappable
// with errors.Is / errors.As.
func (r *TargetRouter) Embed(ctx context.Context, req Request) (Response, error) {
	slot := r.pick()
	if slot == nil {
		// Only reachable on a zero-value TargetRouter: NewTargetRouter
		// rejects an empty slot list.
		return Response{}, fmt.Errorf("router embed: no endpoints configured")
	}

	slot.mu.Lock()
	slot.inflight++
	if r.metrics != nil {
		r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight))
	}
	slot.mu.Unlock()
	defer func() {
		slot.mu.Lock()
		slot.inflight--
		if r.metrics != nil {
			r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight))
		}
		slot.mu.Unlock()
	}()

	// A zero or negative TimeoutSecs would produce an already-expired context
	// and fail every request; treat it as "no router-level timeout" instead.
	if r.cfg.TimeoutSecs > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Duration(r.cfg.TimeoutSecs)*time.Second)
		defer cancel()
	}

	resp, err := slot.client.Embed(ctx, req)
	if err != nil {
		r.onFailure(slot, err)
		return Response{}, fmt.Errorf("router embed [%s]: %w", slot.url, err)
	}
	r.onSuccess(slot)
	return resp, nil
}
// pick selects the best available slot.
//
// Slots whose last failure is more recent than CooldownSecs are skipped.
// Among the remaining slots, the one with the highest score wins, where score
// is priority minus current in-flight calls. Ties go to the earliest slot in
// r.slots. If every slot is cooling down, the slot whose last failure is
// oldest is returned, so callers always get a slot. pick returns nil only
// when r.slots is empty.
func (r *TargetRouter) pick() *endpointSlot {
	cooldown := time.Duration(r.cfg.CooldownSecs) * time.Second
	now := time.Now()

	var best *endpointSlot
	var bestScore int // only meaningful once best != nil
	for _, s := range r.slots {
		s.mu.Lock()
		inCooldown := !s.lastFail.IsZero() && now.Sub(s.lastFail) < cooldown
		score := s.priority - s.inflight
		s.mu.Unlock()

		if inCooldown {
			continue
		}
		// Strict > keeps the first slot among equal scores.
		if best == nil || score > bestScore {
			best, bestScore = s, score
		}
	}
	if best != nil {
		return best
	}

	// All slots are in cooldown: fall back to the one that failed longest ago.
	var oldest time.Time
	for _, s := range r.slots {
		s.mu.Lock()
		lf := s.lastFail
		s.mu.Unlock()
		if best == nil || lf.Before(oldest) {
			best, oldest = s, lf
		}
	}
	return best
}
// onSuccess records a successful call against s.
//
// Every PriorityRecovery-th success (counted over the slot's lifetime) raises
// the slot's priority by one, but never above its initial priority. The
// resulting priority is then published to the metrics registry, when one is
// configured. Runs entirely under s.mu.
func (r *TargetRouter) onSuccess(s *endpointSlot) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.successCount++

	// The > 0 test short-circuits before the modulo, so a zero setting never
	// divides by zero and simply disables recovery.
	every := r.cfg.PriorityRecovery
	due := every > 0 && s.successCount%every == 0
	if due && s.priority < s.initialPriority {
		s.priority++
	}

	if r.metrics == nil {
		return
	}
	r.metrics.SetEndpointPriority(r.cfg.TargetName, s.url, float64(s.priority))
}
// onFailure records a failed call against s.
//
// It stamps the failure time, which starts the cooldown that pick honours.
// It lowers the priority by PriorityDecay, flooring it at 1. When a metrics
// registry is configured, it publishes the new priority and increments the
// endpoint's error counter. The counter is labelled "timeout" when err wraps
// context.DeadlineExceeded, and "error" otherwise.
func (r *TargetRouter) onFailure(s *endpointSlot, err error) {
	// Classify from the error chain itself. The previous check inspected
	// context.Background().Err(), which is always nil, so timeouts were
	// never reported as such.
	errType := "error"
	if errors.Is(err, context.DeadlineExceeded) {
		errType = "timeout"
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	s.lastFail = time.Now()
	s.priority -= r.cfg.PriorityDecay
	if s.priority < 1 {
		s.priority = 1
	}

	if r.metrics != nil {
		r.metrics.SetEndpointPriority(r.cfg.TargetName, s.url, float64(s.priority))
		r.metrics.IncEndpointErrors(r.cfg.TargetName, s.url, errType)
	}
}