diff --git a/go.mod b/go.mod index 62287be..7100ab3 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/volatiletech/sqlboiler-sqlite3 v0.0.0-20210314195744-a1c697a68aef // indirect github.com/volatiletech/sqlboiler/v4 v4.5.0 github.com/volatiletech/strmangle v0.0.1 - go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c + go.cryptoscope.co/muxrpc/v2 v2.0.2 go.cryptoscope.co/netwrap v0.1.1 go.cryptoscope.co/secretstream v1.2.2 go.mindeco.de v1.11.0 diff --git a/go.sum b/go.sum index 908f976..1dfd0dd 100644 --- a/go.sum +++ b/go.sum @@ -499,6 +499,8 @@ go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c h1:+78PDKICrSKw go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA= go.cryptoscope.co/muxrpc/v2 v2.0.1 h1:U1dS1dsFk7/LygH8o2qsWy8dHB5g3YeLtm0lsN7c1CI= go.cryptoscope.co/muxrpc/v2 v2.0.1/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA= +go.cryptoscope.co/muxrpc/v2 v2.0.2 h1:UdlGHY+EEYZpJz5HWnWz0r34pULYxJHfFTeqLvv+7sA= +go.cryptoscope.co/muxrpc/v2 v2.0.2/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA= go.cryptoscope.co/netwrap v0.1.0/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew= go.cryptoscope.co/netwrap v0.1.1 h1:JLzzGKEvrUrkKzu3iM0DhpHmt+L/gYqmpcf1lJMUyFs= go.cryptoscope.co/netwrap v0.1.1/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew= diff --git a/muxrpc/handlers/tunnel/server/connect.go b/muxrpc/handlers/tunnel/server/connect.go index 8cc4b44..cb226cb 100644 --- a/muxrpc/handlers/tunnel/server/connect.go +++ b/muxrpc/handlers/tunnel/server/connect.go @@ -17,13 +17,13 @@ import ( refs "go.mindeco.de/ssb-refs" ) -type connectArg struct { +type ConnectArg struct { Portal refs.FeedRef `json:"portal"` // the room server Target refs.FeedRef `json:"target"` // which peer the initiator/caller wants to be tunneld to } type connectWithOriginArg struct { - connectArg + ConnectArg Origin refs.FeedRef `json:"origin"` // who started the call } @@ -50,7 +50,7 @@ func (h connectHandler) HandleConnect(ctx context.Context, edp muxrpc.Endpoint) // HandleDuplex here implements the tunnel.connect behavior of the server-side. It receives incoming events func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error { // unpack arguments - var args []connectArg + var args []ConnectArg err := json.Unmarshal(req.RawArgs, &args) if err != nil { return fmt.Errorf("connect: invalid arguments: %w", err) @@ -80,7 +80,7 @@ func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, p // call connect on them var argWorigin connectWithOriginArg - argWorigin.connectArg = arg + argWorigin.ConnectArg = arg argWorigin.Origin = *caller targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin) diff --git a/muxrpc/test/go/endpoints_test.go b/muxrpc/test/go/endpoints_test.go index a1abf11..7e2d6ec 100644 --- a/muxrpc/test/go/endpoints_test.go +++ b/muxrpc/test/go/endpoints_test.go @@ -2,7 +2,6 @@ package go_test import ( "context" - "encoding/base64" "encoding/json" "os" "path/filepath" @@ -12,12 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.cryptoscope.co/muxrpc/v2" - "go.cryptoscope.co/muxrpc/v2/debug" - "go.cryptoscope.co/netwrap" - "go.cryptoscope.co/secretstream" - "github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/keys" - "github.com/ssb-ngi-pointer/go-ssb-room/roomdb" refs "go.mindeco.de/ssb-refs" ) @@ -40,9 +34,9 @@ func TestEndpointClients(t *testing.T) { ctx = ts.ctx // create three test clients - alf, alfFeed := ts.makeTestClient("alf") - bre, breFeed := ts.makeTestClient("bre") - carl, carlFeed := ts.makeTestClient("carl") + alf := ts.makeTestClient("alf") + bre := ts.makeTestClient("bre") + carl := ts.makeTestClient("carl") // let carl join the room // carl wont announce to emulate manyverse @@ -51,10 +45,10 @@ func TestEndpointClients(t *testing.T) { t.Log("carl opened endpoints") announcementsForCarl := make(announcements) - go logStream(ts, carlEndpointsSerc, "carl", announcementsForCarl) + go logEndpointsStream(ts, carlEndpointsSerc, "carl", announcementsForCarl) time.Sleep(1 * time.Second) // give some time to process new events - _, seen := announcementsForCarl[carlFeed.Ref()] + _, seen := announcementsForCarl[carl.feed.Ref()] a.True(seen, "carl saw himself") // let alf join the room @@ -62,17 +56,17 @@ func TestEndpointClients(t *testing.T) { r.NoError(err) announcementsForAlf := make(announcements) - go logStream(ts, alfEndpointsSerc, "alf", announcementsForAlf) + go logEndpointsStream(ts, alfEndpointsSerc, "alf", announcementsForAlf) time.Sleep(1 * time.Second) // give some time to process new events // assert what alf saw - _, seen = announcementsForAlf[carlFeed.Ref()] + _, seen = announcementsForAlf[carl.feed.Ref()] a.True(seen, "alf saw carl") - _, seen = announcementsForAlf[alfFeed.Ref()] + _, seen = announcementsForAlf[alf.feed.Ref()] a.True(seen, "alf saw himself") // assert what carl saw - _, seen = announcementsForCarl[alfFeed.Ref()] + _, seen = announcementsForCarl[alf.feed.Ref()] a.True(seen, "carl saw alf") // let bre join the room @@ -80,22 +74,22 @@ func TestEndpointClients(t *testing.T) { r.NoError(err) announcementsForBre := make(announcements) - go logStream(ts, breEndpointsSrc, "bre", announcementsForBre) + go logEndpointsStream(ts, breEndpointsSrc, "bre", announcementsForBre) time.Sleep(1 * time.Second) // give some time to process new events // assert bre saw the other two and herself - _, seen = announcementsForBre[carlFeed.Ref()] + _, seen = announcementsForBre[carl.feed.Ref()] a.True(seen, "bre saw carl") - _, seen = announcementsForBre[alfFeed.Ref()] + _, seen = announcementsForBre[alf.feed.Ref()] a.True(seen, "bre saw alf") - _, seen = announcementsForBre[breFeed.Ref()] + _, seen = announcementsForBre[bre.feed.Ref()] a.True(seen, "bre saw herself") // assert the others saw bre - _, seen = announcementsForAlf[breFeed.Ref()] + _, seen = announcementsForAlf[bre.feed.Ref()] a.True(seen, "alf saw bre") - _, seen = announcementsForCarl[breFeed.Ref()] + _, seen = announcementsForCarl[bre.feed.Ref()] a.True(seen, "carl saw bre") // terminate server and the clients @@ -110,7 +104,8 @@ func TestEndpointClients(t *testing.T) { cancel() } -func logStream(ts *testSession, src *muxrpc.ByteSource, who string, a announcements) { +// consume endpoint messaes and put each peer on the passed map +func logEndpointsStream(ts *testSession, src *muxrpc.ByteSource, who string, a announcements) { var edps []refs.FeedRef for src.Next(ts.ctx) { @@ -140,58 +135,3 @@ func logStream(ts *testSession, src *muxrpc.ByteSource, who string, a announceme ts.t.Log(who, "stream closed") } - -func (ts *testSession) makeTestClient(name string) (muxrpc.Endpoint, refs.FeedRef) { - r := require.New(ts.t) - - // create a fresh keypairs for the clients - client, err := keys.NewKeyPair(nil) - r.NoError(err) - - ts.t.Log(name, "is", client.Feed.ShortRef()) - - // add it as a memeber - memberID, err := ts.srv.Members.Add(ts.ctx, client.Feed, roomdb.RoleMember) - r.NoError(err) - ts.t.Log(name, "is member ID:", memberID) - - // default app key for the secret-handshake connection - ak, err := base64.StdEncoding.DecodeString("1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=") - r.NoError(err) - - // create a shs client to authenticate and encrypt the connection - clientSHS, err := secretstream.NewClient(client.Pair, ak) - r.NoError(err) - - // returns a new connection that went through shs and does boxstream - - tcpAddr := netwrap.GetAddr(ts.srv.Network.GetListenAddr(), "tcp") - - authedConn, err := netwrap.Dial(tcpAddr, clientSHS.ConnWrapper(ts.srv.Whoami().PubKey())) - r.NoError(err) - - var muxMock muxrpc.FakeHandler - - testPath := filepath.Join("testrun", ts.t.Name()) - debugConn := debug.Dump(filepath.Join(testPath, "client-"+name), authedConn) - pkr := muxrpc.NewPacker(debugConn) - - wsEndpoint := muxrpc.Handle(pkr, &muxMock, muxrpc.WithContext(ts.ctx)) - - srv := wsEndpoint.(muxrpc.Server) - ts.serveGroup.Go(func() error { - err = srv.Serve() - if err != nil { - ts.t.Logf("mux server %s error: %v", name, err) - } - return err - }) - - // check we are talking to a room - var yup bool - err = wsEndpoint.Async(ts.ctx, &yup, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "isRoom"}) - r.NoError(err) - r.True(yup, "server is not a room?") - - return wsEndpoint, client.Feed -} diff --git a/muxrpc/test/go/roomstate_test.go b/muxrpc/test/go/roomstate_test.go new file mode 100644 index 0000000..a3424d0 --- /dev/null +++ b/muxrpc/test/go/roomstate_test.go @@ -0,0 +1,158 @@ +package go_test + +import ( + "bytes" + "context" + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "go.cryptoscope.co/muxrpc/v2" + + "github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/tunnel/server" +) + +// peers not on the members list can't connect +func TestStaleMembers(t *testing.T) { + testInit(t) + + r := require.New(t) + + testPath := filepath.Join("testrun", t.Name()) + os.RemoveAll(testPath) + + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) + + // create the roomsrv + ts := makeNamedTestBot(t, "server", ctx, nil) + ctx = ts.ctx + + // two new clients, connected to server automaticaly + // default to member during makeNamedTestBot + tal := ts.makeTestClient("tal") // https://en.wikipedia.org/wiki/Tal_(name) + srh := ts.makeTestClient("srh") // https://en.wikipedia.org/wiki/Sarah + + // announce srh so that tal could connect + var ok bool + err := srh.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"}) + r.NoError(err) + r.True(ok) + + _, has := ts.srv.StateManager.Has(srh.feed) + r.True(has, "srh should be connected") + + // shut down srh + srh.Terminate() + time.Sleep(1 * time.Second) + _, has = ts.srv.StateManager.Has(srh.feed) + r.False(has, "srh shouldn't be connected") + + // try to connect srh + var arg server.ConnectArg + arg.Portal = ts.srv.Whoami() + arg.Target = srh.feed + + src, snk, err := tal.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"room", "connect"}, arg) + r.NoError(err) + + time.Sleep(1 * time.Second) // let server respond + + // assert cant read + r.False(src.Next(ctx), "source should be cancled") + r.Error(src.Err(), "source should have an error") + + // assert cant write + testKexMsg := []byte("fake keyexchange") + _, err = snk.Write(testKexMsg) + r.Error(err, "stream should should be canceled") + + // restart srh + oldSrh := srh.feed + srh = ts.makeTestClient("srh") + r.True(oldSrh.Equal(&srh.feed)) + t.Log("restarted srh") + + time.Sleep(1 * time.Second) // let server respond + + // announce srh so that tal can connect + err = srh.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"}) + r.NoError(err) + r.True(ok) + + testKexReply := []byte("fake kex reply") + + // prepare srh for incoming call + receivedCall := make(chan struct{}) + receivedKexMessage := make(chan struct{}) + srh.mockedHandler.HandledCalls(func(m muxrpc.Method) bool { return m.String() == "tunnel.connect" }) + srh.mockedHandler.HandleCallCalls(func(ctx context.Context, req *muxrpc.Request) { + m := req.Method.String() + t.Log("received call: ", m) + if m != "tunnel.connect" { + return + } + t.Log("correct call received") + close(receivedCall) + + // receive a msg + src, err := req.ResponseSource() + if err != nil { + panic(fmt.Errorf("expected source for duplex call: %s", err)) + } + + src.Next(ctx) + + gotKexMsg, err := src.Bytes() + if err != nil { + panic(err) + } + + if !bytes.Equal(testKexMsg, gotKexMsg) { + panic(fmt.Sprintf("wrong kex message: %q", gotKexMsg)) + } + + close(receivedKexMessage) + + // send a msg + snk, err := req.ResponseSink() + if err != nil { + panic(fmt.Errorf("expected sink for duplex call: %s", err)) + } + snk.Write(testKexReply) + }) + + // 2nd try to connect (should work) + src, snk, err = tal.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, arg) + r.NoError(err) + + <-receivedCall + + _, err = snk.Write(testKexMsg) + r.NoError(err, "stream should should be canceled") + + <-receivedKexMessage + + // assert can read + r.True(src.Next(ctx), "source should be cancled") + r.NoError(src.Err(), "source should have an error") + + gotKexMsgFromSrh, err := src.Bytes() + r.NoError(err) + r.Equal(testKexReply, gotKexMsgFromSrh) + + // shut everythign down + ts.srv.Shutdown() + tal.Terminate() + srh.Terminate() + ts.srv.Close() + + // wait for all muxrpc serve()s to exit + r.NoError(ts.serveGroup.Wait()) + cancel() + +} diff --git a/muxrpc/test/go/simple_test.go b/muxrpc/test/go/simple_test.go index 2dc6821..85727e7 100644 --- a/muxrpc/test/go/simple_test.go +++ b/muxrpc/test/go/simple_test.go @@ -7,15 +7,17 @@ import ( "testing" "time" - "github.com/ssb-ngi-pointer/go-ssb-room/roomdb" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.cryptoscope.co/muxrpc/v2" + + "github.com/ssb-ngi-pointer/go-ssb-room/roomdb" refs "go.mindeco.de/ssb-refs" ) func TestTunnelServerSimple(t *testing.T) { + testInit(t) + // defer leakcheck.Check(t) ctx, cancel := context.WithCancel(context.Background()) theBots := createServerAndBots(t, ctx, 2) diff --git a/muxrpc/test/go/utils_test.go b/muxrpc/test/go/utils_test.go index a9ef813..ef4e674 100644 --- a/muxrpc/test/go/utils_test.go +++ b/muxrpc/test/go/utils_test.go @@ -5,6 +5,7 @@ package go_test import ( "context" "crypto/rand" + "encoding/base64" "errors" "fmt" "io" @@ -15,22 +16,25 @@ import ( "testing" "time" - "golang.org/x/sync/errgroup" - - "github.com/ssb-ngi-pointer/go-ssb-room/roomdb" - "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" _ "github.com/mattn/go-sqlite3" "github.com/stretchr/testify/require" + "go.cryptoscope.co/muxrpc/v2" "go.cryptoscope.co/muxrpc/v2/debug" + "go.cryptoscope.co/netwrap" + "go.cryptoscope.co/secretstream" + "golang.org/x/sync/errgroup" + "github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/keys" "github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/testutils" "github.com/ssb-ngi-pointer/go-ssb-room/internal/network" "github.com/ssb-ngi-pointer/go-ssb-room/internal/repo" "github.com/ssb-ngi-pointer/go-ssb-room/internal/signinwithssb" + "github.com/ssb-ngi-pointer/go-ssb-room/roomdb" "github.com/ssb-ngi-pointer/go-ssb-room/roomdb/sqlite" "github.com/ssb-ngi-pointer/go-ssb-room/roomsrv" + refs "go.mindeco.de/ssb-refs" ) var ( @@ -78,6 +82,9 @@ type testSession struct { ctx context.Context serveGroup *errgroup.Group + + // so that we can re-spawn clients + clientKeys map[string]*keys.KeyPair } func makeNamedTestBot(t testing.TB, name string, ctx context.Context, opts []roomsrv.Option) *testSession { @@ -119,8 +126,9 @@ func makeNamedTestBot(t testing.TB, name string, ctx context.Context, opts []roo r.NoError(err) ts := testSession{ - t: t, - srv: theBot, + t: t, + srv: theBot, + clientKeys: make(map[string]*keys.KeyPair), } ts.serveGroup, ts.ctx = errgroup.WithContext(ctx) @@ -132,6 +140,83 @@ func makeNamedTestBot(t testing.TB, name string, ctx context.Context, opts []roo return &ts } +type testClient struct { + muxrpc.Endpoint + + feed refs.FeedRef + + mockedHandler *muxrpc.FakeHandler +} + +func (ts *testSession) makeTestClient(name string) testClient { + r := require.New(ts.t) + + // create a fresh keypairs for the clients + client, has := ts.clientKeys[name] + if !has { + var err error + client, err = keys.NewKeyPair(nil) + r.NoError(err) + ts.clientKeys[name] = client + } + + ts.t.Log(name, "is", client.Feed.ShortRef()) + + // add it as a memeber, if it isnt already + _, err := ts.srv.Members.GetByFeed(ts.ctx, client.Feed) + if errors.Is(err, roomdb.ErrNotFound) { + memberID, err := ts.srv.Members.Add(ts.ctx, client.Feed, roomdb.RoleMember) + r.NoError(err) + ts.t.Log(name, "is member ID:", memberID) + } else { + r.NoError(err) + } + + // default app key for the secret-handshake connection + ak, err := base64.StdEncoding.DecodeString("1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=") + r.NoError(err) + + // create a shs client to authenticate and encrypt the connection + clientSHS, err := secretstream.NewClient(client.Pair, ak) + r.NoError(err) + + // returns a new connection that went through shs and does boxstream + tcpAddr := netwrap.GetAddr(ts.srv.Network.GetListenAddr(), "tcp") + authedConn, err := netwrap.Dial(tcpAddr, clientSHS.ConnWrapper(ts.srv.Whoami().PubKey())) + r.NoError(err) + + // testPath := filepath.Join("testrun", ts.t.Name()) + // debugConn := debug.Dump(filepath.Join(testPath, "client-"+name), authedConn) + pkr := muxrpc.NewPacker(authedConn) + + var muxMock = new(muxrpc.FakeHandler) + wsEndpoint := muxrpc.Handle(pkr, muxMock, + muxrpc.WithLogger(log.With(mainLog, "client", name)), + muxrpc.WithContext(ts.ctx), + ) + + srv := wsEndpoint.(muxrpc.Server) + ts.serveGroup.Go(func() error { + err = srv.Serve() + if err != nil { + ts.t.Logf("mux server %s error: %v", name, err) + } + return err + }) + + // check we are talking to a room + var yup bool + err = wsEndpoint.Async(ts.ctx, &yup, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "isRoom"}) + r.NoError(err) + r.True(yup, "server is not a room?") + + return testClient{ + feed: client.Feed, + Endpoint: wsEndpoint, + mockedHandler: muxMock, + } +} + // TODO: refactor for single test session and use makeTestClient() func createServerAndBots(t *testing.T, ctx context.Context, count uint) []*testSession { testInit(t)