歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux教程 >> Tensorflow 分布式部署簡介

Tensorflow 分布式部署簡介

日期:2017/2/28 13:51:01   编辑:Linux教程

tensorflow-0.8 的一大特性為可以部署在分布式的集群上,本文的內容由Tensorflow的分布式部署手冊翻譯而來,該手冊鏈接為TensorFlow分布式部署手冊


分布式TensorFlow

本文介紹了如何搭建一個TensorFlow服務器的集群,並將一個計算圖部署在該分布式集群上。以下操作建立在你對 TensorFlow的基礎操作已經熟練掌握的基礎之上。

Hello world的分布式實例的編寫

以下是一個簡單的TensorFlow分布式程序的編寫實例

# Start a TensorFlow server as a single-process "cluster".
$ python
>>> import tensorflow as tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target)  # Create a session on the server.
>>> sess.run(c)
'Hello, distributed TensorFlow!'

tf.train.Server.create_local_server() 會在本地創建一個單進程集群,該集群中的服務默認為啟動狀態。

創建集群(cluster)

TensorFlow中的集群(cluster)指的是一系列能夠對TensorFlow中的圖(graph)進行分布式計算的任務(task)。每個任務是同服務(server)相關聯的。TensorFlow中的服務會包含一個用於創建session的主節點和一個用於圖運算的工作節點。另外, TensorFlow中的集群可以拆分成一個或多個作業(job), 每個作業可以包含一個或多個任務。下圖為作者對集群內關系的理解。

創建集群的必要條件是為每個任務啟動一個服務。這些任務可以運行在不同的機器上,但你也可以在同一台機器上啟動多個任務(比如說在本地多個不同的GPU上運行)。每個任務會做如下的兩步工作:

  1. 創建一個 tf.train.ClusterSpec 用於對集群中的所有任務進行描述,該描述內容對於所有任務應該是相同的。
  2. 創建一個tf.train.Server 並將tf.train.ClusterSpec 中的參數傳入構造函數,並將作業的名稱和當前任務的編號寫入本地任務中。

創建tf.train.ClusterSpec 的具體方法

tf.train.ClusterSpec 的傳入參數是作業和任務之間的關系映射,該映射關系中的任務是通過ip地址和端口號表示的。具體映射關系如下表所示:

tf.train.ClusterSpec constructionAvailable tasks
tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
/job:local/task:0 local
/job:local/task:1
tf.train.ClusterSpec({
    "worker": [
        "worker0.example.com:2222", 
        "worker1.example.com:2222",
        "worker2.example.com:2222"
    ],
    "ps": [
        "ps0.example.com:2222",
        "ps1.example.com:2222"
    ]})
/job:worker/task:0
/job:worker/task:1
/job:worker/task:2
/job:ps/task:0
/job:ps/task:1

為每一個任務創建tf.train.Server 的實例

每一個tf.train.Server 對象都包含一個本地設備的集合, 一個向其他任務的連接集合,以及一個可以利用以上資源進行分布式計算的“會話目標”(“session target“)。每一個服務程序都是一個指定作業的一員,其在作業中擁有自己獨立的任務號。每一個服務程序都可以和集群中的其他任何服務程序進行通信。
以下兩個代碼片段講述了如何在本地的2222和2223兩個端口上配置不同的任務。

