# AMCS → OpenClaw Alternative: Gap Analysis & Roadmap

## Context

AMCS is a **passive** MCP memory server. OpenClaw's key differentiator is that it is an **always-on autonomous agent**: it proactively acts, monitors, and learns without human prompting. AMCS has the data model and search foundation; it is missing the execution engine and channel integrations that make OpenClaw compelling.

OpenClaw's 3 pillars AMCS lacks:

1. **Autonomous heartbeat**: scheduled jobs that run without user prompts
2. **Channel integrations**: 25+ messaging platforms (Telegram, Slack, Discord, email, etc.)
3. **Self-improving memory**: knowledge graph distillation, daily notes, living summary (MEMORY.md)

---

## Phase 1: Autonomous Heartbeat Engine (Critical: unlocks everything else)

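### 1a. Add `Complete()` to AI Provider

For free-form generation, the current `Provider` interface in `internal/ai/provider.go` only supports `Summarize(ctx, systemPrompt, userPrompt)`: one system prompt and one user prompt. An autonomous agent needs a multi-turn call with tool awareness. The `Complete` signature below covers the multi-turn part by taking the full message history on each call; it does not yet carry tool definitions.

**Extend the interface:**
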
```go
// internal/ai/provider.go

type CompletionRole string

const (
	RoleSystem    CompletionRole = "system"
	RoleUser      CompletionRole = "user"
	RoleAssistant CompletionRole = "assistant"
)

type CompletionMessage struct {
	Role    CompletionRole `json:"role"`
	Content string         `json:"content"`
}

type CompletionResult struct {
	Content    string `json:"content"`
	StopReason string `json:"stop_reason"` // "stop" | "length" | "error"
	Model      string `json:"model"`
}

type Provider interface {
	Embed(ctx context.Context, input string) ([]float32, error)
	ExtractMetadata(ctx context.Context, input string) (thoughttypes.ThoughtMetadata, error)
	Summarize(ctx context.Context, systemPrompt, userPrompt string) (string, error)
	Complete(ctx context.Context, messages []CompletionMessage) (CompletionResult, error)
	Name() string
	EmbeddingModel() string
}
```

**Implement in `internal/ai/compat/client.go`:**

`Complete` is a simplification of the existing `extractMetadataWithModel` path: same OpenAI-compatible `/chat/completions` endpoint, same auth headers, no JSON schema coercion. Add a `chatCompletionsRequest` type (reuse or extend the existing unexported struct) and a `Complete` method on `*Client` that:

1. Builds the request body from `[]CompletionMessage`
2. POSTs to `c.baseURL + "/chat/completions"` with `c.metadataModel`
3. Reads the first choice's `message.content`
4. Returns `CompletionResult{Content, StopReason, Model}`

Error handling mirrors the metadata path: on HTTP 429/503 mark the model unhealthy (`c.modelHealth`) and return a wrapped error. No fallback model chain is needed for agent calls; callers should retry on the next heartbeat tick.

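The request and response handling in steps 1, 3 and 4 can be sketched as below. This is a minimal illustration, not the existing client code: `chatRequest`, `chatResponse`, `buildRequestBody` and `parseResponseBody` are hypothetical names, roles are plain strings for brevity, and the JSON field names assume the usual OpenAI-compatible `/chat/completions` wire format. The HTTP POST, auth headers and `modelHealth` bookkeeping are left out.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// CompletionMessage and CompletionResult mirror the shapes proposed above.
type CompletionMessage struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}

type CompletionResult struct {
	Content    string
	StopReason string
	Model      string
}

// chatRequest and chatResponse are hypothetical names for the wire format
// of an OpenAI-compatible /chat/completions endpoint.
type chatRequest struct {
	Model    string              `json:"model"`
	Messages []CompletionMessage `json:"messages"`
}

type chatResponse struct {
	Model   string `json:"model"`
	Choices []struct {
		Message      CompletionMessage `json:"message"`
		FinishReason string            `json:"finish_reason"`
	} `json:"choices"`
}

// buildRequestBody covers step 1: serialise the messages for the given model.
func buildRequestBody(model string, msgs []CompletionMessage) ([]byte, error) {
	return json.Marshal(chatRequest{Model: model, Messages: msgs})
}

// parseResponseBody covers steps 3 and 4: read the first choice and map it
// onto a CompletionResult.
func parseResponseBody(body []byte) (CompletionResult, error) {
	var resp chatResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return CompletionResult{}, fmt.Errorf("decode completion response: %w", err)
	}
	if len(resp.Choices) == 0 {
		return CompletionResult{}, errors.New("completion response has no choices")
	}
	first := resp.Choices[0]
	return CompletionResult{
		Content:    first.Message.Content,
		StopReason: first.FinishReason,
		Model:      resp.Model,
	}, nil
}

func main() {
	body, _ := buildRequestBody("example-model", []CompletionMessage{
		{Role: "system", Content: "You are concise."},
		{Role: "user", Content: "Ping?"},
	})
	fmt.Println(string(body))

	res, err := parseResponseBody([]byte(`{"model":"example-model","choices":[{"message":{"role":"assistant","content":"Pong."},"finish_reason":"stop"}]}`))
	fmt.Println(res.Content, res.StopReason, err)
}
```

Keeping the encode and decode steps as pure functions makes them easy to unit-test without a live endpoint; the real `Complete` method would wrap them around the existing HTTP plumbing.
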
---

### 1b. Heartbeat Engine Package

**New package: `internal/agent/`**

#### `internal/agent/job.go`

```go
package agent

import (
	"context"
	"time"
)

// Job is a single scheduled unit of autonomous work.
type Job interface {
	Name() string
	Interval() time.Duration
	Run(ctx context.Context) error
}
```

#### `internal/agent/engine.go`

The engine manages a set of jobs and fires each on its own ticker. It mirrors the pattern already used for `runBackfillPass` and `runMetadataRetryPass` in `internal/app/app.go`, but generalises it.

```go
package agent

import (
	"context"
	"log/slog"
	"sync"
	"time"
)

type Engine struct {
	jobs   []Job
	store  JobStore // persists agent_job_runs rows
	logger *slog.Logger
}

func NewEngine(store JobStore, logger *slog.Logger, jobs ...Job) *Engine {
	return &Engine{jobs: jobs, store: store, logger: logger}
}

// Run starts all job tickers and blocks until ctx is cancelled.
func (e *Engine) Run(ctx context.Context) {
	var wg sync.WaitGroup
	for _, job := range e.jobs {
		wg.Add(1)
		go func(j Job) {
			defer wg.Done()
			e.runLoop(ctx, j)
		}(job)
	}
	wg.Wait()
}

func (e *Engine) runLoop(ctx context.Context, j Job) {
	ticker := time.NewTicker(j.Interval())
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			e.runOnce(ctx, j)
		}
	}
}

func (e *Engine) runOnce(ctx context.Context, j Job) {
	runID, err := e.store.StartRun(ctx, j.Name())
	if err != nil {
		e.logger.Error("agent: failed to start job run record",
			slog.String("job", j.Name()), slog.String("error", err.Error()))
		return
	}
	if err := j.Run(ctx); err != nil {
		e.logger.Error("agent: job failed",
			slog.String("job", j.Name()), slog.String("error", err.Error()))
		_ = e.store.FinishRun(ctx, runID, "failed", "", err.Error())
		return
	}
	_ = e.store.FinishRun(ctx, runID, "ok", "", "")
	e.logger.Info("agent: job complete", slog.String("job", j.Name()))
}
```

**Deduplication / double-run prevention:** `StartRun` should check for an existing `running` row younger than `2 * j.Interval()` and, if one exists, return `ErrAlreadyRunning`; the caller then skips that tick.

#### `internal/agent/distill.go`

```go
// DistillJob clusters semantically related thoughts and promotes
// durable insights into knowledge nodes.
type DistillJob struct {
	store     store.ThoughtQuerier
	provider  ai.Provider
	cfg       AgentDistillConfig
	projectID *uuid.UUID // nil = all projects
}

func (j *DistillJob) Name() string            { return "distill" }
func (j *DistillJob) Interval() time.Duration { return j.cfg.Interval }

func (j *DistillJob) Run(ctx context.Context) error {
	// 1. Fetch up to cfg.BatchSize thoughts not yet distilled (metadata.distilled != true)
	//    via store.ListThoughts, skipping thoughts younger than cfg.MinAgeHours.
	//    Note: an integer Days filter of cfg.MinAgeHours/24 truncates to 0 for the
	//    6h default, so compare created_at against a cutoff timestamp instead.
	// 2. Group into semantic clusters via SearchSimilarThoughts
	// 3. For each cluster with at least MinClusterSize members:
	//    a. Call provider.Summarize with insight extraction prompt
	//    b. InsertThought with type="insight", metadata.knowledge_node=true
	//    c. InsertLink from each cluster member to the insight, relation="distilled_from"
	//    d. UpdateThought on each source to set metadata.distilled=true
	// 4. Return nil; partial failures are logged but do not abort the run
	return nil
}
```

Prompt used in step 3a:

```
System: You extract durable knowledge from a cluster of related notes.
Return a single paragraph (2-5 sentences) capturing the core insight.
Do not reference the notes themselves. Write in third person.

User: [concatenated thought content, newest first, max 4000 tokens]
```

#### `internal/agent/daily_notes.go`

Runs at a configured hour each day. Inside the loop, the job compares `time.Now().UTC().Hour()` against `cfg.Hour` (the configured hour is UTC) and skips the tick if it has already run today, which it checks by querying `agent_job_runs` for a successful `daily_notes` run with `started_at >= today midnight` (UTC).

Collects:

- Thoughts created today (`store.ListThoughts` with `Days=1`)
- CRM interactions logged today
- Calendar activities for today
- Maintenance logs from today

Formats into a structured markdown string and calls `store.InsertThought` with `type=daily_note`.

#### `internal/agent/living_summary.go`

Regenerates `MEMORY.md` from the daily notes of the last `cfg.MaxDays` days plus all knowledge nodes. Calls `provider.Summarize` and upserts the result via `store.UpsertFile` using a fixed name `MEMORY.md` scoped to the project (or global if no project).

---

### 1c. Config Structs

Add to `internal/config/config.go`:

```go
type Config struct {
	// ... existing fields ...
	Agent    AgentConfig    `yaml:"agent"`
	Channels ChannelsConfig `yaml:"channels"`
	Shell    ShellConfig    `yaml:"shell"`
}

type AgentConfig struct {
	Enabled       bool                  `yaml:"enabled"`
	Distill       AgentDistillConfig    `yaml:"distill"`
	DailyNotes    AgentDailyNotesConfig `yaml:"daily_notes"`
	LivingSummary AgentLivingSummary    `yaml:"living_summary"`
	Archival      AgentArchivalConfig   `yaml:"archival"`
	Model         string                `yaml:"model"` // override for agent calls; falls back to AI.Metadata.Model
}

type AgentDistillConfig struct {
	Enabled        bool          `yaml:"enabled"`
	Interval       time.Duration `yaml:"interval"`         // default: 24h
	BatchSize      int           `yaml:"batch_size"`       // thoughts per run; default: 50
	MinClusterSize int           `yaml:"min_cluster_size"` // default: 3
	MinAgeHours    int           `yaml:"min_age_hours"`    // ignore thoughts younger than this; default: 6
}

type AgentDailyNotesConfig struct {
	Enabled bool `yaml:"enabled"`
	Hour    int  `yaml:"hour"` // 0-23 UTC; default: 23
}

type AgentLivingSummary struct {
	Enabled  bool          `yaml:"enabled"`
	Interval time.Duration `yaml:"interval"` // default: 24h
	MaxDays  int           `yaml:"max_days"` // daily notes lookback; default: 30
}

type AgentArchivalConfig struct {
	Enabled          bool          `yaml:"enabled"`
	Interval         time.Duration `yaml:"interval"`                // default: 168h (weekly)
	ArchiveOlderThan int           `yaml:"archive_older_than_days"` // default: 90
}
```

**Full YAML reference (`configs/dev.yaml` additions):**

```yaml
agent:
  enabled: false
  model: ""                     # leave blank to reuse ai.metadata.model
  distill:
    enabled: false
    interval: 24h
    batch_size: 50
    min_cluster_size: 3
    min_age_hours: 6
  daily_notes:
    enabled: false
    hour: 23                    # UTC hour to generate (0-23)
  living_summary:
    enabled: false
    interval: 24h
    max_days: 30
  archival:
    enabled: false
    interval: 168h
    archive_older_than_days: 90
```

---

### 1d. Wire into `internal/app/app.go`

After the existing `MetadataRetry` goroutine block:

```go
if cfg.Agent.Enabled {
	jobStore := store.NewJobStore(db)
	var jobs []agent.Job
	if cfg.Agent.Distill.Enabled {
		jobs = append(jobs, agent.NewDistillJob(db, provider, cfg.Agent.Distill, nil))
	}
	if cfg.Agent.DailyNotes.Enabled {
		jobs = append(jobs, agent.NewDailyNotesJob(db, provider, cfg.Agent.DailyNotes))
	}
	if cfg.Agent.LivingSummary.Enabled {
		jobs = append(jobs, agent.NewLivingSummaryJob(db, provider, cfg.Agent.LivingSummary))
	}
	if cfg.Agent.Archival.Enabled {
		jobs = append(jobs, agent.NewArchivalJob(db, cfg.Agent.Archival))
	}
	engine := agent.NewEngine(jobStore, logger, jobs...)
	go engine.Run(ctx)
}
```

---

### 1e. New MCP Tools: `internal/tools/agent.go`

```go
// list_agent_jobs
// Returns all registered jobs with: name, interval, last_run (status, started_at, finished_at), next_run estimate.

// trigger_agent_job
// Input: { "job": "distill" }
// Fires the job immediately in a goroutine; returns a run_id for polling.

// get_agent_job_history
// Input: { "job": "distill", "limit": 20 }
// Returns rows from agent_job_runs ordered by started_at DESC.
```

Register in `internal/app/app.go` routes by adding `Agent tools.AgentTool` to `mcpserver.ToolSet` and wiring `tools.NewAgentTool(engine)`.

---

### 1f. Migration: `migrations/021_agent_jobs.sql`

```sql
CREATE TABLE agent_job_runs (
  id          uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  job_name    text NOT NULL,
  started_at  timestamptz NOT NULL DEFAULT now(),
  finished_at timestamptz,
  status      text NOT NULL DEFAULT 'running',  -- running | ok | failed | skipped
  output      text,
  error       text,
  metadata    jsonb NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_agent_job_runs_lookup
  ON agent_job_runs (job_name, started_at DESC);
```

**`JobStore` interface (`internal/store/agent.go`):**

```go
type JobStore interface {
	StartRun(ctx context.Context, jobName string) (uuid.UUID, error)
	FinishRun(ctx context.Context, id uuid.UUID, status, output, errMsg string) error
	LastRun(ctx context.Context, jobName string) (*AgentJobRun, error)
	ListRuns(ctx context.Context, jobName string, limit int) ([]AgentJobRun, error)
}
```

---

## Phase 2: Knowledge Graph Distillation

Builds on Phase 1's distillation job. `thought_links` already exists with typed `relation`; the missing piece is a way to mark and query promoted knowledge nodes.

### 2a. Extend `ThoughtMetadata`

In `internal/types/thought.go`, add three fields to `ThoughtMetadata`:

```go
type ThoughtMetadata struct {
	// ... existing fields ...
	KnowledgeNode   bool `json:"knowledge_node,omitempty"`   // true = promoted insight
	KnowledgeWeight int  `json:"knowledge_weight,omitempty"` // number of source thoughts that fed this node
	Distilled       bool `json:"distilled,omitempty"`        // true = this thought has been processed by distill job
}
```

These are stored in the existing `metadata jsonb` column, so no schema migration is needed.

### 2b. Store Addition

In `internal/store/thoughts.go` add:

```go
// ListKnowledgeNodes returns thoughts where metadata->>'knowledge_node' = 'true',
// ordered by knowledge_weight DESC, then created_at DESC.
func (db *DB) ListKnowledgeNodes(ctx context.Context, projectID *uuid.UUID, limit int) ([]types.Thought, error)
```

SQL:

```sql
SELECT id, content, metadata, project_id, archived_at, created_at, updated_at
FROM thoughts
WHERE (metadata->>'knowledge_node')::boolean = true
  AND ($1::uuid IS NULL OR project_id = $1)
  AND archived_at IS NULL
-- knowledge_weight is omitted from the JSON when zero (omitempty); COALESCE keeps
-- those rows from sorting first, since NULLs lead in a DESC ordering.
ORDER BY COALESCE((metadata->>'knowledge_weight')::int, 0) DESC, created_at DESC
LIMIT $2
```

### 2c. New MCP Tools: `internal/tools/knowledge.go`

```go
// get_knowledge_graph
// Input: { "project_id": "uuid|null", "limit": 50 }
// Returns: { nodes: [Thought], edges: [ThoughtLink] }
// Fetches ListKnowledgeNodes + their outgoing/incoming links via store.GetThoughtLinks.

// distill_now
// Input: { "project_id": "uuid|null", "batch_size": 20 }
// Triggers the distillation job synchronously (for on-demand use); returns { insights_created: N }
```

---

## Phase 3: Channel Integrations (Telegram First)

### 3a. Channel Adapter Interface: `internal/channels/channel.go`

```go
package channels

import (
	"context"
	"time"
)

type Attachment struct {
	Name      string
	MediaType string
	Data      []byte
}

type InboundMessage struct {
	ChannelID   string // e.g. telegram chat ID as string
	SenderID    string // e.g. telegram user ID as string
	SenderName  string // display name
	Text        string
	Attachments []Attachment
	Timestamp   time.Time
	Raw         any // original platform message for debug/logging
}

type Channel interface {
	Name() string
	Start(ctx context.Context, handler func(InboundMessage)) error
	Send(ctx context.Context, channelID string, text string) error
}
```

### 3b. Telegram Implementation: `internal/channels/telegram/bot.go`

Uses `net/http` only (no external Telegram SDK). Long-polling loop:

```go
type Bot struct {
	token      string
	allowedIDs map[int64]struct{} // empty = all allowed
	baseURL    string             // https://api.telegram.org/bot{token}
	client     *http.Client
	offset     int64
	logger     *slog.Logger
}

func (b *Bot) Name() string { return "telegram" }

func (b *Bot) Start(ctx context.Context, handler func(channels.InboundMessage)) error {
	for {
		updates, err := b.getUpdates(ctx, b.offset, 30 /*timeout seconds*/)
		if err != nil {
			if ctx.Err() != nil {
				return nil
			}
			// transient error: log and back off 5s, waking early on shutdown
			select {
			case <-ctx.Done():
				return nil
			case <-time.After(5 * time.Second):
			}
			continue
		}
		for _, u := range updates {
			b.offset = u.UpdateID + 1
			if u.Message == nil {
				continue
			}
			if !b.isAllowed(u.Message.Chat.ID) {
				continue
			}
			handler(b.toInbound(u.Message))
		}
	}
}

func (b *Bot) Send(ctx context.Context, channelID string, text string) error {
	// POST /sendMessage with chat_id and text
	// Splits messages > 4096 chars automatically
	return nil // placeholder until the request logic is implemented
}
```

**Error handling:**

- HTTP 401 (bad token): return fatal error, engine stops channel
- HTTP 429 (rate limit): respect `retry_after` (reported under `parameters` in the response body), sleep, retry
- HTTP 5xx: escalating backoff (5s → 10s → 30s → 60s), max 3 retries then sleep 5 min

### 3c. Channel Router: `internal/channels/router.go`

```go
// router.go lives in package channels, so Channel and InboundMessage
// are referenced without a package qualifier.
type Router struct {
	store    store.ContactQuerier
	thoughts store.ThoughtInserter
	provider ai.Provider
	channels map[string]Channel
	cfg      config.ChannelsConfig
	logger   *slog.Logger
}

func (r *Router) Handle(msg InboundMessage) {
	// 1. Resolve sender → CRM contact (by channel_identifiers->>'telegram' = senderID)
	//    If not found: create a new professional_contact with the sender name + channel identifier
	// 2. Capture message as thought:
	//      content       = msg.Text
	//      metadata.source = "telegram"
	//      metadata.type   = "observation"
	//      metadata.people = [senderName]
	//      metadata (extra, stored in JSONB): channel="telegram", channel_id=msg.ChannelID, sender_id=msg.SenderID
	// 3. If cfg.Telegram.Respond:
	//    a. Load recent context via store.SearchSimilarThoughts(msg.Text, limit=10)
	//    b. Build []CompletionMessage with system context + recent thoughts + user message
	//    c. Call provider.Complete(ctx, messages)
	//    d. Capture response as thought (type="assistant_response", source="telegram")
	//    e. Send reply via channel.Send(ctx, msg.ChannelID, result.Content)
	//    f. Save chat history via store.InsertChatHistory
}
```

**Agent response system prompt (step 3b):**

```
You are a personal assistant with access to the user's memory.
Relevant context from memory:
{joined recent thought content}

Respond concisely. If you cannot answer from memory, say so.
```

### 3d. Config: full YAML reference

```yaml
channels:
  telegram:
    enabled: false
    bot_token: ""
    allowed_chat_ids: []        # empty = all chats allowed
    capture_all: true           # save every inbound message as a thought
    respond: true               # send LLM reply back to sender
    response_model: ""          # blank = uses agent.model or ai.metadata.model
    poll_timeout_seconds: 30    # Telegram long-poll timeout (max 60)
    max_message_length: 4096    # split replies longer than this
  discord:
    enabled: false
    bot_token: ""
    guild_ids: []               # empty = all guilds
    capture_all: true
    respond: true
  slack:
    enabled: false
    bot_token: ""
    app_token: ""               # for socket mode
    capture_all: true
    respond: true
  email:
    enabled: false
    imap_host: ""
    imap_port: 993
    smtp_host: ""
    smtp_port: 587
    username: ""
    password: ""
    poll_interval: 5m
    capture_all: true
    folders: ["INBOX"]
```

### 3e. Schema Migration: `migrations/022_channel_contacts.sql`

```sql
-- Store per-channel identity handles on CRM contacts
ALTER TABLE professional_contacts
  ADD COLUMN IF NOT EXISTS channel_identifiers jsonb NOT NULL DEFAULT '{}';

-- e.g. {"telegram": "123456789", "discord": "user#1234", "slack": "U01234567"}
CREATE INDEX idx_contacts_telegram_id
  ON professional_contacts ((channel_identifiers->>'telegram'))
  WHERE channel_identifiers->>'telegram' IS NOT NULL;
```

### 3f. New MCP Tools: `internal/tools/channels.go`

```go
// send_channel_message
// Input: { "channel": "telegram", "channel_id": "123456789", "text": "Hello" }
// Sends a message on the named channel. Returns { sent: true, channel: "telegram" }

// list_channel_conversations
// Input: { "channel": "telegram", "limit": 20, "days": 7 }
// Lists chat histories filtered by channel metadata. Wraps store.ListChatHistories.

// get_channel_status
// Returns: [{ channel: "telegram", connected: true, uptime_seconds: 3600 }, ...]
```

### 3g. Future Channel Adapters

Each is a new subdirectory implementing `channels.Channel`. No router or MCP tool changes needed.

| Channel | Package | Approach |
|---------|---------|----------|
| Discord | `internal/channels/discord/` | Gateway WebSocket (discord.com/api/gateway); or use `discordgo` lib |
| Slack | `internal/channels/slack/` | Socket Mode WebSocket (no public URL needed) |
| Email (IMAP) | `internal/channels/email/` | IMAP IDLE or poll; SMTP for send |
| Signal | `internal/channels/signal/` | Wrap `signal-cli` JSON-RPC subprocess |
| WhatsApp | `internal/channels/whatsapp/` | Meta Cloud API webhook (requires public URL) |

---

## Phase 4: Shell / Computer Access

### 4a. Shell Tool: `internal/tools/shell.go`

```go
type ShellInput struct {
	Command    string `json:"command"`
	WorkingDir string `json:"working_dir,omitempty"` // override default; must be within allowed prefix
	Timeout    string `json:"timeout,omitempty"`     // e.g. "30s"; overrides config default
	CaptureAs  string `json:"capture_as,omitempty"`  // thought type for stored output; default "shell_output"
	SaveOutput bool   `json:"save_output"`           // store stdout/stderr as a thought
}

type ShellOutput struct {
	Stdout    string     `json:"stdout"`
	Stderr    string     `json:"stderr"`
	ExitCode  int        `json:"exit_code"`
	ThoughtID *uuid.UUID `json:"thought_id,omitempty"` // set if save_output=true
}
```

**Execution model:**

1. Validate `command` against `cfg.Shell.AllowedCommands` (if non-empty) and `cfg.Shell.BlockedCommands`
2. `exec.CommandContext(ctx, "sh", "-c", command)` with `Dir` set to working dir and `SysProcAttr{Setpgid: true}`, so the command runs in its own process group
3. Capture stdout + stderr into `bytes.Buffer`
4. On timeout: kill process group (`syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)`), return exit code -1. By default `exec.CommandContext` kills only the `sh` process itself, so the group kill has to be issued explicitly (for example from `cmd.Cancel` on Go 1.20+)
5. If `SaveOutput`: call `store.InsertThought` with content = truncated stdout (max 8KB) + stderr summary

**Security controls:**

```yaml
shell:
  enabled: false
  working_dir: "/tmp/amcs-agent"    # all commands run here unless overridden
  allowed_working_dirs:             # if set, working_dir overrides must be within one of these
    - "/tmp/amcs-agent"
    - "/home/user/projects"
  timeout: 30s
  max_output_bytes: 65536           # truncate captured output beyond this
  allowed_commands: []              # empty = all; non-empty = exact binary name allowlist
  blocked_commands:                 # checked before allowed_commands
    - "rm"
    - "sudo"
    - "su"
    - "curl"
    - "wget"
  save_output_by_default: false
```

The tool is registered with `mcp.Tool.Annotations` `Destructive: true` so MCP clients prompt for confirmation.

### 4b. File Bridge Tools

Also in `internal/tools/shell.go`:

```go
// read_file_from_path
// Input: { "path": "/abs/path/file.txt", "link_to_thought": "uuid|null" }
// Reads file from server filesystem → stores as AMCS file via store.InsertFile
// Returns: { file_id: "uuid", size_bytes: N, media_type: "text/plain" }

// write_file_to_path
// Input: { "file_id": "uuid", "path": "/abs/path/output.txt" }
// Loads AMCS file → writes to filesystem path
// Path must be within cfg.Shell.AllowedWorkingDirs if set
```

---

## Phase 5: Self-Improving Memory

### 5a. Skill Discovery Job: `internal/agent/skill_discovery.go`

Runs weekly. Algorithm:

1. Load last 30 days of `chat_histories` via `store.ListChatHistories(days=30)`
2. Extract assistant message patterns with `provider.Complete`:
   ```
   System: Identify reusable behavioural patterns or preferences visible in these conversations.
   Return a JSON array of { "name": "...", "description": "...", "tags": [...] }.
   Only include patterns that would be useful across future sessions.
   User: [last N assistant + user messages, newest first]
   ```
3. For each discovered pattern, call `store.InsertSkill` with tag `auto-discovered` and the current date
4. Link to all projects via `store.LinkSkillToProject`

Deduplication: before inserting, call `store.SearchSkills(pattern.name)`; if similarity > 0.9, skip.

### 5b. Thought Archival Job: `internal/agent/archival.go`

```go
func (j *ArchivalJob) Run(ctx context.Context) error {
	// 1. ListThoughts older than cfg.ArchiveOlderThan days with no knowledge_node link
	//    SQL: thoughts where created_at < now() - make_interval(days => $1)
	//         AND metadata->>'knowledge_node' IS DISTINCT FROM 'true'
	//         AND archived_at IS NULL
	//         AND id NOT IN (SELECT thought_id FROM thought_links WHERE relation = 'distilled_from')
	// 2. For each batch: store.ArchiveThought(ctx, id)
	// 3. Log count
	return nil
}
```

Archiving itself uses the existing `ArchiveThought` store method; only the selection query in step 1 is new.

---

## End-to-End Agent Loop Flow

```
Telegram message arrives
        │
        ▼
channels/telegram/bot.go  (long-poll goroutine)
        │  InboundMessage{}
        ▼
channels/router.go  Handle()
        ├── Resolve sender → CRM contact (store.SearchContacts by channel_identifiers)
        ├── store.InsertThought (source="telegram", type="observation")
        ├── store.SearchSimilarThoughts (semantic context retrieval)
        ├── ai.Provider.Complete (build messages → LLM call)
        ├── store.InsertThought (source="telegram", type="assistant_response")
        ├── store.InsertChatHistory (full turn saved)
        └── channels.Channel.Send (reply dispatched to Telegram)

Meanwhile, every 24h:
agent/engine.go ticker fires DistillJob
        ├── store.ListThoughts (recent, not yet distilled)
        ├── store.SearchSimilarThoughts (cluster by semantic similarity)
        ├── ai.Provider.Summarize (insight extraction prompt)
        ├── store.InsertThought (type="insight", knowledge_node=true)
        └── store.InsertLink (relation="distilled_from" for each source)

After distill:
agent/living_summary.go
        ├── store.ListKnowledgeNodes
        ├── store.ListThoughts (type="daily_note", last 30 days)
        ├── ai.Provider.Summarize (MEMORY.md regeneration)
        └── store.UpsertFile (name="MEMORY.md", linked to project)
```

---

## Error Handling & Retry Strategy

| Scenario | Handling |
|----------|----------|
| LLM returns 429 | Mark model unhealthy in `modelHealth` map (existing pattern), return error, engine logs and skips tick |
| LLM returns 5xx | Same as 429 |
| Telegram 429 | Read `retry_after` from response, sleep exact duration, retry immediately |
| Telegram 5xx | Escalating backoff: 5s → 10s → 30s → 60s, reset after success |
| Telegram disconnects | Long-poll timeout naturally retries; context cancel exits cleanly |
| Agent job panics | `engine.runOnce` should wrap the job in a deferred `recover()` (not yet shown in the 1b listing), log the stack trace, and mark the run `failed` |
| Agent double-run | `store.StartRun` checks for `running` row < `2 * interval` old → returns `ErrAlreadyRunning`, tick skipped silently |
| Shell command timeout | Process group is killed via SIGKILL (needs `Setpgid`; `exec.CommandContext` alone kills only the `sh` process), returns exit_code=-1 and partial output |
| Distillation partial failure | Each cluster processed independently; failure of one cluster logged and skipped, others continue |

---

## Critical Files

| File | Change |
|------|--------|
| `internal/ai/provider.go` | Add `Complete()`, `CompletionMessage`, `CompletionResult` |
| `internal/ai/compat/client.go` | Implement `Complete()` on `*Client` |
| `internal/config/config.go` | Add `AgentConfig`, `ChannelsConfig`, `ShellConfig` |
| `internal/types/thought.go` | Add `KnowledgeNode`, `KnowledgeWeight`, `Distilled` to `ThoughtMetadata` |
| `internal/store/thoughts.go` | Add `ListKnowledgeNodes()` |
| `internal/store/agent.go` | New: `JobStore` interface + implementation |
| `internal/app/app.go` | Wire agent engine + channel router goroutines |
| `internal/mcpserver/server.go` | Add `Agent`, `Knowledge`, `Channels`, `Shell` to `ToolSet` |
| `internal/agent/` | New package: engine, job, distill, daily_notes, living_summary, archival, skill_discovery |
| `internal/channels/` | New package: channel interface, router, telegram/ |
| `internal/tools/agent.go` | New: list_agent_jobs, trigger_agent_job, get_agent_job_history |
| `internal/tools/knowledge.go` | New: get_knowledge_graph, distill_now |
| `internal/tools/channels.go` | New: send_channel_message, list_channel_conversations, get_channel_status |
| `internal/tools/shell.go` | New: run_shell_command, read_file_from_path, write_file_to_path |
| `migrations/021_agent_jobs.sql` | New table: agent_job_runs |
| `migrations/022_channel_contacts.sql` | ALTER professional_contacts: add channel_identifiers jsonb |

---

## Sequence / Parallelism

```
Phase 1 (Heartbeat Engine) ──► Phase 2 (Knowledge Graph)
                           └──► Phase 5 (Self-Improving)

Phase 3 (Telegram) ──► Phase 3g (Discord / Slack / Email)

Phase 4 (Shell) [fully independent, no dependencies on other phases]
```

**Minimum viable OpenClaw competitor = Phase 1 + Phase 3** (autonomous scheduling + Telegram channel).

---

## Verification

| Phase | Test |
|-------|------|
| 1: Heartbeat | Set `distill.interval: 1m` in dev config. Capture 5+ related thoughts. Wait 1 min. Query `thought_links` for `relation=distilled_from` rows. Check `agent_job_runs` has a `status=ok` row. |
| 1: Daily notes | Set `daily_notes.hour` to current UTC hour. Restart server. Within 1 min, `list_thoughts` should return a `type=daily_note` entry for today. |
| 2: Knowledge graph | Call `get_knowledge_graph` MCP tool. Verify `nodes` array contains `type=insight` thoughts with `knowledge_node=true`. Verify edges list `distilled_from` links. |
| 3: Telegram inbound | Send a message to the configured bot. Call `search_thoughts` with the message text; it should appear with `source=telegram`. |
| 3: Telegram response | Send a question to the bot. Verify a reply arrives in Telegram. Call `list_chat_histories`; it should contain the turn. |
| 4: Shell | Call `run_shell_command` with `{"command": "echo hello", "save_output": true}`. Verify `stdout=hello\n`, `exit_code=0`, and a new thought with `type=shell_output`. |
| 4: Blocked command | Call `run_shell_command` with `{"command": "sudo whoami"}`. Verify error returned without execution. |
| 5: Skill discovery | Run `trigger_agent_job` with `{"job": "skill_discovery"}`. Verify new rows in `agent_skills` with tag `auto-discovered`. |
| Full loop | Send Telegram message → agent responds → distill job runs → knowledge node created from conversation → MEMORY.md regenerated with new insight. |