Files

479 lines
15 KiB
Go

package main
import (
iroh "git.coopcloud.tech/decentral1se/iroh-go"
"fmt"
"bytes"
"strings"
"sync"
"time"
"crypto/sha256"
"gomod.cblgh.org/iroh-tracker/util"
"gomod.cblgh.org/iroh-tracker/proto"
// "gomod.cblgh.org/cerca/util/eout"
)
// Endpoint kinds, stored in StoredEndpoint.Type and used as the peerType
// filter in Lookup.GetEndpoints.
const (
// TYPE_CLIENT (0) marks endpoints registered through Lookup.AddClient.
TYPE_CLIENT = iota
// TYPE_SERVER (1) marks endpoints registered through Lookup.AddServer.
TYPE_SERVER
)
// Bucket indices into Lookup.SeenHistory.
const (
// OLDEST is the last bucket; Prune forgets every endpoint found in it.
OLDEST = 14
// NEWEST is the bucket that add and Bump append to.
NEWEST = 0
)
// StoredEndpoint is one tracked endpoint together with the topic it was
// registered for. The same *StoredEndpoint is shared between Lookup.ByID,
// Lookup.ListsByHash and Lookup.SeenHistory.
type StoredEndpoint struct {
// ID is the endpoint identifier; it is the key used in Lookup.ByID.
ID string
// TopicHash is the key of the Lookup.ListsByHash entry this endpoint is listed under.
TopicHash string
// LastSeen is set when the endpoint is added and refreshed by Lookup.Bump.
LastSeen time.Time
Type int // TYPE_CLIENT or TYPE_SERVER
}
// String renders the endpoint as "<kind>/<id>": kind is "cli" for TYPE_CLIENT
// and "srv" for anything else, and id is the ID cut to at most 16 bytes.
func (ep *StoredEndpoint) String() string {
	kind := "srv"
	if ep.Type == TYPE_CLIENT {
		kind = "cli"
	}

	// Keep log lines short: only the leading 16 bytes of the ID are shown.
	shortID := ep.ID
	if len(shortID) > 16 {
		shortID = shortID[:16]
	}
	return fmt.Sprintf("%s/%s", kind, shortID)
}
// Lookup is the tracker's in-memory index of known endpoints.
// mut is the lock the exported methods take around access to the fields below;
// the unexported helpers forget and removeFromSeenHistory expect the caller to
// already hold it.
// TODO (2026-07-01): evaluate sync.Map as an alternative to the RWMutex
type Lookup struct {
mut sync.RWMutex
// ByID maps endpoint ID -> endpoint (one to one).
ByID map[string]*StoredEndpoint
// ListsByHash maps topic hash -> endpoints registered for that topic (one to many).
ListsByHash map[string][]*StoredEndpoint
// SeenHistory holds buckets indexed NEWEST..OLDEST; Prune moves every
// bucket one step towards OLDEST and forgets what was in OLDEST.
SeenHistory [][]*StoredEndpoint
}
// NewLookup returns an empty Lookup with both maps allocated and one
// SeenHistory bucket for every index from NEWEST through OLDEST (15 buckets).
func NewLookup() *Lookup {
	l := new(Lookup)
	l.ByID = make(map[string]*StoredEndpoint)          // one to one
	l.ListsByHash = make(map[string][]*StoredEndpoint) // one to many
	l.SeenHistory = make([][]*StoredEndpoint, OLDEST+1)
	return l
}
// TODO (2026-07-01): reject if readkey is not correct
// TODO (2026-07-01): implement federation (server GET/PUT/QUERY)

