You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
#adding.py# based on examples/agent.pyimportfaustfromfaustimportTopicclassAdd(faust.Record, serializer='json'):
a: intb: intclassResult(faust.Record,serializer='json'):
r : intsummands : AddclassSettings:
FAUST_KAFKA_BROCKER='kafka://localhost:9093'KAFKA_BROCKERS= ['localhost:9093']
classEcoTopic(Topic):
def__init__(self, *args, **kwargs) ->None:
""" The default replication for Faust topics is broken, this class changes the default replication from 0 to 1 and should be used as replacement for Fausts own topics until this is fixed in the library """ifkwargs.get("replicas") ==0:
kwargs["replicas"] =1super().__init__(*args, **kwargs)
app=faust.App(
'add_app',
broker=Settings.FAUST_KAFKA_BROCKER,
Topic=EcoTopic
)
topic=app.topic('adding', value_type=Add)
@app.agent(topic)asyncdefadding(stream):
asyncforvalueinstream:
yieldResult(value.a+value.b,value)
##Fix
When fulfilling reply promises the return value should be wrapped in a call to maybe_model
##Details
Faust uses some internal store to Register all created Subclasses of Record, when deserializing some special fields in the
deserialized data are used to detect that this should be actually converted to a Record. The corresponding logic is in maye_model.
The text was updated successfully, but these errors were encountered:
Version Current master
Steps to reproduce
Expected behavior
Return value should be of class
Result
Actual behavior
Return value is plain json/dict
##Fix
When fulfilling reply promises the return value should be wrapped in a call to
maybe_model
##Details
Faust uses some internal store to Register all created Subclasses of
Record
, when deserializing some special fields in thedeserialized data are used to detect that this should be actually converted to a
Record
. The corresponding logic is inmaye_model
.The text was updated successfully, but these errors were encountered: