Skip to content

Commit

Permalink
Merge the dev branch.
Browse files Browse the repository at this point in the history
  • Loading branch information
shaomengwang committed Apr 26, 2024
1 parent 425469a commit 76a9aed
Show file tree
Hide file tree
Showing 56 changed files with 1,328 additions and 1,273 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package com.alibaba.alink.common.insights;

import org.apache.flink.api.java.tuple.Tuple3;

import breeze.stats.distributions.LogNormal;
import org.apache.commons.math3.distribution.ChiSquaredDistribution;
import org.apache.commons.math3.distribution.ExponentialDistribution;
import org.apache.commons.math3.distribution.LogNormalDistribution;
import org.apache.commons.math3.distribution.LogisticDistribution;
import org.apache.commons.math3.distribution.NormalDistribution;
import org.apache.commons.math3.distribution.TDistribution;
import org.apache.commons.math3.distribution.UniformRealDistribution;
import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;

import java.util.Arrays;
import java.util.List;

/**
 * Goodness-of-fit helpers built on the one-sample Kolmogorov-Smirnov test.
 *
 * <p>Each {@code testXxxDistribution} method fits the named distribution to the sample and runs
 * {@link #KS_TEST} against it. Unless stated otherwise the returned value is the KS p-value.
 * Every method returns {@code 0} ("no fit") when the sample cannot be tested: a {@code null} or
 * too-small sample, or a sample from which no valid distribution parameters can be derived.
 */
public class DistributionUtil {

    public static final KolmogorovSmirnovTest KS_TEST = new KolmogorovSmirnovTest();

    /** The commons-math3 one-sample KS test rejects samples with fewer than two values. */
    private static final int MIN_SAMPLE_SIZE = 2;

    /** Largest degrees-of-freedom value tried by the chi-squared and Student-t scans. */
    private static final int MAX_DEGREES_OF_FREEDOM = 20;

    /** A p-value above this level is treated as "the sample is compatible with the distribution". */
    private static final double SIGNIFICANCE_LEVEL = 0.05;

    /**
     * Copies a list of numbers into a primitive array.
     *
     * @param dataList values to convert; must not be {@code null} or contain {@code null}
     * @return a new array holding the values in list order
     */
    public static double[] loadDataToVec(List <Number> dataList) {
        double[] datas = new double[dataList.size()];
        int i = 0;
        // Iterate instead of calling get(i) so that non-random-access lists stay linear.
        for (Number value : dataList) {
            // Going through the decimal text keeps a Float such as 0.1f as 0.1 rather than 0.10000000149...
            datas[i++] = Double.parseDouble(String.valueOf(value));
        }
        return datas;
    }

    /**
     * Computes the mean and the population standard deviation of the sample.
     *
     * @param dataList the sample; must not be {@code null}
     * @return {@code (mean, standard deviation, values as array)}; {@code (0, 0, null)} for an empty list
     */
    public static Tuple3 <Double, Double, double[]> getMeanSd(List <Number> dataList) {
        if (dataList.isEmpty()) {
            return Tuple3.of(0D, 0D, null);
        }
        double[] datas = loadDataToVec(dataList);
        double avg = Arrays.stream(datas).average().getAsDouble();
        double sd = populationSd(datas, avg);
        return Tuple3.of(avg, sd, datas);
    }

    /**
     * Tests the sample against a normal distribution with the sample's mean and standard deviation.
     *
     * @return the KS p-value, or {@code 0} if the sample is too small or has no spread
     */
    public static double testNormalDistribution(List <Number> dataList) {
        if (tooFewSamples(dataList)) {
            return 0;
        }
        Tuple3 <Double, Double, double[]> meanSd = getMeanSd(dataList);
        // NormalDistribution rejects a non-positive standard deviation (constant sample).
        if (!isPositiveFinite(meanSd.f1)) {
            return 0;
        }
        NormalDistribution distribution = new NormalDistribution(meanSd.f0, meanSd.f1);
        return KS_TEST.kolmogorovSmirnovTest(distribution, meanSd.f2);
    }

    /**
     * Tests the sample against a uniform distribution spanning the sample's minimum and maximum.
     *
     * @return the KS p-value, or {@code 0} if the sample is too small or all values are equal
     */
    public static double testUniformDistribution(List <Number> dataList) {
        if (tooFewSamples(dataList)) {
            return 0;
        }
        double[] datas = loadDataToVec(dataList);
        double lower = Arrays.stream(datas).min().getAsDouble();
        double upper = Arrays.stream(datas).max().getAsDouble();
        // UniformRealDistribution requires lower < upper; the negated form also rejects NaN bounds.
        if (!(lower < upper)) {
            return 0;
        }
        UniformRealDistribution distribution = new UniformRealDistribution(lower, upper);
        return KS_TEST.kolmogorovSmirnovTest(distribution, datas);
    }

    /**
     * Scans chi-squared distributions with 1..20 degrees of freedom.
     *
     * @return the first degrees-of-freedom value whose KS p-value exceeds 0.05, or {@code 0} if none does
     */
    public static double testChiSquaredDistribution(List <Number> dataList) {
        if (tooFewSamples(dataList)) {
            return 0;
        }
        double[] datas = loadDataToVec(dataList);
        for (int df = 1; df <= MAX_DEGREES_OF_FREEDOM; df++) {
            ChiSquaredDistribution distribution = new ChiSquaredDistribution(df);
            if (KS_TEST.kolmogorovSmirnovTest(distribution, datas) > SIGNIFICANCE_LEVEL) {
                return df;
            }
        }
        return 0;
    }

    /**
     * Tests the sample against an exponential distribution whose mean is the sample mean.
     *
     * @return the KS p-value, or {@code 0} if the sample is too small or its mean is not positive
     */
    public static double testExpDistribution(List <Number> dataList) {
        if (tooFewSamples(dataList)) {
            return 0;
        }
        double[] datas = loadDataToVec(dataList);
        double avg = Arrays.stream(datas).average().getAsDouble();
        // ExponentialDistribution rejects a non-positive mean.
        if (!isPositiveFinite(avg)) {
            return 0;
        }
        ExponentialDistribution distribution = new ExponentialDistribution(avg);
        return KS_TEST.kolmogorovSmirnovTest(distribution, datas);
    }

    /**
     * Scans Student-t distributions with 1..20 degrees of freedom.
     *
     * @return the first degrees-of-freedom value whose KS p-value exceeds 0.05, or {@code 0} if none does
     */
    public static double testTDistribution(List <Number> dataList) {
        if (tooFewSamples(dataList)) {
            return 0;
        }
        double[] datas = loadDataToVec(dataList);
        for (int df = 1; df <= MAX_DEGREES_OF_FREEDOM; df++) {
            TDistribution distribution = new TDistribution(df);
            if (KS_TEST.kolmogorovSmirnovTest(distribution, datas) > SIGNIFICANCE_LEVEL) {
                return df;
            }
        }
        return 0;
    }

    /**
     * Tests the sample against a logistic distribution matched to the sample's mean and standard deviation.
     *
     * @return the KS p-value, or {@code 0} if the sample is too small or has no spread
     */
    public static double testLogisticDistribution(List <Number> dataList) {
        if (tooFewSamples(dataList)) {
            return 0;
        }
        Tuple3 <Double, Double, double[]> meanSd = getMeanSd(dataList);
        // A logistic distribution with scale s has standard deviation s * pi / sqrt(3),
        // so the scale matching the sample is sd * sqrt(3) / pi, not sd itself.
        double scale = meanSd.f1 * Math.sqrt(3) / Math.PI;
        if (!isPositiveFinite(scale)) {
            return 0;
        }
        LogisticDistribution distribution = new LogisticDistribution(meanSd.f0, scale);
        return KS_TEST.kolmogorovSmirnovTest(distribution, meanSd.f2);
    }

    /**
     * Tests the sample against a log-normal distribution fitted to the logarithms of the values.
     *
     * @return the KS p-value, or {@code 0} if the sample is too small, contains a non-positive
     *     value, or its logarithms have no spread
     */
    public static double testLogNormalDistribution(List <Number> dataList) {
        if (tooFewSamples(dataList)) {
            return 0;
        }
        double[] datas = loadDataToVec(dataList);
        double min = Arrays.stream(datas).min().getAsDouble();
        // The logarithm is only defined for strictly positive values.
        if (!(min > 0)) {
            return 0;
        }
        double[] logValues = new double[datas.length];
        for (int i = 0; i < datas.length; i++) {
            logValues[i] = Math.log(datas[i]);
        }
        double logMean = Arrays.stream(logValues).average().getAsDouble();
        // Shape is the standard deviation of the logs: the squared deviations must be averaged
        // before taking the square root.
        double logSd = populationSd(logValues, logMean);
        if (!isPositiveFinite(logSd)) {
            return 0;
        }
        LogNormalDistribution distribution = new LogNormalDistribution(logMean, logSd);
        return KS_TEST.kolmogorovSmirnovTest(distribution, datas);
    }

    /** Returns true when the sample is missing or too small for the KS test to run. */
    private static boolean tooFewSamples(List <Number> dataList) {
        return dataList == null || dataList.size() < MIN_SAMPLE_SIZE;
    }

    /** Returns true for a strictly positive, finite value (false for 0, negatives, NaN, infinity). */
    private static boolean isPositiveFinite(double value) {
        return value > 0 && value < Double.POSITIVE_INFINITY;
    }

    /** Population standard deviation (divides by n) of {@code values} around the given mean. */
    private static double populationSd(double[] values, double mean) {
        double sumSquares = 0;
        for (double value : values) {
            double deviation = value - mean;
            sumSquares += deviation * deviation;
        }
        return Math.sqrt(sumSquares / values.length);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public static class View implements Serializable {
}

/**
 * Builds the description of an insight without any column-name aliases.
 *
 * <p>Delegates to {@code of(Insight, Map)} with an empty alias map.
 *
 * @param insight the insight to describe
 * @return the description produced by the two-argument overload
 */
public static DvInsightDescription of(Insight insight) {
    Map <String, String> noAliases = new HashMap <>();
    return DvInsightDescription.of(insight, noAliases);
}

public static DvInsightDescription of(Insight insight, Map<String, String> cnNamesMap) {
InsightType type = insight.type;
int colNum = insight.layout.data.getNumCol();
int rowNum = insight.layout.data.getNumRow();
Expand All @@ -122,7 +126,7 @@ public static DvInsightDescription of(Insight insight) {
fields[i] = new Field();
fields[i].id = colNames[i];
fields[i].code = colNames[i];
fields[i].alias = colName;
fields[i].alias = cnNamesMap.getOrDefault(colName, colName);;
fields[i].abstraction = new Abstraction();
if (i == 0) {
fields[i].abstraction.aggregation = "COUNTDISTINCT";
Expand All @@ -144,7 +148,8 @@ public static DvInsightDescription of(Insight insight) {
fields[i].id = colNames[i];
fields[i].code = colNames[i];
if (i != 0) {
fields[i].alias = insight.subject.measures.get(i - 1).colName;
String colName = insight.subject.measures.get(i - 1).colName;
fields[i].alias = cnNamesMap.getOrDefault(colName, colName);
fields[i].abstraction = new Abstraction();
fields[i].abstraction.aggregation = insight.subject.measures.get(i - 1).aggr.getEnName();
}
Expand All @@ -157,7 +162,8 @@ public static DvInsightDescription of(Insight insight) {
if (i >= 1) {
if (insight.subject != null && insight.subject.measures != null
&& i - 1 < insight.subject.measures.size()) {
fields[i].alias = insight.subject.measures.get(i - 1).colName;
String colName = insight.subject.measures.get(i - 1).colName;
fields[i].alias = cnNamesMap.getOrDefault(colName, colName);
fields[i].abstraction = new Abstraction();
fields[i].abstraction.aggregation = insight.subject.measures.get(i - 1).aggr.getEnName();
}
Expand All @@ -170,7 +176,8 @@ public static DvInsightDescription of(Insight insight) {
fields[i].code = colNames[i];
if (i >= 1) {
if (insight.subject != null && insight.subject.measures != null) {
fields[i].alias = insight.subject.measures.get(0).colName;
String colName = insight.subject.measures.get(0).colName;
fields[i].alias = cnNamesMap.getOrDefault(colName, colName);
fields[i].abstraction = new Abstraction();
fields[i].abstraction.aggregation = insight.subject.measures.get(0).aggr.getEnName();
}
Expand All @@ -184,7 +191,8 @@ public static DvInsightDescription of(Insight insight) {
if (i >= 1) {
if (insight.subject != null && insight.subject.measures != null
&& i - 1 < insight.subject.measures.size()) {
fields[i].alias = insight.subject.measures.get(i - 1).colName;
String colName = insight.subject.measures.get(i - 1).colName;
fields[i].alias = cnNamesMap.getOrDefault(colName, colName);
fields[i].abstraction = new Abstraction();
fields[i].abstraction.aggregation = insight.subject.measures.get(i - 1).aggr.getEnName();
}
Expand All @@ -197,7 +205,8 @@ public static DvInsightDescription of(Insight insight) {
fields[i].code = colNames[i];
if (i == 1) {
if (insight.subject != null && insight.subject.measures != null) {
fields[i].alias = insight.subject.measures.get(0).colName;
String colName = insight.subject.measures.get(0).colName;
fields[i].alias = cnNamesMap.getOrDefault(colName, colName);
fields[i].abstraction = new Abstraction();
fields[i].abstraction.aggregation = insight.subject.measures.get(0).aggr.getEnName();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,41 @@
public class StatInsight {

/**
 * Tells whether a column type is one of the numeric types handled here:
 * INT, LONG, DOUBLE, FLOAT, SHORT, BIG_DEC or BIG_INT.
 *
 * @param type the type to check; must not be {@code null}
 * @return {@code true} if {@code type} equals one of the listed types
 */
public static boolean isNumberType(TypeInformation<?> type) {
    TypeInformation<?>[] numericTypes = {
        Types.INT, Types.LONG, Types.DOUBLE, Types.FLOAT, Types.SHORT, Types.BIG_DEC, Types.BIG_INT
    };
    for (TypeInformation<?> candidate : numericTypes) {
        if (type.equals(candidate)) {
            return true;
        }
    }
    return false;
}

/**
 * Builds a basic-statistics insight for a string column from pre-aggregated term frequencies.
 *
 * <p>Each input row is read as {@code (colName, term, frequency)}. Rows whose first field equals
 * {@code colName} are counted: the number of such rows becomes the distinct count and the sum of
 * their frequencies becomes the total count. Rows with a {@code null} first field are skipped.
 *
 * @param dataAggr operator whose output table holds the aggregated rows
 * @param colName  name of the column to summarize
 * @return an insight of type {@code BasicStat} with a one-row table
 *     {@code (distinct_count_value, count_value)} and a fixed score of 0.8
 * @throws NumberFormatException if a matching row's frequency is not an integral number
 */
public static Insight basicStatForString(LocalOperator <?> dataAggr, String colName) {
    List <Row> rows = dataAggr.getOutputTable().getRows();
    int distinctCount = 0;
    long count = 0L;
    for (Row aggregated : rows) {
        Object name = aggregated.getField(0);
        if (name == null || !name.equals(colName)) {
            continue;
        }
        distinctCount++;
        // Parse via text so the frequency may arrive as Integer or Long alike; the previous
        // extra (Integer) cast into an unused list failed on Long values.
        count += Long.parseLong(String.valueOf(aggregated.getField(2)));
    }

    Row summary = new Row(2);
    summary.setField(0, distinctCount);
    summary.setField(1, count);

    LayoutData layoutData = new LayoutData();
    layoutData.data = new MTable(new Row[] {summary}, "distinct_count_value int, count_value long");
    layoutData.title = "数据列 " + colName + " 统计数据";
    layoutData.xAxis = colName;

    Insight insight = new Insight();
    insight.type = InsightType.BasicStat;
    insight.layout = layoutData;
    insight.score = 0.8;
    return insight;
}

public static Insight basicStat(LocalOperator <?> dataAggr, String colName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@
import com.alibaba.alink.common.sql.builtin.agg.TimeSeriesAgg;
import com.alibaba.alink.common.sql.builtin.agg.VarPopUdaf;
import com.alibaba.alink.common.sql.builtin.agg.VarSampUdaf;
import com.alibaba.alink.common.sql.builtin.string.string.DateAdd;
import com.alibaba.alink.common.sql.builtin.string.string.DateDiff;
import com.alibaba.alink.common.sql.builtin.string.string.DateSub;
import com.alibaba.alink.common.sql.builtin.string.string.KeyValue;
import com.alibaba.alink.common.sql.builtin.string.string.RegExp;
import com.alibaba.alink.common.sql.builtin.string.string.RegExpExtract;
import com.alibaba.alink.common.sql.builtin.string.string.RegExpReplace;
import com.alibaba.alink.common.sql.builtin.string.string.SplitPart;
import com.alibaba.alink.common.sql.builtin.time.DataFormat;
import com.alibaba.alink.common.sql.builtin.time.FromUnixTime;
import com.alibaba.alink.common.sql.builtin.time.Now;
Expand All @@ -47,6 +55,19 @@ public static void registerUdf(TableEnvironment env) {
env.registerFunction("unix_timestamp", new UnixTimeStamp());
env.registerFunction("from_unixtime", new FromUnixTime());
env.registerFunction("date_format_ltz", new DataFormat());

env.registerFunction("split_part", new SplitPart());
env.registerFunction("keyvalue", new KeyValue());
env.registerFunction("datediff", new DateDiff());
env.registerFunction("regexp_replace", new RegExpReplace());
env.registerFunction("REGEXP_REPLACE", new RegExpReplace());
env.registerFunction("regexp", new RegExp());
env.registerFunction("REGEXP", new RegExp());
env.registerFunction("regexp_extract", new RegExpExtract());
env.registerFunction("REGEXP_EXTRACT", new RegExpExtract());
env.registerFunction("DATE_ADD", new DateAdd());
env.registerFunction("DATE_SUB", new DateSub());

}

public static void registerUdf(LocalOpCalciteSqlExecutor executor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.alibaba.alink.common.sql.builtin.string.string;

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;

/**
 * Scalar SQL function that shifts a timestamp forward by a number of days.
 *
 * <p>A day is always counted as a fixed span of 86,400,000 milliseconds. The result is rendered
 * with {@link Timestamp#toString()}.
 *
 * @author weibo zhao
 */
public class DateAdd extends ScalarFunction {

    private static final long serialVersionUID = -4716626352353627712L;

    private static final long MILLIS_PER_DAY = 86400000L;

    /**
     * Adds days to a timestamp given as text.
     *
     * @param end  timestamp text accepted by {@link Timestamp#valueOf(String)}
     * @param days number of days to add; may be negative
     * @return the shifted timestamp as text, or {@code null} if {@code end} is {@code null} or
     *     cannot be parsed
     */
    public String eval(String end, int days) {
        if (null == end) {
            return null;
        }
        try {
            return eval(Timestamp.valueOf(end), days);
        } catch (Exception ignored) {
            // Unparseable input yields SQL NULL instead of failing the query.
            return null;
        }
    }

    /**
     * Adds days to a timestamp.
     *
     * @param end  the timestamp to shift
     * @param days number of days to add; may be negative
     * @return the shifted timestamp as text, or {@code null} if {@code end} is {@code null}
     */
    public String eval(Timestamp end, int days) {
        if (null == end) {
            return null;
        }
        long shiftedMillis = end.getTime() + days * MILLIS_PER_DAY;
        Timestamp shifted = new Timestamp(shiftedMillis);
        return shifted.toString();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.alibaba.alink.common.sql.builtin.string.string;

import org.apache.flink.table.functions.ScalarFunction;

import java.sql.Timestamp;

/**
 * Scalar SQL function returning the number of whole days from {@code start} to {@code end}.
 *
 * <p>The difference is the elapsed milliseconds divided by 86,400,000 using integer division, so
 * partial days are dropped (truncation toward zero) and the result is negative when {@code end}
 * precedes {@code start}. Every overload returns {@code null} when an argument is {@code null}
 * or a text argument cannot be parsed by {@link Timestamp#valueOf(String)}.
 *
 * @author weibo zhao
 */
public class DateDiff extends ScalarFunction {

    private static final long serialVersionUID = 6298088633116239045L;

    private static final long MILLIS_PER_DAY = 86400000L;

    /** Day difference where both arguments are timestamp text. */
    public Long eval(String end, String start) {
        return eval(parseOrNull(end), parseOrNull(start));
    }

    /** Day difference where only {@code end} is timestamp text. */
    public Long eval(String end, Timestamp start) {
        return eval(parseOrNull(end), start);
    }

    /** Day difference between two timestamps; all other overloads funnel into this one. */
    public Long eval(Timestamp end, Timestamp start) {
        if (end == null || start == null) {
            return null;
        }
        long elapsedMillis = end.getTime() - start.getTime();
        return elapsedMillis / MILLIS_PER_DAY;
    }

    /** Day difference where only {@code start} is timestamp text. */
    public Long eval(Timestamp end, String start) {
        return eval(end, parseOrNull(start));
    }

    /** Parses timestamp text, mapping {@code null} and unparseable input to {@code null}. */
    private static Timestamp parseOrNull(String text) {
        if (text == null) {
            return null;
        }
        try {
            return Timestamp.valueOf(text);
        } catch (Exception ignored) {
            // Unparseable input yields SQL NULL instead of failing the query.
            return null;
        }
    }
}
Loading

0 comments on commit 76a9aed

Please sign in to comment.