GH-39979: [Python] Low-level bindings for exporting/importing the C Device Interface #39980
Changes from 3 commits
```diff
@@ -587,7 +587,8 @@ struct ArrayExporter {
     export_.buffers_.resize(n_buffers);
     std::transform(buffers_begin, data->buffers.end(), export_.buffers_.begin(),
                    [](const std::shared_ptr<Buffer>& buffer) -> const void* {
-                     return buffer ? buffer->data() : nullptr;
+                     return buffer ? reinterpret_cast<const void*>(buffer->address())
+                                   : nullptr;
                    });

     if (need_variadic_buffer_sizes) {
```
```diff
@@ -1977,6 +1978,24 @@ Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
   return ImportDeviceArray(array, *maybe_type, mapper);
 }

+Result<std::shared_ptr<MemoryManager>> DefaultDeviceMapper(ArrowDeviceType device_type,
+                                                           int64_t device_id) {
+  if (device_type != ARROW_DEVICE_CPU) {
+    return Status::NotImplemented("Only importing data on CPU is supported");
+  }
+  return default_cpu_memory_manager();
+}
+
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
+                                                 std::shared_ptr<DataType> type) {
+  return ImportDeviceArray(array, type, DefaultDeviceMapper);
+}
```
Do we want to provide such an API that uses a default DeviceMapper? With the current APIs here, I assume the idea is that it's the responsibility of the user (i.e. the library or application using Arrow C++ to consume data through the C Device interface) to provide the device mapping as they see fit. (I suppose when we add a default in C++, I could also give the existing signatures a default parameter value for

Yes, that sounds reasonable to me. I think that in many (most?) cases, users will want to use whatever device mapper is registered for the given device type.

Also:

Yes, that would reduce the proliferation of different functions. You could simply have something like

The difficulty with providing a default device mapper here is that it created a circular dependency due to the

That might be a reason to keep this default on the pyarrow side? (We can implement the mapper function in C++, but only provide it as the default argument on the Python side.) In Python, we can more easily dynamically check if the pyarrow.cuda module is available, and if so provide a different default mapper (that includes GPU devices).
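The pyarrow-side default mapper discussed above could be sketched roughly as follows. This is a hypothetical illustration, not code from the PR: `default_device_mapper` and the string return values are stand-ins for real `MemoryManager` objects, and the device-type constants come from the Arrow C Device interface (`ARROW_DEVICE_CPU` is 1, as the tests below verify).

```python
import importlib.util

# Device type constants from the Arrow C Device interface.
ARROW_DEVICE_CPU = 1
ARROW_DEVICE_CUDA = 2


def default_device_mapper(device_type, device_id):
    """Hypothetical Python-side default mapper: CPU always works, CUDA only
    if the pyarrow.cuda module is importable (checked dynamically)."""
    if device_type == ARROW_DEVICE_CPU:
        # Stand-in for default_cpu_memory_manager() in C++.
        return "cpu-memory-manager"
    if device_type == ARROW_DEVICE_CUDA:
        try:
            has_cuda = importlib.util.find_spec("pyarrow.cuda") is not None
        except ModuleNotFoundError:
            has_cuda = False
        if has_cuda:
            # Stand-in for a CUDA memory manager for this device.
            return f"cuda-memory-manager:{device_id}"
    raise NotImplementedError(
        f"No default device mapper for device type {device_type}")
```

Checking module availability at call time (rather than import time) is what lets an optional `pyarrow.cuda` install transparently extend the default.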
```diff
+Result<std::shared_ptr<Array>> ImportDeviceArray(struct ArrowDeviceArray* array,
+                                                 struct ArrowSchema* type) {
+  return ImportDeviceArray(array, type, DefaultDeviceMapper);
+}
+
 Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
     struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema,
     const DeviceMemoryMapper& mapper) {
```
```diff
@@ -1997,6 +2016,16 @@ Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
   return ImportDeviceRecordBatch(array, *maybe_schema, mapper);
 }

+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, std::shared_ptr<Schema> schema) {
+  return ImportDeviceRecordBatch(array, schema, DefaultDeviceMapper);
+}
+
+Result<std::shared_ptr<RecordBatch>> ImportDeviceRecordBatch(
+    struct ArrowDeviceArray* array, struct ArrowSchema* schema) {
+  return ImportDeviceRecordBatch(array, schema, DefaultDeviceMapper);
+}
+
 //////////////////////////////////////////////////////////////////////////
 // C stream export
```
```diff
@@ -1778,6 +1778,70 @@ cdef class Array(_PandasConvertible):

         return pyarrow_wrap_array(array)

+    def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
```
I would propose to leave this as is, to keep it consistent with the other
```diff
+        """
+        Export to a C ArrowDeviceArray struct, given its pointer.
+
+        If a C ArrowSchema struct pointer is also given, the array type
+        is exported to it at the same time.
+
+        Parameters
+        ----------
+        out_ptr: int
+            The raw pointer to a C ArrowDeviceArray struct.
+        out_schema_ptr: int (optional)
+            The raw pointer to a C ArrowSchema struct.
+
+        Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
+        array memory will leak. This is a low-level function intended for
+        expert users.
```
Comment on lines +1795 to +1797:

Should this explicitly mention the release callback on the struct?

I copied this from the existing docstrings. We could mention the release callback explicitly, but essentially then you are a "consumer". This function returns an integer; you can't call the release callback on the return value as such. Only when you actually interpret it as an ArrowArray struct can you do that (and at that point, you are a consumer who should be aware of those details?)

I agree with @jorisvandenbossche that the release callback need not be mentioned here. This is all in the spec.
```diff
+        """
+        cdef:
+            void* c_ptr = _as_c_pointer(out_ptr)
+            void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
+                                               allow_null=True)
+        with nogil:
+            check_status(ExportDeviceArray(
+                deref(self.sp_array), <shared_ptr[CSyncEvent]>NULL,
+                <ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
```
|
||
@staticmethod | ||
def _import_from_c_device(in_ptr, type): | ||
""" | ||
Import Array from a C ArrowDeviceArray struct, given its pointer | ||
and the imported array type. | ||
|
||
Parameters | ||
---------- | ||
in_ptr: int | ||
The raw pointer to a C ArrowDeviceArray struct. | ||
type: DataType or int | ||
Either a DataType object, or the raw pointer to a C ArrowSchema | ||
struct. | ||
|
||
This is a low-level function intended for expert users. | ||
""" | ||
cdef: | ||
void* c_ptr = _as_c_pointer(in_ptr) | ||
void* c_type_ptr | ||
shared_ptr[CArray] c_array | ||
|
||
c_type = pyarrow_unwrap_data_type(type) | ||
if c_type == nullptr: | ||
# Not a DataType object, perhaps a raw ArrowSchema pointer | ||
c_type_ptr = _as_c_pointer(type) | ||
with nogil: | ||
c_array = GetResultValue( | ||
ImportDeviceArray(<ArrowDeviceArray*> c_ptr, | ||
<ArrowSchema*> c_type_ptr) | ||
) | ||
Comment on lines +1834 to +1837:

The default mapper only allows CPU arrays, but pyarrow does have a cuda lib; shouldn't we allow and enable importing at least CUDA arrays too?

Certainly, but as mentioned earlier (#39980 (comment)), I was planning to tackle CUDA in a follow-up, and this PR indeed only properly supports and tests CPU.
```diff
+        else:
+            with nogil:
+                c_array = GetResultValue(
+                    ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
+                )
+        return pyarrow_wrap_array(c_array)
+
     def __dlpack__(self, stream=None):
         """Export a primitive array as a DLPack capsule.
```
```diff
@@ -64,6 +64,16 @@
     // Opaque producer-specific data
     void* private_data;
 };

+typedef int32_t ArrowDeviceType;
```
Should we expose the constants in pyarrow somehow?

If we don't use them ourselves, I don't know if that is needed (although it might still be useful for other users of
```diff
+
+struct ArrowDeviceArray {
+    struct ArrowArray array;
+    int64_t device_id;
+    ArrowDeviceType device_type;
+    void* sync_event;
+    int64_t reserved[3];
+};
 """

 # TODO use out-of-line mode for faster import and avoid C parsing
```
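For illustration, the struct layout in the cffi source above can be mirrored with the stdlib `ctypes` module. This is a sketch for inspecting field layout, not part of pyarrow; the `ArrowArray` fields are taken from the standard Arrow C data interface, and `c_void_p` is used where the exact pointee type doesn't matter for layout.

```python
import ctypes

class ArrowArray(ctypes.Structure):
    # The standard Arrow C data interface struct (pointer members collapsed
    # to void* where the pointee type doesn't affect layout).
    _fields_ = [
        ("length", ctypes.c_int64),
        ("null_count", ctypes.c_int64),
        ("offset", ctypes.c_int64),
        ("n_buffers", ctypes.c_int64),
        ("n_children", ctypes.c_int64),
        ("buffers", ctypes.POINTER(ctypes.c_void_p)),
        ("children", ctypes.c_void_p),
        ("dictionary", ctypes.c_void_p),
        ("release", ctypes.c_void_p),
        ("private_data", ctypes.c_void_p),
    ]

class ArrowDeviceArray(ctypes.Structure):
    # Mirrors the struct in the cffi source above: the embedded ArrowArray,
    # then device identification, a sync event pointer, and reserved padding.
    _fields_ = [
        ("array", ArrowArray),
        ("device_id", ctypes.c_int64),
        ("device_type", ctypes.c_int32),  # ArrowDeviceType
        ("sync_event", ctypes.c_void_p),
        ("reserved", ctypes.c_int64 * 3),
    ]
```

Since `device_id` immediately follows the embedded `ArrowArray`, its offset equals `ctypes.sizeof(ArrowArray)`, which is a quick sanity check that the mirror matches the C layout.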
```diff
@@ -3084,6 +3084,68 @@ cdef class RecordBatch(_Tabular):

         return pyarrow_wrap_batch(c_batch)

+    def _export_to_c_device(self, out_ptr, out_schema_ptr=0):
+        """
+        Export to a C ArrowDeviceArray struct, given its pointer.
+
+        If a C ArrowSchema struct pointer is also given, the record batch
+        schema is exported to it at the same time.
+
+        Parameters
+        ----------
+        out_ptr: int
+            The raw pointer to a C ArrowDeviceArray struct.
+        out_schema_ptr: int (optional)
+            The raw pointer to a C ArrowSchema struct.
+
+        Be careful: if you don't pass the ArrowDeviceArray struct to a consumer,
+        array memory will leak. This is a low-level function intended for
+        expert users.
+        """
+        cdef:
+            void* c_ptr = _as_c_pointer(out_ptr)
+            void* c_schema_ptr = _as_c_pointer(out_schema_ptr,
+                                               allow_null=True)
+        with nogil:
+            check_status(ExportDeviceRecordBatch(
+                deref(self.sp_batch), <shared_ptr[CSyncEvent]>NULL,
+                <ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr)
+            )
```
```diff
+    @staticmethod
+    def _import_from_c_device(in_ptr, schema):
+        """
+        Import RecordBatch from a C ArrowDeviceArray struct, given its pointer
+        and the imported schema.
+
+        Parameters
+        ----------
+        in_ptr: int
+            The raw pointer to a C ArrowDeviceArray struct.
+        schema: Schema or int
+            Either a Schema object, or the raw pointer to a C ArrowSchema
+            struct.
+
+        This is a low-level function intended for expert users.
+        """
```
```diff
+        cdef:
+            void* c_ptr = _as_c_pointer(in_ptr)
+            void* c_schema_ptr
+            shared_ptr[CRecordBatch] c_batch
+
+        c_schema = pyarrow_unwrap_schema(schema)
+        if c_schema == nullptr:
+            # Not a Schema object, perhaps a raw ArrowSchema pointer
+            c_schema_ptr = _as_c_pointer(schema, allow_null=True)
+            with nogil:
+                c_batch = GetResultValue(ImportDeviceRecordBatch(
+                    <ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
```
Comment on lines +3202 to +3203:

Same comment as before: don't we want to allow using the pyarrow.cuda lib to provide a device mapper and allow handling cuda-based gpu memory arrays?
```diff
+        else:
+            with nogil:
+                c_batch = GetResultValue(ImportDeviceRecordBatch(
+                    <ArrowDeviceArray*> c_ptr, c_schema))
+        return pyarrow_wrap_batch(c_batch)
+

 def _reconstruct_record_batch(columns, schema):
     """
```
```diff
@@ -601,3 +601,115 @@ def test_roundtrip_batch_reader_capsule():
     assert imported_reader.read_next_batch().equals(batch)
     with pytest.raises(StopIteration):
         imported_reader.read_next_batch()
+
+
+@needs_cffi
+def test_export_import_device_array():
```
We're copy-pasting a lot of code in those tests; can we try to reduce duplication by factoring common functionality out?

Did an attempt to refactor this. In any case it's adding less code now ;)
```diff
+    c_schema = ffi.new("struct ArrowSchema*")
+    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+
+    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
+    old_allocated = pa.total_allocated_bytes()
+
+    # Type is known up front
+    typ = pa.list_(pa.int32())
+    arr = pa.array([[1], [2, 42]], type=typ)
+    py_value = arr.to_pylist()
+    arr._export_to_c_device(ptr_array)
+    assert pa.total_allocated_bytes() > old_allocated
+
+    # verify exported struct
+    assert c_array.device_type == 1  # ARROW_DEVICE_CPU 1
+    assert c_array.device_id == -1
+    assert c_array.array.length == 2
+
+    # Delete and recreate C++ object from exported pointer
+    del arr
+    arr_new = pa.Array._import_from_c_device(ptr_array, typ)
+    assert arr_new.to_pylist() == py_value
+    assert arr_new.type == pa.list_(pa.int32())
+    assert pa.total_allocated_bytes() > old_allocated
+    del arr_new, typ
+    assert pa.total_allocated_bytes() == old_allocated
+    # Now released
+    with assert_array_released:
+        pa.Array._import_from_c_device(ptr_array, pa.list_(pa.int32()))
+
+    # Type is exported and imported at the same time
+    arr = pa.array([[1], [2, 42]], type=pa.list_(pa.int32()))
+    py_value = arr.to_pylist()
+    arr._export_to_c_device(ptr_array, ptr_schema)
+    # Delete and recreate C++ objects from exported pointers
+    del arr
+    arr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
+    assert arr_new.to_pylist() == py_value
+    assert arr_new.type == pa.list_(pa.int32())
+    assert pa.total_allocated_bytes() > old_allocated
+    del arr_new
+    assert pa.total_allocated_bytes() == old_allocated
+    # Now released
+    with assert_schema_released:
+        pa.Array._import_from_c_device(ptr_array, ptr_schema)
```
```diff
+
+
+@needs_cffi
+def test_export_import_device_batch():
+    c_schema = ffi.new("struct ArrowSchema*")
+    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+
+    gc.collect()  # Make sure no Arrow data dangles in a ref cycle
+    old_allocated = pa.total_allocated_bytes()
+
+    # Schema is known up front
+    batch = make_batch()
+    schema = batch.schema
+    py_value = batch.to_pydict()
+    batch._export_to_c_device(ptr_array)
+    assert pa.total_allocated_bytes() > old_allocated
+
+    # verify exported struct
+    assert c_array.device_type == 1  # ARROW_DEVICE_CPU 1
+    assert c_array.device_id == -1
+    assert c_array.array.length == 2
```
Could we add a test that uses the arrow cuda lib and verify the device etc.?

I was planning to add actual cuda tests later in a separate PR (with proper roundtrip tests, not just export, but roundtrip doesn't work yet for non-cpu right now).
```diff
+
+    # Delete and recreate C++ object from exported pointer
+    del batch
+    batch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
+    assert batch_new.to_pydict() == py_value
+    assert batch_new.schema == schema
+    assert pa.total_allocated_bytes() > old_allocated
+    del batch_new, schema
+    assert pa.total_allocated_bytes() == old_allocated
+    # Now released
+    with assert_array_released:
+        pa.RecordBatch._import_from_c_device(ptr_array, make_schema())
+
+    # Schema is exported and imported at the same time
+    batch = make_batch()
+    py_value = batch.to_pydict()
+    batch._export_to_c_device(ptr_array, ptr_schema)
+    # Delete and recreate C++ objects from exported pointers
+    del batch
+    batch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
+    assert batch_new.to_pydict() == py_value
+    assert batch_new.schema == make_batch().schema
+    assert pa.total_allocated_bytes() > old_allocated
+    del batch_new
+    assert pa.total_allocated_bytes() == old_allocated
+    # Now released
+    with assert_schema_released:
+        pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
+
+    # Not a struct type
+    pa.int32()._export_to_c(ptr_schema)
+    make_batch()._export_to_c_device(ptr_array)
+    with pytest.raises(ValueError,
+                       match="ArrowSchema describes non-struct type"):
+        pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
+    # Now released
+    with assert_schema_released:
+        pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
```
This could later be expanded to also allow CUDA device for CUDA enabled builds.

Yes, there probably should be some kind of registry so that "default" device mappers can be added separately.

@pitrou you mention a "registry", but AFAIK that's what we ideally would have (so external device implementations could register themselves) and that doesn't exist yet, right? In that case, is the function above an OK short-term default?

Yes and yes!
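The registry idea from this thread could look roughly like the following pure-Python sketch. None of these names exist in Arrow today, and the returned strings stand in for real MemoryManager objects; it only illustrates the dispatch pattern being discussed.

```python
# Hypothetical device-mapper registry (not an existing Arrow API): device
# implementations register a factory for their device type, and the default
# mapper dispatches on the device type of incoming data.
ARROW_DEVICE_CPU = 1  # constant from the Arrow C Device interface

_device_mappers = {}


def register_device_mapper(device_type, factory):
    """Register a memory-manager factory for a device type."""
    _device_mappers[device_type] = factory


def default_device_mapper(device_type, device_id):
    """Look up the registered factory, in the spirit of the C++
    DefaultDeviceMapper added in this PR."""
    try:
        factory = _device_mappers[device_type]
    except KeyError:
        raise NotImplementedError(
            f"No device mapper registered for device type {device_type}")
    return factory(device_id)


# The CPU mapper would be registered out of the box; an optional CUDA
# package could call register_device_mapper for its own device type.
register_device_mapper(ARROW_DEVICE_CPU, lambda device_id: "cpu-memory-manager")
```

With such a registry, the short-term CPU-only default above remains valid: it is simply the state of the registry before any external device implementation registers itself.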