Skip to content

Commit

Permalink
add tenant sa token subsitute
Browse files Browse the repository at this point in the history
  • Loading branch information
MrGirl authored and zhangyuandao.zyd committed May 9, 2022
1 parent 784b9ad commit 469bac4
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 63 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ jobs:
with:
version: ${{ env.GOLANGCI_VERSION }}
args: -v
skip-pkg-cache: true
skip-build-cache: true

markdownlint-misspell-shellcheck:
runs-on: ubuntu-18.04
Expand Down
2 changes: 2 additions & 0 deletions cmd/yurt-controller-manager/controller-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"time"

"k8s.io/component-base/logs"

// load all the prometheus client-go plugin
_ "k8s.io/component-base/metrics/prometheus/clientgo"

// for version metric registration
_ "k8s.io/component-base/metrics/prometheus/version"

Expand Down
2 changes: 0 additions & 2 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ type YurtHubConfiguration struct {
WorkingMode util.WorkingMode
KubeletHealthGracePeriod time.Duration
FilterManager *filter.Manager
DiskCachePath string
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
Expand Down Expand Up @@ -165,7 +164,6 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
YurtSharedFactory: yurtSharedFactory,
KubeletHealthGracePeriod: options.KubeletHealthGracePeriod,
FilterManager: filterManager,
DiskCachePath: options.DiskCachePath,
}

return cfg, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new tenant sa manager", trace)
tenantMgr := tenant.New(cfg.YurtHubCertOrganizations, cfg.SharedFactory)
tenantMgr := tenant.New(cfg.YurtHubCertOrganizations, cfg.SharedFactory, stopCh)
trace++

klog.Infof("%d. new reverse proxy handler for remote servers", trace)
Expand Down
4 changes: 2 additions & 2 deletions config/setup/yurthub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ spec:
command:
- yurthub
- --v=2
- --server-addr=https://47.110.228.191:6443
- --server-addr=https://__kubernetes_master_address__
- --node-name=$(NODE_NAME)
- --join-token=pbquwo.ajmim4wek23efyzo
- --join-token=__bootstrap_token__
livenessProbe:
httpGet:
host: 127.0.0.1
Expand Down
18 changes: 13 additions & 5 deletions pkg/yurthub/proxy/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"

"github.com/openyurtio/openyurt/pkg/yurthub/metrics"
"github.com/openyurtio/openyurt/pkg/yurthub/tenant"
jwt2 "github.com/openyurtio/openyurt/pkg/yurthub/tenant/jwt"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
)

Expand Down Expand Up @@ -313,12 +313,20 @@ func WithSaTokenSubstitute(handler http.Handler, tenantMgr tenant.Interface) htt

