Skip to content

Commit a0e0fe0

Browse files
authored
Merge 1ca48b3 into f1e6120
2 parents f1e6120 + 1ca48b3 commit a0e0fe0

File tree

16 files changed

+311
-50
lines changed

16 files changed

+311
-50
lines changed

ydb/core/kqp/provider/yql_kikimr_settings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ TKikimrConfiguration::TKikimrConfiguration() {
100100
REGISTER_SETTING(*this, MaxDPHypDPTableSize);
101101

102102
REGISTER_SETTING(*this, MaxTasksPerStage);
103+
REGISTER_SETTING(*this, DataSizePerJob);
103104
REGISTER_SETTING(*this, MaxSequentialReadsInFlight);
104105

105106
REGISTER_SETTING(*this, KMeansTreeSearchTopSize);

ydb/core/kqp/provider/yql_kikimr_settings.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ struct TKikimrSettings {
7575

7676
NCommon::TConfSetting<ui32, false> MaxDPHypDPTableSize;
7777

78-
7978
NCommon::TConfSetting<ui32, false> MaxTasksPerStage;
79+
NCommon::TConfSetting<ui64, false> DataSizePerJob;
8080
NCommon::TConfSetting<ui32, false> MaxSequentialReadsInFlight;
8181

8282
NCommon::TConfSetting<ui32, false> KMeansTreeSearchTopSize;

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1106,7 +1106,7 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
11061106
IDqIntegration::TPartitionSettings pSettings;
11071107
pSettings.MaxPartitions = maxTasksPerStage;
11081108
pSettings.CanFallback = false;
1109-
pSettings.DataSizePerJob = NYql::TDqSettings::TDefault::DataSizePerJob;
1109+
pSettings.DataSizePerJob = Config->DataSizePerJob.Get().GetOrElse(NYql::TDqSettings::TDefault::DataSizePerJob);
11101110
dqIntegration->Partition(*source, partitionParams, &clusterName, ctx, pSettings);
11111111
externalSource.SetTaskParamKey(TString(dataSourceCategory));
11121112
for (const TString& partitionParam : partitionParams) {

ydb/tests/fq/solomon/test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ def test(suite, case, cfg, solomon):
6363
kqprun = KqpRun(config_file=os.path.join('ydb/tests/fq/solomon/cfg', 'kqprun_config.conf'),
6464
scheme_file=os.path.join('ydb/tests/fq/solomon/cfg', 'kqprun_scheme.sql'))
6565
yqlrun_res = kqprun.yql_exec(
66-
program=sql_query,
66+
yql_program=sql_query,
6767
var_templates=['SOLOMON_ENDPOINT', 'SOLOMON_PORT'],
6868
verbose=True,
6969
check_error=not xfail

ydb/tests/fq/tools/kqprun.py

Lines changed: 78 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
from typing import Optional, List
23

34
import pytest
45
import yatest.common
@@ -7,30 +8,54 @@
78

89

910
class KqpRun(object):
10-
def __init__(self, config_file, scheme_file, udfs_dir=None):
11-
self.kqprun_binary = yql_utils.yql_binary_path('ydb/tests/tools/kqprun/kqprun')
11+
def __init__(self, config_file: str, scheme_file: str, udfs_dir: Optional[str] = None, path_prefix: str = ""):
12+
self.kqprun_binary: str = yql_utils.yql_binary_path('ydb/tests/tools/kqprun/kqprun')
1213

13-
self.config_file = yql_utils.yql_source_path(config_file)
14-
self.scheme_file = yql_utils.yql_source_path(scheme_file)
14+
self.config_file: str = yql_utils.yql_source_path(config_file)
15+
self.scheme_file: str = yql_utils.yql_source_path(scheme_file)
1516

16-
self.res_dir = yql_utils.get_yql_dir(prefix='kqprun_')
17+
self.res_dir: str = yql_utils.get_yql_dir(prefix=f'{path_prefix}kqprun_')
1718

1819
if udfs_dir is None:
19-
self.udfs_dir = yql_utils.get_udfs_path()
20+
self.udfs_dir: str = yql_utils.get_udfs_path()
2021
else:
21-
self.udfs_dir = udfs_dir
22+
self.udfs_dir: str = udfs_dir
2223

23-
def __res_file_path(self, name):
24+
self.tables: List[str] = []
25+
self.queries: List[str] = []
26+
27+
def __res_file_path(self, name: str) -> str:
2428
return os.path.join(self.res_dir, name)
2529

26-
def yql_exec(self, program=None, program_file=None, verbose=False, check_error=True, var_templates=None, tables=None):
30+
def add_table(self, name: str, content: List[str], attrs: Optional[str] = None):
31+
table_path = self.__res_file_path(f'table_{len(self.tables)}.yson')
32+
with open(table_path, 'w') as table:
33+
for row in content:
34+
table.write(f'{row}\n')
35+
36+
if attrs is not None:
37+
with open(f'{table_path}.attr', 'w') as table_attrs:
38+
table_attrs.write(attrs)
39+
40+
self.tables.append(f'yt./Root/plato.{name}@{table_path}')
41+
42+
def add_query(self, sql: str):
43+
query_path = self.__res_file_path(f'query_{len(self.queries)}.sql')
44+
with open(query_path, 'w') as query:
45+
query.write(sql)
46+
47+
self.queries.append(query_path)
48+
49+
def yql_exec(self, verbose: bool = False, check_error: bool = True, var_templates: Optional[List[str]] = None,
50+
yql_program: Optional[str] = None, yql_tables: List[yql_utils.Table] = []) -> yql_utils.YQLExecResult:
2751
udfs_dir = self.udfs_dir
2852

2953
config_file = self.config_file
30-
program_file = yql_utils.prepare_program(program, program_file, self.res_dir, ext='sql')[1]
3154
scheme_file = self.scheme_file
3255

3356
results_file = self.__res_file_path('results.txt')
57+
ast_file = self.__res_file_path('ast.txt')
58+
plan_file = self.__res_file_path('plan.json')
3459
log_file = self.__res_file_path('log.txt')
3560

3661
cmd = self.kqprun_binary + ' '
@@ -39,46 +64,60 @@ def yql_exec(self, program=None, program_file=None, verbose=False, check_error=T
3964
'--emulate-yt '
4065
'--exclude-linked-udfs '
4166
'--execution-case query '
42-
'--app-config=%(config_file)s '
43-
'--script-query=%(program_file)s '
44-
'--scheme-query=%(scheme_file)s '
45-
'--result-file=%(results_file)s '
46-
'--log-file=%(log_file)s '
47-
'--udfs-dir=%(udfs_dir)s '
67+
f'--app-config={config_file} '
68+
f'--scheme-query={scheme_file} '
69+
f'--result-file={results_file} '
70+
f'--script-ast-file={ast_file} '
71+
f'--script-plan-file={plan_file} '
72+
f'--log-file={log_file} '
73+
f'--udfs-dir={udfs_dir} '
4874
'--result-format full-proto '
49-
'--result-rows-limit 0 ' % locals()
75+
'--plan-format json '
76+
'--result-rows-limit 0 '
5077
)
5178

5279
if var_templates is not None:
5380
for var_template in var_templates:
54-
cmd += '--var-template %s ' % var_template
81+
cmd += f'--var-template {var_template} '
82+
83+
for query in self.queries:
84+
cmd += f'--script-query={query} '
85+
86+
if yql_program is not None:
87+
program_file = yql_utils.prepare_program(yql_program, None, self.res_dir, ext='sql')[1]
88+
cmd += f'--script-query={program_file} '
5589

56-
if tables is not None:
57-
for table in tables:
58-
if table.format != 'yson':
59-
pytest.skip('skip tests containing tables with a non-yson attribute format')
60-
cmd += '--table=yt./Root/%s@%s ' % (table.full_name, table.yqlrun_file)
90+
for table in self.tables:
91+
cmd += f'--table={table} '
92+
93+
for table in yql_tables:
94+
if table.format != 'yson':
95+
pytest.skip('skip tests containing tables with a non-yson attribute format')
96+
cmd += f'--table=yt./Root/{table.full_name}@{table.yqlrun_file} '
6197

6298
proc_result = yatest.common.process.execute(cmd.strip().split(), check_exit_code=False, cwd=self.res_dir)
6399
if proc_result.exit_code != 0 and check_error:
64-
assert 0, (
65-
'Command\n%(command)s\n finished with exit code %(code)d, stderr:\n\n%(stderr)s\n\nlog file:\n%(log_file)s'
66-
% {
67-
'command': cmd,
68-
'code': proc_result.exit_code,
69-
'stderr': proc_result.std_err,
70-
'log_file': yql_utils.read_res_file(log_file)[1],
71-
}
72-
)
100+
assert 0, f'Command\n{cmd}\n finished with exit code {proc_result.exit_code}, stderr:\n\n{proc_result.std_err}\n\nlog file:\n{yql_utils.read_res_file(log_file)[1]}'
73101

74102
results, log_results = yql_utils.read_res_file(results_file)
103+
ast, log_ast = yql_utils.read_res_file(ast_file)
104+
plan, log_plan = yql_utils.read_res_file(plan_file)
75105
err, log_err = yql_utils.read_res_file(log_file)
76106

77107
if verbose:
78-
yql_utils.log('PROGRAM:')
79-
yql_utils.log(program)
108+
yql_utils.log('QUERIES:')
109+
if yql_program is not None:
110+
yql_utils.log(yql_program)
111+
112+
for query in self.queries:
113+
yql_utils.log(yql_program)
114+
80115
yql_utils.log('RESULTS:')
81116
yql_utils.log(log_results)
117+
yql_utils.log('AST:')
118+
yql_utils.log(log_ast)
119+
yql_utils.log('PLAN:')
120+
yql_utils.log(log_plan)
82121
yql_utils.log('ERROR:')
83122
yql_utils.log(log_err)
84123

@@ -87,11 +126,11 @@ def yql_exec(self, program=None, program_file=None, verbose=False, check_error=T
87126
proc_result.std_err,
88127
results,
89128
results_file,
90-
None,
91-
None,
92-
None,
93-
None,
94-
program,
129+
ast,
130+
ast_file,
131+
plan,
132+
plan_file,
133+
yql_program,
95134
proc_result,
96135
None,
97136
)

ydb/tests/fq/yt/cfg/kqprun_config.conf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ FeatureFlags {
55
}
66

77
QueryServiceConfig {
8-
AvailableExternalDataSources: "YT"
9-
10-
FileStorage {
8+
AvailableExternalDataSources: "YT"
9+
10+
FileStorage {
1111
MaxFiles: 1000
1212
MaxSizeMb: 512
1313
RetryCount: 3

ydb/tests/fq/yt/kqp_yt_file.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ def run_file_kqp_no_cache(suite, case, cfg):
208208
scheme_file=os.path.join('ydb/tests/fq/yt/cfg', 'kqprun_scheme.sql'),
209209
udfs_dir=yql_binary_path('yql/essentials/tests/common/test_framework/udfs_deps'))
210210

211-
return kqprun.yql_exec(program=sql_query, verbose=True, check_error=True, tables=in_tables)
211+
return kqprun.yql_exec(yql_program=sql_query, verbose=True, check_error=True, yql_tables=in_tables)
212212

213213

214214
def run_file_kqp(suite, case, cfg):
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import os
2+
import pytest
3+
4+
from ydb.tests.fq.tools.kqprun import KqpRun
5+
6+
7+
@pytest.fixture
8+
def kqp_run(request) -> KqpRun:
9+
return KqpRun(
10+
config_file=os.path.join('ydb/tests/fq/yt/kqp_yt_import', 'kqprun_import_config.conf'),
11+
scheme_file=os.path.join('ydb/tests/fq/yt/cfg', 'kqprun_scheme.sql'),
12+
path_prefix=f"{request.function.__name__}_"
13+
)
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from typing import Optional
2+
3+
import google.protobuf.text_format as proto
4+
import ydb.public.api.protos.ydb_value_pb2 as ydb
5+
6+
from ydb.tests.fq.tools.kqprun import KqpRun
7+
8+
9+
ValueByTypeExtractors = {
10+
ydb.Type.PrimitiveTypeId.INT64: lambda x: x.int64_value,
11+
ydb.Type.PrimitiveTypeId.STRING: lambda x: x.bytes_value,
12+
}
13+
14+
15+
def add_sample_table(kqp_run: KqpRun, table_name: str = 'input', infer_schema: bool = True):
16+
attrs: Optional[str] = None
17+
if not infer_schema:
18+
attrs = """
19+
{"_yql_row_spec" = {
20+
"Type" = ["StructType"; [
21+
["key"; ["DataType"; "String"]];
22+
["subkey"; ["DataType"; "Int64"]];
23+
["value"; ["DataType"; "String"]];
24+
]]
25+
}}
26+
"""
27+
28+
kqp_run.add_table(table_name, [
29+
'{"key"="075";"subkey"=1;"value"="abc"};',
30+
'{"key"="800";"subkey"=2;"value"="ddd"};',
31+
'{"key"="020";"subkey"=3;"value"="q"};',
32+
'{"key"="150";"subkey"=4;"value"="qzz"};'
33+
], attrs)
34+
35+
36+
def validate_sample_result(result: str):
37+
result_set = ydb.ResultSet()
38+
proto.Parse(result, result_set)
39+
40+
columns = [
41+
('key', ydb.Type.PrimitiveTypeId.STRING),
42+
('subkey', ydb.Type.PrimitiveTypeId.INT64),
43+
('value', ydb.Type.PrimitiveTypeId.STRING)
44+
]
45+
46+
assert len(result_set.columns) == len(columns)
47+
for i, (column_name, column_type_id) in enumerate(columns):
48+
assert result_set.columns[i].name == column_name
49+
50+
result_column_type = result_set.columns[i].type.type_id
51+
assert result_column_type == column_type_id, f'{result_column_type} != {column_type_id}'
52+
53+
rows = [
54+
(b'075', 1, b'abc'),
55+
(b'800', 2, b'ddd'),
56+
(b'020', 3, b'q'),
57+
(b'150', 4, b'qzz')
58+
]
59+
60+
assert len(result_set.rows) == len(rows)
61+
for i, row in enumerate(rows):
62+
for j, expected_value in enumerate(row):
63+
value_extractor = ValueByTypeExtractors[result_set.columns[j].type.type_id]
64+
result_value = value_extractor(result_set.rows[i].items[j])
65+
assert result_value == expected_value, f'{result_value} != {expected_value}'
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
ColumnShardConfig {
2+
DisabledOnSchemeShard: false
3+
}
4+
5+
FeatureFlags {
6+
EnableExternalDataSources: true
7+
EnableScriptExecutionOperations: true
8+
EnablePgSyntax: true
9+
}
10+
11+
QueryServiceConfig {
12+
AvailableExternalDataSources: "YT"
13+
14+
FileStorage {
15+
MaxFiles: 1000
16+
MaxSizeMb: 512
17+
RetryCount: 3
18+
Threads: 2
19+
}
20+
21+
Yt {
22+
DefaultSettings {
23+
Name: "InferSchema"
24+
Value: "1"
25+
}
26+
DefaultSettings {
27+
Name: "UseRPCReaderInDQ"
28+
Value: "true"
29+
}
30+
}
31+
}
32+
33+
TableServiceConfig {
34+
BlockChannelsMode: BLOCK_CHANNELS_FORCE
35+
EnableCreateTableAs: true
36+
EnableOlapSink: true
37+
EnablePerStatementQueryExecution: true
38+
39+
WriteActorSettings {
40+
MaxWriteAttempts: 1000
41+
}
42+
}

0 commit comments

Comments
 (0)