|
1 |
| -#!/usr/bin/env python |
| 1 | +#!/usr/bin/env python3 |
2 | 2 | # Copyright (c) 2014-2016 The Bitcoin Core developers
|
3 | 3 | # Distributed under the MIT software license, see the accompanying
|
4 | 4 | # file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
5 | 5 |
|
| 6 | +""" |
| 7 | + ZMQ example using python3's asyncio |
| 8 | +
|
| 9 | + Dash should be started with the command line arguments: |
| 10 | + dashd-testnet -daemon \ |
| 11 | + -zmqpubhashblock=tcp://127.0.0.1:28332 \ |
| 12 | + -zmqpubrawtx=tcp://127.0.0.1:28332 \ |
| 13 | + -zmqpubhashtx=tcp://127.0.0.1:28332 \ |
| 14 | + -zmqpubhashblock=tcp://127.0.0.1:28332 |
| 15 | +
|
| 16 | + We use the asyncio library here. `self.handle()` installs itself as a |
| 17 | + future at the end of the function. Since it never returns with the event |
| 18 | + loop having an empty stack of futures, this creates an infinite loop. An |
| 19 | + alternative is to wrap the contents of `handle` inside `while True`. |
| 20 | +
|
| 21 | + A blocking example using python 2.7 can be obtained from the git history: |
| 22 | + https://github.com/bitcoin/bitcoin/blob/37a7fe9e440b83e2364d5498931253937abe9294/contrib/zmq/zmq_sub.py |
| 23 | +""" |
| 24 | + |
6 | 25 | import binascii
|
| 26 | +import asyncio |
7 | 27 | import zmq
|
| 28 | +import zmq.asyncio |
| 29 | +import signal |
8 | 30 | import struct
|
| 31 | +import sys |
| 32 | + |
| 33 | +if not (sys.version_info.major >= 3 and sys.version_info.minor >= 5): |
| 34 | + print("This example only works with Python 3.5 and greater") |
| 35 | + exit(1) |
9 | 36 |
|
10 | 37 | port = 28332
|
11 | 38 |
|
12 |
| -zmqContext = zmq.Context() |
13 |
| -zmqSubSocket = zmqContext.socket(zmq.SUB) |
14 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock") |
15 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx") |
16 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtxlock") |
17 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashgovernancevote") |
18 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashgovernanceobject") |
19 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashinstantsenddoublespend") |
20 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawblock") |
21 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtx") |
22 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtxlock") |
23 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawgovernancevote") |
24 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawgovernanceobject") |
25 |
| -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawinstantsenddoublespend") |
26 |
| -zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) |
| 39 | +class ZMQHandler(): |
| 40 | + def __init__(self): |
| 41 | + self.loop = zmq.asyncio.install() |
| 42 | + self.zmqContext = zmq.asyncio.Context() |
| 43 | + |
| 44 | + self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) |
| 45 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") |
| 46 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") |
| 47 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock") |
| 48 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote") |
| 49 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject") |
| 50 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend") |
| 51 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") |
| 52 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx") |
| 53 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock") |
| 54 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote") |
| 55 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject") |
| 56 | + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend") |
| 57 | + self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) |
27 | 58 |
|
28 |
| -try: |
29 |
| - while True: |
30 |
| - msg = zmqSubSocket.recv_multipart() |
31 |
| - topic = str(msg[0].decode("utf-8")) |
| 59 | + async def handle(self) : |
| 60 | + msg = await self.zmqSubSocket.recv_multipart() |
| 61 | + topic = msg[0] |
32 | 62 | body = msg[1]
|
33 | 63 | sequence = "Unknown"
|
34 | 64 |
|
35 | 65 | if len(msg[-1]) == 4:
|
36 | 66 | msgSequence = struct.unpack('<I', msg[-1])[-1]
|
37 | 67 | sequence = str(msgSequence)
|
38 | 68 |
|
39 |
| - if topic == "hashblock": |
| 69 | + if topic == b"hashblock": |
40 | 70 | print('- HASH BLOCK ('+sequence+') -')
|
41 | 71 | print(binascii.hexlify(body).decode("utf-8"))
|
42 |
| - elif topic == "hashtx": |
| 72 | + elif topic == b"hashtx": |
43 | 73 | print ('- HASH TX ('+sequence+') -')
|
44 | 74 | print(binascii.hexlify(body).decode("utf-8"))
|
45 |
| - elif topic == "hashtxlock": |
| 75 | + elif topic == b"hashtxlock": |
46 | 76 | print('- HASH TX LOCK ('+sequence+') -')
|
47 | 77 | print(binascii.hexlify(body).decode("utf-8"))
|
48 |
| - elif topic == "rawblock": |
| 78 | + elif topic == b"rawblock": |
49 | 79 | print('- RAW BLOCK HEADER ('+sequence+') -')
|
50 | 80 | print(binascii.hexlify(body[:80]).decode("utf-8"))
|
51 |
| - elif topic == "rawtx": |
| 81 | + elif topic == b"rawtx": |
52 | 82 | print('- RAW TX ('+sequence+') -')
|
53 | 83 | print(binascii.hexlify(body).decode("utf-8"))
|
54 |
| - elif topic == "rawtxlock": |
| 84 | + elif topic == b"rawtxlock": |
55 | 85 | print('- RAW TX LOCK ('+sequence+') -')
|
56 | 86 | print(binascii.hexlify(body).decode("utf-8"))
|
57 |
| - elif topic == "rawinstantsenddoublespend": |
| 87 | + elif topic == b"rawinstantsenddoublespend": |
58 | 88 | print('- RAW IS DOUBLE SPEND ('+sequence+') -')
|
59 | 89 | print(binascii.hexlify(body).decode("utf-8"))
|
60 |
| - elif topic == "hashgovernancevote": |
| 90 | + elif topic == b"hashgovernancevote": |
61 | 91 | print('- HASH GOVERNANCE VOTE ('+sequence+') -')
|
62 | 92 | print(binascii.hexlify(body).decode("utf-8"))
|
63 |
| - elif topic == "hashgovernanceobject": |
| 93 | + elif topic == b"hashgovernanceobject": |
64 | 94 | print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
|
65 | 95 | print(binascii.hexlify(body).decode("utf-8"))
|
66 |
| - elif topic == "rawgovernancevote": |
| 96 | + elif topic == b"rawgovernancevote": |
67 | 97 | print('- RAW GOVERNANCE VOTE ('+sequence+') -')
|
68 | 98 | print(binascii.hexlify(body).decode("utf-8"))
|
69 |
| - elif topic == "rawgovernanceobject": |
| 99 | + elif topic == b"rawgovernanceobject": |
70 | 100 | print('- RAW GOVERNANCE OBJECT ('+sequence+') -')
|
71 | 101 | print(binascii.hexlify(body).decode("utf-8"))
|
72 |
| - elif topic == "hashinstantsenddoublespend": |
| 102 | + elif topic == b"hashinstantsenddoublespend": |
73 | 103 | print('- HASH IS DOUBLE SPEND ('+sequence+') -')
|
74 | 104 | print(binascii.hexlify(body).decode("utf-8"))
|
| 105 | + # schedule ourselves to receive the next message |
| 106 | + asyncio.ensure_future(self.handle()) |
| 107 | + |
| 108 | + def start(self): |
| 109 | + self.loop.add_signal_handler(signal.SIGINT, self.stop) |
| 110 | + self.loop.create_task(self.handle()) |
| 111 | + self.loop.run_forever() |
| 112 | + |
| 113 | + def stop(self): |
| 114 | + self.loop.stop() |
| 115 | + self.zmqContext.destroy() |
75 | 116 |
|
76 |
| -except KeyboardInterrupt: |
77 |
| - zmqContext.destroy() |
| 117 | +daemon = ZMQHandler() |
| 118 | +daemon.start() |
0 commit comments