Some checks failed
CI / build-and-test (push) Failing after -31m12s
All internal entity lookups now use bigserial primary keys (int64) while GUIDs are retained for external/public identification. Updated store functions (TouchProject, UpdateThoughtMetadata, AddThoughtAttachment) to query by id instead of guid, added GetThoughtByID, changed semanticSearch and all tool helpers to use *int64 project IDs, and updated retry/backfill workers to use int64 thought IDs throughout.
266 lines
7.2 KiB
Go
266 lines
7.2 KiB
Go
package tools
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/modelcontextprotocol/go-sdk/mcp"
|
|
"golang.org/x/sync/semaphore"
|
|
|
|
"git.warky.dev/wdevs/amcs/internal/ai"
|
|
"git.warky.dev/wdevs/amcs/internal/config"
|
|
"git.warky.dev/wdevs/amcs/internal/metadata"
|
|
"git.warky.dev/wdevs/amcs/internal/session"
|
|
"git.warky.dev/wdevs/amcs/internal/store"
|
|
thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
|
|
)
|
|
|
|
const (
	// metadataRetryConcurrency is the semaphore weight Handle uses to cap how
	// many thoughts are retried in parallel during one tool call.
	metadataRetryConcurrency = 4
)
|
|
|
|
type MetadataRetryer struct {
|
|
backgroundCtx context.Context
|
|
store *store.DB
|
|
metadata *ai.MetadataRunner
|
|
capture config.CaptureConfig
|
|
sessions *session.ActiveProjects
|
|
metadataTimeout time.Duration
|
|
logger *slog.Logger
|
|
lock *RetryLocker
|
|
}
|
|
|
|
type RetryMetadataTool struct {
|
|
retryer *MetadataRetryer
|
|
}
|
|
|
|
// RetryLocker is an in-process, per-ID lease table: Acquire grants an ID for a
// limited time and Release hands it back early. The zero value is usable.
//
// NOTE(review): leases are not owner-tagged. If a holder outlives its TTL and
// another caller acquires the same ID, the first holder's Release also removes
// the second caller's lease.
type RetryLocker struct {
	mu    sync.Mutex          // guards locks
	locks map[int64]time.Time // ID -> lease expiry
}

// NewRetryLocker returns a RetryLocker with an initialized lease table.
func NewRetryLocker() *RetryLocker {
	return &RetryLocker{locks: make(map[int64]time.Time)}
}

// Acquire tries to lease id for ttl. It reports false while an unexpired lease
// for id exists; otherwise it records a lease ending ttl from now (replacing
// any expired one) and reports true. A non-positive ttl yields a lease that is
// already expired, so it never blocks later callers.
func (l *RetryLocker) Acquire(id int64, ttl time.Duration) bool {
	l.mu.Lock()
	defer l.mu.Unlock()

	now := time.Now()
	if l.locks == nil {
		// Lazily initialize so a zero-value RetryLocker works.
		l.locks = make(map[int64]time.Time)
	} else if expiry, held := l.locks[id]; held && now.Before(expiry) {
		return false
	}
	l.locks[id] = now.Add(ttl)
	return true
}