// AddClient registers endpointID as a TYPE_CLIENT endpoint under topichash.
// Both arguments are used as-is as string keys.
func (l *Lookup) AddClient(topichash, endpointID []byte) {
	// NOTE: topic hash is already hexadecimal
	topic, id := string(topichash), string(endpointID)
	l.add(topic, id, TYPE_CLIENT)
}
// AddServer registers endpointID as a TYPE_SERVER endpoint. Servers are filed
// under the hex form of util.TwiceHashedKey(synckey) in place of a topic hash.
// TODO (2026-07-01): pass in server hash instead of using global
func (l *Lookup) AddServer(endpointID string) {
	serverTopic := fmt.Sprintf("%x", util.TwiceHashedKey(synckey))
	l.add(serverTopic, endpointID, TYPE_SERVER)
}
// add registers endpointID under topichash with the given endpoint type.
//
// If the endpoint is already known it is only bumped (see Bump); its topic and
// type are left untouched. For this first version one endpoint maps to exactly
// one topic.
//
// The existence check and the insert happen under a single write lock, so two
// concurrent calls for the same endpointID cannot both insert it.
func (l *Lookup) add(topichash, endpointID string, endpointType int) {
	l.mut.Lock()
	if _, exists := l.ByID[endpointID]; exists {
		// Bump acquires the lock itself, so it has to be released first. If the
		// endpoint is pruned in that short window Bump is a no-op and the peer
		// is registered again on its next PUT.
		l.mut.Unlock()
		fmt.Printf("endpoint %s already in lookup. bumping up their seen state\n", endpointID)
		l.Bump(endpointID)
		return
	}
	defer l.mut.Unlock()

	// NOTE(review): LastSeen is deliberately kept 25h in the past, exactly as
	// before. It looks like either a testing leftover or a fudge so that the
	// daily Prune run treats fresh entries as eligible - confirm, and switch to
	// time.Now() if that is the intent.
	ep := &StoredEndpoint{
		ID:        endpointID,
		TopicHash: topichash,
		LastSeen:  time.Now().Add(-25 * time.Hour),
		Type:      endpointType,
	}
	l.ByID[endpointID] = ep
	l.ListsByHash[topichash] = append(l.ListsByHash[topichash], ep)
	l.SeenHistory[NEWEST] = append(l.SeenHistory[NEWEST], ep)
}
// copyAllExcept returns a new slice holding every element of slice whose ID
// differs from e.ID, in the original order. The input slice is not modified.
// An empty (or nil) input is returned unchanged. If no element matches, the
// result is a copy of the whole input.
func copyAllExcept(slice []*StoredEndpoint, e *StoredEndpoint) []*StoredEndpoint {
	if len(slice) == 0 {
		return slice
	}
	// Append into a fresh slice instead of writing by source index: the write
	// position must lag behind the read position once an element is skipped.
	kept := make([]*StoredEndpoint, 0, len(slice))
	for _, ep := range slice {
		if ep.ID == e.ID {
			continue
		}
		kept = append(kept, ep)
	}
	return kept
}
// removeFromSeenHistory drops ep (matched by ID) from every SeenHistory bucket
// that contains it. It does nothing if ep is in no bucket.
//
// All buckets are searched instead of deriving one from ep.LastSeen: Prune
// moves endpoints between buckets without updating LastSeen, so an endpoint's
// age does not reliably identify the bucket it currently sits in.
//
// CONCURRENCY NOTE (2026-07-01): this method takes no lock itself; callers are
// expected to hold l.mut for writing. Locking here would deadlock with them.
func (l *Lookup) removeFromSeenHistory(ep *StoredEndpoint) {
	for day, bucket := range l.SeenHistory {
		present := false
		for _, other := range bucket {
			if other.ID == ep.ID {
				present = true
				break
			}
		}
		if !present {
			// leave untouched buckets alone so no needless copies are made
			continue
		}
		kept := make([]*StoredEndpoint, 0, len(bucket)-1)
		for _, other := range bucket {
			if other.ID != ep.ID {
				kept = append(kept, other)
			}
		}
		l.SeenHistory[day] = kept
	}
}
// Bump marks endpointID as seen right now. Unknown IDs are ignored.
//
// Steps:
// -> look the endpoint up in ByID
// -> take it out of whichever SeenHistory bucket it currently sits in
// -> set LastSeen = time.Now()
// -> append it to SeenHistory[NEWEST]
//
// All steps run under one write lock. Releasing the lock between them would
// let Prune forget the endpoint (or another Bump move it) half way through,
// and writing LastSeen outside the lock races with Prune reading it.
func (l *Lookup) Bump(endpointID string) {
	l.mut.Lock()
	defer l.mut.Unlock()

	ep, exists := l.ByID[endpointID]
	if !exists {
		return
	}
	// Must run before LastSeen changes: removeFromSeenHistory may use the old
	// timestamp to locate the endpoint's bucket.
	l.removeFromSeenHistory(ep)
	ep.LastSeen = time.Now()
	l.SeenHistory[NEWEST] = append(l.SeenHistory[NEWEST], ep)
}
// GetClientEndpoints returns the IDs of all TYPE_CLIENT endpoints registered
// under topic. It is GetEndpoints with the topic bytes used as a string key.
func (l *Lookup) GetClientEndpoints(topic []byte) []string {
	key := string(topic)
	return l.GetEndpoints(key, TYPE_CLIENT)
}
// GetServerEndpoints returns the IDs of all TYPE_SERVER endpoints registered
// under topic. It is GetEndpoints with the topic bytes used as a string key.
func (l *Lookup) GetServerEndpoints(topic []byte) []string {
	key := string(topic)
	return l.GetEndpoints(key, TYPE_SERVER)
}
// GetEndpoints returns the IDs of the endpoints listed under topic whose Type
// equals peerType, in list order. The result is nil when nothing matches.
// The list is read under the read lock.
func (l *Lookup) GetEndpoints(topic string, peerType int) []string {
	l.mut.RLock()
	defer l.mut.RUnlock()

	var ids []string
	for _, candidate := range l.ListsByHash[topic] {
		if candidate.Type != peerType {
			continue
		}
		ids = append(ids, candidate.ID)
	}
	return ids
}
// forget stops tracking endpointID: it is taken out of its topic list (the
// topic entry is deleted once its list is empty) and out of ByID. When
// clearSeenHistory is true it is also taken out of SeenHistory. Unknown IDs
// are ignored.
//
// CONCURRENCY NOTE (2026-07-01): forget takes no lock itself; every caller is
// expected to hold l.mut for writing. Locking here would deadlock with them.
func (l *Lookup) forget(endpointID string, clearSeenHistory bool) {
	ep, known := l.ByID[endpointID]
	if !known {
		return
	}

	// topic-hash related cleanup
	if peers, tracked := l.ListsByHash[ep.TopicHash]; tracked {
		remaining := copyAllExcept(peers, ep)
		if len(remaining) == 0 {
			// the topic no longer tracks any endpoint, so drop the topic too
			delete(l.ListsByHash, ep.TopicHash)
		} else {
			l.ListsByHash[ep.TopicHash] = remaining
		}
	}

	if clearSeenHistory {
		l.removeFromSeenHistory(ep)
	}
	delete(l.ByID, endpointID)
}
// Prune forgets every endpoint in the OLDEST bucket and then moves all
// remaining buckets one step towards OLDEST, leaving NEWEST empty.
//
// It only runs when the history looks stale: the first entry of the first
// non-empty bucket i (NEWEST..OLDEST inclusive) must have a LastSeen more than
// i+1 whole days in the past. Returns true if the prune ran, false if it
// exited early.
//
// The eligibility check and the mutation share one write lock so that no
// add/Bump can slip in between the two.
func (l *Lookup) Prune() bool {
	fmt.Println("run prune")

	l.mut.Lock()
	defer l.mut.Unlock()

	// Figure out if now is a good time to prune by finding the first bucket
	// with entries. OLDEST is included: if it is the only non-empty bucket its
	// endpoints must still be able to expire.
	eligible := false
	for i := NEWEST; i <= OLDEST; i++ {
		if len(l.SeenHistory[i]) == 0 {
			continue
		}
		age := time.Since(l.SeenHistory[i][0].LastSeen)
		// +1 bc buckets start at 0 and we're talking about whole days
		eligible = age > time.Hour*24*time.Duration(i+1)
		break
	}
	if !eligible {
		return false
	}

	// Forget all endpoints in the oldest bucket. clearSeenHistory is false
	// because the bucket is overwritten by the shift below (and must not be
	// modified while it is being ranged over).
	for _, ep := range l.SeenHistory[OLDEST] {
		l.forget(ep.ID, false)
	}

	// Mark all other endpoints as being one step older.
	// TODO (2026-06-28): could make this 'more accurate' by looking at each entry's LastSeen and figuring out if
	// it should be moved or not, and by how many steps (e.g. if prune is not being run at least once every day)
	for i := OLDEST; i > NEWEST; i-- {
		l.SeenHistory[i] = l.SeenHistory[i-1]
	}
	l.SeenHistory[NEWEST] = nil
	return true
}
/* storage & structs
to be able to also store server endpoints, the 'topic hash' used is simply sha256hash(serverSyncKey)
map[EndpointID] -> *StoredEndpoint
map[topichash] -> []*StoredEndpoint
buckets of time to figure out which endpointIDs to yeet? 15 days 'retention'?
seenHistory := [][]string
seenHistory[0-14; 14 is oldest] -> []{endpointID string}
seenHistory[0]
below i brainstorm functions to further elucidate what type of structs would help
// TODO (2026-06-27): continue impl with func bump
bump(id)
-> get endpoint by id from map[endpointId]
-> get old endpoint seen time & quantize to #days from now
-> update endpoint.LastSeen = time.Now()
-> seenHistory[oldSeenTime].forEach((i, e) => if(e == endpoint) removeFromTimeSlice(i); break)
-> update seenHistory[0] = append(seenHistory[0], endpointID)
prune() - clears out oldest, and moves all endpoints up a step
-> if seenHistory[14].getFirst().time.quantize < time.now() + 14 -> return early, dont continue
-> seenHistory[14].forEach(endpointid => forget(endpointid))
-> copy back one step: seenHistory[14] = seenHistory[13] ... seenHistory[0] = new seenHistory
forget(id)
-> get endpoint by id from map[endpointId]
-> remove: map[topichash].forEach(ep => if (ep === id) remove from map; break)
-> remove: from map[endpointId]
add(topichash, id)
-> check if already exists if _, exists := map[endpointId]; exists { return early }
-> ep := StoredEndpoint{ID: id, LastSeen: time.Now()}
-> add: map[endpointId] = &ep
-> add: seenHistory[0] = append(seenHistory[0], endpointId)
-> add: map[topichash] = append(topichash, id)
getEndpoints(topichash)
-> if endpoints, exists := map[topichash]; exists { return endpoints } return empty
*/
/* SIMPLE PROTOCOL
HANDSHAKE:
hello i am another server: 1,sha256hash(serverSyncKey)
V1/SERVER/HELLO/<HASH>
-> disconnect if hash incorrect
-> HELLO if ok
hello i am a client: 2,sha256hash(sha256hash(serverReadKey))
V1/CLIENT/HELLO/<HASH>
-> disconnect if hash incorrect
-> HELLO if ok
CLIENT COMMS:
PUT: sha256hash(topic),endpointID
V1/CLIENT/PUT/<TOPIC-HASH>
? in iroh can you get the endpoint id of an incoming connection?
! yes! connection.RemoteId() -> don't have to trust on the put endpoint
GET/CLIENT: sha256hash(topic)
V1/CLIENT/GET/<TOPIC-HASH>
-> list of endpointIDs
-> 0
V1/CLIENT/GET/<HASH>
SERVER COMMS:
PUT: sha256hash(topic), list of endpointID
V1/SERVER/PUT/<TOPIC-HASH>/ENDPOINT,ENDPOINT...ENDPOINT
GET/SERVER: sha256hash(topic) - can be used to query outwards if client.get would otherwise return 0
V1/SERVER/GET/<TOPIC-HASH>
-> list of endpointID
-> 0
QUERY-SERVERS: - get list of endpoints that map to tracker servers
V1/QUERY-SERVERS
-> list of server endpointID (excl or incl the answering server?)
-> 0
PING/SERVER - send a ping to a server endpoint to check if it responds (?)
V1/SERVER/PING
*/
// Package-level configuration.
// NOTE(review): synckey and readkey are hard-coded in source and printed by
// main at startup; presumably placeholders until real configuration exists - confirm.
var (
// alpn is the ALPN value advertised by the endpoint bound in main.
alpn = util.GetALPN(proto.VERSION)
// synckey is the plain-text server key; its twice-hashed hex form is what
// Host.synckey holds and what AddServer uses as the server "topic".
synckey string = "server-hello-there-iroh-tracker" // aka federation-key
// readkey is the plain-text client key; its twice-hashed hex form is what
// handleClientHello compares incoming hashes against.
readkey string = "client-hello-there-iroh-tracker"
)
// Host bundles the endpoint lookup with the key hashes that incoming
// requests are checked against.
type Host struct {
// look is the endpoint index that PUT requests write to and GET requests read from.
look *Lookup
// note: {read,sync}key are the twice sha256 hashed representations of the corresponding key
// (main stores them hex-encoded, matching what arrives in requests)
readkey []byte
synckey []byte
}
// PRUNE_INTERVAL is the pause between two runs of the function given to startTimer.
const PRUNE_INTERVAL = time.Hour * 24

