Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
aomegax committed Aug 10, 2023
1 parent c11bbd1 commit eb1e578
Show file tree
Hide file tree
Showing 12 changed files with 89 additions and 115 deletions.
12 changes: 6 additions & 6 deletions .devops/deploy-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,21 @@ variables:
AZURE_SUBSCRIPTION: $(DEV_AZURE_SUBSCRIPTION)
APP_NAME: $(DEV_WEB_APP_NAME)
STAGE: "d"
RESOURCE_GROUP: 'pagopa-d-weu-nodo-re-to-datastore-rg'
RESOURCE_GROUP: 'pagopa-d-weu-nodo-re-to-tablestorage-rg'
dockerRegistryServiceConnection: $(DEV_CONTAINER_REGISTRY_SERVICE_CONN)
dockerNamespace: $(DEV_CONTAINER_NAMESPACE)
${{ if eq(parameters['ENV'], 'uat') }}:
AZURE_SUBSCRIPTION: $(UAT_AZURE_SUBSCRIPTION)
APP_NAME: $(UAT_WEB_APP_NAME)
STAGE: "u"
RESOURCE_GROUP: 'pagopa-u-weu-nodo-re-to-datastore-rg'
RESOURCE_GROUP: 'pagopa-u-weu-nodo-re-to-tablestorage-rg'
dockerRegistryServiceConnection: $(UAT_CONTAINER_REGISTRY_SERVICE_CONN)
dockerNamespace: $(UAT_CONTAINER_NAMESPACE)
${{ if eq(parameters['ENV'], 'prod') }}:
AZURE_SUBSCRIPTION: $(PROD_AZURE_SUBSCRIPTION)
APP_NAME: $(PROD_WEB_APP_NAME)
STAGE: "p"
RESOURCE_GROUP: 'pagopa-p-weu-nodo-re-to-datastore-rg'
RESOURCE_GROUP: 'pagopa-p-weu-nodo-re-to-tablestorage-rg'
dockerRegistryServiceConnection: $(PROD_CONTAINER_REGISTRY_SERVICE_CONN)
dockerNamespace: $(PROD_CONTAINER_NAMESPACE)

Expand Down Expand Up @@ -155,7 +155,7 @@ stages:
condition: in('${{ parameters.ENV }}', 'dev', 'uat')
inputs:
azureSubscription: $(AZURE_SUBSCRIPTION)
appName: "${{variables.APP_NAME}}-nodo-re-fn"
appName: "${{variables.APP_NAME}}-nodo-re-ts-fn"
imageName: "${{variables.dockerNamespace}}/${{ variables.imageRepository }}:latest"
slotName: production
resourceGroupName: $(RESOURCE_GROUP)
Expand All @@ -164,7 +164,7 @@ stages:
condition: eq('${{ parameters.ENV }}', 'prod')
inputs:
azureSubscription: $(AZURE_SUBSCRIPTION)
appName: "${{variables.APP_NAME}}-nodo-re-fn"
appName: "${{variables.APP_NAME}}-nodo-re-ts-fn"
imageName: "${{variables.dockerNamespace}}/${{ variables.imageRepository }}:latest"
deployToSlotOrASE: true
slotName: staging
Expand Down Expand Up @@ -198,7 +198,7 @@ stages:
displayName: Swapping App Service Deploy
inputs:
ConnectedServiceName: $(AZURE_SUBSCRIPTION)
WebAppName: "${{variables.APP_NAME}}-nodo-re-fn"
WebAppName: "${{variables.APP_NAME}}-nodo-re-ts-fn"
ResourceGroupName: $(RESOURCE_GROUP)
SourceSlot: staging
SwapWithProduction: true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/03_code_review.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ on:
workflow_dispatch:

env:
PROJECT_KEY: pagopa_pagopa-nodo-re-to-datastore
PROJECT_KEY: pagopa_pagopa-nodo-re-to-tablestorage


