xbitinfo.xbitinfo.get_prefect_flow
xbitinfo.xbitinfo.get_prefect_flow(paths=[])
Create a prefect.Flow that takes paths through the following steps:
1. Analyse bitwise real information content with xbitinfo.xbitinfo.get_bitinformation()
2. Retrieve keepbits with xbitinfo.xbitinfo.get_keepbits()
3. Apply bitrounding with xbitinfo.bitround.xr_bitround()
4. Save as compressed netcdf with xbitinfo.save_compressed.ToCompressed_Netcdf
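To illustrate the bitrounding step, here is a minimal pure-Python sketch of rounding a single float32 to a given number of mantissa bits (round to nearest, ties to even). The helper name `bitround_float32` is hypothetical and this is not xbitinfo's implementation, which operates on whole arrays. It only shows the effect that keepbits has on one value.

```python
import math
import struct


def bitround_float32(value: float, keepbits: int) -> float:
    """Hypothetical helper: keep `keepbits` of the 23 float32 mantissa bits."""
    if not 0 <= keepbits < 23:
        raise ValueError("keepbits must be between 0 and 22 for float32")
    # Reinterpret the float32 bit pattern as an unsigned 32-bit integer.
    (bits,) = struct.unpack("<I", struct.pack("<f", value))
    drop = 23 - keepbits  # number of trailing mantissa bits to zero out
    mask = (0xFFFFFFFF >> drop) << drop
    half = (1 << (drop - 1)) - 1  # just under half of the rounding quantum
    # Adding the last kept bit on top of `half` breaks ties towards even.
    bits += ((bits >> drop) & 1) + half
    bits &= mask
    (rounded,) = struct.unpack("<f", struct.pack("<I", bits & 0xFFFFFFFF))
    return rounded


print(bitround_float32(math.pi, 7))  # 3.140625
print(bitround_float32(math.pi, 3))  # 3.25
```

Fewer kept mantissa bits leave more trailing zero bits in each value, which is what allows a lossless compressor to shrink the saved file.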
Many parameters can be changed when running the flow, e.g. flow.run(parameters=dict(chunks="auto")):
- paths: list of paths
  Paths to be bitrounded.
- analyse_paths: str or int
  Which paths to pass to xbitinfo.xbitinfo.get_bitinformation(). Choose from ["first_last", "all", int], where int is interpreted as stride, i.e. paths[::stride]. Defaults to "first".
- enforce_dtype: str or None
  Enforce dtype for all variables. Currently, xbitinfo.xbitinfo.get_bitinformation() fails for different dtypes in variables. Do nothing if None. Defaults to None.
- label: see xbitinfo.xbitinfo.get_bitinformation()
- dim/axis: see xbitinfo.xbitinfo.get_bitinformation()
- inflevel: see xbitinfo.xbitinfo.get_keepbits()
- non_negative_keepbits: bool
  Set negative keepbits from xbitinfo.xbitinfo.get_keepbits() to 0. Required when using xbitinfo.bitround.xr_bitround(). Defaults to True.
- chunks: see xarray.open_mfdataset(). Note that with chunks=None, dask is not used for I/O, but the flow is still parallelized when using DaskExecutor.
- bitround_in_julia: bool
  Use xbitinfo.bitround.jl_bitround() instead of xbitinfo.bitround.xr_bitround(). Both should yield identical results. Defaults to False.
- overwrite: bool
  Whether to overwrite bitrounded netcdf files. False (default) skips existing files.
- complevel: see to_compressed_netcdf. Defaults to 7.
- rename: list
  Replace mapping for paths towards new_path of bitrounded file, e.g. rename=[".nc", "_bitrounded_compressed.nc"].
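The analyse_paths and rename parameters can be illustrated in plain Python. The sketch below is an interpretation of the descriptions above, not xbitinfo's code: both helper names are hypothetical, the default "first" is assumed to mean the first path only, and the rename mapping is assumed to be applied as a simple string replacement.

```python
def select_paths_to_analyse(paths, analyse_paths):
    """Hypothetical helper: choose which paths get analysed."""
    if analyse_paths == "all":
        return list(paths)
    if analyse_paths == "first":  # assumption: only the first path
        return list(paths[:1])
    if analyse_paths == "first_last":
        return [paths[0], paths[-1]]
    if isinstance(analyse_paths, int):
        return list(paths[::analyse_paths])  # int is interpreted as a stride
    raise ValueError(f"unsupported analyse_paths: {analyse_paths!r}")


def bitrounded_path(path, rename):
    """Hypothetical helper: apply a two-element [old, new] mapping to a path."""
    old, new = rename
    return path.replace(old, new)


paths = ["1980.nc", "1981.nc", "1982.nc", "1983.nc"]
print(select_paths_to_analyse(paths, "first_last"))  # ['1980.nc', '1983.nc']
print(select_paths_to_analyse(paths, 2))  # ['1980.nc', '1982.nc']
print(bitrounded_path("1980.nc", [".nc", "_bitrounded_compressed.nc"]))
# 1980_bitrounded_compressed.nc
```

Analysing only a subset of structurally identical files keeps the information analysis cheap, while the rename mapping keeps the original files untouched next to their bitrounded copies.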
- Parameters:
  paths (list) – List of paths of files to be processed by xbitinfo.xbitinfo.get_bitinformation(), xbitinfo.xbitinfo.get_keepbits(), xbitinfo.bitround.xr_bitround() and to_compressed_netcdf.
- Returns:
  prefect.Flow – See https://docs.prefect.io/core/concepts/flows.html#overview
Example
Imagine n files of identical structure, e.g. climate model output with one year per file:
>>> import xarray as xr
>>> import xbitinfo as xb
>>> ds = xr.tutorial.load_dataset("rasm")
>>> year, datasets = zip(*ds.groupby("time.year"))
>>> paths = [f"{y}.nc" for y in year]
>>> xr.save_mfdataset(datasets, paths)
Create prefect.Flow and run sequentially:
>>> flow = xb.get_prefect_flow(paths=paths)
>>> import prefect
>>> from prefect import get_run_logger
>>> if __name__ == "__main__":
...     logger = get_run_logger()
...     logger.setLevel("ERROR")
...     st = flow.run()
...
Inspect flow state:
>>> # flow.visualize(st) # requires graphviz
Run in parallel with dask:
>>> import os  # https://docs.xarray.dev/en/stable/user-guide/dask.html
>>> os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
>>> from prefect_dask.task_runners import DaskTaskRunner
>>> from dask.distributed import Client
>>> if __name__ == "__main__":
...     client = Client(n_workers=2, threads_per_worker=1, processes=True)
...     executor = DaskTaskRunner(
...         address=client.scheduler.address
...     )  # take your own client
...     flow.run(executor=executor, parameters=dict(overwrite=True))
...
Modify parameters of a flow:
>>> if __name__ == "__main__":
...     flow.run(parameters=dict(inflevel=0.9999, overwrite=True))
...
See also