Skip to content

Commit

Permalink
Union stmt support 'OutFileClause' (apache#7026)
Browse files Browse the repository at this point in the history
The union(set operation) stmt also need to analyze 'OutFileClause'.

Whether the fragment is colocate only needs to check the plan node belonging to this fragment.
  • Loading branch information
EmmyMiao87 authored Nov 6, 2021
1 parent 5ca2712 commit 3cef2fb
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public List<List<String>> getSchema() {
return schema;
}

private void analyze(Analyzer analyzer) throws UserException {
public void analyze(Analyzer analyzer, List<Expr> resultExprs) throws UserException {
if (isAnalyzed) {
// If the query stmt is rewritten, the whole stmt will be analyzed again.
// But some of fields in this OutfileClause has been changed,
Expand Down Expand Up @@ -187,13 +187,9 @@ private void analyze(Analyzer analyzer) throws UserException {
throw new AnalysisException("Must specify BROKER properties in OUTFILE clause");
}
isAnalyzed = true;
}

public void analyze(Analyzer analyzer, SelectStmt stmt) throws UserException {
analyze(analyzer);

if (isParquetFormat()) {
analyzeForParquetFormat(stmt.getResultExprs());
analyzeForParquetFormat(resultExprs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ public void analyze(Analyzer analyzer) throws UserException {
}
}
if (hasOutFileClause()) {
outFileClause.analyze(analyzer, this);
outFileClause.analyze(analyzer, resultExprs);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ public void analyze(Analyzer analyzer) throws UserException {
setOpsResultExprs_ = Expr.cloneList(resultExprs);
if (evaluateOrderBy) createSortTupleInfo(analyzer);
baseTblResultExprs = resultExprs;

if (hasOutFileClause()) outFileClause.analyze(analyzer, resultExprs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.persist.AlterRoutineLoadJobOperationLog;
import org.apache.doris.persist.AlterViewInfo;
Expand Down
15 changes: 1 addition & 14 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1222,23 +1222,10 @@ private boolean isColocateFragment(PlanFragment planFragment, PlanNode node) {
}

if (planFragment.hasColocatePlanNode()) {
colocateFragmentIds.add(planFragment.getId().asInt());
return true;
}

if (node instanceof HashJoinNode) {
HashJoinNode joinNode = (HashJoinNode) node;
if (joinNode.isColocate()) {
colocateFragmentIds.add(joinNode.getFragmentId().asInt());
return true;
}
}

for (PlanNode childNode : node.getChildren()) {
if (childNode.getFragmentId().asInt() == planFragment.getId().asInt() && isColocateFragment(planFragment, childNode)) {
return true;
}
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.analysis;

import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.utframe.DorisAssert;
import org.apache.doris.utframe.UtFrameUtils;

import java.io.File;
import java.util.UUID;

import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class OutFileClauseFunctionTest {

private static String baseDir = "fe";
private static String runningDir = baseDir + "/mocked/MaterializedViewFunctionTest/"
+ UUID.randomUUID().toString() + "/";
private static ConnectContext ctx;

private static final String DB_NAME = "db1";

@BeforeClass
public static void beforeClass() throws Exception {
FeConstants.default_scheduler_interval_millisecond = 10;
FeConstants.runningUnitTest = true;
Config.enable_outfile_to_local = true;
ctx = UtFrameUtils.createDefaultCtx();
UtFrameUtils.createDorisCluster(runningDir);
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt("create database db1;", ctx);
Catalog.getCurrentCatalog().createDb(createDbStmt);
String createTableSQL = "create table " + DB_NAME
+ ".test (k1 int, k2 varchar ) "
+ "distributed by hash(k1) buckets 3 properties('replication_num' = '1');";
CreateTableStmt createTableStmt = (CreateTableStmt) UtFrameUtils.parseAndAnalyzeStmt(createTableSQL, ctx);
Catalog.getCurrentCatalog().createTable(createTableStmt);
}

@AfterClass
public static void tearDown() {
File file = new File(runningDir);
file.delete();
}

@Test
public void testSelectStmtOutFileClause() throws Exception {
String query1 = "select * from db1.test into outfile \"file:///" + runningDir + "/result_\";";
QueryStmt analyzedQueryStmt = (QueryStmt) UtFrameUtils.parseAndAnalyzeStmt(query1, ctx);
Assert.assertTrue(analyzedQueryStmt.hasOutFileClause());
OutFileClause outFileClause = analyzedQueryStmt.getOutFileClause();
boolean isOutFileClauseAnalyzed = Deencapsulation.getField(outFileClause, "isAnalyzed");
Assert.assertTrue(isOutFileClauseAnalyzed);
Assert.assertEquals(outFileClause.getFileFormatType(), TFileFormatType.FORMAT_CSV_PLAIN);
}

@Test
public void testSetOperationStmtOutFileClause() throws Exception {
String query1 = "select * from db1.test union select * from db1.test into outfile \"file:///" + runningDir + "/result_\";";
QueryStmt analyzedSetOperationStmt = (QueryStmt) UtFrameUtils.parseAndAnalyzeStmt(query1, ctx);
Assert.assertTrue(analyzedSetOperationStmt.hasOutFileClause());
OutFileClause outFileClause = analyzedSetOperationStmt.getOutFileClause();
boolean isOutFileClauseAnalyzed = Deencapsulation.getField(outFileClause, "isAnalyzed");
Assert.assertTrue(isOutFileClauseAnalyzed);
Assert.assertEquals(outFileClause.getFileFormatType(), TFileFormatType.FORMAT_CSV_PLAIN);
}

@Test
public void testParquetFormat() throws Exception {
String query1 = "select * from db1.test union select * from db1.test into outfile \"file:///" + runningDir + "/result_\" FORMAT AS PARQUET;";
QueryStmt analyzedSetOperationStmt = (QueryStmt) UtFrameUtils.parseAndAnalyzeStmt(query1, ctx);
Assert.assertTrue(analyzedSetOperationStmt.hasOutFileClause());
OutFileClause outFileClause = analyzedSetOperationStmt.getOutFileClause();
boolean isOutFileClauseAnalyzed = Deencapsulation.getField(outFileClause, "isAnalyzed");
Assert.assertTrue(isOutFileClauseAnalyzed);
Assert.assertEquals(outFileClause.getFileFormatType(), TFileFormatType.FORMAT_PARQUET);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,18 @@ public void sqlAggWithColocateTable() throws Exception {
Assert.assertEquals(instanceInfo.size(), 2);
}

@Test
public void checkColocatePlanFragment() throws Exception {
String sql = "select a.k1 from db1.test_colocate a, db1.test_colocate b where a.k1=b.k1 and a.k2=b.k2 group by a.k1;";
StmtExecutor executor = UtFrameUtils.getSqlStmtExecutor(ctx, sql);
Planner planner = executor.planner();
Coordinator coordinator = Deencapsulation.getField(executor, "coord");
boolean isColocateFragment0 = Deencapsulation.invoke(coordinator, "isColocateFragment",
planner.getFragments().get(1), planner.getFragments().get(1).getPlanRoot());
Assert.assertFalse(isColocateFragment0);
boolean isColocateFragment1 = Deencapsulation.invoke(coordinator, "isColocateFragment",
planner.getFragments().get(2), planner.getFragments().get(2).getPlanRoot());
Assert.assertTrue(isColocateFragment1);
}

}

0 comments on commit 3cef2fb

Please sign in to comment.