diff --git a/earwigbot/tasks/wikiproject_tagger.py b/earwigbot/tasks/wikiproject_tagger.py index eb16e05..92473ed 100644 --- a/earwigbot/tasks/wikiproject_tagger.py +++ b/earwigbot/tasks/wikiproject_tagger.py @@ -47,7 +47,7 @@ class WikiProjectTagger(Task): current directory) ``--summary SUM`` an optional edit summary to use; defaults to - ``"Adding {{BANNER}} to article talk page."`` + ``"Adding WikiProject banner {{BANNER}}."`` ``--append TEXT`` optional text to append to the banner (after an autoassessment, if any), like ``|importance=low`` @@ -106,25 +106,21 @@ class WikiProjectTagger(Task): return banner = kwargs["banner"] - summary = kwargs.get("summary", "Adding $3 to article talk page.") + summary = kwargs.get("summary", "Adding WikiProject banner $3.") append = kwargs.get("append") autoassess = kwargs.get("autoassess", False) nocreate = kwargs.get("nocreate", False) recursive = kwargs.get("recursive", 0) + banner, names = self.get_names(site, banner) + if not names: + return + job = _Job(banner, names, summary, append, autoassess, nocreate) site = self.bot.wiki.get_site(name=kwargs.get("site")) if "category" in kwargs: title = kwargs["category"] - prefix = name.split(":", 1)[0] - ns_cat = site.namespace_id_to_name(constants.NS_CATEGORY) - if prefix == title: - title = u":".join((ns_cat, title)) - else: - try: - site.namespace_name_to_id(prefix) - except exceptions.NamespaceNotFoundError: - title = u":".join((ns_cat, title)) - self.process_category(title, recursive) + title = self.guess_namespace(title, constants.NS_CATEGORY) + self.process_category(site.get_page(title), job, recursive) if "file" in kwargs: with open(kwargs["file"], "r") as fileobj: @@ -135,19 +131,86 @@ class WikiProjectTagger(Task): line = line[2:-2] page = site.get_page(line) if page.namespace == constants.NS_CATEGORY: - self.process_category(page, recursive) + self.process_category(page, job, recursive) else: - self.process_page(page) - - def process_category(self, page, recursive): + self.process_page(page, job) + + def guess_namespace(self, title, assumed): + prefix = title.split(":", 1)[0] + if prefix == title: + return u":".join((site.namespace_id_to_name(assumed), title)) + try: + site.namespace_name_to_id(prefix) + except exceptions.NamespaceNotFoundError: + return u":".join((site.namespace_id_to_name(assumed), title)) + return title + + def get_names(self, site, banner): + title = self.guess_namespace(banner, constants.NS_TEMPLATE) + if title == banner: + banner = banner.split(":", 1)[1] + page = site.get_page(title) + if page.exists != page.PAGE_EXISTS: + self.logger.error("Banner [[{0}]] does not exist".format(title)) + return banner, None + + names = [banner] if banner == title else [banner, title] + result = site.api_query(action="query", list="backlinks", bllimit=500, + blfilterredir="redirects", bltitle=title) + for backlink in result["query"]["backlinks"]: + names.append(backlink["title"]) + if backlink["ns"] == constants.NS_TEMPLATE: + names.append(backlink["title"].split(":", 1)[1]) + + log = "Found {0} aliases for banner [[{1}]]".format(len(names), title) + self.logger.debug(log) + return banner, names + + def process_category(self, page, job, recursive): + self.logger.info("Processing category: [[{0]]".format(page.title)) for member in page.get_members(): if member.namespace == constants.NS_CATEGORY: if recursive is True: - self.process_category(member, True) + self.process_category(member, job, True) elif recursive: - self.process_category(member, recursive - 1) + self.process_category(member, job, recursive - 1) else: - self.process_page(member) + self.process_page(member, job) + + def process_page(self, page, job): + if not page.is_talkpage: + page = page.toggle_talk() + try: + code = page.parse() + except exceptions.PageNotFoundError: + if job.nocreate: + log = "Skipping nonexistent page: [[{0}]]".format(page.title) + self.logger.info(log) + else: + log = "Tagging new page: [[{0}]]".format(page.title) + self.logger.info(log) + banner = "{{" + job.banner + job.append + "}}" + summary = job.summary.replace("$3", banner) + page.edit(banner, self.make_summary(summary)) + return + except exceptions.InvalidPageError: + log = u"Skipping invalid page: [[{0}]]".format(page.title) + self.logger.error(log) + return + + raise NotImplementedError() + + text = unicode(code) + if page.get() != text: + summary = job.summary.replace("$3", banner) + page.edit(text, self.make_summary(summary)) + - def process_page(self, page): - raise NotImplementedError(page) +class _Job(object): + def __init__(self, banner, names, summary, append, autoassess, nocreate): + self.banner = banner + self.names = names + self.summary = summary + self.append = append + self.autoassess = autoassess + self.nocreate = nocreate