feat: implement file upload handler and related functionality
- Added file upload handler to process both multipart and raw file uploads.
- Implemented parsing logic for upload requests, including handling file metadata.
- Introduced SaveFileDecodedInput structure for handling decoded file uploads.
- Created unit tests for file upload parsing and validation.

feat: add metadata retry configuration and functionality

- Introduced MetadataRetryConfig to the application configuration.
- Implemented MetadataRetryer to handle retrying metadata extraction for thoughts.
- Added new tool for retrying failed metadata extractions.
- Updated thought metadata structure to include status and timestamps for metadata processing.

fix: enhance metadata normalization and error handling

- Updated metadata normalization functions to track status and errors.
- Improved handling of metadata extraction failures during thought updates and captures.
- Ensured that metadata status is correctly set during various operations.

refactor: streamline file saving logic in FilesTool

- Refactored Save method in FilesTool to utilize new SaveDecoded method.
- Simplified project and thought ID resolution logic during file saving.
README.md (49 lines changed)
@@ -46,6 +46,7 @@ A Go MCP server for capturing and retrieving thoughts, memory, and project conte
 | `list_files` | Browse stored files by thought, project, or kind |
 | `backfill_embeddings` | Generate missing embeddings for stored thoughts |
 | `reparse_thought_metadata` | Re-extract and normalize metadata for stored thoughts |
+| `retry_failed_metadata` | Retry metadata extraction for thoughts still pending or failed |
 
 ## Configuration
 
@@ -115,6 +116,24 @@ Run `reparse_thought_metadata` to fix stale or inconsistent metadata by re-extra
 - If extraction fails for a thought, existing metadata is normalized and written only if it changes
 - Metadata reparse runs in parallel (4 workers); one failure does not abort the run
 
+## Failed Metadata Retry
+
+`capture_thought` now stores the thought even when metadata extraction times out or fails. Those thoughts are marked with `metadata_status: "pending"` and retried in the background. Use `retry_failed_metadata` to sweep any thoughts still marked `pending` or `failed`.
+
+```json
+{
+  "project": "optional-project-name",
+  "limit": 100,
+  "include_archived": false,
+  "older_than_days": 1,
+  "dry_run": false
+}
+```
+
+- `dry_run: true` scans only and does not call metadata extraction or write updates
+- successful retries mark the thought metadata as `complete` and clear the last error
+- failed retries update the retry markers so the daily sweep can pick them up again later
+
 ## File Storage
 
 Use `save_file` to persist binary files as base64. Files can optionally be linked to a memory by passing `thought_id`, which also adds an attachment reference to that thought's metadata.
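Tool arguments like the `retry_failed_metadata` JSON above travel inside an MCP `tools/call` request. As a rough illustration — the JSON-RPC envelope below follows the MCP specification and is not part of this diff — a client-side dry run would look like:

```json
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "retry_failed_metadata",
    "arguments": {
      "limit": 100,
      "dry_run": true
    }
  }
}
```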
@@ -148,6 +167,27 @@ List files for a thought or project with:
 }
 ```
 
+AMCS also supports direct authenticated HTTP uploads to `/files` for clients that want to stream file bodies instead of base64-encoding them into an MCP tool call.
+
+Multipart upload:
+
+```bash
+curl -X POST http://localhost:8080/files \
+  -H "x-brain-key: <key>" \
+  -F "file=@./diagram.png" \
+  -F "project=amcs" \
+  -F "kind=image"
+```
+
+Raw body upload:
+
+```bash
+curl -X POST "http://localhost:8080/files?project=amcs&name=meeting-notes.pdf" \
+  -H "x-brain-key: <key>" \
+  -H "Content-Type: application/pdf" \
+  --data-binary @./meeting-notes.pdf
+```
+
 **Automatic backfill** (optional, config-gated):
 
 ```yaml
@@ -160,6 +200,15 @@ backfill:
   include_archived: false
 ```
 
+```yaml
+metadata_retry:
+  enabled: true
+  run_on_startup: true # retry failed metadata once on server start
+  interval: "24h" # retry pending/failed metadata daily
+  max_per_run: 100
+  include_archived: false
+```
+
 **Search fallback**: when no embeddings exist for the active model in scope, `search_thoughts`, `recall_context`, `get_project_context`, `summarize_thoughts`, and `related_thoughts` automatically fall back to Postgres full-text search so results are never silently empty.
 
 ## Client Setup
@@ -90,3 +90,10 @@ backfill:
   batch_size: 20
   max_per_run: 100
   include_archived: false
+
+metadata_retry:
+  enabled: false
+  run_on_startup: false
+  interval: "24h"
+  max_per_run: 100
+  include_archived: false
@@ -80,3 +80,10 @@ logging:
 observability:
   metrics_enabled: true
   pprof_enabled: false
+
+metadata_retry:
+  enabled: false
+  run_on_startup: false
+  interval: "24h"
+  max_per_run: 100
+  include_archived: false
@@ -79,3 +79,10 @@ logging:
 observability:
   metrics_enabled: true
   pprof_enabled: false
+
+metadata_retry:
+  enabled: false
+  run_on_startup: false
+  interval: "24h"
+  max_per_run: 100
+  include_archived: false
@@ -93,6 +93,25 @@ func Run(ctx context.Context, configPath string) error {
 		}()
 	}
 
+	if cfg.MetadataRetry.Enabled && cfg.MetadataRetry.RunOnStartup {
+		go runMetadataRetryPass(ctx, db, provider, cfg, activeProjects, logger)
+	}
+
+	if cfg.MetadataRetry.Enabled && cfg.MetadataRetry.Interval > 0 {
+		go func() {
+			ticker := time.NewTicker(cfg.MetadataRetry.Interval)
+			defer ticker.Stop()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-ticker.C:
+					runMetadataRetryPass(ctx, db, provider, cfg, activeProjects, logger)
+				}
+			}
+		}()
+	}
+
 	server := &http.Server{
 		Addr:    fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port),
 		Handler: routes(logger, cfg, db, provider, keyring, oauthRegistry, tokenStore, authCodes, dynClients, activeProjects),
@@ -127,9 +146,12 @@ func Run(ctx context.Context, configPath string) error {
 
 func routes(logger *slog.Logger, cfg *config.Config, db *store.DB, provider ai.Provider, keyring *auth.Keyring, oauthRegistry *auth.OAuthRegistry, tokenStore *auth.TokenStore, authCodes *auth.AuthCodeStore, dynClients *auth.DynamicClientStore, activeProjects *session.ActiveProjects) http.Handler {
 	mux := http.NewServeMux()
+	authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, logger)
+	filesTool := tools.NewFilesTool(db, activeProjects)
+	metadataRetryer := tools.NewMetadataRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
 
 	toolSet := mcpserver.ToolSet{
-		Capture:       tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger),
+		Capture:       tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, logger),
 		Search:        tools.NewSearchTool(db, provider, cfg.Search, activeProjects),
 		List:          tools.NewListTool(db, cfg.Search, activeProjects),
 		Stats:         tools.NewStatsTool(db),
@@ -142,9 +164,10 @@ func routes(logger *slog.Logger, cfg *config.Config, db *store.DB, provider ai.P
 		Recall:        tools.NewRecallTool(db, provider, cfg.Search, activeProjects),
 		Summarize:     tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects),
 		Links:         tools.NewLinksTool(db, provider, cfg.Search),
-		Files:         tools.NewFilesTool(db, activeProjects),
+		Files:         filesTool,
 		Backfill:      tools.NewBackfillTool(db, provider, activeProjects, logger),
 		Reparse:       tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger),
+		RetryMetadata: tools.NewRetryMetadataTool(metadataRetryer),
 		Household:     tools.NewHouseholdTool(db),
 		Maintenance:   tools.NewMaintenanceTool(db),
 		Calendar:      tools.NewCalendarTool(db),
@@ -153,7 +176,8 @@ func routes(logger *slog.Logger, cfg *config.Config, db *store.DB, provider ai.P
 	}
 
 	mcpHandler := mcpserver.New(cfg.MCP, toolSet)
-	mux.Handle(cfg.MCP.Path, auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, logger)(mcpHandler))
+	mux.Handle(cfg.MCP.Path, authMiddleware(mcpHandler))
+	mux.Handle("/files", authMiddleware(fileUploadHandler(filesTool)))
 	if oauthRegistry != nil && tokenStore != nil {
 		mux.HandleFunc("/.well-known/oauth-authorization-server", oauthMetadataHandler())
 		mux.HandleFunc("/oauth-authorization-server", oauthMetadataHandler())
@@ -245,6 +269,25 @@ func routes(logger *slog.Logger, cfg *config.Config, db *store.DB, provider ai.P
 	)
 }
+
+func runMetadataRetryPass(ctx context.Context, db *store.DB, provider ai.Provider, cfg *config.Config, activeProjects *session.ActiveProjects, logger *slog.Logger) {
+	retryer := tools.NewMetadataRetryer(ctx, db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
+	_, out, err := retryer.Handle(ctx, nil, tools.RetryMetadataInput{
+		Limit:           cfg.MetadataRetry.MaxPerRun,
+		IncludeArchived: cfg.MetadataRetry.IncludeArchived,
+		OlderThanDays:   1,
+	})
+	if err != nil {
+		logger.Error("auto metadata retry failed", slog.String("error", err.Error()))
+		return
+	}
+	logger.Info("auto metadata retry pass",
+		slog.Int("scanned", out.Scanned),
+		slog.Int("retried", out.Retried),
+		slog.Int("updated", out.Updated),
+		slog.Int("failed", out.Failed),
+	)
+}
 
 func runBackfillPass(ctx context.Context, db *store.DB, provider ai.Provider, cfg config.BackfillConfig, logger *slog.Logger) {
 	backfiller := tools.NewBackfillTool(db, provider, nil, logger)
 	_, out, err := backfiller.Handle(ctx, nil, tools.BackfillInput{
internal/app/files.go (new file, 112 lines)
@@ -0,0 +1,112 @@
package app

import (
	"encoding/json"
	"errors"
	"io"
	"mime"
	"net/http"
	"strings"

	"git.warky.dev/wdevs/amcs/internal/tools"
)

const maxUploadBytes = 50 << 20

func fileUploadHandler(files *tools.FilesTool) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			w.Header().Set("Allow", http.MethodPost)
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}

		r.Body = http.MaxBytesReader(w, r.Body, maxUploadBytes)

		in, err := parseUploadRequest(r)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		out, err := files.SaveDecoded(r.Context(), nil, in)
		if err != nil {
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusCreated)
		_ = json.NewEncoder(w).Encode(out)
	})
}

func parseUploadRequest(r *http.Request) (tools.SaveFileDecodedInput, error) {
	contentType := r.Header.Get("Content-Type")
	mediaType, _, _ := mime.ParseMediaType(contentType)

	if strings.HasPrefix(mediaType, "multipart/form-data") {
		return parseMultipartUpload(r)
	}

	return parseRawUpload(r)
}

func parseMultipartUpload(r *http.Request) (tools.SaveFileDecodedInput, error) {
	if err := r.ParseMultipartForm(maxUploadBytes); err != nil {
		return tools.SaveFileDecodedInput{}, err
	}

	file, header, err := r.FormFile("file")
	if err != nil {
		return tools.SaveFileDecodedInput{}, errors.New("multipart upload requires a file field named \"file\"")
	}
	defer file.Close()

	content, err := io.ReadAll(file)
	if err != nil {
		return tools.SaveFileDecodedInput{}, err
	}

	return tools.SaveFileDecodedInput{
		Name:      firstNonEmpty(r.FormValue("name"), header.Filename),
		Content:   content,
		MediaType: firstNonEmpty(r.FormValue("media_type"), header.Header.Get("Content-Type")),
		Kind:      r.FormValue("kind"),
		ThoughtID: r.FormValue("thought_id"),
		Project:   r.FormValue("project"),
	}, nil
}

func parseRawUpload(r *http.Request) (tools.SaveFileDecodedInput, error) {
	content, err := io.ReadAll(r.Body)
	if err != nil {
		return tools.SaveFileDecodedInput{}, err
	}

	name := firstNonEmpty(
		r.URL.Query().Get("name"),
		r.Header.Get("X-File-Name"),
	)
	if strings.TrimSpace(name) == "" {
		return tools.SaveFileDecodedInput{}, errors.New("raw upload requires a file name via query param \"name\" or X-File-Name header")
	}

	return tools.SaveFileDecodedInput{
		Name:      name,
		Content:   content,
		MediaType: r.Header.Get("Content-Type"),
		Kind:      firstNonEmpty(r.URL.Query().Get("kind"), r.Header.Get("X-File-Kind")),
		ThoughtID: firstNonEmpty(r.URL.Query().Get("thought_id"), r.Header.Get("X-Thought-Id")),
		Project:   firstNonEmpty(r.URL.Query().Get("project"), r.Header.Get("X-Project")),
	}, nil
}

func firstNonEmpty(values ...string) string {
	for _, value := range values {
		if strings.TrimSpace(value) != "" {
			return strings.TrimSpace(value)
		}
	}
	return ""
}
internal/app/files_test.go (new file, 57 lines)
@@ -0,0 +1,57 @@
package app

import (
	"bytes"
	"mime/multipart"
	"net/http"
	"net/http/httptest"
	"testing"
)

func TestParseRawUploadRequiresName(t *testing.T) {
	req := httptest.NewRequest(http.MethodPost, "/files", bytes.NewReader([]byte("hello")))
	req.Header.Set("Content-Type", "application/octet-stream")

	_, err := parseRawUpload(req)
	if err == nil {
		t.Fatal("expected error for missing name")
	}
}

func TestParseMultipartUploadUsesFileMetadata(t *testing.T) {
	var body bytes.Buffer
	writer := multipart.NewWriter(&body)

	part, err := writer.CreateFormFile("file", "note.txt")
	if err != nil {
		t.Fatalf("create form file: %v", err)
	}
	if _, err := part.Write([]byte("hello world")); err != nil {
		t.Fatalf("write form file: %v", err)
	}
	_ = writer.WriteField("project", "amcs")
	_ = writer.WriteField("kind", "document")
	if err := writer.Close(); err != nil {
		t.Fatalf("close writer: %v", err)
	}

	req := httptest.NewRequest(http.MethodPost, "/files", &body)
	req.Header.Set("Content-Type", writer.FormDataContentType())

	got, err := parseMultipartUpload(req)
	if err != nil {
		t.Fatalf("parse multipart upload: %v", err)
	}
	if got.Name != "note.txt" {
		t.Fatalf("name = %q, want note.txt", got.Name)
	}
	if string(got.Content) != "hello world" {
		t.Fatalf("content = %q, want hello world", string(got.Content))
	}
	if got.Project != "amcs" {
		t.Fatalf("project = %q, want amcs", got.Project)
	}
	if got.Kind != "document" {
		t.Fatalf("kind = %q, want document", got.Kind)
	}
}
@@ -18,6 +18,7 @@ type Config struct {
 	Logging       LoggingConfig       `yaml:"logging"`
 	Observability ObservabilityConfig `yaml:"observability"`
 	Backfill      BackfillConfig      `yaml:"backfill"`
+	MetadataRetry MetadataRetryConfig `yaml:"metadata_retry"`
 }
 
 type ServerConfig struct {
@@ -152,6 +153,14 @@ type BackfillConfig struct {
 	IncludeArchived bool `yaml:"include_archived"`
 }
 
+type MetadataRetryConfig struct {
+	Enabled         bool          `yaml:"enabled"`
+	RunOnStartup    bool          `yaml:"run_on_startup"`
+	Interval        time.Duration `yaml:"interval"`
+	MaxPerRun       int           `yaml:"max_per_run"`
+	IncludeArchived bool          `yaml:"include_archived"`
+}
+
 func (c AIMetadataConfig) EffectiveFallbackModels() []string {
 	models := make([]string, 0, len(c.FallbackModels)+1)
 	for _, model := range c.FallbackModels {
@@ -103,6 +103,12 @@ func defaultConfig() Config {
 			BatchSize: 20,
 			MaxPerRun: 100,
 		},
+		MetadataRetry: MetadataRetryConfig{
+			Enabled:      false,
+			RunOnStartup: false,
+			Interval:     24 * time.Hour,
+			MaxPerRun:    100,
+		},
 	}
 }
 
@@ -89,6 +89,11 @@ func (c Config) Validate() error {
 			return fmt.Errorf("invalid config: backfill.max_per_run must be >= backfill.batch_size")
 		}
 	}
+	if c.MetadataRetry.Enabled {
+		if c.MetadataRetry.MaxPerRun <= 0 {
+			return fmt.Errorf("invalid config: metadata_retry.max_per_run must be greater than zero when metadata_retry is enabled")
+		}
+	}
 
 	return nil
 }
@@ -111,3 +111,13 @@ func TestValidateRejectsEmptyAuth(t *testing.T) {
 		t.Fatal("Validate() error = nil, want error when neither auth.keys nor auth.oauth.clients is configured")
 	}
 }
+
+func TestValidateRejectsInvalidMetadataRetryConfig(t *testing.T) {
+	cfg := validConfig()
+	cfg.MetadataRetry.Enabled = true
+	cfg.MetadataRetry.MaxPerRun = 0
+
+	if err := cfg.Validate(); err == nil {
+		t.Fatal("Validate() error = nil, want error for invalid metadata retry config")
+	}
+}
@@ -27,6 +27,7 @@ type ToolSet struct {
 	Files         *tools.FilesTool
 	Backfill      *tools.BackfillTool
 	Reparse       *tools.ReparseMetadataTool
+	RetryMetadata *tools.RetryMetadataTool
 	Household     *tools.HouseholdTool
 	Maintenance   *tools.MaintenanceTool
 	Calendar      *tools.CalendarTool
@@ -150,6 +151,11 @@ func New(cfg config.MCPConfig, toolSet ToolSet) http.Handler {
 		Description: "Re-extract and normalize metadata for stored thoughts from their content.",
 	}, toolSet.Reparse.Handle)
 
+	addTool(server, &mcp.Tool{
+		Name:        "retry_failed_metadata",
+		Description: "Retry metadata extraction for thoughts still marked pending or failed.",
+	}, toolSet.RetryMetadata.Handle)
+
 	// Household Knowledge
 	addTool(server, &mcp.Tool{
 		Name: "add_household_item",
@@ -3,6 +3,7 @@ package metadata
 import (
 	"sort"
 	"strings"
+	"time"
 
 	"git.warky.dev/wdevs/amcs/internal/config"
 	thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
@@ -11,6 +12,9 @@ import (
 const (
 	DefaultType          = "observation"
 	DefaultTopicFallback = "uncategorized"
+	MetadataStatusComplete = "complete"
+	MetadataStatusPending  = "pending"
+	MetadataStatusFailed   = "failed"
 	maxTopics = 10
 )
 
@@ -36,6 +40,7 @@ func Fallback(capture config.CaptureConfig) thoughttypes.ThoughtMetadata {
 		Type:        normalizeType(capture.MetadataDefaults.Type),
 		Source:      normalizeSource(capture.Source),
 		Attachments: []thoughttypes.ThoughtAttachment{},
+		MetadataStatus: MetadataStatusComplete,
 	}
 }
 
@@ -48,6 +53,10 @@ func Normalize(in thoughttypes.ThoughtMetadata, capture config.CaptureConfig) th
 		Type:        normalizeType(in.Type),
 		Source:      normalizeSource(in.Source),
 		Attachments: normalizeAttachments(in.Attachments),
+		MetadataStatus:          normalizeMetadataStatus(in.MetadataStatus),
+		MetadataUpdatedAt:       strings.TrimSpace(in.MetadataUpdatedAt),
+		MetadataLastAttemptedAt: strings.TrimSpace(in.MetadataLastAttemptedAt),
+		MetadataError:           strings.TrimSpace(in.MetadataError),
 	}
 
 	if len(out.Topics) == 0 {
@@ -59,10 +68,48 @@
 	if out.Source == "" {
 		out.Source = Fallback(capture).Source
 	}
+	if out.MetadataStatus == "" {
+		out.MetadataStatus = MetadataStatusComplete
+	}
+	if out.MetadataStatus == MetadataStatusComplete {
+		out.MetadataError = ""
+	}
 
 	return out
 }
 
+func MarkMetadataPending(base thoughttypes.ThoughtMetadata, capture config.CaptureConfig, attempt time.Time, err error) thoughttypes.ThoughtMetadata {
+	out := Normalize(base, capture)
+	out.MetadataStatus = MetadataStatusPending
+	out.MetadataLastAttemptedAt = attempt.UTC().Format(time.RFC3339)
+	if err != nil {
+		out.MetadataError = strings.TrimSpace(err.Error())
+	}
+	out.MetadataUpdatedAt = strings.TrimSpace(base.MetadataUpdatedAt)
+	return out
+}
+
+func MarkMetadataFailed(base thoughttypes.ThoughtMetadata, capture config.CaptureConfig, attempt time.Time, err error) thoughttypes.ThoughtMetadata {
+	out := Normalize(base, capture)
+	out.MetadataStatus = MetadataStatusFailed
+	out.MetadataLastAttemptedAt = attempt.UTC().Format(time.RFC3339)
+	if err != nil {
+		out.MetadataError = strings.TrimSpace(err.Error())
+	}
+	out.MetadataUpdatedAt = strings.TrimSpace(base.MetadataUpdatedAt)
+	return out
+}
+
+func MarkMetadataComplete(base thoughttypes.ThoughtMetadata, capture config.CaptureConfig, updatedAt time.Time) thoughttypes.ThoughtMetadata {
+	out := Normalize(base, capture)
+	out.MetadataStatus = MetadataStatusComplete
+	timestamp := updatedAt.UTC().Format(time.RFC3339)
+	out.MetadataUpdatedAt = timestamp
+	out.MetadataLastAttemptedAt = timestamp
+	out.MetadataError = ""
+	return out
+}
+
 func normalizeList(values []string, limit int) []string {
 	seen := make(map[string]struct{}, len(values))
 	result := make([]string, 0, len(values))
@@ -100,6 +147,19 @@ func normalizeType(value string) string {
 	return DefaultType
 }
 
+func normalizeMetadataStatus(value string) string {
+	switch strings.ToLower(strings.TrimSpace(value)) {
+	case "", MetadataStatusComplete:
+		return MetadataStatusComplete
+	case MetadataStatusPending:
+		return MetadataStatusPending
+	case MetadataStatusFailed:
+		return MetadataStatusFailed
+	default:
+		return MetadataStatusComplete
+	}
+}
+
 func normalizeSource(value string) string {
 	normalized := strings.TrimSpace(value)
 	if normalized == "" {
@@ -3,6 +3,7 @@ package metadata
 import (
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/google/uuid"
 
@@ -31,6 +32,9 @@ func TestFallbackUsesConfiguredDefaults(t *testing.T) {
 	if got.Source != "mcp" {
 		t.Fatalf("Fallback source = %q, want mcp", got.Source)
 	}
+	if got.MetadataStatus != MetadataStatusComplete {
+		t.Fatalf("Fallback metadata status = %q, want complete", got.MetadataStatus)
+	}
 }
 
 func TestNormalizeTrimsDedupesAndCapsTopics(t *testing.T) {
@@ -102,3 +106,56 @@ func TestNormalizeDedupesAttachmentsByFileID(t *testing.T) {
 		t.Fatalf("Attachment kind = %q, want image", got.Attachments[0].Kind)
 	}
 }
+
+func TestMarkMetadataPendingTracksAttemptWithoutClearingPreviousSuccess(t *testing.T) {
+	attempt := time.Date(2026, 3, 30, 10, 0, 0, 0, time.UTC)
+	base := thoughttypes.ThoughtMetadata{
+		Topics:            []string{"go"},
+		MetadataUpdatedAt: "2026-03-29T10:00:00Z",
+	}
+
+	got := MarkMetadataPending(base, testCaptureConfig(), attempt, errTestMetadataFailure)
+	if got.MetadataStatus != MetadataStatusPending {
+		t.Fatalf("MetadataStatus = %q, want pending", got.MetadataStatus)
+	}
+	if got.MetadataUpdatedAt != "2026-03-29T10:00:00Z" {
+		t.Fatalf("MetadataUpdatedAt = %q, want previous success timestamp", got.MetadataUpdatedAt)
+	}
+	if got.MetadataLastAttemptedAt != "2026-03-30T10:00:00Z" {
+		t.Fatalf("MetadataLastAttemptedAt = %q, want attempt timestamp", got.MetadataLastAttemptedAt)
+	}
+	if got.MetadataError == "" {
+		t.Fatal("MetadataError is empty, want failure message")
+	}
+}
+
+func TestMarkMetadataCompleteClearsErrorAndSetsTimestamps(t *testing.T) {
+	attempt := time.Date(2026, 3, 30, 10, 0, 0, 0, time.UTC)
+
+	got := MarkMetadataComplete(thoughttypes.ThoughtMetadata{
+		Topics:         []string{"go"},
+		MetadataStatus: MetadataStatusFailed,
+		MetadataError:  "timeout",
+	}, testCaptureConfig(), attempt)
+
+	if got.MetadataStatus != MetadataStatusComplete {
+		t.Fatalf("MetadataStatus = %q, want complete", got.MetadataStatus)
+	}
+	if got.MetadataUpdatedAt != "2026-03-30T10:00:00Z" {
+		t.Fatalf("MetadataUpdatedAt = %q, want completion timestamp", got.MetadataUpdatedAt)
+	}
+	if got.MetadataLastAttemptedAt != "2026-03-30T10:00:00Z" {
+		t.Fatalf("MetadataLastAttemptedAt = %q, want completion timestamp", got.MetadataLastAttemptedAt)
+	}
+	if got.MetadataError != "" {
+		t.Fatalf("MetadataError = %q, want empty", got.MetadataError)
+	}
+}
+
+var errTestMetadataFailure = testError("timeout")
+
+type testError string
+
+func (e testError) Error() string {
+	return string(e)
+}
@@ -277,6 +277,28 @@ func (db *DB) UpdateThought(ctx context.Context, id uuid.UUID, content string, e
 	return db.GetThought(ctx, id)
 }

+func (db *DB) UpdateThoughtMetadata(ctx context.Context, id uuid.UUID, metadata thoughttypes.ThoughtMetadata) (thoughttypes.Thought, error) {
+	metadataBytes, err := json.Marshal(metadata)
+	if err != nil {
+		return thoughttypes.Thought{}, fmt.Errorf("marshal updated metadata: %w", err)
+	}
+
+	tag, err := db.pool.Exec(ctx, `
+		update thoughts
+		set metadata = $2::jsonb,
+			updated_at = now()
+		where guid = $1
+	`, id, metadataBytes)
+	if err != nil {
+		return thoughttypes.Thought{}, fmt.Errorf("update thought metadata: %w", err)
+	}
+	if tag.RowsAffected() == 0 {
+		return thoughttypes.Thought{}, pgx.ErrNoRows
+	}
+
+	return db.GetThought(ctx, id)
+}
+
 func (db *DB) DeleteThought(ctx context.Context, id uuid.UUID) error {
 	tag, err := db.pool.Exec(ctx, `delete from thoughts where guid = $1`, id)
 	if err != nil {
@@ -309,6 +331,58 @@ func (db *DB) RecentThoughts(ctx context.Context, projectID *uuid.UUID, limit in
 	return db.ListThoughts(ctx, filter)
 }

+func (db *DB) ListThoughtsPendingMetadataRetry(ctx context.Context, limit int, projectID *uuid.UUID, includeArchived bool, olderThanDays int) ([]thoughttypes.Thought, error) {
+	args := make([]any, 0, 4)
+	conditions := []string{
+		"(metadata->>'metadata_status' = 'pending' or metadata->>'metadata_status' = 'failed')",
+	}
+
+	if !includeArchived {
+		conditions = append(conditions, "archived_at is null")
+	}
+	if projectID != nil {
+		args = append(args, *projectID)
+		conditions = append(conditions, fmt.Sprintf("project_id = $%d", len(args)))
+	}
+	if olderThanDays > 0 {
+		args = append(args, olderThanDays)
+		conditions = append(conditions, fmt.Sprintf("coalesce(nullif(metadata->>'metadata_last_attempted_at', '')::timestamptz, created_at) <= now() - ($%d * interval '1 day')", len(args)))
+	}
+
+	query := `
+		select guid, content, metadata, project_id, archived_at, created_at, updated_at
+		from thoughts
+		where ` + strings.Join(conditions, " and ")
+
+	args = append(args, limit)
+	query += fmt.Sprintf(" order by coalesce(nullif(metadata->>'metadata_last_attempted_at', '')::timestamptz, created_at) asc limit $%d", len(args))
+
+	rows, err := db.pool.Query(ctx, query, args...)
+	if err != nil {
+		return nil, fmt.Errorf("list thoughts pending metadata retry: %w", err)
+	}
+	defer rows.Close()
+
+	thoughts := make([]thoughttypes.Thought, 0, limit)
+	for rows.Next() {
+		var thought thoughttypes.Thought
+		var metadataBytes []byte
+		if err := rows.Scan(&thought.ID, &thought.Content, &metadataBytes, &thought.ProjectID, &thought.ArchivedAt, &thought.CreatedAt, &thought.UpdatedAt); err != nil {
+			return nil, fmt.Errorf("scan pending metadata retry thought: %w", err)
+		}
+		if err := json.Unmarshal(metadataBytes, &thought.Metadata); err != nil {
+			return nil, fmt.Errorf("decode pending metadata retry thought: %w", err)
+		}
+		thoughts = append(thoughts, thought)
+	}
+
+	if err := rows.Err(); err != nil {
+		return nil, fmt.Errorf("iterate pending metadata retry thoughts: %w", err)
+	}
+
+	return thoughts, nil
+}
+
 func (db *DB) SearchSimilarThoughts(ctx context.Context, embedding []float32, embeddingModel string, threshold float64, limit int, projectID *uuid.UUID, excludeID *uuid.UUID) ([]thoughttypes.SearchResult, error) {
 	args := []any{pgvector.NewVector(embedding), threshold, embeddingModel}
 	conditions := []string{
@@ -23,6 +23,7 @@ type CaptureTool struct {
 	capture         config.CaptureConfig
 	sessions        *session.ActiveProjects
 	metadataTimeout time.Duration
+	retryer         *MetadataRetryer
 	log             *slog.Logger
 }

@@ -35,8 +36,8 @@ type CaptureOutput struct {
 	Thought thoughttypes.Thought `json:"thought"`
 }

-func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, log *slog.Logger) *CaptureTool {
-	return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, log: log}
+func NewCaptureTool(db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, retryer *MetadataRetryer, log *slog.Logger) *CaptureTool {
+	return &CaptureTool{store: db, provider: provider, capture: capture, sessions: sessions, metadataTimeout: metadataTimeout, retryer: retryer, log: log}
 }

 func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in CaptureInput) (*mcp.CallToolResult, CaptureOutput, error) {
@@ -52,6 +53,7 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C
 	var embedding []float32
 	rawMetadata := metadata.Fallback(t.capture)
+	metadataNeedsRetry := false

 	group, groupCtx := errgroup.WithContext(ctx)
 	group.Go(func() error {
@@ -64,6 +66,7 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C
 	})
 	group.Go(func() error {
 		metaCtx := groupCtx
+		attemptedAt := time.Now().UTC()
 		if t.metadataTimeout > 0 {
 			var cancel context.CancelFunc
 			metaCtx, cancel = context.WithTimeout(groupCtx, t.metadataTimeout)
@@ -72,9 +75,11 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C
 		extracted, err := t.provider.ExtractMetadata(metaCtx, content)
 		if err != nil {
 			t.log.Warn("metadata extraction failed, using fallback", slog.String("provider", t.provider.Name()), slog.String("error", err.Error()))
+			rawMetadata = metadata.MarkMetadataPending(rawMetadata, t.capture, attemptedAt, err)
+			metadataNeedsRetry = true
 			return nil
 		}
-		rawMetadata = extracted
+		rawMetadata = metadata.MarkMetadataComplete(extracted, t.capture, attemptedAt)
 		return nil
 	})

@@ -98,6 +103,9 @@ func (t *CaptureTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in C
 	if project != nil {
 		_ = t.store.TouchProject(ctx, project.ID)
 	}
+	if metadataNeedsRetry && t.retryer != nil {
+		t.retryer.QueueThought(created.ID)
+	}

 	return nil, CaptureOutput{Thought: created}, nil
 }
@@ -30,6 +30,15 @@ type SaveFileInput struct {
 	Project string `json:"project,omitempty" jsonschema:"optional project name or id when saving outside a linked thought"`
 }

+type SaveFileDecodedInput struct {
+	Name      string
+	Content   []byte
+	MediaType string
+	Kind      string
+	ThoughtID string
+	Project   string
+}
+
 type SaveFileOutput struct {
 	File thoughttypes.StoredFile `json:"file"`
 }
@@ -59,11 +68,6 @@ func NewFilesTool(db *store.DB, sessions *session.ActiveProjects) *FilesTool {
 }

 func (t *FilesTool) Save(ctx context.Context, req *mcp.CallToolRequest, in SaveFileInput) (*mcp.CallToolResult, SaveFileOutput, error) {
-	name := strings.TrimSpace(in.Name)
-	if name == "" {
-		return nil, SaveFileOutput{}, errInvalidInput("name is required")
-	}
-
 	contentBase64, mediaTypeFromDataURL := splitDataURL(strings.TrimSpace(in.ContentBase64))
 	if contentBase64 == "" {
 		return nil, SaveFileOutput{}, errInvalidInput("content_base64 is required")
@@ -73,66 +77,18 @@ func (t *FilesTool) Save(ctx context.Context, req *mcp.CallToolRequest, in SaveF
 	if err != nil {
 		return nil, SaveFileOutput{}, errInvalidInput("content_base64 must be valid base64")
 	}
-	if len(content) == 0 {
-		return nil, SaveFileOutput{}, errInvalidInput("decoded file content must not be empty")
-	}
-
-	project, err := resolveProject(ctx, t.store, t.sessions, req, in.Project, false)
-	if err != nil {
-		return nil, SaveFileOutput{}, err
-	}
-
-	var thoughtID *uuid.UUID
-	var projectID = projectIDPtr(project)
-	if rawThoughtID := strings.TrimSpace(in.ThoughtID); rawThoughtID != "" {
-		parsedThoughtID, err := parseUUID(rawThoughtID)
-		if err != nil {
-			return nil, SaveFileOutput{}, err
-		}
-		thought, err := t.store.GetThought(ctx, parsedThoughtID)
-		if err != nil {
-			return nil, SaveFileOutput{}, err
-		}
-		thoughtID = &parsedThoughtID
-		projectID = thought.ProjectID
-		if project != nil && thought.ProjectID != nil && *thought.ProjectID != project.ID {
-			return nil, SaveFileOutput{}, errInvalidInput("project does not match the linked thought's project")
-		}
-	}
-
-	mediaType := normalizeMediaType(strings.TrimSpace(in.MediaType), mediaTypeFromDataURL, content)
-	kind := normalizeFileKind(strings.TrimSpace(in.Kind), mediaType)
-	sum := sha256.Sum256(content)
-
-	file := thoughttypes.StoredFile{
-		Name:      name,
-		MediaType: mediaType,
-		Kind:      kind,
-		Encoding:  "base64",
-		SizeBytes: int64(len(content)),
-		SHA256:    hex.EncodeToString(sum[:]),
-		Content:   content,
-		ProjectID: projectID,
-	}
-	if thoughtID != nil {
-		file.ThoughtID = thoughtID
-	}
-
-	created, err := t.store.InsertStoredFile(ctx, file)
+	out, err := t.SaveDecoded(ctx, req, SaveFileDecodedInput{
+		Name:      in.Name,
+		Content:   content,
+		MediaType: firstNonEmpty(strings.TrimSpace(in.MediaType), mediaTypeFromDataURL),
+		Kind:      in.Kind,
+		ThoughtID: in.ThoughtID,
+		Project:   in.Project,
+	})
 	if err != nil {
 		return nil, SaveFileOutput{}, err
 	}
-	if created.ThoughtID != nil {
-		if err := t.store.AddThoughtAttachment(ctx, *created.ThoughtID, thoughtAttachmentFromFile(created)); err != nil {
-			return nil, SaveFileOutput{}, err
-		}
-	}
-	if created.ProjectID != nil {
-		_ = t.store.TouchProject(ctx, *created.ProjectID)
-	}
-
-	return nil, SaveFileOutput{File: created}, nil
+	return nil, out, nil
 }

 func (t *FilesTool) Load(ctx context.Context, _ *mcp.CallToolRequest, in LoadFileInput) (*mcp.CallToolResult, LoadFileOutput, error) {
@@ -193,6 +149,73 @@ func (t *FilesTool) List(ctx context.Context, req *mcp.CallToolRequest, in ListF
 	return nil, ListFilesOutput{Files: files}, nil
 }

+func (t *FilesTool) SaveDecoded(ctx context.Context, req *mcp.CallToolRequest, in SaveFileDecodedInput) (SaveFileOutput, error) {
+	name := strings.TrimSpace(in.Name)
+	if name == "" {
+		return SaveFileOutput{}, errInvalidInput("name is required")
+	}
+	if len(in.Content) == 0 {
+		return SaveFileOutput{}, errInvalidInput("decoded file content must not be empty")
+	}
+
+	project, err := resolveProject(ctx, t.store, t.sessions, req, in.Project, false)
+	if err != nil {
+		return SaveFileOutput{}, err
+	}
+
+	var thoughtID *uuid.UUID
+	var projectID = projectIDPtr(project)
+	if rawThoughtID := strings.TrimSpace(in.ThoughtID); rawThoughtID != "" {
+		parsedThoughtID, err := parseUUID(rawThoughtID)
+		if err != nil {
+			return SaveFileOutput{}, err
+		}
+		thought, err := t.store.GetThought(ctx, parsedThoughtID)
+		if err != nil {
+			return SaveFileOutput{}, err
+		}
+		thoughtID = &parsedThoughtID
+		projectID = thought.ProjectID
+		if project != nil && thought.ProjectID != nil && *thought.ProjectID != project.ID {
+			return SaveFileOutput{}, errInvalidInput("project does not match the linked thought's project")
+		}
+	}
+
+	mediaType := normalizeMediaType(strings.TrimSpace(in.MediaType), "", in.Content)
+	kind := normalizeFileKind(strings.TrimSpace(in.Kind), mediaType)
+	sum := sha256.Sum256(in.Content)
+
+	file := thoughttypes.StoredFile{
+		Name:      name,
+		MediaType: mediaType,
+		Kind:      kind,
+		Encoding:  "base64",
+		SizeBytes: int64(len(in.Content)),
+		SHA256:    hex.EncodeToString(sum[:]),
+		Content:   in.Content,
+		ProjectID: projectID,
+	}
+	if thoughtID != nil {
+		file.ThoughtID = thoughtID
+	}
+
+	created, err := t.store.InsertStoredFile(ctx, file)
+	if err != nil {
+		return SaveFileOutput{}, err
+	}
+
+	if created.ThoughtID != nil {
+		if err := t.store.AddThoughtAttachment(ctx, *created.ThoughtID, thoughtAttachmentFromFile(created)); err != nil {
+			return SaveFileOutput{}, err
+		}
+	}
+	if created.ProjectID != nil {
+		_ = t.store.TouchProject(ctx, *created.ProjectID)
+	}
+
+	return SaveFileOutput{File: created}, nil
+}
+
 func thoughtAttachmentFromFile(file thoughttypes.StoredFile) thoughttypes.ThoughtAttachment {
 	return thoughttypes.ThoughtAttachment{
 		FileID: file.ID,
@@ -238,6 +261,15 @@ func normalizeMediaType(explicit string, fromDataURL string, content []byte) str
 	}
 }

+func firstNonEmpty(values ...string) string {
+	for _, value := range values {
+		if strings.TrimSpace(value) != "" {
+			return strings.TrimSpace(value)
+		}
+	}
+	return ""
+}
+
 func normalizeFileKind(explicit string, mediaType string) string {
 	if explicit != "" {
 		return explicit
internal/tools/metadata_retry.go (new file, 206 lines)
@@ -0,0 +1,206 @@
+package tools
+
+import (
+	"context"
+	"log/slog"
+	"sync"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/modelcontextprotocol/go-sdk/mcp"
+	"golang.org/x/sync/semaphore"
+
+	"git.warky.dev/wdevs/amcs/internal/ai"
+	"git.warky.dev/wdevs/amcs/internal/config"
+	"git.warky.dev/wdevs/amcs/internal/metadata"
+	"git.warky.dev/wdevs/amcs/internal/session"
+	"git.warky.dev/wdevs/amcs/internal/store"
+	thoughttypes "git.warky.dev/wdevs/amcs/internal/types"
+)
+
+const metadataRetryConcurrency = 4
+
+type MetadataRetryer struct {
+	backgroundCtx   context.Context
+	store           *store.DB
+	provider        ai.Provider
+	capture         config.CaptureConfig
+	sessions        *session.ActiveProjects
+	metadataTimeout time.Duration
+	logger          *slog.Logger
+}
+
+type RetryMetadataTool struct {
+	retryer *MetadataRetryer
+}
+
+type RetryMetadataInput struct {
+	Project         string `json:"project,omitempty" jsonschema:"optional project name or id to scope the retry"`
+	Limit           int    `json:"limit,omitempty" jsonschema:"maximum number of thoughts to process in one call; defaults to 100"`
+	IncludeArchived bool   `json:"include_archived,omitempty" jsonschema:"whether to include archived thoughts; defaults to false"`
+	OlderThanDays   int    `json:"older_than_days,omitempty" jsonschema:"only retry thoughts whose last metadata attempt was at least N days ago; 0 means no restriction"`
+	DryRun          bool   `json:"dry_run,omitempty" jsonschema:"report counts without retrying metadata extraction"`
+}
+
+type RetryMetadataFailure struct {
+	ID    string `json:"id"`
+	Error string `json:"error"`
+}
+
+type RetryMetadataOutput struct {
+	Scanned  int                    `json:"scanned"`
+	Retried  int                    `json:"retried"`
+	Updated  int                    `json:"updated"`
+	Skipped  int                    `json:"skipped"`
+	Failed   int                    `json:"failed"`
+	DryRun   bool                   `json:"dry_run"`
+	Failures []RetryMetadataFailure `json:"failures,omitempty"`
+}
+
+func NewMetadataRetryer(backgroundCtx context.Context, db *store.DB, provider ai.Provider, capture config.CaptureConfig, metadataTimeout time.Duration, sessions *session.ActiveProjects, logger *slog.Logger) *MetadataRetryer {
+	if backgroundCtx == nil {
+		backgroundCtx = context.Background()
+	}
+	return &MetadataRetryer{
+		backgroundCtx:   backgroundCtx,
+		store:           db,
+		provider:        provider,
+		capture:         capture,
+		sessions:        sessions,
+		metadataTimeout: metadataTimeout,
+		logger:          logger,
+	}
+}
+
+func NewRetryMetadataTool(retryer *MetadataRetryer) *RetryMetadataTool {
+	return &RetryMetadataTool{retryer: retryer}
+}
+
+func (t *RetryMetadataTool) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryMetadataInput) (*mcp.CallToolResult, RetryMetadataOutput, error) {
+	return t.retryer.Handle(ctx, req, in)
+}
+
+func (r *MetadataRetryer) QueueThought(id uuid.UUID) {
+	go func() {
+		attemptCtx := r.backgroundCtx
+		if r.metadataTimeout > 0 {
+			var cancel context.CancelFunc
+			attemptCtx, cancel = context.WithTimeout(r.backgroundCtx, r.metadataTimeout)
+			defer cancel()
+		}
+		if _, err := r.retryOne(attemptCtx, id); err != nil {
+			r.logger.Warn("background metadata retry failed", slog.String("thought_id", id.String()), slog.String("error", err.Error()))
+		}
+	}()
+}
+
+func (r *MetadataRetryer) Handle(ctx context.Context, req *mcp.CallToolRequest, in RetryMetadataInput) (*mcp.CallToolResult, RetryMetadataOutput, error) {
+	limit := in.Limit
+	if limit <= 0 {
+		limit = 100
+	}
+
+	project, err := resolveProject(ctx, r.store, r.sessions, req, in.Project, false)
+	if err != nil {
+		return nil, RetryMetadataOutput{}, err
+	}
+
+	var projectID *uuid.UUID
+	if project != nil {
+		projectID = &project.ID
+	}
+
+	thoughts, err := r.store.ListThoughtsPendingMetadataRetry(ctx, limit, projectID, in.IncludeArchived, in.OlderThanDays)
+	if err != nil {
+		return nil, RetryMetadataOutput{}, err
+	}
+
+	out := RetryMetadataOutput{
+		Scanned: len(thoughts),
+		DryRun:  in.DryRun,
+	}
+	if in.DryRun || len(thoughts) == 0 {
+		return nil, out, nil
+	}
+
+	sem := semaphore.NewWeighted(metadataRetryConcurrency)
+	var mu sync.Mutex
+	var wg sync.WaitGroup
+
+	for _, thought := range thoughts {
+		if ctx.Err() != nil {
+			break
+		}
+		if err := sem.Acquire(ctx, 1); err != nil {
+			break
+		}
+
+		wg.Add(1)
+		go func(thought thoughttypes.Thought) {
+			defer wg.Done()
+			defer sem.Release(1)
+
+			mu.Lock()
+			out.Retried++
+			mu.Unlock()
+
+			updated, err := r.retryOne(ctx, thought.ID)
+			if err != nil {
+				mu.Lock()
+				out.Failures = append(out.Failures, RetryMetadataFailure{ID: thought.ID.String(), Error: err.Error()})
+				mu.Unlock()
+				return
+			}
+			if updated {
+				mu.Lock()
+				out.Updated++
+				mu.Unlock()
+				return
+			}
+
+			mu.Lock()
+			out.Skipped++
+			mu.Unlock()
+		}(thought)
+	}
+
+	wg.Wait()
+	out.Failed = len(out.Failures)
+
+	return nil, out, nil
+}
+
+func (r *MetadataRetryer) retryOne(ctx context.Context, id uuid.UUID) (bool, error) {
+	thought, err := r.store.GetThought(ctx, id)
+	if err != nil {
+		return false, err
+	}
+	if thought.Metadata.MetadataStatus == metadata.MetadataStatusComplete {
+		return false, nil
+	}
+
+	attemptCtx := ctx
+	if r.metadataTimeout > 0 {
+		var cancel context.CancelFunc
+		attemptCtx, cancel = context.WithTimeout(ctx, r.metadataTimeout)
+		defer cancel()
+	}
+
+	attemptedAt := time.Now().UTC()
+	extracted, extractErr := r.provider.ExtractMetadata(attemptCtx, thought.Content)
+	if extractErr != nil {
+		failedMetadata := metadata.MarkMetadataFailed(thought.Metadata, r.capture, attemptedAt, extractErr)
+		if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, failedMetadata); updateErr != nil {
+			return false, updateErr
+		}
+		return false, extractErr
+	}
+
+	completedMetadata := metadata.MarkMetadataComplete(extracted, r.capture, attemptedAt)
+	completedMetadata.Attachments = thought.Metadata.Attachments
+	if _, updateErr := r.store.UpdateThoughtMetadata(ctx, thought.ID, completedMetadata); updateErr != nil {
+		return false, updateErr
+	}
+
+	return true, nil
+}
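The retryer caps concurrent extraction attempts at `metadataRetryConcurrency` (4) via a weighted semaphore, with a mutex guarding the shared counters. The same bounded-fan-out pattern can be sketched with only the standard library, using a buffered channel in place of `golang.org/x/sync/semaphore` (the helper name here is illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// retryAll processes each id with at most `concurrency` workers in flight,
// mirroring the retryer's semaphore + mutex bookkeeping.
func retryAll(ids []int, concurrency int) (retried int) {
	sem := make(chan struct{}, concurrency) // acquire = send, release = receive
	var mu sync.Mutex
	var wg sync.WaitGroup

	for range ids {
		sem <- struct{}{} // blocks once `concurrency` retries are in flight
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()

			// retryOne would re-run extraction here; we only count attempts.
			mu.Lock()
			retried++
			mu.Unlock()
		}()
	}

	wg.Wait()
	return retried
}

func main() {
	fmt.Println(retryAll([]int{1, 2, 3, 4, 5, 6, 7, 8}, 4))
}
```

The real code acquires with `sem.Acquire(ctx, 1)` so a cancelled request stops launching new workers mid-loop, which a plain channel send cannot do without an extra `select`.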
@@ -106,15 +106,18 @@ func (t *ReparseMetadataTool) Handle(ctx context.Context, req *mcp.CallToolReque
 	normalizedCurrent := metadata.Normalize(thought.Metadata, t.capture)

+	attemptedAt := time.Now().UTC()
 	extracted, extractErr := t.provider.ExtractMetadata(ctx, thought.Content)
 	normalizedTarget := normalizedCurrent
 	if extractErr != nil {
+		normalizedTarget = metadata.MarkMetadataFailed(normalizedCurrent, t.capture, attemptedAt, extractErr)
 		mu.Lock()
 		out.Normalized++
 		mu.Unlock()
 		t.logger.Warn("metadata reparse extract failed, using normalized existing metadata", slog.String("thought_id", thought.ID.String()), slog.String("error", extractErr.Error()))
 	} else {
-		normalizedTarget = metadata.Normalize(extracted, t.capture)
+		normalizedTarget = metadata.MarkMetadataComplete(extracted, t.capture, attemptedAt)
+		normalizedTarget.Attachments = thought.Metadata.Attachments
 		mu.Lock()
 		out.Reparsed++
 		mu.Unlock()
@@ -4,6 +4,7 @@ import (
 	"context"
 	"log/slog"
 	"strings"
+	"time"

 	"github.com/modelcontextprotocol/go-sdk/mcp"

@@ -64,8 +65,9 @@ func (t *UpdateTool) Handle(ctx context.Context, _ *mcp.CallToolRequest, in Upda
 		extracted, extractErr := t.provider.ExtractMetadata(ctx, content)
 		if extractErr != nil {
 			t.log.Warn("metadata extraction failed during update, keeping current metadata", slog.String("error", extractErr.Error()))
+			mergedMetadata = metadata.MarkMetadataFailed(mergedMetadata, t.capture, time.Now().UTC(), extractErr)
 		} else {
-			mergedMetadata = metadata.Normalize(extracted, t.capture)
+			mergedMetadata = metadata.MarkMetadataComplete(extracted, t.capture, time.Now().UTC())
 			mergedMetadata.Attachments = current.Metadata.Attachments
 		}
 	}
@@ -14,6 +14,10 @@ type ThoughtMetadata struct {
 	Type        string              `json:"type"`
 	Source      string              `json:"source"`
 	Attachments []ThoughtAttachment `json:"attachments,omitempty"`
+	MetadataStatus          string `json:"metadata_status,omitempty"`
+	MetadataUpdatedAt       string `json:"metadata_updated_at,omitempty"`
+	MetadataLastAttemptedAt string `json:"metadata_last_attempted_at,omitempty"`
+	MetadataError           string `json:"metadata_error,omitempty"`
 }

 type ThoughtAttachment struct {