X-Git-Url: https://git.svenne.dk/?p=public%2Fdnssec-swede-utility.git;a=blobdiff_plain;f=swede;h=cd212a4c38b5595c387f9b4e7f9d89e25898f34c;hp=2bc1e4a1cb5578e3701b05046c0c4b9f66401b33;hb=HEAD;hpb=3b9158b46a1025251aa6e895422e038e74954b63 diff --git a/swede b/swede index 2bc1e4a..cd212a4 100755 --- a/swede +++ b/swede @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/python2 # swede - A tool to create DANE/TLSA records. # This tool is really simple and not foolproof, it doesn't check the CN in the @@ -17,6 +17,7 @@ import sys import os +import os.path import socket import unbound import re @@ -25,6 +26,9 @@ from binascii import a2b_hex, b2a_hex from hashlib import sha256, sha512 from ipaddr import IPv4Address, IPv6Address +check_ipv4=True +check_ipv6=True + def genTLSA(hostname, protocol, port, certificate, output='generic', usage=1, selector=0, mtype=1): """This function generates a TLSARecord object using the data passed in the parameters, @@ -59,6 +63,7 @@ def genTLSA(hostname, protocol, port, certificate, output='generic', usage=1, se return record.getRecord() def getA(hostname, secure=True): + if not check_ipv4: return [] """Gets a list of A records for hostname, returns a list of ARecords""" try: records = getRecords(hostname, rrtype='A', secure=secure) @@ -74,6 +79,7 @@ def getA(hostname, secure=True): return ret def getAAAA(hostname, secure=True): + if not check_ipv6: return [] """Gets a list of A records for hostname, returns a list of AAAARecords""" try: records = getRecords(hostname, rrtype='AAAA', secure=secure) @@ -127,8 +133,22 @@ def getRecords(hostname, rrtype='A', secure=True): """Do a lookup of a name and a rrtype, returns a list of binary coded strings. Only queries for rr_class IN.""" global resolvconf ctx = unbound.ub_ctx() - ctx.add_ta_file('root.key') - ctx.set_option("dlv-anchor-file:", "dlv.isc.org.key") + if os.path.exists("root.key"): + ctx.add_ta_file('root.key') + elif os.path.exists("/etc/swede/root.key"): + ctx.add_ta_file('/etc/swede/root.key') + else: + print "Cannot find root.key, please move it to /etc/swede" + sys.exit() + + if os.path.exists("dlv.isc.org.key"): + ctx.set_option("dlv-anchor-file:", "dlv.isc.org.key") + elif os.path.exists("/etc/swede/dlv.isc.org.key"): + ctx.set_option("dlv-anchor-file:", "/etc/swede/dlv.isc.org.key") + else: + print "Cannot find dlv.isc.org.key, please move it to /etc/swede" + sys.exit() + # Use the local cache if resolvconf and os.path.isfile(resolvconf): ctx.resolvconf(resolvconf) @@ -234,6 +254,28 @@ def verifyCertMatch(record, cert): else: return False +def verifyCertNameWithHostName(cert, hostname, with_msg=False): + """Verify the name on the certificate with a hostname, we need this because we get the cert based on IP address and thusly cannot rely on M2Crypto to verify this""" + if not isinstance(cert, X509.X509): + return + if not isinstance(hostname, str): + return + + if hostname[-1] == '.': + hostname = hostname[0:-1] + + # Ugly string comparison to see if the name on the ee-cert matches with the name provided on the commandline + try: + altnames_on_cert = cert.get_ext('subjectAltName').get_value() + except: + altnames_on_cert = '' + if hostname in (str(cert.get_subject()) + altnames_on_cert): + return True + else: + if with_msg: + print 'WARNING: Name on the certificate (Subject: %s, SubjectAltName: %s) doesn\'t match requested hostname (%s).' % (str(cert.get_subject()), altnames_on_cert, hostname) + return False + class TLSARecord: """When instanciated, this class contains all the fields of a TLSA record. """ @@ -367,8 +409,8 @@ if __name__ == '__main__': parser_create = subparsers.add_parser('create', help='Create a TLSA record') parser_create.set_defaults(function='create') - #parser.add_argument('-4', dest='ipv4', action='store_true',help='use ipv4 networking only') - #parser.add_argument('-6', dest='ipv6', action='store_true',help='use ipv6 networking only') + parser.add_argument('-4', dest='ipv4', action='store_true',help='use ipv4 networking only') + parser.add_argument('-6', dest='ipv6', action='store_true',help='use ipv6 networking only') parser.add_argument('--insecure', action='store_true', default=False, help='Allow use of non-dnssec secured answers') parser.add_argument('--resolvconf', metavar='/PATH/TO/RESOLV.CONF', action='store', default='', help='Use a recursive resolver from resolv.conf') parser.add_argument('-v', '--version', action='version', version='%(prog)s v0.2', help='show version and exit') @@ -391,6 +433,15 @@ if __name__ == '__main__': parser_create.add_argument('--mtype', '-m', action='store', type=int, default=1, choices=[0,1,2], help='The Matching Type of the Certificate for Association. \'0\' for Exact match, \'1\' for SHA-256 hash, \'2\' for SHA-512 (default: %(default)s).') args = parser.parse_args() + import pprint + pprint.pprint(args) + if args.ipv4 == True and args.ipv6 == True: + print "Cannot have only ipv4 and only ipv6 at the same time" + sys.exit() + elif args.ipv4 == True: + check_ipv6 = False + elif args.ipv6 == True: + check_ipv4 = False if args.host[-1] != '.': args.host += '.' @@ -443,7 +494,13 @@ if __name__ == '__main__': if not args.quiet: print 'Attempting to verify the record with the TLS service...' - addresses = getA(args.host, secure=secure) + getAAAA(args.host, secure=secure) + if check_ipv4 and check_ipv6: + addresses = getA(args.host, secure=secure) + getAAAA(args.host, secure=secure) + elif check_ipv4: + addresses = getA(args.host, secure=secure) + else: + addresses = getAAAA(args.host, secure=secure) + for address in addresses: if not args.quiet: print 'Got the following IP: %s' % str(address) @@ -458,9 +515,12 @@ if __name__ == '__main__': sys.exit(1) # Don't error when the verification fails in the SSL handshake ctx.set_verify(SSL.verify_none, depth=9) - if isinstance(address, AAAARecord): + if check_ipv6 and isinstance(address, AAAARecord): sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + elif check_ipv4 and isinstance(address, ARecord): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) else: sock = None connection = SSL.Connection(ctx, sock=sock) @@ -476,6 +536,10 @@ if __name__ == '__main__': verify_result = connection.get_verify_result() # Good, now let's verify + if not verifyCertNameWithHostName(cert=chain[0], hostname=str(args.host), with_msg=True): + # The name on the cert doesn't match the hostname... we don't verify the TLSA record + print 'Not checking the TLSA record.' + continue if record.usage == 1: # End-host cert cert = chain[0] if verifyCertMatch(record, cert): @@ -575,7 +639,13 @@ if __name__ == '__main__': sys.stdout.write('Port %s not numerical or within correct range (1 <= port <= 65535), try again (hit enter for default 443): ' % user_input) # Get the address records for the host try: - addresses = getA(args.host, secure=secure) + getAAAA(args.host, secure=secure) + if check_ipv4 and check_ipv6: + addresses = getA(args.host, secure=secure) + getAAAA(args.host, secure=secure) + elif check_ipv4: + addresses = getA(args.host, secure=secure) + else: + addresses = getAAAA(args.host, secure=secure) + except InsecureLookupException, e: print >> sys.stderr, str(e) sys.exit(1) @@ -585,9 +655,12 @@ if __name__ == '__main__': # We do the certificate handling here, as M2Crypto keeps segfaulting when try to do stuff with the cert if we don't ctx = SSL.Context() ctx.set_verify(SSL.verify_none, depth=9) - if isinstance(address, AAAARecord): + if check_ipv6 and isinstance(address, AAAARecord): sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + if check_ipv4 and isinstance(address, ARecord): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) else: sock = None connection = SSL.Connection(ctx, sock=sock)