3 # swede - A tool to create DANE/TLSA (draft 14) records.
4 # This tool is really simple and not foolproof, it doesn't check the CN in the
5 # Subject field of the certificate. It also doesn't check if the supplied
6 # certificate is a CA certificate if usage 1 is specified (or any other
7 # checking for that matter).
9 # Usage is explained when running this program with --help
11 # This tool is loosly based on the dane tool in the sshfp package by Paul
12 # Wouters and Christopher Olah from xelerance.com.
14 # Copyright Pieter Lexis (pieter.lexis@os3.nl)
16 # License: GNU GENERAL PUBLIC LICENSE Version 2 or later
22 from M2Crypto import X509, SSL
23 from binascii import a2b_hex, b2a_hex
24 from hashlib import sha256, sha512
25 from ipaddr import IPv4Address, IPv6Address
28 def genTLSA(hostname, protocol, port, certificate, output='draft', usage=1, selector=0, mtype=1):
29 """This function generates a TLSARecord object using the data passed in the parameters,
30 it then validates the record and returns the RR as a string.
32 # check if valid vars were passed
33 if hostname[-1] != '.':
36 certificate = loadCert(certificate)
38 raise Exception('Cannot load certificate from disk')
40 # Create the record without a certificate
42 record = TLSARecord(name='%s._%s.%s'%(port,protocol,hostname), usage=usage, selector=selector, mtype=mtype, cert ='')
44 record = TLSARecord(name='_%s._%s.%s'%(port,protocol,hostname), usage=usage, selector=selector, mtype=mtype, cert ='')
45 # Check if the record is valid
47 if record.selector == 0:
48 # Hash the Full certificate
49 record.cert = getHash(certificate, record.mtype)
51 # Hash only the SubjectPublicKeyInfo
52 record.cert = getHash(certificate.get_pubkey(), record.mtype)
54 record.isValid(raiseException=True)
57 return record.getRecord(draft=True)
58 return record.getRecord()
60 def getA(hostname, secure=True):
61 """Gets a list of A records for hostname, returns a list of ARecords"""
63 records = getRecords(hostname, rrtype='A', secure=secure)
64 except InsecureLookupException, e:
67 except DNSLookupError, e:
68 print 'Unable to resolve %s: %s' % (hostname, str(e))
71 for record in records:
72 ret.append(ARecord(hostname, str(IPv4Address(int(b2a_hex(record),16)))))
75 def getAAAA(hostname, secure=True):
76 """Gets a list of A records for hostname, returns a list of AAAARecords"""
78 records = getRecords(hostname, rrtype='AAAA', secure=secure)
79 except InsecureLookupException, e:
82 except DNSLookupError, e:
83 print 'Unable to resolve %s: %s' % (hostname, str(e))
86 for record in records:
87 ret.append(AAAARecord(hostname, str(IPv6Address(int(b2a_hex(record),16)))))
90 def getVerificationErrorReason(num):
91 """This function returns the name of the X509 Error based on int(num)
93 # These were taken from the M2Crypto.m2 code
95 50: "X509_V_ERR_APPLICATION_VERIFICATION",
96 22: "X509_V_ERR_CERT_CHAIN_TOO_LONG",
97 10: "X509_V_ERR_CERT_HAS_EXPIRED",
98 9: "X509_V_ERR_CERT_NOT_YET_VALID",
99 28: "X509_V_ERR_CERT_REJECTED",
100 23: "X509_V_ERR_CERT_REVOKED",
101 7: "X509_V_ERR_CERT_SIGNATURE_FAILURE",
102 27: "X509_V_ERR_CERT_UNTRUSTED",
103 12: "X509_V_ERR_CRL_HAS_EXPIRED",
104 11: "X509_V_ERR_CRL_NOT_YET_VALID",
105 8: "X509_V_ERR_CRL_SIGNATURE_FAILURE",
106 18: "X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT",
107 14: "X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD",
108 13: "X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD",
109 15: "X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD",
110 16: "X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD",
111 24: "X509_V_ERR_INVALID_CA",
112 26: "X509_V_ERR_INVALID_PURPOSE",
113 17: "X509_V_ERR_OUT_OF_MEM",
114 25: "X509_V_ERR_PATH_LENGTH_EXCEEDED",
115 19: "X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN",
116 6: "X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY",
117 4: "X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE",
118 5: "X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE",
119 3: "X509_V_ERR_UNABLE_TO_GET_CRL",
120 2: "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT",
121 20: "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY",
122 21: "X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE",
123 0: "X509_V_OK"}[int(num)]
125 def getRecords(hostname, rrtype='A', secure=True):
126 """Do a lookup of a name and a rrtype, returns a list of binary coded strings. Only queries for rr_class IN."""
128 ctx = unbound.ub_ctx()
129 ctx.add_ta_file('root.key')
130 # Use the local cache
131 if resolvconf and os.path.isfile(resolvconf):
132 ctx.resolvconf(resolvconf)
134 if type(rrtype) == str:
135 if 'RR_TYPE_' + rrtype in dir(unbound):
136 rrtype = getattr(unbound, 'RR_TYPE_' + rrtype)
138 raise Exception('Error: unknown RR TYPE: %s.' % rrtype)
139 elif type(rrtype) != int:
140 raise Exception('Error: rrtype in wrong format, neither int nor str.')
142 status, result = ctx.resolve(hostname, rrtype=rrtype)
143 if status == 0 and result.havedata:
144 if not result.secure:
146 # The data is insecure and a secure lookup was requested
147 raise InsecureLookupException('Error: query data not secure and secure data requested, unable to continue')
149 print >> sys.stderr, 'Warning: query data is not secure.'
150 # If we are here the data was either secure or insecure data is accepted
151 return result.data.raw
153 raise DNSLookupError('Unsuccesful lookup or no data returned for rrtype %s.' % rrtype)
155 def getHash(certificate, mtype):
156 """Hashes the certificate based on the mtype.
157 The certificate should be an M2Crypto.X509.X509 object (or the result of the get_pubkey() function on said object)
159 certificate = certificate.as_der()
161 return b2a_hex(certificate)
163 return sha256(certificate).hexdigest()
165 return sha512(certificate).hexdigest()
167 raise Exception('mtype should be 0,1,2')
169 def getTLSA(hostname, port=443, protocol='tcp', secure=True):
171 This function tries to do a secure lookup of the TLSA record.
172 At the moment it requests the TYPE65468 record and parses it into a 'valid' TLSA record
173 It returns a list of TLSARecord objects
175 if hostname[-1] != '.':
178 if not protocol.lower() in ['tcp', 'udp', 'sctp']:
179 raise Exception('Error: unknown protocol: %s. Should be one of tcp, udp or sctp' % protocol)
182 records = getRecords('*._%s.%s' % (protocol.lower(), hostname), rrtype=65468, secure=secure)
184 records = getRecords('_%s._%s.%s' % (port, protocol.lower(), hostname), rrtype=65468, secure=secure)
185 except InsecureLookupException, e:
188 except DNSLookupError, e:
189 print 'Unable to resolve %s: %s' % (hostname, str(e))
192 for record in records:
193 hexdata = b2a_hex(record)
195 ret.append(TLSARecord('*._%s.%s' % (protocol.lower(), hostname), int(hexdata[0:2],16), int(hexdata[2:4],16), int(hexdata[4:6],16), hexdata[6:]))
197 ret.append(TLSARecord('_%s._%s.%s' % (port, protocol.lower(), hostname), int(hexdata[0:2],16), int(hexdata[2:4],16), int(hexdata[4:6],16), hexdata[6:]))
200 def loadCert(certificate):
201 """Returns an M2Crypto.X509.X509 object"""
202 if isinstance(certificate, X509.X509):
203 # nothing to be done :-)
206 # Maybe we were passed a path
207 return X509.load_cert(certificate)
209 # Can't load the cert
210 raise Exception('Unable to load certificate %s.' % certificate)
212 def verifyCertMatch(record, cert):
214 Verify the certificate with the record.
215 record should be a TLSARecord and cert should be a M2Crypto.X509.X509
217 if not isinstance(cert, X509.X509):
219 if not isinstance(record, TLSARecord):
222 if record.selector == 1:
223 certhash = getHash(cert.get_pubkey(), record.mtype)
225 certhash = getHash(cert, record.mtype)
230 if certhash == record.cert:
236 """When instanciated, this class contains all the fields of a TLSA record.
238 def __init__(self, name, usage, selector, mtype, cert):
239 """name is the name of the RR in the format: /^(_\d{1,5}|\*)\._(tcp|udp|sctp)\.([a-z0-9]*\.){2,}$/
240 usage, selector and mtype should be an integer
241 cert should be a hexidecimal string representing the certificate to be matched field
244 self.rrtype = 65468 # TLSA provisional
245 self.rrclass = 1 # IN
246 self.name = str(name)
247 self.usage = int(usage)
248 self.selector = int(selector)
249 self.mtype = int(mtype)
250 self.cert = str(cert)
252 raise Exception('Invalid value passed, unable to create a TLSARecord')
254 def getRecord(self, draft=False):
255 """Returns the RR string of this TLSARecord, either in rfc (default) or draft format"""
257 return '%s IN TYPE65468 \# %s %s%s%s%s' % (self.name, (len(self.cert)/2)+3 , self._toHex(self.usage), self._toHex(self.selector), self._toHex(self.mtype), self.cert)
258 return '%s IN TLSA %s %s %s %s' % (self.name, self.usage, self.selector, self.mtype, self.cert)
260 def _toHex(self, val):
261 """Helper function to create hex strings from integers"""
264 def isValid(self, raiseException=False):
265 """Check whether all fields in the TLSA record are conforming to the spec and check if the port, protocol and name are good"""
268 if not 1 <= int(self.getPort()) <= 65535:
269 err.append('Port %s not within correct range (1 <= port <= 65535)' % self.getPort())
271 if self.getPort() != '*':
272 err.append('Port %s not a number' % self.getPort())
273 if not self.usage in [0,1,2]:
274 err.append('Usage: invalid (%s is not one of 0, 1 or 2)' % self.usage)
275 if not self.selector in [0,1]:
276 err.append('Selector: invalid (%s is not one of 0 or 1)' % self.selector)
277 if not self.mtype in [0,1,2]:
278 err.append('Matching Type: invalid (%s is not one of 0, 1 or 2)' % self.mtype)
279 if not self.isNameValid():
280 err.append('Name (%s) is not in the correct format: _portnumber._transportprotocol.hostname.dom.' % self.name)
281 # A certificate length of 0 is accepted
282 if self.mtype in [1,2] and len(self.cert) != 0:
283 if not len(self.cert) == {1:64,2:128}[self.mtype]:
284 err.append('Certificate for Association: invalid (Hash length does not match hash-type in Matching Type(%s))' % {1:'SHA-256',2:'SHA-512'}[self.mtype])
286 if not raiseException:
289 msg = 'The TLSA record is invalid.'
291 msg += '\n\t%s' % error
292 raise RecordValidityException(msg)
296 def isNameValid(self):
297 """Check if the name if in the correct format"""
298 if not re.match('^(_\d{1,5}|\*)\._(tcp|udp|sctp)\.([a-z0-9]*\.){2,}$', self.name):
302 def getProtocol(self):
303 """Returns the protocol based on the name"""
304 return re.split('\.', self.name)[1][1:]
307 """Returns the port based on the name"""
308 if re.split('\.', self.name)[0][0] == '*':
311 return re.split('\.', self.name)[0][1:]
314 """An object representing an A Record (IPv4 address)"""
315 def __init__(self, hostname, address):
317 self.hostname = hostname
318 self.address = address
325 IPv4Address(self.address)
331 """An object representing an AAAA Record (IPv6 address)"""
332 def __init__(self, hostname, address):
334 self.address = address
341 IPv6Address(self.address)
347 class RecordValidityException(Exception):
350 class InsecureLookupException(Exception):
353 class DNSLookupError(Exception):
356 if __name__ == '__main__':
359 parser = argparse.ArgumentParser(description='Create and verify DANE records.', epilog='This tool has a few limitations: it only IPv4 for SSL connections.')
361 subparsers = parser.add_subparsers(title='Functions', help='Available functions, see %(prog)s function -h for function-specific help')
362 parser_verify = subparsers.add_parser('verify', help='Verify a TLSA record, exit 0 when all TLSA records are matched, exit 2 when a record does not match the received certificate, exit 1 on error.', epilog='Caveat: For TLSA validation, this program chases through the certificate chain offered by the server, not it\'s local certificates.')
363 parser_verify.set_defaults(function='verify')
364 parser_create = subparsers.add_parser('create', help='Create a TLSA record')
365 parser_create.set_defaults(function='create')
367 #parser.add_argument('-4', dest='ipv4', action='store_true',help='use ipv4 networking only')
368 #parser.add_argument('-6', dest='ipv6', action='store_true',help='use ipv6 networking only')
369 parser.add_argument('--insecure', action='store_true', default=False, help='Allow use of non-dnssec secured answers')
370 parser.add_argument('--resolvconf', metavar='/PATH/TO/RESOLV.CONF', action='store', default='', help='Use a recursive resolver from resolv.conf')
371 parser.add_argument('-v', '--version', action='version', version='%(prog)s v0.1', help='show version and exit')
372 parser.add_argument('host', metavar="hostname")
374 parser_verify.add_argument('--port', '-p', action='store', default='443', help='The port, or \'*\' where running TLS is located (default: %(default)s).')
375 parser_verify.add_argument('--protocol', action='store', choices=['tcp','udp','sctp'], default='tcp', help='The protocol the TLS service is using (default: %(default)s).')
376 parser_verify.add_argument('--only-rr', '-o', action='store_true', help='Only verify that the TLSA resource record is correct (do not check certificate)')
377 parser_verify.add_argument('--ca-cert', metavar='/PATH/TO/CERTSTORE', action='store', default = '/etc/ssl/certs/', help='Path to a CA certificate or a directory containing the certificates (default: %(default)s)')
378 parser_verify.add_argument('--quiet', '-q', action='store_true', help='Only print the result of the validation')
380 parser_create.add_argument('--port', '-p', action='store', type=int, default=443, help='The port where running TLS is located (default: %(default)s).')
381 parser_create.add_argument('--protocol', action='store', choices=['tcp','udp','sctp'], default='tcp', help='The protocol the TLS service is using (default: %(default)s).')
382 parser_create.add_argument('--certificate', '-c', help='The certificate used for the host. If certificate is empty, the certificate will be downloaded from the server')
383 parser_create.add_argument('--output', '-o', action='store', default='draft', choices=['draft','rfc','both'], help='The type of output. Draft (private RRtype, 65468), RFC (TLSA) or both (default: %(default)s).')
385 # Usage of the certificate
386 parser_create.add_argument('--usage', '-u', action='store', type=int, default=1, choices=[0,1,2], help='The Usage of the Certificate for Association. \'0\' for CA, \'1\' for End Entity, \'2\' for trust-anchor (default: %(default)s).')
387 parser_create.add_argument('--selector', '-s', action='store', type=int, default=0, choices=[0,1], help='The Selector for the Certificate for Association. \'0\' for Full Certificate, \'1\' for SubjectPublicKeyInfo (default: %(default)s).')
388 parser_create.add_argument('--mtype', '-m', action='store', type=int, default=1, choices=[0,1,2], help='The Matching Type of the Certificate for Association. \'0\' for Exact match, \'1\' for SHA-256 hash, \'2\' for SHA-512 (default: %(default)s).')
390 args = parser.parse_args()
392 if args.host[-1] != '.':
397 if os.path.isfile(args.resolvconf):
398 resolvconf = args.resolvconf
400 print >> sys.stdout, '%s is not a file. Unable to use it as resolv.conf' % args.resolvconf
405 # not operations are fun!
406 secure = not args.insecure
408 if args.function == 'verify':
409 records = getTLSA(args.host, args.port, args.protocol, secure)
410 if len(records) == 0:
413 for record in records:
415 # First, check if the first three fields have correct values.
417 print 'Received the following record for name %s:' % record.name
418 print '\tUsage:\t\t\t\t%d (%s)' % (record.usage, {0:'CA Constraint', 1:'End-Entity Constraint', 2:'Trust Anchor'}[record.usage])
419 print '\tSelector:\t\t\t%d (%s)' % (record.selector, {0:'Certificate', 1:'SubjectPublicKeyInfo'}[record.selector])
420 print '\tMatching Type:\t\t\t%d (%s)' % (record.mtype, {0:'Full Certificate', 1:'SHA-256', 2:'SHA-512'}[record.mtype])
421 print '\tCertificate for Association:\t%s' % record.cert
424 record.isValid(raiseException=True)
425 except RecordValidityException, e:
426 print sys.stderr, 'Error: %s' % str(e)
430 print 'This record is valid (well-formed).'
433 # Go to the next record
436 # When we are here, The user also wants to verify the certificates with the record
437 if args.protocol != 'tcp':
438 print >> sys.stderr, 'Only SSL over TCP is supported (sorry)'
442 print 'Attempting to verify the record with the TLS service...'
443 addresses = getA(args.host, secure=secure)
444 for address in addresses:
446 print 'Got the following IP: %s' % str(address)
447 # We do the certificate handling here, as M2Crypto keeps segfaulting when we do it in a method
449 if os.path.isfile(args.ca_cert):
450 if ctx.load_verify_locations(cafile=args.ca_cert) != 1: raise Exception('No CA cert')
451 elif os.path.exists(args.ca_cert):
452 if ctx.load_verify_locations(capath=args.ca_cert) != 1: raise Exception('No CA certs')
454 print >> sys.stderr, '%s is neither a file nor a directory, unable to continue' % args.ca_cert
456 # Don't error when the verification fails in the SSL handshake
457 ctx.set_verify(SSL.verify_none, depth=9)
458 connection = SSL.Connection(ctx)
460 connection.connect((str(address), int(args.port)))
461 except SSL.Checker.WrongHost, e:
462 # The name on the remote cert doesn't match the hostname because we connect on IP, not hostname (as we want secure lookup)
464 chain = connection.get_peer_cert_chain()
465 verify_result = connection.get_verify_result()
467 # Good, now let's verify
468 if record.usage == 1: # End-host cert
470 if verifyCertMatch(record, cert):
471 if verify_result == 0: # The cert chains to a valid CA cert according to the system-certificates
472 print 'SUCCES (Usage 1): Certificate offered by the server matches the one mentioned in the TLSA record and chains to a valid CA certificate'
474 print 'FAIL (Usage 1): Certificate offered by the server matches the one mentioned in the TLSA record but the following error was raised during PKIX validation: %s' % getVerificationErrorReason(verify_result)
475 if pre_exit == 0: pre_exit = 2
476 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
478 print 'FAIL: Certificate offered by the server does not match the TLSA record'
479 if pre_exit == 0: pre_exit = 2
481 elif record.usage == 0: # CA constraint
483 # Remove the first (= End-Entity cert) from the chain
485 if verifyCertMatch(record, cert):
490 if verify_result == 0:
491 print 'SUCCES (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record and is a CA certificate'
493 print 'FAIL (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record and is a CA certificate, but the following error was raised during PKIX validation:' % getVerificationErrorReason(verify_result)
494 if pre_exit == 0: pre_exit = 2
496 print 'FAIL (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record but is not a CA certificate'
497 if pre_exit == 0: pre_exit = 2
498 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
500 print 'FAIL (Usage 0): No certificate in the certificate chain offered by the server matches the TLSA record'
501 if pre_exit == 0: pre_exit = 2
503 elif record.usage == 2: # Usage 2, ANY cert in the chain must match (aka 'pick any')
506 if verifyCertMatch(record, cert):
510 print 'SUCCES (usage 2): A certificate in the certificate chain (including the end-entity certificate) offered by the server matches the TLSA record'
511 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
513 print 'FAIL (usage 2): No certificate in the certificate chain (including the end-entity certificate) offered by the server matches the TLSA record'
514 if pre_exit == 0: pre_exit = 2
516 # Cleanup, just in case
521 # END for address in addresses
522 # END for record in records
526 else: # we want to create
528 if not args.certificate:
529 if args.protocol != 'tcp':
530 print >> sys.stderr, 'Only SSL over TCP is supported (sorry)'
533 print 'No certificate specified on the commandline, attempting to retrieve it from the server %s' % (args.host)
534 connection_port = args.port
536 sys.stdout.write('The port specified on the commandline is *, please specify the port of the TLS service on %s (443): ' % args.host)
539 user_input = raw_input()
541 connection_port = 443
544 if 1 <= int(user_input) <= 65535:
545 connection_port = user_input
548 sys.stdout.write('Port %s not numerical or within correct range (1 <= port <= 65535), try again (hit enter for default 443): ' % user_input)
549 # Get the A records for the host
551 addresses = getA(args.host, secure=secure)
552 except InsecureLookupException, e:
553 print >> sys.stderr, str(e)
556 for address in addresses:
557 print 'Attempting to get certificate from %s' % str(address)
558 # We do the certificate handling here, as M2Crypto keeps segfaulting when try to do stuff with the cert if we don't
560 ctx.set_verify(SSL.verify_none, depth=9)
561 connection = SSL.Connection(ctx)
563 connection.connect((str(address), int(connection_port)))
564 except SSL.Checker.WrongHost:
567 chain = connection.get_peer_cert_chain()
568 for chaincert in chain:
569 if int(args.usage) == 1:
570 # The first cert is the end-entity cert
571 print 'Got a certificate with Subject: %s' % chaincert.get_subject()
575 if (int(args.usage) == 0 and chaincert.check_ca()) or int(args.usage) == 2:
576 sys.stdout.write('Got a certificate with the following Subject:\n\t%s.\nUse this as certificate to match? [y/N] ' % chaincert.get_subject())
579 user_input = raw_input()
580 if user_input in ['','n','N']:
582 elif user_input in ['y', 'Y']:
586 sys.stdout.write('Please answer Y or N')
590 if cert: # Print the requested records based on the retrieved certificates
591 if args.output == 'b':
592 print genTLSA(args.host, args.protocol, args.port, cert, 'draft', args.usage, args.selector, args.mtype)
593 print genTLSA(args.host, args.protocol, args.port, cert, 'rfc', args.usage, args.selector, args.mtype)
595 print genTLSA(args.host, args.protocol, args.port, cert, args.output, args.usage, args.selector, args.mtype)
597 else: # Pass the path to the certificate to the genTLSA function
598 if args.output == 'b':
599 print genTLSA(args.host, args.protocol, args.port, args.certificate, 'draft', args.usage, args.selector, args.mtype)
600 print genTLSA(args.host, args.protocol, args.port, args.certificate, 'rfc', args.usage, args.selector, args.mtype)
602 print genTLSA(args.host, args.protocol, args.port, args.certificate, args.output, args.usage, args.selector, args.mtype)