Index: impl/main/java/org/directwebremoting/impl/ThrottlingServerLoadMonitor.java =================================================================== --- impl/main/java/org/directwebremoting/impl/ThrottlingServerLoadMonitor.java (revision 3506) +++ impl/main/java/org/directwebremoting/impl/ThrottlingServerLoadMonitor.java (working copy) @@ -73,10 +73,8 @@ * When the disconnectedTime would go under 2 secs, we switch back to USAGE_HIGH * mode.

* @author Joe Walker [joe at getahead dot org] - * - * David Marginian - This class is NOT THREAD SAFE and should not be used until fixed! */ -public class ThrottlingServerLoadMonitor extends AbstractServerLoadMonitor +public class ThrottlingServerLoadMonitor extends AbstractServerLoadMonitor implements Runnable { /* (non-Javadoc) * @see org.directwebremoting.extend.ServerLoadMonitor#supportsStreaming() @@ -108,11 +106,13 @@ @Override public void threadWaitStarting(WaitController controller) { + super.threadWaitStarting(controller); hitMonitor.recordHit(); - waitingThreads++; - super.threadWaitStarting(controller); + synchronized(WAITING_THREADS_LOCK) { + waitingThreads++; + } - checkLoading(); + notifyLoadMonitor(); } /* (non-Javadoc) @@ -121,53 +121,197 @@ @Override public void threadWaitEnding(WaitController controller) { - waitingThreads--; super.threadWaitEnding(controller); + + synchronized(WAITING_THREADS_LOCK) { + waitingThreads--; + } + + notifyLoadMonitor(); } + /** - * Check that we are setting the time to next poll correctly. + * Constructor + * Starts the server load monitoring thread if it wasn't already running. */ - private void checkLoading() + public ThrottlingServerLoadMonitor() { + // Start the monitor thread. + synchronized(SERVER_MONITOR_LOCK) { + // Check null just in case someone tried to initialize the class twice. + if (t == null) { + t = new Thread(this); + t.setDaemon(true); + t.setName("ThrottlingServerLoadMonitor"); + t.start(); + + // Give the thread time to start. + do { + synchronized(t) { + try + { + t.wait(50); + } + catch (InterruptedException ignored) + { + } + } + } while (!running); + } + } + } + + /** + * Monitor the server load in a separate thread so that we don't penalize + * each client request. + */ + public void run() { - float hitsPerSecond = (float) hitMonitor.getHitsInLastPeriod() / SECONDS_MONITORED; + // This is thread safe because this thread is only created during + // the first instantiation of the class. + running = true; + // Notify the creator that the thread is now running. + synchronized(t) { + t.notify(); + } + + final int ONE_HOUR = 60 * 60 * 1000; + do { + try + { + synchronized(SERVER_MONITOR_LOCK) { + // Wait for a request to wake us up. If there are no requests coming + // in we can wait to adjust the load for quite a while. Load means + // nothing if there are no clients making requests. + debug("Waiting for client requests for up to an hour."); + SERVER_MONITOR_LOCK.wait(ONE_HOUR); + + // This allows a way for the thread to exit without falling into the + // sleep cycle. + if (!running) + { + break; + } + + + debug("Adjusting for server load."); + loadAdjust(); + } + + // No need to run more than twice the amount of monitored seconds. + // This will stop multiple requests from running the load monitor + // constantly allowing the creation of a more sophisticated monitor + // later on. + synchronized(SERVER_SLEEP_LOCK) { + debug("Sleeping to avoid client request overflow."); + SERVER_SLEEP_LOCK.wait((SECONDS_MONITORED * 1000)/2); + } + } + catch (InterruptedException ex) + { + log.error("Thread was interrupted."); + } + + } while(running); + } + + /** + * A method to notify the load monitoring thread that there has + * been a client request. + */ + private void notifyLoadMonitor() { + synchronized(SERVER_MONITOR_LOCK) { + SERVER_MONITOR_LOCK.notify(); + } + } + + /** + * Log a debug message if debug logging is enabled. + * @param o + */ + private void debug(Object o) { + log.info(o); + if (log.isDebugEnabled()) { + log.debug(o); + } + } + + /** + * Shutdown the monitor thread cleanly. + */ + @Override + public void contextDestroyed() + { + try { + // Try to shutdown the monitoring thread cleanly. + synchronized(SERVER_MONITOR_LOCK) { + running = false; + notifyLoadMonitor(); + } + + // If the thread was trying to avoid running due to multiple incoming + // requests we wake up the thread to avoid waiting for the sleep time. + synchronized(SERVER_SLEEP_LOCK) { + SERVER_SLEEP_LOCK.notify(); + } + + // Wait for the thread to shutdown. + t.join(SECONDS_MONITORED); + } + catch (InterruptedException ex) + { + log.error("Thread shutdown was interrupted."); + } finally { + // Ensure that the parent is shutdown correctly. + super.contextDestroyed(); + } + } + + /** + * Check that we are setting the next poll time correctly. + */ + private void loadAdjust() + { + // USAGE_LOW check if (waitingThreads < maxWaitingThreads) { - connectedTime = maxConnectedTime; - disconnectedTime = 0; - - setMode(USAGE_LOW); + // TODO: If the disconnectedTime was previously set different there is + // no way to return to the default here. + changeMode(USAGE_LOW, maxConnectedTime, disconnectedTime); return; } - int roundTripAtThreadOutSeconds = threadOutRoundTripTime / 1000; - + // Calculate the number of hits per second based on the number of hits since the + // last time the monitor ran. + float hitsPerSecond = (float) hitMonitor.getHitsInLastPeriod() / SECONDS_MONITORED; int hitsPerSecondAtThreadOut = maxWaitingThreads / roundTripAtThreadOutSeconds; int hitsPerSecondAtHitOut = maxHitsPerSecond; + debug("Hits per second: " + hitsPerSecond); + debug("Hits per second at ThreadOut: " + hitsPerSecondAtThreadOut); + debug("Hits per second at HitOut: " + hitsPerSecondAtHitOut); + + // USAGE_HIGH if (hitsPerSecond < hitsPerSecondAtThreadOut) { // We should probably be in USAGE_LOW mode, so we force the low // end of the values in USAGE_HIGH mode - connectedTime = usageHighInitialConnectedTime; - disconnectedTime = usageHighDisconnectedTime; - - setMode(USAGE_HIGH); + changeMode(USAGE_HIGH, connectedTime, disconnectedTime); return; } + float load = hitsPerSecond / maxHitsPerSecond; + + int loadBasedDisconnectedTime = (int) (disconnectedTime * load); if (mode == USAGE_DIGG) { // If we're getting close to the upper bound then slow down - float load = hitsPerSecond / maxHitsPerSecond; - connectedTime = usageDiggConnectedTime; - disconnectedTime = (int) (disconnectedTime * load); + changeMode(USAGE_DIGG, usageDiggConnectedTime, loadBasedDisconnectedTime); // Check that USAGE_DIGG is the correct mode and we shouldn't change if (disconnectedTime > usageDiggMinDisconnectedTime) { - setMode(USAGE_DIGG); return; } @@ -180,34 +324,32 @@ // if hitsPerSecondAtThreadOut=0 and hitsPerSecondAtHitOut=1 // where would we score? float factor = (float) waitingThreads / maxWaitingThreads; - connectedTime = (int) (connectedTime / factor); - if (connectedTime > usageHighInitialConnectedTime) + int tmpConnectedTime = (int) (connectedTime / factor); + if (tmpConnectedTime > usageHighInitialConnectedTime) { - connectedTime = usageHighInitialConnectedTime; - } - - if (connectedTime < usageHighFinalConnectedTime) + tmpConnectedTime = usageHighInitialConnectedTime; + } else if (tmpConnectedTime < usageHighFinalConnectedTime) { - connectedTime = usageHighFinalConnectedTime; + tmpConnectedTime = usageHighFinalConnectedTime; } - disconnectedTime = usageHighDisconnectedTime; - - setMode(USAGE_HIGH); + changeMode(USAGE_HIGH, tmpConnectedTime, usageHighDisconnectedTime); return; } - float load = hitsPerSecond / maxHitsPerSecond; - connectedTime = usageDiggConnectedTime; - disconnectedTime = (int) (disconnectedTime * load); - - if (disconnectedTime < usageDiggMinDisconnectedTime) + if (loadBasedDisconnectedTime < usageDiggMinDisconnectedTime) { - disconnectedTime = usageDiggMinDisconnectedTime; + changeMode(USAGE_DIGG, usageDiggConnectedTime, usageDiggMinDisconnectedTime); + } else { + changeMode(USAGE_DIGG, usageDiggConnectedTime, loadBasedDisconnectedTime); } + } - setMode(USAGE_DIGG); + private void changeMode(int usageMode, int _connectedTime, int _disconnectedTime) { + this.connectedTime = _connectedTime; + this.disconnectedTime = _disconnectedTime; + setMode(usageMode); } /** @@ -221,9 +363,18 @@ log.debug("Changing modes, from " + USAGE_NAMES[this.mode] + " to " + USAGE_NAMES[mode]); } - this.mode = mode; + synchronized(SERVER_MONITOR_LOCK) { + debug("Changing mode to " + USAGE_NAMES[mode]); + this.mode = mode; + } } + protected int getMode() { + synchronized(SERVER_MONITOR_LOCK) { + return this.mode; + } + } + /** * @param maxWaitingThreads the maxWaitingThreads to set */ @@ -241,7 +392,7 @@ } /** - * It might be good top expose this, however there are currently assumptions + * It might be good to expose this, however there are currently assumptions * in the code that the value is set to 60000. * See {@link #usageHighInitialConnectedTime}. * @param maxConnectedTime the maxConnectedTime to set @@ -261,6 +412,7 @@ protected static final int usageDiggMinDisconnectedTime = usageHighDisconnectedTime + usageHighFinalConnectedTime; protected static final int hitOutRoundTripTime = usageHighDisconnectedTime + usageHighFinalConnectedTime; protected static final int threadOutRoundTripTime = usageHighInitialConnectedTime + usageHighDisconnectedTime; + protected static final int roundTripAtThreadOutSeconds = threadOutRoundTripTime / 1000; /** * Static configuration data: The max number of threads we keep waiting. @@ -319,7 +471,7 @@ protected int disconnectedTime = 1000; /** - * We are recording the number of hits in the last 5 seconds. + * We are recording the number of hits in the last 10 seconds. * Maybe we should think about making this configurable. */ protected static final int SECONDS_MONITORED = 10; @@ -335,6 +487,22 @@ protected int waitingThreads = 0; /** + * Keep an Object around that we can lock on. + * Note: It probably isn't a good idea to access these locks outside this + * class, but it is necessary to test that the load monitor is working + * correctly in junit.. + */ + private static final Object WAITING_THREADS_LOCK = new Object(); + protected static final Object SERVER_MONITOR_LOCK = new Object(); + protected static final Object SERVER_SLEEP_LOCK = new Object(); + + /** + * Monitor thread support vars. + */ + private static boolean running = false; + private static Thread t; + + /** * The log stream */ private static final Log log = LogFactory.getLog(ThrottlingServerLoadMonitor.class);