Skip to content

Commit

Permalink
Merge pull request zilliztech#6 from zhuwenxing/add_cdc_test_main
Browse files Browse the repository at this point in the history
test: add cdc test
  • Loading branch information
zhuwenxing authored Apr 10, 2023
2 parents d718099 + 05fe45c commit 3872f98
Show file tree
Hide file tree
Showing 22 changed files with 1,927 additions and 20 deletions.
2 changes: 0 additions & 2 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ body:
2. With this config...
3. Run '...'
4. See error...
render: markdown
validations:
required: false
- type: textarea
attributes:
label: Environment
description: |
Enter the Environment Details:
render: markdown
validations:
required: false
- type: textarea
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@
**/.idea/*
**/dist/*
**/*.log/*
**/volumes/*
**/__pycache__/*
**/.pytest_cache/*
server/cdc
51 changes: 51 additions & 0 deletions scripts/export_log_k8s.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash

# Exit immediately for non zero status
set -e

ns_name=$1
instance_name=$2
log_dir=${3:-"k8s_logs"}

#show proxy pod log
array=($(kubectl get pod -n ${ns_name} -l "component=proxy, app.kubernetes.io/instance=${instance_name}"| awk 'NR == 1 {next} {print $1}'))
echo ${array[@]}

for pod in ${array[*]}
do
echo "show log of proxy pod $pod "
kubectl logs $pod -n ${ns_name} --tail=100 || echo "show log for pod $pod failed"
done

# export info of etcd
array=($(kubectl get pod -n ${ns_name} -l "app.kubernetes.io/name=etcd, app.kubernetes.io/instance=${instance_name}"| awk 'NR == 1 {next} {print $1}'))
echo ${array[@]}
mkdir -p $log_dir/etcd_session
for pod in ${array[*]}
do
echo "check session for etcd pod $pod "
kubectl exec $pod -n ${ns_name} -- etcdctl get --prefix by-dev/meta/session > ./$log_dir/etcd_session/$pod.log || echo "export session for pod $pod failed"
done
echo "check session done"

# export logs of all pods
array_1=($(kubectl get pod -n ${ns_name} -l "app.kubernetes.io/instance=${instance_name}"| awk 'NR == 1 {next} {print $1}'))
array_2=($(kubectl get pod -n ${ns_name} -l "release=${instance_name}"| awk 'NR == 1 {next} {print $1}'))
array=(${array_1[@]} ${array_2[@]})

echo ${array[@]}
if [ ! -d $log_dir/pod_log ] || [ ! -d $log_dir/pod_describe ];
then
mkdir -p $log_dir/pod_log
mkdir -p $log_dir/pod_log_previous
mkdir -p $log_dir/pod_describe
fi
echo "export logs start"
for pod in ${array[*]}
do
echo "export logs for pod $pod "
kubectl logs $pod -n ${ns_name} > ./$log_dir/pod_log/$pod.log 2>&1
kubectl logs $pod --previous -n ${ns_name} > ./$log_dir/pod_log_previous/$pod.log 2>&1 || echo "pod $pod has no previous log"
kubectl describe pod $pod -n ${ns_name} > ./$log_dir/pod_describe/$pod.log
done
echo "export logs done"
56 changes: 56 additions & 0 deletions scripts/export_log_k8s_for_operator.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/bin/bash

# Exit immediately for non zero status
set -e


instance_name=$1
ns_name=${2:-"chaos-testing"}
log_dir=${3:-"k8s_logs"}

#show proxy pod log
array=($(kubectl get pod -n ${ns_name} -l "app.kubernetes.io/component=proxy, app.kubernetes.io/instance=${instance_name}"| awk 'NR == 1 {next} {print $1}'))
echo ${array[@]}

for pod in ${array[*]}
do
echo "show log of proxy pod $pod "
kubectl logs $pod -n ${ns_name} --tail=100 || echo "show log for pod $pod failed"
done

# export info of etcd
array=($(kubectl get pod -n ${ns_name} -l "app.kubernetes.io/name=etcd, app.kubernetes.io/instance=${instance_name}-etcd"| awk 'NR == 1 {next} {print $1}'))
echo ${array[@]}
mkdir -p $log_dir/etcd_session
for pod in ${array[*]}
do
echo "check session for etcd pod $pod "
kubectl exec $pod -n ${ns_name} -- etcdctl get --prefix by-dev/meta/session > ./$log_dir/etcd_session/$pod.log || echo "export session for pod $pod failed"
done
echo "check session done"

# export logs of all pods
array_1=($(kubectl get pod -n ${ns_name} -l "app.kubernetes.io/instance=${instance_name}"| awk 'NR == 1 {next} {print $1}'))
array_2=($(kubectl get pod -n ${ns_name} -l "app.kubernetes.io/instance=${instance_name}-etcd"| awk 'NR == 1 {next} {print $1}'))
array_3=($(kubectl get pod -n ${ns_name} -l "release=${instance_name}-minio"| awk 'NR == 1 {next} {print $1}'))
array_4=($(kubectl get pod -n ${ns_name} -l "app.kubernetes.io/instance=${instance_name}-pulsar"| awk 'NR == 1 {next} {print $1}'))
array_5=($(kubectl get pod -n ${ns_name} -l "app.kubernetes.io/instance=${instance_name}-kafka"| awk 'NR == 1 {next} {print $1}'))

array=(${array_1[@]} ${array_2[@]} ${array_3[@]} ${array_4[@]} ${array_5[@]})

echo ${array[@]}
if [ ! -d $log_dir/pod_log ] || [ ! -d $log_dir/pod_describe ];
then
mkdir -p $log_dir/pod_log
mkdir -p $log_dir/pod_log_previous
mkdir -p $log_dir/pod_describe
fi
echo "export logs start"
for pod in ${array[*]}
do
echo "export logs for pod $pod "
kubectl logs $pod -n ${ns_name} > ./$log_dir/pod_log/$pod.log 2>&1 || echo "export log for pod $pod failed"
kubectl logs $pod --previous -n ${ns_name} > ./$log_dir/pod_log_previous/$pod.log 2>&1 || echo "pod $pod has no previous log"
kubectl describe pod $pod -n ${ns_name} > ./$log_dir/pod_describe/$pod.log || echo "export describe for pod $pod failed"
done
echo "export logs done"
1 change: 1 addition & 0 deletions scripts/uninstall_milvus.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#!/bin/bash
# Exit immediately for non zero status
set -e
release=${1:-"milvus-chaos"}
Expand Down
26 changes: 26 additions & 0 deletions scripts/uninstall_milvus_for_operator.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash
# Exit immediately for non zero status
set -e
release=${1:-"milvus-chaos"}
ns=${2:-"chaos-testing"}
kubectl delete milvus ${release} -n=${ns} || echo "delete milvus ${release} failed"

# uninstall helm release
helm_release_list=('minio' 'etcd' 'kafka' 'pulsar')
for helm_release in ${helm_release_list[*]}; do
echo "unistall helm release ${release}-${helm_release}"
helm uninstall ${release}-${helm_release} -n=${ns} || echo "delete helm release ${release}-${helm_release} failed"
done
# delete pvc for storage
pvc_list=('minio')
for pvc in ${pvc_list[*]}; do
echo "delete pvc with label release=${release}-${pvc}"
kubectl delete pvc -l release=${release}-${pvc} -n=${ns} || echo "delete pvc with label release=${release}-${pvc} failed"
done

# delete pvc of etcd and message queue
pvc_list=('etcd' 'kafka' 'pulsar')
for pvc in ${pvc_list[*]}; do
echo "delete pvc with label app.kubernetes.io/instance=${release}-${pvc}"
kubectl delete pvc -l app.kubernetes.io/instance=${release}-${pvc} -n=${ns} || echo "delete pvc with label release=${release}-${pvc} failed"
done
7 changes: 1 addition & 6 deletions server/configs/cdc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,4 @@ source:
tenant: public
namespace: default
kafka:
broker_list: 127.0.0.1:9092





broker_list: 127.0.0.1:9092
145 changes: 145 additions & 0 deletions tests/api/milvus_cdc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
import json
import requests


class MilvusCdcClient:

def __init__(self, url) -> None:
self.url = url
self.headers = {'Content-Type': 'application/json'}

def create_task(self, request_data):
url = self.url + '/cdc'
body = {
"request_type": "create",
"request_data": request_data
}
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
else:
return response.text, False

def list_tasks(self):
url = self.url + '/cdc'
body = {
"request_type": "list"
}
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
else:
return response.text, False

def get_task(self, task_id):
url = self.url + '/cdc'
body = {
"request_type": "get",
"request_data": {
"task_id": task_id
}
}
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
else:
return response.text, False

def pause_task(self, task_id):
url = self.url + '/cdc'
body = {
"request_type": "pause",
"request_data": {
"task_id": task_id
}
}
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
else:
return response.text, False

def resume_task(self, task_id):
url = self.url + '/cdc'
body = {
"request_type": "resume",
"request_data": {
"task_id": task_id
}
}
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
else:
return response.text, False

def delete_task(self, task_id):
url = self.url + '/cdc'
body = {
"request_type": "delete",
"request_data": {
"task_id": task_id
}
}
payload = json.dumps(body)
response = requests.post(url, headers=self.headers, data=payload)
if response.status_code == 200:
return response.json(), True
else:
return response.text, False

if __name__ == '__main__':
client = MilvusCdcClient('http://localhost:8444')
# create task
create_request_data = {
"milvus_connect_param": {
"host": "10.101.128.34",
"port": 19530,
"username": "",
"password": "",
"enable_tls": False,
"ignore_partition": True,
"connect_timeout": 10
},
"collection_infos": [
{
"name": "hello_milvus_v4"
}
]
}
rsp = client.create_task(create_request_data)
print(rsp)
# list tasks
rsp, _ = client.list_tasks()
task_ids = []
for task in rsp['tasks']:
print(task)
task_ids.append(task['task_id'])

print(task_ids)
# get task
rsp, _ = client.get_task(task_ids[0])
print(rsp)
# pause task
rsp, _ = client.pause_task(task_ids[0])
print(rsp)
# get task
rsp, _ = client.get_task(task_ids[0])
print(rsp)
# resume task
rsp, _ = client.resume_task(task_ids[0])
print(rsp)
# get task
rsp, _ = client.get_task(task_ids[0])
print(rsp)
# delete task
rsp, _ = client.delete_task(task_ids[0])
print(rsp)
# get task
rsp, _ = client.get_task(task_ids[0])
print(rsp)
Loading

0 comments on commit 3872f98

Please sign in to comment.