From 6eeff65133da199b3c97153519831d0da46d34e8 Mon Sep 17 00:00:00 2001 From: Simon Eskildsen Date: Wed, 25 Mar 2015 03:09:45 +0000 Subject: [PATCH 1/2] listenbuffer: add test Signed-off-by: Simon Eskildsen Upstream-commit: b6b8032a1759905adbb68355e994b0405054bcc0 Component: engine --- .../pkg/listenbuffer/listen_buffer_test.go | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) create mode 100644 components/engine/pkg/listenbuffer/listen_buffer_test.go diff --git a/components/engine/pkg/listenbuffer/listen_buffer_test.go b/components/engine/pkg/listenbuffer/listen_buffer_test.go new file mode 100644 index 0000000000..6ffd2f7984 --- /dev/null +++ b/components/engine/pkg/listenbuffer/listen_buffer_test.go @@ -0,0 +1,41 @@ +package listenbuffer + +import ( + "io/ioutil" + "net" + "testing" +) + +func TestListenBufferAllowsAcceptingWhenActivated(t *testing.T) { + lock := make(chan struct{}) + buffer, err := NewListenBuffer("tcp", "", lock) + if err != nil { + t.Fatal("Unable to create listen buffer: ", err) + } + + go func() { + conn, err := net.Dial("tcp", buffer.Addr().String()) + if err != nil { + t.Fatal("Client failed to establish connection to server: ", err) + } + + conn.Write([]byte("ping")) + conn.Close() + }() + + close(lock) + + client, err := buffer.Accept() + if err != nil { + t.Fatal("Failed to accept client: ", err) + } + + response, err := ioutil.ReadAll(client) + if err != nil { + t.Fatal("Failed to read from client: ", err) + } + + if string(response) != "ping" { + t.Fatal("Expected to receive ping from client, received: ", string(response)) + } +} From 04fe8599157adfdb59afaa3e7cd9bd66b40fa6fb Mon Sep 17 00:00:00 2001 From: Simon Eskildsen Date: Wed, 25 Mar 2015 03:09:36 +0000 Subject: [PATCH 2/2] listenbuffer: add docs Signed-off-by: Simon Eskildsen Upstream-commit: 67bd859481a9d5c7a2ccf4c593e65d473ab3f106 Component: engine --- components/engine/pkg/listenbuffer/README.md | 27 ++++++++++++ components/engine/pkg/listenbuffer/buffer.go | 44 ++++++++++++++++---- 2 files changed, 64 insertions(+), 7 deletions(-) create mode 100644 components/engine/pkg/listenbuffer/README.md diff --git a/components/engine/pkg/listenbuffer/README.md b/components/engine/pkg/listenbuffer/README.md new file mode 100644 index 0000000000..2273509817 --- /dev/null +++ b/components/engine/pkg/listenbuffer/README.md @@ -0,0 +1,27 @@ +# listenbuffer + +listenbuffer uses the kernel's listening backlog functionality to queue +connections, allowing applications to start listening immediately and handle +connections later. This is signaled by closing the activation channel passed to +the constructor. + +The maximum amount of queued connections depends on the configuration of your +kernel (typically called SOMAXXCON) and cannot be configured in Go with the +net package. See `src/net/sock_platform.go` in the Go tree or consult your +kernel's manual. + + activator := make(chan struct{}) + buffer, err := NewListenBuffer("tcp", "localhost:4000", activator) + if err != nil { + panic(err) + } + + // will block until activator has been closed or is sent an event + client, err := buffer.Accept() + +Somewhere else in your application once it's been booted: + + close(activator) + +`buffer.Accept()` will return the first client in the kernel listening queue, or +continue to block until a client connects or an error occurs. diff --git a/components/engine/pkg/listenbuffer/buffer.go b/components/engine/pkg/listenbuffer/buffer.go index 17572c8a0e..6e3656d2c4 100644 --- a/components/engine/pkg/listenbuffer/buffer.go +++ b/components/engine/pkg/listenbuffer/buffer.go @@ -1,13 +1,37 @@ /* - Package to allow go applications to immediately start - listening on a socket, unix, tcp, udp but hold connections - until the application has booted and is ready to accept them +listenbuffer uses the kernel's listening backlog functionality to queue +connections, allowing applications to start listening immediately and handle +connections later. This is signaled by closing the activation channel passed to +the constructor. + +The maximum amount of queued connections depends on the configuration of your +kernel (typically called SOMAXXCON) and cannot be configured in Go with the +net package. See `src/net/sock_platform.go` in the Go tree or consult your +kernel's manual. + + activator := make(chan struct{}) + buffer, err := NewListenBuffer("tcp", "localhost:4000", activator) + if err != nil { + panic(err) + } + + // will block until activator has been closed or is sent an event + client, err := buffer.Accept() + +Somewhere else in your application once it's been booted: + + close(activator) + +`buffer.Accept()` will return the first client in the kernel listening queue, or +continue to block until a client connects or an error occurs. */ package listenbuffer import "net" -// NewListenBuffer returns a listener listening on addr with the protocol. +// NewListenBuffer returns a net.Listener listening on addr with the protocol +// passed. The channel passed is used to activate the listenbuffer when the +// caller is ready to accept connections. func NewListenBuffer(proto, addr string, activate chan struct{}) (net.Listener, error) { wrapped, err := net.Listen(proto, addr) if err != nil { @@ -20,20 +44,26 @@ func NewListenBuffer(proto, addr string, activate chan struct{}) (net.Listener, }, nil } +// defaultListener is the buffered wrapper around the net.Listener type defaultListener struct { - wrapped net.Listener // the real listener to wrap - ready bool // is the listner ready to start accpeting connections - activate chan struct{} + wrapped net.Listener // The net.Listener wrapped by listenbuffer + ready bool // Whether the listenbuffer has been activated + activate chan struct{} // Channel to control activation of the listenbuffer } +// Close closes the wrapped socket. func (l *defaultListener) Close() error { return l.wrapped.Close() } +// Addr returns the listening address of the wrapped socket. func (l *defaultListener) Addr() net.Addr { return l.wrapped.Addr() } +// Accept returns a client connection on the wrapped socket if the listen buffer +// has been activated. To active the listenbuffer the activation channel passed +// to NewListenBuffer must have been closed or sent an event. func (l *defaultListener) Accept() (net.Conn, error) { // if the listen has been told it is ready then we can go ahead and // start returning connections