Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis adapter #74

Closed
putty-kang opened this issue Oct 8, 2023 · 9 comments · Fixed by #78
Closed

Redis adapter #74

putty-kang opened this issue Oct 8, 2023 · 9 comments · Fixed by #78
Labels
help wanted Extra attention is needed Type: Bug Something isn't working

Comments

@putty-kang
Copy link

Publish offline messages, simulate subscription exceptions, set to retry 100 times, and disconnect the subscription end when sending 10 times,
When the subscriber reconnects, the subsequent retry message cannot be received and the message is lost

@icebob
Copy link
Member

icebob commented Oct 8, 2023

Please create a reproduction code.

@putty-kang
Copy link
Author

putty-kang commented Nov 30, 2023

// static-service.js
const { ServiceBroker } = require("moleculer");
const ChannelsMiddleware = require("@moleculer/channels").Middleware;
const broker = new ServiceBroker({
  namespace: "test",
  nodeID: "test1",
  transporter: "TCP",
  middlewares:[ChannelsMiddleware({
    adapter:"redis://127.0.0.1:6379"
  })]
});

// Start the Moleculer broker
broker.start();

broker.repl();

setInterval(() => {
  broker.sendToChannel("order.created", {
    id: 1234,
    items: "test"
  });
}, 100);


// Start the service to serve static resources

const serviceSchema = {
  name: "subscriber",
  channels: {
    "order.created": {
      group: "mygroup",
      redis: {
        minIdleTime: 1000,
        claimInterval: 1,
        startID: "0"
      },
      maxRetries: 100,
      handler(payload) {
          console.log(payload);
          throw new Error;
      }
    }
  },
}
broker.createService(serviceSchema).then();

// send 10 times

broker.destroyService("subscriber").then(()=>{
  setTimeout(()=>{
    // then create new Service, Remaining transmission times data loss

broker.createService(serviceSchema);
  }, 10000);
});

@putty-kang
Copy link
Author

How to ensure that data in pending is not lost after destroying the service

@valeeum
Copy link

valeeum commented Dec 6, 2023

@putty-kang
The default value for moleculer channels configuration for redis.startID is "$" which means pick up only new messages. You will need to store and manage this value if you want to pick up messages that may have come in while service was down.

From README documentation (https://github.com/moleculerjs/moleculer-channels)
Starting point when consumers fetch data from the consumer group. By default equals to $, i.e., consumers will only see new elements arriving in the stream. More info [here](https://redis.io/commands/XGROUP)

@putty-kang
Copy link
Author

@valeeum I set startID is 0

@putty-kang
Copy link
Author

putty-kang commented Dec 12, 2023

@icebob

const pubClient = this.clients.get(this.pubName);
// 1. Delete consumer from the consumer group
// 2. Do NOT destroy the consumer group
// https://redis.io/commands/XGROUP
return pubClient.xgroup(
	"DELCONSUMER",
	chan.name, // Stream Name
	chan.group, // Consumer Group
	chan.id // Consumer ID
);

================================================================================

This code results in the loss of unconsumed messages in the reconnecting extension after the service is disconnected

@putty-kang
Copy link
Author

@icebob

You need to first check if there is a message in the ending, and then delete the corresponding consumer

@putty-kang
Copy link
Author

putty-kang commented Dec 12, 2023

const pubClient = this.clients.get(this.pubName);
let pending = await pubClient.xpending(
	chan.name,
	chan.group,
	"-",
	"+",
	10 // Max reported entries
);
if(!pending) {
	// 1. Delete consumer from the consumer group
	// 2. Do NOT destroy the consumer group
	// https://redis.io/commands/XGROUP
	return pubClient.xgroup(
		"DELCONSUMER",
		chan.name, // Stream Name
		chan.group, // Consumer Group
		chan.id // Consumer ID
	);
}

@icebob icebob added Type: Bug Something isn't working help wanted Extra attention is needed labels Jan 13, 2024
@AndreMaz
Copy link
Member

AndreMaz commented Apr 5, 2024

Repro example and the proposed fix are here: 8c3eb6e

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed Type: Bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants