Este projeto envolve a criação de um contêiner no Docker para restaurar o backup do banco de dados original, que é um SQL Server. Em seguida, os dados são transferidos para um banco de dados PostgreSQL na AWS, transformados e posteriormente visualizados no Power BI. Ao longo deste processo, são utilizadas várias tecnologias e serviços da AWS, como RDS, DMS, Glue e Athena, que garantem uma manipulação de dados eficiente e eficaz ao longo de todo processo.
Utilização do Terraform como ferramenta de infraestrutura como código (IaC). O uso do Terraform como uma ferramenta de infraestrutura como código (IaC) oferece diversos benefícios para o projeto. Com o Terraform, é possível definir a infraestrutura como código. Isso permite um controle preciso e rastreamento dos recursos provisionados, garantindo consistência e facilitando a revisão e auditoria.
- Overview da solução
- Desafios
- Potenciais Impactos
- Backup do SQL Server
- Instalação Docker
- Criação de um container sql server.
- Importação do backup para o container
- Criação de uma instância do RDS Postgres
- Migração dos dados do banco Sql Server no Docker para RDS Postgres
- Migração dos dados do RDS para S3 com DMS
- Transformação dos dados com AWS Glue
- Armazenamento dos dados processados no S3
- Consulta dos dados com o AWS Athena
- Visualização dos dados com Power BI
- Garantia da integridade e consistência dos dados
- Otimização do desempenho dos recursos
- Segurança dos dados durante todo o fluxo
- Qualidade dos dados
- Governança de dados
- Identificação dos repórteres mais ativos
- Analise dos locais das matérias e as cidades com maior cobertura
- Distribuição de tipos de matérias
- Identificação de cinegrafistas e produtores mais ativos
- Análise temporal: identificação de padrões sazonais, quantidade e tipos de matérias por período
- Identificação da subutilização do sistema
-
Criação de um container docker para instalar o sql server e restaurar o backup do banco de dados. Instalação do docker em ambiente windows
-
No PowerShell
Pull da imagem de contêiner do SQL Server mais recente.
docker pull mcr.microsoft.com/mssql/server:2022-latest
- Criação do contêiner docker SQL Server usando a imagem
docker run -e 'ACCEPT_EULA=Y' -e 'SA_PASSWORD=sua_senha_aqui' \
-p 1433:1433 --name meu-sql-server \
-v home/pasta_exemplo/pasta_exemplo:/var/opt/mssql
-d mcr.microsoft.com/mssql/server:2022-latest
- Movendo o arquivo de backup para o contêiner
docker cp /path/to/database.bak sql_server_conteiner_id:/var/opt/mssql/data/database.bak
- Interagindo com o conteiner em execução.
docker exec -it conteiner_id /bin/bash
- Restaurando o banco de dados utilizando a ferramenta de linha de comando 'sqlcmd'.
/opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P "sua_senha"
RESTORE DATABASE database FROM DISK = 'database.bak'
GO
Nesta etapa, criamos uma instância do RDS Postgres. A principal justificativa para a escolha do RDS Postgres baseia-se na sua excelente relação custo-benefício, aliada à ampla gama de recursos disponíveis.
Utilização do terraform para provisionamento de recursos e gerenciamento da infraestrutura.
Migração de dados de um banco SQL Server rodando no contêiner Docker para um banco PostgreSQL na AWS RDS.
Python
Bibliotecas pyodbc, pandas e sqlalchemy.
Código Migração SQL Server para o RDS ⬇️
migration-sql-server-rds-postgres.py
import pyodbc
import pandas as pd
import pymysql
from sqlalchemy import create_engine
#Estabelecendo conexão com o banco de dados no docker
server = 'server'
database = 'db'
username = 'username'
password = '*****'
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
#Extraindo somente as tabelas que serão utilizadas
query_materia = "SELECT * FROM materia;"
df_materia = pd.read_sql(query_materia, cnxn)
query_usuario = "SELECT * FROM usuario;"
df_usuario = pd.read_sql(query_usuario, cnxn)
query_muni = "SELECT * FROM municipio;"
df_muni = pd.read_sql(query_muni, cnxn)
query_tm = "SELECT * FROM tipomateria;"
df_tm = pd.read_sql(query_tm, cnxn)
query_edi = "SELECT * FROM editoria;"
df_edi = pd.read_sql(query_edi, cnxn)
#Fazendo a ingestão no banco postgre do RDS da AWS
engine = create_engine('postgresql://postgres:*****@db.url:porta/banco')
#Dataframes que serão inseridos
dataframes = [df_materia, df_usuario, df_muni, df_tm, df_edi]
#Nomeando as tabelas
nomes_tabelas = ['dbo_materia', 'dbo_usuario','dbo_municipio','dbo_tipomateria','dbo_editoria']
#Inteirando nos dataframes e inserindo os dados no rds
for df, tabela in zip(dataframes, nomes_tabelas):
df.to_sql(tabela, engine, if_exists='replace')
A migração dos dados para o Amazon S3 apresenta várias vantagens, o s3 é altamente escalável, é projeto para fornecer 99,99% de disponibilidade, não há custos mínimos nem compromissos iniciais e tem uma ótima integração com outros serviços da AWS.
Cada etapa será realizada via códigos Terraform
Para realizar a migração dos dados da instância RDS para o S3 usando o AWS DMS (Data Migration Service), seguiremos algumas etapas.
- Configuração do IAM: Criação de uma função de IAM que o AWS DMS utilize para acessar o bucket do S3
- Criação da instância de replicação do DMS: Será responsável por gerenciar a migração do dados
- Criação do endpoint de origem no DMS: Será a conexão com a instância do RDS
- Criação do endpoint de destino no DMS: Será a conexão com o bucket S3
- Criação de uma tarefa de migração no DMS: A tarefa usará a instância de replicação, o endpoint de origem e o endpoint de destino para migrar os dados
Após a criação e iniciação da tarefa de migração, o AWS DMS começará a migrar os dados do seu banco de dados RDS para o bucket S3
⚙️ Preparação e o carregamento dos dados para análise.
O AWS Glue também apresenta várias vantagens na sua ultilização, é um serviço serverless, oferece recursos para descoberta automática de dados, interface gráfica interativa, é facilmente escalável e se integra bem com outros serviços da AWS.
Etapas
- job ETL: execução do script python
- Extração: obtenção do dados brutos
- Transformação, limpeza dos dados, remoção de duplicatas, preenchimento de valores ausentes, modelagem, normalização, agregação de dados.
- Limpeza e validação, verificação de consistência, integridade e validade
- Carregamento dos dados no destino final
- Crawler
- Definição de um crawler para rastrear os dados, criar um banco dentro do catalogo de dados.
- Crawler name : crawler-tvnews
- Crawler soucer type: Data stores
- Repeat crawls od s3 data stores: crawl all folders
- Escolha um datastore: S3
- conexão: sem conexão
- Rastrear dados no > Caminho especificado na minha conta: s3a://tvnews-curated-prod/tvnews-full
- Função do IAM: Crie uma função do IAM: tvnews-role-crawler
- Frequência: Executar sob demanda
- Banco de dados: database
- Prefixo adicionados a tabelas: crawler_tvnews_
- Opções de configuração: Atualizar a definição da tabela no catálogo de dados
- Como o AWS Glue deve lidar com a exclusão de objetos no datastore?: Marcar a tabela como suspensa no catálogo de dados
- Execução do crawler criado
- Dados disponiveis no Athena
Código Glue job ⬇️
glue-job-tvnews.py
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
-- setup da aplicação Spark
spark = SparkSession \
.builder \
.appName("job-glue-spark-tvnews") \
.getOrCreate()
-- definindo o método de logging da aplicação use INFO somente para DEV [INFO,ERROR]
spark.sparkContext.setLogLevel("ERROR")
df_mat = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.csv("s3a://tvnews-landing-prod/materia.csv")
df_usu = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.csv("s3a://tvnews-landing-prod/usuarios.csv")
df_edi = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.csv("s3a://tvnews-landing-prod/editoria.csv")
df_muni = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.csv("s3a://tvnews-landing-prod/municipio.csv")
df_tpmat = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.csv("s3a://tvnews-landing-prod/tipomateria.csv")
-- converte para formato parquet
print ("\nEscrevendo os dados lidos da raw para parquet na processing zone...")
df_mat.write.format("parquet")\
.mode("overwrite")\
.save("s3a://tvnews-processed-prod/dbo_materia.parquet")
df_usu.write.format("parquet")\
.mode("overwrite")\
.save("s3a://tvnews-processed-prod/dbo_usuario.parquet")
df_edi.write.format("parquet")\
.mode("overwrite")\
.save("s3a://tvnews-processed-prod/dbo_editoria.parquet")
df_muni.write.format("parquet")\
.mode("overwrite")\
.save("s3a://tvnews-processed-prod/dbo_municipio.parquet")
df_tpmat.write.format("parquet")\
.mode("overwrite")\
.save("s3a://tvnews-processed-prod/dob_tipomateria.parquet")
-- lendo arquivos parquet
df_mat_parquet = spark.read.format("parquet")\
.load("s3a://tvnews-processed-prod/dbo_materia.parquet/*.parquet")
df_usu_parquet = spark.read.format("parquet")\
.load("s3a://tvnews-processed-prod/dbo_usuario.parquet/*.parquet")
df_edi_parquet = spark.read.format("parquet")\
.load("s3a://tvnews-processed-prod/dbo_editoria.parquet/*.parquet")
df_muni_parquet = spark.read.format("parquet")\
.load("s3a://tvnews-processed-prod/dbo_municipio.parquet/*.parquet")
df_tpmat_parquet = spark.read.format("parquet")\
.load("s3a://tvnews-processed-prod/dob_tipomateria.parquet/*.parquet")
-- Selecionando apenas as colunas necessarias dos dataframes
df_usu_parquet = df_usu_parquet.select('CODUSUARIO', 'US_NOME')
df_muni_parquet = df_muni_parquet.select('CODMUNICIPIO', 'MU_NOME')
df_edi_parquet = df_edi_parquet.select('CODEDITORIA', 'ED_DESCRICAO')
df_tpmat_parquet = df_tpmat_parquet.select('CODTIPOMATERIA', 'TM_DESCRICAO')
-- Substituindo o codigo dos reportes, produtores, cinegrafistas pelo nome
df_joined = df_mat_parquet.join(df_usu_parquet, df_mat_parquet.CODREPORTER == df_usu_parquet.CODUSUARIO, 'left').withColumnRenamed('US_NOME', 'REPORTER').withColumnRenamed('CODUSUARIO', 'CODREPORTER_')
df_joined = df_joined.join(df_usu_parquet, df_mat_parquet.CODPRODUTOR == df_usu_parquet.CODUSUARIO, 'left').withColumnRenamed('US_NOME', 'PRODUTOR').withColumnRenamed('CODUSUARIO', 'CODPRODUTOR_')
df_joined = df_joined.join(df_usu_parquet, df_mat_parquet.CODCINEGRAFISTA == df_usu_parquet.CODUSUARIO, 'left').withColumnRenamed('US_NOME', 'CINEGRAFISTA').withColumnRenamed('CODUSUARIO', 'CODPRODUTOR_')
-- Substituindo o codigo dos municipios, editoria, tipo da materia por seus respectivos nomes
df_joined = df_joined.join(df_muni_parquet, df_mat_parquet.CODMUNICIPIO == df_muni_parquet.CODMUNICIPIO, 'left').withColumnRenamed('MU_NOME', 'MUNICIPIO').withColumnRenamed('CODMUNICIPIO', 'CODREPORTER_')\
.join(df_edi_parquet, df_mat_parquet.CODEDITORIA == df_edi_parquet.CODEDITORIA, 'left').withColumnRenamed('ED_DESCRICAO', 'EDITORIA').withColumnRenamed('CODEDITORIA', 'CODEDITORIA_')\
.join(df_tpmat_parquet, df_mat_parquet.CODTIPOMATERIA == df_tpmat_parquet.CODTIPOMATERIA, 'left').withColumnRenamed('TM_DESCRICAO', 'TIPO_MATERIA').withColumnRenamed('CODTIPOMATERIA', 'CODTIPOMATERIA_')
df_soft = df_joined.select('CODMATERIA', 'PRODUTOR','REPORTER','CINEGRAFISTA','MUNICIPIO','TIPO_MATERIA','EDITORIA','MA_DATA','MA_LOCAL','MA_RETRANCA')
df_soft = df_soft.withColumn("MA_LOCAL", when(col("MA_LOCAL") == 'POLÍCIA', 'DELEGACIA').otherwise(col("MA_LOCAL")))\
.withColumn("MA_LOCAL", when(col("MA_LOCAL") == 'POLICIA', 'DELEGACIA').otherwise(col("MA_LOCAL")))\
.withColumn("MA_LOCAL", when(col("MA_LOCAL") == 'ESTADIO', 'ESTÁDIO ').otherwise(col("MA_LOCAL")))\
.withColumn("MA_LOCAL", when(col("MA_LOCAL") == 'COLOSSO', 'ESTÁDIO ').otherwise(col("MA_LOCAL")))\
.withColumn("MA_LOCAL", when(col("MA_LOCAL") == 'CÂMARA', 'CÂMARA DE VEREADORES').otherwise(col("MA_LOCAL")))
-- Limpeza na coluna MA_LOCAL
df_final = df_soft.withColumn('MA_LOCAL', when(col('MA_LOCAL').rlike('X{2,}'), 'NAO ESPECIFICADO').otherwise(col('MA_LOCAL')))\
.withColumn('MA_LOCAL', when(col('MA_LOCAL').rlike('\.{2,}'), 'NAO ESPECIFICADO').otherwise(col('MA_LOCAL')))\
.withColumn('MA_LOCAL', when(col('MA_LOCAL').rlike('\,{2,}'), 'NAO ESPECIFICADO').otherwise(col('MA_LOCAL')))\
.withColumn('MA_LOCAL', when(col('MA_LOCAL').rlike('\\*,{2,}'), 'NAO ESPECIFICADO').otherwise(col('MA_LOCAL')))\
.withColumn('MA_LOCAL', when(col('MA_LOCAL').rlike('Z{2,}'), 'NAO ESPECIFICADO').otherwise(col('MA_LOCAL')))\
.withColumn('MA_LOCAL', when(col('MA_LOCAL').rlike('\;{2,}'), 'NAO ESPECIFICADO').otherwise(col('MA_LOCAL')))\
.withColumn('MA_LOCAL', when(col('MA_LOCAL').rlike('-{2,}'), 'NAO ESPECIFICADO').otherwise(col('MA_LOCAL')))
-- Crie uma lista das palavras que você deseja substituir
palavras_para_substituir = [ 'NA PAURTA',r'\bS\b',r'\bN PAUTA\b',r'\b-\b','NA PAUTA', r'\b//\b','NA PAUTA//',,'TV','STM','VÁRIOS','///','/','VER PAUTA','VER PAUTA','INDEFINIDO',r'\bRUA\b','VER NA PAUTA','RUAS','A DEFINIR','\\.', r'\b;\b', r'\bVÍDEO\b','\\*+',r'\bR\b',r'\bRR\b','RRR','SSSS',r'\bD\b','AAA',r'\bAA\b','////','SSS',r'\bSS\b',r'\bQ\b',r'\bVARIOS\b','LLLL','NA ´PAUTA','N APAUTA','NA OPAUTA','GGG','NA PAUTAS'] # note que estamos escapando o ponto
-- Crie a expressão regular
regex = '|'.join(palavras_para_substituir)
-- Substitua a coluna se ela contiver qualquer uma das palavras na lista
df_final = df_final.withColumn('MA_LOCAL', when(col('MA_LOCAL').rlike(regex), 'NAO ESPECIFICADO').otherwise(col('MA_LOCAL')))
df_final.show(truncate=False)
df_final.repartition(1)\
.write\
.format("parquet")\
.mode("overwrite")\
.save("s3a://tvnews-curated-prod/tvnews-full.parquet/*.parquet")
O processo de ETL pode ser contínuo, permitindo que os dados sejam atualizados e refinados ao longo do tempo.
🌐 Os dados serão organizados e armazenados em três buckets distintos: Landing, Processing e Curated, todos integrantes de um sistema de Data Lake.
🪣 Bucket Landing: O armazenamento dos dados em seu estado bruto, em formato csv, sem nenhum tipo de manipulação ou processamento.
🪣 Bucket Processing: Armazena os dados convertidos para o formato Parquet.
🪣 Bucket Curated: Armazena o resultado final do processo de ETL, nessa etapa, os dados já foram limpos, transformados, validados, modelados e estão prontos para serem consumidos e usados para gerar valor para a organização.
Em conjunto, esses três buckets formam a estrutura do Data Lake, permitindo um fluxo de dados eficiente e bem organizado, que facilita a manipulação, análise e utilização desses dados
Buckets Provisionados via Terraform
⚡ Serviço interativo de consultas que facilita a análise de dados diretamente no Amazon S3 usando SQL padrão
Após a execução do crawler o aws glue cria uma tabela no AwsDataCatalog e essa tabela fica disponível no editor do Athena.
Configurações > Manage Settings > Direcione o resultado das consultas para o backet athena-query-tvnews
Código para consultar as tabelas diretamente no Athena ⬇️
querys-athena-tvnews.sql
-- Consultando as editorias com mais ocorrencias em 2022
SELECT extract(year from MA_DATA) AS ANO, EDITORIA, COUNT(*) AS TOTAL
FROM crawler_bucket_tvnews_prod
WHERE extract(year from MA_DATA) = 2022
GROUP BY extract(year from MA_DATA), EDITORIA
ORDER BY TOTAL DESC
-- Reporter que mais colaborarão no ano de 2023
WITH data AS (
SELECT extract(year from MA_DATA) AS ANO, REPORTER
FROM crawler_bucket_tvnews_prod
)
SELECT ANO, REPORTER, COUNT(*) AS TOTAL
FROM data
WHERE ANO = 2023
GROUP BY ANO, REPORTER
ORDER BY TOTAL DESC
-- Reporter e cinegrafista que mais trabalharam juntos no ano de 2023
WITH data AS (
SELECT extract(year from MA_DATA) AS ANO, REPORTER, CINEGRAFISTA
FROM dados
)
SELECT REPORTER, CINEGRAFISTA, COUNT(*) AS TOTAL
FROM data
WHERE ANO = 2023
GROUP BY REPORTER, CINEGRAFISTA
ORDER BY TOTAL DESC
LIMIT 10
-- Repórter e editoria e ordenados em ordem decrescente pelo número total de matérias
WITH data AS (
SELECT extract(year from MA_DATA) AS ANO, REPORTER, EDITORIA
FROM dados
)
SELECT REPORTER, EDITORIA, COUNT(*) AS TOTAL_MAT
FROM data
WHERE ANO = 2023
GROUP BY REPORTER, EDITORIA
ORDER BY TOTAL_MAT DESC
📊 📈 Setup Athena + Power BI
- Conexão ao Amazon Athena com ODBC
- Baixar Simba Athena ODBC Driver
- Fonte de dados ODBC >> DSN de Sistema >> Simba Athena ODBC drive >> Configurar o Simba com os dados corretos >> configurar as opções de autenticação >> No Power BI >> Obter dados >> ODBC >> DNS
- Baixar Simba Athena ODBC Driver
Visualizações