3 # swede - A tool to create DANE/TLSA (draft 14) records.
4 # This tool is really simple and not foolproof, it doesn't check the CN in the
5 # Subject field of the certificate. It also doesn't check if the supplied
6 # certificate is a CA certificate if usage 1 is specified (or any other
7 # checking for that matter).
9 # Usage is explained when running this program with --help
# This tool is loosely based on the dane tool in the sshfp package by Paul
12 # Wouters and Christopher Olah from xelerance.com.
14 # Copyright Pieter Lexis (pieter.lexis@os3.nl)
16 # License: GNU GENERAL PUBLIC LICENSE Version 2 or later
import os
import re
import sys
from binascii import a2b_hex, b2a_hex
from hashlib import sha256, sha512

import unbound
from ipaddr import IPv4Address, IPv6Address
from M2Crypto import X509, SSL
def genTLSA(hostname, protocol, port, certificate, output='draft', usage=1, selector=0, mtype=1):
    """Build a TLSA record for a service and return it as an RR string.

    hostname    -- host the record is for; a trailing dot is appended when missing
    protocol    -- transport protocol label used in the record name
    port        -- port number, or '*' for a wildcard port label
    certificate -- an M2Crypto X509 object, or anything loadCert() can load
    output      -- 'draft' selects the TYPE65468 notation, any other value
                   selects the TLSA notation
    usage, selector, mtype -- the three TLSA parameter fields

    Raises RecordValidityException (through TLSARecord.isValid) when the
    record is malformed and Exception when the certificate cannot be loaded.
    """
    # check if valid vars were passed
    if not hostname.endswith('.'):
        hostname += '.'

    certificate = loadCert(certificate)
    if not certificate:
        raise Exception('Cannot load certificate from disk')

    # A wildcard port is written as a bare '*', a numeric port as '_<port>'
    port_label = '*' if port == '*' else '_%s' % port
    # Create the record without a certificate
    record = TLSARecord(name='%s._%s.%s' % (port_label, protocol, hostname),
                        usage=usage, selector=selector, mtype=mtype, cert='')

    # Check if the record is valid before hashing, so that a bad mtype or
    # selector is reported as a validation error rather than a hashing error
    record.isValid(raiseException=True)

    if record.selector == 0:
        # Hash the Full certificate
        record.cert = getHash(certificate, record.mtype)
    else:
        # Hash only the SubjectPublicKeyInfo
        record.cert = getHash(certificate.get_pubkey(), record.mtype)

    # Validate again now that the certificate association data is filled in
    record.isValid(raiseException=True)

    if output == 'draft':
        return record.getRecord(draft=True)
    return record.getRecord()
def getA(hostname, secure=True):
    """Gets a list of A records for hostname, returns a list of ARecords.

    secure is handed to getRecords() unchanged; any exception raised by
    getRecords() propagates to the caller.
    """
    records = getRecords(hostname, rrtype='A', secure=secure)
    # Each answer is raw address data: hex-encode it, read it as one integer
    # and let IPv4Address render the textual form
    return [ARecord(hostname, str(IPv4Address(int(b2a_hex(record), 16))))
            for record in records]
def getAAAA(hostname, secure=True):
    """Gets a list of AAAA records for hostname, returns a list of AAAARecords.

    secure is handed to getRecords() unchanged; any exception raised by
    getRecords() propagates to the caller.
    """
    records = getRecords(hostname, rrtype='AAAA', secure=secure)
    # Each answer is raw address data: hex-encode it, read it as one integer
    # and let IPv6Address render the textual form
    return [AAAARecord(hostname, str(IPv6Address(int(b2a_hex(record), 16))))
            for record in records]
# Verification result codes and their symbolic names.
# These were taken from the M2Crypto.m2 code
_X509_VERIFICATION_ERRORS = {
    50: "X509_V_ERR_APPLICATION_VERIFICATION",
    22: "X509_V_ERR_CERT_CHAIN_TOO_LONG",
    10: "X509_V_ERR_CERT_HAS_EXPIRED",
    9: "X509_V_ERR_CERT_NOT_YET_VALID",
    28: "X509_V_ERR_CERT_REJECTED",
    23: "X509_V_ERR_CERT_REVOKED",
    7: "X509_V_ERR_CERT_SIGNATURE_FAILURE",
    27: "X509_V_ERR_CERT_UNTRUSTED",
    12: "X509_V_ERR_CRL_HAS_EXPIRED",
    11: "X509_V_ERR_CRL_NOT_YET_VALID",
    8: "X509_V_ERR_CRL_SIGNATURE_FAILURE",
    18: "X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT",
    14: "X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD",
    13: "X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD",
    15: "X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD",
    16: "X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD",
    24: "X509_V_ERR_INVALID_CA",
    26: "X509_V_ERR_INVALID_PURPOSE",
    17: "X509_V_ERR_OUT_OF_MEM",
    25: "X509_V_ERR_PATH_LENGTH_EXCEEDED",
    19: "X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN",
    6: "X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY",
    4: "X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE",
    5: "X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE",
    3: "X509_V_ERR_UNABLE_TO_GET_CRL",
    2: "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT",
    20: "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY",
    21: "X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE",
    0: "X509_V_OK",
}


def getVerificationErrorReason(num):
    """This function returns the name of the X509 Error based on int(num).

    num may be anything int() accepts. A code that is not in the table
    yields a descriptive placeholder string instead of raising KeyError,
    because the result is only used to build a message for the user.
    """
    code = int(num)
    return _X509_VERIFICATION_ERRORS.get(
        code, 'unknown X509 verification error %d' % code)
def getRecords(hostname, rrtype='A', secure=True):
    """Do a lookup of a name and a rrtype, returns a list of binary coded strings. Only queries for rr_class IN.

    hostname -- the name to look up
    rrtype   -- either a type name for which the unbound module has an
                RR_TYPE_<name> attribute, or the numeric type
    secure   -- when true, an answer that is not marked secure is refused

    Raises InsecureLookupException when secure data was requested but the
    answer is not secure, DNSLookupError when the lookup fails or returns
    no data, and Exception when rrtype cannot be interpreted.
    """
    ctx = unbound.ub_ctx()
    ctx.add_ta_file('root.key')
    # Use the local cache
    # The resolv.conf path lives in a module-level name that is only assigned
    # when the file runs as a script; look it up defensively so the function
    # also works when that name was never set.
    resolver_conf = globals().get('resolvconf')
    if resolver_conf and os.path.isfile(resolver_conf):
        ctx.resolvconf(resolver_conf)

    if isinstance(rrtype, str):
        type_attr = 'RR_TYPE_' + rrtype
        if not hasattr(unbound, type_attr):
            raise Exception('Error: unknown RR TYPE: %s.' % rrtype)
        rrtype = getattr(unbound, type_attr)
    elif not isinstance(rrtype, int):
        raise Exception('Error: rrtype in wrong format, neither int nor str.')

    status, result = ctx.resolve(hostname, rrtype=rrtype)
    if status != 0 or not result.havedata:
        raise DNSLookupError('Unsuccesful lookup or no data returned for rrtype %s.' % rrtype)

    if not result.secure:
        if secure:
            # The data is insecure and a secure lookup was requested
            raise InsecureLookupException('Error: query data not secure and secure data requested, unable to continue')
        sys.stderr.write('Warning: query data is not secure.\n')
    # If we are here the data was either secure or insecure data is accepted
    return result.data.raw
def getHash(certificate, mtype):
    """Hashes the certificate based on the mtype.

    The certificate should be an M2Crypto.X509.X509 object (or the result of
    the get_pubkey() function on said object); only its as_der() method is
    used here.

    mtype 0 returns the hex encoding of the DER data itself, 1 its SHA-256
    hex digest and 2 its SHA-512 hex digest. The result is always a native
    str so it can be compared with the other hex strings in this module.

    Raises ValueError (an Exception subclass) for any other mtype.
    """
    der = certificate.as_der()
    if mtype == 0:
        hexed = b2a_hex(der)
        # b2a_hex returns bytes on Python 3; the digests below are str
        if not isinstance(hexed, str):
            hexed = hexed.decode('ascii')
        return hexed
    if mtype == 1:
        return sha256(der).hexdigest()
    if mtype == 2:
        return sha512(der).hexdigest()
    raise ValueError('mtype should be 0,1,2')
