#!/usr/bin/env python3
"""
Hook to be called when attempting to access the git repository through
ssh. Verify permissions and pass control to git-shell if allowed.

First commandline argument should contain the username to authenticate,
which is controlled by ~/.ssh/authorized_keys

SSH_ORIGINAL_COMMAND contains the git command and argument, which is
controlled by the client side git command.
"""

import sys
import os
import os.path
import importlib
import psycopg2
import subprocess
import configparser
from datetime import datetime

# Commands a client is allowed to request at all.
ALLOWED_COMMANDS = ('git-upload-pack', 'git-receive-pack')
# Commands that modify the repository and therefore need write permission.
# The trailing comma matters: without it this is a plain string and the
# "in" test below degrades to a substring match.
WRITE_COMMANDS = ('git-receive-pack',)


class Logger(object):
    """Append timestamped, per-user messages to the configured logfile."""

    def __init__(self, cfg):
        """Read the logfile location from the [paths] section of *cfg*."""
        self.user = "Unknown"
        self.logfile = cfg.get('paths', 'logfile')

    def log(self, message):
        """Append one line of the form "<timestamp>: (<user>) <message>"."""
        # The context manager closes the file even if the write fails.
        with open(self.logfile, "a") as f:
            f.write("%s: (%s) %s" % (datetime.now(), self.user, message))
            f.write("\n")

    def setuser(self, user):
        """Set the username used in log lines; empty values are ignored."""
        if user:
            self.user = user


class InternalException(Exception):
    """Error whose message is safe to log and to show to the ssh client."""
    pass


