Merge component 'engine' from git@github.com:moby/moby master

This commit is contained in:
GordonTheTurtle
2017-12-05 19:13:41 +00:00
62 changed files with 1851 additions and 708 deletions

View File

@ -14,7 +14,6 @@ import (
"github.com/docker/docker/cmd/dockerd/hack"
"github.com/docker/docker/daemon"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/docker/libnetwork/portallocator"
"golang.org/x/sys/unix"
)
@ -38,24 +37,13 @@ func getDaemonConfDir(_ string) string {
}
func (cli *DaemonCli) getPlatformRemoteOptions() ([]libcontainerd.RemoteOption, error) {
// On older kernel, letting putting the containerd-shim in its own
// namespace will effectively prevent operations such as unlink, rename
// and remove on mountpoints that were present at the time the shim
// namespace was created. This would led to a famous EBUSY will trying to
// remove shm mounts.
var noNewNS bool
if !kernel.CheckKernelVersion(3, 18, 0) {
noNewNS = true
}
opts := []libcontainerd.RemoteOption{
libcontainerd.WithOOMScore(cli.Config.OOMScoreAdjust),
libcontainerd.WithPlugin("linux", &linux.Config{
Shim: daemon.DefaultShimBinary,
Runtime: daemon.DefaultRuntimeBinary,
RuntimeRoot: filepath.Join(cli.Config.Root, "runc"),
ShimDebug: cli.Config.Debug,
ShimNoMountNS: noNewNS,
Shim: daemon.DefaultShimBinary,
Runtime: daemon.DefaultRuntimeBinary,
RuntimeRoot: filepath.Join(cli.Config.Root, "runc"),
ShimDebug: cli.Config.Debug,
}),
}
if cli.Config.Debug {

View File

@ -11,6 +11,7 @@ package copy
*/
import "C"
import (
"container/list"
"fmt"
"io"
"os"
@ -65,7 +66,7 @@ func copyRegular(srcPath, dstPath string, fileinfo os.FileInfo, copyWithFileRang
// as the ioctl may not have been available (therefore EINVAL)
if err == unix.EXDEV || err == unix.ENOSYS {
*copyWithFileRange = false
} else if err != nil {
} else {
return err
}
}
@ -106,11 +107,28 @@ func copyXattr(srcPath, dstPath, attr string) error {
return nil
}
type fileID struct {
dev uint64
ino uint64
}
type dirMtimeInfo struct {
dstPath *string
stat *syscall.Stat_t
}
// DirCopy copies or hardlinks the contents of one directory to another,
// properly handling xattrs, and soft links
func DirCopy(srcDir, dstDir string, copyMode Mode) error {
//
// Copying xattrs can be opted out of by passing false for copyXattrs.
func DirCopy(srcDir, dstDir string, copyMode Mode, copyXattrs bool) error {
copyWithFileRange := true
copyWithFileClone := true
// This is a map of source file inodes to dst file paths
copiedFiles := make(map[fileID]string)
dirsToSetMtimes := list.New()
err := filepath.Walk(srcDir, func(srcPath string, f os.FileInfo, err error) error {
if err != nil {
return err
@ -136,15 +154,21 @@ func DirCopy(srcDir, dstDir string, copyMode Mode) error {
switch f.Mode() & os.ModeType {
case 0: // Regular file
id := fileID{dev: stat.Dev, ino: stat.Ino}
if copyMode == Hardlink {
isHardlink = true
if err2 := os.Link(srcPath, dstPath); err2 != nil {
return err2
}
} else if hardLinkDstPath, ok := copiedFiles[id]; ok {
if err2 := os.Link(hardLinkDstPath, dstPath); err2 != nil {
return err2
}
} else {
if err2 := copyRegular(srcPath, dstPath, f, &copyWithFileRange, &copyWithFileClone); err2 != nil {
return err2
}
copiedFiles[id] = dstPath
}
case os.ModeDir:
@ -192,16 +216,10 @@ func DirCopy(srcDir, dstDir string, copyMode Mode) error {
return err
}
if err := copyXattr(srcPath, dstPath, "security.capability"); err != nil {
return err
}
// We need to copy this attribute if it appears in an overlay upper layer, as
// this function is used to copy those. It is set by overlay if a directory
// is removed and then re-created and should not inherit anything from the
// same dir in the lower dir.
if err := copyXattr(srcPath, dstPath, "trusted.overlay.opaque"); err != nil {
return err
if copyXattrs {
if err := doCopyXattrs(srcPath, dstPath); err != nil {
return err
}
}
isSymlink := f.Mode()&os.ModeSymlink != 0
@ -216,7 +234,9 @@ func DirCopy(srcDir, dstDir string, copyMode Mode) error {
// system.Chtimes doesn't support a NOFOLLOW flag atm
// nolint: unconvert
if !isSymlink {
if f.IsDir() {
dirsToSetMtimes.PushFront(&dirMtimeInfo{dstPath: &dstPath, stat: stat})
} else if !isSymlink {
aTime := time.Unix(int64(stat.Atim.Sec), int64(stat.Atim.Nsec))
mTime := time.Unix(int64(stat.Mtim.Sec), int64(stat.Mtim.Nsec))
if err := system.Chtimes(dstPath, aTime, mTime); err != nil {
@ -230,5 +250,31 @@ func DirCopy(srcDir, dstDir string, copyMode Mode) error {
}
return nil
})
return err
if err != nil {
return err
}
for e := dirsToSetMtimes.Front(); e != nil; e = e.Next() {
mtimeInfo := e.Value.(*dirMtimeInfo)
ts := []syscall.Timespec{mtimeInfo.stat.Atim, mtimeInfo.stat.Mtim}
if err := system.LUtimesNano(*mtimeInfo.dstPath, ts); err != nil {
return err
}
}
return nil
}
func doCopyXattrs(srcPath, dstPath string) error {
if err := copyXattr(srcPath, dstPath, "security.capability"); err != nil {
return err
}
// We need to copy this attribute if it appears in an overlay upper layer, as
// this function is used to copy those. It is set by overlay if a directory
// is removed and then re-created and should not inherit anything from the
// same dir in the lower dir.
if err := copyXattr(srcPath, dstPath, "trusted.overlay.opaque"); err != nil {
return err
}
return nil
}

View File

@ -3,15 +3,20 @@
package copy
import (
"fmt"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"syscall"
"testing"
"time"
"github.com/docker/docker/pkg/parsers/kernel"
"github.com/docker/docker/pkg/system"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sys/unix"
)
func TestIsCopyFileRangeSyscallAvailable(t *testing.T) {
@ -45,6 +50,84 @@ func TestCopyWithoutRange(t *testing.T) {
doCopyTest(t, &copyWithFileRange, &copyWithFileClone)
}
func TestCopyDir(t *testing.T) {
srcDir, err := ioutil.TempDir("", "srcDir")
require.NoError(t, err)
populateSrcDir(t, srcDir, 3)
dstDir, err := ioutil.TempDir("", "testdst")
require.NoError(t, err)
defer os.RemoveAll(dstDir)
assert.NoError(t, DirCopy(srcDir, dstDir, Content, false))
require.NoError(t, filepath.Walk(srcDir, func(srcPath string, f os.FileInfo, err error) error {
if err != nil {
return err
}
// Rebase path
relPath, err := filepath.Rel(srcDir, srcPath)
require.NoError(t, err)
if relPath == "." {
return nil
}
dstPath := filepath.Join(dstDir, relPath)
require.NoError(t, err)
// If we add non-regular dirs and files to the test
// then we need to add more checks here.
dstFileInfo, err := os.Lstat(dstPath)
require.NoError(t, err)
srcFileSys := f.Sys().(*syscall.Stat_t)
dstFileSys := dstFileInfo.Sys().(*syscall.Stat_t)
t.Log(relPath)
if srcFileSys.Dev == dstFileSys.Dev {
assert.NotEqual(t, srcFileSys.Ino, dstFileSys.Ino)
}
// Todo: check size, and ctim is not equal
/// on filesystems that have granular ctimes
assert.Equal(t, srcFileSys.Mode, dstFileSys.Mode)
assert.Equal(t, srcFileSys.Uid, dstFileSys.Uid)
assert.Equal(t, srcFileSys.Gid, dstFileSys.Gid)
assert.Equal(t, srcFileSys.Mtim, dstFileSys.Mtim)
return nil
}))
}
func randomMode(baseMode int) os.FileMode {
for i := 0; i < 7; i++ {
baseMode = baseMode | (1&rand.Intn(2))<<uint(i)
}
return os.FileMode(baseMode)
}
func populateSrcDir(t *testing.T, srcDir string, remainingDepth int) {
if remainingDepth == 0 {
return
}
aTime := time.Unix(rand.Int63(), 0)
mTime := time.Unix(rand.Int63(), 0)
for i := 0; i < 10; i++ {
dirName := filepath.Join(srcDir, fmt.Sprintf("srcdir-%d", i))
// Owner all bits set
require.NoError(t, os.Mkdir(dirName, randomMode(0700)))
populateSrcDir(t, dirName, remainingDepth-1)
require.NoError(t, system.Chtimes(dirName, aTime, mTime))
}
for i := 0; i < 10; i++ {
fileName := filepath.Join(srcDir, fmt.Sprintf("srcfile-%d", i))
// Owner read bit set
require.NoError(t, ioutil.WriteFile(fileName, []byte{}, randomMode(0400)))
require.NoError(t, system.Chtimes(fileName, aTime, mTime))
}
}
func doCopyTest(t *testing.T, copyWithFileRange, copyWithFileClone *bool) {
dir, err := ioutil.TempDir("", "docker-copy-check")
require.NoError(t, err)
@ -65,3 +148,32 @@ func doCopyTest(t *testing.T, copyWithFileRange, copyWithFileClone *bool) {
require.NoError(t, err)
assert.Equal(t, buf, readBuf)
}
func TestCopyHardlink(t *testing.T) {
var srcFile1FileInfo, srcFile2FileInfo, dstFile1FileInfo, dstFile2FileInfo unix.Stat_t
srcDir, err := ioutil.TempDir("", "srcDir")
require.NoError(t, err)
defer os.RemoveAll(srcDir)
dstDir, err := ioutil.TempDir("", "dstDir")
require.NoError(t, err)
defer os.RemoveAll(dstDir)
srcFile1 := filepath.Join(srcDir, "file1")
srcFile2 := filepath.Join(srcDir, "file2")
dstFile1 := filepath.Join(dstDir, "file1")
dstFile2 := filepath.Join(dstDir, "file2")
require.NoError(t, ioutil.WriteFile(srcFile1, []byte{}, 0777))
require.NoError(t, os.Link(srcFile1, srcFile2))
assert.NoError(t, DirCopy(srcDir, dstDir, Content, false))
require.NoError(t, unix.Stat(srcFile1, &srcFile1FileInfo))
require.NoError(t, unix.Stat(srcFile2, &srcFile2FileInfo))
require.Equal(t, srcFile1FileInfo.Ino, srcFile2FileInfo.Ino)
require.NoError(t, unix.Stat(dstFile1, &dstFile1FileInfo))
require.NoError(t, unix.Stat(dstFile2, &dstFile2FileInfo))
assert.Equal(t, dstFile1FileInfo.Ino, dstFile2FileInfo.Ino)
}

View File

@ -327,7 +327,7 @@ func (d *Driver) Create(id, parent string, opts *graphdriver.CreateOpts) (retErr
return err
}
return copy.DirCopy(parentUpperDir, upperDir, copy.Content)
return copy.DirCopy(parentUpperDir, upperDir, copy.Content, true)
}
func (d *Driver) dir(id string) string {
@ -466,7 +466,7 @@ func (d *Driver) ApplyDiff(id string, parent string, diff io.Reader) (size int64
}
}()
if err = copy.DirCopy(parentRootDir, tmpRootDir, copy.Hardlink); err != nil {
if err = copy.DirCopy(parentRootDir, tmpRootDir, copy.Hardlink, true); err != nil {
return 0, err
}

View File

@ -0,0 +1,9 @@
// +build linux
package vfs
import "github.com/docker/docker/daemon/graphdriver/copy"
func dirCopy(srcDir, dstDir string) error {
return copy.DirCopy(srcDir, dstDir, copy.Content, false)
}

View File

@ -0,0 +1,9 @@
// +build !linux
package vfs
import "github.com/docker/docker/pkg/chrootarchive"
func dirCopy(srcDir, dstDir string) error {
return chrootarchive.NewArchiver(nil).CopyWithTar(srcDir, dstDir)
}

View File

@ -7,7 +7,6 @@ import (
"github.com/docker/docker/daemon/graphdriver"
"github.com/docker/docker/daemon/graphdriver/quota"
"github.com/docker/docker/pkg/chrootarchive"
"github.com/docker/docker/pkg/containerfs"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/system"
@ -16,8 +15,8 @@ import (
)
var (
// CopyWithTar defines the copy method to use.
CopyWithTar = chrootarchive.NewArchiver(nil).CopyWithTar
// CopyDir defines the copy method to use.
CopyDir = dirCopy
)
func init() {
@ -133,7 +132,7 @@ func (d *Driver) create(id, parent string, size uint64) error {
if err != nil {
return fmt.Errorf("%s: %s", parent, err)
}
return CopyWithTar(parentDir.Path(), dir)
return CopyDir(parentDir.Path(), dir)
}
func (d *Driver) dir(id string) string {

View File

@ -48,11 +48,12 @@ const (
defaultRetryWait = 1000
defaultMaxRetries = math.MaxInt32
addressKey = "fluentd-address"
bufferLimitKey = "fluentd-buffer-limit"
retryWaitKey = "fluentd-retry-wait"
maxRetriesKey = "fluentd-max-retries"
asyncConnectKey = "fluentd-async-connect"
addressKey = "fluentd-address"
bufferLimitKey = "fluentd-buffer-limit"
retryWaitKey = "fluentd-retry-wait"
maxRetriesKey = "fluentd-max-retries"
asyncConnectKey = "fluentd-async-connect"
subSecondPrecisionKey = "fluentd-sub-second-precision"
)
func init() {
@ -117,15 +118,23 @@ func New(info logger.Info) (logger.Logger, error) {
}
}
subSecondPrecision := false
if info.Config[subSecondPrecisionKey] != "" {
if subSecondPrecision, err = strconv.ParseBool(info.Config[subSecondPrecisionKey]); err != nil {
return nil, err
}
}
fluentConfig := fluent.Config{
FluentPort: loc.port,
FluentHost: loc.host,
FluentNetwork: loc.protocol,
FluentSocketPath: loc.path,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
AsyncConnect: asyncConnect,
FluentPort: loc.port,
FluentHost: loc.host,
FluentNetwork: loc.protocol,
FluentSocketPath: loc.path,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
AsyncConnect: asyncConnect,
SubSecondPrecision: subSecondPrecision,
}
logrus.WithField("container", info.ContainerID).WithField("config", fluentConfig).
@ -183,6 +192,7 @@ func ValidateLogOpt(cfg map[string]string) error {
case retryWaitKey:
case maxRetriesKey:
case asyncConnectKey:
case subSecondPrecisionKey:
// Accepted
default:
return fmt.Errorf("unknown log opt '%s' for fluentd log driver", key)

View File

@ -4,7 +4,7 @@ TOMLV_COMMIT=9baf8a8a9f2ed20a8e54160840c492f937eeaf9a
# When updating RUNC_COMMIT, also update runc in vendor.conf accordingly
RUNC_COMMIT=b2567b37d7b75eb4cf325b77297b140ea686ce8f
CONTAINERD_COMMIT=6bff39c643886dfa3d546e83a90a527b64ddeacf
CONTAINERD_COMMIT=cc969fb42f427a68a8cc6870ef47f17304b83962
TINI_COMMIT=949e6facb77383876aeff8a6944dde66b3089574
LIBNETWORK_COMMIT=7b2b1feb1de4817d522cc372af149ff48d25028e
VNDR_COMMIT=a6e196d8b4b0cbbdc29aebdb20c59ac6926bb384

View File

@ -23,7 +23,7 @@ import (
func init() {
graphdriver.ApplyUncompressedLayer = archive.UnpackLayer
defaultArchiver := archive.NewDefaultArchiver()
vfs.CopyWithTar = defaultArchiver.CopyWithTar
vfs.CopyDir = defaultArchiver.CopyWithTar
}
func newVFSGraphDriver(td string) (graphdriver.Driver, error) {

View File

@ -79,10 +79,10 @@ github.com/golang/protobuf 7a211bcf3bce0e3f1d74f9894916e6f116ae83b4
# gelf logging driver deps
github.com/Graylog2/go-gelf v2
github.com/fluent/fluent-logger-golang v1.2.1
github.com/fluent/fluent-logger-golang v1.3.0
# fluent-logger-golang deps
github.com/philhofer/fwd 98c11a7a6ec829d672b03833c3d69a7fae1ca972
github.com/tinylib/msgp 75ee40d2601edf122ef667e2a07d600d4c44490c
github.com/tinylib/msgp 3b556c64540842d4f82967be066a7f7fffc3adad
# fsnotify
github.com/fsnotify/fsnotify 4da3e2cfbabc9f751898f250b49f2439785783a1
@ -103,7 +103,7 @@ github.com/googleapis/gax-go da06d194a00e19ce00d9011a13931c3f6f6887c7
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
# containerd
github.com/containerd/containerd 6bff39c643886dfa3d546e83a90a527b64ddeacf
github.com/containerd/containerd cc969fb42f427a68a8cc6870ef47f17304b83962
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/containerd/continuity 35d55c5e8dd23b32037d56cf97174aff3efdfa83
github.com/containerd/cgroups 29da22c6171a4316169f9205ab6c49f59b5b852f
@ -111,7 +111,7 @@ github.com/containerd/console 84eeaae905fa414d03e07bcd6c8d3f19e7cf180e
github.com/containerd/go-runc ed1cbe1fc31f5fb2359d3a54b6330d1a097858b7
github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788
github.com/dmcgowan/go-tar go1.10
github.com/stevvooe/ttrpc 8c92e22ce0c492875ccaac3ab06143a77d8ed0c1
github.com/stevvooe/ttrpc 76e68349ad9ab4d03d764c713826d31216715e4f
# cluster
github.com/docker/swarmkit de950a7ed842c7b7e47e9451cde9bf8f96031894

View File

@ -15,6 +15,28 @@ containerd is designed to be embedded into a larger system, rather than being us
If you are interested in trying out containerd please see our [Getting Started Guide](docs/getting-started.md).
## Runtime Requirements
Runtime requirements for containerd are very minimal. Most interactions with
the Linux and Windows container feature sets are handled via [runc](https://github.com/opencontainers/runc) and/or
OS-specific libraries (e.g. [hcsshim](https://github.com/Microsoft/hcsshim) for Microsoft). There are specific features
used by containerd core code and snapshotters that will require a minimum kernel
version on Linux. With the understood caveat of distro kernel versioning, a
reasonable starting point for Linux is a minimum 4.x kernel version.
The overlay filesystem snapshotter, used by default, uses features that were
finalized in the 4.x kernel series. If you choose to use btrfs, there may
be more flexibility in kernel version (minimum recommended is 3.18), but will
require the btrfs kernel module and btrfs tools to be installed on your Linux
distribution.
To use Linux checkpoint and restore features, you will need `criu` installed on
your system. See more details in [Checkpoint and Restore](#checkpoint-and-restore).
The current required version of runc is always listed in [RUNC.md](/RUNC.md).
Build requirements for developers are listed in the [Developer Quick-Start](#developer-quick-start) section.
## Features
### Client
@ -93,7 +115,6 @@ image, err := client.Pull(context, "docker.io/library/redis:latest", containerd.
redis, err := client.NewContainer(context, "redis-master",
containerd.WithNewSnapshot("redis-rootfs", image),
containerd.WithNewSpec(oci.WithImageConfig(image)),
)
// use a readonly filesystem with multiple containers
@ -150,7 +171,7 @@ defer task.Delete(context)
err := task.Start(context)
```
## Developer Quick-Start
## Developer Quick Start
To build the daemon and `ctr` simple test client, the following build system dependencies are required:

View File

@ -162,11 +162,17 @@ func (c *container) Image(ctx context.Context) (Image, error) {
}, nil
}
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creation, opts ...NewTaskOpts) (Task, error) {
func (c *container) NewTask(ctx context.Context, ioCreate cio.Creation, opts ...NewTaskOpts) (_ Task, err error) {
i, err := ioCreate(c.id)
if err != nil {
return nil, err
}
defer func() {
if err != nil && i != nil {
i.Cancel()
i.Close()
}
}()
cfg := i.Config()
request := &tasks.CreateTaskRequest{
ContainerID: c.id,

View File

@ -24,7 +24,6 @@ import (
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
// WithCheckpoint allows a container to be created from the checkpointed information
@ -193,14 +192,17 @@ func remapRootFS(mounts []mount.Mount, uid, gid uint32) error {
if err != nil {
return err
}
defer os.RemoveAll(root)
defer os.Remove(root)
for _, m := range mounts {
if err := m.Mount(root); err != nil {
return err
}
}
defer unix.Unmount(root, 0)
return filepath.Walk(root, incrementFS(root, uid, gid))
err = filepath.Walk(root, incrementFS(root, uid, gid))
if uerr := mount.Unmount(root, 0); err == nil {
err = uerr
}
return err
}
func incrementFS(root string, uidInc, gidInc uint32) filepath.WalkFunc {

View File

@ -62,7 +62,7 @@ func NewStore(root string) (content.Store, error) {
// require labels and should use `NewStore`. `NewLabeledStore` is primarily
// useful for tests or standalone implementations.
func NewLabeledStore(root string, ls LabelStore) (content.Store, error) {
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil && !os.IsExist(err) {
if err := os.MkdirAll(filepath.Join(root, "ingest"), 0777); err != nil {
return nil, err
}

View File

@ -147,7 +147,7 @@ func (i *image) getLayers(ctx context.Context, platform string) ([]rootfs.Layer,
manifest, err := images.Manifest(ctx, cs, i.i.Target, platform)
if err != nil {
return nil, errors.Wrap(err, "")
return nil, err
}
diffIDs, err := i.i.RootFS(ctx, cs, platform)

View File

@ -187,13 +187,13 @@ func Manifest(ctx context.Context, provider content.Provider, image ocispec.Desc
return descs, nil
}
return nil, errors.Wrap(errdefs.ErrNotFound, "could not resolve manifest")
return nil, errors.Wrapf(errdefs.ErrNotFound, "unexpected media type %v for %v", desc.MediaType, desc.Digest)
}), image); err != nil {
return ocispec.Manifest{}, err
}
if m == nil {
return ocispec.Manifest{}, errors.Wrap(errdefs.ErrNotFound, "manifest not found")
return ocispec.Manifest{}, errors.Wrapf(errdefs.ErrNotFound, "manifest %v", image.Digest)
}
return *m, nil
@ -257,7 +257,7 @@ func Check(ctx context.Context, provider content.Provider, image ocispec.Descrip
return false, []ocispec.Descriptor{image}, nil, []ocispec.Descriptor{image}, nil
}
return false, nil, nil, nil, errors.Wrap(err, "image check failed")
return false, nil, nil, nil, errors.Wrapf(err, "failed to check image %v", image.Digest)
}
// TODO(stevvooe): It is possible that referenced conponents could have
@ -272,7 +272,7 @@ func Check(ctx context.Context, provider content.Provider, image ocispec.Descrip
missing = append(missing, desc)
continue
} else {
return false, nil, nil, nil, err
return false, nil, nil, nil, errors.Wrapf(err, "failed to check image %v", desc.Digest)
}
}
ra.Close()

View File

@ -75,10 +75,10 @@ type bundle struct {
type ShimOpt func(*bundle, string, *runctypes.RuncOptions) (shim.Config, client.Opt)
// ShimRemote is a ShimOpt for connecting and starting a remote shim
func ShimRemote(shimBinary, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) ShimOpt {
func ShimRemote(shimBinary, daemonAddress, cgroup string, debug bool, exitHandler func()) ShimOpt {
return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
return b.shimConfig(ns, ropts),
client.WithStart(shimBinary, b.shimAddress(ns), daemonAddress, cgroup, nonewns, debug, exitHandler)
client.WithStart(shimBinary, b.shimAddress(ns), daemonAddress, cgroup, debug, exitHandler)
}
}

View File

@ -22,6 +22,7 @@ import (
shim "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
@ -78,17 +79,6 @@ type Config struct {
NoShim bool `toml:"no_shim"`
// Debug enable debug on the shim
ShimDebug bool `toml:"shim_debug"`
// ShimNoMountNS prevents the runtime from putting shims into their own mount namespace.
//
// Putting the shim in its own mount namespace ensure that any mounts made
// by it in order to get the task rootfs ready will be undone regardless
// on how the shim dies.
//
// NOTE: This should only be used in kernel older than 3.18 to avoid shims
// from causing a DoS in their parent namespace due to having a copy of
// mounts previously there which would prevent unlink, rename and remove
// operations on those mountpoints.
ShimNoMountNS bool `toml:"shim_no_newns"`
}
// New returns a configured runtime
@ -226,8 +216,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts
}).Warn("failed to clen up after killed shim")
}
}
shimopt = ShimRemote(r.config.Shim, r.address, cgroup,
r.config.ShimNoMountNS, r.config.ShimDebug, exitHandler)
shimopt = ShimRemote(r.config.Shim, r.address, cgroup, r.config.ShimDebug, exitHandler)
}
s, err := bundle.NewShimClient(ctx, namespace, shimopt, ropts)
@ -486,7 +475,7 @@ func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string)
}); err != nil {
log.G(ctx).WithError(err).Warnf("delete runtime state %s", id)
}
if err := unix.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil {
if err := mount.Unmount(filepath.Join(bundle.path, "rootfs"), 0); err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"path": bundle.path,
"id": id,

View File

@ -34,7 +34,7 @@ var empty = &ptypes.Empty{}
type Opt func(context.Context, shim.Config) (shimapi.ShimService, io.Closer, error)
// WithStart executes a new shim process
func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug bool, exitHandler func()) Opt {
func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHandler func()) Opt {
return func(ctx context.Context, config shim.Config) (_ shimapi.ShimService, _ io.Closer, err error) {
socket, err := newSocket(address)
if err != nil {
@ -47,7 +47,7 @@ func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug boo
}
defer f.Close()
cmd := newCommand(binary, daemonAddress, nonewns, debug, config, f)
cmd := newCommand(binary, daemonAddress, debug, config, f)
ec, err := reaper.Default.Start(cmd)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to start shim")
@ -87,7 +87,7 @@ func WithStart(binary, address, daemonAddress, cgroup string, nonewns, debug boo
}
}
func newCommand(binary, daemonAddress string, nonewns, debug bool, config shim.Config, socket *os.File) *exec.Cmd {
func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File) *exec.Cmd {
selfExe, err := os.Executable()
if err != nil {
panic(err)
@ -117,7 +117,7 @@ func newCommand(binary, daemonAddress string, nonewns, debug bool, config shim.C
// make sure the shim can be re-parented to system init
// and is cloned in a new mount namespace because the overlay/filesystems
// will be mounted by the shim
cmd.SysProcAttr = getSysProcAttr(nonewns)
cmd.SysProcAttr = getSysProcAttr()
cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
if debug {
cmd.Stdout = os.Stdout

View File

@ -10,14 +10,10 @@ import (
"github.com/pkg/errors"
)
func getSysProcAttr(nonewns bool) *syscall.SysProcAttr {
attr := syscall.SysProcAttr{
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}
if !nonewns {
attr.Cloneflags = syscall.CLONE_NEWNS
}
return &attr
}
func setCgroup(cgroupPath string, cmd *exec.Cmd) error {

View File

@ -7,7 +7,7 @@ import (
"syscall"
)
func getSysProcAttr(nonewns bool) *syscall.SysProcAttr {
func getSysProcAttr() *syscall.SysProcAttr {
return &syscall.SysProcAttr{
Setpgid: true,
}

View File

@ -7,8 +7,8 @@ import (
"path/filepath"
shimapi "github.com/containerd/containerd/linux/shim/v1"
"github.com/containerd/containerd/mount"
ptypes "github.com/gogo/protobuf/types"
"golang.org/x/sys/unix"
)
// NewLocal returns a shim client implementation for issue commands to a shim
@ -32,7 +32,7 @@ func (c *local) Start(ctx context.Context, in *shimapi.StartRequest) (*shimapi.S
func (c *local) Delete(ctx context.Context, in *ptypes.Empty) (*shimapi.DeleteResponse, error) {
// make sure we unmount the containers rootfs for this local
if err := unix.Unmount(filepath.Join(c.s.config.Path, "rootfs"), 0); err != nil {
if err := mount.Unmount(filepath.Join(c.s.config.Path, "rootfs"), 0); err != nil {
return nil, err
}
return c.s.Delete(ctx, in)

View File

@ -37,12 +37,12 @@ func (s *containerStore) Get(ctx context.Context, id string) (containers.Contain
bkt := getContainerBucket(s.tx, namespace, id)
if bkt == nil {
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "bucket name %q:%q", namespace, id)
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace)
}
container := containers.Container{ID: id}
if err := readContainer(&container, bkt); err != nil {
return containers.Container{}, errors.Wrapf(err, "failed to read container %v", id)
return containers.Container{}, errors.Wrapf(err, "failed to read container %q", id)
}
return container, nil
@ -61,7 +61,7 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C
bkt := getContainersBucket(s.tx, namespace)
if bkt == nil {
return nil, nil
return nil, nil // empty store
}
var m []containers.Container
@ -73,7 +73,7 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C
container := containers.Container{ID: string(k)}
if err := readContainer(&container, cbkt); err != nil {
return errors.Wrap(err, "failed to read container")
return errors.Wrapf(err, "failed to read container %q", string(k))
}
if filter.Match(adaptContainer(container)) {
@ -113,7 +113,7 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai
container.CreatedAt = time.Now().UTC()
container.UpdatedAt = container.CreatedAt
if err := writeContainer(cbkt, &container); err != nil {
return containers.Container{}, errors.Wrap(err, "failed to write container")
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
}
return container, nil
@ -131,7 +131,7 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
bkt := getContainersBucket(s.tx, namespace)
if bkt == nil {
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace)
}
cbkt := bkt.Bucket([]byte(container.ID))
@ -141,7 +141,7 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
var updated containers.Container
if err := readContainer(&updated, cbkt); err != nil {
return updated, errors.Wrapf(err, "failed to read container from bucket")
return updated, errors.Wrapf(err, "failed to read container %q", container.ID)
}
createdat := updated.CreatedAt
updated.ID = container.ID
@ -211,7 +211,7 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
updated.CreatedAt = createdat
updated.UpdatedAt = time.Now().UTC()
if err := writeContainer(cbkt, &updated); err != nil {
return containers.Container{}, errors.Wrap(err, "failed to write container")
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
}
return updated, nil
@ -225,7 +225,7 @@ func (s *containerStore) Delete(ctx context.Context, id string) error {
bkt := getContainersBucket(s.tx, namespace)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %v, bucket not present", id)
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace)
}
if err := bkt.DeleteBucket([]byte(id)); err == bolt.ErrBucketNotFound {
@ -236,7 +236,7 @@ func (s *containerStore) Delete(ctx context.Context, id string) error {
func validateContainer(container *containers.Container) error {
if err := identifiers.Validate(container.ID); err != nil {
return errors.Wrapf(err, "container.ID validation error")
return errors.Wrap(err, "container.ID")
}
for k := range container.Extensions {

View File

@ -138,7 +138,7 @@ func (m *DB) Init(ctx context.Context) error {
if err := m.migrate(tx); err != nil {
return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version)
}
log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version)
log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("finished database migration to %s.%d", m.schema, m.version)
}
}
@ -269,7 +269,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) {
stats.SnapshotD = map[string]time.Duration{}
wg.Add(len(m.dirtySS))
for snapshotterName := range m.dirtySS {
log.G(ctx).WithField("snapshotter", snapshotterName).Debug("scheduling snapshotter cleanup")
log.G(ctx).WithField("snapshotter", snapshotterName).Debug("schedule snapshotter cleanup")
go func(snapshotterName string) {
st1 := time.Now()
m.cleanupSnapshotter(snapshotterName)
@ -286,7 +286,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (stats GCStats, err error) {
if m.dirtyCS {
wg.Add(1)
log.G(ctx).Debug("scheduling content cleanup")
log.G(ctx).Debug("schedule content cleanup")
go func() {
ct1 := time.Now()
m.cleanupContent()

View File

@ -301,7 +301,7 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
cbkt = cbkt.Bucket(bucketKeyObjectBlob)
}
if cbkt != nil {
log.G(ctx).WithField("key", node.Key).Debug("delete content")
log.G(ctx).WithField("key", node.Key).Debug("remove content")
return cbkt.DeleteBucket([]byte(node.Key))
}
case ResourceSnapshot:
@ -313,7 +313,7 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
}
ssbkt := sbkt.Bucket([]byte(parts[0]))
if ssbkt != nil {
log.G(ctx).WithField("key", parts[1]).WithField("snapshotter", parts[0]).Debug("delete snapshot")
log.G(ctx).WithField("key", parts[1]).WithField("snapshotter", parts[0]).Debug("remove snapshot")
return ssbkt.DeleteBucket([]byte(parts[1]))
}
}

View File

@ -359,7 +359,8 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
return update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
return errors.Wrapf(errdefs.ErrNotFound,
"can not find snapshotter %q", s.name)
}
bbkt, err := bkt.CreateBucket([]byte(name))
@ -722,7 +723,7 @@ func (s *snapshotter) pruneBranch(ctx context.Context, node *treeNode) error {
if !errdefs.IsFailedPrecondition(err) {
return err
}
logger.WithError(err).WithField("key", node.info.Name).Warnf("snapshot removal failed")
logger.WithError(err).WithField("key", node.info.Name).Warnf("failed to remove snapshot")
} else {
logger.WithField("key", node.info.Name).Debug("removed snapshot")
}

View File

@ -2,7 +2,9 @@ package mount
import (
"strings"
"time"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
@ -42,8 +44,27 @@ func (m *Mount) Mount(target string) error {
}
// Unmount the provided mount path with the flags
func Unmount(mount string, flags int) error {
return unix.Unmount(mount, flags)
func Unmount(target string, flags int) error {
if err := unmount(target, flags); err != nil && err != unix.EINVAL {
return err
}
return nil
}
func unmount(target string, flags int) error {
for i := 0; i < 50; i++ {
if err := unix.Unmount(target, flags); err != nil {
switch err {
case unix.EBUSY:
time.Sleep(50 * time.Millisecond)
continue
default:
return err
}
}
return nil
}
return errors.Wrapf(unix.EBUSY, "failed to unmount target %s", target)
}
// UnmountAll repeatedly unmounts the given mount point until there
@ -51,7 +72,7 @@ func Unmount(mount string, flags int) error {
// useful for undoing a stack of mounts on the same mount point.
func UnmountAll(mount string, flags int) error {
for {
if err := Unmount(mount, flags); err != nil {
if err := unmount(mount, flags); err != nil {
// EINVAL is returned if the target is not a
// mount point, indicating that we are
// done. It can also indicate a few other

View File

@ -12,12 +12,11 @@ import (
"strconv"
"strings"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/fs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/runc/libcontainer/user"
@ -101,7 +100,7 @@ func WithImageConfig(image Image) SpecOpts {
parts := strings.Split(config.User, ":")
switch len(parts) {
case 1:
v, err := strconv.ParseUint(parts[0], 0, 10)
v, err := strconv.Atoi(parts[0])
if err != nil {
// if we cannot parse as a uint they try to see if it is a username
if err := WithUsername(config.User)(ctx, client, c, s); err != nil {
@ -113,13 +112,13 @@ func WithImageConfig(image Image) SpecOpts {
return err
}
case 2:
v, err := strconv.ParseUint(parts[0], 0, 10)
v, err := strconv.Atoi(parts[0])
if err != nil {
return err
return errors.Wrapf(err, "parse uid %s", parts[0])
}
uid := uint32(v)
if v, err = strconv.ParseUint(parts[1], 0, 10); err != nil {
return err
if v, err = strconv.Atoi(parts[1]); err != nil {
return errors.Wrapf(err, "parse gid %s", parts[1])
}
gid := uint32(v)
s.Process.User.UID, s.Process.User.GID = uid, gid
@ -260,7 +259,7 @@ func WithUIDGID(uid, gid uint32) SpecOpts {
// or uid is not found in /etc/passwd, it sets gid to be the same with
// uid, and not returns error.
func WithUserID(uid uint32) SpecOpts {
return func(ctx context.Context, client Client, c *containers.Container, s *specs.Spec) error {
return func(ctx context.Context, client Client, c *containers.Container, s *specs.Spec) (err error) {
if c.Snapshotter == "" {
return errors.Errorf("no snapshotter set for container")
}
@ -276,13 +275,19 @@ func WithUserID(uid uint32) SpecOpts {
if err != nil {
return err
}
defer os.RemoveAll(root)
defer os.Remove(root)
for _, m := range mounts {
if err := m.Mount(root); err != nil {
return err
}
}
defer unix.Unmount(root, 0)
defer func() {
if uerr := mount.Unmount(root, 0); uerr != nil {
if err == nil {
err = uerr
}
}
}()
ppath, err := fs.RootPath(root, "/etc/passwd")
if err != nil {
return err
@ -317,7 +322,7 @@ func WithUserID(uid uint32) SpecOpts {
// does not exist, or the username is not found in /etc/passwd,
// it returns error.
func WithUsername(username string) SpecOpts {
return func(ctx context.Context, client Client, c *containers.Container, s *specs.Spec) error {
return func(ctx context.Context, client Client, c *containers.Container, s *specs.Spec) (err error) {
if c.Snapshotter == "" {
return errors.Errorf("no snapshotter set for container")
}
@ -333,13 +338,19 @@ func WithUsername(username string) SpecOpts {
if err != nil {
return err
}
defer os.RemoveAll(root)
defer os.Remove(root)
for _, m := range mounts {
if err := m.Mount(root); err != nil {
return err
}
}
defer unix.Unmount(root, 0)
defer func() {
if uerr := mount.Unmount(root, 0); uerr != nil {
if err == nil {
err = uerr
}
}
}()
ppath, err := fs.RootPath(root, "/etc/passwd")
if err != nil {
return err

View File

@ -60,3 +60,11 @@ func WithTTY(width, height int) SpecOpts {
return nil
}
}
// WithUsername sets the username on the process
func WithUsername(username string) SpecOpts {
return func(ctx context.Context, client Client, c *containers.Container, s *specs.Spec) error {
s.Process.User.Username = username
return nil
}
}

View File

@ -55,10 +55,10 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
_, err := sn.Stat(ctx, chainID.String())
if err == nil {
log.G(ctx).Debugf("Extraction not needed, layer snapshot exists")
log.G(ctx).Debugf("Extraction not needed, layer snapshot %s exists", chainID)
return false, nil
} else if !errdefs.IsNotFound(err) {
return false, errors.Wrap(err, "failed to stat snapshot")
return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID)
}
key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID)
@ -67,7 +67,7 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
mounts, err := sn.Prepare(ctx, key, parent.String(), opts...)
if err != nil {
//TODO: If is snapshot exists error, retry
return false, errors.Wrap(err, "failed to prepare extraction layer")
return false, errors.Wrapf(err, "failed to prepare extraction snapshot %q", key)
}
defer func() {
if err != nil {
@ -89,7 +89,7 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil {
if !errdefs.IsAlreadyExists(err) {
return false, errors.Wrapf(err, "failed to commit snapshot %s", parent)
return false, errors.Wrapf(err, "failed to commit snapshot %s", key)
}
// Destination already exists, cleanup key and return without error

View File

@ -49,6 +49,8 @@ func (l *TaskList) Get(ctx context.Context, id string) (Task, error) {
// GetAll tasks under a namespace
func (l *TaskList) GetAll(ctx context.Context) ([]Task, error) {
l.mu.Lock()
defer l.mu.Unlock()
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err

View File

@ -277,7 +277,7 @@ func (t *task) Delete(ctx context.Context, opts ...ProcessDeleteOpts) (*ExitStat
return &ExitStatus{code: r.ExitStatus, exitedAt: r.ExitedAt}, nil
}
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creation) (Process, error) {
func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreate cio.Creation) (_ Process, err error) {
if id == "" {
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, "exec id must not be empty")
}
@ -285,6 +285,12 @@ func (t *task) Exec(ctx context.Context, id string, spec *specs.Process, ioCreat
if err != nil {
return nil, err
}
defer func() {
if err != nil && i != nil {
i.Cancel()
i.Close()
}
}()
any, err := typeurl.MarshalAny(spec)
if err != nil {
return nil, err

View File

@ -41,4 +41,4 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
github.com/dmcgowan/go-tar go1.10
github.com/stevvooe/ttrpc 8c92e22ce0c492875ccaac3ab06143a77d8ed0c1
github.com/stevvooe/ttrpc 76e68349ad9ab4d03d764c713826d31216715e4f

View File

@ -21,7 +21,7 @@ import "github.com/fluent/fluent-logger-golang/fluent"
GoDoc: http://godoc.org/github.com/fluent/fluent-logger-golang/fluent
##Example
## Example
```go
package main
@ -44,14 +44,14 @@ func main() {
"hoge": "hoge",
}
error := logger.Post(tag, data)
// error := logger.Post(tag, time.Time.Now(), data)
// error := logger.PostWithTime(tag, time.Now(), data)
if error != nil {
panic(error)
}
}
```
`data` must be a value like `map[string]literal`, `map[string]interface{}` or `struct`. Logger refers tags `msg` or `codec` of each fields of structs.
`data` must be a value like `map[string]literal`, `map[string]interface{}`, `struct` or [`msgp.Marshaler`](http://godoc.org/github.com/tinylib/msgp/msgp#Marshaler). Logger refers tags `msg` or `codec` of each fields of structs.
## Setting config values
@ -59,6 +59,11 @@ func main() {
f := fluent.New(fluent.Config{FluentPort: 80, FluentHost: "example.com"})
```
### WriteTimeout
Sets the timeout for Write call of logger.Post.
Since the default is zero value, Write will not time out.
## Tests
```
go test

View File

@ -4,13 +4,14 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math"
"net"
"reflect"
"strconv"
"sync"
"time"
"github.com/tinylib/msgp/msgp"
)
const (
@ -19,10 +20,14 @@ const (
defaultSocketPath = ""
defaultPort = 24224
defaultTimeout = 3 * time.Second
defaultWriteTimeout = time.Duration(0) // Write() will not time out
defaultBufferLimit = 8 * 1024 * 1024
defaultRetryWait = 500
defaultMaxRetry = 13
defaultReconnectWaitIncreRate = 1.5
// Default sub-second precision value to false since it is only compatible
// with fluentd versions v0.14 and above.
defaultSubSecondPrecision = false
)
type Config struct {
@ -31,12 +36,17 @@ type Config struct {
FluentNetwork string `json:"fluent_network"`
FluentSocketPath string `json:"fluent_socket_path"`
Timeout time.Duration `json:"timeout"`
WriteTimeout time.Duration `json:"write_timeout"`
BufferLimit int `json:"buffer_limit"`
RetryWait int `json:"retry_wait"`
MaxRetry int `json:"max_retry"`
TagPrefix string `json:"tag_prefix"`
AsyncConnect bool `json:"async_connect"`
MarshalAsJSON bool `json:"marshal_as_json"`
// Sub-second precision timestamps are only possible for those using fluentd
// v0.14+ and serializing their messages with msgpack.
SubSecondPrecision bool `json:"sub_second_precision"`
}
type Fluent struct {
@ -46,7 +56,7 @@ type Fluent struct {
pending []byte
muconn sync.Mutex
conn io.WriteCloser
conn net.Conn
reconnecting bool
}
@ -67,6 +77,9 @@ func New(config Config) (f *Fluent, err error) {
if config.Timeout == 0 {
config.Timeout = defaultTimeout
}
if config.WriteTimeout == 0 {
config.WriteTimeout = defaultWriteTimeout
}
if config.BufferLimit == 0 {
config.BufferLimit = defaultBufferLimit
}
@ -90,9 +103,6 @@ func New(config Config) (f *Fluent, err error) {
//
// Examples:
//
// // send string
// f.Post("tag_name", "data")
//
// // send map[string]
// mapStringData := map[string]string{
// "foo": "bar",
@ -124,6 +134,10 @@ func (f *Fluent) PostWithTime(tag string, tm time.Time, message interface{}) err
tag = f.TagPrefix + "." + tag
}
if m, ok := message.(msgp.Marshaler); ok {
return f.EncodeAndPostData(tag, tm, m)
}
msg := reflect.ValueOf(message)
msgtype := msg.Type()
@ -203,6 +217,9 @@ func (f *Fluent) EncodeData(tag string, tm time.Time, message interface{}) (data
msg := Message{Tag: tag, Time: timeUnix, Record: message}
chunk := &MessageChunk{message: msg}
data, err = json.Marshal(chunk)
} else if f.Config.SubSecondPrecision {
msg := &MessageExt{Tag: tag, Time: EventTime(tm), Record: message}
data, err = msg.MarshalMsg(nil)
} else {
msg := &Message{Tag: tag, Time: timeUnix, Record: message}
data, err = msg.MarshalMsg(nil)
@ -297,6 +314,12 @@ func (f *Fluent) send() error {
var err error
if len(f.pending) > 0 {
t := f.Config.WriteTimeout
if time.Duration(0) < t {
f.conn.SetWriteDeadline(time.Now().Add(t))
} else {
f.conn.SetWriteDeadline(time.Time{})
}
_, err = f.conn.Write(f.pending)
if err != nil {
f.conn.Close()

View File

@ -2,6 +2,12 @@
package fluent
import (
"time"
"github.com/tinylib/msgp/msgp"
)
//msgp:tuple Entry
type Entry struct {
Time int64 `msg:"time"`
@ -22,3 +28,69 @@ type Message struct {
Record interface{} `msg:"record"`
Option interface{} `msg:"option"`
}
//msgp:tuple MessageExt
type MessageExt struct {
Tag string `msg:"tag"`
Time EventTime `msg:"time,extension"`
Record interface{} `msg:"record"`
Option interface{} `msg:"option"`
}
// EventTime is an extension to the serialized time value. It builds in support
// for sub-second (nanosecond) precision in serialized timestamps.
//
// You can find the full specification for the msgpack message payload here:
// https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.
//
// You can find more information on msgpack extension types here:
// https://github.com/tinylib/msgp/wiki/Using-Extensions.
type EventTime time.Time
const (
extensionType = 0
length = 8
)
func init() {
msgp.RegisterExtension(extensionType, func() msgp.Extension { return new(EventTime) })
}
func (t *EventTime) ExtensionType() int8 { return extensionType }
func (t *EventTime) Len() int { return length }
func (t *EventTime) MarshalBinaryTo(b []byte) error {
// Unwrap to Golang time
goTime := time.Time(*t)
// There's no support for timezones in fluentd's protocol for EventTime.
// Convert to UTC.
utc := goTime.UTC()
// Warning! Converting seconds to an int32 is a lossy operation. This code
// will hit the "Year 2038" problem.
sec := int32(utc.Unix())
nsec := utc.Nanosecond()
// Fill the buffer with 4 bytes for the second component of the timestamp.
b[0] = byte(sec >> 24)
b[1] = byte(sec >> 16)
b[2] = byte(sec >> 8)
b[3] = byte(sec)
// Fill the buffer with 4 bytes for the nanosecond component of the
// timestamp.
b[4] = byte(nsec >> 24)
b[5] = byte(nsec >> 16)
b[6] = byte(nsec >> 8)
b[7] = byte(nsec)
return nil
}
// UnmarshalBinary is not implemented since decoding messages is not supported
// by this library.
func (t *EventTime) UnmarshalBinary(b []byte) error {
return nil
}

View File

@ -10,13 +10,13 @@ import (
// DecodeMsg implements msgp.Decodable
func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) {
var ssz uint32
ssz, err = dc.ReadArrayHeader()
var zxvk uint32
zxvk, err = dc.ReadArrayHeader()
if err != nil {
return
}
if ssz != 2 {
err = msgp.ArrayError{Wanted: 2, Got: ssz}
if zxvk != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zxvk}
return
}
z.Time, err = dc.ReadInt64()
@ -32,9 +32,10 @@ func (z *Entry) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z Entry) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteArrayHeader(2)
// array header, size 2
err = en.Append(0x92)
if err != nil {
return
return err
}
err = en.WriteInt64(z.Time)
if err != nil {
@ -50,7 +51,8 @@ func (z Entry) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z Entry) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendArrayHeader(o, 2)
// array header, size 2
o = append(o, 0x92)
o = msgp.AppendInt64(o, z.Time)
o, err = msgp.AppendIntf(o, z.Record)
if err != nil {
@ -61,16 +63,14 @@ func (z Entry) MarshalMsg(b []byte) (o []byte, err error) {
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Entry) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var ssz uint32
ssz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if ssz != 2 {
err = msgp.ArrayError{Wanted: 2, Got: ssz}
return
}
var zbzg uint32
zbzg, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if zbzg != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zbzg}
return
}
z.Time, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
@ -84,51 +84,52 @@ func (z *Entry) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z Entry) Msgsize() (s int) {
s = msgp.ArrayHeaderSize + msgp.Int64Size + msgp.GuessSize(z.Record)
s = 1 + msgp.Int64Size + msgp.GuessSize(z.Record)
return
}
// DecodeMsg implements msgp.Decodable
func (z *Forward) DecodeMsg(dc *msgp.Reader) (err error) {
var ssz uint32
ssz, err = dc.ReadArrayHeader()
var zcmr uint32
zcmr, err = dc.ReadArrayHeader()
if err != nil {
return
}
if ssz != 3 {
err = msgp.ArrayError{Wanted: 3, Got: ssz}
if zcmr != 3 {
err = msgp.ArrayError{Wanted: 3, Got: zcmr}
return
}
z.Tag, err = dc.ReadString()
if err != nil {
return
}
var xsz uint32
xsz, err = dc.ReadArrayHeader()
var zajw uint32
zajw, err = dc.ReadArrayHeader()
if err != nil {
return
}
if cap(z.Entries) >= int(xsz) {
z.Entries = z.Entries[:xsz]
if cap(z.Entries) >= int(zajw) {
z.Entries = (z.Entries)[:zajw]
} else {
z.Entries = make([]Entry, xsz)
z.Entries = make([]Entry, zajw)
}
for xvk := range z.Entries {
var ssz uint32
ssz, err = dc.ReadArrayHeader()
for zbai := range z.Entries {
var zwht uint32
zwht, err = dc.ReadArrayHeader()
if err != nil {
return
}
if ssz != 2 {
err = msgp.ArrayError{Wanted: 2, Got: ssz}
if zwht != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zwht}
return
}
z.Entries[xvk].Time, err = dc.ReadInt64()
z.Entries[zbai].Time, err = dc.ReadInt64()
if err != nil {
return
}
z.Entries[xvk].Record, err = dc.ReadIntf()
z.Entries[zbai].Record, err = dc.ReadIntf()
if err != nil {
return
}
@ -142,9 +143,10 @@ func (z *Forward) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *Forward) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteArrayHeader(3)
// array header, size 3
err = en.Append(0x93)
if err != nil {
return
return err
}
err = en.WriteString(z.Tag)
if err != nil {
@ -154,16 +156,17 @@ func (z *Forward) EncodeMsg(en *msgp.Writer) (err error) {
if err != nil {
return
}
for xvk := range z.Entries {
err = en.WriteArrayHeader(2)
for zbai := range z.Entries {
// array header, size 2
err = en.Append(0x92)
if err != nil {
return err
}
err = en.WriteInt64(z.Entries[zbai].Time)
if err != nil {
return
}
err = en.WriteInt64(z.Entries[xvk].Time)
if err != nil {
return
}
err = en.WriteIntf(z.Entries[xvk].Record)
err = en.WriteIntf(z.Entries[zbai].Record)
if err != nil {
return
}
@ -178,13 +181,15 @@ func (z *Forward) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *Forward) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendArrayHeader(o, 3)
// array header, size 3
o = append(o, 0x93)
o = msgp.AppendString(o, z.Tag)
o = msgp.AppendArrayHeader(o, uint32(len(z.Entries)))
for xvk := range z.Entries {
o = msgp.AppendArrayHeader(o, 2)
o = msgp.AppendInt64(o, z.Entries[xvk].Time)
o, err = msgp.AppendIntf(o, z.Entries[xvk].Record)
for zbai := range z.Entries {
// array header, size 2
o = append(o, 0x92)
o = msgp.AppendInt64(o, z.Entries[zbai].Time)
o, err = msgp.AppendIntf(o, z.Entries[zbai].Record)
if err != nil {
return
}
@ -198,48 +203,44 @@ func (z *Forward) MarshalMsg(b []byte) (o []byte, err error) {
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var ssz uint32
ssz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if ssz != 3 {
err = msgp.ArrayError{Wanted: 3, Got: ssz}
return
}
var zhct uint32
zhct, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if zhct != 3 {
err = msgp.ArrayError{Wanted: 3, Got: zhct}
return
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
return
}
var xsz uint32
xsz, bts, err = msgp.ReadArrayHeaderBytes(bts)
var zcua uint32
zcua, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if cap(z.Entries) >= int(xsz) {
z.Entries = z.Entries[:xsz]
if cap(z.Entries) >= int(zcua) {
z.Entries = (z.Entries)[:zcua]
} else {
z.Entries = make([]Entry, xsz)
z.Entries = make([]Entry, zcua)
}
for xvk := range z.Entries {
{
var ssz uint32
ssz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if ssz != 2 {
err = msgp.ArrayError{Wanted: 2, Got: ssz}
return
}
}
z.Entries[xvk].Time, bts, err = msgp.ReadInt64Bytes(bts)
for zbai := range z.Entries {
var zxhx uint32
zxhx, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
z.Entries[xvk].Record, bts, err = msgp.ReadIntfBytes(bts)
if zxhx != 2 {
err = msgp.ArrayError{Wanted: 2, Got: zxhx}
return
}
z.Entries[zbai].Time, bts, err = msgp.ReadInt64Bytes(bts)
if err != nil {
return
}
z.Entries[zbai].Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
return
}
@ -252,10 +253,11 @@ func (z *Forward) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Forward) Msgsize() (s int) {
s = msgp.ArrayHeaderSize + msgp.StringPrefixSize + len(z.Tag) + msgp.ArrayHeaderSize
for xvk := range z.Entries {
s += msgp.ArrayHeaderSize + msgp.Int64Size + msgp.GuessSize(z.Entries[xvk].Record)
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ArrayHeaderSize
for zbai := range z.Entries {
s += 1 + msgp.Int64Size + msgp.GuessSize(z.Entries[zbai].Record)
}
s += msgp.GuessSize(z.Option)
return
@ -263,13 +265,13 @@ func (z *Forward) Msgsize() (s int) {
// DecodeMsg implements msgp.Decodable
func (z *Message) DecodeMsg(dc *msgp.Reader) (err error) {
var ssz uint32
ssz, err = dc.ReadArrayHeader()
var zlqf uint32
zlqf, err = dc.ReadArrayHeader()
if err != nil {
return
}
if ssz != 4 {
err = msgp.ArrayError{Wanted: 4, Got: ssz}
if zlqf != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zlqf}
return
}
z.Tag, err = dc.ReadString()
@ -293,9 +295,10 @@ func (z *Message) DecodeMsg(dc *msgp.Reader) (err error) {
// EncodeMsg implements msgp.Encodable
func (z *Message) EncodeMsg(en *msgp.Writer) (err error) {
err = en.WriteArrayHeader(4)
// array header, size 4
err = en.Append(0x94)
if err != nil {
return
return err
}
err = en.WriteString(z.Tag)
if err != nil {
@ -319,7 +322,8 @@ func (z *Message) EncodeMsg(en *msgp.Writer) (err error) {
// MarshalMsg implements msgp.Marshaler
func (z *Message) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
o = msgp.AppendArrayHeader(o, 4)
// array header, size 4
o = append(o, 0x94)
o = msgp.AppendString(o, z.Tag)
o = msgp.AppendInt64(o, z.Time)
o, err = msgp.AppendIntf(o, z.Record)
@ -335,16 +339,14 @@ func (z *Message) MarshalMsg(b []byte) (o []byte, err error) {
// UnmarshalMsg implements msgp.Unmarshaler
func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error) {
{
var ssz uint32
ssz, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if ssz != 4 {
err = msgp.ArrayError{Wanted: 4, Got: ssz}
return
}
var zdaf uint32
zdaf, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if zdaf != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zdaf}
return
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
@ -366,7 +368,122 @@ func (z *Message) UnmarshalMsg(bts []byte) (o []byte, err error) {
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *Message) Msgsize() (s int) {
s = msgp.ArrayHeaderSize + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option)
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.Int64Size + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option)
return
}
// DecodeMsg implements msgp.Decodable
func (z *MessageExt) DecodeMsg(dc *msgp.Reader) (err error) {
var zpks uint32
zpks, err = dc.ReadArrayHeader()
if err != nil {
return
}
if zpks != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zpks}
return
}
z.Tag, err = dc.ReadString()
if err != nil {
return
}
err = dc.ReadExtension(&z.Time)
if err != nil {
return
}
z.Record, err = dc.ReadIntf()
if err != nil {
return
}
z.Option, err = dc.ReadIntf()
if err != nil {
return
}
return
}
// EncodeMsg implements msgp.Encodable
func (z *MessageExt) EncodeMsg(en *msgp.Writer) (err error) {
// array header, size 4
err = en.Append(0x94)
if err != nil {
return err
}
err = en.WriteString(z.Tag)
if err != nil {
return
}
err = en.WriteExtension(&z.Time)
if err != nil {
return
}
err = en.WriteIntf(z.Record)
if err != nil {
return
}
err = en.WriteIntf(z.Option)
if err != nil {
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z *MessageExt) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// array header, size 4
o = append(o, 0x94)
o = msgp.AppendString(o, z.Tag)
o, err = msgp.AppendExtension(o, &z.Time)
if err != nil {
return
}
o, err = msgp.AppendIntf(o, z.Record)
if err != nil {
return
}
o, err = msgp.AppendIntf(o, z.Option)
if err != nil {
return
}
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *MessageExt) UnmarshalMsg(bts []byte) (o []byte, err error) {
var zjfb uint32
zjfb, bts, err = msgp.ReadArrayHeaderBytes(bts)
if err != nil {
return
}
if zjfb != 4 {
err = msgp.ArrayError{Wanted: 4, Got: zjfb}
return
}
z.Tag, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
return
}
bts, err = msgp.ReadExtensionBytes(bts, &z.Time)
if err != nil {
return
}
z.Record, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
return
}
z.Option, bts, err = msgp.ReadIntfBytes(bts)
if err != nil {
return
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z *MessageExt) Msgsize() (s int) {
s = 1 + msgp.StringPrefixSize + len(z.Tag) + msgp.ExtensionPrefixSize + z.Time.Len() + msgp.GuessSize(z.Record) + msgp.GuessSize(z.Option)
return
}

View File

@ -0,0 +1,7 @@
package fluent
//go:generate msgp
type TestMessage struct {
Foo string `msg:"foo" json:"foo,omitempty"`
Hoge string `msg:"hoge" json:"hoge,omitempty"`
}

View File

@ -0,0 +1,125 @@
package fluent
// NOTE: THIS FILE WAS PRODUCED BY THE
// MSGP CODE GENERATION TOOL (github.com/tinylib/msgp)
// DO NOT EDIT
import (
"github.com/tinylib/msgp/msgp"
)
// DecodeMsg implements msgp.Decodable
func (z *TestMessage) DecodeMsg(dc *msgp.Reader) (err error) {
var field []byte
_ = field
var zxvk uint32
zxvk, err = dc.ReadMapHeader()
if err != nil {
return
}
for zxvk > 0 {
zxvk--
field, err = dc.ReadMapKeyPtr()
if err != nil {
return
}
switch msgp.UnsafeString(field) {
case "foo":
z.Foo, err = dc.ReadString()
if err != nil {
return
}
case "hoge":
z.Hoge, err = dc.ReadString()
if err != nil {
return
}
default:
err = dc.Skip()
if err != nil {
return
}
}
}
return
}
// EncodeMsg implements msgp.Encodable
func (z TestMessage) EncodeMsg(en *msgp.Writer) (err error) {
// map header, size 2
// write "foo"
err = en.Append(0x82, 0xa3, 0x66, 0x6f, 0x6f)
if err != nil {
return err
}
err = en.WriteString(z.Foo)
if err != nil {
return
}
// write "hoge"
err = en.Append(0xa4, 0x68, 0x6f, 0x67, 0x65)
if err != nil {
return err
}
err = en.WriteString(z.Hoge)
if err != nil {
return
}
return
}
// MarshalMsg implements msgp.Marshaler
func (z TestMessage) MarshalMsg(b []byte) (o []byte, err error) {
o = msgp.Require(b, z.Msgsize())
// map header, size 2
// string "foo"
o = append(o, 0x82, 0xa3, 0x66, 0x6f, 0x6f)
o = msgp.AppendString(o, z.Foo)
// string "hoge"
o = append(o, 0xa4, 0x68, 0x6f, 0x67, 0x65)
o = msgp.AppendString(o, z.Hoge)
return
}
// UnmarshalMsg implements msgp.Unmarshaler
func (z *TestMessage) UnmarshalMsg(bts []byte) (o []byte, err error) {
var field []byte
_ = field
var zbzg uint32
zbzg, bts, err = msgp.ReadMapHeaderBytes(bts)
if err != nil {
return
}
for zbzg > 0 {
zbzg--
field, bts, err = msgp.ReadMapKeyZC(bts)
if err != nil {
return
}
switch msgp.UnsafeString(field) {
case "foo":
z.Foo, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
return
}
case "hoge":
z.Hoge, bts, err = msgp.ReadStringBytes(bts)
if err != nil {
return
}
default:
bts, err = msgp.Skip(bts)
if err != nil {
return
}
}
}
o = bts
return
}
// Msgsize returns an upper bound estimate of the number of bytes occupied by the serialized message
func (z TestMessage) Msgsize() (s int) {
s = 1 + 4 + msgp.StringPrefixSize + len(z.Foo) + 5 + msgp.StringPrefixSize + len(z.Hoge)
return
}

View File

@ -1,3 +1,3 @@
package fluent
const Version = "1.2.1"
const Version = "1.3.0"

View File

@ -0,0 +1,23 @@
package ttrpc
import "github.com/pkg/errors"
type serverConfig struct {
handshaker Handshaker
}
type ServerOpt func(*serverConfig) error
// WithServerHandshaker can be passed to NewServer to ensure that the
// handshaker is called before every connection attempt.
//
// Only one handshaker is allowed per server.
func WithServerHandshaker(handshaker Handshaker) ServerOpt {
return func(c *serverConfig) error {
if c.handshaker != nil {
return errors.New("only one handshaker allowed per server")
}
c.handshaker = handshaker
return nil
}
}

View File

@ -0,0 +1,34 @@
package ttrpc
import (
"context"
"net"
)
// Handshaker defines the interface for connection handshakes performed on the
// server or client when first connecting.
type Handshaker interface {
// Handshake should confirm or decorate a connection that may be incoming
// to a server or outgoing from a client.
//
// If this returns without an error, the caller should use the connection
// in place of the original connection.
//
// The second return value can contain credential specific data, such as
// unix socket credentials or TLS information.
//
// While we currently only have implementations on the server-side, this
// interface should be sufficient to implement similar handshakes on the
// client-side.
Handshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error)
}
type handshakerFunc func(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error)
func (fn handshakerFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) {
return fn(ctx, conn)
}
func noopHandshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) {
return conn, nil, nil
}

View File

@ -2,6 +2,7 @@ package ttrpc
import (
"context"
"io"
"math/rand"
"net"
"sync"
@ -19,6 +20,7 @@ var (
)
type Server struct {
config *serverConfig
services *serviceSet
codec codec
@ -28,13 +30,21 @@ type Server struct {
done chan struct{} // marks point at which we stop serving requests
}
func NewServer() *Server {
func NewServer(opts ...ServerOpt) (*Server, error) {
config := &serverConfig{}
for _, opt := range opts {
if err := opt(config); err != nil {
return nil, err
}
}
return &Server{
config: config,
services: newServiceSet(),
done: make(chan struct{}),
listeners: make(map[net.Listener]struct{}),
connections: make(map[*serverConn]struct{}),
}
}, nil
}
func (s *Server) Register(name string, methods map[string]Method) {
@ -46,10 +56,15 @@ func (s *Server) Serve(l net.Listener) error {
defer s.closeListener(l)
var (
ctx = context.Background()
backoff time.Duration
ctx = context.Background()
backoff time.Duration
handshaker = s.config.handshaker
)
if handshaker == nil {
handshaker = handshakerFunc(noopHandshake)
}
for {
conn, err := l.Accept()
if err != nil {
@ -82,7 +97,15 @@ func (s *Server) Serve(l net.Listener) error {
}
backoff = 0
sc := s.newConn(conn)
approved, handshake, err := handshaker.Handshake(ctx, conn)
if err != nil {
log.L.WithError(err).Errorf("ttrpc: refusing connection after handshake")
conn.Close()
continue
}
sc := s.newConn(approved, handshake)
go sc.run(ctx)
}
}
@ -205,11 +228,12 @@ func (cs connState) String() string {
}
}
func (s *Server) newConn(conn net.Conn) *serverConn {
func (s *Server) newConn(conn net.Conn, handshake interface{}) *serverConn {
c := &serverConn{
server: s,
conn: conn,
shutdown: make(chan struct{}),
server: s,
conn: conn,
handshake: handshake,
shutdown: make(chan struct{}),
}
c.setState(connStateIdle)
s.addConnection(c)
@ -217,9 +241,10 @@ func (s *Server) newConn(conn net.Conn) *serverConn {
}
type serverConn struct {
server *Server
conn net.Conn
state atomic.Value
server *Server
conn net.Conn
handshake interface{} // data from handshake, not used for now
state atomic.Value
shutdownOnce sync.Once
shutdown chan struct{} // forced shutdown, used by close
@ -406,7 +431,7 @@ func (c *serverConn) run(sctx context.Context) {
// branch. Basically, it means that we are no longer receiving
// requests due to a terminal error.
recvErr = nil // connection is now "closing"
if err != nil {
if err != nil && err != io.EOF {
log.L.WithError(err).Error("error receiving message")
}
case <-shutdown:

View File

@ -0,0 +1,92 @@
package ttrpc
import (
"context"
"net"
"os"
"syscall"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
type UnixCredentialsFunc func(*unix.Ucred) error
func (fn UnixCredentialsFunc) Handshake(ctx context.Context, conn net.Conn) (net.Conn, interface{}, error) {
uc, err := requireUnixSocket(conn)
if err != nil {
return nil, nil, errors.Wrap(err, "ttrpc.UnixCredentialsFunc: require unix socket")
}
rs, err := uc.SyscallConn()
if err != nil {
return nil, nil, errors.Wrap(err, "ttrpc.UnixCredentialsFunc: (net.UnixConn).SyscallConn failed")
}
var (
ucred *unix.Ucred
ucredErr error
)
if err := rs.Control(func(fd uintptr) {
ucred, ucredErr = unix.GetsockoptUcred(int(fd), unix.SOL_SOCKET, unix.SO_PEERCRED)
}); err != nil {
return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: (*syscall.RawConn).Control failed")
}
if ucredErr != nil {
return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: failed to retrieve socket peer credentials")
}
if err := fn(ucred); err != nil {
return nil, nil, errors.Wrapf(err, "ttrpc.UnixCredentialsFunc: credential check failed")
}
return uc, ucred, nil
}
// UnixSocketRequireUidGid requires specific *effective* UID/GID, rather than the real UID/GID.
//
// For example, if a daemon binary is owned by the root (UID 0) with SUID bit but running as an
// unprivileged user (UID 1001), the effective UID becomes 0, and the real UID becomes 1001.
// So calling this function with uid=0 allows a connection from effective UID 0 but rejects
// a connection from effective UID 1001.
//
// See socket(7), SO_PEERCRED: "The returned credentials are those that were in effect at the time of the call to connect(2) or socketpair(2)."
func UnixSocketRequireUidGid(uid, gid int) UnixCredentialsFunc {
return func(ucred *unix.Ucred) error {
return requireUidGid(ucred, uid, gid)
}
}
func UnixSocketRequireRoot() UnixCredentialsFunc {
return UnixSocketRequireUidGid(0, 0)
}
// UnixSocketRequireSameUser resolves the current effective unix user and returns a
// UnixCredentialsFunc that will validate incoming unix connections against the
// current credentials.
//
// This is useful when using abstract sockets that are accessible by all users.
func UnixSocketRequireSameUser() UnixCredentialsFunc {
euid, egid := os.Geteuid(), os.Getegid()
return UnixSocketRequireUidGid(euid, egid)
}
func requireRoot(ucred *unix.Ucred) error {
return requireUidGid(ucred, 0, 0)
}
func requireUidGid(ucred *unix.Ucred, uid, gid int) error {
if (uid != -1 && uint32(uid) != ucred.Uid) || (gid != -1 && uint32(gid) != ucred.Gid) {
return errors.Wrap(syscall.EPERM, "ttrpc: invalid credentials")
}
return nil
}
func requireUnixSocket(conn net.Conn) (*net.UnixConn, error) {
uc, ok := conn.(*net.UnixConn)
if !ok {
return nil, errors.New("a unix socket connection is required")
}
return uc, nil
}

View File

@ -1,15 +1,12 @@
MessagePack Code Generator [![Build Status](https://travis-ci.org/tinylib/msgp.svg?branch=master)](https://travis-ci.org/tinylib/msgp)
=======
[![forthebadge](http://forthebadge.com/badges/uses-badges.svg)](http://forthebadge.com)
[![forthebadge](http://forthebadge.com/badges/ages-12.svg)](http://forthebadge.com)
This is a code generation tool and serialization library for [MessagePack](http://msgpack.org). It is targeted at the `go generate` [tool](http://tip.golang.org/cmd/go/#hdr-Generate_Go_files_by_processing_source). You can read more about MessagePack [in the wiki](http://github.com/tinylib/msgp/wiki), or at [msgpack.org](http://msgpack.org).
This is a code generation tool and serialization library for [MessagePack](http://msgpack.org). You can read more about MessagePack [in the wiki](http://github.com/tinylib/msgp/wiki), or at [msgpack.org](http://msgpack.org).
### Why?
- Use Go as your schema language
- Speeeeeed (400MB/s on modern hardware)
- Performance
- [JSON interop](http://godoc.org/github.com/tinylib/msgp/msgp#CopyToJSON)
- [User-defined extensions](http://github.com/tinylib/msgp/wiki/Using-Extensions)
- Type safety
@ -17,8 +14,6 @@ This is a code generation tool and serialization library for [MessagePack](http:
### Quickstart
Note: you need at least go 1.3 to compile this package, and at least go 1.4 to use `go generate`.
In a source file, include the following directive:
```go
@ -45,7 +40,7 @@ type Person struct {
By default, the code generator will satisfy `msgp.Sizer`, `msgp.Encodable`, `msgp.Decodable`,
`msgp.Marshaler`, and `msgp.Unmarshaler`. Carefully-designed applications can use these methods to do
marshalling/unmarshalling with zero allocations.
marshalling/unmarshalling with zero heap allocations.
While `msgp.Marshaler` and `msgp.Unmarshaler` are quite similar to the standard library's
`json.Marshaler` and `json.Unmarshaler`, `msgp.Encodable` and `msgp.Decodable` are useful for
@ -62,6 +57,7 @@ of `*bufio.Writer` and `*bufio.Reader`, respectively.)
- Generation of both `[]byte`-oriented and `io.Reader/io.Writer`-oriented methods
- Support for arbitrary type system extensions
- [Preprocessor directives](http://github.com/tinylib/msgp/wiki/Preprocessor-Directives)
- File-based dependency model means fast codegen regardless of source tree size.
Consider the following:
```go
@ -84,21 +80,23 @@ the data "type" (`int8`) and the raw binary. You [can see a worked example in th
### Status
Alpha. I _will_ break stuff. There is an open milestone for Beta stability (targeted for January.) Only the `/msgp` sub-directory will have a stability guarantee.
Mostly stable, in that no breaking changes have been made to the `/msgp` library in more than a year. Newer versions
of the code may generate different code than older versions for performance reasons. I (@philhofer) am aware of a
number of stability-critical commercial applications that use this code with good results. But, caveat emptor.
You can read more about how `msgp` maps MessagePack types onto Go types [in the wiki](http://github.com/tinylib/msgp/wiki).
Here some of the known limitations/restrictions:
- Identifiers from outside the processed source file are assumed (optimistically) to satisfy the generator's interfaces. If this isn't the case, your code will fail to compile.
- Like most serializers, `chan` and `func` fields are ignored, as well as non-exported fields.
- Encoding of `interface{}` is limited to built-ins or types that have explicit encoding methods.
- _Maps must have `string` keys._ This is intentional (as it preserves JSON interop.) Although non-string map keys are not forbidden by the MessagePack standard, many serializers impose this restriction. (It also means *any* well-formed `struct` can be de-serialized into a `map[string]interface{}`.) The only exception to this rule is that the deserializers will allow you to read map keys encoded as `bin` types, due to the fact that some legacy encodings permitted this. (However, those values will still be cast to Go `string`s, and they will be converted to `str` types when re-encoded. It is the responsibility of the user to ensure that map keys are UTF-8 safe in this case.) The same rules hold true for JSON translation.
- Identifiers from outside the processed source file are assumed (optimistically) to satisfy the generator's interfaces. If this isn't the case, your code will fail to compile.
- Like most serializers, `chan` and `func` fields are ignored, as well as non-exported fields.
- Encoding of `interface{}` is limited to built-ins or types that have explicit encoding methods.
- _Maps must have `string` keys._ This is intentional (as it preserves JSON interop.) Although non-string map keys are not forbidden by the MessagePack standard, many serializers impose this restriction. (It also means *any* well-formed `struct` can be de-serialized into a `map[string]interface{}`.) The only exception to this rule is that the deserializers will allow you to read map keys encoded as `bin` types, due to the fact that some legacy encodings permitted this. (However, those values will still be cast to Go `string`s, and they will be converted to `str` types when re-encoded. It is the responsibility of the user to ensure that map keys are UTF-8 safe in this case.) The same rules hold true for JSON translation.
If the output compiles, then there's a pretty good chance things are fine. (Plus, we generate tests for you.) *Please, please, please* file an issue if you think the generator is writing broken code.
### Performance
If you like benchmarks, see [here.](https://github.com/alecthomas/go_serialization_benchmarks)
If you like benchmarks, see [here](http://bravenewgeek.com/so-you-wanna-go-fast/) and [here](https://github.com/alecthomas/go_serialization_benchmarks).
As one might expect, the generated methods that deal with `[]byte` are faster, but the `io.Reader/Writer` methods are generally more memory-efficient for large (> 2KB) objects.
As one might expect, the generated methods that deal with `[]byte` are faster for small objects, but the `io.Reader/Writer` methods are generally more memory-efficient (and, at some point, faster) for large (> 2KB) objects.

View File

@ -0,0 +1,24 @@
// +build linux,!appengine
package msgp
import (
"os"
"syscall"
)
func adviseRead(mem []byte) {
syscall.Madvise(mem, syscall.MADV_SEQUENTIAL|syscall.MADV_WILLNEED)
}
func adviseWrite(mem []byte) {
syscall.Madvise(mem, syscall.MADV_SEQUENTIAL)
}
func fallocate(f *os.File, sz int64) error {
err := syscall.Fallocate(int(f.Fd()), 0, 0, sz)
if err == syscall.ENOTSUP {
return f.Truncate(sz)
}
return err
}

View File

@ -0,0 +1,17 @@
// +build !linux appengine
package msgp
import (
"os"
)
// TODO: darwin, BSD support
func adviseRead(mem []byte) {}
func adviseWrite(mem []byte) {}
func fallocate(f *os.File, sz int64) error {
return f.Truncate(sz)
}

View File

@ -0,0 +1,15 @@
// +build appengine
package msgp
// let's just assume appengine
// uses 64-bit hardware...
const smallint = false
func UnsafeString(b []byte) string {
return string(b)
}
func UnsafeBytes(s string) []byte {
return []byte(s)
}

View File

@ -1,20 +1,21 @@
package msgp
import (
"testing"
)
type timer interface {
StartTimer()
StopTimer()
}
// EndlessReader is an io.Reader
// that loops over the same data
// endlessly. It is used for benchmarking.
type EndlessReader struct {
tb *testing.B
tb timer
data []byte
offset int
}
// NewEndlessReader returns a new endless reader
func NewEndlessReader(b []byte, tb *testing.B) *EndlessReader {
func NewEndlessReader(b []byte, tb timer) *EndlessReader {
return &EndlessReader{tb: tb, data: b, offset: 0}
}

View File

@ -226,7 +226,7 @@ func (mw *Writer) WriteExtension(e Extension) error {
// peek at the extension type, assuming the next
// kind to be read is Extension
func (m *Reader) peekExtensionType() (int8, error) {
p, err := m.r.Peek(2)
p, err := m.R.Peek(2)
if err != nil {
return 0, err
}
@ -238,7 +238,7 @@ func (m *Reader) peekExtensionType() (int8, error) {
return int8(p[1]), nil
}
size := spec.size
p, err = m.r.Peek(int(size))
p, err = m.R.Peek(int(size))
if err != nil {
return 0, err
}
@ -273,7 +273,7 @@ func peekExtension(b []byte) (int8, error) {
// e.Type() is not the same as the wire type.
func (m *Reader) ReadExtension(e Extension) (err error) {
var p []byte
p, err = m.r.Peek(2)
p, err = m.R.Peek(2)
if err != nil {
return
}
@ -286,13 +286,13 @@ func (m *Reader) ReadExtension(e Extension) (err error) {
err = errExt(int8(p[1]), e.ExtensionType())
return
}
p, err = m.r.Peek(3)
p, err = m.R.Peek(3)
if err != nil {
return
}
err = e.UnmarshalBinary(p[2:])
if err == nil {
_, err = m.r.Skip(3)
_, err = m.R.Skip(3)
}
return
@ -301,13 +301,13 @@ func (m *Reader) ReadExtension(e Extension) (err error) {
err = errExt(int8(p[1]), e.ExtensionType())
return
}
p, err = m.r.Peek(4)
p, err = m.R.Peek(4)
if err != nil {
return
}
err = e.UnmarshalBinary(p[2:])
if err == nil {
_, err = m.r.Skip(4)
_, err = m.R.Skip(4)
}
return
@ -316,13 +316,13 @@ func (m *Reader) ReadExtension(e Extension) (err error) {
err = errExt(int8(p[1]), e.ExtensionType())
return
}
p, err = m.r.Peek(6)
p, err = m.R.Peek(6)
if err != nil {
return
}
err = e.UnmarshalBinary(p[2:])
if err == nil {
_, err = m.r.Skip(6)
_, err = m.R.Skip(6)
}
return
@ -331,13 +331,13 @@ func (m *Reader) ReadExtension(e Extension) (err error) {
err = errExt(int8(p[1]), e.ExtensionType())
return
}
p, err = m.r.Peek(10)
p, err = m.R.Peek(10)
if err != nil {
return
}
err = e.UnmarshalBinary(p[2:])
if err == nil {
_, err = m.r.Skip(10)
_, err = m.R.Skip(10)
}
return
@ -346,18 +346,18 @@ func (m *Reader) ReadExtension(e Extension) (err error) {
err = errExt(int8(p[1]), e.ExtensionType())
return
}
p, err = m.r.Peek(18)
p, err = m.R.Peek(18)
if err != nil {
return
}
err = e.UnmarshalBinary(p[2:])
if err == nil {
_, err = m.r.Skip(18)
_, err = m.R.Skip(18)
}
return
case mext8:
p, err = m.r.Peek(3)
p, err = m.R.Peek(3)
if err != nil {
return
}
@ -369,7 +369,7 @@ func (m *Reader) ReadExtension(e Extension) (err error) {
off = 3
case mext16:
p, err = m.r.Peek(4)
p, err = m.R.Peek(4)
if err != nil {
return
}
@ -381,7 +381,7 @@ func (m *Reader) ReadExtension(e Extension) (err error) {
off = 4
case mext32:
p, err = m.r.Peek(6)
p, err = m.R.Peek(6)
if err != nil {
return
}
@ -397,13 +397,13 @@ func (m *Reader) ReadExtension(e Extension) (err error) {
return
}
p, err = m.r.Peek(read + off)
p, err = m.R.Peek(read + off)
if err != nil {
return
}
err = e.UnmarshalBinary(p[off:])
if err == nil {
_, err = m.r.Skip(read + off)
_, err = m.R.Skip(read + off)
}
return
}

View File

@ -0,0 +1,92 @@
// +build linux darwin dragonfly freebsd netbsd openbsd
// +build !appengine
package msgp
import (
"os"
"syscall"
)
// ReadFile reads a file into 'dst' using
// a read-only memory mapping. Consequently,
// the file must be mmap-able, and the
// Unmarshaler should never write to
// the source memory. (Methods generated
// by the msgp tool obey that constraint, but
// user-defined implementations may not.)
//
// Reading and writing through file mappings
// is only efficient for large files; small
// files are best read and written using
// the ordinary streaming interfaces.
//
func ReadFile(dst Unmarshaler, file *os.File) error {
stat, err := file.Stat()
if err != nil {
return err
}
data, err := syscall.Mmap(int(file.Fd()), 0, int(stat.Size()), syscall.PROT_READ, syscall.MAP_SHARED)
if err != nil {
return err
}
adviseRead(data)
_, err = dst.UnmarshalMsg(data)
uerr := syscall.Munmap(data)
if err == nil {
err = uerr
}
return err
}
// MarshalSizer is the combination
// of the Marshaler and Sizer
// interfaces.
type MarshalSizer interface {
Marshaler
Sizer
}
// WriteFile writes a file from 'src' using
// memory mapping. It overwrites the entire
// contents of the previous file.
// The mapping size is calculated
// using the `Msgsize()` method
// of 'src', so it must produce a result
// equal to or greater than the actual encoded
// size of the object. Otherwise,
// a fault (SIGBUS) will occur.
//
// Reading and writing through file mappings
// is only efficient for large files; small
// files are best read and written using
// the ordinary streaming interfaces.
//
// NOTE: The performance of this call
// is highly OS- and filesystem-dependent.
// Users should take care to test that this
// performs as expected in a production environment.
// (Linux users should run a kernel and filesystem
// that support fallocate(2) for the best results.)
func WriteFile(src MarshalSizer, file *os.File) error {
sz := src.Msgsize()
err := fallocate(file, int64(sz))
if err != nil {
return err
}
data, err := syscall.Mmap(int(file.Fd()), 0, sz, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
if err != nil {
return err
}
adviseWrite(data)
chunk := data[:0]
chunk, err = src.MarshalMsg(chunk)
if err != nil {
return err
}
uerr := syscall.Munmap(data)
if uerr != nil {
return uerr
}
return file.Truncate(int64(len(chunk)))
}

View File

@ -0,0 +1,47 @@
// +build windows appengine
package msgp
import (
"io/ioutil"
"os"
)
// MarshalSizer is the combination
// of the Marshaler and Sizer
// interfaces.
type MarshalSizer interface {
Marshaler
Sizer
}
func ReadFile(dst Unmarshaler, file *os.File) error {
if u, ok := dst.(Decodable); ok {
return u.DecodeMsg(NewReader(file))
}
data, err := ioutil.ReadAll(file)
if err != nil {
return err
}
_, err = dst.UnmarshalMsg(data)
return err
}
func WriteFile(src MarshalSizer, file *os.File) error {
if e, ok := src.(Encodable); ok {
w := NewWriter(file)
err := e.EncodeMsg(w)
if err == nil {
err = w.Flush()
}
return err
}
raw, err := src.MarshalMsg(nil)
if err != nil {
return err
}
_, err = file.Write(raw)
return err
}

View File

@ -66,7 +66,7 @@ func (r *Reader) WriteToJSON(w io.Writer) (n int64, err error) {
if jsw, ok := w.(jsWriter); ok {
j = jsw
} else {
bf = bufio.NewWriterSize(w, 512)
bf = bufio.NewWriter(w)
j = bf
}
var nn int
@ -333,7 +333,7 @@ func rwExtension(dst jsWriter, src *Reader) (n int, err error) {
func rwString(dst jsWriter, src *Reader) (n int, err error) {
var p []byte
p, err = src.r.Peek(1)
p, err = src.R.Peek(1)
if err != nil {
return
}
@ -342,25 +342,25 @@ func rwString(dst jsWriter, src *Reader) (n int, err error) {
if isfixstr(lead) {
read = int(rfixstr(lead))
src.r.Skip(1)
src.R.Skip(1)
goto write
}
switch lead {
case mstr8:
p, err = src.r.Next(2)
p, err = src.R.Next(2)
if err != nil {
return
}
read = int(uint8(p[1]))
case mstr16:
p, err = src.r.Next(3)
p, err = src.R.Next(3)
if err != nil {
return
}
read = int(big.Uint16(p[1:]))
case mstr32:
p, err = src.r.Next(5)
p, err = src.R.Next(5)
if err != nil {
return
}
@ -370,7 +370,7 @@ func rwString(dst jsWriter, src *Reader) (n int, err error) {
return
}
write:
p, err = src.r.Next(read)
p, err = src.R.Next(read)
if err != nil {
return
}

View File

@ -1,11 +1,105 @@
package msgp
import (
"math"
"strconv"
)
// The portable parts of the Number implementation
// Number can be
// an int64, uint64, float32,
// or float64 internally.
// It can decode itself
// from any of the native
// messagepack number types.
// The zero-value of Number
// is Int(0). Using the equality
// operator with Number compares
// both the type and the value
// of the number.
type Number struct {
// internally, this
// is just a tagged union.
// the raw bits of the number
// are stored the same way regardless.
bits uint64
typ Type
}
// AsInt sets the number to an int64.
func (n *Number) AsInt(i int64) {
// we always store int(0)
// as {0, InvalidType} in
// order to preserve
// the behavior of the == operator
if i == 0 {
n.typ = InvalidType
n.bits = 0
return
}
n.typ = IntType
n.bits = uint64(i)
}
// AsUint sets the number to a uint64.
func (n *Number) AsUint(u uint64) {
n.typ = UintType
n.bits = u
}
// AsFloat32 sets the value of the number
// to a float32.
func (n *Number) AsFloat32(f float32) {
n.typ = Float32Type
n.bits = uint64(math.Float32bits(f))
}
// AsFloat64 sets the value of the
// number to a float64.
func (n *Number) AsFloat64(f float64) {
n.typ = Float64Type
n.bits = math.Float64bits(f)
}
// Int casts the number as an int64, and
// returns whether or not that was the
// underlying type.
func (n *Number) Int() (int64, bool) {
return int64(n.bits), n.typ == IntType || n.typ == InvalidType
}
// Uint casts the number as a uint64, and returns
// whether or not that was the underlying type.
func (n *Number) Uint() (uint64, bool) {
return n.bits, n.typ == UintType
}
// Float casts the number to a float64, and
// returns whether or not that was the underlying
// type (either a float64 or a float32).
func (n *Number) Float() (float64, bool) {
switch n.typ {
case Float32Type:
return float64(math.Float32frombits(uint32(n.bits))), true
case Float64Type:
return math.Float64frombits(n.bits), true
default:
return 0.0, false
}
}
// Type will return one of:
// Float64Type, Float32Type, UintType, or IntType.
func (n *Number) Type() Type {
if n.typ == InvalidType {
return IntType
}
return n.typ
}
// DecodeMsg implements msgp.Decodable
func (n *Number) DecodeMsg(r *Reader) error {
typ, err := r.NextType()
@ -83,6 +177,38 @@ func (n *Number) UnmarshalMsg(b []byte) ([]byte, error) {
}
}
// MarshalMsg implements msgp.Marshaler
func (n *Number) MarshalMsg(b []byte) ([]byte, error) {
switch n.typ {
case IntType:
return AppendInt64(b, int64(n.bits)), nil
case UintType:
return AppendUint64(b, uint64(n.bits)), nil
case Float64Type:
return AppendFloat64(b, math.Float64frombits(n.bits)), nil
case Float32Type:
return AppendFloat32(b, math.Float32frombits(uint32(n.bits))), nil
default:
return AppendInt64(b, 0), nil
}
}
// EncodeMsg implements msgp.Encodable
func (n *Number) EncodeMsg(w *Writer) error {
switch n.typ {
case IntType:
return w.WriteInt64(int64(n.bits))
case UintType:
return w.WriteUint64(n.bits)
case Float64Type:
return w.WriteFloat64(math.Float64frombits(n.bits))
case Float32Type:
return w.WriteFloat32(math.Float32frombits(uint32(n.bits)))
default:
return w.WriteInt64(0)
}
}
// Msgsize implements msgp.Sizer
func (n *Number) Msgsize() int {
switch n.typ {
@ -121,6 +247,7 @@ func (n *Number) MarshalJSON() ([]byte, error) {
}
}
// String implements fmt.Stringer
func (n *Number) String() string {
switch n.typ {
case InvalidType:

View File

@ -1,101 +0,0 @@
// +build appengine
package msgp
// let's just assume appengine
// uses 64-bit hardware...
const smallint = false
func UnsafeString(b []byte) string {
return string(b)
}
func UnsafeBytes(s string) []byte {
return []byte(s)
}
type Number struct {
ibits uint64 // zero or bits
fbits float64 // zero or bits
typ Type // zero or type
}
func (n *Number) AsFloat64(f float64) {
n.typ = Float64Type
n.fbits = f
n.ibits = 0
}
func (n *Number) AsFloat32(f float32) {
n.typ = Float32Type
n.fbits = float64(f)
n.ibits = 0
}
func (n *Number) AsInt(i int64) {
n.fbits = 0
if i == 0 {
n.typ = InvalidType
n.ibits = 0
return
}
n.ibits = uint64(i)
n.typ = IntType
}
func (n *Number) AsUint(u uint64) {
n.ibits = u
n.fbits = 0
n.typ = UintType
}
func (n *Number) Float() (float64, bool) {
return n.fbits, n.typ == Float64Type || n.typ == Float32Type
}
func (n *Number) Int() (int64, bool) {
return int64(n.ibits), n.typ == IntType
}
func (n *Number) Uint() (uint64, bool) {
return n.ibits, n.typ == UintType
}
func (n *Number) Type() Type {
if n.typ == InvalidType {
return IntType
}
return n.typ
}
func (n *Number) MarshalMsg(o []byte) ([]byte, error) {
switch n.typ {
case InvalidType:
return AppendInt64(o, 0), nil
case IntType:
return AppendInt64(o, int64(n.ibits)), nil
case UintType:
return AppendUint64(o, n.ibits), nil
case Float32Type:
return AppendFloat32(o, float32(n.fbits)), nil
case Float64Type:
return AppendFloat64(o, n.fbits), nil
}
panic("unreachable code!")
}
func (n *Number) EncodeMsg(w *Writer) error {
switch n.typ {
case InvalidType:
return w.WriteInt64(0)
case IntType:
return w.WriteInt64(int64(n.ibits))
case UintType:
return w.WriteUint64(n.ibits)
case Float32Type:
return w.WriteFloat32(float32(n.fbits))
case Float64Type:
return w.WriteFloat64(n.fbits)
}
panic("unreachable code!")
}

View File

@ -1,159 +0,0 @@
// +build !appengine
package msgp
import (
"reflect"
"unsafe"
)
const (
// spec says int and uint are always
// the same size, but that int/uint
// size may not be machine word size
smallint = unsafe.Sizeof(int(0)) == 4
)
// UnsafeString returns the byte slice as a volatile string
// THIS SHOULD ONLY BE USED BY THE CODE GENERATOR.
// THIS IS EVIL CODE.
// YOU HAVE BEEN WARNED.
func UnsafeString(b []byte) string {
return *(*string)(unsafe.Pointer(&reflect.StringHeader{Data: uintptr(unsafe.Pointer(&b[0])), Len: len(b)}))
}
// UnsafeBytes returns the string as a byte slice
// THIS SHOULD ONLY BE USED BY THE CODE GENERATOR.
// THIS IS EVIL CODE.
// YOU HAVE BEEN WARNED.
func UnsafeBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
Len: len(s),
Cap: len(s),
Data: (*(*reflect.StringHeader)(unsafe.Pointer(&s))).Data,
}))
}
// Number can be
// an int64, uint64, float32,
// or float64 internally.
// It can decode itself
// from any of the native
// messagepack number types.
// The zero-value of Number
// is Int(0). Using the equality
// operator with Number compares
// both the type and the value
// of the number.
type Number struct {
// internally, this
// is just a tagged union.
// the raw bits of the number
// are stored the same way regardless.
bits uint64
typ Type
}
// AsFloat64 sets the number to
// a float64.
func (n *Number) AsFloat64(f float64) {
n.typ = Float64Type
n.bits = *(*uint64)(unsafe.Pointer(&f))
}
// AsInt sets the number to an int64.
func (n *Number) AsInt(i int64) {
// we always store int(0)
// as {0, InvalidType} in
// order to preserve
// the behavior of the == operator
if i == 0 {
n.typ = InvalidType
n.bits = 0
return
}
n.typ = IntType
n.bits = uint64(i)
}
// AsUint sets the number to a uint64.
func (n *Number) AsUint(u uint64) {
n.typ = UintType
n.bits = u
}
// AsFloat32 sets the number to a float32.
func (n *Number) AsFloat32(f float32) {
n.typ = Float32Type
g := float64(f)
n.bits = *(*uint64)(unsafe.Pointer(&g))
}
// Type will return one of:
// Float64Type, Float32Type, UintType, or IntType.
func (n *Number) Type() Type {
if n.typ == InvalidType {
return IntType
}
return n.typ
}
// Float casts the number of the float,
// and returns whether or not that was
// the underlying type. (This is legal
// for both float32 and float64 types.)
func (n *Number) Float() (float64, bool) {
return *(*float64)(unsafe.Pointer(&n.bits)), n.typ == Float64Type || n.typ == Float32Type
}
// Int casts the number as an int64, and
// returns whether or not that was the
// underlying type.
func (n *Number) Int() (int64, bool) {
return int64(n.bits), n.typ == IntType || n.typ == InvalidType
}
// Uint casts the number as a uint64, and returns
// whether or not that was the underlying type.
func (n *Number) Uint() (uint64, bool) {
return n.bits, n.typ == UintType
}
// EncodeMsg implements msgp.Encodable
func (n *Number) EncodeMsg(w *Writer) error {
switch n.typ {
case InvalidType:
return w.WriteInt(0)
case IntType:
return w.WriteInt64(int64(n.bits))
case UintType:
return w.WriteUint64(n.bits)
case Float64Type:
return w.WriteFloat64(*(*float64)(unsafe.Pointer(&n.bits)))
case Float32Type:
return w.WriteFloat32(float32(*(*float64)(unsafe.Pointer(&n.bits))))
default:
// this should never ever happen
panic("(*Number).typ is invalid")
}
}
// MarshalMsg implements msgp.Marshaler
func (n *Number) MarshalMsg(b []byte) ([]byte, error) {
switch n.typ {
case InvalidType:
return AppendInt(b, 0), nil
case IntType:
return AppendInt64(b, int64(n.bits)), nil
case UintType:
return AppendUint64(b, n.bits), nil
case Float64Type:
return AppendFloat64(b, *(*float64)(unsafe.Pointer(&n.bits))), nil
case Float32Type:
return AppendFloat32(b, float32(*(*float64)(unsafe.Pointer(&n.bits)))), nil
default:
panic("(*Number).typ is invalid")
}
}

View File

@ -1,11 +1,12 @@
package msgp
import (
"github.com/philhofer/fwd"
"io"
"math"
"sync"
"time"
"github.com/philhofer/fwd"
)
// where we keep old *Readers
@ -111,10 +112,10 @@ func Decode(r io.Reader, d Decodable) error {
// reader will be buffered.
func NewReader(r io.Reader) *Reader {
p := readerPool.Get().(*Reader)
if p.r == nil {
p.r = fwd.NewReader(r)
if p.R == nil {
p.R = fwd.NewReader(r)
} else {
p.r.Reset(r)
p.R.Reset(r)
}
return p
}
@ -122,39 +123,96 @@ func NewReader(r io.Reader) *Reader {
// NewReaderSize returns a *Reader with a buffer of the given size.
// (This is vastly preferable to passing the decoder a reader that is already buffered.)
func NewReaderSize(r io.Reader, sz int) *Reader {
return &Reader{r: fwd.NewReaderSize(r, sz)}
return &Reader{R: fwd.NewReaderSize(r, sz)}
}
// Reader wraps an io.Reader and provides
// methods to read MessagePack-encoded values
// from it. Readers are buffered.
type Reader struct {
r *fwd.Reader
// R is the buffered reader
// that the Reader uses
// to decode MessagePack.
// The Reader itself
// is stateless; all the
// buffering is done
// within R.
R *fwd.Reader
scratch []byte
}
// Read implements `io.Reader`
func (m *Reader) Read(p []byte) (int, error) {
return m.r.Read(p)
return m.R.Read(p)
}
// CopyNext reads the next object from m without decoding it and writes it to w.
// It avoids unnecessary copies internally.
func (m *Reader) CopyNext(w io.Writer) (int64, error) {
sz, o, err := getNextSize(m.R)
if err != nil {
return 0, err
}
var n int64
// Opportunistic optimization: if we can fit the whole thing in the m.R
// buffer, then just get a pointer to that, and pass it to w.Write,
// avoiding an allocation.
if int(sz) <= m.R.BufferSize() {
var nn int
var buf []byte
buf, err = m.R.Next(int(sz))
if err != nil {
if err == io.ErrUnexpectedEOF {
err = ErrShortBytes
}
return 0, err
}
nn, err = w.Write(buf)
n += int64(nn)
} else {
// Fall back to io.CopyN.
// May avoid allocating if w is a ReaderFrom (e.g. bytes.Buffer)
n, err = io.CopyN(w, m.R, int64(sz))
if err == io.ErrUnexpectedEOF {
err = ErrShortBytes
}
}
if err != nil {
return n, err
} else if n < int64(sz) {
return n, io.ErrShortWrite
}
// for maps and slices, read elements
for x := uintptr(0); x < o; x++ {
var n2 int64
n2, err = m.CopyNext(w)
if err != nil {
return n, err
}
n += n2
}
return n, nil
}
// ReadFull implements `io.ReadFull`
func (m *Reader) ReadFull(p []byte) (int, error) {
return m.r.ReadFull(p)
return m.R.ReadFull(p)
}
// Reset resets the underlying reader.
func (m *Reader) Reset(r io.Reader) { m.r.Reset(r) }
func (m *Reader) Reset(r io.Reader) { m.R.Reset(r) }
// Buffered returns the number of bytes currently in the read buffer.
func (m *Reader) Buffered() int { return m.r.Buffered() }
func (m *Reader) Buffered() int { return m.R.Buffered() }
// BufferSize returns the capacity of the read buffer.
func (m *Reader) BufferSize() int { return m.r.BufferSize() }
func (m *Reader) BufferSize() int { return m.R.BufferSize() }
// NextType returns the next object type to be decoded.
func (m *Reader) NextType() (Type, error) {
p, err := m.r.Peek(1)
p, err := m.R.Peek(1)
if err != nil {
return InvalidType, err
}
@ -182,12 +240,14 @@ func (m *Reader) NextType() (Type, error) {
// IsNil returns whether or not
// the next byte is a null messagepack byte
func (m *Reader) IsNil() bool {
p, err := m.r.Peek(1)
p, err := m.R.Peek(1)
return err == nil && p[0] == mnil
}
// getNextSize returns the size of the next object on the wire.
// returns (obj size, obj elements, error)
// only maps and arrays have non-zero obj elements
// for maps and arrays, obj size does not include elements
//
// use uintptr b/c it's guaranteed to be large enough
// to hold whatever we can fit in memory.
@ -243,8 +303,8 @@ func (m *Reader) Skip() error {
// we can use the faster
// method if we have enough
// buffered data
if m.r.Buffered() >= 5 {
p, err = m.r.Peek(5)
if m.R.Buffered() >= 5 {
p, err = m.R.Peek(5)
if err != nil {
return err
}
@ -253,7 +313,7 @@ func (m *Reader) Skip() error {
return err
}
} else {
v, o, err = getNextSize(m.r)
v, o, err = getNextSize(m.R)
if err != nil {
return err
}
@ -261,7 +321,7 @@ func (m *Reader) Skip() error {
// 'v' is always non-zero
// if err == nil
_, err = m.r.Skip(int(v))
_, err = m.R.Skip(int(v))
if err != nil {
return err
}
@ -284,26 +344,26 @@ func (m *Reader) Skip() error {
func (m *Reader) ReadMapHeader() (sz uint32, err error) {
var p []byte
var lead byte
p, err = m.r.Peek(1)
p, err = m.R.Peek(1)
if err != nil {
return
}
lead = p[0]
if isfixmap(lead) {
sz = uint32(rfixmap(lead))
_, err = m.r.Skip(1)
_, err = m.R.Skip(1)
return
}
switch lead {
case mmap16:
p, err = m.r.Next(3)
p, err = m.R.Next(3)
if err != nil {
return
}
sz = uint32(big.Uint16(p[1:]))
return
case mmap32:
p, err = m.r.Next(5)
p, err = m.R.Next(5)
if err != nil {
return
}
@ -338,7 +398,7 @@ func (m *Reader) ReadMapKey(scratch []byte) ([]byte, error) {
// method; writing into the returned slice may
// corrupt future reads.
func (m *Reader) ReadMapKeyPtr() ([]byte, error) {
p, err := m.r.Peek(1)
p, err := m.R.Peek(1)
if err != nil {
return nil, err
}
@ -346,24 +406,24 @@ func (m *Reader) ReadMapKeyPtr() ([]byte, error) {
var read int
if isfixstr(lead) {
read = int(rfixstr(lead))
m.r.Skip(1)
m.R.Skip(1)
goto fill
}
switch lead {
case mstr8, mbin8:
p, err = m.r.Next(2)
p, err = m.R.Next(2)
if err != nil {
return nil, err
}
read = int(p[1])
case mstr16, mbin16:
p, err = m.r.Next(3)
p, err = m.R.Next(3)
if err != nil {
return nil, err
}
read = int(big.Uint16(p[1:]))
case mstr32, mbin32:
p, err = m.r.Next(5)
p, err = m.R.Next(5)
if err != nil {
return nil, err
}
@ -375,7 +435,7 @@ fill:
if read == 0 {
return nil, ErrShortBytes
}
return m.r.Next(read)
return m.R.Next(read)
}
// ReadArrayHeader reads the next object as an
@ -384,19 +444,19 @@ fill:
func (m *Reader) ReadArrayHeader() (sz uint32, err error) {
var lead byte
var p []byte
p, err = m.r.Peek(1)
p, err = m.R.Peek(1)
if err != nil {
return
}
lead = p[0]
if isfixarray(lead) {
sz = uint32(rfixarray(lead))
_, err = m.r.Skip(1)
_, err = m.R.Skip(1)
return
}
switch lead {
case marray16:
p, err = m.r.Next(3)
p, err = m.R.Next(3)
if err != nil {
return
}
@ -404,7 +464,7 @@ func (m *Reader) ReadArrayHeader() (sz uint32, err error) {
return
case marray32:
p, err = m.r.Next(5)
p, err = m.R.Next(5)
if err != nil {
return
}
@ -419,14 +479,14 @@ func (m *Reader) ReadArrayHeader() (sz uint32, err error) {
// ReadNil reads a 'nil' MessagePack byte from the reader
func (m *Reader) ReadNil() error {
p, err := m.r.Peek(1)
p, err := m.R.Peek(1)
if err != nil {
return err
}
if p[0] != mnil {
return badPrefix(NilType, p[0])
}
_, err = m.r.Skip(1)
_, err = m.R.Skip(1)
return err
}
@ -435,7 +495,7 @@ func (m *Reader) ReadNil() error {
// it will be up-cast to a float64.)
func (m *Reader) ReadFloat64() (f float64, err error) {
var p []byte
p, err = m.r.Peek(9)
p, err = m.R.Peek(9)
if err != nil {
// we'll allow a coversion from float32 to float64,
// since we don't lose any precision
@ -455,14 +515,14 @@ func (m *Reader) ReadFloat64() (f float64, err error) {
return
}
f = math.Float64frombits(getMuint64(p))
_, err = m.r.Skip(9)
_, err = m.R.Skip(9)
return
}
// ReadFloat32 reads a float32 from the reader
func (m *Reader) ReadFloat32() (f float32, err error) {
var p []byte
p, err = m.r.Peek(5)
p, err = m.R.Peek(5)
if err != nil {
return
}
@ -471,14 +531,14 @@ func (m *Reader) ReadFloat32() (f float32, err error) {
return
}
f = math.Float32frombits(getMuint32(p))
_, err = m.r.Skip(5)
_, err = m.R.Skip(5)
return
}
// ReadBool reads a bool from the reader
func (m *Reader) ReadBool() (b bool, err error) {
var p []byte
p, err = m.r.Peek(1)
p, err = m.R.Peek(1)
if err != nil {
return
}
@ -490,7 +550,7 @@ func (m *Reader) ReadBool() (b bool, err error) {
err = badPrefix(BoolType, p[0])
return
}
_, err = m.r.Skip(1)
_, err = m.R.Skip(1)
return
}
@ -498,7 +558,7 @@ func (m *Reader) ReadBool() (b bool, err error) {
func (m *Reader) ReadInt64() (i int64, err error) {
var p []byte
var lead byte
p, err = m.r.Peek(1)
p, err = m.R.Peek(1)
if err != nil {
return
}
@ -506,17 +566,17 @@ func (m *Reader) ReadInt64() (i int64, err error) {
if isfixint(lead) {
i = int64(rfixint(lead))
_, err = m.r.Skip(1)
_, err = m.R.Skip(1)
return
} else if isnfixint(lead) {
i = int64(rnfixint(lead))
_, err = m.r.Skip(1)
_, err = m.R.Skip(1)
return
}
switch lead {
case mint8:
p, err = m.r.Next(2)
p, err = m.R.Next(2)
if err != nil {
return
}
@ -524,7 +584,7 @@ func (m *Reader) ReadInt64() (i int64, err error) {
return
case mint16:
p, err = m.r.Next(3)
p, err = m.R.Next(3)
if err != nil {
return
}
@ -532,7 +592,7 @@ func (m *Reader) ReadInt64() (i int64, err error) {
return
case mint32:
p, err = m.r.Next(5)
p, err = m.R.Next(5)
if err != nil {
return
}
@ -540,7 +600,7 @@ func (m *Reader) ReadInt64() (i int64, err error) {
return
case mint64:
p, err = m.r.Next(9)
p, err = m.R.Next(9)
if err != nil {
return
}
@ -607,19 +667,19 @@ func (m *Reader) ReadInt() (i int, err error) {
func (m *Reader) ReadUint64() (u uint64, err error) {
var p []byte
var lead byte
p, err = m.r.Peek(1)
p, err = m.R.Peek(1)
if err != nil {
return
}
lead = p[0]
if isfixint(lead) {
u = uint64(rfixint(lead))
_, err = m.r.Skip(1)
_, err = m.R.Skip(1)
return
}
switch lead {
case muint8:
p, err = m.r.Next(2)
p, err = m.R.Next(2)
if err != nil {
return
}
@ -627,7 +687,7 @@ func (m *Reader) ReadUint64() (u uint64, err error) {
return
case muint16:
p, err = m.r.Next(3)
p, err = m.R.Next(3)
if err != nil {
return
}
@ -635,7 +695,7 @@ func (m *Reader) ReadUint64() (u uint64, err error) {
return
case muint32:
p, err = m.r.Next(5)
p, err = m.R.Next(5)
if err != nil {
return
}
@ -643,7 +703,7 @@ func (m *Reader) ReadUint64() (u uint64, err error) {
return
case muint64:
p, err = m.r.Next(9)
p, err = m.R.Next(9)
if err != nil {
return
}
@ -707,6 +767,10 @@ func (m *Reader) ReadUint() (u uint, err error) {
return
}
// ReadByte is analogous to ReadUint8.
//
// NOTE: this is *not* an implementation
// of io.ByteReader.
func (m *Reader) ReadByte() (b byte, err error) {
var in uint64
in, err = m.ReadUint64()
@ -724,7 +788,7 @@ func (m *Reader) ReadByte() (b byte, err error) {
func (m *Reader) ReadBytes(scratch []byte) (b []byte, err error) {
var p []byte
var lead byte
p, err = m.r.Peek(2)
p, err = m.R.Peek(2)
if err != nil {
return
}
@ -733,15 +797,15 @@ func (m *Reader) ReadBytes(scratch []byte) (b []byte, err error) {
switch lead {
case mbin8:
read = int64(p[1])
m.r.Skip(2)
m.R.Skip(2)
case mbin16:
p, err = m.r.Next(3)
p, err = m.R.Next(3)
if err != nil {
return
}
read = int64(big.Uint16(p[1:]))
case mbin32:
p, err = m.r.Next(5)
p, err = m.R.Next(5)
if err != nil {
return
}
@ -755,16 +819,55 @@ func (m *Reader) ReadBytes(scratch []byte) (b []byte, err error) {
} else {
b = scratch[0:read]
}
_, err = m.r.ReadFull(b)
_, err = m.R.ReadFull(b)
return
}
// ReadBytesHeader reads the size header
// of a MessagePack 'bin' object. The user
// is responsible for dealing with the next
// 'sz' bytes from the reader in an application-specific
// way.
func (m *Reader) ReadBytesHeader() (sz uint32, err error) {
var p []byte
p, err = m.R.Peek(1)
if err != nil {
return
}
switch p[0] {
case mbin8:
p, err = m.R.Next(2)
if err != nil {
return
}
sz = uint32(p[1])
return
case mbin16:
p, err = m.R.Next(3)
if err != nil {
return
}
sz = uint32(big.Uint16(p[1:]))
return
case mbin32:
p, err = m.R.Next(5)
if err != nil {
return
}
sz = uint32(big.Uint32(p[1:]))
return
default:
err = badPrefix(BinType, p[0])
return
}
}
// ReadExactBytes reads a MessagePack 'bin'-encoded
// object off of the wire into the provided slice. An
// ArrayError will be returned if the object is not
// exactly the length of the input slice.
func (m *Reader) ReadExactBytes(into []byte) error {
p, err := m.r.Peek(2)
p, err := m.R.Peek(2)
if err != nil {
return err
}
@ -776,14 +879,14 @@ func (m *Reader) ReadExactBytes(into []byte) error {
read = int64(p[1])
skip = 2
case mbin16:
p, err = m.r.Peek(3)
p, err = m.R.Peek(3)
if err != nil {
return err
}
read = int64(big.Uint16(p[1:]))
skip = 3
case mbin32:
p, err = m.r.Peek(5)
p, err = m.R.Peek(5)
if err != nil {
return err
}
@ -795,8 +898,8 @@ func (m *Reader) ReadExactBytes(into []byte) error {
if read != int64(len(into)) {
return ArrayError{Wanted: uint32(len(into)), Got: uint32(read)}
}
m.r.Skip(skip)
_, err = m.r.ReadFull(into)
m.R.Skip(skip)
_, err = m.R.ReadFull(into)
return err
}
@ -806,7 +909,7 @@ func (m *Reader) ReadExactBytes(into []byte) error {
func (m *Reader) ReadStringAsBytes(scratch []byte) (b []byte, err error) {
var p []byte
var lead byte
p, err = m.r.Peek(1)
p, err = m.R.Peek(1)
if err != nil {
return
}
@ -815,25 +918,25 @@ func (m *Reader) ReadStringAsBytes(scratch []byte) (b []byte, err error) {
if isfixstr(lead) {
read = int64(rfixstr(lead))
m.r.Skip(1)
m.R.Skip(1)
goto fill
}
switch lead {
case mstr8:
p, err = m.r.Next(2)
p, err = m.R.Next(2)
if err != nil {
return
}
read = int64(uint8(p[1]))
case mstr16:
p, err = m.r.Next(3)
p, err = m.R.Next(3)
if err != nil {
return
}
read = int64(big.Uint16(p[1:]))
case mstr32:
p, err = m.r.Next(5)
p, err = m.R.Next(5)
if err != nil {
return
}
@ -848,16 +951,60 @@ fill:
} else {
b = scratch[0:read]
}
_, err = m.r.ReadFull(b)
_, err = m.R.ReadFull(b)
return
}
// ReadStringHeader reads a string header
// off of the wire. The user is then responsible
// for dealing with the next 'sz' bytes from
// the reader in an application-specific manner.
func (m *Reader) ReadStringHeader() (sz uint32, err error) {
var p []byte
p, err = m.R.Peek(1)
if err != nil {
return
}
lead := p[0]
if isfixstr(lead) {
sz = uint32(rfixstr(lead))
m.R.Skip(1)
return
}
switch lead {
case mstr8:
p, err = m.R.Next(2)
if err != nil {
return
}
sz = uint32(p[1])
return
case mstr16:
p, err = m.R.Next(3)
if err != nil {
return
}
sz = uint32(big.Uint16(p[1:]))
return
case mstr32:
p, err = m.R.Next(5)
if err != nil {
return
}
sz = big.Uint32(p[1:])
return
default:
err = badPrefix(StrType, lead)
return
}
}
// ReadString reads a utf-8 string from the reader
func (m *Reader) ReadString() (s string, err error) {
var p []byte
var lead byte
var read int64
p, err = m.r.Peek(1)
p, err = m.R.Peek(1)
if err != nil {
return
}
@ -865,25 +1012,25 @@ func (m *Reader) ReadString() (s string, err error) {
if isfixstr(lead) {
read = int64(rfixstr(lead))
m.r.Skip(1)
m.R.Skip(1)
goto fill
}
switch lead {
case mstr8:
p, err = m.r.Next(2)
p, err = m.R.Next(2)
if err != nil {
return
}
read = int64(uint8(p[1]))
case mstr16:
p, err = m.r.Next(3)
p, err = m.R.Next(3)
if err != nil {
return
}
read = int64(big.Uint16(p[1:]))
case mstr32:
p, err = m.r.Next(5)
p, err = m.R.Next(5)
if err != nil {
return
}
@ -915,7 +1062,7 @@ fill:
// thus escape analysis *must* conclude that
// 'out' escapes.
out := make([]byte, read)
_, err = m.r.ReadFull(out)
_, err = m.R.ReadFull(out)
if err != nil {
return
}
@ -926,7 +1073,7 @@ fill:
// ReadComplex64 reads a complex64 from the reader
func (m *Reader) ReadComplex64() (f complex64, err error) {
var p []byte
p, err = m.r.Peek(10)
p, err = m.R.Peek(10)
if err != nil {
return
}
@ -940,14 +1087,14 @@ func (m *Reader) ReadComplex64() (f complex64, err error) {
}
f = complex(math.Float32frombits(big.Uint32(p[2:])),
math.Float32frombits(big.Uint32(p[6:])))
_, err = m.r.Skip(10)
_, err = m.R.Skip(10)
return
}
// ReadComplex128 reads a complex128 from the reader
func (m *Reader) ReadComplex128() (f complex128, err error) {
var p []byte
p, err = m.r.Peek(18)
p, err = m.R.Peek(18)
if err != nil {
return
}
@ -961,7 +1108,7 @@ func (m *Reader) ReadComplex128() (f complex128, err error) {
}
f = complex(math.Float64frombits(big.Uint64(p[2:])),
math.Float64frombits(big.Uint64(p[10:])))
_, err = m.r.Skip(18)
_, err = m.R.Skip(18)
return
}
@ -996,7 +1143,7 @@ func (m *Reader) ReadMapStrIntf(mp map[string]interface{}) (err error) {
// The returned time's location will be set to time.Local.
func (m *Reader) ReadTime() (t time.Time, err error) {
var p []byte
p, err = m.r.Peek(15)
p, err = m.R.Peek(15)
if err != nil {
return
}
@ -1010,7 +1157,7 @@ func (m *Reader) ReadTime() (t time.Time, err error) {
}
sec, nsec := getUnix(p[3:])
t = time.Unix(sec, int64(nsec)).Local()
_, err = m.r.Skip(15)
_, err = m.R.Skip(15)
return
}

View File

@ -117,13 +117,13 @@ func (r Raw) Msgsize() int {
}
func appendNext(f *Reader, d *[]byte) error {
amt, o, err := getNextSize(f.r)
amt, o, err := getNextSize(f.R)
if err != nil {
return err
}
var i int
*d, i = ensure(*d, int(amt))
_, err = f.r.ReadFull((*d)[i:])
_, err = f.R.ReadFull((*d)[i:])
if err != nil {
return err
}
@ -576,7 +576,7 @@ func ReadUintBytes(b []byte) (uint, []byte, error) {
return uint(u), b, err
}
// ReadByteBytes is analagous to ReadUint8Bytes
// ReadByteBytes is analogous to ReadUint8Bytes
func ReadByteBytes(b []byte) (byte, []byte, error) {
return ReadUint8Bytes(b)
}
@ -784,6 +784,22 @@ func ReadStringBytes(b []byte) (string, []byte, error) {
return string(v), o, err
}
// ReadStringAsBytes reads a 'str' object
// into a slice of bytes. 'v' is the value of
// the 'str' object, which may reside in memory
// pointed to by 'scratch.' 'o' is the remaining bytes
// in 'b.''
// Possible errors:
// - ErrShortBytes (b not long enough)
// - TypeError{} (not 'str' type)
// - InvalidPrefixError (unknown type marker)
func ReadStringAsBytes(b []byte, scratch []byte) (v []byte, o []byte, err error) {
var tmp []byte
tmp, o, err = ReadStringZC(b)
v = append(scratch[:0], tmp...)
return
}
// ReadComplex128Bytes reads a complex128
// extension object from 'b' and returns the
// remaining bytes.
@ -922,14 +938,14 @@ func ReadIntfBytes(b []byte) (i interface{}, o []byte, err error) {
case ArrayType:
var sz uint32
sz, b, err = ReadArrayHeaderBytes(b)
sz, o, err = ReadArrayHeaderBytes(b)
if err != nil {
return
}
j := make([]interface{}, int(sz))
i = j
for d := range j {
j[d], b, err = ReadIntfBytes(b)
j[d], o, err = ReadIntfBytes(o)
if err != nil {
return
}

View File

@ -0,0 +1,41 @@
// +build !appengine
package msgp
import (
"reflect"
"unsafe"
)
// NOTE:
// all of the definition in this file
// should be repeated in appengine.go,
// but without using unsafe
const (
// spec says int and uint are always
// the same size, but that int/uint
// size may not be machine word size
smallint = unsafe.Sizeof(int(0)) == 4
)
// UnsafeString returns the byte slice as a volatile string
// THIS SHOULD ONLY BE USED BY THE CODE GENERATOR.
// THIS IS EVIL CODE.
// YOU HAVE BEEN WARNED.
func UnsafeString(b []byte) string {
sh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
return *(*string)(unsafe.Pointer(&reflect.StringHeader{Data: sh.Data, Len: sh.Len}))
}
// UnsafeBytes returns the string as a byte slice
// THIS SHOULD ONLY BE USED BY THE CODE GENERATOR.
// THIS IS EVIL CODE.
// YOU HAVE BEEN WARNED.
func UnsafeBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&reflect.SliceHeader{
Len: len(s),
Cap: len(s),
Data: (*(*reflect.StringHeader)(unsafe.Pointer(&s))).Data,
}))
}

View File

@ -10,13 +10,6 @@ import (
"time"
)
func abs(i int64) int64 {
if i < 0 {
return -i
}
return i
}
// Sizer is an interface implemented
// by types that can estimate their
// size when MessagePack encoded.
@ -59,15 +52,26 @@ func pushWriter(wr *Writer) {
// it will cause undefined behavior.
func freeW(w *Writer) { pushWriter(w) }
// Require ensures that cap(old)-len(old) >= extra
// Require ensures that cap(old)-len(old) >= extra.
func Require(old []byte, extra int) []byte {
if cap(old)-len(old) >= extra {
l := len(old)
c := cap(old)
r := l + extra
if c >= r {
return old
}
if len(old) == 0 {
} else if l == 0 {
return make([]byte, 0, extra)
}
n := make([]byte, len(old), cap(old)-len(old)+extra)
// the new size is the greater
// of double the old capacity
// and the sum of the old length
// and the number of new bytes
// necessary.
c <<= 1
if c < r {
c = r
}
n := make([]byte, l, c)
copy(n, old)
return n
}
@ -184,6 +188,17 @@ func (mw *Writer) require(n int) (int, error) {
return wl, nil
}
func (mw *Writer) Append(b ...byte) error {
if mw.avail() < len(b) {
err := mw.flush()
if err != nil {
return err
}
}
mw.wloc += copy(mw.buf[mw.wloc:], b)
return nil
}
// push one byte onto the buffer
//
// NOTE: this is a hot code path
@ -289,9 +304,9 @@ func (mw *Writer) Reset(w io.Writer) {
// size to the writer
func (mw *Writer) WriteMapHeader(sz uint32) error {
switch {
case sz < 16:
case sz <= 15:
return mw.push(wfixmap(uint8(sz)))
case sz < math.MaxUint16:
case sz <= math.MaxUint16:
return mw.prefix16(mmap16, uint16(sz))
default:
return mw.prefix32(mmap32, sz)
@ -302,9 +317,9 @@ func (mw *Writer) WriteMapHeader(sz uint32) error {
// given size to the writer
func (mw *Writer) WriteArrayHeader(sz uint32) error {
switch {
case sz < 16:
case sz <= 15:
return mw.push(wfixarray(uint8(sz)))
case sz < math.MaxUint16:
case sz <= math.MaxUint16:
return mw.prefix16(marray16, uint16(sz))
default:
return mw.prefix32(marray32, sz)
@ -328,17 +343,26 @@ func (mw *Writer) WriteFloat32(f float32) error {
// WriteInt64 writes an int64 to the writer
func (mw *Writer) WriteInt64(i int64) error {
a := abs(i)
if i >= 0 {
switch {
case i <= math.MaxInt8:
return mw.push(wfixint(uint8(i)))
case i <= math.MaxInt16:
return mw.prefix16(mint16, uint16(i))
case i <= math.MaxInt32:
return mw.prefix32(mint32, uint32(i))
default:
return mw.prefix64(mint64, uint64(i))
}
}
switch {
case i < 0 && i > -32:
case i >= -32:
return mw.push(wnfixint(int8(i)))
case i >= 0 && i < 128:
return mw.push(wfixint(uint8(i)))
case a < math.MaxInt8:
case i >= math.MinInt8:
return mw.prefix8(mint8, uint8(i))
case a < math.MaxInt16:
case i >= math.MinInt16:
return mw.prefix16(mint16, uint16(i))
case a < math.MaxInt32:
case i >= math.MinInt32:
return mw.prefix32(mint32, uint32(i))
default:
return mw.prefix64(mint64, uint64(i))
@ -360,20 +384,20 @@ func (mw *Writer) WriteInt(i int) error { return mw.WriteInt64(int64(i)) }
// WriteUint64 writes a uint64 to the writer
func (mw *Writer) WriteUint64(u uint64) error {
switch {
case u < (1 << 7):
case u <= (1<<7)-1:
return mw.push(wfixint(uint8(u)))
case u < math.MaxUint8:
case u <= math.MaxUint8:
return mw.prefix8(muint8, uint8(u))
case u < math.MaxUint16:
case u <= math.MaxUint16:
return mw.prefix16(muint16, uint16(u))
case u < math.MaxUint32:
case u <= math.MaxUint32:
return mw.prefix32(muint32, uint32(u))
default:
return mw.prefix64(muint64, u)
}
}
// WriteByte is analagous to WriteUint8
// WriteByte is analogous to WriteUint8
func (mw *Writer) WriteByte(u byte) error { return mw.WriteUint8(uint8(u)) }
// WriteUint8 writes a uint8 to the writer
@ -393,9 +417,9 @@ func (mw *Writer) WriteBytes(b []byte) error {
sz := uint32(len(b))
var err error
switch {
case sz < math.MaxUint8:
case sz <= math.MaxUint8:
err = mw.prefix8(mbin8, uint8(sz))
case sz < math.MaxUint16:
case sz <= math.MaxUint16:
err = mw.prefix16(mbin16, uint16(sz))
default:
err = mw.prefix32(mbin32, sz)
@ -407,6 +431,20 @@ func (mw *Writer) WriteBytes(b []byte) error {
return err
}
// WriteBytesHeader writes just the size header
// of a MessagePack 'bin' object. The user is responsible
// for then writing 'sz' more bytes into the stream.
func (mw *Writer) WriteBytesHeader(sz uint32) error {
switch {
case sz <= math.MaxUint8:
return mw.prefix8(mbin8, uint8(sz))
case sz <= math.MaxUint16:
return mw.prefix16(mbin16, uint16(sz))
default:
return mw.prefix32(mbin32, sz)
}
}
// WriteBool writes a bool to the writer
func (mw *Writer) WriteBool(b bool) error {
if b {
@ -421,11 +459,11 @@ func (mw *Writer) WriteString(s string) error {
sz := uint32(len(s))
var err error
switch {
case sz < 32:
case sz <= 31:
err = mw.push(wfixstr(uint8(sz)))
case sz < math.MaxUint8:
case sz <= math.MaxUint8:
err = mw.prefix8(mstr8, uint8(sz))
case sz < math.MaxUint16:
case sz <= math.MaxUint16:
err = mw.prefix16(mstr16, uint16(sz))
default:
err = mw.prefix32(mstr32, sz)
@ -436,6 +474,45 @@ func (mw *Writer) WriteString(s string) error {
return mw.writeString(s)
}
// WriteStringHeader writes just the string size
// header of a MessagePack 'str' object. The user
// is responsible for writing 'sz' more valid UTF-8
// bytes to the stream.
func (mw *Writer) WriteStringHeader(sz uint32) error {
switch {
case sz <= 31:
return mw.push(wfixstr(uint8(sz)))
case sz <= math.MaxUint8:
return mw.prefix8(mstr8, uint8(sz))
case sz <= math.MaxUint16:
return mw.prefix16(mstr16, uint16(sz))
default:
return mw.prefix32(mstr32, sz)
}
}
// WriteStringFromBytes writes a 'str' object
// from a []byte.
func (mw *Writer) WriteStringFromBytes(str []byte) error {
sz := uint32(len(str))
var err error
switch {
case sz <= 31:
err = mw.push(wfixstr(uint8(sz)))
case sz <= math.MaxUint8:
err = mw.prefix8(mstr8, uint8(sz))
case sz <= math.MaxUint16:
err = mw.prefix16(mstr16, uint16(sz))
default:
err = mw.prefix32(mstr32, sz)
}
if err != nil {
return err
}
_, err = mw.Write(str)
return err
}
// WriteComplex64 writes a complex64 to the writer
func (mw *Writer) WriteComplex64(f complex64) error {
o, err := mw.require(10)
@ -509,7 +586,7 @@ func (mw *Writer) WriteMapStrIntf(mp map[string]interface{}) (err error) {
// elapsed since "zero" Unix time, followed by 4 bytes
// for a big-endian 32-bit signed integer denoting
// the nanosecond offset of the time. This encoding
// is intended to ease portability accross languages.
// is intended to ease portability across languages.
// (Note that this is *not* the standard time.Time
// binary encoding, because its implementation relies
// heavily on the internal representation used by the
@ -612,7 +689,7 @@ func (mw *Writer) WriteIntf(v interface{}) error {
}
func (mw *Writer) writeMap(v reflect.Value) (err error) {
if v.Elem().Kind() != reflect.String {
if v.Type().Key().Kind() != reflect.String {
return errors.New("msgp: map keys must be strings")
}
ks := v.MapKeys()

View File

@ -22,10 +22,10 @@ func ensure(b []byte, sz int) ([]byte, int) {
// given size to the slice
func AppendMapHeader(b []byte, sz uint32) []byte {
switch {
case sz < 16:
case sz <= 15:
return append(b, wfixmap(uint8(sz)))
case sz < math.MaxUint16:
case sz <= math.MaxUint16:
o, n := ensure(b, 3)
prefixu16(o[n:], mmap16, uint16(sz))
return o
@ -41,10 +41,10 @@ func AppendMapHeader(b []byte, sz uint32) []byte {
// the given size to the slice
func AppendArrayHeader(b []byte, sz uint32) []byte {
switch {
case sz < 16:
case sz <= 15:
return append(b, wfixarray(uint8(sz)))
case sz < math.MaxUint16:
case sz <= math.MaxUint16:
o, n := ensure(b, 3)
prefixu16(o[n:], marray16, uint16(sz))
return o
@ -75,29 +75,39 @@ func AppendFloat32(b []byte, f float32) []byte {
// AppendInt64 appends an int64 to the slice
func AppendInt64(b []byte, i int64) []byte {
a := abs(i)
if i >= 0 {
switch {
case i <= math.MaxInt8:
return append(b, wfixint(uint8(i)))
case i <= math.MaxInt16:
o, n := ensure(b, 3)
putMint16(o[n:], int16(i))
return o
case i <= math.MaxInt32:
o, n := ensure(b, 5)
putMint32(o[n:], int32(i))
return o
default:
o, n := ensure(b, 9)
putMint64(o[n:], i)
return o
}
}
switch {
case i < 0 && i > -32:
case i >= -32:
return append(b, wnfixint(int8(i)))
case i >= 0 && i < 128:
return append(b, wfixint(uint8(i)))
case a < math.MaxInt8:
case i >= math.MinInt8:
o, n := ensure(b, 2)
putMint8(o[n:], int8(i))
return o
case a < math.MaxInt16:
case i >= math.MinInt16:
o, n := ensure(b, 3)
putMint16(o[n:], int16(i))
return o
case a < math.MaxInt32:
case i >= math.MinInt32:
o, n := ensure(b, 5)
putMint32(o[n:], int32(i))
return o
default:
o, n := ensure(b, 9)
putMint64(o[n:], i)
@ -120,20 +130,20 @@ func AppendInt32(b []byte, i int32) []byte { return AppendInt64(b, int64(i)) }
// AppendUint64 appends a uint64 to the slice
func AppendUint64(b []byte, u uint64) []byte {
switch {
case u < (1 << 7):
case u <= (1<<7)-1:
return append(b, wfixint(uint8(u)))
case u < math.MaxUint8:
case u <= math.MaxUint8:
o, n := ensure(b, 2)
putMuint8(o[n:], uint8(u))
return o
case u < math.MaxUint16:
case u <= math.MaxUint16:
o, n := ensure(b, 3)
putMuint16(o[n:], uint16(u))
return o
case u < math.MaxUint32:
case u <= math.MaxUint32:
o, n := ensure(b, 5)
putMuint32(o[n:], uint32(u))
return o
@ -152,7 +162,7 @@ func AppendUint(b []byte, u uint) []byte { return AppendUint64(b, uint64(u)) }
// AppendUint8 appends a uint8 to the slice
func AppendUint8(b []byte, u uint8) []byte { return AppendUint64(b, uint64(u)) }
// AppendByte is analagous to AppendUint8
// AppendByte is analogous to AppendUint8
func AppendByte(b []byte, u byte) []byte { return AppendUint8(b, uint8(u)) }
// AppendUint16 appends a uint16 to the slice
@ -167,11 +177,11 @@ func AppendBytes(b []byte, bts []byte) []byte {
var o []byte
var n int
switch {
case sz < math.MaxUint8:
case sz <= math.MaxUint8:
o, n = ensure(b, 2+sz)
prefixu8(o[n:], mbin8, uint8(sz))
n += 2
case sz < math.MaxUint16:
case sz <= math.MaxUint16:
o, n = ensure(b, 3+sz)
prefixu16(o[n:], mbin16, uint16(sz))
n += 3
@ -197,15 +207,15 @@ func AppendString(b []byte, s string) []byte {
var n int
var o []byte
switch {
case sz < 32:
case sz <= 31:
o, n = ensure(b, 1+sz)
o[n] = wfixstr(uint8(sz))
n++
case sz < math.MaxUint8:
case sz <= math.MaxUint8:
o, n = ensure(b, 2+sz)
prefixu8(o[n:], mstr8, uint8(sz))
n += 2
case sz < math.MaxUint16:
case sz <= math.MaxUint16:
o, n = ensure(b, 3+sz)
prefixu16(o[n:], mstr16, uint16(sz))
n += 3
@ -217,6 +227,33 @@ func AppendString(b []byte, s string) []byte {
return o[:n+copy(o[n:], s)]
}
// AppendStringFromBytes appends a []byte
// as a MessagePack 'str' to the slice 'b.'
func AppendStringFromBytes(b []byte, str []byte) []byte {
sz := len(str)
var n int
var o []byte
switch {
case sz <= 31:
o, n = ensure(b, 1+sz)
o[n] = wfixstr(uint8(sz))
n++
case sz <= math.MaxUint8:
o, n = ensure(b, 2+sz)
prefixu8(o[n:], mstr8, uint8(sz))
n += 2
case sz <= math.MaxUint16:
o, n = ensure(b, 3+sz)
prefixu16(o[n:], mstr16, uint16(sz))
n += 3
default:
o, n = ensure(b, 5+sz)
prefixu32(o[n:], mstr32, uint32(sz))
n += 5
}
return o[:n+copy(o[n:], str)]
}
// AppendComplex64 appends a complex64 to the slice as a MessagePack extension
func AppendComplex64(b []byte, c complex64) []byte {
o, n := ensure(b, Complex64Size)
@ -362,7 +399,12 @@ func AppendIntf(b []byte, i interface{}) ([]byte, error) {
}
}
return b, nil
case reflect.Ptr:
if v.IsNil() {
return AppendNil(b), err
}
b, err = AppendIntf(b, v.Elem().Interface())
return b, err
default:
return b, &ErrUnsupportedType{T: v.Type()}
}