Browse Source

Redesign copyvio internals to parallelize URL loading/parsing.

tags/v0.2
Ben Kurtovic 10 years ago
parent
commit
ae0c390ceb
4 changed files with 227 additions and 133 deletions
  1. +204
    -131
      earwigbot/wiki/copyvios/__init__.py
  2. +6
    -1
      earwigbot/wiki/copyvios/markov.py
  3. +10
    -1
      earwigbot/wiki/copyvios/parsers.py
  4. +7
    -0
      earwigbot/wiki/copyvios/result.py

+ 204
- 131
earwigbot/wiki/copyvios/__init__.py View File

@@ -20,14 +20,18 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from collections import namedtuple
from gzip import GzipFile
from Queue import Empty, Queue
from socket import timeout
from StringIO import StringIO
from threading import Lock, Thread
from time import sleep, time
from urllib2 import build_opener, URLError

from earwigbot import exceptions, importer
from earwigbot.wiki.copyvios.markov import MarkovChain, MarkovChainIntersection
from earwigbot.wiki.copyvios.markov import (
EMPTY, EMPTY_INTERSECTION, MarkovChain, MarkovChainIntersection)
from earwigbot.wiki.copyvios.parsers import ArticleTextParser, HTMLTextParser
from earwigbot.wiki.copyvios.result import CopyvioCheckResult
from earwigbot.wiki.copyvios.search import YahooBOSSSearchEngine
@@ -36,26 +40,98 @@ oauth = importer.new("oauth2")

__all__ = ["CopyvioMixIn"]

class CopyvioMixIn(object):
"""
**EarwigBot: Wiki Toolset: Copyright Violation MixIn**
_WorkingResult = namedtuple("_WorkingResult", ["url", "confidence", "chains"])

This is a mixin that provides two public methods, :py:meth:`copyvio_check`
and :py:meth:`copyvio_compare`. The former checks the page for copyright
violations using a search engine API, and the latter compares the page
against a given URL. Credentials for the search engine API are stored in
the :py:class:`~earwigbot.wiki.site.Site`'s config.
"""
EMPTY = MarkovChain("")
EMPTY_INTERSECTION = MarkovChainIntersection(EMPTY, EMPTY)
class _CopyvioWorkspace(object):
"""Manages a single copyvio check distributed across threads."""

def __init__(self, site):
self._search_config = site._search_config
self._exclusions_db = self._search_config.get("exclusions_db")
def __init__(self, article, min_confidence, until, headers, url_timeout=5):
self.best = _WorkingResult(None, 0.0, (EMPTY, EMPTY_INTERSECTION))
self.until = until

self._article = article
self._handled_urls = []
self._headers = headers
self._min_confidence = min_confidence
self._queue = Queue()
self._result_lock = Lock()
self._url_timeout = url_timeout
self._workers = []

def _calculate_confidence(self, delta):
"""Return the confidence of a violation as a float between 0 and 1."""
return float(delta.size()) / self._article.size()

def _finish_early(self):
"""Finish handling links prematurely, e.g. if we've met min confidence.

This works by inserting an additional ``None`` into the queue (the
"exit" signal for workers) and then dequeueing until we reach the first
``None``. This way, every worker will dequeue an exit signal signal on
their next turn.
"""
self._queue.put(None)
try:
while self._queue.get(block=False):
pass
except Empty: # Might happen if we haven't called wait() yet, but NBD.
pass

def spawn(self, workers):
"""Spawn *workers* number of worker threads."""
for i in xrange(workers):
worker = _CopyvioWorker(self, self._headers, self._url_timeout)
thread = Thread(target=worker.run)
thread.daemon = True
thread.start()
self._workers.append(thread)

def enqueue(self, urls, exclude_check=None):
"""Put a list of URLs into the worker queue.

*exclude_check* is an optional exclusion function that takes a URL and
returns ``True`` if we should skip it and ``False`` otherwise.
"""
for url in urls:
if url in self._handled_urls:
continue
self._handled_urls.append(url)
if exclude_check and exclude_check(url):
continue
self._queue.put(url)

def dequeue(self, max_time=None):
"""Get an element from the worker queue, with an optional timeout."""
return self._queue.get(timeout=max_time)

def wait(self):
"""Wait for the workers to finish handling the queue."""
for i in xrange(len(self._workers)):
self._queue.put(None) # Exit signal to workers
for worker in self._workers:
worker.join()

def compare(self, url, source):
"""Compare a source to the article, and update the working result."""
delta = MarkovChainIntersection(self._article, source)
confidence = self._calculate_confidence(delta)
with self._result_lock:
if confidence > self.best.confidence:
self.best = _WorkingResult(url, confidence, (source, delta))
if confidence >= self._min_confidence:
self._finish_early()


class _CopyvioWorker(object):
"""A multithreaded URL opener/parser instance."""

def __init__(self, workspace, headers, url_timeout):
self._workspace = workspace
self._opener = build_opener()
self._opener.addheaders = site._opener.addheaders
self._opener.addheaders = headers
self._url_timeout = url_timeout

def _open_url_ignoring_errors(self, url, max_time=5):
def _open_url(self, url):
"""Open a URL and return its parsed content, or None.

First, we will decompress the content if the headers contain "gzip" as
@@ -68,7 +144,7 @@ class CopyvioMixIn(object):
while decompressing, None will be returned.
"""
try:
response = self._opener.open(url.encode("utf8"), timeout=max_time)
response = self._opener.open(url, timeout=self._url_timeout)
result = response.read()
except (URLError, timeout):
return None
@@ -90,7 +166,48 @@ class CopyvioMixIn(object):
else:
return None

def _select_search_engine(self):
def run(self):
"""Main entry point for the worker.

We will keep fetching URLs from the queue and handling them until
either we run out of time, or we get an exit signal that the queue is
now empty.
"""
while True:
if self._workspace.until:
max_time = self._workspace.until - time()
if max_time <= 0:
return
try:
url = self._workspace.dequeue(timeout=max_time)
except Empty:
return
else:
url = self._workspace.dequeue()
if url is None: # Exit signal from workspace.wait()
return
text = self._open_url(url.encode("utf8"))
if text:
self._workspace.compare(url, MarkovChain(text))


class CopyvioMixIn(object):
"""
**EarwigBot: Wiki Toolset: Copyright Violation MixIn**

This is a mixin that provides two public methods, :py:meth:`copyvio_check`
and :py:meth:`copyvio_compare`. The former checks the page for copyright
violations using a search engine API, and the latter compares the page
against a given URL. Credentials for the search engine API are stored in
the :py:class:`~earwigbot.wiki.site.Site`'s config.
"""

def __init__(self, site):
self._search_config = site._search_config
self._exclusions_db = self._search_config.get("exclusions_db")
self._addheaders = site._opener.addheaders

def _get_search_engine(self):
"""Return a function that can be called to do web searches.

The function takes one argument, a search query, and returns a list of
@@ -111,152 +228,108 @@ class CopyvioMixIn(object):
except ImportError:
e = "Yahoo! BOSS requires the 'oauth2' package: https://github.com/simplegeo/python-oauth2"
raise exceptions.UnsupportedSearchEngineError(e)
return YahooBOSSSearchEngine(credentials, self._opener)
opener = build_opener()
opener.addheaders = self._addheaders
return YahooBOSSSearchEngine(credentials, opener)

raise exceptions.UnknownSearchEngineError(engine)

def _copyvio_compare_content(self, article, url, max_time=5):
"""Return a number comparing an article and a URL.

The *article* is a Markov chain, whereas the *url* is just a string
that we'll try to open and read ourselves.
"""
text = self._open_url_ignoring_errors(url, max_time)
if not text:
return 0, (self.EMPTY, self.EMPTY_INTERSECTION)

source = MarkovChain(text)
delta = MarkovChainIntersection(article, source)
return float(delta.size()) / article.size(), (source, delta)

def copyvio_check(self, min_confidence=0.5, max_queries=-1, max_time=-1,
interquery_sleep=1):
def copyvio_check(self, min_confidence=0.5, max_queries=15, max_time=-1,
worker_threads=4):
"""Check the page for copyright violations.

Returns a
:py:class:`~earwigbot.wiki.copyvios.result.CopyvioCheckResult` object
with information on the results of the check.
Returns a :class:`.CopyvioCheckResult` object with information on the
results of the check.

*min_confidence* is the minimum amount of confidence we must have in
the similarity between a source text and the article in order for us to
consider it a suspected violation. This is a number between 0 and 1.

*max_queries* is self-explanatory; we will never make more than this
number of queries in a given check. If it's lower than 0, we will not
limit the number of queries.
number of queries in a given check.

*max_time* can be set to prevent copyvio checks from taking longer than
a set amount of time (generally around a minute), which can be useful
if checks are called through a web server with timeouts. We will stop
checking new URLs as soon as this limit is reached.

*interquery_sleep* is the minimum amount of time we will sleep between
search engine queries, in seconds.
*worker_threads* is the number of threads we will spawn to handle URL
fetching and parsing simultaneously. Between 1 and 8 is recommended.

