#!/usr/bin/python3 import socket import ssl import argparse import time import sys import configparser import os # Sends authentication request to MFA server # Receive either pass or fail response from MFA server # Returns status to PAM HEADER_LENGTH = 64 KEY_LENGTH = 64 ACK_LENGTH = 3 DISCONNECT_MESSAGE = "DISCONNECT" FORMAT = "utf-8" RESPONSE_LENGTH = 1 PAM_SUCCESS = 0 PAM_AUTH_ERR = 7 def die(msg): print(msg) sys.exit(1) def parse_arguments(): # Parse command line arguments parser = argparse.ArgumentParser() parser.add_argument("--user",type=str,help="PAM username",required=True) parser.add_argument("--service",type=str,help="PAM service",required=True) parser.add_argument("--host",type=str,help="PAM hostname") parser.add_argument("--config",type=str,help="Path to config file",\ default="/etc/mfa/mfa.conf") parser.add_argument("--server",type=str,help="MFA server address") parser.add_argument("--port",type=str,help="MFA server PAM connection port") parser.add_argument("--plain",action="store_true",help="Connect without TLS") parser.add_argument("--insecure",action="store_true", help="Accept invalid TLS certificates") return parser.parse_args() def init_connection_tls(mfa_server, pam_port, insecure): # Attempts to connect to MFA server with provided address and port # Repeats connection attempts once per second until timeout is reached # Returns the socket if connection was successful or None otherwise connection = None timeout = 0 timeout_length = 5 sleep_length = 1 context = ssl.create_default_context() if insecure: context.check_hostname = False context.verify_mode = 0 while connection == None and timeout < timeout_length: try: connection = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=mfa_server) connection.connect((mfa_server,int(pam_port))) return connection except (ConnectionError,ConnectionRefusedError): time.sleep(sleep_length) timeout += sleep_length except ssl.SSLCertVerificationError: die("error: server presented invalid certificate") return None def init_connection(mfa_server, pam_port): # Attempts to connect to MFA server with provided address and port # Repeats connection attempts once per second until timeout is reached # Returns the socket if connection was successful or None otherwise connection = None timeout = 0 timeout_length = 5 sleep_length = 1 while connection == None and timeout < timeout_length: try: connection = socket.create_connection((mfa_server,pam_port)) return connection except (ConnectionError,ConnectionRefusedError): time.sleep(sleep_length) timeout += sleep_length return None def read_config(config_file): # Read config file for server and port info # Return tuple (server,port) server = "" port = 0 with open(config_file) as conf: line = None while line != "": line = conf.readline() if line.startswith("server ="): server = line.split("=")[1].strip() if line.startswith("port ="): port = int(line.split("=")[1].strip()) return (server,port) def read_config(config_file): parser = configparser.ConfigParser(inline_comment_prefixes="#") parser.read(config_file) return parser def get_vars(args,confparser): if not os.path.exists(args.config): print("Unable to open config file") sys.exit(1) server = None port = None plain = None insecure = None # Set values from config file first if confparser.has_section("pam"): server = confparser.get("pam","server",fallback=None) port = confparser.get("pam","port",fallback=None) plain = confparser.get("client","plain",fallback=False) insecure = confparser.get("client","insecure",fallback=False) if plain.lower() == "false": plain = False if insecure.lower() == "false": insecure = False # Let command line args overwrite any values if args.server != None: server = args.server if args.port != None: port = args.port if args.plain: plain = args.plain if args.insecure: insecure = args.insecure # Exit if any value is null if None in [server,port]: print("error: one or more items unspecified") sys.exit(1) return server,port,plain,insecure def main(): authed = "0" failed = "1" # Get arguments args = parse_arguments() confparser = read_config(args.config) mfa_server,pam_port,plain,insecure = get_vars(args,confparser) user = args.user service = args.service # Compile data to send to server # Read server and port from config file but allow command line options # to override those settings if args.server != None: mfa_server = args.server if args.port != None: pam_port = args.port # Get hostname if not given on command line if args.host == None: with open("/etc/hostname") as f: hostname = f.read().strip() else: hostname = args.host data = user + "," + hostname + "," + service # Initalize connection to MFA server. Quit if unable to connect. if plain: connection = init_connection(mfa_server, pam_port) else: connection = init_connection_tls(mfa_server,pam_port,insecure) if connection == None: die("failed to connect") # Send authentication data to MFA server data_length = len(data) length_msg = str(data_length) length_msg += ' ' * (HEADER_LENGTH - len(length_msg)) connection.send(length_msg.encode(FORMAT)) connection.send(data.encode(FORMAT)) # Listen for response from MFA server # Response will be 0 for authenticated and 1 for denied response = connection.recv(RESPONSE_LENGTH).decode(FORMAT) if response == "": # lost connection to server response = 1 # Print success/failure for PAM module print(response) if __name__ == '__main__': main()