Skip to content

Commit add9489

Browse files
authored
Add Sink class, implement sink methods and tests (#1063)
1 parent 0edaca8 commit add9489

File tree

9 files changed

+1390
-4
lines changed

9 files changed

+1390
-4
lines changed

gcloud-java-logging/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
<dependency>
3636
<groupId>io.grpc</groupId>
3737
<artifactId>grpc-all</artifactId>
38-
<version>0.12.0</version>
38+
<version>0.14.0</version>
3939
</dependency>
4040
<dependency>
4141
<groupId>com.google.auto.value</groupId>
@@ -48,6 +48,12 @@
4848
<version>4.12</version>
4949
<scope>test</scope>
5050
</dependency>
51+
<dependency>
52+
<groupId>org.easymock</groupId>
53+
<artifactId>easymock</artifactId>
54+
<version>3.4</version>
55+
<scope>test</scope>
56+
</dependency>
5157
</dependencies>
5258
<profiles>
5359
<profile>

gcloud-java-logging/src/main/java/com/google/cloud/logging/Logging.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,123 @@
1616

1717
package com.google.cloud.logging;
1818

19+
import com.google.cloud.AsyncPage;
20+
import com.google.cloud.Page;
1921
import com.google.cloud.Service;
2022

23+
import java.util.Map;
24+
import java.util.concurrent.Future;
25+
2126
public interface Logging extends AutoCloseable, Service<LoggingOptions> {
27+
28+
/**
29+
* Class for specifying options for listing sinks, monitored resources and monitored resource
30+
* descriptors.
31+
*/
32+
final class ListOption extends Option {
33+
34+
private static final long serialVersionUID = -6857294816115909271L;
35+
36+
enum OptionType implements Option.OptionType {
37+
PAGE_SIZE, PAGE_TOKEN;
38+
39+
@SuppressWarnings("unchecked")
40+
<T> T get(Map<Option.OptionType, ?> options) {
41+
return (T) options.get(this);
42+
}
43+
}
44+
45+
private ListOption(OptionType option, Object value) {
46+
super(option, value);
47+
}
48+
49+
/**
50+
* Returns an option to specify the maximum number of resources returned per page.
51+
*/
52+
public static ListOption pageSize(int pageSize) {
53+
return new ListOption(OptionType.PAGE_SIZE, pageSize);
54+
}
55+
56+
/**
57+
* Returns an option to specify the page token from which to start listing resources.
58+
*/
59+
public static ListOption pageToken(String pageToken) {
60+
return new ListOption(OptionType.PAGE_TOKEN, pageToken);
61+
}
62+
}
63+
64+
/**
65+
* Creates a new sink.
66+
*
67+
* @return the created sink
68+
* @throws LoggingException upon failure
69+
*/
70+
Sink create(SinkInfo sink);
71+
72+
/**
73+
* Sends a request for creating a sink. This method returns a {@code Future} object to consume the
74+
* result. {@link Future#get()} returns the created sink or {@code null} if not found.
75+
*/
76+
Future<Sink> createAsync(SinkInfo sink);
77+
78+
/**
79+
* Updates a sink or creates one if it does not exist.
80+
*
81+
* @return the created sink
82+
* @throws LoggingException upon failure
83+
*/
84+
Sink update(SinkInfo sink);
85+
86+
/**
87+
* Sends a request for updating a sink (or creating it, if it does not exist). This method returns
88+
* a {@code Future} object to consume the result. {@link Future#get()} returns the
89+
* updated/created sink or {@code null} if not found.
90+
*/
91+
Future<Sink> updateAsync(SinkInfo sink);
92+
93+
/**
94+
* Returns the requested sink or {@code null} if not found.
95+
*
96+
* @throws LoggingException upon failure
97+
*/
98+
Sink getSink(String sink);
99+
100+
/**
101+
* Sends a request for getting a sink. This method returns a {@code Future} object to consume the
102+
* result. {@link Future#get()} returns the requested sink or {@code null} if not found.
103+
*
104+
* @throws LoggingException upon failure
105+
*/
106+
Future<Sink> getSinkAsync(String sink);
107+
108+
/**
109+
* Lists the sinks. This method returns a {@link Page} object that can be used to consume
110+
* paginated results. Use {@link ListOption} to specify the page size or the page token from which
111+
* to start listing sinks.
112+
*
113+
* @throws LoggingException upon failure
114+
*/
115+
Page<Sink> listSinks(ListOption... options);
116+
117+
/**
118+
* Sends a request for listing sinks. This method returns a {@code Future} object to consume
119+
* the result. {@link Future#get()} returns an {@link AsyncPage} object that can be used to
120+
* asynchronously handle paginated results. Use {@link ListOption} to specify the page size or the
121+
* page token from which to start listing sinks.
122+
*/
123+
Future<AsyncPage<Sink>> listSinksAsync(ListOption... options);
124+
125+
/**
126+
* Deletes the requested sink.
127+
*
128+
* @return {@code true} if the sink was deleted, {@code false} if it was not found
129+
*/
130+
boolean deleteSink(String sink);
131+
132+
/**
133+
* Sends a request for deleting a sink. This method returns a {@code Future} object to consume the
134+
* result. {@link Future#get()} returns {@code true} if the sink was deleted, {@code false} if it
135+
* was not found.
136+
*/
137+
Future<Boolean> deleteSinkAsync(String sink);
22138
}
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Copyright 2016 Google Inc. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.logging;
18+
19+
import static com.google.api.client.util.Preconditions.checkArgument;
20+
import static com.google.cloud.logging.Logging.ListOption.OptionType.PAGE_SIZE;
21+
import static com.google.cloud.logging.Logging.ListOption.OptionType.PAGE_TOKEN;
22+
import static com.google.common.util.concurrent.Futures.lazyTransform;
23+
24+
import com.google.cloud.AsyncPage;
25+
import com.google.cloud.AsyncPageImpl;
26+
import com.google.cloud.BaseService;
27+
import com.google.cloud.Page;
28+
import com.google.cloud.PageImpl;
29+
import com.google.cloud.logging.spi.LoggingRpc;
30+
import com.google.cloud.logging.spi.v2.ConfigServiceV2Api;
31+
import com.google.common.base.Function;
32+
import com.google.common.base.Throwables;
33+
import com.google.common.collect.ImmutableList;
34+
import com.google.common.collect.Lists;
35+
import com.google.common.collect.Maps;
36+
import com.google.common.util.concurrent.Uninterruptibles;
37+
import com.google.logging.v2.CreateSinkRequest;
38+
import com.google.logging.v2.DeleteSinkRequest;
39+
import com.google.logging.v2.GetSinkRequest;
40+
import com.google.logging.v2.ListSinksRequest;
41+
import com.google.logging.v2.ListSinksResponse;
42+
import com.google.logging.v2.UpdateSinkRequest;
43+
import com.google.protobuf.Empty;
44+
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.concurrent.ExecutionException;
48+
import java.util.concurrent.Future;
49+
50+
class LoggingImpl extends BaseService<LoggingOptions> implements Logging {
51+
52+
private final LoggingRpc rpc;
53+
private boolean closed;
54+
55+
private static final Function<Empty, Boolean> EMPTY_TO_BOOLEAN_FUNCTION =
56+
new Function<Empty, Boolean>() {
57+
@Override
58+
public Boolean apply(Empty input) {
59+
return input != null;
60+
}
61+
};
62+
63+
LoggingImpl(LoggingOptions options) {
64+
super(options);
65+
rpc = options.rpc();
66+
}
67+
68+
private static <V> V get(Future<V> future) {
69+
try {
70+
return Uninterruptibles.getUninterruptibly(future);
71+
} catch (ExecutionException ex) {
72+
throw Throwables.propagate(ex.getCause());
73+
}
74+
}
75+
76+
private abstract static class BasePageFetcher<T> implements AsyncPageImpl.NextPageFetcher<T> {
77+
78+
private static final long serialVersionUID = 5095123855547444030L;
79+
80+
private final LoggingOptions serviceOptions;
81+
private final Map<Option.OptionType, ?> requestOptions;
82+
83+
private BasePageFetcher(LoggingOptions serviceOptions, String cursor,
84+
Map<Option.OptionType, ?> requestOptions) {
85+
this.serviceOptions = serviceOptions;
86+
this.requestOptions =
87+
PageImpl.nextRequestOptions(PAGE_TOKEN, cursor, requestOptions);
88+
}
89+
90+
LoggingOptions serviceOptions() {
91+
return serviceOptions;
92+
}
93+
94+
Map<Option.OptionType, ?> requestOptions() {
95+
return requestOptions;
96+
}
97+
}
98+
99+
private static class SinkPageFetcher extends BasePageFetcher<Sink> {
100+
101+
private static final long serialVersionUID = 4879364260060886875L;
102+
103+
SinkPageFetcher(LoggingOptions serviceOptions, String cursor,
104+
Map<Option.OptionType, ?> requestOptions) {
105+
super(serviceOptions, cursor, requestOptions);
106+
}
107+
108+
@Override
109+
public Future<AsyncPage<Sink>> nextPage() {
110+
return listSinksAsync(serviceOptions(), requestOptions());
111+
}
112+
}
113+
114+
@Override
115+
public Sink create(SinkInfo sink) {
116+
return get(createAsync(sink));
117+
}
118+
119+
@Override
120+
public Future<Sink> createAsync(SinkInfo sink) {
121+
CreateSinkRequest request = CreateSinkRequest.newBuilder()
122+
.setProjectName(ConfigServiceV2Api.formatProjectName(options().projectId()))
123+
.setSink(sink.toPb(options().projectId()))
124+
.build();
125+
return lazyTransform(rpc.create(request), Sink.fromPbFunction(this));
126+
}
127+
128+
@Override
129+
public Sink update(SinkInfo sink) {
130+
return get(updateAsync(sink));
131+
}
132+
133+
@Override
134+
public Future<Sink> updateAsync(SinkInfo sink) {
135+
UpdateSinkRequest request = UpdateSinkRequest.newBuilder()
136+
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink.name()))
137+
.setSink(sink.toPb(options().projectId()))
138+
.build();
139+
return lazyTransform(rpc.update(request), Sink.fromPbFunction(this));
140+
}
141+
142+
@Override
143+
public Sink getSink(String sink) {
144+
return get(getSinkAsync(sink));
145+
}
146+
147+
@Override
148+
public Future<Sink> getSinkAsync(String sink) {
149+
GetSinkRequest request = GetSinkRequest.newBuilder()
150+
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink))
151+
.build();
152+
return lazyTransform(rpc.get(request), Sink.fromPbFunction(this));
153+
}
154+
155+
private static ListSinksRequest listSinksRequest(LoggingOptions serviceOptions,
156+
Map<Option.OptionType, ?> options) {
157+
ListSinksRequest.Builder builder = ListSinksRequest.newBuilder();
158+
builder.setProjectName(ConfigServiceV2Api.formatProjectName(serviceOptions.projectId()));
159+
Integer pageSize = PAGE_SIZE.get(options);
160+
String pageToken = PAGE_TOKEN.get(options);
161+
if (pageSize != null) {
162+
builder.setPageSize(pageSize);
163+
}
164+
if (pageToken != null) {
165+
builder.setPageToken(pageToken);
166+
}
167+
return builder.build();
168+
}
169+
170+
private static Future<AsyncPage<Sink>> listSinksAsync(final LoggingOptions serviceOptions,
171+
final Map<Option.OptionType, ?> options) {
172+
final ListSinksRequest request = listSinksRequest(serviceOptions, options);
173+
Future<ListSinksResponse> list = serviceOptions.rpc().list(request);
174+
return lazyTransform(list, new Function<ListSinksResponse, AsyncPage<Sink>>() {
175+
@Override
176+
public AsyncPage<Sink> apply(ListSinksResponse listSinksResponse) {
177+
List<Sink> sinks = listSinksResponse.getSinksList() == null ? ImmutableList.<Sink>of()
178+
: Lists.transform(listSinksResponse.getSinksList(),
179+
Sink.fromPbFunction(serviceOptions.service()));
180+
String cursor = listSinksResponse.getNextPageToken().equals("") ? null
181+
: listSinksResponse.getNextPageToken();
182+
return new AsyncPageImpl<>(
183+
new SinkPageFetcher(serviceOptions, cursor, options), cursor, sinks);
184+
}
185+
});
186+
}
187+
188+
@Override
189+
public Page<Sink> listSinks(ListOption... options) {
190+
return get(listSinksAsync(options));
191+
}
192+
193+
@Override
194+
public Future<AsyncPage<Sink>> listSinksAsync(ListOption... options) {
195+
return listSinksAsync(options(), optionMap(options));
196+
}
197+
198+
@Override
199+
public boolean deleteSink(String sink) {
200+
return get(deleteSinkAsync(sink));
201+
}
202+
203+
@Override
204+
public Future<Boolean> deleteSinkAsync(String sink) {
205+
DeleteSinkRequest request = DeleteSinkRequest.newBuilder()
206+
.setSinkName(ConfigServiceV2Api.formatSinkName(options().projectId(), sink))
207+
.build();
208+
return lazyTransform(rpc.delete(request), EMPTY_TO_BOOLEAN_FUNCTION);
209+
}
210+
211+
@Override
212+
public void close() throws Exception {
213+
if (closed) {
214+
return;
215+
}
216+
closed = true;
217+
rpc.close();
218+
}
219+
220+
static <T extends Option.OptionType> Map<Option.OptionType, ?> optionMap(Option... options) {
221+
Map<Option.OptionType, Object> optionMap = Maps.newHashMap();
222+
for (Option option : options) {
223+
Object prev = optionMap.put(option.optionType(), option.value());
224+
checkArgument(prev == null, "Duplicate option %s", option);
225+
}
226+
return optionMap;
227+
}
228+
}

gcloud-java-logging/src/main/java/com/google/cloud/logging/LoggingOptions.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,7 @@ public static class DefaultLoggingFactory implements LoggingFactory {
3737

3838
@Override
3939
public Logging create(LoggingOptions options) {
40-
// todo(mziccard) uncomment once LoggingImpl is implemented
41-
// return new LoggingImpl(options);
42-
return null;
40+
return new LoggingImpl(options);
4341
}
4442
}
4543

0 commit comments

Comments
 (0)