1use std::{
16 collections::{BTreeMap, HashMap, HashSet},
17 sync::Arc,
18 time::Duration,
19};
20
21use itertools::Itertools;
22#[cfg(feature = "experimental-send-custom-to-device")]
23use matrix_sdk_common::deserialized_responses::WithheldCode;
24use matrix_sdk_common::{
25 deserialized_responses::{
26 AlgorithmInfo, DecryptedRoomEvent, DeviceLinkProblem, EncryptionInfo, UnableToDecryptInfo,
27 UnableToDecryptReason, UnsignedDecryptionResult, UnsignedEventLocation, VerificationLevel,
28 VerificationState,
29 },
30 locks::RwLock as StdRwLock,
31 BoxFuture,
32};
33use ruma::{
34 api::client::{
35 dehydrated_device::DehydratedDeviceData,
36 keys::{
37 claim_keys::v3::Request as KeysClaimRequest,
38 get_keys::v3::Response as KeysQueryResponse,
39 upload_keys::v3::{Request as UploadKeysRequest, Response as UploadKeysResponse},
40 upload_signatures::v3::Request as UploadSignaturesRequest,
41 },
42 sync::sync_events::DeviceLists,
43 },
44 assign,
45 events::{
46 secret::request::SecretName, AnyMessageLikeEvent, AnyMessageLikeEventContent,
47 AnyToDeviceEvent, MessageLikeEventContent,
48 },
49 serde::{JsonObject, Raw},
50 DeviceId, MilliSecondsSinceUnixEpoch, OneTimeKeyAlgorithm, OwnedDeviceId, OwnedDeviceKeyId,
51 OwnedTransactionId, OwnedUserId, RoomId, TransactionId, UInt, UserId,
52};
53use serde_json::{value::to_raw_value, Value};
54use tokio::sync::Mutex;
// `trace` is imported unconditionally: importing it a second time behind the
// `experimental-send-custom-to-device` feature would define the name twice
// (E0252) whenever that feature is enabled.
use tracing::{
    debug, error,
    field::{debug, display},
    info, instrument, trace, warn, Span,
};
62use vodozemac::{
63 megolm::{DecryptionError, SessionOrdering},
64 Curve25519PublicKey, Ed25519Signature,
65};
66
67use crate::{
68 backups::{BackupMachine, MegolmV1BackupKey},
69 dehydrated_devices::{DehydratedDevices, DehydrationError},
70 error::{EventError, MegolmError, MegolmResult, OlmError, OlmResult, SetRoomSettingsError},
71 gossiping::GossipMachine,
72 identities::{user::UserIdentity, Device, IdentityManager, UserDevices},
73 olm::{
74 Account, CrossSigningStatus, EncryptionSettings, IdentityKeys, InboundGroupSession,
75 KnownSenderData, OlmDecryptionInfo, PrivateCrossSigningIdentity, SenderData,
76 SenderDataFinder, SessionType, StaticAccountData,
77 },
78 session_manager::{GroupSessionManager, SessionManager},
79 store::{
80 Changes, CryptoStoreWrapper, DeviceChanges, IdentityChanges, IntoCryptoStore, MemoryStore,
81 PendingChanges, Result as StoreResult, RoomKeyInfo, RoomSettings, SecretImportError, Store,
82 StoreCache, StoreTransaction, StoredRoomKeyBundleData,
83 },
84 types::{
85 events::{
86 olm_v1::{AnyDecryptedOlmEvent, DecryptedRoomKeyBundleEvent, DecryptedRoomKeyEvent},
87 room::encrypted::{
88 EncryptedEvent, EncryptedToDeviceEvent, RoomEncryptedEventContent,
89 RoomEventEncryptionScheme, SupportedEventEncryptionSchemes,
90 },
91 room_key::{MegolmV1AesSha2Content, RoomKeyContent},
92 room_key_bundle::RoomKeyBundleContent,
93 room_key_withheld::{
94 MegolmV1AesSha2WithheldContent, RoomKeyWithheldContent, RoomKeyWithheldEvent,
95 },
96 ToDeviceEvents,
97 },
98 requests::{
99 AnyIncomingResponse, KeysQueryRequest, OutgoingRequest, ToDeviceRequest,
100 UploadSigningKeysRequest,
101 },
102 EventEncryptionAlgorithm, ProcessedToDeviceEvent, Signatures,
103 },
104 utilities::timestamp_to_iso8601,
105 verification::{Verification, VerificationMachine, VerificationRequest},
106 CollectStrategy, CrossSigningKeyExport, CryptoStoreError, DecryptionSettings, DeviceData,
107 LocalTrust, RoomEventDecryptionResult, SignatureError, TrustRequirement,
108};
109
/// A cheaply clonable handle to the end-to-end encryption state machine.
///
/// The only field is an [`Arc`] around [`OlmMachineInner`], so cloning an
/// `OlmMachine` yields another handle to the same shared state.
#[derive(Clone)]
pub struct OlmMachine {
    /// The shared state; every clone of the machine points at the same value.
    pub(crate) inner: Arc<OlmMachineInner>,
}
116
/// The shared state behind an [`OlmMachine`], assembled in
/// `OlmMachine::new_helper`.
pub struct OlmMachineInner {
    /// The ID of the user this machine belongs to (taken from the store).
    user_id: OwnedUserId,
    /// The ID of the device this machine represents.
    device_id: OwnedDeviceId,
    /// Our private cross-signing identity. It sits behind an async mutex
    /// because `bootstrap_cross_signing` replaces it in place.
    user_identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
    /// The store used to persist and look up all crypto state.
    store: Store,
    /// Manages Olm sessions: claims keys for missing sessions, processes
    /// `/keys/claim` responses and marks devices as wedged.
    session_manager: SessionManager,
    /// Manages group sessions: encrypts room events, shares and invalidates
    /// room keys.
    pub(crate) group_session_manager: GroupSessionManager,
    /// Receives verification events and queues outgoing verification
    /// requests.
    verification_machine: VerificationMachine,
    /// Handles incoming key/secret requests, forwarded room keys and secret
    /// sends, and queues the resulting to-device requests.
    pub(crate) key_request_machine: GossipMachine,
    /// Builds `/keys/query` requests and processes their responses.
    identity_manager: IdentityManager,
    /// Handles server-side key backups; constructed with the backup key
    /// loaded from the store, if there was one.
    backup_machine: BackupMachine,
}
147
148#[cfg(not(tarpaulin_include))]
149impl std::fmt::Debug for OlmMachine {
150 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
151 f.debug_struct("OlmMachine")
152 .field("user_id", &self.user_id())
153 .field("device_id", &self.device_id())
154 .finish()
155 }
156}
157
158impl OlmMachine {
    /// Key of a custom store value; judging by its name and value, presumably
    /// the store's generation counter.
    /// NOTE(review): not used in the visible part of this file — confirm
    /// against the rest of the `impl`.
    const CURRENT_GENERATION_STORE_KEY: &'static str = "generation-counter";
    /// Key of the custom store value that
    /// `migration_post_verified_latch_support` writes once the migration has
    /// run, so that the migration is not repeated on later start-ups.
    const HAS_MIGRATED_VERIFICATION_LATCH: &'static str = "HAS_MIGRATED_VERIFICATION_LATCH";
161
162 pub async fn new(user_id: &UserId, device_id: &DeviceId) -> Self {
173 OlmMachine::with_store(user_id, device_id, MemoryStore::new(), None)
174 .await
175 .expect("Reading and writing to the memory store always succeeds")
176 }
177
178 pub(crate) async fn rehydrate(
179 &self,
180 pickle_key: &[u8; 32],
181 device_id: &DeviceId,
182 device_data: Raw<DehydratedDeviceData>,
183 ) -> Result<OlmMachine, DehydrationError> {
184 let account = Account::rehydrate(pickle_key, self.user_id(), device_id, device_data)?;
185 let static_account = account.static_data().clone();
186
187 let store =
188 Arc::new(CryptoStoreWrapper::new(self.user_id(), device_id, MemoryStore::new()));
189 let device = DeviceData::from_account(&account);
190 store.save_pending_changes(PendingChanges { account: Some(account) }).await?;
191 store
192 .save_changes(Changes {
193 devices: DeviceChanges { new: vec![device], ..Default::default() },
194 ..Default::default()
195 })
196 .await?;
197
198 let (verification_machine, store, identity_manager) =
199 Self::new_helper_prelude(store, static_account, self.store().private_identity());
200
201 Ok(Self::new_helper(
202 device_id,
203 store,
204 verification_machine,
205 identity_manager,
206 self.store().private_identity(),
207 None,
208 ))
209 }
210
211 fn new_helper_prelude(
212 store_wrapper: Arc<CryptoStoreWrapper>,
213 account: StaticAccountData,
214 user_identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
215 ) -> (VerificationMachine, Store, IdentityManager) {
216 let verification_machine =
217 VerificationMachine::new(account.clone(), user_identity.clone(), store_wrapper.clone());
218 let store = Store::new(account, user_identity, store_wrapper, verification_machine.clone());
219
220 let identity_manager = IdentityManager::new(store.clone());
221
222 (verification_machine, store, identity_manager)
223 }
224
225 fn new_helper(
226 device_id: &DeviceId,
227 store: Store,
228 verification_machine: VerificationMachine,
229 identity_manager: IdentityManager,
230 user_identity: Arc<Mutex<PrivateCrossSigningIdentity>>,
231 maybe_backup_key: Option<MegolmV1BackupKey>,
232 ) -> Self {
233 let group_session_manager = GroupSessionManager::new(store.clone());
234
235 let users_for_key_claim = Arc::new(StdRwLock::new(BTreeMap::new()));
236 let key_request_machine = GossipMachine::new(
237 store.clone(),
238 identity_manager.clone(),
239 group_session_manager.session_cache(),
240 users_for_key_claim.clone(),
241 );
242
243 let session_manager =
244 SessionManager::new(users_for_key_claim, key_request_machine.clone(), store.clone());
245
246 let backup_machine = BackupMachine::new(store.clone(), maybe_backup_key);
247
248 let inner = Arc::new(OlmMachineInner {
249 user_id: store.user_id().to_owned(),
250 device_id: device_id.to_owned(),
251 user_identity,
252 store,
253 session_manager,
254 group_session_manager,
255 verification_machine,
256 key_request_machine,
257 identity_manager,
258 backup_machine,
259 });
260
261 Self { inner }
262 }
263
264 #[instrument(skip(store, custom_account), fields(ed25519_key, curve25519_key))]
292 pub async fn with_store(
293 user_id: &UserId,
294 device_id: &DeviceId,
295 store: impl IntoCryptoStore,
296 custom_account: Option<vodozemac::olm::Account>,
297 ) -> StoreResult<Self> {
298 let store = store.into_crypto_store();
299
300 let static_account = match store.load_account().await? {
301 Some(account) => {
302 if user_id != account.user_id()
303 || device_id != account.device_id()
304 || custom_account.is_some()
305 {
306 return Err(CryptoStoreError::MismatchedAccount {
307 expected: (account.user_id().to_owned(), account.device_id().to_owned()),
308 got: (user_id.to_owned(), device_id.to_owned()),
309 });
310 }
311
312 Span::current()
313 .record("ed25519_key", display(account.identity_keys().ed25519))
314 .record("curve25519_key", display(account.identity_keys().curve25519));
315 debug!("Restored an Olm account");
316
317 account.static_data().clone()
318 }
319
320 None => {
321 let account = if let Some(account) = custom_account {
322 Account::new_helper(account, user_id, device_id)
323 } else {
324 Account::with_device_id(user_id, device_id)
325 };
326
327 let static_account = account.static_data().clone();
328
329 Span::current()
330 .record("ed25519_key", display(account.identity_keys().ed25519))
331 .record("curve25519_key", display(account.identity_keys().curve25519));
332
333 let device = DeviceData::from_account(&account);
334
335 device.set_trust_state(LocalTrust::Verified);
339
340 let changes = Changes {
341 devices: DeviceChanges { new: vec![device], ..Default::default() },
342 ..Default::default()
343 };
344 store.save_changes(changes).await?;
345 store.save_pending_changes(PendingChanges { account: Some(account) }).await?;
346
347 debug!("Created a new Olm account");
348
349 static_account
350 }
351 };
352
353 let identity = match store.load_identity().await? {
354 Some(i) => {
355 let master_key = i
356 .master_public_key()
357 .await
358 .and_then(|m| m.get_first_key().map(|m| m.to_owned()));
359 debug!(?master_key, "Restored the cross signing identity");
360 i
361 }
362 None => {
363 debug!("Creating an empty cross signing identity stub");
364 PrivateCrossSigningIdentity::empty(user_id)
365 }
366 };
367
368 let saved_keys = store.load_backup_keys().await?;
373 let maybe_backup_key = saved_keys.decryption_key.and_then(|k| {
374 if let Some(version) = saved_keys.backup_version {
375 let megolm_v1_backup_key = k.megolm_v1_public_key();
376 megolm_v1_backup_key.set_version(version);
377 Some(megolm_v1_backup_key)
378 } else {
379 None
380 }
381 });
382
383 let identity = Arc::new(Mutex::new(identity));
384 let store = Arc::new(CryptoStoreWrapper::new(user_id, device_id, store));
385
386 let (verification_machine, store, identity_manager) =
387 Self::new_helper_prelude(store, static_account, identity.clone());
388
389 Self::migration_post_verified_latch_support(&store, &identity_manager).await?;
392
393 Ok(Self::new_helper(
394 device_id,
395 store,
396 verification_machine,
397 identity_manager,
398 identity,
399 maybe_backup_key,
400 ))
401 }
402
403 pub(crate) async fn migration_post_verified_latch_support(
411 store: &Store,
412 identity_manager: &IdentityManager,
413 ) -> Result<(), CryptoStoreError> {
414 let maybe_migrate_for_identity_verified_latch =
415 store.get_custom_value(Self::HAS_MIGRATED_VERIFICATION_LATCH).await?.is_none();
416
417 if maybe_migrate_for_identity_verified_latch {
418 identity_manager.mark_all_tracked_users_as_dirty(store.cache().await?).await?;
419
420 store.set_custom_value(Self::HAS_MIGRATED_VERIFICATION_LATCH, vec![0]).await?
421 }
422 Ok(())
423 }
424
425 pub fn store(&self) -> &Store {
427 &self.inner.store
428 }
429
430 pub fn user_id(&self) -> &UserId {
432 &self.inner.user_id
433 }
434
435 pub fn device_id(&self) -> &DeviceId {
437 &self.inner.device_id
438 }
439
440 pub fn device_creation_time(&self) -> MilliSecondsSinceUnixEpoch {
447 self.inner.store.static_account().creation_local_time()
448 }
449
450 pub fn identity_keys(&self) -> IdentityKeys {
452 let account = self.inner.store.static_account();
453 account.identity_keys()
454 }
455
456 pub async fn display_name(&self) -> StoreResult<Option<String>> {
458 self.store().device_display_name().await
459 }
460
461 pub async fn tracked_users(&self) -> StoreResult<HashSet<OwnedUserId>> {
466 let cache = self.store().cache().await?;
467 Ok(self.inner.identity_manager.key_query_manager.synced(&cache).await?.tracked_users())
468 }
469
470 #[cfg(feature = "automatic-room-key-forwarding")]
479 pub fn set_room_key_requests_enabled(&self, enable: bool) {
480 self.inner.key_request_machine.set_room_key_requests_enabled(enable)
481 }
482
483 pub fn are_room_key_requests_enabled(&self) -> bool {
488 self.inner.key_request_machine.are_room_key_requests_enabled()
489 }
490
491 #[cfg(feature = "automatic-room-key-forwarding")]
500 pub fn set_room_key_forwarding_enabled(&self, enable: bool) {
501 self.inner.key_request_machine.set_room_key_forwarding_enabled(enable)
502 }
503
504 pub fn is_room_key_forwarding_enabled(&self) -> bool {
508 self.inner.key_request_machine.is_room_key_forwarding_enabled()
509 }
510
511 pub async fn outgoing_requests(&self) -> StoreResult<Vec<OutgoingRequest>> {
519 let mut requests = Vec::new();
520
521 {
522 let store_cache = self.inner.store.cache().await?;
523 let account = store_cache.account().await?;
524 if let Some(r) = self.keys_for_upload(&account).await.map(|r| OutgoingRequest {
525 request_id: TransactionId::new(),
526 request: Arc::new(r.into()),
527 }) {
528 requests.push(r);
529 }
530 }
531
532 for request in self
533 .inner
534 .identity_manager
535 .users_for_key_query()
536 .await?
537 .into_iter()
538 .map(|(request_id, r)| OutgoingRequest { request_id, request: Arc::new(r.into()) })
539 {
540 requests.push(request);
541 }
542
543 requests.append(&mut self.inner.verification_machine.outgoing_messages());
544 requests.append(&mut self.inner.key_request_machine.outgoing_to_device_requests().await?);
545
546 Ok(requests)
547 }
548
549 pub fn query_keys_for_users<'a>(
570 &self,
571 users: impl IntoIterator<Item = &'a UserId>,
572 ) -> (OwnedTransactionId, KeysQueryRequest) {
573 self.inner.identity_manager.build_key_query_for_users(users)
574 }
575
576 pub async fn mark_request_as_sent<'a>(
586 &self,
587 request_id: &TransactionId,
588 response: impl Into<AnyIncomingResponse<'a>>,
589 ) -> OlmResult<()> {
590 match response.into() {
591 AnyIncomingResponse::KeysUpload(response) => {
592 Box::pin(self.receive_keys_upload_response(response)).await?;
593 }
594 AnyIncomingResponse::KeysQuery(response) => {
595 Box::pin(self.receive_keys_query_response(request_id, response)).await?;
596 }
597 AnyIncomingResponse::KeysClaim(response) => {
598 Box::pin(
599 self.inner.session_manager.receive_keys_claim_response(request_id, response),
600 )
601 .await?;
602 }
603 AnyIncomingResponse::ToDevice(_) => {
604 Box::pin(self.mark_to_device_request_as_sent(request_id)).await?;
605 }
606 AnyIncomingResponse::SigningKeysUpload(_) => {
607 Box::pin(self.receive_cross_signing_upload_response()).await?;
608 }
609 AnyIncomingResponse::SignatureUpload(_) => {
610 self.inner.verification_machine.mark_request_as_sent(request_id);
611 }
612 AnyIncomingResponse::RoomMessage(_) => {
613 self.inner.verification_machine.mark_request_as_sent(request_id);
614 }
615 AnyIncomingResponse::KeysBackup(_) => {
616 Box::pin(self.inner.backup_machine.mark_request_as_sent(request_id)).await?;
617 }
618 };
619
620 Ok(())
621 }
622
623 async fn receive_cross_signing_upload_response(&self) -> StoreResult<()> {
625 let identity = self.inner.user_identity.lock().await;
626 identity.mark_as_shared();
627
628 let changes = Changes { private_identity: Some(identity.clone()), ..Default::default() };
629
630 self.store().save_changes(changes).await
631 }
632
633 pub async fn bootstrap_cross_signing(
652 &self,
653 reset: bool,
654 ) -> StoreResult<CrossSigningBootstrapRequests> {
655 let identity = self.inner.user_identity.lock().await.clone();
660
661 let (upload_signing_keys_req, upload_signatures_req) = if reset || identity.is_empty().await
662 {
663 info!("Creating new cross signing identity");
664
665 let (identity, upload_signing_keys_req, upload_signatures_req) = {
666 let cache = self.inner.store.cache().await?;
667 let account = cache.account().await?;
668 account.bootstrap_cross_signing().await
669 };
670
671 let public = identity.to_public_identity().await.expect(
672 "Couldn't create a public version of the identity from a new private identity",
673 );
674
675 *self.inner.user_identity.lock().await = identity.clone();
676
677 self.store()
678 .save_changes(Changes {
679 identities: IdentityChanges { new: vec![public.into()], ..Default::default() },
680 private_identity: Some(identity),
681 ..Default::default()
682 })
683 .await?;
684
685 (upload_signing_keys_req, upload_signatures_req)
686 } else {
687 info!("Trying to upload the existing cross signing identity");
688 let upload_signing_keys_req = identity.as_upload_request().await;
689
690 let upload_signatures_req = identity
692 .sign_account(self.inner.store.static_account())
693 .await
694 .expect("Can't sign device keys");
695
696 (upload_signing_keys_req, upload_signatures_req)
697 };
698
699 let upload_keys_req =
703 self.upload_device_keys().await?.map(|(_, request)| OutgoingRequest::from(request));
704
705 Ok(CrossSigningBootstrapRequests {
706 upload_signing_keys_req,
707 upload_keys_req,
708 upload_signatures_req,
709 })
710 }
711
712 pub async fn upload_device_keys(
724 &self,
725 ) -> StoreResult<Option<(OwnedTransactionId, UploadKeysRequest)>> {
726 let cache = self.store().cache().await?;
727 let account = cache.account().await?;
728
729 Ok(self.keys_for_upload(&account).await.map(|request| (TransactionId::new(), request)))
730 }
731
732 async fn receive_keys_upload_response(&self, response: &UploadKeysResponse) -> OlmResult<()> {
739 self.inner
740 .store
741 .with_transaction(|mut tr| async {
742 let account = tr.account().await?;
743 account.receive_keys_upload_response(response)?;
744 Ok((tr, ()))
745 })
746 .await
747 }
748
749 #[instrument(skip_all)]
777 pub async fn get_missing_sessions(
778 &self,
779 users: impl Iterator<Item = &UserId>,
780 ) -> StoreResult<Option<(OwnedTransactionId, KeysClaimRequest)>> {
781 self.inner.session_manager.get_missing_sessions(users).await
782 }
783
784 async fn receive_keys_query_response(
793 &self,
794 request_id: &TransactionId,
795 response: &KeysQueryResponse,
796 ) -> OlmResult<(DeviceChanges, IdentityChanges)> {
797 self.inner.identity_manager.receive_keys_query_response(request_id, response).await
798 }
799
800 async fn keys_for_upload(&self, account: &Account) -> Option<UploadKeysRequest> {
809 let (mut device_keys, one_time_keys, fallback_keys) = account.keys_for_upload();
810
811 if let Some(device_keys) = &mut device_keys {
821 let private_identity = self.store().private_identity();
822 let guard = private_identity.lock().await;
823
824 if guard.status().await.is_complete() {
825 guard.sign_device_keys(device_keys).await.expect(
826 "We should be able to sign our device keys since we confirmed that we \
827 have a complete set of private cross-signing keys",
828 );
829 }
830 }
831
832 if device_keys.is_none() && one_time_keys.is_empty() && fallback_keys.is_empty() {
833 None
834 } else {
835 let device_keys = device_keys.map(|d| d.to_raw());
836
837 Some(assign!(UploadKeysRequest::new(), {
838 device_keys, one_time_keys, fallback_keys
839 }))
840 }
841 }
842
843 async fn decrypt_to_device_event(
852 &self,
853 transaction: &mut StoreTransaction,
854 event: &EncryptedToDeviceEvent,
855 changes: &mut Changes,
856 ) -> OlmResult<OlmDecryptionInfo> {
857 let mut decrypted =
858 transaction.account().await?.decrypt_to_device_event(&self.inner.store, event).await?;
859
860 if !self.to_device_event_is_from_dehydrated_device(&decrypted, &event.sender).await? {
863 self.handle_decrypted_to_device_event(transaction.cache(), &mut decrypted, changes)
866 .await?;
867 }
868
869 Ok(decrypted)
870 }
871
872 #[instrument(
873 skip_all,
874 fields(room_id = ? content.room_id, session_id)
878 )]
879 async fn handle_key(
880 &self,
881 sender_key: Curve25519PublicKey,
882 event: &DecryptedRoomKeyEvent,
883 content: &MegolmV1AesSha2Content,
884 ) -> OlmResult<Option<InboundGroupSession>> {
885 let session =
886 InboundGroupSession::from_room_key_content(sender_key, event.keys.ed25519, content);
887
888 match session {
889 Ok(mut session) => {
890 Span::current().record("session_id", session.session_id());
891
892 let sender_data =
893 SenderDataFinder::find_using_event(self.store(), sender_key, event, &session)
894 .await?;
895
896 session.sender_data = sender_data;
897
898 match self.store().compare_group_session(&session).await? {
899 SessionOrdering::Better => {
900 info!("Received a new megolm room key");
901
902 Ok(Some(session))
903 }
904 comparison_result => {
905 warn!(
906 ?comparison_result,
907 "Received a megolm room key that we already have a better version \
908 of, discarding"
909 );
910
911 Ok(None)
912 }
913 }
914 }
915 Err(e) => {
916 Span::current().record("session_id", &content.session_id);
917 warn!("Received a room key event which contained an invalid session key: {e}");
918
919 Ok(None)
920 }
921 }
922 }
923
924 #[instrument(skip_all, fields(algorithm = ?event.content.algorithm()))]
926 async fn add_room_key(
927 &self,
928 sender_key: Curve25519PublicKey,
929 event: &DecryptedRoomKeyEvent,
930 ) -> OlmResult<Option<InboundGroupSession>> {
931 match &event.content {
932 RoomKeyContent::MegolmV1AesSha2(content) => {
933 self.handle_key(sender_key, event, content).await
934 }
935 #[cfg(feature = "experimental-algorithms")]
936 RoomKeyContent::MegolmV2AesSha2(content) => {
937 self.handle_key(sender_key, event, content).await
938 }
939 RoomKeyContent::Unknown(_) => {
940 warn!("Received a room key with an unsupported algorithm");
941 Ok(None)
942 }
943 }
944 }
945
946 #[instrument()]
949 async fn receive_room_key_bundle_data(
950 &self,
951 event: &DecryptedRoomKeyBundleEvent,
952 changes: &mut Changes,
953 ) -> OlmResult<()> {
954 let Some(sender_device_keys) = &event.sender_device_keys else {
955 warn!("Received a room key bundle with no sender device keys: ignoring");
956 return Ok(());
957 };
958
959 let sender_device_data =
964 DeviceData::try_from(sender_device_keys).expect("failed to verify sender device keys");
965 let sender_device = self.store().wrap_device_data(sender_device_data).await?;
966
967 changes.received_room_key_bundles.push(StoredRoomKeyBundleData {
968 sender_user: event.sender.clone(),
969 sender_data: SenderData::from_device(&sender_device),
970 bundle_data: event.content.clone(),
971 });
972 Ok(())
973 }
974
975 fn add_withheld_info(&self, changes: &mut Changes, event: &RoomKeyWithheldEvent) {
976 debug!(?event.content, "Processing `m.room_key.withheld` event");
977
978 if let RoomKeyWithheldContent::MegolmV1AesSha2(
979 MegolmV1AesSha2WithheldContent::BlackListed(c)
980 | MegolmV1AesSha2WithheldContent::Unverified(c),
981 ) = &event.content
982 {
983 changes
984 .withheld_session_info
985 .entry(c.room_id.to_owned())
986 .or_default()
987 .insert(c.session_id.to_owned(), event.to_owned());
988 }
989 }
990
991 #[cfg(test)]
992 pub(crate) async fn create_outbound_group_session_with_defaults_test_helper(
993 &self,
994 room_id: &RoomId,
995 ) -> OlmResult<()> {
996 let (_, session) = self
997 .inner
998 .group_session_manager
999 .create_outbound_group_session(
1000 room_id,
1001 EncryptionSettings::default(),
1002 SenderData::unknown(),
1003 )
1004 .await?;
1005
1006 self.store().save_inbound_group_sessions(&[session]).await?;
1007
1008 Ok(())
1009 }
1010
1011 #[cfg(test)]
1012 #[allow(dead_code)]
1013 pub(crate) async fn create_inbound_session_test_helper(
1014 &self,
1015 room_id: &RoomId,
1016 ) -> OlmResult<InboundGroupSession> {
1017 let (_, session) = self
1018 .inner
1019 .group_session_manager
1020 .create_outbound_group_session(
1021 room_id,
1022 EncryptionSettings::default(),
1023 SenderData::unknown(),
1024 )
1025 .await?;
1026
1027 Ok(session)
1028 }
1029
1030 pub async fn encrypt_room_event(
1047 &self,
1048 room_id: &RoomId,
1049 content: impl MessageLikeEventContent,
1050 ) -> MegolmResult<Raw<RoomEncryptedEventContent>> {
1051 let event_type = content.event_type().to_string();
1052 let content = Raw::new(&content)?.cast();
1053 self.encrypt_room_event_raw(room_id, &event_type, &content).await
1054 }
1055
1056 pub async fn encrypt_room_event_raw(
1076 &self,
1077 room_id: &RoomId,
1078 event_type: &str,
1079 content: &Raw<AnyMessageLikeEventContent>,
1080 ) -> MegolmResult<Raw<RoomEncryptedEventContent>> {
1081 self.inner.group_session_manager.encrypt(room_id, event_type, content).await
1082 }
1083
1084 pub async fn discard_room_key(&self, room_id: &RoomId) -> StoreResult<bool> {
1095 self.inner.group_session_manager.invalidate_group_session(room_id).await
1096 }
1097
1098 pub async fn share_room_key(
1118 &self,
1119 room_id: &RoomId,
1120 users: impl Iterator<Item = &UserId>,
1121 encryption_settings: impl Into<EncryptionSettings>,
1122 ) -> OlmResult<Vec<Arc<ToDeviceRequest>>> {
1123 self.inner.group_session_manager.share_room_key(room_id, users, encryption_settings).await
1124 }
1125
1126 #[cfg(feature = "experimental-send-custom-to-device")]
1140 pub async fn encrypt_content_for_devices(
1141 &self,
1142 devices: Vec<DeviceData>,
1143 event_type: &str,
1144 content: &Value,
1145 ) -> OlmResult<(Vec<ToDeviceRequest>, Vec<(DeviceData, WithheldCode)>)> {
1146 let mut changes = Changes::default();
1149
1150 let result = self
1151 .inner
1152 .group_session_manager
1153 .encrypt_content_for_devices(devices, event_type, content.clone(), &mut changes)
1154 .await;
1155
1156 if !changes.is_empty() {
1158 let session_count = changes.sessions.len();
1159
1160 self.inner.store.save_changes(changes).await?;
1161
1162 trace!(
1163 session_count = session_count,
1164 "Stored the changed sessions after encrypting a custom to-device event"
1165 );
1166 }
1167
1168 result
1169 }
1170 pub async fn share_room_key_bundle_data(
1175 &self,
1176 user_id: &UserId,
1177 collect_strategy: &CollectStrategy,
1178 bundle_data: RoomKeyBundleContent,
1179 ) -> OlmResult<Vec<ToDeviceRequest>> {
1180 self.inner
1181 .group_session_manager
1182 .share_room_key_bundle_data(user_id, collect_strategy, bundle_data)
1183 .await
1184 }
1185
1186 #[deprecated(note = "Use OlmMachine::receive_verification_event instead", since = "0.7.0")]
1194 pub async fn receive_unencrypted_verification_event(
1195 &self,
1196 event: &AnyMessageLikeEvent,
1197 ) -> StoreResult<()> {
1198 self.inner.verification_machine.receive_any_event(event).await
1199 }
1200
1201 pub async fn receive_verification_event(&self, event: &AnyMessageLikeEvent) -> StoreResult<()> {
1205 self.inner.verification_machine.receive_any_event(event).await
1206 }
1207
1208 #[instrument(
1214 skip_all,
1215 fields(
1216 sender_key = ?decrypted.result.sender_key,
1217 event_type = decrypted.result.event.event_type(),
1218 ),
1219 )]
1220 async fn handle_decrypted_to_device_event(
1221 &self,
1222 cache: &StoreCache,
1223 decrypted: &mut OlmDecryptionInfo,
1224 changes: &mut Changes,
1225 ) -> OlmResult<()> {
1226 debug!(
1227 sender_device_keys =
1228 ?decrypted.result.event.sender_device_keys().map(|k| (k.curve25519_key(), k.ed25519_key())).unwrap_or((None, None)),
1229 "Received a decrypted to-device event",
1230 );
1231
1232 match &*decrypted.result.event {
1233 AnyDecryptedOlmEvent::RoomKey(e) => {
1234 let session = self.add_room_key(decrypted.result.sender_key, e).await?;
1235 decrypted.inbound_group_session = session;
1236 }
1237 AnyDecryptedOlmEvent::ForwardedRoomKey(e) => {
1238 let session = self
1239 .inner
1240 .key_request_machine
1241 .receive_forwarded_room_key(decrypted.result.sender_key, e)
1242 .await?;
1243 decrypted.inbound_group_session = session;
1244 }
1245 AnyDecryptedOlmEvent::SecretSend(e) => {
1246 let name = self
1247 .inner
1248 .key_request_machine
1249 .receive_secret_event(cache, decrypted.result.sender_key, e, changes)
1250 .await?;
1251
1252 if let Ok(ToDeviceEvents::SecretSend(mut e)) =
1255 decrypted.result.raw_event.deserialize_as()
1256 {
1257 e.content.secret_name = name;
1258 decrypted.result.raw_event = Raw::from_json(to_raw_value(&e)?);
1259 }
1260 }
1261 AnyDecryptedOlmEvent::Dummy(_) => {
1262 debug!("Received an `m.dummy` event");
1263 }
1264 AnyDecryptedOlmEvent::RoomKeyBundle(e) => {
1265 debug!("Received a room key bundle event {:?}", e);
1266 self.receive_room_key_bundle_data(e, changes).await?;
1267 }
1268 AnyDecryptedOlmEvent::Custom(_) => {
1269 warn!("Received an unexpected encrypted to-device event");
1270 }
1271 }
1272
1273 Ok(())
1274 }
1275
1276 async fn handle_verification_event(&self, event: &ToDeviceEvents) {
1277 if let Err(e) = self.inner.verification_machine.receive_any_event(event).await {
1278 error!("Error handling a verification event: {e:?}");
1279 }
1280 }
1281
1282 async fn mark_to_device_request_as_sent(&self, request_id: &TransactionId) -> StoreResult<()> {
1284 self.inner.verification_machine.mark_request_as_sent(request_id);
1285 self.inner.key_request_machine.mark_outgoing_request_as_sent(request_id).await?;
1286 self.inner.group_session_manager.mark_request_as_sent(request_id).await?;
1287 self.inner.session_manager.mark_outgoing_request_as_sent(request_id);
1288 Ok(())
1289 }
1290
1291 pub fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
1293 self.inner.verification_machine.get_verification(user_id, flow_id)
1294 }
1295
1296 pub fn get_verification_request(
1298 &self,
1299 user_id: &UserId,
1300 flow_id: impl AsRef<str>,
1301 ) -> Option<VerificationRequest> {
1302 self.inner.verification_machine.get_request(user_id, flow_id)
1303 }
1304
1305 pub fn get_verification_requests(&self, user_id: &UserId) -> Vec<VerificationRequest> {
1307 self.inner.verification_machine.get_requests(user_id)
1308 }
1309
1310 async fn handle_to_device_event(&self, changes: &mut Changes, event: &ToDeviceEvents) {
1311 use crate::types::events::ToDeviceEvents::*;
1312
1313 match event {
1314 RoomKeyRequest(e) => self.inner.key_request_machine.receive_incoming_key_request(e),
1315 SecretRequest(e) => self.inner.key_request_machine.receive_incoming_secret_request(e),
1316 RoomKeyWithheld(e) => self.add_withheld_info(changes, e),
1317 KeyVerificationAccept(..)
1318 | KeyVerificationCancel(..)
1319 | KeyVerificationKey(..)
1320 | KeyVerificationMac(..)
1321 | KeyVerificationRequest(..)
1322 | KeyVerificationReady(..)
1323 | KeyVerificationDone(..)
1324 | KeyVerificationStart(..) => {
1325 self.handle_verification_event(event).await;
1326 }
1327 Dummy(_) | RoomKey(_) | ForwardedRoomKey(_) | RoomEncrypted(_) => {}
1328 _ => {}
1329 }
1330 }
1331
1332 fn record_message_id(event: &Raw<AnyToDeviceEvent>) {
1333 use serde::Deserialize;
1334
1335 #[derive(Deserialize)]
1336 struct ContentStub<'a> {
1337 #[serde(borrow, rename = "org.matrix.msgid")]
1338 message_id: Option<&'a str>,
1339 }
1340
1341 #[derive(Deserialize)]
1342 struct ToDeviceStub<'a> {
1343 sender: &'a str,
1344 #[serde(rename = "type")]
1345 event_type: &'a str,
1346 #[serde(borrow)]
1347 content: ContentStub<'a>,
1348 }
1349
1350 if let Ok(event) = event.deserialize_as::<ToDeviceStub<'_>>() {
1351 Span::current().record("sender", event.sender);
1352 Span::current().record("event_type", event.event_type);
1353 Span::current().record("message_id", event.content.message_id);
1354 }
1355 }
1356
1357 #[instrument(skip_all, fields(sender, event_type, message_id))]
1365 async fn receive_to_device_event(
1366 &self,
1367 transaction: &mut StoreTransaction,
1368 changes: &mut Changes,
1369 mut raw_event: Raw<AnyToDeviceEvent>,
1370 ) -> Option<ProcessedToDeviceEvent> {
1371 Self::record_message_id(&raw_event);
1372
1373 let event: ToDeviceEvents = match raw_event.deserialize_as() {
1374 Ok(e) => e,
1375 Err(e) => {
1376 warn!("Received an invalid to-device event: {e}");
1378 return Some(ProcessedToDeviceEvent::Invalid(raw_event));
1379 }
1380 };
1381
1382 debug!("Received a to-device event");
1383
1384 match event {
1385 ToDeviceEvents::RoomEncrypted(e) => {
1386 let decrypted = match self.decrypt_to_device_event(transaction, &e, changes).await {
1387 Ok(e) => e,
1388 Err(err) => {
1389 if let OlmError::SessionWedged(sender, curve_key) = err {
1390 if let Err(e) = self
1391 .inner
1392 .session_manager
1393 .mark_device_as_wedged(&sender, curve_key)
1394 .await
1395 {
1396 error!(
1397 error = ?e,
1398 "Couldn't mark device from to be unwedged",
1399 );
1400 }
1401 }
1402
1403 return Some(ProcessedToDeviceEvent::UnableToDecrypt(raw_event));
1404 }
1405 };
1406
1407 match self.to_device_event_is_from_dehydrated_device(&decrypted, &e.sender).await {
1410 Ok(true) => {
1411 warn!(
1412 sender = ?e.sender,
1413 session = ?decrypted.session,
1414 "Received a to-device event from a dehydrated device. This is unexpected: ignoring event"
1415 );
1416 return None;
1417 }
1418 Ok(false) => {}
1419 Err(err) => {
1420 error!(
1421 error = ?err,
1422 "Couldn't check whether event is from dehydrated device",
1423 );
1424 }
1425 }
1426
1427 match decrypted.session {
1430 SessionType::New(s) | SessionType::Existing(s) => {
1431 changes.sessions.push(s);
1432 }
1433 }
1434
1435 changes.message_hashes.push(decrypted.message_hash);
1436
1437 if let Some(group_session) = decrypted.inbound_group_session {
1438 changes.inbound_group_sessions.push(group_session);
1439 }
1440
1441 match decrypted.result.raw_event.deserialize_as() {
1442 Ok(event) => {
1443 self.handle_to_device_event(changes, &event).await;
1444
1445 raw_event = event
1446 .serialize_zeroized()
1447 .expect("Zeroizing and reserializing our events should always work")
1448 .cast();
1449 }
1450 Err(e) => {
1451 warn!("Received an invalid encrypted to-device event: {e}");
1452 raw_event = decrypted.result.raw_event;
1453 }
1454 }
1455
1456 Some(ProcessedToDeviceEvent::Decrypted(raw_event))
1457 }
1458
1459 e => {
1460 self.handle_to_device_event(changes, &e).await;
1461 Some(ProcessedToDeviceEvent::PlainText(raw_event))
1462 }
1463 }
1464 }
1465
1466 async fn to_device_event_is_from_dehydrated_device(
1472 &self,
1473 decrypted: &OlmDecryptionInfo,
1474 sender_user_id: &UserId,
1475 ) -> OlmResult<bool> {
1476 if let Some(device_keys) = decrypted.result.event.sender_device_keys() {
1478 if device_keys.dehydrated.unwrap_or(false) {
1484 return Ok(true);
1485 }
1486 }
1491
1492 Ok(self
1494 .store()
1495 .get_device_from_curve_key(sender_user_id, decrypted.result.sender_key)
1496 .await?
1497 .is_some_and(|d| d.is_dehydrated()))
1498 }
1499
1500 #[instrument(skip_all)]
1518 pub async fn receive_sync_changes(
1519 &self,
1520 sync_changes: EncryptionSyncChanges<'_>,
1521 ) -> OlmResult<(Vec<ProcessedToDeviceEvent>, Vec<RoomKeyInfo>)> {
1522 let mut store_transaction = self.inner.store.transaction().await;
1523
1524 let (events, changes) =
1525 self.preprocess_sync_changes(&mut store_transaction, sync_changes).await?;
1526
1527 let room_key_updates: Vec<_> =
1530 changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
1531
1532 self.store().save_changes(changes).await?;
1533 store_transaction.commit().await?;
1534
1535 Ok((events, room_key_updates))
1536 }
1537
1538 pub(crate) async fn preprocess_sync_changes(
1547 &self,
1548 transaction: &mut StoreTransaction,
1549 sync_changes: EncryptionSyncChanges<'_>,
1550 ) -> OlmResult<(Vec<ProcessedToDeviceEvent>, Changes)> {
1551 let mut events: Vec<ProcessedToDeviceEvent> = self
1553 .inner
1554 .verification_machine
1555 .garbage_collect()
1556 .iter()
1557 .map(|e| ProcessedToDeviceEvent::PlainText(e.clone()))
1561 .collect();
1562 let mut changes = Default::default();
1565
1566 {
1567 let account = transaction.account().await?;
1568 account.update_key_counts(
1569 sync_changes.one_time_keys_counts,
1570 sync_changes.unused_fallback_keys,
1571 )
1572 }
1573
1574 if let Err(e) = self
1575 .inner
1576 .identity_manager
1577 .receive_device_changes(
1578 transaction.cache(),
1579 sync_changes.changed_devices.changed.iter().map(|u| u.as_ref()),
1580 )
1581 .await
1582 {
1583 error!(error = ?e, "Error marking a tracked user as changed");
1584 }
1585
1586 for raw_event in sync_changes.to_device_events {
1587 let raw_event =
1588 Box::pin(self.receive_to_device_event(transaction, &mut changes, raw_event)).await;
1589
1590 if let Some(raw_event) = raw_event {
1591 events.push(raw_event);
1592 }
1593 }
1594
1595 let changed_sessions = self
1596 .inner
1597 .key_request_machine
1598 .collect_incoming_key_requests(transaction.cache())
1599 .await?;
1600
1601 changes.sessions.extend(changed_sessions);
1602 changes.next_batch_token = sync_changes.next_batch_token;
1603
1604 Ok((events, changes))
1605 }
1606
1607 pub async fn request_room_key(
1624 &self,
1625 event: &Raw<EncryptedEvent>,
1626 room_id: &RoomId,
1627 ) -> MegolmResult<(Option<OutgoingRequest>, OutgoingRequest)> {
1628 let event = event.deserialize()?;
1629 self.inner.key_request_machine.request_key(room_id, &event).await
1630 }
1631
1632 async fn get_or_update_verification_state(
1642 &self,
1643 session: &InboundGroupSession,
1644 sender: &UserId,
1645 ) -> MegolmResult<(VerificationState, Option<OwnedDeviceId>)> {
1646 fn should_recalculate_sender_data(sender_data: &SenderData) -> bool {
1657 matches!(
1658 sender_data,
1659 SenderData::UnknownDevice { .. }
1660 | SenderData::DeviceInfo { .. }
1661 | SenderData::VerificationViolation { .. }
1662 )
1663 }
1664
1665 let sender_data = if should_recalculate_sender_data(&session.sender_data) {
1666 let calculated_sender_data = SenderDataFinder::find_using_curve_key(
1668 self.store(),
1669 session.sender_key(),
1670 sender,
1671 session,
1672 )
1673 .await?;
1674
1675 if calculated_sender_data.compare_trust_level(&session.sender_data).is_gt() {
1677 let mut new_session = session.clone();
1679 new_session.sender_data = calculated_sender_data.clone();
1680 self.store().save_inbound_group_sessions(&[new_session]).await?;
1681
1682 calculated_sender_data
1684 } else {
1685 session.sender_data.clone()
1687 }
1688 } else {
1689 session.sender_data.clone()
1690 };
1691
1692 Ok(sender_data_to_verification_state(sender_data, session.has_been_imported()))
1693 }
1694
1695 pub async fn query_missing_secrets_from_other_sessions(&self) -> StoreResult<bool> {
1720 let identity = self.inner.user_identity.lock().await;
1721 let mut secrets = identity.get_missing_secrets().await;
1722
1723 if self.store().load_backup_keys().await?.decryption_key.is_none() {
1724 secrets.push(SecretName::RecoveryKey);
1725 }
1726
1727 if secrets.is_empty() {
1728 debug!("No missing requests to query");
1729 return Ok(false);
1730 }
1731
1732 let secret_requests = GossipMachine::request_missing_secrets(self.user_id(), secrets);
1733
1734 let unsent_request = self.store().get_unsent_secret_requests().await?;
1736 let not_yet_requested = secret_requests
1737 .into_iter()
1738 .filter(|request| !unsent_request.iter().any(|unsent| unsent.info == request.info))
1739 .collect_vec();
1740
1741 if not_yet_requested.is_empty() {
1742 debug!("The missing secrets have already been requested");
1743 Ok(false)
1744 } else {
1745 debug!("Requesting missing secrets");
1746
1747 let changes = Changes { key_requests: not_yet_requested, ..Default::default() };
1748
1749 self.store().save_changes(changes).await?;
1750 Ok(true)
1751 }
1752 }
1753
1754 async fn get_encryption_info(
1760 &self,
1761 session: &InboundGroupSession,
1762 sender: &UserId,
1763 ) -> MegolmResult<EncryptionInfo> {
1764 let (verification_state, device_id) =
1765 self.get_or_update_verification_state(session, sender).await?;
1766
1767 let sender = sender.to_owned();
1768
1769 Ok(EncryptionInfo {
1770 sender,
1771 sender_device: device_id,
1772 algorithm_info: AlgorithmInfo::MegolmV1AesSha2 {
1773 curve25519_key: session.sender_key().to_base64(),
1774 sender_claimed_keys: session
1775 .signing_keys()
1776 .iter()
1777 .map(|(k, v)| (k.to_owned(), v.to_base64()))
1778 .collect(),
1779 session_id: Some(session.session_id().to_owned()),
1780 },
1781 verification_state,
1782 })
1783 }
1784
1785 async fn decrypt_megolm_events(
1786 &self,
1787 room_id: &RoomId,
1788 event: &EncryptedEvent,
1789 content: &SupportedEventEncryptionSchemes<'_>,
1790 decryption_settings: &DecryptionSettings,
1791 ) -> MegolmResult<(JsonObject, EncryptionInfo)> {
1792 let session =
1793 self.get_inbound_group_session_or_error(room_id, content.session_id()).await?;
1794
1795 Span::current().record("sender_key", debug(session.sender_key()));
1801
1802 let result = session.decrypt(event).await;
1803 match result {
1804 Ok((decrypted_event, _)) => {
1805 let encryption_info = self.get_encryption_info(&session, &event.sender).await?;
1806
1807 self.check_sender_trust_requirement(
1808 &session,
1809 &encryption_info,
1810 &decryption_settings.sender_device_trust_requirement,
1811 )?;
1812
1813 Ok((decrypted_event, encryption_info))
1814 }
1815 Err(error) => Err(
1816 if let MegolmError::Decryption(DecryptionError::UnknownMessageIndex(_, _)) = error {
1817 let withheld_code = self
1818 .inner
1819 .store
1820 .get_withheld_info(room_id, content.session_id())
1821 .await?
1822 .map(|e| e.content.withheld_code());
1823
1824 if withheld_code.is_some() {
1825 MegolmError::MissingRoomKey(withheld_code)
1827 } else {
1828 error
1829 }
1830 } else {
1831 error
1832 },
1833 ),
1834 }
1835 }
1836
1837 fn check_sender_trust_requirement(
1843 &self,
1844 session: &InboundGroupSession,
1845 encryption_info: &EncryptionInfo,
1846 trust_requirement: &TrustRequirement,
1847 ) -> MegolmResult<()> {
1848 trace!(
1849 verification_state = ?encryption_info.verification_state,
1850 ?trust_requirement, "check_sender_trust_requirement",
1851 );
1852
1853 let verification_level = match &encryption_info.verification_state {
1856 VerificationState::Verified => return Ok(()),
1857 VerificationState::Unverified(verification_level) => verification_level,
1858 };
1859
1860 let ok = match trust_requirement {
1861 TrustRequirement::Untrusted => true,
1862
1863 TrustRequirement::CrossSignedOrLegacy => {
1864 let legacy_session = match session.sender_data {
1870 SenderData::DeviceInfo { legacy_session, .. } => legacy_session,
1871 SenderData::UnknownDevice { legacy_session, .. } => legacy_session,
1872 _ => false,
1873 };
1874
1875 match (verification_level, legacy_session) {
1885 (VerificationLevel::UnverifiedIdentity, _) => true,
1887
1888 (VerificationLevel::UnsignedDevice, true) => true,
1890
1891 (VerificationLevel::None(_), true) => true,
1893
1894 (VerificationLevel::VerificationViolation, _)
1896 | (VerificationLevel::UnsignedDevice, false)
1897 | (VerificationLevel::None(_), false) => false,
1898 }
1899 }
1900
1901 TrustRequirement::CrossSigned => match verification_level {
1904 VerificationLevel::UnverifiedIdentity => true,
1905
1906 VerificationLevel::VerificationViolation
1907 | VerificationLevel::UnsignedDevice
1908 | VerificationLevel::None(_) => false,
1909 },
1910 };
1911
1912 if ok {
1913 Ok(())
1914 } else {
1915 Err(MegolmError::SenderIdentityNotTrusted(verification_level.clone()))
1916 }
1917 }
1918
1919 async fn get_inbound_group_session_or_error(
1924 &self,
1925 room_id: &RoomId,
1926 session_id: &str,
1927 ) -> MegolmResult<InboundGroupSession> {
1928 match self.store().get_inbound_group_session(room_id, session_id).await? {
1929 Some(session) => Ok(session),
1930 None => {
1931 let withheld_code = self
1932 .inner
1933 .store
1934 .get_withheld_info(room_id, session_id)
1935 .await?
1936 .map(|e| e.content.withheld_code());
1937 Err(MegolmError::MissingRoomKey(withheld_code))
1938 }
1939 }
1940 }
1941
1942 pub async fn try_decrypt_room_event(
1957 &self,
1958 raw_event: &Raw<EncryptedEvent>,
1959 room_id: &RoomId,
1960 decryption_settings: &DecryptionSettings,
1961 ) -> Result<RoomEventDecryptionResult, CryptoStoreError> {
1962 match self.decrypt_room_event_inner(raw_event, room_id, true, decryption_settings).await {
1963 Ok(decrypted) => Ok(RoomEventDecryptionResult::Decrypted(decrypted)),
1964 Err(err) => Ok(RoomEventDecryptionResult::UnableToDecrypt(megolm_error_to_utd_info(
1965 raw_event, err,
1966 )?)),
1967 }
1968 }
1969
1970 pub async fn decrypt_room_event(
1978 &self,
1979 event: &Raw<EncryptedEvent>,
1980 room_id: &RoomId,
1981 decryption_settings: &DecryptionSettings,
1982 ) -> MegolmResult<DecryptedRoomEvent> {
1983 self.decrypt_room_event_inner(event, room_id, true, decryption_settings).await
1984 }
1985
1986 #[instrument(name = "decrypt_room_event", skip_all, fields(?room_id, event_id, origin_server_ts, sender, algorithm, session_id, message_index, sender_key))]
1987 async fn decrypt_room_event_inner(
1988 &self,
1989 event: &Raw<EncryptedEvent>,
1990 room_id: &RoomId,
1991 decrypt_unsigned: bool,
1992 decryption_settings: &DecryptionSettings,
1993 ) -> MegolmResult<DecryptedRoomEvent> {
1994 let event = event.deserialize()?;
1995
1996 Span::current()
1997 .record("sender", debug(&event.sender))
1998 .record("event_id", debug(&event.event_id))
1999 .record(
2000 "origin_server_ts",
2001 timestamp_to_iso8601(event.origin_server_ts)
2002 .unwrap_or_else(|| "<out of range>".to_owned()),
2003 )
2004 .record("algorithm", debug(event.content.algorithm()));
2005
2006 let content: SupportedEventEncryptionSchemes<'_> = match &event.content.scheme {
2007 RoomEventEncryptionScheme::MegolmV1AesSha2(c) => {
2008 Span::current().record("sender_key", debug(c.sender_key));
2009 c.into()
2010 }
2011 #[cfg(feature = "experimental-algorithms")]
2012 RoomEventEncryptionScheme::MegolmV2AesSha2(c) => c.into(),
2013 RoomEventEncryptionScheme::Unknown(_) => {
2014 warn!("Received an encrypted room event with an unsupported algorithm");
2015 return Err(EventError::UnsupportedAlgorithm.into());
2016 }
2017 };
2018
2019 Span::current().record("session_id", content.session_id());
2020 Span::current().record("message_index", content.message_index());
2021
2022 let result =
2023 self.decrypt_megolm_events(room_id, &event, &content, decryption_settings).await;
2024
2025 if let Err(e) = &result {
2026 #[cfg(feature = "automatic-room-key-forwarding")]
2027 match e {
2028 MegolmError::MissingRoomKey(_)
2031 | MegolmError::Decryption(DecryptionError::UnknownMessageIndex(_, _)) => {
2032 self.inner
2033 .key_request_machine
2034 .create_outgoing_key_request(room_id, &event)
2035 .await?;
2036 }
2037 _ => {}
2038 }
2039
2040 warn!("Failed to decrypt a room event: {e}");
2041 }
2042
2043 let (mut decrypted_event, encryption_info) = result?;
2044
2045 let mut unsigned_encryption_info = None;
2046 if decrypt_unsigned {
2047 unsigned_encryption_info = self
2049 .decrypt_unsigned_events(&mut decrypted_event, room_id, decryption_settings)
2050 .await;
2051 }
2052
2053 let event = serde_json::from_value::<Raw<AnyMessageLikeEvent>>(decrypted_event.into())?;
2054
2055 Ok(DecryptedRoomEvent { event, encryption_info, unsigned_encryption_info })
2056 }
2057
2058 async fn decrypt_unsigned_events(
2068 &self,
2069 main_event: &mut JsonObject,
2070 room_id: &RoomId,
2071 decryption_settings: &DecryptionSettings,
2072 ) -> Option<BTreeMap<UnsignedEventLocation, UnsignedDecryptionResult>> {
2073 let unsigned = main_event.get_mut("unsigned")?.as_object_mut()?;
2074 let mut unsigned_encryption_info: Option<
2075 BTreeMap<UnsignedEventLocation, UnsignedDecryptionResult>,
2076 > = None;
2077
2078 let location = UnsignedEventLocation::RelationsReplace;
2080 let replace = location.find_mut(unsigned);
2081 if let Some(decryption_result) =
2082 self.decrypt_unsigned_event(replace, room_id, decryption_settings).await
2083 {
2084 unsigned_encryption_info
2085 .get_or_insert_with(Default::default)
2086 .insert(location, decryption_result);
2087 }
2088
2089 let location = UnsignedEventLocation::RelationsThreadLatestEvent;
2092 let thread_latest_event = location.find_mut(unsigned);
2093 if let Some(decryption_result) =
2094 self.decrypt_unsigned_event(thread_latest_event, room_id, decryption_settings).await
2095 {
2096 unsigned_encryption_info
2097 .get_or_insert_with(Default::default)
2098 .insert(location, decryption_result);
2099 }
2100
2101 unsigned_encryption_info
2102 }
2103
2104 fn decrypt_unsigned_event<'a>(
2112 &'a self,
2113 event: Option<&'a mut Value>,
2114 room_id: &'a RoomId,
2115 decryption_settings: &'a DecryptionSettings,
2116 ) -> BoxFuture<'a, Option<UnsignedDecryptionResult>> {
2117 Box::pin(async move {
2118 let event = event?;
2119
2120 let is_encrypted = event
2121 .get("type")
2122 .and_then(|type_| type_.as_str())
2123 .is_some_and(|s| s == "m.room.encrypted");
2124 if !is_encrypted {
2125 return None;
2126 }
2127
2128 let raw_event = serde_json::from_value(event.clone()).ok()?;
2129 match self
2130 .decrypt_room_event_inner(&raw_event, room_id, false, decryption_settings)
2131 .await
2132 {
2133 Ok(decrypted_event) => {
2134 *event = serde_json::to_value(decrypted_event.event).ok()?;
2136 Some(UnsignedDecryptionResult::Decrypted(decrypted_event.encryption_info))
2137 }
2138 Err(err) => {
2139 let utd_info = megolm_error_to_utd_info(&raw_event, err).ok()?;
2144 Some(UnsignedDecryptionResult::UnableToDecrypt(utd_info))
2145 }
2146 }
2147 })
2148 }
2149
2150 pub async fn is_room_key_available(
2157 &self,
2158 event: &Raw<EncryptedEvent>,
2159 room_id: &RoomId,
2160 ) -> Result<bool, CryptoStoreError> {
2161 let event = event.deserialize()?;
2162
2163 let (session_id, message_index) = match &event.content.scheme {
2164 RoomEventEncryptionScheme::MegolmV1AesSha2(c) => {
2165 (&c.session_id, c.ciphertext.message_index())
2166 }
2167 #[cfg(feature = "experimental-algorithms")]
2168 RoomEventEncryptionScheme::MegolmV2AesSha2(c) => {
2169 (&c.session_id, c.ciphertext.message_index())
2170 }
2171 RoomEventEncryptionScheme::Unknown(_) => {
2172 return Ok(false);
2174 }
2175 };
2176
2177 Ok(self
2180 .store()
2181 .get_inbound_group_session(room_id, session_id)
2182 .await?
2183 .filter(|s| s.first_known_index() <= message_index)
2184 .is_some())
2185 }
2186
2187 pub async fn get_room_event_encryption_info(
2200 &self,
2201 event: &Raw<EncryptedEvent>,
2202 room_id: &RoomId,
2203 ) -> MegolmResult<EncryptionInfo> {
2204 let event = event.deserialize()?;
2205
2206 let content: SupportedEventEncryptionSchemes<'_> = match &event.content.scheme {
2207 RoomEventEncryptionScheme::MegolmV1AesSha2(c) => c.into(),
2208 #[cfg(feature = "experimental-algorithms")]
2209 RoomEventEncryptionScheme::MegolmV2AesSha2(c) => c.into(),
2210 RoomEventEncryptionScheme::Unknown(_) => {
2211 return Err(EventError::UnsupportedAlgorithm.into());
2212 }
2213 };
2214
2215 self.get_session_encryption_info(room_id, content.session_id(), &event.sender).await
2216 }
2217
2218 pub async fn get_session_encryption_info(
2232 &self,
2233 room_id: &RoomId,
2234 session_id: &str,
2235 sender: &UserId,
2236 ) -> MegolmResult<EncryptionInfo> {
2237 let session = self.get_inbound_group_session_or_error(room_id, session_id).await?;
2238 self.get_encryption_info(&session, sender).await
2239 }
2240
2241 pub async fn update_tracked_users(
2259 &self,
2260 users: impl IntoIterator<Item = &UserId>,
2261 ) -> StoreResult<()> {
2262 self.inner.identity_manager.update_tracked_users(users).await
2263 }
2264
2265 pub async fn mark_all_tracked_users_as_dirty(&self) -> StoreResult<()> {
2270 self.inner
2271 .identity_manager
2272 .mark_all_tracked_users_as_dirty(self.inner.store.cache().await?)
2273 .await
2274 }
2275
2276 async fn wait_if_user_pending(
2277 &self,
2278 user_id: &UserId,
2279 timeout: Option<Duration>,
2280 ) -> StoreResult<()> {
2281 if let Some(timeout) = timeout {
2282 let cache = self.store().cache().await?;
2283 self.inner
2284 .identity_manager
2285 .key_query_manager
2286 .wait_if_user_key_query_pending(cache, timeout, user_id)
2287 .await?;
2288 }
2289 Ok(())
2290 }
2291
2292 #[instrument(skip(self))]
2322 pub async fn get_device(
2323 &self,
2324 user_id: &UserId,
2325 device_id: &DeviceId,
2326 timeout: Option<Duration>,
2327 ) -> StoreResult<Option<Device>> {
2328 self.wait_if_user_pending(user_id, timeout).await?;
2329 self.store().get_device(user_id, device_id).await
2330 }
2331
2332 #[instrument(skip(self))]
2346 pub async fn get_identity(
2347 &self,
2348 user_id: &UserId,
2349 timeout: Option<Duration>,
2350 ) -> StoreResult<Option<UserIdentity>> {
2351 self.wait_if_user_pending(user_id, timeout).await?;
2352 self.store().get_identity(user_id).await
2353 }
2354
2355 #[instrument(skip(self))]
2382 pub async fn get_user_devices(
2383 &self,
2384 user_id: &UserId,
2385 timeout: Option<Duration>,
2386 ) -> StoreResult<UserDevices> {
2387 self.wait_if_user_pending(user_id, timeout).await?;
2388 self.store().get_user_devices(user_id).await
2389 }
2390
2391 pub async fn cross_signing_status(&self) -> CrossSigningStatus {
2396 self.inner.user_identity.lock().await.status().await
2397 }
2398
2399 pub async fn export_cross_signing_keys(&self) -> StoreResult<Option<CrossSigningKeyExport>> {
2407 let master_key = self.store().export_secret(&SecretName::CrossSigningMasterKey).await?;
2408 let self_signing_key =
2409 self.store().export_secret(&SecretName::CrossSigningSelfSigningKey).await?;
2410 let user_signing_key =
2411 self.store().export_secret(&SecretName::CrossSigningUserSigningKey).await?;
2412
2413 Ok(if master_key.is_none() && self_signing_key.is_none() && user_signing_key.is_none() {
2414 None
2415 } else {
2416 Some(CrossSigningKeyExport { master_key, self_signing_key, user_signing_key })
2417 })
2418 }
2419
2420 pub async fn import_cross_signing_keys(
2425 &self,
2426 export: CrossSigningKeyExport,
2427 ) -> Result<CrossSigningStatus, SecretImportError> {
2428 self.store().import_cross_signing_keys(export).await
2429 }
2430
2431 async fn sign_with_master_key(
2432 &self,
2433 message: &str,
2434 ) -> Result<(OwnedDeviceKeyId, Ed25519Signature), SignatureError> {
2435 let identity = &*self.inner.user_identity.lock().await;
2436 let key_id = identity.master_key_id().await.ok_or(SignatureError::MissingSigningKey)?;
2437
2438 let signature = identity.sign(message).await?;
2439
2440 Ok((key_id, signature))
2441 }
2442
2443 pub async fn sign(&self, message: &str) -> Result<Signatures, CryptoStoreError> {
2449 let mut signatures = Signatures::new();
2450
2451 {
2452 let cache = self.inner.store.cache().await?;
2453 let account = cache.account().await?;
2454 let key_id = account.signing_key_id();
2455 let signature = account.sign(message);
2456 signatures.add_signature(self.user_id().to_owned(), key_id, signature);
2457 }
2458
2459 match self.sign_with_master_key(message).await {
2460 Ok((key_id, signature)) => {
2461 signatures.add_signature(self.user_id().to_owned(), key_id, signature);
2462 }
2463 Err(e) => {
2464 warn!(error = ?e, "Couldn't sign the message using the cross signing master key")
2465 }
2466 }
2467
2468 Ok(signatures)
2469 }
2470
2471 pub fn backup_machine(&self) -> &BackupMachine {
2476 &self.inner.backup_machine
2477 }
2478
2479 pub async fn initialize_crypto_store_generation(
2483 &self,
2484 generation: &Mutex<Option<u64>>,
2485 ) -> StoreResult<()> {
2486 let mut gen_guard = generation.lock().await;
2489
2490 let prev_generation =
2491 self.inner.store.get_custom_value(Self::CURRENT_GENERATION_STORE_KEY).await?;
2492
2493 let gen = match prev_generation {
2494 Some(val) => {
2495 u64::from_le_bytes(val.try_into().map_err(|_| {
2498 CryptoStoreError::InvalidLockGeneration("invalid format".to_owned())
2499 })?)
2500 .wrapping_add(1)
2501 }
2502 None => 0,
2503 };
2504
2505 tracing::debug!("Initialising crypto store generation at {}", gen);
2506
2507 self.inner
2508 .store
2509 .set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, gen.to_le_bytes().to_vec())
2510 .await?;
2511
2512 *gen_guard = Some(gen);
2513
2514 Ok(())
2515 }
2516
2517 pub async fn maintain_crypto_store_generation(
2542 &'_ self,
2543 generation: &Mutex<Option<u64>>,
2544 ) -> StoreResult<(bool, u64)> {
2545 let mut gen_guard = generation.lock().await;
2546
2547 let actual_gen = self
2553 .inner
2554 .store
2555 .get_custom_value(Self::CURRENT_GENERATION_STORE_KEY)
2556 .await?
2557 .ok_or_else(|| {
2558 CryptoStoreError::InvalidLockGeneration("counter missing in store".to_owned())
2559 })?;
2560
2561 let actual_gen =
2562 u64::from_le_bytes(actual_gen.try_into().map_err(|_| {
2563 CryptoStoreError::InvalidLockGeneration("invalid format".to_owned())
2564 })?);
2565
2566 let new_gen = match gen_guard.as_ref() {
2567 Some(expected_gen) => {
2568 if actual_gen == *expected_gen {
2569 return Ok((false, actual_gen));
2570 }
2571 actual_gen.max(*expected_gen).wrapping_add(1)
2573 }
2574 None => {
2575 actual_gen.wrapping_add(1)
2578 }
2579 };
2580
2581 tracing::debug!(
2582 "Crypto store generation mismatch: previously known was {:?}, actual is {:?}, next is {}",
2583 *gen_guard,
2584 actual_gen,
2585 new_gen
2586 );
2587
2588 *gen_guard = Some(new_gen);
2590
2591 self.inner
2593 .store
2594 .set_custom_value(Self::CURRENT_GENERATION_STORE_KEY, new_gen.to_le_bytes().to_vec())
2595 .await?;
2596
2597 Ok((true, new_gen))
2598 }
2599
2600 pub fn dehydrated_devices(&self) -> DehydratedDevices {
2602 DehydratedDevices { inner: self.to_owned() }
2603 }
2604
2605 pub async fn room_settings(&self, room_id: &RoomId) -> StoreResult<Option<RoomSettings>> {
2610 self.inner.store.get_room_settings(room_id).await
2613 }
2614
2615 pub async fn set_room_settings(
2626 &self,
2627 room_id: &RoomId,
2628 new_settings: &RoomSettings,
2629 ) -> Result<(), SetRoomSettingsError> {
2630 let store = &self.inner.store;
2631
2632 let _store_transaction = store.transaction().await;
2637
2638 let old_settings = store.get_room_settings(room_id).await?;
2639
2640 if let Some(old_settings) = old_settings {
2653 if old_settings != *new_settings {
2654 return Err(SetRoomSettingsError::EncryptionDowngrade);
2655 } else {
2656 return Ok(());
2658 }
2659 }
2660
2661 match new_settings.algorithm {
2663 EventEncryptionAlgorithm::MegolmV1AesSha2 => (),
2664
2665 #[cfg(feature = "experimental-algorithms")]
2666 EventEncryptionAlgorithm::MegolmV2AesSha2 => (),
2667
2668 _ => {
2669 warn!(
2670 ?room_id,
2671 "Rejecting invalid encryption algorithm {}", new_settings.algorithm
2672 );
2673 return Err(SetRoomSettingsError::InvalidSettings);
2674 }
2675 }
2676
2677 store
2679 .save_changes(Changes {
2680 room_settings: HashMap::from([(room_id.to_owned(), new_settings.clone())]),
2681 ..Default::default()
2682 })
2683 .await?;
2684
2685 Ok(())
2686 }
2687
2688 #[cfg(any(feature = "testing", test))]
2692 pub fn same_as(&self, other: &OlmMachine) -> bool {
2693 Arc::ptr_eq(&self.inner, &other.inner)
2694 }
2695
2696 #[cfg(any(feature = "testing", test))]
2698 pub async fn uploaded_key_count(&self) -> Result<u64, CryptoStoreError> {
2699 let cache = self.inner.store.cache().await?;
2700 let account = cache.account().await?;
2701 Ok(account.uploaded_key_count())
2702 }
2703
2704 #[cfg(test)]
2706 pub(crate) fn identity_manager(&self) -> &IdentityManager {
2707 &self.inner.identity_manager
2708 }
2709
2710 #[cfg(test)]
2712 pub(crate) fn key_for_has_migrated_verification_latch() -> &'static str {
2713 Self::HAS_MIGRATED_VERIFICATION_LATCH
2714 }
2715}
2716
2717fn sender_data_to_verification_state(
2718 sender_data: SenderData,
2719 session_has_been_imported: bool,
2720) -> (VerificationState, Option<OwnedDeviceId>) {
2721 match sender_data {
2722 SenderData::UnknownDevice { owner_check_failed: false, .. } => {
2723 let device_link_problem = if session_has_been_imported {
2724 DeviceLinkProblem::InsecureSource
2725 } else {
2726 DeviceLinkProblem::MissingDevice
2727 };
2728
2729 (VerificationState::Unverified(VerificationLevel::None(device_link_problem)), None)
2730 }
2731 SenderData::UnknownDevice { owner_check_failed: true, .. } => (
2732 VerificationState::Unverified(VerificationLevel::None(
2733 DeviceLinkProblem::InsecureSource,
2734 )),
2735 None,
2736 ),
2737 SenderData::DeviceInfo { device_keys, .. } => (
2738 VerificationState::Unverified(VerificationLevel::UnsignedDevice),
2739 Some(device_keys.device_id),
2740 ),
2741 SenderData::VerificationViolation(KnownSenderData { device_id, .. }) => {
2742 (VerificationState::Unverified(VerificationLevel::VerificationViolation), device_id)
2743 }
2744 SenderData::SenderUnverified(KnownSenderData { device_id, .. }) => {
2745 (VerificationState::Unverified(VerificationLevel::UnverifiedIdentity), device_id)
2746 }
2747 SenderData::SenderVerified(KnownSenderData { device_id, .. }) => {
2748 (VerificationState::Verified, device_id)
2749 }
2750 }
2751}
2752
/// A bundle of requests related to bootstrapping cross-signing.
// NOTE(review): the order in which these requests must be sent is not visible
// from this part of the file — confirm against the bootstrap implementation.
#[derive(Debug, Clone)]
pub struct CrossSigningBootstrapRequests {
    /// An optional request to upload keys; `None` when no such request is
    /// part of the bundle.
    pub upload_keys_req: Option<OutgoingRequest>,

