A Python robot that edits Wikipedia and interacts with people over IRC https://en.wikipedia.org/wiki/User:EarwigBot
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

428 wiersze
18 KiB

  1. # -*- coding: utf-8 -*-
  2. from cookielib import CookieJar
  3. from gzip import GzipFile
  4. from json import loads
  5. from re import escape as re_escape, match as re_match
  6. from StringIO import StringIO
  7. from urllib import unquote_plus, urlencode
  8. from urllib2 import build_opener, HTTPCookieProcessor, URLError
  9. from urlparse import urlparse
  10. from wiki.tools.category import Category
  11. from wiki.tools.constants import *
  12. from wiki.tools.exceptions import *
  13. from wiki.tools.page import Page
  14. from wiki.tools.user import User
  15. class Site(object):
  16. """
  17. EarwigBot's Wiki Toolset: Site Class
  18. Represents a Site, with support for API queries and returning Pages, Users,
  19. and Categories. The constructor takes a bunch of arguments and you probably
  20. won't need to call it directly, rather tools.get_site() for returning Site
  21. instances, tools.add_site() for adding new ones to config, and
  22. tools.del_site() for removing old ones from config, should suffice.
  23. """
  24. def __init__(self, name=None, project=None, lang=None, base_url=None,
  25. article_path=None, script_path=None, sql=(None, None),
  26. namespaces=None, login=(None, None), cookiejar=None):
  27. """Constructor for new Site instances.
  28. This probably isn't necessary to call yourself unless you're building a
  29. Site that's not in your config and you don't want to add it - normally
  30. all you need is tools.get_site(name), which creates the Site for you
  31. based on your config file. We accept a bunch of kwargs, but the only
  32. ones you really "need" are `base_url` and `script_path` - this is
  33. enough to figure out an API url. `login`, a tuple of
  34. (username, password), is highly recommended. `cookiejar` will be used
  35. to store cookies, and we'll use a normal CookieJar if none is given.
  36. First, we'll store the given arguments as attributes, then set up our
  37. URL opener. We'll load any of the attributes that weren't given from
  38. the API, and then log in if a username/pass was given and we aren't
  39. already logged in.
  40. """
  41. # attributes referring to site information, filled in by an API query
  42. # if they are missing (and an API url can be determined)
  43. self._name = name
  44. self._project = project
  45. self._lang = lang
  46. self._base_url = base_url
  47. self._article_path = article_path
  48. self._script_path = script_path
  49. self._sql = sql
  50. self._namespaces = namespaces
  51. # set up cookiejar and URL opener for making API queries
  52. if cookiejar is not None:
  53. self._cookiejar = cookiejar
  54. else:
  55. self._cookiejar = CookieJar()
  56. self._opener = build_opener(HTTPCookieProcessor(self._cookiejar))
  57. self._opener.addheaders = [("User-Agent", USER_AGENT),
  58. ("Accept-Encoding", "gzip")]
  59. # get all of the above attributes that were not specified as arguments
  60. self._load_attributes()
  61. # if we have a name/pass and the API says we're not logged in, log in
  62. self._login_info = name, password = login
  63. if name is not None and password is not None:
  64. logged_in_as = self._get_username_from_cookies()
  65. if logged_in_as is None or name != logged_in_as:
  66. self._login(login)
  67. def _load_attributes(self, force=False):
  68. """Load data about our Site from the API.
  69. This function is called by __init__() when one of the site attributes
  70. was not given as a keyword argument. We'll do an API query to get the
  71. missing data, but only if there actually *is* missing data.
  72. Additionally, you can call this with `force=True` to forcibly reload
  73. all attributes.
  74. """
  75. # all attributes to be loaded, except _namespaces, which is a special
  76. # case because it requires additional params in the API query
  77. attrs = [self._name, self._project, self._lang, self._base_url,
  78. self._article_path, self._script_path]
  79. params = {"action": "query", "meta": "siteinfo"}
  80. if self._namespaces is None or force:
  81. params["siprop"] = "general|namespaces|namespacealiases"
  82. result = self.api_query(params)
  83. self._load_namespaces(result)
  84. elif all(attrs): # everything is already specified and we're not told
  85. return # to force a reload, so do nothing
  86. else: # we're only loading attributes other than _namespaces
  87. params["siprop"] = "general"
  88. result = self.api_query(params)
  89. res = result["query"]["general"]
  90. self._name = res["wikiid"]
  91. self._project = res["sitename"].lower()
  92. self._lang = res["lang"]
  93. self._base_url = res["server"]
  94. self._article_path = res["articlepath"]
  95. self._script_path = res["scriptpath"]
  96. def _load_namespaces(self, result):
  97. """Fill self._namespaces with a dict of namespace IDs and names.
  98. Called by _load_attributes() with API data as `result` when
  99. self._namespaces was not given as an kwarg to __init__().
  100. """
  101. self._namespaces = {}
  102. for namespace in result["query"]["namespaces"].values():
  103. ns_id = namespace["id"]
  104. name = namespace["*"]
  105. try:
  106. canonical = namespace["canonical"]
  107. except KeyError:
  108. self._namespaces[ns_id] = [name]
  109. else:
  110. if name != canonical:
  111. self._namespaces[ns_id] = [name, canonical]
  112. else:
  113. self._namespaces[ns_id] = [name]
  114. for namespace in result["query"]["namespacealiases"]:
  115. ns_id = namespace["id"]
  116. alias = namespace["*"]
  117. self._namespaces[ns_id].append(alias)
  118. def _get_cookie(self, name, domain):
  119. """Return the named cookie unless it is expired or doesn't exist."""
  120. for cookie in self._cookiejar:
  121. if cookie.name == name and cookie.domain == domain:
  122. if cookie.is_expired():
  123. break
  124. return cookie
  125. def _get_username_from_cookies(self):
  126. """Try to return our username based solely on cookies.
  127. First, we'll look for a cookie named self._name + "Token", like
  128. "enwikiToken". If it exists and isn't expired, we'll assume it's valid
  129. and try to return the value of the cookie self._name + "UserName" (like
  130. "enwikiUserName"). This should work fine on wikis without single-user
  131. login.
  132. If `enwikiToken` doesn't exist, we'll try to find a cookie named
  133. `centralauth_Token`. If this exists and is not expired, we'll try to
  134. return the value of `centralauth_User`.
  135. If we didn't get any matches, we'll return None. Our goal here isn't to
  136. return the most likely username, or what we *want* our username to be
  137. (for that, we'd do self._login_info[0]), but rather to get our current
  138. username without an unnecessary ?action=query&meta=userinfo API query.
  139. """
  140. domain = self.domain()
  141. name = ''.join((self._name, "Token"))
  142. cookie = self._get_cookie(name, domain)
  143. if cookie is not None:
  144. name = ''.join((self._name, "UserName"))
  145. user_name = self._get_cookie(name, domain)
  146. if user_name is not None:
  147. return user_name.value
  148. name = "centralauth_Token"
  149. for cookie in self._cookiejar:
  150. if cookie.domain_initial_dot is False or cookie.is_expired():
  151. continue
  152. if cookie.name != name:
  153. continue
  154. # build a regex that will match domains this cookie affects
  155. search = ''.join(("(.*?)", re_escape(cookie.domain)))
  156. if re_match(search, domain): # test it against our site
  157. user_name = self._get_cookie("centralauth_User", cookie.domain)
  158. if user_name is not None:
  159. return user_name.value
  160. def _get_username_from_api(self):
  161. """Do a simple API query to get our username and return it.
  162. This is a reliable way to make sure we are actually logged in, because
  163. it doesn't deal with annoying cookie logic, but it results in an API
  164. query that is unnecessary in some cases.
  165. Called by _get_username() (in turn called by get_user() with no
  166. username argument) when cookie lookup fails, probably indicating that
  167. we are logged out.
  168. """
  169. params = {"action": "query", "meta": "userinfo"}
  170. result = self.api_query(params)
  171. return result["query"]["userinfo"]["name"]
  172. def _get_username(self):
  173. """Return the name of the current user, whether logged in or not.
  174. First, we'll try to deduce it solely from cookies, to avoid an
  175. unnecessary API query. For the cookie-detection method, see
  176. _get_username_from_cookies()'s docs.
  177. If our username isn't in cookies, then we're probably not logged in, or
  178. something fishy is going on (like forced logout). In this case, do a
  179. single API query for our username (or IP address) and return that.
  180. """
  181. name = self._get_username_from_cookies()
  182. if name is not None:
  183. return name
  184. return self._get_username_from_api()
  185. def _save_cookiejar(self):
  186. """Try to save our cookiejar after doing a (normal) login or logout.
  187. Calls the standard .save() method with no filename. Don't fret if our
  188. cookiejar doesn't support saving (CookieJar raises AttributeError,
  189. FileCookieJar raises NotImplementedError) or no default filename was
  190. given (LWPCookieJar and MozillaCookieJar raise ValueError).
  191. """
  192. try:
  193. self._cookiejar.save()
  194. except (AttributeError, NotImplementedError, ValueError):
  195. pass
  196. def _login(self, login, token=None, attempt=0):
  197. """Safely login through the API.
  198. Normally, this is called by __init__() if a username and password have
  199. been provided and no valid login cookies were found. The only other
  200. time it needs to be called is when those cookies expire, which is done
  201. automatically by api_query() if a query fails.
  202. Recent versions of MediaWiki's API have fixed a CSRF vulnerability,
  203. requiring login to be done in two separate requests. If the response
  204. from from our initial request is "NeedToken", we'll do another one with
  205. the token. If login is successful, we'll try to save our cookiejar.
  206. Raises LoginError on login errors (duh), like bad passwords and
  207. nonexistent usernames.
  208. `login` is a (username, password) tuple. `token` is the token returned
  209. from our first request, and `attempt` is to prevent getting stuck in a
  210. loop if MediaWiki isn't acting right.
  211. """
  212. name, password = login
  213. params = {"action": "login", "lgname": name, "lgpassword": password}
  214. if token is not None:
  215. params["lgtoken"] = token
  216. result = self.api_query(params)
  217. res = result["login"]["result"]
  218. if res == "Success":
  219. self._save_cookiejar()
  220. elif res == "NeedToken" and attempt == 0:
  221. token = result["login"]["token"]
  222. return self._login(login, token, attempt=1)
  223. else:
  224. if res == "Illegal":
  225. e = "The provided username is illegal."
  226. elif res == "NotExists":
  227. e = "The provided username does not exist."
  228. elif res == "EmptyPass":
  229. e = "No password was given."
  230. elif res == "WrongPass" or res == "WrongPluginPass":
  231. e = "The given password is incorrect."
  232. else:
  233. e = "Couldn't login; server says '{0}'.".format(res)
  234. raise LoginError(e)
  235. def _logout(self):
  236. """Safely logout through the API.
  237. We'll do a simple API request (api.php?action=logout), clear our
  238. cookiejar (which probably contains now-invalidated cookies) and try to
  239. save it, if it supports that sort of thing.
  240. """
  241. params = {"action": "logout"}
  242. self.api_query(params)
  243. self._cookiejar.clear()
  244. self._save_cookiejar()
  245. def api_query(self, params):
  246. """Do an API query with `params` as a dict of parameters.
  247. This will first attempt to construct an API url from self._base_url and
  248. self._script_path. We need both of these, or else we'll raise
  249. SiteAPIError.
  250. We'll encode the given params, adding format=json along the way, and
  251. make the request through self._opener, which has built-in cookie
  252. support via self._cookiejar, a User-Agent
  253. (wiki.tools.constants.USER_AGENT), and Accept-Encoding set to "gzip".
  254. Assuming everything went well, we'll gunzip the data (if compressed),
  255. load it as a JSON object, and return it.
  256. If our request failed, we'll raise SiteAPIError with details.
  257. There's helpful MediaWiki API documentation at
  258. <http://www.mediawiki.org/wiki/API>.
  259. """
  260. if self._base_url is None or self._script_path is None:
  261. e = "Tried to do an API query, but no API URL is known."
  262. raise SiteAPIError(e)
  263. url = ''.join((self._base_url, self._script_path, "/api.php"))
  264. params["format"] = "json" # this is the only format we understand
  265. data = urlencode(params)
  266. print url, data # debug code
  267. try:
  268. response = self._opener.open(url, data)
  269. except URLError as error:
  270. if hasattr(error, "reason"):
  271. e = "API query at {0} failed because {1}."
  272. e = e.format(error.geturl, error.reason)
  273. elif hasattr(error, "code"):
  274. e = "API query at {0} failed; got an error code of {1}."
  275. e = e.format(error.geturl, error.code)
  276. else:
  277. e = "API query failed."
  278. raise SiteAPIError(e)
  279. else:
  280. result = response.read()
  281. if response.headers.get("Content-Encoding") == "gzip":
  282. stream = StringIO(result)
  283. gzipper = GzipFile(fileobj=stream)
  284. result = gzipper.read()
  285. return loads(result) # parse as a JSON object
  286. def name(self):
  287. """Returns the Site's name (or "wikiid" in the API), like "enwiki"."""
  288. return self._name
  289. def project(self):
  290. """Returns the Site's project name in lowercase, like "wikipedia"."""
  291. return self._project
  292. def lang(self):
  293. """Returns the Site's language, like "en" or "es"."""
  294. return self._lang
  295. def domain(self):
  296. """Returns the Site's web domain, like "en.wikipedia.org"."""
  297. return urlparse(self._base_url).netloc
  298. def namespace_id_to_name(self, ns_id, all=False):
  299. """Given a namespace ID, returns associated namespace names.
  300. If all is False (default), we'll return the first name in the list,
  301. which is usually the localized version. Otherwise, we'll return the
  302. entire list, which includes the canonical name.
  303. For example, returns u"Wikipedia" if ns_id=4 and all=False on enwiki;
  304. returns [u"Wikipedia", u"Project"] if ns_id=4 and all=True.
  305. Raises NamespaceNotFoundError if the ID is not found.
  306. """
  307. try:
  308. if all:
  309. return self._namespaces[ns_id]
  310. else:
  311. return self._namespaces[ns_id][0]
  312. except KeyError:
  313. e = "There is no namespace with id {0}.".format(ns_id)
  314. raise NamespaceNotFoundError(e)
  315. def namespace_name_to_id(self, name):
  316. """Given a namespace name, returns the associated ID.
  317. Like namespace_id_to_name(), but reversed. Case is ignored, because
  318. namespaces are assumed to be case-insensitive.
  319. Raises NamespaceNotFoundError if the name is not found.
  320. """
  321. lname = name.lower()
  322. for ns_id, names in self._namespaces.items():
  323. lnames = [n.lower() for n in names] # be case-insensitive
  324. if lname in lnames:
  325. return ns_id
  326. e = "There is no namespace with name '{0}'.".format(name)
  327. raise NamespaceNotFoundError(e)
  328. def get_page(self, pagename):
  329. """Returns a Page object for the given pagename.
  330. Will return a Category object instead if the given pagename is in the
  331. category namespace. As Category is a subclass of Page, this should not
  332. cause problems.
  333. Note that this doesn't do any checks for existence or
  334. redirect-following - Page's methods provide that.
  335. """
  336. prefixes = self.namespace_id_to_name(NS_CATEGORY, all=True)
  337. prefix = pagename.split(":", 1)[0]
  338. if prefix != pagename: # avoid a page that is simply "Category"
  339. if prefix in prefixes:
  340. return Category(self, pagename)
  341. return Page(self, pagename)
  342. def get_category(self, catname):
  343. """Returns a Category object for the given category name.
  344. `catname` should be given *without* a namespace prefix. This method is
  345. really just shorthand for get_page("Category:" + catname).
  346. """
  347. prefix = self.namespace_id_to_name(NS_CATEGORY)
  348. pagename = "{0}:{1}".format(prefix, catname)
  349. return Category(self, pagename)
  350. def get_user(self, username=None):
  351. """Returns a User object for the given username.
  352. If `username` is left as None, then a User object representing the
  353. currently logged-in (or anonymous!) user is returned.
  354. """
  355. if username is None:
  356. username = self._get_username()
  357. return User(self, username)