"""XMPP bots for humans.""" import re from argparse import ArgumentParser from configparser import ConfigParser from datetime import datetime as dt from getpass import getpass from imghdr import what from inspect import cleandoc from json import dumps, loads from logging import DEBUG, INFO, basicConfig, getLogger from os import environ, mkdir from os.path import exists from pathlib import Path from sys import exit, stdout from humanize import naturaldelta from slixmpp import ClientXMPP class SimpleDatabase(dict): """A simple database. It is a dictionary which saves to disk on all writes. It is optimised for ease of hacking and accessibility and not for performance or efficiency. """ def __init__(self, filename, log, *args, **kwargs): """Initialise the object.""" self.filename = Path(filename).absolute() self.log = log self._loads() self.update(*args, **kwargs) def _loads(self): """Load the database.""" if not exists(self.filename): return try: with open(self.filename, "r") as handle: self.update(loads(handle.read())) except Exception as exception: message = f"Loading file storage failed: {exception}" self.log.error(message, exc_info=exception) exit(1) def _dumps(self): """Save the databse to disk.""" try: with open(self.filename, "w") as handle: handle.write(dumps(self, indent=2, sort_keys=True)) except Exception as exception: message = f"Saving file storage failed: {exception}" self.log.error(message, exc_info=exception) exit(1) def __setitem__(self, key, val): """Write data to the database.""" super().__setitem__(key, val) self._dumps() def __delitem__(self, key): """Remove data from the database.""" super().__delitem__(key) self._dumps() def update(self, *args, **kwargs): """Update the database.""" for k, v in dict(*args, **kwargs).items(): self[k] = v self._dumps() class SimpleMessage: """A simple message interface.""" def __init__(self, message, nick, log): """Initialise the object.""" self.message = message self._nick = nick self.log = log @property def text(self): """The entire message text.""" return self.message["body"] @property def content(self): """The content of the message received. This implementation aims to match and extract the content of the messages directed at bots in group chats. So, for example, when sending messages like so. echobot: hi echobot, hi echobot hi The result produced by `message.content` will always be "hi". This makes it easier to work with various commands and avoid messy parsing logic in end-user implementations. """ body = self.message["body"] try: match = fr"^{self._nick}.?(\s)" split = re.split(match, body) filtered = list(filter(None, split)) return filtered[-1].strip() except Exception as exception: message = f"Couldn't parse {body}: {exception}" self.log.error(message, exc_info=exception) return None @property def sender(self): """The sender of the message.""" return self.message["from"] @property def room(self): """The room from which the message originated.""" return self.message["from"].bare @property def receiver(self): """The receiver of the message.""" return self.message["to"] @property def type(self): """The type of the message.""" return self.message["type"] @property def nick(self): """The nick of the message.""" return self.message["mucnick"] @property def url(self): """The URL of a sent file.""" return self.message["oob"]["url"] class Config: """Bot file configuration.""" def __init__(self, name, config): """Initialise the object.""" self.name = name self.config = config self.section = config[self.name] if self.name in config else {} @property def account(self): """The account of the bot.""" return self.section.get("account", None) @property def password(self): """The password of the bot account.""" return self.section.get("password", None) @property def nick(self): """The nickname of the bot.""" return self.section.get("nick", None) @property def avatar(self): """The avatar of the bot.""" return self.section.get("avatar", None) @property def redis_url(self): """The Redis connection URL.""" return self.section.get("redis_url", None) @property def rooms(self): """A list of rooms to automatically join.""" rooms = self.section.get("rooms", None) if rooms is None: return None return [room.strip() for room in rooms.split(",")] @property def no_auto_join(self): """Disable auto-join when invited.""" return self.section.get("no_auto_join", None) @property def port(self): """The port to serve from.""" return self.section.get("port", None) @property def template(self): """The Jinja template to render.""" return self.section.get("template", None) @property def serve(self): """Turn on the web server.""" return self.section.get("serve", None) @property def storage(self): """Choice of storage back-end.""" return self.section.get("storage", None) @property def output(self): """Path to the output directory.""" return self.section.get("output", None) class Bot(ClientXMPP): """XMPP bots for humans.""" DIRECT_MESSAGE_TYPES = ("chat", "normal") GROUP_MESSAGE_TYPES = ("groupchat", "normal") def __init__(self): """Initialise the object.""" self.name = type(self).__name__.lower() self.start = dt.now() self.CONFIG_FILE = f"{self.name}.conf" self.parse_arguments() self.setup_logging() self.read_config() self.init_bot() self.register_xmpp_event_handlers() self.register_xmpp_plugins() self.init_storage() self.init_data() self.run() def parse_arguments(self): """Parse command-line arguments.""" self.parser = ArgumentParser(description="XMPP bots for humans") self.parser.add_argument( "-d", "--debug", help="enable verbose debug logs", action="store_const", dest="log_level", const=DEBUG, default=INFO, ) self.parser.add_argument( "-a", "--account", dest="account", help="account for the bot account", ) self.parser.add_argument( "-p", "--password", dest="password", help="password for the bot account", ) self.parser.add_argument( "-n", "--nick", dest="nick", help="nickname for the bot account", ) self.parser.add_argument( "-av", "--avatar", dest="avatar", help="avatar for the bot account" ) self.parser.add_argument( "-ru", "--redis-url", dest="redis_url", help="redis storage connection URL", ) self.parser.add_argument( "-r", "--rooms", dest="rooms", nargs="+", help="rooms to automatically join", ) self.parser.add_argument( "-naj", "--no-auto-join", default=False, action="store_true", dest="no_auto_join", help="disable automatically joining rooms when invited", ) self.parser.add_argument( "-pt", "--port", dest="port", help="the port to serve from", ) self.parser.add_argument( "-t", "--template", dest="template", help="the template to render", ) self.parser.add_argument( "-s", "--serve", default=False, action="store_true", dest="serve", help="turn on the web server", ) self.parser.add_argument( "-st", "--storage", dest="storage", help="choice of storage back-end", choices=("file", "redis"), ) self.parser.add_argument( "-o", "--output", dest="output", help="path to output directory", ) self.args = self.parser.parse_args() def setup_logging(self): """Arrange logging for the bot.""" basicConfig( level=self.args.log_level, format="%(levelname)-8s %(message)s" ) self.log = getLogger(__name__) def read_config(self): """Read configuration for running bot.""" config = ConfigParser() config_file_path = Path(self.CONFIG_FILE).absolute() if not exists(config_file_path) and stdout.isatty(): self.log.info(f"Did not find {config_file_path}") self.generate_config_interactively() if exists(config_file_path): config.read(config_file_path) self.config = Config(self.name, config) def generate_config_interactively(self): """Generate bot configuration.""" print( "Please enter the XMPP user account of your bot", "This is often referred to as the JID (Jabber ID)", "An example: echobot@vvvvvvaria.org", "See https://xmpp-servers.404.city to make an account", sep="\n", ) account = input("Account: ") print("Please enter the password for your bot account") password = getpass("Password: ") print( "Please enter the nickname for your bot", "Others will use this name to refer to the bot", sep="\n", ) nick = input("Nickname: ") print( "Please list the rooms you want your bot to automatically join", "An example: room1@muc.example.com, room2@muc.example.com", "If your bot only responds to direct messages, leave it blank", sep="\n", ) rooms = input("Rooms: ") inputs = { "account": account, "password": password, "nick": nick, } if rooms: inputs["rooms"] = rooms self.log.debug(f"Received {inputs} as input") config = ConfigParser() config[self.name] = inputs with open(self.CONFIG_FILE, "w") as file_handle: config.write(file_handle) self.log.info(f"Generated {self.CONFIG_FILE}") def init_bot(self): """Initialise bot with connection details.""" account = ( self.args.account or self.config.account or environ.get("XBOT_ACCOUNT", None) ) password = ( self.args.password or self.config.password or environ.get("XBOT_PASSWORD", None) ) nick = ( self.args.nick or self.config.nick or environ.get("XBOT_NICK", None) ) avatar = ( self.args.avatar or self.config.avatar or environ.get("XBOT_AVATAR", None) or "avatar.png" ) redis_url = ( self.args.redis_url or self.config.redis_url or environ.get("XBOT_REDIS_URL", None) ) rooms = ( self.args.rooms or self.config.rooms or environ.get("XBOT_ROOMS", None) ) no_auto_join = ( self.args.no_auto_join or self.config.no_auto_join or environ.get("XBOT_NO_AUTO_JOIN", None) ) port = ( self.args.port or self.config.port or environ.get("XBOT_PORT", None) or "8080" ) template = ( self.args.template or self.config.template or environ.get("XBOT_TEMPLATE", None) or "index.html.j2" ) serve_web = ( self.args.serve or self.config.serve or environ.get("XBOT_SERVE", None) ) storage = ( self.args.storage or self.config.storage or environ.get("XBOT_STORAGE", None) or "file" ) output = ( self.args.output or self.config.output or environ.get("XBOT_OUTPUT", None) or "." ) if not account: self.log.error("Unable to discover account") exit(1) if not password: self.log.error("Unable to discover password") exit(1) if not nick: self.log.error("Unable to discover nick") exit(1) ClientXMPP.__init__(self, account, password) self.account = account self.password = password self.nick = nick self.avatar = avatar self.redis_url = redis_url self.rooms = rooms self.no_auto_join = no_auto_join self.port = port self.serve_web = serve_web self.template = self.load_template(template) self.storage = storage self.output = Path(output).absolute() def load_template(self, template): """Load template via Jinja.""" if not exists(Path(template).absolute()): return None try: from jinja2 import Environment, FileSystemLoader except ModuleNotFoundError as exception: self.log.error( "Missing required dependency jinja2, ", "have you tried `pip install xbotlib[web]`", exc_info=exception, ) exit(1) try: loader = FileSystemLoader(searchpath="./") env = Environment(loader=loader) return env.get_template(template) except Exception as exception: message = f"Unable to load {template}" self.log.error(message, exc_info=exception) exit(1) def register_xmpp_event_handlers(self): """Register functions against specific XMPP event handlers.""" self.add_event_handler("session_start", self.session_start) self.add_event_handler("groupchat_invite", self.group_invite) self.add_event_handler("message", self.direct_message) self.add_event_handler("groupchat_message", self.group_message) self.add_event_handler("message_error", self.error_message) def error_message(self, message): message = SimpleMessage(message, self.nick, self.log) self.log.error(f"Received error message: {message.text}") def direct_message(self, message): """Handle direct message events.""" message = SimpleMessage(message, self.nick, self.log) if message.type not in self.DIRECT_MESSAGE_TYPES: return if message.text.startswith("@"): if self.command(message, to=message.sender): return if not hasattr(self, "direct"): self.log.info(f"Bot.direct not implemented for {self.nick}") return try: self.direct(message) except Exception as exception: message = f"Bot.direct threw exception: {exception}" self.log.error(message, exc_info=exception) if self.storage == "file": self.db._dumps() def session_start(self, message): """Handle session_start event.""" self.send_presence() self.get_roster() self.publish_avatar() self.join_rooms() def publish_avatar(self): """Publish bot avatar.""" abspath = Path(self.avatar).absolute() if not exists(abspath): self.log.info(f"No avatar discovered (tried '{abspath})'") return try: with open(abspath, "rb") as handle: contents = handle.read() except Exception as exception: message = f"Failed to load avatar: {exception}" self.log.error(message, exc_info=exception) return id = self.plugin["xep_0084"].generate_id(contents) info = { "id": id, "type": f"image/{what('', contents)}", "bytes": len(contents), } self.plugin["xep_0084"].publish_avatar(contents) self.plugin["xep_0084"].publish_avatar_metadata(items=[info]) def join_rooms(self): """Automatically join rooms if specified.""" if self.rooms is None: return for room in self.rooms: self.plugin["xep_0045"].join_muc(room, self.config.nick) self.log.info(f"Joining {room} automatically") for room in self._data["invited"]: self.plugin["xep_0045"].join_muc(room, self.config.nick) self.log.info(f"Re-joining {room} (invited previously)") def group_invite(self, message): """Accept invites to group chats.""" room = message["from"] if self.no_auto_join: return self.log.info(f"Not joining {room} (disabled)") self.plugin["xep_0045"].join_muc(room, self.config.nick) self.log.info(f"Joining {room} as invited") if room not in self._data["invited"]: self._data["invited"].append(str(room)) self._data._dumps() def group_message(self, message): """Handle group chat message events.""" message = SimpleMessage(message, self.nick, self.log) if "@" in message.text: if self.meta(message, room=message.room): return miss = message.type not in self.GROUP_MESSAGE_TYPES loop = message.nick == self.nick other = self.nick not in message.text and not message.url if miss or other or loop: return if "@" in message.content: if self.command(message, room=message.room): return if not hasattr(self, "group"): self.log.info(f"Bot.group not implemented for {self.nick}") return try: self.group(message) except Exception as exception: message = f"Bot.group threw exception: {exception}" self.log.error(message, exc_info=exception) if self.storage == "file": self.db._dumps() def register_xmpp_plugins(self): """Register XMPP plugins that the bot supports.""" self.register_plugin("xep_0030") # Service Discovery self.register_plugin("xep_0045") # Multi-User Chat self.register_plugin("xep_0199") # XMPP Ping self.register_plugin("xep_0084") # User Avatar self.register_plugin("xep_0066") # Proces URIs (files, images) if not hasattr(self, "plugins"): self.log.info("No additional plugins loaded") return try: for plugin in self.plugins: self.register_plugin(plugin) self.log.info(f"Loaded {plugin}") except Exception as exception: message = f"Loading additional plugins failed: {exception}" self.log.error(message, exc_info=exception) def init_storage(self): """Initialise the storage back-end.""" file_storage_path = f"{self.output}/{self.nick}.json" if self.storage == "file": try: self.db = SimpleDatabase(file_storage_path, self.log) self.log.info("Successfully loaded file storage") except Exception as exception: message = f"Failed to load {file_storage_path}: {exception}" self.log.error(message, exc_info=exception) exit(1) else: try: from redis import Redis self.db = Redis.from_url(self.redis_url, decode_responses=True) return self.log.info("Successfully connected to Redis storage") except ValueError as exception: message = f"Failed to connect to Redis storage: {exception}" self.log.error(message, exc_info=exception) exit(1) except ModuleNotFoundError as exception: self.log.error( "Missing required dependency using Redis, ", "have you tried `pip install xbotlib[redis]`", exc_info=exception, ) exit(1) def init_data(self): """Initialise internal data storage.""" try: internal_dir = Path(".xbotlib").absolute() if not exists(internal_dir): mkdir(internal_dir) self.log.info("Successfully initialised internal directory") except Exception as exception: message = f"Failed to create {internal_dir}: {exception}" self.log.error(message, exc_info=exception) exit(1) try: internal_storage_path = f"{internal_dir}/data.json" self._data = SimpleDatabase(internal_storage_path, self.log) self.log.info("Successfully loaded internal storage") except Exception as exception: message = f"Failed to load {internal_storage_path}: {exception}" self.log.error(message, exc_info=exception) exit(1) if "invited" not in self._data: self._data["invited"] = [] def run(self): """Run the bot.""" self.connect() try: if self.serve_web: self.log.info("Running the web server") self.run_web_server() if hasattr(self, "setup"): try: self.setup() self.log.info("Finished running setup") except Exception as exception: message = f"Bot.setup failed: {exception}" self.log.error(message, exc_info=exception) self.process(forever=False) except (KeyboardInterrupt, RuntimeError): pass def run_web_server(self): """Run the web server.""" try: from aiohttp.web import Application, get, run_app except ModuleNotFoundError as exception: self.log.error( "Missing required dependency aiohttp, ", "have you tried `pip install xbotlib[web]`", exc_info=exception, ) exit(1) self.web = Application() if hasattr(self, "serve"): self.web.add_routes([get("/", self.serve)]) else: self.web.add_routes([get("/", self.default_serve)]) try: self.routes() self.log.info("Registered additional web routes") except Exception: pass self.log.info(f"Serving on http://0.0.0.0:{self.port}") run_app(self.web, port=self.port, print=None) async def default_serve(self, request): """Default placeholder text for HTML serving.""" try: from aiohttp.web import Response except ModuleNotFoundError as exception: self.log.error( "Missing required dependency aiohttp, ", "have you tried `pip install xbotlib[web]`", exc_info=exception, ) exit(1) return Response(text=f"{self.nick} is alive and well") def reply(self, text, to=None, room=None): """Send back a reply.""" if to is None and room is None: self.log.error("`to` or `room` arguments required for `reply`") exit(1) if to is not None and room is not None: self.log.error("Cannot send to both `to` and `room` for `reply`") exit(1) kwargs = {"mbody": text} if to is not None: kwargs["mto"] = to kwargs["mtype"] = "chat" else: kwargs["mto"] = room kwargs["mtype"] = "groupchat" self.send_message(**kwargs) return True @property def uptime(self): """Time since the bot came up.""" return naturaldelta(self.start - dt.now()) def meta(self, message, **kwargs): """Handle meta command invocations.""" if "@bots" in message.text: return self.reply("🖐️", **kwargs) def command(self, message, **kwargs): """Handle command invocations.""" if "@uptime" in message.content: return self.reply(self.uptime, **kwargs) elif "@help" in message.content: try: return self.reply(cleandoc(self.help), **kwargs) except AttributeError: return self.reply("No help found 🤔️", **kwargs) def respond(self, response, content_type="text/html"): """Send this response back with the web server.""" try: from aiohttp.web import Response except ModuleNotFoundError as exception: self.log.error( "Missing required dependency aiohttp, ", "have you tried `pip install xbotlib[web]`", exc_info=exception, ) exit(1) return Response(text=response, content_type=content_type)