From 9e3c105b0ff027c090ffbac8e658409769a25c7f Mon Sep 17 00:00:00 2001 From: Ben Kurtovic Date: Tue, 19 Apr 2011 01:29:50 -0400 Subject: [PATCH] converting commands to use new BaseCommand class --- irc/commands/chanops.py | 36 +++--- irc/commands/git.py | 294 ++++++++++++++++++++++++------------------------ irc/commands/help.py | 72 ++++++------ irc/commands/link.py | 87 +++++++------- irc/commands/test.py | 28 +++-- 5 files changed, 271 insertions(+), 246 deletions(-) diff --git a/irc/commands/chanops.py b/irc/commands/chanops.py index 7c22ecc..e620fb2 100644 --- a/irc/commands/chanops.py +++ b/irc/commands/chanops.py @@ -1,24 +1,32 @@ # -*- coding: utf-8 -*- -"""Voice/devoice/op/deop users in the channel.""" +# Voice/devoice/op/deop users in the channel. +from irc.base_command import BaseCommand from config.irc_config import * -connection, data = None, None +class ChanOps(BaseCommand): + def get_hook(self): + return "msg" -def call(c, d): - global connection, data - connection, data = c, d + def get_help(self, command): + action = command.capitalize() + return "%s users in the channel." % action - if data.host not in ADMINS: - connection.reply(data.chan, data.nick, "you must be a bot admin to use this command.") - return + def check(self, data): + if data.is_command and (data.command == "voice" or data.command == "devoice" + or data.command == "op" or data.command == "deop"): + return True + return False - if not data.args: # if it is just !op/!devoice/whatever without arguments, assume they want to do this to themselves - target = data.nick - else: - target = data.args[0] + def process(self, data): + if data.host not in ADMINS: + self.connection.reply(data, "you must be a bot admin to use this command.") + return - action = data.command[1:] # strip ! at the beginning of the command + if not data.args: # if it is just !op/!devoice/whatever without arguments, assume they want to do this to themselves + target = data.nick + else: + target = data.args[0] - connection.say("ChanServ", "%s %s %s" % (action, data.chan, target)) + self.connection.say("ChanServ", "%s %s %s" % (data.command, data.chan, target)) diff --git a/irc/commands/git.py b/irc/commands/git.py index ab17b93..df439cb 100644 --- a/irc/commands/git.py +++ b/irc/commands/git.py @@ -1,152 +1,154 @@ # -*- coding: utf-8 -*- -"""Commands to interface with the bot's git repository; use '!git help' for sub-command list.""" +# Commands to interface with the bot's git repository; use '!git help' for sub-command list. import shlex, subprocess, re -from config.irc_config import * -connection, data = None, None - -def call(c, d): - global connection, data - connection, data = c, d - - if data.host not in OWNERS: - connection.reply(data.chan, data.nick, "you must be a bot owner to use this command.") - return - - if not data.args: - connection.reply(data.chan, data.nick, "no arguments provided.") - return - - if data.args[0] == "help": - do_help() - - elif data.args[0] == "branch": - do_branch() - - elif data.args[0] == "branches": - do_branches() - - elif data.args[0] == "checkout": - do_checkout() - - elif data.args[0] == "delete": - do_delete() - - elif data.args[0] == "pull": - do_pull() - - elif data.args[0] == "status": - do_status() - - else: # they asked us to do something we don't know - connection.reply(data.chan, data.nick, "unknown argument: \x0303%s\x0301." % data.args[0]) - -def exec_shell(command): - """execute a shell command and get the output""" - command = shlex.split(command) - result = subprocess.check_output(command, stderr=subprocess.STDOUT) - return result - -def do_help(): - """display all commands""" - help = "" - - help_dict = { - "branch": "get current branch", - "branches": "get all branches", - "checkout": "switch branches", - "delete": "delete an old branch", - "pull": "update everything from the remote server", - "status": "check if we are up-to-date", - } - - keys = help_dict.keys() - keys.sort() - for key in keys: - help += "\x0303%s\x0301 (%s), " % (key, help_dict[key]) - help = help[:-2] # trim last comma - - connection.reply(data.chan, data.nick, "sub-commands are: %s." % help) - -def do_branch(): - """get our current branch""" - branch = exec_shell("git name-rev --name-only HEAD") - branch = branch[:-1] # strip newline - - connection.reply(data.chan, data.nick, "currently on branch \x0302%s\x0301." % branch) - -def do_branches(): - """get list of branches""" - branches = exec_shell("git branch") - - branches = branches[:-1] # strip newline - branches = branches.replace('\n* ', ', ') # cleanup extraneous characters - branches = branches.replace('* ', ' ') - branches = branches.replace('\n ', ', ') - branches = branches.strip() - - connection.reply(data.chan, data.nick, "branches: \x0302%s\x0301." % branches) - -def do_checkout(): - """switch branches""" - try: - branch = data.args[1] - except IndexError: # no branch name provided - connection.reply(data.chan, data.nick, "switch to which branch?") - return - - try: - result = exec_shell("git checkout %s" % branch) - if "Already on" in result: - connection.reply(data.chan, data.nick, "already on \x0302%s\x0301!" % branch) +from config.irc_config import * +from irc.base_command import BaseCommand + +class Git(BaseCommand): + def get_hook(self): + return "msg" + + def get_help(self, command): + return "Commands to interface with the bot's git repository; use '!git help' for sub-command list." + + def check(self, data): + if data.is_command and data.command == "git": + return True + return False + + def process(self, data): + self.data = data + if data.host not in OWNERS: + self.connection.reply(data, "you must be a bot owner to use this command.") + return + + if not data.args: + self.connection.reply(data, "no arguments provided. Maybe you wanted '!git help'?") + return + + if data.args[0] == "help": + self.do_help() + + elif data.args[0] == "branch": + self.do_branch() + + elif data.args[0] == "branches": + self.do_branches() + + elif data.args[0] == "checkout": + self.do_checkout() + + elif data.args[0] == "delete": + self.do_delete() + + elif data.args[0] == "pull": + self.do_pull() + + elif data.args[0] == "status": + self.do_status() + + else: # they asked us to do something we don't know + self.connection.reply(data, "unknown argument: \x0303%s\x0301." % data.args[0]) + + def exec_shell(self, command): + """execute a shell command and get the output""" + command = shlex.split(command) + result = subprocess.check_output(command, stderr=subprocess.STDOUT) + if result: + result = result[:-1] # strip newline + return result + + def do_help(self): + """display all commands""" + help_dict = { + "branch": "get current branch", + "branches": "get all branches", + "checkout": "switch branches", + "delete": "delete an old branch", + "pull": "update everything from the remote server", + "status": "check if we are up-to-date", + } + keys = help_dict.keys() + keys.sort() + help = "" + for key in keys: + help += "\x0303%s\x0301 (%s), " % (key, help_dict[key]) + help = help[:-2] # trim last comma and space + self.connection.reply(self.data, "sub-commands are: %s." % help) + + def do_branch(self): + """get our current branch""" + branch = self.exec_shell("git name-rev --name-only HEAD") + self.connection.reply(self.data, "currently on branch \x0302%s\x0301." % branch) + + def do_branches(self): + """get list of branches""" + branches = self.exec_shell("git branch") + branches = branches.replace('\n* ', ', ') # cleanup extraneous characters + branches = branches.replace('* ', ' ') + branches = branches.replace('\n ', ', ') + branches = branches.strip() + self.connection.reply(self.data, "branches: \x0302%s\x0301." % branches) + + def do_checkout(self): + """switch branches""" + try: + branch = self.data.args[1] + except IndexError: # no branch name provided + self.connection.reply(self.data, "switch to which branch?") + return + + try: + result = self.exec_shell("git checkout %s" % branch) + if "Already on" in result: + self.connection.reply(self.data, "already on \x0302%s\x0301!" % branch) + else: + self.connection.reply(self.data, "switched to branch \x0302%s\x0301." % branch) + + except subprocess.CalledProcessError: # git couldn't switch branches + self.connection.reply(self.data, "branch \x0302%s\x0301 doesn't exist!" % branch) + + def do_delete(self): + """delete a branch, while making sure that we are not on it""" + try: + delete_branch = self.data.args[1] + except IndexError: # no branch name provided + self.connection.reply(self.data, "delete which branch?") + return + + current_branch = self.exec_shell("git name-rev --name-only HEAD") + + if current_branch == delete_branch: + self.connection.reply(self.data, "you're currently on this branch; please checkout to a different branch before deleting.") + return + + try: + self.exec_shell("git branch -d %s" % delete_branch) + self.connection.reply(self.data, "branch \x0302%s\x0301 has been deleted locally." % delete_branch) + except subprocess.CalledProcessError: # git couldn't delete + self.connection.reply(self.data, "branch \x0302%s\x0301 doesn't exist!" % delete_branch) + + def do_pull(self): + """pull from remote repository""" + branch = self.exec_shell("git name-rev --name-only HEAD") + self.connection.reply(self.data, "pulling from remote (currently on \x0302%s\x0301)..." % branch) + + result = self.exec_shell("git pull") + + if "Already up-to-date." in result: + self.connection.reply(self.data, "done; no new changes.") + else: + changes = re.findall("\s*((.*?)\sfile(.*?)tions?\(-\))", result)[0][0] # find the changes + self.connection.reply(self.data, "done; %s." % changes) + + def do_status(self): + """check whether we have anything to pull""" + self.connection.reply(self.data, "checking remote for updates...") + result = self.exec_shell("git fetch --dry-run") + if not result: + self.connection.reply(self.data, "local copy is up-to-date with remote.") else: - connection.reply(data.chan, data.nick, "switched to branch \x0302%s\x0301." % branch) - - except subprocess.CalledProcessError: # git couldn't switch branches - connection.reply(data.chan, data.nick, "branch \x0302%s\x0301 doesn't exist!" % branch) - -def do_delete(): - """delete a branch, while making sure that we are not on it""" - try: - delete_branch = data.args[1] - except IndexError: # no branch name provided - connection.reply(data.chan, data.nick, "delete which branch?") - return - - current_branch = exec_shell("git name-rev --name-only HEAD") - current_branch = current_branch[:-1] # strip newline - - if current_branch == delete_branch: - connection.reply(data.chan, data.nick, "you're currently on this branch; please checkout to a different branch before deleting.") - return - - try: - exec_shell("git branch -d %s" % delete_branch) - connection.reply(data.chan, data.nick, "branch \x0302%s\x0301 has been deleted locally." % delete_branch) - except subprocess.CalledProcessError: # git couldn't delete - connection.reply(data.chan, data.nick, "branch \x0302%s\x0301 doesn't exist!" % delete_branch) - -def do_pull(): - """pull from remote repository""" - branch = exec_shell("git name-rev --name-only HEAD") - branch = branch[:-1] # strip newline - connection.reply(data.chan, data.nick, "pulling from remote (currently on \x0302%s\x0301)..." % branch) - - result = exec_shell("git pull") - - if "Already up-to-date." in result: - connection.reply(data.chan, data.nick, "done; no new changes.") - else: - changes = re.findall("\s*((.*?)\sfile(.*?)tions?\(-\))", result)[0][0] # find the changes - connection.reply(data.chan, data.nick, "done; %s." % changes) - -def do_status(): - """check whether we have anything to pull""" - connection.reply(data.chan, data.nick, "checking remote for updates...") - result = exec_shell("git fetch --dry-run") - if not result: - connection.reply(data.chan, data.nick, "local copy is up-to-date with remote.") - else: - connection.reply(data.chan, data.nick, "remote is ahead of local copy.") + self.connection.reply(self.data, "remote is ahead of local copy.") diff --git a/irc/commands/help.py b/irc/commands/help.py index e32454f..d4c0907 100644 --- a/irc/commands/help.py +++ b/irc/commands/help.py @@ -1,48 +1,46 @@ # -*- coding: utf-8 -*- -"""Generates help information.""" +# Generates help information. -connection, data = None, None +from irc.base_command import BaseCommand +from irc.data import Data +from irc import triggers -def get_alias(key): - """connect command aliases with their file, e.g. so we know !voice corresponds to chanops.py""" - aliases = { - "voice": "chanops", - "devoice": "chanops", - "op": "chanops", - "deop": "chanops", - } - return aliases[key] +class Help(BaseCommand): + def get_hook(self): + return "msg" -def call(c, d): - global connection, data - connection, data = c, d + def get_help(self, command): + return "Generates help information." - if not data.args: - do_general_help() + def check(self, data): + if data.is_command and data.command == "help": + return True + return False - else: - do_command_help() + def process(self, data): + if not data.args: + self.do_general_help(data) + else: + self.do_command_help(data) -def do_general_help(): - connection.reply(data.chan, data.nick, "I am a bot! You can get help for any command by typing '!help '.") + def do_general_help(self, data): + self.connection.reply(data, "I am a bot! You can get help for any command by typing '!help '.") -def do_command_help(): - command = data.args[0] + def do_command_help(self, data): + command = data.args[0] + commands = triggers.get_commands() + + dummy = Data() # dummy message to test which command classes pick up this command + dummy.command = command + dummy.is_command = True + + for cmnd in commands: + if cmnd.check(dummy): + help = cmnd.get_help(command) + break - try: - exec "from irc.commands import %s as this_command" % command - except ImportError: # if we can't find it directly, this could be an alias for another command try: - cmd = get_alias(command) - except KeyError: - connection.reply(data.chan, data.nick, "command \x0303%s\x0301 not found!" % command) - return - exec "from irc.commands import %s as this_command" % cmd - - info = this_command.__doc__ - - if info: - connection.reply(data.chan, data.nick, "info for command \x0303%s\x0301: \"%s\"" % (command, info)) - else: - connection.reply(data.chan, data.nick, "sorry, no information for \x0303%s\x0301." % command) + self.connection.reply(data, "info for command \x0303%s\x0301: \"%s\"" % (command, help)) + except UnboundLocalError: + self.connection.reply(data, "sorry, no help for \x0303%s\x0301." % command) diff --git a/irc/commands/link.py b/irc/commands/link.py index 455a433..b3cd99a 100644 --- a/irc/commands/link.py +++ b/irc/commands/link.py @@ -1,54 +1,65 @@ # -*- coding: utf-8 -*- -"""Convert a Wikipedia page name into a URL.""" +# Convert a Wikipedia page name into a URL. import re -connection, data = None, None +from irc.base_command import BaseCommand -def call(c, d): - global connection, data - connection, data = c, d +class Link(BaseCommand): + def get_hook(self): + return "msg" - msg = data.msg + def get_help(self, command): + return "Convert a Wikipedia page name into a URL." - if re.search("(\[\[(.*?)\]\])|(\{\{(.*?)\}\})", msg): - links = parse_line(msg) - links = " , ".join(links) - connection.reply(data.chan, data.nick, links) + def check(self, data): + if ((data.is_command and data.command == "link") or + (("[[" in data.msg and "]]" in data.msg) or + ("{{" in data.msg and "}}" in data.msg))): + return True + return False - elif data.command == "!link": - if not data.args: - connection.reply(data.chan, data.nick, "what do you want me to link to?") - return - pagename = ' '.join(data.args) - link = parse_link(pagename) - connection.reply(data.chan, data.nick, link) + def process(self, data): + msg = data.msg -def parse_line(line): - results = list() + if re.search("(\[\[(.*?)\]\])|(\{\{(.*?)\}\})", msg): + links = self.parse_line(msg) + links = " , ".join(links) + self.connection.reply(data, links) - line = re.sub("\{\{\{(.*?)\}\}\}", "", line) # destroy {{{template parameters}}} + elif data.command == "link": + if not data.args: + self.connection.reply(data, "what do you want me to link to?") + return + pagename = ' '.join(data.args) + link = self.parse_link(pagename) + self.connection.reply(data, link) - links = re.findall("(\[\[(.*?)(\||\]\]))", line) # find all [[links]] - if links: - links = map(lambda x: x[1], links) # re.findall() returns a list of tuples, but we only want the 2nd item in each tuple - results.extend(map(parse_link, links)) + def parse_line(self, line): + results = list() - templates = re.findall("(\{\{(.*?)(\||\}\}))", line) # find all {{templates}} - if templates: - templates = map(lambda x: x[1], templates) - results.extend(map(parse_template, templates)) + line = re.sub("\{\{\{(.*?)\}\}\}", "", line) # destroy {{{template parameters}}} - return results + links = re.findall("(\[\[(.*?)(\||\]\]))", line) # find all [[links]] + if links: + links = map(lambda x: x[1], links) # re.findall() returns a list of tuples, but we only want the 2nd item in each tuple + results.extend(map(self.parse_link, links)) -def parse_link(pagename): - pagename = pagename.strip() - link = "http://en.wikipedia.org/wiki/" + pagename - link = link.replace(" ", "_") - return link + templates = re.findall("(\{\{(.*?)(\||\}\}))", line) # find all {{templates}} + if templates: + templates = map(lambda x: x[1], templates) + results.extend(map(self.parse_template, templates)) -def parse_template(pagename): - pagename = "Template:%s" % pagename # TODO: implement an actual namespace check - link = parse_link(pagename) - return link + return results + + def parse_link(self, pagename): + pagename = pagename.strip() + link = "http://en.wikipedia.org/wiki/" + pagename + link = link.replace(" ", "_") + return link + + def parse_template(self, pagename): + pagename = "Template:%s" % pagename # TODO: implement an actual namespace check + link = self.parse_link(pagename) + return link diff --git a/irc/commands/test.py b/irc/commands/test.py index 48421aa..64e821d 100644 --- a/irc/commands/test.py +++ b/irc/commands/test.py @@ -1,20 +1,26 @@ # -*- coding: utf-8 -*- -"""Test the bot!""" +# A very simple command to test the bot. import random -connection, data = None, None +from irc.base_command import BaseCommand -def call(c, d): - global connection, data - connection, data = c, d +class Test(BaseCommand): + def get_hook(self): + return "msg" - choices = ("say_hi()", "say_sup()") - exec random.choice(choices) + def get_help(self, command): + return "Test the bot!" -def say_hi(): - connection.say(data.chan, "Hey \x02%s\x0F!" % data.nick) + def check(self, data): + if data.is_command and data.command == "test": + return True + return False -def say_sup(): - connection.say(data.chan, "'sup \x02%s\x0F?" % data.nick) + def process(self, data): + hey = random.randint(0, 1) + if hey: + self.connection.say(data.chan, "Hey \x02%s\x0F!" % data.nick) + else: + self.connection.say(data.chan, "'sup \x02%s\x0F?" % data.nick)