-
Notifications
You must be signed in to change notification settings - Fork 5.7k
pserver etcd client #2559
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pserver etcd client #2559
Conversation
go/pserver/cclient/cclient.go
Outdated
if etcd_addr == "" { | ||
return C.PSERVER_ERROR | ||
} | ||
etcd_addrs := strings.Split(etcd_addr, ",") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I should have mentioned this earlier, the cgo code are supposed to be just a thin layer. Because it's just a interface from C to Go. If it contains too much logic, then when we want to use Pserver from Go to Go, we need to duplicate the logic. Another reason is Go's test does not work well with cgo, it's supposed to be tested from C side. It's much easier to write test for Go.
You can create a new file in pserver package. E.g., etcd.go, and implement this interface: https://github.com/PaddlePaddle/Paddle/blob/develop/go/pserver/client.go#L26
And this interface:
https://github.com/PaddlePaddle/Paddle/blob/develop/go/pserver/client.go#L15
So cgo just need to do:
e := pserver.NewEtcd(addr) // we can call it NewEtcd, or some other name.
c := pserver.NewClient(e, len(as), e)
return add(c)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the comment! I will add a separate module that implement these interfaces.
go/pserver/cclient/cclient.go
Outdated
return err | ||
} | ||
retryTimes := defaultRetryTimes | ||
for retryTimes < 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to try infinitely because we don't know how long it will take til the etcd key is written.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I will optimize this strategy
go/pserver/cclient/cclient.go
Outdated
time.Sleep(time.Second) | ||
continue | ||
} | ||
ps_addr := string(kvs[0].Value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From design doc: https://github.com/PaddlePaddle/Paddle/tree/develop/doc/design/cluster_train#trainer-process-1
ps_addr
is not fetched from one etcd key. Addresses are read from /ps/<id>
which stands for IP address for each pserver, so ps_addr
need to concat all these addresses.
And each trainer must "watch" pserver nodes hand handle events of pserver creation and loss.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done the first part, but I have not implemented the watcher because now the pserver client cannot change to new pserver when running, I have added a TODO and will implment this function in the following pr. Thank you!
go/pserver/cclient/cclient.go
Outdated
}) | ||
if err != nil { | ||
log.Errorln(err) | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The return type is C.paddle_pserver_client
, I think the function would panic if client connect etcd failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/etcd_lister.go
Outdated
} | ||
|
||
// read ps desired number from etcd. | ||
func(p pserverEtcdLister) desired() int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
p pserverEtcdLister
-> p *pserverEtcdLister
same as func(p pserverEtcdLister) List() []Server
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/etcd_lister.go
Outdated
} | ||
|
||
// read ps desired number from etcd. | ||
func(p pserverEtcdLister) desired() int { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can put https://github.com/PaddlePaddle/Paddle/blob/develop/go/pserver/service.go#L106 as a function and call it directly.
go/pserver/service.go
Outdated
@@ -21,6 +21,10 @@ type ElementType int | |||
const ( | |||
AlreadyInitialized = "pserver already initialized" | |||
Uninitialized = "pserver not fully initialized" | |||
// PsDesired is etcd path for store desired pserver count | |||
DefaultPsDesiredPath = "/ps_desired" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a note: etcdv3 uses only key-values, thought the key looks like a path.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok~
go/pserver/etcd_lister.go
Outdated
} | ||
} | ||
|
||
func(p pserverEtcdLister) List() []Server { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In #2551 , trainers need to discover pservers and keep watch pservers using etcd's watch. I think you need to discuss with @Yancey1989 to see where wo implement the "etcd watcher"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for reminding! I will discuss with your today~~
@@ -98,7 +98,7 @@ func (e *EtcdClient) Save(state []byte) error { | |||
// We lost the master lock and can not acquire | |||
// it back, it means some other master is | |||
// already started. We don't want cluster | |||
// managment system to kill the master server | |||
// management system to kill the master server |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have so many typos :p. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
:)
go/pserver/cclient/cclient.go
Outdated
func paddle_new_etcd_pserver_client(etcd_addr *C.char) C.paddle_pserver_client { | ||
// TODO(helin): fault tolerant pserver client using etcd. | ||
panic("not implemented.") | ||
func paddle_new_etcd_pserver_client(etcd_addr *C.char, selected int) C.paddle_pserver_client { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to use etcd lock to decide which trainer is selected to initialize the parameter servers. This probably should done in the pserver
package (not here in cgo) by implementing the pserver.Selector
interface. I think in etcd_client.go
mentioned in #2559 (comment) could be a good place to hold such code.
Don't need to do in the PR, but can you add a TODO, and it would be awesome if you can send a follow up PR for it?
Please see more detail here: https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/cluster_train/pserver_client.md#trainer-selection
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I add a TODO to optimize this part and will learn how to use etcd lock to sync status.
go/pserver/client_test.go
Outdated
ClientTest(t, c1) | ||
} | ||
|
||
func TestEtcdClient(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TestEtcdClient
is not a self-contained test, it requires an etcd server to be running when the test is running. So the CI (after we setup cd go && go test ./...
) will not run correctly if etcd is not running on 127.0.0.1:2379
.
Maybe for know let's disable the test be renaming TestEtcdClient
to EtcdClient
, and user can enable manually when necessary.
We can explore embedding etcd later: #2504
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
go/pserver/etcd_lister.go
Outdated
) | ||
|
||
const ( | ||
DefaultEtcdTimeout time.Duration = time.Second * time.Duration(5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
5 * time.Second
is fine. Constants in Go is very well designed: https://blog.golang.org/constants
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/etcd_lister.go
Outdated
servers := make([]Server, psDesired) | ||
for { | ||
ctx, cancel := context.WithTimeout(context.Background(), p.timeout) | ||
for i := 0; i < psDesired; i++ { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be batched into a single transaction, here is an related example: https://github.com/PaddlePaddle/Paddle/blob/develop/go/master/etcd_client.go#L121
Something like:
resp, err := e.client.Txn(ctx).If().Then(get0, get1, get2, get3).Commit()
Feel free to improve it later, not a merge blocker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
go/pserver/etcd_lister.go
Outdated
// TODO(Longfei) check the ps address | ||
if psAddr == "" { | ||
cancel() | ||
log.Infof("Get psKey = %s, psAddr is null illegal", psKey, psAddr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only one "%s", but two variable. go vet
is a good tool for detecting this kind of error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, done, thanx~
go/pserver/etcd_lister.go
Outdated
kvs := resp.Kvs | ||
if len(kvs) == 0 { | ||
log.Infof("Waiting for ps addr registered ...") | ||
time.Sleep(p.timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cancel()
need to be called.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/etcd_lister.go
Outdated
DefaultEtcdTimeout time.Duration = time.Second * time.Duration(5) | ||
) | ||
|
||
type pserverEtcdLister struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The name could be more concise if it's something like etcdLister
, the package is already pserver
, people read it in their minds as: pserver etcd lister. If it's named as pserverEtcdLister
, it's long, and does not read good: pserver pserver etcd lister.
This type does more than just Lister
(tells desired number of pservers as well), maybe it's best to name it as EtcdClient
(if rename, please change file name as well.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, thanks for this very valuable suggestion!
go/pserver/etcd_lister.go
Outdated
return servers | ||
} | ||
|
||
func NewEtcdAddrLister(endpoints string) (Lister, int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's less abstraction (interface is an abstraction) if the return type is not an interface:PserverEtcdLister
(I think it's a good idea to make pserverEtcdLister
public, perhaps rename to EtcdClient
, and desired
public), than interface Lister
.
The benefit of no abstraction unless necessary is not very obvious in the beginning, maybe the article below could explain more. I will give a not-so-good-example: At least it's easier to jump to the definition when not returning an interface (editor jumps to the function definition rather than interface definition).
A good general rule in Go would be: accept interfaces, return structs
go/pserver/cclient/cclient.go
Outdated
// TODO(helin): fault tolerant pserver client using etcd. | ||
panic("not implemented.") | ||
func paddle_new_etcd_pserver_client(etcd_addr *C.char, selected int) C.paddle_pserver_client { | ||
addr := C.GoString(etcd_addr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
etcd_addr
=> etcd_endpoints
. We always use etcd_endpoints
to represent comma separated etcd addresses.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/client_test.go
Outdated
} | ||
|
||
//TODO(Qiao: tmperary disable etcdClient test for dependency of etcd) | ||
func EtcdClient(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make sure to disable it before merging into develop branch. We need to make sure test always runs correctly, since other developers need to run test as well.
go/pserver/etcd_cclient.go
Outdated
) | ||
|
||
const ( | ||
DefaultEtcdTimeout time.Duration = 5 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just FYI DefaultEtcdTimeout = 5 * time.Second
could work as well.
go/pserver/etcd_cclient.go
Outdated
DefaultEtcdTimeout time.Duration = 5 * time.Second | ||
) | ||
|
||
type EtcdCClient interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it's hard to choose a name, since EtcdClient
is already used ;)
However, EtcdCClient
may not be a good name, because C Client means it's for C. But here is not really related to C (Go can use it as well). Maybe ClientEtcd
?
go/pserver/etcd_cclient.go
Outdated
// TODO(Longfei) | ||
// 1. add watcher to watch the change state of pservers) | ||
// 1. add etcd lock) | ||
type EtcdCClientImpl struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a very common pattern in Java: create interface, and write the implementation, this is due to Java has explicit interface. It's not required for Go: Go has implicit interface. Please see: https://medium.com/@cep21/preemptive-interface-anti-pattern-in-go-54c18ac0668a Unnecessary interface only adds complication to the program.
I think we need to stick to "accept interface, return struct" unless there is good reason not to.
In this case we need to remove type EtcdCClient interface
, rename EtcdCClientImpl
to EtcdCClient
, and make func NewEtcdCClient(endpoints string) (*EtcdCClient, error)
return pointer to the struct.
go/pserver/etcd_cclient.go
Outdated
for i := 0; i < psDesired; i++ { | ||
psKey := PsPath + strconv.Itoa(i) | ||
log.Debugf("checking %s", psKey) | ||
resp, err := p.client.Get(ctx, psKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
是不是改成
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
resp, err := p.client.Get(ctx, psKey)
cancel()
if err != nil {
...
更清楚些?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
好的~
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个地方还不能这么改,因为是在一个循环里边,只能最后cancel。否则下次循环到这里就失败了
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jacquesqiao 下一个循环貌似又创建了一个新的ctx?
for ... {
ctx, cancel := context.WithTimeout(context.Background(), p.timeout)
resp, err := p.client.Get(ctx, psKey)
cancel()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/etcd_client.go
Outdated
@@ -64,11 +71,11 @@ func (e *EtcdClient) Register() (int, error) { | |||
log.Debugf("inited client to %s", e.etcdEndpoints) | |||
break | |||
} | |||
// init /ps_desired using transaction, for multiple pservers may want to write | |||
// init /ps/desired using transaction, for multiple pservers may want to write |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We use ps_desired
in design doc: https://github.com/PaddlePaddle/Paddle/tree/develop/doc/design/cluster_train#parameter-server-process-1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is not updated
go/pserver/client/etcd_client.go
Outdated
DefaultEtcdTimeout time.Duration = 5 * time.Second | ||
) | ||
|
||
// TODO |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Todo comment styles must be like // TODO: ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/client/etcd_client.go
Outdated
) | ||
|
||
// TODO | ||
// 1. add watcher to watch the change state of pservers) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment for exported structs and functions must be like // EtcdClient ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use gometalinter to check the code styles.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/client/etcd_client.go
Outdated
continue | ||
} | ||
|
||
log.Debugf("Get psDesired number: %d\n", psDesired) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Logrus formated log seems don't need a \n
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
go/pserver/client/etcd_client.go
Outdated
} | ||
} | ||
|
||
func (p *EtcdClient) List() []Server { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exported function must have comments.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/client/etcd_client.go
Outdated
} | ||
|
||
log.Debugf("Get psDesired number: %d\n", psDesired) | ||
return psDesired |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Break and return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/client/etcd_client.go
Outdated
log.Debugf("checking %s", psKey) | ||
resp, err := p.client.Get(ctx, psKey) | ||
if err != nil { | ||
cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure how the cancel()
should be used. Should always place behind Get
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/client/etcd_client.go
Outdated
for i := 0; i < psDesired; i++ { | ||
psKey := pserver.PsPath + strconv.Itoa(i) | ||
log.Debugf("checking %s", psKey) | ||
resp, err := p.client.Get(ctx, psKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pserver clients or trainers may need to "watch" pserver node changes(like pserver down and restarted and recoverd on another node). But this may be in another PR, add some TODO here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, I will do this in next pr.
go/pserver/client/etcd_client.go
Outdated
endpoints []string | ||
} | ||
|
||
// read ps desired number from etcd. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Desired ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/client/etcd_client.go
Outdated
for { | ||
cli, err = clientv3.New(clientv3.Config{ | ||
Endpoints: ep, | ||
DialTimeout: timeout, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we always use the DefaultEtcdTimeout
, maybe we do not need the local variable timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
go/pserver/client/etcd_client.go
Outdated
ep := strings.Split(endpoints, ",") | ||
timeout := DefaultEtcdTimeout | ||
var cli *clientv3.Client | ||
var err error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't need the local variable err
, and we can initialize this with cli, err := clientv3.New(...)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the cli
will be used outside the for loop, so I have to declare it here, so I can't use cli, err := clientv3.New(...)
..
go/pserver/client/etcd_client.go
Outdated
return servers | ||
} | ||
|
||
func NewEtcd(endpoints string) (*EtcdClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the implementation of NewEtcd
, it will never return the error with not nil
, so I think we can return the error if we initialize the etc client failed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove unused err msg
@@ -136,7 +143,7 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) { | |||
_, err := concurrency.NewSTM(e.etcdClient, func(c concurrency.STM) error { | |||
registered := false | |||
for i := 0; i < e.desired; i++ { | |||
psKey := "/ps/" + strconv.Itoa(i) | |||
psKey := PsPath + strconv.Itoa(i) | |||
log.Debugf("checking %s", psKey) | |||
ps := c.Get(psKey) | |||
log.Debugf("got value (%s) for key: %s", ps, psKey) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
151行建议改成:
if ps != "" {
continue
}
....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a common style with other code in paddle, so I will not change it this time.
go/pserver/etcd_client.go
Outdated
// TODO: when implementing extending or reducing pservers, /ps_desired is | ||
// changed, then we need to watch /ps_desired node for events. For now, just | ||
// TODO: when implementing extending or reducing pservers, /ps/desired is | ||
// changed, then we need to watch /ps/desired node for events. For now, just |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is not upated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fix done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM++ after comments updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
fix: #2515