2021-01-10 13:10:39 +00:00
|
|
|
"""XMPP bots for humans."""
|
|
|
|
|
2021-01-14 21:26:13 +00:00
|
|
|
from argparse import ArgumentParser
|
2021-01-10 13:10:39 +00:00
|
|
|
from configparser import ConfigParser
|
|
|
|
from getpass import getpass
|
2021-01-14 18:18:27 +00:00
|
|
|
from logging import DEBUG, INFO, basicConfig, getLogger
|
2021-01-12 20:39:50 +00:00
|
|
|
from os import environ
|
2021-01-10 13:10:39 +00:00
|
|
|
from os.path import exists
|
|
|
|
from pathlib import Path
|
2021-01-14 21:26:13 +00:00
|
|
|
from sys import exit, stdout
|
2021-01-10 13:10:39 +00:00
|
|
|
|
|
|
|
from slixmpp import ClientXMPP
|
|
|
|
|
|
|
|
|
2021-01-13 13:00:59 +00:00
|
|
|
class SimpleMessage:
    """Read-only convenience view over a raw message stanza.

    The wrapped object is only ever accessed by key (``message["body"]``,
    ``message["from"]`` ...), so anything subscriptable with those keys
    will do.
    """

    def __init__(self, message):
        """Keep a reference to the underlying *message* stanza."""
        self.message = message

    @property
    def body(self):
        """The ``body`` entry of the message."""
        stanza = self.message
        return stanza["body"]

    @property
    def sender(self):
        """The ``from`` entry of the message."""
        stanza = self.message
        return stanza["from"]

    @property
    def room(self):
        """The ``bare`` attribute of the message's ``from`` entry."""
        origin = self.message["from"]
        return origin.bare

    @property
    def receiver(self):
        """The ``to`` entry of the message."""
        stanza = self.message
        return stanza["to"]

    @property
    def nickname(self):
        """The ``mucnick`` entry of the message."""
        stanza = self.message
        return stanza["mucnick"]

    @property
    def type(self):
        """The ``type`` entry of the message."""
        stanza = self.message
        return stanza["type"]
|
|
|
|
|
2021-01-10 13:49:22 +00:00
|
|
|
|
2021-01-14 21:26:13 +00:00
|
|
|
class Config:
    """Accessor for one bot's section of a parsed configuration.

    *config* is indexed by *name*; when no such section exists an empty
    mapping is used, so every property then yields ``None``.
    """

    def __init__(self, name, config):
        """Remember *name* and *config* and pick out the bot's section."""
        self.name = name
        self.config = config

        if self.name in config:
            self.section = config[self.name]
        else:
            self.section = {}

    @property
    def account(self):
        """The ``account`` option, or ``None`` when it is not set."""
        return self.section.get("account")

    @property
    def password(self):
        """The ``password`` option, or ``None`` when it is not set."""
        return self.section.get("password")

    @property
    def nick(self):
        """The ``nick`` option, or ``None`` when it is not set."""
        return self.section.get("nick")
|
|
|
|
|
|
|
|
|
2021-01-10 13:10:39 +00:00
|
|
|
class Bot(ClientXMPP):
    """XMPP bots for humans.

    Subclass this and define ``direct(message)`` and/or ``group(message)``;
    each receives a ``SimpleMessage``. Instantiating a subclass does
    everything: it parses the command line, configures logging, loads (or
    interactively creates) ``<classname>.conf``, connects and then blocks
    processing events until interrupted.

    The account, password and nick are each resolved from, in order, the
    command-line arguments, the configuration file and the
    ``XBOT_ACCOUNT`` / ``XBOT_PASSWORD`` / ``XBOT_NICK`` environment
    variables.
    """

    def __init__(self):
        """Bootstrap and run the bot; does not return until it stops."""
        # The lower-cased class name doubles as the config section name and
        # as the stem of the config file name.
        self.name = type(self).__name__.lower()
        self.CONFIG_FILE = f"{self.name}.conf"

        self.parse_arguments()
        self.setup_logging()
        self.read_config()
        self.init_bot()
        self.register_xmpp_event_handlers()
        self.register_xmpp_plugins()
        self.run()

    def parse_arguments(self):
        """Parse command-line arguments into ``self.args``."""
        self.parser = ArgumentParser(description="XMPP bots for humans")

        self.parser.add_argument(
            "-d",
            "--debug",
            help="Enable verbose debug logs",
            action="store_const",
            dest="log_level",
            const=DEBUG,
            default=INFO,
        )
        self.parser.add_argument(
            "-a",
            "--account",
            dest="account",
            help="Account for the bot account (foo@example.com)",
        )
        self.parser.add_argument(
            "-p",
            "--password",
            dest="password",
            help="Password for the bot account",
        )
        self.parser.add_argument(
            "-n",
            "--nick",
            dest="nick",
            help="Nickname for the bot account",
        )

        self.args = self.parser.parse_args()

    def setup_logging(self):
        """Configure root logging at the requested level; set ``self.log``."""
        basicConfig(
            level=self.args.log_level, format="%(levelname)-8s %(message)s"
        )
        self.log = getLogger(__name__)

    def read_config(self):
        """Load the bot's configuration file into ``self.config``.

        When the file does not exist and stdout is a terminal, the user is
        first prompted for the values and the file is written. If the file
        still does not exist afterwards, ``self.config`` wraps an empty
        parser and every option resolves to ``None``.
        """
        config = ConfigParser()

        config_file_path = Path(self.CONFIG_FILE).absolute()

        if not exists(config_file_path) and stdout.isatty():
            self.generate_config_interactively()

        if exists(config_file_path):
            config.read(config_file_path)

        self.config = Config(self.name, config)

    def generate_config_interactively(self):
        """Prompt for account, password and nick, then write the config file.

        The nick is only stored when a non-empty value was entered.
        """
        account = input("Account: ")
        password = getpass("Password: ")
        nick = input("Nickname: ")

        config = ConfigParser()
        config[self.name] = {"account": account, "password": password}

        if nick:
            config[self.name]["nick"] = nick

        with open(self.CONFIG_FILE, "w") as file_handle:
            config.write(file_handle)

    def init_bot(self):
        """Resolve connection details and initialise the XMPP client.

        Logs an error and exits the process with status 1 when the account,
        password or nick cannot be found in any of the three sources.
        """
        account = (
            self.args.account
            or self.config.account
            or environ.get("XBOT_ACCOUNT", None)
        )
        password = (
            self.args.password
            or self.config.password
            or environ.get("XBOT_PASSWORD", None)
        )
        nick = (
            self.args.nick or self.config.nick or environ.get("XBOT_NICK", None)
        )

        if not account:
            self.log.error("Unable to discover account")
            exit(1)
        if not password:
            self.log.error("Unable to discover password")
            exit(1)
        if not nick:
            self.log.error("Unable to discover nick")
            exit(1)

        ClientXMPP.__init__(self, account, password)

        # Assigned after the parent initialiser has run, as in the original
        # ordering.
        self.account = account
        self.password = password
        self.nick = nick

    def register_xmpp_event_handlers(self):
        """Register functions against specific XMPP event handlers."""
        self.add_event_handler("session_start", self.session_start)
        self.add_event_handler("groupchat_invite", self.group_invite)
        self.add_event_handler("message", self.direct_message)
        self.add_event_handler("groupchat_message", self.group_message)

    def direct_message(self, message):
        """Dispatch ``chat``/``normal`` messages to the subclass ``direct``."""
        if message["type"] not in ("chat", "normal"):
            return

        # Look the handler up first instead of wrapping the call in
        # ``except AttributeError``: that would also swallow AttributeErrors
        # raised *inside* the handler and misreport them as "not implemented".
        handler = getattr(self, "direct", None)
        if handler is None:
            self.log.info("Bot.direct not implemented")
            return

        handler(SimpleMessage(message))

    def session_start(self, message):
        """Handle session_start: send presence and request the roster."""
        self.send_presence()
        self.get_roster()

    def group_invite(self, message):
        """Accept invites to group chats."""
        # Use the resolved nick: ``self.config.nick`` is None whenever the
        # nick was supplied via --nick or XBOT_NICK rather than the file.
        self.plugin["xep_0045"].join_muc(message["from"], self.nick)

    def group_message(self, message):
        """Dispatch group chat messages to the subclass ``group``.

        Messages whose ``mucnick`` equals the bot's own nick are ignored so
        the bot does not react to itself.
        """
        if message["type"] not in ("groupchat", "normal"):
            return

        # Compare against the resolved nick (see ``group_invite``).
        if message["mucnick"] == self.nick:
            return

        # See ``direct_message`` for why the lookup precedes the call.
        handler = getattr(self, "group", None)
        if handler is None:
            self.log.info("Bot.group not implemented")
            return

        handler(SimpleMessage(message))

    def register_xmpp_plugins(self):
        """Register XMPP plugins that the bot supports."""
        self.register_plugin("xep_0030")  # Service Discovery
        self.register_plugin("xep_0045")  # Multi-User Chat
        self.register_plugin("xep_0199")  # XMPP Ping

    def run(self):
        """Connect and process events; Ctrl-C ends processing quietly."""
        self.connect()

        try:
            self.process()
        except KeyboardInterrupt:
            pass

    def reply(self, body, to=None, room=None):
        """Send *body* to a single recipient or to a room.

        Exactly one of *to* (sent with type ``chat``) or *room* (sent with
        type ``groupchat``) must be given. Passing neither or both logs an
        error and exits the process with status 1.
        """
        if to is None and room is None:
            # A Logger is not callable; this has to go through ``.error``.
            self.log.error("`to` or `room` arguments required for `reply`")
            exit(1)

        if to is not None and room is not None:
            self.log.error("Cannot send to both `to` and `room` for `reply`")
            exit(1)

        kwargs = {"mbody": body}
        if to is not None:
            kwargs["mto"] = to
            kwargs["mtype"] = "chat"
        else:
            kwargs["mto"] = room
            kwargs["mtype"] = "groupchat"

        self.send_message(**kwargs)