# In task 0:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1:
cluster = tf.train.ClusterSpec({"local": ["localhost:2222", "localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)

:當前手動配置任務節點還是一個比較初級的做法,尤其是在遇到較大的集群管理的情況下。tensorflow團隊正在開發一個自動程序化配置任務的節點的工具。例如:集群管理工具Kubernetes。如果你希望tensorflow支持某個特定的管理工具,可以將該請求發到GitHub issue 裡。

為模型指定分布式的設備

為了將某些操作運行在特定的進程上,可以使用tf.device() 函數來指定代碼運行在CPU或GPU上。例如:

with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)

with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)

with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...

with tf.Session("grpc://worker7.example.com:2222") as sess:
  for _ in range(10000):
    sess.run(train_op)

在上面的例子中,參數的聲明是通過ps作業中的兩個任務完成的,而模型計算相關的部分則是在work作業裡進行的。TensorFlow將在內部實現作業間的數據傳輸。(ps到work間的向前傳遞;work到ps的計算梯度)

計算流程

在上面的這個稱為“數據並行化”的公用訓練配置項裡,一般會包含多個用於對不同數據大小進行計算的任務(構成了work作業) 和 一個或多個分布在不同機器上用於不停更新共享參數的任務(構成了ps作業)。 所有的這些任務都可以運行在不同的機器上。實現這養的邏輯有很多的方法,目前TensorFlow團隊采用的是構建鏈接庫(lib)的方式來簡化模型的工作,其實現了如下幾種方法:

  • 圖內的拷貝(In-graph replication). 在這種方法下,客戶端程序會建立一個獨立的tf.Graph,該圖中的一系列節點 (tf.Variable)會通過ps 作業(/job:ps)聲明,而計算相關的多份拷貝會通過work作業(/job:worker)來進行。
  • 圖間的拷貝(Between-graph replication). 在這種方法下,每一個任務(/job:worker) 都是通過獨立客戶端單獨聲明的。其相互之間結構類似,每一個客戶端都會建立一個相似的圖結構, 該結構中包含的參數均通過ps 作業(/job:ps)進行聲明並使用tf.train.replica_device_setter() 方法將參數映射到不同的任務中。模型中每一個獨立的計算單元都會映射到/job:worker的本地的任務中。
  • 異步訓練(Asynchronous training). 在這種方法下,每一個圖的備份都會使用獨立的訓練邏輯進行獨立訓練,該方法需要配合上面的兩種方法一同使用。
  • 同步訓練(Synchronous training). 在這種方法下,所有的計算任務會讀取當前參數中相同的值並用於並行化的計算梯度,然後將計算結果合並。這種方法需要和圖內的拷貝(In-graph replication)方法(例如,在CIFAR-10 multi-GPU trainer 中我們使用該方法對梯度求平均值) 和圖間的拷貝(Between-graph replication)(例如,tf.train.SyncReplicasOptimizer)一同使用。

分布式訓練程序的舉例說明

接下來的代碼是一個分布式訓練程序的大致代碼框架,其中實現了圖間的拷貝和異步訓練兩種方法。該示例中包含了參數服務(parameter server)和工作任務(work task)的代碼。

import tensorflow as tf

# Flags for defining the tf.train.ClusterSpec
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")

# Flags for defining the tf.train.Server
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")

FLAGS = tf.app.flags.FLAGS


def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

  # Create and start a server for the local task.
  # 創建並啟動服務
  # 其參數中使用task_index 指定任務的編號
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":

    # Assigns ops to the local worker by default.
    # 將op 掛載到各個本地的worker上
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % FLAGS.task_index,
        cluster=cluster)):

      # Build model...
      loss = ...
      global_step = tf.Variable(0)

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

      saver = tf.train.Saver()
      summary_op = tf.merge_all_summaries()
      init_op = tf.initialize_all_variables()

    # Create a "supervisor", which oversees the training process.
    sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                             logdir="/tmp/train_logs",
                             init_op=init_op,
                             summary_op=summary_op,
                             saver=saver,
                             global_step=global_step,
                             save_model_secs=600)

    # The supervisor takes care of session initialization, restoring from
    # a checkpoint, and closing when done or an error occurs.
    with sv.managed_session(server.target) as sess:
      # Loop until the supervisor shuts down or 1000000 steps have completed.
      step = 0
      while not sv.should_stop() and step < 1000000:
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        _, step = sess.run([train_op, global_step])

    # Ask for all the services to stop.
    sv.stop()

if __name__ == "__main__":
  tf.app.run()

使用以下命令可以啟動兩個參數服務和兩個工作任務。(假設上面的python腳本名字為 train.py)

# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222 \
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222 \
     --job_name=worker --task_index=1

名詞解釋

客戶端(Client)

  • 客戶端是一個用於建立TensorFlow計算圖並創立與集群進行交互的會話層tensorflow::Session 的程序。一般客戶端是通過python或C++實現的。一個獨立的客戶端進程可以同時與多個TensorFlow的服務端相連 (上面的計算流程一節),同時一個獨立的服務端也可以與多個客戶端相連。

集群(Cluster)
- 一個TensorFlow的集群裡包含了一個或多個作業(job), 每一個作業又可以拆分成一個或多個任務(task)。集群的概念主要用與一個特定的高層次對象中,比如說訓練神經網絡,並行化操作多台機器等等。集群對象可以通過tf.train.ClusterSpec 來定義。
作業(Job)
- 一個作業可以拆封成多個具有相同目的的任務(task),比如說,一個稱之為ps(parameter server,參數服務器)的作業中的任務主要是保存和更新變量,而一個名為work(工作)的作業一般是管理無狀態且主要從事計算的任務。一個作業中的任務可以運行於不同的機器上,作業的角色也是靈活可變的,比如說稱之為”work”的作業可以保存一些狀態。
主節點的服務邏輯(Master service)
- 一個RPC 服務程序可以用來遠程連接一系列的分布式設備,並扮演一個會話終端的角色,主服務程序實現了一個tensorflow::Session 的借口並負責通過工作節點的服務進程(worker service)與工作的任務進行通信。所有的主服務程序都有了主節點的服務邏輯。
任務(Task)
- 任務相當於是一個特定的TesnsorFlow服務端,其相當於一個獨立的進程,該進程屬於特定的作業並在作業中擁有對應的序號。
TensorFlow服務端(TensorFlow server)
- 一個運行了tf.train.Server 實例的進程,其為集群中的一員,並有主節點和工作節點之分。
工作節點的服務邏輯(Worker service)
- 其為一個可以使用本地設備對部分圖進行計算的RPC 邏輯,一個工作節點的服務邏輯實現了worker_service.proto 接口, 所有的TensorFlow服務端均包含工作節點的服務邏輯。

Copyright © Linux教程網 All Rights Reserved