Files
amcs/llm/options_to_openclaw.md
Hein 1958eaca01
Some checks failed
CI / build-and-test (push) Failing after -30m34s
Its just a plan, or more of an understanding brainfart
2026-04-05 13:09:06 +02:00

827 lines
30 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# AMCS → OpenClaw Alternative: Gap Analysis & Roadmap
## Context
AMCS is a **passive** MCP memory server. OpenClaw's key differentiator is that it's an **always-on autonomous agent** — it proactively acts, monitors, and learns without human prompting. AMCS has the data model and search foundation; it's missing the execution engine and channel integrations that make OpenClaw compelling.
OpenClaw's 3 pillars AMCS lacks:
1. **Autonomous heartbeat** — scheduled jobs that run without user prompts
2. **Channel integrations** — 25+ messaging platforms (Telegram, Slack, Discord, email, etc.)
3. **Self-improving memory** — knowledge graph distillation, daily notes, living summary (MEMORY.md)
---
## Phase 1: Autonomous Heartbeat Engine (Critical — unlocks everything else)
### 1a. Add `Complete()` to AI Provider
The current `Provider` interface in `internal/ai/provider.go` only supports `Summarize(ctx, systemPrompt, userPrompt)`. An autonomous agent needs a stateful multi-turn call with tool awareness.
**Extend the interface:**
```go
// internal/ai/provider.go
type CompletionRole string
const (
RoleSystem CompletionRole = "system"
RoleUser CompletionRole = "user"
RoleAssistant CompletionRole = "assistant"
)
type CompletionMessage struct {
Role CompletionRole `json:"role"`
Content string `json:"content"`
}
type CompletionResult struct {
Content string `json:"content"`
StopReason string `json:"stop_reason"` // "stop" | "length" | "error"
Model string `json:"model"`
}
type Provider interface {
Embed(ctx context.Context, input string) ([]float32, error)
ExtractMetadata(ctx context.Context, input string) (thoughttypes.ThoughtMetadata, error)
Summarize(ctx context.Context, systemPrompt, userPrompt string) (string, error)
Complete(ctx context.Context, messages []CompletionMessage) (CompletionResult, error)
Name() string
EmbeddingModel() string
}
```
**Implement in `internal/ai/compat/client.go`:**
`Complete` is a simplification of the existing `extractMetadataWithModel` path — same OpenAI-compatible `/chat/completions` endpoint, same auth headers, no JSON schema coercion. Add a `chatCompletionsRequest` type (reuse or extend the existing unexported struct) and a `Complete` method on `*Client` that:
1. Builds the request body from `[]CompletionMessage`
2. POSTs to `c.baseURL + "/chat/completions"` with `c.metadataModel`
3. Reads the first choice's `message.content`
4. Returns `CompletionResult{Content, StopReason, Model}`
Error handling mirrors the metadata path: on HTTP 429/503 mark the model unhealthy (`c.modelHealth`), return a wrapped error. No fallback model chain needed for agent calls — callers should retry on next heartbeat tick.
---
### 1b. Heartbeat Engine Package
**New package: `internal/agent/`**
#### `internal/agent/job.go`
```go
package agent
import (
"context"
"time"
)
// Job is a single scheduled unit of autonomous work.
type Job interface {
Name() string
Interval() time.Duration
Run(ctx context.Context) error
}
```
#### `internal/agent/engine.go`
The engine manages a set of jobs and fires each on its own ticker. It mirrors the pattern already used for `runBackfillPass` and `runMetadataRetryPass` in `internal/app/app.go`, but generalises it.
```go
package agent
import (
"context"
"log/slog"
"sync"
"time"
)
type Engine struct {
jobs []Job
store JobStore // persists agent_job_runs rows
logger *slog.Logger
}
func NewEngine(store JobStore, logger *slog.Logger, jobs ...Job) *Engine {
return &Engine{jobs: jobs, store: store, logger: logger}
}
// Run starts all job tickers and blocks until ctx is cancelled.
func (e *Engine) Run(ctx context.Context) {
var wg sync.WaitGroup
for _, job := range e.jobs {
wg.Add(1)
go func(j Job) {
defer wg.Done()
e.runLoop(ctx, j)
}(job)
}
wg.Wait()
}
func (e *Engine) runLoop(ctx context.Context, j Job) {
ticker := time.NewTicker(j.Interval())
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
e.runOnce(ctx, j)
}
}
}
func (e *Engine) runOnce(ctx context.Context, j Job) {
runID, err := e.store.StartRun(ctx, j.Name())
if err != nil {
e.logger.Error("agent: failed to start job run record",
slog.String("job", j.Name()), slog.String("error", err.Error()))
return
}
if err := j.Run(ctx); err != nil {
e.logger.Error("agent: job failed",
slog.String("job", j.Name()), slog.String("error", err.Error()))
_ = e.store.FinishRun(ctx, runID, "failed", "", err.Error())
return
}
_ = e.store.FinishRun(ctx, runID, "ok", "", "")
e.logger.Info("agent: job complete", slog.String("job", j.Name()))
}
```
**Deduplication / double-run prevention:** `StartRun` should check for an existing `running` row younger than `2 * j.Interval()` and return `ErrAlreadyRunning` — the caller skips that tick.
#### `internal/agent/distill.go`
```go
// DistillJob clusters semantically related thoughts and promotes
// durable insights into knowledge nodes.
type DistillJob struct {
store store.ThoughtQuerier
provider ai.Provider
cfg AgentDistillConfig
projectID *uuid.UUID // nil = all projects
}
func (j *DistillJob) Name() string { return "distill" }
func (j *DistillJob) Interval() time.Duration { return j.cfg.Interval }
func (j *DistillJob) Run(ctx context.Context) error {
// 1. Fetch recent thoughts not yet distilled (metadata.distilled != true)
// using store.ListThoughts with filter Days = cfg.MinAgeHours/24
// 2. Group into semantic clusters via SearchSimilarThoughts
// 3. For each cluster > MinClusterSize:
// a. Call provider.Summarize with insight extraction prompt
// b. InsertThought with type="insight", metadata.knowledge_node=true
// c. InsertLink from each cluster member to the insight, relation="distilled_from"
// d. UpdateThought on each source to set metadata.distilled=true
// 4. Return nil; partial failures are logged but do not abort the run
}
```
Prompt used in step 3a:
```
System: You extract durable knowledge from a cluster of related notes.
Return a single paragraph (2-5 sentences) capturing the core insight.
Do not reference the notes themselves. Write in third person.
User: [concatenated thought content, newest first, max 4000 tokens]
```
#### `internal/agent/daily_notes.go`
Runs at a configured hour each day (checked by comparing `time.Now().Hour()` against `cfg.Hour` inside the loop — skip if already ran today by querying `agent_job_runs` for a successful `daily_notes` run with `started_at >= today midnight`).
Collects:
- Thoughts created today (`store.ListThoughts` with `Days=1`)
- CRM interactions logged today
- Calendar activities for today
- Maintenance logs from today
Formats into a structured markdown string and calls `store.InsertThought` with `type=daily_note`.
#### `internal/agent/living_summary.go`
Regenerates `MEMORY.md` from the last N daily notes + all knowledge nodes. Calls `provider.Summarize` and upserts the result via `store.UpsertFile` using a fixed name `MEMORY.md` scoped to the project (or global if no project).
---
### 1c. Config Structs
Add to `internal/config/config.go`:
```go
type Config struct {
// ... existing fields ...
Agent AgentConfig `yaml:"agent"`
Channels ChannelsConfig `yaml:"channels"`
Shell ShellConfig `yaml:"shell"`
}
type AgentConfig struct {
Enabled bool `yaml:"enabled"`
Distill AgentDistillConfig `yaml:"distill"`
DailyNotes AgentDailyNotesConfig `yaml:"daily_notes"`
LivingSummary AgentLivingSummary `yaml:"living_summary"`
Archival AgentArchivalConfig `yaml:"archival"`
Model string `yaml:"model"` // override for agent calls; falls back to AI.Metadata.Model
}
type AgentDistillConfig struct {
Enabled bool `yaml:"enabled"`
Interval time.Duration `yaml:"interval"` // default: 24h
BatchSize int `yaml:"batch_size"` // thoughts per run; default: 50
MinClusterSize int `yaml:"min_cluster_size"` // default: 3
MinAgeHours int `yaml:"min_age_hours"` // ignore thoughts younger than this; default: 6
}
type AgentDailyNotesConfig struct {
Enabled bool `yaml:"enabled"`
Hour int `yaml:"hour"` // 0-23 UTC; default: 23
}
type AgentLivingSummary struct {
Enabled bool `yaml:"enabled"`
Interval time.Duration `yaml:"interval"` // default: 24h
MaxDays int `yaml:"max_days"` // daily notes lookback; default: 30
}
type AgentArchivalConfig struct {
Enabled bool `yaml:"enabled"`
Interval time.Duration `yaml:"interval"` // default: 168h (weekly)
ArchiveOlderThan int `yaml:"archive_older_than_days"` // default: 90
}
```
**Full YAML reference (`configs/dev.yaml` additions):**
```yaml
agent:
enabled: false
model: "" # leave blank to reuse ai.metadata.model
distill:
enabled: false
interval: 24h
batch_size: 50
min_cluster_size: 3
min_age_hours: 6
daily_notes:
enabled: false
hour: 23 # UTC hour to generate (023)
living_summary:
enabled: false
interval: 24h
max_days: 30
archival:
enabled: false
interval: 168h
archive_older_than_days: 90
```
---
### 1d. Wire into `internal/app/app.go`
After the existing `MetadataRetry` goroutine block:
```go
if cfg.Agent.Enabled {
jobStore := store.NewJobStore(db)
var jobs []agent.Job
if cfg.Agent.Distill.Enabled {
jobs = append(jobs, agent.NewDistillJob(db, provider, cfg.Agent.Distill, nil))
}
if cfg.Agent.DailyNotes.Enabled {
jobs = append(jobs, agent.NewDailyNotesJob(db, provider, cfg.Agent.DailyNotes))
}
if cfg.Agent.LivingSummary.Enabled {
jobs = append(jobs, agent.NewLivingSummaryJob(db, provider, cfg.Agent.LivingSummary))
}
if cfg.Agent.Archival.Enabled {
jobs = append(jobs, agent.NewArchivalJob(db, cfg.Agent.Archival))
}
engine := agent.NewEngine(jobStore, logger, jobs...)
go engine.Run(ctx)
}
```
---
### 1e. New MCP Tools — `internal/tools/agent.go`
```go
// list_agent_jobs
// Returns all registered jobs with: name, interval, last_run (status, started_at, finished_at), next_run estimate.
// trigger_agent_job
// Input: { "job": "distill" }
// Fires the job immediately in a goroutine; returns a run_id for polling.
// get_agent_job_history
// Input: { "job": "distill", "limit": 20 }
// Returns rows from agent_job_runs ordered by started_at DESC.
```
Register in `internal/app/app.go` routes by adding `Agent tools.AgentTool` to `mcpserver.ToolSet` and wiring `tools.NewAgentTool(engine)`.
---
### 1f. Migration — `migrations/021_agent_jobs.sql`
```sql
CREATE TABLE agent_job_runs (
id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
job_name text NOT NULL,
started_at timestamptz NOT NULL DEFAULT now(),
finished_at timestamptz,
status text NOT NULL DEFAULT 'running', -- running | ok | failed | skipped
output text,
error text,
metadata jsonb NOT NULL DEFAULT '{}'
);
CREATE INDEX idx_agent_job_runs_lookup
ON agent_job_runs (job_name, started_at DESC);
```
**`JobStore` interface (`internal/store/agent.go`):**
```go
type JobStore interface {
StartRun(ctx context.Context, jobName string) (uuid.UUID, error)
FinishRun(ctx context.Context, id uuid.UUID, status, output, errMsg string) error
LastRun(ctx context.Context, jobName string) (*AgentJobRun, error)
ListRuns(ctx context.Context, jobName string, limit int) ([]AgentJobRun, error)
}
```
---
## Phase 2: Knowledge Graph Distillation
Builds on Phase 1's distillation job. `thought_links` already exists with typed `relation` — the missing piece is a way to mark and query promoted knowledge nodes.
### 2a. Extend `ThoughtMetadata`
In `internal/types/thought.go`, add two fields to `ThoughtMetadata`:
```go
type ThoughtMetadata struct {
// ... existing fields ...
KnowledgeNode bool `json:"knowledge_node,omitempty"` // true = promoted insight
KnowledgeWeight int `json:"knowledge_weight,omitempty"` // number of source thoughts that fed this node
Distilled bool `json:"distilled,omitempty"` // true = this thought has been processed by distill job
}
```
These are stored in the existing `metadata jsonb` column — no schema migration needed.
### 2b. Store Addition
In `internal/store/thoughts.go` add:
```go
// ListKnowledgeNodes returns thoughts where metadata->>'knowledge_node' = 'true',
// ordered by knowledge_weight DESC, then created_at DESC.
func (db *DB) ListKnowledgeNodes(ctx context.Context, projectID *uuid.UUID, limit int) ([]types.Thought, error)
```
SQL:
```sql
SELECT id, content, metadata, project_id, archived_at, created_at, updated_at
FROM thoughts
WHERE (metadata->>'knowledge_node')::boolean = true
AND ($1::uuid IS NULL OR project_id = $1)
AND archived_at IS NULL
ORDER BY (metadata->>'knowledge_weight')::int DESC, created_at DESC
LIMIT $2
```
### 2c. New MCP Tools — `internal/tools/knowledge.go`
```go
// get_knowledge_graph
// Input: { "project_id": "uuid|null", "limit": 50 }
// Returns: { nodes: [Thought], edges: [ThoughtLink] }
// Fetches ListKnowledgeNodes + their outgoing/incoming links via store.GetThoughtLinks.
// distill_now
// Input: { "project_id": "uuid|null", "batch_size": 20 }
// Triggers the distillation job synchronously (for on-demand use); returns { insights_created: N }
```
---
## Phase 3: Channel Integrations — Telegram First
### 3a. Channel Adapter Interface — `internal/channels/channel.go`
```go
package channels
import (
"context"
"time"
)
type Attachment struct {
Name string
MediaType string
Data []byte
}
type InboundMessage struct {
ChannelID string // e.g. telegram chat ID as string
SenderID string // e.g. telegram user ID as string
SenderName string // display name
Text string
Attachments []Attachment
Timestamp time.Time
Raw any // original platform message for debug/logging
}
type Channel interface {
Name() string
Start(ctx context.Context, handler func(InboundMessage)) error
Send(ctx context.Context, channelID string, text string) error
}
```
### 3b. Telegram Implementation — `internal/channels/telegram/bot.go`
Uses `net/http` only (no external Telegram SDK). Long-polling loop:
```go
type Bot struct {
token string
allowedIDs map[int64]struct{} // empty = all allowed
baseURL string // https://api.telegram.org/bot{token}
client *http.Client
offset int64
logger *slog.Logger
}
func (b *Bot) Name() string { return "telegram" }
func (b *Bot) Start(ctx context.Context, handler func(channels.InboundMessage)) error {
for {
updates, err := b.getUpdates(ctx, b.offset, 30 /*timeout seconds*/)
if err != nil {
if ctx.Err() != nil { return nil }
// transient error: log and back off 5s
time.Sleep(5 * time.Second)
continue
}
for _, u := range updates {
b.offset = u.UpdateID + 1
if u.Message == nil { continue }
if !b.isAllowed(u.Message.Chat.ID) { continue }
handler(b.toInbound(u.Message))
}
}
}
func (b *Bot) Send(ctx context.Context, channelID string, text string) error {
// POST /sendMessage with chat_id and text
// Splits messages > 4096 chars automatically
}
```
**Error handling:**
- HTTP 401 (bad token): return fatal error, engine stops channel
- HTTP 429 (rate limit): respect `retry_after` from response body, sleep, retry
- HTTP 5xx: exponential backoff (5s → 10s → 30s → 60s), max 3 retries then sleep 5 min
### 3c. Channel Router — `internal/channels/router.go`
```go
type Router struct {
store store.ContactQuerier
thoughts store.ThoughtInserter
provider ai.Provider
channels map[string]channels.Channel
cfg config.ChannelsConfig
logger *slog.Logger
}
func (r *Router) Handle(msg channels.InboundMessage) {
// 1. Resolve sender → CRM contact (by channel_identifiers->>'telegram' = senderID)
// If not found: create a new professional_contact with the sender name + channel identifier
// 2. Capture message as thought:
// content = msg.Text
// metadata.source = "telegram"
// metadata.type = "observation"
// metadata.people = [senderName]
// metadata (extra, stored in JSONB): channel="telegram", channel_id=msg.ChannelID, sender_id=msg.SenderID
// 3. If cfg.Telegram.Respond:
// a. Load recent context via store.SearchSimilarThoughts(msg.Text, limit=10)
// b. Build []CompletionMessage with system context + recent thoughts + user message
// c. Call provider.Complete(ctx, messages)
// d. Capture response as thought (type="assistant_response", source="telegram")
// e. Send reply via channel.Send(ctx, msg.ChannelID, result.Content)
// f. Save chat history via store.InsertChatHistory
}
```
**Agent response system prompt (step 3b):**
```
You are a personal assistant with access to the user's memory.
Relevant context from memory:
{joined recent thought content}
Respond concisely. If you cannot answer from memory, say so.
```
### 3d. Config — full YAML reference
```yaml
channels:
telegram:
enabled: false
bot_token: ""
allowed_chat_ids: [] # empty = all chats allowed
capture_all: true # save every inbound message as a thought
respond: true # send LLM reply back to sender
response_model: "" # blank = uses agent.model or ai.metadata.model
poll_timeout_seconds: 30 # Telegram long-poll timeout (max 60)
max_message_length: 4096 # split replies longer than this
discord:
enabled: false
bot_token: ""
guild_ids: [] # empty = all guilds
capture_all: true
respond: true
slack:
enabled: false
bot_token: ""
app_token: "" # for socket mode
capture_all: true
respond: true
email:
enabled: false
imap_host: ""
imap_port: 993
smtp_host: ""
smtp_port: 587
username: ""
password: ""
poll_interval: 5m
capture_all: true
folders: ["INBOX"]
```
### 3e. Schema Migration — `migrations/022_channel_contacts.sql`
```sql
-- Store per-channel identity handles on CRM contacts
ALTER TABLE professional_contacts
ADD COLUMN IF NOT EXISTS channel_identifiers jsonb NOT NULL DEFAULT '{}';
-- e.g. {"telegram": "123456789", "discord": "user#1234", "slack": "U01234567"}
CREATE INDEX idx_contacts_telegram_id
ON professional_contacts ((channel_identifiers->>'telegram'))
WHERE channel_identifiers->>'telegram' IS NOT NULL;
```
### 3f. New MCP Tools — `internal/tools/channels.go`
```go
// send_channel_message
// Input: { "channel": "telegram", "channel_id": "123456789", "text": "Hello" }
// Sends a message on the named channel. Returns { sent: true, channel: "telegram" }
// list_channel_conversations
// Input: { "channel": "telegram", "limit": 20, "days": 7 }
// Lists chat histories filtered by channel metadata. Wraps store.ListChatHistories.
// get_channel_status
// Returns: [{ channel: "telegram", connected: true, uptime_seconds: 3600 }, ...]
```
### 3g. Future Channel Adapters
Each is a new subdirectory implementing `channels.Channel`. No router or MCP tool changes needed.
| Channel | Package | Approach |
|---------|---------|----------|
| Discord | `internal/channels/discord/` | Gateway WebSocket (discord.com/api/gateway); or use `discordgo` lib |
| Slack | `internal/channels/slack/` | Socket Mode WebSocket (no public URL needed) |
| Email (IMAP) | `internal/channels/email/` | IMAP IDLE or poll; SMTP for send |
| Signal | `internal/channels/signal/` | Wrap `signal-cli` JSON-RPC subprocess |
| WhatsApp | `internal/channels/whatsapp/` | Meta Cloud API webhook (requires public URL) |
---
## Phase 4: Shell / Computer Access
### 4a. Shell Tool — `internal/tools/shell.go`
```go
type ShellInput struct {
Command string `json:"command"`
WorkingDir string `json:"working_dir,omitempty"` // override default; must be within allowed prefix
Timeout string `json:"timeout,omitempty"` // e.g. "30s"; overrides config default
CaptureAs string `json:"capture_as,omitempty"` // thought type for stored output; default "shell_output"
SaveOutput bool `json:"save_output"` // store stdout/stderr as a thought
}
type ShellOutput struct {
Stdout string `json:"stdout"`
Stderr string `json:"stderr"`
ExitCode int `json:"exit_code"`
ThoughtID *uuid.UUID `json:"thought_id,omitempty"` // set if save_output=true
}
```
**Execution model:**
1. Validate `command` against `cfg.Shell.AllowedCommands` (if non-empty) and `cfg.Shell.BlockedCommands`
2. `exec.CommandContext(ctx, "sh", "-c", command)` with `Dir` set to working dir
3. Capture stdout + stderr into `bytes.Buffer`
4. On timeout: kill process group (`syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)`), return exit code -1
5. If `SaveOutput`: call `store.InsertThought` with content = truncated stdout (max 8KB) + stderr summary
**Security controls:**
```yaml
shell:
enabled: false
working_dir: "/tmp/amcs-agent" # all commands run here unless overridden
allowed_working_dirs: # if set, working_dir overrides must be within one of these
- "/tmp/amcs-agent"
- "/home/user/projects"
timeout: 30s
max_output_bytes: 65536 # truncate captured output beyond this
allowed_commands: [] # empty = all; non-empty = exact binary name allowlist
blocked_commands: # checked before allowed_commands
- "rm"
- "sudo"
- "su"
- "curl"
- "wget"
save_output_by_default: false
```
The tool is registered with `mcp.Tool.Annotations` `Destructive: true` so MCP clients prompt for confirmation.
### 4b. File Bridge Tools
Also in `internal/tools/shell.go`:
```go
// read_file_from_path
// Input: { "path": "/abs/path/file.txt", "link_to_thought": "uuid|null" }
// Reads file from server filesystem → stores as AMCS file via store.InsertFile
// Returns: { file_id: "uuid", size_bytes: N, media_type: "text/plain" }
// write_file_to_path
// Input: { "file_id": "uuid", "path": "/abs/path/output.txt" }
// Loads AMCS file → writes to filesystem path
// Path must be within cfg.Shell.AllowedWorkingDirs if set
```
---
## Phase 5: Self-Improving Memory
### 5a. Skill Discovery Job — `internal/agent/skill_discovery.go`
Runs weekly. Algorithm:
1. Load last 30 days of `chat_histories` via `store.ListChatHistories(days=30)`
2. Extract assistant message patterns with `provider.Complete`:
```
System: Identify reusable behavioural patterns or preferences visible in these conversations.
Return a JSON array of { "name": "...", "description": "...", "tags": [...] }.
Only include patterns that would be useful across future sessions.
User: [last N assistant + user messages, newest first]
```
3. For each discovered pattern, call `store.InsertSkill` with tag `auto-discovered` and the current date
4. Link to all projects via `store.LinkSkillToProject`
Deduplication: before inserting, call `store.SearchSkills(pattern.name)` — if similarity > 0.9, skip.
### 5b. Thought Archival Job — `internal/agent/archival.go`
```go
func (j *ArchivalJob) Run(ctx context.Context) error {
// 1. ListThoughts older than cfg.ArchiveOlderThanDays with no knowledge_node link
// SQL: thoughts where created_at < now() - interval '$N days'
// AND metadata->>'knowledge_node' IS DISTINCT FROM 'true'
// AND archived_at IS NULL
// AND id NOT IN (SELECT thought_id FROM thought_links WHERE relation = 'distilled_from')
// 2. For each batch: store.ArchiveThought(ctx, id)
// 3. Log count
}
```
Uses the existing `ArchiveThought` store method — no new SQL needed.
---
## End-to-End Agent Loop Flow
```
Telegram message arrives
channels/telegram/bot.go (long-poll goroutine)
│ InboundMessage{}
channels/router.go Handle()
├── Resolve sender → CRM contact (store.SearchContacts by channel_identifiers)
├── store.InsertThought (source="telegram", type="observation")
├── store.SearchSimilarThoughts (semantic context retrieval)
├── ai.Provider.Complete (build messages → LLM call)
├── store.InsertThought (source="telegram", type="assistant_response")
├── store.InsertChatHistory (full turn saved)
└── channels.Channel.Send (reply dispatched to Telegram)
Meanwhile, every 24h:
agent/engine.go ticker fires DistillJob
├── store.ListThoughts (recent, not yet distilled)
├── store.SearchSimilarThoughts (cluster by semantic similarity)
├── ai.Provider.Summarize (insight extraction prompt)
├── store.InsertThought (type="insight", knowledge_node=true)
└── store.InsertLink (relation="distilled_from" for each source)
After distill:
agent/living_summary.go
├── store.ListKnowledgeNodes
├── store.ListThoughts (type="daily_note", last 30 days)
├── ai.Provider.Summarize (MEMORY.md regeneration)
└── store.UpsertFile (name="MEMORY.md", linked to project)
```
---
## Error Handling & Retry Strategy
| Scenario | Handling |
|----------|----------|
| LLM returns 429 | Mark model unhealthy in `modelHealth` map (existing pattern), return error, engine logs and skips tick |
| LLM returns 5xx | Same as 429 |
| Telegram 429 | Read `retry_after` from response, sleep exact duration, retry immediately |
| Telegram 5xx | Exponential backoff: 5s → 10s → 30s → 60s, reset after success |
| Telegram disconnects | Long-poll timeout naturally retries; context cancel exits cleanly |
| Agent job panics | `engine.runOnce` wraps in `recover()`, logs stack trace, marks run `failed` |
| Agent double-run | `store.StartRun` checks for `running` row < `2 * interval` old → returns `ErrAlreadyRunning`, tick skipped silently |
| Shell command timeout | `exec.CommandContext` kills process group via SIGKILL, returns exit_code=-1 and partial output |
| Distillation partial failure | Each cluster processed independently; failure of one cluster logged and skipped, others continue |
---
## Critical Files
| File | Change |
|------|--------|
| `internal/ai/provider.go` | Add `Complete()`, `CompletionMessage`, `CompletionResult` |
| `internal/ai/compat/client.go` | Implement `Complete()` on `*Client` |
| `internal/config/config.go` | Add `AgentConfig`, `ChannelsConfig`, `ShellConfig` |
| `internal/types/thought.go` | Add `KnowledgeNode`, `KnowledgeWeight`, `Distilled` to `ThoughtMetadata` |
| `internal/store/thoughts.go` | Add `ListKnowledgeNodes()` |
| `internal/store/agent.go` | New: `JobStore` interface + implementation |
| `internal/app/app.go` | Wire agent engine + channel router goroutines |
| `internal/mcpserver/server.go` | Add `Agent`, `Knowledge`, `Channels`, `Shell` to `ToolSet` |
| `internal/agent/` | New package: engine, job, distill, daily_notes, living_summary, archival, skill_discovery |
| `internal/channels/` | New package: channel interface, router, telegram/ |
| `internal/tools/agent.go` | New: list_agent_jobs, trigger_agent_job, get_agent_job_history |
| `internal/tools/knowledge.go` | New: get_knowledge_graph, distill_now |
| `internal/tools/channels.go` | New: send_channel_message, list_channel_conversations, get_channel_status |
| `internal/tools/shell.go` | New: run_shell_command, read_file_from_path, write_file_to_path |
| `migrations/021_agent_jobs.sql` | New table: agent_job_runs |
| `migrations/022_channel_contacts.sql` | ALTER professional_contacts: add channel_identifiers jsonb |
---
## Sequence / Parallelism
```
Phase 1 (Heartbeat Engine) ──► Phase 2 (Knowledge Graph)
└──► Phase 5 (Self-Improving)
Phase 3 (Telegram) ──► Phase 3g (Discord / Slack / Email)
Phase 4 (Shell) [fully independent — no dependencies on other phases]
```
**Minimum viable OpenClaw competitor = Phase 1 + Phase 3** (autonomous scheduling + Telegram channel).
---
## Verification
| Phase | Test |
|-------|------|
| 1 — Heartbeat | Set `distill.interval: 1m` in dev config. Capture 5+ related thoughts. Wait 1 min. Query `thought_links` for `relation=distilled_from` rows. Check `agent_job_runs` has a `status=ok` row. |
| 1 — Daily notes | Set `daily_notes.hour` to current UTC hour. Restart server. Within 1 min, `list_thoughts` should return a `type=daily_note` entry for today. |
| 2 — Knowledge graph | Call `get_knowledge_graph` MCP tool. Verify `nodes` array contains `type=insight` thoughts with `knowledge_node=true`. Verify edges list `distilled_from` links. |
| 3 — Telegram inbound | Send a message to the configured bot. Call `search_thoughts` with the message text — should appear with `source=telegram`. |
| 3 — Telegram response | Send a question to the bot. Verify a reply arrives in Telegram. Call `list_chat_histories` — should contain the turn. |
| 4 — Shell | Call `run_shell_command` with `{"command": "echo hello", "save_output": true}`. Verify `stdout=hello\n`, `exit_code=0`, and a new thought with `type=shell_output`. |
| 4 — Blocked command | Call `run_shell_command` with `{"command": "sudo whoami"}`. Verify error returned without execution. |
| 5 — Skill discovery | Run `trigger_agent_job` with `{"job": "skill_discovery"}`. Verify new rows in `agent_skills` with tag `auto-discovered`. |
| Full loop | Send Telegram message → agent responds → distill job runs → knowledge node created from conversation → MEMORY.md regenerated with new insight. |