diff --git a/muxrpc/handlers/tunnel/server/connect.go b/muxrpc/handlers/tunnel/server/connect.go index 8cc4b44..cb226cb 100644 --- a/muxrpc/handlers/tunnel/server/connect.go +++ b/muxrpc/handlers/tunnel/server/connect.go @@ -17,13 +17,13 @@ import ( refs "go.mindeco.de/ssb-refs" ) -type connectArg struct { +type ConnectArg struct { Portal refs.FeedRef `json:"portal"` // the room server Target refs.FeedRef `json:"target"` // which peer the initiator/caller wants to be tunneld to } type connectWithOriginArg struct { - connectArg + ConnectArg Origin refs.FeedRef `json:"origin"` // who started the call } @@ -50,7 +50,7 @@ func (h connectHandler) HandleConnect(ctx context.Context, edp muxrpc.Endpoint) // HandleDuplex here implements the tunnel.connect behavior of the server-side. It receives incoming events func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error { // unpack arguments - var args []connectArg + var args []ConnectArg err := json.Unmarshal(req.RawArgs, &args) if err != nil { return fmt.Errorf("connect: invalid arguments: %w", err) @@ -80,7 +80,7 @@ func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, p // call connect on them var argWorigin connectWithOriginArg - argWorigin.connectArg = arg + argWorigin.ConnectArg = arg argWorigin.Origin = *caller targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin) diff --git a/muxrpc/test/go/roomstate_test.go b/muxrpc/test/go/roomstate_test.go new file mode 100644 index 0000000..7009301 --- /dev/null +++ b/muxrpc/test/go/roomstate_test.go @@ -0,0 +1,78 @@ +package go_test + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.cryptoscope.co/muxrpc/v2" + + "github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/tunnel/server" +) + +// peers not on the members list can't connect +func TestStaleMembers(t *testing.T) { + r := require.New(t) + // a := assert.New(t) + + testPath := filepath.Join("testrun", t.Name()) + os.RemoveAll(testPath) + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + // create the roomsrv + ts := makeNamedTestBot(t, "server", ctx, nil) + ctx = ts.ctx + + // two new clients, connected to server automaticaly + // default to member during makeNamedTestBot + tal, _ := ts.makeTestClient("tal") + srh, srhFeed := ts.makeTestClient("srh") + + // announce srh so that other could connect + var ok bool + err := srh.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"}) + r.NoError(err) + + _, has := ts.srv.StateManager.Has(srhFeed) + r.True(has, "srh should be connected") + + // shut down srh + srh.Terminate() + time.Sleep(1 * time.Second) + _, has = ts.srv.StateManager.Has(srhFeed) + r.False(has, "srh shouldn't be connected") + + // try to connect srh + var arg server.ConnectArg + arg.Portal = ts.srv.Whoami() + arg.Target = srhFeed + + src, snk, err := tal.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"room", "connect"}, arg) + r.NoError(err) + + // let server respond + time.Sleep(1 * time.Second) + + // assert cant read + r.False(src.Next(ctx), "source should be cancled") + r.Error(src.Err(), "source should have an error") + + // assert cant write + _, err = snk.Write([]byte("fake keyexchange")) + r.Error(err, "stream should should be canceled") + + ts.srv.Shutdown() + tal.Terminate() + ts.srv.Close() + + // wait for all muxrpc serve()s to exit + r.NoError(ts.serveGroup.Wait()) + cancel() + +}