// Source: member-console/internal/workflows/worker.go (Go, 119 lines, 3.3 KiB)

package workflows
import (
"context"
"fmt"
"log/slog"
"git.coopcloud.tech/wiki-cafe/member-console/internal/db"
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/example"
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/fedwiki"
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/queues"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)
// WorkerConfig holds configuration for the embedded Temporal worker.
//
// NewWorker substitutes queues.Main for an empty TaskQueue and applies the
// two concurrency limits only when they are greater than zero.
type WorkerConfig struct {
// TaskQueue is the task queue name handed to worker.New.
TaskQueue string
// DB is passed to the example and FedWiki activity constructors.
DB db.Querier
// Logger is passed to the activity constructors and used for the worker's
// own lifecycle messages; the worker skips its own logging when it is nil.
Logger *slog.Logger
// MaxConcurrentActivities is copied to
// worker.Options.MaxConcurrentActivityExecutionSize when > 0.
MaxConcurrentActivities int
// MaxConcurrentWorkflows is copied to
// worker.Options.MaxConcurrentWorkflowTaskExecutionSize when > 0.
MaxConcurrentWorkflows int
// FedWiki configuration
// The four fields below are forwarded unchanged to fedwiki.ActivitiesConfig.
FedWikiFarmAPIURL string // URL for FarmManager API calls
FedWikiAllowedDomains []string // Domains where users can create sites
// NOTE(review): presumably the credential used against the farm API —
// confirm in the fedwiki package.
FedWikiAdminToken string
SupportURL string
}
// DefaultWorkerConfig builds the baseline WorkerConfig: the main task queue,
// the supplied database handle and logger, and a limit of 1000 for both
// concurrent activities and concurrent workflow tasks. The FedWiki fields and
// SupportURL are left at their zero values for the caller to fill in.
func DefaultWorkerConfig(database db.Querier, logger *slog.Logger) WorkerConfig {
	const defaultConcurrency = 1000

	var cfg WorkerConfig
	cfg.TaskQueue = queues.Main
	cfg.DB = database
	cfg.Logger = logger
	cfg.MaxConcurrentActivities = defaultConcurrency
	cfg.MaxConcurrentWorkflows = defaultConcurrency
	return cfg
}
// Worker wraps a Temporal worker with application-specific setup.
type Worker struct {
// worker is the underlying Temporal SDK worker that Start, Stop and
// GracefulStop delegate to.
worker worker.Worker
// logger receives lifecycle messages; methods skip logging when it is nil.
logger *slog.Logger
}
// NewWorker creates and configures a new embedded Temporal worker.
//
// An empty cfg.TaskQueue falls back to queues.Main. The concurrency limits in
// cfg are applied only when they are greater than zero; otherwise the
// worker.Options zero values are passed through. All workflows and activities
// of the example and FedWiki domains are registered on the returned worker,
// which is not started until Start is called.
//
// It returns an error when c is nil.
func NewWorker(c client.Client, cfg WorkerConfig) (*Worker, error) {
	if cfg.TaskQueue == "" {
		cfg.TaskQueue = queues.Main
	}

	// worker.New panics when it is not given a usable client, so surface the
	// nil case through the error return this constructor already has.
	if c == nil {
		return nil, fmt.Errorf("creating Temporal worker for task queue %q: client is nil", cfg.TaskQueue)
	}

	opts := worker.Options{}
	if cfg.MaxConcurrentActivities > 0 {
		opts.MaxConcurrentActivityExecutionSize = cfg.MaxConcurrentActivities
	}
	if cfg.MaxConcurrentWorkflows > 0 {
		opts.MaxConcurrentWorkflowTaskExecutionSize = cfg.MaxConcurrentWorkflows
	}

	w := worker.New(c, cfg.TaskQueue, opts)

	// --- Domain Registration ---
	// This is where we wire up the different domains to this worker.

	// 1. Example Domain
	w.RegisterWorkflow(example.Workflow)
	w.RegisterActivity(example.NewActivities(cfg.DB, cfg.Logger))

	// 2. FedWiki Domain
	w.RegisterWorkflow(fedwiki.CreateFedWikiSiteWorkflow)
	w.RegisterWorkflow(fedwiki.DeleteFedWikiSiteWorkflow)
	w.RegisterWorkflow(fedwiki.SyncFedWikiSitesWorkflow)
	w.RegisterActivity(fedwiki.NewActivities(fedwiki.ActivitiesConfig{
		DB:                    cfg.DB,
		Logger:                cfg.Logger,
		FedWikiFarmAPIURL:     cfg.FedWikiFarmAPIURL,
		FedWikiAllowedDomains: cfg.FedWikiAllowedDomains,
		FedWikiAdminToken:     cfg.FedWikiAdminToken,
		SupportURL:            cfg.SupportURL,
	}))

	// 3. Future domains (e.g., Onboarding, Payments) would go here...

	if cfg.Logger != nil {
		cfg.Logger.Info("created Temporal worker",
			slog.String("taskQueue", cfg.TaskQueue))
	}

	return &Worker{
		worker: w,
		logger: cfg.Logger,
	}, nil
}
// Start starts the underlying Temporal worker. A start failure is returned
// wrapped; on success an informational message is logged when a logger is
// configured.
func (w *Worker) Start() error {
	err := w.worker.Start()
	if err != nil {
		return fmt.Errorf("failed to start Temporal worker: %w", err)
	}

	if w.logger == nil {
		return nil
	}
	w.logger.Info("Temporal worker started")
	return nil
}
// Stop stops the underlying Temporal worker and then logs that it has
// stopped, provided a logger is configured.
func (w *Worker) Stop() {
	w.worker.Stop()

	if w.logger == nil {
		return
	}
	w.logger.Info("Temporal worker stopped")
}
// GracefulStop stops the underlying Temporal worker, waiting for the stop to
// finish only as long as ctx allows.
//
// The stop itself runs in a separate goroutine that ends when the underlying
// Stop call returns. If ctx is cancelled or its deadline passes first,
// GracefulStop logs a warning and returns while that stop continues in the
// background. A nil ctx is treated as context.Background(), i.e. wait until
// the worker has stopped.
func (w *Worker) GracefulStop(ctx context.Context) {
	// The context used to be ignored entirely, so nil was accepted; keep
	// accepting it rather than panicking on ctx.Done().
	if ctx == nil {
		ctx = context.Background()
	}

	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		w.worker.Stop()
	}()

	select {
	case <-stopped:
		if w.logger != nil {
			w.logger.Info("Temporal worker stopped gracefully")
		}
	case <-ctx.Done():
		if w.logger != nil {
			w.logger.Warn("Temporal worker stop still in progress; no longer waiting",
				slog.Any("error", ctx.Err()))
		}
	}
}