From af55344babd8a88c633767472f4e4f3a4dbcb5ea Mon Sep 17 00:00:00 2001 From: link2xt Date: Tue, 28 Jan 2025 17:10:41 +0000 Subject: [PATCH] fix: make handle_unilateral non-async The check for `unsolicited.is_full()` at the beginning of `handle_unilateral` is not sufficient if the function is called from multiple threads parallel. This normally should not happen, but not guaranteed. Instead of checking if the channel is full in advance, use `tr_send` and ignore the error if the channel happens to be full when we try to send into it. We also ignore the error when the channel is closed instead of panic because the library should never panic. --- src/client.rs | 2 +- src/extensions/id.rs | 2 +- src/extensions/idle.rs | 6 ++-- src/extensions/quota.rs | 4 +-- src/parse.rs | 61 ++++++++++++++++------------------------- 5 files changed, 31 insertions(+), 44 deletions(-) diff --git a/src/client.rs b/src/client.rs index 47099ed..b4fa1be 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1434,7 +1434,7 @@ impl Connection { } if let Some(unsolicited) = unsolicited.clone() { - handle_unilateral(response, unsolicited).await; + handle_unilateral(response, unsolicited); } if let Some(res) = self.stream.next().await { diff --git a/src/extensions/id.rs b/src/extensions/id.rs index 6d341ab..00e6c84 100644 --- a/src/extensions/id.rs +++ b/src/extensions/id.rs @@ -56,7 +56,7 @@ pub(crate) async fn parse_id> + Unpin> }) } _ => { - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } } } diff --git a/src/extensions/idle.rs b/src/extensions/idle.rs index 2a981ed..70e38e1 100644 --- a/src/extensions/idle.rs +++ b/src/extensions/idle.rs @@ -168,7 +168,7 @@ impl Handle { // continuation, wait for it } Response::Done { .. } => { - handle_unilateral(resp, sender.clone()).await; + handle_unilateral(resp, sender.clone()); } _ => return Ok(IdleResponse::NewData(resp)), } @@ -203,10 +203,10 @@ impl Handle { .into()); } } - handle_unilateral(res, self.session.unsolicited_responses_tx.clone()).await; + handle_unilateral(res, self.session.unsolicited_responses_tx.clone()); } _ => { - handle_unilateral(res, self.session.unsolicited_responses_tx.clone()).await; + handle_unilateral(res, self.session.unsolicited_responses_tx.clone()); } } } diff --git a/src/extensions/quota.rs b/src/extensions/quota.rs index 64469af..8b62535 100644 --- a/src/extensions/quota.rs +++ b/src/extensions/quota.rs @@ -30,7 +30,7 @@ pub(crate) async fn parse_get_quota> + match resp.parsed() { Response::Quota(q) => quota = Some(q.clone().into()), _ => { - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } } } @@ -65,7 +65,7 @@ pub(crate) async fn parse_get_quota_root { - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } } } diff --git a/src/parse.rs b/src/parse.rs index dff34df..3424342 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -29,7 +29,7 @@ pub(crate) fn parse_names> + Unpin + S Some(Ok(name)) } _ => { - handle_unilateral(resp, unsolicited).await; + handle_unilateral(resp, unsolicited); None } }, @@ -79,7 +79,7 @@ pub(crate) fn parse_fetches> + Unpin + Ok(resp) => match resp.parsed() { Response::Fetch(..) => Some(Ok(Fetch::new(resp))), _ => { - handle_unilateral(resp, unsolicited).await; + handle_unilateral(resp, unsolicited); None } }, @@ -157,7 +157,7 @@ pub(crate) async fn parse_status> + Un } } _ => { - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } } } @@ -182,7 +182,7 @@ pub(crate) fn parse_expunge> + Unpin + Ok(resp) => match resp.parsed() { Response::Expunge(id) => Some(Ok(*id)), _ => { - handle_unilateral(resp, unsolicited).await; + handle_unilateral(resp, unsolicited); None } }, @@ -213,7 +213,7 @@ pub(crate) async fn parse_capabilities } } _ => { - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } } } @@ -232,7 +232,7 @@ pub(crate) async fn parse_noop> + Unpi .await { let resp = resp?; - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } Ok(()) @@ -338,7 +338,7 @@ pub(crate) async fn parse_mailbox> + U } } Response::MailboxData(m) => match m { - MailboxDatum::Status { .. } => handle_unilateral(resp, unsolicited.clone()).await, + MailboxDatum::Status { .. } => handle_unilateral(resp, unsolicited.clone()), MailboxDatum::Exists(e) => { mailbox.exists = *e; } @@ -358,7 +358,7 @@ pub(crate) async fn parse_mailbox> + U _ => {} }, _ => { - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } } } @@ -386,7 +386,7 @@ pub(crate) async fn parse_ids> + Unpin } } _ => { - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } } } @@ -421,57 +421,44 @@ pub(crate) async fn parse_metadata> + // [Unsolicited METADATA Response without Values](https://datatracker.ietf.org/doc/html/rfc5464.html#section-4.4.2), // they go to unsolicited channel with other unsolicited responses. _ => { - handle_unilateral(resp, unsolicited.clone()).await; + handle_unilateral(resp, unsolicited.clone()); } } } Ok(res_values) } -// check if this is simply a unilateral server response -// (see Section 7 of RFC 3501): -pub(crate) async fn handle_unilateral( +/// Sends unilateral server response +/// (see Section 7 of RFC 3501) +/// into the channel. +/// +/// If the channel is full or closed, +/// i.e. the responses are not being consumed, +/// ignores new responses. +pub(crate) fn handle_unilateral( res: ResponseData, unsolicited: channel::Sender, ) { - // ignore these if they are not being consumed - if unsolicited.is_full() { - return; - } - match res.parsed() { Response::MailboxData(MailboxDatum::Status { mailbox, status }) => { unsolicited - .send(UnsolicitedResponse::Status { + .try_send(UnsolicitedResponse::Status { mailbox: (mailbox.as_ref()).into(), attributes: status.to_vec(), }) - .await - .expect("Channel closed unexpectedly"); + .ok(); } Response::MailboxData(MailboxDatum::Recent(n)) => { - unsolicited - .send(UnsolicitedResponse::Recent(*n)) - .await - .expect("Channel closed unexpectedly"); + unsolicited.try_send(UnsolicitedResponse::Recent(*n)).ok(); } Response::MailboxData(MailboxDatum::Exists(n)) => { - unsolicited - .send(UnsolicitedResponse::Exists(*n)) - .await - .expect("Channel closed unexpectedly"); + unsolicited.try_send(UnsolicitedResponse::Exists(*n)).ok(); } Response::Expunge(n) => { - unsolicited - .send(UnsolicitedResponse::Expunge(*n)) - .await - .expect("Channel closed unexpectedly"); + unsolicited.try_send(UnsolicitedResponse::Expunge(*n)).ok(); } _ => { - unsolicited - .send(UnsolicitedResponse::Other(res)) - .await - .expect("Channel closed unexpectedly"); + unsolicited.try_send(UnsolicitedResponse::Other(res)).ok(); } } }