diff --git a/components/engine/api/server/router/swarm/backend.go b/components/engine/api/server/router/swarm/backend.go index 19954eee85..c65d705c5c 100644 --- a/components/engine/api/server/router/swarm/backend.go +++ b/components/engine/api/server/router/swarm/backend.go @@ -2,7 +2,9 @@ package swarm import ( basictypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" types "github.com/docker/docker/api/types/swarm" + "golang.org/x/net/context" ) // Backend abstracts an swarm commands manager. @@ -19,6 +21,7 @@ type Backend interface { CreateService(types.ServiceSpec, string) (string, error) UpdateService(string, uint64, types.ServiceSpec, string, string) error RemoveService(string) error + ServiceLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error GetNodes(basictypes.NodeListOptions) ([]types.Node, error) GetNode(string) (types.Node, error) UpdateNode(string, uint64, types.NodeSpec) error diff --git a/components/engine/api/server/router/swarm/cluster.go b/components/engine/api/server/router/swarm/cluster.go index 8e9b58d110..e31c6ad075 100644 --- a/components/engine/api/server/router/swarm/cluster.go +++ b/components/engine/api/server/router/swarm/cluster.go @@ -1,6 +1,9 @@ package swarm -import "github.com/docker/docker/api/server/router" +import ( + "github.com/docker/docker/api/server/router" + "github.com/docker/docker/daemon" +) // buildRouter is a router to talk with the build controller type swarmRouter struct { @@ -9,11 +12,14 @@ type swarmRouter struct { } // NewRouter initializes a new build router -func NewRouter(b Backend) router.Router { +func NewRouter(d *daemon.Daemon, b Backend) router.Router { r := &swarmRouter{ backend: b, } r.initRoutes() + if d.HasExperimental() { + r.addExperimentalRoutes() + } return r } @@ -22,6 +28,12 @@ func (sr *swarmRouter) Routes() []router.Route { return sr.routes } +func (sr *swarmRouter) addExperimentalRoutes() { + sr.routes = append(sr.routes, + router.Cancellable(router.NewGetRoute("/services/{id}/logs", sr.getServiceLogs)), + ) +} + func (sr *swarmRouter) initRoutes() { sr.routes = []router.Route{ router.NewPostRoute("/swarm/init", sr.initCluster), @@ -32,20 +44,20 @@ func (sr *swarmRouter) initRoutes() { router.NewPostRoute("/swarm/update", sr.updateCluster), router.NewPostRoute("/swarm/unlock", sr.unlockCluster), router.NewGetRoute("/services", sr.getServices), - router.NewGetRoute("/services/{id:.*}", sr.getService), + router.NewGetRoute("/services/{id}", sr.getService), router.NewPostRoute("/services/create", sr.createService), - router.NewPostRoute("/services/{id:.*}/update", sr.updateService), - router.NewDeleteRoute("/services/{id:.*}", sr.removeService), + router.NewPostRoute("/services/{id}/update", sr.updateService), + router.NewDeleteRoute("/services/{id}", sr.removeService), router.NewGetRoute("/nodes", sr.getNodes), - router.NewGetRoute("/nodes/{id:.*}", sr.getNode), - router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode), - router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode), + router.NewGetRoute("/nodes/{id}", sr.getNode), + router.NewDeleteRoute("/nodes/{id}", sr.removeNode), + router.NewPostRoute("/nodes/{id}/update", sr.updateNode), router.NewGetRoute("/tasks", sr.getTasks), - router.NewGetRoute("/tasks/{id:.*}", sr.getTask), + router.NewGetRoute("/tasks/{id}", sr.getTask), router.NewGetRoute("/secrets", sr.getSecrets), router.NewPostRoute("/secrets", sr.createSecret), - router.NewDeleteRoute("/secrets/{id:.*}", sr.removeSecret), - router.NewGetRoute("/secrets/{id:.*}", sr.getSecret), - router.NewPostRoute("/secrets/{id:.*}/update", sr.updateSecret), + router.NewDeleteRoute("/secrets/{id}", sr.removeSecret), + router.NewGetRoute("/secrets/{id}", sr.getSecret), + router.NewPostRoute("/secrets/{id}/update", sr.updateSecret), } } diff --git a/components/engine/api/server/router/swarm/cluster_routes.go b/components/engine/api/server/router/swarm/cluster_routes.go index 2c380b4b56..3ccbe2ba8b 100644 --- a/components/engine/api/server/router/swarm/cluster_routes.go +++ b/components/engine/api/server/router/swarm/cluster_routes.go @@ -10,6 +10,7 @@ import ( "github.com/docker/docker/api/errors" "github.com/docker/docker/api/server/httputils" basictypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/filters" types "github.com/docker/docker/api/types/swarm" "golang.org/x/net/context" @@ -208,6 +209,59 @@ func (sr *swarmRouter) removeService(ctx context.Context, w http.ResponseWriter, return nil } +func (sr *swarmRouter) getServiceLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := httputils.ParseForm(r); err != nil { + return err + } + + // Args are validated before the stream starts because when it starts we're + // sending HTTP 200 by writing an empty chunk of data to tell the client that + // daemon is going to stream. By sending this initial HTTP 200 we can't report + // any error after the stream starts (i.e. container not found, wrong parameters) + // with the appropriate status code. + stdout, stderr := httputils.BoolValue(r, "stdout"), httputils.BoolValue(r, "stderr") + if !(stdout || stderr) { + return fmt.Errorf("Bad parameters: you must choose at least one stream") + } + + serviceName := vars["id"] + logsConfig := &backend.ContainerLogsConfig{ + ContainerLogsOptions: basictypes.ContainerLogsOptions{ + Follow: httputils.BoolValue(r, "follow"), + Timestamps: httputils.BoolValue(r, "timestamps"), + Since: r.Form.Get("since"), + Tail: r.Form.Get("tail"), + ShowStdout: stdout, + ShowStderr: stderr, + Details: httputils.BoolValue(r, "details"), + }, + OutStream: w, + } + + if !logsConfig.Follow { + return fmt.Errorf("Bad parameters: Only follow mode is currently supported") + } + + if logsConfig.Details { + return fmt.Errorf("Bad parameters: details is not currently supported") + } + + chStarted := make(chan struct{}) + if err := sr.backend.ServiceLogs(ctx, serviceName, logsConfig, chStarted); err != nil { + select { + case <-chStarted: + // The client may be expecting all of the data we're sending to + // be multiplexed, so send it through OutStream, which will + // have been set up to handle that if needed. + fmt.Fprintf(logsConfig.OutStream, "Error grabbing service logs: %v\n", err) + default: + return err + } + } + + return nil +} + func (sr *swarmRouter) getNodes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := httputils.ParseForm(r); err != nil { return err diff --git a/components/engine/cli/command/service/cmd.go b/components/engine/cli/command/service/cmd.go index f4f7d00f91..63f2db7171 100644 --- a/components/engine/cli/command/service/cmd.go +++ b/components/engine/cli/command/service/cmd.go @@ -26,6 +26,7 @@ func NewServiceCommand(dockerCli *command.DockerCli) *cobra.Command { newRemoveCommand(dockerCli), newScaleCommand(dockerCli), newUpdateCommand(dockerCli), + newLogsCommand(dockerCli), ) return cmd } diff --git a/components/engine/cli/command/service/logs.go b/components/engine/cli/command/service/logs.go new file mode 100644 index 0000000000..19d3d9a488 --- /dev/null +++ b/components/engine/cli/command/service/logs.go @@ -0,0 +1,163 @@ +package service + +import ( + "bytes" + "fmt" + "io" + "strings" + + "golang.org/x/net/context" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/cli" + "github.com/docker/docker/cli/command" + "github.com/docker/docker/cli/command/idresolver" + "github.com/docker/docker/pkg/stdcopy" + "github.com/spf13/cobra" +) + +type logsOptions struct { + noResolve bool + follow bool + since string + timestamps bool + details bool + tail string + + service string +} + +func newLogsCommand(dockerCli *command.DockerCli) *cobra.Command { + var opts logsOptions + + cmd := &cobra.Command{ + Use: "logs [OPTIONS] SERVICE", + Short: "Fetch the logs of a service", + Args: cli.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + opts.service = args[0] + return runLogs(dockerCli, &opts) + }, + Tags: map[string]string{"experimental": ""}, + } + + flags := cmd.Flags() + flags.BoolVar(&opts.noResolve, "no-resolve", false, "Do not map IDs to Names") + flags.BoolVarP(&opts.follow, "follow", "f", false, "Follow log output") + flags.StringVar(&opts.since, "since", "", "Show logs since timestamp") + flags.BoolVarP(&opts.timestamps, "timestamps", "t", false, "Show timestamps") + flags.BoolVar(&opts.details, "details", false, "Show extra details provided to logs") + flags.StringVar(&opts.tail, "tail", "all", "Number of lines to show from the end of the logs") + return cmd +} + +func runLogs(dockerCli *command.DockerCli, opts *logsOptions) error { + ctx := context.Background() + + options := types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Since: opts.since, + Timestamps: opts.timestamps, + Follow: opts.follow, + Tail: opts.tail, + Details: opts.details, + } + + client := dockerCli.Client() + responseBody, err := client.ServiceLogs(ctx, opts.service, options) + if err != nil { + return err + } + defer responseBody.Close() + + resolver := idresolver.New(client, opts.noResolve) + + stdout := &logWriter{ctx: ctx, opts: opts, r: resolver, w: dockerCli.Out()} + stderr := &logWriter{ctx: ctx, opts: opts, r: resolver, w: dockerCli.Err()} + + // TODO(aluzzardi): Do an io.Copy for services with TTY enabled. + _, err = stdcopy.StdCopy(stdout, stderr, responseBody) + return err +} + +type logWriter struct { + ctx context.Context + opts *logsOptions + r *idresolver.IDResolver + w io.Writer +} + +func (lw *logWriter) Write(buf []byte) (int, error) { + contextIndex := 0 + numParts := 2 + if lw.opts.timestamps { + contextIndex++ + numParts++ + } + + parts := bytes.SplitN(buf, []byte(" "), numParts) + if len(parts) != numParts { + return 0, fmt.Errorf("invalid context in log message: %v", string(buf)) + } + + taskName, nodeName, err := lw.parseContext(string(parts[contextIndex])) + if err != nil { + return 0, err + } + + output := []byte{} + for i, part := range parts { + // First part doesn't get space separation. + if i > 0 { + output = append(output, []byte(" ")...) + } + + if i == contextIndex { + // TODO(aluzzardi): Consider constant padding. + output = append(output, []byte(fmt.Sprintf("%s@%s |", taskName, nodeName))...) + } else { + output = append(output, part...) + } + } + _, err = lw.w.Write(output) + if err != nil { + return 0, err + } + + return len(buf), nil +} + +func (lw *logWriter) parseContext(input string) (string, string, error) { + context := make(map[string]string) + + components := strings.Split(input, ",") + for _, component := range components { + parts := strings.SplitN(component, "=", 2) + if len(parts) != 2 { + return "", "", fmt.Errorf("invalid context: %s", input) + } + context[parts[0]] = parts[1] + } + + taskID, ok := context["com.docker.swarm.task.id"] + if !ok { + return "", "", fmt.Errorf("missing task id in context: %s", input) + } + taskName, err := lw.r.Resolve(lw.ctx, swarm.Task{}, taskID) + if err != nil { + return "", "", err + } + + nodeID, ok := context["com.docker.swarm.node.id"] + if !ok { + return "", "", fmt.Errorf("missing node id in context: %s", input) + } + nodeName, err := lw.r.Resolve(lw.ctx, swarm.Node{}, nodeID) + if err != nil { + return "", "", err + } + + return taskName, nodeName, nil +} diff --git a/components/engine/client/interface.go b/components/engine/client/interface.go index f24c9a51f6..883e8801fa 100644 --- a/components/engine/client/interface.go +++ b/components/engine/client/interface.go @@ -111,6 +111,7 @@ type ServiceAPIClient interface { ServiceList(ctx context.Context, options types.ServiceListOptions) ([]swarm.Service, error) ServiceRemove(ctx context.Context, serviceID string) error ServiceUpdate(ctx context.Context, serviceID string, version swarm.Version, service swarm.ServiceSpec, options types.ServiceUpdateOptions) error + ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error) TaskInspectWithRaw(ctx context.Context, taskID string) (swarm.Task, []byte, error) TaskList(ctx context.Context, options types.TaskListOptions) ([]swarm.Task, error) } diff --git a/components/engine/client/service_logs.go b/components/engine/client/service_logs.go new file mode 100644 index 0000000000..24384e3ec0 --- /dev/null +++ b/components/engine/client/service_logs.go @@ -0,0 +1,52 @@ +package client + +import ( + "io" + "net/url" + "time" + + "golang.org/x/net/context" + + "github.com/docker/docker/api/types" + timetypes "github.com/docker/docker/api/types/time" +) + +// ServiceLogs returns the logs generated by a service in an io.ReadCloser. +// It's up to the caller to close the stream. +func (cli *Client) ServiceLogs(ctx context.Context, serviceID string, options types.ContainerLogsOptions) (io.ReadCloser, error) { + query := url.Values{} + if options.ShowStdout { + query.Set("stdout", "1") + } + + if options.ShowStderr { + query.Set("stderr", "1") + } + + if options.Since != "" { + ts, err := timetypes.GetTimestamp(options.Since, time.Now()) + if err != nil { + return nil, err + } + query.Set("since", ts) + } + + if options.Timestamps { + query.Set("timestamps", "1") + } + + if options.Details { + query.Set("details", "1") + } + + if options.Follow { + query.Set("follow", "1") + } + query.Set("tail", options.Tail) + + resp, err := cli.get(ctx, "/services/"+serviceID+"/logs", query, nil) + if err != nil { + return nil, err + } + return resp.body, nil +} diff --git a/components/engine/client/service_logs_test.go b/components/engine/client/service_logs_test.go new file mode 100644 index 0000000000..a6d002ba75 --- /dev/null +++ b/components/engine/client/service_logs_test.go @@ -0,0 +1,133 @@ +package client + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "log" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/docker/docker/api/types" + + "golang.org/x/net/context" +) + +func TestServiceLogsError(t *testing.T) { + client := &Client{ + client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")), + } + _, err := client.ServiceLogs(context.Background(), "service_id", types.ContainerLogsOptions{}) + if err == nil || err.Error() != "Error response from daemon: Server error" { + t.Fatalf("expected a Server Error, got %v", err) + } + _, err = client.ServiceLogs(context.Background(), "service_id", types.ContainerLogsOptions{ + Since: "2006-01-02TZ", + }) + if err == nil || !strings.Contains(err.Error(), `parsing time "2006-01-02TZ"`) { + t.Fatalf("expected a 'parsing time' error, got %v", err) + } +} + +func TestServiceLogs(t *testing.T) { + expectedURL := "/services/service_id/logs" + cases := []struct { + options types.ContainerLogsOptions + expectedQueryParams map[string]string + }{ + { + expectedQueryParams: map[string]string{ + "tail": "", + }, + }, + { + options: types.ContainerLogsOptions{ + Tail: "any", + }, + expectedQueryParams: map[string]string{ + "tail": "any", + }, + }, + { + options: types.ContainerLogsOptions{ + ShowStdout: true, + ShowStderr: true, + Timestamps: true, + Details: true, + Follow: true, + }, + expectedQueryParams: map[string]string{ + "tail": "", + "stdout": "1", + "stderr": "1", + "timestamps": "1", + "details": "1", + "follow": "1", + }, + }, + { + options: types.ContainerLogsOptions{ + // An complete invalid date, timestamp or go duration will be + // passed as is + Since: "invalid but valid", + }, + expectedQueryParams: map[string]string{ + "tail": "", + "since": "invalid but valid", + }, + }, + } + for _, logCase := range cases { + client := &Client{ + client: newMockClient(func(r *http.Request) (*http.Response, error) { + if !strings.HasPrefix(r.URL.Path, expectedURL) { + return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, r.URL) + } + // Check query parameters + query := r.URL.Query() + for key, expected := range logCase.expectedQueryParams { + actual := query.Get(key) + if actual != expected { + return nil, fmt.Errorf("%s not set in URL query properly. Expected '%s', got %s", key, expected, actual) + } + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: ioutil.NopCloser(bytes.NewReader([]byte("response"))), + }, nil + }), + } + body, err := client.ServiceLogs(context.Background(), "service_id", logCase.options) + if err != nil { + t.Fatal(err) + } + defer body.Close() + content, err := ioutil.ReadAll(body) + if err != nil { + t.Fatal(err) + } + if string(content) != "response" { + t.Fatalf("expected response to contain 'response', got %s", string(content)) + } + } +} + +func ExampleClient_ServiceLogs_withTimeout() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + client, _ := NewEnvClient() + reader, err := client.ServiceLogs(ctx, "service_id", types.ContainerLogsOptions{}) + if err != nil { + log.Fatal(err) + } + + _, err = io.Copy(os.Stdout, reader) + if err != nil && err != io.EOF { + log.Fatal(err) + } +} diff --git a/components/engine/cmd/dockerd/daemon.go b/components/engine/cmd/dockerd/daemon.go index 38ad14b475..ec1c348905 100644 --- a/components/engine/cmd/dockerd/daemon.go +++ b/components/engine/cmd/dockerd/daemon.go @@ -456,7 +456,7 @@ func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) { systemrouter.NewRouter(d, c), volume.NewRouter(d), build.NewRouter(dockerfile.NewBuildManager(d)), - swarmrouter.NewRouter(c), + swarmrouter.NewRouter(d, c), }...) if d.NetworkControllerEnabled() { diff --git a/components/engine/daemon/cluster/cluster.go b/components/engine/daemon/cluster/cluster.go index 528cc38fa3..74e9d7e4bb 100644 --- a/components/engine/daemon/cluster/cluster.go +++ b/components/engine/daemon/cluster/cluster.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" "io/ioutil" "net" "os" @@ -16,20 +17,24 @@ import ( "github.com/Sirupsen/logrus" apierrors "github.com/docker/docker/api/errors" apitypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/network" types "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/daemon/cluster/convert" executorpkg "github.com/docker/docker/daemon/cluster/executor" "github.com/docker/docker/daemon/cluster/executor/container" + "github.com/docker/docker/daemon/logger" "github.com/docker/docker/opts" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/signal" + "github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/reference" "github.com/docker/docker/runconfig" swarmapi "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/manager/encryption" swarmnode "github.com/docker/swarmkit/node" + "github.com/docker/swarmkit/protobuf/ptypes" "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" @@ -45,6 +50,7 @@ const defaultAddr = "0.0.0.0:2377" const ( initialReconnectDelay = 100 * time.Millisecond maxReconnectDelay = 30 * time.Second + contextPrefix = "com.docker.swarm" ) // ErrNoSwarm is returned on leaving a cluster that was never initialized @@ -120,6 +126,7 @@ type node struct { ready bool conn *grpc.ClientConn client swarmapi.ControlClient + logs swarmapi.LogsClient reconnectDelay time.Duration config nodeStartConfig } @@ -371,8 +378,10 @@ func (c *Cluster) startNewNode(conf nodeStartConfig) (*node, error) { if node.conn != conn { if conn == nil { node.client = nil + node.logs = nil } else { node.client = swarmapi.NewControlClient(conn) + node.logs = swarmapi.NewLogsClient(conn) } } node.conn = conn @@ -1205,6 +1214,88 @@ func (c *Cluster) RemoveService(input string) error { return nil } +// ServiceLogs collects service logs and writes them back to `config.OutStream` +func (c *Cluster) ServiceLogs(ctx context.Context, input string, config *backend.ContainerLogsConfig, started chan struct{}) error { + c.RLock() + if !c.isActiveManager() { + c.RUnlock() + return c.errNoManager() + } + + service, err := getService(ctx, c.client, input) + if err != nil { + c.RUnlock() + return err + } + + stream, err := c.logs.SubscribeLogs(ctx, &swarmapi.SubscribeLogsRequest{ + Selector: &swarmapi.LogSelector{ + ServiceIDs: []string{service.ID}, + }, + Options: &swarmapi.LogSubscriptionOptions{ + Follow: true, + }, + }) + if err != nil { + c.RUnlock() + return err + } + + wf := ioutils.NewWriteFlusher(config.OutStream) + defer wf.Close() + close(started) + wf.Flush() + + outStream := stdcopy.NewStdWriter(wf, stdcopy.Stdout) + errStream := stdcopy.NewStdWriter(wf, stdcopy.Stderr) + + // Release the lock before starting the stream. + c.RUnlock() + for { + // Check the context before doing anything. + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + subscribeMsg, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + for _, msg := range subscribeMsg.Messages { + data := []byte{} + + if config.Timestamps { + ts, err := ptypes.Timestamp(msg.Timestamp) + if err != nil { + return err + } + data = append(data, []byte(ts.Format(logger.TimeFormat)+" ")...) + } + + data = append(data, []byte(fmt.Sprintf("%s.node.id=%s,%s.service.id=%s,%s.task.id=%s ", + contextPrefix, msg.Context.NodeID, + contextPrefix, msg.Context.ServiceID, + contextPrefix, msg.Context.TaskID, + ))...) + + data = append(data, msg.Data...) + + switch msg.Stream { + case swarmapi.LogStreamStdout: + outStream.Write(data) + case swarmapi.LogStreamStderr: + errStream.Write(data) + } + } + } +} + // GetNodes returns a list of all nodes known to a cluster. func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) { c.RLock() diff --git a/components/engine/daemon/cluster/executor/backend.go b/components/engine/daemon/cluster/executor/backend.go index 1a1d4ff091..d0c8af6cea 100644 --- a/components/engine/daemon/cluster/executor/backend.go +++ b/components/engine/daemon/cluster/executor/backend.go @@ -6,6 +6,7 @@ import ( "github.com/docker/distribution" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/filters" @@ -28,6 +29,7 @@ type Backend interface { CreateManagedContainer(config types.ContainerCreateConfig, validateHostname bool) (container.ContainerCreateCreatedBody, error) ContainerStart(name string, hostConfig *container.HostConfig, validateHostname bool, checkpoint string, checkpointDir string) error ContainerStop(name string, seconds *int) error + ContainerLogs(context.Context, string, *backend.ContainerLogsConfig, chan struct{}) error ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error ActivateContainerServiceBinding(containerName string) error DeactivateContainerServiceBinding(containerName string) error diff --git a/components/engine/daemon/cluster/executor/container/adapter.go b/components/engine/daemon/cluster/executor/container/adapter.go index 98ea358100..29e6d0b4d2 100644 --- a/components/engine/daemon/cluster/executor/container/adapter.go +++ b/components/engine/daemon/cluster/executor/container/adapter.go @@ -12,6 +12,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/api/server/httputils" "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/backend" containertypes "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/events" "github.com/docker/docker/api/types/versions" @@ -20,6 +21,7 @@ import ( "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/protobuf/ptypes" "golang.org/x/net/context" "golang.org/x/time/rate" ) @@ -376,6 +378,56 @@ func (c *containerAdapter) deactivateServiceBinding() error { return c.backend.DeactivateContainerServiceBinding(c.container.name()) } +func (c *containerAdapter) logs(ctx context.Context, options api.LogSubscriptionOptions) (io.ReadCloser, error) { + reader, writer := io.Pipe() + + apiOptions := &backend.ContainerLogsConfig{ + ContainerLogsOptions: types.ContainerLogsOptions{ + Follow: options.Follow, + + // TODO(stevvooe): Parse timestamp out of message. This + // absolutely needs to be done before going to production with + // this, at it is completely redundant. + Timestamps: true, + Details: false, // no clue what to do with this, let's just deprecate it. + }, + OutStream: writer, + } + + if options.Since != nil { + since, err := ptypes.Timestamp(options.Since) + if err != nil { + return nil, err + } + apiOptions.Since = since.Format(time.RFC3339Nano) + } + + if options.Tail < 0 { + // See protobuf documentation for details of how this works. + apiOptions.Tail = fmt.Sprint(-options.Tail - 1) + } else if options.Tail > 0 { + return nil, fmt.Errorf("tail relative to start of logs not supported via docker API") + } + + if len(options.Streams) == 0 { + // empty == all + apiOptions.ShowStdout, apiOptions.ShowStderr = true, true + } else { + for _, stream := range options.Streams { + switch stream { + case api.LogStreamStdout: + apiOptions.ShowStdout = true + case api.LogStreamStderr: + apiOptions.ShowStderr = true + } + } + } + + chStarted := make(chan struct{}) + go c.backend.ContainerLogs(ctx, c.container.name(), apiOptions, chStarted) + return reader, nil +} + // todo: typed/wrapped errors func isContainerCreateNameConflict(err error) bool { return strings.Contains(err.Error(), "Conflict. The name") diff --git a/components/engine/daemon/cluster/executor/container/controller.go b/components/engine/daemon/cluster/executor/container/controller.go index 47fc2bf7a5..103f47e965 100644 --- a/components/engine/daemon/cluster/executor/container/controller.go +++ b/components/engine/daemon/cluster/executor/container/controller.go @@ -1,8 +1,13 @@ package container import ( + "bufio" + "bytes" + "encoding/binary" "fmt" + "io" "os" + "time" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/events" @@ -11,8 +16,10 @@ import ( "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/protobuf/ptypes" "github.com/pkg/errors" "golang.org/x/net/context" + "golang.org/x/time/rate" ) // controller implements agent.Controller against docker's API. @@ -374,6 +381,128 @@ func (r *controller) Remove(ctx context.Context) error { return nil } +// waitReady waits for a container to be "ready". +// Ready means it's past the started state. +func (r *controller) waitReady(pctx context.Context) error { + if err := r.checkClosed(); err != nil { + return err + } + + ctx, cancel := context.WithCancel(pctx) + defer cancel() + + eventq := r.adapter.events(ctx) + + ctnr, err := r.adapter.inspect(ctx) + if err != nil { + if !isUnknownContainer(err) { + return errors.Wrap(err, "inspect container failed") + } + } else { + switch ctnr.State.Status { + case "running", "exited", "dead": + return nil + } + } + + for { + select { + case event := <-eventq: + if !r.matchevent(event) { + continue + } + + switch event.Action { + case "start": + return nil + } + case <-ctx.Done(): + return ctx.Err() + case <-r.closed: + return r.err + } + } +} + +func (r *controller) Logs(ctx context.Context, publisher exec.LogPublisher, options api.LogSubscriptionOptions) error { + if err := r.checkClosed(); err != nil { + return err + } + + if err := r.waitReady(ctx); err != nil { + return errors.Wrap(err, "container not ready for logs") + } + + rc, err := r.adapter.logs(ctx, options) + if err != nil { + return errors.Wrap(err, "failed getting container logs") + } + defer rc.Close() + + var ( + // use a rate limiter to keep things under control but also provides some + // ability coalesce messages. + limiter = rate.NewLimiter(rate.Every(time.Second), 10<<20) // 10 MB/s + msgctx = api.LogContext{ + NodeID: r.task.NodeID, + ServiceID: r.task.ServiceID, + TaskID: r.task.ID, + } + ) + + brd := bufio.NewReader(rc) + for { + // so, message header is 8 bytes, treat as uint64, pull stream off MSB + var header uint64 + if err := binary.Read(brd, binary.BigEndian, &header); err != nil { + if err == io.EOF { + return nil + } + + return errors.Wrap(err, "failed reading log header") + } + + stream, size := (header>>(7<<3))&0xFF, header & ^(uint64(0xFF)<<(7<<3)) + + // limit here to decrease allocation back pressure. + if err := limiter.WaitN(ctx, int(size)); err != nil { + return errors.Wrap(err, "failed rate limiter") + } + + buf := make([]byte, size) + _, err := io.ReadFull(brd, buf) + if err != nil { + return errors.Wrap(err, "failed reading buffer") + } + + // Timestamp is RFC3339Nano with 1 space after. Lop, parse, publish + parts := bytes.SplitN(buf, []byte(" "), 2) + if len(parts) != 2 { + return fmt.Errorf("invalid timestamp in log message: %v", buf) + } + + ts, err := time.Parse(time.RFC3339Nano, string(parts[0])) + if err != nil { + return errors.Wrap(err, "failed to parse timestamp") + } + + tsp, err := ptypes.TimestampProto(ts) + if err != nil { + return errors.Wrap(err, "failed to convert timestamp") + } + + if err := publisher.Publish(ctx, api.LogMessage{ + Context: msgctx, + Timestamp: tsp, + Stream: api.LogStream(stream), + + Data: parts[1], + }); err != nil { + return errors.Wrap(err, "failed to publish log message") + } + } +} + // Close the runner and clean up any ephemeral resources. func (r *controller) Close() error { select { diff --git a/components/engine/docs/reference/api/docker_remote_api.md b/components/engine/docs/reference/api/docker_remote_api.md index f3652ed21e..491d253841 100644 --- a/components/engine/docs/reference/api/docker_remote_api.md +++ b/components/engine/docs/reference/api/docker_remote_api.md @@ -149,7 +149,7 @@ This section lists each version from latest to oldest. Each listing includes a * `POST /containers/create` now takes `AutoRemove` in HostConfig, to enable auto-removal of the container on daemon side when the container's process exits. * `GET /containers/json` and `GET /containers/(id or name)/json` now return `"removing"` as a value for the `State.Status` field if the container is being removed. Previously, "exited" was returned as status. * `GET /containers/json` now accepts `removing` as a valid value for the `status` filter. -* `GET /containers/json` now supports filtering containers by `health` status. +* `GET /containers/json` now supports filtering containers by `health` status. * `DELETE /volumes/(name)` now accepts a `force` query parameter to force removal of volumes that were already removed out of band by the volume driver plugin. * `POST /containers/create/` and `POST /containers/(name)/update` now validates restart policies. * `POST /containers/create` now validates IPAMConfig in NetworkingConfig, and returns error for invalid IPv4 and IPv6 addresses (`--ip` and `--ip6` in `docker create/run`). diff --git a/components/engine/docs/reference/api/docker_remote_api_v1.25.md b/components/engine/docs/reference/api/docker_remote_api_v1.25.md index 9e7c4da230..5770017721 100644 --- a/components/engine/docs/reference/api/docker_remote_api_v1.25.md +++ b/components/engine/docs/reference/api/docker_remote_api_v1.25.md @@ -5631,6 +5631,49 @@ image](#create-an-image) section for more details. - **404** – no such service - **500** – server error +### Get service logs + +`GET /services/(id or name)/logs` + +Get `stdout` and `stderr` logs from the service ``id`` + +> **Note**: +> This endpoint works only for services with the `json-file` or `journald` logging drivers. + +**Example request**: + + GET /services/4fa6e0f0c678/logs?stderr=1&stdout=1×tamps=1&follow=1&tail=10&since=1428990821 HTTP/1.1 + +**Example response**: + + HTTP/1.1 101 UPGRADED + Content-Type: application/vnd.docker.raw-stream + Connection: Upgrade + Upgrade: tcp + + {% raw %} + {{ STREAM }} + {% endraw %} + +**Query parameters**: + +- **details** - 1/True/true or 0/False/flase, Show extra details provided to logs. Default `false`. +- **follow** – 1/True/true or 0/False/false, return stream. Default `false`. +- **stdout** – 1/True/true or 0/False/false, show `stdout` log. Default `false`. +- **stderr** – 1/True/true or 0/False/false, show `stderr` log. Default `false`. +- **since** – UNIX timestamp (integer) to filter logs. Specifying a timestamp + will only output log-entries since that timestamp. Default: 0 (unfiltered) +- **timestamps** – 1/True/true or 0/False/false, print timestamps for + every log line. Default `false`. +- **tail** – Output specified number of lines at the end of logs: `all` or ``. Default all. + +**Status codes**: + +- **101** – no error, hints proxy about hijacking +- **200** – no error, no upgrade header found +- **404** – no such service +- **500** – server error + ## 3.10 Tasks **Note**: Task operations require the engine to be part of a swarm. diff --git a/components/engine/docs/reference/commandline/service_logs.md b/components/engine/docs/reference/commandline/service_logs.md new file mode 100644 index 0000000000..eb319141e5 --- /dev/null +++ b/components/engine/docs/reference/commandline/service_logs.md @@ -0,0 +1,67 @@ +--- +title: "service logs (experimental)" +description: "The service logs command description and usage" +keywords: "service, logs" +advisory: "experimental" +--- + + + +# service logs + +```Markdown +Usage: docker service logs [OPTIONS] SERVICE + +Fetch the logs of a service + +Options: + --details Show extra details provided to logs + -f, --follow Follow log output + --help Print usage + --since string Show logs since timestamp + --tail string Number of lines to show from the end of the logs (default "all") + -t, --timestamps Show timestamps +``` + +The `docker service logs` command batch-retrieves logs present at the time of execution. + +> **Note**: this command is only functional for services that are started with +> the `json-file` or `journald` logging driver. + +For more information about selecting and configuring login-drivers, refer to +[Configure logging drivers](https://docs.docker.com/engine/admin/logging/overview/). + +The `docker service logs --follow` command will continue streaming the new output from +the service's `STDOUT` and `STDERR`. + +Passing a negative number or a non-integer to `--tail` is invalid and the +value is set to `all` in that case. + +The `docker service logs --timestamps` command will add an [RFC3339Nano timestamp](https://golang.org/pkg/time/#pkg-constants) +, for example `2014-09-16T06:17:46.000000000Z`, to each +log entry. To ensure that the timestamps are aligned the +nano-second part of the timestamp will be padded with zero when necessary. + +The `docker service logs --details` command will add on extra attributes, such as +environment variables and labels, provided to `--log-opt` when creating the +service. + +The `--since` option shows only the service logs generated after +a given date. You can specify the date as an RFC 3339 date, a UNIX +timestamp, or a Go duration string (e.g. `1m30s`, `3h`). Besides RFC3339 date +format you may also use RFC3339Nano, `2006-01-02T15:04:05`, +`2006-01-02T15:04:05.999999999`, `2006-01-02Z07:00`, and `2006-01-02`. The local +timezone on the client will be used if you do not provide either a `Z` or a +`+-00:00` timezone offset at the end of the timestamp. When providing Unix +timestamps enter seconds[.nanoseconds], where seconds is the number of seconds +that have elapsed since January 1, 1970 (midnight UTC/GMT), not counting leap +seconds (aka Unix epoch or Unix time), and the optional .nanoseconds field is a +fraction of a second no more than nine digits long. You can combine the +`--since` option with either or both of the `--follow` or `--tail` options. diff --git a/components/engine/integration-cli/docker_cli_service_logs_experimental_test.go b/components/engine/integration-cli/docker_cli_service_logs_experimental_test.go new file mode 100644 index 0000000000..2bb91c1d7d --- /dev/null +++ b/components/engine/integration-cli/docker_cli_service_logs_experimental_test.go @@ -0,0 +1,55 @@ +// +build !windows + +package main + +import ( + "bufio" + "io" + "os/exec" + "strings" + + "github.com/docker/docker/pkg/integration/checker" + "github.com/go-check/check" +) + +type logMessage struct { + err error + data []byte +} + +func (s *DockerSwarmSuite) TestServiceLogs(c *check.C) { + testRequires(c, ExperimentalDaemon) + + d := s.AddDaemon(c, true, true) + + name := "TestServiceLogs" + + out, err := d.Cmd("service", "create", "--name", name, "busybox", "sh", "-c", "while true; do echo log test; sleep 1; done") + c.Assert(err, checker.IsNil) + c.Assert(strings.TrimSpace(out), checker.Not(checker.Equals), "") + + // make sure task has been deployed. + waitAndAssert(c, defaultReconciliationTimeout, d.checkActiveContainerCount, checker.Equals, 1) + + args := []string{"service", "logs", "-f", name} + cmd := exec.Command(dockerBinary, d.prependHostArg(args)...) + r, w := io.Pipe() + cmd.Stdout = w + cmd.Stderr = w + c.Assert(cmd.Start(), checker.IsNil) + + // Make sure pipe is written to + ch := make(chan *logMessage) + go func() { + reader := bufio.NewReader(r) + msg := &logMessage{} + msg.data, _, msg.err = reader.ReadLine() + ch <- msg + }() + + msg := <-ch + c.Assert(msg.err, checker.IsNil) + c.Assert(string(msg.data), checker.Contains, "log test") + + c.Assert(cmd.Process.Kill(), checker.IsNil) +}