* [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/, ...
@ 2018-04-12 8:40 Zac Medico
0 siblings, 0 replies; only message in thread
From: Zac Medico @ 2018-04-12 8:40 UTC (permalink / raw)
To: gentoo-commits
commit: a78dca7e47f79ad48aee4909ee10688604996b86
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Apr 11 06:44:41 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Thu Apr 12 08:35:05 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=a78dca7e
Implement AbstractEventLoopPolicy.get_child_watcher() (bug 649588)
Use a _PortageChildWatcher class to wrap portage's internal event loop
and implement asyncio's AbstractChildWatcher interface.
Bug: https://bugs.gentoo.org/649588
.../util/futures/asyncio/test_child_watcher.py | 45 +++++++++++
pym/portage/util/_eventloop/EventLoop.py | 7 +-
pym/portage/util/futures/_asyncio.py | 13 ++++
pym/portage/util/futures/unix_events.py | 90 ++++++++++++++++++++++
4 files changed, 152 insertions(+), 3 deletions(-)
diff --git a/pym/portage/tests/util/futures/asyncio/test_child_watcher.py b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
new file mode 100644
index 000000000..dca01be56
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_child_watcher.py
@@ -0,0 +1,45 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary, spawn
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+class ChildWatcherTestCase(TestCase):
+ def testChildWatcher(self):
+ true_binary = find_binary("true")
+ self.assertNotEqual(true_binary, None)
+
+ initial_policy = asyncio.get_event_loop_policy()
+ if not isinstance(initial_policy, DefaultEventLoopPolicy):
+ asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+ try:
+ try:
+ asyncio.set_child_watcher(None)
+ except NotImplementedError:
+ pass
+ else:
+ self.assertTrue(False)
+
+ args_tuple = ('hello', 'world')
+
+ loop = asyncio.get_event_loop()
+ future = loop.create_future()
+
+ def callback(pid, returncode, *args):
+ future.set_result((pid, returncode, args))
+
+ with asyncio.get_child_watcher() as watcher:
+ pids = spawn([true_binary], returnpid=True)
+ watcher.add_child_handler(pids[0], callback, *args_tuple)
+
+ self.assertEqual(
+ loop.run_until_complete(future),
+ (pids[0], os.EX_OK, args_tuple))
+ finally:
+ asyncio.set_event_loop_policy(initial_policy)
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index d53a76ba1..12c199c76 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -25,7 +25,7 @@ import portage
portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.futures:Future',
'portage.util.futures.executor.fork:ForkExecutor',
- 'portage.util.futures.unix_events:_PortageEventLoop',
+ 'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher',
)
from portage import OrderedDict
@@ -190,6 +190,7 @@ class EventLoop(object):
self._sigchld_src_id = None
self._pid = os.getpid()
self._asyncio_wrapper = _PortageEventLoop(loop=self)
+ self._asyncio_child_watcher = _PortageChildWatcher(self)
def create_future(self):
"""
@@ -424,8 +425,8 @@ class EventLoop(object):
self._sigchld_read, self.IO_IN, self._sigchld_io_cb)
signal.signal(signal.SIGCHLD, self._sigchld_sig_cb)
- # poll now, in case the SIGCHLD has already arrived
- self._poll_child_processes()
+ # poll soon, in case the SIGCHLD has already arrived
+ self.call_soon(self._poll_child_processes)
return source_id
def _sigchld_sig_cb(self, signum, frame):
diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py
index 02ab59999..0f84f14b7 100644
--- a/pym/portage/util/futures/_asyncio.py
+++ b/pym/portage/util/futures/_asyncio.py
@@ -3,7 +3,9 @@
__all__ = (
'ensure_future',
+ 'get_child_watcher',
'get_event_loop',
+ 'set_child_watcher',
'get_event_loop_policy',
'set_event_loop_policy',
'sleep',
@@ -62,6 +64,17 @@ def get_event_loop():
return get_event_loop_policy().get_event_loop()
+def get_child_watcher():
+ """Equivalent to calling get_event_loop_policy().get_child_watcher()."""
+ return get_event_loop_policy().get_child_watcher()
+
+
+def set_child_watcher(watcher):
+ """Equivalent to calling
+ get_event_loop_policy().set_child_watcher(watcher)."""
+ return get_event_loop_policy().set_child_watcher(watcher)
+
+
class Task(Future):
"""
Schedule the execution of a coroutine: wrap it in a future. A task
diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index ed4c6e519..6fcef45fa 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -2,9 +2,17 @@
# Distributed under the terms of the GNU General Public License v2
__all__ = (
+ 'AbstractChildWatcher',
'DefaultEventLoopPolicy',
)
+try:
+ from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
+except ImportError:
+ _AbstractChildWatcher = object
+
+import os
+
from portage.util._eventloop.global_event_loop import (
global_event_loop as _global_event_loop,
)
@@ -68,6 +76,84 @@ class _PortageEventLoop(events.AbstractEventLoop):
return asyncio.Task(coro, loop=self)
+class AbstractChildWatcher(_AbstractChildWatcher):
+ def add_child_handler(self, pid, callback, *args):
+ raise NotImplementedError()
+
+ def remove_child_handler(self, pid):
+ raise NotImplementedError()
+
+ def attach_loop(self, loop):
+ raise NotImplementedError()
+
+ def close(self):
+ raise NotImplementedError()
+
+ def __enter__(self):
+ raise NotImplementedError()
+
+ def __exit__(self, a, b, c):
+ raise NotImplementedError()
+
+
+class _PortageChildWatcher(_AbstractChildWatcher):
+ def __init__(self, loop):
+ """
+ @type loop: EventLoop
+ @param loop: an instance of portage's internal event loop
+ """
+ self._loop = loop
+ self._callbacks = {}
+
+ def close(self):
+ pass
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, a, b, c):
+ pass
+
+ def _child_exit(self, pid, status, data):
+ self._callbacks.pop(pid)
+ callback, args = data
+ callback(pid, self._compute_returncode(status), *args)
+
+ def _compute_returncode(self, status):
+ if os.WIFSIGNALED(status):
+ return -os.WTERMSIG(status)
+ elif os.WIFEXITED(status):
+ return os.WEXITSTATUS(status)
+ else:
+ return status
+
+ def add_child_handler(self, pid, callback, *args):
+ """
+ Register a new child handler.
+
+ Arrange for callback(pid, returncode, *args) to be called when
+ process 'pid' terminates. Specifying another callback for the same
+ process replaces the previous handler.
+ """
+ source_id = self._callbacks.get(pid)
+ if source_id is not None:
+ self._loop.source_remove(source_id)
+ self._callbacks[pid] = self._loop.child_watch_add(
+ pid, self._child_exit, data=(callback, args))
+
+ def remove_child_handler(self, pid):
+ """
+ Removes the handler for process 'pid'.
+
+ The function returns True if the handler was successfully removed,
+ False if there was nothing to remove.
+ """
+ source_id = self._callbacks.pop(pid, None)
+ if source_id is not None:
+ return self._loop.source_remove(source_id)
+ return False
+
+
class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
"""
Implementation of asyncio.AbstractEventLoopPolicy based on portage's
@@ -87,5 +173,9 @@ class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
"""
return _global_event_loop()._asyncio_wrapper
+ def get_child_watcher(self):
+ """Get the watcher for child processes."""
+ return _global_event_loop()._asyncio_child_watcher
+
DefaultEventLoopPolicy = _PortageEventLoopPolicy
^ permalink raw reply related [flat|nested] only message in thread
only message in thread, other threads:[~2018-04-12 8:40 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2018-04-12 8:40 [gentoo-commits] proj/portage:master commit in: pym/portage/util/futures/, pym/portage/tests/util/futures/asyncio/, Zac Medico
This is a public inbox; see the mirroring instructions
for how to clone and mirror all data and code used for this inbox.