# AMCS → OpenClaw Alternative: Gap Analysis & Roadmap

## Context

AMCS is a **passive** MCP memory server. OpenClaw's key differentiator is that it's an **always-on autonomous agent** — it proactively acts, monitors, and learns without human prompting. AMCS has the data model and search foundation; it's missing the execution engine and channel integrations that make OpenClaw compelling.

OpenClaw's 3 pillars AMCS lacks:

1. **Autonomous heartbeat** — scheduled jobs that run without user prompts
2. **Channel integrations** — 25+ messaging platforms (Telegram, Slack, Discord, email, etc.)
3. **Self-improving memory** — knowledge graph distillation, daily notes, living summary (MEMORY.md)

---

## Phase 1: Autonomous Heartbeat Engine (Critical — unlocks everything else)

### 1a. Add `Complete()` to AI Provider

The current `Provider` interface in `internal/ai/provider.go` only supports `Summarize(ctx, systemPrompt, userPrompt)`. An autonomous agent needs a stateful multi-turn call with tool awareness.

**Extend the interface:**

```go
// internal/ai/provider.go

type CompletionRole string

const (
    RoleSystem    CompletionRole = "system"
    RoleUser      CompletionRole = "user"
    RoleAssistant CompletionRole = "assistant"
)

type CompletionMessage struct {
    Role    CompletionRole `json:"role"`
    Content string         `json:"content"`
}

type CompletionResult struct {
    Content    string `json:"content"`
    StopReason string `json:"stop_reason"` // "stop" | "length" | "error"
    Model      string `json:"model"`
}

type Provider interface {
    Embed(ctx context.Context, input string) ([]float32, error)
    ExtractMetadata(ctx context.Context, input string) (thoughttypes.ThoughtMetadata, error)
    Summarize(ctx context.Context, systemPrompt, userPrompt string) (string, error)
    Complete(ctx context.Context, messages []CompletionMessage) (CompletionResult, error)
    Name() string
    EmbeddingModel() string
}
```

**Implement in `internal/ai/compat/client.go`:**

`Complete` is a simplification of the existing `extractMetadataWithModel` path — same OpenAI-compatible `/chat/completions` endpoint, same auth headers, no JSON schema coercion. Add a `chatCompletionsRequest` type (reuse or extend the existing unexported struct) and a `Complete` method on `*Client` that:

1. Builds the request body from `[]CompletionMessage`
2. POSTs to `c.baseURL + "/chat/completions"` with `c.metadataModel`
3. Reads the first choice's `message.content`
4. Returns `CompletionResult{Content, StopReason, Model}`

Error handling mirrors the metadata path: on HTTP 429/503, mark the model unhealthy (`c.modelHealth`) and return a wrapped error. No fallback model chain is needed for agent calls — callers should retry on the next heartbeat tick.
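
Steps 1 and 3 can be sketched as pure marshalling helpers; this is a minimal sketch of the OpenAI-compatible wire format, with all of the `*Client` plumbing (base URL, auth headers, `modelHealth`) elided. `parseCompletion` and the response struct names are illustrative assumptions, not existing AMCS code:

```go
package main

import (
    "encoding/json"
    "fmt"
)

type CompletionMessage struct {
    Role    string `json:"role"`
    Content string `json:"content"`
}

// chatCompletionsRequest is the body POSTed to /chat/completions (step 1).
type chatCompletionsRequest struct {
    Model    string              `json:"model"`
    Messages []CompletionMessage `json:"messages"`
}

// chatCompletionsResponse covers only the fields Complete reads (step 3).
type chatCompletionsResponse struct {
    Model   string `json:"model"`
    Choices []struct {
        Message      CompletionMessage `json:"message"`
        FinishReason string            `json:"finish_reason"`
    } `json:"choices"`
}

// parseCompletion extracts the first choice's content, finish reason, and model.
func parseCompletion(body []byte) (content, stopReason, model string, err error) {
    var resp chatCompletionsResponse
    if err = json.Unmarshal(body, &resp); err != nil {
        return "", "", "", err
    }
    if len(resp.Choices) == 0 {
        return "", "", "", fmt.Errorf("no choices in response")
    }
    return resp.Choices[0].Message.Content, resp.Choices[0].FinishReason, resp.Model, nil
}

func main() {
    raw := []byte(`{"model":"m1","choices":[{"message":{"role":"assistant","content":"hi"},"finish_reason":"stop"}]}`)
    content, stop, model, err := parseCompletion(raw)
    fmt.Println(content, stop, model, err)
}
```

The `CompletionResult` would then be assembled from the three returned strings, mapping `finish_reason` onto the `"stop" | "length" | "error"` values above.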

---

### 1b. Heartbeat Engine Package

**New package: `internal/agent/`**

#### `internal/agent/job.go`

```go
package agent

import (
    "context"
    "time"
)

// Job is a single scheduled unit of autonomous work.
type Job interface {
    Name() string
    Interval() time.Duration
    Run(ctx context.Context) error
}
```

#### `internal/agent/engine.go`

The engine manages a set of jobs and fires each on its own ticker. It mirrors the pattern already used for `runBackfillPass` and `runMetadataRetryPass` in `internal/app/app.go`, but generalises it.

```go
package agent

import (
    "context"
    "errors"
    "log/slog"
    "sync"
    "time"
)

type Engine struct {
    jobs   []Job
    store  JobStore // persists agent_job_runs rows
    logger *slog.Logger
}

func NewEngine(store JobStore, logger *slog.Logger, jobs ...Job) *Engine {
    return &Engine{jobs: jobs, store: store, logger: logger}
}

// Run starts all job tickers and blocks until ctx is cancelled.
func (e *Engine) Run(ctx context.Context) {
    var wg sync.WaitGroup
    for _, job := range e.jobs {
        wg.Add(1)
        go func(j Job) {
            defer wg.Done()
            e.runLoop(ctx, j)
        }(job)
    }
    wg.Wait()
}

func (e *Engine) runLoop(ctx context.Context, j Job) {
    ticker := time.NewTicker(j.Interval())
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            e.runOnce(ctx, j)
        }
    }
}

func (e *Engine) runOnce(ctx context.Context, j Job) {
    runID, err := e.store.StartRun(ctx, j.Name())
    if err != nil {
        if errors.Is(err, ErrAlreadyRunning) {
            return // previous run still in flight; skip this tick silently
        }
        e.logger.Error("agent: failed to start job run record",
            slog.String("job", j.Name()), slog.String("error", err.Error()))
        return
    }
    defer func() {
        if r := recover(); r != nil {
            e.logger.Error("agent: job panicked",
                slog.String("job", j.Name()), slog.Any("panic", r))
            _ = e.store.FinishRun(ctx, runID, "failed", "", "panic")
        }
    }()
    if err := j.Run(ctx); err != nil {
        e.logger.Error("agent: job failed",
            slog.String("job", j.Name()), slog.String("error", err.Error()))
        _ = e.store.FinishRun(ctx, runID, "failed", "", err.Error())
        return
    }
    _ = e.store.FinishRun(ctx, runID, "ok", "", "")
    e.logger.Info("agent: job complete", slog.String("job", j.Name()))
}
```

**Deduplication / double-run prevention:** `StartRun` should check for an existing `running` row younger than `2 * j.Interval()` and return `ErrAlreadyRunning` — the caller skips that tick.

#### `internal/agent/distill.go`

```go
// DistillJob clusters semantically related thoughts and promotes
// durable insights into knowledge nodes.
type DistillJob struct {
    store     store.ThoughtQuerier
    provider  ai.Provider
    cfg       AgentDistillConfig
    projectID *uuid.UUID // nil = all projects
}

func (j *DistillJob) Name() string            { return "distill" }
func (j *DistillJob) Interval() time.Duration { return j.cfg.Interval }

func (j *DistillJob) Run(ctx context.Context) error {
    // 1. Fetch recent thoughts not yet distilled (metadata.distilled != true)
    //    using store.ListThoughts with filter Days = cfg.MinAgeHours/24
    // 2. Group into semantic clusters via SearchSimilarThoughts
    // 3. For each cluster > MinClusterSize:
    //    a. Call provider.Summarize with the insight extraction prompt
    //    b. InsertThought with type="insight", metadata.knowledge_node=true
    //    c. InsertLink from each cluster member to the insight, relation="distilled_from"
    //    d. UpdateThought on each source to set metadata.distilled=true
    // 4. Return nil; partial failures are logged but do not abort the run
    return nil
}
```

Prompt used in step 3a:

```
System: You extract durable knowledge from a cluster of related notes.
Return a single paragraph (2-5 sentences) capturing the core insight.
Do not reference the notes themselves. Write in third person.
User: [concatenated thought content, newest first, max 4000 tokens]
```

#### `internal/agent/daily_notes.go`

Runs at a configured hour each day. The loop compares `time.Now().Hour()` against `cfg.Hour` on each tick, and skips if the job already ran today (query `agent_job_runs` for a successful `daily_notes` run with `started_at >= today midnight`).

Collects:
- Thoughts created today (`store.ListThoughts` with `Days=1`)
- CRM interactions logged today
- Calendar activities for today
- Maintenance logs from today

Formats these into a structured markdown string and calls `store.InsertThought` with `type=daily_note`.

#### `internal/agent/living_summary.go`

Regenerates `MEMORY.md` from the last N daily notes plus all knowledge nodes. Calls `provider.Summarize` and upserts the result via `store.UpsertFile` using the fixed name `MEMORY.md`, scoped to the project (or global if no project).

---

### 1c. Config Structs

Add to `internal/config/config.go`:

```go
type Config struct {
    // ... existing fields ...
    Agent    AgentConfig    `yaml:"agent"`
    Channels ChannelsConfig `yaml:"channels"`
    Shell    ShellConfig    `yaml:"shell"`
}

type AgentConfig struct {
    Enabled       bool                  `yaml:"enabled"`
    Distill       AgentDistillConfig    `yaml:"distill"`
    DailyNotes    AgentDailyNotesConfig `yaml:"daily_notes"`
    LivingSummary AgentLivingSummary    `yaml:"living_summary"`
    Archival      AgentArchivalConfig   `yaml:"archival"`
    Model         string                `yaml:"model"` // override for agent calls; falls back to AI.Metadata.Model
}

type AgentDistillConfig struct {
    Enabled        bool          `yaml:"enabled"`
    Interval       time.Duration `yaml:"interval"`         // default: 24h
    BatchSize      int           `yaml:"batch_size"`       // thoughts per run; default: 50
    MinClusterSize int           `yaml:"min_cluster_size"` // default: 3
    MinAgeHours    int           `yaml:"min_age_hours"`    // ignore thoughts younger than this; default: 6
}

type AgentDailyNotesConfig struct {
    Enabled bool `yaml:"enabled"`
    Hour    int  `yaml:"hour"` // 0-23 UTC; default: 23
}

type AgentLivingSummary struct {
    Enabled  bool          `yaml:"enabled"`
    Interval time.Duration `yaml:"interval"` // default: 24h
    MaxDays  int           `yaml:"max_days"` // daily notes lookback; default: 30
}

type AgentArchivalConfig struct {
    Enabled          bool          `yaml:"enabled"`
    Interval         time.Duration `yaml:"interval"`                // default: 168h (weekly)
    ArchiveOlderThan int           `yaml:"archive_older_than_days"` // default: 90
}
```

**Full YAML reference (`configs/dev.yaml` additions):**

```yaml
agent:
  enabled: false
  model: "" # leave blank to reuse ai.metadata.model
  distill:
    enabled: false
    interval: 24h
    batch_size: 50
    min_cluster_size: 3
    min_age_hours: 6
  daily_notes:
    enabled: false
    hour: 23 # UTC hour to generate (0-23)
  living_summary:
    enabled: false
    interval: 24h
    max_days: 30
  archival:
    enabled: false
    interval: 168h
    archive_older_than_days: 90
```

---

### 1d. Wire into `internal/app/app.go`

After the existing `MetadataRetry` goroutine block:

```go
if cfg.Agent.Enabled {
    jobStore := store.NewJobStore(db)
    var jobs []agent.Job
    if cfg.Agent.Distill.Enabled {
        jobs = append(jobs, agent.NewDistillJob(db, provider, cfg.Agent.Distill, nil))
    }
    if cfg.Agent.DailyNotes.Enabled {
        jobs = append(jobs, agent.NewDailyNotesJob(db, provider, cfg.Agent.DailyNotes))
    }
    if cfg.Agent.LivingSummary.Enabled {
        jobs = append(jobs, agent.NewLivingSummaryJob(db, provider, cfg.Agent.LivingSummary))
    }
    if cfg.Agent.Archival.Enabled {
        jobs = append(jobs, agent.NewArchivalJob(db, cfg.Agent.Archival))
    }
    engine := agent.NewEngine(jobStore, logger, jobs...)
    go engine.Run(ctx)
}
```

---

### 1e. New MCP Tools — `internal/tools/agent.go`

```go
// list_agent_jobs
// Returns all registered jobs with: name, interval, last_run (status, started_at, finished_at), next_run estimate.

// trigger_agent_job
// Input: { "job": "distill" }
// Fires the job immediately in a goroutine; returns a run_id for polling.

// get_agent_job_history
// Input: { "job": "distill", "limit": 20 }
// Returns rows from agent_job_runs ordered by started_at DESC.
```

Register in `internal/app/app.go` routes by adding `Agent tools.AgentTool` to `mcpserver.ToolSet` and wiring `tools.NewAgentTool(engine)`.

---

### 1f. Migration — `migrations/021_agent_jobs.sql`

```sql
CREATE TABLE agent_job_runs (
    id          uuid PRIMARY KEY DEFAULT gen_random_uuid(),
    job_name    text NOT NULL,
    started_at  timestamptz NOT NULL DEFAULT now(),
    finished_at timestamptz,
    status      text NOT NULL DEFAULT 'running', -- running | ok | failed | skipped
    output      text,
    error       text,
    metadata    jsonb NOT NULL DEFAULT '{}'
);

CREATE INDEX idx_agent_job_runs_lookup
    ON agent_job_runs (job_name, started_at DESC);
```

**`JobStore` interface (`internal/store/agent.go`):**

```go
type JobStore interface {
    StartRun(ctx context.Context, jobName string) (uuid.UUID, error)
    FinishRun(ctx context.Context, id uuid.UUID, status, output, errMsg string) error
    LastRun(ctx context.Context, jobName string) (*AgentJobRun, error)
    ListRuns(ctx context.Context, jobName string, limit int) ([]AgentJobRun, error)
}
```

---

## Phase 2: Knowledge Graph Distillation

Builds on Phase 1's distillation job. `thought_links` already exists with a typed `relation` — the missing piece is a way to mark and query promoted knowledge nodes.

### 2a. Extend `ThoughtMetadata`

In `internal/types/thought.go`, add three fields to `ThoughtMetadata`:

```go
type ThoughtMetadata struct {
    // ... existing fields ...
    KnowledgeNode   bool `json:"knowledge_node,omitempty"`   // true = promoted insight
    KnowledgeWeight int  `json:"knowledge_weight,omitempty"` // number of source thoughts that fed this node
    Distilled       bool `json:"distilled,omitempty"`        // true = already processed by the distill job
}
```

These are stored in the existing `metadata jsonb` column — no schema migration needed.

### 2b. Store Addition

In `internal/store/thoughts.go` add:

```go
// ListKnowledgeNodes returns thoughts where metadata->>'knowledge_node' = 'true',
// ordered by knowledge_weight DESC, then created_at DESC.
func (db *DB) ListKnowledgeNodes(ctx context.Context, projectID *uuid.UUID, limit int) ([]types.Thought, error)
```

SQL:

```sql
SELECT id, content, metadata, project_id, archived_at, created_at, updated_at
FROM thoughts
WHERE (metadata->>'knowledge_node')::boolean = true
  AND ($1::uuid IS NULL OR project_id = $1)
  AND archived_at IS NULL
ORDER BY COALESCE((metadata->>'knowledge_weight')::int, 0) DESC, created_at DESC
LIMIT $2
```

The `COALESCE` treats nodes without a stored weight as 0 so they sort last rather than first (NULLs come first under `DESC` in Postgres).

### 2c. New MCP Tools — `internal/tools/knowledge.go`

```go
// get_knowledge_graph
// Input: { "project_id": "uuid|null", "limit": 50 }
// Returns: { nodes: [Thought], edges: [ThoughtLink] }
// Fetches ListKnowledgeNodes + their outgoing/incoming links via store.GetThoughtLinks.

// distill_now
// Input: { "project_id": "uuid|null", "batch_size": 20 }
// Triggers the distillation job synchronously (for on-demand use); returns { insights_created: N }
```

---

## Phase 3: Channel Integrations — Telegram First

### 3a. Channel Adapter Interface — `internal/channels/channel.go`

```go
package channels

import (
    "context"
    "time"
)

type Attachment struct {
    Name      string
    MediaType string
    Data      []byte
}

type InboundMessage struct {
    ChannelID   string // e.g. Telegram chat ID as string
    SenderID    string // e.g. Telegram user ID as string
    SenderName  string // display name
    Text        string
    Attachments []Attachment
    Timestamp   time.Time
    Raw         any // original platform message for debug/logging
}

type Channel interface {
    Name() string
    Start(ctx context.Context, handler func(InboundMessage)) error
    Send(ctx context.Context, channelID string, text string) error
}
```

### 3b. Telegram Implementation — `internal/channels/telegram/bot.go`

Uses `net/http` only (no external Telegram SDK). Long-polling loop:

```go
type Bot struct {
    token      string
    allowedIDs map[int64]struct{} // empty = all allowed
    baseURL    string             // https://api.telegram.org/bot{token}
    client     *http.Client
    offset     int64
    logger     *slog.Logger
}

func (b *Bot) Name() string { return "telegram" }

func (b *Bot) Start(ctx context.Context, handler func(channels.InboundMessage)) error {
    for {
        updates, err := b.getUpdates(ctx, b.offset, 30 /* timeout seconds */)
        if err != nil {
            if ctx.Err() != nil {
                return nil
            }
            // transient error: log and back off 5s
            time.Sleep(5 * time.Second)
            continue
        }
        for _, u := range updates {
            b.offset = u.UpdateID + 1
            if u.Message == nil {
                continue
            }
            if !b.isAllowed(u.Message.Chat.ID) {
                continue
            }
            handler(b.toInbound(u.Message))
        }
    }
}

func (b *Bot) Send(ctx context.Context, channelID string, text string) error {
    // POST /sendMessage with chat_id and text
    // Splits messages > 4096 chars automatically
    return nil
}
```

**Error handling:**
- HTTP 401 (bad token): return a fatal error; the engine stops the channel
- HTTP 429 (rate limit): respect `retry_after` from the response body, sleep, retry
- HTTP 5xx: exponential backoff (5s → 10s → 30s → 60s), max 3 retries, then sleep 5 min
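
The automatic splitting that `Send` performs for replies over Telegram's 4096-character message limit can be sketched as a pure helper; `splitMessage` is an assumed name that splits on rune boundaries — a production version might prefer to break at newlines:

```go
package main

import "fmt"

const telegramMaxLen = 4096

// splitMessage chunks text into pieces of at most max runes each, so no
// piece exceeds Telegram's per-message limit. It splits on rune boundaries
// to avoid cutting a multi-byte character in half.
func splitMessage(text string, max int) []string {
    runes := []rune(text)
    var parts []string
    for len(runes) > max {
        parts = append(parts, string(runes[:max]))
        runes = runes[max:]
    }
    return append(parts, string(runes))
}

func main() {
    long := make([]rune, 5000)
    for i := range long {
        long[i] = 'a'
    }
    parts := splitMessage(string(long), telegramMaxLen)
    fmt.Println(len(parts), len(parts[0]), len(parts[1])) // 2 4096 904
}
```

`Send` would then POST one `/sendMessage` call per part, in order.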

### 3c. Channel Router — `internal/channels/router.go`

```go
type Router struct {
    store    store.ContactQuerier
    thoughts store.ThoughtInserter
    provider ai.Provider
    channels map[string]channels.Channel
    cfg      config.ChannelsConfig
    logger   *slog.Logger
}

func (r *Router) Handle(msg channels.InboundMessage) {
    // 1. Resolve sender → CRM contact (by channel_identifiers->>'telegram' = senderID)
    //    If not found: create a new professional_contact with the sender name + channel identifier
    // 2. Capture message as thought:
    //    content = msg.Text
    //    metadata.source = "telegram"
    //    metadata.type = "observation"
    //    metadata.people = [senderName]
    //    metadata (extra, stored in JSONB): channel="telegram", channel_id=msg.ChannelID, sender_id=msg.SenderID
    // 3. If cfg.Telegram.Respond:
    //    a. Load recent context via store.SearchSimilarThoughts(msg.Text, limit=10)
    //    b. Build []CompletionMessage with system context + recent thoughts + user message
    //    c. Call provider.Complete(ctx, messages)
    //    d. Capture response as thought (type="assistant_response", source="telegram")
    //    e. Send reply via channel.Send(ctx, msg.ChannelID, result.Content)
    //    f. Save chat history via store.InsertChatHistory
}
```

**Agent response system prompt (step 3b):**

```
You are a personal assistant with access to the user's memory.
Relevant context from memory:
{joined recent thought content}

Respond concisely. If you cannot answer from memory, say so.
```

### 3d. Config — full YAML reference

```yaml
channels:
  telegram:
    enabled: false
    bot_token: ""
    allowed_chat_ids: []      # empty = all chats allowed
    capture_all: true         # save every inbound message as a thought
    respond: true             # send LLM reply back to sender
    response_model: ""        # blank = uses agent.model or ai.metadata.model
    poll_timeout_seconds: 30  # Telegram long-poll timeout (max 60)
    max_message_length: 4096  # split replies longer than this
  discord:
    enabled: false
    bot_token: ""
    guild_ids: []             # empty = all guilds
    capture_all: true
    respond: true
  slack:
    enabled: false
    bot_token: ""
    app_token: ""             # for socket mode
    capture_all: true
    respond: true
  email:
    enabled: false
    imap_host: ""
    imap_port: 993
    smtp_host: ""
    smtp_port: 587
    username: ""
    password: ""
    poll_interval: 5m
    capture_all: true
    folders: ["INBOX"]
```

### 3e. Schema Migration — `migrations/022_channel_contacts.sql`

```sql
-- Store per-channel identity handles on CRM contacts
ALTER TABLE professional_contacts
    ADD COLUMN IF NOT EXISTS channel_identifiers jsonb NOT NULL DEFAULT '{}';

-- e.g. {"telegram": "123456789", "discord": "user#1234", "slack": "U01234567"}
CREATE INDEX idx_contacts_telegram_id
    ON professional_contacts ((channel_identifiers->>'telegram'))
    WHERE channel_identifiers->>'telegram' IS NOT NULL;
```

### 3f. New MCP Tools — `internal/tools/channels.go`

```go
// send_channel_message
// Input: { "channel": "telegram", "channel_id": "123456789", "text": "Hello" }
// Sends a message on the named channel. Returns { sent: true, channel: "telegram" }

// list_channel_conversations
// Input: { "channel": "telegram", "limit": 20, "days": 7 }
// Lists chat histories filtered by channel metadata. Wraps store.ListChatHistories.

// get_channel_status
// Returns: [{ channel: "telegram", connected: true, uptime_seconds: 3600 }, ...]
```

### 3g. Future Channel Adapters

Each is a new subdirectory implementing `channels.Channel`. No router or MCP tool changes are needed.

| Channel | Package | Approach |
|---------|---------|----------|
| Discord | `internal/channels/discord/` | Gateway WebSocket (discord.com/api/gateway); or use the `discordgo` lib |
| Slack | `internal/channels/slack/` | Socket Mode WebSocket (no public URL needed) |
| Email (IMAP) | `internal/channels/email/` | IMAP IDLE or poll; SMTP for send |
| Signal | `internal/channels/signal/` | Wrap `signal-cli` JSON-RPC subprocess |
| WhatsApp | `internal/channels/whatsapp/` | Meta Cloud API webhook (requires public URL) |

---

## Phase 4: Shell / Computer Access

### 4a. Shell Tool — `internal/tools/shell.go`

```go
type ShellInput struct {
    Command    string `json:"command"`
    WorkingDir string `json:"working_dir,omitempty"` // override default; must be within allowed prefix
    Timeout    string `json:"timeout,omitempty"`     // e.g. "30s"; overrides config default
    CaptureAs  string `json:"capture_as,omitempty"`  // thought type for stored output; default "shell_output"
    SaveOutput bool   `json:"save_output"`           // store stdout/stderr as a thought
}

type ShellOutput struct {
    Stdout    string     `json:"stdout"`
    Stderr    string     `json:"stderr"`
    ExitCode  int        `json:"exit_code"`
    ThoughtID *uuid.UUID `json:"thought_id,omitempty"` // set if save_output=true
}
```

**Execution model:**
1. Validate `command` against `cfg.Shell.AllowedCommands` (if non-empty) and `cfg.Shell.BlockedCommands`
2. `exec.CommandContext(ctx, "sh", "-c", command)` with `Dir` set to the working dir
3. Capture stdout + stderr into `bytes.Buffer`
4. On timeout: kill the process group (`syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)`), return exit code -1
5. If `SaveOutput`: call `store.InsertThought` with content = truncated stdout (max 8KB) + stderr summary
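
Step 1 can be sketched as a pure check on the command's leading binary name; `validateCommand` and its signature are illustrative, not the final tool API:

```go
package main

import (
    "fmt"
    "strings"
)

// validateCommand rejects a command whose first token (the binary name) is on
// the blocklist, or — when an allowlist is configured — not explicitly
// allowed. The blocklist is checked first, matching the config semantics.
func validateCommand(command string, allowed, blocked []string) error {
    fields := strings.Fields(command)
    if len(fields) == 0 {
        return fmt.Errorf("empty command")
    }
    bin := fields[0]
    for _, b := range blocked {
        if bin == b {
            return fmt.Errorf("command %q is blocked", bin)
        }
    }
    if len(allowed) == 0 {
        return nil // empty allowlist = everything not blocked is permitted
    }
    for _, a := range allowed {
        if bin == a {
            return nil
        }
    }
    return fmt.Errorf("command %q is not on the allowlist", bin)
}

func main() {
    blocked := []string{"rm", "sudo", "su", "curl", "wget"}
    fmt.Println(validateCommand("echo hello", nil, blocked))
    fmt.Println(validateCommand("sudo whoami", nil, blocked))
}
```

Note this first-token check is deliberately naive: it does not catch nested invocations like `sh -c "sudo …"` or pipelines, so the blocklist is a guardrail, not a sandbox — the `working_dir` and `Destructive: true` annotation below carry the rest of the weight.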

**Security controls:**

```yaml
shell:
  enabled: false
  working_dir: "/tmp/amcs-agent"  # all commands run here unless overridden
  allowed_working_dirs:           # if set, working_dir overrides must be within one of these
    - "/tmp/amcs-agent"
    - "/home/user/projects"
  timeout: 30s
  max_output_bytes: 65536         # truncate captured output beyond this
  allowed_commands: []            # empty = all; non-empty = exact binary name allowlist
  blocked_commands:               # checked before allowed_commands
    - "rm"
    - "sudo"
    - "su"
    - "curl"
    - "wget"
  save_output_by_default: false
```

The tool is registered with `mcp.Tool.Annotations` `Destructive: true` so MCP clients prompt for confirmation.

### 4b. File Bridge Tools

Also in `internal/tools/shell.go`:

```go
// read_file_from_path
// Input: { "path": "/abs/path/file.txt", "link_to_thought": "uuid|null" }
// Reads a file from the server filesystem → stores it as an AMCS file via store.InsertFile
// Returns: { file_id: "uuid", size_bytes: N, media_type: "text/plain" }

// write_file_to_path
// Input: { "file_id": "uuid", "path": "/abs/path/output.txt" }
// Loads an AMCS file → writes it to the filesystem path
// Path must be within cfg.Shell.AllowedWorkingDirs if set
```

---

## Phase 5: Self-Improving Memory

### 5a. Skill Discovery Job — `internal/agent/skill_discovery.go`

Runs weekly. Algorithm:

1. Load the last 30 days of `chat_histories` via `store.ListChatHistories(days=30)`
2. Extract assistant message patterns with `provider.Complete`:
   ```
   System: Identify reusable behavioural patterns or preferences visible in these conversations.
   Return a JSON array of { "name": "...", "description": "...", "tags": [...] }.
   Only include patterns that would be useful across future sessions.
   User: [last N assistant + user messages, newest first]
   ```
3. For each discovered pattern, call `store.InsertSkill` with the tag `auto-discovered` and the current date
4. Link to all projects via `store.LinkSkillToProject`

Deduplication: before inserting, call `store.SearchSkills(pattern.name)` — if similarity > 0.9, skip.

### 5b. Thought Archival Job — `internal/agent/archival.go`

```go
func (j *ArchivalJob) Run(ctx context.Context) error {
    // 1. ListThoughts older than cfg.ArchiveOlderThanDays with no knowledge_node link
    //    SQL: thoughts where created_at < now() - interval '$N days'
    //         AND metadata->>'knowledge_node' IS DISTINCT FROM 'true'
    //         AND archived_at IS NULL
    //         AND id NOT IN (SELECT thought_id FROM thought_links WHERE relation = 'distilled_from')
    // 2. For each batch: store.ArchiveThought(ctx, id)
    // 3. Log the count
    return nil
}
```

Uses the existing `ArchiveThought` store method — no new SQL needed.

---

## End-to-End Agent Loop Flow

```
Telegram message arrives
        │
        ▼
channels/telegram/bot.go (long-poll goroutine)
        │ InboundMessage{}
        ▼
channels/router.go Handle()
        ├── Resolve sender → CRM contact (store.SearchContacts by channel_identifiers)
        ├── store.InsertThought (source="telegram", type="observation")
        ├── store.SearchSimilarThoughts (semantic context retrieval)
        ├── ai.Provider.Complete (build messages → LLM call)
        ├── store.InsertThought (source="telegram", type="assistant_response")
        ├── store.InsertChatHistory (full turn saved)
        └── channels.Channel.Send (reply dispatched to Telegram)

Meanwhile, every 24h:
agent/engine.go ticker fires DistillJob
        ├── store.ListThoughts (recent, not yet distilled)
        ├── store.SearchSimilarThoughts (cluster by semantic similarity)
        ├── ai.Provider.Summarize (insight extraction prompt)
        ├── store.InsertThought (type="insight", knowledge_node=true)
        └── store.InsertLink (relation="distilled_from" for each source)

After distill:
agent/living_summary.go
        ├── store.ListKnowledgeNodes
        ├── store.ListThoughts (type="daily_note", last 30 days)
        ├── ai.Provider.Summarize (MEMORY.md regeneration)
        └── store.UpsertFile (name="MEMORY.md", linked to project)
```

---

## Error Handling & Retry Strategy

| Scenario | Handling |
|----------|----------|
| LLM returns 429 | Mark model unhealthy in `modelHealth` map (existing pattern), return error; engine logs and skips the tick |
| LLM returns 5xx | Same as 429 |
| Telegram 429 | Read `retry_after` from the response, sleep that exact duration, retry |
| Telegram 5xx | Exponential backoff: 5s → 10s → 30s → 60s, reset after success |
| Telegram disconnects | Long-poll timeout naturally retries; context cancel exits cleanly |
| Agent job panics | `engine.runOnce` wraps in `recover()`, logs the stack trace, marks the run `failed` |
| Agent double-run | `store.StartRun` checks for a `running` row < `2 * interval` old → returns `ErrAlreadyRunning`, tick skipped silently |
| Shell command timeout | `exec.CommandContext` kills the process group via SIGKILL, returns exit_code=-1 and partial output |
| Distillation partial failure | Each cluster is processed independently; failure of one cluster is logged and skipped, others continue |

---

## Critical Files

| File | Change |
|------|--------|
| `internal/ai/provider.go` | Add `Complete()`, `CompletionMessage`, `CompletionResult` |
| `internal/ai/compat/client.go` | Implement `Complete()` on `*Client` |
| `internal/config/config.go` | Add `AgentConfig`, `ChannelsConfig`, `ShellConfig` |
| `internal/types/thought.go` | Add `KnowledgeNode`, `KnowledgeWeight`, `Distilled` to `ThoughtMetadata` |
| `internal/store/thoughts.go` | Add `ListKnowledgeNodes()` |
| `internal/store/agent.go` | New: `JobStore` interface + implementation |
| `internal/app/app.go` | Wire agent engine + channel router goroutines |
| `internal/mcpserver/server.go` | Add `Agent`, `Knowledge`, `Channels`, `Shell` to `ToolSet` |
| `internal/agent/` | New package: engine, job, distill, daily_notes, living_summary, archival, skill_discovery |
| `internal/channels/` | New package: channel interface, router, telegram/ |
| `internal/tools/agent.go` | New: list_agent_jobs, trigger_agent_job, get_agent_job_history |
| `internal/tools/knowledge.go` | New: get_knowledge_graph, distill_now |
| `internal/tools/channels.go` | New: send_channel_message, list_channel_conversations, get_channel_status |
| `internal/tools/shell.go` | New: run_shell_command, read_file_from_path, write_file_to_path |
| `migrations/021_agent_jobs.sql` | New table: agent_job_runs |
| `migrations/022_channel_contacts.sql` | ALTER professional_contacts: add channel_identifiers jsonb |

---

## Sequence / Parallelism

```
Phase 1 (Heartbeat Engine) ──► Phase 2 (Knowledge Graph)
                           └──► Phase 5 (Self-Improving)

Phase 3 (Telegram) ──► Phase 3g (Discord / Slack / Email)

Phase 4 (Shell)    [fully independent — no dependencies on other phases]
```

**Minimum viable OpenClaw competitor = Phase 1 + Phase 3** (autonomous scheduling + Telegram channel).

---

## Verification

| Phase | Test |
|-------|------|
| 1 — Heartbeat | Set `distill.interval: 1m` in the dev config. Capture 5+ related thoughts. Wait 1 min. Query `thought_links` for `relation=distilled_from` rows. Check `agent_job_runs` has a `status=ok` row. |
| 1 — Daily notes | Set `daily_notes.hour` to the current UTC hour. Restart the server. Within 1 min, `list_thoughts` should return a `type=daily_note` entry for today. |
| 2 — Knowledge graph | Call the `get_knowledge_graph` MCP tool. Verify the `nodes` array contains `type=insight` thoughts with `knowledge_node=true`. Verify the edges list `distilled_from` links. |
| 3 — Telegram inbound | Send a message to the configured bot. Call `search_thoughts` with the message text — it should appear with `source=telegram`. |
| 3 — Telegram response | Send a question to the bot. Verify a reply arrives in Telegram. Call `list_chat_histories` — it should contain the turn. |
| 4 — Shell | Call `run_shell_command` with `{"command": "echo hello", "save_output": true}`. Verify `stdout=hello\n`, `exit_code=0`, and a new thought with `type=shell_output`. |
| 4 — Blocked command | Call `run_shell_command` with `{"command": "sudo whoami"}`. Verify an error is returned without execution. |
| 5 — Skill discovery | Run `trigger_agent_job` with `{"job": "skill_discovery"}`. Verify new rows in `agent_skills` with the tag `auto-discovered`. |
| Full loop | Send a Telegram message → agent responds → distill job runs → knowledge node created from the conversation → MEMORY.md regenerated with the new insight. |