Spring + RabbitMQ — RPC

G-mi
8 min readJun 15, 2019

--

日前在研究 RabbitMQ,期間有弄了一個簡單的範例,就剛好拿出來分享了。
該範例使用 Spring (非 SpringBoot) 模擬 Client 端使用 RabbitMQ 發送一個 User 物件至 Server 端處理,最後將處理後的 User 物件回傳給 Client 端。

rabbit-config.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672"
username="guest" password="guest" virtual-host="/" requested-heartbeat="60"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="10000"/>
<rabbit:admin connection-factory="connectionFactory"/>
<bean name="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"/>
<bean name="messagePropertiesConverter" class="org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter"/>

<bean name="RPCListener" class="com.example.mq.RPCListener"/>

<rabbit:queue name="RPC" auto-delete="false" durable="true" exclusive="false"/>

<rabbit:listener-container message-converter="serializerMessageConverter"
connection-factory="connectionFactory"
acknowledge="manual">
<rabbit:listener ref="RPCListener" queue-names="RPC"/>
</rabbit:listener-container>
</beans>

User.java:

public class User implements Serializable {
String name;
Boolean verification;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Boolean getVerification() {
return verification;
}

public void setVerification(Boolean verification) {
this.verification = verification;
}

@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", verification=" + verification +
'}';
}
}

SendMessage.java (Client):

public class SendMessage {

@Autowired
private RabbitTemplate amqpTemplate;

public String sendToRPC(String name) {
User user = new User();
user.setName(name == null ? "default" : name);
user.setVerification(false);

try {
MessageProperties msgProp = new MessageProperties();
Message msg = new Message(SerializationUtils.serialize(user), msgProp);
Object response = amqpTemplate.convertSendAndReceive("RPC", msg);
User userReply = SerializationUtils.deserialize((byte[]) response);
System.out.println("userReply:" + userReply);
return userReply.toString();
} catch (Exception e) {
System.out.println(e);
return e.getMessage();
}
}
}

RPCListener.java (Server):

public class RPCListener implements ChannelAwareMessageListener {

@Autowired
private MessagePropertiesConverter messagePropertiesConverter;

@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
User user = SerializationUtils.deserialize(message.getBody());
System.out.println("The message content is: " + user);

MessageProperties messageProperties = message.getMessageProperties();
AMQP.BasicProperties rabbitMQProperties = messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8");
System.out.println("The message's correlationId is:" + rabbitMQProperties.getCorrelationId());

User replyUser = new User();
replyUser.setName(user.getName() + "_reply");
replyUser.setVerification(!user.getVerification() ? true : user.getVerification());

channel.basicPublish("", rabbitMQProperties.getReplyTo(), rabbitMQProperties, SerializationUtils.serialize(replyUser));
channel.basicAck(messageProperties.getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}

運行結果:

範例下載:https://github.com/zxcvb17295/SpringLab/tree/master/spring-rabbitmq

--

--