Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 1.2.1 #166

Merged
merged 3 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ kafka {
name = "airplane_raw"
partitions = 1
}
airplane-registration-number-raw-topic {
name = "airplane_registration_number_raw"
partitions = 1
}
airplane-iata-code-raw-topic {
name = "airplane_iata_code_raw"
partitions = 1
}
flight-open-sky-raw-topic {
name = "flight_opensky_raw"
partitions = 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ final case class TopologyConfig(
airlineRawTopic: TopicMetadata,
cityRawTopic: TopicMetadata,
airplaneRawTopic: TopicMetadata,
airplaneRegistrationNumberRawTopic: TopicMetadata,
airplaneIataCodeRawTopic: TopicMetadata,
airportInfoTopic: TopicMetadata,
flightOpenSkyRawTopic: TopicMetadata,
enhancedFlightRawTopic: TopicMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ object FlightReceivedStream {
implicit val airportInfoSerde: Serde[AirportInfo] = kafkaStreamsOptions.airportInfoSerde
implicit val flightReceivedEventSerde: Serde[FlightReceived] = kafkaStreamsOptions.flightReceivedEventSerde

val streamsBuilder = new StreamsBuilder
val flightRawStream = streamsBuilder.stream[String, FlightRaw](config.kafka.topology.enhancedFlightRawTopic.name)
val airportRawStream = streamsBuilder.stream[String, AirportRaw](config.kafka.topology.airportRawTopic.name)
val airlineRawTable = streamsBuilder.globalTable[String, AirlineRaw](config.kafka.topology.airlineRawTopic.name)
val airplaneRawTable = streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneRawTopic.name)
val cityRawTable = streamsBuilder.globalTable[String, CityRaw](config.kafka.topology.cityRawTopic.name)
val streamsBuilder = new StreamsBuilder
val flightRawStream = streamsBuilder.stream[String, FlightRaw](config.kafka.topology.enhancedFlightRawTopic.name)
val airportRawStream = streamsBuilder.stream[String, AirportRaw](config.kafka.topology.airportRawTopic.name)
val airplaneRawStream = streamsBuilder.stream[String, AirplaneRaw](config.kafka.topology.airplaneRawTopic.name)
val airlineRawTable = streamsBuilder.globalTable[String, AirlineRaw](config.kafka.topology.airlineRawTopic.name)
val cityRawTable = streamsBuilder.globalTable[String, CityRaw](config.kafka.topology.cityRawTopic.name)

airportRawStream
.leftJoin(cityRawTable)((_, ar) => ar.cityIataCode, airportRaw2AirportInfo)
Expand All @@ -40,14 +40,26 @@ object FlightReceivedStream {
val airportInfoTable: GlobalKTable[String, AirportInfo] =
streamsBuilder.globalTable[String, AirportInfo](config.kafka.topology.airportInfoTopic.name)

airplaneRawStream
.to(config.kafka.topology.airplaneRegistrationNumberRawTopic.name)

airplaneRawStream
.selectKey((_, v) => v.iataCode)
.to(config.kafka.topology.airplaneIataCodeRawTopic.name)

val airplaneRegNumberRawTable =
streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneRegistrationNumberRawTopic.name)
val airplaneIataCodeRawTable =
streamsBuilder.globalTable[String, AirplaneRaw](config.kafka.topology.airplaneIataCodeRawTopic.name)

val flightJoinAirport: KStream[String, FlightWithAllAirportInfo] =
flightRawToAirportEnrichment(flightRawStream, airportInfoTable)

val flightAirportAirline: KStream[String, FlightWithAirline] =
flightWithAirportToAirlineEnrichment(flightJoinAirport, airlineRawTable)

val flightAirportAirlineAirplane: KStream[String, FlightReceived] =
flightWithAirportAndAirlineToAirplaneEnrichment(flightAirportAirline, airplaneRawTable)
flightWithAirportAndAirlineToAirplaneEnrichment(flightAirportAirline, airplaneRegNumberRawTable, airplaneIataCodeRawTable)

flightAirportAirlineAirplane.to(config.kafka.topology.flightReceivedTopic.name)

Expand Down Expand Up @@ -93,12 +105,19 @@ object FlightReceivedStream {

private def flightWithAirportAndAirlineToAirplaneEnrichment(
flightWithAirline: KStream[String, FlightWithAirline],
airplaneRawTable: GlobalKTable[String, AirplaneRaw]
airplaneRegNumberRawTable: GlobalKTable[String, AirplaneRaw],
airplaneIataCodeRawTable: GlobalKTable[String, AirplaneRaw]
): KStream[String, FlightReceived] =
flightWithAirline
.leftJoin(airplaneRawTable)(
.leftJoin(airplaneRegNumberRawTable)(
(_, v) => v.airplaneRegNumber,
flightWithAirline2FlightReceived
Tuple2.apply
)
.leftJoin(airplaneIataCodeRawTable)(
(_, v) => v._1.iataNumber, {
case ((flight, airplane), otherAirplane) =>
flightWithAirline2FlightReceived(flight, Option(airplane).getOrElse(otherAirplane))
}
)

private def flightRaw2FlightWithDepartureAirportInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,89 @@ class FlightReceivedStreamSpec extends Suite with AnyWordSpecLike with EmbeddedK
)
}

"be joined successfully with airplane info from iata code if no airplane info on registration number is found" in ResourceLoaner.withFixture {
case Resource(embeddedKafkaConfig, appConfig, kafkaStreamsOptions, topologies) =>
implicit val embKafkaConfig: EmbeddedKafkaConfig = embeddedKafkaConfig
implicit val keySerde: Serde[String] = kafkaStreamsOptions.stringKeySerde

val zurich = "Zurich"
val milan = "Milan"
val cityIataCode1 = "CIC1"
val cityIataCode2 = "CIC2"
val registrationNumber = "anotherRegistrationNumber"

val (airportsInfo, receivedRecords) =
ResourceLoaner.runAll(
topologies(FlightReceivedTopology),
List(
appConfig.kafka.topology.flightRawTopic.name,
appConfig.kafka.topology.airplaneRawTopic.name,
appConfig.kafka.topology.enhancedFlightRawTopic.name,
appConfig.kafka.topology.airlineRawTopic.name
)
) { _ =>
publishToKafka(
appConfig.kafka.topology.airportRawTopic.name,
List(
AirportEvent1.iataCode -> AirportEvent1.copy(cityIataCode = cityIataCode1),
AirportEvent2.iataCode -> AirportEvent2.copy(cityIataCode = cityIataCode2)
)
)
publishToKafka(
appConfig.kafka.topology.cityRawTopic.name,
List(
cityIataCode1 -> CityRaw(1L, zurich, cityIataCode1, "", 0d, 0d),
cityIataCode2 -> CityRaw(2L, milan, cityIataCode2, "", 0d, 0d)
)
)
publishToKafka(appConfig.kafka.topology.airlineRawTopic.name, AirlineEvent1.icaoCode, AirlineEvent1)
publishToKafka(
appConfig.kafka.topology.airplaneRawTopic.name,
registrationNumber,
AirplaneEvent.copy(iataCode = FlightIataCode, registrationNumber = registrationNumber)
)

val airportInfoMap = consumeNumberKeyedMessagesFromTopics[String, AirportInfo](
topics = Set(appConfig.kafka.topology.airportInfoTopic.name),
number = 2,
timeout = ConsumerPollTimeout
)

publishToKafka(appConfig.kafka.topology.enhancedFlightRawTopic.name, FlightRawEvent.flight.icaoNumber, FlightRawEvent)

val messagesMap = consumeNumberKeyedMessagesFromTopics[String, FlightReceived](
topics = Set(appConfig.kafka.topology.flightReceivedTopic.name),
number = 1,
timeout = ConsumerPollTimeout
)

(
airportInfoMap(appConfig.kafka.topology.airportInfoTopic.name),
messagesMap(appConfig.kafka.topology.flightReceivedTopic.name).head
)

}

val departureAirportInfo = FlightReceivedEvent.departureAirport.copy(city = zurich)
val arrivalAirportInfo = FlightReceivedEvent.arrivalAirport.copy(city = milan)

airportsInfo should contain theSameElementsAs List(
AirportEvent1.iataCode -> departureAirportInfo,
AirportEvent2.iataCode -> arrivalAirportInfo
)

receivedRecords shouldBe (
(
FlightReceivedEvent.icaoNumber,
FlightReceivedEvent.copy(
departureAirport = departureAirportInfo,
arrivalAirport = arrivalAirportInfo,
airplane = FlightReceivedEvent.airplane.copy(registrationNumber = registrationNumber)
)
)
)
}

"be joined successfully with default airplane info and city info" in ResourceLoaner.withFixture {
case Resource(embeddedKafkaConfig, appConfig, kafkaStreamsOptions, topologies) =>
implicit val embKafkaConfig: EmbeddedKafkaConfig = embeddedKafkaConfig
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version in ThisBuild := "1.2.0-SNAPSHOT"
version in ThisBuild := "1.2.1-SNAPSHOT"