xbotlib/xbotlib.py

382 lines
11 KiB
Python
Raw Normal View History

2021-01-10 13:10:39 +00:00
"""XMPP bots for humans."""
from argparse import ArgumentParser
2021-01-10 13:10:39 +00:00
from configparser import ConfigParser
from datetime import datetime as dt
2021-01-10 13:10:39 +00:00
from getpass import getpass
from imghdr import what
2021-01-14 18:18:27 +00:00
from logging import DEBUG, INFO, basicConfig, getLogger
from os import environ
2021-01-10 13:10:39 +00:00
from os.path import exists
from pathlib import Path
from sys import exit, stdout
2021-01-10 13:10:39 +00:00
from humanize import naturaldelta
2021-01-10 13:10:39 +00:00
from slixmpp import ClientXMPP
2021-01-13 13:00:59 +00:00
class SimpleMessage:
    """Read-only convenience view over a raw message.

    The wrapped ``message`` is only ever indexed by key (``"body"``,
    ``"from"``, ``"to"``, ``"mucnick"``, ``"type"``); each property below
    exposes one of those lookups under a friendlier name.
    """

    def __init__(self, message):
        """Keep a reference to the raw ``message`` being wrapped."""
        self.message = message

    def _field(self, key):
        """Return the value stored under ``key`` in the wrapped message."""
        return self.message[key]

    @property
    def body(self):
        """The ``"body"`` entry of the message."""
        return self._field("body")

    @property
    def sender(self):
        """The ``"from"`` entry of the message."""
        return self._field("from")

    @property
    def room(self):
        """The ``bare`` attribute of the ``"from"`` entry."""
        origin = self._field("from")
        return origin.bare

    @property
    def receiver(self):
        """The ``"to"`` entry of the message."""
        return self._field("to")

    @property
    def nickname(self):
        """The ``"mucnick"`` entry of the message."""
        return self._field("mucnick")

    @property
    def type(self):
        """The ``"type"`` entry of the message."""
        return self._field("type")
2021-01-10 13:49:22 +00:00
class Config:
    """Bot file configuration.

    Wraps a parsed configuration mapping and exposes the options stored in
    the section called ``name``. When that section does not exist every
    option reads as ``None``.
    """

    def __init__(self, name, config):
        """Select the ``name`` section of ``config`` (empty if absent)."""
        self.name = name
        self.config = config
        if name in config:
            self.section = config[name]
        else:
            self.section = {}

    def _option(self, key):
        """Return the section value for ``key`` or ``None`` when unset."""
        return self.section.get(key)

    @property
    def account(self):
        """The ``account`` option, or ``None``."""
        return self._option("account")

    @property
    def password(self):
        """The ``password`` option, or ``None``."""
        return self._option("password")

    @property
    def nick(self):
        """The ``nick`` option, or ``None``."""
        return self._option("nick")
2021-01-10 13:10:39 +00:00
class Bot(ClientXMPP):
    """XMPP bots for humans.

    Subclasses customise behaviour by defining any of the following:

    * ``direct(message)`` -- called with a ``SimpleMessage`` for each direct
      message that is not a ``/`` or ``!`` command.
    * ``group(message)`` -- called with a ``SimpleMessage`` for each group
      chat message that is not a command and was not sent by the bot itself.
    * ``help`` -- text sent in reply to the ``!help`` command.

    Instantiating the class parses the command line, sets up logging, loads
    the configuration, registers handlers and plugins, and finally calls
    ``run()``.
    """

    def __init__(self):
        """Configure the bot and hand control over to ``run()``."""
        # The lower-cased class name doubles as the config section name and
        # as the stem of the config file.
        self.name = type(self).__name__.lower()
        self.start = dt.now()
        self.CONFIG_FILE = f"{self.name}.conf"

        self.parse_arguments()
        self.setup_logging()
        self.read_config()
        self.init_bot()
        self.register_xmpp_event_handlers()
        self.register_xmpp_plugins()
        self.run()

    def parse_arguments(self):
        """Parse command-line arguments into ``self.args``."""
        self.parser = ArgumentParser(description="XMPP bots for humans")
        self.parser.add_argument(
            "-d",
            "--debug",
            help="Enable verbose debug logs",
            action="store_const",
            dest="log_level",
            const=DEBUG,
            default=INFO,
        )
        self.parser.add_argument(
            "-a",
            "--account",
            dest="account",
            help="Account for the bot account (foo@example.com)",
        )
        self.parser.add_argument(
            "-p",
            "--password",
            dest="password",
            help="Password for the bot account",
        )
        self.parser.add_argument(
            "-n",
            "--nick",
            dest="nick",
            help="Nickname for the bot account",
        )
        self.parser.add_argument(
            "-av",
            "--avatar",
            dest="avatar",
            help="Avatar for the bot account",
            default="avatar.png",
        )
        self.args = self.parser.parse_args()

    def setup_logging(self):
        """Arrange logging for the bot at the level chosen on the CLI."""
        basicConfig(
            level=self.args.log_level, format="%(levelname)-8s %(message)s"
        )
        self.log = getLogger(__name__)

    def read_config(self):
        """Read configuration for running bot.

        When the config file is missing and stdout is a terminal, the user
        is prompted to create it first. The result is stored in
        ``self.config`` whether or not a file could be read.
        """
        config = ConfigParser()
        config_file_path = Path(self.CONFIG_FILE).absolute()

        if not exists(config_file_path) and stdout.isatty():
            self.log.info(f"Did not find {config_file_path}")
            self.generate_config_interactively()

        if exists(config_file_path):
            config.read(config_file_path)

        self.config = Config(self.name, config)

    def generate_config_interactively(self):
        """Prompt for account details and write them to the config file."""
        account = input("Account: ")
        password = getpass("Password: ")
        nick = input("Nickname: ")

        config = ConfigParser()
        config[self.name] = {"account": account, "password": password}

        # The nickname is optional at the prompt; only store it when given.
        if nick:
            config[self.name]["nick"] = nick

        # NOTE(review): the password is written in plain text with the
        # default file permissions -- consider restricting them.
        with open(self.CONFIG_FILE, "w") as file_handle:
            config.write(file_handle)

    def init_bot(self):
        """Initialise bot with connection details.

        Each of account, password and nick is taken from the first source
        that provides a value: command-line argument, config file, then the
        ``XBOT_ACCOUNT`` / ``XBOT_PASSWORD`` / ``XBOT_NICK`` environment
        variable. The process exits with status 1 if any of them is missing.
        """
        account = (
            self.args.account
            or self.config.account
            or environ.get("XBOT_ACCOUNT", None)
        )
        password = (
            self.args.password
            or self.config.password
            or environ.get("XBOT_PASSWORD", None)
        )
        nick = (
            self.args.nick or self.config.nick or environ.get("XBOT_NICK", None)
        )

        if not account:
            self.log.error("Unable to discover account")
            exit(1)
        if not password:
            self.log.error("Unable to discover password")
            exit(1)
        if not nick:
            self.log.error("Unable to discover nick")
            exit(1)

        ClientXMPP.__init__(self, account, password)

        self.account = account
        self.password = password
        self.nick = nick
        self.avatar = self.args.avatar

    def register_xmpp_event_handlers(self):
        """Register functions against specific XMPP event handlers."""
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("groupchat_invite", self.group_invite)
        self.add_event_handler("message", self.direct_message)
        self.add_event_handler("groupchat_message", self.group_message)
        self.add_event_handler("message_error", self.error_message)

    def error_message(self, message):
        """Handle message_error event by logging the message body."""
        _message = SimpleMessage(message)
        self.log.error(f"Received error message: {_message.body}")

    def direct_message(self, message):
        """Handle message event.

        Only ``chat`` and ``normal`` messages are considered. Bodies starting
        with ``/`` go to ``meta``, bodies starting with ``!`` go to
        ``command``, and everything else goes to the subclass ``direct``
        hook when one is defined.
        """
        if message["type"] not in ("chat", "normal"):
            return

        _message = SimpleMessage(message)

        if _message.body.startswith("/"):
            self.meta(_message, to=_message.sender)
            return

        if _message.body.startswith("!"):
            self.command(_message, to=_message.sender)
            return

        # Look the hook up explicitly instead of catching AttributeError
        # around the call, which would also hide errors raised inside it.
        handler = getattr(self, "direct", None)
        if handler is None:
            self.log.info("Bot.direct not implemented")
            return
        handler(_message)

    def session_start(self, message):
        """Handle session_start event."""
        self.send_presence()
        self.get_roster()
        self.publish_avatar()

    def publish_avatar(self):
        """Publish bot avatar.

        Reads the file named by ``self.avatar`` and publishes its contents
        and metadata through the ``xep_0084`` plugin. Nothing is published
        when the file cannot be read or its image format is not recognised.
        """
        abspath = Path(self.avatar).absolute()
        try:
            with open(abspath, "rb") as handle:
                contents = handle.read()
        except IOError:
            self.log.info(f"No avatar discovered (tried '{abspath}')")
            return

        # ``what`` returns None for unknown formats; publishing
        # "image/None" as the MIME type would be meaningless.
        image_type = what("", contents)
        if image_type is None:
            self.log.error(f"Unrecognised avatar image format ('{abspath}')")
            return

        avatar_plugin = self.plugin["xep_0084"]
        info = {
            "id": avatar_plugin.generate_id(contents),
            "type": f"image/{image_type}",
            "bytes": len(contents),
        }
        avatar_plugin.publish_avatar(contents)
        avatar_plugin.publish_avatar_metadata(items=[info])

    def group_invite(self, message):
        """Accept invites to group chats."""
        # Use the resolved nick: the config-file value is None whenever the
        # nick was supplied on the command line or via the environment.
        self.plugin["xep_0045"].join_muc(message["from"], self.nick)

    def group_message(self, message):
        """Handle groupchat_message event.

        Messages sent under the bot's own nick are ignored. Bodies starting
        with ``/`` go to ``meta``, bodies containing ``<nick>:!`` go to
        ``command``, and everything else goes to the subclass ``group`` hook
        when one is defined.
        """
        if message["type"] not in ("groupchat", "normal"):
            return

        # Compare against the resolved nick so the bot's own messages are
        # skipped regardless of where the nick was configured.
        if message["mucnick"] == self.nick:
            return

        _message = SimpleMessage(message)

        if _message.body.startswith("/"):
            self.meta(_message, room=_message.room)
            return

        if f"{self.nick}:!" in _message.body:
            self.command(_message, room=_message.room)
            return

        # Look the hook up explicitly instead of catching AttributeError
        # around the call, which would also hide errors raised inside it.
        handler = getattr(self, "group", None)
        if handler is None:
            self.log.info("Bot.group not implemented")
            return
        handler(_message)

    def register_xmpp_plugins(self):
        """Register XMPP plugins that the bot supports."""
        self.register_plugin("xep_0030")  # Service Discovery
        self.register_plugin("xep_0045")  # Multi-User Chat
        self.register_plugin("xep_0199")  # XMPP Ping
        self.register_plugin("xep_0084")  # User Avatar

    def run(self):
        """Run the bot until interrupted from the keyboard."""
        self.connect()

        try:
            self.process()
        except KeyboardInterrupt:
            pass

    def reply(self, body, to=None, room=None):
        """Send back a reply.

        Exactly one of ``to`` (direct ``chat`` message) or ``room``
        (``groupchat`` message) must be given; otherwise the error is logged
        and the process exits with status 1.
        """
        if to is None and room is None:
            # A Logger is not callable; this has to go through ``error``.
            self.log.error("`to` or `room` arguments required for `reply`")
            exit(1)

        if to is not None and room is not None:
            self.log.error("Cannot send to both `to` and `room` for `reply`")
            exit(1)

        kwargs = {"mbody": body}
        if to is not None:
            kwargs["mto"] = to
            kwargs["mtype"] = "chat"
        else:
            kwargs["mto"] = room
            kwargs["mtype"] = "groupchat"

        self.send_message(**kwargs)

    @property
    def uptime(self):
        """Time since the bot came up."""
        # Elapsed time is "now minus start", which yields a positive delta.
        return naturaldelta(dt.now() - self.start)

    def command(self, message, **kwargs):
        """Handle "!" style commands with built-in responses."""
        # Everything after the last "!" is treated as the command name.
        command = message.body.split("!")[-1]

        if command == "uptime":
            self.reply(self.uptime, **kwargs)
        elif command == "help":
            # ``help`` is an optional attribute supplied by subclasses.
            help_text = getattr(self, "help", None)
            if help_text is None:
                help_text = "No help found 🤔️"
            self.reply(help_text, **kwargs)
        else:
            self.log.error(f"'{command}' command is not recognised")

    def meta(self, message, **kwargs):
        """Handle "/" style commands with built-in responses."""
        # Everything after the last "/" is treated as the command name.
        command = message.body.split("/")[-1]

        if command == "bots":
            self.reply("🖐️", **kwargs)
        else:
            self.log.error(f"'{command}' command is not recognised")
