removed the ad-hoc use of background threads (and ParallelRunQueue) and
replaced them all with tasks queued into the GlobalSite
This commit is contained in:
		
							parent
							
								
									c6add9371b
								
							
						
					
					
						commit
						2997dd41f0
					
				@ -1,154 +0,0 @@
 | 
			
		||||
/*
 | 
			
		||||
 * The contents of this file are subject to the Mozilla Public License Version 1.1
 | 
			
		||||
 * (the "License"); you may not use this file except in compliance with the License.
 | 
			
		||||
 * You may obtain a copy of the License at <http://www.mozilla.org/MPL/>.
 | 
			
		||||
 * 
 | 
			
		||||
 * Software distributed under the License is distributed on an "AS IS" basis, WITHOUT
 | 
			
		||||
 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific
 | 
			
		||||
 * language governing rights and limitations under the License.
 | 
			
		||||
 * 
 | 
			
		||||
 * The Original Code is the Venice Web Communities System.
 | 
			
		||||
 * 
 | 
			
		||||
 * The Initial Developer of the Original Code is Eric J. Bowersox <erbo@silcom.com>,
 | 
			
		||||
 * for Silverwrist Design Studios.  Portions created by Eric J. Bowersox are
 | 
			
		||||
 * Copyright (C) 2001 Eric J. Bowersox/Silverwrist Design Studios.  All Rights Reserved.
 | 
			
		||||
 * 
 | 
			
		||||
 * Contributor(s): 
 | 
			
		||||
 */
 | 
			
		||||
package com.silverwrist.util;
 | 
			
		||||
 | 
			
		||||
import java.util.Vector;
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * A class which takes any number of <CODE>Runnable</CODE> objects, and executes them in parallel on
 | 
			
		||||
 * multiple threads insofar as possible.
 | 
			
		||||
 *
 | 
			
		||||
 * @author Eric J. Bowersox <erbo@silcom.com>
 | 
			
		||||
 * @version X
 | 
			
		||||
 * @see java.lang.Runnable
 | 
			
		||||
 * @see java.lang.Thread
 | 
			
		||||
 */
 | 
			
		||||
public class ParallelRunQueue implements Runnable
 | 
			
		||||
{
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Attributes
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  private Thread[] thrds;    // the current threads
 | 
			
		||||
  private Vector queue;      // the queue of Runnables to be run
 | 
			
		||||
  private int priority;      // the priority to use for all these threads
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Constructor
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Creates a new <CODE>ParallelRunQueue</CODE>.
 | 
			
		||||
   *
 | 
			
		||||
   * @param nthread Number of threads to be executed in parallel with the current one by this run queue.
 | 
			
		||||
   */
 | 
			
		||||
  public ParallelRunQueue(int nthread)
 | 
			
		||||
  {
 | 
			
		||||
    thrds = new Thread[nthread];
 | 
			
		||||
    for (int i=0; i<nthread; i++)
 | 
			
		||||
      thrds[i] = null;  // no threads to start with
 | 
			
		||||
    queue = new Vector();
 | 
			
		||||
    priority = Thread.currentThread().getPriority();  // default the priority
 | 
			
		||||
 | 
			
		||||
  } // end constructor
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Implementations from interface Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Executes all <CODE>Runnable</CODE> objects currently queued.  When this method returns, the run queue
 | 
			
		||||
   * is empty.  The objects are executed on one of the internal worker threads, or on the current thread
 | 
			
		||||
   * if no other threads are available.
 | 
			
		||||
   */
 | 
			
		||||
  public void run()
 | 
			
		||||
  {
 | 
			
		||||
    while (queue.size()>0)
 | 
			
		||||
    { // unqueue a new Runnable
 | 
			
		||||
      Runnable r = (Runnable)(queue.remove(0));
 | 
			
		||||
      for (int i=0; i<thrds.length; i++)
 | 
			
		||||
      { // scan through our threads list...
 | 
			
		||||
	if ((thrds[i]==null) || !(thrds[i].isAlive()))
 | 
			
		||||
	{ // a worker thread is available - start it running
 | 
			
		||||
	  thrds[i] = new Thread(r);
 | 
			
		||||
	  thrds[i].setPriority(priority);
 | 
			
		||||
	  thrds[i].start();
 | 
			
		||||
	  return;
 | 
			
		||||
 | 
			
		||||
	} // end if
 | 
			
		||||
 | 
			
		||||
      } // end for
 | 
			
		||||
 | 
			
		||||
      r.run();  // if all else fails, run it ourselves
 | 
			
		||||
 | 
			
		||||
    } // end while
 | 
			
		||||
 | 
			
		||||
    work();  // GC any further dead threads
 | 
			
		||||
 | 
			
		||||
  } // end run
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * External operations
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * Enqueues a new <CODE>Runnable</CODE> object onto the queue.  If a worker thread is available, the
 | 
			
		||||
   * new object is started running immediately.
 | 
			
		||||
   *
 | 
			
		||||
   * @param r The <CODE>Runnable</CODE> object to enqueue.
 | 
			
		||||
   */
 | 
			
		||||
  public void queue(Runnable r)
 | 
			
		||||
  {
 | 
			
		||||
    for (int i=0; i<thrds.length; i++)
 | 
			
		||||
    { // scan through our threads list...
 | 
			
		||||
      if ((thrds[i]==null) || !(thrds[i].isAlive()))
 | 
			
		||||
      { // a worker thread is available - start it running
 | 
			
		||||
	thrds[i] = new Thread(r);
 | 
			
		||||
	thrds[i].setPriority(priority);
 | 
			
		||||
	thrds[i].start();
 | 
			
		||||
	work();
 | 
			
		||||
	return;
 | 
			
		||||
 | 
			
		||||
      } // end if
 | 
			
		||||
 | 
			
		||||
    } // end for
 | 
			
		||||
 | 
			
		||||
    queue.add(r);  // if all else fails, queue it up
 | 
			
		||||
 | 
			
		||||
  } // end queue
 | 
			
		||||
 | 
			
		||||
  /**
 | 
			
		||||
   * If any threads in the run queue are dead, replaces them with live threads running fresh elements out
 | 
			
		||||
   * of the queue.  If no more <CODE>Runnable</CODE> objects remain, then dead threads are nulled out to
 | 
			
		||||
   * garbage-collect them.
 | 
			
		||||
   */
 | 
			
		||||
  public void work()
 | 
			
		||||
  {
 | 
			
		||||
    for (int i=0; i<thrds.length; i++)
 | 
			
		||||
      if ((thrds[i]==null) || !(thrds[i].isAlive()))
 | 
			
		||||
      { // found an empty slot or dead thread to replace
 | 
			
		||||
	if (queue.size()==0)
 | 
			
		||||
	  thrds[i] = null;  // let the dead thread be garbage-collected
 | 
			
		||||
	else
 | 
			
		||||
	{ // get the next Runnable and enqueue it
 | 
			
		||||
	  Runnable r = (Runnable)(queue.remove(0));
 | 
			
		||||
	  thrds[i] = new Thread(r);
 | 
			
		||||
	  thrds[i].setPriority(priority);
 | 
			
		||||
	  thrds[i].start();
 | 
			
		||||
 | 
			
		||||
	} // end else
 | 
			
		||||
 | 
			
		||||
      } // end if and for
 | 
			
		||||
 | 
			
		||||
  } // end work
 | 
			
		||||
 | 
			
		||||
} // end class ParallelRunQueue
 | 
			
		||||
