diff --git a/examples/word_count.py b/examples/word_count.py index ef20b2419..dafb1b98a 100755 --- a/examples/word_count.py +++ b/examples/word_count.py @@ -24,12 +24,13 @@ async def shuffle_words(posts): for word in post.split(): await count_words.send(key=word, value=word) - +last_count = {w:0 for w in WORDS} @app.agent(value_type=str) async def count_words(words): """Count words from blog post article body.""" async for word in words: word_counts[word] += 1 + last_count[word] = word_counts[word] @app.page('/count/{word}/') @@ -39,6 +40,12 @@ async def get_count(web, request, word): word: word_counts[word], }) +@app.page('/last/{word}/') +@app.topic_route(topic=posts_topic, match_info='word') +async def get_last(web, request, word): + return web.json({ + word: last_count, + }) @app.task async def sender(): diff --git a/faust/app/base.py b/faust/app/base.py index 4c6850baa..743a7c474 100644 --- a/faust/app/base.py +++ b/faust/app/base.py @@ -1354,6 +1354,50 @@ async def get( return _decorator + def topic_route( + self, + topic: CollectionT, + shard_param: str = None, + *, + query_param: str = None, + match_info: str = None, + exact_key: str = None, + ) -> ViewDecorator: + """Decorate view method to route request to a topic partition destination.""" + + def _decorator(fun: ViewHandlerFun) -> ViewHandlerFun: + _query_param = query_param + if shard_param is not None: + warnings.warn(DeprecationWarning(W_DEPRECATED_SHARD_PARAM)) + if query_param: + raise TypeError("Cannot specify shard_param and query_param") + _query_param = shard_param + if _query_param is None and match_info is None and exact_key is None: + raise TypeError("Need one of query_param, shard_param, or exact key") + + @wraps(fun) + async def get( + view: View, request: Request, *args: Any, **kwargs: Any + ) -> Response: + if exact_key: + key = exact_key + elif match_info: + key = request.match_info[match_info] + elif _query_param: + key = request.query[_query_param] + else: # pragma: no cover + raise Exception("cannot get here") + try: + return await self.router.route_topic_req( + topic, key, view.web, request + ) + except SameNode: + return await fun(view, request, *args, **kwargs) # type: ignore + + return get + + return _decorator + def command( self, *options: Any, base: Optional[Type[_AppCommand]] = None, **kwargs: Any ) -> Callable[[Callable], Type[_AppCommand]]: diff --git a/faust/app/router.py b/faust/app/router.py index 5bf0a7c60..3adc64ff8 100644 --- a/faust/app/router.py +++ b/faust/app/router.py @@ -9,6 +9,7 @@ from faust.types.core import K from faust.types.router import HostToPartitionMap, RouterT from faust.types.tables import CollectionT +from faust.types.topics import TopicT from faust.types.web import Request, Response, Web from faust.web.exceptions import ServiceUnavailable @@ -29,6 +30,12 @@ def key_store(self, table_name: str, key: K) -> URL: k = self._get_serialized_key(table, key) return self._assignor.key_store(topic, k) + def external_topic_key_store(self, topic: TopicT, key: K) -> URL: + """Return the URL of web server that processes the key in a topics.""" + topic_name = topic.get_topic_name() + k = topic.prepare_key(key, None)[0] + return self._assignor.external_key_store(topic_name, k) + def table_metadata(self, table_name: str) -> HostToPartitionMap: """Return metadata stored for table in the partition assignor.""" table = self._get_table(table_name) @@ -39,6 +46,10 @@ def tables_metadata(self) -> HostToPartitionMap: """Return metadata stored for all tables in the partition assignor.""" return self._assignor.tables_metadata() + def external_topics_metadata(self) -> HostToPartitionMap: + """Return metadata stored for all external topics in the partition assignor.""" + return self._assignor.external_topics_metadata() + @classmethod def _get_table_topic(cls, table: CollectionT) -> str: return table.changelog_topic.get_topic_name() @@ -50,6 +61,19 @@ def _get_serialized_key(cls, table: CollectionT, key: K) -> bytes: def _get_table(self, name: str) -> CollectionT: return self.app.tables[name] + async def _route_req(self, dest_url: URL, web: Web, request: Request) -> Response: + app = self.app + dest_ident = (host, port) = self._urlident(dest_url) + if dest_ident == self._urlident(app.conf.canonical_url): + raise SameNode() + routed_url = request.url.with_host(host).with_port(int(port)) + async with app.http_client.request(request.method, routed_url) as response: + return web.text( + await response.text(), + content_type=response.content_type, + status=response.status, + ) + async def route_req( self, table_name: str, key: K, web: Web, request: Request ) -> Response: @@ -66,16 +90,27 @@ async def route_req( dest_url: URL = app.router.key_store(table_name, key) except KeyError: raise ServiceUnavailable() - dest_ident = (host, port) = self._urlident(dest_url) - if dest_ident == self._urlident(app.conf.canonical_url): - raise SameNode() - routed_url = request.url.with_host(host).with_port(int(port)) - async with app.http_client.request(request.method, routed_url) as response: - return web.text( - await response.text(), - content_type=response.content_type, - status=response.status, - ) + + return await self._route_req(dest_url, web, request) + + async def route_topic_req( + self, topic: TopicT, key: K, web: Web, request: Request + ) -> Response: + """Route request to a worker that processes the partition with the given key. + + Arguments: + topic: the topic to route request for. + key: The key that we want. + web: The currently sued web driver, + request: The web request currently being served. + """ + app = self.app + try: + dest_url: URL = app.router.external_topic_key_store(topic, key) + except KeyError: + raise ServiceUnavailable() + + return await self._route_req(dest_url, web, request) def _urlident(self, url: URL) -> Tuple[str, int]: return ( diff --git a/faust/assignor/client_assignment.py b/faust/assignor/client_assignment.py index 5441c6cfa..3b1205f23 100644 --- a/faust/assignor/client_assignment.py +++ b/faust/assignor/client_assignment.py @@ -149,6 +149,7 @@ class ClientMetadata( assignment: ClientAssignment url: str changelog_distribution: HostToPartitionMap + external_topic_distribution: HostToPartitionMap topic_groups: Mapping[str, int] = cast(Mapping[str, int], None) def __post_init__(self) -> None: diff --git a/faust/assignor/partition_assignor.py b/faust/assignor/partition_assignor.py index f5e6922f7..78c15c4f9 100644 --- a/faust/assignor/partition_assignor.py +++ b/faust/assignor/partition_assignor.py @@ -50,6 +50,7 @@ class PartitionAssignor(AbstractPartitionAssignor, PartitionAssignorT): # type: """PartitionAssignor handles internal topic creation. Further, this assignor needs to be sticky and potentially redundant + In addition, it tracks external topic assignments as well (to support topic routes) Notes: Interface copied from :mod:`kafka.coordinator.assignors.abstract`. @@ -59,9 +60,11 @@ class PartitionAssignor(AbstractPartitionAssignor, PartitionAssignorT): # type: _table_manager: TableManagerT _member_urls: MutableMapping[str, str] _changelog_distribution: HostToPartitionMap + _external_topic_distribution: HostToPartitionMap _active_tps: Set[TP] _standby_tps: Set[TP] _tps_url: MutableMapping[TP, str] + _external_tps_url: MutableMapping[TP, str] _topic_groups: MutableMapping[str, int] def __init__(self, app: AppT, replicas: int = 0) -> None: @@ -70,9 +73,11 @@ def __init__(self, app: AppT, replicas: int = 0) -> None: self._table_manager = self.app.tables self._assignment = ClientAssignment(actives={}, standbys={}) self._changelog_distribution = {} + self._external_topic_distribution = {} self.replicas = replicas self._member_urls = {} self._tps_url = {} + self._external_tps_url = {} self._active_tps = set() self._standby_tps = set() self._topic_groups = {} @@ -94,12 +99,27 @@ def changelog_distribution(self, value: HostToPartitionMap) -> None: for partition in partitions } + @property + def external_topic_distribution(self) -> HostToPartitionMap: + return self._external_topic_distribution + + @external_topic_distribution.setter + def external_topic_distribution(self, value: HostToPartitionMap) -> None: + self._external_topic_distribution = value + self._external_tps_url = { + TP(topic, partition): url + for url, tps in self._external_topic_distribution.items() + for topic, partitions in tps.items() + for partition in partitions + } + @property def _metadata(self) -> ClientMetadata: return ClientMetadata( assignment=self._assignment, url=str(self._url), changelog_distribution=self.changelog_distribution, + external_topic_distribution=self.external_topic_distribution, topic_groups=self._topic_groups, ) @@ -116,6 +136,7 @@ def on_assignment(self, assignment: ConsumerProtocolMemberMetadata) -> None: self._active_tps = self._assignment.active_tps self._standby_tps = self._assignment.standby_tps self.changelog_distribution = metadata.changelog_distribution + self.external_topic_distribution = metadata.external_topic_distribution a = sorted(assignment.assignment) b = sorted(self._assignment.kafka_protocol_assignment(self._table_manager)) assert a == b, f"{a!r} != {b!r}" @@ -278,9 +299,11 @@ def _perform_assignment( ) changelog_distribution = self._get_changelog_distribution(assignments) + external_topic_distribution = self._get_external_topic_distribution(assignments) res = self._protocol_assignments( assignments, changelog_distribution, + external_topic_distribution, topic_to_group_id, ) return res @@ -312,6 +335,7 @@ def _protocol_assignments( self, assignments: ClientAssignmentMapping, cl_distribution: HostToPartitionMap, + tp_distribution: HostToPartitionMap, topic_groups: Mapping[str, int], ) -> MemberAssignmentMapping: return { @@ -323,6 +347,7 @@ def _protocol_assignments( assignment=assignment, url=self._member_urls[client], changelog_distribution=cl_distribution, + external_topic_distribution=tp_distribution, topic_groups=topic_groups, ).dumps(), ), @@ -348,6 +373,16 @@ def _topics_filtered( if topic in topics } + @classmethod + def _non_table_topics_filtered( + cls, assignment: TopicToPartitionMap, topics: Set[str] + ) -> TopicToPartitionMap: + return { + topic: partitions + for topic, partitions in assignment.items() + if topic not in topics + } + def _get_changelog_distribution( self, assignments: ClientAssignmentMapping ) -> HostToPartitionMap: @@ -357,6 +392,17 @@ def _get_changelog_distribution( for client, assignment in assignments.items() } + def _get_external_topic_distribution( + self, assignments: ClientAssignmentMapping + ) -> HostToPartitionMap: + topics = self._table_manager.changelog_topics + return { + self._member_urls[client]: self._non_table_topics_filtered( + assignment.actives, topics + ) + for client, assignment in assignments.items() + } + @property def name(self) -> str: return "faust" @@ -388,9 +434,15 @@ def table_metadata(self, topic: str) -> HostToPartitionMap: def tables_metadata(self) -> HostToPartitionMap: return self.changelog_distribution + def external_topics_metadata(self) -> HostToPartitionMap: + return self.external_topic_distribution + def key_store(self, topic: str, key: bytes) -> URL: return URL(self._tps_url[self.app.producer.key_partition(topic, key)]) + def external_key_store(self, topic: str, key: bytes) -> URL: + return URL(self._external_tps_url[self.app.producer.key_partition(topic, key)]) + def is_active(self, tp: TP) -> bool: return tp in self._active_tps diff --git a/faust/types/router.py b/faust/types/router.py index c52fe889b..011027875 100644 --- a/faust/types/router.py +++ b/faust/types/router.py @@ -7,6 +7,7 @@ from . import web from .assignor import HostToPartitionMap from .core import K +from .topics import TopicT if typing.TYPE_CHECKING: from .app import AppT as _AppT @@ -29,6 +30,10 @@ def __init__(self, app: _AppT) -> None: def key_store(self, table_name: str, key: K) -> URL: ... + @abc.abstractmethod + def external_topic_key_store(self, topic: TopicT, key: K) -> URL: + ... + @abc.abstractmethod def table_metadata(self, table_name: str) -> HostToPartitionMap: ... @@ -37,8 +42,18 @@ def table_metadata(self, table_name: str) -> HostToPartitionMap: def tables_metadata(self) -> HostToPartitionMap: ... + @abc.abstractmethod + def external_topics_metadata(self) -> HostToPartitionMap: + ... + @abc.abstractmethod async def route_req( self, table_name: str, key: K, web: web.Web, request: web.Request ) -> web.Response: ... + + @abc.abstractmethod + async def route_topic_req( + self, topic: TopicT, key: K, web: web.Web, request: web.Request + ) -> web.Response: + ... diff --git a/faust/web/apps/router.py b/faust/web/apps/router.py index 9a62d1688..d1897cbd2 100644 --- a/faust/web/apps/router.py +++ b/faust/web/apps/router.py @@ -2,7 +2,7 @@ from faust import web from faust.web.exceptions import ServiceUnavailable -__all__ = ["TableList", "TableDetail", "TableKeyDetail", "blueprint"] +__all__ = ["TableList", "TopicList", "TableDetail", "TableKeyDetail", "blueprint"] blueprint = web.Blueprint("router") @@ -24,6 +24,16 @@ async def get(self, request: web.Request) -> web.Response: return self.json(router.tables_metadata()) +@blueprint.route("/topics", name="list") +class TopicList(web.View): + """List routes for all external topics.""" + + async def get(self, request: web.Request) -> web.Response: + """Return JSON response with list of all table routes.""" + router = self.app.router + return self.json(router.external_topics_metadata()) + + @blueprint.route("/{name}/", name="detail") class TableDetail(web.View): """ diff --git a/tests/unit/app/test_base.py b/tests/unit/app/test_base.py index ce93e3410..c29b3ffbb 100644 --- a/tests/unit/app/test_base.py +++ b/tests/unit/app/test_base.py @@ -951,6 +951,89 @@ def test_table_route__missing_param(self, *, app): async def view(self, request): ... + @pytest.mark.asyncio + async def test_topic_route__query_param(self, *, app): + topic = app.topic("foo") + view = Mock() + request = Mock() + app.router.route_topic_req = AsyncMock() + + @app.topic_route(topic, query_param="q") + async def routed(self, request): + return 42 + + request.query = {"q": "KEY"} + + ret = await routed(view, request) + assert ret is app.router.route_topic_req.coro.return_value + + app.router.route_topic_req.coro.side_effect = SameNode() + ret = await routed(view, request) + assert ret == 42 + + @pytest.mark.asyncio + async def test_topic_route__match_info(self, *, app): + topic = app.topic("foo") + view = Mock() + request = Mock() + app.router.route_topic_req = AsyncMock() + + @app.topic_route(topic, match_info="q") + async def routed(self, request): + return 42 + + request.match_info = {"q": "KEY"} + + ret = await routed(view, request) + assert ret is app.router.route_topic_req.coro.return_value + + app.router.route_topic_req.coro.side_effect = SameNode() + ret = await routed(view, request) + assert ret == 42 + + @pytest.mark.asyncio + async def test_topic_route__exact_key(self, *, app): + topic = app.topic("foo") + view = Mock() + request = Mock() + app.router.route_topic_req = AsyncMock() + + @app.topic_route(topic, exact_key="active") + async def routed(self, request): + return 42 + + ret = await routed(view, request) + assert ret is app.router.route_topic_req.coro.return_value + + app.router.route_topic_req.coro.side_effect = SameNode() + ret = await routed(view, request) + assert ret == 42 + + def test_topic_route__compat_shard_param(self, *, app): + topic = app.topic("foo") + with pytest.warns(DeprecationWarning): + + @app.topic_route(topic, shard_param="x") + async def view(self, request): + ... + + def test_topic_route__query_param_and_shard_param(self, *, app): + topic = app.topic("foo") + with pytest.warns(DeprecationWarning): + with pytest.raises(TypeError): + + @app.topic_route(topic, query_param="q", shard_param="x") + async def view(self, request): + ... + + def test_topic_route__missing_param(self, *, app): + topic = app.topic("foo") + with pytest.raises(TypeError): + + @app.topic_route(topic) + async def view(self, request): + ... + def test_command(self, *, app): @app.command() async def foo(): diff --git a/tests/unit/app/test_router.py b/tests/unit/app/test_router.py index 8175d6a36..1feaab230 100644 --- a/tests/unit/app/test_router.py +++ b/tests/unit/app/test_router.py @@ -41,6 +41,19 @@ def test_key_store(self, *, router, app, assignor): table.changelog_topic.prepare_key.return_value, ) + def test_external_topic_key_store(self, *, router, app, assignor): + topic = Mock() + prepare_key = topic.prepare_key + prepare_key.return_value = [Mock(name="v1"), Mock(name="v2")] + assert ( + router.external_topic_key_store(topic, "k") + is assignor.external_key_store.return_value + ) + assignor.external_key_store.assert_called_once_with( + topic.get_topic_name(), + topic.prepare_key.return_value[0], + ) + def test_table_metadata(self, *, router, app, assignor): table = app.tables["foo"] = Mock(name="table") ret = router.table_metadata("foo") @@ -54,6 +67,11 @@ def test_tables_metadata(self, *, router, assignor): assert res is assignor.tables_metadata.return_value assignor.tables_metadata.assert_called_once_with() + def test_external_topics_metadata(self, *, router, assignor): + res = router.external_topics_metadata() + assert res is assignor.external_topics_metadata.return_value + assignor.external_topics_metadata.assert_called_once_with() + @pytest.mark.asyncio async def test_route_req__unavail(self, *, router, app): web = Mock(name="web") @@ -101,3 +119,17 @@ async def test_route_req_method(self, *, router, app, mock_http_client): mock_http_client.request.assert_called_once_with( request_method, request.url.with_host(routed_url).with_port(routed_port) ) + + @pytest.mark.asyncio + @pytest.mark.http_session(text=b"foobar") + async def test_topic_route_req(self, *, router, app, mock_http_client): + app.conf.canonical_url = URL("http://ge.example.com:8181") + web = Mock(name="web") + request = Mock(name="request") + app.router.external_topic_key_store = Mock() + app.router.external_topic_key_store.return_value = URL( + "http://el.example.com:8181" + ) + response = await router.route_topic_req("foo", "k", web, request) + assert response is web.text.return_value + web.text.assert_called_once_with(b"foobar", content_type=ANY, status=ANY)