2021-01-10 13:10:39 +00:00
|
|
|
"""XMPP bots for humans."""
|
|
|
|
|
2021-01-12 20:39:50 +00:00
|
|
|
from argparse import ArgumentParser, BooleanOptionalAction
|
2021-01-10 13:10:39 +00:00
|
|
|
from configparser import ConfigParser
|
|
|
|
from getpass import getpass
|
2021-01-12 20:39:50 +00:00
|
|
|
from os import environ
|
2021-01-10 13:10:39 +00:00
|
|
|
from os.path import exists
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
|
|
from slixmpp import ClientXMPP
|
|
|
|
|
|
|
|
|
2021-01-10 13:49:22 +00:00
|
|
|
class EasyMessage:
|
|
|
|
"""A simple message interface."""
|
|
|
|
|
|
|
|
def __init__(self, message):
|
|
|
|
self.message = message
|
|
|
|
|
|
|
|
@property
|
|
|
|
def body(self):
|
|
|
|
return self.message["body"]
|
|
|
|
|
|
|
|
@property
|
|
|
|
def sender(self):
|
2021-01-10 15:51:20 +00:00
|
|
|
return self.message["from"].bare
|
2021-01-10 13:49:22 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def receiver(self):
|
|
|
|
return self.message["to"]
|
|
|
|
|
2021-01-10 15:31:17 +00:00
|
|
|
@property
|
|
|
|
def nickname(self):
|
|
|
|
return self.message["mucnick"]
|
|
|
|
|
|
|
|
@property
|
|
|
|
def type(self):
|
|
|
|
return self.message["type"]
|
|
|
|
|
2021-01-10 13:49:22 +00:00
|
|
|
|
2021-01-10 13:10:39 +00:00
|
|
|
class Bot(ClientXMPP):
|
|
|
|
CONFIG_FILE = "bot.conf"
|
|
|
|
|
|
|
|
def __init__(self):
|
2021-01-12 20:39:50 +00:00
|
|
|
self.parse_arguments()
|
2021-01-10 13:10:39 +00:00
|
|
|
self.read_config()
|
|
|
|
self.init_bot()
|
|
|
|
self.register_xmpp_event_handlers()
|
|
|
|
self.register_xmpp_plugins()
|
|
|
|
self.run()
|
|
|
|
|
2021-01-12 20:39:50 +00:00
|
|
|
def parse_arguments(self):
|
|
|
|
"""Parse command-line arguments."""
|
|
|
|
self.parser = ArgumentParser()
|
|
|
|
self.parser.add_argument(
|
|
|
|
"--input",
|
|
|
|
help="Read configuration from environment",
|
|
|
|
action=BooleanOptionalAction,
|
|
|
|
default=True,
|
|
|
|
)
|
|
|
|
self.args = self.parser.parse_args()
|
|
|
|
|
2021-01-10 13:10:39 +00:00
|
|
|
def read_config(self):
|
|
|
|
"""Read configuration for running bot."""
|
2021-01-12 20:39:50 +00:00
|
|
|
self.config = ConfigParser()
|
|
|
|
|
2021-01-10 13:10:39 +00:00
|
|
|
config_file_path = Path(self.CONFIG_FILE).absolute()
|
2021-01-12 20:39:50 +00:00
|
|
|
if not exists(config_file_path) and self.args.input:
|
|
|
|
self.generate_config_interactively()
|
2021-01-10 13:10:39 +00:00
|
|
|
|
2021-01-12 20:39:50 +00:00
|
|
|
if exists(config_file_path):
|
|
|
|
self.config.read(config_file_path)
|
2021-01-10 13:10:39 +00:00
|
|
|
|
2021-01-12 20:39:50 +00:00
|
|
|
if self.args.input is False:
|
|
|
|
self.read_config_from_env()
|
|
|
|
|
|
|
|
if "bot" not in self.config:
|
|
|
|
raise RuntimeError("Failed to configure bot")
|
2021-01-10 13:10:39 +00:00
|
|
|
|
2021-01-12 20:39:50 +00:00
|
|
|
def generate_config_interactively(self):
|
2021-01-10 13:10:39 +00:00
|
|
|
"""Generate bot configuration."""
|
2021-01-10 14:45:12 +00:00
|
|
|
jid = input("XMPP address (e.g. foo@bar.com): ") or "foo@bar.com"
|
2021-01-10 13:10:39 +00:00
|
|
|
password = (
|
2021-01-10 14:45:12 +00:00
|
|
|
getpass("Password (e.g. my-cool-password): ") or "my-cool-password"
|
2021-01-10 13:10:39 +00:00
|
|
|
)
|
2021-01-10 18:17:10 +00:00
|
|
|
room = input("XMPP room (e.g. foo@muc.bar.com): ")
|
2021-01-10 14:45:12 +00:00
|
|
|
nick = input("Nickname (e.g. lurkbot): ")
|
2021-01-10 13:10:39 +00:00
|
|
|
|
|
|
|
config = ConfigParser()
|
|
|
|
config["bot"] = {"jid": jid, "password": password}
|
|
|
|
|
2021-01-10 14:45:12 +00:00
|
|
|
if room:
|
|
|
|
config["bot"]["room"] = room
|
|
|
|
if nick:
|
|
|
|
config["bot"]["nick"] = nick
|
|
|
|
|
2021-01-10 13:10:39 +00:00
|
|
|
with open("bot.conf", "w") as file_handle:
|
|
|
|
config.write(file_handle)
|
|
|
|
|
2021-01-12 20:39:50 +00:00
|
|
|
def read_config_from_env(self):
|
|
|
|
"""Read configuration from the environment."""
|
|
|
|
self.config["bot"] = {}
|
|
|
|
self.config["bot"]["jid"] = environ.get("XBOT_JID")
|
|
|
|
self.config["bot"]["password"] = environ.get("XBOT_PASSWORD")
|
2021-01-12 21:04:29 +00:00
|
|
|
self.config["bot"]["room"] = environ.get("XBOT_ROOM", "")
|
|
|
|
self.config["bot"]["nick"] = environ.get("XBOT_NICK", "")
|
2021-01-12 20:39:50 +00:00
|
|
|
|
2021-01-10 13:10:39 +00:00
|
|
|
def init_bot(self):
|
|
|
|
"""Initialise bot with connection details."""
|
|
|
|
jid = self.config["bot"]["jid"]
|
2021-01-10 15:31:17 +00:00
|
|
|
password = self.config["bot"]["password"]
|
|
|
|
ClientXMPP.__init__(self, jid, password)
|
2021-01-10 13:10:39 +00:00
|
|
|
|
|
|
|
def register_xmpp_event_handlers(self):
|
|
|
|
"""Register functions against specific XMPP event handlers."""
|
|
|
|
self.add_event_handler("session_start", self.session_start)
|
|
|
|
self.add_event_handler("message", self.message)
|
|
|
|
self.add_event_handler("groupchat_message", self.groupchat_message)
|
|
|
|
|
|
|
|
def message(self, message):
|
|
|
|
"""Handle message event."""
|
|
|
|
if message["type"] in ("chat", "normal"):
|
2021-01-10 18:17:10 +00:00
|
|
|
self.react(EasyMessage(message))
|
2021-01-10 13:10:39 +00:00
|
|
|
|
|
|
|
def session_start(self, event):
|
|
|
|
"""Handle session_start event."""
|
|
|
|
self.send_presence()
|
|
|
|
self.get_roster()
|
|
|
|
|
2021-01-10 15:31:17 +00:00
|
|
|
room = self.config["bot"].get("room")
|
|
|
|
nick = self.config["bot"].get("nick")
|
|
|
|
|
|
|
|
if room and nick:
|
|
|
|
self.plugin["xep_0045"].join_muc(room, nick)
|
|
|
|
|
2021-01-10 13:10:39 +00:00
|
|
|
def groupchat_message(self, message):
|
|
|
|
"""Handle groupchat_message event."""
|
2021-01-10 15:31:17 +00:00
|
|
|
if message["type"] in ("groupchat", "normal"):
|
|
|
|
if message["mucnick"] != self.config["bot"]["nick"]:
|
2021-01-10 18:17:10 +00:00
|
|
|
self.react(EasyMessage(message))
|
2021-01-10 13:10:39 +00:00
|
|
|
|
|
|
|
def register_xmpp_plugins(self):
|
|
|
|
"""Register XMPP plugins that the bot supports."""
|
|
|
|
self.register_plugin("xep_0030") # Service Discovery
|
|
|
|
self.register_plugin("xep_0045") # Multi-User Chat
|
|
|
|
self.register_plugin("xep_0199") # XMPP Ping
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
"""Run the bot."""
|
|
|
|
self.connect()
|
2021-01-10 14:37:10 +00:00
|
|
|
|
|
|
|
try:
|
|
|
|
self.process()
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
pass
|
2021-01-10 13:49:22 +00:00
|
|
|
|
2021-01-10 18:41:56 +00:00
|
|
|
def reply(self, body, to=None, room=None):
|
2021-01-10 18:17:10 +00:00
|
|
|
"""Send back a reply."""
|
|
|
|
if to is None and room is None:
|
|
|
|
message = "`to` or `room` arguments required for `reply`"
|
|
|
|
raise RuntimeError(message)
|
|
|
|
|
|
|
|
if to is not None and room is not None:
|
|
|
|
message = "Cannot send to both `to` and `room` for `reply`"
|
|
|
|
raise RuntimeError(message)
|
|
|
|
|
|
|
|
kwargs = {"mbody": body}
|
|
|
|
if to is not None:
|
|
|
|
kwargs["mto"] = to
|
|
|
|
kwargs["mtype"] = "chat"
|
|
|
|
else:
|
|
|
|
kwargs["mto"] = room
|
|
|
|
kwargs["mtype"] = "groupchat"
|
|
|
|
|
|
|
|
self.send_message(**kwargs)
|
2021-01-10 18:29:34 +00:00
|
|
|
|
|
|
|
def react(self, message):
|
|
|
|
message = "You need to write your own `react` implementation"
|
|
|
|
raise NotImplementedError(message)
|