Skip to content

Commit

Permalink
kafkajs custom log
Browse files Browse the repository at this point in the history
  • Loading branch information
MalpenZibo committed Jun 5, 2024
1 parent 1dcbee4 commit 8fe8e3e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 262 deletions.
3 changes: 2 additions & 1 deletion packages/kafka-iam-auth/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
"aws-msk-iam-sasl-signer-js": "^1.0.0",
"kafkajs": "2.2.4",
"pagopa-interop-commons": "workspace:^",
"pagopa-interop-models": "workspace:*"
"pagopa-interop-models": "workspace:*",
"ts-pattern": "5.1.2"
},
"devDependencies": {
"typescript": "5.4.5"
Expand Down
33 changes: 32 additions & 1 deletion packages/kafka-iam-auth/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import {
Kafka,
KafkaConfig,
OauthbearerProviderResponse,
logLevel,
} from "kafkajs";
import {
KafkaConsumerConfig,
Logger,
genericLogger,
} from "pagopa-interop-commons";
import { kafkaMessageProcessError } from "pagopa-interop-models";
import { P, match } from "ts-pattern";

const errorTypes = ["unhandledRejection", "uncaughtException"];
const signalTraps = ["SIGTERM", "SIGINT", "SIGUSR2"];
Expand Down Expand Up @@ -138,7 +140,36 @@ const initConsumer = async (
},
};

const kafka = new Kafka(kafkaConfig);
const kafka = new Kafka({
...kafkaConfig,
logCreator:
(_logLevel) =>
({ level, log }) => {
const { message, error } = log;

const filteredLevel = match(error)
.with(
P.string,
(error) =>
(level === logLevel.ERROR || level === logLevel.WARN) &&
error.includes("The group is rebalancing, so a rejoin is needed"),
() => logLevel.INFO
)
.otherwise(() => level);

// eslint-disable-next-line sonarjs/no-nested-template-literals
const msg = `${message}${error ? ` - ${error}` : ""}`;

match(filteredLevel)
.with(logLevel.NOTHING, logLevel.ERROR, () =>
genericLogger.error(msg)
)
.with(logLevel.WARN, () => genericLogger.warn(msg))
.with(logLevel.INFO, () => genericLogger.info(msg))
.with(logLevel.DEBUG, () => genericLogger.debug(msg))
.otherwise(() => genericLogger.error(msg));
},
});

const consumer = kafka.consumer({
groupId: config.kafkaGroupId,
Expand Down
Loading

0 comments on commit 8fe8e3e

Please sign in to comment.