From ae0c390ceb77280be03ce218938ce2fd2b0ea295 Mon Sep 17 00:00:00 2001
From: Ben Kurtovic <ben.kurtovic@gmail.com>
Date: Sun, 20 Jul 2014 16:23:58 -0400
Subject: [PATCH] Redesign copyvio internals to parallelize URL loading/parsing.

---
 earwigbot/wiki/copyvios/__init__.py | 335 ++++++++++++++++++++++--------------
 earwigbot/wiki/copyvios/markov.py   |   7 +-
 earwigbot/wiki/copyvios/parsers.py  |  11 +-
 earwigbot/wiki/copyvios/result.py   |   7 +
 4 files changed, 227 insertions(+), 133 deletions(-)

diff --git a/earwigbot/wiki/copyvios/__init__.py b/earwigbot/wiki/copyvios/__init__.py
index 6eac3d9..b12cf09 100644
--- a/earwigbot/wiki/copyvios/__init__.py
+++ b/earwigbot/wiki/copyvios/__init__.py
@@ -20,14 +20,18 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 
+from collections import namedtuple
 from gzip import GzipFile
+from Queue import Empty, Queue
 from socket import timeout
 from StringIO import StringIO
+from threading import Lock, Thread
 from time import sleep, time
 from urllib2 import build_opener, URLError
 
 from earwigbot import exceptions, importer
-from earwigbot.wiki.copyvios.markov import MarkovChain, MarkovChainIntersection
+from earwigbot.wiki.copyvios.markov import (
+    EMPTY, EMPTY_INTERSECTION, MarkovChain, MarkovChainIntersection)
 from earwigbot.wiki.copyvios.parsers import ArticleTextParser, HTMLTextParser
 from earwigbot.wiki.copyvios.result import CopyvioCheckResult
 from earwigbot.wiki.copyvios.search import YahooBOSSSearchEngine
@@ -36,26 +40,98 @@ oauth = importer.new("oauth2")
 
 __all__ = ["CopyvioMixIn"]
 
-class CopyvioMixIn(object):
-    """
-    **EarwigBot: Wiki Toolset: Copyright Violation MixIn**
+_WorkingResult = namedtuple("_WorkingResult", ["url", "confidence", "chains"])
 
-    This is a mixin that provides two public methods, :py:meth:`copyvio_check`
-    and :py:meth:`copyvio_compare`. The former checks the page for copyright
-    violations using a search engine API, and the latter compares the page
-    against a given URL. Credentials for the search engine API are stored in
-    the :py:class:`~earwigbot.wiki.site.Site`'s config.
-    """
-    EMPTY = MarkovChain("")
-    EMPTY_INTERSECTION = MarkovChainIntersection(EMPTY, EMPTY)
+
+class _CopyvioWorkspace(object):
+    """Manages a single copyvio check distributed across threads."""
 
-    def __init__(self, site):
-        self._search_config = site._search_config
-        self._exclusions_db = self._search_config.get("exclusions_db")
+    def __init__(self, article, min_confidence, until, headers, url_timeout=5):
+        self.best = _WorkingResult(None, 0.0, (EMPTY, EMPTY_INTERSECTION))
+        self.until = until
+
+        self._article = article
+        self._handled_urls = []
+        self._headers = headers
+        self._min_confidence = min_confidence
+        self._queue = Queue()
+        self._result_lock = Lock()
+        self._url_timeout = url_timeout
+        self._workers = []
+
+    def _calculate_confidence(self, delta):
+        """Return the confidence of a violation as a float between 0 and 1."""
+        return float(delta.size()) / self._article.size()
+
+    def _finish_early(self):
+        """Finish handling links prematurely, e.g. if we've met min confidence.
+
+        This works by inserting an additional ``None`` into the queue (the
+        "exit" signal for workers) and then dequeueing until we reach the
+        first ``None``. This way, every worker will dequeue an exit signal on
+        their next turn.
+        """
+        self._queue.put(None)
+        try:
+            while self._queue.get(block=False):
+                pass
+        except Empty:  # Might happen if we haven't called wait() yet, but NBD.
+            pass
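
The ``None``-sentinel scheme above carries the new shutdown logic, so a
minimal standalone sketch of the same pattern may help; everything here,
names included, is illustrative and not part of the patch:

    # Workers exit when they dequeue None; finish_early() drains pending
    # work so a sentinel is the next thing every worker sees.
    from Queue import Empty, Queue
    from threading import Thread

    queue = Queue()

    def worker():
        while True:
            item = queue.get()
            if item is None:  # Exit signal
                return
            # ... handle item here ...

    def finish_early():
        queue.put(None)
        try:
            while queue.get(block=False):
                pass  # Discard pending work up to the first None
        except Empty:
            pass

    threads = [Thread(target=worker) for i in xrange(4)]
    for thread in threads:
        thread.start()
    for thread in threads:
        queue.put(None)  # One exit signal per worker, as in wait()
    for thread in threads:
        thread.join()
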
+
+    def spawn(self, workers):
+        """Spawn *workers* number of worker threads."""
+        for i in xrange(workers):
+            worker = _CopyvioWorker(self, self._headers, self._url_timeout)
+            thread = Thread(target=worker.run)
+            thread.daemon = True
+            thread.start()
+            self._workers.append(thread)
+
+    def enqueue(self, urls, exclude_check=None):
+        """Put a list of URLs into the worker queue.
+
+        *exclude_check* is an optional exclusion function that takes a URL and
+        returns ``True`` if we should skip it and ``False`` otherwise.
+        """
+        for url in urls:
+            if url in self._handled_urls:
+                continue
+            self._handled_urls.append(url)
+            if exclude_check and exclude_check(url):
+                continue
+            self._queue.put(url)
+
+    def dequeue(self, max_time=None):
+        """Get an element from the worker queue, with an optional timeout."""
+        return self._queue.get(timeout=max_time)
+
+    def wait(self):
+        """Wait for the workers to finish handling the queue."""
+        for i in xrange(len(self._workers)):
+            self._queue.put(None)  # Exit signal to workers
+        for worker in self._workers:
+            worker.join()
+
+    def compare(self, url, source):
+        """Compare a source to the article, and update the working result."""
+        delta = MarkovChainIntersection(self._article, source)
+        confidence = self._calculate_confidence(delta)
+        with self._result_lock:
+            if confidence > self.best.confidence:
+                self.best = _WorkingResult(url, confidence, (source, delta))
+                if confidence >= self._min_confidence:
+                    self._finish_early()
+
+
+class _CopyvioWorker(object):
+    """A multithreaded URL opener/parser instance."""
+
+    def __init__(self, workspace, headers, url_timeout):
+        self._workspace = workspace
         self._opener = build_opener()
-        self._opener.addheaders = site._opener.addheaders
+        self._opener.addheaders = headers
+        self._url_timeout = url_timeout
 
-    def _open_url_ignoring_errors(self, url, max_time=5):
+    def _open_url(self, url):
         """Open a URL and return its parsed content, or None.
 
         First, we will decompress the content if the headers contain "gzip" as
@@ -68,7 +144,7 @@ class CopyvioMixIn(object):
         while decompressing, None will be returned.
         """
         try:
-            response = self._opener.open(url.encode("utf8"), timeout=max_time)
+            response = self._opener.open(url, timeout=self._url_timeout)
             result = response.read()
         except (URLError, timeout):
             return None
@@ -90,7 +166,48 @@ class CopyvioMixIn(object):
         else:
             return None
 
-    def _select_search_engine(self):
+    def run(self):
+        """Main entry point for the worker.
+
+        We will keep fetching URLs from the queue and handling them until
+        either we run out of time, or we get an exit signal that the queue is
+        now empty.
+        """
+        while True:
+            if self._workspace.until:
+                max_time = self._workspace.until - time()
+                if max_time <= 0:
+                    return
+                try:
+                    url = self._workspace.dequeue(max_time)
+                except Empty:
+                    return
+            else:
+                url = self._workspace.dequeue()
+            if url is None:  # Exit signal from workspace.wait()
+                return
+            text = self._open_url(url.encode("utf8"))
+            if text:
+                self._workspace.compare(url, MarkovChain(text))
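
The worker's time handling is worth calling out: ``run()`` recomputes the
remaining budget from the workspace's absolute ``until`` timestamp before
every blocking dequeue, so the deadline caps the whole check rather than
each fetch. A standalone sketch of that logic, with a hypothetical helper
name:

    from Queue import Empty, Queue
    from time import time

    def dequeue_with_deadline(queue, until):
        """Block for the next item, but never past the deadline."""
        if until:
            max_time = until - time()
            if max_time <= 0:  # Budget exhausted: same as an exit signal
                return None
            try:
                return queue.get(timeout=max_time)
            except Empty:
                return None
        return queue.get()
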
+
+
+class CopyvioMixIn(object):
+    """
+    **EarwigBot: Wiki Toolset: Copyright Violation MixIn**
+
+    This is a mixin that provides two public methods, :py:meth:`copyvio_check`
+    and :py:meth:`copyvio_compare`. The former checks the page for copyright
+    violations using a search engine API, and the latter compares the page
+    against a given URL. Credentials for the search engine API are stored in
+    the :py:class:`~earwigbot.wiki.site.Site`'s config.
+    """
+
+    def __init__(self, site):
+        self._search_config = site._search_config
+        self._exclusions_db = self._search_config.get("exclusions_db")
+        self._addheaders = site._opener.addheaders
+
+    def _get_search_engine(self):
         """Return a function that can be called to do web searches.
 
         The function takes one argument, a search query, and returns a list of
@@ -111,152 +228,108 @@ class CopyvioMixIn(object):
         except ImportError:
             e = "Yahoo! BOSS requires the 'oauth2' package: https://github.com/simplegeo/python-oauth2"
             raise exceptions.UnsupportedSearchEngineError(e)
-            return YahooBOSSSearchEngine(credentials, self._opener)
+            opener = build_opener()
+            opener.addheaders = self._addheaders
+            return YahooBOSSSearchEngine(credentials, opener)
 
         raise exceptions.UnknownSearchEngineError(engine)
 
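
Since the mixin no longer keeps a shared opener, ``_get_search_engine()``
builds a dedicated ``urllib2`` opener that only the search engine touches,
mirroring how each worker builds its own in ``_CopyvioWorker.__init__()``.
A rough standalone equivalent (header values invented):

    from urllib2 import build_opener

    opener = build_opener()
    opener.addheaders = [("User-Agent", "EarwigBot/0.1"),
                         ("Accept-Encoding", "gzip")]
    response = opener.open("http://example.com/", timeout=5)
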
""" start_time = time() - searcher = self._select_search_engine() + until = (start_time + max_time) if max_time > 0 else None + searcher = self._get_search_engine() + parser = ArticleTextParser(self.get()) + article = MarkovChain(parser.strip()) + workspace = _CopyvioWorkspace(article, min_confidence, + until, self._addheaders) if self._exclusions_db: self._exclusions_db.sync(self.site.name) - handled_urls = [] - best_confidence = 0 - best_match = None - num_queries = 0 - best_chains = (self.EMPTY, self.EMPTY_INTERSECTION) - parser = ArticleTextParser(self.get()) - clean = parser.strip() - chunks = parser.chunk(self._search_config["nltk_dir"], max_queries) - article_chain = MarkovChain(clean) - last_query = time() + exclude = lambda u: self._exclusions_db.check(self.site.name, u) + else: + exclude = None + + if article.size() < 20: # Auto-fail very small articles + result = CopyvioCheckResult(False, 0.0, None, 0, 0, article, + workspace.best.chains) + self._logger.debug(result.get_log_message(self.title)) + return result - if article_chain.size() < 20: # Auto-fail very small articles - return CopyvioCheckResult(False, best_confidence, best_match, - num_queries, 0, article_chain, - best_chains) + workspace.spawn(worker_threads) + workspace.enqueue(parser.get_links(), exclude) - while (chunks and best_confidence < min_confidence and - (max_queries < 0 or num_queries < max_queries)): - chunk = chunks.pop(0) + chunks = parser.chunk(self._search_config["nltk_dir"], max_queries) + for chunk in chunks: + if workspace.best.confidence >= min_confidence: + break log = u"[[{0}]] -> querying {1} for {2!r}" self._logger.debug(log.format(self.title, searcher.name, chunk)) - urls = searcher.search(chunk) - urls = [url for url in urls if url not in handled_urls] - for url in urls: - handled_urls.append(url) - if self._exclusions_db: - if self._exclusions_db.check(self.site.name, url): - continue - conf, chns = self._copyvio_compare_content(article_chain, url) - if conf > best_confidence: - best_confidence = conf - best_match = url - best_chains = chns - if time() - start_time > max_time: - break - num_queries += 1 - if time() - start_time > max_time: - break - diff = time() - last_query - if diff < interquery_sleep: - sleep(interquery_sleep - diff) - last_query = time() - - ctime = time() - start_time - if best_confidence >= min_confidence: - is_violation = True - log = u"Violation detected for [[{0}]] (confidence: {1}; URL: {2}; using {3} queries in {4} seconds)" - self._logger.debug(log.format(self.title, best_confidence, - best_match, num_queries, ctime)) - else: - is_violation = False - log = u"No violation for [[{0}]] (confidence: {1}; using {2} queries in {3} seconds)" - self._logger.debug(log.format(self.title, best_confidence, - num_queries, ctime)) + workspace.enqueue(searcher.search(chunk), exclude) + sleep(1) - return CopyvioCheckResult(is_violation, best_confidence, best_match, - num_queries, ctime, article_chain, - best_chains) + workspace.wait() + result = CopyvioCheckResult( + workspace.best.confidence >= min_confidence, + workspace.best.confidence, workspace.best.url, len(chunks), + time() - start_time, article, workspace.best.chains) + self._logger.debug(result.get_log_message(self.title)) + return result def copyvio_compare(self, url, min_confidence=0.5, max_time=30): """Check the page like :py:meth:`copyvio_check` against a specific URL. 
     def copyvio_compare(self, url, min_confidence=0.5, max_time=30):
         """Check the page like :py:meth:`copyvio_check` against a specific URL.
 
-        This is essentially a reduced version of the above - a copyivo
-        comparison is made using Markov chains and the result is returned in a
-        :py:class:`~earwigbot.wiki.copyvios.result.CopyvioCheckResult` object
-        - but without using a search engine, since the suspected "violated" URL
-        is supplied from the start.
+        This is essentially a reduced version of :meth:`copyvio_check` - a
+        copyvio comparison is made using Markov chains and the result is
+        returned in a :class:`.CopyvioCheckResult` object - but without using a
+        search engine, since the suspected "violated" URL is supplied from the
+        start.
 
         Its primary use is to generate a result when the URL is retrieved from
-        a cache, like the one used in EarwigBot's Toolserver site. After a
-        search is done, the resulting URL is stored in a cache for 24 hours so
+        a cache, like the one used in EarwigBot's Tool Labs site. After a
+        search is done, the resulting URL is stored in a cache for 72 hours so
         future checks against that page will not require another set of
         time-and-money-consuming search engine queries. However, the
         comparison itself (which includes the article's and the source's
         content) cannot be stored for data retention reasons, so a fresh
         comparison is made using this function.
 
-        Since no searching is done, neither
-        :py:exc:`~earwigbot.exceptions.UnknownSearchEngineError` nor
-        :py:exc:`~earwigbot.exceptions.SearchQueryError` will be raised.
+        Since no searching is done, neither :exc:`.UnknownSearchEngineError`
+        nor :exc:`.SearchQueryError` will be raised.
         """
         start_time = time()
-        content = self.get()
-        clean = ArticleTextParser(content).strip()
-        article_chain = MarkovChain(clean)
-
-        if not url:
-            chns = (self.EMPTY, self.EMPTY_INTERSECTION)
-            return CopyvioCheckResult(False, 0, url, 0, 0, article_chain, chns)
-
-        confidence, chains = self._copyvio_compare_content(article_chain, url, max_time)
-        ctime = time() - start_time
-        if confidence >= min_confidence:
-            is_violation = True
-            log = u"Violation detected for [[{0}]] (confidence: {1}; URL: {2}; {3} seconds)"
-            self._logger.debug(log.format(self.title, confidence, url, ctime))
-        else:
-            is_violation = False
-            log = u"No violation for [[{0}]] (confidence: {1}; URL: {2}; {3} seconds)"
-            self._logger.debug(log.format(self.title, confidence, url, ctime))
-
-        return CopyvioCheckResult(is_violation, confidence, url, 0, ctime,
-                                  article_chain, chains)
+        until = (start_time + max_time) if max_time > 0 else None
+        article = MarkovChain(ArticleTextParser(self.get()).strip())
+        workspace = _CopyvioWorkspace(article, min_confidence,
+                                      until, self._addheaders)
+        workspace.enqueue([url])
+        workspace.spawn(1)
+        workspace.wait()
+        url, conf, chains = workspace.best
+        result = CopyvioCheckResult(conf >= min_confidence, conf, url, 0,
+                                    time() - start_time, article, chains)
+        self._logger.debug(result.get_log_message(self.title))
+        return result
diff --git a/earwigbot/wiki/copyvios/markov.py b/earwigbot/wiki/copyvios/markov.py
index 065d6f1..c30e6ad 100644
--- a/earwigbot/wiki/copyvios/markov.py
+++ b/earwigbot/wiki/copyvios/markov.py
@@ -23,7 +23,8 @@
 from collections import defaultdict
 from re import sub, UNICODE
 
-__all__ = ["MarkovChain", "MarkovChainIntersection"]
+__all__ = ["EMPTY", "EMPTY_INTERSECTION", "MarkovChain",
+           "MarkovChainIntersection"]
 
 class MarkovChain(object):
     """Implements a basic ngram Markov chain of words."""
@@ -85,3 +86,7 @@ class MarkovChainIntersection(MarkovChain):
         """Return a nice string representation of the intersection."""
         res = "<MarkovChainIntersection of size {0} between {1} and {2}>"
         return res.format(self.size(), self.mc1, self.mc2)
+
+
+EMPTY = MarkovChain("")
+EMPTY_INTERSECTION = MarkovChainIntersection(EMPTY, EMPTY)
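
The confidence math itself is unchanged by the redesign: it is the size of
the chain intersection divided by the size of the article's chain, now
computed in ``_CopyvioWorkspace._calculate_confidence()``. A sketch using
the classes above (input strings invented):

    from earwigbot.wiki.copyvios.markov import (
        MarkovChain, MarkovChainIntersection)

    article = MarkovChain(u"text of the article after stripping markup")
    source = MarkovChain(u"text of one web source after parsing")
    delta = MarkovChainIntersection(article, source)
    confidence = float(delta.size()) / article.size()
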
diff --git a/earwigbot/wiki/copyvios/parsers.py b/earwigbot/wiki/copyvios/parsers.py
index 89e1e8f..9083158 100644
--- a/earwigbot/wiki/copyvios/parsers.py
+++ b/earwigbot/wiki/copyvios/parsers.py
@@ -124,9 +124,18 @@ class ArticleTextParser(BaseTextParser):
         else:
             chunk = sentences.pop(3 * len(sentences) / 4)  # Pop from Q3
         chunks.append(chunk)
-
         return chunks
 
+    def get_links(self):
+        """Return a list of all external links in the article.
+
+        The list is restricted to things that we suspect we can parse: i.e.,
+        those with schemes of ``http`` and ``https``.
+        """
+        schemes = ("http://", "https://")
+        links = mwparserfromhell.parse(self.text).ifilter_external_links()
+        return [link.url for link in links if link.url.startswith(schemes)]
+
 
 class HTMLTextParser(BaseTextParser):
     """A parser that can extract the text from an HTML document."""
diff --git a/earwigbot/wiki/copyvios/result.py b/earwigbot/wiki/copyvios/result.py
index 521c810..cad86bc 100644
--- a/earwigbot/wiki/copyvios/result.py
+++ b/earwigbot/wiki/copyvios/result.py
@@ -61,3 +61,10 @@ class CopyvioCheckResult(object):
         """Return a nice string representation of the result."""
         res = "<CopyvioCheckResult ({0} with {1} confidence)>"
         return res.format(self.violation, self.confidence)
+
+    def get_log_message(self, title):
+        """Build a relevant log message for this copyvio check result."""
+        log = u"{0} for [[{1}]] (confidence: {2}; URL: {3}; {4} queries; {5} seconds)"
+        is_vio = "Violation detected" if self.violation else "No violation"
+        return log.format(is_vio, title, self.confidence, self.url,
+                          self.queries, self.time)
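
With logging centralized in ``get_log_message()``, both public methods emit
the same format. A hypothetical result, with field values invented and the
empty chains coming from the sentinels added in markov.py:

    from earwigbot.wiki.copyvios.markov import EMPTY, EMPTY_INTERSECTION
    from earwigbot.wiki.copyvios.result import CopyvioCheckResult

    result = CopyvioCheckResult(True, 0.82, "http://example.com/", 3, 12.4,
                                EMPTY, (EMPTY, EMPTY_INTERSECTION))
    print result.get_log_message(u"Example")
    # Violation detected for [[Example]] (confidence: 0.82;
    # URL: http://example.com/; 3 queries; 12.4 seconds)
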