
I am trying to analyze the 30-day standardized precipitation index (SPI) for a multi-state range of the southeastern US for the year 2016. I'm using xclim to process a direct pull of gridded daily precipitation data from nClimGrid-Daily, which involves pulling about 40 years of data and taking a 30-day rolling mean of the daily precipitation sums. The resolution is daily in time and 48.3 km in space. I expected the code to give me the 30-day SPI for the entire range, which I could then subset to the year 2016. Instead, when I run this on my school's HPC via ssh, I get kicked off the compute node halfway through (the compute node has about 70 GB of RAM). I can run it on my local machine, but it gets too hot to complete.

I've downloaded the range I needed (8.8 GB). After subsetting geographically and temporally, it's around 7 GB. I can lazy-load everything, but when I persist/compute with dask it just keeps stalling after running for a while - a lot of processing power goes toward rechunking by xclim, even though I've tried rechunking beforehand to avoid it. As a note, when I use a coarser spatial resolution (county-level instead of gridded) I can run this in less than an hour. With the gridded data, it breaks after about 3 hours. Please let me know if you have any suggestions!

import glob
import xarray as xr
import xclim as xc
from dask.distributed import Client, LocalCluster
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

#### bulk download files 
base_url = "https://www.ncei.noaa.gov/data/nclimgrid-daily/access/grids"
years = range(1976, 2017)
months = range(1, 13)

out_dir = Path("./nclimgrid_prcp")  # adjust as needed
out_dir.mkdir(parents=True, exist_ok=True)

max_workers = 16
timeout = 60  # seconds

def dl(year, month):
    fname = f"ncdd-{year}{month:02}-grd-scaled.nc"
    url = f"{base_url}/{year}/{fname}"
    path = out_dir / fname

    if path.exists():
        return path, "exists"

    try:
        r = requests.get(url, timeout=timeout, stream=True)
        r.raise_for_status()
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=2**20):
                if chunk:
                    f.write(chunk)
        return path, "downloaded"
    except Exception as e:
        return path, f"error: {e}"


futures = []
with ThreadPoolExecutor(max_workers=max_workers) as ex:
    for y in years:
        for m in months:
            futures.append(ex.submit(dl, y, m))

files = []
for fut in as_completed(futures):
    p, status = fut.result()
    print(p.name, status)
    if status in ("exists", "downloaded"):
        files.append(str(p))

####### End download files

files = sorted(glob.glob('nclimgrid_prcp/*.nc')) #access local files

def subset(ds):
    return ds.sel(
         lat=slice(30, 38),    
         lon=slice(-92, -78), 
    )
ds = xr.open_mfdataset(files, combine="by_coords", preprocess=subset, engine="netcdf4")

pr = ds['prcp']
pr = pr.assign_attrs(units="mm/day")

# keep the whole record in a single chunk along time (xclim's standardized indices
# need the full time series for the distribution fit) and split across space instead
pr = pr.chunk(
    {
        "time": -1,
        "lat": 64,
        "lon": 84,
    }
)

# local dask cluster, scaled to 10 workers
cluster = LocalCluster()
client = Client(cluster)
client.cluster.scale(10)

spi_30 = xc.indices.standardized_precipitation_index(
    pr,
    freq="D",    # daily output frequency
    window=30,   # 30-day accumulation window
)
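
The step that actually stalls is the compute at the end: once spi_30 is built lazily, I subset to 2016 and pull it into memory, roughly like this (variable names are just placeholders for what I run interactively):

# subset the lazily-built SPI to 2016 and bring it into memory
# (this is where the workers stall / the job dies)
spi_2016 = spi_30.sel(time=slice("2016-01-01", "2016-12-31"))
spi_2016 = spi_2016.compute()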
  • Why would you calculate SPI for 40 years if you are only interested in a single year? Commented Nov 19 at 14:55
  • I want to compare the fall of 2016 against the historic trend, since SPI is actual vs. anticipated precip. Commented Nov 20 at 19:22
