Fixing the from-kafka stage (nv-morpheus#257)
Fixes nv-morpheus#254 

This re-enables the check on the number of messages received before attempting to convert the batch to a cuDF DataFrame.
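
In rough terms, the restored flow is: poll a batch, skip it when no messages were received, convert to cuDF, then commit. A minimal Python-style sketch of that control flow (illustrative only; `poll_batch`, `messages_to_cudf`, and `commit_offsets` are hypothetical stand-ins for the stage internals in `kafka_source.cpp`):

```python
def source_loop(poll_batch, messages_to_cudf, commit_offsets):
    """Illustrative sketch: skip empty batches before converting to cuDF."""
    while True:
        messages = poll_batch()   # bounded by max batch size / batch timeout
        if not messages:          # the re-enabled check: nothing was received,
            continue              # so skip the cuDF conversion and the commit
        yield messages_to_cudf(messages)
        commit_offsets()          # commit offsets only after a processed batch
```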

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)
  - Pete MacKinnon (https://github.com/pdmack)

URL: nv-morpheus#257
mdemoret-nv authored Jul 5, 2022
1 parent 0030a1d commit cb12889
Showing 4 changed files with 112 additions and 75 deletions.
65 changes: 44 additions & 21 deletions CONTRIBUTING.md
@@ -279,32 +279,55 @@ Launching a full production Kafka cluster is outside the scope of this project.
docker-compose up -d --scale kafka=3
```
In practice, 3 instances have been shown to work well. Use as many instances as required. Keep in mind that each instance takes about 1 GB of memory.
7. Create the topic:
7. Launch the Kafka shell
1. To configure the cluster, you will need to launch into a container that has the Kafka shell.
2. You can do this with `./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME`.
3. However, this makes it difficult to load data into the cluster. Instead, you can manually launch the Kafka shell by running:
```bash
# Change to the morpheus root to make it easier for mounting volumes
cd ${MORPHEUS_HOME}
# Run the Kafka shell docker container
docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \
-e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \
-v $PWD:/workspace wurstmeister/kafka /bin/bash
```
Note the `-v $PWD:/workspace`. This will make anything in your current directory available in `/workspace`.
4. Once the Kafka shell has been launched, you can begin configuring the cluster. All of the following commands require the argument `--bootstrap-server`. To simplify things, set the `BOOTSTRAP_SERVER` and `MY_TOPIC` variables:
```bash
export BOOTSTRAP_SERVER=$(broker-list.sh)
export MY_TOPIC="your_topic_here"
```
8. Create the topic

```bash
./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME
$KAFKA_HOME/bin/kafka-topics.sh --create --topic=$MY_INPUT_TOPIC_NAME --bootstrap-server `broker-list.sh`
# Create the topic
kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --create --topic ${MY_TOPIC}
# Change the number of partitions
kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --alter --topic ${MY_TOPIC} --partitions 3
# See the topic info
kafka-topics.sh --bootstrap-server ${BOOTSTRAP_SERVER} --describe --topic=${MY_TOPIC}
```
Replace `$MY_INPUT_TOPIC_NAME` with the input topic name of your choice. If you are using `to-kafka`, ensure your output topic is also created.

8. Generate input messages
1. In order for Morpheus to read from Kafka, messages need to be published to the cluster. For debugging/testing purposes, the following container can be used:

```bash
# Download from https://netq-shared.s3-us-west-2.amazonaws.com/kafka-producer.tar.gz
wget https://netq-shared.s3-us-west-2.amazonaws.com/kafka-producer.tar.gz
# Load container
docker load --input kafka-producer.tar.gz
# Run the producer container
docker run --rm -it -e KAFKA_BROKER_SERVERS=$(broker-list.sh) -e INPUT_FILE_NAME=$MY_INPUT_FILE -e TOPIC_NAME=$MY_INPUT_TOPIC_NAME --mount src="$PWD,target=/app/data/,type=bind" kafka-producer:1
```
In order for this to work, your input file must be accessible from `$PWD`.
**Note:** If you are using `to-kafka`, ensure your output topic is also created.

9. Generate input messages
1. In order for Morpheus to read from Kafka, messages need to be published to the cluster. You can use the `kafka-console-producer.sh` script to load data:

```bash
kafka-console-producer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic ${MY_TOPIC} < ${FILE_TO_LOAD}
```

**Note:** In order for this to work, your input file must be accessible from the current directory the Kafka shell was launched from.

2. You can view the messages with:

```bash
./start-kafka-shell.sh $KAFKA_ADVERTISED_HOST_NAME
$KAFKA_HOME/bin/kafka-console-consumer.sh --topic=$MY_TOPIC --bootstrap-server `broker-list.sh`
```
```bash
kafka-console-consumer.sh --bootstrap-server ${BOOTSTRAP_SERVER} --topic ${MY_TOPIC}
```

**Note:** This will consume messages. (A Python snippet for checking how many messages are on the topic without consuming them is sketched just after this list.)
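
As referenced in the note above, the following Python snippet reports how many messages each partition of the topic holds by querying the watermark offsets, without consuming anything. This is a sketch, assuming `confluent-kafka` is installed; the broker address and topic name are placeholders:

```python
from confluent_kafka import Consumer, TopicPartition

BOOTSTRAP_SERVER = "localhost:9092"  # placeholder; use your broker list
MY_TOPIC = "your_topic_here"         # placeholder topic name

consumer = Consumer({"bootstrap.servers": BOOTSTRAP_SERVER, "group.id": "offset-check"})

# list_topics() returns cluster metadata; .partitions is a dict keyed by partition id
for part_id in consumer.list_topics(MY_TOPIC).topics[MY_TOPIC].partitions:
    low, high = consumer.get_watermark_offsets(TopicPartition(MY_TOPIC, part_id))
    print(f"partition {part_id}: {high - low} messages")

consumer.close()
```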

### Launching Triton Server

72 changes: 42 additions & 30 deletions morpheus/_lib/src/stages/kafka_source.cpp
@@ -44,8 +44,8 @@
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <tuple>
#include <utility>
#include "srf/runnable/forward.hpp"

#define CHECK_KAFKA(command, expected, msg) \
{ \
@@ -121,11 +121,16 @@ class KafkaSourceStage__Rebalancer : public RdKafka::RebalanceCb

// Update now
now = std::chrono::high_resolution_clock::now();
} while (msg_count < m_max_batch_size_fn() && now < batch_end && m_is_rebalanced);
} while (msg_count < m_max_batch_size_fn() && now < batch_end);

return std::move(messages);
}

bool process_messages(std::vector<std::unique_ptr<RdKafka::Message>> &messages)
{
return m_process_fn(messages);
}

private:
bool m_is_rebalanced{false};

@@ -155,9 +160,18 @@ void KafkaSourceStage__Rebalancer::rebalance_cb(RdKafka::KafkaConsumer *consumer
{
std::unique_lock<boost::fibers::recursive_mutex> lock(m_mutex);

std::vector<RdKafka::TopicPartition *> current_assignment;
CHECK_KAFKA(consumer->assignment(current_assignment), RdKafka::ERR_NO_ERROR, "Error retrieving current assignment");

auto old_partition_ids = foreach_map(current_assignment, [](const auto &x) { return x->partition(); });
auto new_partition_ids = foreach_map(partitions, [](const auto &x) { return x->partition(); });

if (err == RdKafka::ERR__ASSIGN_PARTITIONS)
{
VLOG(10) << m_display_str_fn("Rebalance: Assign Partitions");
VLOG(10) << m_display_str_fn(MORPHEUS_CONCAT_STR(
"Rebalance: Assign Partitions. Current Partitions: << "
<< StringUtil::array_to_str(old_partition_ids.begin(), old_partition_ids.end())
<< ". Assigning: " << StringUtil::array_to_str(new_partition_ids.begin(), new_partition_ids.end())));

// Application may load offsets from arbitrary external storage here and update \p partitions
if (consumer->rebalance_protocol() == "COOPERATIVE")
@@ -173,7 +187,10 @@
}
else if (err == RdKafka::ERR__REVOKE_PARTITIONS)
{
VLOG(10) << m_display_str_fn("Rebalance: Revoke Partitions");
VLOG(10) << m_display_str_fn(MORPHEUS_CONCAT_STR(
"Rebalance: Revoke Partitions. Current Partitions: << "
<< StringUtil::array_to_str(old_partition_ids.begin(), old_partition_ids.end())
<< ". Revoking: " << StringUtil::array_to_str(new_partition_ids.begin(), new_partition_ids.end())));

// Application may commit offsets manually here if auto.commit.enable=false
if (consumer->rebalance_protocol() == "COOPERATIVE")
@@ -292,19 +309,13 @@ KafkaSourceStage::subscriber_fn_t KafkaSourceStage::build()
std::vector<std::unique_ptr<RdKafka::Message>> message_batch =
rebalancer.partition_progress_step(consumer.get());

std::shared_ptr<morpheus::MessageMeta> batch;
// Process the messages. Returns true if we need to commit
auto should_commit = rebalancer.process_messages(message_batch);

try
if (should_commit)
{
batch = std::move(this->process_batch(std::move(message_batch)));
} catch (std::exception &ex)
{
LOG(ERROR) << "Exception in process_batch. Msg: " << ex.what();

break;
CHECK_KAFKA(consumer->commitAsync(), RdKafka::ERR_NO_ERROR, "Error during commitAsync");
}

sub.on_next(std::move(batch));
}

} catch (std::exception &ex)
@@ -418,20 +429,6 @@ std::unique_ptr<RdKafka::KafkaConsumer> KafkaSourceStage::create_consumer(RdKafk
throw std::runtime_error("Failed to list_topics in Kafka broker after 5 attempts");
}

if (md == nullptr)
{
CHECK_KAFKA(consumer->metadata(spec_topic == nullptr, spec_topic.get(), &md, 1000),
RdKafka::ERR_NO_ERROR,
"Failed to list_topics in Kafka broker");
}

if (md == nullptr)
{
CHECK_KAFKA(consumer->metadata(spec_topic == nullptr, spec_topic.get(), &md, 1000),
RdKafka::ERR_NO_ERROR,
"Failed to list_topics in Kafka broker");
}

std::map<std::string, std::vector<int32_t>> topic_parts;

auto &ctx = srf::runnable::Context::get_runtime_context();
@@ -467,14 +464,29 @@ std::unique_ptr<RdKafka::KafkaConsumer> KafkaSourceStage::create_consumer(RdKafk
auto positions =
foreach_map(toppar_list, [](const std::unique_ptr<RdKafka::TopicPartition> &x) { return x->offset(); });

auto watermarks = foreach_map(toppar_list, [&consumer](const std::unique_ptr<RdKafka::TopicPartition> &x) {
int64_t low;
int64_t high;
CHECK_KAFKA(consumer->query_watermark_offsets(x->topic(), x->partition(), &low, &high, 1000),
RdKafka::ERR_NO_ERROR,
"Failed retrieve Kafka watermark offsets");

return std::make_tuple(low, high);
});

auto watermark_strs = foreach_map(watermarks, [](const auto &x) {
return MORPHEUS_CONCAT_STR("(" << std::get<0>(x) << ", " << std::get<1>(x) << ")");
});

auto &ctx = srf::runnable::Context::get_runtime_context();
VLOG(10) << ctx.info()
<< MORPHEUS_CONCAT_STR(
" Topic: '" << topic->topic()
<< "', Parts: " << StringUtil::array_to_str(part_ids.begin(), part_ids.end())
<< ", Committed: " << StringUtil::array_to_str(committed.begin(), committed.end())
<< ", Positions: "
<< StringUtil::array_to_str(positions.begin(), positions.end()));
<< ", Positions: " << StringUtil::array_to_str(positions.begin(), positions.end())
<< ", Watermarks: "
<< StringUtil::array_to_str(watermark_strs.begin(), watermark_strs.end()));
}

return std::move(consumer);
16 changes: 5 additions & 11 deletions morpheus/cli.py
@@ -355,17 +355,6 @@ def run(ctx: click.Context, **kwargs):
pass


def validate_rolls(ctx, param, value):
if isinstance(value, tuple):
return value

try:
rolls, _, dice = value.partition("d")
return int(dice), int(rolls)
except ValueError:
raise click.BadParameter("format must be 'NdM'")


@click.group(chain=True,
short_help="Run the inference pipeline with a NLP model",
no_args_is_help=True,
@@ -671,6 +660,11 @@ def from_file(ctx: click.Context, **kwargs):
is_flag=True,
help=("Enabling this option will skip pre-filtering of json messages. "
"This is only useful when inputs are known to be valid json."))
@click.option("--auto_offset_reset",
type=click.Choice(["earliest", "latest", "none"], case_sensitive=False),
default="latest",
help=("Sets the value for the configuration option 'auto.offset.reset'. "
"See the kafka documentation for more information on the effects of each value."))
@prepare_command()
def from_kafka(ctx: click.Context, **kwargs):

34 changes: 21 additions & 13 deletions morpheus/stages/input/kafka_source_stage.py
@@ -50,6 +50,12 @@ class KafkaSourceStage(SingleOutputSource):
disable_commit: bool, default = False
Enabling this option will skip committing messages as they are pulled off the server. This is only useful for
debugging, allowing the user to process the same messages multiple times.
disable_pre_filtering : bool, default = False
Enabling this option will skip pre-filtering of json messages. This is only useful when inputs are known to be
valid json.
auto_offset_reset : str, default = "latest"
Sets the value for the configuration option 'auto.offset.reset'. See the kafka documentation for more
information on the effects of each value.
"""

def __init__(self,
@@ -59,11 +65,15 @@ def __init__(self,
group_id: str = "custreamz",
poll_interval: str = "10millis",
disable_commit: bool = False,
disable_pre_filtering: bool = False):
disable_pre_filtering: bool = False,
auto_offset_reset: str = "latest"):
super().__init__(c)

self._consumer_conf = {
'bootstrap.servers': bootstrap_servers, 'group.id': group_id, 'session.timeout.ms': "60000"
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'session.timeout.ms': "60000",
"auto.offset.reset": auto_offset_reset
}

self._input_topic = input_topic
@@ -88,7 +98,7 @@ def __init__(self,
# Override the auto-commit config to enforce custom streamz checkpointing
self._consumer_params['enable.auto.commit'] = 'false'
if 'auto.offset.reset' not in self._consumer_params.keys():
self._consumer_params['auto.offset.reset'] = 'latest'
self._consumer_params['auto.offset.reset'] = 'earliest'
self._topic = topic
self._npartitions = npartitions
self._refresh_partitions = refresh_partitions
@@ -107,7 +117,7 @@ def name(self) -> str:
def supports_cpp_node(self):
return True

def _source_generator(self, sub: srf.Subscriber):
def _source_generator(self):
# Each invocation of this function makes a new thread so recreate the producers

# Set some initial values
@@ -181,7 +191,7 @@ def _source_generator(self, sub: srf.Subscriber):
for partition in range(npartitions):
tps.append(ck.TopicPartition(self._topic, partition))

while sub.is_subscribed():
while True:
try:
committed = consumer.committed(tps, timeout=1)
except ck.KafkaException:
@@ -191,7 +201,7 @@ def _source_generator(self, sub: srf.Subscriber):
positions[tp.partition] = tp.offset
break

while sub.is_subscribed():
while True:
out = []

if self._refresh_partitions:
@@ -248,10 +258,11 @@ def commit(topic, part_no, keys, lowest, offset):
self._consumer_conf["bootstrap.servers"],
offset)

weakref.finalize(meta, commit, *part[1:])
if (not self._disable_commit):
weakref.finalize(meta, commit, *part[1:])

# Push the message meta
sub.on_next(meta)
yield meta
else:
time.sleep(self._poll_interval)
except Exception:
@@ -263,7 +274,6 @@ def commit(topic, part_no, keys, lowest, offset):
# Close the consumer
if (consumer):
consumer.close()
sub.on_completed()

def _kafka_params_to_messagemeta(self, x: tuple):

@@ -292,13 +302,11 @@ def _read_gdf(kafka_configs,
raise ValueError("ERROR: You must specify the topic "
"that you want to consume from")

kafka_confs = {str.encode(key): str.encode(value) for key, value in kafka_configs.items()}

kafka_datasource = None

try:
kafka_datasource = KafkaDatasource(
kafka_confs,
kafka_configs,
topic.encode(),
partition,
start,
@@ -317,7 +325,7 @@

result = cudf_readers[message_format](kafka_datasource, engine="cudf", lines=lines)

return cudf.DataFrame(data=result._data, index=result._index)
return cudf.DataFrame._from_data(data=result._data, index=result._index)
except Exception:
logger.exception("Error occurred converting KafkaDatasource to Dataframe.")
finally:
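
For reference, the new `auto_offset_reset` argument added by this change can be passed directly when constructing the stage (the same setting is exposed on the CLI as `--auto_offset_reset`). A minimal usage sketch, assuming the standard Morpheus `LinearPipeline` API; the broker address and topic name are placeholders:

```python
from morpheus.config import Config
from morpheus.pipeline import LinearPipeline
from morpheus.stages.input.kafka_source_stage import KafkaSourceStage

config = Config()

pipeline = LinearPipeline(config)
pipeline.set_source(
    KafkaSourceStage(config,
                     bootstrap_servers="localhost:9092",  # placeholder broker
                     input_topic="test_topic",            # placeholder topic
                     auto_offset_reset="earliest"))       # read the topic from the beginning
# ... add processing/output stages, then call pipeline.run() ...
```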
