Tensorflow使用异步计算来提升计算性能。理解它的工作原理既有助于开发更高效的程序,又有助于在内存资源有限的情况下主动降低计算性能从而减小内存开销。我们先导入本节中实验需要的包或模块。
import tensorflow as tf
import tensorflow.keras as keras
import os
import subprocess
import time
广义上讲,Tensorflow
包括用户直接用来交互的前端和系统用来执行计算的后端。例如,用户可以使用不同的前端语言编写Tensorflow
程序,如Python
、C++
和Javascript
。无论使用何种前端编程语言,Tensorflow
程序的执行主要都发生在C++
实现的后端。换句话说,用户写好的前端Tensorflow
程序会传给后端执行计算。后端有自己的线程在队列中不断收集任务并执行它们。
Tensorflow
通过前端线程和后端线程的交互实现异步计算。异步计算指,前端线程无须等待当前指令从后端线程返回结果就继续执行后面的指令。为了便于解释,假设Python
前端线程调用以下4条指令。
a = tf.ones((1, 2))
b = tf.ones((1, 2))
c = a * b + 2
c
<tf.Tensor: shape=(1, 2), dtype=float32, numpy=array([[3., 3.]], dtype=float32)>
在异步计算中,Python
前端线程执行前3条语句的时候,仅仅是把任务放进后端的队列里就返回了。当最后一条语句需要打印计算结果时,Python
前端线程会等待C++
后端线程把变量c
的结果计算完。此设计的一个好处是,这里的Python前端线程不需要做实际计算。因此,无论Python
的性能如何,它对整个程序性能的影响很小。只要C++
后端足够高效,那么不管前端编程语言性能如何,Tensorflow
都可以提供一致的高性能。
为了演示异步计算的性能,我们先实现一个简单的计时类。
class Benchmark(object):
def __init__(self, prefix=None):
self.prefix = prefix + ' ' if prefix else ''
def __enter__(self):
self.start = time.time()
def __exit__(self, *args):
print('%stime: %.4f sec' % (self.prefix, time.time() - self.start))
下面的例子通过计时来展示Tensorflow2.x
的计算行为。可以看到,当y = tf.keras.backend.sum(tf.transpose(x) * x)
返回的时候需等待变量y真正被计算完,以便pdb
在命令模式下调试。这里的行为不同于MXNet
。在MXNet
中,计算行为发送到C++后端,由print
触发同步行为,等待完成计算。
with Benchmark('Workloads are queued.'):
x = tf.random.uniform(shape=(2000, 2000))
y = tf.keras.backend.sum(tf.transpose(x) * x)
with Benchmark('Workloads are finished.'):
print('sum =', y)
Workloads are queued. time: 0.0808 sec
sum = tf.Tensor(999325.0, shape=(), dtype=float32)
Workloads are finished. time: 0.0001 sec
的确,除非我们需要打印或者保存计算结果,否则我们基本无须关心目前结果在内存中是否已经计算好了。只要数据是保存在NDArray
里并使用MXNet
提供的运算符,MXNet
将默认使用异步计算来获取高计算性能。而Tensorflow
则默认使用命令模式,如果需要提高性能,需要利用tf.function
和AutoGraph
创建比一行命令对应的单独命令节点更大的计算图,使C++
后端更少和前端交互,从而获得更好的性能。
除了刚刚介绍的print
函数外,MXNet
还有其他方法让前端线程等待后端的计算结果完成。我们可以使用 wait_to_read
函数让前端等待某个的NDArray
的计算结果完成,再执行前端中后面的语句。或者,我们可以用waitall
函数令前端等待前面所有计算结果完成。后者是性能测试中常用的方法。下面实现了原文代码的Tensorflow
版本作为对照,为方便区分,使用原文的解释便于搜索,但并不解释下面的代码块。
下面是使用wait_to_read
函数的例子。输出用时包含了变量y
的计算时间。
with Benchmark():
y = tf.keras.backend.sum(tf.transpose(x) * x)
time: 0.0267 sec
下面是使用waitall
函数的例子。输出用时包含了变量y
和变量z
的计算时间。
with Benchmark():
y = tf.keras.backend.sum(tf.transpose(x) * x)
z = tf.keras.backend.sum(tf.transpose(x) * x)
time: 0.0416 sec
此外,任何将NDArray
转换成其他不支持异步计算的数据结构的操作都会让前端等待计算结果。例如,当我们调用asnumpy
函数和asscalar
函数时:
with Benchmark():
y = tf.keras.backend.sum(tf.transpose(x) * x)
y.numpy()
time: 0.0225 sec
with Benchmark():
y = tf.keras.backend.sum(tf.transpose(x) * x)
tf.norm(y).numpy()
time: 0.0226 sec
上面介绍的wait_to_read
函数、waitall
函数、asnumpy
函数、asscalar
函数和print
函数会触发让前端等待后端计算结果的行为。这类函数通常称为同步函数。
在下面的例子中,我们用for
循环不断对变量y
赋值。当在for
循环内执行y = x + 1
时,每次赋值不使用异步计算;当在for
循环外使用tf.function
装饰时,则使用异步计算。
with Benchmark('synchronous.'):
for _ in range(1000):
y = x + 1
@tf.function
def loop():
for _ in range(1000):
y = x + 1
return y
with Benchmark('asynchronous.'):
y = loop()
synchronous. time: 3.5589 sec
asynchronous. time: 1.0457 sec
我们观察到,使用异步计算能提升一定的计算性能。为了解释这一现象,让我们对Python前端线程和C++后端线程的交互稍作简化。在每一次循环中,前端和后端的交互大约可以分为3个阶段:
- 前端令后端将计算任务y = x + 1放进队列;
- 后端从队列中获取计算任务并执行真正的计算;
- 后端将计算结果返回给前端。
我们将这3个阶段的耗时分别设为 t1,t2,t3 。如果不使用异步计算,执行1000次计算的总耗时大约为 1000(t1+t2+t3) ;如果使用异步计算,由于每次循环中前端都无须等待后端返回计算结果,执行1000次计算的总耗时可以降为 t1+1000t2+t3 (假设 1000t2>999t1 )。
为了解释异步计算对内存使用的影响,让我们先回忆一下前面章节的内容。在前面章节中实现的模型训练过程中,我们通常会在每个小批量上评测一下模型,如模型的损失或者精度。细心的读者也许已经发现了,而keras model
的compile
方法会隐式调用tf.function
,触发AutoGraph
,前端会在极短的时间内使后端生成完整的计算图,从而可能导致占用更多内存。当我们使用命令执行模式时,前端在每次迭代时仅会将一个小批量的任务丢给后端执行计算,并通常会减小内存占用。
由于深度学习模型通常比较大,而内存资源通常有限,建议大家在训练模型时对每个小批量操作使用tf.function
函数,而不是整个训练过程。类似地,在使用模型预测时,为了减小内存的占用,也建议大家对每个小批量预测时都使用同步函数。
下面我们来演示异步计算对内存的影响。我们先定义一个数据获取函数data_iter
,它会从被调用时开始计时,并定期打印到目前为止获取数据批量的总耗时。
def data_iter():
start = time.time()
num_batches, batch_size = 100, 1024
for i in range(num_batches):
X = tf.random.normal(shape=(batch_size, 512))
y = tf.ones((batch_size,))
yield X, y
if (i + 1) % 50 == 0:
print('batch %d, time %f sec' % (i+1, time.time()-start))
下面定义多层感知机、优化算法和损失函数。
net = keras.Sequential()
net.add(keras.layers.Dense(2048, activation='relu'))
net.add(keras.layers.Dense(512, activation='relu'))
net.add(keras.layers.Dense(1))
optimizer=keras.optimizers.SGD(0.05)
loss = keras.losses.MeanSquaredError()
这里定义辅助函数来监测内存的使用。需要注意的是,这个函数只能在Linux或macOS上运行。需要支持ps
指令
def get_mem():
res = subprocess.check_output(['ps', 'u', '-p', str(os.getpid())])
return int(str(res).split()[15]) / 1e3
现在我们可以做测试了。我们先试运行一次,让系统把net
的参数初始化。有关初始化的讨论可参见“4.3 模型参数的延后初始化”一节。
for X, y in data_iter():
break
loss(y, net(X))
<tf.Tensor: shape=(), dtype=float32, numpy=0.49068463>
对于训练模型net
来说,我们可以自然地使用命令式方式实现。此时,每个小批量的生成间隔较长,不过内存开销较小。
l_sum, mem = 0, get_mem()
dense_1 = keras.layers.Dense(2048, activation='relu')
dense_2 = keras.layers.Dense(512, activation='relu')
dense_3 = keras.layers.Dense(1)
trainable_variables = (dense_1.trainable_variables +
dense_2.trainable_variables +
dense_3.trainable_variables)
for X, y in data_iter():
with tf.GradientTape() as tape:
logits = net(X)
loss_value = loss(y, logits)
grads = tape.gradient(loss_value, trainable_variables)
optimizer.apply_gradients(zip(grads, trainable_variables))
print('increased memory: %f MB' % (get_mem() - mem))
batch 50, time 7.880550 sec
batch 100, time 15.700529 sec
increased memory: 14.336000 MB
如果转而使用预生成计算图,虽然每个小批量的生成间隔较短,但训练过程中可能会导致内存占用较高。这是因为在默认异步计算下,前端会将所有计算图在短时间内由后端完整生成。这使得在内存保存大量中间计算节点无法释放,从而占用额外内存。
l_sum, mem = 0, get_mem()
for X, y in data_iter():
with tf.GradientTape() as tape:
logits = net(X)
loss_value = loss(y, logits)
grads = tape.gradient(loss_value, net.trainable_weights)
optimizer.apply_gradients(zip(grads, net.trainable_weights))
batch 50, time 7.976524 sec
batch 100, time 15.683179 sec
increased memory: 12.268000 MB
- Tensorflow包括用户直接用来交互的前端和系统用来执行计算的后端。
- Tensorflow能够通过生成更大规模的计算图,使后端异步计算时间更长,更少被打断,从而提升计算性能。
- 建议使用每个小批量训练或预测时以
batch
为单位生成计算图,从而避免在短时间内将过多计算任务丢给后端
感兴趣的可以去看原文