Skip to content

Commit a1d5fe3

Browse files
author
hemiao
committed
JsonArray数组
1 parent e1e2891 commit a1d5fe3

File tree

2 files changed

+153
-4
lines changed

2 files changed

+153
-4
lines changed

moql-querier/src/main/java/org/datayoo/moql/querier/es/EsDataQuerier.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import java.io.IOException;
2828
import java.util.*;
29+
import java.util.stream.Collectors;
2930

3031
public class EsDataQuerier implements DataQuerier {
3132

@@ -370,18 +371,58 @@ protected EntityMap toQueryEntityMap(JsonObject jsonObject) {
370371
return new EntityMapImpl(record);
371372
}
372373

373-
374374
protected void toMap(JsonObject jsonObject, Map<String, Object> record) {
375375
for (Map.Entry<String, JsonElement> entry : jsonObject.entrySet()) {
376376
if (entry.getKey().equals("_source")) {
377377
toMap((JsonObject) entry.getValue(), record);
378378
continue;
379379
}
380380
Object value = entry.getValue();
381-
if (entry.getValue() instanceof JsonPrimitive) {
381+
if (value instanceof JsonArray) {
382+
toArrayRecord(entry.getKey(), (JsonArray) entry.getValue(), record);
383+
} else if (value instanceof JsonObject) {
384+
value = value.toString();
385+
record.put(entry.getKey(), value);
386+
} else if (entry.getValue() instanceof JsonPrimitive) {
382387
value = getValue((JsonPrimitive) entry.getValue());
388+
record.put(entry.getKey(), value);
389+
}
390+
}
391+
}
392+
393+
protected void toArrayRecord(String prefix, JsonArray array,
394+
Map<String, Object> record) {
395+
// 先把旧的数据集加入结果
396+
record.put(prefix, array.toString());
397+
Map<String, JsonArray> arrayMap = new HashMap<>();
398+
for (JsonElement element : array) {
399+
if (element instanceof JsonObject) {
400+
JsonObject jsonObject = (JsonObject) element;
401+
Set<Map.Entry<String, JsonElement>> set = jsonObject.entrySet();
402+
403+
for (Map.Entry<String, JsonElement> entry : set) {
404+
String key = prefix + "." + entry.getKey();
405+
if (arrayMap.get(key) == null) {
406+
JsonArray jsonArray = new JsonArray();
407+
jsonArray.add(entry.getValue());
408+
arrayMap.put(key, jsonArray);
409+
} else {
410+
JsonArray value = arrayMap.get(key);
411+
value.add(entry.getValue());
412+
arrayMap.put(key, value);
413+
}
414+
}
383415
}
384-
record.put(entry.getKey(), value);
416+
if (element instanceof JsonPrimitive) {
417+
if (record.get(prefix) == null) {
418+
Object value = getValue((JsonPrimitive) element);
419+
record.put(prefix, value);
420+
}
421+
422+
}
423+
}
424+
for (Map.Entry<String, JsonArray> entry : arrayMap.entrySet()) {
425+
toArrayRecord(entry.getKey(), entry.getValue(), record);
385426
}
386427
}
387428

@@ -478,7 +519,8 @@ protected Object getValue(JsonPrimitive value) {
478519
protected Object[] toRecord(Operand[] operands, EntityMap entityMap) {
479520
Object[] record = new Object[operands.length];
480521
for (int i = 0; i < operands.length; i++) {
481-
record[i] = operands[i].operate(entityMap);
522+
record[i] = entityMap.getEntity(operands[i].getName());
523+
// record[i] = operands[i].operate(entityMap);
482524
}
483525
return record;
484526
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package org.datayoo.moql.querier.es;
2+
3+
import junit.framework.TestCase;
4+
import org.apache.commons.lang3.StringUtils;
5+
import org.apache.http.HttpHost;
6+
import org.apache.http.auth.AuthScope;
7+
import org.apache.http.auth.UsernamePasswordCredentials;
8+
import org.apache.http.client.CredentialsProvider;
9+
import org.apache.http.impl.client.BasicCredentialsProvider;
10+
import org.datayoo.moql.RecordSet;
11+
import org.elasticsearch.client.RestClient;
12+
import org.elasticsearch.client.RestClientBuilder;
13+
import org.elasticsearch.client.RestHighLevelClient;
14+
15+
import java.io.IOException;
16+
import java.util.*;
17+
18+
public class EsQueryTest extends TestCase {
19+
protected RestHighLevelClient restHighLevelClient;
20+
21+
public void test01() throws IOException {
22+
open();
23+
String sql = "select titleWords.index,titleWords.term from zz_news_entity-202205";
24+
EsDataQuerier esDataQuerier = new EsDataQuerier();
25+
esDataQuerier.bind(restHighLevelClient.getLowLevelClient());
26+
27+
String[] array = sql.split("(?i)from");
28+
String tableNames = array[array.length - 1].trim().split(" ")[0];
29+
30+
sql = sql.replace(tableNames, "table");
31+
Properties properties = new Properties();
32+
Properties indexNameMappings = new Properties();
33+
indexNameMappings.put("table", tableNames);
34+
properties.put(EsDataQuerier.INDEX_NAME_MAPPINGS, indexNameMappings);
35+
RecordSet recordSet = esDataQuerier.query(sql, properties);
36+
37+
System.out.println(recordSet.getRecords().size());
38+
close();
39+
}
40+
41+
public void open() throws IOException {
42+
if (restHighLevelClient != null) {
43+
return;
44+
}
45+
46+
String url = "172.30.30.4:19200";
47+
48+
List<Map<String, Object>> address = new LinkedList<>();
49+
if (StringUtils.isNotEmpty(url)) {
50+
String[] addressArray = StringUtils.split(url, ",");
51+
for (String addr : addressArray) {
52+
String host = addr.split(":")[0];
53+
String port = addr.split(":")[1];
54+
Map<String, Object> connectionInfo = new HashMap<>();
55+
connectionInfo.put("host", host);
56+
connectionInfo.put("port", Integer.valueOf(port));
57+
address.add(connectionInfo);
58+
}
59+
} else {
60+
Map<String, Object> connectionInfo = new HashMap<>();
61+
connectionInfo.put("host", "172.30.30.4");
62+
connectionInfo.put("port", 9200);
63+
address.add(connectionInfo);
64+
}
65+
restHighLevelClient = restHighLevelClient(address, "", "");
66+
}
67+
68+
public static RestHighLevelClient restHighLevelClient(
69+
List<Map<String, Object>> address, String username, String password) {
70+
// 拆分地址
71+
List<HttpHost> hostLists = new ArrayList<>();
72+
for (Map<String, Object> hostInfo : address) {
73+
String host = (String) hostInfo.get("host");
74+
int port = (int) hostInfo.get("port");
75+
hostLists.add(new HttpHost(host, port, "http"));
76+
}
77+
// 转换成 HttpHost 数组
78+
HttpHost[] httpHost = hostLists.toArray(new HttpHost[] {});
79+
// 构建连接对象
80+
RestClientBuilder builder = RestClient.builder(httpHost);
81+
if (StringUtils.isNotEmpty(username)) {
82+
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
83+
credentialsProvider.setCredentials(AuthScope.ANY,
84+
new UsernamePasswordCredentials(username, password));
85+
builder.setHttpClientConfigCallback(
86+
f -> f.setDefaultCredentialsProvider(credentialsProvider));
87+
}
88+
89+
// socket通信超时时间
90+
// 异步连接延时配置
91+
builder.setRequestConfigCallback(requestConfigBuilder -> {
92+
requestConfigBuilder.setConnectTimeout(300 * 1000);
93+
requestConfigBuilder.setSocketTimeout(300 * 1000);
94+
requestConfigBuilder.setConnectionRequestTimeout(300 * 1000);
95+
return requestConfigBuilder;
96+
});
97+
98+
return new RestHighLevelClient(builder);
99+
}
100+
101+
public void close() throws IOException {
102+
if (restHighLevelClient != null) {
103+
restHighLevelClient.close();
104+
restHighLevelClient = null;
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)