Skip to content

Commit

Permalink
Merge branch 'master' into fix_default_web_host
Browse files Browse the repository at this point in the history
  • Loading branch information
patkivikram authored Nov 1, 2021
2 parents c422624 + 9385c81 commit 2aca497
Show file tree
Hide file tree
Showing 34 changed files with 766 additions and 140 deletions.
1 change: 0 additions & 1 deletion docs/userguide/cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ Example:
--json / --no-json Prefer data to be emitted in json format.
-D, --datadir DIRECTORY Directory to keep application state.
-W, --workdir DIRECTORY Working directory to change to after start.
--no-color / --color Enable colors in output.
--debug / --no-debug Enable debugging output, and the blocking
detector.
-q, --quiet / --no-quiet Silence output to <stdout>/<stderr>.
Expand Down
9 changes: 8 additions & 1 deletion examples/word_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ async def shuffle_words(posts):
for word in post.split():
await count_words.send(key=word, value=word)


# Latest count recorded per word, seeded with zero for every word in WORDS.
last_count = dict.fromkeys(WORDS, 0)
@app.agent(value_type=str)
async def count_words(words):
    """Tally every word arriving on the stream and remember its latest total."""
    async for token in words:
        # Bump the running total, then mirror the stored value into last_count.
        word_counts[token] = word_counts[token] + 1
        last_count[token] = word_counts[token]


@app.page('/count/{word}/')
Expand All @@ -39,6 +40,12 @@ async def get_count(web, request, word):
word: word_counts[word],
})

@app.page('/last/{word}/')
@app.topic_route(topic=posts_topic, match_info='word')
async def get_last(web, request, word):
    """Return the most recently recorded count for ``word`` as JSON.

    The ``word`` URL segment is also used as the routing key for
    ``posts_topic`` by the ``topic_route`` decorator.
    """
    # Report only this word's entry instead of serialising the whole
    # ``last_count`` mapping; words with no entry report zero.
    return web.json({
        word: last_count.get(word, 0),
    })

