diff --git a/go.mod b/go.mod index e0228ed..546474a 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect diff --git a/go.sum b/go.sum index b9335b2..bac8865 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 h1:8Tjv8EJ+pM1xP8mK6egEbD1OgnVTyacbefKhmbLhIhU= github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2/go.mod h1:pkJQ2tZHJ0aFOVEEot6oZmaVEZcRme73eIFmhiVuRWs= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= diff --git a/pkg/websocketspec/README.md b/pkg/websocketspec/README.md new file mode 100644 index 0000000..9472cb2 --- /dev/null +++ b/pkg/websocketspec/README.md @@ -0,0 +1,726 @@ +# WebSocketSpec - Real-Time WebSocket API Framework + +WebSocketSpec provides a WebSocket-based API specification for real-time, bidirectional communication with full CRUD operations, subscriptions, and lifecycle hooks. + +## Table of Contents + +- [Features](#features) +- [Installation](#installation) +- [Quick Start](#quick-start) +- [Message Protocol](#message-protocol) +- [CRUD Operations](#crud-operations) +- [Subscriptions](#subscriptions) +- [Lifecycle Hooks](#lifecycle-hooks) +- [Client Examples](#client-examples) +- [Authentication](#authentication) +- [Error Handling](#error-handling) +- [Best Practices](#best-practices) + +## Features + +- **Real-Time Bidirectional Communication**: WebSocket-based persistent connections +- **Full CRUD Operations**: Create, Read, Update, Delete with rich query options +- **Real-Time Subscriptions**: Subscribe to entity changes with filter support +- **Automatic Notifications**: Server pushes updates to subscribed clients +- **Lifecycle Hooks**: Before/after hooks for all operations +- **Database Agnostic**: Works with GORM and Bun ORM through adapters +- **Connection Management**: Automatic connection tracking and cleanup +- **Request/Response Correlation**: Message IDs for tracking requests +- **Filter & Sort**: Advanced filtering, sorting, pagination, and preloading + +## Installation + +```bash +go get github.com/bitechdev/ResolveSpec +``` + +## Quick Start + +### Server Setup + +```go +package main + +import ( + "net/http" + "github.com/bitechdev/ResolveSpec/pkg/websocketspec" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +func main() { + // Connect to database + db, _ := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{}) + + // Create WebSocket handler + handler := websocketspec.NewHandlerWithGORM(db) + + // Register models + handler.Registry.RegisterModel("public.users", &User{}) + handler.Registry.RegisterModel("public.posts", &Post{}) + + // Setup WebSocket endpoint + http.HandleFunc("/ws", handler.HandleWebSocket) + + // Start server + http.ListenAndServe(":8080", nil) +} + +type User struct { + ID uint `json:"id" gorm:"primaryKey"` + Name string `json:"name"` + Email string `json:"email"` + Status string `json:"status"` +} + +type Post struct { + ID uint `json:"id" gorm:"primaryKey"` + Title string `json:"title"` + Content string `json:"content"` + UserID uint `json:"user_id"` +} +``` + +### Client Setup (JavaScript) + +```javascript +const ws = new WebSocket("ws://localhost:8080/ws"); + +ws.onopen = () => { + console.log("Connected to WebSocket"); +}; + +ws.onmessage = (event) => { + const message = JSON.parse(event.data); + console.log("Received:", message); +}; + +ws.onerror = (error) => { + console.error("WebSocket error:", error); +}; +``` + +## Message Protocol + +All messages are JSON-encoded with the following structure: + +```typescript +interface Message { + id: string; // Unique message ID for correlation + type: "request" | "response" | "notification" | "subscription"; + operation?: "read" | "create" | "update" | "delete" | "subscribe" | "unsubscribe" | "meta"; + schema?: string; // Database schema + entity: string; // Table/model name + record_id?: string; // For single-record operations + data?: any; // Request/response payload + options?: QueryOptions; // Filters, sorting, pagination + subscription_id?: string; // For subscription messages + success?: boolean; // Response success indicator + error?: ErrorInfo; // Error details + metadata?: Record; // Additional metadata + timestamp?: string; // Message timestamp +} + +interface QueryOptions { + filters?: FilterOption[]; + columns?: string[]; + preload?: PreloadOption[]; + sort?: SortOption[]; + limit?: number; + offset?: number; +} +``` + +## CRUD Operations + +### CREATE - Create New Records + +**Request:** +```json +{ + "id": "msg-1", + "type": "request", + "operation": "create", + "schema": "public", + "entity": "users", + "data": { + "name": "John Doe", + "email": "john@example.com", + "status": "active" + } +} +``` + +**Response:** +```json +{ + "id": "msg-1", + "type": "response", + "success": true, + "data": { + "id": 123, + "name": "John Doe", + "email": "john@example.com", + "status": "active" + }, + "timestamp": "2025-12-12T10:30:00Z" +} +``` + +### READ - Query Records + +**Read Multiple Records:** +```json +{ + "id": "msg-2", + "type": "request", + "operation": "read", + "schema": "public", + "entity": "users", + "options": { + "filters": [ + {"column": "status", "operator": "eq", "value": "active"} + ], + "columns": ["id", "name", "email"], + "sort": [ + {"column": "name", "direction": "asc"} + ], + "limit": 10, + "offset": 0 + } +} +``` + +**Read Single Record:** +```json +{ + "id": "msg-3", + "type": "request", + "operation": "read", + "schema": "public", + "entity": "users", + "record_id": "123" +} +``` + +**Response:** +```json +{ + "id": "msg-2", + "type": "response", + "success": true, + "data": [ + {"id": 1, "name": "Alice", "email": "alice@example.com"}, + {"id": 2, "name": "Bob", "email": "bob@example.com"} + ], + "metadata": { + "total": 50, + "count": 2 + }, + "timestamp": "2025-12-12T10:30:00Z" +} +``` + +### UPDATE - Update Records + +```json +{ + "id": "msg-4", + "type": "request", + "operation": "update", + "schema": "public", + "entity": "users", + "record_id": "123", + "data": { + "name": "John Updated", + "email": "john.updated@example.com" + } +} +``` + +### DELETE - Delete Records + +```json +{ + "id": "msg-5", + "type": "request", + "operation": "delete", + "schema": "public", + "entity": "users", + "record_id": "123" +} +``` + +## Subscriptions + +Subscriptions allow clients to receive real-time notifications when entities change. + +### Subscribe to Changes + +```json +{ + "id": "sub-1", + "type": "subscription", + "operation": "subscribe", + "schema": "public", + "entity": "users", + "options": { + "filters": [ + {"column": "status", "operator": "eq", "value": "active"} + ] + } +} +``` + +**Response:** +```json +{ + "id": "sub-1", + "type": "response", + "success": true, + "data": { + "subscription_id": "sub-abc123", + "schema": "public", + "entity": "users" + }, + "timestamp": "2025-12-12T10:30:00Z" +} +``` + +### Receive Notifications + +When a subscribed entity changes, clients automatically receive notifications: + +```json +{ + "type": "notification", + "operation": "create", + "subscription_id": "sub-abc123", + "schema": "public", + "entity": "users", + "data": { + "id": 124, + "name": "Jane Smith", + "email": "jane@example.com", + "status": "active" + }, + "timestamp": "2025-12-12T10:35:00Z" +} +``` + +**Notification Operations:** +- `create` - New record created +- `update` - Record updated +- `delete` - Record deleted + +### Unsubscribe + +```json +{ + "id": "unsub-1", + "type": "subscription", + "operation": "unsubscribe", + "subscription_id": "sub-abc123" +} +``` + +## Lifecycle Hooks + +Hooks allow you to intercept and modify operations at various points in the lifecycle. + +### Available Hook Types + +- **BeforeRead** / **AfterRead** +- **BeforeCreate** / **AfterCreate** +- **BeforeUpdate** / **AfterUpdate** +- **BeforeDelete** / **AfterDelete** +- **BeforeSubscribe** / **AfterSubscribe** +- **BeforeConnect** / **AfterConnect** + +### Hook Example + +```go +handler := websocketspec.NewHandlerWithGORM(db) + +// Authorization hook +handler.Hooks().RegisterBefore(websocketspec.OperationRead, func(ctx *websocketspec.HookContext) error { + // Check permissions + userID, _ := ctx.Connection.GetMetadata("user_id") + if userID == nil { + return fmt.Errorf("unauthorized: user not authenticated") + } + + // Add filter to only show user's own records + if ctx.Entity == "posts" { + ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{ + Column: "user_id", + Operator: "eq", + Value: userID, + }) + } + + return nil +}) + +// Logging hook +handler.Hooks().RegisterAfter(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error { + log.Printf("Created %s in %s.%s", ctx.Result, ctx.Schema, ctx.Entity) + return nil +}) + +// Validation hook +handler.Hooks().RegisterBefore(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error { + // Validate data before creation + if data, ok := ctx.Data.(map[string]interface{}); ok { + if email, exists := data["email"]; !exists || email == "" { + return fmt.Errorf("email is required") + } + } + return nil +}) +``` + +## Client Examples + +### JavaScript/TypeScript Client + +```typescript +class WebSocketClient { + private ws: WebSocket; + private messageHandlers: Map void> = new Map(); + private subscriptions: Map void> = new Map(); + + constructor(url: string) { + this.ws = new WebSocket(url); + this.ws.onmessage = (event) => this.handleMessage(event); + } + + // Send request and wait for response + async request(operation: string, entity: string, options?: any): Promise { + const id = this.generateId(); + + return new Promise((resolve, reject) => { + this.messageHandlers.set(id, (data) => { + if (data.success) { + resolve(data.data); + } else { + reject(data.error); + } + }); + + this.ws.send(JSON.stringify({ + id, + type: "request", + operation, + entity, + ...options + })); + }); + } + + // Subscribe to entity changes + async subscribe(entity: string, filters?: any[], callback?: (data: any) => void): Promise { + const id = this.generateId(); + + return new Promise((resolve, reject) => { + this.messageHandlers.set(id, (data) => { + if (data.success) { + const subId = data.data.subscription_id; + if (callback) { + this.subscriptions.set(subId, callback); + } + resolve(subId); + } else { + reject(data.error); + } + }); + + this.ws.send(JSON.stringify({ + id, + type: "subscription", + operation: "subscribe", + entity, + options: { filters } + })); + }); + } + + private handleMessage(event: MessageEvent) { + const message = JSON.parse(event.data); + + if (message.type === "response") { + const handler = this.messageHandlers.get(message.id); + if (handler) { + handler(message); + this.messageHandlers.delete(message.id); + } + } else if (message.type === "notification") { + const callback = this.subscriptions.get(message.subscription_id); + if (callback) { + callback(message); + } + } + } + + private generateId(): string { + return `msg-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`; + } +} + +// Usage +const client = new WebSocketClient("ws://localhost:8080/ws"); + +// Read users +const users = await client.request("read", "users", { + options: { + filters: [{ column: "status", operator: "eq", value: "active" }], + limit: 10 + } +}); + +// Subscribe to user changes +await client.subscribe("users", + [{ column: "status", operator: "eq", value: "active" }], + (notification) => { + console.log("User changed:", notification.operation, notification.data); + } +); + +// Create user +const newUser = await client.request("create", "users", { + data: { + name: "Alice", + email: "alice@example.com", + status: "active" + } +}); +``` + +### Python Client Example + +```python +import asyncio +import websockets +import json +import uuid + +class WebSocketClient: + def __init__(self, url): + self.url = url + self.ws = None + self.handlers = {} + self.subscriptions = {} + + async def connect(self): + self.ws = await websockets.connect(self.url) + asyncio.create_task(self.listen()) + + async def listen(self): + async for message in self.ws: + data = json.loads(message) + + if data["type"] == "response": + handler = self.handlers.get(data["id"]) + if handler: + handler(data) + del self.handlers[data["id"]] + + elif data["type"] == "notification": + callback = self.subscriptions.get(data["subscription_id"]) + if callback: + callback(data) + + async def request(self, operation, entity, **kwargs): + msg_id = str(uuid.uuid4()) + future = asyncio.Future() + + self.handlers[msg_id] = lambda data: future.set_result(data) + + await self.ws.send(json.dumps({ + "id": msg_id, + "type": "request", + "operation": operation, + "entity": entity, + **kwargs + })) + + result = await future + if result["success"]: + return result["data"] + else: + raise Exception(result["error"]["message"]) + + async def subscribe(self, entity, callback, filters=None): + msg_id = str(uuid.uuid4()) + future = asyncio.Future() + + self.handlers[msg_id] = lambda data: future.set_result(data) + + await self.ws.send(json.dumps({ + "id": msg_id, + "type": "subscription", + "operation": "subscribe", + "entity": entity, + "options": {"filters": filters} if filters else {} + })) + + result = await future + if result["success"]: + sub_id = result["data"]["subscription_id"] + self.subscriptions[sub_id] = callback + return sub_id + else: + raise Exception(result["error"]["message"]) + +# Usage +async def main(): + client = WebSocketClient("ws://localhost:8080/ws") + await client.connect() + + # Read users + users = await client.request("read", "users", + options={ + "filters": [{"column": "status", "operator": "eq", "value": "active"}], + "limit": 10 + } + ) + print("Users:", users) + + # Subscribe to changes + def on_user_change(notification): + print(f"User {notification['operation']}: {notification['data']}") + + await client.subscribe("users", on_user_change, + filters=[{"column": "status", "operator": "eq", "value": "active"}] + ) + +asyncio.run(main()) +``` + +## Authentication + +Implement authentication using hooks: + +```go +handler := websocketspec.NewHandlerWithGORM(db) + +// Authentication on connection +handler.Hooks().Register(websocketspec.BeforeConnect, func(ctx *websocketspec.HookContext) error { + // Extract token from query params or headers + r := ctx.Connection.ws.UnderlyingConn().RemoteAddr() + + // Validate token (implement your auth logic) + token := extractToken(r) + user, err := validateToken(token) + if err != nil { + return fmt.Errorf("authentication failed: %w", err) + } + + // Store user info in connection metadata + ctx.Connection.SetMetadata("user", user) + ctx.Connection.SetMetadata("user_id", user.ID) + + return nil +}) + +// Check permissions for each operation +handler.Hooks().RegisterBefore(websocketspec.OperationRead, func(ctx *websocketspec.HookContext) error { + userID, ok := ctx.Connection.GetMetadata("user_id") + if !ok { + return fmt.Errorf("unauthorized") + } + + // Add user-specific filters + if ctx.Entity == "orders" { + ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{ + Column: "user_id", + Operator: "eq", + Value: userID, + }) + } + + return nil +}) +``` + +## Error Handling + +Errors are returned in a consistent format: + +```json +{ + "id": "msg-1", + "type": "response", + "success": false, + "error": { + "code": "validation_error", + "message": "Email is required", + "details": { + "field": "email" + } + }, + "timestamp": "2025-12-12T10:30:00Z" +} +``` + +**Common Error Codes:** +- `invalid_message` - Message format is invalid +- `model_not_found` - Entity not registered +- `invalid_model` - Model validation failed +- `read_error` - Read operation failed +- `create_error` - Create operation failed +- `update_error` - Update operation failed +- `delete_error` - Delete operation failed +- `hook_error` - Hook execution failed +- `unauthorized` - Authentication/authorization failed + +## Best Practices + +1. **Always Use Message IDs**: Correlate requests with responses using unique IDs +2. **Handle Reconnections**: Implement automatic reconnection logic on the client +3. **Validate Data**: Use before-hooks to validate data before operations +4. **Limit Subscriptions**: Implement limits on subscriptions per connection +5. **Use Filters**: Apply filters to subscriptions to reduce unnecessary notifications +6. **Implement Authentication**: Always validate users before processing operations +7. **Handle Errors Gracefully**: Display user-friendly error messages +8. **Clean Up**: Unsubscribe when components unmount or disconnect +9. **Rate Limiting**: Implement rate limiting to prevent abuse +10. **Monitor Connections**: Track active connections and subscriptions + +## Filter Operators + +Supported filter operators: + +- `eq` - Equal (=) +- `neq` - Not Equal (!=) +- `gt` - Greater Than (>) +- `gte` - Greater Than or Equal (>=) +- `lt` - Less Than (<) +- `lte` - Less Than or Equal (<=) +- `like` - LIKE (case-sensitive) +- `ilike` - ILIKE (case-insensitive) +- `in` - IN (array of values) + +## Performance Considerations + +- **Connection Pooling**: WebSocket connections are reused, reducing overhead +- **Subscription Filtering**: Only matching updates are sent to clients +- **Efficient Queries**: Uses database adapters for optimized queries +- **Message Batching**: Multiple messages can be sent in one write +- **Keepalive**: Automatic ping/pong for connection health + +## Comparison with Other Specs + +| Feature | WebSocketSpec | RestHeadSpec | ResolveSpec | +|---------|--------------|--------------|-------------| +| Protocol | WebSocket | HTTP/REST | HTTP/REST | +| Real-time | ✅ Yes | ❌ No | ❌ No | +| Subscriptions | ✅ Yes | ❌ No | ❌ No | +| Bidirectional | ✅ Yes | ❌ No | ❌ No | +| Query Options | In Message | In Headers | In Body | +| Overhead | Low | Medium | Medium | +| Use Case | Real-time apps | Traditional APIs | Body-based APIs | + +## License + +MIT License - See LICENSE file for details diff --git a/pkg/websocketspec/connection.go b/pkg/websocketspec/connection.go new file mode 100644 index 0000000..05b5bee --- /dev/null +++ b/pkg/websocketspec/connection.go @@ -0,0 +1,369 @@ +package websocketspec + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "time" + + "github.com/bitechdev/ResolveSpec/pkg/logger" + "github.com/gorilla/websocket" +) + +// Connection rvepresents a WebSocket connection with its state +type Connection struct { + // ID is a unique identifier for this connection + ID string + + // ws is the underlying WebSocket connection + ws *websocket.Conn + + // send is a channel for outbound messages + send chan []byte + + // subscriptions holds active subscriptions for this connection + subscriptions map[string]*Subscription + + // mu protects subscriptions map + mu sync.RWMutex + + // ctx is the connection context + ctx context.Context + + // cancel cancels the connection context + cancel context.CancelFunc + + // handler is the WebSocket handler + handler *Handler + + // metadata stores connection-specific metadata (e.g., user info, auth state) + metadata map[string]interface{} + + // metaMu protects metadata map + metaMu sync.RWMutex + + // closedOnce ensures cleanup happens only once + closedOnce sync.Once +} + +// ConnectionManager manages all active WebSocket connections +type ConnectionManager struct { + // connections holds all active connections + connections map[string]*Connection + + // mu protects the connections map + mu sync.RWMutex + + // register channel for new connections + register chan *Connection + + // unregister channel for closing connections + unregister chan *Connection + + // broadcast channel for broadcasting messages + broadcast chan *BroadcastMessage + + // ctx is the manager context + ctx context.Context + + // cancel cancels the manager context + cancel context.CancelFunc +} + +// BroadcastMessage represents a message to broadcast to multiple connections +type BroadcastMessage struct { + // Message is the message to broadcast + Message []byte + + // Filter is an optional function to filter which connections receive the message + Filter func(*Connection) bool +} + +// NewConnection creates a new WebSocket connection +func NewConnection(id string, ws *websocket.Conn, handler *Handler) *Connection { + ctx, cancel := context.WithCancel(context.Background()) + return &Connection{ + ID: id, + ws: ws, + send: make(chan []byte, 256), + subscriptions: make(map[string]*Subscription), + ctx: ctx, + cancel: cancel, + handler: handler, + metadata: make(map[string]interface{}), + } +} + +// NewConnectionManager creates a new connection manager +func NewConnectionManager(ctx context.Context) *ConnectionManager { + ctx, cancel := context.WithCancel(ctx) + return &ConnectionManager{ + connections: make(map[string]*Connection), + register: make(chan *Connection), + unregister: make(chan *Connection), + broadcast: make(chan *BroadcastMessage), + ctx: ctx, + cancel: cancel, + } +} + +// Run starts the connection manager event loop +func (cm *ConnectionManager) Run() { + for { + select { + case conn := <-cm.register: + cm.mu.Lock() + cm.connections[conn.ID] = conn + cm.mu.Unlock() + logger.Info("[WebSocketSpec] Connection registered: %s (total: %d)", conn.ID, cm.Count()) + + case conn := <-cm.unregister: + cm.mu.Lock() + if _, ok := cm.connections[conn.ID]; ok { + delete(cm.connections, conn.ID) + close(conn.send) + logger.Info("[WebSocketSpec] Connection unregistered: %s (total: %d)", conn.ID, cm.Count()) + } + cm.mu.Unlock() + + case msg := <-cm.broadcast: + cm.mu.RLock() + for _, conn := range cm.connections { + if msg.Filter == nil || msg.Filter(conn) { + select { + case conn.send <- msg.Message: + default: + // Channel full, connection is slow - close it + logger.Warn("[WebSocketSpec] Connection %s send buffer full, closing", conn.ID) + cm.mu.RUnlock() + cm.unregister <- conn + cm.mu.RLock() + } + } + } + cm.mu.RUnlock() + + case <-cm.ctx.Done(): + logger.Info("[WebSocketSpec] Connection manager shutting down") + return + } + } +} + +// Register registers a new connection +func (cm *ConnectionManager) Register(conn *Connection) { + cm.register <- conn +} + +// Unregister removes a connection +func (cm *ConnectionManager) Unregister(conn *Connection) { + cm.unregister <- conn +} + +// Broadcast sends a message to all connections matching the filter +func (cm *ConnectionManager) Broadcast(message []byte, filter func(*Connection) bool) { + cm.broadcast <- &BroadcastMessage{ + Message: message, + Filter: filter, + } +} + +// Count returns the number of active connections +func (cm *ConnectionManager) Count() int { + cm.mu.RLock() + defer cm.mu.RUnlock() + return len(cm.connections) +} + +// GetConnection retrieves a connection by ID +func (cm *ConnectionManager) GetConnection(id string) (*Connection, bool) { + cm.mu.RLock() + defer cm.mu.RUnlock() + conn, ok := cm.connections[id] + return conn, ok +} + +// Shutdown gracefully shuts down the connection manager +func (cm *ConnectionManager) Shutdown() { + cm.cancel() + + // Close all connections + cm.mu.Lock() + for _, conn := range cm.connections { + conn.Close() + } + cm.mu.Unlock() +} + +// ReadPump reads messages from the WebSocket connection +func (c *Connection) ReadPump() { + defer func() { + c.handler.connManager.Unregister(c) + c.Close() + }() + + // Configure read parameters + c.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + c.ws.SetPongHandler(func(string) error { + c.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + + for { + _, message, err := c.ws.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + logger.Error("[WebSocketSpec] Connection %s read error: %v", c.ID, err) + } + break + } + + // Parse and handle the message + c.handleMessage(message) + } +} + +// WritePump writes messages to the WebSocket connection +func (c *Connection) WritePump() { + ticker := time.NewTicker(54 * time.Second) + defer func() { + ticker.Stop() + c.Close() + }() + + for { + select { + case message, ok := <-c.send: + c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if !ok { + // Channel closed + c.ws.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.ws.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + // Write any queued messages + n := len(c.send) + for i := 0; i < n; i++ { + w.Write([]byte{'\n'}) + w.Write(<-c.send) + } + + if err := w.Close(); err != nil { + return + } + + case <-ticker.C: + c.ws.SetWriteDeadline(time.Now().Add(10 * time.Second)) + if err := c.ws.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + + case <-c.ctx.Done(): + return + } + } +} + +// Send sends a message to this connection +func (c *Connection) Send(message []byte) error { + select { + case c.send <- message: + return nil + case <-c.ctx.Done(): + return fmt.Errorf("connection closed") + default: + return fmt.Errorf("send buffer full") + } +} + +// SendJSON sends a JSON-encoded message to this connection +func (c *Connection) SendJSON(v interface{}) error { + data, err := json.Marshal(v) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + return c.Send(data) +} + +// Close closes the connection +func (c *Connection) Close() { + c.closedOnce.Do(func() { + c.cancel() + c.ws.Close() + + // Clean up subscriptions + c.mu.Lock() + for subID := range c.subscriptions { + c.handler.subscriptionManager.Unsubscribe(subID) + } + c.subscriptions = make(map[string]*Subscription) + c.mu.Unlock() + + logger.Info("[WebSocketSpec] Connection %s closed", c.ID) + }) +} + +// AddSubscription adds a subscription to this connection +func (c *Connection) AddSubscription(sub *Subscription) { + c.mu.Lock() + defer c.mu.Unlock() + c.subscriptions[sub.ID] = sub +} + +// RemoveSubscription removes a subscription from this connection +func (c *Connection) RemoveSubscription(subID string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.subscriptions, subID) +} + +// GetSubscription retrieves a subscription by ID +func (c *Connection) GetSubscription(subID string) (*Subscription, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + sub, ok := c.subscriptions[subID] + return sub, ok +} + +// SetMetadata sets metadata for this connection +func (c *Connection) SetMetadata(key string, value interface{}) { + c.metaMu.Lock() + defer c.metaMu.Unlock() + c.metadata[key] = value +} + +// GetMetadata retrieves metadata for this connection +func (c *Connection) GetMetadata(key string) (interface{}, bool) { + c.metaMu.RLock() + defer c.metaMu.RUnlock() + val, ok := c.metadata[key] + return val, ok +} + +// handleMessage processes an incoming message +func (c *Connection) handleMessage(data []byte) { + msg, err := ParseMessage(data) + if err != nil { + logger.Error("[WebSocketSpec] Failed to parse message: %v", err) + errResp := NewErrorResponse("", "invalid_message", "Failed to parse message") + c.SendJSON(errResp) + return + } + + if !msg.IsValid() { + logger.Error("[WebSocketSpec] Invalid message received") + errResp := NewErrorResponse(msg.ID, "invalid_message", "Message validation failed") + c.SendJSON(errResp) + return + } + + // Route message to appropriate handler + c.handler.HandleMessage(c, msg) +} diff --git a/pkg/websocketspec/example_test.go b/pkg/websocketspec/example_test.go new file mode 100644 index 0000000..54f28ec --- /dev/null +++ b/pkg/websocketspec/example_test.go @@ -0,0 +1,239 @@ +package websocketspec_test + +import ( + "fmt" + "log" + "net/http" + + "github.com/bitechdev/ResolveSpec/pkg/websocketspec" + "gorm.io/driver/postgres" + "gorm.io/gorm" +) + +// User model example +type User struct { + ID uint `json:"id" gorm:"primaryKey"` + Name string `json:"name"` + Email string `json:"email"` + Status string `json:"status"` +} + +// Post model example +type Post struct { + ID uint `json:"id" gorm:"primaryKey"` + Title string `json:"title"` + Content string `json:"content"` + UserID uint `json:"user_id"` + User *User `json:"user,omitempty" gorm:"foreignKey:UserID"` +} + +// Example_basicSetup demonstrates basic WebSocketSpec setup +func Example_basicSetup() { + // Connect to database + db, err := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{}) + if err != nil { + log.Fatal(err) + } + + // Create WebSocket handler + handler := websocketspec.NewHandlerWithGORM(db) + + // Register models + handler.Registry().RegisterModel("public.users", &User{}) + handler.Registry().RegisterModel("public.posts", &Post{}) + + // Setup WebSocket endpoint + http.HandleFunc("/ws", handler.HandleWebSocket) + + // Start server + log.Println("WebSocket server starting on :8080") + if err := http.ListenAndServe(":8080", nil); err != nil { + log.Fatal(err) + } +} + +// Example_withHooks demonstrates using lifecycle hooks +func Example_withHooks() { + db, _ := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{}) + handler := websocketspec.NewHandlerWithGORM(db) + + // Register models + handler.Registry().RegisterModel("public.users", &User{}) + + // Add authentication hook + handler.Hooks().Register(websocketspec.BeforeConnect, func(ctx *websocketspec.HookContext) error { + // Validate authentication token + // (In real implementation, extract from query params or headers) + userID := uint(123) // From token + + // Store in connection metadata + ctx.Connection.SetMetadata("user_id", userID) + log.Printf("User %d connected", userID) + + return nil + }) + + // Add authorization hook for read operations + handler.Hooks().RegisterBefore(websocketspec.OperationRead, func(ctx *websocketspec.HookContext) error { + userID, ok := ctx.Connection.GetMetadata("user_id") + if !ok { + return fmt.Errorf("unauthorized: not authenticated") + } + + log.Printf("User %v reading %s.%s", userID, ctx.Schema, ctx.Entity) + + // Add filter to only show user's own records + if ctx.Entity == "posts" { + // ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{ + // Column: "user_id", + // Operator: "eq", + // Value: userID, + // }) + } + + return nil + }) + + // Add logging hook after create + handler.Hooks().RegisterAfter(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error { + userID, _ := ctx.Connection.GetMetadata("user_id") + log.Printf("User %v created record in %s.%s", userID, ctx.Schema, ctx.Entity) + return nil + }) + + // Add validation hook before create + handler.Hooks().RegisterBefore(websocketspec.OperationCreate, func(ctx *websocketspec.HookContext) error { + // Validate required fields + if data, ok := ctx.Data.(map[string]interface{}); ok { + if ctx.Entity == "users" { + if email, exists := data["email"]; !exists || email == "" { + return fmt.Errorf("validation error: email is required") + } + if name, exists := data["name"]; !exists || name == "" { + return fmt.Errorf("validation error: name is required") + } + } + } + return nil + }) + + // Add limit hook for subscriptions + handler.Hooks().Register(websocketspec.BeforeSubscribe, func(ctx *websocketspec.HookContext) error { + // Limit subscriptions per connection + maxSubscriptions := 10 + currentCount := len(ctx.Connection.subscriptions) + + if currentCount >= maxSubscriptions { + return fmt.Errorf("maximum subscriptions reached (%d)", maxSubscriptions) + } + + log.Printf("Creating subscription %d/%d", currentCount+1, maxSubscriptions) + return nil + }) + + http.HandleFunc("/ws", handler.HandleWebSocket) + log.Println("Server with hooks starting on :8080") + http.ListenAndServe(":8080", nil) +} + +// Example_monitoring demonstrates monitoring connections and subscriptions +func Example_monitoring() { + db, _ := gorm.Open(postgres.Open("your-connection-string"), &gorm.Config{}) + handler := websocketspec.NewHandlerWithGORM(db) + + handler.Registry.RegisterModel("public.users", &User{}) + + // Add connection tracking + handler.Hooks().Register(websocketspec.AfterConnect, func(ctx *websocketspec.HookContext) error { + count := handler.GetConnectionCount() + log.Printf("Client connected. Total connections: %d", count) + return nil + }) + + handler.Hooks().Register(websocketspec.AfterDisconnect, func(ctx *websocketspec.HookContext) error { + count := handler.GetConnectionCount() + log.Printf("Client disconnected. Total connections: %d", count) + return nil + }) + + // Add subscription tracking + handler.Hooks().Register(websocketspec.AfterSubscribe, func(ctx *websocketspec.HookContext) error { + count := handler.GetSubscriptionCount() + log.Printf("New subscription. Total subscriptions: %d", count) + return nil + }) + + // Monitoring endpoint + http.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Active Connections: %d\n", handler.GetConnectionCount()) + fmt.Fprintf(w, "Active Subscriptions: %d\n", handler.GetSubscriptionCount()) + }) + + http.HandleFunc("/ws", handler.HandleWebSocket) + log.Println("Server with monitoring starting on :8080") + http.ListenAndServe(":8080", nil) +} + +// Example_clientSide shows client-side usage example +func Example_clientSide() { + // This is JavaScript code for documentation purposes + jsCode := ` +// JavaScript WebSocket Client Example + +const ws = new WebSocket("ws://localhost:8080/ws"); + +ws.onopen = () => { + console.log("Connected to WebSocket"); + + // Read users + ws.send(JSON.stringify({ + id: "msg-1", + type: "request", + operation: "read", + schema: "public", + entity: "users", + options: { + filters: [{column: "status", operator: "eq", value: "active"}], + limit: 10 + } + })); + + // Subscribe to user changes + ws.send(JSON.stringify({ + id: "sub-1", + type: "subscription", + operation: "subscribe", + schema: "public", + entity: "users", + options: { + filters: [{column: "status", operator: "eq", value: "active"}] + } + })); +}; + +ws.onmessage = (event) => { + const message = JSON.parse(event.data); + + if (message.type === "response") { + if (message.success) { + console.log("Response:", message.data); + } else { + console.error("Error:", message.error); + } + } else if (message.type === "notification") { + console.log("Notification:", message.operation, message.data); + } +}; + +ws.onerror = (error) => { + console.error("WebSocket error:", error); +}; + +ws.onclose = () => { + console.log("WebSocket connection closed"); + // Implement reconnection logic here +}; +` + + fmt.Println(jsCode) +} diff --git a/pkg/websocketspec/handler.go b/pkg/websocketspec/handler.go new file mode 100644 index 0000000..b61a5e1 --- /dev/null +++ b/pkg/websocketspec/handler.go @@ -0,0 +1,746 @@ +package websocketspec + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "reflect" + "strconv" + "time" + + "github.com/bitechdev/ResolveSpec/pkg/common" + "github.com/bitechdev/ResolveSpec/pkg/logger" + "github.com/bitechdev/ResolveSpec/pkg/reflection" + "github.com/google/uuid" + "github.com/gorilla/websocket" +) + +// Handler handles WebSocket connections and messages +type Handler struct { + db common.Database + registry common.ModelRegistry + hooks *HookRegistry + nestedProcessor *common.NestedCUDProcessor + connManager *ConnectionManager + subscriptionManager *SubscriptionManager + upgrader websocket.Upgrader + ctx context.Context +} + +// NewHandler creates a new WebSocket handler +func NewHandler(db common.Database, registry common.ModelRegistry) *Handler { + ctx := context.Background() + handler := &Handler{ + db: db, + registry: registry, + hooks: NewHookRegistry(), + connManager: NewConnectionManager(ctx), + subscriptionManager: NewSubscriptionManager(), + upgrader: websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + // TODO: Implement proper origin checking + return true + }, + }, + ctx: ctx, + } + + // Initialize nested processor (nil for now, can be added later if needed) + // handler.nestedProcessor = common.NewNestedCUDProcessor(db, registry, handler) + + // Start connection manager + go handler.connManager.Run() + + return handler +} + +// GetRelationshipInfo implements the RelationshipInfoProvider interface +// This is a placeholder implementation - full relationship support can be added later +func (h *Handler) GetRelationshipInfo(modelType reflect.Type, relationName string) *common.RelationshipInfo { + // TODO: Implement full relationship detection similar to restheadspec + return nil +} + +// GetDatabase returns the underlying database connection +// Implements common.SpecHandler interface +func (h *Handler) GetDatabase() common.Database { + return h.db +} + +// Hooks returns the hook registry for this handler +func (h *Handler) Hooks() *HookRegistry { + return h.hooks +} + +// Registry returns the model registry for this handler +func (h *Handler) Registry() common.ModelRegistry { + return h.registry +} + +// HandleWebSocket upgrades HTTP connection to WebSocket +func (h *Handler) HandleWebSocket(w http.ResponseWriter, r *http.Request) { + // Upgrade connection + ws, err := h.upgrader.Upgrade(w, r, nil) + if err != nil { + logger.Error("[WebSocketSpec] Failed to upgrade connection: %v", err) + return + } + + // Create connection + connID := uuid.New().String() + conn := NewConnection(connID, ws, h) + + // Execute before connect hook + hookCtx := &HookContext{ + Context: r.Context(), + Handler: h, + Connection: conn, + } + if err := h.hooks.Execute(BeforeConnect, hookCtx); err != nil { + logger.Error("[WebSocketSpec] BeforeConnect hook failed: %v", err) + ws.Close() + return + } + + // Register connection + h.connManager.Register(conn) + + // Execute after connect hook + h.hooks.Execute(AfterConnect, hookCtx) + + // Start read/write pumps + go conn.WritePump() + go conn.ReadPump() + + logger.Info("[WebSocketSpec] WebSocket connection established: %s", connID) +} + +// HandleMessage routes incoming messages to appropriate handlers +func (h *Handler) HandleMessage(conn *Connection, msg *Message) { + switch msg.Type { + case MessageTypeRequest: + h.handleRequest(conn, msg) + case MessageTypeSubscription: + h.handleSubscription(conn, msg) + case MessageTypePing: + h.handlePing(conn, msg) + default: + errResp := NewErrorResponse(msg.ID, "invalid_message_type", fmt.Sprintf("Unknown message type: %s", msg.Type)) + conn.SendJSON(errResp) + } +} + +// handleRequest processes a request message +func (h *Handler) handleRequest(conn *Connection, msg *Message) { + ctx := conn.ctx + + schema := msg.Schema + entity := msg.Entity + recordID := msg.RecordID + + // Get model from registry + model, err := h.registry.GetModelByEntity(schema, entity) + if err != nil { + logger.Error("[WebSocketSpec] Model not found for %s.%s: %v", schema, entity, err) + errResp := NewErrorResponse(msg.ID, "model_not_found", fmt.Sprintf("Model not found: %s.%s", schema, entity)) + conn.SendJSON(errResp) + return + } + + // Validate and unwrap model + result, err := common.ValidateAndUnwrapModel(model) + if err != nil { + logger.Error("[WebSocketSpec] Model validation failed for %s.%s: %v", schema, entity, err) + errResp := NewErrorResponse(msg.ID, "invalid_model", err.Error()) + conn.SendJSON(errResp) + return + } + + model = result.Model + modelPtr := result.ModelPtr + tableName := h.getTableName(schema, entity, model) + + // Create hook context + hookCtx := &HookContext{ + Context: ctx, + Handler: h, + Connection: conn, + Message: msg, + Schema: schema, + Entity: entity, + TableName: tableName, + Model: model, + ModelPtr: modelPtr, + Options: msg.Options, + ID: recordID, + Data: msg.Data, + Metadata: make(map[string]interface{}), + } + + // Route to operation handler + switch msg.Operation { + case OperationRead: + h.handleRead(conn, msg, hookCtx) + case OperationCreate: + h.handleCreate(conn, msg, hookCtx) + case OperationUpdate: + h.handleUpdate(conn, msg, hookCtx) + case OperationDelete: + h.handleDelete(conn, msg, hookCtx) + case OperationMeta: + h.handleMeta(conn, msg, hookCtx) + default: + errResp := NewErrorResponse(msg.ID, "invalid_operation", fmt.Sprintf("Unknown operation: %s", msg.Operation)) + conn.SendJSON(errResp) + } +} + +// handleRead processes a read operation +func (h *Handler) handleRead(conn *Connection, msg *Message, hookCtx *HookContext) { + // Execute before hook + if err := h.hooks.Execute(BeforeRead, hookCtx); err != nil { + logger.Error("[WebSocketSpec] BeforeRead hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Perform read operation + var data interface{} + var metadata map[string]interface{} + var err error + + if hookCtx.ID != "" { + // Read single record by ID + data, err = h.readByID(hookCtx) + metadata = map[string]interface{}{"total": 1} + } else { + // Read multiple records + data, metadata, err = h.readMultiple(hookCtx) + } + + if err != nil { + logger.Error("[WebSocketSpec] Read operation failed: %v", err) + errResp := NewErrorResponse(msg.ID, "read_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Update hook context with result + hookCtx.Result = data + + // Execute after hook + if err := h.hooks.Execute(AfterRead, hookCtx); err != nil { + logger.Error("[WebSocketSpec] AfterRead hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Send response + resp := NewResponseMessage(msg.ID, true, hookCtx.Result) + resp.Metadata = metadata + conn.SendJSON(resp) +} + +// handleCreate processes a create operation +func (h *Handler) handleCreate(conn *Connection, msg *Message, hookCtx *HookContext) { + // Execute before hook + if err := h.hooks.Execute(BeforeCreate, hookCtx); err != nil { + logger.Error("[WebSocketSpec] BeforeCreate hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Perform create operation + data, err := h.create(hookCtx) + if err != nil { + logger.Error("[WebSocketSpec] Create operation failed: %v", err) + errResp := NewErrorResponse(msg.ID, "create_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Update hook context + hookCtx.Result = data + + // Execute after hook + if err := h.hooks.Execute(AfterCreate, hookCtx); err != nil { + logger.Error("[WebSocketSpec] AfterCreate hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Send response + resp := NewResponseMessage(msg.ID, true, hookCtx.Result) + conn.SendJSON(resp) + + // Notify subscribers + h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationCreate, data) +} + +// handleUpdate processes an update operation +func (h *Handler) handleUpdate(conn *Connection, msg *Message, hookCtx *HookContext) { + // Execute before hook + if err := h.hooks.Execute(BeforeUpdate, hookCtx); err != nil { + logger.Error("[WebSocketSpec] BeforeUpdate hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Perform update operation + data, err := h.update(hookCtx) + if err != nil { + logger.Error("[WebSocketSpec] Update operation failed: %v", err) + errResp := NewErrorResponse(msg.ID, "update_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Update hook context + hookCtx.Result = data + + // Execute after hook + if err := h.hooks.Execute(AfterUpdate, hookCtx); err != nil { + logger.Error("[WebSocketSpec] AfterUpdate hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Send response + resp := NewResponseMessage(msg.ID, true, hookCtx.Result) + conn.SendJSON(resp) + + // Notify subscribers + h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationUpdate, data) +} + +// handleDelete processes a delete operation +func (h *Handler) handleDelete(conn *Connection, msg *Message, hookCtx *HookContext) { + // Execute before hook + if err := h.hooks.Execute(BeforeDelete, hookCtx); err != nil { + logger.Error("[WebSocketSpec] BeforeDelete hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Perform delete operation + err := h.delete(hookCtx) + if err != nil { + logger.Error("[WebSocketSpec] Delete operation failed: %v", err) + errResp := NewErrorResponse(msg.ID, "delete_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Execute after hook + if err := h.hooks.Execute(AfterDelete, hookCtx); err != nil { + logger.Error("[WebSocketSpec] AfterDelete hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Send response + resp := NewResponseMessage(msg.ID, true, map[string]interface{}{"deleted": true}) + conn.SendJSON(resp) + + // Notify subscribers + h.notifySubscribers(hookCtx.Schema, hookCtx.Entity, OperationDelete, map[string]interface{}{"id": hookCtx.ID}) +} + +// handleMeta processes a metadata request +func (h *Handler) handleMeta(conn *Connection, msg *Message, hookCtx *HookContext) { + metadata := h.getMetadata(hookCtx.Schema, hookCtx.Entity, hookCtx.Model) + resp := NewResponseMessage(msg.ID, true, metadata) + conn.SendJSON(resp) +} + +// handleSubscription processes subscription messages +func (h *Handler) handleSubscription(conn *Connection, msg *Message) { + switch msg.Operation { + case OperationSubscribe: + h.handleSubscribe(conn, msg) + case OperationUnsubscribe: + h.handleUnsubscribe(conn, msg) + default: + errResp := NewErrorResponse(msg.ID, "invalid_subscription_operation", fmt.Sprintf("Unknown subscription operation: %s", msg.Operation)) + conn.SendJSON(errResp) + } +} + +// handleSubscribe creates a new subscription +func (h *Handler) handleSubscribe(conn *Connection, msg *Message) { + // Generate subscription ID + subID := uuid.New().String() + + // Create hook context + hookCtx := &HookContext{ + Context: conn.ctx, + Handler: h, + Connection: conn, + Message: msg, + Schema: msg.Schema, + Entity: msg.Entity, + Options: msg.Options, + Metadata: make(map[string]interface{}), + } + + // Execute before hook + if err := h.hooks.Execute(BeforeSubscribe, hookCtx); err != nil { + logger.Error("[WebSocketSpec] BeforeSubscribe hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Create subscription + sub := h.subscriptionManager.Subscribe(subID, conn.ID, msg.Schema, msg.Entity, msg.Options) + conn.AddSubscription(sub) + + // Update hook context + hookCtx.Subscription = sub + + // Execute after hook + h.hooks.Execute(AfterSubscribe, hookCtx) + + // Send response + resp := NewResponseMessage(msg.ID, true, map[string]interface{}{ + "subscription_id": subID, + "schema": msg.Schema, + "entity": msg.Entity, + }) + conn.SendJSON(resp) + + logger.Info("[WebSocketSpec] Subscription created: %s for %s.%s (conn: %s)", subID, msg.Schema, msg.Entity, conn.ID) +} + +// handleUnsubscribe removes a subscription +func (h *Handler) handleUnsubscribe(conn *Connection, msg *Message) { + subID := msg.SubscriptionID + if subID == "" { + errResp := NewErrorResponse(msg.ID, "missing_subscription_id", "Subscription ID is required for unsubscribe") + conn.SendJSON(errResp) + return + } + + // Get subscription + sub, exists := conn.GetSubscription(subID) + if !exists { + errResp := NewErrorResponse(msg.ID, "subscription_not_found", fmt.Sprintf("Subscription not found: %s", subID)) + conn.SendJSON(errResp) + return + } + + // Create hook context + hookCtx := &HookContext{ + Context: conn.ctx, + Handler: h, + Connection: conn, + Message: msg, + Subscription: sub, + Metadata: make(map[string]interface{}), + } + + // Execute before hook + if err := h.hooks.Execute(BeforeUnsubscribe, hookCtx); err != nil { + logger.Error("[WebSocketSpec] BeforeUnsubscribe hook failed: %v", err) + errResp := NewErrorResponse(msg.ID, "hook_error", err.Error()) + conn.SendJSON(errResp) + return + } + + // Remove subscription + h.subscriptionManager.Unsubscribe(subID) + conn.RemoveSubscription(subID) + + // Execute after hook + h.hooks.Execute(AfterUnsubscribe, hookCtx) + + // Send response + resp := NewResponseMessage(msg.ID, true, map[string]interface{}{ + "unsubscribed": true, + "subscription_id": subID, + }) + conn.SendJSON(resp) +} + +// handlePing responds to ping messages +func (h *Handler) handlePing(conn *Connection, msg *Message) { + pong := &Message{ + ID: msg.ID, + Type: MessageTypePong, + Timestamp: time.Now(), + } + conn.SendJSON(pong) +} + +// notifySubscribers sends notifications to all subscribers of an entity +func (h *Handler) notifySubscribers(schema, entity string, operation OperationType, data interface{}) { + subscriptions := h.subscriptionManager.GetSubscriptionsByEntity(schema, entity) + if len(subscriptions) == 0 { + return + } + + for _, sub := range subscriptions { + // Check if data matches subscription filters + if !sub.MatchesFilters(data) { + continue + } + + // Get connection + conn, exists := h.connManager.GetConnection(sub.ConnectionID) + if !exists { + continue + } + + // Send notification + notification := NewNotificationMessage(sub.ID, operation, schema, entity, data) + if err := conn.SendJSON(notification); err != nil { + logger.Error("[WebSocketSpec] Failed to send notification to connection %s: %v", conn.ID, err) + } + } +} + +// CRUD operation implementations + +func (h *Handler) readByID(hookCtx *HookContext) (interface{}, error) { + query := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) + + // Add ID filter + pkName := reflection.GetPrimaryKeyName(hookCtx.Model) + query = query.Where(fmt.Sprintf("%s = ?", pkName), hookCtx.ID) + + // Apply columns + if hookCtx.Options != nil && len(hookCtx.Options.Columns) > 0 { + query = query.Column(hookCtx.Options.Columns...) + } + + // Apply preloads (simplified for now) + if hookCtx.Options != nil { + for _, preload := range hookCtx.Options.Preload { + query = query.PreloadRelation(preload.Relation) + } + } + + // Execute query + if err := query.ScanModel(hookCtx.Context); err != nil { + return nil, fmt.Errorf("failed to read record: %w", err) + } + + return hookCtx.ModelPtr, nil +} + +func (h *Handler) readMultiple(hookCtx *HookContext) (interface{}, map[string]interface{}, error) { + query := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) + + // Apply options (simplified implementation) + if hookCtx.Options != nil { + // Apply filters + for _, filter := range hookCtx.Options.Filters { + query = query.Where(fmt.Sprintf("%s %s ?", filter.Column, h.getOperatorSQL(filter.Operator)), filter.Value) + } + + // Apply sorting + for _, sort := range hookCtx.Options.Sort { + direction := "ASC" + if sort.Direction == "desc" { + direction = "DESC" + } + query = query.Order(fmt.Sprintf("%s %s", sort.Column, direction)) + } + + // Apply limit and offset + if hookCtx.Options.Limit != nil { + query = query.Limit(*hookCtx.Options.Limit) + } + if hookCtx.Options.Offset != nil { + query = query.Offset(*hookCtx.Options.Offset) + } + + // Apply preloads + for _, preload := range hookCtx.Options.Preload { + query = query.PreloadRelation(preload.Relation) + } + + // Apply columns + if len(hookCtx.Options.Columns) > 0 { + query = query.Column(hookCtx.Options.Columns...) + } + } + + // Execute query + if err := query.ScanModel(hookCtx.Context); err != nil { + return nil, nil, fmt.Errorf("failed to read records: %w", err) + } + + // Get count + metadata := make(map[string]interface{}) + countQuery := h.db.NewSelect().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) + if hookCtx.Options != nil { + for _, filter := range hookCtx.Options.Filters { + countQuery = countQuery.Where(fmt.Sprintf("%s %s ?", filter.Column, h.getOperatorSQL(filter.Operator)), filter.Value) + } + } + count, _ := countQuery.Count(hookCtx.Context) + metadata["total"] = count + metadata["count"] = reflection.Len(hookCtx.ModelPtr) + + return hookCtx.ModelPtr, metadata, nil +} + +func (h *Handler) create(hookCtx *HookContext) (interface{}, error) { + // Marshal and unmarshal data into model + dataBytes, err := json.Marshal(hookCtx.Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal data: %w", err) + } + + if err := json.Unmarshal(dataBytes, hookCtx.ModelPtr); err != nil { + return nil, fmt.Errorf("failed to unmarshal data into model: %w", err) + } + + // Insert record + query := h.db.NewInsert().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) + if _, err := query.Exec(hookCtx.Context); err != nil { + return nil, fmt.Errorf("failed to create record: %w", err) + } + + return hookCtx.ModelPtr, nil +} + +func (h *Handler) update(hookCtx *HookContext) (interface{}, error) { + // Marshal and unmarshal data into model + dataBytes, err := json.Marshal(hookCtx.Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal data: %w", err) + } + + if err := json.Unmarshal(dataBytes, hookCtx.ModelPtr); err != nil { + return nil, fmt.Errorf("failed to unmarshal data into model: %w", err) + } + + // Update record + query := h.db.NewUpdate().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) + + // Add ID filter + pkName := reflection.GetPrimaryKeyName(hookCtx.Model) + query = query.Where(fmt.Sprintf("%s = ?", pkName), hookCtx.ID) + + if _, err := query.Exec(hookCtx.Context); err != nil { + return nil, fmt.Errorf("failed to update record: %w", err) + } + + // Fetch updated record + return h.readByID(hookCtx) +} + +func (h *Handler) delete(hookCtx *HookContext) error { + query := h.db.NewDelete().Model(hookCtx.ModelPtr).Table(hookCtx.TableName) + + // Add ID filter + pkName := reflection.GetPrimaryKeyName(hookCtx.Model) + query = query.Where(fmt.Sprintf("%s = ?", pkName), hookCtx.ID) + + if _, err := query.Exec(hookCtx.Context); err != nil { + return fmt.Errorf("failed to delete record: %w", err) + } + + return nil +} + +// Helper methods + +func (h *Handler) getTableName(schema, entity string, model interface{}) string { + // Use entity as table name + tableName := entity + + if schema != "" { + tableName = schema + "." + tableName + } + return tableName +} + +func (h *Handler) getMetadata(schema, entity string, model interface{}) map[string]interface{} { + metadata := make(map[string]interface{}) + metadata["schema"] = schema + metadata["entity"] = entity + metadata["table_name"] = h.getTableName(schema, entity, model) + + // Get fields from model using reflection + columns := reflection.GetModelColumns(model) + metadata["columns"] = columns + metadata["primary_key"] = reflection.GetPrimaryKeyName(model) + + return metadata +} + +// getOperatorSQL converts filter operator to SQL operator +func (h *Handler) getOperatorSQL(operator string) string { + switch operator { + case "eq": + return "=" + case "neq": + return "!=" + case "gt": + return ">" + case "gte": + return ">=" + case "lt": + return "<" + case "lte": + return "<=" + case "like": + return "LIKE" + case "ilike": + return "ILIKE" + case "in": + return "IN" + default: + return "=" + } +} + +// Shutdown gracefully shuts down the handler +func (h *Handler) Shutdown() { + h.connManager.Shutdown() +} + +// GetConnectionCount returns the number of active connections +func (h *Handler) GetConnectionCount() int { + return h.connManager.Count() +} + +// GetSubscriptionCount returns the number of active subscriptions +func (h *Handler) GetSubscriptionCount() int { + return h.subscriptionManager.Count() +} + +// BroadcastMessage sends a message to all connections matching the filter +func (h *Handler) BroadcastMessage(message interface{}, filter func(*Connection) bool) error { + data, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + h.connManager.Broadcast(data, filter) + return nil +} + +// GetConnection retrieves a connection by ID +func (h *Handler) GetConnection(id string) (*Connection, bool) { + return h.connManager.GetConnection(id) +} + +// Helper to convert string ID to int64 +func parseID(id string) (int64, error) { + return strconv.ParseInt(id, 10, 64) +} diff --git a/pkg/websocketspec/hooks.go b/pkg/websocketspec/hooks.go new file mode 100644 index 0000000..fc5af17 --- /dev/null +++ b/pkg/websocketspec/hooks.go @@ -0,0 +1,193 @@ +package websocketspec + +import ( + "context" + + "github.com/bitechdev/ResolveSpec/pkg/common" +) + +// HookType represents the type of lifecycle hook +type HookType string + +const ( + // BeforeRead is called before a read operation + BeforeRead HookType = "before_read" + // AfterRead is called after a read operation + AfterRead HookType = "after_read" + + // BeforeCreate is called before a create operation + BeforeCreate HookType = "before_create" + // AfterCreate is called after a create operation + AfterCreate HookType = "after_create" + + // BeforeUpdate is called before an update operation + BeforeUpdate HookType = "before_update" + // AfterUpdate is called after an update operation + AfterUpdate HookType = "after_update" + + // BeforeDelete is called before a delete operation + BeforeDelete HookType = "before_delete" + // AfterDelete is called after a delete operation + AfterDelete HookType = "after_delete" + + // BeforeSubscribe is called before creating a subscription + BeforeSubscribe HookType = "before_subscribe" + // AfterSubscribe is called after creating a subscription + AfterSubscribe HookType = "after_subscribe" + + // BeforeUnsubscribe is called before removing a subscription + BeforeUnsubscribe HookType = "before_unsubscribe" + // AfterUnsubscribe is called after removing a subscription + AfterUnsubscribe HookType = "after_unsubscribe" + + // BeforeConnect is called when a new connection is established + BeforeConnect HookType = "before_connect" + // AfterConnect is called after a connection is established + AfterConnect HookType = "after_connect" + + // BeforeDisconnect is called before a connection is closed + BeforeDisconnect HookType = "before_disconnect" + // AfterDisconnect is called after a connection is closed + AfterDisconnect HookType = "after_disconnect" +) + +// HookContext contains context information for hook execution +type HookContext struct { + // Context is the request context + Context context.Context + + // Handler provides access to the handler, database, and registry + Handler *Handler + + // Connection is the WebSocket connection + Connection *Connection + + // Message is the original message + Message *Message + + // Schema is the database schema + Schema string + + // Entity is the table/model name + Entity string + + // TableName is the actual database table name + TableName string + + // Model is the registered model instance + Model interface{} + + // ModelPtr is a pointer to the model for queries + ModelPtr interface{} + + // Options contains the parsed request options + Options *common.RequestOptions + + // ID is the record ID for single-record operations + ID string + + // Data is the request data (for create/update operations) + Data interface{} + + // Result is the operation result (for after hooks) + Result interface{} + + // Subscription is the subscription being created/removed + Subscription *Subscription + + // Error is any error that occurred (for after hooks) + Error error + + // Metadata is additional context data + Metadata map[string]interface{} +} + +// HookFunc is a function that processes a hook +type HookFunc func(*HookContext) error + +// HookRegistry manages lifecycle hooks +type HookRegistry struct { + hooks map[HookType][]HookFunc +} + +// NewHookRegistry creates a new hook registry +func NewHookRegistry() *HookRegistry { + return &HookRegistry{ + hooks: make(map[HookType][]HookFunc), + } +} + +// Register registers a hook function for a specific hook type +func (hr *HookRegistry) Register(hookType HookType, fn HookFunc) { + hr.hooks[hookType] = append(hr.hooks[hookType], fn) +} + +// RegisterBefore registers a hook that runs before an operation +// Convenience method for BeforeRead, BeforeCreate, BeforeUpdate, BeforeDelete +func (hr *HookRegistry) RegisterBefore(operation OperationType, fn HookFunc) { + switch operation { + case OperationRead: + hr.Register(BeforeRead, fn) + case OperationCreate: + hr.Register(BeforeCreate, fn) + case OperationUpdate: + hr.Register(BeforeUpdate, fn) + case OperationDelete: + hr.Register(BeforeDelete, fn) + case OperationSubscribe: + hr.Register(BeforeSubscribe, fn) + case OperationUnsubscribe: + hr.Register(BeforeUnsubscribe, fn) + } +} + +// RegisterAfter registers a hook that runs after an operation +// Convenience method for AfterRead, AfterCreate, AfterUpdate, AfterDelete +func (hr *HookRegistry) RegisterAfter(operation OperationType, fn HookFunc) { + switch operation { + case OperationRead: + hr.Register(AfterRead, fn) + case OperationCreate: + hr.Register(AfterCreate, fn) + case OperationUpdate: + hr.Register(AfterUpdate, fn) + case OperationDelete: + hr.Register(AfterDelete, fn) + case OperationSubscribe: + hr.Register(AfterSubscribe, fn) + case OperationUnsubscribe: + hr.Register(AfterUnsubscribe, fn) + } +} + +// Execute runs all hooks for a specific type +func (hr *HookRegistry) Execute(hookType HookType, ctx *HookContext) error { + hooks, exists := hr.hooks[hookType] + if !exists { + return nil + } + + for _, hook := range hooks { + if err := hook(ctx); err != nil { + return err + } + } + + return nil +} + +// HasHooks checks if any hooks are registered for a hook type +func (hr *HookRegistry) HasHooks(hookType HookType) bool { + hooks, exists := hr.hooks[hookType] + return exists && len(hooks) > 0 +} + +// Clear removes all hooks of a specific type +func (hr *HookRegistry) Clear(hookType HookType) { + delete(hr.hooks, hookType) +} + +// ClearAll removes all registered hooks +func (hr *HookRegistry) ClearAll() { + hr.hooks = make(map[HookType][]HookFunc) +} diff --git a/pkg/websocketspec/message.go b/pkg/websocketspec/message.go new file mode 100644 index 0000000..6e009d9 --- /dev/null +++ b/pkg/websocketspec/message.go @@ -0,0 +1,240 @@ +package websocketspec + +import ( + "encoding/json" + "time" + + "github.com/bitechdev/ResolveSpec/pkg/common" +) + +// MessageType represents the type of WebSocket message +type MessageType string + +const ( + // MessageTypeRequest is a client request message + MessageTypeRequest MessageType = "request" + // MessageTypeResponse is a server response message + MessageTypeResponse MessageType = "response" + // MessageTypeNotification is a server-initiated notification + MessageTypeNotification MessageType = "notification" + // MessageTypeSubscription is a subscription control message + MessageTypeSubscription MessageType = "subscription" + // MessageTypeError is an error message + MessageTypeError MessageType = "error" + // MessageTypePing is a keepalive ping message + MessageTypePing MessageType = "ping" + // MessageTypePong is a keepalive pong response + MessageTypePong MessageType = "pong" +) + +// OperationType represents the operation to perform +type OperationType string + +const ( + // OperationRead retrieves records + OperationRead OperationType = "read" + // OperationCreate creates a new record + OperationCreate OperationType = "create" + // OperationUpdate updates an existing record + OperationUpdate OperationType = "update" + // OperationDelete deletes a record + OperationDelete OperationType = "delete" + // OperationSubscribe subscribes to entity changes + OperationSubscribe OperationType = "subscribe" + // OperationUnsubscribe unsubscribes from entity changes + OperationUnsubscribe OperationType = "unsubscribe" + // OperationMeta retrieves metadata about an entity + OperationMeta OperationType = "meta" +) + +// Message represents a WebSocket message +type Message struct { + // ID is a unique identifier for request/response correlation + ID string `json:"id,omitempty"` + + // Type is the message type + Type MessageType `json:"type"` + + // Operation is the operation to perform + Operation OperationType `json:"operation,omitempty"` + + // Schema is the database schema name + Schema string `json:"schema,omitempty"` + + // Entity is the table/model name + Entity string `json:"entity,omitempty"` + + // RecordID is the ID for single-record operations (update, delete, read by ID) + RecordID string `json:"record_id,omitempty"` + + // Data contains the request/response payload + Data interface{} `json:"data,omitempty"` + + // Options contains query options (filters, sorting, pagination, etc.) + Options *common.RequestOptions `json:"options,omitempty"` + + // SubscriptionID is the subscription identifier + SubscriptionID string `json:"subscription_id,omitempty"` + + // Success indicates if the operation was successful + Success bool `json:"success,omitempty"` + + // Error contains error information + Error *ErrorInfo `json:"error,omitempty"` + + // Metadata contains additional response metadata + Metadata map[string]interface{} `json:"metadata,omitempty"` + + // Timestamp is when the message was created + Timestamp time.Time `json:"timestamp,omitempty"` +} + +// ErrorInfo contains error details +type ErrorInfo struct { + // Code is the error code + Code string `json:"code"` + + // Message is a human-readable error message + Message string `json:"message"` + + // Details contains additional error context + Details map[string]interface{} `json:"details,omitempty"` +} + +// RequestMessage represents a client request +type RequestMessage struct { + ID string `json:"id"` + Type MessageType `json:"type"` + Operation OperationType `json:"operation"` + Schema string `json:"schema,omitempty"` + Entity string `json:"entity"` + RecordID string `json:"record_id,omitempty"` + Data interface{} `json:"data,omitempty"` + Options *common.RequestOptions `json:"options,omitempty"` +} + +// ResponseMessage represents a server response +type ResponseMessage struct { + ID string `json:"id"` + Type MessageType `json:"type"` + Success bool `json:"success"` + Data interface{} `json:"data,omitempty"` + Error *ErrorInfo `json:"error,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` + Timestamp time.Time `json:"timestamp"` +} + +// NotificationMessage represents a server-initiated notification +type NotificationMessage struct { + Type MessageType `json:"type"` + Operation OperationType `json:"operation"` + SubscriptionID string `json:"subscription_id"` + Schema string `json:"schema"` + Entity string `json:"entity"` + Data interface{} `json:"data"` + Timestamp time.Time `json:"timestamp"` +} + +// SubscriptionMessage represents a subscription control message +type SubscriptionMessage struct { + ID string `json:"id"` + Type MessageType `json:"type"` + Operation OperationType `json:"operation"` // subscribe or unsubscribe + Schema string `json:"schema,omitempty"` + Entity string `json:"entity"` + Options *common.RequestOptions `json:"options,omitempty"` // Filters for subscription + SubscriptionID string `json:"subscription_id,omitempty"` // For unsubscribe +} + +// NewRequestMessage creates a new request message +func NewRequestMessage(id string, operation OperationType, schema, entity string) *RequestMessage { + return &RequestMessage{ + ID: id, + Type: MessageTypeRequest, + Operation: operation, + Schema: schema, + Entity: entity, + } +} + +// NewResponseMessage creates a new response message +func NewResponseMessage(id string, success bool, data interface{}) *ResponseMessage { + return &ResponseMessage{ + ID: id, + Type: MessageTypeResponse, + Success: success, + Data: data, + Timestamp: time.Now(), + } +} + +// NewErrorResponse creates an error response message +func NewErrorResponse(id string, code, message string) *ResponseMessage { + return &ResponseMessage{ + ID: id, + Type: MessageTypeResponse, + Success: false, + Error: &ErrorInfo{ + Code: code, + Message: message, + }, + Timestamp: time.Now(), + } +} + +// NewNotificationMessage creates a new notification message +func NewNotificationMessage(subscriptionID string, operation OperationType, schema, entity string, data interface{}) *NotificationMessage { + return &NotificationMessage{ + Type: MessageTypeNotification, + Operation: operation, + SubscriptionID: subscriptionID, + Schema: schema, + Entity: entity, + Data: data, + Timestamp: time.Now(), + } +} + +// ParseMessage parses a JSON message into a Message struct +func ParseMessage(data []byte) (*Message, error) { + var msg Message + if err := json.Unmarshal(data, &msg); err != nil { + return nil, err + } + return &msg, nil +} + +// ToJSON converts a message to JSON bytes +func (m *Message) ToJSON() ([]byte, error) { + return json.Marshal(m) +} + +// ToJSON converts a response message to JSON bytes +func (r *ResponseMessage) ToJSON() ([]byte, error) { + return json.Marshal(r) +} + +// ToJSON converts a notification message to JSON bytes +func (n *NotificationMessage) ToJSON() ([]byte, error) { + return json.Marshal(n) +} + +// IsValid checks if a message is valid +func (m *Message) IsValid() bool { + // Type must be set + if m.Type == "" { + return false + } + + // Request messages must have an ID, operation, and entity + if m.Type == MessageTypeRequest { + return m.ID != "" && m.Operation != "" && m.Entity != "" + } + + // Subscription messages must have an ID and operation + if m.Type == MessageTypeSubscription { + return m.ID != "" && m.Operation != "" + } + + return true +} diff --git a/pkg/websocketspec/subscription.go b/pkg/websocketspec/subscription.go new file mode 100644 index 0000000..a6b552c --- /dev/null +++ b/pkg/websocketspec/subscription.go @@ -0,0 +1,192 @@ +package websocketspec + +import ( + "sync" + + "github.com/bitechdev/ResolveSpec/pkg/common" + "github.com/bitechdev/ResolveSpec/pkg/logger" +) + +// Subscription represents a subscription to entity changes +type Subscription struct { + // ID is the unique subscription identifier + ID string + + // ConnectionID is the ID of the connection that owns this subscription + ConnectionID string + + // Schema is the database schema + Schema string + + // Entity is the table/model name + Entity string + + // Options contains filters and other query options + Options *common.RequestOptions + + // Active indicates if the subscription is active + Active bool +} + +// SubscriptionManager manages all subscriptions +type SubscriptionManager struct { + // subscriptions maps subscription ID to subscription + subscriptions map[string]*Subscription + + // entitySubscriptions maps "schema.entity" to list of subscription IDs + entitySubscriptions map[string][]string + + // mu protects the maps + mu sync.RWMutex +} + +// NewSubscriptionManager creates a new subscription manager +func NewSubscriptionManager() *SubscriptionManager { + return &SubscriptionManager{ + subscriptions: make(map[string]*Subscription), + entitySubscriptions: make(map[string][]string), + } +} + +// Subscribe creates a new subscription +func (sm *SubscriptionManager) Subscribe(id, connID, schema, entity string, options *common.RequestOptions) *Subscription { + sm.mu.Lock() + defer sm.mu.Unlock() + + sub := &Subscription{ + ID: id, + ConnectionID: connID, + Schema: schema, + Entity: entity, + Options: options, + Active: true, + } + + // Store subscription + sm.subscriptions[id] = sub + + // Index by entity + key := makeEntityKey(schema, entity) + sm.entitySubscriptions[key] = append(sm.entitySubscriptions[key], id) + + logger.Info("[WebSocketSpec] Subscription created: %s for %s.%s (conn: %s)", id, schema, entity, connID) + return sub +} + +// Unsubscribe removes a subscription +func (sm *SubscriptionManager) Unsubscribe(subID string) bool { + sm.mu.Lock() + defer sm.mu.Unlock() + + sub, exists := sm.subscriptions[subID] + if !exists { + return false + } + + // Remove from entity index + key := makeEntityKey(sub.Schema, sub.Entity) + if subs, ok := sm.entitySubscriptions[key]; ok { + newSubs := make([]string, 0, len(subs)-1) + for _, id := range subs { + if id != subID { + newSubs = append(newSubs, id) + } + } + if len(newSubs) > 0 { + sm.entitySubscriptions[key] = newSubs + } else { + delete(sm.entitySubscriptions, key) + } + } + + // Remove subscription + delete(sm.subscriptions, subID) + + logger.Info("[WebSocketSpec] Subscription removed: %s", subID) + return true +} + +// GetSubscription retrieves a subscription by ID +func (sm *SubscriptionManager) GetSubscription(subID string) (*Subscription, bool) { + sm.mu.RLock() + defer sm.mu.RUnlock() + sub, ok := sm.subscriptions[subID] + return sub, ok +} + +// GetSubscriptionsByEntity retrieves all subscriptions for an entity +func (sm *SubscriptionManager) GetSubscriptionsByEntity(schema, entity string) []*Subscription { + sm.mu.RLock() + defer sm.mu.RUnlock() + + key := makeEntityKey(schema, entity) + subIDs, ok := sm.entitySubscriptions[key] + if !ok { + return nil + } + + result := make([]*Subscription, 0, len(subIDs)) + for _, subID := range subIDs { + if sub, ok := sm.subscriptions[subID]; ok && sub.Active { + result = append(result, sub) + } + } + + return result +} + +// GetSubscriptionsByConnection retrieves all subscriptions for a connection +func (sm *SubscriptionManager) GetSubscriptionsByConnection(connID string) []*Subscription { + sm.mu.RLock() + defer sm.mu.RUnlock() + + result := make([]*Subscription, 0) + for _, sub := range sm.subscriptions { + if sub.ConnectionID == connID && sub.Active { + result = append(result, sub) + } + } + + return result +} + +// Count returns the total number of active subscriptions +func (sm *SubscriptionManager) Count() int { + sm.mu.RLock() + defer sm.mu.RUnlock() + return len(sm.subscriptions) +} + +// CountForEntity returns the number of subscriptions for a specific entity +func (sm *SubscriptionManager) CountForEntity(schema, entity string) int { + sm.mu.RLock() + defer sm.mu.RUnlock() + + key := makeEntityKey(schema, entity) + return len(sm.entitySubscriptions[key]) +} + +// MatchesFilters checks if data matches the subscription's filters +func (s *Subscription) MatchesFilters(data interface{}) bool { + // If no filters, match everything + if s.Options == nil || len(s.Options.Filters) == 0 { + return true + } + + // TODO: Implement filter matching logic + // For now, return true (send all notifications) + // In a full implementation, you would: + // 1. Convert data to a map + // 2. Evaluate each filter against the data + // 3. Return true only if all filters match + + return true +} + +// makeEntityKey creates a key for entity indexing +func makeEntityKey(schema, entity string) string { + if schema == "" { + return entity + } + return schema + "." + entity +} diff --git a/pkg/websocketspec/websocketspec.go b/pkg/websocketspec/websocketspec.go new file mode 100644 index 0000000..b1522ef --- /dev/null +++ b/pkg/websocketspec/websocketspec.go @@ -0,0 +1,331 @@ +// Package websocketspec provides a WebSocket-based API specification for real-time +// CRUD operations with bidirectional communication and subscription support. +// +// # Key Features +// +// - Real-time bidirectional communication over WebSocket +// - CRUD operations (Create, Read, Update, Delete) +// - Real-time subscriptions with filtering +// - Lifecycle hooks for all operations +// - Database-agnostic: Works with GORM and Bun ORM through adapters +// - Automatic change notifications to subscribers +// - Connection and subscription management +// +// # Message Protocol +// +// WebSocketSpec uses JSON messages for communication: +// +// { +// "id": "unique-message-id", +// "type": "request|response|notification|subscription", +// "operation": "read|create|update|delete|subscribe|unsubscribe", +// "schema": "public", +// "entity": "users", +// "data": {...}, +// "options": { +// "filters": [...], +// "columns": [...], +// "preload": [...], +// "sort": [...], +// "limit": 10 +// } +// } +// +// # Usage Example +// +// // Create handler with GORM +// handler := websocketspec.NewHandlerWithGORM(db) +// +// // Register models +// handler.Registry.RegisterModel("public.users", &User{}) +// +// // Setup WebSocket endpoint +// http.HandleFunc("/ws", handler.HandleWebSocket) +// +// // Start server +// http.ListenAndServe(":8080", nil) +// +// # Client Example +// +// // Connect to WebSocket +// ws := new WebSocket("ws://localhost:8080/ws") +// +// // Send read request +// ws.send(JSON.stringify({ +// id: "msg-1", +// type: "request", +// operation: "read", +// entity: "users", +// options: { +// filters: [{column: "status", operator: "eq", value: "active"}], +// limit: 10 +// } +// })) +// +// // Subscribe to changes +// ws.send(JSON.stringify({ +// id: "msg-2", +// type: "subscription", +// operation: "subscribe", +// entity: "users", +// options: { +// filters: [{column: "status", operator: "eq", value: "active"}] +// } +// })) +package websocketspec + +import ( + "github.com/bitechdev/ResolveSpec/pkg/common" + "github.com/bitechdev/ResolveSpec/pkg/common/adapters/database" + "github.com/bitechdev/ResolveSpec/pkg/modelregistry" + "github.com/uptrace/bun" + "gorm.io/gorm" +) + +// NewHandlerWithGORM creates a new Handler with GORM adapter +func NewHandlerWithGORM(db *gorm.DB) *Handler { + gormAdapter := database.NewGormAdapter(db) + registry := modelregistry.NewModelRegistry() + return NewHandler(gormAdapter, registry) +} + +// NewHandlerWithBun creates a new Handler with Bun adapter +func NewHandlerWithBun(db *bun.DB) *Handler { + bunAdapter := database.NewBunAdapter(db) + registry := modelregistry.NewModelRegistry() + return NewHandler(bunAdapter, registry) +} + +// NewHandlerWithDatabase creates a new Handler with a custom database adapter +func NewHandlerWithDatabase(db common.Database, registry common.ModelRegistry) *Handler { + return NewHandler(db, registry) +} + +// Example usage functions for documentation: + +// ExampleWithGORM shows how to use WebSocketSpec with GORM +func ExampleWithGORM(db *gorm.DB) { + // Create handler using GORM + handler := NewHandlerWithGORM(db) + + // Register models + handler.Registry().RegisterModel("public.users", &struct{}{}) + + // Register hooks (optional) + handler.Hooks().RegisterBefore(OperationRead, func(ctx *HookContext) error { + // Add custom logic before read operations + return nil + }) + + // Setup WebSocket endpoint + // http.HandleFunc("/ws", handler.HandleWebSocket) + + // Start server + // http.ListenAndServe(":8080", nil) +} + +// ExampleWithBun shows how to use WebSocketSpec with Bun ORM +func ExampleWithBun(bunDB *bun.DB) { + // Create handler using Bun + handler := NewHandlerWithBun(bunDB) + + // Register models + handler.Registry().RegisterModel("public.users", &struct{}{}) + + // Setup WebSocket endpoint + // http.HandleFunc("/ws", handler.HandleWebSocket) +} + +// ExampleWithHooks shows how to use lifecycle hooks +func ExampleWithHooks(db *gorm.DB) { + handler := NewHandlerWithGORM(db) + + // Register a before-read hook for authorization + handler.Hooks().RegisterBefore(OperationRead, func(ctx *HookContext) error { + // Check if user has permission to read this entity + // return fmt.Errorf("unauthorized") if not allowed + return nil + }) + + // Register an after-create hook for logging + handler.Hooks().RegisterAfter(OperationCreate, func(ctx *HookContext) error { + // Log the created record + // logger.Info("Created record: %v", ctx.Result) + return nil + }) + + // Register a before-subscribe hook to limit subscriptions + handler.Hooks().Register(BeforeSubscribe, func(ctx *HookContext) error { + // Limit number of subscriptions per connection + // if len(ctx.Connection.subscriptions) >= 10 { + // return fmt.Errorf("maximum subscriptions reached") + // } + return nil + }) +} + +// ExampleWithSubscriptions shows subscription usage +func ExampleWithSubscriptions() { + // Client-side JavaScript example: + /* + const ws = new WebSocket("ws://localhost:8080/ws"); + + // Subscribe to user changes + ws.send(JSON.stringify({ + id: "sub-1", + type: "subscription", + operation: "subscribe", + schema: "public", + entity: "users", + options: { + filters: [ + {column: "status", operator: "eq", value: "active"} + ] + } + })); + + // Handle notifications + ws.onmessage = (event) => { + const msg = JSON.parse(event.data); + if (msg.type === "notification") { + console.log("User changed:", msg.data); + console.log("Operation:", msg.operation); // create, update, or delete + } + }; + + // Unsubscribe + ws.send(JSON.stringify({ + id: "unsub-1", + type: "subscription", + operation: "unsubscribe", + subscription_id: "sub-abc123" + })); + */ +} + +// ExampleCRUDOperations shows basic CRUD operations +func ExampleCRUDOperations() { + // Client-side JavaScript example: + /* + const ws = new WebSocket("ws://localhost:8080/ws"); + + // CREATE - Create a new user + ws.send(JSON.stringify({ + id: "create-1", + type: "request", + operation: "create", + schema: "public", + entity: "users", + data: { + name: "John Doe", + email: "john@example.com", + status: "active" + } + })); + + // READ - Get all active users + ws.send(JSON.stringify({ + id: "read-1", + type: "request", + operation: "read", + schema: "public", + entity: "users", + options: { + filters: [{column: "status", operator: "eq", value: "active"}], + columns: ["id", "name", "email"], + sort: [{column: "name", direction: "asc"}], + limit: 10 + } + })); + + // READ BY ID - Get a specific user + ws.send(JSON.stringify({ + id: "read-2", + type: "request", + operation: "read", + schema: "public", + entity: "users", + record_id: "123" + })); + + // UPDATE - Update a user + ws.send(JSON.stringify({ + id: "update-1", + type: "request", + operation: "update", + schema: "public", + entity: "users", + record_id: "123", + data: { + name: "John Updated", + email: "john.updated@example.com" + } + })); + + // DELETE - Delete a user + ws.send(JSON.stringify({ + id: "delete-1", + type: "request", + operation: "delete", + schema: "public", + entity: "users", + record_id: "123" + })); + + // Handle responses + ws.onmessage = (event) => { + const response = JSON.parse(event.data); + if (response.type === "response") { + if (response.success) { + console.log("Operation successful:", response.data); + } else { + console.error("Operation failed:", response.error); + } + } + }; + */ +} + +// ExampleAuthentication shows how to implement authentication +func ExampleAuthentication() { + // Server-side example with authentication hook: + /* + handler := NewHandlerWithGORM(db) + + // Register before-connect hook for authentication + handler.Hooks().Register(BeforeConnect, func(ctx *HookContext) error { + // Extract token from query params or headers + r := ctx.Connection.ws.UnderlyingConn().RemoteAddr() + + // Validate token + // token := extractToken(r) + // user, err := validateToken(token) + // if err != nil { + // return fmt.Errorf("authentication failed: %w", err) + // } + + // Store user info in connection metadata + // ctx.Connection.SetMetadata("user", user) + // ctx.Connection.SetMetadata("user_id", user.ID) + + return nil + }) + + // Use connection metadata in other hooks + handler.Hooks().RegisterBefore(OperationRead, func(ctx *HookContext) error { + // Get user from connection metadata + // userID, _ := ctx.Connection.GetMetadata("user_id") + + // Add filter to only show user's own records + // if ctx.Entity == "orders" { + // ctx.Options.Filters = append(ctx.Options.Filters, common.FilterOption{ + // Column: "user_id", + // Operator: "eq", + // Value: userID, + // }) + // } + + return nil + }) + */ +} diff --git a/resolvespec-js/WEBSOCKET.md b/resolvespec-js/WEBSOCKET.md new file mode 100644 index 0000000..00e4fa0 --- /dev/null +++ b/resolvespec-js/WEBSOCKET.md @@ -0,0 +1,530 @@ +# WebSocketSpec JavaScript Client + +A TypeScript/JavaScript client for connecting to WebSocketSpec servers with full support for real-time subscriptions, CRUD operations, and automatic reconnection. + +## Installation + +```bash +npm install @warkypublic/resolvespec-js +# or +yarn add @warkypublic/resolvespec-js +# or +pnpm add @warkypublic/resolvespec-js +``` + +## Quick Start + +```typescript +import { WebSocketClient } from '@warkypublic/resolvespec-js'; + +// Create client +const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws', + reconnect: true, + debug: true +}); + +// Connect +await client.connect(); + +// Read records +const users = await client.read('users', { + schema: 'public', + filters: [ + { column: 'status', operator: 'eq', value: 'active' } + ], + limit: 10 +}); + +// Subscribe to changes +const subscriptionId = await client.subscribe('users', (notification) => { + console.log('User changed:', notification.operation, notification.data); +}, { schema: 'public' }); + +// Clean up +await client.unsubscribe(subscriptionId); +client.disconnect(); +``` + +## Features + +- **Real-Time Updates**: Subscribe to entity changes and receive instant notifications +- **Full CRUD Support**: Create, read, update, and delete operations +- **TypeScript Support**: Full type definitions included +- **Auto Reconnection**: Automatic reconnection with configurable retry logic +- **Heartbeat**: Built-in keepalive mechanism +- **Event System**: Listen to connection, error, and message events +- **Promise-based API**: All async operations return promises +- **Filter & Sort**: Advanced querying with filters, sorting, and pagination +- **Preloading**: Load related entities in a single query + +## Configuration + +```typescript +const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws', // WebSocket server URL + reconnect: true, // Enable auto-reconnection + reconnectInterval: 3000, // Reconnection delay (ms) + maxReconnectAttempts: 10, // Max reconnection attempts + heartbeatInterval: 30000, // Heartbeat interval (ms) + debug: false // Enable debug logging +}); +``` + +## API Reference + +### Connection Management + +#### `connect(): Promise` +Connect to the WebSocket server. + +```typescript +await client.connect(); +``` + +#### `disconnect(): void` +Disconnect from the server. + +```typescript +client.disconnect(); +``` + +#### `isConnected(): boolean` +Check if currently connected. + +```typescript +if (client.isConnected()) { + console.log('Connected!'); +} +``` + +#### `getState(): ConnectionState` +Get current connection state: `'connecting'`, `'connected'`, `'disconnecting'`, `'disconnected'`, or `'reconnecting'`. + +```typescript +const state = client.getState(); +console.log('State:', state); +``` + +### CRUD Operations + +#### `read(entity: string, options?): Promise` +Read records from an entity. + +```typescript +// Read all active users +const users = await client.read('users', { + schema: 'public', + filters: [ + { column: 'status', operator: 'eq', value: 'active' } + ], + columns: ['id', 'name', 'email'], + sort: [ + { column: 'name', direction: 'asc' } + ], + limit: 10, + offset: 0 +}); + +// Read single record by ID +const user = await client.read('users', { + schema: 'public', + record_id: '123' +}); + +// Read with preloading +const posts = await client.read('posts', { + schema: 'public', + preload: [ + { + relation: 'user', + columns: ['id', 'name', 'email'] + }, + { + relation: 'comments', + filters: [ + { column: 'status', operator: 'eq', value: 'approved' } + ] + } + ] +}); +``` + +#### `create(entity: string, data: any, options?): Promise` +Create a new record. + +```typescript +const newUser = await client.create('users', { + name: 'John Doe', + email: 'john@example.com', + status: 'active' +}, { + schema: 'public' +}); +``` + +#### `update(entity: string, id: string, data: any, options?): Promise` +Update an existing record. + +```typescript +const updatedUser = await client.update('users', '123', { + name: 'John Updated', + email: 'john.new@example.com' +}, { + schema: 'public' +}); +``` + +#### `delete(entity: string, id: string, options?): Promise` +Delete a record. + +```typescript +await client.delete('users', '123', { + schema: 'public' +}); +``` + +#### `meta(entity: string, options?): Promise` +Get metadata for an entity. + +```typescript +const metadata = await client.meta('users', { + schema: 'public' +}); +console.log('Columns:', metadata.columns); +console.log('Primary key:', metadata.primary_key); +``` + +### Subscriptions + +#### `subscribe(entity: string, callback: Function, options?): Promise` +Subscribe to entity changes. + +```typescript +const subscriptionId = await client.subscribe( + 'users', + (notification) => { + console.log('Operation:', notification.operation); // 'create', 'update', or 'delete' + console.log('Data:', notification.data); + console.log('Timestamp:', notification.timestamp); + }, + { + schema: 'public', + filters: [ + { column: 'status', operator: 'eq', value: 'active' } + ] + } +); +``` + +#### `unsubscribe(subscriptionId: string): Promise` +Unsubscribe from entity changes. + +```typescript +await client.unsubscribe(subscriptionId); +``` + +#### `getSubscriptions(): Subscription[]` +Get list of active subscriptions. + +```typescript +const subscriptions = client.getSubscriptions(); +console.log('Active subscriptions:', subscriptions.length); +``` + +### Event Handling + +#### `on(event: string, callback: Function): void` +Add event listener. + +```typescript +// Connection events +client.on('connect', () => { + console.log('Connected!'); +}); + +client.on('disconnect', (event) => { + console.log('Disconnected:', event.code, event.reason); +}); + +client.on('error', (error) => { + console.error('Error:', error); +}); + +// State changes +client.on('stateChange', (state) => { + console.log('State:', state); +}); + +// All messages +client.on('message', (message) => { + console.log('Message:', message); +}); +``` + +#### `off(event: string): void` +Remove event listener. + +```typescript +client.off('connect'); +``` + +## Filter Operators + +- `eq` - Equal (=) +- `neq` - Not Equal (!=) +- `gt` - Greater Than (>) +- `gte` - Greater Than or Equal (>=) +- `lt` - Less Than (<) +- `lte` - Less Than or Equal (<=) +- `like` - LIKE (case-sensitive) +- `ilike` - ILIKE (case-insensitive) +- `in` - IN (array of values) + +## Examples + +### Basic CRUD + +```typescript +const client = new WebSocketClient({ url: 'ws://localhost:8080/ws' }); +await client.connect(); + +// Create +const user = await client.create('users', { + name: 'Alice', + email: 'alice@example.com' +}); + +// Read +const users = await client.read('users', { + filters: [{ column: 'status', operator: 'eq', value: 'active' }] +}); + +// Update +await client.update('users', user.id, { name: 'Alice Updated' }); + +// Delete +await client.delete('users', user.id); + +client.disconnect(); +``` + +### Real-Time Subscriptions + +```typescript +const client = new WebSocketClient({ url: 'ws://localhost:8080/ws' }); +await client.connect(); + +// Subscribe to all user changes +const subId = await client.subscribe('users', (notification) => { + switch (notification.operation) { + case 'create': + console.log('New user:', notification.data); + break; + case 'update': + console.log('User updated:', notification.data); + break; + case 'delete': + console.log('User deleted:', notification.data); + break; + } +}); + +// Later: unsubscribe +await client.unsubscribe(subId); +``` + +### React Integration + +```typescript +import { useEffect, useState } from 'react'; +import { WebSocketClient } from '@warkypublic/resolvespec-js'; + +function useWebSocket(url: string) { + const [client] = useState(() => new WebSocketClient({ url })); + const [isConnected, setIsConnected] = useState(false); + + useEffect(() => { + client.on('connect', () => setIsConnected(true)); + client.on('disconnect', () => setIsConnected(false)); + client.connect(); + + return () => client.disconnect(); + }, [client]); + + return { client, isConnected }; +} + +function UsersComponent() { + const { client, isConnected } = useWebSocket('ws://localhost:8080/ws'); + const [users, setUsers] = useState([]); + + useEffect(() => { + if (!isConnected) return; + + const loadUsers = async () => { + // Subscribe to changes + await client.subscribe('users', (notification) => { + if (notification.operation === 'create') { + setUsers(prev => [...prev, notification.data]); + } else if (notification.operation === 'update') { + setUsers(prev => prev.map(u => + u.id === notification.data.id ? notification.data : u + )); + } else if (notification.operation === 'delete') { + setUsers(prev => prev.filter(u => u.id !== notification.data.id)); + } + }); + + // Load initial data + const data = await client.read('users'); + setUsers(data); + }; + + loadUsers(); + }, [client, isConnected]); + + return ( +
+

