博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ(6)-Spring AMQP,Spring集成RabbitMQ
阅读量:6344 次
发布时间:2019-06-22

本文共 4971 字,大约阅读时间需要 16 分钟。

hot3.png

一.Qucik Start

1.rabbitmq-producer.xml

    
    
    
    
    
    
    
    
    
        
            
        
        
    
        
            
        
    
    
    
    
    
    
    
    
    
   
    

2.rabbitmq-consumer.xml

    
    
    
    
    
    
    
        
            
        
    
    
    
        
            
        
        
   
    
    
    
        
        
        
        
        
        
        
        
        
    
        
    

3.监听器(接收器):

import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageListener;import java.io.UnsupportedEncodingException;public class ConsumeMessage implements MessageListener {    @Override    public void onMessage(Message message) {        try {            //使用fastJson将数据对象转换为json数据            String receiveMsg =new String(message.getBody(),"utf-8");            System.out.println("Receiver msg:" + receiveMsg);        } catch (UnsupportedEncodingException e) {            e.printStackTrace();        }    }}

4.Producer:

import org.springframework.amqp.core.AmqpTemplate;import org.springframework.context.ApplicationContext;import org.springframework.context.support.GenericXmlApplicationContext;public class Producer {    public static void main(String[] args) throws InterruptedException {        ApplicationContext context =                new GenericXmlApplicationContext("rabbitmq-producer.xml");        AmqpTemplate template = context.getBean(AmqpTemplate.class);        //direct模式:接收routing-key=queue_one_key的消息        //template.convertAndSend("queue_one_key", "hello!");        //topic模式:以foo.* routing-key为模版接收消息        template.convertAndSend("foo.bar", "hello!");        //fanout模式:在集群范围内的所有consumer都会收到消息        //template.convertAndSend("hello!");        System.out.println("send message:hello world!");    }}

5.consumer:

import org.springframework.context.ApplicationContext;import org.springframework.context.support.GenericXmlApplicationContext;public class Cosumer {    public static void main(String[] args) {        ApplicationContext context =                new GenericXmlApplicationContext("rabbitmq-consumer.xml");    }}

6.json数据转换器:

import com.alibaba.fastjson.JSON;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessageProperties;import org.springframework.amqp.support.converter.AbstractMessageConverter;import org.springframework.amqp.support.converter.MessageConversionException;import java.io.UnsupportedEncodingException;public class FastJsonMessageConverter  extends AbstractMessageConverter {    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);    public static final String DEFAULT_CHARSET = "UTF-8";    private volatile String defaultCharset = DEFAULT_CHARSET;    public FastJsonMessageConverter() {        super();        //init();    }    public void setDefaultCharset(String defaultCharset) {        this.defaultCharset = (defaultCharset != null) ? defaultCharset                : DEFAULT_CHARSET;    }    public Object fromMessage(Message message)            throws MessageConversionException {        return null;    }    public 
 T fromMessage(Message message,T t) {        String json = "";        try {            json = new String(message.getBody(),defaultCharset);        } catch (UnsupportedEncodingException e) {            e.printStackTrace();        }        return (T) JSON.parseObject(json, t.getClass());    }    protected Message createMessage(Object objectToConvert,                                    MessageProperties messageProperties)            throws MessageConversionException {        byte[] bytes = null;        try {            String jsonString = JSON.toJSONString(objectToConvert);            //FastJson.toJson(objectToConvert);            bytes = jsonString.getBytes(this.defaultCharset);        } catch (UnsupportedEncodingException e) {            throw new MessageConversionException(                    "Failed to convert Message content", e);        }        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);        messageProperties.setContentEncoding(this.defaultCharset);        if (bytes != null) {            messageProperties.setContentLength(bytes.length);        }        return new Message(bytes, messageProperties);    }}

转载于:https://my.oschina.net/lzhaoqiang/blog/548073

你可能感兴趣的文章
放低自己,你其实没有自己想象的那么重要
查看>>
[WASM + Rust] Debug a WebAssembly Module Written in Rust using console.log
查看>>
.net core 使用Redis的发布订阅
查看>>
C++ 构造和析构
查看>>
md5目录下的文件包括子目录
查看>>
Redis连接
查看>>
web页面的绝对路径
查看>>
tomcat架构分析(valve机制)
查看>>
异步化,高并发大杀器
查看>>
Windows + Ubuntu双系统时间不一致
查看>>
6811汇编语言
查看>>
LINQ 模糊搜索
查看>>
JS阻止冒泡方法(转)
查看>>
Linux下XAMPP装完之后,Navicat无法连上数据库的问题的解决 注意'mypassword'是当前的mysql登录密码...
查看>>
javascript之求最值
查看>>
终止java线程的2种方法
查看>>
Node.js使用的场景 (翻译自Node.js早期贡献者Felix的文章)
查看>>
Loadrunner windows计数器
查看>>
iOS开发UI篇—UITableviewcell的性能问题
查看>>
Intel 被 ARM 逼急了
查看>>