Skip to content

Commit 5521eef

Browse files
committed
fix: syntax sugar for python op template and task
Signed-off-by: zjgemi <liuxin_zijian@163.com>
1 parent 955db41 commit 5521eef

File tree

6 files changed

+55
-63
lines changed

6 files changed

+55
-63
lines changed

examples/test_dag.py

+4-6
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from dflow import (DAG, InputArtifact, InputParameter, OutputArtifact,
1+
from dflow import (InputArtifact, InputParameter, OutputArtifact,
22
OutputParameter, ShellOPTemplate, Task, Workflow)
33

44
if __name__ == "__main__":
@@ -21,14 +21,12 @@
2121
duplicate.inputs.artifacts = {"foo": InputArtifact(path="/tmp/foo.txt")}
2222
duplicate.outputs.artifacts = {"bar": OutputArtifact(path="/tmp/bar.txt")}
2323

24-
dag = DAG()
24+
wf = Workflow(name="dag")
2525
hello0 = Task(name="hello0", template=hello)
26-
dag.add(hello0)
26+
wf.add(hello0)
2727
hello1 = Task(name="hello1",
2828
template=duplicate,
2929
parameters={"msg": hello0.outputs.parameters["msg"]},
3030
artifacts={"foo": hello0.outputs.artifacts["bar"]})
31-
dag.add(hello1)
32-
33-
wf = Workflow(name="dag", dag=dag)
31+
wf.add(hello1)
3432
wf.submit()

src/dflow/plugins/datasets.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def render(self, template: PythonOPTemplate, name: str
200200
script += " && rclone mount %s@%s: /launching/%s" % (
201201
self.element, self.version, name)
202202
template.sidecars.append(V1alpha1UserContainer(
203-
name="rclone-%s" % name,
203+
name="rclone-%s" % name.replace("_", "-"),
204204
image=self.rclone_image,
205205
image_pull_policy=self.rclone_image_pull_policy,
206206
command=["sh", "-c"],

src/dflow/python/op.py

+22-2
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,16 @@
66
import logging
77
import os
88
from abc import ABC
9+
from functools import partial
910
from pathlib import Path
1011
from typing import Dict, List, Set, Union
1112

1213
from typeguard import check_type
1314

1415
from ..argo_objects import ArgoObjectDict
1516
from ..config import config
16-
from ..utils import get_key, s3_config
17+
from ..context_syntax import GLOBAL_CONTEXT
18+
from ..utils import get_key, randstr, s3_config
1719
from .opio import (OPIO, Artifact, BigParameter, OPIOSign, Parameter,
1820
type_to_str)
1921

@@ -181,7 +183,9 @@ def _check_signature(
181183
check_type(ii, io, ss)
182184

183185
@classmethod
184-
def function(cls, func):
186+
def function(cls, func=None, **kwargs):
187+
if func is None:
188+
return partial(cls.function, **kwargs)
185189
signature = func.__annotations__
186190
return_type = signature.get('return', None)
187191

@@ -220,6 +224,22 @@ def execute(self, op_in):
220224
return op_out
221225

222226
def __call__(self, **op_in):
227+
if GLOBAL_CONTEXT.in_context:
228+
from .python_op_template import PythonOPTemplate
229+
from ..task import Task
230+
input_sign = self.get_input_sign()
231+
parameters = {k: v for k, v in op_in.items()
232+
if not isinstance(input_sign[k], Artifact)}
233+
artifacts = {k: v for k, v in op_in.items()
234+
if isinstance(input_sign[k], Artifact)}
235+
name = func.__name__.lower().replace("_", "-") + "-" + \
236+
randstr()
237+
task = Task(name,
238+
template=PythonOPTemplate(self, **kwargs),
239+
parameters=parameters, artifacts=artifacts)
240+
op_out = {**task.outputs.parameters,
241+
**task.outputs.artifacts}
242+
return op_out
223243
return self.execute(op_in)
224244

225245
subclass.func = func

src/dflow/python/python_op_template.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,8 @@ def __init__(self,
212212
for name, global_name in output_artifact_global_name.items():
213213
output_sign[name].global_name = global_name
214214
super().__init__(
215-
name="%s-%s" % (class_name, randstr()), inputs=Inputs(),
216-
outputs=Outputs(), volumes=volumes, mounts=mounts,
215+
name="%s-%s" % (class_name.replace("_", "-"), randstr()),
216+
inputs=Inputs(), outputs=Outputs(), volumes=volumes, mounts=mounts,
217217
requests=requests, limits=limits, envs=envs,
218218
init_containers=init_containers, sidecars=sidecars)
219219
self.pre_script = pre_script

src/dflow/workflow.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def __init__(
155155
assert isinstance(dag, DAG)
156156
self.entrypoint = dag
157157
else:
158-
self.entrypoint = Steps(self.name + "-steps")
158+
self.entrypoint = None
159159
self.templates = {}
160160
self.argo_templates = {}
161161
self.pvcs = {}
@@ -180,6 +180,7 @@ def __enter__(self) -> 'Workflow':
180180

181181
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
182182
GLOBAL_CONTEXT.in_context = False
183+
self.submit()
183184

184185
def add(
185186
self,
@@ -192,6 +193,12 @@ def add(
192193
step: a step or a list of parallel steps to be added to the
193194
entrypoint of the workflow
194195
"""
196+
if self.entrypoint is None:
197+
if isinstance(step, Task) or (isinstance(step, list) and all(
198+
[isinstance(s, Task) for s in step])):
199+
self.entrypoint = DAG(self.name + "-dag")
200+
else:
201+
self.entrypoint = Steps(self.name + "-steps")
195202
self.entrypoint.add(step)
196203

197204
def submit(

tutorials/dflow-function.ipynb

+18-51
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
"cell_type": "markdown",
2525
"metadata": {},
2626
"source": [
27-
"##### In this tutorial, we will complete the task using `PythonOPTemplate` and function OP"
27+
"##### In this tutorial, we will complete the task using function OP"
2828
]
2929
},
3030
{
@@ -37,14 +37,15 @@
3737
},
3838
{
3939
"cell_type": "code",
40-
"execution_count": 1,
40+
"execution_count": null,
4141
"metadata": {},
4242
"outputs": [],
4343
"source": [
44+
"import sys\n",
4445
"from pathlib import Path\n",
4546
"\n",
46-
"from dflow import Step, Workflow\n",
47-
"from dflow.python import OP, Artifact, PythonOPTemplate"
47+
"from dflow import Workflow\n",
48+
"from dflow.python import OP, Artifact"
4849
]
4950
},
5051
{
@@ -54,7 +55,7 @@
5455
"source": [
5556
"For step 1: \n",
5657
"\n",
57-
"This class can achieve the function to write files. In the example, we try to write a file containing message of string format, and output a number of int format.\n",
58+
"This OP is to write files. In the example, we try to write a file containing message of string format, and output a number of int format.\n",
5859
"- input:\n",
5960
" - \"msg\": the input message\n",
6061
"- output:\n",
@@ -64,12 +65,12 @@
6465
},
6566
{
6667
"cell_type": "code",
67-
"execution_count": 2,
68+
"execution_count": null,
6869
"metadata": {},
6970
"outputs": [],
7071
"source": [
71-
"@OP.function\n",
72-
"def WriteFile(msg: str) -> {\"out_art\": Artifact(Path), \"length\": int}:\n",
72+
"@OP.function(image=f\"python:{sys.version_info.major}.{sys.version_info.minor}\")\n",
73+
"def write_file(msg: str) -> {\"out_art\": Artifact(Path), \"length\": int}:\n",
7374
" with open(\"msg.txt\",\"w\") as f:\n",
7475
" f.write(msg)\n",
7576
" \n",
@@ -91,12 +92,12 @@
9192
},
9293
{
9394
"cell_type": "code",
94-
"execution_count": 3,
95+
"execution_count": null,
9596
"metadata": {},
9697
"outputs": [],
9798
"source": [
98-
"@OP.function\n",
99-
"def Duplicate(in_art: Artifact(Path), in_num: int) -> {\"out_art\": Artifact(Path), \"out_num\": int}:\n",
99+
"@OP.function(image=f\"python:{sys.version_info.major}.{sys.version_info.minor}\")\n",
100+
"def duplicate(in_art: Artifact(Path), in_num: int) -> {\"out_art\": Artifact(Path), \"out_num\": int}:\n",
100101
" with open(in_art, \"r\") as f:\n",
101102
" content = f.read()\n",
102103
" with open(\"bar.txt\", \"w\") as f:\n",
@@ -113,52 +114,18 @@
113114
"cell_type": "markdown",
114115
"metadata": {},
115116
"source": [
116-
"After defining the PythonOPTemplate, it is to define steps.\n",
117-
"- step0 is to write a file and to output length of the content using WriteFile OP and python image\n",
118-
" - parameters:\n",
119-
" - \"msg\": \"HelloWorld!\"\n",
120-
"- step1 is to duplicate the content in the file and to duplicate the number using Duplicate OP and python image\n",
121-
" - artifacts:\n",
122-
" - in_art file is from step0.outputs.artifacts[\"out_art\"]\n",
123-
" - parameters:\n",
124-
" - in_num is from step0.outputs.parameters[\"length\"]\n",
125-
"\n",
126-
"Finally, we need to set up a Workflow named \"python\" and then add step0 and step1.\n",
127-
"\n",
128-
"`wf.submit` is to submit this workflow to Argo."
117+
"After defining OPs, call the OPs in series in the context of a workflow, which will connect them together as a workflow and submit it finally."
129118
]
130119
},
131120
{
132121
"cell_type": "code",
133-
"execution_count": 4,
122+
"execution_count": null,
134123
"metadata": {},
135-
"outputs": [
136-
{
137-
"name": "stdout",
138-
"output_type": "stream",
139-
"text": [
140-
"Workflow has been submitted (ID: python-wxj66, UID: 600f99da-21ee-40af-9a3b-c738e46556b8)\n"
141-
]
142-
}
143-
],
124+
"outputs": [],
144125
"source": [
145-
"import sys\n",
146-
"step0 = Step(\n",
147-
" name=\"step0\",\n",
148-
" template=PythonOPTemplate(WriteFile, image=f\"python:{sys.version_info.major}.{sys.version_info.minor}\"),\n",
149-
" parameters={\"msg\": \"HelloWorld!\"},\n",
150-
")\n",
151-
"\n",
152-
"step1 = Step(\n",
153-
" name=\"step1\",\n",
154-
" template=PythonOPTemplate(Duplicate, image=f\"python:{sys.version_info.major}.{sys.version_info.minor}\"),\n",
155-
" parameters={\"in_num\": step0.outputs.parameters[\"length\"]},\n",
156-
" artifacts={\"in_art\": step0.outputs.artifacts[\"out_art\"]},\n",
157-
")\n",
158-
"wf = Workflow(name=\"python\")\n",
159-
"wf.add(step0)\n",
160-
"wf.add(step1)\n",
161-
"wf.submit();"
126+
"with Workflow(name=\"python\") as wf:\n",
127+
" out = write_file(msg=\"HelloWorld!\")\n",
128+
" duplicate(in_num=out[\"length\"], in_art=out[\"out_art\"])"
162129
]
163130
}
164131
],

0 commit comments

Comments
 (0)