Skip to content

Commit

Permalink
enable config tls for pulsar
Browse files Browse the repository at this point in the history
  • Loading branch information
HaitaoDeng committed Apr 6, 2022
1 parent 4078d1c commit e77ff9c
Show file tree
Hide file tree
Showing 51 changed files with 1,495 additions and 632 deletions.
52 changes: 52 additions & 0 deletions lib/api/http_util.dart
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,54 @@
// under the License.
//

import 'dart:collection';
import 'dart:io';

import 'package:dio/adapter.dart';
import 'package:dio/dio.dart';
import 'package:paas_dashboard_flutter/api/tls_context.dart';

class HttpUtil {
static final String http = "http://";
static final String https = "https://";
static const int CONNECT_TIMEOUT = 10000;
static const int RECEIVE_TIMEOUT = 10000;
static Map<SERVER, Map<int, Dio>> clients = {};

static Dio getClient(TlsContext tlsContext, SERVER service, int id) {
clients.putIfAbsent(service, () => new HashMap.identity());
clients[service]!.putIfAbsent(id, () => createClient(tlsContext));
return clients[service]![id]!;
}

static Dio createClient(TlsContext tlsContext) {
BaseOptions options = BaseOptions(
connectTimeout: CONNECT_TIMEOUT,
receiveTimeout: RECEIVE_TIMEOUT,
);
Dio client = new Dio(options);
if (!tlsContext.enableTls) {
return client;
}
SecurityContext context = SecurityContext();
if (tlsContext.clientKeyPassword.isNotEmpty) {
context.usePrivateKey(tlsContext.clientKeyFile, password: tlsContext.clientKeyPassword);
} else {
context.usePrivateKey(tlsContext.clientKeyFile);
}
context.useCertificateChain(tlsContext.clientCertFile);
context.setTrustedCertificates(tlsContext.caFile);

(client.httpClientAdapter as DefaultHttpClientAdapter).onHttpClientCreate = (client) {
HttpClient httpClient = new HttpClient(context: context);
httpClient.badCertificateCallback = (X509Certificate cert, String host, int port) {
return true;
};
return httpClient;
};
return client;
}

static bool abnormal(int code) {
return code < 200 || code >= 300;
}
Expand All @@ -26,3 +73,8 @@ class HttpUtil {
return code >= 200 && code < 300;
}
}

enum SERVER {
PULSAR,
PULSAR_FUNCTION,
}
54 changes: 30 additions & 24 deletions lib/api/pulsar/pulsar_cluster_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,49 @@
import 'dart:convert';
import 'dart:developer';

import 'package:http/http.dart' as http;
import 'package:dio/dio.dart';
import 'package:paas_dashboard_flutter/api/http_util.dart';
import 'package:paas_dashboard_flutter/api/pulsar/pulsar_tenant_api.dart';
import 'package:paas_dashboard_flutter/api/tls_context.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_cluster.dart';

class PulsarClusterApi {
static Future<List<ClusterResp>> cluster(String host, int port) async {
String version = await getVersion(host, port);
String tenantInfo = await PulsarTenantApi.getTenantInfo(host, port, "public");
static Future<List<ClusterResp>> cluster(int id, String host, int port, TlsContext tlsContext) async {
String version = await getVersion(id, host, port, tlsContext);
String tenantInfo = await PulsarTenantApi.getTenantInfo(id, host, port, "public", tlsContext);
String cluster = ((json.decode(tenantInfo) as Map)["allowedClusters"] as List)[0];
String url = 'http://$host:${port.toString()}/admin/v2/brokers/$cluster';
final brokersResponse = await http.get(Uri.parse(url));
if (HttpUtil.abnormal(brokersResponse.statusCode)) {
log('ErrorCode is ${brokersResponse.statusCode}, body is ${brokersResponse.body}');
throw Exception('ErrorCode is ${brokersResponse.statusCode}, body is ${brokersResponse.body}');
String url =
tlsContext.enableTls ? HttpUtil.https : HttpUtil.http + '$host:${port.toString()}/admin/v2/brokers/$cluster';
final brokersResponse = await HttpUtil.getClient(tlsContext, SERVER.PULSAR, id).get<String>(url);
if (HttpUtil.abnormal(brokersResponse.statusCode!)) {
log('ErrorCode is ${brokersResponse.statusCode}, body is ${brokersResponse.data}');
throw Exception('ErrorCode is ${brokersResponse.statusCode}, body is ${brokersResponse.data}');
}
List brokers = json.decode(brokersResponse.body) as List;
List brokers = json.decode(brokersResponse.data!) as List;
return brokers.map((e) => ClusterResp(e, version)).toList();
}

static Future<String> getVersion(String host, int port) async {
String url = 'http://$host:${port.toString()}/admin/v2/brokers/version';
final versionResponse = await http.get(Uri.parse(url));
if (HttpUtil.abnormal(versionResponse.statusCode)) {
log('ErrorCode is ${versionResponse.statusCode}, body is ${versionResponse.body}');
throw Exception('ErrorCode is ${versionResponse.statusCode}, body is ${versionResponse.body}');
static Future<String> getVersion(int id, String host, int port, TlsContext tlsContext) async {
String url =
tlsContext.enableTls ? HttpUtil.https : HttpUtil.http + '$host:${port.toString()}/admin/v2/brokers/version';
final versionResponse = await HttpUtil.getClient(tlsContext, SERVER.PULSAR, id)
.get<String>(url, options: new Options(responseType: ResponseType.json));
if (HttpUtil.abnormal(versionResponse.statusCode!)) {
log('ErrorCode is ${versionResponse.statusCode}, body is ${versionResponse.data}');
throw Exception('ErrorCode is ${versionResponse.statusCode}, body is ${versionResponse.data}');
}
return versionResponse.body;
return versionResponse.data!;
}

static Future<String> getLeader(String host, int port) async {
String url = 'http://$host:${port.toString()}/admin/v2/brokers/leaderBroker';
final leaderBrokerResponse = await http.get(Uri.parse(url));
if (HttpUtil.abnormal(leaderBrokerResponse.statusCode)) {
log('ErrorCode is ${leaderBrokerResponse.statusCode}, body is ${leaderBrokerResponse.body}');
throw Exception('ErrorCode is ${leaderBrokerResponse.statusCode}, body is ${leaderBrokerResponse.body}');
static Future<String> getLeader(int id, String host, int port, TlsContext tlsContext) async {
String url = tlsContext.enableTls
? HttpUtil.https
: HttpUtil.http + '$host:${port.toString()}/admin/v2/brokers/leaderBroker';
final leaderBrokerResponse = await HttpUtil.getClient(tlsContext, SERVER.PULSAR, id).get<String>(url);
if (HttpUtil.abnormal(leaderBrokerResponse.statusCode!)) {
log('ErrorCode is ${leaderBrokerResponse.statusCode}, body is ${leaderBrokerResponse.data}');
throw Exception('ErrorCode is ${leaderBrokerResponse.statusCode}, body is ${leaderBrokerResponse.data}');
}
return json.decode(leaderBrokerResponse.body)["serviceUrl"];
return json.decode(leaderBrokerResponse.data!)["serviceUrl"];
}
}
Loading

0 comments on commit e77ff9c

Please sign in to comment.