Files
amcs/llm/options_to_openclaw.md
Hein 1958eaca01
Some checks failed
CI / build-and-test (push) Failing after -30m34s
Its just a plan, or more of an understanding brainfart
2026-04-05 13:09:06 +02:00

30 KiB
Raw Permalink Blame History

AMCS → OpenClaw Alternative: Gap Analysis & Roadmap

Context

AMCS is a passive MCP memory server. OpenClaw's key differentiator is that it's an always-on autonomous agent — it proactively acts, monitors, and learns without human prompting. AMCS has the data model and search foundation; it's missing the execution engine and channel integrations that make OpenClaw compelling.

OpenClaw's 3 pillars AMCS lacks:

  1. Autonomous heartbeat — scheduled jobs that run without user prompts
  2. Channel integrations — 25+ messaging platforms (Telegram, Slack, Discord, email, etc.)
  3. Self-improving memory — knowledge graph distillation, daily notes, living summary (MEMORY.md)

Phase 1: Autonomous Heartbeat Engine (Critical — unlocks everything else)

1a. Add Complete() to AI Provider

The current Provider interface in internal/ai/provider.go only supports Summarize(ctx, systemPrompt, userPrompt). An autonomous agent needs a stateful multi-turn call with tool awareness.

Extend the interface:

// internal/ai/provider.go

type CompletionRole string

const (
    RoleSystem    CompletionRole = "system"
    RoleUser      CompletionRole = "user"
    RoleAssistant CompletionRole = "assistant"
)

type CompletionMessage struct {
    Role    CompletionRole `json:"role"`
    Content string         `json:"content"`
}

type CompletionResult struct {
    Content    string `json:"content"`
    StopReason string `json:"stop_reason"` // "stop" | "length" | "error"
    Model      string `json:"model"`
}

type Provider interface {
    Embed(ctx context.Context, input string) ([]float32, error)
    ExtractMetadata(ctx context.Context, input string) (thoughttypes.ThoughtMetadata, error)
    Summarize(ctx context.Context, systemPrompt, userPrompt string) (string, error)
    Complete(ctx context.Context, messages []CompletionMessage) (CompletionResult, error)
    Name() string
    EmbeddingModel() string
}

Implement in internal/ai/compat/client.go:

Complete is a simplification of the existing extractMetadataWithModel path — same OpenAI-compatible /chat/completions endpoint, same auth headers, no JSON schema coercion. Add a chatCompletionsRequest type (reuse or extend the existing unexported struct) and a Complete method on *Client that:

  1. Builds the request body from []CompletionMessage
  2. POSTs to c.baseURL + "/chat/completions" with c.metadataModel
  3. Reads the first choice's message.content
  4. Returns CompletionResult{Content, StopReason, Model}

Error handling mirrors the metadata path: on HTTP 429/503 mark the model unhealthy (c.modelHealth), return a wrapped error. No fallback model chain needed for agent calls — callers should retry on next heartbeat tick.


1b. Heartbeat Engine Package

New package: internal/agent/

internal/agent/job.go

package agent

import (
    "context"
    "time"
)

// Job is a single scheduled unit of autonomous work.
type Job interface {
    Name() string
    Interval() time.Duration
    Run(ctx context.Context) error
}

internal/agent/engine.go

The engine manages a set of jobs and fires each on its own ticker. It mirrors the pattern already used for runBackfillPass and runMetadataRetryPass in internal/app/app.go, but generalises it.

package agent

import (
    "context"
    "log/slog"
    "sync"
    "time"
)

type Engine struct {
    jobs   []Job
    store  JobStore  // persists agent_job_runs rows
    logger *slog.Logger
}

func NewEngine(store JobStore, logger *slog.Logger, jobs ...Job) *Engine {
    return &Engine{jobs: jobs, store: store, logger: logger}
}

// Run starts all job tickers and blocks until ctx is cancelled.
func (e *Engine) Run(ctx context.Context) {
    var wg sync.WaitGroup
    for _, job := range e.jobs {
        wg.Add(1)
        go func(j Job) {
            defer wg.Done()
            e.runLoop(ctx, j)
        }(job)
    }
    wg.Wait()
}

func (e *Engine) runLoop(ctx context.Context, j Job) {
    ticker := time.NewTicker(j.Interval())
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            e.runOnce(ctx, j)
        }
    }
}

func (e *Engine) runOnce(ctx context.Context, j Job) {
    runID, err := e.store.StartRun(ctx, j.Name())
    if err != nil {
        e.logger.Error("agent: failed to start job run record",
            slog.String("job", j.Name()), slog.String("error", err.Error()))
        return
    }
    if err := j.Run(ctx); err != nil {
        e.logger.Error("agent: job failed",
            slog.String("job", j.Name()), slog.String("error", err.Error()))
        _ = e.store.FinishRun(ctx, runID, "failed", "", err.Error())
        return
    }
    _ = e.store.FinishRun(ctx, runID, "ok", "", "")
    e.logger.Info("agent: job complete", slog.String("job", j.Name()))
}

Deduplication / double-run prevention: StartRun should check for an existing running row younger than 2 * j.Interval() and return ErrAlreadyRunning — the caller skips that tick.

internal/agent/distill.go

// DistillJob clusters semantically related thoughts and promotes
// durable insights into knowledge nodes.
type DistillJob struct {
    store     store.ThoughtQuerier
    provider  ai.Provider
    cfg       AgentDistillConfig
    projectID *uuid.UUID // nil = all projects
}

func (j *DistillJob) Name() string             { return "distill" }
func (j *DistillJob) Interval() time.Duration  { return j.cfg.Interval }

func (j *DistillJob) Run(ctx context.Context) error {
    // 1. Fetch recent thoughts not yet distilled (metadata.distilled != true)
    //    using store.ListThoughts with filter Days = cfg.MinAgeHours/24
    // 2. Group into semantic clusters via SearchSimilarThoughts
    // 3. For each cluster > MinClusterSize:
    //    a. Call provider.Summarize with insight extraction prompt
    //    b. InsertThought with type="insight", metadata.knowledge_node=true
    //    c. InsertLink from each cluster member to the insight, relation="distilled_from"
    //    d. UpdateThought on each source to set metadata.distilled=true
    // 4. Return nil; partial failures are logged but do not abort the run
}

Prompt used in step 3a:

System: You extract durable knowledge from a cluster of related notes.
        Return a single paragraph (2-5 sentences) capturing the core insight.
        Do not reference the notes themselves. Write in third person.
User: [concatenated thought content, newest first, max 4000 tokens]

internal/agent/daily_notes.go

Runs at a configured hour each day (checked by comparing time.Now().Hour() against cfg.Hour inside the loop — skip if already ran today by querying agent_job_runs for a successful daily_notes run with started_at >= today midnight).

Collects:

  • Thoughts created today (store.ListThoughts with Days=1)
  • CRM interactions logged today
  • Calendar activities for today
  • Maintenance logs from today

Formats into a structured markdown string and calls store.InsertThought with type=daily_note.

internal/agent/living_summary.go

Regenerates MEMORY.md from the last N daily notes + all knowledge nodes. Calls provider.Summarize and upserts the result via store.UpsertFile using a fixed name MEMORY.md scoped to the project (or global if no project).


1c. Config Structs

Add to internal/config/config.go:

type Config struct {
    // ... existing fields ...
    Agent    AgentConfig    `yaml:"agent"`
    Channels ChannelsConfig `yaml:"channels"`
    Shell    ShellConfig    `yaml:"shell"`
}

type AgentConfig struct {
    Enabled       bool                  `yaml:"enabled"`
    Distill       AgentDistillConfig    `yaml:"distill"`
    DailyNotes    AgentDailyNotesConfig `yaml:"daily_notes"`
    LivingSummary AgentLivingSummary    `yaml:"living_summary"`
    Archival      AgentArchivalConfig   `yaml:"archival"`
    Model         string                `yaml:"model"` // override for agent calls; falls back to AI.Metadata.Model
}

type AgentDistillConfig struct {
    Enabled        bool          `yaml:"enabled"`
    Interval       time.Duration `yaml:"interval"`        // default: 24h
    BatchSize      int           `yaml:"batch_size"`      // thoughts per run; default: 50
    MinClusterSize int           `yaml:"min_cluster_size"` // default: 3
    MinAgeHours    int           `yaml:"min_age_hours"`   // ignore thoughts younger than this; default: 6
}

