Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,8 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFro
const TString& statusName = NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_Name(status);
CC_LOG_D("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult; taskId: "<< record.GetTaskId()
<< ", checkpoint: " << checkpoint
<< ", status: " << statusName);
<< ", status: " << statusName
<< ", issues: " << NYql::IssuesFromMessageAsString(record.GetIssues()));

if (!PendingRestoreCheckpoint) {
CC_LOG_E("[" << checkpoint << "] Got TEvRestoreFromCheckpointResult but has no PendingRestoreCheckpoint");
Expand All @@ -301,9 +302,10 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvRestoreFro
}

if (status != NYql::NDqProto::TEvRestoreFromCheckpointResult_ERestoreStatus_OK) {
CC_LOG_E("[" << checkpoint << "] Can't restore: " << statusName);
auto msg = TStringBuilder() << "Can't restore: " << statusName << ", " << NYql::IssuesFromMessageAsString(record.GetIssues());
CC_LOG_E("[" << checkpoint << "] " << msg);
++*Metrics.RestoringError;
NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::ABORTED, "Can't restore: " + statusName, {});
NYql::TTaskControllerImpl<TCheckpointCoordinator>::OnError(NYql::NDqProto::StatusIds::ABORTED, msg, {});
return;
}

Expand Down
8 changes: 7 additions & 1 deletion ydb/library/yql/dq/actors/compute/dq_compute_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
#include <ydb/library/yql/dq/runtime/dq_transport.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/hfunc.h>
Expand Down Expand Up @@ -165,10 +166,15 @@ struct TEvDqCompute {

using TBaseEventPB::TBaseEventPB;

// Constructor of the TEvRestoreFromCheckpointResult event: fills the protobuf
// Record with the checkpoint, the task id and the restore status.
// NOTE(review): this listing comes from a diff view, so two signatures appear
// back to back — the single-line one without `issues` and the multi-line one
// with `issues`. Presumably only the latter exists in the resulting file;
// confirm against the PR.
TEvRestoreFromCheckpointResult(const NDqProto::TCheckpoint& checkpoint, ui64 taskId, NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status) {
TEvRestoreFromCheckpointResult(
const NDqProto::TCheckpoint& checkpoint,
ui64 taskId,
NDqProto::TEvRestoreFromCheckpointResult::ERestoreStatus status,
const NYql::TIssues& issues) {
// Copy the checkpoint descriptor into the record.
Record.MutableCheckpoint()->CopyFrom(checkpoint);
Record.SetTaskId(taskId);
Record.SetStatus(status);
// Convert the issues into the record's Issues field so they are sent with the event.
NYql::IssuesToMessage(issues, Record.MutableIssues());
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
switch (StateLoadPlan.GetStateType()) {
case NDqProto::NDqStateLoadPlan::STATE_TYPE_EMPTY:
{
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK));
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}));
break;
}
case NDqProto::NDqStateLoadPlan::STATE_TYPE_OWN:
Expand All @@ -301,9 +301,12 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvRestoreFromCheckpoint::
}
default:
{
LOG_CP_E(checkpoint, "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR));
auto message = TStringBuilder() << "Unsupported state type: "
<< NDqProto::NDqStateLoadPlan::EStateType_Name(StateLoadPlan.GetStateType()) << " (" << static_cast<int>(StateLoadPlan.GetStateType()) << ")";
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues));
break;
}
}
Expand All @@ -324,13 +327,17 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt

if (!ev->Get()->Issues.Empty()) {
LOG_CP_E(checkpoint, "TEvGetTaskStateResult error: " << ev->Get()->Issues.ToOneLineString());
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, ev->Get()->Issues), ev->Cookie);
return;
}

if (ev->Get()->States.size() != taskIdsSize) {
LOG_CP_E(checkpoint, "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR), ev->Cookie);

auto message = TStringBuilder() << "TEvGetTaskStateResult unexpected states count: " << ev->Get()->States.size() << ", expected: " << taskIdsSize;
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::STORAGE_ERROR, issues), ev->Cookie);
return;
}

Expand All @@ -352,11 +359,14 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvGetTaskStateResult::TPt
// Reports the outcome of state loading by sending TEvRestoreFromCheckpointResult
// through EventsQueue, using RestoringTaskRunnerForCheckpoint as the checkpoint
// and RestoringTaskRunnerForEvent as the second argument of Send.
// `error`: when defined, the result is sent with INTERNAL_ERROR; otherwise with OK.
// NOTE(review): this listing comes from a diff view, so superseded and
// replacement statements appear back to back — presumably only one statement of
// each pair exists in the real file; confirm against the PR.
void TDqComputeActorCheckpoints::AfterStateLoading(const TMaybe<TString>& error) {
auto& checkpoint = RestoringTaskRunnerForCheckpoint;
if (error.Defined()) {
// Variant without issues: log the failure and send INTERNAL_ERROR with no details.
LOG_CP_E(checkpoint, "Failed to load state: " << error << ", ABORTED");
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR), RestoringTaskRunnerForEvent);
// Variant with issues: build the message once, log it, and attach it to the result as an issue.
auto message = TStringBuilder() << "Failed to load state: " << error << ", ABORTED";
LOG_CP_E(checkpoint, message);
NYql::TIssues issues;
issues.AddIssue(message);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::INTERNAL_ERROR, issues), RestoringTaskRunnerForEvent);
return;
}
// Success path, again shown in both variants: without issues, then with an empty issue list.
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK), RestoringTaskRunnerForEvent);
EventsQueue.Send(MakeHolder<TEvDqCompute::TEvRestoreFromCheckpointResult>(checkpoint, Task.GetId(), NDqProto::TEvRestoreFromCheckpointResult::OK, NYql::TIssues{}), RestoringTaskRunnerForEvent);
LOG_CP_D(checkpoint, "Checkpoint state restored");
}

Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/dq/actors/protos/dq_events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ message TEvRestoreFromCheckpointResult {
optional TCheckpoint Checkpoint = 1;
optional uint64 TaskId = 2;
optional ERestoreStatus Status = 3;
repeated Ydb.Issue.IssueMessage Issues = 4;

optional TMessageTransportMeta TransportMeta = 100;
}
Expand Down
41 changes: 41 additions & 0 deletions ydb/tests/fq/yds/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,44 @@ def test_ic_disconnection(self, client):
time.sleep(yatest_common.plain_or_under_sanitizer(0.5, 2))

close_ic_sessions_future.wait()

@yq_v1
def test_program_state_recovery_error_if_no_states(self, client, kikimr):
    """Verify that a streaming query fails with a clear error when its saved states are missing.

    Scenario: start a streaming query, wait until one more checkpoint completes,
    stop every node, delete all rows from the checkpoint `states` table, start
    the nodes again and expect the query to become FAILED with the restore
    error present in its description.
    """
    control_plane = kikimr.control_plane
    compute_plane = kikimr.compute_plane

    # Bring the cluster up completely: bootstrap on every node first, then discovery.
    for node_index in control_plane.kikimr_cluster.nodes:
        control_plane.wait_bootstrap(node_index)
    for node_index in control_plane.kikimr_cluster.nodes:
        control_plane.wait_discovery(node_index)
    self.init_topics("error_if_no_states", partitions_count=1)

    sql_template = R'''
INSERT INTO myyds.`{output_topic}`
SELECT STREAM * FROM myyds.`{input_topic}`;'''
    sql = sql_template.format(
        input_topic=self.input_topic,
        output_topic=self.output_topic,
    )
    client.create_yds_connection("myyds", os.getenv("YDB_DATABASE"), os.getenv("YDB_ENDPOINT"))

    created = client.create_query("error_if_no_states", sql, type=fq.QueryContent.QueryType.STREAMING)
    query_id = created.result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.RUNNING)
    compute_plane.wait_zero_checkpoint(query_id)
    # Wait for one more completed checkpoint than the query has right now.
    expected_checkpoints = compute_plane.get_completed_checkpoints(query_id) + 1
    compute_plane.wait_completed_checkpoints(query_id, expected_checkpoints)

    for node_index in control_plane.kikimr_cluster.nodes:
        control_plane.kikimr_cluster.nodes[node_index].stop()

    # While the nodes are stopped, remove every stored state row.
    states_table = "/local/CheckpointCoordinatorStorage_" + kikimr.uuid + '/states'
    yds_session = kikimr.driver.table_client.session().create()
    yds_session.transaction().execute(f"DELETE FROM `{states_table}`", commit_tx=True)

    for node_index in control_plane.kikimr_cluster.nodes:
        control_plane.kikimr_cluster.nodes[node_index].start()
    for node_index in control_plane.kikimr_cluster.nodes:
        control_plane.wait_bootstrap(node_index)

    client.wait_query_status(query_id, fq.QueryMeta.FAILED)
    describe_result = client.describe_query(query_id).result
    logging.debug("Describe result: {}".format(describe_result))
    describe_string = "{}".format(describe_result)
    # The raw string keeps the backslash, so the check looks for the literal text \' .
    for expected_fragment in (r"Can\'t restore: STORAGE_ERROR", r"Checkpoint is not found"):
        assert expected_fragment in describe_string