summaryrefslogtreecommitdiff
path: root/pggit.py
blob: c0b26483c3bdd5d2643333819e6e75e8e85e61d0 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python3

"""
Hook to be called when attempting to access the git repository through
ssh.

Verify permissions and pass control to git-shell if allowed.

First commandline argument should contain the username to authenticate,
which is controlled by ~/.ssh/authorized_keys

SSH_ORIGINAL_COMMAND contains the git command and its argument, which are
controlled by the client side git command.
"""

import sys
import os
import os.path
import psycopg2
import subprocess
import configparser
from datetime import datetime

# Git server-side commands a client is allowed to request through this hook.
ALLOWED_COMMANDS = ('git-upload-pack', 'git-receive-pack')
# Commands that modify the repository and therefore need write permission.
# The trailing comma matters: without it this is a plain string and
# "x in WRITE_COMMANDS" silently becomes a substring test.
WRITE_COMMANDS = ('git-receive-pack',)


class Logger(object):
    """Append timestamped, user-tagged messages to a logfile.

    The logfile location is taken from the ``logfile`` option in the
    ``paths`` section of the configuration passed to the constructor.
    """

    def __init__(self, cfg):
        """Set up the logger from *cfg*.

        cfg must offer ``get(section, option)``; the value of
        ``get('paths', 'logfile')`` is used as the file to append to.
        Until setuser() is called, entries are attributed to "Unknown".
        """
        self.user = "Unknown"
        self.logfile = cfg.get('paths', 'logfile')

    def log(self, message):
        """Append one line "<timestamp>: (<user>) <message>" to the logfile.

        The file is opened in append mode for each call. The context
        manager guarantees the handle is closed even if the write fails.
        """
        with open(self.logfile, "a") as f:
            f.write("%s: (%s) %s\n" % (datetime.now(), self.user, message))

    def setuser(self, user):
        """Attribute subsequent log lines to *user*; falsy values are ignored."""
        if user:
            self.user = user


class InternalException(Exception):
    """Error raised by the hook itself for a request it refuses or cannot serve."""


