mirror of
https://github.com/Warky-Devs/vecna.git
synced 2026-05-05 01:26:58 +00:00
fix(router): improve endpoint selection logic with retries
This commit is contained in:
@@ -67,40 +67,59 @@ type RouterSlot struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *TargetRouter) Embed(ctx context.Context, req Request) (Response, error) {
|
func (r *TargetRouter) Embed(ctx context.Context, req Request) (Response, error) {
|
||||||
slot := r.pick()
|
tried := make(map[*endpointSlot]bool, len(r.slots))
|
||||||
|
var lastErr error
|
||||||
|
|
||||||
slot.mu.Lock()
|
for range r.slots {
|
||||||
slot.inflight++
|
if ctx.Err() != nil {
|
||||||
if r.metrics != nil {
|
return Response{}, ctx.Err()
|
||||||
r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight))
|
}
|
||||||
}
|
|
||||||
slot.mu.Unlock()
|
slot := r.pickExcluding(tried)
|
||||||
|
if slot == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
tried[slot] = true
|
||||||
|
|
||||||
|
slot.mu.Lock()
|
||||||
|
slot.inflight++
|
||||||
|
if r.metrics != nil {
|
||||||
|
r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight))
|
||||||
|
}
|
||||||
|
slot.mu.Unlock()
|
||||||
|
|
||||||
|
timeout := time.Duration(r.cfg.TimeoutSecs) * time.Second
|
||||||
|
reqCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||||
|
resp, err := slot.client.Embed(reqCtx, req)
|
||||||
|
cancel()
|
||||||
|
|
||||||
defer func() {
|
|
||||||
slot.mu.Lock()
|
slot.mu.Lock()
|
||||||
slot.inflight--
|
slot.inflight--
|
||||||
if r.metrics != nil {
|
if r.metrics != nil {
|
||||||
r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight))
|
r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight))
|
||||||
}
|
}
|
||||||
slot.mu.Unlock()
|
slot.mu.Unlock()
|
||||||
}()
|
|
||||||
|
|
||||||
timeout := time.Duration(r.cfg.TimeoutSecs) * time.Second
|
if err != nil {
|
||||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
r.onFailure(slot, err)
|
||||||
defer cancel()
|
lastErr = fmt.Errorf("router embed [%s]: %w", slot.url, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
resp, err := slot.client.Embed(ctx, req)
|
r.onSuccess(slot)
|
||||||
if err != nil {
|
return resp, nil
|
||||||
r.onFailure(slot, err)
|
|
||||||
return Response{}, fmt.Errorf("router embed [%s]: %w", slot.url, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
r.onSuccess(slot)
|
if lastErr != nil {
|
||||||
return resp, nil
|
return Response{}, lastErr
|
||||||
|
}
|
||||||
|
return Response{}, fmt.Errorf("router embed: no endpoints available")
|
||||||
}
|
}
|
||||||
|
|
||||||
// pick selects the best available slot.
|
// pickExcluding selects the best available slot not in the excluded set.
|
||||||
func (r *TargetRouter) pick() *endpointSlot {
|
// On the first call (empty excluded map) it behaves like the original pick().
|
||||||
|
// During retry loops, already-tried slots are excluded so each attempt uses a fresh endpoint.
|
||||||
|
func (r *TargetRouter) pickExcluding(excluded map[*endpointSlot]bool) *endpointSlot {
|
||||||
cooldown := time.Duration(r.cfg.CooldownSecs) * time.Second
|
cooldown := time.Duration(r.cfg.CooldownSecs) * time.Second
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
@@ -108,10 +127,12 @@ func (r *TargetRouter) pick() *endpointSlot {
|
|||||||
bestScore := -1 << 30
|
bestScore := -1 << 30
|
||||||
|
|
||||||
for _, s := range r.slots {
|
for _, s := range r.slots {
|
||||||
|
if excluded[s] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
inCooldown := !s.lastFail.IsZero() && now.Sub(s.lastFail) < cooldown
|
inCooldown := !s.lastFail.IsZero() && now.Sub(s.lastFail) < cooldown
|
||||||
score := s.priority - s.inflight
|
score := s.priority - s.inflight
|
||||||
lastFail := s.lastFail
|
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
if inCooldown {
|
if inCooldown {
|
||||||
@@ -120,14 +141,16 @@ func (r *TargetRouter) pick() *endpointSlot {
|
|||||||
if best == nil || score > bestScore {
|
if best == nil || score > bestScore {
|
||||||
best = s
|
best = s
|
||||||
bestScore = score
|
bestScore = score
|
||||||
_ = lastFail
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// All in cooldown — fall back to oldest failure
|
// All non-excluded slots are in cooldown — fall back to the one with the oldest failure.
|
||||||
if best == nil {
|
if best == nil {
|
||||||
var oldest time.Time
|
var oldest time.Time
|
||||||
for _, s := range r.slots {
|
for _, s := range r.slots {
|
||||||
|
if excluded[s] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
lf := s.lastFail
|
lf := s.lastFail
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|||||||
Reference in New Issue
Block a user