# Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Test cases for Ltwisted.mail.pop3} module. """ from __future__ import print_function import hmac import base64 import itertools from collections import OrderedDict from io import BytesIO from zope.interface import implementer from twisted import cred from twisted import internet from twisted import mail from twisted.internet import defer from twisted.mail import pop3 from twisted.protocols import loopback from twisted.python import failure from twisted.python.compat import intToBytes from twisted.test.proto_helpers import LineSendingProtocol from twisted.trial import unittest, util import twisted.cred.checkers import twisted.cred.credentials import twisted.cred.portal import twisted.internet.protocol import twisted.mail.pop3 import twisted.mail.protocols class UtilityTests(unittest.TestCase): """ Test the various helper functions and classes used by the POP3 server protocol implementation. """ def test_LineBuffering(self): """ Test creating a LineBuffer and feeding it some lines. The lines should build up in its internal buffer for a while and then get spat out to the writer. """ output = [] input = iter(itertools.cycle(['012', '345', '6', '7', '8', '9'])) c = pop3._IteratorBuffer(output.extend, input, 6) i = iter(c) self.assertEqual(output, []) # Nothing is buffer next(i) self.assertEqual(output, []) # '012' is buffered next(i) self.assertEqual(output, []) # '012345' is buffered next(i) self.assertEqual(output, ['012', '345', '6']) # Nothing is buffered for n in range(5): next(i) self.assertEqual(output, ['012', '345', '6', '7', '8', '9', '012', '345']) def test_FinishLineBuffering(self): """ Test that a LineBuffer flushes everything when its iterator is exhausted, and itself raises StopIteration. """ output = [] input = iter(['a', 'b', 'c']) c = pop3._IteratorBuffer(output.extend, input, 5) for i in c: pass self.assertEqual(output, ['a', 'b', 'c']) def test_SuccessResponseFormatter(self): """ Test that the thing that spits out POP3 'success responses' works right. """ self.assertEqual( pop3.successResponse(b'Great.'), b'+OK Great.\r\n') def test_StatLineFormatter(self): """ Test that the function which formats stat lines does so appropriately. """ statLine = list(pop3.formatStatResponse([]))[-1] self.assertEqual(statLine, b'+OK 0 0\r\n') statLine = list(pop3.formatStatResponse([10, 31, 0, 10101]))[-1] self.assertEqual(statLine, b'+OK 4 10142\r\n') def test_ListLineFormatter(self): """ Test that the function which formats the lines in response to a LIST command does so appropriately. """ listLines = list(pop3.formatListResponse([])) self.assertEqual( listLines, [b'+OK 0\r\n', b'.\r\n']) listLines = list(pop3.formatListResponse([1, 2, 3, 100])) self.assertEqual( listLines, [b'+OK 4\r\n', b'1 1\r\n', b'2 2\r\n', b'3 3\r\n', b'4 100\r\n', b'.\r\n']) def test_UIDListLineFormatter(self): """ Test that the function which formats lines in response to a UIDL command does so appropriately. """ uids = ['abc', 'def', 'ghi'] listLines = list(pop3.formatUIDListResponse([], uids.__getitem__)) self.assertEqual( listLines, [b'+OK \r\n', b'.\r\n']) listLines = list(pop3.formatUIDListResponse([123, 431, 591], uids.__getitem__)) self.assertEqual( listLines, [b'+OK \r\n', b'1 abc\r\n', b'2 def\r\n', b'3 ghi\r\n', b'.\r\n']) listLines = list(pop3.formatUIDListResponse([0, None, 591], uids.__getitem__)) self.assertEqual( listLines, [b'+OK \r\n', b'1 abc\r\n', b'3 ghi\r\n', b'.\r\n']) class MyVirtualPOP3(mail.protocols.VirtualPOP3): """ A virtual-domain-supporting POP3 server. """ magic = b'' def authenticateUserAPOP(self, user, digest): """ Authenticate against a user against a virtual domain. @param user: The username. @param digest: The digested password. @return: A three-tuple like the one returned by L{IRealm.requestAvatar}. The mailbox will be for the user given by C{user}. """ user, domain = self.lookupDomain(user) return self.service.domains[b'baz.com'].authenticateUserAPOP( user, digest, self.magic, domain) class DummyDomain: """ A virtual domain for a POP3 server. """ def __init__(self): self.users = {} def addUser(self, name): """ Create a mailbox for a new user. @param name: The username. """ self.users[name] = [] def addMessage(self, name, message): """ Add a message to the mailbox of the named user. @param name: The username. @param message: The contents of the message. """ self.users[name].append(message) def authenticateUserAPOP(self, name, digest, magic, domain): """ Succeed with a L{ListMailbox}. @param name: The name of the user authenticating. @param digest: ignored @param magic: ignored @param domain: ignored @return: A three-tuple like the one returned by L{IRealm.requestAvatar}. The mailbox will be for the user given by C{name}. """ return pop3.IMailbox, ListMailbox(self.users[name]), lambda: None class ListMailbox: """ A simple in-memory list implementation of L{IMailbox}. """ def __init__(self, list): """ @param list: The messages. """ self.list = list def listMessages(self, i=None): """ Get some message information. @param i: See L{pop3.IMailbox.listMessages}. @return: See L{pop3.IMailbox.listMessages}. """ if i is None: return [len(l) for l in self.list] return len(self.list[i]) def getMessage(self, i): """ Get the message content. @param i: See L{pop3.IMailbox.getMessage}. @return: See L{pop3.IMailbox.getMessage}. """ return BytesIO(self.list[i]) def getUidl(self, i): """ Construct a UID by using the given index value. @param i: See L{pop3.IMailbox.getUidl}. @return: See L{pop3.IMailbox.getUidl}. """ return i def deleteMessage(self, i): """ Wipe the message at the given index. @param i: See L{pop3.IMailbox.deleteMessage}. """ self.list[i] = b'' def sync(self): """ No-op. @see: L{pop3.IMailbox.sync} """ class MyPOP3Downloader(pop3.POP3Client): """ A POP3 client which downloads all messages from the server. """ def handle_WELCOME(self, line): """ Authenticate. @param line: The welcome response. """ pop3.POP3Client.handle_WELCOME(self, line) self.apop(b'hello@baz.com', b'world') def handle_APOP(self, line): """ Require an I{OK} response to I{APOP}. @param line: The I{APOP} response. """ parts = line.split() code = parts[0] if code != b'+OK': raise AssertionError('code is: %s , parts is: %s ' % (code, parts)) self.lines = [] self.retr(1) def handle_RETR_continue(self, line): """ Record one line of message information. @param line: A I{RETR} response line. """ self.lines.append(line) def handle_RETR_end(self): """ Record the received message information. """ self.message = b'\n'.join(self.lines) + b'\n' self.quit() def handle_QUIT(self, line): """ Require an I{OK} response to I{QUIT}. @param line: The I{QUIT} response. """ if line[:3] != b'+OK': raise AssertionError(b'code is ' + line) class POP3Tests(unittest.TestCase): """ Tests for L{pop3.POP3}. """ message = b'''\ Subject: urgent Someone set up us the bomb! ''' expectedOutput = (b'''\ +OK \015 +OK Authentication succeeded\015 +OK \015 1 0\015 .\015 +OK ''' + intToBytes(len(message)) + b'''\015 Subject: urgent\015 \015 Someone set up us the bomb!\015 .\015 +OK \015 ''') def setUp(self): """ Set up a POP3 server with virtual domain support. """ self.factory = internet.protocol.Factory() self.factory.domains = {} self.factory.domains[b'baz.com'] = DummyDomain() self.factory.domains[b'baz.com'].addUser(b'hello') self.factory.domains[b'baz.com'].addMessage(b'hello', self.message) def test_messages(self): """ Messages can be downloaded over a loopback TCP connection. """ client = LineSendingProtocol([ b'APOP hello@baz.com world', b'UIDL', b'RETR 1', b'QUIT', ]) server = MyVirtualPOP3() server.service = self.factory def check(ignored): output = b'\r\n'.join(client.response) + b'\r\n' self.assertEqual(output, self.expectedOutput) return loopback.loopbackTCP(server, client).addCallback(check) def test_loopback(self): """ Messages can be downloaded over a loopback connection. """ protocol = MyVirtualPOP3() protocol.service = self.factory clientProtocol = MyPOP3Downloader() def check(ignored): self.assertEqual(clientProtocol.message, self.message) protocol.connectionLost( failure.Failure(Exception("Test harness disconnect"))) d = loopback.loopbackAsync(protocol, clientProtocol) return d.addCallback(check) test_loopback.suppress = [util.suppress( message="twisted.mail.pop3.POP3Client is deprecated")] def test_incorrectDomain(self): """ Look up a user in a domain which this server does not support. """ factory = internet.protocol.Factory() factory.domains = {} factory.domains[b'twistedmatrix.com'] = DummyDomain() server = MyVirtualPOP3() server.service = factory exc = self.assertRaises(pop3.POP3Error, server.authenticateUserAPOP, b'nobody@baz.com', b'password') self.assertEqual(exc.args[0], 'no such domain baz.com') class DummyPOP3(pop3.POP3): """ A simple POP3 server with a hard-coded mailbox for any user. """ magic = b'' def authenticateUserAPOP(self, user, password): """ Succeed with a L{DummyMailbox}. @param user: ignored @param password: ignored @return: A three-tuple like the one returned by L{IRealm.requestAvatar}. """ return pop3.IMailbox, DummyMailbox(ValueError), lambda: None class DummyMailbox(pop3.Mailbox): """ An in-memory L{pop3.IMailbox} implementation. @ivar messages: A sequence of L{bytes} defining the messages in this mailbox. @ivar exceptionType: The type of exception to raise when an out-of-bounds index is addressed. """ messages = [b'From: moshe\nTo: moshe\n\nHow are you, friend?\n'] def __init__(self, exceptionType): self.messages = DummyMailbox.messages[:] self.exceptionType = exceptionType def listMessages(self, i=None): """ Get some message information. @param i: See L{pop3.IMailbox.listMessages}. @return: See L{pop3.IMailbox.listMessages}. """ if i is None: return [len(m) for m in self.messages] if i >= len(self.messages): raise self.exceptionType() return len(self.messages[i]) def getMessage(self, i): """ Get the message content. @param i: See L{pop3.IMailbox.getMessage}. @return: See L{pop3.IMailbox.getMessage}. """ return BytesIO(self.messages[i]) def getUidl(self, i): """ Construct a UID which is simply the string representation of the given index. @param i: See L{pop3.IMailbox.getUidl}. @return: See L{pop3.IMailbox.getUidl}. """ if i >= len(self.messages): raise self.exceptionType() return intToBytes(i) def deleteMessage(self, i): """ Wipe the message at the given index. @param i: See L{pop3.IMailbox.deleteMessage}. """ self.messages[i] = b'' class AnotherPOP3Tests(unittest.TestCase): """ Additional L{pop3.POP3} tests. """ def runTest(self, lines, expectedOutput): """ Assert that when C{lines} are delivered to L{pop3.POP3} it responds with C{expectedOutput}. @param lines: A sequence of L{bytes} representing lines to deliver to the server. @param expectedOutput: A sequence of L{bytes} representing the expected response from the server. @return: A L{Deferred} that fires when the lines have been delivered and the output checked. """ dummy = DummyPOP3() client = LineSendingProtocol(lines) d = loopback.loopbackAsync(dummy, client) return d.addCallback(self._cbRunTest, client, dummy, expectedOutput) def _cbRunTest(self, ignored, client, dummy, expectedOutput): self.assertEqual(b'\r\n'.join(expectedOutput), b'\r\n'.join(client.response)) dummy.connectionLost(failure.Failure( Exception("Test harness disconnect"))) return ignored def test_buffer(self): """ Test a lot of different POP3 commands in an extremely pipelined scenario. This test may cover legitimate behavior, but the intent and granularity are not very good. It would likely be an improvement to split it into a number of smaller, more focused tests. """ return self.runTest( [b"APOP moshez dummy", b"LIST", b"UIDL", b"RETR 1", b"RETR 2", b"DELE 1", b"RETR 1", b"QUIT"], [b'+OK ', b'+OK Authentication succeeded', b'+OK 1', b'1 44', b'.', b'+OK ', b'1 0', b'.', b'+OK 44', b'From: moshe', b'To: moshe', b'', b'How are you, friend?', b'.', b'-ERR Bad message number argument', b'+OK ', b'-ERR message deleted', b'+OK ']) def test_noop(self): """ Test the no-op command. """ return self.runTest( [b'APOP spiv dummy', b'NOOP', b'QUIT'], [b'+OK ', b'+OK Authentication succeeded', b'+OK ', b'+OK ']) def test_authListing(self): """ L{pop3.POP3} responds to an I{AUTH} command with a list of supported authentication types based on its factory's C{challengers}. """ p = DummyPOP3() p.factory = internet.protocol.Factory() p.factory.challengers = {b'Auth1': None, b'secondAuth': None, b'authLast': None} client = LineSendingProtocol([ b"AUTH", b"QUIT", ]) d = loopback.loopbackAsync(p, client) return d.addCallback(self._cbTestAuthListing, client) def _cbTestAuthListing(self, ignored, client): self.assertTrue(client.response[1].startswith(b'+OK')) self.assertEqual(sorted(client.response[2:5]), [b"AUTH1", b"AUTHLAST", b"SECONDAUTH"]) self.assertEqual(client.response[5], b".") def test_illegalPASS(self): """ L{pop3.POP3} handles a I{PASS} command before a I{USER} command with an error indicating the out-of-sequence operation. """ dummy = DummyPOP3() client = LineSendingProtocol([ b"PASS fooz", b"QUIT" ]) d = loopback.loopbackAsync(dummy, client) return d.addCallback(self._cbTestIllegalPASS, client, dummy) def _cbTestIllegalPASS(self, ignored, client, dummy): expectedOutput = ( b'+OK \r\n-ERR USER required before PASS\r\n+OK \r\n') self.assertEqual(expectedOutput, b'\r\n'.join(client.response) + b'\r\n') dummy.connectionLost(failure.Failure( Exception("Test harness disconnect"))) def test_emptyPASS(self): """ L{pop3.POP3} handles a I{PASS} command with a password equal to C{""} as it would any other value. """ dummy = DummyPOP3() client = LineSendingProtocol([ b"PASS ", b"QUIT" ]) d = loopback.loopbackAsync(dummy, client) return d.addCallback(self._cbTestEmptyPASS, client, dummy) def _cbTestEmptyPASS(self, ignored, client, dummy): expectedOutput = ( b'+OK \r\n-ERR USER required before PASS\r\n+OK \r\n') self.assertEqual(expectedOutput, b'\r\n'.join(client.response) + b'\r\n') dummy.connectionLost(failure.Failure( Exception("Test harness disconnect"))) def test_badUTF8CharactersInCommand(self): """ Sending a command with invalid UTF-8 characters will raise a L{pop3.POP3Error}. """ error = str(b'not authenticated yet: cannot do \x81PASS') if not isinstance(error, bytes): error = error.encode("utf-8") d = self.runTest( [b'\x81PASS', b'QUIT'], [b'+OK ', b"-ERR bad protocol or server: POP3Error: " + error, b'+OK ']) errors = self.flushLoggedErrors(pop3.POP3Error) self.assertEqual(len(errors), 1) return d @implementer(pop3.IServerFactory) class TestServerFactory: """ A L{pop3.IServerFactory} implementation, for use by the test suite, with some behavior controlled by the values of (settable) public attributes and other behavior based on values hard-coded both here and in some test methods. """ def cap_IMPLEMENTATION(self): """ Return the hard-coded value. @return: L{pop3.IServerFactory} """ return "Test Implementation String" def cap_EXPIRE(self): """ Return the hard-coded value. @return: L{pop3.IServerFactory} """ return 60 challengers = OrderedDict([(b"SCHEME_1", None), (b"SCHEME_2", None)]) def cap_LOGIN_DELAY(self): """ Return the hard-coded value. @return: L{pop3.IServerFactory} """ return 120 pue = True def perUserExpiration(self): """ Return the hard-coded value. @return: L{pop3.IServerFactory} """ return self.pue puld = True def perUserLoginDelay(self): """ Return the hard-coded value. @return: L{pop3.IServerFactory} """ return self.puld class TestMailbox: """ An incomplete L{IMailbox} implementation with certain per-user values hard-coded and known by tests in this module. This is useful for testing the server's per-user capability implementation. """ loginDelay = 100 messageExpiration = 25 def contained(testcase, s, *caps): """ Assert that the given capability is included in all of the capability sets. @param testcase: A L{unittest.TestCase} to use to make assertions. @param s: The capability for which to check. @type s: L{bytes} @param caps: The capability sets in which to check. @type caps: L{tuple} of iterable """ for c in caps: testcase.assertIn(s, c) class CapabilityTests(unittest.TestCase): """ Tests for L{pop3.POP3}'s per-user capability handling. """ def setUp(self): """ Create a POP3 server with some capabilities. """ s = BytesIO() p = pop3.POP3() p.factory = TestServerFactory() p.transport = internet.protocol.FileWrapper(s) p.connectionMade() p.do_CAPA() self.caps = p.listCapabilities() self.pcaps = s.getvalue().splitlines() s = BytesIO() p.mbox = TestMailbox() p.transport = internet.protocol.FileWrapper(s) p.do_CAPA() self.lpcaps = s.getvalue().splitlines() p.connectionLost(failure.Failure(Exception("Test harness disconnect"))) def test_UIDL(self): """ The server can advertise the I{UIDL} capability. """ contained(self, b"UIDL", self.caps, self.pcaps, self.lpcaps) def test_TOP(self): """ The server can advertise the I{TOP} capability. """ contained(self, b"TOP", self.caps, self.pcaps, self.lpcaps) def test_USER(self): """ The server can advertise the I{USER} capability. """ contained(self, b"USER", self.caps, self.pcaps, self.lpcaps) def test_EXPIRE(self): """ The server can advertise its per-user expiration as well as a global expiration. """ contained(self, b"EXPIRE 60 USER", self.caps, self.pcaps) contained(self, b"EXPIRE 25", self.lpcaps) def test_IMPLEMENTATION(self): """ The server can advertise its implementation string. """ contained( self, b"IMPLEMENTATION Test Implementation String", self.caps, self.pcaps, self.lpcaps ) def test_SASL(self): """ The server can advertise the SASL schemes it supports. """ contained( self, b"SASL SCHEME_1 SCHEME_2", self.caps, self.pcaps, self.lpcaps ) def test_LOGIN_DELAY(self): """ The can advertise a per-user login delay as well as a global login delay. """ contained(self, b"LOGIN-DELAY 120 USER", self.caps, self.pcaps) self.assertIn(b"LOGIN-DELAY 100", self.lpcaps) class GlobalCapabilitiesTests(unittest.TestCase): """ Tests for L{pop3.POP3}'s global capability handling. """ def setUp(self): """ Create a POP3 server with some capabilities. """ s = BytesIO() p = pop3.POP3() p.factory = TestServerFactory() p.factory.pue = p.factory.puld = False p.transport = internet.protocol.FileWrapper(s) p.connectionMade() p.do_CAPA() self.caps = p.listCapabilities() self.pcaps = s.getvalue().splitlines() s = BytesIO() p.mbox = TestMailbox() p.transport = internet.protocol.FileWrapper(s) p.do_CAPA() self.lpcaps = s.getvalue().splitlines() p.connectionLost(failure.Failure(Exception("Test harness disconnect"))) def test_EXPIRE(self): """ I{EXPIRE} is in the server's advertised capabilities. """ contained(self, b"EXPIRE 60", self.caps, self.pcaps, self.lpcaps) def test_LOGIN_DELAY(self): """ I{LOGIN-DELAY} is in the server's advertised capabilities. """ contained(self, b"LOGIN-DELAY 120", self.caps, self.pcaps, self.lpcaps) class TestRealm: """ An L{IRealm} which knows about a single test account's mailbox. """ def requestAvatar(self, avatarId, mind, *interfaces): """ Retrieve a mailbox for I{testuser} or fail. @param avatarId: See L{IRealm.requestAvatar}. @param mind: See L{IRealm.requestAvatar}. @param interfaces: See L{IRealm.requestAvatar}. @raises: L{AssertionError} when requesting an C{avatarId} other than I{testuser}. """ if avatarId == b'testuser': return pop3.IMailbox, DummyMailbox(ValueError), lambda: None assert False class SASLTests(unittest.TestCase): """ Tests for L{pop3.POP3}'s SASL implementation. """ def test_ValidLogin(self): """ A CRAM-MD5-based SASL login attempt succeeds if it uses a username and a hashed password known to the server's credentials checker. """ p = pop3.POP3() p.factory = TestServerFactory() p.factory.challengers = {b'CRAM-MD5': cred.credentials.CramMD5Credentials} p.portal = cred.portal.Portal(TestRealm()) ch = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() ch.addUser(b'testuser', b'testpassword') p.portal.registerChecker(ch) s = BytesIO() p.transport = internet.protocol.FileWrapper(s) p.connectionMade() p.lineReceived(b"CAPA") self.assertTrue(s.getvalue().find(b"SASL CRAM-MD5") >= 0) p.lineReceived(b"AUTH CRAM-MD5") chal = s.getvalue().splitlines()[-1][2:] chal = base64.decodestring(chal) response = hmac.HMAC(b'testpassword', chal).hexdigest().encode("ascii") p.lineReceived( base64.encodestring(b'testuser ' + response).rstrip(b'\n')) self.assertTrue(p.mbox) self.assertTrue(s.getvalue().splitlines()[-1].find(b"+OK") >= 0) p.connectionLost(failure.Failure(Exception("Test harness disconnect"))) class CommandMixin: """ Tests for all the commands a POP3 server is allowed to receive. """ extraMessage = b'''\ From: guy To: fellow More message text for you. ''' def setUp(self): """ Make a POP3 server protocol instance hooked up to a simple mailbox and a transport that buffers output to a BytesIO. """ p = pop3.POP3() p.mbox = self.mailboxType(self.exceptionType) p.schedule = list self.pop3Server = p s = BytesIO() p.transport = internet.protocol.FileWrapper(s) p.connectionMade() s.seek(0) s.truncate(0) self.pop3Transport = s def tearDown(self): """ Disconnect the server protocol so it can clean up anything it might need to clean up. """ self.pop3Server.connectionLost(failure.Failure( Exception("Test harness disconnect"))) def _flush(self): """ Do some of the things that the reactor would take care of, if the reactor were actually running. """ # Oh man FileWrapper is pooh. self.pop3Server.transport._checkProducer() def test_LIST(self): """ Test the two forms of list: with a message index number, which should return a short-form response, and without a message index number, which should return a long-form response, one line per message. """ p = self.pop3Server s = self.pop3Transport p.lineReceived(b"LIST 1") self._flush() self.assertEqual(s.getvalue(), b"+OK 1 44\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"LIST") self._flush() self.assertEqual(s.getvalue(), b"+OK 1\r\n1 44\r\n.\r\n") def test_LISTWithBadArgument(self): """ Test that non-integers and out-of-bound integers produce appropriate error responses. """ p = self.pop3Server s = self.pop3Transport p.lineReceived(b"LIST a") self.assertEqual( s.getvalue(), b"-ERR Invalid message-number: a\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"LIST 0") self.assertEqual( s.getvalue(), b"-ERR Invalid message-number: 0\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"LIST 2") self.assertEqual( s.getvalue(), b"-ERR Invalid message-number: 2\r\n") s.seek(0) s.truncate(0) def test_UIDL(self): """ Test the two forms of the UIDL command. These are just like the two forms of the LIST command. """ p = self.pop3Server s = self.pop3Transport p.lineReceived(b"UIDL 1") self.assertEqual(s.getvalue(), b"+OK 0\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"UIDL") self._flush() self.assertEqual(s.getvalue(), b"+OK \r\n1 0\r\n.\r\n") def test_UIDLWithBadArgument(self): """ Test that UIDL with a non-integer or an out-of-bounds integer produces the appropriate error response. """ p = self.pop3Server s = self.pop3Transport p.lineReceived(b"UIDL a") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"UIDL 0") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"UIDL 2") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) def test_STAT(self): """ Test the single form of the STAT command, which returns a short-form response of the number of messages in the mailbox and their total size. """ p = self.pop3Server s = self.pop3Transport p.lineReceived(b"STAT") self._flush() self.assertEqual(s.getvalue(), b"+OK 1 44\r\n") def test_RETR(self): """ Test downloading a message. """ p = self.pop3Server s = self.pop3Transport p.lineReceived(b"RETR 1") self._flush() self.assertEqual( s.getvalue(), b"+OK 44\r\n" b"From: moshe\r\n" b"To: moshe\r\n" b"\r\n" b"How are you, friend?\r\n" b".\r\n") s.seek(0) s.truncate(0) def test_RETRWithBadArgument(self): """ Test that trying to download a message with a bad argument, either not an integer or an out-of-bounds integer, fails with the appropriate error response. """ p = self.pop3Server s = self.pop3Transport p.lineReceived(b"RETR a") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"RETR 0") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"RETR 2") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) def test_TOP(self): """ Test downloading the headers and part of the body of a message. """ p = self.pop3Server s = self.pop3Transport p.mbox.messages.append(self.extraMessage) p.lineReceived(b"TOP 1 0") self._flush() self.assertEqual( s.getvalue(), b"+OK Top of message follows\r\n" b"From: moshe\r\n" b"To: moshe\r\n" b"\r\n" b".\r\n") def test_TOPWithBadArgument(self): """ Test that trying to download a message with a bad argument, either a message number which isn't an integer or is an out-of-bounds integer or a number of lines which isn't an integer or is a negative integer, fails with the appropriate error response. """ p = self.pop3Server s = self.pop3Transport p.mbox.messages.append(self.extraMessage) p.lineReceived(b"TOP 1 a") self.assertEqual( s.getvalue(), b"-ERR Bad line count argument\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"TOP 1 -1") self.assertEqual( s.getvalue(), b"-ERR Bad line count argument\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"TOP a 1") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"TOP 0 1") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) p.lineReceived(b"TOP 3 1") self.assertEqual( s.getvalue(), b"-ERR Bad message number argument\r\n") s.seek(0) s.truncate(0) def test_LAST(self): """ Test the exceedingly pointless LAST command, which tells you the highest message index which you have already downloaded. """ p = self.pop3Server s = self.pop3Transport p.mbox.messages.append(self.extraMessage) p.lineReceived(b'LAST') self.assertEqual( s.getvalue(), b"+OK 0\r\n") s.seek(0) s.truncate(0) def test_RetrieveUpdatesHighest(self): """ Test that issuing a RETR command updates the LAST response. """ p = self.pop3Server s = self.pop3Transport p.mbox.messages.append(self.extraMessage) p.lineReceived(b'RETR 2') self._flush() s.seek(0) s.truncate(0) p.lineReceived(b'LAST') self.assertEqual( s.getvalue(), b'+OK 2\r\n') s.seek(0) s.truncate(0) def test_TopUpdatesHighest(self): """ Test that issuing a TOP command updates the LAST response. """ p = self.pop3Server s = self.pop3Transport p.mbox.messages.append(self.extraMessage) p.lineReceived(b'TOP 2 10') self._flush() s.seek(0) s.truncate(0) p.lineReceived(b'LAST') self.assertEqual( s.getvalue(), b'+OK 2\r\n') def test_HighestOnlyProgresses(self): """ Test that downloading a message with a smaller index than the current LAST response doesn't change the LAST response. """ p = self.pop3Server s = self.pop3Transport p.mbox.messages.append(self.extraMessage) p.lineReceived(b'RETR 2') self._flush() p.lineReceived(b'TOP 1 10') self._flush() s.seek(0) s.truncate(0) p.lineReceived(b'LAST') self.assertEqual( s.getvalue(), b'+OK 2\r\n') def test_ResetClearsHighest(self): """ Test that issuing RSET changes the LAST response to 0. """ p = self.pop3Server s = self.pop3Transport p.mbox.messages.append(self.extraMessage) p.lineReceived(b'RETR 2') self._flush() p.lineReceived(b'RSET') s.seek(0) s.truncate(0) p.lineReceived(b'LAST') self.assertEqual( s.getvalue(), b'+OK 0\r\n') _listMessageDeprecation = ( "twisted.mail.pop3.IMailbox.listMessages may not " "raise IndexError for out-of-bounds message numbers: " "raise ValueError instead.") _listMessageSuppression = util.suppress( message=_listMessageDeprecation, category=PendingDeprecationWarning) _getUidlDeprecation = ( "twisted.mail.pop3.IMailbox.getUidl may not " "raise IndexError for out-of-bounds message numbers: " "raise ValueError instead.") _getUidlSuppression = util.suppress( message=_getUidlDeprecation, category=PendingDeprecationWarning) class IndexErrorCommandTests(CommandMixin, unittest.TestCase): """ Run all of the command tests against a mailbox which raises IndexError when an out of bounds request is made. This behavior will be deprecated shortly and then removed. """ exceptionType = IndexError mailboxType = DummyMailbox def test_LISTWithBadArgument(self): """ An attempt to get metadata about a message with a bad argument fails with an I{ERR} response even if the mailbox implementation raises L{IndexError}. """ return CommandMixin.test_LISTWithBadArgument(self) test_LISTWithBadArgument.suppress = [_listMessageSuppression] def test_UIDLWithBadArgument(self): """ An attempt to look up the UID of a message with a bad argument fails with an I{ERR} response even if the mailbox implementation raises L{IndexError}. """ return CommandMixin.test_UIDLWithBadArgument(self) test_UIDLWithBadArgument.suppress = [_getUidlSuppression] def test_TOPWithBadArgument(self): """ An attempt to download some of a message with a bad argument fails with an I{ERR} response even if the mailbox implementation raises L{IndexError}. """ return CommandMixin.test_TOPWithBadArgument(self) test_TOPWithBadArgument.suppress = [_listMessageSuppression] def test_RETRWithBadArgument(self): """ An attempt to download a message with a bad argument fails with an I{ERR} response even if the mailbox implementation raises L{IndexError}. """ return CommandMixin.test_RETRWithBadArgument(self) test_RETRWithBadArgument.suppress = [_listMessageSuppression] class ValueErrorCommandTests(CommandMixin, unittest.TestCase): """ Run all of the command tests against a mailbox which raises ValueError when an out of bounds request is made. This is the correct behavior and after support for mailboxes which raise IndexError is removed, this will become just C{CommandTestCase}. """ exceptionType = ValueError mailboxType = DummyMailbox class SyncDeferredMailbox(DummyMailbox): """ Mailbox which has a listMessages implementation which returns a Deferred which has already fired. """ def listMessages(self, n=None): """ Synchronously list messages. @type n: L{int} or L{None} @param n: The 0-based index of the message. @return: A L{Deferred} which already has a message list result. """ return defer.succeed(DummyMailbox.listMessages(self, n)) class IndexErrorSyncDeferredCommandTests(IndexErrorCommandTests): """ Run all of the L{IndexErrorCommandTests} tests with a synchronous-Deferred returning IMailbox implementation. """ mailboxType = SyncDeferredMailbox class ValueErrorSyncDeferredCommandTests(ValueErrorCommandTests): """ Run all of the L{ValueErrorCommandTests} tests with a synchronous-Deferred returning IMailbox implementation. """ mailboxType = SyncDeferredMailbox class AsyncDeferredMailbox(DummyMailbox): """ Mailbox which has a listMessages implementation which returns a Deferred which has not yet fired. """ def __init__(self, *a, **kw): self.waiting = [] DummyMailbox.__init__(self, *a, **kw) def listMessages(self, n=None): """ Record a new unfired L{Deferred} in C{self.waiting} and return it. @type n: L{int} or L{None} @param n: The 0-based index of the message. @return: The L{Deferred} """ d = defer.Deferred() # See AsyncDeferredMailbox._flush self.waiting.append((d, DummyMailbox.listMessages(self, n))) return d class IndexErrorAsyncDeferredCommandTests(IndexErrorCommandTests): """ Run all of the L{IndexErrorCommandTests} tests with an asynchronous-Deferred returning IMailbox implementation. """ mailboxType = AsyncDeferredMailbox def _flush(self): """ Fire whatever Deferreds we've built up in our mailbox. """ while self.pop3Server.mbox.waiting: d, a = self.pop3Server.mbox.waiting.pop() d.callback(a) IndexErrorCommandTests._flush(self) class ValueErrorAsyncDeferredCommandTests(ValueErrorCommandTests): """ Run all of the L{IndexErrorCommandTests} tests with an asynchronous-Deferred returning IMailbox implementation. """ mailboxType = AsyncDeferredMailbox def _flush(self): """ Fire whatever Deferreds we've built up in our mailbox. """ while self.pop3Server.mbox.waiting: d, a = self.pop3Server.mbox.waiting.pop() d.callback(a) ValueErrorCommandTests._flush(self) class POP3MiscTests(unittest.TestCase): """ Miscellaneous tests more to do with module/package structure than anything to do with the Post Office Protocol. """ def test_all(self): """ This test checks that all names listed in twisted.mail.pop3.__all__ are actually present in the module. """ mod = twisted.mail.pop3 for attr in mod.__all__: self.assertTrue(hasattr(mod, attr))