xbitinfo.xbitinfo.get_prefect_flow

xbitinfo.xbitinfo.get_prefect_flow(paths=[])

Create a prefect.Flow that runs the following steps for each file in paths:

  1. Analyse bitwise real information content with xbitinfo.xbitinfo.get_bitinformation()

  2. Retrieve keepbits with xbitinfo.xbitinfo.get_keepbits()

  3. Apply bitrounding with xbitinfo.bitround.xr_bitround()

  4. Save as compressed netcdf with xbitinfo.save_compressed.ToCompressed_Netcdf
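To make step 3 more concrete, here is a minimal NumPy sketch of what bitrounding does to a float32 array: it rounds each value to the nearest number representable with only `keepbits` mantissa bits, so the trailing bits become zero and compress well. The helper name `bitround_float32` is hypothetical and this is not xbitinfo's implementation; it only assumes IEEE 754 float32 layout (23 explicit mantissa bits).

```python
import numpy as np


def bitround_float32(values, keepbits):
    """Round float32 values to `keepbits` explicit mantissa bits.

    Hypothetical helper for illustration only (not part of xbitinfo).
    Uses round-to-nearest, ties-to-even, applied to the raw bit pattern.
    """
    if not 1 <= keepbits <= 22:
        raise ValueError("keepbits must be between 1 and 22 for float32")
    a = np.asarray(values, dtype=np.float32)
    bits = a.view(np.uint32)              # reinterpret the floats as raw bits
    drop = 23 - keepbits                  # number of trailing mantissa bits to zero
    mask = np.uint32((0xFFFFFFFF >> drop) << drop)
    half = np.uint32(1 << (drop - 1))     # half of the last kept bit
    lsb = (bits >> np.uint32(drop)) & np.uint32(1)   # last kept bit, for ties-to-even
    rounded = (bits + (half - np.uint32(1)) + lsb) & mask
    return rounded.view(np.float32)


x = np.array([1.0, np.pi, -2.71828, 1234.5678], dtype=np.float32)
print(bitround_float32(x, keepbits=7))
```

With `keepbits=7` the relative rounding error stays at or below 2**-8, while the 16 trailing mantissa bits of every value are zero.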

Many parameters can be changed when running the flow, e.g. flow.run(parameters=dict(chunk="auto")):

  - paths (list of paths): Paths to be bitrounded

Parameters:

paths (list) – List of paths of files to be processed by xbitinfo.xbitinfo.get_bitinformation(), xbitinfo.xbitinfo.get_keepbits(), xbitinfo.bitround.xr_bitround() and to_compressed_netcdf.

Returns:

prefect.Flow – See https://docs.prefect.io/core/concepts/flows.html#overview

Example

Imagine n files of identical structure, e.g. climate model output with one year per file:

>>> import xarray as xr
>>> ds = xr.tutorial.load_dataset("rasm")
>>> year, datasets = zip(*ds.groupby("time.year"))
>>> paths = [f"{y}.nc" for y in year]
>>> xr.save_mfdataset(datasets, paths)

Create the prefect.Flow and run it sequentially:

>>> import xbitinfo as xb
>>> flow = xb.get_prefect_flow(paths=paths)
>>> import prefect
>>> logger = prefect.context.get("logger")
>>> logger.setLevel("ERROR")
>>> st = flow.run()

Inspect the flow state:

>>> # flow.visualize(st)  # requires graphviz

Run in parallel with dask:

>>> import os  # https://docs.xarray.dev/en/stable/user-guide/dask.html
>>> os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
>>> from prefect.executors import DaskExecutor, LocalDaskExecutor
>>> from dask.distributed import Client
>>> client = Client(n_workers=2, threads_per_worker=1, processes=True)
>>> executor = DaskExecutor(
...     address=client.scheduler.address
... )  # take your own client
>>> executor = DaskExecutor()  # use dask from prefect
>>> executor = LocalDaskExecutor()  # use dask local from prefect
>>> # flow.run(executor=executor, parameters=dict(overwrite=True))

Modify parameters of a flow:

>>> flow.run(parameters=dict(inflevel=0.9999, overwrite=True))
<Success: "All reference tasks succeeded.">
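As a rough illustration of why raising inflevel changes the result, the sketch below picks the smallest number of mantissa bits whose cumulative information reaches a requested fraction. It assumes inflevel is the fraction of real information to preserve; the function name `keepbits_from_information` and the per-bit information values are made up for illustration, and xbitinfo.xbitinfo.get_keepbits() may differ in its details.

```python
import numpy as np


def keepbits_from_information(info_per_mantissa_bit, inflevel=0.99):
    """Smallest number of leading mantissa bits preserving `inflevel` of the information.

    Hypothetical helper for illustration only (not xbitinfo's implementation).
    """
    info = np.asarray(info_per_mantissa_bit, dtype=float)
    total = info.sum()
    if total <= 0:
        raise ValueError("information content must be positive")
    cdf = np.cumsum(info) / total                       # cumulative fraction of information
    first = int(np.searchsorted(cdf, inflevel, side="left"))  # first index with cdf >= inflevel
    return min(first + 1, info.size)                    # convert index to a bit count


# Made-up information content (in bits) for the first six mantissa bits.
info = [1.0, 0.9, 0.5, 0.1, 0.01, 0.001]
print(keepbits_from_information(info, inflevel=0.99))
print(keepbits_from_information(info, inflevel=0.9999))
```

In this toy input, a higher inflevel requires more bits to be kept, which trades compression for fidelity.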
