479 lines
15 KiB
Go
479 lines
15 KiB
Go
package main
|
|
|
|
import (
|
|
iroh "git.coopcloud.tech/decentral1se/iroh-go"
|
|
"fmt"
|
|
"bytes"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
"crypto/sha256"
|
|
|
|
"gomod.cblgh.org/iroh-tracker/util"
|
|
"gomod.cblgh.org/iroh-tracker/proto"
|
|
// "gomod.cblgh.org/cerca/util/eout"
|
|
)
|
|
|
|
const (
|
|
TYPE_CLIENT = iota
|
|
TYPE_SERVER
|
|
)
|
|
|
|
const (
|
|
OLDEST = 14
|
|
NEWEST = 0
|
|
)
|
|
|
|
type StoredEndpoint struct {
|
|
ID string
|
|
TopicHash string
|
|
LastSeen time.Time
|
|
Type int // client or server
|
|
}
|
|
|
|
func (ep *StoredEndpoint) String () string {
|
|
maxLen := len(ep.ID)
|
|
// truncate
|
|
if maxLen > 16 { maxLen = 16 }
|
|
epType := "srv"
|
|
if ep.Type == TYPE_CLIENT {
|
|
epType = "cli"
|
|
}
|
|
return fmt.Sprintf("%s/%s", epType, ep.ID[0:maxLen])
|
|
}
|
|
|
|
// TODO (2026-07-01): add mutex.RWMutex mut and/or sync.Map
|
|
type Lookup struct {
|
|
mut sync.RWMutex
|
|
ByID map[string]*StoredEndpoint
|
|
ListsByHash map[string][]*StoredEndpoint
|
|
SeenHistory [][]*StoredEndpoint
|
|
}
|
|
|
|
func NewLookup () *Lookup{
|
|
return &Lookup{
|
|
ByID: make(map[string]*StoredEndpoint), // one to one
|
|
ListsByHash: make(map[string][]*StoredEndpoint), // one to many
|
|
SeenHistory: make([][]*StoredEndpoint, 15), // one to many
|
|
}
|
|
}
|
|
|
|
// TODO (2026-07-01): reject if readkey is not correct
|
|
|
|
// TODO (2026-07-01): implement federation (server GET/PUT/QUERY)
|
|
|
|
// add(topichash, id)
|
|
func (l *Lookup) AddClient(topichash, endpointID []byte) {
|
|
// NOTE: topic hash is already hexadecimal
|
|
l.add(string(topichash), string(endpointID), TYPE_CLIENT)
|
|
}
|
|
|
|
// TODO (2026-07-01): pass in server hash instead of using global
|
|
func (l *Lookup) AddServer(endpointID string) {
|
|
l.add(fmt.Sprintf("%x", util.TwiceHashedKey(synckey)), string(endpointID), TYPE_SERVER)
|
|
}
|
|
|
|
func (l *Lookup) add(topichash, endpointID string, endpointType int) {
|
|
// for first version, ignore that a particular endpoint could be registered to many topics
|
|
// one endpoint = one topic
|
|
l.mut.RLock()
|
|
_, exists := l.ByID[endpointID]
|
|
l.mut.RUnlock()
|
|
if exists {
|
|
fmt.Printf("endpoint %s already in lookup. bumping up their seen state\n", endpointID)
|
|
l.Bump(endpointID)
|
|
return
|
|
}
|
|
ep := &StoredEndpoint{ID: endpointID, TopicHash: topichash, LastSeen: time.Now().Add(-1 * time.Hour * 25), Type: endpointType}
|
|
l.mut.Lock()
|
|
l.ByID[endpointID] = ep
|
|
l.ListsByHash[topichash] = append(l.ListsByHash[topichash], ep)
|
|
l.SeenHistory[NEWEST] = append(l.SeenHistory[NEWEST], ep)
|
|
l.mut.Unlock()
|
|
}
|
|
|
|
// remove all but one element from the slice. we do this by just creating a new slice and omitting the deleted element
|
|
func copyAllExcept(slice []*StoredEndpoint, e *StoredEndpoint) []*StoredEndpoint {
|
|
// aint shit to do, buddy
|
|
if len(slice) == 0 { return slice }
|
|
newSlice := make([]*StoredEndpoint, len(slice) - 1)
|
|
for i := 0; i < len(slice); i++ {
|
|
if slice[i].ID == e.ID {
|
|
continue
|
|
}
|
|
newSlice[i] = slice[i]
|
|
}
|
|
return newSlice
|
|
}
|
|
|
|
// utility methd
|
|
func (l *Lookup) removeFromSeenHistory(ep *StoredEndpoint) {
|
|
// CONCURRENCY NOTE (2026-07-01): lookup.Forget does not have any locks because all callees are expected to call
|
|
// lock/unlock before calling forget. this is to prevent a deadlock situation that would otherwise occur.
|
|
|
|
diff := time.Now().Sub(ep.LastSeen)
|
|
daysSinceSeen := int(diff / (time.Hour * 24))
|
|
if daysSinceSeen > OLDEST {
|
|
panic(fmt.Errorf("lookup.removeFromSeenHistory: unhandled case, daysSinceSeen was %d", daysSinceSeen))
|
|
}
|
|
// TODO (2026-06-28): handle special case where LastSeen is > 14 days ago
|
|
// -> should search through all of SeenHistory to find it
|
|
l.SeenHistory[daysSinceSeen] = copyAllExcept(l.SeenHistory[daysSinceSeen], ep)
|
|
}
|
|
|
|
// bump(id)
|
|
// -> get endpoint by id from map[endpointId]
|
|
// -> get old endpoint seen time & quantize to #days from now
|
|
// -> update endpoint.LastSeen = time.Now()
|
|
// -> seenHistory[oldSeenTime].forEach((i, e) => if(e == endpoint) removeFromTimeSlice(i); break)
|
|
// -> update seenHistory[0] = append(seenHistory[0], endpointID)
|
|
func (l *Lookup) Bump(endpointID string) {
|
|
l.mut.RLock()
|
|
ep, exists := l.ByID[endpointID]
|
|
l.mut.RUnlock()
|
|
if !exists { return }
|
|
l.mut.Lock()
|
|
l.removeFromSeenHistory(ep)
|
|
l.mut.Unlock()
|
|
// update last seen
|
|
ep.LastSeen = time.Now()
|
|
l.mut.Lock()
|
|
l.SeenHistory[NEWEST] = append(l.SeenHistory[NEWEST], ep)
|
|
l.mut.Unlock()
|
|
}
|
|
|
|
// returns a string slice of hex-encoded endpoint IDs
|
|
func (l *Lookup) GetClientEndpoints(topic []byte) []string {
|
|
return l.GetEndpoints(string(topic), TYPE_CLIENT)
|
|
}
|
|
|
|
func (l *Lookup) GetServerEndpoints(topic []byte) []string {
|
|
return l.GetEndpoints(string(topic), TYPE_SERVER)
|
|
}
|
|
|
|
// returns a string slice of hex-encoded endpoint IDs
|
|
func (l *Lookup) GetEndpoints(topic string, peerType int) []string {
|
|
var endpoints []string
|
|
l.mut.RLock()
|
|
for _, ep := range l.ListsByHash[topic] {
|
|
if ep.Type == peerType {
|
|
endpoints = append(endpoints, ep.ID)
|
|
}
|
|
}
|
|
l.mut.RUnlock()
|
|
return endpoints
|
|
}
|
|
|
|
// forget - remove tracking an endpoint, optionally clearing lookup.SeenHistory tracking that endpoint
|
|
// NOTE (2026-06-27): will probably need a mutex for all operations that alter slices + maps (or use RWMap)
|
|
func (l *Lookup) forget(endpointID string, clearSeenHistory bool) {
|
|
// CONCURRENCY NOTE (2026-07-01): lookup.Forget does not have any locks because all callees are expected to call
|
|
// lock/unlock before calling forget. this is to prevent a deadlock situation that would otherwise occur.
|
|
|
|
// initial sketch
|
|
// -> get endpoint by id from map[endpointId]
|
|
// -> remove: map[topichash].forEach(ep => if (ep === id) remove from map; break)
|
|
// -> remove: from map[endpointId]
|
|
ep, exists := l.ByID[endpointID]
|
|
if !exists { return }
|
|
|
|
// topic-hash related cleanup
|
|
_, exists = l.ListsByHash[ep.TopicHash]
|
|
|
|
if exists {
|
|
l.ListsByHash[ep.TopicHash] = copyAllExcept(l.ListsByHash[ep.TopicHash], ep)
|
|
// if after removing, topic no longer tracks any endpoints then remove the topic
|
|
if len(l.ListsByHash[ep.TopicHash]) == 0 {
|
|
delete(l.ListsByHash, ep.TopicHash)
|
|
}
|
|
}
|
|
|
|
if clearSeenHistory {
|
|
l.removeFromSeenHistory(ep)
|
|
}
|
|
delete(l.ByID, endpointID)
|
|
}
|
|
|
|
// prune() - clears out oldest, and moves all endpoints up a step
|
|
// returns true if prune ran, false if exited early
|
|
func (l *Lookup) Prune() bool {
|
|
// pseudocode i originally sketched this out with
|
|
// -> if seenHistory[14].getFirst().time.quantize < time.now() + 14 -> return early, dont continue
|
|
// -> seenHistory[14].forEach(endpointid => forget(endpointid))
|
|
// -> copy back one step: seenHistory[14] = seenHistory[13] ... seenHistory[0] = new seenHistory
|
|
|
|
var pruneEligible bool
|
|
fmt.Println("run prune")
|
|
// figure out if now is a good time to prune by finding the first time bucket with entries
|
|
l.mut.Lock()
|
|
for i := range OLDEST {
|
|
if len(l.SeenHistory[i]) > 0 {
|
|
tEndpoint := l.SeenHistory[i][0].LastSeen
|
|
// look at first entry to check if history is stale & needs pruning
|
|
diff := time.Now().Sub(tEndpoint)
|
|
pruneEligible = diff > time.Hour * 24 * time.Duration(i+1) // +1 bc slice/iteration starts at 0 and we're talking about whole days
|
|
break
|
|
}
|
|
}
|
|
l.mut.Unlock()
|
|
|
|
if !pruneEligible {
|
|
return false
|
|
}
|
|
|
|
l.mut.Lock()
|
|
// forget all endpoints in oldest bucket
|
|
for _, ep := range l.SeenHistory[OLDEST] {
|
|
l.forget(ep.ID, false)
|
|
}
|
|
// clear oldest
|
|
l.SeenHistory[OLDEST] = []*StoredEndpoint{}
|
|
|
|
// mark all other endpoints as being one step older
|
|
for i := OLDEST - 1; i >= NEWEST; i-- {
|
|
// TODO (2026-06-28): could make this 'more accurate' by looking at each entry's LastSeen and figuring out if they
|
|
// should be moved or not
|
|
// TODO (2026-06-28): and also how many steps they should be moved! e.g. if prune is not being run at least once every day
|
|
if len(l.SeenHistory[i]) > 0 {
|
|
l.SeenHistory[i+1] = append(l.SeenHistory[i+1], l.SeenHistory[i]...)
|
|
}
|
|
// clear current
|
|
l.SeenHistory[i] = []*StoredEndpoint{}
|
|
}
|
|
// clear newest
|
|
l.SeenHistory[NEWEST] = []*StoredEndpoint{}
|
|
l.mut.Unlock()
|
|
return true
|
|
}
|
|
|
|
/* storage & structs
|
|
to be able to also store server endpoints, the 'topic hash' used is simply sha256hash(serverSyncKey)
|
|
|
|
map[EndpointID] -> *StoredEndpoint
|
|
|
|
map[topichash] -> []*StoredEndpoint
|
|
|
|
buckets of time to figure out which endpointIDs to yeet? 15 days 'retention'?
|
|
|
|
seenHistory := [][]string
|
|
seenHistory[0-14; 14 is oldest] -> []{endpointID string}
|
|
seenHistory[0]
|
|
|
|
below i brainstorm functions to further elucidate what type of structs would help
|
|
|
|
// TODO (2026-06-27): continue impl with func bump
|
|
bump(id)
|
|
-> get endpoint by id from map[endpointId]
|
|
-> get old endpoint seen time & quantize to #days from now
|
|
-> update endpoint.LastSeen = time.Now()
|
|
-> seenHistory[oldSeenTime].forEach((i, e) => if(e == endpoint) removeFromTimeSlice(i); break)
|
|
-> update seenHistory[0] = append(seenHistory[0], endpointID)
|
|
|
|
prune() - clears out oldest, and moves all endpoints up a step
|
|
-> if seenHistory[14].getFirst().time.quantize < time.now() + 14 -> return early, dont continue
|
|
-> seenHistory[14].forEach(endpointid => forget(endpointid))
|
|
-> copy back one step: seenHistory[14] = seenHistory[13] ... seenHistory[0] = new seenHistory
|
|
|
|
forget(id)
|
|
-> get endpoint by id from map[endpointId]
|
|
-> remove: map[topichash].forEach(ep => if (ep === id) remove from map; break)
|
|
-> remove: from map[endpointId]
|
|
|
|
add(topichash, id)
|
|
-> check if already exists if _, exists := map[endpointId]; exists { return early }
|
|
-> ep := StoredEndpoint{ID: id, LastSeen: time.Now()}
|
|
-> add: map[endpointId] = &ep
|
|
-> add: seenHistory[0] = append(seenHistory[0], endpointId)
|
|
-> add: map[topichash] = append(topichash, id)
|
|
|
|
getEndpoints(topichash)
|
|
-> if endpoints, exists := map[topichash]; exists { return endpoints } return empty
|
|
|
|
*/
|
|
|
|
/* SIMPLE PROTOCOL
|
|
|
|
HANDSHAKE:
|
|
hello i am another server: 1,sha256hash(serverSyncKey)
|
|
V1/SERVER/HELLO/<HASH>
|
|
-> disconnect if hash incorrect
|
|
-> HELLO if ok
|
|
hello i am a client: 2,sha256hash(sha256hash(serverReadKey))
|
|
V1/CLIENT/HELLO/<HASH>
|
|
-> disconnect if hash incorrect
|
|
-> HELLO if ok
|
|
|
|
CLIENT COMMS:
|
|
PUT: sha256hash(topic),endpointID
|
|
V1/CLIENT/PUT/<TOPIC-HASH>
|
|
? in iroh can you get the endpoint id of an incoming connection?
|
|
! yes! connection.RemoteId() -> don't have to trust on the put endpoint
|
|
GET/CLIENT: sha256hash(topic)
|
|
V1/CLIENT/GET/<TOPIC-HASH>
|
|
-> list of endpointIDs
|
|
-> 0
|
|
V1/CLIENT/GET/<HASH>
|
|
|
|
SERVER COMMS:
|
|
PUT: sha256hash(topic), list of endpointID
|
|
V1/SERVER/PUT/<TOPIC-HASH>/ENDPOINT,ENDPOINT...ENDPOINT
|
|
GET/SERVER: sha256hash(topic) - can be used to query outwards if client.get would otherwise return 0
|
|
V1/SERVER/GET/<TOPIC-HASH>
|
|
-> list of endpointID
|
|
-> 0
|
|
QUERY-SERVERS: - get list of endpoints that map to tracker servers
|
|
V1/QUERY-SERVERS
|
|
-> list of server endpointID (excl or incl the answering server?)
|
|
-> 0
|
|
PING/SERVER - send a ping to a server endpoint to check if it responds (?)
|
|
V1/SERVER/PING
|
|
*/
|
|
|
|
var (
|
|
alpn = util.GetALPN(proto.VERSION)
|
|
synckey string = "server-hello-there-iroh-tracker" // aka federation-key
|
|
readkey string = "client-hello-there-iroh-tracker"
|
|
)
|
|
|
|
type Host struct {
|
|
look *Lookup
|
|
// note: {read,sync}key are the twice sha256 hashed representations of the corresponding key
|
|
readkey []byte
|
|
synckey []byte
|
|
}
|
|
|
|
const PRUNE_INTERVAL = time.Hour * 24
|
|
|
|
func startTimer(fn func () bool) {
|
|
var timer *time.Timer
|
|
timer = time.AfterFunc(PRUNE_INTERVAL, func () {
|
|
fn()
|
|
// reschedule to run again
|
|
timer.Reset(PRUNE_INTERVAL)
|
|
})
|
|
}
|
|
|
|
// TODO (2026-06-27): config relays and use own relay set
|
|
func main () {
|
|
fmt.Printf("secret sync key: %s\n", synckey)
|
|
fmt.Printf("secret read key: %s\n", readkey)
|
|
fmt.Printf("hash(hash(sync))=%x\n", util.TwiceHashedKey(synckey))
|
|
fmt.Printf("hash(hash(read))=%x\n", util.TwiceHashedKey(readkey))
|
|
lookup := NewLookup()
|
|
fmt.Println(lookup)
|
|
// keep {sync,read}key byte slices as hexadecimal encoded bc that's what we are getting in from requests
|
|
h := Host{look: lookup, synckey: []byte(fmt.Sprintf("%x", util.TwiceHashedKey(synckey))), readkey: []byte(fmt.Sprintf("%x", util.TwiceHashedKey(readkey)))}
|
|
// start timer and run prune once a day
|
|
startTimer(h.look.Prune)
|
|
|
|
// construct same endpoint over and over by reusing secret key
|
|
sum32 := sha256.Sum256([]byte("my-secret-key-two"))
|
|
// get []byte from a [32]byte
|
|
sum := sum32[:]
|
|
|
|
preset := iroh.PresetN0()
|
|
|
|
opts := iroh.EndpointOptions{
|
|
Preset: &preset,
|
|
Alpns: &[][]byte{alpn},
|
|
SecretKey: &sum,
|
|
}
|
|
|
|
endpoint, err := iroh.EndpointBind(opts)
|
|
util.Check(err)
|
|
endpoint.Online()
|
|
|
|
fmt.Println("this endpoint is online and listening! use this id to connect via a peer:")
|
|
fmt.Printf("tracker\n -endpoint\n%s\n", endpoint.Id())
|
|
go func() { h.listen(endpoint) }()
|
|
util.AwaitInterrupt()
|
|
}
|
|
|
|
var CLIENT_HELLO = []byte(fmt.Sprintf("V%d/CLIENT/HELLO/", proto.VERSION))
|
|
var CLIENT_PUT = []byte(fmt.Sprintf("V%d/CLIENT/PUT/", proto.VERSION))
|
|
var CLIENT_GET = []byte(fmt.Sprintf("V%d/CLIENT/GET/", proto.VERSION))
|
|
|
|
func (h *Host) listen(e *iroh.Endpoint) {
|
|
for {
|
|
incoming := *e.AcceptNext()
|
|
go h.handleIncoming(incoming)
|
|
}
|
|
}
|
|
|
|
func (h *Host) handleIncoming(incoming *iroh.Incoming) {
|
|
accepting, err := incoming.Accept()
|
|
util.Check(err)
|
|
|
|
conn, err := accepting.Connect()
|
|
util.Check(err)
|
|
fmt.Println("new connection", conn.RemoteId())
|
|
fmt.Println()
|
|
|
|
stream, err := conn.AcceptBi()
|
|
util.Check(err)
|
|
|
|
// closure that has access to conn.RemoteId and h *Host
|
|
done := make(chan struct{})
|
|
processClosure := func (incoming []byte) ([]byte, bool) {
|
|
// NOTE (2026-07-01): in future the future endpoints might contain ticket information? figure out if 1) we can
|
|
// determine from an incoming connection and 2) how to represent that properly
|
|
// want remote ID as a hex-encoded string represented as a byte slice
|
|
// (we use endpoints as string keys elsewhere for map-reasons)
|
|
return h.process([]byte(conn.RemoteId().String()), incoming, done)
|
|
}
|
|
|
|
responseCh := make(chan []byte)
|
|
go util.HandleReading(stream.Recv(), processClosure, responseCh, done)
|
|
go util.SendResponses(stream.Send(), responseCh, done)
|
|
<-done
|
|
conn.Close(1, []byte("BYE:0"))
|
|
}
|
|
|
|
// returns boolean true is responseBytes is populated, false if no response should be sent
|
|
func (h *Host) process (remoteID, incoming []byte, done chan struct{}) ([]byte, bool) {
|
|
fmt.Println("raw data", string(incoming))
|
|
fmt.Printf("from remote id %s\n", remoteID)
|
|
// TODO (2026-06-28): if peerType == TYPE_CLIENT -> pass to a handler that only handles CLIENT/
|
|
// TODO (2026-06-30): decide actual responses {GOOD/BAD}
|
|
if hash, exists := bytes.CutPrefix(incoming, CLIENT_HELLO); exists {
|
|
resp, correctHash := h.handleClientHello(hash)
|
|
if !correctHash {
|
|
fmt.Println("not correct hash")
|
|
close(done)
|
|
return nil, false
|
|
}
|
|
return resp, true
|
|
} else if hash, exists := bytes.CutPrefix(incoming, CLIENT_PUT); exists {
|
|
return h.handleClientPut(hash, remoteID), true
|
|
} else if hash, exists := bytes.CutPrefix(incoming, CLIENT_GET); exists {
|
|
return h.handleClientGet(hash), true
|
|
} else {
|
|
fmt.Println(string(incoming))
|
|
}
|
|
fmt.Println()
|
|
return nil, false
|
|
}
|
|
|
|
func (h *Host) handleClientHello(readhash []byte) ([]byte, bool) {
|
|
fmt.Printf("1: %s\n2: %s\n", h.readkey, readhash)
|
|
if bytes.Equal(h.readkey, readhash) {
|
|
return []byte("HELLO:welcome"), true
|
|
}
|
|
// close connection
|
|
return nil, false
|
|
}
|
|
|
|
// TODO (2026-06-29): register endpoint into lookup for topic
|
|
func (h *Host) handleClientPut(topichash, remoteID []byte) []byte {
|
|
h.look.AddClient(topichash, remoteID)
|
|
return []byte("PUT:OK")
|
|
}
|
|
|
|
func (h *Host) handleClientGet(topichash []byte) []byte {
|
|
endpoints := h.look.GetClientEndpoints(topichash)
|
|
if len(endpoints) == 0 {
|
|
return []byte("GET:0")
|
|
}
|
|
return []byte(fmt.Sprintf("GET:%s", strings.Join(endpoints, ",")))
|
|
}
|