klog.Errorf("invaled bearer token %s, err: %v", oldToken, err)
} else {
oldClaim := jwt2.BearerClaims{}
oldClaim := jwt.Claims{}

if err := jsonWebToken.UnsafeClaimsWithoutVerification(&oldClaim); err == nil {
if tenantMgr.GetTenantNs() != oldClaim.Namespace && oldClaim.Namespace == "kube-system" { // token is not from tenant's namespace
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", tenantMgr.GetTenantToken()))
klog.V(2).Infof("replace token, old: %s, new: %s", oldToken, tenantMgr.GetTenantToken())

if tenantNs, _, err := serviceaccount.SplitUsername(oldClaim.Subject); err == nil {

if tenantMgr.WaitForCacheSync() {
if tenantMgr.GetTenantNs() != tenantNs && tenantNs == "kube-system" { // token is not from tenant's namespace
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", tenantMgr.GetTenantToken()))
klog.V(2).Infof("replace token, old: %s, new: %s", oldToken, tenantMgr.GetTenantToken())
}
}
} else {
klog.Errorf("failed to parse tenant ns from token, token %s, sub: %s", oldToken, oldClaim.Subject)
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions pkg/yurthub/proxy/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ func TestWithSaTokenSubsitute(t *testing.T) {
resolver := newTestRequestInfoResolver()
orgs := []string{"system:bootstrappers:openyurt:tenant:myspace"}

tenantMgr := tenant.New(orgs, nil)
stopCh := make(<-chan struct{})
tenantMgr := tenant.New(orgs, nil, stopCh)

data := make(map[string][]byte)
data["token"] = []byte(tenantToken)
Expand Down Expand Up @@ -530,7 +531,8 @@ func TestWithSaTokenSubsituteTenantTokenEmpty(t *testing.T) {
resolver := newTestRequestInfoResolver()
orgs := []string{"system:bootstrappers:openyurt:tenant:myspace"}

tenantMgr := tenant.New(orgs, nil)
stopCh := make(<-chan struct{})
tenantMgr := tenant.New(orgs, nil, stopCh)

data := make(map[string][]byte)
data["token"] = []byte(tenantToken)
Expand All @@ -552,7 +554,6 @@ func TestWithSaTokenSubsituteTenantTokenEmpty(t *testing.T) {
var handler http.Handler = http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
rToken := req.Header.Get("Authorization")
if rToken == fmt.Sprintf("Bearer %s", tenantToken) {

needSubsitute = true
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/yurthub/storage/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ import (
)

// CreateStorage create a storage.Store for backend storage
// Singleton mode
func CreateStorage(cachePath string) (storage.Store, error) {

return disk.NewDiskStorage(cachePath)

}
26 changes: 0 additions & 26 deletions pkg/yurthub/tenant/jwt/jwt.go

This file was deleted.

7 changes: 4 additions & 3 deletions pkg/yurthub/tenant/jwt/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@ import (
"testing"

"gopkg.in/square/go-jose.v2/jwt"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
)

func TestJwt(t *testing.T) {

bearerToken := "eyJhbGciOiJSUzI1NiIsImtpZCI6InVfTVZpZWIySUFUTzQ4NjlkM0VwTlBRb0xJOWVKUGg1ZXVzbEdaY0ZxckEifQ.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJkZWZhdWx0LXRva2VuLXF3c2ZtIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImRlZmF1bHQiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlcnZpY2UtYWNjb3VudC51aWQiOiI4M2EwMzc4ZS1mY2UxLTRmZDEtOGI1NC00MTE2MjUzYzNkYWMiLCJzdWIiOiJzeXN0ZW06c2VydmljZWFjY291bnQ6a3ViZS1zeXN0ZW06ZGVmYXVsdCJ9.sFpHHg4o88Z0CBJseMBvBeP00bS5isLBmQJpAOiYs3BTkEAD63YLTnDURt0r3I9QjtcP0DZAb5wSOccGChMAFVtxMIoIoZC6Mk4FSB720kawRxFVujNFR1T7uVV_dbpEU-wsxSb9-Y4ILVknuJR9t35x6lUbRkUE9tN1wDy4DH296C3gEGNJf8sbJMERZzOckc82_BamlCzaieo1nX396KafxdQGVIgxstx88hm_rgpjDy3LA1GNsx6x2pqXdzZ8mufQt7sTljRorXUk-rNU6y9wX2RvIMO8tNiPClNkdIpgpmeQo-g7XZivpEeq3VzoeExphRbusgCtO9T9tgU64w"
var bearerClaims = BearerClaims{}
var bearerClaims = jwt.Claims{}

if token, err := jwt.ParseSigned(bearerToken); err == nil {
if err := token.UnsafeClaimsWithoutVerification(&bearerClaims); err == nil {

if bearerClaims.Namespace != "" {
t.Logf("succeed to parse toke, ns: %s, sa: %s", bearerClaims.Namespace, bearerClaims.ServiceAccountName)
if tenantNs, username, err := serviceaccount.SplitUsername(bearerClaims.Subject); err == nil {
t.Logf("succeed to parse toke, ns: %s, username: %s", tenantNs, username)
} else {
t.Errorf("failed to parse jwt token, %v", err)
}
Expand Down
41 changes: 28 additions & 13 deletions pkg/yurthub/tenant/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.
package tenant

import (
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
Expand All @@ -29,42 +31,57 @@ type Interface interface {

GetTenantToken() string

InformerSynced() cache.InformerSynced
WaitForCacheSync() bool

SetSecret(sec *v1.Secret)
}

type tenantManager struct {
Informer cache.SharedIndexInformer

secretSynced cache.InformerSynced

TenantSecret *v1.Secret

TenantNs string

StopCh <-chan struct{}

IsSynced bool

mutex sync.Mutex
}

func New(orgs []string, factory informers.SharedInformerFactory) Interface {
func (mgr *tenantManager) WaitForCacheSync() bool {

tenantNs := util.ParseTenantNsFromOrgs(orgs)
if mgr.IsSynced || mgr.TenantSecret != nil { //try to do sync for just one time, fast return
return true
}

mgr.mutex.Lock()
defer mgr.mutex.Unlock()

mgr.IsSynced = cache.WaitForCacheSync(mgr.StopCh, mgr.secretSynced)

return mgr.IsSynced
}

func New(orgs []string, factory informers.SharedInformerFactory, stopCh <-chan struct{}) Interface {

tenantNs := util.ParseTenantNsFromOrgs(orgs)
klog.Infof("parse tenant ns: %s", tenantNs)
if tenantNs == "" {
return nil
}

tenantMgr := &tenantManager{TenantNs: tenantNs}
tenantMgr := &tenantManager{TenantNs: tenantNs, StopCh: stopCh}

if factory != nil {
tenantMgr.Informer = factory.InformerFor(&v1.Secret{}, nil) //get registered secret informer
tenantMgr.secretSynced = tenantMgr.Informer.HasSynced
informer := factory.InformerFor(&v1.Secret{}, nil) //get registered secret informer

//add handlers
tenantMgr.Informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: tenantMgr.addSecret,
UpdateFunc: tenantMgr.updateSecret,
DeleteFunc: tenantMgr.deleteSecret})

}

return tenantMgr
Expand All @@ -78,9 +95,7 @@ func (mgr *tenantManager) GetTenantToken() string {
return ""
}

tokenStr := string(mgr.TenantSecret.Data["token"])

return tokenStr
return string(mgr.TenantSecret.Data["token"])
}

func (mgr *tenantManager) GetTenantNs() string {
Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,14 +473,14 @@ func NewGZipReaderCloser(header http.Header, body io.ReadCloser, req *http.Reque
body: body,
}, true
}

func ParseTenantNs(certOrg string) string {

if !strings.Contains(certOrg, "openyurt:tenant:") {
return ""
}

idx := strings.LastIndex(certOrg, "openyurt:tenant:") + len("openyurt:tenant:")
return certOrg[idx:]
return strings.TrimPrefix(certOrg, "openyurt:tenant:")
}

func ParseTenantNsFromOrgs(orgs []string) string {
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func TestParseTenantNs(t *testing.T) {

testCases := map[string]string{
"a": "",
"system:bootstrappers:openyurt:tenant:myspace": "myspace",
"openyurt:tenant:myspace": "myspace",
}

for k, v := range testCases {
Expand Down

0 comments on commit 469bac4

Please sign in to comment.