
Python 3 conversion + typing for all of copyvios

tags/v0.4
Ben Kurtovic, 4 months ago
parent commit 2936ffbf5f
9 changed files with 765 additions and 555 deletions
  1. pyproject.toml (+0 -4)
  2. src/earwigbot/wiki/copyvios/__init__.py (+100 -166)
  3. src/earwigbot/wiki/copyvios/exclusions.py (+51 -35)
  4. src/earwigbot/wiki/copyvios/markov.py (+45 -31)
  5. src/earwigbot/wiki/copyvios/parsers.py (+143 -86)
  6. src/earwigbot/wiki/copyvios/result.py (+71 -57)
  7. src/earwigbot/wiki/copyvios/search.py (+93 -48)
  8. src/earwigbot/wiki/copyvios/workers.py (+155 -125)
  9. src/earwigbot/wiki/page.py (+107 -3)

pyproject.toml (+0 -4)

@@ -59,10 +59,6 @@ requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[tool.pyright]
exclude = [
# TODO
"src/earwigbot/wiki/copyvios"
]
pythonVersion = "3.11"
venvPath = "."
venv = "venv"


src/earwigbot/wiki/copyvios/__init__.py (+100 -166)

@@ -18,208 +18,142 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

__all__ = [
"DEFAULT_DEGREE",
"CopyvioChecker",
"CopyvioCheckResult",
"globalize",
"localize",
]

import functools
import logging
import time
from urllib.request import build_opener
from collections.abc import Callable

from earwigbot import exceptions
from earwigbot.wiki.copyvios.markov import MarkovChain
from earwigbot.wiki.copyvios.parsers import ArticleTextParser
from earwigbot.wiki.copyvios.search import SEARCH_ENGINES
from earwigbot.wiki.copyvios.exclusions import ExclusionsDB
from earwigbot.wiki.copyvios.markov import DEFAULT_DEGREE, MarkovChain
from earwigbot.wiki.copyvios.parsers import ArticleParser, ParserArgs
from earwigbot.wiki.copyvios.result import CopyvioCheckResult
from earwigbot.wiki.copyvios.search import SearchEngine, get_search_engine
from earwigbot.wiki.copyvios.workers import CopyvioWorkspace, globalize, localize
from earwigbot.wiki.page import Page

__all__ = ["CopyvioMixIn", "globalize", "localize"]


class CopyvioMixIn:
class CopyvioChecker:
"""
**EarwigBot: Wiki Toolset: Copyright Violation MixIn**
Manages the lifecycle of a copyvio check or comparison.

This is a mixin that provides two public methods, :py:meth:`copyvio_check`
and :py:meth:`copyvio_compare`. The former checks the page for copyright
violations using a search engine API, and the latter compares the page
against a given URL. Credentials for the search engine API are stored in
the :py:class:`~earwigbot.wiki.site.Site`'s config.
Created by :py:class:`~earwigbot.wiki.page.Page` and handles the implementation
details of running a check.
"""

def __init__(self, site):
self._search_config = site._search_config
self._exclusions_db = self._search_config.get("exclusions_db")
self._addheaders = [
("User-Agent", site.user_agent),
def __init__(
self,
page: Page,
*,
min_confidence: float = 0.75,
max_time: float = 30,
degree: int = DEFAULT_DEGREE,
logger: logging.Logger | None = None,
) -> None:
self._page = page
self._site = page.site
self._config = page.site._search_config
self._min_confidence = min_confidence
self._max_time = max_time
self._degree = degree
self._logger = logger or logging.getLogger("earwigbot.wiki")

self._headers = [
("User-Agent", page.site.user_agent),
("Accept-Encoding", "gzip"),
]

def _get_search_engine(self):
"""Return a function that can be called to do web searches.

The function takes one argument, a search query, and returns a list of
URLs, ranked by importance. The underlying logic depends on the
*engine* argument within our config; for example, if *engine* is
"Yahoo! BOSS", we'll use YahooBOSSSearchEngine for querying.

Raises UnknownSearchEngineError if the 'engine' listed in our config is
unknown to us, and UnsupportedSearchEngineError if we are missing a
required package or module, like oauth2 for "Yahoo! BOSS".
"""
engine = self._search_config["engine"]
if engine not in SEARCH_ENGINES:
raise exceptions.UnknownSearchEngineError(engine)

klass = SEARCH_ENGINES[engine]
credentials = self._search_config["credentials"]
opener = build_opener()
opener.addheaders = self._addheaders

for dep in klass.requirements():
try:
__import__(dep).__name__
except (ModuleNotFoundError, AttributeError):
e = "Missing a required dependency ({}) for the {} engine"
e = e.format(dep, engine)
raise exceptions.UnsupportedSearchEngineError(e)

return klass(credentials, opener)

def copyvio_check(
self,
min_confidence=0.75,
max_queries=15,
max_time=-1,
no_searches=False,
no_links=False,
short_circuit=True,
degree=5,
):
"""Check the page for copyright violations.

Returns a :class:`.CopyvioCheckResult` object with information on the
results of the check.

*min_confidence* is the minimum amount of confidence we must have in
the similarity between a source text and the article in order for us to
consider it a suspected violation. This is a number between 0 and 1.

*max_queries* is self-explanatory; we will never make more than this
number of queries in a given check.

*max_time* can be set to prevent copyvio checks from taking longer than
a set amount of time (generally around a minute), which can be useful
if checks are called through a web server with timeouts. We will stop
checking new URLs as soon as this limit is reached.

Setting *no_searches* to ``True`` will cause only URLs in the wikitext
of the page to be checked; no search engine queries will be made.
Setting *no_links* to ``True`` will cause the opposite to happen: URLs
in the wikitext will be ignored; search engine queries will be made
only. Setting both of these to ``True`` is pointless.

Normally, the checker will short-circuit if it finds a URL that meets
*min_confidence*. This behavior normally causes it to skip any
remaining URLs and web queries, but setting *short_circuit* to
``False`` will prevent this.

Raises :exc:`.CopyvioCheckError` or subclasses
(:exc:`.UnknownSearchEngineError`, :exc:`.SearchQueryError`, ...) on
errors.
"""
log = "Starting copyvio check for [[{0}]]"
self._logger.info(log.format(self.title))
searcher = self._get_search_engine()
parser = ArticleTextParser(
self.get(),
args={"nltk_dir": self._search_config["nltk_dir"], "lang": self._site.lang},
self._parser = ArticleParser(
self._page.get(),
lang=self._site.lang,
nltk_dir=self._config["nltk_dir"],
)
article = MarkovChain(parser.strip(), degree=degree)
parser_args = {}
self._article = MarkovChain(self._parser.strip(), degree=self._degree)

if self._exclusions_db:
self._exclusions_db.sync(self.site.name)
@functools.cached_property
def _searcher(self) -> SearchEngine:
return get_search_engine(self._config, self._headers)

def exclude(u):
return self._exclusions_db.check(self.site.name, u)
@property
def _exclusions_db(self) -> ExclusionsDB | None:
return self._config.get("exclusions_db")

parser_args["mirror_hints"] = self._exclusions_db.get_mirror_hints(self)
else:
exclude = None
def _get_exclusion_callback(self) -> Callable[[str], bool] | None:
if not self._exclusions_db:
return None
return functools.partial(self._exclusions_db.check, self._site.name)

def run_check(
self,
*,
max_queries: int = 15,
no_searches: bool = False,
no_links: bool = False,
short_circuit: bool = True,
) -> CopyvioCheckResult:
parser_args: ParserArgs = {}
if self._exclusions_db:
self._exclusions_db.sync(self._site.name)
mirror_hints = self._exclusions_db.get_mirror_hints(self._page)
parser_args["mirror_hints"] = mirror_hints

workspace = CopyvioWorkspace(
article,
min_confidence,
max_time,
self._logger,
self._addheaders,
self._article,
min_confidence=self._min_confidence,
max_time=self._max_time,
logger=self._logger,
headers=self._headers,
short_circuit=short_circuit,
parser_args=parser_args,
exclude_check=exclude,
config=self._search_config,
degree=degree,
exclusion_callback=self._get_exclusion_callback(),
config=self._config,
degree=self._degree,
)

if article.size < 20: # Auto-fail very small articles
result = workspace.get_result()
self._logger.info(result.get_log_message(self.title))
return result
if self._article.size < 20: # Auto-fail very small articles
return workspace.get_result()

if not no_links:
workspace.enqueue(parser.get_links())
workspace.enqueue(self._parser.get_links())
num_queries = 0
if not no_searches:
chunks = parser.chunk(max_queries)
chunks = self._parser.chunk(max_queries)
for chunk in chunks:
if short_circuit and workspace.finished:
workspace.possible_miss = True
break
log = "[[{0}]] -> querying {1} for {2!r}"
self._logger.debug(log.format(self.title, searcher.name, chunk))
workspace.enqueue(searcher.search(chunk))
self._logger.debug(
f"[[{self._page.title}]] -> querying {self._searcher.name} "
f"for {chunk!r}"
)
workspace.enqueue(self._searcher.search(chunk))
num_queries += 1
time.sleep(1)
time.sleep(1) # TODO: Check whether this is needed

workspace.wait()
result = workspace.get_result(num_queries)
self._logger.info(result.get_log_message(self.title))
return result

def copyvio_compare(self, urls, min_confidence=0.75, max_time=30, degree=5):
"""Check the page like :py:meth:`copyvio_check` against specific URLs.

This is essentially a reduced version of :meth:`copyvio_check` - a
copyvio comparison is made using Markov chains and the result is
returned in a :class:`.CopyvioCheckResult` object - but without using a
search engine, since the suspected "violated" URL is supplied from the
start.

Its primary use is to generate a result when the URL is retrieved from
a cache, like the one used in EarwigBot's Tool Labs site. After a
search is done, the resulting URL is stored in a cache for 72 hours so
future checks against that page will not require another set of
time-and-money-consuming search engine queries. However, the comparison
itself (which includes the article's and the source's content) cannot
be stored for data retention reasons, so a fresh comparison is made
using this function.

Since no searching is done, neither :exc:`.UnknownSearchEngineError`
nor :exc:`.SearchQueryError` will be raised.
"""
if not isinstance(urls, list):
urls = [urls]
log = "Starting copyvio compare for [[{0}]] against {1}"
self._logger.info(log.format(self.title, ", ".join(urls)))
article = MarkovChain(ArticleTextParser(self.get()).strip(), degree=degree)
return workspace.get_result(num_queries)

def run_compare(self, urls: list[str]) -> CopyvioCheckResult:
workspace = CopyvioWorkspace(
article,
min_confidence,
max_time,
self._logger,
self._addheaders,
max_time,
self._article,
min_confidence=self._min_confidence,
max_time=self._max_time,
logger=self._logger,
headers=self._headers,
url_timeout=self._max_time,
num_workers=min(len(urls), 8),
short_circuit=False,
config=self._search_config,
degree=degree,
config=self._config,
degree=self._degree,
)

workspace.enqueue(urls)
workspace.wait()
result = workspace.get_result()
self._logger.info(result.get_log_message(self.title))
return result
return workspace.get_result()
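
For orientation, a minimal sketch of driving the new checker directly. Per its docstring it is normally created by Page (whose updated wrappers live in page.py, outside this excerpt), so the direct construction and the cache argument below are illustrative assumptions, not part of this commit.

# Hypothetical usage sketch of the new CopyvioChecker API; `page` is assumed to
# be an already-constructed earwigbot Page with its site's search config set up.
from earwigbot.wiki.copyvios import CopyvioChecker

def check_page(page, cached_urls=None):
    checker = CopyvioChecker(page, min_confidence=0.75, max_time=30)
    if cached_urls:
        # Compare directly against known URLs (e.g. from a result cache).
        result = checker.run_compare(list(cached_urls))
    else:
        # Full check: article links plus up to 15 search-engine queries.
        result = checker.run_check(max_queries=15, short_circuit=True)
    return result.violation, result.confidence, result.url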

src/earwigbot/wiki/copyvios/exclusions.py (+51 -35)

@@ -18,15 +18,24 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from __future__ import annotations

__all__ = ["ExclusionsDB"]

import logging
import re
import sqlite3
import threading
import time
import typing
import urllib.parse

from earwigbot import exceptions

__all__ = ["ExclusionsDB"]
if typing.TYPE_CHECKING:
from earwigbot.wiki.page import Page
from earwigbot.wiki.site import Site
from earwigbot.wiki.sitesdb import SitesDB

DEFAULT_SOURCES = {
"all": [ # Applies to all, but located on enwiki
@@ -52,26 +61,28 @@ class ExclusionsDB:
"""
**EarwigBot: Wiki Toolset: Exclusions Database Manager**

Controls the :file:`exclusions.db` file, which stores URLs excluded from
copyright violation checks on account of being known mirrors, for example.
Controls the :file:`exclusions.db` file, which stores URLs excluded from copyright
violation checks on account of being known mirrors, for example.
"""

def __init__(self, sitesdb, dbfile, logger):
def __init__(self, sitesdb: SitesDB, dbfile: str, logger: logging.Logger) -> None:
self._sitesdb = sitesdb
self._dbfile = dbfile
self._logger = logger
self._db_access_lock = threading.Lock()

def __repr__(self):
def __repr__(self) -> str:
"""Return the canonical string representation of the ExclusionsDB."""
res = "ExclusionsDB(sitesdb={0!r}, dbfile={1!r}, logger={2!r})"
return res.format(self._sitesdb, self._dbfile, self._logger)
return (
f"ExclusionsDB(sitesdb={self._sitesdb!r}, dbfile={self._dbfile!r}, "
f"logger={self._logger!r})"
)

def __str__(self):
def __str__(self) -> str:
"""Return a nice string representation of the ExclusionsDB."""
return f"<ExclusionsDB at {self._dbfile}>"

def _create(self):
def _create(self) -> None:
"""Initialize the exclusions database with its necessary tables."""
script = """
CREATE TABLE sources (source_sitename, source_page);
@@ -79,7 +90,7 @@ class ExclusionsDB:
CREATE TABLE exclusions (exclusion_sitename, exclusion_url);
"""
query = "INSERT INTO sources VALUES (?, ?);"
sources = []
sources: list[tuple[str, str]] = []
for sitename, pages in DEFAULT_SOURCES.items():
for page in pages:
sources.append((sitename, page))
@@ -88,9 +99,9 @@ class ExclusionsDB:
conn.executescript(script)
conn.executemany(query, sources)

def _load_source(self, site, source):
def _load_source(self, site: Site, source: str) -> set[str]:
"""Load from a specific source and return a set of URLs."""
urls = set()
urls: set[str] = set()
try:
data = site.get_page(source, follow_redirects=True).get()
except exceptions.PageNotFoundError:
@@ -123,7 +134,7 @@ class ExclusionsDB:
urls.add(url)
return urls

def _update(self, sitename):
def _update(self, sitename: str) -> None:
"""Update the database from listed sources in the index."""
query1 = "SELECT source_page FROM sources WHERE source_sitename = ?"
query2 = "SELECT exclusion_url FROM exclusions WHERE exclusion_sitename = ?"
@@ -140,7 +151,7 @@ class ExclusionsDB:
else:
site = self._sitesdb.get_site(sitename)
with self._db_access_lock, sqlite3.connect(self._dbfile) as conn:
urls = set()
urls: set[str] = set()
for (source,) in conn.execute(query1, (sitename,)):
urls |= self._load_source(site, source)
for (url,) in conn.execute(query2, (sitename,)):
@@ -154,7 +165,7 @@ class ExclusionsDB:
else:
conn.execute(query7, (sitename, int(time.time())))

def _get_last_update(self, sitename):
def _get_last_update(self, sitename: str) -> int:
"""Return the UNIX timestamp of the last time the db was updated."""
query = "SELECT update_time FROM updates WHERE update_sitename = ?"
with self._db_access_lock, sqlite3.connect(self._dbfile) as conn:
@@ -165,28 +176,34 @@ class ExclusionsDB:
return 0
return result[0] if result else 0

def sync(self, sitename, force=False):
"""Update the database if it hasn't been updated recently.
def sync(self, sitename: str, force: bool = False) -> None:
"""
Update the database if it hasn't been updated recently.

This updates the exclusions database for the site *sitename* and "all".

Site-specific lists are considered stale after 48 hours; global lists
after 12 hours.
Site-specific lists are considered stale after 48 hours; global lists after
12 hours.
"""
max_staleness = 60 * 60 * (12 if sitename == "all" else 48)
time_since_update = int(time.time() - self._get_last_update(sitename))
if force or time_since_update > max_staleness:
log = "Updating stale database: {0} (last updated {1} seconds ago)"
self._logger.info(log.format(sitename, time_since_update))
self._logger.info(
f"Updating stale database: {sitename} (last updated "
f"{time_since_update} seconds ago)"
)
self._update(sitename)
else:
log = "Database for {0} is still fresh (last updated {1} seconds ago)"
self._logger.debug(log.format(sitename, time_since_update))
self._logger.debug(
f"Database for {sitename} is still fresh (last updated "
f"{time_since_update} seconds ago)"
)
if sitename != "all":
self.sync("all", force=force)

def check(self, sitename, url):
"""Check whether a given URL is in the exclusions database.
def check(self, sitename: str, url: str) -> bool:
"""
Check whether a given URL is in the exclusions database.

Return ``True`` if the URL is in the database, or ``False`` otherwise.
"""
@@ -216,19 +233,18 @@ class ExclusionsDB:
else:
matches = normalized.startswith(excl)
if matches:
log = "Exclusion detected in {0} for {1}"
self._logger.debug(log.format(sitename, url))
self._logger.debug(f"Exclusion detected in {sitename} for {url}")
return True

log = f"No exclusions in {sitename} for {url}"
self._logger.debug(log)
self._logger.debug(f"No exclusions in {sitename} for {url}")
return False

def get_mirror_hints(self, page, try_mobile=True):
"""Return a list of strings that indicate the existence of a mirror.
def get_mirror_hints(self, page: Page, try_mobile: bool = True) -> list[str]:
"""
Return a list of strings that indicate the existence of a mirror.

The source parser checks for the presence of these strings inside of
certain HTML tag attributes (``"href"`` and ``"src"``).
The source parser checks for the presence of these strings inside of certain
HTML tag attributes (``"href"`` and ``"src"``).
"""
site = page.site
path = urllib.parse.urlparse(page.url).path
@@ -238,10 +254,10 @@ class ExclusionsDB:
if try_mobile:
fragments = re.search(r"^([\w]+)\.([\w]+).([\w]+)$", site.domain)
if fragments:
roots.append("{}.m.{}.{}".format(*fragments.groups()))
roots.append(f"{fragments[1]}.m.{fragments[2]}.{fragments[3]}")

general = [
root + site._script_path + "/" + script
root + site.script_path + "/" + script
for root in roots
for script in scripts
]
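
To make the hint construction concrete, here is the same pattern rerun standalone with hard-coded example values; the real domain, script path, and script names come from the Site object and from code outside this hunk.

import re

domain = "en.wikipedia.org"          # example value; really site.domain
script_path = "/w"                   # example value; really site.script_path
scripts = ["index.php", "api.php"]   # example values; the real list is defined above this hunk

roots = [domain]
fragments = re.search(r"^([\w]+)\.([\w]+)\.([\w]+)$", domain)
if fragments:
    # Add the mobile-site variant, e.g. "en.m.wikipedia.org".
    roots.append(f"{fragments[1]}.m.{fragments[2]}.{fragments[3]}")

hints = [root + script_path + "/" + script for root in roots for script in scripts]
# -> ["en.wikipedia.org/w/index.php", "en.wikipedia.org/w/api.php",
#     "en.m.wikipedia.org/w/index.php", "en.m.wikipedia.org/w/api.php"]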


src/earwigbot/wiki/copyvios/markov.py (+45 -31)

@@ -18,29 +18,44 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

__all__ = [
"DEFAULT_DEGREE",
"EMPTY",
"EMPTY_INTERSECTION",
"MarkovChain",
"MarkovChainIntersection",
]

import re
from collections.abc import Iterable
from enum import Enum

__all__ = ["EMPTY", "EMPTY_INTERSECTION", "MarkovChain", "MarkovChainIntersection"]
DEFAULT_DEGREE = 5


class MarkovChain:
"""Implements a basic ngram Markov chain of words."""

class Sentinel(Enum):
START = -1
END = -2

def __init__(self, text, degree=5):

RawChain = dict[tuple[str | Sentinel, ...], int]


class MarkovChain:
"""Implements a basic ngram Markov chain of words."""

def __init__(self, text: str, degree: int = DEFAULT_DEGREE) -> None:
self.text = text
self.degree = degree # 2 for bigrams, 3 for trigrams, etc.
self.chain = self._build()
self.size = self._get_size()

def _build(self):
def _build(self) -> RawChain:
"""Build and return the Markov chain from the input text."""
padding = self.degree - 1
words = re.sub(r"[^\w\s-]", "", self.text.lower(), flags=re.UNICODE).split()
words = ([self.START] * padding) + words + ([self.END] * padding)
chain = {}
words = re.sub(r"[^\w\s-]", "", self.text.lower()).split()
words = ([Sentinel.START] * padding) + words + ([Sentinel.END] * padding)
chain: RawChain = {}

for i in range(len(words) - self.degree + 1):
phrase = tuple(words[i : i + self.degree])
@@ -50,15 +65,15 @@ class MarkovChain:
chain[phrase] = 1
return chain

def _get_size(self):
def _get_size(self) -> int:
"""Return the size of the Markov chain: the total number of nodes."""
return sum(self.chain.values())

def __repr__(self):
def __repr__(self) -> str:
"""Return the canonical string representation of the MarkovChain."""
return f"MarkovChain(text={self.text!r})"

def __str__(self):
def __str__(self) -> str:
"""Return a nice string representation of the MarkovChain."""
return f"<MarkovChain of size {self.size}>"

@@ -66,61 +81,60 @@ class MarkovChain:
class MarkovChainIntersection(MarkovChain):
"""Implements the intersection of two chains (i.e., their shared nodes)."""

def __init__(self, mc1, mc2):
def __init__(self, mc1: MarkovChain, mc2: MarkovChain) -> None:
self.mc1, self.mc2 = mc1, mc2
self.chain = self._build()
self.size = self._get_size()

def _build(self):
def _build(self) -> RawChain:
"""Build and return the Markov chain from the input chains."""
c1 = self.mc1.chain
c2 = self.mc2.chain
chain = {}
chain: RawChain = {}

for phrase in c1:
if phrase in c2:
chain[phrase] = min(c1[phrase], c2[phrase])
return chain

def __repr__(self):
def __repr__(self) -> str:
"""Return the canonical string representation of the intersection."""
res = "MarkovChainIntersection(mc1={0!r}, mc2={1!r})"
return res.format(self.mc1, self.mc2)
return f"MarkovChainIntersection(mc1={self.mc1!r}, mc2={self.mc2!r})"

def __str__(self):
def __str__(self) -> str:
"""Return a nice string representation of the intersection."""
res = "<MarkovChainIntersection of size {0} ({1} ^ {2})>"
return res.format(self.size, self.mc1, self.mc2)
return (
f"<MarkovChainIntersection of size {self.size} ({self.mc1} ^ {self.mc2})>"
)


class MarkovChainUnion(MarkovChain):
"""Implemented the union of multiple chains."""

def __init__(self, chains):
def __init__(self, chains: Iterable[MarkovChain]) -> None:
self.chains = list(chains)
self.chain = self._build()
self.size = self._get_size()

def _build(self):
def _build(self) -> RawChain:
"""Build and return the Markov chain from the input chains."""
union = {}
union: RawChain = {}
for chain in self.chains:
for phrase, count in chain.chain.iteritems():
for phrase, count in chain.chain.items():
if phrase in union:
union[phrase] += count
else:
union[phrase] = count
return union

def __repr__(self):
def __repr__(self) -> str:
"""Return the canonical string representation of the union."""
res = "MarkovChainUnion(chains={!r})"
return res.format(self.chains)
return f"MarkovChainUnion(chains={self.chains!r})"

def __str__(self):
def __str__(self) -> str:
"""Return a nice string representation of the union."""
res = "<MarkovChainUnion of size {} ({})>"
return res.format(self.size, "| ".join(str(chain) for chain in self.chains))
chains = " | ".join(str(chain) for chain in self.chains)
return f"<MarkovChainUnion of size {self.size} ({chains})>"


EMPTY = MarkovChain("")
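
For a feel for what these chains contain, a hand-checkable sketch of the same n-gram counting, standalone, with degree 2 and without the START/END padding the real class adds:

import re

def build_chain(text: str, degree: int = 2) -> dict[tuple[str, ...], int]:
    # Same normalization as MarkovChain._build(), minus the sentinel padding.
    words = re.sub(r"[^\w\s-]", "", text.lower()).split()
    chain: dict[tuple[str, ...], int] = {}
    for i in range(len(words) - degree + 1):
        phrase = tuple(words[i : i + degree])
        chain[phrase] = chain.get(phrase, 0) + 1
    return chain

article = build_chain("The cat sat on the mat.")
source = build_chain("The cat sat on the hat.")
shared = {k: min(article[k], source[k]) for k in article if k in source}
# shared holds 4 of the 5 bigrams -- the overlap that MarkovChainIntersection
# measures and that the confidence calculation is ultimately based on.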


src/earwigbot/wiki/copyvios/parsers.py (+143 -86)

@@ -18,44 +18,34 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from __future__ import annotations

__all__ = ["ArticleParser", "get_parser"]

import io
import json
import os.path
import re
import typing
import urllib.parse
import urllib.request
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Any, ClassVar, Literal, TypedDict

import mwparserfromhell

from earwigbot.exceptions import ParserExclusionError, ParserRedirectError

__all__ = ["ArticleTextParser", "get_parser"]


class _BaseTextParser:
"""Base class for a parser that handles text."""

TYPE = None

def __init__(self, text, url=None, args=None):
self.text = text
self.url = url
self._args = args or {}

def __repr__(self):
"""Return the canonical string representation of the text parser."""
return f"{self.__class__.__name__}(text={self.text!r})"
if typing.TYPE_CHECKING:
import bs4

def __str__(self):
"""Return a nice string representation of the text parser."""
name = self.__class__.__name__
return f"<{name} of text with size {len(self.text)}>"
from earwigbot.wiki.copyvios.workers import OpenedURL


class ArticleTextParser(_BaseTextParser):
class ArticleParser:
"""A parser that can strip and chunk wikicode article text."""

TYPE = "Article"
TEMPLATE_MERGE_THRESHOLD = 35
NLTK_DEFAULT = "english"
NLTK_LANGS = {
@@ -78,7 +68,18 @@ class ArticleTextParser(_BaseTextParser):
"tr": "turkish",
}

def _merge_templates(self, code):
def __init__(self, text: str, lang: str, nltk_dir: str) -> None:
self.text = text
self._lang = lang
self._nltk_dir = nltk_dir

def __repr__(self) -> str:
return f"{self.__class__.__name__}(text={self.text!r})"

def __str__(self) -> str:
return f"<{self.__class__.__name__} of text with size {len(self.text)}>"

def _merge_templates(self, code: mwparserfromhell.wikicode.Wikicode) -> None:
"""Merge template contents in to wikicode when the values are long."""
for template in code.filter_templates(recursive=code.RECURSE_OTHERS):
chunks = []
@@ -92,23 +93,25 @@ class ArticleTextParser(_BaseTextParser):
else:
code.remove(template)

def _get_tokenizer(self):
def _get_tokenizer(self) -> Any:
"""Return a NLTK punctuation tokenizer for the article's language."""
import nltk

def datafile(lang):
def datafile(lang: str) -> str:
return "file:" + os.path.join(
self._args["nltk_dir"], "tokenizers", "punkt", lang + ".pickle"
self._nltk_dir, "tokenizers", "punkt", lang + ".pickle"
)

lang = self.NLTK_LANGS.get(self._args.get("lang"), self.NLTK_DEFAULT)
lang = self.NLTK_LANGS.get(self._lang, self.NLTK_DEFAULT)
try:
nltk.data.load(datafile(self.NLTK_DEFAULT))
except LookupError:
nltk.download("punkt", self._args["nltk_dir"])
nltk.download("punkt", self._nltk_dir)
return nltk.data.load(datafile(lang))

def _get_sentences(self, min_query, max_query, split_thresh):
def _get_sentences(
self, min_query: int, max_query: int, split_thresh: int
) -> list[str]:
"""Split the article text into sentences of a certain length."""

def cut_sentence(words):
@@ -138,24 +141,27 @@ class ArticleTextParser(_BaseTextParser):
sentences.extend(cut_sentence(sentence.split()))
return [sen for sen in sentences if len(sen) >= min_query]

def strip(self):
"""Clean the page's raw text by removing templates and formatting.
def strip(self) -> str:
"""
Clean the page's raw text by removing templates and formatting.

Return the page's text with all HTML and wikicode formatting removed,
including templates, tables, and references. It retains punctuation
(spacing, paragraphs, periods, commas, (semi)-colons, parentheses,
quotes), original capitalization, and so forth. HTML entities are
replaced by their unicode equivalents.
Return the page's text with all HTML and wikicode formatting removed, including
templates, tables, and references. It retains punctuation (spacing, paragraphs,
periods, commas, (semi)-colons, parentheses, quotes), original capitalization,
and so forth. HTML entities are replaced by their unicode equivalents.

The actual stripping is handled by :py:mod:`mwparserfromhell`.
"""

def remove(code, node):
"""Remove a node from a code object, ignoring ValueError.
def remove(
code: mwparserfromhell.wikicode.Wikicode, node: mwparserfromhell.nodes.Node
) -> None:
"""
Remove a node from a code object, ignoring ValueError.

Sometimes we will remove a node that contains another node we wish
to remove, and we fail when we try to remove the inner one. Easiest
solution is to just ignore the exception.
Sometimes we will remove a node that contains another node we wish to
remove, and we fail when we try to remove the inner one. Easiest solution
is to just ignore the exception.
"""
try:
code.remove(node)
@@ -181,26 +187,32 @@ class ArticleTextParser(_BaseTextParser):
self.clean = re.sub(r"\n\n+", "\n", clean).strip()
return self.clean

def chunk(self, max_chunks, min_query=8, max_query=128, split_thresh=32):
"""Convert the clean article text into a list of web-searchable chunks.

No greater than *max_chunks* will be returned. Each chunk will only be
a sentence or two long at most (no more than *max_query*). The idea is
to return a sample of the article text rather than the whole, so we'll
pick and choose from parts of it, especially if the article is large
and *max_chunks* is low, so we don't end up searching for just the
first paragraph.

This is implemented using :py:mod:`nltk` (https://nltk.org/). A base
directory (*nltk_dir*) is required to store nltk's punctuation
database, and should be passed as an argument to the constructor. It is
typically located in the bot's working directory.
def chunk(
self,
max_chunks: int,
min_query: int = 8,
max_query: int = 128,
split_thresh: int = 32,
) -> list[str]:
"""
Convert the clean article text into a list of web-searchable chunks.

No greater than *max_chunks* will be returned. Each chunk will only be a
sentence or two long at most (no more than *max_query*). The idea is to return
a sample of the article text rather than the whole, so we'll pick and choose
from parts of it, especially if the article is large and *max_chunks* is low,
so we don't end up searching for just the first paragraph.

This is implemented using :py:mod:`nltk` (https://nltk.org/). A base directory
(*nltk_dir*) is required to store nltk's punctuation database, and should be
passed as an argument to the constructor. It is typically located in the bot's
working directory.
"""
sentences = self._get_sentences(min_query, max_query, split_thresh)
if len(sentences) <= max_chunks:
return sentences

chunks = []
chunks: list[str] = []
while len(chunks) < max_chunks:
if len(chunks) % 5 == 0:
chunk = sentences.pop(0) # Pop from beginning
@@ -216,7 +228,8 @@ class ArticleTextParser(_BaseTextParser):
return chunks
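
Only part of the selection loop is visible in this hunk. As a rough standalone illustration of the sampling idea (spread the queries across the article instead of always searching its opening sentences), not a reimplementation of the exact pop-from-front/middle/end pattern:

def sample_chunks(sentences: list[str], max_chunks: int) -> list[str]:
    # Illustration only: take evenly spaced sentences so queries cover the
    # whole article rather than just the first paragraph.
    if len(sentences) <= max_chunks:
        return sentences
    step = len(sentences) / max_chunks
    return [sentences[int(i * step)] for i in range(max_chunks)]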

def get_links(self):
"""Return a list of all external links in the article.
"""
Return a list of all external links in the article.

The list is restricted to things that we suspect we can parse: i.e.,
those with schemes of ``http`` and ``https``.
@@ -226,14 +239,42 @@ class ArticleTextParser(_BaseTextParser):
return [str(link.url) for link in links if link.url.startswith(schemes)]


class _HTMLParser(_BaseTextParser):
class ParserArgs(TypedDict, total=False):
mirror_hints: list[str]
open_url: Callable[[str], OpenedURL | None]


class SourceParser(ABC):
"""Base class for a parser that handles text."""

TYPE: ClassVar[str]

def __init__(self, text: bytes, url: str, args: ParserArgs | None = None) -> None:
self.text = text
self.url = url
self._args = args or {}

def __repr__(self) -> str:
"""Return the canonical string representation of the text parser."""
return f"{self.__class__.__name__}(text={self.text!r})"

def __str__(self) -> str:
"""Return a nice string representation of the text parser."""
return f"<{self.__class__.__name__} of text with size {len(self.text)}>"

@abstractmethod
def parse(self) -> str: ...


class HTMLParser(SourceParser):
"""A parser that can extract the text from an HTML document."""

TYPE = "HTML"
hidden_tags = ["script", "style"]

def _fail_if_mirror(self, soup):
"""Look for obvious signs that the given soup is a wiki mirror.
def _fail_if_mirror(self, soup: bs4.BeautifulSoup) -> None:
"""
Look for obvious signs that the given soup is a wiki mirror.

If so, raise ParserExclusionError, which is caught in the workers and
causes this source to be excluded.
@@ -242,13 +283,14 @@ class _HTMLParser(_BaseTextParser):
return

def func(attr):
assert "mirror_hints" in self._args
return attr and any(hint in attr for hint in self._args["mirror_hints"])

if soup.find_all(href=func) or soup.find_all(src=func):
raise ParserExclusionError()

@staticmethod
def _get_soup(text):
def _get_soup(text: bytes) -> bs4.BeautifulSoup:
"""Parse some text using BeautifulSoup."""
import bs4

@@ -257,11 +299,11 @@ class _HTMLParser(_BaseTextParser):
except ValueError:
return bs4.BeautifulSoup(text)

def _clean_soup(self, soup):
def _clean_soup(self, soup: bs4.element.Tag) -> str:
"""Clean a BeautifulSoup tree of invisible tags."""
import bs4

def is_comment(text):
def is_comment(text: bs4.element.Tag) -> bool:
return isinstance(text, bs4.element.Comment)

for comment in soup.find_all(text=is_comment):
@@ -272,7 +314,7 @@ class _HTMLParser(_BaseTextParser):

return "\n".join(s.replace("\n", " ") for s in soup.stripped_strings)

def _open(self, url, **kwargs):
def _open(self, url: str, **kwargs: Any) -> bytes | None:
"""Try to read a URL. Return None if it couldn't be read."""
opener = self._args.get("open_url")
if not opener:
@@ -280,13 +322,13 @@ class _HTMLParser(_BaseTextParser):
result = opener(url, **kwargs)
return result.content if result else None

def _load_from_blogspot(self, url):
def _load_from_blogspot(self, url: urllib.parse.ParseResult) -> str:
"""Load dynamic content from Blogger Dynamic Views."""
match = re.search(r"'postId': '(\d+)'", self.text)
match = re.search(rb"'postId': '(\d+)'", self.text)
if not match:
return ""
post_id = match.group(1)
url = f"https://{url.netloc}/feeds/posts/default/{post_id}?"
feed_url = f"https://{url.netloc}/feeds/posts/default/{post_id}?"
params = {
"alt": "json",
"v": "2",
@@ -294,7 +336,7 @@ class _HTMLParser(_BaseTextParser):
"rewriteforssl": "true",
}
raw = self._open(
url + urllib.parse.urlencode(params),
feed_url + urllib.parse.urlencode(params),
allow_content_types=["application/json"],
)
if raw is None:
@@ -308,19 +350,24 @@ class _HTMLParser(_BaseTextParser):
except KeyError:
return ""
soup = self._get_soup(text)
if not soup.body:
return ""
return self._clean_soup(soup.body)

def parse(self):
"""Return the actual text contained within an HTML document.
def parse(self) -> str:
"""
Return the actual text contained within an HTML document.

Implemented using :py:mod:`BeautifulSoup <bs4>`
(https://www.crummy.com/software/BeautifulSoup/).
(https://pypi.org/project/beautifulsoup4/).
"""
import bs4

url = urllib.parse.urlparse(self.url) if self.url else None
soup = self._get_soup(self.text)
if not soup.body:
# No <body> tag present in HTML ->
# no scrapable content (possibly JS or <iframe> magic):
# No <body> tag present in HTML -> no scrapable content
# (possibly JS or <iframe> magic):
return ""

self._fail_if_mirror(soup)
@@ -328,7 +375,7 @@ class _HTMLParser(_BaseTextParser):

if url and url.netloc == "web.archive.org" and url.path.endswith(".pdf"):
playback = body.find(id="playback")
if playback and "src" in playback.attrs:
if isinstance(playback, bs4.element.Tag) and "src" in playback.attrs:
raise ParserRedirectError(playback.attrs["src"])

content = self._clean_soup(body)
@@ -339,7 +386,7 @@ class _HTMLParser(_BaseTextParser):
return content


class _PDFParser(_BaseTextParser):
class PDFParser(SourceParser):
"""A parser that can extract text from a PDF file."""

TYPE = "PDF"
@@ -348,7 +395,7 @@ class _PDFParser(_BaseTextParser):
("\u2022", " "),
]

def parse(self):
def parse(self) -> str:
"""Return extracted text from the PDF."""
from pdfminer import converter, pdfinterp, pdfpage

@@ -358,7 +405,7 @@ class _PDFParser(_BaseTextParser):
interp = pdfinterp.PDFPageInterpreter(manager, conv)

try:
pages = pdfpage.PDFPage.get_pages(io.StringIO(self.text))
pages = pdfpage.PDFPage.get_pages(io.BytesIO(self.text))
for page in pages:
interp.process_page(page)
except Exception: # pylint: disable=broad-except
@@ -372,12 +419,12 @@ class _PDFParser(_BaseTextParser):
return re.sub(r"\n\n+", "\n", value).strip()


class _PlainTextParser(_BaseTextParser):
class PlainTextParser(SourceParser):
"""A parser that can unicode-ify and strip text from a plain text page."""

TYPE = "Text"

def parse(self):
def parse(self) -> str:
"""Unicode-ify and strip whitespace from the plain text document."""
from bs4.dammit import UnicodeDammit

@@ -385,15 +432,25 @@ class _PlainTextParser(_BaseTextParser):
return converted.strip() if converted else ""


_CONTENT_TYPES = {
"text/html": _HTMLParser,
"application/xhtml+xml": _HTMLParser,
"application/pdf": _PDFParser,
"application/x-pdf": _PDFParser,
"text/plain": _PlainTextParser,
_CONTENT_TYPES: dict[str, type[SourceParser]] = {
"text/html": HTMLParser,
"application/xhtml+xml": HTMLParser,
"application/pdf": PDFParser,
"application/x-pdf": PDFParser,
"text/plain": PlainTextParser,
}


def get_parser(content_type):
@typing.overload
def get_parser(content_type: str) -> type[SourceParser] | None: ...


@typing.overload
def get_parser(
content_type: Literal["text/plain"] = "text/plain",
) -> type[SourceParser]: ...


def get_parser(content_type: str = "text/plain") -> type[SourceParser] | None:
"""Return the parser most able to handle a given content type, or None."""
return _CONTENT_TYPES.get(content_type)
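
A sketch of how a fetched document might be routed through this table; the raw bytes, URL, and Content-Type value here are placeholders, and the plain-text fallback mirrors what the workers do.

from earwigbot.wiki.copyvios.parsers import get_parser

raw = b"<html><body><p>Some source text.</p></body></html>"  # placeholder document
content_type = "text/html"  # e.g. taken from the response's Content-Type header

parser_class = get_parser(content_type)
if parser_class is None:
    # Unknown media type: fall back to plain text, which the second
    # get_parser() overload guarantees is always available.
    parser_class = get_parser()

parser = parser_class(raw, "https://example.com/page", args={})
text = parser.parse()  # extracted text, ready to feed into a MarkovChain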

src/earwigbot/wiki/copyvios/result.py (+71 -57)

@@ -18,13 +18,26 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from __future__ import annotations

__all__ = ["CopyvioSource", "CopyvioCheckResult"]

import time
import typing
import urllib.parse
from threading import Event
from time import time
from typing import Any

from earwigbot.wiki.copyvios.markov import EMPTY, EMPTY_INTERSECTION
from earwigbot.wiki.copyvios.markov import (
EMPTY,
EMPTY_INTERSECTION,
MarkovChain,
MarkovChainIntersection,
)

__all__ = ["CopyvioSource", "CopyvioCheckResult"]
if typing.TYPE_CHECKING:
from earwigbot.wiki.copyvios.parsers import ParserArgs
from earwigbot.wiki.copyvios.workers import CopyvioWorkspace


class CopyvioSource:
@@ -45,13 +58,13 @@ class CopyvioSource:

def __init__(
self,
workspace,
url,
headers=None,
timeout=5,
parser_args=None,
search_config=None,
):
workspace: CopyvioWorkspace,
url: str,
headers: list[tuple[str, str]] | None = None,
timeout: float = 5,
parser_args: ParserArgs | None = None,
search_config: dict[str, Any] | None = None,
) -> None:
self.workspace = workspace
self.url = url
self.headers = headers
@@ -68,54 +81,57 @@ class CopyvioSource:
self._event2 = Event()
self._event2.set()

def __repr__(self):
def __repr__(self) -> str:
"""Return the canonical string representation of the source."""
res = (
"CopyvioSource(url={0!r}, confidence={1!r}, skipped={2!r}, "
"excluded={3!r})"
return (
f"CopyvioSource(url={self.url!r}, confidence={self.confidence!r}, "
f"skipped={self.skipped!r}, excluded={self.excluded!r})"
)
return res.format(self.url, self.confidence, self.skipped, self.excluded)

def __str__(self):
def __str__(self) -> str:
"""Return a nice string representation of the source."""
if self.excluded:
return f"<CopyvioSource ({self.url}, excluded)>"
if self.skipped:
return f"<CopyvioSource ({self.url}, skipped)>"
res = "<CopyvioSource ({0} with {1} conf)>"
return res.format(self.url, self.confidence)
return f"<CopyvioSource ({self.url} with {self.confidence} conf)>"

@property
def domain(self):
def domain(self) -> str | None:
"""The source URL's domain name, or None."""
return urllib.parse.urlparse(self.url).netloc or None

def start_work(self):
def start_work(self) -> None:
"""Mark this source as being worked on right now."""
self._event2.clear()
self._event1.set()

def update(self, confidence, source_chain, delta_chain):
def update(
self,
confidence: float,
source_chain: MarkovChain,
delta_chain: MarkovChainIntersection,
) -> None:
"""Fill out the confidence and chain information inside this source."""
self.confidence = confidence
self.chains = (source_chain, delta_chain)

def finish_work(self):
def finish_work(self) -> None:
"""Mark this source as finished."""
self._event2.set()

def skip(self):
def skip(self) -> None:
"""Deactivate this source without filling in the relevant data."""
if self._event1.is_set():
return
self.skipped = True
self._event1.set()

def join(self, until):
def join(self, until: float | None = None) -> None:
"""Block until this violation result is filled out."""
for event in [self._event1, self._event2]:
if until:
timeout = until - time()
if until is not None:
timeout = until - time.time()
if timeout <= 0:
return
event.wait(timeout)
@@ -144,16 +160,15 @@ class CopyvioCheckResult:

def __init__(
self,
violation,
sources,
queries,
check_time,
article_chain,
possible_miss,
included_sources=None,
unified_confidence=None,
violation: bool,
sources: list[CopyvioSource],
queries: int,
check_time: float,
article_chain: MarkovChain,
possible_miss: bool,
included_sources: list[CopyvioSource] | None = None,
unified_confidence: float | None = None,
):
assert isinstance(sources, list)
self.violation = violation
self.sources = sources
self.queries = queries
@@ -163,48 +178,47 @@ class CopyvioCheckResult:
self.included_sources = included_sources if included_sources else []
self.unified_confidence = unified_confidence

def __repr__(self):
def __repr__(self) -> str:
"""Return the canonical string representation of the result."""
res = "CopyvioCheckResult(violation={0!r}, sources={1!r}, queries={2!r}, time={3!r})"
return res.format(self.violation, self.sources, self.queries, self.time)
return (
f"CopyvioCheckResult(violation={self.violation!r}, "
f"sources={self.sources!r}, queries={self.queries!r}, time={self.time!r})"
)

def __str__(self):
def __str__(self) -> str:
"""Return a nice string representation of the result."""
res = "<CopyvioCheckResult ({0} with best {1})>"
return res.format(self.violation, self.best)
return f"<CopyvioCheckResult ({self.violation} with best {self.best})>"

@property
def best(self):
def best(self) -> CopyvioSource | None:
"""The best known source, or None if no sources exist."""
return self.sources[0] if self.sources else None

@property
def confidence(self):
def confidence(self) -> float:
"""The confidence of the best source, or 0 if no sources exist."""
if self.unified_confidence is not None:
return self.unified_confidence
if self.best:
if self.best is not None:
return self.best.confidence
return 0.0

@property
def url(self):
def url(self) -> str | None:
"""The URL of the best source, or None if no sources exist."""
return self.best.url if self.best else None

def get_log_message(self, title):
def get_log_message(self, title: str) -> str:
"""Build a relevant log message for this copyvio check result."""
if not self.sources:
log = "No violation for [[{0}]] (no sources; {1} queries; {2} seconds)"
return log.format(title, self.queries, self.time)
log = "{0} for [[{1}]] (best: {2} ({3} confidence); {4} sources; {5} queries; {6} seconds)"
return (
f"No violation for [[{title}]] (no sources; {self.queries} queries; "
f"{self.time} seconds)"
)

is_vio = "Violation detected" if self.violation else "No violation"
return log.format(
is_vio,
title,
self.url,
self.confidence,
len(self.sources),
self.queries,
self.time,
return (
f"{is_vio} for [[{title}]] (best: {self.url} ({self.confidence} "
f"confidence); {len(self.sources)} sources; {self.queries} queries; "
f"{self.time} seconds)"
)
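
The deadline handling in join() is the part worth calling out: each event only waits for whatever time remains before the shared deadline. A standalone restatement with hypothetical event names:

import threading
import time

def wait_with_deadline(events: list[threading.Event], until: float | None) -> None:
    # Same pattern as CopyvioSource.join(): give each wait only the time left
    # before the deadline, and bail out once the deadline has passed.
    for event in events:
        if until is not None:
            timeout = until - time.time()
            if timeout <= 0:
                return
            event.wait(timeout)
        else:
            event.wait()

# e.g. allow a source's start/finish events at most 30 more seconds in total:
# wait_with_deadline([started, finished], until=time.time() + 30)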

src/earwigbot/wiki/copyvios/search.py (+93 -48)

@@ -18,91 +18,101 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import re
from gzip import GzipFile
from io import StringIO
from json import loads
from urllib.error import URLError
from urllib.parse import urlencode

from earwigbot.exceptions import SearchQueryError

__all__ = [
"BingSearchEngine",
"GoogleSearchEngine",
"SearchEngine",
"YandexSearchEngine",
"SEARCH_ENGINES",
"get_search_engine",
]

import base64
import gzip
import io
import json
import re
import urllib.parse
import urllib.request
from abc import ABC, abstractmethod
from typing import Any
from urllib.error import URLError

class _BaseSearchEngine:
from earwigbot import exceptions


class SearchEngine(ABC):
"""Base class for a simple search engine interface."""

name = "Base"

def __init__(self, cred, opener):
def __init__(
self, cred: dict[str, str], opener: urllib.request.OpenerDirector
) -> None:
"""Store credentials (*cred*) and *opener* for searching later on."""
self.cred = cred
self.opener = opener
self.count = 5

def __repr__(self):
def __repr__(self) -> str:
"""Return the canonical string representation of the search engine."""
return f"{self.__class__.__name__}()"

def __str__(self):
def __str__(self) -> str:
"""Return a nice string representation of the search engine."""
return f"<{self.__class__.__name__}>"

def _open(self, *args):
def _open(self, url: str) -> bytes:
"""Open a URL (like urlopen) and try to return its contents."""
try:
response = self.opener.open(*args)
response = self.opener.open(url)
result = response.read()
except (OSError, URLError) as exc:
err = SearchQueryError(f"{self.name} Error: {exc}")
err.cause = exc
raise err
raise exceptions.SearchQueryError(f"{self.name} Error: {exc}")

if response.headers.get("Content-Encoding") == "gzip":
stream = StringIO(result)
gzipper = GzipFile(fileobj=stream)
stream = io.BytesIO(result)
gzipper = gzip.GzipFile(fileobj=stream)
result = gzipper.read()

code = response.getcode()
if code != 200:
err = "{0} Error: got response code '{1}':\n{2}'"
raise SearchQueryError(err.format(self.name, code, result))
raise exceptions.SearchQueryError(
f"{self.name} Error: got response code '{code}':\n{result}'"
)

return result

@staticmethod
def requirements():
def requirements() -> list[str]:
"""Return a list of packages required by this search engine."""
return []

def search(self, query):
"""Use this engine to search for *query*.
@abstractmethod
def search(self, query: str) -> list[str]:
"""
Use this engine to search for *query*.

Not implemented in this base class; overridden in subclasses.
"""
raise NotImplementedError()


class BingSearchEngine(_BaseSearchEngine):
class BingSearchEngine(SearchEngine):
"""A search engine interface with Bing Search (via Azure Marketplace)."""

name = "Bing"

def __init__(self, cred, opener):
def __init__(
self, cred: dict[str, str], opener: urllib.request.OpenerDirector
) -> None:
super().__init__(cred, opener)

key = self.cred["key"]
auth = (key + ":" + key).encode("base64").replace("\n", "")
self.opener.addheaders.append(("Authorization", "Basic " + auth))
auth = base64.b64encode(f"{key}:{key}".encode()).decode()
self.opener.addheaders.append(("Authorization", f"Basic {auth}"))

def search(self, query: str) -> list[str]:
"""Do a Bing web search for *query*.
"""
Do a Bing web search for *query*.

Returns a list of URLs ranked by relevance (as determined by Bing).
Raises :py:exc:`~earwigbot.exceptions.SearchQueryError` on errors.
@@ -112,20 +122,19 @@ class BingSearchEngine(_BaseSearchEngine):
params = {
"$format": "json",
"$top": str(self.count),
"Query": "'\"" + query.replace('"', "").encode("utf8") + "\"'",
"Query": "'\"" + query.replace('"', "") + "\"'",
"Market": "'en-US'",
"Adult": "'Off'",
"Options": "'DisableLocationDetection'",
"WebSearchOptions": "'DisableHostCollapsing+DisableQueryAlterations'",
}

result = self._open(url + urlencode(params))
result = self._open(url + urllib.parse.urlencode(params))

try:
res = loads(result)
res = json.loads(result)
except ValueError:
err = "Bing Error: JSON could not be decoded"
raise SearchQueryError(err)
raise exceptions.SearchQueryError("Bing Error: JSON could not be decoded")

try:
results = res["d"]["results"]
@@ -134,13 +143,14 @@ class BingSearchEngine(_BaseSearchEngine):
return [result["Url"] for result in results]


class GoogleSearchEngine(_BaseSearchEngine):
class GoogleSearchEngine(SearchEngine):
"""A search engine interface with Google Search."""

name = "Google"

def search(self, query: str) -> list[str]:
"""Do a Google web search for *query*.
"""
Do a Google web search for *query*.

Returns a list of URLs ranked by relevance (as determined by Google).
Raises :py:exc:`~earwigbot.exceptions.SearchQueryError` on errors.
@@ -157,13 +167,13 @@ class GoogleSearchEngine(_BaseSearchEngine):
"fields": "items(link)",
}

result = self._open(url + urlencode(params))
result = self._open(url + urllib.parse.urlencode(params))

try:
res = loads(result)
res = json.loads(result)
except ValueError:
err = "Google Error: JSON could not be decoded"
raise SearchQueryError(err)
raise exceptions.SearchQueryError(err)

try:
return [item["link"] for item in res["items"]]
@@ -171,7 +181,7 @@ class GoogleSearchEngine(_BaseSearchEngine):
return []


class YandexSearchEngine(_BaseSearchEngine):
class YandexSearchEngine(SearchEngine):
"""A search engine interface with Yandex Search."""

name = "Yandex"
@@ -181,7 +191,8 @@ class YandexSearchEngine(_BaseSearchEngine):
return ["lxml.etree"]

def search(self, query: str) -> list[str]:
"""Do a Yandex web search for *query*.
"""
Do a Yandex web search for *query*.

Returns a list of URLs ranked by relevance (as determined by Yandex).
Raises :py:exc:`~earwigbot.exceptions.SearchQueryError` on errors.
@@ -201,17 +212,51 @@ class YandexSearchEngine(_BaseSearchEngine):
"groupby": f"mode=flat.groups-on-page={self.count}",
}

result = self._open(url + urlencode(params))
result = self._open(url + urllib.parse.urlencode(params))

try:
data = lxml.etree.fromstring(result) # type: ignore
data = lxml.etree.fromstring(result)
return [elem.text for elem in data.xpath(".//url")]
except lxml.etree.Error as exc:
raise SearchQueryError("Yandex XML parse error: " + str(exc))
raise exceptions.SearchQueryError(f"Yandex XML parse error: {exc}")


SEARCH_ENGINES = {
SEARCH_ENGINES: dict[str, type[SearchEngine]] = {
"Bing": BingSearchEngine,
"Google": GoogleSearchEngine,
"Yandex": YandexSearchEngine,
}


def get_search_engine(
search_config: dict[str, Any], headers: list[tuple[str, str]]
) -> SearchEngine:
"""Return a function that can be called to do web searches.

The function takes one argument, a search query, and returns a list of URLs, ranked
by importance. The underlying logic depends on the *engine* argument within our
config; for example, if *engine* is "Yahoo! BOSS", we'll use YahooBOSSSearchEngine
for querying.

Raises UnknownSearchEngineError if the 'engine' listed in our config is unknown to
us, and UnsupportedSearchEngineError if we are missing a required package or
module, like oauth2 for "Yahoo! BOSS".
"""
engine = search_config["engine"]
if engine not in SEARCH_ENGINES:
raise exceptions.UnknownSearchEngineError(engine)

klass = SEARCH_ENGINES[engine]
credentials = search_config["credentials"]
opener = urllib.request.build_opener()
opener.addheaders = headers

for dep in klass.requirements():
try:
__import__(dep).__name__
except (ModuleNotFoundError, AttributeError):
e = "Missing a required dependency ({}) for the {} engine"
e = e.format(dep, engine)
raise exceptions.UnsupportedSearchEngineError(e)

return klass(credentials, opener)
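
How a caller might wire a config dict into the new factory; the engine name and credential field names below are placeholders for whatever the bot's search config actually holds.

from earwigbot.wiki.copyvios.search import get_search_engine

search_config = {
    "engine": "Google",  # must be a key of SEARCH_ENGINES
    "credentials": {"key": "<api-key>", "cx": "<engine-id>"},  # placeholder fields
}
headers = [("User-Agent", "EarwigBot"), ("Accept-Encoding", "gzip")]

searcher = get_search_engine(search_config, headers)  # may raise UnknownSearchEngineError
urls = searcher.search("a distinctive sentence from the article")
# urls is a relevance-ranked list; SearchQueryError is raised on HTTP or parse errors.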

src/earwigbot/wiki/copyvios/workers.py (+155 -125)

@@ -18,59 +18,61 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

from __future__ import annotations

__all__ = ["globalize", "localize", "CopyvioWorkspace"]

import base64
import collections
import dataclasses
import functools
import gzip
import io
import logging
import math
import queue
import struct
import threading
import time
import urllib.parse
from collections import deque
from gzip import GzipFile
import urllib.request
from collections.abc import Callable, Container
from dataclasses import dataclass
from http.client import HTTPException
from io import StringIO
from logging import getLogger
from math import log
from queue import Empty, Queue
from struct import error as struct_error
from threading import Lock, Thread
from typing import Any
from urllib.error import URLError
from urllib.request import Request, build_opener

from earwigbot import importer
from earwigbot.exceptions import ParserExclusionError, ParserRedirectError
from earwigbot.wiki.copyvios.markov import (
DEFAULT_DEGREE,
MarkovChain,
MarkovChainIntersection,
MarkovChainUnion,
)
from earwigbot.wiki.copyvios.parsers import get_parser
from earwigbot.wiki.copyvios.parsers import ParserArgs, SourceParser, get_parser
from earwigbot.wiki.copyvios.result import CopyvioCheckResult, CopyvioSource

tldextract = importer.new("tldextract")

__all__ = ["globalize", "localize", "CopyvioWorkspace"]

INCLUDE_THRESHOLD = 0.15

_MAX_REDIRECTS = 3
_MAX_RAW_SIZE = 20 * 1024**2

_is_globalized = False
_global_queues = None
_global_workers = []
_global_queues: _CopyvioQueues | None = None
_global_workers: list[_CopyvioWorker] = []

_OpenedURL = collections.namedtuple("_OpenedURL", ["content", "parser_class"])

def globalize(num_workers: int = 8) -> None:
"""
Cause all copyvio checks to be done by one global set of workers.

def globalize(num_workers=8):
"""Cause all copyvio checks to be done by one global set of workers.

This is useful when checks are being done through a web interface where
large numbers of simultaneous requests could be problematic. The global
workers are spawned when the function is called, run continuously, and
intelligently handle multiple checks.
This is useful when checks are being done through a web interface where large
numbers of simultaneous requests could be problematic. The global workers are
spawned when the function is called, run continuously, and intelligently handle
multiple checks.

This function is not thread-safe and should only be called when no checks
are being done. It has no effect if it has already been called.
This function is not thread-safe and should only be called when no checks are being
done. It has no effect if it has already been called.
"""
global _is_globalized, _global_queues
if _is_globalized:
@@ -84,19 +86,20 @@ def globalize(num_workers=8):
_is_globalized = True


def localize():
def localize() -> None:
"""Return to using page-specific workers for copyvio checks.

This disables changes made by :func:`globalize`, including stopping the
global worker threads.
This disables changes made by :func:`globalize`, including stopping the global
worker threads.

This function is not thread-safe and should only be called when no checks
are being done.
This function is not thread-safe and should only be called when no checks are
being done.
"""
global _is_globalized, _global_queues, _global_workers
if not _is_globalized:
return

assert _global_queues is not None
for i in range(len(_global_workers)):
_global_queues.unassigned.put((StopIteration, None))
_global_queues = None
@@ -104,30 +107,50 @@ def localize():
_is_globalized = False
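
How a long-running frontend might use this pair; the surrounding server lifecycle code is assumed and not part of this commit.

from earwigbot.wiki.copyvios import globalize, localize

def start_service() -> None:
    # Spawn one shared pool up front so concurrent checks reuse its threads
    # instead of creating per-check workers.
    globalize(num_workers=8)

def stop_service() -> None:
    # Tear the pool down once no checks are in flight; this is a no-op if
    # globalize() was never called.
    localize()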


@dataclass(frozen=True)
class OpenedURL:
content: bytes
parser_class: type[SourceParser]


SourceQueue = collections.deque[CopyvioSource]
UnassignedQueue = queue.Queue[
tuple[str, SourceQueue] | tuple[type[StopIteration], None]
]


@dataclass(frozen=True)
class _CopyvioQueues:
"""Stores data necessary to maintain the various queues during a check."""

def __init__(self):
self.lock = Lock()
self.sites = {}
self.unassigned = Queue()
lock: threading.Lock = dataclasses.field(default_factory=threading.Lock)
sites: dict[str, SourceQueue] = dataclasses.field(default_factory=dict)
unassigned: UnassignedQueue = dataclasses.field(default_factory=queue.Queue)


class _CopyvioWorker:
"""A multithreaded URL opener/parser instance."""

def __init__(self, name, queues, until=None):
def __init__(
self, name: str, queues: _CopyvioQueues, until: float | None = None
) -> None:
self._name = name
self._queues = queues
self._until = until

self._site = None
self._queue = None
self._search_config = None
self._opener = build_opener()
self._logger = getLogger("earwigbot.wiki.cvworker." + name)
self._site: str | None = None
self._queue: SourceQueue | None = None
self._search_config: dict[str, Any] | None = None
self._opener = urllib.request.build_opener()
self._logger = logging.getLogger("earwigbot.wiki.cvworker." + name)

def _try_map_proxy_url(self, url, parsed, extra_headers, is_error=False):
def _try_map_proxy_url(
self,
url: str,
parsed: urllib.parse.ParseResult,
extra_headers: dict[str, str],
is_error: bool = False,
) -> tuple[str, bool]:
if not self._search_config or "proxies" not in self._search_config:
return url, False
for proxy_info in self._search_config["proxies"]:
@@ -152,17 +175,20 @@ class _CopyvioWorker:
return url, True
return url, False

def _open_url_raw(self, url, timeout=5, allow_content_types=None):
def _open_url_raw(
self,
url: str,
timeout: float = 5,
allow_content_types: Container[str] | None = None,
) -> OpenedURL | None:
"""Open a URL, without parsing it.

None will be returned for URLs that cannot be read for whatever reason.
"""
parsed = urllib.parse.urlparse(url)
if not isinstance(url, str):
url = url.encode("utf8")
extra_headers = {}
extra_headers: dict[str, str] = {}
url, _ = self._try_map_proxy_url(url, parsed, extra_headers)
request = Request(url, headers=extra_headers)
request = urllib.request.Request(url, headers=extra_headers)
try:
response = self._opener.open(request, timeout=timeout)
except (OSError, URLError, HTTPException, ValueError):
@@ -170,14 +196,14 @@ class _CopyvioWorker:
url, parsed, extra_headers, is_error=True
)
if not remapped:
self._logger.exception("Failed to fetch URL: %s", url)
self._logger.exception(f"Failed to fetch URL: {url}")
return None
self._logger.info("Failed to fetch URL, trying proxy remap: %s", url)
request = Request(url, headers=extra_headers)
self._logger.info(f"Failed to fetch URL, trying proxy remap: {url}")
request = urllib.request.Request(url, headers=extra_headers)
try:
response = self._opener.open(request, timeout=timeout)
except (OSError, URLError, HTTPException, ValueError):
self._logger.exception("Failed to fetch URL after proxy remap: %s", url)
self._logger.exception(f"Failed to fetch URL after proxy remap: {url}")
return None

try:
@@ -193,7 +219,7 @@ class _CopyvioWorker:
):
return None
if not parser_class:
parser_class = get_parser("text/plain")
parser_class = get_parser()
if size > (15 if parser_class.TYPE == "PDF" else 2) * 1024**2:
return None

@@ -207,28 +233,27 @@ class _CopyvioWorker:
return None

if response.headers.get("Content-Encoding") == "gzip":
stream = StringIO(content)
gzipper = GzipFile(fileobj=stream)
stream = io.BytesIO(content)
gzipper = gzip.GzipFile(fileobj=stream)
try:
content = gzipper.read()
except (OSError, struct_error):
except (OSError, struct.error):
return None

if len(content) > _MAX_RAW_SIZE:
return None
return _OpenedURL(content, parser_class)
return OpenedURL(content, parser_class)
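
The gzip branch above is the only transformation applied to the raw response body before the size checks. As a standalone illustration of the same behavior (the helper name and signature are made up for this sketch, not part of the worker):

import gzip
import io
import struct

def maybe_decompress(content: bytes, content_encoding: str | None) -> bytes | None:
    # Decompress only when the server declared gzip encoding, and treat
    # undecodable payloads as unreadable (None), as the worker does.
    if content_encoding != "gzip":
        return content
    try:
        return gzip.GzipFile(fileobj=io.BytesIO(content)).read()
    except (OSError, struct.error):
        return None

print(maybe_decompress(gzip.compress(b"hello"), "gzip"))  # b'hello'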

def _open_url(self, source, redirects=0):
def _open_url(self, source: CopyvioSource, redirects: int = 0) -> str | None:
"""Open a URL and return its parsed content, or None.

First, we will decompress the content if the headers contain "gzip" as
its content encoding. Then, we will return the content stripped using
an HTML parser if the headers indicate it is HTML, or return the
content directly if it is plain text. If we don't understand the
content type, we'll return None.
First, we will decompress the content if the headers contain "gzip" as its
content encoding. Then, we will return the content stripped using an HTML
parser if the headers indicate it is HTML, or return the content directly if it
is plain text. If we don't understand the content type, we'll return None.

If a URLError was raised while opening the URL or an IOError was raised
while decompressing, None will be returned.
If a URLError was raised while opening the URL or an IOError was raised while
decompressing, None will be returned.
"""
self._search_config = source.search_config
if source.headers:
@@ -238,9 +263,9 @@ class _CopyvioWorker:
if result is None:
return None

args = source.parser_args.copy() if source.parser_args else {}
args: ParserArgs = source.parser_args.copy() if source.parser_args else {}
args["open_url"] = functools.partial(self._open_url_raw, timeout=source.timeout)
parser = result.parser_class(result.content, url=source.url, args=args)
parser = result.parser_class(result.content, source.url, args=args)
try:
return parser.parse()
except ParserRedirectError as exc:
@@ -249,30 +274,31 @@ class _CopyvioWorker:
source.url = exc.url.decode("utf8")
return self._open_url(source, redirects=redirects + 1)

def _acquire_new_site(self):
def _acquire_new_site(self) -> None:
"""Block for a new unassigned site queue."""
if self._until:
timeout = self._until - time.time()
if timeout <= 0:
raise Empty
raise queue.Empty()
else:
timeout = None

self._logger.debug("Waiting for new site queue")
site, queue = self._queues.unassigned.get(timeout=timeout)
if site is StopIteration:
site, q = self._queues.unassigned.get(timeout=timeout)
if isinstance(site, type) and issubclass(site, StopIteration):
raise StopIteration
self._logger.debug(f"Acquired new site queue: {site}")
self._site = site
self._queue = queue
self._queue = q

def _dequeue(self):
def _dequeue(self) -> CopyvioSource:
"""Remove a source from one of the queues."""
if not self._site:
self._acquire_new_site()
assert self._site is not None
assert self._queue is not None

logmsg = "Fetching source URL from queue {0}"
self._logger.debug(logmsg.format(self._site))
self._logger.debug(f"Fetching source URL from queue {self._site}")
self._queues.lock.acquire()
try:
source = self._queue.popleft()
@@ -294,11 +320,11 @@ class _CopyvioWorker:
self._queues.lock.release()
return source

def _handle_once(self):
"""Handle a single source from one of the queues."""
def _handle_once(self) -> bool:
"""Handle a single source from one of the queues. Return whether to keep going."""
try:
source = self._dequeue()
except Empty:
except queue.Empty:
self._logger.debug("Exiting: queue timed out")
return False
except StopIteration:
@@ -320,12 +346,11 @@ class _CopyvioWorker:
source.workspace.compare(source, chain)
return True

def _run(self):
def _run(self) -> None:
"""Main entry point for the worker thread.

We will keep fetching URLs from the queues and handling them until
either we run out of time, or we get an exit signal that the queue is
now empty.
We will keep fetching URLs from the queues and handling them until either we
run out of time, or we get an exit signal that the queue is now empty.
"""
while True:
try:
@@ -335,9 +360,9 @@ class _CopyvioWorker:
self._logger.exception("Uncaught exception in worker")
time.sleep(5) # Delay if we get stuck in a busy loop

def start(self):
def start(self) -> None:
"""Start the copyvio worker in a new thread."""
thread = Thread(target=self._run, name="cvworker-" + self._name)
thread = threading.Thread(target=self._run, name="cvworker-" + self._name)
thread.daemon = True
thread.start()

@@ -347,20 +372,20 @@ class CopyvioWorkspace:

def __init__(
self,
article,
min_confidence,
max_time,
logger,
headers,
url_timeout=5,
num_workers=8,
short_circuit=True,
parser_args=None,
exclude_check=None,
config=None,
degree=5,
):
self.sources = []
article: MarkovChain,
min_confidence: float,
max_time: float,
logger: logging.Logger,
headers: list[tuple[str, str]],
url_timeout: float = 5,
num_workers: int = 8,
short_circuit: bool = True,
parser_args: ParserArgs | None = None,
exclusion_callback: Callable[[str], bool] | None = None,
config: dict[str, Any] | None = None,
degree: int = DEFAULT_DEGREE,
) -> None:
self.sources: list[CopyvioSource] = []
self.finished = False
self.possible_miss = False

@@ -369,8 +394,8 @@ class CopyvioWorkspace:
self._min_confidence = min_confidence
self._start_time = time.time()
self._until = (self._start_time + max_time) if max_time > 0 else None
self._handled_urls = set()
self._finish_lock = Lock()
self._handled_urls: set[str] = set()
self._finish_lock = threading.Lock()
self._short_circuit = short_circuit
self._source_args = {
"workspace": self,
@@ -379,10 +404,11 @@ class CopyvioWorkspace:
"parser_args": parser_args,
"search_config": config,
}
self._exclude_check = exclude_check
self._exclusion_callback = exclusion_callback
self._degree = degree

if _is_globalized:
assert _global_queues is not None
self._queues = _global_queues
else:
self._queues = _CopyvioQueues()
@@ -391,28 +417,27 @@ class CopyvioWorkspace:
name = f"local-{id(self) % 10000:04}.{i}"
_CopyvioWorker(name, self._queues, self._until).start()

def _calculate_confidence(self, delta):
def _calculate_confidence(self, delta: MarkovChainIntersection) -> float:
"""Return the confidence of a violation as a float between 0 and 1."""

def conf_with_article_and_delta(article, delta):
def conf_with_article_and_delta(article: float, delta: float) -> float:
"""Calculate confidence using the article and delta chain sizes."""
# This piecewise function exhibits exponential growth until it
# reaches the default "suspect" confidence threshold, at which
# point it transitions to polynomial growth with a limit of 1 as
# (delta / article) approaches 1.
# This piecewise function exhibits exponential growth until it reaches the
# default "suspect" confidence threshold, at which point it transitions to
# polynomial growth with a limit of 1 as (delta / article) approaches 1.
# A graph can be viewed here: https://goo.gl/mKPhvr
ratio = delta / article
if ratio <= 0.52763:
return -log(1 - ratio)
return -math.log(1 - ratio)
else:
return (-0.8939 * (ratio**2)) + (1.8948 * ratio) - 0.0009

def conf_with_delta(delta):
def conf_with_delta(delta: float) -> float:
"""Calculate confidence using just the delta chain size."""
# This piecewise function was derived from experimental data using
# reference points at (0, 0), (100, 0.5), (250, 0.75), (500, 0.9),
# and (1000, 0.95), with a limit of 1 as delta approaches infinity.
# A graph can be viewed here: https://goo.gl/lVl7or
# reference points at (0, 0), (100, 0.5), (250, 0.75), (500, 0.9), and
# (1000, 0.95), with a limit of 1 as delta approaches infinity. A graph can
# be viewed here: https://goo.gl/lVl7or
if delta <= 100:
return delta / (delta + 100)
elif delta <= 250:
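
As a sanity check on the constants above: the first branch, -log(1 - ratio), reaches roughly 0.75 (the default "suspect" threshold) at ratio ≈ 0.52763, which is exactly where the polynomial branch takes over, and the polynomial evaluates to 1.0 at ratio = 1. A standalone sketch of just that first helper, with the numbers worked out (illustrative only):

import math

def conf_with_article_and_delta(article: float, delta: float) -> float:
    # Same constants as above: growth up to the crossover point, then a
    # polynomial that approaches 1 as delta approaches the article size.
    ratio = delta / article
    if ratio <= 0.52763:
        return -math.log(1 - ratio)
    return (-0.8939 * (ratio**2)) + (1.8948 * ratio) - 0.0009

print(round(conf_with_article_and_delta(1000, 527.63), 3))  # 0.75 at the crossover
print(round(conf_with_article_and_delta(1000, 1000), 3))    # 1.0 at ratio = 1
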
@@ -430,7 +455,7 @@ class CopyvioWorkspace:
)
)

def _finish_early(self):
def _finish_early(self) -> None:
"""Finish handling links prematurely (if we've hit min_confidence)."""
self._logger.debug("Confidence threshold met; skipping remaining sources")
with self._queues.lock:
@@ -438,7 +463,7 @@ class CopyvioWorkspace:
source.skip()
self.finished = True

def enqueue(self, urls):
def enqueue(self, urls: list[str]) -> None:
"""Put a list of URLs into the various worker queues."""
for url in urls:
with self._queues.lock:
@@ -449,7 +474,7 @@ class CopyvioWorkspace:
source = CopyvioSource(url=url, **self._source_args)
self.sources.append(source)

if self._exclude_check and self._exclude_check(url):
if self._exclusion_callback and self._exclusion_callback(url):
self._logger.debug(f"enqueue(): exclude {url}")
source.excluded = True
source.skip()
@@ -460,32 +485,37 @@ class CopyvioWorkspace:
continue

try:
import tldextract

key = tldextract.extract(url).registered_domain
except ImportError: # Fall back on very naive method
except ModuleNotFoundError: # Fall back on very naive method
from urllib.parse import urlparse

key = ".".join(urlparse(url).netloc.split(".")[-2:])

logmsg = "enqueue(): {0} {1} -> {2}"
logmsg = f"enqueue(): %s {key} -> {url}"
if key in self._queues.sites:
self._logger.debug(logmsg.format("append", key, url))
self._logger.debug(logmsg % "append")
self._queues.sites[key].append(source)
else:
self._logger.debug(logmsg.format("new", key, url))
self._queues.sites[key] = queue = deque()
queue.append(source)
self._queues.unassigned.put((key, queue))
self._logger.debug(logmsg % "new")
q: SourceQueue = collections.deque()
q.append(source)
self._queues.sites[key] = q
self._queues.unassigned.put((key, q))
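
Keying sources by registered domain keeps all URLs from the same site on one per-site queue, so a single worker handles them in sequence. A small illustration of the two keying strategies used above (the helper names are made up for the example); note that the naive fallback is too coarse for multi-label public suffixes such as .co.uk:

from urllib.parse import urlparse

def naive_key(url: str) -> str:
    # Fallback path: last two labels of the hostname.
    return ".".join(urlparse(url).netloc.split(".")[-2:])

def registered_domain_key(url: str) -> str:
    try:
        import tldextract
        return tldextract.extract(url).registered_domain
    except ModuleNotFoundError:
        return naive_key(url)

print(naive_key("https://www.example.com/page"))     # example.com
print(naive_key("https://news.example.co.uk/page"))  # co.uk (too coarse)
# With tldextract installed, the second URL keys as example.co.uk instead.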

def compare(self, source, source_chain):
def compare(self, source: CopyvioSource, source_chain: MarkovChain | None) -> None:
"""Compare a source to the article; call _finish_early if necessary."""
if source_chain:
delta = MarkovChainIntersection(self._article, source_chain)
conf = self._calculate_confidence(delta)
else:
delta = None
conf = 0.0
self._logger.debug(f"compare(): {source.url} -> {conf}")
with self._finish_lock:
if source_chain:
assert delta is not None
source.update(conf, source_chain, delta)
source.finish_work()
if not self.finished and conf >= self._min_confidence:
@@ -494,7 +524,7 @@ class CopyvioWorkspace:
else:
self.finished = True

def wait(self):
def wait(self) -> None:
"""Wait for the workers to finish handling the sources."""
self._logger.debug(f"Waiting on {len(self.sources)} sources")
for source in self.sources:
@@ -505,7 +535,7 @@ class CopyvioWorkspace:
for i in range(self._num_workers):
self._queues.unassigned.put((StopIteration, None))

def get_result(self, num_queries=0):
def get_result(self, num_queries: int = 0) -> CopyvioCheckResult:
"""Return a CopyvioCheckResult containing the results of this check."""
self.sources.sort(
key=lambda s: (


+ 107
- 3
src/earwigbot/wiki/page.py View File

@@ -35,14 +35,14 @@ import mwparserfromhell

from earwigbot import exceptions
from earwigbot.exceptions import APIError
from earwigbot.wiki.copyvios import CopyvioMixIn
from earwigbot.wiki.copyvios import DEFAULT_DEGREE, CopyvioChecker, CopyvioCheckResult

if typing.TYPE_CHECKING:
from earwigbot.wiki.site import Site
from earwigbot.wiki.user import User


class Page(CopyvioMixIn):
class Page:
"""
**EarwigBot: Wiki Toolset: Page**

@@ -110,7 +110,6 @@ class Page(CopyvioMixIn):
__init__() will not do any API queries, but it will use basic namespace logic
to determine our namespace ID and if we are a talkpage.
"""
super().__init__(site)
self._site = site
self._title = title.strip()
self._follow_redirects = self._keep_following = follow_redirects
@@ -873,3 +872,108 @@ class Page(CopyvioMixIn):
return False

return True

def copyvio_check(
self,
min_confidence: float = 0.75,
max_queries: int = 15,
max_time: float = -1,
no_searches: bool = False,
no_links: bool = False,
short_circuit: bool = True,
degree: int = DEFAULT_DEGREE,
) -> CopyvioCheckResult:
"""
Check the page for copyright violations.

Returns a :class:`.CopyvioCheckResult` object with information on the results
of the check.

*min_confidence* is the minimum amount of confidence we must have in the
similarity between a source text and the article in order for us to consider it
a suspected violation. This is a number between 0 and 1.

*max_queries* caps the number of search engine queries we will make in a given
check.

*max_time* can be set to prevent copyvio checks from taking longer than a fixed
amount of time (generally around a minute), which can be useful if checks are
called through a web server with timeouts. We will stop checking new URLs as
soon as this limit is reached.

Setting *no_searches* to ``True`` will cause only URLs in the wikitext of the
page to be checked; no search engine queries will be made. Setting *no_links*
to ``True`` will cause the opposite to happen: URLs in the wikitext will be
ignored; search engine queries will be made only. Setting both of these to
``True`` is pointless.

Normally, the checker will short-circuit if it finds a URL that meets
*min_confidence*, skipping any remaining URLs and web queries; setting
*short_circuit* to ``False`` will prevent this.

The *degree* controls the n-gram word size used in comparing similarity. It
should usually be a number between 3 and 5.

Raises :exc:`.CopyvioCheckError` or subclasses
(:exc:`.UnknownSearchEngineError`, :exc:`.SearchQueryError`, ...) on errors.
"""
self._logger.info(f"Starting copyvio check for [[{self.title}]]")
checker = CopyvioChecker(
self,
min_confidence=min_confidence,
max_time=max_time,
degree=degree,
logger=self._logger,
)

result = checker.run_check(
max_queries=max_queries,
no_searches=no_searches,
no_links=no_links,
short_circuit=short_circuit,
)
self._logger.info(result.get_log_message(self.title))
return result
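
A hypothetical usage sketch (the config path and page title are placeholders; it assumes the usual Bot/Site accessors and the CopyvioCheckResult attributes referenced elsewhere in this diff, such as violation, confidence, url, and queries):

from earwigbot import bot

my_bot = bot.Bot("path/to/config")       # placeholder config directory
site = my_bot.wiki.get_site()
page = site.get_page("Example article")  # placeholder title

result = page.copyvio_check(min_confidence=0.75, max_queries=10, max_time=45)
if result.violation:
    print(f"Possible copyvio of {result.url} (confidence {result.confidence:.2f})")
else:
    print(f"No violation found after {result.queries} queries")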

def copyvio_compare(
self,
urls: list[str] | str,
min_confidence: float = 0.75,
max_time: float = 30,
degree: int = DEFAULT_DEGREE,
) -> CopyvioCheckResult:
"""
Check the page, like :py:meth:`copyvio_check`, against specific URLs.

This is essentially a reduced version of :meth:`copyvio_check` - a copyvio
comparison is made using Markov chains and the result is returned in a
:class:`.CopyvioCheckResult` object - but without using a search engine, since
the suspected "violated" URL is supplied from the start.

One use case is to generate a result when the URL is retrieved from a cache,
like the one used in EarwigBot's Toolforge site. After a search is done, the
resulting URL is stored in a cache for 72 hours so future checks against that
page will not require another set of time-and-money-consuming search engine
queries. However, the comparison itself (which includes the article's and the
source's content) cannot be stored for data retention reasons, so a fresh
comparison is made using this function.

Since no searching is done, neither :exc:`.UnknownSearchEngineError` nor
:exc:`.SearchQueryError` will be raised.
"""
if not isinstance(urls, list):
urls = [urls]
self._logger.info(
f"Starting copyvio compare for [[{self.title}]] against {', '.join(urls)}"
)
checker = CopyvioChecker(
self,
min_confidence=min_confidence,
max_time=max_time,
degree=degree,
logger=self._logger,
)

result = checker.run_compare(urls)
self._logger.info(result.get_log_message(self.title))
return result
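
Continuing the sketch above, a re-check against a cached URL might look like this (the URL is a placeholder; get_log_message is the same helper used by the logging calls in this diff):

cached_url = "https://example.com/suspected-source"  # e.g. retrieved from a cache
result = page.copyvio_compare(cached_url, min_confidence=0.75, max_time=30)
print(result.get_log_message(page.title))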
