diff --git a/earwigbot/wiki/copyvios/workers.py b/earwigbot/wiki/copyvios/workers.py index 1573a06..de33b82 100644 --- a/earwigbot/wiki/copyvios/workers.py +++ b/earwigbot/wiki/copyvios/workers.py @@ -60,8 +60,7 @@ def globalize(num_workers=8): _global_queues = _CopyvioQueues() for i in xrange(num_workers): - worker = _CopyvioWorker(_global_queues) - worker.start("global-{0}".format(i)) + worker = _CopyvioWorker("global-{0}".format(i), _global_queues) _global_workers.append(worker) _is_globalized = True @@ -134,16 +133,19 @@ class _CopyvioQueues(object): class _CopyvioWorker(object): """A multithreaded URL opener/parser instance.""" - def __init__(self, queues, until=None): + def __init__(self, name, queues, until=None): self._queues = queues self._until = until - self._thread = None + self._thread = thread = Thread(target=self._run) self._site = None self._queue = None self._opener = build_opener() + self._logger = getLogger("earwigbot.wiki.cvworker." + name) - self._logger = None + thread.name = "cvworker-" + name + thread.daemon = True + thread.start() def _open_url(self, source): """Open a URL and return its parsed content, or None. @@ -195,42 +197,44 @@ class _CopyvioWorker(object): return handler(content) - def _dequeue(self): - """Remove a source from one of the queues.""" + def _acquire_new_site(self): + """Block for a new unassigned site queue.""" if self._until: timeout = self._until - time() if timeout <= 0: - return + raise Empty else: timeout = None - if self._queue: - self._logger.debug(u"Popping source from existing queue ({0})".format(self._site)) + self._logger.debug("Waiting for new site queue") + site, queue = self._queues.unassigned.get(timeout=timeout) + if site is StopIteration: + raise StopIteration + self._logger.debug(u"Acquired new site queue: {0}".format(site)) + self._site = site + self._queue = queue + + def _dequeue(self): + """Remove a source from one of the queues.""" + if not self._queue: + self._acquire_new_site() + + logmsg = u"Fetching a new source URL from site queue {0}" + self._logger.debug(logmsg.format(self._site)) + self._queues.lock.acquire() + try: source = self._queue.pop() - self._logger.debug(u"Got URL: {0}".format(source.url)) - with self._queues.lock: - if not self._queue: - self._logger.debug(u"Destroying site {0}".format(self._site)) - del self._queues.sites[self._site] - self._queue = None - else: - self._logger.debug("Waiting for unassigned URL queue") - site, queue = self._queues.unassigned.get(timeout=timeout) - if site is StopIteration: - return StopIteration - self._logger.debug(u"Got queue: {0}".format(site)) - source = queue.pop() - self._logger.debug(u"Got URL: {0}".format(source.url)) - with self._queues.lock: - if not queue: - self._logger.debug(u"Destroying site {0}".format(site)) - del self._queues.sites[site] - else: - self._logger.debug(u"Saving site {0}".format(site)) - self._site = site - self._queue = queue + except IndexError: + self._logger.debug("Queue is empty") + del self._queues.sites[self._site] + self._queue = None + self._queues.lock.release() + return self._dequeue() + self._queues.lock.release() + + self._logger.debug(u"Got source URL: {0}".format(source.url)) if not source.active(): - self._logger.debug(u"Inactive source; trying again") + self._logger.debug("Source is inactive") return self._dequeue() return source @@ -245,21 +249,15 @@ class _CopyvioWorker(object): try: source = self._dequeue() except Empty: + self._logger.debug("Exiting: queue timed out") return - if source is StopIteration: + except StopIteration: + self._logger.debug("Exiting: got stop signal") return text = self._open_url(source) if text: source.workspace.compare(source, MarkovChain(text)) - def start(self, name): - """Start the worker in a new thread, with a given name.""" - self._logger = getLogger("earwigbot.wiki.cvworker." + name) - self._thread = thread = Thread(target=self._run) - thread.name = "cvworker-" + name - thread.daemon = True - thread.start() - class CopyvioWorkspace(object): """Manages a single copyvio check distributed across threads.""" @@ -284,8 +282,8 @@ class CopyvioWorkspace(object): else: self._queues = _CopyvioQueues() for i in xrange(num_workers): - worker = _CopyvioWorker(self._queues, until) - worker.start("local-{0:04}.{1}".format(id(self) % 10000, i)) + name = "local-{0:04}.{1}".format(id(self) % 10000, i) + worker = _CopyvioWorker(name, self._queues, until) def _calculate_confidence(self, delta): """Return the confidence of a violation as a float between 0 and 1."""