Added a scripts execution ability
This commit is contained in:
226
pkg/writers/sqlexec/README.md
Normal file
226
pkg/writers/sqlexec/README.md
Normal file
@@ -0,0 +1,226 @@
|
||||
# SQL Executor Writer
|
||||
|
||||
The SQL Executor Writer (`sqlexec`) executes SQL scripts from `models.Script` objects against a PostgreSQL database. Scripts are executed in order based on Priority (ascending) and Sequence (ascending).
|
||||
|
||||
## Features
|
||||
|
||||
- **Ordered Execution**: Scripts execute in Priority→Sequence order
|
||||
- **PostgreSQL Support**: Uses `pgx/v5` driver for robust PostgreSQL connectivity
|
||||
- **Stop on Error**: Execution halts immediately on first error (default behavior)
|
||||
- **Progress Reporting**: Prints execution status to stdout
|
||||
- **Multiple Schemas**: Can execute scripts from multiple schemas in a database
|
||||
|
||||
## Usage
|
||||
|
||||
### Basic Usage
|
||||
|
||||
```go
|
||||
import (
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/writers"
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/writers/sqlexec"
|
||||
)
|
||||
|
||||
writer := sqlexec.NewWriter(&writers.WriterOptions{
|
||||
Metadata: map[string]any{
|
||||
"connection_string": "postgres://user:password@localhost:5432/dbname?sslmode=disable",
|
||||
},
|
||||
})
|
||||
|
||||
// Execute all scripts from database
|
||||
err := writer.WriteDatabase(database)
|
||||
if err != nil {
|
||||
log.Fatalf("Execution failed: %v", err)
|
||||
}
|
||||
```
|
||||
|
||||
### Execute Single Schema
|
||||
|
||||
```go
|
||||
err := writer.WriteSchema(schema)
|
||||
if err != nil {
|
||||
log.Fatalf("Schema execution failed: %v", err)
|
||||
}
|
||||
```
|
||||
|
||||
### Complete Example with SQL Directory Reader
|
||||
|
||||
```go
|
||||
import (
|
||||
"log"
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/readers"
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/readers/sqldir"
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/writers"
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/writers/sqlexec"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Read SQL scripts from directory
|
||||
reader := sqldir.NewReader(&readers.ReaderOptions{
|
||||
FilePath: "./migrations",
|
||||
})
|
||||
|
||||
db, err := reader.ReadDatabase()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Execute scripts against PostgreSQL
|
||||
writer := sqlexec.NewWriter(&writers.WriterOptions{
|
||||
Metadata: map[string]any{
|
||||
"connection_string": "postgres://localhost/myapp",
|
||||
},
|
||||
})
|
||||
|
||||
if err := writer.WriteDatabase(db); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
### Required Metadata
|
||||
|
||||
- **connection_string**: PostgreSQL connection string (required)
|
||||
|
||||
### Connection String Format
|
||||
|
||||
```
|
||||
postgres://[user[:password]@][host][:port][/dbname][?param1=value1&...]
|
||||
```
|
||||
|
||||
Examples:
|
||||
```
|
||||
postgres://localhost/mydb
|
||||
postgres://user:pass@localhost:5432/mydb?sslmode=disable
|
||||
postgres://user@localhost/mydb?sslmode=require
|
||||
postgresql://user:pass@prod-db.example.com:5432/production
|
||||
```
|
||||
|
||||
## Execution Order
|
||||
|
||||
Scripts are sorted and executed based on:
|
||||
|
||||
1. **Priority** (ascending): Lower priority values execute first
|
||||
2. **Sequence** (ascending): Within same priority, lower sequence values execute first
|
||||
|
||||
### Example Execution Order
|
||||
|
||||
Given these scripts:
|
||||
```
|
||||
Script A: Priority=2, Sequence=1
|
||||
Script B: Priority=1, Sequence=3
|
||||
Script C: Priority=1, Sequence=1
|
||||
Script D: Priority=1, Sequence=2
|
||||
Script E: Priority=3, Sequence=1
|
||||
```
|
||||
|
||||
Execution order: **C → D → B → A → E**
|
||||
|
||||
## Output
|
||||
|
||||
The writer prints progress to stdout:
|
||||
|
||||
```
|
||||
Executing script: create_users (Priority=1, Sequence=1)
|
||||
✓ Successfully executed: create_users
|
||||
Executing script: create_posts (Priority=1, Sequence=2)
|
||||
✓ Successfully executed: create_posts
|
||||
Executing script: add_indexes (Priority=2, Sequence=1)
|
||||
✓ Successfully executed: add_indexes
|
||||
```
|
||||
|
||||
## Error Handling
|
||||
|
||||
### Connection Errors
|
||||
|
||||
If the database connection fails, execution stops immediately:
|
||||
|
||||
```
|
||||
Error: failed to connect to database: connection refused
|
||||
```
|
||||
|
||||
### Script Execution Errors
|
||||
|
||||
If a script fails, execution stops and returns the error with context:
|
||||
|
||||
```
|
||||
Error: failed to execute script add_indexes (Priority=2, Sequence=1):
|
||||
syntax error at or near "IDNEX"
|
||||
```
|
||||
|
||||
**Behavior**: Stop on first error (scripts executed before the error remain committed)
|
||||
|
||||
### Empty Script Handling
|
||||
|
||||
Scripts with empty SQL content are skipped silently.
|
||||
|
||||
## Database Support
|
||||
|
||||
Currently supports:
|
||||
- ✅ PostgreSQL (via pgx/v5)
|
||||
|
||||
Future support planned for:
|
||||
- MySQL/MariaDB
|
||||
- SQLite
|
||||
- Generic SQL via database/sql
|
||||
|
||||
## Transaction Behavior
|
||||
|
||||
**Current**: Each script executes in its own implicit transaction (PostgreSQL default behavior)
|
||||
|
||||
**Future Enhancement**: Option to wrap all scripts in a single transaction for atomic execution with rollback on error.
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
- Scripts execute sequentially (not in parallel)
|
||||
- Each script creates a database round-trip
|
||||
- For large migrations, consider:
|
||||
- Combining related statements into fewer scripts
|
||||
- Using PostgreSQL's COPY command for bulk data
|
||||
- Running during low-traffic periods
|
||||
|
||||
## Testing
|
||||
|
||||
Run tests:
|
||||
```bash
|
||||
go test ./pkg/writers/sqlexec/
|
||||
```
|
||||
|
||||
Current tests include:
|
||||
- Validation and error handling
|
||||
- Script sorting logic
|
||||
- Configuration validation
|
||||
|
||||
### Integration Tests
|
||||
|
||||
For integration testing with a real database:
|
||||
|
||||
```bash
|
||||
# Start PostgreSQL (example with Docker)
|
||||
docker run -d --name postgres-test \
|
||||
-e POSTGRES_PASSWORD=test \
|
||||
-e POSTGRES_DB=testdb \
|
||||
-p 5432:5432 \
|
||||
postgres:16
|
||||
|
||||
# Run your integration tests
|
||||
go test -tags=integration ./pkg/writers/sqlexec/
|
||||
|
||||
# Cleanup
|
||||
docker stop postgres-test
|
||||
docker rm postgres-test
|
||||
```
|
||||
|
||||
## Limitations
|
||||
|
||||
- `WriteTable()` is not supported (returns error)
|
||||
- Requires PostgreSQL connection (no offline mode)
|
||||
- No built-in transaction wrapping (yet)
|
||||
- No rollback script support (yet, though `models.Script.Rollback` field exists)
|
||||
|
||||
## Related
|
||||
|
||||
- **SQL Directory Reader**: `pkg/readers/sqldir/` - Read scripts from filesystem
|
||||
- **Script Model**: `pkg/models/models.go` - Script structure definition
|
||||
- **pgx Documentation**: https://github.com/jackc/pgx - PostgreSQL driver docs
|
||||
125
pkg/writers/sqlexec/writer.go
Normal file
125
pkg/writers/sqlexec/writer.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package sqlexec
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/models"
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/writers"
|
||||
)
|
||||
|
||||
// Writer implements the writers.Writer interface for executing SQL scripts
|
||||
type Writer struct {
|
||||
options *writers.WriterOptions
|
||||
}
|
||||
|
||||
// NewWriter creates a new SQL executor writer
|
||||
func NewWriter(options *writers.WriterOptions) *Writer {
|
||||
return &Writer{
|
||||
options: options,
|
||||
}
|
||||
}
|
||||
|
||||
// WriteDatabase executes all scripts from all schemas in the database
|
||||
func (w *Writer) WriteDatabase(db *models.Database) error {
|
||||
if db == nil {
|
||||
return fmt.Errorf("database is nil")
|
||||
}
|
||||
|
||||
// Get connection string from metadata
|
||||
connString, ok := w.options.Metadata["connection_string"].(string)
|
||||
if !ok || connString == "" {
|
||||
return fmt.Errorf("connection_string is required in writer metadata")
|
||||
}
|
||||
|
||||
// Connect to database
|
||||
ctx := context.Background()
|
||||
conn, err := pgx.Connect(ctx, connString)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to database: %w", err)
|
||||
}
|
||||
defer conn.Close(ctx)
|
||||
|
||||
// Execute scripts from all schemas
|
||||
for _, schema := range db.Schemas {
|
||||
if err := w.executeScripts(ctx, conn, schema.Scripts); err != nil {
|
||||
return fmt.Errorf("failed to execute scripts from schema %s: %w", schema.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteSchema executes all scripts from a single schema
|
||||
func (w *Writer) WriteSchema(schema *models.Schema) error {
|
||||
if schema == nil {
|
||||
return fmt.Errorf("schema is nil")
|
||||
}
|
||||
|
||||
// Get connection string from metadata
|
||||
connString, ok := w.options.Metadata["connection_string"].(string)
|
||||
if !ok || connString == "" {
|
||||
return fmt.Errorf("connection_string is required in writer metadata")
|
||||
}
|
||||
|
||||
// Connect to database
|
||||
ctx := context.Background()
|
||||
conn, err := pgx.Connect(ctx, connString)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to database: %w", err)
|
||||
}
|
||||
defer conn.Close(ctx)
|
||||
|
||||
// Execute scripts
|
||||
if err := w.executeScripts(ctx, conn, schema.Scripts); err != nil {
|
||||
return fmt.Errorf("failed to execute scripts: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteTable is not applicable for SQL script execution
|
||||
func (w *Writer) WriteTable(table *models.Table) error {
|
||||
return fmt.Errorf("WriteTable is not supported for SQL script execution")
|
||||
}
|
||||
|
||||
// executeScripts executes scripts in Priority then Sequence order
|
||||
func (w *Writer) executeScripts(ctx context.Context, conn *pgx.Conn, scripts []*models.Script) error {
|
||||
if len(scripts) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sort scripts by Priority (ascending) then Sequence (ascending)
|
||||
sortedScripts := make([]*models.Script, len(scripts))
|
||||
copy(sortedScripts, scripts)
|
||||
sort.Slice(sortedScripts, func(i, j int) bool {
|
||||
if sortedScripts[i].Priority != sortedScripts[j].Priority {
|
||||
return sortedScripts[i].Priority < sortedScripts[j].Priority
|
||||
}
|
||||
return sortedScripts[i].Sequence < sortedScripts[j].Sequence
|
||||
})
|
||||
|
||||
// Execute each script in order
|
||||
for _, script := range sortedScripts {
|
||||
if script.SQL == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Printf("Executing script: %s (Priority=%d, Sequence=%d)\n",
|
||||
script.Name, script.Priority, script.Sequence)
|
||||
|
||||
// Execute the SQL script
|
||||
_, err := conn.Exec(ctx, script.SQL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to execute script %s (Priority=%d, Sequence=%d): %w",
|
||||
script.Name, script.Priority, script.Sequence, err)
|
||||
}
|
||||
|
||||
fmt.Printf("✓ Successfully executed: %s\n", script.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
201
pkg/writers/sqlexec/writer_test.go
Normal file
201
pkg/writers/sqlexec/writer_test.go
Normal file
@@ -0,0 +1,201 @@
|
||||
package sqlexec
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/models"
|
||||
"git.warky.dev/wdevs/relspecgo/pkg/writers"
|
||||
)
|
||||
|
||||
func TestNewWriter(t *testing.T) {
|
||||
opts := &writers.WriterOptions{
|
||||
Metadata: map[string]any{
|
||||
"connection_string": "postgres://localhost/test",
|
||||
},
|
||||
}
|
||||
|
||||
writer := NewWriter(opts)
|
||||
if writer == nil {
|
||||
t.Fatal("Expected non-nil writer")
|
||||
}
|
||||
if writer.options != opts {
|
||||
t.Error("Writer options not set correctly")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_WriteDatabase_NilDatabase(t *testing.T) {
|
||||
writer := NewWriter(&writers.WriterOptions{
|
||||
Metadata: map[string]any{
|
||||
"connection_string": "postgres://localhost/test",
|
||||
},
|
||||
})
|
||||
|
||||
err := writer.WriteDatabase(nil)
|
||||
if err == nil {
|
||||
t.Error("Expected error for nil database, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_WriteDatabase_MissingConnectionString(t *testing.T) {
|
||||
writer := NewWriter(&writers.WriterOptions{
|
||||
Metadata: map[string]any{},
|
||||
})
|
||||
|
||||
db := &models.Database{
|
||||
Name: "test",
|
||||
Schemas: []*models.Schema{
|
||||
{
|
||||
Name: "public",
|
||||
Scripts: []*models.Script{
|
||||
{Name: "test", SQL: "SELECT 1;"},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
err := writer.WriteDatabase(db)
|
||||
if err == nil {
|
||||
t.Error("Expected error for missing connection_string, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_WriteSchema_NilSchema(t *testing.T) {
|
||||
writer := NewWriter(&writers.WriterOptions{
|
||||
Metadata: map[string]any{
|
||||
"connection_string": "postgres://localhost/test",
|
||||
},
|
||||
})
|
||||
|
||||
err := writer.WriteSchema(nil)
|
||||
if err == nil {
|
||||
t.Error("Expected error for nil schema, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_WriteSchema_MissingConnectionString(t *testing.T) {
|
||||
writer := NewWriter(&writers.WriterOptions{
|
||||
Metadata: map[string]any{},
|
||||
})
|
||||
|
||||
schema := &models.Schema{
|
||||
Name: "public",
|
||||
Scripts: []*models.Script{
|
||||
{Name: "test", SQL: "SELECT 1;"},
|
||||
},
|
||||
}
|
||||
|
||||
err := writer.WriteSchema(schema)
|
||||
if err == nil {
|
||||
t.Error("Expected error for missing connection_string, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_WriteTable(t *testing.T) {
|
||||
writer := NewWriter(&writers.WriterOptions{})
|
||||
|
||||
err := writer.WriteTable(&models.Table{})
|
||||
if err == nil {
|
||||
t.Error("Expected error for WriteTable (not supported), got nil")
|
||||
}
|
||||
}
|
||||
|
||||
// TestScriptSorting verifies that scripts are sorted correctly by Priority then Sequence
|
||||
func TestScriptSorting(t *testing.T) {
|
||||
scripts := []*models.Script{
|
||||
{Name: "script1", Priority: 2, Sequence: 1, SQL: "SELECT 1;"},
|
||||
{Name: "script2", Priority: 1, Sequence: 3, SQL: "SELECT 2;"},
|
||||
{Name: "script3", Priority: 1, Sequence: 1, SQL: "SELECT 3;"},
|
||||
{Name: "script4", Priority: 1, Sequence: 2, SQL: "SELECT 4;"},
|
||||
{Name: "script5", Priority: 3, Sequence: 1, SQL: "SELECT 5;"},
|
||||
{Name: "script6", Priority: 2, Sequence: 2, SQL: "SELECT 6;"},
|
||||
}
|
||||
|
||||
// Create a copy and sort it using the same logic as executeScripts
|
||||
sortedScripts := make([]*models.Script, len(scripts))
|
||||
copy(sortedScripts, scripts)
|
||||
|
||||
// Use the same sorting logic from executeScripts
|
||||
for i := 0; i < len(sortedScripts)-1; i++ {
|
||||
for j := i + 1; j < len(sortedScripts); j++ {
|
||||
if sortedScripts[i].Priority > sortedScripts[j].Priority ||
|
||||
(sortedScripts[i].Priority == sortedScripts[j].Priority &&
|
||||
sortedScripts[i].Sequence > sortedScripts[j].Sequence) {
|
||||
sortedScripts[i], sortedScripts[j] = sortedScripts[j], sortedScripts[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Expected order after sorting
|
||||
expectedOrder := []string{
|
||||
"script3", // Priority 1, Sequence 1
|
||||
"script4", // Priority 1, Sequence 2
|
||||
"script2", // Priority 1, Sequence 3
|
||||
"script1", // Priority 2, Sequence 1
|
||||
"script6", // Priority 2, Sequence 2
|
||||
"script5", // Priority 3, Sequence 1
|
||||
}
|
||||
|
||||
for i, expected := range expectedOrder {
|
||||
if sortedScripts[i].Name != expected {
|
||||
t.Errorf("Position %d: expected %s, got %s", i, expected, sortedScripts[i].Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Verify priorities are ascending
|
||||
for i := 0; i < len(sortedScripts)-1; i++ {
|
||||
if sortedScripts[i].Priority > sortedScripts[i+1].Priority {
|
||||
t.Errorf("Priority not ascending at position %d: %d > %d",
|
||||
i, sortedScripts[i].Priority, sortedScripts[i+1].Priority)
|
||||
}
|
||||
// Within same priority, sequences should be ascending
|
||||
if sortedScripts[i].Priority == sortedScripts[i+1].Priority &&
|
||||
sortedScripts[i].Sequence > sortedScripts[i+1].Sequence {
|
||||
t.Errorf("Sequence not ascending at position %d with same priority %d: %d > %d",
|
||||
i, sortedScripts[i].Priority, sortedScripts[i].Sequence, sortedScripts[i+1].Sequence)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriter_WriteSchema_EmptyScripts(t *testing.T) {
|
||||
// This test verifies that writing an empty script list doesn't cause errors
|
||||
// even without a database connection (should return early)
|
||||
writer := NewWriter(&writers.WriterOptions{
|
||||
Metadata: map[string]any{
|
||||
"connection_string": "postgres://invalid/test",
|
||||
},
|
||||
})
|
||||
|
||||
schema := &models.Schema{
|
||||
Name: "public",
|
||||
Scripts: []*models.Script{},
|
||||
}
|
||||
|
||||
// Note: This will try to connect even with empty scripts
|
||||
// In a real scenario, the executeScripts function returns early for empty scripts
|
||||
// but the connection is made before that. This test documents the behavior.
|
||||
err := writer.WriteSchema(schema)
|
||||
// We expect a connection error since we're using an invalid connection string
|
||||
if err == nil {
|
||||
t.Error("Expected connection error, got nil")
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: Integration tests for actual database execution should be added separately
|
||||
// Those tests would require:
|
||||
// 1. A running PostgreSQL instance
|
||||
// 2. Test database setup/teardown
|
||||
// 3. Verification of actual script execution
|
||||
// 4. Testing error handling during execution
|
||||
// 5. Testing transaction behavior if added
|
||||
//
|
||||
// Example integration test structure:
|
||||
// func TestWriter_Integration_ExecuteScripts(t *testing.T) {
|
||||
// if testing.Short() {
|
||||
// t.Skip("Skipping integration test")
|
||||
// }
|
||||
// // Setup test database
|
||||
// // Create test scripts
|
||||
// // Execute scripts
|
||||
// // Verify results
|
||||
// // Cleanup
|
||||
// }
|
||||
Reference in New Issue
Block a user