Source code for ewokscore.scripttask
import os
import sys
import subprocess
from .task import Task
SCRIPT_ARGUMENT = "_script"
WIN32 = sys.platform == "win32"
[docs]
class ScriptExecutorTask(
Task,
input_names=[SCRIPT_ARGUMENT],
optional_input_names=["_capture_output", "_merge_err", "_raise_on_error"],
output_names=["return_code", "out", "err"],
):
SCRIPT_ARGUMENT = SCRIPT_ARGUMENT
[docs]
def run(self):
fullname = self.inputs._script
if not isinstance(fullname, str):
raise TypeError(fullname, type(fullname))
# Python or shell script
is_python = fullname.endswith(".py")
# Is script executable?
if os.path.isfile(fullname):
# existing python or shell script
fullname = os.path.abspath(fullname)
if WIN32:
is_executable = not is_python
else:
with open(fullname, "r") as f:
is_executable = f.readline().startswith("#!")
else:
# command (although it could be a script that does not exist)
is_executable = True
fullname = fullname.split(" ")
# Select executable when fullname itself is not executable
executable = None
if not is_executable:
if is_python:
executable = sys.executable
elif not WIN32:
executable = "bash"
# Command starts with "[executable] fullname ..."
cmd = []
if executable:
cmd.append(executable)
if isinstance(fullname, str):
cmd.append(fullname)
else:
cmd.extend(fullname)
# Script/command arguments
if is_python:
# Use full parameter name
argmarker = "--"
else:
# Use getopts-style parameter parsing by the script
argmarker = "-"
skip = self.input_names()
for k, v in self.get_input_values().items():
if k not in skip:
cmd.extend((argmarker + k, str(v)))
# Run
stdout = stderr = None
if self.inputs._capture_output:
stdout = subprocess.PIPE
if self.inputs._merge_err:
stderr = subprocess.STDOUT
else:
stderr = subprocess.PIPE
result = subprocess.run(cmd, cwd=os.getcwd(), stdout=stdout, stderr=stderr)
if self.inputs._raise_on_error:
result.check_returncode()
self.outputs.return_code = result.returncode
if result.stdout:
self.outputs.out = result.stdout.decode()
elif self.inputs._capture_output:
self.outputs.out = ""
else:
self.outputs.out = None
if result.stderr:
self.outputs.err = result.stderr.decode()
elif self.inputs._capture_output and not self.inputs._merge_err:
self.outputs.err = ""
else:
self.outputs.err = None