java延时队列使用

2017-02-13 - 8,990 Views - 0 Goods - Nothing

在实际的业务中会遇到如下场景:

1)过30秒给商家发送用户下单成功通知
2)过1分钟失败任务重试
3)过1小时发送邮件
等等,需要延时一段时间处理,在Java的juc包中给我提供了DelayQueue延时队列处理,过一会该处理的事儿。
DelayQueue<E extends Delayed>,一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
下面通过代码演示这一场景:

1.新建消息实现Delayed接口


package com.songliguo.com;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** 
 * @ClassName: Message 
 * @Description: TODO(消息延迟实体) 
 * @author SongLiGuo
 * @date 2017-2-13 下午4:22:03 
 *  
 */
public class Message implements Delayed{

	private int id;
	private String body;//消息体
	private long excuteTime;//执行时间
	
	/**    
	 * @author SongLiGuo       
	 * @created 2017-2-13 下午4:30:52 
	 * @return type 
	 */
	
	public int getId() {
		return id;
	}

	/**     
	 * @author SongLiGuo       
	 * @created 2017-2-13 下午4:30:52         
	 * @param id   
	 */
	public void setId(int id) {
		this.id = id;
	}

	/**    
	 * @author SongLiGuo       
	 * @created 2017-2-13 下午4:30:52 
	 * @return type 
	 */
	
	public String getBody() {
		return body;
	}

	/**     
	 * @author SongLiGuo       
	 * @created 2017-2-13 下午4:30:52         
	 * @param body   
	 */
	public void setBody(String body) {
		this.body = body;
	}

	/**    
	 * @author SongLiGuo       
	 * @created 2017-2-13 下午4:30:52 
	 * @return type 
	 */
	
	public long getExcuteTime() {
		return excuteTime;
	}

	public Message(int id, String body,long delayTime) {    
        this.id = id;    
        this.body = body;    
        this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();    
    }   
	
	/**     
	 * @author SongLiGuo       
	 * @created 2017-2-13 下午4:30:52         
	 * @param excuteTime   
	 */
	public void setExcuteTime(long excuteTime) {
		this.excuteTime = excuteTime;
	}

	public int compareTo(Delayed delayed) {
		Message msg = (Message)delayed;
		return Integer.valueOf(this.id)>Integer.valueOf(msg.id)?1:( Integer.valueOf(this.id)<Integer.valueOf(msg.id)?-1:0);    
	}

	public long getDelay(TimeUnit unit) {
		 return  unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);     
	}

	
}

2.创建消费者


package com.songliguo.com;

import java.util.concurrent.DelayQueue;

/** 
 * @ClassName: Consumer 
 * @Description: TODO(消费者) 
 * @author SongLiGuo
 * @date 2017-2-13 下午5:00:35 
 *  
 */
public class Consumer implements Runnable {

	//延时队列
	private DelayQueue queue;
	
	public Consumer(DelayQueue queue){
		this.queue = queue;
	}
	
	public void run() {
		while (true) {
			try {
				Message take = queue.take();
				System.out.println("消费消息ID=="+ take.getId() + "消费消息内容==" + take.getBody());
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}

	//test延迟时队列发送消息
	public static void main(String[] args) {
		//创建延时队列
		DelayQueue queue = new DelayQueue();
		//添加延时队列消息,m1 延时3s
		Message m1 = new Message(1, "你好我是延迟队列消息1……", 3000);
		//添加延时队列消息, m2延时5s
		Message m2 = new Message(2, "你好我是延迟队列消息2……", 5000);
		//添加延时队列消息, m3延时10s
		Message m3 = new Message(3, "你好我是延迟队列消息3……", 10000);
		queue.offer(m1);
		queue.offer(m2);
		queue.offer(m3);
		//启动消费线程
		new Thread(new Consumer(queue)).start();
	}
	
}

3.向延时队列发送消息测试结果如下:


消费消息ID==1消费消息内容==你好我是延迟队列消息1……
消费消息ID==2消费消息内容==你好我是延迟队列消息2……
消费消息ID==3消费消息内容==你好我是延迟队列消息3……
转载请注明原文链接:首页 -> 技术交流 -> JAVA开发 -> java延时队列使用
  • 支付宝打赏
  • 微信打赏

China.BeiJing

如果说人生是自我编写的程序,那么青春就是其中意味深长的代码