博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
一个高效的Scheduler
阅读量:5752 次
发布时间:2019-06-18

本文共 12347 字,大约阅读时间需要 41 分钟。

Scheduler

package com.schedular; 


import java.util.concurrent.ScheduledFuture; 

import java.util.concurrent.ScheduledThreadPoolExecutor; 

import java.util.concurrent.ThreadPoolExecutor; 

import java.util.concurrent.TimeUnit; 


/** 
* A scheduler that internally uses a thread pool to permit concurrent execution    
* of scheduled tasks. This scheduler is preferable to a {@link java.util.Timer}    
* when the execution of one task may block long enough to delay execution of    
* other tasks. 
*/
 

public 
class Scheduler 
implements ShutdownCallback { 

         

        
/** 
         * Specify no initial delay when scheduling a task.    
         */
 

        
public 
static 
final 
long NO_INITIAL_DELAY = 0; 

         

        
private 
final ScheduledThreadPoolExecutor _executor; 

         

        
/** 
         * Creates a scheduler. 
         * 
         * @param poolSize The thread pool size. 
         * @throws IllegalArgumentException if the pool size is less than or equal to zero. 
         */
 

        
public Scheduler(
int poolSize) { 

                
if (poolSize <= 0) { 

                        
throw 
new IllegalArgumentException(
"illegal pool size: "+poolSize); 

                } 

                 

                LoggingThreadGroup threadGroup = 
new LoggingThreadGroup(
"SchedulerGroup"); 

                 

                
// set pool threads as daemons in case an orderly shutdown does not occur 

                ThreadGroupFactory tFactory=    

                        
new ThreadGroupFactory(threadGroup, 
"Scheduler-"); 

                tFactory.createDaemonThreads(
true);             

                 

                _executor =    

                     
new ScheduledThreadPoolExecutor(poolSize,    

                                                                                     tFactory,    

                                                                                     
new ThreadPoolExecutor.DiscardPolicy()); 

                 

                
// delayed tasks should not execute after shutdown 

                _executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(
false); 

                

             
// prestart one thread to make sure the first execution is timely 

             _executor.prestartCoreThread(); 

        } 

         

        
/** 
         * Creates and executes a periodic action that becomes enabled first after    
         * the given initial delay, and subsequently with the given period; that    
         * is executions will commence after initialDelay then initialDelay+period,    
         * then initialDelay + 2 * period, and so on. If any execution of the task    
         * encounters an exception, subsequent executions are suppressed. Otherwise,    
         * the task will only terminate via cancellation or termination of the    
         * executor. If any execution of this task takes longer than its period,    
         * then subsequent executions may start late, but will not concurrently    
         * execute. 
         *    
         * @param task The task to execute. 
         * @param initialDelay The initial delay (in msec) before the first execution. 
         * @param period The period (in msec) between successive executions. 
         * @return A ScheduledFuture that may be used to cancel task execution. 
         */
 

        
public ScheduledFuture scheduleAtFixedRate(Runnable task, 
long initialDelay, 
long period) { 

                
return _executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS); 

        } 

         

        
/** 
         * Creates and executes a periodic action that becomes enabled first after    
         * the given initial delay, and subsequently with the given delay between    
         * the termination of one execution and the commencement of the next.    
         * If any execution of the task encounters an exception, subsequent    
         * executions are suppressed. Otherwise, the task will only terminate via    
         * cancellation or termination of the executor. 
         *    
         * @param task The task to execute. 
         * @param initialDelay The initial delay (in msec) before the first execution. 
         * @param delay The delay (in msec) between termination of one execution    
         *                            and commencement of the next. 
         * @return A ScheduledFuture that may be used to cancel task execution. 
         */
 

        
public ScheduledFuture scheduleWithFixedDelay(Runnable task, 
long initialDelay, 
long delay) { 

                
return _executor.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS); 

        } 

         

        
/** 
         * Tries to remove from the work queue all {@link Future} 
         * tasks that have been cancelled. This method can be useful as a 
         * storage reclamation operation, that has no other impact on 
         * functionality. Cancelled tasks are never executed, but may 
         * accumulate in work queues until worker threads can actively 
         * remove them. Invoking this method instead tries to remove them now. 
         * However, this method may fail to remove tasks in 
         * the presence of interference by other threads. 
         */
 

        
