Skip to content

Neste projeto, usaremos um conjunto de dados de comércio eletrônico para simular os registros de compras do usuário, visualizações de produtos, histórico de carrinho e jornada do usuário na plataforma online para criar dois pipelines analíticos, Lote e Tempo Real.

Notifications You must be signed in to change notification settings

Antonio-Borges-Rufino/Build-an-Analytical-Platform-for-eCommerce-using-AWS-Services

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

40 Commits
 
 

Repository files navigation

Descrição do Projeto

  • Referencia: O projeto aqui implementado é do site projectpro.io onde estou estudando projetos de engenharia de dados profissionais. Créditos para a equipe.
  • Neste projeto, usaremos um conjunto de dados de comércio eletrônico para simular os registros de compras do usuário, visualizações de produtos, histórico de carrinho e jornada do usuário na plataforma online para criar dois pipelines analíticos, Lote e Tempo Real. O processamento em lote envolverá ingestão de dados, arquitetura Lake House, processamento e visualização usando Amazon Kinesis, Glue, S3 e QuickSight para obter insights sobre o seguinte:
    • Visitantes únicos por dia
    • Durante um determinado tempo, os usuários adicionam produtos aos carrinhos, mas não os compram
    • Principais categorias por hora ou dia da semana (ou seja, para promover descontos com base em tendências)
    • Para saber quais marcas precisam de mais marketing

Diagrama de Arquitetura

  • image

Informação sobre os dados

  • Os dados usados nesse projeto são o eCommerce behavior data from multi category store do kaggle
  • Os dados possuem 9 colunas referente ao acesso em um site. Basicamente, o que os dados fazem, é disponibilizar o que uma pessoa faz em um acesso em uma loja de compras, por exemplo: "As 9 horas o ID_CONTA X acessou o produto XY que é um smarthone e colocou o produto no carrinho na seção Z"
  • As colunas são detalhadas a seguir
    • event_time: Hora em que o evento aconteceu
    • event_type: Apenas um tipo dos eventos (view,cart,remove_from_cart,purchase)
    • product_id: Id do produto
    • category_id: Id da categoria do produto
    • category_code: Nome da categoria
    • brand: Sequencia do nome da marca em letra minuscula
    • price: Preço do produto
    • user_id: Id do usuário (estático)
    • user_session: Id da sessão (Pode variar para o mesmo usuario, cada sessão é um id único)
  • Usaremos esses dados para simular como se fosse uma pessoa usando o site e então vamos fazer o pipeline acima descrito

Instalar comando por CLI da AWS

  • Vou seguir o projeto e vou usar o CLI da AWS
  • Em vez de instalar na minha máquina, vou usar o AWS CloudShell, que é mais prático
  • image

Criar o bucket S3

  • Crie o bucket usando CLI como seguinte comando (substitua {bucket-name} pelo nome do bucket):
  • aws s3 mb s3://{bucket-name}
    
  • Podemos ver se o bucket foi criado acessando o recurso S3 e indo em bucket. No meu caso, consegui criar um bucket com acesso privado, como mostra a imagem abaixo (apaguei parte das informações)
  • image
  • Agora vou adicionar uma tag para o bucket
  • aws s3api put-bucket-tagging --bucket {bucket-name} --tagging 'TagSet=[{Key={key-tag},Value={value-tag}}]'
    
  • Essa tag rastreia as modificações feitas pelo usuário que está consumindo o bucket
  • Agora podemos enviar nossos dados para o bucket. Se eu tivesse optado pela escolha de usar o CLI do windowns, poderia fazer isso diretamente do meu pc, mas com o CloudShell é necessário ou enviar o arquivo para o CloudShell (máximo 1gb) ou enviar direto para o bucket no painel (Máximo de 5gb, que é o nivel gratuito)
  • Portanto, vou enviar direto pelo painel mesmo, inclusive, vou diminuir a quantidade de dados que tenho atualmente para poder caber no nível de teste gratuito do bucket S3, como mostrado na imagem abaixo
  • image
  • Agora é só enviar os dados (no meu caso, utilizei só as as primeiras 7 milhões de linha do arquivo) para o bucket

Configurando e Implementando o AWS Kinesis Data Stream

  • Vá no painel de serviços do AWS e pesquise por Kinesis, e então crie um serviço do tipo Kinesis Data Stream
  • image
  • Existe pouca opção de criação, então optei por uso mínimo
  • Após criar, o serviço fica disponivel como na imagem abaixo
  • image

Criando Aplicação Python Que Simula Um Site

  • Para criar essa aplicação, usaremos a SDK boto3 da AWS. Esse framework é preparado para rodar dentro do próprio serviço da amazon, podendo ser no CLI, no SHELL ou no Cloud9.
  • Para o nosso projeto, vamos usar o Cloud9. Para isso, vá até o serviço de aplicações da AWS e procure por Cloud9. Crie um novo serviço e use um sistema de computação EC2 que esteja habilitado para o nível grátis. Use a minima quantidade de recursos possiveis.
  • image
  • A imagem acima mostra o ambiente já criado, e onde clicar caso queira criar outro. Depois de criado, basta clicar em "em aberto" que é o link da IDE
  • Com a IDE aberta, você instala a biblioteca na IDE através do bash no canto inferior usando o comando pip do python e depois testa com uso no bucket S3. A imagem abaixo mostra o processo
  • image
  • No meu caso deu certo, ele executou o e printou os bucketes disponíveis
  • Agora vamos acessar dados de dentro desses buckets. Isso é interessante porque vamos recuperar os dados do nosso bucket e vamos enviar em streaming para o Kinesis
  • O código abaixo está mostrando a execução dos comandos de acesso ao bucket, com a adição do comando split('/n') podemos facilmente gerar uma matriz de strings e depois transformalos em um json que vamos enviar para o kinesis
  • image
  • Abaixo está o código modificado para poder transformar cada registro em um vetor. A função decode('utf-8') serve para transformar o tipo byte em string e a função split('\r\n') serve para poder quebrar de maneira correta cada registro
  • image
  • Agora vamos criar a função de inserção para o kimnesis englobando os recuperados do s3
  • Primeiro vamos importar as bibliotecas necessárias, entre elas a biblioteca csv e json. A biblioteca csv vai ler os dados em forma de dicionário através do método DictReader() e a biblioteca json vai devolver os dados em json para o kinesis
  • Abaixo está o código e o response, esse código é do projectpro, mas na documentação obtemos quase os mesmos códigos, entretanto, aqui eles fazem tratamento dos dados csv
  • image
  • Primeiro usamos a biblioteca boto3, nela, podemos acessar os clientes s3 e kinesis. A partir disso, recuperamos os dados do S3 a partir do comando "response_nov = client_s3.get_object(Bucket={},Key={})". Bucket é o nome do bucket em que está os dados e Key é o nome do arquivo. O método get_object recupera qualquer tipo de arquivo do bucket.
  • O código "response_nov = response_nov['Body'].read().decode("utf-8").split('\r\n')" realiza a leitura do arquivo, a resposta da requisção chega em formato JSON, e no campo Body vem os dados em forma de byte, para convertelos para string usamos decode('utf-8'). o método read() é usado para ler os dados em binário e split('\r\n') é usado para quebrar a string unica em vários vetores contendo as informações de uma só linha
  • Depois criamos o for iterativo onde vamos percorrer as linhas e enviar as informações. Como o arquivo possuia cabeçalho usei o método da biblioteca csv no "for x in csv.DictReader(response):"
  • Dentro do looping fazemos a transformação dos dados, aqui é importante entender o que a linha "json_load['txn_timestamp'] = datetime.now().isoformat()" está fazendo, ela salva o horário de envio da informação e a linha "response = client_kinesis.put_record(StreamName='{},Data=json.dumps(json_load, indent=4),PartitionKey=str(json_load['category_id']))" é quem de fato faz a postagem dos dados para o kinesis
  • É importante resssaltar o uso da "PartitionKey" de 'category_id'. Isso é importante porque conseguimos melhorar a divisão dos nossos dados
  • Podemos ver os dados chegando através do painel de obtenção de dados, como abaixo:
  • image

Próximo Passo

  • image
  • Os próximos passos estão relacionados a construção do processamento dos dados em streaming
  • Para isso, vamos criar um banco de dados no AWS Glue utilizando o arquivo que usamos para simular os dados em stream
  • Depois, vamos construir mais um canal de stream no kinesis para poder empurrar os dados do flink para ele
  • Por fim, vamos construir um caderno flink que vai ler os dados do stream 1 e empurrar os dados para o stream 2
  • O AWS Glue vai funcionar como registro de schema para o flink, basicamente, é ele quem vai nortear o SQL do Flink a partir do schema que ele puxar dos nossos dados em CSV guardados no S3

Criação do AWS Glue

  • Para criar um novo banco de dados no AWS Glue, entre no serviço, vá em banco de dados e aperte em criar um novo
  • image
  • No nivel gratuito temos bastante espaço para poder utilizar o serviço
  • No painel de criação, coloque o nome do seu banco de dados e a URI da sua pasta S3 onde você quer salvar o banco
  • image
  • Agora vamos adicionar a tabela usando o crawler, isso significa que o glue vai analisar o arquivo e vai definir um schema pra ele
  • image
  • As configurações são pessoais, a única que deve ser realmente vista é a onfiguração que aponta para onde estão os dados. O que montamos nessa config é o crawler, então a missão dele é pesquisar dentro do bucket S3 em busca dos dados. Portanto, coloque apenas o caminho do bucket, não do dado em si
  • Depois de construido, vá em tables e acesse a sua tabela AWS Glue
  • image
  • Dentro da tabela, acesse os dados através do Athena
  • image
  • Dentro do Athen, aperte em configurações e gerenciar, para indicar um local onde as consultas vão ficar salval. Vou salvar na mesma pasta do bucket S3 que estou usando em todo o projeto
  • image
  • Agora podemos executar a query padrão e ver os resultados
  • image
  • Pronto, nosso banco de dados AWS Glue foi criado

Criação do Canal De Saída de Stream 2

  • Apenas siga os passos da seção Configurando e Implementando o AWS Kinesis Data Stream

Criação e Integração de Aplicação com Apache Flink

  • Criar Stream Aplication no Kinesis -> Ler os dados Stream com a Estrutura da tabela do Glue e passar pro outro stream-> Salvar como Aplicação -> Fazer o Deploy da Aplicação -> Executar a aplicação
  • Primeiro, vamos no fluxo de dados do kinesis e clique e "Apache Flink Gerenciado"
  • image
  • Depois, selecione studio notebook
  • image
  • Selecione criação personalizada, coloque o nome e aperte em avançar
  • image
  • Na seção de permissões, deixe a permissão padrão e em baixo coloque o banco de dados AWS Glue que criamos anteriormente
  • image
  • Em fontes incluidas, na fonte destino, coloque o primeiro fluxo kinesis que criei e na fonte de destino, coloque o segundo fluxo kinesis
  • image
  • Na próxima página, deixe tudo como padrão e coloque o bucket s3 que criamos anteriormente como destino de configurações
  • Agora podemos acessar nosso caderno zepelin e acompanhar o stream de dados em tempo real por lá, aperte em executar e abra o zepelin
  • image
  • Crie um novo notebook, ele deve ter essa cara
  • image
  • É nele onde vamos codar a nossa aplicação
  • Para acessar os dados usando o flink, precisamos construir uma tabela que será salva no AWS Glue, podemos construir diversos tipos de comandos SQL e usar em stream nessas tabelas, que possuem taxa de atualização em real time. Primeiro, vamos construir a tabela que vai ler os dados.
  • %flink.ssql
    /*Primeiro, drop um schema caso ele exista*/
    DROP TABLE IF EXISTS kinesis_pipeline_table_1;
    /*Aqui, vou usar o schema dos dados que chegam para a aplicação no formato json*/
    CREATE TABLE kinesis_pipeline_table_1 (
      event_time VARCHAR(30), 
      event_type VARCHAR(30), 
      product_id BIGINT, 
      category_id BIGINT, 
      category_code VARCHAR(30), 
      brand VARCHAR(30), 
      price DOUBLE, 
      user_id BIGINT, 
      user_session VARCHAR(30), 
      txn_timestamp VARCHAR(30)
      )
      PARTITIONED BY (category_id)
      WITH (
          'connector' = 'kinesis',
          'stream' = '{kinesis_stream}',
          'aws.region' = 'us-east-2',
          'format' = 'json'
          );
    
  • O comando '%flink.ssql' é fundamental para que o zepelin identifique que é um comando fliker, se não colocar os comandos SQL não funciona
  • Criamos a tabela e inserimos o SCHEMA como um SGBD normal, inclusive, utilizando os mesmos comandos
  • O comando PARTITIONED BY (category_id) é uma boa prática do flink
  • O comando WITH indica os conectores, nesse caso, conector kinesis, em stream coloquei o meu fluxo de dados de entrada, o formato de leitura e a região da minha conta da AWS
  • Agora vou criar mais uma célula e colocar um comando simples para poder ler o banco de dados
  • %flink.ssql
    SELECT * FROM kinesis_pipeline_table_1;
    
  • Vamos ver lá no AWS Glue se o banco foi criado
  • image
  • Pronto, o banco foi criado da maneira correta no Glue
  • Agora vamos ver se o flink está lendo corretamente o stream do kinesis. Execute a aplicação do Cloud9 para simular acessos no site e depois execute o comando de select
  • image
  • No meu caso, funcionou, agora vamos para a segunda parte, que é construir o input para o segundo fluxo de dados do kinesis
  • Os dados que eu vou repassar para o segundo fluxo de dados é uma contagem de ações por usuário. Para isso, vou testar a seguinte query
  • SELECT user_id,event_type, count(event_type) FROM kinesis_pipeline_table_1 GROUP BY user_id,event_type;
    
  • image
  • Ok, está funcionando, agora, vou adicionar uma tabela nova antes da query, essa tabela vai corresponder ao segundo fluxo de dados
  • %flink.ssql
    DROP TABLE IF EXISTS kinesis_pipeline_table_2;
    /*Aqui, vou usar o schema dos dados que chegam para a aplicação no formato json*/
    CREATE TABLE kinesis_pipeline_table_2 (
      user_id BIGINT, 
      event_type VARCHAR(30), 
      qtd_event BIGINT
      )
      WITH (
          'connector' = 'kinesis',
          'stream' = 'data_ecomerce_2',
          'aws.region' = 'us-east-2',
          'format' = 'json'
          );
    
  • Vamos confirmar se a tabela foi criada no AWS Glue
  • image
  • Agora, vou inserir no insert into o comando de group by para inserir os dados no segundo fluxo do kinesis
  • Segundo a documentação precisamos definir janelas de tempo para que possamgos fazer agregamentos em gropuby e envia-los por stream para o kinesis, então vamos fazer uma alteração na tabela 1 e adicionar como datetime a coluna 'txn_timestamp'
  • O flink sql não tem suporte para alteração de colunas, então temos que apagar as colunas diretamente no glue, atera-las e executar o script novamente
  • A nova tabela para o primeiro fluxo de dados foi alterada. Temos 3 diferenças, a primeira é a mudança em do tipo da coluna 'txn_timestamp' e a criação de uma nova coluna WATERMARK como fala a documentação e a terceira mudança é a inserção do parâmetro 'json.timestamp-format.standard' = 'ISO-8601' para que possa ser lido normalmente nossos dados na forma como construimos na seção de aplicação python com AWS Cloud9
  • image
  • Executo o aplicativo python de simulação e vemos o coportamento da nova tabela
  • image
  • Parece que está ok. Agora vamos criar a aplicação para o segundo fluxo de dados em janelas de 1 minuto
  • Para executar o comando em uma janela de tempo, utilizei a seguinte query
  • %flink.ssql
    SELECT user_id,event_type, count(event_type) as qtd FROM kinesis_pipeline_table_1 GROUP BY TUMBLE(txn_timestamp,INTERVAL '1'  
    MINUTE),user_id,event_type;
    
  • Após executar com a aplicação de simulação pronta, temos o seguinte resultado
  • image
  • Ok, agora é so adicionar o INSERT e pronto
  • INSERT INTO kinesis_pipeline_table_2 
    SELECT user_id,event_type, count(event_type) as qtd FROM kinesis_pipeline_table_1 GROUP BY TUMBLE(txn_timestamp,INTERVAL '1'   
    MINUTE),user_id,event_type;
    
  • Agora, executamos a aplicação e vamos ver se os dados chegam no segundo fluxo de dados do kinesis
  • Deixei um tempo a aplicação funcionando
  • image
  • E os dados estão chegando para o segundo fluxo de dados
  • Estou enviando poucos dados porque tenho medo de cobranças, mas na foto abaixo já da pra visualizar os dados
  • image
  • Agora vamos por todas as células juntas em apenas uma célula e criar a build da aplicação
  • image
  • Depois, gere uma aplicação flink
  • image
  • Pronto, aplicação flink criada, Agora é só executar a aplicação e o pipeline vai funcionar corretamente, sem necessarimanete executar diretamente no zepelin
  • image

