Browse Source

Merge branch 'feature/wikitools-core' into develop

Ben Kurtovic 13 years ago
11 changed files with 1417 additions and 42 deletions
  1. +3
  2. +7
  3. +8
  4. +20
  5. +30
  6. +35
  7. +47
  8. +181
  9. +414
  10. +446
  11. +226

+ 3
- 0
.gitignore View File

@@ -4,6 +4,9 @@
# Ignore bot-specific config file:

# Ignore cookies file:

# Ignore OS X's crud:

+ 7
- 22
irc/commands/ View File

@@ -3,12 +3,11 @@
"""Report the status of AFC submissions, either as an automatic message on join
or a request via !status."""

import json
import re
import urllib

from core import config
from irc.classes import BaseCommand
from wiki import tools

class AFCStatus(BaseCommand):
def get_hooks(self):
@@ -29,6 +28,8 @@ class AFCStatus(BaseCommand):
return False

def process(self, data): = tools.get_site()

if data.line[1] == "JOIN":
notice = self.get_join_notice()
self.connection.notice(data.nick, notice)
@@ -85,19 +86,15 @@ class AFCStatus(BaseCommand):

def count_submissions(self):
"""Returns the number of open AFC submissions (count of CAT:PEND)."""
params = {'action': 'query', 'list': 'categorymembers', 'cmlimit':'500', 'format': 'json'}
params['cmtitle'] = "Category:Pending_AfC_submissions"
data = urllib.urlencode(params)
raw = urllib.urlopen("", data).read()
res = json.loads(raw)
subs = len(res['query']['categorymembers'])
cat ="Pending AfC submissions")
subs = cat.members(limit=500)
subs -= 2 # remove [[Wikipedia:Articles for creation/Redirects]] and [[Wikipedia:Files for upload]], which aren't real submissions
return subs

def count_redirects(self):
"""Returns the number of open redirect submissions. Calculated as the
total number of submissions minus the closed ones."""
content = self.get_page("Wikipedia:Articles_for_creation/Redirects")
content ="Wikipedia:Articles for creation/Redirects").get()
total = len(re.findall("^\s*==(.*?)==\s*$", content, re.MULTILINE))
closed = content.lower().count("{{afc-c|b}}")
redirs = total - closed
@@ -106,24 +103,12 @@ class AFCStatus(BaseCommand):
def count_files(self):
"""Returns the number of open WP:FFU (Files For Upload) requests.
Calculated as the total number of requests minus the closed ones."""
content = self.get_page("Wikipedia:Files_for_upload")
content ="Wikipedia:Files for upload").get()
total = len(re.findall("^\s*==(.*?)==\s*$", content, re.MULTILINE))
closed = content.lower().count("{{ifu-c|b}}")
files = total - closed
return files

def get_page(self, pagename):
"""Simple method to return the content of the page 'pagename'. Will be
a part of wiki/tools/ when I finish that."""
params = {'action': 'query', 'prop': 'revisions', 'rvprop':'content', 'rvlimit':'1', 'format': 'json'}
params['titles'] = pagename
data = urllib.urlencode(params)
raw = urllib.urlopen("", data).read()
res = json.loads(raw)
pageid = res['query']['pages'].keys()[0]
content = res['query']['pages'][pageid]['revisions'][0]['*']
return content

