GH-39979: [Python] Low-level bindings for exporting/importing the C Device Interface #39980

Merged (11 commits, Feb 28, 2024)
10 changes: 10 additions & 0 deletions cpp/src/arrow/c/bridge.cc
@@ -2016,6 +2016,16 @@ Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
return ImportDeviceRecordBatch(array, *maybe_schema, mapper);
}

Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema) {
return ImportDeviceRecordBatch(array, schema, DefaultDeviceMapper);
}

Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, struct ArrowSchema* schema) {
return ImportDeviceRecordBatch(array, schema, DefaultDeviceMapper);
}

//////////////////////////////////////////////////////////////////////////
// C stream export

7 changes: 7 additions & 0 deletions cpp/src/arrow/c/bridge.h
@@ -292,6 +292,13 @@ Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, struct ArrowSchema* schema,
const DeviceMemoryMapper& mapper);

ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema);
ARROW_EXPORT
Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
struct ArrowDeviceArray* array, struct ArrowSchema* schema);

/// @}

/// \defgroup c-stream-interface Functions for working with the C data interface.
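The two overloads above are thin conveniences: as the bridge.cc change shows, they forward to the existing mapper-taking variants with the default device mapper, which only handles CPU memory. From Python (via the table.pxi changes further down), passing a pyarrow Schema object ends up in the shared_ptr<Schema> overload, while passing a raw ArrowSchema pointer ends up in the ArrowSchema* overload. A minimal sketch of the Schema-object path, assuming the cffi helper from pyarrow.cffi and the private methods added in this PR:

import pyarrow as pa
from pyarrow.cffi import ffi

# Caller-allocated C struct to export into (kept alive by cffi).
c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])
batch._export_to_c_device(ptr_array)  # schema is not exported here

# Import with a pyarrow Schema object; this goes through the default
# (CPU-only) device mapper under the hood.
imported = pa.RecordBatch._import_from_c_device(ptr_array, batch.schema)
assert imported.equals(batch)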
15 changes: 11 additions & 4 deletions python/pyarrow/includes/libarrow.pxd
@@ -2851,10 +2851,17 @@ cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:

CStatus ExportDeviceArray(const CArray&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*,
shared_ptr[CDataType])
CResult[shared_ptr[CArray]] ImportDeviceArray(ArrowDeviceArray*,
ArrowSchema*)
CResult[shared_ptr[CArray]] ImportDeviceArray(
ArrowDeviceArray*, shared_ptr[CDataType])
CResult[shared_ptr[CArray]] ImportDeviceArray(
ArrowDeviceArray*, ArrowSchema*)

CStatus ExportDeviceRecordBatch(const CRecordBatch&, shared_ptr[CSyncEvent],
ArrowDeviceArray* out, ArrowSchema*)
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, shared_ptr[CSchema])
CResult[shared_ptr[CRecordBatch]] ImportDeviceRecordBatch(
ArrowDeviceArray*, ArrowSchema*)

cdef extern from "arrow/util/byte_size.h" namespace "arrow::util" nogil:
CResult[int64_t] ReferencedBufferSize(const CArray& array_data)
62 changes: 62 additions & 0 deletions python/pyarrow/table.pxi
@@ -3084,6 +3084,68 @@ cdef class RecordBatch(_Tabular):

return pyarrow_wrap_batch(c_batch)

def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
"""
Export to a C ArrowDeviceArray struct, given its pointer.

If a C ArrowSchema struct pointer is also given, the record batch
schema is exported to it at the same time.

Parameters
----------
out_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
out_schema_ptr: int (optional)
The raw pointer to a C ArrowSchema struct.

Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
array memory will leak. This is a low-level function intended for
expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(out_ptr)
void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
allow_null=True)
with nogil:
check_status(ExportDeviceRecordBatch(
deref(self.sp_batch), <shared_ptr[CSyncEvent]>NULL,
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr)
)

@staticmethod
def _import_from_c_device(in_ptr, schema):
"""
Import RecordBatch from a C ArrowDeviceArray struct, given its pointer
and the imported schema.

Parameters
----------
in_ptr: int
The raw pointer to a C ArrowDeviceArray struct.
schema: Schema or int
Either a Schema object, or the raw pointer to a C ArrowSchema
struct.

This is a low-level function intended for expert users.
"""
cdef:
void* c_ptr = _as_c_pointer(in_ptr)
void* c_schema_ptr
shared_ptr[CRecordBatch] c_batch

c_schema = pyarrow_unwrap_schema(schema)
if c_schema == nullptr:
# Not a Schema object, perhaps a raw ArrowSchema pointer
c_schema_ptr = _as_c_pointer(schema, allow_null=True)
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
Comment on lines +3202 to +3203 (Member):
same comment as before, don't we want to allow using the pyarrow.cuda lib to provide a device mapper and allow handling CUDA-based GPU memory arrays?

else:
with nogil:
c_batch = GetResultValue(ImportDeviceRecordBatch(
<ArrowDeviceArray*> c_ptr, c_schema))
return pyarrow_wrap_batch(c_batch)


def _reconstruct_record_batch(columns, schema):
"""
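For reference, a compact roundtrip through the new methods using the other form, where the ArrowSchema struct is exported alongside the ArrowDeviceArray and re-imported from its raw pointer. This is only an illustrative sketch mirroring the test below (pyarrow.cffi helper, CPU data):

import pyarrow as pa
from pyarrow.cffi import ffi

c_schema = ffi.new("struct ArrowSchema*")
c_array = ffi.new("struct ArrowDeviceArray*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))
ptr_array = int(ffi.cast("uintptr_t", c_array))

batch = pa.record_batch([pa.array(["x", "y"])], names=["col"])
# Export the data and its schema through the C Device Interface.
batch._export_to_c_device(ptr_array, ptr_schema)

# Importing takes ownership and marks the C structs as released,
# so importing from the same pointers a second time raises.
roundtripped = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
assert roundtripped.to_pydict() == batch.to_pydict()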
61 changes: 61 additions & 0 deletions python/pyarrow/tests/test_cffi.py
@@ -652,3 +652,64 @@ def test_export_import_device_array():
# Now released
with assert_schema_released:
pa.Array._import_from_c(ptr_array, ptr_schema)


@needs_cffi
def test_export_import_device_batch():
c_schema = ffi.new("struct ArrowSchema*")
ptr_schema = int(ffi.cast("uintptr_t", c_schema))
c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

gc.collect() # Make sure no Arrow data dangles in a ref cycle
old_allocated = pa.total_allocated_bytes()

# Schema is known up front
batch = make_batch()
schema = batch.schema
py_value = batch.to_pydict()
batch._export_to_c_device(ptr_array)
assert pa.total_allocated_bytes() > old_allocated

# verify exported struct
assert c_array.device_type == 1 # ARROW_DEVICE_CPU 1
assert c_array.device_id == -1
assert c_array.array.length == 2
Comment (Member):
could we add a test that uses the arrow cuda lib and verify the device etc.?

Reply (Member Author, @jorisvandenbossche, Feb 13, 2024):
I was planning to add actual CUDA tests later in a separate PR (with proper roundtrip tests, not just export; roundtrip doesn't work yet for non-CPU right now).


# Delete and recreate C++ object from exported pointer
del batch
batch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
assert batch_new.to_pydict() == py_value
assert batch_new.schema == schema
assert pa.total_allocated_bytes() > old_allocated
del batch_new, schema
assert pa.total_allocated_bytes() == old_allocated
# Now released
with assert_array_released:
pa.RecordBatch._import_from_c_device(ptr_array, make_schema())

# Type is exported and imported at the same time
batch = make_batch()
py_value = batch.to_pydict()
batch._export_to_c_device(ptr_array, ptr_schema)
# Delete and recreate C++ objects from exported pointers
del batch
batch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
assert batch_new.to_pydict() == py_value
assert batch_new.schema == make_batch().schema
assert pa.total_allocated_bytes() > old_allocated
del batch_new
assert pa.total_allocated_bytes() == old_allocated
# Now released
with assert_schema_released:
pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)

# Not a struct type
pa.int32()._export_to_c(ptr_schema)
make_batch()._export_to_c_device(ptr_array)
with pytest.raises(ValueError,
match="ArrowSchema describes non-struct type"):
pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
# Now released
with assert_schema_released:
pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)