Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions ocaml/idl/datamodel_cluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,13 @@ let t =
~ty:(Set (Ref _cluster_host)) "cluster_hosts"
"A list of the cluster_host objects associated with the Cluster"

; field ~qualifier:DynamicRO ~lifecycle:[ Prototyped, rel_lima, "" ]
; field ~qualifier:DynamicRO
~lifecycle:[ Prototyped, rel_lima, "" ; Changed, rel_next, "allow encoding of IP_with_hostname" ]
(* This column stores the addresses of cluster members.
 * Old encoding: the IP address stored directly as a string.
 * New encoding: the RPC string representation of the address. *)
~ty:(Set String) "pending_forget" ~default_value:(Some (VSet []))
"Internal field used by Host.destroy to store the IP of cluster members \
"Internal field used by Host.destroy to store the address of cluster members \
marked as permanently dead but not yet removed"

; field ~qualifier:StaticRO ~lifecycle
Expand Down
2 changes: 1 addition & 1 deletion ocaml/idl/schematest.ml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(** [hash x] is the digest of the string [x] as computed by [Digest.string],
    rendered in hexadecimal by [Digest.to_hex]. *)
let hash x = Digest.to_hex (Digest.string x)

(* BEWARE: if this changes, check that schema has been bumped accordingly *)
let last_known_schema_hash = "aa2bb241740310988f3f16b236392066"
let last_known_schema_hash = "9cd231613de78338d83df8075bcbca9f"

let current_schema_hash : string =
let open Datamodel_types in
Expand Down
9 changes: 7 additions & 2 deletions ocaml/tests/test_cluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ let test_clusterd_rpc ~__context call =
Rpc.{success= true; contents= Rpc.Null; is_notification= false}
| "diagnostics", _ ->
let open Cluster_interface in
let id = 1l in
let me = {addr= IPv4 "192.0.2.1"; id} in
let host = Helpers.get_localhost ~__context in
let me =
{
addr= Xapi_clustering.Addr.of_ip_host ~__context ~host ~ip:"192.0.2.1"
; id= 1l
}
in
let cluster_config =
{
cluster_name= "xapi-clusterd"
Expand Down
10 changes: 8 additions & 2 deletions ocaml/tests/test_cluster_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ let test_fix_prereq () =
Alcotest.check_raises "Should fail when checking PIF prequisites"
Api_errors.(
Server_error (pif_has_no_network_configuration, [Ref.string_of pifref]))
(fun () -> Xapi_cluster_host.fix_pif_prerequisites __context pifref) ;
(fun () -> Xapi_cluster_host.fix_pif_prerequisites ~__context pifref) ;
Db.PIF.set_IP ~__context ~self:pifref ~value:"1.1.1.1" ;
Xapi_cluster_host.fix_pif_prerequisites ~__context pifref ;
let pif = Xapi_clustering.pif_of_host ~__context network host in
Expand Down Expand Up @@ -202,10 +202,16 @@ let test_forget2 () =
let __context = Test_common.make_test_database () in
let host2 = Test_common.make_host ~__context () in
let host3 = Test_common.make_host __context () in
let host3_address =
Xapi_clustering.Addr.of_ip_host ~__context ~ip:"192.0.2.3" ~host:host3
in
let cluster, original_cluster_hosts = make ~__context [host2; host3] in
Xapi_host.destroy ~__context ~self:host3 ;
let pending = Db.Cluster.get_pending_forget ~__context ~self:cluster in
Alcotest.(check (list string) "1 pending forgets" ["192.0.2.3"] pending) ;
Alcotest.(check (list string))
"host3 should be a pending forget"
[Xapi_clustering.Addr._string_of_address host3_address]
pending ;
Xapi_host.destroy ~__context ~self:host2 ;
Db_gc_util.gc_Cluster_hosts ~__context ;
let cluster_hosts = Db.Cluster.get_cluster_hosts ~__context ~self:cluster in
Expand Down
2 changes: 1 addition & 1 deletion ocaml/xapi/certificates.ml
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ end = struct
let expr =
let open Db_filter_types in
let type' = Eq (Field "type", Literal "ca") in
let name' = Eq (Field "type", Literal name) in
let name' = Eq (Field "name", Literal name) in
And (type', name')
in
let self =
Expand Down
7 changes: 7 additions & 0 deletions ocaml/xapi/message_forwarding.ml
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,13 @@ functor
Cert_distrib.(
go ~__context ~existing_cert_strategy:Erase_old
~from_hosts:all_hosts ~to_hosts:all_hosts) ;

(* check clusterds are set up correctly *)
Db.Cluster.get_all ~__context
|> List.iter (fun self ->
Xapi_clustering.enable_tls_verification_prechecks ~__context
~self) ;

all_hosts
|> List.iter (fun host ->
do_op_on ~local_fn ~__context ~host (fun session_id rpc ->
Expand Down
4 changes: 2 additions & 2 deletions ocaml/xapi/xapi_cluster.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ let create ~__context ~pIF ~cluster_stack ~pool_auto_join ~token_timeout
let host = Helpers.get_master ~__context in
let pifrec = Db.PIF.get_record ~__context ~self:pIF in
assert_pif_prerequisites (pIF, pifrec) ;
let ip = ip_of_pif (pIF, pifrec) in
let addr = Addr.of_pif_host ~__context ~host (pIF, pifrec) in
let token_timeout_ms = Int64.of_float (token_timeout *. 1000.0) in
let token_timeout_coefficient_ms =
Int64.of_float (token_timeout_coefficient *. 1000.0)
in
let init_config =
{
Cluster_interface.local_ip= ip
Cluster_interface.local_ip= addr
; token_timeout_ms= Some token_timeout_ms
; token_coefficient_ms= Some token_timeout_coefficient_ms
; name= None
Expand Down
47 changes: 26 additions & 21 deletions ocaml/xapi/xapi_cluster_host.ml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ let fix_pif_prerequisites ~__context (self : API.ref_PIF) =
avoids making any changes to the PIF if there's something we
simply can't fix. *)
let pif_rec self = Db.PIF.get_record ~__context ~self in
ip_of_pif (self, pif_rec self) |> ignore ;
Addr.assert_ip_ok (self, pif_rec self) ;
if not (pif_rec self).API.pIF_currently_attached then
Helpers.call_api_functions ~__context (fun rpc session_id ->
Client.Client.PIF.plug ~rpc ~session_id ~self) ;
Expand Down Expand Up @@ -74,31 +74,31 @@ let join_internal ~__context ~self =
let cluster_token =
Db.Cluster.get_cluster_token ~__context ~self:cluster
in
let ip = ip_of_pif (pIF, Db.PIF.get_record ~__context ~self:pIF) in
let ip_list =
let addr = Addr.of_cluster_host ~__context ~cluster_host:self in
let addr_list =
List.filter_map
(fun self ->
let p_ref = Db.Cluster_host.get_PIF ~__context ~self in
let p_rec = Db.PIF.get_record ~__context ~self:p_ref in
(* parallel join: some hosts may not have an IP yet *)
try
let other_ip = ip_of_pif (p_ref, p_rec) in
if other_ip <> ip then
Some other_ip
let other_addr =
Addr.of_cluster_host ~__context ~cluster_host:self
in
if other_addr <> addr then
Some other_addr
else
None
with _ -> None)
(Db.Cluster.get_cluster_hosts ~__context ~self:cluster)
in
if ip_list = [] then
if addr_list = [] then
raise
Api_errors.(
Server_error (no_cluster_hosts_reachable, [Ref.string_of cluster])) ;
debug "Enabling clusterd and joining cluster_host %s" (Ref.string_of self) ;
Xapi_clustering.Daemon.enable ~__context ;
let result =
Cluster_client.LocalClient.join (rpc ~__context) dbg cluster_token ip
ip_list
Cluster_client.LocalClient.join (rpc ~__context) dbg cluster_token addr
addr_list
in
match Idl.IdM.run @@ Cluster_client.IDL.T.get result with
| Ok () ->
Expand Down Expand Up @@ -191,20 +191,23 @@ let destroy ~__context ~self =
in
destroy_op ~__context ~self ~force:false

(** [ip_of_str s] wraps [s] in the [Cluster_interface.IPv4] constructor.
    No validation of [s] is performed here. *)
let ip_of_str s = Cluster_interface.IPv4 s

let forget ~__context ~self =
with_clustering_lock __LOC__ (fun () ->
let module Addr = Xapi_clustering.Addr in
let dbg = Context.string_of_task __context in
let cluster = Db.Cluster_host.get_cluster ~__context ~self in
let pif = Db.Cluster_host.get_PIF ~__context ~self in
let ip = Db.PIF.get_IP ~__context ~self:pif in
let new_pending_forget =
Addr.of_cluster_host ~__context ~cluster_host:self
in
let pending =
ip :: Db.Cluster.get_pending_forget ~__context ~self:cluster
new_pending_forget :: Addr.get_pending_forgets ~__context ~cluster
in
debug "Setting pending forget to %s" (String.concat "," pending) ;
Db.Cluster.set_pending_forget ~__context ~self:cluster ~value:pending ;
let pending = List.map ip_of_str pending in
debug "Setting pending_forget_hosts to '%s'"
(pending
|> List.map (Cluster_interface.printaddr ())
|> String.concat ","
) ;
Addr.set_pending_forgets ~__context ~cluster pending ;
let result =
Cluster_client.LocalClient.declare_dead (rpc ~__context) dbg pending
in
Expand Down Expand Up @@ -233,10 +236,12 @@ let enable ~__context ~self =
let pifref = Db.Cluster_host.get_PIF ~__context ~self in
let pifrec = Db.PIF.get_record ~__context ~self:pifref in
assert_pif_prerequisites (pifref, pifrec) ;
let ip = ip_of_pif (pifref, pifrec) in
let addr =
Addr.of_pif_cluster_host ~__context ~cluster_host:self (pifref, pifrec)
in
let init_config =
{
Cluster_interface.local_ip= ip
Cluster_interface.local_ip= addr
; token_timeout_ms= None
; token_coefficient_ms= None
; name= None
Expand Down
99 changes: 90 additions & 9 deletions ocaml/xapi/xapi_clustering.ml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,67 @@ module D = Debug.Make (struct let name = "xapi_clustering" end)

open D

(** Helpers for building, serialising and storing cluster addresses.

    Addresses built here pair an IP with a hostname, and the hostname used is
    the host's UUID. The hostname should match the CN of the certificate
    served by the cluster daemon; this module assumes that CN is the host
    UUID. *)
module Addr = struct
  (* Several constructors are provided, for performance / convenience, so
   * that a caller can start from whatever it already holds: a bare IP, a PIF
   * record, or a Cluster_host reference. *)

  (** [assert_ip_ok (pifref, pifrec)] does nothing if [pifrec] carries a
      non-empty IP.
      @raise Api_errors.Server_error
        with [pif_has_no_network_configuration] if the IP is the empty
        string. *)
  let assert_ip_ok ((pifref, pifrec) : API.ref_PIF * API.pIF_t) : unit =
    match pifrec.API.pIF_IP with
    | "" ->
        raise
          Api_errors.(
            Server_error
              (pif_has_no_network_configuration, [Ref.string_of pifref]))
    | _ ->
        ()

  (** [of_ip_host ~__context ~host ~ip] pairs [ip] with the UUID of [host],
      which is used as the hostname. *)
  let of_ip_host ~__context ~host ~ip : Cluster_interface.address =
    Cluster_interface.IP_with_hostname
      {ip; hostname= Db.Host.get_uuid ~__context ~self:host}

  (** [of_pif_host ~__context ~host (pif_ref, pif_rec)] builds the address of
      [host] from the IP in [pif_rec]. Raises as {!assert_ip_ok} does if the
      PIF has no IP. *)
  let of_pif_host ~__context ~host (pif_ref, pif_rec) :
      Cluster_interface.address =
    assert_ip_ok (pif_ref, pif_rec) ;
    of_ip_host ~__context ~host ~ip:pif_rec.API.pIF_IP

  (** Same as {!of_pif_host}, with the host looked up from [cluster_host]. *)
  let of_pif_cluster_host ~__context ~cluster_host pif :
      Cluster_interface.address =
    let host = Db.Cluster_host.get_host ~__context ~self:cluster_host in
    of_pif_host ~__context ~host pif

  (** [of_cluster_host ~__context ~cluster_host] reads the PIF of
      [cluster_host] and its record from the database, then builds the
      address from them. *)
  let of_cluster_host ~__context ~cluster_host =
    let pifref = Db.Cluster_host.get_PIF ~__context ~self:cluster_host in
    (pifref, Db.PIF.get_record ~__context ~self:pifref)
    |> of_pif_cluster_host ~__context ~cluster_host

  (** [all_members ~__context ~cluster] is the address of every Cluster_host
      listed for [cluster], in database order. *)
  let all_members ~__context ~cluster =
    List.map
      (fun cluster_host -> of_cluster_host ~__context ~cluster_host)
      (Db.Cluster.get_cluster_hosts ~__context ~self:cluster)

  (** Marshals an address with [Rpcmarshal] and renders the resulting RPC
      value with [Jsonrpc.to_string]. *)
  let _string_of_address (x : Cluster_interface.address) : string =
    Jsonrpc.to_string (Rpcmarshal.marshal Cluster_interface.typ_of_address x)

  (** Inverse of {!_string_of_address}. Returns [None] if parsing or
      unmarshalling fails, whether by an [Error] result or by any
      exception. *)
  let _address_of_string (x : string) : Cluster_interface.address option =
    match
      Rpcmarshal.unmarshal Cluster_interface.typ_of_address
        (Jsonrpc.of_string x)
    with
    | Ok addr ->
        Some addr
    | Error _ ->
        None
    | exception _ ->
        None

  (** [get_pending_forgets ~__context ~cluster] decodes the cluster's
      [pending_forget] entries. An entry that does not decode as a marshalled
      address is wrapped as [IPv4], so that entries holding a bare IP string
      remain readable. *)
  let get_pending_forgets ~__context ~cluster : Cluster_interface.address list
      =
    let decode raw =
      match _address_of_string raw with
      | Some addr ->
          addr
      | None ->
          Cluster_interface.IPv4 raw
    in
    List.map decode (Db.Cluster.get_pending_forget ~__context ~self:cluster)

  (** [set_pending_forgets ~__context ~cluster xs] stores [xs] in the
      cluster's [pending_forget] field, each serialised with
      {!_string_of_address}. *)
  let set_pending_forgets ~__context ~cluster xs : unit =
    let value = List.map _string_of_address xs in
    Db.Cluster.set_pending_forget ~__context ~self:cluster ~value
end

(* Called by Cluster.create/destroy *)
let set_ha_cluster_stack ~__context =
let self = Helpers.get_pool ~__context in
Expand Down Expand Up @@ -64,14 +125,6 @@ let pif_of_host ~__context (network : API.ref_network) (host : API.ref_host) =
debug "%s" msg ;
raise Api_errors.(Server_error (internal_error, [msg]))

(** [ip_of_pif (pif_ref, pif_rec)] wraps the IP stored in [pif_rec] in
    [Cluster_interface.IPv4].
    @raise Api_errors.Server_error
      with [pif_has_no_network_configuration] if that IP is the empty
      string. *)
let ip_of_pif (pif_ref, pif_rec) =
  match pif_rec.API.pIF_IP with
  | "" ->
      raise
        Api_errors.(
          Server_error
            (pif_has_no_network_configuration, [Ref.string_of pif_ref]))
  | ip ->
      Cluster_interface.IPv4 ip

(** [assert_pif_prerequisites (pif_ref,pif_rec)] raises an exception if any of
the prerequisites of using a PIF for clustering are unmet. These
prerequisites are:
Expand All @@ -92,7 +145,7 @@ let assert_pif_prerequisites pif =
Server_error (required_pif_is_unplugged, [Ref.string_of pif_ref]))
in
assert_pif_permaplugged pif ;
ignore (ip_of_pif pif) ;
Addr.assert_ip_ok pif ;
debug "Got IP %s for PIF %s" record.API.pIF_IP (Ref.string_of pif_ref)

let assert_pif_attached_to ~__context ~host ~pIF =
Expand Down Expand Up @@ -378,3 +431,31 @@ let compute_corosync_max_host_failures ~__context =
((nhosts - disabled_hosts - 1) / 2) + disabled_hosts
in
corosync_ha_max_hosts

(** [enable_tls_verification_prechecks ~__context ~self] collects the
    addresses of all members of cluster [self] and passes them to
    [Cluster_client.LocalClient.enable_tls_verification_prechecks].

    Success is logged at debug level. On failure the daemon's error string is
    logged, but it is not included in the raised error.
    @raise Api_errors.Server_error
      with [internal_error] if the call returns an error. *)
let enable_tls_verification_prechecks ~__context ~self =
  let dbg = Context.string_of_task __context in
  let members = Addr.all_members ~__context ~cluster:self in
  let outcome =
    Cluster_client.LocalClient.enable_tls_verification_prechecks
      (rpc ~__context) dbg members
    |> Cluster_client.IDL.T.get
    |> Idl.IdM.run
  in
  match outcome with
  | Error e ->
      (* both error constructors carry a plain message string *)
      let err_str = match e with InternalError s | Unix_error s -> s in
      D.error
        "enable_tls_verification_prechecks: prechecks on self=%s failed. ex: %s"
        (Ref.short_string_of self) err_str ;
      raise
        Api_errors.(
          Server_error
            ( internal_error
            , [
                "Failed to enable TLS verification because prechecks failed on \
                 the pool's cluster. It may be easiest to destroy your cluster \
                 and try again"
              ] ))
  | Ok () ->
      D.debug
        "enable_tls_verification_prechecks: prechecks on self=%s were \
         successful"
        (Ref.short_string_of self)