I am trying to analyze the 30-day standardized precipitation index (SPI) over a multi-state region of the southeastern US for the year 2016. I'm using xclim to process a direct pull of gridded daily precipitation data from nClimGrid-Daily, which means pulling about 40 years of data and taking a 30-day rolling mean of the daily precipitation totals. Resolution is daily in time and 48.3 km in space. What I expected was for the code to give me the 30-day SPI for the entire record, which I could then subset to 2016. Instead, when I run this on my school's HPC via ssh, I get kicked off the compute node about halfway through (the compute node has about 70 GB of RAM). I can also run it on my local machine, but it gets too hot to complete.
I've downloaded the range I need (8.8 GB). After subsetting geographically and temporally, it's around 7 GB. I can lazy-load everything, but when I persist/compute with dask it just keeps stalling after running for a while - a lot of processing power seems to go toward rechunking done by xclim, even though I've rechunked beforehand to try to avoid it. As a note, when I use a coarser spatial resolution (county-level instead of gridded) I can run this in under an hour. With the gridded data, it breaks after about 3 hours. Please let me know if you have any suggestions! Full code is below.
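For reference, by the 30-day rolling mean I just mean the usual rolling accumulation over daily totals; here is a minimal standalone illustration with dummy data (xclim should be doing the equivalent internally through its window argument, so this snippet is not part of the actual pipeline):

import numpy as np
import pandas as pd
import xarray as xr

# dummy daily precip series, only to illustrate the 30-day accumulation step
time = pd.date_range("1976-01-01", "1976-12-31", freq="D")
daily = xr.DataArray(np.random.gamma(2.0, 3.0, time.size), coords={"time": time}, dims="time")
accum_30d = daily.rolling(time=30, min_periods=30).mean()  # 30-day rolling mean of daily totals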
import glob
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import xarray as xr
import xclim as xc
from dask.distributed import Client, LocalCluster
#### bulk download files
base_url = "https://www.ncei.noaa.gov/data/nclimgrid-daily/access/grids"
years = range(1976, 2017)
months = range(1, 13)
out_dir = Path("./nclimgrid_prcp") # adjust as needed
out_dir.mkdir(parents=True, exist_ok=True)
max_workers = 16
timeout = 60 # seconds
def dl(year, month):
    # download a single monthly nClimGrid-Daily file, skipping ones already on disk
    fname = f"ncdd-{year}{month:02}-grd-scaled.nc"
    url = f"{base_url}/{year}/{fname}"
    path = out_dir / fname
    if path.exists():
        return path, "exists"
    try:
        r = requests.get(url, timeout=timeout, stream=True)
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=2**20):
                if chunk:
                    f.write(chunk)
        return path, "downloaded"
    except Exception as e:
        return path, f"error: {e}"

futures = []
with ThreadPoolExecutor(max_workers=max_workers) as ex:
    for y in years:
        for m in months:
            futures.append(ex.submit(dl, y, m))

files = []
for fut in as_completed(futures):
    p, status = fut.result()
    print(p.name, status)
    if status in ("exists", "downloaded"):
        files.append(str(p))
####### End download files
files = sorted(glob.glob('nclimgrid_prcp/*.nc')) #access local files
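# quick sanity check: 41 years x 12 months = 492 monthly files expected
print(len(files), "monthly files found")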
def subset(ds):
    # clip each file to the southeastern US box before combining
    return ds.sel(
        lat=slice(30, 38),
        lon=slice(-92, -78),
    )

ds = xr.open_mfdataset(files, combine="by_coords", preprocess=subset, engine="netcdf4")
pr = ds["prcp"]
pr = pr.assign_attrs(units="mm/day")  # set CF units so xclim's unit checks pass
pr = pr.chunk(
    {
        "time": -1,  # keep time in a single chunk; the SPI fit works along time
        "lat": 64,
        "lon": 84,
    }
)
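# sanity check of the chunk layout before computing: with time unchunked, each
# chunk is roughly len(time) x 64 x 84 values
print(pr.chunks)
chunk_mb = pr.dtype.itemsize * pr.sizes["time"] * 64 * 84 / 1e6
print(f"~{chunk_mb:.0f} MB per chunk")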
cluster = LocalCluster()
client = Client(cluster)
client.cluster.scale(10)
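# dask dashboard, to watch memory use and the rechunk tasks while this runs
print(client.dashboard_link)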
spi_30 = xc.indices.standardized_precipitation_index(
    pr,
    freq="D",    # daily output resolution
    window=30,   # 30-day accumulation window
)
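# downstream step from the description above: trigger the computation and pull
# out 2016 (the persist/compute step is where everything stalls for me)
spi_2016 = spi_30.sel(time="2016").compute()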