// startTimer returns immediately and then, in the background, runs fn every
// PRUNE_INTERVAL for the lifetime of the process. The first run happens one
// full interval after the call; each following interval starts counting once
// fn has returned. fn's result is ignored.
func startTimer(fn func() bool) {
	go func() {
		for {
			time.Sleep(PRUNE_INTERVAL)
			fn()
		}
	}()
}
// TODO (2026-06-27): config relays and use own relay set

// main prints the configured keys and their twice-hashed forms, builds the
// Host, schedules the periodic prune, binds the iroh endpoint, starts the
// accept loop in the background and then blocks in util.AwaitInterrupt.
func main() {
	fmt.Printf("secret sync key: %s\n", synckey)
	fmt.Printf("secret read key: %s\n", readkey)
	fmt.Printf("hash(hash(sync))=%x\n", util.TwiceHashedKey(synckey))
	fmt.Printf("hash(hash(read))=%x\n", util.TwiceHashedKey(readkey))

	lookup := NewLookup()
	fmt.Println(lookup)

	// keep {sync,read}key byte slices as hexadecimal encoded bc that's what we are getting in from requests
	syncHex := fmt.Sprintf("%x", util.TwiceHashedKey(synckey))
	readHex := fmt.Sprintf("%x", util.TwiceHashedKey(readkey))
	h := Host{
		look:    lookup,
		synckey: []byte(syncHex),
		readkey: []byte(readHex),
	}

	// start timer and run prune once a day
	startTimer(h.look.Prune)

	// construct same endpoint over and over by reusing secret key;
	// the []byte view of the [32]byte digest is what the options take
	digest := sha256.Sum256([]byte("my-secret-key-two"))
	secret := digest[:]
	preset := iroh.PresetN0()
	opts := iroh.EndpointOptions{
		Preset:    &preset,
		Alpns:     &[][]byte{alpn},
		SecretKey: &secret,
	}
	endpoint, err := iroh.EndpointBind(opts)
	util.Check(err)
	endpoint.Online()

	fmt.Println("this endpoint is online and listening! use this id to connect via a peer:")
	fmt.Printf("tracker\n -endpoint\n%s\n", endpoint.Id())

	go h.listen(endpoint)
	util.AwaitInterrupt()
}
// Request prefixes for client messages, versioned with proto.VERSION.
// Host.process strips the matching prefix and hands the remainder to a handler.
var (
	CLIENT_HELLO = fmt.Appendf(nil, "V%d/CLIENT/HELLO/", proto.VERSION)
	CLIENT_PUT   = fmt.Appendf(nil, "V%d/CLIENT/PUT/", proto.VERSION)
	CLIENT_GET   = fmt.Appendf(nil, "V%d/CLIENT/GET/", proto.VERSION)
)
// listen loops forever: it takes the next incoming connection from e and
// hands it to handleIncoming on a new goroutine.
func (h *Host) listen(e *iroh.Endpoint) {
	for {
		// the argument is evaluated here, in the loop, before the goroutine starts
		go h.handleIncoming(*e.AcceptNext())
	}
}
// handleIncoming serves one incoming connection: it accepts the connection
// and one bidirectional stream, starts a reader and a writer goroutine for that
// stream, waits until done is closed and then closes the connection.
//
// NOTE(review): every error here goes through util.Check; if that helper
// panics or exits (not visible from this file), a single failing peer would
// take the whole tracker down - confirm.
// NOTE(review): done is closed by Host.process on a wrong hello hash and is
// also handed to util.HandleReading/util.SendResponses; if any of those close
// it as well, a second close would panic - confirm who owns the close.
func (h *Host) handleIncoming(incoming *iroh.Incoming) {
accepting, err := incoming.Accept()
util.Check(err)
conn, err := accepting.Connect()
util.Check(err)
fmt.Println("new connection", conn.RemoteId())
fmt.Println()
stream, err := conn.AcceptBi()
util.Check(err)
// done signals that the connection should be torn down (see the receive at the bottom)
done := make(chan struct{})
// closure that has access to conn.RemoteId and h *Host
processClosure := func (incoming []byte) ([]byte, bool) {
// NOTE (2026-07-01): in the future endpoints might contain ticket information? figure out if 1) we can
// determine from an incoming connection and 2) how to represent that properly
// want remote ID as a hex-encoded string represented as a byte slice
// (we use endpoints as string keys elsewhere for map-reasons)
return h.process([]byte(conn.RemoteId().String()), incoming, done)
}
// responseCh is handed to both goroutines below; presumably it carries the
// processed responses from the reader to the writer - verify in util
responseCh := make(chan []byte)
go util.HandleReading(stream.Recv(), processClosure, responseCh, done)
go util.SendResponses(stream.Send(), responseCh, done)
// block until done is closed, then say goodbye
<-done
conn.Close(1, []byte("BYE:0"))
}
// process dispatches one raw request from remoteID on its protocol prefix.
// It returns (response, true) when a response should be sent and (nil, false)
// when nothing should be sent. A CLIENT/HELLO carrying the wrong hash closes
// done; requests with an unknown prefix are only logged.
func (h *Host) process(remoteID, incoming []byte, done chan struct{}) ([]byte, bool) {
	fmt.Println("raw data", string(incoming))
	fmt.Printf("from remote id %s\n", remoteID)
	// TODO (2026-06-28): if peerType == TYPE_CLIENT -> pass to a handler that only handles CLIENT/
	// TODO (2026-06-30): decide actual responses {GOOD/BAD}

	if hash, isHello := bytes.CutPrefix(incoming, CLIENT_HELLO); isHello {
		resp, correctHash := h.handleClientHello(hash)
		if correctHash {
			return resp, true
		}
		fmt.Println("not correct hash")
		close(done)
		return nil, false
	}
	if topic, isPut := bytes.CutPrefix(incoming, CLIENT_PUT); isPut {
		return h.handleClientPut(topic, remoteID), true
	}
	if topic, isGet := bytes.CutPrefix(incoming, CLIENT_GET); isGet {
		return h.handleClientGet(topic), true
	}

	// unrecognised request: log it and stay silent
	fmt.Println(string(incoming))
	fmt.Println()
	return nil, false
}
// handleClientHello compares readhash with h.readkey. On a match it returns
// the welcome response and true; otherwise nil and false, which the caller
// uses to close the connection. Both values are printed for debugging.
func (h *Host) handleClientHello(readhash []byte) ([]byte, bool) {
	fmt.Printf("1: %s\n2: %s\n", h.readkey, readhash)
	if !bytes.Equal(h.readkey, readhash) {
		// close connection
		return nil, false
	}
	return []byte("HELLO:welcome"), true
}
// handleClientPut registers remoteID as a client endpoint for topichash in the
// lookup and answers "PUT:OK".
func (h *Host) handleClientPut(topichash, remoteID []byte) []byte {
	h.look.AddClient(topichash, remoteID)
	ack := []byte("PUT:OK")
	return ack
}
// handleClientGet answers with the client endpoints known for topichash:
// "GET:" followed by the comma-separated endpoint IDs, or "GET:0" when there
// are none.
func (h *Host) handleClientGet(topichash []byte) []byte {
	ids := h.look.GetClientEndpoints(topichash)
	if len(ids) == 0 {
		return []byte("GET:0")
	}
	return []byte("GET:" + strings.Join(ids, ","))
}