Thread scheduling Round Robin / scheduling dispatch
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <unistd.h> /* sleep() */
#define NUM_THREADS 4
#define COUNT_LIMIT 13
int done = 0;
int count = 0;
int quantum = 2;
int thread_ids[4] = {0,1,2,3};
int thread_runtime[4] = {0,5,4,7};
pthread_mutex_t count_mutex;
pthread_cond_t count_threshold_cv;
void * inc_count(void * arg);
static sem_t count_sem;
int quit = 0;
///////// Inc_Count////////////////
void *inc_count(void *t)
{
long my_id = (long)t;
int i;
sem_wait(&count_sem); /////////////CRIT SECTION//////////////////////////////////
printf("run_thread = %ld\n",my_id);
printf("%d \n",thread_runtime[my_id]);
for( i=0; i < thread_runtime[my_id];i++)
{
printf("runtime= %d\n",thread_runtime[my_id]);
pthread_mutex_lock(&count_mutex);
count++;
if (count == COUNT_LIMIT) {
pthread_cond_signal(&count_threshold_cv);
printf("inc_count(): thread %ld, count = %d Threshold reached.\n", my_id,
count);
}
printf("inc_count(): thread %ld, count = %d, unlocking mutex\n",my_id, count);
pthread_mutex_unlock(&count_mutex);
sleep(1) ;
}//End For
sem_post(&count_sem); // Next Thread Enters Crit Section
pthread_exit(NULL);
}
/////////// Count_Watch ////////////////
void *watch_count(void *t)
{
long my_id = (long)t;
printf("Starting watch_count(): thread %ld\n", my_id);
pthread_mutex_lock(&count_mutex);
while (count<COUNT_LIMIT) { /* loop so a spurious wakeup re-checks the condition */
pthread_cond_wait(&count_threshold_cv, &count_mutex);
printf("watch_count(): thread %ld Condition signal received.\n", my_id);
printf("watch_count(): thread %ld count now = %d.\n", my_id, count);
}
pthread_mutex_unlock(&count_mutex);
pthread_exit(NULL);
}
////////////////// Main ////////////////
int main (int argc, char *argv[])
{
int i;
long t1=0, t2=1, t3=2, t4=3;
pthread_t threads[4];
pthread_attr_t attr;
sem_init(&count_sem, 0, 1);
/* Initialize mutex and condition variable objects */
pthread_mutex_init(&count_mutex, NULL);
pthread_cond_init (&count_threshold_cv, NULL);
/* For portability, explicitly create threads in a joinable state */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&threads[0], &attr, watch_count, (void *)t1);
pthread_create(&threads[1], &attr, inc_count, (void *)t2);
pthread_create(&threads[2], &attr, inc_count, (void *)t3);
pthread_create(&threads[3], &attr, inc_count, (void *)t4);
/* Wait for all threads to complete */
for (i=0; i<NUM_THREADS; i++) {
pthread_join(threads[i], NULL);
}
printf ("Main(): Waited on %d threads. Done.\n", NUM_THREADS);
/* Clean up and exit */
pthread_attr_destroy(&attr);
pthread_mutex_destroy(&count_mutex);
pthread_cond_destroy(&count_threshold_cv);
pthread_exit(NULL);
}
I am trying to learn thread scheduling, but there is a lot of technical coding that I don't know. I know in theory how it should work, but I am having trouble getting started in code.
I know (at least I think) that this program is not real-time, and it's not meant to be. Somehow I need to create a scheduler dispatcher to control the order in which the threads run: RR, FCFS, SJF, etc.
Right now I don't have a dispatcher. What I do have is a semaphore and a mutex to control the threads.
This code does run FCFS, and I have been trying to use semaphores to create RR, but I am having a lot of trouble. I believe it would be easier to create a dispatcher, but I don't know how.
I need help. I am not looking for answers, just direction; some sample code would help me understand a bit more.
Okay, to help explain: my first thought was to use semaphores and a loop such that when one thread has run, let's say, 2 times, that thread waits for the other threads to run two times or until their run time is up.
The problem I had was that there seemed to be no good way of synchronizing the threads this way, unless there is a way to make a unique semaphore for each thread. This is why I would like some help or guidance in creating a dispatcher function.
Thank you.
First, the OS is the only entity in your system that can actually schedule your threads to run. The most common schedulers in newer Linux kernels are the static-priority FCFS (SCHED_FIFO) and RR (SCHED_RR) policies, as well as the SCHED_OTHER scheduler, now implemented by the Completely Fair Scheduler.
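As a rough illustration of asking the OS for one of these policies, here is a minimal sketch that requests SCHED_RR for a new thread through its attributes. The helper names `configure_rr` and `create_with_rr` are made up for this example. It assumes Linux, where creating a real-time thread normally needs elevated privileges, so the sketch falls back to the default policy when `pthread_create` reports EPERM.

```c
#include <errno.h>
#include <pthread.h>
#include <sched.h>

/* Hypothetical helper: mark an already-initialized attr so that a thread
 * created with it asks for SCHED_RR at the lowest real-time priority.
 * Returns 0 on success or an errno-style code. */
int configure_rr(pthread_attr_t *attr)
{
    struct sched_param param;
    int rc;

    /* Without this, the new thread simply inherits the creator's policy. */
    rc = pthread_attr_setinheritsched(attr, PTHREAD_EXPLICIT_SCHED);
    if (rc != 0)
        return rc;
    rc = pthread_attr_setschedpolicy(attr, SCHED_RR);
    if (rc != 0)
        return rc;
    param.sched_priority = sched_get_priority_min(SCHED_RR);
    return pthread_attr_setschedparam(attr, &param);
}

/* Hypothetical helper: try to start fn(arg) under SCHED_RR, and fall back
 * to the default policy if the process may not use real-time scheduling. */
int create_with_rr(pthread_t *tid, void *(*fn)(void *), void *arg)
{
    pthread_attr_t attr;
    int rc = pthread_attr_init(&attr);

    if (rc != 0)
        return rc;
    rc = configure_rr(&attr);
    if (rc == 0)
        rc = pthread_create(tid, &attr, fn, arg);
    if (rc == EPERM)                     /* not privileged: use defaults */
        rc = pthread_create(tid, NULL, fn, arg);
    pthread_attr_destroy(&attr);
    return rc;
}
```

Even when this succeeds, the kernel only decides which runnable thread gets the CPU; it does not know anything about the order in which your application wants its work done.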
It seems that you are confusing "OS-level scheduling" with "application-level scheduling". The former knows nothing about your application or its semantics. The latter must be implemented using tools such as semaphores, queues, etc.
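For the application-level side, one possible shape is a dispatcher that owns one semaphore per worker thread, which is the "unique semaphore for each thread" idea from the question. The sketch below is only an illustration under assumptions: `run_round_robin`, `worker`, `turn`, `slice_done` and `trace` are names invented here, the quantum is fixed at 2 work units, and the "work" is just decrementing a counter. Error handling is left out for brevity.

```c
#include <pthread.h>
#include <semaphore.h>

#define NUM_WORKERS 3
#define QUANTUM     2    /* same value as `quantum` in the question */
#define TRACE_MAX   64

static int   remaining[NUM_WORKERS];  /* work units left per worker */
static sem_t turn[NUM_WORKERS];       /* one "go" semaphore per worker */
static sem_t slice_done;              /* worker -> dispatcher handshake */
static int   trace[TRACE_MAX];        /* which worker ran each unit */
static int   trace_len;

static void *worker(void *arg)
{
    int id = (int)(long)arg;

    while (remaining[id] > 0) {
        sem_wait(&turn[id]);          /* block until dispatched */
        for (int i = 0; i < QUANTUM && remaining[id] > 0; i++) {
            remaining[id]--;          /* one unit of "work" */
            if (trace_len < TRACE_MAX)
                trace[trace_len++] = id;
        }
        sem_post(&slice_done);        /* hand control back */
    }
    return NULL;
}

/* Runs the workers round-robin; returns the number of units executed. */
int run_round_robin(const int runtimes[NUM_WORKERS])
{
    pthread_t tid[NUM_WORKERS];
    int live = 0;

    trace_len = 0;
    sem_init(&slice_done, 0, 0);
    for (int i = 0; i < NUM_WORKERS; i++) {
        remaining[i] = runtimes[i];
        if (remaining[i] > 0)
            live++;
        sem_init(&turn[i], 0, 0);
    }
    for (int i = 0; i < NUM_WORKERS; i++)
        pthread_create(&tid[i], NULL, worker, (void *)(long)i);

    /* The dispatcher: wake exactly one worker, wait for its slice to end. */
    for (int i = 0; live > 0; i = (i + 1) % NUM_WORKERS) {
        if (remaining[i] <= 0)
            continue;                 /* finished workers are skipped */
        sem_post(&turn[i]);
        sem_wait(&slice_done);
        if (remaining[i] <= 0)
            live--;
    }

    for (int i = 0; i < NUM_WORKERS; i++) {
        pthread_join(tid[i], NULL);
        sem_destroy(&turn[i]);
    }
    sem_destroy(&slice_done);
    return trace_len;
}
```

Because only one worker is ever between `sem_wait(&turn[id])` and `sem_post(&slice_done)`, the shared trace needs no extra mutex here. Calling `run_round_robin` with the runtimes 5, 4 and 7 from the question lets each worker run at most two units before the next one gets its turn.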
One approach to implementing a set of threads executing in an FCFS manner would be to create a FIFO queue, protect it with a mutex, and within this queue put tokens that allow threads to know when it is their turn to run.
The pseudo-code for a thread would be:
while (1)
    lock_mutex()
    next = pop_queue()
    if (next == me)
        do_my_work()
        unlock_mutex()
        break
    unlock_mutex()
Note that this example shouldn't be used as-is. It requires careful coordination between consumer and producer, as well as among the consumers. It also doesn't address more detailed semantics, such as whether the work itself should be serialized or only the start of the work should be FCFS, or the relation between the number of threads and the available CPUs.
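One way to make the queue idea concrete is sketched below. It differs from the pseudo-code in two ways that are choices of this example, not requirements: a thread only looks at the head token instead of popping it, and it sleeps on a condition variable instead of spinning. The names `run_fcfs`, `fcfs_worker`, `run_queue` and `order_seen` are hypothetical, the work is fully serialized under the mutex, and `arrival_order` is assumed to be a permutation of 0..FCFS_THREADS-1 (anything else would leave threads waiting forever).

```c
#include <pthread.h>

#define FCFS_THREADS 4

static pthread_mutex_t q_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  q_cv    = PTHREAD_COND_INITIALIZER;
static int run_queue[FCFS_THREADS];   /* tokens: thread ids in FCFS order */
static int q_head;                    /* index of the token at the front */
static int order_seen[FCFS_THREADS];  /* ids in the order they really ran */

static void *fcfs_worker(void *arg)
{
    int me = (int)(long)arg;

    pthread_mutex_lock(&q_mutex);
    while (run_queue[q_head] != me)         /* not my token yet: sleep */
        pthread_cond_wait(&q_cv, &q_mutex);
    order_seen[q_head] = me;                /* stands in for do_my_work() */
    q_head++;                               /* consume my token */
    pthread_cond_broadcast(&q_cv);          /* let the others re-check */
    pthread_mutex_unlock(&q_mutex);
    return NULL;
}

/* Starts one thread per id and returns how many tokens were consumed.
 * Error handling is omitted for brevity. */
int run_fcfs(const int arrival_order[FCFS_THREADS])
{
    pthread_t tid[FCFS_THREADS];

    q_head = 0;
    for (int i = 0; i < FCFS_THREADS; i++)
        run_queue[i] = arrival_order[i];
    /* Creation order is 0,1,2,3 on purpose; the queue decides who runs. */
    for (int i = 0; i < FCFS_THREADS; i++)
        pthread_create(&tid[i], NULL, fcfs_worker, (void *)(long)i);
    for (int i = 0; i < FCFS_THREADS; i++)
        pthread_join(tid[i], NULL);
    return q_head;
}
```

The broadcast wakes every waiting thread so each can re-check the head of the queue; with many threads a per-thread condition variable or semaphore would avoid those extra wakeups.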
The code that you are implementing is not really a scheduler at all. To develop a dispatcher, you have to enable the TICK interrupt service routine of your core, and in the interrupt code make a call that searches for the task that has to run next; your dispatcher then performs a context switch in assembler so that the next task function can be called. Every task has its own piece of memory on the heap. You can allocate it with malloc() when the task is initialized, and you have to divide that memory, because one part is going to be the task's virtual stack and the other part is going to be the stack frame that holds all the system registers involved in a context switch. For the schedule() function that tells you which task is next, there are two basic states for each task (ready and run): if a task is in ready mode, you can put it in run mode depending on the priority of the task.
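If you want to experiment with heap-allocated task stacks and context switches without touching interrupts or assembler, the `<ucontext.h>` functions in glibc give a user-space approximation. The sketch below is only an analogy, not the tick-driven design described above: tasks give up the CPU voluntarily with `swapcontext` instead of being interrupted, and every name in it (`run_cooperative_tasks`, `task_body`, `utrace`) is invented for this example. It assumes Linux with glibc.

```c
#include <stdlib.h>
#include <ucontext.h>

#define UTASKS      2
#define USTACK_SIZE (64 * 1024)
#define UTRACE_MAX  16

static ucontext_t sched_ctx;          /* context of the dispatcher loop */
static ucontext_t task_ctx[UTASKS];   /* saved registers of each task */
static int task_done[UTASKS];
static int utrace[UTRACE_MAX];        /* which task ran at each step */
static int utrace_len;

static void task_body(int id)
{
    for (int step = 0; step < 3; step++) {
        if (utrace_len < UTRACE_MAX)
            utrace[utrace_len++] = id;
        /* Save this task's registers and resume the dispatcher. */
        swapcontext(&task_ctx[id], &sched_ctx);
    }
    task_done[id] = 1;
    /* Returning resumes uc_link, which is the dispatcher context. */
}

/* Runs the tasks round-robin; returns the number of steps executed,
 * or -1 if a stack could not be allocated. */
int run_cooperative_tasks(void)
{
    void *stacks[UTASKS];
    int live = UTASKS;

    utrace_len = 0;
    for (int i = 0; i < UTASKS; i++) {
        task_done[i] = 0;
        stacks[i] = malloc(USTACK_SIZE);      /* task stack on the heap */
        if (stacks[i] == NULL) {
            while (i-- > 0)
                free(stacks[i]);
            return -1;
        }
        getcontext(&task_ctx[i]);
        task_ctx[i].uc_stack.ss_sp = stacks[i];
        task_ctx[i].uc_stack.ss_size = USTACK_SIZE;
        task_ctx[i].uc_link = &sched_ctx;
        makecontext(&task_ctx[i], (void (*)(void))task_body, 1, i);
    }

    /* The dispatcher: switch into each unfinished task in turn. */
    for (int i = 0; live > 0; i = (i + 1) % UTASKS) {
        if (task_done[i])
            continue;
        swapcontext(&sched_ctx, &task_ctx[i]);
        if (task_done[i])
            live--;
    }

    for (int i = 0; i < UTASKS; i++)
        free(stacks[i]);
    return utrace_len;
}
```

Each `swapcontext` call does in library code what the assembler dispatcher does by hand: it stores the current registers and stack pointer in one structure and loads them from another.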
The context switch saves all the registers when a tick interrupt takes place, so you can return to that point in your code when the dispatcher calls the interrupted task again. Here is an example for a MIPS architecture.
int schedule(void){
int lub_CurrentTskOrder = 0;
int lub_IndexTask = 0;
int lub_NextTsk2Run = 0;
int lub_x = 0;
int lub_y = 0;
/* SEARCH THE RUNNING CURRENT TASK */
for(lub_x = 0; lub_x < rub_InitializedTasks; lub_x++) {
if(rs_SchedTask[lub_x].ub_task_status == tskRun) {
rs_SchedTask[lub_x].ub_task_status = tskReady;
lub_IndexTask = lub_x;
}
}
if(rub_FirstDispatch == FALSE) {
rs_SchedTask[lub_IndexTask].ub_task_status = tskRun;
return lub_IndexTask;
}
for(lub_x = 0; lub_x < MAX_TASKS_NUMBER; lub_x++) {
if(rs_SchedTask[lub_IndexTask].ub_ExecutionOrder == (rub_InitializedTasks - 1))
lub_CurrentTskOrder = 0;
else
lub_CurrentTskOrder = rs_SchedTask[lub_IndexTask].ub_ExecutionOrder + 1;
/* SEARCH THE NEXT EXECUTION TASK IN READY MODE */
for(lub_y = 0; lub_y < MAX_TASKS_NUMBER; lub_y++) {
if((rs_SchedTask[lub_y].ub_task_status == tskReady) && (rs_SchedTask[lub_y].ub_ExecutionOrder == lub_CurrentTskOrder)) {
rs_SchedTask[lub_y].ub_task_status = tskRun;
return lub_y;
}
else if((rs_SchedTask[lub_y].ub_task_status != tskReady) && (rs_SchedTask[lub_y].ub_ExecutionOrder == lub_CurrentTskOrder)) {
lub_IndexTask = lub_y;
break;
}
}
}
return(0);
}
/* INITIALIZE A STACK ON THE HEAP FOR A SPECIFIC TASK */
S_PID sched_alloc(T_UBYTE lub_TaskNumber, S_TASK *lps_TaskStart)
{
T_ULONG lul_x = 0;
S_PID ls_pid_t;
if((rub_InitializedTasks < MAX_TASKS_NUMBER) && (rs_SchedTask[rub_InitializedTasks].ub_StackInit != TRUE)) {
rs_SchedTask[rub_InitializedTasks].ub_ExecutionOrder = lub_TaskNumber;
rs_SchedTask[rub_InitializedTasks].pfu_Entry = lps_TaskStart->pfu_Entry; // (1) STORE THE TASK ADDRESS TO INITIALIZE THE SCHEDULER
rs_SchedTask[rub_InitializedTasks].pul_TaskFrame = malloc(((TASK_CONTEXT_STACK + lps_TaskStart->ul_StackSize) * 4) + 1); // (2) CREATES A FRAME ON THE HEAP FOR THE CURRENT TASK
ls_pid_t.pul_TaskFrame = rs_SchedTask[rub_InitializedTasks].pul_TaskFrame;
for(lul_x = 0; lul_x < (TASK_CONTEXT_STACK + lps_TaskStart->ul_StackSize); lul_x++) { // (3) CLEAN ALL THE REGISTER SPACES ON THE CURRENT STACK
rs_SchedTask[rub_InitializedTasks].pul_TaskFrame++;
*rs_SchedTask[rub_InitializedTasks].pul_TaskFrame = 0x00;
}
rs_SchedTask[rub_InitializedTasks].pul_TaskFrame -= (TASK_CONTEXT_STACK + lps_TaskStart->ul_StackSize); // (4) RETURN TO THE STACK POSITION
rs_SchedTask[rub_InitializedTasks].pul_TaskFrame++;
*rs_SchedTask[rub_InitializedTasks].pul_TaskFrame = (T_ULONG)lps_TaskStart->pfu_Entry; // (5) SAVE THE RETURN ADDRESS FOR THE TASK
rs_SchedTask[rub_InitializedTasks].pul_TaskFrame--;
rs_SchedTask[rub_InitializedTasks].pul_TaskStack = rs_SchedTask[rub_InitializedTasks].pul_TaskFrame + TASK_CONTEXT_STACK; // (6) SET STACK FRAME ROOM FOR THE TASK
rs_SchedTask[rub_InitializedTasks].pul_TaskStack += 0x54; // (7) MAKE ROOM FOR REGISTERS ON STACK FRAME
*rs_SchedTask[rub_InitializedTasks].pul_TaskFrame = (T_ULONG)rs_SchedTask[rub_InitializedTasks].pul_TaskStack;
rps_CurrentTask = &rs_SchedTask[rub_InitializedTasks];
asm_dispatcher_save_stack_pointer_on_stack;
rs_SchedTask[rub_InitializedTasks].ub_StackInit = TRUE; // (8) INDICATES THAT THE TASK IS ALREADY INITIALIZED ON STACK
rs_SchedTask[rub_InitializedTasks].psb_TaskName = lps_TaskStart->psb_TaskName;
rub_InitializedTasks++; // (9) THIS IS THE INITIALIZED TASKS COUNTER FOR PUBLIC USE
ls_pid_t.pfu_Entry = lps_TaskStart->pfu_Entry; // (10) SAVE THE PID VALUES
ls_pid_t.psb_TaskName = lps_TaskStart->psb_TaskName;
return(ls_pid_t); // (11) RETURN PID
}
}
The first function is the one that returns the next task to run, so that you can call the dispatcher to perform a context switch. The TICK interrupt service routine calls the assembler dispatcher as in the following code.
void __interrupt(TICK) TICK_ISR(void)
{
atomic_cstart();
mips_r3000_reset_interval_timer();
rub_CurrentTask = schedule();
// save current context
__asm volatile \
( \
"and $k1,$k1,$zero \n\t" \
"or $k1,$k1,rps_CurrentTask \n\t" \
"lw $k1,0x0000($k1) ;Pointer to Task Structure \n\t" \
"lw $k1,0x0004($k1) ;Pointer to Task Stack Frame \n\t" \
"sw $sp,0x0000($k1) ;Save GPR $sp \n\t" \
"and $sp,$sp,$zero \n\t" \
"or $sp,$sp,$k1 ;Pointer to Task Stack Frame \n\t" \
"mfc0 $k1,$ER \n\t" \
"sw $k1,0x0004($sp) ;Save Return Address \n\t" \
"sw $s0,0x0008($sp) ;Save GPR $s0 \n\t" \
"sw $s1,0x000c($sp) ;Save GPR $s1 \n\t" \
"sw $s2,0x0010($sp) ;Save GPR $s2 \n\t" \
"sw $s3,0x0014($sp) ;Save GPR $s3 \n\t" \
"sw $s4,0x0018($sp) ;Save GPR $s4 \n\t" \
"sw $s5,0x001c($sp) ;Save GPR $s5 \n\t" \
"sw $s6,0x0020($sp) ;Save GPR $s6 \n\t" \
"sw $s7,0x0024($sp) ;Save GPR $s7 \n\t" \
"sw $s8,0x0028($sp) ;Save GPR $fp \n\t" \
"lw $k1,0x0000($sp) \n\t" \
"and $sp,$sp,$zero \n\t" \
"or $sp,$sp,$k1 ;Restore GPR $sp \n\t" \
"and $k1,$k1,$zero \n\t" \
);
rps_CurrentTask = &rs_SchedTask[rub_CurrentTask];
// restore current context
__asm volatile \
( \
"and $sp,$sp,$zero \n\t" \
"or $sp,$sp,rps_CurrentTask \n\t" \
"lw $sp,0x0000($sp) ;Pointer to Task Structure \n\t" \
"lw $sp,0x0004($sp) ;Pointer to Task Stack Frame \n\t" \
"lw $k1,0x0004($sp) \n\t" \
"mtc0 $k1,$ER ;Restore Return Address \n\t" \
"lw $s0,0x0008($sp) ;Restore GPR $s0 \n\t" \
"lw $s1,0x000c($sp) ;Restore GPR $s1 \n\t" \
"lw $s2,0x0010($sp) ;Restore GPR $s2 \n\t" \
"lw $s3,0x0014($sp) ;Restore GPR $s3 \n\t" \
"lw $s4,0x0018($sp) ;Restore GPR $s4 \n\t" \
"lw $s5,0x001c($sp) ;Restore GPR $s5 \n\t" \
"lw $s6,0x0020($sp) ;Restore GPR $s6 \n\t" \
"lw $s7,0x0024($sp) ;Restore GPR $s7 \n\t" \
"lw $s8,0x0028($sp) ;Restore GPR $fp \n\t" \
"lw $k1,0x0000($sp) \n\t" \
"and $sp,$sp,$zero \n\t" \
"or $sp,$sp,$k1 ;Restore GPR $sp \n\t" \
"and $k1,$k1,$zero \n\t" \
);
atomic_cend();
}
The operations are atomic because interrupts have to be disabled while the dispatcher's assembler routine saves and restores the current context.
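To look at the scheduling decision on its own, separate from the interrupt and assembler parts, the round-robin selection can be simulated in plain C. This is a simplified sketch and not the schedule() function shown earlier: it has no priorities or execution-order field, one tick is one unit of work, and the names `sim_task`, `sim_schedule` and `sim_run` are hypothetical.

```c
enum sim_state { SIM_READY, SIM_RUN, SIM_DONE };

struct sim_task {
    enum sim_state state;
    int remaining;               /* ticks of work left */
};

/* Return the index of the next READY task after `current` in circular
 * order (possibly `current` itself), or -1 if no task is READY. */
int sim_schedule(struct sim_task tasks[], int n, int current)
{
    for (int step = 1; step <= n; step++) {
        int cand = (current + step) % n;
        if (tasks[cand].state == SIM_READY)
            return cand;
    }
    return -1;
}

/* Simulate tick interrupts until all tasks finish. out[] receives the
 * index of the task that ran during each tick; returns the tick count. */
int sim_run(struct sim_task tasks[], int n, int out[], int out_max)
{
    int ticks = 0;
    int current = n - 1;         /* so that the first pick is task 0 */

    for (int i = 0; i < n; i++)  /* tasks without work never run */
        tasks[i].state = tasks[i].remaining > 0 ? SIM_READY : SIM_DONE;

    for (;;) {
        /* "Tick": demote the running task, then pick the next one. */
        if (tasks[current].state == SIM_RUN)
            tasks[current].state = SIM_READY;
        int next = sim_schedule(tasks, n, current);
        if (next < 0)
            break;               /* nothing left to run */
        tasks[next].state = SIM_RUN;
        tasks[next].remaining--;
        if (ticks < out_max)
            out[ticks] = next;
        ticks++;
        if (tasks[next].remaining == 0)
            tasks[next].state = SIM_DONE;
        current = next;
    }
    return ticks;
}
```

In a real kernel the body of the loop would run inside the tick ISR with interrupts disabled, and the switch to `next` would be the register save and restore shown in the assembler above.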