Skip to content

Commit 5c6276d

Browse files
author
Tomek
committed
fix: initial e2e test + associated fixes
1 parent 9bba282 commit 5c6276d

File tree

13 files changed

+426
-73
lines changed

13 files changed

+426
-73
lines changed

.github/workflows/go.yml

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,10 @@ jobs:
2222
go-version: '1.20'
2323

2424
- name: Build
25-
run: go build -v ./...
25+
run: make build
2626

2727
- name: Test
28-
run: go test -v ./...
29-
30-
# - id: gen_image_tag
31-
# name: Replace slash to dash
32-
# uses: mad9000/actions-find-and-replace-string@3
33-
# with:
34-
# source: ${{ github.ref_name }}
35-
# find: '/'
36-
# replace: '-'
37-
#
38-
# - name: Dockerize and push
39-
# env:
40-
# DOCKER_REGISTRY: TODO
41-
# IMAGE_TAG: ${{ steps.gen_image_tag.outputs.value }}
42-
# run: |
43-
# docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG .
44-
# docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG
28+
run: make test
4529

4630
- name: Log in to Docker Hub
4731
uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ Dockerfile.cross
2525
*.swo
2626
*~
2727

28-
# Local
28+
# Local
2929
.vscode/
3030
.env
3131
examples/
3232
packages/
3333
vals.yaml
34+
/config/crd/bases

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ schemaRegistry:
5252

5353
## How to use
5454

55-
Deploy __ConfigMap__
56-
5755
```yaml
5856
apiVersion: kafka-schema-operator.incubly/v2beta1
5957
kind: KafkaSchema

controllers/kafkaschema_controller.go

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,22 @@ type KafkaSchemaReconciler struct {
4040
}
4141

4242
func (r *KafkaSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
43-
log := log.FromContext(ctx)
43+
logger := log.FromContext(ctx)
4444

4545
schema := &kafkaschemaoperatorv2beta1.KafkaSchema{}
4646
err := r.Get(ctx, req.NamespacedName, schema)
4747
if err != nil {
4848
if errors.IsNotFound(err) {
49-
log.Info("Schema resource not found. Ignoring since object must be deleted")
49+
logger.Info("Schema resource not found. Ignoring since object must be deleted")
5050
return ctrl.Result{}, nil
5151
}
52-
log.Error(err, "Failed to get Schema resource")
52+
logger.Error(err, "Failed to get Schema resource")
5353
return ctrl.Result{}, err
5454
}
5555

56-
srClient, err := schemareg.NewClient(&schema.SchemaRegistry, log)
56+
srClient, err := schemareg.NewClient(&schema.SchemaRegistry, logger)
5757
if err != nil {
58-
log.Error(err, "Failed to instantiate Schema Registry Client")
58+
logger.Error(err, "Failed to instantiate Schema Registry Client")
5959
return ctrl.Result{}, err
6060
}
6161

@@ -75,23 +75,23 @@ func (r *KafkaSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
7575
controllerutil.RemoveFinalizer(schema, schemaFinilizers)
7676
err = r.Update(ctx, schema)
7777
if err != nil {
78-
log.Error(err, "Failed to delete KafkaSchema from kubernetes: "+schema.Name)
78+
logger.Error(err, "Failed to delete KafkaSchema from kubernetes: "+schema.Name)
7979
return ctrl.Result{}, err
8080
}
81-
log.Info("KafkaSchema CR was deleted: " + schema.Name)
81+
logger.Info("KafkaSchema CR was deleted: " + schema.Name)
8282

8383
err := srClient.DeleteSubject(schemaKey)
8484
if err != nil {
85-
log.Error(err, "Failed to delete subject for key schema from registry: "+schemaKey)
85+
logger.Error(err, "Failed to delete subject for key schema from registry: "+schemaKey)
8686
return ctrl.Result{}, err
8787
}
8888

8989
err = srClient.DeleteSubject(schemaValue)
9090
if err != nil {
91-
log.Error(err, "Failed to delete subject for value schema from registry: "+schemaValue)
91+
logger.Error(err, "Failed to delete subject for value schema from registry: "+schemaValue)
9292
return ctrl.Result{}, err
9393
}
94-
log.Info("Schema was removed from registry")
94+
logger.Info("Schema was removed from registry")
9595
return ctrl.Result{}, nil
9696
}
9797
return ctrl.Result{}, nil
@@ -101,60 +101,59 @@ func (r *KafkaSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
101101
controllerutil.AddFinalizer(schema, schemaFinilizers)
102102
err = r.Update(ctx, schema)
103103
if err != nil {
104-
log.Info("Failed to update finalizers for CR: " + schema.Name)
104+
logger.Info("Failed to update finalizers for CR: " + schema.Name)
105105
return ctrl.Result{}, err
106106
}
107-
log.Info("Finalizers are set for CR: " + schema.Name)
107+
logger.Info("Finalizers are set for CR: " + schema.Name)
108108
}
109109

110110
err = srClient.RegisterSchema(
111111
schemaKey,
112112
schemareg.RegisterSchemaReq{
113-
Schema: `{\"type\": \"` + schema.Spec.SchemaSerializer + `\"}`,
113+
Schema: `{"type": "` + schema.Spec.SchemaSerializer + `"}`,
114114
})
115115
if err != nil {
116-
log.Error(err, "Failed to update schema registry")
116+
logger.Error(err, "Failed to update schema registry")
117117
return reconcileResult, err
118118
}
119-
log.Info("Schema key was published: " + schemaKey)
120-
121-
cfgData := schema.Spec.Data.Schema
122-
cfgData = strings.ReplaceAll(cfgData, "\n", "")
123-
cfgData = strings.ReplaceAll(cfgData, "\t", "")
124-
cfgData = strings.ReplaceAll(cfgData, " ", "")
125-
cfgData = strings.ReplaceAll(cfgData, `"`, `\"`)
126-
cfgData = strings.Replace(cfgData, `\"{`, `"{`, 1)
127-
cfgData = strings.Replace(cfgData, `}\"`, `}"`, -1)
119+
logger.Info("Schema key was published: " + schemaKey)
128120

