Przeglądaj źródła

converting commands to use new BaseCommand class

tags/v0.1
Ben Kurtovic 13 lat temu
rodzic
commit
9e3c105b0f
5 zmienionych plików z 271 dodań i 246 usunięć
  1. +22
    -14
      irc/commands/chanops.py
  2. +148
    -146
      irc/commands/git.py
  3. +35
    -37
      irc/commands/help.py
  4. +49
    -38
      irc/commands/link.py
  5. +17
    -11
      irc/commands/test.py

+ 22
- 14
irc/commands/chanops.py Wyświetl plik

@@ -1,24 +1,32 @@
# -*- coding: utf-8 -*-

"""Voice/devoice/op/deop users in the channel."""
# Voice/devoice/op/deop users in the channel.

from irc.base_command import BaseCommand
from config.irc_config import *

connection, data = None, None
class ChanOps(BaseCommand):
def get_hook(self):
return "msg"

def call(c, d):
global connection, data
connection, data = c, d
def get_help(self, command):
action = command.capitalize()
return "%s users in the channel." % action

if data.host not in ADMINS:
connection.reply(data.chan, data.nick, "you must be a bot admin to use this command.")
return
def check(self, data):
if data.is_command and (data.command == "voice" or data.command == "devoice"
or data.command == "op" or data.command == "deop"):
return True
return False

if not data.args: # if it is just !op/!devoice/whatever without arguments, assume they want to do this to themselves
target = data.nick
else:
target = data.args[0]
def process(self, data):
if data.host not in ADMINS:
self.connection.reply(data, "you must be a bot admin to use this command.")
return

action = data.command[1:] # strip ! at the beginning of the command
if not data.args: # if it is just !op/!devoice/whatever without arguments, assume they want to do this to themselves
target = data.nick
else:
target = data.args[0]

connection.say("ChanServ", "%s %s %s" % (action, data.chan, target))
self.connection.say("ChanServ", "%s %s %s" % (data.command, data.chan, target))

+ 148
- 146
irc/commands/git.py Wyświetl plik

@@ -1,152 +1,154 @@
# -*- coding: utf-8 -*-

"""Commands to interface with the bot's git repository; use '!git help' for sub-command list."""
# Commands to interface with the bot's git repository; use '!git help' for sub-command list.

import shlex, subprocess, re
from config.irc_config import *

connection, data = None, None

def call(c, d):
global connection, data
connection, data = c, d

if data.host not in OWNERS:
connection.reply(data.chan, data.nick, "you must be a bot owner to use this command.")
return
if not data.args:
connection.reply(data.chan, data.nick, "no arguments provided.")
return

if data.args[0] == "help":
do_help()

elif data.args[0] == "branch":
do_branch()

elif data.args[0] == "branches":
do_branches()

elif data.args[0] == "checkout":
do_checkout()

elif data.args[0] == "delete":
do_delete()

elif data.args[0] == "pull":
do_pull()

elif data.args[0] == "status":
do_status()

else: # they asked us to do something we don't know
connection.reply(data.chan, data.nick, "unknown argument: \x0303%s\x0301." % data.args[0])

def exec_shell(command):
"""execute a shell command and get the output"""
command = shlex.split(command)
result = subprocess.check_output(command, stderr=subprocess.STDOUT)
return result

def do_help():
"""display all commands"""
help = ""

help_dict = {
"branch": "get current branch",
"branches": "get all branches",
"checkout": "switch branches",
"delete": "delete an old branch",
"pull": "update everything from the remote server",
"status": "check if we are up-to-date",
}

keys = help_dict.keys()
keys.sort()
for key in keys:
help += "\x0303%s\x0301 (%s), " % (key, help_dict[key])
help = help[:-2] # trim last comma

connection.reply(data.chan, data.nick, "sub-commands are: %s." % help)

def do_branch():
"""get our current branch"""
branch = exec_shell("git name-rev --name-only HEAD")
branch = branch[:-1] # strip newline

connection.reply(data.chan, data.nick, "currently on branch \x0302%s\x0301." % branch)

def do_branches():
"""get list of branches"""
branches = exec_shell("git branch")

branches = branches[:-1] # strip newline
branches = branches.replace('\n* ', ', ') # cleanup extraneous characters
branches = branches.replace('* ', ' ')
branches = branches.replace('\n ', ', ')
branches = branches.strip()

connection.reply(data.chan, data.nick, "branches: \x0302%s\x0301." % branches)

def do_checkout():
"""switch branches"""
try:
branch = data.args[1]
except IndexError: # no branch name provided
connection.reply(data.chan, data.nick, "switch to which branch?")
return

