Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import time
import json
import subprocess
import threading
import queue
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures
from datetime import datetime
Expand Down Expand Up @@ -83,6 +85,118 @@
step_exit_fail_called = False
step_exit_pass_called = False


# --- Per-action timeout enforcement (action_timeout shared variable) ----------
# Every action ends up at Action_Handler -> run_function(data_set). To guarantee
# that no single action can hang the whole test run, we run the action on a
# dedicated worker thread and wait at most `action_timeout` seconds for it.
#
# A single, *reused* worker thread is used (not one-per-action) so libraries
# with thread affinity -- notably Playwright's sync API and its greenlets --
# keep running on the same thread across actions. Python cannot force-kill a
# thread that is stuck inside a blocking call, so on timeout we stop waiting,
# abandon the worker (it is a daemon thread, so it never blocks node shutdown),
# spin up a fresh worker for the next action, and report the step as failed.
_DEFAULT_ACTION_TIMEOUT = 300
_action_worker = None
_action_worker_local = threading.local()


class _ActionTimeoutWorker:
"""A single long-lived daemon thread that runs one action at a time."""

def __init__(self):
self._in_q = queue.Queue()
self._out_q = queue.Queue()
self.thread = threading.Thread(
target=self._loop, name="zeuz_action_worker", daemon=True
)
self.thread.start()

def _loop(self):
# Mark this thread so re-entrant (nested) action calls run inline
# instead of submitting back to the worker and deadlocking on it.
_action_worker_local.in_worker = True
while True:
func, args = self._in_q.get()
try:
self._out_q.put(("ok", func(*args)))
except BaseException: # propagate any error to the calling thread
self._out_q.put(("err", sys.exc_info()))

def run(self, func, args, timeout):
"""Run func(*args), waiting at most `timeout` seconds.

Raises TimeoutError if the worker does not finish in time. Re-raises any
exception thrown by func on the calling thread (so existing exception
handling is preserved).
"""
self._in_q.put((func, args))
try:
kind, payload = self._out_q.get(timeout=timeout)
except queue.Empty:
raise TimeoutError()
if kind == "err":
_exc_type, exc_value, exc_tb = payload
raise exc_value.with_traceback(exc_tb)
return payload


def _get_action_timeout():
"""Resolve the `action_timeout` shared variable to seconds (float).

Defaults to 300. A value <= 0 disables the cap. Anything unparsable falls
back to the default.
"""
try:
val = sr.Get_Shared_Variables("action_timeout", log=False)
if val in failed_tag_list or val is None or val == "":
return _DEFAULT_ACTION_TIMEOUT
return float(str(val).strip())
except Exception:
return _DEFAULT_ACTION_TIMEOUT


def _run_action_with_timeout(run_function, data_set):
"""Execute run_function(data_set), enforcing the action_timeout cap.

Returns the action's result, or "zeuz_failed" (with a logged error) if it
exceeds action_timeout. Runs the action inline -- with no timeout wrapper --
when the cap is disabled, during load/performance testing (those manage
their own timeout and run many actions in parallel threads), or for nested
action calls already executing on the worker thread.
"""
sModuleInfo = inspect.currentframe().f_code.co_name + " : " + MODULE_NAME
global _action_worker

timeout = _get_action_timeout()

if (
timeout <= 0
or load_testing
or getattr(CommonUtil, "load_testing", False)
or getattr(_action_worker_local, "in_worker", False)
):
return run_function(data_set)

if _action_worker is None:
_action_worker = _ActionTimeoutWorker()

try:
return _action_worker.run(run_function, (data_set,), timeout)
except TimeoutError:
# The worker is still stuck on the timed-out action. Abandon it (daemon
# thread won't block shutdown) and create a fresh worker for the next
# action so its queues start clean.
_action_worker = None
CommonUtil.ExecLog(
sModuleInfo,
"Action exceeded the action_timeout of %s second(s) and was aborted. "
"Marking the step as failed." % timeout,
3,
)
return "zeuz_failed"

from pathlib import Path
if os.path.exists(Path(__file__).parent.parent.parent.parent / "bypass.json"):
bypass_exist = True
Expand Down Expand Up @@ -2416,7 +2530,7 @@ def Action_Handler(_data_set, action_row, _bypass_bug=True):
time.sleep(pre_sleep)
elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]:
time.sleep(CommonUtil.global_sleep[module]["_all_"]["pre"])
result = run_function(data_set) # Execute function, providing all rows in the data set
result = _run_action_with_timeout(run_function, data_set) # Execute action, enforcing action_timeout
if post_sleep:
time.sleep(post_sleep)
elif module in CommonUtil.global_sleep and "_all_" in CommonUtil.global_sleep[module]:
Expand Down
6 changes: 6 additions & 0 deletions Framework/MainDriverApi.py
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,12 @@ def run_test_case(
shared.Set_Shared_Variables("zeuz_attachments_dir", (Path(temp_ini_file).parent/"attachments").__str__())
if not shared.Test_Shared_Variables("element_wait"):
shared.Set_Shared_Variables("element_wait", 10)
# Max seconds any single action may run before it is aborted and the
# step is failed. Configurable as a runtime parameter or via "Set Shared
# Variable"; set to 0 to disable. The Test_Shared_Variables guard keeps a
# user/runtime-supplied value from being overwritten by this default.
if not shared.Test_Shared_Variables("action_timeout"):
shared.Set_Shared_Variables("action_timeout", 300)

_color = "white"
# danger_style = Style(color=_color, blink=False, bold=True)
Expand Down
Loading