Skip to content

Commit

Permalink
ci: Add e2e case (#145)
Browse files Browse the repository at this point in the history
  • Loading branch information
hwjiangkai authored Jul 11, 2022
1 parent 7c7839c commit 809abab
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 36 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
make docker-build
kubectl apply -f ./deploy/all-in-one.yaml
kubectl apply -f ./test/yaml/display.yml
kubectl apply -f ./test/yaml/etcd-srv.yml
make build-cmd
chmod ug+x ./bin/vsctl
sudo mv ./bin/vsctl /usr/local/bin/vsctl
Expand All @@ -51,6 +52,5 @@ jobs:
- name: Check e2e
run: |
kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l
[[ $(kubectl get pod -n vanus | grep quick-display | awk '{print $1}' | xargs kubectl logs -n vanus | grep -n "total" | wc -l) -eq 10001 ]] && echo "success" || echo "failed"
echo "todo"
193 changes: 159 additions & 34 deletions test/e2e/quick-start/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,66 @@ import (
log "k8s.io/klog/v2"

"github.com/fatih/color"
"github.com/linkall-labs/vanus/internal/kv"
"github.com/linkall-labs/vanus/internal/kv/etcd"
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
"github.com/linkall-labs/vanus/proto/pkg/meta"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
// cloudEventDataRowLength = 4
httpPrefix = "http://"
eventBus = "quick-start"
sink = "http://quick-display:80"
source = ""
filters = ""
inputTransformer = ""

eventID = "1"
eventType = "examples"
eventBody = "Hello Vanus"
eventSource = "quick-start"
VolumeKeyPrefixInKVStore = "/vanus/internal/resource/volume/metadata"
BlockKeyPrefixInKVStore = "/vanus/internal/resource/volume/block"
VolumeInstanceKeyPrefixInKVStore = "/vanus/internal/resource/volume/instance"

EventbusKeyPrefixInKVStore = "/vanus/internal/resource/eventbus"
EventlogKeyPrefixInKVStore = "/vanus/internal/resource/eventlog"
SegmentKeyPrefixInKVStore = "/vanus/internal/resource/segment"

EventlogSegmentsKeyPrefixInKVStore = "/vanus/internal/resource/segs_of_eventlog"
)

const (
HttpPrefix = "http://"
EventBus = "quick-start"
)

var (
httpClient = resty.New()
endpoint = os.Getenv("VANUS_GATEWAY")
Sink = "http://quick-display:80"
Source = ""
Filters = ""
InputTransformer = ""

EventType = "examples"
EventBody = "Hello Vanus"
EventSource = "quick-start"

HttpClient = resty.New()
Endpoint = os.Getenv("VANUS_GATEWAY")
EtcdClient kv.Client
err error
)

func init() {
kvStoreEndpoints := []string{"192.168.49.2:30007"}
kvKeyPrefix := "/vanus"
EtcdClient, err = etcd.NewEtcdClientV3(kvStoreEndpoints, kvKeyPrefix)
if err != nil {
log.Fatalf("NewEtcdClientV3 failed, err: %+v\n", err)
}
}

func mustGetControllerProxyConn(ctx context.Context) *grpc.ClientConn {
splits := strings.Split(os.Getenv("VANUS_GATEWAY"), ":")
port, err := strconv.Atoi(splits[1])
if err != nil {
fmt.Println("parsing gateway port failed")
log.Error("parsing gateway port failed")
return nil
}
leaderConn := createGRPCConn(ctx, fmt.Sprintf("%s:%d", splits[0], port+2))
if leaderConn == nil {
fmt.Println("failed to connect to gateway")
log.Error("failed to connect to gateway")
return nil
}
return leaderConn
Expand Down Expand Up @@ -179,7 +202,7 @@ func putEvent(eventbus, eventID, eventType, eventBody, eventSource string) error
eventID = uuid.NewString()
}

ceCtx := ce.ContextWithTarget(context.Background(), fmt.Sprintf("%s%s/gateway/%s", httpPrefix, endpoint, eventbus))
ceCtx := ce.ContextWithTarget(context.Background(), fmt.Sprintf("%s%s/gateway/%s", HttpPrefix, Endpoint, eventbus))
event := ce.NewEvent()
event.SetID(eventID)
event.SetSource(eventSource)
Expand All @@ -194,20 +217,19 @@ func putEvent(eventbus, eventID, eventType, eventBody, eventSource string) error
return nil
}

