Skip to content

AsyncDrop drop codegen panic #140906

Open
Open
@ShaunSHamilton

Description

@ShaunSHamilton

Code

#![feature(async_drop, impl_trait_in_assoc_type)]

use futures_util::stream::StreamExt;
use mongodb::{
    Database,
    bson::{Document, doc},
    change_stream::event::{ChangeStreamEvent, OperationType},
};
use std::future::AsyncDrop;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
/// Guard that records the change-stream events of a MongoDB database while it
/// is alive.
///
/// `MongoDrop::new` spawns a background task that pushes every received
/// change-stream event into `collected_events`. The `AsyncDrop` implementation
/// signals that task to stop and then walks the recorded events, issuing the
/// inverse operation for inserts, deletes, updates and replaces.
pub struct MongoDrop {
    /// Handle to the watched database (a clone of the one passed to `new`);
    /// used by the async destructor to issue the compensating operations.
    database: Database,
    /// Events recorded by the listener task, shared between that task and the
    /// guard.
    collected_events: Arc<Mutex<Vec<ChangeStreamEvent<Document>>>>,
    /// One-shot channel used to tell the listener task to stop. It is taken
    /// (leaving `None`) the first time the destructor runs.
    stop_sender: Option<oneshot::Sender<()>>,
    /// Join handle of the spawned listener task. It is stored but never
    /// awaited in the code shown here.
    _listener_handle: JoinHandle<()>,
}

impl MongoDrop {
    /// Starts watching `database` and returns a guard that records its changes.
    ///
    /// A change stream is opened on the database and handed to a spawned tokio
    /// task. That task appends every successfully received event to the shared
    /// event list and terminates when the stream yields an error, the stream
    /// ends, or the stop channel fires (or its sender is dropped).
    ///
    /// # Errors
    ///
    /// Returns the driver error if the change stream cannot be opened.
    ///
    /// # Panics
    ///
    /// The listener task panics if the event mutex is poisoned.
    pub async fn new(database: &Database) -> Result<Self, mongodb::error::Error> {
        let change_stream = database.watch().await?;

        let collected_events = Arc::new(Mutex::new(Vec::<ChangeStreamEvent<Document>>::new()));
        let shared_events = Arc::clone(&collected_events);
        let (stop_tx, mut stop_rx) = oneshot::channel();

        let listener_handle = tokio::spawn(async move {
            let mut fused_stream = change_stream.fuse();

            loop {
                tokio::select! {
                    next_item = fused_stream.next() => {
                        // Anything other than a successfully decoded event
                        // (a stream error or end of stream) ends the listener.
                        let Some(Ok(event)) = next_item else {
                            break;
                        };
                        // The guard is a temporary, so the lock is released
                        // before the next poll of the stream.
                        shared_events.lock().unwrap().push(event);
                    },
                    _ = &mut stop_rx => break,
                }
            }
        });

        Ok(Self {
            database: database.clone(),
            collected_events,
            stop_sender: Some(stop_tx),
            _listener_handle: listener_handle,
        })
    }
}

impl AsyncDrop for MongoDrop {
    /// Stops the change-stream listener and tries to undo every recorded change.
    ///
    /// Events are handled in the order in which they were recorded:
    ///
    /// - **insert** – the document identified by the event's `_id` is deleted;
    /// - **delete** – the pre-change document is re-inserted, if the event
    ///   carries one;
    /// - **update / replace** – the document identified by the event's `_id`
    ///   is replaced with the pre-change document, if the event carries one.
    ///
    /// All other operation types are ignored.
    ///
    /// The rollback is best-effort: events that lack the information needed to
    /// revert them (namespace, collection name, document key, `_id`, or
    /// pre-change document) are skipped, and errors returned by the database
    /// are ignored. A destructor has no caller to report to, and panicking
    /// while dropping could abort the process.
    async fn drop(self: Pin<&mut Self>) {
        // Every field of `MongoDrop` is `Unpin`, so the pinned reference can
        // be turned into a plain `&mut` without any `unsafe`.
        let this = self.get_mut();

        if let Some(sender) = this.stop_sender.take() {
            // A send error only means the listener task has already exited.
            let _ = sender.send(());
        }

        // Wait briefly for the listener to potentially process the last few events.
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

        // Take the recorded events out while holding the lock, and release the
        // lock before the first database call is awaited.
        let events_to_process: Vec<_> = {
            let mut events = this
                .collected_events
                .lock()
                // A poisoned lock only means the listener panicked; the events
                // recorded so far are still worth reverting.
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            events.drain(..).collect()
        };

        for event in events_to_process {
            // Without a collection name there is nothing that can be reverted.
            let Some(collection_name) = event.ns.and_then(|ns| ns.coll) else {
                continue;
            };
            let collection = this.database.collection::<Document>(&collection_name);

            match event.operation_type {
                OperationType::Insert => {
                    let Some(id) = event.document_key.as_ref().and_then(|key| key.get("_id"))
                    else {
                        continue;
                    };
                    let _ = collection.delete_one(doc! { "_id": id.clone() }).await;
                }
                OperationType::Delete => {
                    if let Some(previous) = event.full_document_before_change {
                        let _ = collection.insert_one(previous).await;
                    }
                }
                OperationType::Update | OperationType::Replace => {
                    let (Some(key), Some(previous)) =
                        (event.document_key, event.full_document_before_change)
                    else {
                        continue;
                    };
                    let Some(id) = key.get("_id") else {
                        continue;
                    };
                    let _ = collection
                        .replace_one(doc! { "_id": id.clone() }, previous)
                        .await;
                }
                _ => {}
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::MongoDrop;
    use mongodb::{
        Client,
        bson::{Document, doc},
    };

    /// Connects to the MongoDB instance the test expects on localhost.
    async fn get_client() -> Result<Client, mongodb::error::Error> {
        let uri = "mongodb://127.0.0.1:27017/freecodecamp?directConnection=true";
        Client::with_uri_str(uri).await
    }

    /// Inserts, updates and deletes one document while a `MongoDrop` guard is
    /// alive; the guard goes out of scope when the test function returns.
    #[tokio::test]
    async fn asyncdrop() -> Result<(), Box<dyn std::error::Error>> {
        let client = get_client().await?;
        let db = client.database("freecodecamp");

        let _guard = MongoDrop::new(&db).await?;

        let docs = db.collection::<Document>("my_test_collection");

        // Example operations:
        docs.insert_one(doc! {"_id": 1, "data": "initial"}).await?;
        docs.update_one(doc! {"_id": 1}, doc! {"$set": {"data": "updated"}})
            .await?;
        docs.delete_one(doc! {"_id": 1}).await?;

        Ok(())
    }
}

Meta

rustc --version --verbose:

rustc 1.88.0-nightly (50aa04180 2025-05-08)
binary: rustc
commit-hash: 50aa04180709189a03dde5fd1c05751b2625ed37
commit-date: 2025-05-08
host: aarch64-unknown-linux-gnu
release: 1.88.0-nightly
LLVM version: 20.1.4

Error output

rustc-ice-2025-05-10T20_13_23-73738.txt

Backtrace

thread 'rustc' panicked at /rustc/50aa04180709189a03dde5fd1c05751b2625ed37/compiler/rustc_span/src/def_id.rs:393:26:
DefId::expect_local: `DefId(20:29 ~ mongo_drop[0a5e]::{impl#1})` isn't local
stack backtrace:
   0: __rustc::rust_begin_unwind
             at /rustc/50aa04180709189a03dde5fd1c05751b2625ed37/library/std/src/panicking.rs:697:5
   1: core::panicking::panic_fmt
             at /rustc/50aa04180709189a03dde5fd1c05751b2625ed37/library/core/src/panicking.rs:75:14
   2: <rustc_span::def_id::LocalDefId as rustc_serialize::serialize::Decodable<rustc_metadata::rmeta::decoder::DecodeContext>>::decode
   3: rustc_metadata::rmeta::decoder::cstore_impl::provide_extern::adt_async_destructor
      [... omitted 1 frame ...]
   4: <rustc_middle::ty::adt::AdtDef>::async_destructor
   5: rustc_ty_utils::needs_drop::needs_async_drop_raw
      [... omitted 2 frames ...]
   6: <rustc_mir_build::builder::Builder>::is_async_drop_impl
   7: <rustc_mir_build::builder::Builder>::pop_scope
   8: <rustc_mir_build::builder::Builder>::ast_block_stmts
   9: <rustc_mir_build::builder::Builder>::ast_block
  10: <rustc_mir_build::builder::Builder>::expr_into_dest
  11: <rustc_mir_build::builder::Builder>::expr_into_dest
  12: <rustc_mir_build::builder::Builder>::expr_into_dest
  13: <rustc_mir_build::builder::Builder>::expr_into_dest
  14: <rustc_mir_build::builder::Builder>::as_temp
  15: <rustc_mir_build::builder::Builder>::as_call_operand
  16: <rustc_mir_build::builder::Builder>::as_call_operand
  17: <rustc_mir_build::builder::Builder>::expr_into_dest
  18: <rustc_mir_build::builder::Builder>::as_temp::{closure#0}
  19: <rustc_mir_build::builder::Builder>::expr_as_place
  20: <rustc_mir_build::builder::Builder>::expr_as_place
  21: <rustc_mir_build::builder::Builder>::expr_into_dest
  22: <rustc_mir_build::builder::Builder>::expr_into_dest
  23: <rustc_mir_build::builder::Builder>::ast_block_stmts
  24: <rustc_mir_build::builder::Builder>::ast_block
  25: <rustc_mir_build::builder::Builder>::expr_into_dest
  26: <rustc_mir_build::builder::Builder>::expr_into_dest
  27: rustc_mir_build::builder::build_mir
  28: rustc_mir_transform::mir_built
      [... omitted 1 frame ...]
  29: <rustc_mir_build::check_unsafety::UnsafetyVisitor>::visit_inner_body
  30: <rustc_mir_build::check_unsafety::UnsafetyVisitor as rustc_middle::thir::visit::Visitor>::visit_expr
  31: <rustc_mir_build::check_unsafety::UnsafetyVisitor as rustc_middle::thir::visit::Visitor>::visit_block
  32: <rustc_mir_build::check_unsafety::UnsafetyVisitor as rustc_middle::thir::visit::Visitor>::visit_expr
  33: <rustc_mir_build::check_unsafety::UnsafetyVisitor as rustc_middle::thir::visit::Visitor>::visit_expr
  34: rustc_mir_build::check_unsafety::check_unsafety
      [... omitted 1 frame ...]
  35: rustc_interface::passes::run_required_analyses
  36: rustc_interface::passes::analysis
      [... omitted 1 frame ...]
  37: rustc_interface::passes::create_and_enter_global_ctxt::<core::option::Option<rustc_interface::queries::Linker>, rustc_driver_impl::run_compiler::{closure#0}::{closure#2}>::{closure#2}::{closure#0}
  38: rustc_interface::interface::run_compiler::<(), rustc_driver_impl::run_compiler::{closure#0}>::{closure#1}
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Additional Info

This code used to work with the older nightly version here: https://github.com/ShaunSHamilton/mongo_drop/blob/ab51919a46e1bd2c4e224307c17c5eb6c73a0557/src/lib.rs#L128

After updating to 1.88.0-nightly and using the `AsyncDrop` `drop` impl, the compiler panics when running `cargo test`.

Metadata

Metadata

Assignees

No one assigned

    Labels

    - C-bug — Category: This is a bug.
    - F-async_drop — `#![feature(async_drop)]`
    - I-ICE — Issue: The compiler panicked, giving an Internal Compilation Error (ICE) ❄️
    - S-has-bisection — Status: a bisection has been found for this issue
    - S-has-mcve — Status: A Minimal Complete and Verifiable Example has been found for this issue
    - T-compiler — Relevant to the compiler team, which will review and decide on the PR/issue.
    - requires-incomplete-features — This issue requires the use of incomplete features.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions