天天看點

Ray入門指南——分布式架構(待更新)1. ray庫介紹2. ray安裝3. 初步使用的基本形式4. 測試一個簡單的例子5. 系統架構6. ray基礎(待補充)

1. ray庫介紹

金融、工程模型需要大量使用 Pandas 和 Numpy 來做矩陣計算,需要針對 Pandas/Numpy 有更好的支援,ray庫就是其中一種可以加速計算的架構。

Ray 有如下特點:

  • 分布式異步調用
  • 記憶體排程
  • Pandas/Numpy 的分布式支援
  • 支援 Python
  • 整體性能出衆

2. ray安裝

電腦是win10+python3.7.3,安裝ray庫,下面的順序不能錯

pip install -i https://mirrors.aliyun.com/pypi/simple/pytest-runner
pip install -i https://mirrors.aliyun.com/pypi/simple/ray
           

3. 初步使用的基本形式

# 導入ray,并初始化執行環境
import ray
ray.init()

# 定義ray remote函數
@ray.remote
def hello():
    return "Hello world !"

# 異步執行remote函數,傳回結果id
object_id = hello.remote()

# 同步擷取計算結果
hello = ray.get(object_id)

# 輸出計算結果
print hello

           

4. 測試一個簡單的例子

使用ray庫計算100次的延遲1秒

import ray
import time
import numpy as np

# 啟動Ray.
ray.init()
#定義remote函數
@ray.remote
def sleep1(n):
    time.sleep(n)

#程式開始時的時間
time_start=time.time()

result_ids = []
for i in range(100):
    #異步執行remote函數
    sleep1.remote(1)
    
#程式結束時系統時間
time_end=time.time()
#兩者相減
print('totally cost',time_end-time_start)
#print(z_id)
           

不使用ray庫計算100次的延遲1秒

import time
import numpy as np

def sleep1(n):
    time.sleep(n)

#程式開始時的時間
time_start=time.time()
result_ids = []
for i in range(100):
    sleep1(1)
    
#程式結束時系統時間
time_end=time.time()
#兩者相減
print('totally cost',time_end-time_start)
#print(z_id)
           
Ray入門指南——分布式架構(待更新)1. ray庫介紹2. ray安裝3. 初步使用的基本形式4. 測試一個簡單的例子5. 系統架構6. ray基礎(待補充)

5. 系統架構

作為分布式計算系統Ray仍舊遵循了典型的Master-Slave的設計,Master負責全局協調和狀态維護;Slave執行分布式計算任務。不過和傳統的分布式計算系統不同的是Ray使用了混合任務排程的思路。

Ray入門指南——分布式架構(待更新)1. ray庫介紹2. ray安裝3. 初步使用的基本形式4. 測試一個簡單的例子5. 系統架構6. ray基礎(待補充)

在叢集部署模式下Ray啟動了以下關鍵元件:

  • GlobalScheduler(全局排程器)— Master上啟動一個全局排程器用于接收本地排程器送出的任務;并将任務分發給合适的本地任務排程器執行。
  • RedisServer Master(重新配置設定任務)— 啟動一到多個RedisServer用于儲存分布式任務的狀态資訊(Control State),包括對象機器的映射、任務描述、任務debug資訊等。
  • LocalScheduler(局部排程器)— 每個Slave上啟動一個本地排程器,用于送出任務到全局排程器,以及配置設定任務給目前機器的Worker程序。
  • Worker(勞工)— 每個Slave上可以啟動多個Worker程序執行分布式任務;并将計算結果存儲到ObjectStore。
  • ObjectStore(對象存儲)— 每個Slave上啟動一個ObjectStore存儲隻讀資料對象;Worker可以通過共享記憶體的方式通路這些對象資料;這樣可以有效地減少記憶體拷貝和對象序列化成本,ObjectStore底層由Apache Arrow實作。
  • Plasma — 每個Slave上的ObjectStore都由一個名為Plasma的對象管理器進行管理;它可以在Worker通路本地ObjectStore上不存在的遠端資料對象時主動拉取其它Slave上的對象資料到目前機器。
    Ray入門指南——分布式架構(待更新)1. ray庫介紹2. ray安裝3. 初步使用的基本形式4. 測試一個簡單的例子5. 系統架構6. ray基礎(待補充)

Ray的Driver節點和和Slave節點啟動的元件幾乎相同;不過卻有以下差別:

  • Driver上的工作程序DriverProcess一般隻有一個,即使用者啟動的PythonShell;Slave可以根據需要建立多個WorkerProcess。
  • Driver隻能送出任務卻不能來自全局排程器配置設定的任務。Slave可以送出任務也可以接收全局排程器配置設定的任務。
  • Driver可以主動繞過全局排程器給Slave發送Actor調用任務(此處設計是否合理尚不讨論);Slave隻能接收全局排程器配置設定的計算任務。

6. ray基礎(待補充)

  • ray.init() 啟動ray
  • ray.put(x) 擷取一個對象的ID
  • ray.get(x_id) 接受一個對象ID,并從相應的遠端對象建立一個Python對象。
result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ray.get(result_ids[5]) 	# 5
ray.get(result_ids[1]) 	# 1
           
  • Ray的異步計算:可以執行任意Python函數
"遠端函數"
@ray.remote #在普通函數基礎上添加@ray.remote 
def add2(a, b):
    return a + b
x_id = add2.remote(1, 2)
ray.get(x_id)  # 3
           
  • 遠端函數:不傳回實際值,它們總是傳回對象ID,傳回的對象ID可以是多個。
  • ray.error_info() 可以擷取任務執行時産生的錯誤資訊。
  • ray.wait() 支援批量的任務等待
# 啟動5個remote函數調用任務
results = [f.remote(i) for i in range(5)]
# 阻塞等待4個任務完成,逾時時間為2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)
           

上述例子中,results包含5個ObjectID,使用ray.wait()操作可以一直等待有4個任務完成後傳回;并将完成的資料對象放在第一個list類型傳回值内;未完成的ObjectID放在第二個list傳回值内。如果設定了逾時時間;那麼在逾時時間結束後仍未等到預期的傳回值個數;則已逾時完成時的傳回值為準。

  • 動态任務圖:Ray應用程式或作業中的基礎基元是一個動态任務圖,這與TensorFlow中的計算圖非常不同,TensorFlow中一個計算圖代表一個神經網絡,并且在單個應用程式中執行多次。 在Ray中任務圖代表整個應用程式;并且隻執行一次,任務圖不是事先知道的,是在應用程式運作時動态建構的,執行一個任務可能會觸發建立更多任務。
  • 任務之間的依賴關系:下面的第二個任務在第一個任務完成之前不會執行,第三個任務在第二個任務完成之前不會執行。在這個例子中,沒有展現并行,展現的是任務之間的依賴關系。
@ray.remote
def f(x):
    return x + 1
x = f.remote(0)
y = f.remote(x)
z = f.remote(y)
ray.get(z) # 3
           
  • 有效的聚合函數:左邊是線性聚合方式,右邊是樹型聚合方式。
    Ray入門指南——分布式架構(待更新)1. ray庫介紹2. ray安裝3. 初步使用的基本形式4. 測試一個簡單的例子5. 系統架構6. ray基礎(待補充)
    代碼實作:
import time
import ray
@ray.remote
def add(x, y):
    time.sleep(1)
    return x + y
"====================線性聚合,時間複雜度O(n) ========================="
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
"===================樹型聚合,時間複雜度 O(log(n))======================"
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)
           
  • Actor待補充。

目前參考資源:

https://blog.csdn.net/weixin_43255962/article/details/88689665

https://blog.csdn.net/luanpeng825485697/article/details/88242020