Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ocaml/idl/datamodel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1040,7 +1040,7 @@ let _ =
error Api_errors.no_more_redo_logs_allowed []
~doc:"The upper limit of active redo log instances was reached." ();

error Api_errors.could_not_import_database []
error Api_errors.could_not_import_database ["reason"]
~doc:"An error occurred while attempting to import a database from a metadata VDI" ();

error Api_errors.vm_incompatible_with_this_host []
Expand Down
87 changes: 57 additions & 30 deletions ocaml/xapi/xapi_pbd.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* @group XenAPI functions
*)

open Client
open Db_filter
open Db_filter_types

Expand Down Expand Up @@ -79,39 +80,65 @@ let abort_if_storage_attached_to_protected_vms ~__context ~self =
) protected_vms
end

let find_metadata_vdis ~__context ~sr =
let pool = Helpers.get_pool ~__context in
List.filter
(fun vdi ->
Db.VDI.get_type ~__context ~self:vdi = `metadata &&
Db.VDI.get_metadata_of_pool ~__context ~self:vdi = pool)
(Db.SR.get_VDIs ~__context ~self:sr)

let plug ~__context ~self =
let currently_attached = Db.PBD.get_currently_attached ~__context ~self in
if not currently_attached then
begin
let sr = Db.PBD.get_SR ~__context ~self in
Storage_access.SR.attach ~__context ~self:sr;
end
let currently_attached = Db.PBD.get_currently_attached ~__context ~self in
if not currently_attached then
begin
let sr = Db.PBD.get_SR ~__context ~self in
Storage_access.SR.attach ~__context ~self:sr;
(* Try to re-enable metadata replication to all suitable VDIs. *)
try
List.iter
(fun vdi ->
debug "Automatically re-enabling database replication to VDI %s" (Ref.string_of vdi);
Xapi_vdi_helpers.enable_database_replication ~__context ~vdi)
(find_metadata_vdis ~__context ~sr)
with Api_errors.Server_error(code, _) when code = Api_errors.no_more_redo_logs_allowed ->
(* The redo log limit was reached - don't throw an exception since automatic *)
(* re-enabling of database replication is best-effort. *)
debug "Metadata is being replicated to the maximum allowed number of VDIs."
end

let unplug ~__context ~self =
let currently_attached = Db.PBD.get_currently_attached ~__context ~self in
if currently_attached then
begin
let host = Db.PBD.get_host ~__context ~self in
let sr = Db.PBD.get_SR ~__context ~self in

if Db.Host.get_enabled ~__context ~self:host
then abort_if_storage_attached_to_protected_vms ~__context ~self;

(* If HA is enabled, prevent a PBD whose SR contains a statefile being unplugged *)
let pool = List.hd (Db.Pool.get_all ~__context) in
if Db.Pool.get_ha_enabled ~__context ~self:pool then begin
let statefiles = Db.Pool.get_ha_statefiles ~__context ~self:pool in
let statefile_srs = List.map (fun self -> Db.VDI.get_SR ~__context ~self:(Ref.of_string self)) statefiles in
if List.mem sr statefile_srs
then raise (Api_errors.Server_error(Api_errors.ha_is_enabled, []))
end;

let vdis = get_active_vdis_by_pbd ~__context ~self in
if List.length vdis > 0
then raise (Api_errors.Server_error(Api_errors.vdi_in_use,List.map Ref.string_of vdis));

Storage_access.SR.detach ~__context ~self:sr
end
let currently_attached = Db.PBD.get_currently_attached ~__context ~self in
if currently_attached then
begin
let host = Db.PBD.get_host ~__context ~self in
let sr = Db.PBD.get_SR ~__context ~self in

if Db.Host.get_enabled ~__context ~self:host
then abort_if_storage_attached_to_protected_vms ~__context ~self;

(* If HA is enabled, prevent a PBD whose SR contains a statefile being unplugged *)
let pool = List.hd (Db.Pool.get_all ~__context) in
if Db.Pool.get_ha_enabled ~__context ~self:pool then begin
let statefiles = Db.Pool.get_ha_statefiles ~__context ~self:pool in
let statefile_srs = List.map (fun self -> Db.VDI.get_SR ~__context ~self:(Ref.of_string self)) statefiles in
if List.mem sr statefile_srs
then raise (Api_errors.Server_error(Api_errors.ha_is_enabled, []))
end;

(* Disable metadata replication to VDIs in the SR. *)
List.iter
(fun vdi ->
debug "Automatically disabling database replication to VDI %s" (Ref.string_of vdi);
Xapi_vdi_helpers.disable_database_replication ~__context ~vdi)
(find_metadata_vdis ~__context ~sr);

let vdis = get_active_vdis_by_pbd ~__context ~self in
if List.length vdis > 0
then raise (Api_errors.Server_error(Api_errors.vdi_in_use,List.map Ref.string_of vdis));

Storage_access.SR.detach ~__context ~self:sr
end

let destroy ~__context ~self =
if Db.PBD.get_currently_attached ~__context ~self
Expand Down
81 changes: 18 additions & 63 deletions ocaml/xapi/xapi_sr.ml
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,6 @@ let assert_can_host_ha_statefile ~__context ~sr =
Xha_statefile.assert_sr_can_host_statefile ~__context ~sr

(* Metadata replication to SRs *)
let redo_log_lifecycle_mutex = Mutex.create ()
let metadata_replication : ((API.ref_SR, (API.ref_VBD * Redo_log.redo_log)) Hashtbl.t) =
Hashtbl.create Xapi_globs.redo_log_max_instances

let find_or_create_metadata_vdi ~__context ~sr =
let pool = Helpers.get_pool ~__context in
let vdi_can_be_used vdi =
Expand Down Expand Up @@ -468,71 +464,30 @@ let get_master_dom0 ~__context =
let vms = Db.Host.get_resident_VMs ~__context ~self:master in
List.hd (List.filter (fun vm -> Db.VM.get_is_control_domain ~__context ~self:vm) vms)

let provision_metadata_vdi_and_start_redo_log ~__context ~sr =
let vdi = find_or_create_metadata_vdi ~__context ~sr in
let log = Redo_log.create () in
let sr_uuid = Db.SR.get_uuid ~__context ~self:sr in
let dom0 = get_master_dom0 ~__context in
(* Create and plug vbd *)
let vbd = Helpers.call_api_functions ~__context (fun rpc session_id ->
let vbd = Client.VBD.create ~rpc ~session_id ~vM:dom0 ~empty:false ~vDI:vdi
~userdevice:"autodetect" ~bootable:false ~mode:`RW ~_type:`Disk
~unpluggable:true ~qos_algorithm_type:"" ~qos_algorithm_params:[]
~other_config:[]
in
Client.VBD.plug ~rpc ~session_id ~self:vbd;
vbd)
in
(* Enable redo_log and point it at the new device *)
let device = Db.VBD.get_device ~__context ~self:vbd in
try
Redo_log.enable_block log ("/dev/" ^ device);
Redo_log.startup log;
Hashtbl.add metadata_replication sr (vbd, log);
debug "Redo log started on SR %s" sr_uuid
with e ->
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.VBD.unplug ~rpc ~session_id ~self:vbd);
raise (Api_errors.Server_error(Api_errors.cannot_enable_redo_log,
[Printexc.to_string e]))

let enable_database_replication ~__context ~sr =
if (not (Pool_features.is_enabled ~__context Features.DR)) then
raise (Api_errors.Server_error(Api_errors.license_restriction, []));
Mutex.execute redo_log_lifecycle_mutex (fun () ->
debug "Attempting to enable metadata replication on SR [%s:%s]"
(Db.SR.get_name_label ~__context ~self:sr) (Db.SR.get_uuid ~__context ~self:sr);
if Hashtbl.mem metadata_replication sr then
debug "Metadata is already being replicated to this SR."
else begin
(* Check that the number of metadata redo_logs isn't already at the limit. *)
(* There should never actually be more redo_logs than the limit! *)
if Hashtbl.length metadata_replication >= Xapi_globs.redo_log_max_instances then
raise (Api_errors.Server_error(Api_errors.no_more_redo_logs_allowed, []))
else
provision_metadata_vdi_and_start_redo_log ~__context ~sr
end
)
let metadata_vdi = find_or_create_metadata_vdi ~__context ~sr in
Xapi_vdi_helpers.enable_database_replication ~__context ~vdi:metadata_vdi

(* Disable metadata replication to all metadata VDIs in this SR. *)
let disable_database_replication ~__context ~sr =
Mutex.execute redo_log_lifecycle_mutex (fun () ->
debug "Attempting to disable metadata replication on SR [%s:%s]."
(Db.SR.get_name_label ~__context ~self:sr) (Db.SR.get_uuid ~__context ~self:sr);
if not(Hashtbl.mem metadata_replication sr) then
debug "Metadata is not being replicated to this SR."
else begin
let (vbd, log) = Hashtbl.find metadata_replication sr in
Redo_log.shutdown log;
Redo_log.disable log;
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.VBD.unplug ~rpc ~session_id ~self:vbd);
Hashtbl.remove metadata_replication sr;
Redo_log.delete log;
let vdi = Db.VBD.get_VDI ~__context ~self:vbd in
let metadata_vdis = List.filter
(fun vdi ->
Db.VDI.get_type ~__context ~self:vdi = `metadata &&
(Db.VDI.get_metadata_of_pool ~__context ~self:vdi = Helpers.get_pool ~__context))
(Db.SR.get_VDIs ~__context ~self:sr)
in
List.iter
(fun vdi ->
Xapi_vdi_helpers.disable_database_replication ~__context ~vdi;
(* The VDI may have VBDs hanging around other than those created by the database replication code. *)
(* They must be destroyed before the VDI can be destroyed. *)
Xapi_vdi_helpers.destroy_all_vbds ~__context ~vdi;
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.VDI.destroy ~rpc ~session_id ~self:vdi)
end
)
Client.VDI.destroy ~rpc ~session_id ~self:vdi)
)
metadata_vdis

