1,150 questions
2
votes
0
answers
57
views
task works on local, but errors on Dask cluster: "SystemError: error return without exception set"
I have the following code that passes an array to a task and submits it to a Dask cluster. The Dask cluster is running in Docker with several Dask workers. Docker starts with:
scheduler:
docker run -d \
-...
2
votes
0
answers
65
views
How to optimize NetCDF files and dask for processing long-term climatological indices with xclim (e.g., SPI using 30-day rolling window)?
I am trying to analyze the 30 day standardized precipitation index for a multi-state range of the southeastern US for the year 2016. I'm using xclim to process a direct pull of gridded daily ...
0
votes
0
answers
41
views
Dask distributed stores old version of my code
I am analysing some data using dask distributed on a SLURM cluster. I am also using jupyter notebook. I am changing my codebase frequently and running jobs. Recently, a lot of my jobs started to crash....
0
votes
0
answers
30
views
Is it possible to use dask distributed to pandas with apply working with multiprocessing?
I need some advice.
Right now I do some computation with the pandas library.
The program uses multiprocessing and df.apply.
The simple example showing my idea is here:
import multiprocessing
import ...
0
votes
0
answers
65
views
Why does the Dask dashboard become unresponsive over time?
I maintain a production Dask cluster. Every few weeks or so I need to restart the scheduler because it becomes progressively slower over time. The dashboard can take well over a minute to display the ...
0
votes
0
answers
27
views
Use Python streamz to parallel process many realtime updating files?
Using Python streamz and dask, I want to distribute the data from text files, as they are generated, to threads, which will then process every new line written to those files.
from streamz import Stream
...
1
vote
1
answer
48
views
Using Streamz.Dask and matplotlib and tkinter window to display graphs and histograms in realtime?
I already have code using a thread pool, tkinter, and matplotlib to process signals that are written to a file by another process. The synchronization between the two processes is by reading ...
0
votes
0
answers
41
views
Google Cloud Platform Dask RefreshError
import os
from dask_cloudprovider.gcp import GCPCluster
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]=r'C:\Users\Me\Documents\credentials\compute_engine_default_key\test-project123-...
0
votes
0
answers
73
views
Dask distributed.protocol.core - CRITICAL - Failed to deserialize with OutOfData exception
I want to use XGBoost with Dask, which requires a client to be passed to the train method. When I try to read the data without defining a client, everything works fine, but when I run the code below I ...
0
votes
1
answer
84
views
Dask adaptive deployment in azure kubernetes
I am trying to deploy a Dask cluster with 0 workers and 1 scheduler and scale the workers up as the workload requires. I found that adaptive deployment is the correct way to do this. I am using ...
1
vote
0
answers
97
views
Dask concat on multiple dataframe axis=1
I am new to Dask. While attempting to run concat on a list of DataFrames, I noticed it is consuming more time, resources, and tasks than expected. Here are the details of my run:
Scheduler (same as ...
0
votes
1
answer
249
views
How to Set Dask Dashboard Address with SLURMRunner (Jobqueue) and Access It via SSH Port Forwarding?
I am trying to run a Dask Scheduler and Workers on a remote cluster using SLURMRunner from dask-jobqueue. I want to bind the Dask dashboard to 0.0.0.0 (so it’s accessible via port forwarding) and ...
0
votes
0
answers
111
views
Initializing a local cluster in Dask takes forever
I'm trying out some things with Dask for the first time, and while I had it running a few weeks ago, I now find that I can't get the LocalCluster initiated. I've cut it off after running 30 minutes at ...
0
votes
0
answers
124
views
dask_cuda problem with Local CUDA Cluster
I am trying to get this code to work and then use it to train various models on two GPUs:
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
if __name__ == "__main__"...
1
vote
1
answer
63
views
How to nest dask.delayed functions within other dask.delayed functions
I am trying to learn dask, and have created the following toy example of a delayed pipeline.
+-----+ +-----+ +-----+
| baz +--+ bar +--+ foo |
+-----+ +-----+ +-----+
So baz has a dependency on ...
0
votes
1
answer
82
views
Dask distributed pause task to wait for subtask - how-to, or bad practice?
I am running tasks using client.submit thus:
from dask.distributed import Client, get_client, wait, as_completed
# other imports
zip_and_upload_futures = [ client.submit(zip_and_upload, id, path, ...
0
votes
1
answer
43
views
Use dask with numactl
I am using dask to parallelize an operation that is memory-bound. So, I want to ensure each dask worker has access to a single NUMA node and prevent cross-node memory access. I can do this in the ...
1
vote
0
answers
1k
views
"Sending large graph": what is the significance of this warning?
I have a zarr dataset on disk, which I open with xarray using:
import xarray as xr
import numpy as np
import dask.distributed as dd
# setup dask
cluster = dd.LocalCluster()
client = dd.Client(cluster)...
0
votes
0
answers
26
views
Dask worker running long calls
The code running on the dask worker calls asyncio.run() and proceeds to execute a series of async calls (on the worker running event_loop) that gather data, and then run a small computation.
This ...
0
votes
0
answers
74
views
Importing SQL Table from Snowflake into Jupyter using Dask
I have a SQL table in Snowflake with 100K rows and 15 columns. I want to import this table into my Jupyter notebook using Dask for further analysis. I am primarily doing this as a form of practice since I am new ...
0
votes
1
answer
94
views
Understanding if current process is part of a multiprocessing pool with --multiprocessing-fork
I need to find a way for a python process to figure out if it was launched as part of a multiprocessing pool.
I am using dask to parallelize calculations, using dask.distributed.LocalCluster. For UX ...
0
votes
1
answer
121
views
Common workflow for using Dask on HPC systems
I’m new to Dask. I’m currently working in an HPC managed by SLURM with some compute nodes (those that execute the jobs) and the login node (which I access through SSH to send the SLURM jobs). I’m ...
0
votes
0
answers
113
views
How to fix memory errors merging large dask dataframes?
I am trying to read 23 CSV files into dask dataframes, merge them together using dask, and output to parquet. However, it's failing due to memory issues.
I used to use pandas to join these together ...
0
votes
1
answer
187
views
How do I set the timeout parameter in dask scheduler
I was trying to run dask-distributed to distribute some big computation in a slurm cluster.
I was always getting a "TimeoutError: No valid workers found" message (this came from line 6130 in ...
0
votes
0
answers
58
views
Null values in log file from Python logging module when used with Dask
I have been trying to set up logging using the logging module in a Python script, and I have got it working properly. It can now log to both the console and a log file. But it fails when I set up a Dask ...
0
votes
1
answer
110
views
How can I keep Dask workers busy when processing large datasets to prevent them from running out of tasks?
I'm trying to process a large dataset (around 1 million tasks) using Dask distributed computing in Python. (I am getting data from a database to process it, and I am retrieving around 1M rows.) Here I ...
1
vote
1
answer
88
views
How do I modularize functions that work with dask?
I'm trying to modularize my functions that use Dask, but I keep encountering the error "No module named 'setup'". I can't import any local module that is related to Dask, and currently, ...
1
vote
1
answer
740
views
Is there a way to save into a zarr file an xarray, with the possibility of appending in multiple dimensions?
I'm currently doing an internship where I need to create large datasets, often hundreds of GB in size. I'm collecting temporal samples for cartography, where I collect 500 samples for each ...
0
votes
1
answer
94
views
Dask - High CPU consumption unloading parallel workers
I’m using dask to make parallel processing of a simulation. It consists of a series of differential equations that are numerically solved using numpy arrays that are compiled using numba @jit ...
0
votes
0
answers
59
views
Efficiently processing large molecular datasets with Dask Distributed, DataFrames and Prefect
I'm working with a large dataset of molecular structures (approximately 240,000 records) stored in a PostgreSQL database. I need to perform computations on each molecule using RDKit. I'm using Dask ...
0
votes
0
answers
180
views
How to speed up interpolation in dask
I have a piece of data code that performs interpolation on a large number of arrays.
This is extremely quick with numpy, but:
The data the code will work with in reality will often not fit in memory
...
0
votes
0
answers
55
views
How to retrieve a task's progress as percentage done in Dask?
I'd like to query for a task's status in a Dask cluster by retrieving a percentage completed beyond the visual progress bar or dashboard. For example, I'm submitting this task below:
from dask....
0
votes
1
answer
304
views
How to use user-defined fsspec filesystem with dask?
I made my own filesystem in the fsspec library and I am trying to read in dask dataframes from this filesystem object to open the dataframe file. However I am getting an error when I try to do this. ...
0
votes
1
answer
230
views
What is the cleanest way to detect whether I'm running in a dask Worker
Given a dask.distributed cluster, for example a LocalCluster, what is the most robust way to detect if I'm running a python code from within a Worker instance?
This can be code that is not strictly ...
-1
votes
1
answer
194
views
Understanding task stream and speeding up Distributed Dask
I have implemented some data analysis in Dask using dask-distributed, but the performance is very far from the same analysis implemented in numpy/pandas and I am finding it difficult to understand the ...
2
votes
0
answers
23
views
Is there a way to restrict stack traces from emitting dataframe contents?
By default, our system logs stack traces in logs output. Generally, we're careful to not log contents of dataframes we're working with as they may contain sensitive user data. However, when Dask ...
0
votes
0
answers
143
views
Dask Can't synchronously read data shared on NFS
Running Dask Scheduler on system A and workers on system A and B. NFS volume from system A is shared on the network through NFS with system B, and contains the data files. This folder has a symbolic ...
1
vote
1
answer
348
views
Default n_workers when creating a Dask cluster?
Simple question. If I create a Dask cluster using the following code:
from dask.distributed import Client
client = Client()
How many workers will it create? I ran this code on one machine, and it ...
1
vote
0
answers
43
views
Dask worker nodes and multiprocessing raising TypeError during object initialization
I have a program that I wrote. I define a class in this program that is a subclass of a class I import. If I run this code without Dask, I successfully run it. When I plug in Dask, I get an error ...
1
vote
1
answer
82
views
How does dask handle datasets larger than memory?
I'm seeking guidance on efficiently profiling data using Dask.
I've opted to use Dask to lazily load the DataFrame, either from SQL tables (dask.read_sql_table) or CSV files (dask.read_csv).
I am ...
1
vote
0
answers
97
views
asyncio.exceptions.CancelledError when using Dask LocalCluster with processes=False and progress
This is an example:
import numpy as np
import zarr
from dask.distributed import Client, LocalCluster
from dask import array as da
from dask.distributed import progress
def same(x):
return x
x = ...
0
votes
0
answers
117
views
Is there a way to analyze why a dask worker was killed?
I have ~30GB of uncompressed spatial data; it contains id, tags, and coordinates as three columns in a parquet file with a row group size of 64MB.
I used dask read_parquet with block_size 32MiB and got 118 ...
1
vote
0
answers
187
views
Using dask.distributed with rioxarray rio.to_raster results in `ValueError: Lock is not yet acquired`
I am trying to write some code using dask.distributed.Client and rioxarray to_raster that:
Concatenates two rasters (dask arrays)
Applies a function across all blocks in the concatenated array
Writes ...
0
votes
1
answer
110
views
How to convert a datetime string to timestamp in dask cudf and then sort the dataframe by this column
I would like to convert a datetime string to timestamp in dask cudf and then sort the dataframe by this column.
Example:
import dask_cudf as ddf
import pandas as pd
# Sample data (replace with your ...
0
votes
1
answer
70
views
How Dask manages file descriptors
How does Dask manage file descriptors?
For example when creating a dask.array from an hdf5 file. When the array is large enough to be chunked.
Do the created tasks inherit the file descriptor created ...
-1
votes
1
answer
86
views
Read a CSV file and do the aggregation with multiple workers, dask.distributed, dask.dataframe
I have a server (IP: 192.168.33.10) that launches the scheduler with dask scheduler --host 0.0.0.0; this is the master. On this server I have the file "/var/shared/job_skills.csv", and the workers are
192.168.33.11,192....
1
vote
1
answer
69
views
Why does dask show a smaller size than the actual size of the data (numpy array)?
Dask shows a slightly smaller size than the actual size of a numpy array. Here is an example of a numpy array that is exactly 32 Mb:
import dask as da
import dask.array
import numpy as np
shape = (1000,...
0
votes
1
answer
97
views
Can't dd.read_sql on jupyter, kernel crashes
I'm coming here because I don't understand my problem.
I created a dockerfile + compose which creates 1 dask scheduler and 2 workers:
docker-compose.yaml:
version: '3.8'
services:
dask-scheduler:
...
1
vote
0
answers
188
views
How to Read the Result of Query into a Dask Dataframe in a Distributed Client?
I am trying to read the results of a query (from an AWS Athena database) into a dask dataframe, following the read_sql_query method in the official documentation.
Here is how I am calling it.
from dask ...
0
votes
1
answer
209
views
Why doesn't a prefect task fail, if a contained dask.distributed task fails?
I'm running a workflow in Prefect with a DaskTaskRunner, which creates and holds a dask.distributed.LocalCluster instance.
Inside a prefect task I use a dask_ml.RandomSearchCV and fit it, which by ...