cowmesh-network-test/moonlight_analytics.py

291 lines
12 KiB
Python

import asyncio
import logging
from datetime import date, datetime
from telegram import Update
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, filters
import os
import json
from cowmesh_pi_speedtest import CowmeshPiSpeedtestTester
from cowmesh_router_iperf_test import CowmeshRouterIperfTester
from cowmesh_pi_iperf_test import CowmeshPiIperfTester
PROJECT_PATH = os.path.abspath(os.path.dirname(__file__))
SECRETS_PATH = os.path.join(PROJECT_PATH, "secrets.json")
LOG_DIR_PATH = "/datadrive/apps/moonlight_analytics/results"
with open(SECRETS_PATH, 'r') as f:
SECRETS = json.loads(f.read())
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
class MoonlightTester:
async def start(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
await context.bot.send_message(chat_id=update.effective_chat.id, text="This is the moonlight analytics bot for mesh network diagnostics.\n\nType /help to see available commands.", message_thread_id=update.message.message_thread_id)
async def caps(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""
this is just a function to test that the bot is working as expected
"""
text_caps = ' '.join(context.args).upper()
print("chat_id: {}".format(update.effective_chat.id))
print("message_thread_id: {}".format(update.message.message_thread_id))
await context.bot.send_message(chat_id=update.effective_chat.id, text=text_caps, message_thread_id=update.message.message_thread_id)
async def router_iperf(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
time = context.args[0] if context.args else None
if not time:
time = 10
log_name = "moonlight-{:%m-%d-%Y}-{:%H-%M}-router-iperf.txt".format(date.today(), datetime.now())
log_location = os.path.join(LOG_DIR_PATH, log_name)
async def offline_log(msg):
with open(log_location, 'a') as log_file:
log_file.write(msg + "\n")
async def log(msg):
await offline_log(msg)
await context.bot.send_message(chat_id=update.effective_chat.id, text=msg,
message_thread_id=update.message.message_thread_id)
if update.effective_chat.id != int(SECRETS["TELEGRAM_LOG_CHAT_ID"]):
await log("++ can only start iperf test from Moonlight Bot group")
return
await log("++ starting router iperf test with {} seconds per test".format(time))
tester = CowmeshRouterIperfTester(log=log, time=time)
await tester.run_test()
await tester.output_results()
# send log file
document = open(log_location, 'rb')
await context.bot.send_document(update.effective_chat.id, document,
message_thread_id=update.message.message_thread_id)
async def pi_iperf(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
time = context.args[0] if context.args else None
if not time:
time = 10
log_name = "moonlight-{:%m-%d-%Y}-{:%H-%M}-pi-iperf.txt".format(date.today(), datetime.now())
log_location = os.path.join(LOG_DIR_PATH, log_name)
async def offline_log(msg):
with open(log_location, 'a') as log_file:
log_file.write(msg + "\n")
async def log(msg):
await offline_log(msg)
await context.bot.send_message(chat_id=update.effective_chat.id, text=msg, message_thread_id=update.message.message_thread_id)
if update.effective_chat.id != int(SECRETS["TELEGRAM_LOG_CHAT_ID"]):
await log("++ can only start iperf test from Moonlight Bot group")
return
await log("++ starting pi iperf test with {} seconds per test".format(time))
tester = CowmeshPiIperfTester(log=log, time=time)
await tester.run_test()
await tester.output_results()
# send log file
document = open(log_location, 'rb')
await context.bot.send_document(update.effective_chat.id, document, message_thread_id=update.message.message_thread_id)
async def speedtest(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
log_name = "moonlight-{:%m-%d-%Y}-{:%H-%M}-pi-speedtest.txt".format(date.today(), datetime.now())
log_location = os.path.join(LOG_DIR_PATH, log_name)
async def offline_log(msg):
with open(log_location, 'a') as log_file:
log_file.write(msg + "\n")
async def log(msg):
await offline_log(msg)
await context.bot.send_message(chat_id=update.effective_chat.id, text=msg,
message_thread_id=update.message.message_thread_id)
if update.effective_chat.id != int(SECRETS["TELEGRAM_LOG_CHAT_ID"]):
await log("++ can only start test from Moonlight Bot group")
return
await log("++ starting pi speedtest-cli speedtest")
tester = CowmeshPiSpeedtestTester(log=log)
await tester.run_test()
await tester.output_results()
# send log file
document = open(log_location, 'rb')
await context.bot.send_document(update.effective_chat.id, document,
message_thread_id=update.message.message_thread_id)
async def unknown(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
await context.bot.send_message(chat_id=update.effective_chat.id, text="Sorry, I didn't understand that, please run /help for a list of available commands.", message_thread_id=update.message.message_thread_id)
def about_message(self):
msg = "This is a bot designed to help measure the performance of a mesh network through active testing. " \
"Every night the bot runs a network test using iperf to measure the connectivity between all the nodes in the mesh, " \
"and logs the result to this channel. " \
"Members of this channel can also initiate a new network iperf test at any time by sending a message to this channel, " \
"with the command /iperf . " \
"Telegram users outside of this channel cannot initiate a test, to help keep the network secure from being " \
"overrun by malicious users. " \
"The network test runs at night because that is when the fewest people are using the network, and so is more likely to " \
"give consistent results with less random variability. " \
"Please be mindful of initiating too many iperf tests during the day, as it uses a lot of network resources " \
"to run the test and could interfere with the internet connections of people using the network. "
return msg
def help_message(self):
msg = "This bot runs an iperf test every night and logs the results here. You can also initiate a new test using the command /iperf " \
"or read a longer message explaining how this bot works by typing the command /readme."
return msg
async def about(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
text = self.about_message()
await context.bot.send_message(chat_id=update.effective_chat.id, text=text, message_thread_id=update.message.message_thread_id)
async def help_fun(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
text = self.help_message()
await context.bot.send_message(chat_id=update.effective_chat.id, text=text, message_thread_id=update.message.message_thread_id)
async def send_log(self, bot, chat_id, message_thread_id, log_location):
# send log file
document = open(log_location, 'rb')
await bot.send_document(chat_id, document, message_thread_id=message_thread_id)
async def nightly_router_iperf(self, time=10):
token = SECRETS["TELEGRAM_TOKEN"]
application = ApplicationBuilder().token(token).build()
chat_id = SECRETS["TELEGRAM_LOG_CHAT_ID"]
message_thread_id = SECRETS.get("TELEGRAM_LOG_MESSAGE_THREAD_ID")
bot = application.bot
log_name = "moonlight-{:%m-%d-%Y}-{:%H:%m}-router-iperf.txt".format(datetime.today(), datetime.now())
log_location = os.path.join(LOG_DIR_PATH, log_name)
async def offline_log(msg):
with open(log_location, 'a') as log_file:
log_file.write(msg)
async def log(msg):
await offline_log(msg)
try:
await bot.send_message(chat_id=chat_id, text=msg, message_thread_id=message_thread_id)
except:
pass
await log("☾☾ starting nightly router-to-router iperf test with {time} seconds per test".format(time=time))
tester = CowmeshRouterIperfTester(log=log, time=time)
await tester.run_test()
await tester.output_results()
await self.send_log(bot=bot, log_location=log_location, chat_id=chat_id, message_thread_id=message_thread_id)
async def nightly_pi_iperf(self, time=10):
token = SECRETS["TELEGRAM_TOKEN"]
application = ApplicationBuilder().token(token).build()
chat_id = SECRETS["TELEGRAM_LOG_CHAT_ID"]
message_thread_id = SECRETS.get("TELEGRAM_LOG_MESSAGE_THREAD_ID")
bot = application.bot
log_name = "moonlight-{:%m-%d-%Y}-{:%H:%m}-pi-iperf.txt".format(datetime.today(), datetime.now())
log_location = os.path.join(LOG_DIR_PATH, log_name)
async def offline_log(msg):
with open(log_location, 'a') as log_file:
log_file.write(msg)
async def log(msg):
await offline_log(msg)
try:
await bot.send_message(chat_id=chat_id, text=msg, message_thread_id=message_thread_id)
except:
pass
await log("☾☾ starting nightly computer-to-computer iperf test with {} seconds per test".format(time))
tester = CowmeshPiIperfTester(log=log, time=time)
await tester.run_test()
await tester.output_results()
await self.send_log(bot=bot, log_location=log_location, chat_id=chat_id, message_thread_id=message_thread_id)
async def nightly_pi_speedtest(self):
token = SECRETS["TELEGRAM_TOKEN"]
application = ApplicationBuilder().token(token).build()
chat_id = SECRETS["TELEGRAM_LOG_CHAT_ID"]
message_thread_id = SECRETS.get("TELEGRAM_LOG_MESSAGE_THREAD_ID")
bot = application.bot
log_name = "moonlight-{:%m-%d-%Y}-{:%H:%m}-pi-speedtest.txt".format(datetime.today(), datetime.now())
log_location = os.path.join(LOG_DIR_PATH, log_name)
async def offline_log(msg):
with open(log_location, 'a') as log_file:
log_file.write(msg)
async def log(msg):
await offline_log(msg)
try:
await bot.send_message(chat_id=chat_id, text=msg, message_thread_id=message_thread_id)
except:
pass
await log("☾☾ starting nightly speedtest-cli speedtest")
tester = CowmeshPiSpeedtestTester(log=log)
await tester.run_test()
await tester.output_results()
await self.send_log(bot=bot, log_location=log_location, chat_id=chat_id, message_thread_id=message_thread_id)
def init_bot_listener(self):
token = SECRETS["TELEGRAM_TOKEN"]
application = ApplicationBuilder().token(token).build()
start_handler = CommandHandler('start', self.start)
application.add_handler(start_handler)
caps_handler = CommandHandler('caps', self.caps)
application.add_handler(caps_handler)
about_handler = CommandHandler('readme', self.about)
application.add_handler(about_handler)
help_handler = CommandHandler('help', self.help_fun)
application.add_handler(help_handler)
iperf_handler = CommandHandler('iperf', self.router_iperf)
application.add_handler(iperf_handler)
pi_iperf_handler = CommandHandler('pi_iperf', self.pi_iperf)
application.add_handler(pi_iperf_handler)
speedtest_handler = CommandHandler('speedtest', self.speedtest)
application.add_handler(speedtest_handler)
unknown_handler = MessageHandler(filters.COMMAND, self.unknown)
application.add_handler(unknown_handler)
application.run_polling()
if __name__ == '__main__':
tester = MoonlightTester()
tester.init_bot_listener()