From 0b9a5973e41102b629473938a916e1a2ed630c17 Mon Sep 17 00:00:00 2001
From: pandaapo <1052156701@qq.com>
Date: Sun, 7 May 2023 14:33:50 +0800
Subject: [PATCH 1/2] Add a load balance strategy: source ip hash
---
.../loadbalance/LoadBalanceSelector.java | 2 +
.../common/loadbalance/LoadBalanceType.java | 3 +-
.../SourceIPHashLoadBalanceSelector.java | 83 +++++++++++++++++++
.../common/utils/CommonStringUtils.java | 52 ++++++++++++
.../SourceIPHashLoadBalanceSelectorTest.java | 72 ++++++++++++++++
.../http/util/HttpLoadBalanceUtils.java | 29 +++++++
6 files changed, 240 insertions(+), 1 deletion(-)
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelector.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/CommonStringUtils.java
create mode 100644 eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelectorTest.java
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceSelector.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceSelector.java
index 3bee08c6ec..778c1eabb1 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceSelector.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceSelector.java
@@ -21,7 +21,9 @@
* LoadBalance Interface
*
*
see {@link RandomLoadBalanceSelector}
+ *
see {@link WeightRandomLoadBalanceSelector}
*
see {@link WeightRoundRobinLoadBalanceSelector}
+ *
see {@link SourceIPHashLoadBalanceSelector}
*
* @param Target type
*/
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
index b85918b251..3309133bfe 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/LoadBalanceType.java
@@ -22,7 +22,8 @@
public enum LoadBalanceType {
RANDOM(0, "random load balance strategy"),
WEIGHT_ROUND_ROBIN(1, "weight round robin load balance strategy"),
- WEIGHT_RANDOM(2, "weight random load balance strategy");
+ WEIGHT_RANDOM(2, "weight random load balance strategy"),
+ SOURCE_IP_HASH(3, "source IP hash load balance strategy");
private final int code;
private final String desc;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelector.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelector.java
new file mode 100644
index 0000000000..89e180787f
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.loadbalance;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Collections;
+import java.util.List;
+
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Source IP Hash LoadBalance: make the same client always accessing the same server.
+ *
+ * @param Target type
+ */
+@Slf4j
+public class SourceIPHashLoadBalanceSelector implements LoadBalanceSelector {
+
+ private final transient List servers;
+
+ private String clientKey;
+
+ public SourceIPHashLoadBalanceSelector(List servers, String clientKey) {
+ this.servers = servers;
+ this.clientKey = clientKey;
+ }
+
+ @Override
+ public T select() {
+ // Avoid servers being changed during select().
+ List targets = Collections.unmodifiableList(servers);
+ if (StringUtils.isBlank(clientKey)) {
+ clientKey = "127.0.0.1";
+ log.warn("Blank client IP has been set default {}", clientKey);
+ }
+ int hashCode = hash(clientKey);
+ int index = hashCode % targets.size();
+ return targets.get(index);
+ }
+
+ @Override
+ public LoadBalanceType getType() {
+ return LoadBalanceType.SOURCE_IP_HASH;
+ }
+
+ /**
+ * FNV hash algorithm that is suitable for hashing some similar strings, like IP.
+ * @return
+ */
+ private int hash(String data) {
+ final int p = 16777619;
+ int hash = (int) 2166136261L;
+ for (int i = 0; i < data.length(); i++) {
+ hash = (hash ^ data.charAt(i)) * p;
+ }
+ hash += hash << 13;
+ hash ^= hash >> 7;
+ hash += hash << 3;
+ hash ^= hash >> 17;
+ hash += hash << 5;
+ if (hash < 0) {
+ hash = Math.abs(hash);
+ }
+ return hash;
+ }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/CommonStringUtils.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/CommonStringUtils.java
new file mode 100644
index 0000000000..84f016c5e6
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/CommonStringUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * A string utils as supplement of org.apache.commons.lang3.StringUtils
+ */
+public class CommonStringUtils extends StringUtils {
+
+ /**
+ * Compares given string to a CharSequences vararg of searchStrings,
+ * returning true if the string is equal to all of the searchStrings.
+ *
+ * CommonStringUtils.equalsAll("abc", "abc", "def") = false
+ * CommonStringUtils.equalsAll(null, "abc", "def") = false
+ * CommonStringUtils.equalsAll(null, (CharSequence[]) null) = true
+ * CommonStringUtils.equalsAll(null, null, null) = true
+ * CommonStringUtils.equalsAll("abc", "abc", "abc") = true
+ *
+ * @param string
+ * @param searchStrings
+ * @return
+ */
+ public static boolean equalsAll(final CharSequence string, final CharSequence... searchStrings) {
+ if (ArrayUtils.isNotEmpty(searchStrings)) {
+ for (final CharSequence next : searchStrings) {
+ if (!equals(string, next)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+}
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelectorTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelectorTest.java
new file mode 100644
index 0000000000..e7ddef66c1
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelectorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.loadbalance;
+
+import org.apache.eventmesh.common.utils.CommonStringUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class SourceIPHashLoadBalanceSelectorTest {
+
+ private SourceIPHashLoadBalanceSelector loadBalanceSelector;
+
+ private List servers;
+
+ private String client1Key;
+
+ private String client2Key;
+
+ @Before
+ public void init() {
+ servers = Arrays.asList(new String[]{
+ "192.168.1.10", "192.168.1.11",
+ "192.168.1.12", "192.168.1.13",
+ "192.168.1.14", "192.168.1.15"
+ });
+ client1Key = "192.168.1.1-1-tester1-TLSv1.2";
+ client2Key = "192.168.1.2-1-tester2-TLSv1.2";
+ loadBalanceSelector = new SourceIPHashLoadBalanceSelector<>(servers, client1Key);
+ }
+
+ @Test
+ public void testSelect() {
+ String target1 = loadBalanceSelector.select();
+ String target2 = loadBalanceSelector.select();
+ String target3 = loadBalanceSelector.select();
+ Assert.assertTrue(CommonStringUtils.equalsAll(target1, target2, target3));
+
+ loadBalanceSelector = new SourceIPHashLoadBalanceSelector<>(servers, client2Key);
+ String target4 = loadBalanceSelector.select();
+ Assert.assertFalse(StringUtils.equals(target1, target4));
+ }
+
+ @Test
+ public void testType() {
+ Assert.assertEquals(LoadBalanceType.SOURCE_IP_HASH, loadBalanceSelector.getType());
+ }
+}
diff --git a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
index 3f2646ae03..51c03dc810 100644
--- a/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
+++ b/eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java
@@ -21,11 +21,15 @@
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.RandomLoadBalanceSelector;
+import org.apache.eventmesh.common.loadbalance.SourceIPHashLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.Weight;
import org.apache.eventmesh.common.loadbalance.WeightRandomLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.WeightRoundRobinLoadBalanceSelector;
+import org.apache.eventmesh.common.utils.IPUtils;
+import org.apache.eventmesh.common.utils.SystemUtils;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
@@ -55,6 +59,10 @@ public static LoadBalanceSelector createEventMeshServerLoadBalanceSelect
eventMeshServerSelector = new WeightRoundRobinLoadBalanceSelector<>(buildWeightedClusterGroupFromConfig(
eventMeshHttpClientConfig));
break;
+ case SOURCE_IP_HASH:
+ eventMeshServerSelector = new SourceIPHashLoadBalanceSelector<>(
+ buildClusterGroupFromConfig(eventMeshHttpClientConfig), toClientKey(eventMeshHttpClientConfig));
+ break;
default:
// ignore
}
@@ -64,6 +72,27 @@ public static LoadBalanceSelector createEventMeshServerLoadBalanceSelect
return eventMeshServerSelector;
}
+ /**
+ * get client unique key(format: IP-pid-userName-sslClientProtocol) from EventMeshHttpClientConfig.
+ * @param eventMeshHttpClientConfig
+ * @return
+ */
+ private static String toClientKey(EventMeshHttpClientConfig eventMeshHttpClientConfig) {
+ String ip = eventMeshHttpClientConfig.getIp();
+ if (StringUtils.equals(ip, "localhost")) {
+ ip = IPUtils.getLocalAddress();
+ }
+ String pid = eventMeshHttpClientConfig.getPid();
+ if (StringUtils.isBlank(pid)) {
+ pid = SystemUtils.getProcessId();
+ }
+ return StringUtils.joinWith("-",
+ ip, pid,
+ eventMeshHttpClientConfig.getUserName(),
+ eventMeshHttpClientConfig.getSslClientProtocol()
+ );
+ }
+
private static List> buildWeightedClusterGroupFromConfig(
final EventMeshHttpClientConfig eventMeshHttpClientConfig)
throws EventMeshException {
From bbc6e5b36a23ca19c432b1e799af474b396301a9 Mon Sep 17 00:00:00 2001
From: pandaapo <1052156701@qq.com>
Date: Sun, 7 May 2023 16:01:47 +0800
Subject: [PATCH 2/2] Add unit test for CommonStringUtils
---
.../common/utils/CommonStringUtilsTest.java | 34 +++++++++++++++++++
1 file changed, 34 insertions(+)
create mode 100644 eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/CommonStringUtilsTest.java
diff --git a/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/CommonStringUtilsTest.java b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/CommonStringUtilsTest.java
new file mode 100644
index 0000000000..20dcbe199a
--- /dev/null
+++ b/eventmesh-common/src/test/java/org/apache/eventmesh/common/utils/CommonStringUtilsTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CommonStringUtilsTest {
+
+ @Test
+ public void testEqualsAll() {
+ Assert.assertTrue(CommonStringUtils.equalsAll(null, null));
+ Assert.assertTrue(CommonStringUtils.equalsAll(null, null, null));
+ Assert.assertTrue(CommonStringUtils.equalsAll("", "", ""));
+ Assert.assertTrue(CommonStringUtils.equalsAll("abc", "abc", "abc"));
+ Assert.assertFalse(CommonStringUtils.equalsAll(null, "abc", "def"));
+ Assert.assertFalse(CommonStringUtils.equalsAll("abc", "def", "ghi"));
+ }
+}