diff --git a/tasks/synonym_authorities.py b/tasks/synonym_authorities.py
new file mode 100644
index 0000000..387ef3c
--- /dev/null
+++ b/tasks/synonym_authorities.py
@@ -0,0 +1,348 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2021 Ben Kurtovic
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+import difflib
+import json
+import re
+import sqlite3
+import subprocess
+import time
+
+import more_itertools
+import mwparserfromhell
+import unidecode
+
+from earwigbot.tasks import Task
+
+
+class SynonymAuthorities(Task):
+    """
+    Correct synonym authorities in taxon articles created by Qbugbot.
+    """
+    name = 'synonym_authorities'
+    summary = (
+        'Fix {changes} misordered synonym authorities '
+        '([[Wikipedia:Bots/Requests for approval/EarwigBot 21|more info]])'
+    )
+
+    def setup(self):
+        self.site = self.bot.wiki.get_site()
+        self.creator = 'Qbugbot'
+        self.pages_path = 'qbugbot_pages.json'
+        self.synonyms_path = 'qbugbot_synonyms.json'
+        self.edits_path = 'qbugbot_edits.json'
+        self.itis_path = 'itis.db'
+
+    def run(self, action=None):
+        if action == 'fetch_pages':
+            self.fetch_pages()
+        elif action == 'fetch_synonyms':
+            self.fetch_synonyms()
+        elif action == 'prepare_edits':
+            self.prepare_edits()
+        elif action == 'view_edits':
+            self.view_edits()
+        elif action == 'save_edits':
+            self.save_edits()
+        elif action is None:
+            raise RuntimeError('This task requires an action')
+        else:
+            raise RuntimeError(f'No such action: {action}')
+
+    def fetch_pages(self):
+        """
+        Fetch pages created by Qbugbot.
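+
+        Pages are pulled from the creator's mainspace page creations and the
+        current wikitext and revision ID of each is cached in pages_path.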
+ """ + pages = {} + for chunk in more_itertools.chunked(self._iter_creations(), 500): + pages.update(self._fetch_chunk(chunk)) + + self.logger.info(f'Fetched {len(pages)} pages') + with open(self.pages_path, 'w') as fp: + json.dump(pages, fp) + + def _iter_creations(self): + params = { + 'action': 'query', + 'list': 'usercontribs', + 'ucuser': self.creator, + 'uclimit': 5000, + 'ucnamespace': 0, + 'ucprop': 'ids', + 'ucshow': 'new', + 'formatversion': 2, + } + + results = self.site.api_query(**params) + while contribs := results['query']['usercontribs']: + yield from contribs + if 'continue' not in results: + break + params.update(results['continue']) + results = self.site.api_query(**params) + + def _fetch_chunk(self, chunk): + result = self.site.api_query( + action='query', + prop='revisions', + rvprop='ids|content', + rvslots='main', + pageids='|'.join(str(page['pageid']) for page in chunk), + formatversion=2, + ) + + pages = result['query']['pages'] + assert len(pages) == len(chunk) + + return { + page['pageid']: { + 'title': page['title'], + 'content': page['revisions'][0]['slots']['main']['content'], + 'revid': page['revisions'][0]['revid'], + } + for page in pages + } + + def fetch_synonyms(self): + """ + Fetch correct synonym lists for pages generated by fetch_pages. + """ + with open(self.pages_path) as fp: + pages = json.load(fp) + wikidata = self.bot.wiki.get_site('wikidatawiki') + itis_property = 'P815' + conn = sqlite3.connect(self.itis_path) + cur = conn.cursor() + + synonyms = {} + for chunk in more_itertools.chunked(pages.items(), 50): + titles = {page['title']: pageid for pageid, page in chunk} + result = wikidata.api_query( + action='wbgetentities', + sites='enwiki', + titles='|'.join(titles), + props='claims|sitelinks', + languages='en', + sitefilter='enwiki', + ) + + for item in result['entities'].values(): + if 'sitelinks' not in item: + self.logger.warning(f'No sitelinks for item: {item}') + continue + title = item['sitelinks']['enwiki']['title'] + pageid = titles[title] + if itis_property not in item['claims']: + self.logger.warning(f'No ITIS ID for [[{title}]]') + continue + claims = item['claims'][itis_property] + assert len(claims) == 1, (title, claims) + itis_id = claims[0]['mainsnak']['datavalue']['value'] + + cur.execute(""" + SELECT synonym.complete_name, authors.taxon_author + FROM synonym_links sl + INNER JOIN taxonomic_units accepted ON sl.tsn_accepted = accepted.tsn + INNER JOIN taxonomic_units synonym ON sl.tsn = synonym.tsn + LEFT JOIN taxon_authors_lkp authors ON synonym.taxon_author_id = authors.taxon_author_id + WHERE sl.tsn_accepted = ? + UNION ALL + SELECT complete_name, taxon_author + FROM taxonomic_units accepted + LEFT JOIN taxon_authors_lkp authors USING (taxon_author_id) + WHERE accepted.tsn = ?; + """, (itis_id, itis_id)) + synonyms[pageid] = cur.fetchall() + + self.logger.info(f'Fetched {len(synonyms)} synonym lists') + with open(self.synonyms_path, 'w') as fp: + json.dump(synonyms, fp) + + def prepare_edits(self): + """ + Prepare edits based on the output of fetch_pages and fetch_synonyms. 
+ """ + with open(self.pages_path) as fp: + pages = json.load(fp) + with open(self.synonyms_path) as fp: + synonyms = json.load(fp) + + edits = {} + for pageid, pageinfo in pages.items(): + if pageid not in synonyms: + continue + wikitext = mwparserfromhell.parse(pageinfo['content']) + try: + changes = self._update_synonyms(pageinfo['title'], wikitext, synonyms[pageid]) + if not changes: + continue + except Exception: + self.logger.error(f'Failed to update synonyms for [[{pageinfo["title"]}]]') + raise + edits[pageid] = { + 'title': pageinfo['title'], + 'revid': pageinfo['revid'], + 'original': pageinfo['content'], + 'content': str(wikitext), + 'changes': changes, + } + + with open(self.edits_path, 'w') as fp: + json.dump(edits, fp) + + def _update_synonyms(self, title, wikitext, synonyms): + if len(synonyms) <= 1: + return False + if wikitext.split('\n', 1)[0].upper().startswith('#REDIRECT'): + self.logger.debug(f'[[{title}]]: Skipping redirect') + return False + + taxoboxes = wikitext.filter_templates( + matches=lambda tmpl: tmpl.name.matches(('Speciesbox', 'Automatic taxobox'))) + if not taxoboxes: + self.logger.warning(f'[[{title}]]: No taxoboxes found') + return False + if len(taxoboxes) > 1: + self.logger.warning(f'[[{title}]]: Multiple taxoboxes found') + return False + + try: + syn_param = taxoboxes[0].get('synonyms') + except ValueError: + self.logger.debug(f'[[{title}]]: No synonyms parameter in taxobox') + return False + + tmpls = syn_param.value.filter_templates( + matches=lambda tmpl: tmpl.name.matches(('Species list', 'Taxon list'))) + if not tmpls: + # This means the bot's original work is no longer there. In most cases, this is + # an unrelated synonym list added by another editor and there is nothing to check, + # but it's possible someone converted the bot's list into a different format without + # checking the authorities. Those cases need to be manually checked. 
+            self.logger.warning(f'[[{title}]]: Could not find a taxa list in taxobox')
+            return False
+        if len(tmpls) > 1:
+            self.logger.warning(f'[[{title}]]: Multiple taxa lists found in taxobox')
+            return False
+
+        expected = {}
+        for taxon, author in synonyms:
+            taxon, author = self._normalize(taxon), self._normalize(author)
+            if taxon in expected and expected[taxon] != author:
+                # These need to be manually reviewed
+                self.logger.warning(f'[[{title}]]: Expected synonym list has duplicates')
+                return False
+            expected[taxon] = author
+
+        actual = {}
+        formatted_authors = {}
+        splist = tmpls[0]
+        for i in range(len(splist.params) // 2):
+            taxon_param, author_param = splist.params[2 * i], splist.params[2 * i + 1]
+            taxon = self._normalize(taxon_param.value)
+            author = self._normalize(author_param.value)
+            if taxon not in expected:
+                self.logger.warning(f'[[{title}]]: Unknown synonym {taxon!r}')
+                return False
+            actual[taxon] = author
+            formatted_authors.setdefault(author, []).append(author_param.value.strip())
+
+        expected = {taxon: author for taxon, author in expected.items() if taxon in actual}
+        assert set(expected.keys()) == set(actual.keys())
+        if expected == actual:
+            self.logger.debug(f'[[{title}]]: Nothing to update')
+            return None
+        if list(expected.values()) != list(actual.values()):
+            if set(expected.values()) == set(actual.values()):
+                self.logger.warning(f'[[{title}]]: Actual authors are not in expected order')
+            else:
+                self.logger.warning(f'[[{title}]]: Actual authors do not match expected')
+            return False
+
+        changes = []
+        for i in range(len(splist.params) // 2):
+            taxon_param, author_param = splist.params[2 * i], splist.params[2 * i + 1]
+            taxon = self._normalize(taxon_param.value)
+            if expected[taxon] != actual[taxon]:
+                author = formatted_authors[expected[taxon]].pop(0)
+                match = re.match(r'^(\s*).*?(\s*)$', str(author_param.value))
+                ws_before, ws_after = match.group(1), match.group(2)
+                author_param.value = f'{ws_before}{author}{ws_after}'
+                changes.append((taxon, actual[taxon], expected[taxon]))
+
+        if changes:
+            self.logger.info(f'Will update {len(changes)} synonyms in [[{title}]]')
+        else:
+            self.logger.debug(f'Nothing to update in [[{title}]]')
+        return changes
+
+    @staticmethod
+    def _normalize(value):
+        """
+        Normalize a taxon or author name.
+        """
+        if isinstance(value, mwparserfromhell.wikicode.Wikicode):
+            value = value.strip_code()
+        if not value or not value.strip():
+            return None
+        return unidecode.unidecode(value.strip().casefold().replace('&', 'and').replace(',', ''))
+
+    def view_edits(self):
+        """
+        Examine edits prepared by prepare_edits.
+        """
+        with open(self.edits_path) as fp:
+            edits = json.load(fp)
+
+        self.logger.info(f'{len(edits)} pages to edit')
+        for pageid, edit in edits.items():
+            print(f'\n{pageid}: {edit["title"]}:')
+            old, new = edit['original'], edit['content']
+
+            udiff = difflib.unified_diff(old.splitlines(), new.splitlines(), 'old', 'new')
+            subprocess.run(
+                ['delta', '-s', '--paging', 'never'],
+                input='\n'.join(udiff), text=True,
+            )
+
+    def save_edits(self):
+        """
+        Save edits prepared by prepare_edits.
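+
+        Each page is re-checked for the bot shutoff switch and for
+        {{bots}}/{{nobots}} exclusion before saving; if any page excludes the
+        bot, the whole run is aborted. Edits are made against the cached base
+        revision ID so that intervening edits can be detected, and saves are
+        throttled to one per ten seconds.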
+ """ + with open(self.edits_path) as fp: + edits = json.load(fp) + + self.logger.info(f'{len(edits)} pages to edit') + for pageid, edit in edits.items(): + page = self.site.get_page(edit['title']) + self.logger.info(f'\n{pageid}: [[{page.title}]]') + + if self.shutoff_enabled(page): + raise RuntimeError('Shutoff enabled') + if not page.check_exclusion(): + self.logger.warning(f'[[{page.title}]]: Bot excluded from editing') + return + + page.edit( + edit['content'], + summary=self.summary.format(changes=len(edit['changes'])), + baserevid=edit['revid'], + ) + time.sleep(10)