feat(metadata): implement streaming response handling and enhance error management for metadata extraction
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package compat
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
@@ -104,6 +105,15 @@ type chatCompletionsResponse struct {
|
||||
Error *providerError `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type chatCompletionsChunk struct {
|
||||
Choices []struct {
|
||||
Delta responseChatMessage `json:"delta"`
|
||||
Message responseChatMessage `json:"message"`
|
||||
Text string `json:"text,omitempty"`
|
||||
} `json:"choices"`
|
||||
Error *providerError `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
type responseChatMessage struct {
|
||||
Role string `json:"role"`
|
||||
Content json.RawMessage `json:"content"`
|
||||
@@ -120,6 +130,7 @@ const maxMetadataAttempts = 3
|
||||
const (
|
||||
emptyResponseCircuitThreshold = 3
|
||||
emptyResponseCircuitTTL = 5 * time.Minute
|
||||
permanentModelFailureTTL = 24 * time.Hour
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -201,6 +212,9 @@ func (c *Client) ExtractMetadata(ctx context.Context, input string) (thoughttype
|
||||
if errors.Is(err, errMetadataEmptyResponse) {
|
||||
c.noteEmptyResponse(c.metadataModel)
|
||||
}
|
||||
if isPermanentModelError(err) {
|
||||
c.notePermanentModelFailure(c.metadataModel, err)
|
||||
}
|
||||
if err == nil {
|
||||
c.noteModelSuccess(c.metadataModel)
|
||||
return result, nil
|
||||
@@ -228,6 +242,9 @@ func (c *Client) ExtractMetadata(ctx context.Context, input string) (thoughttype
|
||||
if errors.Is(fallbackErr, errMetadataEmptyResponse) {
|
||||
c.noteEmptyResponse(fallbackModel)
|
||||
}
|
||||
if isPermanentModelError(fallbackErr) {
|
||||
c.notePermanentModelFailure(fallbackModel, fallbackErr)
|
||||
}
|
||||
if fallbackErr == nil {
|
||||
c.noteModelSuccess(fallbackModel)
|
||||
return fallbackResult, nil
|
||||
@@ -250,7 +267,7 @@ func (c *Client) extractMetadataWithModel(ctx context.Context, input, model stri
|
||||
return thoughttypes.ThoughtMetadata{}, fmt.Errorf("%s metadata: model %q temporarily bypassed after repeated empty responses", c.name, model)
|
||||
}
|
||||
|
||||
stream := false
|
||||
stream := true
|
||||
req := chatCompletionsRequest{
|
||||
Model: model,
|
||||
Temperature: c.temperature,
|
||||
@@ -264,6 +281,25 @@ func (c *Client) extractMetadataWithModel(ctx context.Context, input, model stri
|
||||
},
|
||||
}
|
||||
|
||||
metadata, err := c.extractMetadataWithRequest(ctx, req, input, model)
|
||||
if err == nil || !shouldRetryWithoutJSONMode(err) {
|
||||
return metadata, err
|
||||
}
|
||||
|
||||
if c.log != nil {
|
||||
c.log.Warn("metadata json mode failed, retrying without response_format",
|
||||
slog.String("provider", c.name),
|
||||
slog.String("model", model),
|
||||
slog.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
|
||||
req.ResponseFormat = nil
|
||||
return c.extractMetadataWithRequest(ctx, req, input, model)
|
||||
}
|
||||
|
||||
func (c *Client) extractMetadataWithRequest(ctx context.Context, req chatCompletionsRequest, input, model string) (thoughttypes.ThoughtMetadata, error) {
|
||||
|
||||
var lastErr error
|
||||
for attempt := 1; attempt <= maxMetadataAttempts; attempt++ {
|
||||
if c.logConversations && c.log != nil {
|
||||
@@ -276,8 +312,8 @@ func (c *Client) extractMetadataWithModel(ctx context.Context, input, model stri
|
||||
)
|
||||
}
|
||||
|
||||
var resp chatCompletionsResponse
|
||||
if err := c.doJSON(ctx, "/chat/completions", req, &resp); err != nil {
|
||||
resp, err := c.doChatCompletions(ctx, req)
|
||||
if err != nil {
|
||||
return thoughttypes.ThoughtMetadata{}, err
|
||||
}
|
||||
if resp.Error != nil {
|
||||
@@ -445,6 +481,199 @@ func (c *Client) doJSON(ctx context.Context, path string, requestBody any, dest
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func (c *Client) doChatCompletions(ctx context.Context, reqBody chatCompletionsRequest) (chatCompletionsResponse, error) {
|
||||
var resp chatCompletionsResponse
|
||||
|
||||
body, err := json.Marshal(reqBody)
|
||||
if err != nil {
|
||||
return resp, fmt.Errorf("%s request marshal: %w", c.name, err)
|
||||
}
|
||||
|
||||
const maxAttempts = 3
|
||||
|
||||
var lastErr error
|
||||
for attempt := 1; attempt <= maxAttempts; attempt++ {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimRight(c.baseURL, "/")+"/chat/completions", bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return resp, fmt.Errorf("%s build request: %w", c.name, err)
|
||||
}
|
||||
|
||||
req.Header.Set("Authorization", "Bearer "+c.apiKey)
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Accept", "application/json, text/event-stream")
|
||||
for key, value := range c.headers {
|
||||
if strings.TrimSpace(key) == "" || strings.TrimSpace(value) == "" {
|
||||
continue
|
||||
}
|
||||
req.Header.Set(key, value)
|
||||
}
|
||||
|
||||
httpResp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
lastErr = fmt.Errorf("%s request failed: %w", c.name, err)
|
||||
if attempt < maxAttempts && ctx.Err() == nil && isRetryableError(err) {
|
||||
if retryErr := sleepRetry(ctx, attempt, c.log, c.name); retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
continue
|
||||
}
|
||||
return resp, lastErr
|
||||
}
|
||||
|
||||
resp, err = c.decodeChatCompletionsResponse(httpResp)
|
||||
if err == nil {
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
lastErr = err
|
||||
if attempt < maxAttempts && (ctx.Err() == nil) && isRetryableChatResponseError(err) {
|
||||
if retryErr := sleepRetry(ctx, attempt, c.log, c.name); retryErr != nil {
|
||||
return resp, retryErr
|
||||
}
|
||||
continue
|
||||
}
|
||||
return resp, lastErr
|
||||
}
|
||||
|
||||
return resp, lastErr
|
||||
}
|
||||
|
||||
func (c *Client) decodeChatCompletionsResponse(resp *http.Response) (chatCompletionsResponse, error) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
contentType := strings.ToLower(resp.Header.Get("Content-Type"))
|
||||
if strings.Contains(contentType, "text/event-stream") {
|
||||
streamResp, err := decodeStreamingChatCompletionsResponse(resp.Body)
|
||||
if err != nil {
|
||||
return chatCompletionsResponse{}, fmt.Errorf("%s read stream response: %w", c.name, err)
|
||||
}
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
if streamResp.Error != nil {
|
||||
return chatCompletionsResponse{}, fmt.Errorf("%s request failed with status %d: %s", c.name, resp.StatusCode, streamResp.Error.Message)
|
||||
}
|
||||
return chatCompletionsResponse{}, fmt.Errorf("%s request failed with status %d", c.name, resp.StatusCode)
|
||||
}
|
||||
return streamResp, nil
|
||||
}
|
||||
|
||||
payload, readErr := io.ReadAll(resp.Body)
|
||||
if readErr != nil {
|
||||
return chatCompletionsResponse{}, fmt.Errorf("%s read response: %w", c.name, readErr)
|
||||
}
|
||||
if resp.StatusCode >= http.StatusBadRequest {
|
||||
return chatCompletionsResponse{}, fmt.Errorf("%s request failed with status %d: %s", c.name, resp.StatusCode, strings.TrimSpace(string(payload)))
|
||||
}
|
||||
var decoded chatCompletionsResponse
|
||||
if err := json.Unmarshal(payload, &decoded); err != nil {
|
||||
if c.log != nil {
|
||||
c.log.Debug("provider response body", slog.String("provider", c.name), slog.String("body", string(payload)))
|
||||
}
|
||||
return chatCompletionsResponse{}, fmt.Errorf("%s decode response: %w", c.name, err)
|
||||
}
|
||||
return decoded, nil
|
||||
}
|
||||
|
||||
func decodeStreamingChatCompletionsResponse(body io.Reader) (chatCompletionsResponse, error) {
|
||||
scanner := bufio.NewScanner(body)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
|
||||
|
||||
var eventLines []string
|
||||
var textBuilder strings.Builder
|
||||
var lastMessage responseChatMessage
|
||||
var streamErr *providerError
|
||||
|
||||
flushEvent := func() error {
|
||||
if len(eventLines) == 0 {
|
||||
return nil
|
||||
}
|
||||
payload := strings.Join(eventLines, "\n")
|
||||
eventLines = eventLines[:0]
|
||||
|
||||
payload = strings.TrimSpace(payload)
|
||||
if payload == "" {
|
||||
return nil
|
||||
}
|
||||
if payload == "[DONE]" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var chunk chatCompletionsChunk
|
||||
if err := json.Unmarshal([]byte(payload), &chunk); err != nil {
|
||||
return err
|
||||
}
|
||||
if chunk.Error != nil {
|
||||
streamErr = chunk.Error
|
||||
}
|
||||
for _, choice := range chunk.Choices {
|
||||
if text := extractChoiceText(choice.Delta, choice.Text); text != "" {
|
||||
textBuilder.WriteString(text)
|
||||
lastMessage = choice.Delta
|
||||
continue
|
||||
}
|
||||
if text := extractChoiceText(choice.Message, choice.Text); text != "" {
|
||||
textBuilder.WriteString(text)
|
||||
lastMessage = choice.Message
|
||||
continue
|
||||
}
|
||||
if len(choice.Message.Content) > 0 || choice.Message.ReasoningContent != "" {
|
||||
lastMessage = choice.Message
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if line == "" {
|
||||
if err := flushEvent(); err != nil {
|
||||
return chatCompletionsResponse{}, err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(line, ":") {
|
||||
continue
|
||||
}
|
||||
if strings.HasPrefix(line, "data:") {
|
||||
eventLines = append(eventLines, strings.TrimSpace(strings.TrimPrefix(line, "data:")))
|
||||
}
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return chatCompletionsResponse{}, err
|
||||
}
|
||||
if err := flushEvent(); err != nil {
|
||||
return chatCompletionsResponse{}, err
|
||||
}
|
||||
|
||||
content := textBuilder.String()
|
||||
if content != "" || len(lastMessage.Content) > 0 || lastMessage.ReasoningContent != "" {
|
||||
encoded, _ := json.Marshal(content)
|
||||
lastMessage.Content = json.RawMessage(encoded)
|
||||
return chatCompletionsResponse{
|
||||
Choices: []struct {
|
||||
Message responseChatMessage `json:"message"`
|
||||
Text string `json:"text,omitempty"`
|
||||
}{
|
||||
{Message: lastMessage, Text: content},
|
||||
},
|
||||
Error: streamErr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return chatCompletionsResponse{Error: streamErr}, nil
|
||||
}
|
||||
|
||||
func isRetryableChatResponseError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if isRetryableError(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
lower := strings.ToLower(err.Error())
|
||||
return strings.Contains(lower, "read response") || strings.Contains(lower, "read stream response")
|
||||
}
|
||||
|
||||
// extractJSONObject finds the first complete {...} block in s.
|
||||
// It handles models that prepend prose to a JSON response despite json_object mode.
|
||||
func extractJSONObject(s string) string {
|
||||
@@ -772,6 +1001,39 @@ func isRetryableError(err error) bool {
|
||||
return errors.As(err, &netErr)
|
||||
}
|
||||
|
||||
func shouldRetryWithoutJSONMode(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if errors.Is(err, errMetadataEmptyResponse) || errors.Is(err, errMetadataNoJSONObject) {
|
||||
return true
|
||||
}
|
||||
|
||||
lower := strings.ToLower(err.Error())
|
||||
return strings.Contains(lower, "parse json")
|
||||
}
|
||||
|
||||
func isPermanentModelError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
lower := strings.ToLower(err.Error())
|
||||
for _, marker := range []string{
|
||||
"invalid model name",
|
||||
"model_not_found",
|
||||
"model not found",
|
||||
"unknown model",
|
||||
"no such model",
|
||||
"does not exist",
|
||||
} {
|
||||
if strings.Contains(lower, marker) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func sleepRetry(ctx context.Context, attempt int, log *slog.Logger, provider string) error {
|
||||
delay := time.Duration(attempt*attempt) * 200 * time.Millisecond
|
||||
if log != nil {
|
||||
@@ -835,3 +1097,22 @@ func (c *Client) noteModelSuccess(model string) {
|
||||
|
||||
delete(c.modelHealth, model)
|
||||
}
|
||||
|
||||
func (c *Client) notePermanentModelFailure(model string, err error) {
|
||||
c.modelHealthMu.Lock()
|
||||
defer c.modelHealthMu.Unlock()
|
||||
|
||||
state := c.modelHealth[model]
|
||||
state.consecutiveEmpty = emptyResponseCircuitThreshold
|
||||
state.unhealthyUntil = time.Now().Add(permanentModelFailureTTL)
|
||||
c.modelHealth[model] = state
|
||||
|
||||
if c.log != nil {
|
||||
c.log.Warn("metadata model marked unhealthy after permanent failure",
|
||||
slog.String("provider", c.name),
|
||||
slog.String("model", model),
|
||||
slog.String("error", err.Error()),
|
||||
slog.Time("until", state.unhealthyUntil),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user