# NOTE: This file is a collection of independent example scripts. Each
# "===" / "==" header below starts a separate script with its own imports
# and its own entry point; copy a section into its own file to run it.

# === publisher ===
import time
import zmq
import random


def test():
    """Publish 100 random integers on a PUB socket bound to tcp://*:12222.

    Each message is the dict ``{'dic_test': <int in 1..100>}`` sent with
    ``send_pyobj``. Prints "Done" once every message has been handed to
    the socket, then releases the socket and the context.
    """
    context = zmq.Context()
    publisher = context.socket(zmq.PUB)
    try:
        publisher.bind("tcp://*:12222")
        # Pause before the first send; presumably this gives subscribers
        # time to connect so the first messages are not lost -- confirm.
        time.sleep(0.2)
        rand_lst = [random.randint(1, 100) for _ in range(100)]
        for i in rand_lst:
            publisher.send_pyobj({'dic_test': i})
        print("Done")
    finally:
        # Release the socket and context even if bind/send raised.
        publisher.close()
        context.term()


if __name__ == "__main__":
    test()

# === subscriber ===
import time
import zmq


def test():
    """Connect a SUB socket to tcp://127.0.0.1:12222 and print every message.

    Subscribes with an empty prefix (all messages) and loops forever; the
    socket and context are released when the loop is interrupted.
    """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.SUB)
    try:
        sock.connect("tcp://127.0.0.1:12222")
        # Bytes literal: identical to "" on Python 2 and the type pyzmq
        # requires for this option on Python 3.
        sock.setsockopt(zmq.SUBSCRIBE, b"")
        while True:
            # SECURITY: recv_pyobj unpickles the payload; only connect to
            # publishers you trust.
            val = sock.recv_pyobj()
            print(val)
    finally:
        sock.close()
        ctx.term()


if __name__ == "__main__":
    test()

# ======== txZMQ(Publisher,Subscriber) + Twisted =========
from twisted.internet import reactor, defer
from txZMQ import ZmqFactory, ZmqEndpoint, ZmqPubConnection, ZmqSubConnection
import time

zf = ZmqFactory()
# NOTE(review): the publisher binds port 10002 while the subscriber
# connects to port 10001, so this process does not receive its own
# messages. Presumably a peer runs with the two ports swapped -- confirm.
b = ZmqEndpoint('bind', 'tcp://127.0.0.1:10002')
c = ZmqEndpoint('connect', 'tcp://127.0.0.1:10001')
p = ZmqPubConnection(zf, b)
s = ZmqSubConnection(zf, c)
s.subscribe("")


def doPrint(*args):
    """Print whatever positional arguments the subscriber hook receives."""
    print(args)


# Installed as the subscriber connection's gotMessage hook.
s.gotMessage = doPrint


def publish():
    """Publish one stringified dict with an empty tag, then re-arm itself.

    Reschedules itself through ``reactor.callLater`` with a 1 second delay.
    """
    data = str({'action': '1', 'params': {'test': 'data'}})
    p.publish(data, '')
    reactor.callLater(1, publish)


publish()
reactor.run()

# ======== txZMQ(Request,Reply) + Twisted =========
# == Reply
# The reactor import was missing from this script although it ends with
# reactor.run().
from twisted.internet import reactor
from txZMQ import ZmqFactory, ZmqEndpoint
from txZMQ.xreq_xrep import ZmqXREPConnection
from txZMQ.connection import ZmqEndpointType
from txZMQ import ZmqFactory, ZmqEndpoint, ZmqPubConnection, ZmqSubConnection

zf = ZmqFactory()
ZmqXREPConnection.identity = 'ZMQTEST'
router = ZmqXREPConnection(
    zf, ZmqEndpoint(ZmqEndpointType.Bind, 'tcp://127.0.0.1:10022'))


def gotMessageEvent(message_id, *args):
    """Reply to ``message_id`` with the text 'Got <args tuple>'."""
    router.reply(message_id, 'Got {0!s}'.format(args))


# Installed as the XREP connection's gotMessage hook.
router.gotMessage = gotMessageEvent
reactor.run()

# == Request
from twisted.internet import reactor, defer
from txZMQ import ZmqFactory, ZmqEndpoint
from txZMQ.xreq_xrep import ZmqXREQConnection
from txZMQ.connection import ZmqEndpointType
from txZMQ import ZmqFactory, ZmqEndpoint, ZmqPubConnection, ZmqSubConnection

zf = ZmqFactory()
ZmqXREQConnection.identity = 'ZMQTEST'
router = ZmqXREQConnection(
    zf, ZmqEndpoint(ZmqEndpointType.Connect, 'tcp://127.0.0.1:10022'))
count = 0


def get_next_id():
    """Return the next id, 'msg_id_<n>', from a module-level counter."""
    global count
    count += 1
    return 'msg_id_{0!s}'.format(count)


# Replaces the connection's private id generator with the counter above.
router._getNextId = get_next_id


def cb(result):
    """Print the value delivered to the deferred returned by sendMsg."""
    print(result)


for i in range(5):
    msg = ['sending', 'sending{0!s}'.format(i)]
    d = router.sendMsg(*msg)
    d.addCallback(cb)
reactor.run()