type AgentDailyNotesConfig struct {
    Enabled bool `yaml:"enabled"`
    Hour    int  `yaml:"hour"` // 0-23 UTC; default: 23
}

type AgentLivingSummary struct {
    Enabled  bool          `yaml:"enabled"`
    Interval time.Duration `yaml:"interval"`   // default: 24h
    MaxDays  int           `yaml:"max_days"`   // daily notes lookback; default: 30
}

type AgentArchivalConfig struct {
    Enabled           bool          `yaml:"enabled"`
    Interval          time.Duration `yaml:"interval"`             // default: 168h (weekly)
    ArchiveOlderThan  int           `yaml:"archive_older_than_days"` // default: 90
}

Full YAML reference (configs/dev.yaml additions):

agent:
  enabled: false
  model: ""                        # leave blank to reuse ai.metadata.model
  distill:
    enabled: false
    interval: 24h
    batch_size: 50
    min_cluster_size: 3
    min_age_hours: 6
  daily_notes:
    enabled: false
    hour: 23                       # UTC hour to generate (023)
  living_summary:
    enabled: false
    interval: 24h
    max_days: 30
  archival:
    enabled: false
    interval: 168h
    archive_older_than_days: 90

1d. Wire into internal/app/app.go

After the existing MetadataRetry goroutine block:

if cfg.Agent.Enabled {
    jobStore := store.NewJobStore(db)
    var jobs []agent.Job
    if cfg.Agent.Distill.Enabled {
        jobs = append(jobs, agent.NewDistillJob(db, provider, cfg.Agent.Distill, nil))
    }
    if cfg.Agent.DailyNotes.Enabled {
        jobs = append(jobs, agent.NewDailyNotesJob(db, provider, cfg.Agent.DailyNotes))
    }
    if cfg.Agent.LivingSummary.Enabled {
        jobs = append(jobs, agent.NewLivingSummaryJob(db, provider, cfg.Agent.LivingSummary))
    }
    if cfg.Agent.Archival.Enabled {
        jobs = append(jobs, agent.NewArchivalJob(db, cfg.Agent.Archival))
    }
    engine := agent.NewEngine(jobStore, logger, jobs...)
    go engine.Run(ctx)
}

1e. New MCP Tools — internal/tools/agent.go

// list_agent_jobs
// Returns all registered jobs with: name, interval, last_run (status, started_at, finished_at), next_run estimate.

// trigger_agent_job
// Input: { "job": "distill" }
// Fires the job immediately in a goroutine; returns a run_id for polling.

// get_agent_job_history
// Input: { "job": "distill", "limit": 20 }
// Returns rows from agent_job_runs ordered by started_at DESC.

Register in internal/app/app.go routes by adding Agent tools.AgentTool to mcpserver.ToolSet and wiring tools.NewAgentTool(engine).


1f. Migration — migrations/021_agent_jobs.sql

