rename broadcast types

This commit is contained in:
Henry 2021-05-17 15:31:49 +02:00
parent 3fae8a200a
commit ec4edcccbc
4 changed files with 31 additions and 30 deletions

View File

@ -9,31 +9,31 @@ import (
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/multierror"
)
type RoomChangeSink interface {
type EndpointsEmitter interface {
Update(members []string) error
io.Closer
}
// NewRoomChanger returns the Sink, to write to the broadcaster, and the new
// NewEndpointsEmitter returns the Sink, to write to the broadcaster, and the new
// broadcast instance.
func NewRoomChanger() (RoomChangeSink, *RoomChangeBroadcast) {
bcst := RoomChangeBroadcast{
func NewEndpointsEmitter() (EndpointsEmitter, *EndpointsBroadcast) {
bcst := EndpointsBroadcast{
mu: &sync.Mutex{},
sinks: make(map[*RoomChangeSink]struct{}),
sinks: make(map[*EndpointsEmitter]struct{}),
}
return (*broadcastSink)(&bcst), &bcst
return (*endpointsSink)(&bcst), &bcst
}
// RoomChangeBroadcast is an interface for registering one or more Sinks to recieve
// EndpointsBroadcast is an interface for registering one or more Sinks to recieve
// updates.
type RoomChangeBroadcast struct {
type EndpointsBroadcast struct {
mu *sync.Mutex
sinks map[*RoomChangeSink]struct{}
sinks map[*EndpointsEmitter]struct{}
}
// Register a Sink for updates to be sent. also returns
func (bcst *RoomChangeBroadcast) Register(sink RoomChangeSink) func() {
func (bcst *EndpointsBroadcast) Register(sink EndpointsEmitter) func() {
bcst.mu.Lock()
defer bcst.mu.Unlock()
bcst.sinks[&sink] = struct{}{}
@ -46,10 +46,10 @@ func (bcst *RoomChangeBroadcast) Register(sink RoomChangeSink) func() {
}
}
type broadcastSink RoomChangeBroadcast
type endpointsSink EndpointsBroadcast
// Pour implements the Sink interface.
func (bcst *broadcastSink) Update(members []string) error {
func (bcst *endpointsSink) Update(members []string) error {
bcst.mu.Lock()
for s := range bcst.sinks {
@ -64,13 +64,13 @@ func (bcst *broadcastSink) Update(members []string) error {
}
// Close implements the Sink interface.
func (bcst *broadcastSink) Close() error {
var sinks []RoomChangeSink
func (bcst *endpointsSink) Close() error {
var sinks []EndpointsEmitter
bcst.mu.Lock()
defer bcst.mu.Unlock()
sinks = make([]RoomChangeSink, 0, len(bcst.sinks))
sinks = make([]EndpointsEmitter, 0, len(bcst.sinks))
for sink := range bcst.sinks {
sinks = append(sinks, *sink)
@ -85,7 +85,7 @@ func (bcst *broadcastSink) Close() error {
wg.Add(len(sinks))
for _, sink_ := range sinks {
go func(sink RoomChangeSink) {
go func(sink EndpointsEmitter) {
defer wg.Done()
err := sink.Close()

View File

@ -24,7 +24,7 @@ func (tp testPrinter) Update(members []string) error {
func (tp testPrinter) Close() error { return nil }
func ExampleBroadcast() {
sink, bcast := NewRoomChanger()
sink, bcast := NewEndpointsEmitter()
defer sink.Close()
var p1, p2 testPrinter
@ -44,7 +44,7 @@ func ExampleBroadcast() {
}
func ExampleBroadcastCanceled() {
sink, bcast := NewRoomChanger()
sink, bcast := NewEndpointsEmitter()
defer sink.Close()
var p1, p2 testPrinter
@ -78,7 +78,7 @@ func (tp erroringPrinter) Close() error { return nil }
func TestBroadcastOneErrs(t *testing.T) {
var buf = &bytes.Buffer{}
sink, bcast := NewRoomChanger()
sink, bcast := NewEndpointsEmitter()
defer sink.Close()
var p1 testPrinter
@ -144,7 +144,7 @@ func TestBroadcast(t *testing.T) {
tc.rx = tc.tx
}
sink, bcast := NewRoomChanger()
sink, bcast := NewEndpointsEmitter()
mkSink := func() Sink {
var (

View File

@ -108,7 +108,7 @@ func (h *Handler) endpoints(ctx context.Context, req *muxrpc.Request, snk *muxrp
toPeer := newForwarder(snk)
// for future updates
h.state.Register(toPeer)
h.state.RegisterLegacyEndpoints(toPeer)
// get public key from the calling peer
peer, err := network.GetFeedRefFromAddr(req.RemoteAddr())
@ -140,28 +140,29 @@ func (h *Handler) endpoints(ctx context.Context, req *muxrpc.Request, snk *muxrp
return nil
}
type updateForwarder struct {
// a muxrpc json encoder for endpoints broadcasts
type endpointsJSONEncoder struct {
mu sync.Mutex // only one caller to forwarder at a time
snk *muxrpc.ByteSink
enc *json.Encoder
}
func newForwarder(snk *muxrpc.ByteSink) *updateForwarder {
func newForwarder(snk *muxrpc.ByteSink) *endpointsJSONEncoder {
enc := json.NewEncoder(snk)
snk.SetEncoding(muxrpc.TypeJSON)
return &updateForwarder{
return &endpointsJSONEncoder{
snk: snk,
enc: enc,
}
}
func (uf *updateForwarder) Update(members []string) error {
func (uf *endpointsJSONEncoder) Update(members []string) error {
uf.mu.Lock()
defer uf.mu.Unlock()
return uf.enc.Encode(members)
}
func (uf *updateForwarder) Close() error {
func (uf *endpointsJSONEncoder) Close() error {
uf.mu.Lock()
defer uf.mu.Unlock()
return uf.snk.Close()

View File

@ -14,8 +14,8 @@ import (
type Manager struct {
logger kitlog.Logger
updater broadcasts.RoomChangeSink
broadcaster *broadcasts.RoomChangeBroadcast
endpointsUpdater broadcasts.RoomChangeSink
endpointsbroadcaster *broadcasts.RoomChangeBroadcast
roomMu *sync.Mutex
room roomStateMap
@ -24,7 +24,7 @@ type Manager struct {
func NewManager(log kitlog.Logger) *Manager {
var m Manager
m.logger = log
m.updater, m.broadcaster = broadcasts.NewRoomChanger()
m.endpointsUpdater, m.endpointsbroadcaster = broadcasts.NewRoomChanger()
m.roomMu = new(sync.Mutex)
m.room = make(roomStateMap)
@ -45,7 +45,7 @@ func (rsm roomStateMap) AsList() []string {
}
// Register listens to changes to the room
func (m *Manager) Register(sink broadcasts.RoomChangeSink) {
func (m *Manager) RegisterLegacyEndpoints(sink broadcasts.RoomChangeSink) {
m.broadcaster.Register(sink)
}