A Python robot that edits Wikipedia and interacts with people over IRC https://en.wikipedia.org/wiki/User:EarwigBot

# Copyright (C) 2009-2019 Ben Kurtovic <ben.kurtovic@gmail.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import base64
import collections
import functools
import time
import urllib.parse
from collections import deque
from gzip import GzipFile
from http.client import HTTPException
from io import BytesIO
from logging import getLogger
from math import log
from queue import Empty, Queue
from struct import error as struct_error
from threading import Lock, Thread
from urllib.error import URLError
from urllib.request import Request, build_opener

from earwigbot import importer
from earwigbot.exceptions import ParserExclusionError, ParserRedirectError
from earwigbot.wiki.copyvios.markov import MarkovChain, MarkovChainIntersection
from earwigbot.wiki.copyvios.parsers import get_parser
from earwigbot.wiki.copyvios.result import CopyvioCheckResult, CopyvioSource

tldextract = importer.new("tldextract")

__all__ = ["globalize", "localize", "CopyvioWorkspace"]

_MAX_REDIRECTS = 3
_MAX_RAW_SIZE = 20 * 1024**2

_is_globalized = False
_global_queues = None
_global_workers = []

_OpenedURL = collections.namedtuple("_OpenedURL", ["content", "parser_class"])

def globalize(num_workers=8):
    """Cause all copyvio checks to be done by one global set of workers.

    This is useful when checks are being done through a web interface where
    large numbers of simultaneous requests could be problematic. The global
    workers are spawned when the function is called, run continuously, and
    intelligently handle multiple checks.

    This function is not thread-safe and should only be called when no checks
    are being done. It has no effect if it has already been called.
    """
    global _is_globalized, _global_queues
    if _is_globalized:
        return

    _global_queues = _CopyvioQueues()
    for i in range(num_workers):
        worker = _CopyvioWorker(f"global-{i}", _global_queues)
        worker.start()
        _global_workers.append(worker)
    _is_globalized = True

def localize():
    """Return to using page-specific workers for copyvio checks.

    This disables changes made by :func:`globalize`, including stopping the
    global worker threads.

    This function is not thread-safe and should only be called when no checks
    are being done.
    """
    global _is_globalized, _global_queues, _global_workers
    if not _is_globalized:
        return

    for i in range(len(_global_workers)):
        _global_queues.unassigned.put((StopIteration, None))
    _global_queues = None
    _global_workers = []
    _is_globalized = False
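
# Typical (assumed) lifecycle for a long-running web frontend, per the
# docstrings above: call globalize() once at startup so every CopyvioWorkspace
# shares one worker pool, and call localize() during shutdown to stop the
# global threads again.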

class _CopyvioQueues:
    """Stores data necessary to maintain the various queues during a check."""

    def __init__(self):
        self.lock = Lock()
        self.sites = {}
        self.unassigned = Queue()


class _CopyvioWorker:
    """A multithreaded URL opener/parser instance."""

    def __init__(self, name, queues, until=None):
        self._name = name
        self._queues = queues
        self._until = until

        self._site = None
        self._queue = None
        self._search_config = None
        self._opener = build_opener()
        self._logger = getLogger("earwigbot.wiki.cvworker." + name)

    def _try_map_proxy_url(self, url, parsed, extra_headers, is_error=False):
        if not self._search_config or "proxies" not in self._search_config:
            return url, False
        for proxy_info in self._search_config["proxies"]:
            if parsed.netloc != proxy_info["netloc"]:
                continue
            if "onerr" in proxy_info:
                if proxy_info["onerr"] and not is_error:
                    continue
                if not proxy_info["onerr"] and is_error:
                    continue
            path = parsed.path
            if "path" in proxy_info:
                if not parsed.path.startswith(proxy_info["path"]):
                    continue
                path = path[len(proxy_info["path"]) :]
            url = proxy_info["target"] + path
            if "auth" in proxy_info:
                # b64encode() takes and returns bytes, so encode the configured
                # credentials and decode the result for the header value.
                extra_headers["Authorization"] = "Basic %s" % (
                    base64.b64encode(proxy_info["auth"].encode("utf8")).decode("utf8")
                )
            return url, True
        return url, False
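
    # Illustrative (hypothetical) shape of a search_config["proxies"] entry
    # consumed above; every key other than "netloc" and "target" is optional:
    #
    #     "proxies": [{
    #         "netloc": "www.example.com",      # host to remap
    #         "path": "/articles",              # only remap URLs under this path
    #         "target": "https://proxy.example.org/articles",
    #         "onerr": True,                    # only remap after a failed fetch
    #         "auth": "user:password",          # sent as HTTP Basic credentials
    #     }]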

    def _open_url_raw(self, url, timeout=5, allow_content_types=None):
        """Open a URL, without parsing it.

        None will be returned for URLs that cannot be read for whatever reason.
        """
        if isinstance(url, bytes):
            # urllib requires a str URL, so decode rather than encode here.
            url = url.decode("utf8")
        parsed = urllib.parse.urlparse(url)
        extra_headers = {}
        url, _ = self._try_map_proxy_url(url, parsed, extra_headers)
        request = Request(url, headers=extra_headers)
        try:
            response = self._opener.open(request, timeout=timeout)
        except (OSError, URLError, HTTPException, ValueError):
            url, remapped = self._try_map_proxy_url(
                url, parsed, extra_headers, is_error=True
            )
            if not remapped:
                self._logger.exception("Failed to fetch URL: %s", url)
                return None
            self._logger.info("Failed to fetch URL, trying proxy remap: %s", url)
            request = Request(url, headers=extra_headers)
            try:
                response = self._opener.open(request, timeout=timeout)
            except (OSError, URLError, HTTPException, ValueError):
                self._logger.exception("Failed to fetch URL after proxy remap: %s", url)
                return None

        try:
            size = int(response.headers.get("Content-Length", 0))
        except ValueError:
            return None

        content_type = response.headers.get("Content-Type", "text/plain")
        content_type = content_type.split(";", 1)[0]
        parser_class = get_parser(content_type)
        if not parser_class and (
            not allow_content_types or content_type not in allow_content_types
        ):
            return None
        if not parser_class:
            parser_class = get_parser("text/plain")
        if size > (15 if parser_class.TYPE == "PDF" else 2) * 1024**2:
            return None

        try:
            # Additional safety check for pages using Transfer-Encoding: chunked
            # where we can't read the Content-Length
            content = response.read(_MAX_RAW_SIZE + 1)
        except (OSError, URLError):
            return None

        if len(content) > _MAX_RAW_SIZE:
            return None

        if response.headers.get("Content-Encoding") == "gzip":
            # The response body is raw bytes, so decompress through BytesIO.
            stream = BytesIO(content)
            gzipper = GzipFile(fileobj=stream)
            try:
                content = gzipper.read()
            except (OSError, struct_error):
                return None

            if len(content) > _MAX_RAW_SIZE:
                return None

        return _OpenedURL(content, parser_class)

    def _open_url(self, source, redirects=0):
        """Open a URL and return its parsed content, or None.

        First, we will decompress the content if the headers contain "gzip" as
        its content encoding. Then, we will return the content stripped using
        an HTML parser if the headers indicate it is HTML, or return the
        content directly if it is plain text. If we don't understand the
        content type, we'll return None.

        If a URLError was raised while opening the URL or an IOError was raised
        while decompressing, None will be returned.
        """
        self._search_config = source.search_config
        if source.headers:
            self._opener.addheaders = source.headers

        result = self._open_url_raw(source.url, timeout=source.timeout)
        if result is None:
            return None

        args = source.parser_args.copy() if source.parser_args else {}
        args["open_url"] = functools.partial(self._open_url_raw, timeout=source.timeout)
        parser = result.parser_class(result.content, url=source.url, args=args)
        try:
            return parser.parse()
        except ParserRedirectError as exc:
            if redirects >= _MAX_REDIRECTS:
                return None
            source.url = exc.url.decode("utf8")
            return self._open_url(source, redirects=redirects + 1)

    def _acquire_new_site(self):
        """Block for a new unassigned site queue."""
        if self._until:
            timeout = self._until - time.time()
            if timeout <= 0:
                raise Empty
        else:
            timeout = None

        self._logger.debug("Waiting for new site queue")
        site, queue = self._queues.unassigned.get(timeout=timeout)
        if site is StopIteration:
            raise StopIteration
        self._logger.debug(f"Acquired new site queue: {site}")
        self._site = site
        self._queue = queue

    def _dequeue(self):
        """Remove a source from one of the queues."""
        if not self._site:
            self._acquire_new_site()

        logmsg = "Fetching source URL from queue {0}"
        self._logger.debug(logmsg.format(self._site))
        self._queues.lock.acquire()
        try:
            source = self._queue.popleft()
        except IndexError:
            self._logger.debug("Queue is empty")
            del self._queues.sites[self._site]
            self._site = None
            self._queue = None
            self._queues.lock.release()
            return self._dequeue()

        self._logger.debug(f"Got source URL: {source.url}")
        if source.skipped:
            self._logger.debug("Source has been skipped")
            self._queues.lock.release()
            return self._dequeue()

        source.start_work()
        self._queues.lock.release()
        return source

    def _handle_once(self):
        """Handle a single source from one of the queues."""
        try:
            source = self._dequeue()
        except Empty:
            self._logger.debug("Exiting: queue timed out")
            return False
        except StopIteration:
            self._logger.debug("Exiting: got stop signal")
            return False

        try:
            text = self._open_url(source)
        except ParserExclusionError:
            self._logger.debug("Source excluded by content parser")
            source.skipped = source.excluded = True
            source.finish_work()
        except Exception:
            self._logger.exception("Uncaught exception in worker")
            source.skip()
            source.finish_work()
        else:
            chain = MarkovChain(text) if text else None
            source.workspace.compare(source, chain)
        return True

    def _run(self):
        """Main entry point for the worker thread.

        We will keep fetching URLs from the queues and handling them until
        either we run out of time, or we get an exit signal that the queue is
        now empty.
        """
        while True:
            try:
                if not self._handle_once():
                    break
            except Exception:
                self._logger.exception("Uncaught exception in worker")
                time.sleep(5)  # Delay if we get stuck in a busy loop

    def start(self):
        """Start the copyvio worker in a new thread."""
        thread = Thread(target=self._run, name="cvworker-" + self._name)
        thread.daemon = True
        thread.start()


class CopyvioWorkspace:
    """Manages a single copyvio check distributed across threads."""

    def __init__(
        self,
        article,
        min_confidence,
        max_time,
        logger,
        headers,
        url_timeout=5,
        num_workers=8,
        short_circuit=True,
        parser_args=None,
        exclude_check=None,
        config=None,
    ):
        self.sources = []
        self.finished = False
        self.possible_miss = False

        self._article = article
        self._logger = logger.getChild("copyvios")
        self._min_confidence = min_confidence
        self._start_time = time.time()
        self._until = (self._start_time + max_time) if max_time > 0 else None
        self._handled_urls = set()
        self._finish_lock = Lock()
        self._short_circuit = short_circuit
        self._source_args = {
            "workspace": self,
            "headers": headers,
            "timeout": url_timeout,
            "parser_args": parser_args,
            "search_config": config,
        }
        self._exclude_check = exclude_check

        if _is_globalized:
            self._queues = _global_queues
        else:
            self._queues = _CopyvioQueues()
            self._num_workers = num_workers
            for i in range(num_workers):
                name = f"local-{id(self) % 10000:04}.{i}"
                _CopyvioWorker(name, self._queues, self._until).start()

    def _calculate_confidence(self, delta):
        """Return the confidence of a violation as a float between 0 and 1."""

        def conf_with_article_and_delta(article, delta):
            """Calculate confidence using the article and delta chain sizes."""
            # This piecewise function exhibits exponential growth until it
            # reaches the default "suspect" confidence threshold, at which
            # point it transitions to polynomial growth with a limit of 1 as
            # (delta / article) approaches 1.
            # A graph can be viewed here: https://goo.gl/mKPhvr
            ratio = delta / article
            if ratio <= 0.52763:
                return -log(1 - ratio)
            else:
                return (-0.8939 * (ratio**2)) + (1.8948 * ratio) - 0.0009

        def conf_with_delta(delta):
            """Calculate confidence using just the delta chain size."""
            # This piecewise function was derived from experimental data using
            # reference points at (0, 0), (100, 0.5), (250, 0.75), (500, 0.9),
            # and (1000, 0.95), with a limit of 1 as delta approaches infinity.
            # A graph can be viewed here: https://goo.gl/lVl7or
            if delta <= 100:
                return delta / (delta + 100)
            elif delta <= 250:
                return (delta - 25) / (delta + 50)
            elif delta <= 500:
                return (10.5 * delta - 750) / (10 * delta)
            else:
                return (delta - 50) / delta
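
        # Worked example (assuming chain sizes are word counts): for an article
        # chain of size 400 and a delta chain of size 100, the ratio is 0.25,
        # so conf_with_article_and_delta returns -log(1 - 0.25), about 0.29,
        # while conf_with_delta returns 100 / (100 + 100) = 0.5; the confidence
        # computed below is the larger of the two, 0.5.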
        d_size = float(delta.size)
        return abs(
            max(
                conf_with_article_and_delta(self._article.size, d_size),
                conf_with_delta(d_size),
            )
        )

    def _finish_early(self):
        """Finish handling links prematurely (if we've hit min_confidence)."""
        self._logger.debug("Confidence threshold met; skipping remaining sources")
        with self._queues.lock:
            for source in self.sources:
                source.skip()
            self.finished = True

    def enqueue(self, urls):
        """Put a list of URLs into the various worker queues."""
        for url in urls:
            with self._queues.lock:
                if url in self._handled_urls:
                    continue
                self._handled_urls.add(url)

                source = CopyvioSource(url=url, **self._source_args)
                self.sources.append(source)

                if self._exclude_check and self._exclude_check(url):
                    self._logger.debug(f"enqueue(): exclude {url}")
                    source.excluded = True
                    source.skip()
                    continue
                if self._short_circuit and self.finished:
                    self._logger.debug(f"enqueue(): auto-skip {url}")
                    source.skip()
                    continue
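
                # The queue key is the source's registered domain (e.g.
                # "https://en.m.wikipedia.org/wiki/Foo" keys to "wikipedia.org"),
                # so all URLs from one site share a queue and are fetched
                # sequentially by a single worker instead of in parallel.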
                try:
                    key = tldextract.extract(url).registered_domain
                except ImportError:  # Fall back on very naive method
                    from urllib.parse import urlparse

                    key = ".".join(urlparse(url).netloc.split(".")[-2:])

                logmsg = "enqueue(): {0} {1} -> {2}"
                if key in self._queues.sites:
                    self._logger.debug(logmsg.format("append", key, url))
                    self._queues.sites[key].append(source)
                else:
                    self._logger.debug(logmsg.format("new", key, url))
                    self._queues.sites[key] = queue = deque()
                    queue.append(source)
                    self._queues.unassigned.put((key, queue))

    def compare(self, source, source_chain):
        """Compare a source to the article; call _finish_early if necessary."""
        if source_chain:
            delta = MarkovChainIntersection(self._article, source_chain)
            conf = self._calculate_confidence(delta)
        else:
            conf = 0.0
        self._logger.debug(f"compare(): {source.url} -> {conf}")

        with self._finish_lock:
            if source_chain:
                source.update(conf, source_chain, delta)
            source.finish_work()
            if not self.finished and conf >= self._min_confidence:
                if self._short_circuit:
                    self._finish_early()
                else:
                    self.finished = True

    def wait(self):
        """Wait for the workers to finish handling the sources."""
        self._logger.debug(f"Waiting on {len(self.sources)} sources")
        for source in self.sources:
            source.join(self._until)
        with self._finish_lock:
            pass  # Wait for any remaining comparisons to be finished
        if not _is_globalized:
            for i in range(self._num_workers):
                self._queues.unassigned.put((StopIteration, None))

    def get_result(self, num_queries=0):
        """Return a CopyvioCheckResult containing the results of this check."""

        def cmpfunc(s1, s2):
            if s2.confidence != s1.confidence:
                return 1 if s2.confidence > s1.confidence else -1
            if s2.excluded != s1.excluded:
                return 1 if s1.excluded else -1
            return int(s1.skipped) - int(s2.skipped)

        # list.sort() no longer accepts a bare comparison function in Python 3,
        # so wrap cmpfunc with functools.cmp_to_key().
        self.sources.sort(key=functools.cmp_to_key(cmpfunc))
        return CopyvioCheckResult(
            self.finished,
            self.sources,
            num_queries,
            time.time() - self._start_time,
            self._article,
            self.possible_miss,
        )
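
The snippet below is a minimal usage sketch of this module, not part of the file itself: in the bot, checks are normally driven through the higher-level copyvio-check API on a page object. The article text, logger name, and candidate URLs here are placeholder assumptions for illustration.

import logging

from earwigbot.wiki.copyvios.markov import MarkovChain
from earwigbot.wiki.copyvios.workers import CopyvioWorkspace, globalize

globalize(num_workers=4)  # optional: share one worker pool across all checks

article = MarkovChain("full text of the article being checked ...")  # placeholder
workspace = CopyvioWorkspace(
    article,
    min_confidence=0.75,
    max_time=30,
    logger=logging.getLogger("example"),
    headers=[("User-Agent", "ExampleBot/1.0 (copyvio sketch)")],
)
workspace.enqueue(["https://example.com/suspected-source"])  # placeholder URLs
workspace.wait()
result = workspace.get_result()  # CopyvioCheckResult wrapping the sources below
for source in workspace.sources:  # sorted best match first by get_result()
    print(source.url, source.confidence, source.excluded, source.skipped)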