public 
void purgeTasks() { 

                _executor.purge(); 

        } 

                 

        
/** 
         * Shut down the scheduler in an orderly manner, allowing any currently    
         * executing tasks to complete. 
         */
 

        
public 
void shutdown() { 

                _executor.shutdown(); 

                                 

                
// block at most for 5 seconds for any currently executing task to terminate 

                
try { 

                        _executor.awaitTermination(5, TimeUnit.SECONDS); 

                } 
catch (InterruptedException e) { 

                        
// do nothing 

                } 

        } 



 
ThreadGroupFactory
import java.util.concurrent.ThreadFactory; 


/**
 * A {@link ThreadFactory} that creates every thread inside a given
 * {@link ThreadGroup} and names the threads sequentially as
 * <code>namePrefix + N</code>, where N starts at 1.
 *
 * <p>The thread counter and the daemon flag are guarded by an internal lock,
 * so a single factory instance may be shared between threads.
 */
public class ThreadGroupFactory implements ThreadFactory {

    private final ThreadGroup _group;
    private final String _namePrefix;
    private final Object _syncLock = new Object();

    // both fields below are only read or written while holding _syncLock
    private int _numThreads;
    private boolean _createDaemonThreads;

    /**
     * Creates an instance where the threads created by this factory are
     * assigned to the specified ThreadGroup.
     *
     * @param group The ThreadGroup.
     * @param namePrefix The name prefix for each thread created by this factory.
     */
    public ThreadGroupFactory(ThreadGroup group, String namePrefix) {
        _group = group;
        _namePrefix = namePrefix;
        _numThreads = 0;
    }

    /**
     * Creates an instance where the threads created by this factory are
     * assigned to the current thread's ThreadGroup.
     *
     * @param namePrefix The name prefix for each thread created by this factory.
     */
    public ThreadGroupFactory(String namePrefix) {
        this(Thread.currentThread().getThreadGroup(), namePrefix);
    }

    /**
     * Set the threads created by this factory to be daemon threads.
     * Only threads created after this call are affected.
     *
     * @param daemonThreads <code>true</code> to set threads created by this
     *                      factory to be daemon threads.
     */
    public void createDaemonThreads(boolean daemonThreads) {
        synchronized (_syncLock) {
            _createDaemonThreads = daemonThreads;
        }
    }

    /**
     * Creates a new, not yet started thread in this factory's thread group.
     * The thread is named with the configured prefix followed by a running
     * number, and its daemon status is set from the current daemon flag.
     *
     * @param r The runnable the new thread will execute.
     * @return The newly constructed thread.
     */
    @Override
    public Thread newThread(Runnable r) {
        String name;
        boolean daemon;

        // take a consistent snapshot of the counter and the daemon flag
        synchronized (_syncLock) {
            name = _namePrefix + ++_numThreads;
            daemon = _createDaemonThreads;
        }

        Thread thread = new Thread(_group, r, name);
        thread.setDaemon(daemon);

        return thread;
    }
}


 
LoggingThreadGroup
/** 
* Create your threads using this ThreadGroup to have uncaught exceptions 
* logged via Log4j    
*/
 

public 
class LoggingThreadGroup    

        
extends ThreadGroup 


        
private 
final Log _log = LogFactory.getLog(LoggingThreadGroup.
class); 


        
public LoggingThreadGroup(String groupName) { 

                
super(groupName); 

        } 


        
public 
void uncaughtException(Thread t, Throwable exc) { 

                _log.warn(
"ThreadGroup[" + 
this.getName() + 

                                    
"]: Unhandled exception", exc); 

                
super.uncaughtException(t, exc); 

        } 

}
 
JUnit
/* 
* NOTE: This copyright does *not* cover user programs that use HQ 
* program services by normal system calls through the application 
* program interfaces provided as part of the Hyperic Plug-in Development 
* Kit or the Hyperic Client Development Kit - this is merely considered 
* normal use of the program, and does *not* fall under the heading of 
* "derived work". 
* Copyright (C) [2004-2008], Hyperic, Inc. 
* This file is part of HQ. 
* HQ is free software; you can redistribute it and/or modify 
* it under the terms version 2 of the GNU General Public License as 
* published by the Free Software Foundation. This program is distributed 
* in the hope that it will be useful, but WITHOUT ANY WARRANTY; without 
* even the implied warranty of MERCHANTABILITY or FITNESS FOR A 
* PARTICULAR PURPOSE. See the GNU General Public License for more 
* details. 
* You should have received a copy of the GNU General Public License 
* along with this program; if not, write to the Free Software 
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 
* USA. 
*/
 


package org.hyperic.hq.application; 


import junit.framework.TestCase; 

import java.util.concurrent.ScheduledFuture; 


/** 
* Tests the Scheduler class. 
*/
 

public 
class Scheduler_test 
extends TestCase { 


        
public Scheduler_test(String name) { 

                
super(name); 

        } 

                 

        
public 
void testIllegalPoolSize() { 

                
try { 

                        
new Scheduler(-1); 

                        fail(
"Expected IllegalArgumentException."); 

                } 
catch (IllegalArgumentException e) { 

                        
// expected outcome 

                } 
catch (Exception e) { 

                        fail(
"Expected IllegalArgumentException instead of:"+e); 

                } 

                 

                
try { 

                        
new Scheduler(0); 

                        fail(
"Expected IllegalArgumentException."); 

                } 
catch (IllegalArgumentException e) { 

                        
// expected outcome 

                } 
catch (Exception e) { 

                        fail(
"Expected IllegalArgumentException instead of:"+e); 

                } 

        } 

         

        
public 
void testExecuteAtFixedRate() 
throws Exception { 

                Scheduler scheduler = 
new Scheduler(1); 

                                 

                RunnableCounter counter = 
new RunnableCounter(50, 
false); 

                 

                ScheduledFuture future =    

                        scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100); 

                 

                Thread.sleep(210); 

                 

                assertFalse(future.isDone()); 

                assertFalse(future.isCancelled()); 

                 

                assertEquals(3, counter.numRuns()); 

                 

                scheduler.shutdown(); 

        } 

         

        
public 
void testExecuteWithFixedDelay() 
throws Exception { 

                Scheduler scheduler = 
new Scheduler(1); 

                 

                RunnableCounter counter = 
new RunnableCounter(100, 
false); 

                 

                
// the execution period is about 200 msec (delay+runtime) 

                ScheduledFuture future =    

                        scheduler.scheduleWithFixedDelay(counter, Scheduler.NO_INITIAL_DELAY, 100); 

                 

                Thread.sleep(210); 

                 

                assertFalse(future.isDone()); 

                assertFalse(future.isCancelled()); 

                 

                assertEquals(2, counter.numRuns()); 

                 

                scheduler.shutdown();                 

        } 

         

        
public 
void testCancellingScheduledTask() 
throws Exception { 

                Scheduler scheduler = 
new Scheduler(1); 

                 

                RunnableCounter counter = 
new RunnableCounter(50, 
false); 

                 

                ScheduledFuture future =    

                        scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100); 

                 

                Thread.sleep(210); 

                 

                assertFalse(future.isDone()); 

                assertFalse(future.isCancelled()); 

                 

                future.cancel(
true); 

                 

                assertTrue(future.isDone()); 

                assertTrue(future.isCancelled()); 

                 

                Thread.sleep(100); 

                 

                assertEquals(3, counter.numRuns()); 

                 

                scheduler.shutdown(); 

        } 

         

        
public 
void testExecuteAtFixedRateWithInitialDelay() 
throws Exception { 

                Scheduler scheduler = 
new Scheduler(1); 

                 

                RunnableCounter counter = 
new RunnableCounter(50, 
false); 

                 

                ScheduledFuture future =    

                        scheduler.scheduleAtFixedRate(counter, 100, 100); 

                 

                Thread.sleep(250); 

                 

                assertFalse(future.isDone()); 

                assertFalse(future.isCancelled()); 

                 

                assertEquals(2, counter.numRuns()); 

                 

                scheduler.shutdown();                 

        } 

         

        
public 
void testExecuteWithFixedDelayWithInitialDelay() 
throws Exception { 

                Scheduler scheduler = 
new Scheduler(1); 

                 

                RunnableCounter counter = 
new RunnableCounter(50, 
false); 

                 

                ScheduledFuture future =    

                        scheduler.scheduleWithFixedDelay(counter, 100, 50); 

                 

                Thread.sleep(300); 

                 

                assertFalse(future.isDone()); 

                assertFalse(future.isCancelled()); 

                 

                assertEquals(2, counter.numRuns()); 

                 

                scheduler.shutdown();                 

        } 

         

        
/** 
         * Test executing one task at a fixed rate and one task with a fixed delay. 
         *    
         * @throws Exception 
         */
 

        
public 
void testExecuteConcurrentTasks() 
throws Exception { 

                Scheduler scheduler = 
new Scheduler(2); 

                 

                RunnableCounter counter1 = 
new RunnableCounter(10, 
false); 


                RunnableCounter counter2 = 
new RunnableCounter(10, 
false); 

                                 

                ScheduledFuture future1 =    

                        scheduler.scheduleAtFixedRate(counter1, Scheduler.NO_INITIAL_DELAY, 50); 

                 

                
// the execution period is about 60 msec (delay+runtime) 

                ScheduledFuture future2 =    

                        scheduler.scheduleWithFixedDelay(counter2, Scheduler.NO_INITIAL_DELAY, 50); 

                 

                Thread.sleep(210); 

                 

                assertFalse(future1.isDone()); 

                assertFalse(future1.isCancelled()); 


                assertFalse(future2.isDone()); 

                assertFalse(future2.isCancelled()); 

                 

                assertEquals(5, counter1.numRuns()); 


                assertEquals(4, counter2.numRuns()); 

                 

                scheduler.shutdown();                                

        } 

         

        
/** 
         * Test that a scheduled task will stop executing if it throws an    
         * unchecked exception. 
         */
 

        
public 
void testUncheckedExceptionInTask() 
throws Exception { 

                Scheduler scheduler = 
new Scheduler(1); 

                 

                RunnableCounter counter = 
new RunnableCounter(50, 
true); 

                 

                ScheduledFuture future =    

                        scheduler.scheduleAtFixedRate(counter, Scheduler.NO_INITIAL_DELAY, 100); 

                 

                Thread.sleep(210); 

                 

                assertTrue(future.isDone()); 

                assertFalse(future.isCancelled()); 

                 

                
// only should have run once since a runtime exception was thrown 

                assertEquals(1, counter.numRuns()); 

                 

                scheduler.shutdown();                 

        } 

         

        
private 
class RunnableCounter 
implements Runnable { 


                
private 
final 
long _sleepTime; 

                
private 
int _numRuns; 

                
private 
final 
boolean _throwException; 

                
private 
final Object _lock = 
new Object(); 

                 

                
public RunnableCounter(
long sleepTime, 
boolean throwUncheckedException) { 

                        _sleepTime = sleepTime; 

                        _throwException = throwUncheckedException; 

                } 

                 

                
public 
void run() { 

                        
synchronized (
this) { 

                                _numRuns++; 

                        } 

                         

                        
try { 

                                Thread.sleep(_sleepTime); 

                        } 
catch (InterruptedException e) { 

                        } 

                         

                        
if (_throwException) { 

                                
throw 
new RuntimeException(
"unchecked exception"); 

                        } 

                } 

                 

                
public 
synchronized 
int numRuns() { 

                        
return _numRuns; 

                } 

                 

        } 

         

    本文转自danni505 51CTO博客,原文链接:http://blog.51cto.com/danni505/204896,如需转载请自行联系原作者
你可能感兴趣的文章
时间助理 时之助
查看>>
英国征召前黑客组建“网络兵团”
查看>>
pyjamas build AJAX apps in Python (like Google did for Java)
查看>>
centos5.9使用RPM包搭建lamp平台
查看>>
[LeetCode] Merge Intervals
查看>>
Struts2 学习小结
查看>>
在 Linux 系统中安装Load Generator ,并在windows 调用
查看>>
POI getDataFormat() 格式对照
查看>>
/etc/resolv.conf文件详解
查看>>
oracle查看经常使用的系统信息
查看>>
Django_4_视图
查看>>
Linux的netstat命令使用
查看>>
大快网站:如何选择正确的hadoop版本
查看>>
经过这5大阶段,你离Java程序员就不远了!
查看>>
IntelliJ IDEA 连接数据库详细过程
查看>>
PHP-X开发扩展
查看>>
android学习笔记——onSaveInstanceState的使用
查看>>
工作中如何做好技术积累
查看>>
Spring Transactional
查看>>
shell脚本实例
查看>>