From 058aaa73e4bd18c7ed4d45d07271cf9e1cb9f6dc Mon Sep 17 00:00:00 2001 From: Hein Date: Sat, 11 Apr 2026 22:32:01 +0200 Subject: [PATCH] fix(router): improve endpoint selection logic with retries --- pkg/embedclient/router.go | 69 ++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 23 deletions(-) diff --git a/pkg/embedclient/router.go b/pkg/embedclient/router.go index 73c0293..94131a9 100644 --- a/pkg/embedclient/router.go +++ b/pkg/embedclient/router.go @@ -67,40 +67,59 @@ type RouterSlot struct { } func (r *TargetRouter) Embed(ctx context.Context, req Request) (Response, error) { - slot := r.pick() + tried := make(map[*endpointSlot]bool, len(r.slots)) + var lastErr error - slot.mu.Lock() - slot.inflight++ - if r.metrics != nil { - r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight)) - } - slot.mu.Unlock() + for range r.slots { + if ctx.Err() != nil { + return Response{}, ctx.Err() + } + + slot := r.pickExcluding(tried) + if slot == nil { + break + } + tried[slot] = true + + slot.mu.Lock() + slot.inflight++ + if r.metrics != nil { + r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight)) + } + slot.mu.Unlock() + + timeout := time.Duration(r.cfg.TimeoutSecs) * time.Second + reqCtx, cancel := context.WithTimeout(ctx, timeout) + resp, err := slot.client.Embed(reqCtx, req) + cancel() - defer func() { slot.mu.Lock() slot.inflight-- if r.metrics != nil { r.metrics.SetEndpointInflight(r.cfg.TargetName, slot.url, float64(slot.inflight)) } slot.mu.Unlock() - }() - timeout := time.Duration(r.cfg.TimeoutSecs) * time.Second - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() + if err != nil { + r.onFailure(slot, err) + lastErr = fmt.Errorf("router embed [%s]: %w", slot.url, err) + continue + } - resp, err := slot.client.Embed(ctx, req) - if err != nil { - r.onFailure(slot, err) - return Response{}, fmt.Errorf("router embed [%s]: %w", slot.url, err) + r.onSuccess(slot) + return resp, nil } - r.onSuccess(slot) - return resp, nil + if lastErr != nil { + return Response{}, lastErr + } + return Response{}, fmt.Errorf("router embed: no endpoints available") } -// pick selects the best available slot. -func (r *TargetRouter) pick() *endpointSlot { +// pickExcluding selects the best available slot not in the excluded set. +// On the first call (empty excluded map) it behaves like the original pick(). +// During retry loops, already-tried slots are excluded so each attempt uses a fresh endpoint. +func (r *TargetRouter) pickExcluding(excluded map[*endpointSlot]bool) *endpointSlot { cooldown := time.Duration(r.cfg.CooldownSecs) * time.Second now := time.Now() @@ -108,10 +127,12 @@ func (r *TargetRouter) pick() *endpointSlot { bestScore := -1 << 30 for _, s := range r.slots { + if excluded[s] { + continue + } s.mu.Lock() inCooldown := !s.lastFail.IsZero() && now.Sub(s.lastFail) < cooldown score := s.priority - s.inflight - lastFail := s.lastFail s.mu.Unlock() if inCooldown { @@ -120,14 +141,16 @@ func (r *TargetRouter) pick() *endpointSlot { if best == nil || score > bestScore { best = s bestScore = score - _ = lastFail } } - // All in cooldown — fall back to oldest failure + // All non-excluded slots are in cooldown — fall back to the one with the oldest failure. if best == nil { var oldest time.Time for _, s := range r.slots { + if excluded[s] { + continue + } s.mu.Lock() lf := s.lastFail s.mu.Unlock()