Merge pull request #220 from ssb-ngi-pointer/room-attendants

room.attendants
Henry 2021-05-18 15:56:59 +02:00 committed by GitHub
commit d8da4ae94b
13 changed files with 540 additions and 47 deletions

View File

@@ -0,0 +1,125 @@
// SPDX-License-Identifier: MIT
package broadcasts
import (
"io"
"sync"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/multierror"
refs "go.mindeco.de/ssb-refs"
)
type AttendantsEmitter interface {
Joined(member refs.FeedRef) error
Left(member refs.FeedRef) error
io.Closer
}
// NewAttendantsEmitter returns the Sink to write updates to, and the new
// broadcast instance for registering receivers.
func NewAttendantsEmitter() (AttendantsEmitter, *AttendantsBroadcast) {
bcst := AttendantsBroadcast{
mu: &sync.Mutex{},
sinks: make(map[*AttendantsEmitter]struct{}),
}
return (*attendantsSink)(&bcst), &bcst
}
// AttendantsBroadcast is an interface for registering one or more Sinks to receive
// updates.
type AttendantsBroadcast struct {
mu *sync.Mutex
sinks map[*AttendantsEmitter]struct{}
}
// Register adds a Sink for updates to be sent to. It also returns a function that unregisters the sink and closes it.
func (bcst *AttendantsBroadcast) Register(sink AttendantsEmitter) func() {
bcst.mu.Lock()
defer bcst.mu.Unlock()
bcst.sinks[&sink] = struct{}{}
return func() {
bcst.mu.Lock()
defer bcst.mu.Unlock()
delete(bcst.sinks, &sink)
sink.Close()
}
}
type attendantsSink AttendantsBroadcast
func (bcst *attendantsSink) Joined(member refs.FeedRef) error {
bcst.mu.Lock()
for s := range bcst.sinks {
err := (*s).Joined(member)
if err != nil {
delete(bcst.sinks, s)
}
}
bcst.mu.Unlock()
return nil
}
func (bcst *attendantsSink) Left(member refs.FeedRef) error {
bcst.mu.Lock()
for s := range bcst.sinks {
err := (*s).Left(member)
if err != nil {
delete(bcst.sinks, s)
}
}
bcst.mu.Unlock()
return nil
}
// Close implements the Sink interface.
func (bcst *attendantsSink) Close() error {
bcst.mu.Lock()
defer bcst.mu.Unlock()
sinks := make([]AttendantsEmitter, 0, len(bcst.sinks))
for sink := range bcst.sinks {
sinks = append(sinks, *sink)
}
var (
wg sync.WaitGroup
me multierror.List
)
// might be fine without the waitgroup and concurrency
wg.Add(len(sinks))
for _, sink_ := range sinks {
go func(sink AttendantsEmitter) {
defer wg.Done()
err := sink.Close()
if err != nil {
me.Errs = append(me.Errs, err)
return
}
}(sink_)
}
wg.Wait()
if len(me.Errs) == 0 {
return nil
}
return me
}
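
Not part of the commit, just orientation: a minimal sketch of how a consumer of this broadcaster could look, written as an in-package example. The printingEmitter type is made up for illustration; NewAttendantsEmitter and Register are the functions defined above.

// SPDX-License-Identifier: MIT
package broadcasts
import (
"fmt"
refs "go.mindeco.de/ssb-refs"
)
// printingEmitter is a hypothetical AttendantsEmitter that just logs what it sees.
type printingEmitter struct{}
func (printingEmitter) Joined(m refs.FeedRef) error { fmt.Println("joined:", m.ShortRef()); return nil }
func (printingEmitter) Left(m refs.FeedRef) error { fmt.Println("left:", m.ShortRef()); return nil }
func (printingEmitter) Close() error { return nil }
func ExampleNewAttendantsEmitter() {
emitter, bcast := NewAttendantsEmitter()
defer emitter.Close()
// Register hands back a cancel func that removes the sink and closes it again.
cancel := bcast.Register(printingEmitter{})
defer cancel()
// every Joined/Left written to the emitter now fans out to all registered sinks:
// emitter.Joined(someMember)
// emitter.Left(someMember)
}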

View File

@@ -0,0 +1,5 @@
// SPDX-License-Identifier: MIT
// Package broadcasts implements custom typed one-to-n facilities for broadcasting messages/calls to multiple subscribers.
// They loosely follow from luigi.Broadcasts but using concrete types instead of empty interfaces.
package broadcasts

View File

@@ -9,31 +9,31 @@ import (
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/multierror"
)
-type RoomChangeSink interface {
type EndpointsEmitter interface {
Update(members []string) error
io.Closer
}
-// NewRoomChanger returns the Sink, to write to the broadcaster, and the new
// NewEndpointsEmitter returns the Sink, to write to the broadcaster, and the new
// broadcast instance.
-func NewRoomChanger() (RoomChangeSink, *RoomChangeBroadcast) {
func NewEndpointsEmitter() (EndpointsEmitter, *EndpointsBroadcast) {
-bcst := RoomChangeBroadcast{
bcst := EndpointsBroadcast{
mu: &sync.Mutex{},
-sinks: make(map[*RoomChangeSink]struct{}),
sinks: make(map[*EndpointsEmitter]struct{}),
}
-return (*broadcastSink)(&bcst), &bcst
return (*endpointsSink)(&bcst), &bcst
}
-// RoomChangeBroadcast is an interface for registering one or more Sinks to recieve
// EndpointsBroadcast is an interface for registering one or more Sinks to recieve
// updates.
-type RoomChangeBroadcast struct {
type EndpointsBroadcast struct {
mu *sync.Mutex
-sinks map[*RoomChangeSink]struct{}
sinks map[*EndpointsEmitter]struct{}
}
// Register a Sink for updates to be sent. also returns
-func (bcst *RoomChangeBroadcast) Register(sink RoomChangeSink) func() {
func (bcst *EndpointsBroadcast) Register(sink EndpointsEmitter) func() {
bcst.mu.Lock()
defer bcst.mu.Unlock()
bcst.sinks[&sink] = struct{}{}
@@ -46,10 +46,10 @@ func (bcst *RoomChangeBroadcast) Register(sink RoomChangeSink) func() {
}
}
-type broadcastSink RoomChangeBroadcast
type endpointsSink EndpointsBroadcast
// Pour implements the Sink interface.
-func (bcst *broadcastSink) Update(members []string) error {
func (bcst *endpointsSink) Update(members []string) error {
bcst.mu.Lock()
for s := range bcst.sinks {
@@ -64,13 +64,13 @@ func (bcst *broadcastSink) Update(members []string) error {
}
// Close implements the Sink interface.
-func (bcst *broadcastSink) Close() error {
func (bcst *endpointsSink) Close() error {
-var sinks []RoomChangeSink
var sinks []EndpointsEmitter
bcst.mu.Lock()
defer bcst.mu.Unlock()
-sinks = make([]RoomChangeSink, 0, len(bcst.sinks))
sinks = make([]EndpointsEmitter, 0, len(bcst.sinks))
for sink := range bcst.sinks {
sinks = append(sinks, *sink)
@@ -85,7 +85,7 @@ func (bcst *broadcastSink) Close() error {
wg.Add(len(sinks))
for _, sink_ := range sinks {
-go func(sink RoomChangeSink) {
go func(sink EndpointsEmitter) {
defer wg.Done()
err := sink.Close()

View File

@@ -24,7 +24,7 @@ func (tp testPrinter) Update(members []string) error {
func (tp testPrinter) Close() error { return nil }
func ExampleBroadcast() {
-sink, bcast := NewRoomChanger()
sink, bcast := NewEndpointsEmitter()
defer sink.Close()
var p1, p2 testPrinter
@@ -44,7 +44,7 @@ func ExampleBroadcast() {
}
func ExampleBroadcastCanceled() {
-sink, bcast := NewRoomChanger()
sink, bcast := NewEndpointsEmitter()
defer sink.Close()
var p1, p2 testPrinter
@@ -78,7 +78,7 @@ func (tp erroringPrinter) Close() error { return nil }
func TestBroadcastOneErrs(t *testing.T) {
var buf = &bytes.Buffer{}
-sink, bcast := NewRoomChanger()
sink, bcast := NewEndpointsEmitter()
defer sink.Close()
var p1 testPrinter
@@ -144,7 +144,7 @@ func TestBroadcast(t *testing.T) {
tc.rx = tc.tx
}
-sink, bcast := NewRoomChanger()
sink, bcast := NewEndpointsEmitter()
mkSink := func() Sink {
var (

View File

@@ -0,0 +1,101 @@
// SPDX-License-Identifier: MIT
package server
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
"go.cryptoscope.co/muxrpc/v2"
refs "go.mindeco.de/ssb-refs"
)
// AttendantsUpdate is emitted if a single member joins or leaves.
// Type is either 'joined' or 'left'.
type AttendantsUpdate struct {
Type string `json:"type"`
ID refs.FeedRef `json:"id"`
}
// AttendantsInitialState is emitted the first time the stream is opened
type AttendantsInitialState struct {
Type string `json:"type"`
IDs []refs.FeedRef `json:"ids"`
}
func (h *Handler) attendants(ctx context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink) error {
// for future updates
toPeer := newAttendantsEncoder(snk)
h.state.RegisterAttendantsUpdates(toPeer)
// get public key from the calling peer
peer, err := network.GetFeedRefFromAddr(req.RemoteAddr())
if err != nil {
return err
}
pm, err := h.config.GetPrivacyMode(ctx)
if err != nil {
return fmt.Errorf("running with unknown privacy mode")
}
if pm == roomdb.ModeCommunity || pm == roomdb.ModeRestricted {
_, err := h.members.GetByFeed(ctx, *peer)
if err != nil {
return fmt.Errorf("external user are not allowed to enumerate members")
}
}
err = json.NewEncoder(snk).Encode(AttendantsInitialState{
Type: "state",
IDs: h.state.ListAsRefs(),
})
if err != nil {
return err
}
return nil
}
// a muxrpc json encoder for attendants broadcasts
type attendantsJSONEncoder struct {
mu sync.Mutex // only one caller to forwarder at a time
snk *muxrpc.ByteSink
enc *json.Encoder
}
func newAttendantsEncoder(snk *muxrpc.ByteSink) *attendantsJSONEncoder {
enc := json.NewEncoder(snk)
snk.SetEncoding(muxrpc.TypeJSON)
return &attendantsJSONEncoder{
snk: snk,
enc: enc,
}
}
func (uf *attendantsJSONEncoder) Joined(member refs.FeedRef) error {
uf.mu.Lock()
defer uf.mu.Unlock()
return uf.enc.Encode(AttendantsUpdate{
Type: "joined",
ID: member,
})
}
func (uf *attendantsJSONEncoder) Left(member refs.FeedRef) error {
uf.mu.Lock()
defer uf.mu.Unlock()
return uf.enc.Encode(AttendantsUpdate{
Type: "left",
ID: member,
})
}
func (uf *attendantsJSONEncoder) Close() error {
uf.mu.Lock()
defer uf.mu.Unlock()
return uf.snk.Close()
}
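
Not part of the diff, just for reference: the room.attendants stream is newline-delimited JSON built from the two structs above. A rough sketch of the wire format follows, assuming refs.FeedRef marshals to its usual @...=.ed25519 string; the function name is made up.

// SPDX-License-Identifier: MIT
package server
import (
"encoding/json"
"os"
refs "go.mindeco.de/ssb-refs"
)
func exampleAttendantsWireFormat(member refs.FeedRef) error {
enc := json.NewEncoder(os.Stdout)
// first message after the stream is opened: the full current state
// {"type":"state","ids":["@...=.ed25519"]}
if err := enc.Encode(AttendantsInitialState{Type: "state", IDs: []refs.FeedRef{member}}); err != nil {
return err
}
// afterwards one message per change, either "joined" or "left"
// {"type":"joined","id":"@...=.ed25519"}
return enc.Encode(AttendantsUpdate{Type: "joined", ID: member})
}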

View File

@@ -34,7 +34,8 @@ func New(log kitlog.Logger, netInfo network.ServerEndpointDetails, m *roomstate.
return h
}
-func (h *Handler) Register(mux typemux.HandlerMux, namespace muxrpc.Method) {
func (h *Handler) RegisterTunnel(mux typemux.HandlerMux) {
var namespace = muxrpc.Method{"tunnel"}
mux.RegisterAsync(append(namespace, "isRoom"), typemux.AsyncFunc(h.metadata))
mux.RegisterAsync(append(namespace, "ping"), typemux.AsyncFunc(h.ping))
@@ -49,3 +50,20 @@ func (h *Handler) Register(mux typemux.HandlerMux, namespace muxrpc.Method) {
state: h.state,
})
}
func (h *Handler) RegisterRoom(mux typemux.HandlerMux) {
var namespace = muxrpc.Method{"room"}
mux.RegisterAsync(append(namespace, "metadata"), typemux.AsyncFunc(h.metadata))
mux.RegisterAsync(append(namespace, "ping"), typemux.AsyncFunc(h.ping))
mux.RegisterAsync(append(namespace, "announce"), typemux.AsyncFunc(h.announce))
mux.RegisterAsync(append(namespace, "leave"), typemux.AsyncFunc(h.leave))
mux.RegisterSource(append(namespace, "attendants"), typemux.SourceFunc(h.attendants))
mux.RegisterDuplex(append(namespace, "connect"), connectHandler{
logger: h.logger,
self: h.netInfo.RoomID,
state: h.state,
})
}

View File

@@ -105,10 +105,10 @@ func (h *Handler) leave(_ context.Context, req *muxrpc.Request) (interface{}, er
}
func (h *Handler) endpoints(ctx context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink) error {
-toPeer := newForwarder(snk)
toPeer := newEndpointsForwarder(snk)
// for future updates
-h.state.Register(toPeer)
h.state.RegisterLegacyEndpoints(toPeer)
// get public key from the calling peer
peer, err := network.GetFeedRefFromAddr(req.RemoteAddr())
@@ -140,28 +140,29 @@ func (h *Handler) endpoints(ctx context.Context, req *muxrpc.Request, snk *muxrp
return nil
}
-type updateForwarder struct {
// a muxrpc json encoder for endpoints broadcasts
type endpointsJSONEncoder struct {
mu sync.Mutex // only one caller to forwarder at a time
snk *muxrpc.ByteSink
enc *json.Encoder
}
-func newForwarder(snk *muxrpc.ByteSink) *updateForwarder {
func newEndpointsForwarder(snk *muxrpc.ByteSink) *endpointsJSONEncoder {
enc := json.NewEncoder(snk)
snk.SetEncoding(muxrpc.TypeJSON)
-return &updateForwarder{
return &endpointsJSONEncoder{
snk: snk,
enc: enc,
}
}
-func (uf *updateForwarder) Update(members []string) error {
func (uf *endpointsJSONEncoder) Update(members []string) error {
uf.mu.Lock()
defer uf.mu.Unlock()
return uf.enc.Encode(members)
}
-func (uf *updateForwarder) Close() error {
func (uf *endpointsJSONEncoder) Close() error {
uf.mu.Lock()
defer uf.mu.Unlock()
return uf.snk.Close()

View File

@@ -0,0 +1,208 @@
// SPDX-License-Identifier: MIT
package go_test
import (
"context"
"encoding/json"
"io"
"os"
"path/filepath"
"testing"
"time"
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/tunnel/server"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.cryptoscope.co/muxrpc/v2"
refs "go.mindeco.de/ssb-refs"
)
// this tests the new room.attendants call
// basically the same test as endpoints_test
func TestRoomAttendants(t *testing.T) {
testInit(t)
r := require.New(t)
a := assert.New(t)
testPath := filepath.Join("testrun", t.Name())
os.RemoveAll(testPath)
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
// create the roomsrv
ts := makeNamedTestBot(t, "server", ctx, nil)
ctx = ts.ctx
// three new clients, connected to the server automatically
alf := ts.makeTestClient("alf")
bre := ts.makeTestClient("bre")
carl := ts.makeTestClient("carl")
// start with carl
// ===============
var ok bool
err := carl.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
r.NoError(err)
a.True(ok, "announce should be fine")
carlsSource, err := carl.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"room", "attendants"})
r.NoError(err)
t.Log("carl opened attendants")
// first message should be initial state
a.True(carlsSource.Next(ctx))
var initState server.AttendantsInitialState
decodeJSONsrc(t, carlsSource, &initState)
a.Equal("state", initState.Type)
a.Len(initState.IDs, 1)
a.True(initState.IDs[0].Equal(&carl.feed))
announcementsForCarl := make(announcements)
go logAttendantsStream(ts, carlsSource, "carl", announcementsForCarl)
time.Sleep(1 * time.Second) // give some time to process new events
a.Len(announcementsForCarl, 0, "none yet")
// let alf join the room
// =====================
err = alf.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
r.NoError(err)
a.True(ok, "announce should be fine")
alfsSource, err := alf.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"room", "attendants"})
r.NoError(err)
// first message should be initial state
a.True(alfsSource.Next(ctx))
decodeJSONsrc(t, alfsSource, &initState)
a.Equal("state", initState.Type)
a.Len(initState.IDs, 2)
assertListContains(t, initState.IDs, carl.feed)
assertListContains(t, initState.IDs, alf.feed)
announcementsForAlf := make(announcements)
go logAttendantsStream(ts, alfsSource, "alf", announcementsForAlf)
time.Sleep(1 * time.Second) // give some time to process new events
// assert what alf saw
var seen bool
a.Len(announcementsForAlf, 0, "none yet")
// assert what carl saw
_, seen = announcementsForCarl[alf.feed.Ref()]
a.True(seen, "carl saw alf")
// let bre join the room
err = bre.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
r.NoError(err)
a.True(ok, "announce should be fine")
bresSource, err := bre.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"room", "attendants"})
r.NoError(err)
// first message should be initial state
a.True(bresSource.Next(ctx))
decodeJSONsrc(t, bresSource, &initState)
a.Equal("state", initState.Type)
a.Len(initState.IDs, 3)
assertListContains(t, initState.IDs, alf.feed)
assertListContains(t, initState.IDs, bre.feed)
assertListContains(t, initState.IDs, carl.feed)
announcementsForBre := make(announcements)
go logAttendantsStream(ts, bresSource, "bre", announcementsForBre)
time.Sleep(1 * time.Second) // give some time to process new events
a.Len(announcementsForBre, 0, "none yet")
// the two present people saw her
_, seen = announcementsForAlf[bre.feed.Ref()]
a.True(seen, "alf saw bre")
_, seen = announcementsForCarl[bre.feed.Ref()]
a.True(seen, "carl saw bre")
// shutdown alf first
alf.Terminate()
time.Sleep(1 * time.Second) // give some time to process new events
// bre and carl should have removed him
_, seen = announcementsForBre[alf.feed.Ref()]
a.False(seen, "alf should be gone for bre")
_, seen = announcementsForCarl[alf.feed.Ref()]
a.False(seen, "alf should be gone for carl")
// terminate server and the clients
ts.srv.Shutdown()
bre.Terminate()
carl.Terminate()
ts.srv.Close()
// wait for all muxrpc serve()s to exit
r.NoError(ts.serveGroup.Wait())
cancel()
}
func assertListContains(t *testing.T, lst []refs.FeedRef, who refs.FeedRef) {
var found = false
for _, feed := range lst {
if feed.Equal(&who) {
found = true
}
}
if !found {
t.Errorf("did not find %s in list of %d", who.ShortRef(), len(lst))
}
}
func decodeJSONsrc(t *testing.T, src *muxrpc.ByteSource, val interface{}) {
err := src.Reader(func(rd io.Reader) error {
return json.NewDecoder(rd).Decode(val)
})
if err != nil {
t.Fatal(err)
}
}
// consume attendants updates and put each peer on the passed map
func logAttendantsStream(ts *testSession, src *muxrpc.ByteSource, who string, a announcements) {
var update server.AttendantsUpdate
for src.Next(ts.ctx) {
body, err := src.Bytes()
if err != nil {
panic(err)
}
// ts.t.Log(who, "got body:", string(body))
err = json.Unmarshal(body, &update)
if err != nil {
panic(err)
}
ts.t.Log(who, "got an update:", update.Type, update.ID.ShortRef())
switch update.Type {
case "joined":
a[update.ID.Ref()] = struct{}{}
case "left":
delete(a, update.ID.Ref())
default:
ts.t.Fatalf("%s: unexpected update type: %v", who, update.Type)
}
}
if err := src.Err(); err != nil {
ts.t.Log(who, "source errored: ", err)
return
}
ts.t.Log(who, "stream closed")
}

View File

@@ -1,3 +1,5 @@
// SPDX-License-Identifier: MIT
package go_test
import (
@@ -19,6 +21,8 @@ type announcements map[string]struct{}
// we will let three clients (alf, bre, crl) join and see that the endpoint output is as expected
func TestEndpointClients(t *testing.T) {
testInit(t)
r := require.New(t)
a := assert.New(t)
@@ -40,12 +44,12 @@ func TestEndpointClients(t *testing.T) {
// let carl join the room
// carl wont announce to emulate manyverse
-carlEndpointsSerc, err := carl.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "endpoints"})
carlEndpointsSrc, err := carl.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "endpoints"})
r.NoError(err)
t.Log("carl opened endpoints")
announcementsForCarl := make(announcements)
-go logEndpointsStream(ts, carlEndpointsSerc, "carl", announcementsForCarl)
go logEndpointsStream(ts, carlEndpointsSrc, "carl", announcementsForCarl)
time.Sleep(1 * time.Second) // give some time to process new events
_, seen := announcementsForCarl[carl.feed.Ref()]

View File

@@ -1,3 +1,5 @@
// SPDX-License-Identifier: MIT
package go_test
import (

View File

@@ -50,13 +50,13 @@ func (s *Server) initHandlers() {
mux.RegisterAsync(muxrpc.Method{"manifest"}, manifest)
mux.RegisterAsync(muxrpc.Method{"whoami"}, whoami)
-// register tunnel.connect etc twice (as tunnel.* and room.*)
-var method = muxrpc.Method{"tunnel"}
-tunnelHandler.Register(mux, method)
-method = muxrpc.Method{"room"}
-tunnelHandler.Register(mux, method)
// register old room v1 commands
tunnelHandler.RegisterTunnel(mux)
// register new room v2 commands
tunnelHandler.RegisterRoom(mux)
var method = muxrpc.Method{"room"}
mux.RegisterAsync(append(method, "registerAlias"), typemux.AsyncFunc(aliasHandler.Register))
mux.RegisterAsync(append(method, "revokeAlias"), typemux.AsyncFunc(aliasHandler.Revoke))

View File

@@ -43,8 +43,8 @@ const manifest manifestHandler = `
"announce": "sync",
"leave": "sync",
"connect": "duplex",
-"endpoints": "source",
-"isRoom": "async",
"attendants": "source",
"metadata": "async",
"ping": "sync"
},

View File

@@ -1,6 +1,7 @@
package roomstate
import (
"fmt"
"sort"
"sync"
@@ -14,8 +15,11 @@ import (
type Manager struct {
logger kitlog.Logger
-updater broadcasts.RoomChangeSink
-broadcaster *broadcasts.RoomChangeBroadcast
endpointsUpdater broadcasts.EndpointsEmitter
endpointsbroadcaster *broadcasts.EndpointsBroadcast
attendantsUpdater broadcasts.AttendantsEmitter
attendantsbroadcaster *broadcasts.AttendantsBroadcast
roomMu *sync.Mutex
room roomStateMap
@@ -24,7 +28,8 @@ type Manager struct {
func NewManager(log kitlog.Logger) *Manager {
var m Manager
m.logger = log
-m.updater, m.broadcaster = broadcasts.NewRoomChanger()
m.endpointsUpdater, m.endpointsbroadcaster = broadcasts.NewEndpointsEmitter()
m.attendantsUpdater, m.attendantsbroadcaster = broadcasts.NewAttendantsEmitter()
m.roomMu = new(sync.Mutex)
m.room = make(roomStateMap)
@@ -44,25 +49,46 @@ func (rsm roomStateMap) AsList() []string {
return memberList
}
-// Register listens to changes to the room
-func (m *Manager) Register(sink broadcasts.RoomChangeSink) {
-m.broadcaster.Register(sink)
-}
-// List just returns a list of feed references
func (m *Manager) RegisterLegacyEndpoints(sink broadcasts.EndpointsEmitter) {
m.endpointsbroadcaster.Register(sink)
}
func (m *Manager) RegisterAttendantsUpdates(sink broadcasts.AttendantsEmitter) {
m.attendantsbroadcaster.Register(sink)
}
// List just returns a list of feed references as strings
func (m *Manager) List() []string {
m.roomMu.Lock()
defer m.roomMu.Unlock()
return m.room.AsList()
}
func (m *Manager) ListAsRefs() []refs.FeedRef {
m.roomMu.Lock()
lst := m.room.AsList()
m.roomMu.Unlock()
rlst := make([]refs.FeedRef, len(lst))
for i, s := range lst {
fr, err := refs.ParseFeedRef(s)
if err != nil {
panic(fmt.Errorf("invalid feed ref in room state: %d: %s", i, err))
}
rlst[i] = *fr
}
return rlst
}
// AddEndpoint adds the endpoint to the room
func (m *Manager) AddEndpoint(who refs.FeedRef, edp muxrpc.Endpoint) {
m.roomMu.Lock()
// add ref to the room map
m.room[who.Ref()] = edp
// update all the connected tunnel.endpoints calls
-m.updater.Update(m.room.AsList())
m.endpointsUpdater.Update(m.room.AsList())
// update all the connected room.attendants calls
m.attendantsUpdater.Joined(who)
m.roomMu.Unlock()
}
@@ -72,7 +98,9 @@ func (m *Manager) Remove(who refs.FeedRef) {
// remove ref from lobby
delete(m.room, who.Ref())
// update all the connected tunnel.endpoints calls
-m.updater.Update(m.room.AsList())
m.endpointsUpdater.Update(m.room.AsList())
// update all the connected room.attendants calls
m.attendantsUpdater.Left(who)
m.roomMu.Unlock()
}
@@ -88,7 +116,8 @@ func (m *Manager) AlreadyAdded(who refs.FeedRef, edp muxrpc.Endpoint) bool {
m.room[who.Ref()] = edp
// update everyone
-m.updater.Update(m.room.AsList())
m.endpointsUpdater.Update(m.room.AsList())
m.attendantsUpdater.Joined(who)
}
m.roomMu.Unlock()
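
Not part of the commit, just a sketch of how the pieces connect after this change: anything registered via RegisterAttendantsUpdates gets a Joined/Left call for every AddEndpoint/Remove on the Manager, alongside the existing endpoints list updates. The nopEmitter type and the go-kit logger import path are assumptions for illustration.

// SPDX-License-Identifier: MIT
package roomstate
import (
kitlog "github.com/go-kit/kit/log"
refs "go.mindeco.de/ssb-refs"
)
// nopEmitter satisfies broadcasts.AttendantsEmitter but ignores every update.
type nopEmitter struct{}
func (nopEmitter) Joined(refs.FeedRef) error { return nil }
func (nopEmitter) Left(refs.FeedRef) error { return nil }
func (nopEmitter) Close() error { return nil }
func exampleAttendantsFanout(who refs.FeedRef) {
m := NewManager(kitlog.NewNopLogger())
// anything registered here sees Joined/Left for every change to the room state
m.RegisterAttendantsUpdates(nopEmitter{})
m.AddEndpoint(who, nil) // fans out Joined(who) plus the updated endpoints list
m.Remove(who) // fans out Left(who)
}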