def getTLSA(hostname, port=443, protocol='tcp', secure=True):
    """
    This function tries to do a secure lookup of the TLSA record.
    At the moment it requests the TYPE65468 record and parses it into a 'valid' TLSA record
    It returns a list of TLSARecord objects

    port may be '*' to query the wildcard-port name. When the lookup fails
    the error is printed and the process exits with status 1.
    """
    if not hostname.endswith('.'):
        hostname += '.'

    transport = protocol.lower()
    if transport not in ('tcp', 'udp', 'sctp'):
        raise Exception('Error: unknown protocol: %s. Should be one of tcp, udp or sctp' % protocol)

    # The same owner name is used for the query and for every parsed record
    if port == '*':
        name = '*._%s.%s' % (transport, hostname)
    else:
        name = '_%s._%s.%s' % (port, transport, hostname)

    try:
        records = getRecords(name, rrtype=65468, secure=secure)
    except InsecureLookupException as e:
        print(str(e))
        sys.exit(1)
    except DNSLookupError as e:
        print('Unable to resolve %s: %s' % (hostname, str(e)))
        sys.exit(1)

    ret = []
    for record in records:
        hexdata = b2a_hex(record)
        # b2a_hex returns bytes on Python 3; TLSARecord stores str(cert)
        if not isinstance(hexdata, str):
            hexdata = hexdata.decode('ascii')
        # First three bytes are usage, selector and matching type; the
        # remainder is the certificate association data
        ret.append(TLSARecord(name,
                              int(hexdata[0:2], 16),
                              int(hexdata[2:4], 16),
                              int(hexdata[4:6], 16),
                              hexdata[6:]))
    return ret
def loadCert(certificate):
    """Returns an M2Crypto.X509.X509 object.

    certificate is either already an X509.X509 object, which is returned
    unchanged, or a value that X509.load_cert() accepts.

    Raises Exception when the certificate cannot be loaded.
    """
    if isinstance(certificate, X509.X509):
        # nothing to be done :-)
        return certificate
    try:
        # Maybe we were passed a path
        return X509.load_cert(certificate)
    except Exception:
        # Can't load the cert; Exception (rather than a bare except) keeps
        # KeyboardInterrupt and SystemExit from being swallowed
        raise Exception('Unable to load certificate %s.' % certificate)
def verifyCertMatch(record, cert):
    """
    Verify the certificate with the record.
    record should be a TLSARecord and cert should be a M2Crypto.X509.X509

    Returns True when the data selected by record.selector, hashed according
    to record.mtype, equals record.cert, and False when it does not. Returns
    None when either argument has the wrong type or no hash could be made.
    """
    if not isinstance(cert, X509.X509):
        return None
    if not isinstance(record, TLSARecord):
        return None

    # Selector 1 matches on the public key, anything else on the certificate
    if record.selector == 1:
        certhash = getHash(cert.get_pubkey(), record.mtype)
    else:
        certhash = getHash(cert, record.mtype)

    if not certhash:
        return None

    return certhash == record.cert