class PgGit(object):
    """Validate an incoming git-over-ssh request and hand it to git-shell."""

    # Username taken from the single commandline argument.
    user = None
    # Normalized git command, always one of ALLOWED_COMMANDS.
    command = None
    # Normalized filesystem path of the requested repository.
    path = None
    # Repository name: path relative to the repository root, without ".git".
    subpath = None

    def __init__(self, cfg):
        """Set up logging, the repository root and the optional push trigger.

        Raises:
            InternalException: if [trigger] pushtrigger is configured but the
                class cannot be imported or instantiated.
        """
        self.cfg = cfg
        self.logger = Logger(cfg)
        self.repoprefix = "%s/repos/" % cfg.get('paths', 'githome')
        if cfg.has_option('trigger', 'pushtrigger'):
            # The option is a dotted path: "<module>.<classname>".
            pieces = cfg.get('trigger', 'pushtrigger').split('.')
            modname = '.'.join(pieces[:-1])
            classname = pieces[-1]
            try:
                # import_module returns the named (sub)module itself, whereas
                # __import__ returns the top-level package for dotted names.
                mod = importlib.import_module(modname)
                c = getattr(mod, classname)
                self.pushtrigger = c(self.cfg)
            except Exception as e:
                raise InternalException("Failed to load push trigger class: %s" % e)
        else:
            self.pushtrigger = None

    def parse_commandline(self):
        """Take the username from the one and only commandline argument.

        Raises:
            InternalException: if the argument count is not exactly one.
        """
        if len(sys.argv) != 2:
            raise InternalException("Can only be run with one commandline argument!")
        self.user = sys.argv[1]
        self.logger.setuser(self.user)

    def parse_command(self):
        """Parse and validate SSH_ORIGINAL_COMMAND.

        Sets self.command, self.path and self.subpath.

        Raises:
            InternalException: if the variable is missing or malformed, the
                command is not allowed, or the path is invalid, escapes the
                repository root, or does not exist.
        """
        env = os.environ.get('SSH_ORIGINAL_COMMAND', None)
        if not env:
            raise InternalException("No SSH_ORIGINAL_COMMAND present!")

        # env contains "git-<command> <argument>" or "git <command> <argument>"
        try:
            command, args = env.split(None, 1)
            if command == "git":
                subcommand, args = args.split(None, 1)
                command = "git-%s" % subcommand
        except ValueError:
            # Too few whitespace-separated pieces to hold command + argument.
            raise InternalException("Malformed SSH_ORIGINAL_COMMAND, expected a command and a path")

        if command not in ALLOWED_COMMANDS:
            raise InternalException("Command '%s' not allowed" % command)
        self.command = command

        if not args.startswith("'/"):
            raise InternalException("Expected git path to start with slash!")

        # FIXME: what about that single quote? Make sure it's there?

        # use os.path.normpath to make sure the user does not attempt to break
        # out of the repository root
        self.path = os.path.normpath(("%s%s" % (self.repoprefix, args[2:].rstrip("'"))))
        if not self.path.startswith(self.repoprefix):
            raise InternalException("Escaping the root directory is of course not permitted")
        if not self.path.endswith('.git'):
            raise InternalException("Git repository paths must end in .git")
        if not os.path.exists(self.path):
            raise InternalException('git repository "%s" does not exist' % args)
        # Strip the repository root in front and the ".git" suffix at the end.
        self.subpath = self.path[len(self.repoprefix):-4]

    def check_permissions(self):
        """Check in the database that self.user may run self.command.

        Raises:
            InternalException: if the user has no permission row for the
                repository, or requests a write command without write
                permission.
        """
        writeperm = False
        db = psycopg2.connect(self.cfg.get('database', 'db'))
        try:
            curs = db.cursor()
            curs.execute(
                "SELECT CASE WHEN remoterepository_id IS NULL THEN level ELSE 0 END "
                "FROM repository_permissions "
                "INNER JOIN repositories ON repoid=repository "
                "INNER JOIN auth_user ON auth_user.id=user_id "
                "WHERE username=%s AND name=%s",
                (self.user, self.subpath))
            try:
                # fetchone() returns None when no row matched, which makes the
                # subscript fail; any failure here means access is denied.
                writeperm = (curs.fetchone()[0] > 0)
            except Exception:
                raise InternalException("Permission denied on repository for user %s" % self.user)
        finally:
            # Do not keep the connection open while git-shell runs.
            db.close()

        if self.command in WRITE_COMMANDS:
            if not writeperm:
                raise InternalException("Write permission denied on repository for user %s" % self.user)

    def run_command(self):
        """Log the request and run it through "git shell -c"."""
        self.logger.log("Running \"git shell %s %s\"" % (self.command, "'%s'" % self.path))
        # NOTE(review): the exit status of git shell is not checked, so the
        # caller cannot tell whether the git operation succeeded.
        subprocess.call(['git', 'shell', '-c', "%s %s" % (self.command, "'%s'" % self.path)])

    def run(self):
        """Entry point: parse, authorize, execute, then fire the push trigger.

        Any failure is logged on a best-effort basis, reported on stderr and
        ends the process with exit status 1.
        """
        try:
            self.parse_commandline()
            self.parse_command()
            self.check_permissions()
            self.run_command()
            if self.pushtrigger:
                if self.command in WRITE_COMMANDS:
                    self.pushtrigger.pushtrigger(self.subpath, self.user)
        except InternalException as e:
            # The inner handlers deliberately do not bind a name: rebinding
            # "e" would delete it when the clause ends and break the stderr
            # message below.
            try:
                self.logger.log(e)
            except Exception:
                # Logging is best-effort; still report to the client.
                pass
            sys.stderr.write("%s\n" % e)
            sys.exit(1)
        except Exception as e:
            try:
                self.logger.log(e)
            except Exception:
                # If we failed to log, try once more with a new logger, otherwise,
                # just accept that we failed.
                try:
                    Logger(self.cfg).log(e)
                except Exception:
                    pass
            # Never leak internal error details to the client.
            sys.stderr.write("An unhandled exception occurred on the server\n")
            sys.exit(1)


if __name__ == "__main__":
    # The settings file lives next to this script.
    c = configparser.ConfigParser()
    c.read("%s/pggit.settings" % os.path.abspath(sys.path[0]))
    PgGit(c).run()