package workflows import ( "context" "database/sql" "fmt" "log/slog" "git.coopcloud.tech/wiki-cafe/member-console/internal/entitlements" fwmod "git.coopcloud.tech/wiki-cafe/member-console/internal/fedwiki" wfEnt "git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/entitlements" "git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/example" "git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/fedwiki" "git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/queues" wfStripe "git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/stripe" "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" ) // WorkerConfig holds configuration for the embedded Temporal worker. type WorkerConfig struct { TaskQueue string SiteQ fwmod.Querier EntitlementsQ entitlements.Querier Database *sql.DB Logger *slog.Logger MaxConcurrentActivities int MaxConcurrentWorkflows int // FedWiki configuration FedWikiFarmAPIURL string // URL for FarmManager API calls FedWikiAllowedDomains []string // Domains where users can create sites FedWikiAdminToken string SupportURL string } // DefaultWorkerConfig returns a WorkerConfig with sensible defaults. func DefaultWorkerConfig(siteQ fwmod.Querier, entQ entitlements.Querier, database *sql.DB, logger *slog.Logger) WorkerConfig { return WorkerConfig{ TaskQueue: queues.Main, SiteQ: siteQ, EntitlementsQ: entQ, Database: database, Logger: logger, MaxConcurrentActivities: 1000, MaxConcurrentWorkflows: 1000, } } // Worker wraps a Temporal worker with application-specific setup. type Worker struct { worker worker.Worker logger *slog.Logger } // NewWorker creates and configures a new embedded Temporal worker. func NewWorker(c client.Client, cfg WorkerConfig) (*Worker, error) { if cfg.TaskQueue == "" { cfg.TaskQueue = queues.Main } opts := worker.Options{} if cfg.MaxConcurrentActivities > 0 { opts.MaxConcurrentActivityExecutionSize = cfg.MaxConcurrentActivities } if cfg.MaxConcurrentWorkflows > 0 { opts.MaxConcurrentWorkflowTaskExecutionSize = cfg.MaxConcurrentWorkflows } w := worker.New(c, cfg.TaskQueue, opts) // --- Domain Registration --- // This is where we wire up the different domains to this worker. // 1. Example Domain w.RegisterWorkflow(example.Workflow) w.RegisterActivity(example.NewActivities(cfg.Logger)) // 2. FedWiki Domain w.RegisterWorkflow(fedwiki.CreateFedWikiSiteWorkflow) w.RegisterWorkflow(fedwiki.DeleteFedWikiSiteWorkflow) w.RegisterWorkflow(fedwiki.SyncFedWikiSitesWorkflow) w.RegisterActivity(fedwiki.NewActivities(fedwiki.ActivitiesConfig{ SiteQ: cfg.SiteQ, EntitlementsQ: cfg.EntitlementsQ, Database: cfg.Database, Logger: cfg.Logger, FedWikiFarmAPIURL: cfg.FedWikiFarmAPIURL, FedWikiAllowedDomains: cfg.FedWikiAllowedDomains, FedWikiAdminToken: cfg.FedWikiAdminToken, SupportURL: cfg.SupportURL, })) // 3. Stripe / Integration Domain w.RegisterWorkflow(wfStripe.ProcessStripeWebhooks) w.RegisterWorkflow(wfStripe.PollIntegrationOutbox) w.RegisterActivity(wfStripe.NewWebhookActivities(cfg.Database, cfg.Logger)) w.RegisterActivity(wfStripe.NewOutboxActivities(cfg.Database, cfg.Logger)) // 4. Entitlements Domain — grant-expiration workflow invokes Transition // at valid_until so trial grants cleanly return the pool to the // configured default (or detach if no default is set). w.RegisterWorkflow(wfEnt.GrantExpirationWorkflow) w.RegisterActivity(wfEnt.NewActivities(cfg.Database, cfg.Logger)) if cfg.Logger != nil { cfg.Logger.Info("created Temporal worker", slog.String("taskQueue", cfg.TaskQueue)) } return &Worker{ worker: w, logger: cfg.Logger, }, nil } func (w *Worker) Start() error { if err := w.worker.Start(); err != nil { return fmt.Errorf("failed to start Temporal worker: %w", err) } if w.logger != nil { w.logger.Info("Temporal worker started") } return nil } func (w *Worker) Stop() { w.worker.Stop() if w.logger != nil { w.logger.Info("Temporal worker stopped") } } func (w *Worker) GracefulStop(ctx context.Context) { w.worker.Stop() if w.logger != nil { w.logger.Info("Temporal worker stopped gracefully") } }