From d1efb7843b268ddf08b021ccf27946d7c59217d5 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 10 Jul 2013 12:55:05 +0000 Subject: [PATCH 1/8] basic version of the /events endpoint Upstream-commit: b5da816487d68853a8ac46630cb3118646f71d2d Component: engine --- components/engine/api.go | 28 ++++++++++++++++++++++++- components/engine/api_params.go | 15 +++++++------- components/engine/commands.go | 31 ++++++++++++++++++++-------- components/engine/runtime_test.go | 12 +++++------ components/engine/server.go | 34 +++++++++++++++++++++++-------- components/engine/utils/utils.go | 15 ++++++++++++++ 6 files changed, 104 insertions(+), 31 deletions(-) diff --git a/components/engine/api.go b/components/engine/api.go index b6ab7badfa..9d0348b608 100644 --- a/components/engine/api.go +++ b/components/engine/api.go @@ -217,6 +217,31 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques return nil } +func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + events := make(chan utils.JSONMessage) + srv.Lock() + srv.events[r.RemoteAddr] = events + srv.Unlock() + w.Header().Set("Content-Type", "application/json") + wf := utils.NewWriteFlusher(w) + for { + event := <-events + b, err := json.Marshal(event) + if err != nil { + continue + } + _, err = wf.Write(b) + if err != nil { + utils.Debugf("%s", err) + srv.Lock() + delete(srv.events, r.RemoteAddr) + srv.Unlock() + return err + } + } + return nil +} + func getImagesHistory(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if vars == nil { return fmt.Errorf("Missing parameter") @@ -855,8 +880,9 @@ func createRouter(srv *Server, logging bool) (*mux.Router, error) { m := map[string]map[string]func(*Server, float64, http.ResponseWriter, *http.Request, map[string]string) error{ "GET": { "/auth": getAuth, - "/version": getVersion, + "/events": getEvents, "/info": getInfo, + "/version": getVersion, "/images/json": getImagesJSON, "/images/viz": getImagesViz, "/images/search": getImagesSearch, diff --git a/components/engine/api_params.go b/components/engine/api_params.go index b371ca314f..26d70711a1 100644 --- a/components/engine/api_params.go +++ b/components/engine/api_params.go @@ -17,13 +17,14 @@ type APIImages struct { } type APIInfo struct { - Debug bool - Containers int - Images int - NFd int `json:",omitempty"` - NGoroutines int `json:",omitempty"` - MemoryLimit bool `json:",omitempty"` - SwapLimit bool `json:",omitempty"` + Debug bool + Containers int + Images int + NFd int `json:",omitempty"` + NGoroutines int `json:",omitempty"` + MemoryLimit bool `json:",omitempty"` + SwapLimit bool `json:",omitempty"` + NEventsListener int `json:",omitempty"` } type APITop struct { diff --git a/components/engine/commands.go b/components/engine/commands.go index b0e32162e6..12647feead 100644 --- a/components/engine/commands.go +++ b/components/engine/commands.go @@ -78,6 +78,7 @@ func (cli *DockerCli) CmdHelp(args ...string) error { {"build", "Build a container from a Dockerfile"}, {"commit", "Create a new image from a container's changes"}, {"diff", "Inspect changes on a container's filesystem"}, + {"events", "Get real time events from the server"}, {"export", "Stream the contents of a container as a tar archive"}, {"history", "Show the history of an image"}, {"images", "List images"}, @@ -466,6 +467,7 @@ func (cli *DockerCli) CmdInfo(args ...string) error { fmt.Fprintf(cli.out, "Debug mode (client): %v\n", os.Getenv("DEBUG") != "") fmt.Fprintf(cli.out, "Fds: %d\n", out.NFd) fmt.Fprintf(cli.out, "Goroutines: %d\n", out.NGoroutines) + fmt.Fprintf(cli.out, "EventsListeners: %d\n", out.NEventsListener) } if !out.MemoryLimit { fmt.Fprintf(cli.err, "WARNING: No memory limit support\n") @@ -1055,6 +1057,23 @@ func (cli *DockerCli) CmdCommit(args ...string) error { return nil } +func (cli *DockerCli) CmdEvents(args ...string) error { + cmd := Subcmd("events", "", "Get real time events from the server") + if err := cmd.Parse(args); err != nil { + return nil + } + + if cmd.NArg() != 0 { + cmd.Usage() + return nil + } + + if err := cli.stream("GET", "/events", nil, cli.out); err != nil { + return err + } + return nil +} + func (cli *DockerCli) CmdExport(args ...string) error { cmd := Subcmd("export", "CONTAINER", "Export the contents of a filesystem as a tar archive") if err := cmd.Parse(args); err != nil { @@ -1509,19 +1528,13 @@ func (cli *DockerCli) stream(method, path string, in io.Reader, out io.Writer) e if resp.Header.Get("Content-Type") == "application/json" { dec := json.NewDecoder(resp.Body) for { - var m utils.JSONMessage - if err := dec.Decode(&m); err == io.EOF { + var jm utils.JSONMessage + if err := dec.Decode(&jm); err == io.EOF { break } else if err != nil { return err } - if m.Progress != "" { - fmt.Fprintf(out, "%s %s\r", m.Status, m.Progress) - } else if m.Error != "" { - return fmt.Errorf(m.Error) - } else { - fmt.Fprintf(out, "%s\n", m.Status) - } + jm.Display(out) } } else { if _, err := io.Copy(out, resp.Body); err != nil { diff --git a/components/engine/runtime_test.go b/components/engine/runtime_test.go index 66d92c8100..807097404d 100644 --- a/components/engine/runtime_test.go +++ b/components/engine/runtime_test.go @@ -17,12 +17,12 @@ import ( ) const ( - unitTestImageName = "docker-test-image" - unitTestImageID = "83599e29c455eb719f77d799bc7c51521b9551972f5a850d7ad265bc1b5292f6" // 1.0 - unitTestNetworkBridge = "testdockbr0" - unitTestStoreBase = "/var/lib/docker/unit-tests" - testDaemonAddr = "127.0.0.1:4270" - testDaemonProto = "tcp" + unitTestImageName = "docker-test-image" + unitTestImageID = "83599e29c455eb719f77d799bc7c51521b9551972f5a850d7ad265bc1b5292f6" // 1.0 + unitTestNetworkBridge = "testdockbr0" + unitTestStoreBase = "/var/lib/docker/unit-tests" + testDaemonAddr = "127.0.0.1:4270" + testDaemonProto = "tcp" ) var globalRuntime *Runtime diff --git a/components/engine/server.go b/components/engine/server.go index b92ed8fd73..2499d64397 100644 --- a/components/engine/server.go +++ b/components/engine/server.go @@ -32,8 +32,9 @@ func (srv *Server) DockerVersion() APIVersion { func (srv *Server) ContainerKill(name string) error { if container := srv.runtime.Get(name); container != nil { if err := container.Kill(); err != nil { - return fmt.Errorf("Error restarting container %s: %s", name, err) + return fmt.Errorf("Error killing container %s: %s", name, err) } + srv.SendEvent("kill", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -52,6 +53,7 @@ func (srv *Server) ContainerExport(name string, out io.Writer) error { if _, err := io.Copy(out, data); err != nil { return err } + srv.SendEvent("export", name) return nil } return fmt.Errorf("No such container: %s", name) @@ -209,13 +211,14 @@ func (srv *Server) DockerInfo() *APIInfo { imgcount = len(images) } return &APIInfo{ - Containers: len(srv.runtime.List()), - Images: imgcount, - MemoryLimit: srv.runtime.capabilities.MemoryLimit, - SwapLimit: srv.runtime.capabilities.SwapLimit, - Debug: os.Getenv("DEBUG") != "", - NFd: utils.GetTotalUsedFds(), - NGoroutines: runtime.NumGoroutine(), + Containers: len(srv.runtime.List()), + Images: imgcount, + MemoryLimit: srv.runtime.capabilities.MemoryLimit, + SwapLimit: srv.runtime.capabilities.SwapLimit, + Debug: os.Getenv("DEBUG") != "", + NFd: utils.GetTotalUsedFds(), + NGoroutines: runtime.NumGoroutine(), + NEventsListener: len(srv.events), } } @@ -810,6 +813,7 @@ func (srv *Server) ContainerCreate(config *Config) (string, error) { } return "", err } + srv.SendEvent("create", container.ShortID()) return container.ShortID(), nil } @@ -818,6 +822,7 @@ func (srv *Server) ContainerRestart(name string, t int) error { if err := container.Restart(t); err != nil { return fmt.Errorf("Error restarting container %s: %s", name, err) } + srv.SendEvent("restart", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -837,6 +842,7 @@ func (srv *Server) ContainerDestroy(name string, removeVolume bool) error { if err := srv.runtime.Destroy(container); err != nil { return fmt.Errorf("Error destroying container %s: %s", name, err) } + srv.SendEvent("destroy", name) if removeVolume { // Retrieve all volumes from all remaining containers @@ -903,6 +909,7 @@ func (srv *Server) deleteImageAndChildren(id string, imgs *[]APIRmi) error { return err } *imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)}) + srv.SendEvent("delete", utils.TruncateID(id)) return nil } return nil @@ -946,6 +953,7 @@ func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, erro } if tagDeleted { imgs = append(imgs, APIRmi{Untagged: img.ShortID()}) + srv.SendEvent("untagged", img.ShortID()) } if len(srv.runtime.repositories.ByID()[img.ID]) == 0 { if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil { @@ -1018,6 +1026,7 @@ func (srv *Server) ContainerStart(name string, hostConfig *HostConfig) error { if err := container.Start(hostConfig); err != nil { return fmt.Errorf("Error starting container %s: %s", name, err) } + srv.SendEvent("start", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -1029,6 +1038,7 @@ func (srv *Server) ContainerStop(name string, t int) error { if err := container.Stop(t); err != nil { return fmt.Errorf("Error stopping container %s: %s", name, err) } + srv.SendEvent("stop", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -1162,15 +1172,23 @@ func NewServer(flGraphPath string, autoRestart, enableCors bool, dns ListOpts) ( enableCors: enableCors, pullingPool: make(map[string]struct{}), pushingPool: make(map[string]struct{}), + events: make(map[string]chan utils.JSONMessage), } runtime.srv = srv return srv, nil } +func (srv *Server) SendEvent(action, id string) { + for _, c := range srv.events { + c <- utils.JSONMessage{Status: action, ID: id} + } +} + type Server struct { sync.Mutex runtime *Runtime enableCors bool pullingPool map[string]struct{} pushingPool map[string]struct{} + events map[string]chan utils.JSONMessage } diff --git a/components/engine/utils/utils.go b/components/engine/utils/utils.go index 77b3f879cd..1523835f99 100644 --- a/components/engine/utils/utils.go +++ b/components/engine/utils/utils.go @@ -611,8 +611,23 @@ type JSONMessage struct { Status string `json:"status,omitempty"` Progress string `json:"progress,omitempty"` Error string `json:"error,omitempty"` + ID string `json:"id,omitempty"` } +func (jm *JSONMessage) Display(out io.Writer) (error) { + if jm.Progress != "" { + fmt.Fprintf(out, "%s %s\r", jm.Status, jm.Progress) + } else if jm.Error != "" { + return fmt.Errorf(jm.Error) + } else if jm.ID != "" { + fmt.Fprintf(out, "%s: %s\n", jm.ID, jm.Status) + } else { + fmt.Fprintf(out, "%s\n", jm.Status) + } + return nil +} + + type StreamFormatter struct { json bool used bool From fd89a1c59f66d03ee2e809f4eb9d574af2d2e14d Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Fri, 12 Jul 2013 15:09:20 +0000 Subject: [PATCH 2/8] add timestamp and change untagged -> untag Upstream-commit: b8d52ec2669332988a972bff3b5f5d2e9d526b33 Component: engine --- components/engine/server.go | 5 +++-- components/engine/utils/utils.go | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/components/engine/server.go b/components/engine/server.go index 2499d64397..8ba90c69e3 100644 --- a/components/engine/server.go +++ b/components/engine/server.go @@ -19,6 +19,7 @@ import ( "runtime" "strings" "sync" + "time" ) func (srv *Server) DockerVersion() APIVersion { @@ -953,7 +954,7 @@ func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, erro } if tagDeleted { imgs = append(imgs, APIRmi{Untagged: img.ShortID()}) - srv.SendEvent("untagged", img.ShortID()) + srv.SendEvent("untag", img.ShortID()) } if len(srv.runtime.repositories.ByID()[img.ID]) == 0 { if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil { @@ -1180,7 +1181,7 @@ func NewServer(flGraphPath string, autoRestart, enableCors bool, dns ListOpts) ( func (srv *Server) SendEvent(action, id string) { for _, c := range srv.events { - c <- utils.JSONMessage{Status: action, ID: id} + c <- utils.JSONMessage{Status: action, ID: id, Time: time.Now().Unix()} } } diff --git a/components/engine/utils/utils.go b/components/engine/utils/utils.go index 1523835f99..acb015becd 100644 --- a/components/engine/utils/utils.go +++ b/components/engine/utils/utils.go @@ -612,9 +612,13 @@ type JSONMessage struct { Progress string `json:"progress,omitempty"` Error string `json:"error,omitempty"` ID string `json:"id,omitempty"` + Time int64 `json:"time,omitempty"` } func (jm *JSONMessage) Display(out io.Writer) (error) { + if jm.Time != 0 { + fmt.Fprintf(out, "[%s] ", time.Unix(jm.Time, 0)) + } if jm.Progress != "" { fmt.Fprintf(out, "%s %s\r", jm.Status, jm.Progress) } else if jm.Error != "" { From 012e9440352ecf0138ba0c7b5e8b8e8bbec44fc5 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Fri, 12 Jul 2013 16:29:23 +0000 Subject: [PATCH 3/8] add since for polling, rename some vars Upstream-commit: 2e4d4c9f60d0ad92ab0cc84c56c060678222c4db Component: engine --- components/engine/api.go | 51 ++++++++++++++++++++++++++++------- components/engine/commands.go | 10 +++++-- components/engine/server.go | 33 +++++++++++++---------- 3 files changed, 68 insertions(+), 26 deletions(-) diff --git a/components/engine/api.go b/components/engine/api.go index 9d0348b608..0a7f5744b1 100644 --- a/components/engine/api.go +++ b/components/engine/api.go @@ -218,24 +218,55 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques } func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - events := make(chan utils.JSONMessage) - srv.Lock() - srv.events[r.RemoteAddr] = events - srv.Unlock() - w.Header().Set("Content-Type", "application/json") - wf := utils.NewWriteFlusher(w) - for { - event := <-events + sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) (bool, error) { b, err := json.Marshal(event) if err != nil { - continue + return true, nil } _, err = wf.Write(b) if err != nil { utils.Debugf("%s", err) srv.Lock() - delete(srv.events, r.RemoteAddr) + delete(srv.listeners, r.RemoteAddr) srv.Unlock() + return false, err + } + return false, nil + } + + if err := parseForm(r); err != nil { + return err + } + listener := make(chan utils.JSONMessage) + srv.Lock() + srv.listeners[r.RemoteAddr] = listener + srv.Unlock() + since, err := strconv.ParseInt(r.Form.Get("since"), 10, 0) + if err != nil { + since = 0 + } + w.Header().Set("Content-Type", "application/json") + wf := utils.NewWriteFlusher(w) + if since != 0 { + for _, event := range srv.events { + if event.Time >= since { + skip, err := sendEvent(wf, &event) + if skip { + continue + } + if err != nil { + return err + } + } + } + } + for { + event := <-listener + skip, err := sendEvent(wf, &event) + if skip { + continue + } + if err != nil { return err } } diff --git a/components/engine/commands.go b/components/engine/commands.go index 12647feead..2ab107bb77 100644 --- a/components/engine/commands.go +++ b/components/engine/commands.go @@ -1058,7 +1058,8 @@ func (cli *DockerCli) CmdCommit(args ...string) error { } func (cli *DockerCli) CmdEvents(args ...string) error { - cmd := Subcmd("events", "", "Get real time events from the server") + cmd := Subcmd("events", "[OPTIONS]", "Get real time events from the server") + since := cmd.String("since", "", "Show events previously created (used for polling).") if err := cmd.Parse(args); err != nil { return nil } @@ -1068,7 +1069,12 @@ func (cli *DockerCli) CmdEvents(args ...string) error { return nil } - if err := cli.stream("GET", "/events", nil, cli.out); err != nil { + v := url.Values{} + if *since != "" { + v.Set("since", *since) + } + + if err := cli.stream("GET", "/events?"+v.Encode(), nil, cli.out); err != nil { return err } return nil diff --git a/components/engine/server.go b/components/engine/server.go index 8ba90c69e3..7efb850882 100644 --- a/components/engine/server.go +++ b/components/engine/server.go @@ -35,7 +35,7 @@ func (srv *Server) ContainerKill(name string) error { if err := container.Kill(); err != nil { return fmt.Errorf("Error killing container %s: %s", name, err) } - srv.SendEvent("kill", name) + srv.LogEvent("kill", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -54,7 +54,7 @@ func (srv *Server) ContainerExport(name string, out io.Writer) error { if _, err := io.Copy(out, data); err != nil { return err } - srv.SendEvent("export", name) + srv.LogEvent("export", name) return nil } return fmt.Errorf("No such container: %s", name) @@ -814,7 +814,7 @@ func (srv *Server) ContainerCreate(config *Config) (string, error) { } return "", err } - srv.SendEvent("create", container.ShortID()) + srv.LogEvent("create", container.ShortID()) return container.ShortID(), nil } @@ -823,7 +823,7 @@ func (srv *Server) ContainerRestart(name string, t int) error { if err := container.Restart(t); err != nil { return fmt.Errorf("Error restarting container %s: %s", name, err) } - srv.SendEvent("restart", name) + srv.LogEvent("restart", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -843,7 +843,7 @@ func (srv *Server) ContainerDestroy(name string, removeVolume bool) error { if err := srv.runtime.Destroy(container); err != nil { return fmt.Errorf("Error destroying container %s: %s", name, err) } - srv.SendEvent("destroy", name) + srv.LogEvent("destroy", name) if removeVolume { // Retrieve all volumes from all remaining containers @@ -910,7 +910,7 @@ func (srv *Server) deleteImageAndChildren(id string, imgs *[]APIRmi) error { return err } *imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)}) - srv.SendEvent("delete", utils.TruncateID(id)) + srv.LogEvent("delete", utils.TruncateID(id)) return nil } return nil @@ -954,7 +954,7 @@ func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, erro } if tagDeleted { imgs = append(imgs, APIRmi{Untagged: img.ShortID()}) - srv.SendEvent("untag", img.ShortID()) + srv.LogEvent("untag", img.ShortID()) } if len(srv.runtime.repositories.ByID()[img.ID]) == 0 { if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil { @@ -1027,7 +1027,7 @@ func (srv *Server) ContainerStart(name string, hostConfig *HostConfig) error { if err := container.Start(hostConfig); err != nil { return fmt.Errorf("Error starting container %s: %s", name, err) } - srv.SendEvent("start", name) + srv.LogEvent("start", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -1039,7 +1039,7 @@ func (srv *Server) ContainerStop(name string, t int) error { if err := container.Stop(t); err != nil { return fmt.Errorf("Error stopping container %s: %s", name, err) } - srv.SendEvent("stop", name) + srv.LogEvent("stop", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -1173,15 +1173,19 @@ func NewServer(flGraphPath string, autoRestart, enableCors bool, dns ListOpts) ( enableCors: enableCors, pullingPool: make(map[string]struct{}), pushingPool: make(map[string]struct{}), - events: make(map[string]chan utils.JSONMessage), + events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events + listeners: make(map[string]chan utils.JSONMessage), } runtime.srv = srv return srv, nil } -func (srv *Server) SendEvent(action, id string) { - for _, c := range srv.events { - c <- utils.JSONMessage{Status: action, ID: id, Time: time.Now().Unix()} +func (srv *Server) LogEvent(action, id string) { + now := time.Now().Unix() + jm := utils.JSONMessage{Status: action, ID: id, Time: now} + srv.events = append(srv.events, jm) + for _, c := range srv.listeners { + c <- jm } } @@ -1191,5 +1195,6 @@ type Server struct { enableCors bool pullingPool map[string]struct{} pushingPool map[string]struct{} - events map[string]chan utils.JSONMessage + events []utils.JSONMessage + listeners map[string]chan utils.JSONMessage } From 05d166e113cf82f6e67f7b0efeb00212a4f6597f Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 17 Jul 2013 15:44:06 +0200 Subject: [PATCH 4/8] add docs Upstream-commit: ec559c02b8dd70692821b3dc7f497d6fccaa88ad Component: engine --- .../docs/sources/api/docker_remote_api.rst | 4 +++ .../sources/api/docker_remote_api_v1.3.rst | 30 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/components/engine/docs/sources/api/docker_remote_api.rst b/components/engine/docs/sources/api/docker_remote_api.rst index f8a1381c53..792d43f501 100644 --- a/components/engine/docs/sources/api/docker_remote_api.rst +++ b/components/engine/docs/sources/api/docker_remote_api.rst @@ -44,6 +44,10 @@ What's new **New!** List the processes running inside a container. +.. http:get:: /events: + + **New!** Monitor docker's events via streaming or via polling + Builder (/build): - Simplify the upload of the build context diff --git a/components/engine/docs/sources/api/docker_remote_api_v1.3.rst b/components/engine/docs/sources/api/docker_remote_api_v1.3.rst index 273ec2e98d..69f480e453 100644 --- a/components/engine/docs/sources/api/docker_remote_api_v1.3.rst +++ b/components/engine/docs/sources/api/docker_remote_api_v1.3.rst @@ -1059,6 +1059,36 @@ Create a new image from a container's changes :statuscode 500: server error +Monitor Docker's events +*********************** + +.. http:get:: /events + + Get events from docker, either in real time via streaming, or via polling (using `since`) + + **Example request**: + + .. sourcecode:: http + + POST /events?since=1374067924 + + **Example response**: + + .. sourcecode:: http + + HTTP/1.1 200 OK + Content-Type: application/json + + {"status":"create","id":"dfdf82bd3881","time":1374067924} + {"status":"start","id":"dfdf82bd3881","time":1374067924} + {"status":"stop","id":"dfdf82bd3881","time":1374067966} + {"status":"destroy","id":"dfdf82bd3881","time":1374067970} + + :query since: timestamp used for polling + :statuscode 200: no error + :statuscode 500: server error + + 3. Going further ================ From 35c9a9d01e288176a1302a294bb1d0a6a76fda5b Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Wed, 17 Jul 2013 13:56:09 +0000 Subject: [PATCH 5/8] getEvents a bit simpler Upstream-commit: 8b3519c5f7540b99d19ed2c3163aabf9897dd5a4 Component: engine --- components/engine/api.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/components/engine/api.go b/components/engine/api.go index 0a7f5744b1..dd3e8eed0d 100644 --- a/components/engine/api.go +++ b/components/engine/api.go @@ -218,20 +218,21 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques } func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) (bool, error) { + sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) (error) { b, err := json.Marshal(event) if err != nil { - return true, nil + return fmt.Errorf("JSON error") } _, err = wf.Write(b) if err != nil { + // On error, evict the listener utils.Debugf("%s", err) srv.Lock() delete(srv.listeners, r.RemoteAddr) srv.Unlock() - return false, err + return err } - return false, nil + return nil } if err := parseForm(r); err != nil { @@ -248,10 +249,11 @@ func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Requ w.Header().Set("Content-Type", "application/json") wf := utils.NewWriteFlusher(w) if since != 0 { + // If since, send previous events that happened after the timestamp for _, event := range srv.events { if event.Time >= since { - skip, err := sendEvent(wf, &event) - if skip { + err := sendEvent(wf, &event) + if err != nil && err.Error() == "JSON error" { continue } if err != nil { @@ -262,8 +264,8 @@ func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Requ } for { event := <-listener - skip, err := sendEvent(wf, &event) - if skip { + err := sendEvent(wf, &event) + if err != nil && err.Error() == "JSON error" { continue } if err != nil { From ae5b19f00259a1ce4b1d33643e557640be5bd3cb Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 18 Jul 2013 14:35:14 +0000 Subject: [PATCH 6/8] use non-blocking channel to prevent dead-lock and add test for server Upstream-commit: 040c3b50d0a56baf98bd1ec14ad7d59c55a4ab31 Component: engine --- components/engine/server.go | 5 +++- components/engine/server_test.go | 40 ++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/components/engine/server.go b/components/engine/server.go index 7efb850882..8eff17a947 100644 --- a/components/engine/server.go +++ b/components/engine/server.go @@ -1185,7 +1185,10 @@ func (srv *Server) LogEvent(action, id string) { jm := utils.JSONMessage{Status: action, ID: id, Time: now} srv.events = append(srv.events, jm) for _, c := range srv.listeners { - c <- jm + select { // non blocking channel + case c <- jm: + default: + } } } diff --git a/components/engine/server_test.go b/components/engine/server_test.go index 05a286aaa8..8612b3fcea 100644 --- a/components/engine/server_test.go +++ b/components/engine/server_test.go @@ -1,7 +1,9 @@ package docker import ( + "github.com/dotcloud/docker/utils" "testing" + "time" ) func TestContainerTagImageDelete(t *testing.T) { @@ -163,3 +165,41 @@ func TestRunWithTooLowMemoryLimit(t *testing.T) { } } + +func TestLogEvent(t *testing.T) { + runtime := mkRuntime(t) + srv := &Server{ + runtime: runtime, + events: make([]utils.JSONMessage, 0, 64), + listeners: make(map[string]chan utils.JSONMessage), + } + + srv.LogEvent("fakeaction", "fakeid") + + listener := make(chan utils.JSONMessage) + srv.Lock() + srv.listeners["test"] = listener + srv.Unlock() + + srv.LogEvent("fakeaction2", "fakeid") + + if len(srv.events) != 2 { + t.Fatalf("Expected 2 events, found %d", len(srv.events)) + } + go func() { + time.Sleep(200 * time.Millisecond) + srv.LogEvent("fakeaction3", "fakeid") + time.Sleep(200 * time.Millisecond) + srv.LogEvent("fakeaction4", "fakeid") + }() + + setTimeout(t, "Listening for events timed out", 2*time.Second, func() { + for i := 2; i < 4; i++ { + event := <-listener + if event != srv.events[i] { + t.Fatalf("Event received it different than expected") + } + } + }) + +} From 211bbde153caf194c9c48898a3b815a886a1be50 Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Thu, 18 Jul 2013 14:54:58 +0000 Subject: [PATCH 7/8] Add tests for the api Upstream-commit: ed7a4236b32f3f711c183dff8b70fbef17bae2d7 Component: engine --- components/engine/api_test.go | 38 ++++++++++++++++++++++++++++++ components/engine/commands_test.go | 2 +- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/components/engine/api_test.go b/components/engine/api_test.go index 17ada96eab..13731dbf9e 100644 --- a/components/engine/api_test.go +++ b/components/engine/api_test.go @@ -89,6 +89,44 @@ func TestGetInfo(t *testing.T) { } } +func TestGetEvents(t *testing.T) { + runtime := mkRuntime(t) + srv := &Server{ + runtime: runtime, + events: make([]utils.JSONMessage, 0, 64), + listeners: make(map[string]chan utils.JSONMessage), + } + + srv.LogEvent("fakeaction", "fakeid") + srv.LogEvent("fakeaction2", "fakeid") + + req, err := http.NewRequest("GET", "/events?since=1", nil) + if err != nil { + t.Fatal(err) + } + + r := httptest.NewRecorder() + setTimeout(t, "", 500*time.Millisecond, func() { + if err := getEvents(srv, APIVERSION, r, req, nil); err != nil { + t.Fatal(err) + } + }) + + dec := json.NewDecoder(r.Body) + for i := 0; i < 2; i++ { + var jm utils.JSONMessage + if err := dec.Decode(&jm); err == io.EOF { + break + } else if err != nil { + t.Fatal(err) + } + if jm != srv.events[i] { + t.Fatalf("Event received it different than expected") + } + } + +} + func TestGetImagesJSON(t *testing.T) { runtime := mkRuntime(t) defer nuke(runtime) diff --git a/components/engine/commands_test.go b/components/engine/commands_test.go index 233c6337d4..030fb29f95 100644 --- a/components/engine/commands_test.go +++ b/components/engine/commands_test.go @@ -38,7 +38,7 @@ func setTimeout(t *testing.T, msg string, d time.Duration, f func()) { f() c <- false }() - if <-c { + if <-c && msg != "" { t.Fatal(msg) } } From 056ee7cfd5b6a8f0e395263cc735a943c9d11a7e Mon Sep 17 00:00:00 2001 From: Victor Vieux Date: Fri, 19 Jul 2013 14:31:42 +0000 Subject: [PATCH 8/8] add die event Upstream-commit: a41384ad7312a21fd8fe429637c8d6b5c883fa2a Component: engine --- components/engine/container.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/components/engine/container.go b/components/engine/container.go index f4a3c762ab..1e7bfed299 100644 --- a/components/engine/container.go +++ b/components/engine/container.go @@ -789,7 +789,9 @@ func (container *Container) monitor() { } } utils.Debugf("Process finished") - + if container.runtime != nil && container.runtime.srv != nil { + container.runtime.srv.LogEvent("die", container.ShortID()) + } exitCode := -1 if container.cmd != nil { exitCode = container.cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()