Improve thought enrichment reliability
This commit is contained in:
209
internal/tools/enrichment_retry.go
Normal file
209
internal/tools/enrichment_retry.go
Normal file
@@ -0,0 +1,209 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
"golang.org/x/sync/semaphore"
|
||||
|
||||
"git.warky.dev/wdevs/amcs/internal/ai"
|
||||
"git.warky.dev/wdevs/amcs/internal/config"
|
||||
"git.warky.dev/wdevs/amcs/internal/metadata"
|
||||
"git.warky.dev/wdevs/amcs/internal/session"
|
||||
"git.warky.dev/wdevs/amcs/internal/store"
|
||||
thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
|
||||
)
|
||||
|
||||
const enrichmentRetryConcurrency = 4
|
||||
const enrichmentRetryMaxAttempts = 5
|
||||
|
||||
var enrichmentRetryBackoff = []time.Duration{
|
||||
30 * time.Second,
|
||||
2 * time.Minute,
|
||||
10 * time.Minute,
|
||||
30 * time.Minute,
|
||||
2 * time.Hour,
|
||||
}
|
||||
|
||||
type EnrichmentRetryer struct {
|
||||
backgroundCtx context.Context
|
||||
store *store.DB
|
||||
provider ai.Provider
|
||||
capture config.CaptureConfig
|
||||
sessions *session.ActiveProjects
|
||||
metadataTimeout time.Duration
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
type RetryEnrichmentTool struct {
|
||||
retryer *EnrichmentRetryer
|
||||
}
|
||||
|
||||
type RetryEnrichmentInput struct {
|
||||
Project string `json:"project,omitempty" jsonschema:"optional project name or id to scope the retry"`
|
||||
Limit int `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"`
|
||||
IncludeArchived bool `json:"include_archived,omitempty" jsonschema:"whether to include archived thoughts; defaults to false"`
|
||||
OlderThanDays int `json:"older_than_days,omitempty" jsonschema:"only retry thoughts whose last metadata attempt was at least N days ago; 0 means no restriction"`
|
||||
DryRun bool `json:"dry_run,omitempty" jsonschema:"report counts without retrying metadata extraction"`
|
||||
}
|
||||
|
||||
type RetryEnrichmentFailure struct {
|
||||
ID string `json:"id"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
type RetryEnrichmentOutput struct {
|
||||
Scanned int `json:"scanned"`
|
||||
Retried int `json:"retried"`
|
||||
Updated int `json:"updated"`
|
||||
Skipped int `json:"skipped"`
|
||||
Failed int `json:"failed"`
|
||||
DryRun bool `json:"dry_run"`
|
||||
Failures []RetryEnrichmentFailure `json:"failures,omitempty"`
|
||||
}
|
||||
|
||||
func NewEnrichmentRetryer(backgroundCtx context.Context, db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, logger *slog.Logger) *EnrichmentRetryer {
|
||||
if backgroundCtx == nil {
|
||||
backgroundCtx = context.Background()
|
||||
}
|
||||
return &EnrichmentRetryer{
|
||||
backgroundCtx: backgroundCtx,
|
||||
store: db,
|
||||
provider: provider,
|
||||
capture: capture,
|
||||
sessions: sessions,
|
||||
metadataTimeout: metadataTimeout,
|
||||
logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
func NewRetryEnrichmentTool(retryer *EnrichmentRetryer) *RetryEnrichmentTool {
|
||||
return &RetryEnrichmentTool{retryer: retryer}
|
||||
}
|
||||
|
||||
func (t *RetryEnrichmentTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryEnrichmentInput) (*mcp.CallToolResult, RetryEnrichmentOutput, error) {
|
||||
return t.retryer.Handle(ctx, req, in)
|
||||
}
|
||||
|
||||
func (r *EnrichmentRetryer) QueueThought(id uuid.UUID) {
|
||||
go func() {
|
||||
if _, err := r.retryOne(r.backgroundCtx, id); err != nil {
|
||||
r.logger.Warn("background metadata retry failed",
|
||||
slog.String("thought_id", id.String()),
|
||||
slog.String("error", err.Error()),
|
||||
)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *EnrichmentRetryer) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryEnrichmentInput) (*mcp.CallToolResult, RetryEnrichmentOutput, error) {
|
||||
limit := in.Limit
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
|
||||
project, err := resolveProject(ctx, r.store, r.sessions, req, in.Project, false)
|
||||
if err != nil {
|
||||
return nil, RetryEnrichmentOutput{}, err
|
||||
}
|
||||
|
||||
var projectID *uuid.UUID
|
||||
if project != nil {
|
||||
projectID = &project.ID
|
||||
}
|
||||
|
||||
thoughts, err := r.store.ListThoughtsPendingMetadataRetry(ctx, limit, projectID, in.IncludeArchived, in.OlderThanDays)
|
||||
if err != nil {
|
||||
return nil, RetryEnrichmentOutput{}, err
|
||||
}
|
||||
|
||||
out := RetryEnrichmentOutput{Scanned: len(thoughts), DryRun: in.DryRun}
|
||||
if in.DryRun || len(thoughts) == 0 {
|
||||
return nil, out, nil
|
||||
}
|
||||
|
||||
sem := semaphore.NewWeighted(enrichmentRetryConcurrency)
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, thought := range thoughts {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
if err := sem.Acquire(ctx, 1); err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(thought thoughttypes.Thought) {
|
||||
defer wg.Done()
|
||||
defer sem.Release(1)
|
||||
|
||||
mu.Lock()
|
||||
out.Retried++
|
||||
mu.Unlock()
|
||||
|
||||
updated, err := r.retryOne(ctx, thought.ID)
|
||||
if err != nil {
|
||||
mu.Lock()
|
||||
out.Failures = append(out.Failures, RetryEnrichmentFailure{ID: thought.ID.String(), Error: err.Error()})
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
if updated {
|
||||
mu.Lock()
|
||||
out.Updated++
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
out.Skipped++
|
||||
mu.Unlock()
|
||||
}(thought)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
out.Failed = len(out.Failures)
|
||||
|
||||
return nil, out, nil
|
||||
}
|
||||
|
||||
func (r *EnrichmentRetryer) retryOne(ctx context.Context, id uuid.UUID) (bool, error) {
|
||||
thought, err := r.store.GetThought(ctx, id)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if thought.Metadata.MetadataStatus == metadata.MetadataStatusComplete {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
attemptCtx := ctx
|
||||
if r.metadataTimeout > 0 {
|
||||
var cancel context.CancelFunc
|
||||
attemptCtx, cancel = context.WithTimeout(ctx, r.metadataTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
attemptedAt := time.Now().UTC()
|
||||
extracted, extractErr := r.provider.ExtractMetadata(attemptCtx, thought.Content)
|
||||
if extractErr != nil {
|
||||
failedMetadata := metadata.MarkMetadataFailed(thought.Metadata, r.capture, attemptedAt, extractErr)
|
||||
if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, failedMetadata); updateErr != nil {
|
||||
return false, updateErr
|
||||
}
|
||||
return false, extractErr
|
||||
}
|
||||
|
||||
completedMetadata := metadata.MarkMetadataComplete(metadata.SanitizeExtracted(extracted), r.capture, attemptedAt)
|
||||
completedMetadata.Attachments = thought.Metadata.Attachments
|
||||
if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, completedMetadata); updateErr != nil {
|
||||
return false, updateErr
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
Reference in New Issue
Block a user