Files
relspecgo/pkg/inspector/validators.go
Hein 97a57f5dc8
Some checks failed
CI / Test (1.24) (push) Successful in -25m44s
CI / Test (1.25) (push) Successful in -25m40s
CI / Build (push) Successful in -25m53s
CI / Lint (push) Successful in -25m45s
Integration Tests / Integration Tests (push) Failing after -26m2s
feature: Inspector Gadget
2025-12-31 01:40:08 +02:00

604 lines
15 KiB
Go

package inspector
import (
"regexp"
"strings"
"git.warky.dev/wdevs/relspecgo/pkg/models"
"git.warky.dev/wdevs/relspecgo/pkg/pgsql"
)
// validatePrimaryKeyNaming checks that primary key column names match a pattern
func validatePrimaryKeyNaming(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
pattern, err := regexp.Compile(rule.Pattern)
if err != nil {
return results
}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
for _, col := range table.Columns {
if col.IsPrimaryKey {
location := formatLocation(schema.Name, table.Name, col.Name)
passed := pattern.MatchString(col.Name)
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"column": col.Name,
"expected_pattern": rule.Pattern,
},
))
}
}
}
}
return results
}
// validatePrimaryKeyDatatype checks that primary keys use approved data types
func validatePrimaryKeyDatatype(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
for _, col := range table.Columns {
if col.IsPrimaryKey {
location := formatLocation(schema.Name, table.Name, col.Name)
// Normalize type (remove size/precision)
normalizedType := normalizeDataType(col.Type)
passed := contains(rule.AllowedTypes, normalizedType)
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"column": col.Name,
"current_type": col.Type,
"allowed_types": rule.AllowedTypes,
},
))
}
}
}
}
return results
}
// validatePrimaryKeyAutoIncrement checks primary key auto-increment settings
func validatePrimaryKeyAutoIncrement(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
for _, col := range table.Columns {
if col.IsPrimaryKey {
location := formatLocation(schema.Name, table.Name, col.Name)
// Check if auto-increment matches requirement
passed := col.AutoIncrement == rule.RequireAutoIncrement
if !passed {
results = append(results, createResult(
ruleName,
false,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"column": col.Name,
"has_auto_increment": col.AutoIncrement,
"require_auto_increment": rule.RequireAutoIncrement,
},
))
}
}
}
}
}
return results
}
// validateForeignKeyColumnNaming checks that foreign key column names match a pattern
func validateForeignKeyColumnNaming(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
pattern, err := regexp.Compile(rule.Pattern)
if err != nil {
return results
}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
// Check foreign key constraints
for _, constraint := range table.Constraints {
if constraint.Type == models.ForeignKeyConstraint {
for _, colName := range constraint.Columns {
location := formatLocation(schema.Name, table.Name, colName)
passed := pattern.MatchString(colName)
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"column": colName,
"constraint": constraint.Name,
"expected_pattern": rule.Pattern,
},
))
}
}
}
}
}
return results
}
// validateForeignKeyConstraintNaming checks that foreign key constraint names match a pattern
func validateForeignKeyConstraintNaming(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
pattern, err := regexp.Compile(rule.Pattern)
if err != nil {
return results
}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
for _, constraint := range table.Constraints {
if constraint.Type == models.ForeignKeyConstraint {
location := formatLocation(schema.Name, table.Name, "")
passed := pattern.MatchString(constraint.Name)
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"constraint": constraint.Name,
"expected_pattern": rule.Pattern,
},
))
}
}
}
}
return results
}
// validateForeignKeyIndex checks that foreign key columns have indexes
func validateForeignKeyIndex(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
if !rule.RequireIndex {
return results
}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
// Get all foreign key columns
fkColumns := make(map[string]bool)
for _, constraint := range table.Constraints {
if constraint.Type == models.ForeignKeyConstraint {
for _, col := range constraint.Columns {
fkColumns[col] = true
}
}
}
// Check if each FK column has an index
for fkCol := range fkColumns {
hasIndex := false
// Check table indexes
for _, index := range table.Indexes {
// Index is good if FK column is the first column
if len(index.Columns) > 0 && index.Columns[0] == fkCol {
hasIndex = true
break
}
}
location := formatLocation(schema.Name, table.Name, fkCol)
results = append(results, createResult(
ruleName,
hasIndex,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"column": fkCol,
"has_index": hasIndex,
},
))
}
}
}
return results
}
// validateTableNamingCase checks table name casing
func validateTableNamingCase(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
pattern, err := regexp.Compile(rule.Pattern)
if err != nil {
return results
}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
location := formatLocation(schema.Name, table.Name, "")
passed := pattern.MatchString(table.Name)
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"expected_case": rule.Case,
"expected_pattern": rule.Pattern,
},
))
}
}
return results
}
// validateColumnNamingCase checks column name casing
func validateColumnNamingCase(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
pattern, err := regexp.Compile(rule.Pattern)
if err != nil {
return results
}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
for _, col := range table.Columns {
location := formatLocation(schema.Name, table.Name, col.Name)
passed := pattern.MatchString(col.Name)
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"column": col.Name,
"expected_case": rule.Case,
"expected_pattern": rule.Pattern,
},
))
}
}
}
return results
}
// validateTableNameLength checks table name length
func validateTableNameLength(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
location := formatLocation(schema.Name, table.Name, "")
passed := len(table.Name) <= rule.MaxLength
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"length": len(table.Name),
"max_length": rule.MaxLength,
},
))
}
}
return results
}
// validateColumnNameLength checks column name length
func validateColumnNameLength(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
for _, col := range table.Columns {
location := formatLocation(schema.Name, table.Name, col.Name)
passed := len(col.Name) <= rule.MaxLength
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"column": col.Name,
"length": len(col.Name),
"max_length": rule.MaxLength,
},
))
}
}
}
return results
}
// validateReservedKeywords checks for reserved SQL keywords
func validateReservedKeywords(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
// Build keyword map from PostgreSQL keywords
keywordSlice := pgsql.GetPostgresKeywords()
keywords := make(map[string]bool)
for _, kw := range keywordSlice {
keywords[strings.ToUpper(kw)] = true
}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
// Check table name
if rule.CheckTables {
location := formatLocation(schema.Name, table.Name, "")
passed := !keywords[strings.ToUpper(table.Name)]
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"object_type": "table",
},
))
}
// Check column names
if rule.CheckColumns {
for _, col := range table.Columns {
location := formatLocation(schema.Name, table.Name, col.Name)
passed := !keywords[strings.ToUpper(col.Name)]
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"column": col.Name,
"object_type": "column",
},
))
}
}
}
}
return results
}
// validateMissingPrimaryKey checks for tables without primary keys
func validateMissingPrimaryKey(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
hasPrimaryKey := false
// Check columns for primary key
for _, col := range table.Columns {
if col.IsPrimaryKey {
hasPrimaryKey = true
break
}
}
// Also check constraints
if !hasPrimaryKey {
for _, constraint := range table.Constraints {
if constraint.Type == models.PrimaryKeyConstraint {
hasPrimaryKey = true
break
}
}
}
location := formatLocation(schema.Name, table.Name, "")
results = append(results, createResult(
ruleName,
hasPrimaryKey,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
},
))
}
}
return results
}
// validateOrphanedForeignKey checks for foreign keys referencing non-existent tables
func validateOrphanedForeignKey(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
// Build a map of existing tables for quick lookup
tableExists := make(map[string]bool)
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
key := schema.Name + "." + table.Name
tableExists[key] = true
}
}
// Check all foreign key constraints
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
for _, constraint := range table.Constraints {
if constraint.Type == models.ForeignKeyConstraint {
// Build referenced table key
refSchema := constraint.ReferencedSchema
if refSchema == "" {
refSchema = schema.Name
}
refKey := refSchema + "." + constraint.ReferencedTable
location := formatLocation(schema.Name, table.Name, "")
passed := tableExists[refKey]
results = append(results, createResult(
ruleName,
passed,
rule.Message,
location,
map[string]interface{}{
"schema": schema.Name,
"table": table.Name,
"constraint": constraint.Name,
"referenced_schema": refSchema,
"referenced_table": constraint.ReferencedTable,
},
))
}
}
}
}
return results
}
// validateCircularDependency checks for circular foreign key dependencies
func validateCircularDependency(db *models.Database, rule Rule, ruleName string) []ValidationResult {
results := []ValidationResult{}
// Build dependency graph
dependencies := make(map[string][]string)
for _, schema := range db.Schemas {
for _, table := range schema.Tables {
tableKey := schema.Name + "." + table.Name
for _, constraint := range table.Constraints {
if constraint.Type == models.ForeignKeyConstraint {
refSchema := constraint.ReferencedSchema
if refSchema == "" {
refSchema = schema.Name
}
refKey := refSchema + "." + constraint.ReferencedTable
dependencies[tableKey] = append(dependencies[tableKey], refKey)
}
}
}
}
// Check for cycles using DFS
for tableKey := range dependencies {
visited := make(map[string]bool)
recStack := make(map[string]bool)
if hasCycle(tableKey, dependencies, visited, recStack) {
parts := strings.Split(tableKey, ".")
location := formatLocation(parts[0], parts[1], "")
results = append(results, createResult(
ruleName,
false,
rule.Message,
location,
map[string]interface{}{
"schema": parts[0],
"table": parts[1],
},
))
}
}
return results
}
// Helper functions
// hasCycle performs DFS to detect cycles in dependency graph
func hasCycle(node string, graph map[string][]string, visited, recStack map[string]bool) bool {
visited[node] = true
recStack[node] = true
for _, neighbor := range graph[node] {
if !visited[neighbor] {
if hasCycle(neighbor, graph, visited, recStack) {
return true
}
} else if recStack[neighbor] {
return true
}
}
recStack[node] = false
return false
}
// normalizeDataType removes size/precision from data type
func normalizeDataType(dataType string) string {
// Remove everything in parentheses
idx := strings.Index(dataType, "(")
if idx > 0 {
dataType = dataType[:idx]
}
return strings.ToLower(strings.TrimSpace(dataType))
}
// contains checks if a string slice contains a value
func contains(slice []string, value string) bool {
for _, item := range slice {
if strings.EqualFold(item, value) {
return true
}
}
return false
}