2021-01-13 13:08:44 +00:00
class EchoBot(Bot):
    """Responds with whatever you send.

    Simply direct message the bot and see if you get back what you sent. It
    also works in group chats but in this case you need to summon the bot using
    its nickname. Usually like so.

    echobot:foo

    """

    help = "I echo back whatever you send to me 🖖️"

    def direct(self, message):
        """Send back whatever we receive."""
        self.reply(message.body, to=message.sender)

    def group(self, message):
        """Send back whatever we receive in group chats when summoned.

        The bot reacts only to messages that mention its nickname. When the
        message contains ``<nick>:`` everything after the first occurrence
        is echoed, so text that itself contains colons (for example a URL)
        is returned intact; otherwise the whole body is echoed.
        """
        nick = self.nick
        if nick not in message.body:
            return

        _, summoned, remainder = message.body.partition(f"{nick}:")
        echo = remainder if summoned else message.body
        self.reply(echo, room=message.room)
2021-01-13 13:08:44 +00:00
class WhisperBot(Bot):
    """Anonymous whispering in group chats.

    In order to activate this bot you can invite it to your group chat. Once
    invited, you can start a private chat with the bot and tell it you want it
    to whisper your message into the group chat. The bot will then do this on
    your behalf and not reveal your identity. This is nice when you want to
    communicate with the group anonymously.

    """

    help = "I whisper your private messages into group chats 😌️"

    def direct(self, message):
        """Relay a private message as a group chat message."""
        # NOTE(review): ``message.room`` is derived from the sender of this
        # direct message -- confirm it resolves to the intended group chat.
        whisper = f"*pssttt...* {message.body}"
        target_room = message.room
        self.reply(whisper, room=target_room)