diff --git a/CHANGELOG b/CHANGELOG index ea68d05..baa2684 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,6 @@ v0.4 (unreleased): -- Migrated to Python 3 (3.11+). +- Migrated to Python 3 (3.11+). Substantial code cleanup. - Migrated from oursql to pymysql. - Copyvios: Configurable proxy support for specific domains. - Copyvios: Parser-directed URL redirection. diff --git a/earwigbot/__init__.py b/earwigbot/__init__.py index 092b202..ea1b87c 100644 --- a/earwigbot/__init__.py +++ b/earwigbot/__init__.py @@ -1,4 +1,4 @@ -# Copyright (C) 2009-2019 Ben Kurtovic +# Copyright (C) 2009-2024 Ben Kurtovic # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -20,15 +20,16 @@ """ `EarwigBot `_ is a Python robot that edits -Wikipedia and interacts with people over IRC. +Wikipedia and interacts over IRC. -See :file:`README.rst` for an overview, or the :file:`docs/` directory for -details. This documentation is also available `online -`_. +See :file:`README.rst` for an overview, or the :file:`docs/` directory for details. +This documentation is also available `online `_. 
""" +import typing + __author__ = "Ben Kurtovic" -__copyright__ = "Copyright (C) 2009-2019 Ben Kurtovic" +__copyright__ = "Copyright (C) 2009-2024 Ben Kurtovic" __license__ = "MIT License" __version__ = "0.4.dev0" __email__ = "ben.kurtovic@gmail.com" @@ -57,12 +58,26 @@ from earwigbot import lazy importer = lazy.LazyImporter() -bot = importer.new("earwigbot.bot") -commands = importer.new("earwigbot.commands") -config = importer.new("earwigbot.config") -exceptions = importer.new("earwigbot.exceptions") -irc = importer.new("earwigbot.irc") -managers = importer.new("earwigbot.managers") -tasks = importer.new("earwigbot.tasks") -util = importer.new("earwigbot.util") -wiki = importer.new("earwigbot.wiki") +if typing.TYPE_CHECKING: + from earwigbot import ( + bot, + commands, + config, + exceptions, + irc, + managers, + tasks, + util, + wiki, + ) + +else: + bot = importer.new("earwigbot.bot") + commands = importer.new("earwigbot.commands") + config = importer.new("earwigbot.config") + exceptions = importer.new("earwigbot.exceptions") + irc = importer.new("earwigbot.irc") + managers = importer.new("earwigbot.managers") + tasks = importer.new("earwigbot.tasks") + util = importer.new("earwigbot.util") + wiki = importer.new("earwigbot.wiki") diff --git a/earwigbot/exceptions.py b/earwigbot/exceptions.py index 31032c8..ab69365 100644 --- a/earwigbot/exceptions.py +++ b/earwigbot/exceptions.py @@ -107,6 +107,9 @@ class APIError(ServiceError): Raised by :py:meth:`Site.api_query `. """ + code: str + info: str + class SQLError(ServiceError): """Some error involving SQL querying occurred. 
diff --git a/earwigbot/tasks/wikiproject_tagger.py b/earwigbot/tasks/wikiproject_tagger.py index 9620c84..c5c1ea3 100644 --- a/earwigbot/tasks/wikiproject_tagger.py +++ b/earwigbot/tasks/wikiproject_tagger.py @@ -43,13 +43,14 @@ JobKwargs = TypedDict( "nocreate": NotRequired[bool], "recursive": NotRequired[bool | int], "tag-categories": NotRequired[bool], + "not-in-category": NotRequired[str], "site": NotRequired[str], "dry-run": NotRequired[bool], }, ) -@dataclass +@dataclass(frozen=True) class Job: """ Represents a single wikiproject-tagging task. @@ -68,11 +69,20 @@ class Job: only_with: set[str] | None nocreate: bool tag_categories: bool + not_in_category: str | None dry_run: bool - counter: int = 0 + _counter: list[int] = [0] # Wrap to allow frozen updates processed_cats: set[str] = field(default_factory=set) processed_pages: set[str] = field(default_factory=set) + skip_pages: set[str] = field(default_factory=set) + + @property + def counter(self) -> int: + return self._counter[0] + + def add_to_counter(self, value: int) -> None: + self._counter[0] += value class ShutoffEnabled(Exception): @@ -90,7 +100,7 @@ class WikiProjectTagger(Task): Usage: :command:`earwigbot -t wikiproject_tagger PATH --banner BANNER [--category CAT | --file FILE] [--summary SUM] [--update] [--append PARAMS] [--autoassess [CLASSES]] [--only-with BANNER] [--nocreate] [--recursive [NUM]] - [--site SITE] [--dry-run]` + [--not-in-category CAT] [--site SITE] [--dry-run]` .. 
glossary:: @@ -126,6 +136,8 @@ class WikiProjectTagger(Task): ``NUM`` isn't provided, go infinitely (this can be dangerous) ``--tag-categories`` also tag category pages + ``--not-in-category CAT`` + skip talk pages that are already members of this category ``--site SITE`` the ID of the site to tag pages on, defaulting to the default site ``--dry-run`` @@ -189,6 +201,7 @@ class WikiProjectTagger(Task): nocreate = kwargs.get("nocreate", False) recursive = kwargs.get("recursive", 0) tag_categories = kwargs.get("tag-categories", False) + not_in_category = kwargs.get("not-in-category") dry_run = kwargs.get("dry-run", False) banner, names = self.get_names(site, banner) if not names: @@ -210,6 +223,7 @@ class WikiProjectTagger(Task): only_with=only_with, nocreate=nocreate, tag_categories=tag_categories, + not_in_category=not_in_category, dry_run=dry_run, ) @@ -224,6 +238,11 @@ class WikiProjectTagger(Task): """ Run a tagging *job* on a given *site*. """ + if job.not_in_category: + skip_category = site.get_category(job.not_in_category) + for page in skip_category.get_members(): + job.skip_pages.add(page.title) + if "category" in kwargs: title = kwargs["category"] title = self.guess_namespace(site, title, constants.NS_CATEGORY) @@ -322,6 +341,10 @@ class WikiProjectTagger(Task): if not page.is_talkpage: page = page.toggle_talk() + if page.title in job.skip_pages: + self.logger.debug(f"Skipping page, in category to skip: [[{page.title}]]") + return + if page.title in job.processed_pages: self.logger.debug(f"Skipping page, already processed: [[{page.title}]]") return @@ -330,7 +353,7 @@ class WikiProjectTagger(Task): if job.counter % 10 == 0: # Do a shutoff check every ten pages if self.shutoff_enabled(page.site): raise ShutoffEnabled() - job.counter += 1 + job.add_to_counter(1) try: code = page.parse() diff --git a/earwigbot/wiki/category.py b/earwigbot/wiki/category.py index e9e04a2..02184b3 100644 --- a/earwigbot/wiki/category.py +++ b/earwigbot/wiki/category.py @@ -1,4 
+1,4 @@ -# Copyright (C) 2009-2015 Ben Kurtovic +# Copyright (C) 2009-2024 Ben Kurtovic # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -18,6 +18,9 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from collections.abc import Iterator + +from earwigbot.wiki.constants import Service from earwigbot.wiki.page import Page __all__ = ["Category"] @@ -27,14 +30,14 @@ class Category(Page): """ **EarwigBot: Wiki Toolset: Category** - Represents a category on a given :py:class:`~earwigbot.wiki.site.Site`, a - subclass of :py:class:`~earwigbot.wiki.page.Page`. Provides additional - methods, but :py:class:`~earwigbot.wiki.page.Page`'s own methods should - work fine on :py:class:`Category` objects. :py:meth:`site.get_page() - ` will return a :py:class:`Category` - instead of a :py:class:`~earwigbot.wiki.page.Page` if the given title is in - the category namespace; :py:meth:`~earwigbot.wiki.site.Site.get_category` - is shorthand, accepting category names without the namespace prefix. + Represents a category on a given :py:class:`~earwigbot.wiki.site.Site`, a subclass + of :py:class:`~earwigbot.wiki.page.Page`. Provides additional methods, but + :py:class:`~earwigbot.wiki.page.Page`'s own methods should work fine on + :py:class:`Category` objects. :py:meth:`site.get_page() + ` will return a :py:class:`Category` instead of + a :py:class:`~earwigbot.wiki.page.Page` if the given title is in the category + namespace; :py:meth:`~earwigbot.wiki.site.Site.get_category` is shorthand, + accepting category names without the namespace prefix. 
*Attributes:* @@ -48,22 +51,30 @@ class Category(Page): - :py:meth:`get_members`: iterates over Pages in the category """ - def __repr__(self): - """Return the canonical string representation of the Category.""" + def __repr__(self) -> str: + """ + Return the canonical string representation of the Category. + """ res = "Category(title={0!r}, follow_redirects={1!r}, site={2!r})" return res.format(self._title, self._follow_redirects, self._site) - def __str__(self): - """Return a nice string representation of the Category.""" + def __str__(self) -> str: + """ + Return a nice string representation of the Category. + """ return f'' - def __iter__(self): - """Iterate over all members of the category.""" + def __iter__(self) -> Iterator[Page]: + """ + Iterate over all members of the category. + """ return self.get_members() - def _get_members_via_api(self, limit, follow): - """Iterate over Pages in the category using the API.""" - params = { + def _get_members_via_api(self, limit: int | None, follow: bool) -> Iterator[Page]: + """ + Iterate over Pages in the category using the API. + """ + params: dict[str, str | int] = { "action": "query", "list": "categorymembers", "cmtitle": self.title, @@ -84,8 +95,10 @@ class Category(Page): else: break - def _get_members_via_sql(self, limit, follow): - """Iterate over Pages in the category using SQL.""" + def _get_members_via_sql(self, limit: int | None, follow: bool) -> Iterator[Page]: + """ + Iterate over Pages in the category using SQL. + """ query = """SELECT page_title, page_namespace, page_id FROM page JOIN categorylinks ON page_id = cl_from WHERE cl_to = ?""" @@ -107,16 +120,20 @@ class Category(Page): title = base yield self.site.get_page(title, follow_redirects=follow, pageid=row[2]) - def _get_size_via_api(self, member_type): - """Return the size of the category using the API.""" + def _get_size_via_api(self, member_type: str) -> int: + """ + Return the size of the category using the API. 
+ """ result = self.site.api_query( action="query", prop="categoryinfo", titles=self.title ) info = list(result["query"]["pages"].values())[0]["categoryinfo"] return info[member_type] - def _get_size_via_sql(self, member_type): - """Return the size of the category using SQL.""" + def _get_size_via_sql(self, member_type: str) -> int: + """ + Return the size of the category using SQL. + """ query = "SELECT COUNT(*) FROM categorylinks WHERE cl_to = ?" title = self.title.replace(" ", "_").split(":", 1)[1] if member_type == "size": @@ -126,49 +143,54 @@ class Category(Page): result = self.site.sql_query(query, (title, member_type[:-1])) return list(result)[0][0] - def _get_size(self, member_type): - """Return the size of the category.""" + def _get_size(self, member_type: str) -> int: + """ + Return the size of the category. + """ services = { - self.site.SERVICE_API: self._get_size_via_api, - self.site.SERVICE_SQL: self._get_size_via_sql, + Service.API: self._get_size_via_api, + Service.SQL: self._get_size_via_sql, } - return self.site.delegate(services, (member_type,)) + return self.site.delegate(services, member_type) @property - def size(self): - """The total number of members in the category. + def size(self) -> int: + """ + The total number of members in the category. Includes pages, files, and subcats. Equal to :py:attr:`pages` + - :py:attr:`files` + :py:attr:`subcats`. This will use either the API or - SQL depending on which are enabled and the amount of lag on each. This - is handled by :py:meth:`site.delegate() - `. + :py:attr:`files` + :py:attr:`subcats`. This will use either the API or SQL + depending on which are enabled and the amount of lag on each. This is handled + by :py:meth:`site.delegate() `. """ return self._get_size("size") @property - def pages(self): - """The number of pages in the category. + def pages(self) -> int: + """ + The number of pages in the category. 
- This will use either the API or SQL depending on which are enabled and - the amount of lag on each. This is handled by :py:meth:`site.delegate() + This will use either the API or SQL depending on which are enabled and the + amount of lag on each. This is handled by :py:meth:`site.delegate() `. """ return self._get_size("pages") @property - def files(self): - """The number of files in the category. + def files(self) -> int: + """ + The number of files in the category. - This will use either the API or SQL depending on which are enabled and - the amount of lag on each. This is handled by :py:meth:`site.delegate() + This will use either the API or SQL depending on which are enabled and the + amount of lag on each. This is handled by :py:meth:`site.delegate() `. """ return self._get_size("files") @property - def subcats(self): - """The number of subcategories in the category. + def subcats(self) -> int: + """ + The number of subcategories in the category. This will use either the API or SQL depending on which are enabled and the amount of lag on each. This is handled by :py:meth:`site.delegate() @@ -176,36 +198,38 @@ class Category(Page): """ return self._get_size("subcats") - def get_members(self, limit=None, follow_redirects=None): - """Iterate over Pages in the category. + def get_members( + self, limit: int | None = None, follow_redirects: bool | None = None + ) -> Iterator[Page]: + """ + Iterate over Pages in the category. - If *limit* is given, we will provide this many pages, or less if the - category is smaller. By default, *limit* is ``None``, meaning we will - keep iterating over members until the category is exhausted. - *follow_redirects* is passed directly to :py:meth:`site.get_page() - `; it defaults to ``None``, which - will use the value passed to our :py:meth:`__init__`. + If *limit* is given, we will provide this many pages, or less if the category + is smaller. 
By default, *limit* is ``None``, meaning we will keep iterating + over members until the category is exhausted. *follow_redirects* is passed + directly to :py:meth:`site.get_page() `; + it defaults to ``None``, which will use the value passed to our + :py:meth:`__init__`. - This will use either the API or SQL depending on which are enabled and - the amount of lag on each. This is handled by :py:meth:`site.delegate() + This will use either the API or SQL depending on which are enabled and the + amount of lag on each. This is handled by :py:meth:`site.delegate() `. .. note:: - Be careful when iterating over very large categories with no limit. - If using the API, at best, you will make one query per 5000 pages, - which can add up significantly for categories with hundreds of - thousands of members. As for SQL, note that *all page titles are - stored internally* as soon as the query is made, so the site-wide - SQL lock can be freed and unrelated queries can be made without - requiring a separate connection to be opened. This is generally not - an issue unless your category's size approaches several hundred + Be careful when iterating over very large categories with no limit. If using + the API, at best, you will make one query per 5000 pages, which can add up + significantly for categories with hundreds of thousands of members. As for + SQL, note that *all page titles are stored internally* as soon as the query + is made, so the site-wide SQL lock can be freed and unrelated queries can be + made without requiring a separate connection to be opened. This is generally + not an issue unless your category's size approaches several hundred thousand, in which case the sheer number of titles in memory becomes problematic. 
""" services = { - self.site.SERVICE_API: self._get_members_via_api, - self.site.SERVICE_SQL: self._get_members_via_sql, + Service.API: self._get_members_via_api, + Service.SQL: self._get_members_via_sql, } if follow_redirects is None: follow_redirects = self._follow_redirects - return self.site.delegate(services, (limit, follow_redirects)) + return self.site.delegate(services, limit, follow_redirects) diff --git a/earwigbot/wiki/constants.py b/earwigbot/wiki/constants.py index 9e18a4b..481f0cd 100644 --- a/earwigbot/wiki/constants.py +++ b/earwigbot/wiki/constants.py @@ -31,14 +31,50 @@ Import directly with ``from earwigbot.wiki import constants`` or :py:mod:`earwigbot.wiki` directly (e.g. ``earwigbot.wiki.USER_AGENT``). """ +__all__ = [ + "NS_CATEGORY_TALK", + "NS_CATEGORY", + "NS_DRAFT_TALK", + "NS_DRAFT", + "NS_FILE_TALK", + "NS_FILE", + "NS_HELP_TALK", + "NS_HELP", + "NS_MAIN", + "NS_MEDIA", + "NS_MEDIAWIKI_TALK", + "NS_MEDIAWIKI", + "NS_MODULE_TALK", + "NS_MODULE", + "NS_PORTAL_TALK", + "NS_PORTAL", + "NS_PROJECT_TALK", + "NS_PROJECT", + "NS_SPECIAL", + "NS_TALK", + "NS_TEMPLATE_TALK", + "NS_TEMPLATE", + "NS_USER_TALK", + "NS_USER", + "USER_AGENT", +] + +import platform +from enum import Enum + +import earwigbot + # Default User Agent when making API queries: -from platform import python_version as _p +USER_AGENT = ( + f"EarwigBot/{earwigbot.__version__} " + f"(Python/{platform.python_version()}; https://github.com/earwig/earwigbot)" +) -from earwigbot import __version__ as _v -USER_AGENT = "EarwigBot/{0} (Python/{1}; https://github.com/earwig/earwigbot)" -USER_AGENT = USER_AGENT.format(_v, _p()) -del _v, _p +class Service(Enum): + API = 1 + SQL = 2 + # Default namespace IDs: NS_MAIN = 0 @@ -57,5 +93,13 @@ NS_HELP = 12 NS_HELP_TALK = 13 NS_CATEGORY = 14 NS_CATEGORY_TALK = 15 + +NS_PORTAL = 100 +NS_PORTAL_TALK = 101 +NS_DRAFT = 118 +NS_DRAFT_TALK = 119 +NS_MODULE = 828 +NS_MODULE_TALK = 829 + NS_SPECIAL = -1 NS_MEDIA = -2 diff --git a/earwigbot/wiki/page.py 
b/earwigbot/wiki/page.py index 217d356..947fd1b 100644 --- a/earwigbot/wiki/page.py +++ b/earwigbot/wiki/page.py @@ -1,4 +1,4 @@ -# Copyright (C) 2009-2019 Ben Kurtovic +# Copyright (C) 2009-2024 Ben Kurtovic # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -18,17 +18,27 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. +from __future__ import annotations + +import hashlib import re -from hashlib import md5 -from logging import NullHandler, getLogger -from time import gmtime, strftime -from urllib.parse import quote +import time +import typing +import urllib.parse +from collections.abc import Iterable +from logging import Logger, NullHandler, getLogger +from typing import Any import mwparserfromhell from earwigbot import exceptions +from earwigbot.exceptions import APIError from earwigbot.wiki.copyvios import CopyvioMixIn +if typing.TYPE_CHECKING: + from earwigbot.wiki.site import Site + from earwigbot.wiki.user import User + __all__ = ["Page"] @@ -36,10 +46,10 @@ class Page(CopyvioMixIn): """ **EarwigBot: Wiki Toolset: Page** - Represents a page on a given :py:class:`~earwigbot.wiki.site.Site`. Has - methods for getting information about the page, getting page content, and - so on. :py:class:`~earwigbot.wiki.category.Category` is a subclass of - :py:class:`Page` with additional methods. + Represents a page on a given :py:class:`~earwigbot.wiki.site.Site`. Has methods for + getting information about the page, getting page content, and so on. + :py:class:`~earwigbot.wiki.category.Category` is a subclass of :py:class:`Page` + with additional methods. 
*Attributes:* @@ -59,20 +69,19 @@ class Page(CopyvioMixIn): - :py:meth:`reload`: forcibly reloads the page's attributes - :py:meth:`toggle_talk`: returns a content page's talk page, or vice versa - :py:meth:`get`: returns the page's content - - :py:meth:`get_redirect_target`: returns the page's destination if it is a - redirect - - :py:meth:`get_creator`: returns a User object representing the first - person to edit the page + - :py:meth:`get_redirect_target`: returns the page's destination if it is a redirect + - :py:meth:`get_creator`: returns a User object representing the first person to + edit the page - :py:meth:`parse`: parses the page content for templates, links, etc - :py:meth:`edit`: replaces the page's content or creates a new page - :py:meth:`add_section`: adds a new section at the bottom of the page - - :py:meth:`check_exclusion`: checks whether or not we are allowed to edit - the page, per ``{{bots}}``/``{{nobots}}`` + - :py:meth:`check_exclusion`: checks whether or not we are allowed to edit the + page, per ``{{bots}}``/``{{nobots}}`` - - :py:meth:`~earwigbot.wiki.copyvios.CopyrightMixIn.copyvio_check`: - checks the page for copyright violations - - :py:meth:`~earwigbot.wiki.copyvios.CopyrightMixIn.copyvio_compare`: - checks the page like :py:meth:`copyvio_check`, but against a specific URL + - :py:meth:`~earwigbot.wiki.copyvios.CopyrightMixIn.copyvio_check`: checks the page + for copyright violations + - :py:meth:`~earwigbot.wiki.copyvios.CopyrightMixIn.copyvio_compare`: checks the + page like :py:meth:`copyvio_check`, but against a specific URL """ PAGE_UNKNOWN = 0 @@ -80,18 +89,26 @@ class Page(CopyvioMixIn): PAGE_MISSING = 2 PAGE_EXISTS = 3 - def __init__(self, site, title, follow_redirects=False, pageid=None, logger=None): - """Constructor for new Page instances. 
+ def __init__( + self, + site: Site, + title: str, + follow_redirects: bool = False, + pageid: int | None = None, + logger: Logger | None = None, + ) -> None: + """ + Constructor for new Page instances. - Takes four arguments: a Site object, the Page's title (or pagename), - whether or not to follow redirects (optional, defaults to False), and - a page ID to supplement the title (optional, defaults to None - i.e., - we will have to query the API to get it). + Takes four arguments: a Site object, the Page's title (or pagename), whether or + not to follow redirects (optional, defaults to False), and a page ID to + supplement the title (optional, defaults to None - i.e., we will have to query + the API to get it). As with User, site.get_page() is preferred. - __init__() will not do any API queries, but it will use basic namespace - logic to determine our namespace ID and if we are a talkpage. + __init__() will not do any API queries, but it will use basic namespace logic + to determine our namespace ID and if we are a talkpage. 
""" super().__init__(site) self._site = site @@ -108,16 +125,16 @@ class Page(CopyvioMixIn): # Attributes to be loaded through the API: self._exists = self.PAGE_UNKNOWN - self._is_redirect = None - self._lastrevid = None - self._protection = None - self._fullurl = None - self._content = None - self._creator = None + self._is_redirect: bool | None = None + self._lastrevid: int | None = None + self._protection: dict | None = None + self._fullurl: str | None = None + self._content: str | None = None + self._creator: str | None = None # Attributes used for editing/deleting/protecting/etc: - self._basetimestamp = None - self._starttimestamp = None + self._basetimestamp: str | None = None + self._starttimestamp: str | None = None # Try to determine the page's namespace using our site's namespace # converter: @@ -137,54 +154,60 @@ class Page(CopyvioMixIn): else: self._is_talkpage = self._namespace % 2 == 1 - def __repr__(self): - """Return the canonical string representation of the Page.""" + def __repr__(self) -> str: + """ + Return the canonical string representation of the Page. + """ res = "Page(title={0!r}, follow_redirects={1!r}, site={2!r})" return res.format(self._title, self._follow_redirects, self._site) - def __str__(self): - """Return a nice string representation of the Page.""" + def __str__(self) -> str: + """ + Return a nice string representation of the Page. + """ return f'' - def _assert_validity(self): - """Used to ensure that our page's title is valid. + def _assert_validity(self) -> None: + """ + Used to ensure that our page's title is valid. If this method is called when our page is not valid (and after _load_attributes() has been called), InvalidPageError will be raised. - Note that validity != existence. If a page's title is invalid (e.g, it - contains "[") it will always be invalid, and cannot be edited. + Note that validity != existence. If a page's title is invalid (e.g, it contains + "[") it will always be invalid, and cannot be edited. 
""" if self._exists == self.PAGE_INVALID: e = f"Page '{self._title}' is invalid." raise exceptions.InvalidPageError(e) - def _assert_existence(self): - """Used to ensure that our page exists. + def _assert_existence(self) -> None: + """ + Used to ensure that our page exists. If this method is called when our page doesn't exist (and after - _load_attributes() has been called), PageNotFoundError will be raised. - It will also call _assert_validity() beforehand. + _load_attributes() has been called), PageNotFoundError will be raised. It will + also call _assert_validity() beforehand. """ self._assert_validity() if self._exists == self.PAGE_MISSING: e = f"Page '{self._title}' does not exist." raise exceptions.PageNotFoundError(e) - def _load(self): - """Call _load_attributes() and follows redirects if we're supposed to. + def _load(self) -> None: + """ + Call _load_attributes() and follow redirects if we're supposed to. - This method will only follow redirects if follow_redirects=True was - passed to __init__() (perhaps indirectly passed by site.get_page()). - It avoids the API's &redirects param in favor of manual following, - so we can act more realistically (we don't follow double redirects, and - circular redirects don't break us). + This method will only follow redirects if follow_redirects=True was passed to + __init__() (perhaps indirectly passed by site.get_page()). It avoids the API's + &redirects param in favor of manual following, so we can act more realistically + (we don't follow double redirects, and circular redirects don't break us). - This will raise RedirectError if we have a problem following, but that - is a bug and should NOT happen. + This will raise RedirectError if we have a problem following, but that is a bug + and should NOT happen. - If we're following a redirect, this will make a grand total of three - API queries. It's a lot, but each one is quite small. + If we're following a redirect, this will make a grand total of three API + queries. 
It's a lot, but each one is quite small. """ self._load_attributes() @@ -194,14 +217,14 @@ class Page(CopyvioMixIn): self._content = None # reset the content we just loaded self._load_attributes() - def _load_attributes(self, result=None): - """Load various data from the API in a single query. + def _load_attributes(self, result: dict | None = None) -> None: + """ + Load various data from the API in a single query. - Loads self._title, ._exists, ._is_redirect, ._pageid, ._fullurl, - ._protection, ._namespace, ._is_talkpage, ._creator, ._lastrevid, and - ._starttimestamp using the API. It will do a query of its own unless - *result* is provided, in which case we'll pretend *result* is what the - query returned. + Loads self._title, ._exists, ._is_redirect, ._pageid, ._fullurl, ._protection, + ._namespace, ._is_talkpage, ._creator, ._lastrevid, and ._starttimestamp using + the API. It will do a query of its own unless *result* is provided, in which + case we'll pretend *result* is what the query returned. Assuming the API is sound, this should not raise any exceptions. """ @@ -217,6 +240,7 @@ class Page(CopyvioMixIn): titles=self._title, ) + assert result is not None if "interwiki" in result["query"]: self._title = result["query"]["interwiki"][0]["title"] self._exists = self.PAGE_INVALID @@ -242,7 +266,7 @@ class Page(CopyvioMixIn): self._fullurl = res["fullurl"] self._protection = res["protection"] - self._starttimestamp = strftime("%Y-%m-%dT%H:%M:%SZ", gmtime()) + self._starttimestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) # We've determined the namespace and talkpage status in __init__() # based on the title, but now we can be sure: @@ -256,15 +280,15 @@ class Page(CopyvioMixIn): except KeyError: pass - def _load_content(self, result=None): - """Load current page content from the API. + def _load_content(self, result: dict | None = None) -> None: + """ + Load current page content from the API. 
- If *result* is provided, we'll pretend that is the result of an API - query and try to get content from that. Otherwise, we'll do an API - query on our own. + If *result* is provided, we'll pretend that is the result of an API query and + try to get content from that. Otherwise, we'll do an API query on our own. - Don't call this directly, ever; use reload() followed by get() if you - want to force content reloading. + Don't call this directly, ever; use reload() followed by get() if you want to + force content reloading. """ if not result: query = self.site.api_query @@ -277,6 +301,7 @@ class Page(CopyvioMixIn): titles=self._title, ) + assert result is not None res = list(result["query"]["pages"].values())[0] try: revision = res["revisions"][0] @@ -291,32 +316,32 @@ class Page(CopyvioMixIn): def _edit( self, - params=None, - text=None, - summary=None, - minor=None, - bot=None, - force=None, - section=None, - captcha_id=None, - captcha_word=None, - **kwargs, - ): - """Edit the page! - - If *params* is given, we'll use it as our API query parameters. - Otherwise, we'll build params using the given kwargs via - _build_edit_params(). - - We'll then try to do the API query, and catch any errors the API raises - in _handle_edit_errors(). We'll then throw these back as subclasses of - EditError. + params: dict[str, Any] | None = None, + text: str | None = None, + summary: str | None = None, + minor: bool | None = None, + bot: bool | None = None, + force: bool | None = None, + section: int | str | None = None, + captcha_id: str | None = None, + captcha_word: str | None = None, + **kwargs: Any, + ) -> None: + """ + Edit the page! + + If *params* is given, we'll use it as our API query parameters. Otherwise, + we'll build params using the given kwargs via _build_edit_params(). + + We'll then try to do the API query, and catch any errors the API raises in + _handle_edit_errors(). We'll then throw these back as subclasses of EditError. 
""" # Weed out invalid pages before we get too far: self._assert_validity() # Build our API query string: if not params: + assert text is not None, "Edit text must be provided when params are unset" params = self._build_edit_params( text, summary, @@ -351,26 +376,26 @@ class Page(CopyvioMixIn): def _build_edit_params( self, - text, - summary, - minor, - bot, - force, - section, - captcha_id, - captcha_word, - kwargs, - ): - """Given some keyword arguments, build an API edit query string.""" - unitxt = text.encode("utf8") if isinstance(text, str) else text - hashed = md5(unitxt).hexdigest() # Checksum to ensure text is correct + text: str, + summary: str | None, + minor: bool | None, + bot: bool | None, + force: bool | None, + section: int | str | None, + captcha_id: str | None, + captcha_word: str | None, + kwargs: dict[str, Any], + ) -> dict[str, Any]: + """ + Given some keyword arguments, build an API edit query string. + """ params = { "action": "edit", "title": self._title, "text": text, "token": self.site.get_token(), "summary": summary, - "md5": hashed, + "md5": hashlib.md5(text.encode("utf-8")).hexdigest(), } if section: @@ -403,12 +428,15 @@ class Page(CopyvioMixIn): params[key] = val return params - def _handle_edit_errors(self, error, params, retry=True): - """If our edit fails due to some error, try to handle it. + def _handle_edit_errors( + self, error: APIError, params: dict[str, Any], retry: bool = True + ) -> dict: + """ + If our edit fails due to some error, try to handle it. - We'll either raise an appropriate exception (for example, if the page - is protected), or we'll try to fix it (for example, if the token is - invalid, we'll try to get a new one). + We'll either raise an appropriate exception (for example, if the page is + protected), or we'll try to fix it (for example, if the token is invalid, we'll + try to get a new one). 
""" perms = [ "noedit", @@ -447,27 +475,31 @@ class Page(CopyvioMixIn): raise exceptions.EditError(": ".join((error.code, error.info))) @property - def site(self): - """The page's corresponding Site object.""" + def site(self) -> Site: + """ + The page's corresponding Site object. + """ return self._site @property - def title(self): - """The page's title, or "pagename". + def title(self) -> str: + """ + The page's title, or "pagename". - This won't do any API queries on its own. Any other attributes or - methods that do API queries will reload the title, however, like - :py:attr:`exists` and :py:meth:`get`, potentially "normalizing" it or - following redirects if :py:attr:`self._follow_redirects` is ``True``. + This won't do any API queries on its own. Any other attributes or methods that + do API queries will reload the title, however, like :py:attr:`exists` and + :py:meth:`get`, potentially "normalizing" it or following redirects if + :py:attr:`self._follow_redirects` is ``True``. """ return self._title @property - def exists(self): - """Whether or not the page exists. + def exists(self) -> int: + """ + Whether or not the page exists. - This will be a number; its value does not matter, but it will equal - one of :py:attr:`self.PAGE_INVALID `, + This will be a number; its value does not matter, but it will equal one of + :py:attr:`self.PAGE_INVALID `, :py:attr:`self.PAGE_MISSING `, or :py:attr:`self.PAGE_EXISTS `. @@ -478,55 +510,60 @@ class Page(CopyvioMixIn): return self._exists @property - def pageid(self): - """An integer ID representing the page. + def pageid(self) -> int: + """ + An integer ID representing the page. 
Makes an API query only if we haven't already made one and the *pageid* - parameter to :py:meth:`__init__` was left as ``None``, which should be - true for all cases except when pages are returned by an SQL generator - (like :py:meth:`category.get_members() + parameter to :py:meth:`__init__` was left as ``None``, which should be true for + all cases except when pages are returned by an SQL generator (like + :py:meth:`category.get_members() `). Raises :py:exc:`~earwigbot.exceptions.InvalidPageError` or - :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is - invalid or the page does not exist, respectively. + :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is invalid + or the page does not exist, respectively. """ if self._pageid: return self._pageid if self._exists == self.PAGE_UNKNOWN: self._load() self._assert_existence() # Missing pages do not have IDs + assert self._pageid is not None, "Page exists but does not have an ID" return self._pageid @property - def url(self): - """The page's URL. + def url(self) -> str: + """ + The page's URL. - Like :py:meth:`title`, this won't do any API queries on its own. If the - API was never queried for this page, we will attempt to determine the - URL ourselves based on the title. + Like :py:meth:`title`, this won't do any API queries on its own. If the API was + never queried for this page, we will attempt to determine the URL ourselves + based on the title. """ if self._fullurl: return self._fullurl else: - encoded = self._title.encode("utf8").replace(" ", "_") - slug = quote(encoded, safe="/:").decode("utf8") - path = self.site._article_path.replace("$1", slug) + encoded = self._title.replace(" ", "_") + slug = urllib.parse.quote(encoded, safe="/:") + path = self.site.article_path.replace("$1", slug) return "".join((self.site.url, path)) @property - def namespace(self): - """The page's namespace ID (an integer). + def namespace(self) -> int: + """ + The page's namespace ID (an integer). 
- Like :py:meth:`title`, this won't do any API queries on its own. If the - API was never queried for this page, we will attempt to determine the - namespace ourselves based on the title. + Like :py:meth:`title`, this won't do any API queries on its own. If the API was + never queried for this page, we will attempt to determine the namespace + ourselves based on the title. """ return self._namespace @property - def lastrevid(self): - """The ID of the page's most recent revision. + def lastrevid(self) -> int | None: + """ + The ID of the page's most recent revision. Raises :py:exc:`~earwigbot.exceptions.InvalidPageError` or :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is @@ -538,14 +575,15 @@ class Page(CopyvioMixIn): return self._lastrevid @property - def protection(self): - """The page's current protection status. + def protection(self) -> dict | None: + """ + The page's current protection status. Makes an API query only if we haven't already made one. - Raises :py:exc:`~earwigbot.exceptions.InvalidPageError` if the page - name is invalid. Won't raise an error if the page is missing because - those can still be create-protected. + Raises :py:exc:`~earwigbot.exceptions.InvalidPageError` if the page name is + invalid. Won't raise an error if the page is missing because those can still be + create-protected. """ if self._exists == self.PAGE_UNKNOWN: self._load() @@ -553,17 +591,18 @@ class Page(CopyvioMixIn): return self._protection @property - def is_talkpage(self): - """``True`` if the page is a talkpage, otherwise ``False``. + def is_talkpage(self) -> bool: + """ + ``True`` if the page is a talkpage, otherwise ``False``. - Like :py:meth:`title`, this won't do any API queries on its own. If the - API was never queried for this page, we will attempt to determine - whether it is a talkpage ourselves based on its namespace. + Like :py:meth:`title`, this won't do any API queries on its own. 
If the API was + never queried for this page, we will attempt to determine whether it is a + talkpage ourselves based on its namespace. """ return self._is_talkpage @property - def is_redirect(self): + def is_redirect(self) -> bool: """``True`` if the page is a redirect, otherwise ``False``. Makes an API query only if we haven't already made one. @@ -572,34 +611,36 @@ class Page(CopyvioMixIn): """ if self._exists == self.PAGE_UNKNOWN: self._load() + assert self._is_redirect is not None return self._is_redirect - def reload(self): - """Forcibly reload the page's attributes. + def reload(self) -> None: + """ + Forcibly reload the page's attributes. - Emphasis on *reload*: this is only necessary if there is reason to - believe they have changed. + Emphasis on *reload*: this is only necessary if there is reason to believe they + have changed. """ self._load() if self._content is not None: # Only reload content if it has already been loaded: self._load_content() - def toggle_talk(self, follow_redirects=None): - """Return a content page's talk page, or vice versa. + def toggle_talk(self, follow_redirects: bool | None = None) -> Page: + """ + Return a content page's talk page, or vice versa. - The title of the new page is determined by namespace logic, not API - queries. We won't make any API queries on our own. + The title of the new page is determined by namespace logic, not API queries. + We won't make any API queries on our own. - If *follow_redirects* is anything other than ``None`` (the default), it - will be passed to the new :py:class:`~earwigbot.wiki.page.Page` - object's :py:meth:`__init__`. Otherwise, we'll use the value passed to - our own :py:meth:`__init__`. + If *follow_redirects* is anything other than ``None`` (the default), it will be + passed to the new :py:class:`~earwigbot.wiki.page.Page` object's + :py:meth:`__init__`. Otherwise, we'll use the value passed to our own + :py:meth:`__init__`. 
- Will raise :py:exc:`~earwigbot.exceptions.InvalidPageError` if we try - to get the talk page of a special page (in the ``Special:`` or - ``Media:`` namespaces), but we won't raise an exception if our page is - otherwise missing or invalid. + Will raise :py:exc:`~earwigbot.exceptions.InvalidPageError` if we try to get + the talk page of a special page (in the ``Special:`` or ``Media:`` namespaces), + but we won't raise an exception if our page is otherwise missing or invalid. """ if self._namespace < 0: ns = self.site.namespace_id_to_name(self._namespace) @@ -629,11 +670,12 @@ class Page(CopyvioMixIn): follow_redirects = self._follow_redirects return Page(self.site, new_title, follow_redirects) - def get(self): - """Return page content, which is cached if you try to call get again. + def get(self) -> str: + """ + Return page content, which is cached if you try to call get again. - Raises InvalidPageError or PageNotFoundError if the page name is - invalid or the page does not exist, respectively. + Raises InvalidPageError or PageNotFoundError if the page name is invalid or the + page does not exist, respectively. """ if self._exists == self.PAGE_UNKNOWN: # Kill two birds with one stone by doing an API query for both our @@ -659,6 +701,7 @@ class Page(CopyvioMixIn): self._exists = self.PAGE_UNKNOWN # Force another API query self.get() + assert self._content is not None return self._content # Make sure we're dealing with a real page here. This may be outdated @@ -669,16 +712,17 @@ class Page(CopyvioMixIn): if self._content is None: self._load_content() + assert self._content is not None return self._content - def get_redirect_target(self): - """If the page is a redirect, return its destination. + def get_redirect_target(self) -> str: + """ + If the page is a redirect, return its destination. 
Raises :py:exc:`~earwigbot.exceptions.InvalidPageError` or - :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is - invalid or the page does not exist, respectively. Raises - :py:exc:`~earwigbot.exceptions.RedirectError` if the page is not a - redirect. + :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is invalid + or the page does not exist, respectively. Raises + :py:exc:`~earwigbot.exceptions.RedirectError` if the page is not a redirect. """ re_redirect = r"^\s*\#\s*redirect\s*\[\[(.*?)\]\]" content = self.get() @@ -688,19 +732,20 @@ class Page(CopyvioMixIn): e = "The page does not appear to have a redirect target." raise exceptions.RedirectError(e) - def get_creator(self): - """Return the User object for the first person to edit the page. + def get_creator(self) -> User: + """ + Return the User object for the first person to edit the page. - Makes an API query only if we haven't already made one. Normally, we - can get the creator along with everything else (except content) in - :py:meth:`_load_attributes`. However, due to a limitation in the API - (can't get the editor of one revision and the content of another at - both ends of the history), if our other attributes were only loaded - through :py:meth:`get`, we'll have to do another API query. + Makes an API query only if we haven't already made one. Normally, we can get + the creator along with everything else (except content) in + :py:meth:`_load_attributes`. However, due to a limitation in the API (can't get + the editor of one revision and the content of another at both ends of the + history), if our other attributes were only loaded through :py:meth:`get`, + we'll have to do another API query. Raises :py:exc:`~earwigbot.exceptions.InvalidPageError` or - :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is - invalid or the page does not exist, respectively. 
+ :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is invalid + or the page does not exist, respectively. """ if self._exists == self.PAGE_UNKNOWN: self._load() @@ -710,41 +755,59 @@ class Page(CopyvioMixIn): self._assert_existence() return self.site.get_user(self._creator) - def parse(self): - """Parse the page content for templates, links, etc. + def parse(self) -> mwparserfromhell.wikicode.Wikicode: + """ + Parse the page content for templates, links, etc. Actual parsing is handled by :py:mod:`mwparserfromhell`. Raises :py:exc:`~earwigbot.exceptions.InvalidPageError` or - :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is - invalid or the page does not exist, respectively. + :py:exc:`~earwigbot.exceptions.PageNotFoundError` if the page name is invalid + or the page does not exist, respectively. """ return mwparserfromhell.parse(self.get()) - def edit(self, text, summary, minor=False, bot=True, force=False, **kwargs): - """Replace the page's content or creates a new page. + def edit( + self, + text: str, + summary: str | None, + minor: bool = False, + bot: bool = True, + force: bool = False, + **kwargs: Any, + ) -> None: + """ + Replace the page's content or creates a new page. - *text* is the new page content, with *summary* as the edit summary. - If *minor* is ``True``, the edit will be marked as minor. If *bot* is - ``True``, the edit will be marked as a bot edit, but only if we - actually have a bot flag. + *text* is the new page content, with *summary* as the edit summary. If *minor* + is ``True``, the edit will be marked as minor. If *bot* is ``True``, the edit + will be marked as a bot edit, but only if we actually have a bot flag. - Use *force* to push the new content even if there's an edit conflict or - the page was deleted/recreated between getting our edit token and - editing our page. Be careful with this! 
+ Use *force* to push the new content even if there's an edit conflict or the + page was deleted/recreated between getting our edit token and editing our page. + Be careful with this! """ self._edit( text=text, summary=summary, minor=minor, bot=bot, force=force, **kwargs ) - def add_section(self, text, title, minor=False, bot=True, force=False, **kwargs): - """Add a new section to the bottom of the page. + def add_section( + self, + text: str, + title: str, + minor: bool = False, + bot: bool = True, + force: bool = False, + **kwargs: Any, + ) -> None: + """ + Add a new section to the bottom of the page. - The arguments for this are the same as those for :py:meth:`edit`, but - instead of providing a summary, you provide a section title. Likewise, - raised exceptions are the same as :py:meth:`edit`'s. + The arguments for this are the same as those for :py:meth:`edit`, but instead + of providing a summary, you provide a section title. Likewise, raised + exceptions are the same as :py:meth:`edit`'s. - This should create the page if it does not already exist, with just the - new section as content. + This should create the page if it does not already exist, with just the new + section as content. """ self._edit( text=text, @@ -756,25 +819,27 @@ class Page(CopyvioMixIn): **kwargs, ) - def check_exclusion(self, username=None, optouts=None): - """Check whether or not we are allowed to edit the page. + def check_exclusion( + self, username: str | None = None, optouts: Iterable[str] | None = None + ) -> bool: + """ + Check whether or not we are allowed to edit the page. Return ``True`` if we *are* allowed to edit this page, and ``False`` if we aren't. - *username* is used to determine whether we are part of a specific list - of allowed or disallowed bots (e.g. ``{{bots|allow=EarwigBot}}`` or - ``{{bots|deny=FooBot,EarwigBot}}``). 
It's ``None`` by default, which - will swipe our username from :py:meth:`site.get_user() + *username* is used to determine whether we are part of a specific list of + allowed or disallowed bots (e.g. ``{{bots|allow=EarwigBot}}`` or + ``{{bots|deny=FooBot,EarwigBot}}``). It's ``None`` by default, which will swipe + our username from :py:meth:`site.get_user() `.\ :py:attr:`~earwigbot.wiki.user.User.name`. - *optouts* is a list of messages to consider this check as part of for - the purpose of opt-out; it defaults to ``None``, which ignores the - parameter completely. For example, if *optouts* is ``["nolicense"]``, - we'll return ``False`` on ``{{bots|optout=nolicense}}`` or - ``{{bots|optout=all}}``, but `True` on - ``{{bots|optout=orfud,norationale,replaceable}}``. + *optouts* is a list of messages to consider this check as part of for the + purpose of opt-out; it defaults to ``None``, which ignores the parameter + completely. For example, if *optouts* is ``["nolicense"]``, we'll return + ``False`` on ``{{bots|optout=nolicense}}`` or ``{{bots|optout=all}}``, but + `True` on ``{{bots|optout=orfud,norationale,replaceable}}``. """ def parse_param(template, param): diff --git a/earwigbot/wiki/site.py b/earwigbot/wiki/site.py index 3f32ce8..e8ca6da 100644 --- a/earwigbot/wiki/site.py +++ b/earwigbot/wiki/site.py @@ -18,44 +18,78 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
-from http.cookiejar import CookieJar -from json import dumps -from logging import NullHandler, getLogger -from os.path import expanduser +from __future__ import annotations + +import json +import os.path +import time +import typing +from collections.abc import Callable, Generator, Sequence +from http.cookiejar import Cookie, CookieJar +from logging import Logger, NullHandler, getLogger from threading import RLock -from time import sleep, time +from typing import Any, Literal, ParamSpec, TypedDict, TypeVar from urllib.parse import unquote_plus, urlparse import requests +from requests.cookies import RequestsCookieJar from requests_oauthlib import OAuth1 from earwigbot import exceptions, importer from earwigbot.wiki import constants from earwigbot.wiki.category import Category +from earwigbot.wiki.constants import Service from earwigbot.wiki.page import Page from earwigbot.wiki.user import User -pymysql = importer.new("pymysql") +if typing.TYPE_CHECKING: + import pymysql + import pymysql.cursors + from pymysql.cursors import Cursor +else: + pymysql = importer.new("pymysql") __all__ = ["Site"] +T = TypeVar("T") +P = ParamSpec("P") + +ApiParams = dict[str, str | int] +ApiResult = dict[str, Any] +SqlConnInfo = dict[str, Any] + + +class OAuthInfo(TypedDict): + consumer_token: str + consumer_secret: str + access_token: str + access_secret: str + + +class _ApiInfoCache(TypedDict): + maxlag: int + lastcheck: float + + +class _SqlInfoCache(TypedDict): + replag: int + lastcheck: float + usable: bool | None + class Site: """ **EarwigBot: Wiki Toolset: Site** Represents a site, with support for API queries and returning - :py:class:`~earwigbot.wiki.page.Page`, - :py:class:`~earwigbot.wiki.user.User`, - and :py:class:`~earwigbot.wiki.category.Category` objects. 
The constructor - takes a bunch of arguments and you probably won't need to call it directly, - rather :py:meth:`wiki.get_site() ` - for returning :py:class:`Site` - instances, :py:meth:`wiki.add_site() - ` for adding new ones to our - database, and :py:meth:`wiki.remove_site() - ` for removing old ones from - our database, should suffice. + :py:class:`~earwigbot.wiki.page.Page`, :py:class:`~earwigbot.wiki.user.User`, and + :py:class:`~earwigbot.wiki.category.Category` objects. The constructor takes a + bunch of arguments and you probably won't need to call it directly, rather + :py:meth:`wiki.get_site() ` for returning + :py:class:`Site` instances, :py:meth:`wiki.add_site() + ` for adding new ones to our database, and + :py:meth:`wiki.remove_site() ` for + removing old ones from our database, should suffice. *Attributes:* @@ -80,8 +114,6 @@ class Site: - :py:meth:`delegate`: controls when the API or SQL is used """ - SERVICE_API = 1 - SERVICE_SQL = 2 SPECIAL_TOKENS = [ "createaccount", "deleteglobalaccount", @@ -95,52 +127,51 @@ class Site: def __init__( self, - name=None, - project=None, - lang=None, - base_url=None, - article_path=None, - script_path=None, - sql=None, - namespaces=None, - login=(None, None), - oauth=None, - cookiejar=None, - user_agent=None, - use_https=True, - assert_edit=None, - maxlag=None, - wait_between_queries=1, - logger=None, - search_config=None, - ): - """Constructor for new Site instances. - - This probably isn't necessary to call yourself unless you're building a - Site that's not in your config and you don't want to add it - normally - all you need is wiki.get_site(name), which creates the Site for you - based on your config file and the sites database. We accept a bunch of - kwargs, but the only ones you really "need" are *base_url* and - *script_path*; this is enough to figure out an API url. 
*login*, a - tuple of (username, password), can be used to log in using the legacy - BotPasswords system; otherwise, a dict of OAuth info should be provided - to *oauth*. *cookiejar* will be used to store cookies, and we'll use a + name: str | None = None, + project: str | None = None, + lang: str | None = None, + base_url: str | None = None, + article_path: str | None = None, + script_path: str | None = None, + sql: SqlConnInfo | None = None, + namespaces: dict[int, list[str]] | None = None, + login: tuple[str, str] | tuple[None, None] = (None, None), + oauth: OAuthInfo | None = None, + cookiejar: CookieJar | None = None, + user_agent: str | None = None, + use_https: bool = True, + assert_edit: bool | None = None, + maxlag: int | None = None, + wait_between_queries: int = 1, + logger: Logger | None = None, + search_config: dict[str, Any] | None = None, + ) -> None: + """ + Constructor for new Site instances. + + This probably isn't necessary to call yourself unless you're building a Site + that's not in your config and you don't want to add it - normally all you need + is wiki.get_site(name), which creates the Site for you based on your config file + and the sites database. We accept a bunch of kwargs, but the only ones you + really "need" are *base_url* and *script_path*; this is enough to figure out an + API url. *login*, a tuple of (username, password), can be used to log in using + the legacy BotPasswords system; otherwise, a dict of OAuth info should be + provided to *oauth*. *cookiejar* will be used to store cookies, and we'll use a normal CookieJar if none is given. - First, we'll store the given arguments as attributes, then set up our - requests session. We'll load any of the attributes that weren't given - from the API, and then log in if a username/pass was given and we - aren't already logged in. + First, we'll store the given arguments as attributes, then set up our requests + session. 
We'll load any of the attributes that weren't given from the API, and + then log in if a username/pass was given and we aren't already logged in. """ - # Attributes referring to site information, filled in by an API query - # if they are missing (and an API url can be determined): + # Attributes referring to site information, filled in by an API query # if they + # are missing (and an API url can be determined): self._name = name self._project = project self._lang = lang self._base_url = base_url self._article_path = article_path self._script_path = script_path - self._namespaces = namespaces + self._namespaces: dict[int, list[str]] | None = namespaces # Attributes used for API queries: self._use_https = use_https @@ -149,18 +180,18 @@ class Site: self._wait_between_queries = wait_between_queries self._max_retries = 6 self._last_query_time = 0 - self._tokens = {} + self._tokens: dict[str, str] = {} self._api_lock = RLock() - self._api_info_cache = {"maxlag": 0, "lastcheck": 0} + self._api_info_cache = _ApiInfoCache(maxlag=0, lastcheck=0) # Attributes used for SQL queries: if sql: self._sql_data = sql else: - self._sql_data = {} + self._sql_data: SqlConnInfo = {} self._sql_conn = None self._sql_lock = RLock() - self._sql_info_cache = {"replag": 0, "lastcheck": 0, "usable": None} + self._sql_info_cache = _SqlInfoCache(replag=0, lastcheck=0, usable=None) # Attribute used in copyright violation checks (see CopyrightMixIn): if search_config: @@ -178,7 +209,7 @@ class Site: user_agent = constants.USER_AGENT # Set default UA self._oauth = oauth self._session = requests.Session() - self._session.cookies = self._cookiejar + self._session.cookies = typing.cast(RequestsCookieJar, self._cookiejar) self._session.headers["User-Agent"] = user_agent if oauth: self._session.auth = OAuth1( @@ -199,14 +230,16 @@ class Site: self._load_attributes() # If we have a name/pass and the API says we're not logged in, log in: - self._login_info = name, password = login - if not self._oauth 
and name and password: + self._login_user, self._login_password = login + if not self._oauth and self._login_user and self._login_password: logged_in_as = self._get_username_from_cookies() - if not logged_in_as or name.replace("_", " ") != logged_in_as: - self._login(login) + if not logged_in_as or self._login_user.replace("_", " ") != logged_in_as: + self._login() - def __repr__(self): - """Return the canonical string representation of the Site.""" + def __repr__(self) -> str: + """ + Return the canonical string representation of the Site. + """ res = ", ".join( ( "Site(name={_name!r}", @@ -225,8 +258,7 @@ class Site: "cookiejar={2})", ) ) - name, password = self._login_info - login = "({}, {})".format(repr(name), "hidden" if password else None) + login = f"({self._login_user!r}, {'hidden' if self._login_password else None})" oauth = "hidden" if self._oauth else None cookies = self._cookiejar.__class__.__name__ if hasattr(self._cookiejar, "filename"): @@ -236,45 +268,42 @@ class Site: agent = self.user_agent return res.format(login, oauth, cookies, agent, **self.__dict__) - def __str__(self): - """Return a nice string representation of the Site.""" + def __str__(self) -> str: + """ + Return a nice string representation of the Site. + """ res = "" return res.format(self.name, self.project, self.lang, self.domain) - def _unicodeify(self, value, encoding="utf8"): - """Return input as unicode if it's not unicode to begin with.""" - if isinstance(value, str): - return value - return str(value, encoding) - def _api_query( self, - params, - tries=0, - wait=5, - ignore_maxlag=False, - no_assert=False, - ae_retry=True, - ): - """Do an API query with *params* as a dict of parameters. - - See the documentation for :py:meth:`api_query` for full implementation - details. *tries*, *wait*, and *ignore_maxlag* are for maxlag; - *no_assert* and *ae_retry* are for AssertEdit. 
- """ - since_last_query = time() - self._last_query_time # Throttling support + params: ApiParams, + tries: int = 0, + wait: int = 5, + ignore_maxlag: bool = False, + no_assert: bool = False, + ae_retry: bool = True, + ) -> ApiResult: + """ + Do an API query with *params* as a dict of parameters. + + See the documentation for :py:meth:`api_query` for full implementation details. + *tries*, *wait*, and *ignore_maxlag* are for maxlag; *no_assert* and *ae_retry* + are for AssertEdit. + """ + since_last_query = time.time() - self._last_query_time # Throttling support if since_last_query < self._wait_between_queries: wait_time = self._wait_between_queries - since_last_query log = f"Throttled: waiting {round(wait_time, 2)} seconds" self._logger.debug(log) - sleep(wait_time) - self._last_query_time = time() + time.sleep(wait_time) + self._last_query_time = time.time() url, params = self._build_api_query(params, ignore_maxlag, no_assert) if "lgpassword" in params: self._logger.debug(f"{url} -> ") else: - data = dumps(params) + data = json.dumps(params) if len(data) > 1000: self._logger.debug(f"{url} -> {data[:997]}...") else: @@ -288,20 +317,28 @@ class Site: return self._handle_api_result(response, params, tries, wait, ae_retry) - def _request_csrf_token(self, params): - """If possible, add a request for a CSRF token to an API query.""" + def _request_csrf_token(self, params: ApiParams) -> None: + """ + If possible, add a request for a CSRF token to an API query. 
+ """ if params.get("action") == "query": if params.get("meta"): + assert isinstance(params["meta"], str), params["meta"] if "tokens" not in params["meta"].split("|"): params["meta"] += "|tokens" else: params["meta"] = "tokens" if params.get("type"): + assert isinstance(params["type"], str), params["type"] if "csrf" not in params["type"].split("|"): params["type"] += "|csrf" - def _build_api_query(self, params, ignore_maxlag, no_assert): - """Given API query params, return the URL to query and POST data.""" + def _build_api_query( + self, params: ApiParams, ignore_maxlag: bool, no_assert: bool + ) -> tuple[str, ApiParams]: + """ + Given API query params, return the URL to query and POST data. + """ if not self._base_url or self._script_path is None: e = "Tried to do an API query, but no API URL is known." raise exceptions.APIError(e) @@ -319,8 +356,17 @@ class Site: self._request_csrf_token(params) return url, params - def _handle_api_result(self, response, params, tries, wait, ae_retry): - """Given an API query response, attempt to return useful data.""" + def _handle_api_result( + self, + response: requests.Response, + params: ApiParams, + tries: int, + wait: int, + ae_retry: bool, + ) -> ApiResult: + """ + Given an API query response, attempt to return useful data. 
+ """ try: res = response.json() except ValueError: @@ -357,16 +403,22 @@ class Site: tries += 1 msg = 'Server says "{0}"; retrying in {1} seconds ({2}/{3})' self._logger.info(msg.format(info, wait, tries, self._max_retries)) - sleep(wait) + time.sleep(wait) return self._api_query(params, tries, wait * 2, ae_retry=ae_retry) elif code in ["assertuserfailed", "assertbotfailed"]: # AssertEdit - if ae_retry and all(self._login_info) and not self._oauth: + if ( + ae_retry + and self._login_user + and self._login_password + and not self._oauth + ): # Try to log in if we got logged out: - self._login(self._login_info) + self._login() if "token" in params: # Fetch a new one; this is invalid now + assert isinstance(params["action"], str), params["action"] params["token"] = self.get_token(params["action"]) return self._api_query(params, tries, wait, ae_retry=False) - if not all(self._login_info) and not self._oauth: + if not self._oauth and not (self._login_user and self._login_password): e = "Assertion failed, and no login info was provided." elif code == "assertbotfailed": e = "Bot assertion failed: we don't have a bot flag!" @@ -379,15 +431,16 @@ class Site: error.code, error.info = code, info raise error - def _load_attributes(self, force=False): - """Load data about our Site from the API. + def _load_attributes(self, force: bool = False) -> None: + """ + Load data about our Site from the API. - This function is called by __init__() when one of the site attributes - was not given as a keyword argument. We'll do an API query to get the - missing data, but only if there actually *is* missing data. + This function is called by __init__() when one of the site attributes was not + given as a keyword argument. We'll do an API query to get the missing data, but + only if there actually *is* missing data. - Additionally, you can call this with *force* set to True to forcibly - reload all attributes. 
+ Additionally, you can call this with *force* set to True to forcibly reload + all attributes. """ # All attributes to be loaded, except _namespaces, which is a special # case because it requires additional params in the API query: @@ -400,16 +453,24 @@ class Site: self._script_path, ] - params = {"action": "query", "meta": "siteinfo", "siprop": "general"} + params: ApiParams = { + "action": "query", + "meta": "siteinfo", + "siprop": "general", + } if not self._namespaces or force: + assert isinstance(params["siprop"], str), params["siprop"] params["siprop"] += "|namespaces|namespacealiases" with self._api_lock: result = self._api_query(params, no_assert=True) self._load_namespaces(result) - elif all(attrs): # Everything is already specified and we're not told - return # to force a reload, so do nothing - else: # We're only loading attributes other than _namespaces + elif all(attrs): + # Everything is already specified and we're not told to force a reload, + # so do nothing + return + else: + # We're only loading attributes other than _namespaces with self._api_lock: result = self._api_query(params, no_assert=True) @@ -421,11 +482,12 @@ class Site: self._article_path = res["articlepath"] self._script_path = res["scriptpath"] - def _load_namespaces(self, result): - """Fill self._namespaces with a dict of namespace IDs and names. + def _load_namespaces(self, result: ApiResult) -> None: + """ + Fill self._namespaces with a dict of namespace IDs and names. - Called by _load_attributes() with API data as *result* when - self._namespaces was not given as an kwarg to __init__(). + Called by _load_attributes() with API data as *result* when self._namespaces + was not given as an kwarg to __init__(). 
""" self._namespaces = {} @@ -447,39 +509,42 @@ class Site: alias = namespace["*"] self._namespaces[ns_id].append(alias) - def _get_cookie(self, name, domain): - """Return the named cookie unless it is expired or doesn't exist.""" + def _get_cookie(self, name: str, domain: str) -> Cookie | None: + """ + Return the named cookie unless it is expired or doesn't exist. + """ for cookie in self._cookiejar: if cookie.name == name and cookie.domain == domain: if cookie.is_expired(): break return cookie - def _get_username_from_cookies(self): - """Try to return our username based solely on cookies. + def _get_username_from_cookies(self) -> str | None: + """ + Try to return our username based solely on cookies. - First, we'll look for a cookie named self._name + "Token", like - "enwikiToken". If it exists and isn't expired, we'll assume it's valid - and try to return the value of the cookie self._name + "UserName" (like - "enwikiUserName"). This should work fine on wikis without single-user - login. + First, we'll look for a cookie named self._name + "Token", like "enwikiToken". + If it exists and isn't expired, we'll assume it's valid and try to return the + value of the cookie self._name + "UserName" (like "enwikiUserName"). This + should work fine on wikis without single-user login. If `enwikiToken` doesn't exist, we'll try to find a cookie named - `centralauth_Token`. If this exists and is not expired, we'll try to - return the value of `centralauth_User`. + `centralauth_Token`. If this exists and is not expired, we'll try to return the + value of `centralauth_User`. - If we didn't get any matches, we'll return None. Our goal here isn't to - return the most likely username, or what we *want* our username to be - (for that, we'd do self._login_info[0]), but rather to get our current - username without an unnecessary ?action=query&meta=userinfo API query. + If we didn't get any matches, we'll return None. 
Our goal here isn't to return + the most likely username, or what we *want* our username to be (for that, we'd + do self._login_user), but rather to get our current username without an + unnecessary ?action=query&meta=userinfo API query. """ - name = "".join((self._name, "Token")) + name = f"{self.name}Token" cookie = self._get_cookie(name, self.domain) if cookie: - name = "".join((self._name, "UserName")) + name = f"{self.name}UserName" user_name = self._get_cookie(name, self.domain) if user_name: + assert user_name.value, user_name return unquote_plus(user_name.value) for cookie in self._cookiejar: @@ -491,85 +556,88 @@ class Site: if self.domain.endswith(base): user_name = self._get_cookie("centralauth_User", cookie.domain) if user_name: + assert user_name.value, user_name return unquote_plus(user_name.value) - def _get_username_from_api(self): - """Do a simple API query to get our username and return it. + def _get_username_from_api(self) -> str: + """ + Do a simple API query to get our username and return it. - This is a reliable way to make sure we are actually logged in, because - it doesn't deal with annoying cookie logic, but it results in an API - query that is unnecessary in some cases. + This is a reliable way to make sure we are actually logged in, because it + doesn't deal with annoying cookie logic, but it results in an API query that is + unnecessary in some cases. - Called by _get_username() (in turn called by get_user() with no - username argument) when cookie lookup fails, probably indicating that - we are logged out. + Called by _get_username() (in turn called by get_user() with no username + argument) when cookie lookup fails, probably indicating that we are logged out. """ result = self.api_query(action="query", meta="userinfo") return result["query"]["userinfo"]["name"] - def _get_username(self): - """Return the name of the current user, whether logged in or not. 
+ def _get_username(self) -> str: + """ + Return the name of the current user, whether logged in or not. - First, we'll try to deduce it solely from cookies, to avoid an - unnecessary API query. For the cookie-detection method, see - _get_username_from_cookies()'s docs. + First, we'll try to deduce it solely from cookies, to avoid an unnecessary API + query. For the cookie-detection method, see _get_username_from_cookies()'s docs. - If our username isn't in cookies, then we're either using OAuth or - we're probably not logged in, or something fishy is going on (like - forced logout). If we're using OAuth and a username was configured, - assume it is accurate and use it. Otherwise, do a single API query for - our username (or IP address) and return that. + If our username isn't in cookies, then we're either using OAuth or we're + probably not logged in, or something fishy is going on (like forced logout). + If we're using OAuth and a username was configured, assume it is accurate and + use it. Otherwise, do a single API query for our username (or IP address) and + return that. """ name = self._get_username_from_cookies() if name: return name - if self._oauth and self._login_info[0]: - return self._login_info[0] + if self._oauth and self._login_user: + return self._login_user return self._get_username_from_api() - def _should_save_cookiejar(self): - """Return a bool indicating whether we should save the cookiejar. + def _should_save_cookiejar(self) -> bool: + """ + Return a bool indicating whether we should save the cookiejar. - This is True if we haven't saved the cookiejar yet this session, or if - our last save was over a day ago. + This is True if we haven't saved the cookiejar yet this session, or if our last + save was over a day ago. 
""" max_staleness = 60 * 60 * 24 # 1 day if not self._last_cookiejar_save: return True - return time() - self._last_cookiejar_save > max_staleness + return time.time() - self._last_cookiejar_save > max_staleness - def _save_cookiejar(self): - """Try to save our cookiejar after doing a (normal) login or logout. + def _save_cookiejar(self) -> None: + """ + Try to save our cookiejar after doing a (normal) login or logout. - Calls the standard .save() method with no filename. Don't fret if our - cookiejar doesn't support saving (CookieJar raises AttributeError, - FileCookieJar raises NotImplementedError) or no default filename was - given (LWPCookieJar and MozillaCookieJar raise ValueError). + Calls the standard .save() method with no filename. Don't fret if our cookiejar + doesn't support saving (CookieJar raises AttributeError, FileCookieJar raises + NotImplementedError) or no default filename was given (LWPCookieJar and + MozillaCookieJar raise ValueError). """ if hasattr(self._cookiejar, "save"): try: getattr(self._cookiejar, "save")() except (NotImplementedError, ValueError): pass - self._last_cookiejar_save = time() + self._last_cookiejar_save = time.time() - def _login(self, login): - """Safely login through the API. - - Normally, this is called by __init__() if a username and password have - been provided and no valid login cookies were found. The only other - time it needs to be called is when those cookies expire, which is done - automatically by api_query() if a query fails. + def _login(self) -> None: + """ + Safely login through the API. - *login* is a (username, password) tuple. + Normally, this is called by __init__() if a username and password have been + provided and no valid login cookies were found. The only other time it needs to + be called is when those cookies expire, which is done automatically by + api_query() if a query fails. Raises LoginError on login errors (duh), like bad passwords and nonexistent usernames. 
""" + assert self._login_user + assert self._login_password self._tokens.clear() - name, password = login - params = {"action": "query", "meta": "tokens", "type": "login"} + params: ApiParams = {"action": "query", "meta": "tokens", "type": "login"} with self._api_lock: result = self._api_query(params, no_assert=True) try: @@ -579,8 +647,8 @@ class Site: params = { "action": "login", - "lgname": name, - "lgpassword": password, + "lgname": self._login_user, + "lgpassword": self._login_password, "lgtoken": token, } with self._api_lock: @@ -603,27 +671,29 @@ class Site: e = f"Couldn't login; server says '{res}'." raise exceptions.LoginError(e) - def _logout(self): - """Safely logout through the API. + def _logout(self) -> None: + """ + Safely logout through the API. - We'll do a simple API request (api.php?action=logout), clear our - cookiejar (which probably contains now-invalidated cookies) and try to - save it, if it supports that sort of thing. + We'll do a simple API request (api.php?action=logout), clear our cookiejar + (which probably contains now-invalidated cookies) and try to save it, if it + supports that sort of thing. """ self.api_query(action="logout") self._cookiejar.clear() self._save_cookiejar() - def _sql_connect(self, **kwargs): - """Attempt to establish a connection with this site's SQL database. + def _sql_connect(self, **kwargs: Any) -> pymysql.Connection[Cursor]: + """ + Attempt to establish a connection with this site's SQL database. - pymysql.connect() will be called with self._sql_data as its kwargs. - Any kwargs given to this function will be passed to connect() and will - have precedence over the config file. + pymysql.connect() will be called with self._sql_data as its kwargs. Any kwargs + given to this function will be passed to connect() and will have precedence + over the config file. - Will raise SQLError() if the module "pymysql" is not available. pymysql - may raise its own exceptions (e.g. 
pymysql.InterfaceError) if it cannot - establish a connection. + Will raise SQLError() if the module "pymysql" is not available. pymysql may + raise its own exceptions (e.g. pymysql.InterfaceError) if it cannot establish + a connection. """ args = self._sql_data for key, value in kwargs.items(): @@ -633,35 +703,35 @@ class Site: and "user" not in args and "passwd" not in args ): - args["read_default_file"] = expanduser("~/.my.cnf") + args["read_default_file"] = os.path.expanduser("~/.my.cnf") elif "read_default_file" in args: - args["read_default_file"] = expanduser(args["read_default_file"]) + args["read_default_file"] = os.path.expanduser(args["read_default_file"]) if "autoping" not in args: args["autoping"] = True if "autoreconnect" not in args: args["autoreconnect"] = True try: - self._sql_conn = pymysql.connect(**args) + return pymysql.connect(**args) except ImportError: e = "SQL querying requires the 'pymysql' package: https://pymysql.readthedocs.io/" raise exceptions.SQLError(e) - def _get_service_order(self): - """Return a preferred order for using services (e.g. the API and SQL). - - A list is returned, starting with the most preferred service first and - ending with the least preferred one. Currently, there are only two - services. SERVICE_API will always be included since the API is expected - to be always usable. In normal circumstances, self.SERVICE_SQL will be - first (with the API second), since using SQL directly is easier on the - servers than making web queries with the API. self.SERVICE_SQL will be - second if replag is greater than three minutes (a cached value updated - every two minutes at most), *unless* API lag is also very high. - self.SERVICE_SQL will not be included in the list if we cannot form a - proper SQL connection. - """ - now = time() + def _get_service_order(self) -> list[Service]: + """ + Return a preferred order for using services (e.g. the API and SQL). 
+ + A list is returned, starting with the most preferred service first and ending + with the least preferred one. Currently, there are only two services. + SERVICE_API will always be included since the API is expected to be always + usable. In normal circumstances, self.SERVICE_SQL will be first (with the API + second), since using SQL directly is easier on the servers than making web + queries with the API. self.SERVICE_SQL will be second if replag is greater than + three minutes (a cached value updated every two minutes at most), *unless* API + lag is also very high. self.SERVICE_SQL will not be included in the list if we + cannot form a proper SQL connection. + """ + now = time.time() if now - self._sql_info_cache["lastcheck"] > 120: self._sql_info_cache["lastcheck"] = now try: @@ -671,16 +741,16 @@ class Site: raise exceptions.SQLError(str(exc)) except (exceptions.SQLError, ImportError): self._sql_info_cache["usable"] = False - return [self.SERVICE_API] + return [Service.API] self._sql_info_cache["usable"] = True else: if not self._sql_info_cache["usable"]: - return [self.SERVICE_API] + return [Service.API] sqllag = self._sql_info_cache["replag"] if sqllag > 300: if not self._maxlag: - return [self.SERVICE_API, self.SERVICE_SQL] + return [Service.API, Service.SQL] if now - self._api_info_cache["lastcheck"] > 300: self._api_info_cache["lastcheck"] = now try: @@ -690,35 +760,59 @@ class Site: else: apilag = self._api_info_cache["maxlag"] if apilag > self._maxlag: - return [self.SERVICE_SQL, self.SERVICE_API] - return [self.SERVICE_API, self.SERVICE_SQL] + return [Service.SQL, Service.API] + return [Service.API, Service.SQL] - return [self.SERVICE_SQL, self.SERVICE_API] + return [Service.SQL, Service.API] @property - def name(self): - """The Site's name (or "wikiid" in the API), like ``"enwiki"``.""" + def name(self) -> str: + """ + The Site's name (or "wikiid" in the API), like ``"enwiki"``. 
+ """ + assert self._name is not None return self._name @property - def project(self): - """The Site's project name in lowercase, like ``"wikipedia"``.""" + def project(self) -> str: + """ + The Site's project name in lowercase, like ``"wikipedia"``. + """ + assert self._project is not None return self._project @property - def lang(self): - """The Site's language code, like ``"en"`` or ``"es"``.""" + def lang(self) -> str: + """ + The Site's language code, like ``"en"`` or ``"es"``. + """ + assert self._lang is not None return self._lang @property - def domain(self): - """The Site's web domain, like ``"en.wikipedia.org"``.""" - return urlparse(self._base_url).netloc + def base_url(self) -> str: + """ + The Site's base URL, like ``"https://en.wikipedia.org"``. + + May be protocol-relative (e.g. ``"//en.wikipedia.org"``). See :py:attr:`url` + for an alternative. + """ + assert self._base_url is not None + return self._base_url + + @property + def domain(self) -> str: + """ + The Site's web domain, like ``"en.wikipedia.org"``. + """ + return urlparse(self.base_url).netloc @property - def url(self): - """The Site's full base URL, like ``"https://en.wikipedia.org"``.""" - url = self._base_url + def url(self) -> str: + """ + The Site's full base URL, like ``"https://en.wikipedia.org"``. + """ + url = self.base_url if url.startswith("//"): # Protocol-relative URLs from 1.18 if self._use_https: url = "https:" + url @@ -727,37 +821,63 @@ class Site: return url @property - def user_agent(self): - """The User-Agent header sent to the API by the requests session.""" - return self._session.headers["User-Agent"] - - def api_query(self, **kwargs): - """Do an API query with `kwargs` as the parameters. - - This will first attempt to construct an API url from - :py:attr:`self._base_url` and :py:attr:`self._script_path`. We need - both of these, or else we'll raise - :py:exc:`~earwigbot.exceptions.APIError`. 
If - :py:attr:`self._base_url` is protocol-relative (introduced in MediaWiki - 1.18), we'll choose HTTPS only if :py:attr:`self._user_https` is - ``True``, otherwise HTTP. - - We'll encode the given params, adding ``format=json`` along the way, as - well as ``&assert=`` and ``&maxlag=`` based on - :py:attr:`self._assert_edit` and :py:attr:`_maxlag` respectively. - Additionally, we'll sleep a bit if the last query was made fewer than - :py:attr:`self._wait_between_queries` seconds ago. The request is made - through :py:attr:`self._session`, which has cookie support + def article_path(self) -> str: + """ + The base URL used to construct internal links, like ``"/wiki/$1"``. + """ + assert self._article_path is not None + return self._article_path + + @property + def script_path(self) -> str: + """ + The base URL used to refer to other parts of the wiki, like ``"/w"``. + """ + assert self._script_path is not None + return self._script_path + + @property + def user_agent(self) -> str: + """ + The User-Agent header sent to the API by the requests session. + """ + user_agent = self._session.headers["User-Agent"] + assert isinstance(user_agent, str), user_agent + return user_agent + + @property + def namespaces(self) -> dict[int, list[str]]: + """ + The mapping of namespace IDs to namespace names. + """ + assert self._namespaces + return self._namespaces + + def api_query(self, **kwargs: str | int) -> ApiResult: + """ + Do an API query with `kwargs` as the parameters. + + This will first attempt to construct an API url from :py:attr:`self._base_url` + and :py:attr:`self._script_path`. We need both of these, or else we'll raise + :py:exc:`~earwigbot.exceptions.APIError`. If :py:attr:`self._base_url` is + protocol-relative (introduced in MediaWiki 1.18), we'll choose HTTPS only if + :py:attr:`self._user_https` is ``True``, otherwise HTTP. 
+ + We'll encode the given params, adding ``format=json`` along the way, as well as + ``&assert=`` and ``&maxlag=`` based on :py:attr:`self._assert_edit` and + :py:attr:`_maxlag` respectively. Additionally, we'll sleep a bit if the last + query was made fewer than :py:attr:`self._wait_between_queries` seconds ago. + The request is made through :py:attr:`self._session`, which has cookie support (:py:attr:`self._cookiejar`) and a ``User-Agent`` (:py:const:`earwigbot.wiki.constants.USER_AGENT`). - Assuming everything went well, we'll gunzip the data (if compressed), - load it as a JSON object, and return it. + Assuming everything went well, we'll gunzip the data (if compressed), load it + as a JSON object, and return it. If our request failed for some reason, we'll raise - :py:exc:`~earwigbot.exceptions.APIError` with details. If that - reason was due to maxlag, we'll sleep for a bit and then repeat the - query until we exceed :py:attr:`self._max_retries`. + :py:exc:`~earwigbot.exceptions.APIError` with details. If that reason was due + to maxlag, we'll sleep for a bit and then repeat the query until we exceed + :py:attr:`self._max_retries`. There is helpful MediaWiki API documentation at `MediaWiki.org `_. @@ -765,31 +885,63 @@ class Site: with self._api_lock: return self._api_query(kwargs) + @typing.overload + def sql_query( + self, + query: str, + params: Sequence[Any] = (), + *, + dict_cursor: Literal[False] = False, + cursor_class: None = None, + buffsize: int = 1024, + ) -> Generator[tuple[Any, ...], None, None]: ... + + @typing.overload + def sql_query( + self, + query: str, + params: Sequence[Any] = (), + *, + dict_cursor: Literal[True], + cursor_class: None = None, + buffsize: int = 1024, + ) -> Generator[dict[str, Any], None, None]: ... 
+ + @typing.overload + def sql_query( + self, + query: str, + params: Sequence[Any] = (), + *, + dict_cursor: bool = False, + cursor_class: type[pymysql.cursors.DictCursor], + buffsize: int = 1024, + ) -> Generator[dict[str, Any], None, None]: ... + def sql_query( self, - query, - params=(), - plain_query=False, - dict_cursor=False, - cursor_class=None, - buffsize=1024, - ): - """Do an SQL query and yield its results. + query: str, + params: Sequence[Any] = (), + *, + dict_cursor: bool = False, + cursor_class: type[Cursor] | None = None, + buffsize: int = 1024, + ) -> Generator[tuple[Any, ...] | dict[str, Any], None, None]: + """ + Do an SQL query and yield its results. If *plain_query* is ``True``, we will force an unparameterized query. Specifying both *params* and *plain_query* will cause an error. If - *dict_cursor* is ``True``, we will use - :py:class:`pymysql.cursors.DictCursor` as our cursor, otherwise the - default :py:class:`pymysql.cursors.Cursor`. If *cursor_class* is given, - it will override this option. - - *buffsize* is the size of each memory-buffered group of results, to - reduce the number of conversations with the database; it is passed to - :py:meth:`cursor.fetchmany() `. If - set to ``0```, all results will be buffered in memory at once (this - uses :py:meth:`fetchall() `). If set - to ``1``, it is equivalent to using - :py:meth:`fetchone() `. + *dict_cursor* is ``True``, we will use :py:class:`pymysql.cursors.DictCursor` + as our cursor, otherwise the default :py:class:`pymysql.cursors.Cursor`. If + *cursor_class* is given, it will override this option. + + *buffsize* is the size of each memory-buffered group of results, to reduce the + number of conversations with the database; it is passed to + :py:meth:`cursor.fetchmany() `. If set to + ``0```, all results will be buffered in memory at once (this uses + :py:meth:`fetchall() `). If set to ``1``, it + is equivalent to using :py:meth:`fetchone() `. 
Example usage:: @@ -802,14 +954,13 @@ class Site: >>> for row in result2: print row {'user_id': 7418060L, 'user_registration': '20080703215134'} - This may raise :py:exc:`~earwigbot.exceptions.SQLError` or one of - pymysql's exceptions (:py:exc:`pymysql.ProgrammingError`, - :py:exc:`pymysql.InterfaceError`, ...) if there were problems with the - query. + This may raise :py:exc:`~earwigbot.exceptions.SQLError` or one of pymysql's + exceptions (:py:exc:`pymysql.ProgrammingError`, + :py:exc:`pymysql.InterfaceError`, ...) if there were problems with the query. - See :py:meth:`_sql_connect` for information on how a connection is - acquired. Also relevant is `pymysql's documentation - `_ for details on that package. + See :py:meth:`_sql_connect` for information on how a connection is acquired. + Also relevant is `pymysql's documentation `_ + for details on that package. """ if not cursor_class: if dict_cursor: @@ -820,33 +971,40 @@ class Site: with self._sql_lock: if not self._sql_conn: - self._sql_connect() + self._sql_conn = self._sql_connect() + with self._sql_conn.cursor(klass) as cur: - cur.execute(query, params, plain_query) + cur.execute(query, params) if buffsize: - while True: - group = cur.fetchmany(buffsize) - if not group: - return - for result in group: - yield result - for result in cur.fetchall(): - yield result - - def get_maxlag(self, showall=False): - """Return the internal database replication lag in seconds. - - In a typical setup, this function returns the replication lag *within* - the WMF's cluster, *not* external replication lag affecting the - Toolserver (see :py:meth:`get_replag` for that). This is useful when - combined with the ``maxlag`` API query param (added by config), in - which queries will be halted and retried if the lag is too high, - usually above five seconds. - - With *showall*, will return a list of the lag for all servers in the - cluster, not just the one with the highest lag. 
- """ - params = {"action": "query", "meta": "siteinfo", "siprop": "dbrepllag"} + while group := cur.fetchmany(buffsize): + yield from group + else: + yield from cur.fetchall() + + @typing.overload + def get_maxlag(self, showall: Literal[False] = False) -> int: ... + + @typing.overload + def get_maxlag(self, showall: Literal[True]) -> list[int]: ... + + def get_maxlag(self, showall: bool = False) -> int | list[int]: + """ + Return the internal database replication lag in seconds. + + In a typical setup, this function returns the replication lag *within* the + WMF's cluster, *not* external replication lag affecting Toolforge (see + :py:meth:`get_replag` for that). This is useful when combined with the + ``maxlag`` API query param (added by config), in which queries will be halted + and retried if the lag is too high, usually above five seconds. + + With *showall*, will return a list of the lag for all servers in the cluster, + not just the one with the highest lag. + """ + params: ApiParams = { + "action": "query", + "meta": "siteinfo", + "siprop": "dbrepllag", + } if showall: params["sishowalldb"] = 1 with self._api_lock: @@ -855,17 +1013,18 @@ class Site: return [server["lag"] for server in result["query"]["dbrepllag"]] return result["query"]["dbrepllag"][0]["lag"] - def get_replag(self): - """Return the estimated external database replication lag in seconds. + def get_replag(self) -> int: + """ + Return the estimated external database replication lag in seconds. - Requires SQL access. This function only makes sense on a replicated - database (e.g. the Wikimedia Toolserver) and on a wiki that receives a - large number of edits (ideally, at least one per second), or the result - may be larger than expected, since it works by subtracting the current - time from the timestamp of the latest recent changes event. + Requires SQL access. This function only makes sense on a replicated database + (e.g. 
Wikimedia Toolforge) and on a wiki that receives a large number of edits + (ideally, at least one per second), or the result may be larger than expected, + since it works by subtracting the current time from the timestamp of the latest + recent changes event. - This may raise :py:exc:`~earwigbot.exceptions.SQLError` or one of - pymysql's exceptions (:py:exc:`pymysql.ProgrammingError`, + This may raise :py:exc:`~earwigbot.exceptions.SQLError` or one of pymysql's + exceptions (:py:exc:`pymysql.ProgrammingError`, :py:exc:`pymysql.InterfaceError`, ...) if there were problems. """ query = """SELECT UNIX_TIMESTAMP() - UNIX_TIMESTAMP(rc_timestamp) FROM @@ -873,13 +1032,13 @@ class Site: result = list(self.sql_query(query)) return int(result[0][0]) - def get_token(self, action=None, force=False): - """Return a token for a data-modifying API action. + def get_token(self, action: str | None = None, force: bool = False) -> str: + """ + Return a token for a data-modifying API action. - In general, this will be a CSRF token, unless *action* is in a special - list of non-CSRF tokens. Tokens are cached for the session (until - :meth:`_login` is called again); set *force* to ``True`` to force a new - token to be fetched. + In general, this will be a CSRF token, unless *action* is in a special list of + non-CSRF tokens. Tokens are cached for the session (until :meth:`_login` is + called again); set *force* to ``True`` to force a new token to be fetched. Raises :exc:`.APIError` if there was an API issue. """ @@ -894,39 +1053,46 @@ class Site: raise exceptions.APIError(err.format(action, res)) return self._tokens[action] - def namespace_id_to_name(self, ns_id: int, all: bool = False) -> str: - """Given a namespace ID, returns associated namespace names. + @typing.overload + def namespace_id_to_name(self, ns_id: int, all: Literal[False] = False) -> str: ... - If *all* is ``False`` (default), we'll return the first name in the - list, which is usually the localized version. 
Otherwise, we'll return - the entire list, which includes the canonical name. For example, this - returns ``u"Wikipedia"`` if *ns_id* = ``4`` and *all* is ``False`` on - ``enwiki``; returns ``[u"Wikipedia", u"Project", u"WP"]`` if *ns_id* = - ``4`` and *all* is ``True``. + @typing.overload + def namespace_id_to_name(self, ns_id: int, all: Literal[True]) -> list[str]: ... - Raises :py:exc:`~earwigbot.exceptions.NamespaceNotFoundError` if the ID - is not found. + def namespace_id_to_name(self, ns_id: int, all: bool = False) -> str | list[str]: + """ + Given a namespace ID, returns associated namespace names. + + If *all* is ``False`` (default), we'll return the first name in the list, which + is usually the localized version. Otherwise, we'll return the entire list, + which includes the canonical name. For example, this returns ``"Wikipedia"`` + if *ns_id* = ``4`` and *all* is ``False`` on ``enwiki``; returns + ``["Wikipedia", "Project", "WP"]`` if *ns_id* = ``4`` and *all* is ``True``. + + Raises :py:exc:`~earwigbot.exceptions.NamespaceNotFoundError` if the ID is + not found. """ try: if all: - return self._namespaces[ns_id] + return self.namespaces[ns_id] else: - return self._namespaces[ns_id][0] + return self.namespaces[ns_id][0] except KeyError: e = f"There is no namespace with id {ns_id}." raise exceptions.NamespaceNotFoundError(e) - def namespace_name_to_id(self, name): - """Given a namespace name, returns the associated ID. + def namespace_name_to_id(self, name: str) -> int: + """ + Given a namespace name, returns the associated ID. - Like :py:meth:`namespace_id_to_name`, but reversed. Case is ignored, - because namespaces are assumed to be case-insensitive. + Like :py:meth:`namespace_id_to_name`, but reversed. Case is ignored, because + namespaces are assumed to be case-insensitive. - Raises :py:exc:`~earwigbot.exceptions.NamespaceNotFoundError` if the - name is not found. 
+ Raises :py:exc:`~earwigbot.exceptions.NamespaceNotFoundError` if the name is + not found. """ lname = name.lower() - for ns_id, names in self._namespaces.items(): + for ns_id, names in self.namespaces.items(): lnames = [n.lower() for n in names] # Be case-insensitive if lname in lnames: return ns_id @@ -934,21 +1100,23 @@ class Site: e = f"There is no namespace with name '{name}'." raise exceptions.NamespaceNotFoundError(e) - def get_page(self, title, follow_redirects=False, pageid=None): - """Return a :py:class:`Page` object for the given title. + def get_page( + self, title: str, follow_redirects: bool = False, pageid: int | None = None + ) -> Page: + """ + Return a :py:class:`Page` object for the given title. - *follow_redirects* is passed directly to - :py:class:`~earwigbot.wiki.page.Page`'s constructor. Also, this will - return a :py:class:`~earwigbot.wiki.category.Category` object instead - if the given title is in the category namespace. As - :py:class:`~earwigbot.wiki.category.Category` is a subclass of - :py:class:`~earwigbot.wiki.page.Page`, this should not cause problems. + *follow_redirects* is passed directly to :py:class:`~earwigbot.wiki.page.Page`'s + constructor. Also, this will return a + :py:class:`~earwigbot.wiki.category.Category` object instead if the given title + is in the category namespace. As :py:class:`~earwigbot.wiki.category.Category` + is a subclass of :py:class:`~earwigbot.wiki.page.Page`, this should not + cause problems. Note that this doesn't do any direct checks for existence or redirect-following: :py:class:`~earwigbot.wiki.page.Page`'s methods provide that. 
""" - title = self._unicodeify(title) prefixes = self.namespace_id_to_name(constants.NS_CATEGORY, all=True) prefix = title.split(":", 1)[0] if prefix != title: # Avoid a page that is simply "Category" @@ -956,51 +1124,47 @@ class Site: return Category(self, title, follow_redirects, pageid, self._logger) return Page(self, title, follow_redirects, pageid, self._logger) - def get_category(self, catname, follow_redirects=False, pageid=None): - """Return a :py:class:`Category` object for the given category name. + def get_category( + self, catname: str, follow_redirects: bool = False, pageid: int | None = None + ) -> Category: + """ + Return a :py:class:`Category` object for the given category name. - *catname* should be given *without* a namespace prefix. This method is - really just shorthand for :py:meth:`get_page("Category:" + catname) - `. + *catname* should be given *without* a namespace prefix. This method is really + just shorthand for :py:meth:`get_page("Category:" + catname) `. """ - catname = self._unicodeify(catname) prefix = self.namespace_id_to_name(constants.NS_CATEGORY) pagename = ":".join((prefix, catname)) return Category(self, pagename, follow_redirects, pageid, self._logger) - def get_user(self, username=None): - """Return a :py:class:`User` object for the given username. + def get_user(self, username: str | None = None) -> User: + """ + Return a :py:class:`User` object for the given username. - If *username* is left as ``None``, then a - :py:class:`~earwigbot.wiki.user.User` object representing the currently - logged-in (or anonymous!) user is returned. + If *username* is left as ``None``, then a :py:class:`~earwigbot.wiki.user.User` + object representing the currently logged-in (or anonymous!) user is returned. 
""" - if username: - username = self._unicodeify(username) - else: + if not username: username = self._get_username() return User(self, username, self._logger) - def delegate(self, services, args=None, kwargs=None): - """Delegate a task to either the API or SQL depending on conditions. + def delegate( + self, services: dict[Service, Callable[P, T]], *args: P.args, **kwargs: P.kwargs + ) -> T: + """ + Delegate a task to either the API or SQL depending on conditions. *services* should be a dictionary in which the key is the service name (:py:attr:`self.SERVICE_API ` or - :py:attr:`self.SERVICE_SQL `), and the value is the - function to call for this service. All functions will be passed the - same arguments the tuple *args* and the dict *kwargs*, which are both - empty by default. The service order is determined by - :py:meth:`_get_service_order`. + :py:attr:`self.SERVICE_SQL `), and the value is the function to + call for this service. All functions will be passed the same arguments the + tuple *args* and the dict *kwargs*, which are both empty by default. The + service order is determined by :py:meth:`_get_service_order`. Not every service needs an entry in the dictionary. Will raise - :py:exc:`~earwigbot.exceptions.NoServiceError` if an appropriate - service cannot be found. + :py:exc:`~earwigbot.exceptions.NoServiceError` if an appropriate service cannot + be found. """ - if not args: - args = () - if not kwargs: - kwargs = {} - order = self._get_service_order() for srv in order: if srv in services: diff --git a/earwigbot/wiki/sitesdb.py b/earwigbot/wiki/sitesdb.py index 5e66df0..91e8072 100644 --- a/earwigbot/wiki/sitesdb.py +++ b/earwigbot/wiki/sitesdb.py @@ -18,78 +18,102 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
+from __future__ import annotations + import errno import sqlite3 as sqlite import stat +import typing from collections import OrderedDict -from http.cookiejar import LoadError, LWPCookieJar +from dataclasses import dataclass +from http.cookiejar import CookieJar, LoadError, LWPCookieJar from os import chmod, path from platform import python_version from earwigbot import __version__ from earwigbot.exceptions import SiteNotFoundError from earwigbot.wiki.copyvios.exclusions import ExclusionsDB -from earwigbot.wiki.site import Site +from earwigbot.wiki.site import Site, SqlConnInfo + +if typing.TYPE_CHECKING: + from earwigbot.bot import Bot __all__ = ["SitesDB"] +@dataclass(frozen=True) +class _SiteInfoFromDB: + name: str + project: str + lang: str + base_url: str + article_path: str + script_path: str + sql: SqlConnInfo + namespaces: dict[int, list[str]] + + class SitesDB: """ **EarwigBot: Wiki Toolset: Sites Database Manager** - This class controls the :file:`sites.db` file, which stores information - about all wiki sites known to the bot. Three public methods act as bridges - between the bot's config files and :py:class:`~earwigbot.wiki.site.Site` - objects: + This class controls the :file:`sites.db` file, which stores information about all + wiki sites known to the bot. Three public methods act as bridges between the bot's + config files and :py:class:`~earwigbot.wiki.site.Site` objects: - :py:meth:`get_site`: returns a Site object corresponding to a site - :py:meth:`add_site`: stores a site in the database - :py:meth:`remove_site`: removes a site from the database - There's usually no need to use this class directly. All public methods - here are available as :py:meth:`bot.wiki.get_site`, - :py:meth:`bot.wiki.add_site`, and :py:meth:`bot.wiki.remove_site`, which - use a :file:`sites.db` file located in the same directory as our - :file:`config.yml` file. Lower-level access can be achieved by importing - the manager class (``from earwigbot.wiki import SitesDB``). 
+ There's usually no need to use this class directly. All public methods here are + available as :py:meth:`bot.wiki.get_site`, :py:meth:`bot.wiki.add_site`, and + :py:meth:`bot.wiki.remove_site`, which use a :file:`sites.db` file located in the + same directory as our :file:`config.yml` file. Lower-level access can be achieved + by importing the manager class (``from earwigbot.wiki import SitesDB``). """ - def __init__(self, bot): - """Set up the manager with an attribute for the base Bot object.""" + def __init__(self, bot: Bot) -> None: + """ + Set up the manager with an attribute for the base Bot object. + """ self.config = bot.config self._logger = bot.logger.getChild("wiki") - self._sites = {} # Internal site cache + self._sites: dict[str, Site] = {} # Internal site cache self._sitesdb = path.join(bot.config.root_dir, "sites.db") self._cookie_file = path.join(bot.config.root_dir, ".cookies") - self._cookiejar = None + self._cookiejar: CookieJar | None = None excl_db = path.join(bot.config.root_dir, "exclusions.db") excl_logger = self._logger.getChild("exclusionsdb") self._exclusions_db = ExclusionsDB(self, excl_db, excl_logger) - def __repr__(self): - """Return the canonical string representation of the SitesDB.""" + def __repr__(self) -> str: + """ + Return the canonical string representation of the SitesDB. + """ res = "SitesDB(config={0!r}, sitesdb={1!r}, cookie_file={2!r})" return res.format(self.config, self._sitesdb, self._cookie_file) - def __str__(self): - """Return a nice string representation of the SitesDB.""" + def __str__(self) -> str: + """ + Return a nice string representation of the SitesDB. + """ return f"" - def _get_cookiejar(self): - """Return a LWPCookieJar object loaded from our .cookies file. + def _get_cookiejar(self) -> CookieJar: + """ + Return a LWPCookieJar object loaded from our .cookies file. - The same .cookies file is returned every time, located in the project - root, same directory as config.yml and bot.py. 
If it doesn't exist, we - will create the file and set it to be readable and writeable only by - us. If it exists but the information inside is bogus, we'll ignore it. + The same .cookies file is returned every time, located in the project root, + same directory as config.yml and bot.py. If it doesn't exist, we will create + the file and set it to be readable and writeable only by us. If it exists but + the information inside is bogus, we'll ignore it. - This is normally called by _make_site_object() (in turn called by - get_site()), and the cookiejar is passed to our Site's constructor, - used when it makes API queries. This way, we can easily preserve - cookies between sites (e.g., for CentralAuth), making logins easier. + This is normally called by _make_site_object() (in turn called by get_site()), + and the cookiejar is passed to our Site's constructor, used when it makes API + queries. This way, we can easily preserve cookies between sites (e.g., for + CentralAuth), making logins easier. """ if self._cookiejar: return self._cookiejar @@ -111,8 +135,10 @@ class SitesDB: return self._cookiejar - def _create_sitesdb(self): - """Initialize the sitesdb file with its three necessary tables.""" + def _create_sitesdb(self) -> None: + """ + Initialize the sitesdb file with its three necessary tables. + """ script = """ CREATE TABLE sites (site_name, site_project, site_lang, site_base_url, site_article_path, site_script_path); @@ -122,11 +148,12 @@ class SitesDB: with sqlite.connect(self._sitesdb) as conn: conn.executescript(script) - def _get_site_object(self, name): - """Return the site from our cache, or create it if it doesn't exist. + def _get_site_object(self, name: str) -> Site: + """ + Return the site from our cache, or create it if it doesn't exist. - This is essentially just a wrapper around _make_site_object that - returns the same object each time a specific site is asked for. 
+ This is essentially just a wrapper around _make_site_object that returns the + same object each time a specific site is asked for. """ try: return self._sites[name] @@ -135,14 +162,12 @@ class SitesDB: self._sites[name] = site return site - def _load_site_from_sitesdb(self, name): - """Return all information stored in the sitesdb relating to given site. + def _load_site_from_sitesdb(self, name: str) -> _SiteInfoFromDB: + """ + Return all information stored in the sitesdb relating to given site. - The information will be returned as a tuple, containing the site's - name, project, language, base URL, article path, script path, SQL - connection data, and namespaces, in that order. If the site is not - found in the database, SiteNotFoundError will be raised. An empty - database will be created before the exception is raised if none exists. + If the site is not found in the database, SiteNotFoundError will be raised. An + empty database will be created before the exception is raised if none exists. """ query1 = "SELECT * FROM sites WHERE site_name = ?" query2 = "SELECT sql_data_key, sql_data_value FROM sql_data WHERE sql_site = ?" @@ -161,7 +186,7 @@ class SitesDB: name, project, lang, base_url, article_path, script_path = site_data sql = dict(sql_data) - namespaces = {} + namespaces: dict[int, list[str]] = {} for ns_id, ns_name, ns_is_primary_name in ns_data: try: if ns_is_primary_name: # "Primary" name goes first in list @@ -171,7 +196,7 @@ class SitesDB: except KeyError: namespaces[ns_id] = [ns_name] - return ( + return _SiteInfoFromDB( name, project, lang, @@ -182,16 +207,16 @@ class SitesDB: namespaces, ) - def _make_site_object(self, name): - """Return a Site object associated with the site *name* in our sitesdb. + def _make_site_object(self, name: str) -> Site: + """ + Return a Site object associated with the site *name* in our sitesdb. - This calls _load_site_from_sitesdb(), so SiteNotFoundError will be - raised if the site is not in our sitesdb. 
+ This calls _load_site_from_sitesdb(), so SiteNotFoundError will be raised if + the site is not in our sitesdb. """ cookiejar = self._get_cookiejar() - (name, project, lang, base_url, article_path, script_path, sql, namespaces) = ( - self._load_site_from_sitesdb(name) - ) + info = self._load_site_from_sitesdb(name) + name = info.name config = self.config login = (config.wiki.get("username"), config.wiki.get("password")) @@ -213,6 +238,7 @@ class SitesDB: search_config["nltk_dir"] = nltk_dir search_config["exclusions_db"] = self._exclusions_db + sql = info.sql if not sql: sql = config.wiki.get("sql", OrderedDict()).copy() for key, value in sql.items(): @@ -221,13 +247,13 @@ class SitesDB: return Site( name=name, - project=project, - lang=lang, - base_url=base_url, - article_path=article_path, - script_path=script_path, + project=info.project, + lang=info.lang, + base_url=info.base_url, + article_path=info.article_path, + script_path=info.script_path, sql=sql, - namespaces=namespaces, + namespaces=info.namespaces, login=login, oauth=oauth, cookiejar=cookiejar, @@ -240,18 +266,18 @@ class SitesDB: search_config=search_config, ) - def _get_site_name_from_sitesdb(self, project, lang): - """Return the name of the first site with the given project and lang. + def _get_site_name_from_sitesdb(self, project: str, lang: str) -> str | None: + """ + Return the name of the first site with the given project and lang. - If we can't find the site with the given information, we'll also try - searching for a site whose base_url contains "{lang}.{project}". There - are a few sites, like the French Wikipedia, that set their project to - something other than the expected "wikipedia" ("wikipédia" in this - case), but we should correctly find them when doing get_site(lang="fr", - project="wikipedia"). + If we can't find the site with the given information, we'll also try searching + for a site whose base_url contains "{lang}.{project}". 
There are a few sites, + like the French Wikipedia, that set their project to something other than the + expected "wikipedia" ("wikipédia" in this case), but we should correctly find + them when doing get_site(lang="fr", project="wikipedia"). - If the site is not found, return None. An empty sitesdb will be created - if none exists. + If the site is not found, return None. An empty sitesdb will be created if + none exists. """ query1 = "SELECT site_name FROM sites WHERE site_project = ? and site_lang = ?" query2 = "SELECT site_name FROM sites WHERE site_base_url LIKE ?" @@ -267,26 +293,27 @@ class SitesDB: except sqlite.OperationalError: self._create_sitesdb() - def _add_site_to_sitesdb(self, site): - """Extract relevant info from a Site object and add it to the sitesdb. + def _add_site_to_sitesdb(self, site: Site) -> None: + """ + Extract relevant info from a Site object and add it to the sitesdb. - Works like a reverse _load_site_from_sitesdb(); the site's project, - language, base URL, article path, script path, SQL connection data, and - namespaces are extracted from the site and inserted into the sites - database. If the sitesdb doesn't exist, we'll create it first. + Works like a reverse _load_site_from_sitesdb(); the site's project, language, + base URL, article path, script path, SQL connection data, and namespaces are + extracted from the site and inserted into the sites database. If the sitesdb + doesn't exist, we'll create it first. 
""" name = site.name sites_data = ( name, site.project, site.lang, - site._base_url, - site._article_path, - site._script_path, + site.base_url, + site.article_path, + site.script_path, ) sql_data = [(name, key, val) for key, val in site._sql_data.items()] - ns_data = [] - for ns_id, ns_names in site._namespaces.items(): + ns_data: list[tuple[str, int, str, bool]] = [] + for ns_id, ns_names in site.namespaces.items(): ns_data.append((name, ns_id, ns_names.pop(0), True)) for ns_name in ns_names: ns_data.append((name, ns_id, ns_name, False)) @@ -306,8 +333,10 @@ class SitesDB: conn.executemany("INSERT INTO sql_data VALUES (?, ?, ?)", sql_data) conn.executemany("INSERT INTO namespaces VALUES (?, ?, ?, ?)", ns_data) - def _remove_site_from_sitesdb(self, name): - """Remove a site by name from the sitesdb and the internal cache.""" + def _remove_site_from_sitesdb(self, name: str) -> bool: + """ + Remove a site by name from the sitesdb and the internal cache. + """ try: del self._sites[name] except KeyError: @@ -323,30 +352,34 @@ class SitesDB: self._logger.info(f"Removed site '{name}'") return True - def get_site(self, name=None, project=None, lang=None): - """Return a Site instance based on information from the sitesdb. + def get_site( + self, + name: str | None = None, + project: str | None = None, + lang: str | None = None, + ) -> Site: + """ + Return a Site instance based on information from the sitesdb. - With no arguments, return the default site as specified by our config - file. This is ``config.wiki["defaultSite"]``. + With no arguments, return the default site as specified by our config file. + This is ``config.wiki["defaultSite"]``. - With *name* specified, return the site with that name. This is - equivalent to the site's ``wikiid`` in the API, like *enwiki*. + With *name* specified, return the site with that name. This is equivalent to + the site's ``wikiid`` in the API, like *enwiki*. 
- With *project* and *lang* specified, return the site whose project and - language match these values. If there are multiple sites with the same - values (unlikely), this is not a reliable way of loading a site. Call - the function with an explicit *name* in that case. + With *project* and *lang* specified, return the site whose project and language + match these values. If there are multiple sites with the same values + (unlikely), this is not a reliable way of loading a site. Call the function + with an explicit *name* in that case. We will attempt to login to the site automatically using - ``config.wiki["username"]`` and ``config.wiki["password"]`` if both are - defined. - - Specifying a project without a lang or a lang without a project will - raise :py:exc:`TypeError`. If all three args are specified, *name* will - be first tried, then *project* and *lang* if *name* doesn't work. If a - site cannot be found in the sitesdb, - :py:exc:`~earwigbot.exceptions.SiteNotFoundError` will be raised. An - empty sitesdb will be created if none is found. + ``config.wiki["username"]`` and ``config.wiki["password"]`` if both are defined. + + Specifying a project without a lang or a lang without a project will raise + :py:exc:`TypeError`. If all three args are specified, *name* will be first + tried, then *project* and *lang* if *name* doesn't work. If a site cannot be + found in the sitesdb, :py:exc:`~earwigbot.exceptions.SiteNotFoundError` will be + raised. An empty sitesdb will be created if none is found. 
""" # Someone specified a project without a lang, or vice versa: if (project and not lang) or (not project and lang): @@ -374,6 +407,7 @@ class SitesDB: raise # If we end up here, then project and lang are the only args given: + assert project is not None and lang is not None, (project, lang) name = self._get_site_name_from_sitesdb(project, lang) if name: return self._get_site_object(name) @@ -381,30 +415,34 @@ class SitesDB: raise SiteNotFoundError(e) def add_site( - self, project=None, lang=None, base_url=None, script_path="/w", sql=None - ): - """Add a site to the sitesdb so it can be retrieved with get_site(). + self, + project: str | None = None, + lang: str | None = None, + base_url: str | None = None, + script_path: str = "/w", + sql: SqlConnInfo | None = None, + ) -> Site: + """ + Add a site to the sitesdb so it can be retrieved with get_site(). If only a project and a lang are given, we'll guess the *base_url* as - ``"//{lang}.{project}.org"`` (which is protocol-relative, becoming - ``"https"`` if *useHTTPS* is ``True`` in config otherwise ``"http"``). - If this is wrong, provide the correct *base_url* as an argument (in - which case project and lang are ignored). Most wikis use ``"/w"`` as - the script path (meaning the API is located at - ``"{base_url}{script_path}/api.php"`` -> - ``"//{lang}.{project}.org/w/api.php"``), so this is the default. If - your wiki is different, provide the script_path as an argument. SQL - connection settings are guessed automatically using config's template - value. If this is wrong or not specified, provide a dict of kwargs as - *sql* and Site will pass it to :py:func:`pymysql.connect(**sql) - `, allowing you to make queries with - :py:meth:`site.sql_query `. - - Returns ``True`` if the site was added successfully or ``False`` if the - site is already in our sitesdb (this can be done purposefully to update - old site info). 
Raises :py:exc:`~earwigbot.exception.SiteNotFoundError`
-        if not enough information has been provided to identify the site (e.g.
-        a *project* but not a *lang*).
+        ``"//{lang}.{project}.org"`` (which is protocol-relative, becoming ``"https"``
+        if *useHTTPS* is ``True`` in config otherwise ``"http"``). If this is wrong,
+        provide the correct *base_url* as an argument (in which case project and lang
+        are ignored). Most wikis use ``"/w"`` as the script path (meaning the API is
+        located at ``"{base_url}{script_path}/api.php"`` ->
+        ``"//{lang}.{project}.org/w/api.php"``), so this is the default. If your wiki
+        is different, provide the script_path as an argument. SQL connection settings
+        are guessed automatically using config's template value. If this is wrong or
+        not specified, provide a dict of kwargs as *sql* and Site will pass it to
+        :py:func:`pymysql.connect(**sql) `, allowing you to make
+        queries with :py:meth:`site.sql_query `.
+
+        Returns the :py:class:`~earwigbot.wiki.site.Site` object for the site that
+        was added (re-adding a known site can be done purposefully to update old
+        site info). Raises :py:exc:`~earwigbot.exceptions.SiteNotFoundError` if not
+        enough information has been provided to identify the site (e.g. a *project*
+        but not a *lang*).
         """
         if not base_url:
             if not project or not lang:
@@ -445,7 +483,12 @@ class SitesDB:
         self._add_site_to_sitesdb(site)
         return self._get_site_object(site.name)

-    def remove_site(self, name=None, project=None, lang=None):
+    def remove_site(
+        self,
+        name: str | None = None,
+        project: str | None = None,
+        lang: str | None = None,
+    ) -> bool:
         """Remove a site from the sitesdb.
Returns ``True`` if the site was removed successfully or ``False`` if diff --git a/earwigbot/wiki/user.py b/earwigbot/wiki/user.py index 9aab1a9..89e2853 100644 --- a/earwigbot/wiki/user.py +++ b/earwigbot/wiki/user.py @@ -1,4 +1,4 @@ -# Copyright (C) 2009-2015 Ben Kurtovic +# Copyright (C) 2009-2024 Ben Kurtovic # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -18,14 +18,21 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -from logging import NullHandler, getLogger -from socket import AF_INET, AF_INET6, inet_pton -from time import gmtime, strptime +from __future__ import annotations + +import socket +import time +import typing +from logging import Logger, NullHandler, getLogger +from typing import Any, Literal from earwigbot.exceptions import UserNotFoundError from earwigbot.wiki import constants from earwigbot.wiki.page import Page +if typing.TYPE_CHECKING: + from earwigbot.wiki.site import Site + __all__ = ["User"] @@ -33,10 +40,9 @@ class User: """ **EarwigBot: Wiki Toolset: User** - Represents a user on a given :py:class:`~earwigbot.wiki.site.Site`. Has - methods for getting a bunch of information about the user, such as - editcount and user rights, methods for returning the user's userpage and - talkpage, etc. + Represents a user on a given :py:class:`~earwigbot.wiki.site.Site`. Has methods for + getting a bunch of information about the user, such as editcount and user rights, + methods for returning the user's userpage and talkpage, etc. 
*Attributes:* @@ -56,24 +62,23 @@ class User: *Public methods:* - :py:meth:`reload`: forcibly reloads the user's attributes - - :py:meth:`get_userpage`: returns a Page object representing the user's - userpage - - :py:meth:`get_talkpage`: returns a Page object representing the user's - talkpage + - :py:meth:`get_userpage`: returns a Page object representing the user's userpage + - :py:meth:`get_talkpage`: returns a Page object representing the user's talkpage """ - def __init__(self, site, name, logger=None): - """Constructor for new User instances. + def __init__(self, site: Site, name: str, logger: Logger | None = None) -> None: + """ + Constructor for new User instances. - Takes two arguments, a Site object (necessary for doing API queries), - and the name of the user, preferably without "User:" in front, although - this prefix will be automatically removed by the API if given. + Takes two arguments, a Site object (necessary for doing API queries), and the + name of the user, preferably without "User:" in front, although this prefix + will be automatically removed by the API if given. - You can also use site.get_user() instead, which returns a User object, - and is preferred. + You can also use site.get_user() instead, which returns a User object, and + is preferred. - We won't do any API queries yet for basic information about the user - - save that for when the information is requested. + We won't do any API queries yet for basic information about the user - save + that for when the information is requested. """ self._site = site self._name = name @@ -85,22 +90,27 @@ class User: self._logger = getLogger("earwigbot.wiki") self._logger.addHandler(NullHandler()) - def __repr__(self): - """Return the canonical string representation of the User.""" + def __repr__(self) -> str: + """ + Return the canonical string representation of the User. 
+        """
         return f"User(name={self._name!r}, site={self._site!r})"

-    def __str__(self):
-        """Return a nice string representation of the User."""
+    def __str__(self) -> str:
+        """
+        Return a nice string representation of the User.
+        """
         return f''

-    def _get_attribute(self, attr):
-        """Internally used to get an attribute by name.
+    def _get_attribute(self, attr: str) -> Any:
+        """
+        Internally used to get an attribute by name.

-        We'll call _load_attributes() to get this (and all other attributes)
-        from the API if it is not already defined.
+        We'll call _load_attributes() to get this (and all other attributes) from the
+        API if it is not already defined.

-        Raises UserNotFoundError if a nonexistant user prevents us from
-        returning a certain attribute.
+        Raises UserNotFoundError if a nonexistent user prevents us from returning a
+        certain attribute.
         """
         if not hasattr(self, attr):
             self._load_attributes()
