Files
ResolveSpec/pkg/mqttspec/README.md
2025-12-30 14:12:36 +02:00

725 lines
18 KiB
Markdown

# MQTTSpec - MQTT-based Database Query Framework
MQTTSpec is an MQTT-based database query framework that enables real-time database operations and subscriptions via MQTT protocol. It mirrors the functionality of WebSocketSpec but uses MQTT as the transport layer, making it ideal for IoT applications, mobile apps with unreliable networks, and distributed systems requiring QoS guarantees.
## Features
- **Dual Broker Support**: Embedded broker (Mochi MQTT) or external broker connection (Paho MQTT)
- **QoS 1 (At-least-once delivery)**: Reliable message delivery for all operations
- **Full CRUD Operations**: Create, Read, Update, Delete with hooks
- **Real-time Subscriptions**: Subscribe to entity changes with filtering
- **Database Agnostic**: GORM and Bun ORM support
- **Lifecycle Hooks**: 12 hooks for authentication, authorization, validation, and auditing
- **Multi-tenancy Support**: Built-in tenant isolation via hooks
- **Thread-safe**: Proper concurrency handling throughout
## Installation
```bash
go get github.com/bitechdev/ResolveSpec/pkg/mqttspec
```
## Quick Start
### Embedded Broker (Default)
```go
package main
import (
"github.com/bitechdev/ResolveSpec/pkg/mqttspec"
"gorm.io/driver/postgres"
"gorm.io/gorm"
)
type User struct {
ID uint `json:"id" gorm:"primaryKey"`
Name string `json:"name"`
Email string `json:"email"`
Status string `json:"status"`
}
func main() {
// Connect to database
db, _ := gorm.Open(postgres.Open("postgres://..."), &gorm.Config{})
db.AutoMigrate(&User{})
// Create MQTT handler with embedded broker
handler, err := mqttspec.NewHandlerWithGORM(db)
if err != nil {
panic(err)
}
// Register models
handler.Registry().RegisterModel("public.users", &User{})
// Start handler (starts embedded broker on localhost:1883)
if err := handler.Start(); err != nil {
panic(err)
}
// Handler is now listening for MQTT messages
select {} // Keep running
}
```
### External Broker
```go
handler, err := mqttspec.NewHandlerWithGORM(db,
mqttspec.WithExternalBroker(mqttspec.ExternalBrokerConfig{
BrokerURL: "tcp://mqtt.example.com:1883",
ClientID: "mqttspec-server",
Username: "admin",
Password: "secret",
ConnectTimeout: 10 * time.Second,
}),
)
```
### Custom Port (Embedded Broker)
```go
handler, err := mqttspec.NewHandlerWithGORM(db,
mqttspec.WithEmbeddedBroker(mqttspec.BrokerConfig{
Host: "0.0.0.0",
Port: 1884,
}),
)
```
## Topic Structure
MQTTSpec uses a client-based topic hierarchy:
```
spec/{client_id}/request # Client publishes requests
spec/{client_id}/response # Server publishes responses
spec/{client_id}/notify/{sub_id} # Server publishes notifications
```
### Wildcard Subscriptions
- **Server**: `spec/+/request` (receives all client requests)
- **Client**: `spec/{client_id}/response` + `spec/{client_id}/notify/+`
## Message Protocol
MQTTSpec uses the same JSON message structure as WebSocketSpec and ResolveSpec for consistency.
### Request Message
```json
{
"id": "msg-123",
"type": "request",
"operation": "read",
"schema": "public",
"entity": "users",
"options": {
"filters": [
{"column": "status", "operator": "eq", "value": "active"}
],
"sort": [{"column": "created_at", "direction": "desc"}],
"limit": 10
}
}
```
### Response Message
```json
{
"id": "msg-123",
"type": "response",
"success": true,
"data": [
{"id": 1, "name": "John Doe", "email": "john@example.com", "status": "active"},
{"id": 2, "name": "Jane Smith", "email": "jane@example.com", "status": "active"}
],
"metadata": {
"total": 50,
"count": 2
}
}
```
### Notification Message
```json
{
"type": "notification",
"operation": "create",
"subscription_id": "sub-xyz",
"schema": "public",
"entity": "users",
"data": {
"id": 3,
"name": "New User",
"email": "new@example.com",
"status": "active"
}
}
```
## CRUD Operations
### Read (Single Record)
**MQTT Client Publishes to**: `spec/{client_id}/request`
```json
{
"id": "msg-1",
"type": "request",
"operation": "read",
"schema": "public",
"entity": "users",
"data": {"id": 1}
}
```
**Server Publishes Response to**: `spec/{client_id}/response`
```json
{
"id": "msg-1",
"success": true,
"data": {"id": 1, "name": "John Doe", "email": "john@example.com"}
}
```
### Read (Multiple Records with Filtering)
```json
{
"id": "msg-2",
"type": "request",
"operation": "read",
"schema": "public",
"entity": "users",
"options": {
"filters": [
{"column": "status", "operator": "eq", "value": "active"}
],
"sort": [{"column": "name", "direction": "asc"}],
"limit": 20,
"offset": 0
}
}
```
### Create
```json
{
"id": "msg-3",
"type": "request",
"operation": "create",
"schema": "public",
"entity": "users",
"data": {
"name": "Alice Brown",
"email": "alice@example.com",
"status": "active"
}
}
```
### Update
```json
{
"id": "msg-4",
"type": "request",
"operation": "update",
"schema": "public",
"entity": "users",
"data": {
"id": 1,
"status": "inactive"
}
}
```
### Delete
```json
{
"id": "msg-5",
"type": "request",
"operation": "delete",
"schema": "public",
"entity": "users",
"data": {"id": 1}
}
```
## Real-time Subscriptions
### Subscribe to Entity Changes
**Client Publishes to**: `spec/{client_id}/request`
```json
{
"id": "msg-6",
"type": "subscription",
"operation": "subscribe",
"schema": "public",
"entity": "users",
"options": {
"filters": [
{"column": "status", "operator": "eq", "value": "active"}
]
}
}
```
**Server Response** (published to `spec/{client_id}/response`):
```json
{
"id": "msg-6",
"success": true,
"data": {
"subscription_id": "sub-abc123",
"notify_topic": "spec/{client_id}/notify/sub-abc123"
}
}
```
**Client Then Subscribes** to MQTT topic: `spec/{client_id}/notify/sub-abc123`
### Receiving Notifications
When any client creates/updates/deletes a user matching the subscription filters, the subscriber receives:
```json
{
"type": "notification",
"operation": "create",
"subscription_id": "sub-abc123",
"schema": "public",
"entity": "users",
"data": {
"id": 10,
"name": "New User",
"email": "newuser@example.com",
"status": "active"
}
}
```
### Unsubscribe
```json
{
"id": "msg-7",
"type": "subscription",
"operation": "unsubscribe",
"data": {
"subscription_id": "sub-abc123"
}
}
```
## Lifecycle Hooks
MQTTSpec provides 12 lifecycle hooks for implementing cross-cutting concerns:
### Hook Types
- `BeforeConnect` / `AfterConnect` - Connection lifecycle
- `BeforeDisconnect` / `AfterDisconnect` - Disconnection lifecycle
- `BeforeRead` / `AfterRead` - Read operations
- `BeforeCreate` / `AfterCreate` - Create operations
- `BeforeUpdate` / `AfterUpdate` - Update operations
- `BeforeDelete` / `AfterDelete` - Delete operations
- `BeforeSubscribe` / `AfterSubscribe` - Subscription creation
- `BeforeUnsubscribe` / `AfterUnsubscribe` - Subscription removal
### Authentication Example (JWT)
```go
handler.Hooks().Register(mqttspec.BeforeConnect, func(ctx *mqttspec.HookContext) error {
client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
// MQTT username contains JWT token
token := client.Username
claims, err := jwt.Validate(token)
if err != nil {
return fmt.Errorf("invalid token: %w", err)
}
// Store user info in client metadata for later use
client.SetMetadata("user_id", claims.UserID)
client.SetMetadata("tenant_id", claims.TenantID)
client.SetMetadata("roles", claims.Roles)
logger.Info("Client authenticated: user_id=%d, tenant=%s", claims.UserID, claims.TenantID)
return nil
})
```
### Multi-tenancy Example
```go
// Auto-inject tenant filter for all read operations
handler.Hooks().Register(mqttspec.BeforeRead, func(ctx *mqttspec.HookContext) error {
client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
tenantID, _ := client.GetMetadata("tenant_id")
// Add tenant filter to ensure users only see their own data
ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{
Column: "tenant_id",
Operator: "eq",
Value: tenantID,
})
return nil
})
// Auto-set tenant_id for all create operations
handler.Hooks().Register(mqttspec.BeforeCreate, func(ctx *mqttspec.HookContext) error {
client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
tenantID, _ := client.GetMetadata("tenant_id")
// Inject tenant_id into new records
if dataMap, ok := ctx.Data.(map[string]interface{}); ok {
dataMap["tenant_id"] = tenantID
}
return nil
})
```
### Role-based Access Control (RBAC)
```go
handler.Hooks().Register(mqttspec.BeforeDelete, func(ctx *mqttspec.HookContext) error {
client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
roles, _ := client.GetMetadata("roles")
roleList := roles.([]string)
hasAdminRole := false
for _, role := range roleList {
if role == "admin" {
hasAdminRole = true
break
}
}
if !hasAdminRole {
return fmt.Errorf("permission denied: delete requires admin role")
}
return nil
})
```
### Audit Logging Example
```go
handler.Hooks().Register(mqttspec.AfterCreate, func(ctx *mqttspec.HookContext) error {
client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
userID, _ := client.GetMetadata("user_id")
logger.Info("Audit: user %d created %s.%s record: %+v",
userID, ctx.Schema, ctx.Entity, ctx.Result)
// Could also write to audit log table
return nil
})
```
## Client Examples
### JavaScript (MQTT.js)
```javascript
const mqtt = require('mqtt');
// Connect to MQTT broker
const client = mqtt.connect('mqtt://localhost:1883', {
clientId: 'client-abc123',
username: 'your-jwt-token',
password: '', // JWT in username, password can be empty
});
client.on('connect', () => {
console.log('Connected to MQTT broker');
// Subscribe to responses
client.subscribe('spec/client-abc123/response');
// Read users
const readMsg = {
id: 'msg-1',
type: 'request',
operation: 'read',
schema: 'public',
entity: 'users',
options: {
filters: [
{ column: 'status', operator: 'eq', value: 'active' }
]
}
};
client.publish('spec/client-abc123/request', JSON.stringify(readMsg));
});
client.on('message', (topic, payload) => {
const message = JSON.parse(payload.toString());
console.log('Received:', message);
if (message.type === 'response') {
console.log('Response data:', message.data);
} else if (message.type === 'notification') {
console.log('Notification:', message.operation, message.data);
}
});
```
### Python (paho-mqtt)
```python
import paho.mqtt.client as mqtt
import json
client_id = 'client-python-123'
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# Subscribe to responses
client.subscribe(f"spec/{client_id}/response")
# Create a user
create_msg = {
'id': 'msg-create-1',
'type': 'request',
'operation': 'create',
'schema': 'public',
'entity': 'users',
'data': {
'name': 'Python User',
'email': 'python@example.com',
'status': 'active'
}
}
client.publish(f"spec/{client_id}/request", json.dumps(create_msg))
def on_message(client, userdata, msg):
message = json.loads(msg.payload.decode())
print(f"Received on {msg.topic}: {message}")
client = mqtt.Client(client_id=client_id)
client.username_pw_set('your-jwt-token', '')
client.on_connect = on_connect
client.on_message = on_message
client.connect('localhost', 1883, 60)
client.loop_forever()
```
### Go (paho.mqtt.golang)
```go
package main
import (
"encoding/json"
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func main() {
clientID := "client-go-123"
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://localhost:1883")
opts.SetClientID(clientID)
opts.SetUsername("your-jwt-token")
opts.SetPassword("")
opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
var message map[string]interface{}
json.Unmarshal(msg.Payload(), &message)
fmt.Printf("Received on %s: %+v\n", msg.Topic(), message)
})
opts.OnConnect = func(client mqtt.Client) {
fmt.Println("Connected to MQTT broker")
// Subscribe to responses
client.Subscribe(fmt.Sprintf("spec/%s/response", clientID), 1, nil)
// Read users
readMsg := map[string]interface{}{
"id": "msg-1",
"type": "request",
"operation": "read",
"schema": "public",
"entity": "users",
"options": map[string]interface{}{
"filters": []map[string]interface{}{
{"column": "status", "operator": "eq", "value": "active"},
},
},
}
payload, _ := json.Marshal(readMsg)
client.Publish(fmt.Sprintf("spec/%s/request", clientID), 1, false, payload)
}
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
// Keep running
select {}
}
```
## Configuration Options
### BrokerConfig (Embedded Broker)
```go
type BrokerConfig struct {
Host string // Default: "localhost"
Port int // Default: 1883
EnableWebSocket bool // Enable WebSocket listener
WSPort int // WebSocket port (default: 1884)
MaxConnections int // Max concurrent connections
KeepAlive time.Duration // MQTT keep-alive interval
EnableAuth bool // Enable authentication
}
```
### ExternalBrokerConfig
```go
type ExternalBrokerConfig struct {
BrokerURL string // MQTT broker URL (tcp://host:port)
ClientID string // MQTT client ID
Username string // MQTT username
Password string // MQTT password
CleanSession bool // Clean session flag
KeepAlive time.Duration // Keep-alive interval
ConnectTimeout time.Duration // Connection timeout
ReconnectDelay time.Duration // Auto-reconnect delay
MaxReconnect int // Max reconnect attempts
TLSConfig *tls.Config // TLS configuration
}
```
### QoS Configuration
```go
handler, err := mqttspec.NewHandlerWithGORM(db,
mqttspec.WithQoS(1, 1, 1), // Request, Response, Notification
)
```
### Topic Prefix
```go
handler, err := mqttspec.NewHandlerWithGORM(db,
mqttspec.WithTopicPrefix("myapp"), // Changes topics to myapp/{client_id}/...
)
```
## Documentation References
- **ResolveSpec JSON Protocol**: See `/pkg/resolvespec/README.md` for the full message protocol specification
- **WebSocketSpec Documentation**: See `/pkg/websocketspec/README.md` for similar WebSocket-based implementation
- **Common Interfaces**: See `/pkg/common/types.go` for database adapter interfaces and query options
- **Model Registry**: See `/pkg/modelregistry/README.md` for model registration and reflection
- **Hooks Reference**: See `/pkg/websocketspec/hooks.go` for hook types (same as MQTTSpec)
- **Subscription Management**: See `/pkg/websocketspec/subscription.go` for subscription filtering
## Comparison: MQTTSpec vs WebSocketSpec
| Feature | MQTTSpec | WebSocketSpec |
|---------|----------|---------------|
| **Transport** | MQTT (pub/sub broker) | WebSocket (direct connection) |
| **Connection Model** | Broker-mediated | Direct client-server |
| **QoS Levels** | QoS 0, 1, 2 support | No built-in QoS |
| **Offline Messages** | Yes (with QoS 1+) | No |
| **Auto-reconnect** | Yes (built into MQTT) | Manual implementation needed |
| **Network Efficiency** | Better for unreliable networks | Better for low-latency |
| **Best For** | IoT, mobile apps, distributed systems | Web applications, real-time dashboards |
| **Message Protocol** | Same JSON structure | Same JSON structure |
| **Hooks** | Same 12 hooks | Same 12 hooks |
| **CRUD Operations** | Identical | Identical |
| **Subscriptions** | Identical (via MQTT topics) | Identical (via app-level) |
## Use Cases
### IoT Sensor Data
```go
// Sensors publish data, backend stores and notifies subscribers
handler.Registry().RegisterModel("public.sensor_readings", &SensorReading{})
// Auto-set device_id from client metadata
handler.Hooks().Register(mqttspec.BeforeCreate, func(ctx *mqttspec.HookContext) error {
client := ctx.Metadata["mqtt_client"].(*mqttspec.Client)
deviceID, _ := client.GetMetadata("device_id")
if ctx.Entity == "sensor_readings" {
if dataMap, ok := ctx.Data.(map[string]interface{}); ok {
dataMap["device_id"] = deviceID
dataMap["timestamp"] = time.Now()
}
}
return nil
})
```
### Mobile App with Offline Support
MQTTSpec's QoS 1 ensures messages are delivered even if the client temporarily disconnects.
### Distributed Microservices
Multiple services can subscribe to entity changes and react accordingly.
## Testing
Run unit tests:
```bash
go test -v ./pkg/mqttspec
```
Run with race detection:
```bash
go test -race -v ./pkg/mqttspec
```
## License
This package is part of the ResolveSpec project.
## Contributing
Contributions are welcome! Please ensure:
- All tests pass (`go test ./pkg/mqttspec`)
- No race conditions (`go test -race ./pkg/mqttspec`)
- Documentation is updated
- Examples are provided for new features
## Support
For issues, questions, or feature requests, please open an issue in the ResolveSpec repository.