# Copyright (C) 2009-2014 Ben Kurtovic
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import time

import git

from earwigbot.commands import Command


class Git(Command):
    # NOTE(review): the class docstring below may be surfaced to users as help
    # text by the command framework, so its wording is kept unchanged -- confirm
    # before editing it.
    """Commands to interface with configurable git repositories; use '!git' for a sub-command list."""

    name = "git"

    def setup(self):
        """Load the repository mapping from this command's config section.

        ``self.repos`` is set to the ``"repos"`` entry of the command's
        configuration, or to ``None`` when that entry (or the whole section)
        is missing.
        """
        try:
            self.repos = self.config.commands[self.name]["repos"]
        except KeyError:
            self.repos = None

    def process(self, data):
        """Validate a ``!git`` request and dispatch it to a sub-command handler.

        ``data`` is the incoming message object; this method reads
        ``data.args`` (sub-command, repo name, optional extra argument) and
        stores ``data`` on ``self.data`` for the ``do_*`` handlers.  Only bot
        owners may use the command.
        """
        self.data = data
        if not self.config.irc["permissions"].is_owner(data):
            msg = "You must be a bot owner to use this command."
            self.reply(data, msg)
            return

        if not data.args or data.args[0] == "help":
            self.do_help()
            return

        if not self.repos:
            self.reply(data, "No repos are specified in the config file.")
            return

        command = data.args[0]
        try:
            repo_name = data.args[1]
        except IndexError:
            msg = "Which repo do you want to work with (options are {0})?"
            self.reply(data, msg.format(self.get_repos()))
            return

        if repo_name not in self.repos:
            msg = "Repository must be one of the following: {0}."
            self.reply(data, msg.format(self.get_repos()))
            return

        # The repository is opened before the sub-command is looked up, so the
        # handlers can rely on self.repo being set.
        self.repo = git.Repo(self.repos[repo_name])

        handlers = {
            "branch": self.do_branch,
            "branches": self.do_branches,
            "checkout": self.do_checkout,
            "delete": self.do_delete,
            "pull": self.do_pull,
            "status": self.do_status,
        }
        handler = handlers.get(command)
        if handler is None:
            # They asked us to do something we don't know
            msg = f"Unknown argument: \x0303{data.args[0]}\x0f."
            self.reply(data, msg)
            return
        handler()

    def get_repos(self):
        """Return a formatted, comma-separated list of the configured repos.

        Each entry is rendered as ``name (value)``, taken from the key/value
        pairs of ``self.repos``.  When no repos are configured (``self.repos``
        is empty or ``None``), the placeholder ``"none"`` is returned so that
        the help text can still be displayed.
        """
        if not self.repos:
            return "none"
        # dict.iteritems() no longer exists on Python 3; items() is the
        # equivalent.
        return ", ".join(f"\x0302{k}\x0f ({v})" for k, v in self.repos.items())

    def get_remote(self):
        """Return the remote named by the third argument (default ``origin``).

        If the repository has no remote with that name, an error is sent to
        the user and ``None`` is returned.
        """
        try:
            remote_name = self.data.args[2]
        except IndexError:
            remote_name = "origin"

        try:
            return getattr(self.repo.remotes, remote_name)
        except AttributeError:
            msg = f"Unknown remote: \x0302{remote_name}\x0f."
            self.reply(self.data, msg)
            return None

    def get_time_since(self, date):
        """Return the time elapsed since ``date`` as a human-readable string.

        ``date`` is compared against the current Unix time, so it is expected
        to be a timestamp in seconds since the epoch.  The result uses the
        largest fitting unit of seconds, minutes, hours or days.
        """
        # time.time() is already seconds since the epoch.  The previous
        # time.mktime(time.gmtime()) treated a UTC struct as local time and
        # was therefore off by the host's UTC offset.
        diff = time.time() - date
        if diff < 60:
            return f"{int(diff)} seconds"
        if diff < 60 * 60:
            return f"{int(diff / 60)} minutes"
        if diff < 60 * 60 * 24:
            return f"{int(diff / 60 / 60)} hours"
        return f"{int(diff / 60 / 60 / 24)} days"

    def _updated_heads(self, results):
        """Return the remote head names of results not flagged as up to date."""
        return [
            info.ref.remote_head
            for info in results
            if info.flags != info.HEAD_UPTODATE
        ]

    def do_help(self):
        """Reply with the list of sub-commands, the known repos and the syntax."""
        descriptions = {
            "branch": "get current branch",
            "branches": "get all branches",
            "checkout": "switch branches",
            "delete": "delete an old branch",
            "pull": "update everything from the remote server",
            "status": "check if we are up-to-date",
        }
        subcommands = ", ".join(
            f"\x0303{key}\x0f ({descriptions[key]})" for key in sorted(descriptions)
        )
        msg = "Sub-commands are: {0}; repos are: {1}. Syntax: !git \x0303subcommand\x0f \x0302repo\x0f."
        self.reply(self.data, msg.format(subcommands, self.get_repos()))

    def do_branch(self):
        """Reply with the name of the currently checked-out branch."""
        branch = self.repo.active_branch.name
        msg = f"Currently on branch \x0302{branch}\x0f."
        self.reply(self.data, msg)

    def do_branches(self):
        """Reply with the names of all branches in the repository."""
        branches = [branch.name for branch in self.repo.branches]
        msg = "Branches: \x0302{}\x0f.".format(", ".join(branches))
        self.reply(self.data, msg)

    def do_checkout(self):
        """Switch to the branch named by the third argument.

        Replies with an error if no branch was given, if it is already the
        active branch, or if it does not exist; otherwise checks it out,
        confirms to the user and logs the action.
        """
        try:
            target = self.data.args[2]
        except IndexError:  # No branch name provided
            self.reply(self.data, "Switch to which branch?")
            return

        current_branch = self.repo.active_branch.name
        if target == current_branch:
            msg = f"Already on \x0302{target}\x0f!"
            self.reply(self.data, msg)
            return

        try:
            ref = getattr(self.repo.branches, target)
        except AttributeError:
            msg = f"Branch \x0302{target}\x0f doesn't exist!"
            self.reply(self.data, msg)
            return

        ref.checkout()
        msg = "Switched from branch \x0302{0}\x0f to \x0302{1}\x0f."
        self.reply(self.data, msg.format(current_branch, target))
        log = "{0} checked out branch {1} of {2}"
        self.logger.info(
            log.format(self.data.nick, target, self.repo.working_dir)
        )

    def do_delete(self):
        """Delete the branch named by the third argument.

        Refuses to delete the branch that is currently checked out.  The
        deletion uses ``git branch -d`` and is confirmed to the user and
        logged.
        """
        try:
            target = self.data.args[2]
        except IndexError:  # No branch name provided
            self.reply(self.data, "Delete which branch?")
            return

        current_branch = self.repo.active_branch.name
        if current_branch == target:
            msg = "You're currently on this branch; please checkout to a different branch before deleting."
            self.reply(self.data, msg)
            return

        try:
            ref = getattr(self.repo.branches, target)
        except AttributeError:
            msg = f"Branch \x0302{target}\x0f doesn't exist!"
            self.reply(self.data, msg)
            return

        self.repo.git.branch("-d", ref)
        msg = "Branch \x0302{0}\x0f has been deleted locally."
        self.reply(self.data, msg.format(target))
        log = "{0} deleted branch {1} of {2}"
        self.logger.info(
            log.format(self.data.nick, target, self.repo.working_dir)
        )

    def do_pull(self):
        """Pull from the selected remote and report which heads were updated."""
        branch = self.repo.active_branch.name
        msg = "Pulling from remote (currently on \x0302{0}\x0f)..."
        self.reply(self.data, msg.format(branch))

        remote = self.get_remote()
        if not remote:
            return  # get_remote() has already told the user what went wrong

        updated = self._updated_heads(remote.pull())
        if updated:
            branches = ", ".join(updated)
            msg = "Done; updates to \x0302{0}\x0f (from {1})."
            self.reply(self.data, msg.format(branches, remote.url))
            log = "{0} pulled {1} of {2} (updates to {3})"
            self.logger.info(
                log.format(
                    self.data.nick, remote.name, self.repo.working_dir, branches
                )
            )
        else:
            self.reply(self.data, "Done; no new changes.")
            log = "{0} pulled {1} of {2} (no updates)"
            self.logger.info(
                log.format(self.data.nick, remote.name, self.repo.working_dir)
            )

    def do_status(self):
        """Report the age of the last local commit and any pending updates.

        Performs a dry-run fetch against the selected remote to find heads
        that are not up to date.
        """
        remote = self.get_remote()
        if not remote:
            return  # get_remote() has already told the user what went wrong

        since = self.get_time_since(self.repo.head.object.committed_date)
        updated = self._updated_heads(remote.fetch(dry_run=True))
        if updated:
            branches = ", ".join(updated)
            msg = "Last local commit was \x02{0}\x0f ago; updates to \x0302{1}\x0f."
            self.reply(self.data, msg.format(since, branches))
            log = "{0} got status of {1} of {2} (updates to {3})"
            self.logger.info(
                log.format(
                    self.data.nick, remote.name, self.repo.working_dir, branches
                )
            )
        else:
            msg = (
                "Last commit was \x02{0}\x0f ago. Local copy is up-to-date with remote."
            )
            self.reply(self.data, msg.format(since))
            # This is a status check, not a pull; log it as such.
            log = "{0} got status of {1} of {2} (no updates)"
            self.logger.info(
                log.format(self.data.nick, remote.name, self.repo.working_dir)
            )