try:
result = exec_shell("git checkout %s" % branch)
if "Already on" in result:
connection.reply(data.chan, data.nick, "already on \x0302%s\x0301!" % branch)
from config.irc_config import *
from irc.base_command import BaseCommand

class Git(BaseCommand):
def get_hook(self):
return "msg"

def get_help(self, command):
return "Commands to interface with the bot's git repository; use '!git help' for sub-command list."

def check(self, data):
if data.is_command and data.command == "git":
return True
return False

def process(self, data):
self.data = data
if data.host not in OWNERS:
self.connection.reply(data, "you must be a bot owner to use this command.")
return

if not data.args:
self.connection.reply(data, "no arguments provided. Maybe you wanted '!git help'?")
return

if data.args[0] == "help":
self.do_help()

elif data.args[0] == "branch":
self.do_branch()

elif data.args[0] == "branches":
self.do_branches()

elif data.args[0] == "checkout":
self.do_checkout()

elif data.args[0] == "delete":
self.do_delete()

elif data.args[0] == "pull":
self.do_pull()

elif data.args[0] == "status":
self.do_status()

else: # they asked us to do something we don't know
self.connection.reply(data, "unknown argument: \x0303%s\x0301." % data.args[0])

def exec_shell(self, command):
"""execute a shell command and get the output"""
command = shlex.split(command)
result = subprocess.check_output(command, stderr=subprocess.STDOUT)
if result:
result = result[:-1] # strip newline
return result

def do_help(self):
"""display all commands"""
help_dict = {
"branch": "get current branch",
"branches": "get all branches",
"checkout": "switch branches",
"delete": "delete an old branch",
"pull": "update everything from the remote server",
"status": "check if we are up-to-date",
}
keys = help_dict.keys()
keys.sort()
help = ""
for key in keys:
help += "\x0303%s\x0301 (%s), " % (key, help_dict[key])
help = help[:-2] # trim last comma and space
self.connection.reply(self.data, "sub-commands are: %s." % help)

def do_branch(self):
"""get our current branch"""
branch = self.exec_shell("git name-rev --name-only HEAD")
self.connection.reply(self.data, "currently on branch \x0302%s\x0301." % branch)

def do_branches(self):
"""get list of branches"""
branches = self.exec_shell("git branch")
branches = branches.replace('\n* ', ', ') # cleanup extraneous characters
branches = branches.replace('* ', ' ')
branches = branches.replace('\n ', ', ')
branches = branches.strip()
self.connection.reply(self.data, "branches: \x0302%s\x0301." % branches)

def do_checkout(self):
"""switch branches"""
try:
branch = self.data.args[1]
except IndexError: # no branch name provided
self.connection.reply(self.data, "switch to which branch?")
return

try:
result = self.exec_shell("git checkout %s" % branch)
if "Already on" in result:
self.connection.reply(self.data, "already on \x0302%s\x0301!" % branch)
else:
self.connection.reply(self.data, "switched to branch \x0302%s\x0301." % branch)

except subprocess.CalledProcessError: # git couldn't switch branches
self.connection.reply(self.data, "branch \x0302%s\x0301 doesn't exist!" % branch)

def do_delete(self):
"""delete a branch, while making sure that we are not on it"""
try:
delete_branch = self.data.args[1]
except IndexError: # no branch name provided
self.connection.reply(self.data, "delete which branch?")
return

current_branch = self.exec_shell("git name-rev --name-only HEAD")

if current_branch == delete_branch:
self.connection.reply(self.data, "you're currently on this branch; please checkout to a different branch before deleting.")
return

try:
self.exec_shell("git branch -d %s" % delete_branch)
self.connection.reply(self.data, "branch \x0302%s\x0301 has been deleted locally." % delete_branch)
except subprocess.CalledProcessError: # git couldn't delete
self.connection.reply(self.data, "branch \x0302%s\x0301 doesn't exist!" % delete_branch)

def do_pull(self):
"""pull from remote repository"""
branch = self.exec_shell("git name-rev --name-only HEAD")
self.connection.reply(self.data, "pulling from remote (currently on \x0302%s\x0301)..." % branch)

result = self.exec_shell("git pull")

if "Already up-to-date." in result:
self.connection.reply(self.data, "done; no new changes.")
else:
changes = re.findall("\s*((.*?)\sfile(.*?)tions?\(-\))", result)[0][0] # find the changes
self.connection.reply(self.data, "done; %s." % changes)

