1. ray庫介紹
金融、工程模型需要大量使用 Pandas 和 Numpy 來做矩陣計算,需要針對 Pandas/Numpy 有更好的支援,ray庫就是其中一種可以加速計算的架構。
Ray 有如下特點:
- 分布式異步調用
- 記憶體排程
- Pandas/Numpy 的分布式支援
- 支援 Python
- 整體性能出衆
2. ray安裝
電腦是win10+python3.7.3,安裝ray庫,下面的順序不能錯
pip install -i https://mirrors.aliyun.com/pypi/simple/pytest-runner
pip install -i https://mirrors.aliyun.com/pypi/simple/ray
3. 初步使用的基本形式
# 導入ray,并初始化執行環境
import ray
ray.init()
# 定義ray remote函數
@ray.remote
def hello():
return "Hello world !"
# 異步執行remote函數,傳回結果id
object_id = hello.remote()
# 同步擷取計算結果
hello = ray.get(object_id)
# 輸出計算結果
print hello
4. 測試一個簡單的例子
使用ray庫計算100次的延遲1秒
import ray
import time
import numpy as np
# 啟動Ray.
ray.init()
#定義remote函數
@ray.remote
def sleep1(n):
time.sleep(n)
#程式開始時的時間
time_start=time.time()
result_ids = []
for i in range(100):
#異步執行remote函數
sleep1.remote(1)
#程式結束時系統時間
time_end=time.time()
#兩者相減
print('totally cost',time_end-time_start)
#print(z_id)
不使用ray庫計算100次的延遲1秒
import time
import numpy as np
def sleep1(n):
time.sleep(n)
#程式開始時的時間
time_start=time.time()
result_ids = []
for i in range(100):
sleep1(1)
#程式結束時系統時間
time_end=time.time()
#兩者相減
print('totally cost',time_end-time_start)
#print(z_id)
5. 系統架構
作為分布式計算系統Ray仍舊遵循了典型的Master-Slave的設計,Master負責全局協調和狀态維護;Slave執行分布式計算任務。不過和傳統的分布式計算系統不同的是Ray使用了混合任務排程的思路。
在叢集部署模式下Ray啟動了以下關鍵元件:
- GlobalScheduler(全局排程器)— Master上啟動一個全局排程器用于接收本地排程器送出的任務;并将任務分發給合适的本地任務排程器執行。
- RedisServer Master(重新配置設定任務)— 啟動一到多個RedisServer用于儲存分布式任務的狀态資訊(Control State),包括對象機器的映射、任務描述、任務debug資訊等。
- LocalScheduler(局部排程器)— 每個Slave上啟動一個本地排程器,用于送出任務到全局排程器,以及配置設定任務給目前機器的Worker程序。
- Worker(勞工)— 每個Slave上可以啟動多個Worker程序執行分布式任務;并将計算結果存儲到ObjectStore。
- ObjectStore(對象存儲)— 每個Slave上啟動一個ObjectStore存儲隻讀資料對象;Worker可以通過共享記憶體的方式通路這些對象資料;這樣可以有效地減少記憶體拷貝和對象序列化成本,ObjectStore底層由Apache Arrow實作。
- Plasma — 每個Slave上的ObjectStore都由一個名為Plasma的對象管理器進行管理;它可以在Worker通路本地ObjectStore上不存在的遠端資料對象時主動拉取其它Slave上的對象資料到目前機器。
Ray的Driver節點和和Slave節點啟動的元件幾乎相同;不過卻有以下差別:
- Driver上的工作程序DriverProcess一般隻有一個,即使用者啟動的PythonShell;Slave可以根據需要建立多個WorkerProcess。
- Driver隻能送出任務卻不能來自全局排程器配置設定的任務。Slave可以送出任務也可以接收全局排程器配置設定的任務。
- Driver可以主動繞過全局排程器給Slave發送Actor調用任務(此處設計是否合理尚不讨論);Slave隻能接收全局排程器配置設定的計算任務。
6. ray基礎(待補充)
- ray.init() 啟動ray
- ray.put(x) 擷取一個對象的ID
- ray.get(x_id) 接受一個對象ID,并從相應的遠端對象建立一個Python對象。
result_ids = [ray.put(i) for i in range(10)]
ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ray.get(result_ids[5]) # 5
ray.get(result_ids[1]) # 1
- Ray的異步計算:可以執行任意Python函數
"遠端函數"
@ray.remote #在普通函數基礎上添加@ray.remote
def add2(a, b):
return a + b
x_id = add2.remote(1, 2)
ray.get(x_id) # 3
- 遠端函數:不傳回實際值,它們總是傳回對象ID,傳回的對象ID可以是多個。
- ray.error_info() 可以擷取任務執行時産生的錯誤資訊。
- ray.wait() 支援批量的任務等待
# 啟動5個remote函數調用任務
results = [f.remote(i) for i in range(5)]
# 阻塞等待4個任務完成,逾時時間為2.5s
ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)
上述例子中,results包含5個ObjectID,使用ray.wait()操作可以一直等待有4個任務完成後傳回;并将完成的資料對象放在第一個list類型傳回值内;未完成的ObjectID放在第二個list傳回值内。如果設定了逾時時間;那麼在逾時時間結束後仍未等到預期的傳回值個數;則已逾時完成時的傳回值為準。
- 動态任務圖:Ray應用程式或作業中的基礎基元是一個動态任務圖,這與TensorFlow中的計算圖非常不同,TensorFlow中一個計算圖代表一個神經網絡,并且在單個應用程式中執行多次。 在Ray中任務圖代表整個應用程式;并且隻執行一次,任務圖不是事先知道的,是在應用程式運作時動态建構的,執行一個任務可能會觸發建立更多任務。
- 任務之間的依賴關系:下面的第二個任務在第一個任務完成之前不會執行,第三個任務在第二個任務完成之前不會執行。在這個例子中,沒有展現并行,展現的是任務之間的依賴關系。
@ray.remote
def f(x):
return x + 1
x = f.remote(0)
y = f.remote(x)
z = f.remote(y)
ray.get(z) # 3
- 有效的聚合函數:左邊是線性聚合方式,右邊是樹型聚合方式。 代碼實作:
import time
import ray
@ray.remote
def add(x, y):
time.sleep(1)
return x + y
"====================線性聚合,時間複雜度O(n) ========================="
id1 = add.remote(1, 2)
id2 = add.remote(id1, 3)
id3 = add.remote(id2, 4)
id4 = add.remote(id3, 5)
id5 = add.remote(id4, 6)
id6 = add.remote(id5, 7)
id7 = add.remote(id6, 8)
result = ray.get(id7)
"===================樹型聚合,時間複雜度 O(log(n))======================"
id1 = add.remote(1, 2)
id2 = add.remote(3, 4)
id3 = add.remote(5, 6)
id4 = add.remote(7, 8)
id5 = add.remote(id1, id2)
id6 = add.remote(id3, id4)
id7 = add.remote(id5, id6)
result = ray.get(id7)
- Actor待補充。
目前參考資源:
https://blog.csdn.net/weixin_43255962/article/details/88689665
https://blog.csdn.net/luanpeng825485697/article/details/88242020