Sfoglia il codice sorgente

source.join() now blocks when in the middle of processing.

tags/v0.2
Ben Kurtovic 10 anni fa
parent
commit
8e439e1eea
1 ha cambiato i file con 29 aggiunte e 16 eliminazioni
  1. +29
    -16
      earwigbot/wiki/copyvios/workers.py

+ 29
- 16
earwigbot/wiki/copyvios/workers.py Vedi File

@@ -96,29 +96,37 @@ class _CopyvioSource(object):
self.confidence = 0.0
self.chains = (EMPTY, EMPTY_INTERSECTION)

self._event = Event()
self._event1 = Event()
self._event2 = Event()
self._event2.set()

def active(self):
"""Return whether or not this source needs to be filled out."""
return not self._event.is_set()
def touched(self):
"""Return whether one of start_work() and cancel() have been called."""
return self._event1.is_set()

def complete(self, confidence, source_chain, delta_chain):
def start_work(self):
"""Mark this source as being worked on right now."""
self._event2.clear()
self._event1.set()

def finish_work(self, confidence, source_chain, delta_chain):
"""Complete the confidence information inside this source."""
self.confidence = confidence
self.chains = (source_chain, delta_chain)
self._event.set()
self._event2.set()

def cancel(self):
"""Deactivate this source without filling in the relevant data."""
self._event.set()
self._event1.set()

def join(self, until):
"""Block until this violation result is filled out."""
if until:
timeout = until - time()
if timeout <= 0:
return
self._event.wait(timeout)
for event in [self._event1, self._event2]:
if until:
timeout = until - time()
if timeout <= 0:
return
event.wait(timeout)


class _CopyvioQueues(object):
@@ -231,12 +239,15 @@ class _CopyvioWorker(object):
self._queue = None
self._queues.lock.release()
return self._dequeue()
self._queues.lock.release()

self._logger.debug(u"Got source URL: {0}".format(source.url))
if not source.active():
self._logger.debug("Source is inactive")
if source.touched():
self._logger.debug("Source has been cancelled")
self._queues.lock.release()
return self._dequeue()

source.start_work()
self._queues.lock.release()
return source

def _run(self):
@@ -371,6 +382,8 @@ class CopyvioWorkspace(object):
self._logger.debug("Waiting on {0} sources".format(len(self.sources)))
for source in self.sources:
source.join(self._until)
with self._compare_lock:
pass # Wait for any remaining comparisons to be finished
if not _is_globalized:
for i in xrange(self._num_workers):
self._queues.unassigned.put((StopIteration, None))
@@ -379,10 +392,10 @@ class CopyvioWorkspace(object):
"""Compare a source to the article, and update the best known one."""
delta = MarkovChainIntersection(self._article, source_chain)
conf = self._calculate_confidence(delta)
source.complete(conf, source_chain, delta)
self._logger.debug(u"compare(): {0} -> {1}".format(source.url, conf))

with self._compare_lock:
source.finish_work(conf, source_chain, delta)
if conf > self.best.confidence:
self.best = source
if conf >= self._min_confidence:


Caricamento…
Annulla
Salva