Construindo Sistema de Alerta com AWS SNS

  • Em algumas situações, podemos ser atacados por bots ou algo do tipo
  • Para essas situações, vamos construir um sistema de alerta que vai enviar um Email para uma conta cadastrada quando isso acontecer
  • Para isso, vá até o SNS e crie um tópico, os clientes vão consumir esse tópico
  • image
  • Crie como padrão
  • image
  • Deixe todos as config minimas e padrão e crie o tópico
  • Depois de criado, insira um consumidor criando assinatura
  • image
  • Coloque para enviar um email-json e coloque seu email
  • image
  • Para confirmar a assinatura, você entra no seu email, copia o url e confirma a assinatura com o URL enviado

Implementando Banco de Dados DynamoDB

  • Vamos agora criar o banco de dados que vai persistir os processamentos do segundo fluxo de dados do kinesis utilizando o AWS Lambda
  • Para isso, vá para o serviço DynamoDB e crie uma nova tabela
  • Agora dé um nome e uma chave primária
  • image
  • No restante, mantenha a configuração padrão mínima

Implementando AWS Lambda

  • Por motivos de custo, continuar utilizando essa arquitetura é muito caro para um projeto simples, então vou fazer uma leve modificação aqui. Em vez de processar os dados utilizando o flink (que me custou 4 dolares) vou fazer passar apenas o primeiro fluxo de dados
  • Aqui é importante salientar uma coisa, os fluxos de dados estão funcionando por padrão, então, bastaria implementar no AWS Lambda o gatilho para o fluxo de dados processados, em vez do primeiro fluxo de dados. Nesse caso, só estou mostrando minha expertise, mas não posso me dar o luxo de gastar muito com isso
  • Para criar o AWS Lambda, podemos começar de 2 formas. Uma é criando o recurso e implementando os códigos depois, e outra é criando os códigos, implementando a biblioteca e zipando tudo, depois só criar o recurso. Bom, vou optar por essa segunda opção
  • O AWS Lambda com python funciona através de um arquivo .zip que deve conter além dos códigos, as bibliotecas e os recursos, pode-se encontrar mais informações na documentação. Vou seguir os passos da documentação
  • A única biblioteca que pretendo usar é a boto3 do AWS SDK
  • Vou criar uma pasta onde vou criar o arquivo python lambda_function.py
  • Dentro da pasta, vou criar uma outra pasta com o nome de package, onde as bibliotecas vão ser instaladas
  • Agora, as instruções abaixo são para a instalação do pacote no windows
  • cd desktop
    cd aws-lambda-code
    pip install --target .\package boto3 --no-user
    
  • Crie um zip dos arquivos de pacote e delete a pasta packages e crie um zip dos arquivos junto com a função lambda
  • Vá para o lambda e crie uma nova função do zero. Deve ficar igual a de baixo
  • image
  • Para que tudo funcione da maneira correta, vá configurações -> permissões -> nome da função e clique na permissão que ta lá. Habilite a permissão de de administrador
  • Agora, que você tem as permissões, adicione um gatilho do kinesis. No meu caso, o gatilho vai ser o fluxo de dados 1. Mas também pode acionar o gatilho de fluxo de dados 2
  • image
  • Podemos testar para ver se está funcionando,para isso, escreva o seguinte código no lambda_function.py
  • from __future__ import print_function
    from datetime import datetime
    import base64
    import json
    import boto3
    import os
    def lambda_handler(event, context):
      for record in event['Records']:
         #Kinesis data is base64 encoded so decode here
         payload=base64.b64decode(record["kinesis"]["data"])
         return 'Successfully processed {} records.'.format(str(payload))
    
  • Faça o processo de zip e envie para o lambda. Após enviar, Vá na aba testar e execute o JSON Normal. Eu mudei a as informações do campo data do JSON para um encoded 64 diferente
  • É importante ressaltar que as informações chegam do kinesis em formato de base64, então você tem que fazer o decode da informação.
  • image
  • Podemos ver a string de retorno que criamos
  • Agora, vamos escrever o código que vai realizar a inserção no DynamoDB. Abaixo explicaremos melhor:
  • from __future__ import print_function
    from boto3.dynamodb.types import TypeSerializer
    import base64
    import json
    import boto3
    import os
    dynamodb = boto3.client('dynamodb', region_name='us-east-2')
    def convert_to_dynamodb(value):
      serializer = TypeSerializer()
      return serializer.serialize(value)
    def lambda_handler(event, context):
      for record in event['Records']:
         payload=base64.b64decode(record["kinesis"]["data"])
         json_document = json.loads(payload.decode('utf-8'))
         input_user_id = str(json_document['user_id'])
         json_document['id_usuario'] = 'userid{}'.format(input_user_id)
         json_document_dynamodb = {key: convert_to_dynamodb(value) for key, value in json_document.items()}
         dynamodb.put_item(TableName='{table_name}',Item=json_document_dynamodb)
         resposta_val = 'O Documento {} Foi Inserido com A Chave {}'.format(json_document,input_user_id)
         return resposta_val
    
    
  • Primeiro, nós chamamos o cliente do dynamodb através do boto3 em 'dynamodb = boto3.client('dynamodb', region_name='us-east-2')', depois construimos a função de serelização para poder inserir os dados no dynamodb, por padrão, o JSON normal não poder ser diretamente inserido na tabela então vamos transformar com a função 'def convert_to_dynamodb(value):', depois, entramos de fato na função lambda. Utilizamos o código para visualização do kinesis da documentação. Fizemos a decodificação dos dados, que já foi explicado anteriormente, adicionamos um campo 'id_usuario' que foi configurado como "chave_primaria" da tabela do dynamodb, depois faço a conversão de todos os dados para o tipo suportado utilizando a função convert_to_dynamodb e depois realizo o put na tabela do dynamodb
  • image
  • Em cima está o resultado do teste e agora vou mostrar a inserção no DynamoDB
  • image

