js compat

bring schema of announce/leave and endpoints contents on par with JS
implementation.

annonce() and leave() somhow just return false

endpoints is just arrays of endpoints - I _guess_ the consuming client
has to keep track of who left and joined?!
This commit is contained in:
Henry 2021-01-27 11:05:06 +01:00
parent 1269b30f15
commit f4a0a48c97
5 changed files with 81 additions and 52 deletions

View File

@ -51,6 +51,15 @@ type roomsStateMap map[string]roomStateMap
// roomStateMap is a single room
type roomStateMap map[string]muxrpc.Endpoint
// copy map entries to list for broadcast update
func (rsm roomStateMap) asList() []string {
memberList := make([]string, 0, len(rsm))
for m := range rsm {
memberList = append(memberList, m)
}
return memberList
}
func (rs *roomState) isRoom(context.Context, *muxrpc.Request) (interface{}, error) {
level.Debug(rs.logger).Log("called", "isRoom")
return true, nil
@ -70,23 +79,14 @@ func (rs *roomState) announce(_ context.Context, req *muxrpc.Request) (interface
}
rs.roomsMu.Lock()
rs.updater.Update(broadcasts.RoomChange{
Op: "joined",
Who: *ref,
})
// add ref to lobby
rs.rooms["lobby"][ref.Ref()] = req.Endpoint()
members := len(rs.rooms["lobby"])
rs.updater.Update(rs.rooms["lobby"].asList())
rs.roomsMu.Unlock()
return RoomUpdate{"joined", true, uint(members)}, nil
}
type RoomUpdate struct {
Action string
Success bool
Members uint
return false, nil
}
func (rs *roomState) leave(_ context.Context, req *muxrpc.Request) (interface{}, error) {
@ -96,17 +96,13 @@ func (rs *roomState) leave(_ context.Context, req *muxrpc.Request) (interface{},
}
rs.roomsMu.Lock()
rs.updater.Update(broadcasts.RoomChange{
Op: "left",
Who: *ref,
})
// add ref to lobby
// remove ref from lobby
delete(rs.rooms["lobby"], ref.Ref())
members := len(rs.rooms["lobby"])
rs.updater.Update(rs.rooms["lobby"].asList())
rs.roomsMu.Unlock()
return RoomUpdate{"left", true, uint(members)}, nil
return false, nil
}
func (rs *roomState) endpoints(_ context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink, edp muxrpc.Endpoint) error {
@ -129,8 +125,8 @@ func newForwarder(snk *muxrpc.ByteSink) updateForwarder {
}
}
func (uf updateForwarder) Update(rc broadcasts.RoomChange) error {
return uf.enc.Encode(rc)
func (uf updateForwarder) Update(members []string) error {
return uf.enc.Encode(members)
}
func (uf updateForwarder) Close() error {

View File

@ -4,17 +4,11 @@ import (
"io"
"sync"
refs "go.mindeco.de/ssb-refs"
"go.mindeco.de/ssb-rooms/internal/maybemod/multierror"
)
type RoomChange struct {
Op string
Who refs.FeedRef
}
type RoomChangeSink interface {
Update(value RoomChange) error
Update(members []string) error
io.Closer
}
@ -53,11 +47,11 @@ func (bcst *RoomChangeBroadcast) Register(sink RoomChangeSink) func() {
type broadcastSink RoomChangeBroadcast
// Pour implements the Sink interface.
func (bcst *broadcastSink) Update(rc RoomChange) error {
func (bcst *broadcastSink) Update(members []string) error {
bcst.mu.Lock()
for s := range bcst.sinks {
err := (*s).Update(rc)
err := (*s).Update(members)
if err != nil {
delete(bcst.sinks, s)
}

View File

@ -27,9 +27,16 @@ func TestJSClient(t *testing.T) {
srv.Network.GetListenAddr(),
srv.Whoami(),
)
srv.Allow(alice, true)
time.Sleep(1500 * time.Millisecond)
bob := ts.startJSBot("./testscripts/simple_client.js",
srv.Network.GetListenAddr(),
srv.Whoami(),
)
srv.Allow(bob, true)
time.Sleep(5 * time.Second)
ts.wait()
@ -45,28 +52,37 @@ func TestJSServer(t *testing.T) {
ts := newRandomSession(t)
// ts := newSession(t, nil)
client := ts.startGoServer()
// alice is the server now
alice, port := ts.startJSBotAsServer("alice", "./testscripts/server.js")
// a 2nd js instance but as a client
aliceAddr := &net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: port,
}
bob := ts.startJSBot("./testscripts/simple_client.js",
aliceAddr,
*alice,
)
t.Log("started bob:", bob.Ref())
// now connect our go client
client := ts.startGoServer()
client.Allow(*alice, true)
// connect to alice
wrappedAddr := netwrap.WrapAddr(&net.TCPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: port,
}, secretstream.Addr{PubKey: alice.ID})
aliceShsAddr := netwrap.WrapAddr(aliceAddr, secretstream.Addr{PubKey: alice.ID})
ctx, connCancel := context.WithCancel(context.TODO())
err := client.Network.Connect(ctx, wrappedAddr)
err := client.Network.Connect(ctx, aliceShsAddr)
defer connCancel()
r.NoError(err, "connect #1 failed")
// this might fail if the previous node process is still running...
// TODO: properly write cleanup
time.Sleep(3 * time.Second)
time.Sleep(2 * time.Second)
srvEdp, has := client.Network.GetEndpointFor(*alice)
r.True(has, "botA has no endpoint for the server")
@ -105,7 +121,7 @@ func TestJSServer(t *testing.T) {
t.Log("received join?")
t.Log(got)
}
time.Sleep(1 * time.Second)
time.Sleep(5 * time.Second)
err = srvEdp.Async(ctx, &ret, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "leave"})
r.NoError(err)

View File

@ -3,9 +3,16 @@ const pull = require('pull-stream')
module.exports = {
before: (sbot, ready) => {
sbot.on('rpc:connect', rpc => {
var ret = rpc.tunnel.announce()
console.warn('announced')
console.warn(ret)
// announce ourselves to the room/tunnel
rpc.tunnel.announce().then((ret) => {
console.warn('announced!')
console.warn(ret)
}).catch((err) => {
console.warn('announce failed')
throw err
})
// log all new endpoints
pull(
rpc.tunnel.endpoints(),
pull.drain(el => {
@ -13,18 +20,22 @@ module.exports = {
})
)
// leave after 5 seconds
setTimeout(() => {
ret = rpc.tunnel.leave()
console.warn('left')
console.warn(ret)
}, 2500)
rpc.tunnel.leave().then((ret) => {
console.warn('left')
console.warn(ret)
}).catch((err) => {
console.warn('left failed')
throw err
})
}, 4000)
})
ready()
},
after: (sbot, exit) => {
console.warn('after connect...')
setTimeout(exit, 5000)
console.warn('after connect... exiting in 10s')
setTimeout(exit, 10000)
}
}

View File

@ -1,4 +1,16 @@
const pull = require('pull-stream')
/*
this is a tempalte for a script to be used in the go<>js tests.
all the setup of the peers is done in sbot_client and sbot_server js.
warning: only log to stderr (console.warn)
DONT log to stdout (console.log) as this is connected to the go test process for initialization
TODO: pass the tape instance into the module, so that t.error and it's other helpers can be used.
proably by turning the exported object into an init function which returns the {before, after} object.
*/
// const pull = require('pull-stream')
module.exports = {
before: (sbot, ready) => {