Skip to content

Commit

Permalink
[SR-4679] Adjust the order of branches for case 'TestArray_test_array…
Browse files Browse the repository at this point in the history
…_column_with_count_distinct' (StarRocks#300)
  • Loading branch information
liuyehcf authored Sep 18, 2021
1 parent 499e797 commit e65ec37
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ fe/ut_ports
fe/*/target
dependency-reduced-pom.xml
tags
.tags


#ignore eclipse project file & idea project file
Expand Down
39 changes: 19 additions & 20 deletions be/src/exec/pipeline/exchange/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,25 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C
if (num_rows == 0) {
return Status::OK();
}

if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
// We use sender request to avoid serialize chunk many times.
// 1. create a new chunk PB to serialize
ChunkPB* pchunk = _chunk_request.add_chunks();
// 2. serialize input chunk to pchunk
RETURN_IF_ERROR(serialize_chunk(chunk.get(), pchunk, &_is_first_chunk, _channels.size()));
_current_request_bytes += pchunk->data().size();
// 3. if request bytes exceede the threshold, send current request
if (_current_request_bytes > _request_bytes_threshold) {
butil::IOBuf attachment;
// construct_brpc_attachment(&_chunk_request, &attachment);
for (auto channel : _channels) {
RETURN_IF_ERROR(channel->send_chunk_request(&_chunk_request, attachment));
}
_current_request_bytes = 0;
_chunk_request.clear_chunks();
}
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
// hash-partition batch's rows across channels
int num_channels = _channels.size();
{
Expand Down Expand Up @@ -362,23 +378,6 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C
}
RETURN_IF_ERROR(_channels[i]->add_rows_selective(chunk.get(), _row_indexes.data(), from, size));
}
} else if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
// We use sender request to avoid serialize chunk many times.
// 1. create a new chunk PB to serialize
ChunkPB* pchunk = _chunk_request.add_chunks();
// 2. serialize input chunk to pchunk
RETURN_IF_ERROR(serialize_chunk(chunk.get(), pchunk, &_is_first_chunk, _channels.size()));
_current_request_bytes += pchunk->data().size();
// 3. if request bytes exceede the threshold, send current request
if (_current_request_bytes > _request_bytes_threshold) {
butil::IOBuf attachment;
// construct_brpc_attachment(&_chunk_request, &attachment);
for (auto channel : _channels) {
RETURN_IF_ERROR(channel->send_chunk_request(&_chunk_request, attachment));
}
_current_request_bytes = 0;
_chunk_request.clear_chunks();
}
}
return Status::OK();
}
Expand Down

0 comments on commit e65ec37

Please sign in to comment.