aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/portage/util/futures/_asyncio/__init__.py43
1 files changed, 38 insertions, 5 deletions
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index 8805e3575..c960d0363 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -26,6 +26,7 @@ __all__ = (
import sys
import types
+import warnings
import weakref
import asyncio as _real_asyncio
@@ -46,6 +47,7 @@ from asyncio import (
)
import threading
+from typing import Optional
import portage
@@ -251,11 +253,35 @@ def _wrap_loop(loop=None):
# The default loop returned by _wrap_loop should be consistent
# with global_event_loop, in order to avoid accidental registration
# of callbacks with a loop that is not intended to run.
- loop = loop or _safe_loop()
- return loop if hasattr(loop, "_asyncio_wrapper") else _AsyncioEventLoop(loop=loop)
+ if hasattr(loop, "_asyncio_wrapper"):
+ return loop
+
+ # This returns a running loop if it exists, and otherwise returns
+ # a loop associated with the current thread.
+ safe_loop = _safe_loop(create=loop is None)
+ if safe_loop is not None and (loop is None or safe_loop._loop is loop):
+ return safe_loop
+
+ if safe_loop is None:
+ msg = f"_wrap_loop argument '{loop}' not associated with thread '{threading.get_ident()}'"
+ else:
+ msg = f"_wrap_loop argument '{loop}' different frome loop '{safe_loop._loop}' already associated with thread '{threading.get_ident()}'"
+
+ if portage._internal_caller:
+ raise AssertionError(msg)
+ # It's not known whether external API consumers will trigger this case,
+ # so if it happens then emit a UserWarning before returning a temporary
+ # AsyncioEventLoop instance.
+ warnings.warn(msg, UserWarning, stacklevel=2)
-def _safe_loop():
+ # We could possibly add a weak reference in _thread_weakrefs.loops when
+ # safe_loop is None, but if safe_loop is not None, then there is a
+ # collision in _thread_weakrefs.loops that would need to be resolved.
+ return _AsyncioEventLoop(loop=loop)
+
+
+def _safe_loop(create: Optional[bool] = True) -> Optional[_AsyncioEventLoop]:
"""
Return an event loop that's safe to use within the current context.
For portage internal callers or external API consumers calling from
@@ -276,8 +302,13 @@ def _safe_loop():
are added to a WeakValueDictionary, and closed via an atexit hook
if they still exist during exit for the current pid.
- @rtype: asyncio.AbstractEventLoop (or compatible)
- @return: event loop instance
+ @type create: bool
+ @param create: Create a loop by default if a loop is not already associated
+ with the current thread. If create is False, then return None if a loop
+ is not already associated with the current thread.
+ @rtype: AsyncioEventLoop or None
+ @return: event loop instance, or None if the create parameter is False and
+ a loop is not already associated with the current thread.
"""
loop = _get_running_loop()
if loop is not None:
@@ -292,6 +323,8 @@ def _safe_loop():
try:
loop = _thread_weakrefs.loops[thread_key]
except KeyError:
+ if not create:
+ return None
try:
try:
_loop = _real_asyncio.get_running_loop()