Issues with forced cache reading and/or `dask.distributed`
Summary
My pipeline forces cache reading for re-chunking and works fine sequentially, but it fails when I execute it in parallel with a `dask.distributed.Client`. I receive several errors stating that a file is already open for reading and that files are missing from the cache, and then dantro actually writes `.pkl` files instead of `.nc_da` files (my data is `xr.DataArray`).
I tried to replicate the error in a smaller example and so far did not manage to make dantro write pickle files from `xr.DataArray`s. However, I noticed that dantro sometimes switches data types. I'm not sure this is the root of the problem, though. In my case, dantro seems to be inconsistent about whether the transformations in the DAG return `xr.DataArray` or `dantro.XrDataContainer`, which is directly linked to forced cache reading (both sequentially and in parallel).
Steps to reproduce
Attached files: `cfg.yml`, `Makefile`, `requirements.txt`, `run.py`, `README.md`
I created a test with 4 x 2 test cases (see the parametrization sketch after this list):
- 4: evaluate without cache, evaluate with writing the cache, evaluate with writing and forced reading of the cache, all
- 2: sequential, or parallel with a `dask.distributed.Client`
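The eight combinations could be enumerated along these lines; the mode names and the pytest parametrization are illustrative, not the literal contents of `run.py`:

```python
import itertools

import pytest

CACHE_MODES = ["no_cache", "write_cache", "write_and_force_read", "all"]
EXECUTION = ["sequential", "distributed"]


@pytest.mark.parametrize(
    "cache_mode, execution", list(itertools.product(CACHE_MODES, EXECUTION))
)
def test_dag_evaluation(cache_mode, execution):
    """Runs the DAG evaluation for one of the 4 x 2 combinations."""
    ...
```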
For forced reading, the test fails because the returned data type is not `XrDataContainer` but `xr.DataArray` (see the type check sketched below).
The case where all eval tasks are executed in parallel fails because several processes report missing cache files. (To see this issue more clearly, comment out line 60 in `run.py`.)
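The failing check in the forced-reading cases boils down to something like the following; the import path and the helper function are assumptions for illustration, not the literal test code from `run.py`:

```python
import xarray as xr
from dantro.containers import XrDataContainer


def check_dag_result(result):
    """Asserts that the DAG returns the data wrapped in a dantro container."""
    assert isinstance(result, XrDataContainer), (
        f"Expected XrDataContainer, got {type(result)}"
    )
    # With forced cache reading, this assertion fails because the result
    # comes back as a bare xr.DataArray instead of an XrDataContainer.
```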
What is the current bug behaviour?
Executing the transformation DAG with forced cache reading across multiple processes fails.
What is the expected correct behaviour?
Executing the transformation DAG with forced cache reading across multiple processes works the same as when executed sequentially.