Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement schema API #601

Merged
merged 13 commits into from
Aug 5, 2021
Next Next commit
Implement plugin interface
  • Loading branch information
fgksgf committed Aug 4, 2021
commit 3d1956dc09ee74a5adfd5e99c33ad260e6bea078
9 changes: 8 additions & 1 deletion pkg/apisix/apisix.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type Cluster interface {
Consumer() Consumer
// HealthCheck checks apisix cluster health in realtime.
HealthCheck(context.Context) error
// Plugin returns a Plugin interface that can operate Plugin resources.
Plugin() Plugin
}

// Route is the specific client interface to take over the create, update,
Expand Down Expand Up @@ -106,7 +108,7 @@ type GlobalRule interface {
Update(context.Context, *v1.GlobalRule) (*v1.GlobalRule, error)
}

// Consumer it the specific client interface to take over the create, update,
// Consumer is the specific client interface to take over the create, update,
// list and delete for APISIX Consumer resource.
type Consumer interface {
Get(context.Context, string) (*v1.Consumer, error)
Expand All @@ -116,6 +118,11 @@ type Consumer interface {
Update(context.Context, *v1.Consumer) (*v1.Consumer, error)
}

// Plugin is the specific client interface to fetch APISIX Plugin resource.
type Plugin interface {
Get(context.Context, string) (*v1.Schema, error)
}

type apisix struct {
mu sync.RWMutex
nonExistentCluster Cluster
Expand Down
6 changes: 6 additions & 0 deletions pkg/apisix/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Cache interface {
InsertGlobalRule(*v1.GlobalRule) error
// InsertConsumer adds or updates consumer to cache.
InsertConsumer(*v1.Consumer) error
// InsertSchema adds or updates schema to cache.
InsertSchema(*v1.Schema) error

// GetRoute finds the route from cache according to the primary index (id).
GetRoute(string) (*v1.Route, error)
Expand All @@ -48,6 +50,8 @@ type Cache interface {
GetGlobalRule(string) (*v1.GlobalRule, error)
// GetConsumer finds the consumer from cache according to the primary index (id).
GetConsumer(string) (*v1.Consumer, error)
// GetSchema finds the scheme from cache according to the primary index (id).
GetSchema(string) (*v1.Schema, error)

// ListRoutes lists all routes in cache.
ListRoutes() ([]*v1.Route, error)
Expand All @@ -74,4 +78,6 @@ type Cache interface {
DeleteGlobalRule(*v1.GlobalRule) error
// DeleteConsumer deletes the specified consumer in cache.
DeleteConsumer(*v1.Consumer) error
// DeleteSchema deletes the specified schema in cache.
DeleteSchema(*v1.Schema) error
}
16 changes: 16 additions & 0 deletions pkg/apisix/cache/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (c *dbCache) InsertConsumer(consumer *v1.Consumer) error {
return c.insert("consumer", consumer.DeepCopy())
}

func (c *dbCache) InsertSchema(schema *v1.Schema) error {
return c.insert("schema", schema.DeepCopy())
}

func (c *dbCache) insert(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
Expand Down Expand Up @@ -128,6 +132,14 @@ func (c *dbCache) GetConsumer(username string) (*v1.Consumer, error) {
return obj.(*v1.Consumer).DeepCopy(), nil
}

func (c *dbCache) GetSchema(name string) (*v1.Schema, error) {
obj, err := c.get("schema", name)
if err != nil {
return nil, err
}
return obj.(*v1.Schema).DeepCopy(), nil
}

func (c *dbCache) get(table, id string) (interface{}, error) {
txn := c.db.Txn(false)
defer txn.Abort()
Expand Down Expand Up @@ -257,6 +269,10 @@ func (c *dbCache) DeleteConsumer(consumer *v1.Consumer) error {
return c.delete("consumer", consumer)
}

func (c *dbCache) DeleteSchema(schema *v1.Schema) error {
return c.delete("schema", schema)
}

func (c *dbCache) delete(table string, obj interface{}) error {
txn := c.db.Txn(true)
defer txn.Abort()
Expand Down
10 changes: 10 additions & 0 deletions pkg/apisix/cache/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ var (
},
},
},
"schema": {
Name: "schema",
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Unique: true,
Indexer: &memdb.StringFieldIndex{Field: "Name"},
},
},
},
},
}
)
7 changes: 7 additions & 0 deletions pkg/apisix/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ type cluster struct {
streamRoute StreamRoute
globalRules GlobalRule
consumer Consumer
plugin Plugin
}

