nginx-ldap-auth/nginx-ldap-auth-daemon.py

227 lines
7.4 KiB
Python
Executable File

#!/bin/sh
''''which python2 >/dev/null && exec python2 "$0" "$@" # '''
''''which python >/dev/null && exec python "$0" "$@" # '''
# Copyright (C) 2014-2015 Nginx, Inc.
import sys, os, signal, base64, ldap, Cookie
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
# Address handed to the server constructor at startup.  A (host, port) tuple
# is used as-is; if a string is configured instead, the exit handler treats
# it as a filesystem path and unlinks it on shutdown.
Listen = ('localhost', 8888)
#Listen = "/tmp/auth.sock" # Also uncomment lines in 'Requests are
# processed with UNIX sockets' section below
# -----------------------------------------------------------------------------
# Different request processing models: select one
# -----------------------------------------------------------------------------
# Requests are processed in separate thread
import threading
from SocketServer import ThreadingMixIn
class AuthHTTPServer(ThreadingMixIn, HTTPServer):
    """HTTP server that processes every request in a separate thread."""
# -----------------------------------------------------------------------------
# Requests are processed in separate process
#from SocketServer import ForkingMixIn
#class AuthHTTPServer(ForkingMixIn, HTTPServer):
# pass
# -----------------------------------------------------------------------------
# Requests are processed with UNIX sockets
#import threading
#from SocketServer import ThreadingUnixStreamServer
#class AuthHTTPServer(ThreadingUnixStreamServer, HTTPServer):
# pass
# -----------------------------------------------------------------------------
class AuthHandler(BaseHTTPRequestHandler):
# Return True if request is processed and response sent, otherwise False
# Set ctx['user'] and ctx['pass'] for authentication
def do_GET(self):
ctx = self.ctx
ctx['action'] = 'input parameters check'
for k, v in self.get_params().items():
ctx[k] = self.headers.get(v[0], v[1])
if ctx[k] == None:
self.auth_failed(ctx, 'required "%s" header was not passed' % k)
return True
ctx['action'] = 'performing authorization'
auth_header = self.headers.get('Authorization')
auth_cookie = self.get_cookie(ctx['cookiename'])
if auth_cookie != None and auth_cookie != '':
auth_header = "Basic " + auth_cookie
self.log_message("using username/password from cookie %s" %
ctx['cookiename'])
else:
self.log_message("using username/password from authorization header")
if auth_header is None or not auth_header.lower().startswith('basic '):
self.send_response(401)
self.send_header('WWW-Authenticate', 'Basic realm=' + ctx['realm'])
self.send_header('Cache-Control', 'no-cache')
self.end_headers()
return True
ctx['action'] = 'decoding credentials'
try:
auth_decoded = base64.b64decode(auth_header[6:])
user, passwd = auth_decoded.split(':', 2)
except:
self.auth_failed(ctx)
return True
ctx['user'] = user
ctx['pass'] = passwd
# Continue request processing
return False
def get_cookie(self, name):
cookies = self.headers.get('Cookie')
if cookies:
authcookie = Cookie.BaseCookie(cookies).get(name)
if authcookie:
return authcookie.value
else:
return None
else:
return None
# Log the error and complete the request with appropriate status
def auth_failed(self, ctx, errmsg = None):
msg = 'Error while ' + ctx['action']
if errmsg:
msg += ': ' + errmsg
ex, value, trace = sys.exc_info()
if ex != None:
msg += ": " + str(value)
if ctx.get('url'):
msg += ', server="%s"' % ctx['url']
if ctx.get('user'):
msg += ', login="%s"' % ctx['user']
self.log_error(msg)
self.send_response(403)
self.end_headers()
def get_params(self):
return {}
def log_message(self, format, *args):
if len(self.client_address) > 0:
addr = BaseHTTPRequestHandler.address_string(self)
else:
addr = "-"
sys.stdout.write("%s - %s [%s] %s\n" % (addr, self.ctx['user'],
self.log_date_time_string(), format % args))
def log_error(self, format, *args):
self.log_message(format, *args)
# Verify username/password against LDAP server
class LDAPAuthHandler(AuthHandler):
    """Verify the username/password of a request against an LDAP server.

    Connection settings are read per request from the headers listed in
    ``get_params``.
    """

    def get_params(self):
        """Return the parameters copied into ``self.ctx`` from the headers.

        Each entry maps a ctx key to ``(header name, default)``; a default
        of None marks the header as required.
        """
        return {
            # parameter    header             default
            'realm': ('X-Ldap-Realm', 'Restricted'),
            'url': ('X-Ldap-URL', None),
            'basedn': ('X-Ldap-BaseDN', None),
            'template': ('X-Ldap-Template', '(cn=%(username)s)'),
            'binddn': ('X-Ldap-BindDN', 'cn=anonymous'),
            'bindpasswd': ('X-Ldap-BindPass', ''),
            'cookiename': ('X-CookieName', ''),
        }

    def do_GET(self):
        """Handle an authentication request.

        Binds as the search user, looks the login up with the configured
        filter template, then binds as the first object found using the
        supplied password.  Responds 200 on success; every failure is
        reported through ``auth_failed``.
        """
        # "from ... import" is used because a local "import ldap.filter"
        # would rebind the module-level name "ldap" as a function local.
        from ldap.filter import escape_filter_chars

        ctx = dict()
        self.ctx = ctx

        ctx['action'] = 'initializing basic auth handler'
        ctx['user'] = '-'

        if AuthHandler.do_GET(self):
            # request already processed
            return

        ctx['action'] = 'empty password check'
        if not ctx['pass']:
            self.auth_failed(ctx, 'attempt to use empty password')
            return

        try:
            ctx['action'] = 'initializing LDAP connection'
            ldap_obj = ldap.initialize(ctx['url'])

            # See http://www.python-ldap.org/faq.shtml
            # uncomment, if required
            # ldap_obj.set_option(ldap.OPT_REFERRALS, 0)

            ctx['action'] = 'binding as search user'
            ldap_obj.bind_s(ctx['binddn'], ctx['bindpasswd'], ldap.AUTH_SIMPLE)

            ctx['action'] = 'preparing search filter'
            # The login comes from the client: escape filter metacharacters
            # so it cannot alter the structure of the search filter.
            searchfilter = ctx['template'] % {
                'username': escape_filter_chars(ctx['user']),
            }

            # Values go in as log arguments so a '%' in them is harmless
            self.log_message('searching on server "%s" with base dn '
                             '"%s" with filter "%s"',
                             ctx['url'], ctx['basedn'], searchfilter)

            ctx['action'] = 'running search query'
            results = ldap_obj.search_s(ctx['basedn'], ldap.SCOPE_SUBTREE,
                                        searchfilter, ['objectclass'], 1)

            ctx['action'] = 'verifying search query results'
            if len(results) < 1:
                self.auth_failed(ctx, 'no objects found')
                return

            ldap_dn = results[0][0]
            if ldap_dn is None:
                # NOTE(review): presumably a search reference (referral)
                # rather than an entry; never bind without a DN.
                self.auth_failed(ctx, 'matched object has no dn')
                return

            ctx['action'] = 'binding as an existing user'
            ctx['action'] += ' "%s"' % ldap_dn

            ldap_obj.bind_s(ldap_dn, ctx['pass'], ldap.AUTH_SIMPLE)

            self.log_message('Auth OK for user "%s"', ctx['user'])

            # Successfully authenticated user
            self.send_response(200)
            self.end_headers()

        except Exception:
            # auth_failed appends the active exception to the log message
            self.auth_failed(ctx)
def exit_handler(signal, frame):
    """Signal handler: remove the UNIX socket file, if any, and exit.

    The socket is only unlinked when ``Listen`` is a string; a failure to
    remove it is reported on stderr and does not change the exit status.
    """
    if not isinstance(Listen, basestring):
        sys.exit(0)

    try:
        os.unlink(Listen)
    except:
        reason = sys.exc_info()[1]
        sys.stderr.write('Failed to remove socket "%s": %s\n' %
                         (Listen, str(reason)))
    sys.exit(0)
if __name__ == '__main__':
    # Create the server on the configured address with the LDAP handler
    httpd = AuthHTTPServer(Listen, LDAPAuthHandler)
    # Run exit_handler on Ctrl-C (SIGINT)
    signal.signal(signal.SIGINT, exit_handler)
    httpd.serve_forever()