Skip to content

Added project reactor transport - reactive rate limiting (Tested) #425

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 55 commits into from
Jul 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
5433cba
Just a start/idea of what we have in mind for 4.0. All subject to cha…
ConnorLinfoot Feb 28, 2021
75e7b01
Add HTTPQueryParams for some cleaner handling over an array
ConnorLinfoot Feb 28, 2021
db810c9
Add a README.md here and remove PlayerCountReply.java
ConnorLinfoot Feb 28, 2021
2475435
Add ApacheHTTPClient to Example
ConnorLinfoot Feb 28, 2021
79512a1
Merge branch 'master' into 4.0.0
ConnorLinfoot Feb 28, 2021
ffefdf6
checkReply return the same type
ConnorLinfoot Feb 28, 2021
964e8b3
Merge branch 'master' into 4.0.0
ConnorLinfoot Feb 28, 2021
a9baafa
Add comment for issue 249
ConnorLinfoot Feb 28, 2021
4665b53
Tweak doc
ConnorLinfoot Feb 28, 2021
643ae52
Some more tweaks towards javadocs etc
ConnorLinfoot Feb 28, 2021
bc7d17a
Merge branch 'master' into 4.0.0
ConnorLinfoot Feb 28, 2021
c3e69f5
Merge branch 'master' into 4.0.0
ConnorLinfoot Feb 28, 2021
cda8ec7
Move SkyBlockBazaarReply to use longs instead of ints
ConnorLinfoot Feb 28, 2021
f3c8a6d
Fix Bazaar Example
ConnorLinfoot Mar 14, 2021
9e6dfd3
Add a comment about the recent change to the name endpoint
ConnorLinfoot Mar 14, 2021
ef86bc8
Merge branch 'master' into 4.0.0
ConnorLinfoot May 9, 2021
98d3f15
Make member static
ConnorLinfoot May 9, 2021
edc181b
Add REPLAY and SMP GameType
ConnorLinfoot May 9, 2021
25eed71
Readme
ConnorLinfoot May 9, 2021
ab05344
Some tweaks, API Key is now passed and only handdled by the HTTP Client
ConnorLinfoot May 9, 2021
71d8bf1
Move a lot of things
ConnorLinfoot May 9, 2021
d79e4c2
Readme changes
ConnorLinfoot May 9, 2021
1439a17
Readme changes
ConnorLinfoot May 9, 2021
8eef69d
Update README.md
ConnorLinfoot May 9, 2021
e4534c0
update license time
ConnorLinfoot May 9, 2021
7f1fea8
Remove these, will make sure its all on the main docs
ConnorLinfoot May 9, 2021
4cecf98
Example link
ConnorLinfoot May 9, 2021
44dac46
tweaks
ConnorLinfoot May 9, 2021
b153827
Tweak
ConnorLinfoot May 9, 2021
687a737
Link direct to README.md
ConnorLinfoot May 9, 2021
16d3069
Tweak examples
ConnorLinfoot May 9, 2021
3b60271
New exceptions and rename some stuff
ConnorLinfoot May 9, 2021
8b03f3f
Add getSkyBlockProfiles method
ConnorLinfoot May 9, 2021
903841b
Add ServerType along with LobbyType
ConnorLinfoot May 9, 2021
816379c
Add a comment
ConnorLinfoot May 9, 2021
d040528
Back to Hypixel example
ConnorLinfoot May 9, 2021
7f101c4
Add User Agent
ConnorLinfoot May 9, 2021
3c1ef7a
Be consistent
ConnorLinfoot May 10, 2021
db06248
Include versions in Readme
ConnorLinfoot May 10, 2021
ecb87d9
Merge branch 'master' into 4.0.0
ConnorLinfoot May 10, 2021
64ac624
Remove skyblock profiles cause another PR has it
ConnorLinfoot May 10, 2021
0edbb73
Added reactive transport
GrizzlT May 11, 2021
6984771
Style conventions update + readme
GrizzlT May 11, 2021
7d191b2
updated main Readme
GrizzlT May 11, 2021
b0abf54
updated main readme
GrizzlT May 11, 2021
b842d14
bugfix
GrizzlT May 11, 2021
bbe53b3
Update documentation
GrizzlT May 11, 2021
45cb42f
added limitRate
GrizzlT May 11, 2021
8c1aeac
changed limitRate from 3 to 1
GrizzlT May 11, 2021
fe8791c
limitRate to delayElements
GrizzlT May 11, 2021
9ed93ed
added subscription on scheduler
GrizzlT May 11, 2021
e752aca
changed delay Scheduler
GrizzlT May 11, 2021
efa342d
different blocking method + unit tested and fixed bugs and inconsiste…
GrizzlT May 13, 2021
d471048
Manual merge
ConnorLinfoot Jul 26, 2021
f4c562c
Change versions to 4.0
ConnorLinfoot Jul 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ repositories {

#### Transports

We include two built in options communicating with the Hypixel API, you can include either of these or even include the
We include three built-in options for communicating with the Hypixel API, you can include either of these or even include the
core API directly and create your own instance of HypixelHTTPClient.

* [Apache HttpClient Transport](hypixel-api-transport-apache/README.md)
* [Unirest Java Transport](hypixel-api-transport-unirest/README.md)
* [Project Reactor Transport](hypixel-api-transport-reactor/README.md) (automatic rate-limiting by default)

### Dependencies

Expand Down
49 changes: 49 additions & 0 deletions hypixel-api-transport-reactor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

Hypixel Public API - Reactive Transport
======

### Usage

```xml
<dependency>
<groupId>net.hypixel</groupId>
<artifactId>hypixel-api-transport-reactor</artifactId>
<version>4.0</version>
</dependency>
```

Can also be included with Gradle.

```gradle
dependencies {
implementation 'net.hypixel:hypixel-api-transport-reactor:4.0'
}
```

### Example code

```java
public class Main {
public static void main(String[] args) {
HypixelHttpClient client = new ReactorHttpClient(UUID.fromString("your-api-key-here"));
HypixelAPI hypixelAPI = new HypixelAPI(client);
hypixelAPI.getPlayerByName("Hypixel")
.exceptionally(throwable -> {
// Handle exceptions here
throwable.printStackTrace();
return null;
})
.thenAccept(System.out::println);
}
}
```

### Dependencies

This transport depends on the following:

* [Google Gson library - 2.8.6](https://mvnrepository.com/artifact/com.google.code.gson/gson) (for hypixel-api-core)
* [Reactor Core 3.4.5](https://mvnrepository.com/artifact/io.projectreactor/reactor-core) (for reactor netty)
* Reactor Netty [(project-reactor)](https://projectreactor.io/docs):
* [Netty Core 1.0.6](https://mvnrepository.com/artifact/io.projectreactor.netty/reactor-netty-core)
* [Netty Http 1.0.6](https://mvnrepository.com/artifact/io.projectreactor.netty/reactor-netty-http)
55 changes: 55 additions & 0 deletions hypixel-api-transport-reactor/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hypixel-api</artifactId>
<groupId>net.hypixel</groupId>
<version>4.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hypixel-api-transport-reactor</artifactId>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>2020.0.6</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>net.hypixel</groupId>
<artifactId>hypixel-api-core</artifactId>
<version>4.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-core</artifactId>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty-http</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package net.hypixel.api.reactor;

import io.netty.handler.codec.http.HttpResponseStatus;
import net.hypixel.api.http.HypixelHttpClient;
import net.hypixel.api.http.HypixelHttpResponse;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientResponse;
import reactor.util.function.Tuple2;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReactorHttpClient implements HypixelHttpClient {
private final HttpClient httpClient;
private final UUID apiKey;

// Marker to reset the request counter and release waiting threads
private final AtomicBoolean firstRequestReturned = new AtomicBoolean(false);
// Marker to only schedule a reset clock once on error 429
private final AtomicBoolean overflowStartedNewClock = new AtomicBoolean(false);

// Callbacks that will trigger their corresponding requests
private final ArrayBlockingQueue<RequestCallback> blockingQueue;

// For shutting down the flux that emits request callbacks
private final Disposable requestCallbackFluxDisposable;

private final Object lock = new Object();

/*
* How many requests we can send before reaching the limit
* Starts as 1 so the first request returns and resets this value before allowing other requests to be sent.
*/
private int actionsLeftThisMinute = 1;

/**
* Constructs a new instance of this client using the specified API key.
*
* @param apiKey the key associated with this connection
* @param minDelayBetweenRequests minimum time between sending requests (in ms) default is 8
* @param bufferCapacity fixed size of blockingQueue
*/
public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests, int bufferCapacity) {
this.apiKey = apiKey;
this.httpClient = HttpClient.create().secure();
this.blockingQueue = new ArrayBlockingQueue<>(bufferCapacity);

this.requestCallbackFluxDisposable = Flux.<RequestCallback>generate((synchronousSink) -> {
try {
RequestCallback callback = blockingQueue.take();
// prune skipped/completed requests to avoid counting them
while (callback.isCanceled()) {
callback = blockingQueue.take();
}

synchronized (lock) {
while (this.actionsLeftThisMinute <= 0) {
lock.wait();
}

actionsLeftThisMinute--;
}
synchronousSink.next(callback);
} catch (InterruptedException e) {
throw new AssertionError("This should not have been possible", e);
}
}).subscribeOn(Schedulers.boundedElastic()).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest);
}

public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests)
{
this(apiKey, minDelayBetweenRequests, 500);
}

public ReactorHttpClient(UUID apiKey, int bufferCapacity)
{
this(apiKey, 8, bufferCapacity);
}

public ReactorHttpClient(UUID apiKey)
{
this(apiKey, 8, 500);
}

/**
* Canceling the returned future will result in canceling the request if possible
*/
@Override
public CompletableFuture<HypixelHttpResponse> makeRequest(String url) {
return toHypixelResponseFuture(makeRequest(url, false));
}

/**
* Canceling the returned future will result in canceling the request if possible
*/
@Override
public CompletableFuture<HypixelHttpResponse> makeAuthenticatedRequest(String url) {
return toHypixelResponseFuture(makeRequest(url, true));
}

private static CompletableFuture<HypixelHttpResponse> toHypixelResponseFuture(Mono<Tuple2<String, Integer>> result) {
return result.map(tuple -> new HypixelHttpResponse(tuple.getT2(), tuple.getT1()))
.toFuture();
}

@Override
public void shutdown() {
this.requestCallbackFluxDisposable.dispose();
}

/**
* Makes a request to the Hypixel api and returns a {@link Mono<Tuple2<String, Integer>>} containing
* the response body and status code, canceling this mono will prevent the request from being sent if possible
* @param path full url
* @param isAuthenticated whether to enable authentication or not
*/
public Mono<Tuple2<String, Integer>> makeRequest(String path, boolean isAuthenticated) {
return Mono.<Tuple2<String, Integer>>create(sink -> {
RequestCallback callback = new RequestCallback(path, sink, isAuthenticated, this);

try {
this.blockingQueue.put(callback);
} catch (InterruptedException e) {
sink.error(e);
throw new AssertionError("Queue insertion interrupted. This should not have been possible", e);
}
}).subscribeOn(Schedulers.boundedElastic());
}

/**
* Reads response status and retries error 429 (too many requests)
* The first request after every limit reset will be used to schedule the next limit reset
*
* @param response the {@link HttpClientResponse} from our request
* @param requestCallback the callback controlling our request
* @return whether to return the request body or wait for a retry
*/
private ResponseHandlingResult handleResponse(HttpClientResponse response, RequestCallback requestCallback) throws InterruptedException {
if (response.status() == HttpResponseStatus.TOO_MANY_REQUESTS) {
int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10));

if (this.overflowStartedNewClock.compareAndSet(false, true)) {
synchronized (lock) {
this.actionsLeftThisMinute = 0;
}
resetForFirstRequest(timeRemaining);
}

// execute this last to prevent a possible exception from messing up our clock synchronization
this.blockingQueue.put(requestCallback);
return new ResponseHandlingResult(false, response.status().code());
}

if (this.firstRequestReturned.compareAndSet(false, true)) {
int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10));
int requestsRemaining = response.responseHeaders().getInt("ratelimit-remaining", 110);

synchronized (lock) {
this.actionsLeftThisMinute = requestsRemaining;
lock.notifyAll();
}

resetForFirstRequest(timeRemaining);
}
return new ResponseHandlingResult(true, response.status().code());
}

