#12 通讯优化,对模型进行切片传输,并进行流处理

开启中
yands1年前创建 · 5 条评论
yands 评论于 1年前
对模型进行切片传输,并且对切片进行流处理,能充分利用上行和下行的带宽,将传输时间缩短一半。
yands 评论于 1年前
所有者
需求:为了充分利用上行和下行的带宽,一个 client 需要在同一时间发送和接受信息。 但经实验发现:grpc 的一个 client 端在一个时间点只能发送或者接收,不能收发同时。 实验代码: client 端分片传输代码: ``` for i in range(10): print(i) send(clientMessage_) #do something serverMessage = receive() ``` 实验结果: 分片传输和合并传输的时间相差不大。(7.6s 和 8.2s)
yands1年前 指派给自己
yands 评论于 1年前
所有者
调研了 grpc 的 grpc.aio 协程模块和 grfc 模块,都不支持收发同时, 只是让程序不用等待 send 函数而运行后面的代码,但 send 和 receive 同时的话是会阻塞的。 所以grpc 一个 chanel 不能同时发送和接受信息。最后采用的方案就是:一个 client 开两个进程,每个进程开一个chanel(链接),一个用于发送,一个用于接收。 实验结果: | 同步收发 | 只发送 | 只接收 | 异步收发 | | ----------------------------- | --------- | ------------ | -------- | | 61s | 20s | 24s | 31s | [实验代码](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore_streaming/bi_stream/multi_thread_exp) client: ``` def read_process(): streamin_join = init_grpc() el = get_event_loop() corots = (comunicate_one_chanel_read(streamin_join, 10)) el.run_until_complete(corots) def write_process(): streamin_join = init_grpc() weights = [np.random.rand(1024,1024) for i in range(100)] el = get_event_loop() corots = (comunicate_one_chanel_write(streamin_join, weights, 10)) el.run_until_complete(corots) p1 = Process(target=write_process, args=()) p2 = Process(target=read_process, args=()) p1.start() p2.start() p1.join() p2.join() ```
yands 评论于 1年前
所有者
[流式AISynergy程序类图](https://www.plantuml.com/plantuml/svg/RLDDZnCn3BtdLrJYm8TKAkq5n04jh80huWzOxemR8ZAPmNRQNLNzxsJ6BPD8lKHvptQUFS_pA8giUOZkzM_LIJxjTbF4D3Flkt7OTH55jdr0ppWujy1D5mCbXOd7vvVDsVtuzVFfAuVUa_Eqq1En1f8tRut0KajAF1E7eEMa8yQCIkNAr8qfKJVJbvl2VdaE1apeYJUlFdvu-lRzozb9tbz3COYwa892uS38yWASi527aMlQsaXq0izJTsChkNM2UrBoq5MeUv0PmJrekQ2CIOA6CItp_SqapNHhVYL7V2I6WKJG4mGbHXsvb0afs1U_HQ-abTIaeZQbiAJDR2r_e1wFlXY7tZFvkSPSMN7bJt7d47ncIwfmdNHfL-DctDcbF00UCKJSnpbndqFiYO791r7YGkLK0Oond6QYJO_351i96xkqBpGBFSDfv3-sN3Okev9_AR1jfbgxwWeyvjJROjhvd-c3N5SJCCQh1sAjDB_0k-tsSxL3RhrQQFNRjOlTLjuFrzMcseRhJjmZfJuFyHy0) [流式AISynergy程序活动图](https://www.plantuml.com/plantuml/svg/vLNBRjiw4DtpAuZSHQX56DmihHRDhjq4AD0F86XvJB6cA9KSnV7VboBf1nqc1RBefbuuocWUvvnv-D6ZT3WFXjpsY9DVtTzFHjh5mhjnk9STwcUDXxkDbih9uMuDA4aC8jlHxGWDdvM5FVNWdi6roQ2jHghSr8bBitBZF6L7ljN8ccYE4TawoL0441pzJ8K254icBOqulV2iQKzX3YQGcF_DcVOor0xOADXnLZtjZ0QBlYsziZMKyNAO38XaO4tfbpYMdymZ1VmVYIJqOl2gARtVnbamJx41JFXcrrAKaAufFFtu6kBICryxlL707VoQmHUmYXnL9fuloed0M6H3xa-rk8moIsFeiRQSrwcyw1uyzoWHgFQqU_9AZBjwVR2RXA7ojZuK7fr4K0SkbNAWmhE8azI-e_k-rmQOWeItwsUx-z8is9WTiQ7XYpBjIx9rTFK9W6TfPedmyJ7u9xev03hT-NeJ5mt_NXVJObEffBRa5L3EUVj1vrGldGrIm__9pWCDQEmcOTS-GOyaI8ctB7jnFdK7_EUeRMffgdOrI08F4mIfbp-iwwK5mnt83MikH5xvtEN5CxTxa5OgS7mzQxCH3fJsA8AVIE_PHVhBJCaSqankV3cmw5muf4nm7CUS9tXK3g7GQTbFyFZn7NWl5SJt4aV7Al5nLYiPahaIMDbAlQK5qTYgi0WDnHugGv1zqmtlP7zMkttBcP-OOPRaoAuMzD-t4RzhkxiEBhPKUC14dztcHRszoWN60sN-eMl8-pz45bwGNNMaIYQDrjxfulvzX490LL3m2uLucU0L3RlHs-f-ePlXp6sdZI7r3SwRMsnikNudSrOchrnBYCTn8YHw_VbM4xAgKerCYx4kzJlfLOykTSgj9lcI-_hrlibJSNEH_SqHIiv9_vkGU-K0PsVfUTPRSXRhCJpEW_aD)
yands 评论于 1年前
所有者
前面的[实验](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/issues/12#issuecomment-22395)结果可以在同时有一个接收和一个发送任务是可以同时利用上行和下行的带宽,基本上可以并行的。 实现了一版基于 Async Reader / Writer Streaming 模式的 [client 代码](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore),但发现并不能充分利用上下行带宽。原因是该方式并不适用于同时有多个接收和多个发送任务的场景,会造成阻塞,程序不运行。 尝试了各种 grpc 的使用方案([方案代码](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore_streaming/bi_stream/multi_thread_exp)),结果如下: | 方案 | 时间 | | -------- | -------- | -------- | | Async Reader / Writer Streaming | async: 23 | | Async Iterator Streaming | async: 18 | | 协程的 loop.call_soon() | async: 18.5 | | 双线程,每个线程一个 channel | sync: 20 | | 双进程,每个进程一个 channel | sync: 14 | | 只 发送/接收 数据 | sync: 10 | > 调研了 grpc 的 grpc.aio 协程模块和 grfc 模块,都不支持收发同时, 只是让程序不用等待 send 函数而运行后面的代码,但 send 和 receive 同时的话是会阻塞的。 > > 所以grpc 一个 chanel 不能同时发送和接受信息。最后采用的方案就是:一个 client 开两个进程,每个进程开一个chanel(链接),一个用于发送,一个用于接收。 > 实验结果: > > | 同步收发 | 只发送 | 只接收 | 异步收发 | > | ----------------------------- | --------- | ------------ | -------- | > | 61s | 20s | 24s | 31s | > > > [实验代码](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore_streaming/bi_stream/multi_thread_exp) > client: > ``` > def read_process(): > streamin_join = init_grpc() > el = get_event_loop() > corots = (comunicate_one_chanel_read(streamin_join, 10)) > el.run_until_complete(corots) > > def write_process(): > streamin_join = init_grpc() > weights = [np.random.rand(1024,1024) for i in range(100)] > el = get_event_loop() > corots = (comunicate_one_chanel_write(streamin_join, weights, 10)) > el.run_until_complete(corots) > > > p1 = Process(target=write_process, args=()) > p2 = Process(target=read_process, args=()) > p1.start() > p2.start() > p1.join() > p2.join() > ``` > >
yands 评论于 1年前
所有者
从上次实验结果来看,采用多进程的方式来实现的效率最高,所以拟采用多进程的方式来通讯。但进程间是不共享内存的,所以进程间通讯需要对 message 进行序列化和反序列化,主要实现了 clientMessage 和 serverMessage 转换为 Dict 类型的功能。 [代码链接](https://git.openi.org.cn/PCL-Platform.Intelligence/AISynergy/src/branch/streamming/AISynergy-core/src/AISyncore/proto/protobuf_to_dict.py)
登录 并参与到对话中。
未选择里程碑
未指派成员
1 名参与者
通知
到期时间

未设置到期时间。

依赖任务

此任务当前没有任何依赖。

正在加载...
这个人很懒,什么都没留下。