xbotlib/xbotlib.py

288 lines
8.2 KiB
Python
Raw Normal View History

2021-01-10 13:10:39 +00:00
"""XMPP bots for humans."""
from argparse import ArgumentParser
2021-01-10 13:10:39 +00:00
from configparser import ConfigParser
from getpass import getpass
2021-01-14 18:18:27 +00:00
from logging import DEBUG, INFO, basicConfig, getLogger
from os import environ
2021-01-10 13:10:39 +00:00
from os.path import exists
from pathlib import Path
from sys import exit, stdout
2021-01-10 13:10:39 +00:00
from slixmpp import ClientXMPP
2021-01-13 13:00:59 +00:00
class SimpleMessage:
    """Convenience wrapper around a raw message.

    The wrapped object only has to support ``message[key]`` lookups. Each
    property below is a shorthand for one such lookup, so nothing is copied
    or cached: the values always reflect the wrapped message.
    """

    def __init__(self, message):
        # Keep the raw message reachable for anything not exposed below.
        self.message = message

    def _field(self, key):
        """Return the entry stored under ``key`` in the wrapped message."""
        return self.message[key]

    @property
    def body(self):
        """The ``"body"`` entry of the message."""
        return self._field("body")

    @property
    def sender(self):
        """The ``"from"`` entry of the message."""
        return self._field("from")

    @property
    def room(self):
        """The ``bare`` attribute of the ``"from"`` entry."""
        origin = self._field("from")
        return origin.bare

    @property
    def receiver(self):
        """The ``"to"`` entry of the message."""
        return self._field("to")

    @property
    def nickname(self):
        """The ``"mucnick"`` entry of the message."""
        return self._field("mucnick")

    @property
    def type(self):
        """The ``"type"`` entry of the message."""
        return self._field("type")
2021-01-10 13:49:22 +00:00
class Config:
    """Read-only view of one bot's section in a parsed configuration.

    ``name`` selects the section inside ``config``. When ``config`` has no
    such section, an empty mapping is used instead, so every option simply
    reads as ``None``.
    """

    def __init__(self, name, config):
        self.name = name
        self.config = config

        # A missing section is not an error: fall back to an empty mapping.
        if self.name in config:
            self.section = config[self.name]
        else:
            self.section = {}

    def _option(self, key):
        """Return the value of ``key`` in the section, or ``None`` if unset."""
        return self.section.get(key, None)

    @property
    def account(self):
        """The configured ``account`` option, or ``None``."""
        return self._option("account")

    @property
    def password(self):
        """The configured ``password`` option, or ``None``."""
        return self._option("password")

    @property
    def nick(self):
        """The configured ``nick`` option, or ``None``."""
        return self._option("nick")
