diff --git a/llm/options_to_openclaw.md b/llm/options_to_openclaw.md new file mode 100644 index 0000000..9c7b87e --- /dev/null +++ b/llm/options_to_openclaw.md @@ -0,0 +1,826 @@ +# AMCS → OpenClaw Alternative: Gap Analysis & Roadmap + +## Context + +AMCS is a **passive** MCP memory server. OpenClaw's key differentiator is that it's an **always-on autonomous agent** — it proactively acts, monitors, and learns without human prompting. AMCS has the data model and search foundation; it's missing the execution engine and channel integrations that make OpenClaw compelling. + +OpenClaw's 3 pillars AMCS lacks: +1. **Autonomous heartbeat** — scheduled jobs that run without user prompts +2. **Channel integrations** — 25+ messaging platforms (Telegram, Slack, Discord, email, etc.) +3. **Self-improving memory** — knowledge graph distillation, daily notes, living summary (MEMORY.md) + +--- + +## Phase 1: Autonomous Heartbeat Engine (Critical — unlocks everything else) + +### 1a. Add `Complete()` to AI Provider + +The current `Provider` interface in `internal/ai/provider.go` only supports `Summarize(ctx, systemPrompt, userPrompt)`. An autonomous agent needs a stateful multi-turn call with tool awareness. 
+ +**Extend the interface:** + +```go +// internal/ai/provider.go + +type CompletionRole string + +const ( + RoleSystem CompletionRole = "system" + RoleUser CompletionRole = "user" + RoleAssistant CompletionRole = "assistant" +) + +type CompletionMessage struct { + Role CompletionRole `json:"role"` + Content string `json:"content"` +} + +type CompletionResult struct { + Content string `json:"content"` + StopReason string `json:"stop_reason"` // "stop" | "length" | "error" + Model string `json:"model"` +} + +type Provider interface { + Embed(ctx context.Context, input string) ([]float32, error) + ExtractMetadata(ctx context.Context, input string) (thoughttypes.ThoughtMetadata, error) + Summarize(ctx context.Context, systemPrompt, userPrompt string) (string, error) + Complete(ctx context.Context, messages []CompletionMessage) (CompletionResult, error) + Name() string + EmbeddingModel() string +} +``` + +**Implement in `internal/ai/compat/client.go`:** + +`Complete` is a simplification of the existing `extractMetadataWithModel` path — same OpenAI-compatible `/chat/completions` endpoint, same auth headers, no JSON schema coercion. Add a `chatCompletionsRequest` type (reuse or extend the existing unexported struct) and a `Complete` method on `*Client` that: +1. Builds the request body from `[]CompletionMessage` +2. POSTs to `c.baseURL + "/chat/completions"` with `c.metadataModel` +3. Reads the first choice's `message.content` +4. Returns `CompletionResult{Content, StopReason, Model}` + +Error handling mirrors the metadata path: on HTTP 429/503 mark the model unhealthy (`c.modelHealth`), return a wrapped error. No fallback model chain needed for agent calls — callers should retry on next heartbeat tick. + +--- + +### 1b. Heartbeat Engine Package + +**New package: `internal/agent/`** + +#### `internal/agent/job.go` + +```go +package agent + +import ( + "context" + "time" +) + +// Job is a single scheduled unit of autonomous work. 
+type Job interface {
+    Name() string
+    Interval() time.Duration
+    Run(ctx context.Context) error
+}
+```
+
+#### `internal/agent/engine.go`
+
+The engine manages a set of jobs and fires each on its own ticker. It mirrors the pattern already used for `runBackfillPass` and `runMetadataRetryPass` in `internal/app/app.go`, but generalises it.
+
+```go
+package agent
+
+import (
+    "context"
+    "fmt"
+    "log/slog"
+    "runtime/debug"
+    "sync"
+    "time"
+)
+
+type Engine struct {
+    jobs   []Job
+    store  JobStore // persists agent_job_runs rows
+    logger *slog.Logger
+}
+
+func NewEngine(store JobStore, logger *slog.Logger, jobs ...Job) *Engine {
+    return &Engine{jobs: jobs, store: store, logger: logger}
+}
+
+// Run starts all job tickers and blocks until ctx is cancelled.
+func (e *Engine) Run(ctx context.Context) {
+    var wg sync.WaitGroup
+    for _, job := range e.jobs {
+        wg.Add(1)
+        go func(j Job) {
+            defer wg.Done()
+            e.runLoop(ctx, j)
+        }(job)
+    }
+    wg.Wait()
+}
+
+func (e *Engine) runLoop(ctx context.Context, j Job) {
+    ticker := time.NewTicker(j.Interval())
+    defer ticker.Stop()
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case <-ticker.C:
+            e.runOnce(ctx, j)
+        }
+    }
+}
+
+func (e *Engine) runOnce(ctx context.Context, j Job) {
+    runID, err := e.store.StartRun(ctx, j.Name())
+    if err != nil {
+        e.logger.Error("agent: failed to start job run record",
+            slog.String("job", j.Name()), slog.String("error", err.Error()))
+        return
+    }
+    // A panicking job must not take down the whole engine: recover here,
+    // log the stack trace, and mark the run failed (this is the behaviour
+    // promised in the Error Handling & Retry Strategy table below).
+    defer func() {
+        if r := recover(); r != nil {
+            e.logger.Error("agent: job panicked",
+                slog.String("job", j.Name()),
+                slog.String("panic", fmt.Sprint(r)),
+                slog.String("stack", string(debug.Stack())))
+            _ = e.store.FinishRun(ctx, runID, "failed", "", fmt.Sprintf("panic: %v", r))
+        }
+    }()
+    if err := j.Run(ctx); err != nil {
+        e.logger.Error("agent: job failed",
+            slog.String("job", j.Name()), slog.String("error", err.Error()))
+        _ = e.store.FinishRun(ctx, runID, "failed", "", err.Error())
+        return
+    }
+    _ = e.store.FinishRun(ctx, runID, "ok", "", "")
+    e.logger.Info("agent: job complete", slog.String("job", j.Name()))
+}
+```
+
+**Deduplication / double-run prevention:** `StartRun` should check for an existing `running` row younger than `2 * j.Interval()` and return `ErrAlreadyRunning` — the caller skips that tick. 
+ +#### `internal/agent/distill.go` + +```go +// DistillJob clusters semantically related thoughts and promotes +// durable insights into knowledge nodes. +type DistillJob struct { + store store.ThoughtQuerier + provider ai.Provider + cfg AgentDistillConfig + projectID *uuid.UUID // nil = all projects +} + +func (j *DistillJob) Name() string { return "distill" } +func (j *DistillJob) Interval() time.Duration { return j.cfg.Interval } + +func (j *DistillJob) Run(ctx context.Context) error { + // 1. Fetch recent thoughts not yet distilled (metadata.distilled != true) + // using store.ListThoughts with filter Days = cfg.MinAgeHours/24 + // 2. Group into semantic clusters via SearchSimilarThoughts + // 3. For each cluster > MinClusterSize: + // a. Call provider.Summarize with insight extraction prompt + // b. InsertThought with type="insight", metadata.knowledge_node=true + // c. InsertLink from each cluster member to the insight, relation="distilled_from" + // d. UpdateThought on each source to set metadata.distilled=true + // 4. Return nil; partial failures are logged but do not abort the run +} +``` + +Prompt used in step 3a: +``` +System: You extract durable knowledge from a cluster of related notes. + Return a single paragraph (2-5 sentences) capturing the core insight. + Do not reference the notes themselves. Write in third person. +User: [concatenated thought content, newest first, max 4000 tokens] +``` + +#### `internal/agent/daily_notes.go` + +Runs at a configured hour each day (checked by comparing `time.Now().Hour()` against `cfg.Hour` inside the loop — skip if already ran today by querying `agent_job_runs` for a successful `daily_notes` run with `started_at >= today midnight`). + +Collects: +- Thoughts created today (`store.ListThoughts` with `Days=1`) +- CRM interactions logged today +- Calendar activities for today +- Maintenance logs from today + +Formats into a structured markdown string and calls `store.InsertThought` with `type=daily_note`. 
+ +#### `internal/agent/living_summary.go` + +Regenerates `MEMORY.md` from the last N daily notes + all knowledge nodes. Calls `provider.Summarize` and upserts the result via `store.UpsertFile` using a fixed name `MEMORY.md` scoped to the project (or global if no project). + +--- + +### 1c. Config Structs + +Add to `internal/config/config.go`: + +```go +type Config struct { + // ... existing fields ... + Agent AgentConfig `yaml:"agent"` + Channels ChannelsConfig `yaml:"channels"` + Shell ShellConfig `yaml:"shell"` +} + +type AgentConfig struct { + Enabled bool `yaml:"enabled"` + Distill AgentDistillConfig `yaml:"distill"` + DailyNotes AgentDailyNotesConfig `yaml:"daily_notes"` + LivingSummary AgentLivingSummary `yaml:"living_summary"` + Archival AgentArchivalConfig `yaml:"archival"` + Model string `yaml:"model"` // override for agent calls; falls back to AI.Metadata.Model +} + +type AgentDistillConfig struct { + Enabled bool `yaml:"enabled"` + Interval time.Duration `yaml:"interval"` // default: 24h + BatchSize int `yaml:"batch_size"` // thoughts per run; default: 50 + MinClusterSize int `yaml:"min_cluster_size"` // default: 3 + MinAgeHours int `yaml:"min_age_hours"` // ignore thoughts younger than this; default: 6 +} + +type AgentDailyNotesConfig struct { + Enabled bool `yaml:"enabled"` + Hour int `yaml:"hour"` // 0-23 UTC; default: 23 +} + +type AgentLivingSummary struct { + Enabled bool `yaml:"enabled"` + Interval time.Duration `yaml:"interval"` // default: 24h + MaxDays int `yaml:"max_days"` // daily notes lookback; default: 30 +} + +type AgentArchivalConfig struct { + Enabled bool `yaml:"enabled"` + Interval time.Duration `yaml:"interval"` // default: 168h (weekly) + ArchiveOlderThan int `yaml:"archive_older_than_days"` // default: 90 +} +``` + +**Full YAML reference (`configs/dev.yaml` additions):** + +```yaml +agent: + enabled: false + model: "" # leave blank to reuse ai.metadata.model + distill: + enabled: false + interval: 24h + batch_size: 50 + 
min_cluster_size: 3 + min_age_hours: 6 + daily_notes: + enabled: false + hour: 23 # UTC hour to generate (0–23) + living_summary: + enabled: false + interval: 24h + max_days: 30 + archival: + enabled: false + interval: 168h + archive_older_than_days: 90 +``` + +--- + +### 1d. Wire into `internal/app/app.go` + +After the existing `MetadataRetry` goroutine block: + +```go +if cfg.Agent.Enabled { + jobStore := store.NewJobStore(db) + var jobs []agent.Job + if cfg.Agent.Distill.Enabled { + jobs = append(jobs, agent.NewDistillJob(db, provider, cfg.Agent.Distill, nil)) + } + if cfg.Agent.DailyNotes.Enabled { + jobs = append(jobs, agent.NewDailyNotesJob(db, provider, cfg.Agent.DailyNotes)) + } + if cfg.Agent.LivingSummary.Enabled { + jobs = append(jobs, agent.NewLivingSummaryJob(db, provider, cfg.Agent.LivingSummary)) + } + if cfg.Agent.Archival.Enabled { + jobs = append(jobs, agent.NewArchivalJob(db, cfg.Agent.Archival)) + } + engine := agent.NewEngine(jobStore, logger, jobs...) + go engine.Run(ctx) +} +``` + +--- + +### 1e. New MCP Tools — `internal/tools/agent.go` + +```go +// list_agent_jobs +// Returns all registered jobs with: name, interval, last_run (status, started_at, finished_at), next_run estimate. + +// trigger_agent_job +// Input: { "job": "distill" } +// Fires the job immediately in a goroutine; returns a run_id for polling. + +// get_agent_job_history +// Input: { "job": "distill", "limit": 20 } +// Returns rows from agent_job_runs ordered by started_at DESC. +``` + +Register in `internal/app/app.go` routes by adding `Agent tools.AgentTool` to `mcpserver.ToolSet` and wiring `tools.NewAgentTool(engine)`. + +--- + +### 1f. 
Migration — `migrations/021_agent_jobs.sql` + +```sql +CREATE TABLE agent_job_runs ( + id uuid PRIMARY KEY DEFAULT gen_random_uuid(), + job_name text NOT NULL, + started_at timestamptz NOT NULL DEFAULT now(), + finished_at timestamptz, + status text NOT NULL DEFAULT 'running', -- running | ok | failed | skipped + output text, + error text, + metadata jsonb NOT NULL DEFAULT '{}' +); + +CREATE INDEX idx_agent_job_runs_lookup + ON agent_job_runs (job_name, started_at DESC); +``` + +**`JobStore` interface (`internal/store/agent.go`):** + +```go +type JobStore interface { + StartRun(ctx context.Context, jobName string) (uuid.UUID, error) + FinishRun(ctx context.Context, id uuid.UUID, status, output, errMsg string) error + LastRun(ctx context.Context, jobName string) (*AgentJobRun, error) + ListRuns(ctx context.Context, jobName string, limit int) ([]AgentJobRun, error) +} +``` + +--- + +## Phase 2: Knowledge Graph Distillation + +Builds on Phase 1's distillation job. `thought_links` already exists with typed `relation` — the missing piece is a way to mark and query promoted knowledge nodes. + +### 2a. Extend `ThoughtMetadata` + +In `internal/types/thought.go`, add two fields to `ThoughtMetadata`: + +```go +type ThoughtMetadata struct { + // ... existing fields ... + KnowledgeNode bool `json:"knowledge_node,omitempty"` // true = promoted insight + KnowledgeWeight int `json:"knowledge_weight,omitempty"` // number of source thoughts that fed this node + Distilled bool `json:"distilled,omitempty"` // true = this thought has been processed by distill job +} +``` + +These are stored in the existing `metadata jsonb` column — no schema migration needed. + +### 2b. Store Addition + +In `internal/store/thoughts.go` add: + +```go +// ListKnowledgeNodes returns thoughts where metadata->>'knowledge_node' = 'true', +// ordered by knowledge_weight DESC, then created_at DESC. 
+func (db *DB) ListKnowledgeNodes(ctx context.Context, projectID *uuid.UUID, limit int) ([]types.Thought, error) +``` + +SQL: +```sql +SELECT id, content, metadata, project_id, archived_at, created_at, updated_at +FROM thoughts +WHERE (metadata->>'knowledge_node')::boolean = true + AND ($1::uuid IS NULL OR project_id = $1) + AND archived_at IS NULL +ORDER BY (metadata->>'knowledge_weight')::int DESC, created_at DESC +LIMIT $2 +``` + +### 2c. New MCP Tools — `internal/tools/knowledge.go` + +```go +// get_knowledge_graph +// Input: { "project_id": "uuid|null", "limit": 50 } +// Returns: { nodes: [Thought], edges: [ThoughtLink] } +// Fetches ListKnowledgeNodes + their outgoing/incoming links via store.GetThoughtLinks. + +// distill_now +// Input: { "project_id": "uuid|null", "batch_size": 20 } +// Triggers the distillation job synchronously (for on-demand use); returns { insights_created: N } +``` + +--- + +## Phase 3: Channel Integrations — Telegram First + +### 3a. Channel Adapter Interface — `internal/channels/channel.go` + +```go +package channels + +import ( + "context" + "time" +) + +type Attachment struct { + Name string + MediaType string + Data []byte +} + +type InboundMessage struct { + ChannelID string // e.g. telegram chat ID as string + SenderID string // e.g. telegram user ID as string + SenderName string // display name + Text string + Attachments []Attachment + Timestamp time.Time + Raw any // original platform message for debug/logging +} + +type Channel interface { + Name() string + Start(ctx context.Context, handler func(InboundMessage)) error + Send(ctx context.Context, channelID string, text string) error +} +``` + +### 3b. Telegram Implementation — `internal/channels/telegram/bot.go` + +Uses `net/http` only (no external Telegram SDK). 
Long-polling loop: + +```go +type Bot struct { + token string + allowedIDs map[int64]struct{} // empty = all allowed + baseURL string // https://api.telegram.org/bot{token} + client *http.Client + offset int64 + logger *slog.Logger +} + +func (b *Bot) Name() string { return "telegram" } + +func (b *Bot) Start(ctx context.Context, handler func(channels.InboundMessage)) error { + for { + updates, err := b.getUpdates(ctx, b.offset, 30 /*timeout seconds*/) + if err != nil { + if ctx.Err() != nil { return nil } + // transient error: log and back off 5s + time.Sleep(5 * time.Second) + continue + } + for _, u := range updates { + b.offset = u.UpdateID + 1 + if u.Message == nil { continue } + if !b.isAllowed(u.Message.Chat.ID) { continue } + handler(b.toInbound(u.Message)) + } + } +} + +func (b *Bot) Send(ctx context.Context, channelID string, text string) error { + // POST /sendMessage with chat_id and text + // Splits messages > 4096 chars automatically +} +``` + +**Error handling:** +- HTTP 401 (bad token): return fatal error, engine stops channel +- HTTP 429 (rate limit): respect `retry_after` from response body, sleep, retry +- HTTP 5xx: exponential backoff (5s → 10s → 30s → 60s), max 3 retries then sleep 5 min + +### 3c. Channel Router — `internal/channels/router.go` + +```go +type Router struct { + store store.ContactQuerier + thoughts store.ThoughtInserter + provider ai.Provider + channels map[string]channels.Channel + cfg config.ChannelsConfig + logger *slog.Logger +} + +func (r *Router) Handle(msg channels.InboundMessage) { + // 1. Resolve sender → CRM contact (by channel_identifiers->>'telegram' = senderID) + // If not found: create a new professional_contact with the sender name + channel identifier + // 2. 
Capture message as thought: + // content = msg.Text + // metadata.source = "telegram" + // metadata.type = "observation" + // metadata.people = [senderName] + // metadata (extra, stored in JSONB): channel="telegram", channel_id=msg.ChannelID, sender_id=msg.SenderID + // 3. If cfg.Telegram.Respond: + // a. Load recent context via store.SearchSimilarThoughts(msg.Text, limit=10) + // b. Build []CompletionMessage with system context + recent thoughts + user message + // c. Call provider.Complete(ctx, messages) + // d. Capture response as thought (type="assistant_response", source="telegram") + // e. Send reply via channel.Send(ctx, msg.ChannelID, result.Content) + // f. Save chat history via store.InsertChatHistory +} +``` + +**Agent response system prompt (step 3b):** +``` +You are a personal assistant with access to the user's memory. +Relevant context from memory: +{joined recent thought content} + +Respond concisely. If you cannot answer from memory, say so. +``` + +### 3d. Config — full YAML reference + +```yaml +channels: + telegram: + enabled: false + bot_token: "" + allowed_chat_ids: [] # empty = all chats allowed + capture_all: true # save every inbound message as a thought + respond: true # send LLM reply back to sender + response_model: "" # blank = uses agent.model or ai.metadata.model + poll_timeout_seconds: 30 # Telegram long-poll timeout (max 60) + max_message_length: 4096 # split replies longer than this + discord: + enabled: false + bot_token: "" + guild_ids: [] # empty = all guilds + capture_all: true + respond: true + slack: + enabled: false + bot_token: "" + app_token: "" # for socket mode + capture_all: true + respond: true + email: + enabled: false + imap_host: "" + imap_port: 993 + smtp_host: "" + smtp_port: 587 + username: "" + password: "" + poll_interval: 5m + capture_all: true + folders: ["INBOX"] +``` + +### 3e. 
Schema Migration — `migrations/022_channel_contacts.sql` + +```sql +-- Store per-channel identity handles on CRM contacts +ALTER TABLE professional_contacts + ADD COLUMN IF NOT EXISTS channel_identifiers jsonb NOT NULL DEFAULT '{}'; + +-- e.g. {"telegram": "123456789", "discord": "user#1234", "slack": "U01234567"} +CREATE INDEX idx_contacts_telegram_id + ON professional_contacts ((channel_identifiers->>'telegram')) + WHERE channel_identifiers->>'telegram' IS NOT NULL; +``` + +### 3f. New MCP Tools — `internal/tools/channels.go` + +```go +// send_channel_message +// Input: { "channel": "telegram", "channel_id": "123456789", "text": "Hello" } +// Sends a message on the named channel. Returns { sent: true, channel: "telegram" } + +// list_channel_conversations +// Input: { "channel": "telegram", "limit": 20, "days": 7 } +// Lists chat histories filtered by channel metadata. Wraps store.ListChatHistories. + +// get_channel_status +// Returns: [{ channel: "telegram", connected: true, uptime_seconds: 3600 }, ...] +``` + +### 3g. Future Channel Adapters + +Each is a new subdirectory implementing `channels.Channel`. No router or MCP tool changes needed. + +| Channel | Package | Approach | +|---------|---------|----------| +| Discord | `internal/channels/discord/` | Gateway WebSocket (discord.com/api/gateway); or use `discordgo` lib | +| Slack | `internal/channels/slack/` | Socket Mode WebSocket (no public URL needed) | +| Email (IMAP) | `internal/channels/email/` | IMAP IDLE or poll; SMTP for send | +| Signal | `internal/channels/signal/` | Wrap `signal-cli` JSON-RPC subprocess | +| WhatsApp | `internal/channels/whatsapp/` | Meta Cloud API webhook (requires public URL) | + +--- + +## Phase 4: Shell / Computer Access + +### 4a. 
Shell Tool — `internal/tools/shell.go`
+
+```go
+type ShellInput struct {
+    Command    string `json:"command"`
+    WorkingDir string `json:"working_dir,omitempty"` // override default; must be within allowed prefix
+    Timeout    string `json:"timeout,omitempty"` // e.g. "30s"; overrides config default
+    CaptureAs  string `json:"capture_as,omitempty"` // thought type for stored output; default "shell_output"
+    SaveOutput bool `json:"save_output"` // store stdout/stderr as a thought
+}
+
+type ShellOutput struct {
+    Stdout    string `json:"stdout"`
+    Stderr    string `json:"stderr"`
+    ExitCode  int `json:"exit_code"`
+    ThoughtID *uuid.UUID `json:"thought_id,omitempty"` // set if save_output=true
+}
+```
+
+**Execution model:**
+1. Validate `command` against `cfg.Shell.AllowedCommands` (if non-empty) and `cfg.Shell.BlockedCommands`
+2. `exec.CommandContext(ctx, "sh", "-c", command)` with `Dir` set to working dir and `SysProcAttr: &syscall.SysProcAttr{Setpgid: true}` so the command and any children it spawns run in their own process group
+3. Capture stdout + stderr into `bytes.Buffer`
+4. On timeout: kill the process group created in step 2 (`syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)` — the negative PID addresses the group, which only works because `Setpgid` was set), return exit code -1
+5. If `SaveOutput`: call `store.InsertThought` with content = truncated stdout (max 8KB) + stderr summary
+
+**Security controls:**
+
+```yaml
+shell:
+  enabled: false
+  working_dir: "/tmp/amcs-agent" # all commands run here unless overridden
+  allowed_working_dirs: # if set, working_dir overrides must be within one of these
+    - "/tmp/amcs-agent"
+    - "/home/user/projects"
+  timeout: 30s
+  max_output_bytes: 65536 # truncate captured output beyond this
+  allowed_commands: [] # empty = all; non-empty = exact binary name allowlist
+  blocked_commands: # checked before allowed_commands
+    - "rm"
+    - "sudo"
+    - "su"
+    - "curl"
+    - "wget"
+  save_output_by_default: false
+```
+
+Note that a name-based blocklist is best-effort only: because the command runs under `sh -c`, blocked binaries remain reachable via pipelines, `;` chains, subshells, or alternative names. When shell access is exposed to untrusted input, treat the exact `allowed_commands` allowlist as the real control.
+
+The tool is registered with `mcp.Tool.Annotations` `Destructive: true` so MCP clients prompt for confirmation.
+
+### 4b. 
File Bridge Tools + +Also in `internal/tools/shell.go`: + +```go +// read_file_from_path +// Input: { "path": "/abs/path/file.txt", "link_to_thought": "uuid|null" } +// Reads file from server filesystem → stores as AMCS file via store.InsertFile +// Returns: { file_id: "uuid", size_bytes: N, media_type: "text/plain" } + +// write_file_to_path +// Input: { "file_id": "uuid", "path": "/abs/path/output.txt" } +// Loads AMCS file → writes to filesystem path +// Path must be within cfg.Shell.AllowedWorkingDirs if set +``` + +--- + +## Phase 5: Self-Improving Memory + +### 5a. Skill Discovery Job — `internal/agent/skill_discovery.go` + +Runs weekly. Algorithm: + +1. Load last 30 days of `chat_histories` via `store.ListChatHistories(days=30)` +2. Extract assistant message patterns with `provider.Complete`: + ``` + System: Identify reusable behavioural patterns or preferences visible in these conversations. + Return a JSON array of { "name": "...", "description": "...", "tags": [...] }. + Only include patterns that would be useful across future sessions. + User: [last N assistant + user messages, newest first] + ``` +3. For each discovered pattern, call `store.InsertSkill` with tag `auto-discovered` and the current date +4. Link to all projects via `store.LinkSkillToProject` + +Deduplication: before inserting, call `store.SearchSkills(pattern.name)` — if similarity > 0.9, skip. + +### 5b. Thought Archival Job — `internal/agent/archival.go` + +```go +func (j *ArchivalJob) Run(ctx context.Context) error { + // 1. ListThoughts older than cfg.ArchiveOlderThanDays with no knowledge_node link + // SQL: thoughts where created_at < now() - interval '$N days' + // AND metadata->>'knowledge_node' IS DISTINCT FROM 'true' + // AND archived_at IS NULL + // AND id NOT IN (SELECT thought_id FROM thought_links WHERE relation = 'distilled_from') + // 2. For each batch: store.ArchiveThought(ctx, id) + // 3. 
Log count +} +``` + +Uses the existing `ArchiveThought` store method — no new SQL needed. + +--- + +## End-to-End Agent Loop Flow + +``` +Telegram message arrives + │ + ▼ +channels/telegram/bot.go (long-poll goroutine) + │ InboundMessage{} + ▼ +channels/router.go Handle() + ├── Resolve sender → CRM contact (store.SearchContacts by channel_identifiers) + ├── store.InsertThought (source="telegram", type="observation") + ├── store.SearchSimilarThoughts (semantic context retrieval) + ├── ai.Provider.Complete (build messages → LLM call) + ├── store.InsertThought (source="telegram", type="assistant_response") + ├── store.InsertChatHistory (full turn saved) + └── channels.Channel.Send (reply dispatched to Telegram) + +Meanwhile, every 24h: +agent/engine.go ticker fires DistillJob + ├── store.ListThoughts (recent, not yet distilled) + ├── store.SearchSimilarThoughts (cluster by semantic similarity) + ├── ai.Provider.Summarize (insight extraction prompt) + ├── store.InsertThought (type="insight", knowledge_node=true) + └── store.InsertLink (relation="distilled_from" for each source) + +After distill: +agent/living_summary.go + ├── store.ListKnowledgeNodes + ├── store.ListThoughts (type="daily_note", last 30 days) + ├── ai.Provider.Summarize (MEMORY.md regeneration) + └── store.UpsertFile (name="MEMORY.md", linked to project) +``` + +--- + +## Error Handling & Retry Strategy + +| Scenario | Handling | +|----------|----------| +| LLM returns 429 | Mark model unhealthy in `modelHealth` map (existing pattern), return error, engine logs and skips tick | +| LLM returns 5xx | Same as 429 | +| Telegram 429 | Read `retry_after` from response, sleep exact duration, retry immediately | +| Telegram 5xx | Exponential backoff: 5s → 10s → 30s → 60s, reset after success | +| Telegram disconnects | Long-poll timeout naturally retries; context cancel exits cleanly | +| Agent job panics | `engine.runOnce` wraps in `recover()`, logs stack trace, marks run `failed` | +| Agent double-run | 
`store.StartRun` checks for `running` row < `2 * interval` old → returns `ErrAlreadyRunning`, tick skipped silently | +| Shell command timeout | `exec.CommandContext` kills process group via SIGKILL, returns exit_code=-1 and partial output | +| Distillation partial failure | Each cluster processed independently; failure of one cluster logged and skipped, others continue | + +--- + +## Critical Files + +| File | Change | +|------|--------| +| `internal/ai/provider.go` | Add `Complete()`, `CompletionMessage`, `CompletionResult` | +| `internal/ai/compat/client.go` | Implement `Complete()` on `*Client` | +| `internal/config/config.go` | Add `AgentConfig`, `ChannelsConfig`, `ShellConfig` | +| `internal/types/thought.go` | Add `KnowledgeNode`, `KnowledgeWeight`, `Distilled` to `ThoughtMetadata` | +| `internal/store/thoughts.go` | Add `ListKnowledgeNodes()` | +| `internal/store/agent.go` | New: `JobStore` interface + implementation | +| `internal/app/app.go` | Wire agent engine + channel router goroutines | +| `internal/mcpserver/server.go` | Add `Agent`, `Knowledge`, `Channels`, `Shell` to `ToolSet` | +| `internal/agent/` | New package: engine, job, distill, daily_notes, living_summary, archival, skill_discovery | +| `internal/channels/` | New package: channel interface, router, telegram/ | +| `internal/tools/agent.go` | New: list_agent_jobs, trigger_agent_job, get_agent_job_history | +| `internal/tools/knowledge.go` | New: get_knowledge_graph, distill_now | +| `internal/tools/channels.go` | New: send_channel_message, list_channel_conversations, get_channel_status | +| `internal/tools/shell.go` | New: run_shell_command, read_file_from_path, write_file_to_path | +| `migrations/021_agent_jobs.sql` | New table: agent_job_runs | +| `migrations/022_channel_contacts.sql` | ALTER professional_contacts: add channel_identifiers jsonb | + +--- + +## Sequence / Parallelism + +``` +Phase 1 (Heartbeat Engine) ──► Phase 2 (Knowledge Graph) + └──► Phase 5 (Self-Improving) + +Phase 3 
(Telegram) ──► Phase 3g (Discord / Slack / Email) + +Phase 4 (Shell) [fully independent — no dependencies on other phases] +``` + +**Minimum viable OpenClaw competitor = Phase 1 + Phase 3** (autonomous scheduling + Telegram channel). + +--- + +## Verification + +| Phase | Test | +|-------|------| +| 1 — Heartbeat | Set `distill.interval: 1m` in dev config. Capture 5+ related thoughts. Wait 1 min. Query `thought_links` for `relation=distilled_from` rows. Check `agent_job_runs` has a `status=ok` row. | +| 1 — Daily notes | Set `daily_notes.hour` to current UTC hour. Restart server. Within 1 min, `list_thoughts` should return a `type=daily_note` entry for today. | +| 2 — Knowledge graph | Call `get_knowledge_graph` MCP tool. Verify `nodes` array contains `type=insight` thoughts with `knowledge_node=true`. Verify edges list `distilled_from` links. | +| 3 — Telegram inbound | Send a message to the configured bot. Call `search_thoughts` with the message text — should appear with `source=telegram`. | +| 3 — Telegram response | Send a question to the bot. Verify a reply arrives in Telegram. Call `list_chat_histories` — should contain the turn. | +| 4 — Shell | Call `run_shell_command` with `{"command": "echo hello", "save_output": true}`. Verify `stdout=hello\n`, `exit_code=0`, and a new thought with `type=shell_output`. | +| 4 — Blocked command | Call `run_shell_command` with `{"command": "sudo whoami"}`. Verify error returned without execution. | +| 5 — Skill discovery | Run `trigger_agent_job` with `{"job": "skill_discovery"}`. Verify new rows in `agent_skills` with tag `auto-discovered`. | +| Full loop | Send Telegram message → agent responds → distill job runs → knowledge node created from conversation → MEMORY.md regenerated with new insight. |