add tunnel.connect
This commit is contained in:
parent
20ed882f33
commit
84c36d1f71
|
@ -0,0 +1,96 @@
|
|||
package tunnel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
refs "go.mindeco.de/ssb-refs"
|
||||
|
||||
"go.cryptoscope.co/muxrpc/v2"
|
||||
)
|
||||
|
||||
type connectArg struct {
|
||||
Portal refs.FeedRef `json:"portal"`
|
||||
Target refs.FeedRef `json:"target"`
|
||||
}
|
||||
|
||||
type connectWithOriginArg struct {
|
||||
connectArg
|
||||
Origin refs.FeedRef `json:"origin"` // this should be clear from the shs session already
|
||||
}
|
||||
|
||||
func (rs *roomState) connect(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
|
||||
// unpack arguments
|
||||
|
||||
var args []connectArg
|
||||
err := json.Unmarshal(req.RawArgs, &args)
|
||||
if err != nil {
|
||||
return fmt.Errorf("connect: invalid arguments: %w", err)
|
||||
}
|
||||
|
||||
if n := len(args); n != 1 {
|
||||
return fmt.Errorf("connect: expected 1 argument, got %d", n)
|
||||
}
|
||||
arg := args[0]
|
||||
|
||||
// see if we have and endpoint for the target
|
||||
rs.roomsMu.Lock()
|
||||
|
||||
edp, has := rs.rooms["lobby"][arg.Target.Ref()]
|
||||
if !has {
|
||||
rs.roomsMu.Unlock()
|
||||
return fmt.Errorf("no such endpoint")
|
||||
}
|
||||
|
||||
// call connect on them
|
||||
var argWorigin connectWithOriginArg
|
||||
argWorigin.connectArg = arg
|
||||
argWorigin.Origin = rs.self
|
||||
|
||||
targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin)
|
||||
if err != nil {
|
||||
delete(rs.rooms["lobby"], arg.Target.Ref())
|
||||
rs.updater.Update(rs.rooms["lobby"].asList())
|
||||
rs.roomsMu.Unlock()
|
||||
return fmt.Errorf("failed to init connect call with target: %w", err)
|
||||
}
|
||||
rs.roomsMu.Unlock()
|
||||
|
||||
// pipe data
|
||||
var cpy muxrpcDuplexCopy
|
||||
cpy.ctx, cpy.cancel = context.WithCancel(ctx)
|
||||
|
||||
go cpy.do(targetSnk, peerSrc)
|
||||
go cpy.do(peerSnk, targetSrc)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type muxrpcDuplexCopy struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (mdc muxrpcDuplexCopy) do(w *muxrpc.ByteSink, r *muxrpc.ByteSource) {
|
||||
for r.Next(mdc.ctx) {
|
||||
err := r.Reader(func(rd io.Reader) error {
|
||||
_, err := io.Copy(w, rd)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
fmt.Println("read failed:", err)
|
||||
w.CloseWithError(err)
|
||||
mdc.cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := r.Err(); err != nil {
|
||||
fmt.Println("src errored:", err)
|
||||
w.CloseWithError(err)
|
||||
mdc.cancel()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
|
@ -8,6 +8,7 @@ import (
|
|||
"go.cryptoscope.co/muxrpc/v2"
|
||||
"go.cryptoscope.co/muxrpc/v2/typemux"
|
||||
|
||||
refs "go.mindeco.de/ssb-refs"
|
||||
"go.mindeco.de/ssb-rooms/internal/broadcasts"
|
||||
"go.mindeco.de/ssb-rooms/internal/maybemuxrpc"
|
||||
)
|
||||
|
@ -37,10 +38,11 @@ func (plugin) Authorize(net.Conn) bool { return true }
|
|||
}
|
||||
*/
|
||||
|
||||
func New(log kitlog.Logger, ctx context.Context) maybemuxrpc.Plugin {
|
||||
func New(log kitlog.Logger, ctx context.Context, self refs.FeedRef) maybemuxrpc.Plugin {
|
||||
mux := typemux.New(log)
|
||||
|
||||
var rs = new(roomState)
|
||||
rs.self = self
|
||||
rs.logger = log
|
||||
rs.updater, rs.broadcaster = broadcasts.NewRoomChanger()
|
||||
rs.rooms = make(roomsStateMap)
|
||||
|
@ -58,8 +60,7 @@ func New(log kitlog.Logger, ctx context.Context) maybemuxrpc.Plugin {
|
|||
|
||||
mux.RegisterSource(append(method, "endpoints"), typemux.SourceFunc(rs.endpoints))
|
||||
|
||||
// TODO: patch typemux
|
||||
// mux.RegisterDuplex(append(method, "connect"), typemux.DuplexFunc(rs.connect))
|
||||
mux.RegisterDuplex(append(method, "connect"), typemux.DuplexFunc(rs.connect))
|
||||
|
||||
return plugin{
|
||||
h: &mux,
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
refs "go.mindeco.de/ssb-refs"
|
||||
"go.mindeco.de/ssb-rooms/internal/network"
|
||||
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
|
@ -15,6 +16,8 @@ import (
|
|||
)
|
||||
|
||||
type roomState struct {
|
||||
self refs.FeedRef
|
||||
|
||||
logger kitlog.Logger
|
||||
|
||||
updater broadcasts.RoomChangeSink
|
||||
|
|
|
@ -49,7 +49,7 @@ func (s *Server) initNetwork() error {
|
|||
|
||||
// s.master.Register(replicate.NewPlug(s.Users))
|
||||
|
||||
tunnelPlug := tunnel.New(kitlog.With(s.logger, "unit", "tunnel"), s.rootCtx)
|
||||
tunnelPlug := tunnel.New(kitlog.With(s.logger, "unit", "tunnel"), s.rootCtx, s.Whoami())
|
||||
s.public.Register(tunnelPlug)
|
||||
|
||||
// tcp+shs
|
||||
|
|
|
@ -8,14 +8,12 @@ module.exports = {
|
|||
console.warn('peer change:',p.type, p.key)
|
||||
})
|
||||
)
|
||||
|
||||
setTimeout(ready, 1000)
|
||||
},
|
||||
|
||||
after: (sbot, client, exit) => {
|
||||
// hrm.. this runs twice (for each connection)
|
||||
console.warn('server new connection:', client.id)
|
||||
|
||||
setTimeout(exit, 30000)
|
||||
}
|
||||
}
|
|
@ -41,7 +41,5 @@ module.exports = {
|
|||
console.warn("from roomsrv:",el)
|
||||
})
|
||||
)
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -50,7 +50,7 @@ module.exports = {
|
|||
client.conn.connect(roomHandle, (err, tunneldRpc) => {
|
||||
if (err) throw err
|
||||
console.warn("got tunnel to:", tunneldRpc.id)
|
||||
|
||||
|
||||
// check the tunnel connection works
|
||||
tunneldRpc.tunnel.ping((err, id) => {
|
||||
if (err) throw err
|
||||
|
|
Loading…
Reference in New Issue