Skip to content

Commit 7a3d157

Browse files
Merge pull request #202 from robinhood-jim/develop
Develop
2 parents 667b630 + de3befe commit 7a3d157

20 files changed

+581
-144
lines changed

README.md

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Slightly Framework design to support Spring based java or Bigdata program.
5252
PARQUET
5353
ORC
5454
PROTOBUF
55+
Apache Arrow
5556
CSV
5657
XML
5758
JSON
@@ -70,21 +71,24 @@ Slightly Framework design to support Spring based java or Bigdata program.
7071
RabbitMq
7172
7273
V. Iterable and writable support integrating storage and file format
73-
mix storage and File format to support cross storage read/write
74+
mix storage and File format to support cross storage read/write
75+
76+
VI. Data file Sql filter support
77+
All file formats support single-table new column generation and sql filtering using normal sql, parsed using Calcite core
7478

75-
VI. Spring cloud support
79+
VII. Spring cloud support
7680
WebUI simple webui base on dhtmlxGrid 5.1 with spring boot native
7781
related project in my another project microservices
7882

79-
VII. Zipkin Intergation
83+
VIII. Zipkin Integration
8084
trace sub project aimed to make all databases traceable and able to record query parameters.
8185

82-
VIII.Dataming support
86+
IX. Data mining support
8387
Support weka data mining tools
8488
Support simile data mining tools
8589
Support spark MLlib data mining tools
8690

87-
IX. Special feature
91+
X. Special feature
8892
a. A user-defined xml query config system, similar to mybatis, but easy to configure.
8993
b. Support user-defined annotations or jpa annotations in JdbcDao with ORM.
9094
c. BaseAnnotationService can access the DB with minimal code, and use transactions via annotation.

README.zh.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ Spring Boot 支持
3636

3737
支持parquet/orc/avro/protobuf/csv/xml/json 等文件格式
3838
与通用文件系统和云存储访问整合,支持压缩格式
39+
大数据通用文件格式Sql支持
40+
支持以上的文件格式基于单表的sql 新字段生成和过滤,支持基本函数,有大数据量下的测试,可以为不同云服务之间有条件的数据交换提供支持
3941

4042
数据挖掘平台支持
4143

common/src/main/java/com/robin/comm/sql/CommRecordGenerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -348,9 +348,9 @@ private static boolean walkTree(SqlNode node, Map<String, Object> inputMap, SqlS
348348
return runValue;
349349
}
350350
protected static void checkColumnNumeric(SqlSegment segment,String columnName){
351-
Assert.isTrue(Const.META_TYPE_INTEGER.equals(segment.getOriginSchemaMap().get(columnName))
352-
|| Const.META_TYPE_BIGINT.equals(segment.getOriginSchemaMap().get(columnName))
353-
|| Const.META_TYPE_DOUBLE.equals(segment.getOriginSchemaMap().get(columnName)),"require numeric column");
351+
Assert.isTrue(Const.META_TYPE_INTEGER.equals(segment.getOriginSchemaMap().get(columnName).getColumnType())
352+
|| Const.META_TYPE_BIGINT.equals(segment.getOriginSchemaMap().get(columnName).getColumnType())
353+
|| Const.META_TYPE_DOUBLE.equals(segment.getOriginSchemaMap().get(columnName).getColumnType()),"require numeric column");
354354
}
355355