def do_status(self):
"""check whether we have anything to pull"""
self.connection.reply(self.data, "checking remote for updates...")
result = self.exec_shell("git fetch --dry-run")
if not result:
self.connection.reply(self.data, "local copy is up-to-date with remote.")
else:
connection.reply(data.chan, data.nick, "switched to branch \x0302%s\x0301." % branch)

except subprocess.CalledProcessError: # git couldn't switch branches
connection.reply(data.chan, data.nick, "branch \x0302%s\x0301 doesn't exist!" % branch)

def do_delete():
"""delete a branch, while making sure that we are not on it"""
try:
delete_branch = data.args[1]
except IndexError: # no branch name provided
connection.reply(data.chan, data.nick, "delete which branch?")
return

current_branch = exec_shell("git name-rev --name-only HEAD")
current_branch = current_branch[:-1] # strip newline

if current_branch == delete_branch:
connection.reply(data.chan, data.nick, "you're currently on this branch; please checkout to a different branch before deleting.")
return

try:
exec_shell("git branch -d %s" % delete_branch)
connection.reply(data.chan, data.nick, "branch \x0302%s\x0301 has been deleted locally." % delete_branch)
except subprocess.CalledProcessError: # git couldn't delete
connection.reply(data.chan, data.nick, "branch \x0302%s\x0301 doesn't exist!" % delete_branch)

def do_pull():
"""pull from remote repository"""
branch = exec_shell("git name-rev --name-only HEAD")
branch = branch[:-1] # strip newline
connection.reply(data.chan, data.nick, "pulling from remote (currently on \x0302%s\x0301)..." % branch)

result = exec_shell("git pull")

if "Already up-to-date." in result:
connection.reply(data.chan, data.nick, "done; no new changes.")
else:
changes = re.findall("\s*((.*?)\sfile(.*?)tions?\(-\))", result)[0][0] # find the changes
connection.reply(data.chan, data.nick, "done; %s." % changes)

def do_status():
"""check whether we have anything to pull"""
connection.reply(data.chan, data.nick, "checking remote for updates...")
result = exec_shell("git fetch --dry-run")
if not result:
connection.reply(data.chan, data.nick, "local copy is up-to-date with remote.")
else:
connection.reply(data.chan, data.nick, "remote is ahead of local copy.")
self.connection.reply(self.data, "remote is ahead of local copy.")

+ 35
- 37
irc/commands/help.py Wyświetl plik

@@ -1,48 +1,46 @@
# -*- coding: utf-8 -*-

"""Generates help information."""
# Generates help information.

connection, data = None, None
from irc.base_command import BaseCommand
from irc.data import Data
from irc import triggers

def get_alias(key):
"""connect command aliases with their file, e.g. so we know !voice corresponds to chanops.py"""
aliases = {
"voice": "chanops",
"devoice": "chanops",
"op": "chanops",
"deop": "chanops",
}
return aliases[key]
class Help(BaseCommand):
def get_hook(self):
return "msg"

def call(c, d):
global connection, data
connection, data = c, d
def get_help(self, command):
return "Generates help information."

if not data.args:
do_general_help()
def check(self, data):
if data.is_command and data.command == "help":
return True
return False

else:
do_command_help()
def process(self, data):
if not data.args:
self.do_general_help(data)
else:
self.do_command_help(data)

def do_general_help():
connection.reply(data.chan, data.nick, "I am a bot! You can get help for any command by typing '!help <command>'.")
def do_general_help(self, data):
self.connection.reply(data, "I am a bot! You can get help for any command by typing '!help <command>'.")

def do_command_help():
command = data.args[0]
def do_command_help(self, data):
command = data.args[0]
commands = triggers.get_commands()

dummy = Data() # dummy message to test which command classes pick up this command
dummy.command = command
dummy.is_command = True

for cmnd in commands:
if cmnd.check(dummy):
help = cmnd.get_help(command)
break

try:
exec "from irc.commands import %s as this_command" % command
except ImportError: # if we can't find it directly, this could be an alias for another command
try:
cmd = get_alias(command)
except KeyError:
connection.reply(data.chan, data.nick, "command \x0303%s\x0301 not found!" % command)
return
exec "from irc.commands import %s as this_command" % cmd

info = this_command.__doc__

if info:
connection.reply(data.chan, data.nick, "info for command \x0303%s\x0301: \"%s\"" % (command, info))
else:
connection.reply(data.chan, data.nick, "sorry, no information for \x0303%s\x0301." % command)
self.connection.reply(data, "info for command \x0303%s\x0301: \"%s\"" % (command, help))
except UnboundLocalError:
self.connection.reply(data, "sorry, no help for \x0303%s\x0301." % command)