    /// The request to upload the signing keys.
    pub upload_signing_keys_req: UploadSigningKeysRequest,

    /// The request to upload signatures.
    pub upload_signatures_req: UploadSignaturesRequest,
}
2776
/// The encryption-related data of a sync response, as consumed by
/// `OlmMachine::receive_sync_changes`.
#[derive(Debug)]
pub struct EncryptionSyncChanges<'a> {
    /// The raw to-device events to process.
    pub to_device_events: Vec<Raw<AnyToDeviceEvent>>,
    /// The device list changes; the users in its `changed` list are passed to
    /// the identity manager.
    pub changed_devices: &'a DeviceLists,
    /// The one-time key counts per algorithm, used to update the account's
    /// key counts.
    pub one_time_keys_counts: &'a BTreeMap<OneTimeKeyAlgorithm, UInt>,
    /// The algorithms of the unused fallback keys, if provided; also used to
    /// update the account's key counts.
    pub unused_fallback_keys: Option<&'a [OneTimeKeyAlgorithm]>,
    /// The next batch token, stored alongside the other changes.
    pub next_batch_token: Option<String>,
}
2793
2794fn megolm_error_to_utd_info(
2802 raw_event: &Raw<EncryptedEvent>,
2803 error: MegolmError,
2804) -> Result<UnableToDecryptInfo, CryptoStoreError> {
2805 use MegolmError::*;
2806 let reason = match error {
2807 EventError(_) => UnableToDecryptReason::MalformedEncryptedEvent,
2808 Decode(_) => UnableToDecryptReason::MalformedEncryptedEvent,
2809 MissingRoomKey(maybe_withheld) => {
2810 UnableToDecryptReason::MissingMegolmSession { withheld_code: maybe_withheld }
2811 }
2812 Decryption(DecryptionError::UnknownMessageIndex(_, _)) => {
2813 UnableToDecryptReason::UnknownMegolmMessageIndex
2814 }
2815 Decryption(_) => UnableToDecryptReason::MegolmDecryptionFailure,
2816 JsonError(_) => UnableToDecryptReason::PayloadDeserializationFailure,
2817 MismatchedIdentityKeys(_) => UnableToDecryptReason::MismatchedIdentityKeys,
2818 SenderIdentityNotTrusted(level) => UnableToDecryptReason::SenderIdentityNotTrusted(level),
2819
2820 Store(error) => Err(error)?,
2823 };
2824
2825 let session_id = raw_event.deserialize().ok().and_then(|ev| match ev.content.scheme {
2826 RoomEventEncryptionScheme::MegolmV1AesSha2(s) => Some(s.session_id),
2827 #[cfg(feature = "experimental-algorithms")]
2828 RoomEventEncryptionScheme::MegolmV2AesSha2(s) => Some(s.session_id),
2829 RoomEventEncryptionScheme::Unknown(_) => None,
2830 });
2831
2832 Ok(UnableToDecryptInfo { session_id, reason })
2833}
2834
2835#[cfg(test)]
2836pub(crate) mod test_helpers;
2837
2838#[cfg(test)]
2839pub(crate) mod tests;