IPWorks IoT 2020 Python Edition

Questions / Feedback?

on_message_in Event

Fires when a message is received; as well as when an attempt is made to fetch a message from a currently empty queue.

Syntax

class AMQPClassicMessageInEventParams(object):
  @property
  def channel_name() -> str: ...
  @property
  def consumer_tag() -> str: ...
  @property
  def delivery_tag() -> int: ...
  @property
  def redelivered() -> bool: ...
  @property
  def exchange_name() -> str: ...
  @property
  def routing_key() -> str: ...
  @property
  def message_count() -> int: ...
  @property
  def accept() -> int: ...
  @accept.setter
  def accept(value) -> None: ...

# In class AMQPClassic:
@property
def on_message_in() -> Callable[[AMQPClassicMessageInEventParams], None]: ...
@on_message_in.setter
def on_message_in(event_hook: Callable[[AMQPClassicMessageInEventParams], None]) -> None: ...

Remarks

This event fires anytime a message is received. There are two possible ways for the class to receive a message:

  • Messages can be asynchronously pushed to the class from the server. At any point in time, the server may push a message to the class from a queue that the consume method has been used to attach a consumer to.
  • Messages can be synchronously pulled from the server by the class. The fetch_message method is used to attempt to pull (or "fetch") messages from a specific queue.

This event also fires anytime fetch_message is called against a queue that currently has no messages available to pull. This is a special case, and results in only the ChannelName and MessageCount event parameters being populated.

Other than that special case, and any exceptions noted below, this event's parameters are populated the same way regardless of the manner in which messages are received.

ChannelName always reflects the name of the associated channel.

ConsumerTag reflects the consumer tag associated with the consumer that caused the server to push the message to the class. (ConsumerTag is always empty for messages pulled from the server by fetch_message since no consumers are involved.)

DeliveryTag reflects the server-assigned, channel-specific delivery tag number for the incoming message.

Redelivered reflects whether the server is redelivering a message that is has delivered previously.

ExchangeName reflects the name of the exchange to which the incoming message was originally published. (If the message was originally published to the server's default exchange, whose name is always the empty string, ExchangeName will also be empty.)

RoutingKey reflects the routing key that the message was originally published with.

MessageCount is always -1 when this event fires due to a message being pushed to the class by the server. When this event fires as a result of fetch_message being called, MessageCount reflects the number of messages still available in the queue the class tried to pull a message from (even if there were no messages available to pull).

The Accept parameter can be set to control how the class responds to the incoming message, if it needs to be acknowledged (if the message doesn't need to be acknowledged, the value set to the Accept parameter is ignored). Valid values are:

  • 0 - default: Accept the message; send a positive acknowledgement.
  • 1: Silently accept the message; don't send any acknowledgement.
  • 2: Accept the message; send a cumulative positive acknowledgement coving this, and all previously unacknowledged, messages.
  • 3: Reject the message; send a negative acknowledgement for it, and instruct the server not to return it to the queue.
  • 4: Reject the message; send a negative acknowledgement for it, and instruct the server to return it to the queue.
If the RabbitMQCompatible configuration setting is enabled, then the NackMultiple configuration setting can be used to control whether the two "reject" options (3 and 4) should function as cumulative or singular negative acknowledgements. By default NackMultiple is disabled, and all negative acknowledgements are singular.

If the value provided for the Accept parameter isn't one of those described above, the default (0) will be used instead.

Receiving a Message

// MessageIn event handler.
amqpc1.OnMessageIn += (s, e) => {
  if (e.MessageCount == -1) {
    // The server pushed a message to us asynchronously due to a consumer we created.
    Console.WriteLine("The server pushed this message to us via consumer '" + e.ConsumerTag + "':");
    Console.WriteLine(amqpc1.ReceivedMessage.Body);
  } else if (e.DeliveryTag > 0) {
    // We pulled a message from a queue with the FetchMessage() method.
    Console.WriteLine("Message successfully pulled:");
    Console.WriteLine(amqpc1.ReceivedMessage.Body);
    Console.WriteLine(e.MessageCount + " messages are still available to pull.");
  } else {
    // We tried to pull a message, but there were none available to pull.
    Console.WriteLine("No messages available to pull.");
  }
};

// Attach a consumer to "MyQueue".
amqpc1.Consume("channel", "MyQueue", "consumerTag", false, true, false, false);

// Or, try to fetch a message from "MyQueue".
amqpc1.FetchMessage("channel", "MyQueue", true);

Copyright (c) 2022 /n software inc. - All rights reserved.
IPWorks IoT 2020 Python Edition - Version 20.0 [Build 8265]