#12 通讯优化,对模型进行切片传输,并进行流处理

Open
created 3 months ago by yands · 5 comments
yands commented 3 months ago

对模型进行切片传输,并且对切片进行流处理,能充分利用上行和下行的带宽,将传输时间缩短一半。

对模型进行切片传输,并且对切片进行流处理,能充分利用上行和下行的带宽,将传输时间缩短一半。
yands commented 3 months ago
Owner

需求:为了充分利用上行和下行的带宽,一个 client 需要在同一时间发送和接受信息。
但经实验发现:grpc 的一个 client 端在一个时间点只能发送或者接收,不能收发同时。
实验代码:
client 端分片传输代码:

for i in range(10):
    print(i)
    send(clientMessage_)
    #do something
    serverMessage = receive()

实验结果:
分片传输和合并传输的时间相差不大。(7.6s 和 8.2s)

需求:为了充分利用上行和下行的带宽,一个 client 需要在同一时间发送和接受信息。 但经实验发现:grpc 的一个 client 端在一个时间点只能发送或者接收,不能收发同时。 实验代码: client 端分片传输代码: ``` for i in range(10): print(i) send(clientMessage_) #do something serverMessage = receive() ``` 实验结果: 分片传输和合并传输的时间相差不大。(7.6s 和 8.2s)
yands self-assigned this 3 months ago
yands commented 2 months ago
Owner

调研了 grpc 的 grpc.aio 协程模块和 grfc 模块,都不支持收发同时, 只是让程序不用等待 send 函数而运行后面的代码,但 send 和 receive 同时的话是会阻塞的。

所以grpc 一个 chanel 不能同时发送和接受信息。最后采用的方案就是:一个 client 开两个进程,每个进程开一个chanel(链接),一个用于发送,一个用于接收。
实验结果:

同步收发 只发送 只接收 异步收发
61s 20s 24s 31s

实验代码
client:

def read_process():
    streamin_join = init_grpc()
    el = get_event_loop()
    corots = (comunicate_one_chanel_read(streamin_join, 10))
    el.run_until_complete(corots)

def write_process():
    streamin_join = init_grpc()
    weights = [np.random.rand(1024,1024)  for i in range(100)]
    el = get_event_loop()
    corots = (comunicate_one_chanel_write(streamin_join, weights, 10))
    el.run_until_complete(corots)
    