/**
* Wakes up all waiting threads in the specified amount of seconds
* (Adds two seconds to account for sync errors in the server).
*
* @param timeRemaining how much time is left until the next reset
*/
private void resetForFirstRequest(int timeRemaining) {
Schedulers.parallel().schedule(() -> {
this.firstRequestReturned.set(false);
this.overflowStartedNewClock.set(false);
synchronized (lock) {
this.actionsLeftThisMinute = 1;
lock.notifyAll();
}
}, timeRemaining + 2, TimeUnit.SECONDS);
}

/**
* Controls a request
*/
private static class RequestCallback {
private final String url;
private final MonoSink<Tuple2<String, Integer>> monoSink;
private final ReactorHttpClient requestRateLimiter;
private final boolean isAuthenticated;
private boolean isCanceled = false;

private RequestCallback(String url, MonoSink<Tuple2<String, Integer>> monoSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) {
this.url = url;
this.monoSink = monoSink;
this.requestRateLimiter = requestRateLimiter;
this.isAuthenticated = isAuthenticated;

this.monoSink.onCancel(() -> {
synchronized (this) {
this.isCanceled = true;
}
});
}

public boolean isCanceled() {
return this.isCanceled;
}

private void sendRequest() {
synchronized (this) {
if (isCanceled) {
synchronized (this.requestRateLimiter.lock) {
this.requestRateLimiter.actionsLeftThisMinute++;
this.requestRateLimiter.lock.notifyAll();
}
return;
}
}

(this.isAuthenticated ? requestRateLimiter.httpClient.headers(headers -> headers.add("API-Key", requestRateLimiter.apiKey.toString())) : requestRateLimiter.httpClient).get()
.uri(url)
.responseSingle((response, body) -> {
try {
ResponseHandlingResult result = requestRateLimiter.handleResponse(response, this);

if (result.allowToPass) {
return body.asString().zipWith(Mono.just(result.statusCode));
}
return Mono.empty();
} catch (InterruptedException e) {
monoSink.error(e);
throw new AssertionError("ERROR: Queue insertion got interrupted, serious problem! (this should not happen!!)", e);
}
}).subscribe(this.monoSink::success);
}
}

/**
* Data object
*/
private static class ResponseHandlingResult {
public final boolean allowToPass;
public final int statusCode;

public ResponseHandlingResult(boolean allowToPass, int statusCode) {
this.allowToPass = allowToPass;
this.statusCode = statusCode;
}
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<module>hypixel-api-example</module>
<module>hypixel-api-transport-apache</module>
<module>hypixel-api-transport-unirest</module>
<module>hypixel-api-transport-reactor</module>
</modules>

<distributionManagement>
Expand Down