Merge pull request #21939 from calavera/events_until_past

Get events until a time in the past.
Upstream-commit: 19a453e6b40177af31703f107131aae5ca81adf2
Component: engine
This commit is contained in:
Brian Goff
2016-04-15 15:33:41 -04:00
12 changed files with 291 additions and 135 deletions

View File

@ -1,6 +1,8 @@
package system
import (
"time"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
"github.com/docker/engine-api/types/filters"
@ -12,7 +14,7 @@ import (
type Backend interface {
SystemInfo() (*types.Info, error)
SystemVersion() types.Version
SubscribeToEvents(since, sinceNano int64, ef filters.Args) ([]events.Message, chan interface{})
SubscribeToEvents(since, until time.Time, ef filters.Args) ([]events.Message, chan interface{})
UnsubscribeFromEvents(chan interface{})
AuthenticateToRegistry(ctx context.Context, authConfig *types.AuthConfig) (string, string, error)
}

View File

@ -2,12 +2,14 @@ package system
import (
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api"
"github.com/docker/docker/api/server/httputils"
"github.com/docker/docker/errors"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/events"
@ -46,19 +48,33 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
if err := httputils.ParseForm(r); err != nil {
return err
}
since, sinceNano, err := timetypes.ParseTimestamps(r.Form.Get("since"), -1)
since, err := eventTime(r.Form.Get("since"))
if err != nil {
return err
}
until, untilNano, err := timetypes.ParseTimestamps(r.Form.Get("until"), -1)
until, err := eventTime(r.Form.Get("until"))
if err != nil {
return err
}
var timeout <-chan time.Time
if until > 0 || untilNano > 0 {
dur := time.Unix(until, untilNano).Sub(time.Now())
timeout = time.NewTimer(dur).C
var (
timeout <-chan time.Time
onlyPastEvents bool
)
if !until.IsZero() {
if until.Before(since) {
return errors.NewBadRequestError(fmt.Errorf("`since` time (%s) cannot be after `until` time (%s)", r.Form.Get("since"), r.Form.Get("until")))
}
now := time.Now()
onlyPastEvents = until.Before(now)
if !onlyPastEvents {
dur := until.Sub(now)
timeout = time.NewTimer(dur).C
}
}
ef, err := filters.FromParam(r.Form.Get("filters"))
@ -73,7 +89,7 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
enc := json.NewEncoder(output)
buffered, l := s.backend.SubscribeToEvents(since, sinceNano, ef)
buffered, l := s.backend.SubscribeToEvents(since, until, ef)
defer s.backend.UnsubscribeFromEvents(l)
for _, ev := range buffered {
@ -82,6 +98,10 @@ func (s *systemRouter) getEvents(ctx context.Context, w http.ResponseWriter, r *
}
}
if onlyPastEvents {
return nil
}
for {
select {
case ev := <-l:
@ -118,3 +138,14 @@ func (s *systemRouter) postAuth(ctx context.Context, w http.ResponseWriter, r *h
IdentityToken: token,
})
}
func eventTime(formTime string) (time.Time, error) {
t, tNano, err := timetypes.ParseTimestamps(formTime, -1)
if err != nil {
return time.Time{}, err
}
if t == -1 {
return time.Time{}, nil
}
return time.Unix(t, tNano), nil
}