@app.task
async def sender():
Expand Down
44 changes: 44 additions & 0 deletions faust/app/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,50 @@ async def get(

return _decorator

def topic_route(
    self,
    topic: CollectionT,
    shard_param: Optional[str] = None,
    *,
    query_param: Optional[str] = None,
    match_info: Optional[str] = None,
    exact_key: Optional[str] = None,
) -> ViewDecorator:
    """Decorate view method to route request to a topic partition destination.

    The routing key is taken from the first of ``exact_key``,
    ``match_info`` and ``query_param`` that is set, in that order.

    Arguments:
        topic: Topic used to pick the destination; passed unchanged to
            ``self.router.route_topic_req``.
            NOTE(review): annotated as ``CollectionT`` although
            ``route_topic_req`` declares a topic type -- confirm.
        shard_param: Deprecated alias for ``query_param``; using it emits
            a ``DeprecationWarning``.
        query_param: Name of the URL query parameter holding the key.
        match_info: Name of the URL match-info field holding the key.
        exact_key: Fixed key to use for every request.

    Raises:
        TypeError: When the decorator is applied, if both ``shard_param``
            and ``query_param`` are given, or if no key source is given.
    """

    def _decorator(fun: ViewHandlerFun) -> ViewHandlerFun:
        _query_param = query_param
        if shard_param is not None:
            warnings.warn(DeprecationWarning(W_DEPRECATED_SHARD_PARAM))
            if query_param:
                raise TypeError("Cannot specify shard_param and query_param")
            _query_param = shard_param
        if _query_param is None and match_info is None and exact_key is None:
            # List every accepted key source, including match_info.
            raise TypeError(
                "Need one of query_param, shard_param, match_info, or exact_key"
            )

        @wraps(fun)
        async def get(
            view: View, request: Request, *args: Any, **kwargs: Any
        ) -> Response:
            if exact_key:
                key = exact_key
            elif match_info:
                key = request.match_info[match_info]
            elif _query_param:
                key = request.query[_query_param]
            else:  # pragma: no cover
                raise Exception("cannot get here")
            try:
                return await self.router.route_topic_req(
                    topic, key, view.web, request
                )
            except SameNode:
                # This worker is the destination: serve the request locally.
                return await fun(view, request, *args, **kwargs)  # type: ignore

        return get

    return _decorator

def command(
self, *options: Any, base: Optional[Type[_AppCommand]] = None, **kwargs: Any
) -> Callable[[Callable], Type[_AppCommand]]:
Expand Down
55 changes: 45 additions & 10 deletions faust/app/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from faust.types.core import K
from faust.types.router import HostToPartitionMap, RouterT
from faust.types.tables import CollectionT
from faust.types.topics import TopicT
from faust.types.web import Request, Response, Web
from faust.web.exceptions import ServiceUnavailable

Expand All @@ -29,6 +30,12 @@ def key_store(self, table_name: str, key: K) -> URL:
k = self._get_serialized_key(table, key)
return self._assignor.key_store(topic, k)

def external_topic_key_store(self, topic: TopicT, key: K) -> URL:
    """Return the URL of the web server that processes ``key`` in ``topic``.

    The key is passed through ``topic.prepare_key`` (with no explicit
    serializer) and the first element of its result is handed to the
    assignor together with the topic name.
    """
    name = topic.get_topic_name()
    prepared = topic.prepare_key(key, None)
    return self._assignor.external_key_store(name, prepared[0])

def table_metadata(self, table_name: str) -> HostToPartitionMap:
"""Return metadata stored for table in the partition assignor."""
table = self._get_table(table_name)
Expand All @@ -39,6 +46,10 @@ def tables_metadata(self) -> HostToPartitionMap:
"""Return metadata stored for all tables in the partition assignor."""
return self._assignor.tables_metadata()

def external_topics_metadata(self) -> HostToPartitionMap:
    """Return the external-topic metadata held by the partition assignor."""
    assignor = self._assignor
    return assignor.external_topics_metadata()

@classmethod
def _get_table_topic(cls, table: CollectionT) -> str:
return table.changelog_topic.get_topic_name()
Expand All @@ -50,6 +61,19 @@ def _get_serialized_key(cls, table: CollectionT, key: K) -> bytes:
def _get_table(self, name: str) -> CollectionT:
return self.app.tables[name]

async def _route_req(self, dest_url: URL, web: Web, request: Request) -> Response:
app = self.app
dest_ident = (host, port) = self._urlident(dest_url)
if dest_ident == self._urlident(app.conf.canonical_url):
raise SameNode()
routed_url = request.url.with_host(host).with_port(int(port))
async with app.http_client.request(request.method, routed_url) as response:
return web.text(
await response.text(),
content_type=response.content_type,
status=response.status,
)

async def route_req(
self, table_name: str, key: K, web: Web, request: Request
) -> Response:
Expand All @@ -66,16 +90,27 @@ async def route_req(
dest_url: URL = app.router.key_store(table_name, key)
except KeyError:
raise ServiceUnavailable()
dest_ident = (host, port) = self._urlident(dest_url)
if dest_ident == self._urlident(app.conf.canonical_url):
raise SameNode()
routed_url = request.url.with_host(host).with_port(int(port))
async with app.http_client.request(request.method, routed_url) as response:
return web.text(
await response.text(),
content_type=response.content_type,
status=response.status,
)

return await self._route_req(dest_url, web, request)

async def route_topic_req(
    self, topic: TopicT, key: K, web: Web, request: Request
) -> Response:
    """Route request to the worker that processes the partition for a key.

    Arguments:
        topic: The topic to route the request for.
        key: The key that we want.
        web: The currently used web driver.
        request: The web request currently being served.

    Raises:
        ServiceUnavailable: If looking up the destination raises
            ``KeyError``.
    """
    owner = self.app
    try:
        destination = owner.router.external_topic_key_store(topic, key)
    except KeyError:
        raise ServiceUnavailable()
    else:
        return await self._route_req(destination, web, request)

def _urlident(self, url: URL) -> Tuple[str, int]:
return (
Expand Down
1 change: 1 addition & 0 deletions faust/assignor/client_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class ClientMetadata(
assignment: ClientAssignment
url: str
changelog_distribution: HostToPartitionMap
external_topic_distribution: HostToPartitionMap
topic_groups: Mapping[str, int] = cast(Mapping[str, int], None)

def __post_init__(self) -> None:
Expand Down
52 changes: 52 additions & 0 deletions faust/assignor/partition_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class PartitionAssignor(AbstractPartitionAssignor, PartitionAssignorT): # type:
"""PartitionAssignor handles internal topic creation.
Further, this assignor needs to be sticky and potentially redundant
In addition, it tracks external topic assignments as well (to support topic routes)
Notes:
Interface copied from :mod:`kafka.coordinator.assignors.abstract`.
Expand All @@ -59,9 +60,11 @@ class PartitionAssignor(AbstractPartitionAssignor, PartitionAssignorT): # type:
_table_manager: TableManagerT
_member_urls: MutableMapping[str, str]
_changelog_distribution: HostToPartitionMap
_external_topic_distribution: HostToPartitionMap
_active_tps: Set[TP]
_standby_tps: Set[TP]
_tps_url: MutableMapping[TP, str]
_external_tps_url: MutableMapping[TP, str]
_topic_groups: MutableMapping[str, int]

def __init__(self, app: AppT, replicas: int = 0) -> None:
Expand All @@ -70,9 +73,11 @@ def __init__(self, app: AppT, replicas: int = 0) -> None:
self._table_manager = self.app.tables
self._assignment = ClientAssignment(actives={}, standbys={})
self._changelog_distribution = {}
self._external_topic_distribution = {}
self.replicas = replicas
self._member_urls = {}
self._tps_url = {}
self._external_tps_url = {}
self._active_tps = set()
self._standby_tps = set()
self._topic_groups = {}
Expand All @@ -94,12 +99,27 @@ def changelog_distribution(self, value: HostToPartitionMap) -> None:
for partition in partitions
}

@property
def external_topic_distribution(self) -> HostToPartitionMap:
    """Mapping of worker URL to the external topic partitions it holds."""
    return self._external_topic_distribution

@external_topic_distribution.setter
def external_topic_distribution(self, value: HostToPartitionMap) -> None:
    """Store the distribution and rebuild the partition-to-URL index."""
    self._external_topic_distribution = value
    url_by_tp = {}
    for url, topic_partitions in value.items():
        for topic, partitions in topic_partitions.items():
            for partition in partitions:
                url_by_tp[TP(topic, partition)] = url
    self._external_tps_url = url_by_tp

@property
def _metadata(self) -> ClientMetadata:
return ClientMetadata(
assignment=self._assignment,
url=str(self._url),
changelog_distribution=self.changelog_distribution,
external_topic_distribution=self.external_topic_distribution,
topic_groups=self._topic_groups,
)

Expand All @@ -116,6 +136,7 @@ def on_assignment(self, assignment: ConsumerProtocolMemberMetadata) -> None:
self._active_tps = self._assignment.active_tps
self._standby_tps = self._assignment.standby_tps
self.changelog_distribution = metadata.changelog_distribution
self.external_topic_distribution = metadata.external_topic_distribution
a = sorted(assignment.assignment)
b = sorted(self._assignment.kafka_protocol_assignment(self._table_manager))
assert a == b, f"{a!r} != {b!r}"
Expand Down Expand Up @@ -278,9 +299,11 @@ def _perform_assignment(
)

changelog_distribution = self._get_changelog_distribution(assignments)
external_topic_distribution = self._get_external_topic_distribution(assignments)
res = self._protocol_assignments(
assignments,
changelog_distribution,
external_topic_distribution,
topic_to_group_id,
)
return res
Expand Down Expand Up @@ -312,6 +335,7 @@ def _protocol_assignments(
self,
assignments: ClientAssignmentMapping,
cl_distribution: HostToPartitionMap,
tp_distribution: HostToPartitionMap,
topic_groups: Mapping[str, int],
) -> MemberAssignmentMapping:
return {
Expand All @@ -323,6 +347,7 @@ def _protocol_assignments(
assignment=assignment,
url=self._member_urls[client],
changelog_distribution=cl_distribution,
external_topic_distribution=tp_distribution,
topic_groups=topic_groups,
).dumps(),
),
Expand All @@ -348,6 +373,16 @@ def _topics_filtered(
if topic in topics
}

@classmethod
def _non_table_topics_filtered(
cls, assignment: TopicToPartitionMap, topics: Set[str]
) -> TopicToPartitionMap:
return {
topic: partitions
for topic, partitions in assignment.items()
if topic not in topics
}

def _get_changelog_distribution(
self, assignments: ClientAssignmentMapping
) -> HostToPartitionMap:
Expand All @@ -357,6 +392,17 @@ def _get_changelog_distribution(
for client, assignment in assignments.items()
}

def _get_external_topic_distribution(
self, assignments: ClientAssignmentMapping
) -> HostToPartitionMap:
topics = self._table_manager.changelog_topics
return {
self._member_urls[client]: self._non_table_topics_filtered(
assignment.actives, topics
)
for client, assignment in assignments.items()
}

@property
def name(self) -> str:
return "faust"
Expand Down Expand Up @@ -388,9 +434,15 @@ def table_metadata(self, topic: str) -> HostToPartitionMap:
def tables_metadata(self) -> HostToPartitionMap:
    """Return the current changelog distribution."""
    distribution = self.changelog_distribution
    return distribution

def external_topics_metadata(self) -> HostToPartitionMap:
    """Return the current external topic distribution."""
    distribution = self.external_topic_distribution
    return distribution

def key_store(self, topic: str, key: bytes) -> URL:
    """Return the URL recorded in ``self._tps_url`` for ``key`` in ``topic``.

    The producer's ``key_partition`` result is used as the lookup key.
    """
    tp = self.app.producer.key_partition(topic, key)
    return URL(self._tps_url[tp])

def external_key_store(self, topic: str, key: bytes) -> URL:
    """Return the URL recorded in ``self._external_tps_url`` for ``key`` in ``topic``.

    The producer's ``key_partition`` result is used as the lookup key.
    """
    tp = self.app.producer.key_partition(topic, key)
    return URL(self._external_tps_url[tp])

def is_active(self, tp: TP) -> bool:
    """Return True if ``tp`` is in this assignor's set of active partitions."""
    active = self._active_tps
    return tp in active

Expand Down
4 changes: 2 additions & 2 deletions faust/cli/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ def agents(self, *, local: bool = False) -> Sequence[AgentT]:
def agent_to_row(self, agent: AgentT) -> Sequence[str]:
"""Convert agent fields to terminal table row."""
return [
self.bold_tail(self._name(agent)),
self._name(agent),
self._topic(agent),
self.dark(self._help(agent)),
self._help(agent),
]

def _name(self, agent: AgentT) -> str:
Expand Down
Loading

0 comments on commit 2aca497

Please sign in to comment.