python ZMQ 详解

python zmq操作

  • 创建和销毁套接字:zmq_socket(), zmq_close()
  • 配置和读取套接字选项:zmq_setsockopt(), zmq_getsockopt()
  • 为套接字建立连接:zmq_bind(), zmq_connect()
  • 发送和接收消息:zmq_send(), zmq_recv()

zmq 地址

格式:transport://address

例如:

ipc:///tmp/feeds/0
tcp://127.0.0.1:5000

zmq和tcp区别

  • 使用多种协议,inproc(进程内)、ipc(进程间)、tcp、pgm(广播)、epgm;

  • 当客户端使用zmq_connect()时连接就已经建立了,并不要求该端点已有某个服务使用zmq_bind()进行了绑定;

  • 连接是异步的,并由一组消息队列做缓冲;

  • 连接会表现出某种消息模式,这是由创建连接的套接字类型决定的;

  • 一个套接字可以有多个输入和输出连接;

  • ZMQ没有提供类似zmq_accept()的函数,因为当套接字绑定至端点时它就自动开始接受连接了;

zmq 主要消息模式

  • req/rep(请求答复模式):主要用于远程调用及任务分配等。
  • pub/sub(订阅模式):主要用于数据分发。
  • push/pull(管道模式):主要用于多任务并行。

Request-Reply模式

客户端在请求后,服务端必须回响应

server:

import time
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")

while True:
    message = socket.recv()
    print(message)
    #time.sleep(1)
    socket.send("server response!")
    
    
client:

import zmq
import sys

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

while(True):
    data = raw_input("input your data:")
    if data == 'q':
        sys.exit()

    socket.send(data)

    response = socket.recv();
    print(response)

Publish-Subscribe模式

广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤。

server:

import zmq 
context = zmq.Context()  
socket = context.socket(zmq.PUB)  
socket.bind("tcp://127.0.0.1:5000")  
while True:  
    msg = raw_input('input your data:') 
    socket.send(msg)
    
client:

import time
import zmq  
context = zmq.Context()  
socket = context.socket(zmq.SUB)  
socket.connect("tcp://127.0.0.1:5000")
# 这里设置的是过滤条件,不然无法收到消息 
socket.setsockopt(zmq.SUBSCRIBE,'') 
while True:  
    print  socket.recv()

push/pull(管道模式)

由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。当连接被断开,数据不会丢失,重连后数据继续发送到对端。

server:

import zmq

context = zmq.Context()

socket = context.socket(zmq.PULL)
socket.bind('tcp://*:5558')

while True:
    data = socket.recv()
    print data
    
work:

import zmq

context = zmq.Context()

recive = context.socket(zmq.PULL)
recive.connect('tcp://127.0.0.1:5557')

sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')

while True:
    data = recive.recv()
    sender.send(data)
    
    
client:

import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)

socket.bind('tcp://*:5557')

while True:
    data = raw_input('input your data:')
    socket.send(data)

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×