diff --git a/internal/broadcasts/attendants.go b/internal/broadcasts/attendants.go index 0ad1fd9..d6bdb3c 100644 --- a/internal/broadcasts/attendants.go +++ b/internal/broadcasts/attendants.go @@ -52,7 +52,6 @@ func (bcst *AttendantsBroadcast) Register(sink AttendantsEmitter) func() { type attendantsSink AttendantsBroadcast func (bcst *attendantsSink) Joined(member refs.FeedRef) error { - bcst.mu.Lock() for s := range bcst.sinks { err := (*s).Joined(member) @@ -66,7 +65,6 @@ func (bcst *attendantsSink) Joined(member refs.FeedRef) error { } func (bcst *attendantsSink) Left(member refs.FeedRef) error { - bcst.mu.Lock() for s := range bcst.sinks { err := (*s).Left(member) @@ -81,7 +79,13 @@ func (bcst *attendantsSink) Left(member refs.FeedRef) error { // Close implements the Sink interface. func (bcst *attendantsSink) Close() error { - var sinks []AttendantsEmitter + bcst.mu.Lock() + defer bcst.mu.Unlock() + sinks := make([]AttendantsEmitter, 0, len(bcst.sinks)) + + for sink := range bcst.sinks { + sinks = append(sinks, *sink) + } bcst.mu.Lock() defer bcst.mu.Unlock() diff --git a/muxrpc/test/go/attendants_test.go b/muxrpc/test/go/attendants_test.go index 9e4e2ff..ace0e09 100644 --- a/muxrpc/test/go/attendants_test.go +++ b/muxrpc/test/go/attendants_test.go @@ -50,7 +50,7 @@ func TestRoomAttendants(t *testing.T) { carlsSource, err := carl.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"room", "attendants"}) r.NoError(err) - t.Log("sarah opened endpoints") + t.Log("carl opened attendants") // first message should be initial state a.True(carlsSource.Next(ctx))