Procházet zdrojové kódy

Fix dequeueing logic a bit.

tags/v0.2
Ben Kurtovic před 10 roky
rodič
revize
de7576728f
1 změnil soubory, kde provedl 42 přidání a 44 odebrání
  1. +42
    -44
      earwigbot/wiki/copyvios/workers.py

+ 42
- 44
earwigbot/wiki/copyvios/workers.py Zobrazit soubor

@@ -60,8 +60,7 @@ def globalize(num_workers=8):

_global_queues = _CopyvioQueues()
for i in xrange(num_workers):
worker = _CopyvioWorker(_global_queues)
worker.start("global-{0}".format(i))
worker = _CopyvioWorker("global-{0}".format(i), _global_queues)
_global_workers.append(worker)
_is_globalized = True

@@ -134,16 +133,19 @@ class _CopyvioQueues(object):
class _CopyvioWorker(object):
"""A multithreaded URL opener/parser instance."""

def __init__(self, queues, until=None):
def __init__(self, name, queues, until=None):
self._queues = queues
self._until = until

self._thread = None
self._thread = thread = Thread(target=self._run)
self._site = None
self._queue = None
self._opener = build_opener()
self._logger = getLogger("earwigbot.wiki.cvworker." + name)

self._logger = None
thread.name = "cvworker-" + name
thread.daemon = True
thread.start()

def _open_url(self, source):
"""Open a URL and return its parsed content, or None.
@@ -195,42 +197,44 @@ class _CopyvioWorker(object):

return handler(content)

def _dequeue(self):
"""Remove a source from one of the queues."""
def _acquire_new_site(self):
"""Block for a new unassigned site queue."""
if self._until:
timeout = self._until - time()
if timeout <= 0:
return
raise Empty
else:
timeout = None

if self._queue:
self._logger.debug(u"Popping source from existing queue ({0})".format(self._site))
self._logger.debug("Waiting for new site queue")
site, queue = self._queues.unassigned.get(timeout=timeout)
if site is StopIteration:
raise StopIteration
self._logger.debug(u"Acquired new site queue: {0}".format(site))
self._site = site
self._queue = queue

def _dequeue(self):
"""Remove a source from one of the queues."""
if not self._queue:
self._acquire_new_site()

logmsg = u"Fetching a new source URL from site queue {0}"
self._logger.debug(logmsg.format(self._site))
self._queues.lock.acquire()
try:
source = self._queue.pop()
self._logger.debug(u"Got URL: {0}".format(source.url))
with self._queues.lock:
if not self._queue:
self._logger.debug(u"Destroying site {0}".format(self._site))
del self._queues.sites[self._site]
self._queue = None
else:
self._logger.debug("Waiting for unassigned URL queue")
site, queue = self._queues.unassigned.get(timeout=timeout)
if site is StopIteration:
return StopIteration
self._logger.debug(u"Got queue: {0}".format(site))
source = queue.pop()
self._logger.debug(u"Got URL: {0}".format(source.url))
with self._queues.lock:
if not queue:
self._logger.debug(u"Destroying site {0}".format(site))
del self._queues.sites[site]
else:
self._logger.debug(u"Saving site {0}".format(site))
self._site = site
self._queue = queue
except IndexError:
self._logger.debug("Queue is empty")
del self._queues.sites[self._site]
self._queue = None
self._queues.lock.release()
return self._dequeue()
self._queues.lock.release()

self._logger.debug(u"Got source URL: {0}".format(source.url))
if not source.active():
self._logger.debug(u"Inactive source; trying again")
self._logger.debug("Source is inactive")
return self._dequeue()
return source

@@ -245,21 +249,15 @@ class _CopyvioWorker(object):
try:
source = self._dequeue()
except Empty:
self._logger.debug("Exiting: queue timed out")
return
if source is StopIteration:
except StopIteration:
self._logger.debug("Exiting: got stop signal")
return
text = self._open_url(source)
if text:
source.workspace.compare(source, MarkovChain(text))

def start(self, name):
"""Start the worker in a new thread, with a given name."""
self._logger = getLogger("earwigbot.wiki.cvworker." + name)
self._thread = thread = Thread(target=self._run)
thread.name = "cvworker-" + name
thread.daemon = True
thread.start()


class CopyvioWorkspace(object):
"""Manages a single copyvio check distributed across threads."""
@@ -284,8 +282,8 @@ class CopyvioWorkspace(object):
else:
self._queues = _CopyvioQueues()
for i in xrange(num_workers):
worker = _CopyvioWorker(self._queues, until)
worker.start("local-{0:04}.{1}".format(id(self) % 10000, i))
name = "local-{0:04}.{1}".format(id(self) % 10000, i)
worker = _CopyvioWorker(name, self._queues, until)

def _calculate_confidence(self, delta):
"""Return the confidence of a violation as a float between 0 and 1."""


Načítá se…
Zrušit
Uložit