# -*- test-case-name: twisted.test.test_kqueuereactor -*- # Copyright (c) Twisted Matrix Laboratories. # See LICENSE for details. """ A kqueue()/kevent() based implementation of the Twisted main loop. To use this reactor, start your application specifying the kqueue reactor:: twistd --reactor kqueue ... To install the event loop from code (and you should do this before any connections, listeners or connectors are added):: from twisted.internet import kqreactor kqreactor.install() """ from __future__ import division, absolute_import import errno import select from select import KQ_FILTER_READ, KQ_FILTER_WRITE from select import KQ_EV_DELETE, KQ_EV_ADD, KQ_EV_EOF from zope.interface import implementer, declarations, Interface, Attribute from twisted.internet import main, posixbase from twisted.internet.interfaces import IReactorFDSet, IReactorDaemonize from twisted.python import log, failure class _IKQueue(Interface): """ An interface for KQueue implementations. """ kqueue = Attribute("An implementation of kqueue(2).") kevent = Attribute("An implementation of kevent(2).") declarations.directlyProvides(select, _IKQueue) @implementer(IReactorFDSet, IReactorDaemonize) class KQueueReactor(posixbase.PosixReactorBase): """ A reactor that uses kqueue(2)/kevent(2) and relies on Python 2.6 or higher which has built in support for kqueue in the select module. @ivar _kq: A C{kqueue} which will be used to check for I/O readiness. @ivar _impl: The implementation of L{_IKQueue} to use. @ivar _selectables: A dictionary mapping integer file descriptors to instances of L{FileDescriptor} which have been registered with the reactor. All L{FileDescriptor}s which are currently receiving read or write readiness notifications will be present as values in this dictionary. @ivar _reads: A set containing integer file descriptors. Values in this set will be registered with C{_kq} for read readiness notifications which will be dispatched to the corresponding L{FileDescriptor} instances in C{_selectables}. @ivar _writes: A set containing integer file descriptors. Values in this set will be registered with C{_kq} for write readiness notifications which will be dispatched to the corresponding L{FileDescriptor} instances in C{_selectables}. """ def __init__(self, _kqueueImpl=select): """ Initialize kqueue object, file descriptor tracking dictionaries, and the base class. See: - http://docs.python.org/library/select.html - www.freebsd.org/cgi/man.cgi?query=kqueue - people.freebsd.org/~jlemon/papers/kqueue.pdf @param _kqueueImpl: The implementation of L{_IKQueue} to use. A hook for testing. """ self._impl = _kqueueImpl self._kq = self._impl.kqueue() self._reads = set() self._writes = set() self._selectables = {} posixbase.PosixReactorBase.__init__(self) def _updateRegistration(self, fd, filter, op): """ Private method for changing kqueue registration on a given FD filtering for events given filter/op. This will never block and returns nothing. """ self._kq.control([self._impl.kevent(fd, filter, op)], 0, 0) def beforeDaemonize(self): """ Implement L{IReactorDaemonize.beforeDaemonize}. """ # Twisted-internal method called during daemonization (when application # is started via twistd). This is called right before the magic double # forking done for daemonization. We cleanly close the kqueue() and later # recreate it. This is needed since a) kqueue() are not inherited across # forks and b) twistd will create the reactor already before daemonization # (and will also add at least 1 reader to the reactor, an instance of # twisted.internet.posixbase._UnixWaker). # # See: twisted.scripts._twistd_unix.daemonize() self._kq.close() self._kq = None def afterDaemonize(self): """ Implement L{IReactorDaemonize.afterDaemonize}. """ # Twisted-internal method called during daemonization. This is called right # after daemonization and recreates the kqueue() and any readers/writers # that were added before. Note that you MUST NOT call any reactor methods # in between beforeDaemonize() and afterDaemonize()! self._kq = self._impl.kqueue() for fd in self._reads: self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) for fd in self._writes: self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) def addReader(self, reader): """ Implement L{IReactorFDSet.addReader}. """ fd = reader.fileno() if fd not in self._reads: try: self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_ADD) except OSError: pass finally: self._selectables[fd] = reader self._reads.add(fd) def addWriter(self, writer): """ Implement L{IReactorFDSet.addWriter}. """ fd = writer.fileno() if fd not in self._writes: try: self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_ADD) except OSError: pass finally: self._selectables[fd] = writer self._writes.add(fd) def removeReader(self, reader): """ Implement L{IReactorFDSet.removeReader}. """ wasLost = False try: fd = reader.fileno() except: fd = -1 if fd == -1: for fd, fdes in self._selectables.items(): if reader is fdes: wasLost = True break else: return if fd in self._reads: self._reads.remove(fd) if fd not in self._writes: del self._selectables[fd] if not wasLost: try: self._updateRegistration(fd, KQ_FILTER_READ, KQ_EV_DELETE) except OSError: pass def removeWriter(self, writer): """ Implement L{IReactorFDSet.removeWriter}. """ wasLost = False try: fd = writer.fileno() except: fd = -1 if fd == -1: for fd, fdes in self._selectables.items(): if writer is fdes: wasLost = True break else: return if fd in self._writes: self._writes.remove(fd) if fd not in self._reads: del self._selectables[fd] if not wasLost: try: self._updateRegistration(fd, KQ_FILTER_WRITE, KQ_EV_DELETE) except OSError: pass def removeAll(self): """ Implement L{IReactorFDSet.removeAll}. """ return self._removeAll( [self._selectables[fd] for fd in self._reads], [self._selectables[fd] for fd in self._writes]) def getReaders(self): """ Implement L{IReactorFDSet.getReaders}. """ return [self._selectables[fd] for fd in self._reads] def getWriters(self): """ Implement L{IReactorFDSet.getWriters}. """ return [self._selectables[fd] for fd in self._writes] def doKEvent(self, timeout): """ Poll the kqueue for new events. """ if timeout is None: timeout = 1 try: events = self._kq.control([], len(self._selectables), timeout) except OSError as e: # Since this command blocks for potentially a while, it's possible # EINTR can be raised for various reasons (for example, if the user # hits ^C). if e.errno == errno.EINTR: return else: raise _drdw = self._doWriteOrRead for event in events: fd = event.ident try: selectable = self._selectables[fd] except KeyError: # Handles the infrequent case where one selectable's # handler disconnects another. continue else: log.callWithLogger(selectable, _drdw, selectable, fd, event) def _doWriteOrRead(self, selectable, fd, event): """ Private method called when a FD is ready for reading, writing or was lost. Do the work and raise errors where necessary. """ why = None inRead = False (filter, flags, data, fflags) = ( event.filter, event.flags, event.data, event.fflags) if flags & KQ_EV_EOF and data and fflags: why = main.CONNECTION_LOST else: try: if selectable.fileno() == -1: inRead = False why = posixbase._NO_FILEDESC else: if filter == KQ_FILTER_READ: inRead = True why = selectable.doRead() if filter == KQ_FILTER_WRITE: inRead = False why = selectable.doWrite() except: # Any exception from application code gets logged and will # cause us to disconnect the selectable. why = failure.Failure() log.err(why, "An exception was raised from application code" \ " while processing a reactor selectable") if why: self._disconnectSelectable(selectable, why, inRead) doIteration = doKEvent def install(): """ Install the kqueue() reactor. """ p = KQueueReactor() from twisted.internet.main import installReactor installReactor(p) __all__ = ["KQueueReactor", "install"]