xbitinfo.xbitinfo.get_prefect_flow


xbitinfo.xbitinfo.get_prefect_flow(paths=[])

Create a prefect.Flow that processes the files in paths in four steps:

  1. Analyse bitwise real information content with xbitinfo.xbitinfo.get_bitinformation()

  2. Retrieve keepbits with xbitinfo.xbitinfo.get_keepbits()

  3. Apply bitrounding with xbitinfo.bitround.xr_bitround()

  4. Save as compressed netcdf with xbitinfo.save_compressed.ToCompressed_Netcdf
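Step 1 is the statistically involved part. The bitwise real information content of bit position i is the mutual information between bit i of a value and bit i of its neighbour along a dimension. The following is a minimal standard-library sketch of that idea for a 1-D sequence of float32 values. It is not xbitinfo's implementation: the function names are hypothetical, and it omits refinements a full implementation may apply, such as discarding statistically insignificant information.

```python
import math
import struct


def float32_bits(x):
    """Return the 32 bits of x (converted to float32), sign bit first."""
    (u,) = struct.unpack(">I", struct.pack(">f", x))
    return [(u >> (31 - i)) & 1 for i in range(32)]


def bitwise_information(values):
    """Mutual information (in bits) between bit i of values[k] and values[k + 1].

    Hypothetical helper: a sketch of bitwise real information content
    along one dimension, without any significance filtering.
    """
    bits = [float32_bits(v) for v in values]
    n_pairs = len(bits) - 1
    if n_pairs < 1:
        raise ValueError("need at least two values")
    info = []
    for i in range(32):
        # 2x2 joint histogram of (bit i of a value, bit i of its neighbour)
        counts = [[0, 0], [0, 0]]
        for a, b in zip(bits[:-1], bits[1:]):
            counts[a[i]][b[i]] += 1
        mi = 0.0
        for r in (0, 1):
            for s in (0, 1):
                p_rs = counts[r][s] / n_pairs
                if p_rs == 0:
                    continue  # 0 * log(0) contributes nothing
                p_r = (counts[r][0] + counts[r][1]) / n_pairs
                p_s = (counts[0][s] + counts[1][s]) / n_pairs
                mi += p_rs * math.log2(p_rs / (p_r * p_s))
        info.append(mi)
    return info
```

For example, in [1.0, -1.0, 1.0, -1.0, 1.0] only the sign bit changes, and it does so predictably, so the sketch reports 1 bit of information for the sign bit and 0 for every other position.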

Many parameters can be changed when running the flow, e.g. flow.run(parameters=dict(chunk="auto")). These include:

  - paths: list of paths to be bitrounded

Parameters:

paths (list) – List of file paths to be processed by xbitinfo.xbitinfo.get_bitinformation(), xbitinfo.xbitinfo.get_keepbits(), xbitinfo.bitround.xr_bitround() and to_compressed_netcdf.

Returns:

prefect.Flow – See https://docs.prefect.io/core/concepts/flows.html#overview

Example

Imagine n files of identical structure, e.g. climate model output with one year per file:

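>>> import xarray as xr
>>> import xbitinfo as xb
>>> ds = xr.tutorial.load_dataset("rasm")
>>> # split into one dataset per year and write each to its own file
>>> years, datasets = zip(*ds.groupby("time.year"))
>>> paths = [f"{y}.nc" for y in years]
>>> xr.save_mfdataset(datasets, paths)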

Create the prefect.Flow and run it sequentially:

>>> import prefect
>>> flow = xb.get_prefect_flow(paths=paths)
>>> logger = prefect.context.get("logger")
>>> logger.setLevel("ERROR")  # only show prefect log messages at ERROR level or above
>>> st = flow.run()

Inspect the flow state:

>>> # flow.visualize(st)  # requires graphviz

Run in parallel with dask by choosing one of the following executors:

>>> import os  # https://docs.xarray.dev/en/stable/user-guide/dask.html
>>> os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"  # disable HDF5 file locking for parallel file access
>>> from prefect.executors import DaskExecutor, LocalDaskExecutor
>>> from dask.distributed import Client
>>> client = Client(n_workers=2, threads_per_worker=1, processes=True)
>>> # 1. connect to a dask cluster you started yourself (the client above)
>>> executor = DaskExecutor(address=client.scheduler.address)
>>> # 2. let prefect start a temporary dask.distributed cluster
>>> executor = DaskExecutor()
>>> # 3. use dask's local scheduler, without a dask.distributed cluster
>>> executor = LocalDaskExecutor()
>>> # flow.run(executor=executor, parameters=dict(overwrite=True))
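Assuming the bitrounding and saving of each file do not depend on the other files, the per-file work is embarrassingly parallel, which is what the dask executors exploit. As a rough standard-library analogy (not how prefect or dask actually schedule tasks), the same fan-out over paths can be written with concurrent.futures. process_path and its output naming are hypothetical placeholders for the per-file steps.

```python
from concurrent.futures import ThreadPoolExecutor


def process_path(path):
    """Hypothetical per-file task standing in for bitrounding and saving one file.

    Here it only derives an output name; a real task would open the file,
    bitround it and write the compressed result.
    """
    stem = path[: -len(".nc")] if path.endswith(".nc") else path
    return f"{stem}_bitrounded.nc"


def run_parallel(paths, max_workers=2):
    """Apply process_path to every path concurrently, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process_path, paths))
```

For example, run_parallel(["1980.nc", "1981.nc"]) returns ["1980_bitrounded.nc", "1981_bitrounded.nc"]. A thread pool keeps the sketch simple; for CPU-bound pure-Python work, separate processes (as with Client(processes=True) above) avoid contention on the GIL.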

Override flow parameters at run time:

>>> flow.run(parameters=dict(inflevel=0.9999, overwrite=True))
<Success: "All reference tasks succeeded.">
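In xbitinfo, inflevel is the level of information that get_keepbits() preserves, so raising it (e.g. to 0.9999) keeps the same number of mantissa bits or more. The sketch below illustrates steps 2 and 3 with the standard library under stated assumptions: float32 data (1 sign, 8 exponent and 23 mantissa bits), per-bit information ordered from the sign bit down, and keepbits taken as the number of mantissa bits up to the first bit at which the normalised cumulative information reaches inflevel. It mirrors the ideas behind get_keepbits() and xr_bitround() but is not their code, and the function names are hypothetical. Rounding is round-to-nearest with ties to even.

```python
import struct

N_SIGN_EXP = 9   # assumed float32 layout: 1 sign bit + 8 exponent bits
N_MANTISSA = 23  # float32 mantissa bits


def keepbits_from_info(info, inflevel=0.99):
    """Hypothetical helper: number of mantissa bits needed to reach inflevel.

    info holds the information per bit position, sign bit first.
    """
    total = sum(info)
    if total == 0:
        return 0
    cumulative = 0.0
    for i, x in enumerate(info):
        cumulative += x
        if cumulative / total >= inflevel:
            # bits 0..i are needed; subtract the sign and exponent bits
            return max(0, min(N_MANTISSA, i + 1 - N_SIGN_EXP))
    return N_MANTISSA


def bitround_float32(x, keepbits):
    """Round finite x (as float32) to keepbits mantissa bits, ties to even."""
    (u,) = struct.unpack(">I", struct.pack(">f", x))
    drop = N_MANTISSA - keepbits
    if drop > 0:
        half_minus_one = (1 << (drop - 1)) - 1
        lsb = (u >> drop) & 1  # last kept bit decides ties (round to even)
        # add just under half an ulp (plus lsb), then clear the dropped bits
        u = (u + half_minus_one + lsb) & ~((1 << drop) - 1) & 0xFFFFFFFF
    return struct.unpack(">f", struct.pack(">I", u))[0]
```

For example, with info = [0.0] * 9 + [1.0] * 4 + [0.0] * 19, keepbits_from_info(info, inflevel=0.5) returns 2 and keepbits_from_info(info, inflevel=0.99) returns 4. With the standard library's math.pi, bitround_float32(math.pi, 2) returns 3.0 and bitround_float32(math.pi, 3) returns 3.25.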