我在进行TensorFlow的分布式训练,想通过筛选梯度的方式,来实现较少通信时长的目的。这是我的代码:
import time
import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data # 数据的获取不是本章重点,这里直接导入
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_integer('thread_steps', 0, 'Steps run before sync gradients.')
tf.app.flags.DEFINE_string('data_dir', '/tmp/mnist-data', 'Directory for storing mnist data')
tf.app.flags.DEFINE_string("job_name", "worker", "ps or worker")
tf.app.flags.DEFINE_integer("task_id", 0, "Task ID of the worker/ps running the train")
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps机")
tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker机,用逗号隔开")
MODEL_DIR = "./distribute_model_ckpt/"
BATCH_SIZE = 32
THREAD_STEPS = FLAGS.thread_steps
def main(self):
# ========== STEP1: 读取数据 ========== #mnist = input\_data.read\_data\_sets(FLAGS.data\_dir, one\_hot=True) # 读取数据
# ========== STEP2: 声明集群 ========== ## 构建集群ClusterSpec和服务声明ps\_hosts = FLAGS.ps\_hosts.split(",")worker\_hosts = FLAGS.worker\_hosts.split(",")cluster = tf.train.ClusterSpec({"ps":ps\_hosts, "worker":worker\_hosts}) # 构建集群名单server = tf.train.Server(cluster, job\_name=FLAGS.job\_name, task\_index=FLAGS.task\_id) # 声明服务n\_workers = len(worker\_hosts) # worker机的数量
# ========== STEP3: ps机内容 ========== ## 分工,对于ps机器不需要执行训练过程,只需要管理变量。server.join()会一直停在这条语句上。if FLAGS.job\_name == "ps": with tf.device("/cpu:0"): server.join()
# ========== STEP4: worker机内容 ========== ## 下面定义worker机需要进行的操作is\_chief = (FLAGS.task\_id == 0) # 选取task\_id=0的worker机作为chief
# 通过replica\_device\_setter函数来指定每一个运算的设备。# replica\_device\_setter会自动将所有参数分配到参数服务器上,将计算分配到当前的worker机上。device\_setter = tf.train.replica\_device\_setter( worker\_device="/job:worker/task:%d" % FLAGS.task\_id, cluster=cluster)
# 这一台worker机器需要做的计算内容with tf.device(device\_setter): # 输入数据 x = tf.placeholder(name="x-input",shape=[None,28\*28],dtype=tf.float32) # 输入样本像素为28\*28 # x\_shape = x.get\_shape().as\_list() # length = x\_shape[1] # x\_reshaped = tf.reshape(x, [-1,length]) y\_ = tf.placeholder(name="y-input", shape=[None,10],dtype=tf.float32) # MNIST是十分类 # 第一层(隐藏层) with tf.variable\_scope("layer1"): weight1 = tf.get\_variable(name="weight1", shape=[28\*28, 10], initializer=tf.glorot\_normal\_initializer()) biases1 = tf.get\_variable(name="biases1", shape=[10], initializer=tf.glorot\_normal\_initializer()) layer1 = tf.nn.relu(tf.matmul(x, weight1) + biases1, name="layer1") # 第二层(输出层) with tf.variable\_scope("layer2"): weight2 = tf.get\_variable(name="weight2", shape=[10, 10], initializer=tf.glorot\_normal\_initializer()) biases2 = tf.get\_variable(name="biases2", shape=[10], initializer=tf.glorot\_normal\_initializer()) y = tf.add(tf.matmul(layer1, weight2), biases2, name="y") pred = tf.argmax(y, axis=1, name="pred") global\_step = tf.contrib.framework.get\_or\_create\_global\_step() # 必须手动声明global\_step否则会报错 # 损失和优化 cross\_entropy = tf.nn.sparse\_softmax\_cross\_entropy\_with\_logits(logits=y, labels=tf.argmax(y\_, axis=1)) loss = tf.reduce\_mean(cross\_entropy) with tf.name\_scope('train'): optimizer = tf.train.GradientDescentOptimizer(0.01) with tf.name\_scope('gradient'): gradient\_all = optimizer.compute\_gradients(loss,weight2) gradients\_node=tf.gradients(loss,weight2) grads\_holder = [(tf.placeholder(tf.float32,shape=g.get\_shape()), v) for (g, v) in gradient\_all] # \*\*通过tf.train.SyncReplicasOptimizer函数实现函数同步更新\*\* opt = tf.train.SyncReplicasOptimizer( tf.train.GradientDescentOptimizer(0.01), replicas\_to\_aggregate=n\_workers, total\_num\_replicas=n\_workers ) sync\_replicas\_hook = opt.make\_session\_run\_hook(is\_chief) train\_op = opt.apply\_gradients(grads\_holder, global\_step=global\_step) if is\_chief: train\_op = tf.no\_op() hooks = [sync\_replicas\_hook, tf.train.StopAtStepHook(last\_step=10000)] # 把同步更新的hook加进来 config = tf.ConfigProto( allow\_soft\_placement=True, # 设置成True,那么当运行设备不满足要求时,会自动分配GPU或者CPU。 log\_device\_placement=False, # 设置为True时,会打印出TensorFlow使用了哪种操作 )
# ========== STEP5: 打开会话 ========== # # 对于分布式训练,打开会话时不采用tf.Session(),而采用tf.train.MonitoredTrainingSession() # 详情参考:https://www.cnblogs.com/estragon/p/10034511.html with tf.train.MonitoredTrainingSession( master=server.target, is\_chief=is\_chief, # checkpoint\_dir=MODEL\_DIR, hooks=hooks, # save\_checkpoint\_secs=10, config=config) as sess: print("session started!") start\_time = time.time() step = 0 while not sess.should\_stop(): # for step in range(THREAD\_STEPS): xs,ys= mnist.train.next\_batch(BATCH\_SIZE) # batch\_size=32 #求每个梯度 grads = sess.run(gradients\_node, feed\_dict={x:xs, y\_: ys}) grads=np.array(grads) grads=grads.reshape((10,10)) print(grads) grad\_abs=np.abs(grads) variance = np.var(grad\_abs, axis=1) print('variance:',variance) #取方差最大的几组值 topk\_var=tf.constant(variance) k=1 output1 = tf.nn.top\_k(topk\_var, k) with tf.Session() as sess1: print(sess1.run(output1)) a=output1.indices[-1] # print(sess1.run(a)) #a是所在TOPK个方差最大的索引值 x=a.eval() a=int(x) g\_a=grads[a,:] # print('g\_a=',g\_a) # print('\n') #取方差最大的一组值中的前几个大的梯度值,设置梯度阈值 g\_a\_abs=np.abs(g\_a) k=3 output2 = tf.nn.top\_k(g\_a\_abs, k) with tf.Session() as sess2: print(sess2.run(output2)) b=output2.indices[-1] # print(sess2.run(b)) #a是所在TOPK个方差最大的索引值 x=b.eval() b=int(x) threshold=g\_a\_abs[b] grad\_end=np.where(grad\_abs<threshold,0,grads) grad\_end=[grad\_end] grad\_var={} for i in range(len(grads\_holder)): k = grads\_holder[i][0] if k is not None: # grad\_var[k] =np.var([tf.reshape(g, [-1]) for g in grads]) grad\_var[k] =[g[i][0] for g in grad\_end]
\_, loss\_value, global\_step\_value = sess.run([train\_op, loss, global\_step], feed\_dict=grad\_var) if step > 0 and step % 100 == 0: duration = time.time() - start\_time sec\_per\_batch = duration / global\_step\_value print("After %d training steps(%d global steps), loss on training batch is %g (%.3f sec/batch)" % (step, global\_step\_value, loss\_value, sec\_per\_batch)) print('Training elapsed time:%f s' % duration) step += 1
if __name__ == "__main__":
tf.app.run()
这是我的报错信息:

相似问题