diff --git a/README.md b/README.md index 5068e7d..cbb576a 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,7 @@ A Go MCP server for capturing and retrieving thoughts, memory, and project conte | `list_files` | Browse stored files by thought, project, or kind | | `backfill_embeddings` | Generate missing embeddings for stored thoughts | | `reparse_thought_metadata` | Re-extract and normalize metadata for stored thoughts | +| `retry_failed_metadata` | Retry metadata extraction for thoughts still pending or failed | ## Configuration @@ -115,6 +116,24 @@ Run `reparse_thought_metadata` to fix stale or inconsistent metadata by re-extra - If extraction fails for a thought, existing metadata is normalized and written only if it changes - Metadata reparse runs in parallel (4 workers); one failure does not abort the run +## Failed Metadata Retry + +`capture_thought` now stores the thought even when metadata extraction times out or fails. Those thoughts are marked with `metadata_status: "pending"` and retried in the background. Use `retry_failed_metadata` to sweep any thoughts still marked `pending` or `failed`. + +```json +{ + "project": "optional-project-name", + "limit": 100, + "include_archived": false, + "older_than_days": 1, + "dry_run": false +} +``` + +- `dry_run: true` scans only and does not call metadata extraction or write updates +- successful retries mark the thought metadata as `complete` and clear the last error +- failed retries update the retry markers so the daily sweep can pick them up again later + ## File Storage Use `save_file` to persist binary files as base64. Files can optionally be linked to a memory by passing `thought_id`, which also adds an attachment reference to that thought's metadata. @@ -148,6 +167,27 @@ List files for a thought or project with: } ``` +AMCS also supports direct authenticated HTTP uploads to `/files` for clients that want to stream file bodies instead of base64-encoding them into an MCP tool call. 
+
+Multipart upload:
+
+```bash
+curl -X POST http://localhost:8080/files \
+  -H "x-brain-key: <your-key>" \
+  -F "file=@./diagram.png" \
+  -F "project=amcs" \
+  -F "kind=image"
+```
+
+Raw body upload:
+
+```bash
+curl -X POST "http://localhost:8080/files?project=amcs&name=meeting-notes.pdf" \
+  -H "x-brain-key: <your-key>" \
+  -H "Content-Type: application/pdf" \
+  --data-binary @./meeting-notes.pdf
+```
+
 **Automatic backfill** (optional, config-gated):
 
 ```yaml
@@ -160,6 +200,17 @@ backfill:
   include_archived: false
 ```
 
+**Automatic metadata retry** (optional, config-gated):
+
+```yaml
+metadata_retry:
+  enabled: true
+  run_on_startup: true  # retry pending/failed metadata once on server start
+  interval: "24h"       # retry pending/failed metadata daily
+  max_per_run: 100
+  include_archived: false
+```
+
 **Search fallback**: when no embeddings exist for the active model in scope, `search_thoughts`, `recall_context`, `get_project_context`, `summarize_thoughts`, and `related_thoughts` automatically fall back to Postgres full-text search so results are never silently empty.
 
 ## Client Setup
diff --git a/configs/config.example.yaml b/configs/config.example.yaml
index 332b346..4b176cc 100644
--- a/configs/config.example.yaml
+++ b/configs/config.example.yaml
@@ -90,3 +90,10 @@ backfill:
   batch_size: 20
   max_per_run: 100
   include_archived: false
+
+metadata_retry:
+  enabled: false
+  run_on_startup: false
+  interval: "24h"
+  max_per_run: 100
+  include_archived: false
diff --git a/configs/dev.yaml b/configs/dev.yaml
index f665bf4..3e7766c 100644
--- a/configs/dev.yaml
+++ b/configs/dev.yaml
@@ -80,3 +80,10 @@ logging:
 observability:
   metrics_enabled: true
   pprof_enabled: false
+
+metadata_retry:
+  enabled: false
+  run_on_startup: false
+  interval: "24h"
+  max_per_run: 100
+  include_archived: false
diff --git a/configs/docker.yaml b/configs/docker.yaml
index 990c6aa..1cda2be 100644
--- a/configs/docker.yaml
+++ b/configs/docker.yaml
@@ -79,3 +79,10 @@ logging:
 observability:
   metrics_enabled: true
   pprof_enabled: false
+
+metadata_retry:
+  enabled: false
+
run_on_startup: false + interval: "24h" + max_per_run: 100 + include_archived: false diff --git a/internal/app/app.go b/internal/app/app.go index c99802a..0fe9425 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -93,6 +93,25 @@ func Run(ctx context.Context, configPath string) error { }() } + if cfg.MetadataRetry.Enabled && cfg.MetadataRetry.RunOnStartup { + go runMetadataRetryPass(ctx, db, provider, cfg, activeProjects, logger) + } + + if cfg.MetadataRetry.Enabled && cfg.MetadataRetry.Interval > 0 { + go func() { + ticker := time.NewTicker(cfg.MetadataRetry.Interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + runMetadataRetryPass(ctx, db, provider, cfg, activeProjects, logger) + } + } + }() + } + server := &http.Server{ Addr: fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port), Handler: routes(logger, cfg, db, provider, keyring, oauthRegistry, tokenStore, authCodes, dynClients, activeProjects), @@ -127,33 +146,38 @@ func Run(ctx context.Context, configPath string) error { func routes(logger *slog.Logger, cfg *config.Config, db *store.DB, provider ai.Provider, keyring *auth.Keyring, oauthRegistry *auth.OAuthRegistry, tokenStore *auth.TokenStore, authCodes *auth.AuthCodeStore, dynClients *auth.DynamicClientStore, activeProjects *session.ActiveProjects) http.Handler { mux := http.NewServeMux() + authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, logger) + filesTool := tools.NewFilesTool(db, activeProjects) + metadataRetryer := tools.NewMetadataRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger) toolSet := mcpserver.ToolSet{ - Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger), - Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects), - List: tools.NewListTool(db, cfg.Search, activeProjects), - Stats: tools.NewStatsTool(db), - Get: tools.NewGetTool(db), - 
Update: tools.NewUpdateTool(db, provider, cfg.Capture, logger), - Delete: tools.NewDeleteTool(db), - Archive: tools.NewArchiveTool(db), - Projects: tools.NewProjectsTool(db, activeProjects), - Context: tools.NewContextTool(db, provider, cfg.Search, activeProjects), - Recall: tools.NewRecallTool(db, provider, cfg.Search, activeProjects), - Summarize: tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects), - Links: tools.NewLinksTool(db, provider, cfg.Search), - Files: tools.NewFilesTool(db, activeProjects), - Backfill: tools.NewBackfillTool(db, provider, activeProjects, logger), - Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger), - Household: tools.NewHouseholdTool(db), - Maintenance: tools.NewMaintenanceTool(db), - Calendar: tools.NewCalendarTool(db), - Meals: tools.NewMealsTool(db), - CRM: tools.NewCRMTool(db), + Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, logger), + Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects), + List: tools.NewListTool(db, cfg.Search, activeProjects), + Stats: tools.NewStatsTool(db), + Get: tools.NewGetTool(db), + Update: tools.NewUpdateTool(db, provider, cfg.Capture, logger), + Delete: tools.NewDeleteTool(db), + Archive: tools.NewArchiveTool(db), + Projects: tools.NewProjectsTool(db, activeProjects), + Context: tools.NewContextTool(db, provider, cfg.Search, activeProjects), + Recall: tools.NewRecallTool(db, provider, cfg.Search, activeProjects), + Summarize: tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects), + Links: tools.NewLinksTool(db, provider, cfg.Search), + Files: filesTool, + Backfill: tools.NewBackfillTool(db, provider, activeProjects, logger), + Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger), + RetryMetadata: tools.NewRetryMetadataTool(metadataRetryer), + Household: tools.NewHouseholdTool(db), + Maintenance: 
tools.NewMaintenanceTool(db),
+		Calendar:      tools.NewCalendarTool(db),
+		Meals:         tools.NewMealsTool(db),
+		CRM:           tools.NewCRMTool(db),
 	}
 
 	mcpHandler := mcpserver.New(cfg.MCP, toolSet)
