// Source file: member-console/internal/workflows/fedwiki/sync.go (Go; listed as 263 lines, 8.5 KiB in the repository browser).

package fedwiki
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"git.coopcloud.tech/wiki-cafe/member-console/internal/db"
	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"
)
// SyncFedWikiSitesWorkflowInput carries the parameters supplied when starting
// SyncFedWikiSitesWorkflow.
type SyncFedWikiSitesWorkflowInput struct {
	// DefaultUserID is the fallback owner for synced sites whose owner is not
	// known locally. It is typically a system/admin user ID.
	DefaultUserID int64
}
// SyncFedWikiSitesWorkflowOutput reports the outcome of a single
// SyncFedWikiSitesWorkflow run.
type SyncFedWikiSitesWorkflowOutput struct {
	// Success is true when both the farm listing and the database sync
	// completed without error.
	Success bool
	// SitesFound is the number of sites returned by the farm; SitesAdded and
	// SitesRemoved are the counts reported by the database sync.
	SitesFound, SitesAdded, SitesRemoved int
	// ErrorMessage is a user-facing description of the failure; it is empty
	// on success.
	ErrorMessage string
	// SyncTimestamp is the workflow time at which the result was produced.
	SyncTimestamp time.Time
}
// SyncActivityOptions builds the activity options shared by the sync
// activities: a 5-minute start-to-close timeout per attempt and a retry policy
// of at most 5 attempts whose interval starts at one second, doubles on each
// retry, and is capped at one minute.
func SyncActivityOptions() workflow.ActivityOptions {
	retry := temporal.RetryPolicy{
		MaximumAttempts:    5,
		InitialInterval:    time.Second,
		MaximumInterval:    time.Minute,
		BackoffCoefficient: 2.0,
	}

	var opts workflow.ActivityOptions
	opts.StartToCloseTimeout = 5 * time.Minute
	opts.RetryPolicy = &retry
	return opts
}
// SyncFedWikiSitesWorkflow synchronizes sites from the FedWiki farm to the
// local database.
//
// It runs two activities in sequence, both under SyncActivityOptions:
//  1. ListFedWikiSitesActivity fetches the current list of sites from the farm.
//  2. SyncSitesToDBActivity reconciles the local database against that list.
//
// Failures are reported through the output (Success=false plus a user-facing
// ErrorMessage) rather than through the returned error, which is always nil.
// An activity that succeeds but yields no result is treated as a failure of
// that step instead of being dereferenced.
func SyncFedWikiSitesWorkflow(ctx workflow.Context, input SyncFedWikiSitesWorkflowInput) (*SyncFedWikiSitesWorkflowOutput, error) {
	logger := workflow.GetLogger(ctx)
	logger.Info("SyncFedWikiSitesWorkflow started",
		"defaultUserID", input.DefaultUserID)

	// The nil *Activities only serves to reference the activity methods; it is
	// never dereferenced in workflow code.
	var activities *Activities
	activityCtx := workflow.WithActivityOptions(ctx, SyncActivityOptions())

	// Step 1: Fetch all sites from the FedWiki farm.
	var listResult *ListFedWikiSitesOutput
	err := workflow.ExecuteActivity(activityCtx, activities.ListFedWikiSitesActivity, ListFedWikiSitesInput{}).Get(ctx, &listResult)
	if err != nil {
		logger.Error("Failed to list sites from FedWiki farm", "error", err)
		return &SyncFedWikiSitesWorkflowOutput{
			Success:       false,
			ErrorMessage:  "Failed to fetch sites from FedWiki farm. Please try again later.",
			SyncTimestamp: workflow.Now(ctx),
		}, nil
	}
	if listResult == nil {
		// Never forward a missing listing to the sync step: an empty farm list
		// would make it delete every local site.
		logger.Error("ListFedWikiSitesActivity returned no result")
		return &SyncFedWikiSitesWorkflowOutput{
			Success:       false,
			ErrorMessage:  "Failed to fetch sites from FedWiki farm. Please try again later.",
			SyncTimestamp: workflow.Now(ctx),
		}, nil
	}
	sitesFound := len(listResult.Sites)
	logger.Info("Fetched sites from FedWiki farm", "count", sitesFound)

	// Step 2: Sync the sites to the local database.
	var syncResult *SyncSitesToDBOutput
	err = workflow.ExecuteActivity(activityCtx, activities.SyncSitesToDBActivity, SyncSitesToDBInput{
		FarmSites:     listResult.Sites,
		DefaultUserID: input.DefaultUserID,
	}).Get(ctx, &syncResult)
	if err != nil {
		logger.Error("Failed to sync sites to database", "error", err)
		return &SyncFedWikiSitesWorkflowOutput{
			Success:       false,
			SitesFound:    sitesFound,
			ErrorMessage:  "Failed to sync sites to local database.",
			SyncTimestamp: workflow.Now(ctx),
		}, nil
	}
	if syncResult == nil {
		logger.Error("SyncSitesToDBActivity returned no result")
		return &SyncFedWikiSitesWorkflowOutput{
			Success:       false,
			SitesFound:    sitesFound,
			ErrorMessage:  "Failed to sync sites to local database.",
			SyncTimestamp: workflow.Now(ctx),
		}, nil
	}

	logger.Info("SyncFedWikiSitesWorkflow completed successfully",
		"sitesFound", sitesFound,
		"sitesAdded", syncResult.SitesAdded,
		"sitesUpdated", syncResult.SitesUpdated,
		"sitesRemoved", syncResult.SitesRemoved)
	return &SyncFedWikiSitesWorkflowOutput{
		Success:       true,
		SitesFound:    sitesFound,
		SitesAdded:    syncResult.SitesAdded,
		SitesRemoved:  syncResult.SitesRemoved,
		SyncTimestamp: workflow.Now(ctx),
	}, nil
}
// SyncSitesToDBInput carries the arguments for SyncSitesToDBActivity.
type SyncSitesToDBInput struct {
	// FarmSites is the list of sites reported by the FedWiki farm.
	FarmSites []SiteInfo
	// DefaultUserID is the owner used when a site's owner cannot be resolved
	// to a local user.
	DefaultUserID int64
}
// SyncSitesToDBOutput summarizes what SyncSitesToDBActivity changed in the
// local database.
type SyncSitesToDBOutput struct {
	// Number of sites inserted, sites whose ownership was updated, and sites
	// deleted, respectively.
	SitesAdded, SitesUpdated, SitesRemoved int
}
// SyncSitesToDBActivity reconciles the local sites table with the site list
// reported by the FedWiki farm.
//
// For every farm site whose Status is "active" it resolves the owner's OIDC
// subject to a local user ID (falling back to input.DefaultUserID when the
// subject is empty or no matching user exists), then either inserts the site
// when its domain is unknown locally or updates ownership when the stored OIDC
// subject or user ID differs. Afterwards, every local site that is missing from
// the farm list, or whose farm entry is not "active", is deleted.
//
// Per-site failures are logged and skipped (best effort) and do not fail the
// activity; only a failure to load the local site list is returned as an error.
// A site whose owner lookup fails for a reason other than "not found" is left
// untouched, so a transient database error cannot reassign it to the default
// user.
//
// NOTE: an empty input.FarmSites causes every local site to be deleted.
func (a *Activities) SyncSitesToDBActivity(ctx context.Context, input SyncSitesToDBInput) (*SyncSitesToDBOutput, error) {
	a.Logger.Info("syncing sites to database",
		slog.Int("farmSiteCount", len(input.FarmSites)),
		slog.Int64("defaultUserID", input.DefaultUserID))

	// Load the full local site records; ownership fields are needed for comparison.
	localSites, err := a.DB.GetAllSites(ctx)
	if err != nil {
		a.Logger.Error("failed to get local sites", slog.Any("error", err))
		return nil, fmt.Errorf("failed to get local sites: %w", err)
	}

	// Index local sites by domain for quick lookup.
	localSiteMap := make(map[string]db.Site, len(localSites))
	for _, site := range localSites {
		localSiteMap[site.Domain] = site
	}

	// Index farm sites by name (the site's domain) for the removal pass.
	farmDomainMap := make(map[string]SiteInfo, len(input.FarmSites))
	for _, site := range input.FarmSites {
		farmDomainMap[site.Name] = site
	}

	sitesAdded := 0
	sitesUpdated := 0
	sitesRemoved := 0

	// Pass 1: add or update the active sites reported by the farm.
	for _, farmSite := range input.FarmSites {
		if farmSite.Status != "active" {
			a.Logger.Debug("skipping inactive site", slog.String("domain", farmSite.Name), slog.String("status", farmSite.Status))
			continue
		}

		// Resolve the owner's OIDC subject to a user ID.
		ownerOIDCSubject := farmSite.Owner.OAuth2.ID
		userID := input.DefaultUserID
		if ownerOIDCSubject != "" {
			user, err := a.DB.GetUserByOIDCSubject(ctx, ownerOIDCSubject)
			switch {
			case err == nil:
				userID = user.ID
				a.Logger.Debug("resolved site owner to user",
					slog.String("domain", farmSite.Name),
					slog.String("ownerOIDC", ownerOIDCSubject),
					slog.Int64("userID", userID))
			case errors.Is(err, sql.ErrNoRows):
				// NOTE(review): assumes the db layer reports a missing user as
				// sql.ErrNoRows (the database/sql convention) — confirm.
				a.Logger.Debug("site owner not found in DB, using default user",
					slog.String("domain", farmSite.Name),
					slog.String("ownerOIDC", ownerOIDCSubject),
					slog.Int64("defaultUserID", input.DefaultUserID))
			default:
				// A real lookup failure is not evidence that the owner is
				// unknown; leave the site as it is rather than rewriting its
				// ownership to the default user.
				a.Logger.Error("failed to look up site owner, skipping site",
					slog.String("domain", farmSite.Name),
					slog.String("ownerOIDC", ownerOIDCSubject),
					slog.Any("error", err))
				continue
			}
		}

		localSite, exists := localSiteMap[farmSite.Name]
		if !exists {
			// Site exists on the farm but not locally: add it.
			_, err := a.DB.UpsertSiteByDomain(ctx, db.UpsertSiteByDomainParams{
				UserID:           userID,
				Domain:           farmSite.Name,
				IsCustomDomain:   0, // Assume standard domain for synced sites
				OwnerOidcSubject: toNullString(ownerOIDCSubject),
			})
			if err != nil {
				a.Logger.Error("failed to add site",
					slog.String("domain", farmSite.Name),
					slog.Any("error", err))
				continue
			}
			sitesAdded++
			a.Logger.Info("added site from farm",
				slog.String("domain", farmSite.Name),
				slog.Int64("userID", userID),
				slog.String("ownerOIDC", ownerOIDCSubject))
			continue
		}

		// Site exists locally: update only when ownership actually changed.
		currentOwnerOIDC := fromNullString(localSite.OwnerOidcSubject)
		if currentOwnerOIDC == ownerOIDCSubject && localSite.UserID == userID {
			continue
		}
		if err := a.DB.UpdateSiteOwnership(ctx, db.UpdateSiteOwnershipParams{
			UserID:           userID,
			OwnerOidcSubject: toNullString(ownerOIDCSubject),
			Domain:           farmSite.Name,
		}); err != nil {
			a.Logger.Error("failed to update site ownership",
				slog.String("domain", farmSite.Name),
				slog.Any("error", err))
			continue
		}
		sitesUpdated++
		a.Logger.Info("updated site ownership",
			slog.String("domain", farmSite.Name),
			slog.Int64("oldUserID", localSite.UserID),
			slog.Int64("newUserID", userID),
			slog.String("oldOwnerOIDC", currentOwnerOIDC),
			slog.String("newOwnerOIDC", ownerOIDCSubject))
	}

	// Pass 2: remove local sites that are absent from the farm or inactive there.
	for _, localSite := range localSites {
		farmSite, exists := farmDomainMap[localSite.Domain]
		if exists && farmSite.Status == "active" {
			continue
		}
		if err := a.DB.DeleteSiteByDomain(ctx, localSite.Domain); err != nil {
			a.Logger.Error("failed to delete site",
				slog.String("domain", localSite.Domain),
				slog.Any("error", err))
			continue
		}
		sitesRemoved++
		a.Logger.Info("removed site not on farm", slog.String("domain", localSite.Domain))
	}

	a.Logger.Info("site sync completed",
		slog.Int("sitesAdded", sitesAdded),
		slog.Int("sitesUpdated", sitesUpdated),
		slog.Int("sitesRemoved", sitesRemoved))
	return &SyncSitesToDBOutput{
		SitesAdded:   sitesAdded,
		SitesUpdated: sitesUpdated,
		SitesRemoved: sitesRemoved,
	}, nil
}
// toNullString wraps s in a sql.NullString, treating the empty string as SQL
// NULL (Valid is false) and any other value as a valid string.
func toNullString(s string) sql.NullString {
	return sql.NullString{String: s, Valid: s != ""}
}
// fromNullString unwraps a sql.NullString, yielding the empty string for SQL
// NULL (Valid is false) and the stored string otherwise.
func fromNullString(ns sql.NullString) string {
	if !ns.Valid {
		return ""
	}
	return ns.String
}