permissions:
Expand Down
2 changes: 1 addition & 1 deletion .identity/99_variables.tf
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
locals {
github = {
org = "pagopa"
repository = "pagopa-nodo-re-to-datastore"
repository = "pagopa-nodo-re-to-tablestorage"
}

prefix = "pagopa"
Expand Down
2 changes: 1 addition & 1 deletion .identity/env/dev/backend.tfvars
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resource_group_name = "io-infra-rg"
storage_account_name = "pagopainfraterraformdev"
container_name = "azurermstate"
key = "pagopa-nodo-re-to-datastore.tfstate"
key = "pagopa-nodo-re-to-tablestorage.tfstate"
2 changes: 1 addition & 1 deletion .identity/env/dev/terraform.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ tags = {
CreatedBy = "Terraform"
Environment = "Dev"
Owner = "pagoPA"
Source = "https://github.com/pagopa/pagopa-nodo-re-to-datastore"
Source = "https://github.com/pagopa/pagopa-nodo-re-to-tablestorage"
CostCenter = "TS310 - PAGAMENTI & SERVIZI"
}
2 changes: 1 addition & 1 deletion .identity/env/prod/backend.tfvars
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resource_group_name = "io-infra-rg"
storage_account_name = "pagopainfraterraformprod"
container_name = "azurermstate"
key = "pagopa-nodo-re-to-datastore.tfstate"
key = "pagopa-nodo-re-to-tablestorage.tfstate"
2 changes: 1 addition & 1 deletion .identity/env/prod/terraform.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ tags = {
CreatedBy = "Terraform"
Environment = "Prod"
Owner = "pagoPA"
Source = "https://github.com/pagopa/pagopa-nodo-re-to-datastore"
Source = "https://github.com/pagopa/pagopa-nodo-re-to-tablestorage"
CostCenter = "TS310 - PAGAMENTI & SERVIZI"
}
2 changes: 1 addition & 1 deletion .identity/env/uat/backend.tfvars
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resource_group_name = "io-infra-rg"
storage_account_name = "pagopainfraterraformuat"
container_name = "azurermstate"
key = "pagopa-nodo-re-to-datastore.tfstate"
key = "pagopa-nodo-re-to-tablestorage.tfstate"
2 changes: 1 addition & 1 deletion .identity/env/uat/terraform.tfvars
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ tags = {
CreatedBy = "Terraform"
Environment = "Uat"
Owner = "pagoPA"
Source = "https://github.com/pagopa/pagopa-nodo-re-to-datastore"
Source = "https://github.com/pagopa/pagopa-nodo-re-to-tablestorage"
CostCenter = "TS310 - PAGAMENTI & SERVIZI"
}
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# pagoPA Functions nodo-re-to-datastore
# pagoPA Functions nodo-re-to-tablestorage

Java nodo-re-to-datastore Azure Function.
The function aims to dump RE sent via Azure Event Hub to a CosmosDB, with a TTL of 120 days, and to an Azure Table Storage with a TTL of 10 years.
Java nodo-re-to-tablestorage Azure Function.
The function aims to dump RE sent via Azure Event Hub to an Azure Table Storage with a TTL of 10 years.

[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=pagopa_pagopa-nodo-re-to-datastore&metric=alert_status)](https://sonarcloud.io/dashboard?id=pagopa_pagopa-nodo-re-to-datastore)
[![Quality Gate Status](https://sonarcloud.io/api/project_badges/measure?project=pagopa_pagopa-nodo-re-to-tablestorage&metric=alert_status)](https://sonarcloud.io/dashboard?id=pagopa_pagopa-nodo-re-to-tablestorage)


---

## Run locally with Docker
`docker build -t pagopa-functions-nodo-re-to-datastore .`
`docker build -t pagopa-functions-nodo-re-to-tablestorage .`

`docker run -it -rm -p 8999:80 pagopa-functions-nodo-re-to-datastore`
`docker run -it -rm -p 8999:80 pagopa-functions-nodo-re-to-tablestorage`

### Test
`curl http://localhost:8999/example`
Expand Down
12 changes: 3 additions & 9 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
<modelVersion>4.0.0</modelVersion>

<groupId>it.gov.pagopa</groupId>
<artifactId>nodoretodatastore</artifactId>
<artifactId>nodoretotablestorage</artifactId>
<version>0.1.1-14</version>
<packaging>jar</packaging>

<name>Nodo RE to datastore Fn</name>
<name>Nodo RE to Table Storage Fn</name>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
<azure.functions.maven.plugin.version>1.26.0</azure.functions.maven.plugin.version>
<azure.functions.java.library.version>3.0.0</azure.functions.java.library.version>
<functionAppName>pagopa-d-weu-nodo-re-to-datastore-fn</functionAppName>
<functionAppName>pagopa-d-weu-nodo-re-to-tablestorage-fn</functionAppName>
<resteasy.version>3.15.3.Final</resteasy.version>
</properties>

Expand Down Expand Up @@ -155,12 +155,6 @@
<version>2.15.2</version>
</dependency>

<!-- <dependency>-->
<!-- <groupId>org.mongodb</groupId>-->
<!-- <artifactId>mongo-java-driver</artifactId>-->
<!-- <version>3.4.2</version>-->
<!-- </dependency>-->

<!-- Jackson END-->

<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
package it.gov.pagopa.nodoretodatastore;

package it.gov.pagopa.nodoretotablestorage;

import com.azure.data.tables.TableClient;
import com.azure.data.tables.TableServiceClient;
import com.azure.data.tables.TableServiceClientBuilder;
import com.azure.data.tables.models.TableEntity;
import com.azure.data.tables.models.TableTransactionAction;
import com.azure.data.tables.models.TableTransactionActionType;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.BindingName;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;
import com.microsoft.azure.functions.annotation.FunctionName;
import it.gov.pagopa.nodoretodatastore.util.ObjectMapperUtils;
import lombok.NonNull;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
Expand All @@ -33,46 +36,39 @@ public class NodoReEventToDataStore {

private Pattern replaceDashPattern = Pattern.compile("-([a-zA-Z])");
private static String NA = "NA";
private static String idField = "uniqueId";
// private static String tableName = System.getenv("TABLE_STORAGE_TABLE_NAME");
private static String insertedTimestamp = "insertedTimestamp";
private static String insertedDate = "insertedDate";
private static String partitionKey = "PartitionKey";
private static String uniqueIdField = "uniqueId";
private static String insertedDateField = "insertedDate";
private static String insertedTimestampField = "insertedTimestamp";
private static String partitionKeyField = "PartitionKey";
private static String payloadField = "payload";
private static String idDominioField = "idDominio";

private static TableServiceClient tableServiceClient = null;

private static String tableName = System.getenv("TABLE_STORAGE_TABLE_NAME");

private static TableServiceClient getTableServiceClient(){
if(tableServiceClient==null) {
tableServiceClient = new TableServiceClientBuilder().connectionString(System.getenv("TABLE_STORAGE_CONN_STRING"))
.buildClient();
tableServiceClient.createTableIfNotExists(tableName);
}
return tableServiceClient;
}

// private static MongoClient mongoClient = null;

// private static TableServiceClient tableServiceClient = null;

// private static MongoClient getMongoClient(){
// if(mongoClient==null){
// mongoClient = new MongoClient(new MongoClientURI(System.getenv("COSMOS_CONN_STRING")));
// }
// return mongoClient;
// }

// private static TableServiceClient getTableServiceClient(){
// if(tableServiceClient==null){
// tableServiceClient = new TableServiceClientBuilder().connectionString(System.getenv("TABLE_STORAGE_CONN_STRING"))
// .buildClient();
// tableServiceClient.createTableIfNotExists(tableName);
// }
// return tableServiceClient;
// }


// private void addToBatch(Logger logger, Map<String,List<TableTransactionAction>> partitionEvents, Map<String,Object> reEvent){
// if(reEvent.get(idField) == null) {
// logger.warning("event has no '" + idField + "' field");
// } else {
// TableEntity entity = new TableEntity((String) reEvent.get(partitionKey), (String)reEvent.get(idField));
// entity.setProperties(reEvent);
// if(!partitionEvents.containsKey(entity.getPartitionKey())){
// partitionEvents.put(entity.getPartitionKey(),new ArrayList<TableTransactionAction>());
// }
// partitionEvents.get(entity.getPartitionKey()).add(new TableTransactionAction(TableTransactionActionType.UPSERT_REPLACE,entity));
// }
// }
private void addToBatch(Logger logger, Map<String,List<TableTransactionAction>> partitionEvents, Map<String, Object> reEvent) {
if(reEvent.get(uniqueIdField) == null) {
logger.warning("event has no '" + uniqueIdField + "' field");
}
else {
TableEntity entity = new TableEntity((String) reEvent.get(partitionKeyField), (String)reEvent.get(uniqueIdField));
entity.setProperties(reEvent);
if(!partitionEvents.containsKey(entity.getPartitionKey())){
partitionEvents.put(entity.getPartitionKey(),new ArrayList<TableTransactionAction>());
}
partitionEvents.get(entity.getPartitionKey()).add(new TableTransactionAction(TableTransactionActionType.UPSERT_REPLACE,entity));
}
}

private String replaceDashWithUppercase(String input) {
if(!input.contains("-")){
Expand All @@ -89,8 +85,8 @@ private String replaceDashWithUppercase(String input) {
return sb.toString();
}

private void zipPayload(Logger logger,Map<String,Object> reEvent) {
if(reEvent.get(payloadField)!=null){
private void zipPayload(Logger logger, Map<String,Object> reEvent) {
if(reEvent.get(payloadField)!=null) {
try {
byte[] data = ((String)reEvent.get(payloadField)).getBytes(StandardCharsets.UTF_8);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand All @@ -105,6 +101,11 @@ private void zipPayload(Logger logger,Map<String,Object> reEvent) {
}
}

private Map<String,Object> getEvent(String partitionKeyValue, Map<String,Object> reEvent) {
reEvent.put(partitionKeyField, partitionKeyValue);
return reEvent;
}

@FunctionName("EventHubNodoReEventProcessor")
public void processNodoReEvent (
@EventHubTrigger(
Expand All @@ -114,63 +115,43 @@ public void processNodoReEvent (
cardinality = Cardinality.MANY)
List<String> reEvents,
@BindingName(value = "PropertiesArray") Map<String, Object>[] properties,
@CosmosDBOutput(
name = "NodoReEventToDataStore",
databaseName = "nodo_re",
containerName = "events",
createIfNotExists = false,
connection = "COSMOS_CONN_STRING")
@NonNull OutputBinding<List<Object>> documentdb,
final ExecutionContext context) {

Logger logger = context.getLogger();

// TableClient tableClient = getTableServiceClient().getTableClient(tableName);
String msg = String.format("Persisting %d events", reEvents.size());
logger.info(msg);
TableClient tableClient = getTableServiceClient().getTableClient(tableName);

logger.info(String.format("Persisting %d events", reEvents.size()));
try {
if (reEvents.size() == properties.length) {
// Map<String,List<TableTransactionAction>> partitionEvents = new HashMap<>();
List<Object> eventsToPersistCosmos = new ArrayList<>();
Map<String,List<TableTransactionAction>> partitionEvents = new HashMap<>();

for(int index=0; index< properties.length; index++) {
// logger.info("processing "+(index+1)+" of "+properties.length);
final Map<String,Object> reEvent = ObjectMapperUtils.readValue(reEvents.get(index), Map.class);
properties[index].forEach((p,v) -> {
String s = replaceDashWithUppercase(p);
reEvent.put(s, v);
});

reEvent.put("id", reEvent.get("uniqueId"));

String insertedDateValue = reEvent.get(insertedTimestamp) != null ? ((String)reEvent.get(insertedTimestamp)).substring(0, 10) : NA;
reEvent.put(insertedDate, insertedDateValue);
String insertedDateValue = reEvent.get(insertedTimestampField) != null ? ((String)reEvent.get(insertedTimestampField)).substring(0, 10) : NA;
reEvent.put(insertedDateField, insertedDateValue);

String idDominio = reEvent.get("idDominio") != null ? reEvent.get("idDominio").toString() : NA;
String idPsp = reEvent.get("psp") != null ? reEvent.get("psp").toString() : NA;
String partitionKeyValue = insertedDateValue + "-" + idDominio + "-" + idPsp;
reEvent.put(partitionKey, partitionKeyValue);
zipPayload(logger, reEvent);

// zipPayload(logger,reEvent);
reEvent.put(payloadField, null);
String idDominio = reEvent.get(idDominioField) != null ? reEvent.get(idDominioField).toString() : NA;

// addToBatch(logger,partitionEvents,reEvent);
eventsToPersistCosmos.add(reEvent);
addToBatch(logger, partitionEvents, getEvent(insertedDateValue, reEvent));
addToBatch(logger, partitionEvents, getEvent(insertedDateValue + "-" + idDominio, reEvent));
}

// partitionEvents.forEach((pe,values)->{
// try {
// tableClient.submitTransaction(values);
// } catch (Throwable t){
// logger.severe("Could not save on tableStorage,partition "+pe+", "+values.size()+" rows,error:"+ t.toString());
// }
// });

try {
documentdb.setValue(eventsToPersistCosmos);
} catch (Throwable t){
logger.severe("Could not save on cosmos "+eventsToPersistCosmos.size()+", error:"+ t.toString());
}
// save batch by partition keys
partitionEvents.forEach((pe, values)->{
try {
tableClient.submitTransaction(values);
} catch (Throwable t){
logger.severe("Could not save on tableStorage,partition "+pe+", "+values.size()+" rows,error:"+ t.toString());
}
});

logger.info("Done processing events");
} else {
Expand All @@ -179,8 +160,7 @@ public void processNodoReEvent (
} catch (NullPointerException e) {
logger.severe("NullPointerException exception on cosmos nodo-re-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage());
} catch (Throwable e) {
logger.severe("Generic exception on cosmos nodo-re-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage());
logger.severe("Generic exception on table storage nodo-re-events msg ingestion at "+ LocalDateTime.now()+ " : " + e.getMessage());
}

}
}

0 comments on commit eb1e578

Please sign in to comment.