-	mux.Handle(cfg.MCP.Path, auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, logger)(mcpHandler))
+	mux.Handle(cfg.MCP.Path, authMiddleware(mcpHandler))
+	mux.Handle("/files", authMiddleware(fileUploadHandler(filesTool)))
 	if oauthRegistry != nil && tokenStore != nil {
 		mux.HandleFunc("/.well-known/oauth-authorization-server", oauthMetadataHandler())
 		mux.HandleFunc("/oauth-authorization-server", oauthMetadataHandler())
@@ -245,6 +269,25 @@ func routes(logger *slog.Logger, cfg *config.Config, db *store.DB, provider ai.P
 	)
 }
 
+func runMetadataRetryPass(ctx context.Context, db *store.DB, provider ai.Provider, cfg *config.Config, activeProjects *session.ActiveProjects, logger *slog.Logger) {
+	retryer := tools.NewMetadataRetryer(ctx, db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
+	_, out, err := retryer.Handle(ctx, nil, tools.RetryMetadataInput{
+		Limit:           cfg.MetadataRetry.MaxPerRun,
+		IncludeArchived: cfg.MetadataRetry.IncludeArchived,
+		OlderThanDays:   1,
+	})
+	if err != nil {
+		logger.Error("auto metadata retry failed", slog.String("error", err.Error()))
+		return
+	}
+	logger.Info("auto metadata retry pass",
+		slog.Int("scanned", out.Scanned),
+		slog.Int("retried", out.Retried),
+		slog.Int("updated", out.Updated),
+		slog.Int("failed", out.Failed),
+	)
+}
+
 func runBackfillPass(ctx context.Context, db *store.DB, provider ai.Provider, cfg config.BackfillConfig, logger *slog.Logger) {
 	backfiller := tools.NewBackfillTool(db, provider, nil, logger)
 	_, out, err := backfiller.Handle(ctx, nil, tools.BackfillInput{
diff --git a/internal/app/files.go b/internal/app/files.go
new file mode 100644
index 0000000..223e9a2
--- /dev/null
+++ b/internal/app/files.go
@@ -0,0 +1,140 @@
+package app
+
+import (
+	"encoding/json"
+	"errors"
+	"io"
+	"mime"
+	"net/http"
+	"strings"
+
+	"git.warky.dev/wdevs/amcs/internal/tools"
+)
+
+// maxUploadBytes caps the request body accepted by the /files endpoint (50 MiB).
+const maxUploadBytes = 50 << 20
+
+// fileUploadHandler returns the handler for POST /files. It accepts either a
+// multipart/form-data upload or a raw request body, stores the file through
+// files.SaveDecoded, and replies 201 Created with the result encoded as JSON.
+func fileUploadHandler(files *tools.FilesTool) http.Handler {
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if r.Method != http.MethodPost {
+			w.Header().Set("Allow", http.MethodPost)
+			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
+			return
+		}
+
+		r.Body = http.MaxBytesReader(w, r.Body, maxUploadBytes)
+
+		in, err := parseUploadRequest(r)
+		if err != nil {
+			http.Error(w, err.Error(), uploadErrorStatus(err))
+			return
+		}
+
+		// NOTE(review): every SaveDecoded failure is reported as 400, even one
+		// that is not the client's fault; confirm whether that is intended.
+		out, err := files.SaveDecoded(r.Context(), nil, in)
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+
+		w.Header().Set("Content-Type", "application/json")
+		w.WriteHeader(http.StatusCreated)
+		// The status is already sent, so an encode failure cannot be reported.
+		_ = json.NewEncoder(w).Encode(out)
+	})
+}
+
+// uploadErrorStatus picks the response status for a request-parsing error:
+// 413 when the body exceeded the http.MaxBytesReader limit, 400 otherwise.
+func uploadErrorStatus(err error) int {
+	var tooLarge *http.MaxBytesError
+	if errors.As(err, &tooLarge) {
+		return http.StatusRequestEntityTooLarge
+	}
+	return http.StatusBadRequest
+}
+
+// parseUploadRequest builds the save input from r, using the multipart parser
+// when the Content-Type is multipart/form-data and the raw-body parser otherwise.
+func parseUploadRequest(r *http.Request) (tools.SaveFileDecodedInput, error) {
+	// The parse error is ignored on purpose: anything that is not recognizably
+	// multipart is handled as a raw body.
+	mediaType, _, _ := mime.ParseMediaType(r.Header.Get("Content-Type"))
+	if mediaType == "multipart/form-data" {
+		return parseMultipartUpload(r)
+	}
+	return parseRawUpload(r)
+}
+
+// parseMultipartUpload reads the "file" part of a multipart form together with
+// the optional name, media_type, kind, thought_id and project form fields.
+// The uploaded file name and the part's Content-Type header are used when
+// name or media_type are not supplied.
+func parseMultipartUpload(r *http.Request) (tools.SaveFileDecodedInput, error) {
+	if err := r.ParseMultipartForm(maxUploadBytes); err != nil {
+		return tools.SaveFileDecodedInput{}, err
+	}
+
+	file, header, err := r.FormFile("file")
+	if err != nil {
+		return tools.SaveFileDecodedInput{}, errors.New("multipart upload requires a file field named \"file\"")
+	}
+	defer file.Close()
+
+	content, err := io.ReadAll(file)
+	if err != nil {
+		return tools.SaveFileDecodedInput{}, err
+	}
+
+	return tools.SaveFileDecodedInput{
+		Name:      firstNonEmpty(r.FormValue("name"), header.Filename),
+		Content:   content,
+		MediaType: firstNonEmpty(r.FormValue("media_type"), header.Header.Get("Content-Type")),
+		Kind:      r.FormValue("kind"),
+		ThoughtID: r.FormValue("thought_id"),
+		Project:   r.FormValue("project"),
+	}, nil
+}
+
+// parseRawUpload treats the whole request body as the file content. The file
+// name is required and, like kind, thought_id and project, may be given as a
+// query parameter or as an X-* header; the query parameter wins.
+func parseRawUpload(r *http.Request) (tools.SaveFileDecodedInput, error) {
+	// Check the name first so a nameless request is rejected without buffering its body.
+	name := firstNonEmpty(
+		r.URL.Query().Get("name"),
+		r.Header.Get("X-File-Name"),
+	)
+	if name == "" {
+		return tools.SaveFileDecodedInput{}, errors.New("raw upload requires a file name via query param \"name\" or X-File-Name header")
+	}
+
+	content, err := io.ReadAll(r.Body)
+	if err != nil {
+		return tools.SaveFileDecodedInput{}, err
+	}
+
+	return tools.SaveFileDecodedInput{
+		Name:      name,
+		Content:   content,
+		MediaType: r.Header.Get("Content-Type"),
+		Kind:      firstNonEmpty(r.URL.Query().Get("kind"), r.Header.Get("X-File-Kind")),
+		ThoughtID: firstNonEmpty(r.URL.Query().Get("thought_id"), r.Header.Get("X-Thought-Id")),
+		Project:   firstNonEmpty(r.URL.Query().Get("project"), r.Header.Get("X-Project")),
+	}, nil
+}
+
+// firstNonEmpty returns the first value that is non-empty after trimming
+// surrounding whitespace, in its trimmed form, or "" when there is none.
+func firstNonEmpty(values ...string) string {
+	for _, value := range values {
+		if trimmed := strings.TrimSpace(value); trimmed != "" {
+			return trimmed
+		}
+	}
+	return ""
+}
diff --git a/internal/app/files_test.go b/internal/app/files_test.go
new file mode 100644
index 0000000..085383f
--- /dev/null
+++ b/internal/app/files_test.go
@@ -0,0 +1,66 @@
+package app
+
+import (
+	"bytes"
+	"mime/multipart"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+)
+
+func TestUploadErrorStatus(t *testing.T) {
+	if got := uploadErrorStatus(&http.MaxBytesError{Limit: maxUploadBytes}); got != http.StatusRequestEntityTooLarge {
+		t.Fatalf("oversized body status = %d, want %d", got, http.StatusRequestEntityTooLarge)
+	}
+	if got := uploadErrorStatus(http.ErrMissingFile); got != http.StatusBadRequest {
+		t.Fatalf("other error status = %d, want %d", got, http.StatusBadRequest)
+	}
+}
+
+func TestParseRawUploadRequiresName(t *testing.T) {
+	req := httptest.NewRequest(http.MethodPost, "/files", bytes.NewReader([]byte("hello")))
+	req.Header.Set("Content-Type", "application/octet-stream")
+
+	_, err := parseRawUpload(req)
+	if err == nil {
+		t.Fatal("expected error for missing name")
+	}
+}
+
+func TestParseMultipartUploadUsesFileMetadata(t *testing.T) {
+	var body bytes.Buffer
+	writer := multipart.NewWriter(&body)
+
+	part, err := writer.CreateFormFile("file", "note.txt")
+	if err != nil {
+		t.Fatalf("create form file: %v", err)
+	}
+	if _, err := part.Write([]byte("hello world")); err != nil {
+		t.Fatalf("write form file: %v", err)
+	}
+	_ = writer.WriteField("project", "amcs")
+	_ = writer.WriteField("kind", 
"document") + if err := writer.Close(); err != nil { + t.Fatalf("close writer: %v", err) + } + + req := httptest.NewRequest(http.MethodPost, "/files", &body) + req.Header.Set("Content-Type", writer.FormDataContentType()) + + got, err := parseMultipartUpload(req) + if err != nil { + t.Fatalf("parse multipart upload: %v", err) + } + if got.Name != "note.txt" { + t.Fatalf("name = %q, want note.txt", got.Name) + } + if string(got.Content) != "hello world" { + t.Fatalf("content = %q, want hello world", string(got.Content)) + } + if got.Project != "amcs" { + t.Fatalf("project = %q, want amcs", got.Project) + } + if got.Kind != "document" { + t.Fatalf("kind = %q, want document", got.Kind) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 84c0daa..5cecd2e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -18,6 +18,7 @@ type Config struct { Logging LoggingConfig `yaml:"logging"` Observability ObservabilityConfig `yaml:"observability"` Backfill BackfillConfig `yaml:"backfill"` + MetadataRetry MetadataRetryConfig `yaml:"metadata_retry"` } type ServerConfig struct { @@ -152,6 +153,14 @@ type BackfillConfig struct { IncludeArchived bool `yaml:"include_archived"` } +type MetadataRetryConfig struct { + Enabled bool `yaml:"enabled"` + RunOnStartup bool `yaml:"run_on_startup"` + Interval time.Duration `yaml:"interval"` + MaxPerRun int `yaml:"max_per_run"` + IncludeArchived bool `yaml:"include_archived"` +} + func (c AIMetadataConfig) EffectiveFallbackModels() []string { models := make([]string, 0, len(c.FallbackModels)+1) for _, model := range c.FallbackModels { diff --git a/internal/config/loader.go b/internal/config/loader.go index 2448248..00aca67 100644 --- a/internal/config/loader.go +++ b/internal/config/loader.go @@ -103,6 +103,12 @@ func defaultConfig() Config { BatchSize: 20, MaxPerRun: 100, }, + MetadataRetry: MetadataRetryConfig{ + Enabled: false, + RunOnStartup: false, + Interval: 24 * time.Hour, + MaxPerRun: 100, + 
}, } } diff --git a/internal/config/validate.go b/internal/config/validate.go index 2280f7c..020203b 100644 --- a/internal/config/validate.go +++ b/internal/config/validate.go @@ -89,6 +89,11 @@ func (c Config) Validate() error { return fmt.Errorf("invalid config: backfill.max_per_run must be >= backfill.batch_size") } } + if c.MetadataRetry.Enabled { + if c.MetadataRetry.MaxPerRun <= 0 { + return fmt.Errorf("invalid config: metadata_retry.max_per_run must be greater than zero when metadata_retry is enabled") + } + } return nil } diff --git a/internal/config/validate_test.go b/internal/config/validate_test.go index a701c0d..06bce4d 100644 --- a/internal/config/validate_test.go +++ b/internal/config/validate_test.go @@ -111,3 +111,13 @@ func TestValidateRejectsEmptyAuth(t *testing.T) { t.Fatal("Validate() error = nil, want error when neither auth.keys nor auth.oauth.clients is configured") } } + +func TestValidateRejectsInvalidMetadataRetryConfig(t *testing.T) { + cfg := validConfig() + cfg.MetadataRetry.Enabled = true + cfg.MetadataRetry.MaxPerRun = 0 + + if err := cfg.Validate(); err == nil { + t.Fatal("Validate() error = nil, want error for invalid metadata retry config") + } +} diff --git a/internal/mcpserver/server.go b/internal/mcpserver/server.go index 44904a2..082f604 100644 --- a/internal/mcpserver/server.go +++ b/internal/mcpserver/server.go @@ -11,27 +11,28 @@ import ( ) type ToolSet struct { - Capture *tools.CaptureTool - Search *tools.SearchTool - List *tools.ListTool - Stats *tools.StatsTool - Get *tools.GetTool - Update *tools.UpdateTool - Delete *tools.DeleteTool - Archive *tools.ArchiveTool - Projects *tools.ProjectsTool - Context *tools.ContextTool - Recall *tools.RecallTool - Summarize *tools.SummarizeTool - Links *tools.LinksTool - Files *tools.FilesTool - Backfill *tools.BackfillTool - Reparse *tools.ReparseMetadataTool - Household *tools.HouseholdTool - Maintenance *tools.MaintenanceTool - Calendar *tools.CalendarTool - Meals *tools.MealsTool - 
CRM *tools.CRMTool
+	Capture       *tools.CaptureTool
+	Search        *tools.SearchTool
+	List          *tools.ListTool
+	Stats         *tools.StatsTool
+	Get           *tools.GetTool
+	Update        *tools.UpdateTool
+	Delete        *tools.DeleteTool
+	Archive       *tools.ArchiveTool
+	Projects      *tools.ProjectsTool
+	Context       *tools.ContextTool
+	Recall        *tools.RecallTool
+	Summarize     *tools.SummarizeTool
+	Links         *tools.LinksTool
+	Files         *tools.FilesTool
+	Backfill      *tools.BackfillTool
+	Reparse       *tools.ReparseMetadataTool
+	RetryMetadata *tools.RetryMetadataTool
+	Household     *tools.HouseholdTool
+	Maintenance   *tools.MaintenanceTool
+	Calendar      *tools.CalendarTool
+	Meals         *tools.MealsTool
+	CRM           *tools.CRMTool
 }
 
 func New(cfg config.MCPConfig, toolSet ToolSet) http.Handler {
@@ -150,6 +151,11 @@ func New(cfg config.MCPConfig, toolSet ToolSet) http.Handler {
 		Description: "Re-extract and normalize metadata for stored thoughts from their content.",
 	}, toolSet.Reparse.Handle)
 
+	addTool(server, &mcp.Tool{
+		Name:        "retry_failed_metadata",
+		Description: "Retry metadata extraction for thoughts still marked pending or failed.",
+	}, toolSet.RetryMetadata.Handle)
+
 	// Household Knowledge
 	addTool(server, &mcp.Tool{
 		Name:        "add_household_item",
diff --git a/internal/metadata/normalize.go b/internal/metadata/normalize.go
index 29529e9..7237c2e 100644
--- a/internal/metadata/normalize.go
+++ b/internal/metadata/normalize.go
@@ -3,15 +3,19 @@ package metadata
 import (
 	"sort"
 	"strings"
+	"time"
 
 	"git.warky.dev/wdevs/amcs/internal/config"
 	thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
 )
 
 const (
-	DefaultType          = "observation"
-	DefaultTopicFallback = "uncategorized"
-	maxTopics            = 10
+	DefaultType            = "observation"
+	DefaultTopicFallback   = "uncategorized"
+	MetadataStatusComplete = "complete"
+	MetadataStatusPending  = "pending"
+	MetadataStatusFailed   = "failed"
+	maxTopics              = 10
 )
 
 var allowedTypes = map[string]struct{}{
@@ -36,18 +40,23 @@ func Fallback(capture config.CaptureConfig) thoughttypes.ThoughtMetadata {
 		Type:        normalizeType(capture.MetadataDefaults.Type),
 		Source:      normalizeSource(capture.Source),
 		Attachments: []thoughttypes.ThoughtAttachment{},
+		MetadataStatus: MetadataStatusComplete,
 	}
 }
 
 func Normalize(in thoughttypes.ThoughtMetadata, capture config.CaptureConfig) thoughttypes.ThoughtMetadata {
 	out := thoughttypes.ThoughtMetadata{
-		People:         normalizeList(in.People, 0),
-		ActionItems:    normalizeList(in.ActionItems, 0),
-		DatesMentioned: normalizeList(in.DatesMentioned, 0),
-		Topics:         normalizeList(in.Topics, maxTopics),
-		Type:           normalizeType(in.Type),
-		Source:         normalizeSource(in.Source),
-		Attachments:    normalizeAttachments(in.Attachments),
+		People:                  normalizeList(in.People, 0),
+		ActionItems:             normalizeList(in.ActionItems, 0),
+		DatesMentioned:          normalizeList(in.DatesMentioned, 0),
+		Topics:                  normalizeList(in.Topics, maxTopics),
+		Type:                    normalizeType(in.Type),
+		Source:                  normalizeSource(in.Source),
+		Attachments:             normalizeAttachments(in.Attachments),
+		MetadataStatus:          normalizeMetadataStatus(in.MetadataStatus),
+		MetadataUpdatedAt:       strings.TrimSpace(in.MetadataUpdatedAt),
+		MetadataLastAttemptedAt: strings.TrimSpace(in.MetadataLastAttemptedAt),
+		MetadataError:           strings.TrimSpace(in.MetadataError),
 	}
 
 	if len(out.Topics) == 0 {
@@ -59,10 +68,52 @@ func Normalize(in thoughttypes.ThoughtMetadata, capture config.CaptureConfig) th
 	if out.Source == "" {
 		out.Source = Fallback(capture).Source
 	}
+	// normalizeMetadataStatus never returns an empty status, so no default is
+	// needed here; a complete record must not carry a stale error.
+	if out.MetadataStatus == MetadataStatusComplete {
+		out.MetadataError = ""
+	}
 
 	return out
 }
 
+// MarkMetadataPending returns base normalized and flagged as pending after an
+// extraction attempt at the given time. A non-nil err replaces the recorded
+// error; MetadataUpdatedAt is left as it was in base.
+func MarkMetadataPending(base thoughttypes.ThoughtMetadata, capture config.CaptureConfig, attempt time.Time, err error) thoughttypes.ThoughtMetadata {
+	return markMetadataAttempt(base, capture, MetadataStatusPending, attempt, err)
+}
+
+// MarkMetadataFailed is like MarkMetadataPending but flags the metadata as failed.
+func MarkMetadataFailed(base thoughttypes.ThoughtMetadata, capture config.CaptureConfig, attempt time.Time, err error) thoughttypes.ThoughtMetadata {
+	return markMetadataAttempt(base, capture, MetadataStatusFailed, attempt, err)
+}
+
+// markMetadataAttempt records an unsuccessful extraction attempt with the
+// given status. Normalize already carries over the trimmed MetadataUpdatedAt
+// from base, so it is not set again here.
+func markMetadataAttempt(base thoughttypes.ThoughtMetadata, capture config.CaptureConfig, status string, attempt time.Time, err error) thoughttypes.ThoughtMetadata {
+	out := Normalize(base, capture)
+	out.MetadataStatus = status
+	out.MetadataLastAttemptedAt = attempt.UTC().Format(time.RFC3339)
+	if err != nil {
+		out.MetadataError = strings.TrimSpace(err.Error())
+	}
+	return out
+}
+
+// MarkMetadataComplete returns base normalized and flagged as complete, with
+// both timestamps set to updatedAt (UTC, RFC 3339) and the error cleared.
+func MarkMetadataComplete(base thoughttypes.ThoughtMetadata, capture config.CaptureConfig, updatedAt time.Time) thoughttypes.ThoughtMetadata {
+	out := Normalize(base, capture)
+	out.MetadataStatus = MetadataStatusComplete
+	timestamp := updatedAt.UTC().Format(time.RFC3339)
+	out.MetadataUpdatedAt = timestamp
+	out.MetadataLastAttemptedAt = timestamp
+	out.MetadataError = ""
+	return out
+}
+
 func normalizeList(values []string, limit int) []string {
 	seen := make(map[string]struct{}, len(values))
 	result := make([]string, 0, len(values))
@@ -100,6 +151,17 @@ func normalizeType(value string) string {
 	return DefaultType
 }
 
+// normalizeMetadataStatus maps value, ignoring case and surrounding whitespace,
+// to pending or failed; anything else, including empty, is treated as complete.
+func normalizeMetadataStatus(value string) string {
+	switch status := strings.ToLower(strings.TrimSpace(value)); status {
+	case MetadataStatusPending, MetadataStatusFailed:
+		return status
+	default:
+		return MetadataStatusComplete
+	}
+}
+
 func normalizeSource(value string) string {
 	normalized := strings.TrimSpace(value)
 	if normalized == "" {
diff --git a/internal/metadata/normalize_test.go b/internal/metadata/normalize_test.go
index f09b3a0..aa75b54 100644
--- a/internal/metadata/normalize_test.go
+++ b/internal/metadata/normalize_test.go
@@ -3,6 +3,7 @@ package metadata
 import (
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/google/uuid"
 
@@ -31,6 +32,9 @@ func TestFallbackUsesConfiguredDefaults(t *testing.T) {
 	if got.Source != "mcp" {
t.Fatalf("Fallback source = %q, want mcp", got.Source) } + if got.MetadataStatus != MetadataStatusComplete { + t.Fatalf("Fallback metadata status = %q, want complete", got.MetadataStatus) + } } func TestNormalizeTrimsDedupesAndCapsTopics(t *testing.T) { @@ -102,3 +106,56 @@ func TestNormalizeDedupesAttachmentsByFileID(t *testing.T) { t.Fatalf("Attachment kind = %q, want image", got.Attachments[0].Kind) } } + +func TestMarkMetadataPendingTracksAttemptWithoutClearingPreviousSuccess(t *testing.T) { + attempt := time.Date(2026, 3, 30, 10, 0, 0, 0, time.UTC) + base := thoughttypes.ThoughtMetadata{ + Topics: []string{"go"}, + MetadataUpdatedAt: "2026-03-29T10:00:00Z", + } + + got := MarkMetadataPending(base, testCaptureConfig(), attempt, errTestMetadataFailure) + if got.MetadataStatus != MetadataStatusPending { + t.Fatalf("MetadataStatus = %q, want pending", got.MetadataStatus) + } + if got.MetadataUpdatedAt != "2026-03-29T10:00:00Z" { + t.Fatalf("MetadataUpdatedAt = %q, want previous success timestamp", got.MetadataUpdatedAt) + } + if got.MetadataLastAttemptedAt != "2026-03-30T10:00:00Z" { + t.Fatalf("MetadataLastAttemptedAt = %q, want attempt timestamp", got.MetadataLastAttemptedAt) + } + if got.MetadataError == "" { + t.Fatal("MetadataError is empty, want failure message") + } +} + +func TestMarkMetadataCompleteClearsErrorAndSetsTimestamps(t *testing.T) { + attempt := time.Date(2026, 3, 30, 10, 0, 0, 0, time.UTC) + + got := MarkMetadataComplete(thoughttypes.ThoughtMetadata{ + Topics: []string{"go"}, + MetadataStatus: MetadataStatusFailed, + MetadataError: "timeout", + }, testCaptureConfig(), attempt) + + if got.MetadataStatus != MetadataStatusComplete { + t.Fatalf("MetadataStatus = %q, want complete", got.MetadataStatus) + } + if got.MetadataUpdatedAt != "2026-03-30T10:00:00Z" { + t.Fatalf("MetadataUpdatedAt = %q, want completion timestamp", got.MetadataUpdatedAt) + } + if got.MetadataLastAttemptedAt != "2026-03-30T10:00:00Z" { + t.Fatalf("MetadataLastAttemptedAt = 
%q, want completion timestamp", got.MetadataLastAttemptedAt)
+	}
+	if got.MetadataError != "" {
+		t.Fatalf("MetadataError = %q, want empty", got.MetadataError)
+	}
+}
+
+var errTestMetadataFailure = testError("timeout")
+
+type testError string
+
+func (e testError) Error() string {
+	return string(e)
+}
diff --git a/internal/store/thoughts.go b/internal/store/thoughts.go
index af2a562..e1a09bb 100644
--- a/internal/store/thoughts.go
+++ b/internal/store/thoughts.go
@@ -277,6 +277,28 @@ func (db *DB) UpdateThought(ctx context.Context, id uuid.UUID, content string, e
 	return db.GetThought(ctx, id)
 }
 
+func (db *DB) UpdateThoughtMetadata(ctx context.Context, id uuid.UUID, metadata thoughttypes.ThoughtMetadata) (thoughttypes.Thought, error) {
+	metadataBytes, err := json.Marshal(metadata)
+	if err != nil {
+		return thoughttypes.Thought{}, fmt.Errorf("marshal updated metadata: %w", err)
+	}
+
+	tag, err := db.pool.Exec(ctx, `
+		update thoughts
+		set metadata = $2::jsonb,
+		    updated_at = now()
+		where guid = $1
+	`, id, metadataBytes)
+	if err != nil {
+		return thoughttypes.Thought{}, fmt.Errorf("update thought metadata: %w", err)
+	}
+	if tag.RowsAffected() == 0 {
+		return thoughttypes.Thought{}, pgx.ErrNoRows
+	}
+
+	return db.GetThought(ctx, id)
+}
+
 func (db *DB) DeleteThought(ctx context.Context, id uuid.UUID) error {
 	tag, err := db.pool.Exec(ctx, `delete from thoughts where guid = $1`, id)
 	if err != nil {
@@ -309,6 +331,70 @@ func (db *DB) RecentThoughts(ctx context.Context, projectID *uuid.UUID, limit in
 	return db.ListThoughts(ctx, filter)
 }
 
+// ListThoughtsPendingMetadataRetry returns up to limit thoughts whose
+// metadata_status is pending or failed, least recently attempted first
+// (falling back to created_at when no attempt is recorded). A non-nil
+// projectID restricts the result to that project, archived thoughts are
+// skipped unless includeArchived is set, and olderThanDays > 0 keeps only
+// thoughts whose last attempt is at least that many days old.
+func (db *DB) ListThoughtsPendingMetadataRetry(ctx context.Context, limit int, projectID *uuid.UUID, includeArchived bool, olderThanDays int) ([]thoughttypes.Thought, error) {
+	// Reject a negative limit up front: it is used as a slice capacity below,
+	// where it would panic instead of producing an error.
+	if limit < 0 {
+		return nil, fmt.Errorf("list thoughts pending metadata retry: limit must not be negative, got %d", limit)
+	}
+
+	args := make([]any, 0, 4)
+	conditions := []string{
+		"(metadata->>'metadata_status' = 'pending' or metadata->>'metadata_status' = 'failed')",
+	}
+
+	if !includeArchived {
+		conditions = append(conditions, "archived_at is null")
+	}
+	if projectID != nil {
+		args = append(args, *projectID)
+		conditions = append(conditions, fmt.Sprintf("project_id = $%d", len(args)))
+	}
+	if olderThanDays > 0 {
+		args = append(args, olderThanDays)
+		conditions = append(conditions, fmt.Sprintf("coalesce(nullif(metadata->>'metadata_last_attempted_at', '')::timestamptz, created_at) <= now() - ($%d * interval '1 day')", len(args)))
+	}
+
+	query := `
+		select guid, content, metadata, project_id, archived_at, created_at, updated_at
+		from thoughts
+		where ` + strings.Join(conditions, " and ")
+
+	args = append(args, limit)
+	query += fmt.Sprintf(" order by coalesce(nullif(metadata->>'metadata_last_attempted_at', '')::timestamptz, created_at) asc limit $%d", len(args))
+
+	rows, err := db.pool.Query(ctx, query, args...)
+	if err != nil {
+		return nil, fmt.Errorf("list thoughts pending metadata retry: %w", err)
+	}
+	defer rows.Close()
+
+	thoughts := make([]thoughttypes.Thought, 0, limit)
+	for rows.Next() {
+		var thought thoughttypes.Thought
+		var metadataBytes []byte
+		if err := rows.Scan(&thought.ID, &thought.Content, &metadataBytes, &thought.ProjectID, &thought.ArchivedAt, &thought.CreatedAt, &thought.UpdatedAt); err != nil {
+			return nil, fmt.Errorf("scan pending metadata retry thought: %w", err)
+		}
+		if err := json.Unmarshal(metadataBytes, &thought.Metadata); err != nil {
+			return nil, fmt.Errorf("decode pending metadata retry thought: %w", err)
+		}
+		thoughts = append(thoughts, thought)
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate pending metadata retry thoughts: %w", err)
+	}
+
+	return thoughts, nil
+}
+
 func (db *DB) SearchSimilarThoughts(ctx context.Context, embedding []float32, embeddingModel string, threshold float64, limit int, projectID *uuid.UUID, excludeID *uuid.UUID) ([]thoughttypes.SearchResult, error) {
 	args := []any{pgvector.NewVector(embedding), threshold, embeddingModel}
 	conditions := []string{
diff --git a/internal/tools/capture.go b/internal/tools/capture.go
index 
3a9f879..d44e588 100644 --- a/internal/tools/capture.go +++ b/internal/tools/capture.go @@ -23,6 +23,7 @@ type CaptureTool struct { capture config.CaptureConfig sessions *session.ActiveProjects metadataTimeout time.Duration + retryer *MetadataRetryer log *slog.Logger } @@ -35,8 +36,8 @@ type CaptureOutput struct { Thought thoughttypes.Thought `json:"thought"` } -func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, log *slog.Logger) *CaptureTool { - return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, log: log} +func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, retryer *MetadataRetryer, log *slog.Logger) *CaptureTool { + return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, retryer: retryer, log: log} } func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in CaptureInput) (*mcp.CallToolResult, CaptureOutput, error) { @@ -52,6 +53,7 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C var embedding []float32 rawMetadata := metadata.Fallback(t.capture) + metadataNeedsRetry := false group, groupCtx := errgroup.WithContext(ctx) group.Go(func() error { @@ -64,6 +66,7 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C }) group.Go(func() error { metaCtx := groupCtx + attemptedAt := time.Now().UTC() if t.metadataTimeout > 0 { var cancel context.CancelFunc metaCtx, cancel = context.WithTimeout(groupCtx, t.metadataTimeout) @@ -72,9 +75,11 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C extracted, err := t.provider.ExtractMetadata(metaCtx, content) if err != nil { t.log.Warn("metadata extraction failed, using fallback", 
slog.String("provider", t.provider.Name()), slog.String("error", err.Error())) + rawMetadata = metadata.MarkMetadataPending(rawMetadata, t.capture, attemptedAt, err) + metadataNeedsRetry = true return nil } - rawMetadata = extracted + rawMetadata = metadata.MarkMetadataComplete(extracted, t.capture, attemptedAt) return nil }) @@ -98,6 +103,9 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C if project != nil { _ = t.store.TouchProject(ctx, project.ID) } + if metadataNeedsRetry && t.retryer != nil { + t.retryer.QueueThought(created.ID) + } return nil, CaptureOutput{Thought: created}, nil } diff --git a/internal/tools/files.go b/internal/tools/files.go index d7ba94d..76f77b1 100644 --- a/internal/tools/files.go +++ b/internal/tools/files.go @@ -30,6 +30,15 @@ type SaveFileInput struct { Project string `json:"project,omitempty" jsonschema:"optional project name or id when saving outside a linked thought"` } +type SaveFileDecodedInput struct { + Name string + Content []byte + MediaType string + Kind string + ThoughtID string + Project string +} + type SaveFileOutput struct { File thoughttypes.StoredFile `json:"file"` } @@ -59,11 +68,6 @@ func NewFilesTool(db *store.DB, sessions *session.ActiveProjects) *FilesTool { } func (t *FilesTool) Save(ctx context.Context, req *mcp.CallToolRequest, in SaveFileInput) (*mcp.CallToolResult, SaveFileOutput, error) { - name := strings.TrimSpace(in.Name) - if name == "" { - return nil, SaveFileOutput{}, errInvalidInput("name is required") - } - contentBase64, mediaTypeFromDataURL := splitDataURL(strings.TrimSpace(in.ContentBase64)) if contentBase64 == "" { return nil, SaveFileOutput{}, errInvalidInput("content_base64 is required") @@ -73,66 +77,18 @@ func (t *FilesTool) Save(ctx context.Context, req *mcp.CallToolRequest, in SaveF if err != nil { return nil, SaveFileOutput{}, errInvalidInput("content_base64 must be valid base64") } - if len(content) == 0 { - return nil, SaveFileOutput{}, 
errInvalidInput("decoded file content must not be empty") - } - - project, err := resolveProject(ctx, t.store, t.sessions, req, in.Project, false) - if err != nil { - return nil, SaveFileOutput{}, err - } - - var thoughtID *uuid.UUID - var projectID = projectIDPtr(project) - if rawThoughtID := strings.TrimSpace(in.ThoughtID); rawThoughtID != "" { - parsedThoughtID, err := parseUUID(rawThoughtID) - if err != nil { - return nil, SaveFileOutput{}, err - } - thought, err := t.store.GetThought(ctx, parsedThoughtID) - if err != nil { - return nil, SaveFileOutput{}, err - } - thoughtID = &parsedThoughtID - projectID = thought.ProjectID - if project != nil && thought.ProjectID != nil && *thought.ProjectID != project.ID { - return nil, SaveFileOutput{}, errInvalidInput("project does not match the linked thought's project") - } - } - - mediaType := normalizeMediaType(strings.TrimSpace(in.MediaType), mediaTypeFromDataURL, content) - kind := normalizeFileKind(strings.TrimSpace(in.Kind), mediaType) - sum := sha256.Sum256(content) - - file := thoughttypes.StoredFile{ - Name: name, - MediaType: mediaType, - Kind: kind, - Encoding: "base64", - SizeBytes: int64(len(content)), - SHA256: hex.EncodeToString(sum[:]), + out, err := t.SaveDecoded(ctx, req, SaveFileDecodedInput{ + Name: in.Name, Content: content, - ProjectID: projectID, - } - if thoughtID != nil { - file.ThoughtID = thoughtID - } - - created, err := t.store.InsertStoredFile(ctx, file) + MediaType: firstNonEmpty(strings.TrimSpace(in.MediaType), mediaTypeFromDataURL), + Kind: in.Kind, + ThoughtID: in.ThoughtID, + Project: in.Project, + }) if err != nil { return nil, SaveFileOutput{}, err } - - if created.ThoughtID != nil { - if err := t.store.AddThoughtAttachment(ctx, *created.ThoughtID, thoughtAttachmentFromFile(created)); err != nil { - return nil, SaveFileOutput{}, err - } - } - if created.ProjectID != nil { - _ = t.store.TouchProject(ctx, *created.ProjectID) - } - - return nil, SaveFileOutput{File: created}, nil + 
return nil, out, nil } func (t *FilesTool) Load(ctx context.Context, _ *mcp.CallToolRequest, in LoadFileInput) (*mcp.CallToolResult, LoadFileOutput, error) { @@ -193,6 +149,73 @@ func (t *FilesTool) List(ctx context.Context, req *mcp.CallToolRequest, in ListF return nil, ListFilesOutput{Files: files}, nil } +func (t *FilesTool) SaveDecoded(ctx context.Context, req *mcp.CallToolRequest, in SaveFileDecodedInput) (SaveFileOutput, error) { + name := strings.TrimSpace(in.Name) + if name == "" { + return SaveFileOutput{}, errInvalidInput("name is required") + } + if len(in.Content) == 0 { + return SaveFileOutput{}, errInvalidInput("decoded file content must not be empty") + } + + project, err := resolveProject(ctx, t.store, t.sessions, req, in.Project, false) + if err != nil { + return SaveFileOutput{}, err + } + + var thoughtID *uuid.UUID + var projectID = projectIDPtr(project) + if rawThoughtID := strings.TrimSpace(in.ThoughtID); rawThoughtID != "" { + parsedThoughtID, err := parseUUID(rawThoughtID) + if err != nil { + return SaveFileOutput{}, err + } + thought, err := t.store.GetThought(ctx, parsedThoughtID) + if err != nil { + return SaveFileOutput{}, err + } + thoughtID = &parsedThoughtID + projectID = thought.ProjectID + if project != nil && thought.ProjectID != nil && *thought.ProjectID != project.ID { + return SaveFileOutput{}, errInvalidInput("project does not match the linked thought's project") + } + } + + mediaType := normalizeMediaType(strings.TrimSpace(in.MediaType), "", in.Content) + kind := normalizeFileKind(strings.TrimSpace(in.Kind), mediaType) + sum := sha256.Sum256(in.Content) + + file := thoughttypes.StoredFile{ + Name: name, + MediaType: mediaType, + Kind: kind, + Encoding: "base64", + SizeBytes: int64(len(in.Content)), + SHA256: hex.EncodeToString(sum[:]), + Content: in.Content, + ProjectID: projectID, + } + if thoughtID != nil { + file.ThoughtID = thoughtID + } + + created, err := t.store.InsertStoredFile(ctx, file) + if err != nil { + return 
SaveFileOutput{}, err + } + + if created.ThoughtID != nil { + if err := t.store.AddThoughtAttachment(ctx, *created.ThoughtID, thoughtAttachmentFromFile(created)); err != nil { + return SaveFileOutput{}, err + } + } + if created.ProjectID != nil { + _ = t.store.TouchProject(ctx, *created.ProjectID) + } + + return SaveFileOutput{File: created}, nil +} + func thoughtAttachmentFromFile(file thoughttypes.StoredFile) thoughttypes.ThoughtAttachment { return thoughttypes.ThoughtAttachment{ FileID: file.ID, @@ -238,6 +261,15 @@ func normalizeMediaType(explicit string, fromDataURL string, content []byte) str } } +func firstNonEmpty(values ...string) string { + for _, value := range values { + if strings.TrimSpace(value) != "" { + return strings.TrimSpace(value) + } + } + return "" +} + func normalizeFileKind(explicit string, mediaType string) string { if explicit != "" { return explicit diff --git a/internal/tools/metadata_retry.go b/internal/tools/metadata_retry.go new file mode 100644 index 0000000..39f79ee --- /dev/null +++ b/internal/tools/metadata_retry.go @@ -0,0 +1,206 @@ +package tools + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/google/uuid" + "github.com/modelcontextprotocol/go-sdk/mcp" + "golang.org/x/sync/semaphore" + + "git.warky.dev/wdevs/amcs/internal/ai" + "git.warky.dev/wdevs/amcs/internal/config" + "git.warky.dev/wdevs/amcs/internal/metadata" + "git.warky.dev/wdevs/amcs/internal/session" + "git.warky.dev/wdevs/amcs/internal/store" + thoughttypes "git.warky.dev/wdevs/amcs/internal/types" +) + +const metadataRetryConcurrency = 4 + +type MetadataRetryer struct { + backgroundCtx context.Context + store *store.DB + provider ai.Provider + capture config.CaptureConfig + sessions *session.ActiveProjects + metadataTimeout time.Duration + logger *slog.Logger +} + +type RetryMetadataTool struct { + retryer *MetadataRetryer +} + +type RetryMetadataInput struct { + Project string `json:"project,omitempty" jsonschema:"optional project name or id 
to scope the retry"` + Limit int `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"` + IncludeArchived bool `json:"include_archived,omitempty" jsonschema:"whether to include archived thoughts; defaults to false"` + OlderThanDays int `json:"older_than_days,omitempty" jsonschema:"only retry thoughts whose last metadata attempt was at least N days ago; 0 means no restriction"` + DryRun bool `json:"dry_run,omitempty" jsonschema:"report counts without retrying metadata extraction"` +} + +type RetryMetadataFailure struct { + ID string `json:"id"` + Error string `json:"error"` +} + +type RetryMetadataOutput struct { + Scanned int `json:"scanned"` + Retried int `json:"retried"` + Updated int `json:"updated"` + Skipped int `json:"skipped"` + Failed int `json:"failed"` + DryRun bool `json:"dry_run"` + Failures []RetryMetadataFailure `json:"failures,omitempty"` +} + +func NewMetadataRetryer(backgroundCtx context.Context, db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, logger *slog.Logger) *MetadataRetryer { + if backgroundCtx == nil { + backgroundCtx = context.Background() + } + return &MetadataRetryer{ + backgroundCtx: backgroundCtx, + store: db, + provider: provider, + capture: capture, + sessions: sessions, + metadataTimeout: metadataTimeout, + logger: logger, + } +} + +func NewRetryMetadataTool(retryer *MetadataRetryer) *RetryMetadataTool { + return &RetryMetadataTool{retryer: retryer} +} + +func (t *RetryMetadataTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryMetadataInput) (*mcp.CallToolResult, RetryMetadataOutput, error) { + return t.retryer.Handle(ctx, req, in) +} + +func (r *MetadataRetryer) QueueThought(id uuid.UUID) { + go func() { + attemptCtx := r.backgroundCtx + if r.metadataTimeout > 0 { + var cancel context.CancelFunc + attemptCtx, cancel = context.WithTimeout(r.backgroundCtx, r.metadataTimeout) + defer 
cancel() + } + if _, err := r.retryOne(attemptCtx, id); err != nil { + r.logger.Warn("background metadata retry failed", slog.String("thought_id", id.String()), slog.String("error", err.Error())) + } + }() +} + +func (r *MetadataRetryer) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryMetadataInput) (*mcp.CallToolResult, RetryMetadataOutput, error) { + limit := in.Limit + if limit <= 0 { + limit = 100 + } + + project, err := resolveProject(ctx, r.store, r.sessions, req, in.Project, false) + if err != nil { + return nil, RetryMetadataOutput{}, err + } + + var projectID *uuid.UUID + if project != nil { + projectID = &project.ID + } + + thoughts, err := r.store.ListThoughtsPendingMetadataRetry(ctx, limit, projectID, in.IncludeArchived, in.OlderThanDays) + if err != nil { + return nil, RetryMetadataOutput{}, err + } + + out := RetryMetadataOutput{ + Scanned: len(thoughts), + DryRun: in.DryRun, + } + if in.DryRun || len(thoughts) == 0 { + return nil, out, nil + } + + sem := semaphore.NewWeighted(metadataRetryConcurrency) + var mu sync.Mutex + var wg sync.WaitGroup + + for _, thought := range thoughts { + if ctx.Err() != nil { + break + } + if err := sem.Acquire(ctx, 1); err != nil { + break + } + + wg.Add(1) + go func(thought thoughttypes.Thought) { + defer wg.Done() + defer sem.Release(1) + + mu.Lock() + out.Retried++ + mu.Unlock() + + updated, err := r.retryOne(ctx, thought.ID) + if err != nil { + mu.Lock() + out.Failures = append(out.Failures, RetryMetadataFailure{ID: thought.ID.String(), Error: err.Error()}) + mu.Unlock() + return + } + if updated { + mu.Lock() + out.Updated++ + mu.Unlock() + return + } + + mu.Lock() + out.Skipped++ + mu.Unlock() + }(thought) + } + + wg.Wait() + out.Failed = len(out.Failures) + + return nil, out, nil +} + +func (r *MetadataRetryer) retryOne(ctx context.Context, id uuid.UUID) (bool, error) { + thought, err := r.store.GetThought(ctx, id) + if err != nil { + return false, err + } + if thought.Metadata.MetadataStatus == 
metadata.MetadataStatusComplete { + return false, nil + } + + attemptCtx := ctx + if r.metadataTimeout > 0 { + var cancel context.CancelFunc + attemptCtx, cancel = context.WithTimeout(ctx, r.metadataTimeout) + defer cancel() + } + + attemptedAt := time.Now().UTC() + extracted, extractErr := r.provider.ExtractMetadata(attemptCtx, thought.Content) + if extractErr != nil { + failedMetadata := metadata.MarkMetadataFailed(thought.Metadata, r.capture, attemptedAt, extractErr) + if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, failedMetadata); updateErr != nil { + return false, updateErr + } + return false, extractErr + } + + completedMetadata := metadata.MarkMetadataComplete(extracted, r.capture, attemptedAt) + completedMetadata.Attachments = thought.Metadata.Attachments + if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, completedMetadata); updateErr != nil { + return false, updateErr + } + + return true, nil +} diff --git a/internal/tools/reparse_metadata.go b/internal/tools/reparse_metadata.go index c4edaa7..e64ccc0 100644 --- a/internal/tools/reparse_metadata.go +++ b/internal/tools/reparse_metadata.go @@ -106,15 +106,18 @@ func (t *ReparseMetadataTool) Handle(ctx context.Context, req *mcp.CallToolReque normalizedCurrent := metadata.Normalize(thought.Metadata, t.capture) + attemptedAt := time.Now().UTC() extracted, extractErr := t.provider.ExtractMetadata(ctx, thought.Content) normalizedTarget := normalizedCurrent if extractErr != nil { + normalizedTarget = metadata.MarkMetadataFailed(normalizedCurrent, t.capture, attemptedAt, extractErr) mu.Lock() out.Normalized++ mu.Unlock() t.logger.Warn("metadata reparse extract failed, using normalized existing metadata", slog.String("thought_id", thought.ID.String()), slog.String("error", extractErr.Error())) } else { - normalizedTarget = metadata.Normalize(extracted, t.capture) + normalizedTarget = metadata.MarkMetadataComplete(extracted, t.capture, attemptedAt) + normalizedTarget.Attachments = 
thought.Metadata.Attachments mu.Lock() out.Reparsed++ mu.Unlock() diff --git a/internal/tools/update.go b/internal/tools/update.go index 4d3a226..3e6c4aa 100644 --- a/internal/tools/update.go +++ b/internal/tools/update.go @@ -4,6 +4,7 @@ import ( "context" "log/slog" "strings" + "time" "github.com/modelcontextprotocol/go-sdk/mcp" @@ -64,8 +65,9 @@ func (t *UpdateTool) Handle(ctx context.Context, _ *mcp.CallToolRequest, in Upda extracted, extractErr := t.provider.ExtractMetadata(ctx, content) if extractErr != nil { t.log.Warn("metadata extraction failed during update, keeping current metadata", slog.String("error", extractErr.Error())) + mergedMetadata = metadata.MarkMetadataFailed(mergedMetadata, t.capture, time.Now().UTC(), extractErr) } else { - mergedMetadata = metadata.Normalize(extracted, t.capture) + mergedMetadata = metadata.MarkMetadataComplete(extracted, t.capture, time.Now().UTC()) mergedMetadata.Attachments = current.Metadata.Attachments } } diff --git a/internal/types/thought.go b/internal/types/thought.go index 3e0783a..3a68021 100644 --- a/internal/types/thought.go +++ b/internal/types/thought.go @@ -7,13 +7,17 @@ import ( ) type ThoughtMetadata struct { - People []string `json:"people"` - ActionItems []string `json:"action_items"` - DatesMentioned []string `json:"dates_mentioned"` - Topics []string `json:"topics"` - Type string `json:"type"` - Source string `json:"source"` - Attachments []ThoughtAttachment `json:"attachments,omitempty"` + People []string `json:"people"` + ActionItems []string `json:"action_items"` + DatesMentioned []string `json:"dates_mentioned"` + Topics []string `json:"topics"` + Type string `json:"type"` + Source string `json:"source"` + Attachments []ThoughtAttachment `json:"attachments,omitempty"` + MetadataStatus string `json:"metadata_status,omitempty"` + MetadataUpdatedAt string `json:"metadata_updated_at,omitempty"` + MetadataLastAttemptedAt string `json:"metadata_last_attempted_at,omitempty"` + MetadataError string 
`json:"metadata_error,omitempty"` } type ThoughtAttachment struct {