# Copyright (c) 2005 Divmod, Inc. # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ Tests for Twisted plugin system. """ from __future__ import absolute_import, division import sys, errno, os, time import compileall import functools from zope.interface import Interface from twisted.trial import unittest from twisted.python.compat import _PY3, _PYPY from twisted.python.log import textFromEventDict, addObserver, removeObserver from twisted.python.filepath import FilePath from twisted import plugin if _PY3: from importlib import invalidate_caches as invalidateImportCaches else: def invalidateImportCaches(): """ On python 2, import caches don't need to be invalidated. """ class ITestPlugin(Interface): """ A plugin for use by the plugin system's unit tests. Do not use this. """ class ITestPlugin2(Interface): """ See L{ITestPlugin}. """ class PluginTests(unittest.TestCase): """ Tests which verify the behavior of the current, active Twisted plugins directory. """ def setUp(self): """ Save C{sys.path} and C{sys.modules}, and create a package for tests. """ self.originalPath = sys.path[:] self.savedModules = sys.modules.copy() self.root = FilePath(self.mktemp()) self.root.createDirectory() self.package = self.root.child('mypackage') self.package.createDirectory() self.package.child('__init__.py').setContent(b"") FilePath(__file__).sibling('plugin_basic.py' ).copyTo(self.package.child('testplugin.py')) self.originalPlugin = "testplugin" sys.path.insert(0, self.root.path) import mypackage self.module = mypackage def tearDown(self): """ Restore C{sys.path} and C{sys.modules} to their original values. """ sys.path[:] = self.originalPath sys.modules.clear() sys.modules.update(self.savedModules) def _unimportPythonModule(self, module, deleteSource=False): modulePath = module.__name__.split('.') packageName = '.'.join(modulePath[:-1]) moduleName = modulePath[-1] delattr(sys.modules[packageName], moduleName) del sys.modules[module.__name__] for ext in ['c', 'o'] + (deleteSource and [''] or []): try: os.remove(module.__file__ + ext) except OSError as ose: if ose.errno != errno.ENOENT: raise def _clearCache(self): """ Remove the plugins B{droping.cache} file. """ self.package.child('dropin.cache').remove() def _withCacheness(meth): """ This is a paranoid test wrapper, that calls C{meth} 2 times, clear the cache, and calls it 2 other times. It's supposed to ensure that the plugin system behaves correctly no matter what the state of the cache is. """ @functools.wraps(meth) def wrapped(self): meth(self) meth(self) self._clearCache() meth(self) meth(self) return wrapped def test_cache(self): """ Check that the cache returned by L{plugin.getCache} hold the plugin B{testplugin}, and that this plugin has the properties we expect: provide L{TestPlugin}, has the good name and description, and can be loaded successfully. """ cache = plugin.getCache(self.module) dropin = cache[self.originalPlugin] self.assertEqual(dropin.moduleName, 'mypackage.%s' % (self.originalPlugin,)) self.assertIn("I'm a test drop-in.", dropin.description) # Note, not the preferred way to get a plugin by its interface. p1 = [p for p in dropin.plugins if ITestPlugin in p.provided][0] self.assertIs(p1.dropin, dropin) self.assertEqual(p1.name, "TestPlugin") # Check the content of the description comes from the plugin module # docstring self.assertEqual( p1.description.strip(), "A plugin used solely for testing purposes.") self.assertEqual(p1.provided, [ITestPlugin, plugin.IPlugin]) realPlugin = p1.load() # The plugin should match the class present in sys.modules self.assertIs( realPlugin, sys.modules['mypackage.%s' % (self.originalPlugin,)].TestPlugin) # And it should also match if we import it classicly import mypackage.testplugin as tp self.assertIs(realPlugin, tp.TestPlugin) test_cache = _withCacheness(test_cache) def test_cacheRepr(self): """ L{CachedPlugin} has a helpful C{repr} which contains relevant information about it. """ cachedDropin = plugin.getCache(self.module)[self.originalPlugin] cachedPlugin = list(p for p in cachedDropin.plugins if p.name == 'TestPlugin')[0] self.assertEqual( repr(cachedPlugin), "" ) def test_plugins(self): """ L{plugin.getPlugins} should return the list of plugins matching the specified interface (here, L{ITestPlugin2}), and these plugins should be instances of classes with a C{test} method, to be sure L{plugin.getPlugins} load classes correctly. """ plugins = list(plugin.getPlugins(ITestPlugin2, self.module)) self.assertEqual(len(plugins), 2) names = ['AnotherTestPlugin', 'ThirdTestPlugin'] for p in plugins: names.remove(p.__name__) p.test() test_plugins = _withCacheness(test_plugins) def test_detectNewFiles(self): """ Check that L{plugin.getPlugins} is able to detect plugins added at runtime. """ FilePath(__file__).sibling('plugin_extra1.py' ).copyTo(self.package.child('pluginextra.py')) try: # Check that the current situation is clean self.failIfIn('mypackage.pluginextra', sys.modules) self.assertFalse(hasattr(sys.modules['mypackage'], 'pluginextra'), "mypackage still has pluginextra module") plgs = list(plugin.getPlugins(ITestPlugin, self.module)) # We should find 2 plugins: the one in testplugin, and the one in # pluginextra self.assertEqual(len(plgs), 2) names = ['TestPlugin', 'FourthTestPlugin'] for p in plgs: names.remove(p.__name__) p.test1() finally: self._unimportPythonModule( sys.modules['mypackage.pluginextra'], True) test_detectNewFiles = _withCacheness(test_detectNewFiles) def test_detectFilesChanged(self): """ Check that if the content of a plugin change, L{plugin.getPlugins} is able to detect the new plugins added. """ FilePath(__file__).sibling('plugin_extra1.py' ).copyTo(self.package.child('pluginextra.py')) try: plgs = list(plugin.getPlugins(ITestPlugin, self.module)) # Sanity check self.assertEqual(len(plgs), 2) FilePath(__file__).sibling('plugin_extra2.py' ).copyTo(self.package.child('pluginextra.py')) # Fake out Python. self._unimportPythonModule(sys.modules['mypackage.pluginextra']) # Make sure additions are noticed plgs = list(plugin.getPlugins(ITestPlugin, self.module)) self.assertEqual(len(plgs), 3) names = ['TestPlugin', 'FourthTestPlugin', 'FifthTestPlugin'] for p in plgs: names.remove(p.__name__) p.test1() finally: self._unimportPythonModule( sys.modules['mypackage.pluginextra'], True) test_detectFilesChanged = _withCacheness(test_detectFilesChanged) def test_detectFilesRemoved(self): """ Check that when a dropin file is removed, L{plugin.getPlugins} doesn't return it anymore. """ FilePath(__file__).sibling('plugin_extra1.py' ).copyTo(self.package.child('pluginextra.py')) try: # Generate a cache with pluginextra in it. list(plugin.getPlugins(ITestPlugin, self.module)) finally: self._unimportPythonModule( sys.modules['mypackage.pluginextra'], True) plgs = list(plugin.getPlugins(ITestPlugin, self.module)) self.assertEqual(1, len(plgs)) test_detectFilesRemoved = _withCacheness(test_detectFilesRemoved) def test_nonexistentPathEntry(self): """ Test that getCache skips over any entries in a plugin package's C{__path__} which do not exist. """ path = self.mktemp() self.assertFalse(os.path.exists(path)) # Add the test directory to the plugins path self.module.__path__.append(path) try: plgs = list(plugin.getPlugins(ITestPlugin, self.module)) self.assertEqual(len(plgs), 1) finally: self.module.__path__.remove(path) test_nonexistentPathEntry = _withCacheness(test_nonexistentPathEntry) def test_nonDirectoryChildEntry(self): """ Test that getCache skips over any entries in a plugin package's C{__path__} which refer to children of paths which are not directories. """ path = FilePath(self.mktemp()) self.assertFalse(path.exists()) path.touch() child = path.child("test_package").path self.module.__path__.append(child) try: plgs = list(plugin.getPlugins(ITestPlugin, self.module)) self.assertEqual(len(plgs), 1) finally: self.module.__path__.remove(child) test_nonDirectoryChildEntry = _withCacheness(test_nonDirectoryChildEntry) def test_deployedMode(self): """ The C{dropin.cache} file may not be writable: the cache should still be attainable, but an error should be logged to show that the cache couldn't be updated. """ # Generate the cache plugin.getCache(self.module) cachepath = self.package.child('dropin.cache') # Add a new plugin FilePath(__file__).sibling('plugin_extra1.py' ).copyTo(self.package.child('pluginextra.py')) invalidateImportCaches() os.chmod(self.package.path, 0o500) # Change the right of dropin.cache too for windows os.chmod(cachepath.path, 0o400) self.addCleanup(os.chmod, self.package.path, 0o700) self.addCleanup(os.chmod, cachepath.path, 0o700) # Start observing log events to see the warning events = [] addObserver(events.append) self.addCleanup(removeObserver, events.append) cache = plugin.getCache(self.module) # The new plugin should be reported self.assertIn('pluginextra', cache) self.assertIn(self.originalPlugin, cache) # Make sure something was logged about the cache. expected = "Unable to write to plugin cache %s: error number %d" % ( cachepath.path, errno.EPERM) for event in events: if expected in textFromEventDict(event): break else: self.fail( "Did not observe unwriteable cache warning in log " "events: %r" % (events,)) # This is something like the Twisted plugins file. pluginInitFile = b""" from twisted.plugin import pluginPackagePaths __path__.extend(pluginPackagePaths(__name__)) __all__ = [] """ def pluginFileContents(name): return ( "from zope.interface import provider\n" "from twisted.plugin import IPlugin\n" "from twisted.test.test_plugin import ITestPlugin\n" "\n" "@provider(IPlugin, ITestPlugin)\n" "class {0}(object):\n" " pass\n" ).format(name).encode('ascii') def _createPluginDummy(entrypath, pluginContent, real, pluginModule): """ Create a plugindummy package. """ entrypath.createDirectory() pkg = entrypath.child('plugindummy') pkg.createDirectory() if real: pkg.child('__init__.py').setContent(b'') plugs = pkg.child('plugins') plugs.createDirectory() if real: plugs.child('__init__.py').setContent(pluginInitFile) plugs.child(pluginModule + '.py').setContent(pluginContent) return plugs class DeveloperSetupTests(unittest.TestCase): """ These tests verify things about the plugin system without actually interacting with the deployed 'twisted.plugins' package, instead creating a temporary package. """ def setUp(self): """ Create a complex environment with multiple entries on sys.path, akin to a developer's environment who has a development (trunk) checkout of Twisted, a system installed version of Twisted (for their operating system's tools) and a project which provides Twisted plugins. """ self.savedPath = sys.path[:] self.savedModules = sys.modules.copy() self.fakeRoot = FilePath(self.mktemp()) self.fakeRoot.createDirectory() self.systemPath = self.fakeRoot.child('system_path') self.devPath = self.fakeRoot.child('development_path') self.appPath = self.fakeRoot.child('application_path') self.systemPackage = _createPluginDummy( self.systemPath, pluginFileContents('system'), True, 'plugindummy_builtin') self.devPackage = _createPluginDummy( self.devPath, pluginFileContents('dev'), True, 'plugindummy_builtin') self.appPackage = _createPluginDummy( self.appPath, pluginFileContents('app'), False, 'plugindummy_app') # Now we're going to do the system installation. sys.path.extend([x.path for x in [self.systemPath, self.appPath]]) # Run all the way through the plugins list to cause the # L{plugin.getPlugins} generator to write cache files for the system # installation. self.getAllPlugins() self.sysplug = self.systemPath.child('plugindummy').child('plugins') self.syscache = self.sysplug.child('dropin.cache') # Make sure there's a nice big difference in modification times so that # we won't re-build the system cache. now = time.time() os.utime( self.sysplug.child('plugindummy_builtin.py').path, (now - 5000,) * 2) os.utime(self.syscache.path, (now - 2000,) * 2) # For extra realism, let's make sure that the system path is no longer # writable. self.lockSystem() self.resetEnvironment() def lockSystem(self): """ Lock the system directories, as if they were unwritable by this user. """ os.chmod(self.sysplug.path, 0o555) os.chmod(self.syscache.path, 0o555) def unlockSystem(self): """ Unlock the system directories, as if they were writable by this user. """ os.chmod(self.sysplug.path, 0o777) os.chmod(self.syscache.path, 0o777) def getAllPlugins(self): """ Get all the plugins loadable from our dummy package, and return their short names. """ # Import the module we just added to our path. (Local scope because # this package doesn't exist outside of this test.) import plugindummy.plugins x = list(plugin.getPlugins(ITestPlugin, plugindummy.plugins)) return [plug.__name__ for plug in x] def resetEnvironment(self): """ Change the environment to what it should be just as the test is starting. """ self.unsetEnvironment() sys.path.extend([x.path for x in [self.devPath, self.systemPath, self.appPath]]) def unsetEnvironment(self): """ Change the Python environment back to what it was before the test was started. """ invalidateImportCaches() sys.modules.clear() sys.modules.update(self.savedModules) sys.path[:] = self.savedPath def tearDown(self): """ Reset the Python environment to what it was before this test ran, and restore permissions on files which were marked read-only so that the directory may be cleanly cleaned up. """ self.unsetEnvironment() # Normally we wouldn't "clean up" the filesystem like this (leaving # things for post-test inspection), but if we left the permissions the # way they were, we'd be leaving files around that the buildbots # couldn't delete, and that would be bad. self.unlockSystem() def test_developmentPluginAvailability(self): """ Plugins added in the development path should be loadable, even when the (now non-importable) system path contains its own idea of the list of plugins for a package. Inversely, plugins added in the system path should not be available. """ # Run 3 times: uncached, cached, and then cached again to make sure we # didn't overwrite / corrupt the cache on the cached try. for x in range(3): names = self.getAllPlugins() names.sort() self.assertEqual(names, ['app', 'dev']) def test_freshPyReplacesStalePyc(self): """ Verify that if a stale .pyc file on the PYTHONPATH is replaced by a fresh .py file, the plugins in the new .py are picked up rather than the stale .pyc, even if the .pyc is still around. """ mypath = self.appPackage.child("stale.py") mypath.setContent(pluginFileContents('one')) # Make it super stale x = time.time() - 1000 os.utime(mypath.path, (x, x)) pyc = mypath.sibling('stale.pyc') # compile it if _PY3: # On python 3, don't use the __pycache__ directory; the intention # of scanning for .pyc files is for configurations where you want # to intentionally include them, which means we _don't_ scan for # them inside cache directories. extra = dict(legacy=True) else: # On python 2 this option doesn't exist. extra = dict() compileall.compile_dir(self.appPackage.path, quiet=1, **extra) os.utime(pyc.path, (x, x)) # Eliminate the other option. mypath.remove() # Make sure it's the .pyc path getting cached. self.resetEnvironment() # Sanity check. self.assertIn('one', self.getAllPlugins()) self.failIfIn('two', self.getAllPlugins()) self.resetEnvironment() mypath.setContent(pluginFileContents('two')) self.failIfIn('one', self.getAllPlugins()) self.assertIn('two', self.getAllPlugins()) if _PYPY and not _PY3: test_freshPyReplacesStalePyc.skip = ( "PyPy2 will not normally import lone .pyc files.") def test_newPluginsOnReadOnlyPath(self): """ Verify that a failure to write the dropin.cache file on a read-only path will not affect the list of plugins returned. Note: this test should pass on both Linux and Windows, but may not provide useful coverage on Windows due to the different meaning of "read-only directory". """ self.unlockSystem() self.sysplug.child('newstuff.py').setContent(pluginFileContents('one')) self.lockSystem() # Take the developer path out, so that the system plugins are actually # examined. sys.path.remove(self.devPath.path) # Start observing log events to see the warning events = [] addObserver(events.append) self.addCleanup(removeObserver, events.append) self.assertIn('one', self.getAllPlugins()) # Make sure something was logged about the cache. expected = "Unable to write to plugin cache %s: error number %d" % ( self.syscache.path, errno.EPERM) for event in events: if expected in textFromEventDict(event): break else: self.fail( "Did not observe unwriteable cache warning in log " "events: %r" % (events,)) class AdjacentPackageTests(unittest.TestCase): """ Tests for the behavior of the plugin system when there are multiple installed copies of the package containing the plugins being loaded. """ def setUp(self): """ Save the elements of C{sys.path} and the items of C{sys.modules}. """ self.originalPath = sys.path[:] self.savedModules = sys.modules.copy() def tearDown(self): """ Restore C{sys.path} and C{sys.modules} to their original values. """ sys.path[:] = self.originalPath sys.modules.clear() sys.modules.update(self.savedModules) def createDummyPackage(self, root, name, pluginName): """ Create a directory containing a Python package named I{dummy} with a I{plugins} subpackage. @type root: L{FilePath} @param root: The directory in which to create the hierarchy. @type name: C{str} @param name: The name of the directory to create which will contain the package. @type pluginName: C{str} @param pluginName: The name of a module to create in the I{dummy.plugins} package. @rtype: L{FilePath} @return: The directory which was created to contain the I{dummy} package. """ directory = root.child(name) package = directory.child('dummy') package.makedirs() package.child('__init__.py').setContent(b'') plugins = package.child('plugins') plugins.makedirs() plugins.child('__init__.py').setContent(pluginInitFile) pluginModule = plugins.child(pluginName + '.py') pluginModule.setContent(pluginFileContents(name)) return directory def test_hiddenPackageSamePluginModuleNameObscured(self): """ Only plugins from the first package in sys.path should be returned by getPlugins in the case where there are two Python packages by the same name installed, each with a plugin module by a single name. """ root = FilePath(self.mktemp()) root.makedirs() firstDirectory = self.createDummyPackage(root, 'first', 'someplugin') secondDirectory = self.createDummyPackage(root, 'second', 'someplugin') sys.path.append(firstDirectory.path) sys.path.append(secondDirectory.path) import dummy.plugins plugins = list(plugin.getPlugins(ITestPlugin, dummy.plugins)) self.assertEqual(['first'], [p.__name__ for p in plugins]) def test_hiddenPackageDifferentPluginModuleNameObscured(self): """ Plugins from the first package in sys.path should be returned by getPlugins in the case where there are two Python packages by the same name installed, each with a plugin module by a different name. """ root = FilePath(self.mktemp()) root.makedirs() firstDirectory = self.createDummyPackage(root, 'first', 'thisplugin') secondDirectory = self.createDummyPackage(root, 'second', 'thatplugin') sys.path.append(firstDirectory.path) sys.path.append(secondDirectory.path) import dummy.plugins plugins = list(plugin.getPlugins(ITestPlugin, dummy.plugins)) self.assertEqual(['first'], [p.__name__ for p in plugins]) class PackagePathTests(unittest.TestCase): """ Tests for L{plugin.pluginPackagePaths} which constructs search paths for plugin packages. """ def setUp(self): """ Save the elements of C{sys.path}. """ self.originalPath = sys.path[:] def tearDown(self): """ Restore C{sys.path} to its original value. """ sys.path[:] = self.originalPath def test_pluginDirectories(self): """ L{plugin.pluginPackagePaths} should return a list containing each directory in C{sys.path} with a suffix based on the supplied package name. """ foo = FilePath('foo') bar = FilePath('bar') sys.path = [foo.path, bar.path] self.assertEqual( plugin.pluginPackagePaths('dummy.plugins'), [foo.child('dummy').child('plugins').path, bar.child('dummy').child('plugins').path]) def test_pluginPackagesExcluded(self): """ L{plugin.pluginPackagePaths} should exclude directories which are Python packages. The only allowed plugin package (the only one associated with a I{dummy} package which Python will allow to be imported) will already be known to the caller of L{plugin.pluginPackagePaths} and will most commonly already be in the C{__path__} they are about to mutate. """ root = FilePath(self.mktemp()) foo = root.child('foo').child('dummy').child('plugins') foo.makedirs() foo.child('__init__.py').setContent(b'') sys.path = [root.child('foo').path, root.child('bar').path] self.assertEqual( plugin.pluginPackagePaths('dummy.plugins'), [root.child('bar').child('dummy').child('plugins').path])