Kaynağa Gözat

Implement guess_namespace(), get_names(), part of process_page().

tags/v0.1^2
Ben Kurtovic 11 yıl önce
ebeveyn
işleme
1799f2f568
1 değiştirilmiş dosya ile 84 ekleme ve 21 silme
  1. +84
    -21
      earwigbot/tasks/wikiproject_tagger.py

+ 84
- 21
earwigbot/tasks/wikiproject_tagger.py Dosyayı Görüntüle

@@ -47,7 +47,7 @@ class WikiProjectTagger(Task):
current directory)
``--summary SUM``
an optional edit summary to use; defaults to
``"Adding {{BANNER}} to article talk page."``
``"Adding WikiProject banner {{BANNER}}."``
``--append TEXT``
optional text to append to the banner (after an autoassessment, if
any), like ``|importance=low``
@@ -106,25 +106,21 @@ class WikiProjectTagger(Task):
return

banner = kwargs["banner"]
summary = kwargs.get("summary", "Adding $3 to article talk page.")
summary = kwargs.get("summary", "Adding WikiProject banner $3.")
append = kwargs.get("append")
autoassess = kwargs.get("autoassess", False)
nocreate = kwargs.get("nocreate", False)
recursive = kwargs.get("recursive", 0)
banner, names = self.get_names(site, banner)
if not names:
return
job = _Job(banner, names, summary, append, autoassess, nocreate)
site = self.bot.wiki.get_site(name=kwargs.get("site"))

if "category" in kwargs:
title = kwargs["category"]
prefix = name.split(":", 1)[0]
ns_cat = site.namespace_id_to_name(constants.NS_CATEGORY)
if prefix == title:
title = u":".join((ns_cat, title))
else:
try:
site.namespace_name_to_id(prefix)
except exceptions.NamespaceNotFoundError:
title = u":".join((ns_cat, title))
self.process_category(title, recursive)
title = self.guess_namespace(title, constants.NS_CATEGORY)
self.process_category(site.get_page(title), job, recursive)

if "file" in kwargs:
with open(kwargs["file"], "r") as fileobj:
@@ -135,19 +131,86 @@ class WikiProjectTagger(Task):
line = line[2:-2]
page = site.get_page(line)
if page.namespace == constants.NS_CATEGORY:
self.process_category(page, recursive)
self.process_category(page, job, recursive)
else:
self.process_page(page)

def process_category(self, page, recursive):
self.process_page(page, job)

def guess_namespace(self, title, assumed):
prefix = title.split(":", 1)[0]
if prefix == title:
return u":".join((site.namespace_id_to_name(assumed), title))
try:
site.namespace_name_to_id(prefix)
except exceptions.NamespaceNotFoundError:
return u":".join((site.namespace_id_to_name(assumed), title))
return title

def get_names(self, site, banner):
title = self.guess_namespace(banner, constants.NS_TEMPLATE)
if title == banner:
banner = banner.split(":", 1)[1]
page = site.get_page(title)
if page.exists != page.PAGE_EXISTS:
self.logger.error("Banner [[{0}]] does not exist".format(title))
return banner, None

names = [banner] if banner == title else [banner, title]
result = site.api_query(action="query", list="backlinks", bllimit=500,
blfilterredir="redirects", bltitle=title)
for backlink in result["query"]["backlinks"]:
names.append(backlink["title"])
if backlink["ns"] == constants.NS_TEMPLATE:
names.append(backlink["title"].split(":", 1)[1])

log = "Found {0} aliases for banner [[{1}]]".format(len(names), title)
self.logger.debug(log)
return banner, names

def process_category(self, page, job, recursive):
self.logger.info("Processing category: [[{0]]".format(page.title))
for member in page.get_members():
if member.namespace == constants.NS_CATEGORY:
if recursive is True:
self.process_category(member, True)
self.process_category(member, job, True)
elif recursive:
self.process_category(member, recursive - 1)
self.process_category(member, job, recursive - 1)
else:
self.process_page(member)
self.process_page(member, job)

def process_page(self, page, job):
if not page.is_talkpage:
page = page.toggle_talk()
try:
code = page.parse()
except exceptions.PageNotFoundError:
if job.nocreate:
log = "Skipping nonexistent page: [[{0}]]".format(page.title)
self.logger.info(log)
else:
log = "Tagging new page: [[{0}]]".format(page.title)
self.logger.info(log)
banner = "{{" + job.banner + job.append + "}}"
summary = job.summary.replace("$3", banner)
page.edit(banner, self.make_summary(summary))
return
except exceptions.InvalidPageError:
log = u"Skipping invalid page: [[{0}]]".format(page.title)
self.logger.error(log)
return

raise NotImplementedError()

text = unicode(code)
if page.get() != text:
summary = job.summary.replace("$3", banner)
page.edit(text, self.make_summary(summary))


def process_page(self, page):
raise NotImplementedError(page)
class _Job(object):
def __init__(self, banner, names, summary, append, autoassess, nocreate):
self.banner = banner
self.names = names
self.summary = summary
self.append = append
self.autoassess = autoassess
self.nocreate = nocreate

Yükleniyor…
İptal
Kaydet