diff --git a/internal/app/app.go b/internal/app/app.go index df3a310..bd15e6c 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -163,9 +163,10 @@ func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *st authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, accessTracker, logger) filesTool := tools.NewFilesTool(db, activeProjects) metadataRetryer := tools.NewMetadataRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger) + backfillTool := tools.NewBackfillTool(db, provider, activeProjects, logger) toolSet := mcpserver.ToolSet{ - Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, logger), + Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, backfillTool, logger), Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects), List: tools.NewListTool(db, cfg.Search, activeProjects), Stats: tools.NewStatsTool(db), @@ -180,7 +181,7 @@ func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *st Summarize: tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects), Links: tools.NewLinksTool(db, provider, cfg.Search), Files: filesTool, - Backfill: tools.NewBackfillTool(db, provider, activeProjects, logger), + Backfill: backfillTool, Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger), RetryMetadata: tools.NewRetryMetadataTool(metadataRetryer), Maintenance: tools.NewMaintenanceTool(db), diff --git a/internal/store/thoughts.go b/internal/store/thoughts.go index 2c18df4..1819a2e 100644 --- a/internal/store/thoughts.go +++ b/internal/store/thoughts.go @@ -58,6 +58,12 @@ func (db *DB) InsertThought(ctx context.Context, thought thoughttypes.Thought, e return thoughttypes.Thought{}, fmt.Errorf("commit thought insert: %w", err) } + if len(thought.Embedding) > 0 { + created.EmbeddingStatus = "done" + } else { + created.EmbeddingStatus = "pending" + } + return created, nil } diff --git a/internal/tools/backfill.go b/internal/tools/backfill.go index c92d9c7..521a9b7 100644 --- a/internal/tools/backfill.go +++ b/internal/tools/backfill.go @@ -51,6 +51,30 @@ func NewBackfillTool(db *store.DB, provider ai.Provider, sessions *session.Activ return &BackfillTool{store: db, provider: provider, sessions: sessions, logger: logger} } +// QueueThought queues a single thought for background embedding generation. +// It is used by capture when the embedding provider is temporarily unavailable. +func (t *BackfillTool) QueueThought(ctx context.Context, id uuid.UUID, content string) { + go func() { + vec, err := t.provider.Embed(ctx, content) + if err != nil { + t.logger.Warn("background embedding retry failed", + slog.String("thought_id", id.String()), + slog.String("error", err.Error()), + ) + return + } + model := t.provider.EmbeddingModel() + if err := t.store.UpsertEmbedding(ctx, id, model, vec); err != nil { + t.logger.Warn("background embedding upsert failed", + slog.String("thought_id", id.String()), + slog.String("error", err.Error()), + ) + return + } + t.logger.Info("background embedding retry succeeded", slog.String("thought_id", id.String())) + }() +} + func (t *BackfillTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in BackfillInput) (*mcp.CallToolResult, BackfillOutput, error) { limit := in.Limit if limit <= 0 { diff --git a/internal/tools/capture.go b/internal/tools/capture.go index bd90bba..7483ec0 100644 --- a/internal/tools/capture.go +++ b/internal/tools/capture.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "github.com/google/uuid" "github.com/modelcontextprotocol/go-sdk/mcp" "golang.org/x/sync/errgroup" @@ -17,6 +18,11 @@ import ( thoughttypes "git.warky.dev/wdevs/amcs/internal/types" ) +// EmbeddingQueuer queues a thought for background embedding generation. +type EmbeddingQueuer interface { + QueueThought(ctx context.Context, id uuid.UUID, content string) +} + type CaptureTool struct { store *store.DB provider ai.Provider @@ -24,6 +30,7 @@ type CaptureTool struct { sessions *session.ActiveProjects metadataTimeout time.Duration retryer *MetadataRetryer + embedRetryer EmbeddingQueuer log *slog.Logger } @@ -36,8 +43,8 @@ type CaptureOutput struct { Thought thoughttypes.Thought `json:"thought"` } -func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, retryer *MetadataRetryer, log *slog.Logger) *CaptureTool { - return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, retryer: retryer, log: log} +func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, retryer *MetadataRetryer, embedRetryer EmbeddingQueuer, log *slog.Logger) *CaptureTool { + return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, retryer: retryer, embedRetryer: embedRetryer, log: log} } func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in CaptureInput) (*mcp.CallToolResult, CaptureOutput, error) { @@ -54,12 +61,18 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C var embedding []float32 rawMetadata := metadata.Fallback(t.capture) metadataNeedsRetry := false + embeddingNeedsRetry := false group, groupCtx := errgroup.WithContext(ctx) group.Go(func() error { vector, err := t.provider.Embed(groupCtx, content) if err != nil { - return err + t.log.Warn("embedding failed, thought will be saved without embedding", + slog.String("provider", t.provider.Name()), + slog.String("error", err.Error()), + ) + embeddingNeedsRetry = true + return nil } embedding = vector return nil @@ -106,6 +119,9 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C if metadataNeedsRetry && t.retryer != nil { t.retryer.QueueThought(created.ID) } + if embeddingNeedsRetry && t.embedRetryer != nil { + t.embedRetryer.QueueThought(ctx, created.ID, content) + } return nil, CaptureOutput{Thought: created}, nil } diff --git a/internal/types/thought.go b/internal/types/thought.go index 3a68021..e122e29 100644 --- a/internal/types/thought.go +++ b/internal/types/thought.go @@ -52,14 +52,15 @@ type StoredFileFilter struct { } type Thought struct { - ID uuid.UUID `json:"id"` - Content string `json:"content"` - Embedding []float32 `json:"embedding,omitempty"` - Metadata ThoughtMetadata `json:"metadata"` - ProjectID *uuid.UUID `json:"project_id,omitempty"` - ArchivedAt *time.Time `json:"archived_at,omitempty"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` + ID uuid.UUID `json:"id"` + Content string `json:"content"` + Embedding []float32 `json:"embedding,omitempty"` + EmbeddingStatus string `json:"embedding_status,omitempty"` + Metadata ThoughtMetadata `json:"metadata"` + ProjectID *uuid.UUID `json:"project_id,omitempty"` + ArchivedAt *time.Time `json:"archived_at,omitempty"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` } type SearchResult struct {