CREATE TABLE agent_job_runs (
  id          uuid        PRIMARY KEY DEFAULT gen_random_uuid(),
  job_name    text        NOT NULL,
  started_at  timestamptz NOT NULL DEFAULT now(),
  finished_at timestamptz,
  status      text        NOT NULL DEFAULT 'running', -- running | ok | failed | skipped
  output      text,
  error       text,
  metadata    jsonb       NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_agent_job_runs_lookup
  ON agent_job_runs (job_name, started_at DESC);

JobStore interface (internal/store/agent.go):

type JobStore interface {
    StartRun(ctx context.Context, jobName string) (uuid.UUID, error)
    FinishRun(ctx context.Context, id uuid.UUID, status, output, errMsg string) error
    LastRun(ctx context.Context, jobName string) (*AgentJobRun, error)
    ListRuns(ctx context.Context, jobName string, limit int) ([]AgentJobRun, error)
}

Phase 2: Knowledge Graph Distillation

Builds on Phase 1's distillation job. thought_links already exists with typed relation — the missing piece is a way to mark and query promoted knowledge nodes.

2a. Extend ThoughtMetadata

In internal/types/thought.go, add two fields to ThoughtMetadata:

type ThoughtMetadata struct {
    // ... existing fields ...
    KnowledgeNode   bool `json:"knowledge_node,omitempty"`   // true = promoted insight
    KnowledgeWeight int  `json:"knowledge_weight,omitempty"` // number of source thoughts that fed this node
    Distilled       bool `json:"distilled,omitempty"`        // true = this thought has been processed by distill job
}

These are stored in the existing metadata jsonb column — no schema migration needed.

2b. Store Addition

In internal/store/thoughts.go add:

// ListKnowledgeNodes returns thoughts where metadata->>'knowledge_node' = 'true',
// ordered by knowledge_weight DESC, then created_at DESC.
func (db *DB) ListKnowledgeNodes(ctx context.Context, projectID *uuid.UUID, limit int) ([]types.Thought, error)

SQL:

SELECT id, content, metadata, project_id, archived_at, created_at, updated_at
FROM thoughts
WHERE (metadata->>'knowledge_node')::boolean = true
  AND ($1::uuid IS NULL OR project_id = $1)
  AND archived_at IS NULL
ORDER BY (metadata->>'knowledge_weight')::int DESC, created_at DESC
LIMIT $2

2c. New MCP Tools — internal/tools/knowledge.go

// get_knowledge_graph
// Input: { "project_id": "uuid|null", "limit": 50 }
// Returns: { nodes: [Thought], edges: [ThoughtLink] }
// Fetches ListKnowledgeNodes + their outgoing/incoming links via store.GetThoughtLinks.

// distill_now
// Input: { "project_id": "uuid|null", "batch_size": 20 }
// Triggers the distillation job synchronously (for on-demand use); returns { insights_created: N }

Phase 3: Channel Integrations — Telegram First

3a. Channel Adapter Interface — internal/channels/channel.go

package channels

import (
    "context"
    "time"
)

type Attachment struct {
    Name      string
    MediaType string
    Data      []byte
}

type InboundMessage struct {
    ChannelID   string      // e.g. telegram chat ID as string
    SenderID    string      // e.g. telegram user ID as string
    SenderName  string      // display name
    Text        string
    Attachments []Attachment
    Timestamp   time.Time
    Raw         any         // original platform message for debug/logging
}

type Channel interface {
    Name() string
    Start(ctx context.Context, handler func(InboundMessage)) error
    Send(ctx context.Context, channelID string, text string) error
}

3b. Telegram Implementation — internal/channels/telegram/bot.go

Uses net/http only (no external Telegram SDK). Long-polling loop:

type Bot struct {
    token      string
    allowedIDs map[int64]struct{} // empty = all allowed
    baseURL    string             // https://api.telegram.org/bot{token}
    client     *http.Client
    offset     int64
    logger     *slog.Logger
}

func (b *Bot) Name() string { return "telegram" }

func (b *Bot) Start(ctx context.Context, handler func(channels.InboundMessage)) error {
    for {
        updates, err := b.getUpdates(ctx, b.offset, 30 /*timeout seconds*/)
        if err != nil {
            if ctx.Err() != nil { return nil }
            // transient error: log and back off 5s
            time.Sleep(5 * time.Second)
            continue
        }
        for _, u := range updates {
            b.offset = u.UpdateID + 1
            if u.Message == nil { continue }
            if !b.isAllowed(u.Message.Chat.ID) { continue }
            handler(b.toInbound(u.Message))
        }
    }
}

func (b *Bot) Send(ctx context.Context, channelID string, text string) error {
    // POST /sendMessage with chat_id and text
    // Splits messages > 4096 chars automatically
}

Error handling:

  • HTTP 401 (bad token): return fatal error, engine stops channel
  • HTTP 429 (rate limit): respect retry_after from response body, sleep, retry
  • HTTP 5xx: exponential backoff (5s → 10s → 30s → 60s), max 3 retries then sleep 5 min

3c. Channel Router — internal/channels/router.go

type Router struct {
    store    store.ContactQuerier
    thoughts store.ThoughtInserter
    provider ai.Provider
    channels map[string]channels.Channel
    cfg      config.ChannelsConfig
    logger   *slog.Logger
}

func (r *Router) Handle(msg channels.InboundMessage) {
    // 1. Resolve sender → CRM contact (by channel_identifiers->>'telegram' = senderID)
    //    If not found: create a new professional_contact with the sender name + channel identifier
    // 2. Capture message as thought:
    //    content = msg.Text
    //    metadata.source = "telegram"
    //    metadata.type = "observation"
    //    metadata.people = [senderName]
    //    metadata (extra, stored in JSONB): channel="telegram", channel_id=msg.ChannelID, sender_id=msg.SenderID
    // 3. If cfg.Telegram.Respond:
    //    a. Load recent context via store.SearchSimilarThoughts(msg.Text, limit=10)
    //    b. Build []CompletionMessage with system context + recent thoughts + user message
    //    c. Call provider.Complete(ctx, messages)
    //    d. Capture response as thought (type="assistant_response", source="telegram")
    //    e. Send reply via channel.Send(ctx, msg.ChannelID, result.Content)
    //    f. Save chat history via store.InsertChatHistory
}

Agent response system prompt (step 3b):

You are a personal assistant with access to the user's memory.
Relevant context from memory:
{joined recent thought content}

Respond concisely. If you cannot answer from memory, say so.

3d. Config — full YAML reference

channels:
  telegram:
    enabled: false
    bot_token: ""
    allowed_chat_ids: []           # empty = all chats allowed
    capture_all: true              # save every inbound message as a thought
    respond: true                  # send LLM reply back to sender
    response_model: ""             # blank = uses agent.model or ai.metadata.model
    poll_timeout_seconds: 30       # Telegram long-poll timeout (max 60)
    max_message_length: 4096       # split replies longer than this
  discord:
    enabled: false
    bot_token: ""
    guild_ids: []                  # empty = all guilds
    capture_all: true
    respond: true
  slack:
    enabled: false
    bot_token: ""
    app_token: ""                  # for socket mode
    capture_all: true
    respond: true
  email:
    enabled: false
    imap_host: ""
    imap_port: 993
    smtp_host: ""
    smtp_port: 587
    username: ""
    password: ""
    poll_interval: 5m
    capture_all: true
    folders: ["INBOX"]

3e. Schema Migration — migrations/022_channel_contacts.sql

-- Store per-channel identity handles on CRM contacts
ALTER TABLE professional_contacts
  ADD COLUMN IF NOT EXISTS channel_identifiers jsonb NOT NULL DEFAULT '{}';

-- e.g. {"telegram": "123456789", "discord": "user#1234", "slack": "U01234567"}
CREATE INDEX idx_contacts_telegram_id
  ON professional_contacts ((channel_identifiers->>'telegram'))
  WHERE channel_identifiers->>'telegram' IS NOT NULL;

3f. New MCP Tools — internal/tools/channels.go

// send_channel_message
// Input: { "channel": "telegram", "channel_id": "123456789", "text": "Hello" }
// Sends a message on the named channel. Returns { sent: true, channel: "telegram" }

// list_channel_conversations
// Input: { "channel": "telegram", "limit": 20, "days": 7 }
// Lists chat histories filtered by channel metadata. Wraps store.ListChatHistories.

// get_channel_status
// Returns: [{ channel: "telegram", connected: true, uptime_seconds: 3600 }, ...]

3g. Future Channel Adapters

Each is a new subdirectory implementing channels.Channel. No router or MCP tool changes needed.

Channel Package Approach
Discord internal/channels/discord/ Gateway WebSocket (discord.com/api/gateway); or use discordgo lib
Slack internal/channels/slack/ Socket Mode WebSocket (no public URL needed)
Email (IMAP) internal/channels/email/ IMAP IDLE or poll; SMTP for send
Signal internal/channels/signal/ Wrap signal-cli JSON-RPC subprocess
WhatsApp internal/channels/whatsapp/ Meta Cloud API webhook (requires public URL)

Phase 4: Shell / Computer Access

4a. Shell Tool — internal/tools/shell.go

type ShellInput struct {
    Command    string `json:"command"`
    WorkingDir string `json:"working_dir,omitempty"` // override default; must be within allowed prefix
    Timeout    string `json:"timeout,omitempty"`     // e.g. "30s"; overrides config default
    CaptureAs  string `json:"capture_as,omitempty"`  // thought type for stored output; default "shell_output"
    SaveOutput bool   `json:"save_output"`           // store stdout/stderr as a thought
}

type ShellOutput struct {
    Stdout   string `json:"stdout"`
    Stderr   string `json:"stderr"`
    ExitCode int    `json:"exit_code"`
    ThoughtID *uuid.UUID `json:"thought_id,omitempty"` // set if save_output=true
}

Execution model:

  1. Validate command against cfg.Shell.AllowedCommands (if non-empty) and cfg.Shell.BlockedCommands
  2. exec.CommandContext(ctx, "sh", "-c", command) with Dir set to working dir
  3. Capture stdout + stderr into bytes.Buffer
  4. On timeout: kill process group (syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)), return exit code -1
  5. If SaveOutput: call store.InsertThought with content = truncated stdout (max 8KB) + stderr summary

