发布于 2015-12-14 00:28:48 | 313 次阅读 | 评论: 0 | 来源: PHPERZ
这里有新鲜出炉的RabbitMQ 能为你做些什么?,程序狗速度看过来!
RabbitMQ 开源消息队列系统
RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继承了这些优点。
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
可以将一些无需即时返回且耗时的操作提取出来,进行异步处理(比如我们项目订单系统中出票成功后记报表、发短信等操作)
这样可以减轻服务器压力,大大节省服务器的请求响应时间,同时利于系统解耦。
生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType:
如果是Direct类型,则会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
如果是Fanout类型,则会将消息发送给所有与该Exchange定义过Binding的所有Queues中去,其实是一种广播行为。
如果是Topic类型,则会按照正则表达式,对RoutingKey与BindingKey进行匹配,如果匹配成功,则发送到对应的Queue中。
建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
创建通道
Channel channel = conn.createChannel();
申明exchange、queue以及它们之间的绑定关系
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
发送消息
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
一种是通过basic.consume
命令,订阅某一个队列中的消息,Channel会自动在处理完上一条消息之后,接收下一条消息(同一个Channel消息处理是串行的)。除非关闭Channel或者取消订阅,否则客户端将会一直接收队列的消息。
另外一种方式是通过basic.get
命令主动获取队列中的消息,但是绝对不可以通过循环调用basic.get来代替basic.consume,这是因为basic.get在实际执行的时候,是首先Consume某一个队列,然后检索第一条消息,然后再取消订阅。如果是高吞吐率的消费者,最好还是建议使用basic.consume。
如果有多个消费者同时订阅同一个队列的话,RabbitMQ是采用循环的方式分发消息
的,每一条消息只能被一个订阅者接收。例如,有队列Queue,其中ClientA和ClientB都Consume了该队列,MessageA到达队列后,被分派到ClientA,ClientA回复服务器收到响应,服务器删除MessageA;再有一条消息MessageB抵达队列,服务器根据“循环推送”原则,将消息会发给ClientB,然后收到ClientB的确认后,删除MessageB;等到再下一条消息时,服务器会再将消息发送给ClientA。
这里我们可以看出,消费者再接到消息以后,都需要给服务器发送一条确认命令,这个既可以在handleDelivery里显示的调用basic.ack实现,也可以在Consume某个队列的时候,设置autoACK属性为true实现。这个ACK仅仅是通知服务器可以安全的删除该消息,而不是通知生产者。 如果消费者在接到消息以后还没来得及返回ACK就断开了连接,消息服务器会重传
该消息给下一个订阅者,如果没有订阅者就会存储该消息。
既然RabbitMQ提供了ACK某一个消息的命令,当然也提供了Reject某一个消息的命令。当客户端发生错误,调用basic.reject命令拒绝某一个消息时,可以设置一个requeue的属性,如果为true,则消息服务器会重传该消息给下一个订阅者;如果为false,则会直接删除该消息。当然,也可以通过ack,让消息服务器直接删除该消息并且不会重传。