Merge pull request #198 from ssb-ngi-pointer/room1-tests
check that roomstate doesn't keep stale connections
This commit is contained in:
commit
1a1a25a792
2
go.mod
2
go.mod
|
@ -29,7 +29,7 @@ require (
|
||||||
github.com/volatiletech/sqlboiler-sqlite3 v0.0.0-20210314195744-a1c697a68aef // indirect
|
github.com/volatiletech/sqlboiler-sqlite3 v0.0.0-20210314195744-a1c697a68aef // indirect
|
||||||
github.com/volatiletech/sqlboiler/v4 v4.5.0
|
github.com/volatiletech/sqlboiler/v4 v4.5.0
|
||||||
github.com/volatiletech/strmangle v0.0.1
|
github.com/volatiletech/strmangle v0.0.1
|
||||||
go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c
|
go.cryptoscope.co/muxrpc/v2 v2.0.2
|
||||||
go.cryptoscope.co/netwrap v0.1.1
|
go.cryptoscope.co/netwrap v0.1.1
|
||||||
go.cryptoscope.co/secretstream v1.2.2
|
go.cryptoscope.co/secretstream v1.2.2
|
||||||
go.mindeco.de v1.11.0
|
go.mindeco.de v1.11.0
|
||||||
|
|
2
go.sum
2
go.sum
|
@ -499,6 +499,8 @@ go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c h1:+78PDKICrSKw
|
||||||
go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
|
go.cryptoscope.co/muxrpc/v2 v2.0.1-0.20210420101321-264bf6a7df2c/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
|
||||||
go.cryptoscope.co/muxrpc/v2 v2.0.1 h1:U1dS1dsFk7/LygH8o2qsWy8dHB5g3YeLtm0lsN7c1CI=
|
go.cryptoscope.co/muxrpc/v2 v2.0.1 h1:U1dS1dsFk7/LygH8o2qsWy8dHB5g3YeLtm0lsN7c1CI=
|
||||||
go.cryptoscope.co/muxrpc/v2 v2.0.1/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
|
go.cryptoscope.co/muxrpc/v2 v2.0.1/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
|
||||||
|
go.cryptoscope.co/muxrpc/v2 v2.0.2 h1:UdlGHY+EEYZpJz5HWnWz0r34pULYxJHfFTeqLvv+7sA=
|
||||||
|
go.cryptoscope.co/muxrpc/v2 v2.0.2/go.mod h1:MgaeojIkWY3lLuoNw1mlMT3b3jiZwOj/fgsoGZp/VNA=
|
||||||
go.cryptoscope.co/netwrap v0.1.0/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
|
go.cryptoscope.co/netwrap v0.1.0/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
|
||||||
go.cryptoscope.co/netwrap v0.1.1 h1:JLzzGKEvrUrkKzu3iM0DhpHmt+L/gYqmpcf1lJMUyFs=
|
go.cryptoscope.co/netwrap v0.1.1 h1:JLzzGKEvrUrkKzu3iM0DhpHmt+L/gYqmpcf1lJMUyFs=
|
||||||
go.cryptoscope.co/netwrap v0.1.1/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
|
go.cryptoscope.co/netwrap v0.1.1/go.mod h1:7zcYswCa4CT+ct54e9uH9+IIbYYETEMHKDNpzl8Ukew=
|
||||||
|
|
|
@ -17,13 +17,13 @@ import (
|
||||||
refs "go.mindeco.de/ssb-refs"
|
refs "go.mindeco.de/ssb-refs"
|
||||||
)
|
)
|
||||||
|
|
||||||
type connectArg struct {
|
type ConnectArg struct {
|
||||||
Portal refs.FeedRef `json:"portal"` // the room server
|
Portal refs.FeedRef `json:"portal"` // the room server
|
||||||
Target refs.FeedRef `json:"target"` // which peer the initiator/caller wants to be tunneld to
|
Target refs.FeedRef `json:"target"` // which peer the initiator/caller wants to be tunneld to
|
||||||
}
|
}
|
||||||
|
|
||||||
type connectWithOriginArg struct {
|
type connectWithOriginArg struct {
|
||||||
connectArg
|
ConnectArg
|
||||||
Origin refs.FeedRef `json:"origin"` // who started the call
|
Origin refs.FeedRef `json:"origin"` // who started the call
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ func (h connectHandler) HandleConnect(ctx context.Context, edp muxrpc.Endpoint)
|
||||||
// HandleDuplex here implements the tunnel.connect behavior of the server-side. It receives incoming events
|
// HandleDuplex here implements the tunnel.connect behavior of the server-side. It receives incoming events
|
||||||
func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
|
func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
|
||||||
// unpack arguments
|
// unpack arguments
|
||||||
var args []connectArg
|
var args []ConnectArg
|
||||||
err := json.Unmarshal(req.RawArgs, &args)
|
err := json.Unmarshal(req.RawArgs, &args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("connect: invalid arguments: %w", err)
|
return fmt.Errorf("connect: invalid arguments: %w", err)
|
||||||
|
@ -80,7 +80,7 @@ func (h connectHandler) HandleDuplex(ctx context.Context, req *muxrpc.Request, p
|
||||||
|
|
||||||
// call connect on them
|
// call connect on them
|
||||||
var argWorigin connectWithOriginArg
|
var argWorigin connectWithOriginArg
|
||||||
argWorigin.connectArg = arg
|
argWorigin.ConnectArg = arg
|
||||||
argWorigin.Origin = *caller
|
argWorigin.Origin = *caller
|
||||||
|
|
||||||
targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin)
|
targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin)
|
||||||
|
|
|
@ -2,7 +2,6 @@ package go_test
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -12,12 +11,7 @@ import (
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.cryptoscope.co/muxrpc/v2"
|
"go.cryptoscope.co/muxrpc/v2"
|
||||||
"go.cryptoscope.co/muxrpc/v2/debug"
|
|
||||||
"go.cryptoscope.co/netwrap"
|
|
||||||
"go.cryptoscope.co/secretstream"
|
|
||||||
|
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/keys"
|
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
|
|
||||||
refs "go.mindeco.de/ssb-refs"
|
refs "go.mindeco.de/ssb-refs"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -40,9 +34,9 @@ func TestEndpointClients(t *testing.T) {
|
||||||
ctx = ts.ctx
|
ctx = ts.ctx
|
||||||
|
|
||||||
// create three test clients
|
// create three test clients
|
||||||
alf, alfFeed := ts.makeTestClient("alf")
|
alf := ts.makeTestClient("alf")
|
||||||
bre, breFeed := ts.makeTestClient("bre")
|
bre := ts.makeTestClient("bre")
|
||||||
carl, carlFeed := ts.makeTestClient("carl")
|
carl := ts.makeTestClient("carl")
|
||||||
|
|
||||||
// let carl join the room
|
// let carl join the room
|
||||||
// carl wont announce to emulate manyverse
|
// carl wont announce to emulate manyverse
|
||||||
|
@ -51,10 +45,10 @@ func TestEndpointClients(t *testing.T) {
|
||||||
t.Log("carl opened endpoints")
|
t.Log("carl opened endpoints")
|
||||||
|
|
||||||
announcementsForCarl := make(announcements)
|
announcementsForCarl := make(announcements)
|
||||||
go logStream(ts, carlEndpointsSerc, "carl", announcementsForCarl)
|
go logEndpointsStream(ts, carlEndpointsSerc, "carl", announcementsForCarl)
|
||||||
time.Sleep(1 * time.Second) // give some time to process new events
|
time.Sleep(1 * time.Second) // give some time to process new events
|
||||||
|
|
||||||
_, seen := announcementsForCarl[carlFeed.Ref()]
|
_, seen := announcementsForCarl[carl.feed.Ref()]
|
||||||
a.True(seen, "carl saw himself")
|
a.True(seen, "carl saw himself")
|
||||||
|
|
||||||
// let alf join the room
|
// let alf join the room
|
||||||
|
@ -62,17 +56,17 @@ func TestEndpointClients(t *testing.T) {
|
||||||
r.NoError(err)
|
r.NoError(err)
|
||||||
|
|
||||||
announcementsForAlf := make(announcements)
|
announcementsForAlf := make(announcements)
|
||||||
go logStream(ts, alfEndpointsSerc, "alf", announcementsForAlf)
|
go logEndpointsStream(ts, alfEndpointsSerc, "alf", announcementsForAlf)
|
||||||
time.Sleep(1 * time.Second) // give some time to process new events
|
time.Sleep(1 * time.Second) // give some time to process new events
|
||||||
|
|
||||||
// assert what alf saw
|
// assert what alf saw
|
||||||
_, seen = announcementsForAlf[carlFeed.Ref()]
|
_, seen = announcementsForAlf[carl.feed.Ref()]
|
||||||
a.True(seen, "alf saw carl")
|
a.True(seen, "alf saw carl")
|
||||||
_, seen = announcementsForAlf[alfFeed.Ref()]
|
_, seen = announcementsForAlf[alf.feed.Ref()]
|
||||||
a.True(seen, "alf saw himself")
|
a.True(seen, "alf saw himself")
|
||||||
|
|
||||||
// assert what carl saw
|
// assert what carl saw
|
||||||
_, seen = announcementsForCarl[alfFeed.Ref()]
|
_, seen = announcementsForCarl[alf.feed.Ref()]
|
||||||
a.True(seen, "carl saw alf")
|
a.True(seen, "carl saw alf")
|
||||||
|
|
||||||
// let bre join the room
|
// let bre join the room
|
||||||
|
@ -80,22 +74,22 @@ func TestEndpointClients(t *testing.T) {
|
||||||
r.NoError(err)
|
r.NoError(err)
|
||||||
|
|
||||||
announcementsForBre := make(announcements)
|
announcementsForBre := make(announcements)
|
||||||
go logStream(ts, breEndpointsSrc, "bre", announcementsForBre)
|
go logEndpointsStream(ts, breEndpointsSrc, "bre", announcementsForBre)
|
||||||
|
|
||||||
time.Sleep(1 * time.Second) // give some time to process new events
|
time.Sleep(1 * time.Second) // give some time to process new events
|
||||||
|
|
||||||
// assert bre saw the other two and herself
|
// assert bre saw the other two and herself
|
||||||
_, seen = announcementsForBre[carlFeed.Ref()]
|
_, seen = announcementsForBre[carl.feed.Ref()]
|
||||||
a.True(seen, "bre saw carl")
|
a.True(seen, "bre saw carl")
|
||||||
_, seen = announcementsForBre[alfFeed.Ref()]
|
_, seen = announcementsForBre[alf.feed.Ref()]
|
||||||
a.True(seen, "bre saw alf")
|
a.True(seen, "bre saw alf")
|
||||||
_, seen = announcementsForBre[breFeed.Ref()]
|
_, seen = announcementsForBre[bre.feed.Ref()]
|
||||||
a.True(seen, "bre saw herself")
|
a.True(seen, "bre saw herself")
|
||||||
|
|
||||||
// assert the others saw bre
|
// assert the others saw bre
|
||||||
_, seen = announcementsForAlf[breFeed.Ref()]
|
_, seen = announcementsForAlf[bre.feed.Ref()]
|
||||||
a.True(seen, "alf saw bre")
|
a.True(seen, "alf saw bre")
|
||||||
_, seen = announcementsForCarl[breFeed.Ref()]
|
_, seen = announcementsForCarl[bre.feed.Ref()]
|
||||||
a.True(seen, "carl saw bre")
|
a.True(seen, "carl saw bre")
|
||||||
|
|
||||||
// terminate server and the clients
|
// terminate server and the clients
|
||||||
|
@ -110,7 +104,8 @@ func TestEndpointClients(t *testing.T) {
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
func logStream(ts *testSession, src *muxrpc.ByteSource, who string, a announcements) {
|
// consume endpoint messaes and put each peer on the passed map
|
||||||
|
func logEndpointsStream(ts *testSession, src *muxrpc.ByteSource, who string, a announcements) {
|
||||||
var edps []refs.FeedRef
|
var edps []refs.FeedRef
|
||||||
|
|
||||||
for src.Next(ts.ctx) {
|
for src.Next(ts.ctx) {
|
||||||
|
@ -140,58 +135,3 @@ func logStream(ts *testSession, src *muxrpc.ByteSource, who string, a announceme
|
||||||
|
|
||||||
ts.t.Log(who, "stream closed")
|
ts.t.Log(who, "stream closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *testSession) makeTestClient(name string) (muxrpc.Endpoint, refs.FeedRef) {
|
|
||||||
r := require.New(ts.t)
|
|
||||||
|
|
||||||
// create a fresh keypairs for the clients
|
|
||||||
client, err := keys.NewKeyPair(nil)
|
|
||||||
r.NoError(err)
|
|
||||||
|
|
||||||
ts.t.Log(name, "is", client.Feed.ShortRef())
|
|
||||||
|
|
||||||
// add it as a memeber
|
|
||||||
memberID, err := ts.srv.Members.Add(ts.ctx, client.Feed, roomdb.RoleMember)
|
|
||||||
r.NoError(err)
|
|
||||||
ts.t.Log(name, "is member ID:", memberID)
|
|
||||||
|
|
||||||
// default app key for the secret-handshake connection
|
|
||||||
ak, err := base64.StdEncoding.DecodeString("1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=")
|
|
||||||
r.NoError(err)
|
|
||||||
|
|
||||||
// create a shs client to authenticate and encrypt the connection
|
|
||||||
clientSHS, err := secretstream.NewClient(client.Pair, ak)
|
|
||||||
r.NoError(err)
|
|
||||||
|
|
||||||
// returns a new connection that went through shs and does boxstream
|
|
||||||
|
|
||||||
tcpAddr := netwrap.GetAddr(ts.srv.Network.GetListenAddr(), "tcp")
|
|
||||||
|
|
||||||
authedConn, err := netwrap.Dial(tcpAddr, clientSHS.ConnWrapper(ts.srv.Whoami().PubKey()))
|
|
||||||
r.NoError(err)
|
|
||||||
|
|
||||||
var muxMock muxrpc.FakeHandler
|
|
||||||
|
|
||||||
testPath := filepath.Join("testrun", ts.t.Name())
|
|
||||||
debugConn := debug.Dump(filepath.Join(testPath, "client-"+name), authedConn)
|
|
||||||
pkr := muxrpc.NewPacker(debugConn)
|
|
||||||
|
|
||||||
wsEndpoint := muxrpc.Handle(pkr, &muxMock, muxrpc.WithContext(ts.ctx))
|
|
||||||
|
|
||||||
srv := wsEndpoint.(muxrpc.Server)
|
|
||||||
ts.serveGroup.Go(func() error {
|
|
||||||
err = srv.Serve()
|
|
||||||
if err != nil {
|
|
||||||
ts.t.Logf("mux server %s error: %v", name, err)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
|
|
||||||
// check we are talking to a room
|
|
||||||
var yup bool
|
|
||||||
err = wsEndpoint.Async(ts.ctx, &yup, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "isRoom"})
|
|
||||||
r.NoError(err)
|
|
||||||
r.True(yup, "server is not a room?")
|
|
||||||
|
|
||||||
return wsEndpoint, client.Feed
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,158 @@
|
||||||
|
package go_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.cryptoscope.co/muxrpc/v2"
|
||||||
|
|
||||||
|
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/tunnel/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
// peers not on the members list can't connect
|
||||||
|
func TestStaleMembers(t *testing.T) {
|
||||||
|
testInit(t)
|
||||||
|
|
||||||
|
r := require.New(t)
|
||||||
|
|
||||||
|
testPath := filepath.Join("testrun", t.Name())
|
||||||
|
os.RemoveAll(testPath)
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
// create the roomsrv
|
||||||
|
ts := makeNamedTestBot(t, "server", ctx, nil)
|
||||||
|
ctx = ts.ctx
|
||||||
|
|
||||||
|
// two new clients, connected to server automaticaly
|
||||||
|
// default to member during makeNamedTestBot
|
||||||
|
tal := ts.makeTestClient("tal") // https://en.wikipedia.org/wiki/Tal_(name)
|
||||||
|
srh := ts.makeTestClient("srh") // https://en.wikipedia.org/wiki/Sarah
|
||||||
|
|
||||||
|
// announce srh so that tal could connect
|
||||||
|
var ok bool
|
||||||
|
err := srh.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
|
||||||
|
r.NoError(err)
|
||||||
|
r.True(ok)
|
||||||
|
|
||||||
|
_, has := ts.srv.StateManager.Has(srh.feed)
|
||||||
|
r.True(has, "srh should be connected")
|
||||||
|
|
||||||
|
// shut down srh
|
||||||
|
srh.Terminate()
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
_, has = ts.srv.StateManager.Has(srh.feed)
|
||||||
|
r.False(has, "srh shouldn't be connected")
|
||||||
|
|
||||||
|
// try to connect srh
|
||||||
|
var arg server.ConnectArg
|
||||||
|
arg.Portal = ts.srv.Whoami()
|
||||||
|
arg.Target = srh.feed
|
||||||
|
|
||||||
|
src, snk, err := tal.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"room", "connect"}, arg)
|
||||||
|
r.NoError(err)
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second) // let server respond
|
||||||
|
|
||||||
|
// assert cant read
|
||||||
|
r.False(src.Next(ctx), "source should be cancled")
|
||||||
|
r.Error(src.Err(), "source should have an error")
|
||||||
|
|
||||||
|
// assert cant write
|
||||||
|
testKexMsg := []byte("fake keyexchange")
|
||||||
|
_, err = snk.Write(testKexMsg)
|
||||||
|
r.Error(err, "stream should should be canceled")
|
||||||
|
|
||||||
|
// restart srh
|
||||||
|
oldSrh := srh.feed
|
||||||
|
srh = ts.makeTestClient("srh")
|
||||||
|
r.True(oldSrh.Equal(&srh.feed))
|
||||||
|
t.Log("restarted srh")
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second) // let server respond
|
||||||
|
|
||||||
|
// announce srh so that tal can connect
|
||||||
|
err = srh.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
|
||||||
|
r.NoError(err)
|
||||||
|
r.True(ok)
|
||||||
|
|
||||||
|
testKexReply := []byte("fake kex reply")
|
||||||
|
|
||||||
|
// prepare srh for incoming call
|
||||||
|
receivedCall := make(chan struct{})
|
||||||
|
receivedKexMessage := make(chan struct{})
|
||||||
|
srh.mockedHandler.HandledCalls(func(m muxrpc.Method) bool { return m.String() == "tunnel.connect" })
|
||||||
|
srh.mockedHandler.HandleCallCalls(func(ctx context.Context, req *muxrpc.Request) {
|
||||||
|
m := req.Method.String()
|
||||||
|
t.Log("received call: ", m)
|
||||||
|
if m != "tunnel.connect" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Log("correct call received")
|
||||||
|
close(receivedCall)
|
||||||
|
|
||||||
|
// receive a msg
|
||||||
|
src, err := req.ResponseSource()
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("expected source for duplex call: %s", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
src.Next(ctx)
|
||||||
|
|
||||||
|
gotKexMsg, err := src.Bytes()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !bytes.Equal(testKexMsg, gotKexMsg) {
|
||||||
|
panic(fmt.Sprintf("wrong kex message: %q", gotKexMsg))
|
||||||
|
}
|
||||||
|
|
||||||
|
close(receivedKexMessage)
|
||||||
|
|
||||||
|
// send a msg
|
||||||
|
snk, err := req.ResponseSink()
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("expected sink for duplex call: %s", err))
|
||||||
|
}
|
||||||
|
snk.Write(testKexReply)
|
||||||
|
})
|
||||||
|
|
||||||
|
// 2nd try to connect (should work)
|
||||||
|
src, snk, err = tal.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, arg)
|
||||||
|
r.NoError(err)
|
||||||
|
|
||||||
|
<-receivedCall
|
||||||
|
|
||||||
|
_, err = snk.Write(testKexMsg)
|
||||||
|
r.NoError(err, "stream should should be canceled")
|
||||||
|
|
||||||
|
<-receivedKexMessage
|
||||||
|
|
||||||
|
// assert can read
|
||||||
|
r.True(src.Next(ctx), "source should be cancled")
|
||||||
|
r.NoError(src.Err(), "source should have an error")
|
||||||
|
|
||||||
|
gotKexMsgFromSrh, err := src.Bytes()
|
||||||
|
r.NoError(err)
|
||||||
|
r.Equal(testKexReply, gotKexMsgFromSrh)
|
||||||
|
|
||||||
|
// shut everythign down
|
||||||
|
ts.srv.Shutdown()
|
||||||
|
tal.Terminate()
|
||||||
|
srh.Terminate()
|
||||||
|
ts.srv.Close()
|
||||||
|
|
||||||
|
// wait for all muxrpc serve()s to exit
|
||||||
|
r.NoError(ts.serveGroup.Wait())
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
}
|
|
@ -7,15 +7,17 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
|
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.cryptoscope.co/muxrpc/v2"
|
"go.cryptoscope.co/muxrpc/v2"
|
||||||
|
|
||||||
|
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
|
||||||
refs "go.mindeco.de/ssb-refs"
|
refs "go.mindeco.de/ssb-refs"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestTunnelServerSimple(t *testing.T) {
|
func TestTunnelServerSimple(t *testing.T) {
|
||||||
|
testInit(t)
|
||||||
|
|
||||||
// defer leakcheck.Check(t)
|
// defer leakcheck.Check(t)
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
theBots := createServerAndBots(t, ctx, 2)
|
theBots := createServerAndBots(t, ctx, 2)
|
||||||
|
|
|
@ -5,6 +5,7 @@ package go_test
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
"encoding/base64"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
@ -15,22 +16,25 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
|
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
|
|
||||||
|
|
||||||
"github.com/go-kit/kit/log"
|
"github.com/go-kit/kit/log"
|
||||||
"github.com/go-kit/kit/log/level"
|
"github.com/go-kit/kit/log/level"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"go.cryptoscope.co/muxrpc/v2"
|
||||||
"go.cryptoscope.co/muxrpc/v2/debug"
|
"go.cryptoscope.co/muxrpc/v2/debug"
|
||||||
|
"go.cryptoscope.co/netwrap"
|
||||||
|
"go.cryptoscope.co/secretstream"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/keys"
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/testutils"
|
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/testutils"
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
|
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/internal/repo"
|
"github.com/ssb-ngi-pointer/go-ssb-room/internal/repo"
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/internal/signinwithssb"
|
"github.com/ssb-ngi-pointer/go-ssb-room/internal/signinwithssb"
|
||||||
|
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb/sqlite"
|
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb/sqlite"
|
||||||
"github.com/ssb-ngi-pointer/go-ssb-room/roomsrv"
|
"github.com/ssb-ngi-pointer/go-ssb-room/roomsrv"
|
||||||
|
refs "go.mindeco.de/ssb-refs"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -78,6 +82,9 @@ type testSession struct {
|
||||||
|
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
serveGroup *errgroup.Group
|
serveGroup *errgroup.Group
|
||||||
|
|
||||||
|
// so that we can re-spawn clients
|
||||||
|
clientKeys map[string]*keys.KeyPair
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeNamedTestBot(t testing.TB, name string, ctx context.Context, opts []roomsrv.Option) *testSession {
|
func makeNamedTestBot(t testing.TB, name string, ctx context.Context, opts []roomsrv.Option) *testSession {
|
||||||
|
@ -119,8 +126,9 @@ func makeNamedTestBot(t testing.TB, name string, ctx context.Context, opts []roo
|
||||||
r.NoError(err)
|
r.NoError(err)
|
||||||
|
|
||||||
ts := testSession{
|
ts := testSession{
|
||||||
t: t,
|
t: t,
|
||||||
srv: theBot,
|
srv: theBot,
|
||||||
|
clientKeys: make(map[string]*keys.KeyPair),
|
||||||
}
|
}
|
||||||
|
|
||||||
ts.serveGroup, ts.ctx = errgroup.WithContext(ctx)
|
ts.serveGroup, ts.ctx = errgroup.WithContext(ctx)
|
||||||
|
@ -132,6 +140,83 @@ func makeNamedTestBot(t testing.TB, name string, ctx context.Context, opts []roo
|
||||||
return &ts
|
return &ts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testClient struct {
|
||||||
|
muxrpc.Endpoint
|
||||||
|
|
||||||
|
feed refs.FeedRef
|
||||||
|
|
||||||
|
mockedHandler *muxrpc.FakeHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ts *testSession) makeTestClient(name string) testClient {
|
||||||
|
r := require.New(ts.t)
|
||||||
|
|
||||||
|
// create a fresh keypairs for the clients
|
||||||
|
client, has := ts.clientKeys[name]
|
||||||
|
if !has {
|
||||||
|
var err error
|
||||||
|
client, err = keys.NewKeyPair(nil)
|
||||||
|
r.NoError(err)
|
||||||
|
ts.clientKeys[name] = client
|
||||||
|
}
|
||||||
|
|
||||||
|
ts.t.Log(name, "is", client.Feed.ShortRef())
|
||||||
|
|
||||||
|
// add it as a memeber, if it isnt already
|
||||||
|
_, err := ts.srv.Members.GetByFeed(ts.ctx, client.Feed)
|
||||||
|
if errors.Is(err, roomdb.ErrNotFound) {
|
||||||
|
memberID, err := ts.srv.Members.Add(ts.ctx, client.Feed, roomdb.RoleMember)
|
||||||
|
r.NoError(err)
|
||||||
|
ts.t.Log(name, "is member ID:", memberID)
|
||||||
|
} else {
|
||||||
|
r.NoError(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// default app key for the secret-handshake connection
|
||||||
|
ak, err := base64.StdEncoding.DecodeString("1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=")
|
||||||
|
r.NoError(err)
|
||||||
|
|
||||||
|
// create a shs client to authenticate and encrypt the connection
|
||||||
|
clientSHS, err := secretstream.NewClient(client.Pair, ak)
|
||||||
|
r.NoError(err)
|
||||||
|
|
||||||
|
// returns a new connection that went through shs and does boxstream
|
||||||
|
tcpAddr := netwrap.GetAddr(ts.srv.Network.GetListenAddr(), "tcp")
|
||||||
|
authedConn, err := netwrap.Dial(tcpAddr, clientSHS.ConnWrapper(ts.srv.Whoami().PubKey()))
|
||||||
|
r.NoError(err)
|
||||||
|
|
||||||
|
// testPath := filepath.Join("testrun", ts.t.Name())
|
||||||
|
// debugConn := debug.Dump(filepath.Join(testPath, "client-"+name), authedConn)
|
||||||
|
pkr := muxrpc.NewPacker(authedConn)
|
||||||
|
|
||||||
|
var muxMock = new(muxrpc.FakeHandler)
|
||||||
|
wsEndpoint := muxrpc.Handle(pkr, muxMock,
|
||||||
|
muxrpc.WithLogger(log.With(mainLog, "client", name)),
|
||||||
|
muxrpc.WithContext(ts.ctx),
|
||||||
|
)
|
||||||
|
|
||||||
|
srv := wsEndpoint.(muxrpc.Server)
|
||||||
|
ts.serveGroup.Go(func() error {
|
||||||
|
err = srv.Serve()
|
||||||
|
if err != nil {
|
||||||
|
ts.t.Logf("mux server %s error: %v", name, err)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
|
||||||
|
// check we are talking to a room
|
||||||
|
var yup bool
|
||||||
|
err = wsEndpoint.Async(ts.ctx, &yup, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "isRoom"})
|
||||||
|
r.NoError(err)
|
||||||
|
r.True(yup, "server is not a room?")
|
||||||
|
|
||||||
|
return testClient{
|
||||||
|
feed: client.Feed,
|
||||||
|
Endpoint: wsEndpoint,
|
||||||
|
mockedHandler: muxMock,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: refactor for single test session and use makeTestClient()
|
// TODO: refactor for single test session and use makeTestClient()
|
||||||
func createServerAndBots(t *testing.T, ctx context.Context, count uint) []*testSession {
|
func createServerAndBots(t *testing.T, ctx context.Context, count uint) []*testSession {
|
||||||
testInit(t)
|
testInit(t)
|
||||||
|
|
Loading…
Reference in New Issue