| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
| use std::future::Future; |
| use std::pin::Pin; |
| use std::sync::Arc; |
| use std::task::Poll; |
|
|
| use anyhow::{Context as _, Result, bail, format_err}; |
| use futures_lite::FutureExt; |
| use iroh::{Endpoint, RelayMode}; |
| use tokio::fs; |
| use tokio::task::JoinHandle; |
| use tokio_util::sync::CancellationToken; |
|
|
| use crate::chat::add_device_msg; |
| use crate::context::Context; |
| use crate::imex::BlobDirContents; |
| use crate::log::warn; |
| use crate::message::Message; |
| use crate::qr::Qr; |
| use crate::stock_str::backup_transfer_msg_body; |
| use crate::tools::{TempPathGuard, create_id, time}; |
| use crate::{EventType, e2ee}; |
|
|
| use super::{DBFILE_BACKUP_NAME, export_backup_stream, export_database, import_backup_stream}; |
|
|
| |
/// ALPN protocol identifier of the backup transfer protocol.
///
/// Advertised by the endpoint created in `BackupProvider::prepare` and requested by
/// `get_backup2` when it connects to the provider.
const BACKUP_ALPN: &[u8] = b"/deltachat/backup";
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
/// Provides a backup of this account to another device.
///
/// Created with `BackupProvider::prepare`. The QR code that a receiver needs in order to
/// connect is returned by `BackupProvider::qr`. The provider is itself a `Future` that
/// resolves with the result of the background task serving the backup.
#[derive(Debug)]
pub struct BackupProvider {
    /// iroh endpoint bound for this provider.
    ///
    /// Never read through this field; a clone is handed to the accept loop.
    /// NOTE(review): presumably stored so the endpoint lives as long as the provider — confirm.
    _endpoint: Endpoint,

    /// Address of the endpoint, embedded into the QR code returned by `qr`.
    node_addr: iroh::NodeAddr,

    /// Authentication token that a connecting receiver has to send
    /// before the backup is streamed to it.
    auth_token: String,

    /// Handle of the spawned task that runs the accept loop; polled by the `Future` impl.
    handle: JoinHandle<Result<()>>,

