tweak gossip.ping code

This commit is contained in:
Henry 2021-04-13 09:50:55 +02:00
parent 1b6077e49d
commit b939ca0873
1 changed files with 46 additions and 18 deletions

View File

@ -5,7 +5,6 @@ package gossip
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
@ -16,10 +15,11 @@ import (
// Ping implements the server side of gossip.ping.
// it's idea is mentioned here https://github.com/ssbc/ssb-gossip/#ping-duplex
// and implemented by https://github.com/dominictarr/pull-ping/
//
func Ping(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
type arg struct {
Timeout int
// The only argument is the delay between two pings.
// the Javascript code calls this "timeout", tho.
Delay int `json:"timeout"`
}
var args []arg
@ -33,6 +33,17 @@ func Ping(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource,
// timeout = time.Minute * time.Duration(args[0].Timeout/(60*1000))
// }
// return sillyPingPong(ctx, peerSrc, peerSnk)
return actualPingPong(ctx, peerSrc, peerSnk)
}
// actually just read and write whenever...
func sillyPingPong(ctx context.Context, peerSrc *muxrpc.ByteSource, peerSnk *muxrpc.ByteSink) error {
var (
sendErr = make(chan error)
receiveErr = make(chan error)
)
go func() {
peerSnk.SetEncoding(muxrpc.TypeJSON)
enc := json.NewEncoder(peerSnk)
@ -40,6 +51,8 @@ func Ping(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource,
tick := time.NewTicker(5 * time.Second)
defer tick.Stop()
defer close(sendErr)
for {
select {
case <-ctx.Done():
@ -48,27 +61,42 @@ func Ping(ctx context.Context, req *muxrpc.Request, peerSrc *muxrpc.ByteSource,
}
var pong = encodedTime.Millisecs(time.Now())
err = enc.Encode(pong)
if err != nil {
if err := enc.Encode(pong); err != nil {
sendErr <- err
return
}
}
}()
for peerSrc.Next(ctx) {
var ping encodedTime.Millisecs
err := peerSrc.Reader(func(rd io.Reader) error {
return json.NewDecoder(rd).Decode(&ping)
})
if err != nil {
return err
go func() {
defer close(receiveErr)
for peerSrc.Next(ctx) {
var ping encodedTime.Millisecs
err := peerSrc.Reader(func(rd io.Reader) error {
return json.NewDecoder(rd).Decode(&ping)
})
if err != nil {
receiveErr <- err
return
}
// when := time.Time(ping)
// fmt.Printf("got ping: %s - age: %s\n", when.String(), time.Since(when))
}
// when := time.Time(ping)
// fmt.Printf("got ping: %s - age: %s\n", when.String(), time.Since(when))
}
return
}()
return nil
select {
case e := <-sendErr:
return e
case e := <-receiveErr:
return e
case <-ctx.Done():
return nil
}
}
// this is how it should work, i think, but it leads to disconnects...
@ -87,8 +115,8 @@ func actualPingPong(ctx context.Context, peerSrc *muxrpc.ByteSource, peerSnk *mu
return err
}
when := time.Time(ping)
fmt.Printf("got ping: %s - age: %s\n", when.String(), time.Since(when))
//when := time.Time(ping)
//fmt.Printf("got ping: %s - age: %s\n", when.String(), time.Since(when))
pong := encodedTime.Millisecs(time.Now())
err = enc.Encode(pong)