-
Notifications
You must be signed in to change notification settings - Fork 152
Added project reactor transport - reactive rate limiting (Tested) #425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
55 commits
Select commit
Hold shift + click to select a range
5433cba
Just a start/idea of what we have in mind for 4.0. All subject to cha…
ConnorLinfoot 75e7b01
Add HTTPQueryParams for some cleaner handling over an array
ConnorLinfoot db810c9
Add a README.md here and remove PlayerCountReply.java
ConnorLinfoot 2475435
Add ApacheHTTPClient to Example
ConnorLinfoot 79512a1
Merge branch 'master' into 4.0.0
ConnorLinfoot ffefdf6
checkReply return the same type
ConnorLinfoot 964e8b3
Merge branch 'master' into 4.0.0
ConnorLinfoot a9baafa
Add comment for issue 249
ConnorLinfoot 4665b53
Tweak doc
ConnorLinfoot 643ae52
Some more tweaks towards javadocs etc
ConnorLinfoot bc7d17a
Merge branch 'master' into 4.0.0
ConnorLinfoot c3e69f5
Merge branch 'master' into 4.0.0
ConnorLinfoot cda8ec7
Move SkyBlockBazaarReply to use longs instead of ints
ConnorLinfoot f3c8a6d
Fix Bazaar Example
ConnorLinfoot 9e6dfd3
Add a comment about the recent change to the name endpoint
ConnorLinfoot ef86bc8
Merge branch 'master' into 4.0.0
ConnorLinfoot 98d3f15
Make member static
ConnorLinfoot edc181b
Add REPLAY and SMP GameType
ConnorLinfoot 25eed71
Readme
ConnorLinfoot ab05344
Some tweaks, API Key is now passed and only handdled by the HTTP Client
ConnorLinfoot 71d8bf1
Move a lot of things
ConnorLinfoot d79e4c2
Readme changes
ConnorLinfoot 1439a17
Readme changes
ConnorLinfoot 8eef69d
Update README.md
ConnorLinfoot e4534c0
update license time
ConnorLinfoot 7f1fea8
Remove these, will make sure its all on the main docs
ConnorLinfoot 4cecf98
Example link
ConnorLinfoot 44dac46
tweaks
ConnorLinfoot b153827
Tweak
ConnorLinfoot 687a737
Link direct to README.md
ConnorLinfoot 16d3069
Tweak examples
ConnorLinfoot 3b60271
New exceptions and rename some stuff
ConnorLinfoot 8b03f3f
Add getSkyBlockProfiles method
ConnorLinfoot 903841b
Add ServerType along with LobbyType
ConnorLinfoot 816379c
Add a comment
ConnorLinfoot d040528
Back to Hypixel example
ConnorLinfoot 7f101c4
Add User Agent
ConnorLinfoot 3c1ef7a
Be consistent
ConnorLinfoot db06248
Include versions in Readme
ConnorLinfoot ecb87d9
Merge branch 'master' into 4.0.0
ConnorLinfoot 64ac624
Remove skyblock profiles cause another PR has it
ConnorLinfoot 0edbb73
Added reactive transport
GrizzlT 6984771
Style conventions update + readme
GrizzlT 7d191b2
updated main Readme
GrizzlT b0abf54
updated main readme
GrizzlT b842d14
bugfix
GrizzlT bbe53b3
Update documentation
GrizzlT 45cb42f
added limitRate
GrizzlT 8c1aeac
changed limitRate from 3 to 1
GrizzlT fe8791c
limitRate to delayElements
GrizzlT 9ed93ed
added subscription on scheduler
GrizzlT e752aca
changed delay Scheduler
GrizzlT efa342d
different blocking method + unit tested and fixed bugs and inconsiste…
GrizzlT d471048
Manual merge
ConnorLinfoot f4c562c
Change versions to 4.0
ConnorLinfoot File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
|
||
Hypixel Public API - Reactive Transport | ||
====== | ||
|
||
### Usage | ||
|
||
```xml | ||
<dependency> | ||
<groupId>net.hypixel</groupId> | ||
<artifactId>hypixel-api-transport-reactor</artifactId> | ||
<version>4.0</version> | ||
</dependency> | ||
``` | ||
|
||
Can also be included with Gradle. | ||
|
||
```gradle | ||
dependencies { | ||
implementation 'net.hypixel:hypixel-api-transport-reactor:4.0' | ||
} | ||
``` | ||
|
||
### Example code | ||
|
||
```java | ||
public class Main { | ||
public static void main(String[] args) { | ||
HypixelHttpClient client = new ReactorHttpClient(UUID.fromString("your-api-key-here")); | ||
HypixelAPI hypixelAPI = new HypixelAPI(client); | ||
hypixelAPI.getPlayerByName("Hypixel") | ||
.exceptionally(throwable -> { | ||
// Handle exceptions here | ||
throwable.printStackTrace(); | ||
return null; | ||
}) | ||
.thenAccept(System.out::println); | ||
} | ||
} | ||
``` | ||
|
||
### Dependencies | ||
|
||
This transport depends on the following: | ||
|
||
* [Google Gson library - 2.8.6](https://mvnrepository.com/artifact/com.google.code.gson/gson) (for hypixel-api-core) | ||
* [Reactor Core 3.4.5](https://mvnrepository.com/artifact/io.projectreactor/reactor-core) (for reactor netty) | ||
* Reactor Netty [(project-reactor)](https://projectreactor.io/docs): | ||
* [Netty Core 1.0.6](https://mvnrepository.com/artifact/io.projectreactor.netty/reactor-netty-core) | ||
* [Netty Http 1.0.6](https://mvnrepository.com/artifact/io.projectreactor.netty/reactor-netty-http) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>hypixel-api</artifactId> | ||
<groupId>net.hypixel</groupId> | ||
<version>4.0-SNAPSHOT</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>hypixel-api-transport-reactor</artifactId> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-compiler-plugin</artifactId> | ||
<configuration> | ||
<source>1.8</source> | ||
<target>1.8</target> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
<dependencyManagement> | ||
<dependencies> | ||
<dependency> | ||
<groupId>io.projectreactor</groupId> | ||
<artifactId>reactor-bom</artifactId> | ||
<version>2020.0.6</version> | ||
<type>pom</type> | ||
<scope>import</scope> | ||
</dependency> | ||
</dependencies> | ||
</dependencyManagement> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>net.hypixel</groupId> | ||
<artifactId>hypixel-api-core</artifactId> | ||
<version>4.0-SNAPSHOT</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.projectreactor.netty</groupId> | ||
<artifactId>reactor-netty-core</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.projectreactor.netty</groupId> | ||
<artifactId>reactor-netty-http</artifactId> | ||
</dependency> | ||
</dependencies> | ||
|
||
</project> |
262 changes: 262 additions & 0 deletions
262
hypixel-api-transport-reactor/src/main/java/net/hypixel/api/reactor/ReactorHttpClient.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,262 @@ | ||
package net.hypixel.api.reactor; | ||
|
||
import io.netty.handler.codec.http.HttpResponseStatus; | ||
import net.hypixel.api.http.HypixelHttpClient; | ||
import net.hypixel.api.http.HypixelHttpResponse; | ||
import reactor.core.Disposable; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import reactor.core.publisher.MonoSink; | ||
import reactor.core.scheduler.Schedulers; | ||
import reactor.netty.http.client.HttpClient; | ||
import reactor.netty.http.client.HttpClientResponse; | ||
import reactor.util.function.Tuple2; | ||
|
||
import java.time.Duration; | ||
import java.util.UUID; | ||
import java.util.concurrent.ArrayBlockingQueue; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
|
||
public class ReactorHttpClient implements HypixelHttpClient { | ||
private final HttpClient httpClient; | ||
private final UUID apiKey; | ||
|
||
// Marker to reset the request counter and release waiting threads | ||
private final AtomicBoolean firstRequestReturned = new AtomicBoolean(false); | ||
// Marker to only schedule a reset clock once on error 429 | ||
private final AtomicBoolean overflowStartedNewClock = new AtomicBoolean(false); | ||
|
||
// Callbacks that will trigger their corresponding requests | ||
private final ArrayBlockingQueue<RequestCallback> blockingQueue; | ||
|
||
// For shutting down the flux that emits request callbacks | ||
private final Disposable requestCallbackFluxDisposable; | ||
|
||
private final Object lock = new Object(); | ||
|
||
/* | ||
* How many requests we can send before reaching the limit | ||
* Starts as 1 so the first request returns and resets this value before allowing other requests to be sent. | ||
*/ | ||
private int actionsLeftThisMinute = 1; | ||
|
||
/** | ||
* Constructs a new instance of this client using the specified API key. | ||
* | ||
* @param apiKey the key associated with this connection | ||
* @param minDelayBetweenRequests minimum time between sending requests (in ms) default is 8 | ||
* @param bufferCapacity fixed size of blockingQueue | ||
*/ | ||
public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests, int bufferCapacity) { | ||
this.apiKey = apiKey; | ||
this.httpClient = HttpClient.create().secure(); | ||
this.blockingQueue = new ArrayBlockingQueue<>(bufferCapacity); | ||
|
||
this.requestCallbackFluxDisposable = Flux.<RequestCallback>generate((synchronousSink) -> { | ||
try { | ||
RequestCallback callback = blockingQueue.take(); | ||
// prune skipped/completed requests to avoid counting them | ||
while (callback.isCanceled()) { | ||
callback = blockingQueue.take(); | ||
} | ||
|
||
synchronized (lock) { | ||
while (this.actionsLeftThisMinute <= 0) { | ||
lock.wait(); | ||
} | ||
|
||
actionsLeftThisMinute--; | ||
} | ||
synchronousSink.next(callback); | ||
} catch (InterruptedException e) { | ||
throw new AssertionError("This should not have been possible", e); | ||
} | ||
}).subscribeOn(Schedulers.boundedElastic()).delayElements(Duration.ofMillis(minDelayBetweenRequests), Schedulers.boundedElastic()).subscribe(RequestCallback::sendRequest); | ||
} | ||
|
||
public ReactorHttpClient(UUID apiKey, long minDelayBetweenRequests) | ||
{ | ||
this(apiKey, minDelayBetweenRequests, 500); | ||
} | ||
|
||
public ReactorHttpClient(UUID apiKey, int bufferCapacity) | ||
{ | ||
this(apiKey, 8, bufferCapacity); | ||
} | ||
|
||
public ReactorHttpClient(UUID apiKey) | ||
{ | ||
this(apiKey, 8, 500); | ||
} | ||
|
||
/** | ||
* Canceling the returned future will result in canceling the request if possible | ||
*/ | ||
@Override | ||
public CompletableFuture<HypixelHttpResponse> makeRequest(String url) { | ||
return toHypixelResponseFuture(makeRequest(url, false)); | ||
} | ||
|
||
/** | ||
* Canceling the returned future will result in canceling the request if possible | ||
*/ | ||
@Override | ||
public CompletableFuture<HypixelHttpResponse> makeAuthenticatedRequest(String url) { | ||
return toHypixelResponseFuture(makeRequest(url, true)); | ||
} | ||
|
||
private static CompletableFuture<HypixelHttpResponse> toHypixelResponseFuture(Mono<Tuple2<String, Integer>> result) { | ||
return result.map(tuple -> new HypixelHttpResponse(tuple.getT2(), tuple.getT1())) | ||
.toFuture(); | ||
} | ||
|
||
@Override | ||
public void shutdown() { | ||
this.requestCallbackFluxDisposable.dispose(); | ||
} | ||
|
||
/** | ||
* Makes a request to the Hypixel api and returns a {@link Mono<Tuple2<String, Integer>>} containing | ||
* the response body and status code, canceling this mono will prevent the request from being sent if possible | ||
* @param path full url | ||
* @param isAuthenticated whether to enable authentication or not | ||
*/ | ||
public Mono<Tuple2<String, Integer>> makeRequest(String path, boolean isAuthenticated) { | ||
return Mono.<Tuple2<String, Integer>>create(sink -> { | ||
RequestCallback callback = new RequestCallback(path, sink, isAuthenticated, this); | ||
|
||
try { | ||
this.blockingQueue.put(callback); | ||
} catch (InterruptedException e) { | ||
sink.error(e); | ||
throw new AssertionError("Queue insertion interrupted. This should not have been possible", e); | ||
} | ||
}).subscribeOn(Schedulers.boundedElastic()); | ||
} | ||
|
||
/** | ||
* Reads response status and retries error 429 (too many requests) | ||
* The first request after every limit reset will be used to schedule the next limit reset | ||
* | ||
* @param response the {@link HttpClientResponse} from our request | ||
* @param requestCallback the callback controlling our request | ||
* @return whether to return the request body or wait for a retry | ||
*/ | ||
private ResponseHandlingResult handleResponse(HttpClientResponse response, RequestCallback requestCallback) throws InterruptedException { | ||
if (response.status() == HttpResponseStatus.TOO_MANY_REQUESTS) { | ||
int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10)); | ||
|
||
if (this.overflowStartedNewClock.compareAndSet(false, true)) { | ||
synchronized (lock) { | ||
this.actionsLeftThisMinute = 0; | ||
} | ||
resetForFirstRequest(timeRemaining); | ||
} | ||
|
||
// execute this last to prevent a possible exception from messing up our clock synchronization | ||
this.blockingQueue.put(requestCallback); | ||
return new ResponseHandlingResult(false, response.status().code()); | ||
} | ||
|
||
if (this.firstRequestReturned.compareAndSet(false, true)) { | ||
int timeRemaining = Math.max(1, response.responseHeaders().getInt("ratelimit-reset", 10)); | ||
int requestsRemaining = response.responseHeaders().getInt("ratelimit-remaining", 110); | ||
|
||
synchronized (lock) { | ||
this.actionsLeftThisMinute = requestsRemaining; | ||
lock.notifyAll(); | ||
} | ||
|
||
resetForFirstRequest(timeRemaining); | ||
} | ||
return new ResponseHandlingResult(true, response.status().code()); | ||
} | ||
|
||
/** | ||
* Wakes up all waiting threads in the specified amount of seconds | ||
* (Adds two seconds to account for sync errors in the server). | ||
* | ||
* @param timeRemaining how much time is left until the next reset | ||
*/ | ||
private void resetForFirstRequest(int timeRemaining) { | ||
Schedulers.parallel().schedule(() -> { | ||
this.firstRequestReturned.set(false); | ||
this.overflowStartedNewClock.set(false); | ||
synchronized (lock) { | ||
this.actionsLeftThisMinute = 1; | ||
lock.notifyAll(); | ||
} | ||
}, timeRemaining + 2, TimeUnit.SECONDS); | ||
} | ||
|
||
/** | ||
* Controls a request | ||
*/ | ||
private static class RequestCallback { | ||
private final String url; | ||
private final MonoSink<Tuple2<String, Integer>> monoSink; | ||
private final ReactorHttpClient requestRateLimiter; | ||
private final boolean isAuthenticated; | ||
private boolean isCanceled = false; | ||
|
||
private RequestCallback(String url, MonoSink<Tuple2<String, Integer>> monoSink, boolean isAuthenticated, ReactorHttpClient requestRateLimiter) { | ||
this.url = url; | ||
this.monoSink = monoSink; | ||
this.requestRateLimiter = requestRateLimiter; | ||
this.isAuthenticated = isAuthenticated; | ||
|
||
this.monoSink.onCancel(() -> { | ||
synchronized (this) { | ||
this.isCanceled = true; | ||
} | ||
}); | ||
} | ||
|
||
public boolean isCanceled() { | ||
return this.isCanceled; | ||
} | ||
|
||
private void sendRequest() { | ||
synchronized (this) { | ||
if (isCanceled) { | ||
synchronized (this.requestRateLimiter.lock) { | ||
this.requestRateLimiter.actionsLeftThisMinute++; | ||
this.requestRateLimiter.lock.notifyAll(); | ||
} | ||
return; | ||
} | ||
} | ||
|
||
(this.isAuthenticated ? requestRateLimiter.httpClient.headers(headers -> headers.add("API-Key", requestRateLimiter.apiKey.toString())) : requestRateLimiter.httpClient).get() | ||
.uri(url) | ||
.responseSingle((response, body) -> { | ||
try { | ||
ResponseHandlingResult result = requestRateLimiter.handleResponse(response, this); | ||
|
||
if (result.allowToPass) { | ||
return body.asString().zipWith(Mono.just(result.statusCode)); | ||
} | ||
return Mono.empty(); | ||
} catch (InterruptedException e) { | ||
monoSink.error(e); | ||
throw new AssertionError("ERROR: Queue insertion got interrupted, serious problem! (this should not happen!!)", e); | ||
} | ||
}).subscribe(this.monoSink::success); | ||
} | ||
} | ||
|
||
/** | ||
* Data object | ||
*/ | ||
private static class ResponseHandlingResult { | ||
public final boolean allowToPass; | ||
public final int statusCode; | ||
|
||
public ResponseHandlingResult(boolean allowToPass, int statusCode) { | ||
this.allowToPass = allowToPass; | ||
this.statusCode = statusCode; | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.