func putEvents(eventNum, threadNum int64) error {
func putEvents(offset, eventNum, threadNum int64, eventBus, eventBody, eventSource string) error {
var (
i int64 = 1
eventid int64 = 1
i int64
eventid int64 = offset
wg sync.WaitGroup
)
for i = 1; i <= threadNum; i++ {
first := eventid
last := i * eventNum / threadNum
last := eventid + eventNum/threadNum
wg.Add(1)
go func(first, last int64) {
fmt.Printf("first: %d, last: %d\n", first, last)
for n := first; n <= last; n++ {
putEvent(eventBus, fmt.Sprintf("%d", n), eventType, eventBody, eventSource)
for n := first; n < last; n++ {
putEvent(eventBus, fmt.Sprintf("%d", n), EventType, eventBody, eventSource)
}
wg.Done()
}(first, last)
Expand All @@ -218,15 +240,15 @@ func putEvents(eventNum, threadNum int64) error {
}

func getEvent(eventbus, offset, number string) error {
idx := strings.LastIndex(endpoint, ":")
port, err := strconv.Atoi(endpoint[idx+1:])
idx := strings.LastIndex(Endpoint, ":")
port, err := strconv.Atoi(Endpoint[idx+1:])
if err != nil {
log.Errorf("parse gateway port failed: %s, endpoint: %s", err, endpoint)
log.Errorf("parse gateway port failed: %s, endpoint: %s", err, Endpoint)
return err
}
newEndpoint := fmt.Sprintf("%s:%d", endpoint[:idx], port+1)
url := fmt.Sprintf("%s%s/getEvents?eventbus=%s&offset=%s&number=%s", httpPrefix, newEndpoint, eventbus, offset, number)
evevt, err := httpClient.NewRequest().Get(url)
newEndpoint := fmt.Sprintf("%s:%d", Endpoint[:idx], port+1)
url := fmt.Sprintf("%s%s/getEvents?eventbus=%s&offset=%s&number=%s", HttpPrefix, newEndpoint, eventbus, offset, number)
evevt, err := HttpClient.NewRequest().Get(url)
if err != nil {
log.Errorf("get event failed, err: %s\n", err)
return err
Expand All @@ -236,27 +258,126 @@ func getEvent(eventbus, offset, number string) error {
}

func Test_e2e_base() {
eventBus := "eventbus-base"
err = createEventbus(eventBus)
if err != nil {
return
}

err = createSubscription(eventBus, Sink, Source, Filters, InputTransformer)
if err != nil {
return
}

putEvents(0, 10000, 100, eventBus, EventBody, EventSource)

err = getEvent(eventBus, "0", "10000")
if err != nil {
log.Error("Test_e2e_base get event failed")
return
}
log.Info("Test_e2e_base get event success")
}

func Test_e2e_filter() {
eventBus := "eventbus-filter"
err = createEventbus(eventBus)
if err != nil {
return
}

filters := "[{\"exact\": {\"source\":\"filter\"}}]"
err = createSubscription(eventBus, Sink, Source, filters, InputTransformer)
if err != nil {
return
}

filters = "[{\"cel\": \"$key.(string) == \\\"value\\\"\"}]"
err = createSubscription(eventBus, Sink, Source, filters, InputTransformer)
if err != nil {
return
}

putEvents(0, 2000, 100, eventBus, EventBody, EventSource)
eventSource := "filter"
putEvents(2000, 4000, 10, eventBus, EventBody, eventSource)
eventBody := "{\"key\":\"value\"}"
putEvents(4000, 4000, 100, eventBus, eventBody, EventSource)

err = getEvent(eventBus, "0", "10000")
if err != nil {
log.Error("Test_e2e_filter get event failed")
return
}
log.Info("Test_e2e_filter get event success")
}

func Test_e2e_transformation() {
eventBus := "eventbus-transformation"
err = createEventbus(eventBus)
if err != nil {
return
}

err = createSubscription(eventBus, sink, source, filters, inputTransformer)
inputTransformer := "{\"template\": \"{\\\"transKey\\\": \\\"transValue\\\"}\"}"
err = createSubscription(eventBus, Sink, Source, Filters, inputTransformer)
if err != nil {
return
}

putEvents(10000, 100)
putEvents(0, 10000, 100, eventBus, EventBody, EventSource)

err = getEvent(eventBus, "0", "10000")
if err != nil {
log.Error("Test_e2e_transformation get event failed")
return
}
log.Info("Test_e2e_filter get event success")
}

func Test_e2e_filter() {}
func Test_e2e_metadata() {
eventBus := "eventbus-meta"
err = createEventbus(eventBus)
if err != nil {
return
}

func Test_e2e_transformation() {}
// Currently, only check metadata of eventbus
var path string = fmt.Sprintf("%s/%s", EventbusKeyPrefixInKVStore, eventBus)
ctx := context.Background()
meta, err := EtcdClient.Get(ctx, path)
if err != nil {
log.Errorf("get metadata failed, path: %s, err: %s\n", path, err.Error())
return
}
log.Infof("get metadata success, path: %s, mata: %s\n", path, string(meta))
}

// func Test_e2e_full() {
// err = createEventbus(EventBus)
// if err != nil {
// return
// }

// filters := "[{\"exact\": {\"source\":\"filter\"}}]"
// inputTransformer := "{\"template\": \"{\"transKey\": \"transValue\"}\"}"
// err = createSubscription(EventBus, Sink, Source, filters, inputTransformer)
// if err != nil {
// return
// }

// putEvents(0, 5000, 100, EventBody, EventSource)
// eventSource := "filter"
// putEvents(5000, 5000, 100, EventBody, eventSource)

// // check event number
// err = getEvent(EventBus, "0", "10000")
// if err != nil {
// log.Error("Test_e2e_base get event failed")
// return
// }
// log.Info("Test_e2e_base get event success")
// }

func main() {
log.Info("start e2e test base case...")
Expand All @@ -267,5 +388,9 @@ func main() {

Test_e2e_transformation()

Test_e2e_metadata()

// Test_e2e_full()

log.Info("finish e2e test base case...")
}
13 changes: 13 additions & 0 deletions test/yaml/etcd-srv.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
apiVersion: v1
kind: Service
metadata:
name: my-service
namespace: vanus
spec:
type: NodePort
selector:
app: vanus-controller
ports:
- port: 2379
targetPort: 2379
nodePort: 30007

0 comments on commit 809abab

Please sign in to comment.