From 8e439e1eea5d439c78cec368e94ed01436c59d51 Mon Sep 17 00:00:00 2001 From: Ben Kurtovic Date: Sun, 31 Aug 2014 22:57:53 -0500 Subject: [PATCH] source.join() now blocks when in the middle of processing. --- earwigbot/wiki/copyvios/workers.py | 45 ++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/earwigbot/wiki/copyvios/workers.py b/earwigbot/wiki/copyvios/workers.py index ffe0b29..61ec917 100644 --- a/earwigbot/wiki/copyvios/workers.py +++ b/earwigbot/wiki/copyvios/workers.py @@ -96,29 +96,37 @@ class _CopyvioSource(object): self.confidence = 0.0 self.chains = (EMPTY, EMPTY_INTERSECTION) - self._event = Event() + self._event1 = Event() + self._event2 = Event() + self._event2.set() - def active(self): - """Return whether or not this source needs to be filled out.""" - return not self._event.is_set() + def touched(self): + """Return whether one of start_work() and cancel() have been called.""" + return self._event1.is_set() - def complete(self, confidence, source_chain, delta_chain): + def start_work(self): + """Mark this source as being worked on right now.""" + self._event2.clear() + self._event1.set() + + def finish_work(self, confidence, source_chain, delta_chain): """Complete the confidence information inside this source.""" self.confidence = confidence self.chains = (source_chain, delta_chain) - self._event.set() + self._event2.set() def cancel(self): """Deactivate this source without filling in the relevant data.""" - self._event.set() + self._event1.set() def join(self, until): """Block until this violation result is filled out.""" - if until: - timeout = until - time() - if timeout <= 0: - return - self._event.wait(timeout) + for event in [self._event1, self._event2]: + if until: + timeout = until - time() + if timeout <= 0: + return + event.wait(timeout) class _CopyvioQueues(object): @@ -231,12 +239,15 @@ class _CopyvioWorker(object): self._queue = None self._queues.lock.release() return self._dequeue() - self._queues.lock.release() self._logger.debug(u"Got source URL: {0}".format(source.url)) - if not source.active(): - self._logger.debug("Source is inactive") + if source.touched(): + self._logger.debug("Source has been cancelled") + self._queues.lock.release() return self._dequeue() + + source.start_work() + self._queues.lock.release() return source def _run(self): @@ -371,6 +382,8 @@ class CopyvioWorkspace(object): self._logger.debug("Waiting on {0} sources".format(len(self.sources))) for source in self.sources: source.join(self._until) + with self._compare_lock: + pass # Wait for any remaining comparisons to be finished if not _is_globalized: for i in xrange(self._num_workers): self._queues.unassigned.put((StopIteration, None)) @@ -379,10 +392,10 @@ class CopyvioWorkspace(object): """Compare a source to the article, and update the best known one.""" delta = MarkovChainIntersection(self._article, source_chain) conf = self._calculate_confidence(delta) - source.complete(conf, source_chain, delta) self._logger.debug(u"compare(): {0} -> {1}".format(source.url, conf)) with self._compare_lock: + source.finish_work(conf, source_chain, delta) if conf > self.best.confidence: self.best = source if conf >= self._min_confidence: