Skip to content
131 changes: 95 additions & 36 deletions src/local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,26 @@ def create_clone(self) -> LocalOperations:
clone._username = self._username
return clone

@staticmethod
def _process_output(encoding, temp_file_path):
"""Process the output of a command from a temporary file."""
with open(temp_file_path, 'rb') as temp_file:
output = temp_file.read()
if encoding:
output = output.decode(encoding)
return output, None # In Windows stderr writing in stdout
_T_RUN_COMMAND__RESULT = typing.Union[
subprocess.Popen,
typing.Tuple[int, str, typing.Optional[str]],
typing.Tuple[int, bytes, typing.Optional[bytes]],
]

def _run_command__nt(
self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding,
self,
cmd: OsOperations.T_CMD,
shell,
input,
stdin,
stdout,
stderr,
get_process,
timeout,
encoding,
exec_env: typing.Optional[dict],
cwd: typing.Optional[str],
):
) -> _T_RUN_COMMAND__RESULT:
assert exec_env is None or type(exec_env) is dict
assert cwd is None or type(cwd) is str

Expand Down Expand Up @@ -157,21 +163,37 @@ def _run_command__nt(
cwd=cwd,
**extParams,
)
assert isinstance(process, subprocess.Popen)
if get_process:
return process, None, None
return process
temp_file_path = temp_file.name

# Wait process finished
process.wait()

output, error = self._process_output(encoding, temp_file_path)
return process, output, error
# Process the output of a command from a temporary file.
# In Windows stderr writing in stdout
with open(temp_file_path, 'rb') as temp_file:
output = temp_file.read()
if encoding:
return process.returncode, output.decode(encoding), None

return process.returncode, output, None

def _run_command__generic(
self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding,
self,
cmd: OsOperations.T_CMD,
shell,
input,
stdin,
stdout,
stderr,
get_process,
timeout,
encoding,
exec_env: typing.Optional[dict],
cwd: typing.Optional[str],
):
) -> _T_RUN_COMMAND__RESULT:
assert exec_env is None or type(exec_env) is dict
assert cwd is None or type(cwd) is str

Expand Down Expand Up @@ -214,8 +236,9 @@ def _run_command__generic(
**extParams
)
assert process is not None
assert isinstance(process, subprocess.Popen)
if get_process:
return process, None, None
return process
try:
output, error = process.communicate(input=input_prepared, timeout=timeout)
except subprocess.TimeoutExpired:
Expand All @@ -228,13 +251,22 @@ def _run_command__generic(
if encoding:
output = output.decode(encoding)
error = error.decode(encoding)
return process, output, error
return process.returncode, output, error

def _run_command(
self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding,
self,
cmd: OsOperations.T_CMD,
shell,
input,
stdin,
stdout,
stderr,
get_process,
timeout,
encoding,
exec_env: typing.Optional[dict],
cwd: typing.Optional[str],
):
) -> _T_RUN_COMMAND__RESULT:
"""Execute a command and return the process and its output."""

assert exec_env is None or type(exec_env) is dict
Expand All @@ -248,12 +280,24 @@ def _run_command(
return method(self, cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding, exec_env, cwd)

def exec_command(
self, cmd, wait_exit=False, verbose=False, expect_error=False, encoding=None, shell=False,
text=False, input=None, stdin=None, stdout=None, stderr=None, get_process=False, timeout=None,
self,
cmd: OsOperations.T_CMD,
wait_exit=False,
verbose=False,
expect_error=False,
encoding: typing.Optional[str] = None,
shell=False,
text=False,
input=None,
stdin=None,
stdout=None,
stderr=None,
get_process=False,
timeout=None,
ignore_errors=False,
exec_env: typing.Optional[dict] = None,
cwd: typing.Optional[str] = None
):
) -> OsOperations.T_EXEC_COMMAND_RESULT:
"""
Execute a command in a subprocess and handle the output based on the provided parameters.
"""
Expand All @@ -262,37 +306,52 @@ def exec_command(
assert exec_env is None or type(exec_env) is dict
assert cwd is None or type(cwd) is str

process, output, error = self._run_command(
cmd, shell, input, stdin, stdout, stderr, get_process, timeout, encoding,
exec_env,
cwd
run_r = self._run_command(
cmd=cmd,
shell=shell,
input=input,
stdin=stdin,
stdout=stdout,
stderr=stderr,
get_process=get_process,
timeout=timeout,
encoding=encoding,
exec_env=exec_env,
cwd=cwd,
)

if get_process:
return process
assert isinstance(run_r, subprocess.Popen)
return run_r

assert type(run_r) is tuple
assert len(run_r) == 3
assert type(run_r[0]) is int
assert type(run_r[1]) is not None

if expect_error:
if process.returncode == 0:
if run_r[0] == 0:
raise InvalidOperationException("We expected an execution error.")
elif ignore_errors:
pass
elif process.returncode == 0:
elif run_r[0] == 0:
pass
else:
assert not expect_error
assert not ignore_errors
assert process.returncode != 0
assert run_r[0] != 0

RaiseError.UtilityExitedWithNonZeroCode(
cmd=cmd,
exit_code=process.returncode,
msg_arg=error or output,
error=error,
out=output)
exit_code=run_r[0],
msg_arg=run_r[2] or run_r[1],
error=run_r[2],
out=run_r[1])

if verbose:
return process.returncode, output, error
return run_r

return output
return run_r[1]

def build_path(self, a: str, *parts: str) -> str:
assert a is not None
Expand Down
39 changes: 38 additions & 1 deletion src/os_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import locale
import typing
import signal as os_signal
import subprocess


class ConnectionParams:
Expand Down Expand Up @@ -42,7 +43,43 @@ def create_clone(self) -> OsOperations:
raise NotImplementedError()

# Command execution
def exec_command(self, cmd, **kwargs):
T_CMD = typing.Union[str, typing.List[str]]
T_EXEC_COMMAND_RESULT = typing.Union[
subprocess.Popen,
str,
bytes,
typing.Tuple[int, str, typing.Optional[str]],
typing.Tuple[int, bytes, typing.Optional[bytes]],
]

def exec_command(
self,
cmd: T_CMD,
wait_exit=False,
verbose=False,
expect_error=False,
encoding: typing.Optional[str] = None,
shell=False,
text=False,
input=None,
stdin=None,
stdout=None,
stderr=None,
get_process=False,
timeout=None,
ignore_errors=False,
exec_env: typing.Optional[dict] = None,
cwd: typing.Optional[str] = None
) -> T_EXEC_COMMAND_RESULT:
assert type(cmd) is str or type(cmd) is list
assert type(verbose) is bool
assert type(expect_error) is bool
assert encoding is None or type(encoding) is str
assert type(wait_exit) is bool
assert type(get_process) is bool
assert type(ignore_errors) is bool
assert exec_env is None or type(exec_env) is dict
assert cwd is None or type(cwd) is dict
raise NotImplementedError()

def build_path(self, a: str, *parts: str) -> str:
Expand Down
Loading