let create_new_blob ~__context ~sr ~name ~mime_type =
let blob = Xapi_blob.create ~__context ~mime_type in
Expand Down
22 changes: 13 additions & 9 deletions ocaml/xapi/xapi_vdi.ml
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ let open_database ~__context ~self =
debug "%s" "Attaching VDI for metadata import";
Static_vdis.permanent_vdi_attach ~__context ~vdi ~reason
in
let detach vdi =
debug "%s" "Detaching VDI after metadata import";
Static_vdis.permanent_vdi_detach ~__context ~vdi
in
(* Open the database contained in the VDI *)
let db_ref_of_attached_vdi reason =
(* Read db to temporary file *)
Expand All @@ -589,22 +593,22 @@ let open_database ~__context ~self =
Db_ref.update_database db_ref (fun db -> Database.reindex db);
db_ref
in
let detach vdi =
debug "%s" "Detaching VDI after metadata import";
Static_vdis.permanent_vdi_detach ~__context ~vdi
let reason = Printf.sprintf "%s %s"
Xapi_globs.foreign_metadata_vdi_reason
(Db.VDI.get_uuid ~__context ~self)
in
try
let reason = Xapi_globs.foreign_metadata_vdi_reason in
debug "Attaching database VDI to master with reason [%s]" reason;
attach self reason;
debug "%s" "Attempting to read database";
let db_ref = db_ref_of_attached_vdi reason in
debug "%s" "Detaching metadata VDI";
detach self;
(* Create a new session to query the database, and associate it with the db ref *)
debug "%s" "Creating readonly session";
let read_only_session = Xapi_session.create_readonly_session ~__context in
Db_backend.register_session_with_database read_only_session db_ref;
read_only_session
with _ ->
raise (Api_errors.Server_error(Api_errors.could_not_import_database, []))
with e ->
(* Make sure to detach if either the attach or database read fail. *)
detach self;
let error = Printexc.to_string e in
debug "Caught %s while trying to open database" error;
raise (Api_errors.Server_error(Api_errors.could_not_import_database, [error]))
95 changes: 95 additions & 0 deletions ocaml/xapi/xapi_vdi_helpers.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
* @group Storage
*)

