diff --git a/internal/broadcasts/attendants.go b/internal/broadcasts/attendants.go new file mode 100644 index 0000000..0ad1fd9 --- /dev/null +++ b/internal/broadcasts/attendants.go @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: MIT + +package broadcasts + +import ( + "io" + "sync" + + "github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/multierror" + refs "go.mindeco.de/ssb-refs" +) + +type AttendantsEmitter interface { + Joined(member refs.FeedRef) error + Left(member refs.FeedRef) error + + io.Closer +} + +// NewAttendantsEmitter returns the Sink, to write to the broadcaster, and the new +// broadcast instance. +func NewAttendantsEmitter() (AttendantsEmitter, *AttendantsBroadcast) { + bcst := AttendantsBroadcast{ + mu: &sync.Mutex{}, + sinks: make(map[*AttendantsEmitter]struct{}), + } + + return (*attendantsSink)(&bcst), &bcst +} + +// AttendantsBroadcast is an interface for registering one or more Sinks to recieve +// updates. +type AttendantsBroadcast struct { + mu *sync.Mutex + sinks map[*AttendantsEmitter]struct{} +} + +// Register a Sink for updates to be sent. also returns +func (bcst *AttendantsBroadcast) Register(sink AttendantsEmitter) func() { + bcst.mu.Lock() + defer bcst.mu.Unlock() + bcst.sinks[&sink] = struct{}{} + + return func() { + bcst.mu.Lock() + defer bcst.mu.Unlock() + delete(bcst.sinks, &sink) + sink.Close() + } +} + +type attendantsSink AttendantsBroadcast + +func (bcst *attendantsSink) Joined(member refs.FeedRef) error { + + bcst.mu.Lock() + for s := range bcst.sinks { + err := (*s).Joined(member) + if err != nil { + delete(bcst.sinks, s) + } + } + bcst.mu.Unlock() + + return nil +} + +func (bcst *attendantsSink) Left(member refs.FeedRef) error { + + bcst.mu.Lock() + for s := range bcst.sinks { + err := (*s).Left(member) + if err != nil { + delete(bcst.sinks, s) + } + } + bcst.mu.Unlock() + + return nil +} + +// Close implements the Sink interface. +func (bcst *attendantsSink) Close() error { + var sinks []AttendantsEmitter + + bcst.mu.Lock() + defer bcst.mu.Unlock() + + sinks = make([]AttendantsEmitter, 0, len(bcst.sinks)) + + for sink := range bcst.sinks { + sinks = append(sinks, *sink) + } + + var ( + wg sync.WaitGroup + me multierror.List + ) + + // might be fine without the waitgroup and concurrency + + wg.Add(len(sinks)) + for _, sink_ := range sinks { + go func(sink AttendantsEmitter) { + defer wg.Done() + + err := sink.Close() + if err != nil { + me.Errs = append(me.Errs, err) + return + } + }(sink_) + } + wg.Wait() + + if len(me.Errs) == 0 { + return nil + } + + return me +} diff --git a/roomstate/roomstate.go b/roomstate/roomstate.go index 25182b7..7199d98 100644 --- a/roomstate/roomstate.go +++ b/roomstate/roomstate.go @@ -14,8 +14,11 @@ import ( type Manager struct { logger kitlog.Logger - endpointsUpdater broadcasts.RoomChangeSink - endpointsbroadcaster *broadcasts.RoomChangeBroadcast + endpointsUpdater broadcasts.EndpointsEmitter + endpointsbroadcaster *broadcasts.EndpointsBroadcast + + attendantsUpdater broadcasts.AttendantsEmitter + attendantsbroadcaster *broadcasts.AttendantsBroadcast roomMu *sync.Mutex room roomStateMap @@ -24,7 +27,8 @@ type Manager struct { func NewManager(log kitlog.Logger) *Manager { var m Manager m.logger = log - m.endpointsUpdater, m.endpointsbroadcaster = broadcasts.NewRoomChanger() + m.endpointsUpdater, m.endpointsbroadcaster = broadcasts.NewEndpointsEmitter() + m.attendantsUpdater, m.attendantsbroadcaster = broadcasts.NewAttendantsEmitter() m.roomMu = new(sync.Mutex) m.room = make(roomStateMap) @@ -44,9 +48,12 @@ func (rsm roomStateMap) AsList() []string { return memberList } -// Register listens to changes to the room -func (m *Manager) RegisterLegacyEndpoints(sink broadcasts.RoomChangeSink) { - m.broadcaster.Register(sink) +func (m *Manager) RegisterLegacyEndpoints(sink broadcasts.EndpointsEmitter) { + m.endpointsbroadcaster.Register(sink) +} + +func (m *Manager) RegisterAttendantsUpdates(sink broadcasts.AttendantsEmitter) { + m.attendantsbroadcaster.Register(sink) } // List just returns a list of feed references @@ -62,7 +69,9 @@ func (m *Manager) AddEndpoint(who refs.FeedRef, edp muxrpc.Endpoint) { // add ref to to the room map m.room[who.Ref()] = edp // update all the connected tunnel.endpoints calls - m.updater.Update(m.room.AsList()) + m.endpointsUpdater.Update(m.room.AsList()) + // update all the connected room.attendants calls + m.attendantsUpdater.Joined(who) m.roomMu.Unlock() } @@ -72,7 +81,9 @@ func (m *Manager) Remove(who refs.FeedRef) { // remove ref from lobby delete(m.room, who.Ref()) // update all the connected tunnel.endpoints calls - m.updater.Update(m.room.AsList()) + m.endpointsUpdater.Update(m.room.AsList()) + // update all the connected room.attendants calls + m.attendantsUpdater.Left(who) m.roomMu.Unlock() } @@ -88,7 +99,8 @@ func (m *Manager) AlreadyAdded(who refs.FeedRef, edp muxrpc.Endpoint) bool { m.room[who.Ref()] = edp // update everyone - m.updater.Update(m.room.AsList()) + m.endpointsUpdater.Update(m.room.AsList()) + m.attendantsUpdater.Joined(who) } m.roomMu.Unlock()