start tunnel muxrpc handler

This commit is contained in:
Henry 2021-01-25 18:31:09 +01:00
parent f4dc1b1f42
commit f8ed24b665
5 changed files with 184 additions and 10 deletions

59
handlers/tunnel/plugin.go Normal file
View File

@ -0,0 +1,59 @@
package tunnel
import (
"net"
kitlog "github.com/go-kit/kit/log"
"go.cryptoscope.co/muxrpc/v2"
"go.cryptoscope.co/muxrpc/v2/typemux"
refs "go.mindeco.de/ssb-refs"
"go.mindeco.de/ssb-rooms/internal/maybemuxrpc"
)
const name = "tunnel"
var method muxrpc.Method = muxrpc.Method{name}
type plugin struct {
h muxrpc.Handler
log kitlog.Logger
}
func (plugin) Name() string { return name }
func (plugin) Method() muxrpc.Method { return method }
func (p plugin) Handler() muxrpc.Handler { return p.h }
func (plugin) Authorize(net.Conn) bool { return true }
/* manifest:
{
"announce": "sync",
"leave": "sync",
"connect": "duplex",
"endpoints": "source",
"isRoom": "async",
"ping": "sync",
}
*/
func New(log kitlog.Logger, self refs.FeedRef) maybemuxrpc.Plugin {
mux := typemux.New(log)
var rs roomState
rs.logger = log
mux.RegisterAsync(append(method, "isRoom"), typemux.AsyncFunc(rs.isRoom))
mux.RegisterAsync(append(method, "ping"), typemux.AsyncFunc(rs.ping))
mux.RegisterAsync(append(method, "announce"), typemux.AsyncFunc(rs.announce))
mux.RegisterAsync(append(method, "leave"), typemux.AsyncFunc(rs.leave))
mux.RegisterSource(append(method, "endpoints"), typemux.SourceFunc(rs.endpoints))
// TODO: patch typemux
// mux.RegisterDuplex(append(method, "connect"), typemux.DuplexFunc(rs.connect))
return plugin{
h: &mux,
}
}

41
handlers/tunnel/state.go Normal file
View File

@ -0,0 +1,41 @@
package tunnel
import (
"context"
"fmt"
"time"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"go.cryptoscope.co/muxrpc/v2"
)
type roomState struct {
logger kitlog.Logger
}
func (rs roomState) isRoom(context.Context, *muxrpc.Request) (interface{}, error) {
level.Debug(rs.logger).Log("called", "isRoom")
return true, nil
}
func (rs roomState) ping(context.Context, *muxrpc.Request) (interface{}, error) {
now := time.Now().UnixNano() / 1000
level.Debug(rs.logger).Log("called", "ping")
return now, nil
}
func (rs roomState) announce(context.Context, *muxrpc.Request) (interface{}, error) {
level.Debug(rs.logger).Log("called", "announce")
return nil, fmt.Errorf("TODO:announce")
}
func (rs roomState) leave(context.Context, *muxrpc.Request) (interface{}, error) {
level.Debug(rs.logger).Log("called", "leave")
return nil, fmt.Errorf("TODO:leave")
}
func (rs roomState) endpoints(context.Context, *muxrpc.Request, *muxrpc.ByteSink, muxrpc.Endpoint) error {
level.Debug(rs.logger).Log("called", "endpoints")
return fmt.Errorf("TODO:endpoints")
}

View File

@ -9,6 +9,7 @@ import (
"go.cryptoscope.co/muxrpc/v2"
refs "go.mindeco.de/ssb-refs"
"go.mindeco.de/ssb-rooms/handlers/tunnel"
"go.mindeco.de/ssb-rooms/handlers/whoami"
"go.mindeco.de/ssb-rooms/internal/maybemuxrpc"
"go.mindeco.de/ssb-rooms/internal/network"
@ -44,8 +45,13 @@ func (s *Server) initNetwork() error {
s.public.Register(whoami)
s.master.Register(whoami)
s.master.Register(manifestPlug)
// s.master.Register(replicate.NewPlug(s.Users))
tunnelPlug := tunnel.New(kitlog.With(s.logger, "unit", "tunnel"), s.keyPair.Feed)
s.public.Register(tunnelPlug)
// tcp+shs
opts := network.Options{
Logger: s.logger,

53
roomsrv/manifest.go Normal file
View File

@ -0,0 +1,53 @@
package roomsrv
import (
"context"
"encoding/json"
"fmt"
"net"
"go.cryptoscope.co/muxrpc/v2"
)
type namedPlugin struct {
h muxrpc.Handler
name string
}
func (np namedPlugin) Name() string { return np.name }
func (np namedPlugin) Method() muxrpc.Method { return muxrpc.Method{np.name} }
func (np namedPlugin) Handler() muxrpc.Handler { return np.h }
func (np namedPlugin) Authorize(net.Conn) bool { return true }
type manifestHandler string
func (manifestHandler) HandleConnect(context.Context, muxrpc.Endpoint) {}
func (h manifestHandler) HandleCall(ctx context.Context, req *muxrpc.Request, edp muxrpc.Endpoint) {
err := req.Return(ctx, json.RawMessage(h))
if err != nil {
fmt.Println("manifest err", err)
}
}
// this is a very simple hardcoded manifest.json dump which oasis' ssb-client expects to do it's magic.
const manifest manifestHandler = `
{
"manifest": "sync",
"whoami":"async",
"tunnel": {
"announce": "sync",
"leave": "sync",
"connect": "duplex",
"endpoints": "source",
"isRoom": "async",
"ping": "sync",
}
}`
var manifestPlug = namedPlugin{
h: manifest,
name: "manifest",
}

View File

@ -63,26 +63,41 @@ func TestTunnelServerSimple(t *testing.T) {
err = botB.Network.Connect(ctx, serv.Network.GetListenAddr())
r.NoError(err, "connect B to the Server") // we dont see an error because it just establishes the tcp connection
edpOfA, has := botA.Network.GetEndpointFor(serv.Whoami())
r.True(has, "botA has no endpoint for the server")
// t.Log("letting handshaking settle..")
// time.Sleep(1 * time.Second)
var srvWho struct {
ID refs.FeedRef
}
edpOfB, has := botB.Network.GetEndpointFor(serv.Whoami())
r.False(has, "botB has an endpoint for the server!")
if edpOfB != nil {
a.Nil(edpOfB, "should not have an endpoint on B")
err = edpOfB.Async(ctx, &srvWho, muxrpc.TypeJSON, muxrpc.Method{"whoami"})
r.Error(err)
t.Log(srvWho.ID.Ref())
}
edpOfA, has := botA.Network.GetEndpointFor(serv.Whoami())
r.True(has, "botA has no endpoint for the server")
err = edpOfA.Async(ctx, &srvWho, muxrpc.TypeJSON, muxrpc.Method{"whoami"})
r.NoError(err)
t.Log("server whoami:", srvWho.ID.Ref())
a.True(serv.Whoami().Equal(&srvWho.ID))
edpOfB, has := botB.Network.GetEndpointFor(serv.Whoami())
r.False(has, "botB has an endpoint for the server!")
if edpOfB != nil {
t.Error("should not have an endpoint on B")
err = edpOfB.Async(ctx, &srvWho, muxrpc.TypeJSON, muxrpc.Method{"whoami"})
r.Error(err)
t.Log(srvWho.ID.Ref())
}
// start testing basic room stuff
var yes bool
err = edpOfA.Async(ctx, &yes, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "isRoom"})
r.NoError(err)
a.True(yes, "srv is not a room?!")
var ts int
err = edpOfA.Async(ctx, &ts, muxrpc.TypeJSON, muxrpc.Method{"tunnel", "ping"})
r.NoError(err)
t.Log("ping:", ts)
// cleanup
cancel()