321 lines
10 KiB
Go
321 lines
10 KiB
Go
package app
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"time"

	"git.warky.dev/wdevs/amcs/internal/ai"
	"git.warky.dev/wdevs/amcs/internal/auth"
	"git.warky.dev/wdevs/amcs/internal/buildinfo"
	"git.warky.dev/wdevs/amcs/internal/config"
	"git.warky.dev/wdevs/amcs/internal/mcpserver"
	"git.warky.dev/wdevs/amcs/internal/observability"
	"git.warky.dev/wdevs/amcs/internal/session"
	"git.warky.dev/wdevs/amcs/internal/store"
	"git.warky.dev/wdevs/amcs/internal/tools"
)
|
|
|
|
func Run(ctx context.Context, configPath string) error {
|
|
cfg, loadedFrom, err := config.Load(configPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
info := buildinfo.Current()
|
|
cfg.MCP.Version = info.Version
|
|
|
|
logger, err := observability.NewLogger(cfg.Logging)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
logger.Info("loaded configuration",
|
|
slog.String("path", loadedFrom),
|
|
slog.String("provider", cfg.AI.Provider),
|
|
slog.String("version", info.Version),
|
|
slog.String("tag_name", info.TagName),
|
|
slog.String("build_date", info.BuildDate),
|
|
slog.String("commit", info.Commit),
|
|
)
|
|
|
|
db, err := store.New(ctx, cfg.Database)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer db.Close()
|
|
|
|
if err := db.VerifyRequirements(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
httpClient := &http.Client{Timeout: 30 * time.Second}
|
|
provider, err := ai.NewProvider(cfg.AI, httpClient, logger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var keyring *auth.Keyring
|
|
var oauthRegistry *auth.OAuthRegistry
|
|
var tokenStore *auth.TokenStore
|
|
if len(cfg.Auth.Keys) > 0 {
|
|
keyring, err = auth.NewKeyring(cfg.Auth.Keys)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if len(cfg.Auth.OAuth.Clients) > 0 {
|
|
oauthRegistry, err = auth.NewOAuthRegistry(cfg.Auth.OAuth.Clients)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
tokenStore = auth.NewTokenStore(0)
|
|
}
|
|
authCodes := auth.NewAuthCodeStore()
|
|
dynClients := auth.NewDynamicClientStore()
|
|
activeProjects := session.NewActiveProjects()
|
|
|
|
logger.Info("database connection verified",
|
|
slog.String("provider", provider.Name()),
|
|
)
|
|
|
|
if cfg.Backfill.Enabled && cfg.Backfill.RunOnStartup {
|
|
go runBackfillPass(ctx, db, provider, cfg.Backfill, logger)
|
|
}
|
|
|
|
if cfg.Backfill.Enabled && cfg.Backfill.Interval > 0 {
|
|
go func() {
|
|
ticker := time.NewTicker(cfg.Backfill.Interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
runBackfillPass(ctx, db, provider, cfg.Backfill, logger)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
if cfg.MetadataRetry.Enabled && cfg.MetadataRetry.RunOnStartup {
|
|
go runMetadataRetryPass(ctx, db, provider, cfg, activeProjects, logger)
|
|
}
|
|
|
|
if cfg.MetadataRetry.Enabled && cfg.MetadataRetry.Interval > 0 {
|
|
go func() {
|
|
ticker := time.NewTicker(cfg.MetadataRetry.Interval)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-ticker.C:
|
|
runMetadataRetryPass(ctx, db, provider, cfg, activeProjects, logger)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
handler, err := routes(logger, cfg, info, db, provider, keyring, oauthRegistry, tokenStore, authCodes, dynClients, activeProjects)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
server := &http.Server{
|
|
Addr: fmt.Sprintf("%s:%d", cfg.Server.Host, cfg.Server.Port),
|
|
Handler: handler,
|
|
ReadTimeout: cfg.Server.ReadTimeout,
|
|
WriteTimeout: cfg.Server.WriteTimeout,
|
|
IdleTimeout: cfg.Server.IdleTimeout,
|
|
}
|
|
|
|
errCh := make(chan error, 1)
|
|
go func() {
|
|
logger.Info("starting HTTP server",
|
|
slog.String("addr", server.Addr),
|
|
slog.String("mcp_path", cfg.MCP.Path),
|
|
)
|
|
|
|
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
|
errCh <- err
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
logger.Info("shutting down HTTP server")
|
|
return server.Shutdown(shutdownCtx)
|
|
case err := <-errCh:
|
|
return fmt.Errorf("run server: %w", err)
|
|
}
|
|
}
|
|
|
|
func routes(logger *slog.Logger, cfg *config.Config, info buildinfo.Info, db *store.DB, provider ai.Provider, keyring *auth.Keyring, oauthRegistry *auth.OAuthRegistry, tokenStore *auth.TokenStore, authCodes *auth.AuthCodeStore, dynClients *auth.DynamicClientStore, activeProjects *session.ActiveProjects) (http.Handler, error) {
|
|
mux := http.NewServeMux()
|
|
accessTracker := auth.NewAccessTracker()
|
|
oauthEnabled := oauthRegistry != nil && tokenStore != nil
|
|
authMiddleware := auth.Middleware(cfg.Auth, keyring, oauthRegistry, tokenStore, accessTracker, logger)
|
|
filesTool := tools.NewFilesTool(db, activeProjects)
|
|
metadataRetryer := tools.NewMetadataRetryer(context.Background(), db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
|
|
|
|
toolSet := mcpserver.ToolSet{
|
|
Capture: tools.NewCaptureTool(db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, metadataRetryer, logger),
|
|
Search: tools.NewSearchTool(db, provider, cfg.Search, activeProjects),
|
|
List: tools.NewListTool(db, cfg.Search, activeProjects),
|
|
Stats: tools.NewStatsTool(db),
|
|
Get: tools.NewGetTool(db),
|
|
Update: tools.NewUpdateTool(db, provider, cfg.Capture, logger),
|
|
Delete: tools.NewDeleteTool(db),
|
|
Archive: tools.NewArchiveTool(db),
|
|
Projects: tools.NewProjectsTool(db, activeProjects),
|
|
Version: tools.NewVersionTool(cfg.MCP.ServerName, info),
|
|
Context: tools.NewContextTool(db, provider, cfg.Search, activeProjects),
|
|
Recall: tools.NewRecallTool(db, provider, cfg.Search, activeProjects),
|
|
Summarize: tools.NewSummarizeTool(db, provider, cfg.Search, activeProjects),
|
|
Links: tools.NewLinksTool(db, provider, cfg.Search),
|
|
Files: filesTool,
|
|
Backfill: tools.NewBackfillTool(db, provider, activeProjects, logger),
|
|
Reparse: tools.NewReparseMetadataTool(db, provider, cfg.Capture, activeProjects, logger),
|
|
RetryMetadata: tools.NewRetryMetadataTool(metadataRetryer),
|
|
Household: tools.NewHouseholdTool(db),
|
|
Maintenance: tools.NewMaintenanceTool(db),
|
|
Calendar: tools.NewCalendarTool(db),
|
|
Meals: tools.NewMealsTool(db),
|
|
CRM: tools.NewCRMTool(db),
|
|
Skills: tools.NewSkillsTool(db, activeProjects),
|
|
ChatHistory: tools.NewChatHistoryTool(db, activeProjects),
|
|
Describe: tools.NewDescribeTool(db, mcpserver.BuildToolCatalog()),
|
|
}
|
|
|
|
mcpHandler, err := mcpserver.New(cfg.MCP, logger, toolSet, activeProjects.Clear)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("build mcp handler: %w", err)
|
|
}
|
|
mux.Handle(cfg.MCP.Path, authMiddleware(mcpHandler))
|
|
mux.Handle("/files", authMiddleware(fileHandler(filesTool)))
|
|
mux.Handle("/files/{id}", authMiddleware(fileHandler(filesTool)))
|
|
if oauthEnabled {
|
|
mux.HandleFunc("/.well-known/oauth-authorization-server", oauthMetadataHandler())
|
|
mux.HandleFunc("/oauth-authorization-server", oauthMetadataHandler())
|
|
mux.HandleFunc("/oauth/register", oauthRegisterHandler(dynClients, logger))
|
|
mux.HandleFunc("/authorize", oauthAuthorizeHandler(dynClients, authCodes, logger))
|
|
mux.HandleFunc("/oauth/authorize", oauthAuthorizeHandler(dynClients, authCodes, logger))
|
|
mux.HandleFunc("/oauth/token", oauthTokenHandler(oauthRegistry, tokenStore, authCodes, logger))
|
|
}
|
|
mux.HandleFunc("/favicon.ico", serveFavicon)
|
|
mux.HandleFunc("/images/project.jpg", serveHomeImage)
|
|
mux.HandleFunc("/images/icon.png", serveIcon)
|
|
mux.HandleFunc("/llm", serveLLMInstructions)
|
|
mux.HandleFunc("/api/status", statusAPIHandler(info, accessTracker, oauthEnabled))
|
|
|
|
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ok"))
|
|
})
|
|
|
|
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) {
|
|
if err := db.Ready(r.Context()); err != nil {
|
|
logger.Error("readiness check failed", slog.String("error", err.Error()))
|
|
http.Error(w, "not ready", http.StatusServiceUnavailable)
|
|
return
|
|
}
|
|
|
|
w.WriteHeader(http.StatusOK)
|
|
_, _ = w.Write([]byte("ready"))
|
|
})
|
|
|
|
mux.HandleFunc("/", homeHandler(info, accessTracker, oauthEnabled))
|
|
|
|
return observability.Chain(
|
|
mux,
|
|
observability.RequestID(),
|
|
observability.Recover(logger),
|
|
observability.AccessLog(logger),
|
|
observability.Timeout(cfg.Server.WriteTimeout),
|
|
), nil
|
|
}
|
|
|
|
func runMetadataRetryPass(ctx context.Context, db *store.DB, provider ai.Provider, cfg *config.Config, activeProjects *session.ActiveProjects, logger *slog.Logger) {
|
|
retryer := tools.NewMetadataRetryer(ctx, db, provider, cfg.Capture, cfg.AI.Metadata.Timeout, activeProjects, logger)
|
|
_, out, err := retryer.Handle(ctx, nil, tools.RetryMetadataInput{
|
|
Limit: cfg.MetadataRetry.MaxPerRun,
|
|
IncludeArchived: cfg.MetadataRetry.IncludeArchived,
|
|
OlderThanDays: 1,
|
|
})
|
|
if err != nil {
|
|
logger.Error("auto metadata retry failed", slog.String("error", err.Error()))
|
|
return
|
|
}
|
|
logger.Info("auto metadata retry pass",
|
|
slog.Int("scanned", out.Scanned),
|
|
slog.Int("retried", out.Retried),
|
|
slog.Int("updated", out.Updated),
|
|
slog.Int("failed", out.Failed),
|
|
)
|
|
}
|
|
|
|
func runBackfillPass(ctx context.Context, db *store.DB, provider ai.Provider, cfg config.BackfillConfig, logger *slog.Logger) {
|
|
backfiller := tools.NewBackfillTool(db, provider, nil, logger)
|
|
_, out, err := backfiller.Handle(ctx, nil, tools.BackfillInput{
|
|
Limit: cfg.MaxPerRun,
|
|
IncludeArchived: cfg.IncludeArchived,
|
|
})
|
|
if err != nil {
|
|
logger.Error("auto backfill failed", slog.String("error", err.Error()))
|
|
return
|
|
}
|
|
logger.Info("auto backfill pass",
|
|
slog.String("model", out.Model),
|
|
slog.Int("scanned", out.Scanned),
|
|
slog.Int("embedded", out.Embedded),
|
|
slog.Int("failed", out.Failed),
|
|
)
|
|
}
|
|
|
|
func serveHomeImage(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodGet && r.Method != http.MethodHead {
|
|
w.Header().Set("Allow", "GET, HEAD")
|
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "image/jpeg")
|
|
w.Header().Set("Cache-Control", "public, max-age=31536000, immutable")
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
if r.Method == http.MethodHead {
|
|
return
|
|
}
|
|
|
|
_, _ = w.Write(homeImage)
|
|
}
|
|
|
|
func serveIcon(w http.ResponseWriter, r *http.Request) {
|
|
if iconImage == nil {
|
|
http.NotFound(w, r)
|
|
return
|
|
}
|
|
|
|
if r.Method != http.MethodGet && r.Method != http.MethodHead {
|
|
w.Header().Set("Allow", "GET, HEAD")
|
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "image/png")
|
|
w.Header().Set("Cache-Control", "public, max-age=31536000, immutable")
|
|
w.WriteHeader(http.StatusOK)
|
|
|
|
if r.Method == http.MethodHead {
|
|
return
|
|
}
|
|
|
|
_, _ = w.Write(iconImage)
|
|
}
|