refactor: Remove state reporter for local mode #8477

Merged · 1 commit · Mar 10, 2023
3 changes: 1 addition & 2 deletions src/batch/src/rpc/service/task_service.rs
@@ -130,8 +130,7 @@ impl TaskService for BatchServiceImpl {
         let task = BatchTaskExecution::new(&task_id, plan, context, epoch, self.mgr.runtime())?;
         let task = Arc::new(task);
         let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
-        let state_reporter = StateReporter::new_with_local_sender(tx.clone());
-        if let Err(e) = task.clone().async_execute(state_reporter).await {
+        if let Err(e) = task.clone().async_execute(None).await {
             error!(
                 "failed to build executors and trigger execution of Task {:?}: {}",
                 task_id, e
46 changes: 17 additions & 29 deletions src/batch/src/task/task_execution.rs
@@ -32,13 +32,12 @@ use task_stats_alloc::{TaskLocalBytesAllocated, BYTES_ALLOCATED};
 use tokio::runtime::Runtime;
 use tokio::sync::oneshot::{Receiver, Sender};
 use tokio_metrics::TaskMonitor;
-use tonic::Status;
 
 use crate::error::BatchError::{Aborted, SenderError};
 use crate::error::{to_rw_error, BatchError, Result as BatchResult};
 use crate::executor::{BoxedExecutor, ExecutorBuilder};
 use crate::rpc::service::exchange::ExchangeWriter;
-use crate::rpc::service::task_service::{GetDataResponseResult, TaskInfoResponseResult};
+use crate::rpc::service::task_service::TaskInfoResponseResult;
 use crate::task::channel::{create_output_channel, ChanReceiverImpl, ChanSenderImpl};
 use crate::task::BatchTaskContext;
 
@@ -96,33 +95,18 @@ where
 /// effect. Local sender only report Failed update, Distributed sender will also report
 /// Finished/Pending/Starting/Aborted etc.
 pub enum StateReporter {
-    Local(tokio::sync::mpsc::Sender<GetDataResponseResult>),
     Distributed(tokio::sync::mpsc::Sender<TaskInfoResponseResult>),
     Mock(),
 }
 
 impl StateReporter {
     pub async fn send(&mut self, val: TaskInfoResponse) -> BatchResult<()> {
         match self {
-            Self::Local(s) => {
-                // A hack here to convert task failure message to data error
-                match val.task_status() {
-                    TaskStatus::Failed => s
-                        .send(Err(Status::internal(val.error_message)))
-                        .await
-                        .map_err(|_| SenderError),
-                    _ => Ok(()),
-                }
-            }
             Self::Distributed(s) => s.send(Ok(val)).await.map_err(|_| SenderError),
             Self::Mock() => Ok(()),
         }
     }
 
-    pub fn new_with_local_sender(s: tokio::sync::mpsc::Sender<GetDataResponseResult>) -> Self {
-        Self::Local(s)
-    }
-
     pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender<TaskInfoResponseResult>) -> Self {
         Self::Distributed(s)
     }
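
To see the end state without reading around the removed lines: after this change `StateReporter` keeps only the `Distributed` and `Mock` variants, and `send` loses the "convert task failure to data error" hack. The following is reconstructed from the unchanged lines above, so read it as a sketch of the resulting type rather than the exact file contents:

```rust
// Sketch of StateReporter after this PR (reconstructed from the diff context).
pub enum StateReporter {
    Distributed(tokio::sync::mpsc::Sender<TaskInfoResponseResult>),
    Mock(),
}

impl StateReporter {
    pub async fn send(&mut self, val: TaskInfoResponse) -> BatchResult<()> {
        match self {
            Self::Distributed(s) => s.send(Ok(val)).await.map_err(|_| SenderError),
            Self::Mock() => Ok(()),
        }
    }

    pub fn new_with_dist_sender(s: tokio::sync::mpsc::Sender<TaskInfoResponseResult>) -> Self {
        Self::Distributed(s)
    }
}
```

With no `Local` variant left, a local-mode task simply has no reporter, which is why the remaining hunks switch the parameter type to `Option<StateReporter>`.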
@@ -355,7 +339,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
     /// hash partitioned across multiple channels.
     /// To obtain the result, one must pick one of the channels to consume via [`TaskOutputId`]. As
     /// such, parallel consumers are able to consume the result independently.
-    pub async fn async_execute(self: Arc<Self>, state_tx: StateReporter) -> Result<()> {
+    pub async fn async_execute(self: Arc<Self>, state_tx: Option<StateReporter>) -> Result<()> {
         let mut state_tx = state_tx;
         trace!(
             "Prepare executing plan [{:?}]: {}",
@@ -382,7 +366,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
         // After we init the output receivers, it's must safe to schedule next stage -- able to send
         // TaskStatus::Running here.
         // Init the state receivers. Swap out later.
-        self.change_state_notify(TaskStatus::Running, &mut state_tx, None)
+        self.change_state_notify(TaskStatus::Running, state_tx.as_mut(), None)
             .await?;
 
         // Clone `self` to make compiler happy because of the move block.
@@ -398,7 +382,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
         let task = |task_id: TaskId| async move {
             // We should only pass a reference of sender to execution because we should only
             // close it after task error has been set.
-            t_1.run(exec, sender, shutdown_rx, &mut state_tx)
+            t_1.run(exec, sender, shutdown_rx, state_tx.as_mut())
                 .in_span({
                     let mut span = Span::enter_with_local_parent("batch_execute");
                     span.add_property(|| ("task_id", task_id.task_id.to_string()));
@@ -470,18 +454,22 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
     pub async fn change_state_notify(
         &self,
         task_status: TaskStatus,
-        state_tx: &mut StateReporter,
+        state_tx: Option<&mut StateReporter>,
         err_str: Option<String>,
     ) -> BatchResult<()> {
         self.change_state(task_status);
         // Notify frontend the task status.
-        state_tx
-            .send(TaskInfoResponse {
-                task_id: Some(self.task_id.to_prost()),
-                task_status: task_status.into(),
-                error_message: err_str.unwrap_or("".to_string()),
-            })
-            .await
+        if let Some(reporter) = state_tx {
+            reporter
+                .send(TaskInfoResponse {
+                    task_id: Some(self.task_id.to_prost()),
+                    task_status: task_status.into(),
+                    error_message: err_str.unwrap_or("".to_string()),
+                })
+                .await
+        } else {
+            Ok(())
+        }
     }
 
     pub fn change_state(&self, task_status: TaskStatus) {
@@ -493,7 +481,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
         root: BoxedExecutor,
         mut sender: ChanSenderImpl,
         mut shutdown_rx: Receiver<String>,
-        state_tx: &mut StateReporter,
+        state_tx: Option<&mut StateReporter>,
     ) {
         let mut data_chunk_stream = root.execute();
         let mut state;
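
A note on the repeated `state_tx.as_mut()` in the hunks above: `Option::as_mut` hands out a fresh `Option<&mut StateReporter>` on each call without consuming the owned `Option`, so one owned value can back every state-change notification, and `change_state_notify` treats `None` as a no-op. A small self-contained sketch of that pattern with a stand-in reporter type (the names are illustrative, not the crate's):

```rust
/// Stand-in for `StateReporter`; only the shape of the pattern matters here.
struct Reporter {
    sent: Vec<String>,
}

impl Reporter {
    fn send(&mut self, msg: &str) {
        self.sent.push(msg.to_string());
    }
}

/// Mirrors `change_state_notify`: with no reporter, notifying is simply a no-op.
fn notify(reporter: Option<&mut Reporter>, msg: &str) {
    if let Some(r) = reporter {
        r.send(msg);
    }
}

fn main() {
    // Distributed-style caller: owns `Some(reporter)` and re-borrows it per call.
    let mut state_tx = Some(Reporter { sent: Vec::new() });
    notify(state_tx.as_mut(), "Running");
    notify(state_tx.as_mut(), "Finished");
    assert_eq!(state_tx.unwrap().sent, vec!["Running", "Finished"]);

    // Local-style caller after this PR: no reporter at all.
    let mut no_reporter: Option<Reporter> = None;
    notify(no_reporter.as_mut(), "Running"); // silently does nothing
}
```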
2 changes: 1 addition & 1 deletion src/batch/src/task/task_manager.rs
@@ -108,7 +108,7 @@ impl BatchManager {
             ))
             .into())
         };
-        task.async_execute(state_reporter).await?;
+        task.async_execute(Some(state_reporter)).await?;
         ret
 
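Taken together, the two call sites define the new contract of `async_execute`: the local path in task_service.rs passes `None`, while the distributed path in task_manager.rs wraps its reporter in `Some`. A condensed, self-contained sketch of that contract with stand-in types (how task_manager.rs constructs its `StateReporter` is not shown in this diff, so the types here are mocks; assumes a `tokio` dependency with `rt` and `macros` features):

```rust
/// Stand-ins, only to make the post-PR calling convention concrete.
struct StateReporter;

struct Task;

impl Task {
    /// Mirrors the new signature of `BatchTaskExecution::async_execute`.
    async fn async_execute(&self, state_tx: Option<StateReporter>) -> Result<(), String> {
        if state_tx.is_some() {
            // Distributed mode: report Running / Finished / Failed over the reporter.
        } else {
            // Local mode after this PR: no task-status reporting at all.
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let task = Task;

    // Local execution path (cf. the task_service.rs hunk): pass `None`.
    task.async_execute(None).await?;

    // Distributed execution path (cf. the task_manager.rs hunk): wrap the reporter in `Some`.
    task.async_execute(Some(StateReporter)).await?;
    Ok(())
}
```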