129121
err = srClient.RegisterSchema(
130122
schemaValue,
131123
schemareg.RegisterSchemaReq{
132-
Schema: cfgData,
124+
Schema: schema.Spec.Data.Schema,
133125
SchemaType: strings.ToUpper(schema.Spec.Data.Format),
134126
})
135127
if err != nil {
136-
log.Error(err, "Failed to update schema registry")
128+
logger.Error(err, "Failed to update schema registry")
137129
return reconcileResult, err
138130
}
139131

140-
var schemaCompatibilityPayload strings.Builder
141-
schemaCompatibilityPayload.WriteString(`{"compatibility": "`)
142-
schemaCompatibilityPayload.WriteString(schema.Spec.Data.Compatibility)
143-
schemaCompatibilityPayload.WriteString(`"}`)
132+
err = srClient.SetCompatibilityMode(
133+
schemaValue,
134+
schemareg.SetCompatibilityModeReq{
135+
Compatibility: schema.Spec.Data.Compatibility,
136+
},
137+
)
144138

145-
err = srClient.SetCompatibilityMode(schemaValue, schemaCompatibilityPayload.String())
146139
if err != nil {
147-
log.Error(err, "Failed to update schema compatibility for value")
140+
logger.Error(err, "Failed to update schema compatibility for value")
148141
return reconcileResult, err
149142
}
150143

151-
err = srClient.SetCompatibilityMode(schemaKey, schemaCompatibilityPayload.String())
144+
err = srClient.SetCompatibilityMode(
145+
schemaKey,
146+
schemareg.SetCompatibilityModeReq{
147+
Compatibility: schema.Spec.Data.Compatibility,
148+
},
149+
)
150+
152151
if err != nil {
153-
log.Error(err, "Failed to update schema compatibility for key")
152+
logger.Error(err, "Failed to update schema compatibility for key")
154153
return reconcileResult, err
155154
}
156155

