Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
add cursors_by_topic
Browse files Browse the repository at this point in the history
  • Loading branch information
DyrellC committed Jul 12, 2022
1 parent 7833835 commit 18aa0d8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 3 deletions.
4 changes: 4 additions & 0 deletions streams/src/api/cursor_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ impl CursorStore {
.flat_map(|(topic, branch)| branch.cursors.iter().map(move |(id, cursor)| (topic, id, *cursor)))
}

pub(crate) fn cursors_by_topic(&self, topic: &Topic) -> Option<impl Iterator<Item = (&Identifier, &usize)>> {
self.0.get(topic).map(|inner| inner.cursors.iter())
}

pub(crate) fn insert_cursor(&mut self, topic: &Topic, id: Identifier, cursor: usize) -> Option<usize> {
if let Some(branch) = self.0.get_mut(topic) {
return branch.cursors.insert(id, cursor);
Expand Down
12 changes: 9 additions & 3 deletions streams/src/api/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ impl<T> User<T> {
self.state.cursor_store.cursors()
}

fn cursors_by_topic(&self, topic: &Topic) -> Result<impl Iterator<Item = (&Identifier, &usize)>> {
self.state
.cursor_store
.cursors_by_topic(topic)
.ok_or_else(|| anyhow!("previous topic {} not found in store", topic))
}

pub fn subscribers(&self) -> impl Iterator<Item = &Identifier> + Clone + '_ {
self.state.exchange_keys.keys()
}
Expand Down Expand Up @@ -298,9 +305,8 @@ impl<T> User<T> {
self.state.cursor_store.new_branch(new_topic.clone());
// Collect permissions from previous branch and clone them into new branch
let prev_permissions = self
.cursors()
.filter(|cursor| cursor.0 == &prev_topic)
.map(|(_, id, _)| id.clone())
.cursors_by_topic(&prev_topic)?
.map(|(id, _)| id.clone())
.collect::<Vec<Identifier>>();
for id in prev_permissions {
self.state.cursor_store.insert_cursor(new_topic, id, INIT_MESSAGE_NUM);
Expand Down

0 comments on commit 18aa0d8

Please sign in to comment.