feat(tools): add background embedding queue for thoughts
Some checks failed
CI / build-and-test (push) Failing after -29m22s
Some checks failed
CI / build-and-test (push) Failing after -29m22s
* Implement QueueThought method in BackfillTool for embedding generation
* Update CaptureTool to utilize embedding queuer for failed embeddings
* Add EmbeddingStatus field to Thought type for tracking embedding state
This commit is contained in:
@@ -163,9 +163,10 @@ func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *st
|
|||||||
authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, accessTracker, logger)
|
authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, accessTracker, logger)
|
||||||
filesTool := tools.NewFilesTool(db, activeProjects)
|
filesTool := tools.NewFilesTool(db, activeProjects)
|
||||||
metadataRetryer := tools.NewMetadataRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
|
metadataRetryer := tools.NewMetadataRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
|
||||||
|
backfillTool := tools.NewBackfillTool(db, provider, activeProjects, logger)
|
||||||
|
|
||||||
toolSet := mcpserver.ToolSet{
|
toolSet := mcpserver.ToolSet{
|
||||||
Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, logger),
|
Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, backfillTool, logger),
|
||||||
Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects),
|
Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects),
|
||||||
List: tools.NewListTool(db, cfg.Search, activeProjects),
|
List: tools.NewListTool(db, cfg.Search, activeProjects),
|
||||||
Stats: tools.NewStatsTool(db),
|
Stats: tools.NewStatsTool(db),
|
||||||
@@ -180,7 +181,7 @@ func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *st
|
|||||||
Summarize: tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects),
|
Summarize: tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects),
|
||||||
Links: tools.NewLinksTool(db, provider, cfg.Search),
|
Links: tools.NewLinksTool(db, provider, cfg.Search),
|
||||||
Files: filesTool,
|
Files: filesTool,
|
||||||
Backfill: tools.NewBackfillTool(db, provider, activeProjects, logger),
|
Backfill: backfillTool,
|
||||||
Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger),
|
Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger),
|
||||||
RetryMetadata: tools.NewRetryMetadataTool(metadataRetryer),
|
RetryMetadata: tools.NewRetryMetadataTool(metadataRetryer),
|
||||||
Maintenance: tools.NewMaintenanceTool(db),
|
Maintenance: tools.NewMaintenanceTool(db),
|
||||||
|
|||||||
@@ -58,6 +58,12 @@ func (db *DB) InsertThought(ctx context.Context, thought thoughttypes.Thought, e
|
|||||||
return thoughttypes.Thought{}, fmt.Errorf("commit thought insert: %w", err)
|
return thoughttypes.Thought{}, fmt.Errorf("commit thought insert: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(thought.Embedding) > 0 {
|
||||||
|
created.EmbeddingStatus = "done"
|
||||||
|
} else {
|
||||||
|
created.EmbeddingStatus = "pending"
|
||||||
|
}
|
||||||
|
|
||||||
return created, nil
|
return created, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -51,6 +51,30 @@ func NewBackfillTool(db *store.DB, provider ai.Provider, sessions *session.Activ
|
|||||||
return &BackfillTool{store: db, provider: provider, sessions: sessions, logger: logger}
|
return &BackfillTool{store: db, provider: provider, sessions: sessions, logger: logger}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueThought queues a single thought for background embedding generation.
|
||||||
|
// It is used by capture when the embedding provider is temporarily unavailable.
|
||||||
|
func (t *BackfillTool) QueueThought(ctx context.Context, id uuid.UUID, content string) {
|
||||||
|
go func() {
|
||||||
|
vec, err := t.provider.Embed(ctx, content)
|
||||||
|
if err != nil {
|
||||||
|
t.logger.Warn("background embedding retry failed",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
model := t.provider.EmbeddingModel()
|
||||||
|
if err := t.store.UpsertEmbedding(ctx, id, model, vec); err != nil {
|
||||||
|
t.logger.Warn("background embedding upsert failed",
|
||||||
|
slog.String("thought_id", id.String()),
|
||||||
|
slog.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.logger.Info("background embedding retry succeeded", slog.String("thought_id", id.String()))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
func (t *BackfillTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in BackfillInput) (*mcp.CallToolResult, BackfillOutput, error) {
|
func (t *BackfillTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in BackfillInput) (*mcp.CallToolResult, BackfillOutput, error) {
|
||||||
limit := in.Limit
|
limit := in.Limit
|
||||||
if limit <= 0 {
|
if limit <= 0 {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
@@ -17,6 +18,11 @@ import (
|
|||||||
thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
|
thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// EmbeddingQueuer queues a thought for background embedding generation.
//
// CaptureTool holds one as embedRetryer and calls it after a thought has been
// stored when the embedding call failed during capture. The method returns
// nothing, so an implementation cannot report failure back to the caller.
|
||||||
|
type EmbeddingQueuer interface {
|
||||||
|
// QueueThought hands over the stored thought's id and its content text
// so an embedding can be generated for it later.
QueueThought(ctx context.Context, id uuid.UUID, content string)
|
||||||
|
}
|
||||||
|
|
||||||
type CaptureTool struct {
|
type CaptureTool struct {
|
||||||
store *store.DB
|
store *store.DB
|
||||||
provider ai.Provider
|
provider ai.Provider
|
||||||
@@ -24,6 +30,7 @@ type CaptureTool struct {
|
|||||||
sessions *session.ActiveProjects
|
sessions *session.ActiveProjects
|
||||||
metadataTimeout time.Duration
|
metadataTimeout time.Duration
|
||||||
retryer *MetadataRetryer
|
retryer *MetadataRetryer
|
||||||
|
embedRetryer EmbeddingQueuer
|
||||||
log *slog.Logger
|
log *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -36,8 +43,8 @@ type CaptureOutput struct {
|
|||||||
Thought thoughttypes.Thought `json:"thought"`
|
Thought thoughttypes.Thought `json:"thought"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, retryer *MetadataRetryer, log *slog.Logger) *CaptureTool {
|
func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, retryer *MetadataRetryer, embedRetryer EmbeddingQueuer, log *slog.Logger) *CaptureTool {
|
||||||
return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, retryer: retryer, log: log}
|
return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, retryer: retryer, embedRetryer: embedRetryer, log: log}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in CaptureInput) (*mcp.CallToolResult, CaptureOutput, error) {
|
func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in CaptureInput) (*mcp.CallToolResult, CaptureOutput, error) {
|
||||||
@@ -54,12 +61,18 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C
|
|||||||
var embedding []float32
|
var embedding []float32
|
||||||
rawMetadata := metadata.Fallback(t.capture)
|
rawMetadata := metadata.Fallback(t.capture)
|
||||||
metadataNeedsRetry := false
|
metadataNeedsRetry := false
|
||||||
|
embeddingNeedsRetry := false
|
||||||
|
|
||||||
group, groupCtx := errgroup.WithContext(ctx)
|
group, groupCtx := errgroup.WithContext(ctx)
|
||||||
group.Go(func() error {
|
group.Go(func() error {
|
||||||
vector, err := t.provider.Embed(groupCtx, content)
|
vector, err := t.provider.Embed(groupCtx, content)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
t.log.Warn("embedding failed, thought will be saved without embedding",
|
||||||
|
slog.String("provider", t.provider.Name()),
|
||||||
|
slog.String("error", err.Error()),
|
||||||
|
)
|
||||||
|
embeddingNeedsRetry = true
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
embedding = vector
|
embedding = vector
|
||||||
return nil
|
return nil
|
||||||
@@ -106,6 +119,9 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C
|
|||||||
if metadataNeedsRetry && t.retryer != nil {
|
if metadataNeedsRetry && t.retryer != nil {
|
||||||
t.retryer.QueueThought(created.ID)
|
t.retryer.QueueThought(created.ID)
|
||||||
}
|
}
|
||||||
|
if embeddingNeedsRetry && t.embedRetryer != nil {
|
||||||
|
t.embedRetryer.QueueThought(ctx, created.ID, content)
|
||||||
|
}
|
||||||
|
|
||||||
return nil, CaptureOutput{Thought: created}, nil
|
return nil, CaptureOutput{Thought: created}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -52,14 +52,15 @@ type StoredFileFilter struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Thought struct {
|
type Thought struct {
|
||||||
ID uuid.UUID `json:"id"`
|
ID uuid.UUID `json:"id"`
|
||||||
Content string `json:"content"`
|
Content string `json:"content"`
|
||||||
Embedding []float32 `json:"embedding,omitempty"`
|
Embedding []float32 `json:"embedding,omitempty"`
|
||||||
Metadata ThoughtMetadata `json:"metadata"`
|
EmbeddingStatus string `json:"embedding_status,omitempty"`
|
||||||
ProjectID *uuid.UUID `json:"project_id,omitempty"`
|
Metadata ThoughtMetadata `json:"metadata"`
|
||||||
ArchivedAt *time.Time `json:"archived_at,omitempty"`
|
ProjectID *uuid.UUID `json:"project_id,omitempty"`
|
||||||
CreatedAt time.Time `json:"created_at"`
|
ArchivedAt *time.Time `json:"archived_at,omitempty"`
|
||||||
UpdatedAt time.Time `json:"updated_at"`
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
UpdatedAt time.Time `json:"updated_at"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SearchResult struct {
|
type SearchResult struct {
|
||||||
|
|||||||
Reference in New Issue
Block a user