Compare commits
3 Commits
structured
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| a6165a0f2e | |||
| b6e156011f | |||
| 4d107cb87e |
@@ -162,10 +162,11 @@ func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *st
|
|||||||
oauthEnabled := oauthRegistry != nil && tokenStore != nil
|
oauthEnabled := oauthRegistry != nil && tokenStore != nil
|
||||||
authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, accessTracker, logger)
|
authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, accessTracker, logger)
|
||||||
filesTool := tools.NewFilesTool(db, activeProjects)
|
filesTool := tools.NewFilesTool(db, activeProjects)
|
||||||
metadataRetryer := tools.NewMetadataRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
|
enrichmentRetryer := tools.NewEnrichmentRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
|
||||||
|
backfillTool := tools.NewBackfillTool(db, provider, activeProjects, logger)
|
||||||
|
|
||||||
toolSet := mcpserver.ToolSet{
|
toolSet := mcpserver.ToolSet{
|
||||||
Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, logger),
|
Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, enrichmentRetryer, backfillTool, logger),
|
||||||
Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects),
|
Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects),
|
||||||
List: tools.NewListTool(db, cfg.Search, activeProjects),
|
List: tools.NewListTool(db, cfg.Search, activeProjects),
|
||||||
Stats: tools.NewStatsTool(db),
|
Stats: tools.NewStatsTool(db),
|
||||||
@@ -180,9 +181,9 @@ func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *st
|
|||||||
Summarize: tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects),
|
Summarize: tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects),
|
||||||
Links: tools.NewLinksTool(db, provider, cfg.Search),
|
Links: tools.NewLinksTool(db, provider, cfg.Search),
|
||||||
Files: filesTool,
|
Files: filesTool,
|
||||||
Backfill: tools.NewBackfillTool(db, provider, activeProjects, logger),
|
Backfill: backfillTool,
|
||||||
Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger),
|
Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger),
|
||||||
RetryMetadata: tools.NewRetryMetadataTool(metadataRetryer),
|
RetryMetadata: tools.NewRetryEnrichmentTool(enrichmentRetryer),
|
||||||
Maintenance: tools.NewMaintenanceTool(db),
|
Maintenance: tools.NewMaintenanceTool(db),
|
||||||
Skills: tools.NewSkillsTool(db, activeProjects),
|
Skills: tools.NewSkillsTool(db, activeProjects),
|
||||||
ChatHistory: tools.NewChatHistoryTool(db, activeProjects),
|
ChatHistory: tools.NewChatHistoryTool(db, activeProjects),
|
||||||
|
|||||||
@@ -58,6 +58,12 @@ func (db *DB) InsertThought(ctx context.Context, thought thoughttypes.Thought, e
|
|||||||
return thoughttypes.Thought{}, fmt.Errorf("commit thought insert: %w", err)
|
return thoughttypes.Thought{}, fmt.Errorf("commit thought insert: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(thought.Embedding) > 0 {
|
||||||
|
created.EmbeddingStatus = "done"
|
||||||
|
} else {
|
||||||
|
created.EmbeddingStatus = "pending"
|
||||||
|
}
|
||||||
|
|
||||||
return created, nil
|
return created, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -51,6 +51,30 @@ func NewBackfillTool(db *store.DB, provider ai.Provider, sessions *session.Activ
|
|||||||
return &BackfillTool{store: db, provider: provider, sessions: sessions, logger: logger}
|
return &BackfillTool{store: db, provider: provider, sessions: sessions, logger: logger}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueThought queues a single thought for background embedding generation.
|
||||||
|
// It is used by capture when the embedding provider is temporarily unavailable.
|
||||||
|
func (t *BackfillTool) QueueThought(ctx context.Context, id uuid.UUID, content string) {
|
||||||
|
go func() {
|
||||||
|
vec, err := t.provider.Embed(ctx, content)
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Warn("background embedding retry failed",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
model := t.provider.EmbeddingModel()
|
||||||
|
if err := t.store.UpsertEmbedding(ctx, id, model, vec); err != nil {
|
||||||
|
t.logger.Warn("background embedding upsert failed",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.logger.Info("background embedding retry succeeded", slog.String("thought_id", id.String()))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func (t *BackfillTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in BackfillInput) (*mcp.CallToolResult, BackfillOutput, error) {
|
func (t *BackfillTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in BackfillInput) (*mcp.CallToolResult, BackfillOutput, error) {
|
||||||
limit := in.Limit
|
limit := in.Limit
|
||||||
if limit <= 0 {
|
if limit <= 0 {
|
||||||
|
|||||||
@@ -6,8 +6,8 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
|
|
||||||
"git.warky.dev/wdevs/amcs/internal/ai"
|
"git.warky.dev/wdevs/amcs/internal/ai"
|
||||||
"git.warky.dev/wdevs/amcs/internal/config"
|
"git.warky.dev/wdevs/amcs/internal/config"
|
||||||
@@ -17,6 +17,11 @@ import (
|
|||||||
thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
|
thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// EmbeddingQueuer queues a thought for background embedding generation.
|
||||||
|
type EmbeddingQueuer interface {
|
||||||
|
QueueThought(ctx context.Context, id uuid.UUID, content string)
|
||||||
|
}
|
||||||
|
|
||||||
type CaptureTool struct {
|
type CaptureTool struct {
|
||||||
store *store.DB
|
store *store.DB
|
||||||
provider ai.Provider
|
provider ai.Provider
|
||||||
@@ -24,6 +29,7 @@ type CaptureTool struct {
|
|||||||
sessions *session.ActiveProjects
|
sessions *session.ActiveProjects
|
||||||
metadataTimeout time.Duration
|
metadataTimeout time.Duration
|
||||||
retryer *MetadataRetryer
|
retryer *MetadataRetryer
|
||||||
|
embedRetryer EmbeddingQueuer
|
||||||
log *slog.Logger
|
log *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,8 +42,8 @@ type CaptureOutput struct {
|
|||||||
Thought thoughttypes.Thought `json:"thought"`
|
Thought thoughttypes.Thought `json:"thought"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, retryer *MetadataRetryer, log *slog.Logger) *CaptureTool {
|
func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, retryer *MetadataRetryer, embedRetryer EmbeddingQueuer, log *slog.Logger) *CaptureTool {
|
||||||
return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, retryer: retryer, log: log}
|
return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, retryer: retryer, embedRetryer: embedRetryer, log: log}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in CaptureInput) (*mcp.CallToolResult, CaptureOutput, error) {
|
func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in CaptureInput) (*mcp.CallToolResult, CaptureOutput, error) {
|
||||||
@@ -51,46 +57,10 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C
|
|||||||
return nil, CaptureOutput{}, err
|
return nil, CaptureOutput{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var embedding []float32
|
|
||||||
rawMetadata := metadata.Fallback(t.capture)
|
rawMetadata := metadata.Fallback(t.capture)
|
||||||
metadataNeedsRetry := false
|
|
||||||
|
|
||||||
group, groupCtx := errgroup.WithContext(ctx)
|
|
||||||
group.Go(func() error {
|
|
||||||
vector, err := t.provider.Embed(groupCtx, content)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
embedding = vector
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
group.Go(func() error {
|
|
||||||
metaCtx := groupCtx
|
|
||||||
attemptedAt := time.Now().UTC()
|
|
||||||
if t.metadataTimeout > 0 {
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
metaCtx, cancel = context.WithTimeout(groupCtx, t.metadataTimeout)
|
|
||||||
defer cancel()
|
|
||||||
}
|
|
||||||
extracted, err := t.provider.ExtractMetadata(metaCtx, content)
|
|
||||||
if err != nil {
|
|
||||||
t.log.Warn("metadata extraction failed, using fallback", slog.String("provider", t.provider.Name()), slog.String("error", err.Error()))
|
|
||||||
rawMetadata = metadata.MarkMetadataPending(rawMetadata, t.capture, attemptedAt, err)
|
|
||||||
metadataNeedsRetry = true
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
rawMetadata = metadata.MarkMetadataComplete(extracted, t.capture, attemptedAt)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := group.Wait(); err != nil {
|
|
||||||
return nil, CaptureOutput{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
thought := thoughttypes.Thought{
|
thought := thoughttypes.Thought{
|
||||||
Content: content,
|
Content: content,
|
||||||
Embedding: embedding,
|
Metadata: rawMetadata,
|
||||||
Metadata: metadata.Normalize(metadata.SanitizeExtracted(rawMetadata), t.capture),
|
|
||||||
}
|
}
|
||||||
if project != nil {
|
if project != nil {
|
||||||
thought.ProjectID = &project.ID
|
thought.ProjectID = &project.ID
|
||||||
@@ -103,9 +73,57 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C
|
|||||||
if project != nil {
|
if project != nil {
|
||||||
_ = t.store.TouchProject(ctx, project.ID)
|
_ = t.store.TouchProject(ctx, project.ID)
|
||||||
}
|
}
|
||||||
if metadataNeedsRetry && t.retryer != nil {
|
|
||||||
t.retryer.QueueThought(created.ID)
|
if t.retryer != nil || t.embedRetryer != nil {
|
||||||
|
t.launchEnrichment(created.ID, content)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, CaptureOutput{Thought: created}, nil
|
return nil, CaptureOutput{Thought: created}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *CaptureTool) launchEnrichment(id uuid.UUID, content string) {
|
||||||
|
go func() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if t.retryer != nil {
|
||||||
|
attemptedAt := time.Now().UTC()
|
||||||
|
rawMetadata := metadata.Fallback(t.capture)
|
||||||
|
extracted, err := t.provider.ExtractMetadata(ctx, content)
|
||||||
|
if err != nil {
|
||||||
|
failed := metadata.MarkMetadataFailed(rawMetadata, t.capture, attemptedAt, err)
|
||||||
|
if _, updateErr := t.store.UpdateThoughtMetadata(ctx, id, failed); updateErr != nil {
|
||||||
|
t.log.Warn("deferred metadata failure could not be persisted",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("error", updateErr.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
t.log.Warn("deferred metadata extraction failed",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("provider", t.provider.Name()),
|
||||||
|
slog.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
t.retryer.QueueThought(id)
|
||||||
|
} else {
|
||||||
|
completed := metadata.MarkMetadataComplete(extracted, t.capture, attemptedAt)
|
||||||
|
if _, updateErr := t.store.UpdateThoughtMetadata(ctx, id, completed); updateErr != nil {
|
||||||
|
t.log.Warn("deferred metadata completion could not be persisted",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("error", updateErr.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if t.embedRetryer != nil {
|
||||||
|
if _, err := t.provider.Embed(ctx, content); err != nil {
|
||||||
|
t.log.Warn("deferred embedding failed",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("provider", t.provider.Name()),
|
||||||
|
slog.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
t.embedRetryer.QueueThought(ctx, id, content)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|||||||
209
internal/tools/enrichment_retry.go
Normal file
209
internal/tools/enrichment_retry.go
Normal file
@@ -0,0 +1,209 @@
|
|||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||||
|
"golang.org/x/sync/semaphore"
|
||||||
|
|
||||||
|
"git.warky.dev/wdevs/amcs/internal/ai"
|
||||||
|
"git.warky.dev/wdevs/amcs/internal/config"
|
||||||
|
"git.warky.dev/wdevs/amcs/internal/metadata"
|
||||||
|
"git.warky.dev/wdevs/amcs/internal/session"
|
||||||
|
"git.warky.dev/wdevs/amcs/internal/store"
|
||||||
|
thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Limits applied to enrichment retries.
const (
	// enrichmentRetryConcurrency is the capacity of the semaphore that bounds
	// how many retries a single Handle call runs at once.
	enrichmentRetryConcurrency = 4

	// enrichmentRetryMaxAttempts is the attempt ceiling.
	// NOTE(review): not referenced by the code visible in this file — confirm
	// where it is enforced.
	enrichmentRetryMaxAttempts = 5
)

// enrichmentRetryBackoff lists five increasing delays, from 30 seconds up to
// 2 hours.
// NOTE(review): not referenced by the code visible in this file — presumably
// one entry per attempt; confirm against the scheduler that consumes it.
var enrichmentRetryBackoff = []time.Duration{
	30 * time.Second,
	2 * time.Minute,
	10 * time.Minute,
	30 * time.Minute,
	2 * time.Hour,
}
|
||||||
|
|
||||||
|
type EnrichmentRetryer struct {
|
||||||
|
backgroundCtx context.Context
|
||||||
|
store *store.DB
|
||||||
|
provider ai.Provider
|
||||||
|
capture config.CaptureConfig
|
||||||
|
sessions *session.ActiveProjects
|
||||||
|
metadataTimeout time.Duration
|
||||||
|
logger *slog.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
type RetryEnrichmentTool struct {
|
||||||
|
retryer *EnrichmentRetryer
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetryEnrichmentInput carries the arguments of a bulk enrichment retry.
// Every field is optional and is omitted from JSON when it holds its zero
// value.
type RetryEnrichmentInput struct {
	// Project scopes the retry to one project, given by name or id.
	Project string `json:"project,omitempty" jsonschema:"optional project name or id to scope the retry"`
	// Limit caps how many thoughts one call processes; values <= 0 fall back
	// to 100.
	Limit int `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"`
	// IncludeArchived also selects archived thoughts.
	IncludeArchived bool `json:"include_archived,omitempty" jsonschema:"whether to include archived thoughts; defaults to false"`
	// OlderThanDays restricts the retry to thoughts whose last attempt is at
	// least this many days old; 0 disables the restriction.
	OlderThanDays int `json:"older_than_days,omitempty" jsonschema:"only retry thoughts whose last metadata attempt was at least N days ago; 0 means no restriction"`
	// DryRun reports how many thoughts would be scanned without retrying any.
	DryRun bool `json:"dry_run,omitempty" jsonschema:"report counts without retrying metadata extraction"`
}
|
||||||
|
|
||||||
|
type (
	// RetryEnrichmentFailure records one thought whose retry returned an error.
	RetryEnrichmentFailure struct {
		// ID is the string form of the thought's id.
		ID string `json:"id"`
		// Error is the error message of the failed retry.
		Error string `json:"error"`
	}

	// RetryEnrichmentOutput summarizes one bulk enrichment retry.
	RetryEnrichmentOutput struct {
		// Scanned is the number of candidate thoughts that were listed.
		Scanned int `json:"scanned"`
		// Retried is the number of thoughts for which a retry was started.
		Retried int `json:"retried"`
		// Updated is the number of retries that stored new metadata.
		Updated int `json:"updated"`
		// Skipped is the number of retries that finished without an error
		// and without an update.
		Skipped int `json:"skipped"`
		// Failed is the number of retries that returned an error; it equals
		// len(Failures).
		Failed int `json:"failed"`
		// DryRun echoes the request's dry-run flag.
		DryRun bool `json:"dry_run"`
		// Failures lists the individual errors; omitted from JSON when empty.
		Failures []RetryEnrichmentFailure `json:"failures,omitempty"`
	}
)
|
||||||
|
|
||||||
|
func NewEnrichmentRetryer(backgroundCtx context.Context, db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, logger *slog.Logger) *EnrichmentRetryer {
|
||||||
|
if backgroundCtx == nil {
|
||||||
|
backgroundCtx = context.Background()
|
||||||
|
}
|
||||||
|
return &EnrichmentRetryer{
|
||||||
|
backgroundCtx: backgroundCtx,
|
||||||
|
store: db,
|
||||||
|
provider: provider,
|
||||||
|
capture: capture,
|
||||||
|
sessions: sessions,
|
||||||
|
metadataTimeout: metadataTimeout,
|
||||||
|
logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRetryEnrichmentTool(retryer *EnrichmentRetryer) *RetryEnrichmentTool {
|
||||||
|
return &RetryEnrichmentTool{retryer: retryer}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *RetryEnrichmentTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryEnrichmentInput) (*mcp.CallToolResult, RetryEnrichmentOutput, error) {
|
||||||
|
return t.retryer.Handle(ctx, req, in)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *EnrichmentRetryer) QueueThought(id uuid.UUID) {
|
||||||
|
go func() {
|
||||||
|
if _, err := r.retryOne(r.backgroundCtx, id); err != nil {
|
||||||
|
r.logger.Warn("background metadata retry failed",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *EnrichmentRetryer) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryEnrichmentInput) (*mcp.CallToolResult, RetryEnrichmentOutput, error) {
|
||||||
|
limit := in.Limit
|
||||||
|
if limit <= 0 {
|
||||||
|
limit = 100
|
||||||
|
}
|
||||||
|
|
||||||
|
project, err := resolveProject(ctx, r.store, r.sessions, req, in.Project, false)
|
||||||
|
if err != nil {
|
||||||
|
return nil, RetryEnrichmentOutput{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var projectID *uuid.UUID
|
||||||
|
if project != nil {
|
||||||
|
projectID = &project.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
thoughts, err := r.store.ListThoughtsPendingMetadataRetry(ctx, limit, projectID, in.IncludeArchived, in.OlderThanDays)
|
||||||
|
if err != nil {
|
||||||
|
return nil, RetryEnrichmentOutput{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
out := RetryEnrichmentOutput{Scanned: len(thoughts), DryRun: in.DryRun}
|
||||||
|
if in.DryRun || len(thoughts) == 0 {
|
||||||
|
return nil, out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
sem := semaphore.NewWeighted(enrichmentRetryConcurrency)
|
||||||
|
var mu sync.Mutex
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for _, thought := range thoughts {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err := sem.Acquire(ctx, 1); err != nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func(thought thoughttypes.Thought) {
|
||||||
|
defer wg.Done()
|
||||||
|
defer sem.Release(1)
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
out.Retried++
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
|
updated, err := r.retryOne(ctx, thought.ID)
|
||||||
|
if err != nil {
|
||||||
|
mu.Lock()
|
||||||
|
out.Failures = append(out.Failures, RetryEnrichmentFailure{ID: thought.ID.String(), Error: err.Error()})
|
||||||
|
mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if updated {
|
||||||
|
mu.Lock()
|
||||||
|
out.Updated++
|
||||||
|
mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
out.Skipped++
|
||||||
|
mu.Unlock()
|
||||||
|
}(thought)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
out.Failed = len(out.Failures)
|
||||||
|
|
||||||
|
return nil, out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *EnrichmentRetryer) retryOne(ctx context.Context, id uuid.UUID) (bool, error) {
|
||||||
|
thought, err := r.store.GetThought(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if thought.Metadata.MetadataStatus == metadata.MetadataStatusComplete {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
attemptCtx := ctx
|
||||||
|
if r.metadataTimeout > 0 {
|
||||||
|
var cancel context.CancelFunc
|
||||||
|
attemptCtx, cancel = context.WithTimeout(ctx, r.metadataTimeout)
|
||||||
|
defer cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
attemptedAt := time.Now().UTC()
|
||||||
|
extracted, extractErr := r.provider.ExtractMetadata(attemptCtx, thought.Content)
|
||||||
|
if extractErr != nil {
|
||||||
|
failedMetadata := metadata.MarkMetadataFailed(thought.Metadata, r.capture, attemptedAt, extractErr)
|
||||||
|
if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, failedMetadata); updateErr != nil {
|
||||||
|
return false, updateErr
|
||||||
|
}
|
||||||
|
return false, extractErr
|
||||||
|
}
|
||||||
|
|
||||||
|
completedMetadata := metadata.MarkMetadataComplete(metadata.SanitizeExtracted(extracted), r.capture, attemptedAt)
|
||||||
|
completedMetadata.Attachments = thought.Metadata.Attachments
|
||||||
|
if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, completedMetadata); updateErr != nil {
|
||||||
|
return false, updateErr
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
@@ -28,12 +28,42 @@ type MetadataRetryer struct {
|
|||||||
sessions *session.ActiveProjects
|
sessions *session.ActiveProjects
|
||||||
metadataTimeout time.Duration
|
metadataTimeout time.Duration
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
|
lock *RetryLocker
|
||||||
}
|
}
|
||||||
|
|
||||||
type RetryMetadataTool struct {
|
type RetryMetadataTool struct {
|
||||||
retryer *MetadataRetryer
|
retryer *MetadataRetryer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RetryLocker struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
locks map[uuid.UUID]time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRetryLocker() *RetryLocker {
|
||||||
|
return &RetryLocker{locks: map[uuid.UUID]time.Time{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *RetryLocker) Acquire(id uuid.UUID, ttl time.Duration) bool {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
if l.locks == nil {
|
||||||
|
l.locks = map[uuid.UUID]time.Time{}
|
||||||
|
}
|
||||||
|
now := time.Now()
|
||||||
|
if exp, ok := l.locks[id]; ok && exp.After(now) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
l.locks[id] = now.Add(ttl)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *RetryLocker) Release(id uuid.UUID) {
|
||||||
|
l.mu.Lock()
|
||||||
|
defer l.mu.Unlock()
|
||||||
|
delete(l.locks, id)
|
||||||
|
}
|
||||||
|
|
||||||
type RetryMetadataInput struct {
|
type RetryMetadataInput struct {
|
||||||
Project string `json:"project,omitempty" jsonschema:"optional project name or id to scope the retry"`
|
Project string `json:"project,omitempty" jsonschema:"optional project name or id to scope the retry"`
|
||||||
Limit int `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"`
|
Limit int `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"`
|
||||||
@@ -69,6 +99,7 @@ func NewMetadataRetryer(backgroundCtx context.Context, db *store.DB, provider ai
|
|||||||
sessions: sessions,
|
sessions: sessions,
|
||||||
metadataTimeout: metadataTimeout,
|
metadataTimeout: metadataTimeout,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
lock: NewRetryLocker(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,6 +113,10 @@ func (t *RetryMetadataTool) Handle(ctx context.Context, req *mcp.CallToolRequest
|
|||||||
|
|
||||||
func (r *MetadataRetryer) QueueThought(id uuid.UUID) {
|
func (r *MetadataRetryer) QueueThought(id uuid.UUID) {
|
||||||
go func() {
|
go func() {
|
||||||
|
if !r.lock.Acquire(id, 15*time.Minute) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer r.lock.Release(id)
|
||||||
if _, err := r.retryOne(r.backgroundCtx, id); err != nil {
|
if _, err := r.retryOne(r.backgroundCtx, id); err != nil {
|
||||||
r.logger.Warn("background metadata retry failed", slog.String("thought_id", id.String()), slog.String("error", err.Error()))
|
r.logger.Warn("background metadata retry failed", slog.String("thought_id", id.String()), slog.String("error", err.Error()))
|
||||||
}
|
}
|
||||||
@@ -138,7 +173,14 @@ func (r *MetadataRetryer) Handle(ctx context.Context, req *mcp.CallToolRequest,
|
|||||||
out.Retried++
|
out.Retried++
|
||||||
mu.Unlock()
|
mu.Unlock()
|
||||||
|
|
||||||
|
if !r.lock.Acquire(thought.ID, 15*time.Minute) {
|
||||||
|
mu.Lock()
|
||||||
|
out.Skipped++
|
||||||
|
mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
updated, err := r.retryOne(ctx, thought.ID)
|
updated, err := r.retryOne(ctx, thought.ID)
|
||||||
|
r.lock.Release(thought.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
out.Failures = append(out.Failures, RetryMetadataFailure{ID: thought.ID.String(), Error: err.Error()})
|
out.Failures = append(out.Failures, RetryMetadataFailure{ID: thought.ID.String(), Error: err.Error()})
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ type Thought struct {
|
|||||||
ID uuid.UUID `json:"id"`
|
ID uuid.UUID `json:"id"`
|
||||||
Content string `json:"content"`
|
Content string `json:"content"`
|
||||||
Embedding []float32 `json:"embedding,omitempty"`
|
Embedding []float32 `json:"embedding,omitempty"`
|
||||||
|
EmbeddingStatus string `json:"embedding_status,omitempty"`
|
||||||
Metadata ThoughtMetadata `json:"metadata"`
|
Metadata ThoughtMetadata `json:"metadata"`
|
||||||
ProjectID *uuid.UUID `json:"project_id,omitempty"`
|
ProjectID *uuid.UUID `json:"project_id,omitempty"`
|
||||||
ArchivedAt *time.Time `json:"archived_at,omitempty"`
|
ArchivedAt *time.Time `json:"archived_at,omitempty"`
|
||||||
|
|||||||
Reference in New Issue
Block a user