diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 8ff717f5e003bfb..ad718bf92fe2ac9 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -469,17 +469,20 @@ def _terminate_broken(self, cause): executor._shutdown_thread = True executor = None - # All pending tasks are to be marked failed with the following - # BrokenProcessPool error - bpe = BrokenProcessPool("A process in the process pool was " - "terminated abruptly while the future was " - "running or pending.") + # All pending tasks are to be marked failed with a + # BrokenProcessPool error, as separate instances to avoid sharing + # a traceback (gh-101267). + cause_tb = None if cause is not None: - bpe.__cause__ = _RemoteTraceback( - f"\n'''\n{''.join(cause)}'''") + cause_tb = f"\n'''\n{''.join(cause)}'''" # Mark pending tasks as failed. for work_id, work_item in self.pending_work_items.items(): + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause_tb is not None: + bpe.__cause__ = _RemoteTraceback(cause_tb) try: work_item.future.set_exception(bpe) except _base.InvalidStateError: diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 8b1bdaa33d8f5c4..7cddcd0d9328453 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -2,6 +2,7 @@ import sys import threading import time +import traceback import unittest from concurrent import futures from concurrent.futures.process import BrokenProcessPool @@ -43,6 +44,31 @@ def test_killed_child(self): # Submitting other jobs fails as well. self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) + def test_broken_process_pool_traceback(self): + # When a child process is abruptly terminated, the whole pool gets + # "broken", and a BrokenProcessPool exception should be created + # for each future instead of sharing one exception among all futures. + event = self.create_event() + futures = [self.executor.submit(event.wait) for _ in range(3)] + p = next(iter(self.executor._processes.values())) + p.terminate() + for fut in futures: + # Don't use assertRaises(): it clears the traceback off exc. + try: + fut.result() + except BrokenProcessPool as exc: + tb = exc.__traceback__ + else: + self.fail("BrokenProcessPool not raised") + count = sum( + 1 + for frame_summary in traceback.extract_tb(tb) + if frame_summary.filename == __file__ + ) + # This code file should appear exactly once in the traceback. + # A shared exception would accumulate a frame per result() call. + self.assertEqual(count, 1) + def test_map_chunksize(self): def bad_map(): list(self.executor.map(pow, range(40), range(40), chunksize=-1)) diff --git a/Misc/ACKS b/Misc/ACKS index d7eb6f44300c044..c1216b21e373212 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -1711,6 +1711,7 @@ Charlie Shepherd Bruce Sherwood Gregory Shevchenko Hai Shi +Daniel Shields Alexander Shigin Pete Shinners Michael Shiplett diff --git a/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst b/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst new file mode 100644 index 000000000000000..901a3fb60ab5b9f --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-01-23-21-23-50.gh-issue-101267._f-cFH.rst @@ -0,0 +1,7 @@ +When a worker process terminates unexpectedly, +:class:`concurrent.futures.ProcessPoolExecutor` now sets a separate +:exc:`~concurrent.futures.process.BrokenProcessPool` exception on each pending +future instead of sharing a single instance among them all. Sharing one +exception produced malformed tracebacks: each +:meth:`Future.result() ` call re-raised the +same object, appending another copy of the traceback to it.