fix origin information of tunnel.connect
also cleanup logging by removing output to stdout
This commit is contained in:
parent
545187dfe5
commit
3afe9b3659
|
@ -8,19 +8,22 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
refs "go.mindeco.de/ssb-refs"
|
kitlog "github.com/go-kit/kit/log"
|
||||||
|
"github.com/go-kit/kit/log/level"
|
||||||
"go.cryptoscope.co/muxrpc/v2"
|
"go.cryptoscope.co/muxrpc/v2"
|
||||||
|
|
||||||
|
"github.com/ssb-ngi-pointer/go-ssb-room/internal/network"
|
||||||
|
refs "go.mindeco.de/ssb-refs"
|
||||||
)
|
)
|
||||||
|
|
||||||
type connectArg struct {
|
type connectArg struct {
|
||||||
Portal refs.FeedRef `json:"portal"`
|
Portal refs.FeedRef `json:"portal"` // the room server
|
||||||
Target refs.FeedRef `json:"target"`
|
Target refs.FeedRef `json:"target"` // which peer the initiator/caller wants to be tunneld to
|
||||||
}
|
}
|
||||||
|
|
||||||
type connectWithOriginArg struct {
|
type connectWithOriginArg struct {
|
||||||
connectArg
|
connectArg
|
||||||
Origin refs.FeedRef `json:"origin"` // this should be clear from the shs session already
|
Origin refs.FeedRef `json:"origin"` // who started the call
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
|
func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
|
||||||
|
@ -36,6 +39,16 @@ func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *mux
|
||||||
}
|
}
|
||||||
arg := args[0]
|
arg := args[0]
|
||||||
|
|
||||||
|
if !arg.Portal.Equal(&h.self) {
|
||||||
|
return fmt.Errorf("talking to the wrong room")
|
||||||
|
}
|
||||||
|
|
||||||
|
// who made the call
|
||||||
|
caller, err := network.GetFeedRefFromAddr(req.RemoteAddr())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// see if we have and endpoint for the target
|
// see if we have and endpoint for the target
|
||||||
|
|
||||||
edp, has := h.state.Has(arg.Target)
|
edp, has := h.state.Has(arg.Target)
|
||||||
|
@ -46,15 +59,16 @@ func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *mux
|
||||||
// call connect on them
|
// call connect on them
|
||||||
var argWorigin connectWithOriginArg
|
var argWorigin connectWithOriginArg
|
||||||
argWorigin.connectArg = arg
|
argWorigin.connectArg = arg
|
||||||
argWorigin.Origin = h.self
|
argWorigin.Origin = *caller
|
||||||
|
|
||||||
targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin)
|
targetSrc, targetSnk, err := edp.Duplex(ctx, muxrpc.TypeBinary, muxrpc.Method{"tunnel", "connect"}, argWorigin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to init connect call with target: %w", err)
|
return fmt.Errorf("failed to init connect call with target: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// pipe data
|
// pipe data between caller and target
|
||||||
var cpy muxrpcDuplexCopy
|
var cpy muxrpcDuplexCopy
|
||||||
|
cpy.logger = kitlog.With(h.logger, "caller", caller.ShortRef(), "target", arg.Target.ShortRef())
|
||||||
cpy.ctx, cpy.cancel = context.WithCancel(ctx)
|
cpy.ctx, cpy.cancel = context.WithCancel(ctx)
|
||||||
|
|
||||||
go cpy.do(targetSnk, peerSrc)
|
go cpy.do(targetSnk, peerSrc)
|
||||||
|
@ -66,6 +80,8 @@ func (h *Handler) connect(ctx context.Context, req *muxrpc.Request, peerSrc *mux
|
||||||
type muxrpcDuplexCopy struct {
|
type muxrpcDuplexCopy struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
|
|
||||||
|
logger kitlog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mdc muxrpcDuplexCopy) do(w *muxrpc.ByteSink, r *muxrpc.ByteSource) {
|
func (mdc muxrpcDuplexCopy) do(w *muxrpc.ByteSink, r *muxrpc.ByteSource) {
|
||||||
|
@ -75,14 +91,15 @@ func (mdc muxrpcDuplexCopy) do(w *muxrpc.ByteSink, r *muxrpc.ByteSource) {
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("read failed:", err)
|
level.Warn(mdc.logger).Log("event", "read failed", "err", err)
|
||||||
w.CloseWithError(err)
|
w.CloseWithError(err)
|
||||||
mdc.cancel()
|
mdc.cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := r.Err(); err != nil {
|
if err := r.Err(); err != nil {
|
||||||
fmt.Println("src errored:", err)
|
level.Warn(mdc.logger).Log("event", "source errored", "err", err)
|
||||||
|
// TODO: remove reading side from state?!
|
||||||
w.CloseWithError(err)
|
w.CloseWithError(err)
|
||||||
mdc.cancel()
|
mdc.cancel()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue