xbitinfo.xbitinfo.get_prefect_flow

xbitinfo.xbitinfo.get_prefect_flow(paths=[])
Create prefect.Flow for paths to be:

1. Analyse bitwise real information content with xbitinfo.xbitinfo.get_bitinformation()
2. Retrieve keepbits with xbitinfo.xbitinfo.get_keepbits()
3. Apply bitrounding with xbitinfo.bitround.xr_bitround()
4. Save as compressed netcdf with xbitinfo.save_compressed.ToCompressed_Netcdf
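These steps correspond roughly to the following manual calls (an illustrative sketch only, not part of the flow API; it assumes import xbitinfo as xb and an xarray Dataset ds as in the example further below, and the dim and inflevel values are placeholders):

>>> info = xb.get_bitinformation(ds, dim="x")  # bitwise real information content
>>> keepbits = xb.get_keepbits(info, inflevel=0.99)  # mantissa bits to keep
>>> ds_rounded = xb.xr_bitround(ds, keepbits)  # apply bitrounding
>>> ds_rounded.to_compressed_netcdf("example_bitrounded_compressed.nc")  # save compressed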
Many parameters can be changed when running the flow, e.g. flow.run(parameters=dict(chunks="auto")); see the run-time example after this list:

- paths : list of paths
  Paths to be bitrounded.
- analyse_paths : str or int
  Which paths to be passed to xbitinfo.xbitinfo.get_bitinformation(). Choose from ["first_last", "all", int], where int is interpreted as stride, i.e. paths[::stride]. Defaults to "first".
- enforce_dtype : str or None
  Enforce dtype for all variables. Currently, xbitinfo.xbitinfo.get_bitinformation() fails for different dtypes in variables. Do nothing if None. Defaults to None.
- label : see xbitinfo.xbitinfo.get_bitinformation()
- dim/axis : see xbitinfo.xbitinfo.get_bitinformation()
- inflevel : see xbitinfo.xbitinfo.get_keepbits()
- non_negative_keepbits : bool
  Set negative keepbits from xbitinfo.xbitinfo.get_keepbits() to 0. Required when using xbitinfo.bitround.xr_bitround(). Defaults to True.
- chunks : see xarray.open_mfdataset(). Note that with chunks=None, dask is not used for I/O and the flow is still parallelized when using DaskExecutor.
- bitround_in_julia : bool
  Use xbitinfo.bitround.jl_bitround() instead of xbitinfo.bitround.xr_bitround(). Both should yield identical results. Defaults to False.
- overwrite : bool
  Whether to overwrite bitrounded netcdf files. False (default) skips existing files.
- complevel : see to_compressed_netcdf, defaults to 7.
- rename : list
  Replace mapping from each path to the new path of the bitrounded file, i.e. rename=[".nc", "_bitrounded_compressed.nc"].
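Any of these can be overridden at run time, analogous to the last example below; the parameter names follow the list above and the values here are only placeholders:

>>> # flow.run(parameters=dict(analyse_paths="first_last", inflevel=0.99, rename=[".nc", "_bitrounded_compressed.nc"]))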
Parameters:
  paths (list) – List of paths of files to be processed by xbitinfo.xbitinfo.get_bitinformation(), xbitinfo.xbitinfo.get_keepbits(), xbitinfo.bitround.xr_bitround() and to_compressed_netcdf.

Returns:
  prefect.Flow – See https://docs.prefect.io/core/concepts/flows.html#overview
Example
Imagine n files of identical structure, i.e. one year of climate model output per file:

>>> import xarray as xr
>>> import xbitinfo as xb
>>> ds = xr.tutorial.load_dataset("rasm")
>>> year, datasets = zip(*ds.groupby("time.year"))
>>> paths = [f"{y}.nc" for y in year]
>>> xr.save_mfdataset(datasets, paths)
Create prefect.Flow and run sequentially:

>>> flow = xb.get_prefect_flow(paths=paths)
>>> import prefect
>>> logger = prefect.context.get("logger")
>>> logger.setLevel("ERROR")
>>> st = flow.run()
Inspect flow state:
>>> # flow.visualize(st) # requires graphviz
Run in parallel with dask:
>>> import os  # https://docs.xarray.dev/en/stable/user-guide/dask.html
>>> os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
>>> from prefect.executors import DaskExecutor, LocalDaskExecutor
>>> from dask.distributed import Client
>>> client = Client(n_workers=2, threads_per_worker=1, processes=True)
>>> executor = DaskExecutor(
...     address=client.scheduler.address
... )  # take your own client
>>> executor = DaskExecutor()  # use dask from prefect
>>> executor = LocalDaskExecutor()  # use dask local from prefect
>>> # flow.run(executor=executor, parameters=dict(overwrite=True))
Modify parameters of a flow:
>>> flow.run(parameters=dict(inflevel=0.9999, overwrite=True))
<Success: "All reference tasks succeeded.">
See also