diff --git a/.gitignore b/.gitignore
index bc67eea..1884197 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,9 @@
 # Ignore bot-specific config file:
 config.json
 
+# Ignore cookies file:
+.cookies
+
 # Ignore OS X's crud:
 *.DS_Store
 
diff --git a/irc/commands/afc_status.py b/irc/commands/afc_status.py
index 9273a53..0f5722e 100644
--- a/irc/commands/afc_status.py
+++ b/irc/commands/afc_status.py
@@ -3,12 +3,11 @@
 """Report the status of AFC submissions, either as an automatic message on
 join or a request via !status."""
 
-import json
 import re
-import urllib
 
 from core import config
 from irc.classes import BaseCommand
+from wiki import tools
 
 class AFCStatus(BaseCommand):
     def get_hooks(self):
@@ -29,6 +28,8 @@ class AFCStatus(BaseCommand):
         return False
 
     def process(self, data):
+        self.site = tools.get_site()
+
         if data.line[1] == "JOIN":
             notice = self.get_join_notice()
             self.connection.notice(data.nick, notice)
@@ -85,19 +86,15 @@ class AFCStatus(BaseCommand):
 
     def count_submissions(self):
         """Returns the number of open AFC submissions (count of CAT:PEND)."""
-        params = {'action': 'query', 'list': 'categorymembers', 'cmlimit':'500', 'format': 'json'}
-        params['cmtitle'] = "Category:Pending_AfC_submissions"
-        data = urllib.urlencode(params)
-        raw = urllib.urlopen("http://en.wikipedia.org/w/api.php", data).read()
-        res = json.loads(raw)
-        subs = len(res['query']['categorymembers'])
+        cat = self.site.get_category("Pending AfC submissions")
+        subs = len(cat.members(limit=500))
         subs -= 2            # remove [[Wikipedia:Articles for creation/Redirects]] and [[Wikipedia:Files for upload]], which aren't real submissions
         return subs
 
     def count_redirects(self):
         """Returns the number of open redirect submissions. Calculated as the
         total number of submissions minus the closed ones."""
-        content = self.get_page("Wikipedia:Articles_for_creation/Redirects")
+        content = self.site.get_page("Wikipedia:Articles for creation/Redirects").get()
         total = len(re.findall("^\s*==(.*?)==\s*$", content, re.MULTILINE))
         closed = content.lower().count("{{afc-c|b}}")
         redirs = total - closed
@@ -106,24 +103,12 @@ class AFCStatus(BaseCommand):
     def count_files(self):
         """Returns the number of open WP:FFU (Files For Upload) requests.
         Calculated as the total number of requests minus the closed ones."""
-        content = self.get_page("Wikipedia:Files_for_upload")
+        content = self.site.get_page("Wikipedia:Files for upload").get()
        total = len(re.findall("^\s*==(.*?)==\s*$", content, re.MULTILINE))
         closed = content.lower().count("{{ifu-c|b}}")
         files = total - closed
         return files
 
-    def get_page(self, pagename):
-        """Simple method to return the content of the page 'pagename'. Will be
-        a part of wiki/tools/ when I finish that."""
-        params = {'action': 'query', 'prop': 'revisions', 'rvprop':'content', 'rvlimit':'1', 'format': 'json'}
-        params['titles'] = pagename
-        data = urllib.urlencode(params)
-        raw = urllib.urlopen("http://en.wikipedia.org/w/api.php", data).read()
-        res = json.loads(raw)
-        pageid = res['query']['pages'].keys()[0]
-        content = res['query']['pages'][pageid]['revisions'][0]['*']
-        return content
-
     def get_aggregate(self, num):
         """Returns a human-readable AFC status based on the number of pending
         AFC submissions, open redirect requests, and open FFU requests. This
diff --git a/irc/commands/rights.py b/irc/commands/rights.py
index 31d9437..4289002 100644
--- a/irc/commands/rights.py
+++ b/irc/commands/rights.py
@@ -4,10 +4,8 @@
 Retrieve a list of user rights for a given username via the API.
""" -import json -import urllib - from irc.classes import BaseCommand +from wiki import tools class Rights(BaseCommand): def get_hooks(self): @@ -27,24 +25,14 @@ class Rights(BaseCommand): return username = ' '.join(data.args) - rights = self.get_rights(username) + site = tools.get_site() + user = site.get_user(username) + rights = user.groups() if rights: + try: + rights.remove("*") # remove the implicit '*' group given to everyone + except ValueError: + pass self.connection.reply(data, "the rights for \x0302{0}\x0301 are {1}.".format(username, ', '.join(rights))) else: self.connection.reply(data, "the user \x0302{0}\x0301 has no rights, or does not exist.".format(username)) - - def get_rights(self, username): - params = {'action': 'query', 'format': 'json', 'list': 'users', 'usprop': 'groups'} - params['ususers'] = username - data = urllib.urlencode(params) - raw = urllib.urlopen("http://en.wikipedia.org/w/api.php", data).read() - res = json.loads(raw) - try: - rights = res['query']['users'][0]['groups'] - except KeyError: # 'groups' not found, meaning the user does not exist - return None - try: - rights.remove("*") # remove the implicit '*' group given to everyone - except ValueError: # I don't expect this to happen, but if it does, be prepared - pass - return rights diff --git a/wiki/tools/__init__.py b/wiki/tools/__init__.py index e69de29..7fb431e 100644 --- a/wiki/tools/__init__.py +++ b/wiki/tools/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- + +""" +EarwigBot's Wiki Toolset + +This is a collection of classes and functions to read from and write to +Wikipedia and other wiki sites. No connection whatsoever to python-wikitools +written by Mr.Z-man, other than a similar purpose. We share no code. + +Import the toolset with `from wiki import tools`. +""" + +from wiki.tools.constants import * +from wiki.tools.exceptions import * +from wiki.tools.functions import * + +from wiki.tools.category import Category +from wiki.tools.page import Page +from wiki.tools.site import Site +from wiki.tools.user import User diff --git a/wiki/tools/category.py b/wiki/tools/category.py new file mode 100644 index 0000000..7ebe342 --- /dev/null +++ b/wiki/tools/category.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- + +from wiki.tools.page import Page + +class Category(Page): + """ + EarwigBot's Wiki Toolset: Category Class + + Represents a Category on a given Site, a subclass of Page. Provides + additional methods, but Page's own methods should work fine on Category + objects. Site.get_page() will return a Category instead of a Page if the + given title is in the category namespace; get_category() is shorthand, + because it accepts category names without the namespace prefix. + + Public methods: + members -- returns a list of titles in the category + """ + + def members(self, limit=50): + """Returns a list of titles in the category. + + If `limit` is provided, we will provide this many titles, or less if + the category is too small. `limit` defaults to 50; normal users can go + up to 500, and bots can go up to 5,000 on a single API query. 
+ """ + params = {"action": "query", "list": "categorymembers", + "cmlimit": limit, "cmtitle": self.title} + result = self._site._api_query(params) + members = result['query']['categorymembers'] + return [member["title"] for member in members] diff --git a/wiki/tools/constants.py b/wiki/tools/constants.py new file mode 100644 index 0000000..6397c5d --- /dev/null +++ b/wiki/tools/constants.py @@ -0,0 +1,35 @@ +# -*- coding: utf-8 -*- + +""" +EarwigBot's Wiki Toolset: Constants + +This module defines some useful constants, such as default namespace IDs for +easy lookup and our user agent. + +Import with `from wiki.tools.constants import *`. +""" + +import platform + +# User agent when making API queries +USER_AGENT = "EarwigBot/0.1-dev (Python/{0}; https://github.com/earwig/earwigbot)".format(platform.python_version()) + +# Default namespace IDs +NS_MAIN = 0 +NS_TALK = 1 +NS_USER = 2 +NS_USER_TALK = 3 +NS_PROJECT = 4 +NS_PROJECT_TALK = 5 +NS_FILE = 6 +NS_FILE_TALK = 7 +NS_MEDIAWIKI = 8 +NS_MEDIAWIKI_TALK = 9 +NS_TEMPLATE = 10 +NS_TEMPLATE_TALK = 11 +NS_HELP = 12 +NS_HELP_TALK = 13 +NS_CATEGORY = 14 +NS_CATEGORY_TALK = 15 +NS_SPECIAL = -1 +NS_MEDIA = -2 diff --git a/wiki/tools/exceptions.py b/wiki/tools/exceptions.py new file mode 100644 index 0000000..f36dae3 --- /dev/null +++ b/wiki/tools/exceptions.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- + +""" +EarwigBot's Wiki Toolset: Exceptions + +This module contains all exceptions used by the wiki.tools package. +""" + +class WikiToolsetError(Exception): + """Base exception class for errors in the Wiki Toolset.""" + +class SiteNotFoundError(WikiToolsetError): + """A site matching the args given to get_site() could not be found in the + config file.""" + +class SiteAPIError(WikiToolsetError): + """We couldn't connect to a site's API, perhaps because the server doesn't + exist, our URL is wrong or incomplete, or they're having temporary + problems.""" + +class LoginError(WikiToolsetError): + """An error occured while trying to login. Perhaps the username/password is + incorrect.""" + +class PermissionsError(WikiToolsetError): + """We tried to do something we don't have permission to, like a non-admin + trying to delete a page, or trying to edit a page when no login information + was provided.""" + +class NamespaceNotFoundError(WikiToolsetError): + """A requested namespace name or namespace ID does not exist.""" + +class PageNotFoundError(WikiToolsetError): + """Attempting to get certain information about a page that does not + exist.""" + +class InvalidPageError(WikiToolsetError): + """Attempting to get certain information about a page whose title is + invalid.""" + +class RedirectError(WikiToolsetError): + """Page's get_redirect_target() method failed because the page is either + not a redirect, or it is malformed.""" + +class UserNotFoundError(WikiToolsetError): + """Attempting to get certain information about a user that does not + exist.""" diff --git a/wiki/tools/functions.py b/wiki/tools/functions.py new file mode 100644 index 0000000..ab18609 --- /dev/null +++ b/wiki/tools/functions.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- + +""" +EarwigBot's Wiki Toolset: Misc Functions + +This module, a component of the wiki.tools package, contains miscellaneous +functions that are not methods of any class, like get_site(). + +There's no need to import this module explicitly. All functions here are +automatically available from wiki.tools. 
+""" + +from cookielib import LWPCookieJar, LoadError +import errno +from getpass import getpass +from os import chmod, path +import stat + +from core import config +from wiki.tools.exceptions import SiteNotFoundError +from wiki.tools.site import Site + +__all__ = ["get_site"] + +_cookiejar = None + +def _load_config(): + """Called by a config-requiring function, such as get_site(), when config + has not been loaded. This will usually happen only if we're running code + directly from Python's interpreter and not the bot itself, because + earwigbot.py or core/main.py will already call these functions. + """ + is_encrypted = config.verify_config() + if is_encrypted: # passwords in the config file are encrypted + key = getpass("Enter key to unencrypt bot passwords: ") + config.parse_config(key) + else: + config.parse_config(None) + +def _get_cookiejar(): + """Returns a LWPCookieJar object loaded from our .cookies file. The same + one is returned every time. + + The .cookies file is located in the project root, same directory as + config.json and earwigbot.py. If it doesn't exist, we will create the file + and set it to be readable and writeable only by us. If it exists but the + information inside is bogus, we will ignore it. + + This is normally called by _get_site_object_from_dict() (in turn called by + get_site()), and the cookiejar is passed to our Site's constructor, used + when it makes API queries. This way, we can easily preserve cookies between + sites (e.g., for CentralAuth), making logins easier. + """ + global _cookiejar + if _cookiejar is not None: + return _cookiejar + + cookie_file = path.join(config.root_dir, ".cookies") + _cookiejar = LWPCookieJar(cookie_file) + + try: + _cookiejar.load() + except LoadError: + # file contains bad data, so ignore it completely + pass + except IOError as e: + if e.errno == errno.ENOENT: # "No such file or directory" + # create the file and restrict reading/writing only to the owner, + # so others can't peak at our cookies + open(cookie_file, "w").close() + chmod(cookie_file, stat.S_IRUSR|stat.S_IWUSR) + else: + raise + + return _cookiejar + +def _get_site_object_from_dict(name, d): + """Return a Site object based on the contents of a dict, probably acquired + through our config file, and a separate name. + """ + project = d.get("project") + lang = d.get("lang") + base_url = d.get("baseURL") + article_path = d.get("articlePath") + script_path = d.get("scriptPath") + sql = (d.get("sqlServer"), d.get("sqlDB")) + namespaces = d.get("namespaces") + login = (config.wiki.get("username"), config.wiki.get("password")) + cookiejar = _get_cookiejar() + + return Site(name=name, project=project, lang=lang, base_url=base_url, + article_path=article_path, script_path=script_path, sql=sql, + namespaces=namespaces, login=login, cookiejar=cookiejar) + +def get_site(name=None, project=None, lang=None): + """Returns a Site instance based on information from our config file. + + With no arguments, returns the default site as specified by our config + file. This is default = config.wiki["defaultSite"]; + config.wiki["sites"][default]. + + With `name` specified, returns the site specified by + config.wiki["sites"][name]. + + With `project` and `lang` specified, returns the site specified by the + member of config.wiki["sites"], `s`, for which s["project"] == project and + s["lang"] == lang. + + We will attempt to login to the site automatically + using config.wiki["username"] and config.wiki["password"] if both are + defined. 
+
+    Specifying a project without a lang or a lang without a project will raise
+    TypeError. If all three args are specified, `name` will be first tried,
+    then `project` and `lang`. If, with any number of args, a site cannot be
+    found in the config, SiteNotFoundError is raised.
+    """
+    # check if config has been loaded, and load it if it hasn't
+    if not config.is_config_loaded():
+        _load_config()
+
+    # someone specified a project without a lang (or a lang without a project)!
+    if (project is None and lang is not None) or (project is not None and
+                                                  lang is None):
+        e = "Keyword arguments 'lang' and 'project' must be specified together."
+        raise TypeError(e)
+
+    # no args given, so return our default site (project is None implies lang
+    # is None, so we don't need to add that in)
+    if name is None and project is None:
+        try:
+            default = config.wiki["defaultSite"]
+        except KeyError:
+            e = "Default site is not specified in config."
+            raise SiteNotFoundError(e)
+        try:
+            site = config.wiki["sites"][default]
+        except KeyError:
+            e = "Default site specified by config is not in the config's sites list."
+            raise SiteNotFoundError(e)
+        return _get_site_object_from_dict(default, site)
+
+    # name arg given, but don't look at others unless `name` isn't found
+    if name is not None:
+        try:
+            site = config.wiki["sites"][name]
+        except KeyError:
+            if project is None:  # implies lang is None, so only name was given
+                e = "Site '{0}' not found in config.".format(name)
+                raise SiteNotFoundError(e)
+            for sitename, site in config.wiki["sites"].items():
+                if site["project"] == project and site["lang"] == lang:
+                    return _get_site_object_from_dict(sitename, site)
+            e = "Neither site '{0}' nor site '{1}:{2}' found in config."
+            e = e.format(name, project, lang)
+            raise SiteNotFoundError(e)
+        else:
+            return _get_site_object_from_dict(name, site)
+
+    # if we end up here, then project and lang are both not None
+    for sitename, site in config.wiki["sites"].items():
+        if site["project"] == project and site["lang"] == lang:
+            return _get_site_object_from_dict(sitename, site)
+    e = "Site '{0}:{1}' not found in config.".format(project, lang)
+    raise SiteNotFoundError(e)
+
+def add_site():
+    """STUB: config editing is required first.
+
+    Returns True if the site was added successfully or False if the site was
+    already in our config. Raises ConfigError if saving the updated file failed
+    for some reason."""
+    pass
+
+def del_site(name):
+    """STUB: config editing is required first.
+
+    Returns True if the site was removed successfully or False if the site was
+    not in our config originally. Raises ConfigError if saving the updated file
+    failed for some reason."""
+    pass
diff --git a/wiki/tools/page.py b/wiki/tools/page.py
new file mode 100644
index 0000000..8ae25f1
--- /dev/null
+++ b/wiki/tools/page.py
@@ -0,0 +1,414 @@
+# -*- coding: utf-8 -*-
+
+import re
+from urllib import quote
+
+from wiki.tools.exceptions import *
+
+class Page(object):
+    """
+    EarwigBot's Wiki Toolset: Page Class
+
+    Represents a Page on a given Site. Has methods for getting information
+    about the page, getting page content, and so on. Category is a subclass of
+    Page with additional methods.
+
+    Public methods:
+    title               -- returns the page's title, or pagename
+    exists              -- returns whether the page exists
+    pageid              -- returns an integer ID representing the page
+    url                 -- returns the page's URL
+    namespace           -- returns the page's namespace as an integer
+    protection          -- returns the page's current protection status
+    is_talkpage         -- returns True if the page is a talkpage, else False
+    is_redirect         -- returns True if the page is a redirect, else False
+    toggle_talk         -- returns a content page's talk page, or vice versa
+    get                 -- returns page content
+    get_redirect_target -- if the page is a redirect, returns its destination
+    """
+
+    def __init__(self, site, title, follow_redirects=False):
+        """Constructor for new Page instances.
+
+        Takes three arguments: a Site object, the Page's title (or pagename),
+        and whether or not to follow redirects (optional, defaults to False).
+
+        As with User, site.get_page() is preferred. Site's method has support
+        for a default `follow_redirects` value in our config, while __init__
+        always defaults to False.
+
+        __init__ will not do any API queries, but it will use basic namespace
+        logic to determine our namespace ID and if we are a talkpage.
+        """
+        self._site = site
+        self._title = title.strip()
+        self._follow_redirects = self._keep_following = follow_redirects
+
+        self._exists = 0
+        self._pageid = None
+        self._is_redirect = None
+        self._lastrevid = None
+        self._protection = None
+        self._fullurl = None
+        self._content = None
+
+        # Try to determine the page's namespace using our site's namespace
+        # converter:
+        prefix = self._title.split(":", 1)[0]
+        if prefix != self._title:  # ignore a page that's titled "Category" or "User"
+            try:
+                self._namespace = self._site.namespace_name_to_id(prefix)
+            except NamespaceNotFoundError:
+                self._namespace = 0
+        else:
+            self._namespace = 0
+
+        # Is this a talkpage? Talkpages have odd IDs, while content pages have
+        # even IDs, excluding the "special" namespaces:
+        if self._namespace < 0:
+            self._is_talkpage = False
+        else:
+            self._is_talkpage = self._namespace % 2 == 1
+
+    def _force_validity(self):
+        """Used to ensure that our page's title is valid.
+
+        If this method is called when our page is not valid (and after
+        _load_attributes() has been called), InvalidPageError will be raised.
+
+        Note that validity != existence. If a page's title is invalid (e.g.,
+        it contains "[") it will always be invalid, and cannot be edited.
+        """
+        if self._exists == 1:
+            e = "Page '{0}' is invalid.".format(self._title)
+            raise InvalidPageError(e)
+
+    def _force_existence(self):
+        """Used to ensure that our page exists.
+
+        If this method is called when our page doesn't exist (and after
+        _load_attributes() has been called), PageNotFoundError will be raised.
+        It will also call _force_validity() beforehand.
+        """
+        self._force_validity()
+        if self._exists == 2:
+            e = "Page '{0}' does not exist.".format(self._title)
+            raise PageNotFoundError(e)
+
+    def _load_wrapper(self):
+        """Calls _load_attributes() and follows redirects if we're supposed to.
+
+        This method will only follow redirects if follow_redirects=True was
+        passed to __init__() (perhaps indirectly passed by site.get_page()).
+        It avoids the API's &redirects param in favor of manual following,
+        so we can act more realistically (we don't follow double redirects, and
+        circular redirects don't break us).
+
+        This will raise RedirectError if we have a problem following, but that
+        is a bug and should NOT happen.
+
+        If we're following a redirect, this will make a grand total of three
+        API queries. It's a lot, but each one is quite small.
+        """
+        self._load_attributes()
+
+        if self._keep_following and self._is_redirect:
+            self._title = self.get_redirect_target()
+            self._keep_following = False  # don't follow double redirects
+            self._content = None  # reset the content we just loaded
+            self._load_attributes()
+
+    def _load_attributes(self, result=None):
+        """Loads various data from the API in a single query.
+
+        Loads self._title, ._exists, ._is_redirect, ._pageid, ._fullurl,
+        ._protection, ._namespace, ._is_talkpage, and ._lastrevid using the
+        API. It will do a query of its own unless `result` is provided, in
+        which case we'll pretend `result` is what the query returned.
+
+        Assuming the API is sound, this should not raise any exceptions.
+        """
+        if result is None:
+            params = {"action": "query", "prop": "info", "titles": self._title,
+                      "inprop": "protection|url"}
+            result = self._site._api_query(params)
+
+        res = result["query"]["pages"].values()[0]
+
+        # Normalize our pagename/title thing:
+        self._title = res["title"]
+
+        try:
+            res["redirect"]
+        except KeyError:
+            self._is_redirect = False
+        else:
+            self._is_redirect = True
+
+        self._pageid = int(result["query"]["pages"].keys()[0])
+        if self._pageid < 0:
+            try:
+                res["missing"]
+            except KeyError:
+                # If it has a negative ID and it's invalid, then break here,
+                # because there's no other data for us to get:
+                self._exists = 1
+                return
+            else:
+                # If it has a negative ID and it's missing, we can still get
+                # data like the namespace, protection, and URL:
+                self._exists = 2
+        else:
+            self._exists = 3
+
+        self._fullurl = res["fullurl"]
+        self._protection = res["protection"]
+
+        # We've determined the namespace and talkpage status in __init__()
+        # based on the title, but now we can be sure:
+        self._namespace = res["ns"]
+        self._is_talkpage = self._namespace % 2 == 1  # talkpages have odd IDs
+
+        # This last field will only be specified if the page exists:
+        try:
+            self._lastrevid = res["lastrevid"]
+        except KeyError:
+            pass
+
+    def _load_content(self, result=None):
+        """Loads current page content from the API.
+
+        If `result` is provided, we'll pretend that is the result of an API
+        query and try to get content from that. Otherwise, we'll do an API
+        query on our own.
+
+        Don't call this directly, ever - use .get(force=True) if you want to
+        force content reloading.
+        """
+        if result is None:
+            params = {"action": "query", "prop": "revisions", "rvlimit": 1,
+                      "rvprop": "content", "titles": self._title}
+            result = self._site._api_query(params)
+
+        res = result["query"]["pages"].values()[0]
+        try:
+            content = res["revisions"][0]["*"]
+            self._content = content
+        except KeyError:
+            # This can only happen if the page was deleted since we last called
+            # self._load_attributes(). In that case, some of our attributes are
+            # outdated, so force another self._load_attributes():
+            self._load_attributes()
+            self._force_existence()
+
+    def title(self, force=False):
+        """Returns the Page's title, or pagename.
+
+        This won't do any API queries on its own unless force is True, in which
+        case the title will be forcibly reloaded from the API (normalizing it,
+        and following redirects if follow_redirects=True was passed to
+        __init__()). Any other methods that do API queries will reload title on
+        their own, however, like exists() and get().
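+
+        A quick sketch (the title is only an example):
+
+            >>> page = site.get_page("wikipedia:Articles for creation")
+            >>> page.title(force=True)  # normalized by the API
+            u'Wikipedia:Articles for creation'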
+ """ + if force: + self._load_wrapper() + return self._title + + def exists(self, force=False): + """Returns information about whether the Page exists or not. + + The returned "information" is a tuple with two items. The first is a + bool, either True if the page exists or False if it does not. The + second is a string giving more information, either "invalid", (title + is invalid, e.g. it contains "["), "missing", or "exists". + + Makes an API query if force is True or if we haven't already made one. + """ + cases = { + 0: (None, "unknown"), + 1: (False, "invalid"), + 2: (False, "missing"), + 3: (True, "exists"), + } + if self._exists == 0 or force: + self._load_wrapper() + return cases[self._exists] + + def pageid(self, force=False): + """Returns an integer ID representing the Page. + + Makes an API query if force is True or if we haven't already made one. + + Raises InvalidPageError or PageNotFoundError if the page name is + invalid or the page does not exist, respectively. + """ + if self._exists == 0 or force: + self._load_wrapper() + self._force_existence() # missing pages do not have IDs + return self._pageid + + def url(self, force=False): + """Returns the page's URL. + + Like title(), this won't do any API queries on its own unless force is + True. If the API was never queried for this page, we will attempt to + determine the URL ourselves based on the title. + """ + if force: + self._load_wrapper() + if self._fullurl is not None: + return self._fullurl + else: + slug = quote(self._title.replace(" ", "_"), safe="/:") + path = self._site._article_path.replace("$1", slug) + return ''.join((self._site._base_url, path)) + + def namespace(self, force=False): + """Returns the page's namespace ID (an integer). + + Like title(), this won't do any API queries on its own unless force is + True. If the API was never queried for this page, we will attempt to + determine the namespace ourselves based on the title. + """ + if force: + self._load_wrapper() + return self._namespace + + def protection(self, force=False): + """Returns the page's current protection status. + + Makes an API query if force is True or if we haven't already made one. + + Raises InvalidPageError if the page name is invalid. Will not raise an + error if the page is missing because those can still be protected. + """ + if self._exists == 0 or force: + self._load_wrapper() + self._force_validity() # invalid pages cannot be protected + return self._protection + + def is_talkpage(self, force=False): + """Returns True if the page is a talkpage, else False. + + Like title(), this won't do any API queries on its own unless force is + True. If the API was never queried for this page, we will attempt to + determine the talkpage status ourselves based on its namespace ID. + """ + if force: + self._load_wrapper() + return self._is_talkpage + + def is_redirect(self, force=False): + """Returns True if the page is a redirect, else False. + + Makes an API query if force is True or if we haven't already made one. + + We will return False even if the page does not exist or is invalid. + """ + if self._exists == 0 or force: + self._load_wrapper() + return self._is_redirect + + def toggle_talk(self, force=False, follow_redirects=None): + """Returns a content page's talk page, or vice versa. + + The title of the new page is determined by namespace logic, not API + queries. 
+        We won't make any API queries on our own unless force is True, and
+        the only reason then would be to forcibly update the title or follow
+        redirects if we haven't already made an API query.
+
+        If `follow_redirects` is anything other than None (the default), it
+        will be passed to the new Page's __init__(). Otherwise, we'll use the
+        value passed to our own __init__().
+
+        Will raise InvalidPageError if we try to get the talk page of a special
+        page (in the Special: or Media: namespaces), but we won't raise an
+        exception if our page is otherwise missing or invalid.
+        """
+        if force:
+            self._load_wrapper()
+        if self._namespace < 0:
+            ns = self._site.namespace_id_to_name(self._namespace)
+            e = "Pages in the {0} namespace can't have talk pages.".format(ns)
+            raise InvalidPageError(e)
+
+        if self._is_talkpage:
+            new_ns = self._namespace - 1
+        else:
+            new_ns = self._namespace + 1
+
+        try:
+            body = self._title.split(":", 1)[1]
+        except IndexError:
+            body = self._title
+
+        new_prefix = self._site.namespace_id_to_name(new_ns)
+
+        # If the new page is in namespace 0, don't do ":Title" (it's correct,
+        # but unnecessary), just do "Title":
+        if new_prefix:
+            new_title = ':'.join((new_prefix, body))
+        else:
+            new_title = body
+
+        if follow_redirects is None:
+            follow_redirects = self._follow_redirects
+        return Page(self._site, new_title, follow_redirects)
+
+    def get(self, force=False):
+        """Returns page content, which is cached if you try to call get again.
+
+        Use `force` to forcibly reload page content even if we've already
+        loaded some. This is good if you want to edit a page multiple times,
+        and you want to get updated content before you make your second edit.
+
+        Raises InvalidPageError or PageNotFoundError if the page name is
+        invalid or the page does not exist, respectively.
+        """
+        if force or self._exists == 0:
+            # Kill two birds with one stone by doing an API query for both our
+            # attributes and our page content:
+            params = {"action": "query", "rvprop": "content", "rvlimit": 1,
+                      "prop": "info|revisions", "inprop": "protection|url",
+                      "titles": self._title}
+            result = self._site._api_query(params)
+            self._load_attributes(result=result)
+            self._force_existence()
+            self._load_content(result=result)
+
+            # Follow redirects if we're told to:
+            if self._keep_following and self._is_redirect:
+                self._title = self.get_redirect_target()
+                self._keep_following = False  # don't follow double redirects
+                self._content = None  # reset the content we just loaded
+                self.get(force=True)
+
+            return self._content
+
+        # Make sure we're dealing with a real page here. This may be outdated
+        # if the page was deleted since we last called self._load_attributes(),
+        # but self._load_content() can handle that:
+        self._force_existence()
+
+        if self._content is None:
+            self._load_content()
+
+        return self._content
+
+    def get_redirect_target(self, force=False):
+        """If the page is a redirect, returns its destination.
+
+        Use `force` to forcibly reload content even if we've already loaded
+        some before. Note that this method calls get() for page content.
+
+        Raises InvalidPageError or PageNotFoundError if the page name is
+        invalid or the page does not exist, respectively. Raises RedirectError
+        if the page is not a redirect.
+        """
+        content = self.get(force)
+        regexp = "^\s*\#\s*redirect\s*\[\[(.*?)\]\]"
+        try:
+            return re.findall(regexp, content, flags=re.IGNORECASE)[0]
+        except IndexError:
+            e = "The page does not appear to have a redirect target."
+            raise RedirectError(e)
diff --git a/wiki/tools/site.py b/wiki/tools/site.py
new file mode 100644
index 0000000..57b890d
--- /dev/null
+++ b/wiki/tools/site.py
@@ -0,0 +1,446 @@
+# -*- coding: utf-8 -*-
+
+from cookielib import CookieJar
+from gzip import GzipFile
+from json import loads
+from re import escape as re_escape, match as re_match
+from StringIO import StringIO
+from urllib import urlencode
+from urllib2 import build_opener, HTTPCookieProcessor, URLError
+from urlparse import urlparse
+
+from wiki.tools.category import Category
+from wiki.tools.constants import *
+from wiki.tools.exceptions import *
+from wiki.tools.page import Page
+from wiki.tools.user import User
+
+class Site(object):
+    """
+    EarwigBot's Wiki Toolset: Site Class
+
+    Represents a Site, with support for API queries and returning Pages, Users,
+    and Categories. You probably won't need to call the constructor directly;
+    tools.get_site() returns Site instances, tools.add_site() adds new ones to
+    config, and tools.del_site() removes old ones from config.
+
+    Public methods:
+    name                 -- returns our name (or "wikiid"), like "enwiki"
+    project              -- returns our project name, like "wikipedia"
+    lang                 -- returns our language code, like "en"
+    domain               -- returns our web domain, like "en.wikipedia.org"
+    api_query            -- does an API query with the given kwargs as params
+    namespace_id_to_name -- given a namespace ID, returns associated name(s)
+    namespace_name_to_id -- given a namespace name, returns associated id
+    get_page             -- returns a Page object for the given title
+    get_category         -- returns a Category object for the given title
+    get_user             -- returns a User object for the given username
+    """
+
+    def __init__(self, name=None, project=None, lang=None, base_url=None,
+                 article_path=None, script_path=None, sql=(None, None),
+                 namespaces=None, login=(None, None), cookiejar=None):
+        """Constructor for new Site instances.
+
+        This probably isn't necessary to call yourself unless you're building a
+        Site that's not in your config and you don't want to add it - normally
+        all you need is tools.get_site(name), which creates the Site for you
+        based on your config file. We accept a bunch of kwargs, but the only
+        ones you really "need" are `base_url` and `script_path` - this is
+        enough to figure out an API url. `login`, a tuple of
+        (username, password), is highly recommended. `cookiejar` will be used
+        to store cookies, and we'll use a normal CookieJar if none is given.
+
+        First, we'll store the given arguments as attributes, then set up our
+        URL opener. We'll load any of the attributes that weren't given from
+        the API, and then log in if a username/pass was given and we aren't
+        already logged in.
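+
+        A minimal sketch, for a site that isn't in config (the URL values are
+        illustrative):
+
+            >>> site = Site(base_url="http://en.wikipedia.org",
+            ...             script_path="/w", login=("username", "password"))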
+ """ + # attributes referring to site information, filled in by an API query + # if they are missing (and an API url can be determined) + self._name = name + self._project = project + self._lang = lang + self._base_url = base_url + self._article_path = article_path + self._script_path = script_path + self._sql = sql + self._namespaces = namespaces + + # set up cookiejar and URL opener for making API queries + if cookiejar is not None: + self._cookiejar = cookiejar + else: + self._cookiejar = CookieJar() + self._opener = build_opener(HTTPCookieProcessor(self._cookiejar)) + self._opener.addheaders = [("User-Agent", USER_AGENT), + ("Accept-Encoding", "gzip")] + + # get all of the above attributes that were not specified as arguments + self._load_attributes() + + # if we have a name/pass and the API says we're not logged in, log in + self._login_info = name, password = login + if name is not None and password is not None: + logged_in_as = self._get_username_from_cookies() + if logged_in_as is None or name != logged_in_as: + self._login(login) + + def _api_query(self, params): + """Do an API query with `params` as a dict of parameters. + + This will first attempt to construct an API url from self._base_url and + self._script_path. We need both of these, or else we'll raise + SiteAPIError. + + We'll encode the given params, adding format=json along the way, and + make the request through self._opener, which has built-in cookie + support via self._cookiejar, a User-Agent + (wiki.tools.constants.USER_AGENT), and Accept-Encoding set to "gzip". + Assuming everything went well, we'll gunzip the data (if compressed), + load it as a JSON object, and return it. + + If our request failed, we'll raise SiteAPIError with details. + + There's helpful MediaWiki API documentation at + . + """ + if self._base_url is None or self._script_path is None: + e = "Tried to do an API query, but no API URL is known." + raise SiteAPIError(e) + + url = ''.join((self._base_url, self._script_path, "/api.php")) + params["format"] = "json" # this is the only format we understand + data = urlencode(params) + + print url, data # debug code + + try: + response = self._opener.open(url, data) + except URLError as error: + if hasattr(error, "reason"): + e = "API query at {0} failed because {1}." + e = e.format(error.geturl, error.reason) + elif hasattr(error, "code"): + e = "API query at {0} failed; got an error code of {1}." + e = e.format(error.geturl, error.code) + else: + e = "API query failed." + raise SiteAPIError(e) + else: + result = response.read() + if response.headers.get("Content-Encoding") == "gzip": + stream = StringIO(result) + gzipper = GzipFile(fileobj=stream) + result = gzipper.read() + return loads(result) # parse as a JSON object + + def _load_attributes(self, force=False): + """Load data about our Site from the API. + + This function is called by __init__() when one of the site attributes + was not given as a keyword argument. We'll do an API query to get the + missing data, but only if there actually *is* missing data. + + Additionally, you can call this with `force=True` to forcibly reload + all attributes. 
+ """ + # all attributes to be loaded, except _namespaces, which is a special + # case because it requires additional params in the API query + attrs = [self._name, self._project, self._lang, self._base_url, + self._article_path, self._script_path] + + params = {"action": "query", "meta": "siteinfo"} + + if self._namespaces is None or force: + params["siprop"] = "general|namespaces|namespacealiases" + result = self._api_query(params) + self._load_namespaces(result) + elif all(attrs): # everything is already specified and we're not told + return # to force a reload, so do nothing + else: # we're only loading attributes other than _namespaces + params["siprop"] = "general" + result = self._api_query(params) + + res = result["query"]["general"] + self._name = res["wikiid"] + self._project = res["sitename"].lower() + self._lang = res["lang"] + self._base_url = res["server"] + self._article_path = res["articlepath"] + self._script_path = res["scriptpath"] + + def _load_namespaces(self, result): + """Fill self._namespaces with a dict of namespace IDs and names. + + Called by _load_attributes() with API data as `result` when + self._namespaces was not given as an kwarg to __init__(). + """ + self._namespaces = {} + + for namespace in result["query"]["namespaces"].values(): + ns_id = namespace["id"] + name = namespace["*"] + try: + canonical = namespace["canonical"] + except KeyError: + self._namespaces[ns_id] = [name] + else: + if name != canonical: + self._namespaces[ns_id] = [name, canonical] + else: + self._namespaces[ns_id] = [name] + + for namespace in result["query"]["namespacealiases"]: + ns_id = namespace["id"] + alias = namespace["*"] + self._namespaces[ns_id].append(alias) + + def _get_cookie(self, name, domain): + """Return the named cookie unless it is expired or doesn't exist.""" + for cookie in self._cookiejar: + if cookie.name == name and cookie.domain == domain: + if cookie.is_expired(): + break + return cookie + + def _get_username_from_cookies(self): + """Try to return our username based solely on cookies. + + First, we'll look for a cookie named self._name + "Token", like + "enwikiToken". If it exists and isn't expired, we'll assume it's valid + and try to return the value of the cookie self._name + "UserName" (like + "enwikiUserName"). This should work fine on wikis without single-user + login. + + If `enwikiToken` doesn't exist, we'll try to find a cookie named + `centralauth_Token`. If this exists and is not expired, we'll try to + return the value of `centralauth_User`. + + If we didn't get any matches, we'll return None. Our goal here isn't to + return the most likely username, or what we *want* our username to be + (for that, we'd do self._login_info[0]), but rather to get our current + username without an unnecessary ?action=query&meta=userinfo API query. 
+ """ + domain = self.domain() + name = ''.join((self._name, "Token")) + cookie = self._get_cookie(name, domain) + + if cookie is not None: + name = ''.join((self._name, "UserName")) + user_name = self._get_cookie(name, domain) + if user_name is not None: + return user_name.value + + name = "centralauth_Token" + for cookie in self._cookiejar: + if cookie.domain_initial_dot is False or cookie.is_expired(): + continue + if cookie.name != name: + continue + # build a regex that will match domains this cookie affects + search = ''.join(("(.*?)", re_escape(cookie.domain))) + if re_match(search, domain): # test it against our site + user_name = self._get_cookie("centralauth_User", cookie.domain) + if user_name is not None: + return user_name.value + + def _get_username_from_api(self): + """Do a simple API query to get our username and return it. + + This is a reliable way to make sure we are actually logged in, because + it doesn't deal with annoying cookie logic, but it results in an API + query that is unnecessary in some cases. + + Called by _get_username() (in turn called by get_user() with no + username argument) when cookie lookup fails, probably indicating that + we are logged out. + """ + params = {"action": "query", "meta": "userinfo"} + result = self._api_query(params) + return result["query"]["userinfo"]["name"] + + def _get_username(self): + """Return the name of the current user, whether logged in or not. + + First, we'll try to deduce it solely from cookies, to avoid an + unnecessary API query. For the cookie-detection method, see + _get_username_from_cookies()'s docs. + + If our username isn't in cookies, then we're probably not logged in, or + something fishy is going on (like forced logout). In this case, do a + single API query for our username (or IP address) and return that. + """ + name = self._get_username_from_cookies() + if name is not None: + return name + return self._get_username_from_api() + + def _save_cookiejar(self): + """Try to save our cookiejar after doing a (normal) login or logout. + + Calls the standard .save() method with no filename. Don't fret if our + cookiejar doesn't support saving (CookieJar raises AttributeError, + FileCookieJar raises NotImplementedError) or no default filename was + given (LWPCookieJar and MozillaCookieJar raise ValueError). + """ + try: + self._cookiejar.save() + except (AttributeError, NotImplementedError, ValueError): + pass + + def _login(self, login, token=None, attempt=0): + """Safely login through the API. + + Normally, this is called by __init__() if a username and password have + been provided and no valid login cookies were found. The only other + time it needs to be called is when those cookies expire, which is done + automatically by api_query() if a query fails. + + Recent versions of MediaWiki's API have fixed a CSRF vulnerability, + requiring login to be done in two separate requests. If the response + from from our initial request is "NeedToken", we'll do another one with + the token. If login is successful, we'll try to save our cookiejar. + + Raises LoginError on login errors (duh), like bad passwords and + nonexistent usernames. + + `login` is a (username, password) tuple. `token` is the token returned + from our first request, and `attempt` is to prevent getting stuck in a + loop if MediaWiki isn't acting right. 
+ """ + name, password = login + params = {"action": "login", "lgname": name, "lgpassword": password} + if token is not None: + params["lgtoken"] = token + result = self._api_query(params) + res = result["login"]["result"] + + if res == "Success": + self._save_cookiejar() + elif res == "NeedToken" and attempt == 0: + token = result["login"]["token"] + return self._login(login, token, attempt=1) + else: + if res == "Illegal": + e = "The provided username is illegal." + elif res == "NotExists": + e = "The provided username does not exist." + elif res == "EmptyPass": + e = "No password was given." + elif res == "WrongPass" or res == "WrongPluginPass": + e = "The given password is incorrect." + else: + e = "Couldn't login; server says '{0}'.".format(res) + raise LoginError(e) + + def _logout(self): + """Safely logout through the API. + + We'll do a simple API request (api.php?action=logout), clear our + cookiejar (which probably contains now-invalidated cookies) and try to + save it, if it supports that sort of thing. + """ + params = {"action": "logout"} + self._api_query(params) + self._cookiejar.clear() + self._save_cookiejar() + + def api_query(self, **kwargs): + """Do an API query with `kwargs` as the parameters. + + See _api_query()'s documentation for details. + """ + return self._api_query(kwargs) + + def name(self): + """Returns the Site's name (or "wikiid" in the API), like "enwiki".""" + return self._name + + def project(self): + """Returns the Site's project name in lowercase, like "wikipedia".""" + return self._project + + def lang(self): + """Returns the Site's language code, like "en" or "es".""" + return self._lang + + def domain(self): + """Returns the Site's web domain, like "en.wikipedia.org".""" + return urlparse(self._base_url).netloc + + def namespace_id_to_name(self, ns_id, all=False): + """Given a namespace ID, returns associated namespace names. + + If all is False (default), we'll return the first name in the list, + which is usually the localized version. Otherwise, we'll return the + entire list, which includes the canonical name. + + For example, returns u"Wikipedia" if ns_id=4 and all=False on enwiki; + returns [u"Wikipedia", u"Project"] if ns_id=4 and all=True. + + Raises NamespaceNotFoundError if the ID is not found. + """ + try: + if all: + return self._namespaces[ns_id] + else: + return self._namespaces[ns_id][0] + except KeyError: + e = "There is no namespace with id {0}.".format(ns_id) + raise NamespaceNotFoundError(e) + + def namespace_name_to_id(self, name): + """Given a namespace name, returns the associated ID. + + Like namespace_id_to_name(), but reversed. Case is ignored, because + namespaces are assumed to be case-insensitive. + + Raises NamespaceNotFoundError if the name is not found. + """ + lname = name.lower() + for ns_id, names in self._namespaces.items(): + lnames = [n.lower() for n in names] # be case-insensitive + if lname in lnames: + return ns_id + + e = "There is no namespace with name '{0}'.".format(name) + raise NamespaceNotFoundError(e) + + def get_page(self, title, follow_redirects=False): + """Returns a Page object for the given title (pagename). + + Will return a Category object instead if the given title is in the + category namespace. As Category is a subclass of Page, this should not + cause problems. + + Note that this doesn't do any direct checks for existence or + redirect-following - Page's methods provide that. 
+ """ + prefixes = self.namespace_id_to_name(NS_CATEGORY, all=True) + prefix = title.split(":", 1)[0] + if prefix != title: # avoid a page that is simply "Category" + if prefix in prefixes: + return Category(self, title, follow_redirects) + return Page(self, title, follow_redirects) + + def get_category(self, catname, follow_redirects=False): + """Returns a Category object for the given category name. + + `catname` should be given *without* a namespace prefix. This method is + really just shorthand for get_page("Category:" + catname). + """ + prefix = self.namespace_id_to_name(NS_CATEGORY) + pagename = ':'.join((prefix, catname)) + return Category(self, pagename, follow_redirects) + + def get_user(self, username=None): + """Returns a User object for the given username. + + If `username` is left as None, then a User object representing the + currently logged-in (or anonymous!) user is returned. + """ + if username is None: + username = self._get_username() + return User(self, username) diff --git a/wiki/tools/user.py b/wiki/tools/user.py new file mode 100644 index 0000000..3b0173f --- /dev/null +++ b/wiki/tools/user.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- + +from time import strptime + +from wiki.tools.constants import * +from wiki.tools.exceptions import UserNotFoundError +from wiki.tools.page import Page + +class User(object): + """ + EarwigBot's Wiki Toolset: User Class + + Represents a User on a given Site. Has methods for getting a bunch of + information about the user, such as editcount and user rights, methods for + returning the user's userpage and talkpage, etc. + + Public methods: + name -- returns the user's username + exists -- returns True if the user exists, False if they do not + userid -- returns an integer ID representing the user + blockinfo -- returns information about a current block on the user + groups -- returns a list of the user's groups + rights -- returns a list of the user's rights + editcount -- returns the number of edits made by the user + registration -- returns the time the user registered as a time.struct_time + emailable -- returns True if you can email the user, False if you cannot + gender -- returns the user's gender ("male", "female", or "unknown") + get_userpage -- returns a Page object representing the user's userpage + get_talkpage -- returns a Page object representing the user's talkpage + """ + + def __init__(self, site, name): + """Constructor for new User instances. + + Takes two arguments, a Site object (necessary for doing API queries), + and the name of the user, preferably without "User:" in front, although + this prefix will be automatically removed by the API if given. + + You can also use site.get_user() instead, which returns a User object, + and is preferred. + + We won't do any API queries yet for basic information about the user - + save that for when the information is requested. + """ + self._site = site + self._name = name + + def _get_attribute(self, attr, force): + """Internally used to get an attribute by name. + + We'll call _load_attributes() to get this (and all other attributes) + from the API if it is not already defined. If `force` is True, we'll + re-load them even if they've already been loaded. + + Raises UserNotFoundError if a nonexistant user prevents us from + returning a certain attribute. 
+ """ + if not hasattr(self, attr) or force: + self._load_attributes() + if self._exists is False: + e = "User '{0}' does not exist.".format(self._name) + raise UserNotFoundError(e) + return getattr(self, attr) + + def _load_attributes(self): + """Internally used to load all attributes from the API. + + Normally, this is called by _get_attribute() when a requested attribute + is not defined. This defines it. + """ + params = {"action": "query", "list": "users", "ususers": self._name, + "usprop": "blockinfo|groups|rights|editcount|registration|emailable|gender"} + result = self._site._api_query(params) + res = result["query"]["users"][0] + + # normalize our username in case it was entered oddly + self._name = res["name"] + + try: + self._userid = res["userid"] + except KeyError: # userid is missing, so user does not exist + self._exists = False + return + + self._exists = True + + try: + self._blockinfo = { + "by": res["blockedby"], + "reason": res["blockreason"], + "expiry": res["blockexpiry"] + } + except KeyError: + self._blockinfo = False + + self._groups = res["groups"] + self._rights = res["rights"].values() + self._editcount = res["editcount"] + + reg = res["registration"] + self._registration = strptime(reg, "%Y-%m-%dT%H:%M:%SZ") + + try: + res["emailable"] + except KeyError: + self._emailable = False + else: + self._emailable = True + + self._gender = res["gender"] + + def name(self, force=False): + """Returns the user's name. + + If `force` is True, we will load the name from the API and return that. + This could potentially return a "normalized" version of the name - for + example, without a "User:" prefix or without underscores. Unlike other + attribute getters, this will never make an API query without `force`. + + Note that if another attribute getter, like exists(), has already been + called, then the username has already been normalized. + """ + if force: + self._load_attributes() + return self._name + + def exists(self, force=False): + """Returns True if the user exists, or False if they do not. + + Makes an API query if `force` is True or if we haven't made one + already. + """ + if not hasattr(self, "_exists") or force: + self._load_attributes() + return self._exists + + def userid(self, force=False): + """Returns an integer ID used by MediaWiki to represent the user. + + Raises UserNotFoundError if the user does not exist. Makes an API query + if `force` is True or if we haven't made one already. + """ + return self._get_attribute("_userid", force) + + def blockinfo(self, force=False): + """Returns information about a current block on the user. + + If the user is not blocked, returns False. If they are, returns a dict + with three keys: "by" is the blocker's username, "reason" is the reason + why they were blocked, and "expiry" is when the block expires. + + Raises UserNotFoundError if the user does not exist. Makes an API query + if `force` is True or if we haven't made one already. + """ + return self._get_attribute("_blockinfo", force) + + def groups(self, force=False): + """Returns a list of groups this user is in, including "*". + + Raises UserNotFoundError if the user does not exist. Makes an API query + if `force` is True or if we haven't made one already. + """ + return self._get_attribute("_groups", force) + + def rights(self, force=False): + """Returns a list of this user's rights. + + Raises UserNotFoundError if the user does not exist. Makes an API query + if `force` is True or if we haven't made one already. 
+ """ + return self._get_attribute("_rights", force) + + def editcount(self, force=False): + """Returns the number of edits made by the user. + + Raises UserNotFoundError if the user does not exist. Makes an API query + if `force` is True or if we haven't made one already. + """ + return self._get_attribute("_editcount", force) + + def registration(self, force=False): + """Returns the time the user registered as a time.struct_time object. + + Raises UserNotFoundError if the user does not exist. Makes an API query + if `force` is True or if we haven't made one already. + """ + return self._get_attribute("_registration", force) + + def emailable(self, force=False): + """Returns True if the user can be emailed, or False if they cannot. + + Raises UserNotFoundError if the user does not exist. Makes an API query + if `force` is True or if we haven't made one already. + """ + return self._get_attribute("_emailable", force) + + def gender(self, force=False): + """Returns the user's gender. + + Can return either "male", "female", or "unknown", if they did not + specify it. + + Raises UserNotFoundError if the user does not exist. Makes an API query + if `force` is True or if we haven't made one already. + """ + return self._get_attribute("_gender", force) + + def get_userpage(self): + """Returns a Page object representing the user's userpage. + + No checks are made to see if it exists or not. Proper site namespace + conventions are followed. + """ + prefix = self._site.namespace_id_to_name(NS_USER) + pagename = ':'.join((prefix, self._name)) + return Page(self._site, pagename) + + def get_talkpage(self): + """Returns a Page object representing the user's talkpage. + + No checks are made to see if it exists or not. Proper site namespace + conventions are followed. + """ + prefix = self._site.namespace_id_to_name(NS_USER_TALK) + pagename = ':'.join((prefix, self._name)) + return Page(self._site, pagename)