Users {isConnected ? '🟢' : '🔴'}

+ {users.map(user => ( +
{user.name}
+ ))} +
+ ); +} +``` + +### TypeScript with Typed Models + +```typescript +interface User { + id: number; + name: string; + email: string; + status: 'active' | 'inactive'; +} + +interface Post { + id: number; + title: string; + content: string; + user_id: number; + user?: User; +} + +const client = new WebSocketClient({ url: 'ws://localhost:8080/ws' }); +await client.connect(); + +// Type-safe operations +const users = await client.read('users', { + filters: [{ column: 'status', operator: 'eq', value: 'active' }] +}); + +const newUser = await client.create('users', { + name: 'Bob', + email: 'bob@example.com', + status: 'active' +}); + +// Type-safe subscriptions +await client.subscribe( + 'posts', + (notification) => { + const post = notification.data as Post; + console.log('Post:', post.title); + } +); +``` + +### Error Handling + +```typescript +const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws', + reconnect: true, + maxReconnectAttempts: 5 +}); + +client.on('error', (error) => { + console.error('Connection error:', error); +}); + +client.on('stateChange', (state) => { + console.log('State:', state); + if (state === 'reconnecting') { + console.log('Attempting to reconnect...'); + } +}); + +try { + await client.connect(); + + try { + const user = await client.read('users', { record_id: '999' }); + } catch (error) { + console.error('Record not found:', error); + } + + try { + await client.create('users', { /* invalid data */ }); + } catch (error) { + console.error('Validation failed:', error); + } + +} catch (error) { + console.error('Connection failed:', error); +} +``` + +### Multiple Subscriptions + +```typescript +const client = new WebSocketClient({ url: 'ws://localhost:8080/ws' }); +await client.connect(); + +// Subscribe to multiple entities +const userSub = await client.subscribe('users', (n) => { + console.log('[Users]', n.operation, n.data); +}); + +const postSub = await client.subscribe('posts', (n) => { + console.log('[Posts]', n.operation, n.data); +}, { + filters: [{ column: 'status', operator: 'eq', value: 'published' }] +}); + +const commentSub = await client.subscribe('comments', (n) => { + console.log('[Comments]', n.operation, n.data); +}); + +// Check active subscriptions +console.log('Active:', client.getSubscriptions().length); + +// Clean up +await client.unsubscribe(userSub); +await client.unsubscribe(postSub); +await client.unsubscribe(commentSub); +``` + +## Best Practices + +1. **Always Clean Up**: Call `disconnect()` when done to close the connection properly +2. **Use TypeScript**: Leverage type definitions for better type safety +3. **Handle Errors**: Always wrap operations in try-catch blocks +4. **Limit Subscriptions**: Don't create too many subscriptions per connection +5. **Use Filters**: Apply filters to subscriptions to reduce unnecessary notifications +6. **Connection State**: Check `isConnected()` before operations +7. **Event Listeners**: Remove event listeners when no longer needed with `off()` +8. **Reconnection**: Enable auto-reconnection for production apps + +## Browser Support + +- Chrome/Edge 88+ +- Firefox 85+ +- Safari 14+ +- Node.js 14.16+ + +## License + +MIT diff --git a/resolvespec-js/src/index.ts b/resolvespec-js/src/index.ts index e69de29..1a9aa90 100644 --- a/resolvespec-js/src/index.ts +++ b/resolvespec-js/src/index.ts @@ -0,0 +1,7 @@ +// Types +export * from './types'; +export * from './websocket-types'; + +// WebSocket Client +export { WebSocketClient } from './websocket-client'; +export type { WebSocketClient as default } from './websocket-client'; diff --git a/resolvespec-js/src/websocket-client.ts b/resolvespec-js/src/websocket-client.ts new file mode 100644 index 0000000..6482cc3 --- /dev/null +++ b/resolvespec-js/src/websocket-client.ts @@ -0,0 +1,487 @@ +import { v4 as uuidv4 } from 'uuid'; +import type { + WebSocketClientConfig, + WSMessage, + WSRequestMessage, + WSResponseMessage, + WSNotificationMessage, + WSOperation, + WSOptions, + Subscription, + SubscriptionOptions, + ConnectionState, + WebSocketClientEvents +} from './websocket-types'; + +export class WebSocketClient { + private ws: WebSocket | null = null; + private config: Required; + private messageHandlers: Map void> = new Map(); + private subscriptions: Map = new Map(); + private eventListeners: Partial = {}; + private state: ConnectionState = 'disconnected'; + private reconnectAttempts = 0; + private reconnectTimer: ReturnType | null = null; + private heartbeatTimer: ReturnType | null = null; + private isManualClose = false; + + constructor(config: WebSocketClientConfig) { + this.config = { + url: config.url, + reconnect: config.reconnect ?? true, + reconnectInterval: config.reconnectInterval ?? 3000, + maxReconnectAttempts: config.maxReconnectAttempts ?? 10, + heartbeatInterval: config.heartbeatInterval ?? 30000, + debug: config.debug ?? false + }; + } + + /** + * Connect to WebSocket server + */ + async connect(): Promise { + if (this.ws?.readyState === WebSocket.OPEN) { + this.log('Already connected'); + return; + } + + this.isManualClose = false; + this.setState('connecting'); + + return new Promise((resolve, reject) => { + try { + this.ws = new WebSocket(this.config.url); + + this.ws.onopen = () => { + this.log('Connected to WebSocket server'); + this.setState('connected'); + this.reconnectAttempts = 0; + this.startHeartbeat(); + this.emit('connect'); + resolve(); + }; + + this.ws.onmessage = (event) => { + this.handleMessage(event.data); + }; + + this.ws.onerror = (event) => { + this.log('WebSocket error:', event); + const error = new Error('WebSocket connection error'); + this.emit('error', error); + reject(error); + }; + + this.ws.onclose = (event) => { + this.log('WebSocket closed:', event.code, event.reason); + this.stopHeartbeat(); + this.setState('disconnected'); + this.emit('disconnect', event); + + // Attempt reconnection if enabled and not manually closed + if (this.config.reconnect && !this.isManualClose && this.reconnectAttempts < this.config.maxReconnectAttempts) { + this.reconnectAttempts++; + this.log(`Reconnection attempt ${this.reconnectAttempts}/${this.config.maxReconnectAttempts}`); + this.setState('reconnecting'); + + this.reconnectTimer = setTimeout(() => { + this.connect().catch((err) => { + this.log('Reconnection failed:', err); + }); + }, this.config.reconnectInterval); + } + }; + } catch (error) { + reject(error); + } + }); + } + + /** + * Disconnect from WebSocket server + */ + disconnect(): void { + this.isManualClose = true; + + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + + this.stopHeartbeat(); + + if (this.ws) { + this.setState('disconnecting'); + this.ws.close(); + this.ws = null; + } + + this.setState('disconnected'); + this.messageHandlers.clear(); + } + + /** + * Send a CRUD request and wait for response + */ + async request( + operation: WSOperation, + entity: string, + options?: { + schema?: string; + record_id?: string; + data?: any; + options?: WSOptions; + } + ): Promise { + this.ensureConnected(); + + const id = uuidv4(); + const message: WSRequestMessage = { + id, + type: 'request', + operation, + entity, + schema: options?.schema, + record_id: options?.record_id, + data: options?.data, + options: options?.options + }; + + return new Promise((resolve, reject) => { + // Set up response handler + this.messageHandlers.set(id, (response: WSResponseMessage) => { + if (response.success) { + resolve(response.data); + } else { + reject(new Error(response.error?.message || 'Request failed')); + } + }); + + // Send message + this.send(message); + + // Timeout after 30 seconds + setTimeout(() => { + if (this.messageHandlers.has(id)) { + this.messageHandlers.delete(id); + reject(new Error('Request timeout')); + } + }, 30000); + }); + } + + /** + * Read records + */ + async read(entity: string, options?: { + schema?: string; + record_id?: string; + filters?: import('./types').FilterOption[]; + columns?: string[]; + sort?: import('./types').SortOption[]; + preload?: import('./types').PreloadOption[]; + limit?: number; + offset?: number; + }): Promise { + return this.request('read', entity, { + schema: options?.schema, + record_id: options?.record_id, + options: { + filters: options?.filters, + columns: options?.columns, + sort: options?.sort, + preload: options?.preload, + limit: options?.limit, + offset: options?.offset + } + }); + } + + /** + * Create a record + */ + async create(entity: string, data: any, options?: { + schema?: string; + }): Promise { + return this.request('create', entity, { + schema: options?.schema, + data + }); + } + + /** + * Update a record + */ + async update(entity: string, id: string, data: any, options?: { + schema?: string; + }): Promise { + return this.request('update', entity, { + schema: options?.schema, + record_id: id, + data + }); + } + + /** + * Delete a record + */ + async delete(entity: string, id: string, options?: { + schema?: string; + }): Promise { + await this.request('delete', entity, { + schema: options?.schema, + record_id: id + }); + } + + /** + * Get metadata for an entity + */ + async meta(entity: string, options?: { + schema?: string; + }): Promise { + return this.request('meta', entity, { + schema: options?.schema + }); + } + + /** + * Subscribe to entity changes + */ + async subscribe( + entity: string, + callback: (notification: WSNotificationMessage) => void, + options?: { + schema?: string; + filters?: import('./types').FilterOption[]; + } + ): Promise { + this.ensureConnected(); + + const id = uuidv4(); + const message: WSMessage = { + id, + type: 'subscription', + operation: 'subscribe', + entity, + schema: options?.schema, + options: { + filters: options?.filters + } + }; + + return new Promise((resolve, reject) => { + this.messageHandlers.set(id, (response: WSResponseMessage) => { + if (response.success && response.data?.subscription_id) { + const subscriptionId = response.data.subscription_id; + + // Store subscription + this.subscriptions.set(subscriptionId, { + id: subscriptionId, + entity, + schema: options?.schema, + options: { filters: options?.filters }, + callback + }); + + this.log(`Subscribed to ${entity} with ID: ${subscriptionId}`); + resolve(subscriptionId); + } else { + reject(new Error(response.error?.message || 'Subscription failed')); + } + }); + + this.send(message); + + // Timeout + setTimeout(() => { + if (this.messageHandlers.has(id)) { + this.messageHandlers.delete(id); + reject(new Error('Subscription timeout')); + } + }, 10000); + }); + } + + /** + * Unsubscribe from entity changes + */ + async unsubscribe(subscriptionId: string): Promise { + this.ensureConnected(); + + const id = uuidv4(); + const message: WSMessage = { + id, + type: 'subscription', + operation: 'unsubscribe', + subscription_id: subscriptionId + }; + + return new Promise((resolve, reject) => { + this.messageHandlers.set(id, (response: WSResponseMessage) => { + if (response.success) { + this.subscriptions.delete(subscriptionId); + this.log(`Unsubscribed from ${subscriptionId}`); + resolve(); + } else { + reject(new Error(response.error?.message || 'Unsubscribe failed')); + } + }); + + this.send(message); + + // Timeout + setTimeout(() => { + if (this.messageHandlers.has(id)) { + this.messageHandlers.delete(id); + reject(new Error('Unsubscribe timeout')); + } + }, 10000); + }); + } + + /** + * Get list of active subscriptions + */ + getSubscriptions(): Subscription[] { + return Array.from(this.subscriptions.values()); + } + + /** + * Get connection state + */ + getState(): ConnectionState { + return this.state; + } + + /** + * Check if connected + */ + isConnected(): boolean { + return this.ws?.readyState === WebSocket.OPEN; + } + + /** + * Add event listener + */ + on(event: K, callback: WebSocketClientEvents[K]): void { + this.eventListeners[event] = callback as any; + } + + /** + * Remove event listener + */ + off(event: K): void { + delete this.eventListeners[event]; + } + + // Private methods + + private handleMessage(data: string): void { + try { + const message: WSMessage = JSON.parse(data); + this.log('Received message:', message); + + this.emit('message', message); + + // Handle different message types + switch (message.type) { + case 'response': + this.handleResponse(message as WSResponseMessage); + break; + + case 'notification': + this.handleNotification(message as WSNotificationMessage); + break; + + case 'pong': + // Heartbeat response + break; + + default: + this.log('Unknown message type:', message.type); + } + } catch (error) { + this.log('Error parsing message:', error); + } + } + + private handleResponse(message: WSResponseMessage): void { + const handler = this.messageHandlers.get(message.id); + if (handler) { + handler(message); + this.messageHandlers.delete(message.id); + } + } + + private handleNotification(message: WSNotificationMessage): void { + const subscription = this.subscriptions.get(message.subscription_id); + if (subscription?.callback) { + subscription.callback(message); + } + } + + private send(message: WSMessage): void { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + throw new Error('WebSocket is not connected'); + } + + const data = JSON.stringify(message); + this.log('Sending message:', message); + this.ws.send(data); + } + + private startHeartbeat(): void { + if (this.heartbeatTimer) { + return; + } + + this.heartbeatTimer = setInterval(() => { + if (this.isConnected()) { + const pingMessage: WSMessage = { + id: uuidv4(), + type: 'ping' + }; + this.send(pingMessage); + } + }, this.config.heartbeatInterval); + } + + private stopHeartbeat(): void { + if (this.heartbeatTimer) { + clearInterval(this.heartbeatTimer); + this.heartbeatTimer = null; + } + } + + private setState(state: ConnectionState): void { + if (this.state !== state) { + this.state = state; + this.emit('stateChange', state); + } + } + + private ensureConnected(): void { + if (!this.isConnected()) { + throw new Error('WebSocket is not connected. Call connect() first.'); + } + } + + private emit( + event: K, + ...args: Parameters + ): void { + const listener = this.eventListeners[event]; + if (listener) { + (listener as any)(...args); + } + } + + private log(...args: any[]): void { + if (this.config.debug) { + console.log('[WebSocketClient]', ...args); + } + } +} + +export default WebSocketClient; diff --git a/resolvespec-js/src/websocket-examples.ts b/resolvespec-js/src/websocket-examples.ts new file mode 100644 index 0000000..576603d --- /dev/null +++ b/resolvespec-js/src/websocket-examples.ts @@ -0,0 +1,427 @@ +import { WebSocketClient } from './websocket-client'; +import type { WSNotificationMessage } from './websocket-types'; + +/** + * Example 1: Basic Usage + */ +export async function basicUsageExample() { + // Create client + const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws', + reconnect: true, + debug: true + }); + + // Connect + await client.connect(); + + // Read users + const users = await client.read('users', { + schema: 'public', + filters: [ + { column: 'status', operator: 'eq', value: 'active' } + ], + limit: 10, + sort: [ + { column: 'name', direction: 'asc' } + ] + }); + + console.log('Users:', users); + + // Create a user + const newUser = await client.create('users', { + name: 'John Doe', + email: 'john@example.com', + status: 'active' + }, { schema: 'public' }); + + console.log('Created user:', newUser); + + // Update user + const updatedUser = await client.update('users', '123', { + name: 'John Updated' + }, { schema: 'public' }); + + console.log('Updated user:', updatedUser); + + // Delete user + await client.delete('users', '123', { schema: 'public' }); + + // Disconnect + client.disconnect(); +} + +/** + * Example 2: Real-time Subscriptions + */ +export async function subscriptionExample() { + const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws', + debug: true + }); + + await client.connect(); + + // Subscribe to user changes + const subscriptionId = await client.subscribe( + 'users', + (notification: WSNotificationMessage) => { + console.log('User changed:', notification.operation, notification.data); + + switch (notification.operation) { + case 'create': + console.log('New user created:', notification.data); + break; + case 'update': + console.log('User updated:', notification.data); + break; + case 'delete': + console.log('User deleted:', notification.data); + break; + } + }, + { + schema: 'public', + filters: [ + { column: 'status', operator: 'eq', value: 'active' } + ] + } + ); + + console.log('Subscribed with ID:', subscriptionId); + + // Later: unsubscribe + setTimeout(async () => { + await client.unsubscribe(subscriptionId); + console.log('Unsubscribed'); + client.disconnect(); + }, 60000); +} + +/** + * Example 3: Event Handling + */ +export async function eventHandlingExample() { + const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws' + }); + + // Listen to connection events + client.on('connect', () => { + console.log('Connected!'); + }); + + client.on('disconnect', (event) => { + console.log('Disconnected:', event.code, event.reason); + }); + + client.on('error', (error) => { + console.error('WebSocket error:', error); + }); + + client.on('stateChange', (state) => { + console.log('State changed to:', state); + }); + + client.on('message', (message) => { + console.log('Received message:', message); + }); + + await client.connect(); + + // Your operations here... +} + +/** + * Example 4: Multiple Subscriptions + */ +export async function multipleSubscriptionsExample() { + const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws', + debug: true + }); + + await client.connect(); + + // Subscribe to users + const userSubId = await client.subscribe( + 'users', + (notification) => { + console.log('[Users]', notification.operation, notification.data); + }, + { schema: 'public' } + ); + + // Subscribe to posts + const postSubId = await client.subscribe( + 'posts', + (notification) => { + console.log('[Posts]', notification.operation, notification.data); + }, + { + schema: 'public', + filters: [ + { column: 'status', operator: 'eq', value: 'published' } + ] + } + ); + + // Subscribe to comments + const commentSubId = await client.subscribe( + 'comments', + (notification) => { + console.log('[Comments]', notification.operation, notification.data); + }, + { schema: 'public' } + ); + + console.log('Active subscriptions:', client.getSubscriptions()); + + // Clean up after 60 seconds + setTimeout(async () => { + await client.unsubscribe(userSubId); + await client.unsubscribe(postSubId); + await client.unsubscribe(commentSubId); + client.disconnect(); + }, 60000); +} + +/** + * Example 5: Advanced Queries + */ +export async function advancedQueriesExample() { + const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws' + }); + + await client.connect(); + + // Complex query with filters, sorting, pagination, and preloading + const posts = await client.read('posts', { + schema: 'public', + filters: [ + { column: 'status', operator: 'eq', value: 'published' }, + { column: 'views', operator: 'gte', value: 100 } + ], + columns: ['id', 'title', 'content', 'user_id', 'created_at'], + sort: [ + { column: 'created_at', direction: 'desc' }, + { column: 'views', direction: 'desc' } + ], + preload: [ + { + relation: 'user', + columns: ['id', 'name', 'email'] + }, + { + relation: 'comments', + columns: ['id', 'content', 'user_id'], + filters: [ + { column: 'status', operator: 'eq', value: 'approved' } + ] + } + ], + limit: 20, + offset: 0 + }); + + console.log('Posts:', posts); + + // Get single record by ID + const post = await client.read('posts', { + schema: 'public', + record_id: '123' + }); + + console.log('Single post:', post); + + client.disconnect(); +} + +/** + * Example 6: Error Handling + */ +export async function errorHandlingExample() { + const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws', + reconnect: true, + maxReconnectAttempts: 5 + }); + + client.on('error', (error) => { + console.error('Connection error:', error); + }); + + client.on('stateChange', (state) => { + console.log('Connection state:', state); + }); + + try { + await client.connect(); + + try { + // Try to read non-existent entity + await client.read('nonexistent', { schema: 'public' }); + } catch (error) { + console.error('Read error:', error); + } + + try { + // Try to create invalid record + await client.create('users', { + // Missing required fields + }, { schema: 'public' }); + } catch (error) { + console.error('Create error:', error); + } + + } catch (error) { + console.error('Connection failed:', error); + } finally { + client.disconnect(); + } +} + +/** + * Example 7: React Integration + */ +export function reactIntegrationExample() { + const exampleCode = ` +import { useEffect, useState } from 'react'; +import { WebSocketClient } from '@warkypublic/resolvespec-js'; + +export function useWebSocket(url: string) { + const [client] = useState(() => new WebSocketClient({ url })); + const [isConnected, setIsConnected] = useState(false); + + useEffect(() => { + client.on('connect', () => setIsConnected(true)); + client.on('disconnect', () => setIsConnected(false)); + + client.connect(); + + return () => { + client.disconnect(); + }; + }, [client]); + + return { client, isConnected }; +} + +export function UsersComponent() { + const { client, isConnected } = useWebSocket('ws://localhost:8080/ws'); + const [users, setUsers] = useState([]); + + useEffect(() => { + if (!isConnected) return; + + // Subscribe to user changes + const subscribeToUsers = async () => { + const subId = await client.subscribe('users', (notification) => { + if (notification.operation === 'create') { + setUsers(prev => [...prev, notification.data]); + } else if (notification.operation === 'update') { + setUsers(prev => prev.map(u => + u.id === notification.data.id ? notification.data : u + )); + } else if (notification.operation === 'delete') { + setUsers(prev => prev.filter(u => u.id !== notification.data.id)); + } + }, { schema: 'public' }); + + // Load initial users + const initialUsers = await client.read('users', { + schema: 'public', + filters: [{ column: 'status', operator: 'eq', value: 'active' }] + }); + setUsers(initialUsers); + + return () => client.unsubscribe(subId); + }; + + subscribeToUsers(); + }, [client, isConnected]); + + const createUser = async (name: string, email: string) => { + await client.create('users', { name, email, status: 'active' }, { + schema: 'public' + }); + }; + + return ( +
+