class TLSARecord(object):
    """When instantiated, this class contains all the fields of a TLSA record.
    """

    def __init__(self, name, usage, selector, mtype, cert):
        r"""name is the name of the RR in the format: /^(_\d{1,5}|\*)\._(tcp|udp|sctp)\.([a-z0-9-]*\.){2,}$/
        usage, selector and mtype should be an integer
        cert should be a hexadecimal string representing the certificate to be matched field

        Raises Exception when one of the values cannot be converted.
        """
        try:
            self.rrtype = 65468  # TLSA provisional
            self.rrclass = 1  # IN
            self.name = str(name)
            self.usage = int(usage)
            self.selector = int(selector)
            self.mtype = int(mtype)
            self.cert = str(cert)
        except Exception:
            raise Exception('Invalid value passed, unable to create a TLSARecord')

    def getRecord(self, draft=False):
        """Returns the RR string of this TLSARecord, either in rfc (default) or draft format"""
        if draft:
            # The length is counted in bytes: three one-byte fields plus the
            # certificate data, which is two hex characters per byte.
            # Integer division keeps the length an int on Python 3 as well.
            rdata_length = len(self.cert) // 2 + 3
            return '%s IN TYPE65468 \\# %s %s%s%s%s' % (
                self.name, rdata_length, self._toHex(self.usage),
                self._toHex(self.selector), self._toHex(self.mtype), self.cert)
        return '%s IN TLSA %s %s %s %s' % (self.name, self.usage, self.selector, self.mtype, self.cert)

    def _toHex(self, val):
        """Helper function to create hex strings from integers"""
        return '%02x' % val

    def isValid(self, raiseException=False):
        """Check whether all fields in the TLSA record are conforming to the spec and check if the port, protocol and name are good

        Returns True for a well-formed record. For a malformed record it
        returns False, or raises RecordValidityException listing every
        problem when raiseException is true.
        """
        err = []
        port = self.getPort()
        try:
            if not 1 <= int(port) <= 65535:
                err.append('Port %s not within correct range (1 <= port <= 65535)' % port)
        except ValueError:
            # The wildcard port is the only non-numeric value allowed
            if port != '*':
                err.append('Port %s not a number' % port)
        if self.usage not in (0, 1, 2):
            err.append('Usage: invalid (%s is not one of 0, 1 or 2)' % self.usage)
        if self.selector not in (0, 1):
            err.append('Selector: invalid (%s is not one of 0 or 1)' % self.selector)
        if self.mtype not in (0, 1, 2):
            err.append('Matching Type: invalid (%s is not one of 0, 1 or 2)' % self.mtype)
        if not self.isNameValid():
            err.append('Name (%s) is not in the correct format: _portnumber._transportprotocol.hostname.dom.' % self.name)
        # A certificate length of 0 is accepted
        if self.mtype in (1, 2) and len(self.cert) != 0:
            if len(self.cert) != {1: 64, 2: 128}[self.mtype]:
                err.append('Certificate for Association: invalid (Hash length does not match hash-type in Matching Type(%s))' % {1: 'SHA-256', 2: 'SHA-512'}[self.mtype])

        if not err:
            return True
        if not raiseException:
            return False
        msg = 'The TLSA record is invalid.'
        for error in err:
            msg += '\n\t%s' % error
        raise RecordValidityException(msg)

    def isNameValid(self):
        """Check if the name is in the correct format"""
        # Host labels may contain hyphens in addition to letters and digits
        pattern = r'^(_\d{1,5}|\*)\._(tcp|udp|sctp)\.([a-z0-9-]*\.){2,}$'
        return re.match(pattern, self.name) is not None

    def getProtocol(self):
        """Returns the protocol based on the name"""
        # Second label of the name, without its leading underscore
        return self.name.split('.')[1][1:]

    def getPort(self):
        """Returns the port based on the name"""
        first_label = self.name.split('.')[0]
        if first_label.startswith('*'):
            return '*'
        # Drop the leading underscore of the port label
        return first_label[1:]
class ARecord(object):
    """An object representing an A Record (IPv4 address)"""

    def __init__(self, hostname, address):
        self.rrtype = 1  # DNS type code of an A record
        self.hostname = hostname
        self.address = address

    def __str__(self):
        return self.address

    def isValid(self):
        """Return True when the stored address is accepted by IPv4Address."""
        try:
            IPv4Address(self.address)
        except Exception:
            # Any failure to parse means the address is not usable
            return False
        return True
class AAAARecord(object):
    """An object representing an AAAA Record (IPv6 address)"""

    def __init__(self, hostname, address):
        self.rrtype = 28  # DNS type code of an AAAA record
        self.hostname = hostname
        self.address = address

    def __str__(self):
        return self.address

    def isValid(self):
        """Return True when the stored address is accepted by IPv6Address."""
        try:
            IPv6Address(self.address)
        except Exception:
            # Any failure to parse means the address is not usable
            return False
        return True
class RecordValidityException(Exception):
    """Raised when a TLSA record fails validation."""
    pass
class InsecureLookupException(Exception):
    """Raised when secure DNS data was requested but the answer is not secure."""
    pass
class DNSLookupError(Exception):
    """Raised when a DNS lookup fails or returns no data."""
    pass
if __name__ == '__main__':
    # Command-line entry point: 'verify' checks the TLSA records of a host
    # against the certificates its TLS service offers, 'create' prints a TLSA
    # record for a certificate taken from disk or fetched from the server.
    import argparse

    # raw_input() only exists on Python 2, input() is its Python 3 equivalent
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    # create the parser
    parser = argparse.ArgumentParser(description='Create and verify DANE records.', epilog='This tool has a few limitations: it only supports IPv4 for SSL connections.')

    subparsers = parser.add_subparsers(title='Functions', help='Available functions, see %(prog)s function -h for function-specific help')
    parser_verify = subparsers.add_parser('verify', help='Verify a TLSA record, exit 0 when all TLSA records are matched, exit 2 when a record does not match the received certificate, exit 1 on error.', epilog='Caveat: For TLSA validation, this program chases through the certificate chain offered by the server, not it\'s local certificates.')
    parser_verify.set_defaults(function='verify')
    parser_create = subparsers.add_parser('create', help='Create a TLSA record')
    parser_create.set_defaults(function='create')

    # Global options
    #parser.add_argument('-4', dest='ipv4', action='store_true',help='use ipv4 networking only')
    #parser.add_argument('-6', dest='ipv6', action='store_true',help='use ipv6 networking only')
    parser.add_argument('--insecure', action='store_true', default=False, help='Allow use of non-dnssec secured answers')
    parser.add_argument('--resolvconf', metavar='/PATH/TO/RESOLV.CONF', action='store', default='', help='Use a recursive resolver from resolv.conf')
    parser.add_argument('-v', '--version', action='version', version='%(prog)s v0.1', help='show version and exit')
    parser.add_argument('host', metavar="hostname")

    parser_verify.add_argument('--port', '-p', action='store', default='443', help='The port, or \'*\' where running TLS is located (default: %(default)s).')
    parser_verify.add_argument('--protocol', action='store', choices=['tcp','udp','sctp'], default='tcp', help='The protocol the TLS service is using (default: %(default)s).')
    parser_verify.add_argument('--only-rr', '-o', action='store_true', help='Only verify that the TLSA resource record is correct (do not check certificate)')
    parser_verify.add_argument('--ca-cert', metavar='/PATH/TO/CERTSTORE', action='store', default = '/etc/ssl/certs/', help='Path to a CA certificate or a directory containing the certificates (default: %(default)s)')
    parser_verify.add_argument('--quiet', '-q', action='store_true', help='Only print the result of the validation')

    parser_create.add_argument('--port', '-p', action='store', type=int, default=443, help='The port where running TLS is located (default: %(default)s).')
    parser_create.add_argument('--protocol', action='store', choices=['tcp','udp','sctp'], default='tcp', help='The protocol the TLS service is using (default: %(default)s).')
    parser_create.add_argument('--certificate', '-c', help='The certificate used for the host. If certificate is empty, the certificate will be downloaded from the server')
    parser_create.add_argument('--output', '-o', action='store', default='draft', choices=['draft','rfc','both'], help='The type of output. Draft (private RRtype, 65468), RFC (TLSA) or both (default: %(default)s).')

    # Usage of the certificate
    parser_create.add_argument('--usage', '-u', action='store', type=int, default=1, choices=[0,1,2], help='The Usage of the Certificate for Association. \'0\' for CA, \'1\' for End Entity, \'2\' for trust-anchor (default: %(default)s).')
    parser_create.add_argument('--selector', '-s', action='store', type=int, default=0, choices=[0,1], help='The Selector for the Certificate for Association. \'0\' for Full Certificate, \'1\' for SubjectPublicKeyInfo (default: %(default)s).')
    parser_create.add_argument('--mtype', '-m', action='store', type=int, default=1, choices=[0,1,2], help='The Matching Type of the Certificate for Association. \'0\' for Exact match, \'1\' for SHA-256 hash, \'2\' for SHA-512 (default: %(default)s).')

    args = parser.parse_args()

    if not args.host.endswith('.'):
        args.host += '.'

    # getRecords() reads this module-level name to decide which resolver
    # configuration to load
    if args.resolvconf:
        if os.path.isfile(args.resolvconf):
            resolvconf = args.resolvconf
        else:
            sys.stderr.write('%s is not a file. Unable to use it as resolv.conf\n' % args.resolvconf)
            sys.exit(1)
    else:
        resolvconf = None

    # not operations are fun!
    secure = not args.insecure

    if args.function == 'verify':
        records = getTLSA(args.host, args.port, args.protocol, secure)
        if len(records) == 0:
            sys.exit(1)

        # Exit status for the whole run. It is kept across records so that a
        # mismatch on any record is reported, as the help text promises.
        pre_exit = 0
        for record in records:
            # First, check if the first three fields have correct values.
            if not args.quiet:
                # .get() keeps an out-of-range field from raising KeyError
                # here; the validation below reports it properly
                print('Received the following record for name %s:' % record.name)
                print('\tUsage:\t\t\t\t%d (%s)' % (record.usage, {0:'CA Constraint', 1:'End-Entity Constraint', 2:'Trust Anchor'}.get(record.usage, 'unknown')))
                print('\tSelector:\t\t\t%d (%s)' % (record.selector, {0:'Certificate', 1:'SubjectPublicKeyInfo'}.get(record.selector, 'unknown')))
                print('\tMatching Type:\t\t\t%d (%s)' % (record.mtype, {0:'Full Certificate', 1:'SHA-256', 2:'SHA-512'}.get(record.mtype, 'unknown')))
                print('\tCertificate for Association:\t%s' % record.cert)

            try:
                record.isValid(raiseException=True)
            except RecordValidityException as e:
                sys.stderr.write('Error: %s\n' % str(e))
                continue
            if not args.quiet:
                print('This record is valid (well-formed).')

            if args.only_rr:
                # Go to the next record
                continue

            # When we are here, The user also wants to verify the certificates with the record
            if args.protocol != 'tcp':
                sys.stderr.write('Only SSL over TCP is supported (sorry)\n')
                sys.exit(1)

            # A wildcard (or otherwise non-numeric) port can be looked up but
            # not connected to
            try:
                connect_port = int(args.port)
            except ValueError:
                sys.stderr.write('Port %s is not a number, unable to connect to the TLS service\n' % args.port)
                sys.exit(1)

            if not args.quiet:
                print('Attempting to verify the record with the TLS service...')
            try:
                addresses = getA(args.host, secure=secure)
            except (InsecureLookupException, DNSLookupError) as e:
                sys.stderr.write('%s\n' % str(e))
                sys.exit(1)

            for address in addresses:
                if not args.quiet:
                    print('Got the following IP: %s' % str(address))
                # We do the certificate handling here, as M2Crypto keeps segfaulting when we do it in a method
                ctx = SSL.Context()
                if os.path.isfile(args.ca_cert):
                    if ctx.load_verify_locations(cafile=args.ca_cert) != 1:
                        raise Exception('No CA cert')
                elif os.path.exists(args.ca_cert):
                    if ctx.load_verify_locations(capath=args.ca_cert) != 1:
                        raise Exception('No CA certs')
                else:
                    sys.stderr.write('%s is neither a file nor a directory, unable to continue\n' % args.ca_cert)
                    sys.exit(1)
                # Don't error when the verification fails in the SSL handshake
                ctx.set_verify(SSL.verify_none, depth=9)
                connection = SSL.Connection(ctx)
                try:
                    connection.connect((str(address), connect_port))
                except SSL.Checker.WrongHost:
                    # The name on the remote cert doesn't match the hostname because we connect on IP, not hostname (as we want secure lookup)
                    pass
                chain = connection.get_peer_cert_chain()
                verify_result = connection.get_verify_result()

                # Good, now let's verify
                if record.usage == 1: # End-host cert
                    cert = chain[0]
                    if verifyCertMatch(record, cert):
                        if verify_result == 0: # The cert chains to a valid CA cert according to the system-certificates
                            print('SUCCES (Usage 1): Certificate offered by the server matches the one mentioned in the TLSA record and chains to a valid CA certificate')
                        else:
                            print('FAIL (Usage 1): Certificate offered by the server matches the one mentioned in the TLSA record but the following error was raised during PKIX validation: %s' % getVerificationErrorReason(verify_result))
                            pre_exit = 2
                        if not args.quiet:
                            print('The matched certificate has Subject: %s' % cert.get_subject())
                    else:
                        print('FAIL: Certificate offered by the server does not match the TLSA record')
                        pre_exit = 2

                elif record.usage == 0: # CA constraint
                    matched_cert = None
                    # Remove the first (= End-Entity cert) from the chain
                    for position, cert in enumerate(chain):
                        if position == 0:
                            continue
                        if verifyCertMatch(record, cert):
                            # Remember the certificate that matched so the
                            # checks below look at it and not at whichever
                            # certificate the loop visited last
                            matched_cert = cert
                            break
                    if matched_cert is not None:
                        if matched_cert.check_ca():
                            if verify_result == 0:
                                print('SUCCES (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record and is a CA certificate')
                            else:
                                print('FAIL (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record and is a CA certificate, but the following error was raised during PKIX validation: %s' % getVerificationErrorReason(verify_result))
                                pre_exit = 2
                        else:
                            print('FAIL (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record but is not a CA certificate')
                            pre_exit = 2
                        if not args.quiet:
                            print('The matched certificate has Subject: %s' % matched_cert.get_subject())
                    else:
                        print('FAIL (Usage 0): No certificate in the certificate chain offered by the server matches the TLSA record')
                        pre_exit = 2

                elif record.usage == 2: # Usage 2, ANY cert in the chain must match (aka 'pick any')
                    matched_cert = None
                    for cert in chain:
                        if verifyCertMatch(record, cert):
                            matched_cert = cert
                            break
                    if matched_cert is not None:
                        print('SUCCES (usage 2): A certificate in the certificate chain (including the end-entity certificate) offered by the server matches the TLSA record')
                        if not args.quiet:
                            print('The matched certificate has Subject: %s' % matched_cert.get_subject())
                    else:
                        print('FAIL (usage 2): No certificate in the certificate chain (including the end-entity certificate) offered by the server matches the TLSA record')
                        pre_exit = 2

                # Cleanup, just in case
                connection.clear()
                connection.close()
                ctx.close()

            # END for address in addresses
        # END for record in records
        sys.exit(pre_exit)
    # END if args.verify

    else: # we want to create
        # 'both' prints the draft notation followed by the rfc notation
        if args.output == 'both':
            output_formats = ['draft', 'rfc']
        else:
            output_formats = [args.output]

        if not args.certificate:
            if args.protocol != 'tcp':
                sys.stderr.write('Only SSL over TCP is supported (sorry)\n')
                sys.exit(1)

            print('No certificate specified on the commandline, attempting to retrieve it from the server %s' % (args.host))
            connection_port = args.port
            if args.port == '*':
                sys.stdout.write('The port specified on the commandline is *, please specify the port of the TLS service on %s (443): ' % args.host)
                sys.stdout.flush()
                while True:
                    user_input = read_line()
                    if user_input == '':
                        connection_port = 443
                        break
                    try:
                        port_ok = 1 <= int(user_input) <= 65535
                    except ValueError:
                        port_ok = False
                    if port_ok:
                        connection_port = user_input
                        break
                    # Ask again for out-of-range as well as non-numeric input
                    sys.stdout.write('Port %s not numerical or within correct range (1 <= port <= 65535), try again (hit enter for default 443): ' % user_input)
                    sys.stdout.flush()
            # Get the A records for the host
            try:
                addresses = getA(args.host, secure=secure)
            except (InsecureLookupException, DNSLookupError) as e:
                sys.stderr.write('%s\n' % str(e))
                sys.exit(1)

            for address in addresses:
                # Every address is handled on its own, so a certificate picked
                # for a previous address must not leak into this one
                cert = None
                print('Attempting to get certificate from %s' % str(address))
                # We do the certificate handling here, as M2Crypto keeps segfaulting when try to do stuff with the cert if we don't
                ctx = SSL.Context()
                ctx.set_verify(SSL.verify_none, depth=9)
                connection = SSL.Connection(ctx)
                try:
                    connection.connect((str(address), int(connection_port)))
                except SSL.Checker.WrongHost:
                    pass

                chain = connection.get_peer_cert_chain()
                for chaincert in chain:
                    if int(args.usage) == 1:
                        # The first cert is the end-entity cert
                        print('Got a certificate with Subject: %s' % chaincert.get_subject())
                        cert = chaincert
                        break
                    if (int(args.usage) == 0 and chaincert.check_ca()) or int(args.usage) == 2:
                        sys.stdout.write('Got a certificate with the following Subject:\n\t%s.\nUse this as certificate to match? [y/N] ' % chaincert.get_subject())
                        sys.stdout.flush()
                        while True:
                            user_input = read_line()
                            if user_input in ['', 'n', 'N']:
                                break
                            if user_input in ['y', 'Y']:
                                cert = chaincert
                                break
                            sys.stdout.write('Please answer Y or N')
                            sys.stdout.flush()
                    if cert:
                        break

                if cert: # Print the requested records based on the retrieved certificates
                    for output_format in output_formats:
                        print(genTLSA(args.host, args.protocol, args.port, cert, output_format, args.usage, args.selector, args.mtype))

                # Release the certificate before tearing down the connection
                # and context it came from
                cert = None
                connection.clear()
                connection.close()
                ctx.close()

        else: # Pass the path to the certificate to the genTLSA function
            for output_format in output_formats:
                print(genTLSA(args.host, args.protocol, args.port, args.certificate, output_format, args.usage, args.selector, args.mtype))