Skip to content

Commit

Permalink
Merge pull request #1937 from openziti/fix-create-terminator-ha-loop
Browse files Browse the repository at this point in the history
Don't use EdgeSessionRemoved tracking for HA sessions. Fixes #1936
  • Loading branch information
plorenz authored Apr 16, 2024
2 parents b0f91af + dd6b923 commit 4085c32
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 17 deletions.
1 change: 0 additions & 1 deletion controller/env/appenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,6 @@ func (ae *AppEnv) ControllersKeyFunc(token *jwt.Token) (interface{}, error) {

func (ae *AppEnv) GetControllerPublicKey(kid string) crypto.PublicKey {
signers := ae.Broker.GetPublicKeys()
pfxlog.Logger().Info("looking for signer: " + kid)
return signers[kid]
}

Expand Down
4 changes: 2 additions & 2 deletions controller/internal/routes/session_api_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"github.com/openziti/edge-api/rest_model"
"github.com/openziti/ziti/controller/env"
"github.com/openziti/ziti/controller/model"
"github.com/openziti/ziti/controller/response"
"github.com/openziti/ziti/controller/models"
"github.com/openziti/ziti/controller/response"
)

const EntityNameSession = "sessions"
Expand Down Expand Up @@ -172,7 +172,7 @@ func getSessionEdgeRouters(ae *env.AppEnv, ns *model.Session) ([]*rest_model.Ses
Urls: state.Protocols,
}

pfxlog.Logger().Debugf("Returning %+v to %+v, with urls: %+v", edgeRouter, restModel, restModel.Urls)
pfxlog.Logger().Debugf("Returning %+v to %+v, with urls: %+v", edgeRouter, restModel, restModel.SupportedProtocols)
edgeRouters = append(edgeRouters, restModel)
}

Expand Down
4 changes: 4 additions & 0 deletions controller/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ func (self *Controller) Dispatch(cmd command.Command) error {
return err
}

if self.GetLeaderAddr() == "" {
return errors.New("unable to execute command, cluster has no leader")
}

log.WithField("cmd", reflect.TypeOf(cmd)).WithField("dest", self.GetLeaderAddr()).Info("forwarding command")

peer, err := self.GetMesh().GetOrConnectPeer(self.GetLeaderAddr(), 5*time.Second)
Expand Down
5 changes: 5 additions & 0 deletions router/state/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/openziti/ziti/controller/oidc_auth"
"github.com/openziti/ziti/router"
"github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/xgress_common"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -464,6 +465,10 @@ func (sm *ManagerImpl) MarkSessionRecentlyRemoved(token string) {
}

