diff --git a/unnamed-project.go b/unnamed-project.go index fadfadc..252c006 100644 --- a/unnamed-project.go +++ b/unnamed-project.go @@ -1,12 +1,18 @@ package main import ( + "container/list" + "encoding/json" "flag" "fmt" "log" + "net" "os" "os/user" "path/filepath" + "strings" + "sync" + "time" ) const help = `unnamed-project: WIP @@ -53,7 +59,7 @@ func main() { debug: debugFlag, } - fmt.Println(conf) + serve(conf) } @@ -74,3 +80,209 @@ type config struct { sharePath string debug bool } + +type announcer struct { + conf *config +} + +type nodeInfo struct { + nodeID string `json:"nodeID"` + addr string `json:"addr"` + webPort int `json:"webPort"` + lastMulticast int64 `json:"lastMulticast"` +} + +var ( + nodeMutex sync.Mutex +) + +func newAnnouncePacket(n *nodeInfo) (string, error) { + jsonMessage, err := json.Marshal(n) + if err != nil { + return "", err + } + + message := fmt.Sprintf("%s%s%s", header, nodeAnnounceCommand, jsonMessage) + + return message, nil +} + +func announceNode(nodeInfo *nodeInfo) { + address, err := net.ResolveUDPAddr("udp", multicastAddress) + if err != nil { + return + } + + conn, err := net.DialUDP("udp", nil, address) + if err != nil { + return + } + + for { + fmt.Println("sending multicast info") + + message, err := newAnnouncePacket(nodeInfo) + if err != nil { + fmt.Println("Could not get announce package") + fmt.Println(err) + continue + } + + conn.Write([]byte(message)) + time.Sleep(announceIntervalSec * time.Second) + } +} + +const ( + multicastAddress = "239.6.6.6:1337" + multicastBufferSize = 4096 + nodeAnnounceCommand = "\x01" + header = "\x60\x0D\xF0\x0D" + minPackageSize = 6 + expireTimeoutSec = 50 + announceIntervalSec = 10 +) + +func announcedNodeHandler(ninfo *nodeInfo, nodeList *list.List) { + nodeMutex.Lock() + updateNodeList(ninfo, nodeList) + nodeMutex.Unlock() + + fmt.Println("Printing nodes") + + fmt.Print("[") + for el := nodeList.Front(); el != nil; el = el.Next() { + fmt.Print(el.Value.(*nodeInfo).nodeID, " ") + } + fmt.Print("]\n\n") +} + +func updateNodeList(ninfo *nodeInfo, nodeList *list.List) { + nodeExists := false + for el := nodeList.Front(); el != nil; el = el.Next() { + tmp := el.Value.(*nodeInfo) + + // Already in list + if tmp.nodeID == ninfo.nodeID { + tmp.lastMulticast = time.Now().Unix() + fmt.Printf("Updating node %s multicast\n", ninfo.nodeID) + nodeExists = true + break + } + + } + + for el := nodeList.Front(); el != nil; el = el.Next() { + tmp := el.Value.(*nodeInfo) + if isNodeExpired(tmp, expireTimeoutSec) { + fmt.Println("Node expired, removing: ", tmp.nodeID) + nodeList.Remove(el) + } + } + + if !nodeExists { + fmt.Printf("Adding new node! %p %s\n", ninfo, ninfo.nodeID) + ninfo.lastMulticast = time.Now().Unix() + nodeList.PushBack(ninfo) + } +} + +func isNodeExpired(nodeInfo *nodeInfo, timeout int) bool { + diff := time.Now().Unix() - nodeInfo.lastMulticast + return diff > int64(timeout) +} + +func parseAnnouncePacket(size int, addr *net.UDPAddr, packet []byte) (*nodeInfo, error) { + if size <= minPackageSize { + return nil, fmt.Errorf("Invalid packet size") + } + + if strings.Compare(string(packet[0:len(header)]), header) != 0 { + return nil, fmt.Errorf("Invalid packet header") + } + + if string(packet[len(header):len(header)+1]) != nodeAnnounceCommand[0:] { + return nil, fmt.Errorf("Command different than NODE_ANNOUNCE_COMMAND") + } + + fmt.Println("Packet command is nodeAnnounceCommand") + + payload := string(packet[len(header)+1:]) + payload = strings.Trim(payload, "\x00") + + nodeInfo := &nodeInfo{} + + err := json.Unmarshal([]byte(payload), nodeInfo) + nodeInfo.addr = addr.IP.String() + nodeInfo.nodeID = fmt.Sprintf("%s-%s", nodeInfo.nodeID, nodeInfo.addr) + if err != nil { + return nil, err + } + + return nodeInfo, nil +} + +func listenForNodes(nodeList *list.List) { + address, err := net.ResolveUDPAddr("udp", multicastAddress) + if err != nil { + return + } + + conn, err := net.ListenMulticastUDP("udp", nil, address) + if err != nil { + return + } + + conn.SetReadBuffer(multicastBufferSize) + + for { + packet := make([]byte, multicastBufferSize) + size, udpAddr, err := conn.ReadFromUDP(packet) + if err != nil { + fmt.Println(err) + continue + } + + nodeInfo, err := parseAnnouncePacket(size, udpAddr, packet) + + if err != nil { + fmt.Println(err) + continue + } + fmt.Printf("Received multicast packet from %s Id: %s\n", udpAddr.String(), nodeInfo.nodeID) + + go announcedNodeHandler(nodeInfo, nodeList) + } +} + +func (a *announcer) Start(nodeList *list.List) { + nodeInfo := &nodeInfo{ + nodeID: a.conf.nodeID, + addr: "", + webPort: a.conf.webPort, + lastMulticast: 0, + } + + go announceNode(nodeInfo) + go listenForNodes(nodeList) +} + +func startAnnouncer(conf *config, nodeList *list.List) { + announcer := &announcer{conf: conf} + announcer.Start(nodeList) +} + +func serve(conf *config) { + nodeList := list.New() + + go startAnnouncer(conf, nodeList) + + // TODO: next steps... + // go fileServe(conf) + // go dashboardServe(conf, nodeList) + + for { + // TODO: do this context cancel trick to escape cleanly? + time.Sleep(time.Minute * 15) + } +}