157-
log.Info("Schema value was published: " + schemaValue)
156+
logger.Info("Schema value was published: " + schemaValue)
158157
return reconcileResult, nil
159158
}
160159

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package controllers
2+
3+
import (
4+
"context"
5+
. "github.com/onsi/ginkgo/v2"
6+
. "github.com/onsi/gomega"
7+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
"k8s.io/apimachinery/pkg/types"
9+
"kafka-schema-operator/api/v2beta1"
10+
schemareg_mock "kafka-schema-operator/schemareg-mock"
11+
"strconv"
12+
"time"
13+
)
14+
15+
const ResourceNs = "default"
16+
17+
var _ = Describe("KafkaschemaController", func() {
18+
19+
BeforeEach(func() {
20+
srMock.Clear()
21+
})
22+
Context("When creating Schema", func() {
23+
24+
It("Should create schema", func() {
25+
topicName := "testenv.testservice.testevent." + strconv.FormatInt(time.Now().UnixMilli(), 10)
26+
schemaRes := v2beta1.KafkaSchema{
27+
TypeMeta: metav1.TypeMeta{
28+
Kind: "KafkaSchema",
29+
APIVersion: "kafka-schema-operator.incubly/v2beta1",
30+
},
31+
ObjectMeta: metav1.ObjectMeta{
32+
Name: topicName,
33+
Namespace: ResourceNs,
34+
},
35+
Spec: v2beta1.KafkaSchemaSpec{
36+
Name: topicName,
37+
SchemaSerializer: "string",
38+
AutoReconciliation: false,
39+
TerminationProtection: false,
40+
Data: v2beta1.KafkaSchemaData{
41+
Schema: `{
42+
"namespace": "testing",
43+
"type": "record",
44+
"name": "testing",
45+
"fields": [
46+
{"name": "id", "type": "string"},
47+
{"name": "email", "type": "string"}
48+
]
49+
}`,
50+
Format: "AVRO",
51+
Compatibility: "BACKWARD",
52+
},
53+
},
54+
}
55+
56+
By("When: creating KafkaSchema resource")
57+
ctx := context.Background()
58+
Expect(k8sClient.Create(ctx, &schemaRes)).Should(Succeed())
59+
60+
By("Then: resource is created in K8S")
61+
schemaLookupKey := types.NamespacedName{Name: topicName, Namespace: ResourceNs}
62+
createdResource := &v2beta1.KafkaSchema{}
63+
Eventually(func() error {
64+
return k8sClient.Get(ctx, schemaLookupKey, createdResource)
65+
}).Should(Succeed())
66+
67+
Expect(createdResource.Spec.Name).Should(Equal(topicName))
68+
// TODO: verify resource
69+
70+
By("And: subjects and schemas for topic key & value are registered in schema registry")
71+
72+
Eventually(func() map[string]*schemareg_mock.Subject {
73+
return srMock.Subjects
74+
},
75+
time.Second*10,
76+
time.Millisecond*100,
77+
).Should(
78+
And(
79+
HaveKey(topicName+"-key"),
80+
HaveKey(topicName+"-value"),
81+
),
82+
)
83+
Expect(srMock.Subjects[topicName+"-value"].SchemaRefs).Should(HaveLen(1))
84+
// TODO: verify schema
85+
})
86+
87+
})
88+
})

controllers/suite_test.go

Lines changed: 76 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,18 @@ limitations under the License.
1717
package controllers
1818