@ -20,12 +20,12 @@ package com.silverwrist.venice.core.impl;
 | 
			
		||||
import java.sql.*;
 | 
			
		||||
import java.util.*;
 | 
			
		||||
import org.apache.log4j.*;
 | 
			
		||||
import com.silverwrist.util.ParallelRunQueue;
 | 
			
		||||
import com.silverwrist.util.cache.ObjectCache;
 | 
			
		||||
import com.silverwrist.venice.core.internals.*;
 | 
			
		||||
import com.silverwrist.venice.db.*;
 | 
			
		||||
import com.silverwrist.venice.except.DataException;
 | 
			
		||||
import com.silverwrist.venice.except.InternalStateError;
 | 
			
		||||
import com.silverwrist.venice.svc.*;
 | 
			
		||||
 | 
			
		||||
class BackgroundCommunityPurge implements Runnable
 | 
			
		||||
{
 | 
			
		||||
@ -41,6 +41,7 @@ class BackgroundCommunityPurge implements Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  private GlobalSite globalsite;
 | 
			
		||||
  private EnvCommunity env;
 | 
			
		||||
  private int cid;
 | 
			
		||||
  private int num_confs;
 | 
			
		||||
@ -52,8 +53,10 @@ class BackgroundCommunityPurge implements Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  BackgroundCommunityPurge(EnvCommunity env, int cid, int num_confs, int max_confid, ObjectCache conf_objcache)
 | 
			
		||||
  BackgroundCommunityPurge(GlobalSite globalsite, EnvCommunity env, int cid, int num_confs, int max_confid,
 | 
			
		||||
			   ObjectCache conf_objcache)
 | 
			
		||||
  {
 | 
			
		||||
    this.globalsite = globalsite;
 | 
			
		||||
    this.env = env;
 | 
			
		||||
    this.cid = cid;
 | 
			
		||||
    this.num_confs = num_confs;
 | 
			
		||||
@ -74,11 +77,10 @@ class BackgroundCommunityPurge implements Runnable
 | 
			
		||||
 | 
			
		||||
    Connection conn = null;  // pooled database connection
 | 
			
		||||
    Statement stmt = null;
 | 
			
		||||
    ParallelRunQueue rq = new ParallelRunQueue(2);
 | 
			
		||||
 | 
			
		||||
    try
 | 
			
		||||
    { // get a database connection from the pool
 | 
			
		||||
      conn = env.getConnection();
 | 
			
		||||
      conn = globalsite.getConnection(null);
 | 
			
		||||
      stmt = conn.createStatement();
 | 
			
		||||
 | 
			
		||||
      // run some "lower priority" deletes
 | 
			
		||||
@ -141,7 +143,9 @@ class BackgroundCommunityPurge implements Runnable
 | 
			
		||||
	    rs = stmt.executeQuery(sql.toString());
 | 
			
		||||
	    if (!(rs.next()))
 | 
			
		||||
	      throw new InternalStateError("BackgroundCommunityPurge.run screwup on conference SELECT");
 | 
			
		||||
	    rq.queue(new BackgroundConferencePurge(env,key.intValue(),rs.getInt(1),rs.getInt(2)));
 | 
			
		||||
	    globalsite.queueTask(new BackgroundConferencePurge(globalsite,env,key.intValue(),rs.getInt(1),
 | 
			
		||||
							       rs.getInt(2)),
 | 
			
		||||
				 GlobalSite.TASK_PRIO_NORMAL-1);
 | 
			
		||||
	    SQLUtil.shutdown(rs);
 | 
			
		||||
 | 
			
		||||
	  } // end if (have to delete conference data)
 | 
			
		||||
@ -173,8 +177,6 @@ class BackgroundCommunityPurge implements Runnable
 | 
			
		||||
 | 
			
		||||
    } // end finally
 | 
			
		||||
 | 
			
		||||
    rq.run();  // now run the parallel queue to finish processing
 | 
			
		||||
 | 
			
		||||
    if (logger.isDebugEnabled())
 | 
			
		||||
      logger.debug("BackgroundCommunityPurge COMPLETE for community #" + cid);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -19,10 +19,10 @@ package com.silverwrist.venice.core.impl;
 | 
			
		||||
 | 
			
		||||
import java.sql.*;
 | 
			
		||||
import org.apache.log4j.*;
 | 
			
		||||
import com.silverwrist.util.ParallelRunQueue;
 | 
			
		||||
import com.silverwrist.venice.db.*;
 | 
			
		||||
import com.silverwrist.venice.except.InternalStateError;
 | 
			
		||||
import com.silverwrist.venice.core.internals.*;
 | 
			
		||||
import com.silverwrist.venice.svc.*;
 | 
			
		||||
 | 
			
		||||
class BackgroundConferencePurge implements Runnable
 | 
			
		||||
{
 | 
			
		||||
@ -38,6 +38,7 @@ class BackgroundConferencePurge implements Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  private GlobalSite globalsite;         // the global site
 | 
			
		||||
  private EnvEngine env;                 // the environment
 | 
			
		||||
  private int confid;                    // the conference ID
 | 
			
		||||
  private int num_topics;                // the number of topics
 | 
			
		||||
@ -48,8 +49,9 @@ class BackgroundConferencePurge implements Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  BackgroundConferencePurge(EnvEngine env, int confid, int num_topics, int max_topicid)
 | 
			
		||||
  BackgroundConferencePurge(GlobalSite globalsite, EnvEngine env, int confid, int num_topics, int max_topicid)
 | 
			
		||||
  {
 | 
			
		||||
    this.globalsite = globalsite;
 | 
			
		||||
    this.env = env;
 | 
			
		||||
    this.confid = confid;
 | 
			
		||||
    this.num_topics = num_topics;
 | 
			
		||||
@ -69,11 +71,10 @@ class BackgroundConferencePurge implements Runnable
 | 
			
		||||
 | 
			
		||||
    Connection conn = null;  // pooled database connection
 | 
			
		||||
    Statement stmt = null;
 | 
			
		||||
    ParallelRunQueue rq = new ParallelRunQueue(2);
 | 
			
		||||
 | 
			
		||||
    try
 | 
			
		||||
    { // get a database connection from the pool
 | 
			
		||||
      conn = env.getConnection();
 | 
			
		||||
      conn = globalsite.getConnection(null);
 | 
			
		||||
      stmt = conn.createStatement();
 | 
			
		||||
 | 
			
		||||
      // purge out some auxiliary tables first
 | 
			
		||||
@ -105,7 +106,8 @@ class BackgroundConferencePurge implements Runnable
 | 
			
		||||
	rs = stmt.executeQuery(sql.toString());
 | 
			
		||||
	if (!(rs.next()))
 | 
			
		||||
	  throw new InternalStateError("BackgroundConferencePurge.run screwup on post SELECT");
 | 
			
		||||
	rq.queue(new BackgroundTopicPurge(env,topicids[i],rs.getInt(1),rs.getLong(2)));
 | 
			
		||||
	globalsite.queueTask(new BackgroundTopicPurge(globalsite,env,topicids[i],rs.getInt(1),rs.getLong(2)),
 | 
			
		||||
			     GlobalSite.TASK_PRIO_NORMAL-1);
 | 
			
		||||
	SQLUtil.shutdown(rs);
 | 
			
		||||
 | 
			
		||||
      } // end for
 | 
			
		||||
@ -127,8 +129,6 @@ class BackgroundConferencePurge implements Runnable
 | 
			
		||||
 | 
			
		||||
    } // end finally
 | 
			
		||||
 | 
			
		||||
    rq.run();  // now run the parallel queue to finish processing
 | 
			
		||||
 | 
			
		||||
    if (logger.isDebugEnabled())
 | 
			
		||||
      logger.debug("BackgroundConferencePurge COMPLETE for conf #" + confid);
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -21,6 +21,7 @@ import java.sql.*;
 | 
			
		||||
import org.apache.log4j.*;
 | 
			
		||||
import com.silverwrist.venice.db.*;
 | 
			
		||||
import com.silverwrist.venice.core.internals.*;
 | 
			
		||||
import com.silverwrist.venice.svc.*;
 | 
			
		||||
 | 
			
		||||
class BackgroundTopicPurge implements Runnable
 | 
			
		||||
{
 | 
			
		||||
@ -36,6 +37,7 @@ class BackgroundTopicPurge implements Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  private GlobalSite globalsite;           // the global site
 | 
			
		||||
  private EnvEngine env;                   // the environment block
 | 
			
		||||
  private int topicid;                     // the topic ID
 | 
			
		||||
  private int num_posts;                   // the number of posts in this topic
 | 
			
		||||
@ -46,8 +48,9 @@ class BackgroundTopicPurge implements Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  BackgroundTopicPurge(EnvEngine env, int topicid, int num_posts, long max_postid)
 | 
			
		||||
  BackgroundTopicPurge(GlobalSite globalsite, EnvEngine env, int topicid, int num_posts, long max_postid)
 | 
			
		||||
  {
 | 
			
		||||
    this.globalsite = globalsite;
 | 
			
		||||
    this.env = env;
 | 
			
		||||
    this.topicid = topicid;
 | 
			
		||||
    this.num_posts = num_posts;
 | 
			
		||||
@ -71,7 +74,7 @@ class BackgroundTopicPurge implements Runnable
 | 
			
		||||
 | 
			
		||||
    try
 | 
			
		||||
    { // get a database connection from the pool
 | 
			
		||||
      conn = env.getConnection();
 | 
			
		||||
      conn = globalsite.getConnection(null);
 | 
			
		||||
      stmt = conn.createStatement();
 | 
			
		||||
 | 
			
		||||
      // look up all the post IDs that are present for this topic
 | 
			
		||||
 | 
			
		||||
@ -1865,12 +1865,9 @@ class CommunityCoreData implements CommunityData, PropertyProvider
 | 
			
		||||
 | 
			
		||||
    } // end finally
 | 
			
		||||
 | 
			
		||||
    // Delete the rest of the gunk in the background; use another thread to do it.
 | 
			
		||||
    BackgroundCommunityPurge purger = new BackgroundCommunityPurge(outer,cid,conf_count,conf_max,
 | 
			
		||||
								   conf_objcache);
 | 
			
		||||
    Thread thrd = new Thread(purger);
 | 
			
		||||
    thrd.setPriority(Thread.NORM_PRIORITY-1);
 | 
			
		||||
    thrd.start();
 | 
			
		||||
    // Delete the rest of the gunk in the background; use a background task to do it.
 | 
			
		||||
    globalsite.queueTask(new BackgroundCommunityPurge(globalsite,outer,cid,conf_count,conf_max,conf_objcache),
 | 
			
		||||
			 GlobalSite.TASK_PRIO_NORMAL-1);
 | 
			
		||||
 | 
			
		||||
  } // end delete
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1406,8 +1406,7 @@ class CommunityUserContextImpl implements CommunityContext, CommunityBackend
 | 
			
		||||
      MailSend msend = new PersonalMailSend(env.getMailSender(),env.getGlobalSite(),env.getUserProps());
 | 
			
		||||
      msend.setSubject(subject);
 | 
			
		||||
      msend.setText(buf.toString());
 | 
			
		||||
      MailerAgent agent = new MailerAgent(msend,mail_list);
 | 
			
		||||
      agent.start();
 | 
			
		||||
      env.getGlobalSite().queueTask(new MailerAgent(msend,mail_list));
 | 
			
		||||
 | 
			
		||||
    } // end if
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1351,11 +1351,9 @@ class ConferenceCoreData implements ConferenceData
 | 
			
		||||
 | 
			
		||||
    } // end finally
 | 
			
		||||
 | 
			
		||||
    // Delete the rest of the gunk in the background; spin off another thread to handle it.
 | 
			
		||||
    BackgroundConferencePurge purger = new BackgroundConferencePurge(env,confid,topic_count,topic_max);
 | 
			
		||||
    Thread thrd = new Thread(purger);
 | 
			
		||||
    thrd.setPriority(Thread.NORM_PRIORITY-1);
 | 
			
		||||
    thrd.start();
 | 
			
		||||
    // Delete the rest of the gunk in the background; use a background task to handle it.
 | 
			
		||||
    globalsite.queueTask(new BackgroundConferencePurge(globalsite,env,confid,topic_count,topic_max),
 | 
			
		||||
			 GlobalSite.TASK_PRIO_NORMAL-1);
 | 
			
		||||
 | 
			
		||||
  } // end delete
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1619,8 +1619,7 @@ class ConferenceUserContextImpl implements ConferenceContext, ConferenceBackend
 | 
			
		||||
      MailSend msend = new PersonalMailSend(env.getMailSender(),env.getGlobalSite(),env.getUserProps());
 | 
			
		||||
      msend.setSubject(subject);
 | 
			
		||||
      msend.setText(buf.toString());
 | 
			
		||||
      MailerAgent agent = new MailerAgent(msend,rc);
 | 
			
		||||
      agent.start();
 | 
			
		||||
      env.getGlobalSite().queueTask(new MailerAgent(msend,rc));
 | 
			
		||||
 | 
			
		||||
    } // end if
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,7 @@ import com.silverwrist.venice.except.*;
 | 
			
		||||
import com.silverwrist.venice.htmlcheck.*;
 | 
			
		||||
import com.silverwrist.venice.util.MailSend;
 | 
			
		||||
 | 
			
		||||
class MailerAgent extends Thread
 | 
			
		||||
class MailerAgent implements Runnable
 | 
			
		||||
{
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Static data members
 | 
			
		||||
@ -49,16 +49,13 @@ class MailerAgent extends Thread
 | 
			
		||||
 | 
			
		||||
  MailerAgent(MailSend msend, List recipients)
 | 
			
		||||
  {
 | 
			
		||||
    super();
 | 
			
		||||
    setDaemon(false);
 | 
			
		||||
 | 
			
		||||
    this.msend = msend;
 | 
			
		||||
    this.recipients = recipients;
 | 
			
		||||
 | 
			
		||||
  } // end constructor
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Overrides from class Thread
 | 
			
		||||
   * Implementations from interface Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -25,7 +25,7 @@ import com.silverwrist.venice.except.*;
 | 
			
		||||
import com.silverwrist.venice.htmlcheck.*;
 | 
			
		||||
import com.silverwrist.venice.util.MailSend;
 | 
			
		||||
 | 
			
		||||
class PostDeliveryAgent extends Thread
 | 
			
		||||
class PostDeliveryAgent implements Runnable
 | 
			
		||||
{
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Static data members
 | 
			
		||||
@ -52,9 +52,6 @@ class PostDeliveryAgent extends Thread
 | 
			
		||||
 | 
			
		||||
  PostDeliveryAgent(EnvConference env, String txt, String pseud, String topicname, int topicnum, List addrs)
 | 
			
		||||
  {
 | 
			
		||||
    super();
 | 
			
		||||
    setDaemon(false);
 | 
			
		||||
 | 
			
		||||
    // Save the calling data.
 | 
			
		||||
    this.env = env;
 | 
			
		||||
    this.post_text = txt;
 | 
			
		||||
@ -72,7 +69,7 @@ class PostDeliveryAgent extends Thread
 | 
			
		||||
  } // end constructor
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Overrides from class Thread
 | 
			
		||||
   * Implementations from interface Runnable
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1312,9 +1312,8 @@ class TopicMessageUserContextImpl implements TopicMessageContext
 | 
			
		||||
 | 
			
		||||
    if (mailto_addrs!=null)
 | 
			
		||||
    { // spin off a background thread to generate the E-mails
 | 
			
		||||
      PostDeliveryAgent agent = new PostDeliveryAgent(env,my_text,pseud,new_topic_name,new_topic_num,
 | 
			
		||||
						      mailto_addrs);
 | 
			
		||||
      agent.start();
 | 
			
		||||
      env.getGlobalSite().queueTask(new PostDeliveryAgent(env,my_text,pseud,new_topic_name,new_topic_num,
 | 
			
		||||
							  mailto_addrs));
 | 
			
		||||
 | 
			
		||||
    } // end if
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -28,6 +28,7 @@ import com.silverwrist.venice.db.*;
 | 
			
		||||
import com.silverwrist.venice.except.*;
 | 
			
		||||
import com.silverwrist.venice.htmlcheck.*;
 | 
			
		||||
import com.silverwrist.venice.security.AuditRecord;
 | 
			
		||||
import com.silverwrist.venice.svc.GlobalSite;
 | 
			
		||||
import com.silverwrist.venice.util.MailSend;
 | 
			
		||||
 | 
			
		||||
class TopicUserContextImpl implements TopicContext
 | 
			
		||||
@ -897,8 +898,7 @@ class TopicUserContextImpl implements TopicContext
 | 
			
		||||
    if (mailto_addrs!=null)
 | 
			
		||||
    { // a copy of the post needs to be delivered via E-mail to the specified addresses
 | 
			
		||||
      // use our PostDeliveryAgent to do it in the background
 | 
			
		||||
      PostDeliveryAgent agent = new PostDeliveryAgent(env,text,pseud,name,topicnum,mailto_addrs);
 | 
			
		||||
      agent.start();
 | 
			
		||||
      env.getGlobalSite().queueTask(new PostDeliveryAgent(env,text,pseud,name,topicnum,mailto_addrs));
 | 
			
		||||
 | 
			
		||||
    } // end if
 | 
			
		||||
    // else don't bother - it would be a waste of time
 | 
			
		||||
@ -1013,10 +1013,9 @@ class TopicUserContextImpl implements TopicContext
 | 
			
		||||
    } // end finally
 | 
			
		||||
 | 
			
		||||
    // Delete the rest of the gunk in the background; spin off another thread to handle it.
 | 
			
		||||
    BackgroundTopicPurge purger = new BackgroundTopicPurge(env,topicid,post_count,post_max);
 | 
			
		||||
    Thread thrd = new Thread(purger);
 | 
			
		||||
    thrd.setPriority(Thread.NORM_PRIORITY-1);
 | 
			
		||||
    thrd.start();
 | 
			
		||||
    env.getGlobalSite().queueTask(new BackgroundTopicPurge(env.getGlobalSite(),env,topicid,post_count,
 | 
			
		||||
							   post_max),
 | 
			
		||||
				  GlobalSite.TASK_PRIO_NORMAL-1);
 | 
			
		||||
 | 
			
		||||
  } // end delete
 | 
			
		||||
 | 
			
		||||
@ -1481,8 +1480,7 @@ class TopicUserContextImpl implements TopicContext
 | 
			
		||||
      MailSend msend = new PersonalMailSend(env.getMailSender(),env.getGlobalSite(),env.getUserProps());
 | 
			
		||||
      msend.setSubject(subject);
 | 
			
		||||
      msend.setText(buf.toString());
 | 
			
		||||
      MailerAgent agent = new MailerAgent(msend,rc);
 | 
			
		||||
      agent.start();
 | 
			
		||||
      env.getGlobalSite().queueTask(new MailerAgent(msend,rc));
 | 
			
		||||
 | 
			
		||||
    } // end if
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -43,17 +43,31 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
 | 
			
		||||
  class BackgroundThread extends Thread
 | 
			
		||||
  {
 | 
			
		||||
    /*====================================================================
 | 
			
		||||
     * Constructor
 | 
			
		||||
     *====================================================================
 | 
			
		||||
     */
 | 
			
		||||
 | 
			
		||||
    BackgroundThread(int index)
 | 
			
		||||
    {
 | 
			
		||||
      super("GlobalSide_bkgd_" + String.valueOf(index + 1));
 | 
			
		||||
 | 
			
		||||
    } // end constructor
 | 
			
		||||
 | 
			
		||||
    /*====================================================================
 | 
			
		||||
     * Internal operations
 | 
			
		||||
     *====================================================================
 | 
			
		||||
     */
 | 
			
		||||
 | 
			
		||||
    private void doRunLoop(boolean quitting)
 | 
			
		||||
    {
 | 
			
		||||
      // get a task from the queue
 | 
			
		||||
      Runnable task = getNextTaskToRun(quitting);
 | 
			
		||||
      while (task!=null)
 | 
			
		||||
      { // null task indicates we are shutting down
 | 
			
		||||
	if (my_logger.isDebugEnabled())
 | 
			
		||||
	  my_logger.debug("Servicing task of class " + task.getClass().getName());
 | 
			
		||||
 | 
			
		||||
	try
 | 
			
		||||
	{ // run the background task
 | 
			
		||||
	  task.run();
 | 
			
		||||
@ -61,7 +75,7 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
	} // end try
 | 
			
		||||
	catch (Exception e)
 | 
			
		||||
	{ // log the exception and continue
 | 
			
		||||
	  logger.error("Background thread caught exception",e);
 | 
			
		||||
	  my_logger.error("Background thread caught exception in task",e);
 | 
			
		||||
 | 
			
		||||
	} // end catch
 | 
			
		||||
 | 
			
		||||
@ -71,10 +85,18 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
 | 
			
		||||
    } // end doRunLoop
 | 
			
		||||
 | 
			
		||||
    /*====================================================================
 | 
			
		||||
     * Overrides from class Thread
 | 
			
		||||
     *====================================================================
 | 
			
		||||
     */
 | 
			
		||||
 | 
			
		||||
    public void run()
 | 
			
		||||
    {
 | 
			
		||||
      my_logger.info("Background thread starting");
 | 
			
		||||
      doRunLoop(false);
 | 
			
		||||
      my_logger.info("Background thread quitting");
 | 
			
		||||
      doRunLoop(true);
 | 
			
		||||
      my_logger.info("Background thread finished");
 | 
			
		||||
 | 
			
		||||
    } // end run
 | 
			
		||||
 | 
			
		||||
@ -86,6 +108,7 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  private static Category logger = Category.getInstance(GlobalSiteImpl.class);
 | 
			
		||||
  private static Category my_logger = Category.getInstance(BackgroundThread.class);
 | 
			
		||||
 | 
			
		||||
  private static final int NUM_TASK_THREADS = 4;   // TODO: make this configurable
 | 
			
		||||
 | 
			
		||||
@ -151,16 +174,34 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
    // Initialize the stock messages list.
 | 
			
		||||
    stock_messages = new StockMessages(sect);
 | 
			
		||||
 | 
			
		||||
    // Initialize the task queues.
 | 
			
		||||
    int i;
 | 
			
		||||
    task_queues = new LinkedList[TASK_PRIO_MAX + 1];
 | 
			
		||||
    for (i=0; i<task_queues.length; i++)
 | 
			
		||||
      task_queues[i] = new LinkedList();
 | 
			
		||||
 | 
			
		||||
    // Initialize the threads.
 | 
			
		||||
    task_threads = new Thread[NUM_TASK_THREADS];
 | 
			
		||||
    for (i=0; i<task_threads.length; i++)
 | 
			
		||||
    { // create and kick off the background threads
 | 
			
		||||
      task_threads[i] = new BackgroundThread(i);
 | 
			
		||||
      task_threads[i].start();
 | 
			
		||||
 | 
			
		||||
    } // end for
 | 
			
		||||
 | 
			
		||||
    // Get the <dictionary/> section.
 | 
			
		||||
    sect = loader.configGetSubSection(config_h,"dictionary");
 | 
			
		||||
 | 
			
		||||
    // Retrieve the list of dictionary files.
 | 
			
		||||
    Collection dictionary_tmp = getDictionaryNames(sect,application_root);
 | 
			
		||||
 | 
			
		||||
    // Load the lexicon with all the dictionary files.
 | 
			
		||||
    LazyTreeLexicon lex = new LazyTreeLexicon((String[])(dictionary_tmp.toArray(new String[0])));
 | 
			
		||||
    this.queueTask(lex,TASK_PRIO_MAX-2);  // load the lexicon
 | 
			
		||||
 | 
			
		||||
    // Create the intermediate object map for HTML checker creation, and "seed" it.
 | 
			
		||||
    HashMap intermediate_map = new HashMap();
 | 
			
		||||
    SpellingRewriter spell_rewriter = new SpellingRewriter();
 | 
			
		||||
    LazyTreeLexicon lex = new LazyTreeLexicon((String[])(dictionary_tmp.toArray(new String[0])));
 | 
			
		||||
    spell_rewriter.addDictionary(lex);
 | 
			
		||||
    intermediate_map.put(spell_rewriter.getClass().getName(),spell_rewriter);
 | 
			
		||||
    PostLinkRewriter postlink_rewriter = new PostLinkRewriter(this);
 | 
			
		||||
@ -171,7 +212,6 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
    // Get the <html-checker/> section.
 | 
			
		||||
    sect = loader.configGetSubSection(config_h,"html-checker");
 | 
			
		||||
    NodeList nl = sect.getChildNodes();
 | 
			
		||||
    int i;
 | 
			
		||||
    for (i=0; i<nl.getLength(); i++)
 | 
			
		||||
    { // get each node in turn and test to see if it's an element
 | 
			
		||||
      Node n = nl.item(i);
 | 
			
		||||
@ -204,20 +244,6 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
 | 
			
		||||
    } // end for
 | 
			
		||||
 | 
			
		||||
    // Initialize the task queues.
 | 
			
		||||
    task_queues = new LinkedList[TASK_PRIO_MAX + 1];
 | 
			
		||||
    for (i=0; i<task_queues.length; i++)
 | 
			
		||||
      task_queues[i] = new LinkedList();
 | 
			
		||||
 | 
			
		||||
    // Initialize the threads.
 | 
			
		||||
    task_threads = new Thread[NUM_TASK_THREADS];
 | 
			
		||||
    for (i=0; i<task_threads.length; i++)
 | 
			
		||||
    { // create and kick off the background threads
 | 
			
		||||
      task_threads[i] = new BackgroundThread(i);
 | 
			
		||||
      task_threads[i].start();
 | 
			
		||||
 | 
			
		||||
    } // end for
 | 
			
		||||
 | 
			
		||||
  } // end class GlobalSiteImpl
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
@ -439,7 +465,7 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
 | 
			
		||||
  private final Runnable getNextTaskToRun(boolean quitting)
 | 
			
		||||
  {
 | 
			
		||||
    int i;
 | 
			
		||||
    int i;  // loop counter for cycling through priority queues
 | 
			
		||||
 | 
			
		||||
    synchronized (task_semaphore)
 | 
			
		||||
    { // acquire the task semaphore
 | 
			
		||||
@ -476,6 +502,7 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
    } // end synchronized block
 | 
			
		||||
 | 
			
		||||
    return null;  // if get here, task_running is false and we are quitting
 | 
			
		||||
                  // (or the task queue is empty, if quitting is true)
 | 
			
		||||
 | 
			
		||||
  } // end getNextTaskToRun
 | 
			
		||||
 | 
			
		||||
@ -615,7 +642,7 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
      try
 | 
			
		||||
      { // wait for a thread to die
 | 
			
		||||
	task_threads[i].join();
 | 
			
		||||
	i++;
 | 
			
		||||
	task_threads[i++] = null;
 | 
			
		||||
 | 
			
		||||
      } // end try
 | 
			
		||||
      catch (InterruptedException e)
 | 
			
		||||
@ -624,6 +651,9 @@ public class GlobalSiteImpl implements GlobalSite
 | 
			
		||||
 | 
			
		||||
    } // end while
 | 
			
		||||
 | 
			
		||||
    task_threads = null;
 | 
			
		||||
    task_queues = null;
 | 
			
		||||
    task_semaphore = null;
 | 
			
		||||
    engine_svc = null;
 | 
			
		||||
    if (datapool!=null)
 | 
			
		||||
      datapool.closeAllConnections();
 | 
			
		||||
 | 
			
		||||
@ -57,6 +57,7 @@ public class DataPool implements Runnable
 | 
			
		||||
  private boolean pending = false;     // pending connection being created?
 | 
			
		||||
  private Vector avail_connections;    // connections which are available for use
 | 
			
		||||
  private Vector busy_connections;     // connections which are currently in use
 | 
			
		||||
  private int thread_counter = 1;      // counter for data pool threads
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Constructor
 | 
			
		||||
@ -273,7 +274,7 @@ public class DataPool implements Runnable
 | 
			
		||||
    pending = true;
 | 
			
		||||
    try
 | 
			
		||||
    { // spin off the connection attempt to the background
 | 
			
		||||
      Thread thrd = new Thread(this);
 | 
			
		||||
      Thread thrd = new Thread(this,"DataPool_bkgd_" + (thread_counter++));
 | 
			
		||||
      thrd.start();
 | 
			
		||||
 | 
			
		||||
    } // end try
 | 
			
		||||
 | 
			
		||||
@ -7,11 +7,11 @@
 | 
			
		||||
 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific
 | 
			
		||||
 * language governing rights and limitations under the License.
 | 
			
		||||
 * 
 | 
			
		||||
 * The Original Code is the Venice Web Community System.
 | 
			
		||||
 * The Original Code is the Venice Web Communities System.
 | 
			
		||||
 * 
 | 
			
		||||
 * The Initial Developer of the Original Code is Eric J. Bowersox <erbo@silcom.com>,
 | 
			
		||||
 * for Silverwrist Design Studios.  Portions created by Eric J. Bowersox are
 | 
			
		||||
 * Copyright (C) 2001 Eric J. Bowersox/Silverwrist Design Studios.  All Rights Reserved.
 | 
			
		||||
 * Copyright (C) 2001-02 Eric J. Bowersox/Silverwrist Design Studios.  All Rights Reserved.
 | 
			
		||||
 * 
 | 
			
		||||
 * Contributor(s): 
 | 
			
		||||
 */
 | 
			
		||||
@ -28,14 +28,14 @@ public class LazyLexicon implements Runnable, SpellingDictionary
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  private static Category logger = Category.getInstance(LazyLexicon.class.getName());
 | 
			
		||||
  private static Category logger = Category.getInstance(LazyLexicon.class);
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Attributes
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  private Lexicon inner_lex = null;
 | 
			
		||||
  private volatile Lexicon inner_lex = null;
 | 
			
		||||
  private String[] filenames;
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
@ -47,9 +47,7 @@ public class LazyLexicon implements Runnable, SpellingDictionary
 | 
			
		||||
  {
 | 
			
		||||
    this.filenames = filenames;  // save off the file names to be loaded
 | 
			
		||||
 | 
			
		||||
    // spin off the load process into the background
 | 
			
		||||
    Thread thrd = new Thread(this);
 | 
			
		||||
    thrd.start();
 | 
			
		||||
    // do not background the load yet, we'll queue it as a task
 | 
			
		||||
 | 
			
		||||
  } // end constructor
 | 
			
		||||
 | 
			
		||||
@ -90,7 +88,7 @@ public class LazyLexicon implements Runnable, SpellingDictionary
 | 
			
		||||
    Lexicon lex = new Lexicon();
 | 
			
		||||
 | 
			
		||||
    if (logger.isDebugEnabled())
 | 
			
		||||
      logger.debug("LazyLexicon loading " + String.valueOf(filenames.length) + " lexicon(s)");
 | 
			
		||||
      logger.debug("LazyLexicon loading " + filenames.length + " lexicon(s)");
 | 
			
		||||
 | 
			
		||||
    for (int i=0; i<filenames.length; i++)
 | 
			
		||||
    { // load data into the lexicon in turn
 | 
			
		||||
 | 
			
		||||
@ -7,11 +7,11 @@
 | 
			
		||||
 * WARRANTY OF ANY KIND, either express or implied. See the License for the specific
 | 
			
		||||
 * language governing rights and limitations under the License.
 | 
			
		||||
 * 
 | 
			
		||||
 * The Original Code is the Venice Web Community System.
 | 
			
		||||
 * The Original Code is the Venice Web Communities System.
 | 
			
		||||
 * 
 | 
			
		||||
 * The Initial Developer of the Original Code is Eric J. Bowersox <erbo@silcom.com>,
 | 
			
		||||
 * for Silverwrist Design Studios.  Portions created by Eric J. Bowersox are
 | 
			
		||||
 * Copyright (C) 2001 Eric J. Bowersox/Silverwrist Design Studios.  All Rights Reserved.
 | 
			
		||||
 * Copyright (C) 2001-02 Eric J. Bowersox/Silverwrist Design Studios.  All Rights Reserved.
 | 
			
		||||
 * 
 | 
			
		||||
 * Contributor(s): 
 | 
			
		||||
 */
 | 
			
		||||
@ -28,14 +28,14 @@ public class LazyTreeLexicon implements Runnable, SpellingDictionary
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  private static Category logger = Category.getInstance(LazyTreeLexicon.class.getName());
 | 
			
		||||
  private static Category logger = Category.getInstance(LazyTreeLexicon.class);
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
   * Attributes
 | 
			
		||||
   *--------------------------------------------------------------------------------
 | 
			
		||||
   */
 | 
			
		||||
 | 
			
		||||
  TreeLexicon inner_lex = null;
 | 
			
		||||
  volatile TreeLexicon inner_lex = null;
 | 
			
		||||
  private String[] filenames;
 | 
			
		||||
 | 
			
		||||
  /*--------------------------------------------------------------------------------
 | 
			
		||||
@ -47,9 +47,7 @@ public class LazyTreeLexicon implements Runnable, SpellingDictionary
 | 
			
		||||
  {
 | 
			
		||||
    this.filenames = filenames;  // save off the file names to be loaded
 | 
			
		||||
 | 
			
		||||
    // spin off the load process into the background
 | 
			
		||||
    Thread thrd = new Thread(this);
 | 
			
		||||
    thrd.start();
 | 
			
		||||
    // do not start a new thread, we'll queue the lexicon up as a task
 | 
			
		||||
 | 
			
		||||
  } // end constructor
 | 
			
		||||
 | 
			
		||||
@ -63,7 +61,7 @@ public class LazyTreeLexicon implements Runnable, SpellingDictionary
 | 
			
		||||
    while (inner_lex==null)
 | 
			
		||||
    { // wait for the inner thread to finish creating the lexicon
 | 
			
		||||
      if (logger.isDebugEnabled())
 | 
			
		||||
	logger.debug("LazyLexicon: waiting for lex to load...");
 | 
			
		||||
	logger.debug("LazyTreeLexicon: waiting for lex to load...");
 | 
			
		||||
 | 
			
		||||
      try
 | 
			
		||||
      { // park the thread here until we know what's up
 | 
			
		||||
@ -90,7 +88,7 @@ public class LazyTreeLexicon implements Runnable, SpellingDictionary
 | 
			
		||||
    TreeLexicon lex = new TreeLexicon();
 | 
			
		||||
 | 
			
		||||
    if (logger.isDebugEnabled())
 | 
			
		||||
      logger.debug("LazyTreeLexicon loading " + String.valueOf(filenames.length) + " lexicon(s)");
 | 
			
		||||
      logger.debug("LazyTreeLexicon loading " + filenames.length + " lexicon(s)");
 | 
			
		||||
 | 
			
		||||
    for (int i=0; i<filenames.length; i++)
 | 
			
		||||
    { // load data into the lexicon in turn
 | 
			
		||||
@ -108,13 +106,13 @@ public class LazyTreeLexicon implements Runnable, SpellingDictionary
 | 
			
		||||
	  lex.addWord(word);
 | 
			
		||||
	  word = rdr.readLine();
 | 
			
		||||
	  if (((++counter % 1000)==0) && logger.isDebugEnabled())
 | 
			
		||||
	    logger.debug("loaded " + String.valueOf(counter) + " words");
 | 
			
		||||
	    logger.debug("loaded " + counter + " words");
 | 
			
		||||
 | 
			
		||||
	} // end while
 | 
			
		||||
 | 
			
		||||
	rdr.close();
 | 
			
		||||
	if (logger.isDebugEnabled())
 | 
			
		||||
	  logger.debug("finished loading " + filenames[i] + ", " + String.valueOf(counter) + " words loaded");
 | 
			
		||||
	  logger.debug("finished loading " + filenames[i] + ", " + counter + " words loaded");
 | 
			
		||||
 | 
			
		||||
      } // end try
 | 
			
		||||
      catch (IOException e)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user