add staleness and tunnel.connect tests
This commit is contained in:
@ -17,13 +17,13 @@ import (
|
|||||||
refs "go.mindeco.de/ssb-refs"
|
refs "go.mindeco.de/ssb-refs"
|
||||||
)
|
)
|
||||||
|
|
||||||
type connectArg struct {
|
type ConnectArg struct {
|
||||||
Portal refs.FeedRef `json:"portal"` // the room server
|
Portal refs.FeedRef `json:"portal"` // the room server
|
||||||
Target refs.FeedRef `json:"target"` // which peer the initiator/caller wants to be tunneld to
|
Target refs.FeedRef `json:"target"` // which peer the initiator/caller wants to be tunneld to
|
||||||
}
|
}
|
||||||
|
|
||||||
type connectWithOriginArg struct {
|
type connectWithOriginArg struct {
|
||||||
connectArg
|
ConnectArg
|
||||||
Origin refs.FeedRef `json:"origin"` // who started the call
|
Origin refs.FeedRef `json:"origin"` // who started the call
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -50,7 +50,7 @@ func (h connectHandler) HandleConnect(ctx context.Context, edp muxrpc.Endpoint)
|
|||||||
// HandleDuplex here implements the tunnel.connect behavior of the server-side. It receives incoming events
|
// HandleDuplex here implements the tunnel.connect behavior of the server-side. It receives incoming events
|
||||||
func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
|
func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
|
||||||
// unpack arguments
|
// unpack arguments
|
||||||
var args []connectArg
|
var args []ConnectArg
|
||||||
err := json.Unmarshal(req.RawArgs, &args)
|
err := json.Unmarshal(req.RawArgs, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("connect: invalid arguments: %w", err)
|
return fmt.Errorf("connect: invalid arguments: %w", err)
|
||||||
@ -80,7 +80,7 @@ func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, p
|
|||||||
|
|
||||||
// call connect on them
|
// call connect on them
|
||||||
var argWorigin connectWithOriginArg
|
var argWorigin connectWithOriginArg
|
||||||
argWorigin.connectArg = arg
|
argWorigin.ConnectArg = arg
|
||||||
argWorigin.Origin = *caller
|
argWorigin.Origin = *caller
|
||||||
|
|
||||||
targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin)
|
targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin)
|
||||||
|
78
muxrpc/test/go/roomstate_test.go
Normal file
78
muxrpc/test/go/roomstate_test.go
Normal file
@ -0,0 +1,78 @@
|
|||||||
|
package go_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.cryptoscope.co/muxrpc/v2"
|
||||||
|
|
||||||
|
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/tunnel/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
// peers not on the members list can't connect
|
||||||
|
func TestStaleMembers(t *testing.T) {
|
||||||
|
r := require.New(t)
|
||||||
|
// a := assert.New(t)
|
||||||
|
|
||||||
|
testPath := filepath.Join("testrun", t.Name())
|
||||||
|
os.RemoveAll(testPath)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
// create the roomsrv
|
||||||
|
ts := makeNamedTestBot(t, "server", ctx, nil)
|
||||||
|
ctx = ts.ctx
|
||||||
|
|
||||||
|
// two new clients, connected to server automaticaly
|
||||||
|
// default to member during makeNamedTestBot
|
||||||
|
tal, _ := ts.makeTestClient("tal")
|
||||||
|
srh, srhFeed := ts.makeTestClient("srh")
|
||||||
|
|
||||||
|
// announce srh so that other could connect
|
||||||
|
var ok bool
|
||||||
|
err := srh.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
|
||||||
|
r.NoError(err)
|
||||||
|
|
||||||
|
_, has := ts.srv.StateManager.Has(srhFeed)
|
||||||
|
r.True(has, "srh should be connected")
|
||||||
|
|
||||||
|
// shut down srh
|
||||||
|
srh.Terminate()
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
_, has = ts.srv.StateManager.Has(srhFeed)
|
||||||
|
r.False(has, "srh shouldn't be connected")
|
||||||
|
|
||||||
|
// try to connect srh
|
||||||
|
var arg server.ConnectArg
|
||||||
|
arg.Portal = ts.srv.Whoami()
|
||||||
|
arg.Target = srhFeed
|
||||||
|
|
||||||
|
src, snk, err := tal.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"room", "connect"}, arg)
|
||||||
|
r.NoError(err)
|
||||||
|
|
||||||
|
// let server respond
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
// assert cant read
|
||||||
|
r.False(src.Next(ctx), "source should be cancled")
|
||||||
|
r.Error(src.Err(), "source should have an error")
|
||||||
|
|
||||||
|
// assert cant write
|
||||||
|
_, err = snk.Write([]byte("fake keyexchange"))
|
||||||
|
r.Error(err, "stream should should be canceled")
|
||||||
|
|
||||||
|
ts.srv.Shutdown()
|
||||||
|
tal.Terminate()
|
||||||
|
ts.srv.Close()
|
||||||
|
|
||||||
|
// wait for all muxrpc serve()s to exit
|
||||||
|
r.NoError(ts.serveGroup.Wait())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
}
|
Reference in New Issue
Block a user