xbotlib/xbotlib.py

539 lines
16 KiB
Python
Raw Normal View History

2021-01-10 13:10:39 +00:00
"""XMPP bots for humans."""
import re
from argparse import ArgumentParser
2021-01-10 13:10:39 +00:00
from configparser import ConfigParser
from datetime import datetime as dt
2021-01-10 13:10:39 +00:00
from getpass import getpass
from imghdr import what
2021-01-16 12:05:30 +00:00
from inspect import cleandoc
2021-01-14 18:18:27 +00:00
from logging import DEBUG, INFO, basicConfig, getLogger
from os import environ
2021-01-10 13:10:39 +00:00
from os.path import exists
from pathlib import Path
from random import choice
from sys import exit, stdout
2021-01-10 13:10:39 +00:00
from humanize import naturaldelta
from redis import Redis
2021-01-10 13:10:39 +00:00
from slixmpp import ClientXMPP
2021-01-13 13:00:59 +00:00
class SimpleMessage:
2021-01-10 13:49:22 +00:00
"""A simple message interface."""
def __init__(self, message, bot):
"""Initialise the object."""
2021-01-10 13:49:22 +00:00
self.message = message
self.bot = bot
2021-01-10 13:49:22 +00:00
@property
def text(self):
"""The entire message text."""
2021-01-10 13:49:22 +00:00
return self.message["body"]
@property
def content(self):
"""The content of the message received.
This implementation aims to match and extract the content of the
messages directed at bots in group chats. So, for example, when sending
messages like so.
echobot: hi
echobot, hi
echobot hi
The result produced by `message.content` will always be "hi". This
makes it easier to work with various commands and avoid messy parsing
logic in end-user implementations.
"""
body = self.message["body"]
try:
2021-01-16 16:46:50 +00:00
match = fr"^{self.bot.nick}.?(\s)"
split = re.split(match, body)
filtered = list(filter(None, split))
return filtered[-1].strip()
except Exception as exception:
self.bot.log.error(f"Couldn't parse {body}: {exception}")
return None
2021-01-10 13:49:22 +00:00
@property
def sender(self):
"""The sender of the message."""
2021-01-13 13:42:26 +00:00
return self.message["from"]
@property
def room(self):
"""The room from which the message originated."""
2021-01-10 15:51:20 +00:00
return self.message["from"].bare
2021-01-10 13:49:22 +00:00
@property
def receiver(self):
"""The receiver of the message."""
2021-01-10 13:49:22 +00:00
return self.message["to"]
2021-01-10 15:31:17 +00:00
@property
def type(self):
"""The type of the message."""
2021-01-10 15:31:17 +00:00
return self.message["type"]
@property
def nick(self):
"""The nick of the message."""
return self.message["mucnick"]
2021-01-10 13:49:22 +00:00
class Config:
"""Bot file configuration."""
def __init__(self, name, config):
"""Initialise the object."""
self.name = name
self.config = config
self.section = config[self.name] if self.name in config else {}
@property
def account(self):
"""The account of the bot."""
return self.section.get("account", None)
@property
def password(self):
"""The password of the bot account."""
return self.section.get("password", None)
@property
def nick(self):
"""The nickname of the bot."""
return self.section.get("nick", None)
2021-01-16 19:30:08 +00:00
@property
def avatar(self):
"""The avatar of the bot."""
return self.section.get("avatar", None)
@property
def redis_url(self):
"""The Redis connection URL."""
return self.section.get("redis_url", None)
2021-01-10 13:10:39 +00:00
class Bot(ClientXMPP):
"""XMPP bots for humans."""
DIRECT_MESSAGE_TYPES = ("chat", "normal")
GROUP_MESSAGE_TYPES = ("groupchat", "normal")
2021-01-10 13:10:39 +00:00
def __init__(self):
"""Initialise the object."""
self.name = type(self).__name__.lower()
self.start = dt.now()
self.CONFIG_FILE = f"{self.name}.conf"
self.parse_arguments()
self.setup_logging()
2021-01-10 13:10:39 +00:00
self.read_config()
self.init_bot()
self.register_xmpp_event_handlers()
self.register_xmpp_plugins()
self.init_db()
2021-01-10 13:10:39 +00:00
self.run()
def parse_arguments(self):
"""Parse command-line arguments."""
self.parser = ArgumentParser(description="XMPP bots for humans")
self.parser.add_argument(
"-d",
"--debug",
help="Enable verbose debug logs",
action="store_const",
dest="log_level",
const=DEBUG,
default=INFO,
)
self.parser.add_argument(
"-a",
"--account",
dest="account",
help="Account for the bot account (foo@example.com)",
)
self.parser.add_argument(
"-p",
"--password",
dest="password",
help="Password for the bot account",
)
self.parser.add_argument(
"-n",
"--nick",
dest="nick",
help="Nickname for the bot account",
)
self.parser.add_argument(
2021-01-16 19:24:03 +00:00
"-av", "--avatar", dest="avatar", help="Avatar for the bot account"
)
self.parser.add_argument(
"-r",
"--redis-url",
dest="redis_url",
help="Redis storage connection URL",
)
self.args = self.parser.parse_args()
def setup_logging(self):
"""Arrange logging for the bot."""
basicConfig(
level=self.args.log_level, format="%(levelname)-8s %(message)s"
)
2021-01-14 18:18:27 +00:00
self.log = getLogger(__name__)
2021-01-10 13:10:39 +00:00
def read_config(self):
"""Read configuration for running bot."""
config = ConfigParser()
2021-01-10 13:10:39 +00:00
config_file_path = Path(self.CONFIG_FILE).absolute()
if not exists(config_file_path) and stdout.isatty():
2021-01-15 13:32:01 +00:00
self.log.info(f"Did not find {config_file_path}")
self.generate_config_interactively()
2021-01-10 13:10:39 +00:00
if exists(config_file_path):
config.read(config_file_path)
2021-01-10 13:10:39 +00:00
self.config = Config(self.name, config)
2021-01-10 13:10:39 +00:00
def generate_config_interactively(self):
2021-01-10 13:10:39 +00:00
"""Generate bot configuration."""
account = input("Account: ")
password = getpass("Password: ")
nick = input("Nickname: ")
2021-01-16 18:59:09 +00:00
avatar = input("Avatar: ")
redis_url = input("Redis URL: ")
2021-01-10 13:10:39 +00:00
config = ConfigParser()
config[self.name] = {"account": account, "password": password}
2021-01-10 13:10:39 +00:00
2021-01-10 14:45:12 +00:00
if nick:
config[self.name]["nick"] = nick
2021-01-16 18:59:09 +00:00
if avatar:
config[self.name]["avatar"] = avatar
if redis_url:
config[self.name]["redis_url"] = redis_url
2021-01-16 18:59:09 +00:00
with open(self.CONFIG_FILE, "w") as file_handle:
2021-01-10 13:10:39 +00:00
config.write(file_handle)
def init_bot(self):
"""Initialise bot with connection details."""
account = (
self.args.account
or self.config.account
or environ.get("XBOT_ACCOUNT", None)
)
password = (
self.args.password
or self.config.password
or environ.get("XBOT_PASSWORD", None)
)
nick = (
self.args.nick or self.config.nick or environ.get("XBOT_NICK", None)
)
2021-01-16 18:59:09 +00:00
avatar = (
self.args.avatar
or self.config.avatar
or environ.get("XBOT_AVATAR", None)
2021-01-16 19:24:03 +00:00
or "avatar.png"
2021-01-16 18:59:09 +00:00
)
redis_url = (
self.args.redis_url
or self.config.redis_url
or environ.get("XBOT_REDIS_URL", None)
)
if not account:
self.log.error("Unable to discover account")
exit(1)
if not password:
self.log.error("Unable to discover password")
exit(1)
if not nick:
self.log.error("Unable to discover nick")
exit(1)
ClientXMPP.__init__(self, account, password)
self.account = account
self.password = password
self.nick = nick
2021-01-16 18:59:09 +00:00
self.avatar = avatar
self.redis_url = redis_url
2021-01-10 13:10:39 +00:00
def register_xmpp_event_handlers(self):
"""Register functions against specific XMPP event handlers."""
self.add_event_handler("session_start", self.session_start)
2021-01-13 21:49:52 +00:00
self.add_event_handler("groupchat_invite", self.group_invite)
self.add_event_handler("message", self.direct_message)
self.add_event_handler("groupchat_message", self.group_message)
2021-01-15 11:57:25 +00:00
self.add_event_handler("message_error", self.error_message)
def error_message(self, message):
message = SimpleMessage(message, self)
self.log.error(f"Received error message: {message.text}")
2021-01-10 13:10:39 +00:00
def direct_message(self, message):
"""Handle direct message events."""
message = SimpleMessage(message, self)
if message.type not in self.DIRECT_MESSAGE_TYPES:
return
if message.text.startswith("@"):
if self.command(message, to=message.sender):
return
2021-01-16 12:09:41 +00:00
try:
self.direct(message)
2021-01-16 12:09:41 +00:00
except AttributeError:
self.log.info(f"Bot.direct not implemented for {self.nick}")
2021-01-10 13:10:39 +00:00
2021-01-13 21:49:52 +00:00
def session_start(self, message):
2021-01-10 13:10:39 +00:00
"""Handle session_start event."""
self.send_presence()
self.get_roster()
self.publish_avatar()
def publish_avatar(self):
"""Publish bot avatar."""
try:
abspath = Path(self.avatar).absolute()
with open(abspath, "rb") as handle:
contents = handle.read()
except IOError:
self.log.info(f"No avatar discovered (tried '{abspath}')")
return
id = self.plugin["xep_0084"].generate_id(contents)
info = {
"id": id,
"type": f"image/{what('', contents)}",
"bytes": len(contents),
}
self.plugin["xep_0084"].publish_avatar(contents)
self.plugin["xep_0084"].publish_avatar_metadata(items=[info])
2021-01-10 13:10:39 +00:00
2021-01-13 21:49:52 +00:00
def group_invite(self, message):
"""Accept invites to group chats."""
self.plugin["xep_0045"].join_muc(message["from"], self.config.nick)
2021-01-10 15:31:17 +00:00
def group_message(self, message):
"""Handle group chat message events."""
message = SimpleMessage(message, self)
if message.text.startswith("@"):
return self.meta(message, room=message.room)
2021-01-16 12:09:41 +00:00
miss = message.type not in self.GROUP_MESSAGE_TYPES
loop = message.nick == self.nick
other = self.nick not in message.text
2021-01-16 12:17:55 +00:00
if miss or loop or other:
return
2021-01-15 14:05:49 +00:00
if message.content.startswith("@"):
if self.command(message, room=message.room):
return
2021-01-16 12:09:41 +00:00
try:
self.group(message)
2021-01-16 12:09:41 +00:00
except AttributeError:
self.log.info(f"Bot.group not implemented for {self.nick}")
2021-01-10 13:10:39 +00:00
def register_xmpp_plugins(self):
"""Register XMPP plugins that the bot supports."""
self.register_plugin("xep_0030") # Service Discovery
self.register_plugin("xep_0045") # Multi-User Chat
self.register_plugin("xep_0199") # XMPP Ping
self.register_plugin("xep_0084") # User Avatar
def init_db(self):
"""Initialise the Redis key/value store."""
if not self.redis_url:
self.db = None
return self.log.info("No storage discovered")
self.db = Redis.from_url(self.redis_url, decode_responses=True)
self.log.info("Successfully connected to storage")
2021-01-10 13:10:39 +00:00
def run(self):
"""Run the bot."""
self.connect()
2021-01-10 14:37:10 +00:00
try:
self.process()
except KeyboardInterrupt:
pass
2021-01-10 13:49:22 +00:00
def reply(self, text, to=None, room=None):
2021-01-10 18:17:10 +00:00
"""Send back a reply."""
if to is None and room is None:
2021-01-16 11:50:29 +00:00
self.log.error("`to` or `room` arguments required for `reply`")
2021-01-14 21:44:48 +00:00
exit(1)
2021-01-10 18:17:10 +00:00
if to is not None and room is not None:
2021-01-14 21:44:48 +00:00
self.log.error("Cannot send to both `to` and `room` for `reply`")
exit(1)
2021-01-10 18:17:10 +00:00
kwargs = {"mbody": text}
2021-01-10 18:17:10 +00:00
if to is not None:
kwargs["mto"] = to
kwargs["mtype"] = "chat"
else:
kwargs["mto"] = room
kwargs["mtype"] = "groupchat"
self.send_message(**kwargs)
return True
@property
def uptime(self):
"""Time since the bot came up."""
return naturaldelta(self.start - dt.now())
def meta(self, message, **kwargs):
"""Handle meta command invocations."""
if message.text.startswith("@bots"):
return self.reply("🖐️", **kwargs)
2021-01-16 12:14:42 +00:00
def command(self, message, **kwargs):
"""Handle command invocations."""
if message.content.startswith("@uptime"):
return self.reply(self.uptime, **kwargs)
elif message.content.startswith("@help"):
try:
return self.reply(cleandoc(self.help), **kwargs)
except AttributeError:
return self.reply("No help found 🤔️", **kwargs)
2021-01-13 13:08:44 +00:00
class EchoBot(Bot):
"""Responds with whatever you send.
2021-01-13 13:08:44 +00:00
Simply direct message the bot and see if you get back what you sent. It
also works in group chats but in this case you need to summon the bot using
2021-01-15 20:14:38 +00:00
its nickname.
2021-01-13 13:08:44 +00:00
"""
2021-01-16 11:34:26 +00:00
help = "I echo messages back 🖖️"
def direct(self, message):
"""Send back whatever we receive."""
self.reply(message.text, to=message.sender)
2021-01-13 13:31:00 +00:00
def group(self, message):
"""Send back whatever receive in group chats."""
2021-01-16 16:31:22 +00:00
self.reply(message.content, room=message.room)
2021-01-13 13:08:44 +00:00
class WhisperBot(Bot):
"""Anonymous whispering in group chats.
2021-01-13 13:08:44 +00:00
In order to activate this bot you can invite it to your group chat. Once
invited, you can start a private chat with the bot and tell it you want it
to whisper your message into the group chat. The bot will then do this on
your behalf and not reveal your identity. This is nice when you want to
communicate with the group anonymously.
2021-01-13 13:08:44 +00:00
"""
2021-01-16 11:34:26 +00:00
help = "I whisper private messages into group chats 😌️"
def direct(self, message):
"""Receive private messages and whisper them into group chats."""
self.reply(f"*pssttt...* {message.content}", room=message.room)
2021-01-15 20:14:54 +00:00
class GlossBot(Bot):
"""Building a shared glossary together.
A glossary is "an alphabetical list of terms in a particular domain of
knowledge with the definitions for those terms."
This bot reacts to commands which insert, list or delete items from a
shared glossary when summoned in a group chat. This bot makes use of
persistent storage so the glossary is always there even if the bot goes
away.
"""
2021-01-16 12:05:30 +00:00
help = """
I help build a shared glossary
2021-01-16 11:34:26 +00:00
glossbot: @add <entry> - <definition>
glossbot: @rm <entry>
glossbot: @rand
glossbot: @ls
2021-01-15 20:14:54 +00:00
"""
2021-01-15 20:18:20 +00:00
def group(self, message):
2021-01-16 11:29:37 +00:00
"""Handle glossary commands."""
if "@add" in message.content:
2021-01-15 20:14:54 +00:00
try:
parsed = self.parse_add(message)
self.add(*parsed, room=message.room)
except Exception:
response = f"Couldn't understand '{message.content}'?"
self.reply(response, room=message.sender)
elif "@rm" in message.content:
2021-01-15 20:14:54 +00:00
try:
parsed = message.content.split("@rm")[-1].strip()
self.rm(parsed, room=message.room)
except Exception:
response = f"Couldn't understand '{message.content}'?"
self.reply(response, room=message.sender)
elif "@rand" in message.content:
2021-01-16 11:30:19 +00:00
self.rand(room=message.room)
elif "@ls" in message.content:
2021-01-16 11:30:19 +00:00
self.ls(room=message.room)
2021-01-15 20:14:54 +00:00
else:
self.log.info(f"{message.text} not recognised as glossbot command")
def parse_add(self, message):
"""Parse the add command syntax."""
try:
replaced = message.content.replace("@add", "")
return [s.strip() for s in replaced.split("-")]
except ValueError:
self.log.error(f"Failed to parse {message.content}")
2021-01-15 20:14:54 +00:00
def add(self, entry, definition, **kwargs):
"""Add a new entry."""
self.db[entry] = definition
self.reply("Added ✌️", **kwargs)
def rand(self, **kwargs):
"""List a random entry."""
2021-01-16 11:30:37 +00:00
if not self.db.keys():
return self.reply("Glossary is empty 🙃️", **kwargs)
2021-01-16 11:30:37 +00:00
2021-01-15 20:14:54 +00:00
entry = choice(self.db.keys())
self.reply(f"{entry} - {self.db[entry]}", **kwargs)
def ls(self, **kwargs):
"""List all entries."""
if not self.db.keys():
return self.reply("Glossary is empty 🙃️", **kwargs)
2021-01-15 20:14:54 +00:00
for entry in sorted(self.db.keys()):
2021-01-15 20:14:54 +00:00
self.reply(f"{entry} - {self.db[entry]}", **kwargs)
def rm(self, entry, **kwargs):
"""Remove an entry."""
if entry not in self.db.keys():
return self.reply(f"{entry} doesn't exist?", **kwargs)
self.db.delete(entry)
self.reply("Removed ✌️", **kwargs)