Manual testing of Morpheus with Kafka & Validation improvements #290

Merged · 37 commits · Aug 8, 2022
5763a82
Simple copy test
dagardner-nv Jul 19, 2022
2b47e46
Add instructions for partitioned test
dagardner-nv Jul 19, 2022
bf16140
Add Kafka ABP validation instructions
dagardner-nv Jul 19, 2022
81d4fd8
Test data
dagardner-nv Jul 21, 2022
8b4554c
Add a flag to optionally include or exclude the cudf ID column from C…
dagardner-nv Jul 21, 2022
91636af
Replace usage of older MORPHEUS_HOME env var with MORPHEUS_ROOT
dagardner-nv Jul 21, 2022
55f06c7
Update copy test using Kafka's console producer and consumer
dagardner-nv Jul 21, 2022
abc4864
Update partitioned testing:
dagardner-nv Jul 21, 2022
fafa8b7
Refactor ABP pipeline test
dagardner-nv Jul 21, 2022
abb3343
Compare with sorted keys
dagardner-nv Jul 21, 2022
ee9f2b1
wip
dagardner-nv Jul 21, 2022
d5dfb91
Skip python test, known bug
dagardner-nv Jul 22, 2022
ee6c403
Move comparison logic from validation stage to be usable outside of a…
dagardner-nv Jul 22, 2022
562e510
Rename morpheus.utils.logging to morpheus.utils.logger allowing other…
dagardner-nv Jul 22, 2022
69d881b
Make compare_df runnable from the command line
dagardner-nv Jul 22, 2022
ff913d1
Fix bug in compare_df
dagardner-nv Jul 22, 2022
521cb87
When rows don't match but are within the tolerances comparison.matche…
dagardner-nv Jul 22, 2022
d75b5d0
Add hammah user123 to test
dagardner-nv Jul 22, 2022
8a70db2
Add phishing pipeline to kafka test
dagardner-nv Jul 22, 2022
77a1f85
Add sid validation to kafka testing
dagardner-nv Jul 22, 2022
d914ddd
Split bash commands
dagardner-nv Jul 22, 2022
a79615a
Split bash commands
dagardner-nv Jul 23, 2022
d8d4027
wip
dagardner-nv Jul 23, 2022
f6fc77b
wip
dagardner-nv Jul 23, 2022
eab0051
wip
dagardner-nv Jul 23, 2022
ae5e089
wip
dagardner-nv Jul 23, 2022
eb407b7
wip
dagardner-nv Jul 23, 2022
ebcdda7
wip
dagardner-nv Jul 23, 2022
f0c0fc8
Script no longer needed
dagardner-nv Jul 23, 2022
8f0da01
Fix includes
dagardner-nv Jul 25, 2022
3e6d2bf
Fix imports
dagardner-nv Jul 25, 2022
3a6aaf9
Update import paths
dagardner-nv Jul 25, 2022
fbeb52b
Add instructions for setting ports to prevent forwarding of ip6 ports…
dagardner-nv Jul 26, 2022
f81e3a3
Merge branch 'branch-22.08' into david-test-kafka
dagardner-nv Aug 1, 2022
a279f2b
Move main and parse_arge to a stand-alone script
dagardner-nv Aug 1, 2022
000f860
Fix logger name
dagardner-nv Aug 1, 2022
3bda99d
Update instructions to use new compare_data_files.py script
dagardner-nv Aug 1, 2022
Update copy test using Kafka's console producer and consumer
  • Loading branch information
dagardner-nv committed Jul 21, 2022
commit 55f06c7156c5b77803cbe31d4d8d33aec6b98d3f
72 changes: 63 additions & 9 deletions scripts/validation/kafka_testing.md
@@ -6,26 +6,80 @@
```

## Simple Data Copying
### Checking KafkaSourceStage

1. Open a new terminal and create a topic called "morpheus-src-copy-test" with only a single partition
```bash
docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \
-e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \
-v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash

$KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-src-copy-test --partitions 1 --bootstrap-server `broker-list.sh`
```
Keep this shell & container open; you will need it in later steps.

1. Open a new terminal and launch a pipeline to listen to Kafka, from the root of the Morpheus repo run:
```bash
morpheus --log_level=DEBUG run \
pipeline-nlp \
from-kafka --input_topic morpheus-src-copy-test --bootstrap_servers "${BROKER_LIST}" \
deserialize \
monitor --description "Kafka Read" \
serialize \
to-file --include-index-col=false --filename=${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test.csv --overwrite
```

1. Return to the Kafka terminal and run:
```bash
cat /workspace/tests/tests_data/filter_probs.json | \
$KAFKA_HOME/bin/kafka-console-producer.sh \
--topic=morpheus-src-copy-test --broker-list=`broker-list.sh` -
```

1. Return to the Morpheus terminal, and once the monitor stage has recorded `read: 20 messages`, shut down the pipeline with Ctrl-C.

1. If successful, the output file `.tmp/morpheus-src-copy-test.csv` should be identical to `tests/tests_data/filter_probs.csv`. Verify with:
```bash
diff -q --ignore-all-space ${MORPHEUS_ROOT}/tests/tests_data/filter_probs.csv ${MORPHEUS_ROOT}/.tmp/morpheus-src-copy-test.csv
```
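Since `diff -q` exits non-zero when the files differ, the same check can be scripted. Below is a minimal sketch; `files_match` is a hypothetical helper (not part of the Morpheus scripts), and the paths are placeholders:

```shell
# Returns success (exit 0) only when the two files match, ignoring whitespace.
files_match() {
    diff -q --ignore-all-space "$1" "$2" > /dev/null
}

# Hypothetical usage with placeholder paths:
if files_match expected.csv actual.csv; then
    echo "copy test passed"
else
    echo "copy test FAILED"
fi
```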

### Checking WriteToKafkaStage
1. Open a new terminal and create a topic called "morpheus-sink-copy-test" with only a single partition, and start a consumer on that topic:
```bash
docker run --rm -it -v /var/run/docker.sock:/var/run/docker.sock \
-e HOST_IP=$KAFKA_ADVERTISED_HOST_NAME -e ZK=$2 \
-v ${MORPHEUS_ROOT}:/workspace wurstmeister/kafka /bin/bash

$KAFKA_HOME/bin/kafka-topics.sh --create --topic=morpheus-sink-copy-test --partitions 1 --bootstrap-server `broker-list.sh`

$KAFKA_HOME/bin/kafka-console-consumer.sh --topic=morpheus-sink-copy-test \
--bootstrap-server `broker-list.sh` > /workspace/.tmp/morpheus-sink-copy-test.jsonlines
```

1. Open a new terminal and from the Morpheus root run:
```bash
morpheus --log_level=DEBUG run \
pipeline-nlp \
from-file --filename=${MORPHEUS_ROOT}/tests/tests_data/filter_probs.csv \
deserialize \
serialize \
to-kafka --output_topic morpheus-sink-copy-test --bootstrap_servers "${BROKER_LIST}"
```
The `tests/tests_data/filter_probs.csv` file contains 20 lines of data, and the pipeline should complete quickly (in under 5 seconds).

1. The Kafka consumer we started in step #1 won't give any indication that it has concluded; instead, we will check its progress indirectly by counting the rows in the output file. Once the Morpheus pipeline completes, check the number of lines in the output:
```bash
wc -l ${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test.jsonlines
```
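Rather than re-running `wc -l` by hand, the wait can be automated. This is only a sketch; `wait_for_lines` is a hypothetical helper (not part of the Morpheus scripts), and the 20-line count comes from `filter_probs.csv` as described above:

```shell
# wait_for_lines FILE COUNT: poll once per second until FILE contains
# at least COUNT lines (a missing file counts as zero lines).
wait_for_lines() {
    while [ "$(wc -l < "$1" 2>/dev/null || echo 0)" -lt "$2" ]; do
        sleep 1
    done
}

# Hypothetical usage:
# wait_for_lines "${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test.jsonlines" 20
```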

1. Once all 20 lines have been written to the output file, verify the contents with:
```bash
diff -q --ignore-all-space <(cat ${MORPHEUS_ROOT}/.tmp/morpheus-sink-copy-test.jsonlines | jq --sort-keys) <(cat ${MORPHEUS_ROOT}/tests/tests_data/filter_probs.jsonlines | jq --sort-keys)
```
Note the usage of `jq --sort-keys`, which reformats the JSON output with its keys sorted; this ensures that `{"a": 5, "b": 6}` and `{"b": 6, "a": 5}` are considered equivalent.
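The effect of `--sort-keys` can be seen in isolation (the `-c` flag, an addition here, just compacts the output onto one line):

```shell
# Key order in the input differs, but --sort-keys normalizes it
echo '{"b": 6, "a": 5}' | jq --sort-keys -c .
# → {"a":5,"b":6}
```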

1. Stop the consumer in the Kafka terminal.


## Partitioned Data Copying
Same as above, but we cannot depend on the ordering of the records being preserved.
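When record order is not preserved, one simple order-insensitive check is to sort both files before diffing. This is a sketch only, not the comparison used by the Morpheus validation scripts; `compare_unordered` is a hypothetical helper:

```shell
# compare_unordered FILE1 FILE2: exit 0 iff the files contain the same
# lines, regardless of order (requires bash for process substitution).
compare_unordered() {
    diff <(sort "$1") <(sort "$2")
}
```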