Index: impl/main/java/org/directwebremoting/impl/ThrottlingServerLoadMonitor.java =================================================================== --- impl/main/java/org/directwebremoting/impl/ThrottlingServerLoadMonitor.java (revision 3767) +++ impl/main/java/org/directwebremoting/impl/ThrottlingServerLoadMonitor.java (working copy) @@ -15,6 +15,10 @@ */ package org.directwebremoting.impl; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.directwebremoting.extend.WaitController; @@ -74,7 +78,7 @@ * mode.

* @author Joe Walker [joe at getahead dot org] */ -public class ThrottlingServerLoadMonitor extends AbstractServerLoadMonitor implements Runnable +public class ThrottlingServerLoadMonitor extends AbstractServerLoadMonitor { /* (non-Javadoc) * @see org.directwebremoting.extend.ServerLoadMonitor#supportsStreaming() @@ -108,11 +112,15 @@ { super.threadWaitStarting(controller); hitMonitor.recordHit(); - synchronized(WAITING_THREADS_LOCK) { - waitingThreads++; + + int currentWaitingThreads = waitingThreads.incrementAndGet(); + + // If we are already adjusting skip it. + if (!adjusting.getAndSet(true)) + { + loadAdjust(currentWaitingThreads); + adjusting.set(false); } - - notifyLoadMonitor(); } /* (non-Javadoc) @@ -122,110 +130,10 @@ public void threadWaitEnding(WaitController controller) { super.threadWaitEnding(controller); - - synchronized(WAITING_THREADS_LOCK) { - waitingThreads--; - } - - notifyLoadMonitor(); + waitingThreads.decrementAndGet(); } - /** - * Constructor - * Starts the server load monitoring thread if it wasn't already running. - */ - public ThrottlingServerLoadMonitor() { - // Start the monitor thread. - synchronized(SERVER_MONITOR_LOCK) { - // Check null just in case someone tried to initialize the class twice. - if (t == null) { - t = new Thread(this); - t.setDaemon(true); - t.setName("ThrottlingServerLoadMonitor"); - t.start(); - - // Give the thread time to start. - do { - synchronized(t) { - try - { - t.wait(50); - } - catch (InterruptedException ignored) - { - } - } - } while (!running); - } - } - } - - /** - * Monitor the server load in a separate thread so that we don't penalize - * each client request. - */ - public void run() - { - // This is thread safe because this thread is only created during - // the first instantiation of the class. - running = true; - - // Notify the creator that the thread is now running. - synchronized(t) { - t.notify(); - } - - final int ONE_HOUR = 60 * 60 * 1000; - do { - try - { - synchronized(SERVER_MONITOR_LOCK) { - // Wait for a request to wake us up. If there are no requests coming - // in we can wait to adjust the load for quite a while. Load means - // nothing if there are no clients making requests. - debug("Waiting for client requests for up to an hour."); - SERVER_MONITOR_LOCK.wait(ONE_HOUR); - - // This allows a way for the thread to exit without falling into the - // sleep cycle. - if (!running) - { - break; - } - - debug("Adjusting for server load."); - loadAdjust(); - } - - // No need to run more than twice the amount of monitored seconds. - // This will stop multiple requests from running the load monitor - // constantly allowing the creation of a more sophisticated monitor - // later on. - synchronized(SERVER_SLEEP_LOCK) { - debug("Sleeping to avoid client request overflow."); - SERVER_SLEEP_LOCK.wait((SECONDS_MONITORED * 1000)/2); - } - } - catch (InterruptedException ex) - { - log.error("Thread was interrupted."); - } - - } while(running); - } - - /** - * A method to notify the load monitoring thread that there has - * been a client request. - */ - private void notifyLoadMonitor() { - synchronized(SERVER_MONITOR_LOCK) { - SERVER_MONITOR_LOCK.notify(); - } - } - - /** * Log a debug message if debug logging is enabled. * @param o */ @@ -236,43 +144,18 @@ } /** - * Shutdown the monitor thread cleanly. + * Check that we are setting the next poll time correctly. */ - @Override - public void contextDestroyed() + private void loadAdjust(int currentWaitingThreads) { - try { - // Try to shutdown the monitoring thread cleanly. - synchronized(SERVER_MONITOR_LOCK) { - running = false; - notifyLoadMonitor(); - } - - // If the thread was trying to avoid running due to multiple incoming - // requests we wake up the thread to avoid waiting for the sleep time. - synchronized(SERVER_SLEEP_LOCK) { - SERVER_SLEEP_LOCK.notify(); - } - - // Wait for the thread to shutdown. - t.join(SECONDS_MONITORED); - } - catch (InterruptedException ex) + // Adjust at most once every SECONDS_MONITORED / 2. + if ((System.currentTimeMillis() - lastLoadAdjust.get()) < (secondsMonitored/2)) { - log.error("Thread shutdown was interrupted."); - } finally { - // Ensure that the parent is shutdown correctly. - super.contextDestroyed(); + return; } - } - /** - * Check that we are setting the next poll time correctly. - */ - private void loadAdjust() - { // USAGE_LOW check - if (waitingThreads < maxWaitingThreads) + if (currentWaitingThreads < maxWaitingThreads) { // TODO: If the disconnectedTime was previously set different there is // no way to return to the default here. @@ -282,7 +165,7 @@ // Calculate the number of hits per second based on the number of hits since the // last time the monitor ran. - float hitsPerSecond = (float) hitMonitor.getHitsInLastPeriod() / SECONDS_MONITORED; + float hitsPerSecond = (float) hitMonitor.getHitsInLastPeriod() / secondsMonitored; int hitsPerSecondAtThreadOut = maxWaitingThreads / roundTripAtThreadOutSeconds; int hitsPerSecondAtHitOut = maxHitsPerSecond; @@ -302,7 +185,7 @@ float load = hitsPerSecond / maxHitsPerSecond; int loadBasedDisconnectedTime = (int) (disconnectedTime * load); - if (mode == USAGE_DIGG) + if (mode.get() == USAGE_DIGG) { // If we're getting close to the upper bound then slow down changeMode(USAGE_DIGG, usageDiggConnectedTime, loadBasedDisconnectedTime); @@ -321,7 +204,7 @@ { // if hitsPerSecondAtThreadOut=0 and hitsPerSecondAtHitOut=1 // where would we score? - float factor = (float) waitingThreads / maxWaitingThreads; + float factor = (float) currentWaitingThreads / maxWaitingThreads; int tmpConnectedTime = (int) (connectedTime / factor); if (tmpConnectedTime > usageHighInitialConnectedTime) @@ -339,7 +222,8 @@ if (loadBasedDisconnectedTime < usageDiggMinDisconnectedTime) { changeMode(USAGE_DIGG, usageDiggConnectedTime, usageDiggMinDisconnectedTime); - } else { + } else + { changeMode(USAGE_DIGG, usageDiggConnectedTime, loadBasedDisconnectedTime); } } @@ -352,24 +236,19 @@ /** * For debug purposes we keep a track of what mode we are in. - * @param mode The new usage mode + * @param newMode The new usage mode */ - protected void setMode(int mode) + protected void setMode(int newMode) { - if (log.isDebugEnabled() && mode != this.mode) + int currentMode = this.mode.getAndSet(newMode); + if (newMode != currentMode) { - log.debug("Changing modes, from " + USAGE_NAMES[this.mode] + " to " + USAGE_NAMES[mode]); + debug("Changing modes, from " + USAGE_NAMES[currentMode] + " to " + USAGE_NAMES[newMode]); } - - synchronized(SERVER_MONITOR_LOCK) { - this.mode = mode; - } } protected int getMode() { - synchronized(SERVER_MONITOR_LOCK) { - return this.mode; - } + return mode.get(); } /** @@ -400,7 +279,7 @@ } /** - * + * static calculations */ protected static final int usageHighDisconnectedTime = 1000; protected static final int usageHighInitialConnectedTime = 49000; @@ -454,7 +333,7 @@ /** * What is the current usage mode. */ - protected int mode = USAGE_LOW; + protected AtomicInteger mode = new AtomicInteger(USAGE_LOW); /** * The time we are currently waiting before sending a browser away and @@ -469,37 +348,27 @@ /** * We are recording the number of hits in the last 10 seconds. - * Maybe we should think about making this configurable. */ - protected static final int SECONDS_MONITORED = 10; + protected int secondsMonitored = 10; /** * Our record of the server loading */ - protected HitMonitor hitMonitor = new HitMonitor(SECONDS_MONITORED); + protected HitMonitor hitMonitor = new HitMonitor(secondsMonitored); /** * How many sleepers are there? */ - protected int waitingThreads = 0; + protected AtomicInteger waitingThreads = new AtomicInteger(0); /** - * Keep an Object around that we can lock on. - * Note: It probably isn't a good idea to access these locks outside this - * class, but it is necessary to test that the load monitor is working - * correctly in junit.. + * Stores the last time the monitor made an adjustment and tells us if + * the loadAdjustment is already taking place in another thread. */ - private static final Object WAITING_THREADS_LOCK = new Object(); - protected static final Object SERVER_MONITOR_LOCK = new Object(); - protected static final Object SERVER_SLEEP_LOCK = new Object(); + protected AtomicLong lastLoadAdjust = new AtomicLong(System.currentTimeMillis()); + protected AtomicBoolean adjusting = new AtomicBoolean(false); /** - * Monitor thread support vars. - */ - private static boolean running = false; - private static Thread t; - - /** * The log stream */ private static final Log log = LogFactory.getLog(ThrottlingServerLoadMonitor.class); Index: impl/test/java/org/directwebremoting/impl/ThrottlingServerLoadMonitorTest.java =================================================================== --- impl/test/java/org/directwebremoting/impl/ThrottlingServerLoadMonitorTest.java (revision 3767) +++ impl/test/java/org/directwebremoting/impl/ThrottlingServerLoadMonitorTest.java (working copy) @@ -14,6 +14,7 @@ package org.directwebremoting.impl; import org.directwebremoting.extend.WaitController; +import org.directwebremoting.util.HitMonitor; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -22,57 +23,49 @@ /** * @author matt conroy */ -public class ThrottlingServerLoadMonitorTest implements WaitController +public class ThrottlingServerLoadMonitorTest extends ThrottlingServerLoadMonitor implements WaitController { private static ThrottlingServerLoadMonitor t; @BeforeClass public static void start() { - t = new ThrottlingServerLoadMonitor(); + t = new ThrottlingServerLoadMonitorTest(); } @Test - public void loadTest() throws InterruptedException { - checkMode(t.USAGE_LOW); + public void loadTest() { + checkMode(ThrottlingServerLoadMonitor.USAGE_LOW); // Simulate load. - for (int i = 0; i < 1000; i++) + for (int i = 0; i < 10000; i++) { t.threadWaitStarting(this); + + // For the sake of testing we need to run the load balancer every + // time through the loop. + t.lastLoadAdjust.set(System.currentTimeMillis() - t.secondsMonitored * 1000); } - checkMode(t.USAGE_DIGG); - // TODO: This is about as much testing as we can do since the - // state changes are completely based on the number of hits per second. - // The only way that I can think of at the moment to test this better - // would be to open up additional methods of the Monitor which is - // probably not a good idea. - } + checkMode(ThrottlingServerLoadMonitor.USAGE_DIGG); - // TODO: I don't like having to check multiple times for the - // thread to run, but it is simple and does the job. - private void checkMode(int mode) throws InterruptedException { - // Give the thread some time to run. - // Gotta love multi processor boxes. - for (int i = 0; i < 3; i++) + for (int i = 0; i < 10000; i++) { - synchronized(t.SERVER_SLEEP_LOCK) { - t.SERVER_SLEEP_LOCK.notify(); - } - synchronized(t.SERVER_MONITOR_LOCK) { - t.SERVER_MONITOR_LOCK.notify(); - } + t.threadWaitEnding(this); + } - if (t.getMode() == mode) - { - break; - } + // Simulate no hits + hitMonitor = new HitMonitor(t.secondsMonitored); + t.lastLoadAdjust.set(System.currentTimeMillis() - t.secondsMonitored); + t.threadWaitStarting(this); + checkMode(ThrottlingServerLoadMonitor.USAGE_LOW); - Thread.sleep(100); - } - Assert.assertEquals(mode, t.getMode()); + // TODO More testing can be done with additional timing calculations. } + private void checkMode(int m) { + Assert.assertEquals(m, t.getMode()); + } + @AfterClass public static void stop() { t.contextDestroyed(); @@ -83,8 +76,6 @@ */ public void shutdown() { - // TODO Auto-generated method stub - } /* (non-Javadoc) @@ -92,7 +83,6 @@ */ public boolean isShutdown() { - // TODO Auto-generated method stub return false; } }