From 7137dda920bf0a8d1ee9b84b2102add6e60df15a Mon Sep 17 00:00:00 2001
From: Ben Kurtovic
Date: Mon, 18 Aug 2014 05:07:34 -0400
Subject: [PATCH] Update copyvio checker to not make concurrent requests to a single domain.

---
 earwigbot/wiki/copyvios/__init__.py | 132 +++++++++++++++++++-----------------
 setup.py                            |   1 +
 2 files changed, 71 insertions(+), 62 deletions(-)

diff --git a/earwigbot/wiki/copyvios/__init__.py b/earwigbot/wiki/copyvios/__init__.py
index 90673f1..08cf182 100644
--- a/earwigbot/wiki/copyvios/__init__.py
+++ b/earwigbot/wiki/copyvios/__init__.py
@@ -25,7 +25,7 @@ from gzip import GzipFile
 from Queue import Empty, Queue
 from socket import timeout
 from StringIO import StringIO
-from threading import Lock, Thread
+from threading import Lock, Semaphore, Thread
 from time import sleep, time
 from urllib2 import build_opener, URLError
 
@@ -37,6 +37,7 @@ from earwigbot.wiki.copyvios.result import CopyvioCheckResult
 from earwigbot.wiki.copyvios.search import YahooBOSSSearchEngine
 
 oauth = importer.new("oauth2")
+tldextract = importer.new("tldextract")
 
 __all__ = ["CopyvioMixIn"]
 
@@ -46,47 +47,34 @@ class _CopyvioWorkspace(object):
     """Manages a single copyvio check distributed across threads."""
 
     def __init__(self, article, min_confidence, until, logger, headers,
-                 url_timeout=5):
+                 url_timeout=5, max_concurrent_requests=6):
         self.best = _WorkingResult(None, 0.0, (EMPTY, EMPTY_INTERSECTION))
-        self.until = until
+        self.request_semaphore = Semaphore(max_concurrent_requests)
         self._article = article
-        self._handled_urls = []
-        self._headers = headers
         self._logger = logger.getChild("copyvios")
         self._min_confidence = min_confidence
-        self._queue = Queue()
+        self._handled_urls = []
+        self._is_finished = False
+        self._enqueue_lock = Lock()
         self._result_lock = Lock()
-        self._url_timeout = url_timeout
-        self._workers = []
+
+        self._workers = {}
+        self._worker_args = (self, until, headers, url_timeout)
 
     def _calculate_confidence(self, delta):
         """Return the confidence of a violation as a float between 0 and 1."""
         return float(delta.size()) / self._article.size()
 
     def _finish_early(self):
