3 # swede - A tool to create DANE/TLSA (draft 14) records.
4 # This tool is really simple and not foolproof, it doesn't check the CN in the
5 # Subject field of the certificate. It also doesn't check if the supplied
6 # certificate is a CA certificate if usage 1 is specified (or any other
7 # checking for that matter).
9 # Usage is explained when running this program with --help
11 # This tool is loosely based on the dane tool in the sshfp package by Paul
12 # Wouters and Christopher Olah from xelerance.com.
14 # Copyright Pieter Lexis (pieter.lexis@os3.nl)
16 # License: GNU GENERAL PUBLIC LICENSE Version 2 or later
22 from M2Crypto import X509, SSL
23 from binascii import a2b_hex, b2a_hex
24 from hashlib import sha256, sha512
25 from ipaddr import IPv4Address, IPv6Address
def genTLSA(hostname, protocol, port, certificate, output='draft', usage=1, selector=0, mtype=1):
    """Build a TLSA record for a service and return it as an RR string.

    hostname    -- host the record is for; a trailing dot is appended if missing
    protocol    -- transport label used in the owner name (e.g. 'tcp')
    port        -- port number, or '*' for a wildcard owner name
    certificate -- anything loadCert() accepts
    output      -- 'draft' selects the private-type presentation
                   (getRecord(draft=True)); any other value selects the
                   plain TLSA presentation
    usage, selector, mtype -- the three numeric TLSA fields

    Raises Exception('Cannot load certificate from disk') when loadCert()
    fails, and whatever TLSARecord.isValid(raiseException=True) raises when
    the resulting record is not well-formed.
    """
    # check if valid vars were passed
    if not hostname.endswith('.'):
        hostname += '.'

    try:
        certificate = loadCert(certificate)
    except Exception:
        raise Exception('Cannot load certificate from disk')

    # Create the record without a certificate
    if port == '*':
        name = '*._%s.%s' % (protocol, hostname)
    else:
        name = '_%s._%s.%s' % (port, protocol, hostname)
    record = TLSARecord(name=name, usage=usage, selector=selector, mtype=mtype, cert='')

    # Check the name and the numeric fields before hashing, so that a bad
    # mtype is reported as a validation error rather than by getHash().
    record.isValid(raiseException=True)

    if record.selector == 0:
        # Hash the Full certificate
        record.cert = getHash(certificate, record.mtype)
    else:
        # Hash only the SubjectPublicKeyInfo
        record.cert = getHash(certificate.get_pubkey(), record.mtype)

    # Validate again now that the certificate association data is filled in.
    record.isValid(raiseException=True)

    if output == 'draft':
        return record.getRecord(draft=True)
    return record.getRecord()
def getA(hostname, secure=True):
    """Look up the A records of hostname and return them as a list of ARecord.

    secure is passed through to getRecords(). Each raw answer is read as a
    big-endian integer and converted to dotted-quad text via IPv4Address.
    """
    return [
        ARecord(hostname, str(IPv4Address(int(b2a_hex(raw), 16))))
        for raw in getRecords(hostname, rrtype='A', secure=secure)
    ]
def getAAAA(hostname, secure=True):
    """Look up the AAAA records of hostname and return a list of AAAARecord.

    secure is passed through to getRecords(). Each raw answer is read as a
    big-endian integer and converted to text form via IPv6Address.
    """
    return [
        AAAARecord(hostname, str(IPv6Address(int(b2a_hex(raw), 16))))
        for raw in getRecords(hostname, rrtype='AAAA', secure=secure)
    ]
