Skip to content

Commit

Permalink
support resetCursor by timestamp for persistent topic (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
HaitaoDeng authored Dec 17, 2022
1 parent eba4afe commit 817b9cb
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 0 deletions.
12 changes: 12 additions & 0 deletions lib/api/pulsar/pulsar_partitioned_topic_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -299,4 +299,16 @@ class PulsarPartitionedTopicApi {
}
return "send msg success";
}

static Future<String> resetCursorByTimestamp(int id, String host, int port, TlsContext tlsContext, String tenant,
String namespace, String topic, String subscription, int timestamp) async {
var url =
'${tlsContext.getSchema()}$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/subscription/$subscription/resetcursor/$timestamp';
var response = await HttpUtil.getClient(tlsContext, SERVER.PULSAR, id).post<String>(url);
if (HttpUtil.abnormal(response.statusCode!)) {
log('ErrorCode is ${response.statusCode}, body is ${response.data}');
throw Exception('ErrorCode is ${response.statusCode}, body is ${response.data}');
}
return response.data!;
}
}
2 changes: 2 additions & 0 deletions lib/l10n/intl_en.arb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
"producer": "Producer",
"producerList": "Producer List",
"refresh": "Refresh",
"resetCursor": "ResetCursor",
"resetCursorWithHint": "Please enter the timestamp in milliseconds",
"result": "Result",
"searchByMessageId": "Search by MessageId, type is ledgerId entryId, submit with enter key",
"searchByMessageIdWithHint": "Search by MessageId, single search type should be ledgerId entryId, multi search type should be ledgerId entryId entryId, submit with enter key",
Expand Down
2 changes: 2 additions & 0 deletions lib/l10n/intl_zh.arb
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
"producer": "生产者",
"producerList": "生产者列表",
"refresh": "刷新",
"resetCursor": "重置游标",
"resetCursorWithHint": "请输入时间戳,单位毫秒",
"result": "结果",
"searchByMessageId": "通过messageId查询消息,格式ledgerId entryId,按enter键进行查询。",
"searchByMessageIdWithHint": "通过messageId查询消息,单条查询格式ledgerId entryId,范围查询格式位ledgerId entryId entryId,按enter键进行查询。",
Expand Down
70 changes: 70 additions & 0 deletions lib/ui/pulsar/widget/pulsar_partitioned_topic_subscription.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
//

import 'package:flutter/material.dart';
import 'package:flutter_datetime_picker/flutter_datetime_picker.dart';
import 'package:paas_dashboard_flutter/generated/l10n.dart';
import 'package:paas_dashboard_flutter/ui/component/clear_backlog_button.dart';
import 'package:paas_dashboard_flutter/ui/util/alert_util.dart';
import 'package:paas_dashboard_flutter/ui/util/exception_util.dart';
import 'package:paas_dashboard_flutter/ui/util/spinner_util.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_subscription_view_model.dart';
Expand All @@ -35,9 +37,16 @@ class PulsarPartitionedTopicSubscriptionWidget extends StatefulWidget {
}

class PulsarPartitionedTopicSubscriptionWidgetState extends State<PulsarPartitionedTopicSubscriptionWidget> {
final resetCursorByTimeTextController = TextEditingController();
final ledgerIdTextController = TextEditingController();
final entryIdTextController = TextEditingController();

@override
void initState() {
super.initState();
resetCursorByTimeTextController.addListener(() {});
ledgerIdTextController.addListener(() {});
entryIdTextController.addListener(() {});
final vm = Provider.of<PulsarPartitionedTopicSubscriptionViewModel>(context, listen: false);
vm.fetchSubscriptions();
}
Expand All @@ -52,6 +61,27 @@ class PulsarPartitionedTopicSubscriptionWidgetState extends State<PulsarPartitio
}
ExceptionUtil.processLoadException(vm, context);
ExceptionUtil.processOpException(vm, context);

var timeField = TextField(
decoration: InputDecoration(
fillColor: Colors.green,
labelText: Text(S.of(context).resetCursorWithHint).data,
hintText: Text(S.of(context).resetCursorWithHint).data,
),
controller: resetCursorByTimeTextController,
);

var timeButton = TextButton(
onPressed: () {
DatePicker.showDateTimePicker(context, showTitleActions: true, onChanged: (date) {
print("change $date");
}, onConfirm: (date) {
resetCursorByTimeTextController.text = date.millisecondsSinceEpoch.toString();
}, currentTime: DateTime.now(), locale: LocaleType.zh);
},
child: Text(S.of(context).timePick),
);

var subscriptionFuture = SingleChildScrollView(
child: DataTable(
showCheckboxColumn: false,
Expand All @@ -60,6 +90,7 @@ class PulsarPartitionedTopicSubscriptionWidgetState extends State<PulsarPartitio
const DataColumn(label: Text('MsgBacklog')),
const DataColumn(label: Text('MsgRateOut')),
DataColumn(label: Text(S.of(context).clearBacklog)),
DataColumn(label: Text(S.of(context).resetCursor)),
],
rows: vm.displayList
.map((data) => DataRow(cells: [
Expand All @@ -75,6 +106,45 @@ class PulsarPartitionedTopicSubscriptionWidgetState extends State<PulsarPartitio
DataCell(ClearBacklogButton(() {
vm.clearBacklog(data.subscriptionName);
})),
DataCell(TextButton(
onPressed: () {
showDialog(
context: context,
builder: (context) {
return AlertDialog(
title: Text(
S.of(context).resetCursor,
textAlign: TextAlign.center,
),
actions: <Widget>[
timeField,
timeButton,
TextButton(
child: Text(
S.of(context).cancel,
),
onPressed: () {
Navigator.of(context).pop();
},
),
TextButton(
child: Text(
S.of(context).confirm,
),
onPressed: () {
vm.resetCursorByTimestamp(data.subscriptionName,
int.parse(resetCursorByTimeTextController.value.text));
Navigator.of(context).pop();
AlertUtil.createDialog(S.of(context).success, context);
},
),
],
);
});
},
child: Text(
S.of(context).resetCursor,
))),
]))
.toList()),
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,15 @@ class PulsarPartitionedTopicSubscriptionViewModel extends BaseLoadListViewModel<
notifyListeners();
}
}

Future<void> resetCursorByTimestamp(String subscriptionName, int timestamp) async {
try {
await PulsarPartitionedTopicApi.resetCursorByTimestamp(
id, host, port, pulsarInstancePo.createTlsContext(), tenant, namespace, topic, subscriptionName, timestamp);
await fetchSubscriptions();
} on Exception catch (e) {
opException = e;
notifyListeners();
}
}
}

0 comments on commit 817b9cb

Please sign in to comment.