3 # swede - A tool to create DANE/TLSA (draft 14) records.
4 # This tool is really simple and not foolproof, it doesn't check the CN in the
5 # Subject field of the certificate. It also doesn't check if the supplied
6 # certificate is a CA certificate if usage 1 is specified (or any other
7 # checking for that matter).
9 # Usage is explained when running this program with --help
11 # This tool is loosly based on the dane tool in the sshfp package by Paul
12 # Wouters and Christopher Olah from xelerance.com.
14 # Copyright Pieter Lexis (pieter.lexis@os3.nl)
16 # License: GNU GENERAL PUBLIC LICENSE Version 2 or later
22 from M2Crypto import X509, SSL
23 from binascii import a2b_hex, b2a_hex
24 from hashlib import sha256, sha512
25 from ipaddr import IPv4Address, IPv6Address
27 def genTLSA(hostname, protocol, port, certificate, output='draft', usage=1, selector=0, mtype=1):
28 """This function generates a TLSARecord object using the data passed in the parameters,
29 it then validates the record and returns the RR as a string.
31 # check if valid vars were passed
32 if hostname[-1] != '.':
35 certificate = loadCert(certificate)
37 raise Exception('Cannot load certificate from disk')
39 # Create the record without a certificate
41 record = TLSARecord(name='%s._%s.%s'%(port,protocol,hostname), usage=usage, selector=selector, mtype=mtype, cert ='')
43 record = TLSARecord(name='_%s._%s.%s'%(port,protocol,hostname), usage=usage, selector=selector, mtype=mtype, cert ='')
44 # Check if the record is valid
46 if record.selector == 0:
47 # Hash the Full certificate
48 record.cert = getHash(certificate, record.mtype)
50 # Hash only the SubjectPublicKeyInfo
51 record.cert = getHash(certificate.get_pubkey(), record.mtype)
53 record.isValid(raiseException=True)
56 return record.getRecord(draft=True)
57 return record.getRecord()
59 def getA(hostname, secure=True):
60 """Gets a list of A records for hostname, returns a list of ARecords"""
61 records = getRecords(hostname, rrtype='A', secure=secure)
63 for record in records:
64 ret.append(ARecord(hostname, str(IPv4Address(int(b2a_hex(record),16)))))
67 def getAAAA(hostname, secure=True):
68 """Gets a list of A records for hostname, returns a list of AAAARecords"""
69 records = getRecords(hostname, rrtype='AAAA', secure=secure)
71 for record in records:
72 ret.append(AAAARecord(hostname, str(IPv6Address(int(b2a_hex(record),16)))))
75 def getVerificationErrorReason(num):
76 """This function returns the name of the X509 Error based on int(num)
78 # These were taken from the M2Crypto.m2 code
80 50: "X509_V_ERR_APPLICATION_VERIFICATION",
81 22: "X509_V_ERR_CERT_CHAIN_TOO_LONG",
82 10: "X509_V_ERR_CERT_HAS_EXPIRED",
83 9: "X509_V_ERR_CERT_NOT_YET_VALID",
84 28: "X509_V_ERR_CERT_REJECTED",
85 23: "X509_V_ERR_CERT_REVOKED",
86 7: "X509_V_ERR_CERT_SIGNATURE_FAILURE",
87 27: "X509_V_ERR_CERT_UNTRUSTED",
88 12: "X509_V_ERR_CRL_HAS_EXPIRED",
89 11: "X509_V_ERR_CRL_NOT_YET_VALID",
90 8: "X509_V_ERR_CRL_SIGNATURE_FAILURE",
91 18: "X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT",
92 14: "X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD",
93 13: "X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD",
94 15: "X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD",
95 16: "X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD",
96 24: "X509_V_ERR_INVALID_CA",
97 26: "X509_V_ERR_INVALID_PURPOSE",
98 17: "X509_V_ERR_OUT_OF_MEM",
99 25: "X509_V_ERR_PATH_LENGTH_EXCEEDED",
100 19: "X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN",
101 6: "X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY",
102 4: "X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE",
103 5: "X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE",
104 3: "X509_V_ERR_UNABLE_TO_GET_CRL",
105 2: "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT",
106 20: "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY",
107 21: "X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE",
108 0: "X509_V_OK"}[int(num)]
110 def getRecords(hostname, rrtype='A', secure=True):
111 """Do a lookup of a name and a rrtype, returns a list of binary coded strings. Only queries for rr_class IN."""
113 ctx = unbound.ub_ctx()
114 ctx.add_ta_file('root.key')
115 # Use the local cache
116 if resolvconf and os.path.isfile(resolvconf):
117 ctx.resolvconf(resolvconf)
119 if type(rrtype) == str:
120 if 'RR_TYPE_' + rrtype in dir(unbound):
121 rrtype = getattr(unbound, 'RR_TYPE_' + rrtype)
123 raise Exception('Error: unknown RR TYPE: %s.' % rrtype)
124 elif type(rrtype) != int:
125 raise Exception('Error: rrtype in wrong format, neither int nor str.')
127 status, result = ctx.resolve(hostname, rrtype=rrtype)
128 if status == 0 and result.havedata:
129 if not result.secure:
131 # The data is insecure and a secure lookup was requested
132 raise InsecureLookupException('Error: query data not secure and secure data requested, unable to continue')
134 print >> sys.stderr, 'Warning: query data is not secure.'
135 # If we are here the data was either secure or insecure data is accepted
136 return result.data.raw
138 raise Exception('Error: Unsuccesful lookup or no data returned.')
140 def getHash(certificate, mtype):
141 """Hashes the certificate based on the mtype.
142 The certificate should be an M2Crypto.X509.X509 object (or the result of the get_pubkey() function on said object)
144 certificate = certificate.as_der()
146 return b2a_hex(certificate)
148 return sha256(certificate).hexdigest()
150 return sha512(certificate).hexdigest()
152 raise Exception('mtype should be 0,1,2')
154 def getTLSA(hostname, port=443, protocol='tcp', secure=True):
156 This function tries to do a secure lookup of the TLSA record.
157 At the moment it requests the TYPE65468 record and parses it into a 'valid' TLSA record
158 It returns a list of TLSARecord objects
160 if hostname[-1] != '.':
163 if not protocol.lower() in ['tcp', 'udp', 'sctp']:
164 raise Exception('Error: unknown protocol: %s. Should be one of tcp, udp or sctp' % protocol)
167 records = getRecords('*._%s.%s' % (protocol.lower(), hostname), rrtype=65468, secure=secure)
169 records = getRecords('_%s._%s.%s' % (port, protocol.lower(), hostname), rrtype=65468, secure=secure)
170 except InsecureLookupException, e:
174 for record in records:
175 hexdata = b2a_hex(record)
177 ret.append(TLSARecord('*._%s.%s' % (protocol.lower(), hostname), int(hexdata[0:2],16), int(hexdata[2:4],16), int(hexdata[4:6],16), hexdata[6:]))
179 ret.append(TLSARecord('_%s._%s.%s' % (port, protocol.lower(), hostname), int(hexdata[0:2],16), int(hexdata[2:4],16), int(hexdata[4:6],16), hexdata[6:]))
182 def loadCert(certificate):
183 """Returns an M2Crypto.X509.X509 object"""
184 if isinstance(certificate, X509.X509):
185 # nothing to be done :-)
188 # Maybe we were passed a path
189 return X509.load_cert(certificate)
191 # Can't load the cert
192 raise Exception('Unable to load certificate %s.' % certificate)
194 def verifyCertMatch(record, cert):
196 Verify the certificate with the record.
197 record should be a TLSARecord and cert should be a M2Crypto.X509.X509
199 if not isinstance(cert, X509.X509):
201 if not isinstance(record, TLSARecord):
204 if record.selector == 1:
205 certhash = getHash(cert.get_pubkey(), record.mtype)
207 certhash = getHash(cert, record.mtype)
212 if certhash == record.cert:
218 """When instanciated, this class contains all the fields of a TLSA record.
220 def __init__(self, name, usage, selector, mtype, cert):
221 """name is the name of the RR in the format: /^(_\d{1,5}|\*)\._(tcp|udp|sctp)\.([a-z0-9]*\.){2,}$/
222 usage, selector and mtype should be an integer
223 cert should be a hexidecimal string representing the certificate to be matched field
226 self.rrtype = 65468 # TLSA provisional
227 self.rrclass = 1 # IN
228 self.name = str(name)
229 self.usage = int(usage)
230 self.selector = int(selector)
231 self.mtype = int(mtype)
232 self.cert = str(cert)
234 raise Exception('Invalid value passed, unable to create a TLSARecord')
236 def getRecord(self, draft=False):
237 """Returns the RR string of this TLSARecord, either in rfc (default) or draft format"""
239 return '%s IN TYPE65468 \# %s %s%s%s%s' % (self.name, (len(self.cert)/2)+3 , self._toHex(self.usage), self._toHex(self.selector), self._toHex(self.mtype), self.cert)
240 return '%s IN TLSA %s %s %s %s' % (self.name, self.usage, self.selector, self.mtype, self.cert)
242 def _toHex(self, val):
243 """Helper function to create hex strings from integers"""
246 def isValid(self, raiseException=False):
247 """Check whether all fields in the TLSA record are conforming to the spec and check if the port, protocol and name are good"""
250 if not 1 <= int(self.getPort()) <= 65535:
251 err.append('Port %s not within correct range (1 <= port <= 65535)' % self.getPort())
253 if self.getPort() != '*':
254 err.append('Port %s not a number' % self.getPort())
255 if not self.usage in [0,1,2]:
256 err.append('Usage: invalid (%s is not one of 0, 1 or 2)' % self.usage)
257 if not self.selector in [0,1]:
258 err.append('Selector: invalid (%s is not one of 0 or 1)' % self.selector)
259 if not self.mtype in [0,1,2]:
260 err.append('Matching Type: invalid (%s is not one of 0, 1 or 2)' % self.mtype)
261 if not self.isNameValid():
262 err.append('Name (%s) is not in the correct format: _portnumber._transportprotocol.hostname.dom.' % self.name)
263 # A certificate length of 0 is accepted
264 if self.mtype in [1,2] and len(self.cert) != 0:
265 if not len(self.cert) == {1:64,2:128}[self.mtype]:
266 err.append('Certificate for Association: invalid (Hash length does not match hash-type in Matching Type(%s))' % {1:'SHA-256',2:'SHA-512'}[self.mtype])
268 if not raiseException:
271 msg = 'The TLSA record is invalid.'
273 msg += '\n\t%s' % error
274 raise RecordValidityException(msg)
278 def isNameValid(self):
279 """Check if the name if in the correct format"""
280 if not re.match('^(_\d{1,5}|\*)\._(tcp|udp|sctp)\.([a-z0-9]*\.){2,}$', self.name):
284 def getProtocol(self):
285 """Returns the protocol based on the name"""
286 return re.split('\.', self.name)[1][1:]
289 """Returns the port based on the name"""
290 if re.split('\.', self.name)[0][0] == '*':
293 return re.split('\.', self.name)[0][1:]
296 """An object representing an A Record (IPv4 address)"""
297 def __init__(self, hostname, address):
299 self.hostname = hostname
300 self.address = address
307 IPv4Address(self.address)
313 """An object representing an AAAA Record (IPv6 address)"""
314 def __init__(self, hostname, address):
316 self.address = address
323 IPv6Address(self.address)
329 class RecordValidityException(Exception):
330 def __init__(self, value):
335 class InsecureLookupException(Exception):
336 def __init__(self, value):
341 if __name__ == '__main__':
344 parser = argparse.ArgumentParser(description='Create and verify DANE records.', epilog='This tool has a few limitations: it only IPv4 for SSL connections.')
346 subparsers = parser.add_subparsers(title='Functions', help='Available functions, see %(prog)s function -h for function-specific help')
347 parser_verify = subparsers.add_parser('verify', help='Verify a TLSA record, exit 0 when all TLSA records are matched, exit 2 when a record does not match the received certificate, exit 1 on error.', epilog='Caveat: For TLSA validation, this program chases through the certificate chain offered by the server, not it\'s local certificates.')
348 parser_verify.set_defaults(function='verify')
349 parser_create = subparsers.add_parser('create', help='Create a TLSA record')
350 parser_create.set_defaults(function='create')
352 #parser.add_argument('-4', dest='ipv4', action='store_true',help='use ipv4 networking only')
353 #parser.add_argument('-6', dest='ipv6', action='store_true',help='use ipv6 networking only')
354 parser.add_argument('--insecure', action='store_true', default=False, help='Allow use of non-dnssec secured answers')
355 parser.add_argument('--resolvconf', metavar='/PATH/TO/RESOLV.CONF', action='store', default='', help='Use a recursive resolver from resolv.conf')
356 parser.add_argument('-v', '--version', action='version', version='%(prog)s v0.1', help='show version and exit')
357 parser.add_argument('host', metavar="hostname")
359 parser_verify.add_argument('--port', '-p', action='store', default='443', help='The port, or \'*\' where running TLS is located (default: %(default)s).')
360 parser_verify.add_argument('--protocol', action='store', choices=['tcp','udp','sctp'], default='tcp', help='The protocol the TLS service is using (default: %(default)s).')
361 parser_verify.add_argument('--only-rr', '-o', action='store_true', help='Only verify that the TLSA resource record is correct (do not check certificate)')
362 parser_verify.add_argument('--ca-cert', metavar='/PATH/TO/CERTSTORE', action='store', default = '/etc/ssl/certs/', help='Path to a CA certificate or a directory containing the certificates (default: %(default)s)')
363 parser_verify.add_argument('--quiet', '-q', action='store_true', help='Only print the result of the validation')
365 parser_create.add_argument('--port', '-p', action='store', type=int, default=443, help='The port where running TLS is located (default: %(default)s).')
366 parser_create.add_argument('--protocol', action='store', choices=['tcp','udp','sctp'], default='tcp', help='The protocol the TLS service is using (default: %(default)s).')
367 parser_create.add_argument('--certificate', '-c', help='The certificate used for the host. If certificate is empty, the certificate will be downloaded from the server')
368 parser_create.add_argument('--output', '-o', action='store', default='draft', choices=['draft','rfc','both'], help='The type of output. Draft (private RRtype, 65468), RFC (TLSA) or both (default: %(default)s).')
370 # Usage of the certificate
371 parser_create.add_argument('--usage', '-u', action='store', type=int, default=1, choices=[0,1,2], help='The Usage of the Certificate for Association. \'0\' for CA, \'1\' for End Entity, \'2\' for trust-anchor (default: %(default)s).')
372 parser_create.add_argument('--selector', '-s', action='store', type=int, default=0, choices=[0,1], help='The Selector for the Certificate for Association. \'0\' for Full Certificate, \'1\' for SubjectPublicKeyInfo (default: %(default)s).')
373 parser_create.add_argument('--mtype', '-m', action='store', type=int, default=1, choices=[0,1,2], help='The Matching Type of the Certificate for Association. \'0\' for Exact match, \'1\' for SHA-256 hash, \'2\' for SHA-512 (default: %(default)s).')
375 args = parser.parse_args()
377 if args.host[-1] != '.':
382 if os.path.isfile(args.resolvconf):
383 resolvconf = args.resolvconf
385 print >> sys.stdout, '%s is not a file. Unable to use it as resolv.conf' % args.resolvconf
390 # not operations are fun!
391 secure = not args.insecure
393 if args.function == 'verify':
394 records = getTLSA(args.host, args.port, args.protocol, secure)
395 if len(records) == 0:
398 for record in records:
400 # First, check if the first three fields have correct values.
402 print 'Received the following record for name %s:' % record.name
403 print '\tUsage:\t\t\t\t%d (%s)' % (record.usage, {0:'CA Constraint', 1:'End-Entity Constraint', 2:'Trust Anchor'}[record.usage])
404 print '\tSelector:\t\t\t%d (%s)' % (record.selector, {0:'Certificate', 1:'SubjectPublicKeyInfo'}[record.selector])
405 print '\tMatching Type:\t\t\t%d (%s)' % (record.mtype, {0:'Full Certificate', 1:'SHA-256', 2:'SHA-512'}[record.mtype])
406 print '\tCertificate for Association:\t%s' % record.cert
409 record.isValid(raiseException=True)
410 except RecordValidityException, e:
411 print sys.stderr, 'Error: %s' % str(e)
415 print 'This record is valid (well-formed).'
418 # Go to the next record
421 # When we are here, The user also wants to verify the certificates with the record
422 if args.protocol != 'tcp':
423 print >> sys.stderr, 'Only SSL over TCP is supported (sorry)'
427 print 'Attempting to verify the record with the TLS service...'
428 addresses = getA(args.host, secure=secure)
429 for address in addresses:
431 print 'Got the following IP: %s' % str(address)
432 # We do the certificate handling here, as M2Crypto keeps segfaulting when we do it in a method
434 if os.path.isfile(args.ca_cert):
435 if ctx.load_verify_locations(cafile=args.ca_cert) != 1: raise Exception('No CA cert')
436 elif os.path.exists(args.ca_cert):
437 if ctx.load_verify_locations(capath=args.ca_cert) != 1: raise Exception('No CA certs')
439 print >> sys.stderr, '%s is neither a file nor a directory, unable to continue' % args.ca_cert
441 # Don't error when the verification fails in the SSL handshake
442 ctx.set_verify(SSL.verify_none, depth=9)
443 connection = SSL.Connection(ctx)
445 connection.connect((str(address), int(args.port)))
446 except SSL.Checker.WrongHost, e:
447 # The name on the remote cert doesn't match the hostname because we connect on IP, not hostname (as we want secure lookup)
449 chain = connection.get_peer_cert_chain()
450 verify_result = connection.get_verify_result()
452 # Good, now let's verify
453 if record.usage == 1: # End-host cert
455 if verifyCertMatch(record, cert):
456 if verify_result == 0: # The cert chains to a valid CA cert according to the system-certificates
457 print 'SUCCES (Usage 1): Certificate offered by the server matches the one mentioned in the TLSA record and chains to a valid CA certificate'
459 print 'FAIL (Usage 1): Certificate offered by the server matches the one mentioned in the TLSA record but the following error was raised during PKIX validation: %s' % getVerificationErrorReason(verify_result)
460 if pre_exit == 0: pre_exit = 2
461 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
463 print 'FAIL: Certificate offered by the server does not match the TLSA record'
464 if pre_exit == 0: pre_exit = 2
466 elif record.usage == 0: # CA constraint
468 # Remove the first (= End-Entity cert) from the chain
470 if verifyCertMatch(record, cert):
475 if verify_result == 0:
476 print 'SUCCES (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record and is a CA certificate'
478 print 'FAIL (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record and is a CA certificate, but the following error was raised during PKIX validation:' % getVerificationErrorReason(verify_result)
479 if pre_exit == 0: pre_exit = 2
481 print 'FAIL (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record but is not a CA certificate'
482 if pre_exit == 0: pre_exit = 2
483 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
485 print 'FAIL (Usage 0): No certificate in the certificate chain offered by the server matches the TLSA record'
486 if pre_exit == 0: pre_exit = 2
488 elif record.usage == 2: # Usage 2, ANY cert in the chain must match (aka 'pick any')
491 if verifyCertMatch(record, cert):
495 print 'SUCCES (usage 2): A certificate in the certificate chain (including the end-entity certificate) offered by the server matches the TLSA record'
496 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
498 print 'FAIL (usage 2): No certificate in the certificate chain (including the end-entity certificate) offered by the server matches the TLSA record'
499 if pre_exit == 0: pre_exit = 2
501 # Cleanup, just in case
506 # END for address in addresses
507 # END for record in records
511 else: # we want to create
513 if not args.certificate:
514 if args.protocol != 'tcp':
515 print >> sys.stderr, 'Only SSL over TCP is supported (sorry)'
518 print 'No certificate specified on the commandline, attempting to retrieve it from the server %s' % (args.host)
519 connection_port = args.port
521 sys.stdout.write('The port specified on the commandline is *, please specify the port of the TLS service on %s (443): ' % args.host)
524 user_input = raw_input()
526 connection_port = 443
529 if 1 <= int(user_input) <= 65535:
530 connection_port = user_input
533 sys.stdout.write('Port %s not numerical or within correct range (1 <= port <= 65535), try again (hit enter for default 443): ' % user_input)
534 # Get the A records for the host
536 addresses = getA(args.host, secure=secure)
537 except InsecureLookupException, e:
538 print >> sys.stderr, str(e)
541 for address in addresses:
542 print 'Attempting to get certificate from %s' % str(address)
543 # We do the certificate handling here, as M2Crypto keeps segfaulting when try to do stuff with the cert if we don't
545 ctx.set_verify(SSL.verify_none, depth=9)
546 connection = SSL.Connection(ctx)
548 connection.connect((str(address), int(connection_port)))
549 except SSL.Checker.WrongHost:
552 chain = connection.get_peer_cert_chain()
553 for chaincert in chain:
554 if int(args.usage) == 1:
555 # The first cert is the end-entity cert
556 print 'Got a certificate with Subject: %s' % chaincert.get_subject()
560 if (int(args.usage) == 0 and chaincert.check_ca()) or int(args.usage) == 2:
561 sys.stdout.write('Got a certificate with the following Subject:\n\t%s.\nUse this as certificate to match? [y/N] ' % chaincert.get_subject())
564 user_input = raw_input()
565 if user_input in ['','n','N']:
567 elif user_input in ['y', 'Y']:
571 sys.stdout.write('Please answer Y or N')
575 if cert: # Print the requested records based on the retrieved certificates
576 if args.output == 'b':
577 print genTLSA(args.host, args.protocol, args.port, cert, 'draft', args.usage, args.selector, args.mtype)
578 print genTLSA(args.host, args.protocol, args.port, cert, 'rfc', args.usage, args.selector, args.mtype)
580 print genTLSA(args.host, args.protocol, args.port, cert, args.output, args.usage, args.selector, args.mtype)
582 else: # Pass the path to the certificate to the genTLSA function
583 if args.output == 'b':
584 print genTLSA(args.host, args.protocol, args.port, args.certificate, 'draft', args.usage, args.selector, args.mtype)
585 print genTLSA(args.host, args.protocol, args.port, args.certificate, 'rfc', args.usage, args.selector, args.mtype)
587 print genTLSA(args.host, args.protocol, args.port, args.certificate, args.output, args.usage, args.selector, args.mtype)