Dask for Parallel Computing in Python

In past lectures, we learned how to use numpy, pandas, and xarray to analyze various types of geoscience data. In this lecture, we address an increasingly common problem: what happens if the data we wish to analyze is "big data"?

Aside: What is "Big Data"?

There is a lot of hype around the buzzword "big data" today. Some people may associate "big data" with specific software platforms (e.g. Hadoop, Spark), while, for others, "big data" means specific machine learning techniques. But I think Wikipedia's definition is the most useful:

Big data is data sets that are so voluminous and complex that traditional data processing application software are inadequate to deal with them.

By this definition, a great many datasets we regularly confront in Earth science are big data.

A good threshold for when data becomes difficult to deal with is when the volume of data exceeds your computer's RAM. Most modern laptops have between 2 and 16 GB of RAM. High-end workstations and servers can have 1 TB (1000 GB) of RAM or more. If the dataset you are trying to analyze can't fit in your computer's memory, some special care is required to carry out the analysis. Data that can't fit in RAM but can fit on your hard drive is sometimes called "medium data."
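To get a feel for these numbers, we can estimate the in-memory size of an array before ever creating it: the number of elements times the bytes per element. The figures below are illustrative, not taken from the lecture datasets.

# Rough memory footprint of a hypothetical 10-year daily global dataset
# at 0.25-degree resolution, stored as float64 (8 bytes per value)
n_elements = 10 * 365 * 720 * 1440
print(n_elements * 8 / 1e9, "GB")   # ~30 GB -- more RAM than most laptops have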

The next threshold of difficulty is when the data can't fit on your hard drive. Most modern laptops have between 100 GB and 4 TB of storage space on the hard drive. If you can't fit your dataset on your internal hard drive, you can buy an external hard drive. However, at that point you are better off using a high-end server, HPC system, or cloud-based storage for your dataset. Once you have many TB of data to analyze, you are definitely in the realm of "big data."

What is Dask?

Dask is a tool that helps us easily extend our familiar python data analysis tools to medium and big data, i.e. datasets that can't fit in our computer's RAM. In many cases, dask also allows us to speed up our analysis by using multiple CPU cores. Dask can help us work more efficiently on our laptop, and it can also help us scale up our analysis on HPC and cloud platforms. Most importantly, dask is almost invisible to the user, meaning that you can focus on your science, rather than the details of parallel computing.

Dask was created by the brilliant Matt Rocklin. You can learn more about it in the Dask documentation (http://dask.pydata.org).

Dask provides collections for big data and a scheduler for parallel computing. It is probably easiest to illustrate what these mean through examples, so we will jump right in.

Dask Arrays

A dask array looks and feels a lot like a numpy array. However, a dask array doesn't directly hold any data. Instead, it symbolically represents the computations needed to generate the data. Nothing is actually computed until the actual numerical values are needed. This mode of operation is called "lazy"; it allows one to build up complex, large calculations symbolically before turning them over to the scheduler for execution.

If we want to create a numpy array of all ones, we do it like this:

In [14]:
import numpy as np
shape = (1000, 4000)
ones_np = np.ones(shape)
ones_np
Out[14]:
array([[ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       ..., 
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.]])

This array contains exactly 32 MB of data:

In [2]:
ones_np.nbytes / 1e6
Out[2]:
32.0

Now let's create the same array using dask's array interface.

In [3]:
import dask.array as da
ones = da.ones(shape)
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
 in ()
      1 import dask.array as da
----> 2 ones = da.ones(shape)

~/miniconda3/envs/geo_scipy/lib/python3.6/site-packages/dask/array/wrap.py in wrap_func_shape_as_first_arg(func, *args, **kwargs)
     29 
     30     chunks = kwargs.pop('chunks', None)
---> 31     chunks = normalize_chunks(chunks, shape)
     32     name = kwargs.pop('name', None)
     33 

~/miniconda3/envs/geo_scipy/lib/python3.6/site-packages/dask/array/core.py in normalize_chunks(chunks, shape)
   1838     """
   1839     if chunks is None:
-> 1840         raise ValueError(CHUNKS_NONE_ERROR_MESSAGE)
   1841     if type(chunks) is not tuple:
   1842         if type(chunks) is list:

ValueError: You must specify a chunks= keyword argument.
This specifies the chunksize of your array blocks.

See the following documentation page for details:
  http://dask.pydata.org/en/latest/array-creation.html#chunks

This did not work, because we didn't tell dask how to split up the array.

A crucial difference with dask is that we must specify the chunks argument. "Chunks" describes how the array is split up into many sub-arrays.

[Figure: schematic of a dask array as a grid of numpy chunks. Source: Dask Array Documentation]

There are several ways to specify chunks. In this lecture, we will use a block shape.
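For reference, here are two equivalent ways of expressing the same chunking; this is just a sketch, assuming dask.array has been imported as da and shape is defined as above.

import dask.array as da

shape = (1000, 4000)
ones_blocks = da.ones(shape, chunks=(1000, 1000))   # explicit block shape (what we use below)
ones_int = da.ones(shape, chunks=1000)              # a single int: blocks of 1000 along every dimension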

In [4]:
chunk_shape = (1000, 1000)
ones = da.ones(shape, chunks=chunk_shape)
ones
Out[4]:
dask.array

Notice that we just see a symbolic representation of the array, including its shape, dtype, and chunksize. No data has been generated yet. When we call .compute() on a dask array, the computation is triggered and the dask array becomes a numpy array.

In [5]:
ones.compute()
Out[5]:
array([[ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       ..., 
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.],
       [ 1.,  1.,  1., ...,  1.,  1.,  1.]])

In order to understand what happened when we called .compute(), we can visualize the dask graph, the symbolic operations that make up the array:

In [6]:
ones.visualize()
Out[6]:

Our array has four chunks. To generate it, dask calls np.ones four times and then concatenates them together into one array.
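You can inspect the chunk structure directly with attributes that every dask array carries:

ones.chunks      # ((1000,), (1000, 1000, 1000, 1000)): chunk sizes along each dimension
ones.numblocks   # (1, 4): one block along the first axis, four along the second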

Rather than immediately loading a dask array (which puts all the data into RAM), it is more common to want to reduce the data somehow. For example:

In [7]:
sum_of_ones = ones.sum()
sum_of_ones.visualize()
Out[7]:

Here we see dask's strategy for finding the sum. This simple example illustrates the beauty of dask: it automatically designs an algorithm appropriate for custom operations with big data.
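As a quick sanity check (not part of the original notebook output), computing this sum gives the number of elements in the array:

sum_of_ones.compute()   # 4000000.0 -- the sum of 1000 x 4000 ones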

If we make our operation more complex, the graph gets more complex.

In [8]:
fancy_calculation = (ones * ones[::-1, ::-1]).mean()
fancy_calculation.visualize()
Out[8]:

A Bigger Calculation

The examples above were toy examples; the data (32 MB) is nowhere near big enough to warrant the use of dask.

We can make it a lot bigger!

In [9]:
bigshape = (200000, 4000)
big_ones = da.ones(bigshape, chunks=chunk_shape)
big_ones
Out[9]:
dask.array
In [10]:
big_ones.nbytes / 1e6
Out[10]:
6400.0

This dataset is 6.4 GB, rather than 32 MB! This is probably close to or greater than the amount of available RAM in your computer. Nevertheless, dask has no problem working on it.

Do not try to .visualize() this array!
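To see why, note how many chunks this array has; numblocks is a standard dask array attribute:

big_ones.numblocks   # (200, 4) -- 800 chunks in total, so the task graph has far too many nodes to render usefully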

When doing a big calculation, dask also has some tools to help us understand what is happening under the hood:

In [11]:
from dask.diagnostics import ProgressBar

big_calc = (big_ones * big_ones[::-1, ::-1]).mean()

with ProgressBar():
    result = big_calc.compute()
result
[########################################] | 100% Completed |  7.3s
Out[11]:
1.0

Reduction

All the usual numpy methods work on dask arrays. You can also apply numpy functions directly to a dask array, and the result will stay lazy.

In [12]:
big_ones_reduce = (np.cos(big_ones)**2).mean(axis=0)
big_ones_reduce
Out[12]:
dask.array

Plotting also triggers computation, since we need the actual values:

In [7]:
from matplotlib import pyplot as plt
%matplotlib inline
plt.rcParams['figure.figsize'] = (12,8)
In [13]:
plt.plot(big_ones_reduce)
Out[13]:
[]

Distributed Cluster

For a fancier visualization of what dask is doing, we can use the distributed scheduler.

In [1]:
from dask.distributed import Client, LocalCluster
In [2]:
lc = LocalCluster(n_workers=1)
client = Client(lc)
client
Out[2]:

Client

Cluster

  • Workers: 1
  • Cores: 4
  • Memory: 8.59 GB
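One of the main attractions of the distributed scheduler is its diagnostic dashboard, where you can watch tasks, memory, and workers in real time. In recent versions of dask.distributed the client exposes the dashboard URL directly (the attribute below assumes a reasonably recent release; older versions only show the link in the client repr):

print(client.dashboard_link)   # e.g. http://127.0.0.1:8787/status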
In [16]:
big_calc.compute()
Out[16]:
1.0
In [17]:
random_values = da.random.normal(size=(2e8,), chunks=(1e6,))
hist, bins = da.histogram(random_values, bins=100, range=[-5, 5]) 
In [18]:
hist
Out[18]:
dask.array
In [19]:
x = 0.5 * (bins[1:] + bins[:-1])
width = np.diff(bins)
plt.bar(x, hist, width);
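For reference (don't run this yet, since we keep using this client for the rest of the lecture), you can shut down the cluster when you are finished with it; a minimal sketch:

client.close()   # disconnect this client from the scheduler
lc.close()       # shut down the LocalCluster and its workers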

Dask + XArray

Xarray can automatically wrap its data in dask arrays. This capability turns xarray into an extremely powerful tool for big data earth science.
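Even a single file can be opened lazily: passing a chunks argument to open_dataset wraps the variables in dask arrays instead of loading them eagerly. A minimal sketch (the file name here is just a placeholder):

import xarray as xr

# chunk along the time dimension; each time step becomes its own dask chunk
ds_lazy = xr.open_dataset('some_big_file.nc', chunks={'time': 1})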

To see this in action, we will download a fairly large dataset to analyze. This file contains 1 year of daily data from the AVISO sea-surface height satellite altimetry dataset.

In [20]:
! curl -O http://www.ldeo.columbia.edu/~rpa/aviso_madt_2015.tar.gz
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
 38  471M   38  180M    0     0  10.5M      0  0:00:44  0:00:17  0:00:27 8856k:30 11.7M^C
In [ ]:
! tar -xvzf aviso_madt_2015.tar.gz
In [22]:
! ls 2015 | wc -l
     365

Let's load the first file as a regular xarray dataset.

In [3]:
import xarray as xr
xr.__version__
Out[3]:
'0.10.0-52-gd8842a6'
In [24]:
ds_first = xr.open_dataset('2015/dt_global_allsat_madt_h_20150101_20150914.nc')
ds_first
Out[24]:

Dimensions:   (lat: 720, lon: 1440, nv: 2, time: 1)
Coordinates:
  * time      (time) datetime64[ns] 2015-01-01T00:01:44.499838976
  * lat       (lat) float32 -89.875 -89.625 -89.375 -89.125 -88.875 -88.625 ...
  * lon       (lon) float32 0.125 0.375 0.625 0.875 1.125 1.375 1.625 1.875 ...
  * nv        (nv) int32 0 1
Data variables:
    lat_bnds  (lat, nv) float32 ...
    lon_bnds  (lon, nv) float32 ...
    crs       int32 ...
    adt       (time, lat, lon) float64 ...
Attributes:
    Conventions:                     CF-1.6
    Metadata_Conventions:            Unidata Dataset Discovery v1.0
    cdm_data_type:                   Grid
    comment:                         Absolute Dynamic Topography
    contact:                         aviso@altimetry.fr
    creator_email:                   aviso@altimetry.fr
    creator_name:                    SSALTO/DUACS
    creator_url:                     http://www.aviso.altimetry.fr
    date_created:                    2015-10-23T17:07:42Z
    date_issued:                     2015-09-14T00:00:00Z
    date_modified:                   2015-11-13T16:23:24Z
    geospatial_lat_max:              89.875
    geospatial_lat_min:              -89.875
    geospatial_lat_resolution:       0.25
    geospatial_lat_units:            degrees_north
    geospatial_lon_max:              359.875
    geospatial_lon_min:              0.125
    geospatial_lon_resolution:       0.25
    geospatial_lon_units:            degrees_east
    geospatial_vertical_max:         0.0
    geospatial_vertical_min:         0.0
    geospatial_vertical_positive:    down
    geospatial_vertical_resolution:  point
    geospatial_vertical_units:       m
    history:                         2015-10-23T17:07:42Z: created by DUACS D...
    institution:                     CNES, CLS
    keywords:                        Oceans > Ocean Topography > Sea Surface ...
    keywords_vocabulary:             NetCDF COARDS Climate and Forecast Stand...
    license:                         http://www.aviso.altimetry.fr/fileadmin/...
    platform:                        AltiKa, Cryosat-2, Haiyang-2A, OSTM/Jason-2
    processing_level:                L4
    product_version:                 5.7
    project:                         SSALTO/DUACS :  Data Unification and Alt...
    references:                      http://www.aviso.altimetry.fr
    source:                          Altimetry measurements
    ssalto_duacs_comment:            The reference mission used for the altim...
    standard_name_vocabulary:        NetCDF Climate and Forecast (CF) Metadat...
    summary:                         Delayed-Time Level-4 sea surface height ...
    time_coverage_duration:          P1D
    time_coverage_end:               2015-01-01T12:00:00Z
    time_coverage_resolution:        P1D
    time_coverage_start:             2014-12-31T12:00:00Z
    title:                           DT merged all satellites Global Ocean Gr...
In [25]:
ds_first.nbytes / 1e6
Out[25]:
8.32034

This one file is about 8 MB. So 365 of them add up to nearly 3 GB. If we had downloaded all 25 years of data, it would be roughly 75 GB. This is a good example of "medium data."

open_mfdataset

An incredibly useful function in xarray is open_mfdataset.

In [26]:
help(xr.open_mfdataset)
Help on function open_mfdataset in module xarray.backends.api:

open_mfdataset(paths, chunks=None, concat_dim='__infer_concat_dim__', compat='no_conflicts', preprocess=None, engine=None, lock=None, data_vars='all', coords='different', **kwargs)
    Open multiple files as a single dataset.
    
    Requires dask to be installed.  Attributes from the first dataset file
    are used for the combined dataset.
    
    Parameters
    ----------
    paths : str or sequence
        Either a string glob in the form "path/to/my/files/*.nc" or an explicit
        list of files to open.  Paths can be given as strings or as pathlib
        Paths.
    chunks : int or dict, optional
        Dictionary with keys given by dimension names and values given by chunk
        sizes. In general, these should divide the dimensions of each dataset.
        If int, chunk each dimension by ``chunks``.
        By default, chunks will be chosen to load entire input files into
        memory at once. This has a major impact on performance: please see the
        full documentation for more details.
    concat_dim : None, str, DataArray or Index, optional
        Dimension to concatenate files along. This argument is passed on to
        :py:func:`xarray.auto_combine` along with the dataset objects. You only
        need to provide this argument if the dimension along which you want to
        concatenate is not a dimension in the original datasets, e.g., if you
        want to stack a collection of 2D arrays along a third dimension.
        By default, xarray attempts to infer this argument by examining
        component files. Set ``concat_dim=None`` explicitly to disable
        concatenation.
    compat : {'identical', 'equals', 'broadcast_equals',
              'no_conflicts'}, optional
        String indicating how to compare variables of the same name for
        potential conflicts when merging:
    
        - 'broadcast_equals': all values must be equal when variables are
          broadcast against each other to ensure common dimensions.
        - 'equals': all values and dimensions must be the same.
        - 'identical': all values, dimensions and attributes must be the
          same.
        - 'no_conflicts': only values which are not null in both datasets
          must be equal. The returned dataset then contains the combination
          of all non-null values.
    preprocess : callable, optional
        If provided, call this function on each dataset prior to concatenation.
    engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio'}, optional
        Engine to use when reading files. If not provided, the default engine
        is chosen based on available dependencies, with a preference for
        'netcdf4'.
    autoclose : bool, optional
        If True, automatically close files to avoid OS Error of too many files
        being open.  However, this option doesn't work with streams, e.g.,
        BytesIO.
    lock : False, True or threading.Lock, optional
        This argument is passed on to :py:func:`dask.array.from_array`. By
        default, a per-variable lock is used when reading data from netCDF
        files with the netcdf4 and h5netcdf engines to avoid issues with
        concurrent access when using dask's multithreaded backend.
    data_vars : {'minimal', 'different', 'all' or list of str}, optional
        These data variables will be concatenated together:
          * 'minimal': Only data variables in which the dimension already
            appears are included.
          * 'different': Data variables which are not equal (ignoring
            attributes) across all datasets are also concatenated (as well as
            all for which dimension already appears). Beware: this option may
            load the data payload of data variables into memory if they are not
            already loaded.
          * 'all': All data variables will be concatenated.
          * list of str: The listed data variables will be concatenated, in
            addition to the 'minimal' data variables.
    coords : {'minimal', 'different', 'all' o list of str}, optional
        These coordinate variables will be concatenated together:
          * 'minimal': Only coordinates in which the dimension already appears
            are included.
          * 'different': Coordinates which are not equal (ignoring attributes)
            across all datasets are also concatenated (as well as all for which
            dimension already appears). Beware: this option may load the data
            payload of coordinate variables into memory if they are not already
            loaded.
          * 'all': All coordinate variables will be concatenated, except
            those corresponding to other dimensions.
          * list of str: The listed coordinate variables will be concatenated,
            in addition the 'minimal' coordinates.
    
    **kwargs : optional
        Additional arguments passed on to :py:func:`xarray.open_dataset`.
    
    Returns
    -------
    xarray.Dataset
    
    See Also
    --------
    auto_combine
    open_dataset

Using open_mfdataset we can easily open all the netcdf files into one Dataset object.

In [4]:
# The first time I ran this, I got a "Too many open files" OSError.
# It's only 365 files; that shouldn't be too many.
# However, I discovered my ulimit was extremely low.
# One workaround is to call
#   $ ulimit -S -n 4000
# from the command line before launching the notebook.

ds = xr.open_mfdataset('2015/*.nc')
ds
Out[4]:

Dimensions:   (lat: 720, lon: 1440, nv: 2, time: 365)
Coordinates:
  * lat       (lat) float32 -89.875 -89.625 -89.375 -89.125 -88.875 -88.625 ...
  * lon       (lon) float32 0.125 0.375 0.625 0.875 1.125 1.375 1.625 1.875 ...
  * nv        (nv) int32 0 1
  * time      (time) datetime64[ns] 2015-01-01T00:01:44.499838976 ...
Data variables:
    lat_bnds  (time, lat, nv) float32 dask.array
    lon_bnds  (time, lon, nv) float32 dask.array
    crs       (time) int32 -2147483647 -2147483647 -2147483647 -2147483647 ...
    adt       (time, lat, lon) float64 dask.array
Attributes:
    Conventions:                     CF-1.6
    Metadata_Conventions:            Unidata Dataset Discovery v1.0
    cdm_data_type:                   Grid
    comment:                         Absolute Dynamic Topography
    contact:                         aviso@altimetry.fr
    creator_email:                   aviso@altimetry.fr
    creator_name:                    SSALTO/DUACS
    creator_url:                     http://www.aviso.altimetry.fr
    date_created:                    2015-10-23T17:07:42Z
    date_issued:                     2015-09-14T00:00:00Z
    date_modified:                   2015-11-13T16:23:24Z
    geospatial_lat_max:              89.875
    geospatial_lat_min:              -89.875
    geospatial_lat_resolution:       0.25
    geospatial_lat_units:            degrees_north
    geospatial_lon_max:              359.875
    geospatial_lon_min:              0.125
    geospatial_lon_resolution:       0.25
    geospatial_lon_units:            degrees_east
    geospatial_vertical_max:         0.0
    geospatial_vertical_min:         0.0
    geospatial_vertical_positive:    down
    geospatial_vertical_resolution:  point
    geospatial_vertical_units:       m
    history:                         2015-10-23T17:07:42Z: created by DUACS D...
    institution:                     CNES, CLS
    keywords:                        Oceans > Ocean Topography > Sea Surface ...
    keywords_vocabulary:             NetCDF COARDS Climate and Forecast Stand...
    license:                         http://www.aviso.altimetry.fr/fileadmin/...
    platform:                        AltiKa, Cryosat-2, Haiyang-2A, OSTM/Jason-2
    processing_level:                L4
    product_version:                 5.7
    project:                         SSALTO/DUACS :  Data Unification and Alt...
    references:                      http://www.aviso.altimetry.fr
    source:                          Altimetry measurements
    ssalto_duacs_comment:            The reference mission used for the altim...
    standard_name_vocabulary:        NetCDF Climate and Forecast (CF) Metadat...
    summary:                         Delayed-Time Level-4 sea surface height ...
    time_coverage_duration:          P1D
    time_coverage_end:               2015-01-01T12:00:00Z
    time_coverage_resolution:        P1D
    time_coverage_start:             2014-12-31T12:00:00Z
    title:                           DT merged all satellites Global Ocean Gr...

Note that the values are not displayed, since that would trigger computation.
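Under the hood, each data variable now wraps a dask array. A quick sketch of how to peek at it (the second line would trigger computation, so use it with care on big datasets):

ds.adt.data     # the underlying dask array (still lazy)
ds.adt.values   # a numpy array -- this would trigger computation and load everything into RAM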

In [5]:
ssh = ds.adt
ssh
Out[5]:

dask.array
Coordinates:
  * lat      (lat) float32 -89.875 -89.625 -89.375 -89.125 -88.875 -88.625 ...
  * lon      (lon) float32 0.125 0.375 0.625 0.875 1.125 1.375 1.625 1.875 ...
  * time     (time) datetime64[ns] 2015-01-01T00:01:44.499838976 ...
Attributes:
    grid_mapping:   crs
    long_name:      Absolute Dynamic Topography
    standard_name:  sea_surface_height_above_geoid
    units:          m
In [8]:
ssh[0].plot()
Out[8]:
In [9]:
ssh_2015_mean = ssh.mean(dim='time')
ssh_2015_mean.load()
Out[9]:

array([[ nan,  nan,  nan, ...,  nan,  nan,  nan],
       [ nan,  nan,  nan, ...,  nan,  nan,  nan],
       [ nan,  nan,  nan, ...,  nan,  nan,  nan],
       ..., 
       [  0.,   0.,   0., ...,   0.,   0.,   0.],
       [  0.,   0.,   0., ...,   0.,   0.,   0.],
       [  0.,   0.,   0., ...,   0.,   0.,   0.]])
Coordinates:
  * lat      (lat) float32 -89.875 -89.625 -89.375 -89.125 -88.875 -88.625 ...
  * lon      (lon) float32 0.125 0.375 0.625 0.875 1.125 1.375 1.625 1.875 ...
In [10]:
ssh_2015_mean.plot()
Out[10]:
In [11]:
ssh_anom = ssh - ssh_2015_mean
ssh_variance_lonmean = (ssh_anom**2).mean(dim=('lon', 'time'))
In [12]:
ssh_variance_lonmean.plot()
Out[12]:
[]
In [15]:
# weight each grid point by the relative area it represents (proportional to cos(latitude))
weight = np.cos(np.deg2rad(ds.lat))
weight /= weight.mean()
(ssh_anom * weight).mean(dim=('lon', 'lat')).plot()
Out[15]:
[]