All checks were successful
- Implement SQLite DDL writer to convert PostgreSQL schemas to SQLite-compatible SQL statements. - Include automatic schema flattening, type mapping, auto-increment detection, and function translation. - Add templates for creating tables, indexes, unique constraints, check constraints, and foreign keys. - Implement tests for writer functionality and data type mapping.
419 lines
8.7 KiB
Go
419 lines
8.7 KiB
Go
package sqlite
|
|
|
|
import (
|
|
"bytes"
|
|
"strings"
|
|
"testing"
|
|
|
|
"git.warky.dev/wdevs/relspecgo/pkg/models"
|
|
"git.warky.dev/wdevs/relspecgo/pkg/writers"
|
|
)
|
|
|
|
func TestNewWriter(t *testing.T) {
|
|
opts := &writers.WriterOptions{
|
|
OutputPath: "/tmp/test.sql",
|
|
FlattenSchema: false, // Should be forced to true
|
|
}
|
|
|
|
writer := NewWriter(opts)
|
|
|
|
if !writer.options.FlattenSchema {
|
|
t.Error("Expected FlattenSchema to be forced to true for SQLite")
|
|
}
|
|
}
|
|
|
|
func TestWriteDatabase(t *testing.T) {
|
|
db := &models.Database{
|
|
Name: "testdb",
|
|
Schemas: []*models.Schema{
|
|
{
|
|
Name: "public",
|
|
Tables: []*models.Table{
|
|
{
|
|
Name: "users",
|
|
Columns: map[string]*models.Column{
|
|
"id": {
|
|
Name: "id",
|
|
Type: "serial",
|
|
NotNull: true,
|
|
IsPrimaryKey: true,
|
|
Default: "nextval('users_id_seq'::regclass)",
|
|
},
|
|
"email": {
|
|
Name: "email",
|
|
Type: "varchar(255)",
|
|
NotNull: true,
|
|
},
|
|
"active": {
|
|
Name: "active",
|
|
Type: "boolean",
|
|
NotNull: true,
|
|
Default: "true",
|
|
},
|
|
},
|
|
Constraints: map[string]*models.Constraint{
|
|
"pk_users": {
|
|
Name: "pk_users",
|
|
Type: models.PrimaryKeyConstraint,
|
|
Columns: []string{"id"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
opts := &writers.WriterOptions{}
|
|
writer := NewWriter(opts)
|
|
writer.writer = &buf
|
|
|
|
err := writer.WriteDatabase(db)
|
|
if err != nil {
|
|
t.Fatalf("WriteDatabase failed: %v", err)
|
|
}
|
|
|
|
output := buf.String()
|
|
|
|
// Check for expected elements
|
|
if !strings.Contains(output, "PRAGMA foreign_keys = ON") {
|
|
t.Error("Expected PRAGMA foreign_keys statement")
|
|
}
|
|
|
|
if !strings.Contains(output, "CREATE TABLE") {
|
|
t.Error("Expected CREATE TABLE statement")
|
|
}
|
|
|
|
if !strings.Contains(output, "\"public_users\"") {
|
|
t.Error("Expected flattened table name public_users")
|
|
}
|
|
|
|
if !strings.Contains(output, "INTEGER PRIMARY KEY AUTOINCREMENT") {
|
|
t.Error("Expected autoincrement for serial primary key")
|
|
}
|
|
|
|
if !strings.Contains(output, "TEXT") {
|
|
t.Error("Expected TEXT type for varchar")
|
|
}
|
|
|
|
// Boolean should be mapped to INTEGER with default 1
|
|
if !strings.Contains(output, "active") {
|
|
t.Error("Expected active column")
|
|
}
|
|
}
|
|
|
|
func TestDataTypeMapping(t *testing.T) {
|
|
tests := []struct {
|
|
pgType string
|
|
expected string
|
|
}{
|
|
{"varchar(255)", "TEXT"},
|
|
{"text", "TEXT"},
|
|
{"integer", "INTEGER"},
|
|
{"bigint", "INTEGER"},
|
|
{"serial", "INTEGER"},
|
|
{"boolean", "INTEGER"},
|
|
{"real", "REAL"},
|
|
{"double precision", "REAL"},
|
|
{"numeric(10,2)", "NUMERIC"},
|
|
{"decimal", "NUMERIC"},
|
|
{"bytea", "BLOB"},
|
|
{"timestamp", "TEXT"},
|
|
{"uuid", "TEXT"},
|
|
{"json", "TEXT"},
|
|
{"jsonb", "TEXT"},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
result := MapPostgreSQLType(tt.pgType)
|
|
if result != tt.expected {
|
|
t.Errorf("MapPostgreSQLType(%q) = %q, want %q", tt.pgType, result, tt.expected)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestIsAutoIncrementCandidate(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
col *models.Column
|
|
expected bool
|
|
}{
|
|
{
|
|
name: "serial primary key",
|
|
col: &models.Column{
|
|
Name: "id",
|
|
Type: "serial",
|
|
IsPrimaryKey: true,
|
|
Default: "nextval('seq')",
|
|
},
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "integer primary key with nextval",
|
|
col: &models.Column{
|
|
Name: "id",
|
|
Type: "integer",
|
|
IsPrimaryKey: true,
|
|
Default: "nextval('users_id_seq'::regclass)",
|
|
},
|
|
expected: true,
|
|
},
|
|
{
|
|
name: "integer not primary key",
|
|
col: &models.Column{
|
|
Name: "count",
|
|
Type: "integer",
|
|
IsPrimaryKey: false,
|
|
Default: "0",
|
|
},
|
|
expected: false,
|
|
},
|
|
{
|
|
name: "varchar primary key",
|
|
col: &models.Column{
|
|
Name: "code",
|
|
Type: "varchar",
|
|
IsPrimaryKey: true,
|
|
},
|
|
expected: false,
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
result := IsAutoIncrementCandidate(tt.col)
|
|
if result != tt.expected {
|
|
t.Errorf("IsAutoIncrementCandidate() = %v, want %v", result, tt.expected)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestFormatDefault(t *testing.T) {
|
|
tests := []struct {
|
|
name string
|
|
col *models.Column
|
|
expected string
|
|
}{
|
|
{
|
|
name: "current_timestamp",
|
|
col: &models.Column{
|
|
Type: "timestamp",
|
|
Default: "CURRENT_TIMESTAMP",
|
|
},
|
|
expected: "CURRENT_TIMESTAMP",
|
|
},
|
|
{
|
|
name: "now()",
|
|
col: &models.Column{
|
|
Type: "timestamp",
|
|
Default: "now()",
|
|
},
|
|
expected: "CURRENT_TIMESTAMP",
|
|
},
|
|
{
|
|
name: "boolean true",
|
|
col: &models.Column{
|
|
Type: "boolean",
|
|
Default: "true",
|
|
},
|
|
expected: "1",
|
|
},
|
|
{
|
|
name: "boolean false",
|
|
col: &models.Column{
|
|
Type: "boolean",
|
|
Default: "false",
|
|
},
|
|
expected: "0",
|
|
},
|
|
{
|
|
name: "serial autoincrement",
|
|
col: &models.Column{
|
|
Type: "serial",
|
|
IsPrimaryKey: true,
|
|
Default: "nextval('seq')",
|
|
},
|
|
expected: "",
|
|
},
|
|
{
|
|
name: "uuid default removed",
|
|
col: &models.Column{
|
|
Type: "uuid",
|
|
Default: "gen_random_uuid()",
|
|
},
|
|
expected: "",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
result := FormatDefault(tt.col)
|
|
if result != tt.expected {
|
|
t.Errorf("FormatDefault() = %q, want %q", result, tt.expected)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestWriteSchema_MultiSchema(t *testing.T) {
|
|
db := &models.Database{
|
|
Name: "testdb",
|
|
Schemas: []*models.Schema{
|
|
{
|
|
Name: "auth",
|
|
Tables: []*models.Table{
|
|
{
|
|
Name: "sessions",
|
|
Columns: map[string]*models.Column{
|
|
"id": {
|
|
Name: "id",
|
|
Type: "uuid",
|
|
NotNull: true,
|
|
IsPrimaryKey: true,
|
|
},
|
|
},
|
|
Constraints: map[string]*models.Constraint{
|
|
"pk_sessions": {
|
|
Name: "pk_sessions",
|
|
Type: models.PrimaryKeyConstraint,
|
|
Columns: []string{"id"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
{
|
|
Name: "public",
|
|
Tables: []*models.Table{
|
|
{
|
|
Name: "posts",
|
|
Columns: map[string]*models.Column{
|
|
"id": {
|
|
Name: "id",
|
|
Type: "integer",
|
|
NotNull: true,
|
|
IsPrimaryKey: true,
|
|
},
|
|
},
|
|
Constraints: map[string]*models.Constraint{
|
|
"pk_posts": {
|
|
Name: "pk_posts",
|
|
Type: models.PrimaryKeyConstraint,
|
|
Columns: []string{"id"},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
opts := &writers.WriterOptions{}
|
|
writer := NewWriter(opts)
|
|
writer.writer = &buf
|
|
|
|
err := writer.WriteDatabase(db)
|
|
if err != nil {
|
|
t.Fatalf("WriteDatabase failed: %v", err)
|
|
}
|
|
|
|
output := buf.String()
|
|
|
|
// Check for flattened table names from both schemas
|
|
if !strings.Contains(output, "\"auth_sessions\"") {
|
|
t.Error("Expected flattened table name auth_sessions")
|
|
}
|
|
|
|
if !strings.Contains(output, "\"public_posts\"") {
|
|
t.Error("Expected flattened table name public_posts")
|
|
}
|
|
}
|
|
|
|
func TestWriteIndexes(t *testing.T) {
|
|
table := &models.Table{
|
|
Name: "users",
|
|
Columns: map[string]*models.Column{
|
|
"email": {
|
|
Name: "email",
|
|
Type: "varchar(255)",
|
|
},
|
|
},
|
|
Indexes: map[string]*models.Index{
|
|
"idx_users_email": {
|
|
Name: "idx_users_email",
|
|
Columns: []string{"email"},
|
|
},
|
|
},
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
opts := &writers.WriterOptions{}
|
|
writer := NewWriter(opts)
|
|
writer.writer = &buf
|
|
|
|
err := writer.writeIndexes("public", table)
|
|
if err != nil {
|
|
t.Fatalf("writeIndexes failed: %v", err)
|
|
}
|
|
|
|
output := buf.String()
|
|
|
|
if !strings.Contains(output, "CREATE INDEX") {
|
|
t.Error("Expected CREATE INDEX statement")
|
|
}
|
|
|
|
if !strings.Contains(output, "public_users_idx_users_email") {
|
|
t.Errorf("Expected flattened index name public_users_idx_users_email, got output:\n%s", output)
|
|
}
|
|
}
|
|
|
|
func TestWriteUniqueConstraints(t *testing.T) {
|
|
table := &models.Table{
|
|
Name: "users",
|
|
Constraints: map[string]*models.Constraint{
|
|
"uk_users_email": {
|
|
Name: "uk_users_email",
|
|
Type: models.UniqueConstraint,
|
|
Columns: []string{"email"},
|
|
},
|
|
},
|
|
}
|
|
|
|
var buf bytes.Buffer
|
|
opts := &writers.WriterOptions{}
|
|
writer := NewWriter(opts)
|
|
writer.writer = &buf
|
|
|
|
err := writer.writeUniqueConstraints("public", table)
|
|
if err != nil {
|
|
t.Fatalf("writeUniqueConstraints failed: %v", err)
|
|
}
|
|
|
|
output := buf.String()
|
|
|
|
if !strings.Contains(output, "CREATE UNIQUE INDEX") {
|
|
t.Error("Expected CREATE UNIQUE INDEX statement")
|
|
}
|
|
}
|
|
|
|
func TestQuoteIdentifier(t *testing.T) {
|
|
tests := []struct {
|
|
input string
|
|
expected string
|
|
}{
|
|
{"users", `"users"`},
|
|
{"public_users", `"public_users"`},
|
|
{`user"name`, `"user""name"`}, // Double quotes should be escaped
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
result := QuoteIdentifier(tt.input)
|
|
if result != tt.expected {
|
|
t.Errorf("QuoteIdentifier(%q) = %q, want %q", tt.input, result, tt.expected)
|
|
}
|
|
}
|
|
}
|