# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Tests for L{twisted.cred._digest} and the associated bits in L{twisted.cred.credentials}. """ from __future__ import division, absolute_import import base64 from binascii import hexlify from hashlib import md5, sha1 from zope.interface.verify import verifyObject from twisted.trial.unittest import TestCase from twisted.internet.address import IPv4Address from twisted.cred.error import LoginFailed from twisted.cred.credentials import calcHA1, calcHA2, IUsernameDigestHash from twisted.cred.credentials import calcResponse, DigestCredentialFactory from twisted.python.compat import networkString def b64encode(s): return base64.b64encode(s).strip() class FakeDigestCredentialFactory(DigestCredentialFactory): """ A Fake Digest Credential Factory that generates a predictable nonce and opaque """ def __init__(self, *args, **kwargs): super(FakeDigestCredentialFactory, self).__init__(*args, **kwargs) self.privateKey = b"0" def _generateNonce(self): """ Generate a static nonce """ return b'178288758716122392881254770685' def _getTime(self): """ Return a stable time """ return 0 class DigestAuthTests(TestCase): """ L{TestCase} mixin class which defines a number of tests for L{DigestCredentialFactory}. Because this mixin defines C{setUp}, it must be inherited before L{TestCase}. """ def setUp(self): """ Create a DigestCredentialFactory for testing """ self.username = b"foobar" self.password = b"bazquux" self.realm = b"test realm" self.algorithm = b"md5" self.cnonce = b"29fc54aa1641c6fa0e151419361c8f23" self.qop = b"auth" self.uri = b"/write/" self.clientAddress = IPv4Address('TCP', '10.2.3.4', 43125) self.method = b'GET' self.credentialFactory = DigestCredentialFactory( self.algorithm, self.realm) def test_MD5HashA1(self, _algorithm=b'md5', _hash=md5): """ L{calcHA1} accepts the C{'md5'} algorithm and returns an MD5 hash of its parameters, excluding the nonce and cnonce. """ nonce = b'abc123xyz' hashA1 = calcHA1(_algorithm, self.username, self.realm, self.password, nonce, self.cnonce) a1 = b":".join((self.username, self.realm, self.password)) expected = hexlify(_hash(a1).digest()) self.assertEqual(hashA1, expected) def test_MD5SessionHashA1(self): """ L{calcHA1} accepts the C{'md5-sess'} algorithm and returns an MD5 hash of its parameters, including the nonce and cnonce. """ nonce = b'xyz321abc' hashA1 = calcHA1(b'md5-sess', self.username, self.realm, self.password, nonce, self.cnonce) a1 = self.username + b':' + self.realm + b':' + self.password ha1 = hexlify(md5(a1).digest()) a1 = ha1 + b':' + nonce + b':' + self.cnonce expected = hexlify(md5(a1).digest()) self.assertEqual(hashA1, expected) def test_SHAHashA1(self): """ L{calcHA1} accepts the C{'sha'} algorithm and returns a SHA hash of its parameters, excluding the nonce and cnonce. """ self.test_MD5HashA1(b'sha', sha1) def test_MD5HashA2Auth(self, _algorithm=b'md5', _hash=md5): """ L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of its arguments, excluding the entity hash for QOP other than C{'auth-int'}. """ method = b'GET' hashA2 = calcHA2(_algorithm, method, self.uri, b'auth', None) a2 = method + b':' + self.uri expected = hexlify(_hash(a2).digest()) self.assertEqual(hashA2, expected) def test_MD5HashA2AuthInt(self, _algorithm=b'md5', _hash=md5): """ L{calcHA2} accepts the C{'md5'} algorithm and returns an MD5 hash of its arguments, including the entity hash for QOP of C{'auth-int'}. """ method = b'GET' hentity = b'foobarbaz' hashA2 = calcHA2(_algorithm, method, self.uri, b'auth-int', hentity) a2 = method + b':' + self.uri + b':' + hentity expected = hexlify(_hash(a2).digest()) self.assertEqual(hashA2, expected) def test_MD5SessHashA2Auth(self): """ L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth'} and returns the same value as it does for the C{'md5'} algorithm. """ self.test_MD5HashA2Auth(b'md5-sess') def test_MD5SessHashA2AuthInt(self): """ L{calcHA2} accepts the C{'md5-sess'} algorithm and QOP of C{'auth-int'} and returns the same value as it does for the C{'md5'} algorithm. """ self.test_MD5HashA2AuthInt(b'md5-sess') def test_SHAHashA2Auth(self): """ L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of its arguments, excluding the entity hash for QOP other than C{'auth-int'}. """ self.test_MD5HashA2Auth(b'sha', sha1) def test_SHAHashA2AuthInt(self): """ L{calcHA2} accepts the C{'sha'} algorithm and returns a SHA hash of its arguments, including the entity hash for QOP of C{'auth-int'}. """ self.test_MD5HashA2AuthInt(b'sha', sha1) def test_MD5HashResponse(self, _algorithm=b'md5', _hash=md5): """ L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash of its parameters, excluding the nonce count, client nonce, and QoP value if the nonce count and client nonce are L{None} """ hashA1 = b'abc123' hashA2 = b'789xyz' nonce = b'lmnopq' response = hashA1 + b':' + nonce + b':' + hashA2 expected = hexlify(_hash(response).digest()) digest = calcResponse(hashA1, hashA2, _algorithm, nonce, None, None, None) self.assertEqual(expected, digest) def test_MD5SessionHashResponse(self): """ L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5 hash of its parameters, excluding the nonce count, client nonce, and QoP value if the nonce count and client nonce are L{None} """ self.test_MD5HashResponse(b'md5-sess') def test_SHAHashResponse(self): """ L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash of its parameters, excluding the nonce count, client nonce, and QoP value if the nonce count and client nonce are L{None} """ self.test_MD5HashResponse(b'sha', sha1) def test_MD5HashResponseExtra(self, _algorithm=b'md5', _hash=md5): """ L{calcResponse} accepts the C{'md5'} algorithm and returns an MD5 hash of its parameters, including the nonce count, client nonce, and QoP value if they are specified. """ hashA1 = b'abc123' hashA2 = b'789xyz' nonce = b'lmnopq' nonceCount = b'00000004' clientNonce = b'abcxyz123' qop = b'auth' response = hashA1 + b':' + nonce + b':' + nonceCount + b':' +\ clientNonce + b':' + qop + b':' + hashA2 expected = hexlify(_hash(response).digest()) digest = calcResponse( hashA1, hashA2, _algorithm, nonce, nonceCount, clientNonce, qop) self.assertEqual(expected, digest) def test_MD5SessionHashResponseExtra(self): """ L{calcResponse} accepts the C{'md5-sess'} algorithm and returns an MD5 hash of its parameters, including the nonce count, client nonce, and QoP value if they are specified. """ self.test_MD5HashResponseExtra(b'md5-sess') def test_SHAHashResponseExtra(self): """ L{calcResponse} accepts the C{'sha'} algorithm and returns a SHA hash of its parameters, including the nonce count, client nonce, and QoP value if they are specified. """ self.test_MD5HashResponseExtra(b'sha', sha1) def formatResponse(self, quotes=True, **kw): """ Format all given keyword arguments and their values suitably for use as the value of an HTTP header. @types quotes: C{bool} @param quotes: A flag indicating whether to quote the values of each field in the response. @param **kw: Keywords and C{bytes} values which will be treated as field name/value pairs to include in the result. @rtype: C{bytes} @return: The given fields formatted for use as an HTTP header value. """ if 'username' not in kw: kw['username'] = self.username if 'realm' not in kw: kw['realm'] = self.realm if 'algorithm' not in kw: kw['algorithm'] = self.algorithm if 'qop' not in kw: kw['qop'] = self.qop if 'cnonce' not in kw: kw['cnonce'] = self.cnonce if 'uri' not in kw: kw['uri'] = self.uri if quotes: quote = b'"' else: quote = b'' return b', '.join([ b"".join((networkString(k), b"=", quote, v, quote)) for (k, v) in kw.items() if v is not None]) def getDigestResponse(self, challenge, ncount): """ Calculate the response for the given challenge """ nonce = challenge.get('nonce') algo = challenge.get('algorithm').lower() qop = challenge.get('qop') ha1 = calcHA1( algo, self.username, self.realm, self.password, nonce, self.cnonce) ha2 = calcHA2(algo, b"GET", self.uri, qop, None) expected = calcResponse( ha1, ha2, algo, nonce, ncount, self.cnonce, qop) return expected def test_response(self, quotes=True): """ L{DigestCredentialFactory.decode} accepts a digest challenge response and parses it into an L{IUsernameHashedPassword} provider. """ challenge = self.credentialFactory.getChallenge( self.clientAddress.host) nc = b"00000001" clientResponse = self.formatResponse( quotes=quotes, nonce=challenge['nonce'], response=self.getDigestResponse(challenge, nc), nc=nc, opaque=challenge['opaque']) creds = self.credentialFactory.decode( clientResponse, self.method, self.clientAddress.host) self.assertTrue(creds.checkPassword(self.password)) self.assertFalse(creds.checkPassword(self.password + b'wrong')) def test_responseWithoutQuotes(self): """ L{DigestCredentialFactory.decode} accepts a digest challenge response which does not quote the values of its fields and parses it into an L{IUsernameHashedPassword} provider in the same way it would a response which included quoted field values. """ self.test_response(False) def test_responseWithCommaURI(self): """ L{DigestCredentialFactory.decode} accepts a digest challenge response which quotes the values of its fields and includes a C{b","} in the URI field. """ self.uri = b"/some,path/" self.test_response(True) def test_caseInsensitiveAlgorithm(self): """ The case of the algorithm value in the response is ignored when checking the credentials. """ self.algorithm = b'MD5' self.test_response() def test_md5DefaultAlgorithm(self): """ The algorithm defaults to MD5 if it is not supplied in the response. """ self.algorithm = None self.test_response() def test_responseWithoutClientIP(self): """ L{DigestCredentialFactory.decode} accepts a digest challenge response even if the client address it is passed is L{None}. """ challenge = self.credentialFactory.getChallenge(None) nc = b"00000001" clientResponse = self.formatResponse( nonce=challenge['nonce'], response=self.getDigestResponse(challenge, nc), nc=nc, opaque=challenge['opaque']) creds = self.credentialFactory.decode(clientResponse, self.method, None) self.assertTrue(creds.checkPassword(self.password)) self.assertFalse(creds.checkPassword(self.password + b'wrong')) def test_multiResponse(self): """ L{DigestCredentialFactory.decode} handles multiple responses to a single challenge. """ challenge = self.credentialFactory.getChallenge( self.clientAddress.host) nc = b"00000001" clientResponse = self.formatResponse( nonce=challenge['nonce'], response=self.getDigestResponse(challenge, nc), nc=nc, opaque=challenge['opaque']) creds = self.credentialFactory.decode(clientResponse, self.method, self.clientAddress.host) self.assertTrue(creds.checkPassword(self.password)) self.assertFalse(creds.checkPassword(self.password + b'wrong')) nc = b"00000002" clientResponse = self.formatResponse( nonce=challenge['nonce'], response=self.getDigestResponse(challenge, nc), nc=nc, opaque=challenge['opaque']) creds = self.credentialFactory.decode(clientResponse, self.method, self.clientAddress.host) self.assertTrue(creds.checkPassword(self.password)) self.assertFalse(creds.checkPassword(self.password + b'wrong')) def test_failsWithDifferentMethod(self): """ L{DigestCredentialFactory.decode} returns an L{IUsernameHashedPassword} provider which rejects a correct password for the given user if the challenge response request is made using a different HTTP method than was used to request the initial challenge. """ challenge = self.credentialFactory.getChallenge( self.clientAddress.host) nc = b"00000001" clientResponse = self.formatResponse( nonce=challenge['nonce'], response=self.getDigestResponse(challenge, nc), nc=nc, opaque=challenge['opaque']) creds = self.credentialFactory.decode(clientResponse, b'POST', self.clientAddress.host) self.assertFalse(creds.checkPassword(self.password)) self.assertFalse(creds.checkPassword(self.password + b'wrong')) def test_noUsername(self): """ L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response has no username field or if the username field is empty. """ # Check for no username e = self.assertRaises( LoginFailed, self.credentialFactory.decode, self.formatResponse(username=None), self.method, self.clientAddress.host) self.assertEqual(str(e), "Invalid response, no username given.") # Check for an empty username e = self.assertRaises( LoginFailed, self.credentialFactory.decode, self.formatResponse(username=b""), self.method, self.clientAddress.host) self.assertEqual(str(e), "Invalid response, no username given.") def test_noNonce(self): """ L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response has no nonce. """ e = self.assertRaises( LoginFailed, self.credentialFactory.decode, self.formatResponse(opaque=b"abc123"), self.method, self.clientAddress.host) self.assertEqual(str(e), "Invalid response, no nonce given.") def test_noOpaque(self): """ L{DigestCredentialFactory.decode} raises L{LoginFailed} if the response has no opaque. """ e = self.assertRaises( LoginFailed, self.credentialFactory.decode, self.formatResponse(), self.method, self.clientAddress.host) self.assertEqual(str(e), "Invalid response, no opaque given.") def test_checkHash(self): """ L{DigestCredentialFactory.decode} returns an L{IUsernameDigestHash} provider which can verify a hash of the form 'username:realm:password'. """ challenge = self.credentialFactory.getChallenge( self.clientAddress.host) nc = b"00000001" clientResponse = self.formatResponse( nonce=challenge['nonce'], response=self.getDigestResponse(challenge, nc), nc=nc, opaque=challenge['opaque']) creds = self.credentialFactory.decode(clientResponse, self.method, self.clientAddress.host) self.assertTrue(verifyObject(IUsernameDigestHash, creds)) cleartext = self.username + b":" + self.realm + b":" + self.password hash = md5(cleartext) self.assertTrue(creds.checkHash(hexlify(hash.digest()))) hash.update(b'wrong') self.assertFalse(creds.checkHash(hexlify(hash.digest()))) def test_invalidOpaque(self): """ L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque value does not contain all the required parts. """ credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm) challenge = credentialFactory.getChallenge(self.clientAddress.host) exc = self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, b'badOpaque', challenge['nonce'], self.clientAddress.host) self.assertEqual(str(exc), 'Invalid response, invalid opaque value') badOpaque = b'foo-' + b64encode(b'nonce,clientip') exc = self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, badOpaque, challenge['nonce'], self.clientAddress.host) self.assertEqual(str(exc), 'Invalid response, invalid opaque value') exc = self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, b'', challenge['nonce'], self.clientAddress.host) self.assertEqual(str(exc), 'Invalid response, invalid opaque value') badOpaque = b'foo-' + b64encode( b",".join((challenge['nonce'], networkString(self.clientAddress.host), b"foobar"))) exc = self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, badOpaque, challenge['nonce'], self.clientAddress.host) self.assertEqual( str(exc), 'Invalid response, invalid opaque/time values') def test_incompatibleNonce(self): """ L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given nonce from the response does not match the nonce encoded in the opaque. """ credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm) challenge = credentialFactory.getChallenge(self.clientAddress.host) badNonceOpaque = credentialFactory._generateOpaque( b'1234567890', self.clientAddress.host) exc = self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, badNonceOpaque, challenge['nonce'], self.clientAddress.host) self.assertEqual( str(exc), 'Invalid response, incompatible opaque/nonce values') exc = self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, badNonceOpaque, b'', self.clientAddress.host) self.assertEqual( str(exc), 'Invalid response, incompatible opaque/nonce values') def test_incompatibleClientIP(self): """ L{DigestCredentialFactory.decode} raises L{LoginFailed} when the request comes from a client IP other than what is encoded in the opaque. """ credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm) challenge = credentialFactory.getChallenge(self.clientAddress.host) badAddress = '10.0.0.1' # Sanity check self.assertNotEqual(self.clientAddress.host, badAddress) badNonceOpaque = credentialFactory._generateOpaque( challenge['nonce'], badAddress) self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, badNonceOpaque, challenge['nonce'], self.clientAddress.host) def test_oldNonce(self): """ L{DigestCredentialFactory.decode} raises L{LoginFailed} when the given opaque is older than C{DigestCredentialFactory.CHALLENGE_LIFETIME_SECS} """ credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm) challenge = credentialFactory.getChallenge(self.clientAddress.host) key = b",".join((challenge['nonce'], networkString(self.clientAddress.host), b'-137876876')) digest = hexlify(md5(key + credentialFactory.privateKey).digest()) ekey = b64encode(key) oldNonceOpaque = b"-".join((digest, ekey.strip(b'\n'))) self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, oldNonceOpaque, challenge['nonce'], self.clientAddress.host) def test_mismatchedOpaqueChecksum(self): """ L{DigestCredentialFactory.decode} raises L{LoginFailed} when the opaque checksum fails verification. """ credentialFactory = FakeDigestCredentialFactory(self.algorithm, self.realm) challenge = credentialFactory.getChallenge(self.clientAddress.host) key = b",".join((challenge['nonce'], networkString(self.clientAddress.host), b'0')) digest = hexlify(md5(key + b'this is not the right pkey').digest()) badChecksum = b"-".join((digest, b64encode(key))) self.assertRaises( LoginFailed, credentialFactory._verifyOpaque, badChecksum, challenge['nonce'], self.clientAddress.host) def test_incompatibleCalcHA1Options(self): """ L{calcHA1} raises L{TypeError} when any of the pszUsername, pszRealm, or pszPassword arguments are specified with the preHA1 keyword argument. """ arguments = ( (b"user", b"realm", b"password", b"preHA1"), (None, b"realm", None, b"preHA1"), (None, None, b"password", b"preHA1"), ) for pszUsername, pszRealm, pszPassword, preHA1 in arguments: self.assertRaises( TypeError, calcHA1, b"md5", pszUsername, pszRealm, pszPassword, b"nonce", b"cnonce", preHA1=preHA1) def test_noNewlineOpaque(self): """ L{DigestCredentialFactory._generateOpaque} returns a value without newlines, regardless of the length of the nonce. """ opaque = self.credentialFactory._generateOpaque( b"long nonce " * 10, None) self.assertNotIn(b'\n', opaque)