@@ -109,11 +119,12 @@ class User:
             raise UserNotFoundError(e)
         return getattr(self, attr)

-    def _load_attributes(self):
-        """Internally used to load all attributes from the API.
+    def _load_attributes(self) -> None:
+        """
+        Internally used to load all attributes from the API.

-        Normally, this is called by _get_attribute() when a requested attribute
-        is not defined. This defines it.
+        Normally, this is called by _get_attribute() when a requested attribute is not
+        defined. This defines it.
         """
         props = "blockinfo|groups|rights|editcount|registration|emailable|gender"
         result = self.site.api_query(
@@ -150,11 +161,11 @@ class User:
         reg = res["registration"]
         try:
-            self._registration = strptime(reg, "%Y-%m-%dT%H:%M:%SZ")
+            self._registration = time.strptime(reg, "%Y-%m-%dT%H:%M:%SZ")
         except TypeError:
             # Sometimes the API doesn't give a date; the user's probably really
             # old. There's nothing else we can do!
- self._registration = gmtime(0) + self._registration = time.gmtime(0) try: res["emailable"] @@ -166,24 +177,28 @@ class User: self._gender = res["gender"] @property - def site(self): - """The user's corresponding Site object.""" + def site(self) -> Site: + """ + The user's corresponding Site object. + """ return self._site @property - def name(self): - """The user's username. + def name(self) -> str: + """ + The user's username. - This will never make an API query on its own, but if one has already - been made by the time this is retrieved, the username may have been - "normalized" from the original input to the constructor, converted into - a Unicode object, with underscores removed, etc. + This will never make an API query on its own, but if one has already been made + by the time this is retrieved, the username may have been "normalized" from the + original input to the constructor, converted into a Unicode object, with + underscores removed, etc. """ return self._name @property - def exists(self): - """``True`` if the user exists, or ``False`` if they do not. + def exists(self) -> bool: + """ + ``True`` if the user exists, or ``False`` if they do not. Makes an API query only if we haven't made one already. """ @@ -192,124 +207,135 @@ class User: return self._exists @property - def userid(self): - """An integer ID used by MediaWiki to represent the user. + def userid(self) -> int: + """ + An integer ID used by MediaWiki to represent the user. - Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user - does not exist. Makes an API query only if we haven't made one already. + Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user does not + exist. Makes an API query only if we haven't made one already. """ return self._get_attribute("_userid") @property - def blockinfo(self): - """Information about any current blocks on the user. 
+ def blockinfo(self) -> dict[str, Any] | Literal[False]: + """ + Information about any current blocks on the user. - If the user is not blocked, returns ``False``. If they are, returns a - dict with three keys: ``"by"`` is the blocker's username, ``"reason"`` - is the reason why they were blocked, and ``"expiry"`` is when the block - expires. + If the user is not blocked, returns ``False``. If they are, returns a dict with + three keys: ``"by"`` is the blocker's username, ``"reason"`` is the reason why + they were blocked, and ``"expiry"`` is when the block expires. - Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user - does not exist. Makes an API query only if we haven't made one already. + Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user does not + exist. Makes an API query only if we haven't made one already. """ return self._get_attribute("_blockinfo") @property - def groups(self): - """A list of groups this user is in, including ``"*"``. + def groups(self) -> list[str]: + """ + A list of groups this user is in, including ``"*"``. - Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user - does not exist. Makes an API query only if we haven't made one already. + Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user does not + exist. Makes an API query only if we haven't made one already. """ return self._get_attribute("_groups") @property - def rights(self): - """A list of this user's rights. + def rights(self) -> list[str]: + """ + A list of this user's rights. - Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user - does not exist. Makes an API query only if we haven't made one already. + Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user does not + exist. Makes an API query only if we haven't made one already. """ return self._get_attribute("_rights") @property - def editcount(self): - """Returns the number of edits made by the user. 
+ def editcount(self) -> int: + """ + Returns the number of edits made by the user. - Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user - does not exist. Makes an API query only if we haven't made one already. + Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user does not + exist. Makes an API query only if we haven't made one already. """ return self._get_attribute("_editcount") @property - def registration(self): - """The time the user registered as a :py:class:`time.struct_time`. + def registration(self) -> time.struct_time: + """ + The time the user registered as a :py:class:`time.struct_time`. - Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user - does not exist. Makes an API query only if we haven't made one already. + Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user does not + exist. Makes an API query only if we haven't made one already. """ return self._get_attribute("_registration") @property - def emailable(self): - """``True`` if the user can be emailed, or ``False`` if they cannot. + def emailable(self) -> bool: + """ + ``True`` if the user can be emailed, or ``False`` if they cannot. - Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user - does not exist. Makes an API query only if we haven't made one already. + Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user does not + exist. Makes an API query only if we haven't made one already. """ return self._get_attribute("_emailable") @property - def gender(self): - """The user's gender. + def gender(self) -> str: + """ + The user's gender. - Can return either ``"male"``, ``"female"``, or ``"unknown"``, if they - did not specify it. + Can return either ``"male"``, ``"female"``, or ``"unknown"``, if they did not + specify it. - Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user - does not exist. Makes an API query only if we haven't made one already. 
+ Raises :py:exc:`~earwigbot.exceptions.UserNotFoundError` if the user does not + exist. Makes an API query only if we haven't made one already. """ return self._get_attribute("_gender") @property - def is_ip(self): - """``True`` if the user is an IP address, or ``False`` otherwise. + def is_ip(self) -> bool: + """ + ``True`` if the user is an IP address, or ``False`` otherwise. - This tests for IPv4 and IPv6 using :py:func:`socket.inet_pton` on the - username. No API queries are made. + This tests for IPv4 and IPv6 using :py:func:`socket.inet_pton` on the username. + No API queries are made. """ try: - inet_pton(AF_INET, self.name) + socket.inet_pton(socket.AF_INET, self.name) except OSError: try: - inet_pton(AF_INET6, self.name) + socket.inet_pton(socket.AF_INET6, self.name) except OSError: return False return True - def reload(self): - """Forcibly reload the user's attributes. + def reload(self) -> None: + """ + Forcibly reload the user's attributes. - Emphasis on *reload*: this is only necessary if there is reason to - believe they have changed. + Emphasis on *reload*: this is only necessary if there is reason to believe they + have changed. """ self._load_attributes() - def get_userpage(self): - """Return a Page object representing the user's userpage. + def get_userpage(self) -> Page: + """ + Return a Page object representing the user's userpage. - No checks are made to see if it exists or not. Proper site namespace - conventions are followed. + No checks are made to see if it exists or not. Proper site namespace conventions + are followed. """ prefix = self.site.namespace_id_to_name(constants.NS_USER) pagename = ":".join((prefix, self._name)) return Page(self.site, pagename) - def get_talkpage(self): - """Return a Page object representing the user's talkpage. + def get_talkpage(self) -> Page: + """ + Return a Page object representing the user's talkpage. - No checks are made to see if it exists or not. Proper site namespace - conventions are followed. 
+ No checks are made to see if it exists or not. Proper site namespace conventions + are followed. """ prefix = self.site.namespace_id_to_name(constants.NS_USER_TALK) pagename = ":".join((prefix, self._name))