1919
import (
20-
"path/filepath"
21-
"testing"
22-
20+
"errors"
2321
. "github.com/onsi/ginkgo/v2"
2422
. "github.com/onsi/gomega"
23+
"github.com/onsi/gomega/gexec"
24+
"github.com/onsi/gomega/ghttp"
25+
schemaregmock "kafka-schema-operator/schemareg-mock"
26+
"net/url"
27+
"os"
28+
"path/filepath"
29+
ctrl "sigs.k8s.io/controller-runtime"
30+
"testing"
31+
"time"
2532

2633
"k8s.io/client-go/kubernetes/scheme"
2734
"k8s.io/client-go/rest"
@@ -37,9 +44,13 @@ import (
3744
// These tests use Ginkgo (BDD-style Go testing framework). Refer to
3845
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
3946

40-
var cfg *rest.Config
41-
var k8sClient client.Client
42-
var testEnv *envtest.Environment
47+
var (
48+
cfg *rest.Config
49+
k8sClient client.Client
50+
testEnv *envtest.Environment
51+
srMockServer *ghttp.Server
52+
srMock *schemaregmock.SchemaRegMock
53+
)
4354

4455
func TestAPIs(t *testing.T) {
4556
RegisterFailHandler(Fail)
@@ -51,6 +62,8 @@ var _ = BeforeSuite(func() {
5162
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
5263

5364
By("bootstrapping test environment")
65+
Expect(setupKubeBuilderAssets()).To(Succeed())
66+
5467
testEnv = &envtest.Environment{
5568
CRDDirectoryPaths: []string{filepath.Join("..", "config", "crd", "bases")},
5669
ErrorIfCRDPathMissing: true,
@@ -71,10 +84,63 @@ var _ = BeforeSuite(func() {
7184
Expect(err).NotTo(HaveOccurred())
7285
Expect(k8sClient).NotTo(BeNil())
7386

87+
By("Starting Schema Registry Mock")
88+
srMock = schemaregmock.NewSchemaRegMock(
89+
map[string]schemaregmock.SchemaCompatibilityPolicy{},
90+
ctrl.Log,
91+
)
92+
srMockServer = srMock.GetServer()
93+
srMockServerUrl, err := url.ParseRequestURI(srMockServer.HTTPTestServer.URL)
94+
Expect(err).ToNot(HaveOccurred())
95+
Expect(os.Setenv("SCHEMA_REGISTRY_HOST", srMockServerUrl.Hostname())).To(Succeed())
96+
Expect(os.Setenv("SCHEMA_REGISTRY_PORT", srMockServerUrl.Port())).To(Succeed())
97+
98+
By("Run Controllers")
99+
options := ctrl.Options{
100+
Scheme: scheme.Scheme,
101+
Port: 9443,
102+
Namespace: "default",
103+
}
104+
105+
k8sManager, err := ctrl.NewManager(cfg, options)
106+
Expect(err).ToNot(HaveOccurred())
107+
108+
err = (&KafkaSchemaReconciler{
109+
Client: k8sManager.GetClient(),
110+
Scheme: k8sManager.GetScheme(),
111+
}).SetupWithManager(k8sManager)
112+
Expect(err).ToNot(HaveOccurred())
113+
114+
go func() {
115+
defer GinkgoRecover()
116+
err = k8sManager.Start(ctrl.SetupSignalHandler())
117+
Expect(err).ToNot(HaveOccurred(), "failed to run manager")
118+
gexec.KillAndWait(4 * time.Second)
119+
120+
// Teardown the test environment once controller is finished.
121+
// Otherwise, from Kubernetes 1.21+, teardown timeouts waiting on
122+
// kube-apiserver to return
123+
By("tearing down the test environment")
124+
err := testEnv.Stop()
125+
Expect(err).ToNot(HaveOccurred())
126+
srMockServer.Close()
127+
}()
74128
})
75129

76-
var _ = AfterSuite(func() {
77-
By("tearing down the test environment")
78-
err := testEnv.Stop()
130+
func setupKubeBuilderAssets() error {
131+
kubeBuilderAssets, err := os.ReadDir("../bin/k8s")
79132
Expect(err).NotTo(HaveOccurred())
80-
})
133+
var kubeBuilderAssetsPath string
134+
for _, dirEntry := range kubeBuilderAssets {
135+
if dirEntry.IsDir() {
136+
kubeBuilderAssetsPath = "../bin/k8s/" + dirEntry.Name()
137+
break
138+
}
139+
}
140+
if len(kubeBuilderAssetsPath) == 0 {
141+
return errors.New("kubeBuilderAssetsPath not found")
142+
}
143+
144+
ctrl.Log.Info("Setting env KUBEBUILDER_ASSETS=" + kubeBuilderAssetsPath)
145+
return os.Setenv("KUBEBUILDER_ASSETS", kubeBuilderAssetsPath)
146+
}

0 commit comments

Comments
 (0)