muxrcp: alias.register

* re-work muxrpc handler registration
* update muxrpc tests
This commit is contained in:
Henry 2021-03-11 18:40:33 +01:00
parent e6c3305229
commit 033efe5145
16 changed files with 204 additions and 177 deletions

View File

@ -47,17 +47,7 @@ type Confirmation struct {
}
// Verify checks that the confirmation is for the expected room and from the expected feed
func (c Confirmation) Verify(room, feed refs.FeedRef) bool {
// not for that room
if !c.RoomID.Equal(&room) {
return false
}
// not for that feed
if !c.UserID.Equal(&feed) {
return false
}
func (c Confirmation) Verify() bool {
// re-construct the registration
message := c.createRegistrationMessage()

View File

@ -40,7 +40,7 @@ func TestConfirmation(t *testing.T) {
// create the signed confirmation
confirmation := valid.Sign(userKeyPair.Pair.Secret)
yes := confirmation.Verify(roomID, userKeyPair.Feed)
yes := confirmation.Verify()
r.True(yes, "should be valid for this room and feed")
// make up another id for the invalid test(s)
@ -49,16 +49,19 @@ func TestConfirmation(t *testing.T) {
Algo: "test",
}
yes = confirmation.Verify(otherID, userKeyPair.Feed)
confirmation.RoomID = otherID
yes = confirmation.Verify()
r.False(yes, "should not be valid for another room")
yes = confirmation.Verify(roomID, otherID)
confirmation.RoomID = roomID // restore
confirmation.UserID = otherID
yes = confirmation.Verify()
r.False(yes, "should not be valid for this room but another feed")
// puncture the signature to emulate an invalid one
confirmation.Signature[0] = confirmation.Signature[0] ^ 1
yes = confirmation.Verify(roomID, userKeyPair.Feed)
yes = confirmation.Verify()
r.False(yes, "should not be valid anymore")
}

View File

@ -0,0 +1,21 @@
// SPDX-License-Identifier: MIT
package alias
import (
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
kitlog "github.com/go-kit/kit/log"
refs "go.mindeco.de/ssb-refs"
)
func New(log kitlog.Logger, self refs.FeedRef, aliasDB roomdb.AliasService) Handler {
var h Handler
h.self = self
h.logger = log
h.db = aliasDB
return h
}

View File

@ -0,0 +1,62 @@
package alias
import (
"context"
"encoding/json"
"fmt"
"github.com/ssb-ngi-pointer/go-ssb-room/aliases"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
kitlog "github.com/go-kit/kit/log"
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
"go.cryptoscope.co/muxrpc/v2"
refs "go.mindeco.de/ssb-refs"
)
type Handler struct {
logger kitlog.Logger
self refs.FeedRef
db roomdb.AliasService
}
func (h Handler) Register(ctx context.Context, req *muxrpc.Request) (interface{}, error) {
var args []json.RawMessage
err := json.Unmarshal(req.RawArgs, &args)
if err != nil {
return nil, fmt.Errorf("registerAlias: bad request: %w", err)
}
if n := len(args); n != 2 {
return nil, fmt.Errorf("registerAlias: expected two arguments got %d", n)
}
var confirmation aliases.Confirmation
confirmation.RoomID = h.self
confirmation.Alias = string(args[0])
// check alias is valid
// if !aliases.IsValid(confirmation.Alias) { ... }
// get the user from the muxrpc connection
userID, err := network.GetFeedRefFromAddr(req.RemoteAddr())
if err != nil {
return nil, err
}
confirmation.UserID = *userID
// check the signature
if !confirmation.Verify() {
return nil, fmt.Errorf("registerAlias: invalid signature")
}
err = h.db.Register(ctx, confirmation.Alias, confirmation.Signature)
if err != nil {
return nil, fmt.Errorf("registerAlias: could not register alias: %w", err)
}
return true, nil
}

View File

@ -23,7 +23,7 @@ type connectWithOriginArg struct {
Origin refs.FeedRef `json:"origin"` // this should be clear from the shs session already
}
func (h *handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
// unpack arguments
var args []connectArg

View File

@ -3,13 +3,10 @@
package server
import (
"net"
kitlog "github.com/go-kit/kit/log"
"go.cryptoscope.co/muxrpc/v2"
"go.cryptoscope.co/muxrpc/v2/typemux"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemuxrpc"
"github.com/ssb-ngi-pointer/go-ssb-room/roomstate"
refs "go.mindeco.de/ssb-refs"
)
@ -18,16 +15,6 @@ const name = "tunnel"
var method muxrpc.Method = muxrpc.Method{name}
type plugin struct {
h muxrpc.Handler
log kitlog.Logger
}
func (plugin) Name() string { return name }
func (plugin) Method() muxrpc.Method { return method }
func (p plugin) Handler() muxrpc.Handler { return p.h }
func (plugin) Authorize(net.Conn) bool { return true }
/* manifest:
{
"announce": "sync",
@ -39,25 +26,24 @@ func (plugin) Authorize(net.Conn) bool { return true }
}
*/
func New(log kitlog.Logger, self refs.FeedRef, m *roomstate.Manager) maybemuxrpc.Plugin {
mux := typemux.New(log)
func New(log kitlog.Logger, self refs.FeedRef, m *roomstate.Manager) *Handler {
var h = new(handler)
var h = new(Handler)
h.self = self
h.logger = log
h.state = m
mux.RegisterAsync(append(method, "isRoom"), typemux.AsyncFunc(h.isRoom))
mux.RegisterAsync(append(method, "ping"), typemux.AsyncFunc(h.ping))
mux.RegisterAsync(append(method, "announce"), typemux.AsyncFunc(h.announce))
mux.RegisterAsync(append(method, "leave"), typemux.AsyncFunc(h.leave))
mux.RegisterSource(append(method, "endpoints"), typemux.SourceFunc(h.endpoints))
mux.RegisterDuplex(append(method, "connect"), typemux.DuplexFunc(h.connect))
return plugin{
h: &mux,
}
return h
}
func (h *Handler) Register(mux typemux.HandlerMux, namespace muxrpc.Method) {
mux.RegisterAsync(append(namespace, "isRoom"), typemux.AsyncFunc(h.isRoom))
mux.RegisterAsync(append(namespace, "ping"), typemux.AsyncFunc(h.ping))
mux.RegisterAsync(append(namespace, "announce"), typemux.AsyncFunc(h.announce))
mux.RegisterAsync(append(namespace, "leave"), typemux.AsyncFunc(h.leave))
mux.RegisterSource(append(namespace, "endpoints"), typemux.SourceFunc(h.endpoints))
mux.RegisterDuplex(append(namespace, "connect"), typemux.DuplexFunc(h.connect))
}

View File

@ -16,25 +16,25 @@ import (
"go.cryptoscope.co/muxrpc/v2"
)
type handler struct {
type Handler struct {
logger kitlog.Logger
self refs.FeedRef
state *roomstate.Manager
}
func (h *handler) isRoom(context.Context, *muxrpc.Request) (interface{}, error) {
func (h *Handler) isRoom(context.Context, *muxrpc.Request) (interface{}, error) {
level.Debug(h.logger).Log("called", "isRoom")
return true, nil
}
func (h *handler) ping(context.Context, *muxrpc.Request) (interface{}, error) {
func (h *Handler) ping(context.Context, *muxrpc.Request) (interface{}, error) {
now := time.Now().UnixNano() / 1000
level.Debug(h.logger).Log("called", "ping")
return now, nil
}
func (h *handler) announce(_ context.Context, req *muxrpc.Request) (interface{}, error) {
func (h *Handler) announce(_ context.Context, req *muxrpc.Request) (interface{}, error) {
level.Debug(h.logger).Log("called", "announce")
ref, err := network.GetFeedRefFromAddr(req.RemoteAddr())
if err != nil {
@ -46,7 +46,7 @@ func (h *handler) announce(_ context.Context, req *muxrpc.Request) (interface{},
return false, nil
}
func (h *handler) leave(_ context.Context, req *muxrpc.Request) (interface{}, error) {
func (h *Handler) leave(_ context.Context, req *muxrpc.Request) (interface{}, error) {
ref, err := network.GetFeedRefFromAddr(req.RemoteAddr())
if err != nil {
return nil, err
@ -57,7 +57,7 @@ func (h *handler) leave(_ context.Context, req *muxrpc.Request) (interface{}, er
return false, nil
}
func (h *handler) endpoints(_ context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink) error {
func (h *Handler) endpoints(_ context.Context, req *muxrpc.Request, snk *muxrpc.ByteSink) error {
level.Debug(h.logger).Log("called", "endpoints")
toPeer := newForwarder(snk)

View File

@ -4,15 +4,14 @@ package whoami
import (
"context"
"fmt"
"net"
"go.cryptoscope.co/muxrpc/v2/typemux"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.cryptoscope.co/muxrpc/v2"
refs "go.mindeco.de/ssb-refs"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemuxrpc"
)
var (
@ -25,64 +24,18 @@ func checkAndLog(log kitlog.Logger, err error) {
}
}
func New(log kitlog.Logger, id refs.FeedRef) maybemuxrpc.Plugin {
return plugin{handler{
log: log,
id: id,
}}
func New(id refs.FeedRef) typemux.AsyncHandler {
return handler{id: id}
}
type plugin struct {
h handler
}
func (plugin) Name() string { return "whoami" }
func (plugin) Method() muxrpc.Method { return method }
func (wami plugin) Handler() muxrpc.Handler { return wami.h }
func (plugin) Authorize(net.Conn) bool { return true }
type handler struct {
log kitlog.Logger
id refs.FeedRef
id refs.FeedRef
}
func (handler) Handled(m muxrpc.Method) bool { return m.String() == "whoami" }
func (handler) HandleConnect(ctx context.Context, edp muxrpc.Endpoint) {}
func (h handler) HandleCall(ctx context.Context, req *muxrpc.Request) {
// TODO: push manifest check into muxrpc
if req.Type == "" {
req.Type = "async"
}
if req.Method.String() != "whoami" {
req.CloseWithError(fmt.Errorf("wrong method"))
return
}
func (h handler) HandleAsync(ctx context.Context, req *muxrpc.Request) (interface{}, error) {
type ret struct {
ID string `json:"id"`
}
err := req.Return(ctx, ret{h.id.Ref()})
checkAndLog(h.log, err)
}
type endpoint struct {
edp muxrpc.Endpoint
}
func (edp endpoint) WhoAmI(ctx context.Context) (refs.FeedRef, error) {
var resp struct {
ID refs.FeedRef `json:"id"`
}
err := edp.edp.Async(ctx, &resp, muxrpc.TypeJSON, method)
if err != nil {
return refs.FeedRef{}, fmt.Errorf("error making async call: %w", err)
}
return resp.ID, nil
return ret{h.id.Ref()}, nil
}

View File

@ -18,10 +18,10 @@ import (
"github.com/stretchr/testify/require"
"go.cryptoscope.co/muxrpc/v2/debug"
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb/sqlite"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/testutils"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/repo"
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb/sqlite"
"github.com/ssb-ngi-pointer/go-ssb-room/roomsrv"
)
@ -84,7 +84,7 @@ func makeNamedTestBot(t testing.TB, name string, opts []roomsrv.Option) *roomsrv
t.Cleanup(func() {
db.Close()
})
theBot, err := roomsrv.New(db.AllowList, botOptions...)
theBot, err := roomsrv.New(db.AllowList, db.Aliases, botOptions...)
r.NoError(err)
return theBot
}

View File

@ -80,8 +80,9 @@ func TestJSClient(t *testing.T) {
ts := newRandomSession(t)
// ts := newSession(t, nil)
var al = &mockdb.FakeAllowListService{}
srv := ts.startGoServer(al)
var allowDB = &mockdb.FakeAllowListService{}
var aliasDB = &mockdb.FakeAliasService{}
srv := ts.startGoServer(allowDB, aliasDB)
alice := ts.startJSClient("alice", "./testscripts/simple_client.js",
srv.Network.GetListenAddr(),
@ -110,7 +111,7 @@ func TestJSClient(t *testing.T) {
)
srv.Allow(bob, true)
al.HasFeedReturns(true)
allowDB.HasFeedReturns(true)
time.Sleep(5 * time.Second)
@ -137,10 +138,11 @@ func TestJSServer(t *testing.T) {
}
// now connect our go client
var al = &mockdb.FakeAllowListService{}
client := ts.startGoServer(al)
var allowDB = &mockdb.FakeAllowListService{}
var aliasDB = &mockdb.FakeAliasService{}
client := ts.startGoServer(allowDB, aliasDB)
client.Allow(*alice, true)
al.HasFeedReturns(true)
allowDB.HasFeedReturns(true)
var roomHandle bytes.Buffer
roomHandle.WriteString("tunnel:")

View File

@ -86,7 +86,10 @@ func newSession(t *testing.T, appKey []byte) *testSession {
return ts
}
func (ts *testSession) startGoServer(al roomdb.AllowListService, opts ...roomsrv.Option) *roomsrv.Server {
func (ts *testSession) startGoServer(
allowDB roomdb.AllowListService,
aliasDB roomdb.AliasService,
opts ...roomsrv.Option) *roomsrv.Server {
r := require.New(ts.t)
// prepend defaults
@ -107,7 +110,7 @@ func (ts *testSession) startGoServer(al roomdb.AllowListService, opts ...roomsrv
}),
)
srv, err := roomsrv.New(al, opts...)
srv, err := roomsrv.New(allowDB, aliasDB, opts...)
r.NoError(err, "failed to init tees a server")
ts.t.Logf("go server: %s", srv.Whoami().Ref())
ts.t.Cleanup(func() {

47
roomsrv/init_handlers.go Normal file
View File

@ -0,0 +1,47 @@
package roomsrv
import (
kitlog "github.com/go-kit/kit/log"
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
muxrpc "go.cryptoscope.co/muxrpc/v2"
"go.cryptoscope.co/muxrpc/v2/typemux"
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/alias"
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/tunnel/server"
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/whoami"
)
func (s *Server) initHandlers(aliasDB roomdb.AliasService) {
// inistaniate handler packages
whoami := whoami.New(s.Whoami())
tunnelHandler := server.New(
kitlog.With(s.logger, "unit", "tunnel"),
s.Whoami(),
s.StateManager,
)
aliasHandler := alias.New(
kitlog.With(s.logger, "unit", "aliases"),
s.Whoami(),
aliasDB,
)
// register muxrpc commands
registries := []typemux.HandlerMux{s.public, s.master}
for _, mux := range registries {
mux.RegisterAsync(muxrpc.Method{"manifest"}, manifest)
mux.RegisterAsync(muxrpc.Method{"whoami"}, whoami)
// register tunnel.connect etc twice (as tunnel.* and room.*)
var method = muxrpc.Method{"tunnel"}
tunnelHandler.Register(mux, method)
method = muxrpc.Method{"room"}
tunnelHandler.Register(mux, method)
mux.RegisterAsync(append(method, "registerAlias"), typemux.AsyncFunc(aliasHandler.Register))
}
}

View File

@ -6,13 +6,10 @@ import (
"fmt"
"net"
kitlog "github.com/go-kit/kit/log"
"go.cryptoscope.co/muxrpc/v2"
refs "go.mindeco.de/ssb-refs"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/tunnel/server"
"github.com/ssb-ngi-pointer/go-ssb-room/muxrpc/handlers/whoami"
)
func (s *Server) initNetwork() error {
@ -27,32 +24,16 @@ func (s *Server) initNetwork() error {
}
if s.keyPair.Feed.Equal(remote) {
return s.master.MakeHandler(conn)
return &s.master, nil
}
if s.authorizer.HasFeed(s.rootCtx, *remote) {
return s.public.MakeHandler(conn)
return &s.public, nil
}
return nil, fmt.Errorf("not authorized")
}
// whoami
whoami := whoami.New(kitlog.With(s.logger, "unit", "whoami"), s.Whoami())
s.public.Register(whoami)
s.master.Register(whoami)
s.master.Register(manifestPlug)
// s.master.Register(replicate.NewPlug(s.Users))
tunnelPlug := server.New(
kitlog.With(s.logger, "unit", "tunnel"),
s.Whoami(),
s.StateManager,
)
s.public.Register(tunnelPlug)
// tcp+shs
opts := network.Options{
Logger: s.logger,

View File

@ -53,6 +53,7 @@ func (s *Server) initUnixSock() error {
go func() {
acceptLoop:
for {
c, err := uxLis.Accept()
if err != nil {
@ -77,7 +78,7 @@ func (s *Server) initUnixSock() error {
if err != nil {
level.Warn(s.logger).Log("err", err)
c.Close()
continue
continue acceptLoop
}
}
@ -86,13 +87,7 @@ func (s *Server) initUnixSock() error {
pkr := muxrpc.NewPacker(conn)
h, err := s.master.MakeHandler(conn)
if err != nil {
level.Warn(s.logger).Log("event", "unix sock make handler", "err", err)
return
}
edp := muxrpc.Handle(pkr, h,
edp := muxrpc.Handle(pkr, &s.master,
muxrpc.WithContext(s.rootCtx),
muxrpc.WithLogger(kitlog.NewNopLogger()),
)

View File

@ -6,32 +6,14 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"go.cryptoscope.co/muxrpc/v2"
)
type namedPlugin struct {
h muxrpc.Handler
name string
}
func (np namedPlugin) Name() string { return np.name }
func (np namedPlugin) Method() muxrpc.Method { return muxrpc.Method{np.name} }
func (np namedPlugin) Handler() muxrpc.Handler { return np.h }
func (np namedPlugin) Authorize(net.Conn) bool { return true }
type manifestHandler string
func (manifestHandler) Handled(m muxrpc.Method) bool { return m.String() == "manifest" }
func (manifestHandler) HandleConnect(context.Context, muxrpc.Endpoint) {}
func (h manifestHandler) HandleCall(ctx context.Context, req *muxrpc.Request) {
err := req.Return(ctx, json.RawMessage(h))
if err != nil {
fmt.Println("manifest err", err)
}
func (h manifestHandler) HandleAsync(ctx context.Context, req *muxrpc.Request) (interface{}, error) {
return json.RawMessage(h), nil
}
func init() {
@ -59,8 +41,3 @@ const manifest manifestHandler = `
"ping": "sync"
}
}`
var manifestPlug = namedPlugin{
h: manifest,
name: "manifest",
}

View File

@ -12,13 +12,14 @@ import (
"path/filepath"
"sync"
"go.cryptoscope.co/muxrpc/v2/typemux"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.cryptoscope.co/netwrap"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/keys"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemod/multicloser"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/maybemuxrpc"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
"github.com/ssb-ngi-pointer/go-ssb-room/internal/repo"
"github.com/ssb-ngi-pointer/go-ssb-room/roomdb"
@ -53,8 +54,8 @@ type Server struct {
preSecureWrappers []netwrap.ConnWrapper
postSecureWrappers []netwrap.ConnWrapper
public maybemuxrpc.PluginManager
master maybemuxrpc.PluginManager
public typemux.HandlerMux
master typemux.HandlerMux
authorizer roomdb.AllowListService
@ -65,12 +66,13 @@ func (s Server) Whoami() refs.FeedRef {
return s.keyPair.Feed
}
func New(allow roomdb.AllowListService, opts ...Option) (*Server, error) {
func New(
allowdb roomdb.AllowListService,
aliasdb roomdb.AliasService,
opts ...Option,
) (*Server, error) {
var s Server
s.authorizer = allow
s.public = maybemuxrpc.NewPluginManager()
s.master = maybemuxrpc.NewPluginManager()
s.authorizer = allowdb
for i, opt := range opts {
err := opt(&s)
@ -110,6 +112,9 @@ func New(allow roomdb.AllowListService, opts ...Option) (*Server, error) {
s.logger = logger
}
s.public = typemux.New(kitlog.With(s.logger, "mux", "public"))
s.master = typemux.New(kitlog.With(s.logger, "mux", "master"))
if s.rootCtx == nil {
s.rootCtx, s.Shutdown = context.WithCancel(context.Background())
}
@ -126,6 +131,8 @@ func New(allow roomdb.AllowListService, opts ...Option) (*Server, error) {
s.StateManager = roomstate.NewManager(s.rootCtx, s.logger)
s.initHandlers(aliasdb)
if err := s.initNetwork(); err != nil {
return nil, err
}