From db62d5f9f25f9d3013a5efa4ef7edb69ea9bd628 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Fri, 21 Mar 2025 12:31:58 -0700 Subject: [PATCH 1/5] Update to connexion 3 Co-authored-by: Michael R. Crusoe --- mypy-requirements.txt | 1 + pyproject.toml | 2 +- wes_service/arvados_wes.py | 2 +- wes_service/cwl_runner.py | 2 +- wes_service/toil_wes.py | 29 ++++++++++++-- wes_service/util.py | 68 +++++++++++++-------------------- wes_service/wes_service_main.py | 3 +- 7 files changed, 56 insertions(+), 51 deletions(-) diff --git a/mypy-requirements.txt b/mypy-requirements.txt index 4dfacc5..9fbd8f6 100644 --- a/mypy-requirements.txt +++ b/mypy-requirements.txt @@ -3,3 +3,4 @@ types-PyYAML types-requests types-setuptools arvados-cwl-runner +flask diff --git a/pyproject.toml b/pyproject.toml index ed9dffe..778228b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ classifiers = [ ] requires-python = ">=3.9" dependencies = [ - "connexion[swagger-ui] >= 2.0.2, < 3", + "connexion[swagger-ui] >= 3, < 4", "ruamel.yaml >= 0.15.78", "schema-salad", ] diff --git a/wes_service/arvados_wes.py b/wes_service/arvados_wes.py index e2369cc..7f1ecf6 100644 --- a/wes_service/arvados_wes.py +++ b/wes_service/arvados_wes.py @@ -298,7 +298,7 @@ def RunWorkflow( ) try: - tempdir, body = self.collect_attachments(cr["uuid"]) + tempdir, body = self.collect_attachments(args, cr["uuid"]) workflow_engine_parameters = cast( dict[str, Any], body.get("workflow_engine_parameters", {}) diff --git a/wes_service/cwl_runner.py b/wes_service/cwl_runner.py index 9fc31a6..af3400d 100644 --- a/wes_service/cwl_runner.py +++ b/wes_service/cwl_runner.py @@ -198,7 +198,7 @@ def ListRuns( def RunWorkflow(self, **args: str) -> dict[str, str]: """Submit the workflow run request.""" - tempdir, body = self.collect_attachments() + tempdir, body = self.collect_attachments(args) run_id = uuid.uuid4().hex job = Workflow(run_id) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 1461d04..8b1c9ed 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -1,3 +1,6 @@ +"""Toil backed for the WES service.""" + +import errno import json import logging import os @@ -294,18 +297,36 @@ def getstate(self) -> tuple[str, int]: logging.info("Workflow " + self.run_id + ": EXECUTOR_ERROR") open(self.staterrorfile, "a").close() return "EXECUTOR_ERROR", 255 + + # get the jobstore + with open(self.jobstorefile, "r") as f: + jobstore = f.read().rstrip() if ( subprocess.run( # nosec B603 [ shutil.which("toil") or "toil", "status", "--failIfNotComplete", - self.jobstorefile, + jobstore, ] ).returncode == 0 ): - completed = True + # Get the PID of the running process + with open(self.pidfile, "r") as f: + pid = int(f.read()) + try: + os.kill(pid, 0) + except OSError as e: + if e.errno == errno.ESRCH: + # Process is no longer running, could be completed + completed = True + # Reap zombie child processes in a non-blocking manner + os.waitpid(pid, os.WNOHANG) + else: + raise + # If no exception, process is still running + # We can't rely on toil status as the process may not have created the jobstore yet if completed: logging.info("Workflow " + self.run_id + ": COMPLETE") open(self.statcompletefile, "a").close() @@ -354,9 +375,9 @@ def ListRuns( workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA return {"workflows": workflows, "next_page_token": ""} - def RunWorkflow(self) -> dict[str, str]: + def RunWorkflow(self, **args: str) -> dict[str, str]: """Submit the workflow run request.""" - tempdir, body = self.collect_attachments() + tempdir, body = self.collect_attachments(args) run_id = uuid.uuid4().hex job = ToilWorkflow(run_id) diff --git a/wes_service/util.py b/wes_service/util.py index b710db5..b687490 100644 --- a/wes_service/util.py +++ b/wes_service/util.py @@ -4,7 +4,6 @@ import tempfile from typing import Any, Callable, Optional -import connexion # type: ignore[import-untyped] from werkzeug.utils import secure_filename @@ -49,52 +48,37 @@ def log_for_run(self, run_id: Optional[str], message: str) -> None: logging.info("Workflow %s: %s", run_id, message) def collect_attachments( - self, run_id: Optional[str] = None + self, args: dict[str, Any], run_id: Optional[str] = None ) -> tuple[str, dict[str, str]]: """Stage all attachments to a temporary directory.""" tempdir = tempfile.mkdtemp() body: dict[str, str] = {} has_attachments = False - for k, ls in connexion.request.files.lists(): - try: - for v in ls: - if k == "workflow_attachment": - sp = v.filename.split("/") - fn = [] - for p in sp: - if p not in ("", ".", ".."): - fn.append(secure_filename(p)) - dest = os.path.join(tempdir, *fn) - if not os.path.isdir(os.path.dirname(dest)): - os.makedirs(os.path.dirname(dest)) - self.log_for_run( - run_id, - f"Staging attachment {v.filename!r} to {dest!r}", - ) - v.save(dest) - has_attachments = True - body[k] = ( - "file://%s" % tempdir - ) # Reference to temp working dir. - elif k in ("workflow_params", "tags", "workflow_engine_parameters"): - content = v.read() - body[k] = json.loads(content.decode("utf-8")) - else: - body[k] = v.read().decode() - except Exception as e: - raise ValueError(f"Error reading parameter {k!r}: {e}") from e - for k, ls in connexion.request.form.lists(): - try: - for v in ls: - if not v: - continue - if k in ("workflow_params", "tags", "workflow_engine_parameters"): - body[k] = json.loads(v) - else: - body[k] = v - except Exception as e: - raise ValueError(f"Error reading parameter {k!r}: {e}") from e - + for k, v in args.items(): + if k == "workflow_attachment": + for file in v or []: + sp = file.filename.split("/") + fn = [] + for p in sp: + if p not in ("", ".", ".."): + fn.append(secure_filename(p)) + dest = os.path.join(tempdir, *fn) + if not os.path.isdir(os.path.dirname(dest)): + os.makedirs(os.path.dirname(dest)) + self.log_for_run( + run_id, + f"Staging attachment {file.filename!r} to {dest!r}", + ) + file.save(dest) + has_attachments = True + body["workflow_attachment"] = ( + "file://%s" % tempdir + ) # Reference to temp working dir. + elif k in ("workflow_params", "tags", "workflow_engine_parameters"): + if v is not None: + body[k] = json.loads(v) + else: + body[k] = v if "workflow_url" in body: if ":" not in body["workflow_url"]: if not has_attachments: diff --git a/wes_service/wes_service_main.py b/wes_service/wes_service_main.py index fa81888..6ef50b1 100644 --- a/wes_service/wes_service_main.py +++ b/wes_service/wes_service_main.py @@ -63,7 +63,6 @@ def get_parser() -> argparse.ArgumentParser: help="Example: '--opt runner=cwltoil --opt extra=--logLevel=CRITICAL' " "or '--opt extra=--workDir=/'. Accepts multiple values.", ) - parser.add_argument("--debug", action="store_true", default=False) parser.add_argument("--version", action="store_true", default=False) return parser @@ -78,7 +77,7 @@ def main(argv: list[str] = sys.argv[1:]) -> None: app = setup(args) - app.run(port=args.port, debug=args.debug) + app.run(port=args.port) if __name__ == "__main__": From 0f28c3feb4a45a035cca1489cd9a933fdd25584a Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 3 Apr 2025 16:11:21 -0700 Subject: [PATCH 2/5] Fix client request, fix dependencies, fix a bug in Makefile --- Makefile | 2 +- pyproject.toml | 2 +- test/test_integration.py | 2 -- wes_client/util.py | 17 ++++++++++------- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 553667a..5a3b861 100644 --- a/Makefile +++ b/Makefile @@ -150,7 +150,7 @@ test: $(PYSOURCES) FORCE ## testcov : run the wes-service test suite and collect coverage testcov: $(PYSOURCES) - python -m pytest -rsx --cov ${PYTEST_EXTRA} + python -m pytest ${PYTEST_EXTRA} -rsx --cov sloccount.sc: $(PYSOURCES) Makefile sloccount --duplicates --wide --details $^ > $@ diff --git a/pyproject.toml b/pyproject.toml index 778228b..2380d98 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,7 @@ classifiers = [ ] requires-python = ">=3.9" dependencies = [ - "connexion[swagger-ui] >= 3, < 4", + "connexion[swagger-ui,flask,uvicorn] >= 3, < 4", "ruamel.yaml >= 0.15.78", "schema-salad", ] diff --git a/test/test_integration.py b/test/test_integration.py index 0d6fd17..5929666 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -246,7 +246,6 @@ def setUp(self) -> None: "--opt", "runner=cwltool", "--port=8080", - "--debug", ] ) time.sleep(5) @@ -304,7 +303,6 @@ def setUp(self) -> None: os.path.abspath("wes_service/wes_service_main.py"), "--backend=wes_service.arvados_wes", "--port=8080", - "--debug", ] ) self.client.auth = { diff --git a/wes_client/util.py b/wes_client/util.py index 3e9672d..caf0410 100644 --- a/wes_client/util.py +++ b/wes_client/util.py @@ -128,7 +128,7 @@ def fixpaths(d: Any) -> None: def build_wes_request( workflow_file: str, json_path: str, attachments: Optional[list[str]] = None -) -> list[tuple[str, Any]]: +) -> tuple[list[tuple[str, Any]], list[tuple[str, Any]]]: """ :param workflow_file: Path to cwl/wdl file. Can be http/https/file. :param json_path: Path to accompanying json file. @@ -157,10 +157,12 @@ def build_wes_request( ("workflow_type_version", wf_version), ] + workflow_attachments = [] + if workflow_file.startswith("file://"): if wfbase is None: wfbase = os.path.dirname(workflow_file[7:]) - parts.append( + workflow_attachments.append( ( "workflow_attachment", (os.path.basename(workflow_file[7:]), open(workflow_file[7:], "rb")), @@ -182,9 +184,9 @@ def build_wes_request( attach_f = urlopen(attachment) # nosec B310 relpath = os.path.basename(attach_f) - parts.append(("workflow_attachment", (relpath, attach_f))) + workflow_attachments.append(("workflow_attachment", (relpath, attach_f))) - return parts + return parts, workflow_attachments def expand_globs(attachments: Optional[Union[list[str], str]]) -> set[str]: @@ -275,11 +277,12 @@ def run( :return: The body of the post result as a dictionary. """ attachments = list(expand_globs(attachments)) - parts = build_wes_request(wf, jsonyaml, attachments) + parts, attachments = build_wes_request(wf, jsonyaml, attachments) postresult = requests.post( # nosec B113 f"{self.proto}://{self.host}/ga4gh/wes/v1/runs", - files=parts, - headers=self.auth, + data=parts, + files=attachments, + # headers=self.auth, ) return wes_response(postresult) From a2179da62f988e153554e8ab8863fd2d8fbe7c70 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Thu, 3 Apr 2025 17:45:48 -0700 Subject: [PATCH 3/5] Bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 2380d98..8a97915 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "wes-service" -version = "4.0" +version = "5.0" authors = [{name = "GA4GH Containers and Workflows task team", email = "common-workflow-language@googlegroups.com"}] description = "GA4GH Workflow Execution Service reference implementation" classifiers = [ From 712a36fd8ebff863aa3a1f127cbffcd7af4e3cf1 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Wed, 9 Apr 2025 12:12:04 -0700 Subject: [PATCH 4/5] Fix exception handling --- wes_service/toil_wes.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/wes_service/toil_wes.py b/wes_service/toil_wes.py index 8b1c9ed..f6cbde6 100644 --- a/wes_service/toil_wes.py +++ b/wes_service/toil_wes.py @@ -322,7 +322,12 @@ def getstate(self) -> tuple[str, int]: # Process is no longer running, could be completed completed = True # Reap zombie child processes in a non-blocking manner - os.waitpid(pid, os.WNOHANG) + # os.WNOHANG still raises an error if no child processes exist + try: + os.waitpid(pid, os.WNOHANG) + except OSError as e: + if e.errno != errno.ECHILD: + raise else: raise # If no exception, process is still running From 4ae74039319777a950b792ad2a6ea979e5848ac9 Mon Sep 17 00:00:00 2001 From: stxue1 Date: Tue, 15 Apr 2025 18:08:12 -0700 Subject: [PATCH 5/5] Fix type error --- wes_client/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/wes_client/util.py b/wes_client/util.py index caf0410..862ec57 100644 --- a/wes_client/util.py +++ b/wes_client/util.py @@ -277,11 +277,11 @@ def run( :return: The body of the post result as a dictionary. """ attachments = list(expand_globs(attachments)) - parts, attachments = build_wes_request(wf, jsonyaml, attachments) + parts, files = build_wes_request(wf, jsonyaml, attachments) postresult = requests.post( # nosec B113 f"{self.proto}://{self.host}/ga4gh/wes/v1/runs", data=parts, - files=attachments, + files=files, # headers=self.auth, ) return wes_response(postresult)