Open
Description
Code
#![feature(async_drop, impl_trait_in_assoc_type)]
use futures_util::stream::StreamExt;
use mongodb::{
Database,
bson::{Document, doc},
change_stream::event::{ChangeStreamEvent, OperationType},
};
use std::future::AsyncDrop;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
pub struct MongoDrop {
database: Database,
collected_events: Arc<Mutex<Vec<ChangeStreamEvent<Document>>>>,
stop_sender: Option<oneshot::Sender<()>>,
_listener_handle: JoinHandle<()>,
}
impl MongoDrop {
pub async fn new(database: &Database) -> Result<Self, mongodb::error::Error> {
let stream = database.watch().await?;
let collected_events: Arc<Mutex<Vec<ChangeStreamEvent<Document>>>> =
Arc::new(Mutex::new(Vec::new()));
let events_clone = Arc::clone(&collected_events);
let (stop_sender, mut stop_receiver) = oneshot::channel();
let stop_sender = Some(stop_sender);
let listener_handle = tokio::spawn(async move {
let mut stream_fused = stream.fuse();
loop {
tokio::select! {
event_opt = stream_fused.next() => {
match event_opt {
Some(Ok(event)) => {
let mut events = events_clone.lock().unwrap();
events.push(event);
}
Some(Err(_e)) => {
break;
}
None => {
break;
}
}
},
_ = &mut stop_receiver => {
break;
},
}
}
});
Ok(MongoDrop {
database: database.clone(),
collected_events,
stop_sender,
_listener_handle: listener_handle,
})
}
}
impl AsyncDrop for MongoDrop {
async fn drop(self: Pin<&mut Self>) {
// Safety: We are consuming the struct `self` here. The future returned
// must complete before the memory is reused. AsyncDrop ensures this.
let this = unsafe { Pin::into_inner_unchecked(self) };
if let Some(sender) = this.stop_sender.take() {
if let Err(_) = sender.send(()) {}
} else {
}
// Wait briefly for the listener to potentially process the last few events
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let events_to_process = {
let mut events = this.collected_events.lock().unwrap();
events.drain(..).collect::<Vec<_>>()
};
if events_to_process.is_empty() {
return; // Return from the async block (Future output is ())
}
for event in events_to_process.into_iter() {
match event.operation_type {
OperationType::Insert => {
if let Some(document_key) = event.document_key {
if let Some(id) = document_key.get("_id") {
let filter = doc! { "_id": id.clone() };
let collection = this
.database
.collection::<Document>(event.ns.unwrap().coll.unwrap().as_str());
if let Err(_e) = collection.delete_one(filter).await {}
} else {
}
} else {
}
}
OperationType::Delete => {
if let Some(full_document) = event.full_document_before_change {
let collection = this
.database
.collection::<Document>(event.ns.unwrap().coll.unwrap().as_str());
if let Err(_e) = collection.insert_one(full_document).await {}
} else {
if let Some(_document_key) = event.document_key {}
}
}
OperationType::Update => {
if let Some(document_key) = event.document_key {
if let Some(full_document_before) = event.full_document_before_change {
if let Some(id) = document_key.get("_id") {
let filter = doc! { "_id": id.clone() };
let collection = this.database.collection::<Document>(
event.ns.unwrap().coll.unwrap().as_str(),
);
if let Err(_e) =
collection.replace_one(filter, full_document_before).await
{
}
} else {
}
} else {
}
} else {
}
}
OperationType::Replace => {
if let Some(document_key) = event.document_key {
if let Some(full_document_before) = event.full_document_before_change {
if let Some(id) = document_key.get("_id") {
let filter = doc! { "_id": id.clone() };
let collection = this.database.collection::<Document>(
event.ns.unwrap().coll.unwrap().as_str(),
);
if let Err(_e) =
collection.replace_one(filter, full_document_before).await
{
}
} else {
}
} else {
}
} else {
}
}
_op_type => {}
}
}
}
}
#[cfg(test)]
mod tests {
use mongodb::{
Client,
bson::{Document, doc},
};
use crate::MongoDrop;
#[tokio::test]
async fn asyncdrop() -> Result<(), Box<dyn std::error::Error>> {
let mongodb_client = get_client().await?;
let database_name = "freecodecamp";
let collection_name = "my_test_collection";
let db = mongodb_client.database(database_name);
let _guard = MongoDrop::new(&db).await?;
let collection = db.collection::<Document>(collection_name);
// Example operations:
collection
.insert_one(doc! {"_id": 1, "data": "initial"})
.await?;
collection
.update_one(doc! {"_id": 1}, doc! {"$set": {"data": "updated"}})
.await?;
collection.delete_one(doc! {"_id": 1}).await?;
Ok(())
}
async fn get_client() -> Result<Client, mongodb::error::Error> {
Client::with_uri_str("mongodb://127.0.0.1:27017/freecodecamp?directConnection=true").await
}
}
Meta
rustc --version --verbose
:
rustc 1.88.0-nightly (50aa04180 2025-05-08)
binary: rustc
commit-hash: 50aa04180709189a03dde5fd1c05751b2625ed37
commit-date: 2025-05-08
host: aarch64-unknown-linux-gnu
release: 1.88.0-nightly
LLVM version: 20.1.4
Error output
rustc-ice-2025-05-10T20_13_23-73738.txt
Backtrace
thread 'rustc' panicked at /rustc/50aa04180709189a03dde5fd1c05751b2625ed37/compiler/rustc_span/src/def_id.rs:393:26:
DefId::expect_local: `DefId(20:29 ~ mongo_drop[0a5e]::{impl#1})` isn't local
stack backtrace:
0: __rustc::rust_begin_unwind
at /rustc/50aa04180709189a03dde5fd1c05751b2625ed37/library/std/src/panicking.rs:697:5
1: core::panicking::panic_fmt
at /rustc/50aa04180709189a03dde5fd1c05751b2625ed37/library/core/src/panicking.rs:75:14
2: <rustc_span::def_id::LocalDefId as rustc_serialize::serialize::Decodable<rustc_metadata::rmeta::decoder::DecodeContext>>::decode
3: rustc_metadata::rmeta::decoder::cstore_impl::provide_extern::adt_async_destructor
[... omitted 1 frame ...]
4: <rustc_middle::ty::adt::AdtDef>::async_destructor
5: rustc_ty_utils::needs_drop::needs_async_drop_raw
[... omitted 2 frames ...]
6: <rustc_mir_build::builder::Builder>::is_async_drop_impl
7: <rustc_mir_build::builder::Builder>::pop_scope
8: <rustc_mir_build::builder::Builder>::ast_block_stmts
9: <rustc_mir_build::builder::Builder>::ast_block
10: <rustc_mir_build::builder::Builder>::expr_into_dest
11: <rustc_mir_build::builder::Builder>::expr_into_dest
12: <rustc_mir_build::builder::Builder>::expr_into_dest
13: <rustc_mir_build::builder::Builder>::expr_into_dest
14: <rustc_mir_build::builder::Builder>::as_temp
15: <rustc_mir_build::builder::Builder>::as_call_operand
16: <rustc_mir_build::builder::Builder>::as_call_operand
17: <rustc_mir_build::builder::Builder>::expr_into_dest
18: <rustc_mir_build::builder::Builder>::as_temp::{closure#0}
19: <rustc_mir_build::builder::Builder>::expr_as_place
20: <rustc_mir_build::builder::Builder>::expr_as_place
21: <rustc_mir_build::builder::Builder>::expr_into_dest
22: <rustc_mir_build::builder::Builder>::expr_into_dest
23: <rustc_mir_build::builder::Builder>::ast_block_stmts
24: <rustc_mir_build::builder::Builder>::ast_block
25: <rustc_mir_build::builder::Builder>::expr_into_dest
26: <rustc_mir_build::builder::Builder>::expr_into_dest
27: rustc_mir_build::builder::build_mir
28: rustc_mir_transform::mir_built
[... omitted 1 frame ...]
29: <rustc_mir_build::check_unsafety::UnsafetyVisitor>::visit_inner_body
30: <rustc_mir_build::check_unsafety::UnsafetyVisitor as rustc_middle::thir::visit::Visitor>::visit_expr
31: <rustc_mir_build::check_unsafety::UnsafetyVisitor as rustc_middle::thir::visit::Visitor>::visit_block
32: <rustc_mir_build::check_unsafety::UnsafetyVisitor as rustc_middle::thir::visit::Visitor>::visit_expr
33: <rustc_mir_build::check_unsafety::UnsafetyVisitor as rustc_middle::thir::visit::Visitor>::visit_expr
34: rustc_mir_build::check_unsafety::check_unsafety
[... omitted 1 frame ...]
35: rustc_interface::passes::run_required_analyses
36: rustc_interface::passes::analysis
[... omitted 1 frame ...]
37: rustc_interface::passes::create_and_enter_global_ctxt::<core::option::Option<rustc_interface::queries::Linker>, rustc_driver_impl::run_compiler::{closure#0}::{closure#2}>::{closure#2}::{closure#0}
38: rustc_interface::interface::run_compiler::<(), rustc_driver_impl::run_compiler::{closure#0}>::{closure#1}
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
Additional Info
This code used to work with the older nightly version here: https://github.com/ShaunSHamilton/mongo_drop/blob/ab51919a46e1bd2c4e224307c17c5eb6c73a0557/src/lib.rs#L128
Once updated to 1.88.0
using the AsyncDrop drop impl, the compiler panics on cargo test
.
Metadata
Metadata
Assignees
Labels
Category: This is a bug.`#![feature(async_drop)]`Issue: The compiler panicked, giving an Internal Compilation Error (ICE) ❄️Status: a bisection has been found for this issueStatus: A Minimal Complete and Verifiable Example has been found for this issueRelevant to the compiler team, which will review and decide on the PR/issue.This issue requires the use of incomplete features.