Skip to content

Commit

Permalink
Merge pull request #166 from bitrockteam/release/1.2.1
Browse files Browse the repository at this point in the history
Release 1.2.1
  • Loading branch information
simoexpo committed Mar 12, 2020
2 parents 5f43cda + 93ba544 commit 0629dfa
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 10 deletions.
8 changes: 8 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ kafka {
name = "airplane_raw"
partitions = 1
}
airplane-registration-number-raw-topic {
name = "airplane_registration_number_raw"
partitions = 1
}
airplane-iata-code-raw-topic {
name = "airplane_iata_code_raw"
partitions = 1
}
flight-open-sky-raw-topic {
name = "flight_opensky_raw"
partitions = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ final case class TopologyConfig(
airlineRawTopic: TopicMetadata,
cityRawTopic: TopicMetadata,
airplaneRawTopic: TopicMetadata,
airplaneRegistrationNumberRawTopic: TopicMetadata,
airplaneIataCodeRawTopic: TopicMetadata,
airportInfoTopic: TopicMetadata,
flightOpenSkyRawTopic: TopicMetadata,
enhancedFlightRawTopic: TopicMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ object FlightReceivedStream {
implicit val airportInfoSerde: Serde[AirportInfo] = kafkaStreamsOptions.airportInfoSerde
implicit val flightReceivedEventSerde: Serde[FlightReceived] = kafkaStreamsOptions.flightReceivedEventSerde

val streamsBuilder = new StreamsBuilder
val flightRawStream = streamsBuilder.stream[String, FlightRaw](config.kafka.topology.enhancedFlightRawTopic.name)
val airportRawStream = streamsBuilder.stream[String, AirportRaw](config.kafka.topology.airportRawTopic.name)
val airlineRawTable = streamsBuilder.globalTable[String, AirlineRaw](config.kafka.topology.airlineRawTopic.name)
val airplaneRawTable = streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneRawTopic.name)
val cityRawTable = streamsBuilder.globalTable[String, CityRaw](config.kafka.topology.cityRawTopic.name)
val streamsBuilder = new StreamsBuilder
val flightRawStream = streamsBuilder.stream[String, FlightRaw](config.kafka.topology.enhancedFlightRawTopic.name)
val airportRawStream = streamsBuilder.stream[String, AirportRaw](config.kafka.topology.airportRawTopic.name)
val airplaneRawStream = streamsBuilder.stream[String, AirplaneRaw](config.kafka.topology.airplaneRawTopic.name)
val airlineRawTable = streamsBuilder.globalTable[String, AirlineRaw](config.kafka.topology.airlineRawTopic.name)
val cityRawTable = streamsBuilder.globalTable[String, CityRaw](config.kafka.topology.cityRawTopic.name)

airportRawStream
.leftJoin(cityRawTable)((_, ar) => ar.cityIataCode, airportRaw2AirportInfo)
Expand All @@ -40,14 +40,26 @@ object FlightReceivedStream {
val airportInfoTable: GlobalKTable[String, AirportInfo] =
streamsBuilder.globalTable[String, AirportInfo](config.kafka.topology.airportInfoTopic.name)

airplaneRawStream
.to(config.kafka.topology.airplaneRegistrationNumberRawTopic.name)

airplaneRawStream
.selectKey((_, v) => v.iataCode)
.to(config.kafka.topology.airplaneIataCodeRawTopic.name)

val airplaneRegNumberRawTable =
streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneRegistrationNumberRawTopic.name)
val airplaneIataCodeRawTable =
streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneIataCodeRawTopic.name)

val flightJoinAirport: KStream[String, FlightWithAllAirportInfo] =
flightRawToAirportEnrichment(flightRawStream, airportInfoTable)

val flightAirportAirline: KStream[String, FlightWithAirline] =
flightWithAirportToAirlineEnrichment(flightJoinAirport, airlineRawTable)

val flightAirportAirlineAirplane: KStream[String, FlightReceived] =
flightWithAirportAndAirlineToAirplaneEnrichment(flightAirportAirline, airplaneRawTable)
flightWithAirportAndAirlineToAirplaneEnrichment(flightAirportAirline, airplaneRegNumberRawTable, airplaneIataCodeRawTable)

flightAirportAirlineAirplane.to(config.kafka.topology.flightReceivedTopic.name)

Expand Down Expand Up @@ -93,12 +105,19 @@ object FlightReceivedStream {

private def flightWithAirportAndAirlineToAirplaneEnrichment(
flightWithAirline: KStream[String, FlightWithAirline],
airplaneRawTable: GlobalKTable[String, AirplaneRaw]
airplaneRegNumberRawTable: GlobalKTable[String, AirplaneRaw],
airplaneIataCodeRawTable: GlobalKTable[String, AirplaneRaw]
): KStream[String, FlightReceived] =
flightWithAirline
.leftJoin(airplaneRawTable)(
.leftJoin(airplaneRegNumberRawTable)(
(_, v) => v.airplaneRegNumber,
flightWithAirline2FlightReceived
Tuple2.apply
)
.leftJoin(airplaneIataCodeRawTable)(
(_, v) => v._1.iataNumber, {
case ((flight, airplane), otherAirplane) =>
flightWithAirline2FlightReceived(flight, Option(airplane).getOrElse(otherAirplane))
}
)

private def flightRaw2FlightWithDepartureAirportInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,89 @@ class FlightReceivedStreamSpec extends Suite with AnyWordSpecLike with EmbeddedK
)
}

"be joined successfully with airplane info from iata code if no airplane info on registration number is found" in ResourceLoaner.withFixture {
case Resource(embeddedKafkaConfig, appConfig, kafkaStreamsOptions, topologies) =>
implicit val embKafkaConfig: EmbeddedKafkaConfig = embeddedKafkaConfig
implicit val keySerde: Serde[String] = kafkaStreamsOptions.stringKeySerde

val zurich = "Zurich"
val milan = "Milan"
val cityIataCode1 = "CIC1"
val cityIataCode2 = "CIC2"
val registrationNumber = "anotherRegistrationNumber"

val (airportsInfo, receivedRecords) =
ResourceLoaner.runAll(
topologies(FlightReceivedTopology),
List(
appConfig.kafka.topology.flightRawTopic.name,
appConfig.kafka.topology.airplaneRawTopic.name,
appConfig.kafka.topology.enhancedFlightRawTopic.name,
appConfig.kafka.topology.airlineRawTopic.name
)
) { _ =>
publishToKafka(
appConfig.kafka.topology.airportRawTopic.name,
List(
AirportEvent1.iataCode -> AirportEvent1.copy(cityIataCode = cityIataCode1),
AirportEvent2.iataCode -> AirportEvent2.copy(cityIataCode = cityIataCode2)
)
)
publishToKafka(
appConfig.kafka.topology.cityRawTopic.name,
List(
cityIataCode1 -> CityRaw(1L, zurich, cityIataCode1, "", 0d, 0d),
cityIataCode2 -> CityRaw(2L, milan, cityIataCode2, "", 0d, 0d)
)
)
publishToKafka(appConfig.kafka.topology.airlineRawTopic.name, AirlineEvent1.icaoCode, AirlineEvent1)
publishToKafka(
appConfig.kafka.topology.airplaneRawTopic.name,
registrationNumber,
AirplaneEvent.copy(iataCode = FlightIataCode, registrationNumber = registrationNumber)
)

val airportInfoMap = consumeNumberKeyedMessagesFromTopics[String, AirportInfo](
topics = Set(appConfig.kafka.topology.airportInfoTopic.name),
number = 2,
timeout = ConsumerPollTimeout
)

publishToKafka(appConfig.kafka.topology.enhancedFlightRawTopic.name, FlightRawEvent.flight.icaoNumber, FlightRawEvent)

val messagesMap = consumeNumberKeyedMessagesFromTopics[String, FlightReceived](
topics = Set(appConfig.kafka.topology.flightReceivedTopic.name),
number = 1,
timeout = ConsumerPollTimeout
)

(
airportInfoMap(appConfig.kafka.topology.airportInfoTopic.name),
messagesMap(appConfig.kafka.topology.flightReceivedTopic.name).head
)

}

val departureAirportInfo = FlightReceivedEvent.departureAirport.copy(city = zurich)
val arrivalAirportInfo = FlightReceivedEvent.arrivalAirport.copy(city = milan)

airportsInfo should contain theSameElementsAs List(
AirportEvent1.iataCode -> departureAirportInfo,
AirportEvent2.iataCode -> arrivalAirportInfo
)

receivedRecords shouldBe (
(
FlightReceivedEvent.icaoNumber,
FlightReceivedEvent.copy(
departureAirport = departureAirportInfo,
arrivalAirport = arrivalAirportInfo,
airplane = FlightReceivedEvent.airplane.copy(registrationNumber = registrationNumber)
)
)
)
}

"be joined successfully with default airplane info and city info" in ResourceLoaner.withFixture {
case Resource(embeddedKafkaConfig, appConfig, kafkaStreamsOptions, topologies) =>
implicit val embKafkaConfig: EmbeddedKafkaConfig = embeddedKafkaConfig
Expand Down

0 comments on commit 0629dfa

Please sign in to comment.