@@ -4,91 +4,46 @@ import (
44 "context"
55 . "github.com/onsi/ginkgo/v2"
66 . "github.com/onsi/gomega"
7- metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8- "k8s.io/apimachinery/pkg/types"
7+ "github.com/onsi/gomega/types"
98 "kafka-schema-operator/api/v2beta1"
109 schemareg_mock "kafka-schema-operator/schemareg-mock"
1110 "strconv"
1211 "time"
1312)
1413
15- const ResourceNs = "default"
16-
1714var _ = Describe ("KafkaschemaController" , func () {
1815
1916 BeforeEach (func () {
2017 srMock .Clear ()
2118 })
19+ ctx := context .Background ()
20+
2221 Context ("When creating resource" , func () {
2322
2423 It ("Should register subject+schema and manage resource status" , func () {
2524 topicName := "testenv.testservice.testevent." + strconv .FormatInt (time .Now ().UnixMilli (), 10 )
26- schemaRes := v2beta1.KafkaSchema {
27- TypeMeta : metav1.TypeMeta {
28- Kind : "KafkaSchema" ,
29- APIVersion : "kafka-schema-operator.incubly/v2beta1" ,
30- },
31- ObjectMeta : metav1.ObjectMeta {
32- Name : topicName ,
33- Namespace : ResourceNs ,
34- },
35- Spec : v2beta1.KafkaSchemaSpec {
36- Name : topicName ,
37- SchemaSerializer : "string" ,
38- AutoReconciliation : false ,
39- TerminationProtection : false ,
40- Data : v2beta1.KafkaSchemaData {
41- Schema : `{
42- "namespace": "testing",
43- "type": "record",
44- "name": "testing",
45- "fields": [
46- {"name": "id", "type": "string"},
47- {"name": "email", "type": "string"}
48- ]
49- }` ,
50- Format : "AVRO" ,
51- Compatibility : "BACKWARD" ,
52- },
53- },
54- }
5525
5626 By ("When: creating KafkaSchema resource" )
57- ctx := context .Background ()
58- Expect (k8sClient .Create (ctx , & schemaRes )).Should (Succeed ())
27+ Expect (
28+ k8sClient .Create (ctx , aKafkaSchemaResource (topicName )),
29+ ).Should (Succeed ())
5930
6031 By ("Then: resource is properly created in K8S" )
61- schemaLookupKey := types.NamespacedName {Name : topicName , Namespace : ResourceNs }
62- createdResource := & v2beta1.KafkaSchema {}
63- Eventually (func () error {
64- return k8sClient .Get (ctx , schemaLookupKey , createdResource )
65- }).Should (Succeed ())
66-
67- Expect (createdResource .Spec .Name ).Should (Equal (topicName ))
32+ Expect (currentResourceState (ctx , topicName ).Spec .Name ).Should (Equal (topicName ))
6833 // TODO: verify resource
6934
35+ // TODO: switch to observing resource status and swap those 2 steps
7036 By ("And: subjects and schemas for topic key & value are registered in schema registry" )
71-
72- Eventually (func () map [string ]* schemareg_mock.Subject {
73- return srMock .Subjects
74- },
75- time .Second * 10 ,
76- time .Millisecond * 100 ,
77- ).Should (
78- And (
79- HaveKey (topicName + "-key" ),
80- HaveKey (topicName + "-value" ),
81- ),
82- )
37+ assertSubjectsCreatedInSchemaRegistry (topicName + "-key" , topicName + "-value" )
8338 Expect (srMock .Subjects [topicName + "-value" ].SchemaRefs ).Should (HaveLen (1 ))
8439 // TODO: verify schema
8540
8641 By ("And: resource are updated with proper status and finalizer" )
87- updatedResource := & v2beta1. KafkaSchema {}
88- Eventually ( func () error {
89- return k8sClient . Get ( ctx , schemaLookupKey , updatedResource )
90- }). Should ( Succeed ())
91- Expect ( updatedResource . Finalizers ). Should ( ConsistOf ( "kafka-schema-operator.incubly/finalizer" ) )
42+ Expect (
43+ currentResourceState ( ctx , topicName ). Finalizers ,
44+ ). Should (
45+ ConsistOf ( "kafka-schema-operator.incubly/finalizer" ),
46+ )
9247 // TODO: verify status (& events?)
9348 })
9449 })
@@ -101,7 +56,56 @@ var _ = Describe("KafkaschemaController", func() {
10156 })
10257 })
10358 Context ("When deleting resource" , func () {
104- PIt ("Should soft-delete subject and cleanup resource" , func () {
59+ It ("Should soft-delete subject and cleanup resource" , func () {
60+ topicName := "testenv.testservice.testevent." + strconv .FormatInt (time .Now ().UnixMilli (), 10 )
61+ schemaResource := aKafkaSchemaResource (topicName )
62+
63+ By ("Given: KafkaSchema resource was already deployed" )
64+ Expect (
65+ k8sClient .Create (ctx , schemaResource ),
66+ ).Should (Succeed ())
67+ By ("And: controller handled resource creation correctly" )
68+ // TODO: switch to observing resource status instead of waiting for schema registry
69+ assertSubjectsCreatedInSchemaRegistry (topicName + "-key" , topicName + "-value" )
70+
71+ By ("When: deleting resource" )
72+ Expect (k8sClient .Delete (ctx , schemaResource )).Should (Succeed ())
73+
74+ By ("And: resource is eventually deleted" )
75+ /*
76+ We're waiting for the controller to reconcile the resource
77+ (marked for deletion, because it had finalizer - see previous test).
78+ */
79+ // TODO: verify the error type & reason
80+ Eventually (func () error {
81+ return k8sClient .Get (ctx , resourceLookupKey (topicName ), & v2beta1.KafkaSchema {})
82+ }).ShouldNot (Succeed ())
83+
84+ By ("Then: both subjects are soft-deleted from schema registry" )
85+ Expect (srMock .Subjects ).ShouldNot (Or (
86+ HaveKey (topicName + "-key" ),
87+ HaveKey (topicName + "-value" )))
10588 })
10689 })
10790})
91+
92+ func assertSubjectsCreatedInSchemaRegistry (subjectNames ... string ) {
93+ subjectNameMatchers := []types.GomegaMatcher {}
94+ for _ , subjectName := range subjectNames {
95+ subjectNameMatchers = append (subjectNameMatchers , HaveKey (subjectName ))
96+ }
97+ Eventually (func () map [string ]* schemareg_mock.Subject {
98+ return srMock .Subjects
99+ },
100+ time .Second * 10 ,
101+ time .Millisecond * 100 ,
102+ ).Should (And (subjectNameMatchers ... ))
103+ }
104+
105+ func currentResourceState (ctx context.Context , topicName string ) * v2beta1.KafkaSchema {
106+ res := & v2beta1.KafkaSchema {}
107+ Eventually (func () error {
108+ return k8sClient .Get (ctx , resourceLookupKey (topicName ), res )
109+ }).Should (Succeed ())
110+ return res
111+ }
0 commit comments