开发者

Sending processed messages to specific threads

I have a set of threads where each thread has to wait for its required input, do some computation, and finally, send its output values to specific threads.

I plan to have 开发者_JAVA百科global map containing the name of the thread, and the thread itself, in order to let each thread get its "successors" threads by name and then, send them the values.

First, I looked at the Producer-Consumer example using blocking queues:

class Consumer implements Runnable {
    private final BlockingQueue queue;

    Consumer(BlockingQueue q) {
        queue = q;
    }

    public void run() {
        try {
            while(true) { 
                System.out.println("Waiting for input");
                consume(queue.take()); 
            }
        } catch (InterruptedException ex) { 
            ex.printStackTrace();
        }
    }

    void consume(Object x) { 
        System.out.println("Received: " + x);
    }
}

class Setup {
    public static void main(String...args) {
        BlockingQueue q = new ArrayBlockingQueue<String>(10);
        Producer p = new Producer(q);
        Consumer c1 = new Consumer(q);
        Consumer c2 = new Consumer(q);
        new Thread(p).start();
        new Thread(c1).start();
        new Thread(c2).start();
    }
}

I thought I could have a blocking queue for each thread. A Consumer thread would then be looping over queue.take() until it receives all the desired values.

Later, I found this post, where a similar question to mine is asked. The solution proposed seems to be easier than the blocking queue solution: it is based on just calling a method on the thread I want so send the message to.

I would like to ask you for some advice (since I think it is a common scenario) on which of the two approach would be the best, or if there is a better way to achieve what I want.

Thank you very much for your help.


Consumer-Producer is fine. (That "answer" you refer to in the references SO questions is a can of worms .. think it through ...)

You can use Queue, Pipe, or even PipedInputStream and PipedOutputStream. There is also Exchanger.

Here is a mod of the example from the Exchanger javadoc. Don't worry about the nested classes, it is just a compact style -- not at all relevant to subject matter.

Here we have a 'pipeline' class. It has 2 threads (R/L in the names refer to Left, Right). Pipeline flow is R->L.

/* 
 * mostly based on 
 * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Exchanger.html 
 */
package so_6936111;

import java.util.concurrent.Exchanger;

public class WorkflowDemo {

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        pipeline.start();
    }
    // ----------------------------------------------------------------
    // Pipeline
    // ----------------------------------------------------------------

    public static class Pipeline {

        /** exchanger for messages */
        Exchanger<Message> exchanger = new Exchanger<Message>();

        /* the two message instances that are passed back and forth */
        Message msg_1 = new Message();
        Message msg_2 = new Message();

        /** startups the pipeline */
        void start() {
            new Thread(new WorkerR()).start();
            new Thread(new WorkerL()).start();
        }


        /** Message objects are passed between workflow threads */
        public static class Message {
            private Object content;
            public Object getContent() { return content; }
            public void setContent(Object c) { this.content = c; }
        }


        /** WorkerR is at the head of the pipeline */
        class WorkerR implements Runnable {
            public void run() {
                Message message = msg_1;
                try {
                    while (true) {
                        Object data = doSomeWork();
                        message.setContent(data);
                        message = exchanger.exchange(message);
                    }
                } catch (InterruptedException ex) { ex.printStackTrace();}
            }
            /** 
             * let's pretend this is where you get your 
             * initial data and do some work
             */
             private Object doSomeWork() {
                return String.format("STEP-1@t:%d", System.nanoTime());
             }
        }

        /** WorkerL is at the tail of the pipeline */
        class WorkerL implements Runnable {
            public void run() {
                Message message = msg_2;
                try {
                    while (true) {
                        message = exchanger.exchange(message);
                        Object data = doPostProcessing(message.getContent());
                        System.out.format("%s\n", data);
                    }
                } catch (InterruptedException ex) { ex.printStackTrace();}
            }

            /**
             * Let's pretend this is where the 2nd step of the workflow.
             */
            private Object doPostProcessing(Object data) {
                return String.format("%s | STEP-2@t:%d", data, System.nanoTime());
            }
        }
    }
}

Output:

STEP-1@t:1312434325594730000 | STEP-2@t:1312434325594747000
STEP-1@t:1312434325594750000 | STEP-2@t:1312434325594765000
STEP-1@t:1312434325594768000 | STEP-2@t:1312434325594784000
STEP-1@t:1312434325594787000 | STEP-2@t:1312434325594804000
STEP-1@t:1312434325594806000 | STEP-2@t:1312434325594823000
STEP-1@t:1312434325594826000 | STEP-2@t:1312434325594841000
...
0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新问答

问答排行榜