Skip to content

streams: Handle adding/removing stream events. #851

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def logged_on_user():
'name': 'Some general stream',
'invite_only': False,
'color': '#b0a5fd', # Color in '#xxxxxx' format
'pin_to_top': False,
'pin_to_top': True,
'stream_id': 1000,
'in_home_view': True,
'audible_notifications': False,
Expand Down
186 changes: 183 additions & 3 deletions tests/model/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from zulip import ZulipError

from zulipterminal.helper import initial_index, powerset
from zulipterminal.model import Model, ServerConnectionFailure
from zulipterminal.model import Model, ServerConnectionFailure, sort_streams


class TestModel:
Expand Down Expand Up @@ -46,14 +46,34 @@ def model(self, mocker, initial_data, user_profile,
model = Model(self.controller)
return model

@pytest.fixture
def initial_pinned_streams(self, model, stream_dict):
pinned_streams = []
for _, stream in stream_dict.items():
if stream['pin_to_top']:
pinned_streams.append(model.make_reduced_stream_data(stream))
return pinned_streams

@pytest.fixture
def initial_unpinned_streams(self, model, stream_dict):
pinned_streams = []
for _, stream in stream_dict.items():
if not stream['pin_to_top']:
pinned_streams.append(model.make_reduced_stream_data(stream))
return pinned_streams

def test_init(self, model, initial_data, user_profile,
unicode_emojis, custom_emojis, stream_dict):
unicode_emojis, custom_emojis, stream_dict,
initial_pinned_streams, initial_unpinned_streams):
assert hasattr(model, 'controller')
assert hasattr(model, 'client')
assert model.narrow == []
assert model._have_last_message == {}
assert model.stream_id is None
assert model.stream_dict == stream_dict
assert model.pinned_streams == initial_pinned_streams
assert model.unpinned_streams == initial_unpinned_streams
assert model.subscribed_streams == {1, 2, 99, 1000}
assert model.recipients == frozenset()
assert model.index == initial_index
model.get_messages.assert_called_once_with(num_before=30,
Expand Down Expand Up @@ -777,7 +797,7 @@ def test__subscribe_to_streams(self, initial_data, muted, model):
assert all(msg_id == msg['stream_id']
for msg_id, msg in model.stream_dict.items())
assert model.muted_streams == muted
assert model.pinned_streams == [] # FIXME generalize/parametrize
assert len(model.pinned_streams) # FIXME generalize/parametrize
assert len(model.unpinned_streams) # FIXME generalize/parametrize

def test__handle_message_event_with_Falsey_log(self, mocker,
Expand Down Expand Up @@ -2001,6 +2021,30 @@ def test_is_user_subscribed_to_stream(self, model, stream_dict, stream_id,

assert return_value == expected_response

@pytest.mark.parametrize('stream_id, expected_response,', [
(99, False),
(101, True),
(2, False),
],
ids=[
'subscribed_pinned_stream',
'subscribed_unpinned_stream',
'unsubscribed_stream',
]
)
def test_is_user_in_unsubscribed_stream(self, model,
stream_id, expected_response,
initial_pinned_streams,
initial_unpinned_streams):

model.pinned_streams = deepcopy(initial_pinned_streams)
model.unpinned_streams = deepcopy(initial_unpinned_streams)
return_value = model.is_user_in_unsubscribed_stream(stream_id)

assert model.pinned_streams == initial_pinned_streams
assert model.unpinned_streams == initial_unpinned_streams
assert return_value == expected_response

@pytest.mark.parametrize('response', [{
'result': 'success',
'msg': '',
Expand Down Expand Up @@ -2089,6 +2133,142 @@ def test_fetch_custom_emojis(self, mocker, model, custom_emojis,

assert fetched_custom_emojis == custom_emojis

@pytest.mark.parametrize('event', [
{
'op': 'add',
'type': 'subscription',
'subscriptions': [
{
'name': 'all',
'stream_id': 1,
'description': '',
'color': '#95a5fd',
'pin_to_top': False,
'invite_only': False,
'in_home_view': True
}
]
},
{
'op': 'add',
'type': 'subscription',
'subscriptions': [
{
'name': 'design',
'stream_id': 2,
'description': '',
'color': '#95a5fd',
'pin_to_top': True,
'invite_only': False,
'in_home_view': True
}
]
},
{
'op': 'add',
'type': 'subscription',
'subscriptions': [
{
'name': 'all',
'stream_id': 3,
'description': '',
'color': '#95a5fd',
'pin_to_top': True,
'invite_only': False,
'in_home_view': False
}
]
},
], ids=[
'add_1',
'add_and_pin_2',
'add_and_mute_3'
])
def test__handle_subscription_event_add_stream(
self, model, mocker,
event,
initial_pinned_streams,
initial_unpinned_streams
):

model.pinned_streams = deepcopy(initial_pinned_streams)
model.unpinned_streams = deepcopy(initial_unpinned_streams)
model._handle_subscription_event(event)
is_stream_pinned = event['subscriptions'][0]['pin_to_top']
if is_stream_pinned:
new_pinned_stream = (model
.make_reduced_stream_data(
event['subscriptions'][0]))
expected_pinned_streams = (initial_pinned_streams
+ [new_pinned_stream])
expected_unpinned_streams = initial_unpinned_streams
sort_streams(expected_pinned_streams)
else:
new_unpinned_stream = (model
.make_reduced_stream_data(
event['subscriptions'][0]))
expected_unpinned_streams = (initial_unpinned_streams
+ [new_unpinned_stream])
expected_pinned_streams = initial_pinned_streams
sort_streams(expected_unpinned_streams)

assert model.pinned_streams == expected_pinned_streams
assert model.unpinned_streams == expected_unpinned_streams

(model.controller.view.left_panel.update_stream_view
.assert_called_once_with())
model.controller.update_screen.assert_called_once_with()

@pytest.mark.parametrize('event', [
{
'op': 'remove',
'type': 'subscription',
'subscriptions': [{
'name': 'Secret stream',
'stream_id': 99
}]
},
{
'op': 'remove',
'type': 'subscription',
'subscriptions': [{
'name': 'Some general stream',
'stream_id': 1000
}]
},
], ids=[
'remove_99',
'remove_1000'
])
def test__handle_subscription_event_remove_stream(
self, model, mocker,
event,
initial_pinned_streams,
initial_unpinned_streams,
stream_dict
):
model.pinned_streams = deepcopy(initial_pinned_streams)
model.unpinned_streams = deepcopy(initial_unpinned_streams)
model.stream_dict = stream_dict
stream_id = event['subscriptions'][0]['stream_id']
if model.is_pinned_stream(stream_id):
stream = model._get_stream_by_id(initial_pinned_streams, stream_id)
initial_pinned_streams.remove(stream)
else:
stream = (model
._get_stream_by_id(initial_unpinned_streams, stream_id))
initial_unpinned_streams.remove(stream)
model._handle_subscription_event(event)
expected_pinned_streams = initial_pinned_streams
expected_unpinned_streams = initial_unpinned_streams

assert model.pinned_streams == expected_pinned_streams
assert model.unpinned_streams == expected_unpinned_streams

update_left_panel = model.controller.view.left_panel.update_stream_view
update_left_panel.assert_called_once_with()
model.controller.update_screen.assert_called_once_with()

# Use LoopEnder with raising_event to cause the event loop to end without
# processing the event
class LoopEnder(Exception):
Expand Down
7 changes: 6 additions & 1 deletion tests/ui/test_ui_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def mock_external_classes(self, mocker):
self.model = mocker.MagicMock()
self.view = mocker.Mock()
self.urwid = mocker.patch(VIEWS + ".urwid")
self.model.controller.check_for_invalid_operation.return_value = False

@pytest.fixture
def msg_view(self, mocker, msg_box):
Expand Down Expand Up @@ -820,6 +821,7 @@ def mock_external_classes(self, mocker):
self.super = mocker.patch(VIEWS + '.urwid.Frame.__init__')
self.super_keypress = mocker.patch(VIEWS + '.urwid.Frame.keypress')
self.model.controller == mocker.Mock()
self.model.controller.check_for_invalid_operation.return_value = False

@pytest.fixture
def mid_col_view(self):
Expand Down Expand Up @@ -1266,6 +1268,7 @@ class TestMessageBox:
def mock_external_classes(self, mocker, initial_index):
self.model = mocker.MagicMock()
self.model.index = initial_index
self.model.controller.check_for_invalid_operation.return_value = False

@pytest.mark.parametrize('message_type, set_fields', [
('stream',
Expand Down Expand Up @@ -1875,7 +1878,9 @@ def test_main_view_generates_EDITED_label(self, mocker,
])
def test_keypress_STREAM_MESSAGE(self, mocker, msg_box, widget_size,
narrow, expect_to_prefill, key):
write_box = msg_box.model.controller.view.write_box
controller = msg_box.model.controller
controller.check_for_invalid_operation.return_value = False
write_box = controller.view.write_box
msg_box.model.narrow = narrow
size = widget_size(msg_box)

Expand Down
1 change: 1 addition & 0 deletions tests/ui_tools/test_boxes.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class TestWriteBox:
def mock_external_classes(self, mocker, initial_index):
self.view = mocker.Mock()
self.view.model = mocker.Mock()
self.view.model.is_user_in_unsubscribed_stream.return_value = False

@pytest.fixture()
def write_box(self, mocker, users_fixture, user_groups_fixture,
Expand Down
1 change: 1 addition & 0 deletions zulipterminal/api_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ class SubscriptionEvent(TypedDict):

value: bool
message_ids: List[int] # Present when subject of msg(s) is updated
subscriptions: List[Subscription] # Present when stream is added/removed


class TypingEvent(TypedDict):
Expand Down
8 changes: 8 additions & 0 deletions zulipterminal/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ def raise_exception_in_main_thread(self,
self._critical_exception = True
os.write(self._exception_pipe, b'1')

def check_for_invalid_operation(self, stream_id: int) -> bool:
if self.model.is_user_in_unsubscribed_stream(stream_id):
self.model.controller.view.set_footer_text(
" This is not allowed in unsubscribed streams.",
3)
return True
return False

def is_in_editor_mode(self) -> bool:
return self._editor is not None

Expand Down
Loading