23
23
import java .util .Objects ;
24
24
import java .util .function .BooleanSupplier ;
25
25
import java .util .function .Consumer ;
26
+ import java .util .function .Function ;
26
27
27
28
/**
28
29
* Encapsulates licensing checking for CCR.
@@ -58,23 +59,89 @@ public boolean isCcrAllowed() {
58
59
59
60
/**
60
61
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
61
- * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method
62
- * of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the
63
- * remote cluster.
62
+ * license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is is invoked.
63
+ * Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster.
64
64
*
65
65
* @param client the client
66
66
* @param clusterAlias the remote cluster alias
67
67
* @param leaderIndex the name of the leader index
68
- * @param listener the listener
68
+ * @param onFailure the failure consumer
69
69
* @param leaderIndexMetadataConsumer the leader index metadata consumer
70
70
* @param <T> the type of response the listener is waiting for
71
71
*/
72
72
public <T > void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata (
73
73
final Client client ,
74
74
final String clusterAlias ,
75
75
final String leaderIndex ,
76
- final ActionListener < T > listener ,
76
+ final Consumer < Exception > onFailure ,
77
77
final Consumer <IndexMetaData > leaderIndexMetadataConsumer ) {
78
+
79
+ final ClusterStateRequest request = new ClusterStateRequest ();
80
+ request .clear ();
81
+ request .metaData (true );
82
+ request .indices (leaderIndex );
83
+ checkRemoteClusterLicenseAndFetchClusterState (
84
+ client ,
85
+ clusterAlias ,
86
+ request ,
87
+ onFailure ,
88
+ leaderClusterState -> leaderIndexMetadataConsumer .accept (leaderClusterState .getMetaData ().index (leaderIndex )),
89
+ licenseCheck -> indexMetadataNonCompliantRemoteLicense (leaderIndex , licenseCheck ),
90
+ e -> indexMetadataUnknownRemoteLicense (leaderIndex , clusterAlias , e ));
91
+ }
92
+
93
+ /**
94
+ * Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
95
+ * the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,
96
+ * the {@code onFailure} consumer is invoked. Otherwise, the specified consumer is invoked with the leader cluster state fetched from
97
+ * the remote cluster.
98
+ *
99
+ * @param client the client
100
+ * @param clusterAlias the remote cluster alias
101
+ * @param request the cluster state request
102
+ * @param onFailure the failure consumer
103
+ * @param leaderClusterStateConsumer the leader cluster state consumer
104
+ * @param <T> the type of response the listener is waiting for
105
+ */
106
+ public <T > void checkRemoteClusterLicenseAndFetchClusterState (
107
+ final Client client ,
108
+ final String clusterAlias ,
109
+ final ClusterStateRequest request ,
110
+ final Consumer <Exception > onFailure ,
111
+ final Consumer <ClusterState > leaderClusterStateConsumer ) {
112
+ checkRemoteClusterLicenseAndFetchClusterState (
113
+ client ,
114
+ clusterAlias ,
115
+ request ,
116
+ onFailure ,
117
+ leaderClusterStateConsumer ,
118
+ CcrLicenseChecker ::clusterStateNonCompliantRemoteLicense ,
119
+ e -> clusterStateUnknownRemoteLicense (clusterAlias , e ));
120
+ }
121
+
122
+ /**
123
+ * Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
124
+ * the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,
125
+ * the {@code onFailure} consumer is invoked. Otherwise, the specified consumer is invoked with the leader cluster state fetched from
126
+ * the remote cluster.
127
+ *
128
+ * @param client the client
129
+ * @param clusterAlias the remote cluster alias
130
+ * @param request the cluster state request
131
+ * @param onFailure the failure consumer
132
+ * @param leaderClusterStateConsumer the leader cluster state consumer
133
+ * @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant
134
+ * @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure
135
+ * @param <T> the type of response the listener is waiting for
136
+ */
137
+ private <T > void checkRemoteClusterLicenseAndFetchClusterState (
138
+ final Client client ,
139
+ final String clusterAlias ,
140
+ final ClusterStateRequest request ,
141
+ final Consumer <Exception > onFailure ,
142
+ final Consumer <ClusterState > leaderClusterStateConsumer ,
143
+ final Function <RemoteClusterLicenseChecker .LicenseCheck , ElasticsearchStatusException > nonCompliantLicense ,
144
+ final Function <Exception , ElasticsearchStatusException > unknownLicense ) {
78
145
// we have to check the license on the remote cluster
79
146
new RemoteClusterLicenseChecker (client , XPackLicenseState ::isCcrAllowedForOperationMode ).checkRemoteClusterLicenses (
80
147
Collections .singletonList (clusterAlias ),
@@ -83,35 +150,25 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
83
150
@ Override
84
151
public void onResponse (final RemoteClusterLicenseChecker .LicenseCheck licenseCheck ) {
85
152
if (licenseCheck .isSuccess ()) {
86
- final Client remoteClient = client .getRemoteClusterClient (clusterAlias );
87
- final ClusterStateRequest clusterStateRequest = new ClusterStateRequest ();
88
- clusterStateRequest .clear ();
89
- clusterStateRequest .metaData (true );
90
- clusterStateRequest .indices (leaderIndex );
91
- final ActionListener <ClusterStateResponse > clusterStateListener = ActionListener .wrap (
92
- r -> {
93
- final ClusterState remoteClusterState = r .getState ();
94
- final IndexMetaData leaderIndexMetadata =
95
- remoteClusterState .getMetaData ().index (leaderIndex );
96
- leaderIndexMetadataConsumer .accept (leaderIndexMetadata );
97
- },
98
- listener ::onFailure );
153
+ final Client leaderClient = client .getRemoteClusterClient (clusterAlias );
154
+ final ActionListener <ClusterStateResponse > clusterStateListener =
155
+ ActionListener .wrap (s -> leaderClusterStateConsumer .accept (s .getState ()), onFailure );
99
156
// following an index in remote cluster, so use remote client to fetch leader index metadata
100
- remoteClient .admin ().cluster ().state (clusterStateRequest , clusterStateListener );
157
+ leaderClient .admin ().cluster ().state (request , clusterStateListener );
101
158
} else {
102
- listener . onFailure ( incompatibleRemoteLicense ( leaderIndex , licenseCheck ));
159
+ onFailure . accept ( nonCompliantLicense . apply ( licenseCheck ));
103
160
}
104
161
}
105
162
106
163
@ Override
107
164
public void onFailure (final Exception e ) {
108
- listener . onFailure ( unknownRemoteLicense ( leaderIndex , clusterAlias , e ));
165
+ onFailure . accept ( unknownLicense . apply ( e ));
109
166
}
110
167
111
168
});
112
169
}
113
170
114
- private static ElasticsearchStatusException incompatibleRemoteLicense (
171
+ private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense (
115
172
final String leaderIndex , final RemoteClusterLicenseChecker .LicenseCheck licenseCheck ) {
116
173
final String clusterAlias = licenseCheck .remoteClusterLicenseInfo ().clusterAlias ();
117
174
final String message = String .format (
@@ -127,7 +184,21 @@ private static ElasticsearchStatusException incompatibleRemoteLicense(
127
184
return new ElasticsearchStatusException (message , RestStatus .BAD_REQUEST );
128
185
}
129
186
130
- private static ElasticsearchStatusException unknownRemoteLicense (
187
+ private static ElasticsearchStatusException clusterStateNonCompliantRemoteLicense (
188
+ final RemoteClusterLicenseChecker .LicenseCheck licenseCheck ) {
189
+ final String clusterAlias = licenseCheck .remoteClusterLicenseInfo ().clusterAlias ();
190
+ final String message = String .format (
191
+ Locale .ROOT ,
192
+ "can not fetch remote cluster state as the remote cluster [%s] is not licensed for [ccr]; %s" ,
193
+ clusterAlias ,
194
+ RemoteClusterLicenseChecker .buildErrorMessage (
195
+ "ccr" ,
196
+ licenseCheck .remoteClusterLicenseInfo (),
197
+ RemoteClusterLicenseChecker ::isLicensePlatinumOrTrial ));
198
+ return new ElasticsearchStatusException (message , RestStatus .BAD_REQUEST );
199
+ }
200
+
201
+ private static ElasticsearchStatusException indexMetadataUnknownRemoteLicense (
131
202
final String leaderIndex , final String clusterAlias , final Exception cause ) {
132
203
final String message = String .format (
133
204
Locale .ROOT ,
@@ -138,4 +209,11 @@ private static ElasticsearchStatusException unknownRemoteLicense(
138
209
return new ElasticsearchStatusException (message , RestStatus .BAD_REQUEST , cause );
139
210
}
140
211
212
+ private static ElasticsearchStatusException clusterStateUnknownRemoteLicense (final String clusterAlias , final Exception cause ) {
213
+ final String message = String .format (
214
+ Locale .ROOT ,
215
+ "can not fetch remote cluster state as the license state of the remote cluster [%s] could not be determined" , clusterAlias );
216
+ return new ElasticsearchStatusException (message , RestStatus .BAD_REQUEST , cause );
217
+ }
218
+
141
219
}
0 commit comments