Skip to content

Commit 8f9d64e

Browse files
committed
Default TTL seconds after finished to 3 days after data flow completes
Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
1 parent 80e9b37 commit 8f9d64e

File tree

5 files changed

+123
-26
lines changed

5 files changed

+123
-26
lines changed

examples/kubernetes/create_dataset_and_runtime.py

Lines changed: 98 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,13 @@ def init_logger(logger_name, logger_level):
1919
logger.setLevel(logger_level)
2020

2121

22-
# Example for data experts like data scientists and data engineers.
23-
def main():
24-
global logger
25-
init_logger("fluidsdk", logging.INFO)
26-
27-
client_config = ClientConfig()
28-
fluid_client = FluidClient(client_config)
29-
22+
def create_dataset_with_alluxio(fluid_client: FluidClient):
3023
dataset_name = "demo"
3124
try:
25+
# Mounting WebUFS to Alluxio
3226
fluid_client.create_dataset(dataset_name, "hbase", "https://mirrors.bit.edu.cn/apache/hbase/stable/", "/hbase")
3327
except Exception as e:
34-
raise RuntimeError("f""Failed to create dataset: {e}")
28+
raise RuntimeError(f"Failed to create dataset: {e}")
3529

3630
logger.info(f"Dataset \"{dataset_name}\" created successfully")
3731

@@ -45,6 +39,100 @@ def main():
4539
logger.info(f"AlluxioRuntime created and bound to dataset \"{dataset_name}\", cache engine is now ready")
4640

4741

42+
def create_dataset_with_jindofs(fluid_client: FluidClient):
    """Create a dataset backed by an OSS bucket and bind a JindoRuntime cache to it.

    Parameters
    ----------
    fluid_client: FluidClient
        Client used to talk to the Fluid control plane.

    Raises
    ------
    RuntimeError
        If the dataset cannot be created or retrieved.
    """
    dataset_name = "demo"
    try:
        # Mounting OSS Bucket to Jindo
        fluid_client.create_dataset(dataset_name,
                                    "mybucket", "oss://mybucket/subdir", "/",
                                    options={"fs.oss.endpoint": "oss-cn-beijing-internal.aliyuncs.com"},
                                    cred_secret_name="access-key",
                                    cred_secret_options={
                                        "fs.oss.accessKeyId": "fs.oss.accessKeyId",
                                        "fs.oss.accessKeySecret": "fs.oss.accessKeySecret"
                                    })
    except Exception as e:
        # Fix: the original raised RuntimeError("f""Failed...{e}") -- two adjacent
        # plain string literals, so {e} was never interpolated. Use a real f-string
        # and chain the cause so the full traceback is preserved.
        raise RuntimeError(f"Failed to create dataset: {e}") from e

    logger.info(f"Dataset \"{dataset_name}\" created successfully")

    try:
        dataset = fluid_client.get_dataset(dataset_name)
    except Exception as e:
        raise RuntimeError(f"Failed to get dataset: {e}") from e

    logger.info(f"Binding JindoRuntime to dataset \"{dataset_name}\"...")
    dataset.bind_runtime(runtime_type="jindo", replicas=1, cache_medium="MEM", cache_capacity_GiB=2, wait=True)
    logger.info(f"JindoRuntime created and bound to dataset \"{dataset_name}\", cache engine is now ready")
67+
68+
69+
def create_dataset_with_juicefs(fluid_client: FluidClient):
    """Create a dataset on JuiceFS (MinIO as object storage, Redis as meta server)
    and bind a JuiceFSRuntime cache to it.

    Parameters
    ----------
    fluid_client: FluidClient
        Client used to talk to the Fluid control plane.

    Raises
    ------
    RuntimeError
        If the dataset cannot be created or retrieved.
    """
    dataset_name = "demo"
    try:
        # Setting minio as JuiceFS's backend storage and redis as JuiceFS's meta server
        fluid_client.create_dataset(dataset_name,
                                    "minio", "juicefs:///", "/",
                                    options={"bucket": "http://minio:9000/minio/test", "storage": "minio"},
                                    cred_secret_name="jfs-secret",
                                    cred_secret_options={
                                        "metaurl": "metaurl",
                                        "access-key": "access-key",
                                        "secret-key": "secret-key"
                                    })
    except Exception as e:
        # Fix: the original raised RuntimeError("f""Failed...{e}") -- two adjacent
        # plain string literals, so {e} was never interpolated. Use a real f-string
        # and chain the cause so the full traceback is preserved.
        raise RuntimeError(f"Failed to create dataset: {e}") from e

    logger.info(f"Dataset \"{dataset_name}\" created successfully")

    try:
        dataset = fluid_client.get_dataset(dataset_name)
    except Exception as e:
        raise RuntimeError(f"Failed to get dataset: {e}") from e

    logger.info(f"Binding JuiceFSRuntime to dataset \"{dataset_name}\"...")
    dataset.bind_runtime(runtime_type="juicefs", replicas=1, cache_medium="MEM", cache_capacity_GiB=2, wait=True)
    logger.info(f"JuiceFSRuntime created and bound to dataset \"{dataset_name}\", cache engine is now ready")
95+
96+
97+
def create_dataset_with_vineyard(fluid_client: FluidClient):
    """Create a (mount-less) dataset and bind a VineyardRuntime cache to it.

    Parameters
    ----------
    fluid_client: FluidClient
        Client used to talk to the Fluid control plane.

    Raises
    ------
    RuntimeError
        If the dataset cannot be created or retrieved.
    """
    dataset_name = "vineyard"
    try:
        fluid_client.create_dataset(dataset_name)
    except Exception as e:
        # Fix: the original raised RuntimeError("f""Failed...{e}") -- two adjacent
        # plain string literals, so {e} was never interpolated. Use a real f-string
        # and chain the cause so the full traceback is preserved.
        raise RuntimeError(f"Failed to create dataset: {e}") from e

    logger.info(f"Dataset \"{dataset_name}\" created successfully")

    try:
        dataset = fluid_client.get_dataset(dataset_name)
    except Exception as e:
        raise RuntimeError(f"Failed to get dataset: {e}") from e

    logger.info(f"Binding VineyardRuntime to dataset \"{dataset_name}\"...")
    dataset.bind_runtime(runtime_type="vineyard", replicas=1, cache_medium="MEM", cache_capacity_GiB=2, wait=True)
    logger.info(f"VineyardRuntime created and bound to dataset \"{dataset_name}\", cache engine is now ready")
114+
115+
116+
# Examples for data experts like data scientists and data engineers.
117+
def main():
    """Set up logging, construct a Fluid client, and run one cache-engine example.

    Switch ``selected`` below to try a different cache engine.
    """
    global logger
    init_logger("fluidsdk", logging.INFO)

    fluid_client = FluidClient(ClientConfig())

    # Dispatch table: cache-engine name -> example function exercising it.
    examples = {
        "alluxio": create_dataset_with_alluxio,
        "jindofs": create_dataset_with_jindofs,
        "juicefs": create_dataset_with_juicefs,
        "vineyard": create_dataset_with_vineyard
    }

    # Change selected to play with different cache engines
    selected = "alluxio"
    examples[selected](fluid_client)
