Files
member-console/internal/workflows/worker.go
Christian Galo 667e9ffe24 Add plan ladders and pool provision transitions
Introduce DB migrations for ladder and pool-attachment tables and an
audit log for provision transitions. Make product_type nullable and add
lifecycle_status plus a product_kinds view. Implement Transition and
ReapplyDefaultsForPool primitives, SQLC queries/models, webhook and
Temporal workflow integration, and accompanying unit/integration tests.
2026-04-19 20:45:56 -05:00

139 lines
4.3 KiB
Go

package workflows
import (
"context"
"database/sql"
"fmt"
"log/slog"
"git.coopcloud.tech/wiki-cafe/member-console/internal/entitlements"
fwmod "git.coopcloud.tech/wiki-cafe/member-console/internal/fedwiki"
wfEnt "git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/entitlements"
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/example"
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/fedwiki"
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/queues"
wfStripe "git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/stripe"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
// WorkerConfig holds configuration for the embedded Temporal worker.
// NewWorker reads it both to build the worker options and to construct the
// activity sets it registers.
type WorkerConfig struct {
// TaskQueue is the Temporal task queue the worker polls. NewWorker
// substitutes queues.Main when it is empty.
TaskQueue string
// SiteQ is forwarded to the FedWiki activities.
SiteQ fwmod.Querier
// EntitlementsQ is forwarded to the FedWiki activities.
EntitlementsQ entitlements.Querier
// Database is forwarded to the FedWiki, Stripe and entitlements activities.
Database *sql.DB
// Logger is forwarded to every activity set and used for worker lifecycle
// messages; the lifecycle logging itself is skipped when it is nil.
Logger *slog.Logger
// MaxConcurrentActivities caps concurrent activity executions. Values <= 0
// are not applied, leaving the Temporal SDK default in effect.
MaxConcurrentActivities int
// MaxConcurrentWorkflows caps concurrent workflow task executions. Values
// <= 0 are not applied, leaving the Temporal SDK default in effect.
MaxConcurrentWorkflows int
// FedWiki configuration
FedWikiFarmAPIURL string // URL for FarmManager API calls
FedWikiAllowedDomains []string // Domains where users can create sites
// FedWikiAdminToken is forwarded to the FedWiki activities.
// NOTE(review): presumably the credential for the FarmManager API — confirm.
FedWikiAdminToken string
// SupportURL is forwarded to the FedWiki activities.
SupportURL string
}
// DefaultWorkerConfig builds a WorkerConfig around the supplied dependencies.
// The result targets queues.Main and sets both concurrency limits to 1000.
// The FedWiki settings and SupportURL are left at their zero values for the
// caller to fill in.
func DefaultWorkerConfig(siteQ fwmod.Querier, entQ entitlements.Querier, database *sql.DB, logger *slog.Logger) WorkerConfig {
	// Shared default for the activity and workflow concurrency limits.
	const defaultConcurrency = 1000

	var cfg WorkerConfig

	// Injected dependencies.
	cfg.SiteQ = siteQ
	cfg.EntitlementsQ = entQ
	cfg.Database = database
	cfg.Logger = logger

	// Worker defaults.
	cfg.TaskQueue = queues.Main
	cfg.MaxConcurrentActivities = defaultConcurrency
	cfg.MaxConcurrentWorkflows = defaultConcurrency

	return cfg
}
// Worker wraps a Temporal worker with application-specific setup.
// Instances are built by NewWorker and driven through Start, Stop and
// GracefulStop.
type Worker struct {
// worker is the underlying Temporal SDK worker that the lifecycle methods
// delegate to.
worker worker.Worker
// logger receives lifecycle messages; they are skipped when it is nil.
logger *slog.Logger
}
// NewWorker creates and configures a new embedded Temporal worker.
//
// The worker polls cfg.TaskQueue (queues.Main when that field is empty) and
// has the workflows and activities of every domain registered on it: example,
// FedWiki, Stripe/integration and entitlements. Concurrency limits that are
// not positive are not applied, which leaves the Temporal SDK defaults in
// effect.
//
// An error is returned when cfg.MaxConcurrentWorkflows is 1. The Temporal Go
// SDK documents that MaxConcurrentWorkflowTaskExecutionSize cannot be 1 and
// that the worker panics on that value, so the misconfiguration is reported
// to the caller instead of crashing the process.
//
// The returned Worker has not been started; call Start to begin polling.
func NewWorker(c client.Client, cfg WorkerConfig) (*Worker, error) {
	if cfg.TaskQueue == "" {
		cfg.TaskQueue = queues.Main
	}

	// Reject the one positive value the SDK refuses, before worker.New can
	// panic on it.
	if cfg.MaxConcurrentWorkflows == 1 {
		return nil, fmt.Errorf("invalid worker config for task queue %q: MaxConcurrentWorkflows must not be 1", cfg.TaskQueue)
	}

	// Only override the SDK defaults when an explicit positive limit is given.
	opts := worker.Options{}
	if cfg.MaxConcurrentActivities > 0 {
		opts.MaxConcurrentActivityExecutionSize = cfg.MaxConcurrentActivities
	}
	if cfg.MaxConcurrentWorkflows > 0 {
		opts.MaxConcurrentWorkflowTaskExecutionSize = cfg.MaxConcurrentWorkflows
	}

	w := worker.New(c, cfg.TaskQueue, opts)

	// --- Domain Registration ---
	// This is where we wire up the different domains to this worker.

	// 1. Example Domain
	w.RegisterWorkflow(example.Workflow)
	w.RegisterActivity(example.NewActivities(cfg.Logger))

	// 2. FedWiki Domain
	w.RegisterWorkflow(fedwiki.CreateFedWikiSiteWorkflow)
	w.RegisterWorkflow(fedwiki.DeleteFedWikiSiteWorkflow)
	w.RegisterWorkflow(fedwiki.SyncFedWikiSitesWorkflow)
	w.RegisterActivity(fedwiki.NewActivities(fedwiki.ActivitiesConfig{
		SiteQ:                 cfg.SiteQ,
		EntitlementsQ:         cfg.EntitlementsQ,
		Database:              cfg.Database,
		Logger:                cfg.Logger,
		FedWikiFarmAPIURL:     cfg.FedWikiFarmAPIURL,
		FedWikiAllowedDomains: cfg.FedWikiAllowedDomains,
		FedWikiAdminToken:     cfg.FedWikiAdminToken,
		SupportURL:            cfg.SupportURL,
	}))

	// 3. Stripe / Integration Domain
	w.RegisterWorkflow(wfStripe.ProcessStripeWebhooks)
	w.RegisterWorkflow(wfStripe.PollIntegrationOutbox)
	w.RegisterActivity(wfStripe.NewWebhookActivities(cfg.Database, cfg.Logger))
	w.RegisterActivity(wfStripe.NewOutboxActivities(cfg.Database, cfg.Logger))

	// 4. Entitlements Domain — grant-expiration workflow invokes Transition
	// at valid_until so trial grants cleanly return the pool to the
	// configured default (or detach if no default is set).
	w.RegisterWorkflow(wfEnt.GrantExpirationWorkflow)
	w.RegisterActivity(wfEnt.NewActivities(cfg.Database, cfg.Logger))

	// Logging is optional: a nil logger simply disables the message.
	if cfg.Logger != nil {
		cfg.Logger.Info("created Temporal worker",
			slog.String("taskQueue", cfg.TaskQueue))
	}

	return &Worker{
		worker: w,
		logger: cfg.Logger,
	}, nil
}
// Start starts the underlying Temporal worker. A start failure is returned
// wrapped with context; on success a message is logged when a logger is
// configured.
func (w *Worker) Start() error {
	err := w.worker.Start()
	if err != nil {
		return fmt.Errorf("failed to start Temporal worker: %w", err)
	}

	// Logging is optional.
	if w.logger == nil {
		return nil
	}
	w.logger.Info("Temporal worker started")
	return nil
}
// Stop stops the underlying Temporal worker and then, when a logger is
// configured, records that it has stopped.
func (w *Worker) Stop() {
	w.worker.Stop()

	// Logging is optional.
	if w.logger == nil {
		return
	}
	w.logger.Info("Temporal worker stopped")
}
// GracefulStop stops the underlying Temporal worker and then, when a logger
// is configured, records that it has stopped gracefully.
//
// NOTE(review): ctx is accepted but never consulted, so this call is not
// bounded by the context's deadline or cancellation and differs from Stop
// only in the log message. Confirm whether callers expect ctx to be honoured.
func (w *Worker) GracefulStop(ctx context.Context) {
	w.worker.Stop()

	if logger := w.logger; logger != nil {
		logger.Info("Temporal worker stopped gracefully")
	}
}