go-ssb-room/internal/broadcasts/room_change.go

106 lines
1.9 KiB
Go
Raw Normal View History

2021-02-09 11:53:33 +00:00
// SPDX-License-Identifier: MIT
2021-01-26 17:33:29 +00:00
package broadcasts
import (
"io"
"sync"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/multierror"
2021-01-26 17:33:29 +00:00
)
// RoomChangeSink is the receiving end of room change notifications.
//
// Implementations are handed the member list through Update and are shut down
// through Close (from the embedded io.Closer).
type RoomChangeSink interface {
	// Update delivers a member list to the sink. Within this file, a sink
	// that returns a non-nil error from Update is dropped by the broadcaster
	// and receives no further updates.
	Update(members []string) error

	io.Closer
}
// NewRoomChanger creates an empty broadcast and hands back two views of it:
// the sink used to write into the broadcaster, and the broadcast instance
// itself. Both returned values point at the same underlying state.
func NewRoomChanger() (RoomChangeSink, *RoomChangeBroadcast) {
	broadcast := &RoomChangeBroadcast{
		mu:    new(sync.Mutex),
		sinks: map[*RoomChangeSink]struct{}{},
	}

	// Same memory, different method set: the sink view exposes Update/Close.
	writer := (*broadcastSink)(broadcast)
	return writer, broadcast
}
// RoomChangeBroadcast is the registry of sinks that receive room change
// updates. Sinks are added with Register.
type RoomChangeBroadcast struct {
// mu guards sinks. It is held by Register, by the unregister function that
// Register returns, and by broadcastSink's Update and Close.
mu *sync.Mutex
// sinks is the set of registered sinks, keyed by a pointer to the interface
// value (not by the sink itself).
sinks map[*RoomChangeSink]struct{}
}
// Register adds sink to the set of sinks that receive updates.
//
// The returned function undoes the registration: it removes sink from the
// broadcast and then closes it while still holding the broadcast's mutex.
// The error returned by the sink's Close is discarded.
func (bcst *RoomChangeBroadcast) Register(sink RoomChangeSink) func() {
	// The set is keyed by pointer, so insertion and removal have to use the
	// very same address.
	key := &sink

	unregister := func() {
		bcst.mu.Lock()
		defer bcst.mu.Unlock()

		delete(bcst.sinks, key)
		(*key).Close()
	}

	bcst.mu.Lock()
	defer bcst.mu.Unlock()

	bcst.sinks[key] = struct{}{}
	return unregister
}
// broadcastSink is the write side of a RoomChangeBroadcast. It is declared
// with the same underlying struct, so a *RoomChangeBroadcast converts directly
// to a *broadcastSink (as NewRoomChanger does) and both views share one mutex
// and one set of sinks.
type broadcastSink RoomChangeBroadcast
// Pour implements the Sink interface.
func (bcst *broadcastSink) Update(members []string) error {
2021-01-26 17:33:29 +00:00
bcst.mu.Lock()
for s := range bcst.sinks {
err := (*s).Update(members)
2021-01-26 17:33:29 +00:00
if err != nil {
delete(bcst.sinks, s)
}
}
bcst.mu.Unlock()
return nil
}
// Close implements the Sink interface.
func (bcst *broadcastSink) Close() error {
var sinks []RoomChangeSink
bcst.mu.Lock()
defer bcst.mu.Unlock()
sinks = make([]RoomChangeSink, 0, len(bcst.sinks))
for sink := range bcst.sinks {
sinks = append(sinks, *sink)
}
var (
2021-01-27 09:01:35 +00:00
wg sync.WaitGroup
me multierror.List
2021-01-26 17:33:29 +00:00
)
// might be fine without the waitgroup and concurrency
wg.Add(len(sinks))
for _, sink_ := range sinks {
go func(sink RoomChangeSink) {
defer wg.Done()
err := sink.Close()
if err != nil {
2021-01-27 09:01:35 +00:00
me.Errs = append(me.Errs, err)
2021-01-26 17:33:29 +00:00
return
}
}(sink_)
}
wg.Wait()
2021-01-27 09:01:35 +00:00
if len(me.Errs) == 0 {
return nil
}
return me
2021-01-26 17:33:29 +00:00
}