diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index d229e6b..18f36f0 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -27,6 +27,14 @@ kafka { name = "airplane_raw" partitions = 1 } + airplane-registration-number-raw-topic { + name = "airplane_registration_number_raw" + partitions = 1 + } + airplane-iata-code-raw-topic { + name = "airplane_iata_code_raw" + partitions = 1 + } flight-open-sky-raw-topic { name = "flight_opensky_raw" partitions = 1 diff --git a/src/main/scala/it/bitrock/dvs/streams/config/kafkaConfig.scala b/src/main/scala/it/bitrock/dvs/streams/config/kafkaConfig.scala index 85410e8..e08dc25 100644 --- a/src/main/scala/it/bitrock/dvs/streams/config/kafkaConfig.scala +++ b/src/main/scala/it/bitrock/dvs/streams/config/kafkaConfig.scala @@ -19,6 +19,8 @@ final case class TopologyConfig( airlineRawTopic: TopicMetadata, cityRawTopic: TopicMetadata, airplaneRawTopic: TopicMetadata, + airplaneRegistrationNumberRawTopic: TopicMetadata, + airplaneIataCodeRawTopic: TopicMetadata, airportInfoTopic: TopicMetadata, flightOpenSkyRawTopic: TopicMetadata, enhancedFlightRawTopic: TopicMetadata, diff --git a/src/main/scala/it/bitrock/dvs/streams/topologies/FlightReceivedStream.scala b/src/main/scala/it/bitrock/dvs/streams/topologies/FlightReceivedStream.scala index a9ef25f..c468ffa 100644 --- a/src/main/scala/it/bitrock/dvs/streams/topologies/FlightReceivedStream.scala +++ b/src/main/scala/it/bitrock/dvs/streams/topologies/FlightReceivedStream.scala @@ -26,12 +26,12 @@ object FlightReceivedStream { implicit val airportInfoSerde: Serde[AirportInfo] = kafkaStreamsOptions.airportInfoSerde implicit val flightReceivedEventSerde: Serde[FlightReceived] = kafkaStreamsOptions.flightReceivedEventSerde - val streamsBuilder = new StreamsBuilder - val flightRawStream = streamsBuilder.stream[String, FlightRaw](config.kafka.topology.enhancedFlightRawTopic.name) - val airportRawStream = streamsBuilder.stream[String, AirportRaw](config.kafka.topology.airportRawTopic.name) - val airlineRawTable = streamsBuilder.globalTable[String, AirlineRaw](config.kafka.topology.airlineRawTopic.name) - val airplaneRawTable = streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneRawTopic.name) - val cityRawTable = streamsBuilder.globalTable[String, CityRaw](config.kafka.topology.cityRawTopic.name) + val streamsBuilder = new StreamsBuilder + val flightRawStream = streamsBuilder.stream[String, FlightRaw](config.kafka.topology.enhancedFlightRawTopic.name) + val airportRawStream = streamsBuilder.stream[String, AirportRaw](config.kafka.topology.airportRawTopic.name) + val airplaneRawStream = streamsBuilder.stream[String, AirplaneRaw](config.kafka.topology.airplaneRawTopic.name) + val airlineRawTable = streamsBuilder.globalTable[String, AirlineRaw](config.kafka.topology.airlineRawTopic.name) + val cityRawTable = streamsBuilder.globalTable[String, CityRaw](config.kafka.topology.cityRawTopic.name) airportRawStream .leftJoin(cityRawTable)((_, ar) => ar.cityIataCode, airportRaw2AirportInfo) @@ -40,6 +40,18 @@ object FlightReceivedStream { val airportInfoTable: GlobalKTable[String, AirportInfo] = streamsBuilder.globalTable[String, AirportInfo](config.kafka.topology.airportInfoTopic.name) + airplaneRawStream + .to(config.kafka.topology.airplaneRegistrationNumberRawTopic.name) + + airplaneRawStream + .selectKey((_, v) => v.iataCode) + .to(config.kafka.topology.airplaneIataCodeRawTopic.name) + + val airplaneRegNumberRawTable = + streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneRegistrationNumberRawTopic.name) + val airplaneIataCodeRawTable = + streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneIataCodeRawTopic.name) + val flightJoinAirport: KStream[String, FlightWithAllAirportInfo] = flightRawToAirportEnrichment(flightRawStream, airportInfoTable) @@ -47,7 +59,7 @@ object FlightReceivedStream { flightWithAirportToAirlineEnrichment(flightJoinAirport, airlineRawTable) val flightAirportAirlineAirplane: KStream[String, FlightReceived] = - flightWithAirportAndAirlineToAirplaneEnrichment(flightAirportAirline, airplaneRawTable) + flightWithAirportAndAirlineToAirplaneEnrichment(flightAirportAirline, airplaneRegNumberRawTable, airplaneIataCodeRawTable) flightAirportAirlineAirplane.to(config.kafka.topology.flightReceivedTopic.name) @@ -93,12 +105,19 @@ object FlightReceivedStream { private def flightWithAirportAndAirlineToAirplaneEnrichment( flightWithAirline: KStream[String, FlightWithAirline], - airplaneRawTable: GlobalKTable[String, AirplaneRaw] + airplaneRegNumberRawTable: GlobalKTable[String, AirplaneRaw], + airplaneIataCodeRawTable: GlobalKTable[String, AirplaneRaw] ): KStream[String, FlightReceived] = flightWithAirline - .leftJoin(airplaneRawTable)( + .leftJoin(airplaneRegNumberRawTable)( (_, v) => v.airplaneRegNumber, - flightWithAirline2FlightReceived + Tuple2.apply + ) + .leftJoin(airplaneIataCodeRawTable)( + (_, v) => v._1.iataNumber, { + case ((flight, airplane), otherAirplane) => + flightWithAirline2FlightReceived(flight, Option(airplane).getOrElse(otherAirplane)) + } ) private def flightRaw2FlightWithDepartureAirportInfo( diff --git a/src/test/scala/it/bitrock/dvs/streams/topologies/FlightReceivedStreamSpec.scala b/src/test/scala/it/bitrock/dvs/streams/topologies/FlightReceivedStreamSpec.scala index a30087e..8152361 100644 --- a/src/test/scala/it/bitrock/dvs/streams/topologies/FlightReceivedStreamSpec.scala +++ b/src/test/scala/it/bitrock/dvs/streams/topologies/FlightReceivedStreamSpec.scala @@ -87,6 +87,89 @@ class FlightReceivedStreamSpec extends Suite with AnyWordSpecLike with EmbeddedK ) } + "be joined successfully with airplane info from iata code if no airplane info on registration number is found" in ResourceLoaner.withFixture { + case Resource(embeddedKafkaConfig, appConfig, kafkaStreamsOptions, topologies) => + implicit val embKafkaConfig: EmbeddedKafkaConfig = embeddedKafkaConfig + implicit val keySerde: Serde[String] = kafkaStreamsOptions.stringKeySerde + + val zurich = "Zurich" + val milan = "Milan" + val cityIataCode1 = "CIC1" + val cityIataCode2 = "CIC2" + val registrationNumber = "anotherRegistrationNumber" + + val (airportsInfo, receivedRecords) = + ResourceLoaner.runAll( + topologies(FlightReceivedTopology), + List( + appConfig.kafka.topology.flightRawTopic.name, + appConfig.kafka.topology.airplaneRawTopic.name, + appConfig.kafka.topology.enhancedFlightRawTopic.name, + appConfig.kafka.topology.airlineRawTopic.name + ) + ) { _ => + publishToKafka( + appConfig.kafka.topology.airportRawTopic.name, + List( + AirportEvent1.iataCode -> AirportEvent1.copy(cityIataCode = cityIataCode1), + AirportEvent2.iataCode -> AirportEvent2.copy(cityIataCode = cityIataCode2) + ) + ) + publishToKafka( + appConfig.kafka.topology.cityRawTopic.name, + List( + cityIataCode1 -> CityRaw(1L, zurich, cityIataCode1, "", 0d, 0d), + cityIataCode2 -> CityRaw(2L, milan, cityIataCode2, "", 0d, 0d) + ) + ) + publishToKafka(appConfig.kafka.topology.airlineRawTopic.name, AirlineEvent1.icaoCode, AirlineEvent1) + publishToKafka( + appConfig.kafka.topology.airplaneRawTopic.name, + registrationNumber, + AirplaneEvent.copy(iataCode = FlightIataCode, registrationNumber = registrationNumber) + ) + + val airportInfoMap = consumeNumberKeyedMessagesFromTopics[String, AirportInfo]( + topics = Set(appConfig.kafka.topology.airportInfoTopic.name), + number = 2, + timeout = ConsumerPollTimeout + ) + + publishToKafka(appConfig.kafka.topology.enhancedFlightRawTopic.name, FlightRawEvent.flight.icaoNumber, FlightRawEvent) + + val messagesMap = consumeNumberKeyedMessagesFromTopics[String, FlightReceived]( + topics = Set(appConfig.kafka.topology.flightReceivedTopic.name), + number = 1, + timeout = ConsumerPollTimeout + ) + + ( + airportInfoMap(appConfig.kafka.topology.airportInfoTopic.name), + messagesMap(appConfig.kafka.topology.flightReceivedTopic.name).head + ) + + } + + val departureAirportInfo = FlightReceivedEvent.departureAirport.copy(city = zurich) + val arrivalAirportInfo = FlightReceivedEvent.arrivalAirport.copy(city = milan) + + airportsInfo should contain theSameElementsAs List( + AirportEvent1.iataCode -> departureAirportInfo, + AirportEvent2.iataCode -> arrivalAirportInfo + ) + + receivedRecords shouldBe ( + ( + FlightReceivedEvent.icaoNumber, + FlightReceivedEvent.copy( + departureAirport = departureAirportInfo, + arrivalAirport = arrivalAirportInfo, + airplane = FlightReceivedEvent.airplane.copy(registrationNumber = registrationNumber) + ) + ) + ) + } + "be joined successfully with default airplane info and city info" in ResourceLoaner.withFixture { case Resource(embeddedKafkaConfig, appConfig, kafkaStreamsOptions, topologies) => implicit val embKafkaConfig: EmbeddedKafkaConfig = embeddedKafkaConfig