Skip to content

Commit

Permalink
Merge bbc6e5b into e1e9aca
Browse files Browse the repository at this point in the history
  • Loading branch information
pandaapo authored May 7, 2023
2 parents e1e9aca + bbc6e5b commit 9e60028
Show file tree
Hide file tree
Showing 7 changed files with 274 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
* LoadBalance Interface
*
* <p> see {@link RandomLoadBalanceSelector}
* <p> see {@link WeightRandomLoadBalanceSelector}
* <p> see {@link WeightRoundRobinLoadBalanceSelector}
* <p> see {@link SourceIPHashLoadBalanceSelector}
*
* @param <T> Target type
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
public enum LoadBalanceType {
RANDOM(0, "random load balance strategy"),
WEIGHT_ROUND_ROBIN(1, "weight round robin load balance strategy"),
WEIGHT_RANDOM(2, "weight random load balance strategy");
WEIGHT_RANDOM(2, "weight random load balance strategy"),
SOURCE_IP_HASH(3, "source IP hash load balance strategy");

private final int code;
private final String desc;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

import org.apache.commons.lang3.StringUtils;

import java.util.Collections;
import java.util.List;


import lombok.extern.slf4j.Slf4j;

/**
* Source IP Hash LoadBalance: make the same client always accessing the same server.
*
* @param <T> Target type
*/
@Slf4j
public class SourceIPHashLoadBalanceSelector<T> implements LoadBalanceSelector<T> {

private final transient List<T> servers;

private String clientKey;

public SourceIPHashLoadBalanceSelector(List<T> servers, String clientKey) {
this.servers = servers;
this.clientKey = clientKey;
}

@Override
public T select() {
// Avoid servers being changed during select().
List<T> targets = Collections.unmodifiableList(servers);
if (StringUtils.isBlank(clientKey)) {
clientKey = "127.0.0.1";
log.warn("Blank client IP has been set default {}", clientKey);

Check warning on line 51 in eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelector.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/loadbalance/SourceIPHashLoadBalanceSelector.java#L50-L51

Added lines #L50 - L51 were not covered by tests
}
int hashCode = hash(clientKey);
int index = hashCode % targets.size();
return targets.get(index);
}

@Override
public LoadBalanceType getType() {
return LoadBalanceType.SOURCE_IP_HASH;
}

/**
* FNV hash algorithm that is suitable for hashing some similar strings, like IP.
* @return
*/
private int hash(String data) {
final int p = 16777619;
int hash = (int) 2166136261L;
for (int i = 0; i < data.length(); i++) {
hash = (hash ^ data.charAt(i)) * p;
}
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
if (hash < 0) {
hash = Math.abs(hash);
}
return hash;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.utils;

import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;

/**
* A string utils as supplement of org.apache.commons.lang3.StringUtils
*/
public class CommonStringUtils extends StringUtils {

Check warning on line 26 in eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/CommonStringUtils.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-common/src/main/java/org/apache/eventmesh/common/utils/CommonStringUtils.java#L26

Added line #L26 was not covered by tests

/**
* Compares given string to a CharSequences vararg of searchStrings,
* returning true if the string is equal to all of the searchStrings.
*
* CommonStringUtils.equalsAll("abc", "abc", "def") = false
* CommonStringUtils.equalsAll(null, "abc", "def") = false
* CommonStringUtils.equalsAll(null, (CharSequence[]) null) = true
* CommonStringUtils.equalsAll(null, null, null) = true
* CommonStringUtils.equalsAll("abc", "abc", "abc") = true
*
* @param string
* @param searchStrings
* @return
*/
public static boolean equalsAll(final CharSequence string, final CharSequence... searchStrings) {
if (ArrayUtils.isNotEmpty(searchStrings)) {
for (final CharSequence next : searchStrings) {
if (!equals(string, next)) {
return false;
}
}
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.loadbalance;

import org.apache.eventmesh.common.utils.CommonStringUtils;

import org.apache.commons.lang3.StringUtils;

import java.util.Arrays;
import java.util.List;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SourceIPHashLoadBalanceSelectorTest {

private SourceIPHashLoadBalanceSelector<String> loadBalanceSelector;

private List<String> servers;

private String client1Key;

private String client2Key;

@Before
public void init() {
servers = Arrays.asList(new String[]{
"192.168.1.10", "192.168.1.11",
"192.168.1.12", "192.168.1.13",
"192.168.1.14", "192.168.1.15"
});
client1Key = "192.168.1.1-1-tester1-TLSv1.2";
client2Key = "192.168.1.2-1-tester2-TLSv1.2";
loadBalanceSelector = new SourceIPHashLoadBalanceSelector<>(servers, client1Key);
}

@Test
public void testSelect() {
String target1 = loadBalanceSelector.select();
String target2 = loadBalanceSelector.select();
String target3 = loadBalanceSelector.select();
Assert.assertTrue(CommonStringUtils.equalsAll(target1, target2, target3));

loadBalanceSelector = new SourceIPHashLoadBalanceSelector<>(servers, client2Key);
String target4 = loadBalanceSelector.select();
Assert.assertFalse(StringUtils.equals(target1, target4));
}

@Test
public void testType() {
Assert.assertEquals(LoadBalanceType.SOURCE_IP_HASH, loadBalanceSelector.getType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.common.utils;

import org.junit.Assert;
import org.junit.Test;

public class CommonStringUtilsTest {

@Test
public void testEqualsAll() {
Assert.assertTrue(CommonStringUtils.equalsAll(null, null));
Assert.assertTrue(CommonStringUtils.equalsAll(null, null, null));
Assert.assertTrue(CommonStringUtils.equalsAll("", "", ""));
Assert.assertTrue(CommonStringUtils.equalsAll("abc", "abc", "abc"));
Assert.assertFalse(CommonStringUtils.equalsAll(null, "abc", "def"));
Assert.assertFalse(CommonStringUtils.equalsAll("abc", "def", "ghi"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
import org.apache.eventmesh.common.exception.EventMeshException;
import org.apache.eventmesh.common.loadbalance.LoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.RandomLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.SourceIPHashLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.Weight;
import org.apache.eventmesh.common.loadbalance.WeightRandomLoadBalanceSelector;
import org.apache.eventmesh.common.loadbalance.WeightRoundRobinLoadBalanceSelector;
import org.apache.eventmesh.common.utils.IPUtils;
import org.apache.eventmesh.common.utils.SystemUtils;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -55,6 +59,10 @@ public static LoadBalanceSelector<String> createEventMeshServerLoadBalanceSelect
eventMeshServerSelector = new WeightRoundRobinLoadBalanceSelector<>(buildWeightedClusterGroupFromConfig(
eventMeshHttpClientConfig));
break;
case SOURCE_IP_HASH:
eventMeshServerSelector = new SourceIPHashLoadBalanceSelector<>(
buildClusterGroupFromConfig(eventMeshHttpClientConfig), toClientKey(eventMeshHttpClientConfig));
break;

Check warning on line 65 in eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java#L63-L65

Added lines #L63 - L65 were not covered by tests
default:
// ignore
}
Expand All @@ -64,6 +72,27 @@ public static LoadBalanceSelector<String> createEventMeshServerLoadBalanceSelect
return eventMeshServerSelector;
}

/**
* get client unique key(format: IP-pid-userName-sslClientProtocol) from EventMeshHttpClientConfig.
* @param eventMeshHttpClientConfig
* @return
*/
private static String toClientKey(EventMeshHttpClientConfig eventMeshHttpClientConfig) {
String ip = eventMeshHttpClientConfig.getIp();

Check warning on line 81 in eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java#L81

Added line #L81 was not covered by tests
if (StringUtils.equals(ip, "localhost")) {
ip = IPUtils.getLocalAddress();

Check warning on line 83 in eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java#L83

Added line #L83 was not covered by tests
}
String pid = eventMeshHttpClientConfig.getPid();

Check warning on line 85 in eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java#L85

Added line #L85 was not covered by tests
if (StringUtils.isBlank(pid)) {
pid = SystemUtils.getProcessId();

Check warning on line 87 in eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java#L87

Added line #L87 was not covered by tests
}
return StringUtils.joinWith("-",

Check warning on line 89 in eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java#L89

Added line #L89 was not covered by tests
ip, pid,
eventMeshHttpClientConfig.getUserName(),
eventMeshHttpClientConfig.getSslClientProtocol()

Check warning on line 92 in eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java

View check run for this annotation

Codecov / codecov/patch

eventmesh-sdk-java/src/main/java/org/apache/eventmesh/client/http/util/HttpLoadBalanceUtils.java#L91-L92

Added lines #L91 - L92 were not covered by tests
);
}

private static List<Weight<String>> buildWeightedClusterGroupFromConfig(
final EventMeshHttpClientConfig eventMeshHttpClientConfig)
throws EventMeshException {
Expand Down

0 comments on commit 9e60028

Please sign in to comment.