    /// Guard created from the provider's `CancellationToken`.
    ///
    /// Dropping the provider drops this guard, which cancels the token and thereby
    /// stops the accept loop.
    _drop_guard: tokio_util::sync::DropGuard,
}
|
|
| impl BackupProvider { |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| pub async fn prepare(context: &Context) -> Result<Self> { |
| let relay_mode = RelayMode::Disabled; |
| let endpoint = Endpoint::builder() |
| .tls_x509() |
| .alpns(vec![BACKUP_ALPN.to_vec()]) |
| .relay_mode(relay_mode) |
| .bind() |
| .await?; |
| let node_addr = endpoint.node_addr().await?; |
|
|
| |
| let cancel_token = context.alloc_ongoing().await?; |
| let paused_guard = context.scheduler.pause(context).await?; |
| let context_dir = context |
| .get_blobdir() |
| .parent() |
| .context("Context dir not found")?; |
|
|
| |
| e2ee::ensure_secret_key_exists(context) |
| .await |
| .context("Cannot create private key or private key not available")?; |
|
|
| let dbfile = context_dir.join(DBFILE_BACKUP_NAME); |
| if fs::metadata(&dbfile).await.is_ok() { |
| fs::remove_file(&dbfile).await?; |
| warn!(context, "Previous database export deleted"); |
| } |
| let dbfile = TempPathGuard::new(dbfile); |
|
|
| |
| let auth_token = create_id(); |
|
|
| let passphrase = String::new(); |
|
|
| export_database(context, &dbfile, passphrase, time()) |
| .await |
| .context("Database export failed")?; |
|
|
| let drop_token = CancellationToken::new(); |
| let handle = { |
| let context = context.clone(); |
| let drop_token = drop_token.clone(); |
| let endpoint = endpoint.clone(); |
| let auth_token = auth_token.clone(); |
| tokio::spawn(async move { |
| Self::accept_loop( |
| context.clone(), |
| endpoint, |
| auth_token, |
| cancel_token, |
| drop_token, |
| dbfile, |
| ) |
| .await; |
| info!(context, "Finished accept loop."); |
|
|
| context.free_ongoing().await; |
|
|
| |
| drop(paused_guard); |
| Ok(()) |
| }) |
| }; |
| Ok(Self { |
| _endpoint: endpoint, |
| node_addr, |
| auth_token, |
| handle, |
| _drop_guard: drop_token.drop_guard(), |
| }) |
| } |
|
|
| async fn handle_connection( |
| context: Context, |
| conn: iroh::endpoint::Connecting, |
| auth_token: String, |
| dbfile: Arc<TempPathGuard>, |
| ) -> Result<()> { |
| let conn = conn.await?; |
| let (mut send_stream, mut recv_stream) = conn.accept_bi().await?; |
|
|
| |
| let mut received_auth_token = vec![0u8; auth_token.len()]; |
| recv_stream.read_exact(&mut received_auth_token).await?; |
| if received_auth_token.as_slice() != auth_token.as_bytes() { |
| warn!(context, "Received wrong backup authentication token."); |
| return Ok(()); |
| } |
|
|
| info!(context, "Received valid backup authentication token."); |
| |
| context.emit_event(EventType::ImexProgress(1)); |
|
|
| let blobdir = BlobDirContents::new(&context).await?; |
|
|
| let mut file_size = 0; |
| file_size += dbfile.metadata()?.len(); |
| for blob in blobdir.iter() { |
| file_size += blob.to_abs_path().metadata()?.len() |
| } |
|
|
| send_stream.write_all(&file_size.to_be_bytes()).await?; |
|
|
| export_backup_stream(&context, &dbfile, blobdir, send_stream, file_size) |
| .await |
| .context("Failed to write backup into QUIC stream")?; |
| info!(context, "Finished writing backup into QUIC stream."); |
| let mut buf = [0u8; 1]; |
| info!(context, "Waiting for acknowledgment."); |
| recv_stream.read_exact(&mut buf).await?; |
| info!(context, "Received backup reception acknowledgement."); |
| context.emit_event(EventType::ImexProgress(1000)); |
|
|
| let mut msg = Message::new_text(backup_transfer_msg_body(&context).await); |
| add_device_msg(&context, None, Some(&mut msg)).await?; |
|
|
| Ok(()) |
| } |
|
|
| async fn accept_loop( |
| context: Context, |
| endpoint: Endpoint, |
| auth_token: String, |
| cancel_token: async_channel::Receiver<()>, |
| drop_token: CancellationToken, |
| dbfile: TempPathGuard, |
| ) { |
| let dbfile = Arc::new(dbfile); |
| loop { |
| tokio::select! { |
| biased; |
|
|
| conn = endpoint.accept() => { |
| if let Some(conn) = conn { |
| let conn = match conn.accept() { |
| Ok(conn) => conn, |
| Err(err) => { |
| warn!(context, "Failed to accept iroh connection: {err:#}."); |
| continue; |
| } |
| }; |
| |
| let context = context.clone(); |
| let auth_token = auth_token.clone(); |
| let dbfile = dbfile.clone(); |
| if let Err(err) = Self::handle_connection(context.clone(), conn, auth_token, dbfile).race( |
| async { |
| cancel_token.recv().await.ok(); |
| Err(format_err!("Backup transfer canceled")) |
| } |
| ).race( |
| async { |
| drop_token.cancelled().await; |
| Err(format_err!("Backup provider dropped")) |
| } |
| ).await { |
| error!(context, "Error while handling backup connection: {err:#}."); |
| context.emit_event(EventType::ImexProgress(0)); |
| break; |
| } else { |
| info!(context, "Backup transfer finished successfully."); |
| break; |
| } |
| } else { |
| break; |
| } |
| }, |
| _ = cancel_token.recv() => { |
| info!(context, "Backup transfer canceled by the user, stopping accept loop."); |
| context.emit_event(EventType::ImexProgress(0)); |
| break; |
| } |
| _ = drop_token.cancelled() => { |
| info!(context, "Backup transfer canceled by dropping the provider, stopping accept loop."); |
| context.emit_event(EventType::ImexProgress(0)); |
| break; |
| } |
| } |
| } |
| } |
|
|
| |
| |
| |
| pub fn qr(&self) -> Qr { |
| Qr::Backup2 { |
| node_addr: self.node_addr.clone(), |
|
|
| auth_token: self.auth_token.clone(), |
| } |
| } |
| } |
|
|
| impl Future for BackupProvider { |
| type Output = Result<()>; |
|
|
| |
| fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> { |
| Pin::new(&mut self.handle).poll(cx)? |
| } |
| } |
|
|
| pub async fn get_backup2( |
| context: &Context, |
| node_addr: iroh::NodeAddr, |
| auth_token: String, |
| ) -> Result<()> { |
| let relay_mode = RelayMode::Disabled; |
|
|
| let endpoint = Endpoint::builder() |
| .tls_x509() |
| .relay_mode(relay_mode) |
| .bind() |
| .await?; |
|
|
| let conn = endpoint.connect(node_addr, BACKUP_ALPN).await?; |
| let (mut send_stream, mut recv_stream) = conn.open_bi().await?; |
| info!(context, "Sending backup authentication token."); |
| send_stream.write_all(auth_token.as_bytes()).await?; |
|
|
| let passphrase = String::new(); |
| info!(context, "Starting to read backup from the stream."); |
|
|
| let mut file_size_buf = [0u8; 8]; |
| recv_stream.read_exact(&mut file_size_buf).await?; |
| let file_size = u64::from_be_bytes(file_size_buf); |
| info!(context, "Received backup file size."); |
| |
| context.emit_event(EventType::ImexProgress(1)); |
|
|
| import_backup_stream(context, recv_stream, file_size, passphrase) |
| .await |
| .context("Failed to import backup from QUIC stream")?; |
| info!(context, "Finished importing backup from the stream."); |
| context.emit_event(EventType::ImexProgress(1000)); |
|
|
| |
| |
| send_stream.write_all(b".").await.ok(); |
| send_stream.finish().ok(); |
| info!(context, "Sent backup reception acknowledgment."); |
|
|
| |
| |
| _ = send_stream.stopped().await; |
|
|
| Ok(()) |
| } |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| pub async fn get_backup(context: &Context, qr: Qr) -> Result<()> { |
| match qr { |
| Qr::Backup2 { |
| node_addr, |
| auth_token, |
| } => { |
| let cancel_token = context.alloc_ongoing().await?; |
| let res = get_backup2(context, node_addr, auth_token) |
| .race(async { |
| cancel_token.recv().await.ok(); |
| Err(format_err!("Backup reception canceled")) |
| }) |
| .await; |
| if let Err(ref res) = res { |
| error!(context, "{:#}", res); |
| context.emit_event(EventType::ImexProgress(0)); |
| } |
| context.free_ongoing().await; |
| res?; |
| } |
| _ => bail!("QR code for backup must be of type DCBACKUP2"), |
| } |
| Ok(()) |
| } |
|
|
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::chat::{ChatItem, get_chat_msgs, send_msg};
    use crate::message::Viewtype;
    use crate::test_utils::TestContextManager;

    use super::*;

    /// Transfers a backup from one test context to another and checks that messages,
    /// attachments and progress events arrive as expected.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_send_receive() {
        let mut tcm = TestContextManager::new();

        // Create the first device.
        let ctx0 = tcm.alice().await;

        // Write a text message into the self chat.
        let self_chat = ctx0.get_self_chat().await;
        let mut msg = Message::new_text("hi there".to_string());
        send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();

        // Send an attachment in the self chat.
        let file = ctx0.get_blobdir().join("hello.txt");
        fs::write(&file, "i am attachment").await.unwrap();
        let mut msg = Message::new(Viewtype::File);
        msg.set_file_and_deduplicate(&ctx0, &file, Some("hello.txt"), Some("text/plain"))
            .unwrap();
        send_msg(&ctx0, self_chat.id, &mut msg).await.unwrap();

        // Prepare to transfer the backup.
        let provider = BackupProvider::prepare(&ctx0).await.unwrap();

        // Set up the second device and fetch the backup using the provider's QR code.
        let ctx1 = tcm.unconfigured().await;
        get_backup(&ctx1, provider.qr()).await.unwrap();

        // Make sure the provider finishes without an error.
        tokio::time::timeout(Duration::from_secs(30), provider)
            .await
            .expect("timed out")
            .expect("error in provider");

        // Check that the second device has both self-chat messages.
        let self_chat = ctx1.get_self_chat().await;
        let msgs = get_chat_msgs(&ctx1, self_chat.id).await.unwrap();
        assert_eq!(msgs.len(), 2);
        let msgid = match msgs.first().unwrap() {
            ChatItem::Message { msg_id } => msg_id,
            _ => panic!("wrong chat item"),
        };
        let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();
        let text = msg.get_text();
        assert_eq!(text, "hi there");
        let msgid = match msgs.get(1).unwrap() {
            ChatItem::Message { msg_id } => msg_id,
            _ => panic!("wrong chat item"),
        };
        let msg = Message::load_from_db(&ctx1, *msgid).await.unwrap();

        let path = msg.get_file(&ctx1).unwrap();
        assert_eq!(
            // The blob is stored under a generated name rather than "hello.txt";
            // presumably derived from the file content — TODO confirm.
            path.with_file_name("ac1d2d284757656a8d41dc40aae4136.txt"),
            path
        );
        // The original filename is still reported by the message.
        assert_eq!("hello.txt", msg.get_filename().unwrap());
        let text = fs::read_to_string(&path).await.unwrap();
        assert_eq!(text, "i am attachment");

        // Saving the attachment works once; saving to the same path again must fail.
        let path = path.with_file_name("saved.txt");
        msg.save_file(&ctx1, &path).await.unwrap();
        let text = fs::read_to_string(&path).await.unwrap();
        assert_eq!(text, "i am attachment");
        assert!(msg.save_file(&ctx1, &path).await.is_err());

        // Check that both devices received the start and finish progress events.
        for ctx in [&ctx0, &ctx1] {
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1)))
                .await;
            ctx.evtracker
                .get_matching(|ev| matches!(ev, EventType::ImexProgress(1000)))
                .await;
        }
    }

    /// Dropping the provider must stop the transfer and emit `ImexProgress(0)`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_drop_provider() {
        let mut tcm = TestContextManager::new();
        let ctx = tcm.alice().await;

        let provider = BackupProvider::prepare(&ctx).await.unwrap();
        drop(provider);
        ctx.evtracker
            .get_matching(|ev| matches!(ev, EventType::ImexProgress(0)))
            .await;
    }
}
|
|