Security controls:

shell:
  enabled: false
  working_dir: "/tmp/amcs-agent"  # all commands run here unless overridden
  allowed_working_dirs:           # if set, working_dir overrides must be within one of these
    - "/tmp/amcs-agent"
    - "/home/user/projects"
  timeout: 30s
  max_output_bytes: 65536         # truncate captured output beyond this
  allowed_commands: []            # empty = all; non-empty = exact binary name allowlist
  blocked_commands:               # checked before allowed_commands
    - "rm"
    - "sudo"
    - "su"
    - "curl"
    - "wget"
  save_output_by_default: false

The tool is registered with mcp.Tool.Annotations Destructive: true so MCP clients prompt for confirmation.

4b. File Bridge Tools

Also in internal/tools/shell.go:

// read_file_from_path
// Input: { "path": "/abs/path/file.txt", "link_to_thought": "uuid|null" }
// Reads file from server filesystem → stores as AMCS file via store.InsertFile
// Returns: { file_id: "uuid", size_bytes: N, media_type: "text/plain" }

// write_file_to_path
// Input: { "file_id": "uuid", "path": "/abs/path/output.txt" }
// Loads AMCS file → writes to filesystem path
// Path must be within cfg.Shell.AllowedWorkingDirs if set

Phase 5: Self-Improving Memory

5a. Skill Discovery Job — internal/agent/skill_discovery.go

Runs weekly. Algorithm:

  1. Load last 30 days of chat_histories via store.ListChatHistories(days=30)
  2. Extract assistant message patterns with provider.Complete:
    System: Identify reusable behavioural patterns or preferences visible in these conversations.
            Return a JSON array of { "name": "...", "description": "...", "tags": [...] }.
            Only include patterns that would be useful across future sessions.
    User: [last N assistant + user messages, newest first]
    
  3. For each discovered pattern, call store.InsertSkill with tag auto-discovered and the current date
  4. Link to all projects via store.LinkSkillToProject

Deduplication: before inserting, call store.SearchSkills(pattern.name) — if similarity > 0.9, skip.

5b. Thought Archival Job — internal/agent/archival.go

func (j *ArchivalJob) Run(ctx context.Context) error {
    // 1. ListThoughts older than cfg.ArchiveOlderThanDays with no knowledge_node link
    //    SQL: thoughts where created_at < now() - interval '$N days'
    //         AND metadata->>'knowledge_node' IS DISTINCT FROM 'true'
    //         AND archived_at IS NULL
    //         AND id NOT IN (SELECT thought_id FROM thought_links WHERE relation = 'distilled_from')
    // 2. For each batch: store.ArchiveThought(ctx, id)
    // 3. Log count
}

Uses the existing ArchiveThought store method — no new SQL needed.


End-to-End Agent Loop Flow

Telegram message arrives
        │
        ▼
channels/telegram/bot.go (long-poll goroutine)
        │  InboundMessage{}
        ▼
channels/router.go Handle()
        ├── Resolve sender → CRM contact (store.SearchContacts by channel_identifiers)
        ├── store.InsertThought (source="telegram", type="observation")
        ├── store.SearchSimilarThoughts (semantic context retrieval)
        ├── ai.Provider.Complete (build messages → LLM call)
        ├── store.InsertThought (source="telegram", type="assistant_response")
        ├── store.InsertChatHistory (full turn saved)
        └── channels.Channel.Send (reply dispatched to Telegram)

Meanwhile, every 24h:
agent/engine.go ticker fires DistillJob
        ├── store.ListThoughts (recent, not yet distilled)
        ├── store.SearchSimilarThoughts (cluster by semantic similarity)
        ├── ai.Provider.Summarize (insight extraction prompt)
        ├── store.InsertThought (type="insight", knowledge_node=true)
        └── store.InsertLink (relation="distilled_from" for each source)