func (sm *ManagerImpl) AddEdgeSessionRemovedListener(token string, callBack func(token string)) RemoveListener {
if xgress_common.IsBearerToken(token) {
return func() {}
}

if sm.recentlyRemovedSessions.Has(token) {
go callBack(token) // callback can be long process with network traffic. Don't block event processing
return func() {}
Expand Down
25 changes: 25 additions & 0 deletions router/xgress_common/edge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package xgress_common

import "strings"

const JwtTokenPrefix = "ey"

func IsBearerToken(s string) bool {
return strings.HasPrefix(s, JwtTokenPrefix)
}
2 changes: 0 additions & 2 deletions router/xgress_edge/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"github.com/openziti/ziti/router/state"
)

const JwtTokenPrefix = "ey"

type sessionConnectionHandler struct {
stateManager state.Manager
options *Options
Expand Down
4 changes: 2 additions & 2 deletions router/xgress_edge/hosted.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import (
"github.com/openziti/ziti/controller/command"
routerEnv "github.com/openziti/ziti/router/env"
"github.com/openziti/ziti/router/state"
"github.com/openziti/ziti/router/xgress_common"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"strings"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -550,7 +550,7 @@ func (self *hostedServiceRegistry) establishTerminator(terminator *edgeTerminato
InstanceSecret: terminator.instanceSecret,
}

if self.stateManager.GetConfig().Ha.Enabled && strings.HasPrefix(request.SessionToken, JwtTokenPrefix) {
if self.stateManager.GetConfig().Ha.Enabled && xgress_common.IsBearerToken(request.SessionToken) {
apiSession := self.stateManager.GetApiSessionFromCh(terminator.Channel)

if apiSession == nil {
Expand Down
9 changes: 4 additions & 5 deletions router/xgress_edge/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"github.com/openziti/ziti/common/ctrl_msg"
"github.com/openziti/ziti/controller/idgen"
"strings"
"time"

"github.com/openziti/ziti/common/capabilities"
Expand Down Expand Up @@ -178,7 +177,7 @@ func (self *edgeClientConn) processConnect(manager state.Manager, req *channel.M
PeerData: peerData,
}

if manager.GetConfig().Ha.Enabled && strings.HasPrefix(sessionToken, JwtTokenPrefix) {
if manager.GetConfig().Ha.Enabled && xgress_common.IsBearerToken(sessionToken) {
apiSession := manager.GetApiSessionFromCh(ch)

if apiSession == nil {
Expand Down Expand Up @@ -352,7 +351,7 @@ func (self *edgeClientConn) processBindV1(manager state.Manager, req *channel.Me
InstanceSecret: terminatorIdentitySecret,
}

if manager.GetConfig().Ha.Enabled && strings.HasPrefix(sessionToken, JwtTokenPrefix) {
if manager.GetConfig().Ha.Enabled && xgress_common.IsBearerToken(sessionToken) {
apiSession := manager.GetApiSessionFromCh(ch)

if apiSession == nil {
Expand Down Expand Up @@ -579,7 +578,7 @@ func (self *edgeClientConn) processUpdateBind(manager state.Manager, req *channe
TerminatorId: terminator.terminatorId,
}

if manager.GetConfig().Ha.Enabled && strings.HasPrefix(sessionToken, JwtTokenPrefix) {
if manager.GetConfig().Ha.Enabled && xgress_common.IsBearerToken(sessionToken) {
apiSession := manager.GetApiSessionFromCh(ch)
request.ApiSessionToken = apiSession.Token
}
Expand Down Expand Up @@ -646,7 +645,7 @@ func (self *edgeClientConn) processHealthEvent(manager state.Manager, req *chann

log = log.WithField("terminator", terminator.terminatorId).WithField("checkPassed", checkPassed)

if manager.GetConfig().Ha.Enabled && strings.HasPrefix(sessionToken, JwtTokenPrefix) {
if manager.GetConfig().Ha.Enabled && xgress_common.IsBearerToken(sessionToken) {
apiSession := manager.GetApiSessionFromCh(ch)
request.ApiSessionToken = apiSession.Token
}
Expand Down
2 changes: 1 addition & 1 deletion tunnel/entities/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ type Service struct {
func (self *Service) GetConfigOfType(configType string, target interface{}) (bool, error) {
configMap, found := self.Config[configType]
if !found {
pfxlog.Logger().Debugf("no service config of type %v defined for service %v", configType, self.Name)
pfxlog.Logger().Debugf("no service config of type %v defined for service %v", configType, *self.Name)
return false, nil
}
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
Expand Down
6 changes: 6 additions & 0 deletions ziti/tunnel/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func NewTunnelCmd(standalone bool) *cobra.Command {
root.PersistentFlags().BoolVar(&cliAgentEnabled, "cli-agent", true, "Enable/disable CLI Agent (enabled by default)")
root.PersistentFlags().StringVar(&cliAgentAddr, "cli-agent-addr", "", "Specify where CLI Agent should list (ex: unix:/tmp/myfile.sock or tcp:127.0.0.1:10001)")
root.PersistentFlags().StringVar(&cliAgentAlias, "cli-agent-alias", "", "Alias which can be used by ziti agent commands to find this instance")
root.PersistentFlags().BoolVar(&ha, "ha", false, "Enable HA controller compatibility")

root.AddCommand(NewHostCmd())
root.AddCommand(NewProxyCmd())
Expand All @@ -86,6 +87,7 @@ var logFormatter string
var cliAgentEnabled bool
var cliAgentAddr string
var cliAgentAlias string
var ha bool

func rootPreRun(cmd *cobra.Command, _ []string) {
verbose, err := cmd.Flags().GetBool("verbose")
Expand Down Expand Up @@ -205,6 +207,10 @@ func startIdentity(cmd *cobra.Command, serviceListenerGroup *intercept.ServiceLi
pfxlog.Logger().WithError(err).Fatal("could not create ziti sdk context")
}

if ha {
rootPrivateContext.(*ziti.ContextImpl).CtrlClt.SetUseOidc(true)
}

for {
if err = rootPrivateContext.Authenticate(); err != nil {
log.WithError(err).Error("failed to authenticate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,10 @@ func getUniqueId() string {
}

var Model = &model.Model{
Id: "simple-transfer",
Id: "smoketest",
Scope: model.Scope{
Defaults: model.Variables{
"environment": "simple-transfer-smoketest" + getUniqueId(),
"environment": "smoketest" + getUniqueId(),
"credentials": model.Variables{
"aws": model.Variables{
"managed_key": true,
Expand All @@ -76,6 +76,12 @@ var Model = &model.Model{
for _, host := range m.SelectHosts("component.ha") {
delete(host.Region.Hosts, host.Id)
}
} else {
for _, component := range m.SelectComponents("*") {
if ztType, ok := component.Type.(*zitilab.ZitiTunnelType); ok {
ztType.HA = true
}
}
}
return nil
}),
Expand Down Expand Up @@ -255,6 +261,8 @@ var Model = &model.Model{
"start": actions.NewStartAction(),
"stop": model.Bind(component.StopInParallel("*", 15)),
"login": model.Bind(edge.Login("#ctrl1")),
"login2": model.Bind(edge.Login("#ctrl2")),
"login3": model.Bind(edge.Login("#ctrl3")),
},

Infrastructure: model.Stages{
Expand Down
10 changes: 8 additions & 2 deletions zititest/zitilab/component_ziti_tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ZitiTunnelType struct {
Version string
LocalPath string
ConfigPathF func(c *model.Component) string
HA bool
}

func (self *ZitiTunnelType) Label() string {
Expand Down Expand Up @@ -126,8 +127,13 @@ func (self *ZitiTunnelType) Start(_ model.Run, c *model.Component) error {
useSudo = "sudo"
}

serviceCmd := fmt.Sprintf("%s %s tunnel %s -v --cli-agent-alias %s --log-formatter pfxlog -i %s > %s 2>&1 &",
useSudo, binaryPath, mode.String(), c.Id, configPath, logsPath)
ha := ""
if self.HA {
ha = "--ha"
}

serviceCmd := fmt.Sprintf("%s %s tunnel %s -v %s --cli-agent-alias %s --log-formatter pfxlog -i %s > %s 2>&1 &",
useSudo, binaryPath, mode.String(), ha, c.Id, configPath, logsPath)

value, err := c.Host.ExecLogged(
"rm -f "+logsPath,
Expand Down

0 comments on commit 4085c32

Please sign in to comment.