begin nodejs testing

This commit is contained in:
Henry 2021-01-26 18:33:29 +01:00
parent 994a76cd67
commit 29688bc328
18 changed files with 3914 additions and 53 deletions

View File

@ -1,13 +1,14 @@
package tunnel
import (
"context"
"net"
kitlog "github.com/go-kit/kit/log"
"go.cryptoscope.co/muxrpc/v2"
"go.cryptoscope.co/muxrpc/v2/typemux"
refs "go.mindeco.de/ssb-refs"
"go.mindeco.de/ssb-rooms/internal/broadcasts"
"go.mindeco.de/ssb-rooms/internal/maybemuxrpc"
)
@ -36,11 +37,18 @@ func (plugin) Authorize(net.Conn) bool { return true }
}
*/
func New(log kitlog.Logger, self refs.FeedRef) maybemuxrpc.Plugin {
func New(log kitlog.Logger, ctx context.Context) maybemuxrpc.Plugin {
mux := typemux.New(log)
var rs roomState
var rs = new(roomState)
rs.logger = log
rs.updater, rs.broadcaster = broadcasts.NewRoomChanger()
rs.rooms = make(roomsStateMap)
go rs.stateTicker(ctx)
// so far just lobby (v1 rooms)
rs.rooms["lobby"] = make(roomStateMap)
mux.RegisterAsync(append(method, "isRoom"), typemux.AsyncFunc(rs.isRoom))
mux.RegisterAsync(append(method, "ping"), typemux.AsyncFunc(rs.ping))

View File

@ -2,40 +2,137 @@ package tunnel
import (
"context"
"fmt"
"encoding/json"
"sync"
"time"
"go.mindeco.de/ssb-rooms/internal/network"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.cryptoscope.co/muxrpc/v2"
"go.mindeco.de/ssb-rooms/internal/broadcasts"
)
type roomState struct {
logger kitlog.Logger
updater broadcasts.RoomChangeSink
broadcaster *broadcasts.RoomChangeBroadcast
roomsMu sync.Mutex
rooms roomsStateMap
}
func (rs roomState) isRoom(context.Context, *muxrpc.Request) (interface{}, error) {
// stateTicker logs the current room membership once per second until ctx is
// cancelled. It is meant to run in its own goroutine (started from New).
func (rs *roomState) stateTicker(ctx context.Context) {
	tick := time.NewTicker(1 * time.Second)
	for {
		select {
		case <-ctx.Done():
			tick.Stop()
			return
		case <-tick.C:
		}
		// hold the lock while walking the maps; announce/leave mutate them
		rs.roomsMu.Lock()
		for room, members := range rs.rooms {
			level.Info(rs.logger).Log("room", room, "cnt", len(members))
			for who := range members {
				level.Info(rs.logger).Log("room", room, "feed", who)
			}
		}
		rs.roomsMu.Unlock()
	}
}
// layout is map[room-name]map[canonical feedref]client-handle
type roomsStateMap map[string]roomStateMap
// roomStateMap is a single room
type roomStateMap map[string]muxrpc.Endpoint
// isRoom answers the tunnel.isRoom probe; this server always identifies as a room.
func (rs *roomState) isRoom(context.Context, *muxrpc.Request) (interface{}, error) {
	level.Debug(rs.logger).Log("called", "isRoom")
	const weAreARoom = true
	return weAreARoom, nil
}
func (rs roomState) ping(context.Context, *muxrpc.Request) (interface{}, error) {
// ping replies with the current server time so clients can measure latency.
// NOTE(review): UnixNano()/1000 yields microseconds — confirm the JS side
// expects µs and not ms here.
func (rs *roomState) ping(context.Context, *muxrpc.Request) (interface{}, error) {
	now := time.Now().UnixNano() / 1000
	level.Debug(rs.logger).Log("called", "ping")
	return now, nil
}
func (rs roomState) announce(context.Context, *muxrpc.Request) (interface{}, error) {
// announce registers the calling peer in the lobby room and notifies all
// tunnel.endpoints subscribers via the room-change broadcast.
// It returns a RoomUpdate with the new lobby member count.
//
// Fix: the stale stub line `return nil, fmt.Errorf("TODO:announce")` left over
// from the previous revision made everything after it unreachable; removed.
func (rs *roomState) announce(_ context.Context, req *muxrpc.Request) (interface{}, error) {
	level.Debug(rs.logger).Log("called", "announce")

	// the remote address of a muxrpc request carries the authenticated feed ref
	ref, err := network.GetFeedRefFromAddr(req.RemoteAddr())
	if err != nil {
		return nil, err
	}

	rs.roomsMu.Lock()
	rs.updater.Update(broadcasts.RoomChange{
		Op:  "joined",
		Who: *ref,
	})

	// add ref to lobby
	rs.rooms["lobby"][ref.Ref()] = req.Endpoint()
	members := len(rs.rooms["lobby"])
	rs.roomsMu.Unlock()

	return RoomUpdate{"joined", true, uint(members)}, nil
}
func (rs roomState) leave(context.Context, *muxrpc.Request) (interface{}, error) {
level.Debug(rs.logger).Log("called", "leave")
return nil, fmt.Errorf("TODO:leave")
// RoomUpdate is the reply value of the announce and leave calls.
type RoomUpdate struct {
	Action  string // "joined" or "left"
	Success bool
	Members uint // number of peers in the lobby after the change
}
func (rs roomState) endpoints(context.Context, *muxrpc.Request, *muxrpc.ByteSink, muxrpc.Endpoint) error {
// leave removes the calling peer from the lobby room and notifies all
// tunnel.endpoints subscribers via the room-change broadcast.
// It returns a RoomUpdate with the remaining lobby member count.
func (rs *roomState) leave(_ context.Context, req *muxrpc.Request) (interface{}, error) {
	ref, err := network.GetFeedRefFromAddr(req.RemoteAddr())
	if err != nil {
		return nil, err
	}
	rs.roomsMu.Lock()
	rs.updater.Update(broadcasts.RoomChange{
		Op:  "left",
		Who: *ref,
	})
	// remove ref from the lobby (the original comment said "add"; it deletes)
	delete(rs.rooms["lobby"], ref.Ref())
	members := len(rs.rooms["lobby"])
	rs.roomsMu.Unlock()
	return RoomUpdate{"left", true, uint(members)}, nil
}
// endpoints handles the tunnel.endpoints source call: it registers the
// caller's sink with the room-change broadcaster so the client receives
// membership updates as JSON frames.
//
// Fix: the stale stub line `return fmt.Errorf("TODO:endpoints")` left over
// from the previous revision returned before Register ever ran; removed.
func (rs *roomState) endpoints(_ context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink, edp muxrpc.Endpoint) error {
	level.Debug(rs.logger).Log("called", "endpoints")
	// NOTE(review): the unregister func returned by Register is dropped — the
	// forwarder stays registered until the broadcaster itself is closed.
	rs.broadcaster.Register(newForwarder(snk))
	return nil
}
// updateForwarder turns RoomChange events into JSON frames on a muxrpc sink.
type updateForwarder struct {
	snk *muxrpc.ByteSink
	enc *json.Encoder
}

// newForwarder wraps snk so that each room change is written as one JSON object.
func newForwarder(snk *muxrpc.ByteSink) updateForwarder {
	snk.SetEncoding(muxrpc.TypeJSON)
	return updateForwarder{
		snk: snk,
		enc: json.NewEncoder(snk),
	}
}

// Update implements broadcasts.RoomChangeSink by encoding rc onto the sink.
func (uf updateForwarder) Update(rc broadcasts.RoomChange) error {
	return uf.enc.Encode(rc)
}

// Close implements io.Closer by closing the underlying sink.
func (uf updateForwarder) Close() error {
	return uf.snk.Close()
}

View File

@ -0,0 +1,220 @@
// SPDX-License-Identifier: MIT
package broadcasts
import (
"errors"
"fmt"
)
// testPrinter is a RoomChangeSink that prints every update, used to drive the
// Example functions' expected Output.
type testPrinter struct{}

func (tp testPrinter) Update(rc RoomChange) error {
	fmt.Println("test:", rc.Op, rc.Who.Ref())
	return nil
}

func (tp testPrinter) Close() error { return nil }
// ExampleBroadcast fans a single update out to two registered sinks; both
// print the same line, so the Output has it twice.
func ExampleBroadcast() {
	sink, bcast := NewRoomChanger()
	defer sink.Close()

	var p1, p2 testPrinter
	closeSink := bcast.Register(p1)
	defer closeSink()

	closeSink = bcast.Register(p2)
	defer closeSink()

	var rc RoomChange
	rc.Who.Algo = "dummy"
	rc.Who.ID = []byte{0, 0, 0, 0}
	rc.Op = "joined"
	sink.Update(rc)

	// Output:
	// test: joined @AAAAAA==.dummy
	// test: joined @AAAAAA==.dummy
}
// ExampleBroadcastCanceled shows that a sink unregistered via its cancel
// function (p2, cancelled before Update) no longer receives updates.
func ExampleBroadcastCanceled() {
	sink, bcast := NewRoomChanger()
	defer sink.Close()

	var p1, p2 testPrinter
	closeSink := bcast.Register(p1)
	defer closeSink()
	closeSink = bcast.Register(p2)

	var rc RoomChange
	rc.Who.Algo = "dummy"
	rc.Who.ID = []byte{0, 0, 0, 0}
	rc.Op = "joined"

	// unregister p2 before sending
	closeSink()
	sink.Update(rc)

	// Output:
	// test: joined @AAAAAA==.dummy
}
// erroringPrinter is a RoomChangeSink whose Update always fails, used to
// check that the broadcaster drops failing sinks.
type erroringPrinter struct{}

func (tp erroringPrinter) Update(rc RoomChange) error {
	fmt.Println("failed:", rc.Op, rc.Who.Ref())
	return errors.New("nope")
}

func (tp erroringPrinter) Close() error { return nil }
// ExampleBroadcastOneErrs registers one healthy and one failing sink. After
// the failing sink errors on the first update the broadcaster drops it, so
// only the healthy sink sees the second update.
// NOTE(review): the sinks live in a map, so the relative order of the first
// two output lines is not guaranteed — this Output may be flaky. Confirm.
func ExampleBroadcastOneErrs() {
	sink, bcast := NewRoomChanger()
	defer sink.Close()

	var p1 testPrinter
	var p2 erroringPrinter
	closeSink := bcast.Register(p1)
	defer closeSink()
	closeSink = bcast.Register(p2)
	defer closeSink()

	var rc RoomChange
	rc.Who.Algo = "dummy"
	rc.Who.ID = []byte{0, 0, 0, 0}
	rc.Op = "joined"
	sink.Update(rc)

	rc.Op = "left"
	sink.Update(rc)

	// Output:
	// test: joined @AAAAAA==.dummy
	// failed: joined @AAAAAA==.dummy
	// test: left @AAAAAA==.dummy
}
/*
type expectedEOSErr struct{ v interface{} }
func (err expectedEOSErr) Error() string {
return fmt.Sprintf("expected end of stream but got %q", err.v)
}
func (err expectedEOSErr) IsExpectedEOS() bool {
return true
}
func IsExpectedEOS(err error) bool {
_, ok := err.(expectedEOSErr)
return ok
}
func TestBroadcast(t *testing.T) {
type testcase struct {
rx, tx []interface{}
}
test := func(tc testcase) {
if tc.rx == nil {
tc.rx = tc.tx
}
sink, bcast := NewRoomChanger()
mkSink := func() Sink {
var (
closed bool
i int
)
return FuncSink(func(ctx context.Context, v interface{}, err error) error {
if err != nil {
if err != (EOS{}) {
t.Log("closed with non-EOF error:", err)
}
if closed {
return fmt.Errorf("sink already closed")
}
if i != len(tc.rx) {
return fmt.Errorf("early close at i=%v", i)
}
closed = true
return nil
}
if i >= len(tc.rx) {
return expectedEOSErr{v}
}
if v != tc.rx[i] {
return fmt.Errorf("expected value %v but got %v", tc.rx[i], v)
}
i++
return nil
})
}
cancelReg1 := bcast.Register(mkSink())
cancelReg2 := bcast.Register(mkSink())
defer cancelReg1()
defer cancelReg2()
for j, v := range tc.tx {
err := sink.Pour(context.TODO(), v)
if len(tc.tx) == len(tc.rx) {
if err != nil {
t.Errorf("expected nil error but got %#v", err)
}
} else if len(tc.tx) > len(tc.rx) {
if j >= len(tc.rx) {
merr, ok := err.(*multierror.Error)
if ok {
for _, err := range merr.Errors {
if !IsExpectedEOS(err) {
t.Errorf("expected an expectedEOS error, but got %v", err)
}
}
} else {
if !IsExpectedEOS(err) {
t.Errorf("expected an expectedEOS error, but got %v", err)
}
}
} else {
if err != nil {
t.Errorf("expected nil error but got %#v", err)
}
}
}
}
err := sink.Close()
if err != nil {
t.Errorf("expected nil error but got %s", err)
}
}
cases := []testcase{
{tx: []interface{}{1, 2, 3}},
{tx: []interface{}{}},
{tx: []interface{}{nil, 0, ""}},
{tx: []interface{}{nil, 0, ""}, rx: []interface{}{nil, 0}},
}
for _, tc := range cases {
test(tc)
}
}
*/

View File

@ -0,0 +1,105 @@
package broadcasts
import (
"io"
"sync"
"github.com/hashicorp/go-multierror"
refs "go.mindeco.de/ssb-refs"
)
// RoomChange is one membership event in a room.
type RoomChange struct {
	Op  string       // "joined" or "left"
	Who refs.FeedRef // the peer the event is about
}

// RoomChangeSink consumes room-change events; Close signals no more updates.
type RoomChangeSink interface {
	Update(value RoomChange) error

	io.Closer
}
// NewRoomChanger returns the Sink, to write to the broadcaster, and the new
// broadcast instance.
func NewRoomChanger() (RoomChangeSink, *RoomChangeBroadcast) {
	bcst := RoomChangeBroadcast{
		mu:    &sync.Mutex{},
		sinks: make(map[*RoomChangeSink]struct{}),
	}
	// broadcastSink shares the broadcaster's fields via a type conversion
	return (*broadcastSink)(&bcst), &bcst
}
// RoomChangeBroadcast is a registry of one or more Sinks that receive
// room-change updates.
type RoomChangeBroadcast struct {
	mu    *sync.Mutex
	sinks map[*RoomChangeSink]struct{}
}
// Register adds sink to the set that receives updates. It also returns a
// cancel function that unregisters and closes the sink.
// Note: sink is taken by value, so each call registers a distinct pointer
// even for identical sink values.
func (bcst *RoomChangeBroadcast) Register(sink RoomChangeSink) func() {
	bcst.mu.Lock()
	defer bcst.mu.Unlock()
	bcst.sinks[&sink] = struct{}{}

	return func() {
		bcst.mu.Lock()
		defer bcst.mu.Unlock()
		delete(bcst.sinks, &sink)
		sink.Close()
	}
}
// broadcastSink implements RoomChangeSink on top of the broadcaster's state.
type broadcastSink RoomChangeBroadcast

// Update implements the RoomChangeSink interface (the original comment said
// "Pour"). It fans rc out to every registered sink and silently drops sinks
// whose Update errors — NOTE(review): dropped sinks are not Closed here.
func (bcst *broadcastSink) Update(rc RoomChange) error {
	bcst.mu.Lock()
	for s := range bcst.sinks {
		err := (*s).Update(rc)
		if err != nil {
			// deleting during range is safe in Go
			delete(bcst.sinks, s)
		}
	}
	bcst.mu.Unlock()
	return nil
}
// Close implements the io.Closer part of the RoomChangeSink interface.
// It closes every registered sink concurrently and returns their combined
// errors (nil if all closed cleanly).
//
// Fix: the close goroutines previously appended to the shared *multierror.Error
// without synchronization — a data race. merr is now guarded by merrMu.
func (bcst *broadcastSink) Close() error {
	var sinks []RoomChangeSink

	bcst.mu.Lock()
	defer bcst.mu.Unlock()

	sinks = make([]RoomChangeSink, 0, len(bcst.sinks))
	for sink := range bcst.sinks {
		sinks = append(sinks, *sink)
	}

	var (
		wg sync.WaitGroup

		// merrMu guards merr; multierror.Append is not safe for concurrent use
		merrMu sync.Mutex
		merr   *multierror.Error
	)

	// might be fine without the waitgroup and concurrency
	wg.Add(len(sinks))
	for _, sink_ := range sinks {
		go func(sink RoomChangeSink) {
			defer wg.Done()
			if err := sink.Close(); err != nil {
				merrMu.Lock()
				merr = multierror.Append(merr, err)
				merrMu.Unlock()
			}
		}(sink_)
	}
	wg.Wait()

	return merr.ErrorOrNil()
}

View File

@ -5,8 +5,9 @@ package multicloser
import (
"fmt"
"io"
"strings"
"sync"
"go.mindeco.de/ssb-rooms/internal/maybemod/multierror"
)
type Closer struct {
@ -44,23 +45,5 @@ func (mc *Closer) Close() error {
return nil
}
return errList{errs: errs}
}
type errList struct {
errs []error
}
func (el errList) Error() string {
var str strings.Builder
if n := len(el.errs); n > 0 {
fmt.Fprintf(&str, "multiple errors(%d): ", n)
}
for i, err := range el.errs {
fmt.Fprintf(&str, "(%d): ", i)
str.WriteString(err.Error() + " - ")
}
return str.String()
return multierror.List{Errs: errs}
}

View File

@ -0,0 +1,23 @@
package multierror
import (
"fmt"
"strings"
)
// List bundles multiple errors into a single error value.
type List struct{ Errs []error }

// Error implements the error interface. The rendered string starts with a
// count header (only when the list is non-empty), followed by each error
// numbered and terminated by " - ".
func (el List) Error() string {
	var b strings.Builder
	if n := len(el.Errs); n > 0 {
		fmt.Fprintf(&b, "multiple errors(%d): ", n)
	}
	for i, err := range el.Errs {
		fmt.Fprintf(&b, "(%d): %s - ", i, err.Error())
	}
	return b.String()
}

View File

@ -41,7 +41,7 @@ func (s *Server) initNetwork() error {
}
// whoami
whoami := whoami.New(kitlog.With(s.logger, "unit", "whoami"), s.keyPair.Feed)
whoami := whoami.New(kitlog.With(s.logger, "unit", "whoami"), s.Whoami())
s.public.Register(whoami)
s.master.Register(whoami)
@ -49,7 +49,7 @@ func (s *Server) initNetwork() error {
// s.master.Register(replicate.NewPlug(s.Users))
tunnelPlug := tunnel.New(kitlog.With(s.logger, "unit", "tunnel"), s.keyPair.Feed)
tunnelPlug := tunnel.New(kitlog.With(s.logger, "unit", "tunnel"), s.rootCtx)
s.public.Register(tunnelPlug)
// tcp+shs

View File

@ -30,6 +30,15 @@ func (h manifestHandler) HandleCall(ctx context.Context, req *muxrpc.Request, ed
}
}
// init validates the hardcoded manifest blob at startup so a malformed edit
// fails fast instead of surfacing as garbage on the wire.
//
// Fix: the json.Unmarshal error was printed to stdout and then lost; it is
// now part of the panic message.
func init() {
	if !json.Valid([]byte(manifest)) {
		// re-parse only to surface the position/detail of the syntax error
		manifestMap := make(map[string]interface{})
		err := json.Unmarshal([]byte(manifest), &manifestMap)
		panic(fmt.Sprintf("manifest blob is broken json: %v", err))
	}
}
// this is a very simple hardcoded manifest.json dump which oasis' ssb-client expects to do its magic.
const manifest manifestHandler = `
{
@ -43,7 +52,7 @@ const manifest manifestHandler = `
"connect": "duplex",
"endpoints": "source",
"isRoom": "async",
"ping": "sync",
"ping": "sync"
}
}`

View File

@ -3,6 +3,7 @@ package go_test
import (
"context"
"crypto/rand"
"fmt"
"testing"
"time"
@ -15,14 +16,10 @@ import (
"go.mindeco.de/ssb-rooms/roomsrv"
)
func TestTunnelServerSimple(t *testing.T) {
r := require.New(t)
a := assert.New(t)
// defer leakcheck.Check(t)
func createServerAndBots(t *testing.T, ctx context.Context, count uint) []*roomsrv.Server {
testInit(t)
r := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
botgroup, ctx := errgroup.WithContext(ctx)
bs := newBotServer(ctx, mainLog)
@ -34,17 +31,42 @@ func TestTunnelServerSimple(t *testing.T) {
roomsrv.WithAppKey(appKey),
roomsrv.WithContext(ctx),
}
theBots := []*roomsrv.Server{}
serv := makeNamedTestBot(t, "srv", netOpts)
botgroup.Go(bs.Serve(serv))
theBots = append(theBots, serv)
botA := makeNamedTestBot(t, "B", netOpts)
botgroup.Go(bs.Serve(botA))
for i := uint(1); i < count+1; i++ {
botI := makeNamedTestBot(t, fmt.Sprintf("%d", i), netOpts)
botgroup.Go(bs.Serve(botI))
theBots = append(theBots, botI)
}
botB := makeNamedTestBot(t, "C", netOpts)
botgroup.Go(bs.Serve(botB))
t.Cleanup(func() {
time.Sleep(1 * time.Second)
for _, bot := range theBots {
bot.Shutdown()
r.NoError(bot.Close())
}
r.NoError(botgroup.Wait())
})
theBots := []*roomsrv.Server{serv, botA, botB}
return theBots
}
func TestTunnelServerSimple(t *testing.T) {
// defer leakcheck.Check(t)
ctx, cancel := context.WithCancel(context.Background())
theBots := createServerAndBots(t, ctx, 2)
r := require.New(t)
a := assert.New(t)
serv := theBots[0]
botA := theBots[1]
botB := theBots[2]
// only allow B to dial A
serv.Allow(botA.Whoami(), true)
@ -101,10 +123,113 @@ func TestTunnelServerSimple(t *testing.T) {
// cleanup
cancel()
time.Sleep(1 * time.Second)
for _, bot := range theBots {
bot.Shutdown()
r.NoError(bot.Close())
}
r.NoError(botgroup.Wait())
}
// TestRoomAnnounce spins up a room server plus two client bots, has client A
// announce and then leave the lobby, and checks that client B observes both
// membership changes on the tunnel.endpoints source.
func TestRoomAnnounce(t *testing.T) {
	// defer leakcheck.Check(t)
	ctx, cancel := context.WithCancel(context.Background())
	theBots := createServerAndBots(t, ctx, 2)

	r := require.New(t)
	a := assert.New(t)

	serv := theBots[0]
	botA := theBots[1]
	botB := theBots[2]

	// only allow B to dial A
	serv.Allow(botA.Whoami(), true)
	serv.Allow(botB.Whoami(), true)

	// allow bots to dial the remote
	botA.Allow(serv.Whoami(), true)
	botB.Allow(serv.Whoami(), true)

	// should work (we allowed A)
	err := botA.Network.Connect(ctx, serv.Network.GetListenAddr())
	r.NoError(err, "connect A to the Server")

	// shouldn't work (we did not allowed A)
	err = botB.Network.Connect(ctx, serv.Network.GetListenAddr())
	r.NoError(err, "connect B to the Server") // we dont see an error because it just establishes the tcp connection

	t.Log("letting handshaking settle..")
	time.Sleep(1 * time.Second)

	var srvWho struct {
		ID refs.FeedRef
	}
	edpOfA, has := botA.Network.GetEndpointFor(serv.Whoami())
	r.True(has, "botA has no endpoint for the server")
	edpOfB, has := botB.Network.GetEndpointFor(serv.Whoami())
	r.True(has, "botB has no endpoint for the server!")

	// sanity check: whoami over both endpoints must return the server's id
	err = edpOfA.Async(ctx, &srvWho, muxrpc.TypeJSON, muxrpc.Method{"whoami"})
	r.NoError(err)
	a.True(serv.Whoami().Equal(&srvWho.ID))

	err = edpOfB.Async(ctx, &srvWho, muxrpc.TypeJSON, muxrpc.Method{"whoami"})
	r.NoError(err)
	a.True(serv.Whoami().Equal(&srvWho.ID))

	// let B listen for changes
	newRoomMember, err := edpOfB.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "endpoints"})
	r.NoError(err)

	newMemberChan := make(chan string)

	// read all the messages from endpoints and throw them over the channel
	go func() {
		for newRoomMember.Next(ctx) {
			body, err := newRoomMember.Bytes()
			if err != nil {
				panic(err)
			}
			newMemberChan <- string(body)
		}
		close(newMemberChan)
	}()

	// announce A
	var ret struct {
		Action  string
		Success bool
		Members uint
	}
	err = edpOfA.Async(ctx, &ret, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "announce"})
	r.NoError(err)
	a.Equal("joined", ret.Action)
	a.True(ret.Success)
	a.EqualValues(1, ret.Members, "expected just one member")

	// B must see the join within 10s
	select {
	case <-time.After(10 * time.Second):
		t.Error("timeout")
	case got := <-newMemberChan:
		t.Log("received join?")
		t.Log(got)
	}

	time.Sleep(5 * time.Second)

	err = edpOfA.Async(ctx, &ret, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "leave"})
	r.NoError(err)
	a.Equal("left", ret.Action)
	a.True(ret.Success)
	a.EqualValues(0, ret.Members, "expected empty rooms")

	// B must see the leave within 10s
	select {
	case <-time.After(10 * time.Second):
		t.Error("timeout")
	case got := <-newMemberChan:
		t.Log("received leave?")
		t.Log(got)
	}

	// cleanup
	cancel()
}

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"io"
"net"
"os"
"path/filepath"
"sync"
@ -12,6 +13,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/stretchr/testify/require"
"go.cryptoscope.co/muxrpc/v2/debug"
"go.mindeco.de/ssb-rooms/internal/maybemod/testutils"
"go.mindeco.de/ssb-rooms/internal/network"
@ -59,12 +61,16 @@ func (bs botServer) Serve(s *roomsrv.Server) func() error {
func makeNamedTestBot(t testing.TB, name string, opts []roomsrv.Option) *roomsrv.Server {
r := require.New(t)
testPath := filepath.Join("testrun", t.Name(), "bot-"+name)
os.RemoveAll(testPath)
botOptions := append(opts,
roomsrv.WithLogger(log.With(mainLog, "bot", name)),
roomsrv.WithRepoPath(testPath),
roomsrv.WithListenAddr(":0"),
roomsrv.WithNetworkConnTracker(network.NewLastWinsTracker()),
roomsrv.WithPostSecureConnWrapper(func(conn net.Conn) (net.Conn, error) {
return debug.WrapDump(filepath.Join(testPath, "muxdump"), conn)
}),
)
theBot, err := roomsrv.New(botOptions...)

2
test/nodejs/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
node_modules
testrun

View File

@ -0,0 +1,150 @@
package nodejs_test
import (
"context"
"net"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.cryptoscope.co/muxrpc/v2"
"go.cryptoscope.co/netwrap"
"go.cryptoscope.co/secretstream"
)
func TestJSClient(t *testing.T) {
// defer leakcheck.Check(t)
// r := require.New(t)
// ts := newRandomSession(t)
ts := newSession(t, nil, nil)
ts.startGoServer()
s := ts.gobot
alice := ts.startJSBot(`
sbot.on('rpc:connect', rpc => {
var ret = rpc.tunnel.announce()
t.comment('announced')
console.warn(ret)
pull(
rpc.tunnel.endpoints(),
pull.drain(el => {
console.warn("from roomsrv:",el)
})
)
setTimeout(() => {
ret = rpc.tunnel.leave()
t.comment('left')
console.warn(ret)
}, 2500)
setTimeout(() => {
t.comment('shutting down')
exit()
}, 5000)
})
run()`, ``)
s.Allow(alice, true)
time.Sleep(5 * time.Second)
ts.wait()
// TODO: check wantManager for this connection is stopped when the jsbot exited
}
func TestJSServer(t *testing.T) {
// defer leakcheck.Check(t)
r := require.New(t)
a := assert.New(t)
os.RemoveAll("testrun")
// ts := newRandomSession(t)
ts := newSession(t, nil, nil)
ts.startGoServer()
client := ts.gobot
// alice is the server now
alice, port := ts.startJSBotAsServer("alice", "./testscripts/server.js")
client.Allow(*alice, true)
wrappedAddr := netwrap.WrapAddr(&net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: port,
}, secretstream.Addr{PubKey: alice.ID})
ctx, connCancel := context.WithCancel(context.TODO())
err := client.Network.Connect(ctx, wrappedAddr)
defer connCancel()
r.NoError(err, "connect #1 failed")
// this might fail if the previous node process is still running...
// TODO: properly write cleanup
time.Sleep(5 * time.Second)
srvEdp, has := client.Network.GetEndpointFor(*alice)
r.True(has, "botA has no endpoint for the server")
t.Log("connected")
// let B listen for changes
newRoomMember, err := srvEdp.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "endpoints"})
r.NoError(err)
newMemberChan := make(chan string)
// read all the messages from endpoints and throw them over the channel
go func() {
for newRoomMember.Next(ctx) {
body, err := newRoomMember.Bytes()
if err != nil {
panic(err)
}
newMemberChan <- string(body)
}
close(newMemberChan)
}()
// announce A
var ret bool
err = srvEdp.Async(ctx, &ret, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "announce"})
r.NoError(err)
// a.Equal("joined", ret.Action)
a.False(ret, "would assume these are true but..?")
// a.EqualValues(1, ret.Members, "expected just one member")
select {
case <-time.After(10 * time.Second):
t.Error("timeout")
case got := <-newMemberChan:
t.Log("received join?")
t.Log(got)
}
time.Sleep(5 * time.Second)
err = srvEdp.Async(ctx, &ret, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "leave"})
r.NoError(err)
// a.Equal("left", ret.Action)
a.False(ret, "would assume these are true but..?")
// a.EqualValues(0, ret.Members, "expected empty rooms")
select {
case <-time.After(10 * time.Second):
t.Error("timeout")
case got := <-newMemberChan:
t.Log("received leave?")
t.Log(got)
}
ts.wait()
}

2671
test/nodejs/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

24
test/nodejs/package.json Normal file
View File

@ -0,0 +1,24 @@
{
"name": "go-ssb-rooms-tests",
"version": "1.0.1",
"description": "tests between go and ssb-js",
"main": "sbot_client.js",
"scripts": {
"test": "go test"
},
"author": "cryptix",
"dependencies": {
"run-parallel": "^1.1.9",
"run-series": "^1.1.9",
"secret-stack": "^6.3.1",
"sodium-native": "^3.2.0",
"ssb-config": "^3.4.5",
"ssb-conn": "^0.19.1",
"ssb-db": "^20.3.0",
"ssb-gossip": "^1.1.1",
"ssb-keys": "^8.0.0",
"ssb-replicate": "^1.3.2",
"ssb-room": "^1.3.0",
"tape": "^5.0.1"
}
}

View File

@ -0,0 +1,86 @@
const Path = require('path')
const { readFileSync } = require('fs')
const { loadOrCreateSync } = require('ssb-keys')
const pull = require('pull-stream') // used in eval scripts
const tape = require('tape')
const parallel = require('run-parallel') // used in eval scripts
const theStack = require('secret-stack')
const ssbCaps = require('ssb-caps')
const testSHSappKey = bufFromEnv('TEST_APPKEY')
let testAppkey = Buffer.from(ssbCaps.shs, 'base64')
if (testSHSappKey !== false) {
testAppkey = testSHSappKey
}
const createSbot = theStack({caps: {shs: testAppkey } })
.use(require('ssb-db'))
.use(require('ssb-gossip'))
.use(require('ssb-replicate'))
.use(require('ssb-conn'))
.use(require('ssb-room/tunnel/client'))
const testName = process.env.TEST_NAME
const testBob = process.env.TEST_BOB
const testAddr = process.env.TEST_GOADDR
const scriptBefore = readFileSync(process.env.TEST_BEFORE).toString()
const scriptAfter = readFileSync(process.env.TEST_AFTER).toString()
function bufFromEnv(evname) {
const has = process.env[evname]
if (has) {
return Buffer.from(has, 'base64')
}
return false
}
tape.createStream().pipe(process.stderr)
tape(testName, function (t) {
let timeoutLength = 15000
var tapeTimeout = null
function run() { // needs to be called by the before block when it's done
t.timeoutAfter(timeoutLength) // doesn't exit the process
tapeTimeout = setTimeout(() => {
t.comment('test timeout')
process.exit(1)
}, timeoutLength*1.25)
const to = `net:${testAddr}~shs:${testBob.substr(1).replace('.ed25519', '')}`
t.comment('dialing:' + to)
sbot.connect(to, (err) => {
t.error(err, 'connected')
eval(scriptAfter)
})
}
function exit() { // call this when you're done
sbot.close()
t.comment('closed sbot')
clearTimeout(tapeTimeout)
t.end()
process.exit(0)
}
const tempRepo = Path.join('testrun', testName)
const keys = loadOrCreateSync(Path.join(tempRepo, 'secret'))
const opts = {
allowPrivate: true,
path: tempRepo,
keys: keys
}
if (testSHSappKey !== false) {
opts.caps = opts.caps ? opts.caps : {}
opts.caps.shs = testSHSappKey
}
const sbot = createSbot(opts)
const alice = sbot.whoami()
t.comment('sbot spawned, running before')
console.log(alice.id) // tell go process who's incoming
eval(scriptBefore)
})

81
test/nodejs/sbot_serv.js Normal file
View File

@ -0,0 +1,81 @@
const Path = require('path')
const tape = require('tape')
const { readFileSync } = require('fs')
const { loadOrCreateSync } = require('ssb-keys')
const theStack = require('secret-stack')
const ssbCaps = require('ssb-caps')
const testSHSappKey = bufFromEnv('TEST_APPKEY')
let testAppkey = Buffer.from(ssbCaps.shs, 'base64')
if (testSHSappKey !== false) {
testAppkey = testSHSappKey
}
// Fix: stackOpts was assigned without a declaration, creating an implicit
// global (and a ReferenceError under strict mode).
// NOTE(review): this uses the default ssb app-key; testAppkey computed above
// is currently unused here (see the commented caps variant) — confirm intended.
const stackOpts = { appKey: require('ssb-caps').shs }
// const stackOpts = {caps: {shs: testAppkey } }
const createSbot = theStack(stackOpts)
  .use(require('ssb-db'))
  .use(require('ssb-master'))
  .use(require('ssb-logging'))
  .use(require('ssb-conn'))
  .use(require('ssb-room/tunnel/server'))
const testName = process.env['TEST_NAME']
const testPort = process.env['TEST_PORT']
const testSession = require(process.env['TEST_BEFORE'])
// const scriptBefore = readFileSync(
// const scriptAfter = readFileSync(process.env['TEST_AFTER']).toString()
tape.createStream().pipe(process.stderr);
tape(testName, function (t) {
// t.timeoutAfter(30000) // doesn't exit the process
// const tapeTimeout = setTimeout(() => {
// t.comment("test timeout")
// process.exit(1)
// }, 50000)
function exit() { // call this when you're done
sbot.close()
t.comment('closed jsbot')
// clearTimeout(tapeTimeout)
t.end()
}
const tempRepo = Path.join('testrun', testName)
const keys = loadOrCreateSync(Path.join(tempRepo, 'secret'))
const sbot = createSbot({
port: testPort,
path: tempRepo,
keys: keys,
})
const alice = sbot.whoami()
// const replicate_changes = sbot.replicate.changes()
t.comment("sbot spawned, running before")
function ready() {
console.warn('ready!', alice.id)
console.log(alice.id) // tell go process who our pubkey
}
testSession.before(sbot, ready)
sbot.on("rpc:connect", (remote, isClient) => {
t.comment("new connection: "+ remote.id)
testSession.after(sbot, remote)
})
})
// util
function bufFromEnv(evname) {
const has = process.env[evname]
if (has) {
return Buffer.from(has, 'base64')
}
return false
}

View File

@ -1 +1,261 @@
// SPDX-License-Identifier: MIT
// Package nodejs_test contains test scenarios and helpers to run interoperability tests against the javascript implementation.
package nodejs_test
import (
"bufio"
"context"
"crypto/rand"
"encoding/base64"
"fmt"
"io/ioutil"
mrand "math/rand"
"net"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"golang.org/x/sync/errgroup"
"github.com/go-kit/kit/log"
"github.com/stretchr/testify/require"
"go.cryptoscope.co/muxrpc/v2/debug"
"go.cryptoscope.co/netwrap"
refs "go.mindeco.de/ssb-refs"
"go.mindeco.de/ssb-rooms/internal/maybemod/testutils"
"go.mindeco.de/ssb-rooms/roomsrv"
)
func init() {
err := os.RemoveAll("testrun")
if err != nil {
fmt.Println("failed to clean testrun dir")
panic(err)
}
}
// writeFile dumps data into a fresh *.js temp file under testrun/<test name>
// and returns its path.
//
// Fix: ioutil.TempFile fails if the target directory does not exist, and
// init() wipes "testrun" at package load — create the directory first.
func writeFile(t *testing.T, data string) string {
	r := require.New(t)

	dir := filepath.Join("testrun", t.Name())
	r.NoError(os.MkdirAll(dir, 0700))

	f, err := ioutil.TempFile(dir, "*.js")
	r.NoError(err)

	_, err = fmt.Fprintf(f, "%s", data)
	r.NoError(err)

	err = f.Close()
	r.NoError(err)

	return f.Name()
}
// testSession wires together a go room server and one or more js bots for an
// interop test run.
type testSession struct {
	t *testing.T

	info log.Logger

	// repo is the on-disk path for this session's test data
	repo string

	// secret-handshake app key and HMAC key; nil means use the defaults
	keySHS, keyHMAC []byte

	// since we can't pass *testing.T to other goroutines, we use this to collect errors from background tasks
	backgroundErrs []<-chan error

	gobot *roomsrv.Server

	done errgroup.Group
	// doneJS, doneGo <-chan struct{}

	ctx    context.Context
	cancel context.CancelFunc
}
// TODO: restructure so that we can test both (default and random net keys) with the same code
// rolls random values for secret-handshake app-key and HMAC
// newRandomSession is like newSession but with a freshly rolled app-key and
// HMAC key, isolating the bots from the default ssb network.
// NOTE(review): errors from crypto/rand.Read are ignored here.
func newRandomSession(t *testing.T) *testSession {
	appKey := make([]byte, 32)
	rand.Read(appKey)

	hmacKey := make([]byte, 32)
	rand.Read(hmacKey)

	return newSession(t, appKey, hmacKey)
}
// if appKey is nil, the default value is used
// if hmac is nil, the object string is signed instead
func newSession(t *testing.T, appKey, hmacKey []byte) *testSession {
repo := filepath.Join("testrun", t.Name())
err := os.RemoveAll(repo)
if err != nil {
t.Errorf("remove testrun folder (%s) for this test: %s", repo, err)
}
ts := &testSession{
info: testutils.NewRelativeTimeLogger(nil),
repo: repo,
t: t,
keySHS: appKey,
keyHMAC: hmacKey,
}
// todo: hook into deadline
ts.ctx, ts.cancel = context.WithCancel(context.Background())
return ts
}
func (ts *testSession) startGoServer(opts ...roomsrv.Option) {
r := require.New(ts.t)
// prepend defaults
opts = append([]roomsrv.Option{
roomsrv.WithLogger(ts.info),
roomsrv.WithListenAddr("localhost:0"),
roomsrv.WithRepoPath(ts.repo),
roomsrv.WithContext(ts.ctx),
}, opts...)
if ts.keySHS != nil {
opts = append(opts, roomsrv.WithAppKey(ts.keySHS))
}
opts = append(opts,
roomsrv.WithPostSecureConnWrapper(func(conn net.Conn) (net.Conn, error) {
return debug.WrapDump(filepath.Join("testrun", ts.t.Name(), "muxdump"), conn)
}),
)
srv, err := roomsrv.New(opts...)
r.NoError(err, "failed to init tees a server")
ts.t.Logf("go server: %s", srv.Whoami())
ts.t.Cleanup(func() {
srv.Close()
})
ts.done.Go(func() error {
err := srv.Network.Serve(ts.ctx)
if err != nil {
err = fmt.Errorf("node serve exited: %w", err)
ts.t.Log(err)
return err
}
return nil
})
ts.gobot = srv
// TODO: make muxrpc client and connect to whoami for _ready_ ?
return
}
var jsBotCnt = 0
// startJSBot starts an unnamed js client bot; see startJSBotWithName.
func (ts *testSession) startJSBot(jsbefore, jsafter string) refs.FeedRef {
	return ts.startJSBotWithName("", jsbefore, jsafter)
}
// returns the jsbots pubkey
func (ts *testSession) startJSBotWithName(name, jsbefore, jsafter string) refs.FeedRef {
ts.t.Log("starting client", name)
r := require.New(ts.t)
cmd := exec.CommandContext(ts.ctx, "node", "./sbot_client.js")
cmd.Stderr = os.Stderr
outrc, err := cmd.StdoutPipe()
r.NoError(err)
if name == "" {
name = fmt.Sprint(ts.t.Name(), jsBotCnt)
}
jsBotCnt++
env := []string{
"TEST_NAME=" + name,
"TEST_BOB=" + ts.gobot.Whoami().Ref(),
"TEST_GOADDR=" + netwrap.GetAddr(ts.gobot.Network.GetListenAddr(), "tcp").String(),
"TEST_BEFORE=" + writeFile(ts.t, jsbefore),
"TEST_AFTER=" + writeFile(ts.t, jsafter),
}
if ts.keySHS != nil {
env = append(env, "TEST_APPKEY="+base64.StdEncoding.EncodeToString(ts.keySHS))
}
if ts.keyHMAC != nil {
env = append(env, "TEST_HMACKEY="+base64.StdEncoding.EncodeToString(ts.keyHMAC))
}
cmd.Env = env
r.NoError(cmd.Start(), "failed to init test js-sbot")
ts.done.Go(cmd.Wait)
ts.t.Cleanup(func() {
cmd.Process.Kill()
})
pubScanner := bufio.NewScanner(outrc) // TODO muxrpc comms?
r.True(pubScanner.Scan(), "multiple lines of output from js - expected #1 to be %s pubkey/id", name)
jsBotRef, err := refs.ParseFeedRef(pubScanner.Text())
r.NoError(err, "failed to get %s key from JS process")
ts.t.Logf("JS %s:%d %s", name, jsBotCnt, jsBotRef.Ref())
return *jsBotRef
}
func (ts *testSession) startJSBotAsServer(name, testScriptFileName string) (*refs.FeedRef, int) {
ts.t.Log("starting srv", name)
r := require.New(ts.t)
cmd := exec.CommandContext(ts.ctx, "node", "./sbot_serv.js")
cmd.Stderr = os.Stderr
outrc, err := cmd.StdoutPipe()
r.NoError(err)
if name == "" {
name = fmt.Sprintf("jsbot-%d", jsBotCnt)
}
jsBotCnt++
var port = 1024 + mrand.Intn(23000)
env := []string{
"TEST_NAME=" + filepath.Join(ts.t.Name(), "jsbot-"+name),
"TEST_BOB=" + ts.gobot.Whoami().Ref(),
fmt.Sprintf("TEST_PORT=%d", port),
"TEST_BEFORE=" + testScriptFileName,
}
// if ts.keySHS != nil {
// env = append(env, "TEST_APPKEY="+base64.StdEncoding.EncodeToString(ts.keySHS))
// }
cmd.Env = env
r.NoError(cmd.Start(), "failed to init test js-sbot")
ts.done.Go(cmd.Wait)
ts.t.Cleanup(func() {
cmd.Process.Kill()
})
pubScanner := bufio.NewScanner(outrc) // TODO muxrpc comms?
r.True(pubScanner.Scan(), "multiple lines of output from js - expected #1 to be %s pubkey/id", name)
srvRef, err := refs.ParseFeedRef(pubScanner.Text())
r.NoError(err, "failed to get srvRef key from JS process")
ts.t.Logf("JS %s: %s port: %d", name, srvRef.Ref(), port)
return srvRef, port
}
func (ts *testSession) wait() {
closeErrc := make(chan error)
go func() {
time.Sleep(15 * time.Second) // would be nice to get -test.timeout for this
ts.gobot.Shutdown()
closeErrc <- ts.gobot.Close()
close(closeErrc)
}()
for err := range testutils.MergeErrorChans(append(ts.backgroundErrs, closeErrc)...) {
require.NoError(ts.t, err)
}
require.NoError(ts.t, ts.done.Wait())
}

View File

@ -0,0 +1,11 @@
// Test-session hooks loaded by sbot_serv.js via the TEST_BEFORE env var.
module.exports = {
  // before: runs once the sbot is constructed; call ready() when set up so
  // the harness prints the bot's id to the go process.
  before: (sbot, ready) => {
    console.warn('before:', sbot.id)
    setTimeout(ready, 1000)
    // ready()
  },
  // after: runs for every incoming rpc connection.
  after: (sbot, client) => {
    console.warn('after:', sbot.id, client.id)
  }
}