
paddle dist architecture (draft) #1620


Merged: 8 commits into PaddlePaddle:develop, Apr 14, 2017

Conversation

@helinwang (Contributor) commented Mar 15, 2017

Here may be better to view.


One training job will only have one master process and typically multiple trainer and parameter server processes. Their relation is illustrated in the following graph:

![process collaboration](src/paddle-on-kubernetes-invited-blog-model-sharding.png)
Collaborator

We can use the HTML img tag to insert images, so we can specify the width/height of the images. I noticed that with Markdown tags, all images at different scales have the same width. This leads to different text/font sizes.

Contributor Author

Done.


- keep a list of alive trainers and a list of alive parameter servers and do *health check*,
  - if a trainer is dead it will update the task queue accordingly, as mentioned in [task queue](#task-queue).
  - if a parameter server is dead or a new parameter server joins, it will broadcast this information to all trainers.
Collaborator

It seems that we shouldn't allow a changeable number of parameter servers, because every time this number changes, all trainer processes need to re-split their local models so that model shards correspond to the parameter servers.

@helinwang (Contributor Author) Mar 15, 2017

I see, yes that adds a lot of complexity.

Maybe in the first version the trainer will have retry logic for sending gradients to the parameter server. If a parameter server fails, the trainer will keep training, but the gradient update is paused until the parameter server comes back up. This way the number of parameter servers is fixed and the master doesn't have to send the trainers an updated parameter server list.

In the future, maybe we want to allow scaling the number of trainers. If trainers can be scaled, the number of parameter servers probably needs to be able to change as well (e.g., with 2 parameter servers and 3 trainers, if the trainers are scaled up to 100, the number of parameter servers needs to be scaled up as well).
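A minimal sketch of the retry behavior described above (a hedged illustration, not the actual Paddle trainer code; `Gradient` and `sendGradient` are hypothetical placeholders):

```go
package trainer

import "time"

// Gradient and sendGradient stand in for the real trainer types and the
// RPC that uploads a gradient to a parameter server.
type Gradient struct{ Values []float32 }

func sendGradient(psAddr string, g Gradient) error { return nil } // RPC stub

// sendGradientWithRetry keeps retrying the upload until the parameter
// server responds. Training continues elsewhere; only the parameter
// update is paused while the server is down.
func sendGradientWithRetry(psAddr string, g Gradient) {
	backoff := time.Second
	for sendGradient(psAddr, g) != nil {
		time.Sleep(backoff)
		if backoff < 30*time.Second {
			backoff *= 2 // capped exponential backoff
		}
	}
}
```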


![task queues](src/paddle-task-queues.png)

- Todo queue holds tasks to be dispatched.
Collaborator

An additional sentence is needed here:

When a job starts, the master process fills in the TODO queue with all tasks.

Contributor Author

Done.


#### Task Queue

The master process has three task queues to track training progress, as shown in the graph below:
Collaborator

We need to define *task* somewhere before we can start explaining task queues.

There are two ways to define tasks:

  1. as in Hadoop MapReduce, a task is a sub-sequence of key-value pairs in a data file, or
  2. as in Google MapReduce, a task is a file.

Contributor Author

Done.
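For concreteness, a hedged sketch of what a task record could look like under option 1 above (a task as a sub-range of a data file); all field names here are illustrative, not the actual PaddlePaddle types:

```go
package master

// Task follows the first definition above: a contiguous chunk of one data
// file, identified by path and record range, plus bookkeeping for retries.
type Task struct {
	ID    int
	Path  string // data shard location, e.g. on a distributed file system
	Begin int64  // index of the first record in the chunk
	End   int64  // one past the last record in the chunk
	Epoch int    // bumped each time the task is (re-)dispatched
}
```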


This poses new challenges for Paddle,

- Paddle need to be tault tolerant.
Member

tault => fault

Contributor Author

Done.


![task states](src/paddle-task-states.png)

1. When a new pass of training starts, all tasks will be placed in the todo queue.
Member

Where do failed tasks go? Do failed tasks belong to the done queue?

Collaborator

When a pending task times out, it is moved back to the todo queue.

@Yancey0623 (Contributor) Mar 15, 2017

  • The meaning of a failed task includes timed-out tasks, execution failures and so on, so is "failed task" the better name?
  • How do we deal with failed tasks? Is there a limit on the number of retries?

Somehow I can't reply to the comment, so I will reply here:

  • An execution failure will also look like a timeout from the master's perspective (the trainer crashed, so it will time out); maybe we can keep using timeout?
  • Yes, there will be a limit on the number of retries, as mentioned in the latest design doc.

Helin

@helinwang (Contributor Author) Mar 16, 2017

@Yancey1989 Good point! Currently, for deep learning, a task is some data instances. To my understanding, execution should never fail (since the trainer treats all data the same: it goes through a lot of multiplications, so data should never crash the trainer). Maybe in the future, for reinforcement learning, execution could fail? (I'm not familiar enough with reinforcement learning myself.) Maybe we don't need to worry about execution failures for now.
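A small sketch of the timeout handling discussed in this thread, assuming hypothetical in-memory queues on the master: a timed-out pending task goes back to the todo queue, and is given up on after a retry limit (the limit and field names are assumptions):

```go
package master

const maxRetries = 3 // illustrative limit; see the design doc for the real policy

type task struct {
	ID      int
	Retries int
}

type taskQueues struct {
	todo    []task
	pending map[int]task // task ID -> task currently being trained
	failed  []task
}

// onTimeout moves a timed-out task from pending back to todo, or marks it
// failed once it has been retried too many times.
func (q *taskQueues) onTimeout(taskID int) {
	t, ok := q.pending[taskID]
	if !ok {
		return // already finished or already re-queued
	}
	delete(q.pending, taskID)
	t.Retries++
	if t.Retries > maxRetries {
		q.failed = append(q.failed, t)
		return
	}
	q.todo = append(q.todo, t)
}
```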

This poses new challenges for Paddle,

- Paddle need to be tault tolerant.
- Input training data can be online data from realtime logs, or batched data from distributed file system.
Collaborator

I feel that online data isn't necessarily non-batched; the online <==> batched pairing isn't mutually exclusive. Perhaps it should be:

  • streaming <==> batched
  • online <==> local/offline?

Collaborator

@reyoung, did you read "batched" as mini-batch?

Contributor Author

@reyoung, the "batched" here refers to the "batch" in "Batch vs. Real Time / Online Data Processing" :)
I searched around and it seems the usual terms are "batch data processing" / "batch data", so I'll change "batched" to "batch".

- keep a list of alive trainers and a list of alive parameter servers and do *health check*,
  - if a trainer is dead it will update the task queue accordingly, as mentioned in [task queue](#task-queue).
  - if a parameter server is dead or a new parameter server joins, it will broadcast this information to all trainers.
- dispatches tasks to trainers. A *task* is a unit of data that a trainer needs to train on, and
Collaborator

  • How are tasks split? For example, for streaming data, if the master has only one data instance, how is it split among three trainers?
  • When data is handed to a trainer as a task, do we hand over a pointer (or URL) to the data, or push the data itself?

@gongweibao (Contributor) Mar 15, 2017

My understanding:
1. If there is only one data instance, or fewer instances than trainers, the data simply can't be split; or we shouldn't start that many trainers.
2. Having the master push the data itself is fairly costly; it's better to push a pointer to the data.
As wangyi said above:

We need to define *task* somewhere before we can start explaining task queues.

@helinwang (Contributor Author) Mar 15, 2017

  • As @reyoung mentioned in paddle dist architecture (draft) #1620 (comment), this refers to online data versus offline (batch) data. Online data usually also consists of many mini-batches; data arriving one instance at a time can be buffered briefly and packed into multiple tasks for dispatch.

  • Handing over a pointer (handle) feels more appropriate. The pointer determines which reader is needed to parse the data, and the reader then reads it.

Collaborator

If there is only one data instance, or fewer instances than trainers, the data simply can't be split; or we shouldn't start that many trainers.
Online data usually also consists of many mini-batches; data arriving one instance at a time can be buffered briefly and packed into multiple tasks for dispatch.

One question here: during batch training, if the last batch has fewer data instances than the number of trainers, how is the data dispatched? This case is quite likely to occur, and it isn't something the implementation can anticipate in advance.

Contributor Author

The design doc doesn't cover this; it only says that the total number of data shards will generally be far greater than the number of trainers. But this case does need to be considered; let's cover it later in the implementation-details design doc.

Communication involves two parts:

- upload accumulated gradient. Upload can be configured to happen every **n** mini-batches.
- download new model. Download can be configured to happen every **m** mini-batches. **n** and **m** do not need to be equal.
Collaborator

So is only the Async-SGD algorithm implemented here?

Where does the synchronization step of (synchronous) SGD happen?

Contributor

The master should be able to control synchronous vs. asynchronous training by controlling how tasks are dispatched.

@helinwang (Contributor Author) Mar 15, 2017

@reyoung My understanding of ASGD and SGD comes only from papers; I haven't implemented them, and I haven't had time to look at the Paddle parameter server code yet.
Does Paddle's parameter server currently support both ASGD and SGD? You seem to know this part better; when you have time, could you give everyone an overview, or open a PR with a parameter-sync design doc?

My rough guess for (synchronous) SGD: after all trainers finish the forward pass, gradients are computed from the top layer down; as soon as a layer's gradients are computed, they are sent to the parameter server. Once the parameter server has received that layer's gradients from all trainers (or a trainer times out), it sends the new parameters for that layer back to the trainers. A trainer proceeds to the next round only after receiving new parameters for all layers. Since I've never implemented it, I'm surely missing many details; is anything wrong with this understanding?

Contributor Author

I just asked Teacher Xu about this; I'll start writing a design doc first. @reyoung, please review it!

Collaborator

As soon as a layer's gradients are computed, they are sent to the parameter server. Once the parameter server has received that layer's gradients from all trainers (or a trainer times out), it sends the new parameters for that layer back to the trainers. A trainer proceeds to the next round only after receiving new parameters for all layers.

That's basically right. It's just that in the communication process you wrote up, I don't see where this waiting/synchronization strategy happens.

Contributor Author

Yes, you're right, I should add the SGD part.
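For reference, a sketch of the n/m communication schedule from the excerpt above (upload accumulated gradients every n mini-batches, download the model every m); all helpers are placeholders, and the synchronous-SGD barrier discussed in this thread is intentionally left out:

```go
package trainer

// Placeholders for the real batch/gradient types and parameter server RPCs.
type Batch struct{}
type Gradients struct{}

func computeGradients(b Batch) Gradients     { return Gradients{} }
func accumulate(acc *Gradients, g Gradients) {}
func uploadGradients(acc Gradients)          {}
func downloadModel()                         {}

// trainLoop uploads every n mini-batches and downloads every m mini-batches;
// n and m do not need to be equal.
func trainLoop(n, m int, batches <-chan Batch) {
	var acc Gradients
	step := 0
	for b := range batches {
		accumulate(&acc, computeGradients(b))
		step++
		if step%n == 0 {
			uploadGradients(acc)
			acc = Gradients{} // reset the accumulator
		}
		if step%m == 0 {
			downloadModel() // refresh the local copy of the parameters
		}
	}
}
```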


### Parameter Server Process

Parameter server processes hold the model together: model parameters are sharded and saved on different parameter servers, and all parameter servers collaboratively form the global view of the trained model.
@gongweibao (Contributor) Mar 15, 2017

If we adopt a ring-based parameter update scheme, do we still need parameter servers? In what form would they exist?

Contributor Author

I'm about to write a design doc describing the difference between our parameter server approach and the ring approach.

Briefly, the communication volume is the same, but multiple parameter servers are better than the ring approach (that blog post describes the single-parameter-server case) in terms of fault tolerance and support for Asynchronous Stochastic Gradient Descent (ASGD).

Contributor Author

The difference between the ring approach and parameter servers isn't covered in this design doc; our distributed training uses parameter servers throughout. We can write up the difference later when there's time.

![task queues](src/paddle-task-queues.png)

- Todo queue holds tasks to be dispatched.
- Pending queue holds tasks that are currently being trained by trainers, and a mapping from trainers to their training tasks.
Contributor

It looks like the pending queue records the trainers that are currently running; would "running queue" be a better name?

Contributor Author

The combination "todo, pending, done" seems to be more commonly used. Here are the Google search results:
(screenshots of Google search results)


@typhoonzero (Contributor)

Two questions that come to mind:

  1. The master may crash, and a crashed master means we can't add new trainers or pservers, and the task queue state can't be maintained. Hadoop YARN's approach is that the master is not a process but an in-memory object of the "ResourceManager"; multiple "ResourceManager"s hold an election, and only one of them manages all jobs at a time. That is still too complex for each training job, though.
  2. In synchronous training scheduling, when the number of tasks > the number of trainers, some tasks are still waiting in the todo (or pending) queue. A trainer that finishes a pass will start computing the next pass, so some tasks might never get assigned a trainer; yet a trainer needs to wait until the previous pass of all tasks has finished before it can synchronize with the pservers. Could this make the job wait forever?

@xiang90 commented Mar 22, 2017

The master may crash, and a crashed master means we can't add new trainers or pservers, and the task queue state can't be maintained. Hadoop YARN's approach is that the master is not a process but an in-memory object of the "ResourceManager"; multiple "ResourceManager"s hold an election, and only one of them manages all jobs at a time. That is still too complex for each training job, though.

Make the master as stateless as possible. Its state can be recovered from a consistent, highly available store such as etcd. The death of the master should only affect the generation of new tasks, not any work already in progress. Recovering the master should take on the order of a minute, and the pause caused by a master death should also be brief.

In synchronous training scheduling, when the number of tasks > the number of trainers,

For synchronous training, shouldn't each pass have a barrier? The master only announces the start of the next cycle after all current tasks have finished.

@typhoonzero (Contributor)

Make the master as stateless as possible. Its state can be recovered from a consistent, highly available store such as etcd. The death of the master should only affect the generation of new tasks, not any work already in progress. Recovering the master should take on the order of a minute, and the pause caused by a master death should also be brief.

Agreed! Thinking about it in a bit more detail: to make sure the pservers and trainers get correct data after fault recovery, the pserver needs to consider data consistency during recovery. For example, a pserver might crash while a trainer is sending gradients, while it is aggregating parameters, or while a trainer is downloading parameters. Recovering from crashes at these different points requires thinking about how to guarantee data consistency, for example by using two-phase commit.

For synchronous training, shouldn't each pass have a barrier? The master only announces the start of the next cycle after all current tasks have finished.

Yes! This is more of a caveat: the scheduler must be designed so that under no circumstances can a task be starved waiting at the barrier.

@xiang90 commented Mar 22, 2017

To make sure the pservers and trainers get correct data after fault recovery, the pserver needs to consider data consistency during recovery

For a data-storage service like the pserver, fault recovery could use checkpoint + online recovery. After a failure, the pserver restarts from a consistent checkpoint and the master notifies every trainer. At that point a trainer can, if needed, redo its last communication with the pserver to compensate for the missing information. I'm not familiar with PaddlePaddle's concrete architecture; this is just a small suggestion.

@helinwang (Contributor Author)

@xiang90 Thanks for the comments! I have updated the fault recovery to use etcd. Since you are the etcd expert, I would very much appreciate it if you could take a look!

1. Upon receiving the master server's [selection request](#selection-request), the parameter server can load parameters if there are already saved parameters at the save path given in the selection request. It then creates the key `/ps/<index>` with its contact address as the value.
1. Now the parameter server is ready for the trainers' requests.

If the parameter server's etcd lease expires, the parameter server will save its parameters to the given save path and kill itself.
@gongweibao (Contributor) Apr 9, 2017

The ps process is stateful, and loading data at startup also takes time. Does the ps process keep refreshing its lease to hold on to it, or does it keep dying, having a new node start, and re-loading?

@helinwang (Contributor Author) Apr 9, 2017

It keeps refreshing the lease; the likely cause of a lease expiring is a network partition that prevents the server from reaching a working etcd.
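A sketch of this lease handling with the etcd clientv3 Go API (`index`, `addr`, `savePath`, and `saveParams` are assumptions, and error handling is trimmed): the server keeps the lease alive, and if the keep-alive stream ends, for example because of a network partition, it saves its parameters and exits.

```go
package pserver

import (
	"context"
	"fmt"
	"os"

	"github.com/coreos/etcd/clientv3"
)

// announce registers this parameter server under /ps/<index> with a lease,
// then blocks while the lease is kept alive. If the keep-alive stream ends
// (the lease is about to expire), parameters are saved and the process exits.
func announce(cli *clientv3.Client, index int, addr, savePath string, saveParams func(path string) error) error {
	ctx := context.Background()
	lease, err := cli.Grant(ctx, 10) // 10-second TTL, refreshed below
	if err != nil {
		return err
	}
	key := fmt.Sprintf("/ps/%d", index)
	if _, err := cli.Put(ctx, key, addr, clientv3.WithLease(lease.ID)); err != nil {
		return err
	}
	keepAlive, err := cli.KeepAlive(ctx, lease.ID)
	if err != nil {
		return err
	}
	for range keepAlive {
		// Lease renewed; nothing to do.
	}
	// Could not renew any more (e.g. partitioned from etcd): save and quit.
	_ = saveParams(savePath)
	os.Exit(1)
	return nil
}
```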

The parameter server will:

- Receive gradient from the trainers, update its parameters, and give the trainers the latest parameters.
- Periodically save its parameters to the distributed file system by overwriting the previous save.
Contributor

Are we using loose consistency here, per the earlier discussion?

Contributor Author

Good point! I plan to only touch on this briefly here and put the detailed discussion in a separate document.
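A tiny sketch of the periodic checkpoint from the excerpt above, with a hypothetical `save` callback; each save simply overwrites the previous one, and the consistency question raised here is left to the separate doc:

```go
package pserver

import "time"

// periodicSave overwrites the checkpoint at savePath every period until
// stop is closed. save is a placeholder for serializing the parameters to
// the distributed file system.
func periodicSave(period time.Duration, savePath string, save func(path string) error, stop <-chan struct{}) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			_ = save(savePath)
		case <-stop:
			return
		}
	}
}
```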


#### Selection Request

The selection request is a request that the master sends to a parameter server candidate, making it a parameter server available to the trainers. It contains information such as the parameter server index, the optimization algorithm, the parameter save period, and the path for saving parameters.
Contributor

We need a brief description of "parameter server candidates". Do the candidates all need to be started when the job starts?

Contributor Author

Removed the concept of parameter server candidates.


The selection process is as follows:

- The master watches the `/ps_candidate/` prefix in etcd. When a parameter server candidate joins and there are not enough parameter servers, the master will remove the candidate's entry in `/ps_candidate/` and send a [selection request](#selection-request) to the candidate. Upon receiving the request, the candidate will set the key `/ps/<index>` in etcd with a lease to make itself available to the trainers. The `<index>` is from the selection request.
Contributor

How do we know there are "not enough parameter servers"?

Contributor Author

Removed the concept of parameter server candidates, and added a key-value pair in etcd, /ps_desired, for the number of desired parameter servers.


- The master watches the `/ps/` prefix in etcd. When a selected parameter server goes offline, the master will select a not-yet-selected parameter server candidate by sending it the selection request, to fill the missing parameter server spot.

The graph below shows the case where one parameter server is missing: the cluster management system creates a new parameter server, the new parameter server announces itself as a candidate, and the master fills the missing parameter server spot with the new candidate.
Contributor

Need some description of "cluster management system"?

@helinwang (Contributor Author) Apr 13, 2017

Changed to [Kubernetes](https://kubernetes.io/)


A task is a piece of sharded data to be trained. The total number of tasks will be much bigger than the total number of trainers. The number of data instances inside a task will be much bigger than the mini-batch size.

#### Task Queue
Contributor

The task queue requires etcd3; this should be considered.

Contributor

There may be a large number of data shards, like tens of thousands. I understand the "task queues" are "folders" in etcd, and tasks will be written to these folders as "etcd files". So once the number of concurrent jobs increases, the number of writes to etcd will increase significantly, and this may cause faults in the whole system.

One workaround is: the master only writes tasks in the "running" state to etcd, and the trainer changes the etcd node data to "finished" when it finishes training. The master keeps the task-assignment details in memory, and it can recover the task-assignment status by reading the finished tasks and the training data metadata (if the data is stored on distributed storage). So the queue can be "virtual": etcd only stores running and finished nodes, and write requests to etcd will be much fewer.

@xiang90 Apr 10, 2017

we should use etcd3.

1000s of concurrent writes to etcd3 is not really a problem. but if you can save writes to etcd, do it :P.

Contributor Author

@typhoonzero I think we will be using etcd3, since k8s already switched to etcd3.

@helinwang (Contributor Author) Apr 10, 2017

@typhoonzero Sorry, I did not fully get the proposed solution. From my guess, the solution would need 2 queues: a running queue and a finished queue. But this PR's proposal has 3 queues: todo, running, finished. Isn't the difference small? (2 queues vs. 3 queues, and the total number of tasks in the queues will eventually be the same; e.g., when all tasks finish, every task will be in the finished queue.)


Lease heartbeat handling does not touch the disk.
Lease heartbeat handling is very cheap; as long as you have enough CPU and network I/O, it generally won't be an issue.

But you still need to run stress tests and check against your specific configuration.

@helinwang (Contributor Author) Apr 13, 2017

@typhoonzero Indeed, brute-force writing that many entries isn't a great fit.
With an etcd-only solution there are two options:

  • Batch the writes using a transaction, as @xiang90 suggested.
  • Or store the 10000+ text entries in a single value (10000 entries of 100 bytes each is only 100*10000/1000/1000 = 1 MB).
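Roughly what the two etcd-only options above could look like with clientv3 (the key layout is hypothetical; note that etcd3 limits the number of operations per transaction, 128 by default, so very large task lists would need chunking):

```go
package master

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/coreos/etcd/clientv3"
)

// Option 1: batch many task writes into one etcd transaction.
func writeTasksTxn(cli *clientv3.Client, tasks []string) error {
	ops := make([]clientv3.Op, 0, len(tasks))
	for i, t := range tasks {
		ops = append(ops, clientv3.OpPut(fmt.Sprintf("/task/todo/%d", i), t))
	}
	_, err := cli.Txn(context.Background()).Then(ops...).Commit()
	return err
}

// Option 2: store the whole task list as one value (~1 MB for 10k small tasks).
func writeTasksOneKey(cli *clientv3.Client, tasks []string) error {
	buf, err := json.Marshal(tasks)
	if err != nil {
		return err
	}
	_, err = cli.Put(context.Background(), "/task/todo", string(buf))
	return err
}
```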

Contributor Author

@xiang90 Thanks for all the useful comments! :)
One question: why would "a disk write of only 50 MB still take at least 5 seconds"? I googled it and found "For a modern 7200 RPM drive it ranges from about 80-160 MB/s today", so 50 MB seems to take less than a second?


@helinwang

Generally a disk does roughly ~100 MB/s of writes; an SSD can reach 400 MB/s, and RAID can reach 1 GB/s.

The problem is that this assumes your process can monopolize the whole machine's disk, which may not be true.

On a cloud environment you can't make that assumption either.

50 MB/s was just an example I gave; the point is that we also have to account for physical resource limits. If the I/O isn't there, writes can't be fast no matter how you batch.

Contributor Author

@xiang90 That makes sense! Thank you!!!


Now we will explain the concepts mentioned above:

#### Selection Request
Contributor

Parameter servers can do "selection" by themselves:

  1. when starting a new job, N parameter servers are started.
  2. when adding a new parameter server, the current parameter servers can receive a signal to trigger some action, like rehashing the model data (parameters)
  3. when a parameter server dies or is manually removed, they can also receive a signal to trigger the recovery procedure.

@helinwang (Contributor Author) Apr 10, 2017

I think using a master for all the coordination (parameter server selection, assigning a save path to each parameter server, ...) can simplify the logic and avoid bugs.

I could be wrong, but I could not easily figure out how the following would work:

when adding a new parameter server, the current parameter servers can receive a signal to trigger some action, like rehashing the model data (parameters)

because how the parameter servers coordinate with each other is not very clear to me. Maybe letting the master do it would be simpler?

@helinwang (Contributor Author) Apr 11, 2017

@typhoonzero Just talked with Yi. He reminded me that etcd3 supports transactions. So you are right, the parameter servers can register by themselves without help from the master. I did not know that because my knowledge of etcd3 is not deep. Really appreciate your suggestion!

Contributor Author

Updated so that the parameter servers announce themselves using a transaction. I have not mentioned rehashing, since currently we don't need to support scaling the parameter servers.


<img src="src/paddle-task-queues.png" height="400"/>

- The todo queue holds tasks to be dispatched. When a job starts, the master process fills in the todo queue with all tasks.
@Yancey0623 (Contributor) Apr 10, 2017

The life cycle of a task seems to be the same as the pod phase. Shall we use a Kubernetes Job instead of the task queue?

@helinwang (Contributor Author) Apr 10, 2017

Interesting idea! Do you mean we can use k8s' job with work queue (https://kubernetes.io/docs/tasks/job/coarse-parallel-processing-work-queue/)?

I have never thought about it, good that you bring it up! From my understanding, a pod will be created for each task and removed when the task is completed. I think the assumption is that there are not many dependencies between pods that run different tasks (so the pod will be killed after finishing each task).
The assumption may not be valid for our use case: the trainer carries its state across tasks (the model parameters). If we start and delete a trainer pod for each task, it has to re-download and re-initialize the parameters each time, which may introduce a lot of overhead. (I think a task probably finishes within minutes, so starting a new pod for each trainer every few minutes may be too heavy.)

Contributor

What I meant was mapping the task life cycle to pod phases:

task       :  pod
todo       => pending
pending    => running
finished   => done

But I think too many pods would be too heavy; I missed that point :)

Contributor

Inspired by https://kubernetes.io/docs/tasks/job/fine-parallel-processing-work-queue/ , we could embed a queue service in the master, like ZeroMQ or RabbitMQ. The queue must be able to store its data on disk, so that recovery is possible when the master fails.

Using an embedded queue service in the master would also reduce the dependency on etcd.

Contributor Author

@typhoonzero Great idea using a distributed queue! Please see #1620 (comment) for discussion.

@helinwang (Contributor Author) Apr 13, 2017

@Yancey1989 Fair point. I googled it, and the combination "todo, pending, done" seems to be the most common; shall we go with that for now?


### Trainer Process

When the trainer is started by the cluster management system, it executes the following steps at startup:
@Yancey0623 (Contributor) Apr 10, 2017

When the trainer is started by the cluster management system

It's clear from the system structure that the trainer process is started by the cluster management system, but shouldn't the logic be that the master process starts the trainer processes through the cluster management system?

Another question: how do we determine the number of trainer and parameter server processes? Does the user specify them at submission time, or does the master determine them according to the data size? According to the description at https://github.com/PaddlePaddle/Paddle/pull/1620/files#diff-1513af467bd4871ad2d541d77c4ed00eR43, maybe some parameter server processes should be set up before user submission?

@helinwang (Contributor Author) Apr 10, 2017

In my imagination, the user will start the trainers and the parameter servers. The underlying implementation could be a k8s config created and launched from the user's laptop, not from the master --- let's discuss this if you have a different idea. I am not very sure about the benefit of using the master to start the trainers and the parameter servers --- it seems the master would be trying to do too many things at once in this case, but I may be wrong.

The number of desired parameter servers can be an environment variable for the master process (and saved into etcd; when we want to support dynamically scaling the number of parameter servers, we will need a way to tell the master about it). The master process can learn the number of current parameter servers by watching the etcd /ps/ and /ps_candidate/ prefixes.
The master does not need to know the number of desired trainers. It can learn the number of current trainers by watching the etcd /trainer/ prefix.

About setting up some parameter server processes before user submission: we would only need to pre-set-up parameter server processes if the setup time were high, so that pre-setup saves some time. But the setup time for a parameter server does not seem to be high?
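A sketch of how the master could track the number of live trainers by watching the `/trainer/` prefix, as described above (the same pattern applies to `/ps/`); the initial count and error handling are simplified:

```go
package master

import (
	"context"

	"github.com/coreos/etcd/clientv3"
)

// watchTrainerCount sends the current number of live trainers whenever a
// trainer registers or its lease expires and its key is deleted.
func watchTrainerCount(cli *clientv3.Client, counts chan<- int) error {
	ctx := context.Background()
	resp, err := cli.Get(ctx, "/trainer/", clientv3.WithPrefix(), clientv3.WithCountOnly())
	if err != nil {
		return err
	}
	n := int(resp.Count)
	counts <- n
	for wresp := range cli.Watch(ctx, "/trainer/", clientv3.WithPrefix()) {
		for _, ev := range wresp.Events {
			switch {
			case ev.Type == clientv3.EventTypeDelete:
				n-- // lease expired or trainer exited
			case ev.IsCreate():
				n++ // new trainer registered
			}
		}
		counts <- n
	}
	return nil
}
```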

Contributor

@helinwang I'm thinking about how to submit a job and set up the parameter server processes and trainer processes; here is my simple idea: #1770 .

@@ -0,0 +1,190 @@
# Distributed Training Design Doc
Collaborator

Design Doc: Distributed Training

Contributor Author

Done.


## Objective

We want Paddle to support training on general-purpose clusters. The cluster runs Paddle, the web server (e.g., Nginx), the log collector (e.g., fluentd), the distributed queue service (e.g., Kafka), the log joiner and other data processors written using Storm, Spark, and Hadoop MapReduce on the same cluster, as illustrated in the following graph:
Collaborator

I uploaded my slides to SlideShare.com, in the hope that it can make this paragraph shorter, like:


In these slides, we explained that we'd like PaddlePaddle to run on general-purpose clusters like those managed by Kubernetes, so as to address demands for AI from both Internet and non-Internet industries.

This poses technical challenges to PaddlePaddle:

  1. Support fault-recovery.
  2. Support both offline and online training.
  3. Serverless computing of distributed training.

Contributor Author

Done.

- Trainer process
- Parameter server process

One training job will only have one master process, typically multiple trainer processes and parameter server processes. Their relation is illustrated in the following graph:
Collaborator

One => A
will only have one ==> has only one
Actually, this sentence may no longer be needed; the edit above already expresses it.

Contributor Author

Done. Removed.


A training job will be created once the user asks Paddle cloud to train a model. The training job is made up of different processes that collaboratively consume data and produce a trained model. There are three kinds of processes:

- Master process
Collaborator

To describe the roles in a concise but clear way, we might write:


  1. the master process, which dispatches tasks to
  2. one or more trainer processes, which run distributed training and synchronize gradients/models via
  3. one or more parameter server processes, where each holds a shard of the global model.

Contributor Author

Done.


The master process will:

- Do [parameter server selection](#parameter-server-selection).
Collaborator

Do we need parameter server selection? I'd thought that we could have an easier solution -- once a parameter server starts, it registers itself to the etcd directory /paddle/<job_name>/pservers/<i> where <i> is the minimum available ID. Once enough parameter server processes (and trainer processes) have registered, the master process can start dispatching tasks.

@wangkuiyi (Collaborator) Apr 11, 2017

@xiang90 could we define an etcd transaction like register_pserver_with_minimum_id?


  1. get min_id
  2. mid := min_id
  3. txn start
  4. if current_min_id == mid
  5. register
  6. txn end
  7. if txn succeeds, return
  8. if txn fails, retry from 1 (conflicts on setting mid)
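A hedged sketch of this recipe with clientv3 transactions: the candidate scans for the smallest free `/ps/<i>` slot, and the `If` clause guarantees it only wins the slot if it is still unclaimed (key layout and the `desired` count, e.g. read from `/ps_desired`, are assumptions):

```go
package pserver

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/clientv3"
)

// registerWithMinID claims the smallest free /ps/<i> slot, retrying when
// another candidate wins the race, following the steps listed above.
func registerWithMinID(cli *clientv3.Client, desired int, addr string, leaseID clientv3.LeaseID) (int, error) {
	ctx := context.Background()
	for {
		// Steps 1-2: find the current minimum free index.
		id := -1
		for i := 0; i < desired; i++ {
			resp, err := cli.Get(ctx, fmt.Sprintf("/ps/%d", i))
			if err != nil {
				return 0, err
			}
			if resp.Count == 0 {
				id = i
				break
			}
		}
		if id < 0 {
			return 0, fmt.Errorf("no free parameter server slot")
		}
		key := fmt.Sprintf("/ps/%d", id)
		// Steps 3-6: register only if the slot is still empty.
		txn, err := cli.Txn(ctx).
			If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
			Then(clientv3.OpPut(key, addr, clientv3.WithLease(leaseID))).
			Commit()
		if err != nil {
			return 0, err
		}
		if txn.Succeeded {
			return id, nil // step 7: registered
		}
		// Step 8: lost the race; retry with a fresh minimum.
	}
}
```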

Contributor Author

Removed parameter server selection. Parameter servers will do it by themselves using a transaction.

The trainer process will:

- Receive the tasks from the master.
- Work on the tasks: alculate and upload gradient to the parameter servers, and update local model by downloading new parameters from the parameter servers.

alculate -> calculate

Contributor Author

Thanks! Done.

1. Watches the trainer prefix keys `/trainer/` on etcd to find the live trainers.
1. Starts dispatching the tasks to the trainers.

The master process will kill itself if its etcd lease expires.

one thing to note is that the master should always check its ownership of the lock in the same transaction when it modifies the etcd state. most people do this incorrectly... just something to keep in mind, but the bad situation (when the clock on local machine stops or skew...) happens very infrequently anyway.

Contributor Author

@xiang90 thanks! Very good point.

I was planning to create a single lease and use it with KV.Put and Mutex.Lock (the Mutex is created with a Session using the lease).

So I suppose this way I don't have to check the ownership of the lock in the same transaction that modifies the etcd state (hoping this could make the code simpler).

The assumption is that KV.Put will fail if the lease expires, and that the master holds the lock as long as the lease has not expired -> KV.Put will succeed only if the master holds the lock.
Is my assumption correct?
Thanks!


Holding the lock and maintaining the lease is a soft guarantee. The etcd client will try its best to give up ownership when it knows the lease is about to expire due to a failure to renew. However, this all depends on the local clock: if the local clock "stops", the client might think it still holds the lock/lease when it does not. So if you want 100% safety, you should check whether you are the lock owner in the txn.

After you get a lock in etcd, etcd returns you a key for the lock. In every txn you need to check the existence of that key; if the key is not there, do nothing or clean up.
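Following this advice, a sketch of how the master could check lock ownership inside the same transaction that modifies state, using clientv3/concurrency; `Mutex.IsOwner()` returns a comparison on the lock key that can go into the Txn's `If` clause (the key names are illustrative):

```go
package master

import (
	"context"
	"errors"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
)

// putIfMaster writes key=value only if this process still owns the master
// lock, checked inside the same etcd transaction as the write.
func putIfMaster(cli *clientv3.Client, mu *concurrency.Mutex, key, value string) error {
	resp, err := cli.Txn(context.Background()).
		If(mu.IsOwner()).
		Then(clientv3.OpPut(key, value)).
		Commit()
	if err != nil {
		return err
	}
	if !resp.Succeeded {
		return errors.New("lost the master lock; refusing to modify etcd state")
	}
	return nil
}
```

A typical setup would create a concurrency.Session bound to the master's lease, lock a concurrency.NewMutex(session, "/master_lock"), and then route every state change through a helper like this.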


but in real world, most people just tend to trust the local clock, or they are unaware of this issue. it works most of the time when clock is ok... so it is really up to you... just something that i should point out anyway.

Contributor Author

@xiang90 I see, great information. We will do it using a txn. Thanks!

@xiang90 commented Apr 13, 2017

@wangkuiyi i went over the doc. looks ok in general. let me know if you guys have any questions.

@helinwang (Contributor Author)

@typhoonzero I just realized that your plan seems to be that the master manages all jobs in the whole cluster. I lean toward one master per job, so the master code can be a bit simpler (maintaining all jobs vs. maintaining one job). What do you think?

@xiang90 commented Apr 13, 2017

I just realized that your plan seems to be that the master manages all jobs in the whole cluster. I lean toward one master per job.

i notice this too. i feel we should start with an easy solution: one master one job. we can extend it if needed.

@wangkuiyi (Collaborator) commented Apr 13, 2017

@xiang90 Thank you so much for the comments! We will have subsequent detailed design docs coming out for your review.

@typhoonzero @xiang90 I learned a lot from your comments on the recovery of the system. I think we are lucky that we don't have to recover a parameter server precisely to the state when it crashed. This is because the SGD (stochastic gradient descent) training algorithm is stochastic anyway. We can remove indeterminism in unit tests by seeding RNGs, but due to the randomness of distributed scheduling and the shuffling of training data instances, there is usually no way to ensure that two runs of the same training job produce exactly the same model.

@typhoonzero (Contributor)

I just realized that your plan seems to be that the master manages all jobs in the whole cluster.

@helinwang We're on the same page here: one master per job.

@helinwang helinwang merged commit b088e80 into PaddlePaddle:develop Apr 14, 2017
@helinwang helinwang deleted the architecture branch April 14, 2017 02:42