diff --git a/earwigbot/wiki/copyvios/__init__.py b/earwigbot/wiki/copyvios/__init__.py
index 958a7ad..5265a3e 100644
--- a/earwigbot/wiki/copyvios/__init__.py
+++ b/earwigbot/wiki/copyvios/__init__.py
@@ -20,246 +20,20 @@
 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 # SOFTWARE.
 
-from collections import namedtuple
-from gzip import GzipFile
-from math import log
-from Queue import Empty, Queue
-from socket import error
-from StringIO import StringIO
-from threading import Lock, Semaphore, Thread
 from time import sleep, time
-from urllib2 import build_opener, URLError
+from urllib2 import build_opener
 
 from earwigbot import exceptions, importer
-from earwigbot.wiki.copyvios.markov import (
-    EMPTY, EMPTY_INTERSECTION, MarkovChain, MarkovChainIntersection)
-from earwigbot.wiki.copyvios.parsers import ArticleTextParser, HTMLTextParser
+from earwigbot.wiki.copyvios.markov import MarkovChain
+from earwigbot.wiki.copyvios.parsers import ArticleTextParser
 from earwigbot.wiki.copyvios.result import CopyvioCheckResult
 from earwigbot.wiki.copyvios.search import YahooBOSSSearchEngine
+from earwigbot.wiki.copyvios.workers import (
+    globalize, localize, CopyvioWorkspace)
 
 oauth = importer.new("oauth2")
-tldextract = importer.new("tldextract")
 
-__all__ = ["CopyvioMixIn"]
-
-_WorkingResult = namedtuple("_WorkingResult", ["url", "confidence", "chains"])
-
-class _CopyvioWorkspace(object):
-    """Manages a single copyvio check distributed across threads."""
-
-    def __init__(self, article, min_confidence, until, logger, headers,
-                 url_timeout=5, max_concurrent_requests=6):
-        self.best = _WorkingResult(None, 0.0, (EMPTY, EMPTY_INTERSECTION))
-        self.request_semaphore = Semaphore(max_concurrent_requests)
-
-        self._article = article
-        self._logger = logger.getChild("copyvios")
-        self._min_confidence = min_confidence
-        self._handled_urls = []
-        self._is_finished = False
-        self._enqueue_lock = Lock()
-        self._result_lock = Lock()
-
-        self._workers = {}
-        self._worker_args = (self, until, headers, url_timeout)
-
-    def _calculate_confidence(self, delta):
-        """Return the confidence of a violation as a float between 0 and 1."""
-        def conf_with_article_and_delta(article, delta):
-            """Calculate confidence using the article and delta chain sizes."""
-            # This piecewise function, C_AΔ(Δ), was defined such that
-            # confidence exhibits exponential growth until it reaches the
-            # default "suspect" confidence threshold, at which point it
-            # transitions to polynomial growth with lim (A/Δ)→1 C_AΔ(A,Δ) = 1.
-            # A graph can be viewed here:
-            # http://benkurtovic.com/static/article-delta_confidence_function.pdf
-            ratio = delta / article
-            if ratio <= 0.52763:
-                return log(1 / (1 - ratio))
-            else:
-                return (-0.8939 * (ratio ** 2)) + (1.8948 * ratio) - 0.0009
-
-        def conf_with_delta(delta):
-            """Calculate confidence using just the delta chain size."""
-            # This piecewise function, C_Δ(Δ), was derived from experimental
-            # data using reference points at (0, 0), (100, 0.5), (250, 0.75),
-            # (500, 0.9), and (1000, 0.95) with lim Δ→+∞ C_Δ(Δ) = 1.
-            # A graph can be viewed here:
-            # http://benkurtovic.com/static/delta_confidence_function.pdf
-            if delta <= 100:
-                return delta / (delta + 100)
-            elif delta <= 250:
-                return (delta - 25) / (delta + 50)
-            elif delta <= 500:
-                return (10.5 * delta - 750) / (10 * delta)
-            else:
-                return (delta - 50) / delta
-
-        d_size = float(delta.size)
-        return max(conf_with_article_and_delta(self._article.size, d_size),
-                   conf_with_delta(d_size))
-
-    def _finish_early(self):
-        """Finish handling links prematurely (if we've hit min_confidence)."""
-        self._logger.debug("Confidence threshold met; clearing worker queues")
-        with self._enqueue_lock:
-            for worker in self._workers.itervalues():
-                with worker.queue.mutex:
-                    worker.queue.queue.clear()
-                    worker.queue.queue.append(None)
-            self._is_finished = True
-
-    def enqueue(self, urls, exclude_check=None):
-        """Put a list of URLs into the worker queue.
-
-        *exclude_check* is an optional exclusion function that takes a URL and
-        returns ``True`` if we should skip it and ``False`` otherwise.
-        """
-        for url in urls:
-            with self._enqueue_lock:
-                if self._is_finished:
-                    break
-                if url in self._handled_urls:
-                    continue
-                self._handled_urls.append(url)
-                if exclude_check and exclude_check(url):
-                    continue
-
-                try:
-                    key = tldextract.extract(url).registered_domain
-                except ImportError:  # Fall back on very naive method
-                    from urlparse import urlparse
-                    key = u".".join(urlparse(url).netloc.split(".")[-2:])
-
-                logmsg = u"enqueue(): {0} {1} -> {2}"
-                if key in self._workers:
-                    self._logger.debug(logmsg.format("PUT", key, url))
-                    self._workers[key].queue.put(url)
-                else:
-                    self._logger.debug(logmsg.format("NEW", key, url))
-                    worker = _CopyvioWorker(*self._worker_args)
-                    worker.queue.put(url)
-                    worker.start(key.encode("utf8"))
-                    self._workers[key] = worker
-
-    def wait(self):
-        """Wait for the workers to finish handling the queue."""
-        self._logger.debug("Waiting on {0} workers".format(len(self._workers)))
-        for worker in self._workers.itervalues():
-            worker.queue.put(None)  # Exit signal to workers
-        for worker in self._workers.itervalues():
-            worker.join()
-
-    def compare(self, url, source):
-        """Compare a source to the article, and update the working result."""
-        delta = MarkovChainIntersection(self._article, source)
-        confidence = self._calculate_confidence(delta)
-        self._logger.debug(u"compare(): {0} -> {1}".format(url, confidence))
-        with self._result_lock:
-            if confidence > self.best.confidence:
-                self.best = _WorkingResult(url, confidence, (source, delta))
-            if confidence >= self._min_confidence:
-                self._finish_early()
-
-
-class _CopyvioWorker(object):
-    """A multithreaded URL opener/parser instance."""
-
-    def __init__(self, workspace, until, headers, url_timeout):
-        self.queue = Queue()
-
-        self._thread = None
-        self._workspace = workspace
-        self._until = until
-        self._opener = build_opener()
-        self._opener.addheaders = headers
-        self._url_timeout = url_timeout
-
-    def _open_url(self, url):
-        """Open a URL and return its parsed content, or None.
-
-        First, we will decompress the content if the headers contain "gzip" as
-        its content encoding. Then, we will return the content stripped using
-        an HTML parser if the headers indicate it is HTML, or return the
-        content directly if it is plain text. If we don't understand the
-        content type, we'll return None.
-
-        If a URLError was raised while opening the URL or an IOError was raised
-        while decompressing, None will be returned.
- """ - with self._workspace.request_semaphore: - try: - response = self._opener.open(url, timeout=self._url_timeout) - except (URLError, error): - return None - - try: - size = int(response.headers.get("Content-Length", 0)) - except ValueError: - return None - if size > 1024 ** 2: # Ignore URLs larger than a megabyte - return None - - ctype_full = response.headers.get("Content-Type", "text/plain") - ctype = ctype_full.split(";", 1)[0] - if ctype in ["text/html", "application/xhtml+xml"]: - handler = lambda res: HTMLTextParser(res).strip() - elif ctype == "text/plain": - handler = lambda res: res.strip() - else: - return None - - with self._workspace.request_semaphore: - try: - content = response.read() - except (URLError, error): - return None - - if response.headers.get("Content-Encoding") == "gzip": - stream = StringIO(content) - gzipper = GzipFile(fileobj=stream) - try: - content = gzipper.read(2 * 1024 ** 2) - except IOError: - return None - - return handler(content) - - def _run(self): - """Main entry point for the worker thread. - - We will keep fetching URLs from the queue and handling them until - either we run out of time, or we get an exit signal that the queue is - now empty. - """ - while True: - if self._until: - max_time = self._until - time() - if max_time <= 0: - return - try: - url = self.queue.get(timeout=max_time) - except Empty: - return - else: - url = self.queue.get() - if url is None: # Exit signal - return - text = self._open_url(url.encode("utf8")) - if text: - self._workspace.compare(url, MarkovChain(text)) - - def start(self, name): - """Start the worker in a new thread, with a given name.""" - self._thread = thread = Thread(target=self._run) - thread.name = "cvworker-" + name - thread.daemon = True - thread.start() - - def join(self): - """Join to the worker thread, blocking until it finishes.""" - self._thread.join() +__all__ = ["CopyvioMixIn", "globalize", "localize"] class CopyvioMixIn(object): """ @@ -333,8 +107,8 @@ class CopyvioMixIn(object): searcher = self._get_search_engine() parser = ArticleTextParser(self.get()) article = MarkovChain(parser.strip()) - workspace = _CopyvioWorkspace(article, min_confidence, until, - self._logger, self._addheaders) + workspace = CopyvioWorkspace(article, min_confidence, until, + self._logger, self._addheaders) if self._exclusions_db: self._exclusions_db.sync(self.site.name) exclude = lambda u: self._exclusions_db.check(self.site.name, u) @@ -393,8 +167,8 @@ class CopyvioMixIn(object): start_time = time() until = (start_time + max_time) if max_time > 0 else None article = MarkovChain(ArticleTextParser(self.get()).strip()) - workspace = _CopyvioWorkspace(article, min_confidence, until, - self._logger, self._addheaders, max_time) + workspace = CopyvioWorkspace(article, min_confidence, until, + self._logger, self._addheaders, max_time) workspace.enqueue([url]) workspace.wait() url, conf, chains = workspace.best diff --git a/earwigbot/wiki/copyvios/workers.py b/earwigbot/wiki/copyvios/workers.py new file mode 100644 index 0000000..8eef335 --- /dev/null +++ b/earwigbot/wiki/copyvios/workers.py @@ -0,0 +1,374 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2009-2014 Ben Kurtovic +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to 
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+from gzip import GzipFile
+from math import log
+from Queue import Empty, Queue
+from socket import error
+from StringIO import StringIO
+from threading import Event, Lock, Thread
+from time import time
+from urllib2 import build_opener, URLError
+
+from earwigbot import importer
+from earwigbot.wiki.copyvios.markov import (
+    EMPTY, EMPTY_INTERSECTION, MarkovChain, MarkovChainIntersection)
+from earwigbot.wiki.copyvios.parsers import HTMLTextParser
+
+tldextract = importer.new("tldextract")
+
+__all__ = ["globalize", "localize", "CopyvioWorkspace"]
+
+_is_globalized = False
+_global_workers = []
+_global_queues = None
+
+def globalize(num_workers=8):
+    """Cause all copyvio checks to be done by one global set of workers.
+
+    This is useful when checks are being done through a web interface where
+    large numbers of simultaneous requests could be problematic. The global
+    workers are spawned when the function is called, run continuously, and
+    intelligently handle multiple checks.
+
+    This function is not thread-safe and should only be called when no checks
+    are being done. It has no effect if it has already been called.
+    """
+    global _is_globalized, _global_queues
+    if _is_globalized:
+        return
+
+    _global_queues = _CopyvioQueues()
+    for i in xrange(num_workers):
+        worker = _CopyvioWorker(_global_queues)
+        worker.start("global-{0}".format(i))
+        _global_workers.append(worker)
+    _is_globalized = True
+
+def localize():
+    """Return to using page-specific workers for copyvio checks.
+
+    This disables changes made by :func:`globalize`, including stopping the
+    global worker threads.
+
+    This function is not thread-safe and should only be called when no checks
+    are being done.
+ """ + global _is_globalized, _global_queues + if not _is_globalized: + return + + for i in xrange(len(_global_workers)): + _global_queues.unassigned.put((StopIteration, None)) + _global_queues = None + _global_workers = None + _is_globalized = False + + +class _CopyvioSource(object): + """Represents a single suspected violation source (a URL).""" + + def __init__(self, workspace, url, key, headers=None, timeout=5): + self.url = url + self.key = key + self.headers = headers + self.timeout = timeout + self.confidence = 0.0 + self.chains = (EMPTY, EMPTY_INTERSECTION) + + self._workspace = workspace + self._event = Event() + + def active(self): + """Return whether or not this source needs to be filled out.""" + return not self._event.is_set() + + def complete(self, confidence, source_chain, delta_chain): + """Complete the confidence information inside this source.""" + self.confidence = confidence + self.chains = (source_chain, delta_chain) + self._event.set() + + def cancel(self): + """Deactivate this source without filling in the relevant data.""" + self._event.set() + + def join(self, until): + """Block until this violation result is filled out.""" + if until: + timeout = until - time() + if timeout <= 0: + return + self._event.wait(timeout) + + +class _CopyvioQueues(object): + """Stores data necessary to maintain the various queues during a check.""" + + def __init__(self): + self.lock = Lock() + self.sites = {} + self.unassigned = Queue() + + +class _CopyvioWorker(object): + """A multithreaded URL opener/parser instance.""" + + def __init__(self, queues, until=None): + self._queues = queues + self._until = until + + self._thread = None + self._site = None + self._queue = None + self._opener = build_opener() + + def _open_url(self, source): + """Open a URL and return its parsed content, or None. + + First, we will decompress the content if the headers contain "gzip" as + its content encoding. Then, we will return the content stripped using + an HTML parser if the headers indicate it is HTML, or return the + content directly if it is plain text. If we don't understand the + content type, we'll return None. + + If a URLError was raised while opening the URL or an IOError was raised + while decompressing, None will be returned. 
+ """ + self._opener.addheaders = source.headers + url = source.url.encode("utf8") + try: + response = self._opener.open(url, timeout=source.timeout) + except (URLError, error): + return None + + try: + size = int(response.headers.get("Content-Length", 0)) + except ValueError: + return None + if size > 1024 ** 2: # Ignore URLs larger than a megabyte + return None + + ctype_full = response.headers.get("Content-Type", "text/plain") + ctype = ctype_full.split(";", 1)[0] + if ctype in ["text/html", "application/xhtml+xml"]: + handler = lambda res: HTMLTextParser(res).strip() + elif ctype == "text/plain": + handler = lambda res: res.strip() + else: + return None + + try: + content = response.read() + except (URLError, error): + return None + + if response.headers.get("Content-Encoding") == "gzip": + stream = StringIO(content) + gzipper = GzipFile(fileobj=stream) + try: + content = gzipper.read(2 * 1024 ** 2) + except IOError: + return None + + return handler(content) + + def _dequeue(self): + """Remove a source from one of the queues.""" + if self._until: + timeout = self._until - time() + if timeout <= 0: + return + else: + timeout = None + + with self._queues.lock: + if self._queue: + source = self._queue.get(timeout=timeout) + if self._queue.empty(): + del self._queues.sites[self._site] + self._queue = None + else: + site, queue = self._queues.unassigned.get(timeout=timeout) + if site is StopIteration: + return StopIteration + source = queue.get_nowait() + if queue.empty(): + del self._queues.sites[site] + else: + self._site = site + self._queue = queue + if not source.active(): + return self._dequeue() + return source + + def _run(self): + """Main entry point for the worker thread. + + We will keep fetching URLs from the queues and handling them until + either we run out of time, or we get an exit signal that the queue is + now empty. 
+ """ + while True: + try: + source = self._dequeue() + except Empty: + return + if source is StopIteration: + return + text = self._open_url(source) + if text: + source.workspace.compare(source, MarkovChain(text)) + + def start(self, name): + """Start the worker in a new thread, with a given name.""" + self._thread = thread = Thread(target=self._run) + thread.name = "cvworker-" + name + thread.daemon = True + thread.start() + + +class CopyvioWorkspace(object): + """Manages a single copyvio check distributed across threads.""" + + def __init__(self, article, min_confidence, until, logger, headers, + url_timeout=5, num_workers=8): + self.best = _CopyvioSource(self, None) + self.sources = [] + + self._article = article + self._logger = logger.getChild("copyvios") + self._min_confidence = min_confidence + self._until = until + self._handled_urls = [] + self._is_finished = False + self._compare_lock = Lock() + self._source_args = {"workspace": self, "headers": headers, + "timeout": url_timeout) + + if _is_globalized: + self._queues = _global_queues + self._workers = _global_workers + else: + self._queues = _CopyvioQueues() + for i in xrange(num_workers): + worker = _CopyvioWorker(self._queues, until) + worker.start("local-{0:04}.{1}".format(id(self) % 10000, i)) + self._workers.append(worker) + + def _calculate_confidence(self, delta): + """Return the confidence of a violation as a float between 0 and 1.""" + def conf_with_article_and_delta(article, delta): + """Calculate confidence using the article and delta chain sizes.""" + # This piecewise function, C_AΔ(Δ), was defined such that + # confidence exhibits exponential growth until it reaches the + # default "suspect" confidence threshold, at which point it + # transitions to polynomial growth with lim (A/Δ)→1 C_AΔ(A,Δ) = 1. + # A graph can be viewed here: + # http://benkurtovic.com/static/article-delta_confidence_function.pdf + ratio = delta / article + if ratio <= 0.52763: + return log(1 / (1 - ratio)) + else: + return (-0.8939 * (ratio ** 2)) + (1.8948 * ratio) - 0.0009 + + def conf_with_delta(delta): + """Calculate confidence using just the delta chain size.""" + # This piecewise function, C_Δ(Δ), was derived from experimental + # data using reference points at (0, 0), (100, 0.5), (250, 0.75), + # (500, 0.9), and (1000, 0.95) with lim Δ→+∞ C_Δ(Δ) = 1. + # A graph can be viewed here: + # http://benkurtovic.com/static/delta_confidence_function.pdf + if delta <= 100: + return delta / (delta + 100) + elif delta <= 250: + return (delta - 25) / (delta + 50) + elif delta <= 500: + return (10.5 * delta - 750) / (10 * delta) + else: + return (delta - 50) / delta + + d_size = float(delta.size) + return max(conf_with_article_and_delta(self._article.size, d_size), + conf_with_delta(d_size)) + + def _finish_early(self): + """Finish handling links prematurely (if we've hit min_confidence).""" + if self._is_finished: + return + self._logger.debug("Confidence threshold met; cancelling remaining sources") + with self._queues.lock: + for source in self.sources: + source.cancel() + self._is_finished = True + + def enqueue(self, urls, exclude_check=None): + """Put a list of URLs into the various worker queues. + + *exclude_check* is an optional exclusion function that takes a URL and + returns ``True`` if we should skip it and ``False`` otherwise. 
+ """ + for url in urls: + if self._is_finished: + break + if url in self._handled_urls: + continue + self._handled_urls.append(url) + if exclude_check and exclude_check(url): + continue + + try: + key = tldextract.extract(url).registered_domain + except ImportError: # Fall back on very naive method + from urlparse import urlparse + key = u".".join(urlparse(url).netloc.split(".")[-2:]) + + source = _CopyvioSource(url=url, key=key, **self._source_args) + logmsg = u"enqueue(): {0} {1} -> {2}" + with self._queues.lock: + if key in self._queues.sites: + self._logger.debug(logmsg.format("append", key, url)) + self._queues.sites[key].put(source) + else: + self._logger.debug(logmsg.format("new", key, url)) + self._queues.sites[key] = queue = Queue() + queue.put(source) + self._queues.unassigned.put((key, queue)) + + def wait(self): + """Wait for the workers to finish handling the sources.""" + self._logger.debug("Waiting on {0} sources".format(len(self.sources))) + for source in self.sources: + source.join(self._until) + + def compare(self, source, source_chain): + """Compare a source to the article, and update the best known one.""" + delta = MarkovChainIntersection(self._article, source_chain) + conf = self._calculate_confidence(delta) + source.complete(conf, source_chain, delta) + self._logger.debug(u"compare(): {0} -> {1}".format(source.url, conf)) + + with self._compare_lock: + if conf > self.best.confidence: + self.best = source + if conf >= self._min_confidence: + self._finish_early()