package caddy_incus_upstreams

import (
	"encoding/json"
	"net"
	"net/http"
	"slices"
	"sort"
	"sync"
	"time"

	"github.com/caddyserver/caddy/v2"
	"github.com/caddyserver/caddy/v2/caddyconfig/caddyfile"
	"github.com/caddyserver/caddy/v2/modules/caddyhttp"
	"github.com/caddyserver/caddy/v2/modules/caddyhttp/reverseproxy"
	incus "github.com/lxc/incus/v6/client"
	"github.com/lxc/incus/v6/shared/api"
	"go.uber.org/zap"
)
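
// Instance config keys recognised by this module. A container opts in to
// dynamic upstream discovery and describes how requests are routed to it
// via these keys.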
const (
	UserConfigEnable       = "user.caddyserver.http.enable"
	UserConfigMatchHost    = "user.caddyserver.http.matchers.host"
	UserConfigUpstreamPort = "user.caddyserver.http.upstream.port"
)
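
// For example, to expose a container (hypothetical instance name and values):
//
//	incus config set app1 user.caddyserver.http.enable=true
//	incus config set app1 user.caddyserver.http.matchers.host=app1.example.com
//	incus config set app1 user.caddyserver.http.upstream.port=8080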
var (
	candidates   []candidate
	candidatesMu sync.RWMutex
	producers    = map[string]func(string) (caddyhttp.RequestMatcher, error){
		UserConfigMatchHost: func(value string) (caddyhttp.RequestMatcher, error) {
			return &caddyhttp.MatchHost{value}, nil
		},
	}
)
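
// candidate pairs a provisioned matcher set with the upstream it routes to.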
type candidate struct {
	matchers caddyhttp.MatcherSet
	upstream *reverseproxy.Upstream
}
func init() {
	caddy.RegisterModule(Upstreams{})
}
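
// Upstreams provides dynamic upstreams to reverse_proxy from Incus
// container instances. In a Caddyfile it is enabled via the "dynamic"
// upstream source, along the lines of (a sketch):
//
//	reverse_proxy {
//		dynamic incus
//	}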
type Upstreams struct{}
func (Upstreams) CaddyModule() caddy.ModuleInfo {
	return caddy.ModuleInfo{
		ID:  "http.reverse_proxy.upstreams.incus",
		New: func() caddy.Module { return new(Upstreams) },
	}
}
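
// Provision connects to the local Incus daemon over the default unix
// socket and starts watching instances.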
func (u *Upstreams) Provision(ctx caddy.Context) error {
	conn, err := incus.ConnectIncusUnix("", nil)
	if err != nil {
		return err
	}

	ctx.Logger().Info("connected to incus")

	return u.provision(ctx, conn)
}
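
// GetUpstreams returns the upstreams whose matchers match the given
// request. It is called by reverse_proxy for each request.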
func (u *Upstreams) GetUpstreams(r *http.Request) ([]*reverseproxy.Upstream, error) {
	upstreams := make([]*reverseproxy.Upstream, 0, 1)

	candidatesMu.RLock()
	defer candidatesMu.RUnlock()

	for _, c := range candidates {
		if c.matchers.Match(r) {
			upstreams = append(upstreams, c.upstream)
		}
	}

	return upstreams, nil
}
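
// UnmarshalCaddyfile parses the Caddyfile config. The module takes no
// arguments and no block options.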
func (u *Upstreams) UnmarshalCaddyfile(d *caddyfile.Dispenser) error {
	for d.Next() {
		if d.NextArg() {
			return d.ArgErr()
		}
		if d.NextBlock(0) {
			return d.Errf("unrecognized incus option '%s'", d.Val())
		}
	}
	return nil
}
func (u *Upstreams) provision(ctx caddy.Context, conn incus.InstanceServer) error {
	err := u.provisionCandidates(ctx, conn)
	if err != nil {
		return err
	}

	go u.keepUpdated(ctx, conn)

	return nil
}
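
// provisionCandidates rebuilds the candidate list from all container
// instances across all projects, keeping only instances that are enabled,
// expose an upstream port, and have a routable IPv4 address.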
func (u *Upstreams) provisionCandidates(ctx caddy.Context, conn incus.InstanceServer) error {
	instances, err := conn.GetInstancesAllProjects(api.InstanceTypeContainer)
	if err != nil {
		return err
	}

	updated := make([]candidate, 0, len(instances))

	for _, i := range instances {
		matchers := buildMatchers(ctx, i.Config)

		ctx.Logger().Debug("inspecting instance", zap.String("instance_name", i.Name))

		enabled, ok := i.Config[UserConfigEnable]
		if !ok {
			ctx.Logger().Debug("dynamic incus upstream not enabled",
				zap.String("instance_name", i.Name),
			)
			continue
		}

		if enabled != "true" {
			ctx.Logger().Debug("dynamic incus upstream disabled",
				zap.String("instance_name", i.Name),
				zap.String("enabled", enabled),
			)
			continue
		}

		port, ok := i.Config[UserConfigUpstreamPort]
		if !ok {
			ctx.Logger().Debug("unable to get port from instance config",
				zap.String("instance_name", i.Name),
			)
			continue
		}

		ctx.Logger().Debug("port retrieved",
			zap.String("instance_name", i.Name),
			zap.String("port", port),
		)

		pConn := conn.UseProject(i.Project)
		instanceFull, _, err := pConn.GetInstanceFull(i.Name)
		if err != nil {
			ctx.Logger().Debug("unable to get full instance info",
				zap.String("instance_name", i.Name),
				zap.Error(err),
			)
			continue
		}

		// Collect routable IPv4 addresses, skipping loopback interfaces
		// and link-/local-scoped addresses.
		ipv4s := []string{}
		if instanceFull.IsActive() && instanceFull.State != nil && instanceFull.State.Network != nil {
			for _, net := range instanceFull.State.Network {
				if net.Type == "loopback" {
					continue
				}

				for _, addr := range net.Addresses {
					if slices.Contains([]string{"link", "local"}, addr.Scope) {
						continue
					}

					if addr.Family == "inet" {
						ipv4s = append(ipv4s, addr.Address)
					}
				}
			}
		}

		if len(ipv4s) == 0 {
			ctx.Logger().Error("unable to get ipv4",
				zap.String("instance_name", i.Name),
			)
			continue
		}

		// Sort descending and take the first address so the pick is
		// deterministic across refreshes.
		sort.Sort(sort.Reverse(sort.StringSlice(ipv4s)))
		addr := ipv4s[0]

		ctx.Logger().Debug("ipv4 retrieved",
			zap.String("instance_name", i.Name),
			zap.String("ipv4", addr),
		)

		address := net.JoinHostPort(addr, port)

		ctx.Logger().Debug("candidate is provisioned",
			zap.String("instance_name", i.Name),
			zap.String("address", address),
			zap.Any("matchers", matchers),
		)

		updated = append(updated, candidate{
			matchers: matchers,
			upstream: &reverseproxy.Upstream{Dial: address},
		})
	}

	candidatesMu.Lock()
	candidates = updated
	candidatesMu.Unlock()

	return nil
}
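
// keepUpdated listens for Incus lifecycle events and re-provisions the
// candidate list whenever an instance is created, started, restarted,
// resumed, or becomes ready.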
func (u *Upstreams) keepUpdated(ctx caddy.Context, conn incus.InstanceServer) {
	defer conn.Disconnect()

	listener, err := conn.GetEventsAllProjects()
	if err != nil {
		ctx.Logger().Warn("unable to monitor instance events, will retry", zap.Error(err))
		time.Sleep(500 * time.Millisecond)
		u.keepUpdated(ctx, conn)
		// Return so the retry owns the listener; without this we would
		// fall through with a nil listener and panic below.
		return
	}

	ctx.Logger().Debug("initialised event listener")

	events := []string{
		api.EventLifecycleInstanceCreated,
		api.EventLifecycleInstanceReady,
		api.EventLifecycleInstanceRestarted,
		api.EventLifecycleInstanceResumed,
		api.EventLifecycleInstanceStarted,
	}

	if _, err := listener.AddHandler([]string{"lifecycle"}, func(event api.Event) {
		metadata := &api.EventLifecycle{}
		if err := json.Unmarshal(event.Metadata, metadata); err != nil {
			ctx.Logger().Debug("unable to unmarshal event metadata",
				zap.Any("event", event),
				zap.Error(err),
			)
			return
		}

		if !slices.Contains(events, metadata.Action) {
			return
		}

		// NOTE(d1): ensure network comes up
		time.Sleep(3 * time.Second)

		ctx.Logger().Debug("handling event",
			zap.String("instance_name", metadata.Name),
			zap.String("event", metadata.Action),
		)

		if err := u.provisionCandidates(ctx, conn); err != nil {
			ctx.Logger().Error("unable to provision candidates", zap.Error(err))
		}
	}); err != nil {
		ctx.Logger().Error("event listener handler setup error", zap.Error(err))
	}

	if err := listener.Wait(); err != nil {
		ctx.Logger().Error("event listener wait error", zap.Error(err))
	}

	ctx.Logger().Debug("event listener saying goodbye 👋")
}
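
// buildMatchers converts recognised matcher keys in an instance's config
// into a provisioned caddyhttp.MatcherSet.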
func buildMatchers(ctx caddy.Context, config map[string]string) caddyhttp.MatcherSet {
	var matchers caddyhttp.MatcherSet

	for key, producer := range producers {
		value, ok := config[key]
		if !ok {
			continue
		}

		matcher, err := producer(value)
		if err != nil {
			ctx.Logger().Error("unable to load matcher",
				zap.String("key", key),
				zap.String("value", value),
				zap.Error(err),
			)
			continue
		}

		if prov, ok := matcher.(caddy.Provisioner); ok {
			err = prov.Provision(ctx)
			if err != nil {
				ctx.Logger().Error("unable to provision matcher",
					zap.String("key", key),
					zap.String("value", value),
					zap.Error(err),
				)
				continue
			}
		}

		matchers = append(matchers, matcher)
	}

	return matchers
}
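
// Interface guards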
var (
	_ caddy.Provisioner           = (*Upstreams)(nil)
	_ reverseproxy.UpstreamSource = (*Upstreams)(nil)
	_ caddyfile.Unmarshaler       = (*Upstreams)(nil)
)