How to write a process pool in a bash shell script

I have more than 10 tasks to execute, and the system allows at most 4 tasks to run at the same time.

My task can be started like: myprog taskname

How can I write a bash shell script to run these tasks? The most important thing is that when one task finishes, the script starts another immediately, so that the number of running tasks stays at 4 the whole time.


Use xargs:

xargs -P <maximum-number-of-process-at-a-time> -n <arguments-per-process> <command>

Details here.
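
As a minimal sketch of that pattern applied to the question: here myprog is a stand-in shell function and the task names are made up, so substitute your own program and task list. With those assumptions, a pool of four could look like this:

```shell
#!/usr/bin/env bash
# Stand-in for the real `myprog` (hypothetical); replace it with your program.
myprog() {
    sleep 0.2
    echo "done: $1"
}
export -f myprog   # make the function visible to the bash started by xargs

# Twelve made-up task names, one per line
tasks=$(printf 'task%d\n' {1..12})

# -P 4: at most four processes at once; -n 1: one task name per invocation
results=$(printf '%s\n' "$tasks" | xargs -P 4 -n 1 bash -c 'myprog "$1"' _)
echo "$results"
```

When myprog is a real executable on your PATH, the bash -c wrapper is unnecessary and the command reduces to xargs -P 4 -n 1 myprog.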


I chanced upon this thread while looking into writing my own process pool and particularly liked Brandon Horsley's solution, though I couldn't get the signals working right, so I took inspiration from Apache and decided to try a pre-fork model with a fifo as my job queue.

The following function is what each worker process runs after it is forked.

# \brief the worker function that is called when we fork off worker processes
# \param[in] id  the worker ID
# \param[in] job_queue  the fifo to read jobs from
# \param[in] result_log  the temporary log file to write exit codes to
function _job_pool_worker()
{
    local id=$1
    local job_queue=$2
    local result_log=$3
    local line=

    exec 7<> ${job_queue}
    while [[ "${line}" != "${job_pool_end_of_jobs}" && -e "${job_queue}" ]]; do
        # workers block on the exclusive lock to read the job queue
        flock --exclusive 7
        read line <${job_queue}
        flock --unlock 7
        # the worker should exit if it sees the end-of-job marker or run the
        # job otherwise and save its exit code to the result log.
        if [[ "${line}" == "${job_pool_end_of_jobs}" ]]; then
            # write it one more time for the next sibling so that everyone
            # will know we are exiting.
            echo "${line}" >&7
        else
            _job_pool_echo "### _job_pool_worker-${id}: ${line}"
            # run the job
            { ${line} ; } 
            # now check the exit code and prepend "ERROR" to the result log entry
            # which we will use to count errors and then strip out later.
            local result=$?
            local status=
            if [[ "${result}" != "0" ]]; then
                status=ERROR
            fi  
            # now write the error to the log, making sure multiple processes
            # don't trample over each other.
            exec 8<> ${result_log}
            flock --exclusive 8
            echo "${status}job_pool: exited ${result}: ${line}" >> ${result_log}
            flock --unlock 8
            exec 8>&-
            _job_pool_echo "### _job_pool_worker-${id}: exited ${result}: ${line}"
        fi  
    done
    exec 7>&-
}

You can get a copy of my solution on GitHub. Here's a sample program using my implementation.

#!/bin/bash

. job_pool.sh

function foobar()
{
    # do something
    true
}   

# initialize the job pool to allow 3 parallel jobs and echo commands
job_pool_init 3 0

# run jobs
job_pool_run sleep 1
job_pool_run sleep 2
job_pool_run sleep 3
job_pool_run foobar
job_pool_run foobar
job_pool_run /bin/false

# wait until all jobs complete before continuing
job_pool_wait

# more jobs
job_pool_run /bin/false
job_pool_run sleep 1
job_pool_run sleep 2
job_pool_run foobar

# don't forget to shut down the job pool
job_pool_shutdown

# check the $job_pool_nerrors for the number of jobs that exited non-zero
echo "job_pool_nerrors: ${job_pool_nerrors}"

Hope this helps!


Using GNU Parallel you can do:

cat tasks | parallel -j4 myprog

If you have 4 cores, you can even just do:

cat tasks | parallel myprog

From http://git.savannah.gnu.org/cgit/parallel.git/tree/README:

Full installation

Full installation of GNU Parallel is as simple as:

./configure && make && make install

Personal installation

If you are not root you can add ~/bin to your path and install in ~/bin and ~/share:

./configure --prefix=$HOME && make && make install

Or if your system lacks 'make' you can simply copy src/parallel src/sem src/niceload src/sql to a dir in your path.

Minimal installation

If you just need parallel and do not have 'make' installed (maybe the system is old or Microsoft Windows):

wget http://git.savannah.gnu.org/cgit/parallel.git/plain/src/parallel
chmod 755 parallel
cp parallel sem
mv parallel sem dir-in-your-$PATH/bin/

Test the installation

After this you should be able to do:

parallel -j0 ping -nc 3 ::: foss.org.my gnu.org freenetproject.org

This will send 3 ping packets to 3 different hosts in parallel and print the output when they complete.

Watch the intro video for a quick introduction: https://www.youtube.com/playlist?list=PL284C9FF2488BC6D1


I would suggest writing four scripts, each one of which executes a certain number of tasks in series. Then write another script that starts the four scripts in parallel. For instance, if you have scripts, script1.sh, script2.sh, script3.sh, and script4.sh, you could have a script called headscript.sh like so.

#!/bin/sh
# start the four batch scripts in parallel
./script1.sh &
./script2.sh &
./script3.sh &
./script4.sh &
# block until all four have finished
wait
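
The same idea can be sketched in a single script instead of four hand-written ones. This is only an illustration under assumptions: myprog is a stand-in function, the task names are made up, and tasks are dealt out to the four batches round-robin.

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for the real program
myprog() { sleep 0.1; echo "finished $1"; }

tasks=(t1 t2 t3 t4 t5 t6 t7 t8 t9 t10)   # made-up task names
workers=4

run_batch() {
    # Run, in series, every task whose index is congruent to $1 modulo $workers
    local w="$1" i
    for ((i = w; i < ${#tasks[@]}; i += workers)); do
        myprog "${tasks[i]}"
    done
}

batch_output=$(
    # start the four batches in parallel, then wait for all of them
    for ((w = 0; w < workers; w++)); do
        run_batch "$w" &
    done
    wait
)
echo "$batch_output"
```

Because the split is fixed up front, a batch that finishes early leaves its slot idle, so this approach does not keep four tasks running at all times.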


The best solution I found is the one proposed in the A Foo Walks into a Bar... blog, which uses the built-in functionality of the well-known xargs tool. First create a file commands.txt with the list of commands you want to execute:

myprog taskname1
myprog taskname2
myprog taskname3
myprog taskname4
...
myprog taskname123

Then pipe it to xargs like this to run the commands in a pool of 4 processes:

cat commands.txt | xargs -I CMD --max-procs=4 bash -c CMD

You can change the --max-procs value to adjust the number of processes.
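
A small self-contained sketch of the command-file approach, for trying it out safely. The commands written to the temporary file are placeholders, and -P 4 is the short form of --max-procs=4:

```shell
#!/usr/bin/env bash
# Build a throwaway command list; these commands are placeholders
commands_file=$(mktemp)
for i in 1 2 3 4 5 6 7 8; do
    echo "sleep 0.1; echo ran-$i"
done > "$commands_file"

# -I CMD substitutes each whole input line for CMD, so every line runs as one bash -c script
xargs_output=$(xargs -I CMD -P 4 bash -c CMD < "$commands_file")
rm -f "$commands_file"
echo "$xargs_output"
```

Note that xargs interprets quotes and backslashes in its input before bash sees them, so command lines containing those characters may need extra escaping.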


Following @Parag Sardas' answer and the documentation it links, here's a quick script you might want to add to your .bash_aliases.

Relinking the doc link because it's worth a read

#!/bin/bash
# https://stackoverflow.com/a/19618159
# https://stackoverflow.com/a/51861820
#
# Example file contents:
# touch /tmp/a.txt
# touch /tmp/b.txt

if [ "$#" -eq 0 ];  then
  echo "$0 <file> [max-procs=0]"
  exit 1
fi

FILE=${1}
MAX_PROCS=${2:-0}
# %q escapes each line so that xargs passes it through to bash -c unchanged
while IFS= read -r line; do printf "%q\n" "$line"; done < "$FILE" | xargs --max-procs="$MAX_PROCS" -I CMD bash -c CMD

For example, ./xargs-parallel.sh jobs.txt 4 runs the commands read from jobs.txt with a maximum of 4 processes at a time.


You could probably do something clever with signals.

Note this is only to illustrate the concept, and thus not thoroughly tested.

#!/usr/local/bin/bash

this_pid="$$"
jobs_running=0
sleep_pid=

# Catch alarm signals to adjust the number of running jobs
trap 'decrement_jobs' SIGALRM

# When a job finishes, decrement the total and kill the sleep process
decrement_jobs()
{
  jobs_running=$(($jobs_running - 1))
  if [ -n "${sleep_pid}" ]
  then
    kill -s SIGKILL "${sleep_pid}"
    sleep_pid=
  fi
}

# Check to see if the max jobs are running, if so sleep until woken
launch_task()
{
  if [ ${jobs_running} -gt 3 ]
  then
    (
      while true
      do
        sleep 999
      done
    ) &
    sleep_pid=$!
    wait ${sleep_pid}
  fi

  # Launch the requested task, signalling the parent upon completion
  (
    "$@"
    kill -s SIGALRM "${this_pid}"
  ) &
  jobs_running=$((${jobs_running} + 1))
}

# Launch all of the tasks, this can be in a loop, etc.
launch_task task1
launch_task task2
...
launch_task task99


This tested script runs 5 jobs at a time and starts a new job as soon as one finishes (thanks to killing the sleep 10.9 when we get a SIGCHLD). A simpler version could use direct polling: change the sleep 10.9 to sleep 1 and get rid of the trap.

#!/usr/bin/bash

set -o monitor
trap "pkill -P $$ -f 'sleep 10\.9' >&/dev/null" SIGCHLD

totaljobs=15
numjobs=5
worktime=10
curjobs=0
declare -A pidlist

dojob()
{
  slot=$1
  time=$(echo "$RANDOM * 10 / 32768" | bc -l)
  echo Starting job $slot with args $time
  sleep $time &
  pidlist[$slot]=`jobs -p %%`
  curjobs=$(($curjobs + 1))
  totaljobs=$(($totaljobs - 1))
}

# start
while [ $curjobs -lt $numjobs -a $totaljobs -gt 0 ]
 do
  dojob $curjobs
 done

# Poll for jobs to die, restarting while we have them
while [ $totaljobs -gt 0 ]
 do
  for ((i=0;$i < $curjobs;i++))
   do
    if ! kill -0 ${pidlist[$i]} >&/dev/null
     then
      dojob $i
      break
     fi
   done
   sleep 10.9 >&/dev/null
 done
wait
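
The simpler polling variant mentioned above might look roughly like the sketch below. It is not the author's script: the job body, the job count, and the prune_finished helper are made up for illustration, and it keeps only the kill -0 liveness check from the original.

```shell
#!/usr/bin/env bash
max_jobs=5      # pool size, as in the script above
total_jobs=12   # made-up number of jobs
pids=()

prune_finished() {
    # Keep only the PIDs that still answer to signal 0 (i.e. are still alive)
    local pid alive=()
    for pid in "${pids[@]}"; do
        kill -0 "$pid" 2>/dev/null && alive+=("$pid")
    done
    pids=("${alive[@]}")
}

poll_output=$(
    for ((n = 1; n <= total_jobs; n++)); do
        prune_finished
        while (( ${#pids[@]} >= max_jobs )); do
            sleep 0.1          # direct polling instead of a SIGCHLD trap
            prune_finished
        done
        { sleep 0.3; echo "job $n done"; } &   # placeholder job
        pids+=("$!")
    done
    wait
)
echo "$poll_output"
```

The trade-off is latency: a free slot is noticed only at the next poll, so a shorter sleep reacts faster at the cost of more wake-ups.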


The other answer about four shell scripts does not fully satisfy me, because it assumes that all tasks take approximately the same time and because it requires manual setup. Here is how I would improve it.

The main script creates symbolic links to the executables following a certain naming convention. For example,

ln -s executable1 ./01-task.01

The prefix is for sorting and the suffix identifies the batch (01-04). Now we spawn 4 shell scripts, each of which takes a batch number as input and does something like this:

# the glob expands in sorted order, so no ls or sort is needed
for t in ./*-task."$batch"; do
   "$t"
   rm "$t"
done


Look at my implementation of job pool in bash: https://github.com/spektom/shell-utils/blob/master/jp.sh

For example, to run at most 3 processes of cURL when downloading from a lot of URLs, you can wrap your cURL commands as follows:

./jp.sh "My Download Pool" 3 curl http://site1/...
./jp.sh "My Download Pool" 3 curl http://site2/...
./jp.sh "My Download Pool" 3 curl http://site3/...
...


Here is my solution. The idea is quite simple. I create a fifo as a semaphore, where each line stands for an available resource. When reading the queue, the main process blocks if there is nothing left. And, we return the resource after the task is done by simply echoing anything to the queue.

function task() {
    local task_no="$1"
    # doing the actual task...
    echo "Executing Task ${task_no}"
    # which takes a long time
    sleep 1
}

function execute_concurrently() {
    local tasks="$1"
    local ps_pool_size="$2"

    # create an anonymous fifo as a Semaphore
    local sema_fifo
    sema_fifo="$(mktemp -u)"
    mkfifo "${sema_fifo}"
    exec 3<>"${sema_fifo}"
    rm -f "${sema_fifo}"

    # every 'x' stands for an available resource
    for i in $(seq 1 "${ps_pool_size}"); do
        echo 'x' >&3
    done

    for task_no in $(seq 1 "${tasks}"); do
        read dummy <&3 # blocks until a resource is available
        (
            trap 'echo x >&3' EXIT # returns the resource on exit
            task "${task_no}"
        )&
    done
    wait # wait until all forked tasks have finished
}

execute_concurrently 10 4

The script above runs 10 tasks, at most 4 at a time. You can change the $(seq 1 "${tasks}") sequence to the actual task queue you want to run.
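
To make that last point concrete, here is a minimal sketch of the same fifo semaphore driven by a list of task names instead of seq. The myprog function, the run_pool name, and the task names are placeholders, not part of the script above:

```shell
#!/usr/bin/env bash
# Hypothetical stand-in for the real program
myprog() { sleep 0.2; echo "finished $1"; }

run_pool() {
    local pool_size="$1"; shift
    local sema_fifo name token i

    # anonymous fifo used as a counting semaphore, as in the script above
    sema_fifo="$(mktemp -u)"
    mkfifo "${sema_fifo}"
    exec 3<>"${sema_fifo}"
    rm -f "${sema_fifo}"

    # one line in the fifo per free slot
    for ((i = 0; i < pool_size; i++)); do echo x >&3; done

    for name in "$@"; do
        read -r token <&3              # blocks until a slot is free
        (
            trap 'echo x >&3' EXIT     # give the slot back on exit
            myprog "${name}"
        ) &
    done
    wait
    exec 3>&-                          # close the semaphore
}

fifo_output=$(run_pool 4 taskA taskB taskC taskD taskE taskF)
echo "${fifo_output}"
```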


I made my modifications based on the methods introduced in Writing a process pool in Bash.

#!/bin/bash

#set -e   # this doesn't work here for some reason
POOL_SIZE=4   # number of workers running in parallel

#######################################################################
#                            populate jobs                            #
#######################################################################

declare -a jobs

for (( i = 1988; i < 2019; i++ )); do
    jobs+=($i)
done

echo '################################################'
echo '    Launching jobs'
echo '################################################'

parallel() {
    local proc procs jobs cur
    jobs=("$@")         # input jobs array
    declare -a procs=() # processes array
    cur=0               # current job idx

    morework=true
    while $morework; do
        # if process array size < pool size, try forking a new proc
        if [[ "${#procs[@]}" -lt "$POOL_SIZE" ]]; then
            if [[ $cur -lt "${#jobs[@]}" ]]; then
                proc=${jobs[$cur]}
                echo "JOB ID = $cur; JOB = $proc."

                ###############
                # do job here #
                ###############

                sleep 3 &

                # add to current running processes
                procs+=("$!")
                # move to the next job
                ((cur++))
            else
                morework=false
                continue
            fi
        fi

        for n in "${!procs[@]}"; do
            kill -0 "${procs[n]}" 2>/dev/null && continue
            # if process is not running anymore, remove from array
            unset procs[n]
        done
    done
    wait
}

parallel "${jobs[@]}"
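
The loop above checks the processes with kill -0 without pausing, which keeps a CPU core busy while it waits. As an alternative sketch (a different technique from the script above, assuming bash 4.3 or newer is available), wait -n blocks until any one child exits. The do_job function and the job list are placeholders:

```shell
#!/usr/bin/env bash
# Needs bash 4.3 or newer, where `wait -n` was introduced.
POOL_SIZE=4

# Hypothetical stand-in for the real job
do_job() { sleep 0.2; echo "job $1 finished"; }

run_pool() {
    local running=0 job
    for job in "$@"; do
        if (( running >= POOL_SIZE )); then
            wait -n                    # returns as soon as any one child exits
            running=$((running - 1))
        fi
        do_job "$job" &
        running=$((running + 1))
    done
    wait                               # let the last jobs finish
}

pool_output=$(run_pool {1988..1995})
echo "$pool_output"
```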


xargs with -P and -L options does the job. You can extract the idea from the example below:

#!/usr/bin/env bash

workers_pool_size=10

set -e

function doit {
    cmds=""
    for e in 4 8 16; do
        for m in 1 2 3 4 5 6; do
            cmd="python3 ./doit.py --m $m -e $e -m $m"
            cmds="$cmd\n$cmds"
        done
    done
    echo -e "All commands:\n$cmds"
    echo "Workers pool size = $workers_pool_size"
    echo -e "$cmds" | xargs -t -P $workers_pool_size -L 1 time > /dev/null
}

doit


#! /bin/bash
doSomething() {
    <...>
}

getCompletedThreads() {
    _runningThreads=("$@")

    removableThreads=()
    for pid in "${_runningThreads[@]}"; do
        if ! ps -p $pid > /dev/null; then
            removableThreads+=($pid)
        fi
    done
    echo "${removableThreads[@]}"   # print every finished PID, not just the first
}

releasePool() {
    while [[ ${#runningThreads[@]} -eq $MAX_THREAD_NO ]]; do
        echo "releasing"
        removableThreads=( $(getCompletedThreads "${runningThreads[@]}") )
        if [ ${#removableThreads[@]} -eq 0 ]; then
            sleep 0.2
        else
            for removableThread in "${removableThreads[@]}"; do
                runningThreads=( ${runningThreads[@]/$removableThread} ) 
            done
            echo "released"
        fi
    done
}

waitAllThreadComplete() {
    while [[ ${#runningThreads[@]} -ne 0 ]]; do
        removableThreads=( $(getCompletedThreads "${runningThreads[@]}") )
        for removableThread in "${removableThreads[@]}"; do
            runningThreads=( ${runningThreads[@]/$removableThread} ) 
        done

        if [ ${#removableThreads[@]} -eq 0 ]; then
            sleep 0.2
        fi
    done
}


MAX_THREAD_NO=10
runningThreads=()
sequenceNo=0

for i in {1..36}; do
    releasePool

    ((sequenceNo++))
    echo "added $sequenceNo"
    doSomething &

    pid=$!
    runningThreads+=($pid)
done

waitAllThreadComplete