import sys
import os
+import socket
import unbound
import re
from M2Crypto import X509, SSL
else:
return False
+def verifyCertNameWithHostName(cert, hostname, with_msg=False):
+ """Verify the name on the certificate with a hostname, we need this because we get the cert based on IP address and thusly cannot rely on M2Crypto to verify this"""
+ if not isinstance(cert, X509.X509):
+ return
+ if not isinstance(hostname, str):
+ return
+
+ if hostname[-1] == '.':
+ hostname = hostname[0:-1]
+
+ # Ugly string comparison to see if the name on the ee-cert matches with the name provided on the commandline
+ try:
+ altnames_on_cert = cert.get_ext('subjectAltName').get_value()
+ except:
+ altnames_on_cert = ''
+ if hostname in (str(cert.get_subject()) + altnames_on_cert):
+ return True
+ else:
+ if with_msg:
+ print 'WARNING: Name on the certificate (Subject: %s, SubjectAltName: %s) doesn\'t match requested hostname (%s).' % (str(cert.get_subject()), altnames_on_cert, hostname)
+ return False
+
class TLSARecord:
"""When instanciated, this class contains all the fields of a TLSA record.
"""
"""An object representing an AAAA Record (IPv6 address)"""
def __init__(self, hostname, address):
self.rrtype = 28
+ self.hostname = hostname
self.address = address
def __str__(self):
if __name__ == '__main__':
import argparse
# create the parser
- parser = argparse.ArgumentParser(description='Create and verify DANE records.', epilog='This tool has a few limitations: it only IPv4 for SSL connections.')
+ parser = argparse.ArgumentParser(description='Create and verify DANE records.', epilog='This tool has a few limitations')
subparsers = parser.add_subparsers(title='Functions', help='Available functions, see %(prog)s function -h for function-specific help')
parser_verify = subparsers.add_parser('verify', help='Verify a TLSA record, exit 0 when all TLSA records are matched, exit 2 when a record does not match the received certificate, exit 1 on error.', epilog='Caveat: For TLSA validation, this program chases through the certificate chain offered by the server, not its local certificates.')
if not args.quiet:
print 'Attempting to verify the record with the TLS service...'
- addresses = getA(args.host, secure=secure)
+ addresses = getA(args.host, secure=secure) + getAAAA(args.host, secure=secure)
for address in addresses:
if not args.quiet:
print 'Got the following IP: %s' % str(address)
sys.exit(1)
# Don't error when the verification fails in the SSL handshake
ctx.set_verify(SSL.verify_none, depth=9)
- connection = SSL.Connection(ctx)
+ if isinstance(address, AAAARecord):
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ else:
+ sock = None
+ connection = SSL.Connection(ctx, sock=sock)
try:
connection.connect((str(address), int(args.port)))
except SSL.Checker.WrongHost, e:
# The name on the remote cert doesn't match the hostname because we connect on IP, not hostname (as we want secure lookup)
pass
+ except socket.error, e:
+ print 'Cannot connect to %s: %s' % (address, str(e))
+ continue
chain = connection.get_peer_cert_chain()
verify_result = connection.get_verify_result()
# Good, now let's verify
+ if not verifyCertNameWithHostName(cert=chain[0], hostname=str(args.host), with_msg=True):
+ # The name on the cert doesn't match the hostname... we don't verify the TLSA record
+ print 'Not checking the TLSA record.'
+ continue
if record.usage == 1: # End-host cert
cert = chain[0]
if verifyCertMatch(record, cert):
input_ok = True
except:
sys.stdout.write('Port %s not numerical or within correct range (1 <= port <= 65535), try again (hit enter for default 443): ' % user_input)
- # Get the A records for the host
+ # Get the address records for the host
try:
- addresses = getA(args.host, secure=secure)
+ addresses = getA(args.host, secure=secure) + getAAAA(args.host, secure=secure)
except InsecureLookupException, e:
print >> sys.stderr, str(e)
sys.exit(1)
# We do the certificate handling here, as M2Crypto keeps segfaulting when try to do stuff with the cert if we don't
ctx = SSL.Context()
ctx.set_verify(SSL.verify_none, depth=9)
- connection = SSL.Connection(ctx)
+ if isinstance(address, AAAARecord):
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ else:
+ sock = None
+ connection = SSL.Connection(ctx, sock=sock)
try:
connection.connect((str(address), int(connection_port)))
except SSL.Checker.WrongHost:
pass
+ except socket.error, e:
+ print 'Cannot connect to %s: %s' % (address, str(e))
+ continue
chain = connection.get_peer_cert_chain()
for chaincert in chain: