开发者

Help with Java Multithreading

Alright, this is a sort of dual-purpose question. The main thing I hope to take away from this is more knowledge of multithreading. I am a complete newbie when it comes to multithreading, and this is my first actual attempt to do anything in multiple threads. The other thing I hope to take away is some help with a piece of homework that I am turning into a much more complicated franken-project for fun and learning. In the first part of this question I am going to detail my thinking and methodology for the threads I have been working on in my homework assignment. If I am doing anything that is bad practice, needs to be fixed, whatever, please let me know so I can learn. Again, I know practically nothing about multithreading.

First off, I am currently taking a computer science course which, to be nice about it, has homework which uses techniques and data structures that I've already learned and is thus not challenging. To attempt to not be entirely bored out of my skull, I am trying to take a simple project (creating a Linked List and a sorted Linked List) and turn it into a multi-threaded franken-program. I would like the method for adding new elements to be in a separate thread and gets input from a queue (not entirely necessary for an un-ordered list, but will be more useful for proper ordering), and for the ordered list I want a separate thread that will patrol the list to make sure all elements are in order (one of the methods that I am NOT ALLOWED to change returns references to entire nodes, which have completely public data). Those are the two main things I would want to create, but definitely not the only things I might want to try and figure out. This is meant to be pretty much just a test to learn about multithreading, so I'm not particularly designing this project for practicality. If you want to leave a comment about what is good programming practice for what should be threaded or not, it would be highly appreciated.

As a step I took just recently, the idea for which was gleaned from reading a post on StackOverflow itself, I have created a master class which has Threads passed to it so it can stop them and clean everything up at the end of the program. This is done through use someThread.interrupt() and having the Thread's run() method checking for that exception. However, I have found a problem with this method: the loop that runs through the Threads and calling interrupt(), most times, does not actually run. Here is the code for the method:

public static void stopAllThreads()
{
    boolean stopped = false;
    while( !stopped )
    {
        System.out.println( "Stopping all threads" );

        synchronized( threads )
        {
            System.out.println( "Threads being stopped: " + threads.size() );
            for( int i = 0; i < threads.size(); i++ )
            {
                System.out.println( "Stopping: " + threads.get( i ) );
                threads.get( i ).interrupt();
            }
            threads.clear();
            stopped = true;
        }
    }
}

While attempting to debug it, I put in the while loop to try and force the method to eventually run the for loop, but all that happens is it outputs "Stopping all threads" and then I never see anything else. I don't know if this is a bad way to code, or what is wrong with this code. Any help figuring out how to make this work is appreciated (if you need to see more of the class, please let me know. I don't really know what you need to see, but I don't want to copy and paste several entire java files).

On top of this problem, I have also determined that when running my program, it will reach the last line and attempt to stop all Threads before the Thread going through the queue of numbers to add to the list is finished adding everything. This is also a problem when attempting to output the list through a toString method directly after adding numbers to the queue, as nothing will yet be added, and thus nothing will be output to the console. Here is my main method that shows the order of things happening (though I imagine that it is less important when dealing with threading):

public static void main( String[] args )
{
    // always make sure this is the first thing created
    ActiveThreads bla = new ActiveThreads();

    Comparator< Integer > temp = new Comparator< Integer >() 
            { 
                public int compare( Integer i, Integer j )
                { 
                    if( i > j ) return 1;
                    if( i < j ) return -1;
                    return 0; 
                } 
            };
    List< Integer > woogle = new List< Integer >( temp );

    woogle.add( 1 );
    woogle.add( 2 );
    woogle.add( 3 );
    woogle.add( 4 );
    System.out.println( woogle );

    ActiveThreads.stopAllThreads();
}

Lastly, here is the custom thread that I have made which is supposed to check the queue for elements, then add all elements in the queue to my actual list:

private class AddThread implements Runnable
  {
      public AddThread()
      {

      }

      public void run()
      {
          while( running )
          {
              synchronized( addQueue )
              {
                  while( !addQueue.isEmpty() )
                  {
                      System.out.println( "Going through queue" );
                      T temp = addQueue.poll();
                      if( head == null )
                      {
                          head = new Node< T >();
                          last = head;
                          head.data = temp;
                          largest = temp;
                          smallest = temp;
                      }
                      else
                      {
                          last.next = new Node< T >();
                          last.next.data = temp;
                          last = last.next;
                          if( mComparator.compare( temp,  largest ) == 1 )
                          {
                              largest = temp;
                          }
                          else if( mComparator.compare( temp, smallest ) == -1 )
                          {
                              smallest = temp;
                          }
                      }
                      ++size;
                  }
              }

              System.out.println( "Pausing " + addThread );
              synchronized( addThread )
              {
                  pause( 200 );
              }
          }
      }

      private void pause( long time )
      {
          try
          {
              addThread.wait( time );
          } 
          catch ( InterruptedException e ) 
          {
              running = false;
              addThread.notify();
          }
      }

      private boolean running = true;
  }

Now, the absolute last thing I would like to ask for are some of the basic primers and guidelines and such for good multithreading. I have found a good amount of tutorials, but there are some things that people don't mention in tutorials, as well as most tutorials are only so-so. If you know of any good tutorials, I would appreciate links to them. If there are any good coding methodologies for multithreading, that would be very appreciated as well. Just any sort of information that you think might help me wrap my brain around this concept, as well as make sure that when implementing it, I do it in a way that is less prone to bugs and people won't yell at me for bad practices.

EDIT: To answer the questions matt b put into a comment, addThread is purely a reference to the Thread that looks at the queue which has elements in it to add to the list and puts them into the list. It is a private Thread inside of the class containing the AddThread class (clever use of names, eh?). Threads are constructed in a way that was posted in another question at StackOverflow. Here is an example (and also a snippet of code showing what addThread is):

addThread = new Thread( new AddThread(), "Thread for adding elements" );
addThread.setPriority( 6 );
addThread.start();
ActiveThreads.addThread( addThread );

All Threads are created in a similar way. As to what the Thread I posted is supposed to be doing, I did explain that in detail above, but I'll try to put a synopsis here.

The AddThread class is an inner class which gets created at time of construction of the linked list class I am trying to make. It looks at a private queue which may or may not have elements in it, and when it does, it puts those elements into the linked list. The queue gets added too when the linked list's add( T newElt ) method is called.

EDIT EDIT开发者_如何学运维: In the interest of full disclosure, and for those who wish to take the time, I am going to post the entirety of the two relevant java files here.

List.java

public class List< T > implements Iterable< T >, Comparable< List< T > > 
{
  // you may not change this inner class!
  class Node< D > 
  {
      D data;
      Node< D > next;
  }

  public List( Comparator< T > comparator ) 
  {
      mComparator = comparator;
      head = null;
      last = null;
      size = 0;
      addQueue = new LinkedList< T >();

      addThread = new Thread( new AddThread(), "Thread for adding elements" );
      addThread.setPriority( 6 );
      addThread.start();
      ActiveThreads.addThread( addThread );
  }

  public void add( T newElt ) 
  {
      synchronized( addQueue )
      {
          addQueue.add( newElt );
      }
  }

  public T get( int index ) 
  {
      if( index > size )
          throw new IndexOutOfBoundsException();

      Node< T > temp = head;
      for( int i = 0; i < index; i++ )
      {
          temp = temp.next;
      }
      return temp.data;
  }

  public Node< T > lookup( T element ) 
  {
      Node< T > temp = head;
      for( int i = 0; i < size; i++ )
      {
          if( temp.data == element )
              return temp;
          temp = temp.next;
      }
      throw new NoSuchElementException();
  }

  public int size() 
  {
      return size;
  }

  public boolean isEmpty() 
  {
      return head == null;
  }

  public void delete( T element ) 
  {
      throw new UnsupportedOperationException("You must implement this method.");
  }

  public void replace( T oldElt, T newElt ) 
  {
      try
      {
          Node< T > temp = lookup( oldElt );
          temp.data = newElt;
      }
      catch( NoSuchElementException e )
      {
          throw e;
      }
  }

  public T getLargest() 
  {
      return largest;
  }

  public T getSmallest() 
  {
      return smallest;
  }

  public List< T > copy() 
  {
      throw new UnsupportedOperationException("You must implement this method.");
  }

  public String toString() 
  {
      StringBuffer ret = new StringBuffer();
      int i = 0;
      for( T x : this )
      {
          System.out.println( "Loop: " + i++ );
          ret.append( x + " " );
      }
      return ret.toString();
  }

  public int compareTo( List< T > other ) 
  {
      throw new UnsupportedOperationException("You must implement this method.");
  }

  public ListIterator< T > iterator() 
  {
      return new ListIterator< T >( head );
  }

  private class ListIterator< E > implements Iterator< E > 
  {
      private ListIterator( Node< E > head )
      {
          cur = head;
      }

      public boolean hasNext() 
      {
          if( cur == null )
              return false;

          System.out.println( "Iterator: " + cur.data );
          return cur.next == null;
      }

      @SuppressWarnings("unchecked")
      public E next() 
      {
          Node< E > temp = cur;
          cur = cur.next;
          return (E)temp.data;
      }

      public void remove() 
      {
          throw new UnsupportedOperationException("You do NOT need to implement " +
                                                  "this method.");
      }

      private Node< E > cur;
  }

  private class AddThread implements Runnable
  {
      public AddThread()
      {

      }

      public void run()
      {
          while( running )
          {
              synchronized( addQueue )
              {
                  while( !addQueue.isEmpty() )
                  {
                      System.out.println( "Going through queue" );
                      T temp = addQueue.poll();
                      if( head == null )
                      {
                          head = new Node< T >();
                          last = head;
                          head.data = temp;
                          largest = temp;
                          smallest = temp;
                      }
                      else
                      {
                          last.next = new Node< T >();
                          last.next.data = temp;
                          last = last.next;
                          if( mComparator.compare( temp,  largest ) == 1 )
                          {
                              largest = temp;
                          }
                          else if( mComparator.compare( temp, smallest ) == -1 )
                          {
                              smallest = temp;
                          }
                      }
                      ++size;
                  }
              }

              System.out.println( "Pausing " + addThread );
              synchronized( addThread )
              {
                  pause( 200 );
              }
          }
      }

      private void pause( long time )
      {
          try
          {
              addThread.wait( time );
          } 
          catch ( InterruptedException e ) 
          {
              running = false;
              addThread.notify();
          }
      }

      private volatile boolean running = true;
  }

  private Comparator< T > mComparator;
  private Node< T > head, last;
  private Thread addThread;
  // replace this with your own created class later
  private Queue< T > addQueue;
  private int size;
  private T largest, smallest;
}

ActiveThreads.java

public class ActiveThreads
{
public ActiveThreads()
{
    if( threads == null )
    {
        threads = new ArrayList< Thread >();
    }

    if( threadsMonitor == null )
    {
        ThreadsMonitor monitor = new ThreadsMonitor();
        Thread thread = new Thread( monitor, "Active Threads Monitor" );
        thread.setPriority( 3 );
        thread.start();
        threadsMonitor = thread;
    }
}

public static void stopMonitoring()
{
    synchronized( threadsMonitor )
    {
        threadsMonitor.interrupt();
    }
}

public static void addThread( Thread t )
{
    synchronized( threads )
    {
        threads.add( t );
        System.out.println( "Added thread: " + t );
    }
}

public static void addThread( Runnable r, String s )
{
    Thread t = new Thread( r, s );
    t.start();
    addThread( t );
}

public static void stopThread( Thread t )
{
    synchronized( threads )
    {
        for( int i = 0; i < threads.size(); i++ )
        {
            if( threads.get( i ) == t )
            {
                threads.get( i ).interrupt();
                threads.remove( i );
            }
        }
    }
}

public static void stopAllThreads()
{
    boolean stopped = false;
    while( stopped == false )
    {
        System.out.println( "Stopping all threads" );

        synchronized( threads )
        {
            System.out.println( "Threads being stopped: " + threads.size() );
            for( int i = 0; i < threads.size(); i++ )
            {
                System.out.println( "Stopping: " + threads.get( i ) );
                threads.get( i ).interrupt();
            }
            threads.clear();
            stopped = true;
        }
    }
}

private static ArrayList< Thread > threads = null;
private static Thread threadsMonitor = null;

private class ThreadsMonitor implements Runnable
{
    public ThreadsMonitor()
    {

    }

    public void run()
    {
        while( true )
        {
            synchronized( threads )
            {
                for( int i = 0; i < threads.size(); i++ )
                {
                    if( !threads.get( i ).isAlive() )
                    {
                        threads.get( i ).interrupt();
                        threads.remove( i );
                    }

                    synchronized( threadsMonitor )
                    {
                        try
                        {
                            threadsMonitor.wait( 5000 );
                        }
                        catch( InterruptedException e )
                        {
                            threadsMonitor.interrupted();
                            return;
                        }
                    }
                }
            }
        }
    }
}
}


It's hard to see the whole picture but some general points and even more general suggestions.

While attempting to debug it, I put in the while loop to try and force the method to eventually run the for loop, but all that happens is it outputs "Stopping all threads" and then I never see anything else

This is probably because another Thread is sitting in a sychronized block using the same lock and preventing your code from running. The easiest way to find out is to run it in a debugger and pause/break the program when you think it may be stuck. You should be able to inspect the threads and check their state. Look for BLOCKED status.

The while( !stopped ) in stopAllThreads is redundant because it can never loop.

In AddThread you have this

private boolean running = true;

When using a boolean to as a stop flag (which I think is what you are trying to achieve) AND that stop flag is polled by one thread but set by another then you MUST make it volatile. There is a whole area of Java multithreading coding that deals with data visibility and volatile is one of the tools used to ensure correctness.

Often programs work "fine most of the time" without "correct" multithreading logic. But they ARE broken and will most likely break at the most inconvenient time (usually as soon as a customer gets hold of it)! (If you remember one thing in my answer remember the previous sentence :D )

Leverage the java.util.concurrent package as much as you can. Although it is very important to understand the fundamentals too, this package has lots of very useful constructs designed by very clever people that will solve many common concurrency problems.

Read Java Concurrent In Practice. For the topic it is describing (potentially very dry) it explains the concepts very well in an accessible way with plenty of examples. It was written by the same group who worked on the java.util.concurrent package.

From my own experience I think that "visibility" of data in concurrent programs is the most important and the least understood part of Java thread programming. If you can get your head round that you will be well on your way. JCIP can help you with this.

Hope the above helps and good luck!

EDIT: Spotted another problem in your additional code with this construct

 for( int i = 0; i < threads.size(); i++ )
 {
    if( !threads.get( i ).isAlive() )
    {
       threads.get( i ).interrupt();
       threads.remove( i );
    }
 }

Doing a remove() inside an indexed scan of a list like this will not work as expected because the remove() interferes with the indexes of other items in the list (all subsequent items get shifted down one index).


Problem found: There was a nested synchronized block that went into a call to wait() at the innermost part of the block, without releasing the lock on the ArrayList of Threads.

synchronized( threads )
            {
                for( int i = 0; i < threads.size(); i++ )
                {
                    if( !threads.get( i ).isAlive() )
                    {
                        threads.get( i ).interrupt();
                        threads.remove( i );
                    }
                }
            }

            synchronized( threadsMonitor )
            {
                try
                {
                    threadsMonitor.wait( 5000 );
                }
                catch( InterruptedException e )
                {
                    threadsMonitor.interrupted();
                    return;
                }

Was the problem, removing the synchronization to threadsMonitor inside the synchronization to threads fixes the issue, as seen below. (I apologize for formatting issues, copy+paste is a fickle beast)

            synchronized( threadsMonitor )
            {
                try
                {
                    threadsMonitor.wait( 5000 );
                }
                catch( InterruptedException e )
                {
                    threadsMonitor.interrupted();
                    return;
                }
            }

synchronized( threads )
               {
                for( int i = 0; i < threads.size(); i++ )
                {
                    if( !threads.get( i ).isAlive() )
                    {
                        threads.get( i ).interrupt();
                        threads.remove( i );
                    }
                }
            }
        }

Thank you to Mike Q for the multithreading resource. I have no had time to do much with it yet, but I will definitely be taking a look and hopefully learning a lot. I will also be making a separate question for an issue I'm having with this other than stopping threads.

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