import threading
import queue
import logging
[docs]class Dispatcher(object):
"""A dispatcher to relay the received messages to the appropriate
commands.
Args:
inc_queue: The incoming message queue.
database_connection: The datanase interface.
consumer: The discord REST API consumer to send messages through.
"""
COMMAND_PREFIX = "!c/" # Must end with delimiter
DELIMITER = "/"
def __init__(self, inc_queue, database_connection, consumer, commands, queue_timeout=10, logging=logging):
self.inc_queue = inc_queue
self.data_conn = database_connection
self.consumer = consumer
self.commands = commands
self.running = False
self.queue_timeout = queue_timeout
self.logger = logging.getLogger(__name__)
[docs] def is_command(self, content):
"""Indicates whether a message is a command.
Args:
content: The contents of the message.
Returns:
A boolean indicating whether the message is a command.
"""
return len(content) >= len(Dispatcher.COMMAND_PREFIX) and content[:3] == Dispatcher.COMMAND_PREFIX
[docs] def parse(self, content):
"""Parses a message to extract the command and its parameters.
Args:
content: The contents of the message.
Returns:
A tuple containing the command identifier and the parameters.
"""
if self.is_command(content):
prefix, command, *params = content.split(Dispatcher.DELIMITER)
return command, params
return None, None
[docs] def dispatch(self, command_id, message, *args):
"""Calls the appropriate command with the given parameters.
Args:
command_id: The string identifier of the command.
params: The parameters to be passed to the command.
message: The full message which contained the command.
Returns:
The results of the command.
"""
self.logger.info("Command called by %s(%s): %s.", message.username, message.author_id, command_id)
command = self.commands.identifiers[command_id]
return command(message, self.data_conn, *args)
[docs] def process_message(self, message):
"""Calls the appropriate command with the given parameters.
Args:
command_id: The string identifier of the command.
params: The parameters to be passed to the command.
message: The full message which contained the command.
Returns:
The results of the command.
"""
command, params = self.parse(message.content)
if command:
if command in self.commands.identifiers:
try:
response = self.dispatch(command, message, *params)
except Exception as e:
self.logger.critical("Unexpected command failure", exc_info=True)
response = self.dispatch("unknown_command", message)
else:
response = self.dispatch("unknown_command", message)
if response is not None:
self.consumer.create_message(response)
self.dispatch("store", message)
[docs] def run(self):
"""Listens for new messages in the incoming queue and dispatches
them to the appropriate commands and stores them.
"""
self.logger.info("Dispatcher loop started.")
while self.running:
try:
message = self.inc_queue.get(timeout=self.queue_timeout)
except queue.Empty:
continue
if message == None:
self.stop()
break
self.process_message(message)
self.logger.info("Dispatcher loop ended.")
[docs] def start(self):
"""Starts listening for incoming messages."""
self.logger.info("Dispatcher thread starting.")
self.running = True
t = threading.Thread(target=self.run)
t.start()
return t
[docs] def stop(self):
"""Stops listening for incoming messages."""
self.logger.info("Dispatcher thread stopping.")
self.running = False