Commit d12c585

MongoDB query problem fix and client-server module code refactor
1 parent 7bc1fac commit d12c585

19 files changed: +90 −62 lines

core/src/main/java/com/qihoo/qsql/codegen/QueryGenerator.java
Lines changed: 1 addition & 0 deletions

@@ -182,6 +182,7 @@ private static void setSpecificState(QueryGenerator generator,
      * close each engine.
      */
     public static void close() {
+        mongo = null;
         elasticSearch = null;
         hive = null;
         jdbc = null;

core/src/main/java/com/qihoo/qsql/codegen/spark/SparkJdbcGenerator.java
Lines changed: 12 additions & 4 deletions

@@ -43,6 +43,7 @@ public void importDependency() {
         "import java.sql.DriverManager",
         "import java.util.ArrayList",
         "import java.util.List",
+        "import java.math.BigInteger",
         "import org.apache.hadoop.conf.Configuration",
         "import org.apache.hadoop.fs.FileSystem",
         "import org.apache.hadoop.fs.Path",

@@ -164,6 +165,8 @@ private String declareDataTypeFormat() {
         + "            return DataTypes.FloatType;\n"
         + "        case Types.TINYINT:\n"
         + "            return DataTypes.ByteType;\n"
+        + "        case Types.NUMERIC:\n"
+        + "            return DataTypes.createDecimalType();\n"
         + "        case Types.BIGINT:\n"
         + "            return DataTypes.LongType;\n"
         + "        case Types.INTEGER:\n"

@@ -280,10 +283,15 @@ private String getWhileCodeInMethod() {
         return "        while (resultSet.next()) {\n"
             + "            Object[] rowValue = new Object[n];\n"
             + "\n"
-            + "            for (int index = 1; index <= n; index++)\n"
-            // + "            rowValue[index - 1] = resultSet.getObject(index).toString();\n"
-            + "                rowValue[index - 1] = resultSet.getObject(index);\n"
-            + "\n"
+            + "            for (int index = 1; index <= n; index++) {\n"
+            // + "            rowValue[index - 1] = resultSet.getObject(index);\n"
+            + "                Object value = resultSet.getObject(index);\n"
+            + "                if (value instanceof BigInteger) {\n"
+            + "                    rowValue[index - 1] = ((BigInteger) value).longValue();\n"
+            + "                } else {\n"
+            + "                    rowValue[index - 1] = value;\n"
+            + "                }\n"
+            + "            }\n"
             + "            rows.add(rowValue);\n"
             + "            rowCount++;\n"
             + "\n"

core/src/main/java/com/qihoo/qsql/exec/JdbcPipeline.java
Lines changed: 8 additions & 0 deletions

@@ -125,6 +125,9 @@ public static Connection createSpecificConnection(String json, List<String> pars
             case "csv":
                 LOGGER.debug("Connection to CSV server....");
                 return createCsvConnection(json);
+            case "mongo":
+                LOGGER.debug("Connection to Mongo server....");
+                return createMongoConnection(json);
             default:
                 throw new RuntimeException("Not support");
         }

@@ -503,6 +506,9 @@ Map<String, String> getConnectionInfo() {

                 connectionInfo.put("type", "jdbc");
                 break;
+            case MONGO:
+                connectionInfo.put("type", "mongo");
+                break;
             default:
                 throw new RuntimeException("Do not support this engine type: " + type);
         }

@@ -547,6 +553,8 @@ boolean visit(JsonCustomSchema schema) {
                 type = JdbcType.ELASTICSEARCH;
             } else if (schema.factory.toLowerCase().contains("csv")) {
                 type = JdbcType.CSV;
+            } else if (schema.factory.toLowerCase().contains("mongo")) {
+                type = JdbcType.MONGO;
             }
             return true;
         }

core/src/main/java/com/qihoo/qsql/exec/spark/SparkPipeline.java
Lines changed: 26 additions & 8 deletions

@@ -9,6 +9,11 @@
 import com.qihoo.qsql.exec.result.PipelineResult;
 import com.qihoo.qsql.plan.proc.LoadProcedure;
 import com.qihoo.qsql.plan.proc.QueryProcedure;
+import com.qihoo.qsql.utils.PropertiesReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -44,32 +49,45 @@ public void run() {
     }

     private SparkSession session() {
-        //TODO
         if (System.getenv("SPARK_HOME") != null) {
             SparkSession sc;
-            if (builder.getEnableHive()) {
+            if (this.builder.getEnableHive()) {
                 sc = SparkSession.builder()
-                    .appName(builder.getAppName())
-                    .master(builder.getMaster())
+                    .config(loadSparkConf())
+                    .appName(this.builder.getAppName())
+                    .master(this.builder.getMaster())
                     .enableHiveSupport()
                     .getOrCreate();
             } else {
                 sc = SparkSession.builder()
-                    .appName(builder.getAppName())
-                    .master(builder.getMaster())
+                    .config(loadSparkConf())
+                    .appName(this.builder.getAppName())
+                    .master(this.builder.getMaster())
                     .getOrCreate();
             }
             LOGGER
-                .debug("Initialize SparkContext successfully, App name: {}", builder.getAppName());
+                .debug("Initialize SparkContext successfully, App name: {}", this.builder.getAppName());
             return sc;
-
         } else {
             LOGGER.error(
                 "Initialize SparkContext failed, the reason for which is not find spark env");
             throw new RuntimeException("No available Spark to execute. Please deploy Spark and put SPARK_HOME in env");
         }
     }

+    private SparkConf loadSparkConf() {
+        SparkConf conf = new SparkConf();
+        Properties properties =
+            PropertiesReader.readProperties("quicksql-runner.properties", this.getClass());
+        Map<String, String> map = new HashMap<String, String>((Map) properties);
+        map.forEach((key, value) -> {
+            if (key.startsWith("spark")) {
+                conf.set(key, value);
+            }
+        });
+        return conf;
+    }
+
     @Override
     public Object collect() {
         Requirement requirement = compileRequirement(buildWrapper().collect(builder.getAcceptedResultsNum()), session(),
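
To see what the new loadSparkConf() contributes, here is a rough, self-contained rehearsal of its filtering step. The spark.* keys are taken from quicksql-runner.properties as it appears later in this commit; the non-spark key and all values are illustrative:

import java.util.Properties;
import org.apache.spark.SparkConf;

// Illustrative only: mimics loadSparkConf() on an in-memory Properties object
// instead of reading quicksql-runner.properties from the classpath.
public class SparkConfFilterDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("spark.driver.userClassPathFirst", "true"); // kept
        props.setProperty("spark.sql.crossJoin.enabled", "true");     // kept
        props.setProperty("meta.storage.mode", "intern");             // dropped: no "spark" prefix

        SparkConf conf = new SparkConf();
        props.stringPropertyNames().stream()
            .filter(key -> key.startsWith("spark"))
            .forEach(key -> conf.set(key, props.getProperty(key)));

        System.out.println(conf.toDebugString()); // prints only the spark.* entries
    }
}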

core/src/main/java/com/qihoo/qsql/launcher/ArgumentsSupplier.java
Lines changed: 0 additions & 22 deletions

@@ -8,7 +8,6 @@
 import java.util.List;
 import java.util.Properties;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.StringUtils;

 /**
  * Generate Spark execution command.

@@ -61,11 +60,6 @@ public List<String> assemblySparkOptions() {
     private List<String> loadSparkConf() {
         Properties properties =
             PropertiesReader.readProperties("quicksql-runner.properties", this.getClass());
-        //only mongo query job need set 'spark.mongodb.input.uri' parameter.
-        if (builder.getRunnerProperties().size() > 0 && builder.getRunnerProperties().getProperty("dbType")
-            .equalsIgnoreCase("mongo")) {
-            properties.put("spark.mongodb.input.uri", constructMongoUrl(builder.getRunnerProperties()));
-        }
         return properties.entrySet().stream()
             .filter(conf -> conf.getKey().toString().startsWith("spark"))
             .map(conf -> conf.getKey() + "=" + conf.getValue())

@@ -113,20 +107,4 @@ public List<String> assemblyFlinkOptions() {
         arguments.add(parser.getOptionValue(OptionsParser.SubmitOption.RUNNER));
         return arguments;
     }
-
-
-    protected String constructMongoUrl(Properties properties) {
-        //mongodb url like "mongodb://user:pass@localhost:27017/dbName.collectionName")
-        StringBuilder mongoUrl = new StringBuilder();
-        mongoUrl.append("mongodb://")
-            .append(StringUtils.isNotEmpty(properties.getProperty("userName")) ? properties.getProperty("userName")
-                + ":" : "")
-            .append(StringUtils.isNotEmpty(properties.getProperty("password"))
-                ? properties.getProperty("password") : "")
-            .append("@" + properties.getProperty("host"))
-            .append(":" + properties.getProperty("port"))
-            .append("/" + properties.getProperty("dbName"))
-            .append("." + properties.getProperty("collectionName"));
-        return mongoUrl.toString();
-    }
 }

core/src/main/java/com/qihoo/qsql/plan/SubtreeSyncopator.java
Lines changed: 4 additions & 3 deletions

@@ -216,7 +216,7 @@ private boolean neededToSyncopate(RelNode parent, TableScan single) {

         RelOptTableImpl singleImpl = ((RelOptTableImpl) single.getTable());
         //TODO if runner is DEFAULT, it should be operated dynamically
-        if (runnerFuncTable.getRunner() == RunnerType.SPARK
+        if (builder.getRunner() == RunnerType.SPARK
             && singleImpl.getTable() instanceof ElasticsearchTranslatableTable) {
             pruneSubtree(parent, single, 0);
             return true;

@@ -235,7 +235,8 @@ private boolean commonNeededToSyncopate(RelNode parent, TableScan single) {
         }

         RelOptTableImpl singleImpl = ((RelOptTableImpl) single.getTable());
-        if (singleImpl.getTable() instanceof MongoTable) {
+        if (singleImpl.getTable() instanceof MongoTable
+            && builder.getRunner() == RunnerType.SPARK) {
             pruneSubtree(parent, single, 0);
             return true;
         }

@@ -273,7 +274,7 @@ private boolean shouldBeDivided(RelOptTable left, RelOptTable right) {
         Table leftTable = leftImpl.getTable();
         Table rightTable = rightImpl.getTable();

-        return notSupportedBinOp(leftTable, leftTable)
+        return notSupportedBinOp(leftTable, rightTable)
             || isDiffFromEachOther(leftTable, rightTable);
     }

core/src/main/resources/quicksql-runner.properties
Lines changed: 1 addition & 0 deletions

@@ -15,3 +15,4 @@ spark.driver.userClassPathFirst=true
 # spark.sql.crossJoin.enabled=true
 # spark.speculation=true
 # spark.sql.files.maxPartitionBytes=134217728
+spark.mongodb.input.uri=mongodb://Guru99:password@10.209.242.189:7774/test.products
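
The value follows the connection-string shape that MongoTable.constructMongoUrl() (further down in this commit) assembles from the schema operand, i.e. mongodb://<userName>:<password>@<host>:<port>/<dbName>.<collectionName>; the credentials and address committed here are sample values.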

metastore/win32-x86/schema.db

Whitespace-only changes.

parser/src/main/java/com/qihoo/qsql/org/apache/calcite/adapter/mongodb/MongoSchemaFactory.java
Lines changed: 2 additions & 1 deletion

@@ -26,6 +26,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.StringUtils;

 /**
  * Factory that creates a {@link MongoSchema}.

@@ -47,7 +48,7 @@ public Schema create(SchemaPlus parentSchema, String name,
     final MongoClientOptions.Builder options = MongoClientOptions.builder();

     final List<MongoCredential> credentials = new ArrayList<>();
-    if (authMechanismName != null) {
+    if (StringUtils.isNotBlank(authMechanismName)) {
       final MongoCredential credential = createCredentials(operand);
       credentials.add(credential);
     }
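
The switch from a bare null check to StringUtils.isNotBlank matters when the auth mechanism key is present in the schema operand but empty; presumably that is the motivation here. A quick illustration of the commons-lang3 semantics:

import org.apache.commons.lang3.StringUtils;

// isNotBlank rejects null, empty, and whitespace-only strings, so an empty
// "authMechanism" entry no longer triggers credential creation the way the
// old (authMechanismName != null) check did.
public class AuthCheckDemo {
    public static void main(String[] args) {
        System.out.println(StringUtils.isNotBlank(null));           // false
        System.out.println(StringUtils.isNotBlank(""));             // false (old check passed here)
        System.out.println(StringUtils.isNotBlank("   "));          // false (and here)
        System.out.println(StringUtils.isNotBlank("SCRAM-SHA-1"));  // true
    }
}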

parser/src/main/java/com/qihoo/qsql/org/apache/calcite/adapter/mongodb/MongoTable.java
Lines changed: 19 additions & 0 deletions

@@ -40,6 +40,7 @@
 import com.qihoo.qsql.org.apache.calcite.schema.TranslatableTable;
 import com.qihoo.qsql.org.apache.calcite.schema.impl.AbstractTableQueryable;
 import com.qihoo.qsql.org.apache.calcite.util.Util;
+import org.apache.commons.lang3.StringUtils;
 import org.bson.BsonDocument;
 import org.bson.Document;
 import org.bson.conversions.Bson;

@@ -57,9 +58,27 @@ public class MongoTable extends AbstractQueryableTable
     private Map<String, Object> operand;

     public Properties getProperties() {
+        Properties properties = new Properties();
+        operand.forEach((key, value) -> properties.put(key, value.toString()));
+        properties.put("spark.mongodb.input.uri", constructMongoUrl(properties));
         return properties;
     }

+    protected String constructMongoUrl(Properties properties) {
+        // mongodb url like "mongodb://user:pass@localhost:27017/dbName.collectionName"
+        StringBuilder mongoUrl = new StringBuilder();
+        mongoUrl.append("mongodb://")
+            .append(StringUtils.isNotEmpty(properties.getProperty("userName")) ? properties.getProperty("userName")
+                + ":" : "")
+            .append(StringUtils.isNotEmpty(properties.getProperty("password"))
+                ? properties.getProperty("password") : "")
+            .append("@" + properties.getProperty("host"))
+            .append(":" + properties.getProperty("port"))
+            .append("/" + properties.getProperty("dbName"))
+            .append("." + properties.getProperty("collectionName"));
+        return mongoUrl.toString();
+    }
+
     /**
      * Creates a MongoTable.
     */
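
A quick worked example of the URI this method builds and that getProperties() now injects under spark.mongodb.input.uri. The operand values are made up; only the key names come from the code above. Note that the "@" separator is appended even when userName and password are absent:

import java.util.Properties;
import org.apache.commons.lang3.StringUtils;

// Self-contained rehearsal of the same string-building logic as constructMongoUrl().
public class MongoUrlDemo {
    static String constructMongoUrl(Properties p) {
        return "mongodb://"
            + (StringUtils.isNotEmpty(p.getProperty("userName")) ? p.getProperty("userName") + ":" : "")
            + (StringUtils.isNotEmpty(p.getProperty("password")) ? p.getProperty("password") : "")
            + "@" + p.getProperty("host")
            + ":" + p.getProperty("port")
            + "/" + p.getProperty("dbName")
            + "." + p.getProperty("collectionName");
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("userName", "user");
        props.setProperty("password", "pass");
        props.setProperty("host", "localhost");
        props.setProperty("port", "27017");
        props.setProperty("dbName", "test");
        props.setProperty("collectionName", "products");
        System.out.println(constructMongoUrl(props)); // mongodb://user:pass@localhost:27017/test.products
    }
}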
