diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 38dfd1d488..f1973c1216 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -279,32 +279,55 @@ Launching a full production Kafka cluster is outside the scope of this project.
    docker-compose up -d --scale kafka=3
    ```
    In practice, 3 instances have been shown to work well. Use as many instances as required. Keep in mind each instance takes about 1 GB of memory.
-7. Create the topic:
+7. Launch the Kafka shell
+   1. To configure the cluster, you will need to launch into a container that has the Kafka shell.
+   2. You can do this with `./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME`.
+   3. However, this makes it difficult to load data into the cluster. Instead, you can manually launch the Kafka shell by running:
+      ```bash
+      # Change to the morpheus root to make it easier for mounting volumes
+      cd ${MORPHEUS_HOME}
+
+      # Run the Kafka shell docker container
+      docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \
+        -e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \
+        -v $PWD:/workspace wurstmeister/kafka /bin/bash
+      ```
+      Note the `-v $PWD:/workspace`. This will make anything in your current directory available in `/workspace`.
+   4. Once the Kafka shell has been launched, you can begin configuring the cluster. All of the following commands require the argument `--bootstrap-server`. To simplify things, set the `BOOTSTRAP_SERVER` and `MY_TOPIC` variables:
+      ```bash
+      export BOOTSTRAP_SERVER=$(broker-list.sh)
+      export MY_TOPIC="your_topic_here"
+      ```
+8. Create the topic
    ```bash
-   ./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME
-   $KAFKA_HOME/bin/kafka-topics.sh --create --topic=$MY_INPUT_TOPIC_NAME --bootstrap-server `broker-list.sh`
+   # Create the topic
+   kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --create --topic ${MY_TOPIC}
+
+   # Change the number of partitions
+   kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --alter --topic ${MY_TOPIC} --partitions 3
+
+   # See the topic info
+   kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --topic=${MY_TOPIC}
    ```
-   Replace `` with the input name of your choice. If you are using `to-kafka`, ensure your output topic is also created.
-
-8. Generate input messages
-   1. In order for Morpheus to read from Kafka, messages need to be published to the cluster. For debugging/testing purposes, the following container can be used:
-
-      ```bash
-      # Download from https://netq-shared.s3-us-west-2.amazonaws.com/kafka-producer.tar.gz
-      wget https://netq-shared.s3-us-west-2.amazonaws.com/kafka-producer.tar.gz
-      # Load container
-      docker load --input kafka-producer.tar.gz
-      # Run the producer container
-      docker run --rm -it -e KAFKA_BROKER_SERVERS=$(broker-list.sh) -e INPUT_FILE_NAME=$MY_INPUT_FILE -e TOPIC_NAME=$MY_INPUT_TOPIC_NAME --mount src="$PWD,target=/app/data/,type=bind" kafka-producer:1
-      ```
-   In order for this to work, your input file must be accessible from `$PWD`.
+   **Note:** If you are using `to-kafka`, ensure your output topic is also created.
+
+9. Generate input messages
+   1. In order for Morpheus to read from Kafka, messages need to be published to the cluster. You can use the `kafka-console-producer.sh` script to load data:
+
+      ```bash
+      kafka-console-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic ${MY_TOPIC} < ${FILE_TO_LOAD}
+      ```
+
+      **Note:** In order for this to work, your input file must be accessible from the directory in which the Kafka shell was launched.
+   2. You can view the messages with:
+      ```bash
+      kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic ${MY_TOPIC}
+      ```
+
+      **Note:** This will consume messages from the topic.
 
 ### Launching Triton Server
diff --git a/morpheus/_lib/src/stages/kafka_source.cpp b/morpheus/_lib/src/stages/kafka_source.cpp
index 3caf6e3887..4227701e44 100644
--- a/morpheus/_lib/src/stages/kafka_source.cpp
+++ b/morpheus/_lib/src/stages/kafka_source.cpp
@@ -44,8 +44,8 @@
 #include
 #include
 #include
+#include
 #include
-#include "srf/runnable/forward.hpp"
 
 #define CHECK_KAFKA(command, expected, msg) \
     { \
@@ -121,11 +121,16 @@ class KafkaSourceStage__Rebalancer : public RdKafka::RebalanceCb
             // Update now
             now = std::chrono::high_resolution_clock::now();
-        } while (msg_count < m_max_batch_size_fn() && now < batch_end && m_is_rebalanced);
+        } while (msg_count < m_max_batch_size_fn() && now < batch_end);
 
         return std::move(messages);
     }
 
+    bool process_messages(std::vector> &messages)
+    {
+        return m_process_fn(messages);
+    }
+
   private:
     bool m_is_rebalanced{false};
 
@@ -155,9 +160,18 @@ void KafkaSourceStage__Rebalancer::rebalance_cb(RdKafka::KafkaConsumer *consumer
 {
     std::unique_lock lock(m_mutex);
 
+    std::vector current_assignment;
+    CHECK_KAFKA(consumer->assignment(current_assignment), RdKafka::ERR_NO_ERROR, "Error retrieving current assignment");
+
+    auto old_partition_ids = foreach_map(current_assignment, [](const auto &x) { return x->partition(); });
+    auto new_partition_ids = foreach_map(partitions, [](const auto &x) { return x->partition(); });
+
     if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
     {
-        VLOG(10) << m_display_str_fn("Rebalance: Assign Partitions");
+        VLOG(10) << m_display_str_fn(MORPHEUS_CONCAT_STR(
+            "Rebalance: Assign Partitions. Current Partitions: "
+            << StringUtil::array_to_str(old_partition_ids.begin(), old_partition_ids.end())
+            << ". Assigning: " << StringUtil::array_to_str(new_partition_ids.begin(), new_partition_ids.end())));
 
         // application may load offets from arbitrary external storage here and update \p partitions
         if (consumer->rebalance_protocol() == "COOPERATIVE")
@@ -173,7 +187,10 @@ void KafkaSourceStage__Rebalancer::rebalance_cb(RdKafka::KafkaConsumer *consumer
     }
     else if (err == RdKafka::ERR__REVOKE_PARTITIONS)
     {
-        VLOG(10) << m_display_str_fn("Rebalance: Revoke Partitions");
+        VLOG(10) << m_display_str_fn(MORPHEUS_CONCAT_STR(
+            "Rebalance: Revoke Partitions. Current Partitions: "
+            << StringUtil::array_to_str(old_partition_ids.begin(), old_partition_ids.end())
+            << ". Revoking: " << StringUtil::array_to_str(new_partition_ids.begin(), new_partition_ids.end())));
 
         // Application may commit offsets manually here if auto.commit.enable=false
         if (consumer->rebalance_protocol() == "COOPERATIVE")
@@ -292,19 +309,13 @@ KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build()
                     std::vector> message_batch =
                         rebalancer.partition_progress_step(consumer.get());
 
-                    std::shared_ptr batch;
+                    // Process the messages. Returns true if we need to commit
+                    auto should_commit = rebalancer.process_messages(message_batch);
 
-                    try
+                    if (should_commit)
                     {
-                        batch = std::move(this->process_batch(std::move(message_batch)));
-                    } catch (std::exception &ex)
-                    {
-                        LOG(ERROR) << "Exception in process_batch. Msg: " << ex.what();
-
-                        break;
+                        CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync");
                     }
-
-                    sub.on_next(std::move(batch));
                 }
             } catch (std::exception &ex)
@@ -418,20 +429,6 @@ std::unique_ptr KafkaSourceStage::create_consumer(RdKafk
         throw std::runtime_error("Failed to list_topics in Kafka broker after 5 attempts");
     }
 
-    if (md == nullptr)
-    {
-        CHECK_KAFKA(consumer->metadata(spec_topic == nullptr, spec_topic.get(), &md, 1000),
-                    RdKafka::ERR_NO_ERROR,
-                    "Failed to list_topics in Kafka broker");
-    }
-
-    if (md == nullptr)
-    {
-        CHECK_KAFKA(consumer->metadata(spec_topic == nullptr, spec_topic.get(), &md, 1000),
-                    RdKafka::ERR_NO_ERROR,
-                    "Failed to list_topics in Kafka broker");
-    }
-
     std::map> topic_parts;
 
     auto &ctx = srf::runnable::Context::get_runtime_context();
@@ -467,14 +464,29 @@ std::unique_ptr KafkaSourceStage::create_consumer(RdKafk
         auto positions =
             foreach_map(toppar_list, [](const std::unique_ptr &x) { return x->offset(); });
 
+        auto watermarks = foreach_map(toppar_list, [&consumer](const std::unique_ptr &x) {
+            int64_t low;
+            int64_t high;
+            CHECK_KAFKA(consumer->query_watermark_offsets(x->topic(), x->partition(), &low, &high, 1000),
+                        RdKafka::ERR_NO_ERROR,
+                        "Failed to retrieve Kafka watermark offsets");
+
+            return std::make_tuple(low, high);
+        });
+
+        auto watermark_strs = foreach_map(watermarks, [](const auto &x) {
+            return MORPHEUS_CONCAT_STR("(" << std::get<0>(x) << ", " << std::get<1>(x) << ")");
+        });
+
         auto &ctx = srf::runnable::Context::get_runtime_context();
         VLOG(10) << ctx.info()
                  << MORPHEUS_CONCAT_STR(
                         " Topic: '" << topic->topic()
                                     << "', Parts: " << StringUtil::array_to_str(part_ids.begin(), part_ids.end())
                                     << ", Committed: " << StringUtil::array_to_str(committed.begin(), committed.end())
-                                    << ", Positions: "
-                                    << StringUtil::array_to_str(positions.begin(), positions.end()));
+                                    << ", Positions: " << StringUtil::array_to_str(positions.begin(), positions.end())
+                                    << ", Watermarks: "
+                                    << StringUtil::array_to_str(watermark_strs.begin(), watermark_strs.end()));
     }
 
     return std::move(consumer);
diff --git a/morpheus/cli.py b/morpheus/cli.py
index 9ead0d6d4c..98315d3ef0 100644
--- a/morpheus/cli.py
+++ b/morpheus/cli.py
@@ -355,17 +355,6 @@ def run(ctx: click.Context, **kwargs):
     pass
 
 
-def validate_rolls(ctx, param, value):
-    if isinstance(value, tuple):
-        return value
-
-    try:
-        rolls, _, dice = value.partition("d")
-        return int(dice), int(rolls)
-    except ValueError:
-        raise click.BadParameter("format must be 'NdM'")
-
-
 @click.group(chain=True,
              short_help="Run the inference pipeline with a NLP model",
              no_args_is_help=True,
@@ -671,6 +660,11 @@ def from_file(ctx: click.Context, **kwargs):
              is_flag=True,
              help=("Enabling this option will skip pre-filtering of json messages. "
                    "This is only useful when inputs are known to be valid json."))
+@click.option("--auto_offset_reset",
+              type=click.Choice(["earliest", "latest", "none"], case_sensitive=False),
+              default="latest",
+              help=("Sets the value for the configuration option 'auto.offset.reset'. "
+                    "See the Kafka documentation for more information on the effects of each value."))
 @prepare_command()
 def from_kafka(ctx: click.Context, **kwargs):
diff --git a/morpheus/stages/input/kafka_source_stage.py b/morpheus/stages/input/kafka_source_stage.py
index 76f3814f96..bd951be703 100644
--- a/morpheus/stages/input/kafka_source_stage.py
+++ b/morpheus/stages/input/kafka_source_stage.py
@@ -50,6 +50,12 @@ class KafkaSourceStage(SingleOutputSource):
    disable_commit: bool, default = False
        Enabling this option will skip committing messages as they are pulled off the server. This is only useful
        for debugging, allowing the user to process the same messages multiple times.
+    disable_pre_filtering : bool, default = False
+        Enabling this option will skip pre-filtering of json messages. This is only useful when inputs are known to be
+        valid json.
+    auto_offset_reset : str, default = "latest"
+        Sets the value for the configuration option 'auto.offset.reset'. See the Kafka documentation for more
+        information on the effects of each value.
    """

    def __init__(self,
@@ -59,11 +65,15 @@ def __init__(self,
                 group_id: str = "custreamz",
                 poll_interval: str = "10millis",
                 disable_commit: bool = False,
-                 disable_pre_filtering: bool = False):
+                 disable_pre_filtering: bool = False,
+                 auto_offset_reset: str = "latest"):
        super().__init__(c)

        self._consumer_conf = {
-            'bootstrap.servers': bootstrap_servers, 'group.id': group_id, 'session.timeout.ms': "60000"
+            'bootstrap.servers': bootstrap_servers,
+            'group.id': group_id,
+            'session.timeout.ms': "60000",
+            "auto.offset.reset": auto_offset_reset
        }

        self._input_topic = input_topic
@@ -88,7 +98,7 @@ def __init__(self,
        # Override the auto-commit config to enforce custom streamz checkpointing
        self._consumer_params['enable.auto.commit'] = 'false'
        if 'auto.offset.reset' not in self._consumer_params.keys():
-            self._consumer_params['auto.offset.reset'] = 'latest'
+            self._consumer_params['auto.offset.reset'] = 'earliest'
        self._topic = topic
        self._npartitions = npartitions
        self._refresh_partitions = refresh_partitions
@@ -107,7 +117,7 @@ def name(self) -> str:
    def supports_cpp_node(self):
        return True

-    def _source_generator(self, sub: srf.Subscriber):
+    def _source_generator(self):
        # Each invocation of this function makes a new thread so recreate the producers

        # Set some initial values
@@ -181,7 +191,7 @@ def _source_generator(self, sub: srf.Subscriber):
            for partition in range(npartitions):
                tps.append(ck.TopicPartition(self._topic, partition))

-            while sub.is_subscribed():
+            while True:
                try:
                    committed = consumer.committed(tps, timeout=1)
                except ck.KafkaException:
@@ -191,7 +201,7 @@ def _source_generator(self, sub: srf.Subscriber):
                        positions[tp.partition] = tp.offset
                    break

-            while sub.is_subscribed():
+            while True:
                out = []

                if self._refresh_partitions:
@@ -248,10 +258,11 @@ def commit(topic, part_no, keys, lowest, offset):
                                       self._consumer_conf["bootstrap.servers"],
                                       offset)

-                            weakref.finalize(meta, commit, *part[1:])
+                            if (not self._disable_commit):
+                                weakref.finalize(meta, commit, *part[1:])

                            # Push the message meta
-                            sub.on_next(meta)
+                            yield meta
                else:
                    time.sleep(self._poll_interval)
        except Exception:
@@ -263,7 +274,6 @@ def commit(topic, part_no, keys, lowest, offset):
            # Close the consumer and call on_completed
            if (consumer):
                consumer.close()
-                sub.on_completed()

    def _kafka_params_to_messagemeta(self, x: tuple):

@@ -292,13 +302,11 @@ def _read_gdf(kafka_configs,
            raise ValueError("ERROR: You must specifiy the topic "
                             "that you want to consume from")
-        kafka_confs = {str.encode(key): str.encode(value) for key, value in kafka_configs.items()}
-
        kafka_datasource = None

        try:
            kafka_datasource = KafkaDatasource(
-                kafka_confs,
+                kafka_configs,
                topic.encode(),
                partition,
                start,
@@ -317,7 +325,7 @@ def _read_gdf(kafka_configs,
            result = cudf_readers[message_format](kafka_datasource, engine="cudf", lines=lines)

-            return cudf.DataFrame(data=result._data, index=result._index)
+            return cudf.DataFrame._from_data(data=result._data, index=result._index)

        except Exception:
            logger.exception("Error occurred converting KafkaDatasource to Dataframe.")

        finally:
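
For context, the sketch below shows how the `auto_offset_reset` option introduced by this change might be passed when constructing the stage directly in Python. It is a minimal example and not part of the diff: the `Config` import path and the `bootstrap_servers`/`input_topic` argument names are assumptions inferred from the surrounding code in `kafka_source_stage.py`.

```python
# Minimal usage sketch (assumed API, not part of this change).
from morpheus.config import Config  # assumed import path
from morpheus.stages.input.kafka_source_stage import KafkaSourceStage

config = Config()

# 'auto_offset_reset' maps to the Kafka consumer setting 'auto.offset.reset';
# "earliest" makes a new consumer group start from the oldest retained message.
source = KafkaSourceStage(config,
                          bootstrap_servers="localhost:9092",   # assumed broker address
                          input_topic="my_topic",               # assumed topic name
                          auto_offset_reset="earliest")
```

The equivalent CLI flag added to `from_kafka` is `--auto_offset_reset`, which accepts `earliest`, `latest`, or `none`.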