Files

156 lines
3.4 KiB
Go

package util
import (
"strings"
"os/signal"
"os"
"fmt"
"syscall"
"crypto/sha256"
"encoding/binary"
iroh "git.coopcloud.tech/decentral1se/iroh-go"
)
func Check(err error) {
if err != nil {
irohErr := err.(*iroh.IrohError)
panic(irohErr.Message())
}
}
func TwiceHashedKey (key string) []byte {
onceHashed32 := (sha256.Sum256([]byte(key)))
// converting [32]byte to []byte can be done by doing a slice copy à ala [:]
onceHashed := onceHashed32[:]
sum := sha256.Sum256(onceHashed)
return sum[:]
}
func WriteData(data []byte, send *iroh.SendStream) {
// NOTE (2026-06-30): bit of thrashing around having to instantiate sizebuf all the time
sizebuf := make([]byte, binary.MaxVarintLen64)
// calculate varint length of data
// prefix data with the length
fmt.Println("wrote", string(data))
wrote := binary.PutVarint(sizebuf, int64(binary.Size(data)))
sizebuf = sizebuf[:wrote]
// prepend varint length to data
data = append(sizebuf, data...)
// fmt.Println("data", data, "size", size, "len(data)", len(data))
// send data
err := send.WriteAll(data)
Check(err)
}
const frameSize = 1024 // NOTE (2026-06-30): arbitrarily chosen value, can be tweaked if things are not working
func ReadBytes(recv *iroh.RecvStream, ch chan<- []byte, done chan struct{}) {
for {
frame, err := recv.Read(frameSize)
select {
case <-done:
// done was closed elsewhere, exit earlier
return
default:
}
if checkForTimeout(err) {
break
}
ch<-frame
}
close(done)
}
const SENTINEL = -1024
func ProcessRead (recv *iroh.RecvStream, processed chan<- []byte, done chan struct{}) {
var size int64
size = SENTINEL
dataCh := make(chan []byte)
data := make([]byte, 0)
go ReadBytes(recv, dataCh, done)
for {
select {
case <-done:
return
case frame := <- dataCh:
data = append(data, frame...)
default:
// dont block
}
// if we don't have any data we instead of continuing the loop block on receiving from dataCh
if len(data) == 0 {
frame := <-dataCh
data = append(data, frame...)
}
bytesread := 0
if (size == SENTINEL) {
size, bytesread = binary.Varint(data)
if bytesread > 0 {
data = data[bytesread:]
} else {
panic("handlReading: varint wasn't where we expected it")
}
}
if len(data) >= int(size) {
processed <-data[:size]
data = data[size:]
size = SENTINEL
}
}
}
func HandleReading(recv *iroh.RecvStream, process func ([]byte) ([]byte, bool), responseCh chan<- []byte, done chan struct{}) {
dataCh := make(chan []byte)
go ProcessRead(recv, dataCh, done)
for {
select {
case data := <- dataCh:
responseData, hasData := process(data)
if hasData {
responseCh <- responseData
}
case <-done:
return
}
}
}
func SendResponses(send *iroh.SendStream, responseCh <-chan []byte, done chan struct{}) {
for {
select {
case <-done:
return
case data := <- responseCh:
WriteData(data, send)
}
}
}
func checkForTimeout(err error) bool {
// handle timeout
if err != nil {
irohErr := err.(*iroh.IrohError)
errMsg := irohErr.Message()
if strings.HasPrefix(errMsg, "ConnectionLost") {
fmt.Println("lost connection")
return true
} else {
// panics and exits bc unhandled err
Check(err)
}
}
return false
}
func AwaitInterrupt() {
done := make(chan os.Signal, 1)
signal.Notify(done, syscall.SIGINT, syscall.SIGTERM)
<-done
os.Exit(0)
}
func GetALPN(protoVersion int) []byte {
return TwiceHashedKey(fmt.Sprintf("iroh-tracker/%d", protoVersion))
}