Skip to content

Commit

Permalink
[#2356] Improvement: Fix the warning: unchecked call to put(E) as a m…
Browse files Browse the repository at this point in the history
…ember of the raw type RMRecordsReader.Queue (#2357)

### What changes were proposed in this pull request?
Fix the warning: unchecked call to put(E) as a member of the raw type RMRecordsReader.Queue

### Why are the changes needed?
Fix: #2356

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
current UT
<img width="1397" alt="image" src="https://github.com/user-attachments/assets/b80755db-d74e-42af-932b-72c955ed5086" />
  • Loading branch information
cchung100m authored Feb 13, 2025
1 parent 608471a commit 124a901
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public int compare(K o1, K o2) {
this.maxBufferPerPartition = Math.max(1, maxBuffer / partitionIds.size());
this.maxRecordsNumPerBuffer =
rssConf.get(RSS_CLIENT_REMOTE_MERGE_READER_MAX_RECORDS_PER_BUFFER);
this.results = new Queue(maxBufferPerPartition * maxRecordsNumPerBuffer * partitionIds.size());
this.results =
new Queue<>(maxBufferPerPartition * maxRecordsNumPerBuffer * partitionIds.size());
this.retryMax =
rssConf.getInteger(
RssClientConfig.RSS_CLIENT_RETRY_MAX,
Expand Down Expand Up @@ -400,7 +401,7 @@ class Queue<E> {
private volatile boolean producerDone = false;

Queue(int maxBufferPerPartition) {
this.queue = new LinkedBlockingQueue(maxBufferPerPartition);
this.queue = new LinkedBlockingQueue<>(maxBufferPerPartition);
}

public void setProducerDone(boolean producerDone) {
Expand Down Expand Up @@ -438,7 +439,7 @@ class RecordsFetcher extends Thread {
private long sleepTime;
private long blockId = 1; // Merged blockId counting from 1
private RecordBuffer recordBuffer;
private Queue nextQueue;
private Queue<RecordBuffer> nextQueue;
private List<ShuffleServerInfo> serverInfos;
private ShuffleServerClient client;
private int choose;
Expand Down Expand Up @@ -561,7 +562,7 @@ class RecordsCombiner extends Thread {
// distributed in different RecordBuffers. So we need a cachedBuffer used
// to record the buffer of the last combine.
private RecordBuffer cached;
private Queue nextQueue;
private Queue<RecordBuffer> nextQueue;

RecordsCombiner(int partitionId) {
this.partitionId = partitionId;
Expand Down

0 comments on commit 124a901

Please sign in to comment.