reconnect testing
This commit is contained in:
parent
3093385635
commit
3969372993
2
go.mod
2
go.mod
@ -29,7 +29,7 @@ require (
|
||||
github.com/volatiletech/sqlboiler-sqlite3 v0.0.0-20210314195744-a1c697a68aef // indirect
|
||||
github.com/volatiletech/sqlboiler/v4 v4.5.0
|
||||
github.com/volatiletech/strmangle v0.0.1
|
||||
go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c
|
||||
go.cryptoscope.co/muxrpc/v2 v2.0.2
|
||||
go.cryptoscope.co/netwrap v0.1.1
|
||||
go.cryptoscope.co/secretstream v1.2.2
|
||||
go.mindeco.de v1.11.0
|
||||
|
2
go.sum
2
go.sum
@ -499,6 +499,8 @@ go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c h1:+78PDKICrSKw
|
||||
go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
|
||||
go.cryptoscope.co/muxrpc/v2 v2.0.1 h1:U1dS1dsFk7/LygH8o2qsWy8dHB5g3YeLtm0lsN7c1CI=
|
||||
go.cryptoscope.co/muxrpc/v2 v2.0.1/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
|
||||
go.cryptoscope.co/muxrpc/v2 v2.0.2 h1:UdlGHY+EEYZpJz5HWnWz0r34pULYxJHfFTeqLvv+7sA=
|
||||
go.cryptoscope.co/muxrpc/v2 v2.0.2/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
|
||||
go.cryptoscope.co/netwrap v0.1.0/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
|
||||
go.cryptoscope.co/netwrap v0.1.1 h1:JLzzGKEvrUrkKzu3iM0DhpHmt+L/gYqmpcf1lJMUyFs=
|
||||
go.cryptoscope.co/netwrap v0.1.1/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
|
||||
|
@ -1,7 +1,9 @@
|
||||
package go_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@ -15,8 +17,9 @@ import (
|
||||
|
||||
// peers not on the members list can't connect
|
||||
func TestStaleMembers(t *testing.T) {
|
||||
testInit(t)
|
||||
|
||||
r := require.New(t)
|
||||
// a := assert.New(t)
|
||||
|
||||
testPath := filepath.Join("testrun", t.Name())
|
||||
os.RemoveAll(testPath)
|
||||
@ -31,13 +34,14 @@ func TestStaleMembers(t *testing.T) {
|
||||
|
||||
// two new clients, connected to server automaticaly
|
||||
// default to member during makeNamedTestBot
|
||||
tal := ts.makeTestClient("tal")
|
||||
srh := ts.makeTestClient("srh")
|
||||
tal := ts.makeTestClient("tal") // https://en.wikipedia.org/wiki/Tal_(name)
|
||||
srh := ts.makeTestClient("srh") // https://en.wikipedia.org/wiki/Sarah
|
||||
|
||||
// announce srh so that other could connect
|
||||
// announce srh so that tal could connect
|
||||
var ok bool
|
||||
err := srh.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
|
||||
r.NoError(err)
|
||||
r.True(ok)
|
||||
|
||||
_, has := ts.srv.StateManager.Has(srh.feed)
|
||||
r.True(has, "srh should be connected")
|
||||
@ -56,18 +60,90 @@ func TestStaleMembers(t *testing.T) {
|
||||
src, snk, err := tal.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"room", "connect"}, arg)
|
||||
r.NoError(err)
|
||||
|
||||
// let server respond
|
||||
time.Sleep(1 * time.Second)
|
||||
time.Sleep(1 * time.Second) // let server respond
|
||||
|
||||
// assert cant read
|
||||
r.False(src.Next(ctx), "source should be cancled")
|
||||
r.Error(src.Err(), "source should have an error")
|
||||
|
||||
// assert cant write
|
||||
_, err = snk.Write([]byte("fake keyexchange"))
|
||||
testKexMsg := []byte("fake keyexchange")
|
||||
_, err = snk.Write(testKexMsg)
|
||||
r.Error(err, "stream should should be canceled")
|
||||
|
||||
// restart srh
|
||||
oldSrh := srh.feed
|
||||
srh = ts.makeTestClient("srh")
|
||||
r.True(oldSrh.Equal(&srh.feed))
|
||||
t.Log("restarted srh")
|
||||
|
||||
time.Sleep(1 * time.Second) // let server respond
|
||||
|
||||
// announce srh so that tal can connect
|
||||
err = srh.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
|
||||
r.NoError(err)
|
||||
r.True(ok)
|
||||
|
||||
testKexReply := []byte("fake kex reply")
|
||||
|
||||
// prepare srh for incoming call
|
||||
receivedCall := make(chan struct{})
|
||||
receivedKexMessage := make(chan struct{})
|
||||
srh.mockedHandler.HandledCalls(func(m muxrpc.Method) bool { return m.String() == "tunnel.connect" })
|
||||
srh.mockedHandler.HandleCallCalls(func(ctx context.Context, req *muxrpc.Request) {
|
||||
m := req.Method.String()
|
||||
t.Log("received call: ", m)
|
||||
if m != "tunnel.connect" {
|
||||
return
|
||||
}
|
||||
t.Log("correct call received")
|
||||
close(receivedCall)
|
||||
|
||||
// receive a msg
|
||||
src, err := req.ResponseSource()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("expected source for duplex call: %s", err))
|
||||
}
|
||||
|
||||
src.Next(ctx)
|
||||
|
||||
gotKexMsg, err := src.Bytes()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if !bytes.Equal(testKexMsg, gotKexMsg) {
|
||||
panic(fmt.Sprintf("wrong kex message: %q", gotKexMsg))
|
||||
}
|
||||
|
||||
close(receivedKexMessage)
|
||||
|
||||
// send a msg
|
||||
snk, err := req.ResponseSink()
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("expected sink for duplex call: %s", err))
|
||||
}
|
||||
snk.Write(testKexReply)
|
||||
})
|
||||
|
||||
// 2nd try to connect (should work)
|
||||
src, snk, err = tal.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, arg)
|
||||
r.NoError(err)
|
||||
|
||||
<-receivedCall
|
||||
|
||||
_, err = snk.Write(testKexMsg)
|
||||
r.NoError(err, "stream should should be canceled")
|
||||
|
||||
<-receivedKexMessage
|
||||
|
||||
// assert can read
|
||||
r.True(src.Next(ctx), "source should be cancled")
|
||||
r.NoError(src.Err(), "source should have an error")
|
||||
|
||||
gotKexMsgFromSrh, err := src.Bytes()
|
||||
r.NoError(err)
|
||||
r.Equal(testKexReply, gotKexMsgFromSrh)
|
||||
|
||||
// shut everythign down
|
||||
ts.srv.Shutdown()
|
||||
|
@ -7,15 +7,17 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.cryptoscope.co/muxrpc/v2"
|
||||
|
||||
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
|
||||
refs "go.mindeco.de/ssb-refs"
|
||||
)
|
||||
|
||||
func TestTunnelServerSimple(t *testing.T) {
|
||||
testInit(t)
|
||||
|
||||
// defer leakcheck.Check(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
theBots := createServerAndBots(t, ctx, 2)
|
||||
|
@ -145,7 +145,7 @@ type testClient struct {
|
||||
|
||||
feed refs.FeedRef
|
||||
|
||||
mockedHandler muxrpc.FakeHandler
|
||||
mockedHandler *muxrpc.FakeHandler
|
||||
}
|
||||
|
||||
func (ts *testSession) makeTestClient(name string) testClient {
|
||||
@ -181,19 +181,19 @@ func (ts *testSession) makeTestClient(name string) testClient {
|
||||
r.NoError(err)
|
||||
|
||||
// returns a new connection that went through shs and does boxstream
|
||||
|
||||
tcpAddr := netwrap.GetAddr(ts.srv.Network.GetListenAddr(), "tcp")
|
||||
|
||||
authedConn, err := netwrap.Dial(tcpAddr, clientSHS.ConnWrapper(ts.srv.Whoami().PubKey()))
|
||||
r.NoError(err)
|
||||
|
||||
var muxMock muxrpc.FakeHandler
|
||||
|
||||
// testPath := filepath.Join("testrun", ts.t.Name())
|
||||
// debugConn := debug.Dump(filepath.Join(testPath, "client-"+name), authedConn)
|
||||
pkr := muxrpc.NewPacker(authedConn)
|
||||
|
||||
wsEndpoint := muxrpc.Handle(pkr, &muxMock, muxrpc.WithContext(ts.ctx))
|
||||
var muxMock = new(muxrpc.FakeHandler)
|
||||
wsEndpoint := muxrpc.Handle(pkr, muxMock,
|
||||
muxrpc.WithLogger(log.With(mainLog, "client", name)),
|
||||
muxrpc.WithContext(ts.ctx),
|
||||
)
|
||||
|
||||
srv := wsEndpoint.(muxrpc.Server)
|
||||
ts.serveGroup.Go(func() error {
|
||||
|
Loading…
Reference in New Issue
Block a user