Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-39979: [Python] Low-level bindings for exporting/importing the C Device Interface #39980

Merged
merged 11 commits into from
Feb 28, 2024
8 changes: 8 additions & 0 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1958,6 +1958,14 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
return ImportRecordBatch(array, *maybe_schema);
}

Result<std::shared_ptr<MemoryManager>> DefaultDeviceMapper(ArrowDeviceType device_type,
int64_t device_id) {
if (device_type != ARROW_DEVICE_CPU) {
return Status::NotImplemented("Only importing data on CPU is supported");
}
return default_cpu_memory_manager();
}

Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
std::shared_ptr<DataType> type,
const DeviceMemoryMapper& mapper) {
Expand Down
32 changes: 20 additions & 12 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch,
using DeviceMemoryMapper =
std::function<Result<std::shared_ptr<MemoryManager>>(ArrowDeviceType, int64_t)>;

ARROW_EXPORT
Result<std::shared_ptr<MemoryManager>> DefaultDeviceMapper(ArrowDeviceType device_type,
pitrou marked this conversation as resolved.
Show resolved Hide resolved
int64_t device_id);

/// \brief EXPERIMENTAL: Import C++ device array from the C data interface.
///
/// The ArrowArray struct has its contents moved (as per the C data interface
Expand All @@ -226,12 +230,13 @@ using DeviceMemoryMapper =
///
/// \param[in,out] array C data interface struct holding the array data
/// \param[in] type type of the imported array
/// \param[in] mapper A function to map device + id to memory manager
/// \param[in] mapper A function to map device + id to memory manager. If not
/// specified, defaults to map "cpu" to the built-in default memory manager.
/// \return Imported array object
ARROW_EXPORT
Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
std::shared_ptr<DataType> type,
const DeviceMemoryMapper& mapper);
Result<std::shared_ptr<Array>> ImportDeviceArray(
struct ArrowDeviceArray* array, std::shared_ptr<DataType> type,
const DeviceMemoryMapper& mapper = DefaultDeviceMapper);

/// \brief EXPERIMENTAL: Import C++ device array and its type from the C data interface.
///
Expand All @@ -242,12 +247,13 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
///
/// \param[in,out] array C data interface struct holding the array data
/// \param[in,out] type C data interface struct holding the array type
/// \param[in] mapper A function to map device + id to memory manager
/// \param[in] mapper A function to map device + id to memory manager. If not
/// specified, defaults to map "cpu" to the built-in default memory manager.
/// \return Imported array object
ARROW_EXPORT
Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
struct ArrowSchema* type,
const DeviceMemoryMapper& mapper);
Result<std::shared_ptr<Array>> ImportDeviceArray(
struct ArrowDeviceArray* array, struct ArrowSchema* type,
const DeviceMemoryMapper& mapper = DefaultDeviceMapper);

/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data
/// interface.
Expand All @@ -259,12 +265,13 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
///
/// \param[in,out] array C data interface struct holding the record batch data
/// \param[in] schema schema of the imported record batch
/// \param[in] mapper A function to map device + id to memory manager
/// \param[in] mapper A function to map device + id to memory manager. If not
/// specified, defaults to map "cpu" to the built-in default memory manager.
/// \return Imported record batch object
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
const DeviceMemoryMapper& mapper);
const DeviceMemoryMapper& mapper = DefaultDeviceMapper);

/// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema
/// from the C data interface.
Expand All @@ -278,12 +285,13 @@ Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
///
/// \param[in,out] array C data interface struct holding the record batch data
/// \param[in,out] schema C data interface struct holding the record batch schema
/// \param[in] mapper A function to map device + id to memory manager
/// \param[in] mapper A function to map device + id to memory manager. If not
/// specified, defaults to map "cpu" to the built-in default memory manager.
/// \return Imported record batch object
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, struct ArrowSchema* schema,
const DeviceMemoryMapper& mapper);
const DeviceMemoryMapper& mapper = DefaultDeviceMapper);

/// @}

Expand Down
64 changes: 64 additions & 0 deletions python/pyarrow/array.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -1778,6 +1778,70 @@ cdef class Array(_PandasConvertible):

return pyarrow_wrap_array(array)

def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out_schema_ptr=None would feel slightly more Pythonic IMHO, though that's debatable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would propose to leave this as is, to keep it consistent with the other _export_to_c definitions (and the _as_c_pointer helper also requires an integer at the moment)

"""
Export to a C ArrowDeviceArray struct, given its pointer.

If a C ArrowSchema struct pointer is also given, the array type
is exported to it at the same time.

Parameters
----------
out_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
out_schema_ptr: int (optional)
The raw pointer to a C ArrowSchema struct.

Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
array memory will leak. This is a low-level function intended for
expert users.
Comment on lines +1795 to +1797
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this explicitly mention the release callback on the struct?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied this from the existing docstrings. We could mention the release callback explicitly, but essentially then you are a "consumer". This functions returns an integer, you can't call the release callback on the return value as such. Only when you actually interpret it as an ArrowArray struct, you can do that (and at that point, you are a consumer who should be aware of those details?)
I could also point to the general page about the C Data Interface.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @jorisvandenbossche that the release callback need not be mentioned here. This is all in the spec.

"""
cdef:
void* c_ptr = _as_c_pointer(out_ptr)
void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
allow_null=True)
with nogil:
check_status(ExportDeviceArray(
deref(self.sp_array), <shared_ptr[CSyncEvent]>NULL,
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))

@staticmethod
def _import_from_c_device(in_ptr, type):
"""
Import Array from a C ArrowDeviceArray struct, given its pointer
and the imported array type.

Parameters
----------
in_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
type: DataType or int
Either a DataType object, or the raw pointer to a C ArrowSchema
struct.

This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_type_ptr
shared_ptr[CArray] c_array

c_type = pyarrow_unwrap_data_type(type)
if c_type == nullptr:
# Not a DataType object, perhaps a raw ArrowSchema pointer
c_type_ptr = _as_c_pointer(type)
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
<ArrowSchema*> c_type_ptr)
)
Comment on lines +1834 to +1837
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default mapper is only allowing CPU arrays, but pyarrow does have a cuda lib, shouldn't we allow and enable importing at least CUDA arrays too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly, but as mentioned earlier (#39980 (comment)), I was planning to tackle CUDA in a follow-up, and this PR indeed only properly supports and tests CPU.

else:
with nogil:
c_array = GetResultValue(
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
)
return pyarrow_wrap_array(c_array)

def __dlpack__(self, stream=None):
"""Export a primitive array as a DLPack capsule.

Expand Down
10 changes: 10 additions & 0 deletions python/pyarrow/cffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@
// Opaque producer-specific data
void* private_data;
};

typedef int32_t ArrowDeviceType;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we expose the constants in pyarrow somehow?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't use them ourselves, I don't know if that is needed (although it might still be useful for other users of pyarrow.cffi?)


struct ArrowDeviceArray {
struct ArrowArray array;
int64_t device_id;
ArrowDeviceType device_type;
void* sync_event;
int64_t reserved[3];
};
"""

# TODO use out-of-line mode for faster import and avoid C parsing
Expand Down
23 changes: 23 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,12 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CResult[unique_ptr[CResizableBuffer]] AllocateResizableBuffer(
const int64_t size, CMemoryPool* pool)

cdef cppclass CSyncEvent" arrow::Device::SyncEvent":
pass

cdef cppclass CDevice" arrow::Device":
pass

cdef CMemoryPool* c_default_memory_pool" arrow::default_memory_pool"()
cdef CMemoryPool* c_system_memory_pool" arrow::system_memory_pool"()
cdef CStatus c_jemalloc_memory_pool" arrow::jemalloc_memory_pool"(
Expand Down Expand Up @@ -2901,6 +2907,9 @@ cdef extern from "arrow/c/abi.h":
cdef struct ArrowArrayStream:
void (*release)(ArrowArrayStream*) noexcept nogil

cdef struct ArrowDeviceArray:
pass

cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CStatus ExportType(CDataType&, ArrowSchema* out)
CResult[shared_ptr[CDataType]] ImportType(ArrowSchema*)
Expand Down Expand Up @@ -2933,6 +2942,20 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
CStatus ExportChunkedArray(shared_ptr[CChunkedArray], ArrowArrayStream*)
CResult[shared_ptr[CChunkedArray]] ImportChunkedArray(ArrowArrayStream*)

CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CArray]] ImportDeviceArray(
ArrowDeviceArray*, shared_ptr[CDataType])
CResult[shared_ptr[CArray]] ImportDeviceArray(
ArrowDeviceArray*, ArrowSchema*)

CStatus ExportDeviceRecordBatch(const CRecordBatch&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, shared_ptr[CSchema])
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, ArrowSchema*)


cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
Expand Down
62 changes: 62 additions & 0 deletions python/pyarrow/table.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -3145,6 +3145,68 @@ cdef class RecordBatch(_Tabular):

return pyarrow_wrap_batch(c_batch)

def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowDeviceArray struct, given its pointer.

If a C ArrowSchema struct pointer is also given, the record batch
schema is exported to it at the same time.

Parameters
----------
out_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
out_schema_ptr: int (optional)
The raw pointer to a C ArrowSchema struct.

Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
array memory will leak. This is a low-level function intended for
expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(out_ptr)
void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
allow_null=True)
with nogil:
check_status(ExportDeviceRecordBatch(
deref(self.sp_batch), <shared_ptr[CSyncEvent]>NULL,
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr)
)

@staticmethod
def _import_from_c_device(in_ptr, schema):
"""
Import RecordBatch from a C ArrowDeviceArray struct, given its pointer
and the imported schema.

Parameters
----------
in_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
type: Schema or int
Either a Schema object, or the raw pointer to a C ArrowSchema
struct.

This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_schema_ptr
shared_ptr[CRecordBatch] c_batch

c_schema = pyarrow_unwrap_schema(schema)
if c_schema == nullptr:
# Not a Schema object, perhaps a raw ArrowSchema pointer
c_schema_ptr = _as_c_pointer(schema, allow_null=True)
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
Comment on lines +3202 to +3203
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment as before, don't we want to allow using the pyarrow.cuda lib to provide a device mapper and hallow handling cuda-based gpu memory arrays?

else:
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, c_schema))
return pyarrow_wrap_batch(c_batch)


def _reconstruct_record_batch(columns, schema):
"""
Expand Down
Loading
Loading