-        """Finish handling links prematurely, e.g. if we've met min confidence.
-
-        This works by inserting an additional ``None`` into the queue (the
-        "exit" signal for workers) and then dequeueing until we reach the first
-        ``None``. This way, every worker will dequeue an exit signal signal on
-        their next turn.
-        """
-        self._queue.put(None)
-        try:
-            while self._queue.get(block=False):
-                pass
-        except Empty:  # Might happen if we haven't called wait() yet, but NBD.
-            pass
-
-    def spawn(self, workers):
-        """Spawn *workers* number of worker threads."""
-        for i in xrange(workers):
-            worker = _CopyvioWorker(self, self._headers, self._url_timeout)
-            thread = Thread(target=worker.run)
-            thread.daemon = True
-            thread.start()
-            self._workers.append(thread)
+        """Finish handling links prematurely (if we've hit min_confidence)."""
+        self._logger.debug("Confidence threshold met; clearing worker queues")
+        with self._enqueue_lock:
+            for worker in self._workers.itervalues():
+                with worker.queue.mutex:
+                    worker.queue.queue.clear()
+                worker.queue.put(None)
+            self._is_finished = True
 
     def enqueue(self, urls, exclude_check=None):
         """Put a list of URLs into the worker queue.
 
@@ -95,29 +83,48 @@ class _CopyvioWorkspace(object):
         *exclude_check* is an optional exclusion function. It takes a URL and
         returns ``True`` if we should skip it and ``False`` otherwise.
""" for url in urls: - if url in self._handled_urls: - continue - self._handled_urls.append(url) - if exclude_check and exclude_check(url): - continue - self._queue.put(url) + with self._enqueue_lock: + if self._is_finished: + break + if url in self._handled_urls: + continue + self._handled_urls.append(url) + if exclude_check and exclude_check(url): + continue - def dequeue(self, max_time=None): - """Get an element from the worker queue, with an optional timeout.""" - return self._queue.get(timeout=max_time) + try: + key = tldextract.extract(url).registered_domain + except ImportError: # Fall back on very naive method + from urlparse import urlparse + key = u".".join(urlparse(url).netloc.split(".")[-2:]) + + logmsg = "enqueue(): {0} {1} -> {2}" + if key in self._workers: + self._logger.debug(logmsg.format("PUT", key, url)) + self._workers[key].queue.put(url) + else: + self._logger.debug(logmsg.format("NEW", key, url)) + worker = _CopyvioWorker(*self._worker_args) + worker.queue.put(url) + thread = Thread(target=worker.run) + thread.name = "cvworker-" + key.encode("utf8") + thread.daemon = True + thread.start() + self._workers[key] = worker def wait(self): """Wait for the workers to finish handling the queue.""" - for i in xrange(len(self._workers)): - self._queue.put(None) # Exit signal to workers - for worker in self._workers: + self._logger.debug("Waiting on {0} workers".format(len(self._workers))) + for worker in self._workers.itervalues(): + worker.queue.put(None) # Exit signal to workers + for worker in self._workers.itervalues(): worker.join() def compare(self, url, source): """Compare a source to the article, and update the working result.""" delta = MarkovChainIntersection(self._article, source) confidence = self._calculate_confidence(delta) - self._logger.debug("{0} -> {1}".format(url, confidence)) + self._logger.debug("compare(): {0} -> {1}".format(url, confidence)) with self._result_lock: if confidence > self.best.confidence: self.best = _WorkingResult(url, confidence, (source, delta)) @@ -128,8 +135,11 @@ class _CopyvioWorkspace(object): class _CopyvioWorker(object): """A multithreaded URL opener/parser instance.""" - def __init__(self, workspace, headers, url_timeout): + def __init__(self, workspace, until, headers, url_timeout): + self.queue = Queue() + self._workspace = workspace + self._until = until self._opener = build_opener() self._opener.addheaders = headers self._url_timeout = url_timeout @@ -146,11 +156,12 @@ class _CopyvioWorker(object): If a URLError was raised while opening the URL or an IOError was raised while decompressing, None will be returned. """ - try: - response = self._opener.open(url, timeout=self._url_timeout) - result = response.read() - except (URLError, timeout): - return None + with self._workspace.request_semaphore: + try: + response = self._opener.open(url, timeout=self._url_timeout) + result = response.read() + except (URLError, timeout): + return None if response.headers.get("Content-Encoding") == "gzip": stream = StringIO(result) @@ -177,17 +188,17 @@ class _CopyvioWorker(object): now empty. 
""" while True: - if self._workspace.until: - max_time = self._workspace.until - time() + if self._until: + max_time = self._until - time() if max_time <= 0: return try: - url = self._workspace.dequeue(max_time) + url = self.queue.get(timeout=max_time) except Empty: return else: - url = self._workspace.dequeue() - if url is None: # Exit signal from workspace.wait() + url = self.queue.get() + if url is None: # Exit signal return text = self._open_url(url.encode("utf8")) if text: @@ -237,8 +248,7 @@ class CopyvioMixIn(object): raise exceptions.UnknownSearchEngineError(engine) - def copyvio_check(self, min_confidence=0.5, max_queries=15, max_time=-1, - worker_threads=4): + def copyvio_check(self, min_confidence=0.5, max_queries=15, max_time=-1): """Check the page for copyright violations. Returns a :class:`.CopyvioCheckResult` object with information on the @@ -256,13 +266,12 @@ class CopyvioMixIn(object): if checks are called through a web server with timeouts. We will stop checking new URLs as soon as this limit is reached. - *worker_threads* is the number of threads we will spawn to handle URL - fetching and parsing simultaneously. Between 1 and 8 is recommended. - Raises :exc:`.CopyvioCheckError` or subclasses (:exc:`.UnknownSearchEngineError`, :exc:`.SearchQueryError`, ...) on errors. """ + log = "Starting copyvio check for [[{0}]]" + self._logger.info(log.format(self.title)) start_time = time() until = (start_time + max_time) if max_time > 0 else None searcher = self._get_search_engine() @@ -282,9 +291,7 @@ class CopyvioMixIn(object): self._logger.info(result.get_log_message(self.title)) return result - workspace.spawn(worker_threads) workspace.enqueue(parser.get_links(), exclude) - chunks = parser.chunk(self._search_config["nltk_dir"], max_queries) num_queries = 0 for chunk in chunks: @@ -325,13 +332,14 @@ class CopyvioMixIn(object): Since no searching is done, neither :exc:`.UnknownSearchEngineError` nor :exc:`.SearchQueryError` will be raised. """ + log = "Starting copyvio compare for [[{0}]] against {1}" + self._logger.info(log.format(self.title, url)) start_time = time() until = (start_time + max_time) if max_time > 0 else None article = MarkovChain(ArticleTextParser(self.get()).strip()) workspace = _CopyvioWorkspace(article, min_confidence, until, self._logger, self._addheaders, max_time) workspace.enqueue([url]) - workspace.spawn(1) workspace.wait() url, conf, chains = workspace.best result = CopyvioCheckResult(conf >= min_confidence, conf, url, 0, diff --git a/setup.py b/setup.py index c6c73dd..5efa620 100644 --- a/setup.py +++ b/setup.py @@ -43,6 +43,7 @@ extra_deps = { "lxml >= 2.3.5", # Faster parser for BeautifulSoup "nltk >= 2.0.2", # Parsing sentences to split article content "oauth2 >= 1.5.211", # Interfacing with Yahoo! BOSS Search + "tldextract >= 1.4", # Getting domains for the multithreaded workers ], "time": [ "pytz >= 2012d", # Handling timezones for the !time IRC command