public inbox for gentoo-commits@lists.gentoo.org
 help / color / mirror / Atom feed
* [gentoo-commits] proj/portage:master commit in: lib/portage/tests/util/futures/
@ 2021-01-04  8:38 Zac Medico
  0 siblings, 0 replies; 3+ messages in thread
From: Zac Medico @ 2021-01-04  8:38 UTC (permalink / raw
  To: gentoo-commits

commit:     e93549105d5f009e47710db93abea9a7aeb34324
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Jan  4 07:38:55 2021 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Jan  4 08:11:51 2021 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=e9354910

test_retry: use context manager to cancel pending futures

Cancel pending futures in order to avoid this "Task was destroyed but
it is pending!" warning message following migration to PEP 492
coroutines with async and await syntax:

testHangForever (portage.tests.util.futures.test_retry.RetryForkExecutorTestCase) ... ok
testHangForever (portage.tests.util.futures.test_retry.RetryTestCase) ... ok
testHangForever (portage.tests.util.futures.test_retry.RetryThreadExecutorTestCase) ... ok

----------------------------------------------------------------------
Ran 3 tests in 0.839s

OK
Task was destroyed but it is pending!
task: <Task cancelling name='Task-4' coro=<HangForever.__call__() running at portage/tests/util/futures/test_retry.py:58> wait_for=<Future cancelled> cb=[RetryForkExecutorTestCase._wrap_coroutine_func.<locals>.wrapper.<locals>.done_callback() at portage/tests/util/futures/test_retry.py:192]>

Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/util/futures/test_retry.py | 181 ++++++++++++++++-----------
 1 file changed, 106 insertions(+), 75 deletions(-)

diff --git a/lib/portage/tests/util/futures/test_retry.py b/lib/portage/tests/util/futures/test_retry.py
index ce5fb3e11..6648b1b2c 100644
--- a/lib/portage/tests/util/futures/test_retry.py
+++ b/lib/portage/tests/util/futures/test_retry.py
@@ -1,15 +1,18 @@
 # Copyright 2018-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import Future, ThreadPoolExecutor
+import contextlib
 
 try:
 	import threading
 except ImportError:
 	import dummy_threading as threading
 
+import weakref
 import time
 
+import portage
 from portage.tests import TestCase
 from portage.util._eventloop.global_event_loop import global_event_loop
 from portage.util.backoff import RandomExponentialBackoff
@@ -64,99 +67,100 @@ class HangForever:
 	A callable object that sleeps forever.
 	"""
 	def __call__(self):
-		return global_event_loop().create_future()
+		return asyncio.Future()
 
 
 class RetryTestCase(TestCase):
 
+	@contextlib.contextmanager
 	def _wrap_coroutine_func(self, coroutine_func):
 		"""
 		Derived classes may override this method in order to implement
 		alternative forms of execution.
 		"""
-		return coroutine_func
+		yield coroutine_func
 
 	def testSucceedLater(self):
 		loop = global_event_loop()
-		func_coroutine = self._wrap_coroutine_func(SucceedLater(1))
-		decorator = retry(try_max=9999,
-			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
-		decorated_func = decorator(func_coroutine, loop=loop)
-		result = loop.run_until_complete(decorated_func())
-		self.assertEqual(result, 'success')
+		with self._wrap_coroutine_func(SucceedLater(1)) as func_coroutine:
+			decorator = retry(try_max=9999,
+				delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
+			decorated_func = decorator(func_coroutine, loop=loop)
+			result = loop.run_until_complete(decorated_func())
+			self.assertEqual(result, 'success')
 
 	def testSucceedNever(self):
 		loop = global_event_loop()
-		func_coroutine = self._wrap_coroutine_func(SucceedNever())
-		decorator = retry(try_max=4, try_timeout=None,
-			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
-		decorated_func = decorator(func_coroutine, loop=loop)
-		done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
-		self.assertEqual(len(done), 1)
-		self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException))
+		with self._wrap_coroutine_func(SucceedNever()) as func_coroutine:
+			decorator = retry(try_max=4, try_timeout=None,
+				delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
+			decorated_func = decorator(func_coroutine, loop=loop)
+			done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
+			self.assertEqual(len(done), 1)
+			self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException))
 
 	def testSucceedNeverReraise(self):
 		loop = global_event_loop()
-		func_coroutine = self._wrap_coroutine_func(SucceedNever())
-		decorator = retry(reraise=True, try_max=4, try_timeout=None,
-			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
-		decorated_func = decorator(func_coroutine, loop=loop)
-		done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
-		self.assertEqual(len(done), 1)
-		self.assertTrue(isinstance(done.pop().exception(), SucceedNeverException))
+		with self._wrap_coroutine_func(SucceedNever()) as func_coroutine:
+			decorator = retry(reraise=True, try_max=4, try_timeout=None,
+				delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
+			decorated_func = decorator(func_coroutine, loop=loop)
+			done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
+			self.assertEqual(len(done), 1)
+			self.assertTrue(isinstance(done.pop().exception(), SucceedNeverException))
 
 	def testHangForever(self):
 		loop = global_event_loop()
-		func_coroutine = self._wrap_coroutine_func(HangForever())
-		decorator = retry(try_max=2, try_timeout=0.1,
-			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
-		decorated_func = decorator(func_coroutine, loop=loop)
-		done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
-		self.assertEqual(len(done), 1)
-		self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError))
+		with self._wrap_coroutine_func(HangForever()) as func_coroutine:
+			decorator = retry(try_max=2, try_timeout=0.1,
+				delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
+			decorated_func = decorator(func_coroutine, loop=loop)
+			done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
+			self.assertEqual(len(done), 1)
+			self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError))
 
 	def testHangForeverReraise(self):
 		loop = global_event_loop()
-		func_coroutine = self._wrap_coroutine_func(HangForever())
-		decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
-			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
-		decorated_func = decorator(func_coroutine, loop=loop)
-		done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
-		self.assertEqual(len(done), 1)
-		self.assertTrue(isinstance(done.pop().exception(), asyncio.TimeoutError))
+		with self._wrap_coroutine_func(HangForever()) as func_coroutine:
+			decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
+				delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
+			decorated_func = decorator(func_coroutine, loop=loop)
+			done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
+			self.assertEqual(len(done), 1)
+			self.assertTrue(isinstance(done.pop().exception(), asyncio.TimeoutError))
 
 	def testCancelRetry(self):
 		loop = global_event_loop()
-		func_coroutine = self._wrap_coroutine_func(SucceedNever())
-		decorator = retry(try_timeout=0.1,
-			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
-		decorated_func = decorator(func_coroutine, loop=loop)
-		future = decorated_func()
-		loop.call_later(0.3, future.cancel)
-		done, pending = loop.run_until_complete(asyncio.wait([future], loop=loop))
-		self.assertEqual(len(done), 1)
-		self.assertTrue(done.pop().cancelled())
+		with self._wrap_coroutine_func(SucceedNever()) as func_coroutine:
+			decorator = retry(try_timeout=0.1,
+				delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
+			decorated_func = decorator(func_coroutine, loop=loop)
+			future = decorated_func()
+			loop.call_later(0.3, future.cancel)
+			done, pending = loop.run_until_complete(asyncio.wait([future], loop=loop))
+			self.assertEqual(len(done), 1)
+			self.assertTrue(done.pop().cancelled())
 
 	def testOverallTimeoutWithException(self):
 		loop = global_event_loop()
-		func_coroutine = self._wrap_coroutine_func(SucceedNever())
-		decorator = retry(try_timeout=0.1, overall_timeout=0.3,
-			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
-		decorated_func = decorator(func_coroutine, loop=loop)
-		done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
-		self.assertEqual(len(done), 1)
-		self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException))
+		with self._wrap_coroutine_func(SucceedNever()) as func_coroutine:
+			decorator = retry(try_timeout=0.1, overall_timeout=0.3,
+				delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
+			decorated_func = decorator(func_coroutine, loop=loop)
+			done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
+			self.assertEqual(len(done), 1)
+			self.assertTrue(isinstance(done.pop().exception().__cause__, SucceedNeverException))
 
 	def testOverallTimeoutWithTimeoutError(self):
 		loop = global_event_loop()
 		# results in TimeoutError because it hangs forever
-		func_coroutine = self._wrap_coroutine_func(HangForever())
-		decorator = retry(try_timeout=0.1, overall_timeout=0.3,
-			delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
-		decorated_func = decorator(func_coroutine, loop=loop)
-		done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
-		self.assertEqual(len(done), 1)
-		self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError))
+		with self._wrap_coroutine_func(HangForever()) as func_coroutine:
+			decorator = retry(try_timeout=0.1, overall_timeout=0.3,
+				delay_func=RandomExponentialBackoff(multiplier=0.1, base=2))
+			decorated_func = decorator(func_coroutine, loop=loop)
+			done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
+			self.assertEqual(len(done), 1)
+			self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError))
 
 
 class RetryForkExecutorTestCase(RetryTestCase):
@@ -184,43 +188,70 @@ class RetryForkExecutorTestCase(RetryTestCase):
 	def tearDown(self):
 		self._tearDownExecutor()
 
+	@contextlib.contextmanager
 	def _wrap_coroutine_func(self, coroutine_func):
 		parent_loop = global_event_loop()
+		parent_pid = portage.getpid()
+		pending = weakref.WeakValueDictionary()
 
 		# Since ThreadPoolExecutor does not propagate cancellation of a
 		# parent_future to the underlying coroutine, use kill_switch to
 		# propagate task cancellation to wrapper, so that HangForever's
 		# thread returns when retry eventually cancels parent_future.
 		def wrapper(kill_switch):
-			loop = global_event_loop()
-			if loop is parent_loop:
+			if portage.getpid() == parent_pid:
 				# thread in main process
-				result = coroutine_func()
-				event = threading.Event()
-				loop.call_soon_threadsafe(result.add_done_callback,
-					lambda result: event.set())
-				loop.call_soon_threadsafe(kill_switch.add_done_callback,
-					lambda kill_switch: event.set())
-				event.wait()
-				return result.result()
+				def done_callback(result):
+					result.cancelled() or result.exception() or result.result()
+					kill_switch.set()
+				def start_coroutine(future):
+					result = asyncio.ensure_future(coroutine_func(), loop=parent_loop)
+					pending[id(result)] = result
+					result.add_done_callback(done_callback)
+					future.set_result(result)
+				future = Future()
+				parent_loop.call_soon_threadsafe(start_coroutine, future)
+				kill_switch.wait()
+				if not future.done():
+					future.cancel()
+					raise asyncio.CancelledError
+				elif not future.result().done():
+					future.result().cancel()
+					raise asyncio.CancelledError
+				else:
+					return future.result().result()
 
 			# child process
+			loop = global_event_loop()
 			try:
 				return loop.run_until_complete(coroutine_func())
 			finally:
 				loop.close()
 
 		def execute_wrapper():
-			kill_switch = parent_loop.create_future()
+			kill_switch = threading.Event()
 			parent_future = asyncio.ensure_future(
 				parent_loop.run_in_executor(self._executor, wrapper, kill_switch),
 				loop=parent_loop)
-			parent_future.add_done_callback(
-				lambda parent_future: None if kill_switch.done()
-				else kill_switch.set_result(None))
+			def kill_callback(parent_future):
+				if not kill_switch.is_set():
+					kill_switch.set()
+			parent_future.add_done_callback(kill_callback)
 			return parent_future
 
-		return execute_wrapper
+		try:
+			yield execute_wrapper
+		finally:
+			while True:
+				try:
+					_, future = pending.popitem()
+				except KeyError:
+					break
+				try:
+					parent_loop.run_until_complete(future)
+				except (Exception, asyncio.CancelledError):
+					pass
+				future.cancelled() or future.exception() or future.result()
 
 
 class RetryThreadExecutorTestCase(RetryForkExecutorTestCase):


^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [gentoo-commits] proj/portage:master commit in: lib/portage/tests/util/futures/
@ 2021-01-18 12:20 Zac Medico
  0 siblings, 0 replies; 3+ messages in thread
From: Zac Medico @ 2021-01-18 12:20 UTC (permalink / raw
  To: gentoo-commits

commit:     be0b5d33ee8b355dcc9932b52bb1e025e6bd58d4
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Jan 18 11:16:53 2021 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Jan 18 11:31:57 2021 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=be0b5d33

RetryTestCase: Use async and await syntax

Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/util/futures/test_retry.py | 28 +++++++++-------------------
 1 file changed, 9 insertions(+), 19 deletions(-)

diff --git a/lib/portage/tests/util/futures/test_retry.py b/lib/portage/tests/util/futures/test_retry.py
index 6648b1b2c..bce48a693 100644
--- a/lib/portage/tests/util/futures/test_retry.py
+++ b/lib/portage/tests/util/futures/test_retry.py
@@ -1,4 +1,4 @@
-# Copyright 2018-2020 Gentoo Authors
+# Copyright 2018-2021 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 from concurrent.futures import Future, ThreadPoolExecutor
@@ -32,18 +32,11 @@ class SucceedLater:
 	def __init__(self, duration):
 		self._succeed_time = time.monotonic() + duration
 
-	def __call__(self):
-		loop = global_event_loop()
-		result = loop.create_future()
+	async def __call__(self):
 		remaining = self._succeed_time - time.monotonic()
 		if remaining > 0:
-			loop.call_soon_threadsafe(lambda: None if result.done() else
-				result.set_exception(SucceedLaterException(
-				'time until success: {} seconds'.format(remaining))))
-		else:
-			loop.call_soon_threadsafe(lambda: None if result.done() else
-				result.set_result('success'))
-		return result
+			await asyncio.sleep(remaining)
+		return 'success'
 
 
 class SucceedNeverException(Exception):
@@ -54,20 +47,17 @@ class SucceedNever:
 	"""
 	A callable object that never succeeds.
 	"""
-	def __call__(self):
-		loop = global_event_loop()
-		result = loop.create_future()
-		loop.call_soon_threadsafe(lambda: None if result.done() else
-			result.set_exception(SucceedNeverException('expected failure')))
-		return result
+	async def __call__(self):
+		raise SucceedNeverException('expected failure')
 
 
 class HangForever:
 	"""
 	A callable object that sleeps forever.
 	"""
-	def __call__(self):
-		return asyncio.Future()
+	async def __call__(self):
+		while True:
+			await asyncio.sleep(9)
 
 
 class RetryTestCase(TestCase):


^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [gentoo-commits] proj/portage:master commit in: lib/portage/tests/util/futures/
@ 2022-11-26 23:25 Zac Medico
  0 siblings, 0 replies; 3+ messages in thread
From: Zac Medico @ 2022-11-26 23:25 UTC (permalink / raw
  To: gentoo-commits

commit:     98536f208194197c521675e0d0072bdc599e015a
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Nov 25 21:21:30 2022 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Nov 25 23:40:38 2022 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=98536f20

testOverallTimeoutWithException: handle TimeoutError

Bug: https://bugs.gentoo.org/850127
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/util/futures/test_retry.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/lib/portage/tests/util/futures/test_retry.py b/lib/portage/tests/util/futures/test_retry.py
index 8ea832136..cea3e83f5 100644
--- a/lib/portage/tests/util/futures/test_retry.py
+++ b/lib/portage/tests/util/futures/test_retry.py
@@ -176,8 +176,13 @@ class RetryTestCase(TestCase):
                 asyncio.wait([decorated_func()], loop=loop)
             )
             self.assertEqual(len(done), 1)
+            cause = done.pop().exception().__cause__
             self.assertTrue(
-                isinstance(done.pop().exception().__cause__, SucceedNeverException)
+                isinstance(
+                    cause,
+                    (asyncio.TimeoutError, SucceedNeverException),
+                ),
+                msg=f"Cause was {cause.__class__.__name__}",
             )
 
     def testOverallTimeoutWithTimeoutError(self):


^ permalink raw reply related	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2022-11-26 23:25 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz follow: Atom feed
-- links below jump to the message on this page --
2021-01-04  8:38 [gentoo-commits] proj/portage:master commit in: lib/portage/tests/util/futures/ Zac Medico
  -- strict thread matches above, loose matches on Subject: below --
2021-01-18 12:20 Zac Medico
2022-11-26 23:25 Zac Medico

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox