@@ -21,7 +21,7 @@ | |||||
# SOFTWARE. | # SOFTWARE. | ||||
import logging | import logging | ||||
from threading import Lock, Thread | |||||
from threading import Lock, Thread, enumerate as enumerate_threads | |||||
from time import sleep, time | from time import sleep, time | ||||
from earwigbot.config import BotConfig | from earwigbot.config import BotConfig | ||||
@@ -104,6 +104,31 @@ class Bot(object): | |||||
if self.watcher: | if self.watcher: | ||||
self.watcher.stop(msg) | self.watcher.stop(msg) | ||||
def _stop_task_threads(self): | |||||
"""Notify the user of which task threads are going to be killed. | |||||
Unfortunately, there is no method right now of stopping task threads | |||||
safely. This is because there is no way to tell them to stop like the | |||||
IRC components can be told; furthermore, they are run as daemons, and | |||||
daemon threads automatically stop without calling any __exit__ or | |||||
try/finally code when all non-daemon threads stop. They were originally | |||||
implemented as regular non-daemon threads, but this meant there was no | |||||
way to completely stop the bot if tasks were running, because all other | |||||
threads would exit and threading would absorb KeyboardInterrupts. | |||||
The advantage of this is that stopping the bot is truly guarenteed to | |||||
*stop* the bot, while the disadvantage is that the tasks are given no | |||||
advance warning of their forced shutdown. | |||||
""" | |||||
tasks = [] | |||||
non_tasks = self.config.components.keys() + ["MainThread", "reminder"] | |||||
for thread in enumerate_threads(): | |||||
if thread.name not in non_tasks and thread.is_alive(): | |||||
tasks.append(thread.name) | |||||
if tasks: | |||||
log = "The following tasks will be killed: {0}" | |||||
self.logger.warn(log.format(" ".join(tasks))) | |||||
def run(self): | def run(self): | ||||
"""Main entry point into running the bot. | """Main entry point into running the bot. | ||||
@@ -160,3 +185,4 @@ class Bot(object): | |||||
with self.component_lock: | with self.component_lock: | ||||
self._stop_irc_components(msg) | self._stop_irc_components(msg) | ||||
self._keep_looping = False | self._keep_looping = False | ||||
self._stop_task_threads() |
@@ -37,8 +37,10 @@ class Command(BaseCommand): | |||||
try: | try: | ||||
self.statistics = self.bot.tasks.get("afc_statistics") | self.statistics = self.bot.tasks.get("afc_statistics") | ||||
except KeyError: | except KeyError: | ||||
e = "Cannot run command: requires afc_statistics task (from earwigbot_plugins)." | |||||
e = "Cannot run command: requires afc_statistics task (from earwigbot_plugins)" | |||||
self.logger.error(e) | self.logger.error(e) | ||||
msg = "command requires afc_statistics task (from earwigbot_plugins)" | |||||
self.reply(data, msg) | |||||
return | return | ||||
if not data.args: | if not data.args: | ||||
@@ -301,7 +301,7 @@ class BotConfig(object): | |||||
class _ConfigNode(object): | class _ConfigNode(object): | ||||
def __iter__(self): | def __iter__(self): | ||||
for key in self.__dict__.iterkeys(): | |||||
for key in self.__dict__: | |||||
yield key | yield key | ||||
def __getitem__(self, item): | def __getitem__(self, item): | ||||
@@ -330,6 +330,24 @@ class _ConfigNode(object): | |||||
def get(self, *args, **kwargs): | def get(self, *args, **kwargs): | ||||
return self.__dict__.get(*args, **kwargs) | return self.__dict__.get(*args, **kwargs) | ||||
def keys(self): | |||||
return self.__dict__.keys() | |||||
def values(self): | |||||
return self.__dict__.values() | |||||
def items(self): | |||||
return self.__dict__.items() | |||||
def iterkeys(self): | |||||
return self.__dict__.iterkeys() | |||||
def itervalues(self): | |||||
return self.__dict__.itervalues() | |||||
def iteritems(self): | |||||
return self.__dict__.iteritems() | |||||
class _BotFormatter(logging.Formatter): | class _BotFormatter(logging.Formatter): | ||||
def __init__(self, color=False): | def __init__(self, color=False): | ||||
@@ -53,7 +53,7 @@ class IRCConnection(object): | |||||
try: | try: | ||||
self._sock.connect((self.host, self.port)) | self._sock.connect((self.host, self.port)) | ||||
except socket.error: | except socket.error: | ||||
self.logger.exception("Couldn't connect to IRC server") | |||||
self.logger.exception("Couldn't connect to IRC server; retrying") | |||||
sleep(8) | sleep(8) | ||||
self._connect() | self._connect() | ||||
self._send("NICK {0}".format(self.nick)) | self._send("NICK {0}".format(self.nick)) | ||||
@@ -179,21 +179,27 @@ class TaskManager(_ResourceManager): | |||||
self.logger.info(msg.format(task.name)) | self.logger.info(msg.format(task.name)) | ||||
def start(self, task_name, **kwargs): | def start(self, task_name, **kwargs): | ||||
"""Start a given task in a new thread. kwargs are passed to task.run""" | |||||
"""Start a given task in a new daemon thread, and return the thread. | |||||
kwargs are passed to task.run(). If the task is not found, None will be | |||||
returned. | |||||
""" | |||||
msg = "Starting task '{0}' in a new thread" | msg = "Starting task '{0}' in a new thread" | ||||
self.logger.info(msg.format(task_name)) | self.logger.info(msg.format(task_name)) | ||||
try: | try: | ||||
task = self.get(task_name) | task = self.get(task_name) | ||||
except KeyError: | except KeyError: | ||||
e = "Couldn't find task '{0}':" | |||||
e = "Couldn't find task '{0}'" | |||||
self.logger.error(e.format(task_name)) | self.logger.error(e.format(task_name)) | ||||
return | return | ||||
task_thread = Thread(target=self._wrapper, args=(task,), kwargs=kwargs) | task_thread = Thread(target=self._wrapper, args=(task,), kwargs=kwargs) | ||||
start_time = strftime("%b %d %H:%M:%S") | start_time = strftime("%b %d %H:%M:%S") | ||||
task_thread.name = "{0} ({1})".format(task_name, start_time) | task_thread.name = "{0} ({1})".format(task_name, start_time) | ||||
task_thread.daemon = True | |||||
task_thread.start() | task_thread.start() | ||||
return task_thread | |||||
def schedule(self, now=None): | def schedule(self, now=None): | ||||
"""Start all tasks that are supposed to be run at a given time.""" | """Start all tasks that are supposed to be run at a given time.""" | ||||
@@ -207,7 +207,7 @@ class Task(BaseTask): | |||||
replag = self.site.get_replag() | replag = self.site.get_replag() | ||||
self.logger.debug("Server replag is {0}".format(replag)) | self.logger.debug("Server replag is {0}".format(replag)) | ||||
if replag > 600 and not kwargs.get("ignore_replag"): | if replag > 600 and not kwargs.get("ignore_replag"): | ||||
msg = "Sync canceled as replag ({0} secs) is greater than ten minutes." | |||||
msg = "Sync canceled as replag ({0} secs) is greater than ten minutes" | |||||
self.logger.warn(msg.format(replag)) | self.logger.warn(msg.format(replag)) | ||||
return | return | ||||
@@ -26,9 +26,10 @@ This is EarwigBot's command-line utility, enabling you to easily start the | |||||
bot or run specific tasks. | bot or run specific tasks. | ||||
""" | """ | ||||
import argparse | |||||
from argparse import ArgumentParser | |||||
import logging | import logging | ||||
from os import path | from os import path | ||||
from time import sleep | |||||
from earwigbot import __version__ | from earwigbot import __version__ | ||||
from earwigbot.bot import Bot | from earwigbot.bot import Bot | ||||
@@ -37,7 +38,7 @@ __all__ = ["main"] | |||||
def main(): | def main(): | ||||
version = "EarwigBot v{0}".format(__version__) | version = "EarwigBot v{0}".format(__version__) | ||||
parser = argparse.ArgumentParser(description=__doc__) | |||||
parser = ArgumentParser(description=__doc__) | |||||
parser.add_argument("path", nargs="?", metavar="PATH", default=path.curdir, | parser.add_argument("path", nargs="?", metavar="PATH", default=path.curdir, | ||||
help="path to the bot's working directory, which will be created if it doesn't exist; current directory assumed if not specified") | help="path to the bot's working directory, which will be created if it doesn't exist; current directory assumed if not specified") | ||||
parser.add_argument("-v", "--version", action="version", version=version) | parser.add_argument("-v", "--version", action="version", version=version) | ||||
@@ -63,7 +64,17 @@ def main(): | |||||
bot = Bot(path.abspath(args.path), level=level) | bot = Bot(path.abspath(args.path), level=level) | ||||
if args.task: | if args.task: | ||||
bot.tasks.start(args.task) | |||||
thread = bot.tasks.start(args.task) | |||||
if not thread: | |||||
return | |||||
try: | |||||
while thread.is_alive(): # Keep it alive; it's a daemon | |||||
sleep(1) | |||||
except KeyboardInterrupt: | |||||
pass | |||||
finally: | |||||
if thread.is_alive(): | |||||
bot.tasks.logger.warn("The task is will be killed") | |||||
else: | else: | ||||
try: | try: | ||||
bot.run() | bot.run() | ||||
@@ -242,7 +242,7 @@ class Site(object): | |||||
e = "Maximum number of retries reached ({0})." | e = "Maximum number of retries reached ({0})." | ||||
raise SiteAPIError(e.format(self._max_retries)) | raise SiteAPIError(e.format(self._max_retries)) | ||||
tries += 1 | tries += 1 | ||||
msg = 'Server says: "{0}". Retrying in {1} seconds ({2}/{3}).' | |||||
msg = 'Server says "{0}"; retrying in {1} seconds ({2}/{3})' | |||||
logger.info(msg.format(info, wait, tries, self._max_retries)) | logger.info(msg.format(info, wait, tries, self._max_retries)) | ||||
sleep(wait) | sleep(wait) | ||||
return self._api_query(params, tries=tries, wait=wait*3) | return self._api_query(params, tries=tries, wait=wait*3) | ||||