Skip to content

Commit

Permalink
fix(#3040): Change random topic to fixed with stream id (#3041)
Browse files Browse the repository at this point in the history
* fix(#3040): Change random topic to fixed with stream id

* fix(#3040): Remove if statement and sanitize stream id for topic name

* fix(#3040): Sanitized stream id and fix tests

* fix(#3040): Add stream id to river_function
  • Loading branch information
tenthe authored Jul 17, 2024
1 parent 56e7428 commit 95f18b6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def __init__(
name="prediction",
attributes=attributes,
broker=get_broker_description(client.dataStreamApi.get(stream_ids[0])), # type: ignore
stream_id=stream_ids[0]
)
function_definition = FunctionDefinition(consumed_streams=stream_ids).add_output_data_stream(output_stream)
self.sp_function = RiverFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class RuntimeType(Enum):
def create_data_stream(
name: str,
attributes: Dict[str, str],
stream_id: Optional[str] = None,
stream_id: str = None,
broker: SupportedBroker = SupportedBroker.NATS,
):
"""Creates a data stream
Expand Down Expand Up @@ -104,9 +104,18 @@ def create_data_stream(
)
]

sanitized_stream_id = stream_id.replace(" ", "")

# Assign a default topic name incorporating the unique stream ID to each protocol.
# This ensures the topic name remains consistent across function restarts, avoiding reliance on client-side defaults.
for protocol in transport_protocols:
protocol.topic_definition.actual_topic_name = f"org.apache.streampipes.connect.{sanitized_stream_id}"

data_stream = DataStream(
name=name, event_schema=event_schema, event_grounding=EventGrounding(transport_protocols=transport_protocols)
name=name,
event_schema=event_schema,
event_grounding=EventGrounding(transport_protocols=transport_protocols)
)
if stream_id:
data_stream.element_id = stream_id

data_stream.element_id = sanitized_stream_id
return data_stream
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,9 @@ def save_event(self, event: Dict[str, Any]):
)
)

output_stream = create_data_stream("test", attributes={"number": RuntimeType.INTEGER.value})
output_stream = create_data_stream("test",
attributes={"number": RuntimeType.INTEGER.value},
stream_id='a1d')
test_function = TestFunctionOutput(
function_definition=FunctionDefinition(
consumed_streams=["urn:streampipes.apache.org:eventstream:uPDKLI"]
Expand Down Expand Up @@ -436,7 +438,10 @@ def save_event(self, event: Dict[str, Any]):
)

output_stream = create_data_stream(
"test", attributes={"number": RuntimeType.INTEGER.value}, broker=SupportedBroker.KAFKA
"test",
stream_id='a1c',
attributes={"number": RuntimeType.INTEGER.value},
broker=SupportedBroker.KAFKA
)
test_function = TestFunctionOutput(
function_definition=FunctionDefinition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ def predict_one(self, x):
class TestRiverFunction(TestCase):
def setUp(self) -> None:
self.data_stream = create_data_stream(
"test", attributes={"number": RuntimeType.FLOAT.value, "bool": RuntimeType.BOOLEAN.value}
"test",
attributes={"number": RuntimeType.FLOAT.value, "bool": RuntimeType.BOOLEAN.value},
stream_id="sample-stream"
).to_dict()

self.test_stream_data = [
Expand Down

0 comments on commit 95f18b6

Please sign in to comment.