356356
private static Object getValueWithCalculate(Map<String, Object> inputMap, SqlSegment segment, SqlNode nodes) {

common/src/main/java/com/robin/comm/sql/CommSqlParser.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ public static SqlSegment parseGroupByAgg(String sql, Lex lex, DataCollectionMeta
7272
SqlNode havingNode = sqlSelect.getHaving();
7373
segment.setHavingCause(havingNode);
7474
parseWhereParts(segment, havingNode, segment.getHaving());
75-
//List<DataSetColumnMeta> calculateSchema = getCalculateSchema(segment, meta);
7675
return segment;
7776
}catch (SqlParseException|ValidationException ex){
7877
throw new GenericException(ex);
@@ -184,7 +183,7 @@ private static List<ValueParts> parseSelectColumn(SqlSegment segment, SqlNodeLis
184183
else if (SqlBasicCall.class.isAssignableFrom(columnNodes.get(0).getClass())) {
185184
valueParts.setNodeString(columnNodes.get(0).toString());
186185
valueParts.setCalculator(columnNodes.get(0));
187-
segment.setHasFourOperations(true);
186+
segment.setSelectHasFourOperations(true);
188187
setAliasName(newColumnPrefix, newColumnPosMap, valueParts);
189188
}
190189
} else if (SqlBasicCall.class.isAssignableFrom(selected.getClass())) {
@@ -286,23 +285,26 @@ private static void parseWhereParts(SqlSegment segment, SqlNode whereNode,List<C
286285
}
287286
} else {
288287
List<SqlNode> nodes = ((SqlBasicCall) whereNode).getOperandList();
289-
for (SqlNode node : nodes) {
288+
for (int i=0;i<nodes.size();i++) {
289+
SqlNode node=nodes.get(i);
290290
String calculator = node.toString().replace(Quoting.BACK_TICK.string, "");
291291
if (FilterSqlParser.fourZeOper.matcher(calculator).find()) {
292292
ValueParts parts = new ValueParts();
293293
parts.setCalculator(node);
294294
parts.setNodeString(node.toString());
295295
parts.setAliasName(returnDefaultNewColumn(segment.getNewColumnPrefix(), segment.getNewColumnPosMap()));
296+
segment.setConditionHasFourOperations(true);
296297
newColumns.add(parts);
297298
}else if(SqlKind.FUNCTION.contains(node.getKind())){
298299
List<SqlNode> nodes1=((SqlBasicCall)node).getOperandList();
299300
ValueParts parts = new ValueParts();
300301
parts.setFunctionName(((SqlBasicCall) node).getOperator().getName());
301302
parts.setFunctionParams(nodes1);
302303
parts.setSqlKind(node.getKind());
304+
segment.setConditionHasFunction(true);
303305
newColumns.add(parts);
304-
}else if(SqlKind.LITERAL.equals(node.getKind())){
305-
306+
}else if(SqlKind.IDENTIFIER.equals(node.getKind()) && i==nodes.size()-1){
307+
segment.setHasRightColumnCmp(true);
306308
}
307309
}
308310
}

common/src/main/java/com/robin/comm/sql/SqlSegment.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import com.robin.core.fileaccess.meta.DataSetColumnMeta;
44
import lombok.Data;
5-
import org.apache.calcite.sql.SqlIdentifier;
65
import org.apache.calcite.sql.SqlNode;
76
import org.apache.commons.lang3.tuple.Pair;
87

@@ -29,7 +28,9 @@ public class SqlSegment {
2928
private List<CommSqlParser.ValueParts> having=new ArrayList<>();
3029
private Map<String,DataSetColumnMeta> originSchemaMap;
3130
// if filterSql has four operations,orc and parquet can not use filter directly
32-
private boolean hasFourOperations = false;
31+
private boolean selectHasFourOperations = false;
32+
private boolean conditionHasFourOperations=false;
33+
private boolean conditionHasFunction=false;
3334
// if filterSql compare right hand is column,orc and parquet can not use filter directly
3435
private boolean hasRightColumnCmp = false;
3536
}

common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ public abstract class AbstractFileIterator implements IResourceIterator {
6464
//calculate column run async, so use concurrent
6565
protected Map<String, Object> newRecord = new ConcurrentHashMap<>();
6666
// filterSql parse compare tree
67-
protected CompareNode rootNode = null;
6867
protected String defaultNewColumnPrefix = "N_COLUMN";
6968
protected DateTimeFormatter formatter=DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
69+
//if using BufferedReader as input.only csv json format must set this to true
70+
protected boolean useBufferedReader=false;
7071

7172

7273
protected SqlSegment segment;
@@ -104,9 +105,14 @@ public void beforeProcess() {
104105
checkAccessUtil(colmeta.getPath());
105106
Assert.notNull(accessUtil, "ResourceAccessUtil is required!");
106107
try {
107-
Pair<BufferedReader, InputStream> pair = accessUtil.getInResourceByReader(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
108-
this.reader = pair.getKey();
109-
this.instream = pair.getValue();
108+
if(useBufferedReader){
109+
Pair<BufferedReader, InputStream> pair = accessUtil.getInResourceByReader(colmeta, ResourceUtil.getProcessPath(colmeta.getPath()));
110+
this.reader = pair.getKey();
111+
this.instream = pair.getValue();
112+
}else{
113+
this.instream=accessUtil.getInResourceByStream(colmeta,ResourceUtil.getProcessPath(colmeta.getPath()));
114+
}
115+
110116
} catch (Exception ex) {
111117
logger.error("{}", ex.getMessage());
112118
}

common/src/main/java/com/robin/core/fileaccess/iterator/JsonFileIterator.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,18 @@ public class JsonFileIterator extends AbstractFileIterator{
3232
private JsonReader jreader=null;
3333
public JsonFileIterator(){
3434
identifier= Const.FILEFORMATSTR.JSON.getValue();
35+
useBufferedReader=true;
3536
}
3637
public JsonFileIterator(DataCollectionMeta metaList) {
3738
super(metaList);
3839
identifier= Const.FILEFORMATSTR.JSON.getValue();
40+
useBufferedReader=true;
3941
}
4042
public JsonFileIterator(DataCollectionMeta metaList,AbstractFileSystemAccessor accessor) {
4143
super(metaList);
4244
identifier= Const.FILEFORMATSTR.JSON.getValue();
4345
accessUtil=accessor;
46+
useBufferedReader=true;
4447
}
4548
@Override
4649
public void beforeProcess() {
Lines changed: 58 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
* Copyright (c) 2015,robinjim(robinjim@126.com)
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
@@ -25,64 +25,66 @@
2525

2626
import java.io.IOException;
2727

28-
public class PlainTextFileIterator extends AbstractFileIterator{
29-
protected String readLineStr=null;
30-
protected String split=",";
31-
public PlainTextFileIterator(){
32-
identifier= Const.FILEFORMATSTR.CSV.getValue();
33-
}
34-
public PlainTextFileIterator(DataCollectionMeta metaList) {
35-
super(metaList);
36-
identifier= Const.FILEFORMATSTR.CSV.getValue();
37-
}
38-
public PlainTextFileIterator(DataCollectionMeta metaList,AbstractFileSystemAccessor accessor) {
39-
super(metaList);
40-
identifier= Const.FILEFORMATSTR.CSV.getValue();
41-
accessUtil=accessor;
42-
}
28+
public class PlainTextFileIterator extends AbstractFileIterator {
29+
protected String readLineStr = null;
30+
protected String split = ",";
31+
32+
public PlainTextFileIterator() {
33+
identifier = Const.FILEFORMATSTR.CSV.getValue();
34+
useBufferedReader=true;
35+
}
36+
37+
public PlainTextFileIterator(DataCollectionMeta metaList) {
38+
super(metaList);
39+
identifier = Const.FILEFORMATSTR.CSV.getValue();
40+
useBufferedReader=true;
41+
}
42+
43+
public PlainTextFileIterator(DataCollectionMeta metaList, AbstractFileSystemAccessor accessor) {
44+
super(metaList);
45+
identifier = Const.FILEFORMATSTR.CSV.getValue();
46+
accessUtil = accessor;
47+
useBufferedReader=true;
48+
}
49+
50+
@Override
51+
protected void pullNext() {
52+
53+
cachedValue.clear();
54+
try {
55+
if (reader != null) {
56+
readLineStr = reader.readLine();
57+
if (!ObjectUtils.isEmpty(readLineStr)) {
58+
String[] arr = StringUtils.split(readLineStr, split.charAt(0));
59+
if (arr.length >= colmeta.getColumnList().size()) {
60+
for (int i = 0; i < colmeta.getColumnList().size(); i++) {
61+
DataSetColumnMeta meta = colmeta.getColumnList().get(i);
62+
cachedValue.put(meta.getColumnName(), ConvertUtil.convertStringToTargetObject(arr[i], meta, formatter));
63+
}
64+
}
65+
}
66+
}
67+
} catch (IOException ex) {
68+
logger.error("{}", ex.getMessage());
69+
}
70+
}
4371

44-
@Override
45-
protected void pullNext() {
46-
try{
47-
cachedValue.clear();
48-
try{
49-
if(reader!=null){
50-
readLineStr=reader.readLine();
51-
if(!ObjectUtils.isEmpty(readLineStr)) {
52-
String[] arr = StringUtils.split(readLineStr, split.charAt(0));
53-
if (arr.length >= colmeta.getColumnList().size()) {
54-
for (int i = 0; i < colmeta.getColumnList().size(); i++) {
55-
DataSetColumnMeta meta = colmeta.getColumnList().get(i);
56-
cachedValue.put(meta.getColumnName(), ConvertUtil.convertStringToTargetObject(arr[i], meta, formatter));
57-
}
58-
}
59-
}
60-
}
61-
}catch(IOException ex){
62-
logger.error("{0}",ex.getMessage());
63-
}
72+
@Override
73+
public void remove() {
74+
try {
75+
if (useFilter) {
76+
hasNext();
77+
} else {
78+
reader.readLine();
79+
}
80+
} catch (Exception ex) {
6481

65-
}catch(Exception ex){
66-
logger.error("{}",ex.getMessage());
67-
}
68-
}
82+
}
83+
}
6984

70-
@Override
71-
public void remove() {
72-
try{
73-
if(useFilter) {
74-
hasNext();
75-
}else{
76-
reader.readLine();
77-
}
78-
}catch(Exception ex){
79-
80-
}
81-
}
85+
public void setSplit(String split) {
86+
this.split = split;
87+
}
8288

83-
public void setSplit(String split) {
84-
this.split = split;
85-
}
8689

87-
8890
}

core/src/main/java/com/robin/core/base/util/Const.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ public enum FILEFORMATSTR {
435435
PLAIN("txt"),
436436
CSV("csv"),
437437
PROTOBUF("proto"),
438-
438+
ARROW("arrow"),
439439
ARFF("arff");
440440
private String value;
441441
FILEFORMATSTR(String value){

0 commit comments

Comments
 (0)