Skip to content

Commit

Permalink
Remove data directory for test (milvus-io#14363)
Browse files Browse the repository at this point in the history
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
  • Loading branch information
xiaofan-luan authored Dec 29, 2021
1 parent 1408926 commit 029b153
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 46 deletions.
8 changes: 6 additions & 2 deletions internal/kv/rocksdb/rocksdb_kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package rocksdbkv_test

import (
"os"
"strconv"
"sync"
"testing"
Expand All @@ -31,7 +32,7 @@ func TestRocksdbKV(t *testing.T) {
if err != nil {
panic(err)
}

defer os.RemoveAll(name)
defer rocksdbKV.Close()
// Need to call RemoveWithPrefix
defer rocksdbKV.RemoveWithPrefix("")
Expand Down Expand Up @@ -92,7 +93,7 @@ func TestRocksdbKV_Prefix(t *testing.T) {
if err != nil {
panic(err)
}

defer os.RemoveAll(name)
defer rocksdbKV.Close()
// Need to call RemoveWithPrefix
defer rocksdbKV.RemoveWithPrefix("")
Expand Down Expand Up @@ -150,6 +151,7 @@ func TestRocksdbKV_Goroutines(t *testing.T) {
name := "/tmp/rocksdb"
rocksdbkv, err := rocksdbkv.NewRocksdbKV(name)
assert.Nil(t, err)
defer os.RemoveAll(name)
defer rocksdbkv.Close()
defer rocksdbkv.RemoveWithPrefix("")

Expand All @@ -175,6 +177,7 @@ func TestRocksdbKV_DummyDB(t *testing.T) {
name := "/tmp/rocksdb_dummy"
rocksdbkv, err := rocksdbkv.NewRocksdbKV(name)
assert.Nil(t, err)
defer os.RemoveAll(name)
defer rocksdbkv.Close()
defer rocksdbkv.RemoveWithPrefix("")

Expand Down Expand Up @@ -209,6 +212,7 @@ func TestRocksdbKV_CornerCase(t *testing.T) {
name := "/tmp/rocksdb_corner"
rocksdbkv, err := rocksdbkv.NewRocksdbKV(name)
assert.Nil(t, err)
defer os.RemoveAll(name)
defer rocksdbkv.Close()
defer rocksdbkv.RemoveWithPrefix("")
_, err = rocksdbkv.Load("")
Expand Down
8 changes: 8 additions & 0 deletions internal/storage/print_binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestPrintBinlogFilesInt64(t *testing.T) {
w.Close()

fd, err := ioutil.TempFile("", "binlog_int64.db")
defer os.RemoveAll(fd.Name())
assert.Nil(t, err)
num, err := fd.Write(buf)
assert.Nil(t, err)
Expand Down Expand Up @@ -322,6 +323,9 @@ func TestPrintBinlogFiles(t *testing.T) {
binlogFiles = append(binlogFiles, "test")

PrintBinlogFiles(binlogFiles)
for _, file := range binlogFiles {
_ = os.RemoveAll(file)
}
}

func TestPrintDDFiles(t *testing.T) {
Expand Down Expand Up @@ -433,6 +437,10 @@ func TestPrintDDFiles(t *testing.T) {
assert.Equal(t, resultRequests, ddRequests)

PrintBinlogFiles(binlogFiles)

for _, file := range binlogFiles {
_ = os.RemoveAll(file)
}
}

func TestPrintIndexFile(t *testing.T) {
Expand Down
12 changes: 8 additions & 4 deletions internal/util/mqclient/rmq_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package mqclient

import (
"sync"
"sync/atomic"

"github.com/milvus-io/milvus/internal/util/rocksmq/client/rocksmq"
)
Expand All @@ -23,7 +24,7 @@ type RmqConsumer struct {
msgChannel chan Message
closeCh chan struct{}
once sync.Once
skip bool
skip int32
}

// Subscription returns the subscription name of this consumer
Expand All @@ -44,10 +45,11 @@ func (rc *RmqConsumer) Chan() <-chan Message {
close(rc.msgChannel)
return
}
if !rc.skip {
skip := atomic.LoadInt32(&rc.skip)
if skip != 1 {
rc.msgChannel <- &rmqMessage{msg: msg}
} else {
rc.skip = false
atomic.StoreInt32(&rc.skip, 0)
}
case <-rc.closeCh:
close(rc.msgChannel)
Expand All @@ -64,7 +66,9 @@ func (rc *RmqConsumer) Chan() <-chan Message {
func (rc *RmqConsumer) Seek(id MessageID, inclusive bool) error {
msgID := id.(*rmqID).messageID
// skip the first message when consume
rc.skip = !inclusive
if !inclusive {
atomic.StoreInt32(&rc.skip, 1)
}
return rc.c.Seek(msgID)
}

Expand Down
26 changes: 12 additions & 14 deletions internal/util/rocksmq/client/rocksmq/client_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ import (

var rmqPath = "/tmp/rocksmq_client"

func TestMain(m *testing.M) {
os.MkdirAll(rmqPath, os.ModePerm)
defer os.RemoveAll(rmqPath)
os.Exit(m.Run())
}

func TestClient(t *testing.T) {
client, err := NewClient(ClientOptions{})
assert.NotNil(t, client)
Expand All @@ -52,8 +46,9 @@ func TestClient_CreateProducer(t *testing.T) {
assert.Nil(t, producer)

/////////////////////////////////////////////////
rmqPath := rmqPath + "/test_client1"
rmq := newRocksMQ(rmqPath)
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_client1"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client1, err := NewClient(ClientOptions{
Server: rmq,
Expand Down Expand Up @@ -91,8 +86,9 @@ func TestClient_Subscribe(t *testing.T) {
assert.Nil(t, consumer)

/////////////////////////////////////////////////
rmqPath := rmqPath + "/test_client2"
rmq := newRocksMQ(rmqPath)
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_client2"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client1, err := NewClient(ClientOptions{
Server: rmq,
Expand Down Expand Up @@ -131,8 +127,9 @@ func TestClient_Subscribe(t *testing.T) {
}

func TestClient_SeekLatest(t *testing.T) {
rmqPath := rmqPath + "/seekLatest"
rmq := newRocksMQ(rmqPath)
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/seekLatest"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client, err := NewClient(ClientOptions{
Server: rmq,
Expand Down Expand Up @@ -201,8 +198,9 @@ func TestClient_SeekLatest(t *testing.T) {
}

func TestClient_consume(t *testing.T) {
rmqPath := rmqPath + "/test_client3"
rmq := newRocksMQ(rmqPath)
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_client3"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client, err := NewClient(ClientOptions{
Server: rmq,
Expand Down
11 changes: 7 additions & 4 deletions internal/util/rocksmq/client/rocksmq/consumer_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package rocksmq

import (
"os"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -47,8 +48,9 @@ func TestConsumer_newConsumer(t *testing.T) {
assert.Equal(t, InvalidConfiguration, err.(*Error).Result())

/////////////////////////////////////////////////
rmqPath := rmqPath + "/test_consumer1"
rmq := newRocksMQ(rmqPath)
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_consumer1"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client, err := newClient(ClientOptions{
Server: rmq,
Expand Down Expand Up @@ -117,8 +119,9 @@ func TestConsumer_Subscription(t *testing.T) {
}

func TestConsumer_Seek(t *testing.T) {
rmqPath := rmqPath + "/test_consumer2"
rmq := newRocksMQ(rmqPath)
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_consumer2"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client, err := newClient(ClientOptions{
Server: rmq,
Expand Down
6 changes: 4 additions & 2 deletions internal/util/rocksmq/client/rocksmq/reader_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package rocksmq

import (
"context"
"os"
"strconv"
"testing"

Expand Down Expand Up @@ -40,8 +41,9 @@ func Test_NewReader(t *testing.T) {
}

func TestReader_Next(t *testing.T) {
rmqPath := rmqPath + "/test_reader"
rmq := newRocksMQ(rmqPath)
os.MkdirAll(rmqPath, os.ModePerm)
rmqPathTest := rmqPath + "/test_reader"
rmq := newRocksMQ(t, rmqPathTest)
defer removePath(rmqPath)
client, err := newClient(ClientOptions{
Server: rmq,
Expand Down
23 changes: 11 additions & 12 deletions internal/util/rocksmq/client/rocksmq/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ package rocksmq
import (
"fmt"
"os"
"testing"
"time"

"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
server "github.com/milvus-io/milvus/internal/util/rocksmq/server/rocksmq"
"github.com/stretchr/testify/assert"

"go.uber.org/zap"
)
Expand All @@ -43,26 +45,23 @@ func newMockClient() *client {
return client
}

func newRocksMQ(rmqPath string) server.RocksMQ {
rocksdbPath := rmqPath + "_db"
rmq, _ := rocksmq.NewRocksMQ(rocksdbPath, nil)
func newRocksMQ(t *testing.T, rmqPath string) server.RocksMQ {
rocksdbPath := rmqPath
rmq, err := rocksmq.NewRocksMQ(rocksdbPath, nil)
assert.NoError(t, err)
return rmq
}

func removePath(rmqPath string) {
kvPath := rmqPath + "_kv"
err := os.RemoveAll(kvPath)
// remove path rocksmq created
rocksdbPath := rmqPath
err := os.RemoveAll(rocksdbPath)
if err != nil {
log.Error("Failed to call os.removeAll.", zap.Any("path", kvPath))
}
rocksdbPath := rmqPath + "_db"
err = os.RemoveAll(rocksdbPath)
if err != nil {
log.Error("Failed to call os.removeAll.", zap.Any("path", kvPath))
log.Error("Failed to call os.removeAll.", zap.Any("path", rocksdbPath))
}
metaPath := rmqPath + "_meta_kv"
err = os.RemoveAll(metaPath)
if err != nil {
log.Error("Failed to call os.removeAll.", zap.Any("path", kvPath))
log.Error("Failed to call os.removeAll.", zap.Any("path", metaPath))
}
}
9 changes: 4 additions & 5 deletions internal/util/rocksmq/server/rocksmq/rocksmq_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
var Params paramtable.BaseTable
var rmqPath = "/tmp/rocksmq"
var kvPathSuffix = "_kv"
var dbPathSuffix = "_db"
var metaPathSuffix = "_meta"

func InitIDAllocator(kvPath string) *allocator.GlobalIDAllocator {
Expand Down Expand Up @@ -70,7 +69,7 @@ func TestRocksmq_RegisterConsumer(t *testing.T) {
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)

rocksdbPath := rmqPath + dbPathSuffix + suffix
rocksdbPath := rmqPath + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)

Expand Down Expand Up @@ -135,7 +134,7 @@ func TestRocksmq_Basic(t *testing.T) {
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)

rocksdbPath := rmqPath + dbPathSuffix + suffix
rocksdbPath := rmqPath + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)
rmq, err := NewRocksMQ(rocksdbPath, idAllocator)
Expand Down Expand Up @@ -190,7 +189,7 @@ func TestRocksmq_Dummy(t *testing.T) {
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)

rocksdbPath := rmqPath + dbPathSuffix + suffix
rocksdbPath := rmqPath + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)

Expand Down Expand Up @@ -260,7 +259,7 @@ func TestRocksmq_Seek(t *testing.T) {
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)

rocksdbPath := rmqPath + dbPathSuffix + suffix
rocksdbPath := rmqPath + suffix
defer os.RemoveAll(rocksdbPath + kvSuffix)
defer os.RemoveAll(rocksdbPath)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestRmqRetention_Basic(t *testing.T) {
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)

rocksdbPath := retentionPath + dbPathSuffix
rocksdbPath := retentionPath
defer os.RemoveAll(rocksdbPath)
metaPath := retentionPath + metaPathSuffix
defer os.RemoveAll(metaPath)
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestRmqRetention_NotConsumed(t *testing.T) {
atomic.StoreInt64(&RocksmqPageSize, 10)
atomic.StoreInt64(&TickerTimeInSeconds, 2)

rocksdbPath := retentionPath + dbPathSuffix
rocksdbPath := retentionPath
defer os.RemoveAll(rocksdbPath)
metaPath := retentionPath + metaPathSuffix
defer os.RemoveAll(metaPath)
Expand Down Expand Up @@ -400,7 +400,7 @@ func TestRetentionInfo_InitRetentionInfo(t *testing.T) {
defer os.RemoveAll(kvPath)
idAllocator := InitIDAllocator(kvPath)

rocksdbPath := retentionPath + dbPathSuffix + suffix
rocksdbPath := retentionPath + suffix
defer os.RemoveAll(rocksdbPath)
metaPath := retentionPath + metaPathSuffix + suffix

Expand Down

0 comments on commit 029b153

Please sign in to comment.