binding代表了exchange和queue之间的关系,说明了queue对哪些消息感兴趣
channel.queueBind(queueName,EXCHANGE_NAME,"black")
Direct exchange
上面是将所有的消息发送给消费者,我们想要通过特殊的标示去筛除一些消息,这样可以减少磁盘的浪费,可以关注更多特定的消息
这时可以使用direct exchange binding key matches routing key
x绑定了两个queue,一个queue 的binding key是 orange ,另一个是queue 的binding key是 black,green
其他消息将会被丢弃
将多个队列绑定同一个binding key 也是允许的,这就相当于一种广播的形式
日志类型消息模型如下
public class RemitLogDirect {
public static final String EXCHANGE_NAME = "direct_log";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String serverity = getServerity(args);
String message = getMessage(args);
channel.basicPublish(EXCHANGE_NAME, serverity, null, message.getBytes(Charset.defaultCharset()));
System.out.println(" [x] send '" + serverity + ":" + message + "'");
}
}
private static String getMessage(String[] args) {
Preconditions.checkNotNull(args, "传入参数%s不能为空", args);
String backMessage = "";
for (String type : args) {
//some info
}
return backMessage;
}
private static String getServerity(String[] args) {
Preconditions.checkNotNull(args, "传入参数%s不能为空", args);
for (String type : args) {
if (type.equals("error")) {
return "error";
}
if (type.equals("info")) {
return "info";
}
if (type.equals("warn")) {
return "warn";
}
}
return "undefined";
}
}
public class ReceiveLogsDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();
if (args.length < 1) {
System.out.println("Usages ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}
for (String serverity : args) {
channel.queueBind(queueName, EXCHANGE_NAME, serverity);
}
System.out.println("[x] waiting for messages ,to exit press c");
DeliverCallback deliverCallback = ((consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("[x] received ...." + message);
});
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}