open Client
open Pervasiveext
open Threadext

module D=Debug.Debugger(struct let name="xapi" end)
open D

(* We only support .iso files (from an iso SR) and block devices from
a local magic SR (eg /dev/hda) but NOT phantom_vbd block attach isos *)
let assert_vdi_is_valid_iso ~__context ~vdi =
Expand All @@ -28,3 +35,91 @@ let assert_managed ~__context ~vdi =
if not (Db.VDI.get_managed ~__context ~self:vdi)
then raise (Api_errors.Server_error(Api_errors.vdi_not_managed, [ Ref.string_of vdi ]))

(* Database replication to metadata VDIs. *)
let redo_log_lifecycle_mutex = Mutex.create ()

let metadata_replication : ((API.ref_VDI, (API.ref_VBD * Redo_log.redo_log)) Hashtbl.t) =
Hashtbl.create Xapi_globs.redo_log_max_instances

let get_master_dom0 ~__context =
let pool = Helpers.get_pool ~__context in
let master = Db.Pool.get_master ~__context ~self:pool in
let vms = Db.Host.get_resident_VMs ~__context ~self:master in
List.hd (List.filter (fun vm -> Db.VM.get_is_control_domain ~__context ~self:vm) vms)

(* Unplug and destroy any existing VBDs owned by the VDI. *)
let destroy_all_vbds ~__context ~vdi =
let existing_vbds = Db.VDI.get_VBDs ~__context ~self:vdi in
Helpers.call_api_functions ~__context
(fun rpc session_id -> List.iter
(fun vbd ->
try
Client.VBD.unplug ~rpc ~session_id ~self:vbd;
Client.VBD.destroy ~rpc ~session_id ~self:vbd
with Api_errors.Server_error(code, _) when code = Api_errors.device_already_detached ->
Client.VBD.destroy ~rpc ~session_id ~self:vbd)
existing_vbds)

