Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 104 additions & 106 deletions ocaml/xapi/xapi_vm_migrate.ml
Original file line number Diff line number Diff line change
Expand Up @@ -482,119 +482,117 @@ let vdi_copy_fun __context dbg vdi_map remote is_intra_pool remote_vdis so_far t
(not is_intra_pool) || (dest_sr_uuid <> vconf.sr)
in

let remote_vdi,remote_vdi_reference,newdp,mirror_id =
if not mirror then
let dest_vdi_ref = XenAPI.VDI.get_by_uuid remote.rpc remote.session vdi_uuid in
vconf.location,dest_vdi_ref,None,None
else begin
let newdp = Printf.sprintf (if vconf.do_mirror then "mirror_%s" else "copy_%s") vconf.dp in
(* DP set up is only essential for MIRROR.start/stop due to their open ended pattern.
It's not necessary for copy which will take care of that itself。*)
if vconf.do_mirror then begin
let with_new_dp cont =
let dp = Printf.sprintf (if vconf.do_mirror then "mirror_%s" else "copy_%s") vconf.dp in
try cont dp
with e ->
(try SMAPI.DP.destroy ~dbg ~dp ~allow_leak:false with _ -> info "Failed to cleanup datapath: %s" dp);
raise e in

let with_remote_vdi remote_vdi cont =
debug "Executing remote scan to ensure VDI is known to xapi";
XenAPI.SR.scan remote.rpc remote.session dest_sr_ref;
let query = Printf.sprintf "(field \"location\"=\"%s\") and (field \"SR\"=\"%s\")" remote_vdi (Ref.string_of dest_sr_ref) in
let vdis = XenAPI.VDI.get_all_records_where remote.rpc remote.session query in
let remote_vdi_ref = match vdis with
| [] -> raise (Api_errors.Server_error(Api_errors.vdi_location_missing, [Ref.string_of dest_sr_ref; remote_vdi]))
| h :: [] -> debug "Found remote vdi reference: %s" (Ref.string_of (fst h)); fst h
| _ -> raise (Api_errors.Server_error(Api_errors.location_not_unique, [Ref.string_of dest_sr_ref; remote_vdi])) in
try cont remote_vdi_ref
with e ->
(try XenAPI.VDI.destroy remote.rpc remote.session remote_vdi_ref with _ -> error "Failed to destroy remote VDI");
raise e in

let get_mirror_record ?new_dp remote_vdi remote_vdi_reference =
{ mr_dp = new_dp;
mr_mirrored = mirror;
mr_local_sr = vconf.sr;
mr_local_vdi = vconf.location;
mr_remote_sr = dest_sr_uuid;
mr_remote_vdi = remote_vdi;
mr_local_xenops_locator = vconf.xenops_locator;
mr_remote_xenops_locator = Xapi_xenops.xenops_vdi_locator_of_strings dest_sr_uuid remote_vdi;
mr_local_vdi_reference = vconf.vdi;
mr_remote_vdi_reference = remote_vdi_reference } in

let mirror_to_remote new_dp =
let task =
if not vconf.do_mirror then
SMAPI.DATA.copy ~dbg ~sr:vconf.sr ~vdi:vconf.location ~dp:new_dp ~url:remote.sm_url ~dest:dest_sr_uuid
else begin
(* Though we have no intention of "write", here we use the same mode as the
associated VBD on a mirrored VDIs (i.e. always RW). This avoids problem
when we need to start/stop the VM along the migration. *)
let read_write = true in
ignore(SMAPI.VDI.attach ~dbg ~dp:newdp ~sr:vconf.sr ~vdi:vconf.location ~read_write);
SMAPI.VDI.activate ~dbg ~dp:newdp ~sr:vconf.sr ~vdi:vconf.location;
end;
(* DP set up is only essential for MIRROR.start/stop due to their open ended pattern.
It's not necessary for copy which will take care of that itself. *)
ignore(SMAPI.VDI.attach ~dbg ~dp:new_dp ~sr:vconf.sr ~vdi:vconf.location ~read_write);
SMAPI.VDI.activate ~dbg ~dp:new_dp ~sr:vconf.sr ~vdi:vconf.location;
ignore(Storage_access.register_mirror __context vconf.location);
SMAPI.DATA.MIRROR.start ~dbg ~sr:vconf.sr ~vdi:vconf.location ~dp:new_dp ~url:remote.sm_url ~dest:dest_sr_uuid
end in

let mapfn x =
let total = Int64.to_float total_size in
let done_ = Int64.to_float !so_far /. total in
let remaining = Int64.to_float vconf.size /. total in
done_ +. x *. remaining in

let open Storage_access in

let task_result =
task |> register_task __context
|> add_to_progress_map mapfn
|> wait_for_task dbg
|> remove_from_progress_map
|> unregister_task __context
|> success_task dbg in

let mirror_id, remote_vdi =
if not vconf.do_mirror then
let vdi = task_result |> vdi_of_task dbg in
remote_vdis := vdi.vdi :: !remote_vdis;
None, vdi.vdi
else
let mirrorid = task_result |> mirror_of_task dbg in
let m = SMAPI.DATA.MIRROR.stat ~dbg ~id:mirrorid in
Some mirrorid, m.Mirror.dest_vdi in

let mapfn =
let start = (Int64.to_float !so_far) /. (Int64.to_float total_size) in
let len = (Int64.to_float vconf.size) /. (Int64.to_float total_size) in
fun x -> start +. x *. len
in
so_far := Int64.add !so_far vconf.size;
debug "Local VDI %s %s to %s" vconf.location (if vconf.do_mirror then "mirrored" else "copied") remote_vdi;
mirror_id, remote_vdi in

let task = if not vconf.do_mirror then
SMAPI.DATA.copy ~dbg ~sr:vconf.sr ~vdi:vconf.location ~dp:newdp ~url:remote.sm_url ~dest:dest_sr_uuid
else begin
ignore(Storage_access.register_mirror __context vconf.location);
SMAPI.DATA.MIRROR.start ~dbg ~sr:vconf.sr ~vdi:vconf.location ~dp:newdp ~url:remote.sm_url ~dest:dest_sr_uuid
end
in

let open Storage_access in
let task_result =
task |> register_task __context
|> add_to_progress_map mapfn
|> wait_for_task dbg
|> remove_from_progress_map
|> unregister_task __context
|> success_task dbg in

let vdi, mirror_id =
if not vconf.do_mirror
then begin
let vdi = task_result |> vdi_of_task dbg in
remote_vdis := vdi.vdi :: !remote_vdis;
vdi.vdi,None
end else begin
let mirror_id = task_result |> mirror_of_task dbg in
let m = SMAPI.DATA.MIRROR.stat ~dbg ~id:mirror_id in
m.Mirror.dest_vdi, Some mirror_id
end
in

so_far := Int64.add !so_far vconf.size;

debug "Local VDI %s mirrored to %s" vconf.location vdi;
debug "Executing remote scan to ensure VDI is known to xapi";
XenAPI.SR.scan remote.rpc remote.session dest_sr_ref;
let query = Printf.sprintf "(field \"location\"=\"%s\") and (field \"SR\"=\"%s\")" vdi (Ref.string_of dest_sr_ref) in
let vdis = XenAPI.VDI.get_all_records_where remote.rpc remote.session query in

if List.length vdis <> 1 then error "Could not locate remote VDI: query='%s', length of results: %d" query (List.length vdis);

let remote_vdi_reference = fst (List.hd vdis) in

debug "Found remote vdi reference: %s" (Ref.string_of remote_vdi_reference);
vdi, remote_vdi_reference, (Some newdp), mirror_id
end
in
let mirror_record = ({ mr_dp = newdp;
mr_mirrored = mirror;
mr_local_sr = vconf.sr;
mr_local_vdi = vconf.location;
mr_remote_sr = dest_sr_uuid;
mr_remote_vdi = remote_vdi;
mr_local_xenops_locator = vconf.xenops_locator;
mr_remote_xenops_locator = Xapi_xenops.xenops_vdi_locator_of_strings dest_sr_uuid remote_vdi;
mr_local_vdi_reference = vconf.vdi;
mr_remote_vdi_reference = remote_vdi_reference; }) in
try
let result = continuation mirror_record in

(match mirror_id with
| Some mid -> ignore(Storage_access.unregister_mirror mid);
| None -> ());

if mirror && not (Xapi_fist.storage_motion_keep_vdi () || copy)
then
Helpers.call_api_functions ~__context (fun rpc session_id ->
let post_mirror mirror_id mirror_record =
try
let result = continuation mirror_record in
(match mirror_id with
| Some mid -> ignore(Storage_access.unregister_mirror mid);
| None -> ());
if mirror && not (Xapi_fist.storage_motion_keep_vdi () || copy) then
Helpers.call_api_functions ~__context (fun rpc session_id ->
XenAPI.VDI.destroy rpc session_id vconf.vdi);

result
with e ->
(* Stop mirroring and check to see if it was us that caused the mirror failure *)
let mirror_failed =
match mirror_id with
| Some mid ->
(try SMAPI.DATA.MIRROR.stop ~dbg ~id:mid with _ -> ());
let m = SMAPI.DATA.MIRROR.stat ~dbg ~id:mid in
m.Mirror.failed
| None ->
false
in

(* Now that mirroring has finished, clean up any datapath we made *)
(match newdp with | Some dp -> (try SMAPI.DP.destroy ~dbg ~dp ~allow_leak:false with _ -> error "Failed to cleanup datapath: %s" dp) | None -> ());

(* And destroy the new VDI *)
(try XenAPI.VDI.destroy remote.rpc remote.session remote_vdi_reference with _ -> error "Failed to destroy remote VDI");

if mirror_failed then raise (Api_errors.Server_error(Api_errors.mirror_failed,[Ref.string_of vconf.vdi]));

raise e
result
with e ->
let mirror_failed =
match mirror_id with
| Some mid ->
ignore(Storage_access.unregister_mirror mid);
(try SMAPI.DATA.MIRROR.stop ~dbg ~id:mid with _ -> ());
let m = SMAPI.DATA.MIRROR.stat ~dbg ~id:mid in
m.Mirror.failed
| None -> false in
if mirror_failed then raise (Api_errors.Server_error(Api_errors.mirror_failed,[Ref.string_of vconf.vdi]))
else raise e in

if mirror then
with_new_dp (fun new_dp ->
let mirror_id, remote_vdi = mirror_to_remote new_dp in
with_remote_vdi remote_vdi (fun remote_vdi_ref ->
let mirror_record = get_mirror_record ~new_dp remote_vdi remote_vdi_ref in
post_mirror mirror_id mirror_record))
else
let mirror_record = get_mirror_record vconf.location (XenAPI.VDI.get_by_uuid remote.rpc remote.session vdi_uuid) in
continuation mirror_record

let wait_for_fist __context fistpoint name =
if fistpoint () then begin
Expand Down