OBS SOBRE OS CUSTOS E SOBRE O PROJETO

  • O projeto da maneira como está me gerou custos mesmo usando o periodo gratuito da AWS, por isso, nenhum desses sitemas vão ser ligados ao mesmo tempo, e vou me limitar a usar os estados de teste
  • No caso do Lambda, não vou executa-lo, se não pode gerar ainda mais custos, a partir de agora todas as seções vão ser feitas de maneira independente

Próximos passos e passos faltantes

  • Como apaguei os processos de stream porque estavam me gerando custos adicionais, vou utilizar a tabela glue estática que criamos anteriormente
  • Acabei não construindo a função de enviar mensagens, mas isso pode ser implementado com o boto3 dentro da função lambda. Como isso deve ocorrer: O processamento que lmabda deve ler é o processamento do flink, que retorna para o segundo fluxo de dados apenas um JSON normal com 'id_usuario','func','qtd'. A grande sacada é que o flink envia essas informações em lotes, então vamos ler os dados em lote, lendo os dados em lote podemos verificar a quantidade desses dados, por exemplo, se o lote do flink tiver sido programado para 1 minuto e apenas um usuário está com mais de 1000 em qualquer 'qtd' de 'func' então enviariamos um email de alerta de ataque através do boto3 SDK e uma conexão com SNS
  • Novamente, o projeto começou a ficar muito extenso, então vou parar po aqui. Tambem já me gerou alguns custos adicionais, realizar trabalhos no glue studio vai me gerar ainda mais custos adicionais, acredito que o que já construi até aqui é o suficiente

About

Neste projeto, usaremos um conjunto de dados de comércio eletrônico para simular os registros de compras do usuário, visualizações de produtos, histórico de carrinho e jornada do usuário na plataforma online para criar dois pipelines analíticos, Lote e Tempo Real.

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published