refactor!: Remove Dask as a hard dependency of Datashader (#1350)
hoxbro authored Nov 12, 2024
1 parent e5182bf commit 1b9eac6
Showing 17 changed files with 146 additions and 58 deletions.
14 changes: 11 additions & 3 deletions datashader/bundling.py
@@ -15,9 +15,17 @@

from math import ceil

from dask import compute, delayed
from pandas import DataFrame

try:
    import dask
    from dask import compute, delayed
except ImportError:
    dask, compute = None, None
    def delayed(*args, **kwargs):
        def func(*args, **kwargs):
            raise ImportError("dask is required to use delayed functions")
        return func
try:
    import skimage
    from skimage.filters import gaussian, sobel_h, sobel_v
@@ -457,8 +465,8 @@ class hammer_bundle(connect_edges):
        Column name for each edge weight. If None, weights are ignored.""")

    def __call__(self, nodes, edges, **params):
        if skimage is None:
            raise ImportError("hammer_bundle operation requires scikit-image. "
        if dask is None or skimage is None:
            raise ImportError("hammer_bundle operation requires dask and scikit-image. "
                              "Ensure you install the dependency before applying "
                              "bundling.")

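Note how the fallback `delayed` above is a stub decorator: the module still imports cleanly without Dask, and the missing dependency is only reported when a delayed function is actually called. A minimal standalone sketch of the same pattern (the `add` function is illustrative, not part of Datashader):

try:
    from dask import compute, delayed
except ImportError:
    compute = None

    def delayed(*args, **kwargs):
        # Stand-in decorator so module import succeeds without dask;
        # the wrapped callable raises only when it is actually invoked.
        def func(*args, **kwargs):
            raise ImportError("dask is required to use delayed functions")
        return func

@delayed
def add(a, b):
    # Hypothetical example function.
    return a + b

# With dask installed: compute(add(1, 2)) == (3,)
# Without dask: add(1, 2) raises ImportError at call time, but importing
# this module never fails.
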
21 changes: 14 additions & 7 deletions datashader/core.py
@@ -7,8 +7,6 @@

import numpy as np
import pandas as pd
import dask.dataframe as dd
import dask.array as da
from packaging.version import Version
from xarray import DataArray, Dataset

@@ -19,6 +17,12 @@
from .resampling import resample_2d, resample_2d_distributed
from . import reductions as rd

try:
    import dask.dataframe as dd
    import dask.array as da
except ImportError:
    dd, da = None, None

try:
    import cudf
except Exception:
@@ -1143,7 +1147,7 @@ def raster(self,
source_window = array[rmin:rmax+1, cmin:cmax+1]
if ds_method in ['var', 'std']:
    source_window = source_window.astype('f')
if isinstance(source_window, da.Array):
if da and isinstance(source_window, da.Array):
    data = resample_2d_distributed(
        source_window, chunksize=chunksize, max_mem=max_mem,
        **kwargs)
@@ -1156,7 +1160,7 @@
    source_window = source_window.astype('f')
arrays = []
for arr in source_window:
    if isinstance(arr, da.Array):
    if da and isinstance(arr, da.Array):
        arr = resample_2d_distributed(
            arr, chunksize=chunksize, max_mem=max_mem,
            **kwargs)
@@ -1192,7 +1196,7 @@ def raster(self,
top_pad = np.full(tshape, fill_value, source_window.dtype)
bottom_pad = np.full(bshape, fill_value, source_window.dtype)

concat = da.concatenate if isinstance(data, da.Array) else np.concatenate
concat = da.concatenate if da and isinstance(data, da.Array) else np.concatenate
arrays = (top_pad, data) if top_pad.shape[0] > 0 else (data,)
if bottom_pad.shape[0] > 0:
    arrays += (bottom_pad,)
@@ -1353,7 +1357,7 @@ def _bypixel_sanitise(source, glyph, agg):
columns = list(source.coords.keys()) + list(source.data_vars.keys())
cols_to_keep = _cols_to_keep(columns, glyph, agg)
source = source.drop_vars([col for col in columns if col not in cols_to_keep])
source = source.to_dask_dataframe()
if dd:
    source = source.to_dask_dataframe()
else:
    source = source.to_dataframe()

if (isinstance(source, pd.DataFrame) or
        (cudf and isinstance(source, cudf.DataFrame))):
@@ -1375,7 +1382,7 @@ def _bypixel_sanitise(source, glyph, agg):
            getattr(source[glyph.geometry].array, "_sindex", None) is None):
        source[glyph.geometry].array._sindex = sindex
    dshape = dshape_from_pandas(source)
elif isinstance(source, dd.DataFrame):
elif dd and isinstance(source, dd.DataFrame):
    dshape, source = dshape_from_dask(source)
elif isinstance(source, Dataset):
    # Multi-dimensional Dataset
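
The repeated `da and isinstance(...)` / `dd and isinstance(...)` guards rely on short-circuiting: when the module object is None, the isinstance check is never evaluated, so nothing is looked up on a missing module. A small self-contained sketch of the idea (the `describe` helper is illustrative, not Datashader API):

import numpy as np
import pandas as pd

try:
    import dask.array as da
    import dask.dataframe as dd
except ImportError:
    da, dd = None, None

def describe(obj):
    # `da and ...` short-circuits to None/False when dask is absent,
    # so da.Array is never touched unless the import succeeded.
    if da and isinstance(obj, da.Array):
        return "dask array"
    if dd and isinstance(obj, dd.DataFrame):
        return "dask dataframe"
    if isinstance(obj, (np.ndarray, pd.DataFrame)):
        return "in-memory"
    return "unsupported"

print(describe(np.zeros(3)))   # "in-memory" whether or not dask is installed
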
14 changes: 10 additions & 4 deletions datashader/resampling.py
@@ -29,16 +29,20 @@
from itertools import groupby
from math import floor, ceil

import dask.array as da
import numpy as np

from dask.delayed import delayed
from numba import prange
from .utils import ngjit, ngjit_parallel

try:
    import dask.array as da
    from dask.delayed import delayed
except ImportError:
    da, delayed = None, None

try:
    import cupy
except Exception:
except ImportError:
    cupy = None


@@ -242,6 +246,8 @@ def resample_2d_distributed(src, w, h, ds_method='mean', us_method='linear',
    resampled : dask.array.Array
        A resampled version of the *src* array.
    """
    if da is None:
        raise ImportError('dask is required for distributed regridding')
    temp_chunks = compute_chunksize(src, w, h, chunksize, max_mem)
    if chunksize is None:
        chunksize = src.chunksize
@@ -343,7 +349,7 @@ def resample_2d(src, w, h, ds_method='mean', us_method='linear',
    return _mask_or_not(resampled, src, fill_value)


_resample_2d_delayed = delayed(resample_2d)
_resample_2d_delayed = delayed(resample_2d) if delayed else None


def upsample_2d(src, w, h, method=US_LINEAR, fill_value=None, out=None):
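
Module-level objects that need Dask, like `_resample_2d_delayed`, are only constructed when the import succeeded, while `resample_2d_distributed` raises at call time. A rough sketch of that split under the same assumptions (`regrid`, `regrid_delayed` and `regrid_many` are made-up names, not the Datashader functions):

try:
    from dask import compute
    from dask.delayed import delayed
except ImportError:
    compute, delayed = None, None

def regrid(block):
    # Hypothetical eager downsample used only for illustration.
    return block[::2, ::2]

# Built at import time, but only when dask is importable.
regrid_delayed = delayed(regrid) if delayed else None

def regrid_many(blocks):
    # Fail loudly at call time instead of at import time.
    if compute is None:
        raise ImportError("dask is required for distributed regridding")
    return compute(*[regrid_delayed(b) for b in blocks])
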
11 changes: 8 additions & 3 deletions datashader/tests/test_dask.py
@@ -1,11 +1,9 @@
from __future__ import annotations

import dask.dataframe as dd
import numpy as np
import pandas as pd
import xarray as xr

from dask.context import config
from numpy import nan
from packaging.version import Version

@@ -27,7 +25,14 @@
    assert_eq_xr, assert_eq_ndarray, values
)

config.set(scheduler='synchronous')

try:
    import dask.dataframe as dd
    from dask.context import config
    config.set(scheduler='synchronous')
except ImportError:
    pytestmark = pytest.importorskip("dask")



@dask_switcher(query=False)
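
In the fallback branch above, pytest.importorskip("dask") raises a module-level skip during collection, so every test in the file is skipped rather than failing with an ImportError. A hedged sketch of the same mechanism in an isolated test module (the test name is made up):

import pandas as pd
import pytest

# Skips this whole module at collection time when dask.dataframe is missing;
# when the import succeeds, the module object is returned.
dd = pytest.importorskip("dask.dataframe")

def test_roundtrip():
    df = pd.DataFrame({"x": [1, 2, 3]})
    assert dd.from_pandas(df, npartitions=2).compute().equals(df)
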
6 changes: 5 additions & 1 deletion datashader/tests/test_geopandas.py
@@ -1,7 +1,6 @@
# Testing GeoPandas and SpatialPandas
import contextlib

import dask.dataframe as dd
import datashader as ds
from datashader.tests.test_pandas import assert_eq_ndarray
import numpy as np
@@ -10,6 +9,11 @@
from datashader.tests.utils import dask_switcher
from packaging.version import Version

try:
    import dask.dataframe as dd
except ImportError:
    dd = None

_backends = [
    pytest.param(False, id="dask"),
]
5 changes: 4 additions & 1 deletion datashader/tests/test_polygons.py
@@ -4,9 +4,12 @@
import xarray as xr
import datashader as ds
from datashader.tests.test_pandas import assert_eq_ndarray, assert_eq_xr
import dask.dataframe as dd
from datashader.tests.utils import dask_switcher

try:
    import dask.dataframe as dd
except ImportError:
    dd = None

@pytest.fixture(autouse=True)
def _classic_dd():
21 changes: 15 additions & 6 deletions datashader/tests/test_quadmesh.py
@@ -5,10 +5,20 @@
import datashader as ds
import pytest

import dask.array
from datashader.tests.test_pandas import assert_eq_ndarray, assert_eq_xr
from datashader.tests.utils import dask_skip

array_modules = [np]

try:
    import dask
    import dask.array
    dask.config.set(scheduler='single-threaded')
    array_modules.append(dask.array)
except ImportError:
    class dask:
        array = None

array_modules = [np, dask.array]
try:
    import cudf
    import cupy
@@ -18,9 +28,6 @@
    cupy = None


dask.config.set(scheduler='single-threaded')


# Raster
@pytest.mark.parametrize('array_module', array_modules)
def test_raster_quadmesh_autorange_downsample(array_module):
Expand Down Expand Up @@ -90,6 +97,7 @@ def test_raster_quadmesh_autorange(array_module):
assert_eq_ndarray(res.y_range, (0.5, 2.5), close=True)


@dask_skip
def test_raster_quadmesh_autorange_chunked():
    c = ds.Canvas(plot_width=8, plot_height=6)
    da = xr.DataArray(
@@ -331,6 +339,7 @@ def test_rectilinear_quadmesh_autorange(array_module):
    assert_eq_ndarray(res.y_range, (0.5, 2.5), close=True)


@dask_skip
def test_rectilinear_quadmesh_autorange_chunked():
    c = ds.Canvas(plot_width=8, plot_height=6)
    da = xr.DataArray(
@@ -572,7 +581,7 @@ def test_curve_quadmesh_autorange(array_module):
    assert_eq_ndarray(res.x_range, (0.5, 2.5), close=True)
    assert_eq_ndarray(res.y_range, (-1, 7), close=True)


@dask_skip
def test_curve_quadmesh_autorange_chunked():
    c = ds.Canvas(plot_width=4, plot_height=8)

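The `@dask_skip` marker comes from `datashader.tests.utils`, whose implementation is not part of this diff; a marker with equivalent behaviour could look roughly like the sketch below (an assumption, not the actual helper):

import importlib.util

import pytest

# Hypothetical stand-in for datashader.tests.utils.dask_skip:
# skip the decorated test when dask cannot be imported.
dask_skip = pytest.mark.skipif(
    importlib.util.find_spec("dask") is None,
    reason="dask is not installed",
)

@dask_skip
def test_needs_chunked_input():
    import dask.array as da
    assert da.ones((4, 4), chunks=2).sum().compute() == 16
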
36 changes: 21 additions & 15 deletions datashader/tests/test_raster.py
@@ -1,32 +1,35 @@
from __future__ import annotations
import pytest

try:
    import rasterio
except ImportError:
    rasterio = None

try:
    import rioxarray
except ImportError:
    rioxarray = None

from dask.context import config

from os import path
from itertools import product

import datashader as ds
import xarray as xr
import numpy as np
import dask.array as da
import pandas as pd

from datashader.resampling import compute_chunksize
import datashader.transfer_functions as tf
from packaging.version import Version
from .utils import dask_skip

try:
    import rasterio
except ImportError:
    rasterio = None

try:
    import rioxarray
except ImportError:
    rioxarray = None

config.set(scheduler='synchronous')
try:
    from dask.context import config
    import dask.array as da
    config.set(scheduler='synchronous')
except ImportError:
    da = None

open_rasterio_available = pytest.mark.skipif(rioxarray is None and rasterio is None,
                                             reason="requires rioxarray or rasterio")
@@ -454,6 +457,7 @@ def test_raster_single_pixel_range_with_padding():
    assert np.allclose(agg.y.values, np.array([-0.399875, -0.199625, 0.000625, 0.200875]))


@dask_skip
@pytest.mark.parametrize(
    'in_size, out_size, agg',
    product(range(5, 8), range(2, 5),
@@ -481,6 +485,7 @@ def test_raster_distributed_downsample(in_size, out_size, agg):
    assert np.allclose(agg_arr.y.values, agg_darr.y.values)


@dask_skip
@pytest.mark.parametrize('in_size, out_size', product(range(2, 5), range(7, 9)))
def test_raster_distributed_upsample(in_size, out_size):
"""
@@ -505,6 +510,7 @@ def test_raster_distributed_upsample(in_size, out_size):
    assert np.allclose(agg_arr.y.values, agg_darr.y.values)


@dask_skip
def test_raster_distributed_regrid_chunksize():
"""
Ensure that distributed regrid respects explicit chunk size.
@@ -523,7 +529,7 @@ def test_raster_distributed_regrid_chunksize():

    assert agg_darr.data.chunksize == (1, 1)


@dask_skip
def test_resample_compute_chunksize():
"""
Ensure chunksize computation is correct.
4 changes: 4 additions & 0 deletions datashader/tests/test_tiles.py
@@ -10,6 +10,8 @@
from datashader.tiles import calculate_zoom_level_stats
from datashader.tiles import MercatorTileDefinition

from datashader.tests.utils import dask_skip

import numpy as np
import pandas as pd

@@ -58,6 +60,7 @@ def mock_post_render_func(img, **kwargs):


# TODO: mark with slow_test
@dask_skip
def test_render_tiles():
    full_extent_of_data = (-500000, -500000,
                           500000, 500000)
@@ -108,6 +111,7 @@ def test_get_super_tile_min_max():
    assert_is_numeric(result[0])
    assert_is_numeric(result[1])

@dask_skip
def test_calculate_zoom_level_stats_with_fullscan_ranging_strategy():
    full_extent = (-MERCATOR_CONST, -MERCATOR_CONST,
                   MERCATOR_CONST, MERCATOR_CONST)