1515 */
1616package com .google .cloud .bigtable .hbase ;
1717
18+ import static com .google .api .client .util .Preconditions .checkState ;
1819import static org .threeten .bp .Duration .ofMillis ;
1920
2021import java .io .FileInputStream ;
3233import com .google .api .gax .core .NoCredentialsProvider ;
3334import com .google .api .gax .grpc .GrpcTransportChannel ;
3435import com .google .api .gax .retrying .RetrySettings ;
35- import com .google .api .gax .rpc .FixedHeaderProvider ;
3636import com .google .api .gax .rpc .FixedTransportChannelProvider ;
3737import com .google .cloud .bigtable .config .BigtableOptions ;
3838import com .google .cloud .bigtable .config .BulkOptions ;
4646import com .google .cloud .bigtable .data .v2 .stub .BigtableStubSettings ;
4747
4848import io .grpc .ManagedChannelBuilder ;
49- import io .grpc .internal .GrpcUtil ;
5049
5150/**
5251 * Static methods to convert an instance of {@link Configuration} or {@link BigtableOptions} to a
@@ -65,49 +64,40 @@ public class BigtableDataSettingsFactory {
6564 */
6665 public static BigtableDataSettings fromBigtableOptions (final BigtableOptions options )
6766 throws IOException , GeneralSecurityException {
68- if (!options .getRetryOptions ().enableRetries ()) {
69- throw new IllegalStateException (
70- "Retry is must for BigtableDataSettings configuration from BigtableOptions." );
71- }
72-
67+ checkState (!options .getRetryOptions ().enableRetries (), "Disabling retries is not currently supported." );
68+
7369 BigtableDataSettings .Builder builder = BigtableDataSettings .newBuilder ();
7470
7571 InstanceName instanceName = InstanceName .newBuilder ().setProject (options .getProjectId ())
7672 .setInstance (options .getInstanceId ()).build ();
7773 builder .setInstanceName (instanceName );
7874 builder .setAppProfileId (options .getAppProfileId ());
7975
80- LOG .debug ("endpoint host %s." , options .getDataHost ());
81- LOG .debug ("endpoint host %s." , options .getPort ());
8276 builder .setEndpoint (options .getDataHost () + ":" + options .getPort ());
8377
84- buildCredentialProvider (builder , options .getCredentialOptions ());
85-
86- buildBulkOptions (builder , options );
78+ buildCredentialProviderSettings (builder , options .getCredentialOptions ());
8779
88- buildCheckAndMutateRow (builder , options . getCallOptionsConfig (). getShortRpcTimeoutMs () );
80+ buildBulkMutationsSettings (builder , options );
8981
90- buildReadModifyWrite (builder , options .getCallOptionsConfig ().getShortRpcTimeoutMs ());
82+ buildCheckAndMutateRowSettings (builder , options .getCallOptionsConfig ().getShortRpcTimeoutMs ());
9183
92- buildReadRows (builder , options );
84+ buildReadModifyWriteSettings (builder , options . getCallOptionsConfig (). getShortRpcTimeoutMs () );
9385
94- buildMutateRow (builder , options );
86+ buildReadRowsSettings (builder , options );
9587
96- buildSampleRowKeys (builder , options );
88+ buildMutateRowSettings (builder , options );
9789
98- // TODO: would it map to GrpcHeaderInterceptor? or we should build userAgent
99- // using ManagedChannelBuilder AND
100- builder .setHeaderProvider (
101- FixedHeaderProvider .create (GrpcUtil .USER_AGENT_KEY .name (), options .getUserAgent ()));
90+ buildSampleRowKeysSettings (builder , options );
10291
10392 // TODO: implementation for channelCount or channelPerCPU
10493 ManagedChannelBuilder channelBuilder = ManagedChannelBuilder
105- .forAddress (options .getDataHost (), options .getPort ())//
94+ .forAddress (options .getDataHost (), options .getPort ())
10695 .userAgent (options .getUserAgent ());
10796
10897 if (options .usePlaintextNegotiation ()) {
10998 channelBuilder .usePlaintext ();
11099 }
100+
111101 builder .setTransportChannelProvider (
112102 FixedTransportChannelProvider .create (GrpcTransportChannel .create (channelBuilder .build ())));
113103
@@ -120,23 +110,22 @@ public static BigtableDataSettings fromBigtableOptions(final BigtableOptions opt
120110 * @param builder a {@link BigtableDataSettings.Builder} object.
121111 * @param options a {@link BigtableOptions} object.
122112 */
123- private static void buildBulkOptions (Builder builder , BigtableOptions options ) {
113+ private static void buildBulkMutationsSettings (Builder builder , BigtableOptions options ) {
124114 BulkOptions bulkOptions = options .getBulkOptions ();
125115 BatchingSettings .Builder batchSettingsBuilder = BatchingSettings .newBuilder ();
126116
127- FlowControlSettings .Builder flowControlBuilder =
128- FlowControlSettings .newBuilder ()
129- .setMaxOutstandingRequestBytes (bulkOptions .getMaxMemory ());
130-
131117 long autoFlushMs = bulkOptions .getAutoflushMs ();
132118 long bulkMaxRowKeyCount = bulkOptions .getBulkMaxRowKeyCount ();
133119 long maxInflightRpcs = bulkOptions .getMaxInflightRpcs ();
134120
135121 if (autoFlushMs > 0 ) {
136122 batchSettingsBuilder .setDelayThreshold (Duration .ofMillis (autoFlushMs ));
137123 }
124+ FlowControlSettings .Builder flowControlBuilder = FlowControlSettings .newBuilder ();
138125 if (maxInflightRpcs > 0 ) {
139- flowControlBuilder .setMaxOutstandingElementCount (maxInflightRpcs * bulkMaxRowKeyCount );
126+ flowControlBuilder
127+ .setMaxOutstandingRequestBytes (bulkOptions .getMaxMemory ())
128+ .setMaxOutstandingElementCount (maxInflightRpcs * bulkMaxRowKeyCount );
140129 }
141130
142131 batchSettingsBuilder
@@ -156,9 +145,9 @@ private static void buildBulkOptions(Builder builder, BigtableOptions options) {
156145 * @param builder a {@link BigtableDataSettings.Builder} object.
157146 * @param bulkMutation a {@link BulkOptions} object.
158147 */
159- private static void buildSampleRowKeys (Builder builder , BigtableOptions options ) {
148+ private static void buildSampleRowKeysSettings (Builder builder , BigtableOptions options ) {
160149 builder .sampleRowKeysSettings ()
161- .setRetrySettings (defaultRetrySettings (options ));
150+ .setRetrySettings (buildIdempotentRetrySettings (options ));
162151 }
163152
164153 /**
@@ -167,9 +156,9 @@ private static void buildSampleRowKeys(Builder builder, BigtableOptions options)
167156 * @param builder a {@link BigtableDataSettings.Builder} object.
168157 * @param bulkMutation a {@link BulkOptions} object.
169158 */
170- private static void buildMutateRow (Builder builder , BigtableOptions options ) {
159+ private static void buildMutateRowSettings (Builder builder , BigtableOptions options ) {
171160 builder .mutateRowSettings ()
172- .setRetrySettings (defaultRetrySettings (options ));
161+ .setRetrySettings (buildIdempotentRetrySettings (options ));
173162 }
174163
175164 /**
@@ -178,10 +167,24 @@ private static void buildMutateRow(Builder builder, BigtableOptions options) {
178167 * @param builder a {@link BigtableDataSettings.Builder} object.
179168 * @param bulkMutation a {@link BulkOptions} object.
180169 */
181- private static void buildReadRows (Builder builder , BigtableOptions options ) {
182- // TODO: set readPartialRowTimeout for watchdog timer, taken BigtableSession#setupWatchdog()
170+ private static void buildReadRowsSettings (Builder builder , BigtableOptions options ) {
171+ RetryOptions retryOptions = options .getRetryOptions ();
172+
173+ RetrySettings .Builder retryBuilder = RetrySettings .newBuilder ()
174+ .setInitialRetryDelay (ofMillis (retryOptions .getInitialBackoffMillis ()))
175+ .setRetryDelayMultiplier (retryOptions .getBackoffMultiplier ())
176+ .setMaxRetryDelay (ofMillis (retryOptions .getMaxElapsedBackoffMillis ()))
177+ .setMaxAttempts (retryOptions .getMaxScanTimeoutRetries ());
178+
179+ // configurations for RPC timeouts
180+ Duration readPartialRowTimeout = ofMillis (retryOptions .getReadPartialRowTimeoutMillis ());
181+ retryBuilder
182+ .setInitialRpcTimeout (readPartialRowTimeout )
183+ .setMaxRpcTimeout (readPartialRowTimeout )
184+ .setTotalTimeout (ofMillis (options .getCallOptionsConfig ().getLongRpcTimeoutMs ()));
185+
183186 builder .readRowsSettings ()
184- .setRetrySettings (defaultRetrySettings ( options ));
187+ .setRetrySettings (retryBuilder . build ( ));
185188 }
186189
187190 /**
@@ -190,7 +193,7 @@ private static void buildReadRows(Builder builder, BigtableOptions options) {
190193 * @param builder a {@link BigtableDataSettings.Builder} object.
191194 * @param bulkMutation a {@link BulkOptions} object.
192195 */
193- private static void buildReadModifyWrite (Builder builder , long rpcTimeoutMs ) {
196+ private static void buildReadModifyWriteSettings (Builder builder , long rpcTimeoutMs ) {
194197 builder .readModifyWriteRowSettings ()
195198 .setSimpleTimeoutNoRetries (ofMillis (rpcTimeoutMs ));
196199 }
@@ -201,7 +204,7 @@ private static void buildReadModifyWrite(Builder builder, long rpcTimeoutMs) {
201204 * @param builder a {@link BigtableDataSettings.Builder} object.
202205 * @param bulkMutation a {@link BulkOptions} object.
203206 */
204- private static void buildCheckAndMutateRow (Builder builder , long rpcTimeoutMs ) {
207+ private static void buildCheckAndMutateRowSettings (Builder builder , long rpcTimeoutMs ) {
205208 builder .checkAndMutateRowSettings ()
206209 .setSimpleTimeoutNoRetries (ofMillis (rpcTimeoutMs ));
207210 }
@@ -212,7 +215,7 @@ private static void buildCheckAndMutateRow(Builder builder, long rpcTimeoutMs) {
212215 * @param builder a {@link BigtableDataSettings.Builder} object.
213216 * @param bulkMutation a {@link BulkOptions} object.
214217 */
215- private static RetrySettings defaultRetrySettings (BigtableOptions options ) {
218+ private static RetrySettings buildIdempotentRetrySettings (BigtableOptions options ) {
216219 RetryOptions retryOptions = options .getRetryOptions ();
217220
218221 RetrySettings .Builder retryBuilder = RetrySettings .newBuilder ()
@@ -222,12 +225,16 @@ private static RetrySettings defaultRetrySettings(BigtableOptions options) {
222225 .setMaxAttempts (retryOptions .getMaxScanTimeoutRetries ());
223226
224227 // configurations for RPC timeouts
228+ Duration shortRpcTimeout = ofMillis (options .getCallOptionsConfig ().getShortRpcTimeoutMs ());
225229 retryBuilder
226- .setInitialRpcTimeout (ofMillis ( options . getCallOptionsConfig (). getShortRpcTimeoutMs ()) )
227- .setMaxRpcTimeout (ofMillis ( retryOptions . getReadPartialRowTimeoutMillis ()) )
230+ .setInitialRpcTimeout (shortRpcTimeout )
231+ .setMaxRpcTimeout (shortRpcTimeout )
228232 .setTotalTimeout (ofMillis (options .getCallOptionsConfig ().getLongRpcTimeoutMs ()));
229-
230- // TODO: an option to set RetryOptions#allowRetriesWithoutTimestamp
233+
234+ if (retryOptions .allowRetriesWithoutTimestamp ()) {
235+ //TODO: add instruction to create unsafeMutation.
236+ LOG .warn ("Retries without Timestamp doesn't support" );
237+ }
231238 return retryBuilder .build ();
232239 }
233240
@@ -239,7 +246,7 @@ private static RetrySettings defaultRetrySettings(BigtableOptions options) {
239246 * @throws FileNotFoundException
240247 * @throws IOException
241248 */
242- private static void buildCredentialProvider (Builder builder , CredentialOptions credentialOptions )
249+ private static void buildCredentialProviderSettings (Builder builder , CredentialOptions credentialOptions )
243250 throws FileNotFoundException , IOException {
244251 CredentialsProvider credentialsProvider = NoCredentialsProvider .create ();
245252
0 commit comments