Raises :py:exc:`~earwigbot.exceptions.CopyvioCheckError` or subclasses
(:py:exc:`~earwigbot.exceptions.UnknownSearchEngineError`,
:py:exc:`~earwigbot.exceptions.SearchQueryError`, ...) on errors.
Raises :exc:`.CopyvioCheckError` or subclasses
(:exc:`.UnknownSearchEngineError`, :exc:`.SearchQueryError`, ...) on
errors.
"""
start_time = time()
searcher = self._select_search_engine()
until = (start_time + max_time) if max_time > 0 else None
searcher = self._get_search_engine()
parser = ArticleTextParser(self.get())
article = MarkovChain(parser.strip())
workspace = _CopyvioWorkspace(article, min_confidence,
until, self._addheaders)
if self._exclusions_db:
self._exclusions_db.sync(self.site.name)
handled_urls = []
best_confidence = 0
best_match = None
num_queries = 0
best_chains = (self.EMPTY, self.EMPTY_INTERSECTION)
parser = ArticleTextParser(self.get())
clean = parser.strip()
chunks = parser.chunk(self._search_config["nltk_dir"], max_queries)
article_chain = MarkovChain(clean)
last_query = time()
exclude = lambda u: self._exclusions_db.check(self.site.name, u)
else:
exclude = None

if article.size() < 20: # Auto-fail very small articles
result = CopyvioCheckResult(False, 0.0, None, 0, 0, article,
workspace.best.chains)
self._logger.debug(result.get_log_message(self.title))
return result

if article_chain.size() < 20: # Auto-fail very small articles
return CopyvioCheckResult(False, best_confidence, best_match,
num_queries, 0, article_chain,
best_chains)
workspace.spawn(worker_threads)
workspace.enqueue(parser.get_links(), exclude)

while (chunks and best_confidence < min_confidence and
(max_queries < 0 or num_queries < max_queries)):
chunk = chunks.pop(0)
chunks = parser.chunk(self._search_config["nltk_dir"], max_queries)
for chunk in chunks:
if workspace.best.confidence >= min_confidence:
break
log = u"[[{0}]] -> querying {1} for {2!r}"
self._logger.debug(log.format(self.title, searcher.name, chunk))
urls = searcher.search(chunk)
urls = [url for url in urls if url not in handled_urls]
for url in urls:
handled_urls.append(url)
if self._exclusions_db:
if self._exclusions_db.check(self.site.name, url):
continue
conf, chns = self._copyvio_compare_content(article_chain, url)
if conf > best_confidence:
best_confidence = conf
best_match = url
best_chains = chns
if time() - start_time > max_time:
break
num_queries += 1
if time() - start_time > max_time:
break
diff = time() - last_query
if diff < interquery_sleep:
sleep(interquery_sleep - diff)
last_query = time()

ctime = time() - start_time
if best_confidence >= min_confidence:
is_violation = True
log = u"Violation detected for [[{0}]] (confidence: {1}; URL: {2}; using {3} queries in {4} seconds)"
self._logger.debug(log.format(self.title, best_confidence,
best_match, num_queries, ctime))
else:
is_violation = False
log = u"No violation for [[{0}]] (confidence: {1}; using {2} queries in {3} seconds)"
self._logger.debug(log.format(self.title, best_confidence,
num_queries, ctime))
workspace.enqueue(searcher.search(chunk), exclude)
sleep(1)

return CopyvioCheckResult(is_violation, best_confidence, best_match,
num_queries, ctime, article_chain,
best_chains)
workspace.wait()
result = CopyvioCheckResult(
workspace.best.confidence >= min_confidence,
workspace.best.confidence, workspace.best.url, len(chunks),
time() - start_time, article, workspace.best.chains)
self._logger.debug(result.get_log_message(self.title))
return result

def copyvio_compare(self, url, min_confidence=0.5, max_time=30):
"""Check the page like :py:meth:`copyvio_check` against a specific URL.

This is essentially a reduced version of the above - a copyivo
comparison is made using Markov chains and the result is returned in a
:py:class:`~earwigbot.wiki.copyvios.result.CopyvioCheckResult` object -
but without using a search engine, since the suspected "violated" URL
is supplied from the start.
This is essentially a reduced version of :meth:`copyvio_check` - a
copyivo comparison is made using Markov chains and the result is
returned in a :class:`.CopyvioCheckResult` object - but without using a
search engine, since the suspected "violated" URL is supplied from the
start.

Its primary use is to generate a result when the URL is retrieved from
a cache, like the one used in EarwigBot's Toolserver site. After a
search is done, the resulting URL is stored in a cache for 24 hours so
a cache, like the one used in EarwigBot's Tool Labs site. After a
search is done, the resulting URL is stored in a cache for 72 hours so
future checks against that page will not require another set of
time-and-money-consuming search engine queries. However, the comparison
itself (which includes the article's and the source's content) cannot
be stored for data retention reasons, so a fresh comparison is made
using this function.

Since no searching is done, neither
:py:exc:`~earwigbot.exceptions.UnknownSearchEngineError` nor
:py:exc:`~earwigbot.exceptions.SearchQueryError` will be raised.
Since no searching is done, neither :exc:`.UnknownSearchEngineError`
nor :exc:`.SearchQueryError` will be raised.
"""
start_time = time()
content = self.get()
clean = ArticleTextParser(content).strip()
article_chain = MarkovChain(clean)

if not url:
chns = (self.EMPTY, self.EMPTY_INTERSECTION)
return CopyvioCheckResult(False, 0, url, 0, 0, article_chain, chns)

confidence, chains = self._copyvio_compare_content(article_chain, url, max_time)
ctime = time() - start_time
if confidence >= min_confidence:
is_violation = True
log = u"Violation detected for [[{0}]] (confidence: {1}; URL: {2}; {3} seconds)"
self._logger.debug(log.format(self.title, confidence, url, ctime))
else:
is_violation = False
log = u"No violation for [[{0}]] (confidence: {1}; URL: {2}; {3} seconds)"
self._logger.debug(log.format(self.title, confidence, url, ctime))

return CopyvioCheckResult(is_violation, confidence, url, 0, ctime,
article_chain, chains)
until = (start_time + max_time) if max_time > 0 else None
article = MarkovChain(ArticleTextParser(self.get()).strip())
workspace = _CopyvioWorkspace(article, min_confidence,
until, self._addheaders)
workspace.enqueue([url])
workspace.spawn(1)
workspace.wait()
url, conf, chains = workspace.best
result = CopyvioCheckResult(conf >= min_confidence, conf, url, 0,
time() - start_time, article, chains)
self._logger.debug(result.get_log_message(self.title))
return result

+ 6
- 1
earwigbot/wiki/copyvios/markov.py View File

@@ -23,7 +23,8 @@
from collections import defaultdict
from re import sub, UNICODE

__all__ = ["MarkovChain", "MarkovChainIntersection"]
__all__ = ["EMPTY", "EMPTY_INTERSECTION", "MarkovChain",
"MarkovChainIntersection"]

class MarkovChain(object):
"""Implements a basic ngram Markov chain of words."""
@@ -85,3 +86,7 @@ class MarkovChainIntersection(MarkovChain):
"""Return a nice string representation of the intersection."""
res = "<MarkovChainIntersection of size {0} ({1} ^ {2})>"
return res.format(self.size(), self.mc1, self.mc2)


EMPTY = MarkovChain("")
EMPTY_INTERSECTION = MarkovChainIntersection(EMPTY, EMPTY)

+ 10
- 1
earwigbot/wiki/copyvios/parsers.py View File

@@ -124,9 +124,18 @@ class ArticleTextParser(BaseTextParser):
else:
chunk = sentences.pop(3 * len(sentences) / 4) # Pop from Q3
chunks.append(chunk)

return chunks

def get_links(self):
"""Return a list of all external links in the article.

The list is restricted to things that we suspect we can parse: i.e.,
those with schemes of ``http`` and ``https``.
"""
schemes = ("http://", "https://")
links = mwparserfromhell.parse(self.text).ifilter_external_links()
return [link.url for link in links if link.url.startswith(schemes)]


class HTMLTextParser(BaseTextParser):
"""A parser that can extract the text from an HTML document."""


+ 7
- 0
earwigbot/wiki/copyvios/result.py View File

@@ -61,3 +61,10 @@ class CopyvioCheckResult(object):
"""Return a nice string representation of the result."""
res = "<CopyvioCheckResult ({0} with {1} conf)>"
return res.format(self.violation, self.confidence)

def get_log_message(self, title):
"""Build a relevant log message for this copyvio check result."""
log = u"{0} for [[{1}]] (confidence: {2}; URL: {3}; {4} queries; {5} seconds)"
is_vio = "Violation detected" if self.violation else "No violation"
return log.format(is_vio, title, self.confidence, self.url,
self.queries, self.time)

Loading…
Cancel
Save