Skip to content

Commit

Permalink
Adding support for streaming the input arrays of a pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
joaori committed Nov 7, 2024
1 parent fdabd27 commit b379d7b
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 39 deletions.
78 changes: 56 additions & 22 deletions src/pdal/PyArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

#include "PyArray.hpp"
#include <pdal/io/MemoryViewReader.hpp>
#include <numpy/arrayobject.h>

namespace pdal
{
Expand Down Expand Up @@ -94,7 +93,8 @@ std::string toString(PyObject *pname)
#define PyDataType_FIELDS(descr) ((descr)->fields)
#endif

Array::Array(PyArrayObject* array) : m_array(array), m_rowMajor(true)
Array::Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler)
: m_array(array), m_rowMajor(true), m_stream_handler(std::move(stream_handler))
{
Py_XINCREF(array);

Expand Down Expand Up @@ -163,51 +163,85 @@ Array::~Array()
Py_XDECREF(m_array);
}


ArrayIter& Array::iterator()
std::shared_ptr<ArrayIter> Array::iterator()
{
ArrayIter *it = new ArrayIter(m_array);
m_iterators.emplace_back((it));
return *it;
return std::make_shared<ArrayIter>(m_array, m_stream_handler);
}

ArrayIter::ArrayIter(PyArrayObject* np_array, std::shared_ptr<ArrayStreamHandler> stream_handler)
: m_stream_handler(std::move(stream_handler))
{
resetIterator(np_array);
}

ArrayIter::ArrayIter(PyArrayObject* np_array)
void ArrayIter::resetIterator(std::optional<PyArrayObject*> np_array = {})
{
m_iter = NpyIter_New(np_array,
NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
NPY_KEEPORDER, NPY_NO_CASTING, NULL);
if (!m_iter)
throw pdal_error("Unable to create numpy iterator.");
std::optional<int> stream_chunk_size = std::nullopt;
if (m_stream_handler) {
stream_chunk_size = (*m_stream_handler)();
if (*stream_chunk_size == 0) {
m_done = true;
return;
}
}

if (np_array) {
// Init iterator
m_iter = NpyIter_New(np_array.value(),
NPY_ITER_EXTERNAL_LOOP | NPY_ITER_READONLY | NPY_ITER_REFS_OK,
NPY_KEEPORDER, NPY_NO_CASTING, NULL);
if (!m_iter)
throw pdal_error("Unable to create numpy iterator.");
} else {
// Otherwise, reset the iterator to the initial state
if (NpyIter_Reset(m_iter, NULL) != NPY_SUCCEED) {
NpyIter_Deallocate(m_iter);
throw pdal_error("Unable to reset numpy iterator.");
}
}

char *itererr;
m_iterNext = NpyIter_GetIterNext(m_iter, &itererr);
if (!m_iterNext)
{
NpyIter_Deallocate(m_iter);
throw pdal_error(std::string("Unable to create numpy iterator: ") +
itererr);
throw pdal_error(std::string("Unable to create numpy iterator: ") + itererr);
}
m_data = NpyIter_GetDataPtrArray(m_iter);
m_stride = NpyIter_GetInnerStrideArray(m_iter);
m_size = NpyIter_GetInnerLoopSizePtr(m_iter);
m_stride = *NpyIter_GetInnerStrideArray(m_iter);
m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
if (stream_chunk_size) {
if (0 <= *stream_chunk_size && *stream_chunk_size <= m_size) {
m_size = *stream_chunk_size;
} else {
throw pdal_error(std::string("Stream chunk size not in the range of array length: ") +
std::to_string(*stream_chunk_size));
}
}
m_done = false;
}

/// Releases the numpy iterator, if one is currently held.
///
/// m_iter can be null (for example when the stream handler reported an empty
/// first chunk and no iterator was ever created), so the release is guarded
/// and performed exactly once.
ArrayIter::~ArrayIter()
{
    if (m_iter != nullptr) {
        NpyIter_Deallocate(m_iter);
        m_iter = nullptr;
    }
}

/// Advances to the next element.
///
/// Within the current inner loop this just bumps the data pointer by the
/// cached stride. When the inner loop is exhausted the numpy iterator is
/// asked for the next one; if there is none, a streaming iterator rewinds
/// (which invokes the stream handler for the next chunk) and a non-streaming
/// iterator becomes done. Incrementing a finished iterator is a no-op.
///
/// @return *this
ArrayIter& ArrayIter::operator++()
{
    if (m_done)
        return *this;

    // Still inside the current inner loop.
    if (--m_size) {
        *m_data += m_stride;
        return *this;
    }

    if (m_iterNext(m_iter)) {
        // The iterator moved on to another inner loop. m_size and m_stride
        // are cached copies, so they must be re-read here; otherwise m_size
        // stays at 0 and the next decrement walks past the end of the data.
        m_stride = *NpyIter_GetInnerStrideArray(m_iter);
        m_size = *NpyIter_GetInnerLoopSizePtr(m_iter);
    } else if (m_stream_handler) {
        // Streaming: rewind and let the handler supply the next chunk.
        resetIterator();
    } else {
        m_done = true;
    }
    return *this;
}

Expand Down
19 changes: 12 additions & 7 deletions src/pdal/PyArray.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

#include <vector>
#include <memory>
#include <optional>

namespace pdal
{
Expand All @@ -57,14 +58,15 @@ namespace python

class ArrayIter;

using ArrayStreamHandler = std::function<int64_t()>;

class PDAL_DLL Array
{
public:
using Shape = std::array<size_t, 3>;
using Fields = std::vector<MemoryViewReader::Field>;

Array(PyArrayObject* array);
Array(PyArrayObject* array, std::shared_ptr<ArrayStreamHandler> stream_handler = {});
~Array();

Array(Array&& a) = default;
Expand All @@ -76,14 +78,14 @@ class PDAL_DLL Array
bool rowMajor() const { return m_rowMajor; };
Shape shape() const { return m_shape; }
const Fields& fields() const { return m_fields; };
ArrayIter& iterator();
std::shared_ptr<ArrayIter> iterator();

private:
PyArrayObject* m_array;
Fields m_fields;
bool m_rowMajor;
Shape m_shape {};
std::vector<std::unique_ptr<ArrayIter>> m_iterators;
std::shared_ptr<ArrayStreamHandler> m_stream_handler;
};


Expand All @@ -93,20 +95,23 @@ class ArrayIter
ArrayIter(const ArrayIter&) = delete;
ArrayIter() = delete;

ArrayIter(PyArrayObject*);
ArrayIter(PyArrayObject*, std::shared_ptr<ArrayStreamHandler>);
~ArrayIter();

ArrayIter& operator++();
operator bool () const { return !m_done; }
char* operator*() const { return *m_data; }

private:
NpyIter *m_iter;
NpyIter *m_iter = nullptr;
NpyIter_IterNextFunc *m_iterNext;
char **m_data;
npy_intp *m_size;
npy_intp *m_stride;
npy_intp m_size;
npy_intp m_stride;
bool m_done;

std::shared_ptr<ArrayStreamHandler> m_stream_handler;
void resetIterator(std::optional<PyArrayObject*> np_array);
};

} // namespace python
Expand Down
14 changes: 10 additions & 4 deletions src/pdal/PyPipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,20 @@ void PipelineExecutor::addArrayReaders(std::vector<std::shared_ptr<Array>> array
for (auto f : array->fields())
r.pushField(f);

ArrayIter& iter = array->iterator();
auto incrementer = [&iter](PointId id) -> char *
auto arrayIter = array->iterator();
auto incrementer = [arrayIter, firstPoint = true](PointId id) mutable -> char *
{
if (! iter)
ArrayIter& iter = *arrayIter;
if (!firstPoint && iter) {
++iter;
} else {
firstPoint = false;
}

if (!iter)
return nullptr;

char *c = *iter;
++iter;
return c;
};

Expand Down
20 changes: 16 additions & 4 deletions src/pdal/libpdalpython.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <pybind11/numpy.h>
#include <pybind11/functional.h>
#include <pybind11/stl/filesystem.h>
#include <iostream>

Expand Down Expand Up @@ -190,11 +191,22 @@ namespace pdal {
));
}

void setInputs(std::vector<py::array> ndarrays) {
void setInputs(const std::vector<py::object>& inputs) {
_inputs.clear();
for (const auto& ndarray: ndarrays) {
PyArrayObject* ndarray_ptr = (PyArrayObject*)ndarray.ptr();
_inputs.push_back(std::make_shared<pdal::python::Array>(ndarray_ptr));
for (const auto& input_obj: inputs) {
if (py::isinstance<py::array>(input_obj)) {
// Backward compatibility for accepting list of numpy arrays
auto ndarray = input_obj.cast<py::array>();
_inputs.push_back(std::make_shared<pdal::python::Array>((PyArrayObject*)ndarray.ptr()));
} else {
// Now expected to be a list of pairs: (numpy array, <optional> stream handler)
auto input = input_obj.cast<std::pair<py::array, pdal::python::ArrayStreamHandler>>();
_inputs.push_back(std::make_shared<pdal::python::Array>(
(PyArrayObject*)input.first.ptr(),
input.second ?
std::make_shared<pdal::python::ArrayStreamHandler>(input.second)
: nullptr));
}
}
delExecutor();
}
Expand Down
12 changes: 10 additions & 2 deletions src/pdal/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import json
import logging
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast
from typing import Any, Container, Dict, Iterator, List, Optional, Sequence, Union, cast, Callable

import numpy as np
import pathlib
Expand Down Expand Up @@ -41,6 +41,7 @@ def __init__(
loglevel: int = logging.ERROR,
json: Optional[str] = None,
dataframes: Sequence[DataFrame] = (),
stream_handlers: Sequence[Callable[[], int]] = (),
):

if json:
Expand All @@ -58,7 +59,14 @@ def __init__(
stages = _parse_stages(spec) if isinstance(spec, str) else spec
for stage in stages:
self |= stage
self.inputs = arrays

if stream_handlers:
if len(stream_handlers) != len(arrays):
raise RuntimeError("stream_handlers must match the number of specified input arrays / dataframes")
self.inputs = [(a, h) for a, h in zip(arrays, stream_handlers)]
else:
self.inputs = [(a, None) for a in arrays]

self.loglevel = loglevel

def __getstate__(self):
Expand Down
Loading

0 comments on commit b379d7b

Please sign in to comment.