// Release drops any lease on id. Releasing an unknown id is a no-op.
func (l *RetryLocker) Release(id int64) {
	l.mu.Lock()
	delete(l.locks, id)
	l.mu.Unlock()
}
|
|
|
|
type (
	// RetryMetadataInput is the argument set accepted by the retry-metadata
	// MCP tool. Every field is optional.
	RetryMetadataInput struct {
		Project         string `json:"project,omitempty" jsonschema:"optional project name or id to scope the retry"`
		Limit           int    `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"`
		IncludeArchived bool   `json:"include_archived,omitempty" jsonschema:"whether to include archived thoughts; defaults to false"`
		OlderThanDays   int    `json:"older_than_days,omitempty" jsonschema:"only retry thoughts whose last metadata attempt was at least N days ago; 0 means no restriction"`
		DryRun          bool   `json:"dry_run,omitempty" jsonschema:"report counts without retrying metadata extraction"`
	}

	// RetryMetadataFailure describes one thought whose retry returned an
	// error; ID is the thought's numeric ID rendered as a string.
	RetryMetadataFailure struct {
		ID    string `json:"id"`
		Error string `json:"error"`
	}

	// RetryMetadataOutput summarizes one retry-metadata call. Scanned counts
	// the thoughts listed as pending; Retried counts those handed to a worker,
	// each of which lands in exactly one of Updated, Skipped or Failures.
	// Failed mirrors len(Failures).
	RetryMetadataOutput struct {
		Scanned  int                    `json:"scanned"`
		Retried  int                    `json:"retried"`
		Updated  int                    `json:"updated"`
		Skipped  int                    `json:"skipped"`
		Failed   int                    `json:"failed"`
		DryRun   bool                   `json:"dry_run"`
		Failures []RetryMetadataFailure `json:"failures,omitempty"`
	}
)
|
|
|
|
func NewMetadataRetryer(backgroundCtx context.Context, db *store.DB, metadataRunner *ai.MetadataRunner, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, logger *slog.Logger) *MetadataRetryer {
|
|
if backgroundCtx == nil {
|
|
backgroundCtx = context.Background()
|
|
}
|
|
return &MetadataRetryer{
|
|
backgroundCtx: backgroundCtx,
|
|
store: db,
|
|
metadata: metadataRunner,
|
|
capture: capture,
|
|
sessions: sessions,
|
|
metadataTimeout: metadataTimeout,
|
|
logger: logger,
|
|
lock: NewRetryLocker(),
|
|
}
|
|
}
|
|
|
|
func NewRetryMetadataTool(retryer *MetadataRetryer) *RetryMetadataTool {
|
|
return &RetryMetadataTool{retryer: retryer}
|
|
}
|
|
|
|
func (t *RetryMetadataTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryMetadataInput) (*mcp.CallToolResult, RetryMetadataOutput, error) {
|
|
return t.retryer.Handle(ctx, req, in)
|
|
}
|
|
|
|
func (r *MetadataRetryer) QueueThought(id int64) {
|
|
go func() {
|
|
started := time.Now()
|
|
idStr := fmt.Sprint(id)
|
|
if !r.lock.Acquire(id, 15*time.Minute) {
|
|
return
|
|
}
|
|
defer r.lock.Release(id)
|
|
|
|
r.logger.Info("background metadata started",
|
|
slog.String("thought_id", idStr),
|
|
slog.String("provider", r.metadata.PrimaryProvider()),
|
|
slog.String("model", r.metadata.PrimaryModel()),
|
|
)
|
|
updated, err := r.retryOne(r.backgroundCtx, id)
|
|
if err != nil {
|
|
r.logger.Warn("background metadata error",
|
|
slog.String("thought_id", idStr),
|
|
slog.String("provider", r.metadata.PrimaryProvider()),
|
|
slog.String("model", r.metadata.PrimaryModel()),
|
|
slog.Duration("duration", time.Since(started)),
|
|
slog.String("error", err.Error()),
|
|
)
|
|
return
|
|
}
|
|
r.logger.Info("background metadata complete",
|
|
slog.String("thought_id", idStr),
|
|
slog.String("provider", r.metadata.PrimaryProvider()),
|
|
slog.String("model", r.metadata.PrimaryModel()),
|
|
slog.Bool("updated", updated),
|
|
slog.Duration("duration", time.Since(started)),
|
|
)
|
|
}()
|
|
}
|
|
|
|
func (r *MetadataRetryer) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryMetadataInput) (*mcp.CallToolResult, RetryMetadataOutput, error) {
|
|
limit := in.Limit
|
|
if limit <= 0 {
|
|
limit = 100
|
|
}
|
|
|
|
project, err := resolveProject(ctx, r.store, r.sessions, req, in.Project, false)
|
|
if err != nil {
|
|
return nil, RetryMetadataOutput{}, err
|
|
}
|
|
|
|
var projectID *int64
|
|
if project != nil {
|
|
projectID = &project.NumericID
|
|
}
|
|
|
|
thoughts, err := r.store.ListThoughtsPendingMetadataRetry(ctx, limit, projectID, in.IncludeArchived, in.OlderThanDays)
|
|
if err != nil {
|
|
return nil, RetryMetadataOutput{}, err
|
|
}
|
|
|
|
out := RetryMetadataOutput{
|
|
Scanned: len(thoughts),
|
|
DryRun: in.DryRun,
|
|
}
|
|
if in.DryRun || len(thoughts) == 0 {
|
|
return nil, out, nil
|
|
}
|
|
|
|
sem := semaphore.NewWeighted(metadataRetryConcurrency)
|
|
var mu sync.Mutex
|
|
var wg sync.WaitGroup
|
|
|
|
for _, thought := range thoughts {
|
|
if ctx.Err() != nil {
|
|
break
|
|
}
|
|
if err := sem.Acquire(ctx, 1); err != nil {
|
|
break
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(thought thoughttypes.Thought) {
|
|
defer wg.Done()
|
|
defer sem.Release(1)
|
|
|
|
mu.Lock()
|
|
out.Retried++
|
|
mu.Unlock()
|
|
|
|
if !r.lock.Acquire(thought.ID, 15*time.Minute) {
|
|
mu.Lock()
|
|
out.Skipped++
|
|
mu.Unlock()
|
|
return
|
|
}
|
|
updated, err := r.retryOne(ctx, thought.ID)
|
|
r.lock.Release(thought.ID)
|
|
if err != nil {
|
|
mu.Lock()
|
|
out.Failures = append(out.Failures, RetryMetadataFailure{ID: fmt.Sprint(thought.ID), Error: err.Error()})
|
|
mu.Unlock()
|
|
return
|
|
}
|
|
if updated {
|
|
mu.Lock()
|
|
out.Updated++
|
|
mu.Unlock()
|
|
return
|
|
}
|
|
|
|
mu.Lock()
|
|
out.Skipped++
|
|
mu.Unlock()
|
|
}(thought)
|
|
}
|
|
|
|
wg.Wait()
|
|
out.Failed = len(out.Failures)
|
|
|
|
return nil, out, nil
|
|
}
|
|
|
|
func (r *MetadataRetryer) retryOne(ctx context.Context, id int64) (bool, error) {
|
|
thought, err := r.store.GetThoughtByID(ctx, id)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if thought.Metadata.MetadataStatus == metadata.MetadataStatusComplete {
|
|
return false, nil
|
|
}
|
|
|
|
attemptCtx := ctx
|
|
if r.metadataTimeout > 0 {
|
|
var cancel context.CancelFunc
|
|
attemptCtx, cancel = context.WithTimeout(ctx, r.metadataTimeout)
|
|
defer cancel()
|
|
}
|
|
|
|
attemptedAt := time.Now().UTC()
|
|
extracted, extractErr := r.metadata.ExtractMetadata(attemptCtx, thought.Content)
|
|
if extractErr != nil {
|
|
failedMetadata := metadata.MarkMetadataFailed(thought.Metadata, r.capture, attemptedAt, extractErr)
|
|
if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, failedMetadata); updateErr != nil {
|
|
return false, updateErr
|
|
}
|
|
return false, extractErr
|
|
}
|
|
|
|
completedMetadata := metadata.MarkMetadataComplete(metadata.SanitizeExtracted(extracted), r.capture, attemptedAt)
|
|
completedMetadata.Attachments = thought.Metadata.Attachments
|
|
if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, completedMetadata); updateErr != nil {
|
|
return false, updateErr
|
|
}
|
|
|
|
return true, nil
|
|
}
|