Skip to content

Commit

Permalink
Implement readTable, hasnext, and done methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
kevingurney committed Dec 6, 2024
1 parent 8b5574f commit fab1063
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 5 deletions.
1 change: 1 addition & 0 deletions matlab/src/cpp/arrow/matlab/error/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,5 +249,6 @@ static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED =
"arrow:io:ipc:FailedToOpenRecordBatchReader";
static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = "arrow:io:ipc:InvalidIndex";
static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed";
static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed";

} // namespace arrow::matlab::error
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "arrow/io/file.h"
#include "arrow/matlab/error/error.h"
#include "arrow/matlab/tabular/proxy/record_batch.h"
#include "arrow/matlab/tabular/proxy/table.h"
#include "arrow/matlab/tabular/proxy/schema.h"
#include "arrow/util/utf8.h"

Expand All @@ -32,7 +33,7 @@ RecordBatchStreamReader::RecordBatchStreamReader(
REGISTER_METHOD(RecordBatchStreamReader, getSchema);
REGISTER_METHOD(RecordBatchStreamReader, readRecordBatch);
REGISTER_METHOD(RecordBatchStreamReader, hasNextRecordBatch);
// REGISTER_METHOD(RecordBatchStreamReader, readTable);
REGISTER_METHOD(RecordBatchStreamReader, readTable);
}

libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
Expand Down Expand Up @@ -73,6 +74,20 @@ void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& con
context.outputs[0] = schema_proxy_id_mda;
}

void RecordBatchStreamReader::readTable(
libmexclass::proxy::method::Context& context) {
namespace mda = ::matlab::data;
using TableProxy = arrow::matlab::tabular::proxy::Table;

MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(auto table, reader->ToTable(), context, error::IPC_TABLE_READ_FAILED);
auto table_proxy = std::make_shared<TableProxy>(table);
const auto table_proxy_id = libmexclass::proxy::ProxyManager::manageProxy(table_proxy);

mda::ArrayFactory factory;
const auto table_proxy_id_mda = factory.createScalar(table_proxy_id);
context.outputs[0] = table_proxy_id_mda;
}

void RecordBatchStreamReader::readRecordBatch(
libmexclass::proxy::method::Context& context) {
namespace mda = ::matlab::data;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class RecordBatchStreamReader : public libmexclass::proxy::Proxy {
void getSchema(libmexclass::proxy::method::Context& context);
void readRecordBatch(libmexclass::proxy::method::Context& context);
void hasNextRecordBatch(libmexclass::proxy::method::Context& context);
// void readTable(libmexclass::proxy::method::Context& context);
void readTable(libmexclass::proxy::method::Context& context);
};

} // namespace arrow::matlab::io::ipc::proxy
30 changes: 27 additions & 3 deletions matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,39 @@
schema = arrow.tabular.Schema(proxy);
end

function tf = hasNextRecordBatch(obj)
function tf = hasnext(obj)
tf = obj.Proxy.hasNextRecordBatch();
end

function rb = readRecordBatch(obj)
function tf = done(obj)
tf = ~obj.Proxy.hasNextRecordBatch();
end

function arrowRecordBatch = read(obj)
% NOTE: This function is a "convenience alias" for the readRecordBatch
% method, which has a longer name. This is the exact same implementation
% as readRecordBatch. Since this method might be called in a tight loop,
% it should be slightly more efficient to call the C++ code directly,
% rather than invoking obj.readRecordBatch indirectly. We are intentionally
% trading off code duplication for performance here.
proxyID = obj.Proxy.readRecordBatch();
proxyName = "arrow.tabular.proxy.RecordBatch";
proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName);
arrowRecordBatch = arrow.tabular.RecordBatch(proxy);
end

function arrowRecordBatch = readRecordBatch(obj)
proxyID = obj.Proxy.readRecordBatch();
proxyName = "arrow.tabular.proxy.RecordBatch";
proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName);
rb = arrow.tabular.RecordBatch(proxy);
arrowRecordBatch = arrow.tabular.RecordBatch(proxy);
end

function arrowTable = readTable(obj)
proxyID = obj.Proxy.readTable();
proxyName = "arrow.tabular.proxy.Table";
proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName);
arrowTable = arrow.tabular.Table(proxy);
end

end
Expand Down

0 comments on commit fab1063

Please sign in to comment.