2021-01-10 13:10:39 +00:00
class Bot(ClientXMPP):
    """XMPP bots for humans.

    Subclass ``Bot`` and define ``direct(message)`` and/or ``group(message)``
    to react to private and group chat messages respectively. Both hooks
    receive a ``SimpleMessage``. Use ``reply`` to answer.

    Instantiating a bot does everything: it parses command-line arguments,
    sets up logging, reads (or interactively creates) ``<name>.conf``,
    resolves the connection details, registers handlers and plugins, and
    finally calls ``run``.
    """

    def __init__(self):
        # The lower-cased class name doubles as the configuration section
        # name and as the stem of the configuration file name.
        self.name = type(self).__name__.lower()
        self.CONFIG_FILE = f"{self.name}.conf"

        self.parse_arguments()
        self.setup_logging()
        self.read_config()
        self.init_bot()
        self.register_xmpp_event_handlers()
        self.register_xmpp_plugins()
        self.run()

    def parse_arguments(self):
        """Parse command-line arguments into ``self.args``."""
        self.parser = ArgumentParser(description="XMPP bots for humans")
        self.parser.add_argument(
            "-d",
            "--debug",
            help="Enable verbose debug logs",
            action="store_const",
            dest="log_level",
            const=DEBUG,
            default=INFO,
        )
        self.parser.add_argument(
            "-a",
            "--account",
            dest="account",
            help="Account for the bot account (foo@example.com)",
        )
        self.parser.add_argument(
            "-p",
            "--password",
            dest="password",
            help="Password for the bot account",
        )
        self.parser.add_argument(
            "-n",
            "--nick",
            dest="nick",
            help="Nickname for the bot account",
        )
        self.args = self.parser.parse_args()

    def setup_logging(self):
        """Arrange logging for the bot at the level chosen on the CLI."""
        basicConfig(
            level=self.args.log_level, format="%(levelname)-8s %(message)s"
        )
        self.log = getLogger(__name__)

    def read_config(self):
        """Read configuration for running bot into ``self.config``.

        When the configuration file is missing and stdout is a terminal, the
        user is first prompted for the values and the file is created. A
        missing file is otherwise tolerated; the resulting ``Config`` then
        reports every option as ``None``.
        """
        config = ConfigParser()

        config_file_path = Path(self.CONFIG_FILE).absolute()
        if not exists(config_file_path) and stdout.isatty():
            self.generate_config_interactively()

        if exists(config_file_path):
            config.read(config_file_path)

        self.config = Config(self.name, config)

    def generate_config_interactively(self):
        """Prompt for account, password and nick and write the config file."""
        account = input("Account: ")
        password = getpass("Password: ")
        nick = input("Nickname: ")

        config = ConfigParser()
        config[self.name] = {"account": account, "password": password}

        # The nick is optional at this point; only store it when given.
        if nick:
            config[self.name]["nick"] = nick

        config_path = Path(self.CONFIG_FILE)
        # The file stores the password in plain text, so create it with
        # owner-only permissions before anything is written. A file that
        # already exists keeps whatever permissions it had.
        config_path.touch(mode=0o600, exist_ok=True)
        with open(config_path, "w") as file_handle:
            config.write(file_handle)

    def init_bot(self):
        """Initialise bot with connection details.

        Each value is taken from the first source that provides it, in this
        order: command-line argument, configuration file, environment
        variable (``XBOT_ACCOUNT``, ``XBOT_PASSWORD``, ``XBOT_NICK``). The
        process exits with status 1 when any of the three is missing.
        """
        account = (
            self.args.account
            or self.config.account
            or environ.get("XBOT_ACCOUNT", None)
        )
        password = (
            self.args.password
            or self.config.password
            or environ.get("XBOT_PASSWORD", None)
        )
        nick = (
            self.args.nick or self.config.nick or environ.get("XBOT_NICK", None)
        )

        if not account:
            self.log.error("Unable to discover account")
            exit(1)
        if not password:
            self.log.error("Unable to discover password")
            exit(1)
        if not nick:
            self.log.error("Unable to discover nick")
            exit(1)

        ClientXMPP.__init__(self, account, password)

        self.account = account
        self.password = password
        self.nick = nick

    def register_xmpp_event_handlers(self):
        """Register functions against specific XMPP event handlers."""
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("groupchat_invite", self.group_invite)
        self.add_event_handler("message", self.direct_message)
        self.add_event_handler("groupchat_message", self.group_message)

    def direct_message(self, message):
        """Handle message event by dispatching to ``self.direct``."""
        if message["type"] not in ("chat", "normal"):
            return

        # Look the hook up explicitly instead of catching AttributeError
        # around the call, so an AttributeError raised *inside* a subclass's
        # handler is not mistaken for a missing handler.
        handler = getattr(self, "direct", None)
        if handler is None:
            self.log.info("Bot.direct not implemented")
            return
        handler(SimpleMessage(message))

    def session_start(self, message):
        """Handle session_start event: announce presence, fetch the roster."""
        self.send_presence()
        self.get_roster()

    def group_invite(self, message):
        """Accept invites to group chats."""
        # Join with the resolved nick (CLI, config file or environment), not
        # only the config-file value, which is None when the nick was
        # supplied another way.
        self.plugin["xep_0045"].join_muc(message["from"], self.nick)

    def group_message(self, message):
        """Handle groupchat_message event by dispatching to ``self.group``."""
        if message["type"] not in ("groupchat", "normal"):
            return
        # Skip messages sent under the bot's own nick so it does not react
        # to itself. Compare against the resolved nick, which is the one
        # used when joining.
        if message["mucnick"] == self.nick:
            return

        # Same explicit lookup as in direct_message: errors raised inside
        # the handler must not be reported as "not implemented".
        handler = getattr(self, "group", None)
        if handler is None:
            self.log.info("Bot.group not implemented")
            return
        handler(SimpleMessage(message))

    def register_xmpp_plugins(self):
        """Register XMPP plugins that the bot supports."""
        self.register_plugin("xep_0030")  # Service Discovery
        self.register_plugin("xep_0045")  # Multi-User Chat
        self.register_plugin("xep_0199")  # XMPP Ping

    def run(self):
        """Run the bot until interrupted."""
        self.connect()

        try:
            self.process()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop the bot; exit quietly.
            pass

    def reply(self, body, to=None, room=None):
        """Send back a reply.

        Exactly one of ``to`` and ``room`` must be given: ``to`` sends a
        ``chat`` message to that recipient, ``room`` sends a ``groupchat``
        message to that room.

        Raises:
            RuntimeError: if neither or both of ``to`` and ``room`` are given.
        """
        if to is None and room is None:
            message = "`to` or `room` arguments required for `reply`"
            raise RuntimeError(message)

        if to is not None and room is not None:
            message = "Cannot send to both `to` and `room` for `reply`"
            raise RuntimeError(message)

        kwargs = {"mbody": body}
        if to is not None:
            kwargs["mto"] = to
            kwargs["mtype"] = "chat"
        else:
            kwargs["mto"] = room
            kwargs["mtype"] = "groupchat"

        self.send_message(**kwargs)
2021-01-13 13:08:44 +00:00
class EchoBot(Bot):
    """Responds with whatever you send.

    Simply direct message the bot and see if you get back what you sent. It
    also works in group chats but in this case you need to summon the bot using
    its nickname. Usually like so (for a bot whose nickname is ``echobot``).

    echobot:foo
    """

    def direct(self, message):
        """Send back whatever we receive."""
        self.reply(message.body, to=message.sender)

    def group(self, message):
        """Send back whatever we receive in group chats when summoned.

        The bot reacts only to messages that mention its nickname. The reply
        is everything after the first colon, or the whole body when there is
        no colon.
        """
        # Match on the bot's actual nickname rather than a hard-coded one, so
        # the bot still answers when it runs under a different nick.
        if self.nick in message.body:
            # Split on the first colon only, so colons inside the echoed
            # text (URLs, times, ...) are preserved.
            self.reply(message.body.split(":", 1)[-1], room=message.room)
2021-01-13 13:08:44 +00:00
class WhisperBot(Bot):
    """Anonymous whispering in group chats.

    To activate this bot, invite it to your group chat. Once it has been
    invited, start a private chat with the bot and send it the message you
    want whispered. The bot then posts the message on your behalf without
    revealing your identity, which is handy when you want to address the
    group anonymously.
    """

    def direct(self, message):
        """Receive private messages and whisper them into group chats."""
        # NOTE(review): message.room is derived from the sender of this
        # private message -- confirm it resolves to the intended group chat.
        whisper = f"*pssttt...* {message.body}"
        self.reply(whisper, room=message.room)