From 15bcae1956b9e16205f4322028be1a8b27ac87fe Mon Sep 17 00:00:00 2001 From: Andrew Hsu Date: Fri, 4 Aug 2017 14:29:19 -0700 Subject: [PATCH] vndr libnetwork to latest bump_17.07 Signed-off-by: Andrew Hsu --- components/engine/vendor.conf | 2 +- .../upgrade/v17_06_1/template.go | 20 -- .../github.com/docker/libnetwork/agent.go | 31 ++- .../docker/libnetwork/bitseq/sequence.go | 8 +- .../docker/libnetwork/config/config.go | 34 ++- .../docker/libnetwork/diagnose/diagnose.go | 133 ++++++++++ .../libnetwork/drivers/overlay/ov_network.go | 65 ++++- .../libnetwork/drivers/overlay/peerdb.go | 6 + .../libnetwork/drivers/windows/labels.go | 3 + .../libnetwork/drivers/windows/windows.go | 14 + .../docker/libnetwork/drivers_windows.go | 1 + .../docker/libnetwork/endpoint_info.go | 8 +- .../docker/libnetwork/iptables/iptables.go | 8 +- .../github.com/docker/libnetwork/network.go | 23 +- .../docker/libnetwork/network_windows.go | 2 +- .../docker/libnetwork/networkdb/broadcast.go | 3 +- .../docker/libnetwork/networkdb/cluster.go | 47 ++-- .../docker/libnetwork/networkdb/delegate.go | 64 +++-- .../libnetwork/networkdb/event_delegate.go | 8 +- .../docker/libnetwork/networkdb/message.go | 4 - .../docker/libnetwork/networkdb/networkdb.go | 191 ++++++++------ .../libnetwork/networkdb/networkdbdiagnose.go | 242 ++++++++++++++++++ .../docker/libnetwork/osl/neigh_linux.go | 4 +- .../github.com/docker/libnetwork/resolver.go | 2 +- .../github.com/docker/libnetwork/sandbox.go | 16 +- .../libnetwork/sandbox_externalkey_unix.go | 1 - .../docker/libnetwork/sandbox_store.go | 4 +- .../docker/libnetwork/service_linux.go | 5 +- 28 files changed, 715 insertions(+), 234 deletions(-) delete mode 100644 components/engine/vendor/github.com/crosbymichael/upgrade/v17_06_1/template.go create mode 100644 components/engine/vendor/github.com/docker/libnetwork/diagnose/diagnose.go create mode 100644 components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdbdiagnose.go diff --git a/components/engine/vendor.conf b/components/engine/vendor.conf index cf39a58ac8..e935b772aa 100644 --- a/components/engine/vendor.conf +++ b/components/engine/vendor.conf @@ -27,7 +27,7 @@ github.com/imdario/mergo 0.2.1 golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0 #get libnetwork packages -github.com/docker/libnetwork 6426d1e66f33c0b0c8bb135b7ee547447f54d043 +github.com/docker/libnetwork 449a72278619d77b2806cca1a5138342d04e6f2e github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/components/engine/vendor/github.com/crosbymichael/upgrade/v17_06_1/template.go b/components/engine/vendor/github.com/crosbymichael/upgrade/v17_06_1/template.go deleted file mode 100644 index 17dba4e3ad..0000000000 --- a/components/engine/vendor/github.com/crosbymichael/upgrade/v17_06_1/template.go +++ /dev/null @@ -1,20 +0,0 @@ -//+build ignore - -package v17_06_1 - -import ( - "github.com/containerd/containerd/runtime" - "github.com/opencontainers/runc/libcontainer" - specs "github.com/opencontainers/runtime-spec/specs-go" -) - -//go:generate -command rewrite go run ../gen/rewrite-structs.go -- - -//go:generate rewrite spec_gen.go .Process.Capabilities->linuxCapabilities .Linux.Resources.Memory.Swappiness->memorySwappiness .Linux.Seccomp.Syscalls->linuxSyscalls -type Spec specs.Spec - -//go:generate rewrite process_state_gen.go .Capabilities->linuxCapabilities -type ProcessState runtime.ProcessState - -//go:generate rewrite state_gen.go .Config.Capabilities->linuxCapabilities .Config.Cgroups.MemorySwappiness->memorySwappiness -type State libcontainer.State diff --git a/components/engine/vendor/github.com/docker/libnetwork/agent.go b/components/engine/vendor/github.com/docker/libnetwork/agent.go index a45a569500..4877df1c34 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/agent.go +++ b/components/engine/vendor/github.com/docker/libnetwork/agent.go @@ -214,8 +214,8 @@ func (c *controller) agentSetup(clusterProvider cluster.Provider) error { listen := clusterProvider.GetListenAddress() listenAddr, _, _ := net.SplitHostPort(listen) - logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v", - listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList) + logrus.Infof("Initializing Libnetwork Agent Listen-Addr=%s Local-addr=%s Adv-addr=%s Data-addr=%s Remote-addr-list=%v MTU=%d", + listenAddr, bindAddr, advAddr, dataAddr, remoteAddrList, c.Config().Daemon.NetworkControlPlaneMTU) if advAddr != "" && agent == nil { if err := c.agentInit(listenAddr, bindAddr, advAddr, dataAddr); err != nil { logrus.Errorf("error in agentInit: %v", err) @@ -286,12 +286,19 @@ func (c *controller) agentInit(listenAddr, bindAddrOrInterface, advertiseAddr, d nodeName := hostname + "-" + stringid.TruncateID(stringid.GenerateRandomID()) logrus.Info("Gossip cluster hostname ", nodeName) - nDB, err := networkdb.New(&networkdb.Config{ - BindAddr: listenAddr, - AdvertiseAddr: advertiseAddr, - NodeName: nodeName, - Keys: keys, - }) + netDBConf := networkdb.DefaultConfig() + netDBConf.NodeName = nodeName + netDBConf.BindAddr = listenAddr + netDBConf.AdvertiseAddr = advertiseAddr + netDBConf.Keys = keys + if c.Config().Daemon.NetworkControlPlaneMTU != 0 { + // Consider the MTU remove the IP hdr (IPv4 or IPv6) and the TCP/UDP hdr. + // To be on the safe side let's cut 100 bytes + netDBConf.PacketBufferSize = (c.Config().Daemon.NetworkControlPlaneMTU - 100) + logrus.Debugf("Control plane MTU: %d will initialize NetworkDB with: %d", + c.Config().Daemon.NetworkControlPlaneMTU, netDBConf.PacketBufferSize) + } + nDB, err := networkdb.New(netDBConf) if err != nil { return err @@ -383,15 +390,11 @@ func (c *controller) agentClose() { agent.Lock() for _, cancelFuncs := range agent.driverCancelFuncs { - for _, cancel := range cancelFuncs { - cancelList = append(cancelList, cancel) - } + cancelList = append(cancelList, cancelFuncs...) } // Add also the cancel functions for the network db - for _, cancel := range agent.coreCancelFuncs { - cancelList = append(cancelList, cancel) - } + cancelList = append(cancelList, agent.coreCancelFuncs...) agent.Unlock() for _, cancel := range cancelList { diff --git a/components/engine/vendor/github.com/docker/libnetwork/bitseq/sequence.go b/components/engine/vendor/github.com/docker/libnetwork/bitseq/sequence.go index 86cf69b34f..6accc8acc3 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/bitseq/sequence.go +++ b/components/engine/vendor/github.com/docker/libnetwork/bitseq/sequence.go @@ -497,7 +497,10 @@ func getFirstAvailable(head *sequence, start uint64) (uint64, uint64, error) { // Derive the this sequence offsets byteOffset := byteStart - inBlockBytePos bitOffset := inBlockBytePos*8 + bitStart - + var firstOffset uint64 + if current == head { + firstOffset = byteOffset + } for current != nil { if current.block != blockMAX { bytePos, bitPos, err := current.getAvailableBit(bitOffset) @@ -505,7 +508,8 @@ func getFirstAvailable(head *sequence, start uint64) (uint64, uint64, error) { } // Moving to next block: Reset bit offset. bitOffset = 0 - byteOffset += current.count * blockBytes + byteOffset += (current.count * blockBytes) - firstOffset + firstOffset = 0 current = current.next } return invalidPos, invalidPos, ErrNoBitAvailable diff --git a/components/engine/vendor/github.com/docker/libnetwork/config/config.go b/components/engine/vendor/github.com/docker/libnetwork/config/config.go index 3acb4320c4..a2e43e3a53 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/config/config.go +++ b/components/engine/vendor/github.com/docker/libnetwork/config/config.go @@ -26,14 +26,15 @@ type Config struct { // DaemonCfg represents libnetwork core configuration type DaemonCfg struct { - Debug bool - Experimental bool - DataDir string - DefaultNetwork string - DefaultDriver string - Labels []string - DriverCfg map[string]interface{} - ClusterProvider cluster.Provider + Debug bool + Experimental bool + DataDir string + DefaultNetwork string + DefaultDriver string + Labels []string + DriverCfg map[string]interface{} + ClusterProvider cluster.Provider + NetworkControlPlaneMTU int } // ClusterCfg represents cluster configuration @@ -221,6 +222,18 @@ func OptionExperimental(exp bool) Option { } } +// OptionNetworkControlPlaneMTU function returns an option setter for control plane MTU +func OptionNetworkControlPlaneMTU(exp int) Option { + return func(c *Config) { + logrus.Debugf("Network Control Plane MTU: %d", exp) + if exp < 1500 { + // if exp == 0 the value won't be used + logrus.Warnf("Received a MTU of %d, this value is very low, the network control plane can misbehave", exp) + } + c.Daemon.NetworkControlPlaneMTU = exp + } +} + // ProcessOptions processes options and stores it in config func (c *Config) ProcessOptions(options ...Option) { for _, opt := range options { @@ -232,10 +245,7 @@ func (c *Config) ProcessOptions(options ...Option) { // IsValidName validates configuration objects supported by libnetwork func IsValidName(name string) bool { - if strings.TrimSpace(name) == "" { - return false - } - return true + return strings.TrimSpace(name) != "" } // OptionLocalKVProvider function returns an option setter for kvstore provider diff --git a/components/engine/vendor/github.com/docker/libnetwork/diagnose/diagnose.go b/components/engine/vendor/github.com/docker/libnetwork/diagnose/diagnose.go new file mode 100644 index 0000000000..0ce7a491bf --- /dev/null +++ b/components/engine/vendor/github.com/docker/libnetwork/diagnose/diagnose.go @@ -0,0 +1,133 @@ +package diagnose + +import ( + "fmt" + "net" + "net/http" + "sync" + + "github.com/Sirupsen/logrus" +) + +// HTTPHandlerFunc TODO +type HTTPHandlerFunc func(interface{}, http.ResponseWriter, *http.Request) + +type httpHandlerCustom struct { + ctx interface{} + F func(interface{}, http.ResponseWriter, *http.Request) +} + +// ServeHTTP TODO +func (h httpHandlerCustom) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.F(h.ctx, w, r) +} + +var diagPaths2Func = map[string]HTTPHandlerFunc{ + "/": notImplemented, + "/help": help, + "/ready": ready, +} + +// Server when the debug is enabled exposes a +// This data structure is protected by the Agent mutex so does not require and additional mutex here +type Server struct { + sk net.Listener + port int + mux *http.ServeMux + registeredHanders []string + sync.Mutex +} + +// Init TODO +func (n *Server) Init() { + n.mux = http.NewServeMux() + + // Register local handlers + n.RegisterHandler(n, diagPaths2Func) +} + +// RegisterHandler TODO +func (n *Server) RegisterHandler(ctx interface{}, hdlrs map[string]HTTPHandlerFunc) { + n.Lock() + defer n.Unlock() + for path, fun := range hdlrs { + n.mux.Handle(path, httpHandlerCustom{ctx, fun}) + n.registeredHanders = append(n.registeredHanders, path) + } +} + +// EnableDebug opens a TCP socket to debug the passed network DB +func (n *Server) EnableDebug(ip string, port int) { + n.Lock() + defer n.Unlock() + + n.port = port + logrus.SetLevel(logrus.DebugLevel) + + if n.sk != nil { + logrus.Infof("The server is already up and running") + return + } + + logrus.Infof("Starting the server listening on %d for commands", port) + + // // Create the socket + // var err error + // n.sk, err = net.Listen("tcp", listeningAddr) + // if err != nil { + // log.Fatal(err) + // } + // + // go func() { + // http.Serve(n.sk, n.mux) + // }() + http.ListenAndServe(":8000", n.mux) +} + +// DisableDebug stop the dubug and closes the tcp socket +func (n *Server) DisableDebug() { + n.Lock() + defer n.Unlock() + n.sk.Close() + n.sk = nil +} + +// IsDebugEnable returns true when the debug is enabled +func (n *Server) IsDebugEnable() bool { + n.Lock() + defer n.Unlock() + return n.sk != nil +} + +func notImplemented(ctx interface{}, w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "URL path: %s no method implemented check /help\n", r.URL.Path) +} + +func help(ctx interface{}, w http.ResponseWriter, r *http.Request) { + n, ok := ctx.(*Server) + if ok { + for _, path := range n.registeredHanders { + fmt.Fprintf(w, "%s\n", path) + } + } +} + +func ready(ctx interface{}, w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "OK\n") +} + +// DebugHTTPForm TODO +func DebugHTTPForm(r *http.Request) { + r.ParseForm() + for k, v := range r.Form { + logrus.Debugf("Form[%q] = %q\n", k, v) + } +} + +// HTTPReplyError TODO +func HTTPReplyError(w http.ResponseWriter, message, usage string) { + fmt.Fprintf(w, "%s\n", message) + if usage != "" { + fmt.Fprintf(w, "Usage: %s\n", usage) + } +} diff --git a/components/engine/vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go b/components/engine/vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go index 3a4ea41bfc..01f6287bed 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go +++ b/components/engine/vendor/github.com/docker/libnetwork/drivers/overlay/ov_network.go @@ -12,6 +12,7 @@ import ( "strings" "sync" "syscall" + "time" "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/reexec" @@ -705,6 +706,7 @@ func (n *network) initSandbox(restore bool) error { } func (n *network) watchMiss(nlSock *nl.NetlinkSocket) { + t := time.Now() for { msgs, err := nlSock.Receive() if err != nil { @@ -757,23 +759,55 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) { continue } - if !n.driver.isSerfAlive() { - continue - } + if n.driver.isSerfAlive() { + mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip) + if err != nil { + logrus.Errorf("could not resolve peer %q: %v", ip, err) + continue + } - mac, IPmask, vtep, err := n.driver.resolvePeer(n.id, ip) - if err != nil { - logrus.Errorf("could not resolve peer %q: %v", ip, err) - continue - } - - if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil { - logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err) + if err := n.driver.peerAdd(n.id, "dummy", ip, IPmask, mac, vtep, true, l2Miss, l3Miss); err != nil { + logrus.Errorf("could not add neighbor entry for missed peer %q: %v", ip, err) + } + } else { + // If the gc_thresh values are lower kernel might knock off the neighor entries. + // When we get a L3 miss check if its a valid peer and reprogram the neighbor + // entry again. Rate limit it to once attempt every 500ms, just in case a faulty + // container sends a flood of packets to invalid peers + if !l3Miss { + continue + } + if time.Since(t) > 500*time.Millisecond { + t = time.Now() + n.programNeighbor(ip) + } } } } } +func (n *network) programNeighbor(ip net.IP) { + peerMac, _, _, err := n.driver.peerDbSearch(n.id, ip) + if err != nil { + logrus.Errorf("Reprogramming on L3 miss failed for %s, no peer entry", ip) + return + } + s := n.getSubnetforIPAddr(ip) + if s == nil { + logrus.Errorf("Reprogramming on L3 miss failed for %s, not a valid subnet", ip) + return + } + sbox := n.sandbox() + if sbox == nil { + logrus.Errorf("Reprogramming on L3 miss failed for %s, overlay sandbox missing", ip) + return + } + if err := sbox.AddNeighbor(ip, peerMac, true, sbox.NeighborOptions().LinkName(s.vxlanName)); err != nil { + logrus.Errorf("Reprogramming on L3 miss failed for %s: %v", ip, err) + return + } +} + func (d *driver) addNetwork(n *network) { d.Lock() d.networks[n.id] = n @@ -1052,6 +1086,15 @@ func (n *network) contains(ip net.IP) bool { return false } +func (n *network) getSubnetforIPAddr(ip net.IP) *subnet { + for _, s := range n.subnets { + if s.subnetIP.Contains(ip) { + return s + } + } + return nil +} + // getSubnetforIP returns the subnet to which the given IP belongs func (n *network) getSubnetforIP(ip *net.IPNet) *subnet { for _, s := range n.subnets { diff --git a/components/engine/vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go b/components/engine/vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go index 21cd1fbe3d..510c6138d4 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go +++ b/components/engine/vendor/github.com/docker/libnetwork/drivers/overlay/peerdb.go @@ -207,6 +207,12 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM } func (d *driver) peerDbUpdateSandbox(nid string) { + // The following logic is useful only in non swarm mode + // In swarm mode instead the programmig will come directly from networkDB + if !d.isSerfAlive() { + return + } + d.peerDb.Lock() pMap, ok := d.peerDb.mp[nid] if !ok { diff --git a/components/engine/vendor/github.com/docker/libnetwork/drivers/windows/labels.go b/components/engine/vendor/github.com/docker/libnetwork/drivers/windows/labels.go index b32c6ffb61..6cb077cb4f 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/drivers/windows/labels.go +++ b/components/engine/vendor/github.com/docker/libnetwork/drivers/windows/labels.go @@ -28,6 +28,9 @@ const ( // DNSServers of the network DNSServers = "com.docker.network.windowsshim.dnsservers" + // MacPool of the network + MacPool = "com.docker.network.windowsshim.macpool" + // SourceMac of the network SourceMac = "com.docker.network.windowsshim.sourcemac" diff --git a/components/engine/vendor/github.com/docker/libnetwork/drivers/windows/windows.go b/components/engine/vendor/github.com/docker/libnetwork/drivers/windows/windows.go index eb1522a74a..19b2e685b4 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/drivers/windows/windows.go +++ b/components/engine/vendor/github.com/docker/libnetwork/drivers/windows/windows.go @@ -38,6 +38,7 @@ type networkConfiguration struct { VLAN uint VSID uint DNSServers string + MacPools []hcsshim.MacPool DNSSuffix string SourceMac string NetworkAdapterName string @@ -168,6 +169,18 @@ func (d *driver) parseNetworkOptions(id string, genericOptions map[string]string config.DNSSuffix = value case DNSServers: config.DNSServers = value + case MacPool: + config.MacPools = make([]hcsshim.MacPool, 0) + s := strings.Split(value, ",") + if len(s)%2 != 0 { + return nil, types.BadRequestErrorf("Invalid mac pool. You must specify both a start range and an end range") + } + for i := 0; i < len(s)-1; i += 2 { + config.MacPools = append(config.MacPools, hcsshim.MacPool{ + StartMacAddress: s[i], + EndMacAddress: s[i+1], + }) + } case VLAN: vlan, err := strconv.ParseUint(value, 10, 32) if err != nil { @@ -274,6 +287,7 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo d Subnets: subnets, DNSServerList: config.DNSServers, DNSSuffix: config.DNSSuffix, + MacPools: config.MacPools, SourceMac: config.SourceMac, NetworkAdapterName: config.NetworkAdapterName, } diff --git a/components/engine/vendor/github.com/docker/libnetwork/drivers_windows.go b/components/engine/vendor/github.com/docker/libnetwork/drivers_windows.go index 384d855cb8..a037c16efb 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/drivers_windows.go +++ b/components/engine/vendor/github.com/docker/libnetwork/drivers_windows.go @@ -16,5 +16,6 @@ func getInitializers(experimental bool) []initializer { {windows.GetInit("l2bridge"), "l2bridge"}, {windows.GetInit("l2tunnel"), "l2tunnel"}, {windows.GetInit("nat"), "nat"}, + {windows.GetInit("ics"), "ics"}, } } diff --git a/components/engine/vendor/github.com/docker/libnetwork/endpoint_info.go b/components/engine/vendor/github.com/docker/libnetwork/endpoint_info.go index 202c27b308..47bff432cb 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/endpoint_info.go +++ b/components/engine/vendor/github.com/docker/libnetwork/endpoint_info.go @@ -154,9 +154,7 @@ func (epi *endpointInterface) CopyTo(dstEpi *endpointInterface) error { dstEpi.v6PoolID = epi.v6PoolID if len(epi.llAddrs) != 0 { dstEpi.llAddrs = make([]*net.IPNet, 0, len(epi.llAddrs)) - for _, ll := range epi.llAddrs { - dstEpi.llAddrs = append(dstEpi.llAddrs, ll) - } + dstEpi.llAddrs = append(dstEpi.llAddrs, epi.llAddrs...) } for _, route := range epi.routes { @@ -415,7 +413,7 @@ func (epj *endpointJoinInfo) UnmarshalJSON(b []byte) error { return err } if v, ok := epMap["gw"]; ok { - epj.gw6 = net.ParseIP(v.(string)) + epj.gw = net.ParseIP(v.(string)) } if v, ok := epMap["gw6"]; ok { epj.gw6 = net.ParseIP(v.(string)) @@ -444,6 +442,6 @@ func (epj *endpointJoinInfo) CopyTo(dstEpj *endpointJoinInfo) error { dstEpj.driverTableEntries = make([]*tableEntry, len(epj.driverTableEntries)) copy(dstEpj.driverTableEntries, epj.driverTableEntries) dstEpj.gw = types.GetIPCopy(epj.gw) - dstEpj.gw = types.GetIPCopy(epj.gw6) + dstEpj.gw6 = types.GetIPCopy(epj.gw6) return nil } diff --git a/components/engine/vendor/github.com/docker/libnetwork/iptables/iptables.go b/components/engine/vendor/github.com/docker/libnetwork/iptables/iptables.go index 20edb9b5d6..caa202b366 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/iptables/iptables.go +++ b/components/engine/vendor/github.com/docker/libnetwork/iptables/iptables.go @@ -151,11 +151,11 @@ func ProgramChain(c *ChainInfo, bridgeName string, hairpinMode, enable bool) err "-j", c.Name} if !Exists(Nat, "PREROUTING", preroute...) && enable { if err := c.Prerouting(Append, preroute...); err != nil { - return fmt.Errorf("Failed to inject docker in PREROUTING chain: %s", err) + return fmt.Errorf("Failed to inject %s in PREROUTING chain: %s", c.Name, err) } } else if Exists(Nat, "PREROUTING", preroute...) && !enable { if err := c.Prerouting(Delete, preroute...); err != nil { - return fmt.Errorf("Failed to remove docker in PREROUTING chain: %s", err) + return fmt.Errorf("Failed to remove %s in PREROUTING chain: %s", c.Name, err) } } output := []string{ @@ -167,11 +167,11 @@ func ProgramChain(c *ChainInfo, bridgeName string, hairpinMode, enable bool) err } if !Exists(Nat, "OUTPUT", output...) && enable { if err := c.Output(Append, output...); err != nil { - return fmt.Errorf("Failed to inject docker in OUTPUT chain: %s", err) + return fmt.Errorf("Failed to inject %s in OUTPUT chain: %s", c.Name, err) } } else if Exists(Nat, "OUTPUT", output...) && !enable { if err := c.Output(Delete, output...); err != nil { - return fmt.Errorf("Failed to inject docker in OUTPUT chain: %s", err) + return fmt.Errorf("Failed to inject %s in OUTPUT chain: %s", c.Name, err) } } case Filter: diff --git a/components/engine/vendor/github.com/docker/libnetwork/network.go b/components/engine/vendor/github.com/docker/libnetwork/network.go index 9f99064e11..72deeea660 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/network.go +++ b/components/engine/vendor/github.com/docker/libnetwork/network.go @@ -434,15 +434,11 @@ func (n *network) applyConfigurationTo(to *network) error { } if len(n.ipamV4Config) > 0 { to.ipamV4Config = make([]*IpamConf, 0, len(n.ipamV4Config)) - for _, v4conf := range n.ipamV4Config { - to.ipamV4Config = append(to.ipamV4Config, v4conf) - } + to.ipamV4Config = append(to.ipamV4Config, n.ipamV4Config...) } if len(n.ipamV6Config) > 0 { to.ipamV6Config = make([]*IpamConf, 0, len(n.ipamV6Config)) - for _, v6conf := range n.ipamV6Config { - to.ipamV6Config = append(to.ipamV6Config, v6conf) - } + to.ipamV6Config = append(to.ipamV6Config, n.ipamV6Config...) } if len(n.generic) > 0 { to.generic = options.Generic{} @@ -873,8 +869,7 @@ func (n *network) resolveDriver(name string, load bool) (driverapi.Driver, *driv d, cap := c.drvRegistry.Driver(name) if d == nil { if load { - var err error - err = c.loadDriver(name) + err := c.loadDriver(name) if err != nil { return nil, nil, err } @@ -1451,11 +1446,7 @@ func (n *network) ipamAllocate() error { } err = n.ipamAllocateVersion(6, ipam) - if err != nil { - return err - } - - return nil + return err } func (n *network) requestPoolHelper(ipam ipamapi.Ipam, addressSpace, preferredPool, subPool string, options map[string]string, v6 bool) (string, *net.IPNet, map[string]string, error) { @@ -1654,9 +1645,7 @@ func (n *network) getIPInfo(ipVer int) []*IpamInfo { } l := make([]*IpamInfo, 0, len(info)) n.Lock() - for _, d := range info { - l = append(l, d) - } + l = append(l, info...) n.Unlock() return l } @@ -1870,7 +1859,7 @@ func (n *network) ResolveName(req string, ipType int) ([]net.IP, bool) { // the docker network domain. If the network is not v6 enabled // set ipv6Miss to filter the DNS query from going to external // resolvers. - if ok && n.enableIPv6 == false { + if ok && !n.enableIPv6 { ipv6Miss = true } ipSet, ok = sr.svcIPv6Map.Get(req) diff --git a/components/engine/vendor/github.com/docker/libnetwork/network_windows.go b/components/engine/vendor/github.com/docker/libnetwork/network_windows.go index 07a1c1d424..ddcd3345f2 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/network_windows.go +++ b/components/engine/vendor/github.com/docker/libnetwork/network_windows.go @@ -29,7 +29,7 @@ func executeInCompartment(compartmentID uint32, x func()) { func (n *network) startResolver() { n.resolverOnce.Do(func() { - logrus.Debugf("Launching DNS server for network", n.Name()) + logrus.Debugf("Launching DNS server for network %q", n.Name()) options := n.Info().DriverOptions() hnsid := options[windows.HNSID] diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/broadcast.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/broadcast.go index 3fe9f6271a..52e96ec639 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/broadcast.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/broadcast.go @@ -114,7 +114,8 @@ type tableEventMessage struct { } func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool { - return false + otherm := other.(*tableEventMessage) + return m.tname == otherm.tname && m.id == otherm.id && m.key == otherm.key } func (m *tableEventMessage) Message() []byte { diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/cluster.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/cluster.go index 9156d0da68..ba9d91826f 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/cluster.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/cluster.go @@ -98,10 +98,14 @@ func (nDB *NetworkDB) RemoveKey(key []byte) { } func (nDB *NetworkDB) clusterInit() error { + nDB.lastStatsTimestamp = time.Now() + nDB.lastHealthTimestamp = nDB.lastStatsTimestamp + config := memberlist.DefaultLANConfig() config.Name = nDB.config.NodeName config.BindAddr = nDB.config.BindAddr config.AdvertiseAddr = nDB.config.AdvertiseAddr + config.UDPBufferSize = nDB.config.PacketBufferSize if nDB.config.BindPort != 0 { config.BindPort = nDB.config.BindPort @@ -199,9 +203,8 @@ func (nDB *NetworkDB) clusterJoin(members []string) error { mlist := nDB.memberlist if _, err := mlist.Join(members); err != nil { - // Incase of failure, keep retrying join until it succeeds or the cluster is shutdown. + // In case of failure, keep retrying join until it succeeds or the cluster is shutdown. go nDB.retryJoin(members, nDB.stopCh) - return fmt.Errorf("could not join node to memberlist: %v", err) } @@ -287,13 +290,6 @@ func (nDB *NetworkDB) reconnectNode() { return } - // Update all the local table state to a new time to - // force update on the node we are trying to rejoin, just in - // case that node has these in deleting state still. This is - // facilitate fast convergence after recovering from a gossip - // failure. - nDB.updateLocalTableTime() - logrus.Debugf("Initiating bulk sync with node %s after reconnect", node.Name) nDB.bulkSync([]string{node.Name}, true) } @@ -310,12 +306,11 @@ func (nDB *NetworkDB) reapState() { func (nDB *NetworkDB) reapNetworks() { nDB.Lock() - for name, nn := range nDB.networks { + for _, nn := range nDB.networks { for id, n := range nn { if n.leaving { if n.reapTime <= 0 { delete(nn, id) - nDB.deleteNetworkNode(id, name) continue } n.reapTime -= reapPeriod @@ -373,11 +368,21 @@ func (nDB *NetworkDB) gossip() { networkNodes[nid] = nDB.networkNodes[nid] } + printStats := time.Since(nDB.lastStatsTimestamp) >= nDB.config.StatsPrintPeriod + printHealth := time.Since(nDB.lastHealthTimestamp) >= nDB.config.HealthPrintPeriod nDB.RUnlock() + if printHealth { + healthScore := nDB.memberlist.GetHealthScore() + if healthScore != 0 { + logrus.Warnf("NetworkDB stats - healthscore:%d (connectivity issues)", healthScore) + } + nDB.lastHealthTimestamp = time.Now() + } + for nid, nodes := range networkNodes { mNodes := nDB.mRandomNodes(3, nodes) - bytesAvail := udpSendBuf - compoundHeaderOverhead + bytesAvail := nDB.config.PacketBufferSize - compoundHeaderOverhead nDB.RLock() network, ok := thisNodeNetworks[nid] @@ -398,6 +403,14 @@ func (nDB *NetworkDB) gossip() { } msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail) + // Collect stats and print the queue info, note this code is here also to have a view of the queues empty + network.qMessagesSent += len(msgs) + if printStats { + logrus.Infof("NetworkDB stats - Queue net:%s qLen:%d netPeers:%d netMsg/s:%d", + nid, broadcastQ.NumQueued(), broadcastQ.NumNodes(), network.qMessagesSent/int((nDB.config.StatsPrintPeriod/time.Second))) + network.qMessagesSent = 0 + } + if len(msgs) == 0 { continue } @@ -415,11 +428,15 @@ func (nDB *NetworkDB) gossip() { } // Send the compound message - if err := nDB.memberlist.SendToUDP(&mnode.Node, compound); err != nil { + if err := nDB.memberlist.SendBestEffort(&mnode.Node, compound); err != nil { logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err) } } } + // Reset the stats + if printStats { + nDB.lastStatsTimestamp = time.Now() + } } func (nDB *NetworkDB) bulkSyncTables() { @@ -590,7 +607,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b nDB.bulkSyncAckTbl[node] = ch nDB.Unlock() - err = nDB.memberlist.SendToTCP(&mnode.Node, buf) + err = nDB.memberlist.SendReliable(&mnode.Node, buf) if err != nil { nDB.Lock() delete(nDB.bulkSyncAckTbl, node) @@ -607,7 +624,7 @@ func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited b case <-t.C: logrus.Errorf("Bulk sync to node %s timed out", node) case <-ch: - logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime)) + logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Since(startTime)) } t.Stop() } diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/delegate.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/delegate.go index 6df358382f..c48c779558 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/delegate.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/delegate.go @@ -104,6 +104,9 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { } n = nDB.checkAndGetNode(nEvent) + if n == nil { + return false + } nDB.purgeSameNode(n) n.ltime = nEvent.LTime @@ -111,9 +114,12 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { switch nEvent.Type { case NodeEventTypeJoin: nDB.Lock() + _, found := nDB.nodes[n.Name] nDB.nodes[n.Name] = n nDB.Unlock() - logrus.Infof("Node join event for %s/%s", n.Name, n.Addr) + if !found { + logrus.Infof("Node join event for %s/%s", n.Name, n.Addr) + } return true case NodeEventTypeLeave: nDB.Lock() @@ -127,25 +133,12 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { } func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { - var flushEntries bool // Update our local clock if the received messages has newer // time. nDB.networkClock.Witness(nEvent.LTime) nDB.Lock() - defer func() { - nDB.Unlock() - // When a node leaves a network on the last task removal cleanup the - // local entries for this network & node combination. When the tasks - // on a network are removed we could have missed the gossip updates. - // Not doing this cleanup can leave stale entries because bulksyncs - // from the node will no longer include this network state. - // - // deleteNodeNetworkEntries takes nDB lock. - if flushEntries { - nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName) - } - }() + defer nDB.Unlock() if nEvent.NodeName == nDB.config.NodeName { return false @@ -173,10 +166,20 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool { n.leaving = nEvent.Type == NetworkEventTypeLeave if n.leaving { n.reapTime = reapInterval - flushEntries = true + + // The remote node is leaving the network, but not the gossip cluster. + // Mark all its entries in deleted state, this will guarantee that + // if some node bulk sync with us, the deleted state of + // these entries will be propagated. + nDB.deleteNodeNetworkEntries(nEvent.NetworkID, nEvent.NodeName) + } + + if nEvent.Type == NetworkEventTypeLeave { + nDB.deleteNetworkNode(nEvent.NetworkID, nEvent.NodeName) + } else { + nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName) } - nDB.addNetworkNode(nEvent.NetworkID, nEvent.NodeName) return true } @@ -203,17 +206,22 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { nDB.RLock() networks := nDB.networks[nDB.config.NodeName] network, ok := networks[tEvent.NetworkID] - nDB.RUnlock() - if !ok || network.leaving { - return true + // Check if the owner of the event is still part of the network + nodes := nDB.networkNodes[tEvent.NetworkID] + var nodePresent bool + for _, node := range nodes { + if node == tEvent.NodeName { + nodePresent = true + break + } } - - e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key) - if err != nil && tEvent.Type == TableEventTypeDelete { - // If it is a delete event and we don't have the entry here nothing to do. + nDB.RUnlock() + if !ok || network.leaving || !nodePresent { + // I'm out of the network OR the event owner is not anymore part of the network so do not propagate return false } + e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key) if err == nil { // We have the latest state. Ignore the event // since it is stale. @@ -238,6 +246,11 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), e) nDB.Unlock() + if err != nil && tEvent.Type == TableEventTypeDelete { + // If it is a delete event and we didn't have the entry here don't repropagate + return true + } + var op opType switch tEvent.Type { case TableEventTypeCreate: @@ -278,8 +291,7 @@ func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) { return } - // Do not rebroadcast a bulk sync - if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync { + if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast { var err error buf, err = encodeRawMessage(MessageTypeTableEvent, buf) if err != nil { diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/event_delegate.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/event_delegate.go index 23e16832e7..0f32194cf3 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/event_delegate.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/event_delegate.go @@ -45,9 +45,12 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) { var failed bool logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr) e.broadcastNodeEvent(mn.Addr, opDelete) - e.nDB.deleteNodeTableEntries(mn.Name) - e.nDB.deleteNetworkEntriesForNode(mn.Name) + // The node left or failed, delete all the entries created by it. + // If the node was temporary down, deleting the entries will guarantee that the CREATE events will be accepted + // If the node instead left because was going down, then it makes sense to just delete all its state e.nDB.Lock() + e.nDB.deleteNetworkEntriesForNode(mn.Name) + e.nDB.deleteNodeTableEntries(mn.Name) if n, ok := e.nDB.nodes[mn.Name]; ok { delete(e.nDB.nodes, mn.Name) @@ -61,7 +64,6 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) { if failed { logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr) } - } func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) { diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/message.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/message.go index a861752bd4..81a6d832a6 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/message.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/message.go @@ -3,10 +3,6 @@ package networkdb import "github.com/gogo/protobuf/proto" const ( - // Max udp message size chosen to avoid network packet - // fragmentation. - udpSendBuf = 1400 - // Compound message header overhead 1 byte(message type) + 4 // bytes (num messages) compoundHeaderOverhead = 5 diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdb.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdb.go index b93a90d019..6447a16357 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdb.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdb.go @@ -1,10 +1,11 @@ package networkdb -//go:generate protoc -I.:../Godeps/_workspace/src/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto +//go:generate protoc -I.:../vendor/github.com/gogo/protobuf --gogo_out=import_path=github.com/docker/libnetwork/networkdb,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto:. networkdb.proto import ( "fmt" "net" + "os" "strings" "sync" "time" @@ -93,6 +94,12 @@ type NetworkDB struct { // bootStrapIP is the list of IPs that can be used to bootstrap // the gossip. bootStrapIP []net.IP + + // lastStatsTimestamp is the last timestamp when the stats got printed + lastStatsTimestamp time.Time + + // lastHealthTimestamp is the last timestamp when the health score got printed + lastHealthTimestamp time.Time } // PeerInfo represents the peer (gossip cluster) nodes of a network @@ -101,6 +108,11 @@ type PeerInfo struct { IP string } +// PeerClusterInfo represents the peer (gossip cluster) nodes +type PeerClusterInfo struct { + PeerInfo +} + type node struct { memberlist.Node ltime serf.LamportTime @@ -126,6 +138,9 @@ type network struct { // The broadcast queue for table event gossip. This is only // initialized for this node's network attachment entries. tableBroadcasts *memberlist.TransmitLimitedQueue + + // Number of gossip messages sent related to this network during the last stats collection period + qMessagesSent int } // Config represents the configuration of the networdb instance and @@ -149,6 +164,21 @@ type Config struct { // Keys to be added to the Keyring of the memberlist. Key at index // 0 is the primary key Keys [][]byte + + // PacketBufferSize is the maximum number of bytes that memberlist will + // put in a packet (this will be for UDP packets by default with a NetTransport). + // A safe value for this is typically 1400 bytes (which is the default). However, + // depending on your network's MTU (Maximum Transmission Unit) you may + // be able to increase this to get more content into each gossip packet. + PacketBufferSize int + + // StatsPrintPeriod the period to use to print queue stats + // Default is 5min + StatsPrintPeriod time.Duration + + // HealthPrintPeriod the period to use to print the health score + // Default is 1min + HealthPrintPeriod time.Duration } // entry defines a table entry @@ -171,6 +201,18 @@ type entry struct { reapTime time.Duration } +// DefaultConfig returns a NetworkDB config with default values +func DefaultConfig() *Config { + hostname, _ := os.Hostname() + return &Config{ + NodeName: hostname, + BindAddr: "0.0.0.0", + PacketBufferSize: 1400, + StatsPrintPeriod: 5 * time.Minute, + HealthPrintPeriod: 1 * time.Minute, + } +} + // New creates a new instance of NetworkDB using the Config passed by // the caller. func New(c *Config) (*NetworkDB, error) { @@ -200,6 +242,7 @@ func New(c *Config) (*NetworkDB, error) { // instances passed by the caller in the form of addr:port func (nDB *NetworkDB) Join(members []string) error { nDB.Lock() + nDB.bootStrapIP = make([]net.IP, 0, len(members)) for _, m := range members { nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m)) } @@ -215,6 +258,20 @@ func (nDB *NetworkDB) Close() { } } +// ClusterPeers returns all the gossip cluster peers. +func (nDB *NetworkDB) ClusterPeers() []PeerInfo { + nDB.RLock() + defer nDB.RUnlock() + peers := make([]PeerInfo, 0, len(nDB.nodes)) + for _, node := range nDB.nodes { + peers = append(peers, PeerInfo{ + Name: node.Name, + IP: node.Node.Addr.String(), + }) + } + return peers +} + // Peers returns the gossip peers for a given network. func (nDB *NetworkDB) Peers(nid string) []PeerInfo { nDB.RLock() @@ -361,7 +418,6 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { } func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) { - nDB.Lock() for nid, nodes := range nDB.networkNodes { updatedNodes := make([]string, 0, len(nodes)) for _, node := range nodes { @@ -376,11 +432,25 @@ func (nDB *NetworkDB) deleteNetworkEntriesForNode(deletedNode string) { } delete(nDB.networks, deletedNode) - nDB.Unlock() } +// deleteNodeNetworkEntries is called in 2 conditions with 2 different outcomes: +// 1) when a notification is coming of a node leaving the network +// - Walk all the network entries and mark the leaving node's entries for deletion +// These will be garbage collected when the reap timer will expire +// 2) when the local node is leaving the network +// - Walk all the network entries: +// A) if the entry is owned by the local node +// then we will mark it for deletion. This will ensure that if a node did not +// yet received the notification that the local node is leaving, will be aware +// of the entries to be deleted. +// B) if the entry is owned by a remote node, then we can safely delete it. This +// ensures that if we join back this network as we receive the CREATE event for +// entries owned by remote nodes, we will accept them and we notify the application func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { - nDB.Lock() + // Indicates if the delete is triggered for the local node + isNodeLocal := node == nDB.config.NodeName + nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool { oldEntry := v.(*entry) @@ -389,7 +459,15 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { tname := params[1] key := params[2] - if oldEntry.node != node { + // If the entry is owned by a remote node and this node is not leaving the network + if oldEntry.node != node && !isNodeLocal { + // Don't do anything because the event is triggered for a node that does not own this entry + return false + } + + // If this entry is already marked for deletion and this node is not leaving the network + if oldEntry.deleting && !isNodeLocal { + // Don't do anything this entry will be already garbage collected using the old reapTime return false } @@ -401,17 +479,29 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) { reapTime: reapInterval, } - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + // we arrived at this point in 2 cases: + // 1) this entry is owned by the node that is leaving the network + // 2) the local node is leaving the network + if oldEntry.node == node { + if isNodeLocal { + // TODO fcrisciani: this can be removed if there is no way to leave the network + // without doing a delete of all the objects + entry.ltime++ + } + nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) + nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) + } else { + // the local node is leaving the network, all the entries of remote nodes can be safely removed + nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) + nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) + } nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value)) return false }) - nDB.Unlock() } func (nDB *NetworkDB) deleteNodeTableEntries(node string) { - nDB.Lock() nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { oldEntry := v.(*entry) if oldEntry.node != node { @@ -423,21 +513,12 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) { nid := params[1] key := params[2] - entry := &entry{ - ltime: oldEntry.ltime, - node: node, - value: oldEntry.value, - deleting: true, - reapTime: reapInterval, - } + nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) + nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)) - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) - - nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, entry.value)) + nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, oldEntry.value)) return false }) - nDB.Unlock() } // WalkTable walks a single table in NetworkDB and invokes the passed @@ -481,13 +562,12 @@ func (nDB *NetworkDB) JoinNetwork(nid string) error { nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{ NumNodes: func() int { nDB.RLock() - num := len(nDB.networkNodes[nid]) - nDB.RUnlock() - return num + defer nDB.RUnlock() + return len(nDB.networkNodes[nid]) }, RetransmitMult: 4, } - nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName) + nDB.addNetworkNode(nid, nDB.config.NodeName) networkNodes := nDB.networkNodes[nid] nDB.Unlock() @@ -517,35 +597,12 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { nDB.Lock() defer nDB.Unlock() - var ( - paths []string - entries []*entry - ) - nwWalker := func(path string, v interface{}) bool { - entry, ok := v.(*entry) - if !ok { - return false - } - paths = append(paths, path) - entries = append(entries, entry) - return false - } + // Remove myself from the list of the nodes participating to the network + nDB.deleteNetworkNode(nid, nDB.config.NodeName) - nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), nwWalker) - for _, path := range paths { - params := strings.Split(path[1:], "/") - tname := params[1] - key := params[2] - - if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok { - logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key) - } - - if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok { - logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key) - } - } + // Update all the local entries marking them for deletion and delete all the remote entries + nDB.deleteNodeNetworkEntries(nid, nDB.config.NodeName) nodeNetworks, ok := nDB.networks[nDB.config.NodeName] if !ok { @@ -558,6 +615,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error { } n.ltime = ltime + n.reapTime = reapInterval n.leaving = true return nil } @@ -580,7 +638,10 @@ func (nDB *NetworkDB) addNetworkNode(nid string, nodeName string) { // passed network. Caller should hold the NetworkDB lock while calling // this func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) { - nodes := nDB.networkNodes[nid] + nodes, ok := nDB.networkNodes[nid] + if !ok || len(nodes) == 0 { + return + } newNodes := make([]string, 0, len(nodes)-1) for _, name := range nodes { if name == nodeName { @@ -618,27 +679,3 @@ func (nDB *NetworkDB) updateLocalNetworkTime() { n.ltime = ltime } } - -func (nDB *NetworkDB) updateLocalTableTime() { - nDB.Lock() - defer nDB.Unlock() - - ltime := nDB.tableClock.Increment() - nDB.indexes[byTable].Walk(func(path string, v interface{}) bool { - entry := v.(*entry) - if entry.node != nDB.config.NodeName { - return false - } - - params := strings.Split(path[1:], "/") - tname := params[0] - nid := params[1] - key := params[2] - entry.ltime = ltime - - nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry) - nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry) - - return false - }) -} diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdbdiagnose.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdbdiagnose.go new file mode 100644 index 0000000000..d70cec7bb6 --- /dev/null +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdbdiagnose.go @@ -0,0 +1,242 @@ +package networkdb + +import ( + "fmt" + "net/http" + "strings" + + "github.com/docker/libnetwork/diagnose" +) + +const ( + missingParameter = "missing parameter" +) + +// NetDbPaths2Func TODO +var NetDbPaths2Func = map[string]diagnose.HTTPHandlerFunc{ + "/join": dbJoin, + "/networkpeers": dbPeers, + "/clusterpeers": dbClusterPeers, + "/joinnetwork": dbJoinNetwork, + "/leavenetwork": dbLeaveNetwork, + "/createentry": dbCreateEntry, + "/updateentry": dbUpdateEntry, + "/deleteentry": dbDeleteEntry, + "/getentry": dbGetEntry, + "/gettable": dbGetTable, +} + +func dbJoin(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["members"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?members=ip1,ip2,...", r.URL.Path)) + return + } + + nDB, ok := ctx.(*NetworkDB) + if ok { + err := nDB.Join(strings.Split(r.Form["members"][0], ",")) + if err != nil { + fmt.Fprintf(w, "%s error in the DB join %s\n", r.URL.Path, err) + return + } + + fmt.Fprintf(w, "OK\n") + } +} + +func dbPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["nid"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=test", r.URL.Path)) + return + } + + nDB, ok := ctx.(*NetworkDB) + if ok { + peers := nDB.Peers(r.Form["nid"][0]) + fmt.Fprintf(w, "Network:%s Total peers: %d\n", r.Form["nid"], len(peers)) + for i, peerInfo := range peers { + fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP) + } + } +} + +func dbClusterPeers(ctx interface{}, w http.ResponseWriter, r *http.Request) { + nDB, ok := ctx.(*NetworkDB) + if ok { + peers := nDB.ClusterPeers() + fmt.Fprintf(w, "Total peers: %d\n", len(peers)) + for i, peerInfo := range peers { + fmt.Fprintf(w, "%d) %s -> %s\n", i, peerInfo.Name, peerInfo.IP) + } + } +} + +func dbCreateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 || + len(r.Form["key"]) < 1 || + len(r.Form["value"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + key := r.Form["key"][0] + value := r.Form["value"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + if err := nDB.CreateEntry(tname, nid, key, []byte(value)); err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbUpdateEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 || + len(r.Form["key"]) < 1 || + len(r.Form["value"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k&value=v", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + key := r.Form["key"][0] + value := r.Form["value"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + if err := nDB.UpdateEntry(tname, nid, key, []byte(value)); err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbDeleteEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 || + len(r.Form["key"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + key := r.Form["key"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + err := nDB.DeleteEntry(tname, nid, key) + if err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbGetEntry(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 || + len(r.Form["key"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id&key=k", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + key := r.Form["key"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + value, err := nDB.GetEntry(tname, nid, key) + if err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "key:`%s` value:`%s`\n", key, string(value)) + } +} + +func dbJoinNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["nid"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path)) + return + } + + nid := r.Form["nid"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + if err := nDB.JoinNetwork(nid); err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbLeaveNetwork(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["nid"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?nid=network_id", r.URL.Path)) + return + } + + nid := r.Form["nid"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + if err := nDB.LeaveNetwork(nid); err != nil { + diagnose.HTTPReplyError(w, err.Error(), "") + return + } + fmt.Fprintf(w, "OK\n") + } +} + +func dbGetTable(ctx interface{}, w http.ResponseWriter, r *http.Request) { + r.ParseForm() + diagnose.DebugHTTPForm(r) + if len(r.Form["tname"]) < 1 || + len(r.Form["nid"]) < 1 { + diagnose.HTTPReplyError(w, missingParameter, fmt.Sprintf("%s?tname=table_name&nid=network_id", r.URL.Path)) + return + } + + tname := r.Form["tname"][0] + nid := r.Form["nid"][0] + + nDB, ok := ctx.(*NetworkDB) + if ok { + table := nDB.GetTableByNetwork(tname, nid) + fmt.Fprintf(w, "total elements: %d\n", len(table)) + i := 0 + for k, v := range table { + fmt.Fprintf(w, "%d) k:`%s` -> v:`%s`\n", i, k, string(v.([]byte))) + i++ + } + } +} diff --git a/components/engine/vendor/github.com/docker/libnetwork/osl/neigh_linux.go b/components/engine/vendor/github.com/docker/libnetwork/osl/neigh_linux.go index 161ffa7beb..c105e52232 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/osl/neigh_linux.go +++ b/components/engine/vendor/github.com/docker/libnetwork/osl/neigh_linux.go @@ -91,9 +91,7 @@ func (n *networkNamespace) DeleteNeighbor(dstIP net.IP, dstMac net.HardwareAddr, if nh.linkDst != "" { nlnh.LinkIndex = iface.Attrs().Index } - if err := nlh.NeighDel(nlnh); err != nil { - logrus.Warnf("Deleting bridge mac mac %s failed, %v", dstMac, err) - } + nlh.NeighDel(nlnh) } } diff --git a/components/engine/vendor/github.com/docker/libnetwork/resolver.go b/components/engine/vendor/github.com/docker/libnetwork/resolver.go index cc7692621f..ff472d0019 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/resolver.go +++ b/components/engine/vendor/github.com/docker/libnetwork/resolver.go @@ -446,7 +446,7 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) { defer co.Close() // limits the number of outstanding concurrent queries. - if r.forwardQueryStart() == false { + if !r.forwardQueryStart() { old := r.tStamp r.tStamp = time.Now() if r.tStamp.Sub(old) > logInterval { diff --git a/components/engine/vendor/github.com/docker/libnetwork/sandbox.go b/components/engine/vendor/github.com/docker/libnetwork/sandbox.go index 9454c5c286..46674067a1 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/sandbox.go +++ b/components/engine/vendor/github.com/docker/libnetwork/sandbox.go @@ -621,7 +621,7 @@ func (sb *sandbox) resolveName(req string, networkName string, epList []*endpoin func (sb *sandbox) SetKey(basePath string) error { start := time.Now() defer func() { - logrus.Debugf("sandbox set key processing took %s for container %s", time.Now().Sub(start), sb.ContainerID()) + logrus.Debugf("sandbox set key processing took %s for container %s", time.Since(start), sb.ContainerID()) }() if basePath == "" { @@ -773,9 +773,7 @@ func (sb *sandbox) restoreOslSandbox() error { } Ifaces[fmt.Sprintf("%s+%s", i.srcName, i.dstPrefix)] = ifaceOptions if joinInfo != nil { - for _, r := range joinInfo.StaticRoutes { - routes = append(routes, r) - } + routes = append(routes, joinInfo.StaticRoutes...) } if ep.needResolver() { sb.startResolver(true) @@ -789,11 +787,7 @@ func (sb *sandbox) restoreOslSandbox() error { // restore osl sandbox err := sb.osSbox.Restore(Ifaces, routes, gwep.joinInfo.gw, gwep.joinInfo.gw6) - if err != nil { - return err - } - - return nil + return err } func (sb *sandbox) populateNetworkResources(ep *endpoint) error { @@ -958,9 +952,7 @@ func (sb *sandbox) joinLeaveStart() { joinLeaveDone := sb.joinLeaveDone sb.Unlock() - select { - case <-joinLeaveDone: - } + <-joinLeaveDone sb.Lock() } diff --git a/components/engine/vendor/github.com/docker/libnetwork/sandbox_externalkey_unix.go b/components/engine/vendor/github.com/docker/libnetwork/sandbox_externalkey_unix.go index 4a7ac06b59..c33398f953 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/sandbox_externalkey_unix.go +++ b/components/engine/vendor/github.com/docker/libnetwork/sandbox_externalkey_unix.go @@ -52,7 +52,6 @@ func processSetKeyReexec() { controllerID := os.Args[2] err = SetExternalKey(controllerID, containerID, fmt.Sprintf("/proc/%d/ns/net", state.Pid)) - return } // SetExternalKey provides a convenient way to set an External key to a sandbox diff --git a/components/engine/vendor/github.com/docker/libnetwork/sandbox_store.go b/components/engine/vendor/github.com/docker/libnetwork/sandbox_store.go index 38b2bd7e8b..b92a544dc9 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/sandbox_store.go +++ b/components/engine/vendor/github.com/docker/libnetwork/sandbox_store.go @@ -115,9 +115,7 @@ func (sbs *sbState) CopyTo(o datastore.KVObject) error { dstSbs.dbExists = sbs.dbExists dstSbs.EpPriority = sbs.EpPriority - for _, eps := range sbs.Eps { - dstSbs.Eps = append(dstSbs.Eps, eps) - } + dstSbs.Eps = append(dstSbs.Eps, sbs.Eps...) if len(sbs.ExtDNS2) > 0 { for _, dns := range sbs.ExtDNS2 { diff --git a/components/engine/vendor/github.com/docker/libnetwork/service_linux.go b/components/engine/vendor/github.com/docker/libnetwork/service_linux.go index 1cf7ee91aa..784c1784ec 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/service_linux.go +++ b/components/engine/vendor/github.com/docker/libnetwork/service_linux.go @@ -372,6 +372,7 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro if err := iptables.RawCombinedOutput("-I", "FORWARD", "-j", ingressChain); err != nil { return fmt.Errorf("failed to add jump rule to %s in filter table forward chain: %v", ingressChain, err) } + arrangeUserFilterRule() } oifName, err := findOIFName(gwIP) @@ -438,7 +439,9 @@ func programIngress(gwIP net.IP, ingressPorts []*PortConfig, isDelete bool) erro return nil } -// In the filter table FORWARD chain first rule should be to jump to INGRESS-CHAIN +// In the filter table FORWARD chain the first rule should be to jump to +// DOCKER-USER so the user is able to filter packet first. +// The second rule should be jump to INGRESS-CHAIN. // This chain has the rules to allow access to the published ports for swarm tasks // from local bridge networks and docker_gwbridge (ie:taks on other swarm netwroks) func arrangeIngressFilterRule() {