@@ -75,6 +75,55 @@ class Site(object):
if logged_in_as is None or name != logged_in_as:
if logged_in_as is None or name != logged_in_as:
self._login(login)
self._login(login)
def _api_query(self, params):
"""Do an API query with `params` as a dict of parameters.
This will first attempt to construct an API url from self._base_url and
self._script_path. We need both of these, or else we'll raise
SiteAPIError.
We'll encode the given params, adding format=json along the way, and
make the request through self._opener, which has built-in cookie
support via self._cookiejar, a User-Agent
(wiki.tools.constants.USER_AGENT), and Accept-Encoding set to "gzip".
Assuming everything went well, we'll gunzip the data (if compressed),
load it as a JSON object, and return it.
If our request failed, we'll raise SiteAPIError with details.
There's helpful MediaWiki API documentation at
<http://www.mediawiki.org/wiki/API>.
"""
if self._base_url is None or self._script_path is None:
e = "Tried to do an API query, but no API URL is known."
raise SiteAPIError(e)
url = ''.join((self._base_url, self._script_path, "/api.php"))
params["format"] = "json" # this is the only format we understand
data = urlencode(params)
print url, data # debug code
try:
response = self._opener.open(url, data)
except URLError as error:
if hasattr(error, "reason"):
e = "API query at {0} failed because {1}."
e = e.format(error.geturl, error.reason)
elif hasattr(error, "code"):
e = "API query at {0} failed; got an error code of {1}."
e = e.format(error.geturl, error.code)
else:
e = "API query failed."
raise SiteAPIError(e)
else:
result = response.read()
if response.headers.get("Content-Encoding") == "gzip":
stream = StringIO(result)
gzipper = GzipFile(fileobj=stream)
result = gzipper.read()
return loads(result) # parse as a JSON object
def _load_attributes(self, force=False):
def _load_attributes(self, force=False):
"""Load data about our Site from the API.
"""Load data about our Site from the API.
@@ -94,13 +143,13 @@ class Site(object):
if self._namespaces is None or force:
if self._namespaces is None or force:
params["siprop"] = "general|namespaces|namespacealiases"
params["siprop"] = "general|namespaces|namespacealiases"
result = self.api_query(params)
result = self._ api_query(params)
self._load_namespaces(result)
self._load_namespaces(result)
elif all(attrs): # everything is already specified and we're not told
elif all(attrs): # everything is already specified and we're not told
return # to force a reload, so do nothing
return # to force a reload, so do nothing
else: # we're only loading attributes other than _namespaces
else: # we're only loading attributes other than _namespaces
params["siprop"] = "general"
params["siprop"] = "general"
result = self.api_query(params)
result = self._ api_query(params)
res = result["query"]["general"]
res = result["query"]["general"]
self._name = res["wikiid"]
self._name = res["wikiid"]
@@ -197,7 +246,7 @@ class Site(object):
we are logged out.
we are logged out.
"""
"""
params = {"action": "query", "meta": "userinfo"}
params = {"action": "query", "meta": "userinfo"}
result = self.api_query(params)
result = self._ api_query(params)
return result["query"]["userinfo"]["name"]
return result["query"]["userinfo"]["name"]
def _get_username(self):
def _get_username(self):
@@ -253,7 +302,7 @@ class Site(object):
params = {"action": "login", "lgname": name, "lgpassword": password}
params = {"action": "login", "lgname": name, "lgpassword": password}
if token is not None:
if token is not None:
params["lgtoken"] = token
params["lgtoken"] = token
result = self.api_query(params)
result = self._ api_query(params)
res = result["login"]["result"]
res = result["login"]["result"]
if res == "Success":
if res == "Success":
@@ -282,58 +331,16 @@ class Site(object):
save it, if it supports that sort of thing.
save it, if it supports that sort of thing.
"""
"""
params = {"action": "logout"}
params = {"action": "logout"}
self.api_query(params)
self._ api_query(params)
self._cookiejar.clear()
self._cookiejar.clear()
self._save_cookiejar()
self._save_cookiejar()
def api_query(self, params):
"""Do an API query with `params` as a dict of parameters.
This will first attempt to construct an API url from self._base_url and
self._script_path. We need both of these, or else we'll raise
SiteAPIError.
def api_query(self, **kwargs):
"""Do an API query with `kwargs` as the parameters.
We'll encode the given params, adding format=json along the way, and
make the request through self._opener, which has built-in cookie
support via self._cookiejar, a User-Agent
(wiki.tools.constants.USER_AGENT), and Accept-Encoding set to "gzip".
Assuming everything went well, we'll gunzip the data (if compressed),
load it as a JSON object, and return it.
If our request failed, we'll raise SiteAPIError with details.
There's helpful MediaWiki API documentation at
<http://www.mediawiki.org/wiki/API>.
See _api_query()'s documentation for details.
"""
"""
if self._base_url is None or self._script_path is None:
e = "Tried to do an API query, but no API URL is known."
raise SiteAPIError(e)
url = ''.join((self._base_url, self._script_path, "/api.php"))
params["format"] = "json" # this is the only format we understand
data = urlencode(params)
print url, data # debug code
try:
response = self._opener.open(url, data)
except URLError as error:
if hasattr(error, "reason"):
e = "API query at {0} failed because {1}."
e = e.format(error.geturl, error.reason)
elif hasattr(error, "code"):
e = "API query at {0} failed; got an error code of {1}."
e = e.format(error.geturl, error.code)
else:
e = "API query failed."
raise SiteAPIError(e)
else:
result = response.read()
if response.headers.get("Content-Encoding") == "gzip":
stream = StringIO(result)
gzipper = GzipFile(fileobj=stream)
result = gzipper.read()
return loads(result) # parse as a JSON object
return self._api_query(kwargs)
def name(self):
def name(self):
"""Returns the Site's name (or "wikiid" in the API), like "enwiki"."""
"""Returns the Site's name (or "wikiid" in the API), like "enwiki"."""
@@ -389,32 +396,32 @@ class Site(object):
e = "There is no namespace with name '{0}'.".format(name)
e = "There is no namespace with name '{0}'.".format(name)
raise NamespaceNotFoundError(e)
raise NamespaceNotFoundError(e)
def get_page(self, pagenam e):
"""Returns a Page object for the given pagename.
def get_page(self, title, follow_redirects=Fals e):
"""Returns a Page object for the given title ( pagename) .
Will return a Category object instead if the given pagenam e is in the
Will return a Category object instead if the given titl e is in the
category namespace. As Category is a subclass of Page, this should not
category namespace. As Category is a subclass of Page, this should not
cause problems.
cause problems.
Note that this doesn't do any checks for existence or
Note that this doesn't do any direct checks for existence or
redirect-following - Page's methods provide that.
redirect-following - Page's methods provide that.
"""
"""
prefixes = self.namespace_id_to_name(NS_CATEGORY, all=True)
prefixes = self.namespace_id_to_name(NS_CATEGORY, all=True)
prefix = pagenam e.split(":", 1)[0]
if prefix != pagenam e: # avoid a page that is simply "Category"
prefix = titl e.split(":", 1)[0]
if prefix != titl e: # avoid a page that is simply "Category"
if prefix in prefixes:
if prefix in prefixes:
return Category(self, pagename )
return Page(self, pagename )
return Category(self, title, follow_redirects )
return Page(self, title, follow_redirects )
def get_category(self, catname):
def get_category(self, catname, follow_redirects=False ):
"""Returns a Category object for the given category name.
"""Returns a Category object for the given category name.
`catname` should be given *without* a namespace prefix. This method is
`catname` should be given *without* a namespace prefix. This method is
really just shorthand for get_page("Category:" + catname).
really just shorthand for get_page("Category:" + catname).
"""
"""
prefix = self.namespace_id_to_name(NS_CATEGORY)
prefix = self.namespace_id_to_name(NS_CATEGORY)
pagename = "{0}:{1}".format(prefix, catname )
return Category(self, pagename)
pagename = ':'.join((prefix, catname) )
return Category(self, pagename, follow_redirects )
def get_user(self, username=None):
def get_user(self, username=None):
"""Returns a User object for the given username.
"""Returns a User object for the given username.