A Python robot that edits Wikipedia and interacts with people over IRC https://en.wikipedia.org/wiki/User:EarwigBot

# -*- coding: utf-8 -*-
#
# Copyright (C) 2009-2014 Ben Kurtovic <ben.kurtovic@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from gzip import GzipFile
from math import log
from Queue import Empty, Queue
from socket import error
from StringIO import StringIO
from threading import Event, Lock, Thread
from time import time
from urllib2 import build_opener, URLError

from earwigbot import importer
from earwigbot.wiki.copyvios.markov import (
    EMPTY, EMPTY_INTERSECTION, MarkovChain, MarkovChainIntersection)
from earwigbot.wiki.copyvios.parsers import HTMLTextParser

tldextract = importer.new("tldextract")

__all__ = ["globalize", "localize", "CopyvioWorkspace"]

_is_globalized = False
_global_queues = None
_global_workers = []


def globalize(num_workers=8):
    """Cause all copyvio checks to be done by one global set of workers.

    This is useful when checks are being done through a web interface where
    large numbers of simultaneous requests could be problematic. The global
    workers are spawned when the function is called, run continuously, and
    intelligently handle multiple checks.

    This function is not thread-safe and should only be called when no checks
    are being done. It has no effect if it has already been called.
    """
    global _is_globalized, _global_queues
    if _is_globalized:
        return

    _global_queues = _CopyvioQueues()
    for i in xrange(num_workers):
        worker = _CopyvioWorker(_global_queues)
        worker.start("global-{0}".format(i))
        _global_workers.append(worker)
    _is_globalized = True


def localize():
    """Return to using page-specific workers for copyvio checks.

    This disables changes made by :func:`globalize`, including stopping the
    global worker threads.

    This function is not thread-safe and should only be called when no checks
    are being done.
    """
    global _is_globalized, _global_queues, _global_workers
    if not _is_globalized:
        return

    for i in xrange(len(_global_workers)):
        _global_queues.unassigned.put((StopIteration, None))
    _global_queues = None
    _global_workers = []
    _is_globalized = False
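

# A minimal usage sketch (hypothetical, not part of this module): a web
# service would typically call globalize() once at startup so every check
# shares one worker pool, and localize() during shutdown. This assumes the
# module lives at earwigbot.wiki.copyvios.workers:
#
#     from earwigbot.wiki.copyvios.workers import globalize, localize
#
#     globalize(num_workers=16)  # spawn 16 shared, long-lived workers
#     try:
#         pass  # ... serve copyvio check requests ...
#     finally:
#         localize()  # each worker dequeues a StopIteration signal and exits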


class _CopyvioSource(object):
    """Represents a single suspected violation source (a URL)."""

    def __init__(self, workspace, url, key, headers=None, timeout=5):
        self.workspace = workspace  # Used by workers to report results
        self.url = url
        self.key = key
        self.headers = headers
        self.timeout = timeout
        self.confidence = 0.0
        self.chains = (EMPTY, EMPTY_INTERSECTION)

        self._event = Event()

    def active(self):
        """Return whether or not this source needs to be filled out."""
        return not self._event.is_set()

    def complete(self, confidence, source_chain, delta_chain):
        """Complete the confidence information inside this source."""
        self.confidence = confidence
        self.chains = (source_chain, delta_chain)
        self._event.set()

    def cancel(self):
        """Deactivate this source without filling in the relevant data."""
        self._event.set()

    def join(self, until):
        """Block until this violation result is filled out."""
        if until:
            timeout = until - time()
            if timeout <= 0:
                return
            self._event.wait(timeout)
        else:
            self._event.wait()  # No deadline, so block indefinitely


class _CopyvioQueues(object):
    """Stores data necessary to maintain the various queues during a check."""

    def __init__(self):
        self.lock = Lock()
        self.sites = {}  # Maps site keys (registered domains) to their queues
        self.unassigned = Queue()  # Site queues not yet claimed by a worker
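

# Illustrative shape of the queue state mid-check (derived from enqueue()
# below; the domains are made up):
#
#     queues.sites == {"wikipedia.org": <Queue of _CopyvioSource>,
#                      "example.com": <Queue of _CopyvioSource>}
#     queues.unassigned == <Queue of ("example.com", <that site's Queue>)>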


class _CopyvioWorker(object):
    """A multithreaded URL opener/parser instance."""

    def __init__(self, queues, until=None):
        self._queues = queues
        self._until = until

        self._thread = None
        self._site = None
        self._queue = None
        self._opener = build_opener()

    def _open_url(self, source):
        """Open a URL and return its parsed content, or None.

        First, we will decompress the content if the headers list "gzip" as
        the content encoding. Then, we will return the content stripped using
        an HTML parser if the headers indicate it is HTML, or return the
        content directly if it is plain text. If we don't understand the
        content type, we'll return None.

        If a URLError was raised while opening the URL, or an IOError was
        raised while decompressing, None will be returned.
        """
        self._opener.addheaders = source.headers
        url = source.url.encode("utf8")
        try:
            response = self._opener.open(url, timeout=source.timeout)
        except (URLError, error):
            return None

        try:
            size = int(response.headers.get("Content-Length", 0))
        except ValueError:
            return None
        if size > 1024 ** 2:  # Ignore URLs larger than a megabyte
            return None

        ctype_full = response.headers.get("Content-Type", "text/plain")
        ctype = ctype_full.split(";", 1)[0]
        if ctype in ["text/html", "application/xhtml+xml"]:
            handler = lambda res: HTMLTextParser(res).strip()
        elif ctype == "text/plain":
            handler = lambda res: res.strip()
        else:
            return None

        try:
            content = response.read()
        except (URLError, error):
            return None

        if response.headers.get("Content-Encoding") == "gzip":
            stream = StringIO(content)
            gzipper = GzipFile(fileobj=stream)
            try:
                content = gzipper.read(2 * 1024 ** 2)
            except IOError:
                return None

        return handler(content)
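
    # A standalone sketch of the decompression step above (illustrative only;
    # "page.html.gz" is a hypothetical file). GzipFile cannot wrap the raw
    # byte string from response.read() directly, so the string is first
    # wrapped in a StringIO to give it a file-like interface:
    #
    #     from gzip import GzipFile
    #     from StringIO import StringIO
    #
    #     data = open("page.html.gz", "rb").read()
    #     text = GzipFile(fileobj=StringIO(data)).read(2 * 1024 ** 2)
    #
    # Capping gzipper.read() at 2 MiB bounds the decompressed size even when
    # the compressed payload passes the 1 MiB Content-Length check.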

    def _dequeue(self):
        """Remove a source from one of the queues."""
        if self._until:
            timeout = self._until - time()
            if timeout <= 0:
                raise Empty  # Out of time; treated like an empty queue in _run()
        else:
            timeout = None

        with self._queues.lock:
            if self._queue:
                source = self._queue.get(timeout=timeout)
                if self._queue.empty():
                    del self._queues.sites[self._site]
                    self._queue = None
            else:
                site, queue = self._queues.unassigned.get(timeout=timeout)
                if site is StopIteration:
                    return StopIteration
                source = queue.get_nowait()
                if queue.empty():
                    del self._queues.sites[site]
                else:
                    self._site = site
                    self._queue = queue

        # Recurse outside the lock (Lock is not reentrant) to look again:
        if not source.active():
            return self._dequeue()
        return source

    def _run(self):
        """Main entry point for the worker thread.

        We will keep fetching URLs from the queues and handling them until we
        either run out of time or receive an exit signal indicating that the
        queues are now empty.
        """
        while True:
            try:
                source = self._dequeue()
            except Empty:
                return
            if source is StopIteration:
                return
            text = self._open_url(source)
            if text:
                source.workspace.compare(source, MarkovChain(text))

    def start(self, name):
        """Start the worker in a new thread, with a given name."""
        self._thread = thread = Thread(target=self._run)
        thread.name = "cvworker-" + name
        thread.daemon = True
        thread.start()


class CopyvioWorkspace(object):
    """Manages a single copyvio check distributed across threads."""

    def __init__(self, article, min_confidence, until, logger, headers,
                 url_timeout=5, num_workers=8):
        self.best = _CopyvioSource(self, None, None)
        self.sources = []

        self._article = article
        self._logger = logger.getChild("copyvios")
        self._min_confidence = min_confidence
        self._until = until
        self._handled_urls = []
        self._is_finished = False
        self._compare_lock = Lock()
        self._source_args = {"workspace": self, "headers": headers,
                             "timeout": url_timeout}

        if _is_globalized:
            self._queues = _global_queues
        else:
            self._queues = _CopyvioQueues()
            for i in xrange(num_workers):
                worker = _CopyvioWorker(self._queues, until)
                worker.start("local-{0:04}.{1}".format(id(self) % 10000, i))

    def _calculate_confidence(self, delta):
        """Return the confidence of a violation as a float between 0 and 1."""
        def conf_with_article_and_delta(article, delta):
            """Calculate confidence using the article and delta chain sizes."""
            # This piecewise function, C_AΔ(A, Δ), was defined such that
            # confidence exhibits exponential growth until it reaches the
            # default "suspect" confidence threshold, at which point it
            # transitions to polynomial growth with lim (A/Δ)→1 C_AΔ(A, Δ) = 1.
            # A graph can be viewed here:
            # http://benkurtovic.com/static/article-delta_confidence_function.pdf
            ratio = delta / article
            if ratio <= 0.52763:
                return log(1 / (1 - ratio))
            else:
                return (-0.8939 * (ratio ** 2)) + (1.8948 * ratio) - 0.0009

        def conf_with_delta(delta):
            """Calculate confidence using just the delta chain size."""
            # This piecewise function, C_Δ(Δ), was derived from experimental
            # data using reference points at (0, 0), (100, 0.5), (250, 0.75),
            # (500, 0.9), and (1000, 0.95) with lim Δ→+∞ C_Δ(Δ) = 1.
            # A graph can be viewed here:
            # http://benkurtovic.com/static/delta_confidence_function.pdf
            if delta <= 100:
                return delta / (delta + 100)
            elif delta <= 250:
                return (delta - 25) / (delta + 50)
            elif delta <= 500:
                return (10.5 * delta - 750) / (10 * delta)
            else:
                return (delta - 50) / delta

        d_size = float(delta.size)
        return max(conf_with_article_and_delta(self._article.size, d_size),
                   conf_with_delta(d_size))
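
    # Worked example (derived from the formulas above): conf_with_delta(100)
    # = 100 / (100 + 100) = 0.5 and conf_with_delta(500) = (10.5*500 - 750) /
    # (10*500) = 4500 / 5000 = 0.9, matching the (100, 0.5) and (500, 0.9)
    # reference points. For an article chain of size 1000 and a delta of 500,
    # conf_with_article_and_delta takes the exponential branch (ratio = 0.5
    # <= 0.52763) and returns log(1 / 0.5) ≈ 0.693, so the overall confidence
    # is max(0.693, 0.9) = 0.9.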

    def _finish_early(self):
        """Finish handling links prematurely (if we've hit min_confidence)."""
        if self._is_finished:
            return
        self._logger.debug("Confidence threshold met; cancelling remaining sources")
        with self._queues.lock:
            for source in self.sources:
                source.cancel()
            self._is_finished = True

    def enqueue(self, urls, exclude_check=None):
        """Put a list of URLs into the various worker queues.

        *exclude_check* is an optional exclusion function that takes a URL and
        returns ``True`` if we should skip it and ``False`` otherwise.
        """
        for url in urls:
            if self._is_finished:
                break
            if url in self._handled_urls:
                continue
            self._handled_urls.append(url)
            if exclude_check and exclude_check(url):
                continue

            try:
                key = tldextract.extract(url).registered_domain
            except ImportError:  # Fall back on very naive method
                from urlparse import urlparse
                key = u".".join(urlparse(url).netloc.split(".")[-2:])

            source = _CopyvioSource(url=url, key=key, **self._source_args)
            self.sources.append(source)  # Track it so wait() can join it later

            logmsg = u"enqueue(): {0} {1} -> {2}"
            with self._queues.lock:
                if key in self._queues.sites:
                    self._logger.debug(logmsg.format("append", key, url))
                    self._queues.sites[key].put(source)
                else:
                    self._logger.debug(logmsg.format("new", key, url))
                    self._queues.sites[key] = queue = Queue()
                    queue.put(source)
                    self._queues.unassigned.put((key, queue))
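
    # Example of the keying above (illustrative): tldextract maps
    # "http://en.wikipedia.org/wiki/Foo" to the registered domain
    # "wikipedia.org", so all URLs from one site share a queue and are
    # fetched serially by a single worker. The urlparse fallback is "very
    # naive" because it keeps only the last two dotted labels: it also
    # yields "wikipedia.org" here, but maps "http://example.co.uk/" to
    # "co.uk" instead of "example.co.uk".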

    def wait(self):
        """Wait for the workers to finish handling the sources."""
        self._logger.debug("Waiting on {0} sources".format(len(self.sources)))
        for source in self.sources:
            source.join(self._until)

    def compare(self, source, source_chain):
        """Compare a source to the article, and update the best known one."""
        delta = MarkovChainIntersection(self._article, source_chain)
        conf = self._calculate_confidence(delta)
        source.complete(conf, source_chain, delta)
        self._logger.debug(u"compare(): {0} -> {1}".format(source.url, conf))

        with self._compare_lock:
            if conf > self.best.confidence:
                self.best = source
            if conf >= self._min_confidence:
                self._finish_early()
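

# A minimal end-to-end sketch (hypothetical: the article text, logger,
# User-Agent, and URL are made up, and in EarwigBot this plumbing is normally
# driven by higher-level copyvio-check code rather than called directly):
#
#     import logging
#     from time import time
#     from earwigbot.wiki.copyvios.markov import MarkovChain
#
#     article = MarkovChain(u"some article text to check against the web")
#     space = CopyvioWorkspace(
#         article, min_confidence=0.75, until=time() + 30,
#         logger=logging.getLogger("example"),
#         headers=[("User-Agent", "ExampleBot/1.0")])
#     space.enqueue([u"http://example.com/some-page"])
#     space.wait()
#     print space.best.url, space.best.confidence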