|
@@ -21,6 +21,7 @@ |
|
|
# SOFTWARE. |
|
|
# SOFTWARE. |
|
|
|
|
|
|
|
|
from gzip import GzipFile |
|
|
from gzip import GzipFile |
|
|
|
|
|
from logging import getLogger |
|
|
from math import log |
|
|
from math import log |
|
|
from Queue import Empty, Queue |
|
|
from Queue import Empty, Queue |
|
|
from socket import error |
|
|
from socket import error |
|
@@ -142,6 +143,8 @@ class _CopyvioWorker(object): |
|
|
self._queue = None |
|
|
self._queue = None |
|
|
self._opener = build_opener() |
|
|
self._opener = build_opener() |
|
|
|
|
|
|
|
|
|
|
|
self._logger = None |
|
|
|
|
|
|
|
|
def _open_url(self, source): |
|
|
def _open_url(self, source): |
|
|
"""Open a URL and return its parsed content, or None. |
|
|
"""Open a URL and return its parsed content, or None. |
|
|
|
|
|
|
|
@@ -202,23 +205,32 @@ class _CopyvioWorker(object): |
|
|
timeout = None |
|
|
timeout = None |
|
|
|
|
|
|
|
|
if self._queue: |
|
|
if self._queue: |
|
|
|
|
|
self._logger.debug(u"Popping source from existing queue ({0})".format(self._site)) |
|
|
source = self._queue.pop() |
|
|
source = self._queue.pop() |
|
|
|
|
|
self._logger.debug(u"Got URL: {0}".format(source.url)) |
|
|
with self._queues.lock: |
|
|
with self._queues.lock: |
|
|
if not self._queue: |
|
|
if not self._queue: |
|
|
|
|
|
self._logger.debug(u"Destroying site {0}".format(self._site)) |
|
|
del self._queues.sites[self._site] |
|
|
del self._queues.sites[self._site] |
|
|
self._queue = None |
|
|
self._queue = None |
|
|
else: |
|
|
else: |
|
|
|
|
|
self._logger.debug("Waiting for unassigned URL queue") |
|
|
site, queue = self._queues.unassigned.get(timeout=timeout) |
|
|
site, queue = self._queues.unassigned.get(timeout=timeout) |
|
|
if site is StopIteration: |
|
|
if site is StopIteration: |
|
|
return StopIteration |
|
|
return StopIteration |
|
|
|
|
|
self._logger.debug(u"Got queue: {0}".format(site)) |
|
|
source = queue.pop() |
|
|
source = queue.pop() |
|
|
|
|
|
self._logger.debug(u"Got URL: {0}".format(source.url)) |
|
|
with self._queues.lock: |
|
|
with self._queues.lock: |
|
|
if not queue: |
|
|
if not queue: |
|
|
|
|
|
self._logger.debug(u"Destroying site {0}".format(site)) |
|
|
del self._queues.sites[site] |
|
|
del self._queues.sites[site] |
|
|
else: |
|
|
else: |
|
|
|
|
|
self._logger.debug(u"Saving site {0}".format(site)) |
|
|
self._site = site |
|
|
self._site = site |
|
|
self._queue = queue |
|
|
self._queue = queue |
|
|
if not source.active(): |
|
|
if not source.active(): |
|
|
|
|
|
self._logger.debug(u"Inactive source; trying again") |
|
|
return self._dequeue() |
|
|
return self._dequeue() |
|
|
return source |
|
|
return source |
|
|
|
|
|
|
|
@@ -246,6 +258,7 @@ class _CopyvioWorker(object): |
|
|
thread.name = "cvworker-" + name |
|
|
thread.name = "cvworker-" + name |
|
|
thread.daemon = True |
|
|
thread.daemon = True |
|
|
thread.start() |
|
|
thread.start() |
|
|
|
|
|
self._logger = getLogger("earwigbot.wiki.cvworker." + name) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CopyvioWorkspace(object): |
|
|
class CopyvioWorkspace(object): |
|
@@ -358,7 +371,9 @@ class CopyvioWorkspace(object): |
|
|
"""Wait for the workers to finish handling the sources.""" |
|
|
"""Wait for the workers to finish handling the sources.""" |
|
|
self._logger.debug("Waiting on {0} sources".format(len(self.sources))) |
|
|
self._logger.debug("Waiting on {0} sources".format(len(self.sources))) |
|
|
for source in self.sources: |
|
|
for source in self.sources: |
|
|
|
|
|
self._logger.debug("Waiting on source: {0}".format(source.url)) |
|
|
source.join(self._until) |
|
|
source.join(self._until) |
|
|
|
|
|
self._logger.debug("Done waiting") |
|
|
|
|
|
|
|
|
def compare(self, source, source_chain): |
|
|
def compare(self, source, source_chain): |
|
|
"""Compare a source to the article, and update the best known one.""" |
|
|
"""Compare a source to the article, and update the best known one.""" |
|
|