Skip to content

Commit 47a0806

Browse files
committed
fix stats sender
1 parent 509fa2d commit 47a0806

File tree

1 file changed

+14
-12
lines changed

1 file changed

+14
-12
lines changed

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

+14-12
Original file line numberDiff line numberDiff line change
@@ -876,20 +876,20 @@ impl QueryCoordinator {
876876
let query_ctx = info_mut.query_ctx.clone();
877877
let request_server_exchanges = std::mem::take(&mut self.statistics_exchanges);
878878

879-
if request_server_exchanges.len() != 1 {
880-
return Err(ErrorCode::Internal(
881-
"Request server must less than 1 if is not request server.",
882-
));
879+
if request_server_exchanges.len() > 1 {
880+
return Err(ErrorCode::Internal(format!(
881+
"Request server exchanges must less than or equal to 1, but it's {}",
882+
request_server_exchanges.len()
883+
)));
883884
}
884885

885886
let ctx = query_ctx.clone();
886-
let (_, request_server_exchange) = request_server_exchanges.into_iter().next().unwrap();
887-
let mut statistics_sender = StatisticsSender::spawn(
888-
&query_id,
889-
ctx,
890-
request_server_exchange,
891-
executor.get_inner(),
892-
);
887+
let statistics_sender = request_server_exchanges
888+
.into_iter()
889+
.next()
890+
.map(|(_, exchange)| {
891+
StatisticsSender::spawn(&query_id, ctx, exchange, executor.get_inner())
892+
});
893893

894894
let span = if let Some(parent) = SpanContext::current_local_parent() {
895895
Span::root("Distributed-Executor", parent)
@@ -900,7 +900,9 @@ impl QueryCoordinator {
900900
Thread::named_spawn(Some(String::from("Distributed-Executor")), move || {
901901
let _g = span.set_local_parent();
902902
let error = executor.execute().err();
903-
statistics_sender.shutdown(error.clone());
903+
if let Some(mut statistics_sender) = statistics_sender {
904+
statistics_sender.shutdown(error.clone());
905+
}
904906
query_ctx
905907
.get_exchange_manager()
906908
.on_finished_query(&query_id, error);

0 commit comments

Comments
 (0)