Skip to content

Commit

Permalink
Feature/topic route (faust-streaming#200)
Browse files Browse the repository at this point in the history
* Fix canonical URL when setting either port of host from the CLI

* Fix canonical URL (reformatted)

* Duplicate table_route concept to support topic_route for external topics

* Fix router types

* fix topic_route formatting and add unitests

* Fix copy paste issue with table_route test

Co-authored-by: Ran Katzir <ran.katzir@valerann.com>
Co-authored-by: Vikram Patki <54442035+patkivikram@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 26, 2021
1 parent 654b648 commit 4c1efbd
Show file tree
Hide file tree
Showing 9 changed files with 291 additions and 12 deletions.
9 changes: 8 additions & 1 deletion examples/word_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ async def shuffle_words(posts):
for word in post.split():
await count_words.send(key=word, value=word)


last_count = {w:0 for w in WORDS}
@app.agent(value_type=str)
async def count_words(words):
"""Count words from blog post article body."""
async for word in words:
word_counts[word] += 1
last_count[word] = word_counts[word]


@app.page('/count/{word}/')
Expand All @@ -39,6 +40,12 @@ async def get_count(web, request, word):
word: word_counts[word],
})

@app.page('/last/{word}/')
@app.topic_route(topic=posts_topic, match_info='word')
async def get_last(web, request, word):
return web.json({
word: last_count,
})

@app.task
async def sender():
Expand Down
44 changes: 44 additions & 0 deletions faust/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,50 @@ async def get(

return _decorator

def topic_route(
self,
topic: CollectionT,
shard_param: str = None,
*,
query_param: str = None,
match_info: str = None,
exact_key: str = None,
) -> ViewDecorator:
"""Decorate view method to route request to a topic partition destination."""

def _decorator(fun: ViewHandlerFun) -> ViewHandlerFun:
_query_param = query_param
if shard_param is not None:
warnings.warn(DeprecationWarning(W_DEPRECATED_SHARD_PARAM))
if query_param:
raise TypeError("Cannot specify shard_param and query_param")
_query_param = shard_param
if _query_param is None and match_info is None and exact_key is None:
raise TypeError("Need one of query_param, shard_param, or exact key")

@wraps(fun)
async def get(
view: View, request: Request, *args: Any, **kwargs: Any
) -> Response:
if exact_key:
key = exact_key
elif match_info:
key = request.match_info[match_info]
elif _query_param:
key = request.query[_query_param]
else: # pragma: no cover
raise Exception("cannot get here")
try:
return await self.router.route_topic_req(
topic, key, view.web, request
)
except SameNode:
return await fun(view, request, *args, **kwargs) # type: ignore

return get

return _decorator

def command(
self, *options: Any, base: Optional[Type[_AppCommand]] = None, **kwargs: Any
) -> Callable[[Callable], Type[_AppCommand]]:
Expand Down
55 changes: 45 additions & 10 deletions faust/app/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from faust.types.core import K
from faust.types.router import HostToPartitionMap, RouterT
from faust.types.tables import CollectionT
from faust.types.topics import TopicT
from faust.types.web import Request, Response, Web
from faust.web.exceptions import ServiceUnavailable

Expand All @@ -29,6 +30,12 @@ def key_store(self, table_name: str, key: K) -> URL:
k = self._get_serialized_key(table, key)
return self._assignor.key_store(topic, k)

def external_topic_key_store(self, topic: TopicT, key: K) -> URL:
"""Return the URL of web server that processes the key in a topics."""
topic_name = topic.get_topic_name()
k = topic.prepare_key(key, None)[0]
return self._assignor.external_key_store(topic_name, k)

def table_metadata(self, table_name: str) -> HostToPartitionMap:
"""Return metadata stored for table in the partition assignor."""
table = self._get_table(table_name)
Expand All @@ -39,6 +46,10 @@ def tables_metadata(self) -> HostToPartitionMap:
"""Return metadata stored for all tables in the partition assignor."""
return self._assignor.tables_metadata()

def external_topics_metadata(self) -> HostToPartitionMap:
"""Return metadata stored for all external topics in the partition assignor."""
return self._assignor.external_topics_metadata()

@classmethod
def _get_table_topic(cls, table: CollectionT) -> str:
return table.changelog_topic.get_topic_name()
Expand All @@ -50,6 +61,19 @@ def _get_serialized_key(cls, table: CollectionT, key: K) -> bytes:
def _get_table(self, name: str) -> CollectionT:
return self.app.tables[name]

async def _route_req(self, dest_url: URL, web: Web, request: Request) -> Response:
app = self.app
dest_ident = (host, port) = self._urlident(dest_url)
if dest_ident == self._urlident(app.conf.canonical_url):
raise SameNode()
routed_url = request.url.with_host(host).with_port(int(port))
async with app.http_client.request(request.method, routed_url) as response:
return web.text(
await response.text(),
content_type=response.content_type,
status=response.status,
)

async def route_req(
self, table_name: str, key: K, web: Web, request: Request
) -> Response:
Expand All @@ -66,16 +90,27 @@ async def route_req(
dest_url: URL = app.router.key_store(table_name, key)
except KeyError:
raise ServiceUnavailable()
dest_ident = (host, port) = self._urlident(dest_url)
if dest_ident == self._urlident(app.conf.canonical_url):
raise SameNode()
routed_url = request.url.with_host(host).with_port(int(port))
async with app.http_client.request(request.method, routed_url) as response:
return web.text(
await response.text(),
content_type=response.content_type,
status=response.status,
)

return await self._route_req(dest_url, web, request)

async def route_topic_req(
self, topic: TopicT, key: K, web: Web, request: Request
) -> Response:
"""Route request to a worker that processes the partition with the given key.
Arguments:
topic: the topic to route request for.
key: The key that we want.
web: The currently sued web driver,
request: The web request currently being served.
"""
app = self.app
try:
dest_url: URL = app.router.external_topic_key_store(topic, key)
except KeyError:
raise ServiceUnavailable()

return await self._route_req(dest_url, web, request)

def _urlident(self, url: URL) -> Tuple[str, int]:
return (
Expand Down
1 change: 1 addition & 0 deletions faust/assignor/client_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class ClientMetadata(
assignment: ClientAssignment
url: str
changelog_distribution: HostToPartitionMap
external_topic_distribution: HostToPartitionMap
topic_groups: Mapping[str, int] = cast(Mapping[str, int], None)

def __post_init__(self) -> None:
Expand Down
52 changes: 52 additions & 0 deletions faust/assignor/partition_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PartitionAssignor(AbstractPartitionAssignor, PartitionAssignorT): # type:
"""PartitionAssignor handles internal topic creation.
Further, this assignor needs to be sticky and potentially redundant
In addition, it tracks external topic assignments as well (to support topic routes)
Notes:
Interface copied from :mod:`kafka.coordinator.assignors.abstract`.
Expand All @@ -59,9 +60,11 @@ class PartitionAssignor(AbstractPartitionAssignor, PartitionAssignorT): # type:
_table_manager: TableManagerT
_member_urls: MutableMapping[str, str]
_changelog_distribution: HostToPartitionMap
_external_topic_distribution: HostToPartitionMap
_active_tps: Set[TP]
_standby_tps: Set[TP]
_tps_url: MutableMapping[TP, str]
_external_tps_url: MutableMapping[TP, str]
_topic_groups: MutableMapping[str, int]

def __init__(self, app: AppT, replicas: int = 0) -> None:
Expand All @@ -70,9 +73,11 @@ def __init__(self, app: AppT, replicas: int = 0) -> None:
self._table_manager = self.app.tables
self._assignment = ClientAssignment(actives={}, standbys={})
self._changelog_distribution = {}
self._external_topic_distribution = {}
self.replicas = replicas
self._member_urls = {}
self._tps_url = {}
self._external_tps_url = {}
self._active_tps = set()
self._standby_tps = set()
self._topic_groups = {}
Expand All @@ -94,12 +99,27 @@ def changelog_distribution(self, value: HostToPartitionMap) -> None:
for partition in partitions
}

@property
def external_topic_distribution(self) -> HostToPartitionMap:
return self._external_topic_distribution

@external_topic_distribution.setter
def external_topic_distribution(self, value: HostToPartitionMap) -> None:
self._external_topic_distribution = value
self._external_tps_url = {
TP(topic, partition): url
for url, tps in self._external_topic_distribution.items()
for topic, partitions in tps.items()
for partition in partitions
}

@property
def _metadata(self) -> ClientMetadata:
return ClientMetadata(
assignment=self._assignment,
url=str(self._url),
changelog_distribution=self.changelog_distribution,
external_topic_distribution=self.external_topic_distribution,
topic_groups=self._topic_groups,
)

Expand All @@ -116,6 +136,7 @@ def on_assignment(self, assignment: ConsumerProtocolMemberMetadata) -> None:
self._active_tps = self._assignment.active_tps
self._standby_tps = self._assignment.standby_tps
self.changelog_distribution = metadata.changelog_distribution
self.external_topic_distribution = metadata.external_topic_distribution
a = sorted(assignment.assignment)
b = sorted(self._assignment.kafka_protocol_assignment(self._table_manager))
assert a == b, f"{a!r} != {b!r}"
Expand Down Expand Up @@ -278,9 +299,11 @@ def _perform_assignment(
)

changelog_distribution = self._get_changelog_distribution(assignments)
external_topic_distribution = self._get_external_topic_distribution(assignments)
res = self._protocol_assignments(
assignments,
changelog_distribution,
external_topic_distribution,
topic_to_group_id,
)
return res
Expand Down Expand Up @@ -312,6 +335,7 @@ def _protocol_assignments(
self,
assignments: ClientAssignmentMapping,
cl_distribution: HostToPartitionMap,
tp_distribution: HostToPartitionMap,
topic_groups: Mapping[str, int],
) -> MemberAssignmentMapping:
return {
Expand All @@ -323,6 +347,7 @@ def _protocol_assignments(
assignment=assignment,
url=self._member_urls[client],
changelog_distribution=cl_distribution,
external_topic_distribution=tp_distribution,
topic_groups=topic_groups,
).dumps(),
),
Expand All @@ -348,6 +373,16 @@ def _topics_filtered(
if topic in topics
}

@classmethod
def _non_table_topics_filtered(
cls, assignment: TopicToPartitionMap, topics: Set[str]
) -> TopicToPartitionMap:
return {
topic: partitions
for topic, partitions in assignment.items()
if topic not in topics
}

def _get_changelog_distribution(
self, assignments: ClientAssignmentMapping
) -> HostToPartitionMap:
Expand All @@ -357,6 +392,17 @@ def _get_changelog_distribution(
for client, assignment in assignments.items()
}

def _get_external_topic_distribution(
self, assignments: ClientAssignmentMapping
) -> HostToPartitionMap:
topics = self._table_manager.changelog_topics
return {
self._member_urls[client]: self._non_table_topics_filtered(
assignment.actives, topics
)
for client, assignment in assignments.items()
}

@property
def name(self) -> str:
return "faust"
Expand Down Expand Up @@ -388,9 +434,15 @@ def table_metadata(self, topic: str) -> HostToPartitionMap:
def tables_metadata(self) -> HostToPartitionMap:
return self.changelog_distribution

def external_topics_metadata(self) -> HostToPartitionMap:
return self.external_topic_distribution

def key_store(self, topic: str, key: bytes) -> URL:
return URL(self._tps_url[self.app.producer.key_partition(topic, key)])

def external_key_store(self, topic: str, key: bytes) -> URL:
return URL(self._external_tps_url[self.app.producer.key_partition(topic, key)])

def is_active(self, tp: TP) -> bool:
return tp in self._active_tps

Expand Down
15 changes: 15 additions & 0 deletions faust/types/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from . import web
from .assignor import HostToPartitionMap
from .core import K
from .topics import TopicT

if typing.TYPE_CHECKING:
from .app import AppT as _AppT
Expand All @@ -29,6 +30,10 @@ def __init__(self, app: _AppT) -> None:
def key_store(self, table_name: str, key: K) -> URL:
...

@abc.abstractmethod
def external_topic_key_store(self, topic: TopicT, key: K) -> URL:
...

@abc.abstractmethod
def table_metadata(self, table_name: str) -> HostToPartitionMap:
...
Expand All @@ -37,8 +42,18 @@ def table_metadata(self, table_name: str) -> HostToPartitionMap:
def tables_metadata(self) -> HostToPartitionMap:
...

@abc.abstractmethod
def external_topics_metadata(self) -> HostToPartitionMap:
...

@abc.abstractmethod
async def route_req(
self, table_name: str, key: K, web: web.Web, request: web.Request
) -> web.Response:
...

@abc.abstractmethod
async def route_topic_req(
self, topic: TopicT, key: K, web: web.Web, request: web.Request
) -> web.Response:
...
12 changes: 11 additions & 1 deletion faust/web/apps/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from faust import web
from faust.web.exceptions import ServiceUnavailable

__all__ = ["TableList", "TableDetail", "TableKeyDetail", "blueprint"]
__all__ = ["TableList", "TopicList", "TableDetail", "TableKeyDetail", "blueprint"]

blueprint = web.Blueprint("router")

Expand All @@ -24,6 +24,16 @@ async def get(self, request: web.Request) -> web.Response:
return self.json(router.tables_metadata())


@blueprint.route("/topics", name="list")
class TopicList(web.View):
"""List routes for all external topics."""

async def get(self, request: web.Request) -> web.Response:
"""Return JSON response with list of all table routes."""
router = self.app.router
return self.json(router.external_topics_metadata())


@blueprint.route("/{name}/", name="detail")
class TableDetail(web.View):
"""
Expand Down
Loading

0 comments on commit 4c1efbd

Please sign in to comment.