matrix_sdk_crypto/store/mod.rs

1// Copyright 2020 The Matrix.org Foundation C.I.C.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Types and traits to implement the storage layer for the [`OlmMachine`]
16//!
//! The storage layer for the [`OlmMachine`] can be customized by implementing
//! your own [`CryptoStore`].
//!
//! An in-memory only store is provided, as well as an SQLite-based one.
//! Depending on your needs and targets, a custom store may be implemented,
//! e.g. for `wasm32-unknown-unknown` an IndexedDB-based store would be needed.
23//!
24//! ```
25//! # use std::sync::Arc;
26//! # use matrix_sdk_crypto::{
27//! #     OlmMachine,
28//! #     store::MemoryStore,
29//! # };
30//! # use ruma::{device_id, user_id};
31//! # let user_id = user_id!("@example:localhost");
32//! # let device_id = device_id!("TEST");
33//! let store = Arc::new(MemoryStore::new());
34//!
35//! let machine = OlmMachine::with_store(user_id, device_id, store, None);
36//! ```
37//!
38//! [`OlmMachine`]: /matrix_sdk_crypto/struct.OlmMachine.html
//! [`CryptoStore`]: trait.CryptoStore.html
40
41use std::{
42    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
43    fmt::Debug,
44    ops::Deref,
45    pin::pin,
46    sync::{atomic::Ordering, Arc},
47    time::Duration,
48};
49
50use as_variant::as_variant;
51use futures_core::Stream;
52use futures_util::StreamExt;
53use matrix_sdk_common::locks::RwLock as StdRwLock;
54use ruma::{
55    encryption::KeyUsage, events::secret::request::SecretName, DeviceId, OwnedDeviceId,
56    OwnedRoomId, OwnedUserId, RoomId, UserId,
57};
58use serde::{de::DeserializeOwned, Deserialize, Serialize};
59use thiserror::Error;
60use tokio::sync::{Mutex, MutexGuard, Notify, OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
61use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
62use tracing::{info, warn};
63use vodozemac::{base64_encode, megolm::SessionOrdering, Curve25519PublicKey};
64use zeroize::{Zeroize, ZeroizeOnDrop};
65
66#[cfg(doc)]
67use crate::{backups::BackupMachine, identities::OwnUserIdentity};
68use crate::{
69    gossiping::GossippedSecret,
70    identities::{user::UserIdentity, Device, DeviceData, UserDevices, UserIdentityData},
71    olm::{
72        Account, ExportedRoomKey, InboundGroupSession, OlmMessageHash, OutboundGroupSession,
73        PrivateCrossSigningIdentity, SenderData, Session, StaticAccountData,
74    },
75    types::{
76        events::room_key_withheld::RoomKeyWithheldEvent, BackupSecrets, CrossSigningSecrets,
77        EventEncryptionAlgorithm, MegolmBackupV1Curve25519AesSha2Secrets, RoomKeyExport,
78        SecretsBundle,
79    },
80    verification::VerificationMachine,
81    CrossSigningStatus, OwnUserIdentityData, RoomKeyImportResult,
82};
83
84pub mod caches;
85mod crypto_store_wrapper;
86mod error;
87mod memorystore;
88mod traits;
89
90#[cfg(any(test, feature = "testing"))]
91#[macro_use]
92#[allow(missing_docs)]
93pub mod integration_tests;
94
95use caches::{SequenceNumber, UsersForKeyQuery};
96pub(crate) use crypto_store_wrapper::CryptoStoreWrapper;
97pub use error::{CryptoStoreError, Result};
98use matrix_sdk_common::{
99    deserialized_responses::WithheldCode, store_locks::CrossProcessStoreLock, timeout::timeout,
100};
101pub use memorystore::MemoryStore;
102pub use traits::{CryptoStore, DynCryptoStore, IntoCryptoStore};
103
104use crate::types::{
105    events::{room_key_bundle::RoomKeyBundleContent, room_key_withheld::RoomKeyWithheldContent},
106    room_history::RoomKeyBundle,
107};
108pub use crate::{
109    dehydrated_devices::DehydrationError,
110    gossiping::{GossipRequest, SecretInfo},
111};
112
/// A wrapper for our CryptoStore trait object.
///
/// This is needed because we want to have a generic interface so we can
/// store/restore objects that we can serialize. Since trait objects and
/// generics don't mix, we let the CryptoStore store strings, and this wrapper
/// adds the generic interface on top.
119#[derive(Debug, Clone)]
120pub struct Store {
121    inner: Arc<StoreInner>,
122}
123
124#[derive(Debug, Default)]
125pub(crate) struct KeyQueryManager {
126    /// Record of the users that are waiting for a /keys/query.
127    users_for_key_query: Mutex<UsersForKeyQuery>,
128
129    /// Notifier that is triggered each time an update is received for a user.
130    users_for_key_query_notify: Notify,
131}
132
133impl KeyQueryManager {
134    pub async fn synced<'a>(&'a self, cache: &'a StoreCache) -> Result<SyncedKeyQueryManager<'a>> {
135        self.ensure_sync_tracked_users(cache).await?;
136        Ok(SyncedKeyQueryManager { cache, manager: self })
137    }
138
139    /// Load the list of users for whom we are tracking their device lists and
140    /// fill out our caches.
141    ///
    /// This method ensures that we're only going to load the users from the
    /// actual [`CryptoStore`] once; it will also make sure that any
    /// concurrent calls to this method get deduplicated.
145    async fn ensure_sync_tracked_users(&self, cache: &StoreCache) -> Result<()> {
146        // Check if the users are loaded, and in that case do nothing.
147        let loaded = cache.loaded_tracked_users.read().await;
148        if *loaded {
149            return Ok(());
150        }
151
152        // Otherwise, we may load the users.
153        drop(loaded);
154        let mut loaded = cache.loaded_tracked_users.write().await;
155
156        // Check again if the users have been loaded, in case another call to this
157        // method loaded the tracked users between the time we tried to
158        // acquire the lock and the time we actually acquired the lock.
159        if *loaded {
160            return Ok(());
161        }
162
163        let tracked_users = cache.store.load_tracked_users().await?;
164
165        let mut query_users_lock = self.users_for_key_query.lock().await;
166        let mut tracked_users_cache = cache.tracked_users.write();
167        for user in tracked_users {
168            tracked_users_cache.insert(user.user_id.to_owned());
169
170            if user.dirty {
171                query_users_lock.insert_user(&user.user_id);
172            }
173        }
174
175        *loaded = true;
176
177        Ok(())
178    }
179
180    /// Wait for a `/keys/query` response to be received if one is expected for
181    /// the given user.
182    ///
183    /// If the given timeout elapses, the method will stop waiting and return
184    /// `UserKeyQueryResult::TimeoutExpired`.
185    ///
186    /// Requires a [`StoreCacheGuard`] to make sure the users for which a key
187    /// query is pending are up to date, but doesn't hold on to it
188    /// thereafter: the lock is short-lived in this case.
189    pub async fn wait_if_user_key_query_pending(
190        &self,
191        cache: StoreCacheGuard,
192        timeout_duration: Duration,
193        user: &UserId,
194    ) -> Result<UserKeyQueryResult> {
195        {
            // Drop the cache early, so we don't keep it while waiting (since writing the
            // results requires writing to the cache, and thus taking another lock).
198            self.ensure_sync_tracked_users(&cache).await?;
199            drop(cache);
200        }
201
202        let mut users_for_key_query = self.users_for_key_query.lock().await;
203        let Some(waiter) = users_for_key_query.maybe_register_waiting_task(user) else {
204            return Ok(UserKeyQueryResult::WasNotPending);
205        };
206
207        let wait_for_completion = async {
208            while !waiter.completed.load(Ordering::Relaxed) {
209                // Register for being notified before releasing the mutex, so
210                // it's impossible to miss a wakeup between the last check for
211                // whether we should wait, and starting to wait.
212                let mut notified = pin!(self.users_for_key_query_notify.notified());
213                notified.as_mut().enable();
214                drop(users_for_key_query);
215
216                // Wait for a notification
217                notified.await;
218
219                // Reclaim the lock before checking the flag to avoid races
220                // when two notifications happen right after each other and the
221                // second one sets the flag we want to wait for.
222                users_for_key_query = self.users_for_key_query.lock().await;
223            }
224        };
225
226        match timeout(Box::pin(wait_for_completion), timeout_duration).await {
227            Err(_) => {
228                warn!(
229                    user_id = ?user,
230                    "The user has a pending `/keys/query` request which did \
231                    not finish yet, some devices might be missing."
232                );
233
234                Ok(UserKeyQueryResult::TimeoutExpired)
235            }
236            _ => Ok(UserKeyQueryResult::WasPending),
237        }
238    }
239}
240
241pub(crate) struct SyncedKeyQueryManager<'a> {
242    cache: &'a StoreCache,
243    manager: &'a KeyQueryManager,
244}
245
246impl SyncedKeyQueryManager<'_> {
247    /// Add entries to the list of users being tracked for device changes
248    ///
249    /// Any users not already on the list are flagged as awaiting a key query.
250    /// Users that were already in the list are unaffected.
251    pub async fn update_tracked_users(&self, users: impl Iterator<Item = &UserId>) -> Result<()> {
252        let mut store_updates = Vec::new();
253        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
254
255        {
256            let mut tracked_users = self.cache.tracked_users.write();
257            for user_id in users {
258                if tracked_users.insert(user_id.to_owned()) {
259                    key_query_lock.insert_user(user_id);
260                    store_updates.push((user_id, true))
261                }
262            }
263        }
264
265        self.cache.store.save_tracked_users(&store_updates).await
266    }
267
268    /// Process notifications that users have changed devices.
269    ///
270    /// This is used to handle the list of device-list updates that is received
271    /// from the `/sync` response. Any users *whose device lists we are
272    /// tracking* are flagged as needing a key query. Users whose devices we
273    /// are not tracking are ignored.
274    pub async fn mark_tracked_users_as_changed(
275        &self,
276        users: impl Iterator<Item = &UserId>,
277    ) -> Result<()> {
278        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
279        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
280
281        {
282            let tracked_users = &self.cache.tracked_users.read();
283            for user_id in users {
284                if tracked_users.contains(user_id) {
285                    key_query_lock.insert_user(user_id);
286                    store_updates.push((user_id, true));
287                }
288            }
289        }
290
291        self.cache.store.save_tracked_users(&store_updates).await
292    }
293
    /// Flag that the given users' devices are now up-to-date.
295    ///
296    /// This is called after processing the response to a /keys/query request.
297    /// Any users whose device lists we are tracking are removed from the
298    /// list of those pending a /keys/query.
299    pub async fn mark_tracked_users_as_up_to_date(
300        &self,
301        users: impl Iterator<Item = &UserId>,
302        sequence_number: SequenceNumber,
303    ) -> Result<()> {
304        let mut store_updates: Vec<(&UserId, bool)> = Vec::new();
305        let mut key_query_lock = self.manager.users_for_key_query.lock().await;
306
307        {
308            let tracked_users = self.cache.tracked_users.read();
309            for user_id in users {
310                if tracked_users.contains(user_id) {
311                    let clean = key_query_lock.maybe_remove_user(user_id, sequence_number);
312                    store_updates.push((user_id, !clean));
313                }
314            }
315        }
316
317        self.cache.store.save_tracked_users(&store_updates).await?;
318        // wake up any tasks that may have been waiting for updates
319        self.manager.users_for_key_query_notify.notify_waiters();
320
321        Ok(())
322    }
323
    /// Get the set of users that have the outdated/dirty flag set for their
    /// list of devices.
326    ///
327    /// This set should be included in a `/keys/query` request which will update
328    /// the device list.
329    ///
330    /// # Returns
331    ///
332    /// A pair `(users, sequence_number)`, where `users` is the list of users to
333    /// be queried, and `sequence_number` is the current sequence number,
334    /// which should be returned in `mark_tracked_users_as_up_to_date`.
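    ///
    /// Illustrative round-trip (sketch; `synced_manager` is a
    /// [`SyncedKeyQueryManager`]):
    ///
    /// ```ignore
    /// let (users, sequence_number) = synced_manager.users_for_key_query().await;
    /// // ... send a `/keys/query` request for `users` and process the response ...
    /// synced_manager
    ///     .mark_tracked_users_as_up_to_date(users.iter().map(std::ops::Deref::deref), sequence_number)
    ///     .await?;
    /// ```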
335    pub async fn users_for_key_query(&self) -> (HashSet<OwnedUserId>, SequenceNumber) {
336        self.manager.users_for_key_query.lock().await.users_for_key_query()
337    }
338
339    /// See the docs for [`crate::OlmMachine::tracked_users()`].
340    pub fn tracked_users(&self) -> HashSet<OwnedUserId> {
341        self.cache.tracked_users.read().iter().cloned().collect()
342    }
343
344    /// Mark the given user as being tracked for device lists, and mark that it
345    /// has an outdated device list.
346    ///
347    /// This means that the user will be considered for a `/keys/query` request
348    /// next time [`Store::users_for_key_query()`] is called.
349    pub async fn mark_user_as_changed(&self, user: &UserId) -> Result<()> {
350        self.manager.users_for_key_query.lock().await.insert_user(user);
351        self.cache.tracked_users.write().insert(user.to_owned());
352
353        self.cache.store.save_tracked_users(&[(user, true)]).await
354    }
355}
356
357#[derive(Debug)]
358pub(crate) struct StoreCache {
359    store: Arc<CryptoStoreWrapper>,
360    tracked_users: StdRwLock<BTreeSet<OwnedUserId>>,
361    loaded_tracked_users: RwLock<bool>,
362    account: Mutex<Option<Account>>,
363}
364
365impl StoreCache {
366    pub(crate) fn store_wrapper(&self) -> &CryptoStoreWrapper {
367        self.store.as_ref()
368    }
369
370    /// Returns a reference to the `Account`.
371    ///
372    /// Either load the account from the cache, or the store if missing from
373    /// the cache.
374    ///
    /// Note that there should always be an account persisted, at least in the
    /// store, so this doesn't return an `Option`.
377    ///
378    /// Note: this method should remain private, otherwise it's possible to ask
379    /// for a `StoreTransaction`, then get the `StoreTransaction::cache()`
380    /// and thus have two different live copies of the `Account` at once.
381    async fn account(&self) -> Result<impl Deref<Target = Account> + '_> {
382        let mut guard = self.account.lock().await;
383        if guard.is_some() {
384            Ok(MutexGuard::map(guard, |acc| acc.as_mut().unwrap()))
385        } else {
386            match self.store.load_account().await? {
387                Some(account) => {
388                    *guard = Some(account);
389                    Ok(MutexGuard::map(guard, |acc| acc.as_mut().unwrap()))
390                }
391                None => Err(CryptoStoreError::AccountUnset),
392            }
393        }
394    }
395}
396
397/// Read-only store cache guard.
398///
399/// This type should hold all the methods that are available when the cache is
400/// borrowed in read-only mode, while all the write operations on those fields
401/// should happen as part of a `StoreTransaction`.
402pub(crate) struct StoreCacheGuard {
403    cache: OwnedRwLockReadGuard<StoreCache>,
404    // TODO: (bnjbvr, #2624) add cross-process lock guard here.
405}
406
407impl StoreCacheGuard {
408    /// Returns a reference to the `Account`.
409    ///
410    /// Either load the account from the cache, or the store if missing from
411    /// the cache.
412    ///
    /// Note that there should always be an account persisted, at least in the
    /// store, so this doesn't return an `Option`.
415    pub async fn account(&self) -> Result<impl Deref<Target = Account> + '_> {
416        self.cache.account().await
417    }
418}
419
420impl Deref for StoreCacheGuard {
421    type Target = StoreCache;
422
423    fn deref(&self) -> &Self::Target {
424        &self.cache
425    }
426}
427
428/// A temporary transaction (that implies a write) to the underlying store.
429#[allow(missing_debug_implementations)]
430pub struct StoreTransaction {
431    store: Store,
432    changes: PendingChanges,
433    // TODO hold onto the cross-process crypto store lock + cache.
434    cache: OwnedRwLockWriteGuard<StoreCache>,
435}
436
437impl StoreTransaction {
438    /// Starts a new `StoreTransaction`.
439    async fn new(store: Store) -> Self {
440        let cache = store.inner.cache.clone();
441
442        Self { store, changes: PendingChanges::default(), cache: cache.clone().write_owned().await }
443    }
444
445    pub(crate) fn cache(&self) -> &StoreCache {
446        &self.cache
447    }
448
449    /// Returns a reference to the current `Store`.
450    pub fn store(&self) -> &Store {
451        &self.store
452    }
453
    /// Gets an `Account` for update.
    ///
    /// Note: since it's guaranteed that one can't have both a
    /// `StoreTransaction` and a `StoreCacheGuard` at runtime (the underlying
    /// `StoreCache` is guarded by a `RwLock`), this ensures that we can't have
    /// two copies of an `Account` alive at the same time.
460    pub async fn account(&mut self) -> Result<&mut Account> {
461        if self.changes.account.is_none() {
462            // Make sure the cache loaded the account.
463            let _ = self.cache.account().await?;
464            self.changes.account = self.cache.account.lock().await.take();
465        }
466        Ok(self.changes.account.as_mut().unwrap())
467    }
468
469    /// Commits all dirty fields to the store, and maintains the cache so it
470    /// reflects the current state of the database.
471    pub async fn commit(self) -> Result<()> {
472        if self.changes.is_empty() {
473            return Ok(());
474        }
475
476        // Save changes in the database.
477        let account = self.changes.account.as_ref().map(|acc| acc.deep_clone());
478
479        self.store.save_pending_changes(self.changes).await?;
480
481        // Make the cache coherent with the database.
482        if let Some(account) = account {
483            *self.cache.account.lock().await = Some(account);
484        }
485
486        Ok(())
487    }
488}
489
490#[derive(Debug)]
491struct StoreInner {
492    identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
493    store: Arc<CryptoStoreWrapper>,
494
495    /// In-memory cache for the current crypto store.
496    ///
497    /// ⚠ Must remain private.
498    cache: Arc<RwLock<StoreCache>>,
499
500    verification_machine: VerificationMachine,
501
502    /// Static account data that never changes (and thus can be loaded once and
503    /// for all when creating the store).
504    static_account: StaticAccountData,
505}
506
507/// Aggregated changes to be saved in the database.
508///
/// This is an updated version of `Changes` that will replace it as #2624
/// progresses.
// If you ever add a field here, make sure to update `PendingChanges::is_empty` too.
512#[derive(Default, Debug)]
513#[allow(missing_docs)]
514pub struct PendingChanges {
515    pub account: Option<Account>,
516}
517
518impl PendingChanges {
    /// Are there any changes stored or is this an empty `PendingChanges` struct?
520    pub fn is_empty(&self) -> bool {
521        self.account.is_none()
522    }
523}
524
525/// Aggregated changes to be saved in the database.
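///
/// Usually only a handful of fields are populated while the rest are left at
/// their defaults; a minimal, illustrative sketch:
///
/// ```no_run
/// use matrix_sdk_crypto::store::Changes;
///
/// // Record a new sync token, leaving everything else untouched.
/// let changes = Changes {
///     next_batch_token: Some("s72594_4483_1934".to_owned()),
///     ..Default::default()
/// };
/// ```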
526// If you ever add a field here, make sure to update `Changes::is_empty` too.
527#[derive(Default, Debug)]
528#[allow(missing_docs)]
529pub struct Changes {
530    pub private_identity: Option<PrivateCrossSigningIdentity>,
531    pub backup_version: Option<String>,
532    pub backup_decryption_key: Option<BackupDecryptionKey>,
533    pub dehydrated_device_pickle_key: Option<DehydratedDeviceKey>,
534    pub sessions: Vec<Session>,
535    pub message_hashes: Vec<OlmMessageHash>,
536    pub inbound_group_sessions: Vec<InboundGroupSession>,
537    pub outbound_group_sessions: Vec<OutboundGroupSession>,
538    pub key_requests: Vec<GossipRequest>,
539    pub identities: IdentityChanges,
540    pub devices: DeviceChanges,
541    /// Stores when a `m.room_key.withheld` is received
542    pub withheld_session_info: BTreeMap<OwnedRoomId, BTreeMap<String, RoomKeyWithheldEvent>>,
543    pub room_settings: HashMap<OwnedRoomId, RoomSettings>,
544    pub secrets: Vec<GossippedSecret>,
545    pub next_batch_token: Option<String>,
546
547    /// Historical room key history bundles that we have received and should
548    /// store.
549    pub received_room_key_bundles: Vec<StoredRoomKeyBundleData>,
550}
551
552/// Information about an [MSC4268] room key bundle.
553///
554/// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
555#[derive(Clone, Debug, Serialize, Deserialize)]
556pub struct StoredRoomKeyBundleData {
557    /// The user that sent us this data.
558    pub sender_user: OwnedUserId,
559
560    /// Information about the sender of this data and how much we trust that
561    /// information.
562    pub sender_data: SenderData,
563
564    /// The room key bundle data itself.
565    pub bundle_data: RoomKeyBundleContent,
566}
567
568/// A user for which we are tracking the list of devices.
569#[derive(Clone, Debug, Serialize, Deserialize)]
570pub struct TrackedUser {
571    /// The user ID of the user.
572    pub user_id: OwnedUserId,
    /// The outdated/dirty flag of the user; remembers whether the list of
    /// devices for the user is considered to be out of date. If the list of
    /// devices is out of date, a `/keys/query` request should be sent out for
    /// this user.
577    pub dirty: bool,
578}
579
580impl Changes {
581    /// Are there any changes stored or is this an empty `Changes` struct?
582    pub fn is_empty(&self) -> bool {
583        self.private_identity.is_none()
584            && self.backup_version.is_none()
585            && self.backup_decryption_key.is_none()
586            && self.dehydrated_device_pickle_key.is_none()
587            && self.sessions.is_empty()
588            && self.message_hashes.is_empty()
589            && self.inbound_group_sessions.is_empty()
590            && self.outbound_group_sessions.is_empty()
591            && self.key_requests.is_empty()
592            && self.identities.is_empty()
593            && self.devices.is_empty()
594            && self.withheld_session_info.is_empty()
595            && self.room_settings.is_empty()
596            && self.secrets.is_empty()
597            && self.next_batch_token.is_none()
598            && self.received_room_key_bundles.is_empty()
599    }
600}
601
602/// This struct is used to remember whether an identity has undergone a change
603/// or remains the same as the one we already know about.
604///
605/// When the homeserver informs us of a potential change in a user's identity or
606/// device during a `/sync` response, it triggers a `/keys/query` request from
607/// our side. In response to this query, the server provides a comprehensive
608/// snapshot of all the user's devices and identities.
609///
610/// Our responsibility is to discern whether a device or identity is new,
611/// changed, or unchanged.
612#[derive(Debug, Clone, Default)]
613#[allow(missing_docs)]
614pub struct IdentityChanges {
615    pub new: Vec<UserIdentityData>,
616    pub changed: Vec<UserIdentityData>,
617    pub unchanged: Vec<UserIdentityData>,
618}
619
620impl IdentityChanges {
621    fn is_empty(&self) -> bool {
622        self.new.is_empty() && self.changed.is_empty()
623    }
624
625    /// Convert the vectors contained in the [`IdentityChanges`] into
626    /// three maps from user id to user identity (new, updated, unchanged).
627    fn into_maps(
628        self,
629    ) -> (
630        BTreeMap<OwnedUserId, UserIdentityData>,
631        BTreeMap<OwnedUserId, UserIdentityData>,
632        BTreeMap<OwnedUserId, UserIdentityData>,
633    ) {
634        let new: BTreeMap<_, _> = self
635            .new
636            .into_iter()
637            .map(|identity| (identity.user_id().to_owned(), identity))
638            .collect();
639
640        let changed: BTreeMap<_, _> = self
641            .changed
642            .into_iter()
643            .map(|identity| (identity.user_id().to_owned(), identity))
644            .collect();
645
646        let unchanged: BTreeMap<_, _> = self
647            .unchanged
648            .into_iter()
649            .map(|identity| (identity.user_id().to_owned(), identity))
650            .collect();
651
652        (new, changed, unchanged)
653    }
654}
655
656#[derive(Debug, Clone, Default)]
657#[allow(missing_docs)]
658pub struct DeviceChanges {
659    pub new: Vec<DeviceData>,
660    pub changed: Vec<DeviceData>,
661    pub deleted: Vec<DeviceData>,
662}
663
/// Convert the device lists contained in the [`DeviceChanges`] into a
/// [`DeviceUpdates`] struct.
///
/// The [`DeviceChanges`] will contain vectors of [`DeviceData`] which
/// we want to convert into [`Device`]s.
669fn collect_device_updates(
670    verification_machine: VerificationMachine,
671    own_identity: Option<OwnUserIdentityData>,
672    identities: IdentityChanges,
673    devices: DeviceChanges,
674) -> DeviceUpdates {
675    let mut new: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
676    let mut changed: BTreeMap<_, BTreeMap<_, _>> = BTreeMap::new();
677
678    let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
679
680    let map_device = |device: DeviceData| {
681        let device_owner_identity = new_identities
682            .get(device.user_id())
683            .or_else(|| changed_identities.get(device.user_id()))
684            .or_else(|| unchanged_identities.get(device.user_id()))
685            .cloned();
686
687        Device {
688            inner: device,
689            verification_machine: verification_machine.to_owned(),
690            own_identity: own_identity.to_owned(),
691            device_owner_identity,
692        }
693    };
694
695    for device in devices.new {
696        let device = map_device(device);
697
698        new.entry(device.user_id().to_owned())
699            .or_default()
700            .insert(device.device_id().to_owned(), device);
701    }
702
703    for device in devices.changed {
704        let device = map_device(device);
705
706        changed
707            .entry(device.user_id().to_owned())
708            .or_default()
709            .insert(device.device_id().to_owned(), device.to_owned());
710    }
711
712    DeviceUpdates { new, changed }
713}
714
715/// Updates about [`Device`]s which got received over the `/keys/query`
716/// endpoint.
717#[derive(Clone, Debug, Default)]
718pub struct DeviceUpdates {
719    /// The list of newly discovered devices.
720    ///
    /// A device being in this list does not necessarily mean that the device
    /// was just created; it just means that it's the first time we're
    /// seeing this device.
724    pub new: BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, Device>>,
725    /// The list of changed devices.
726    pub changed: BTreeMap<OwnedUserId, BTreeMap<OwnedDeviceId, Device>>,
727}
728
729/// Updates about [`UserIdentity`]s which got received over the `/keys/query`
730/// endpoint.
731#[derive(Clone, Debug, Default)]
732pub struct IdentityUpdates {
    /// The list of newly discovered user identities.
    ///
    /// An identity being in this list does not necessarily mean that the
    /// identity was just created; it just means that it's the first time
    /// we're seeing this identity.
738    pub new: BTreeMap<OwnedUserId, UserIdentity>,
739    /// The list of changed identities.
740    pub changed: BTreeMap<OwnedUserId, UserIdentity>,
741    /// The list of unchanged identities.
742    pub unchanged: BTreeMap<OwnedUserId, UserIdentity>,
743}
744
745/// The private part of a backup key.
746///
747/// The private part of the key is not used on a regular basis. Rather, it is
748/// used only when we need to *recover* the backup.
749///
750/// Typically, this private key is itself encrypted and stored in server-side
751/// secret storage (SSSS), whence it can be retrieved when it is needed for a
752/// recovery operation. Alternatively, the key can be "gossiped" between devices
753/// via "secret sharing".
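///
/// Below is a minimal sketch of creating and exporting a key; it assumes the
/// type is reachable as `matrix_sdk_crypto::store::BackupDecryptionKey`.
///
/// ```no_run
/// use matrix_sdk_crypto::store::BackupDecryptionKey;
///
/// // Generate a fresh random key and encode it as unpadded base64, e.g. to
/// // upload it (encrypted) to server-side secret storage.
/// let key = BackupDecryptionKey::new().expect("Couldn't generate a new random key");
/// let encoded = key.to_base64();
/// ```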
754#[derive(Clone, Zeroize, ZeroizeOnDrop, Deserialize, Serialize)]
755#[serde(transparent)]
756pub struct BackupDecryptionKey {
757    pub(crate) inner: Box<[u8; BackupDecryptionKey::KEY_SIZE]>,
758}
759
760impl BackupDecryptionKey {
761    /// The number of bytes the decryption key will hold.
762    pub const KEY_SIZE: usize = 32;
763
764    /// Create a new random decryption key.
765    pub fn new() -> Result<Self, rand::Error> {
766        let mut rng = rand::thread_rng();
767
768        let mut key = Box::new([0u8; Self::KEY_SIZE]);
769        rand::Fill::try_fill(key.as_mut_slice(), &mut rng)?;
770
771        Ok(Self { inner: key })
772    }
773
774    /// Export the [`BackupDecryptionKey`] as a base64 encoded string.
775    pub fn to_base64(&self) -> String {
776        base64_encode(self.inner.as_slice())
777    }
778}
779
780#[cfg(not(tarpaulin_include))]
781impl Debug for BackupDecryptionKey {
782    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
783        f.debug_tuple("BackupDecryptionKey").field(&"...").finish()
784    }
785}
786
787/// The pickle key used to safely store the dehydrated device pickle.
788///
789/// This input key material will be expanded using HKDF into an AES key, MAC
790/// key, and an initialization vector (IV).
791#[derive(Clone, Zeroize, ZeroizeOnDrop, Deserialize, Serialize)]
792#[serde(transparent)]
793pub struct DehydratedDeviceKey {
794    pub(crate) inner: Box<[u8; DehydratedDeviceKey::KEY_SIZE]>,
795}
796
797impl DehydratedDeviceKey {
798    /// The number of bytes the encryption key will hold.
799    pub const KEY_SIZE: usize = 32;
800
801    /// Generates a new random pickle key.
802    pub fn new() -> Result<Self, rand::Error> {
803        let mut rng = rand::thread_rng();
804
805        let mut key = Box::new([0u8; Self::KEY_SIZE]);
806        rand::Fill::try_fill(key.as_mut_slice(), &mut rng)?;
807
808        Ok(Self { inner: key })
809    }
810
811    /// Creates a new dehydration pickle key from the given slice.
812    ///
    /// Fails if the slice length is not 32.
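    ///
    /// A short sketch, assuming the type is exported as
    /// `matrix_sdk_crypto::store::DehydratedDeviceKey`:
    ///
    /// ```no_run
    /// use matrix_sdk_crypto::store::DehydratedDeviceKey;
    ///
    /// // 32 bytes of key material, e.g. retrieved from secret storage.
    /// let raw = [0u8; 32];
    /// let key = DehydratedDeviceKey::from_slice(&raw).expect("The slice has the right length");
    /// let _encoded = key.to_base64();
    /// ```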
814    pub fn from_slice(slice: &[u8]) -> Result<Self, DehydrationError> {
815        if slice.len() == 32 {
816            let mut key = Box::new([0u8; 32]);
817            key.copy_from_slice(slice);
818            Ok(DehydratedDeviceKey { inner: key })
819        } else {
820            Err(DehydrationError::PickleKeyLength(slice.len()))
821        }
822    }
823
824    /// Creates a dehydration pickle key from the given bytes.
825    pub fn from_bytes(raw_key: &[u8; 32]) -> Self {
826        let mut inner = Box::new([0u8; Self::KEY_SIZE]);
827        inner.copy_from_slice(raw_key);
828
829        Self { inner }
830    }
831
832    /// Export the [`DehydratedDeviceKey`] as a base64 encoded string.
833    pub fn to_base64(&self) -> String {
834        base64_encode(self.inner.as_slice())
835    }
836}
837
838impl From<&[u8; 32]> for DehydratedDeviceKey {
839    fn from(value: &[u8; 32]) -> Self {
840        DehydratedDeviceKey { inner: Box::new(*value) }
841    }
842}
843
844impl From<DehydratedDeviceKey> for Vec<u8> {
845    fn from(key: DehydratedDeviceKey) -> Self {
846        key.inner.to_vec()
847    }
848}
849
850#[cfg(not(tarpaulin_include))]
851impl Debug for DehydratedDeviceKey {
852    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
853        f.debug_tuple("DehydratedDeviceKey").field(&"...").finish()
854    }
855}
856
857impl DeviceChanges {
858    /// Merge the given `DeviceChanges` into this instance of `DeviceChanges`.
859    pub fn extend(&mut self, other: DeviceChanges) {
860        self.new.extend(other.new);
861        self.changed.extend(other.changed);
862        self.deleted.extend(other.deleted);
863    }
864
865    fn is_empty(&self) -> bool {
866        self.new.is_empty() && self.changed.is_empty() && self.deleted.is_empty()
867    }
868}
869
870/// Struct holding info about how many room keys the store has.
871#[derive(Debug, Clone, Default)]
872pub struct RoomKeyCounts {
873    /// The total number of room keys the store has.
874    pub total: usize,
875    /// The number of backed up room keys the store has.
876    pub backed_up: usize,
877}
878
879/// Stored versions of the backup keys.
880#[derive(Default, Clone, Debug)]
881pub struct BackupKeys {
882    /// The key used to decrypt backed up room keys.
883    pub decryption_key: Option<BackupDecryptionKey>,
884    /// The version that we are using for backups.
885    pub backup_version: Option<String>,
886}
887
888/// A struct containing private cross signing keys that can be backed up or
889/// uploaded to the secret store.
890#[derive(Default, Zeroize, ZeroizeOnDrop)]
891pub struct CrossSigningKeyExport {
892    /// The seed of the master key encoded as unpadded base64.
893    pub master_key: Option<String>,
894    /// The seed of the self signing key encoded as unpadded base64.
895    pub self_signing_key: Option<String>,
896    /// The seed of the user signing key encoded as unpadded base64.
897    pub user_signing_key: Option<String>,
898}
899
900#[cfg(not(tarpaulin_include))]
901impl Debug for CrossSigningKeyExport {
902    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
903        f.debug_struct("CrossSigningKeyExport")
904            .field("master_key", &self.master_key.is_some())
905            .field("self_signing_key", &self.self_signing_key.is_some())
906            .field("user_signing_key", &self.user_signing_key.is_some())
907            .finish_non_exhaustive()
908    }
909}
910
911/// Error describing what went wrong when importing private cross signing keys
912/// or the key backup key.
913#[derive(Debug, Error)]
914pub enum SecretImportError {
915    /// The key that we tried to import was invalid.
916    #[error(transparent)]
917    Key(#[from] vodozemac::KeyError),
    /// The public key of the imported private key doesn't match the public
    /// key that was uploaded to the server.
    #[error(
        "The public key of the imported private key doesn't match the \
            public key that was uploaded to the server"
    )]
924    MismatchedPublicKeys,
925    /// The new version of the identity couldn't be stored.
926    #[error(transparent)]
927    Store(#[from] CryptoStoreError),
928}
929
930/// Error describing what went wrong when exporting a [`SecretsBundle`].
931///
932/// The [`SecretsBundle`] can only be exported if we have all cross-signing
933/// private keys in the store.
934#[derive(Debug, Error)]
935pub enum SecretsBundleExportError {
936    /// The store itself had an error.
937    #[error(transparent)]
938    Store(#[from] CryptoStoreError),
    /// We're missing one or more cross-signing keys.
    #[error("The store is missing one or more cross-signing keys")]
941    MissingCrossSigningKey(KeyUsage),
942    /// We're missing all cross-signing keys.
943    #[error("The store doesn't contain any cross-signing keys")]
944    MissingCrossSigningKeys,
945    /// We have a backup key stored, but we don't know the version of the
946    /// backup.
947    #[error("The store contains a backup key, but no backup version")]
948    MissingBackupVersion,
949}
950
951/// Result type telling us if a `/keys/query` response was expected for a given
952/// user.
953#[derive(Clone, Copy, Debug, PartialEq, Eq)]
954pub(crate) enum UserKeyQueryResult {
955    WasPending,
956    WasNotPending,
957
958    /// A query was pending, but we gave up waiting
959    TimeoutExpired,
960}
961
962/// Room encryption settings which are modified by state events or user options
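///
/// An illustrative sketch of overriding a single setting, assuming the struct
/// is reachable as `matrix_sdk_crypto::store::RoomSettings`:
///
/// ```no_run
/// use matrix_sdk_crypto::store::RoomSettings;
///
/// // Keep the default algorithm but exclude untrusted devices.
/// let settings = RoomSettings {
///     only_allow_trusted_devices: true,
///     ..RoomSettings::default()
/// };
/// ```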
963#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
964pub struct RoomSettings {
965    /// The encryption algorithm that should be used in the room.
966    pub algorithm: EventEncryptionAlgorithm,
967
    /// Whether untrusted devices should receive the room key, or be excluded
    /// from the conversation.
970    pub only_allow_trusted_devices: bool,
971
972    /// The maximum time an encryption session should be used for, before it is
973    /// rotated.
974    pub session_rotation_period: Option<Duration>,
975
976    /// The maximum number of messages an encryption session should be used for,
977    /// before it is rotated.
978    pub session_rotation_period_messages: Option<usize>,
979}
980
981impl Default for RoomSettings {
982    fn default() -> Self {
983        Self {
984            algorithm: EventEncryptionAlgorithm::MegolmV1AesSha2,
985            only_allow_trusted_devices: false,
986            session_rotation_period: None,
987            session_rotation_period_messages: None,
988        }
989    }
990}
991
992/// Information on a room key that has been received or imported.
993#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
994pub struct RoomKeyInfo {
995    /// The [messaging algorithm] that this key is used for. Will be one of the
996    /// `m.megolm.*` algorithms.
997    ///
998    /// [messaging algorithm]: https://spec.matrix.org/v1.6/client-server-api/#messaging-algorithms
999    pub algorithm: EventEncryptionAlgorithm,
1000
1001    /// The room where the key is used.
1002    pub room_id: OwnedRoomId,
1003
1004    /// The Curve25519 key of the device which initiated the session originally.
1005    pub sender_key: Curve25519PublicKey,
1006
1007    /// The ID of the session that the key is for.
1008    pub session_id: String,
1009}
1010
1011impl From<&InboundGroupSession> for RoomKeyInfo {
1012    fn from(group_session: &InboundGroupSession) -> Self {
1013        RoomKeyInfo {
1014            algorithm: group_session.algorithm().clone(),
1015            room_id: group_session.room_id().to_owned(),
1016            sender_key: group_session.sender_key(),
1017            session_id: group_session.session_id().to_owned(),
1018        }
1019    }
1020}
1021
1022/// Information on a room key that has been withheld
1023#[derive(Clone, Debug, Deserialize, Serialize)]
1024pub struct RoomKeyWithheldInfo {
1025    /// The room where the key is used.
1026    pub room_id: OwnedRoomId,
1027
1028    /// The ID of the session that the key is for.
1029    pub session_id: String,
1030
1031    /// The `m.room_key.withheld` event that notified us that the key is being
1032    /// withheld.
1033    pub withheld_event: RoomKeyWithheldEvent,
1034}
1035
1036impl Store {
1037    /// Create a new Store.
1038    pub(crate) fn new(
1039        account: StaticAccountData,
1040        identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
1041        store: Arc<CryptoStoreWrapper>,
1042        verification_machine: VerificationMachine,
1043    ) -> Self {
1044        Self {
1045            inner: Arc::new(StoreInner {
1046                static_account: account,
1047                identity,
1048                store: store.clone(),
1049                verification_machine,
1050                cache: Arc::new(RwLock::new(StoreCache {
1051                    store,
1052                    tracked_users: Default::default(),
1053                    loaded_tracked_users: Default::default(),
1054                    account: Default::default(),
1055                })),
1056            }),
1057        }
1058    }
1059
1060    /// UserId associated with this store
1061    pub(crate) fn user_id(&self) -> &UserId {
1062        &self.inner.static_account.user_id
1063    }
1064
1065    /// DeviceId associated with this store
1066    pub(crate) fn device_id(&self) -> &DeviceId {
1067        self.inner.verification_machine.own_device_id()
1068    }
1069
1070    /// The static data for the account associated with this store.
1071    pub(crate) fn static_account(&self) -> &StaticAccountData {
1072        &self.inner.static_account
1073    }
1074
1075    pub(crate) async fn cache(&self) -> Result<StoreCacheGuard> {
1076        // TODO: (bnjbvr, #2624) If configured with a cross-process lock:
1077        // - try to take the lock,
1078        // - if acquired, look if another process touched the underlying storage,
1079        // - if yes, reload everything; if no, return current cache
1080        Ok(StoreCacheGuard { cache: self.inner.cache.clone().read_owned().await })
1081    }
1082
1083    pub(crate) async fn transaction(&self) -> StoreTransaction {
1084        StoreTransaction::new(self.clone()).await
1085    }
1086
1087    // Note: bnjbvr lost against borrowck here. Ideally, the `F` parameter would
1088    // take a `&StoreTransaction`, but callers didn't quite like that.
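    // Illustrative usage sketch (comments only, not compiled): the closure
    // receives the transaction by value and must hand it back together with
    // its result so that `commit` can be called here.
    //
    //     store
    //         .with_transaction(|mut tr| async move {
    //             let account = tr.account().await?;
    //             // ... update the account here ...
    //             Ok((tr, ()))
    //         })
    //         .await?;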
1089    pub(crate) async fn with_transaction<
1090        T,
1091        Fut: futures_core::Future<Output = Result<(StoreTransaction, T), crate::OlmError>>,
1092        F: FnOnce(StoreTransaction) -> Fut,
1093    >(
1094        &self,
1095        func: F,
1096    ) -> Result<T, crate::OlmError> {
1097        let tr = self.transaction().await;
1098        let (tr, res) = func(tr).await?;
1099        tr.commit().await?;
1100        Ok(res)
1101    }
1102
1103    #[cfg(test)]
    /// Test helper to reset the cross-signing identity.
1105    pub(crate) async fn reset_cross_signing_identity(&self) {
1106        self.inner.identity.lock().await.reset();
1107    }
1108
1109    /// PrivateCrossSigningIdentity associated with this store
1110    pub(crate) fn private_identity(&self) -> Arc<Mutex<PrivateCrossSigningIdentity>> {
1111        self.inner.identity.clone()
1112    }
1113
1114    /// Save the given Sessions to the store
1115    pub(crate) async fn save_sessions(&self, sessions: &[Session]) -> Result<()> {
1116        let changes = Changes { sessions: sessions.to_vec(), ..Default::default() };
1117
1118        self.save_changes(changes).await
1119    }
1120
1121    pub(crate) async fn get_sessions(
1122        &self,
1123        sender_key: &str,
1124    ) -> Result<Option<Arc<Mutex<Vec<Session>>>>> {
1125        self.inner.store.get_sessions(sender_key).await
1126    }
1127
1128    pub(crate) async fn save_changes(&self, changes: Changes) -> Result<()> {
1129        self.inner.store.save_changes(changes).await
1130    }
1131
1132    /// Compare the given `InboundGroupSession` with an existing session we have
1133    /// in the store.
1134    ///
1135    /// This method returns `SessionOrdering::Better` if the given session is
1136    /// better than the one we already have or if we don't have such a
1137    /// session in the store.
1138    pub(crate) async fn compare_group_session(
1139        &self,
1140        session: &InboundGroupSession,
1141    ) -> Result<SessionOrdering> {
1142        let old_session = self
1143            .inner
1144            .store
1145            .get_inbound_group_session(session.room_id(), session.session_id())
1146            .await?;
1147
1148        Ok(if let Some(old_session) = old_session {
1149            session.compare(&old_session).await
1150        } else {
1151            SessionOrdering::Better
1152        })
1153    }
1154
1155    #[cfg(test)]
1156    /// Testing helper to allow to save only a set of devices
1157    pub(crate) async fn save_device_data(&self, devices: &[DeviceData]) -> Result<()> {
1158        let changes = Changes {
1159            devices: DeviceChanges { changed: devices.to_vec(), ..Default::default() },
1160            ..Default::default()
1161        };
1162
1163        self.save_changes(changes).await
1164    }
1165
1166    /// Convenience helper to persist an array of [`InboundGroupSession`]s.
1167    pub(crate) async fn save_inbound_group_sessions(
1168        &self,
1169        sessions: &[InboundGroupSession],
1170    ) -> Result<()> {
1171        let changes = Changes { inbound_group_sessions: sessions.to_vec(), ..Default::default() };
1172
1173        self.save_changes(changes).await
1174    }
1175
1176    /// Get the display name of our own device.
1177    pub(crate) async fn device_display_name(&self) -> Result<Option<String>, CryptoStoreError> {
1178        Ok(self
1179            .inner
1180            .store
1181            .get_device(self.user_id(), self.device_id())
1182            .await?
1183            .and_then(|d| d.display_name().map(|d| d.to_owned())))
1184    }
1185
1186    /// Get the device data for the given [`UserId`] and [`DeviceId`].
1187    ///
1188    /// *Note*: This method will include our own device which is always present
1189    /// in the store.
1190    pub(crate) async fn get_device_data(
1191        &self,
1192        user_id: &UserId,
1193        device_id: &DeviceId,
1194    ) -> Result<Option<DeviceData>> {
1195        self.inner.store.get_device(user_id, device_id).await
1196    }
1197
    /// Get the device data for all the devices of the given [`UserId`].
    ///
    /// *Note*: This method will **not** include our own device.
    ///
    /// Use this method if you need a list of recipients for a given user,
    /// since we don't want to encrypt for our own device; otherwise take a
    /// look at the [`Store::get_device_data_for_user`] method.
1205    pub(crate) async fn get_device_data_for_user_filtered(
1206        &self,
1207        user_id: &UserId,
1208    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1209        self.inner.store.get_user_devices(user_id).await.map(|mut d| {
1210            if user_id == self.user_id() {
1211                d.remove(self.device_id());
1212            }
1213            d
1214        })
1215    }
1216
1217    /// Get the [`DeviceData`] for all the devices a user has.
1218    ///
1219    /// *Note*: This method will include our own device which is always present
1220    /// in the store.
1221    ///
1222    /// Use this method if you need to operate on or update all devices of a
1223    /// user, otherwise take a look at the
1224    /// [`Store::get_device_data_for_user_filtered`] method.
1225    pub(crate) async fn get_device_data_for_user(
1226        &self,
1227        user_id: &UserId,
1228    ) -> Result<HashMap<OwnedDeviceId, DeviceData>> {
1229        self.inner.store.get_user_devices(user_id).await
1230    }
1231
1232    /// Get a [`Device`] for the given user with the given
1233    /// [`Curve25519PublicKey`] key.
1234    ///
1235    /// *Note*: This method will include our own device which is always present
1236    /// in the store.
1237    pub(crate) async fn get_device_from_curve_key(
1238        &self,
1239        user_id: &UserId,
1240        curve_key: Curve25519PublicKey,
1241    ) -> Result<Option<Device>> {
1242        self.get_user_devices(user_id)
1243            .await
1244            .map(|d| d.devices().find(|d| d.curve25519_key() == Some(curve_key)))
1245    }
1246
1247    /// Get all devices associated with the given [`UserId`].
1248    ///
1249    /// This method is more expensive than the
1250    /// [`Store::get_device_data_for_user`] method, since a [`Device`]
1251    /// requires the [`OwnUserIdentityData`] and the [`UserIdentityData`] of the
1252    /// device owner to be fetched from the store as well.
1253    ///
1254    /// *Note*: This method will include our own device which is always present
1255    /// in the store.
1256    pub(crate) async fn get_user_devices(&self, user_id: &UserId) -> Result<UserDevices> {
1257        let devices = self.get_device_data_for_user(user_id).await?;
1258
1259        let own_identity = self
1260            .inner
1261            .store
1262            .get_user_identity(self.user_id())
1263            .await?
1264            .and_then(|i| i.own().cloned());
1265        let device_owner_identity = self.inner.store.get_user_identity(user_id).await?;
1266
1267        Ok(UserDevices {
1268            inner: devices,
1269            verification_machine: self.inner.verification_machine.clone(),
1270            own_identity,
1271            device_owner_identity,
1272        })
1273    }
1274
1275    /// Get a [`Device`] for the given user with the given [`DeviceId`].
1276    ///
1277    /// This method is more expensive than the [`Store::get_device_data`] method
1278    /// since a [`Device`] requires the [`OwnUserIdentityData`] and the
1279    /// [`UserIdentityData`] of the device owner to be fetched from the
1280    /// store as well.
1281    ///
1282    /// *Note*: This method will include our own device which is always present
1283    /// in the store.
1284    pub(crate) async fn get_device(
1285        &self,
1286        user_id: &UserId,
1287        device_id: &DeviceId,
1288    ) -> Result<Option<Device>> {
1289        if let Some(device_data) = self.inner.store.get_device(user_id, device_id).await? {
1290            Ok(Some(self.wrap_device_data(device_data).await?))
1291        } else {
1292            Ok(None)
1293        }
1294    }
1295
1296    /// Create a new device using the supplied [`DeviceData`]. Normally we would
1297    /// call [`Self::get_device`] to find an existing device inside this
1298    /// store. Only call this if you have some existing DeviceData and want
1299    /// to wrap it with the extra information provided by a [`Device`].
1300    pub(crate) async fn wrap_device_data(&self, device_data: DeviceData) -> Result<Device> {
1301        let own_identity = self
1302            .inner
1303            .store
1304            .get_user_identity(self.user_id())
1305            .await?
1306            .and_then(|i| i.own().cloned());
1307
1308        let device_owner_identity =
1309            self.inner.store.get_user_identity(device_data.user_id()).await?;
1310
1311        Ok(Device {
1312            inner: device_data,
1313            verification_machine: self.inner.verification_machine.clone(),
1314            own_identity,
1315            device_owner_identity,
1316        })
1317    }
1318
    /// Get the identity of `user_id`.
1320    pub(crate) async fn get_identity(&self, user_id: &UserId) -> Result<Option<UserIdentity>> {
1321        let own_identity = self
1322            .inner
1323            .store
1324            .get_user_identity(self.user_id())
1325            .await?
1326            .and_then(as_variant!(UserIdentityData::Own));
1327
1328        Ok(self.inner.store.get_user_identity(user_id).await?.map(|i| {
1329            UserIdentity::new(
1330                self.clone(),
1331                i,
1332                self.inner.verification_machine.to_owned(),
1333                own_identity,
1334            )
1335        }))
1336    }
1337
1338    /// Try to export the secret with the given secret name.
1339    ///
    /// The exported secret will be encoded as unpadded base64. Returns `None`
    /// if the secret can't be found.
1342    ///
1343    /// # Arguments
1344    ///
1345    /// * `secret_name` - The name of the secret that should be exported.
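    ///
    /// A hedged sketch of exporting the backup recovery key; it assumes access
    /// to this [`Store`] through a hypothetical `machine.store()` accessor:
    ///
    /// ```ignore
    /// use ruma::events::secret::request::SecretName;
    ///
    /// let recovery_key = machine.store().export_secret(&SecretName::RecoveryKey).await?;
    /// ```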
1346    pub async fn export_secret(
1347        &self,
1348        secret_name: &SecretName,
1349    ) -> Result<Option<String>, CryptoStoreError> {
1350        Ok(match secret_name {
1351            SecretName::CrossSigningMasterKey
1352            | SecretName::CrossSigningUserSigningKey
1353            | SecretName::CrossSigningSelfSigningKey => {
1354                self.inner.identity.lock().await.export_secret(secret_name).await
1355            }
1356            SecretName::RecoveryKey => {
1357                if let Some(key) = self.load_backup_keys().await?.decryption_key {
1358                    let exported = key.to_base64();
1359                    Some(exported)
1360                } else {
1361                    None
1362                }
1363            }
1364            name => {
1365                warn!(secret = ?name, "Unknown secret was requested");
1366                None
1367            }
1368        })
1369    }
1370
1371    /// Export all the private cross signing keys we have.
1372    ///
    /// The export will contain the seed for the Ed25519 keys as an unpadded
    /// base64 encoded string.
1375    ///
1376    /// This method returns `None` if we don't have any private cross signing
1377    /// keys.
1378    pub async fn export_cross_signing_keys(
1379        &self,
1380    ) -> Result<Option<CrossSigningKeyExport>, CryptoStoreError> {
1381        let master_key = self.export_secret(&SecretName::CrossSigningMasterKey).await?;
1382        let self_signing_key = self.export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
1383        let user_signing_key = self.export_secret(&SecretName::CrossSigningUserSigningKey).await?;
1384
1385        Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
1386            None
1387        } else {
1388            Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
1389        })
1390    }
1391
1392    /// Import our private cross signing keys.
1393    ///
1394    /// The export needs to contain the seed for the Ed25519 keys as an unpadded
1395    /// base64 encoded string.
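    ///
    /// Illustrative sketch, assuming a `store: Store` handle and seeds that
    /// were retrieved from secret storage:
    ///
    /// ```ignore
    /// let export = CrossSigningKeyExport {
    ///     master_key: Some(master_seed_base64),
    ///     self_signing_key: Some(self_signing_seed_base64),
    ///     user_signing_key: Some(user_signing_seed_base64),
    /// };
    /// let status = store.import_cross_signing_keys(export).await?;
    /// ```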
1396    pub async fn import_cross_signing_keys(
1397        &self,
1398        export: CrossSigningKeyExport,
1399    ) -> Result<CrossSigningStatus, SecretImportError> {
1400        if let Some(public_identity) =
1401            self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1402        {
1403            let identity = self.inner.identity.lock().await;
1404
1405            identity
1406                .import_secrets(
1407                    public_identity.to_owned(),
1408                    export.master_key.as_deref(),
1409                    export.self_signing_key.as_deref(),
1410                    export.user_signing_key.as_deref(),
1411                )
1412                .await?;
1413
1414            let status = identity.status().await;
1415
1416            let diff = identity.get_public_identity_diff(&public_identity.inner).await;
1417
1418            let mut changes =
1419                Changes { private_identity: Some(identity.clone()), ..Default::default() };
1420
1421            if diff.none_differ() {
1422                public_identity.mark_as_verified();
1423                changes.identities.changed.push(UserIdentityData::Own(public_identity.inner));
1424            }
1425
1426            info!(?status, "Successfully imported the private cross-signing keys");
1427
1428            self.save_changes(changes).await?;
1429        } else {
1430            warn!("No public identity found while importing cross-signing keys, a /keys/query needs to be done");
1431        }
1432
1433        Ok(self.inner.identity.lock().await.status().await)
1434    }
1435
1436    /// Export all the secrets we have in the store into a [`SecretsBundle`].
1437    ///
1438    /// This method will export all the private cross-signing keys and, if
1439    /// available, the private part of a backup key and its accompanying
1440    /// version.
1441    ///
1442    /// The method will fail if we don't have all three private cross-signing
1443    /// keys available.
1444    ///
1445    /// **Warning**: Only export this and share it with a trusted recipient,
1446    /// i.e. if an existing device is sharing this with a new device.
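    ///
    /// A rough sketch of the intended flow, assuming two [`Store`] handles
    /// (`existing_store` on the old device, `new_store` on the new one):
    ///
    /// ```ignore
    /// let bundle = existing_store.export_secrets_bundle().await?;
    /// // ... transport the bundle to the new device over a secure channel ...
    /// new_store.import_secrets_bundle(&bundle).await?;
    /// ```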
1447    pub async fn export_secrets_bundle(&self) -> Result<SecretsBundle, SecretsBundleExportError> {
1448        let Some(cross_signing) = self.export_cross_signing_keys().await? else {
1449            return Err(SecretsBundleExportError::MissingCrossSigningKeys);
1450        };
1451
1452        let Some(master_key) = cross_signing.master_key.clone() else {
1453            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::Master));
1454        };
1455
1456        let Some(user_signing_key) = cross_signing.user_signing_key.clone() else {
1457            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::UserSigning));
1458        };
1459
1460        let Some(self_signing_key) = cross_signing.self_signing_key.clone() else {
1461            return Err(SecretsBundleExportError::MissingCrossSigningKey(KeyUsage::SelfSigning));
1462        };
1463
1464        let backup_keys = self.load_backup_keys().await?;
1465
1466        let backup = if let Some(key) = backup_keys.decryption_key {
1467            if let Some(backup_version) = backup_keys.backup_version {
1468                Some(BackupSecrets::MegolmBackupV1Curve25519AesSha2(
1469                    MegolmBackupV1Curve25519AesSha2Secrets { key, backup_version },
1470                ))
1471            } else {
1472                return Err(SecretsBundleExportError::MissingBackupVersion);
1473            }
1474        } else {
1475            None
1476        };
1477
1478        Ok(SecretsBundle {
1479            cross_signing: CrossSigningSecrets { master_key, user_signing_key, self_signing_key },
1480            backup,
1481        })
1482    }
1483
1484    /// Import and persists secrets from a [`SecretsBundle`].
1485    ///
1486    /// This method will import all the private cross-signing keys and, if
1487    /// available, the private part of a backup key and its accompanying
1488    /// version into the store.
1489    ///
    /// **Warning**: Only import this from a trusted source, e.g. when an existing
    /// device is sharing this with a new device. The imported cross-signing
    /// keys will create an [`OwnUserIdentity`] and mark it as verified.
1493    ///
1494    /// The backup key will be persisted in the store and can be enabled using
1495    /// the [`BackupMachine`].
1496    pub async fn import_secrets_bundle(
1497        &self,
1498        bundle: &SecretsBundle,
1499    ) -> Result<(), SecretImportError> {
1500        let mut changes = Changes::default();
1501
1502        if let Some(backup_bundle) = &bundle.backup {
1503            match backup_bundle {
1504                BackupSecrets::MegolmBackupV1Curve25519AesSha2(bundle) => {
1505                    changes.backup_decryption_key = Some(bundle.key.clone());
1506                    changes.backup_version = Some(bundle.backup_version.clone());
1507                }
1508            }
1509        }
1510
1511        let identity = self.inner.identity.lock().await;
1512
1513        identity
1514            .import_secrets_unchecked(
1515                Some(&bundle.cross_signing.master_key),
1516                Some(&bundle.cross_signing.self_signing_key),
1517                Some(&bundle.cross_signing.user_signing_key),
1518            )
1519            .await?;
1520
1521        let public_identity = identity.to_public_identity().await.expect(
1522            "We should be able to create a new public identity since we just imported \
1523             all the private cross-signing keys",
1524        );
1525
1526        changes.private_identity = Some(identity.clone());
1527        changes.identities.new.push(UserIdentityData::Own(public_identity));
1528
1529        Ok(self.save_changes(changes).await?)
1530    }
1531
    /// Import the given gossipped `secret`, identified by its `secret_name`, into the store.
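    ///
    /// # Examples
    ///
    /// Gossipped secrets are usually picked up from [`Store::secrets_stream`]
    /// and then passed to this method; a minimal sketch:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let secret_stream = machine.store().secrets_stream();
    /// pin_mut!(secret_stream);
    ///
    /// while let Some(secret) = secret_stream.next().await {
    ///     machine.store().import_secret(&secret).await.unwrap();
    /// }
    /// # });
    /// ```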
1533    pub async fn import_secret(&self, secret: &GossippedSecret) -> Result<(), SecretImportError> {
1534        match &secret.secret_name {
1535            SecretName::CrossSigningMasterKey
1536            | SecretName::CrossSigningUserSigningKey
1537            | SecretName::CrossSigningSelfSigningKey => {
1538                if let Some(public_identity) =
1539                    self.get_identity(self.user_id()).await?.and_then(|i| i.own())
1540                {
1541                    let identity = self.inner.identity.lock().await;
1542
1543                    identity
1544                        .import_secret(
1545                            public_identity,
1546                            &secret.secret_name,
1547                            &secret.event.content.secret,
1548                        )
1549                        .await?;
1550                    info!(
1551                        secret_name = ?secret.secret_name,
1552                        "Successfully imported a private cross signing key"
1553                    );
1554
1555                    let changes =
1556                        Changes { private_identity: Some(identity.clone()), ..Default::default() };
1557
1558                    self.save_changes(changes).await?;
1559                }
1560            }
            SecretName::RecoveryKey => {
                // We don't import the decryption key here since we'll want to
                // check whether its public key matches the latest backup version
                // on the server. Instead, we put the secret into the secret
                // inbox, where it will stay until it either gets overwritten or
                // the user accepts it.
            }
1568            name => {
1569                warn!(secret = ?name, "Tried to import an unknown secret");
1570            }
1571        }
1572
1573        Ok(())
1574    }
1575
    /// Check the global flag that determines whether messages should only be
    /// encrypted for trusted devices or for everyone.
1578    pub async fn get_only_allow_trusted_devices(&self) -> Result<bool> {
1579        let value = self.get_value("only_allow_trusted_devices").await?.unwrap_or_default();
1580        Ok(value)
1581    }
1582
    /// Set the global flag that determines whether messages should be encrypted
    /// for untrusted devices, or whether those devices should be excluded from
    /// the conversation.
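    ///
    /// # Examples
    ///
    /// A short sketch of toggling the flag and reading it back:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// // From now on, only encrypt messages for trusted devices.
    /// machine.store().set_only_allow_trusted_devices(true).await.unwrap();
    ///
    /// assert!(machine.store().get_only_allow_trusted_devices().await.unwrap());
    /// # });
    /// ```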
1585    pub async fn set_only_allow_trusted_devices(
1586        &self,
1587        block_untrusted_devices: bool,
1588    ) -> Result<()> {
1589        self.set_value("only_allow_trusted_devices", &block_untrusted_devices).await
1590    }
1591
    /// Get a custom stored value associated with the given key.
1593    pub async fn get_value<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>> {
1594        let Some(value) = self.get_custom_value(key).await? else {
1595            return Ok(None);
1596        };
1597        let deserialized = self.deserialize_value(&value)?;
1598        Ok(Some(deserialized))
1599    }
1600
    /// Store a custom value associated with the given key.
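    ///
    /// # Examples
    ///
    /// A minimal sketch of a round trip; the key name and value here are
    /// arbitrary, application-defined data:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// machine
    ///     .store()
    ///     .set_value("myapp.last_cleanup", &"2024-01-01".to_owned())
    ///     .await
    ///     .unwrap();
    ///
    /// let value: Option<String> = machine.store().get_value("myapp.last_cleanup").await.unwrap();
    /// assert_eq!(value.as_deref(), Some("2024-01-01"));
    /// # });
    /// ```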
1602    pub async fn set_value(&self, key: &str, value: &impl Serialize) -> Result<()> {
1603        let serialized = self.serialize_value(value)?;
1604        self.set_custom_value(key, serialized).await?;
1605        Ok(())
1606    }
1607
1608    fn serialize_value(&self, value: &impl Serialize) -> Result<Vec<u8>> {
1609        let serialized =
1610            rmp_serde::to_vec_named(value).map_err(|x| CryptoStoreError::Backend(x.into()))?;
1611        Ok(serialized)
1612    }
1613
1614    fn deserialize_value<T: DeserializeOwned>(&self, value: &[u8]) -> Result<T> {
1615        let deserialized =
1616            rmp_serde::from_slice(value).map_err(|e| CryptoStoreError::Backend(e.into()))?;
1617        Ok(deserialized)
1618    }
1619
1620    /// Receive notifications of room keys being received as a [`Stream`].
1621    ///
1622    /// Each time a room key is updated in any way, an update will be sent to
1623    /// the stream. Updates that happen at the same time are batched into a
1624    /// [`Vec`].
1625    ///
    /// If the reader of the stream lags too far behind, an error will be sent
    /// to the reader.
1628    ///
1629    /// The stream will terminate once all references to the underlying
1630    /// `CryptoStoreWrapper` are dropped.
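    ///
    /// # Examples
    ///
    /// A minimal sketch that logs every batch of received room keys; the
    /// machine here is illustrative:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let room_keys_stream = machine.store().room_keys_received_stream();
    /// pin_mut!(room_keys_stream);
    ///
    /// while let Some(update) = room_keys_stream.next().await {
    ///     match update {
    ///         Ok(room_keys) => println!("Received {} room key updates", room_keys.len()),
    ///         Err(lag) => println!("The stream lagged behind: {:?}", lag),
    ///     }
    /// }
    /// # });
    /// ```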
1631    pub fn room_keys_received_stream(
1632        &self,
1633    ) -> impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>> {
1634        self.inner.store.room_keys_received_stream()
1635    }
1636
1637    /// Receive notifications of received `m.room_key.withheld` messages.
1638    ///
1639    /// Each time an `m.room_key.withheld` is received and stored, an update
1640    /// will be sent to the stream. Updates that happen at the same time are
1641    /// batched into a [`Vec`].
1642    ///
1643    /// If the reader of the stream lags too far behind, a warning will be
1644    /// logged and items will be dropped.
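    ///
    /// # Examples
    ///
    /// A minimal sketch that logs each batch of withheld notices:
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use futures_util::{pin_mut, StreamExt};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let withheld_stream = machine.store().room_keys_withheld_received_stream();
    /// pin_mut!(withheld_stream);
    ///
    /// while let Some(withheld_infos) = withheld_stream.next().await {
    ///     println!("Received {} withheld notices", withheld_infos.len());
    /// }
    /// # });
    /// ```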
1645    pub fn room_keys_withheld_received_stream(
1646        &self,
1647    ) -> impl Stream<Item = Vec<RoomKeyWithheldInfo>> {
1648        self.inner.store.room_keys_withheld_received_stream()
1649    }
1650
1651    /// Returns a stream of user identity updates, allowing users to listen for
1652    /// notifications about new or changed user identities.
1653    ///
    /// The stream produced by this method emits updates whenever a new user
    /// identity is discovered or when an existing identity's information is
    /// changed. Users can subscribe to this stream and receive updates in
    /// real time.
1658    ///
1659    /// Caution: the returned stream will never terminate, and it holds a
1660    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1661    /// resource leaks.
1662    ///
1663    /// # Examples
1664    ///
1665    /// ```no_run
1666    /// # use matrix_sdk_crypto::OlmMachine;
1667    /// # use ruma::{device_id, user_id};
1668    /// # use futures_util::{pin_mut, StreamExt};
1669    /// # let machine: OlmMachine = unimplemented!();
1670    /// # futures_executor::block_on(async {
1671    /// let identities_stream = machine.store().user_identities_stream();
1672    /// pin_mut!(identities_stream);
1673    ///
1674    /// for identity_updates in identities_stream.next().await {
1675    ///     for (_, identity) in identity_updates.new {
1676    ///         println!("A new identity has been added {}", identity.user_id());
1677    ///     }
1678    /// }
1679    /// # });
1680    /// ```
1681    pub fn user_identities_stream(&self) -> impl Stream<Item = IdentityUpdates> {
1682        let verification_machine = self.inner.verification_machine.to_owned();
1683
1684        let this = self.clone();
1685        self.inner.store.identities_stream().map(move |(own_identity, identities, _)| {
1686            let (new_identities, changed_identities, unchanged_identities) = identities.into_maps();
1687
1688            let map_identity = |(user_id, identity)| {
1689                (
1690                    user_id,
1691                    UserIdentity::new(
1692                        this.clone(),
1693                        identity,
1694                        verification_machine.to_owned(),
1695                        own_identity.to_owned(),
1696                    ),
1697                )
1698            };
1699
1700            let new = new_identities.into_iter().map(map_identity).collect();
1701            let changed = changed_identities.into_iter().map(map_identity).collect();
1702            let unchanged = unchanged_identities.into_iter().map(map_identity).collect();
1703
1704            IdentityUpdates { new, changed, unchanged }
1705        })
1706    }
1707
1708    /// Returns a stream of device updates, allowing users to listen for
1709    /// notifications about new or changed devices.
1710    ///
1711    /// The stream produced by this method emits updates whenever a new device
1712    /// is discovered or when an existing device's information is changed. Users
    /// can subscribe to this stream and receive updates in real time.
1714    ///
1715    /// Caution: the returned stream will never terminate, and it holds a
1716    /// reference to the [`CryptoStore`]. Listeners should be careful to avoid
1717    /// resource leaks.
1718    ///
1719    /// # Examples
1720    ///
1721    /// ```no_run
1722    /// # use matrix_sdk_crypto::OlmMachine;
1723    /// # use ruma::{device_id, user_id};
1724    /// # use futures_util::{pin_mut, StreamExt};
1725    /// # let machine: OlmMachine = unimplemented!();
1726    /// # futures_executor::block_on(async {
1727    /// let devices_stream = machine.store().devices_stream();
1728    /// pin_mut!(devices_stream);
1729    ///
1730    /// for device_updates in devices_stream.next().await {
1731    ///     if let Some(user_devices) = device_updates.new.get(machine.user_id()) {
1732    ///         for device in user_devices.values() {
1733    ///             println!("A new device has been added {}", device.device_id());
1734    ///         }
1735    ///     }
1736    /// }
1737    /// # });
1738    /// ```
1739    pub fn devices_stream(&self) -> impl Stream<Item = DeviceUpdates> {
1740        let verification_machine = self.inner.verification_machine.to_owned();
1741
1742        self.inner.store.identities_stream().map(move |(own_identity, identities, devices)| {
1743            collect_device_updates(
1744                verification_machine.to_owned(),
1745                own_identity,
1746                identities,
1747                devices,
1748            )
1749        })
1750    }
1751
    /// Returns a [`Stream`] of user identity and device updates.
    ///
    /// The stream returned by this method yields the same data as
    /// [`Store::user_identities_stream`] and [`Store::devices_stream`], but does
    /// not include references to the `VerificationMachine`. It is therefore a
    /// lower-level view of that data.
1758    ///
1759    /// The stream will terminate once all references to the underlying
1760    /// `CryptoStoreWrapper` are dropped.
1761    pub fn identities_stream_raw(&self) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> {
1762        self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
1763    }
1764
    /// Creates a `CrossProcessStoreLock` for this store that will contain the
    /// given key and value when held.
1767    pub fn create_store_lock(
1768        &self,
1769        lock_key: String,
1770        lock_value: String,
1771    ) -> CrossProcessStoreLock<LockableCryptoStore> {
1772        self.inner.store.create_store_lock(lock_key, lock_value)
1773    }
1774
1775    /// Receive notifications of gossipped secrets being received and stored in
1776    /// the secret inbox as a [`Stream`].
1777    ///
1778    /// The gossipped secrets are received using the `m.secret.send` event type
1779    /// and are guaranteed to have been received over a 1-to-1 Olm
1780    /// [`Session`] from a verified [`Device`].
1781    ///
1782    /// The [`GossippedSecret`] can also be later found in the secret inbox and
1783    /// retrieved using the [`CryptoStore::get_secrets_from_inbox()`] method.
1784    ///
    /// After a suitable secret of a certain type has been found, it can be
    /// removed from the store using the
    /// [`CryptoStore::delete_secrets_from_inbox()`] method.
1788    ///
    /// The only secret this will currently broadcast is the
    /// `m.megolm_backup.v1` secret.
1791    ///
1792    /// If the reader of the stream lags too far behind, a warning will be
1793    /// logged and items will be dropped.
1794    ///
1795    /// # Examples
1796    ///
1797    /// ```no_run
1798    /// # use matrix_sdk_crypto::OlmMachine;
1799    /// # use ruma::{device_id, user_id};
1800    /// # use futures_util::{pin_mut, StreamExt};
1801    /// # let alice = user_id!("@alice:example.org").to_owned();
1802    /// # futures_executor::block_on(async {
1803    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1804    ///
1805    /// let secret_stream = machine.store().secrets_stream();
1806    /// pin_mut!(secret_stream);
1807    ///
1808    /// for secret in secret_stream.next().await {
1809    ///     // Accept the secret if it's valid, then delete all the secrets of this type.
    ///     machine.store().delete_secrets_from_inbox(&secret.secret_name).await;
1811    /// }
1812    /// # });
1813    /// ```
1814    pub fn secrets_stream(&self) -> impl Stream<Item = GossippedSecret> {
1815        self.inner.store.secrets_stream()
1816    }
1817
1818    /// Import the given room keys into the store.
1819    ///
1820    /// # Arguments
1821    ///
1822    /// * `exported_keys` - The keys to be imported.
1823    /// * `from_backup_version` - If the keys came from key backup, the key
1824    ///   backup version. This will cause the keys to be marked as already
1825    ///   backed up, and therefore not requiring another backup.
1826    /// * `progress_listener` - Callback which will be called after each key is
1827    ///   processed. Called with arguments `(processed, total)` where
1828    ///   `processed` is the number of keys processed so far, and `total` is the
1829    ///   total number of keys (i.e., `exported_keys.len()`).
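    ///
    /// # Examples
    ///
    /// A sketch of importing keys that were downloaded from a key backup; the
    /// backup version `"1"` and the exported keys are placeholders.
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::{olm::ExportedRoomKey, OlmMachine};
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// # let exported_keys: Vec<ExportedRoomKey> = Vec::new();
    /// machine
    ///     .store()
    ///     .import_room_keys(exported_keys, Some("1"), |processed, total| {
    ///         println!("Processed {} out of {} room keys", processed, total);
    ///     })
    ///     .await
    ///     .unwrap();
    /// # });
    /// ```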
1830    pub async fn import_room_keys(
1831        &self,
1832        exported_keys: Vec<ExportedRoomKey>,
1833        from_backup_version: Option<&str>,
1834        progress_listener: impl Fn(usize, usize),
1835    ) -> Result<RoomKeyImportResult> {
1836        let exported_keys: Vec<&ExportedRoomKey> = exported_keys.iter().collect();
1837        self.import_sessions_impl(exported_keys, from_backup_version, progress_listener).await
1838    }
1839
1840    /// Import the given room keys into our store.
1841    ///
1842    /// # Arguments
1843    ///
    ///   imported into our store. If we already have a better version of a key,
    ///   the key will *not* be imported.
    ///
    /// Returns a [`RoomKeyImportResult`] containing the number of sessions that
    /// were imported and the total number of sessions that were found in the
    /// key export.
1850    /// key export.
1851    ///
1852    /// # Examples
1853    ///
1854    /// ```no_run
1855    /// # use std::io::Cursor;
1856    /// # use matrix_sdk_crypto::{OlmMachine, decrypt_room_key_export};
1857    /// # use ruma::{device_id, user_id};
1858    /// # let alice = user_id!("@alice:example.org");
1859    /// # async {
1860    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1861    /// # let export = Cursor::new("".to_owned());
1862    /// let exported_keys = decrypt_room_key_export(export, "1234").unwrap();
1863    /// machine.store().import_exported_room_keys(exported_keys, |_, _| {}).await.unwrap();
1864    /// # };
1865    /// ```
1866    pub async fn import_exported_room_keys(
1867        &self,
1868        exported_keys: Vec<ExportedRoomKey>,
1869        progress_listener: impl Fn(usize, usize),
1870    ) -> Result<RoomKeyImportResult> {
1871        self.import_room_keys(exported_keys, None, progress_listener).await
1872    }
1873
1874    async fn import_sessions_impl<T>(
1875        &self,
1876        room_keys: Vec<T>,
1877        from_backup_version: Option<&str>,
1878        progress_listener: impl Fn(usize, usize),
1879    ) -> Result<RoomKeyImportResult>
1880    where
1881        T: TryInto<InboundGroupSession> + RoomKeyExport + Copy,
1882        T::Error: Debug,
1883    {
1884        let mut sessions = Vec::new();
1885
1886        async fn new_session_better(
1887            session: &InboundGroupSession,
1888            old_session: Option<InboundGroupSession>,
1889        ) -> bool {
1890            if let Some(old_session) = &old_session {
1891                session.compare(old_session).await == SessionOrdering::Better
1892            } else {
1893                true
1894            }
1895        }
1896
1897        let total_count = room_keys.len();
1898        let mut keys = BTreeMap::new();
1899
1900        for (i, key) in room_keys.into_iter().enumerate() {
1901            match key.try_into() {
1902                Ok(session) => {
1903                    let old_session = self
1904                        .inner
1905                        .store
1906                        .get_inbound_group_session(session.room_id(), session.session_id())
1907                        .await?;
1908
1909                    // Only import the session if we didn't have this session or
1910                    // if it's a better version of the same session.
1911                    if new_session_better(&session, old_session).await {
1912                        if from_backup_version.is_some() {
1913                            session.mark_as_backed_up();
1914                        }
1915
1916                        keys.entry(session.room_id().to_owned())
1917                            .or_insert_with(BTreeMap::new)
1918                            .entry(session.sender_key().to_base64())
1919                            .or_insert_with(BTreeSet::new)
1920                            .insert(session.session_id().to_owned());
1921
1922                        sessions.push(session);
1923                    }
1924                }
1925                Err(e) => {
1926                    warn!(
1927                        sender_key = key.sender_key().to_base64(),
1928                        room_id = ?key.room_id(),
1929                        session_id = key.session_id(),
1930                        error = ?e,
1931                        "Couldn't import a room key from a file export."
1932                    );
1933                }
1934            }
1935
1936            progress_listener(i, total_count);
1937        }
1938
1939        let imported_count = sessions.len();
1940
1941        self.inner.store.save_inbound_group_sessions(sessions, from_backup_version).await?;
1942
1943        info!(total_count, imported_count, room_keys = ?keys, "Successfully imported room keys");
1944
1945        Ok(RoomKeyImportResult::new(imported_count, total_count, keys))
1946    }
1947
1948    pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
1949        self.inner.store.clone()
1950    }
1951
1952    /// Export the keys that match the given predicate.
1953    ///
1954    /// # Arguments
1955    ///
1956    /// * `predicate` - A closure that will be called for every known
1957    ///   `InboundGroupSession`, which represents a room key. If the closure
1958    ///   returns `true` the `InboundGroupSession` will be included in the
1959    ///   export, if the closure returns `false` it will not be included.
1960    ///
1961    /// # Examples
1962    ///
1963    /// ```no_run
1964    /// # use matrix_sdk_crypto::{OlmMachine, encrypt_room_key_export};
1965    /// # use ruma::{device_id, user_id, room_id};
1966    /// # let alice = user_id!("@alice:example.org");
1967    /// # async {
1968    /// # let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
1969    /// let room_id = room_id!("!test:localhost");
1970    /// let exported_keys = machine.store().export_room_keys(|s| s.room_id() == room_id).await.unwrap();
1971    /// let encrypted_export = encrypt_room_key_export(&exported_keys, "1234", 1);
1972    /// # };
1973    /// ```
1974    pub async fn export_room_keys(
1975        &self,
1976        predicate: impl FnMut(&InboundGroupSession) -> bool,
1977    ) -> Result<Vec<ExportedRoomKey>> {
1978        let mut exported = Vec::new();
1979
1980        let mut sessions = self.get_inbound_group_sessions().await?;
1981        sessions.retain(predicate);
1982
1983        for session in sessions {
1984            let export = session.export().await;
1985            exported.push(export);
1986        }
1987
1988        Ok(exported)
1989    }
1990
1991    /// Export room keys matching a predicate, providing them as an async
1992    /// `Stream`.
1993    ///
1994    /// # Arguments
1995    ///
1996    /// * `predicate` - A closure that will be called for every known
1997    ///   `InboundGroupSession`, which represents a room key. If the closure
1998    ///   returns `true` the `InboundGroupSession` will be included in the
1999    ///   export, if the closure returns `false` it will not be included.
2000    ///
2001    /// # Examples
2002    ///
2003    /// ```no_run
2004    /// use std::pin::pin;
2005    ///
2006    /// use matrix_sdk_crypto::{olm::ExportedRoomKey, OlmMachine};
2007    /// use ruma::{device_id, room_id, user_id};
2008    /// use tokio_stream::StreamExt;
2009    /// # async {
2010    /// let alice = user_id!("@alice:example.org");
2011    /// let machine = OlmMachine::new(&alice, device_id!("DEVICEID")).await;
2012    /// let room_id = room_id!("!test:localhost");
2013    /// let mut keys = pin!(machine
2014    ///     .store()
2015    ///     .export_room_keys_stream(|s| s.room_id() == room_id)
2016    ///     .await
2017    ///     .unwrap());
2018    /// while let Some(key) = keys.next().await {
2019    ///     println!("{}", key.room_id);
2020    /// }
2021    /// # };
2022    /// ```
2023    pub async fn export_room_keys_stream(
2024        &self,
2025        predicate: impl FnMut(&InboundGroupSession) -> bool,
2026    ) -> Result<impl Stream<Item = ExportedRoomKey>> {
2027        // TODO: if/when there is a get_inbound_group_sessions_stream, use that here.
2028        let sessions = self.get_inbound_group_sessions().await?;
2029        Ok(futures_util::stream::iter(sessions.into_iter().filter(predicate))
2030            .then(|session| async move { session.export().await }))
2031    }
2032
2033    /// Assemble a room key bundle for sharing encrypted history, as per
2034    /// [MSC4268].
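    ///
    /// # Examples
    ///
    /// A minimal sketch; the room identifier is illustrative, and the resulting
    /// bundle would then be encrypted and shared with the invited user.
    ///
    /// ```no_run
    /// # use matrix_sdk_crypto::OlmMachine;
    /// # use ruma::room_id;
    /// # let machine: OlmMachine = unimplemented!();
    /// # futures_executor::block_on(async {
    /// let room_id = room_id!("!room:localhost");
    /// let _bundle = machine.store().build_room_key_bundle(room_id).await.unwrap();
    /// // The bundle can now be encrypted and sent along with a room invite.
    /// # });
    /// ```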
2035    ///
2036    /// [MSC4268]: https://github.com/matrix-org/matrix-spec-proposals/pull/4268
2037    pub async fn build_room_key_bundle(
2038        &self,
2039        room_id: &RoomId,
2040    ) -> std::result::Result<RoomKeyBundle, CryptoStoreError> {
2041        // TODO: make this WAY more efficient. We should only fetch sessions for the
2042        //   correct room.
2043        let mut sessions = self.get_inbound_group_sessions().await?;
2044        sessions.retain(|session| session.room_id == room_id);
2045
2046        let mut bundle = RoomKeyBundle::default();
2047        for session in sessions {
2048            if session.shared_history() {
2049                bundle.room_keys.push(session.export().await.into());
2050            } else {
2051                bundle.withheld.push(RoomKeyWithheldContent::new(
2052                    session.algorithm().to_owned(),
2053                    WithheldCode::Unauthorised,
2054                    session.room_id().to_owned(),
2055                    session.session_id().to_owned(),
2056                    session.sender_key().to_owned(),
2057                    self.device_id().to_owned(),
2058                ));
2059            }
2060        }
2061
2062        Ok(bundle)
2063    }
2064}
2065
2066impl Deref for Store {
2067    type Target = DynCryptoStore;
2068
2069    fn deref(&self) -> &Self::Target {
2070        self.inner.store.deref().deref()
2071    }
2072}
2073
2074/// A crypto store that implements primitives for cross-process locking.
2075#[derive(Clone, Debug)]
2076pub struct LockableCryptoStore(Arc<dyn CryptoStore<Error = CryptoStoreError>>);
2077
2078#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
2079#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
2080impl matrix_sdk_common::store_locks::BackingStore for LockableCryptoStore {
2081    type LockError = CryptoStoreError;
2082
2083    async fn try_lock(
2084        &self,
2085        lease_duration_ms: u32,
2086        key: &str,
2087        holder: &str,
2088    ) -> std::result::Result<bool, Self::LockError> {
2089        self.0.try_take_leased_lock(lease_duration_ms, key, holder).await
2090    }
2091}
2092
2093#[cfg(test)]
2094mod tests {
2095    use std::pin::pin;
2096
2097    use futures_util::StreamExt;
2098    use insta::{_macro_support::Content, assert_json_snapshot, internals::ContentPath};
2099    use matrix_sdk_test::async_test;
2100    use ruma::{device_id, room_id, user_id, RoomId};
2101    use vodozemac::megolm::SessionKey;
2102
2103    use crate::{
2104        machine::test_helpers::get_machine_pair,
2105        olm::{InboundGroupSession, SenderData},
2106        store::DehydratedDeviceKey,
2107        types::EventEncryptionAlgorithm,
2108        OlmMachine,
2109    };
2110
2111    #[async_test]
2112    async fn test_import_room_keys_notifies_stream() {
2113        use futures_util::FutureExt;
2114
2115        let (alice, bob, _) =
2116            get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2117
2118        let room1_id = room_id!("!room1:localhost");
2119        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2120        let exported_sessions = alice.store().export_room_keys(|_| true).await.unwrap();
2121
2122        let mut room_keys_received_stream = Box::pin(bob.store().room_keys_received_stream());
2123        bob.store().import_room_keys(exported_sessions, None, |_, _| {}).await.unwrap();
2124
2125        let room_keys = room_keys_received_stream
2126            .next()
2127            .now_or_never()
2128            .flatten()
2129            .expect("We should have received an update of room key infos")
2130            .unwrap();
2131        assert_eq!(room_keys.len(), 1);
2132        assert_eq!(room_keys[0].room_id, "!room1:localhost");
2133    }
2134
2135    #[async_test]
2136    async fn test_export_room_keys_provides_selected_keys() {
2137        // Given an OlmMachine with room keys in it
2138        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2139        let room1_id = room_id!("!room1:localhost");
2140        let room2_id = room_id!("!room2:localhost");
2141        let room3_id = room_id!("!room3:localhost");
2142        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2143        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2144        alice.create_outbound_group_session_with_defaults_test_helper(room3_id).await.unwrap();
2145
2146        // When I export some of the keys
2147        let keys = alice
2148            .store()
2149            .export_room_keys(|s| s.room_id() == room2_id || s.room_id() == room3_id)
2150            .await
2151            .unwrap();
2152
2153        // Then the requested keys were provided
2154        assert_eq!(keys.len(), 2);
2155        assert_eq!(keys[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2156        assert_eq!(keys[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2157        assert_eq!(keys[0].room_id, "!room2:localhost");
2158        assert_eq!(keys[1].room_id, "!room3:localhost");
2159        assert_eq!(keys[0].session_key.to_base64().len(), 220);
2160        assert_eq!(keys[1].session_key.to_base64().len(), 220);
2161    }
2162
2163    #[async_test]
2164    async fn test_export_room_keys_stream_can_provide_all_keys() {
2165        // Given an OlmMachine with room keys in it
2166        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2167        let room1_id = room_id!("!room1:localhost");
2168        let room2_id = room_id!("!room2:localhost");
2169        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2170        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2171
2172        // When I export the keys as a stream
2173        let mut keys = pin!(alice.store().export_room_keys_stream(|_| true).await.unwrap());
2174
2175        // And collect them
2176        let mut collected = vec![];
2177        while let Some(key) = keys.next().await {
2178            collected.push(key);
2179        }
2180
2181        // Then all the keys were provided
2182        assert_eq!(collected.len(), 2);
2183        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2184        assert_eq!(collected[1].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2185        assert_eq!(collected[0].room_id, "!room1:localhost");
2186        assert_eq!(collected[1].room_id, "!room2:localhost");
2187        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2188        assert_eq!(collected[1].session_key.to_base64().len(), 220);
2189    }
2190
2191    #[async_test]
2192    async fn test_export_room_keys_stream_can_provide_a_subset_of_keys() {
2193        // Given an OlmMachine with room keys in it
2194        let (alice, _, _) = get_machine_pair(user_id!("@a:s.co"), user_id!("@b:s.co"), false).await;
2195        let room1_id = room_id!("!room1:localhost");
2196        let room2_id = room_id!("!room2:localhost");
2197        alice.create_outbound_group_session_with_defaults_test_helper(room1_id).await.unwrap();
2198        alice.create_outbound_group_session_with_defaults_test_helper(room2_id).await.unwrap();
2199
2200        // When I export the keys as a stream
2201        let mut keys =
2202            pin!(alice.store().export_room_keys_stream(|s| s.room_id() == room1_id).await.unwrap());
2203
2204        // And collect them
2205        let mut collected = vec![];
2206        while let Some(key) = keys.next().await {
2207            collected.push(key);
2208        }
2209
2210        // Then all the keys matching our predicate were provided, and no others
2211        assert_eq!(collected.len(), 1);
2212        assert_eq!(collected[0].algorithm, EventEncryptionAlgorithm::MegolmV1AesSha2);
2213        assert_eq!(collected[0].room_id, "!room1:localhost");
2214        assert_eq!(collected[0].session_key.to_base64().len(), 220);
2215    }
2216
2217    #[async_test]
2218    async fn test_export_secrets_bundle() {
2219        let user_id = user_id!("@alice:example.com");
2220        let (first, second, _) = get_machine_pair(user_id, user_id, false).await;
2221
2222        let _ = first
2223            .bootstrap_cross_signing(false)
2224            .await
2225            .expect("We should be able to bootstrap cross-signing");
2226
2227        let bundle = first.store().export_secrets_bundle().await.expect(
2228            "We should be able to export the secrets bundle, now that we \
2229             have the cross-signing keys",
2230        );
2231
2232        assert!(bundle.backup.is_none(), "The bundle should not contain a backup key");
2233
2234        second
2235            .store()
2236            .import_secrets_bundle(&bundle)
2237            .await
2238            .expect("We should be able to import the secrets bundle");
2239
2240        let status = second.cross_signing_status().await;
2241        let identity = second.get_identity(user_id, None).await.unwrap().unwrap().own().unwrap();
2242
2243        assert!(identity.is_verified(), "The public identity should be marked as verified.");
2244
2245        assert!(status.is_complete(), "We should have imported all the cross-signing keys");
2246    }
2247
2248    #[async_test]
2249    async fn test_create_dehydrated_device_key() {
2250        let pickle_key = DehydratedDeviceKey::new()
2251            .expect("Should be able to create a random dehydrated device key");
2252
2253        let to_vec = pickle_key.inner.to_vec();
2254        let pickle_key_from_slice = DehydratedDeviceKey::from_slice(to_vec.as_slice())
2255            .expect("Should be able to create a dehydrated device key from slice");
2256
2257        assert_eq!(pickle_key_from_slice.to_base64(), pickle_key.to_base64());
2258    }
2259
2260    #[async_test]
2261    async fn test_create_dehydrated_errors() {
2262        let too_small = [0u8; 22];
2263        let pickle_key = DehydratedDeviceKey::from_slice(&too_small);
2264
2265        assert!(pickle_key.is_err());
2266
2267        let too_big = [0u8; 40];
2268        let pickle_key = DehydratedDeviceKey::from_slice(&too_big);
2269
2270        assert!(pickle_key.is_err());
2271    }
2272
2273    #[async_test]
2274    async fn test_build_room_key_bundle() {
2275        // Given: Alice has sent a number of room keys to Bob, including some in the
2276        // wrong room, and some that are not marked as shared...
2277        let alice = OlmMachine::new(user_id!("@a:s.co"), device_id!("ALICE")).await;
2278        let bob = OlmMachine::new(user_id!("@b:s.co"), device_id!("BOB")).await;
2279
2280        let room1_id = room_id!("!room1:localhost");
2281        let room2_id = room_id!("!room2:localhost");
2282
2283        /* We use hardcoded megolm session data, to get a stable output snapshot. These were all created with:
2284
2285           println!("{}", vodozemac::megolm::GroupSession::new(Default::default()).session_key().to_base64());
2286        */
2287        let session_key1 = "AgAAAAC2XHVzsMBKs4QCRElJ92CJKyGtknCSC8HY7cQ7UYwndMKLQAejXLh5UA0l6s736mgctcUMNvELScUWrObdflrHo+vth/gWreXOaCnaSxmyjjKErQwyIYTkUfqbHy40RJfEesLwnN23on9XAkch/iy8R2+Jz7B8zfG01f2Ow2SxPQFnAndcO1ZSD2GmXgedy6n4B20MWI1jGP2wiexOWbFSya8DO/VxC9m5+/mF+WwYqdpKn9g4Y05Yw4uz7cdjTc3rXm7xK+8E7hI//5QD1nHPvuKYbjjM9u2JSL+Bzp61Cw";
2288        let session_key2 = "AgAAAAC1BXreFTUQQSBGekTEuYxhdytRKyv4JgDGcG+VOBYdPNGgs807SdibCGJky4lJ3I+7ZDGHoUzZPZP/4ogGu4kxni0PWdtWuN7+5zsuamgoFF/BkaGeUUGv6kgIkx8pyPpM5SASTUEP9bN2loDSpUPYwfiIqz74DgC4WQ4435sTBctYvKz8n+TDJwdLXpyT6zKljuqADAioud+s/iqx9LYn9HpbBfezZcvbg67GtE113pLrvde3IcPI5s6dNHK2onGO2B2eoaobcen18bbEDnlUGPeIivArLya7Da6us14jBQ";
2289        let session_key3 = "AgAAAAAM9KFsliaUUhGSXgwOzM5UemjkNH4n8NHgvC/y8hhw13zTF+ooGD4uIYEXYX630oNvQm/EvgZo+dkoc0re+vsqsx4sQeNODdSjcBsWOa0oDF+irQn9oYoLUDPI1IBtY1rX+FV99Zm/xnG7uFOX7aTVlko2GSdejy1w9mfobmfxu5aUc04A9zaKJP1pOthZvRAlhpymGYHgsDtWPrrjyc/yypMflE4kIUEEEtu1kT6mrAmcl615XYRAHYK9G2+fZsGvokwzbkl4nulGwcZMpQEoM0nD2o3GWgX81HW3nGfKBg";
2290        let session_key4 = "AgAAAAA4Kkesxq2h4v9PLD6Sm3Smxspz1PXTqytQPCMQMkkrHNmzV2bHlJ+6/Al9cu8vh1Oj69AK0WUAeJOJuaiskEeg/PI3P03+UYLeC379RzgqwSHdBgdQ41G2vD6zpgmE/8vYToe+qpCZACtPOswZxyqxHH+T/Iq0nv13JmlFGIeA6fEPfr5Y28B49viG74Fs9rxV9EH5PfjbuPM/p+Sz5obShuaBPKQBX1jT913nEXPoIJ06exNZGr0285nw/LgVvNlmWmbqNnbzO2cNZjQWA+xZYz5FSfyCxwqEBbEdUCuRCQ";
2291
2292        let sessions = [
2293            create_inbound_group_session_with_visibility(
2294                &alice,
2295                room1_id,
2296                &SessionKey::from_base64(session_key1).unwrap(),
2297                true,
2298            ),
2299            create_inbound_group_session_with_visibility(
2300                &alice,
2301                room1_id,
2302                &SessionKey::from_base64(session_key2).unwrap(),
2303                true,
2304            ),
2305            create_inbound_group_session_with_visibility(
2306                &alice,
2307                room1_id,
2308                &SessionKey::from_base64(session_key3).unwrap(),
2309                false,
2310            ),
2311            create_inbound_group_session_with_visibility(
2312                &alice,
2313                room2_id,
2314                &SessionKey::from_base64(session_key4).unwrap(),
2315                true,
2316            ),
2317        ];
2318        bob.store().save_inbound_group_sessions(&sessions).await.unwrap();
2319
2320        // When I build the bundle
2321        let mut bundle = bob.store().build_room_key_bundle(room1_id).await.unwrap();
2322
2323        // Then the bundle matches the snapshot.
2324
2325        // We sort the sessions in the bundle, so that the snapshot is stable.
2326        bundle.room_keys.sort_by_key(|session| session.session_id.clone());
2327
2328        // We also substitute alice's keys in the snapshot with placeholders
2329        let alice_curve_key = alice.identity_keys().curve25519.to_base64();
2330        let map_alice_curve_key = move |value: Content, _path: ContentPath<'_>| {
2331            assert_eq!(value.as_str().unwrap(), alice_curve_key);
2332            "[alice curve key]"
2333        };
2334        let alice_ed25519_key = alice.identity_keys().ed25519.to_base64();
2335        let map_alice_ed25519_key = move |value: Content, _path: ContentPath<'_>| {
2336            assert_eq!(value.as_str().unwrap(), alice_ed25519_key);
2337            "[alice ed25519 key]"
2338        };
2339
2340        insta::with_settings!({ sort_maps => true }, {
2341            assert_json_snapshot!(bundle, {
2342                ".room_keys[].sender_key" => insta::dynamic_redaction(map_alice_curve_key.clone()),
2343                ".withheld[].sender_key" => insta::dynamic_redaction(map_alice_curve_key),
2344                ".room_keys[].sender_claimed_keys.ed25519" => insta::dynamic_redaction(map_alice_ed25519_key),
2345            });
2346        });
2347    }
2348
2349    /// Create an inbound Megolm session for the given room.
2350    ///
2351    /// `olm_machine` is used to set the `sender_key` and `signing_key`
2352    /// fields of the resultant session.
2353    fn create_inbound_group_session_with_visibility(
2354        olm_machine: &OlmMachine,
2355        room_id: &RoomId,
2356        session_key: &SessionKey,
2357        shared_history: bool,
2358    ) -> InboundGroupSession {
2359        let identity_keys = &olm_machine.store().static_account().identity_keys;
2360        InboundGroupSession::new(
2361            identity_keys.curve25519,
2362            identity_keys.ed25519,
2363            room_id,
2364            session_key,
2365            SenderData::unknown(),
2366            EventEncryptionAlgorithm::MegolmV1AesSha2,
2367            None,
2368            shared_history,
2369        )
2370        .unwrap()
2371    }
2372}