Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,17 +469,20 @@ def _terminate_broken(self, cause):
executor._shutdown_thread = True
executor = None

# All pending tasks are to be marked failed with the following
# BrokenProcessPool error
bpe = BrokenProcessPool("A process in the process pool was "
"terminated abruptly while the future was "
"running or pending.")
# All pending tasks are to be marked failed with a
# BrokenProcessPool error, as separate instances to avoid sharing
# a traceback (gh-101267).
cause_tb = None
if cause is not None:
bpe.__cause__ = _RemoteTraceback(
f"\n'''\n{''.join(cause)}'''")
cause_tb = f"\n'''\n{''.join(cause)}'''"

# Mark pending tasks as failed.
for work_id, work_item in self.pending_work_items.items():
bpe = BrokenProcessPool("A process in the process pool was "
"terminated abruptly while the future was "
"running or pending.")
if cause_tb is not None:
bpe.__cause__ = _RemoteTraceback(cause_tb)
try:
work_item.future.set_exception(bpe)
except _base.InvalidStateError:
Expand Down
26 changes: 26 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys
import threading
import time
import traceback
import unittest
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool
Expand Down Expand Up @@ -43,6 +44,31 @@ def test_killed_child(self):
# Submitting other jobs fails as well.
self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

def test_broken_process_pool_traceback(self):
# When a child process is abruptly terminated, the whole pool gets
# "broken", and a BrokenProcessPool exception should be created
# for each future instead of sharing one exception among all futures.
event = self.create_event()
futures = [self.executor.submit(event.wait) for _ in range(3)]
p = next(iter(self.executor._processes.values()))
p.terminate()
for fut in futures:
# Don't use assertRaises(): it clears the traceback off exc.
try:
fut.result()
except BrokenProcessPool as exc:
tb = exc.__traceback__
else:
self.fail("BrokenProcessPool not raised")
count = sum(
1
for frame_summary in traceback.extract_tb(tb)
if frame_summary.filename == __file__
)
# This code file should appear exactly once in the traceback.
# A shared exception would accumulate a frame per result() call.
self.assertEqual(count, 1)

def test_map_chunksize(self):
def bad_map():
list(self.executor.map(pow, range(40), range(40), chunksize=-1))
Expand Down
1 change: 1 addition & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,7 @@ Charlie Shepherd
Bruce Sherwood
Gregory Shevchenko
Hai Shi
Daniel Shields
Alexander Shigin
Pete Shinners
Michael Shiplett
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
When a worker process terminates unexpectedly,
:class:`concurrent.futures.ProcessPoolExecutor` now sets a separate
:exc:`~concurrent.futures.process.BrokenProcessPool` exception on each pending
future instead of sharing a single instance among them all. Sharing one
exception produced malformed tracebacks: each
:meth:`Future.result() <concurrent.futures.Future.result>` call re-raised the
same object, appending another copy of the traceback to it.
Loading