From e930dafbb3560be4613db4baab219089dbf67846 Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Wed, 31 May 2017 16:04:00 -0700 Subject: [PATCH] Vendoring libnetwork 2e99f06621c23a5f4038968f1af1e28c84e4104e Fixes #33415 Fixes #33346 Implemented few additional IPVS APIs to be used by other projects Signed-off-by: Madhu Venugopal Upstream-commit: 0484bdb6ca0ddd0bebc490e335a7f788cdc30f13 Component: engine --- components/engine/vendor.conf | 2 +- .../docker/libnetwork/controller.go | 1 - .../docker/libnetwork/ipvs/constants.go | 17 + .../github.com/docker/libnetwork/ipvs/ipvs.go | 42 +++ .../docker/libnetwork/ipvs/netlink.go | 325 +++++++++++++++++- .../github.com/docker/libnetwork/network.go | 3 + .../docker/libnetwork/networkdb/cluster.go | 1 - .../docker/libnetwork/networkdb/delegate.go | 41 ++- .../libnetwork/networkdb/event_delegate.go | 13 +- .../docker/libnetwork/networkdb/networkdb.go | 10 + 10 files changed, 442 insertions(+), 13 deletions(-) diff --git a/components/engine/vendor.conf b/components/engine/vendor.conf index 35708e168c..3d9f47c944 100644 --- a/components/engine/vendor.conf +++ b/components/engine/vendor.conf @@ -26,7 +26,7 @@ github.com/imdario/mergo 0.2.1 golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0 #get libnetwork packages -github.com/docker/libnetwork 83e1e49475b88a9f1f8ba89a690a7d5de42e24b9 +github.com/docker/libnetwork 2e99f06621c23a5f4038968f1af1e28c84e4104e github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec diff --git a/components/engine/vendor/github.com/docker/libnetwork/controller.go b/components/engine/vendor/github.com/docker/libnetwork/controller.go index df75be707f..ae7dac0b82 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/controller.go +++ b/components/engine/vendor/github.com/docker/libnetwork/controller.go @@ -738,7 +738,6 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ... if network.configOnly { network.scope = datastore.LocalScope network.networkType = "null" - network.ipamType = "" goto addToStore } diff --git a/components/engine/vendor/github.com/docker/libnetwork/ipvs/constants.go b/components/engine/vendor/github.com/docker/libnetwork/ipvs/constants.go index 103e71a37c..d36bec0e80 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/ipvs/constants.go +++ b/components/engine/vendor/github.com/docker/libnetwork/ipvs/constants.go @@ -85,6 +85,23 @@ const ( ipvsDestAttrInactiveConnections ipvsDestAttrPersistentConnections ipvsDestAttrStats + ipvsDestAttrAddressFamily +) + +// IPVS Svc Statistics constancs + +const ( + ipvsSvcStatsUnspec int = iota + ipvsSvcStatsConns + ipvsSvcStatsPktsIn + ipvsSvcStatsPktsOut + ipvsSvcStatsBytesIn + ipvsSvcStatsBytesOut + ipvsSvcStatsCPS + ipvsSvcStatsPPSIn + ipvsSvcStatsPPSOut + ipvsSvcStatsBPSIn + ipvsSvcStatsBPSOut ) // Destination forwarding methods diff --git a/components/engine/vendor/github.com/docker/libnetwork/ipvs/ipvs.go b/components/engine/vendor/github.com/docker/libnetwork/ipvs/ipvs.go index 266cc24dbe..a285e102e3 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/ipvs/ipvs.go +++ b/components/engine/vendor/github.com/docker/libnetwork/ipvs/ipvs.go @@ -6,6 +6,7 @@ import ( "net" "syscall" + "fmt" "github.com/vishvananda/netlink/nl" "github.com/vishvananda/netns" ) @@ -25,6 +26,21 @@ type Service struct { Netmask uint32 AddressFamily uint16 PEName string + Stats SvcStats +} + +// SvcStats defines an IPVS service statistics +type SvcStats struct { + Connections uint32 + PacketsIn uint32 + PacketsOut uint32 + BytesIn uint64 + BytesOut uint64 + CPS uint32 + BPSOut uint32 + PPSIn uint32 + PPSOut uint32 + BPSIn uint32 } // Destination defines an IPVS destination (real server) in its @@ -117,3 +133,29 @@ func (i *Handle) UpdateDestination(s *Service, d *Destination) error { func (i *Handle) DelDestination(s *Service, d *Destination) error { return i.doCmd(s, d, ipvsCmdDelDest) } + +// GetServices returns an array of services configured on the Node +func (i *Handle) GetServices() ([]*Service, error) { + return i.doGetServicesCmd(nil) +} + +// GetDestinations returns an array of Destinations configured for this Service +func (i *Handle) GetDestinations(s *Service) ([]*Destination, error) { + return i.doGetDestinationsCmd(s, nil) +} + +// GetService gets details of a specific IPVS services, useful in updating statisics etc., +func (i *Handle) GetService(s *Service) (*Service, error) { + + res, err := i.doGetServicesCmd(s) + if err != nil { + return nil, err + } + + // We are looking for exactly one service otherwise error out + if len(res) != 1 { + return nil, fmt.Errorf("Expected only one service obtained=%d", len(res)) + } + + return res[0], nil +} diff --git a/components/engine/vendor/github.com/docker/libnetwork/ipvs/netlink.go b/components/engine/vendor/github.com/docker/libnetwork/ipvs/netlink.go index 635606dacd..5450679c3b 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/ipvs/netlink.go +++ b/components/engine/vendor/github.com/docker/libnetwork/ipvs/netlink.go @@ -19,6 +19,7 @@ import ( "github.com/vishvananda/netns" ) +// For Quick Reference IPVS related netlink message is described at the end of this file. var ( native = nl.NativeEndian() ipvsFamily int @@ -89,7 +90,6 @@ func fillService(s *Service) nl.NetlinkRequestData { if s.PEName != "" { nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName)) } - f := &ipvsFlags{ flags: s.Flags, mask: 0xFFFFFFFF, @@ -117,20 +117,38 @@ func fillDestinaton(d *Destination) nl.NetlinkRequestData { return cmdAttr } -func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error { +func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) { req := newIPVSRequest(cmd) req.Seq = atomic.AddUint32(&i.seq, 1) - req.AddData(fillService(s)) - if d != nil { + if s == nil { + req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages + req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute + } else { + req.AddData(fillService(s)) + } + + if d == nil { + if cmd == ipvsCmdGetDest { + req.Flags |= syscall.NLM_F_DUMP + } + + } else { req.AddData(fillDestinaton(d)) } - if _, err := execute(i.sock, req, 0); err != nil { - return err + res, err := execute(i.sock, req, 0) + if err != nil { + return [][]byte{}, err } - return nil + return res, nil +} + +func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error { + _, err := i.doCmdwithResponse(s, d, cmd) + + return err } func getIPVSFamily() (int, error) { @@ -171,7 +189,6 @@ func rawIPData(ip net.IP) []byte { if family == nl.FAMILY_V4 { return ip.To4() } - return ip } @@ -235,3 +252,295 @@ done: } return res, nil } + +func parseIP(ip []byte, family uint16) (net.IP, error) { + + var resIP net.IP + + switch family { + case syscall.AF_INET: + resIP = (net.IP)(ip[:4]) + case syscall.AF_INET6: + resIP = (net.IP)(ip[:16]) + default: + return nil, fmt.Errorf("parseIP Error ip=%v", ip) + + } + return resIP, nil +} + +// parseStats +func assembleStats(msg []byte) (SvcStats, error) { + + var s SvcStats + + attrs, err := nl.ParseRouteAttr(msg) + if err != nil { + return s, err + } + + for _, attr := range attrs { + attrType := int(attr.Attr.Type) + switch attrType { + case ipvsSvcStatsConns: + s.Connections = native.Uint32(attr.Value) + case ipvsSvcStatsPktsIn: + s.PacketsIn = native.Uint32(attr.Value) + case ipvsSvcStatsPktsOut: + s.PacketsOut = native.Uint32(attr.Value) + case ipvsSvcStatsBytesIn: + s.BytesIn = native.Uint64(attr.Value) + case ipvsSvcStatsBytesOut: + s.BytesOut = native.Uint64(attr.Value) + case ipvsSvcStatsCPS: + s.CPS = native.Uint32(attr.Value) + case ipvsSvcStatsPPSIn: + s.PPSIn = native.Uint32(attr.Value) + case ipvsSvcStatsPPSOut: + s.PPSOut = native.Uint32(attr.Value) + case ipvsSvcStatsBPSIn: + s.BPSIn = native.Uint32(attr.Value) + case ipvsSvcStatsBPSOut: + s.BPSOut = native.Uint32(attr.Value) + } + } + return s, nil +} + +// assembleService assembles a services back from a hain of netlink attributes +func assembleService(attrs []syscall.NetlinkRouteAttr) (*Service, error) { + + var s Service + + for _, attr := range attrs { + + attrType := int(attr.Attr.Type) + + switch attrType { + + case ipvsSvcAttrAddressFamily: + s.AddressFamily = native.Uint16(attr.Value) + case ipvsSvcAttrProtocol: + s.Protocol = native.Uint16(attr.Value) + case ipvsSvcAttrAddress: + ip, err := parseIP(attr.Value, s.AddressFamily) + if err != nil { + return nil, err + } + s.Address = ip + case ipvsSvcAttrPort: + s.Port = binary.BigEndian.Uint16(attr.Value) + case ipvsSvcAttrFWMark: + s.FWMark = native.Uint32(attr.Value) + case ipvsSvcAttrSchedName: + s.SchedName = nl.BytesToString(attr.Value) + case ipvsSvcAttrFlags: + s.Flags = native.Uint32(attr.Value) + case ipvsSvcAttrTimeout: + s.Timeout = native.Uint32(attr.Value) + case ipvsSvcAttrNetmask: + s.Netmask = native.Uint32(attr.Value) + case ipvsSvcAttrStats: + stats, err := assembleStats(attr.Value) + if err != nil { + return nil, err + } + s.Stats = stats + } + + } + return &s, nil +} + +// parseService given a ipvs netlink response this function will respond with a valid service entry, an error otherwise +func (i *Handle) parseService(msg []byte) (*Service, error) { + + var s *Service + + //Remove General header for this message and parse the NetLink message + hdr := deserializeGenlMsg(msg) + NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():]) + if err != nil { + return nil, err + } + if len(NetLinkAttrs) == 0 { + return nil, fmt.Errorf("error no valid netlink message found while parsing service record") + } + + //Now Parse and get IPVS related attributes messages packed in this message. + ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value) + if err != nil { + return nil, err + } + + //Assemble all the IPVS related attribute messages and create a service record + s, err = assembleService(ipvsAttrs) + if err != nil { + return nil, err + } + + return s, nil +} + +// doGetServicesCmd a wrapper which could be used commonly for both GetServices() and GetService(*Service) +func (i *Handle) doGetServicesCmd(svc *Service) ([]*Service, error) { + var res []*Service + + msgs, err := i.doCmdwithResponse(svc, nil, ipvsCmdGetService) + if err != nil { + return nil, err + } + + for _, msg := range msgs { + srv, err := i.parseService(msg) + if err != nil { + return nil, err + } + res = append(res, srv) + } + + return res, nil +} + +func assembleDestination(attrs []syscall.NetlinkRouteAttr) (*Destination, error) { + + var d Destination + + for _, attr := range attrs { + + attrType := int(attr.Attr.Type) + + switch attrType { + case ipvsDestAttrAddress: + ip, err := parseIP(attr.Value, syscall.AF_INET) + if err != nil { + return nil, err + } + d.Address = ip + case ipvsDestAttrPort: + d.Port = binary.BigEndian.Uint16(attr.Value) + case ipvsDestAttrForwardingMethod: + d.ConnectionFlags = native.Uint32(attr.Value) + case ipvsDestAttrWeight: + d.Weight = int(native.Uint16(attr.Value)) + case ipvsDestAttrUpperThreshold: + d.UpperThreshold = native.Uint32(attr.Value) + case ipvsDestAttrLowerThreshold: + d.LowerThreshold = native.Uint32(attr.Value) + case ipvsDestAttrAddressFamily: + d.AddressFamily = native.Uint16(attr.Value) + } + } + return &d, nil +} + +// parseDestination given a ipvs netlink response this function will respond with a valid destination entry, an error otherwise +func (i *Handle) parseDestination(msg []byte) (*Destination, error) { + var dst *Destination + + //Remove General header for this message + hdr := deserializeGenlMsg(msg) + NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():]) + if err != nil { + return nil, err + } + if len(NetLinkAttrs) == 0 { + return nil, fmt.Errorf("error no valid netlink message found while parsing destination record") + } + + //Now Parse and get IPVS related attributes messages packed in this message. + ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value) + if err != nil { + return nil, err + } + + //Assemble netlink attributes and create a Destination record + dst, err = assembleDestination(ipvsAttrs) + if err != nil { + return nil, err + } + + return dst, nil +} + +// doGetDestinationsCmd a wrapper function to be used by GetDestinations and GetDestination(d) apis +func (i *Handle) doGetDestinationsCmd(s *Service, d *Destination) ([]*Destination, error) { + + var res []*Destination + + msgs, err := i.doCmdwithResponse(s, d, ipvsCmdGetDest) + if err != nil { + return nil, err + } + + for _, msg := range msgs { + dest, err := i.parseDestination(msg) + if err != nil { + return res, err + } + res = append(res, dest) + } + return res, nil +} + +// IPVS related netlink message format explained + +/* EACH NETLINK MSG is of the below format, this is what we will receive from execute() api. + If we have multiple netlink objects to process like GetServices() etc., execute() will + supply an array of this below object + + NETLINK MSG +|-----------------------------------| + 0 1 2 3 +|--------|--------|--------|--------| - +| CMD ID | VER | RESERVED | |==> General Message Header represented by genlMsgHdr +|-----------------------------------| - +| ATTR LEN | ATTR TYPE | | +|-----------------------------------| | +| | | +| VALUE | | +| []byte Array of IPVS MSG | |==> Attribute Message represented by syscall.NetlinkRouteAttr +| PADDED BY 4 BYTES | | +| | | +|-----------------------------------| - + + + Once We strip genlMsgHdr from above NETLINK MSG, we should parse the VALUE. + VALUE will have an array of netlink attributes (syscall.NetlinkRouteAttr) such that each attribute will + represent a "Service" or "Destination" object's field. If we assemble these attributes we can construct + Service or Destination. + + IPVS MSG +|-----------------------------------| + 0 1 2 3 +|--------|--------|--------|--------| +| ATTR LEN | ATTR TYPE | +|-----------------------------------| +| | +| | +| []byte IPVS ATTRIBUTE BY 4 BYTES | +| | +| | +|-----------------------------------| + NEXT ATTRIBUTE +|-----------------------------------| +| ATTR LEN | ATTR TYPE | +|-----------------------------------| +| | +| | +| []byte IPVS ATTRIBUTE BY 4 BYTES | +| | +| | +|-----------------------------------| + NEXT ATTRIBUTE +|-----------------------------------| +| ATTR LEN | ATTR TYPE | +|-----------------------------------| +| | +| | +| []byte IPVS ATTRIBUTE BY 4 BYTES | +| | +| | +|-----------------------------------| + +*/ diff --git a/components/engine/vendor/github.com/docker/libnetwork/network.go b/components/engine/vendor/github.com/docker/libnetwork/network.go index 2b6c705a7b..fa2ab800ae 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/network.go +++ b/components/engine/vendor/github.com/docker/libnetwork/network.go @@ -412,6 +412,9 @@ func (n *network) applyConfigurationTo(to *network) error { } } } + if len(n.ipamType) != 0 { + to.ipamType = n.ipamType + } if len(n.ipamOptions) > 0 { to.ipamOptions = make(map[string]string, len(n.ipamOptions)) for k, v := range n.ipamOptions { diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/cluster.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/cluster.go index d448c8caef..9156d0da68 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/cluster.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/cluster.go @@ -284,7 +284,6 @@ func (nDB *NetworkDB) reconnectNode() { } if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil { - logrus.Errorf("failed to send node join during reconnect: %v", err) return } diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/delegate.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/delegate.go index 2096ea622e..6df358382f 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/delegate.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/delegate.go @@ -17,6 +17,25 @@ func (d *delegate) NodeMeta(limit int) []byte { return []byte{} } +func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node { + nDB.Lock() + defer nDB.Unlock() + + for _, nodes := range []map[string]*node{ + nDB.failedNodes, + nDB.leftNodes, + nDB.nodes, + } { + if n, ok := nodes[nEvent.NodeName]; ok { + if n.ltime >= nEvent.LTime { + return nil + } + return n + } + } + return nil +} + func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node { nDB.Lock() defer nDB.Unlock() @@ -63,10 +82,28 @@ func (nDB *NetworkDB) purgeSameNode(n *node) { } func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { - n := nDB.checkAndGetNode(nEvent) + // Update our local clock if the received messages has newer + // time. + nDB.networkClock.Witness(nEvent.LTime) + + n := nDB.getNode(nEvent) if n == nil { return false } + // If its a node leave event for a manager and this is the only manager we + // know of we want the reconnect logic to kick in. In a single manager + // cluster manager's gossip can't be bootstrapped unless some other node + // connects to it. + if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave { + for _, ip := range nDB.bootStrapIP { + if ip.Equal(n.Addr) { + n.ltime = nEvent.LTime + return true + } + } + } + + n = nDB.checkAndGetNode(nEvent) nDB.purgeSameNode(n) n.ltime = nEvent.LTime @@ -76,11 +113,13 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool { nDB.Lock() nDB.nodes[n.Name] = n nDB.Unlock() + logrus.Infof("Node join event for %s/%s", n.Name, n.Addr) return true case NodeEventTypeLeave: nDB.Lock() nDB.leftNodes[n.Name] = n nDB.Unlock() + logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr) return true } diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/event_delegate.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/event_delegate.go index 6ae0a32ad1..23e16832e7 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/event_delegate.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/event_delegate.go @@ -22,6 +22,7 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) { } func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) { + logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr) e.broadcastNodeEvent(mn.Addr, opCreate) e.nDB.Lock() // In case the node is rejoining after a failure or leave, @@ -37,9 +38,12 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) { e.nDB.nodes[mn.Name] = &node{Node: *mn} e.nDB.Unlock() + logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr) } func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) { + var failed bool + logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr) e.broadcastNodeEvent(mn.Addr, opDelete) e.nDB.deleteNodeTableEntries(mn.Name) e.nDB.deleteNetworkEntriesForNode(mn.Name) @@ -47,10 +51,17 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) { if n, ok := e.nDB.nodes[mn.Name]; ok { delete(e.nDB.nodes, mn.Name) - n.reapTime = reapInterval + // In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h) + // Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map + n.reapTime = nodeReapInterval e.nDB.failedNodes[mn.Name] = n + failed = true } e.nDB.Unlock() + if failed { + logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr) + } + } func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) { diff --git a/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdb.go b/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdb.go index 86b0128b60..ecb2d714a4 100644 --- a/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdb.go +++ b/components/engine/vendor/github.com/docker/libnetwork/networkdb/networkdb.go @@ -4,6 +4,7 @@ package networkdb import ( "fmt" + "net" "strings" "sync" "time" @@ -88,6 +89,10 @@ type NetworkDB struct { // Reference to the memberlist's keyring to add & remove keys keyring *memberlist.Keyring + + // bootStrapIP is the list of IPs that can be used to bootstrap + // the gossip. + bootStrapIP []net.IP } // PeerInfo represents the peer (gossip cluster) nodes of a network @@ -194,6 +199,11 @@ func New(c *Config) (*NetworkDB, error) { // Join joins this NetworkDB instance with a list of peer NetworkDB // instances passed by the caller in the form of addr:port func (nDB *NetworkDB) Join(members []string) error { + nDB.Lock() + for _, m := range members { + nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m)) + } + nDB.Unlock() return nDB.clusterJoin(members) }