diff --git a/muxrpc/handlers/tunnel/server/connect.go b/muxrpc/handlers/tunnel/server/connect.go index 3dd0e8b..f676220 100644 --- a/muxrpc/handlers/tunnel/server/connect.go +++ b/muxrpc/handlers/tunnel/server/connect.go @@ -8,19 +8,22 @@ import ( "fmt" "io" - refs "go.mindeco.de/ssb-refs" - + kitlog "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "go.cryptoscope.co/muxrpc/v2" + + "github.com/ssb-ngi-pointer/go-ssb-room/internal/network" + refs "go.mindeco.de/ssb-refs" ) type connectArg struct { - Portal refs.FeedRef `json:"portal"` - Target refs.FeedRef `json:"target"` + Portal refs.FeedRef `json:"portal"` // the room server + Target refs.FeedRef `json:"target"` // which peer the initiator/caller wants to be tunneld to } type connectWithOriginArg struct { connectArg - Origin refs.FeedRef `json:"origin"` // this should be clear from the shs session already + Origin refs.FeedRef `json:"origin"` // who started the call } func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error { @@ -36,6 +39,16 @@ func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *mux } arg := args[0] + if !arg.Portal.Equal(&h.self) { + return fmt.Errorf("talking to the wrong room") + } + + // who made the call + caller, err := network.GetFeedRefFromAddr(req.RemoteAddr()) + if err != nil { + return err + } + // see if we have and endpoint for the target edp, has := h.state.Has(arg.Target) @@ -46,15 +59,16 @@ func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *mux // call connect on them var argWorigin connectWithOriginArg argWorigin.connectArg = arg - argWorigin.Origin = h.self + argWorigin.Origin = *caller targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin) if err != nil { return fmt.Errorf("failed to init connect call with target: %w", err) } - // pipe data + // pipe data between caller and target var cpy muxrpcDuplexCopy + cpy.logger = kitlog.With(h.logger, "caller", caller.ShortRef(), "target", arg.Target.ShortRef()) cpy.ctx, cpy.cancel = context.WithCancel(ctx) go cpy.do(targetSnk, peerSrc) @@ -66,6 +80,8 @@ func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *mux type muxrpcDuplexCopy struct { ctx context.Context cancel context.CancelFunc + + logger kitlog.Logger } func (mdc muxrpcDuplexCopy) do(w *muxrpc.ByteSink, r *muxrpc.ByteSource) { @@ -75,14 +91,15 @@ func (mdc muxrpcDuplexCopy) do(w *muxrpc.ByteSink, r *muxrpc.ByteSource) { return err }) if err != nil { - fmt.Println("read failed:", err) + level.Warn(mdc.logger).Log("event", "read failed", "err", err) w.CloseWithError(err) mdc.cancel() return } } if err := r.Err(); err != nil { - fmt.Println("src errored:", err) + level.Warn(mdc.logger).Log("event", "source errored", "err", err) + // TODO: remove reading side from state?! w.CloseWithError(err) mdc.cancel() }