From b6e156011f2a34349cbfa3fc91eeb9ab86b973b7 Mon Sep 17 00:00:00 2001 From: sgcommand Date: Mon, 13 Apr 2026 23:04:11 +0200 Subject: [PATCH] Improve thought enrichment reliability --- internal/app/app.go | 6 +- internal/tools/capture.go | 102 +++++++------- internal/tools/enrichment_retry.go | 209 +++++++++++++++++++++++++++++ internal/tools/metadata_retry.go | 42 ++++++ 4 files changed, 306 insertions(+), 53 deletions(-) create mode 100644 internal/tools/enrichment_retry.go diff --git a/internal/app/app.go b/internal/app/app.go index bd15e6c..4ae9359 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -162,11 +162,11 @@ func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *st oauthEnabled := oauthRegistry != nil && tokenStore != nil authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, accessTracker, logger) filesTool := tools.NewFilesTool(db, activeProjects) - metadataRetryer := tools.NewMetadataRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger) + enrichmentRetryer := tools.NewEnrichmentRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger) backfillTool := tools.NewBackfillTool(db, provider, activeProjects, logger) toolSet := mcpserver.ToolSet{ - Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, backfillTool, logger), + Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, enrichmentRetryer, backfillTool, logger), Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects), List: tools.NewListTool(db, cfg.Search, activeProjects), Stats: tools.NewStatsTool(db), @@ -183,7 +183,7 @@ func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *st Files: filesTool, Backfill: backfillTool, Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger), - RetryMetadata: tools.NewRetryMetadataTool(metadataRetryer), + RetryMetadata: tools.NewRetryEnrichmentTool(enrichmentRetryer), Maintenance: tools.NewMaintenanceTool(db), Skills: tools.NewSkillsTool(db, activeProjects), ChatHistory: tools.NewChatHistoryTool(db, activeProjects), diff --git a/internal/tools/capture.go b/internal/tools/capture.go index 7483ec0..eb33ea8 100644 --- a/internal/tools/capture.go +++ b/internal/tools/capture.go @@ -8,7 +8,6 @@ import ( "github.com/google/uuid" "github.com/modelcontextprotocol/go-sdk/mcp" - "golang.org/x/sync/errgroup" "git.warky.dev/wdevs/amcs/internal/ai" "git.warky.dev/wdevs/amcs/internal/config" @@ -58,52 +57,10 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C return nil, CaptureOutput{}, err } - var embedding []float32 rawMetadata := metadata.Fallback(t.capture) - metadataNeedsRetry := false - embeddingNeedsRetry := false - - group, groupCtx := errgroup.WithContext(ctx) - group.Go(func() error { - vector, err := t.provider.Embed(groupCtx, content) - if err != nil { - t.log.Warn("embedding failed, thought will be saved without embedding", - slog.String("provider", t.provider.Name()), - slog.String("error", err.Error()), - ) - embeddingNeedsRetry = true - return nil - } - embedding = vector - return nil - }) - group.Go(func() error { - metaCtx := groupCtx - attemptedAt := time.Now().UTC() - if t.metadataTimeout > 0 { - var cancel context.CancelFunc - metaCtx, cancel = context.WithTimeout(groupCtx, t.metadataTimeout) - defer cancel() - } - extracted, err := t.provider.ExtractMetadata(metaCtx, content) - if err != nil { - t.log.Warn("metadata extraction failed, using fallback", slog.String("provider", t.provider.Name()), slog.String("error", err.Error())) - rawMetadata = metadata.MarkMetadataPending(rawMetadata, t.capture, attemptedAt, err) - metadataNeedsRetry = true - return nil - } - rawMetadata = metadata.MarkMetadataComplete(extracted, t.capture, attemptedAt) - return nil - }) - - if err := group.Wait(); err != nil { - return nil, CaptureOutput{}, err - } - thought := thoughttypes.Thought{ - Content: content, - Embedding: embedding, - Metadata: metadata.Normalize(metadata.SanitizeExtracted(rawMetadata), t.capture), + Content: content, + Metadata: rawMetadata, } if project != nil { thought.ProjectID = &project.ID @@ -116,12 +73,57 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C if project != nil { _ = t.store.TouchProject(ctx, project.ID) } - if metadataNeedsRetry && t.retryer != nil { - t.retryer.QueueThought(created.ID) - } - if embeddingNeedsRetry && t.embedRetryer != nil { - t.embedRetryer.QueueThought(ctx, created.ID, content) + + if t.retryer != nil || t.embedRetryer != nil { + t.launchEnrichment(created.ID, content) } return nil, CaptureOutput{Thought: created}, nil } + +func (t *CaptureTool) launchEnrichment(id uuid.UUID, content string) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + if t.retryer != nil { + attemptedAt := time.Now().UTC() + rawMetadata := metadata.Fallback(t.capture) + extracted, err := t.provider.ExtractMetadata(ctx, content) + if err != nil { + failed := metadata.MarkMetadataFailed(rawMetadata, t.capture, attemptedAt, err) + if _, updateErr := t.store.UpdateThoughtMetadata(ctx, id, failed); updateErr != nil { + t.log.Warn("deferred metadata failure could not be persisted", + slog.String("thought_id", id.String()), + slog.String("error", updateErr.Error()), + ) + } + t.log.Warn("deferred metadata extraction failed", + slog.String("thought_id", id.String()), + slog.String("provider", t.provider.Name()), + slog.String("error", err.Error()), + ) + t.retryer.QueueThought(id) + } else { + completed := metadata.MarkMetadataComplete(extracted, t.capture, attemptedAt) + if _, updateErr := t.store.UpdateThoughtMetadata(ctx, id, completed); updateErr != nil { + t.log.Warn("deferred metadata completion could not be persisted", + slog.String("thought_id", id.String()), + slog.String("error", updateErr.Error()), + ) + } + } + } + + if t.embedRetryer != nil { + if _, err := t.provider.Embed(ctx, content); err != nil { + t.log.Warn("deferred embedding failed", + slog.String("thought_id", id.String()), + slog.String("provider", t.provider.Name()), + slog.String("error", err.Error()), + ) + } + t.embedRetryer.QueueThought(ctx, id, content) + } + }() +} diff --git a/internal/tools/enrichment_retry.go b/internal/tools/enrichment_retry.go new file mode 100644 index 0000000..6a3d4d4 --- /dev/null +++ b/internal/tools/enrichment_retry.go @@ -0,0 +1,209 @@ +package tools + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/google/uuid" + "github.com/modelcontextprotocol/go-sdk/mcp" + "golang.org/x/sync/semaphore" + + "git.warky.dev/wdevs/amcs/internal/ai" + "git.warky.dev/wdevs/amcs/internal/config" + "git.warky.dev/wdevs/amcs/internal/metadata" + "git.warky.dev/wdevs/amcs/internal/session" + "git.warky.dev/wdevs/amcs/internal/store" + thoughttypes "git.warky.dev/wdevs/amcs/internal/types" +) + +const enrichmentRetryConcurrency = 4 +const enrichmentRetryMaxAttempts = 5 + +var enrichmentRetryBackoff = []time.Duration{ + 30 * time.Second, + 2 * time.Minute, + 10 * time.Minute, + 30 * time.Minute, + 2 * time.Hour, +} + +type EnrichmentRetryer struct { + backgroundCtx context.Context + store *store.DB + provider ai.Provider + capture config.CaptureConfig + sessions *session.ActiveProjects + metadataTimeout time.Duration + logger *slog.Logger +} + +type RetryEnrichmentTool struct { + retryer *EnrichmentRetryer +} + +type RetryEnrichmentInput struct { + Project string `json:"project,omitempty" jsonschema:"optional project name or id to scope the retry"` + Limit int `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"` + IncludeArchived bool `json:"include_archived,omitempty" jsonschema:"whether to include archived thoughts; defaults to false"` + OlderThanDays int `json:"older_than_days,omitempty" jsonschema:"only retry thoughts whose last metadata attempt was at least N days ago; 0 means no restriction"` + DryRun bool `json:"dry_run,omitempty" jsonschema:"report counts without retrying metadata extraction"` +} + +type RetryEnrichmentFailure struct { + ID string `json:"id"` + Error string `json:"error"` +} + +type RetryEnrichmentOutput struct { + Scanned int `json:"scanned"` + Retried int `json:"retried"` + Updated int `json:"updated"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + DryRun bool `json:"dry_run"` + Failures []RetryEnrichmentFailure `json:"failures,omitempty"` +} + +func NewEnrichmentRetryer(backgroundCtx context.Context, db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, logger *slog.Logger) *EnrichmentRetryer { + if backgroundCtx == nil { + backgroundCtx = context.Background() + } + return &EnrichmentRetryer{ + backgroundCtx: backgroundCtx, + store: db, + provider: provider, + capture: capture, + sessions: sessions, + metadataTimeout: metadataTimeout, + logger: logger, + } +} + +func NewRetryEnrichmentTool(retryer *EnrichmentRetryer) *RetryEnrichmentTool { + return &RetryEnrichmentTool{retryer: retryer} +} + +func (t *RetryEnrichmentTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryEnrichmentInput) (*mcp.CallToolResult, RetryEnrichmentOutput, error) { + return t.retryer.Handle(ctx, req, in) +} + +func (r *EnrichmentRetryer) QueueThought(id uuid.UUID) { + go func() { + if _, err := r.retryOne(r.backgroundCtx, id); err != nil { + r.logger.Warn("background metadata retry failed", + slog.String("thought_id", id.String()), + slog.String("error", err.Error()), + ) + } + }() +} + +func (r *EnrichmentRetryer) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryEnrichmentInput) (*mcp.CallToolResult, RetryEnrichmentOutput, error) { + limit := in.Limit + if limit <= 0 { + limit = 100 + } + + project, err := resolveProject(ctx, r.store, r.sessions, req, in.Project, false) + if err != nil { + return nil, RetryEnrichmentOutput{}, err + } + + var projectID *uuid.UUID + if project != nil { + projectID = &project.ID + } + + thoughts, err := r.store.ListThoughtsPendingMetadataRetry(ctx, limit, projectID, in.IncludeArchived, in.OlderThanDays) + if err != nil { + return nil, RetryEnrichmentOutput{}, err + } + + out := RetryEnrichmentOutput{Scanned: len(thoughts), DryRun: in.DryRun} + if in.DryRun || len(thoughts) == 0 { + return nil, out, nil + } + + sem := semaphore.NewWeighted(enrichmentRetryConcurrency) + var mu sync.Mutex + var wg sync.WaitGroup + + for _, thought := range thoughts { + if ctx.Err() != nil { + break + } + if err := sem.Acquire(ctx, 1); err != nil { + break + } + + wg.Add(1) + go func(thought thoughttypes.Thought) { + defer wg.Done() + defer sem.Release(1) + + mu.Lock() + out.Retried++ + mu.Unlock() + + updated, err := r.retryOne(ctx, thought.ID) + if err != nil { + mu.Lock() + out.Failures = append(out.Failures, RetryEnrichmentFailure{ID: thought.ID.String(), Error: err.Error()}) + mu.Unlock() + return + } + if updated { + mu.Lock() + out.Updated++ + mu.Unlock() + return + } + + mu.Lock() + out.Skipped++ + mu.Unlock() + }(thought) + } + + wg.Wait() + out.Failed = len(out.Failures) + + return nil, out, nil +} + +func (r *EnrichmentRetryer) retryOne(ctx context.Context, id uuid.UUID) (bool, error) { + thought, err := r.store.GetThought(ctx, id) + if err != nil { + return false, err + } + if thought.Metadata.MetadataStatus == metadata.MetadataStatusComplete { + return false, nil + } + + attemptCtx := ctx + if r.metadataTimeout > 0 { + var cancel context.CancelFunc + attemptCtx, cancel = context.WithTimeout(ctx, r.metadataTimeout) + defer cancel() + } + + attemptedAt := time.Now().UTC() + extracted, extractErr := r.provider.ExtractMetadata(attemptCtx, thought.Content) + if extractErr != nil { + failedMetadata := metadata.MarkMetadataFailed(thought.Metadata, r.capture, attemptedAt, extractErr) + if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, failedMetadata); updateErr != nil { + return false, updateErr + } + return false, extractErr + } + + completedMetadata := metadata.MarkMetadataComplete(metadata.SanitizeExtracted(extracted), r.capture, attemptedAt) + completedMetadata.Attachments = thought.Metadata.Attachments + if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, completedMetadata); updateErr != nil { + return false, updateErr + } + + return true, nil +} diff --git a/internal/tools/metadata_retry.go b/internal/tools/metadata_retry.go index c49e356..ceb2268 100644 --- a/internal/tools/metadata_retry.go +++ b/internal/tools/metadata_retry.go @@ -28,12 +28,42 @@ type MetadataRetryer struct { sessions *session.ActiveProjects metadataTimeout time.Duration logger *slog.Logger + lock *RetryLocker } type RetryMetadataTool struct { retryer *MetadataRetryer } +type RetryLocker struct { + mu sync.Mutex + locks map[uuid.UUID]time.Time +} + +func NewRetryLocker() *RetryLocker { + return &RetryLocker{locks: map[uuid.UUID]time.Time{}} +} + +func (l *RetryLocker) Acquire(id uuid.UUID, ttl time.Duration) bool { + l.mu.Lock() + defer l.mu.Unlock() + if l.locks == nil { + l.locks = map[uuid.UUID]time.Time{} + } + now := time.Now() + if exp, ok := l.locks[id]; ok && exp.After(now) { + return false + } + l.locks[id] = now.Add(ttl) + return true +} + +func (l *RetryLocker) Release(id uuid.UUID) { + l.mu.Lock() + defer l.mu.Unlock() + delete(l.locks, id) +} + type RetryMetadataInput struct { Project string `json:"project,omitempty" jsonschema:"optional project name or id to scope the retry"` Limit int `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"` @@ -69,6 +99,7 @@ func NewMetadataRetryer(backgroundCtx context.Context, db *store.DB, provider ai sessions: sessions, metadataTimeout: metadataTimeout, logger: logger, + lock: NewRetryLocker(), } } @@ -82,6 +113,10 @@ func (t *RetryMetadataTool) Handle(ctx context.Context, req *mcp.CallToolRequest func (r *MetadataRetryer) QueueThought(id uuid.UUID) { go func() { + if !r.lock.Acquire(id, 15*time.Minute) { + return + } + defer r.lock.Release(id) if _, err := r.retryOne(r.backgroundCtx, id); err != nil { r.logger.Warn("background metadata retry failed", slog.String("thought_id", id.String()), slog.String("error", err.Error())) } @@ -138,7 +173,14 @@ func (r *MetadataRetryer) Handle(ctx context.Context, req *mcp.CallToolRequest, out.Retried++ mu.Unlock() + if !r.lock.Acquire(thought.ID, 15*time.Minute) { + mu.Lock() + out.Skipped++ + mu.Unlock() + return + } updated, err := r.retryOne(ctx, thought.ID) + r.lock.Release(thought.ID) if err != nil { mu.Lock() out.Failures = append(out.Failures, RetryMetadataFailure{ID: thought.ID.String(), Error: err.Error()})