Source code for gevent._abstract_linkable

# -*- coding: utf-8 -*-
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
"""
Internal module, support for the linkable protocol for "event" like objects.

"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import sys

from gevent._hub_local import get_hub_noargs as get_hub
from gevent._hub_local import get_hub_if_exists

from gevent.exceptions import InvalidSwitchError
from gevent.timeout import Timeout

locals()['getcurrent'] = __import__('greenlet').getcurrent
locals()['greenlet_init'] = lambda: None

__all__ = [
    'AbstractLinkable',
]

class AbstractLinkable(object):
    # Encapsulates the standard parts of the linking and notifying
    # protocol common to both repeatable events (Event, Semaphore) and
    # one-time events (AsyncResult).
    #
    # With a few careful exceptions, instances of this object can only
    # be used from a single thread. The exception is that certain methods
    # may be used from multiple threads IFF:
    #
    # 1.  They are documented as safe for that purpose; AND
    # 2a. This object is compiled with Cython and thus is holding the GIL
    #     for the entire duration of the method; OR
    # 2b. A subclass ensures that a Python-level native thread lock is held
    #     for the duration of the method; this is necessary in pure-Python mode.
    #     The only known implementation of such
    #     a subclass is for Semaphore.
    #
    # TODO: As of gevent 1.5, we use the same datastructures and almost
    # the same algorithm as Greenlet. See about unifying them more.

    __slots__ = (
        'hub',
        '_links',
        '_notifier',
        '_notify_all',
        '__weakref__'
    )

    def __init__(self, hub=None):
        # Before this implementation, AsyncResult and Semaphore
        # maintained the order of notifications, but Event did not.

        # In gevent 1.3, before Semaphore extended this class, that
        # was changed to not maintain the order. It was done because
        # Event guaranteed to only call callbacks once (a set) but
        # AsyncResult had no such guarantees. When Semaphore was
        # changed to extend this class, it lost its ordering
        # guarantees. Unfortunately, that made it unfair. There are
        # rare cases that this can starve a greenlet
        # (https://github.com/gevent/gevent/issues/1487) and maybe
        # even lead to deadlock (not tested).

        # So in gevent 1.5 we go back to maintaining order. But it's
        # still important not to make duplicate calls, and it's also
        # important to avoid O(n^2) behaviour that can result from
        # naive use of a simple list due to the need to handle removed
        # links in the _notify_links loop. Cython has special support for
        # built-in sets, lists, and dicts, but not ordereddict. Rather than
        # use two data structures, or a dict({link: order}), we simply use a
        # list and remove objects as we go, keeping track of them so as not to
        # have duplicates called. This makes `unlink` O(n), but we can avoid
        # calling it in the common case in _wait_core (even so, the number of
        # waiters should usually be pretty small)
        self._links = []
        self._notifier = None
        # This is conceptually a class attribute, defined here for ease of access in
        # cython. If it's true, when notifiers fire, all existing callbacks are called.
        # If its false, we only call callbacks as long as ready() returns true.
        self._notify_all = True
        # we don't want to do get_hub() here to allow defining module-level objects
        # without initializing the hub. However, for multiple-thread safety, as soon
        # as a waiting method is entered, even if it won't have to wait, we
        # need to grab the hub and assign ownership. For that reason, if the hub
        # is present, we'll go ahead and take it.
        self.hub = hub if hub is not None else get_hub_if_exists()

    def linkcount(self):
        # For testing: how many objects are linked to this one?
        return len(self._links)

    def ready(self):
        # Instances must define this
        raise NotImplementedError

    def rawlink(self, callback):
        """
        Register a callback to call when this object is ready.

        *callback* will be called in the :class:`Hub
        <gevent.hub.Hub>`, so it must not use blocking gevent API.
        *callback* will be passed one argument: this instance.
        """
        if not callable(callback):
            raise TypeError('Expected callable: %r' % (callback, ))
        self._links.append(callback)
        self._check_and_notify()

    def unlink(self, callback):
        """Remove the callback set by :meth:`rawlink`"""
        try:
            self._links.remove(callback)
        except ValueError:
            pass

        if not self._links and self._notifier is not None and self._notifier.pending:
            # If we currently have one queued, but not running, de-queue it.
            # This will break a reference cycle.
            # (self._notifier -> self._notify_links -> self)
            # If it's actually running, though, (and we're here as a result of callbacks)
            # we don't want to change it; it needs to finish what its doing
            # so we don't attempt to start a fresh one or swap it out from underneath the
            # _notify_links method.
            self._notifier.stop()

    def _capture_hub(self, create):
        # Subclasses should call this as the first action from any
        # public method that could, in theory, block and switch
        # to the hub. This may release the GIL.
        if self.hub is None:
            # This next line might release the GIL.
            current_hub = get_hub() if create else get_hub_if_exists()
            if current_hub is None:
                return
            # We have the GIL again. Did anything change? If so,
            # we lost the race.
            if self.hub is None:
                self.hub = current_hub

    def _check_and_notify(self):
        # If this object is ready to be notified, begin the process.
        if self.ready() and self._links and not self._notifier:
            self._capture_hub(True) # Must create, we need it.
            self._notifier = self.hub.loop.run_callback(self._notify_links, [])

    def _notify_link_list(self, links):
        # The core of the _notify_links method to notify
        # links in order. Lets the ``links`` list be mutated,
        # and only notifies up to the last item in the list, in case
        # objects are added to it.
        only_while_ready = not self._notify_all
        final_link = links[-1]
        done = set() # of ids
        while links: # remember this can be mutated
            if only_while_ready and not self.ready():
                break

            link = links.pop(0) # Cython optimizes using list internals
            id_link = id(link)
            if id_link not in done:
                # XXX: JAM: What was I thinking? This doesn't make much sense,
                # there's a good chance `link` will be deallocated, and its id() will
                # be free to be reused.
                done.add(id_link)
                try:
                    self._drop_lock_for_switch_out()
                    try:
                        link(self)
                    finally:
                        self._acquire_lock_for_switch_in()
                except: # pylint:disable=bare-except
                    # We're running in the hub, errors must not escape.
                    self.hub.handle_error((link, self), *sys.exc_info())

            if link is final_link:
                break

    def _notify_links(self, arrived_while_waiting):
        # This method must hold the GIL, or be guarded with the lock that guards
        # this object. Thus, while we are notifying objects, an object from another
        # thread simply cannot arrive and mutate ``_links`` or ``arrived_while_waiting``

        # ``arrived_while_waiting`` is a list of greenlet.switch methods
        # to call. These were objects that called wait() while we were processing,
        # and which would have run *before* those that had actually waited
        # and blocked. Instead of returning True immediately, we add them to this
        # list so they wait their turn.

        # We release self._notifier here when done invoking links.
        # The object itself becomes false in a boolean way as soon
        # as this method returns.
        notifier = self._notifier
        if notifier is None:
            # XXX: How did we get here?
            self._check_and_notify()
            return
        # Early links are allowed to remove later links, and links
        # are allowed to add more links, thus we must not
        # make a copy of our the ``_links`` list, we must traverse it and
        # mutate in place.
        #
        # We were ready() at the time this callback was scheduled; we
        # may not be anymore, and that status may change during
        # callback processing. Some of our subclasses (Event) will
        # want to notify everyone who was registered when the status
        # became true that it was once true, even though it may not be
        # any more. In that case, we must not keep notifying anyone that's
        # newly added after that, even if we go ready again.
        try:
            self._notify_link_list(self._links)
            # Now, those that arrived after we had begun the notification
            # process. Follow the same rules, stop with those that are
            # added so far to prevent starvation.
            if arrived_while_waiting:
                self._notify_link_list(arrived_while_waiting)

                # Anything left needs to go back on the main list.
                self._links.extend(arrived_while_waiting)
        finally:
            # We should not have created a new notifier even if callbacks
            # released us because we loop through *all* of our links on the
            # same callback while self._notifier is still true.
            assert self._notifier is notifier, (self._notifier, notifier)
            self._notifier = None
            # TODO: Maybe we should intelligently reset self.hub to
            # free up thread affinity? In case of a pathological situation where
            # one object was used from one thread once & first,  but usually is
            # used by another thread.
        # Now we may be ready or not ready. If we're ready, which
        # could have happened during the last link we called, then we
        # must have more links than we started with. We need to schedule the
        # wakeup.
        self._check_and_notify()

    def __unlink_all(self, obj):
        if obj is None:
            return

        self.unlink(obj)
        if self._notifier is not None and self._notifier.args:
            try:
                self._notifier.args[0].remove(obj)
            except ValueError:
                pass

    def __wait_to_be_notified(self, rawlink): # pylint:disable=too-many-branches
        # We've got to watch where we could potentially release the GIL.
        # Decisions we make based an the state of this object must be in blocks
        # that cannot release the GIL.
        resume_this_greenlet = None
        watcher = None
        current_hub = get_hub()
        send = None

        while 1:
            my_hub = self.hub
            if my_hub is current_hub:
                break

            # We're owned by another hub.
            if my_hub.dead: # dead is a property, this could have released the GIL.
                # We have the GIL back. Did anything change?
                if my_hub is not self.hub:
                    continue # start over.
                # The other hub is dead, so we can take ownership.
                self.hub = current_hub
                break
            # Some other hub owns this object. We must ask it to wake us
            # up. We can't use a Python-level ``Lock`` because
            # (1) it doesn't support a timeout on all platforms; and
            # (2) we don't want to block this hub from running. So we need to
            # do so in a way that cooperates with *two* hubs. That's what an
            # async watcher is built for.
            #
            # Allocating and starting the watcher *could* release the GIL.
            # with the libev corcext, allocating won't, but starting briefly will.
            # With other backends, allocating might, and starting might also.
            # So...XXX: Race condition here, tiny though it may be.
            watcher = current_hub.loop.async_()
            send = watcher.send_ignoring_arg
            if rawlink:
                # Make direct calls to self.rawlink, the most common case,
                # so cython can more easily optimize.
                self.rawlink(send)
            else:
                self._notifier.args[0].append(send)

            watcher.start(getcurrent().switch, self) # pylint:disable=undefined-variable
            break

        if self.hub is current_hub:
            resume_this_greenlet = getcurrent().switch # pylint:disable=undefined-variable
            if rawlink:
                self.rawlink(resume_this_greenlet)
            else:
                self._notifier.args[0].append(resume_this_greenlet)
        try:
            self._drop_lock_for_switch_out()
            result = current_hub.switch() # Probably releases
            # If we got here, we were automatically unlinked already.
            resume_this_greenlet = None
            if result is not self: # pragma: no cover
                raise InvalidSwitchError(
                    'Invalid switch into %s.wait(): %r' % (
                        self.__class__.__name__,
                        result,
                    )
                )
        finally:
            self._acquire_lock_for_switch_in()
            self.__unlink_all(resume_this_greenlet)
            self.__unlink_all(send)
            if watcher is not None:
                watcher.stop()
                watcher.close()

    def _acquire_lock_for_switch_in(self):
        return

    def _drop_lock_for_switch_out(self):
        return

    def _wait_core(self, timeout, catch=Timeout):
        """
        The core of the wait implementation, handling switching and
        linking.

        This method is safe to call from multiple threads; it must be holding
        the GIL for the entire duration, or be protected by a Python-level
        lock for that to be true.

        ``self.hub`` must be initialized before entering this method.
        The hub that is set is considered the owner and cannot be changed.

        If *catch* is set to ``()``, a timeout that elapses will be
        allowed to be raised.

        :return: A true value if the wait succeeded without timing out.
          That is, a true return value means we were notified and control
          resumed in this greenlet.
        """
        with Timeout._start_new_or_dummy(timeout) as timer: # Might release
            try:
                self.__wait_to_be_notified(True) # Use rawlink()
                return True
            except catch as ex:
                if ex is not timer:
                    raise
                # test_set_and_clear and test_timeout in test_threading
                # rely on the exact return values, not just truthish-ness
                return False

    def _wait_return_value(self, waited, wait_success):
        # pylint:disable=unused-argument
        # Subclasses should override this to return a value from _wait.
        # By default we return None.
        return None # pragma: no cover all extent subclasses override

    def _wait(self, timeout=None):
        """
        This method is safe to call from multiple threads, providing
        the conditions laid out in the class documentation are met.
        """
        # Watch where we could potentially release the GIL.
        self._capture_hub(True) # Must create, we must have an owner. Might release

        if self.ready(): # *might* release, if overridden in Python.
            result = self._wait_return_value(False, False) # pylint:disable=assignment-from-none
            if self._notifier:
                # We're already notifying waiters; one of them must have run
                # and switched to this greenlet, which arrived here. Alternately,
                # we could be in a separate thread (but we're holding the GIL/object lock)
                self.__wait_to_be_notified(False) # Use self._notifier.args[0] instead of self.rawlink

            return result

        gotit = self._wait_core(timeout)
        return self._wait_return_value(True, gotit)

    def _at_fork_reinit(self):
        """
        This method was added in Python 3.9 and is called by logging.py
        ``_after_at_fork_child_reinit_locks`` on Lock objects.

        It is also called from threading.py, ``_after_fork`` in
        ``_reset_internal_locks``, and that can hit ``Event`` objects.

        Subclasses should reset themselves to an initial state. This
        includes unlocking/releasing, if possible. This method detaches from the
        previous hub and drops any existing notifier.
        """
        self.hub = None
        self._notifier = None

def _init():
    greenlet_init() # pylint:disable=undefined-variable

_init()


from gevent._util import import_c_accel
import_c_accel(globals(), 'gevent.__abstract_linkable')