Skip to content

Commit

Permalink
Merge pull request #1 from apache/main
Browse files Browse the repository at this point in the history
update
  • Loading branch information
Alonexc authored Apr 24, 2023
2 parents 4dd0b11 + 9dabd4b commit 6d41cda
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 99 deletions.
5 changes: 5 additions & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
github:
description: EventMesh go
homepage: https://eventmesh.apache.org/
features:
# Enable issue management
issues: true
# Enable wiki
wiki: true
labels:
- pubsub
enabled_merge_buttons:
Expand Down
25 changes: 0 additions & 25 deletions plugin/connector/rocketmq/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,31 +93,6 @@ func TestConsumer_Subscribe(t *testing.T) {
consumer.Shutdown()
}

//func TestConsumer_Subscribe_Broker(t *testing.T) {
// properties := make(map[string]string)
// properties["access_points"] = "127.0.0.1:9876"
// properties["consumer_group"] = "test_group"
// consumer := NewConsumer()
// consumer.InitConsumer(properties)
// consumer.Start()
//
// listener := &connector.EventListener{
// Consume: func(event *ce.Event, commitFunc connector.CommitFunc) error {
// fmt.Println(event)
// commitFunc(connector.CommitMessage)
// return nil
// },
// }
//
// consumer.RegisterEventListener(listener)
// err := consumer.Subscribe("TopicTest")
// if err != nil {
// panic(err)
// }
//
// time.Sleep(100 * time.Second)
//}

func GetFullMessage() *primitive.Message {
event := ce.NewEvent()
event.SetSubject("TopicTest")
Expand Down
25 changes: 0 additions & 25 deletions plugin/connector/rocketmq/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,28 +187,3 @@ func GetFullEvent() *ce.Event {
return &event
}

//func TestProducer_Publish_Broker(t *testing.T) {
// properties := make(map[string]string)
// properties["access_points"] = "127.0.0.1:9876"
// producer := NewProducer()
// producer.InitProducer(properties)
// producer.Start()
//
// var wg sync.WaitGroup
// wg.Add(1)
// callback := &connector.SendCallback{
// OnSuccess: func(result *connector.SendResult) {
// require.True(t, result.Topic == "TopicTest")
// wg.Done()
// },
// OnError: func(result *connector.ErrorResult) {
// panic(result.Err)
// },
// }
//
// err := producer.Publish(context.Background(), GetFullEvent(), callback)
// require.True(t, err == nil)
//
// wg.Wait()
// producer.Shutdown()
//}
49 changes: 1 addition & 48 deletions plugin/connector/standalone/standalone_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,54 +147,7 @@ func TestConsumer_ManualAck(t *testing.T) {

// TODO update later
func TestConsumer_UpdateOffset(t *testing.T) {
//sum := atomic.NewInt64(0)
//ch := make(chan struct{})
//listener := connector.EventListener{
// Consume: func(event *ce.Event, commitFunc connector.CommitFunc) error {
// var data map[string]interface{}
// event.DataAs(&data)
// sum.Add(int64(data["val"].(float64)))
// commitFunc(connector.CommitMessage)
// ch <- struct{}{}
// return nil
// },
//}
//
//factory := &Factory{}
//err := factory.Setup(pluginName, &plugin.YamlNodeDecoder{
// Node: &yaml.Node{},
//})
//assert.NoError(t, err)
//consumer, _ := factory.GetConsumer()
//consumer.Start()
//defer consumer.Shutdown()
//consumer.RegisterEventListener(&listener)
//event := getTestEvent()
//event.SetExtension("offset", "49")
//consumer.Subscribe(topicName)
//consumer.UpdateOffset(context.Background(), []*ce.Event{event})
//
//producer, _ := factory.GetProducer()
//producer.Start()
//defer producer.Shutdown()
//for i := 1; i <= 50; i++ {
// err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
// "val": i,
// }), getEmptyPublishCallback())
//
// if err != nil {
// t.Fail()
// return
// }
//}
//
//timer := time.NewTimer(3 * time.Second)
//select {
//case <-timer.C:
// t.Fail()
//case <-ch:
// assert.Equal(t, int64(50), sum.Load())
//}

}

func getTestEvent(topicName string) *ce.Event {
Expand Down
2 changes: 1 addition & 1 deletion runtime/core/protocol/grpc/retry/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ func (c *Retry) SetDelay(delay time.Duration) *Retry {
}

func (c *Retry) GetDelay() time.Duration {
return c.ExecuteTime.Sub(time.Now())
return time.Until(c.ExecuteTime)
}

0 comments on commit 6d41cda

Please sign in to comment.