From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by finch.gentoo.org (Postfix) with ESMTPS id D11751382C5 for ; Sun, 6 May 2018 00:38:51 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id BE5D6E0870; Sun, 6 May 2018 00:38:50 +0000 (UTC) Received: from smtp.gentoo.org (woodpecker.gentoo.org [IPv6:2001:470:ea4a:1:5054:ff:fec7:86e4]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 81597E0870 for ; Sun, 6 May 2018 00:38:50 +0000 (UTC) Received: from oystercatcher.gentoo.org (unknown [IPv6:2a01:4f8:202:4333:225:90ff:fed9:fc84]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id DC3E5335C88 for ; Sun, 6 May 2018 00:38:48 +0000 (UTC) Received: from localhost.localdomain (localhost [IPv6:::1]) by oystercatcher.gentoo.org (Postfix) with ESMTP id 729663E for ; Sun, 6 May 2018 00:38:47 +0000 (UTC) From: "Zac Medico" To: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: 8bit Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "Zac Medico" Message-ID: <1525566944.5a5ed99cb5a6e8913df2e9ca29b4b4d5c179c20f.zmedico@gentoo> Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/ X-VCS-Repository: proj/portage X-VCS-Files: pym/portage/tests/util/futures/test_retry.py X-VCS-Directories: pym/portage/tests/util/futures/ X-VCS-Committer: zmedico X-VCS-Committer-Name: Zac Medico X-VCS-Revision: 5a5ed99cb5a6e8913df2e9ca29b4b4d5c179c20f X-VCS-Branch: master Date: Sun, 6 May 2018 00:38:47 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org X-Archives-Salt: cb75ccaf-47ae-4494-8b3a-061d4ffe383a X-Archives-Hash: 1294c2123d3280e4fede0a873ea874ea commit: 5a5ed99cb5a6e8913df2e9ca29b4b4d5c179c20f Author: Zac Medico gentoo org> AuthorDate: Sat May 5 23:04:10 2018 +0000 Commit: Zac Medico gentoo org> CommitDate: Sun May 6 00:35:44 2018 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=5a5ed99c RetryTestCase: support ThreadPoolExecutor (bug 654390) In order to support the default asyncio event loop's ThreadPoolExecutor, use a threading.Event instance to support cancellation of tasks. Bug: https://bugs.gentoo.org/654390 pym/portage/tests/util/futures/test_retry.py | 96 +++++++++++++++++++++------- 1 file changed, 74 insertions(+), 22 deletions(-) diff --git a/pym/portage/tests/util/futures/test_retry.py b/pym/portage/tests/util/futures/test_retry.py index cdca7d294..781eac9a1 100644 --- a/pym/portage/tests/util/futures/test_retry.py +++ b/pym/portage/tests/util/futures/test_retry.py @@ -1,8 +1,6 @@ # Copyright 2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 -import functools - try: import threading except ImportError: @@ -28,10 +26,17 @@ class SucceedLater(object): self._succeed_time = monotonic() + duration def __call__(self): + loop = global_event_loop() + result = loop.create_future() remaining = self._succeed_time - monotonic() if remaining > 0: - raise SucceedLaterException('time until success: {} seconds'.format(remaining)) - return 'success' + loop.call_soon_threadsafe(lambda: None if result.done() else + result.set_exception(SucceedLaterException( + 'time until success: {} seconds'.format(remaining)))) + else: + loop.call_soon_threadsafe(lambda: None if result.done() else + result.set_result('success')) + return result class SucceedNeverException(Exception): @@ -43,7 +48,11 @@ class SucceedNever(object): A callable object that never succeeds. """ def __call__(self): - raise SucceedNeverException('expected failure') + loop = global_event_loop() + result = loop.create_future() + loop.call_soon_threadsafe(lambda: None if result.done() else + result.set_exception(SucceedNeverException('expected failure'))) + return result class HangForever(object): @@ -51,14 +60,21 @@ class HangForever(object): A callable object that sleeps forever. """ def __call__(self): - threading.Event().wait() + return global_event_loop().create_future() class RetryTestCase(TestCase): + + def _wrap_coroutine_func(self, coroutine_func): + """ + Derived classes may override this method in order to implement + alternative forms of execution. + """ + return coroutine_func + def testSucceedLater(self): loop = global_event_loop() - func = SucceedLater(1) - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedLater(1)) decorator = retry(try_max=9999, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -67,8 +83,7 @@ class RetryTestCase(TestCase): def testSucceedNever(self): loop = global_event_loop() - func = SucceedNever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(try_max=4, try_timeout=None, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -78,8 +93,7 @@ class RetryTestCase(TestCase): def testSucceedNeverReraise(self): loop = global_event_loop() - func = SucceedNever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(reraise=True, try_max=4, try_timeout=None, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -89,8 +103,7 @@ class RetryTestCase(TestCase): def testHangForever(self): loop = global_event_loop() - func = HangForever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(HangForever()) decorator = retry(try_max=2, try_timeout=0.1, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -100,8 +113,7 @@ class RetryTestCase(TestCase): def testHangForeverReraise(self): loop = global_event_loop() - func = HangForever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(HangForever()) decorator = retry(reraise=True, try_max=2, try_timeout=0.1, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -111,8 +123,7 @@ class RetryTestCase(TestCase): def testCancelRetry(self): loop = global_event_loop() - func = SucceedNever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(try_timeout=0.1, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -124,8 +135,7 @@ class RetryTestCase(TestCase): def testOverallTimeoutWithException(self): loop = global_event_loop() - func = SucceedNever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(try_timeout=0.1, overall_timeout=0.3, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -136,11 +146,53 @@ class RetryTestCase(TestCase): def testOverallTimeoutWithTimeoutError(self): loop = global_event_loop() # results in TimeoutError because it hangs forever - func = HangForever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(HangForever()) decorator = retry(try_timeout=0.1, overall_timeout=0.3, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) self.assertEqual(len(done), 1) self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError)) + + +class RetryExecutorTestCase(RetryTestCase): + """ + Wrap each coroutine function with AbstractEventLoop.run_in_executor, + in order to test the event loop's default executor. The executor + may use either a thread or a subprocess, and either case is + automatically detected and handled. + """ + def _wrap_coroutine_func(self, coroutine_func): + parent_loop = global_event_loop() + + # Since ThreadPoolExecutor does not propagate cancellation of a + # parent_future to the underlying coroutine, use kill_switch to + # propagate task cancellation to wrapper, so that HangForever's + # thread returns when retry eventually cancels parent_future. + def wrapper(kill_switch): + loop = global_event_loop() + if loop is parent_loop: + # thread in main process + result = coroutine_func() + event = threading.Event() + loop.call_soon_threadsafe(result.add_done_callback, + lambda result: event.set()) + loop.call_soon_threadsafe(kill_switch.add_done_callback, + lambda kill_switch: event.set()) + event.wait() + return result.result() + else: + # child process + return loop.run_until_complete(coroutine_func()) + + def execute_wrapper(): + kill_switch = parent_loop.create_future() + parent_future = asyncio.ensure_future( + parent_loop.run_in_executor(None, wrapper, kill_switch), + loop=parent_loop) + parent_future.add_done_callback( + lambda parent_future: None if kill_switch.done() + else kill_switch.set_result(None)) + return parent_future + + return execute_wrapper