diff --git a/handlers/tunnel/connect.go b/handlers/tunnel/connect.go new file mode 100644 index 0000000..d573f71 --- /dev/null +++ b/handlers/tunnel/connect.go @@ -0,0 +1,96 @@ +package tunnel + +import ( + "context" + "encoding/json" + "fmt" + "io" + + refs "go.mindeco.de/ssb-refs" + + "go.cryptoscope.co/muxrpc/v2" +) + +type connectArg struct { + Portal refs.FeedRef `json:"portal"` + Target refs.FeedRef `json:"target"` +} + +type connectWithOriginArg struct { + connectArg + Origin refs.FeedRef `json:"origin"` // this should be clear from the shs session already +} + +func (rs *roomState) connect(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error { + // unpack arguments + + var args []connectArg + err := json.Unmarshal(req.RawArgs, &args) + if err != nil { + return fmt.Errorf("connect: invalid arguments: %w", err) + } + + if n := len(args); n != 1 { + return fmt.Errorf("connect: expected 1 argument, got %d", n) + } + arg := args[0] + + // see if we have and endpoint for the target + rs.roomsMu.Lock() + + edp, has := rs.rooms["lobby"][arg.Target.Ref()] + if !has { + rs.roomsMu.Unlock() + return fmt.Errorf("no such endpoint") + } + + // call connect on them + var argWorigin connectWithOriginArg + argWorigin.connectArg = arg + argWorigin.Origin = rs.self + + targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin) + if err != nil { + delete(rs.rooms["lobby"], arg.Target.Ref()) + rs.updater.Update(rs.rooms["lobby"].asList()) + rs.roomsMu.Unlock() + return fmt.Errorf("failed to init connect call with target: %w", err) + } + rs.roomsMu.Unlock() + + // pipe data + var cpy muxrpcDuplexCopy + cpy.ctx, cpy.cancel = context.WithCancel(ctx) + + go cpy.do(targetSnk, peerSrc) + go cpy.do(peerSnk, targetSrc) + + return nil +} + +type muxrpcDuplexCopy struct { + ctx context.Context + cancel context.CancelFunc +} + +func (mdc muxrpcDuplexCopy) do(w *muxrpc.ByteSink, r *muxrpc.ByteSource) { + for r.Next(mdc.ctx) { + err := r.Reader(func(rd io.Reader) error { + _, err := io.Copy(w, rd) + return err + }) + if err != nil { + fmt.Println("read failed:", err) + w.CloseWithError(err) + mdc.cancel() + return + } + } + if err := r.Err(); err != nil { + fmt.Println("src errored:", err) + w.CloseWithError(err) + mdc.cancel() + } + + return +} diff --git a/handlers/tunnel/plugin.go b/handlers/tunnel/plugin.go index 3c73847..434a479 100644 --- a/handlers/tunnel/plugin.go +++ b/handlers/tunnel/plugin.go @@ -8,6 +8,7 @@ import ( "go.cryptoscope.co/muxrpc/v2" "go.cryptoscope.co/muxrpc/v2/typemux" + refs "go.mindeco.de/ssb-refs" "go.mindeco.de/ssb-rooms/internal/broadcasts" "go.mindeco.de/ssb-rooms/internal/maybemuxrpc" ) @@ -37,10 +38,11 @@ func (plugin) Authorize(net.Conn) bool { return true } } */ -func New(log kitlog.Logger, ctx context.Context) maybemuxrpc.Plugin { +func New(log kitlog.Logger, ctx context.Context, self refs.FeedRef) maybemuxrpc.Plugin { mux := typemux.New(log) var rs = new(roomState) + rs.self = self rs.logger = log rs.updater, rs.broadcaster = broadcasts.NewRoomChanger() rs.rooms = make(roomsStateMap) @@ -58,8 +60,7 @@ func New(log kitlog.Logger, ctx context.Context) maybemuxrpc.Plugin { mux.RegisterSource(append(method, "endpoints"), typemux.SourceFunc(rs.endpoints)) - // TODO: patch typemux - // mux.RegisterDuplex(append(method, "connect"), typemux.DuplexFunc(rs.connect)) + mux.RegisterDuplex(append(method, "connect"), typemux.DuplexFunc(rs.connect)) return plugin{ h: &mux, diff --git a/handlers/tunnel/state.go b/handlers/tunnel/state.go index 9718026..a8d16b8 100644 --- a/handlers/tunnel/state.go +++ b/handlers/tunnel/state.go @@ -6,6 +6,7 @@ import ( "sync" "time" + refs "go.mindeco.de/ssb-refs" "go.mindeco.de/ssb-rooms/internal/network" kitlog "github.com/go-kit/kit/log" @@ -15,6 +16,8 @@ import ( ) type roomState struct { + self refs.FeedRef + logger kitlog.Logger updater broadcasts.RoomChangeSink diff --git a/roomsrv/init_network.go b/roomsrv/init_network.go index 4a8df18..5ef3453 100644 --- a/roomsrv/init_network.go +++ b/roomsrv/init_network.go @@ -49,7 +49,7 @@ func (s *Server) initNetwork() error { // s.master.Register(replicate.NewPlug(s.Users)) - tunnelPlug := tunnel.New(kitlog.With(s.logger, "unit", "tunnel"), s.rootCtx) + tunnelPlug := tunnel.New(kitlog.With(s.logger, "unit", "tunnel"), s.rootCtx, s.Whoami()) s.public.Register(tunnelPlug) // tcp+shs diff --git a/test/nodejs/testscripts/server.js b/test/nodejs/testscripts/server.js index ba64504..c5861b3 100644 --- a/test/nodejs/testscripts/server.js +++ b/test/nodejs/testscripts/server.js @@ -8,14 +8,12 @@ module.exports = { console.warn('peer change:',p.type, p.key) }) ) - setTimeout(ready, 1000) }, after: (sbot, client, exit) => { // hrm.. this runs twice (for each connection) console.warn('server new connection:', client.id) - setTimeout(exit, 30000) } } \ No newline at end of file diff --git a/test/nodejs/testscripts/simple_client.js b/test/nodejs/testscripts/simple_client.js index f6e8cae..3571ae1 100644 --- a/test/nodejs/testscripts/simple_client.js +++ b/test/nodejs/testscripts/simple_client.js @@ -41,7 +41,5 @@ module.exports = { console.warn("from roomsrv:",el) }) ) - - } } \ No newline at end of file diff --git a/test/nodejs/testscripts/simple_client_opening_tunnel.js b/test/nodejs/testscripts/simple_client_opening_tunnel.js index 7e5dd41..bca919b 100644 --- a/test/nodejs/testscripts/simple_client_opening_tunnel.js +++ b/test/nodejs/testscripts/simple_client_opening_tunnel.js @@ -50,7 +50,7 @@ module.exports = { client.conn.connect(roomHandle, (err, tunneldRpc) => { if (err) throw err console.warn("got tunnel to:", tunneldRpc.id) - + // check the tunnel connection works tunneldRpc.tunnel.ping((err, id) => { if (err) throw err