mirror of
https://github.com/bitechdev/ResolveSpec.git
synced 2026-01-05 11:24:26 +00:00
Websocket spec fixes
This commit is contained in:
@@ -116,17 +116,21 @@ func (cm *ConnectionManager) Run() {
|
|||||||
case conn := <-cm.register:
|
case conn := <-cm.register:
|
||||||
cm.mu.Lock()
|
cm.mu.Lock()
|
||||||
cm.connections[conn.ID] = conn
|
cm.connections[conn.ID] = conn
|
||||||
|
count := len(cm.connections)
|
||||||
cm.mu.Unlock()
|
cm.mu.Unlock()
|
||||||
logger.Info("[WebSocketSpec] Connection registered: %s (total: %d)", conn.ID, cm.Count())
|
logger.Info("[WebSocketSpec] Connection registered: %s (total: %d)", conn.ID, count)
|
||||||
|
|
||||||
case conn := <-cm.unregister:
|
case conn := <-cm.unregister:
|
||||||
cm.mu.Lock()
|
cm.mu.Lock()
|
||||||
if _, ok := cm.connections[conn.ID]; ok {
|
if _, ok := cm.connections[conn.ID]; ok {
|
||||||
delete(cm.connections, conn.ID)
|
delete(cm.connections, conn.ID)
|
||||||
close(conn.send)
|
close(conn.send)
|
||||||
logger.Info("[WebSocketSpec] Connection unregistered: %s (total: %d)", conn.ID, cm.Count())
|
count := len(cm.connections)
|
||||||
|
cm.mu.Unlock()
|
||||||
|
logger.Info("[WebSocketSpec] Connection unregistered: %s (total: %d)", conn.ID, count)
|
||||||
|
} else {
|
||||||
|
cm.mu.Unlock()
|
||||||
}
|
}
|
||||||
cm.mu.Unlock()
|
|
||||||
|
|
||||||
case msg := <-cm.broadcast:
|
case msg := <-cm.broadcast:
|
||||||
cm.mu.RLock()
|
cm.mu.RLock()
|
||||||
@@ -296,13 +300,19 @@ func (c *Connection) SendJSON(v interface{}) error {
|
|||||||
// Close closes the connection
|
// Close closes the connection
|
||||||
func (c *Connection) Close() {
|
func (c *Connection) Close() {
|
||||||
c.closedOnce.Do(func() {
|
c.closedOnce.Do(func() {
|
||||||
c.cancel()
|
if c.cancel != nil {
|
||||||
c.ws.Close()
|
c.cancel()
|
||||||
|
}
|
||||||
|
if c.ws != nil {
|
||||||
|
c.ws.Close()
|
||||||
|
}
|
||||||
|
|
||||||
// Clean up subscriptions
|
// Clean up subscriptions
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
for subID := range c.subscriptions {
|
for subID := range c.subscriptions {
|
||||||
c.handler.subscriptionManager.Unsubscribe(subID)
|
if c.handler != nil && c.handler.subscriptionManager != nil {
|
||||||
|
c.handler.subscriptionManager.Unsubscribe(subID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
c.subscriptions = make(map[string]*Subscription)
|
c.subscriptions = make(map[string]*Subscription)
|
||||||
c.mu.Unlock()
|
c.mu.Unlock()
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package websocketspec
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/bitechdev/ResolveSpec/pkg/common"
|
"github.com/bitechdev/ResolveSpec/pkg/common"
|
||||||
@@ -344,6 +345,7 @@ func TestNewHandler(t *testing.T) {
|
|||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
|
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
assert.NotNil(t, handler)
|
assert.NotNil(t, handler)
|
||||||
assert.NotNil(t, handler.db)
|
assert.NotNil(t, handler.db)
|
||||||
@@ -358,6 +360,7 @@ func TestHandler_Hooks(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
hooks := handler.Hooks()
|
hooks := handler.Hooks()
|
||||||
assert.NotNil(t, hooks)
|
assert.NotNil(t, hooks)
|
||||||
@@ -368,6 +371,7 @@ func TestHandler_Registry(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
registry := handler.Registry()
|
registry := handler.Registry()
|
||||||
assert.NotNil(t, registry)
|
assert.NotNil(t, registry)
|
||||||
@@ -378,6 +382,7 @@ func TestHandler_GetDatabase(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
db := handler.GetDatabase()
|
db := handler.GetDatabase()
|
||||||
assert.NotNil(t, db)
|
assert.NotNil(t, db)
|
||||||
@@ -388,6 +393,7 @@ func TestHandler_GetConnectionCount(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
count := handler.GetConnectionCount()
|
count := handler.GetConnectionCount()
|
||||||
assert.Equal(t, 0, count)
|
assert.Equal(t, 0, count)
|
||||||
@@ -397,6 +403,7 @@ func TestHandler_GetSubscriptionCount(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
count := handler.GetSubscriptionCount()
|
count := handler.GetSubscriptionCount()
|
||||||
assert.Equal(t, 0, count)
|
assert.Equal(t, 0, count)
|
||||||
@@ -406,6 +413,7 @@ func TestHandler_GetConnection(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Non-existent connection
|
// Non-existent connection
|
||||||
_, exists := handler.GetConnection("non-existent")
|
_, exists := handler.GetConnection("non-existent")
|
||||||
@@ -416,6 +424,7 @@ func TestHandler_HandleMessage_InvalidType(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
conn := &Connection{
|
conn := &Connection{
|
||||||
ID: "conn-1",
|
ID: "conn-1",
|
||||||
@@ -431,25 +440,27 @@ func TestHandler_HandleMessage_InvalidType(t *testing.T) {
|
|||||||
|
|
||||||
handler.HandleMessage(conn, msg)
|
handler.HandleMessage(conn, msg)
|
||||||
|
|
||||||
|
// Shutdown handler properly
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Should send error response
|
// Should send error response
|
||||||
select {
|
select {
|
||||||
case data := <-conn.send:
|
case data := <-conn.send:
|
||||||
var response map[string]interface{}
|
var response ResponseMessage
|
||||||
require.NoError(t, ParseMessageBytes(data, &response))
|
err := json.Unmarshal(data, &response)
|
||||||
assert.False(t, response["success"].(bool))
|
require.NoError(t, err)
|
||||||
|
assert.False(t, response.Success)
|
||||||
|
assert.NotNil(t, response.Error)
|
||||||
default:
|
default:
|
||||||
t.Fatal("Expected error response")
|
t.Fatal("Expected error response")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ParseMessageBytes(data []byte, v interface{}) error {
|
|
||||||
return nil // Simplified for testing
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestHandler_GetOperatorSQL(t *testing.T) {
|
func TestHandler_GetOperatorSQL(t *testing.T) {
|
||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
operator string
|
operator string
|
||||||
@@ -479,6 +490,7 @@ func TestHandler_GetTableName(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@@ -518,6 +530,7 @@ func TestHandler_GetMetadata(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
metadata := handler.getMetadata("public", "users", &TestUser{})
|
metadata := handler.getMetadata("public", "users", &TestUser{})
|
||||||
|
|
||||||
@@ -533,13 +546,19 @@ func TestHandler_NotifySubscribers(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Create connection
|
// Create connection
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
conn := &Connection{
|
conn := &Connection{
|
||||||
ID: "conn-1",
|
ID: "conn-1",
|
||||||
send: make(chan []byte, 256),
|
send: make(chan []byte, 256),
|
||||||
subscriptions: make(map[string]*Subscription),
|
subscriptions: make(map[string]*Subscription),
|
||||||
handler: handler,
|
handler: handler,
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register connection
|
// Register connection
|
||||||
@@ -566,6 +585,7 @@ func TestHandler_NotifySubscribers_NoSubscribers(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Notify with no subscribers - should not panic
|
// Notify with no subscribers - should not panic
|
||||||
data := map[string]interface{}{"id": 1, "name": "John"}
|
data := map[string]interface{}{"id": 1, "name": "John"}
|
||||||
@@ -578,6 +598,7 @@ func TestHandler_NotifySubscribers_ConnectionNotFound(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Create subscription without connection
|
// Create subscription without connection
|
||||||
handler.subscriptionManager.Subscribe("sub-1", "conn-1", "public", "users", nil)
|
handler.subscriptionManager.Subscribe("sub-1", "conn-1", "public", "users", nil)
|
||||||
@@ -593,6 +614,7 @@ func TestHandler_HooksIntegration(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
beforeCalled := false
|
beforeCalled := false
|
||||||
afterCalled := false
|
afterCalled := false
|
||||||
@@ -625,6 +647,7 @@ func TestHandler_Shutdown(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Shutdown should not panic
|
// Shutdown should not panic
|
||||||
handler.Shutdown()
|
handler.Shutdown()
|
||||||
@@ -642,6 +665,7 @@ func TestHandler_SubscriptionLifecycle(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Create connection
|
// Create connection
|
||||||
conn := &Connection{
|
conn := &Connection{
|
||||||
@@ -681,6 +705,7 @@ func TestHandler_UnsubscribeLifecycle(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Create connection
|
// Create connection
|
||||||
conn := &Connection{
|
conn := &Connection{
|
||||||
@@ -725,11 +750,17 @@ func TestHandler_HandlePing(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
conn := &Connection{
|
conn := &Connection{
|
||||||
ID: "conn-1",
|
ID: "conn-1",
|
||||||
send: make(chan []byte, 256),
|
send: make(chan []byte, 256),
|
||||||
subscriptions: make(map[string]*Subscription),
|
subscriptions: make(map[string]*Subscription),
|
||||||
|
ctx: ctx,
|
||||||
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := &Message{
|
msg := &Message{
|
||||||
@@ -752,6 +783,7 @@ func TestHandler_CompleteWorkflow(t *testing.T) {
|
|||||||
mockDB := &MockDatabase{}
|
mockDB := &MockDatabase{}
|
||||||
mockRegistry := &MockModelRegistry{}
|
mockRegistry := &MockModelRegistry{}
|
||||||
handler := NewHandler(mockDB, mockRegistry)
|
handler := NewHandler(mockDB, mockRegistry)
|
||||||
|
defer handler.Shutdown()
|
||||||
|
|
||||||
// Setup hooks (these are registered but not called in this test workflow)
|
// Setup hooks (these are registered but not called in this test workflow)
|
||||||
handler.Hooks().RegisterBefore(OperationCreate, func(ctx *HookContext) error {
|
handler.Hooks().RegisterBefore(OperationCreate, func(ctx *HookContext) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user