Java Throttling

How do I use a combination of ScheduledThreadPoolExecutor, ScheduledFuture and ExecutorCompletionService to throttle Callable commands that accept a variable parameter? Upon receiving a response from a Callable command, I need to create a new Callable command based on its output. I also need to stay within a threshold of 100 calls per second.


You should implement the Leaky Bucket algorithm. Before making a call, block until you have a token. You can implement this algorithm in a few dozen lines of Java.
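
A minimal sketch of that idea, assuming a single JVM and a fixed rate with no bursts: each caller blocks in acquire() until its time slot arrives, so calls leak out at a steady pace. The class name LeakyBucket and its method are hypothetical and not taken from any library.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of the JDK or any library.
final class LeakyBucket {
    private final long intervalNanos; // minimum spacing between two calls
    private long nextSlot;            // System.nanoTime() value of the next free slot

    // Assumes callsPerSecond > 0.
    LeakyBucket(int callsPerSecond) {
        this.intervalNanos = TimeUnit.SECONDS.toNanos(1) / callsPerSecond;
        this.nextSlot = System.nanoTime();
    }

    // Blocks until the caller's slot arrives. Slots are handed out in order,
    // one per interval, so callers proceed at roughly the configured rate.
    void acquire() throws InterruptedException {
        long waitNanos;
        synchronized (this) {
            long now = System.nanoTime();
            // Compare via subtraction, since System.nanoTime() values may overflow.
            long slot = (nextSlot - now > 0) ? nextSlot : now;
            nextSlot = slot + intervalNanos;
            waitNanos = slot - now;
        }
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos); // sleep outside the lock
        }
    }

    public static void main(String[] args) throws InterruptedException {
        LeakyBucket bucket = new LeakyBucket(100); // 100 calls per second
        long start = System.nanoTime();
        for (int i = 1; i <= 20; i++) {
            bucket.acquire();
            // the throttled call would go here
        }
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("20 calls took about " + elapsedMillis + " ms");
    }
}
```

Calling acquire() right before each Callable is submitted (including the follow-up Callables created from earlier results) would keep the overall submission rate at about 100 per second. If short bursts are acceptable, a token-counting variant would be the alternative.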


I would suggest using a broker, for example RabbitMQ. You could set the maximum number of consumers to 100 and have a single producer instance that publishes at a rate of 100 messages per second.

Here you can find an explanation of three methods for implementing a throttle mechanism in a distributed system. The one you will want to try is the distributed one, which uses RabbitMQ. It is designed to limit the number of messages being processed concurrently, say to at most 100 at any given time. You would need to modify it so that the publisher publishes no more than 100 messages per second. At the bottom you can find a URL to the git repository with the source code, but I'm pasting it here as well.

Edit from Comments:

The first uses java.util.concurrent.Semaphore, configured with the number of permits it will handle. Each thread tries to acquire a permit and, if none are left, blocks until one is released. The second uses a fixed-size ThreadPoolExecutor, which has at most the specified number of worker threads at any given time. The third uses RabbitMQ: the maximum number of concurrent consumers is the maximum number of worker threads. The git repo has a more detailed explanation in English. Hope this helps.


You can use semaphores for throttling. You need to distinguish whether you want to A) "throttle per instant" (an upper bound on jobs running at the same time) or B) "throttle per interval" (an upper bound on jobs started within one interval).

A) Counting a semaphore up and down is enough for the "throttle per instant". E.g.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottlePerInstantSample {
    private static final int JOBS_COUNT = 100;
    private static final int JOBS_THROTTLE_PER_INSTANT = 10;
    private static final Semaphore THROTTLE_PER_INSTANT_SEMAPHORE = new Semaphore(
            JOBS_THROTTLE_PER_INSTANT);
    private static final ExecutorService executorService = Executors
            .newFixedThreadPool(JOBS_THROTTLE_PER_INSTANT);

    private final static AtomicInteger jobsAtTheSameTimeCounter = new AtomicInteger(
            0);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= JOBS_COUNT; i++) {
            THROTTLE_PER_INSTANT_SEMAPHORE.acquire();
            final PrintJob printJob = new PrintJob(i, jobsAtTheSameTimeCounter);
            final ThrottledJob throttledJob = new ThrottledJob(printJob,
                    THROTTLE_PER_INSTANT_SEMAPHORE);
            executorService.execute(throttledJob);
        }
        executorService.shutdown();
    }

    static class ThrottledJob implements Runnable {
        private final Runnable delegate;
        private final Semaphore throttlePerInstantSemaphore;

        public ThrottledJob(Runnable delegate,
                Semaphore throttlePerInstantSemaphore) {
            super();
            this.delegate = delegate;
            this.throttlePerInstantSemaphore = throttlePerInstantSemaphore;
        }

        @Override
        public void run() {
            try {
                delegate.run();
            } finally {
                throttlePerInstantSemaphore.release();
            }
        }

    }

    static class PrintJob implements Runnable {
        final int jobNumber;
        final AtomicInteger jobsAtTheSameTimeCounter;

        public PrintJob(int jobNumber, AtomicInteger jobsAtTheSameTimeCounter) {
            super();
            this.jobNumber = jobNumber;
            this.jobsAtTheSameTimeCounter = jobsAtTheSameTimeCounter;
        }

        public void run() {
            jobsAtTheSameTimeCounter.incrementAndGet();

            try {
                Thread.sleep(50); // wait some time
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            synchronized (System.out) {
                System.out.println(jobsAtTheSameTimeCounter.getAndDecrement()
                        + " : Job " + jobNumber);
            }
        }
    }
}

The output can be:

10 : Job 1
9 : Job 2
8 : Job 3
7 : Job 5
7 : Job 4
...
9 : Job 87
10 : Job 90
9 : Job 89
10 : Job 91
9 : Job 92
8 : Job 93
7 : Job 94
6 : Job 95
5 : Job 98
4 : Job 97
3 : Job 96
2 : Job 100
1 : Job 99

B) Counting a semaphore down and periodically resetting it to its initial value is enough for the "throttle per interval".

E.g.

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ThrottlePerIntervallSample {
    private static final int JOBS_COUNT = 100;
    private static final int JOBS_THROTTLE_PER_INTERVALL = 10;
    private static final long INTERVALL_IN_UNITS = 1;
    private static final TimeUnit UNIT_OF_INTERVALL = TimeUnit.SECONDS;

    private static final Semaphore THROTTLE_PER_INTERVALL_SEMAPHORE = new Semaphore(
            JOBS_THROTTLE_PER_INTERVALL);

    private static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors
            .newScheduledThreadPool(JOBS_THROTTLE_PER_INTERVALL + 1); 
    // plus one because the resetting of the semaphore must be possible! 

    public static void main(String[] args) throws InterruptedException {

        SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate(() -> {
            // remove permits left over from the previous interval
            THROTTLE_PER_INTERVALL_SEMAPHORE.drainPermits();
            // set permits for the next interval
            THROTTLE_PER_INTERVALL_SEMAPHORE.release(JOBS_THROTTLE_PER_INTERVALL);
        }, INTERVALL_IN_UNITS, INTERVALL_IN_UNITS, UNIT_OF_INTERVALL);

        for (int i = 1; i <= JOBS_COUNT; i++) {
            THROTTLE_PER_INTERVALL_SEMAPHORE.acquire();
            final PrintJob printJob = new PrintJob(i);
            SCHEDULED_EXECUTOR_SERVICE.execute(printJob);
        }

        SCHEDULED_EXECUTOR_SERVICE.shutdown();
    }

    static class PrintJob implements Runnable {
        final int jobNumber;

        public PrintJob(int jobNumber) {
            super();
            this.jobNumber = jobNumber;
        }

        public void run() {

            try {
                Thread.sleep(50); // wait some time
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("HH:mm:ss:SSS");

            synchronized (System.out) {
                System.out.println(simpleDateFormat.format(new Date())
                        + " : Job " + jobNumber);
            }
        }
    }

}

The output can be:

00:42:29:253 : Job 9
00:42:29:255 : Job 2
00:42:29:255 : Job 6
00:42:29:255 : Job 5
00:42:29:255 : Job 10
00:42:29:256 : Job 7
00:42:29:256 : Job 3
00:42:29:256 : Job 1
00:42:29:256 : Job 8
00:42:29:257 : Job 4
00:42:30:140 : Job 11
...
00:42:37:142 : Job 90
00:42:38:140 : Job 91
00:42:38:140 : Job 92
00:42:38:141 : Job 99
00:42:38:141 : Job 93
00:42:38:141 : Job 94
00:42:38:142 : Job 98
00:42:38:142 : Job 96
00:42:38:142 : Job 95
00:42:38:143 : Job 100
00:42:38:143 : Job 97

Some remarks:

1) In a production system, prefer tryAcquire with a timeout over acquire, to avoid a de facto deadlock when permits are never released.

2) If you deal with many jobs, call acquire/tryAcquire before you submit each job to the (scheduled) executor service. Otherwise you might flood the thread pool's queue with too many jobs at once.
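
Remark 1 could look roughly like the following sketch of the "throttle per instant" case. The class TimeoutThrottle and the method runThrottled are hypothetical names chosen for illustration, and the 100 ms timeout is an arbitrary assumption.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration only.
final class TimeoutThrottle {
    // Runs the job if a permit becomes available within the timeout.
    // Returns false (without running the job) if the wait timed out.
    static boolean runThrottled(Semaphore permits, Runnable job,
            long timeout, TimeUnit unit) throws InterruptedException {
        if (!permits.tryAcquire(timeout, unit)) {
            return false; // caller decides: retry, log, or drop the job
        }
        try {
            job.run();
            return true;
        } finally {
            permits.release(); // "throttle per instant": hand the permit back
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Semaphore permits = new Semaphore(1);
        boolean ran = runThrottled(permits, () -> System.out.println("job ran"),
                100, TimeUnit.MILLISECONDS);
        System.out.println("first attempt ran: " + ran);

        permits.acquire(); // simulate a stuck job holding the only permit
        ran = runThrottled(permits, () -> System.out.println("never printed"),
                100, TimeUnit.MILLISECONDS);
        System.out.println("second attempt ran: " + ran);
    }
}
```

The second attempt gives up after the timeout instead of blocking forever. In the samples above, the same tryAcquire call would replace acquire in the submitting loop, with the job retried or skipped when it returns false.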


If you prefer to reuse well-written, working code, you can use TimedSemaphore, which is part of the Apache Commons Lang library (org.apache.commons.lang3.concurrent.TimedSemaphore).

For instance, if you need to limit calls to 80 per 10 seconds:

  private TimedSemaphore sem = new TimedSemaphore(10, TimeUnit.SECONDS, 80);
  [...]
  public void myMethod() throws InterruptedException {
    sem.acquire();
    // requests which need throttling
  }
  