|
2021-01-10 18:29:34 +00:00
|
|
|
|
2021-01-13 13:08:44 +00:00
|
|
|
|
|
|
|
class EchoBot(Bot):
    """Responds with whatever you send.

    Direct message the bot and it sends the same text back. It also works
    in group chats, but there the message has to mention the bot, usually
    like so.

    echobot:foo

    """

    def direct(self, message):
        """Echo a direct message back to its sender."""
        text = message.body
        self.reply(text, to=message.sender)

    def group(self, message):
        """Echo group chat messages that mention ``echobot``.

        Only the text after the last ``:`` is sent back to the room (the
        whole body when it contains no ``:``).
        """
        text = message.body
        if "echobot" not in text:
            return

        # rpartition yields the same tail as ``split(":")[-1]``.
        _, _, tail = text.rpartition(":")
        self.reply(tail, room=message.room)
|
2021-01-13 13:08:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
class WhisperBot(Bot):
    """Anonymous whispering in group chats.

    Invite the bot to your group chat to activate it. Once it has joined,
    start a private chat with the bot and tell it what you would like
    whispered. The bot then posts the message into the group chat on your
    behalf without revealing who sent it, which is handy when you want to
    address the group anonymously.

    """

    def direct(self, message):
        """Relay a private message into the room it came from."""
        # NOTE(review): the target is the bare form of the sender address,
        # which presumably is the room when the private chat was opened
        # from inside it - confirm.
        whisper = f"*pssttt...* {message.body}"
        target = message.room
        self.reply(whisper, room=target)
|