xbitinfo.xbitinfo.get_prefect_flow

xbitinfo.xbitinfo.get_prefect_flow(paths=[])

Create a prefect.Flow that runs the following steps on each file in paths:

  1. Analyse bitwise real information content with xbitinfo.xbitinfo.get_bitinformation()

  2. Retrieve keepbits with xbitinfo.xbitinfo.get_keepbits()

  3. Apply bitrounding with xbitinfo.bitround.xr_bitround()

  4. Save as compressed netCDF with xbitinfo.save_compressed.ToCompressed_Netcdf
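To make step 3 more concrete, here is a minimal sketch of what bitrounding does to a single float32 value, using only the standard library. It is not xbitinfo's implementation: the function name bitround_float32 is hypothetical, round-to-nearest with ties to even is an assumption, and NaN and infinity are not handled.

```python
import math
import struct


def bitround_float32(value: float, keepbits: int) -> float:
    """Round `value` (as float32) so that only `keepbits` mantissa bits remain.

    Hypothetical helper for illustration; assumes round-to-nearest, ties to even.
    NaN and infinity are not handled.
    """
    if not 0 <= keepbits <= 23:
        raise ValueError("float32 has 23 explicit mantissa bits")
    # Reinterpret the float32 bit pattern as an unsigned 32-bit integer.
    bits = struct.unpack("<I", struct.pack("<f", value))[0]
    drop = 23 - keepbits  # number of trailing mantissa bits to zero out
    if drop == 0:
        return struct.unpack("<f", struct.pack("<I", bits))[0]
    mask = (0xFFFFFFFF << drop) & 0xFFFFFFFF
    half_minus_one = (1 << (drop - 1)) - 1
    last_kept_bit = (bits >> drop) & 1  # decides the direction of exact ties
    rounded = (bits + half_minus_one + last_kept_bit) & mask
    return struct.unpack("<f", struct.pack("<I", rounded))[0]


# Keeping 7 mantissa bits of pi leaves a value with many trailing zero bits,
# which lossless compressors can then exploit.
print(bitround_float32(math.pi, 7))
```

The trailing zero bits are what make the subsequent compression step effective; the fewer bits are kept, the more compressible the data and the larger the rounding error.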

Many parameters can be changed when running the flow, for example flow.run(parameters=dict(chunk="auto")):

  - paths (list of paths): Paths to be bitrounded

Parameters:

paths (list) – List of paths of files to be processed by xbitinfo.xbitinfo.get_bitinformation(), xbitinfo.xbitinfo.get_keepbits(), xbitinfo.bitround.xr_bitround() and to_compressed_netcdf.

Returns:

prefect.Flow – See https://docs.prefect.io/core/concepts/flows.html#overview

Example

Imagine n files of identical structure, e.g. climate model output with one year per file:

>>> ds = xr.tutorial.load_dataset("rasm")
>>> year, datasets = zip(*ds.groupby("time.year"))
>>> paths = [f"{y}.nc" for y in year]
>>> xr.save_mfdataset(datasets, paths)

Create the prefect.Flow and run it sequentially:

>>> flow = xb.get_prefect_flow(paths=paths)
>>> import prefect
>>> from prefect import get_run_logger
>>> if __name__ == "__main__":
...     logger = get_run_logger()
...     logger.setLevel("ERROR")
...     st = flow.run()
...

Inspect the flow state:

>>> # flow.visualize(st)  # requires graphviz

Run in parallel with dask:

>>> import os
>>> # Disable HDF5 file locking; see https://docs.xarray.dev/en/stable/user-guide/dask.html
>>> os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
>>> from prefect_dask.task_runners import DaskTaskRunner
>>> from dask.distributed import Client
>>> if __name__ == "__main__":
...     client = Client(n_workers=2, threads_per_worker=1, processes=True)
...     # connect the task runner to the scheduler of your own client
...     executor = DaskTaskRunner(address=client.scheduler.address)
...     flow.run(executor=executor, parameters=dict(overwrite=True))
...

Modify parameters of a flow:

>>> if __name__ == "__main__":
...     flow.run(parameters=dict(inflevel=0.9999, overwrite=True))
...
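As a rough illustration of how a parameter such as inflevel could translate into a number of kept bits, the sketch below picks the smallest number of leading bits whose cumulative information reaches the requested fraction of the total. This is an assumption about the general idea, not a copy of xbitinfo.xbitinfo.get_keepbits(); the helper name keepbits_from_information and the per-bit information values are hypothetical.

```python
from itertools import accumulate


def keepbits_from_information(info_per_bit, inflevel):
    """Return the number of leading bits needed to retain `inflevel` of the information.

    Hypothetical helper: `info_per_bit` lists the information content of each
    bit position, most significant first; `inflevel` is a fraction in (0, 1].
    """
    total = sum(info_per_bit)
    if total <= 0:
        return 0
    for n_bits, cumulative in enumerate(accumulate(info_per_bit), start=1):
        if cumulative / total >= inflevel:
            return n_bits
    return len(info_per_bit)


# Made-up information content of the first eight mantissa bits.
info = [0.9, 0.8, 0.6, 0.3, 0.1, 0.02, 0.0, 0.0]
print(keepbits_from_information(info, 0.99))    # 5
print(keepbits_from_information(info, 0.9999))  # 6
```

Under these assumptions, raising inflevel keeps more bits, which preserves more information at the cost of a lower compression ratio.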