# AMCS → OpenClaw Alternative: Gap Analysis & Roadmap ## Context AMCS is a **passive** MCP memory server. OpenClaw's key differentiator is that it's an **always-on autonomous agent** — it proactively acts, monitors, and learns without human prompting. AMCS has the data model and search foundation; it's missing the execution engine and channel integrations that make OpenClaw compelling. OpenClaw's 3 pillars AMCS lacks: 1. **Autonomous heartbeat** — scheduled jobs that run without user prompts 2. **Channel integrations** — 25+ messaging platforms (Telegram, Slack, Discord, email, etc.) 3. **Self-improving memory** — knowledge graph distillation, daily notes, living summary (MEMORY.md) --- ## Phase 1: Autonomous Heartbeat Engine (Critical — unlocks everything else) ### 1a. Add `Complete()` to AI Provider The current `Provider` interface in `internal/ai/provider.go` only supports `Summarize(ctx, systemPrompt, userPrompt)`. An autonomous agent needs a stateful multi-turn call with tool awareness. **Extend the interface:** ```go // internal/ai/provider.go type CompletionRole string const ( RoleSystem CompletionRole = "system" RoleUser CompletionRole = "user" RoleAssistant CompletionRole = "assistant" ) type CompletionMessage struct { Role CompletionRole `json:"role"` Content string `json:"content"` } type CompletionResult struct { Content string `json:"content"` StopReason string `json:"stop_reason"` // "stop" | "length" | "error" Model string `json:"model"` } type Provider interface { Embed(ctx context.Context, input string) ([]float32, error) ExtractMetadata(ctx context.Context, input string) (thoughttypes.ThoughtMetadata, error) Summarize(ctx context.Context, systemPrompt, userPrompt string) (string, error) Complete(ctx context.Context, messages []CompletionMessage) (CompletionResult, error) Name() string EmbeddingModel() string } ``` **Implement in `internal/ai/compat/client.go`:** `Complete` is a simplification of the existing `extractMetadataWithModel` path — same OpenAI-compatible `/chat/completions` endpoint, same auth headers, no JSON schema coercion. Add a `chatCompletionsRequest` type (reuse or extend the existing unexported struct) and a `Complete` method on `*Client` that: 1. Builds the request body from `[]CompletionMessage` 2. POSTs to `c.baseURL + "/chat/completions"` with `c.metadataModel` 3. Reads the first choice's `message.content` 4. Returns `CompletionResult{Content, StopReason, Model}` Error handling mirrors the metadata path: on HTTP 429/503 mark the model unhealthy (`c.modelHealth`), return a wrapped error. No fallback model chain needed for agent calls — callers should retry on next heartbeat tick. --- ### 1b. Heartbeat Engine Package **New package: `internal/agent/`** #### `internal/agent/job.go` ```go package agent import ( "context" "time" ) // Job is a single scheduled unit of autonomous work. type Job interface { Name() string Interval() time.Duration Run(ctx context.Context) error } ``` #### `internal/agent/engine.go` The engine manages a set of jobs and fires each on its own ticker. It mirrors the pattern already used for `runBackfillPass` and `runMetadataRetryPass` in `internal/app/app.go`, but generalises it. ```go package agent import ( "context" "log/slog" "sync" "time" ) type Engine struct { jobs []Job store JobStore // persists agent_job_runs rows logger *slog.Logger } func NewEngine(store JobStore, logger *slog.Logger, jobs ...Job) *Engine { return &Engine{jobs: jobs, store: store, logger: logger} } // Run starts all job tickers and blocks until ctx is cancelled. func (e *Engine) Run(ctx context.Context) { var wg sync.WaitGroup for _, job := range e.jobs { wg.Add(1) go func(j Job) { defer wg.Done() e.runLoop(ctx, j) }(job) } wg.Wait() } func (e *Engine) runLoop(ctx context.Context, j Job) { ticker := time.NewTicker(j.Interval()) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: e.runOnce(ctx, j) } } } func (e *Engine) runOnce(ctx context.Context, j Job) { runID, err := e.store.StartRun(ctx, j.Name()) if err != nil { e.logger.Error("agent: failed to start job run record", slog.String("job", j.Name()), slog.String("error", err.Error())) return } if err := j.Run(ctx); err != nil { e.logger.Error("agent: job failed", slog.String("job", j.Name()), slog.String("error", err.Error())) _ = e.store.FinishRun(ctx, runID, "failed", "", err.Error()) return } _ = e.store.FinishRun(ctx, runID, "ok", "", "") e.logger.Info("agent: job complete", slog.String("job", j.Name())) } ``` **Deduplication / double-run prevention:** `StartRun` should check for an existing `running` row younger than `2 * j.Interval()` and return `ErrAlreadyRunning` — the caller skips that tick. #### `internal/agent/distill.go` ```go // DistillJob clusters semantically related thoughts and promotes // durable insights into knowledge nodes. type DistillJob struct { store store.ThoughtQuerier provider ai.Provider cfg AgentDistillConfig projectID *uuid.UUID // nil = all projects } func (j *DistillJob) Name() string { return "distill" } func (j *DistillJob) Interval() time.Duration { return j.cfg.Interval } func (j *DistillJob) Run(ctx context.Context) error { // 1. Fetch recent thoughts not yet distilled (metadata.distilled != true) // using store.ListThoughts with filter Days = cfg.MinAgeHours/24 // 2. Group into semantic clusters via SearchSimilarThoughts // 3. For each cluster > MinClusterSize: // a. Call provider.Summarize with insight extraction prompt // b. InsertThought with type="insight", metadata.knowledge_node=true // c. InsertLink from each cluster member to the insight, relation="distilled_from" // d. UpdateThought on each source to set metadata.distilled=true // 4. Return nil; partial failures are logged but do not abort the run } ``` Prompt used in step 3a: ``` System: You extract durable knowledge from a cluster of related notes. Return a single paragraph (2-5 sentences) capturing the core insight. Do not reference the notes themselves. Write in third person. User: [concatenated thought content, newest first, max 4000 tokens] ``` #### `internal/agent/daily_notes.go` Runs at a configured hour each day (checked by comparing `time.Now().Hour()` against `cfg.Hour` inside the loop — skip if already ran today by querying `agent_job_runs` for a successful `daily_notes` run with `started_at >= today midnight`). Collects: - Thoughts created today (`store.ListThoughts` with `Days=1`) - CRM interactions logged today - Calendar activities for today - Maintenance logs from today Formats into a structured markdown string and calls `store.InsertThought` with `type=daily_note`. #### `internal/agent/living_summary.go` Regenerates `MEMORY.md` from the last N daily notes + all knowledge nodes. Calls `provider.Summarize` and upserts the result via `store.UpsertFile` using a fixed name `MEMORY.md` scoped to the project (or global if no project). --- ### 1c. Config Structs Add to `internal/config/config.go`: ```go type Config struct { // ... existing fields ... Agent AgentConfig `yaml:"agent"` Channels ChannelsConfig `yaml:"channels"` Shell ShellConfig `yaml:"shell"` } type AgentConfig struct { Enabled bool `yaml:"enabled"` Distill AgentDistillConfig `yaml:"distill"` DailyNotes AgentDailyNotesConfig `yaml:"daily_notes"` LivingSummary AgentLivingSummary `yaml:"living_summary"` Archival AgentArchivalConfig `yaml:"archival"` Model string `yaml:"model"` // override for agent calls; falls back to AI.Metadata.Model } type AgentDistillConfig struct { Enabled bool `yaml:"enabled"` Interval time.Duration `yaml:"interval"` // default: 24h BatchSize int `yaml:"batch_size"` // thoughts per run; default: 50 MinClusterSize int `yaml:"min_cluster_size"` // default: 3 MinAgeHours int `yaml:"min_age_hours"` // ignore thoughts younger than this; default: 6 } type AgentDailyNotesConfig struct { Enabled bool `yaml:"enabled"` Hour int `yaml:"hour"` // 0-23 UTC; default: 23 } type AgentLivingSummary struct { Enabled bool `yaml:"enabled"` Interval time.Duration `yaml:"interval"` // default: 24h MaxDays int `yaml:"max_days"` // daily notes lookback; default: 30 } type AgentArchivalConfig struct { Enabled bool `yaml:"enabled"` Interval time.Duration `yaml:"interval"` // default: 168h (weekly) ArchiveOlderThan int `yaml:"archive_older_than_days"` // default: 90 } ``` **Full YAML reference (`configs/dev.yaml` additions):** ```yaml agent: enabled: false model: "" # leave blank to reuse ai.metadata.model distill: enabled: false interval: 24h batch_size: 50 min_cluster_size: 3 min_age_hours: 6 daily_notes: enabled: false hour: 23 # UTC hour to generate (0–23) living_summary: enabled: false interval: 24h max_days: 30 archival: enabled: false interval: 168h archive_older_than_days: 90 ``` --- ### 1d. Wire into `internal/app/app.go` After the existing `MetadataRetry` goroutine block: ```go if cfg.Agent.Enabled { jobStore := store.NewJobStore(db) var jobs []agent.Job if cfg.Agent.Distill.Enabled { jobs = append(jobs, agent.NewDistillJob(db, provider, cfg.Agent.Distill, nil)) } if cfg.Agent.DailyNotes.Enabled { jobs = append(jobs, agent.NewDailyNotesJob(db, provider, cfg.Agent.DailyNotes)) } if cfg.Agent.LivingSummary.Enabled { jobs = append(jobs, agent.NewLivingSummaryJob(db, provider, cfg.Agent.LivingSummary)) } if cfg.Agent.Archival.Enabled { jobs = append(jobs, agent.NewArchivalJob(db, cfg.Agent.Archival)) } engine := agent.NewEngine(jobStore, logger, jobs...) go engine.Run(ctx) } ``` --- ### 1e. New MCP Tools — `internal/tools/agent.go` ```go // list_agent_jobs // Returns all registered jobs with: name, interval, last_run (status, started_at, finished_at), next_run estimate. // trigger_agent_job // Input: { "job": "distill" } // Fires the job immediately in a goroutine; returns a run_id for polling. // get_agent_job_history // Input: { "job": "distill", "limit": 20 } // Returns rows from agent_job_runs ordered by started_at DESC. ``` Register in `internal/app/app.go` routes by adding `Agent tools.AgentTool` to `mcpserver.ToolSet` and wiring `tools.NewAgentTool(engine)`. --- ### 1f. Migration — `migrations/021_agent_jobs.sql` ```sql CREATE TABLE agent_job_runs ( id uuid PRIMARY KEY DEFAULT gen_random_uuid(), job_name text NOT NULL, started_at timestamptz NOT NULL DEFAULT now(), finished_at timestamptz, status text NOT NULL DEFAULT 'running', -- running | ok | failed | skipped output text, error text, metadata jsonb NOT NULL DEFAULT '{}' ); CREATE INDEX idx_agent_job_runs_lookup ON agent_job_runs (job_name, started_at DESC); ``` **`JobStore` interface (`internal/store/agent.go`):** ```go type JobStore interface { StartRun(ctx context.Context, jobName string) (uuid.UUID, error) FinishRun(ctx context.Context, id uuid.UUID, status, output, errMsg string) error LastRun(ctx context.Context, jobName string) (*AgentJobRun, error) ListRuns(ctx context.Context, jobName string, limit int) ([]AgentJobRun, error) } ``` --- ## Phase 2: Knowledge Graph Distillation Builds on Phase 1's distillation job. `thought_links` already exists with typed `relation` — the missing piece is a way to mark and query promoted knowledge nodes. ### 2a. Extend `ThoughtMetadata` In `internal/types/thought.go`, add two fields to `ThoughtMetadata`: ```go type ThoughtMetadata struct { // ... existing fields ... KnowledgeNode bool `json:"knowledge_node,omitempty"` // true = promoted insight KnowledgeWeight int `json:"knowledge_weight,omitempty"` // number of source thoughts that fed this node Distilled bool `json:"distilled,omitempty"` // true = this thought has been processed by distill job } ``` These are stored in the existing `metadata jsonb` column — no schema migration needed. ### 2b. Store Addition In `internal/store/thoughts.go` add: ```go // ListKnowledgeNodes returns thoughts where metadata->>'knowledge_node' = 'true', // ordered by knowledge_weight DESC, then created_at DESC. func (db *DB) ListKnowledgeNodes(ctx context.Context, projectID *uuid.UUID, limit int) ([]types.Thought, error) ``` SQL: ```sql SELECT id, content, metadata, project_id, archived_at, created_at, updated_at FROM thoughts WHERE (metadata->>'knowledge_node')::boolean = true AND ($1::uuid IS NULL OR project_id = $1) AND archived_at IS NULL ORDER BY (metadata->>'knowledge_weight')::int DESC, created_at DESC LIMIT $2 ``` ### 2c. New MCP Tools — `internal/tools/knowledge.go` ```go // get_knowledge_graph // Input: { "project_id": "uuid|null", "limit": 50 } // Returns: { nodes: [Thought], edges: [ThoughtLink] } // Fetches ListKnowledgeNodes + their outgoing/incoming links via store.GetThoughtLinks. // distill_now // Input: { "project_id": "uuid|null", "batch_size": 20 } // Triggers the distillation job synchronously (for on-demand use); returns { insights_created: N } ``` --- ## Phase 3: Channel Integrations — Telegram First ### 3a. Channel Adapter Interface — `internal/channels/channel.go` ```go package channels import ( "context" "time" ) type Attachment struct { Name string MediaType string Data []byte } type InboundMessage struct { ChannelID string // e.g. telegram chat ID as string SenderID string // e.g. telegram user ID as string SenderName string // display name Text string Attachments []Attachment Timestamp time.Time Raw any // original platform message for debug/logging } type Channel interface { Name() string Start(ctx context.Context, handler func(InboundMessage)) error Send(ctx context.Context, channelID string, text string) error } ``` ### 3b. Telegram Implementation — `internal/channels/telegram/bot.go` Uses `net/http` only (no external Telegram SDK). Long-polling loop: ```go type Bot struct { token string allowedIDs map[int64]struct{} // empty = all allowed baseURL string // https://api.telegram.org/bot{token} client *http.Client offset int64 logger *slog.Logger } func (b *Bot) Name() string { return "telegram" } func (b *Bot) Start(ctx context.Context, handler func(channels.InboundMessage)) error { for { updates, err := b.getUpdates(ctx, b.offset, 30 /*timeout seconds*/) if err != nil { if ctx.Err() != nil { return nil } // transient error: log and back off 5s time.Sleep(5 * time.Second) continue } for _, u := range updates { b.offset = u.UpdateID + 1 if u.Message == nil { continue } if !b.isAllowed(u.Message.Chat.ID) { continue } handler(b.toInbound(u.Message)) } } } func (b *Bot) Send(ctx context.Context, channelID string, text string) error { // POST /sendMessage with chat_id and text // Splits messages > 4096 chars automatically } ``` **Error handling:** - HTTP 401 (bad token): return fatal error, engine stops channel - HTTP 429 (rate limit): respect `retry_after` from response body, sleep, retry - HTTP 5xx: exponential backoff (5s → 10s → 30s → 60s), max 3 retries then sleep 5 min ### 3c. Channel Router — `internal/channels/router.go` ```go type Router struct { store store.ContactQuerier thoughts store.ThoughtInserter provider ai.Provider channels map[string]channels.Channel cfg config.ChannelsConfig logger *slog.Logger } func (r *Router) Handle(msg channels.InboundMessage) { // 1. Resolve sender → CRM contact (by channel_identifiers->>'telegram' = senderID) // If not found: create a new professional_contact with the sender name + channel identifier // 2. Capture message as thought: // content = msg.Text // metadata.source = "telegram" // metadata.type = "observation" // metadata.people = [senderName] // metadata (extra, stored in JSONB): channel="telegram", channel_id=msg.ChannelID, sender_id=msg.SenderID // 3. If cfg.Telegram.Respond: // a. Load recent context via store.SearchSimilarThoughts(msg.Text, limit=10) // b. Build []CompletionMessage with system context + recent thoughts + user message // c. Call provider.Complete(ctx, messages) // d. Capture response as thought (type="assistant_response", source="telegram") // e. Send reply via channel.Send(ctx, msg.ChannelID, result.Content) // f. Save chat history via store.InsertChatHistory } ``` **Agent response system prompt (step 3b):** ``` You are a personal assistant with access to the user's memory. Relevant context from memory: {joined recent thought content} Respond concisely. If you cannot answer from memory, say so. ``` ### 3d. Config — full YAML reference ```yaml channels: telegram: enabled: false bot_token: "" allowed_chat_ids: [] # empty = all chats allowed capture_all: true # save every inbound message as a thought respond: true # send LLM reply back to sender response_model: "" # blank = uses agent.model or ai.metadata.model poll_timeout_seconds: 30 # Telegram long-poll timeout (max 60) max_message_length: 4096 # split replies longer than this discord: enabled: false bot_token: "" guild_ids: [] # empty = all guilds capture_all: true respond: true slack: enabled: false bot_token: "" app_token: "" # for socket mode capture_all: true respond: true email: enabled: false imap_host: "" imap_port: 993 smtp_host: "" smtp_port: 587 username: "" password: "" poll_interval: 5m capture_all: true folders: ["INBOX"] ``` ### 3e. Schema Migration — `migrations/022_channel_contacts.sql` ```sql -- Store per-channel identity handles on CRM contacts ALTER TABLE professional_contacts ADD COLUMN IF NOT EXISTS channel_identifiers jsonb NOT NULL DEFAULT '{}'; -- e.g. {"telegram": "123456789", "discord": "user#1234", "slack": "U01234567"} CREATE INDEX idx_contacts_telegram_id ON professional_contacts ((channel_identifiers->>'telegram')) WHERE channel_identifiers->>'telegram' IS NOT NULL; ``` ### 3f. New MCP Tools — `internal/tools/channels.go` ```go // send_channel_message // Input: { "channel": "telegram", "channel_id": "123456789", "text": "Hello" } // Sends a message on the named channel. Returns { sent: true, channel: "telegram" } // list_channel_conversations // Input: { "channel": "telegram", "limit": 20, "days": 7 } // Lists chat histories filtered by channel metadata. Wraps store.ListChatHistories. // get_channel_status // Returns: [{ channel: "telegram", connected: true, uptime_seconds: 3600 }, ...] ``` ### 3g. Future Channel Adapters Each is a new subdirectory implementing `channels.Channel`. No router or MCP tool changes needed. | Channel | Package | Approach | |---------|---------|----------| | Discord | `internal/channels/discord/` | Gateway WebSocket (discord.com/api/gateway); or use `discordgo` lib | | Slack | `internal/channels/slack/` | Socket Mode WebSocket (no public URL needed) | | Email (IMAP) | `internal/channels/email/` | IMAP IDLE or poll; SMTP for send | | Signal | `internal/channels/signal/` | Wrap `signal-cli` JSON-RPC subprocess | | WhatsApp | `internal/channels/whatsapp/` | Meta Cloud API webhook (requires public URL) | --- ## Phase 4: Shell / Computer Access ### 4a. Shell Tool — `internal/tools/shell.go` ```go type ShellInput struct { Command string `json:"command"` WorkingDir string `json:"working_dir,omitempty"` // override default; must be within allowed prefix Timeout string `json:"timeout,omitempty"` // e.g. "30s"; overrides config default CaptureAs string `json:"capture_as,omitempty"` // thought type for stored output; default "shell_output" SaveOutput bool `json:"save_output"` // store stdout/stderr as a thought } type ShellOutput struct { Stdout string `json:"stdout"` Stderr string `json:"stderr"` ExitCode int `json:"exit_code"` ThoughtID *uuid.UUID `json:"thought_id,omitempty"` // set if save_output=true } ``` **Execution model:** 1. Validate `command` against `cfg.Shell.AllowedCommands` (if non-empty) and `cfg.Shell.BlockedCommands` 2. `exec.CommandContext(ctx, "sh", "-c", command)` with `Dir` set to working dir 3. Capture stdout + stderr into `bytes.Buffer` 4. On timeout: kill process group (`syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)`), return exit code -1 5. If `SaveOutput`: call `store.InsertThought` with content = truncated stdout (max 8KB) + stderr summary **Security controls:** ```yaml shell: enabled: false working_dir: "/tmp/amcs-agent" # all commands run here unless overridden allowed_working_dirs: # if set, working_dir overrides must be within one of these - "/tmp/amcs-agent" - "/home/user/projects" timeout: 30s max_output_bytes: 65536 # truncate captured output beyond this allowed_commands: [] # empty = all; non-empty = exact binary name allowlist blocked_commands: # checked before allowed_commands - "rm" - "sudo" - "su" - "curl" - "wget" save_output_by_default: false ``` The tool is registered with `mcp.Tool.Annotations` `Destructive: true` so MCP clients prompt for confirmation. ### 4b. File Bridge Tools Also in `internal/tools/shell.go`: ```go // read_file_from_path // Input: { "path": "/abs/path/file.txt", "link_to_thought": "uuid|null" } // Reads file from server filesystem → stores as AMCS file via store.InsertFile // Returns: { file_id: "uuid", size_bytes: N, media_type: "text/plain" } // write_file_to_path // Input: { "file_id": "uuid", "path": "/abs/path/output.txt" } // Loads AMCS file → writes to filesystem path // Path must be within cfg.Shell.AllowedWorkingDirs if set ``` --- ## Phase 5: Self-Improving Memory ### 5a. Skill Discovery Job — `internal/agent/skill_discovery.go` Runs weekly. Algorithm: 1. Load last 30 days of `chat_histories` via `store.ListChatHistories(days=30)` 2. Extract assistant message patterns with `provider.Complete`: ``` System: Identify reusable behavioural patterns or preferences visible in these conversations. Return a JSON array of { "name": "...", "description": "...", "tags": [...] }. Only include patterns that would be useful across future sessions. User: [last N assistant + user messages, newest first] ``` 3. For each discovered pattern, call `store.InsertSkill` with tag `auto-discovered` and the current date 4. Link to all projects via `store.LinkSkillToProject` Deduplication: before inserting, call `store.SearchSkills(pattern.name)` — if similarity > 0.9, skip. ### 5b. Thought Archival Job — `internal/agent/archival.go` ```go func (j *ArchivalJob) Run(ctx context.Context) error { // 1. ListThoughts older than cfg.ArchiveOlderThanDays with no knowledge_node link // SQL: thoughts where created_at < now() - interval '$N days' // AND metadata->>'knowledge_node' IS DISTINCT FROM 'true' // AND archived_at IS NULL // AND id NOT IN (SELECT thought_id FROM thought_links WHERE relation = 'distilled_from') // 2. For each batch: store.ArchiveThought(ctx, id) // 3. Log count } ``` Uses the existing `ArchiveThought` store method — no new SQL needed. --- ## End-to-End Agent Loop Flow ``` Telegram message arrives │ ▼ channels/telegram/bot.go (long-poll goroutine) │ InboundMessage{} ▼ channels/router.go Handle() ├── Resolve sender → CRM contact (store.SearchContacts by channel_identifiers) ├── store.InsertThought (source="telegram", type="observation") ├── store.SearchSimilarThoughts (semantic context retrieval) ├── ai.Provider.Complete (build messages → LLM call) ├── store.InsertThought (source="telegram", type="assistant_response") ├── store.InsertChatHistory (full turn saved) └── channels.Channel.Send (reply dispatched to Telegram) Meanwhile, every 24h: agent/engine.go ticker fires DistillJob ├── store.ListThoughts (recent, not yet distilled) ├── store.SearchSimilarThoughts (cluster by semantic similarity) ├── ai.Provider.Summarize (insight extraction prompt) ├── store.InsertThought (type="insight", knowledge_node=true) └── store.InsertLink (relation="distilled_from" for each source) After distill: agent/living_summary.go ├── store.ListKnowledgeNodes ├── store.ListThoughts (type="daily_note", last 30 days) ├── ai.Provider.Summarize (MEMORY.md regeneration) └── store.UpsertFile (name="MEMORY.md", linked to project) ``` --- ## Error Handling & Retry Strategy | Scenario | Handling | |----------|----------| | LLM returns 429 | Mark model unhealthy in `modelHealth` map (existing pattern), return error, engine logs and skips tick | | LLM returns 5xx | Same as 429 | | Telegram 429 | Read `retry_after` from response, sleep exact duration, retry immediately | | Telegram 5xx | Exponential backoff: 5s → 10s → 30s → 60s, reset after success | | Telegram disconnects | Long-poll timeout naturally retries; context cancel exits cleanly | | Agent job panics | `engine.runOnce` wraps in `recover()`, logs stack trace, marks run `failed` | | Agent double-run | `store.StartRun` checks for `running` row < `2 * interval` old → returns `ErrAlreadyRunning`, tick skipped silently | | Shell command timeout | `exec.CommandContext` kills process group via SIGKILL, returns exit_code=-1 and partial output | | Distillation partial failure | Each cluster processed independently; failure of one cluster logged and skipped, others continue | --- ## Critical Files | File | Change | |------|--------| | `internal/ai/provider.go` | Add `Complete()`, `CompletionMessage`, `CompletionResult` | | `internal/ai/compat/client.go` | Implement `Complete()` on `*Client` | | `internal/config/config.go` | Add `AgentConfig`, `ChannelsConfig`, `ShellConfig` | | `internal/types/thought.go` | Add `KnowledgeNode`, `KnowledgeWeight`, `Distilled` to `ThoughtMetadata` | | `internal/store/thoughts.go` | Add `ListKnowledgeNodes()` | | `internal/store/agent.go` | New: `JobStore` interface + implementation | | `internal/app/app.go` | Wire agent engine + channel router goroutines | | `internal/mcpserver/server.go` | Add `Agent`, `Knowledge`, `Channels`, `Shell` to `ToolSet` | | `internal/agent/` | New package: engine, job, distill, daily_notes, living_summary, archival, skill_discovery | | `internal/channels/` | New package: channel interface, router, telegram/ | | `internal/tools/agent.go` | New: list_agent_jobs, trigger_agent_job, get_agent_job_history | | `internal/tools/knowledge.go` | New: get_knowledge_graph, distill_now | | `internal/tools/channels.go` | New: send_channel_message, list_channel_conversations, get_channel_status | | `internal/tools/shell.go` | New: run_shell_command, read_file_from_path, write_file_to_path | | `migrations/021_agent_jobs.sql` | New table: agent_job_runs | | `migrations/022_channel_contacts.sql` | ALTER professional_contacts: add channel_identifiers jsonb | --- ## Sequence / Parallelism ``` Phase 1 (Heartbeat Engine) ──► Phase 2 (Knowledge Graph) └──► Phase 5 (Self-Improving) Phase 3 (Telegram) ──► Phase 3g (Discord / Slack / Email) Phase 4 (Shell) [fully independent — no dependencies on other phases] ``` **Minimum viable OpenClaw competitor = Phase 1 + Phase 3** (autonomous scheduling + Telegram channel). --- ## Verification | Phase | Test | |-------|------| | 1 — Heartbeat | Set `distill.interval: 1m` in dev config. Capture 5+ related thoughts. Wait 1 min. Query `thought_links` for `relation=distilled_from` rows. Check `agent_job_runs` has a `status=ok` row. | | 1 — Daily notes | Set `daily_notes.hour` to current UTC hour. Restart server. Within 1 min, `list_thoughts` should return a `type=daily_note` entry for today. | | 2 — Knowledge graph | Call `get_knowledge_graph` MCP tool. Verify `nodes` array contains `type=insight` thoughts with `knowledge_node=true`. Verify edges list `distilled_from` links. | | 3 — Telegram inbound | Send a message to the configured bot. Call `search_thoughts` with the message text — should appear with `source=telegram`. | | 3 — Telegram response | Send a question to the bot. Verify a reply arrives in Telegram. Call `list_chat_histories` — should contain the turn. | | 4 — Shell | Call `run_shell_command` with `{"command": "echo hello", "save_output": true}`. Verify `stdout=hello\n`, `exit_code=0`, and a new thought with `type=shell_output`. | | 4 — Blocked command | Call `run_shell_command` with `{"command": "sudo whoami"}`. Verify error returned without execution. | | 5 — Skill discovery | Run `trigger_agent_job` with `{"job": "skill_discovery"}`. Verify new rows in `agent_skills` with tag `auto-discovered`. | | Full loop | Send Telegram message → agent responds → distill job runs → knowledge node created from conversation → MEMORY.md regenerated with new insight. |