From e73e626994b47dedde2297828df1a880ec16bc60 Mon Sep 17 00:00:00 2001 From: Ben Kurtovic Date: Sun, 31 Aug 2014 16:36:44 -0500 Subject: [PATCH] Some locks needed to be tightened. --- earwigbot/wiki/copyvios/workers.py | 66 ++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/earwigbot/wiki/copyvios/workers.py b/earwigbot/wiki/copyvios/workers.py index 0ccac2d..5f75594 100644 --- a/earwigbot/wiki/copyvios/workers.py +++ b/earwigbot/wiki/copyvios/workers.py @@ -201,25 +201,26 @@ class _CopyvioWorker(object): else: timeout = None - with self._queues.lock: - if self._queue: - source = self._queue.get(timeout=timeout) - if self._queue.empty(): + if self._queue: + source = self._queue.pop() + with self._queues.lock: + if not self._queue: del self._queues.sites[self._site] self._queue = None - else: - site, queue = self._queues.unassigned.get(timeout=timeout) - if site is StopIteration: - return StopIteration - source = queue.get_nowait() - if queue.empty(): + else: + site, queue = self._queues.unassigned.get(timeout=timeout) + if site is StopIteration: + return StopIteration + source = queue.pop() + with self._queues.lock: + if not queue: del self._queues.sites[site] else: self._site = site self._queue = queue - if not source.active(): - return self._dequeue() - return source + if not source.active(): + return self._dequeue() + return source def _run(self): """Main entry point for the worker thread. @@ -326,30 +327,31 @@ class CopyvioWorkspace(object): returns ``True`` if we should skip it and ``False`` otherwise. """ for url in urls: - if self._is_finished: - break - if url in self._handled_urls: - continue - self._handled_urls.append(url) - if exclude_check and exclude_check(url): - continue - - try: - key = tldextract.extract(url).registered_domain - except ImportError: # Fall back on very naive method - from urlparse import urlparse - key = u".".join(urlparse(url).netloc.split(".")[-2:]) - - source = _CopyvioSource(url=url, key=key, **self._source_args) - logmsg = u"enqueue(): {0} {1} -> {2}" with self._queues.lock: + if self._is_finished: + break + if url in self._handled_urls: + continue + self._handled_urls.append(url) + if exclude_check and exclude_check(url): + continue + + try: + key = tldextract.extract(url).registered_domain + except ImportError: # Fall back on very naive method + from urlparse import urlparse + key = u".".join(urlparse(url).netloc.split(".")[-2:]) + + source = _CopyvioSource(url=url, key=key, **self._source_args) + self.sources.append(source) + logmsg = u"enqueue(): {0} {1} -> {2}" if key in self._queues.sites: self._logger.debug(logmsg.format("append", key, url)) - self._queues.sites[key].put(source) + self._queues.sites[key].append(source) else: self._logger.debug(logmsg.format("new", key, url)) - self._queues.sites[key] = queue = Queue() - queue.put(source) + self._queues.sites[key] = queue = [] + queue.append(source) self._queues.unassigned.put((key, queue)) def wait(self):