func newCluster(o *ClusterOptions) (Cluster, error) {
Expand Down Expand Up @@ -124,6 +125,7 @@ func newCluster(o *ClusterOptions) (Cluster, error) {
c.streamRoute = newStreamRouteClient(c)
c.globalRules = newGlobalRuleClient(c)
c.consumer = newConsumerClient(c)
c.plugin = newPluginClient(c)

c.cache, err = cache.NewMemDBCache()
if err != nil {
Expand Down Expand Up @@ -331,6 +333,11 @@ func (c *cluster) Consumer() Consumer {
return c.consumer
}

// Plugin implements Cluster.Plugin method.
func (c *cluster) Plugin() Plugin {
return c.plugin
}

// HealthCheck implements Cluster.HealthCheck method.
func (c *cluster) HealthCheck(ctx context.Context) (err error) {
if c.cacheSyncErr != nil {
Expand Down
15 changes: 15 additions & 0 deletions pkg/apisix/nonexistentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func newNonExistentCluster() *nonExistentCluster {
streamRoute: &dummyStreamRoute{},
globalRule: &dummyGlobalRule{},
consumer: &dummyConsumer{},
plugin: &dummyPlugin{},
},
}
}
Expand All @@ -46,6 +47,7 @@ type embedDummyResourceImplementer struct {
streamRoute StreamRoute
globalRule GlobalRule
consumer Consumer
plugin Plugin
}

type dummyRoute struct{}
Expand Down Expand Up @@ -180,6 +182,12 @@ func (f *dummyConsumer) Update(_ context.Context, _ *v1.Consumer) (*v1.Consumer,
return nil, ErrClusterNotExist
}

type dummyPlugin struct{}

func (f *dummyPlugin) Get(_ context.Context, _ string) (*v1.Schema, error) {
return nil, ErrClusterNotExist
}

func (nc *nonExistentCluster) Route() Route {
return nc.route
}
Expand All @@ -204,6 +212,10 @@ func (nc *nonExistentCluster) Consumer() Consumer {
return nc.consumer
}

func (nc *nonExistentCluster) Plugin() Plugin {
return nc.plugin
}

func (nc *nonExistentCluster) HasSynced(_ context.Context) error {
return nil
}
Expand All @@ -226,12 +238,14 @@ func (c *dummyCache) InsertUpstream(_ *v1.Upstream) error { return
func (c *dummyCache) InsertStreamRoute(_ *v1.StreamRoute) error { return nil }
func (c *dummyCache) InsertGlobalRule(_ *v1.GlobalRule) error { return nil }
func (c *dummyCache) InsertConsumer(_ *v1.Consumer) error { return nil }
func (c *dummyCache) InsertSchema(_ *v1.Schema) error { return nil }
func (c *dummyCache) GetRoute(_ string) (*v1.Route, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetSSL(_ string) (*v1.Ssl, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetUpstream(_ string) (*v1.Upstream, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetStreamRoute(_ string) (*v1.StreamRoute, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetGlobalRule(_ string) (*v1.GlobalRule, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetConsumer(_ string) (*v1.Consumer, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) GetSchema(_ string) (*v1.Schema, error) { return nil, cache.ErrNotFound }
func (c *dummyCache) ListRoutes() ([]*v1.Route, error) { return nil, nil }
func (c *dummyCache) ListSSL() ([]*v1.Ssl, error) { return nil, nil }
func (c *dummyCache) ListUpstreams() ([]*v1.Upstream, error) { return nil, nil }
Expand All @@ -244,3 +258,4 @@ func (c *dummyCache) DeleteUpstream(_ *v1.Upstream) error { return
func (c *dummyCache) DeleteStreamRoute(_ *v1.StreamRoute) error { return nil }
func (c *dummyCache) DeleteGlobalRule(_ *v1.GlobalRule) error { return nil }
func (c *dummyCache) DeleteConsumer(_ *v1.Consumer) error { return nil }
func (c *dummyCache) DeleteSchema(_ *v1.Schema) error { return nil }
92 changes: 92 additions & 0 deletions pkg/apisix/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apisix

import (
"context"
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/id"
"github.com/apache/apisix-ingress-controller/pkg/log"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
"go.uber.org/zap"
)

type pluginClient struct {
url string
cluster *cluster
}

func newPluginClient(c *cluster) Plugin {
return &pluginClient{
url: c.baseURL + "/plugins",
cluster: c,
}
}

// Get returns the plugin's schema.
func (p *pluginClient) Get(ctx context.Context, name string) (*v1.Schema, error) {
log.Debugw("try to look up plugin's schema",
zap.String("name", name),
zap.String("url", p.url),
zap.String("cluster", "default"),
)
sid := id.GenID(name)
schema, err := p.cluster.cache.GetSchema(sid)
if err == nil {
return schema, nil
}
if err == cache.ErrNotFound {
log.Debugw("failed to find plugin's schema in cache, will try to lookup from APISIX",
zap.String("name", name),
zap.Error(err),
)
} else {
log.Errorw("failed to find plugin's schema in cache, will try to lookup from APISIX",
zap.String("name", name),
zap.Error(err),
)
}

url := p.url + "/" + name
resp, err := p.cluster.getResource(ctx, url)
if err != nil {
log.Errorw("failed to get plugin schema from APISIX",
zap.String("name", name),
zap.String("url", url),
zap.String("cluster", "default"),
zap.Error(err),
)
return nil, err
}

schema, err = resp.Item.schema()
if err != nil {
log.Errorw("failed to convert schema item",
zap.String("url", p.url),
zap.String("plugin_name", resp.Item.Key),
zap.String("plugin_schema", string(resp.Item.Value)),
zap.Error(err),
)
return nil, err
}

schema.Name = name
if err := p.cluster.cache.InsertSchema(schema); err != nil {
log.Errorf("failed to reflect schema create to cache: %s", err)
return nil, err
}
return schema, nil
}
Loading