119 lines
3.3 KiB
Go
119 lines
3.3 KiB
Go
package workflows
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
|
|
"git.coopcloud.tech/wiki-cafe/member-console/internal/db"
|
|
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/example"
|
|
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/fedwiki"
|
|
"git.coopcloud.tech/wiki-cafe/member-console/internal/workflows/queues"
|
|
"go.temporal.io/sdk/client"
|
|
"go.temporal.io/sdk/worker"
|
|
)
|
|
|
|
// WorkerConfig holds configuration for the embedded Temporal worker.
|
|
type WorkerConfig struct {
|
|
TaskQueue string
|
|
DB db.Querier
|
|
Logger *slog.Logger
|
|
MaxConcurrentActivities int
|
|
MaxConcurrentWorkflows int
|
|
// FedWiki configuration
|
|
FedWikiFarmAPIURL string // URL for FarmManager API calls
|
|
FedWikiAllowedDomains []string // Domains where users can create sites
|
|
FedWikiAdminToken string
|
|
SupportURL string
|
|
}
|
|
|
|
// DefaultWorkerConfig returns a WorkerConfig with sensible defaults.
|
|
func DefaultWorkerConfig(database db.Querier, logger *slog.Logger) WorkerConfig {
|
|
return WorkerConfig{
|
|
TaskQueue: queues.Main,
|
|
DB: database,
|
|
Logger: logger,
|
|
MaxConcurrentActivities: 1000,
|
|
MaxConcurrentWorkflows: 1000,
|
|
}
|
|
}
|
|
|
|
// Worker wraps a Temporal worker with application-specific setup.
|
|
type Worker struct {
|
|
worker worker.Worker
|
|
logger *slog.Logger
|
|
}
|
|
|
|
// NewWorker creates and configures a new embedded Temporal worker.
|
|
func NewWorker(c client.Client, cfg WorkerConfig) (*Worker, error) {
|
|
if cfg.TaskQueue == "" {
|
|
cfg.TaskQueue = queues.Main
|
|
}
|
|
|
|
opts := worker.Options{}
|
|
if cfg.MaxConcurrentActivities > 0 {
|
|
opts.MaxConcurrentActivityExecutionSize = cfg.MaxConcurrentActivities
|
|
}
|
|
if cfg.MaxConcurrentWorkflows > 0 {
|
|
opts.MaxConcurrentWorkflowTaskExecutionSize = cfg.MaxConcurrentWorkflows
|
|
}
|
|
|
|
w := worker.New(c, cfg.TaskQueue, opts)
|
|
|
|
// --- Domain Registration ---
|
|
// This is where we wire up the different domains to this worker.
|
|
|
|
// 1. Example Domain
|
|
w.RegisterWorkflow(example.Workflow)
|
|
w.RegisterActivity(example.NewActivities(cfg.DB, cfg.Logger))
|
|
|
|
// 2. FedWiki Domain
|
|
w.RegisterWorkflow(fedwiki.CreateFedWikiSiteWorkflow)
|
|
w.RegisterWorkflow(fedwiki.DeleteFedWikiSiteWorkflow)
|
|
w.RegisterWorkflow(fedwiki.SyncFedWikiSitesWorkflow)
|
|
w.RegisterActivity(fedwiki.NewActivities(fedwiki.ActivitiesConfig{
|
|
DB: cfg.DB,
|
|
Logger: cfg.Logger,
|
|
FedWikiFarmAPIURL: cfg.FedWikiFarmAPIURL,
|
|
FedWikiAllowedDomains: cfg.FedWikiAllowedDomains,
|
|
FedWikiAdminToken: cfg.FedWikiAdminToken,
|
|
SupportURL: cfg.SupportURL,
|
|
}))
|
|
|
|
// 3. Future domains (e.g., Onboarding, Payments) would go here...
|
|
|
|
if cfg.Logger != nil {
|
|
cfg.Logger.Info("created Temporal worker",
|
|
slog.String("taskQueue", cfg.TaskQueue))
|
|
}
|
|
|
|
return &Worker{
|
|
worker: w,
|
|
logger: cfg.Logger,
|
|
}, nil
|
|
}
|
|
|
|
func (w *Worker) Start() error {
|
|
if err := w.worker.Start(); err != nil {
|
|
return fmt.Errorf("failed to start Temporal worker: %w", err)
|
|
}
|
|
if w.logger != nil {
|
|
w.logger.Info("Temporal worker started")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *Worker) Stop() {
|
|
w.worker.Stop()
|
|
if w.logger != nil {
|
|
w.logger.Info("Temporal worker stopped")
|
|
}
|
|
}
|
|
|
|
func (w *Worker) GracefulStop(ctx context.Context) {
|
|
w.worker.Stop()
|
|
if w.logger != nil {
|
|
w.logger.Info("Temporal worker stopped gracefully")
|
|
}
|
|
}
|