Skip to content

Commit

Permalink
Merge branch 'refs/heads/main' into feature/SELC-5349
Browse files Browse the repository at this point in the history
  • Loading branch information
gianmarcoplutino committed Aug 8, 2024
2 parents 18a0232 + 64b78f9 commit dcf186f
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package it.pagopa.selfcare.onboarding.dto;

import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.entity.Token;
import org.openapi.quarkus.core_json.model.InstitutionResponse;

public class NotificationsResources {
private Onboarding onboarding;
private InstitutionResponse institution;
private Token token;
private QueueEvent queueEvent;

public NotificationsResources(Onboarding onboarding, InstitutionResponse institution, Token token, QueueEvent queueEvent) {
this.onboarding = onboarding;
this.institution = institution;
this.token = token;
this.queueEvent = queueEvent;
}

public Onboarding getOnboarding() {
return onboarding;
}

public void setOnboarding(Onboarding onboarding) {
this.onboarding = onboarding;
}

public InstitutionResponse getInstitution() {
return institution;
}

public void setInstitution(InstitutionResponse institution) {
this.institution = institution;
}

public Token getToken() {
return token;
}

public void setToken(Token token) {
this.token = token;
}

public QueueEvent getQueueEvent() {
return queueEvent;
}

public void setQueueEvent(QueueEvent queueEvent) {
this.queueEvent = queueEvent;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,7 @@ public class ResendNotificationsFilters {
private String from;
private String to;
private Integer page;

public Integer getPage() {
return page;
}

public void setPage(Integer page) {
this.page = page;
}
private String notificationEventTraceId;

public ResendNotificationsFilters() {
}
Expand All @@ -32,6 +25,7 @@ private ResendNotificationsFilters(Builder builder) {
this.from = builder.from;
this.to = builder.to;
this.page = builder.page;
this.notificationEventTraceId = builder.notificationEventTraceId;
}

public static Builder builder() {
Expand All @@ -46,8 +40,8 @@ public static class Builder {
private List<String> status;
private String from;
private String to;

private Integer page;
private String notificationEventTraceId;

public Builder productId(String productId) {
this.productId = productId;
Expand Down Expand Up @@ -89,6 +83,11 @@ public Builder page(Integer page) {
return this;
}

public Builder notificationEventTraceId(String notificationEventTraceId) {
this.notificationEventTraceId = notificationEventTraceId;
return this;
}

public ResendNotificationsFilters build() {
return new ResendNotificationsFilters(this);
}
Expand Down Expand Up @@ -150,6 +149,22 @@ public void setTo(String to) {
this.to = to;
}

public Integer getPage() {
return page;
}

public void setPage(Integer page) {
this.page = page;
}

public String getNotificationEventTraceId() {
return notificationEventTraceId;
}

public void setNotificationEventTraceId(String notificationEventTraceId) {
this.notificationEventTraceId = notificationEventTraceId;
}

@Override
public String toString() {
return "ResendNotificationsFilters{" +
Expand All @@ -161,6 +176,7 @@ public String toString() {
", from='" + from + '\'' +
", to='" + to + '\'' +
", page='" + page + '\'' +
", notificationEventTraceId='" + notificationEventTraceId + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.logging.Level;

import static it.pagopa.selfcare.onboarding.functions.CommonFunctions.FORMAT_LOGGER_ONBOARDING_STRING;
import static it.pagopa.selfcare.onboarding.functions.utils.ActivityName.*;
Expand Down Expand Up @@ -53,7 +54,6 @@ public NotificationFunctions(ObjectMapper objectMapper,
* It gets invoked by module onboarding-cdc when status is COMPLETED or DELETED
*/
@FunctionName("Notification")
@FixedDelayRetry(maxRetryCount = 3, delayInterval = "00:00:30")
public HttpResponseMessage sendNotification (
@HttpTrigger(name = "req", methods = {HttpMethod.POST}, authLevel = AuthorizationLevel.FUNCTION) HttpRequestMessage<Optional<String>> request,
final ExecutionContext context) {
Expand Down Expand Up @@ -165,16 +165,35 @@ public HttpResponseMessage resendNotifications(
@FunctionName("NotificationsSender")
public void notificationsSenderOrchestrator(
@DurableOrchestrationTrigger(name = "taskOrchestrationContext") TaskOrchestrationContext ctx,
ExecutionContext functionContext) {
String filtersString = ctx.getInput(String.class);
functionContext.getLogger().info("Resend notifications orchestration started with input: " + filtersString);
ExecutionContext functionContext) throws JsonProcessingException {
String filtersString = getFiltersFromContextAndEnrichWithInstanceId(ctx);
if (functionContext.getLogger().isLoggable(Level.INFO)) {
functionContext.getLogger().info("Resend notifications orchestration started with input: " + filtersString);
}

do {
filtersString = ctx.callActivity(RESEND_NOTIFICATIONS_ACTIVITY, filtersString, String.class).await();
} while (filtersString != null);

functionContext.getLogger().info("Resend notifications orchestration completed");
}

/**
* It retrieves the filters from the orchestrator context and enriches them with the instanceId.
* For logging purposes, the instanceId will be used as the notificationEventTraceId.
*
* @param ctx TaskOrchestrationContext provided by the Azure Functions runtime.
* @return JSON string representing the filters to be applied when resending notifications.
* @throws JsonProcessingException if there is an error parsing the filtersString into a ResendNotificationsFilters object.
*/
private String getFiltersFromContextAndEnrichWithInstanceId(TaskOrchestrationContext ctx) throws JsonProcessingException {
String filtersString = ctx.getInput(String.class);
String instanceId = ctx.getInstanceId();
ResendNotificationsFilters filters = objectMapper.readValue(filtersString, ResendNotificationsFilters.class);
filters.setNotificationEventTraceId(instanceId);
return objectMapper.writeValueAsString(filters);
}

/**
* It is triggered by the orchestrator function "NotificationsSender", and is responsible for a resend of single page of notifications.
*
Expand All @@ -186,7 +205,6 @@ public void notificationsSenderOrchestrator(
@FunctionName(RESEND_NOTIFICATIONS_ACTIVITY)
public String resendNotificationsActivity(@DurableActivityTrigger(name = "filtersString") String filtersString, final ExecutionContext context) throws JsonProcessingException {
context.getLogger().info(() -> String.format(FORMAT_LOGGER_ONBOARDING_STRING, RESEND_NOTIFICATIONS_ACTIVITY, filtersString));

ResendNotificationsFilters filters;
try {
filters = objectMapper.readValue(filtersString, ResendNotificationsFilters.class);
Expand All @@ -197,7 +215,7 @@ public String resendNotificationsActivity(@DurableActivityTrigger(name = "filter
/*
* At the end of the resendNotifications it is checked whether there are more onboardings to resend, if there are, the method
* returns the same filters received as input by incrementing the page by one to fetch on next iteration the next page of onboardings.
* Otherwise it returns null.
* Otherwise, it returns null.
*/
ResendNotificationsFilters nextFilters = notificationEventResenderService.resendNotifications(filters, context);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
import com.microsoft.azure.functions.ExecutionContext;
import it.pagopa.selfcare.onboarding.dto.ResendNotificationsFilters;
import it.pagopa.selfcare.onboarding.entity.Onboarding;
import it.pagopa.selfcare.onboarding.exception.NotificationException;
import jakarta.enterprise.context.ApplicationScoped;

import java.util.List;
import java.util.Optional;
import java.util.logging.Level;

@ApplicationScoped
public class NotificationEventResenderServiceDefault implements NotificationEventResenderService {
private final NotificationEventService notificationEventService;
private final OnboardingService onboardingService;

private static final String RESEND_ENDING_LOG = "Resend notifications for page %s completed with %s notifications sent successfully and %s notifications not sent";
private static final String RESEND_ENDING_LOG_LAST_PAGE = "There aren't more notifications to resend, page %s completed with %s notifications sent successfully and %s notifications not sent";
private static final String RESEND_ENDING_LOG = "Resend notifications for page %s completed";
private static final String RESEND_ENDING_LOG_LAST_PAGE = "There aren't more notifications to resend, page %s completed";


public NotificationEventResenderServiceDefault(NotificationEventService notificationEventService, OnboardingService onboardingService) {
Expand All @@ -25,32 +23,23 @@ public NotificationEventResenderServiceDefault(NotificationEventService notifica
}

public ResendNotificationsFilters resendNotifications(ResendNotificationsFilters filters, ExecutionContext context) {
context.getLogger().info("resendNotifications started with filters: " + filters.toString());
context.getLogger().info(() -> "resendNotifications started with filters: " + filters);

int page = Optional.ofNullable(filters.getPage()).orElse(0);
int pageSize = 100;
int notificationsSent = 0;
int notificationsFailed = 0;

List<Onboarding> onboardingsToResend = onboardingService.getOnboardingsToResend(filters, page, pageSize);
context.getLogger().info(String.format("Found: %s onboardings to send for page: %s ", onboardingsToResend.size(), page));

context.getLogger().info(() -> String.format("Found: %s onboardings to send for page: %s ", onboardingsToResend.size(), page));
for (Onboarding onboarding : onboardingsToResend) {
try {
notificationEventService.send(context, onboarding, null);
notificationsSent++;
} catch (NotificationException e) {
context.getLogger().log(Level.WARNING, e, () -> String.format("Error resending notification for onboarding with ID %s. Error: %s", onboarding.getId(), e.getMessage()));
notificationsFailed++;
}
notificationEventService.send(context, onboarding, null, filters.getNotificationEventTraceId());
}

if(onboardingsToResend.isEmpty() || onboardingsToResend.size() < pageSize) {
context.getLogger().info(String.format(RESEND_ENDING_LOG_LAST_PAGE, filters.getPage(), notificationsSent, notificationsFailed));
context.getLogger().info(() -> String.format(RESEND_ENDING_LOG_LAST_PAGE, filters.getPage()));
return null;
}

context.getLogger().info(String.format(RESEND_ENDING_LOG, filters.getPage(), notificationsSent, notificationsFailed));
context.getLogger().info(() -> String.format(RESEND_ENDING_LOG, filters.getPage()));

filters.setPage(page + 1);
return filters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@
public interface NotificationEventService {

void send(ExecutionContext context, Onboarding onboarding, QueueEvent queueEvent);

void send(ExecutionContext context, Onboarding onboarding, QueueEvent queueEvent, String notificationEventTraceId);
}
Loading

0 comments on commit dcf186f

Please sign in to comment.