134+
135+
48136
# Example for Kubernetes experts who is familiar with YAML-like APIs.
49137
def main_k8s_client():
50138
global logger
@@ -90,7 +178,7 @@ def main_k8s_client():
90178
levels=[
91179
models.Level(
92180
mediumtype="MEM",
93-
volume_type="hostPath",
181+
volume_type="emptyDir",
94182
path="/dev/shm",
95183
quota="2Gi",
96184
high="0.95",

examples/kubernetes/fluid_dataflow.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
from fluid import constants
1818
from fluid import models
19-
from fluid import FluidK8sClient
19+
from fluid import FluidClient, ClientConfig
2020

2121
logger = logging.getLogger("fluidsdk")
2222
stream_handler = logging.StreamHandler(sys.stdout)
@@ -71,7 +71,7 @@ def build_data_processor():
7171

7272
def main():
7373
# Initialize Fluid client
74-
fluid_client = FluidK8sClient()
74+
fluid_client = FluidClient(ClientConfig())
7575
# Get a bound dataset for later operations and dataflow
7676
dataset = fluid_client.get_dataset("demo-dataset")
7777

@@ -85,12 +85,14 @@ def main():
8585
print(op_status)
8686

8787
# Flow example 3: migrate some data to the dataset, preload it and finally process it.
88-
# Then wait until the flow is completed
89-
flow3 = dataset.migrate("/ossdata", constants.DATA_MIGRATE_DIRECTION_FROM,
90-
models.ExternalStorage(uri="oss://my-bucket/my-data",
91-
encrypt_options=get_encrypt_options())) \
92-
.preload("/ossdata").process(dataset_mountpath="/data", processor=build_data_processor()).run(
93-
run_id="testflow3")
88+
# The following code waits until the flow is completed.
89+
# After 24 hours, all the operations in the flow will be automatically garbage collected.
90+
flow3 = dataset \
91+
.migrate("/ossdata", constants.DATA_MIGRATE_DIRECTION_FROM,
92+
models.ExternalStorage(uri="oss://my-bucket/my-data", encrypt_options=get_encrypt_options())) \
93+
.preload("/ossdata") \
94+
.process(dataset_mountpath="/data", processor=build_data_processor()) \
95+
.run(run_id="testflow3", ttl_seconds_after_finished=86400)
9496
flow3.wait()
9597

9698

fluid/api/fluid_dataflow.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,14 +208,16 @@ def process(self, dataset_mountpath, processor, sub_path=None):
208208
self.flow_ops.append(op)
209209
return self
210210

211-
def run(self, run_id):
211+
def run(self, run_id, ttl_seconds_after_finished=constants.DEFAULT_DATAFLOW_TIME_TO_LIVE):
212212
"""
213213
Execute the dataflow by submitting all the defined operations to the cluster.
214214
215215
Parameters
216216
----------
217217
run_id: str
218218
The run id of the dataflow.
219+
ttl_seconds_after_finished: int
220+
Seconds to live after finished for each operation in the dataflow.
219221
220222
Returns
221223
-------
@@ -246,6 +248,7 @@ def run(self, run_id):
246248
namespace=last_step.metadata.namespace,
247249
kind=last_step.kind
248250
)
251+
op.spec.ttl_seconds_after_finished = ttl_seconds_after_finished
249252
self.fluid_client.create_data_operation(op)
250253
last_step = op
251254
return FlowHandle(run_id, steps_to_execute)

fluid/api/fluid_k8s_client.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ def create_data_operation(self, data_op: constants.DATA_OPERATION_MODELS_TYPE, n
140140
if wait:
141141
self.wait_data_operation_completed(data_op.metadata.name, data_op.kind, namespace)
142142

143-
def get_dataset(self, name, namespace=None, timeout=constants.DEFAULT_TIMEOUT) -> models.Dataset:
143+
def get_dataset(self, name, namespace=None, timeout=constants.DEFAULT_API_REQUEST_TIMEOUT) -> models.Dataset:
144144
namespace = namespace or self.namespace
145145

146146
try:
@@ -168,7 +168,7 @@ def get_dataset(self, name, namespace=None, timeout=constants.DEFAULT_TIMEOUT) -
168168
return dataset
169169

170170
def get_runtime(self, name, runtime_type=None, namespace=None,
171-
timeout=constants.DEFAULT_TIMEOUT) -> constants.RUNTIME_MODELS_TYPE:
171+
timeout=constants.DEFAULT_API_REQUEST_TIMEOUT) -> constants.RUNTIME_MODELS_TYPE:
172172
namespace = namespace or self.namespace
173173
runtime_kind = utils.infer_runtime_kind(runtime_type) or self.default_runtime_kind
174174
if runtime_kind is None:
@@ -199,7 +199,7 @@ def get_runtime(self, name, runtime_type=None, namespace=None,
199199

200200
return runtime
201201

202-
def get_data_operation(self, name, data_op_type, namespace=None, timeout=constants.DEFAULT_TIMEOUT):
202+
def get_data_operation(self, name, data_op_type, namespace=None, timeout=constants.DEFAULT_API_REQUEST_TIMEOUT):
203203
namespace = namespace or self.namespace
204204
data_op_kind = utils.infer_data_operation_kind(data_op_type)
205205
if data_op_kind is None:
@@ -280,7 +280,8 @@ def wait_dataset_bound(self, name, namespace=None, poll_timeout=constants.DEFAUL
280280
if poll >= poll_timeout:
281281
raise TimeoutError(f"TimeoutError: Timed out when waiting dataset \"{namespace}/{name}\" bound")
282282

283-
def delete_dataset(self, name, namespace=None, wait_until_cleaned_up=False, timeout=constants.DEFAULT_TIMEOUT,
283+
def delete_dataset(self, name, namespace=None, wait_until_cleaned_up=False,
284+
timeout=constants.DEFAULT_API_REQUEST_TIMEOUT,
284285
**kwargs):
285286
namespace = namespace or self.namespace
286287

@@ -330,7 +331,7 @@ def delete_dataset(self, name, namespace=None, wait_until_cleaned_up=False, time
330331
wait_until_cleaned_up=wait_until_cleaned_up, timeout=timeout, **kwargs)
331332

332333
def delete_runtime(self, name, runtime_type=None, namespace=None, wait_until_cleaned_up=False,
333-
timeout=constants.DEFAULT_TIMEOUT, **kwargs):
334+
timeout=constants.DEFAULT_API_REQUEST_TIMEOUT, **kwargs):
334335
namespace = namespace or self.namespace
335336
runtime_kind = utils.infer_runtime_kind(runtime_type) or self.default_runtime_kind
336337
if runtime_kind is None:

fluid/constants/constants.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,13 @@
1616

1717
from fluid import models
1818

19-
DEFAULT_TIMEOUT = 300
20-
19+
# ------ Common Constants -------
20+
DEFAULT_API_REQUEST_TIMEOUT = 300
2121
DEFAULT_POLL_TIMEOUT = 600
2222
DEFAULT_POLL_INTERVAL = 3
23+
DEFAULT_DATAFLOW_TIME_TO_LIVE = 86400 * 3 # 3 days
2324

25+
# ------ Fluid Types -------
2426
GROUP = "data.fluid.io"
2527
VERSION = "v1alpha1"
2628
API_VERSION = f"{GROUP}/{VERSION}"
@@ -125,5 +127,6 @@
125127
for kind, params in DATA_OPERATION_PARAMETERS.items():
126128
FLUID_CRD_PARAMETERS[kind] = params
127129

130+
# ------ DataMigrate Constants ------
128131
DATA_MIGRATE_DIRECTION_FROM = "from"
129132
DATA_MIGRATE_DIRECTION_TO = "to"

0 commit comments

Comments
 (0)