aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2024-08-08 07:58:31 -0700
committerZac Medico <zmedico@gentoo.org>2024-08-08 07:58:31 -0700
commitd6710ee0cdab2a212ff70503f9699f1be4660bb4 (patch)
tree392712bc816757b10d6ef8c3451402ba45f7ba71 /lib
parentdoebuild.spawn: Skip socks5 proxy for "depend" phase (diff)
downloadportage-d6710ee0cdab2a212ff70503f9699f1be4660bb4.tar.gz
portage-d6710ee0cdab2a212ff70503f9699f1be4660bb4.tar.bz2
portage-d6710ee0cdab2a212ff70503f9699f1be4660bb4.zip
run_exitfuncs: Support loop close via hook
Handle the case where the loop has not been explicitly closed before exit. In this case, run_exitfuncs previously tried to call coroutine functions as though they were normal functions, which obvously would not behave correctly. Solve this problem by storing the coroutine functions in a separate _coroutine_exithandlers list that belongs exclusively to the run_coroutine_exitfuncs function, so that it is safe to close the loop and call run_coroutine_exitfuncs from inside a run_exitfuncs hook. A _thread_weakrefs_atexit hook already exists that will close weakly referenced loops. The _thread_weakrefs_atexit hook is now fixed to release its lock when it closes a loop, since the same lock may need to be re-acquired when run_coroutine_exitfuncs runs. The included test case demonstrates that run_exitfuncs will run via an atexit hook and correctly terminate the socks5 proxy in a standalone program using the portage API (like eclean). Due to a deadlock that will occur if an _exit_function atexit hook from the multiprocessing module executes before run_exitfuncs, a portage.process._atexit_register_run_exitfuncs() function needs to be called in order to re-order the hooks after the first process has been started via the multiprocessing module. The natural place to call this is in the ForkProcess class, using a global variable to trigger the call just once. Fixes: c3ebdbb42e72 ("elog/mod_custom: Spawn processes in background") Bug: https://bugs.gentoo.org/937384 Signed-off-by: Zac Medico <zmedico@gentoo.org>
Diffstat (limited to 'lib')
-rw-r--r--lib/portage/process.py37
-rw-r--r--lib/portage/tests/__init__.py19
-rw-r--r--lib/portage/tests/util/test_socks5.py66
-rw-r--r--lib/portage/util/_async/ForkProcess.py8
-rw-r--r--lib/portage/util/futures/_asyncio/__init__.py24
5 files changed, 133 insertions, 21 deletions
diff --git a/lib/portage/process.py b/lib/portage/process.py
index 6e4e0d716..23e2507b5 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -3,6 +3,7 @@
# Distributed under the terms of the GNU General Public License v2
+import asyncio as _asyncio
import atexit
import errno
import fcntl
@@ -193,6 +194,7 @@ def spawn_fakeroot(mycommand, fakeroot_state=None, opt_name=None, **keywords):
_exithandlers = []
+_coroutine_exithandlers = []
def atexit_register(func, *args, **kargs):
@@ -200,7 +202,12 @@ def atexit_register(func, *args, **kargs):
what is registered. For example, when portage restarts itself via
os.execv, the atexit module does not work so we have to do it
manually by calling the run_exitfuncs() function in this module."""
- _exithandlers.append((func, args, kargs))
+ # The internal asyncio wrapper module would trigger a circular import
+ # if used here.
+ if _asyncio.iscoroutinefunction(func):
+ _coroutine_exithandlers.append((func, args, kargs))
+ else:
+ _exithandlers.append((func, args, kargs))
def run_exitfuncs():
@@ -232,12 +239,16 @@ async def run_coroutine_exitfuncs():
This is the same as run_exitfuncs but it uses asyncio.iscoroutinefunction
to check which functions to run. It is called by the AsyncioEventLoop
_close_main method just before the loop is closed.
+
+ If the loop is explicitly closed before exit, then that will cause
+ run_coroutine_exitfuncs to run before run_exitfuncs. Otherwise, a
+ run_exitfuncs hook will close it, causing run_coroutine_exitfuncs to be
+ called via run_exitfuncs.
"""
tasks = []
- for index, (func, targs, kargs) in reversed(list(enumerate(_exithandlers))):
- if asyncio.iscoroutinefunction(func):
- del _exithandlers[index]
- tasks.append(asyncio.ensure_future(func(*targs, **kargs)))
+ while _coroutine_exithandlers:
+ func, targs, kargs = _coroutine_exithandlers.pop()
+ tasks.append(asyncio.ensure_future(func(*targs, **kargs)))
tracebacks = []
exc_info = None
for task in tasks:
@@ -255,7 +266,21 @@ async def run_coroutine_exitfuncs():
raise exc_info[1].with_traceback(exc_info[2])
-atexit.register(run_exitfuncs)
+def _atexit_register_run_exitfuncs():
+ """
+ Register the run_exitfuncs atexit hook. If this hook is not called
+ before the multiprocessing module's _exit_function, then there will
+ be a deadlock. In order to prevent the deadlock, this function must
+ be called in order to re-order the hooks after the first process has
+ been started via the multiprocessing module. The natural place to
+ call this is in the ForkProcess class, though it should also be
+ called once before, in case the ForkProcess class is never called.
+ """
+ atexit.unregister(run_exitfuncs)
+ atexit.register(run_exitfuncs)
+
+
+_atexit_register_run_exitfuncs()
# It used to be necessary for API consumers to remove pids from spawned_pids,
# since otherwise it would accumulate a pids endlessly. Now, spawned_pids is
diff --git a/lib/portage/tests/__init__.py b/lib/portage/tests/__init__.py
index 23dd366d8..79373dfbb 100644
--- a/lib/portage/tests/__init__.py
+++ b/lib/portage/tests/__init__.py
@@ -16,6 +16,7 @@ from portage import os
from portage.util import no_color
from portage import _encodings
from portage import _unicode_decode
+from portage.const import PORTAGE_PYM_PATH
from portage.output import colorize
from portage.proxy.objectproxy import ObjectProxy
@@ -65,6 +66,24 @@ def cnf_sbindir():
return os.path.join(portage.const.EPREFIX or "/", "usr", "sbin")
+def get_pythonpath():
+ """
+ Prefix current PYTHONPATH with PORTAGE_PYM_PATH, and normalize.
+ """
+ pythonpath = os.environ.get("PYTHONPATH")
+ if pythonpath is not None and not pythonpath.strip():
+ pythonpath = None
+ if pythonpath is not None and pythonpath.split(":")[0] == PORTAGE_PYM_PATH:
+ pass
+ else:
+ if pythonpath is None:
+ pythonpath = ""
+ else:
+ pythonpath = ":" + pythonpath
+ pythonpath = PORTAGE_PYM_PATH + pythonpath
+ return pythonpath
+
+
class TestCase(unittest.TestCase):
"""
We need a way to mark a unit test as "ok to fail"
diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py
index e7bc2d699..35f919d97 100644
--- a/lib/portage/tests/util/test_socks5.py
+++ b/lib/portage/tests/util/test_socks5.py
@@ -3,14 +3,16 @@
import asyncio
import functools
+import os
import shutil
import socket
import struct
+import subprocess
import tempfile
import time
import portage
-from portage.tests import TestCase
+from portage.tests import TestCase, get_pythonpath
from portage.util import socks5
from portage.util.futures.executor.fork import ForkExecutor
from portage.util._eventloop.global_event_loop import global_event_loop
@@ -199,10 +201,10 @@ class Socks5ServerTestCase(TestCase):
path = "/index.html"
proxy = None
tempdir = tempfile.mkdtemp()
- previous_exithandlers = portage.process._exithandlers
+ previous_exithandlers = portage.process._coroutine_exithandlers
try:
- portage.process._exithandlers = []
+ portage.process._coroutine_exithandlers = []
with AsyncHTTPServer(host, {path: content}, loop) as server:
settings = {
"PORTAGE_TMPDIR": tempdir,
@@ -225,11 +227,11 @@ class Socks5ServerTestCase(TestCase):
finally:
try:
# Also run_coroutine_exitfuncs to test atexit hook cleanup.
- self.assertNotEqual(portage.process._exithandlers, [])
+ self.assertNotEqual(portage.process._coroutine_exithandlers, [])
await portage.process.run_coroutine_exitfuncs()
- self.assertEqual(portage.process._exithandlers, [])
+ self.assertEqual(portage.process._coroutine_exithandlers, [])
finally:
- portage.process._exithandlers = previous_exithandlers
+ portage.process._coroutine_exithandlers = previous_exithandlers
shutil.rmtree(tempdir)
@@ -269,3 +271,55 @@ class Socks5ServerLoopCloseTestCase(TestCase):
shutil.rmtree(tempdir)
return not socks5.proxy.is_running()
+
+
+class Socks5ServerAtExitTestCase(TestCase):
+ """
+ For bug 937384, test that the socks5 proxy is automatically
+ terminated by portage.process.run_exitfuncs(), using a subprocess
+ for isolation.
+
+ Note that if the subprocess is created via fork then it will be
+ vulnerable to python issue 83856 which is only fixed in python3.13,
+ so this test uses python -c to ensure that atexit hooks will work.
+ """
+
+ def testSocks5ServerAtExit(self):
+ tempdir = tempfile.mkdtemp()
+ try:
+ env = os.environ.copy()
+ env["PYTHONPATH"] = get_pythonpath()
+ output = subprocess.check_output(
+ [
+ portage._python_interpreter,
+ "-c",
+ """
+import sys
+
+from portage.const import PORTAGE_BIN_PATH
+from portage.util import socks5
+from portage.util._eventloop.global_event_loop import global_event_loop
+
+tempdir = sys.argv[0]
+loop = global_event_loop()
+
+settings = {
+ "PORTAGE_TMPDIR": tempdir,
+ "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH,
+}
+
+socks5.get_socks5_proxy(settings)
+loop.run_until_complete(socks5.proxy.ready())
+print(socks5.proxy._proc.pid, flush=True)
+""",
+ tempdir,
+ ],
+ env=env,
+ )
+
+ pid = int(output.strip())
+
+ with self.assertRaises(ProcessLookupError):
+ os.kill(pid, 0)
+ finally:
+ shutil.rmtree(tempdir)
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index e6cfdefb8..946978b30 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -15,6 +15,8 @@ from portage.cache.mappings import slot_dict_class
from portage.util.futures import asyncio
from _emerge.SpawnProcess import SpawnProcess
+_registered_run_exitfuncs = None
+
class ForkProcess(SpawnProcess):
# NOTE: This class overrides the meaning of the SpawnProcess 'args'
@@ -206,6 +208,8 @@ class ForkProcess(SpawnProcess):
promoting a healthy state for the forked interpreter.
"""
+ global _registered_run_exitfuncs
+
if self.__class__._run is ForkProcess._run:
# target replaces the deprecated self._run method
target = self.target
@@ -252,6 +256,10 @@ class ForkProcess(SpawnProcess):
if stdin_dup is not None:
os.close(stdin_dup)
+ if _registered_run_exitfuncs != portage.getpid():
+ portage.process._atexit_register_run_exitfuncs()
+ _registered_run_exitfuncs = portage.getpid()
+
return portage.process.MultiprocessingProcess(proc)
def _cancel(self):
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index e377a9cdd..8942bcb67 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -346,15 +346,21 @@ def _get_running_loop():
def _thread_weakrefs_atexit():
- with _thread_weakrefs.lock:
- if _thread_weakrefs.pid == portage.getpid():
- while True:
- try:
- thread_key, loop = _thread_weakrefs.loops.popitem()
- except KeyError:
- break
- else:
- loop.close()
+ while True:
+ loop = None
+ with _thread_weakrefs.lock:
+ if _thread_weakrefs.pid != portage.getpid():
+ return
+
+ try:
+ thread_key, loop = _thread_weakrefs.loops.popitem()
+ except KeyError:
+ return
+
+ # Release the lock while closing the loop, since it may call
+ # run_coroutine_exitfuncs interally.
+ if loop is not None:
+ loop.close()
_thread_weakrefs = types.SimpleNamespace(