After distill:
agent/living_summary.go
        ├── store.ListKnowledgeNodes
        ├── store.ListThoughts (type="daily_note", last 30 days)
        ├── ai.Provider.Summarize (MEMORY.md regeneration)
        └── store.UpsertFile (name="MEMORY.md", linked to project)

Error Handling & Retry Strategy

Scenario Handling
LLM returns 429 Mark model unhealthy in modelHealth map (existing pattern), return error, engine logs and skips tick
LLM returns 5xx Same as 429
Telegram 429 Read retry_after from response, sleep exact duration, retry immediately
Telegram 5xx Exponential backoff: 5s → 10s → 30s → 60s, reset after success
Telegram disconnects Long-poll timeout naturally retries; context cancel exits cleanly
Agent job panics engine.runOnce wraps in recover(), logs stack trace, marks run failed
Agent double-run store.StartRun checks for running row < 2 * interval old → returns ErrAlreadyRunning, tick skipped silently
Shell command timeout exec.CommandContext kills process group via SIGKILL, returns exit_code=-1 and partial output
Distillation partial failure Each cluster processed independently; failure of one cluster logged and skipped, others continue

Critical Files

File Change
internal/ai/provider.go Add Complete(), CompletionMessage, CompletionResult
internal/ai/compat/client.go Implement Complete() on *Client
internal/config/config.go Add AgentConfig, ChannelsConfig, ShellConfig
internal/types/thought.go Add KnowledgeNode, KnowledgeWeight, Distilled to ThoughtMetadata
internal/store/thoughts.go Add ListKnowledgeNodes()
internal/store/agent.go New: JobStore interface + implementation
internal/app/app.go Wire agent engine + channel router goroutines
internal/mcpserver/server.go Add Agent, Knowledge, Channels, Shell to ToolSet
internal/agent/ New package: engine, job, distill, daily_notes, living_summary, archival, skill_discovery
internal/channels/ New package: channel interface, router, telegram/
internal/tools/agent.go New: list_agent_jobs, trigger_agent_job, get_agent_job_history
internal/tools/knowledge.go New: get_knowledge_graph, distill_now
internal/tools/channels.go New: send_channel_message, list_channel_conversations, get_channel_status
internal/tools/shell.go New: run_shell_command, read_file_from_path, write_file_to_path
migrations/021_agent_jobs.sql New table: agent_job_runs
migrations/022_channel_contacts.sql ALTER professional_contacts: add channel_identifiers jsonb

Sequence / Parallelism

Phase 1 (Heartbeat Engine) ──► Phase 2 (Knowledge Graph)
                           └──► Phase 5 (Self-Improving)

Phase 3 (Telegram) ──► Phase 3g (Discord / Slack / Email)

Phase 4 (Shell) [fully independent — no dependencies on other phases]

Minimum viable OpenClaw competitor = Phase 1 + Phase 3 (autonomous scheduling + Telegram channel).


Verification

Phase Test
1 — Heartbeat Set distill.interval: 1m in dev config. Capture 5+ related thoughts. Wait 1 min. Query thought_links for relation=distilled_from rows. Check agent_job_runs has a status=ok row.
1 — Daily notes Set daily_notes.hour to current UTC hour. Restart server. Within 1 min, list_thoughts should return a type=daily_note entry for today.
2 — Knowledge graph Call get_knowledge_graph MCP tool. Verify nodes array contains type=insight thoughts with knowledge_node=true. Verify edges list distilled_from links.
3 — Telegram inbound Send a message to the configured bot. Call search_thoughts with the message text — should appear with source=telegram.
3 — Telegram response Send a question to the bot. Verify a reply arrives in Telegram. Call list_chat_histories — should contain the turn.
4 — Shell Call run_shell_command with {"command": "echo hello", "save_output": true}. Verify stdout=hello\n, exit_code=0, and a new thought with type=shell_output.
4 — Blocked command Call run_shell_command with {"command": "sudo whoami"}. Verify error returned without execution.
5 — Skill discovery Run trigger_agent_job with {"job": "skill_discovery"}. Verify new rows in agent_skills with tag auto-discovered.
Full loop Send Telegram message → agent responds → distill job runs → knowledge node created from conversation → MEMORY.md regenerated with new insight.