Yandex Scale 2020: Обработка поточных данных с использованием Managed Service for Apache Kafka® и Data Proc
В этом репозитории содержится код и памятка для практикума по работе с Managed Service for Apache Kafka® и Data Proc в Яндекс.Облаке.
В этом репозитории есть исходные файлы:
Makefile-- файл для создания нужного окружения на vm, для запуска producer.pyproducer.py-- скрипт для загрузки данных в Kafka сырых событий.streaming.py-- pyspark задача для обработки поточных сырых событий.clickhouse.sql-- DDL для ClickHouse, для экспорта обработанных данных в витрину.
Для работы с виртуальными машинами нам потребуется ssh ключ. Если у вас еще нет пары ключей, то вы можете создать новые ssh-keys по документации Яндекс.Облака.
- Перейдите в сервис VPC и нажмите кнопку
Создать сеть. - Укажите уникальное имя, выставьте флаг
Создать подсетии создайте сеть. - Нажмите на созданную сеть и для новых подсетей через настройки нажмите
Включить NAT в интернет. NAT нужен для Data Proc кластера для доступа к Object Storage и отправки своего статуса.
- Вернитесь в каталог и нажмите слева на вкладку
Сервисные пользователи. - Нажмите
Создать сервисный аккаунтсправа. - Введите имя сервисного аккаунта для кластера Data Proc, например
service-account-900. Имена сервисных аккаунтов должны быть уникальны в облаке, поэтому добавьте суффик своего каталога. - Добавьте роль
mdb.dataproc.agentдля отправки метрик.
- Вернитесь в каталог и нажмите на
Managed Service for Kafkaи кнопкуСоздать кластер. - Укажите имя.
- Укажите класс хоста
s2.microв 2 ядра и 8ГБ памяти. - В
сетевых настройкахукажите используемую зону доступности, чтобы создать только 1 инстанс Kafka. - Количество брокеров в зоне отавьте равным
1, чтобы создать только 1 инстанс кафки в одной зоне доступности. - Нажмите кнопку
Создать.
- Вернитесь в каталог и нажмите на
Data Procи кнопкуСоздать кластер. - Укажите имя.
- Выберете последнюю доступную версию, на момент практикума это
1.3. - Укажите публичную часть ключа в поле
ssh-ключ. - Сервисный аккаунт должен быть выбран автоматически, укажите созданный вручную, если он не указан.
- Выставьте зону доступности, для которой вы включили
NAT. В нашем практикуме мы будем создавать все мощности только в одной из зон доступности, выберите по желанию. - Выставьте флаг
UI Proxyдля проксирования интерфейсов кластера через отдельный endpoint. Он потребуется для написания кода на pyspark и отладки работы spark задания. - В поле подкластеров у вас будет 2 подкластера с ролями
MasterиData. - Для мастер-подкластера через настройки можете задать имя.
- Размер хранилища можно уменьшить до 32ГБ и сохраните изменения.
- Для
Dataподкластера так же нажмите на редактирование. - Число хостов можно оставить 1, нам этого хватит.
- Выставьте класс хоста в 4 ядра и 16ГБ памяти.
- Хранилище можно переключить так же на
network-ssdи уменьшить его до 32ГБ. - Сохраните изменения.
- Создайте кластер.
- Вернитесь в каталог и нажмите на
Managed Service for ClickHouseи кнопкуСоздать кластер. - Укажите имя.
- Выставьте последнюю доступную версию
20.6. - Укажите класс хоста
s2.microв 2 ядра и 8ГБ памяти. - Укажите имя базы данных, например,
scale2020. - Укажите имя пользователя, например,
clickhouse. - Укажите пароль для пользователя.
- Выставьте используемую зону доступности, публичный доступ не нужен.
- Выставьте в дополнительных настройках флаги
Доступ из DataLensиДоступ из консоли управления. - В настройках СУБД нажмите кнопку
Настроить. - Пролистайте настройки до секции
Kafkaи раскройте их. - Укажите:
SASL_MECHANISMкакSCRAM-SHA-512,SASL_PASSWORDкак пароль, с которым ClickHouse будет ходить в Kafka.SASL_USERNAMEкакclickhouse. С таким login ClickHouse будет ходить в Kafka.SECURITY_PROTOL, какSASL_PLAINTEXT.
- Нажмите на
СохранитьиСоздать кластер.
- Вернитесь в ваш кластер Kafka.
- Откройте вкладку
Топикии нажмите наСоздать топик. - Создайте топик
rawдля сырых данных. Фактор репликации нужно оставить в1, т.к. у нас всего 1 инстанс. Остальные настройки можно оставить по-умолчанию. - Создайте такой же топик
combinedдля общения Data Proc и ClickHouse.
- Переключитесь на вкладку
Пользователии нажмите наСоздать пользователя. - Создайте первого пользователя с именем
ingestion, который будет публиковать события в топикraw. Для этого выдайте на топикrawправоACCESS_ROLE_PRODUCER. - Создайте пользователя
dataproc, который будет вычитывать данные из топикаrawи записывать вcombined. Для этого выдайте на топикrawправоACCESS_ROLE_CONSUMERи на топикcombinedправоACCESS_ROLE_PRODUCER. - Создайте первого пользователя с именем
clickhouse, который будет вычитывать события из топикаrawв ClickHouse. Для этого выдайте на топикcombinedправоACCESS_ROLE_CONSUMER.
- Вернитесь в каталог и переключитесь на сервис
Compute Cloud. - Нажмите
Создать ВМ. - Укажите имя, выберите ОС
ubuntu-20.04 lts. - Диск можно уменьшить до 8ГБ.
- Гарантированную долю CPU можно уменьшить до 20%.
- RAM можно уменьшить до 1ГБ.
- Публичный адрес выставить
Автоматически. - Выставить логин
ubuntuи публичную часть вашего ssh-ключа. - Создать.
- Откройте терминал и зайдите на виртуальную машину по ssh.
- Выполните команду
sudo apt update && sudo apt install git make screen python3-dev python3-venv, которая установит команды, необходимые для запуска скрипта. - Выполните
git clone https://github.com/epikhinm/scale2020-data-processing-workshop.git; cd scale2020-data-processing-workshop - Установите зависимости с помощью команды
make venv. - Запустите
screenи в нем выполните. venv/bin/activate, для того чтобы начать использовать python и зависимости из собранного venv. - Скопируйте адрес хоста kafka из UI и выполните в терминале команду:
KAFKA_BROKERS="<kafka_host>:9091" KAFKA_PASS="ingestion_password" KAFKA_USER="ingestion" python producer.py
У вас запустился скрипт, который в цикле забирает события и отправялет их в Kafka.
- Вернитесь в ваш Data Proc кластер.
- Откройте ссылку
Zeppelin Web UI. - У вас откроется Web IDE, где удобно разрабатывать и отлаживать код. Нажмите
Create new note. - Скопируйте код и файла streaming.py, впишите в note первой строкой
%pysparkи вставьте содержимоеstreaming.py. - В переменной
KAFKA_BROKERSукажите адрес инстанса Kafka. - Укажите используемый пароль в пемеренной
PASSWORD. - Запустите заметку кнопкой
Run this paragraphили сочетанием клавишShift+Enter.
- Вернитесь в ваш ClickHouse кластер.
- Откройте вкладку
SQLи подключитесь. - Для создания таблицы, которая будет импортировать данные из Kafka создайте таблицу со следующим кодом (замените broker на актульный адрес Kafka):
CREATE TABLE scale2020.queue (
timestamp DateTime,
location_id UInt32,
latitude Float64,
longitude Float64,
country FixedString(2),
temperature Nullable(Float64) DEFAULT NULL,
humidity Nullable(Float64) DEFAULT NULL,
pressure Nullable(Float64) DEFAULT NULL,
P1 Nullable(Float64) DEFAULT NULL,
P2 Nullable(Float64) DEFAULT NULL
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'broker:9092',
kafka_topic_list = 'combined',
kafka_group_name = 'queue',
kafka_format = 'CSV',
kafka_skip_broken_messages = 1,
kafka_num_consumers = 1,
kafka_max_block_size = 1048576;- Для создания таблицы с историческими данными, создается вторую таблицу:
CREATE TABLE scale2020.air_quality (
timestamp DateTime,
location_id UInt32,
latitude Float64,
longitude Float64,
country FixedString(2),
temperature Nullable(Float64) DEFAULT NULL,
humidity Nullable(Float64) DEFAULT NULL,
pressure Nullable(Float64) DEFAULT NULL,
P1 Nullable(Float64) DEFAULT NULL,
P2 Nullable(Float64) DEFAULT NULL
) ENGINE = MergeTree
PARTITION BY toYYYYMM(timestamp)
ORDER BY (timestamp);- Для того чтобы в фоне ClickHouse перекладывал данные из очереди Kafka в историческую таблицу создайте
MATERIALIZED VIEW:
CREATE MATERIALIZED VIEW scale2020.air_quality_mv TO scale2020.air_quality AS
SELECT * FROM scale2020.queue;- Теперь в таблицу
scale2020.air_qualityбудут все собранные и очищенные записи.
- Вернитесь на описание ClickHouse кластера и переключитесь на вкладку DataLens.
- Если DataLens не активирован, то активируйте его нажав кнопку.
- Нажмите на кнопку
Создать подключениев DataLens. - Назовите подключение по имени вашего аккаунта. DataLens подскажет ClickHouse кластер и хост, укажите только пароль и проверьте соединение.
- Нажмите в правом верхнем углу кнопку
Создать.
- Нажмите на кнопку
Создать датасет. - Слева отобразятся таблицы, перетащите таблицу
scale2020.air_qualityв правую часть экрана. - Сохраните датасет.
- Внизу отображаются данные, которые лежат в датасете.
- Давайте добавим новое поле с помощью кнопки
Добавить поле, напишем в формулуGEOPOINT([latitude], [longitude])и сохраним под именемgeopoint. Нужно убедиться что тип колонки определился какГеоточка. - Сохранить.
- Нажмите кнопку
Создать чарт. - Выберте слева наш датасет.
- Тип чарта --
Карта. - На поле
геоточкиперетащите полеgeopoint. - Поле
countryперетащите на полефилтьтры слояи укажите страну, например,RU. - Теперь на поле размер перетащите поле
P2и выбирете типavgаггрегации. - На
Тултипыможно так же перетащитьP2чтобы видеть актуальные значения. - Сохраните дашборд