parent
89d3881258
commit
e7c20bcd5d
|
@ -0,0 +1,99 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
|
||||
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
|
||||
"go.cryptoscope.co/muxrpc/v2"
|
||||
refs "go.mindeco.de/ssb-refs"
|
||||
)
|
||||
|
||||
// AttendantsUpdate is emitted if a single member joins or leaves.
|
||||
// Type is either 'joined' or 'left'.
|
||||
type AttendantsUpdate struct {
|
||||
Type string `json:"type"`
|
||||
ID refs.FeedRef `json:"id"`
|
||||
}
|
||||
|
||||
// AttendantsInitialState is emitted the first time the stream is opened
|
||||
type AttendantsInitialState struct {
|
||||
Type string `json:"type"`
|
||||
IDs []refs.FeedRef `json:"ids"`
|
||||
}
|
||||
|
||||
func (h *Handler) attendants(ctx context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink) error {
|
||||
// for future updates
|
||||
toPeer := newAttendantsEncoder(snk)
|
||||
h.state.RegisterAttendantsUpdates(toPeer)
|
||||
|
||||
// get public key from the calling peer
|
||||
peer, err := network.GetFeedRefFromAddr(req.RemoteAddr())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pm, err := h.config.GetPrivacyMode(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("running with unknown privacy mode")
|
||||
}
|
||||
|
||||
if pm == roomdb.ModeCommunity || pm == roomdb.ModeRestricted {
|
||||
_, err := h.members.GetByFeed(ctx, *peer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("external user are not allowed to enumerate members")
|
||||
}
|
||||
}
|
||||
err = json.NewEncoder(snk).Encode(AttendantsInitialState{
|
||||
Type: "state",
|
||||
IDs: h.state.ListAsRefs(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// a muxrpc json encoder for endpoints broadcasts
|
||||
type attendantsJSONEncoder struct {
|
||||
mu sync.Mutex // only one caller to forwarder at a time
|
||||
snk *muxrpc.ByteSink
|
||||
enc *json.Encoder
|
||||
}
|
||||
|
||||
func newAttendantsEncoder(snk *muxrpc.ByteSink) *attendantsJSONEncoder {
|
||||
enc := json.NewEncoder(snk)
|
||||
snk.SetEncoding(muxrpc.TypeJSON)
|
||||
return &attendantsJSONEncoder{
|
||||
snk: snk,
|
||||
enc: enc,
|
||||
}
|
||||
}
|
||||
|
||||
func (uf *attendantsJSONEncoder) Joined(member refs.FeedRef) error {
|
||||
uf.mu.Lock()
|
||||
defer uf.mu.Unlock()
|
||||
return uf.enc.Encode(AttendantsUpdate{
|
||||
Type: "joined",
|
||||
ID: member,
|
||||
})
|
||||
}
|
||||
|
||||
func (uf *attendantsJSONEncoder) Left(member refs.FeedRef) error {
|
||||
uf.mu.Lock()
|
||||
defer uf.mu.Unlock()
|
||||
return uf.enc.Encode(AttendantsUpdate{
|
||||
Type: "left",
|
||||
ID: member,
|
||||
})
|
||||
}
|
||||
|
||||
func (uf *attendantsJSONEncoder) Close() error {
|
||||
uf.mu.Lock()
|
||||
defer uf.mu.Unlock()
|
||||
return uf.snk.Close()
|
||||
}
|
|
@ -59,7 +59,7 @@ func (h *Handler) RegisterRoom(mux typemux.HandlerMux) {
|
|||
mux.RegisterAsync(append(namespace, "announce"), typemux.AsyncFunc(h.announce))
|
||||
mux.RegisterAsync(append(namespace, "leave"), typemux.AsyncFunc(h.leave))
|
||||
|
||||
mux.RegisterSource(append(namespace, "attendants"), typemux.SourceFunc(h.endpoints))
|
||||
mux.RegisterSource(append(namespace, "attendants"), typemux.SourceFunc(h.attendants))
|
||||
|
||||
mux.RegisterDuplex(append(namespace, "connect"), connectHandler{
|
||||
logger: h.logger,
|
||||
|
|
|
@ -105,7 +105,7 @@ func (h *Handler) leave(_ context.Context, req *muxrpc.Request) (interface{}, er
|
|||
}
|
||||
|
||||
func (h *Handler) endpoints(ctx context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink) error {
|
||||
toPeer := newForwarder(snk)
|
||||
toPeer := newEndpointsForwarder(snk)
|
||||
|
||||
// for future updates
|
||||
h.state.RegisterLegacyEndpoints(toPeer)
|
||||
|
@ -147,7 +147,7 @@ type endpointsJSONEncoder struct {
|
|||
enc *json.Encoder
|
||||
}
|
||||
|
||||
func newForwarder(snk *muxrpc.ByteSink) *endpointsJSONEncoder {
|
||||
func newEndpointsForwarder(snk *muxrpc.ByteSink) *endpointsJSONEncoder {
|
||||
enc := json.NewEncoder(snk)
|
||||
snk.SetEncoding(muxrpc.TypeJSON)
|
||||
return &endpointsJSONEncoder{
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package go_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/tunnel/server"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.cryptoscope.co/muxrpc/v2"
|
||||
refs "go.mindeco.de/ssb-refs"
|
||||
)
|
||||
|
||||
// this tests the new room.attendants call
|
||||
// basically the same test as endpoints_test
|
||||
func TestRoomAttendants(t *testing.T) {
|
||||
testInit(t)
|
||||
r := require.New(t)
|
||||
a := assert.New(t)
|
||||
|
||||
testPath := filepath.Join("testrun", t.Name())
|
||||
os.RemoveAll(testPath)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
// create the roomsrv
|
||||
ts := makeNamedTestBot(t, "server", ctx, nil)
|
||||
ctx = ts.ctx
|
||||
|
||||
// three new clients, connected to server automaticaly
|
||||
alf := ts.makeTestClient("alf")
|
||||
bre := ts.makeTestClient("bre")
|
||||
carl := ts.makeTestClient("carl")
|
||||
|
||||
// start with carl
|
||||
// ===============
|
||||
var ok bool
|
||||
err := carl.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
|
||||
r.NoError(err)
|
||||
a.True(ok, "announce should be fine")
|
||||
|
||||
carlsSource, err := carl.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"room", "attendants"})
|
||||
r.NoError(err)
|
||||
t.Log("sarah opened endpoints")
|
||||
|
||||
// first message should be initial state
|
||||
a.True(carlsSource.Next(ctx))
|
||||
var initState server.AttendantsInitialState
|
||||
decodeJSONsrc(t, carlsSource, &initState)
|
||||
a.Equal("state", initState.Type)
|
||||
a.Len(initState.IDs, 1)
|
||||
a.True(initState.IDs[0].Equal(&carl.feed))
|
||||
|
||||
announcementsForCarl := make(announcements)
|
||||
go logAttendantsStream(ts, carlsSource, "carl", announcementsForCarl)
|
||||
time.Sleep(1 * time.Second) // give some time to process new events
|
||||
|
||||
a.Len(announcementsForCarl, 0, "none yet")
|
||||
|
||||
// let alf join the room
|
||||
// =====================
|
||||
err = alf.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
|
||||
r.NoError(err)
|
||||
a.True(ok, "announce should be fine")
|
||||
|
||||
alfsSource, err := alf.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"room", "attendants"})
|
||||
r.NoError(err)
|
||||
|
||||
// first message should be initial state
|
||||
a.True(alfsSource.Next(ctx))
|
||||
decodeJSONsrc(t, alfsSource, &initState)
|
||||
a.Equal("state", initState.Type)
|
||||
a.Len(initState.IDs, 2)
|
||||
assertListContains(t, initState.IDs, carl.feed)
|
||||
assertListContains(t, initState.IDs, alf.feed)
|
||||
|
||||
announcementsForAlf := make(announcements)
|
||||
go logAttendantsStream(ts, alfsSource, "alf", announcementsForAlf)
|
||||
time.Sleep(1 * time.Second) // give some time to process new events
|
||||
|
||||
// assert what alf saw
|
||||
var seen bool
|
||||
a.Len(announcementsForAlf, 0, "none yet")
|
||||
|
||||
// assert what carl saw
|
||||
_, seen = announcementsForCarl[alf.feed.Ref()]
|
||||
a.True(seen, "carl saw alf")
|
||||
|
||||
// let bre join the room
|
||||
err = bre.Async(ctx, &ok, muxrpc.TypeJSON, muxrpc.Method{"room", "announce"})
|
||||
r.NoError(err)
|
||||
a.True(ok, "announce should be fine")
|
||||
|
||||
bresSource, err := bre.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"room", "attendants"})
|
||||
r.NoError(err)
|
||||
|
||||
// first message should be initial state
|
||||
a.True(bresSource.Next(ctx))
|
||||
decodeJSONsrc(t, bresSource, &initState)
|
||||
a.Equal("state", initState.Type)
|
||||
a.Len(initState.IDs, 3)
|
||||
assertListContains(t, initState.IDs, alf.feed)
|
||||
assertListContains(t, initState.IDs, bre.feed)
|
||||
assertListContains(t, initState.IDs, carl.feed)
|
||||
|
||||
announcementsForBre := make(announcements)
|
||||
go logAttendantsStream(ts, bresSource, "bre", announcementsForBre)
|
||||
|
||||
time.Sleep(1 * time.Second) // give some time to process new events
|
||||
|
||||
a.Len(announcementsForBre, 0, "none yet")
|
||||
|
||||
// the two present people saw her
|
||||
_, seen = announcementsForAlf[bre.feed.Ref()]
|
||||
a.True(seen, "alf saw bre")
|
||||
|
||||
_, seen = announcementsForCarl[bre.feed.Ref()]
|
||||
a.True(seen, "carl saw alf")
|
||||
|
||||
// shutdown alf first
|
||||
alf.Terminate()
|
||||
|
||||
time.Sleep(1 * time.Second) // give some time to process new events
|
||||
|
||||
// bre and arl should have removed him
|
||||
|
||||
_, seen = announcementsForBre[alf.feed.Ref()]
|
||||
a.False(seen, "alf should be gone for bre")
|
||||
|
||||
_, seen = announcementsForCarl[alf.feed.Ref()]
|
||||
a.False(seen, "alf should be gone for carl")
|
||||
|
||||
// terminate server and the clients
|
||||
ts.srv.Shutdown()
|
||||
bre.Terminate()
|
||||
carl.Terminate()
|
||||
ts.srv.Close()
|
||||
|
||||
// wait for all muxrpc serve()s to exit
|
||||
r.NoError(ts.serveGroup.Wait())
|
||||
cancel()
|
||||
|
||||
}
|
||||
|
||||
func assertListContains(t *testing.T, lst []refs.FeedRef, who refs.FeedRef) {
|
||||
var found = false
|
||||
for _, feed := range lst {
|
||||
if feed.Equal(&who) {
|
||||
found = true
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Errorf("did not find %s in list of %d", who.ShortRef(), len(lst))
|
||||
}
|
||||
}
|
||||
|
||||
func decodeJSONsrc(t *testing.T, src *muxrpc.ByteSource, val interface{}) {
|
||||
err := src.Reader(func(rd io.Reader) error {
|
||||
return json.NewDecoder(rd).Decode(val)
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// consume endpoint messaes and put each peer on the passed map
|
||||
func logAttendantsStream(ts *testSession, src *muxrpc.ByteSource, who string, a announcements) {
|
||||
var update server.AttendantsUpdate
|
||||
|
||||
for src.Next(ts.ctx) {
|
||||
body, err := src.Bytes()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// ts.t.Log(who, "got body:", string(body))
|
||||
|
||||
err = json.Unmarshal(body, &update)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
ts.t.Log(who, "got an update:", update.Type, update.ID.ShortRef())
|
||||
|
||||
switch update.Type {
|
||||
case "joined":
|
||||
a[update.ID.Ref()] = struct{}{}
|
||||
case "left":
|
||||
delete(a, update.ID.Ref())
|
||||
default:
|
||||
ts.t.Fatalf("%s: unexpected update type: %v", who, update.Type)
|
||||
}
|
||||
}
|
||||
|
||||
if err := src.Err(); err != nil {
|
||||
ts.t.Log(who, "source errored: ", err)
|
||||
return
|
||||
}
|
||||
|
||||
ts.t.Log(who, "stream closed")
|
||||
}
|
|
@ -1,3 +1,5 @@
|
|||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package go_test
|
||||
|
||||
import (
|
||||
|
@ -19,6 +21,8 @@ type announcements map[string]struct{}
|
|||
|
||||
// we will let three clients (alf, bre, crl) join and see that the endpoint output is as expected
|
||||
func TestEndpointClients(t *testing.T) {
|
||||
testInit(t)
|
||||
|
||||
r := require.New(t)
|
||||
a := assert.New(t)
|
||||
|
||||
|
@ -40,12 +44,12 @@ func TestEndpointClients(t *testing.T) {
|
|||
|
||||
// let carl join the room
|
||||
// carl wont announce to emulate manyverse
|
||||
carlEndpointsSerc, err := carl.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "endpoints"})
|
||||
carlEndpointsSrc, err := carl.Source(ctx, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "endpoints"})
|
||||
r.NoError(err)
|
||||
t.Log("carl opened endpoints")
|
||||
|
||||
announcementsForCarl := make(announcements)
|
||||
go logEndpointsStream(ts, carlEndpointsSerc, "carl", announcementsForCarl)
|
||||
go logEndpointsStream(ts, carlEndpointsSrc, "carl", announcementsForCarl)
|
||||
time.Sleep(1 * time.Second) // give some time to process new events
|
||||
|
||||
_, seen := announcementsForCarl[carl.feed.Ref()]
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
// SPDX-License-Identifier: MIT
|
||||
|
||||
package go_test
|
||||
|
||||
import (
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package roomstate
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
|
@ -56,13 +57,29 @@ func (m *Manager) RegisterAttendantsUpdates(sink broadcasts.AttendantsEmitter) {
|
|||
m.attendantsbroadcaster.Register(sink)
|
||||
}
|
||||
|
||||
// List just returns a list of feed references
|
||||
// List just returns a list of feed references as strings
|
||||
func (m *Manager) List() []string {
|
||||
m.roomMu.Lock()
|
||||
defer m.roomMu.Unlock()
|
||||
return m.room.AsList()
|
||||
}
|
||||
|
||||
func (m *Manager) ListAsRefs() []refs.FeedRef {
|
||||
m.roomMu.Lock()
|
||||
lst := m.room.AsList()
|
||||
m.roomMu.Unlock()
|
||||
|
||||
rlst := make([]refs.FeedRef, len(lst))
|
||||
for i, s := range lst {
|
||||
fr, err := refs.ParseFeedRef(s)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("invalid feed ref in room state: %d: %s", i, err))
|
||||
}
|
||||
rlst[i] = *fr
|
||||
}
|
||||
return rlst
|
||||
}
|
||||
|
||||
// AddEndpoint adds the endpoint to the room
|
||||
func (m *Manager) AddEndpoint(who refs.FeedRef, edp muxrpc.Endpoint) {
|
||||
m.roomMu.Lock()
|
||||
|
|
Loading…
Reference in New Issue