Skip to content
This repository has been archived by the owner on Feb 15, 2022. It is now read-only.

Commit

Permalink
FIX: NullPointerException
Browse files Browse the repository at this point in the history
  • Loading branch information
chenqi0805 committed Oct 20, 2020
1 parent a62d2c4 commit dcb9f51
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -19,11 +23,18 @@
public class ElasticsearchReader {
private String scrollId = null;
private final String indexPattern;
private final SearchSourceBuilder searchSourceBuilder;
private final TimeValue keepAlive = TimeValue.timeValueMinutes(1);
private long total;

public ElasticsearchReader(final String indexPattern) {
public ElasticsearchReader(final String indexPattern, final String field, final String value) {
this.indexPattern = indexPattern;
if (field != null && value != null) {
searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery(field, value));
} else {
searchSourceBuilder = null;
}
}

public long getTotal() {
Expand All @@ -34,6 +45,9 @@ public List<Map<String, Object>> nextBatch(final RestHighLevelClient restHighLev
if (scrollId == null) {
final SearchRequest searchRequest = new SearchRequest(indexPattern);
searchRequest.scroll(keepAlive);
if (searchSourceBuilder != null) {
searchRequest.source(searchSourceBuilder);
}
final SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// Update total hits
total = searchResponse.getHits().getTotalHits().value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@ public class ZipkinElasticToOtel {
public static final Logger LOG = LoggerFactory.getLogger(ZipkinElasticToOtel.class);

public static void main(final String[] args) throws IOException {
if (args.length != 1) {
if (args.length < 1) {
System.err.println("Missing indexPattern as arg");
System.exit(1);
}
final String indexPattern = args[0];
final String field = args.length >= 2? args[1] : null;
final String value = args.length >= 3? args[2] : null;
final boolean isTest = !System.getProperty("test", "false").equalsIgnoreCase("false");
OTelTraceSource oTelTraceSource = null;
if (isTest) {
Expand All @@ -39,7 +41,7 @@ public static void main(final String[] args) throws IOException {
.withPassword("admin")
.build();
final RestHighLevelClient restHighLevelClient = connectionConfiguration.createClient();
final ElasticsearchReader reader = new ElasticsearchReader(indexPattern);
final ElasticsearchReader reader = new ElasticsearchReader(indexPattern, field, value);
final TraceServiceGrpc.TraceServiceBlockingStub client = createGRPCClient();
System.out.println("Reading batch 0");
List<Map<String, Object>> sources = reader.nextBatch(restHighLevelClient);
Expand All @@ -52,7 +54,7 @@ public static void main(final String[] args) throws IOException {
final ExportTraceServiceRequest exportTraceServiceRequest = ZipkinElasticToOtelProcessor.sourcesToRequest(sources);
client.export(exportTraceServiceRequest);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
e.printStackTrace();
}
System.out.println(String.format("Reading batch %d", i+1));
sources = reader.nextBatch(restHighLevelClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ public static Span sourceToSpan(final Map<String, Object> source) {
final String spanKind = (String) source.get(SPAN_KIND);
// TODO: read span status from tags
final Map<String, Object> tags = (Map<String, Object>) source.get(TAGS);
final Integer statusCode = extractStatusCodeFromTags(tags);
Integer statusCode = null;
if (tags != null) {
statusCode = extractStatusCodeFromTags(tags);
}

final Span.Builder spanBuilder = Span.newBuilder()
.setStartTimeUnixNano(startTime * 1000) // Convert to UnixNano
Expand Down Expand Up @@ -103,9 +106,11 @@ public static Integer extractStatusCodeFromTags(final Map<String, Object> tags)
// Extract only if value is json string
try {
final Map<String, Object> value = mapper.readValue((String) entry.getValue(), typeRef);
final Integer statusCodeValue = (Integer) value.get(STATUS_CODE_VALUE);
if (statusCodeValue != null) {
return statusCodeValue;
if (value != null) {
final Integer statusCodeValue = (Integer) value.get(STATUS_CODE_VALUE);
if (statusCodeValue != null) {
return statusCodeValue;
}
}
} catch (final JsonProcessingException ignored) { }
}
Expand Down

0 comments on commit dcb9f51

Please sign in to comment.