大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全家桶1年46,售后保障稳定
文章目录
ZMQ 通信协议小结 ?
最近有时间了把这个坑填一填!!!
前言 ?
- 项目中涉及到 zmq通信协议相关内容,所以将学习、使用过程同步分享
- 通篇以代码分享为主,且本文对底层socket不做过多叙述,以实际应用为准,希望能帮到各位!
- Talk is cheap, Show me the code
zmq的三种模型 ?
1、Request_Reply模式(请求——应答): REP、 REQ ☎️
- 一发一收 无缓存 断开连接数据丢失;
- 生产中也可以一个server对应多个client;
- 双向消息,
REP(server)
端必须recv到REQ(client)
的消息之后,调用send返回,否则通道堵塞; 相同的REQ(client)
端负责send消息到REP(server)
,然后调用recv获取REP(server)
的返回;
伪代码
- server.py
# 1、Request_Reply模式 # server import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind('tcp://*:5556') while True: message = socket.recv() print(message) socket.send('server response')
- client.py
# client import zmqimport syscontext = zmq.Context()socket = context.socket(zmq.REQ)socket.connect('tcp://localhost:5556')while True: data = raw_input('input your data:') if data == 'q': sys.exit() socket.send(data) response = socket.recv() print(response)
应用场景
- 场景说明:
我们定义一个非阻塞
的消息通道, 用作发送特定的Python结构体数据,包含三个文件如下: - Code:
server.py
import time import zmq from data import zmqStruct context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5656") while True: try: message = socket.recv_pyobj(zmq.NOBLOCK) print(message) #time.sleep(1) socket.send_pyobj('123123123') except zmq.Again as e: if e.errno!=zmq.EAGAIN: print(repr(e)) time.sleep(1)
-
client.py
from data import zmqStruct def zmqREQ(): import zmq context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://{}:5656".format('192.168.24.107')) return socket sendStruct = zmqStruct() zmqClient = zmqREQ() zmqClient.send_pyobj(sendStruct) print zmqClient.recv_pyobj()
-
data.py
class zmqStruct(onject): # 消息结构体 def __init__(self, cmd=0, data=None, desc=''): self.cmd = cmd self.data = data self.desc = desc
2、Publish-Subscribe模式(发布——订阅): PUB、SUB ?
- 广播所有client,无缓存,断开连接数据丢失。(当然所有的问题都可以通过增加中间层的方式解决);
- 发布端发布主题topic,订阅端只会收到已订阅的主题topic;
- PUB端发送消息,SUB端接受消息;
- SUB可以注册多个PUB;
- 如果PUB没有任何SUB,那么消息将会被丢弃;
- SUB端消费过慢,消息则堆积到PUB端
- 单工-单向数据传输
伪代码
- server.py
# 2、Publish-Subscribe模式 # server import zmq context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5005") while True: msg = input('input your data:').encode('utf-8') socket.send(msg)
- client.py
# client import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect('tcp://127.0.0.1:5005') # 使用socket.setsockopt()进行过滤 socket.setsockopt(zmq.SUBSCRIBE,b'') while True: print(socket.recv_string())
应用场景
-
场景说明:
我们假定 有一个任务调度器 , 结构为1个 master 对应 10个 slave
, master接受任务,将任务投递给 slave. -
Code:
- master.py
import time import zmq context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5000") tasks = [i for i in range(100)] def pub(): # 这个延时 是为了服务端绑定 socket 后会等待200毫秒避免消息丢失; 也是为了保证服客户端环境完备的折中之举 time.sleep(1) for i in tasks: socket.send(str(i)) if __name__ == '__main__': pub()
- slave.py
import time import threading from concurrent.futures import ThreadPoolExecutor import zmq context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://127.0.0.1:5000") socket.setsockopt(zmq.SUBSCRIBE, '') threadpool = ThreadPoolExecutor(10) def submsg(): """ socket 接受消息使用 `zmq.NOBLOCK` 非阻塞模式来进行,可以保证保证循环体内的其他功能正常使用 :return: """ while 1: try: msg = socket.recv(flags=zmq.NOBLOCK) except zmq.Again as e: if e.errno != zmq.EAGAIN: print(repr(e)) else: print '接收到广播消息,线程池投递任务 msg={}'.format(msg) threadpool.submit(work, msg) def work(msg): print '开始工作 参数{}'.format(msg) time.sleep(2) # 模拟功能执行时间 print '结束工作' if __name__ == '__main__': submsg()
- master.py
3、Parallel Pipeline模式(push——pull): PUSH、PULL ?
- 管道模式(单工) – 单向通道;
- 可以由三部分组成:push推送数据,work缓存数据,pull竞争数据,断开连接数据不丢失,重连继续发送。work中间件可以去掉;
伪代码
- server.py
# 3、Parallel Pipeline模式 # server import zmq context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind('tcp://*:5566') while True: data = socket.recv() print(data)
- work.py
# work 无work push 会阻塞掉 import zmq context = zmq.Context() recive = context.socket(zmq.PULL) recive.connect('tcp://127.0.0.1:5565') sender = context.socket(zmq.PUSH) sender.connect('tcp://127.0.0.1:5566') while True: data = recive.recv() sender.send(data)
- client.py
# client import zmq import time context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind('tcp://*:5565') while True: data = raw_input('input your data:') socket.send(data)
应用场景
-
场景说明:
-
Code:
-
Error:
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/203674.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...