Skip to content

Commit

Permalink
add HCP integration component (#14723)
Browse files Browse the repository at this point in the history
* add HCP integration

* lint: use non-deprecated logging interface
  • Loading branch information
nickethier authored Sep 26, 2022
1 parent aa4709a commit 1c1b099
Show file tree
Hide file tree
Showing 40 changed files with 2,633 additions and 116 deletions.
3 changes: 3 additions & 0 deletions .changelog/14723.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
agent/hcp: add initial HashiCorp Cloud Platform integration
```
43 changes: 42 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/hcp-scada-provider/capability"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"

"github.com/hashicorp/consul/acl"
Expand All @@ -40,6 +42,8 @@ import (
"github.com/hashicorp/consul/agent/consul/servercert"
"github.com/hashicorp/consul/agent/dns"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/hcp/scada"
libscada "github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/consul/agent/local"
"github.com/hashicorp/consul/agent/proxycfg"
proxycfgglue "github.com/hashicorp/consul/agent/proxycfg-glue"
Expand Down Expand Up @@ -382,6 +386,10 @@ type Agent struct {
// xdsServer serves the XDS protocol for configuring Envoy proxies.
xdsServer *xds.Server

// scadaProvider is set when HashiCorp Cloud Platform integration is configured and exposes the agent's API over
// an encrypted session to HCP
scadaProvider scada.Provider

// enterpriseAgent embeds fields that we only access in consul-enterprise builds
enterpriseAgent
}
Expand Down Expand Up @@ -428,6 +436,7 @@ func New(bd BaseDeps) (*Agent, error) {
config: bd.RuntimeConfig,
cache: bd.Cache,
routineManager: routine.NewManager(bd.Logger),
scadaProvider: bd.HCP.Provider,
}

// TODO: create rpcClientHealth in BaseDeps once NetRPC is available without Agent
Expand Down Expand Up @@ -769,6 +778,17 @@ func (a *Agent) Start(ctx context.Context) error {
}()
}

if a.scadaProvider != nil {
a.scadaProvider.UpdateMeta(map[string]string{
"consul_server_id": string(a.config.NodeID),
})

if err = a.scadaProvider.Start(); err != nil {
a.baseDeps.Logger.Error("scada provider failed to start, some HashiCorp Cloud Platform functionality has been disabled",
"error", err, "resource_id", a.config.Cloud.ResourceID)
}
}

return nil
}

Expand Down Expand Up @@ -954,6 +974,12 @@ func (a *Agent) startListeners(addrs []net.Addr) ([]net.Listener, error) {
}
l = &tcpKeepAliveListener{l.(*net.TCPListener)}

case *capability.Addr:
l, err = a.scadaProvider.Listen(x.Capability())
if err != nil {
return nil, err
}

default:
closeAll()
return nil, fmt.Errorf("unsupported address type %T", addr)
Expand Down Expand Up @@ -1011,6 +1037,11 @@ func (a *Agent) listenHTTP() ([]apiServer, error) {
MaxHeaderBytes: a.config.HTTPMaxHeaderBytes,
}

if libscada.IsCapability(l.Addr()) {
// wrap in http2 server handler
httpServer.Handler = h2c.NewHandler(srv.handler(a.config.EnableDebug), &http2.Server{})
}

// Load the connlimit helper into the server
connLimitFn := a.httpConnLimiter.HTTPConnStateFuncWithDefault429Handler(10 * time.Millisecond)

Expand All @@ -1027,7 +1058,12 @@ func (a *Agent) listenHTTP() ([]apiServer, error) {
return nil
}

if err := start("http", a.config.HTTPAddrs); err != nil {
httpAddrs := a.config.HTTPAddrs
if a.config.IsCloudEnabled() {
httpAddrs = append(httpAddrs, scada.CAPCoreAPI)
}

if err := start("http", httpAddrs); err != nil {
closeListeners(ln)
return nil, err
}
Expand Down Expand Up @@ -1582,6 +1618,11 @@ func (a *Agent) ShutdownAgent() error {

a.rpcClientHealth.Close()

// Shutdown SCADA provider
if a.scadaProvider != nil {
a.scadaProvider.Stop()
}

var err error
if a.delegate != nil {
err = a.delegate.Shutdown()
Expand Down
65 changes: 65 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/tcpproxy"
"github.com/hashicorp/consul/agent/hcp"
"github.com/hashicorp/consul/agent/hcp/scada"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/hcp-scada-provider/capability"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
Expand Down Expand Up @@ -6049,6 +6053,67 @@ peering {
})
}

func TestAgent_startListeners_scada(t *testing.T) {
t.Parallel()
pvd := scada.NewMockProvider(t)
c := capability.NewAddr("testcap")
pvd.EXPECT().Listen(c.Capability()).Return(nil, nil).Once()
bd := BaseDeps{
Deps: consul.Deps{
Logger: hclog.NewInterceptLogger(nil),
Tokens: new(token.Store),
GRPCConnPool: &fakeGRPCConnPool{},
HCP: hcp.Deps{
Provider: pvd,
},
},
RuntimeConfig: &config.RuntimeConfig{},
Cache: cache.New(cache.Options{}),
}

bd, err := initEnterpriseBaseDeps(bd, nil)
require.NoError(t, err)

agent, err := New(bd)
require.NoError(t, err)

_, err = agent.startListeners([]net.Addr{c})
require.NoError(t, err)
}

func TestAgent_scadaProvider(t *testing.T) {
t.Parallel()

pvd := scada.NewMockProvider(t)

// this listener is used when mocking out the scada provider
l, err := net.Listen("tcp4", fmt.Sprintf("127.0.0.1:%d", freeport.GetOne(t)))
require.NoError(t, err)
defer require.NoError(t, l.Close())

pvd.EXPECT().UpdateMeta(mock.Anything).Once()
pvd.EXPECT().Start().Return(nil).Once()
pvd.EXPECT().Listen(scada.CAPCoreAPI.Capability()).Return(l, nil).Once()
pvd.EXPECT().Stop().Return(nil).Once()
pvd.EXPECT().SessionStatus().Return("test").Once()
a := TestAgent{
OverrideDeps: func(deps *BaseDeps) {
deps.HCP.Provider = pvd
},
Overrides: `
cloud {
resource_id = "organization/0b9de9a3-8403-4ca6-aba8-fca752f42100/project/0b9de9a3-8403-4ca6-aba8-fca752f42100/consul.cluster/0b9de9a3-8403-4ca6-aba8-fca752f42100"
client_id = "test"
client_secret = "test"
}`,
}
defer a.Shutdown()
require.NoError(t, a.Start(t))

_, err = api.NewClient(&api.Config{Address: l.Addr().String()})
require.NoError(t, err)
}

func getExpectedCaPoolByFile(t *testing.T) *x509.CertPool {
pool := x509.NewCertPool()
data, err := ioutil.ReadFile("../test/ca/root.cer")
Expand Down
16 changes: 16 additions & 0 deletions agent/config/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/armon/go-metrics/prometheus"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/go-bexpr"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -959,6 +960,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
AutoEncryptIPSAN: autoEncryptIPSAN,
AutoEncryptAllowTLS: autoEncryptAllowTLS,
AutoConfig: autoConfig,
Cloud: b.cloudConfigVal(c.Cloud),
ConnectEnabled: connectEnabled,
ConnectCAProvider: connectCAProvider,
ConnectCAConfig: connectCAConfig,
Expand Down Expand Up @@ -2446,6 +2448,20 @@ func validateAutoConfigAuthorizer(rt RuntimeConfig) error {
return nil
}

func (b *builder) cloudConfigVal(v *CloudConfigRaw) (val hcpconfig.CloudConfig) {
if v == nil {
return val
}

val.ResourceID = stringVal(v.ResourceID)
val.ClientID = stringVal(v.ClientID)
val.ClientSecret = stringVal(v.ClientSecret)
val.AuthURL = stringVal(v.AuthURL)
val.Hostname = stringVal(v.Hostname)

return val
}

// decodeBytes returns the encryption key decoded.
func decodeBytes(key string) ([]byte, error) {
return base64.StdEncoding.DecodeString(key)
Expand Down
9 changes: 9 additions & 0 deletions agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ type Config struct {
CheckUpdateInterval *string `mapstructure:"check_update_interval"`
Checks []CheckDefinition `mapstructure:"checks"`
ClientAddr *string `mapstructure:"client_addr"`
Cloud *CloudConfigRaw `mapstructure:"cloud"`
ConfigEntries ConfigEntries `mapstructure:"config_entries"`
AutoEncrypt AutoEncrypt `mapstructure:"auto_encrypt"`
Connect Connect `mapstructure:"connect"`
Expand Down Expand Up @@ -859,6 +860,14 @@ type RPC struct {
EnableStreaming *bool `mapstructure:"enable_streaming"`
}

type CloudConfigRaw struct {
ResourceID *string `mapstructure:"resource_id"`
ClientID *string `mapstructure:"client_id"`
ClientSecret *string `mapstructure:"client_secret"`
Hostname *string `mapstructure:"hostname"`
AuthURL *string `mapstructure:"auth_url"`
}

type TLSProtocolConfig struct {
CAFile *string `mapstructure:"ca_file"`
CAPath *string `mapstructure:"ca_path"`
Expand Down
11 changes: 11 additions & 0 deletions agent/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/dns"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -157,6 +158,11 @@ type RuntimeConfig struct {
// hcl: autopilot { upgrade_version_tag = string }
AutopilotUpgradeVersionTag string

// Cloud contains configuration for agents to connect to HCP.
//
// hcl: cloud { ... }
Cloud hcpconfig.CloudConfig

// DNSAllowStale is used to enable lookups with stale
// data. This gives horizontal read scalability since
// any Consul server can service the query instead of
Expand Down Expand Up @@ -1679,6 +1685,11 @@ func (c *RuntimeConfig) Sanitized() map[string]interface{} {
return sanitize("rt", reflect.ValueOf(c)).Interface().(map[string]interface{})
}

// IsCloudEnabled returns true if a cloud.resource_id is set and the server mode is enabled
func (c *RuntimeConfig) IsCloudEnabled() bool {
return c.ServerMode && c.Cloud.ResourceID != ""
}

// isSecret determines whether a field name represents a field which
// may contain a secret.
func isSecret(name string) bool {
Expand Down
89 changes: 51 additions & 38 deletions agent/config/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/armon/go-metrics/prometheus"
"github.com/google/go-cmp/cmp/cmpopts"
hcpconfig "github.com/hashicorp/consul/agent/hcp/config"
"github.com/stretchr/testify/require"

"github.com/hashicorp/consul/acl"
Expand Down Expand Up @@ -5989,44 +5990,51 @@ func TestLoad_FullConfig(t *testing.T) {
},
ConnectMeshGatewayWANFederationEnabled: false,
ConnectServerlessPluginEnabled: true,
DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")},
DNSARecordLimit: 29907,
DNSAllowStale: true,
DNSDisableCompression: true,
DNSDomain: "7W1xXSqd",
DNSAltDomain: "1789hsd",
DNSEnableTruncate: true,
DNSMaxStale: 29685 * time.Second,
DNSNodeTTL: 7084 * time.Second,
DNSOnlyPassing: true,
DNSPort: 7001,
DNSRecursorStrategy: "sequential",
DNSRecursorTimeout: 4427 * time.Second,
DNSRecursors: []string{"63.38.39.58", "92.49.18.18"},
DNSSOA: RuntimeSOAConfig{Refresh: 3600, Retry: 600, Expire: 86400, Minttl: 0},
DNSServiceTTL: map[string]time.Duration{"*": 32030 * time.Second},
DNSUDPAnswerLimit: 29909,
DNSNodeMetaTXT: true,
DNSUseCache: true,
DNSCacheMaxAge: 5 * time.Minute,
DataDir: dataDir,
Datacenter: "rzo029wg",
DefaultQueryTime: 16743 * time.Second,
DisableAnonymousSignature: true,
DisableCoordinates: true,
DisableHostNodeID: true,
DisableHTTPUnprintableCharFilter: true,
DisableKeyringFile: true,
DisableRemoteExec: true,
DisableUpdateCheck: true,
DiscardCheckOutput: true,
DiscoveryMaxStale: 5 * time.Second,
EnableAgentTLSForChecks: true,
EnableCentralServiceConfig: false,
EnableDebug: true,
EnableRemoteScriptChecks: true,
EnableLocalScriptChecks: true,
EncryptKey: "A4wELWqH",
Cloud: hcpconfig.CloudConfig{
ResourceID: "N43DsscE",
ClientID: "6WvsDZCP",
ClientSecret: "lCSMHOpB",
Hostname: "DH4bh7aC",
AuthURL: "332nCdR2",
},
DNSAddrs: []net.Addr{tcpAddr("93.95.95.81:7001"), udpAddr("93.95.95.81:7001")},
DNSARecordLimit: 29907,
DNSAllowStale: true,
DNSDisableCompression: true,
DNSDomain: "7W1xXSqd",
DNSAltDomain: "1789hsd",
DNSEnableTruncate: true,
DNSMaxStale: 29685 * time.Second,
DNSNodeTTL: 7084 * time.Second,
DNSOnlyPassing: true,
DNSPort: 7001,
DNSRecursorStrategy: "sequential",
DNSRecursorTimeout: 4427 * time.Second,
DNSRecursors: []string{"63.38.39.58", "92.49.18.18"},
DNSSOA: RuntimeSOAConfig{Refresh: 3600, Retry: 600, Expire: 86400, Minttl: 0},
DNSServiceTTL: map[string]time.Duration{"*": 32030 * time.Second},
DNSUDPAnswerLimit: 29909,
DNSNodeMetaTXT: true,
DNSUseCache: true,
DNSCacheMaxAge: 5 * time.Minute,
DataDir: dataDir,
Datacenter: "rzo029wg",
DefaultQueryTime: 16743 * time.Second,
DisableAnonymousSignature: true,
DisableCoordinates: true,
DisableHostNodeID: true,
DisableHTTPUnprintableCharFilter: true,
DisableKeyringFile: true,
DisableRemoteExec: true,
DisableUpdateCheck: true,
DiscardCheckOutput: true,
DiscoveryMaxStale: 5 * time.Second,
EnableAgentTLSForChecks: true,
EnableCentralServiceConfig: false,
EnableDebug: true,
EnableRemoteScriptChecks: true,
EnableLocalScriptChecks: true,
EncryptKey: "A4wELWqH",
StaticRuntimeConfig: StaticRuntimeConfig{
EncryptVerifyIncoming: true,
EncryptVerifyOutgoing: true,
Expand Down Expand Up @@ -6771,6 +6779,11 @@ func TestRuntimeConfig_Sanitize(t *testing.T) {
EntryFetchMaxBurst: 42,
EntryFetchRate: 0.334,
},
Cloud: hcpconfig.CloudConfig{
ResourceID: "cluster1",
ClientID: "id",
ClientSecret: "secret",
},
ConsulCoordinateUpdatePeriod: 15 * time.Second,
RaftProtocol: 3,
RetryJoinLAN: []string{
Expand Down
Loading

0 comments on commit 1c1b099

Please sign in to comment.