Add ConnectPlain to open the DB without the custom search_path and switch migration and CLI flows to run on that plain connection. Wrap multi-statement goose migrations with StatementBegin/End to ensure statements are executed atomically. Move Stripe price outbox seeding into a dedicated stripe migration.
144 lines
4.7 KiB
Go
144 lines
4.7 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "github.com/jackc/pgx/v5/stdlib"
|
|
)
|
|
|
|
// DBConfig groups the connection string and the connection-pool limits used
// when opening the database.
type DBConfig struct {
	// DSN is the Data Source Name (PostgreSQL connection string).
	DSN string
	// MaxOpenConns is the maximum number of open connections.
	MaxOpenConns int
	// MaxIdleConns is the maximum number of idle connections.
	MaxIdleConns int
	// ConnMaxLifetime is the maximum lifetime of a connection.
	ConnMaxLifetime time.Duration
	// ConnMaxIdleTime is the maximum time a connection may sit idle.
	ConnMaxIdleTime time.Duration
}
|
|
|
|
// DefaultDBConfig returns a DBConfig with sensible defaults for PostgreSQL.
|
|
func DefaultDBConfig(dsn string) *DBConfig {
|
|
return &DBConfig{
|
|
DSN: dsn,
|
|
MaxOpenConns: 25,
|
|
MaxIdleConns: 10,
|
|
ConnMaxLifetime: 30 * time.Minute,
|
|
ConnMaxIdleTime: 5 * time.Minute,
|
|
}
|
|
}
|
|
|
|
// ensureSearchPath returns dsn with a search_path listing all module schemas
// added when none is present, so every pooled connection resolves
// module-schema tables without qualification (Decision 113).
//
// A DSN that parses as a URL with a scheme is treated as URI format and gets
// the setting as a query parameter; the URL is re-serialized either way.
// Anything else is treated as key=value format: the setting is appended after
// a space unless the text "search_path" already occurs somewhere in the DSN.
func ensureSearchPath(dsn string) string {
	const (
		param   = "search_path"
		schemas = "identity,organization,entitlements,billing,cooperative,audit,integration,stripe,public"
	)

	parsed, parseErr := url.Parse(dsn)
	isURI := parseErr == nil && parsed.Scheme != ""

	switch {
	case isURI:
		query := parsed.Query()
		if query.Get(param) == "" {
			query.Set(param, schemas)
			parsed.RawQuery = query.Encode()
		}
		return parsed.String()
	case strings.Contains(dsn, param):
		// Key=value DSN that already mentions search_path: leave untouched.
		return dsn
	default:
		return dsn + " " + param + "=" + schemas
	}
}
|
|
|
|
// openAndConfigureDB opens the database connection and configures the connection pool.
|
|
func openAndConfigureDB(config *DBConfig) (*sql.DB, error) {
|
|
db, err := sql.Open("pgx", ensureSearchPath(config.DSN))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
|
}
|
|
|
|
// Configure connection pool
|
|
db.SetMaxOpenConns(config.MaxOpenConns)
|
|
db.SetMaxIdleConns(config.MaxIdleConns)
|
|
db.SetConnMaxLifetime(config.ConnMaxLifetime)
|
|
db.SetConnMaxIdleTime(config.ConnMaxIdleTime)
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// ConnectPlain opens a database connection without the custom search_path.
|
|
// Use this for migrations — the custom search_path causes a pgx/PG18
|
|
// interaction that silently drops tables from multi-statement DDL.
|
|
func ConnectPlain(ctx context.Context, logger *slog.Logger, config *DBConfig) (*sql.DB, error) {
|
|
db, err := sql.Open("pgx", config.DSN)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to open database: %w", err)
|
|
}
|
|
db.SetMaxOpenConns(config.MaxOpenConns)
|
|
db.SetMaxIdleConns(config.MaxIdleConns)
|
|
db.SetConnMaxLifetime(config.ConnMaxLifetime)
|
|
db.SetConnMaxIdleTime(config.ConnMaxIdleTime)
|
|
if err := db.PingContext(ctx); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("failed to connect to database: %w", err)
|
|
}
|
|
logger.Info("database connection established (plain)",
|
|
slog.String("dsn", config.DSN))
|
|
return db, nil
|
|
}
|
|
|
|
// Connect initializes and returns a new database connection pool.
|
|
// This is the basic connection function without any automatic operations.
|
|
func Connect(ctx context.Context, logger *slog.Logger, config *DBConfig) (*sql.DB, error) {
|
|
db, err := openAndConfigureDB(config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := db.PingContext(ctx); err != nil {
|
|
db.Close()
|
|
return nil, fmt.Errorf("failed to connect to database: %w", err)
|
|
}
|
|
|
|
logger.Info("database connection established",
|
|
slog.String("dsn", config.DSN),
|
|
slog.Int("max_open_conns", config.MaxOpenConns),
|
|
slog.Int("max_idle_conns", config.MaxIdleConns))
|
|
|
|
return db, nil
|
|
}
|
|
|
|
// ConnectAndMigrate initializes a database connection and automatically runs migrations.
|
|
// The sources parameter specifies migration sources in dependency order.
|
|
//
|
|
// Migrations run on a separate connection WITHOUT the custom search_path to
|
|
// avoid a pgx/PG18 interaction where setting module schemas in the search_path
|
|
// at connection time causes multi-statement DDL to silently lose tables. After
|
|
// migrations complete, the returned connection uses the full search_path for
|
|
// application queries.
|
|
func ConnectAndMigrate(ctx context.Context, logger *slog.Logger, config *DBConfig, sources []MigrationSource) (*sql.DB, error) {
|
|
// Run migrations on a plain connection (no custom search_path).
|
|
migDB, err := ConnectPlain(ctx, logger, config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := RunMigrations(migDB, sources); err != nil {
|
|
migDB.Close()
|
|
return nil, fmt.Errorf("failed to run migrations: %w", err)
|
|
}
|
|
migDB.Close()
|
|
|
|
// Now open the application connection with full search_path.
|
|
db, err := Connect(ctx, logger, config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
logger.Info("database migrations applied")
|
|
return db, nil
|
|
}
|