Users ({users.length})

+ {isConnected ? '🟢 Connected' : '🔴 Disconnected'} + {/* Render users... */} +
+ ); +} +`; + + console.log(exampleCode); +} + +/** + * Example 8: TypeScript with Typed Models + */ +export async function typedModelsExample() { + // Define your models + interface User { + id: number; + name: string; + email: string; + status: 'active' | 'inactive'; + created_at: string; + } + + interface Post { + id: number; + title: string; + content: string; + user_id: number; + status: 'draft' | 'published'; + views: number; + user?: User; + } + + const client = new WebSocketClient({ + url: 'ws://localhost:8080/ws' + }); + + await client.connect(); + + // Type-safe operations + const users = await client.read('users', { + schema: 'public', + filters: [{ column: 'status', operator: 'eq', value: 'active' }] + }); + + const newUser = await client.create('users', { + name: 'Alice', + email: 'alice@example.com', + status: 'active' + }, { schema: 'public' }); + + const posts = await client.read('posts', { + schema: 'public', + preload: [ + { + relation: 'user', + columns: ['id', 'name', 'email'] + } + ] + }); + + // Type-safe subscriptions + await client.subscribe( + 'users', + (notification) => { + const user = notification.data as User; + console.log('User changed:', user.name, user.email); + }, + { schema: 'public' } + ); + + client.disconnect(); +} diff --git a/resolvespec-js/src/websocket-types.ts b/resolvespec-js/src/websocket-types.ts new file mode 100644 index 0000000..29fc34d --- /dev/null +++ b/resolvespec-js/src/websocket-types.ts @@ -0,0 +1,110 @@ +// WebSocket Message Types +export type MessageType = 'request' | 'response' | 'notification' | 'subscription' | 'error' | 'ping' | 'pong'; +export type WSOperation = 'read' | 'create' | 'update' | 'delete' | 'subscribe' | 'unsubscribe' | 'meta'; + +// Re-export common types +export type { FilterOption, SortOption, PreloadOption, Operator, SortDirection } from './types'; + +export interface WSOptions { + filters?: import('./types').FilterOption[]; + columns?: string[]; + preload?: import('./types').PreloadOption[]; + sort?: import('./types').SortOption[]; + limit?: number; + offset?: number; +} + +export interface WSMessage { + id?: string; + type: MessageType; + operation?: WSOperation; + schema?: string; + entity?: string; + record_id?: string; + data?: any; + options?: WSOptions; + subscription_id?: string; + success?: boolean; + error?: WSErrorInfo; + metadata?: Record; + timestamp?: string; +} + +export interface WSErrorInfo { + code: string; + message: string; + details?: Record; +} + +export interface WSRequestMessage { + id: string; + type: 'request'; + operation: WSOperation; + schema?: string; + entity: string; + record_id?: string; + data?: any; + options?: WSOptions; +} + +export interface WSResponseMessage { + id: string; + type: 'response'; + success: boolean; + data?: any; + error?: WSErrorInfo; + metadata?: Record; + timestamp: string; +} + +export interface WSNotificationMessage { + type: 'notification'; + operation: WSOperation; + subscription_id: string; + schema?: string; + entity: string; + data: any; + timestamp: string; +} + +export interface WSSubscriptionMessage { + id: string; + type: 'subscription'; + operation: 'subscribe' | 'unsubscribe'; + schema?: string; + entity: string; + options?: WSOptions; + subscription_id?: string; +} + +export interface SubscriptionOptions { + filters?: import('./types').FilterOption[]; + onNotification?: (notification: WSNotificationMessage) => void; +} + +export interface WebSocketClientConfig { + url: string; + reconnect?: boolean; + reconnectInterval?: number; + maxReconnectAttempts?: number; + heartbeatInterval?: number; + debug?: boolean; +} + +export interface Subscription { + id: string; + entity: string; + schema?: string; + options?: WSOptions; + callback?: (notification: WSNotificationMessage) => void; +} + +export type ConnectionState = 'connecting' | 'connected' | 'disconnecting' | 'disconnected' | 'reconnecting'; + +export interface WebSocketClientEvents { + connect: () => void; + disconnect: (event: CloseEvent) => void; + error: (error: Error) => void; + message: (message: WSMessage) => void; + stateChange: (state: ConnectionState) => void; +}