Skip to content

Commit 3d315a2

Browse files
committed
commented out all pinning code
1 parent d0051c5 commit 3d315a2

File tree

9 files changed

+179
-181
lines changed

9 files changed

+179
-181
lines changed

src/main/java/io/nats/client/PullRequestOptions.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ public String toJson() {
6060
JsonUtils.addFieldAsNanos(sb, IDLE_HEARTBEAT, idleHeartbeat);
6161
JsonUtils.addField(sb, GROUP, group);
6262
JsonUtils.addFieldWhenGtZero(sb, PRIORITY, priority);
63-
JsonUtils.addField(sb, ID, getPinId());
63+
// JsonUtils.addField(sb, ID, getPinId());
6464
JsonUtils.addField(sb, MIN_PENDING, minPending);
6565
JsonUtils.addField(sb, MIN_ACK_PENDING, minAckPending);
6666
return JsonUtils.endJson(sb).toString();
6767
}
6868

69-
protected String getPinId() {
70-
return null;
71-
}
69+
// protected String getPinId() {
70+
// return null;
71+
// }
7272

7373
/**
7474
* Get the batch size option value

src/main/java/io/nats/client/api/PriorityPolicy.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
public enum PriorityPolicy {
2525
None("none"),
2626
Overflow("overflow"),
27-
PinnedClient("pinned_client"),
2827
Prioritized("prioritized");
2928

29+
// PinnedClient("pinned_client")
30+
3031
private final String policy;
3132

3233
PriorityPolicy(String p) {

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@ protected void trackJsMessage(Message msg) {
9797
NatsJetStreamMetaData meta = msg.metaData();
9898
lastStreamSeq = meta.streamSequence();
9999
lastConsumerSeq++;
100-
subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock
100+
// subTrackJsMessage(msg); // for subclasses so they don't have to acquire the lock
101101
}
102102
finally {
103103
stateChangeLock.unlock();
104104
}
105105
}
106106

107-
protected void subTrackJsMessage(Message msg) {}
107+
// protected void subTrackJsMessage(Message msg) {}
108108

109109
protected void handleHeartbeatError() {
110110
conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));

src/main/java/io/nats/client/impl/NatsConsumerContext.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import io.nats.client.api.ConsumerConfiguration;
1818
import io.nats.client.api.ConsumerInfo;
1919
import io.nats.client.api.OrderedConsumerConfiguration;
20-
import io.nats.client.api.PriorityPolicy;
2120
import io.nats.client.support.Validator;
2221
import org.jspecify.annotations.NonNull;
2322
import org.jspecify.annotations.Nullable;
@@ -145,14 +144,14 @@ private void checkState() throws IOException {
145144
}
146145
}
147146

148-
private void checkNotPinned(String label) throws IOException {
149-
ConsumerInfo ci = cachedConsumerInfo.get();
150-
if (ci != null) {
151-
if (ci.getConsumerConfiguration().getPriorityPolicy() == PriorityPolicy.PinnedClient) {
152-
throw new IOException("Pinned not allowed with " + label);
153-
}
154-
}
155-
}
147+
// private void checkNotPinned(String label) throws IOException {
148+
// ConsumerInfo ci = cachedConsumerInfo.get();
149+
// if (ci != null) {
150+
// if (ci.getConsumerConfiguration().getPriorityPolicy() == PriorityPolicy.PinnedClient) {
151+
// throw new IOException("Pinned not allowed with " + label);
152+
// }
153+
// }
154+
// }
156155

157156
private NatsMessageConsumerBase trackConsume(NatsMessageConsumerBase con) {
158157
lastConsumer.set(con);
@@ -222,7 +221,7 @@ public Message next(long maxWaitMillis) throws IOException, InterruptedException
222221
try {
223222
stateLock.lock();
224223
checkState();
225-
checkNotPinned("Next");
224+
// checkNotPinned("Next");
226225

227226
try {
228227
long inactiveThreshold = maxWaitMillis * 110 / 100; // 10% longer than the wait
@@ -291,7 +290,7 @@ public FetchConsumer fetch(@NonNull FetchConsumeOptions fetchConsumeOptions) thr
291290
try {
292291
stateLock.lock();
293292
checkState();
294-
checkNotPinned("Fetch");
293+
// checkNotPinned("Fetch");
295294
return (FetchConsumer)trackConsume(new NatsFetchConsumer(this, cachedConsumerInfo.get(), fetchConsumeOptions));
296295
}
297296
finally {

src/main/java/io/nats/client/impl/NatsFetchConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,17 @@ class NatsFetchConsumer extends NatsMessageConsumerBase implements FetchConsumer
4848
inactiveThreshold = expiresInMillis * 110 / 100; // 10% longer than the wait
4949
}
5050

51-
PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
52-
PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
51+
// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
52+
PullRequestOptions pro = PullRequestOptions.builder(fetchConsumeOptions.getMaxMessages())
5353
.maxBytes(fetchConsumeOptions.getMaxBytes())
5454
.expiresIn(expiresInMillis)
5555
.idleHeartbeat(fetchConsumeOptions.getIdleHeartbeat())
5656
.noWait(isNoWait)
5757
.group(fetchConsumeOptions.getGroup())
5858
.priority(fetchConsumeOptions.getPriority())
5959
.minPending(fetchConsumeOptions.getMinPending())
60-
.minAckPending(fetchConsumeOptions.getMinAckPending()));
60+
.minAckPending(fetchConsumeOptions.getMinAckPending())
61+
.build();
6162
initSub(subscriptionMaker.subscribe(null, null, null, inactiveThreshold), false);
6263
pullSubject = sub._pull(pro, fetchConsumeOptions.raiseStatusWarnings(), this);
6364
startNanos = -1;

src/main/java/io/nats/client/impl/NatsMessageConsumer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,16 @@ else if (pmm.pendingMessages <= thresholdMessages || (pmm.trackingBytes && pmm.p
104104
private void repull() {
105105
int rePullMessages = Math.max(1, consumeOpts.getBatchSize() - pmm.pendingMessages);
106106
long rePullBytes = consumeOpts.getBatchBytes() == 0 ? 0 : consumeOpts.getBatchBytes() - pmm.pendingBytes;
107-
PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
108-
PullRequestOptions.builder(rePullMessages)
107+
// PinnablePullRequestOptions pro = new PinnablePullRequestOptions(pmm.currentPinId,
108+
PullRequestOptions pro = PullRequestOptions.builder(rePullMessages)
109109
.maxBytes(rePullBytes)
110110
.expiresIn(consumeOpts.getExpiresInMillis())
111111
.idleHeartbeat(consumeOpts.getIdleHeartbeat())
112112
.group(consumeOpts.getGroup())
113113
.priority(consumeOpts.getPriority())
114114
.minPending(consumeOpts.getMinPending())
115-
.minAckPending(consumeOpts.getMinAckPending()));
115+
.minAckPending(consumeOpts.getMinAckPending())
116+
.build();
116117
sub._pull(pro, consumeOpts.raiseStatusWarnings(), this);
117118
}
118119
}

src/main/java/io/nats/client/impl/NatsMessageConsumerBase.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import io.nats.client.JetStreamApiException;
1717
import io.nats.client.MessageConsumer;
18-
import io.nats.client.PullRequestOptions;
1918
import io.nats.client.api.ConsumerInfo;
2019

2120
import java.io.IOException;
@@ -140,17 +139,17 @@ protected void shutdownSub() {
140139
}
141140
}
142141

143-
static class PinnablePullRequestOptions extends PullRequestOptions {
144-
final String pinId;
145-
146-
public PinnablePullRequestOptions(String pinId, Builder b) {
147-
super(b);
148-
this.pinId = pinId;
149-
}
150-
151-
@Override
152-
protected String getPinId() {
153-
return pinId;
154-
}
155-
}
142+
// static class PinnablePullRequestOptions extends PullRequestOptions {
143+
// final String pinId;
144+
//
145+
// public PinnablePullRequestOptions(String pinId, Builder b) {
146+
// super(b);
147+
// this.pinId = pinId;
148+
// }
149+
//
150+
// @Override
151+
// protected String getPinId() {
152+
// return pinId;
153+
// }
154+
// }
156155
}

src/main/java/io/nats/client/impl/PullMessageManager.java

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
import io.nats.client.support.Status;
2020

2121
import static io.nats.client.impl.MessageManager.ManageResult.*;
22-
import static io.nats.client.support.NatsJetStreamConstants.*;
22+
import static io.nats.client.support.NatsJetStreamConstants.NATS_PENDING_BYTES;
23+
import static io.nats.client.support.NatsJetStreamConstants.NATS_PENDING_MESSAGES;
2324
import static io.nats.client.support.Status.*;
2425

2526
class PullMessageManager extends MessageManager {
@@ -29,7 +30,7 @@ class PullMessageManager extends MessageManager {
2930
protected boolean trackingBytes;
3031
protected boolean raiseStatusWarnings;
3132
protected PullManagerObserver pullManagerObserver;
32-
protected String currentPinId;
33+
// protected String currentPinId;
3334

3435
protected PullMessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
3536
super(conn, so, syncMode);
@@ -150,12 +151,12 @@ protected ManageResult manage(Message msg) {
150151
return manageStatus(msg);
151152
}
152153

153-
@Override
154-
protected void subTrackJsMessage(Message msg) {
155-
if (msg.hasHeaders()) {
156-
currentPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR);
157-
}
158-
}
154+
// @Override
155+
// protected void subTrackJsMessage(Message msg) {
156+
// if (msg.hasHeaders()) {
157+
// currentPinId = msg.getHeaders().getFirst(NATS_PIN_ID_HDR);
158+
// }
159+
// }
159160

160161
protected ManageResult manageStatus(Message msg) {
161162
Status status = msg.getStatus();
@@ -187,9 +188,9 @@ protected ManageResult manageStatus(Message msg) {
187188
}
188189
break;
189190

190-
case PIN_ERROR_CODE:
191-
currentPinId = null;
192-
return STATUS_TERMINUS;
191+
// case PIN_ERROR_CODE:
192+
// currentPinId = null;
193+
// return STATUS_TERMINUS;
193194
}
194195

195196
// All unknown 409s are errors, since that basically means the client is not aware of them.

0 commit comments

Comments
 (0)