From f4a0a48c9728cf7a576161351df1fee6482bd95c Mon Sep 17 00:00:00 2001 From: Henry Date: Wed, 27 Jan 2021 11:05:06 +0100 Subject: [PATCH] js compat bring schema of announce/leave and endpoints contents on par with JS implementation. annonce() and leave() somhow just return false endpoints is just arrays of endpoints - I _guess_ the consuming client has to keep track of who left and joined?! --- handlers/tunnel/state.go | 40 +++++++++++------------- internal/broadcasts/room_change.go | 12 ++----- test/nodejs/announce_test.go | 36 +++++++++++++++------ test/nodejs/testscripts/simple_client.js | 31 ++++++++++++------ test/nodejs/testscripts/template.js | 14 ++++++++- 5 files changed, 81 insertions(+), 52 deletions(-) diff --git a/handlers/tunnel/state.go b/handlers/tunnel/state.go index db815e8..9ba3f7d 100644 --- a/handlers/tunnel/state.go +++ b/handlers/tunnel/state.go @@ -51,6 +51,15 @@ type roomsStateMap map[string]roomStateMap // roomStateMap is a single room type roomStateMap map[string]muxrpc.Endpoint +// copy map entries to list for broadcast update +func (rsm roomStateMap) asList() []string { + memberList := make([]string, 0, len(rsm)) + for m := range rsm { + memberList = append(memberList, m) + } + return memberList +} + func (rs *roomState) isRoom(context.Context, *muxrpc.Request) (interface{}, error) { level.Debug(rs.logger).Log("called", "isRoom") return true, nil @@ -70,23 +79,14 @@ func (rs *roomState) announce(_ context.Context, req *muxrpc.Request) (interface } rs.roomsMu.Lock() - rs.updater.Update(broadcasts.RoomChange{ - Op: "joined", - Who: *ref, - }) // add ref to lobby rs.rooms["lobby"][ref.Ref()] = req.Endpoint() - members := len(rs.rooms["lobby"]) + + rs.updater.Update(rs.rooms["lobby"].asList()) rs.roomsMu.Unlock() - return RoomUpdate{"joined", true, uint(members)}, nil -} - -type RoomUpdate struct { - Action string - Success bool - Members uint + return false, nil } func (rs *roomState) leave(_ context.Context, req *muxrpc.Request) (interface{}, error) { @@ -96,17 +96,13 @@ func (rs *roomState) leave(_ context.Context, req *muxrpc.Request) (interface{}, } rs.roomsMu.Lock() - rs.updater.Update(broadcasts.RoomChange{ - Op: "left", - Who: *ref, - }) - - // add ref to lobby + // remove ref from lobby delete(rs.rooms["lobby"], ref.Ref()) - members := len(rs.rooms["lobby"]) + + rs.updater.Update(rs.rooms["lobby"].asList()) rs.roomsMu.Unlock() - return RoomUpdate{"left", true, uint(members)}, nil + return false, nil } func (rs *roomState) endpoints(_ context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink, edp muxrpc.Endpoint) error { @@ -129,8 +125,8 @@ func newForwarder(snk *muxrpc.ByteSink) updateForwarder { } } -func (uf updateForwarder) Update(rc broadcasts.RoomChange) error { - return uf.enc.Encode(rc) +func (uf updateForwarder) Update(members []string) error { + return uf.enc.Encode(members) } func (uf updateForwarder) Close() error { diff --git a/internal/broadcasts/room_change.go b/internal/broadcasts/room_change.go index 1c98345..fc8bf1d 100644 --- a/internal/broadcasts/room_change.go +++ b/internal/broadcasts/room_change.go @@ -4,17 +4,11 @@ import ( "io" "sync" - refs "go.mindeco.de/ssb-refs" "go.mindeco.de/ssb-rooms/internal/maybemod/multierror" ) -type RoomChange struct { - Op string - Who refs.FeedRef -} - type RoomChangeSink interface { - Update(value RoomChange) error + Update(members []string) error io.Closer } @@ -53,11 +47,11 @@ func (bcst *RoomChangeBroadcast) Register(sink RoomChangeSink) func() { type broadcastSink RoomChangeBroadcast // Pour implements the Sink interface. -func (bcst *broadcastSink) Update(rc RoomChange) error { +func (bcst *broadcastSink) Update(members []string) error { bcst.mu.Lock() for s := range bcst.sinks { - err := (*s).Update(rc) + err := (*s).Update(members) if err != nil { delete(bcst.sinks, s) } diff --git a/test/nodejs/announce_test.go b/test/nodejs/announce_test.go index 94c7597..15974d7 100644 --- a/test/nodejs/announce_test.go +++ b/test/nodejs/announce_test.go @@ -27,9 +27,16 @@ func TestJSClient(t *testing.T) { srv.Network.GetListenAddr(), srv.Whoami(), ) - srv.Allow(alice, true) + time.Sleep(1500 * time.Millisecond) + bob := ts.startJSBot("./testscripts/simple_client.js", + srv.Network.GetListenAddr(), + srv.Whoami(), + ) + + srv.Allow(bob, true) + time.Sleep(5 * time.Second) ts.wait() @@ -45,28 +52,37 @@ func TestJSServer(t *testing.T) { ts := newRandomSession(t) // ts := newSession(t, nil) - client := ts.startGoServer() - // alice is the server now alice, port := ts.startJSBotAsServer("alice", "./testscripts/server.js") + // a 2nd js instance but as a client + aliceAddr := &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: port, + } + + bob := ts.startJSBot("./testscripts/simple_client.js", + aliceAddr, + *alice, + ) + t.Log("started bob:", bob.Ref()) + + // now connect our go client + client := ts.startGoServer() client.Allow(*alice, true) // connect to alice - wrappedAddr := netwrap.WrapAddr(&net.TCPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: port, - }, secretstream.Addr{PubKey: alice.ID}) + aliceShsAddr := netwrap.WrapAddr(aliceAddr, secretstream.Addr{PubKey: alice.ID}) ctx, connCancel := context.WithCancel(context.TODO()) - err := client.Network.Connect(ctx, wrappedAddr) + err := client.Network.Connect(ctx, aliceShsAddr) defer connCancel() r.NoError(err, "connect #1 failed") // this might fail if the previous node process is still running... // TODO: properly write cleanup - time.Sleep(3 * time.Second) + time.Sleep(2 * time.Second) srvEdp, has := client.Network.GetEndpointFor(*alice) r.True(has, "botA has no endpoint for the server") @@ -105,7 +121,7 @@ func TestJSServer(t *testing.T) { t.Log("received join?") t.Log(got) } - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) err = srvEdp.Async(ctx, &ret, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "leave"}) r.NoError(err) diff --git a/test/nodejs/testscripts/simple_client.js b/test/nodejs/testscripts/simple_client.js index 66297fc..70944d0 100644 --- a/test/nodejs/testscripts/simple_client.js +++ b/test/nodejs/testscripts/simple_client.js @@ -3,9 +3,16 @@ const pull = require('pull-stream') module.exports = { before: (sbot, ready) => { sbot.on('rpc:connect', rpc => { - var ret = rpc.tunnel.announce() - console.warn('announced') - console.warn(ret) + // announce ourselves to the room/tunnel + rpc.tunnel.announce().then((ret) => { + console.warn('announced!') + console.warn(ret) + }).catch((err) => { + console.warn('announce failed') + throw err + }) + + // log all new endpoints pull( rpc.tunnel.endpoints(), pull.drain(el => { @@ -13,18 +20,22 @@ module.exports = { }) ) + // leave after 5 seconds setTimeout(() => { - ret = rpc.tunnel.leave() - console.warn('left') - console.warn(ret) - }, 2500) + rpc.tunnel.leave().then((ret) => { + console.warn('left') + console.warn(ret) + }).catch((err) => { + console.warn('left failed') + throw err + }) + }, 4000) }) ready() }, after: (sbot, exit) => { - console.warn('after connect...') - - setTimeout(exit, 5000) + console.warn('after connect... exiting in 10s') + setTimeout(exit, 10000) } } \ No newline at end of file diff --git a/test/nodejs/testscripts/template.js b/test/nodejs/testscripts/template.js index 9f7e747..2921123 100644 --- a/test/nodejs/testscripts/template.js +++ b/test/nodejs/testscripts/template.js @@ -1,4 +1,16 @@ -const pull = require('pull-stream') +/* +this is a tempalte for a script to be used in the go<>js tests. + +all the setup of the peers is done in sbot_client and sbot_server js. + +warning: only log to stderr (console.warn) +DONT log to stdout (console.log) as this is connected to the go test process for initialization + +TODO: pass the tape instance into the module, so that t.error and it's other helpers can be used. +proably by turning the exported object into an init function which returns the {before, after} object. +*/ + +// const pull = require('pull-stream') module.exports = { before: (sbot, ready) => {