p1 = Process(target=write_process, args=())
p2 = Process(target=read_process, args=())
p1.start()
p2.start()
p1.join()
p2.join()
调研了 grpc 的 grpc.aio 协程模块和 grfc 模块,都不支持收发同时, 只是让程序不用等待 send 函数而运行后面的代码,但 send 和 receive 同时的话是会阻塞的。 所以grpc 一个 chanel 不能同时发送和接受信息。最后采用的方案就是:一个 client 开两个进程,每个进程开一个chanel(链接),一个用于发送,一个用于接收。 实验结果: | 同步收发 | 只发送 | 只接收 | 异步收发 | | ----------------------------- | --------- | ------------ | -------- | | 61s | 20s | 24s | 31s | [实验代码](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore_streaming/bi_stream/multi_thread_exp) client: ``` def read_process(): streamin_join = init_grpc() el = get_event_loop() corots = (comunicate_one_chanel_read(streamin_join, 10)) el.run_until_complete(corots) def write_process(): streamin_join = init_grpc() weights = [np.random.rand(1024,1024) for i in range(100)] el = get_event_loop() corots = (comunicate_one_chanel_write(streamin_join, weights, 10)) el.run_until_complete(corots) p1 = Process(target=write_process, args=()) p2 = Process(target=read_process, args=()) p1.start() p2.start() p1.join() p2.join() ```
yands commented 2 months ago
Owner
[流式AISynergy程序类图](https://www.plantuml.com/plantuml/svg/RLDDZnCn3BtdLrJYm8TKAkq5n04jh80huWzOxemR8ZAPmNRQNLNzxsJ6BPD8lKHvptQUFS_pA8giUOZkzM_LIJxjTbF4D3Flkt7OTH55jdr0ppWujy1D5mCbXOd7vvVDsVtuzVFfAuVUa_Eqq1En1f8tRut0KajAF1E7eEMa8yQCIkNAr8qfKJVJbvl2VdaE1apeYJUlFdvu-lRzozb9tbz3COYwa892uS38yWASi527aMlQsaXq0izJTsChkNM2UrBoq5MeUv0PmJrekQ2CIOA6CItp_SqapNHhVYL7V2I6WKJG4mGbHXsvb0afs1U_HQ-abTIaeZQbiAJDR2r_e1wFlXY7tZFvkSPSMN7bJt7d47ncIwfmdNHfL-DctDcbF00UCKJSnpbndqFiYO791r7YGkLK0Oond6QYJO_351i96xkqBpGBFSDfv3-sN3Okev9_AR1jfbgxwWeyvjJROjhvd-c3N5SJCCQh1sAjDB_0k-tsSxL3RhrQQFNRjOlTLjuFrzMcseRhJjmZfJuFyHy0) [流式AISynergy程序活动图](https://www.plantuml.com/plantuml/svg/vLNBRjiw4DtpAuZSHQX56DmihHRDhjq4AD0F86XvJB6cA9KSnV7VboBf1nqc1RBefbuuocWUvvnv-D6ZT3WFXjpsY9DVtTzFHjh5mhjnk9STwcUDXxkDbih9uMuDA4aC8jlHxGWDdvM5FVNWdi6roQ2jHghSr8bBitBZF6L7ljN8ccYE4TawoL0441pzJ8K254icBOqulV2iQKzX3YQGcF_DcVOor0xOADXnLZtjZ0QBlYsziZMKyNAO38XaO4tfbpYMdymZ1VmVYIJqOl2gARtVnbamJx41JFXcrrAKaAufFFtu6kBICryxlL707VoQmHUmYXnL9fuloed0M6H3xa-rk8moIsFeiRQSrwcyw1uyzoWHgFQqU_9AZBjwVR2RXA7ojZuK7fr4K0SkbNAWmhE8azI-e_k-rmQOWeItwsUx-z8is9WTiQ7XYpBjIx9rTFK9W6TfPedmyJ7u9xev03hT-NeJ5mt_NXVJObEffBRa5L3EUVj1vrGldGrIm__9pWCDQEmcOTS-GOyaI8ctB7jnFdK7_EUeRMffgdOrI08F4mIfbp-iwwK5mnt83MikH5xvtEN5CxTxa5OgS7mzQxCH3fJsA8AVIE_PHVhBJCaSqankV3cmw5muf4nm7CUS9tXK3g7GQTbFyFZn7NWl5SJt4aV7Al5nLYiPahaIMDbAlQK5qTYgi0WDnHugGv1zqmtlP7zMkttBcP-OOPRaoAuMzD-t4RzhkxiEBhPKUC14dztcHRszoWN60sN-eMl8-pz45bwGNNMaIYQDrjxfulvzX490LL3m2uLucU0L3RlHs-f-ePlXp6sdZI7r3SwRMsnikNudSrOchrnBYCTn8YHw_VbM4xAgKerCYx4kzJlfLOykTSgj9lcI-_hrlibJSNEH_SqHIiv9_vkGU-K0PsVfUTPRSXRhCJpEW_aD)
yands commented 2 months ago
Owner

前面的实验结果可以在同时有一个接收和一个发送任务是可以同时利用上行和下行的带宽,基本上可以并行的。

实现了一版基于 Async Reader / Writer Streaming 模式的 client 代码,但发现并不能充分利用上下行带宽。原因是该方式并不适用于同时有多个接收和多个发送任务的场景,会造成阻塞,程序不运行。

尝试了各种 grpc 的使用方案(方案代码),结果如下:

方案 时间
Async Reader / Writer Streaming async: 23
Async Iterator Streaming async: 18
协程的 loop.call_soon() async: 18.5
双线程,每个线程一个 channel sync: 20
双进程,每个进程一个 channel sync: 14
只 发送/接收 数据 sync: 10

调研了 grpc 的 grpc.aio 协程模块和 grfc 模块,都不支持收发同时, 只是让程序不用等待 send 函数而运行后面的代码,但 send 和 receive 同时的话是会阻塞的。

所以grpc 一个 chanel 不能同时发送和接受信息。最后采用的方案就是:一个 client 开两个进程,每个进程开一个chanel(链接),一个用于发送,一个用于接收。
实验结果:

同步收发 只发送 只接收 异步收发
61s 20s 24s 31s

实验代码
client:

def read_process():
    streamin_join = init_grpc()
    el = get_event_loop()
    corots = (comunicate_one_chanel_read(streamin_join, 10))
    el.run_until_complete(corots)

def write_process():
    streamin_join = init_grpc()
    weights = [np.random.rand(1024,1024)  for i in range(100)]
    el = get_event_loop()
    corots = (comunicate_one_chanel_write(streamin_join, weights, 10))
    el.run_until_complete(corots)
    

p1 = Process(target=write_process, args=())
p2 = Process(target=read_process, args=())
p1.start()
p2.start()
p1.join()
p2.join()
前面的[实验](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/issues/12#issuecomment-22395)结果可以在同时有一个接收和一个发送任务是可以同时利用上行和下行的带宽,基本上可以并行的。 实现了一版基于 Async Reader / Writer Streaming 模式的 [client 代码](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore),但发现并不能充分利用上下行带宽。原因是该方式并不适用于同时有多个接收和多个发送任务的场景,会造成阻塞,程序不运行。 尝试了各种 grpc 的使用方案([方案代码](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore_streaming/bi_stream/multi_thread_exp)),结果如下: | 方案 | 时间 | | -------- | -------- | -------- | | Async Reader / Writer Streaming | async: 23 | | Async Iterator Streaming | async: 18 | | 协程的 loop.call_soon() | async: 18.5 | | 双线程,每个线程一个 channel | sync: 20 | | 双进程,每个进程一个 channel | sync: 14 | | 只 发送/接收 数据 | sync: 10 | > 调研了 grpc 的 grpc.aio 协程模块和 grfc 模块,都不支持收发同时, 只是让程序不用等待 send 函数而运行后面的代码,但 send 和 receive 同时的话是会阻塞的。 > > 所以grpc 一个 chanel 不能同时发送和接受信息。最后采用的方案就是:一个 client 开两个进程,每个进程开一个chanel(链接),一个用于发送,一个用于接收。 > 实验结果: > > | 同步收发 | 只发送 | 只接收 | 异步收发 | > | ----------------------------- | --------- | ------------ | -------- | > | 61s | 20s | 24s | 31s | > > > [实验代码](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore_streaming/bi_stream/multi_thread_exp) > client: > ``` > def read_process(): > streamin_join = init_grpc() > el = get_event_loop() > corots = (comunicate_one_chanel_read(streamin_join, 10)) > el.run_until_complete(corots) > > def write_process(): > streamin_join = init_grpc() > weights = [np.random.rand(1024,1024) for i in range(100)] > el = get_event_loop() > corots = (comunicate_one_chanel_write(streamin_join, weights, 10)) > el.run_until_complete(corots) > > > p1 = Process(target=write_process, args=()) > p2 = Process(target=read_process, args=()) > p1.start() > p2.start() > p1.join() > p2.join() > ``` > >
yands commented 1 month ago
Owner

从上次实验结果来看,采用多进程的方式来实现的效率最高,所以拟采用多进程的方式来通讯。但进程间是不共享内存的,所以进程间通讯需要对 message 进行序列化和反序列化,主要实现了 clientMessage 和 serverMessage 转换为 Dict 类型的功能。
代码链接

从上次实验结果来看,采用多进程的方式来实现的效率最高,所以拟采用多进程的方式来通讯。但进程间是不共享内存的,所以进程间通讯需要对 message 进行序列化和反序列化,主要实现了 clientMessage 和 serverMessage 转换为 Dict 类型的功能。 [代码链接](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore/proto/protobuf_to_dict.py)
Sign in to join this conversation.
No Milestone
No Assignees
1 Participants
Notifications
Due Date

No due date set.

Dependencies

This issue currently doesn't have any dependencies.

Loading…
There is no content yet.