add slicker attendants broadcaster

This commit is contained in:
Henry 2021-05-17 15:40:47 +02:00
parent ec4edcccbc
commit 89d3881258
2 changed files with 142 additions and 9 deletions

View File

@ -0,0 +1,121 @@
// SPDX-License-Identifier: MIT
package broadcasts
import (
"io"
"sync"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/multierror"
refs "go.mindeco.de/ssb-refs"
)
type AttendantsEmitter interface {
Joined(member refs.FeedRef) error
Left(member refs.FeedRef) error
io.Closer
}
// NewAttendantsEmitter returns the Sink, to write to the broadcaster, and the new
// broadcast instance.
func NewAttendantsEmitter() (AttendantsEmitter, *AttendantsBroadcast) {
bcst := AttendantsBroadcast{
mu: &sync.Mutex{},
sinks: make(map[*AttendantsEmitter]struct{}),
}
return (*attendantsSink)(&bcst), &bcst
}
// AttendantsBroadcast is an interface for registering one or more Sinks to recieve
// updates.
type AttendantsBroadcast struct {
mu *sync.Mutex
sinks map[*AttendantsEmitter]struct{}
}
// Register a Sink for updates to be sent. also returns
func (bcst *AttendantsBroadcast) Register(sink AttendantsEmitter) func() {
bcst.mu.Lock()
defer bcst.mu.Unlock()
bcst.sinks[&sink] = struct{}{}
return func() {
bcst.mu.Lock()
defer bcst.mu.Unlock()
delete(bcst.sinks, &sink)
sink.Close()
}
}
type attendantsSink AttendantsBroadcast
func (bcst *attendantsSink) Joined(member refs.FeedRef) error {
bcst.mu.Lock()
for s := range bcst.sinks {
err := (*s).Joined(member)
if err != nil {
delete(bcst.sinks, s)
}
}
bcst.mu.Unlock()
return nil
}
func (bcst *attendantsSink) Left(member refs.FeedRef) error {
bcst.mu.Lock()
for s := range bcst.sinks {
err := (*s).Left(member)
if err != nil {
delete(bcst.sinks, s)
}
}
bcst.mu.Unlock()
return nil
}
// Close implements the Sink interface.
func (bcst *attendantsSink) Close() error {
var sinks []AttendantsEmitter
bcst.mu.Lock()
defer bcst.mu.Unlock()
sinks = make([]AttendantsEmitter, 0, len(bcst.sinks))
for sink := range bcst.sinks {
sinks = append(sinks, *sink)
}
var (
wg sync.WaitGroup
me multierror.List
)
// might be fine without the waitgroup and concurrency
wg.Add(len(sinks))
for _, sink_ := range sinks {
go func(sink AttendantsEmitter) {
defer wg.Done()
err := sink.Close()
if err != nil {
me.Errs = append(me.Errs, err)
return
}
}(sink_)
}
wg.Wait()
if len(me.Errs) == 0 {
return nil
}
return me
}

View File

@ -14,8 +14,11 @@ import (
type Manager struct {
logger kitlog.Logger
endpointsUpdater broadcasts.RoomChangeSink
endpointsbroadcaster *broadcasts.RoomChangeBroadcast
endpointsUpdater broadcasts.EndpointsEmitter
endpointsbroadcaster *broadcasts.EndpointsBroadcast
attendantsUpdater broadcasts.AttendantsEmitter
attendantsbroadcaster *broadcasts.AttendantsBroadcast
roomMu *sync.Mutex
room roomStateMap
@ -24,7 +27,8 @@ type Manager struct {
func NewManager(log kitlog.Logger) *Manager {
var m Manager
m.logger = log
m.endpointsUpdater, m.endpointsbroadcaster = broadcasts.NewRoomChanger()
m.endpointsUpdater, m.endpointsbroadcaster = broadcasts.NewEndpointsEmitter()
m.attendantsUpdater, m.attendantsbroadcaster = broadcasts.NewAttendantsEmitter()
m.roomMu = new(sync.Mutex)
m.room = make(roomStateMap)
@ -44,9 +48,12 @@ func (rsm roomStateMap) AsList() []string {
return memberList
}
// Register listens to changes to the room
func (m *Manager) RegisterLegacyEndpoints(sink broadcasts.RoomChangeSink) {
m.broadcaster.Register(sink)
func (m *Manager) RegisterLegacyEndpoints(sink broadcasts.EndpointsEmitter) {
m.endpointsbroadcaster.Register(sink)
}
func (m *Manager) RegisterAttendantsUpdates(sink broadcasts.AttendantsEmitter) {
m.attendantsbroadcaster.Register(sink)
}
// List just returns a list of feed references
@ -62,7 +69,9 @@ func (m *Manager) AddEndpoint(who refs.FeedRef, edp muxrpc.Endpoint) {
// add ref to to the room map
m.room[who.Ref()] = edp
// update all the connected tunnel.endpoints calls
m.updater.Update(m.room.AsList())
m.endpointsUpdater.Update(m.room.AsList())
// update all the connected room.attendants calls
m.attendantsUpdater.Joined(who)
m.roomMu.Unlock()
}
@ -72,7 +81,9 @@ func (m *Manager) Remove(who refs.FeedRef) {
// remove ref from lobby
delete(m.room, who.Ref())
// update all the connected tunnel.endpoints calls
m.updater.Update(m.room.AsList())
m.endpointsUpdater.Update(m.room.AsList())
// update all the connected room.attendants calls
m.attendantsUpdater.Left(who)
m.roomMu.Unlock()
}
@ -88,7 +99,8 @@ func (m *Manager) AlreadyAdded(who refs.FeedRef, edp muxrpc.Endpoint) bool {
m.room[who.Ref()] = edp
// update everyone
m.updater.Update(m.room.AsList())
m.endpointsUpdater.Update(m.room.AsList())
m.attendantsUpdater.Joined(who)
}
m.roomMu.Unlock()