Add filter support for stack ps and services with Kubernetes

Signed-off-by: Mathieu Champlon <mathieu.champlon@docker.com>
Upstream-commit: 1f7aa1c036
Component: cli
This commit is contained in:
Simon Ferquel
2018-04-24 12:28:08 +02:00
committed by Mathieu Champlon
parent 274de24328
commit b8ca625f10
25 changed files with 365 additions and 11973 deletions

View File

@ -76,6 +76,11 @@ func (s *Factory) ReplicaSets() typesappsv1beta2.ReplicaSetInterface {
return s.appsClientSet.ReplicaSets(s.namespace)
}
// DaemonSets returns a client for kubernetes daemon sets
func (s *Factory) DaemonSets() typesappsv1beta2.DaemonSetInterface {
return s.appsClientSet.DaemonSets(s.namespace)
}
// Stacks returns a client for Docker's Stack on Kubernetes
func (s *Factory) Stacks(allNamespaces bool) (StackClient, error) {
version, err := kubernetes.GetStackAPIVersion(s.clientSet)

View File

@ -2,10 +2,13 @@ package kubernetes
import (
"fmt"
"sort"
"strings"
"time"
"github.com/docker/cli/cli/command/formatter"
"github.com/docker/cli/kubernetes/labels"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
appsv1beta2 "k8s.io/api/apps/v1beta2"
apiv1 "k8s.io/api/core/v1"
@ -65,13 +68,43 @@ func toSwarmProtocol(protocol apiv1.Protocol) swarm.PortConfigProtocol {
return swarm.PortConfigProtocol("unknown")
}
func fetchPods(namespace string, pods corev1.PodInterface) ([]apiv1.Pod, error) {
labelSelector := labels.SelectorForStack(namespace)
podsList, err := pods.List(metav1.ListOptions{LabelSelector: labelSelector})
func fetchPods(stackName string, pods corev1.PodInterface, f filters.Args) ([]apiv1.Pod, error) {
services := f.Get("service")
// for existing script compatibility, support either <servicename> or <stackname>_<servicename> format
stackNamePrefix := stackName + "_"
for _, s := range services {
if strings.HasPrefix(s, stackNamePrefix) {
services = append(services, strings.TrimPrefix(s, stackNamePrefix))
}
}
listOpts := metav1.ListOptions{LabelSelector: labels.SelectorForStack(stackName, services...)}
var result []apiv1.Pod
podsList, err := pods.List(listOpts)
if err != nil {
return nil, err
}
return podsList.Items, nil
nodes := f.Get("node")
for _, pod := range podsList.Items {
if filterPod(pod, nodes) &&
// name filter is done client side for matching partials
f.FuzzyMatch("name", stackNamePrefix+pod.Name) {
result = append(result, pod)
}
}
return result, nil
}
func filterPod(pod apiv1.Pod, nodes []string) bool {
if len(nodes) == 0 {
return true
}
for _, name := range nodes {
if pod.Spec.NodeName == name {
return true
}
}
return false
}
func getContainerImage(containers []apiv1.Container) string {
@ -121,56 +154,76 @@ const (
publishedOnRandomPortSuffix = "-random-ports"
)
// Replicas conversion
func replicasToServices(replicas *appsv1beta2.ReplicaSetList, services *apiv1.ServiceList) ([]swarm.Service, map[string]formatter.ServiceListInfo, error) {
func convertToServices(replicas *appsv1beta2.ReplicaSetList, daemons *appsv1beta2.DaemonSetList, services *apiv1.ServiceList) ([]swarm.Service, map[string]formatter.ServiceListInfo, error) {
result := make([]swarm.Service, len(replicas.Items))
infos := make(map[string]formatter.ServiceListInfo, len(replicas.Items))
infos := make(map[string]formatter.ServiceListInfo, len(replicas.Items)+len(daemons.Items))
for i, r := range replicas.Items {
serviceName := r.Labels[labels.ForServiceName]
serviceHeadless, ok := findService(services, serviceName)
if !ok {
return nil, nil, fmt.Errorf("could not find service '%s'", serviceName)
s, err := convertToService(r.Labels[labels.ForServiceName], services, r.Spec.Template.Spec.Containers)
if err != nil {
return nil, nil, err
}
stack, ok := serviceHeadless.Labels[labels.ForStackName]
if ok {
stack += "_"
}
uid := string(serviceHeadless.UID)
s := swarm.Service{
ID: uid,
Spec: swarm.ServiceSpec{
Annotations: swarm.Annotations{
Name: stack + serviceHeadless.Name,
},
TaskTemplate: swarm.TaskSpec{
ContainerSpec: &swarm.ContainerSpec{
Image: getContainerImage(r.Spec.Template.Spec.Containers),
},
},
},
}
if serviceNodePort, ok := findService(services, serviceName+publishedOnRandomPortSuffix); ok && serviceNodePort.Spec.Type == apiv1.ServiceTypeNodePort {
s.Endpoint = serviceEndpoint(serviceNodePort, swarm.PortConfigPublishModeHost)
}
if serviceLoadBalancer, ok := findService(services, serviceName+publishedServiceSuffix); ok && serviceLoadBalancer.Spec.Type == apiv1.ServiceTypeLoadBalancer {
s.Endpoint = serviceEndpoint(serviceLoadBalancer, swarm.PortConfigPublishModeIngress)
}
result[i] = s
infos[uid] = formatter.ServiceListInfo{
result[i] = *s
infos[s.ID] = formatter.ServiceListInfo{
Mode: "replicated",
Replicas: fmt.Sprintf("%d/%d", r.Status.AvailableReplicas, r.Status.Replicas),
}
}
for _, d := range daemons.Items {
s, err := convertToService(d.Labels[labels.ForServiceName], services, d.Spec.Template.Spec.Containers)
if err != nil {
return nil, nil, err
}
result = append(result, *s)
infos[s.ID] = formatter.ServiceListInfo{
Mode: "global",
Replicas: fmt.Sprintf("%d/%d", d.Status.NumberReady, d.Status.DesiredNumberScheduled),
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].ID < result[j].ID
})
return result, infos, nil
}
func findService(services *apiv1.ServiceList, name string) (apiv1.Service, bool) {
func convertToService(serviceName string, services *apiv1.ServiceList, containers []apiv1.Container) (*swarm.Service, error) {
serviceHeadless, err := findService(services, serviceName)
if err != nil {
return nil, err
}
stack, ok := serviceHeadless.Labels[labels.ForStackName]
if ok {
stack += "_"
}
uid := string(serviceHeadless.UID)
s := &swarm.Service{
ID: uid,
Spec: swarm.ServiceSpec{
Annotations: swarm.Annotations{
Name: stack + serviceHeadless.Name,
},
TaskTemplate: swarm.TaskSpec{
ContainerSpec: &swarm.ContainerSpec{
Image: getContainerImage(containers),
},
},
},
}
if serviceNodePort, err := findService(services, serviceName+publishedOnRandomPortSuffix); err == nil && serviceNodePort.Spec.Type == apiv1.ServiceTypeNodePort {
s.Endpoint = serviceEndpoint(serviceNodePort, swarm.PortConfigPublishModeHost)
}
if serviceLoadBalancer, err := findService(services, serviceName+publishedServiceSuffix); err == nil && serviceLoadBalancer.Spec.Type == apiv1.ServiceTypeLoadBalancer {
s.Endpoint = serviceEndpoint(serviceLoadBalancer, swarm.PortConfigPublishModeIngress)
}
return s, nil
}
func findService(services *apiv1.ServiceList, name string) (apiv1.Service, error) {
for _, s := range services.Items {
if s.Name == name {
return s, true
return s, nil
}
}
return apiv1.Service{}, false
return apiv1.Service{}, fmt.Errorf("could not find service '%s'", name)
}
func serviceEndpoint(service apiv1.Service, publishMode swarm.PortConfigPublishMode) swarm.Endpoint {

View File

@ -19,7 +19,7 @@ func TestReplicasConversionNeedsAService(t *testing.T) {
Items: []appsv1beta2.ReplicaSet{makeReplicaSet("unknown", 0, 0)},
}
services := apiv1.ServiceList{}
_, _, err := replicasToServices(&replicas, &services)
_, _, err := convertToServices(&replicas, &appsv1beta2.DaemonSetList{}, &services)
assert.ErrorContains(t, err, "could not find service")
}
@ -124,7 +124,7 @@ func TestKubernetesServiceToSwarmServiceConversion(t *testing.T) {
}
for _, tc := range testCases {
swarmServices, listInfo, err := replicasToServices(tc.replicas, tc.services)
swarmServices, listInfo, err := convertToServices(tc.replicas, &appsv1beta2.DaemonSetList{}, tc.services)
assert.NilError(t, err)
assert.DeepEqual(t, tc.expectedServices, swarmServices)
assert.DeepEqual(t, tc.expectedListInfo, listInfo)

View File

@ -10,17 +10,23 @@ import (
"github.com/docker/cli/cli/command/task"
"github.com/docker/docker/api/types/swarm"
apiv1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/api"
)
var supportedPSFilters = map[string]bool{
"name": true,
"service": true,
"node": true,
}
// RunPS is the kubernetes implementation of docker stack ps
func RunPS(dockerCli *KubeCli, options options.PS) error {
namespace := options.Namespace
// Initialize clients
filters := options.Filter.Value()
if err := filters.Validate(supportedPSFilters); err != nil {
return err
}
client, err := dockerCli.composeClient()
if err != nil {
return err
@ -29,78 +35,71 @@ func RunPS(dockerCli *KubeCli, options options.PS) error {
if err != nil {
return err
}
podsClient := client.Pods()
// Fetch pods
if _, err := stacks.Get(namespace); err != nil {
return fmt.Errorf("nothing found in stack: %s", namespace)
stackName := options.Namespace
_, err = stacks.Get(stackName)
if apierrs.IsNotFound(err) {
return fmt.Errorf("nothing found in stack: %s", stackName)
}
pods, err := fetchPods(namespace, podsClient)
if err != nil {
return err
}
if len(pods) == 0 {
return fmt.Errorf("nothing found in stack: %s", namespace)
pods, err := fetchPods(stackName, client.Pods(), filters)
if err != nil {
return err
}
if len(pods) == 0 {
return fmt.Errorf("nothing found in stack: %s", stackName)
}
return printTasks(dockerCli, options, stackName, client, pods)
}
func printTasks(dockerCli command.Cli, options options.PS, namespace string, client corev1.NodesGetter, pods []apiv1.Pod) error {
format := options.Format
if len(format) == 0 {
if format == "" {
format = task.DefaultFormat(dockerCli.ConfigFile(), options.Quiet)
}
nodeResolver := makeNodeResolver(options.NoResolve, client.Nodes())
tasks := make([]swarm.Task, len(pods))
for i, pod := range pods {
tasks[i] = podToTask(pod)
}
return print(dockerCli, namespace, tasks, pods, nodeResolver, !options.NoTrunc, options.Quiet, format)
}
type idResolver func(name string) (string, error)
func print(dockerCli command.Cli, namespace string, tasks []swarm.Task, pods []apiv1.Pod, nodeResolver idResolver, trunc, quiet bool, format string) error {
sort.Stable(tasksBySlot(tasks))
names := map[string]string{}
nodes := map[string]string{}
tasksCtx := formatter.Context{
Output: dockerCli.Out(),
Format: formatter.NewTaskFormat(format, quiet),
Trunc: trunc,
n, err := client.Nodes().List(metav1.ListOptions{})
if err != nil {
return err
}
for i, task := range tasks {
nodeValue, err := nodeResolver(pods[i].Spec.NodeName)
nodeValue, err := resolveNode(pods[i].Spec.NodeName, n, options.NoResolve)
if err != nil {
return err
}
names[task.ID] = fmt.Sprintf("%s_%s", namespace, pods[i].Name)
nodes[task.ID] = nodeValue
}
tasksCtx := formatter.Context{
Output: dockerCli.Out(),
Format: formatter.NewTaskFormat(format, options.Quiet),
Trunc: !options.NoTrunc,
}
return formatter.TaskWrite(tasksCtx, tasks, names, nodes)
}
func makeNodeResolver(noResolve bool, nodes corev1.NodeInterface) func(string) (string, error) {
func resolveNode(name string, nodes *apiv1.NodeList, noResolve bool) (string, error) {
// Here we have a name and we need to resolve its identifier. To mimic swarm behavior
// we need to resolve the id when noresolve is set, otherwise we return the name.
// we need to resolve to the id when noResolve is set, otherwise we return the name.
if noResolve {
return func(name string) (string, error) {
n, err := nodes.List(metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector(api.ObjectNameField, name).String(),
})
if err != nil {
return "", err
for _, node := range nodes.Items {
if node.Name == name {
return string(node.UID), nil
}
if len(n.Items) != 1 {
return "", fmt.Errorf("could not find node '%s'", name)
}
return string(n.Items[0].UID), nil
}
return "", fmt.Errorf("could not find node '%s'", name)
}
return func(name string) (string, error) { return name, nil }
return name, nil
}

View File

@ -2,43 +2,124 @@ package kubernetes
import (
"fmt"
"sort"
"strings"
"github.com/docker/cli/cli/command/formatter"
"github.com/docker/cli/cli/command/stack/options"
"github.com/docker/cli/kubernetes/labels"
"github.com/docker/docker/api/types/filters"
appsv1beta2 "k8s.io/api/apps/v1beta2"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var supportedServicesFilters = map[string]bool{
"mode": true,
"name": true,
"label": true,
}
func generateSelector(labels map[string][]string) []string {
var result []string
for k, v := range labels {
switch len(v) {
case 0:
result = append(result, k)
case 1:
result = append(result, fmt.Sprintf("%s=%s", k, v[0]))
default:
sort.Strings(v)
result = append(result, fmt.Sprintf("%s in (%s)", k, strings.Join(v, ",")))
}
}
return result
}
func parseLabelFilters(rawFilters []string) map[string][]string {
labels := map[string][]string{}
for _, rawLabel := range rawFilters {
v := strings.SplitN(rawLabel, "=", 2)
key := v[0]
if len(v) > 1 {
labels[key] = append(labels[key], v[1])
} else if _, ok := labels[key]; !ok {
labels[key] = []string{}
}
}
return labels
}
func generateLabelSelector(f filters.Args, stackName string) string {
names := f.Get("name")
sort.Strings(names)
for _, n := range names {
if strings.HasPrefix(n, stackName+"_") {
// also accepts with unprefixed service name (for compat with existing swarm scripts where service names are prefixed by stack names)
names = append(names, strings.TrimPrefix(n, stackName+"_"))
}
}
selectors := append(generateSelector(parseLabelFilters(f.Get("label"))), labels.SelectorForStack(stackName, names...))
return strings.Join(selectors, ",")
}
func getResourcesForServiceList(dockerCli *KubeCli, filters filters.Args, labelSelector string) (*appsv1beta2.ReplicaSetList, *appsv1beta2.DaemonSetList, *corev1.ServiceList, error) {
client, err := dockerCli.composeClient()
if err != nil {
return nil, nil, nil, err
}
modes := filters.Get("mode")
replicas := &appsv1beta2.ReplicaSetList{}
if len(modes) == 0 || filters.ExactMatch("mode", "replicated") {
if replicas, err = client.ReplicaSets().List(metav1.ListOptions{LabelSelector: labelSelector}); err != nil {
return nil, nil, nil, err
}
}
daemons := &appsv1beta2.DaemonSetList{}
if len(modes) == 0 || filters.ExactMatch("mode", "global") {
if daemons, err = client.DaemonSets().List(metav1.ListOptions{LabelSelector: labelSelector}); err != nil {
return nil, nil, nil, err
}
}
services, err := client.Services().List(metav1.ListOptions{LabelSelector: labelSelector})
if err != nil {
return nil, nil, nil, err
}
return replicas, daemons, services, nil
}
// RunServices is the kubernetes implementation of docker stack services
func RunServices(dockerCli *KubeCli, opts options.Services) error {
// Initialize clients
filters := opts.Filter.Value()
if err := filters.Validate(supportedServicesFilters); err != nil {
return err
}
client, err := dockerCli.composeClient()
if err != nil {
return nil
}
stacks, err := client.Stacks(false)
if err != nil {
return err
}
replicas := client.ReplicaSets()
if _, err := stacks.Get(opts.Namespace); err != nil {
fmt.Fprintf(dockerCli.Err(), "Nothing found in stack: %s\n", opts.Namespace)
return nil
}
replicasList, err := replicas.List(metav1.ListOptions{LabelSelector: labels.SelectorForStack(opts.Namespace)})
stackName := opts.Namespace
_, err = stacks.Get(stackName)
if apierrs.IsNotFound(err) {
return fmt.Errorf("nothing found in stack: %s", stackName)
}
if err != nil {
return err
}
servicesList, err := client.Services().List(metav1.ListOptions{LabelSelector: labels.SelectorForStack(opts.Namespace)})
labelSelector := generateLabelSelector(filters, stackName)
replicasList, daemonsList, servicesList, err := getResourcesForServiceList(dockerCli, filters, labelSelector)
if err != nil {
return err
}
// Convert Replicas sets and kubernetes services to swam services and formatter informations
services, info, err := replicasToServices(replicasList, servicesList)
services, info, err := convertToServices(replicasList, daemonsList, servicesList)
if err != nil {
return err
}

View File

@ -0,0 +1,115 @@
package kubernetes
import (
"testing"
"github.com/docker/docker/api/types/filters"
"github.com/gotestyourself/gotestyourself/assert"
"github.com/gotestyourself/gotestyourself/assert/cmp"
)
func TestServiceFiltersLabelSelectorGen(t *testing.T) {
cases := []struct {
name string
stackName string
filters filters.Args
expectedSelectorParts []string
}{
{
name: "no-filter",
stackName: "test",
filters: filters.NewArgs(),
expectedSelectorParts: []string{
"com.docker.stack.namespace=test",
},
},
{
name: "single-name filter",
stackName: "test",
filters: filters.NewArgs(filters.KeyValuePair{Key: "name", Value: "svc-test"}),
expectedSelectorParts: []string{
"com.docker.stack.namespace=test",
"com.docker.service.name=svc-test",
},
},
{
name: "multi-name filter",
stackName: "test",
filters: filters.NewArgs(
filters.KeyValuePair{Key: "name", Value: "svc-test"},
filters.KeyValuePair{Key: "name", Value: "svc-test2"},
),
expectedSelectorParts: []string{
"com.docker.stack.namespace=test",
"com.docker.service.name in (svc-test,svc-test2)",
},
},
{
name: "label present filter",
stackName: "test",
filters: filters.NewArgs(
filters.KeyValuePair{Key: "label", Value: "label-is-present"},
),
expectedSelectorParts: []string{
"com.docker.stack.namespace=test",
"label-is-present",
},
},
{
name: "single value label filter",
stackName: "test",
filters: filters.NewArgs(
filters.KeyValuePair{Key: "label", Value: "label1=test"},
),
expectedSelectorParts: []string{
"com.docker.stack.namespace=test",
"label1=test",
},
},
{
name: "multi value label filter",
stackName: "test",
filters: filters.NewArgs(
filters.KeyValuePair{Key: "label", Value: "label1=test"},
filters.KeyValuePair{Key: "label", Value: "label1=test2"},
),
expectedSelectorParts: []string{
"com.docker.stack.namespace=test",
"label1 in (test,test2)",
},
},
{
name: "2 different labels filter",
stackName: "test",
filters: filters.NewArgs(
filters.KeyValuePair{Key: "label", Value: "label1=test"},
filters.KeyValuePair{Key: "label", Value: "label2=test2"},
),
expectedSelectorParts: []string{
"com.docker.stack.namespace=test",
"label1=test",
"label2=test2",
},
},
{
name: "name filter with stackName prefix",
stackName: "test",
filters: filters.NewArgs(
filters.KeyValuePair{Key: "name", Value: "test_svc1"},
),
expectedSelectorParts: []string{
"com.docker.stack.namespace=test",
"com.docker.service.name in (test_svc1,svc1)",
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
result := generateLabelSelector(c.filters, c.stackName)
for _, toFind := range c.expectedSelectorParts {
assert.Assert(t, cmp.Contains(result, toFind))
}
})
}
}

View File

@ -37,7 +37,6 @@ func newPsCommand(dockerCli command.Cli) *cobra.Command {
flags.BoolVar(&opts.NoTrunc, "no-trunc", false, "Do not truncate output")
flags.BoolVar(&opts.NoResolve, "no-resolve", false, "Do not map IDs to Names")
flags.VarP(&opts.Filter, "filter", "f", "Filter output based on conditions provided")
flags.SetAnnotation("filter", "swarm", nil)
flags.BoolVarP(&opts.Quiet, "quiet", "q", false, "Only display task IDs")
flags.StringVar(&opts.Format, "format", "", "Pretty-print tasks using a Go template")
kubernetes.AddNamespaceFlag(flags)

View File

@ -37,7 +37,6 @@ func newServicesCommand(dockerCli command.Cli) *cobra.Command {
flags.BoolVarP(&opts.Quiet, "quiet", "q", false, "Only display IDs")
flags.StringVar(&opts.Format, "format", "", "Pretty-print services using a Go template")
flags.VarP(&opts.Filter, "filter", "f", "Filter output based on conditions provided")
flags.SetAnnotation("filter", "swarm", nil)
kubernetes.AddNamespaceFlag(flags)
return cmd
}