From 3f45041e74c0e9007c50e16d601ad8e10ba69b60 Mon Sep 17 00:00:00 2001 From: Alex Zhang Date: Wed, 6 Jan 2021 18:52:55 +0800 Subject: [PATCH] chore: return object in client.Update method --- pkg/apisix/client.go | 8 ++--- pkg/apisix/resource.go | 2 ++ pkg/apisix/route.go | 14 +++++++-- pkg/apisix/route_test.go | 4 ++- pkg/apisix/service.go | 14 +++++++-- pkg/apisix/service_test.go | 4 ++- pkg/apisix/ssl.go | 14 +++++++-- pkg/apisix/ssl_test.go | 4 ++- pkg/apisix/stub.go | 15 ++++++---- pkg/apisix/upstream.go | 14 +++++++-- pkg/apisix/upstream_test.go | 4 ++- pkg/seven/apisix/route.go | 1 + pkg/seven/apisix/upstream.go | 3 +- pkg/seven/state/builder.go | 4 +-- pkg/seven/state/service_worker.go | 49 +++++++++++++++---------------- pkg/seven/state/solver.go | 3 +- 16 files changed, 103 insertions(+), 54 deletions(-) diff --git a/pkg/apisix/client.go b/pkg/apisix/client.go index a3723793238..89d68eecc62 100644 --- a/pkg/apisix/client.go +++ b/pkg/apisix/client.go @@ -49,7 +49,7 @@ type Route interface { List(context.Context, string) ([]*v1.Route, error) Create(context.Context, *v1.Route) (*v1.Route, error) Delete(context.Context, *v1.Route) error - Update(context.Context, *v1.Route) error + Update(context.Context, *v1.Route) (*v1.Route, error) } // SSL is the specific client interface to take over the create, update, @@ -58,7 +58,7 @@ type SSL interface { List(context.Context, string) ([]*v1.Ssl, error) Create(context.Context, *v1.Ssl) (*v1.Ssl, error) Delete(context.Context, *v1.Ssl) error - Update(context.Context, *v1.Ssl) error + Update(context.Context, *v1.Ssl) (*v1.Ssl, error) } // Upstream is the specific client interface to take over the create, update, @@ -67,7 +67,7 @@ type Upstream interface { List(context.Context, string) ([]*v1.Upstream, error) Create(context.Context, *v1.Upstream) (*v1.Upstream, error) Delete(context.Context, *v1.Upstream) error - Update(context.Context, *v1.Upstream) error + Update(context.Context, *v1.Upstream) (*v1.Upstream, error) } // Service is the specific client interface to take over the create, update, @@ -76,7 +76,7 @@ type Service interface { List(context.Context, string) ([]*v1.Service, error) Create(context.Context, *v1.Service) (*v1.Service, error) Delete(context.Context, *v1.Service) error - Update(context.Context, *v1.Service) error + Update(context.Context, *v1.Service) (*v1.Service, error) } type client struct { diff --git a/pkg/apisix/resource.go b/pkg/apisix/resource.go index 82e5f50c16d..73868733d7e 100644 --- a/pkg/apisix/resource.go +++ b/pkg/apisix/resource.go @@ -36,6 +36,8 @@ type createResponse struct { Item item `json:"node"` } +type updateResponse = createResponse + type node struct { Key string `json:"key"` Items items `json:"nodes"` diff --git a/pkg/apisix/route.go b/pkg/apisix/route.go index 209d5143193..8ca148b2d17 100644 --- a/pkg/apisix/route.go +++ b/pkg/apisix/route.go @@ -107,7 +107,7 @@ func (r *routeClient) Delete(ctx context.Context, obj *v1.Route) error { return r.stub.deleteResource(ctx, url) } -func (r *routeClient) Update(ctx context.Context, obj *v1.Route) error { +func (r *routeClient) Update(ctx context.Context, obj *v1.Route) (*v1.Route, error) { log.Infof("update route, id:%s", *obj.ID) body, err := json.Marshal(routeReqBody{ Desc: obj.Name, @@ -117,8 +117,16 @@ func (r *routeClient) Update(ctx context.Context, obj *v1.Route) error { Plugins: obj.Plugins, }) if err != nil { - return err + return nil, err } url := r.url + "/" + *obj.ID - return r.stub.updateResource(ctx, url, bytes.NewReader(body)) + resp, err := r.stub.updateResource(ctx, url, bytes.NewReader(body)) + if err != nil { + return nil, err + } + var group string + if obj.Group != nil { + group = *obj.Group + } + return resp.Item.route(group) } diff --git a/pkg/apisix/route_test.go b/pkg/apisix/route_test.go index 5a1931dfee2..2c2aaca64ac 100644 --- a/pkg/apisix/route_test.go +++ b/pkg/apisix/route_test.go @@ -131,6 +131,8 @@ func (srv *fakeAPISIXRouteSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) srv.route[id] = data w.WriteHeader(http.StatusOK) + output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, id, string(data)) + _, _ = w.Write([]byte(output)) return } } @@ -216,7 +218,7 @@ func TestRouteClient(t *testing.T) { // Patch then List id = "112" objId := "2" - err = cli.Update(context.Background(), &v1.Route{ + _, err = cli.Update(context.Background(), &v1.Route{ ID: &objId, Host: &host, Path: &uri, diff --git a/pkg/apisix/service.go b/pkg/apisix/service.go index 3f210bdc25a..4b8b47785c9 100644 --- a/pkg/apisix/service.go +++ b/pkg/apisix/service.go @@ -99,7 +99,7 @@ func (s *serviceClient) Delete(ctx context.Context, obj *v1.Service) error { return s.stub.deleteResource(ctx, url) } -func (s *serviceClient) Update(ctx context.Context, obj *v1.Service) error { +func (s *serviceClient) Update(ctx context.Context, obj *v1.Service) (*v1.Service, error) { log.Infof("update service, id:%s", *obj.ID) body, err := json.Marshal(serviceItem{ @@ -108,9 +108,17 @@ func (s *serviceClient) Update(ctx context.Context, obj *v1.Service) error { Desc: obj.Name, }) if err != nil { - return err + return nil, err } url := s.url + "/" + *obj.ID - return s.stub.updateResource(ctx, url, bytes.NewReader(body)) + resp, err := s.stub.updateResource(ctx, url, bytes.NewReader(body)) + if err != nil { + return nil, err + } + var group string + if obj.Group != nil { + group = *obj.Group + } + return resp.Item.service(group) } diff --git a/pkg/apisix/service_test.go b/pkg/apisix/service_test.go index 659bcdb2b59..a2c9054db02 100644 --- a/pkg/apisix/service_test.go +++ b/pkg/apisix/service_test.go @@ -110,6 +110,8 @@ func (srv *fakeAPISIXServiceSrv) ServeHTTP(w http.ResponseWriter, r *http.Reques srv.service[id] = data w.WriteHeader(http.StatusOK) + output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, id, string(data)) + _, _ = w.Write([]byte(output)) return } } @@ -192,7 +194,7 @@ func TestServiceClient(t *testing.T) { // Patch then List upsId = "14" objId := "2" - err = cli.Update(context.Background(), &v1.Service{ + _, err = cli.Update(context.Background(), &v1.Service{ ID: &objId, FullName: &fullName, Group: &group, diff --git a/pkg/apisix/ssl.go b/pkg/apisix/ssl.go index 85ff87c3bab..9da18fac46a 100644 --- a/pkg/apisix/ssl.go +++ b/pkg/apisix/ssl.go @@ -99,7 +99,7 @@ func (t *sslClient) Delete(ctx context.Context, obj *v1.Ssl) error { return t.stub.deleteResource(ctx, url) } -func (t *sslClient) Update(ctx context.Context, obj *v1.Ssl) error { +func (t *sslClient) Update(ctx context.Context, obj *v1.Ssl) (*v1.Ssl, error) { log.Infof("update ssl, id:%s", *obj.ID) url := t.url + "/" + *obj.ID data, err := json.Marshal(v1.Ssl{ @@ -110,7 +110,15 @@ func (t *sslClient) Update(ctx context.Context, obj *v1.Ssl) error { Status: obj.Status, }) if err != nil { - return err + return nil, err } - return t.stub.updateResource(ctx, url, bytes.NewReader(data)) + resp, err := t.stub.updateResource(ctx, url, bytes.NewReader(data)) + if err != nil { + return nil, err + } + var group string + if obj.Group != nil { + group = *obj.Group + } + return resp.Item.ssl(group) } diff --git a/pkg/apisix/ssl_test.go b/pkg/apisix/ssl_test.go index 2e2c002fc77..46138f9b8d6 100644 --- a/pkg/apisix/ssl_test.go +++ b/pkg/apisix/ssl_test.go @@ -110,6 +110,8 @@ func (srv *fakeAPISIXSSLSrv) ServeHTTP(w http.ResponseWriter, r *http.Request) { srv.ssl[id] = data w.WriteHeader(http.StatusOK) + output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, id, string(data)) + _, _ = w.Write([]byte(output)) return } } @@ -185,7 +187,7 @@ func TestSSLClient(t *testing.T) { // Patch then List objId := "2" sni = "foo.com" - err = cli.Update(context.Background(), &v1.Ssl{ + _, err = cli.Update(context.Background(), &v1.Ssl{ ID: &objId, Snis: []*string{&sni}, }) diff --git a/pkg/apisix/stub.go b/pkg/apisix/stub.go index 26697eb9c1c..9690a2fe365 100644 --- a/pkg/apisix/stub.go +++ b/pkg/apisix/stub.go @@ -91,21 +91,26 @@ func (s *stub) createResource(ctx context.Context, url string, body io.Reader) ( return &cr, nil } -func (s *stub) updateResource(ctx context.Context, url string, body io.Reader) error { +func (s *stub) updateResource(ctx context.Context, url string, body io.Reader) (*updateResponse, error) { req, err := http.NewRequestWithContext(ctx, http.MethodPatch, url, body) if err != nil { - return err + return nil, err } resp, err := s.do(req) if err != nil { - return err + return nil, err } defer drainBody(resp.Body, url) if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) } - return nil + var ur updateResponse + dec := json.NewDecoder(resp.Body) + if err := dec.Decode(&ur); err != nil { + return nil, err + } + return &ur, nil } func (s *stub) deleteResource(ctx context.Context, url string) error { diff --git a/pkg/apisix/upstream.go b/pkg/apisix/upstream.go index d064cd64360..39efd1cc949 100644 --- a/pkg/apisix/upstream.go +++ b/pkg/apisix/upstream.go @@ -120,7 +120,7 @@ func (u *upstreamClient) Delete(ctx context.Context, obj *v1.Upstream) error { return u.stub.deleteResource(ctx, url) } -func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) error { +func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) (*v1.Upstream, error) { log.Infof("update upstream, id:%s", *obj.ID) // TODO Just pass the node array. @@ -138,9 +138,17 @@ func (u *upstreamClient) Update(ctx context.Context, obj *v1.Upstream) error { Desc: obj.Name, }) if err != nil { - return err + return nil, err } url := u.url + "/" + *obj.ID - return u.stub.updateResource(ctx, url, bytes.NewReader(body)) + resp, err := u.stub.updateResource(ctx, url, bytes.NewReader(body)) + if err != nil { + return nil, err + } + var group string + if obj.Group != nil { + group = *obj.Group + } + return resp.Item.upstream(group) } diff --git a/pkg/apisix/upstream_test.go b/pkg/apisix/upstream_test.go index fc56d861fce..41337417a1a 100644 --- a/pkg/apisix/upstream_test.go +++ b/pkg/apisix/upstream_test.go @@ -111,6 +111,8 @@ func (srv *fakeAPISIXUpstreamSrv) ServeHTTP(w http.ResponseWriter, r *http.Reque srv.upstream[id] = data w.WriteHeader(http.StatusOK) + output := fmt.Sprintf(`{"action": "compareAndSwap", "node": {"key": "%s", "value": %s}}`, id, string(data)) + _, _ = w.Write([]byte(output)) return } } @@ -208,7 +210,7 @@ func TestUpstreamClient(t *testing.T) { // Patch then List lbType = "chash" objId := "2" - err = cli.Update(context.Background(), &v1.Upstream{ + _, err = cli.Update(context.Background(), &v1.Upstream{ ID: &objId, FullName: &fullName, Group: &group, diff --git a/pkg/seven/apisix/route.go b/pkg/seven/apisix/route.go index 0144fa8d236..f819031259f 100644 --- a/pkg/seven/apisix/route.go +++ b/pkg/seven/apisix/route.go @@ -20,6 +20,7 @@ import ( "github.com/api7/ingress-controller/pkg/seven/conf" sevendb "github.com/api7/ingress-controller/pkg/seven/db" + "github.com/api7/ingress-controller/pkg/seven/utils" v1 "github.com/api7/ingress-controller/pkg/types/apisix/v1" ) diff --git a/pkg/seven/apisix/upstream.go b/pkg/seven/apisix/upstream.go index 04478786045..130923a9b55 100644 --- a/pkg/seven/apisix/upstream.go +++ b/pkg/seven/apisix/upstream.go @@ -61,5 +61,6 @@ func PatchNodes(upstream *v1.Upstream, nodes []*v1.Node) error { // Restore it upstream.Nodes = oldNodes }() - return conf.Client.Upstream().Update(context.TODO(), upstream) + _, err := conf.Client.Upstream().Update(context.TODO(), upstream) + return err } diff --git a/pkg/seven/state/builder.go b/pkg/seven/state/builder.go index 77a93633705..b9c83ac7213 100644 --- a/pkg/seven/state/builder.go +++ b/pkg/seven/state/builder.go @@ -147,7 +147,7 @@ func (r *routeWorker) sync() error { return err } // 2. sync apisix - if err := conf.Client.Route().Update(context.TODO(), r.Route); err != nil { + if _, err := conf.Client.Route().Update(context.TODO(), r.Route); err != nil { log.Errorf("failed to update route %s: %s, ", *r.Name, err) return err } @@ -233,7 +233,7 @@ func SolverSingleUpstream(u *v1.Upstream, swg ServiceWorkerGroup, wg *sync.WaitG } // 2.sync apisix - if err = conf.Client.Upstream().Update(context.TODO(), u); err != nil { + if _, err = conf.Client.Upstream().Update(context.TODO(), u); err != nil { glog.Errorf("solver upstream failed, update upstream to etcd failed, err: %+v", err) return } diff --git a/pkg/seven/state/service_worker.go b/pkg/seven/state/service_worker.go index 64a8356c3d3..3209c0350f9 100644 --- a/pkg/seven/state/service_worker.go +++ b/pkg/seven/state/service_worker.go @@ -109,7 +109,6 @@ func SolverSingleService(svc *v1.Service, rwg RouteWorkerGroup, wg *sync.WaitGro } else { *svc.ID = *s.ID } - // 2. sync memDB db := &db.ServiceDB{Services: []*v1.Service{svc}} if err := db.Insert(); err != nil { @@ -117,31 +116,31 @@ func SolverSingleService(svc *v1.Service, rwg RouteWorkerGroup, wg *sync.WaitGro return } log.Infof("create service %s, %s", *svc.Name, *svc.UpstreamId) - } - } else { - op = Update - needToUpdate := true - if currentService.FromKind != nil && *(currentService.FromKind) == ApisixService { // update from ApisixUpstream - if svc.FromKind == nil || (svc.FromKind != nil && *(svc.FromKind) != ApisixService) { - // currentService > svc - // set lb && health check - needToUpdate = false - } - } - if needToUpdate { - // 1. sync memDB - db := db.ServiceDB{Services: []*v1.Service{svc}} - if err := db.UpdateService(); err != nil { - log.Errorf("failed to update service to mem db: %s", err) - errNotify = err - return + } else { + op = Update + needToUpdate := true + if currentService.FromKind != nil && *(currentService.FromKind) == ApisixService { // update from ApisixUpstream + if svc.FromKind == nil || (svc.FromKind != nil && *(svc.FromKind) != ApisixService) { + // currentService > svc + // set lb && health check + needToUpdate = false + } } - // 2. sync apisix - if err := conf.Client.Service().Update(context.TODO(), svc); err != nil { - errNotify = err - log.Errorf("failed to update service: %s, id:%s", err, *svc.ID) - } else { - log.Infof("updated service, id:%s, upstream_id:%s", *svc.ID, *svc.UpstreamId) + if needToUpdate { + // 1. sync memDB + db := db.ServiceDB{Services: []*v1.Service{svc}} + if err := db.UpdateService(); err != nil { + log.Errorf("failed to update service to mem db: %s", err) + errNotify = err + return + } + // 2. sync apisix + if _, err := conf.Client.Service().Update(context.TODO(), svc); err != nil { + errNotify = err + log.Errorf("failed to update service: %s, id:%s", err, *svc.ID) + } else { + log.Infof("updated service, id:%s, upstream_id:%s", *svc.ID, *svc.UpstreamId) + } } } } diff --git a/pkg/seven/state/solver.go b/pkg/seven/state/solver.go index 77fe5ee8c23..cb5d023a5e8 100644 --- a/pkg/seven/state/solver.go +++ b/pkg/seven/state/solver.go @@ -156,7 +156,8 @@ func SyncSsl(ssl *v1.Ssl, method string) error { _, err := conf.Client.SSL().Create(context.TODO(), ssl) return err case Update: - return conf.Client.SSL().Update(context.TODO(), ssl) + _, err := conf.Client.SSL().Update(context.TODO(), ssl) + return err case Delete: return conf.Client.SSL().Delete(context.TODO(), ssl) }