[Bug] [Flink] After K8s Session runs the job, the console is always in the state of executing the job. #3372
Closed
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
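After a job is run in K8s Session mode, the console stays in the "executing job" state indefinitely. The task is submitted successfully and can be seen running normally in the Flink UI, but the corresponding instance either does not appear in the Operations Center or is shown with status Unknown. This happens frequently, especially once many jobs have been started. No error logs are produced.
Versions: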
Dinky 1.0.1
Mariadb 10.6.11
Paimon 0.8
Flink 1.17.2
Kafka 3.4
FLINK SQL:
set 'execution.checkpointing.interval' = '30s';
set 'execution.checkpointing.max-concurrent-checkpoints' = '1';
set 'execution.checkpointing.tolerable-failed-checkpoints' = '1';
set 'execution.checkpointing.min-pause' = '30s';
set 'table.exec.state.ttl' = '45min';
set 'execution.checkpointing.timeout' = '1h';
set 'table.exec.sink.upsert-materialize' = 'NONE';
CREATE CATALOG paimon WITH (
'type' = 'paimon',
'warehouse' = 's3://uat-warehouse/paimon',
's3.endpoint' = 'http://minio:9000',
's3.access-key' = '',
's3.secret-key' = '',
'fs.s3a.connection.maximum'='2000',
'fs.s3a.threads.max'='4000',
'fs.s3a.buffer.dir'='/tmp/'
);
use catalog default_catalog;
create database if not exists source_kafka ;
set 'parallelism.default' = '12' ;
create table default_catalog.source_kafka.source_s4h_kafka_qm_zqminspectdefect
(
`CHARG` STRING,`POSNR` STRING,`FEGRP` STRING,`FECOD` STRING,`ANZFEHLER` DOUBLE,`FEHLBEWC` STRING,`PRUEFLINR` STRING,`LASTRECTXNTYPE` STRING,`RECCREDATE` STRING,`LASTRECTXNDATE` STRING,`LASTRECTXNTIME` STRING,`LASTRECTXNUSERID` STRING
) with (
'connector' = 'kafka'
, 'topic' = 'ZQMINSPECTDEFECT.A20.SAP'
, 'properties.group.id' = 'source_s4h_qm_zqminspectdefect_sink'
, 'scan.startup.mode' = 'group-offsets'
, 'properties.bootstrap.servers' = 'kafka.confluent:9071'
, 'properties.auto.offset.reset' = 'earliest'
, 'properties.security.protocol' = 'SASL_SSL'
, 'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="" password="";'
, 'properties.sasl.mechanism' = 'PLAIN'
, 'format' = 'json'
, 'json.fail-on-missing-field' = 'false'
, 'json.ignore-parse-errors' = 'true'
);
insert into paimon.ods.ods_s4h_qm_zqminspectdefect/*+ OPTIONS('sink.use-managed-memory-allocator'='true', 'sink.managed.writer-buffer-memory'='256M') */
select
`CHARG` ,`POSNR` ,`FEGRP` ,`FECOD` ,`ANZFEHLER` ,`FEHLBEWC` ,`PRUEFLINR` ,`LASTRECTXNTYPE` ,`RECCREDATE` ,`LASTRECTXNDATE` ,`LASTRECTXNTIME` ,`LASTRECTXNUSERID` ,
cast( FROM_UNIXTIME(UNIX_TIMESTAMP()) as TIMESTAMP(3) ) as data_update_time
from default_catalog.source_kafka.source_s4h_kafka_qm_zqminspectdefect ;
What you expected to happen
The job starts normally and its status in the Operations Center is correct.
How to reproduce
- Create a K8s Session cluster configuration: jobmanager.flink:8081
- Create a Flink SQL task and select K8s Session mode
- Run multiple Flink SQL tasks
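To confirm the mismatch described above, the jobs that Flink itself reports can be listed and compared by hand with what the Operations Center shows. The following is a minimal sketch, not part of the original report: it assumes the JobManager REST endpoint is reachable at the `jobmanager.flink:8081` address from the steps above and uses Flink's `GET /jobs/overview` REST endpoint. The helper names `fetch_overview` and `jobs_by_state` are hypothetical.

```python
import json
from urllib.request import urlopen

# Assumption: JobManager REST address taken from the reproduction steps.
FLINK_REST = "http://jobmanager.flink:8081"


def fetch_overview(base_url=FLINK_REST, timeout=10):
    """Call GET /jobs/overview on the Flink REST API and return the parsed JSON."""
    with urlopen(f"{base_url}/jobs/overview", timeout=timeout) as resp:
        return json.load(resp)


def jobs_by_state(overview):
    """Group job names by the state Flink reports (RUNNING, FAILED, ...)."""
    grouped = {}
    for job in overview.get("jobs", []):
        grouped.setdefault(job["state"], []).append(job["name"])
    return grouped
```

Calling `jobs_by_state(fetch_overview())` from a host that can reach the session cluster gives the job names per state as Flink sees them. Any job listed under `RUNNING` here but missing or Unknown in the Operations Center is an instance of the problem reported in this issue.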
Anything else
No response
Version
1.0.0
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
Metadata
Status
Done