(* Create and plug a VBD from the VDI, then create a redo log and point it at the block device. *)
let enable_database_replication ~__context ~vdi =
Mutex.execute redo_log_lifecycle_mutex (fun () ->
let name_label = Db.VDI.get_name_label ~__context ~self:vdi in
let uuid = Db.VDI.get_uuid ~__context ~self:vdi in
debug "Attempting to enable metadata replication on VDI [%s:%s]." name_label uuid;
if Hashtbl.mem metadata_replication vdi then
debug "Metadata is already being replicated to VDI [%s:%s]." name_label uuid
else begin
(* Check that the number of metadata redo_logs isn't already at the limit. *)
(* There should never actually be more redo_logs than the limit! *)
if Hashtbl.length metadata_replication >= Xapi_globs.redo_log_max_instances then
raise (Api_errors.Server_error(Api_errors.no_more_redo_logs_allowed, []));
let log = Redo_log.create () in
let dom0 = get_master_dom0 ~__context in
(* We've established that metadata is not being replicated to this VDI, so it should be safe to do this. *)
destroy_all_vbds ~__context ~vdi;
(* Create and plug vbd *)
let vbd = Helpers.call_api_functions ~__context (fun rpc session_id ->
let vbd = Client.VBD.create ~rpc ~session_id ~vM:dom0 ~empty:false ~vDI:vdi
~userdevice:"autodetect" ~bootable:false ~mode:`RW ~_type:`Disk
~unpluggable:true ~qos_algorithm_type:"" ~qos_algorithm_params:[]
~other_config:[]
in
Client.VBD.plug ~rpc ~session_id ~self:vbd;
vbd)
in
(* Enable redo_log and point it at the new device *)
let device = Db.VBD.get_device ~__context ~self:vbd in
try
Redo_log.enable_block log ("/dev/" ^ device);
Redo_log.startup log;
Redo_log.flush_db_to_redo_log (Db_ref.get_database (Db_backend.make ()));
Hashtbl.add metadata_replication vdi (vbd, log);
let vbd_uuid = Db.VBD.get_uuid ~__context ~self:vbd in
debug "Redo log started on VBD %s" vbd_uuid
with e ->
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.VBD.unplug ~rpc ~session_id ~self:vbd);
raise (Api_errors.Server_error(Api_errors.cannot_enable_redo_log,
[Printexc.to_string e]))
end
)

(* Shut down the redo log, then unplug and destroy the VBD. *)
let disable_database_replication ~__context ~vdi =
Mutex.execute redo_log_lifecycle_mutex (fun () ->
debug "Attempting to disable metadata replication on VDI [%s:%s]."
(Db.VDI.get_name_label ~__context ~self:vdi) (Db.VDI.get_uuid ~__context ~self:vdi);
if not(Hashtbl.mem metadata_replication vdi) then
debug "Metadata is not being replicated to this VDI."
else begin
let (vbd, log) = Hashtbl.find metadata_replication vdi in
Redo_log.shutdown log;
Redo_log.disable log;
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.VBD.unplug ~rpc ~session_id ~self:vbd;
Client.VBD.destroy ~rpc ~session_id ~self:vbd);
Hashtbl.remove metadata_replication vdi;
Redo_log.delete log;
Db.VDI.set_metadata_latest ~__context ~self:vdi ~value:false
end
)