1#[cfg(feature = "e2e-encryption")]
17use std::sync::Arc;
18use std::{
19 collections::{BTreeMap, BTreeSet, HashMap},
20 fmt,
21 ops::Deref,
22};
23
24use eyeball::{SharedObservable, Subscriber};
25use eyeball_im::{Vector, VectorDiff};
26use futures_util::Stream;
27#[cfg(feature = "e2e-encryption")]
28use matrix_sdk_crypto::{
29 store::DynCryptoStore, types::requests::ToDeviceRequest, CollectStrategy, EncryptionSettings,
30 OlmError, OlmMachine, TrustRequirement,
31};
32#[cfg(feature = "e2e-encryption")]
33use ruma::events::room::{history_visibility::HistoryVisibility, member::MembershipState};
34#[cfg(doc)]
35use ruma::DeviceId;
36use ruma::{
37 api::client::{self as api, sync::sync_events::v5},
38 events::{
39 push_rules::{PushRulesEvent, PushRulesEventContent},
40 room::member::SyncRoomMemberEvent,
41 StateEvent, StateEventType,
42 },
43 push::Ruleset,
44 time::Instant,
45 OwnedRoomId, OwnedUserId, RoomId,
46};
47use tokio::sync::{broadcast, Mutex};
48#[cfg(feature = "e2e-encryption")]
49use tokio::sync::{RwLock, RwLockReadGuard};
50use tracing::{debug, enabled, info, instrument, Level};
51
52#[cfg(feature = "e2e-encryption")]
53use crate::RoomMemberships;
54use crate::{
55 deserialized_responses::DisplayName,
56 error::{Error, Result},
57 event_cache::store::EventCacheStoreLock,
58 response_processors::{self as processors, Context},
59 room::{
60 Room, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState,
61 },
62 store::{
63 ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
64 Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey,
65 StateStoreDataValue, StateStoreExt, StoreConfig,
66 },
67 sync::{RoomUpdates, SyncResponse},
68 RoomStateFilter, SessionMeta,
69};
70
71#[derive(Clone)]
85pub struct BaseClient {
86 pub(crate) state_store: BaseStateStore,
88
89 event_cache_store: EventCacheStoreLock,
91
92 #[cfg(feature = "e2e-encryption")]
97 crypto_store: Arc<DynCryptoStore>,
98
99 #[cfg(feature = "e2e-encryption")]
103 olm_machine: Arc<RwLock<Option<OlmMachine>>>,
104
105 pub(crate) ignore_user_list_changes: SharedObservable<Vec<String>>,
107
108 pub(crate) room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
111
112 #[cfg(feature = "e2e-encryption")]
115 pub room_key_recipient_strategy: CollectStrategy,
116
117 #[cfg(feature = "e2e-encryption")]
119 pub decryption_trust_requirement: TrustRequirement,
120
121 #[cfg(feature = "e2e-encryption")]
123 pub handle_verification_events: bool,
124}
125
126#[cfg(not(tarpaulin_include))]
127impl fmt::Debug for BaseClient {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 f.debug_struct("BaseClient")
130 .field("session_meta", &self.state_store.session_meta())
131 .field("sync_token", &self.state_store.sync_token)
132 .finish_non_exhaustive()
133 }
134}
135
136impl BaseClient {
137 pub fn new(config: StoreConfig) -> Self {
144 let store = BaseStateStore::new(config.state_store);
145
146 let (room_info_notable_update_sender, _room_info_notable_update_receiver) =
156 broadcast::channel(500);
157
158 BaseClient {
159 state_store: store,
160 event_cache_store: config.event_cache_store,
161 #[cfg(feature = "e2e-encryption")]
162 crypto_store: config.crypto_store,
163 #[cfg(feature = "e2e-encryption")]
164 olm_machine: Default::default(),
165 ignore_user_list_changes: Default::default(),
166 room_info_notable_update_sender,
167 #[cfg(feature = "e2e-encryption")]
168 room_key_recipient_strategy: Default::default(),
169 #[cfg(feature = "e2e-encryption")]
170 decryption_trust_requirement: TrustRequirement::Untrusted,
171 #[cfg(feature = "e2e-encryption")]
172 handle_verification_events: true,
173 }
174 }
175
/// Clones this client, replacing the state store with a fresh in-memory
/// one.
///
/// The copy shares the crypto store, the `OlmMachine` and the room-info
/// notable update sender with `self`, starts with a new (default)
/// ignore-user-list observable, and has its state store derived from the
/// current one through `derive_from_other`.
///
/// # Arguments
///
/// * `cross_process_store_locks_holder_name` - Holder name given to the
///   `StoreConfig` that backs the copy.
/// * `handle_verification_events` - Value of the flag of the same name on
///   the returned client.
///
/// # Errors
///
/// Returns an error if deriving the new state store from the current one
/// fails.
#[cfg(feature = "e2e-encryption")]
pub async fn clone_with_in_memory_state_store(
    &self,
    cross_process_store_locks_holder_name: &str,
    handle_verification_events: bool,
) -> Result<Self> {
    let config = StoreConfig::new(cross_process_store_locks_holder_name.to_owned())
        .state_store(MemoryStore::new());
    let config = config.crypto_store(self.crypto_store.clone());

    let copy = Self {
        state_store: BaseStateStore::new(config.state_store),
        event_cache_store: config.event_cache_store,
        // The crypto store and the `OlmMachine` are shared with `self`
        // (both are `Arc` clones), not rebuilt.
        crypto_store: self.crypto_store.clone(),
        olm_machine: self.olm_machine.clone(),
        ignore_user_list_changes: Default::default(),
        room_info_notable_update_sender: self.room_info_notable_update_sender.clone(),
        room_key_recipient_strategy: self.room_key_recipient_strategy.clone(),
        decryption_trust_requirement: self.decryption_trust_requirement,
        handle_verification_events,
    };

    copy.state_store
        .derive_from_other(&self.state_store, &copy.room_info_notable_update_sender)
        .await?;

    Ok(copy)
}
212
213 #[cfg(not(feature = "e2e-encryption"))]
216 #[allow(clippy::unused_async)]
217 pub async fn clone_with_in_memory_state_store(
218 &self,
219 cross_process_store_locks_holder: &str,
220 _handle_verification_events: bool,
221 ) -> Result<Self> {
222 let config = StoreConfig::new(cross_process_store_locks_holder.to_owned())
223 .state_store(MemoryStore::new());
224 Ok(Self::new(config))
225 }
226
227 pub fn session_meta(&self) -> Option<&SessionMeta> {
233 self.state_store.session_meta()
234 }
235
236 pub fn rooms(&self) -> Vec<Room> {
238 self.state_store.rooms()
239 }
240
241 pub fn rooms_filtered(&self, filter: RoomStateFilter) -> Vec<Room> {
243 self.state_store.rooms_filtered(filter)
244 }
245
246 pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
249 self.state_store.rooms_stream()
250 }
251
252 pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
255 self.state_store.get_or_create_room(
256 room_id,
257 room_state,
258 self.room_info_notable_update_sender.clone(),
259 )
260 }
261
262 pub fn state_store(&self) -> &DynStateStore {
264 self.state_store.deref()
265 }
266
267 pub fn event_cache_store(&self) -> &EventCacheStoreLock {
269 &self.event_cache_store
270 }
271
272 pub fn is_active(&self) -> bool {
276 self.state_store.session_meta().is_some()
277 }
278
279 pub async fn activate(
311 &self,
312 session_meta: SessionMeta,
313 room_load_settings: RoomLoadSettings,
314 #[cfg(feature = "e2e-encryption")] custom_account: Option<
315 crate::crypto::vodozemac::olm::Account,
316 >,
317 ) -> Result<()> {
318 debug!(user_id = ?session_meta.user_id, device_id = ?session_meta.device_id, "Activating the client");
319
320 self.state_store
321 .load_rooms(
322 &session_meta.user_id,
323 room_load_settings,
324 &self.room_info_notable_update_sender,
325 )
326 .await?;
327 self.state_store.load_sync_token().await?;
328 self.state_store.set_session_meta(session_meta);
329
330 #[cfg(feature = "e2e-encryption")]
331 self.regenerate_olm(custom_account).await?;
332
333 Ok(())
334 }
335
/// Builds a new `OlmMachine` from the crypto store and installs it,
/// replacing any previous one.
///
/// # Arguments
///
/// * `custom_account` - Optional Olm account forwarded to
///   `OlmMachine::with_store`.
///
/// # Errors
///
/// Returns `OlmError::MissingSession` (wrapped in `Error::OlmError`) when no
/// session meta is set, or the error raised while building the machine.
#[cfg(feature = "e2e-encryption")]
pub async fn regenerate_olm(
    &self,
    custom_account: Option<crate::crypto::vodozemac::olm::Account>,
) -> Result<()> {
    debug!("regenerating OlmMachine");

    let Some(session_meta) = self.session_meta() else {
        return Err(Error::OlmError(OlmError::MissingSession));
    };

    let machine = OlmMachine::with_store(
        &session_meta.user_id,
        &session_meta.device_id,
        self.crypto_store.clone(),
        custom_account,
    )
    .await
    .map_err(OlmError::from)?;

    let mut slot = self.olm_machine.write().await;
    *slot = Some(machine);

    Ok(())
}
361
362 pub async fn sync_token(&self) -> Option<String> {
365 self.state_store.sync_token.read().await.clone()
366 }
367
368 pub async fn room_knocked(&self, room_id: &RoomId) -> Result<Room> {
372 let room = self.state_store.get_or_create_room(
373 room_id,
374 RoomState::Knocked,
375 self.room_info_notable_update_sender.clone(),
376 );
377
378 if room.state() != RoomState::Knocked {
379 let _sync_lock = self.sync_lock().lock().await;
380
381 let mut room_info = room.clone_info();
382 room_info.mark_as_knocked();
383 room_info.mark_state_partially_synced();
384 room_info.mark_members_missing(); let mut changes = StateChanges::default();
386 changes.add_room(room_info.clone());
387 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
389 }
390
391 Ok(room)
392 }
393
394 pub async fn room_joined(&self, room_id: &RoomId) -> Result<Room> {
398 let room = self.state_store.get_or_create_room(
399 room_id,
400 RoomState::Joined,
401 self.room_info_notable_update_sender.clone(),
402 );
403
404 if room.state() != RoomState::Joined {
405 let _sync_lock = self.sync_lock().lock().await;
406
407 let mut room_info = room.clone_info();
408 room_info.mark_as_joined();
409 room_info.mark_state_partially_synced();
410 room_info.mark_members_missing(); let mut changes = StateChanges::default();
412 changes.add_room(room_info.clone());
413 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
415 }
416
417 Ok(room)
418 }
419
420 pub async fn room_left(&self, room_id: &RoomId) -> Result<()> {
424 let room = self.state_store.get_or_create_room(
425 room_id,
426 RoomState::Left,
427 self.room_info_notable_update_sender.clone(),
428 );
429
430 if room.state() != RoomState::Left {
431 let _sync_lock = self.sync_lock().lock().await;
432
433 let mut room_info = room.clone_info();
434 room_info.mark_as_left();
435 room_info.mark_state_partially_synced();
436 room_info.mark_members_missing(); let mut changes = StateChanges::default();
438 changes.add_room(room_info.clone());
439 self.state_store.save_changes(&changes).await?; room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
441 }
442
443 Ok(())
444 }
445
446 pub fn sync_lock(&self) -> &Mutex<()> {
448 self.state_store.sync_lock()
449 }
450
451 #[instrument(skip_all)]
457 pub async fn receive_sync_response(
458 &self,
459 response: api::sync::sync_events::v3::Response,
460 ) -> Result<SyncResponse> {
461 self.receive_sync_response_with_requested_required_states(
462 response,
463 &RequestedRequiredStates::default(),
464 )
465 .await
466 }
467
468 pub async fn receive_sync_response_with_requested_required_states(
476 &self,
477 response: api::sync::sync_events::v3::Response,
478 requested_required_states: &RequestedRequiredStates,
479 ) -> Result<SyncResponse> {
480 if self.state_store.sync_token.read().await.as_ref() == Some(&response.next_batch) {
484 info!("Got the same sync response twice");
485 return Ok(SyncResponse::default());
486 }
487
488 let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };
489
490 #[cfg(feature = "e2e-encryption")]
491 let olm_machine = self.olm_machine().await;
492
493 let mut context = Context::new(StateChanges::new(response.next_batch.clone()));
494
495 #[cfg(feature = "e2e-encryption")]
496 let to_device = {
497 let processors::e2ee::to_device::Output {
498 decrypted_to_device_events: to_device,
499 room_key_updates,
500 } = processors::e2ee::to_device::from_sync_v2(&response, olm_machine.as_ref()).await?;
501
502 processors::latest_event::decrypt_from_rooms(
503 &mut context,
504 room_key_updates
505 .into_iter()
506 .flatten()
507 .filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
508 .collect(),
509 processors::e2ee::E2EE::new(
510 olm_machine.as_ref(),
511 self.decryption_trust_requirement,
512 self.handle_verification_events,
513 ),
514 )
515 .await?;
516
517 to_device
518 };
519
520 #[cfg(not(feature = "e2e-encryption"))]
521 let to_device = response.to_device.events;
522
523 let mut ambiguity_cache = AmbiguityCache::new(self.state_store.inner.clone());
524
525 let global_account_data_processor =
526 processors::account_data::global(&response.account_data.events);
527
528 let push_rules = self.get_push_rules(&global_account_data_processor).await?;
529
530 let mut room_updates = RoomUpdates::default();
531 let mut notifications = Default::default();
532
533 let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
534 BTreeMap::new();
535
536 for (room_id, joined_room) in response.rooms.join {
537 let joined_room_update = processors::room::sync_v2::update_joined_room(
538 &mut context,
539 processors::room::RoomCreationData::new(
540 &room_id,
541 self.room_info_notable_update_sender.clone(),
542 requested_required_states,
543 &mut ambiguity_cache,
544 ),
545 joined_room,
546 &mut updated_members_in_room,
547 processors::notification::Notification::new(
548 &push_rules,
549 &mut notifications,
550 &self.state_store,
551 ),
552 #[cfg(feature = "e2e-encryption")]
553 processors::e2ee::E2EE::new(
554 olm_machine.as_ref(),
555 self.decryption_trust_requirement,
556 self.handle_verification_events,
557 ),
558 )
559 .await?;
560
561 room_updates.joined.insert(room_id, joined_room_update);
562 }
563
564 for (room_id, left_room) in response.rooms.leave {
565 let left_room_update = processors::room::sync_v2::update_left_room(
566 &mut context,
567 processors::room::RoomCreationData::new(
568 &room_id,
569 self.room_info_notable_update_sender.clone(),
570 requested_required_states,
571 &mut ambiguity_cache,
572 ),
573 left_room,
574 processors::notification::Notification::new(
575 &push_rules,
576 &mut notifications,
577 &self.state_store,
578 ),
579 #[cfg(feature = "e2e-encryption")]
580 processors::e2ee::E2EE::new(
581 olm_machine.as_ref(),
582 self.decryption_trust_requirement,
583 self.handle_verification_events,
584 ),
585 )
586 .await?;
587
588 room_updates.left.insert(room_id, left_room_update);
589 }
590
591 for (room_id, invited_room) in response.rooms.invite {
592 let invited_room_update = processors::room::sync_v2::update_invited_room(
593 &mut context,
594 &room_id,
595 invited_room,
596 self.room_info_notable_update_sender.clone(),
597 processors::notification::Notification::new(
598 &push_rules,
599 &mut notifications,
600 &self.state_store,
601 ),
602 )
603 .await?;
604
605 room_updates.invited.insert(room_id, invited_room_update);
606 }
607
608 for (room_id, knocked_room) in response.rooms.knock {
609 let knocked_room_update = processors::room::sync_v2::update_knocked_room(
610 &mut context,
611 &room_id,
612 knocked_room,
613 self.room_info_notable_update_sender.clone(),
614 processors::notification::Notification::new(
615 &push_rules,
616 &mut notifications,
617 &self.state_store,
618 ),
619 )
620 .await?;
621
622 room_updates.knocked.insert(room_id, knocked_room_update);
623 }
624
625 global_account_data_processor.apply(&mut context, &self.state_store).await;
626
627 context.state_changes.presence = response
628 .presence
629 .events
630 .iter()
631 .filter_map(|e| {
632 let event = e.deserialize().ok()?;
633 Some((event.sender, e.clone()))
634 })
635 .collect();
636
637 context.state_changes.ambiguity_maps = ambiguity_cache.cache;
638
639 {
640 let _sync_lock = self.sync_lock().lock().await;
641
642 processors::changes::save_and_apply(
643 context,
644 &self.state_store,
645 &self.ignore_user_list_changes,
646 Some(response.next_batch.clone()),
647 )
648 .await?;
649 }
650
651 let mut context = Context::default();
652
653 processors::room::display_name::update_for_rooms(
656 &mut context,
657 &room_updates,
658 &self.state_store,
659 )
660 .await;
661
662 processors::changes::save_only(context, &self.state_store).await?;
664
665 for (room_id, member_ids) in updated_members_in_room {
666 if let Some(room) = self.get_room(&room_id) {
667 let _ =
668 room.room_member_updates_sender.send(RoomMembersUpdate::Partial(member_ids));
669 }
670 }
671
672 if enabled!(Level::INFO) {
673 info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
674 }
675
676 let response = SyncResponse {
677 rooms: room_updates,
678 presence: response.presence.events,
679 account_data: response.account_data.events,
680 to_device,
681 notifications,
682 };
683
684 Ok(response)
685 }
686
687 #[instrument(skip_all, fields(?room_id))]
699 pub async fn receive_all_members(
700 &self,
701 room_id: &RoomId,
702 request: &api::membership::get_member_events::v3::Request,
703 response: &api::membership::get_member_events::v3::Response,
704 ) -> Result<()> {
705 if request.membership.is_some() || request.not_membership.is_some() || request.at.is_some()
706 {
707 return Err(Error::InvalidReceiveMembersParameters);
711 }
712
713 let Some(room) = self.state_store.room(room_id) else {
714 return Ok(());
716 };
717
718 let mut chunk = Vec::with_capacity(response.chunk.len());
719 let mut context = Context::default();
720
721 #[cfg(feature = "e2e-encryption")]
722 let mut user_ids = BTreeSet::new();
723
724 let mut ambiguity_map: HashMap<DisplayName, BTreeSet<OwnedUserId>> = Default::default();
725
726 for raw_event in &response.chunk {
727 let member = match raw_event.deserialize() {
728 Ok(ev) => ev,
729 Err(e) => {
730 let event_id: Option<String> = raw_event.get_field("event_id").ok().flatten();
731 debug!(event_id, "Failed to deserialize member event: {e}");
732 continue;
733 }
734 };
735
736 #[cfg(feature = "e2e-encryption")]
746 match member.membership() {
747 MembershipState::Join | MembershipState::Invite => {
748 user_ids.insert(member.state_key().to_owned());
749 }
750 _ => (),
751 }
752
753 if let StateEvent::Original(e) = &member {
754 if let Some(d) = &e.content.displayname {
755 let display_name = DisplayName::new(d);
756 ambiguity_map
757 .entry(display_name)
758 .or_default()
759 .insert(member.state_key().clone());
760 }
761 }
762
763 let sync_member: SyncRoomMemberEvent = member.clone().into();
764 processors::profiles::upsert_or_delete(&mut context, room_id, &sync_member);
765
766 context
767 .state_changes
768 .state
769 .entry(room_id.to_owned())
770 .or_default()
771 .entry(member.event_type())
772 .or_default()
773 .insert(member.state_key().to_string(), raw_event.clone().cast());
774 chunk.push(member);
775 }
776
777 #[cfg(feature = "e2e-encryption")]
778 processors::e2ee::tracked_users::update(
779 self.olm_machine().await.as_ref(),
780 room.encryption_state(),
781 &user_ids,
782 )
783 .await?;
784
785 context.state_changes.ambiguity_maps.insert(room_id.to_owned(), ambiguity_map);
786
787 let _sync_lock = self.sync_lock().lock().await;
788 let mut room_info = room.clone_info();
789 room_info.mark_members_synced();
790 context.state_changes.add_room(room_info);
791
792 processors::changes::save_and_apply(
793 context,
794 &self.state_store,
795 &self.ignore_user_list_changes,
796 None,
797 )
798 .await?;
799
800 let _ = room.room_member_updates_sender.send(RoomMembersUpdate::FullReload);
801
802 Ok(())
803 }
804
805 pub async fn receive_filter_upload(
821 &self,
822 filter_name: &str,
823 response: &api::filter::create_filter::v3::Response,
824 ) -> Result<()> {
825 Ok(self
826 .state_store
827 .set_kv_data(
828 StateStoreDataKey::Filter(filter_name),
829 StateStoreDataValue::Filter(response.filter_id.clone()),
830 )
831 .await?)
832 }
833
834 pub async fn get_filter(&self, filter_name: &str) -> StoreResult<Option<String>> {
846 let filter = self
847 .state_store
848 .get_kv_data(StateStoreDataKey::Filter(filter_name))
849 .await?
850 .map(|d| d.into_filter().expect("State store data not a filter"));
851
852 Ok(filter)
853 }
854
/// Builds the to-device requests that share a room key with the members of
/// the given room.
///
/// When the room's history visibility is `Joined`, only joined members are
/// targeted; otherwise the `ACTIVE` membership filter is used.
///
/// # Errors
///
/// * `Error::OlmError(OlmError::MissingSession)` if no `OlmMachine` has
///   been created yet.
/// * `Error::InsufficientData` if the room is unknown.
/// * `Error::EncryptionNotEnabled` if the room has no encryption settings.
/// * Any error raised by the state store or by the `OlmMachine`.
#[cfg(feature = "e2e-encryption")]
pub async fn share_room_key(&self, room_id: &RoomId) -> Result<Vec<Arc<ToDeviceRequest>>> {
    let olm_guard = self.olm_machine().await;

    // Report a missing `OlmMachine` as an error rather than panicking: it is
    // a recoverable condition, and `regenerate_olm` uses the same error for a
    // client with no session.
    let Some(olm) = olm_guard.as_ref() else {
        return Err(Error::OlmError(OlmError::MissingSession));
    };

    let Some(room) = self.get_room(room_id) else {
        return Err(Error::InsufficientData);
    };

    let history_visibility = room.history_visibility_or_default();
    let Some(room_encryption_event) = room.encryption_settings() else {
        return Err(Error::EncryptionNotEnabled);
    };

    let filter = if history_visibility == HistoryVisibility::Joined {
        RoomMemberships::JOIN
    } else {
        RoomMemberships::ACTIVE
    };

    let members = self.state_store.get_user_ids(room_id, filter).await?;

    let settings = EncryptionSettings::new(
        room_encryption_event,
        history_visibility,
        self.room_key_recipient_strategy.clone(),
    );

    Ok(olm.share_room_key(room_id, members.iter().map(Deref::deref), settings).await?)
}
890
891 pub fn get_room(&self, room_id: &RoomId) -> Option<Room> {
897 self.state_store.room(room_id)
898 }
899
900 pub async fn forget_room(&self, room_id: &RoomId) -> Result<()> {
908 self.state_store.forget_room(room_id).await?;
910
911 self.event_cache_store().lock().await?.remove_room(room_id).await?;
913
914 Ok(())
915 }
916
/// Acquires a read guard over the optional `OlmMachine`.
///
/// The guarded value is `None` until [`BaseClient::regenerate_olm`] has
/// installed a machine.
#[cfg(feature = "e2e-encryption")]
pub async fn olm_machine(&self) -> RwLockReadGuard<'_, Option<OlmMachine>> {
    let read_guard = self.olm_machine.read().await;
    read_guard
}
922
923 pub(crate) async fn get_push_rules(
929 &self,
930 global_account_data_processor: &processors::account_data::Global,
931 ) -> Result<Ruleset> {
932 if let Some(event) = global_account_data_processor
933 .push_rules()
934 .and_then(|ev| ev.deserialize_as::<PushRulesEvent>().ok())
935 {
936 Ok(event.content.global)
937 } else if let Some(event) = self
938 .state_store
939 .get_account_data_event_static::<PushRulesEventContent>()
940 .await?
941 .and_then(|ev| ev.deserialize().ok())
942 {
943 Ok(event.content.global)
944 } else if let Some(session_meta) = self.state_store.session_meta() {
945 Ok(Ruleset::server_default(&session_meta.user_id))
946 } else {
947 Ok(Ruleset::new())
948 }
949 }
950
951 pub fn subscribe_to_ignore_user_list_changes(&self) -> Subscriber<Vec<String>> {
954 self.ignore_user_list_changes.subscribe()
955 }
956
957 pub fn room_info_notable_update_receiver(&self) -> broadcast::Receiver<RoomInfoNotableUpdate> {
961 self.room_info_notable_update_sender.subscribe()
962 }
963}
964
965#[derive(Debug, Default)]
977pub struct RequestedRequiredStates {
978 default: Vec<(StateEventType, String)>,
979 for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
980}
981
982impl RequestedRequiredStates {
983 pub fn new(
988 default: Vec<(StateEventType, String)>,
989 for_rooms: HashMap<OwnedRoomId, Vec<(StateEventType, String)>>,
990 ) -> Self {
991 Self { default, for_rooms }
992 }
993
994 pub fn for_room(&self, room_id: &RoomId) -> &[(StateEventType, String)] {
996 self.for_rooms.get(room_id).unwrap_or(&self.default)
997 }
998}
999
1000impl From<&v5::Request> for RequestedRequiredStates {
1001 fn from(request: &v5::Request) -> Self {
1002 let mut default = BTreeSet::new();
1009
1010 for list in request.lists.values() {
1011 default.extend(BTreeSet::from_iter(list.room_details.required_state.iter().cloned()));
1012 }
1013
1014 for room_subscription in request.room_subscriptions.values() {
1015 default.extend(BTreeSet::from_iter(room_subscription.required_state.iter().cloned()));
1016 }
1017
1018 Self { default: default.into_iter().collect(), for_rooms: HashMap::new() }
1019 }
1020}
1021
1022#[cfg(test)]
1023mod tests {
1024 use std::collections::HashMap;
1025
1026 use assert_matches2::assert_let;
1027 use futures_util::FutureExt as _;
1028 use matrix_sdk_test::{
1029 async_test, event_factory::EventFactory, ruma_response_from_json, InvitedRoomBuilder,
1030 LeftRoomBuilder, StateTestEvent, StrippedStateTestEvent, SyncResponseBuilder, BOB,
1031 };
1032 use ruma::{
1033 api::client::{self as api, sync::sync_events::v5},
1034 event_id,
1035 events::{room::member::MembershipState, StateEventType},
1036 room_id,
1037 serde::Raw,
1038 user_id,
1039 };
1040 use serde_json::{json, value::to_raw_value};
1041
1042 use super::{BaseClient, RequestedRequiredStates};
1043 use crate::{
1044 store::{RoomLoadSettings, StateStoreExt, StoreConfig},
1045 test_utils::logged_in_base_client,
1046 RoomDisplayName, RoomState, SessionMeta,
1047 };
1048
/// `for_room` returns the room-specific entries when they exist and the
/// default entries otherwise.
#[test]
fn test_requested_required_states() {
    let room_with_override = room_id!("!r0");
    let room_without_override = room_id!("!r1");

    let default_states = vec![(StateEventType::RoomAvatar, "".to_owned())];
    let per_room_states = HashMap::from([(
        room_with_override.to_owned(),
        vec![
            (StateEventType::RoomMember, "foo".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ],
    )]);

    let requested_required_states = RequestedRequiredStates::new(default_states, per_room_states);

    // The first room has its own entry.
    assert_eq!(
        requested_required_states.for_room(room_with_override),
        &[
            (StateEventType::RoomMember, "foo".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ]
    );

    // The second room has none, so the default is returned.
    assert_eq!(
        requested_required_states.for_room(room_without_override),
        &[(StateEventType::RoomAvatar, "".to_owned()),]
    );
}
1080
/// Converting a sync v5 request merges the `required_state` of all lists and
/// room subscriptions into `default`, sorted and without duplicates.
#[test]
fn test_requested_required_states_from_sync_v5_request() {
    let room_id_0 = room_id!("!r0");
    let room_id_1 = room_id!("!r1");

    // Start from an empty request.
    let mut request = v5::Request::new();

    {
        let states = RequestedRequiredStates::from(&request);

        assert!(states.default.is_empty());
        assert!(states.for_rooms.is_empty());
    }

    // Add a first list.
    request.lists.insert("foo".to_owned(), {
        let mut list = v5::request::List::default();
        list.room_details.required_state = vec![
            (StateEventType::RoomAvatar, "".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ];

        list
    });

    {
        let states = RequestedRequiredStates::from(&request);

        assert_eq!(
            states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned())
            ]
        );
        assert!(states.for_rooms.is_empty());
    }

    // Add a second list that overlaps with the first one.
    request.lists.insert("bar".to_owned(), {
        let mut list = v5::request::List::default();
        list.room_details.required_state = vec![
            (StateEventType::RoomEncryption, "".to_owned()),
            (StateEventType::RoomName, "".to_owned()),
        ];

        list
    });

    {
        let states = RequestedRequiredStates::from(&request);

        // The shared `RoomEncryption` entry appears only once.
        assert_eq!(
            states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
            ]
        );
        assert!(states.for_rooms.is_empty());
    }

    // Add a first room subscription.
    request.room_subscriptions.insert(room_id_0.to_owned(), {
        let mut room_subscription = v5::request::RoomSubscription::default();

        room_subscription.required_state = vec![
            (StateEventType::RoomJoinRules, "".to_owned()),
            (StateEventType::RoomEncryption, "".to_owned()),
        ];

        room_subscription
    });

    {
        let states = RequestedRequiredStates::from(&request);

        // Subscription entries are merged into `default`, not `for_rooms`.
        assert_eq!(
            states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomJoinRules, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
            ]
        );
        assert!(states.for_rooms.is_empty());
    }

    // Add a second room subscription.
    request.room_subscriptions.insert(room_id_1.to_owned(), {
        let mut room_subscription = v5::request::RoomSubscription::default();

        room_subscription.required_state = vec![
            (StateEventType::RoomName, "".to_owned()),
            (StateEventType::RoomTopic, "".to_owned()),
        ];

        room_subscription
    });

    {
        let states = RequestedRequiredStates::from(&request);

        assert_eq!(
            states.default,
            &[
                (StateEventType::RoomAvatar, "".to_owned()),
                (StateEventType::RoomEncryption, "".to_owned()),
                (StateEventType::RoomJoinRules, "".to_owned()),
                (StateEventType::RoomName, "".to_owned()),
                (StateEventType::RoomTopic, "".to_owned()),
            ]
        );
    }
}
1202
1203 #[async_test]
1204 async fn test_invite_after_leaving() {
1205 let user_id = user_id!("@alice:example.org");
1206 let room_id = room_id!("!test:example.org");
1207
1208 let client = logged_in_base_client(Some(user_id)).await;
1209
1210 let mut sync_builder = SyncResponseBuilder::new();
1211
1212 let response = sync_builder
1213 .add_left_room(
1214 LeftRoomBuilder::new(room_id).add_timeline_event(
1215 EventFactory::new()
1216 .member(user_id)
1217 .membership(MembershipState::Leave)
1218 .display_name("Alice")
1219 .event_id(event_id!("$994173582443PhrSn:example.org")),
1220 ),
1221 )
1222 .build_sync_response();
1223 client.receive_sync_response(response).await.unwrap();
1224 assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
1225
1226 let response = sync_builder
1227 .add_invited_room(InvitedRoomBuilder::new(room_id).add_state_event(
1228 StrippedStateTestEvent::Custom(json!({
1229 "content": {
1230 "displayname": "Alice",
1231 "membership": "invite",
1232 },
1233 "event_id": "$143273582443PhrSn:example.org",
1234 "origin_server_ts": 1432735824653u64,
1235 "sender": "@example:example.org",
1236 "state_key": user_id,
1237 "type": "m.room.member",
1238 })),
1239 ))
1240 .build_sync_response();
1241 client.receive_sync_response(response).await.unwrap();
1242 assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
1243 }
1244
1245 #[async_test]
1246 async fn test_invite_displayname() {
1247 let user_id = user_id!("@alice:example.org");
1248 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1249
1250 let client = logged_in_base_client(Some(user_id)).await;
1251
1252 let response = ruma_response_from_json(&json!({
1253 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1254 "device_one_time_keys_count": {
1255 "signed_curve25519": 50u64
1256 },
1257 "device_unused_fallback_key_types": [
1258 "signed_curve25519"
1259 ],
1260 "rooms": {
1261 "invite": {
1262 "!ithpyNKDtmhneaTQja:example.org": {
1263 "invite_state": {
1264 "events": [
1265 {
1266 "content": {
1267 "creator": "@test:example.org",
1268 "room_version": "9"
1269 },
1270 "sender": "@test:example.org",
1271 "state_key": "",
1272 "type": "m.room.create"
1273 },
1274 {
1275 "content": {
1276 "join_rule": "invite"
1277 },
1278 "sender": "@test:example.org",
1279 "state_key": "",
1280 "type": "m.room.join_rules"
1281 },
1282 {
1283 "content": {
1284 "algorithm": "m.megolm.v1.aes-sha2"
1285 },
1286 "sender": "@test:example.org",
1287 "state_key": "",
1288 "type": "m.room.encryption"
1289 },
1290 {
1291 "content": {
1292 "avatar_url": "mxc://example.org/dcBBDwuWEUrjfrOchvkirUST",
1293 "displayname": "Kyra",
1294 "membership": "join"
1295 },
1296 "sender": "@test:example.org",
1297 "state_key": "@test:example.org",
1298 "type": "m.room.member"
1299 },
1300 {
1301 "content": {
1302 "avatar_url": "mxc://example.org/ABFEXSDrESxovWwEnCYdNcHT",
1303 "displayname": "alice",
1304 "is_direct": true,
1305 "membership": "invite"
1306 },
1307 "origin_server_ts": 1650878657984u64,
1308 "sender": "@test:example.org",
1309 "state_key": "@alice:example.org",
1310 "type": "m.room.member",
1311 "unsigned": {
1312 "age": 14u64
1313 },
1314 "event_id": "$fLDqltg9Puj-kWItLSFVHPGN4YkgpYQf2qImPzdmgrE"
1315 }
1316 ]
1317 }
1318 }
1319 }
1320 }
1321 }));
1322
1323 client.receive_sync_response(response).await.unwrap();
1324
1325 let room = client.get_room(room_id).expect("Room not found");
1326 assert_eq!(room.state(), RoomState::Invited);
1327 assert_eq!(
1328 room.compute_display_name().await.expect("fetching display name failed").into_inner(),
1329 RoomDisplayName::Calculated("Kyra".to_owned())
1330 );
1331 }
1332
1333 #[async_test]
1334 async fn test_deserialization_failure() {
1335 let user_id = user_id!("@alice:example.org");
1336 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1337
1338 let client =
1339 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1340 client
1341 .activate(
1342 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1343 RoomLoadSettings::default(),
1344 #[cfg(feature = "e2e-encryption")]
1345 None,
1346 )
1347 .await
1348 .unwrap();
1349
1350 let response = ruma_response_from_json(&json!({
1351 "next_batch": "asdkl;fjasdkl;fj;asdkl;f",
1352 "rooms": {
1353 "join": {
1354 "!ithpyNKDtmhneaTQja:example.org": {
1355 "state": {
1356 "events": [
1357 {
1358 "invalid": "invalid",
1359 },
1360 {
1361 "content": {
1362 "name": "The room name"
1363 },
1364 "event_id": "$143273582443PhrSn:example.org",
1365 "origin_server_ts": 1432735824653u64,
1366 "room_id": "!jEsUZKDJdhlrceRyVU:example.org",
1367 "sender": "@example:example.org",
1368 "state_key": "",
1369 "type": "m.room.name",
1370 "unsigned": {
1371 "age": 1234
1372 }
1373 },
1374 ]
1375 }
1376 }
1377 }
1378 }
1379 }));
1380
1381 client.receive_sync_response(response).await.unwrap();
1382 client
1383 .state_store()
1384 .get_state_event_static::<ruma::events::room::name::RoomNameEventContent>(room_id)
1385 .await
1386 .expect("Failed to fetch state event")
1387 .expect("State event not found")
1388 .deserialize()
1389 .expect("Failed to deserialize state event");
1390 }
1391
1392 #[async_test]
1393 async fn test_invited_members_arent_ignored() {
1394 let user_id = user_id!("@alice:example.org");
1395 let inviter_user_id = user_id!("@bob:example.org");
1396 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1397
1398 let client =
1399 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1400 client
1401 .activate(
1402 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1403 RoomLoadSettings::default(),
1404 #[cfg(feature = "e2e-encryption")]
1405 None,
1406 )
1407 .await
1408 .unwrap();
1409
1410 let mut sync_builder = SyncResponseBuilder::new();
1412 let response = sync_builder
1413 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id))
1414 .build_sync_response();
1415 client.receive_sync_response(response).await.unwrap();
1416
1417 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1420
1421 let raw_member_event = json!({
1422 "content": {
1423 "avatar_url": "mxc://localhost/fewjilfewjil42",
1424 "displayname": "Invited Alice",
1425 "membership": "invite"
1426 },
1427 "event_id": "$151800140517rfvjc:localhost",
1428 "origin_server_ts": 151800140,
1429 "room_id": room_id,
1430 "sender": inviter_user_id,
1431 "state_key": user_id,
1432 "type": "m.room.member",
1433 "unsigned": {
1434 "age": 13374242,
1435 }
1436 });
1437 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1438 to_raw_value(&raw_member_event).unwrap(),
1439 )]);
1440
1441 client.receive_all_members(room_id, &request, &response).await.unwrap();
1443
1444 let room = client.get_room(room_id).unwrap();
1445
1446 let member = room.get_member(user_id).await.expect("ok").expect("exists");
1448
1449 assert_eq!(member.user_id(), user_id);
1450 assert_eq!(member.display_name().unwrap(), "Invited Alice");
1451 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1452 }
1453
1454 #[async_test]
1455 async fn test_reinvited_members_get_a_display_name() {
1456 let user_id = user_id!("@alice:example.org");
1457 let inviter_user_id = user_id!("@bob:example.org");
1458 let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
1459
1460 let client =
1461 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1462 client
1463 .activate(
1464 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1465 RoomLoadSettings::default(),
1466 #[cfg(feature = "e2e-encryption")]
1467 None,
1468 )
1469 .await
1470 .unwrap();
1471
1472 let mut sync_builder = SyncResponseBuilder::new();
1474 let response = sync_builder
1475 .add_joined_room(matrix_sdk_test::JoinedRoomBuilder::new(room_id).add_state_event(
1476 StateTestEvent::Custom(json!({
1477 "content": {
1478 "avatar_url": null,
1479 "displayname": null,
1480 "membership": "leave"
1481 },
1482 "event_id": "$151803140217rkvjc:localhost",
1483 "origin_server_ts": 151800139,
1484 "room_id": room_id,
1485 "sender": user_id,
1486 "state_key": user_id,
1487 "type": "m.room.member",
1488 })),
1489 ))
1490 .build_sync_response();
1491 client.receive_sync_response(response).await.unwrap();
1492
1493 let request = api::membership::get_member_events::v3::Request::new(room_id.to_owned());
1495
1496 let raw_member_event = json!({
1497 "content": {
1498 "avatar_url": "mxc://localhost/fewjilfewjil42",
1499 "displayname": "Invited Alice",
1500 "membership": "invite"
1501 },
1502 "event_id": "$151800140517rfvjc:localhost",
1503 "origin_server_ts": 151800140,
1504 "room_id": room_id,
1505 "sender": inviter_user_id,
1506 "state_key": user_id,
1507 "type": "m.room.member",
1508 "unsigned": {
1509 "age": 13374242,
1510 }
1511 });
1512 let response = api::membership::get_member_events::v3::Response::new(vec![Raw::from_json(
1513 to_raw_value(&raw_member_event).unwrap(),
1514 )]);
1515
1516 client.receive_all_members(room_id, &request, &response).await.unwrap();
1518
1519 let room = client.get_room(room_id).unwrap();
1520
1521 let member = room.get_member(user_id).await.expect("ok").expect("exists");
1523
1524 assert_eq!(member.user_id(), user_id);
1525 assert_eq!(member.display_name().unwrap(), "Invited Alice");
1526 assert_eq!(member.avatar_url().unwrap().to_string(), "mxc://localhost/fewjilfewjil42");
1527 }
1528
1529 #[async_test]
1530 async fn test_ignored_user_list_changes() {
1531 let user_id = user_id!("@alice:example.org");
1532 let client =
1533 BaseClient::new(StoreConfig::new("cross-process-store-locks-holder-name".to_owned()));
1534
1535 client
1536 .activate(
1537 SessionMeta { user_id: user_id.to_owned(), device_id: "FOOBAR".into() },
1538 RoomLoadSettings::default(),
1539 #[cfg(feature = "e2e-encryption")]
1540 None,
1541 )
1542 .await
1543 .unwrap();
1544
1545 let mut subscriber = client.subscribe_to_ignore_user_list_changes();
1546 assert!(subscriber.next().now_or_never().is_none());
1547
1548 let mut sync_builder = SyncResponseBuilder::new();
1549 let response = sync_builder
1550 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1551 json!({
1552 "content": {
1553 "ignored_users": {
1554 *BOB: {}
1555 }
1556 },
1557 "type": "m.ignored_user_list",
1558 }),
1559 ))
1560 .build_sync_response();
1561 client.receive_sync_response(response).await.unwrap();
1562
1563 assert_let!(Some(ignored) = subscriber.next().await);
1564 assert_eq!(ignored, [BOB.to_string()]);
1565
1566 let response = sync_builder
1568 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1569 json!({
1570 "content": {
1571 "ignored_users": {
1572 *BOB: {}
1573 }
1574 },
1575 "type": "m.ignored_user_list",
1576 }),
1577 ))
1578 .build_sync_response();
1579 client.receive_sync_response(response).await.unwrap();
1580
1581 assert!(subscriber.next().now_or_never().is_none());
1583
1584 let response = sync_builder
1586 .add_global_account_data_event(matrix_sdk_test::GlobalAccountDataTestEvent::Custom(
1587 json!({
1588 "content": {
1589 "ignored_users": {}
1590 },
1591 "type": "m.ignored_user_list",
1592 }),
1593 ))
1594 .build_sync_response();
1595 client.receive_sync_response(response).await.unwrap();
1596
1597 assert_let!(Some(ignored) = subscriber.next().await);
1598 assert!(ignored.is_empty());
1599 }
1600}