def get_aggregate(self, num):
"""Returns a human-readable AFC status based on the number of pending
AFC submissions, open redirect requests, and open FFU requests. This

+ 8
- 20
irc/commands/ View File

@@ -4,10 +4,8 @@
Retrieve a list of user rights for a given username via the API.

import json
import urllib

from irc.classes import BaseCommand
from wiki import tools

class Rights(BaseCommand):
def get_hooks(self):
@@ -27,24 +25,14 @@ class Rights(BaseCommand):

username = ' '.join(data.args)
rights = self.get_rights(username)
site = tools.get_site()
user = site.get_user(username)
rights = user.groups()
if rights:
rights.remove("*") # remove the implicit '*' group given to everyone
except ValueError:
self.connection.reply(data, "the rights for \x0302{0}\x0301 are {1}.".format(username, ', '.join(rights)))
self.connection.reply(data, "the user \x0302{0}\x0301 has no rights, or does not exist.".format(username))

def get_rights(self, username):
params = {'action': 'query', 'format': 'json', 'list': 'users', 'usprop': 'groups'}
params['ususers'] = username
data = urllib.urlencode(params)
raw = urllib.urlopen("", data).read()
res = json.loads(raw)
rights = res['query']['users'][0]['groups']
except KeyError: # 'groups' not found, meaning the user does not exist
return None
rights.remove("*") # remove the implicit '*' group given to everyone
except ValueError: # I don't expect this to happen, but if it does, be prepared
return rights

+ 20
- 0
wiki/tools/ View File

@@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-

EarwigBot's Wiki Toolset

This is a collection of classes and functions to read from and write to
Wikipedia and other wiki sites. No connection whatsoever to python-wikitools
written by Mr.Z-man, other than a similar purpose. We share no code.

Import the toolset with `from wiki import tools`.

from import *
from import *
from import *

from import Category
from import Page
from import Site
from import User

+ 30
- 0
wiki/tools/ View File

@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-

from import Page

class Category(Page):
EarwigBot's Wiki Toolset: Category Class

Represents a Category on a given Site, a subclass of Page. Provides
additional methods, but Page's own methods should work fine on Category
objects. Site.get_page() will return a Category instead of a Page if the
given title is in the category namespace; get_category() is shorthand,
because it accepts category names without the namespace prefix.

Public methods:
members -- returns a list of titles in the category

def members(self, limit=50):
    """Return a list of titles in the category.

    If `limit` is provided, we will return this many titles, or fewer if
    the category is too small. `limit` defaults to 50; normal users can go
    up to 500, and bots can go up to 5,000 on a single API query.
    """
    # `title` is a method on Page, so the stored string `_title` must be
    # sent; `self.title` would hand the API a bound method instead.
    params = {"action": "query", "list": "categorymembers",
              "cmlimit": limit, "cmtitle": self._title}
    result = self._site._api_query(params)
    members = result['query']['categorymembers']
    return [member["title"] for member in members]

+ 35
- 0
wiki/tools/ View File

@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-

EarwigBot's Wiki Toolset: Constants

This module defines some useful constants, such as default namespace IDs for
easy lookup and our user agent.

Import with `from wiki.tools.constants import *`.

import platform

# User agent when making API queries
USER_AGENT = "EarwigBot/0.1-dev (Python/{0};".format(platform.python_version())

# Default namespace IDs
NS_HELP = 12

+ 47
- 0
wiki/tools/ View File

@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-

EarwigBot's Wiki Toolset: Exceptions

This module contains all exceptions used by the wiki.tools package.

class WikiToolsetError(Exception):
    """Base exception class for errors in the Wiki Toolset."""


class SiteNotFoundError(WikiToolsetError):
    """A site matching the args given to get_site() could not be found in the
    config file."""


class SiteAPIError(WikiToolsetError):
    """We couldn't connect to a site's API, perhaps because the server doesn't
    exist, our URL is wrong or incomplete, or they're having temporary
    problems."""


class LoginError(WikiToolsetError):
    """An error occurred while trying to login. Perhaps the username/password is
    incorrect."""


class PermissionsError(WikiToolsetError):
    """We tried to do something we don't have permission to, like a non-admin
    trying to delete a page, or trying to edit a page when no login information
    was provided."""


class NamespaceNotFoundError(WikiToolsetError):
    """A requested namespace name or namespace ID does not exist."""


class PageNotFoundError(WikiToolsetError):
    """Attempting to get certain information about a page that does not
    exist."""


class InvalidPageError(WikiToolsetError):
    """Attempting to get certain information about a page whose title is
    invalid."""


class RedirectError(WikiToolsetError):
    """Page's get_redirect_target() method failed because the page is either
    not a redirect, or it is malformed."""


class UserNotFoundError(WikiToolsetError):
    """Attempting to get certain information about a user that does not
    exist."""

+ 181
- 0
wiki/tools/ View File

@@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-

EarwigBot's Wiki Toolset: Misc Functions

This module, a component of the wiki.tools package, contains miscellaneous
functions that are not methods of any class, like get_site().

There's no need to import this module explicitly. All functions here are
automatically available from wiki.tools.

from cookielib import LWPCookieJar, LoadError
import errno
from getpass import getpass
from os import chmod, path
import stat

from core import config
from import SiteNotFoundError
from import Site

__all__ = ["get_site"]

_cookiejar = None

def _load_config():
"""Called by a config-requiring function, such as get_site(), when config
has not been loaded. This will usually happen only if we're running code
directly from Python's interpreter and not the bot itself, because the bot's own startup code will already call these functions.
is_encrypted = config.verify_config()
if is_encrypted: # passwords in the config file are encrypted
key = getpass("Enter key to unencrypt bot passwords: ")

def _get_cookiejar():
"""Returns a LWPCookieJar object loaded from our .cookies file. The same
one is returned every time.

The .cookies file is located in the project root, the same directory as
config.json. If it doesn't exist, we will create the file
and set it to be readable and writeable only by us. If it exists but the
information inside is bogus, we will ignore it.

This is normally called by _get_site_object_from_dict() (in turn called by
get_site()), and the cookiejar is passed to our Site's constructor, used
when it makes API queries. This way, we can easily preserve cookies between
sites (e.g., for CentralAuth), making logins easier.
global _cookiejar
if _cookiejar is not None:
return _cookiejar

cookie_file = path.join(config.root_dir, ".cookies")
_cookiejar = LWPCookieJar(cookie_file)

except LoadError:
# file contains bad data, so ignore it completely
except IOError as e:
if e.errno == errno.ENOENT: # "No such file or directory"
# create the file and restrict reading/writing only to the owner,
# so others can't peek at our cookies
open(cookie_file, "w").close()
chmod(cookie_file, stat.S_IRUSR|stat.S_IWUSR)

return _cookiejar

def _get_site_object_from_dict(name, d):
"""Return a Site object based on the contents of a dict, probably acquired
through our config file, and a separate name.
project = d.get("project")
lang = d.get("lang")
base_url = d.get("baseURL")
article_path = d.get("articlePath")
script_path = d.get("scriptPath")
sql = (d.get("sqlServer"), d.get("sqlDB"))
namespaces = d.get("namespaces")
login = ("username"),"password"))
cookiejar = _get_cookiejar()

return Site(name=name, project=project, lang=lang, base_url=base_url,
article_path=article_path, script_path=script_path, sql=sql,
namespaces=namespaces, login=login, cookiejar=cookiejar)

