Files
docker-cli/components/engine/api/server/router/local/info.go
T
David Calavera d555e15f77 Add PubSub topics.
A TopicFunc is an interface to let the pubisher decide whether it needs
to send a message to a subscriber or not. It returns true if the
publisher must send the message and false otherwise.

Users of the pubsub package can create a subscriber with a topic
function by calling `pubsub.SubscribeTopic`.

Message delivery has also been modified to use concurrent channels per
subscriber. That way, topic verification and message delivery is not
o(N+M) anymore, based on the number of subscribers and topic verification
complexity.

Using pubsub topics, the API stops controlling the message delivery,
delegating that function to a topic generated with the filtering
provided by the user. The publisher sends every message to the
subscriber if there is no filter, but the api doesn't have to select
messages to return anymore.

Signed-off-by: David Calavera <david.calavera@gmail.com>
Upstream-commit: 434d2e8745696255a204d9eefc6a2854ff74e5c2
Component: engine
2015-12-02 16:43:49 -05:00

127 lines
3.1 KiB
Go

package local
import (
"encoding/json"
"net/http"
"runtime"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api"
"github.com/docker/docker/api/server/httputils"
"github.com/docker/docker/api/types"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/docker/docker/pkg/timeutils"
"github.com/docker/docker/utils"
"golang.org/x/net/context"
)
func (s *router) getVersion(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
v := &types.Version{
Version: dockerversion.Version,
APIVersion: api.Version,
GitCommit: dockerversion.GitCommit,
GoVersion: runtime.Version(),
Os: runtime.GOOS,
Arch: runtime.GOARCH,
BuildTime: dockerversion.BuildTime,
}
version := httputils.VersionFromContext(ctx)
if version.GreaterThanOrEqualTo("1.19") {
v.Experimental = utils.ExperimentalBuild()
}
if kernelVersion, err := kernel.GetKernelVersion(); err == nil {
v.KernelVersion = kernelVersion.String()
}
return httputils.WriteJSON(w, http.StatusOK, v)
}
func (s *router) getInfo(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
info, err := s.daemon.SystemInfo()
if err != nil {
return err
}
return httputils.WriteJSON(w, http.StatusOK, info)
}
func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := httputils.ParseForm(r); err != nil {
return err
}
since, sinceNano, err := timeutils.ParseTimestamps(r.Form.Get("since"), -1)
if err != nil {
return err
}
until, untilNano, err := timeutils.ParseTimestamps(r.Form.Get("until"), -1)
if err != nil {
return err
}
timer := time.NewTimer(0)
timer.Stop()
if until > 0 || untilNano > 0 {
dur := time.Unix(until, untilNano).Sub(time.Now())
timer = time.NewTimer(dur)
}
ef, err := filters.FromParam(r.Form.Get("filters"))
if err != nil {
return err
}
w.Header().Set("Content-Type", "application/json")
// This is to ensure that the HTTP status code is sent immediately,
// so that it will not block the receiver.
w.WriteHeader(http.StatusOK)
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
output := ioutils.NewWriteFlusher(w)
defer output.Close()
enc := json.NewEncoder(output)
buffered, l := s.daemon.SubscribeToEvents(since, sinceNano, ef)
defer s.daemon.UnsubscribeFromEvents(l)
for _, ev := range buffered {
if err := enc.Encode(ev); err != nil {
return err
}
}
var closeNotify <-chan bool
if closeNotifier, ok := w.(http.CloseNotifier); ok {
closeNotify = closeNotifier.CloseNotify()
}
for {
select {
case ev := <-l:
jev, ok := ev.(*jsonmessage.JSONMessage)
if !ok {
continue
}
if err := enc.Encode(jev); err != nil {
return err
}
case <-timer.C:
return nil
case <-closeNotify:
logrus.Debug("Client disconnected, stop sending events")
return nil
}
}
}