Files
amcs/internal/tools/backfill.go
Hein 4d107cb87e
Some checks failed
CI / build-and-test (push) Failing after -29m22s
feat(tools): add background embedding queue for thoughts
* Implement QueueThought method in BackfillTool for embedding generation
* Update CaptureTool to utilize embedding queuer for failed embeddings
* Add EmbeddingStatus field to Thought type for tracking embedding state
2026-04-11 23:37:53 +02:00

166 lines
4.8 KiB
Go

package tools
import (
"context"
"log/slog"
"sync"
"time"
"github.com/google/uuid"
"github.com/modelcontextprotocol/go-sdk/mcp"
"golang.org/x/sync/semaphore"
"git.warky.dev/wdevs/amcs/internal/ai"
"git.warky.dev/wdevs/amcs/internal/session"
"git.warky.dev/wdevs/amcs/internal/store"
)
// backfillConcurrency caps the number of embedding requests Handle runs in
// parallel during a bulk backfill.
const backfillConcurrency = 4

// BackfillTool generates missing thought embeddings: in bulk via Handle,
// or one-at-a-time in the background via QueueThought.
type BackfillTool struct {
	store    *store.DB                // persistence for thoughts and embeddings
	provider ai.Provider              // embedding generation backend
	sessions *session.ActiveProjects  // active-project state used for project resolution
	logger   *slog.Logger             // structured logger for progress and failures
}
// BackfillInput is the MCP tool input for a backfill call. All fields are
// optional; the zero value backfills up to 100 non-archived thoughts across
// all projects.
type BackfillInput struct {
	Project         string `json:"project,omitempty" jsonschema:"optional project name or id to scope the backfill"`
	Limit           int    `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"`
	IncludeArchived bool   `json:"include_archived,omitempty" jsonschema:"whether to include archived thoughts; defaults to false"`
	OlderThanDays   int    `json:"older_than_days,omitempty" jsonschema:"only backfill thoughts older than N days; 0 means no restriction"`
	// NOTE(review): the schema text below promises "sample ids", but Handle's
	// dry-run path returns counts only — confirm which is intended.
	DryRun bool `json:"dry_run,omitempty" jsonschema:"report counts and sample ids without generating embeddings"`
}
// BackfillFailure records one thought that could not be embedded during a
// backfill run, together with the error text that caused it.
type BackfillFailure struct {
	ID    string `json:"id"`
	Error string `json:"error"`
}
// BackfillOutput summarizes a backfill run. Handle maintains the invariant
// Scanned = Embedded + Failed + Skipped; Failures lists the individual
// errors when Failed > 0. On a dry run only Model, Scanned and DryRun are
// populated.
type BackfillOutput struct {
	Model    string            `json:"model"`
	Scanned  int               `json:"scanned"`
	Embedded int               `json:"embedded"`
	Skipped  int               `json:"skipped"`
	Failed   int               `json:"failed"`
	DryRun   bool              `json:"dry_run"`
	Failures []BackfillFailure `json:"failures,omitempty"`
}
// NewBackfillTool constructs a BackfillTool backed by the given store,
// embedding provider, active-project tracker, and logger.
func NewBackfillTool(db *store.DB, provider ai.Provider, sessions *session.ActiveProjects, logger *slog.Logger) *BackfillTool {
	tool := &BackfillTool{
		store:    db,
		provider: provider,
		sessions: sessions,
		logger:   logger,
	}
	return tool
}
// QueueThought queues a single thought for background embedding generation.
// It is used by capture when the embedding provider is temporarily
// unavailable: one retry is attempted in a new goroutine, and failures are
// logged at Warn level and otherwise dropped.
//
// The retry is detached from the caller's cancellation with
// context.WithoutCancel: the incoming ctx normally belongs to the capture
// request and is canceled as soon as that request completes, which would
// make every background retry fail with context.Canceled before it could
// reach the provider. Context values (e.g. tracing metadata) are preserved,
// and a timeout bounds the goroutine's lifetime so a hung provider cannot
// leak it.
func (t *BackfillTool) QueueThought(ctx context.Context, id uuid.UUID, content string) {
	// Detach from request cancellation while keeping context values.
	detached := context.WithoutCancel(ctx)
	go func() {
		ctx, cancel := context.WithTimeout(detached, time.Minute)
		defer cancel()
		vec, err := t.provider.Embed(ctx, content)
		if err != nil {
			t.logger.Warn("background embedding retry failed",
				slog.String("thought_id", id.String()),
				slog.String("error", err.Error()),
			)
			return
		}
		model := t.provider.EmbeddingModel()
		if err := t.store.UpsertEmbedding(ctx, id, model, vec); err != nil {
			t.logger.Warn("background embedding upsert failed",
				slog.String("thought_id", id.String()),
				slog.String("error", err.Error()),
			)
			return
		}
		t.logger.Info("background embedding retry succeeded", slog.String("thought_id", id.String()))
	}()
}
// Handle implements the MCP backfill tool call. It lists thoughts missing
// an embedding for the provider's current model and generates the missing
// vectors with bounded concurrency (backfillConcurrency workers).
//
// Scope is controlled by the input: optional project filter, batch limit
// (default 100), inclusion of archived thoughts, and a minimum age in days.
// With DryRun set, only counts are reported and no embeddings are generated.
//
// Per-thought problems are collected into out.Failures rather than failing
// the call; the error return is reserved for project-resolution and store
// listing failures.
func (t *BackfillTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in BackfillInput) (*mcp.CallToolResult, BackfillOutput, error) {
	limit := in.Limit
	if limit <= 0 {
		limit = 100 // default batch size per call
	}
	// Project scoping is optional; an empty in.Project yields a nil project.
	project, err := resolveProject(ctx, t.store, t.sessions, req, in.Project, false)
	if err != nil {
		return nil, BackfillOutput{}, err
	}
	var projectID *uuid.UUID
	if project != nil {
		projectID = &project.ID
	}
	model := t.provider.EmbeddingModel()
	thoughts, err := t.store.ListThoughtsMissingEmbedding(ctx, model, limit, projectID, in.IncludeArchived, in.OlderThanDays)
	if err != nil {
		return nil, BackfillOutput{}, err
	}
	out := BackfillOutput{
		Model:   model,
		Scanned: len(thoughts),
		DryRun:  in.DryRun,
	}
	if in.DryRun || len(thoughts) == 0 {
		return nil, out, nil
	}
	start := time.Now()
	// Bounded fan-out: the semaphore caps in-flight embedding requests and
	// the WaitGroup lets us join every worker before reading the counters.
	sem := semaphore.NewWeighted(backfillConcurrency)
	var mu sync.Mutex // guards out.Embedded and out.Failures below
	var wg sync.WaitGroup
	for _, thought := range thoughts {
		// Stop dispatching new work once the request is canceled;
		// already-running workers observe ctx themselves via Embed/Upsert.
		if ctx.Err() != nil {
			break
		}
		if err := sem.Acquire(ctx, 1); err != nil {
			break // Acquire only fails here on context cancellation
		}
		wg.Add(1)
		// ID and content are passed as arguments so each goroutine works on
		// its own copies, independent of the loop variable.
		go func(id uuid.UUID, content string) {
			defer wg.Done()
			defer sem.Release(1)
			vec, embedErr := t.provider.Embed(ctx, content)
			if embedErr != nil {
				mu.Lock()
				out.Failures = append(out.Failures, BackfillFailure{ID: id.String(), Error: embedErr.Error()})
				mu.Unlock()
				t.logger.Warn("backfill embed failed", slog.String("thought_id", id.String()), slog.String("error", embedErr.Error()))
				return
			}
			if upsertErr := t.store.UpsertEmbedding(ctx, id, model, vec); upsertErr != nil {
				mu.Lock()
				out.Failures = append(out.Failures, BackfillFailure{ID: id.String(), Error: upsertErr.Error()})
				mu.Unlock()
				t.logger.Warn("backfill upsert failed", slog.String("thought_id", id.String()), slog.String("error", upsertErr.Error()))
				return
			}
			mu.Lock()
			out.Embedded++
			mu.Unlock()
		}(thought.ID, thought.Content)
	}
	wg.Wait()
	out.Failed = len(out.Failures)
	// Anything scanned but neither embedded nor failed was skipped, e.g.
	// because cancellation stopped dispatch partway through the batch.
	out.Skipped = out.Scanned - out.Embedded - out.Failed
	t.logger.Info("backfill completed",
		slog.String("model", model),
		slog.Int("scanned", out.Scanned),
		slog.Int("embedded", out.Embedded),
		slog.Int("failed", out.Failed),
		slog.Duration("duration", time.Since(start)),
	)
	return nil, out, nil
}