"""Minimal AMQP consumer example built on amqplib.

Declares a durable queue and a direct exchange, binds them with the routing
key "Test", then prints the body of every message delivered to the queue
until the process is interrupted.
"""
from amqplib import client_0_8 as amqp

# Connect to the server and open a channel on that connection.
lConnection = amqp.Connection(
    host="localhost:5672",
    userid="guest",
    password="guest",
    virtual_host="/",
    insist=False,
)
lChannel = lConnection.channel()

# Create the queue. Queues receive messages.
# durable=True asks the broker to keep the queue definition across a broker
# restart. auto_delete=False means the queue hangs around after all of its
# consumers disconnect; with auto_delete=True it would be removed when the
# last consumer went away. exclusive=False lets other connections use the
# queue as well; we want the server side to be able to put stuff into it.
lChannel.queue_declare(
    queue="myClientQueue",
    durable=True,
    exclusive=False,
    auto_delete=False,
)

# Create an exchange. Exchanges publish messages to queues.
# durable and auto_delete have the same meaning as for a queue.
# type selects the kind of exchange - valid values are fanout, direct, topic.
lChannel.exchange_declare(
    exchange="myExchange",
    type="direct",
    durable=True,
    auto_delete=False,
)

# Tie the queue to the exchange. Messages arriving at the exchange are routed
# to the queue, but only if they arrive with the routing key given here.
lChannel.queue_bind(
    queue="myClientQueue",
    exchange="myExchange",
    routing_key="Test",
)


def data_receieved(msg):
    """Print the body of a message delivered to the queue.

    msg is the message object handed to the consumer callback; only its
    ``body`` attribute is read.
    """
    # Single-argument parenthesised form prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print('Received: ' + msg.body)


# Connect the queue to the callback function.
# no_ack defaults to False, in which case the consumer must acknowledge each
# message explicitly and unacknowledged messages are delivered again.
# no_ack=True, as used here, tells the server NOT to expect acknowledgements:
# a message counts as handled as soon as it is delivered, so it will not be
# sent again if the callback fails.
lChannel.basic_consume(
    queue='myClientQueue',
    no_ack=True,
    callback=data_receieved,
    consumer_tag="TestTag",
)

# Wait for things to arrive on the queue. The loop only ends through an
# exception (e.g. Ctrl-C), so the cleanup lives in ``finally`` blocks;
# placed after the loop as plain statements it could never run.
try:
    while True:
        lChannel.wait()
finally:
    try:
        # Unregister the message notification callback, then close the channel.
        lChannel.basic_cancel("TestTag")
        lChannel.close()
    finally:
        # Close the connection even if cancelling/closing the channel failed.
        lConnection.close()