- Introduced MessageCachePage for browsing and managing cached webhook events. - Enhanced DashboardPage to display runtime stats and message cache information. - Added new API types for message cache events and system stats. - Integrated SwaggerPage for API documentation and live request testing.
347 lines
10 KiB
Go
347 lines
10 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"git.warky.dev/wdevs/whatshooked/pkg/models"
|
|
"github.com/uptrace/bun"
|
|
)
|
|
|
|
// Repository provides common CRUD operations
|
|
type Repository[T any] struct {
|
|
db *bun.DB
|
|
}
|
|
|
|
// NewRepository creates a new repository instance
|
|
func NewRepository[T any](db *bun.DB) *Repository[T] {
|
|
return &Repository[T]{db: db}
|
|
}
|
|
|
|
// Create inserts a new record
|
|
func (r *Repository[T]) Create(ctx context.Context, entity *T) error {
|
|
_, err := r.db.NewInsert().Model(entity).Exec(ctx)
|
|
return err
|
|
}
|
|
|
|
// GetByID retrieves a record by ID
|
|
func (r *Repository[T]) GetByID(ctx context.Context, id string) (*T, error) {
|
|
var entity T
|
|
err := r.db.NewSelect().Model(&entity).Where("id = ?", id).Scan(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &entity, nil
|
|
}
|
|
|
|
// Update updates an existing record
|
|
func (r *Repository[T]) Update(ctx context.Context, entity *T) error {
|
|
_, err := r.db.NewUpdate().Model(entity).WherePK().Exec(ctx)
|
|
return err
|
|
}
|
|
|
|
// Delete soft deletes a record by ID (if model has DeletedAt field)
|
|
func (r *Repository[T]) Delete(ctx context.Context, id string) error {
|
|
var entity T
|
|
_, err := r.db.NewDelete().Model(&entity).Where("id = ?", id).Exec(ctx)
|
|
return err
|
|
}
|
|
|
|
// List retrieves all records with optional filtering
|
|
func (r *Repository[T]) List(ctx context.Context, filter map[string]interface{}) ([]T, error) {
|
|
var entities []T
|
|
query := r.db.NewSelect().Model(&entities)
|
|
|
|
for key, value := range filter {
|
|
query = query.Where("? = ?", bun.Ident(key), value)
|
|
}
|
|
|
|
err := query.Scan(ctx)
|
|
return entities, err
|
|
}
|
|
|
|
// Count returns the total number of records matching the filter
|
|
func (r *Repository[T]) Count(ctx context.Context, filter map[string]interface{}) (int, error) {
|
|
query := r.db.NewSelect().Model((*T)(nil))
|
|
|
|
for key, value := range filter {
|
|
query = query.Where("? = ?", bun.Ident(key), value)
|
|
}
|
|
|
|
count, err := query.Count(ctx)
|
|
return count, err
|
|
}
|
|
|
|
// UserRepository provides user-specific operations
|
|
type UserRepository struct {
|
|
*Repository[models.ModelPublicUsers]
|
|
}
|
|
|
|
// NewUserRepository creates a new user repository
|
|
func NewUserRepository(db *bun.DB) *UserRepository {
|
|
return &UserRepository{
|
|
Repository: NewRepository[models.ModelPublicUsers](db),
|
|
}
|
|
}
|
|
|
|
// GetByUsername retrieves a user by username
|
|
func (r *UserRepository) GetByUsername(ctx context.Context, username string) (*models.ModelPublicUsers, error) {
|
|
var user models.ModelPublicUsers
|
|
err := r.db.NewSelect().Model(&user).Where("username = ?", username).Scan(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &user, nil
|
|
}
|
|
|
|
// GetByEmail retrieves a user by email
|
|
func (r *UserRepository) GetByEmail(ctx context.Context, email string) (*models.ModelPublicUsers, error) {
|
|
var user models.ModelPublicUsers
|
|
err := r.db.NewSelect().Model(&user).Where("email = ?", email).Scan(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &user, nil
|
|
}
|
|
|
|
// APIKeyRepository provides API key-specific operations
|
|
type APIKeyRepository struct {
|
|
*Repository[models.ModelPublicAPIKey]
|
|
}
|
|
|
|
// NewAPIKeyRepository creates a new API key repository
|
|
func NewAPIKeyRepository(db *bun.DB) *APIKeyRepository {
|
|
return &APIKeyRepository{
|
|
Repository: NewRepository[models.ModelPublicAPIKey](db),
|
|
}
|
|
}
|
|
|
|
// GetByKey retrieves an API key by its key value
|
|
func (r *APIKeyRepository) GetByKey(ctx context.Context, key string) (*models.ModelPublicAPIKey, error) {
|
|
var apiKey models.ModelPublicAPIKey
|
|
err := r.db.NewSelect().Model(&apiKey).
|
|
Where("key = ? AND active = ?", key, true).
|
|
Scan(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &apiKey, nil
|
|
}
|
|
|
|
// GetByUserID retrieves all API keys for a user
|
|
func (r *APIKeyRepository) GetByUserID(ctx context.Context, userID string) ([]models.ModelPublicAPIKey, error) {
|
|
var apiKeys []models.ModelPublicAPIKey
|
|
err := r.db.NewSelect().Model(&apiKeys).Where("user_id = ?", userID).Scan(ctx)
|
|
return apiKeys, err
|
|
}
|
|
|
|
// UpdateLastUsed updates the last used timestamp for an API key
|
|
func (r *APIKeyRepository) UpdateLastUsed(ctx context.Context, id string) error {
|
|
now := time.Now()
|
|
_, err := r.db.NewUpdate().Model((*models.ModelPublicAPIKey)(nil)).
|
|
Set("last_used_at = ?", now).
|
|
Where("id = ?", id).
|
|
Exec(ctx)
|
|
return err
|
|
}
|
|
|
|
// HookRepository provides hook-specific operations
|
|
type HookRepository struct {
|
|
*Repository[models.ModelPublicHook]
|
|
}
|
|
|
|
// NewHookRepository creates a new hook repository
|
|
func NewHookRepository(db *bun.DB) *HookRepository {
|
|
return &HookRepository{
|
|
Repository: NewRepository[models.ModelPublicHook](db),
|
|
}
|
|
}
|
|
|
|
// GetByUserID retrieves all hooks for a user
|
|
func (r *HookRepository) GetByUserID(ctx context.Context, userID string) ([]models.ModelPublicHook, error) {
|
|
var hooks []models.ModelPublicHook
|
|
err := r.db.NewSelect().Model(&hooks).Where("user_id = ?", userID).Scan(ctx)
|
|
return hooks, err
|
|
}
|
|
|
|
// GetActiveHooks retrieves all active hooks
|
|
func (r *HookRepository) GetActiveHooks(ctx context.Context) ([]models.ModelPublicHook, error) {
|
|
var hooks []models.ModelPublicHook
|
|
err := r.db.NewSelect().Model(&hooks).Where("active = ?", true).Scan(ctx)
|
|
return hooks, err
|
|
}
|
|
|
|
// WhatsAppAccountRepository provides WhatsApp account-specific operations
|
|
type WhatsAppAccountRepository struct {
|
|
*Repository[models.ModelPublicWhatsappAccount]
|
|
}
|
|
|
|
// NewWhatsAppAccountRepository creates a new WhatsApp account repository
|
|
func NewWhatsAppAccountRepository(db *bun.DB) *WhatsAppAccountRepository {
|
|
return &WhatsAppAccountRepository{
|
|
Repository: NewRepository[models.ModelPublicWhatsappAccount](db),
|
|
}
|
|
}
|
|
|
|
// GetByUserID retrieves all WhatsApp accounts for a user
|
|
func (r *WhatsAppAccountRepository) GetByUserID(ctx context.Context, userID string) ([]models.ModelPublicWhatsappAccount, error) {
|
|
var accounts []models.ModelPublicWhatsappAccount
|
|
err := r.db.NewSelect().Model(&accounts).Where("user_id = ?", userID).Scan(ctx)
|
|
return accounts, err
|
|
}
|
|
|
|
// GetByPhoneNumber retrieves an account by phone number
|
|
func (r *WhatsAppAccountRepository) GetByPhoneNumber(ctx context.Context, phoneNumber string) (*models.ModelPublicWhatsappAccount, error) {
|
|
var account models.ModelPublicWhatsappAccount
|
|
err := r.db.NewSelect().Model(&account).Where("phone_number = ?", phoneNumber).Scan(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &account, nil
|
|
}
|
|
|
|
// UpdateConfig updates the config JSON column and phone number for a WhatsApp account
|
|
func (r *WhatsAppAccountRepository) UpdateConfig(ctx context.Context, id string, phoneNumber string, cfgJSON string, active bool) error {
|
|
now := time.Now()
|
|
updated, err := r.updateAccountTable(ctx, id, map[string]any{
|
|
"config": cfgJSON,
|
|
"phone_number": phoneNumber,
|
|
"active": active,
|
|
"updated_at": now,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if updated {
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("no whatsapp account row found for id=%s", id)
|
|
}
|
|
|
|
// UpdateStatus updates the status of a WhatsApp account
|
|
func (r *WhatsAppAccountRepository) UpdateStatus(ctx context.Context, id string, status string) error {
|
|
now := time.Now()
|
|
fields := map[string]any{
|
|
"status": status,
|
|
"updated_at": now,
|
|
}
|
|
if status == "connected" {
|
|
fields["last_connected_at"] = now
|
|
}
|
|
|
|
updated, err := r.updateAccountTable(ctx, id, fields)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if updated {
|
|
return nil
|
|
}
|
|
|
|
return fmt.Errorf("no whatsapp account row found for id=%s", id)
|
|
}
|
|
|
|
func (r *WhatsAppAccountRepository) updateAccountTable(ctx context.Context, id string, fields map[string]any) (bool, error) {
|
|
query := r.db.NewUpdate().Table("whatsapp_account").Where("id = ?", id)
|
|
for column, value := range fields {
|
|
query = query.Set(column+" = ?", value)
|
|
}
|
|
|
|
result, err := query.Exec(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if result == nil {
|
|
return false, nil
|
|
}
|
|
rowsAffected, err := result.RowsAffected()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return rowsAffected > 0, nil
|
|
}
|
|
|
|
// SessionRepository provides session-specific operations
|
|
type SessionRepository struct {
|
|
*Repository[models.ModelPublicSession]
|
|
}
|
|
|
|
// NewSessionRepository creates a new session repository
|
|
func NewSessionRepository(db *bun.DB) *SessionRepository {
|
|
return &SessionRepository{
|
|
Repository: NewRepository[models.ModelPublicSession](db),
|
|
}
|
|
}
|
|
|
|
// GetByToken retrieves a session by token
|
|
func (r *SessionRepository) GetByToken(ctx context.Context, token string) (*models.ModelPublicSession, error) {
|
|
var session models.ModelPublicSession
|
|
err := r.db.NewSelect().Model(&session).
|
|
Where("token = ? AND expires_at > ?", token, time.Now()).
|
|
Scan(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &session, nil
|
|
}
|
|
|
|
// DeleteExpired removes all expired sessions
|
|
func (r *SessionRepository) DeleteExpired(ctx context.Context) error {
|
|
_, err := r.db.NewDelete().Model((*models.ModelPublicSession)(nil)).
|
|
Where("expires_at <= ?", time.Now()).
|
|
Exec(ctx)
|
|
return err
|
|
}
|
|
|
|
// DeleteByUserID removes all sessions for a user
|
|
func (r *SessionRepository) DeleteByUserID(ctx context.Context, userID string) error {
|
|
_, err := r.db.NewDelete().Model((*models.ModelPublicSession)(nil)).
|
|
Where("user_id = ?", userID).
|
|
Exec(ctx)
|
|
return err
|
|
}
|
|
|
|
// EventLogRepository provides event log-specific operations
|
|
type EventLogRepository struct {
|
|
*Repository[models.ModelPublicEventLog]
|
|
}
|
|
|
|
// NewEventLogRepository creates a new event log repository
|
|
func NewEventLogRepository(db *bun.DB) *EventLogRepository {
|
|
return &EventLogRepository{
|
|
Repository: NewRepository[models.ModelPublicEventLog](db),
|
|
}
|
|
}
|
|
|
|
// GetByUserID retrieves event logs for a user
|
|
func (r *EventLogRepository) GetByUserID(ctx context.Context, userID string, limit int) ([]models.ModelPublicEventLog, error) {
|
|
var logs []models.ModelPublicEventLog
|
|
err := r.db.NewSelect().Model(&logs).
|
|
Where("user_id = ?", userID).
|
|
Order("created_at DESC").
|
|
Limit(limit).
|
|
Scan(ctx)
|
|
return logs, err
|
|
}
|
|
|
|
// GetByEntityID retrieves event logs for a specific entity
|
|
func (r *EventLogRepository) GetByEntityID(ctx context.Context, entityType, entityID string, limit int) ([]models.ModelPublicEventLog, error) {
|
|
var logs []models.ModelPublicEventLog
|
|
err := r.db.NewSelect().Model(&logs).
|
|
Where("entity_type = ? AND entity_id = ?", entityType, entityID).
|
|
Order("created_at DESC").
|
|
Limit(limit).
|
|
Scan(ctx)
|
|
return logs, err
|
|
}
|
|
|
|
// DeleteOlderThan removes event logs older than the specified duration
|
|
func (r *EventLogRepository) DeleteOlderThan(ctx context.Context, duration time.Duration) error {
|
|
cutoff := time.Now().Add(-duration)
|
|
_, err := r.db.NewDelete().Model((*models.ModelPublicEventLog)(nil)).
|
|
Where("created_at < ?", cutoff).
|
|
Exec(ctx)
|
|
return err
|
|
}
|