From 4d97a88b34c2c64a47ab02b04b4fe6aea2405d7c Mon Sep 17 00:00:00 2001 From: Daniel Shields Date: Mon, 23 Jan 2023 15:09:57 -0600 Subject: [PATCH 1/5] ProcessPoolExecutor should not share one BrokenProcessPool exception among all failed futures (#101267) --- Lib/concurrent/futures/process.py | 19 +++++++++---------- Lib/test/test_concurrent_futures.py | 22 ++++++++++++++++++++++ Misc/ACKS | 1 + 3 files changed, 32 insertions(+), 10 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7e2f5fa30e82641..2cb0093141f2be5 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -471,17 +471,15 @@ def terminate_broken(self, cause): executor._shutdown_thread = True executor = None - # All pending tasks are to be marked failed with the following - # BrokenProcessPool error - bpe = BrokenProcessPool("A process in the process pool was " - "terminated abruptly while the future was " - "running or pending.") - if cause is not None: - bpe.__cause__ = _RemoteTraceback( - f"\n'''\n{''.join(cause)}'''") - # Mark pending tasks as failed. for work_id, work_item in self.pending_work_items.items(): + bpe = BrokenProcessPool( + "A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") work_item.future.set_exception(bpe) # Delete references to object. 
See issue16284 del work_item @@ -762,7 +760,8 @@ def _spawn_process(self): def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: if self._broken: - raise BrokenProcessPool(self._broken) + raise BrokenProcessPool( + 'cannot schedule new futures after process pool has broken') if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') if _global_shutdown: diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index fe9fdc4f44d37ba..0473c5200919746 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -15,6 +15,7 @@ import os import queue import sys +import traceback import threading import time import unittest @@ -980,6 +981,27 @@ def test_killed_child(self): # Submitting other jobs fails as well. self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) + def test_broken_process_pool_traceback(self): + # When a child process is abruptly terminated, the whole pool gets + # "broken", and a BrokenProcessPool exception should be created + # for each future instead of sharing one exception among all futures. + futures = [self.executor.submit(time.sleep, 3) for _ in range(3)] + # Get one of the processes, and terminate (kill) it. + p = next(iter(self.executor._processes.values())) + p.terminate() + for fut in futures: + count = None + try: + fut.result() + except BrokenProcessPool as e: + count = sum( + 1 + for frame_summary in traceback.extract_tb(e.__traceback__) + if frame_summary.filename == __file__ + ) + # This code file should appear exactly once in the traceback. 
+ self.assertEqual(count, 1) + def test_map_chunksize(self): def bad_map(): list(self.executor.map(pow, range(40), range(40), chunksize=-1)) diff --git a/Misc/ACKS b/Misc/ACKS index 74abcebe21ea600..5c64b786dfbf2d6 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -1649,6 +1649,7 @@ Charlie Shepherd Bruce Sherwood Gregory Shevchenko Hai Shi +Daniel Shields Alexander Shigin Pete Shinners Michael Shiplett From d7699bb2cb6c8b94063a8645c787b7ceef46f1f7 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Mon, 23 Jan 2023 21:23:51 +0000 Subject: [PATCH 2/5] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20blu?= =?UTF-8?q?rb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst diff --git a/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst b/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst new file mode 100644 index 000000000000000..9a09e70a5500686 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst @@ -0,0 +1 @@ +When a worker process terminates unexpectedly, ``concurrent.futures.ProcessPoolExecutor`` no longer shares one ``BrokenProcessPool`` exception instance among all failed futures. This was unsafe because exceptions are mutable. Malformed (repeated) tracebacks were the most common symptom of this bug. From c36703657de9c36ac5ab55f513baf7cde2de8fa1 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith" Date: Sat, 13 Jun 2026 07:10:40 +0000 Subject: [PATCH 3/5] gh-101267: keep submit()'s broken reason and hoist cause formatting Two refinements to the broken-pool handling: * submit() raises BrokenProcessPool(self._broken) so the message says why the pool broke, rather than a generic fixed string. 
* Build the cause traceback string once before the per-future loop instead of rebuilding the same f-string on every iteration. --- Lib/concurrent/futures/process.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8397e5d317dd774..10d4ac89d725713 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -470,10 +470,8 @@ def _terminate_broken(self, cause): executor = None # All pending tasks are to be marked failed with a - # BrokenProcessPool error. The cause is shared by every future, - # but a separate exception instance is created for each one below: - # raising a single shared instance from multiple Future.result() - # calls accumulates tracebacks onto it (gh-101267). + # BrokenProcessPool error, as separate instances to avoid sharing + # a traceback (gh-101267). cause_str = None if cause is not None: cause_str = ''.join(cause) @@ -489,14 +487,15 @@ def _terminate_broken(self, cause): f"with exit code {p.exitcode}") if errors: cause_str = "\n".join(errors) + cause_tb = f"\n'''\n{cause_str}'''" if cause_str else None # Mark pending tasks as failed. 
for work_id, work_item in self.pending_work_items.items(): bpe = BrokenProcessPool("A process in the process pool was " "terminated abruptly while the future was " "running or pending.") - if cause_str: - bpe.__cause__ = _RemoteTraceback(f"\n'''\n{cause_str}'''") + if cause_tb is not None: + bpe.__cause__ = _RemoteTraceback(cause_tb) try: work_item.future.set_exception(bpe) except _base.InvalidStateError: @@ -815,8 +814,7 @@ def _spawn_process(self): def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: if self._broken: - raise BrokenProcessPool( - 'cannot schedule new futures after process pool has broken') + raise BrokenProcessPool(self._broken) if self._shutdown_thread: raise RuntimeError('cannot schedule new futures after shutdown') if _global_shutdown: From dd89a4122b0ddde151a84e61fa8acde83690a8c5 Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith" Date: Sat, 13 Jun 2026 07:10:44 +0000 Subject: [PATCH 4/5] gh-101267: make the regression test deterministic and self-checking * Block the workers on a never-set Event (via the harness create_event()) instead of submitting time.sleep(3): the sleep could finish before the worker is terminated on a loaded machine, completing the future and raising no BrokenProcessPool. * Use try/except/else with self.fail() rather than seeding count = None, so a missing exception is reported clearly. assertRaises() can't be used here because it strips the traceback the test needs to inspect. --- .../test_process_pool.py | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index d5e55dd5f792521..45cfd46fb7befbf 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -63,25 +63,30 @@ def test_killed_child(self): # Submitting other jobs fails as well. 
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) + @warnings_helper.ignore_fork_in_thread_deprecation_warnings() def test_broken_process_pool_traceback(self): # When a child process is abruptly terminated, the whole pool gets # "broken", and a BrokenProcessPool exception should be created # for each future instead of sharing one exception among all futures. - futures = [self.executor.submit(time.sleep, 3) for _ in range(3)] - # Get one of the processes, and terminate (kill) it. + event = self.create_event() + futures = [self.executor.submit(event.wait) for _ in range(3)] p = next(iter(self.executor._processes.values())) p.terminate() for fut in futures: - count = None + # Don't use assertRaises(): it clears the traceback off exc. try: fut.result() - except BrokenProcessPool as e: - count = sum( - 1 - for frame_summary in traceback.extract_tb(e.__traceback__) - if frame_summary.filename == __file__ - ) + except BrokenProcessPool as exc: + tb = exc.__traceback__ + else: + self.fail("BrokenProcessPool not raised") + count = sum( + 1 + for frame_summary in traceback.extract_tb(tb) + if frame_summary.filename == __file__ + ) # This code file should appear exactly once in the traceback. + # A shared exception would accumulate a frame per result() call. self.assertEqual(count, 1) @warnings_helper.ignore_fork_in_thread_deprecation_warnings() From b5bf2d7a6b245860d5adda2f0f3798a0e6c445de Mon Sep 17 00:00:00 2001 From: "Gregory P. Smith" Date: Sat, 13 Jun 2026 07:10:44 +0000 Subject: [PATCH 5/5] gh-101267: clarify the NEWS entry and use reST roles Describe the actual failure (re-raising one shared exception appends a traceback to it on each Future.result()) and cross-reference the API names. 
--- .../2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst b/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst index 9a09e70a5500686..901a3fb60ab5b9f 100644 --- a/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst +++ b/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst @@ -1 +1,7 @@ -When a worker process terminates unexpectedly, ``concurrent.futures.ProcessPoolExecutor`` no longer shares one ``BrokenProcessPool`` exception instance among all failed futures. This was unsafe because exceptions are mutable. Malformed (repeated) tracebacks were the most common symptom of this bug. +When a worker process terminates unexpectedly, +:class:`concurrent.futures.ProcessPoolExecutor` now sets a separate +:exc:`~concurrent.futures.process.BrokenProcessPool` exception on each pending +future instead of sharing a single instance among them all. Sharing one +exception produced malformed tracebacks: each +:meth:`Future.result() <concurrent.futures.Future.result>` call re-raised the +same object, appending another copy of the traceback to it.