def getVerificationErrorReason(num):
    """Return the name of the X509 verification result for int(num).

    num may be anything int() accepts. Codes that are not in the table below
    yield a descriptive fallback string instead of raising KeyError, so a
    failure report can always be printed.
    """
    # These were taken from the M2Crypto.m2 code
    reasons = {
        50: "X509_V_ERR_APPLICATION_VERIFICATION",
        22: "X509_V_ERR_CERT_CHAIN_TOO_LONG",
        10: "X509_V_ERR_CERT_HAS_EXPIRED",
        9: "X509_V_ERR_CERT_NOT_YET_VALID",
        28: "X509_V_ERR_CERT_REJECTED",
        23: "X509_V_ERR_CERT_REVOKED",
        7: "X509_V_ERR_CERT_SIGNATURE_FAILURE",
        27: "X509_V_ERR_CERT_UNTRUSTED",
        12: "X509_V_ERR_CRL_HAS_EXPIRED",
        11: "X509_V_ERR_CRL_NOT_YET_VALID",
        8: "X509_V_ERR_CRL_SIGNATURE_FAILURE",
        18: "X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT",
        14: "X509_V_ERR_ERROR_IN_CERT_NOT_AFTER_FIELD",
        13: "X509_V_ERR_ERROR_IN_CERT_NOT_BEFORE_FIELD",
        15: "X509_V_ERR_ERROR_IN_CRL_LAST_UPDATE_FIELD",
        16: "X509_V_ERR_ERROR_IN_CRL_NEXT_UPDATE_FIELD",
        24: "X509_V_ERR_INVALID_CA",
        26: "X509_V_ERR_INVALID_PURPOSE",
        17: "X509_V_ERR_OUT_OF_MEM",
        25: "X509_V_ERR_PATH_LENGTH_EXCEEDED",
        19: "X509_V_ERR_SELF_SIGNED_CERT_IN_CHAIN",
        6: "X509_V_ERR_UNABLE_TO_DECODE_ISSUER_PUBLIC_KEY",
        4: "X509_V_ERR_UNABLE_TO_DECRYPT_CERT_SIGNATURE",
        5: "X509_V_ERR_UNABLE_TO_DECRYPT_CRL_SIGNATURE",
        3: "X509_V_ERR_UNABLE_TO_GET_CRL",
        2: "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT",
        20: "X509_V_ERR_UNABLE_TO_GET_ISSUER_CERT_LOCALLY",
        21: "X509_V_ERR_UNABLE_TO_VERIFY_LEAF_SIGNATURE",
        0: "X509_V_OK",
    }
    code = int(num)
    if code in reasons:
        return reasons[code]
    return 'unknown X509 verification error (code %d)' % code
def getRecords(hostname, rrtype='A', secure=True):
    """Look up hostname/rrtype and return the answers as raw binary strings.

    Only queries for rr_class IN.

    hostname -- the name to resolve
    rrtype   -- either an int RR type, or a str such as 'A' that is mapped to
                the matching RR_TYPE_* constant of the unbound module
    secure   -- when true, an answer that is not flagged secure raises
                InsecureLookupException; when false, such an answer only
                produces a warning on stderr

    Raises Exception for an unknown or badly typed rrtype and for a failed or
    empty lookup.
    """
    ctx = unbound.ub_ctx()
    ctx.add_ta_file('root.key')

    # Translate a textual RR type into unbound's numeric constant.
    if isinstance(rrtype, str):
        type_name = 'RR_TYPE_' + rrtype
        if not hasattr(unbound, type_name):
            raise Exception('Error: unknown RR TYPE: %s.' % rrtype)
        rrtype = getattr(unbound, type_name)
    elif not isinstance(rrtype, int):
        raise Exception('Error: rrtype in wrong format, neither int nor str.')

    status, result = ctx.resolve(hostname, rrtype=rrtype)
    if status != 0 or not result.havedata:
        raise Exception('Error: Unsuccesful lookup or no data returned.')

    if not result.secure:
        if secure:
            # The data is insecure and a secure lookup was requested
            raise InsecureLookupException('Error: query data not secure and secure data requested, unable to continue')
        sys.stderr.write('Warning: query data is not secure.\n')

    # If we are here the data was either secure or insecure data is accepted
    return result.data.raw
def getHash(certificate, mtype):
    """Return the hex-encoded matching data for certificate, based on mtype.

    certificate must offer an as_der() method (an M2Crypto.X509.X509 object,
    or the result of get_pubkey() on such an object).

    mtype 0 -- hex of the full DER encoding
    mtype 1 -- SHA-256 hex digest of the DER encoding
    mtype 2 -- SHA-512 hex digest of the DER encoding

    Raises Exception for any other mtype.
    """
    der = certificate.as_der()
    if mtype == 0:
        return b2a_hex(der)
    elif mtype == 1:
        return sha256(der).hexdigest()
    elif mtype == 2:
        return sha512(der).hexdigest()
    raise Exception('mtype should be 0,1,2')
def getTLSA(hostname, port=443, protocol='tcp', secure=True):
    """Look up the TLSA records of a service and return a list of TLSARecord.

    At the moment this requests the TYPE65468 record and parses it into a
    TLSARecord: the first three bytes of each answer are usage, selector and
    matching type, the remainder is the certificate association data.

    hostname -- host name; a trailing dot is appended if missing
    port     -- port number, or '*' to query the wildcard owner name
    protocol -- 'tcp', 'udp' or 'sctp' (case-insensitive)
    secure   -- passed to getRecords()

    Raises Exception for an unknown protocol. When getRecords() raises
    InsecureLookupException the message is written to stderr and the process
    exits with status 1.
    """
    if not hostname.endswith('.'):
        hostname += '.'

    proto = protocol.lower()
    if proto not in ('tcp', 'udp', 'sctp'):
        raise Exception('Error: unknown protocol: %s. Should be one of tcp, udp or sctp' % protocol)

    # The owner name is the same for the query and for the returned records.
    if port == '*':
        name = '*._%s.%s' % (proto, hostname)
    else:
        name = '_%s._%s.%s' % (port, proto, hostname)

    try:
        records = getRecords(name, rrtype=65468, secure=secure)
    except InsecureLookupException as e:
        sys.stderr.write('%s\n' % e)
        sys.exit(1)

    ret = []
    for record in records:
        hexdata = b2a_hex(record)
        ret.append(TLSARecord(name,
                              int(hexdata[0:2], 16),
                              int(hexdata[2:4], 16),
                              int(hexdata[4:6], 16),
                              hexdata[6:]))
    return ret
def loadCert(certificate):
    """Return an M2Crypto.X509.X509 object for certificate.

    certificate is returned unchanged when it already is an X509.X509
    instance; otherwise it is treated as a path and handed to
    X509.load_cert(). Raises Exception when loading fails.
    """
    if isinstance(certificate, X509.X509):
        # nothing to be done :-)
        return certificate
    try:
        # Maybe we were passed a path
        return X509.load_cert(certificate)
    except Exception:
        # Can't load the cert
        raise Exception('Unable to load certificate %s.' % certificate)
def verifyCertMatch(record, cert):
    """Check whether cert matches the association data in record.

    record should be a TLSARecord and cert an M2Crypto.X509.X509. The data
    compared is the hash (per record.mtype) of the public key when
    record.selector is 1, and of the whole certificate otherwise.

    Returns True on a match and False on a mismatch. Returns None when either
    argument has the wrong type or no hash could be computed.
    """
    if not isinstance(cert, X509.X509):
        return None
    if not isinstance(record, TLSARecord):
        return None

    if record.selector == 1:
        certhash = getHash(cert.get_pubkey(), record.mtype)
    else:
        certhash = getHash(cert, record.mtype)

    if not certhash:
        return None

    return certhash == record.cert
class TLSARecord:
    """When instantiated, this class contains all the fields of a TLSA record.

    Attributes set by __init__: rrtype (65468, the provisional TLSA type),
    rrclass (1, IN), name, usage, selector, mtype and cert.
    """

    def __init__(self, name, usage, selector, mtype, cert):
        """Store the fields of the record.

        name is the owner name of the RR: a '_port' or '*' label, then a
        '_tcp', '_udp' or '_sctp' label, then the fully qualified host name
        (see isNameValid()).
        usage, selector and mtype should be integers (or int()-convertible).
        cert should be a hexadecimal string holding the certificate
        association data.

        Raises Exception when a value cannot be converted.
        """
        try:
            self.rrtype = 65468  # TLSA provisional
            self.rrclass = 1  # IN
            self.name = str(name)
            self.usage = int(usage)
            self.selector = int(selector)
            self.mtype = int(mtype)
            self.cert = str(cert)
        except (TypeError, ValueError):
            raise Exception('Invalid value passed, unable to create a TLSARecord')

    def getRecord(self, draft=False):
        """Returns the RR string of this TLSARecord, either in rfc (default) or draft format"""
        if draft:
            # Unknown-type presentation: '\#', the RDATA length in bytes
            # (three one-byte fields plus the association data), then hex.
            return '%s IN TYPE65468 \\# %s %s%s%s%s' % (
                self.name, len(self.cert) // 2 + 3,
                self._toHex(self.usage), self._toHex(self.selector),
                self._toHex(self.mtype), self.cert)
        return '%s IN TLSA %s %s %s %s' % (self.name, self.usage, self.selector, self.mtype, self.cert)

    def _toHex(self, val):
        """Helper function to create two-digit hex strings from integers"""
        return '%02x' % val

    def isValid(self, raiseException=False):
        """Check whether all fields in the TLSA record conform to the spec and
        whether the port, protocol and name are well-formed.

        Returns True for a valid record. For an invalid record it returns
        False, or raises RecordValidityException listing every problem found
        when raiseException is true.
        """
        err = []
        port = self.getPort()
        try:
            if not 1 <= int(port) <= 65535:
                err.append('Port %s not within correct range (1 <= port <= 65535)' % port)
        except ValueError:
            # '*' is the only non-numeric port that is allowed.
            if port != '*':
                err.append('Port %s not a number' % port)
        if self.usage not in (0, 1, 2):
            err.append('Usage: invalid (%s is not one of 0, 1 or 2)' % self.usage)
        if self.selector not in (0, 1):
            err.append('Selector: invalid (%s is not one of 0 or 1)' % self.selector)
        if self.mtype not in (0, 1, 2):
            err.append('Matching Type: invalid (%s is not one of 0, 1 or 2)' % self.mtype)
        if not self.isNameValid():
            err.append('Name (%s) is not in the correct format: _portnumber._transportprotocol.hostname.dom.' % self.name)
        # A certificate length of 0 is accepted
        if self.mtype in (1, 2) and len(self.cert) != 0:
            if len(self.cert) != {1: 64, 2: 128}[self.mtype]:
                err.append('Certificate for Association: invalid (Hash length does not match hash-type in Matching Type(%s))' % {1: 'SHA-256', 2: 'SHA-512'}[self.mtype])

        if not err:
            return True
        if not raiseException:
            return False
        msg = 'The TLSA record is invalid.'
        for error in err:
            msg += '\n\t%s' % error
        raise RecordValidityException(msg)

    def isNameValid(self):
        """Check if the name is in the correct format.

        Host labels may contain letters of either case, digits and hyphens.
        """
        pattern = r'^(_\d{1,5}|\*)\._(tcp|udp|sctp)\.([A-Za-z0-9-]*\.){2,}$'
        return re.match(pattern, self.name) is not None

    def getProtocol(self):
        """Returns the protocol based on the name"""
        return self.name.split('.')[1][1:]

    def getPort(self):
        """Returns the port based on the name ('*' for a wildcard name)"""
        first_label = self.name.split('.')[0]
        if first_label.startswith('*'):
            return '*'
        # Drop the leading underscore of the '_port' label.
        return first_label[1:]
class ARecord:
    """An object representing an A Record (IPv4 address)"""

    def __init__(self, hostname, address):
        """Store the owner name and the textual IPv4 address."""
        self.rrtype = 1  # A
        self.hostname = hostname
        self.address = address

    def __str__(self):
        """Return the address, so str(record) can be used to connect."""
        return self.address

    def isValid(self):
        """Return True when IPv4Address accepts the stored address."""
        try:
            IPv4Address(self.address)
        except Exception:
            return False
        return True
class AAAARecord:
    """An object representing an AAAA Record (IPv6 address)"""

    def __init__(self, hostname, address):
        """Store the owner name and the textual IPv6 address."""
        self.rrtype = 28  # AAAA
        self.hostname = hostname
        self.address = address

    def __str__(self):
        """Return the address in text form."""
        return self.address

    def isValid(self):
        """Return True when IPv6Address accepts the stored address."""
        try:
            IPv6Address(self.address)
        except Exception:
            return False
        return True
class RecordValidityException(Exception):
    """Raised when a TLSA record is not well-formed; the message is in .value."""

    def __init__(self, value):
        # Chain to Exception so that args, str() and repr() carry the message.
        Exception.__init__(self, value)
        self.value = value
class InsecureLookupException(Exception):
    """Raised when secure DNS data was requested but the answer is not secure."""

    def __init__(self, value):
        # Chain to Exception so that args, str() and repr() carry the message.
        Exception.__init__(self, value)
        self.value = value