+ 49
- 38
irc/commands/link.py Wyświetl plik

@@ -1,54 +1,65 @@
# -*- coding: utf-8 -*-

"""Convert a Wikipedia page name into a URL."""
# Convert a Wikipedia page name into a URL.

import re

connection, data = None, None
from irc.base_command import BaseCommand

def call(c, d):
global connection, data
connection, data = c, d
class Link(BaseCommand):
def get_hook(self):
return "msg"

msg = data.msg
def get_help(self, command):
return "Convert a Wikipedia page name into a URL."

if re.search("(\[\[(.*?)\]\])|(\{\{(.*?)\}\})", msg):
links = parse_line(msg)
links = " , ".join(links)
connection.reply(data.chan, data.nick, links)
def check(self, data):
if ((data.is_command and data.command == "link") or
(("[[" in data.msg and "]]" in data.msg) or
("{{" in data.msg and "}}" in data.msg))):
return True
return False

elif data.command == "!link":
if not data.args:
connection.reply(data.chan, data.nick, "what do you want me to link to?")
return
pagename = ' '.join(data.args)
link = parse_link(pagename)
connection.reply(data.chan, data.nick, link)
def process(self, data):
msg = data.msg

def parse_line(line):
results = list()
if re.search("(\[\[(.*?)\]\])|(\{\{(.*?)\}\})", msg):
links = self.parse_line(msg)
links = " , ".join(links)
self.connection.reply(data, links)

line = re.sub("\{\{\{(.*?)\}\}\}", "", line) # destroy {{{template parameters}}}
elif data.command == "link":
if not data.args:
self.connection.reply(data, "what do you want me to link to?")
return
pagename = ' '.join(data.args)
link = self.parse_link(pagename)
self.connection.reply(data, link)

links = re.findall("(\[\[(.*?)(\||\]\]))", line) # find all [[links]]
if links:
links = map(lambda x: x[1], links) # re.findall() returns a list of tuples, but we only want the 2nd item in each tuple
results.extend(map(parse_link, links))
def parse_line(self, line):
results = list()

templates = re.findall("(\{\{(.*?)(\||\}\}))", line) # find all {{templates}}
if templates:
templates = map(lambda x: x[1], templates)
results.extend(map(parse_template, templates))
line = re.sub("\{\{\{(.*?)\}\}\}", "", line) # destroy {{{template parameters}}}

return results
links = re.findall("(\[\[(.*?)(\||\]\]))", line) # find all [[links]]
if links:
links = map(lambda x: x[1], links) # re.findall() returns a list of tuples, but we only want the 2nd item in each tuple
results.extend(map(self.parse_link, links))

def parse_link(pagename):
pagename = pagename.strip()
link = "http://en.wikipedia.org/wiki/" + pagename
link = link.replace(" ", "_")
return link
templates = re.findall("(\{\{(.*?)(\||\}\}))", line) # find all {{templates}}
if templates:
templates = map(lambda x: x[1], templates)
results.extend(map(self.parse_template, templates))

def parse_template(pagename):
pagename = "Template:%s" % pagename # TODO: implement an actual namespace check
link = parse_link(pagename)
return link
return results

def parse_link(self, pagename):
pagename = pagename.strip()
link = "http://en.wikipedia.org/wiki/" + pagename
link = link.replace(" ", "_")
return link

def parse_template(self, pagename):
pagename = "Template:%s" % pagename # TODO: implement an actual namespace check
link = self.parse_link(pagename)
return link

+ 17
- 11
irc/commands/test.py Wyświetl plik

@@ -1,20 +1,26 @@
# -*- coding: utf-8 -*-

"""Test the bot!"""
# A very simple command to test the bot.

import random

connection, data = None, None
from irc.base_command import BaseCommand

def call(c, d):
global connection, data
connection, data = c, d
class Test(BaseCommand):
def get_hook(self):
return "msg"

choices = ("say_hi()", "say_sup()")
exec random.choice(choices)
def get_help(self, command):
return "Test the bot!"

def say_hi():
connection.say(data.chan, "Hey \x02%s\x0F!" % data.nick)
def check(self, data):
if data.is_command and data.command == "test":
return True
return False

def say_sup():
connection.say(data.chan, "'sup \x02%s\x0F?" % data.nick)
def process(self, data):
hey = random.randint(0, 1)
if hey:
self.connection.say(data.chan, "Hey \x02%s\x0F!" % data.nick)
else:
self.connection.say(data.chan, "'sup \x02%s\x0F?" % data.nick)

Ładowanie…
Anuluj
Zapisz