- Referencia: O projeto aqui implementado é do site projectpro.io onde estou estudando projetos de engenharia de dados profissionais. Créditos para a equipe.
- Neste projeto, usaremos um conjunto de dados de comércio eletrônico para simular os registros de compras do usuário, visualizações de produtos, histórico de carrinho e jornada do usuário na plataforma online para criar dois pipelines analíticos, Lote e Tempo Real. O processamento em lote envolverá ingestão de dados, arquitetura Lake House, processamento e visualização usando Amazon Kinesis, Glue, S3 e QuickSight para obter insights sobre o seguinte:
- Visitantes únicos por dia
- Durante um determinado tempo, os usuários adicionam produtos aos carrinhos, mas não os compram
- Principais categorias por hora ou dia da semana (ou seja, para promover descontos com base em tendências)
- Para saber quais marcas precisam de mais marketing
- Os dados usados nesse projeto são o eCommerce behavior data from multi category store do kaggle
- Os dados possuem 9 colunas referente ao acesso em um site. Basicamente, o que os dados fazem, é disponibilizar o que uma pessoa faz em um acesso em uma loja de compras, por exemplo: "As 9 horas o ID_CONTA X acessou o produto XY que é um smarthone e colocou o produto no carrinho na seção Z"
- As colunas são detalhadas a seguir
- event_time: Hora em que o evento aconteceu
- event_type: Apenas um tipo dos eventos (view,cart,remove_from_cart,purchase)
- product_id: Id do produto
- category_id: Id da categoria do produto
- category_code: Nome da categoria
- brand: Sequencia do nome da marca em letra minuscula
- price: Preço do produto
- user_id: Id do usuário (estático)
- user_session: Id da sessão (Pode variar para o mesmo usuario, cada sessão é um id único)
- Usaremos esses dados para simular como se fosse uma pessoa usando o site e então vamos fazer o pipeline acima descrito
- Vou seguir o projeto e vou usar o CLI da AWS
- Em vez de instalar na minha máquina, vou usar o AWS CloudShell, que é mais prático
- Crie o bucket usando CLI como seguinte comando (substitua {bucket-name} pelo nome do bucket):
-
aws s3 mb s3://{bucket-name}
- Podemos ver se o bucket foi criado acessando o recurso S3 e indo em bucket. No meu caso, consegui criar um bucket com acesso privado, como mostra a imagem abaixo (apaguei parte das informações)
- Agora vou adicionar uma tag para o bucket
-
aws s3api put-bucket-tagging --bucket {bucket-name} --tagging 'TagSet=[{Key={key-tag},Value={value-tag}}]'
- Essa tag rastreia as modificações feitas pelo usuário que está consumindo o bucket
- Agora podemos enviar nossos dados para o bucket. Se eu tivesse optado pela escolha de usar o CLI do windowns, poderia fazer isso diretamente do meu pc, mas com o CloudShell é necessário ou enviar o arquivo para o CloudShell (máximo 1gb) ou enviar direto para o bucket no painel (Máximo de 5gb, que é o nivel gratuito)
- Portanto, vou enviar direto pelo painel mesmo, inclusive, vou diminuir a quantidade de dados que tenho atualmente para poder caber no nível de teste gratuito do bucket S3, como mostrado na imagem abaixo
- Agora é só enviar os dados (no meu caso, utilizei só as as primeiras 7 milhões de linha do arquivo) para o bucket
- Vá no painel de serviços do AWS e pesquise por Kinesis, e então crie um serviço do tipo Kinesis Data Stream
- Existe pouca opção de criação, então optei por uso mínimo
- Após criar, o serviço fica disponivel como na imagem abaixo
- Para criar essa aplicação, usaremos a SDK boto3 da AWS. Esse framework é preparado para rodar dentro do próprio serviço da amazon, podendo ser no CLI, no SHELL ou no Cloud9.
- Para o nosso projeto, vamos usar o Cloud9. Para isso, vá até o serviço de aplicações da AWS e procure por Cloud9. Crie um novo serviço e use um sistema de computação EC2 que esteja habilitado para o nível grátis. Use a minima quantidade de recursos possiveis.
- A imagem acima mostra o ambiente já criado, e onde clicar caso queira criar outro. Depois de criado, basta clicar em "em aberto" que é o link da IDE
- Com a IDE aberta, você instala a biblioteca na IDE através do bash no canto inferior usando o comando pip do python e depois testa com uso no bucket S3. A imagem abaixo mostra o processo
- No meu caso deu certo, ele executou o e printou os bucketes disponíveis
- Agora vamos acessar dados de dentro desses buckets. Isso é interessante porque vamos recuperar os dados do nosso bucket e vamos enviar em streaming para o Kinesis
- O código abaixo está mostrando a execução dos comandos de acesso ao bucket, com a adição do comando split('/n') podemos facilmente gerar uma matriz de strings e depois transformalos em um json que vamos enviar para o kinesis
- Abaixo está o código modificado para poder transformar cada registro em um vetor. A função decode('utf-8') serve para transformar o tipo byte em string e a função split('\r\n') serve para poder quebrar de maneira correta cada registro
- Agora vamos criar a função de inserção para o kimnesis englobando os recuperados do s3
- Primeiro vamos importar as bibliotecas necessárias, entre elas a biblioteca csv e json. A biblioteca csv vai ler os dados em forma de dicionário através do método DictReader() e a biblioteca json vai devolver os dados em json para o kinesis
- Abaixo está o código e o response, esse código é do projectpro, mas na documentação obtemos quase os mesmos códigos, entretanto, aqui eles fazem tratamento dos dados csv
- Primeiro usamos a biblioteca boto3, nela, podemos acessar os clientes s3 e kinesis. A partir disso, recuperamos os dados do S3 a partir do comando "response_nov = client_s3.get_object(Bucket={},Key={})". Bucket é o nome do bucket em que está os dados e Key é o nome do arquivo. O método get_object recupera qualquer tipo de arquivo do bucket.
- O código "response_nov = response_nov['Body'].read().decode("utf-8").split('\r\n')" realiza a leitura do arquivo, a resposta da requisção chega em formato JSON, e no campo Body vem os dados em forma de byte, para convertelos para string usamos decode('utf-8'). o método read() é usado para ler os dados em binário e split('\r\n') é usado para quebrar a string unica em vários vetores contendo as informações de uma só linha
- Depois criamos o for iterativo onde vamos percorrer as linhas e enviar as informações. Como o arquivo possuia cabeçalho usei o método da biblioteca csv no "for x in csv.DictReader(response):"
- Dentro do looping fazemos a transformação dos dados, aqui é importante entender o que a linha "json_load['txn_timestamp'] = datetime.now().isoformat()" está fazendo, ela salva o horário de envio da informação e a linha "response = client_kinesis.put_record(StreamName='{},Data=json.dumps(json_load, indent=4),PartitionKey=str(json_load['category_id']))" é quem de fato faz a postagem dos dados para o kinesis
- É importante resssaltar o uso da "PartitionKey" de 'category_id'. Isso é importante porque conseguimos melhorar a divisão dos nossos dados
- Podemos ver os dados chegando através do painel de obtenção de dados, como abaixo:
- Os próximos passos estão relacionados a construção do processamento dos dados em streaming
- Para isso, vamos criar um banco de dados no AWS Glue utilizando o arquivo que usamos para simular os dados em stream
- Depois, vamos construir mais um canal de stream no kinesis para poder empurrar os dados do flink para ele
- Por fim, vamos construir um caderno flink que vai ler os dados do stream 1 e empurrar os dados para o stream 2
- O AWS Glue vai funcionar como registro de schema para o flink, basicamente, é ele quem vai nortear o SQL do Flink a partir do schema que ele puxar dos nossos dados em CSV guardados no S3
- Para criar um novo banco de dados no AWS Glue, entre no serviço, vá em banco de dados e aperte em criar um novo
- No nivel gratuito temos bastante espaço para poder utilizar o serviço
- No painel de criação, coloque o nome do seu banco de dados e a URI da sua pasta S3 onde você quer salvar o banco
- Agora vamos adicionar a tabela usando o crawler, isso significa que o glue vai analisar o arquivo e vai definir um schema pra ele
- As configurações são pessoais, a única que deve ser realmente vista é a onfiguração que aponta para onde estão os dados. O que montamos nessa config é o crawler, então a missão dele é pesquisar dentro do bucket S3 em busca dos dados. Portanto, coloque apenas o caminho do bucket, não do dado em si
- Depois de construido, vá em tables e acesse a sua tabela AWS Glue
- Dentro da tabela, acesse os dados através do Athena
- Dentro do Athen, aperte em configurações e gerenciar, para indicar um local onde as consultas vão ficar salval. Vou salvar na mesma pasta do bucket S3 que estou usando em todo o projeto
- Agora podemos executar a query padrão e ver os resultados
- Pronto, nosso banco de dados AWS Glue foi criado
- Apenas siga os passos da seção Configurando e Implementando o AWS Kinesis Data Stream
- Criar Stream Aplication no Kinesis -> Ler os dados Stream com a Estrutura da tabela do Glue e passar pro outro stream-> Salvar como Aplicação -> Fazer o Deploy da Aplicação -> Executar a aplicação
- Primeiro, vamos no fluxo de dados do kinesis e clique e "Apache Flink Gerenciado"
- Depois, selecione studio notebook
- Selecione criação personalizada, coloque o nome e aperte em avançar
- Na seção de permissões, deixe a permissão padrão e em baixo coloque o banco de dados AWS Glue que criamos anteriormente
- Em fontes incluidas, na fonte destino, coloque o primeiro fluxo kinesis que criei e na fonte de destino, coloque o segundo fluxo kinesis
- Na próxima página, deixe tudo como padrão e coloque o bucket s3 que criamos anteriormente como destino de configurações
- Agora podemos acessar nosso caderno zepelin e acompanhar o stream de dados em tempo real por lá, aperte em executar e abra o zepelin
- Crie um novo notebook, ele deve ter essa cara
- É nele onde vamos codar a nossa aplicação
- Para acessar os dados usando o flink, precisamos construir uma tabela que será salva no AWS Glue, podemos construir diversos tipos de comandos SQL e usar em stream nessas tabelas, que possuem taxa de atualização em real time. Primeiro, vamos construir a tabela que vai ler os dados.
-
%flink.ssql /*Primeiro, drop um schema caso ele exista*/ DROP TABLE IF EXISTS kinesis_pipeline_table_1; /*Aqui, vou usar o schema dos dados que chegam para a aplicação no formato json*/ CREATE TABLE kinesis_pipeline_table_1 ( event_time VARCHAR(30), event_type VARCHAR(30), product_id BIGINT, category_id BIGINT, category_code VARCHAR(30), brand VARCHAR(30), price DOUBLE, user_id BIGINT, user_session VARCHAR(30), txn_timestamp VARCHAR(30) ) PARTITIONED BY (category_id) WITH ( 'connector' = 'kinesis', 'stream' = '{kinesis_stream}', 'aws.region' = 'us-east-2', 'format' = 'json' );
- O comando '%flink.ssql' é fundamental para que o zepelin identifique que é um comando fliker, se não colocar os comandos SQL não funciona
- Criamos a tabela e inserimos o SCHEMA como um SGBD normal, inclusive, utilizando os mesmos comandos
- O comando PARTITIONED BY (category_id) é uma boa prática do flink
- O comando WITH indica os conectores, nesse caso, conector kinesis, em stream coloquei o meu fluxo de dados de entrada, o formato de leitura e a região da minha conta da AWS
- Agora vou criar mais uma célula e colocar um comando simples para poder ler o banco de dados
-
%flink.ssql SELECT * FROM kinesis_pipeline_table_1;
- Vamos ver lá no AWS Glue se o banco foi criado
- Pronto, o banco foi criado da maneira correta no Glue
- Agora vamos ver se o flink está lendo corretamente o stream do kinesis. Execute a aplicação do Cloud9 para simular acessos no site e depois execute o comando de select
- No meu caso, funcionou, agora vamos para a segunda parte, que é construir o input para o segundo fluxo de dados do kinesis
- Os dados que eu vou repassar para o segundo fluxo de dados é uma contagem de ações por usuário. Para isso, vou testar a seguinte query
-
SELECT user_id,event_type, count(event_type) FROM kinesis_pipeline_table_1 GROUP BY user_id,event_type;
- Ok, está funcionando, agora, vou adicionar uma tabela nova antes da query, essa tabela vai corresponder ao segundo fluxo de dados
-
%flink.ssql DROP TABLE IF EXISTS kinesis_pipeline_table_2; /*Aqui, vou usar o schema dos dados que chegam para a aplicação no formato json*/ CREATE TABLE kinesis_pipeline_table_2 ( user_id BIGINT, event_type VARCHAR(30), qtd_event BIGINT ) WITH ( 'connector' = 'kinesis', 'stream' = 'data_ecomerce_2', 'aws.region' = 'us-east-2', 'format' = 'json' );
- Vamos confirmar se a tabela foi criada no AWS Glue
- Agora, vou inserir no insert into o comando de group by para inserir os dados no segundo fluxo do kinesis
- Segundo a documentação precisamos definir janelas de tempo para que possamgos fazer agregamentos em gropuby e envia-los por stream para o kinesis, então vamos fazer uma alteração na tabela 1 e adicionar como datetime a coluna 'txn_timestamp'
- O flink sql não tem suporte para alteração de colunas, então temos que apagar as colunas diretamente no glue, atera-las e executar o script novamente
- A nova tabela para o primeiro fluxo de dados foi alterada. Temos 3 diferenças, a primeira é a mudança em do tipo da coluna 'txn_timestamp' e a criação de uma nova coluna WATERMARK como fala a documentação e a terceira mudança é a inserção do parâmetro 'json.timestamp-format.standard' = 'ISO-8601' para que possa ser lido normalmente nossos dados na forma como construimos na seção de aplicação python com AWS Cloud9
- Executo o aplicativo python de simulação e vemos o coportamento da nova tabela
- Parece que está ok. Agora vamos criar a aplicação para o segundo fluxo de dados em janelas de 1 minuto
- Para executar o comando em uma janela de tempo, utilizei a seguinte query
-
%flink.ssql SELECT user_id,event_type, count(event_type) as qtd FROM kinesis_pipeline_table_1 GROUP BY TUMBLE(txn_timestamp,INTERVAL '1' MINUTE),user_id,event_type;
- Após executar com a aplicação de simulação pronta, temos o seguinte resultado
- Ok, agora é so adicionar o INSERT e pronto
-
INSERT INTO kinesis_pipeline_table_2 SELECT user_id,event_type, count(event_type) as qtd FROM kinesis_pipeline_table_1 GROUP BY TUMBLE(txn_timestamp,INTERVAL '1' MINUTE),user_id,event_type;
- Agora, executamos a aplicação e vamos ver se os dados chegam no segundo fluxo de dados do kinesis
- Deixei um tempo a aplicação funcionando
- E os dados estão chegando para o segundo fluxo de dados
- Estou enviando poucos dados porque tenho medo de cobranças, mas na foto abaixo já da pra visualizar os dados
- Agora vamos por todas as células juntas em apenas uma célula e criar a build da aplicação
- Depois, gere uma aplicação flink
- Pronto, aplicação flink criada, Agora é só executar a aplicação e o pipeline vai funcionar corretamente, sem necessarimanete executar diretamente no zepelin
- Em algumas situações, podemos ser atacados por bots ou algo do tipo
- Para essas situações, vamos construir um sistema de alerta que vai enviar um Email para uma conta cadastrada quando isso acontecer
- Para isso, vá até o SNS e crie um tópico, os clientes vão consumir esse tópico
- Crie como padrão
- Deixe todos as config minimas e padrão e crie o tópico
- Depois de criado, insira um consumidor criando assinatura
- Coloque para enviar um email-json e coloque seu email
- Para confirmar a assinatura, você entra no seu email, copia o url e confirma a assinatura com o URL enviado
- Vamos agora criar o banco de dados que vai persistir os processamentos do segundo fluxo de dados do kinesis utilizando o AWS Lambda
- Para isso, vá para o serviço DynamoDB e crie uma nova tabela
- Agora dé um nome e uma chave primária
- No restante, mantenha a configuração padrão mínima
- Por motivos de custo, continuar utilizando essa arquitetura é muito caro para um projeto simples, então vou fazer uma leve modificação aqui. Em vez de processar os dados utilizando o flink (que me custou 4 dolares) vou fazer passar apenas o primeiro fluxo de dados
- Aqui é importante salientar uma coisa, os fluxos de dados estão funcionando por padrão, então, bastaria implementar no AWS Lambda o gatilho para o fluxo de dados processados, em vez do primeiro fluxo de dados. Nesse caso, só estou mostrando minha expertise, mas não posso me dar o luxo de gastar muito com isso
- Para criar o AWS Lambda, podemos começar de 2 formas. Uma é criando o recurso e implementando os códigos depois, e outra é criando os códigos, implementando a biblioteca e zipando tudo, depois só criar o recurso. Bom, vou optar por essa segunda opção
- O AWS Lambda com python funciona através de um arquivo .zip que deve conter além dos códigos, as bibliotecas e os recursos, pode-se encontrar mais informações na documentação. Vou seguir os passos da documentação
- A única biblioteca que pretendo usar é a boto3 do AWS SDK
- Vou criar uma pasta onde vou criar o arquivo python lambda_function.py
- Dentro da pasta, vou criar uma outra pasta com o nome de package, onde as bibliotecas vão ser instaladas
- Agora, as instruções abaixo são para a instalação do pacote no windows
-
cd desktop cd aws-lambda-code pip install --target .\package boto3 --no-user
- Crie um zip dos arquivos de pacote e delete a pasta packages e crie um zip dos arquivos junto com a função lambda
- Vá para o lambda e crie uma nova função do zero. Deve ficar igual a de baixo
- Para que tudo funcione da maneira correta, vá configurações -> permissões -> nome da função e clique na permissão que ta lá. Habilite a permissão de de administrador
- Agora, que você tem as permissões, adicione um gatilho do kinesis. No meu caso, o gatilho vai ser o fluxo de dados 1. Mas também pode acionar o gatilho de fluxo de dados 2
- Podemos testar para ver se está funcionando,para isso, escreva o seguinte código no lambda_function.py
-
from __future__ import print_function from datetime import datetime import base64 import json import boto3 import os def lambda_handler(event, context): for record in event['Records']: #Kinesis data is base64 encoded so decode here payload=base64.b64decode(record["kinesis"]["data"]) return 'Successfully processed {} records.'.format(str(payload))
- Faça o processo de zip e envie para o lambda. Após enviar, Vá na aba testar e execute o JSON Normal. Eu mudei a as informações do campo data do JSON para um encoded 64 diferente
- É importante ressaltar que as informações chegam do kinesis em formato de base64, então você tem que fazer o decode da informação.
- Podemos ver a string de retorno que criamos
- Agora, vamos escrever o código que vai realizar a inserção no DynamoDB. Abaixo explicaremos melhor:
-
from __future__ import print_function from boto3.dynamodb.types import TypeSerializer import base64 import json import boto3 import os dynamodb = boto3.client('dynamodb', region_name='us-east-2') def convert_to_dynamodb(value): serializer = TypeSerializer() return serializer.serialize(value) def lambda_handler(event, context): for record in event['Records']: payload=base64.b64decode(record["kinesis"]["data"]) json_document = json.loads(payload.decode('utf-8')) input_user_id = str(json_document['user_id']) json_document['id_usuario'] = 'userid{}'.format(input_user_id) json_document_dynamodb = {key: convert_to_dynamodb(value) for key, value in json_document.items()} dynamodb.put_item(TableName='{table_name}',Item=json_document_dynamodb) resposta_val = 'O Documento {} Foi Inserido com A Chave {}'.format(json_document,input_user_id) return resposta_val
- Primeiro, nós chamamos o cliente do dynamodb através do boto3 em 'dynamodb = boto3.client('dynamodb', region_name='us-east-2')', depois construimos a função de serelização para poder inserir os dados no dynamodb, por padrão, o JSON normal não poder ser diretamente inserido na tabela então vamos transformar com a função 'def convert_to_dynamodb(value):', depois, entramos de fato na função lambda. Utilizamos o código para visualização do kinesis da documentação. Fizemos a decodificação dos dados, que já foi explicado anteriormente, adicionamos um campo 'id_usuario' que foi configurado como "chave_primaria" da tabela do dynamodb, depois faço a conversão de todos os dados para o tipo suportado utilizando a função convert_to_dynamodb e depois realizo o put na tabela do dynamodb
- Em cima está o resultado do teste e agora vou mostrar a inserção no DynamoDB
- O projeto da maneira como está me gerou custos mesmo usando o periodo gratuito da AWS, por isso, nenhum desses sitemas vão ser ligados ao mesmo tempo, e vou me limitar a usar os estados de teste
- No caso do Lambda, não vou executa-lo, se não pode gerar ainda mais custos, a partir de agora todas as seções vão ser feitas de maneira independente
- Como apaguei os processos de stream porque estavam me gerando custos adicionais, vou utilizar a tabela glue estática que criamos anteriormente
- Acabei não construindo a função de enviar mensagens, mas isso pode ser implementado com o boto3 dentro da função lambda. Como isso deve ocorrer: O processamento que lmabda deve ler é o processamento do flink, que retorna para o segundo fluxo de dados apenas um JSON normal com 'id_usuario','func','qtd'. A grande sacada é que o flink envia essas informações em lotes, então vamos ler os dados em lote, lendo os dados em lote podemos verificar a quantidade desses dados, por exemplo, se o lote do flink tiver sido programado para 1 minuto e apenas um usuário está com mais de 1000 em qualquer 'qtd' de 'func' então enviariamos um email de alerta de ataque através do boto3 SDK e uma conexão com SNS
- Novamente, o projeto começou a ficar muito extenso, então vou parar po aqui. Tambem já me gerou alguns custos adicionais, realizar trabalhos no glue studio vai me gerar ainda mais custos adicionais, acredito que o que já construi até aqui é o suficiente