def get_site(name=None, project=None, lang=None):
    """Return a Site instance based on information from our config file.

    With no arguments, return the default site as specified by our config
    file: the entry of the "sites" mapping named by "defaultSite".

    With `name` specified, return the site stored under that name in "sites".

    With `project` and `lang` specified, return the member `s` of "sites"
    for which s["project"] == project and s["lang"] == lang.

    Specifying a project without a lang or a lang without a project will raise
    TypeError. If all three args are specified, `name` will be first tried,
    then `project` and `lang`. If, with any number of args, a site cannot be
    found in the config, SiteNotFoundError is raised.
    """
    # check if config has been loaded, and load it if it hasn't
    if not config.is_config_loaded():
        _load_config()

    # someone specified a project without a lang (or a lang without a project)!
    if (project is None) != (lang is None):
        e = "Keyword arguments 'lang' and 'project' must be specified together."
        raise TypeError(e)

    # NOTE(review): the accessor for the wiki section of the config is not
    # legible in the reviewed text; `config.wiki` is assumed -- confirm.
    wiki = config.wiki

    # no args given, so return our default site (project is None implies lang
    # is None, so we don't need to add that in)
    if name is None and project is None:
        try:
            default = wiki["defaultSite"]
        except KeyError:
            e = "Default site is not specified in config."
            raise SiteNotFoundError(e)
        try:
            site = wiki["sites"][default]
        except KeyError:
            e = "Default site specified by config is not in the config's sites list."
            raise SiteNotFoundError(e)
        return _get_site_object_from_dict(default, site)

    # name arg given, but don't look at others unless `name` isn't found
    if name is not None:
        try:
            site = wiki["sites"][name]
        except KeyError:
            if project is None:  # implies lang is None, so only name was given
                e = "Site '{0}' not found in config.".format(name)
                raise SiteNotFoundError(e)
            for sitename, site in wiki["sites"].items():
                if site["project"] == project and site["lang"] == lang:
                    return _get_site_object_from_dict(sitename, site)
            # The formatted message must be what gets raised; str.format()
            # returns a new string and leaves the template untouched.
            e = "Neither site '{0}' nor site '{1}:{2}' found in config."
            raise SiteNotFoundError(e.format(name, project, lang))
        else:
            return _get_site_object_from_dict(name, site)

    # if we end up here, then project and lang are both not None
    for sitename, site in wiki["sites"].items():
        if site["project"] == project and site["lang"] == lang:
            return _get_site_object_from_dict(sitename, site)
    e = "Site '{0}:{1}' not found in config.".format(project, lang)
    raise SiteNotFoundError(e)

def add_site():
"""STUB: config editing is required first.

Returns True if the site was added successfully or False if the site was
already in our config. Raises ConfigError if saving the updated file failed
for some reason."""

def del_site(name):
"""STUB: config editing is required first.

Returns True if the site was removed successfully or False if the site was
not in our config originally. Raises ConfigError if saving the updated file
failed for some reason."""

+ 414
- 0
wiki/tools/ View File

@@ -0,0 +1,414 @@
# -*- coding: utf-8 -*-

import re
from urllib import quote

from import *

class Page(object):
EarwigBot's Wiki Toolset: Page Class

Represents a Page on a given Site. Has methods for getting information
about the page, getting page content, and so on. Category is a subclass of
Page with additional methods.

Public methods:
title -- returns the page's title, or pagename
exists -- returns whether the page exists
pageid -- returns an integer ID representing the page
url -- returns the page's URL
namespace -- returns the page's namespace as an integer
protection -- returns the page's current protection status
is_talkpage -- returns True if the page is a talkpage, else False
is_redirect -- returns True if the page is a redirect, else False
toggle_talk -- returns a content page's talk page, or vice versa
get -- returns page content
get_redirect_target -- if the page is a redirect, returns its destination

def __init__(self, site, title, follow_redirects=False):
"""Constructor for new Page instances.

Takes three arguments: a Site object, the Page's title (or pagename),
and whether or not to follow redirects (optional, defaults to False).

As with User, site.get_page() is preferred. Site's method has support
for a default `follow_redirects` value in our config, while __init__
always defaults to False.

__init__ will not do any API queries, but it will use basic namespace
logic to determine our namespace ID and if we are a talkpage.
self._site = site
self._title = title.strip()
self._follow_redirects = self._keep_following = follow_redirects

self._exists = 0
self._pageid = None
self._is_redirect = None
self._lastrevid = None
self._protection = None
self._fullurl = None
self._content = None

# Try to determine the page's namespace using our site's namespace
# converter:
prefix = self._title.split(":", 1)[0]
if prefix != title: # ignore a page that's titled "Category" or "User"
self._namespace = self._site.namespace_name_to_id(prefix)
except NamespaceNotFoundError:
self._namespace = 0
self._namespace = 0

# Is this a talkpage? Talkpages have odd IDs, while content pages have
# even IDs, excluding the "special" namespaces:
if self._namespace < 0:
self._is_talkpage = False
self._is_talkpage = self._namespace % 2 == 1

def _force_validity(self):
"""Used to ensure that our page's title is valid.

If this method is called when our page is not valid (and after
_load_attributes() has been called), InvalidPageError will be raised.

Note that validity != existence. If a page's title is invalid (e.g, it
contains "[") it will always be invalid, and cannot be edited.
if self._exists == 1:
e = "Page '{0}' is invalid.".format(self._title)
raise InvalidPageError(e)

def _force_existence(self):
"""Used to ensure that our page exists.

If this method is called when our page doesn't exist (and after
_load_attributes() has been called), PageNotFoundError will be raised.
It will also call _force_validity() beforehand.
if self._exists == 2:
e = "Page '{0}' does not exist.".format(self._title)
raise PageNotFoundError(e)

def _load_wrapper(self):
"""Calls _load_attributes() and follows redirects if we're supposed to.

This method will only follow redirects if follow_redirects=True was
passed to __init__() (perhaps indirectly passed by site.get_page()).
It avoids the API's &redirects param in favor of manual following,
so we can act more realistically (we don't follow double redirects, and
circular redirects don't break us).

This will raise RedirectError if we have a problem following, but that
is a bug and should NOT happen.

If we're following a redirect, this will make a grand total of three
API queries. It's a lot, but each one is quite small.

if self._keep_following and self._is_redirect:
self._title = self.get_redirect_target()
self._keep_following = False # don't follow double redirects
self._content = None # reset the content we just loaded

def _load_attributes(self, result=None):
"""Loads various data from the API in a single query.

Loads self._title, ._exists, ._is_redirect, ._pageid, ._fullurl,
._protection, ._namespace, ._is_talkpage, and ._lastrevid using the
API. It will do a query of its own unless `result` is provided, in
which case we'll pretend `result` is what the query returned.

Assuming the API is sound, this should not raise any exceptions.
if result is None:
params = {"action": "query", "prop": "info", "titles": self._title,
"inprop": "protection|url"}
result = self._site._api_query(params)

res = result["query"]["pages"].values()[0]

# Normalize our pagename/title thing:
self._title = res["title"]

except KeyError:
self._is_redirect = False
self._is_redirect = True

self._pageid = result["query"]["pages"].keys()[0]
if int(self._pageid) < 0:
except KeyError:
# If it has a negative ID and it's invalid, then break here,
# because there's no other data for us to get:
self._exists = 1
# If it has a negative ID and it's missing; we can still get
# data like the namespace, protection, and URL:
self._exists = 2
self._exists = 3

self._fullurl = res["fullurl"]
self._protection = res["protection"]

# We've determined the namespace and talkpage status in __init__()
# based on the title, but now we can be sure:
self._namespace = res["ns"]
self._is_talkpage = self._namespace % 2 == 1 # talkpages have odd IDs

# This last field will only be specified if the page exists:
self._lastrevid = res["lastrevid"]
except KeyError:

def _load_content(self, result=None):
"""Loads current page content from the API.

If `result` is provided, we'll pretend that is the result of an API
query and try to get content from that. Otherwise, we'll do an API
query on our own.

Don't call this directly, ever - use .get(force=True) if you want to
force content reloading.
if result is None:
params = {"action": "query", "prop": "revisions", "rvlimit": 1,
"rvprop": "content", "titles": self._title}
result = self._site._api_query(params)

res = result["query"]["pages"].values()[0]
content = res["revisions"][0]["*"]
self._content = content
except KeyError:
# This can only happen if the page was deleted since we last called
# self._load_attributes(). In that case, some of our attributes are
# outdated, so force another self._load_attributes():

def title(self, force=False):
"""Returns the Page's title, or pagename.

This won't do any API queries on its own unless force is True, in which
case the title will be forcibly reloaded from the API (normalizing it,
and following redirects if follow_redirects=True was passed to
__init__()). Any other methods that do API queries will reload title on
their own, however, like exists() and get().
if force:
return self._title

def exists(self, force=False):
"""Returns information about whether the Page exists or not.

The returned "information" is a tuple with two items. The first is a
bool, either True if the page exists or False if it does not. The
second is a string giving more information, either "invalid", (title
is invalid, e.g. it contains "["), "missing", or "exists".

Makes an API query if force is True or if we haven't already made one.
cases = {
0: (None, "unknown"),
1: (False, "invalid"),
2: (False, "missing"),
3: (True, "exists"),
if self._exists == 0 or force:
return cases[self._exists]

def pageid(self, force=False):
"""Returns an integer ID representing the Page.

Makes an API query if force is True or if we haven't already made one.

Raises InvalidPageError or PageNotFoundError if the page name is
invalid or the page does not exist, respectively.
if self._exists == 0 or force:
self._force_existence() # missing pages do not have IDs
return self._pageid

def url(self, force=False):
"""Returns the page's URL.

Like title(), this won't do any API queries on its own unless force is
True. If the API was never queried for this page, we will attempt to
determine the URL ourselves based on the title.
if force:
if self._fullurl is not None:
return self._fullurl
slug = quote(self._title.replace(" ", "_"), safe="/:")
path = self._site._article_path.replace("$1", slug)
return ''.join((self._site._base_url, path))

def namespace(self, force=False):
"""Returns the page's namespace ID (an integer).

Like title(), this won't do any API queries on its own unless force is
True. If the API was never queried for this page, we will attempt to
determine the namespace ourselves based on the title.
if force:
return self._namespace

def protection(self, force=False):
"""Returns the page's current protection status.

Makes an API query if force is True or if we haven't already made one.

Raises InvalidPageError if the page name is invalid. Will not raise an
error if the page is missing because those can still be protected.
if self._exists == 0 or force:
self._force_validity() # invalid pages cannot be protected
return self._protection

def is_talkpage(self, force=False):
"""Returns True if the page is a talkpage, else False.

Like title(), this won't do any API queries on its own unless force is
True. If the API was never queried for this page, we will attempt to
determine the talkpage status ourselves based on its namespace ID.
if force:
return self._is_talkpage

def is_redirect(self, force=False):
"""Returns True if the page is a redirect, else False.

Makes an API query if force is True or if we haven't already made one.

We will return False even if the page does not exist or is invalid.
if self._exists == 0 or force:
return self._is_redirect

def toggle_talk(self, force=False, follow_redirects=None):
"""Returns a content page's talk page, or vice versa.

The title of the new page is determined by namespace logic, not API
queries. We won't make any API queries on our own unless force is True,
and the only reason then would be to forcibly update the title or
follow redirects if we haven't already made an API query.

If `follow_redirects` is anything other than None (the default), it
will be passed to the new Page's __init__(). Otherwise, we'll use the
value passed to our own __init__().

Will raise InvalidPageError if we try to get the talk page of a special
page (in the Special: or Media: namespaces), but we won't raise an
exception if our page is otherwise missing or invalid.
if force:
if self._namespace < 0:
ns = self._site.namespace_id_to_name(self._namespace)
e = "Pages in the {0} namespace can't have talk pages.".format(ns)
raise InvalidPageError(e)

if self._is_talkpage:
new_ns = self._namespace - 1
new_ns = self._namespace + 1

body = self._title.split(":", 1)[1]
except IndexError:
body = self._title

new_prefix = self._site.namespace_id_to_name(new_ns)

# If the new page is in namespace 0, don't do ":Title" (it's correct,
# but unnecessary), just do "Title":
if new_prefix:
new_title = ':'.join((new_prefix, body))
new_title = body

if follow_redirects is None:
follow_redirects = self._follow_redirects
return Page(self._site, new_title, follow_redirects)

def get(self, force=False):
"""Returns page content, which is cached if you try to call get again.

Use `force` to forcibly reload page content even if we've already
loaded some. This is good if you want to edit a page multiple times,
and you want to get updated content before you make your second edit.

Raises InvalidPageError or PageNotFoundError if the page name is
invalid or the page does not exist, respectively.
if force or self._exists == 0:
# Kill two birds with one stone by doing an API query for both our
# attributes and our page content:
params = {"action": "query", "rvprop": "content", "rvlimit": 1,
"prop": "info|revisions", "inprop": "protection|url",
"titles": self._title}
result = self._site._api_query(params)

# Follow redirects if we're told to:
if self._keep_following and self._is_redirect:
self._title = self.get_redirect_target()
self._keep_following = False # don't follow double redirects
self._content = None # reset the content we just loaded

return self._content

# Make sure we're dealing with a real page here. This may be outdated
# if the page was deleted since we last called self._load_attributes(),
# but self._load_content() can handle that:

if self._content is None:

return self._content

def get_redirect_target(self, force=False):
    """If the page is a redirect, return its destination.

    Use `force` to forcibly reload content even if we've already loaded
    some before. Note that this method calls get() for page content.

    Raises InvalidPageError or PageNotFoundError if the page name is
    invalid or the page does not exist, respectively. Raises RedirectError
    if the page is not a redirect.
    """
    content = self.get(force)
    # Raw string, so the backslashes reach the regex engine unchanged.
    regexp = r"^\s*\#\s*redirect\s*\[\[(.*?)\]\]"
    try:
        return re.findall(regexp, content, flags=re.IGNORECASE)[0]
    except IndexError:
        # findall() returned no match: the content has no redirect link.
        e = "The page does not appear to have a redirect target."
        raise RedirectError(e)

+ 446
- 0
wiki/tools/ View File

@@ -0,0 +1,446 @@
# -*- coding: utf-8 -*-

from cookielib import CookieJar
from gzip import GzipFile
from json import loads
from re import escape as re_escape, match as re_match
from StringIO import StringIO
from urllib import unquote_plus, urlencode
from urllib2 import build_opener, HTTPCookieProcessor, URLError
from urlparse import urlparse

from import Category
from import *
from import *
from import Page
from import User

class Site(object):
EarwigBot's Wiki Toolset: Site Class

Represents a Site, with support for API queries and returning Pages, Users,
and Categories. The constructor takes a bunch of arguments and you probably
won't need to call it directly, rather tools.get_site() for returning Site
instances, tools.add_site() for adding new ones to config, and
tools.del_site() for removing old ones from config, should suffice.

Public methods:
name -- returns our name (or "wikiid"), like "enwiki"
project -- returns our project name, like "wikipedia"
lang -- returns our language code, like "en"
domain -- returns our web domain, like "en.wikipedia.org"
api_query -- does an API query with the given kwargs as params
namespace_id_to_name -- given a namespace ID, returns associated name(s)
namespace_name_to_id -- given a namespace name, returns associated id
get_page -- returns a Page object for the given title
get_category -- returns a Category object for the given title
get_user -- returns a User object for the given username

def __init__(self, name=None, project=None, lang=None, base_url=None,
article_path=None, script_path=None, sql=(None, None),
namespaces=None, login=(None, None), cookiejar=None):
"""Constructor for new Site instances.

This probably isn't necessary to call yourself unless you're building a
Site that's not in your config and you don't want to add it - normally
all you need is tools.get_site(name), which creates the Site for you
based on your config file. We accept a bunch of kwargs, but the only
ones you really "need" are `base_url` and `script_path` - this is
enough to figure out an API url. `login`, a tuple of
(username, password), is highly recommended. `cookiejar` will be used
to store cookies, and we'll use a normal CookieJar if none is given.

First, we'll store the given arguments as attributes, then set up our
URL opener. We'll load any of the attributes that weren't given from
the API, and then log in if a username/pass was given and we aren't
already logged in.
# attributes referring to site information, filled in by an API query
# if they are missing (and an API url can be determined)
self._name = name
self._project = project
self._lang = lang
self._base_url = base_url
self._article_path = article_path
self._script_path = script_path
self._sql = sql
self._namespaces = namespaces

# set up cookiejar and URL opener for making API queries
if cookiejar is not None:
self._cookiejar = cookiejar
self._cookiejar = CookieJar()
self._opener = build_opener(HTTPCookieProcessor(self._cookiejar))
self._opener.addheaders = [("User-Agent", USER_AGENT),
("Accept-Encoding", "gzip")]

# get all of the above attributes that were not specified as arguments

# if we have a name/pass and the API says we're not logged in, log in
self._login_info = name, password = login
if name is not None and password is not None:
logged_in_as = self._get_username_from_cookies()
if logged_in_as is None or name != logged_in_as:

def _api_query(self, params):
"""Do an API query with `params` as a dict of parameters.

This will first attempt to construct an API url from self._base_url and
self._script_path. We need both of these, or else we'll raise

We'll encode the given params, adding format=json along the way, and
make the request through self._opener, which has built-in cookie
support via self._cookiejar, a User-Agent
(, and Accept-Encoding set to "gzip".
Assuming everything went well, we'll gunzip the data (if compressed),
load it as a JSON object, and return it.

If our request failed, we'll raise SiteAPIError with details.

There's helpful MediaWiki API documentation at
if self._base_url is None or self._script_path is None:
e = "Tried to do an API query, but no API URL is known."
raise SiteAPIError(e)

url = ''.join((self._base_url, self._script_path, "/api.php"))
params["format"] = "json" # this is the only format we understand
data = urlencode(params)

print url, data # debug code

response =, data)
except URLError as error:
if hasattr(error, "reason"):
e = "API query at {0} failed because {1}."
e = e.format(error.geturl, error.reason)
elif hasattr(error, "code"):
e = "API query at {0} failed; got an error code of {1}."
e = e.format(error.geturl, error.code)
e = "API query failed."
raise SiteAPIError(e)
result =
if response.headers.get("Content-Encoding") == "gzip":
stream = StringIO(result)
gzipper = GzipFile(fileobj=stream)
result =
return loads(result) # parse as a JSON object

def _load_attributes(self, force=False):
"""Load data about our Site from the API.

This function is called by __init__() when one of the site attributes
was not given as a keyword argument. We'll do an API query to get the
missing data, but only if there actually *is* missing data.

Additionally, you can call this with `force=True` to forcibly reload
all attributes.
# all attributes to be loaded, except _namespaces, which is a special
# case because it requires additional params in the API query
attrs = [self._name, self._project, self._lang, self._base_url,
self._article_path, self._script_path]

params = {"action": "query", "meta": "siteinfo"}

if self._namespaces is None or force:
params["siprop"] = "general|namespaces|namespacealiases"
result = self._api_query(params)
elif all(attrs): # everything is already specified and we're not told
return # to force a reload, so do nothing
else: # we're only loading attributes other than _namespaces
params["siprop"] = "general"
result = self._api_query(params)

res = result["query"]["general"]
self._name = res["wikiid"]
self._project = res["sitename"].lower()
self._lang = res["lang"]
self._base_url = res["server"]
self._article_path = res["articlepath"]
self._script_path = res["scriptpath"]

def _load_namespaces(self, result):
"""Fill self._namespaces with a dict of namespace IDs and names.

Called by _load_attributes() with API data as `result` when
self._namespaces was not given as an kwarg to __init__().
self._namespaces = {}

for namespace in result["query"]["namespaces"].values():
ns_id = namespace["id"]
name = namespace["*"]
canonical = namespace["canonical"]
except KeyError:
self._namespaces[ns_id] = [name]
if name != canonical:
self._namespaces[ns_id] = [name, canonical]
self._namespaces[ns_id] = [name]

for namespace in result["query"]["namespacealiases"]:
ns_id = namespace["id"]
alias = namespace["*"]

def _get_cookie(self, name, domain):
"""Return the named cookie unless it is expired or doesn't exist."""
for cookie in self._cookiejar:
if == name and cookie.domain == domain:
if cookie.is_expired():
return cookie

def _get_username_from_cookies(self):
"""Try to return our username based solely on cookies.

First, we'll look for a cookie named self._name + "Token", like
"enwikiToken". If it exists and isn't expired, we'll assume it's valid
and try to return the value of the cookie self._name + "UserName" (like
"enwikiUserName"). This should work fine on wikis without single-user

If `enwikiToken` doesn't exist, we'll try to find a cookie named
`centralauth_Token`. If this exists and is not expired, we'll try to
return the value of `centralauth_User`.

If we didn't get any matches, we'll return None. Our goal here isn't to
return the most likely username, or what we *want* our username to be
(for that, we'd do self._login_info[0]), but rather to get our current
username without an unnecessary ?action=query&meta=userinfo API query.
domain = self.domain()
name = ''.join((self._name, "Token"))
cookie = self._get_cookie(name, domain)

if cookie is not None:
name = ''.join((self._name, "UserName"))
user_name = self._get_cookie(name, domain)
if user_name is not None:
return user_name.value

name = "centralauth_Token"
for cookie in self._cookiejar:
if cookie.domain_initial_dot is False or cookie.is_expired():
if != name:
# build a regex that will match domains this cookie affects
search = ''.join(("(.*?)", re_escape(cookie.domain)))
if re_match(search, domain): # test it against our site
user_name = self._get_cookie("centralauth_User", cookie.domain)
if user_name is not None:
return user_name.value

def _get_username_from_api(self):
"""Do a simple API query to get our username and return it.
This is a reliable way to make sure we are actually logged in, because
it doesn't deal with annoying cookie logic, but it results in an API
query that is unnecessary in some cases.
Called by _get_username() (in turn called by get_user() with no
username argument) when cookie lookup fails, probably indicating that
we are logged out.
params = {"action": "query", "meta": "userinfo"}
result = self._api_query(params)
return result["query"]["userinfo"]["name"]

def _get_username(self):
"""Return the name of the current user, whether logged in or not.

First, we'll try to deduce it solely from cookies, to avoid an
unnecessary API query. For the cookie-detection method, see
_get_username_from_cookies()'s docs.

If our username isn't in cookies, then we're probably not logged in, or
something fishy is going on (like forced logout). In this case, do a
single API query for our username (or IP address) and return that.
name = self._get_username_from_cookies()
if name is not None:
return name
return self._get_username_from_api()

def _save_cookiejar(self):
"""Try to save our cookiejar after doing a (normal) login or logout.

Calls the standard .save() method with no filename. Don't fret if our
cookiejar doesn't support saving (CookieJar raises AttributeError,
FileCookieJar raises NotImplementedError) or no default filename was
given (LWPCookieJar and MozillaCookieJar raise ValueError).
except (AttributeError, NotImplementedError, ValueError):

def _login(self, login, token=None, attempt=0):
"""Safely login through the API.

Normally, this is called by __init__() if a username and password have
been provided and no valid login cookies were found. The only other
time it needs to be called is when those cookies expire, which is done
automatically by api_query() if a query fails.

Recent versions of MediaWiki's API have fixed a CSRF vulnerability,
requiring login to be done in two separate requests. If the response
from from our initial request is "NeedToken", we'll do another one with
the token. If login is successful, we'll try to save our cookiejar.

Raises LoginError on login errors (duh), like bad passwords and
nonexistent usernames.

`login` is a (username, password) tuple. `token` is the token returned
from our first request, and `attempt` is to prevent getting stuck in a
loop if MediaWiki isn't acting right.
name, password = login
params = {"action": "login", "lgname": name, "lgpassword": password}
if token is not None:
params["lgtoken"] = token
result = self._api_query(params)
res = result["login"]["result"]

if res == "Success":
elif res == "NeedToken" and attempt == 0:
token = result["login"]["token"]
return self._login(login, token, attempt=1)
if res == "Illegal":
e = "The provided username is illegal."
elif res == "NotExists":
e = "The provided username does not exist."
elif res == "EmptyPass":
e = "No password was given."
elif res == "WrongPass" or res == "WrongPluginPass":
e = "The given password is incorrect."
e = "Couldn't login; server says '{0}'.".format(res)
raise LoginError(e)

def _logout(self):
"""Safely logout through the API.

We'll do a simple API request (api.php?action=logout), clear our
cookiejar (which probably contains now-invalidated cookies) and try to
save it, if it supports that sort of thing.
params = {"action": "logout"}

def api_query(self, **kwargs):
    """Do an API query with `kwargs` as the parameters.

    The keyword arguments are passed to _api_query() as a dict; see its
    documentation for details.
    """
    return self._api_query(kwargs)

def name(self):
    """Return this Site's name (the API's "wikiid"), such as "enwiki"."""
    return self._name

def project(self):
    """Return this Site's lowercase project name, such as "wikipedia"."""
    return self._project

def lang(self):
    """Return this Site's language code, such as "en" or "es"."""
    return self._lang

def domain(self):
    """Return the Site's web domain: the network location of its base URL."""
    parsed = urlparse(self._base_url)
    return parsed.netloc

def namespace_id_to_name(self, ns_id, all=False):
    """Given a namespace ID, return the associated namespace name(s).

    If `all` is False (default), return the first name in the list, which
    is usually the localized version. Otherwise, return the entire list,
    which includes the canonical name.

    For example, returns u"Wikipedia" if ns_id=4 and all=False on enwiki;
    returns [u"Wikipedia", u"Project"] if ns_id=4 and all=True.

    Raises NamespaceNotFoundError if the ID is not found.
    """
    try:
        names = self._namespaces[ns_id]
    except KeyError:
        e = "There is no namespace with id {0}.".format(ns_id)
        raise NamespaceNotFoundError(e)
    if all:
        return names
    return names[0]

def namespace_name_to_id(self, name):
    """Given a namespace name, return the associated ID.

    Like namespace_id_to_name(), but reversed. Case is ignored, because
    namespaces are assumed to be case-insensitive.

    Raises NamespaceNotFoundError if the name is not found.
    """
    lname = name.lower()
    for ns_id, names in self._namespaces.items():
        if any(lname == n.lower() for n in names):  # be case-insensitive
            return ns_id

    e = "There is no namespace with name '{0}'.".format(name)
    raise NamespaceNotFoundError(e)

def get_page(self, title, follow_redirects=False):
    """Return a Page object for the given title (pagename).

    Returns a Category object instead if the title's namespace prefix is
    one of the category namespace's names. The prefix is compared
    case-insensitively, matching namespace_name_to_id().

    No existence check or redirect-following is done here; `title` and
    `follow_redirects` are simply handed to the Page/Category constructor.
    """
    prefixes = self.namespace_id_to_name(NS_CATEGORY, all=True)
    prefix, colon, _ = title.partition(":")
    if colon:  # no colon means no namespace, e.g. a page named "Category"
        lowered = [p.lower() for p in prefixes]
        if prefix.lower() in lowered:
            return Category(self, title, follow_redirects)
    return Page(self, title, follow_redirects)

def get_category(self, catname, follow_redirects=False):
    """Return a Category object for the given category name.

    `catname` should be given *without* a namespace prefix; the site's
    first name for the category namespace is prepended for you.
    """
    prefix = self.namespace_id_to_name(NS_CATEGORY)
    pagename = ':'.join((prefix, catname))
    return Category(self, pagename, follow_redirects)

def get_user(self, username=None):
    """Return a User object for the given username.

    If `username` is left as None, the name comes from _get_username(), so
    the User represents whoever we currently are (logged in or anonymous).
    """
    if username is None:
        username = self._get_username()
    return User(self, username)

+ 226
- 0
wiki/tools/ View File

@@ -0,0 +1,226 @@
# -*- coding: utf-8 -*-

from time import strptime

from import *
from import UserNotFoundError
from import Page

class User(object):
    """
    EarwigBot's Wiki Toolset: User Class

    Represents a User on a given Site. Has methods for getting a bunch of
    information about the user, such as editcount and user rights, methods for
    returning the user's userpage and talkpage, etc.

    Public methods:
    name         -- returns the user's username
    exists       -- returns True if the user exists, False if they do not
    userid       -- returns an integer ID representing the user
    blockinfo    -- returns information about a current block on the user
    groups       -- returns a list of the user's groups
    rights       -- returns a list of the user's rights
    editcount    -- returns the number of edits made by the user
    registration -- returns the time the user registered as a time.struct_time
    emailable    -- returns True if you can email the user, False if you cannot
    gender       -- returns the user's gender ("male", "female", or "unknown")
    get_userpage -- returns a Page object representing the user's userpage
    get_talkpage -- returns a Page object representing the user's talkpage
    """

    def __init__(self, site, name):
        """Constructor for new User instances.

        Takes two arguments, a Site object (necessary for doing API queries),
        and the name of the user, preferably without "User:" in front.

        You can also use site.get_user() instead, which returns a User object,
        and is preferred.

        No API queries are made here; information about the user is loaded
        lazily, when it is first requested.
        """
        self._site = site
        self._name = name

    def _get_attribute(self, attr, force):
        """Internally used to get an attribute by name.

        Calls _load_attributes() if nothing has been loaded yet, or if `force`
        is True. Whether a load has happened is tracked through `_exists`, so
        a nonexistent user does not cause a new query on every call.

        Raises UserNotFoundError if the user does not exist.
        """
        if force or not hasattr(self, "_exists"):
            self._load_attributes()
        if self._exists is False:
            e = "User '{0}' does not exist.".format(self._name)
            raise UserNotFoundError(e)
        return getattr(self, attr)

    def _load_attributes(self):
        """Internally used to load all attributes from the API.

        Does one list=users query for this user and stores the results on the
        instance. If the response has no "userid", the user is recorded as
        nonexistent and nothing else is set.
        """
        params = {"action": "query", "list": "users", "ususers": self._name,
                  "usprop": "blockinfo|groups|rights|editcount|registration|emailable|gender"}
        result = self._site._api_query(params)
        res = result["query"]["users"][0]

        # normalize our username in case it was entered oddly
        self._name = res["name"]

        try:
            self._userid = res["userid"]
        except KeyError:  # userid is missing, so user does not exist
            self._exists = False
            return

        self._exists = True

        try:
            self._blockinfo = {
                "by": res["blockedby"],
                "reason": res["blockreason"],
                "expiry": res["blockexpiry"]
            }
        except KeyError:  # no block keys in the response: not blocked
            self._blockinfo = False

        self._groups = res["groups"]
        rights = res["rights"]
        # The previous code took .values() of a mapping; plain lists are
        # accepted too, so rights() is a list either way.
        if isinstance(rights, dict):
            self._rights = list(rights.values())
        else:
            self._rights = list(rights)
        self._editcount = res["editcount"]

        reg = res["registration"]
        # NOTE(review): guards against an empty/null registration value,
        # which would make strptime() raise - confirm the API can send one.
        if reg:
            self._registration = strptime(reg, "%Y-%m-%dT%H:%M:%SZ")
        else:
            self._registration = None

        # the key's presence is the flag
        self._emailable = "emailable" in res

        self._gender = res["gender"]

    def name(self, force=False):
        """Returns the user's name.

        If `force` is True, the attributes are reloaded from the API first,
        which may change the name to the API's normalized form. Unlike the
        other getters, this never makes an API query without `force`.

        Note that if another attribute getter, like exists(), has already been
        called, then the username has already been normalized.
        """
        if force:
            self._load_attributes()
        return self._name

    def exists(self, force=False):
        """Returns True if the user exists, or False if they do not.

        Makes an API query if `force` is True or if we haven't made one
        already.
        """
        if not hasattr(self, "_exists") or force:
            self._load_attributes()
        return self._exists

    def userid(self, force=False):
        """Returns an integer ID used by MediaWiki to represent the user.

        Raises UserNotFoundError if the user does not exist. Makes an API query
        if `force` is True or if we haven't made one already.
        """
        return self._get_attribute("_userid", force)

    def blockinfo(self, force=False):
        """Returns information about a current block on the user.

        If the user is not blocked, returns False. If they are, returns a dict
        with three keys: "by" is the blocker's username, "reason" is the reason
        why they were blocked, and "expiry" is when the block expires.

        Raises UserNotFoundError if the user does not exist. Makes an API query
        if `force` is True or if we haven't made one already.
        """
        return self._get_attribute("_blockinfo", force)

    def groups(self, force=False):
        """Returns a list of groups this user is in, including "*".

        Raises UserNotFoundError if the user does not exist. Makes an API query
        if `force` is True or if we haven't made one already.
        """
        return self._get_attribute("_groups", force)

    def rights(self, force=False):
        """Returns a list of this user's rights.

        Raises UserNotFoundError if the user does not exist. Makes an API query
        if `force` is True or if we haven't made one already.
        """
        return self._get_attribute("_rights", force)

    def editcount(self, force=False):
        """Returns the number of edits made by the user.

        Raises UserNotFoundError if the user does not exist. Makes an API query
        if `force` is True or if we haven't made one already.
        """
        return self._get_attribute("_editcount", force)

    def registration(self, force=False):
        """Returns the time the user registered as a time.struct_time object.

        Returns None if the API gave no registration timestamp.

        Raises UserNotFoundError if the user does not exist. Makes an API query
        if `force` is True or if we haven't made one already.
        """
        return self._get_attribute("_registration", force)

    def emailable(self, force=False):
        """Returns True if the user can be emailed, or False if they cannot.

        Raises UserNotFoundError if the user does not exist. Makes an API query
        if `force` is True or if we haven't made one already.
        """
        return self._get_attribute("_emailable", force)

    def gender(self, force=False):
        """Returns the user's gender.

        Can return either "male", "female", or "unknown", if they did not
        specify it.

        Raises UserNotFoundError if the user does not exist. Makes an API query
        if `force` is True or if we haven't made one already.
        """
        return self._get_attribute("_gender", force)

    def get_userpage(self):
        """Returns a Page object representing the user's userpage.

        No checks are made to see if it exists or not. The prefix is the
        site's own name for the user namespace.
        """
        prefix = self._site.namespace_id_to_name(NS_USER)
        pagename = ':'.join((prefix, self._name))
        return Page(self._site, pagename)

    def get_talkpage(self):
        """Returns a Page object representing the user's talkpage.

        No checks are made to see if it exists or not. The prefix is the
        site's own name for the user talk namespace.
        """
        prefix = self._site.namespace_id_to_name(NS_USER_TALK)
        pagename = ':'.join((prefix, self._name))
        return Page(self._site, pagename)
