rabbit-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1" port="5672"
username="guest" password="guest" virtual-host="/" requested-heartbeat="60"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" reply-timeout="10000"/>
<rabbit:admin connection-factory="connectionFactory"/>
<bean name="serializerMessageConverter" class="org.springframework.amqp.support.converter.SimpleMessageConverter"/>
<bean name="messagePropertiesConverter" class="org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter"/>
<bean name="RPCListener" class="com.example.mq.RPCListener"/>
<rabbit:queue name="RPC" auto-delete="false" durable="true" exclusive="false"/>
<rabbit:listener-container message-converter="serializerMessageConverter"
connection-factory="connectionFactory"
acknowledge="manual">
<rabbit:listener ref="RPCListener" queue-names="RPC"/>
</rabbit:listener-container>
</beans>
User.java:
public class User implements Serializable {
String name;
Boolean verification;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Boolean getVerification() {
return verification;
}
public void setVerification(Boolean verification) {
this.verification = verification;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", verification=" + verification +
'}';
}
}
SendMessage.java (Client):
public class SendMessage {
@Autowired
private RabbitTemplate amqpTemplate;
public String sendToRPC(String name) {
User user = new User();
user.setName(name == null ? "default" : name);
user.setVerification(false);
try {
MessageProperties msgProp = new MessageProperties();
Message msg = new Message(SerializationUtils.serialize(user), msgProp);
Object response = amqpTemplate.convertSendAndReceive("RPC", msg);
User userReply = SerializationUtils.deserialize((byte[]) response);
System.out.println("userReply:" + userReply);
return userReply.toString();
} catch (Exception e) {
System.out.println(e);
return e.getMessage();
}
}
}
RPCListener.java (Server):
public class RPCListener implements ChannelAwareMessageListener {
@Autowired
private MessagePropertiesConverter messagePropertiesConverter;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
User user = SerializationUtils.deserialize(message.getBody());
System.out.println("The message content is: " + user);
MessageProperties messageProperties = message.getMessageProperties();
AMQP.BasicProperties rabbitMQProperties = messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8");
System.out.println("The message's correlationId is:" + rabbitMQProperties.getCorrelationId());
User replyUser = new User();
replyUser.setName(user.getName() + "_reply");
replyUser.setVerification(!user.getVerification() ? true : user.getVerification());
channel.basicPublish("", rabbitMQProperties.getReplyTo(), rabbitMQProperties, SerializationUtils.serialize(replyUser));
channel.basicAck(messageProperties.getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
運行結果:
範例下載:https://github.com/zxcvb17295/SpringLab/tree/master/spring-rabbitmq