
Update copyvio checker to not make concurrent requests to a single domain.

tags/v0.2
Ben Kurtovic committed 10 years ago
commit 7137dda920
2 changed files with 71 additions and 62 deletions:
  1. earwigbot/wiki/copyvios/__init__.py (+70, -62)
  2. setup.py (+1, -0)
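In outline, the change replaces the single shared URL queue and fixed worker pool with one queue and one worker thread per domain, so no domain ever sees two requests at once, while a new global semaphore (max_concurrent_requests, default 6) still caps how many requests run in total. A minimal sketch of that pattern follows; it is not the committed code, and the names fetch, worker_loop, and enqueue, along with the plain urlparse domain key, are stand-ins for illustration:

    # Minimal sketch of the per-domain worker pattern (illustrative only).
    from Queue import Queue
    from threading import Semaphore, Thread
    from urllib2 import urlopen
    from urlparse import urlparse

    request_semaphore = Semaphore(6)  # mirrors max_concurrent_requests=6
    workers = {}  # domain -> Queue; one thread drains each queue serially

    def fetch(url):
        with request_semaphore:  # never more than six requests in flight at once
            return urlopen(url, timeout=5).read()

    def worker_loop(queue):
        while True:
            url = queue.get()
            if url is None:  # exit signal
                return
            fetch(url)

    def enqueue(url):
        # Key by domain so each domain is handled by exactly one thread and
        # therefore never receives concurrent requests (the commit prefers
        # tldextract's registered_domain over this naive netloc key).
        key = urlparse(url).netloc
        if key not in workers:
            workers[key] = Queue()
            thread = Thread(target=worker_loop, args=(workers[key],))
            thread.daemon = True
            thread.start()
        workers[key].put(url)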

earwigbot/wiki/copyvios/__init__.py (+70, -62)

@@ -25,7 +25,7 @@ from gzip import GzipFile
 from Queue import Empty, Queue
 from socket import timeout
 from StringIO import StringIO
-from threading import Lock, Thread
+from threading import Lock, Semaphore, Thread
 from time import sleep, time
 from urllib2 import build_opener, URLError

@@ -37,6 +37,7 @@ from earwigbot.wiki.copyvios.result import CopyvioCheckResult
 from earwigbot.wiki.copyvios.search import YahooBOSSSearchEngine

 oauth = importer.new("oauth2")
+tldextract = importer.new("tldextract")

 __all__ = ["CopyvioMixIn"]

@@ -46,47 +47,34 @@ class _CopyvioWorkspace(object):
     """Manages a single copyvio check distributed across threads."""

     def __init__(self, article, min_confidence, until, logger, headers,
-                 url_timeout=5):
+                 url_timeout=5, max_concurrent_requests=6):
         self.best = _WorkingResult(None, 0.0, (EMPTY, EMPTY_INTERSECTION))
-        self.until = until
+        self.request_semaphore = Semaphore(max_concurrent_requests)

         self._article = article
-        self._headers = headers
+        self._handled_urls = []
         self._logger = logger.getChild("copyvios")
         self._min_confidence = min_confidence
-        self._queue = Queue()
-        self._handled_urls = []
+        self._is_finished = False
+        self._enqueue_lock = Lock()
         self._result_lock = Lock()
-        self._url_timeout = url_timeout
-        self._workers = []
+
+        self._workers = {}
+        self._worker_args = (self, until, headers, url_timeout)

     def _calculate_confidence(self, delta):
         """Return the confidence of a violation as a float between 0 and 1."""
         return float(delta.size()) / self._article.size()

     def _finish_early(self):
-        """Finish handling links prematurely, e.g. if we've met min confidence.
-
-        This works by inserting an additional ``None`` into the queue (the
-        "exit" signal for workers) and then dequeueing until we reach the first
-        ``None``. This way, every worker will dequeue an exit signal signal on
-        their next turn.
-        """
-        self._queue.put(None)
-        try:
-            while self._queue.get(block=False):
-                pass
-        except Empty:  # Might happen if we haven't called wait() yet, but NBD.
-            pass
-
-    def spawn(self, workers):
-        """Spawn *workers* number of worker threads."""
-        for i in xrange(workers):
-            worker = _CopyvioWorker(self, self._headers, self._url_timeout)
-            thread = Thread(target=worker.run)
-            thread.daemon = True
-            thread.start()
-            self._workers.append(thread)
+        """Finish handling links prematurely (if we've hit min_confidence)."""
+        self._logger.debug("Confidence threshold met; clearing worker queues")
+        with self._enqueue_lock:
+            for worker in self._workers.itervalues():
+                with worker.queue.mutex:
+                    worker.queue.clear()
+                worker.queue.put(None)
+            self._is_finished = True

     def enqueue(self, urls, exclude_check=None):
         """Put a list of URLs into the worker queue.
@@ -95,29 +83,48 @@ class _CopyvioWorkspace(object):
         returns ``True`` if we should skip it and ``False`` otherwise.
         """
         for url in urls:
-            if url in self._handled_urls:
-                continue
-            self._handled_urls.append(url)
-            if exclude_check and exclude_check(url):
-                continue
-            self._queue.put(url)
-
-    def dequeue(self, max_time=None):
-        """Get an element from the worker queue, with an optional timeout."""
-        return self._queue.get(timeout=max_time)
+            with self._enqueue_lock:
+                if self._is_finished:
+                    break
+                if url in self._handled_urls:
+                    continue
+                self._handled_urls.append(url)
+                if exclude_check and exclude_check(url):
+                    continue
+
+                try:
+                    key = tldextract.extract(url).registered_domain
+                except ImportError:  # Fall back on very naive method
+                    from urlparse import urlparse
+                    key = u".".join(urlparse(url).netloc.split(".")[-2:])
+
+                logmsg = "enqueue(): {0} {1} -> {2}"
+                if key in self._workers:
+                    self._logger.debug(logmsg.format("PUT", key, url))
+                    self._workers[key].queue.put(url)
+                else:
+                    self._logger.debug(logmsg.format("NEW", key, url))
+                    worker = _CopyvioWorker(*self._worker_args)
+                    worker.queue.put(url)
+                    thread = Thread(target=worker.run)
+                    thread.name = "cvworker-" + key.encode("utf8")
+                    thread.daemon = True
+                    thread.start()
+                    self._workers[key] = worker

     def wait(self):
         """Wait for the workers to finish handling the queue."""
-        for i in xrange(len(self._workers)):
-            self._queue.put(None)  # Exit signal to workers
-        for worker in self._workers:
+        self._logger.debug("Waiting on {0} workers".format(len(self._workers)))
+        for worker in self._workers.itervalues():
+            worker.queue.put(None)  # Exit signal to workers
+        for worker in self._workers.itervalues():
             worker.join()

     def compare(self, url, source):
         """Compare a source to the article, and update the working result."""
         delta = MarkovChainIntersection(self._article, source)
         confidence = self._calculate_confidence(delta)
-        self._logger.debug("{0} -> {1}".format(url, confidence))
+        self._logger.debug("compare(): {0} -> {1}".format(url, confidence))
         with self._result_lock:
             if confidence > self.best.confidence:
                 self.best = _WorkingResult(url, confidence, (source, delta))
@@ -128,8 +135,11 @@ class _CopyvioWorkspace(object):
 class _CopyvioWorker(object):
     """A multithreaded URL opener/parser instance."""

-    def __init__(self, workspace, headers, url_timeout):
+    def __init__(self, workspace, until, headers, url_timeout):
+        self.queue = Queue()
+
         self._workspace = workspace
+        self._until = until
         self._opener = build_opener()
         self._opener.addheaders = headers
         self._url_timeout = url_timeout
@@ -146,11 +156,12 @@ class _CopyvioWorker(object):
         If a URLError was raised while opening the URL or an IOError was raised
         while decompressing, None will be returned.
         """
-        try:
-            response = self._opener.open(url, timeout=self._url_timeout)
-            result = response.read()
-        except (URLError, timeout):
-            return None
+        with self._workspace.request_semaphore:
+            try:
+                response = self._opener.open(url, timeout=self._url_timeout)
+                result = response.read()
+            except (URLError, timeout):
+                return None

         if response.headers.get("Content-Encoding") == "gzip":
             stream = StringIO(result)
@@ -177,17 +188,17 @@ class _CopyvioWorker(object):
         now empty.
         """
         while True:
-            if self._workspace.until:
-                max_time = self._workspace.until - time()
+            if self._until:
+                max_time = self._until - time()
                 if max_time <= 0:
                     return
                 try:
-                    url = self._workspace.dequeue(max_time)
+                    url = self.queue.get(timeout=max_time)
                 except Empty:
                     return
             else:
-                url = self._workspace.dequeue()
-            if url is None:  # Exit signal from workspace.wait()
+                url = self.queue.get()
+            if url is None:  # Exit signal
                 return
             text = self._open_url(url.encode("utf8"))
             if text:
@@ -237,8 +248,7 @@ class CopyvioMixIn(object):

         raise exceptions.UnknownSearchEngineError(engine)

-    def copyvio_check(self, min_confidence=0.5, max_queries=15, max_time=-1,
-                      worker_threads=4):
+    def copyvio_check(self, min_confidence=0.5, max_queries=15, max_time=-1):
         """Check the page for copyright violations.

         Returns a :class:`.CopyvioCheckResult` object with information on the
@@ -256,13 +266,12 @@ class CopyvioMixIn(object):
         if checks are called through a web server with timeouts. We will stop
         checking new URLs as soon as this limit is reached.

-        *worker_threads* is the number of threads we will spawn to handle URL
-        fetching and parsing simultaneously. Between 1 and 8 is recommended.
-
         Raises :exc:`.CopyvioCheckError` or subclasses
         (:exc:`.UnknownSearchEngineError`, :exc:`.SearchQueryError`, ...) on
         errors.
         """
+        log = "Starting copyvio check for [[{0}]]"
+        self._logger.info(log.format(self.title))
         start_time = time()
         until = (start_time + max_time) if max_time > 0 else None
         searcher = self._get_search_engine()
@@ -282,9 +291,7 @@ class CopyvioMixIn(object):
             self._logger.info(result.get_log_message(self.title))
             return result

-        workspace.spawn(worker_threads)
         workspace.enqueue(parser.get_links(), exclude)
-
         chunks = parser.chunk(self._search_config["nltk_dir"], max_queries)
         num_queries = 0
         for chunk in chunks:
@@ -325,13 +332,14 @@ class CopyvioMixIn(object):
         Since no searching is done, neither :exc:`.UnknownSearchEngineError`
         nor :exc:`.SearchQueryError` will be raised.
         """
+        log = "Starting copyvio compare for [[{0}]] against {1}"
+        self._logger.info(log.format(self.title, url))
         start_time = time()
         until = (start_time + max_time) if max_time > 0 else None
         article = MarkovChain(ArticleTextParser(self.get()).strip())
         workspace = _CopyvioWorkspace(article, min_confidence, until,
                                       self._logger, self._addheaders, max_time)
         workspace.enqueue([url])
-        workspace.spawn(1)
         workspace.wait()
         url, conf, chains = workspace.best
         result = CopyvioCheckResult(conf >= min_confidence, conf, url, 0,


setup.py (+1, -0)

@@ -43,6 +43,7 @@ extra_deps = {
         "lxml >= 2.3.5",  # Faster parser for BeautifulSoup
         "nltk >= 2.0.2",  # Parsing sentences to split article content
         "oauth2 >= 1.5.211",  # Interfacing with Yahoo! BOSS Search
+        "tldextract >= 1.4",  # Getting domains for the multithreaded workers
     ],
     "time": [
         "pytz >= 2012d",  # Handling timezones for the !time IRC command

