diff --git a/.gitignore b/.gitignore index 3033e1e0a98ec..629b545ab9e16 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ fe/ut_ports fe/*/target dependency-reduced-pom.xml tags +.tags #ignore eclipse project file & idea project file diff --git a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp index a9c60667a48a3..ee8b221a6b1e9 100644 --- a/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp +++ b/be/src/exec/pipeline/exchange/exchange_sink_operator.cpp @@ -303,9 +303,25 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C if (num_rows == 0) { return Status::OK(); } - - if (_part_type == TPartitionType::HASH_PARTITIONED || - _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { + if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) { + // We use sender request to avoid serialize chunk many times. + // 1. create a new chunk PB to serialize + ChunkPB* pchunk = _chunk_request.add_chunks(); + // 2. serialize input chunk to pchunk + RETURN_IF_ERROR(serialize_chunk(chunk.get(), pchunk, &_is_first_chunk, _channels.size())); + _current_request_bytes += pchunk->data().size(); + // 3. if request bytes exceede the threshold, send current request + if (_current_request_bytes > _request_bytes_threshold) { + butil::IOBuf attachment; + // construct_brpc_attachment(&_chunk_request, &attachment); + for (auto channel : _channels) { + RETURN_IF_ERROR(channel->send_chunk_request(&_chunk_request, attachment)); + } + _current_request_bytes = 0; + _chunk_request.clear_chunks(); + } + } else if (_part_type == TPartitionType::HASH_PARTITIONED || + _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) { // hash-partition batch's rows across channels int num_channels = _channels.size(); { @@ -362,23 +378,6 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const vectorized::C } RETURN_IF_ERROR(_channels[i]->add_rows_selective(chunk.get(), _row_indexes.data(), from, size)); } - } else if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) { - // We use sender request to avoid serialize chunk many times. - // 1. create a new chunk PB to serialize - ChunkPB* pchunk = _chunk_request.add_chunks(); - // 2. serialize input chunk to pchunk - RETURN_IF_ERROR(serialize_chunk(chunk.get(), pchunk, &_is_first_chunk, _channels.size())); - _current_request_bytes += pchunk->data().size(); - // 3. if request bytes exceede the threshold, send current request - if (_current_request_bytes > _request_bytes_threshold) { - butil::IOBuf attachment; - // construct_brpc_attachment(&_chunk_request, &attachment); - for (auto channel : _channels) { - RETURN_IF_ERROR(channel->send_chunk_request(&_chunk_request, attachment)); - } - _current_request_bytes = 0; - _chunk_request.clear_chunks(); - } } return Status::OK(); }