public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/
@ 2018-04-22 16:25 Zac Medico
  0 siblings, 0 replies; 3+ messages in thread
From: Zac Medico @ 2018-04-22 16:25 UTC (permalink / raw
  To: gentoo-commits

commit:     a6e9c7cf429741015e26b923c8036416cc6bff7d
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Apr 22 16:19:27 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Apr 22 16:24:37 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=a6e9c7cf

test_iter_completed: fix SleepProcess._future_done cancel race

Fixes: a9e8ebaa6979 ("Add async_iter_completed for asyncio migration (bug 591760)")

 pym/portage/tests/util/futures/test_iter_completed.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
index 1344523c6..b07146ed3 100644
--- a/pym/portage/tests/util/futures/test_iter_completed.py
+++ b/pym/portage/tests/util/futures/test_iter_completed.py
@@ -19,7 +19,8 @@ class SleepProcess(ForkProcess):
 		ForkProcess._start(self)
 
 	def _future_done(self, task):
-		self.future.set_result(self.seconds)
+		if not self.future.cancelled():
+			self.future.set_result(self.seconds)
 
 	def _run(self):
 		time.sleep(self.seconds)


^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/
@ 2018-05-06  0:38 Zac Medico
  0 siblings, 0 replies; 3+ messages in thread
From: Zac Medico @ 2018-05-06  0:38 UTC (permalink / raw
  To: gentoo-commits

commit:     5a5ed99cb5a6e8913df2e9ca29b4b4d5c179c20f
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat May  5 23:04:10 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun May  6 00:35:44 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=5a5ed99c

RetryTestCase: support ThreadPoolExecutor (bug 654390)

In order to support the default asyncio event loop's
ThreadPoolExecutor, use a threading.Event instance to
support cancellation of tasks.

Bug: https://bugs.gentoo.org/654390

 pym/portage/tests/util/futures/test_retry.py | 96 +++++++++++++++++++++-------
 1 file changed, 74 insertions(+), 22 deletions(-)

diff --git a/pym/portage/tests/util/futures/test_retry.py b/pym/portage/tests/util/futures/test_retry.py
index cdca7d294..781eac9a1 100644
--- a/pym/portage/tests/util/futures/test_retry.py
+++ b/pym/portage/tests/util/futures/test_retry.py
@@ -1,8 +1,6 @@
 # Copyright 2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
-import functools
-
 try:
 	import threading
 except ImportError:
@@ -28,10 +26,17 @@ class SucceedLater(object):
 		self._succeed_time = monotonic() + duration
 
 	def __call__(self):
+		loop = global_event_loop()
+		result = loop.create_future()
 		remaining = self._succeed_time - monotonic()
 		if remaining > 0:
-			raise SucceedLaterException('time until success: {} seconds'.format(remaining))
-		return 'success'
+			loop.call_soon_threadsafe(lambda: None if result.done() else
+				result.set_exception(SucceedLaterException(
+				'time until success: {} seconds'.format(remaining))))
+		else:
+			loop.call_soon_threadsafe(lambda: None if result.done() else
+				result.set_result('success'))
+		return result
 
 
 class SucceedNeverException(Exception):
@@ -43,7 +48,11 @@ class SucceedNever(object):
 	A callable object that never succeeds.
 	"""
 	def __call__(self):
-		raise SucceedNeverException('expected failure')
+		loop = global_event_loop()
+		result = loop.create_future()
+		loop.call_soon_threadsafe(lambda: None if result.done() else
+			result.set_exception(SucceedNeverException('expected failure')))
+		return result
 
 
 class HangForever(object):
@@ -51,14 +60,21 @@ class HangForever(object):
 	A callable object that sleeps forever.
 	"""
 	def __call__(self):
-		threading.Event().wait()
+		return global_event_loop().create_future()
 
 
 class RetryTestCase(TestCase):
+
+	def _wrap_coroutine_func(self, coroutine_func):
+		"""
+		Derived classes may override this method in order to implement
+		alternative forms of execution.
+		"""
+		return coroutine_func
+
 	def testSucceedLater(self):
 		loop = global_event_loop()
-		func = SucceedLater(1)
-		func_coroutine = functools.partial(loop.run_in_executor, None, func)
+		func_coroutine = self._wrap_coroutine_func(SucceedLater(1))
 		decorator = retry(try_max=9999,
 			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
 		decorated_func = decorator(func_coroutine)
@@ -67,8 +83,7 @@ class RetryTestCase(TestCase):
 
 	def testSucceedNever(self):
 		loop = global_event_loop()
-		func = SucceedNever()
-		func_coroutine = functools.partial(loop.run_in_executor, None, func)
+		func_coroutine = self._wrap_coroutine_func(SucceedNever())
 		decorator = retry(try_max=4, try_timeout=None,
 			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
 		decorated_func = decorator(func_coroutine)
@@ -78,8 +93,7 @@ class RetryTestCase(TestCase):
 
 	def testSucceedNeverReraise(self):
 		loop = global_event_loop()
-		func = SucceedNever()
-		func_coroutine = functools.partial(loop.run_in_executor, None, func)
+		func_coroutine = self._wrap_coroutine_func(SucceedNever())
 		decorator = retry(reraise=True, try_max=4, try_timeout=None,
 			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
 		decorated_func = decorator(func_coroutine)
@@ -89,8 +103,7 @@ class RetryTestCase(TestCase):
 
 	def testHangForever(self):
 		loop = global_event_loop()
-		func = HangForever()
-		func_coroutine = functools.partial(loop.run_in_executor, None, func)
+		func_coroutine = self._wrap_coroutine_func(HangForever())
 		decorator = retry(try_max=2, try_timeout=0.1,
 			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
 		decorated_func = decorator(func_coroutine)
@@ -100,8 +113,7 @@ class RetryTestCase(TestCase):
 
 	def testHangForeverReraise(self):
 		loop = global_event_loop()
-		func = HangForever()
-		func_coroutine = functools.partial(loop.run_in_executor, None, func)
+		func_coroutine = self._wrap_coroutine_func(HangForever())
 		decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
 			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
 		decorated_func = decorator(func_coroutine)
@@ -111,8 +123,7 @@ class RetryTestCase(TestCase):
 
 	def testCancelRetry(self):
 		loop = global_event_loop()
-		func = SucceedNever()
-		func_coroutine = functools.partial(loop.run_in_executor, None, func)
+		func_coroutine = self._wrap_coroutine_func(SucceedNever())
 		decorator = retry(try_timeout=0.1,
 			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
 		decorated_func = decorator(func_coroutine)
@@ -124,8 +135,7 @@ class RetryTestCase(TestCase):
 
 	def testOverallTimeoutWithException(self):
 		loop = global_event_loop()
-		func = SucceedNever()
-		func_coroutine = functools.partial(loop.run_in_executor, None, func)
+		func_coroutine = self._wrap_coroutine_func(SucceedNever())
 		decorator = retry(try_timeout=0.1, overall_timeout=0.3,
 			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
 		decorated_func = decorator(func_coroutine)
@@ -136,11 +146,53 @@ class RetryTestCase(TestCase):
 	def testOverallTimeoutWithTimeoutError(self):
 		loop = global_event_loop()
 		# results in TimeoutError because it hangs forever
-		func = HangForever()
-		func_coroutine = functools.partial(loop.run_in_executor, None, func)
+		func_coroutine = self._wrap_coroutine_func(HangForever())
 		decorator = retry(try_timeout=0.1, overall_timeout=0.3,
 			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
 		decorated_func = decorator(func_coroutine)
 		done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
 		self.assertEqual(len(done), 1)
 		self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError))
+
+
+class RetryExecutorTestCase(RetryTestCase):
+	"""
+	Wrap each coroutine function with AbstractEventLoop.run_in_executor,
+	in order to test the event loop's default executor. The executor
+	may use either a thread or a subprocess, and either case is
+	automatically detected and handled.
+	"""
+	def _wrap_coroutine_func(self, coroutine_func):
+		parent_loop = global_event_loop()
+
+		# Since ThreadPoolExecutor does not propagate cancellation of a
+		# parent_future to the underlying coroutine, use kill_switch to
+		# propagate task cancellation to wrapper, so that HangForever's
+		# thread returns when retry eventually cancels parent_future.
+		def wrapper(kill_switch):
+			loop = global_event_loop()
+			if loop is parent_loop:
+				# thread in main process
+				result = coroutine_func()
+				event = threading.Event()
+				loop.call_soon_threadsafe(result.add_done_callback,
+					lambda result: event.set())
+				loop.call_soon_threadsafe(kill_switch.add_done_callback,
+					lambda kill_switch: event.set())
+				event.wait()
+				return result.result()
+			else:
+				# child process
+				return loop.run_until_complete(coroutine_func())
+
+		def execute_wrapper():
+			kill_switch = parent_loop.create_future()
+			parent_future = asyncio.ensure_future(
+				parent_loop.run_in_executor(None, wrapper, kill_switch),
+				loop=parent_loop)
+			parent_future.add_done_callback(
+				lambda parent_future: None if kill_switch.done()
+				else kill_switch.set_result(None))
+			return parent_future
+
+		return execute_wrapper


^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/
@ 2018-05-07  0:27 Zac Medico
  0 siblings, 0 replies; 3+ messages in thread
From: Zac Medico @ 2018-05-07  0:27 UTC (permalink / raw
  To: gentoo-commits

commit:     fefc6f1feb57503ce4826de1e405c12ef761c0ce
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun May  6 23:41:15 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon May  7 00:19:54 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=fefc6f1f

RetryTestCase: test ThreadPoolExecutor and ForkExecutor (bug 654390)

ThreadPoolExecutor is the default asyncio event loop's default executor,
so explicitly test it. Also explicitly test ForkExecutor, since it is
more useful in cases where tasks may need to be forcefully cancelled.

Bug: https://bugs.gentoo.org/654390

 pym/portage/tests/util/futures/test_retry.py | 37 ++++++++++++++++++++++++++--
 1 file changed, 35 insertions(+), 2 deletions(-)

diff --git a/pym/portage/tests/util/futures/test_retry.py b/pym/portage/tests/util/futures/test_retry.py
index baf293d56..16ecccbc7 100644
--- a/pym/portage/tests/util/futures/test_retry.py
+++ b/pym/portage/tests/util/futures/test_retry.py
@@ -1,16 +1,24 @@
 # Copyright 2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+try:
+	from concurrent.futures import ThreadPoolExecutor
+except ImportError:
+	ThreadPoolExecutor = None
+
 try:
 	import threading
 except ImportError:
 	import dummy_threading as threading
 
+import sys
+
 from portage.tests import TestCase
 from portage.util._eventloop.global_event_loop import global_event_loop
 from portage.util.backoff import RandomExponentialBackoff
 from portage.util.futures import asyncio
 from portage.util.futures.retry import retry
+from portage.util.futures.executor.fork import ForkExecutor
 from portage.util.monotonic import monotonic
 
 
@@ -155,13 +163,31 @@ class RetryTestCase(TestCase):
 		self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError))
 
 
-class RetryExecutorTestCase(RetryTestCase):
+class RetryForkExecutorTestCase(RetryTestCase):
 	"""
 	Wrap each coroutine function with AbstractEventLoop.run_in_executor,
 	in order to test the event loop's default executor. The executor
 	may use either a thread or a subprocess, and either case is
 	automatically detected and handled.
 	"""
+	def __init__(self, *pargs, **kwargs):
+		super(RetryForkExecutorTestCase, self).__init__(*pargs, **kwargs)
+		self._executor = None
+
+	def _setUpExecutor(self):
+		self._executor = ForkExecutor()
+
+	def _tearDownExecutor(self):
+		if self._executor is not None:
+			self._executor.shutdown(wait=True)
+			self._executor = None
+
+	def setUp(self):
+		self._setUpExecutor()
+
+	def tearDown(self):
+		self._tearDownExecutor()
+
 	def _wrap_coroutine_func(self, coroutine_func):
 		parent_loop = global_event_loop()
 
@@ -191,7 +217,7 @@ class RetryExecutorTestCase(RetryTestCase):
 		def execute_wrapper():
 			kill_switch = parent_loop.create_future()
 			parent_future = asyncio.ensure_future(
-				parent_loop.run_in_executor(None, wrapper, kill_switch),
+				parent_loop.run_in_executor(self._executor, wrapper, kill_switch),
 				loop=parent_loop)
 			parent_future.add_done_callback(
 				lambda parent_future: None if kill_switch.done()
@@ -199,3 +225,10 @@ class RetryExecutorTestCase(RetryTestCase):
 			return parent_future
 
 		return execute_wrapper
+
+
+class RetryThreadExecutorTestCase(RetryForkExecutorTestCase):
+	def _setUpExecutor(self):
+		if sys.version_info.major < 3:
+			self.skipTest('ThreadPoolExecutor not supported for python2')
+		self._executor = ThreadPoolExecutor(max_workers=1)


^ permalink raw reply related	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2018-05-07  0:27 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2018-05-07  0:27 [gentoo-commits] proj/portage:master commit in: pym/portage/tests/util/futures/ Zac Medico
  -- strict thread matches above, loose matches on Subject: below --
2018-05-06  0:38 Zac Medico
2018-04-22 16:25 Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox