|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- # -*- coding: utf-8 -*-
-
- # Commands to interface with the bot's git repository; use '!git help' for sub-command list.
-
- import shlex
- import subprocess
- import re
-
- from irc.classes import BaseCommand
- from core import config
-
- class Git(BaseCommand):
- def get_hooks(self):
- return ["msg"]
-
- def get_help(self, command):
- return "Commands to interface with the bot's git repository; use '!git help' for sub-command list."
-
- def check(self, data):
- if data.is_command and data.command == "git":
- return True
- return False
-
- def process(self, data):
- self.data = data
- if data.host not in config.irc.permissions["owners"]:
- self.connection.reply(data, "you must be a bot owner to use this command.")
- return
-
- if not data.args:
- self.connection.reply(data, "no arguments provided. Maybe you wanted '!git help'?")
- return
-
- if data.args[0] == "help":
- self.do_help()
-
- elif data.args[0] == "branch":
- self.do_branch()
-
- elif data.args[0] == "branches":
- self.do_branches()
-
- elif data.args[0] == "checkout":
- self.do_checkout()
-
- elif data.args[0] == "delete":
- self.do_delete()
-
- elif data.args[0] == "pull":
- self.do_pull()
-
- elif data.args[0] == "status":
- self.do_status()
-
- else: # they asked us to do something we don't know
- self.connection.reply(data, "unknown argument: \x0303%s\x0301." % data.args[0])
-
- def exec_shell(self, command):
- """execute a shell command and get the output"""
- command = shlex.split(command)
- result = subprocess.check_output(command, stderr=subprocess.STDOUT)
- if result:
- result = result[:-1] # strip newline
- return result
-
- def do_help(self):
- """display all commands"""
- help_dict = {
- "branch": "get current branch",
- "branches": "get all branches",
- "checkout": "switch branches",
- "delete": "delete an old branch",
- "pull": "update everything from the remote server",
- "status": "check if we are up-to-date",
- }
- keys = help_dict.keys()
- keys.sort()
- help = ""
- for key in keys:
- help += "\x0303%s\x0301 (%s), " % (key, help_dict[key])
- help = help[:-2] # trim last comma and space
- self.connection.reply(self.data, "sub-commands are: %s." % help)
-
- def do_branch(self):
- """get our current branch"""
- branch = self.exec_shell("git name-rev --name-only HEAD")
- self.connection.reply(self.data, "currently on branch \x0302%s\x0301." % branch)
-
- def do_branches(self):
- """get list of branches"""
- branches = self.exec_shell("git branch")
- branches = branches.replace('\n* ', ', ') # cleanup extraneous characters
- branches = branches.replace('* ', ' ')
- branches = branches.replace('\n ', ', ')
- branches = branches.strip()
- self.connection.reply(self.data, "branches: \x0302%s\x0301." % branches)
-
- def do_checkout(self):
- """switch branches"""
- try:
- branch = self.data.args[1]
- except IndexError: # no branch name provided
- self.connection.reply(self.data, "switch to which branch?")
- return
-
- try:
- result = self.exec_shell("git checkout %s" % branch)
- if "Already on" in result:
- self.connection.reply(self.data, "already on \x0302%s\x0301!" % branch)
- else:
- current_branch = self.exec_shell("git name-rev --name-only HEAD")
- self.connection.reply(self.data, "switched from branch \x0302%s\x0301 to \x0302%s\x0301." % (current_branch, branch))
-
- except subprocess.CalledProcessError: # git couldn't switch branches
- self.connection.reply(self.data, "branch \x0302%s\x0301 doesn't exist!" % branch)
-
- def do_delete(self):
- """delete a branch, while making sure that we are not on it"""
- try:
- delete_branch = self.data.args[1]
- except IndexError: # no branch name provided
- self.connection.reply(self.data, "delete which branch?")
- return
-
- current_branch = self.exec_shell("git name-rev --name-only HEAD")
-
- if current_branch == delete_branch:
- self.connection.reply(self.data, "you're currently on this branch; please checkout to a different branch before deleting.")
- return
-
- try:
- self.exec_shell("git branch -d %s" % delete_branch)
- self.connection.reply(self.data, "branch \x0302%s\x0301 has been deleted locally." % delete_branch)
- except subprocess.CalledProcessError: # git couldn't delete
- self.connection.reply(self.data, "branch \x0302%s\x0301 doesn't exist!" % delete_branch)
-
- def do_pull(self):
- """pull from remote repository"""
- branch = self.exec_shell("git name-rev --name-only HEAD")
- self.connection.reply(self.data, "pulling from remote (currently on \x0302%s\x0301)..." % branch)
-
- result = self.exec_shell("git pull")
-
- if "Already up-to-date." in result:
- self.connection.reply(self.data, "done; no new changes.")
- else:
- changes = re.findall("\s*((.*?)\sfile(.*?)tions?\(-\))", result)[0][0] # find the changes
- try:
- remote = self.exec_shell("git config --get branch.%s.remote" % branch)
- url = self.exec_shell("git config --get remote.%s.url" % remote)
- self.connection.reply(self.data, "done; %s [from %s]." % (changes, url))
- except subprocess.CalledProcessError: # something in .git/config is not specified correctly, so we cannot get the remote's url
- self.connection.reply(self.data, "done; %s." % changes)
-
- def do_status(self):
- """check whether we have anything to pull"""
- last = self.exec_shell("git log -n 1 --pretty=\"%ar\"")
- result = self.exec_shell("git fetch --dry-run")
- if not result: # nothing was fetched, so remote and local are equal
- self.connection.reply(self.data, "last commit was %s. Local copy is \x02up-to-date\x0F with remote." % last)
- else:
- self.connection.reply(self.data, "last local commit was %s. Remote is \x02ahead\x0F of local copy." % last)
|