From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from lists.gentoo.org (pigeon.gentoo.org [208.92.234.80]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by finch.gentoo.org (Postfix) with ESMTPS id C10491382C5 for ; Tue, 17 Apr 2018 00:53:33 +0000 (UTC) Received: from pigeon.gentoo.org (localhost [127.0.0.1]) by pigeon.gentoo.org (Postfix) with SMTP id D4677E0837; Tue, 17 Apr 2018 00:53:32 +0000 (UTC) Received: from smtp.gentoo.org (dev.gentoo.org [IPv6:2001:470:ea4a:1:5054:ff:fec7:86e4]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by pigeon.gentoo.org (Postfix) with ESMTPS id 790DCE0837 for ; Tue, 17 Apr 2018 00:53:32 +0000 (UTC) Received: from oystercatcher.gentoo.org (oystercatcher.gentoo.org [148.251.78.52]) (using TLSv1.2 with cipher ECDHE-RSA-AES128-GCM-SHA256 (128/128 bits)) (No client certificate requested) by smtp.gentoo.org (Postfix) with ESMTPS id A5EA1335C2C for ; Tue, 17 Apr 2018 00:53:30 +0000 (UTC) Received: from localhost.localdomain (localhost [IPv6:::1]) by oystercatcher.gentoo.org (Postfix) with ESMTP id CD9B0270 for ; Tue, 17 Apr 2018 00:53:28 +0000 (UTC) From: "Zac Medico" To: gentoo-commits@lists.gentoo.org Content-Transfer-Encoding: 8bit Content-type: text/plain; charset=UTF-8 Reply-To: gentoo-dev@lists.gentoo.org, "Zac Medico" Message-ID: <1523926149.9772f8f2a58a858a80ad1542d1ce46193616be67.zmedico@gentoo> Subject: [gentoo-commits] proj/portage:master commit in: pym/portage/util/_eventloop/ X-VCS-Repository: proj/portage X-VCS-Files: pym/portage/util/_eventloop/EventLoop.py X-VCS-Directories: pym/portage/util/_eventloop/ X-VCS-Committer: zmedico X-VCS-Committer-Name: Zac Medico X-VCS-Revision: 9772f8f2a58a858a80ad1542d1ce46193616be67 X-VCS-Branch: master Date: Tue, 17 Apr 2018 00:53:28 +0000 (UTC) Precedence: bulk List-Post: List-Help: List-Unsubscribe: List-Subscribe: List-Id: Gentoo Linux mail X-BeenThere: gentoo-commits@lists.gentoo.org X-Archives-Salt: 260df328-d2d1-4195-866a-30f7e00efc60 X-Archives-Hash: 49fe45621b3ea3383930c601b2523623 commit: 9772f8f2a58a858a80ad1542d1ce46193616be67 Author: Zac Medico gentoo org> AuthorDate: Mon Apr 16 23:55:14 2018 +0000 Commit: Zac Medico gentoo org> CommitDate: Tue Apr 17 00:49:09 2018 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=9772f8f2 EventLoop._idle_add: use thread-safe deque append This fixes previously unsafe usage of self._idle_callbacks when it was a dictionary. The deque append is thread-safe, but it does *not* notify the loop's thread, so the caller must notify if appropriate. Fixes: 1ee8971ba1cb ("EventLoop: eliminate thread safety from call_soon") pym/portage/util/_eventloop/EventLoop.py | 90 ++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index ae5a0a70a..d4f20c6ed 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -3,6 +3,7 @@ from __future__ import division +import collections import errno import functools import logging @@ -30,7 +31,6 @@ portage.proxy.lazyimport.lazyimport(globals(), 'portage.util.futures.unix_events:_PortageEventLoop,_PortageChildWatcher', ) -from portage import OrderedDict from portage.util import writemsg_level from portage.util.monotonic import monotonic from ..SlotObject import SlotObject @@ -55,7 +55,7 @@ class EventLoop(object): __slots__ = ("callback", "data", "pid", "source_id") class _idle_callback_class(SlotObject): - __slots__ = ("args", "callback", "calling", "source_id") + __slots__ = ("_args", "_callback", "_calling", "_cancelled") class _io_handler_class(SlotObject): __slots__ = ("args", "callback", "f", "source_id") @@ -141,10 +141,10 @@ class EventLoop(object): # If this attribute has changed since the last time that the # call_soon callbacks have been called, then it's not safe to # wait on self._thread_condition without a timeout. - self._call_soon_id = 0 - # Use OrderedDict in order to emulate the FIFO queue behavior - # of the AbstractEventLoop.call_soon method. - self._idle_callbacks = OrderedDict() + self._call_soon_id = None + # Use deque, with thread-safe append, in order to emulate the FIFO + # queue behavior of the AbstractEventLoop.call_soon method. + self._idle_callbacks = collections.deque() self._timeout_handlers = {} self._timeout_interval = None self._default_executor = None @@ -298,7 +298,10 @@ class EventLoop(object): events_handled += 1 timeouts_checked = True - call_soon = prev_call_soon_id != self._call_soon_id + call_soon = prev_call_soon_id is not self._call_soon_id + if self._call_soon_id is not None and self._call_soon_id._cancelled: + # Allow garbage collection of cancelled callback. + self._call_soon_id = None if (not call_soon and not event_handlers and not events_handled and may_block): @@ -501,8 +504,9 @@ class EventLoop(object): @type callback: callable @param callback: a function to call - @rtype: int - @return: an integer ID + @return: a handle which can be used to cancel the callback + via the source_remove method + @rtype: object """ with self._thread_condition: source_id = self._idle_add(callback, *args) @@ -511,32 +515,51 @@ class EventLoop(object): def _idle_add(self, callback, *args): """Like idle_add(), but without thread safety.""" - source_id = self._call_soon_id = self._new_source_id() - self._idle_callbacks[source_id] = self._idle_callback_class( - args=args, callback=callback, source_id=source_id) - return source_id + # Hold self._thread_condition when assigning self._call_soon_id, + # since it might be modified via a thread-safe method. + with self._thread_condition: + handle = self._call_soon_id = self._idle_callback_class( + _args=args, _callback=callback) + # This deque append is thread-safe, but it does *not* notify the + # loop's thread, so the caller must notify if appropriate. + self._idle_callbacks.append(handle) + return handle def _run_idle_callbacks(self): # assumes caller has acquired self._thread_rlock if not self._idle_callbacks: return False state_change = 0 - # Iterate of our local list, since self._idle_callbacks can be - # modified during the exection of these callbacks. - for x in list(self._idle_callbacks.values()): - if x.source_id not in self._idle_callbacks: - # it got cancelled while executing another callback - continue - if x.calling: - # don't call it recursively - continue - x.calling = True - try: - if not x.callback(*x.args): - state_change += 1 - self.source_remove(x.source_id) - finally: - x.calling = False + reschedule = [] + # Use remaining count to avoid calling any newly scheduled callbacks, + # since self._idle_callbacks can be modified during the exection of + # these callbacks. + remaining = len(self._idle_callbacks) + try: + while remaining: + remaining -= 1 + try: + x = self._idle_callbacks.popleft() # thread-safe + except IndexError: + break + if x._cancelled: + # it got cancelled while executing another callback + continue + if x._calling: + # don't call it recursively + continue + x._calling = True + try: + if x._callback(*x._args): + reschedule.append(x) + else: + x._cancelled = True + state_change += 1 + finally: + x._calling = False + finally: + # Reschedule those that were not cancelled. + self._idle_callbacks.extend(reschedule) return bool(state_change) @@ -732,6 +755,12 @@ class EventLoop(object): is found and removed, and False if the reg_id is invalid or has already been removed. """ + if isinstance(reg_id, self._idle_callback_class): + if not reg_id._cancelled: + reg_id._cancelled = True + return True + return False + x = self._child_handlers.pop(reg_id, None) if x is not None: if not self._child_handlers and self._use_signal: @@ -741,9 +770,6 @@ class EventLoop(object): return True with self._thread_rlock: - idle_callback = self._idle_callbacks.pop(reg_id, None) - if idle_callback is not None: - return True timeout_handler = self._timeout_handlers.pop(reg_id, None) if timeout_handler is not None: if timeout_handler.interval == self._timeout_interval: