feat(backfill): implement backfill tool for generating missing embeddings
This commit is contained in:
141
internal/tools/backfill.go
Normal file
141
internal/tools/backfill.go
Normal file
@@ -0,0 +1,141 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
"golang.org/x/sync/semaphore"
|
||||
|
||||
"git.warky.dev/wdevs/amcs/internal/ai"
|
||||
"git.warky.dev/wdevs/amcs/internal/session"
|
||||
"git.warky.dev/wdevs/amcs/internal/store"
|
||||
)
|
||||
|
||||
const backfillConcurrency = 4
|
||||
|
||||
type BackfillTool struct {
|
||||
store *store.DB
|
||||
provider ai.Provider
|
||||
sessions *session.ActiveProjects
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
type BackfillInput struct {
|
||||
Project string `json:"project,omitempty" jsonschema:"optional project name or id to scope the backfill"`
|
||||
Limit int `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"`
|
||||
IncludeArchived bool `json:"include_archived,omitempty" jsonschema:"whether to include archived thoughts; defaults to false"`
|
||||
OlderThanDays int `json:"older_than_days,omitempty" jsonschema:"only backfill thoughts older than N days; 0 means no restriction"`
|
||||
DryRun bool `json:"dry_run,omitempty" jsonschema:"report counts and sample ids without generating embeddings"`
|
||||
}
|
||||
|
||||
type BackfillFailure struct {
|
||||
ID string `json:"id"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
type BackfillOutput struct {
|
||||
Model string `json:"model"`
|
||||
Scanned int `json:"scanned"`
|
||||
Embedded int `json:"embedded"`
|
||||
Skipped int `json:"skipped"`
|
||||
Failed int `json:"failed"`
|
||||
DryRun bool `json:"dry_run"`
|
||||
Failures []BackfillFailure `json:"failures,omitempty"`
|
||||
}
|
||||
|
||||
func NewBackfillTool(db *store.DB, provider ai.Provider, sessions *session.ActiveProjects, logger *slog.Logger) *BackfillTool {
|
||||
return &BackfillTool{store: db, provider: provider, sessions: sessions, logger: logger}
|
||||
}
|
||||
|
||||
func (t *BackfillTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in BackfillInput) (*mcp.CallToolResult, BackfillOutput, error) {
|
||||
limit := in.Limit
|
||||
if limit <= 0 {
|
||||
limit = 100
|
||||
}
|
||||
|
||||
project, err := resolveProject(ctx, t.store, t.sessions, req, in.Project, false)
|
||||
if err != nil {
|
||||
return nil, BackfillOutput{}, err
|
||||
}
|
||||
|
||||
var projectID *uuid.UUID
|
||||
if project != nil {
|
||||
projectID = &project.ID
|
||||
}
|
||||
|
||||
model := t.provider.EmbeddingModel()
|
||||
|
||||
thoughts, err := t.store.ListThoughtsMissingEmbedding(ctx, model, limit, projectID, in.IncludeArchived, in.OlderThanDays)
|
||||
if err != nil {
|
||||
return nil, BackfillOutput{}, err
|
||||
}
|
||||
|
||||
out := BackfillOutput{
|
||||
Model: model,
|
||||
Scanned: len(thoughts),
|
||||
DryRun: in.DryRun,
|
||||
}
|
||||
|
||||
if in.DryRun || len(thoughts) == 0 {
|
||||
return nil, out, nil
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
sem := semaphore.NewWeighted(backfillConcurrency)
|
||||
var mu sync.Mutex
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, thought := range thoughts {
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
if err := sem.Acquire(ctx, 1); err != nil {
|
||||
break
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(id uuid.UUID, content string) {
|
||||
defer wg.Done()
|
||||
defer sem.Release(1)
|
||||
|
||||
vec, embedErr := t.provider.Embed(ctx, content)
|
||||
if embedErr != nil {
|
||||
mu.Lock()
|
||||
out.Failures = append(out.Failures, BackfillFailure{ID: id.String(), Error: embedErr.Error()})
|
||||
mu.Unlock()
|
||||
t.logger.Warn("backfill embed failed", slog.String("thought_id", id.String()), slog.String("error", embedErr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
if upsertErr := t.store.UpsertEmbedding(ctx, id, model, vec); upsertErr != nil {
|
||||
mu.Lock()
|
||||
out.Failures = append(out.Failures, BackfillFailure{ID: id.String(), Error: upsertErr.Error()})
|
||||
mu.Unlock()
|
||||
t.logger.Warn("backfill upsert failed", slog.String("thought_id", id.String()), slog.String("error", upsertErr.Error()))
|
||||
return
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
out.Embedded++
|
||||
mu.Unlock()
|
||||
}(thought.ID, thought.Content)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
out.Failed = len(out.Failures)
|
||||
out.Skipped = out.Scanned - out.Embedded - out.Failed
|
||||
|
||||
t.logger.Info("backfill completed",
|
||||
slog.String("model", model),
|
||||
slog.Int("scanned", out.Scanned),
|
||||
slog.Int("embedded", out.Embedded),
|
||||
slog.Int("failed", out.Failed),
|
||||
slog.Duration("duration", time.Since(start)),
|
||||
)
|
||||
|
||||
return nil, out, nil
|
||||
}
|
||||
@@ -72,11 +72,7 @@ func (t *ContextTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in P
|
||||
|
||||
query := strings.TrimSpace(in.Query)
|
||||
if query != "" {
|
||||
embedding, err := t.provider.Embed(ctx, query)
|
||||
if err != nil {
|
||||
return nil, ProjectContextOutput{}, err
|
||||
}
|
||||
semantic, err := t.store.SearchSimilarThoughts(ctx, embedding, t.provider.EmbeddingModel(), t.search.DefaultThreshold, limit, &project.ID, nil)
|
||||
semantic, err := semanticSearch(ctx, t.store, t.provider, t.search, query, limit, t.search.DefaultThreshold, &project.ID, nil)
|
||||
if err != nil {
|
||||
return nil, ProjectContextOutput{}, err
|
||||
}
|
||||
|
||||
@@ -117,11 +117,7 @@ func (t *LinksTool) Related(ctx context.Context, _ *mcp.CallToolRequest, in Rela
|
||||
}
|
||||
|
||||
if includeSemantic {
|
||||
embedding, err := t.provider.Embed(ctx, thought.Content)
|
||||
if err != nil {
|
||||
return nil, RelatedOutput{}, err
|
||||
}
|
||||
semantic, err := t.store.SearchSimilarThoughts(ctx, embedding, t.provider.EmbeddingModel(), t.search.DefaultThreshold, t.search.DefaultLimit, thought.ProjectID, &thought.ID)
|
||||
semantic, err := semanticSearch(ctx, t.store, t.provider, t.search, thought.Content, t.search.DefaultLimit, t.search.DefaultThreshold, thought.ProjectID, &thought.ID)
|
||||
if err != nil {
|
||||
return nil, RelatedOutput{}, err
|
||||
}
|
||||
|
||||
@@ -48,17 +48,13 @@ func (t *RecallTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in Re
|
||||
}
|
||||
|
||||
limit := normalizeLimit(in.Limit, t.search)
|
||||
embedding, err := t.provider.Embed(ctx, query)
|
||||
if err != nil {
|
||||
return nil, RecallOutput{}, err
|
||||
}
|
||||
|
||||
var projectID *uuid.UUID
|
||||
if project != nil {
|
||||
projectID = &project.ID
|
||||
}
|
||||
|
||||
semantic, err := t.store.SearchSimilarThoughts(ctx, embedding, t.provider.EmbeddingModel(), t.search.DefaultThreshold, limit, projectID, nil)
|
||||
semantic, err := semanticSearch(ctx, t.store, t.provider, t.search, query, limit, t.search.DefaultThreshold, projectID, nil)
|
||||
if err != nil {
|
||||
return nil, RecallOutput{}, err
|
||||
}
|
||||
|
||||
41
internal/tools/retrieval.go
Normal file
41
internal/tools/retrieval.go
Normal file
@@ -0,0 +1,41 @@
|
||||
package tools
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"git.warky.dev/wdevs/amcs/internal/ai"
|
||||
"git.warky.dev/wdevs/amcs/internal/config"
|
||||
"git.warky.dev/wdevs/amcs/internal/store"
|
||||
thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
|
||||
)
|
||||
|
||||
// semanticSearch runs vector similarity search if embeddings exist for the active model
|
||||
// in the given scope, otherwise falls back to Postgres full-text search.
|
||||
func semanticSearch(
|
||||
ctx context.Context,
|
||||
db *store.DB,
|
||||
provider ai.Provider,
|
||||
search config.SearchConfig,
|
||||
query string,
|
||||
limit int,
|
||||
threshold float64,
|
||||
projectID *uuid.UUID,
|
||||
excludeID *uuid.UUID,
|
||||
) ([]thoughttypes.SearchResult, error) {
|
||||
hasEmbeddings, err := db.HasEmbeddingsForModel(ctx, provider.EmbeddingModel(), projectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if hasEmbeddings {
|
||||
embedding, err := provider.Embed(ctx, query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return db.SearchSimilarThoughts(ctx, embedding, provider.EmbeddingModel(), threshold, limit, projectID, excludeID)
|
||||
}
|
||||
|
||||
return db.SearchThoughtsText(ctx, query, limit, projectID, excludeID)
|
||||
}
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/modelcontextprotocol/go-sdk/mcp"
|
||||
|
||||
"git.warky.dev/wdevs/amcs/internal/ai"
|
||||
@@ -44,24 +45,18 @@ func (t *SearchTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in Se
|
||||
limit := normalizeLimit(in.Limit, t.search)
|
||||
threshold := normalizeThreshold(in.Threshold, t.search.DefaultThreshold)
|
||||
|
||||
embedding, err := t.provider.Embed(ctx, query)
|
||||
if err != nil {
|
||||
return nil, SearchOutput{}, err
|
||||
}
|
||||
|
||||
project, err := resolveProject(ctx, t.store, t.sessions, req, in.Project, false)
|
||||
if err != nil {
|
||||
return nil, SearchOutput{}, err
|
||||
}
|
||||
|
||||
model := t.provider.EmbeddingModel()
|
||||
var results []thoughttypes.SearchResult
|
||||
var projectID *uuid.UUID
|
||||
if project != nil {
|
||||
results, err = t.store.SearchSimilarThoughts(ctx, embedding, model, threshold, limit, &project.ID, nil)
|
||||
projectID = &project.ID
|
||||
_ = t.store.TouchProject(ctx, project.ID)
|
||||
} else {
|
||||
results, err = t.store.SearchThoughts(ctx, embedding, model, threshold, limit, map[string]any{})
|
||||
}
|
||||
|
||||
results, err := semanticSearch(ctx, t.store, t.provider, t.search, query, limit, threshold, projectID, nil)
|
||||
if err != nil {
|
||||
return nil, SearchOutput{}, err
|
||||
}
|
||||
|
||||
@@ -48,15 +48,11 @@ func (t *SummarizeTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in
|
||||
count := 0
|
||||
|
||||
if query != "" {
|
||||
embedding, err := t.provider.Embed(ctx, query)
|
||||
if err != nil {
|
||||
return nil, SummarizeOutput{}, err
|
||||
}
|
||||
var projectID *uuid.UUID
|
||||
if project != nil {
|
||||
projectID = &project.ID
|
||||
}
|
||||
results, err := t.store.SearchSimilarThoughts(ctx, embedding, t.provider.EmbeddingModel(), t.search.DefaultThreshold, limit, projectID, nil)
|
||||
results, err := semanticSearch(ctx, t.store, t.provider, t.search, query, limit, t.search.DefaultThreshold, projectID, nil)
|
||||
if err != nil {
|
||||
return nil, SummarizeOutput{}, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user