class PgGit(object):
    """Authorize one git-over-ssh request and hand it to ``git shell``.

    run() is the entry point. It reads the username from the command line
    and the requested git command from SSH_ORIGINAL_COMMAND, checks the
    user's permission on the repository in the database, runs the command
    and, for write commands, fires the optional push trigger.
    """

    # Username taken from the single commandline argument.
    user = None
    # Validated git command, one of ALLOWED_COMMANDS.
    command = None
    # Normalized filesystem path of the repository (ends in ".git").
    path = None
    # Repository name: path relative to the repo root, without ".git".
    subpath = None

    def __init__(self, cfg):
        """Store the configuration and load the optional push trigger.

        cfg is a configparser-style object. It must provide paths/githome
        (the repository root is "<githome>/repos/") and whatever Logger
        needs. If trigger/pushtrigger is set it must be a dotted
        "module.ClassName"; the class is instantiated with cfg.

        Raises InternalException if the push trigger class cannot be loaded.
        """
        self.cfg = cfg
        self.logger = Logger(cfg)
        self.repoprefix = "%s/repos/" % cfg.get('paths', 'githome')
        if cfg.has_option('trigger', 'pushtrigger'):
            # Everything before the last dot is the module, the rest the class.
            modname, _, classname = cfg.get('trigger', 'pushtrigger').rpartition('.')
            try:
                # import_module returns the named (sub)module itself, whereas
                # __import__("a.b") returns the top-level package "a", which
                # made classes living in submodules impossible to find.
                import importlib
                mod = importlib.import_module(modname)
                c = getattr(mod, classname)
                self.pushtrigger = c(self.cfg)
            except Exception as e:
                raise InternalException("Failed to load push trigger class: %s" % e) from e
        else:
            self.pushtrigger = None

    def parse_commandline(self):
        """Read the username from the single commandline argument.

        Raises InternalException unless exactly one argument was given.
        """
        if len(sys.argv) != 2:
            raise InternalException("Can only be run with one commandline argument!")
        self.user = sys.argv[1]
        self.logger.setuser(self.user)

    def parse_command(self):
        """Validate SSH_ORIGINAL_COMMAND and derive command, path and subpath.

        Raises InternalException if the variable is missing or malformed,
        the command is not in ALLOWED_COMMANDS, or the path is not an
        existing ".git" directory below the repository root.
        """
        env = os.environ.get('SSH_ORIGINAL_COMMAND', None)
        if not env:
            raise InternalException("No SSH_ORIGINAL_COMMAND present!")

        # env contains "git-<command> <argument>" or "git <command> <argument>"
        try:
            command, args = env.split(None, 1)
            if command == "git":
                subcommand, args = args.split(None, 1)
                command = "git-%s" % subcommand
        except ValueError:
            # Too few whitespace-separated parts to unpack: report it to the
            # client instead of surfacing as an unhandled server error.
            raise InternalException("Expected a git command followed by a repository path") from None
        if command not in ALLOWED_COMMANDS:
            raise InternalException("Command '%s' not allowed" % command)

        self.command = command
        if not args.startswith("'/"):
            raise InternalException("Expected git path to start with slash!")

        # FIXME: the opening single quote is checked above, but the closing
        # one is only stripped if present. Should it be required?

        # use os.path.normpath to make sure the user does not attempt to break out of the repository root
        self.path = os.path.normpath(("%s%s" % (self.repoprefix, args[2:].rstrip("'"))))
        if not self.path.startswith(self.repoprefix):
            raise InternalException("Escaping the root directory is of course not permitted")
        if not self.path.endswith('.git'):
            raise InternalException("Git repository paths must end in .git")
        if not os.path.exists(self.path):
            raise InternalException('git repository "%s" does not exist' % args)
        # Strip the root prefix and the trailing ".git" to get the repo name.
        self.subpath = self.path[len(self.repoprefix):-4]

    def check_permissions(self):
        """Check in the database that self.user may run self.command.

        Raises InternalException if the user has no permission row for the
        repository, or if a write command is requested and the reported
        level is not greater than zero.
        """
        db = psycopg2.connect(self.cfg.get('database', 'db'))
        try:
            curs = db.cursor()
            # Repositories with a remoterepository_id are reported as level 0,
            # so they can never be written to through this hook.
            curs.execute("SELECT CASE WHEN remoterepository_id IS NULL THEN level ELSE 0 END FROM repository_permissions INNER JOIN repositories ON repoid=repository INNER JOIN auth_user ON auth_user.id=user_id WHERE username=%s AND name=%s",
                         (self.user, self.subpath))
            row = curs.fetchone()
        finally:
            # Always release the connection, also when the query fails.
            db.close()

        # No row means no permission entry at all; a NULL level cannot be
        # compared and is likewise treated as "no permission".
        if row is None or row[0] is None:
            raise InternalException("Permission denied on repository for user %s" % self.user)
        writeperm = (row[0] > 0)

        if self.command in WRITE_COMMANDS:
            if not writeperm:
                raise InternalException("Write permission denied on repository for user %s" % self.user)

    def run_command(self):
        """Log the request and execute it through ``git shell -c``.

        NOTE(review): the exit status of git is not inspected, so callers
        cannot tell whether the command succeeded.
        """
        self.logger.log("Running \"git shell %s %s\"" % (self.command, "'%s'" % self.path))
        subprocess.call(['git', 'shell', '-c', "%s %s" % (self.command, "'%s'" % self.path)])

    def run(self):
        """Process one request from start to finish.

        On any failure the error is logged (best effort), a message is
        written to stderr and the process exits with status 1. Messages of
        InternalException are shown verbatim; any other exception is
        reported with a generic message only.
        """
        try:
            self.parse_commandline()
            self.parse_command()
            self.check_permissions()
            self.run_command()
            if self.pushtrigger:
                if self.command in WRITE_COMMANDS:
                    self.pushtrigger.pushtrigger(self.subpath, self.user)

        except InternalException as e:
            # The inner handlers deliberately do not bind a name: an
            # "except ... as e" here would unbind the outer e when it
            # finishes and break the stderr report below.
            try:
                self.logger.log(e)
            except Exception:
                # Logging is best effort; the client still gets the message.
                pass
            sys.stderr.write("%s\n" % e)
            sys.exit(1)
        except Exception as e:
            try:
                self.logger.log(e)
            except Exception:
                # If we failed to log, try once more with a new logger, otherwise,
                # just accept that we failed.
                try:
                    Logger(self.cfg).log(e)
                except Exception:
                    pass
            sys.stderr.write("An unhandled exception occurred on the server\n")
            sys.exit(1)


if __name__ == "__main__":
    # Load pggit.settings from the directory named by sys.path[0], then
    # hand the parsed configuration to the hook and process the request.
    settings_dir = os.path.abspath(sys.path[0])
    config = configparser.ConfigParser()
    config.read("%s/pggit.settings" % settings_dir)
    hook = PgGit(config)
    hook.run()