# Command-line entry point. Builds an argparse parser with two sub-commands:
# 'verify' (fetch the TLSA records with getTLSA and compare them with the
# certificate chain offered by the host) and 'create' (print a TLSA record
# with genTLSA, from --certificate or from a certificate fetched from the host).
# NOTE(review): this excerpt shows no indentation and several statements are
# not visible, so the exact nesting of the branches below cannot be confirmed
# from here.
337 if __name__ == '__main__':
340 parser = argparse.ArgumentParser(description='Create and verify DANE records.', epilog='This tool has a few limitations: it only IPv4 for SSL connections.')
342 subparsers = parser.add_subparsers(title='Functions', help='Available functions, see %(prog)s function -h for function-specific help')
343 parser_verify = subparsers.add_parser('verify', help='Verify a TLSA record, exit 0 when all TLSA records are matched, exit 2 when a record does not match the received certificate, exit 1 on error.', epilog='Caveat: For TLSA validation, this program chases through the certificate chain offered by the server, not it\'s local certificates.')
344 parser_verify.set_defaults(function='verify')
345 parser_create = subparsers.add_parser('create', help='Create a TLSA record')
346 parser_create.set_defaults(function='create')
348 #parser.add_argument('-4', dest='ipv4', action='store_true',help='use ipv4 networking only')
349 #parser.add_argument('-6', dest='ipv6', action='store_true',help='use ipv6 networking only')
350 parser.add_argument('--insecure', action='store_true', default=False, help='Allow use of non-dnssec secured answers')
351 parser.add_argument('-v', '--version', action='version', version='%(prog)s v0.1', help='show version and exit')
352 parser.add_argument('host', metavar="hostname")
# Options of the 'verify' sub-command. --port stays a string here so that '*'
# can be given.
354 parser_verify.add_argument('--port', '-p', action='store', default='443', help='The port, or \'*\' where running TLS is located (default: %(default)s).')
355 parser_verify.add_argument('--protocol', action='store', choices=['tcp','udp','sctp'], default='tcp', help='The protocol the TLS service is using (default: %(default)s).')
356 parser_verify.add_argument('--only-rr', '-o', action='store_true', help='Only verify that the TLSA resource record is correct (do not check certificate)')
357 parser_verify.add_argument('--ca-cert', metavar='/PATH/TO/CERTSTORE', action='store', default = '/etc/ssl/certs/', help='Path to a CA certificate or a directory containing the certificates (default: %(default)s)')
358 parser_verify.add_argument('--quiet', '-q', action='store_true', help='Only print the result of the validation')
# Options of the 'create' sub-command. Here --port is parsed with type=int.
360 parser_create.add_argument('--port', '-p', action='store', type=int, default=443, help='The port where running TLS is located (default: %(default)s).')
361 parser_create.add_argument('--protocol', action='store', choices=['tcp','udp','sctp'], default='tcp', help='The protocol the TLS service is using (default: %(default)s).')
362 parser_create.add_argument('--certificate', '-c', help='The certificate used for the host. If certificate is empty, the certificate will be downloaded from the server')
363 parser_create.add_argument('--output', '-o', action='store', default='draft', choices=['draft','rfc','both'], help='The type of output. Draft (private RRtype, 65468), RFC (TLSA) or both (default: %(default)s).')
365 # Usage of the certificate
366 parser_create.add_argument('--usage', '-u', action='store', type=int, default=1, choices=[0,1,2], help='The Usage of the Certificate for Association. \'0\' for CA, \'1\' for End Entity, \'2\' for trust-anchor (default: %(default)s).')
367 parser_create.add_argument('--selector', '-s', action='store', type=int, default=0, choices=[0,1], help='The Selector for the Certificate for Association. \'0\' for Full Certificate, \'1\' for SubjectPublicKeyInfo (default: %(default)s).')
368 parser_create.add_argument('--mtype', '-m', action='store', type=int, default=1, choices=[0,1,2], help='The Matching Type of the Certificate for Association. \'0\' for Exact match, \'1\' for SHA-256 hash, \'2\' for SHA-512 (default: %(default)s).')
370 args = parser.parse_args()
372 if args.host[-1] != '.':
375 # not operations are fun!
376 secure = not args.insecure
# ---- 'verify' sub-command ----
378 if args.function == 'verify':
379 records = getTLSA(args.host, args.port, args.protocol, secure)
380 if len(records) == 0:
383 for record in records:
385 # First, check if the first three fields have correct values.
387 print 'Received the following record for name %s:' % record.name
388 print '\tUsage:\t\t\t\t%d (%s)' % (record.usage, {0:'CA Constraint', 1:'End-Entity Constraint', 2:'Trust Anchor'}[record.usage])
389 print '\tSelector:\t\t\t%d (%s)' % (record.selector, {0:'Certificate', 1:'SubjectPublicKeyInfo'}[record.selector])
390 print '\tMatching Type:\t\t\t%d (%s)' % (record.mtype, {0:'Full Certificate', 1:'SHA-256', 2:'SHA-512'}[record.mtype])
391 print '\tCertificate for Association:\t%s' % record.cert
394 record.isValid(raiseException=True)
395 except RecordValidityException, e:
# NOTE(review): the next statement has no '>>' after print, unlike the other
# stderr prints in this script, so it prints the sys.stderr object itself
# followed by the message on stdout -- likely meant 'print >> sys.stderr, ...'.
396 print sys.stderr, 'Error: %s' % str(e)
400 print 'This record is valid (well-formed).'
403 # Go to the next record
406 # When we are here, The user also wants to verify the certificates with the record
407 if args.protocol != 'tcp':
408 print >> sys.stderr, 'Only SSL over TCP is supported (sorry)'
412 print 'Attempting to verify the record with the TLS service...'
# Only A (IPv4) records are looked up for the connection attempt.
413 addresses = getA(args.host, secure=secure)
414 for address in addresses:
416 print 'Got the following IP: %s' % str(address)
417 # We do the certificate handling here, as M2Crypto keeps segfaulting when we do it in a method
# --ca-cert may name a single CA file or a directory of CA certificates.
419 if os.path.isfile(args.ca_cert):
420 if ctx.load_verify_locations(cafile=args.ca_cert) != 1: raise Exception('No CA cert')
421 elif os.path.exists(args.ca_cert):
422 if ctx.load_verify_locations(capath=args.ca_cert) != 1: raise Exception('No CA certs')
424 print >> sys.stderr, '%s is neither a file nor a directory, unable to continue' % args.ca_cert
426 # Don't error when the verification fails in the SSL handshake
427 ctx.set_verify(SSL.verify_none, depth=9)
428 connection = SSL.Connection(ctx)
430 connection.connect((str(address), int(args.port)))
431 except SSL.Checker.WrongHost, e:
432 # The name on the remote cert doesn't match the hostname because we connect on IP, not hostname (as we want secure lookup)
# The verify result is read after the handshake and compared with 0 (X509_V_OK)
# in the usage 0 and usage 1 branches below.
434 chain = connection.get_peer_cert_chain()
435 verify_result = connection.get_verify_result()
437 # Good, now let's verify
438 if record.usage == 1: # End-host cert
440 if verifyCertMatch(record, cert):
441 if verify_result == 0: # The cert chains to a valid CA cert according to the system-certificates
442 print 'SUCCES (Usage 1): Certificate offered by the server matches the one mentioned in the TLSA record and chains to a valid CA certificate'
444 print 'FAIL (Usage 1): Certificate offered by the server matches the one mentioned in the TLSA record but the following error was raised during PKIX validation: %s' % getVerificationErrorReason(verify_result)
# pre_exit is only raised from 0 to 2, so one failure is remembered across
# later successes.
445 if pre_exit == 0: pre_exit = 2
446 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
448 print 'FAIL: Certificate offered by the server does not match the TLSA record'
449 if pre_exit == 0: pre_exit = 2
451 elif record.usage == 0: # CA constraint
453 # Remove the first (= End-Entity cert) from the chain
455 if verifyCertMatch(record, cert):
460 if verify_result == 0:
461 print 'SUCCES (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record and is a CA certificate'
# NOTE(review): the format string on the next line has no '%s' but is
# %-formatted with an argument, which raises TypeError ("not all arguments
# converted") when this branch runs; compare the Usage 1 message, which ends
# in ': %s'.
463 print 'FAIL (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record and is a CA certificate, but the following error was raised during PKIX validation:' % getVerificationErrorReason(verify_result)
464 if pre_exit == 0: pre_exit = 2
466 print 'FAIL (Usage 0): A certificate in the certificate chain offered by the server matches the one mentioned in the TLSA record but is not a CA certificate'
467 if pre_exit == 0: pre_exit = 2
468 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
470 print 'FAIL (Usage 0): No certificate in the certificate chain offered by the server matches the TLSA record'
471 if pre_exit == 0: pre_exit = 2
473 elif record.usage == 2: # Usage 2, ANY cert in the chain must match (aka 'pick any')
476 if verifyCertMatch(record, cert):
480 print 'SUCCES (usage 2): A certificate in the certificate chain (including the end-entity certificate) offered by the server matches the TLSA record'
481 if not args.quiet: print 'The matched certificate has Subject: %s' % cert.get_subject()
483 print 'FAIL (usage 2): No certificate in the certificate chain (including the end-entity certificate) offered by the server matches the TLSA record'
484 if pre_exit == 0: pre_exit = 2
486 # Cleanup, just in case
491 # END for address in addresses
492 # END for record in records
# ---- 'create' sub-command ----
496 else: # we want to create
498 if not args.certificate:
499 if args.protocol != 'tcp':
500 print >> sys.stderr, 'Only SSL over TCP is supported (sorry)'
503 print 'No certificate specified on the commandline, attempting to retrieve it from the server %s' % (args.host)
504 connection_port = args.port
# The user is asked for a concrete port; an input accepted by the range check
# below is used as the connection port, and 443 is offered as the default.
506 sys.stdout.write('The port specified on the commandline is *, please specify the port of the TLS service on %s (443): ' % args.host)
509 user_input = raw_input()
511 connection_port = 443
514 if 1 <= int(user_input) <= 65535:
515 connection_port = user_input
518 sys.stdout.write('Port %s not numerical or within correct range (1 <= port <= 65535), try again (hit enter for default 443): ' % user_input)
519 # Get the A records for the host
521 addresses = getA(args.host, secure=secure)
522 except InsecureLookupException, e:
523 print >> sys.stderr, str(e)
526 for address in addresses:
527 print 'Attempting to get certificate from %s' % str(address)
528 # We do the certificate handling here, as M2Crypto keeps segfaulting when try to do stuff with the cert if we don't
530 ctx.set_verify(SSL.verify_none, depth=9)
531 connection = SSL.Connection(ctx)
533 connection.connect((str(address), int(connection_port)))
534 except SSL.Checker.WrongHost:
537 chain = connection.get_peer_cert_chain()
538 for chaincert in chain:
539 if int(args.usage) == 1:
540 # The first cert is the end-entity cert
541 print 'Got a certificate with Subject: %s' % chaincert.get_subject()
# For usage 0 only CA certificates are offered to the user; for usage 2 every
# certificate in the chain is offered.
545 if (int(args.usage) == 0 and chaincert.check_ca()) or int(args.usage) == 2:
546 sys.stdout.write('Got a certificate with the following Subject:\n\t%s.\nUse this as certificate to match? [y/N] ' % chaincert.get_subject())
549 user_input = raw_input()
550 if user_input in ['','n','N']:
552 elif user_input in ['y', 'Y']:
556 sys.stdout.write('Please answer Y or N')
560 if cert: # Print the requested records based on the retrieved certificates
# NOTE(review): --output only accepts 'draft', 'rfc' or 'both' (see the
# parser_create.add_argument('--output', ...) call above), so the comparison
# with 'b' below is never true and '--output both' is handled by the
# single-record print instead -- likely meant 'both'.
561 if args.output == 'b':
562 print genTLSA(args.host, args.protocol, args.port, cert, 'draft', args.usage, args.selector, args.mtype)
563 print genTLSA(args.host, args.protocol, args.port, cert, 'rfc', args.usage, args.selector, args.mtype)
565 print genTLSA(args.host, args.protocol, args.port, cert, args.output, args.usage, args.selector, args.mtype)
567 else: # Pass the path to the certificate to the genTLSA function
# NOTE(review): same 'b' versus 'both' mismatch as in the branch above.
568 if args.output == 'b':
569 print genTLSA(args.host, args.protocol, args.port, args.certificate, 'draft', args.usage, args.selector, args.mtype)
570 print genTLSA(args.host, args.protocol, args.port, args.certificate, 'rfc', args.usage, args.selector, args.mtype)
572 print genTLSA(args.host, args.protocol, args.port, args.certificate, args.output, args.usage, args.selector, args.mtype)