From c62f5d279017a6021490badb3390914ec6228a09 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 15:18:34 +0300 Subject: [PATCH 1/9] OsOperations::exec_command is refactored (typing) --- src/local_ops.py | 133 ++++++++++++++++++++++++++---------- src/os_ops.py | 39 ++++++++++- src/remote_ops.py | 79 ++++++++++++++------- tests/test_os_ops_common.py | 6 +- 4 files changed, 194 insertions(+), 63 deletions(-) diff --git a/src/local_ops.py b/src/local_ops.py index 52d94dc..ed5db71 100644 --- a/src/local_ops.py +++ b/src/local_ops.py @@ -35,7 +35,7 @@ class LocalOperations(OsOperations): sm_dummy_conn_params = ConnectionParams() - sm_single_instance: OsOperations = None + sm_single_instance: typing.Optional[OsOperations] = None sm_single_instance_guard = threading.Lock() # TODO: make it read-only @@ -103,20 +103,26 @@ def create_clone(self) -> LocalOperations: clone._username = self._username return clone - @staticmethod - def _process_output(encoding, temp_file_path): - """Process the output of a command from a temporary file.""" - with open(temp_file_path, 'rb') as temp_file: - output = temp_file.read() - if encoding: - output = output.decode(encoding) - return output, None # In Windows stderr writing in stdout + _T_RUN_COMMAND__RESULT = typing.Union[ + subprocess.Popen, + typing.Tuple[int, str, typing.Optional[str]], + typing.Tuple[int, bytes, typing.Optional[bytes]], + ] def _run_command__nt( - self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, + self, + cmd: OsOperations.T_CMD, + shell, + input, + stdin, + stdout, + stderr, + get_process, + timeout, + encoding, exec_env: typing.Optional[dict], cwd: typing.Optional[str], - ): + ) -> _T_RUN_COMMAND__RESULT: assert exec_env is None or type(exec_env) is dict assert cwd is None or type(cwd) is str @@ -157,21 +163,37 @@ def _run_command__nt( cwd=cwd, **extParams, ) + assert isinstance(process, subprocess.Popen) if get_process: - return process, None, None + return process temp_file_path = temp_file.name # Wait process finished process.wait() - output, error = self._process_output(encoding, temp_file_path) - return process, output, error + # Process the output of a command from a temporary file. + # In Windows stderr writing in stdout + with open(temp_file_path, 'rb') as temp_file: + output = temp_file.read() + if encoding: + return process.returncode, output.decode(encoding), None + + return process.returncode, output, None def _run_command__generic( - self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, + self, + cmd: OsOperations.T_CMD, + shell, + input, + stdin, + stdout, + stderr, + get_process, + timeout, + encoding, exec_env: typing.Optional[dict], cwd: typing.Optional[str], - ): + ) -> _T_RUN_COMMAND__RESULT: assert exec_env is None or type(exec_env) is dict assert cwd is None or type(cwd) is str @@ -214,8 +236,9 @@ def _run_command__generic( **extParams ) assert process is not None + assert isinstance(process, subprocess.Popen) if get_process: - return process, None, None + return process try: output, error = process.communicate(input=input_prepared, timeout=timeout) except subprocess.TimeoutExpired: @@ -228,13 +251,22 @@ def _run_command__generic( if encoding: output = output.decode(encoding) error = error.decode(encoding) - return process, output, error + return process.returncode, output, error def _run_command( - self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, + self, + cmd: OsOperations.T_CMD, + shell, + input, + stdin, + stdout, + stderr, + get_process, + timeout, + encoding, exec_env: typing.Optional[dict], cwd: typing.Optional[str], - ): + ) -> _T_RUN_COMMAND__RESULT: """Execute a command and return the process and its output.""" assert exec_env is None or type(exec_env) is dict @@ -248,12 +280,24 @@ def _run_command( return method(self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, exec_env, cwd) def exec_command( - self, cmd, wait_exit=False, verbose=False, expect_error=False, encoding=None, shell=False, - text=False, input=None, stdin=None, stdout=None, stderr=None, get_process=False, timeout=None, + self, + cmd: OsOperations.T_CMD, + wait_exit=False, + verbose=False, + expect_error=False, + encoding: typing.Optional[str] = None, + shell=False, + text=False, + input=None, + stdin=None, + stdout=None, + stderr=None, + get_process=False, + timeout=None, ignore_errors=False, exec_env: typing.Optional[dict] = None, cwd: typing.Optional[str] = None - ): + ) -> OsOperations.T_EXEC_COMMAND_RESULT: """ Execute a command in a subprocess and handle the output based on the provided parameters. """ @@ -262,37 +306,52 @@ def exec_command( assert exec_env is None or type(exec_env) is dict assert cwd is None or type(cwd) is str - process, output, error = self._run_command( - cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, - exec_env, - cwd + run_r = self._run_command( + cmd=cmd, + shell=shell, + input=input, + stdin=stdin, + stdout=stdout, + stderr=stderr, + get_process=get_process, + timeout=timeout, + encoding=encoding, + exec_env=exec_env, + cwd=cwd, ) if get_process: - return process + assert isinstance(run_r, subprocess.Popen) + return run_r + + assert type(run_r) is tuple + assert len(run_r) == 3 + assert type(run_r[0]) is int + assert type(run_r[1]) is not None if expect_error: - if process.returncode == 0: + if run_r[0] == 0: raise InvalidOperationException("We expected an execution error.") elif ignore_errors: pass - elif process.returncode == 0: + elif run_r[0] == 0: pass else: assert not expect_error assert not ignore_errors - assert process.returncode != 0 + assert run_r[0] != 0 + RaiseError.UtilityExitedWithNonZeroCode( cmd=cmd, - exit_code=process.returncode, - msg_arg=error or output, - error=error, - out=output) + exit_code=run_r[0], + msg_arg=run_r[2] or run_r[1], + error=run_r[2], + out=run_r[1]) if verbose: - return process.returncode, output, error + return run_r - return output + return run_r[1] def build_path(self, a: str, *parts: str) -> str: assert a is not None diff --git a/src/os_ops.py b/src/os_ops.py index 09d02d2..d3e13ed 100644 --- a/src/os_ops.py +++ b/src/os_ops.py @@ -3,6 +3,7 @@ import locale import typing import signal as os_signal +import subprocess class ConnectionParams: @@ -42,7 +43,43 @@ def create_clone(self) -> OsOperations: raise NotImplementedError() # Command execution - def exec_command(self, cmd, **kwargs): + T_CMD = typing.Union[str, typing.List[str]] + T_EXEC_COMMAND_RESULT = typing.Union[ + subprocess.Popen, + str, + bytes, + typing.Tuple[int, str, typing.Optional[str]], + typing.Tuple[int, bytes, typing.Optional[bytes]], + ] + + def exec_command( + self, + cmd: T_CMD, + wait_exit=False, + verbose=False, + expect_error=False, + encoding: typing.Optional[str] = None, + shell=False, + text=False, + input=None, + stdin=None, + stdout=None, + stderr=None, + get_process=False, + timeout=None, + ignore_errors=False, + exec_env: typing.Optional[dict] = None, + cwd: typing.Optional[str] = None + ) -> T_EXEC_COMMAND_RESULT: + assert type(cmd) is str or type(cmd) is list + assert type(verbose) is bool + assert type(expect_error) is bool + assert encoding is None or type(encoding) is str + assert type(wait_exit) is bool + assert type(get_process) is bool + assert type(ignore_errors) is bool + assert exec_env is None or type(exec_env) is dict + assert cwd is None or type(cwd) is dict raise NotImplementedError() def build_path(self, a: str, *parts: str) -> str: diff --git a/src/remote_ops.py b/src/remote_ops.py index a9f0164..6e8a225 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -116,12 +116,24 @@ def create_clone(self) -> RemoteOperations: return clone def exec_command( - self, cmd, wait_exit=False, verbose=False, expect_error=False, - encoding=None, shell=True, text=False, input=None, stdin=None, stdout=None, - stderr=None, get_process=None, timeout=None, ignore_errors=False, + self, + cmd: OsOperations.T_CMD, + wait_exit=False, + verbose=False, + expect_error=False, + encoding: typing.Optional[str] = None, + shell=True, + text=False, + input=None, + stdin=None, + stdout=None, + stderr=None, + get_process=None, + timeout=None, + ignore_errors=False, exec_env: typing.Optional[dict] = None, cwd: typing.Optional[str] = None - ): + ) -> OsOperations.T_EXEC_COMMAND_RESULT: """ Execute a command in the SSH session. Args: @@ -156,6 +168,7 @@ def exec_command( process = subprocess.Popen(ssh_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) assert process is not None + assert isinstance(process, subprocess.Popen) if get_process: return process @@ -169,31 +182,35 @@ def exec_command( assert type(error) is bytes if encoding: - output = output.decode(encoding) - error = error.decode(encoding) + run_r = process.returncode, output.decode(encoding), error.decode(encoding) + else: + run_r = process.returncode, output, error + + assert type(run_r[0]) is int + assert type(run_r[1]) is type(run_r[2]) if expect_error: - if process.returncode == 0: + if run_r[0] == 0: raise InvalidOperationException("We expected an execution error.") elif ignore_errors: pass - elif process.returncode == 0: + elif run_r[0] == 0: pass else: assert not expect_error assert not ignore_errors - assert process.returncode != 0 + assert run_r[0] != 0 RaiseError.UtilityExitedWithNonZeroCode( cmd=cmd, - exit_code=process.returncode, + exit_code=run_r[0], msg_arg=error, - error=error, - out=output) + error=run_r[2], + out=run_r[1]) if verbose: - return process.returncode, output, error + return run_r - return output + return run_r[1] def build_path(self, a: str, *parts: str) -> str: assert a is not None @@ -210,11 +227,15 @@ def environ(self, var_name: str) -> str: - var_name (str): The name of the environment variable. """ cmd = "echo ${}".format(var_name) - return self.exec_command(cmd, encoding=get_default_encoding()).strip() + stdout = self.exec_command(cmd, encoding=get_default_encoding()) + assert type(stdout) is str + return stdout.strip() def cwd(self): cmd = 'pwd' - return self.exec_command(cmd, encoding=get_default_encoding()).rstrip() + stdout = self.exec_command(cmd, encoding=get_default_encoding()) + assert type(stdout) is str + return stdout.rstrip() def find_executable(self, executable): search_paths = self.environ("PATH") @@ -267,7 +288,9 @@ def set_env(self, var_name: str, var_val: str): def get_name(self): cmd = 'python3 -c "import os; print(os.name)"' - return self.exec_command(cmd, encoding=get_default_encoding()).strip() + stdout = self.exec_command(cmd, encoding=get_default_encoding()) + assert type(stdout) is str + return stdout.strip() # Work with dirs def makedirs(self, path, remove_existing=False): @@ -604,14 +627,17 @@ def read_binary(self, filename, offset): return r def isfile(self, remote_file): - stdout = self.exec_command("test -f {}; echo $?".format(remote_file)) + cmd = "test -f {}; echo $?".format(remote_file) + stdout = self.exec_command(cmd) + assert type(stdout) is bytes result = int(stdout.strip()) return result == 0 def isdir(self, dirname): cmd = "if [ -d {} ]; then echo True; else echo False; fi".format(dirname) - response = self.exec_command(cmd) - return response.strip() == b"True" + stdout = self.exec_command(cmd) + assert type(stdout) is bytes + return stdout.strip() == b"True" def get_file_size(self, filename): C_ERR_SRC = "RemoteOpertions::get_file_size" @@ -692,7 +718,9 @@ def kill(self, pid: int, signal: typing.Union[int, os_signal.Signals]): def get_pid(self): # Get current process id - return int(self.exec_command("echo $$", encoding=get_default_encoding())) + x = self.exec_command("echo $$", encoding=get_default_encoding()) + assert type(x) is str + return int(x) def get_process_children(self, pid): assert type(pid) is int @@ -792,7 +820,10 @@ def is_abs_path(self, path: str) -> bool: return posixpath.isabs(path) @staticmethod - def _build_cmdline(cmd, exec_env: typing.Dict = None) -> str: + def _build_cmdline( + cmd, + exec_env: typing.Optional[typing.Dict] = None, + ) -> str: cmd_items = __class__._create_exec_env_list(exec_env) assert type(cmd_items) is list @@ -813,7 +844,9 @@ def _ensure_cmdline(cmd) -> str: raise ValueError("Invalid 'cmd' argument type - {0}".format(type(cmd).__name__)) @staticmethod - def _create_exec_env_list(exec_env: typing.Dict) -> typing.List[str]: + def _create_exec_env_list( + exec_env: typing.Optional[typing.Dict], + ) -> typing.List[str]: env: typing.Dict[str, str] = dict() # ---------------------------------- SYSTEM ENV diff --git a/tests/test_os_ops_common.py b/tests/test_os_ops_common.py index 14786f3..fb65fae 100644 --- a/tests/test_os_ops_common.py +++ b/tests/test_os_ops_common.py @@ -74,7 +74,7 @@ def test_exec_command_success(self, os_ops: OsOperations): cmd = ["sh", "-c", "python3 --version"] response = os_ops.exec_command(cmd) - + assert type(response) is bytes assert b'Python 3.' in response def test_exec_command_failure(self, os_ops: OsOperations): @@ -273,7 +273,9 @@ def test_makedirs_and_rmdirs_success(self, os_ops: OsOperations): RunConditions.skip_if_windows() cmd = "pwd" - pwd = os_ops.exec_command(cmd, wait_exit=True, encoding='utf-8').strip() + stdout = os_ops.exec_command(cmd, encoding='utf-8') + assert type(stdout) is str + pwd = stdout.strip() path = "{}/test_dir".format(pwd) From 48d10c9054fd98ea8deab4eb57b2681217c4f8b3 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 16:52:41 +0300 Subject: [PATCH 2/9] revert: LocalOperations::sm_single_instance --- src/local_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/local_ops.py b/src/local_ops.py index ed5db71..583da34 100644 --- a/src/local_ops.py +++ b/src/local_ops.py @@ -35,7 +35,7 @@ class LocalOperations(OsOperations): sm_dummy_conn_params = ConnectionParams() - sm_single_instance: typing.Optional[OsOperations] = None + sm_single_instance: OsOperations = None sm_single_instance_guard = threading.Lock() # TODO: make it read-only From b8af8fd75c68dea625c3097bbbf438ebe08c1530 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 17:17:09 +0300 Subject: [PATCH 3/9] RemoteOperations::is_executable is updated --- src/remote_ops.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/remote_ops.py b/src/remote_ops.py index 6e8a225..5f61e85 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -254,8 +254,19 @@ def is_executable(self, file): # Check if the file is executable command = ["test", "-x", file] - exit_status, output, error = self.exec_command(cmd=command, encoding=get_default_encoding(), ignore_errors=True, verbose=True) + exec_r = self.exec_command( + cmd=command, + encoding=get_default_encoding(), + ignore_errors=True, + verbose=True, + ) + + assert type(exec_r) is tuple + assert len(exec_r) == 3 + exit_status, output, error = exec_r + + assert type(exit_status) is int assert type(output) is str assert type(error) is str @@ -267,7 +278,8 @@ def is_executable(self, file): errMsg = "Test operation returns an unknown result code: {0}. File name is [{1}].".format( exit_status, - file) + file, + ) RaiseError.CommandExecutionError( cmd=command, From 220a7bcfbb30a4a284b26e95c64635f9aec13af9 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 17:18:18 +0300 Subject: [PATCH 4/9] RemoteOperations::makedirs is updated --- src/remote_ops.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/remote_ops.py b/src/remote_ops.py index 5f61e85..24385f0 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -317,11 +317,11 @@ def makedirs(self, path, remove_existing=False): else: cmd = "mkdir -p {}".format(path) try: - exit_status, result, error = self.exec_command(cmd, verbose=True) + result = self.exec_command(cmd) except ExecUtilException as e: raise Exception("Couldn't create dir {} because of error {}".format(path, e.message)) - if exit_status != 0: - raise Exception("Couldn't create dir {} because of error {}".format(path, error)) + + assert type(result) is bytes return result def makedir(self, path: str): From 758671caa321911b020716529f1581c58ad6a930 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 17:21:18 +0300 Subject: [PATCH 5/9] RemoteOperations::path_exists is updated --- src/remote_ops.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/remote_ops.py b/src/remote_ops.py index 24385f0..2fc7508 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -390,8 +390,19 @@ def listdir(self, path): def path_exists(self, path): command = ["test", "-e", path] - exit_status, output, error = self.exec_command(cmd=command, encoding=get_default_encoding(), ignore_errors=True, verbose=True) + exec_r = self.exec_command( + cmd=command, + encoding=get_default_encoding(), + ignore_errors=True, + verbose=True, + ) + assert type(exec_r) is tuple + assert len(exec_r) == 3 + + exit_status, output, error = exec_r + + assert type(exit_status) is int assert type(output) is str assert type(error) is str From c23e792ed77096ccc80d573b4142441c6904d53d Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 17:26:41 +0300 Subject: [PATCH 6/9] RemoteOperations::mkdtemp is updated --- src/remote_ops.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/remote_ops.py b/src/remote_ops.py index 2fc7508..f34033b 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -446,7 +446,12 @@ def mkdtemp(self, prefix=None): else: command = ["mktemp", "-d"] - exec_exitcode, exec_output, exec_error = self.exec_command(command, verbose=True, encoding=get_default_encoding(), ignore_errors=True) + exec_r = self.exec_command(command, verbose=True, encoding=get_default_encoding(), ignore_errors=True) + + assert type(exec_r) is tuple + assert len(exec_r) == 3 + + exec_exitcode, exec_output, exec_error = exec_r assert type(exec_exitcode) is int assert type(exec_output) is str From c9ef7938c5d2bd1b19bc29a84faa56ab93c3cb6b Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 17:27:45 +0300 Subject: [PATCH 7/9] RemoteOperations::mkstemp is updated --- src/remote_ops.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/remote_ops.py b/src/remote_ops.py index f34033b..26da4f7 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -479,7 +479,17 @@ def mkstemp(self, prefix=None): else: command = ["mktemp"] - exec_exitcode, exec_output, exec_error = self.exec_command(command, verbose=True, encoding=get_default_encoding(), ignore_errors=True) + exec_r = self.exec_command( + command, + verbose=True, + encoding=get_default_encoding(), + ignore_errors=True, + ) + + assert type(exec_r) is tuple + assert len(exec_r) == 3 + + exec_exitcode, exec_output, exec_error = exec_r assert type(exec_exitcode) is int assert type(exec_output) is str From 67a84b5562cb00c3230609d9ca86140c4c1f7d03 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 17:40:34 +0300 Subject: [PATCH 8/9] RemoteOperations::get_tempdir is updated --- src/remote_ops.py | 9 +++++++-- tests/test_os_ops_common.py | 4 ++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/remote_ops.py b/src/remote_ops.py index 26da4f7..a524bda 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -824,13 +824,18 @@ def is_port_free(self, number: int) -> bool: def get_tempdir(self) -> str: command = ["mktemp", "-u", "-d"] - exec_exitcode, exec_output, exec_error = self.exec_command( + exec_r = self.exec_command( command, verbose=True, encoding=get_default_encoding(), - ignore_errors=True + ignore_errors=True, ) + assert type(exec_r) is tuple + assert len(exec_r) == 3 + + exec_exitcode, exec_output, exec_error = exec_r + assert type(exec_exitcode) is int assert type(exec_output) is str assert type(exec_error) is str diff --git a/tests/test_os_ops_common.py b/tests/test_os_ops_common.py index fb65fae..7e62758 100644 --- a/tests/test_os_ops_common.py +++ b/tests/test_os_ops_common.py @@ -900,7 +900,7 @@ def LOCAL_server(s: socket.socket): if ok_count == 0: raise RuntimeError("No one free port was found.") - def test_get_tmpdir(self, os_ops: OsOperations): + def test_get_tempdir(self, os_ops: OsOperations): assert isinstance(os_ops, OsOperations) dir = os_ops.get_tempdir() @@ -924,7 +924,7 @@ def test_get_tmpdir(self, os_ops: OsOperations): assert not os_ops.path_exists(file_path) assert not os.path.exists(file_path) - def test_get_tmpdir__compare_with_py_info(self, os_ops: OsOperations): + def test_get_tempdir__compare_with_py_info(self, os_ops: OsOperations): assert isinstance(os_ops, OsOperations) actual_dir = os_ops.get_tempdir() From 8b1d080b3021cd272d60e574352376f164d48751 Mon Sep 17 00:00:00 2001 From: "d.kovalenko" Date: Sun, 21 Jun 2026 17:41:22 +0300 Subject: [PATCH 9/9] RemoteOperations::is_port_free is updated --- src/remote_ops.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/remote_ops.py b/src/remote_ops.py index a524bda..a05e808 100644 --- a/src/remote_ops.py +++ b/src/remote_ops.py @@ -796,13 +796,18 @@ def is_port_free(self, number: int) -> bool: grep_cmd_s, ] - exit_status, output, error = self.exec_command( + exec_r = self.exec_command( cmd=cmd, encoding=get_default_encoding(), ignore_errors=True, - verbose=True + verbose=True, ) + assert type(exec_r) is tuple + assert len(exec_r) == 3 + + exit_status, output, error = exec_r + # grep exit 0 -> port is busy if exit_status == 0: return False