xbotlib/xbotlib.py

456 lines
13 KiB
Python
Raw Normal View History

2021-01-10 13:10:39 +00:00
"""XMPP bots for humans."""
from argparse import ArgumentParser
2021-01-10 13:10:39 +00:00
from configparser import ConfigParser
from datetime import datetime as dt
2021-01-10 13:10:39 +00:00
from getpass import getpass
from imghdr import what
2021-01-16 12:05:30 +00:00
from inspect import cleandoc
2021-01-14 18:18:27 +00:00
from logging import DEBUG, INFO, basicConfig, getLogger
from os import environ
2021-01-10 13:10:39 +00:00
from os.path import exists
from pathlib import Path
from random import choice
from sys import exit, stdout
2021-01-10 13:10:39 +00:00
from humanize import naturaldelta
from redis import Redis
2021-01-10 13:10:39 +00:00
from slixmpp import ClientXMPP
2021-01-13 13:00:59 +00:00
class SimpleMessage:
2021-01-10 13:49:22 +00:00
"""A simple message interface."""
def __init__(self, message):
self.message = message
@property
def body(self):
return self.message["body"]
@property
def sender(self):
2021-01-13 13:42:26 +00:00
return self.message["from"]
@property
def room(self):
2021-01-10 15:51:20 +00:00
return self.message["from"].bare
2021-01-10 13:49:22 +00:00
@property
def receiver(self):
return self.message["to"]
2021-01-10 15:31:17 +00:00
@property
def nickname(self):
return self.message["mucnick"]
@property
def type(self):
return self.message["type"]
2021-01-10 13:49:22 +00:00
class Config:
"""Bot file configuration."""
def __init__(self, name, config):
self.name = name
self.config = config
self.section = config[self.name] if self.name in config else {}
@property
def account(self):
return self.section.get("account", None)
@property
def password(self):
return self.section.get("password", None)
@property
def nick(self):
return self.section.get("nick", None)
2021-01-10 13:10:39 +00:00
class Bot(ClientXMPP):
"""XMPP bots for humans."""
2021-01-10 13:10:39 +00:00
def __init__(self):
self.name = type(self).__name__.lower()
self.start = dt.now()
self.CONFIG_FILE = f"{self.name}.conf"
self.parse_arguments()
self.setup_logging()
2021-01-10 13:10:39 +00:00
self.read_config()
self.init_bot()
self.register_xmpp_event_handlers()
self.register_xmpp_plugins()
self.init_db()
2021-01-10 13:10:39 +00:00
self.run()
def parse_arguments(self):
"""Parse command-line arguments."""
self.parser = ArgumentParser(description="XMPP bots for humans")
self.parser.add_argument(
"-d",
"--debug",
help="Enable verbose debug logs",
action="store_const",
dest="log_level",
const=DEBUG,
default=INFO,
)
self.parser.add_argument(
"-a",
"--account",
dest="account",
help="Account for the bot account (foo@example.com)",
)
self.parser.add_argument(
"-p",
"--password",
dest="password",
help="Password for the bot account",
)
self.parser.add_argument(
"-n",
"--nick",
dest="nick",
help="Nickname for the bot account",
)
self.parser.add_argument(
"-av",
"--avatar",
dest="avatar",
help="Avatar for the bot account",
default="avatar.png",
)
self.args = self.parser.parse_args()
def setup_logging(self):
"""Arrange logging for the bot."""
basicConfig(
level=self.args.log_level, format="%(levelname)-8s %(message)s"
)
2021-01-14 18:18:27 +00:00
self.log = getLogger(__name__)
2021-01-10 13:10:39 +00:00
def read_config(self):
"""Read configuration for running bot."""
config = ConfigParser()
2021-01-10 13:10:39 +00:00
config_file_path = Path(self.CONFIG_FILE).absolute()
if not exists(config_file_path) and stdout.isatty():
2021-01-15 13:32:01 +00:00
self.log.info(f"Did not find {config_file_path}")
self.generate_config_interactively()
2021-01-10 13:10:39 +00:00
if exists(config_file_path):
config.read(config_file_path)
2021-01-10 13:10:39 +00:00
self.config = Config(self.name, config)
2021-01-10 13:10:39 +00:00
def generate_config_interactively(self):
2021-01-10 13:10:39 +00:00
"""Generate bot configuration."""
account = input("Account: ")
password = getpass("Password: ")
nick = input("Nickname: ")
2021-01-10 13:10:39 +00:00
config = ConfigParser()
config[self.name] = {"account": account, "password": password}
2021-01-10 13:10:39 +00:00
2021-01-10 14:45:12 +00:00
if nick:
config[self.name]["nick"] = nick
2021-01-10 14:45:12 +00:00
with open(self.CONFIG_FILE, "w") as file_handle:
2021-01-10 13:10:39 +00:00
config.write(file_handle)
def init_bot(self):
"""Initialise bot with connection details."""
account = (
self.args.account
or self.config.account
or environ.get("XBOT_ACCOUNT", None)
)
password = (
self.args.password
or self.config.password
or environ.get("XBOT_PASSWORD", None)
)
nick = (
self.args.nick or self.config.nick or environ.get("XBOT_NICK", None)
)
if not account:
self.log.error("Unable to discover account")
exit(1)
if not password:
self.log.error("Unable to discover password")
exit(1)
if not nick:
self.log.error("Unable to discover nick")
exit(1)
ClientXMPP.__init__(self, account, password)
self.account = account
self.password = password
self.nick = nick
2021-01-10 13:10:39 +00:00
self.avatar = self.args.avatar
2021-01-10 13:10:39 +00:00
def register_xmpp_event_handlers(self):
"""Register functions against specific XMPP event handlers."""
self.add_event_handler("session_start", self.session_start)
2021-01-13 21:49:52 +00:00
self.add_event_handler("groupchat_invite", self.group_invite)
self.add_event_handler("message", self.direct_message)
self.add_event_handler("groupchat_message", self.group_message)
2021-01-15 11:57:25 +00:00
self.add_event_handler("message_error", self.error_message)
def error_message(self, message):
_message = SimpleMessage(message)
self.log.error(f"Received error message: {_message.body}")
2021-01-10 13:10:39 +00:00
def direct_message(self, message):
2021-01-10 13:10:39 +00:00
"""Handle message event."""
if message["type"] in ("chat", "normal"):
_message = SimpleMessage(message)
2021-01-16 12:05:53 +00:00
if "@" in _message.body:
self.direct_command(_message)
try:
self.direct(_message)
except AttributeError:
2021-01-16 11:50:08 +00:00
self.log.info(f"Bot.direct not implemented for {self.nick}")
2021-01-10 13:10:39 +00:00
2021-01-13 21:49:52 +00:00
def session_start(self, message):
2021-01-10 13:10:39 +00:00
"""Handle session_start event."""
self.send_presence()
self.get_roster()
self.publish_avatar()
def publish_avatar(self):
"""Publish bot avatar."""
try:
abspath = Path(self.avatar).absolute()
with open(abspath, "rb") as handle:
contents = handle.read()
except IOError:
self.log.info(f"No avatar discovered (tried '{abspath}')")
return
id = self.plugin["xep_0084"].generate_id(contents)
info = {
"id": id,
"type": f"image/{what('', contents)}",
"bytes": len(contents),
}
self.plugin["xep_0084"].publish_avatar(contents)
self.plugin["xep_0084"].publish_avatar_metadata(items=[info])
2021-01-10 13:10:39 +00:00
2021-01-13 21:49:52 +00:00
def group_invite(self, message):
"""Accept invites to group chats."""
self.plugin["xep_0045"].join_muc(message["from"], self.config.nick)
2021-01-10 15:31:17 +00:00
def group_message(self, message):
2021-01-10 13:10:39 +00:00
"""Handle groupchat_message event."""
2021-01-10 15:31:17 +00:00
if message["type"] in ("groupchat", "normal"):
if message["mucnick"] != self.config.nick:
_message = SimpleMessage(message)
2021-01-16 12:05:53 +00:00
if _message.body.startswith("@"):
2021-01-15 14:05:49 +00:00
self.meta(_message, room=_message.room)
if f"{self.nick}:!" in _message.body:
self.command(_message, room=_message.room)
try:
self.group(_message)
except AttributeError:
2021-01-16 11:50:08 +00:00
self.log.info(f"Bot.group not implemented for {self.nick}")
2021-01-10 13:10:39 +00:00
def register_xmpp_plugins(self):
"""Register XMPP plugins that the bot supports."""
self.register_plugin("xep_0030") # Service Discovery
self.register_plugin("xep_0045") # Multi-User Chat
self.register_plugin("xep_0199") # XMPP Ping
self.register_plugin("xep_0084") # User Avatar
def init_db(self):
"""Initialise the Redis key/value store."""
url = environ.get("REDIS_URL", None)
if not url:
self.db = None
self.log.info("No storage discovered")
else:
self.db = Redis.from_url(url, decode_responses=True)
self.log.info("Successfully connected to storage")
2021-01-10 13:10:39 +00:00
def run(self):
"""Run the bot."""
self.connect()
2021-01-10 14:37:10 +00:00
try:
self.process()
except KeyboardInterrupt:
pass
2021-01-10 13:49:22 +00:00
def reply(self, body, to=None, room=None):
2021-01-10 18:17:10 +00:00
"""Send back a reply."""
if to is None and room is None:
2021-01-16 11:50:29 +00:00
self.log.error("`to` or `room` arguments required for `reply`")
2021-01-14 21:44:48 +00:00
exit(1)
2021-01-10 18:17:10 +00:00
if to is not None and room is not None:
2021-01-14 21:44:48 +00:00
self.log.error("Cannot send to both `to` and `room` for `reply`")
exit(1)
2021-01-10 18:17:10 +00:00
kwargs = {"mbody": body}
if to is not None:
kwargs["mto"] = to
kwargs["mtype"] = "chat"
else:
kwargs["mto"] = room
kwargs["mtype"] = "groupchat"
self.send_message(**kwargs)
@property
def uptime(self):
"""Time since the bot came up."""
return naturaldelta(self.start - dt.now())
2021-01-16 12:05:53 +00:00
def direct_command(self, message):
"""Handle commands in direct messages."""
if "@uptime" in message.body:
self.reply(self.uptime, to=message.sender)
elif "@help" in message.body:
try:
2021-01-16 12:05:53 +00:00
self.reply(cleandoc(self.help), to=message.sender)
except AttributeError:
2021-01-16 12:05:53 +00:00
self.reply("No help found 🤔️", to=message.sender)
else:
self.log.info(f"'{message.body}' not handled")
2021-01-13 13:08:44 +00:00
class EchoBot(Bot):
"""Responds with whatever you send.
2021-01-13 13:08:44 +00:00
Simply direct message the bot and see if you get back what you sent. It
also works in group chats but in this case you need to summon the bot using
2021-01-15 20:14:38 +00:00
its nickname.
2021-01-13 13:08:44 +00:00
"""
2021-01-16 11:34:26 +00:00
help = "I echo messages back 🖖️"
def direct(self, message):
"""Send back whatever we receive."""
self.reply(message.body, to=message.sender)
2021-01-13 13:31:00 +00:00
def group(self, message):
"""Send back whatever receive in group chats."""
if "echobot" in message.body:
self.reply(message.body.split(":")[-1], room=message.room)
2021-01-13 13:08:44 +00:00
class WhisperBot(Bot):
"""Anonymous whispering in group chats.
2021-01-13 13:08:44 +00:00
In order to activate this bot you can invite it to your group chat. Once
invited, you can start a private chat with the bot and tell it you want it
to whisper your message into the group chat. The bot will then do this on
your behalf and not reveal your identity. This is nice when you want to
communicate with the group anonymously.
2021-01-13 13:08:44 +00:00
"""
2021-01-16 11:34:26 +00:00
help = "I whisper private messages into group chats 😌️"
def direct(self, message):
"""Receive private messages and whisper them into group chats."""
self.reply(f"*pssttt...* {message.body}", room=message.room)
2021-01-15 20:14:54 +00:00
class GlossBot(Bot):
"""Building a shared glossary together.
A glossary is "an alphabetical list of terms in a particular domain of
knowledge with the definitions for those terms."
This bot reacts to commands which insert, list or delete items from a
shared glossary when summoned in a group chat. This bot makes use of
persistent storage so the glossary is always there even if the bot goes
away.
"""
2021-01-16 12:05:30 +00:00
help = """
I help build a shared glossary
2021-01-16 11:34:26 +00:00
glossbot: @add <entry> - <definition>
glossbot: @rm <entry>
glossbot: @rand
glossbot: @ls
2021-01-15 20:14:54 +00:00
"""
2021-01-15 20:18:20 +00:00
def group(self, message):
2021-01-16 11:29:37 +00:00
"""Handle glossary commands."""
2021-01-15 20:14:54 +00:00
if "!add" in message.body:
try:
_, body = message.body.split(":!add")
entry, definition = body.strip().split("-").strip()
2021-01-16 11:30:19 +00:00
self.add(entry, definition, room=message.room)
2021-01-15 20:14:54 +00:00
except ValueError:
2021-01-16 11:30:19 +00:00
self.reply("Hmmm, couldn't read that", room=message.room)
2021-01-15 20:14:54 +00:00
elif "!rm" in message.body:
try:
entry = message.body.split(":!rm")[-1].strip()
2021-01-16 11:30:19 +00:00
self.rm(entry, room=message.room)
2021-01-15 20:14:54 +00:00
except ValueError:
2021-01-16 11:30:19 +00:00
self.reply("Hmmm, couldn't read that", room=message.room)
2021-01-15 20:14:54 +00:00
elif "!rand" in message.body:
2021-01-16 11:30:19 +00:00
self.rand(room=message.room)
2021-01-15 20:14:54 +00:00
elif "!ls" in message.body:
2021-01-16 11:30:19 +00:00
self.ls(room=message.room)
2021-01-15 20:14:54 +00:00
else:
self.log.error(f"{message.body} is not recognised")
def add(self, entry, definition, **kwargs):
"""Add a new entry."""
self.db[entry] = definition
self.reply("Added ✌️", **kwargs)
def rand(self, **kwargs):
"""List a random entry."""
2021-01-16 11:30:37 +00:00
if not self.db.keys():
self.reply("Glossary is empty", **kwargs)
return
2021-01-15 20:14:54 +00:00
entry = choice(self.db.keys())
self.reply(f"{entry} - {self.db[entry]}", **kwargs)
def ls(self, **kwargs):
"""List all entries."""
if not self.db.keys():
self.reply("Glossary is empty", **kwargs)
2021-01-16 11:30:37 +00:00
return
2021-01-15 20:14:54 +00:00
for entry in self.db.keys():
self.reply(f"{entry} - {self.db[entry]}", **kwargs)
def rm(self, entry, **kwargs):
"""Remove an entry."""
try:
self.db[entry].delete()
self.reply("Removed ✌️", **kwargs)
except KeyError:
self.reply(f"{entry} doesn't exist?", **kwargs)