How can my threaded image generating app get it's data to the gui?
A slow multiple precision implementation of a mandelbrot generator. Threaded, using POSIX threads. Gtk GUI.
I've got a bit lost. This is my first attempt at writing a threaded program. I'm not actually trying to convert the single-threaded version of it yet, just trying to implement the basic framework.
A brief description of how it works so far:
Main creates the watch_render_start thread, which waits for a pthread_cond_signal, which is signalled by the GUI callback when 'render' button clicked.
watch_render_start checks if image is already rendering, checks for quit, etc, but if all goes well it creates the render_create_threads thread.
The render_create_threads thread then creates the render threads, then uses pthread_join to wait for them to finish (and does some timing stuff with get_time_of_day - is that bad in threads?).
The entry point of the render threads (imaginatively) called render, loops while next_line calculation func returns TRUE for more lines to process. in this while loop, there's checks for stop or quit.
The next_line func gets the line it is to calculate before incrementing the variable to indicate the next line for the next thread to calculate. It returns if the line it is to process is beyond the image height. If not then it calculates the contents of the line. Then increments lines_done and checks it against height of image and returns 0 if >= or 1 if <.
Here's all 470+ lines of code, i'm sure you'll have fun looking at it.
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <mpfr.h>
#include <string.h>
#include <gtk/gtk.h>
#include <sys/time.h>
/* build with:
gcc threaded_app.c -o threaded_app -Wall -pedantic -std=gnu99 -lgmp -lmpfr -pthread -D_REENTRANT -ggdb `pkg-config --cflags gtk+-2.0` `pkg-config --libs gtk+-2.0`
*/
typedef struct
{
struct timeval tv_start;
struct timeval tv_end;
} Timer;
void timer_start(Timer* t)
{
gettimeofday(&t->tv_start, 0);
}
void timer_stop(Timer* t)
{
gettimeofday(&t->tv_end, 0);
}
long timer_get_elapsed(Timer* t)
{
if (t->tv_start.tv_sec == t->tv_end.tv_sec)
return t->tv_end.tv_usec - t->tv_start.tv_usec;
else
return (t->tv_end.tv_sec - t->tv_start.tv_sec) *
1e6 + (t->tv_end.tv_usec - t->tv_start.tv_usec);
}
#define NTHREADS 8
#define IMG_WIDTH 480
#define IMG_HEIGHT 360
typedef struct
{
int rc;
pthread_t thread;
} rthrds;
typedef struct
{
int* arr;
int next_line;
int lines_done;
int rendering;
int start;
int stop;
pthread_t rend[NTHREADS];
int all_quit;
int width;
int height;
double xmin, xmax, ymax;
int depth;
} image_info;
static gboolean delete_event(GtkWidget *widget,
GdkEvent *event,
gpointer data);
static void destroy(GtkWidget *widget, gpointer data);
void gui_start_render(GtkWidget* widget, gpointer data);
void gui_stop_render(GtkWidget* widget, gpointer data);
static GtkWidget* gui_pbar = NULL;
void *render(void* ptr);
int next_line(image_info* img);
void* watch_render_start(void* ptr);
void* watch_render_stop(void* ptr);
void* watch_render_done(void* ptr);
void* threads_render_create(void* ptr);
pthread_mutex_t next_line_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t lines_done_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t img_start_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t img_stop_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t img_rendering_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t img_start_cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t img_stop_cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t img_done_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t all_quit_mutex = PTHREAD_MUTEX_INITIALIZER;
int main(int argc, char **argv)
{
printf("initializing...\n");
image_info* img = malloc(sizeof(image_info));
memset(img, 0, sizeof(image_info));
img->start = 0;
img->width = IMG_WIDTH;
img->height = IMG_HEIGHT;
img->xmin = -0.75509089265046296296296259;
img->xmax = -0.75506025752314814814814765;
img->ymax = 0.050215494791666666666666005;
img->depth = 30000;
size_t arr_size = img->width * img->height * sizeof(int);
printf("creating array size: %ld bytes\n", arr_size);
img->arr = malloc(arr_size);
if (!img->arr)
{
fprintf(stderr, "image dimension too large!\n");
free(img);
exit(-1);
}
memset(img->arr, 0, arr_size);
int rc_err;
pthread_t thread_start;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
printf("creating watch render start thread...\n");
rc_err = pthread_create(&thread_start, &attr,
&watch_render_start, (void*)img);
if (rc_err)
{
fprintf(stderr, "Thread start creation failed: %d\n",
rc_err);
free(img->arr);
free(img);
exit(-1);
}
printf("creating GUI...\n");
G开发者_高级运维tkWidget *window;
GtkWidget *startbutton;
GtkWidget *stopbutton;
GtkWidget *box1;
gtk_init (&argc, &argv);
window = gtk_window_new (GTK_WINDOW_TOPLEVEL);
g_signal_connect (G_OBJECT (window), "delete_event",
G_CALLBACK (delete_event), NULL);
g_signal_connect (G_OBJECT (window), "destroy",
G_CALLBACK (destroy), NULL);
gtk_container_set_border_width (GTK_CONTAINER (window), 10);
box1 = gtk_hbox_new(FALSE, 0);
gtk_container_add(GTK_CONTAINER(window), box1);
startbutton = gtk_button_new_with_label ("Start render");
g_signal_connect (G_OBJECT (startbutton), "clicked",
G_CALLBACK (gui_start_render), img);
gtk_box_pack_start(GTK_BOX(box1), startbutton, TRUE, TRUE, 0);
stopbutton = gtk_button_new_with_label ("Stop render");
g_signal_connect (G_OBJECT (stopbutton), "clicked",
G_CALLBACK (gui_stop_render), img);
gtk_box_pack_start(GTK_BOX(box1), stopbutton, TRUE, TRUE, 0);
gui_pbar = gtk_progress_bar_new();
gtk_progress_bar_set_orientation(GTK_PROGRESS_BAR(gui_pbar),
GTK_PROGRESS_LEFT_TO_RIGHT);
gtk_progress_bar_set_fraction (GTK_PROGRESS_BAR(gui_pbar),
(gfloat)1.0 ); /* img->real_height); */
gtk_widget_set_size_request(gui_pbar, 75, 0);
gtk_box_pack_end(GTK_BOX(box1), gui_pbar, FALSE, FALSE, 0);
gtk_widget_show(startbutton);
gtk_widget_show(stopbutton);
gtk_widget_show(box1);
gtk_widget_show(window);
printf("starting GUI\n");
gtk_main ();
printf("************************\n"
"GUI shutdown\n"
"************************\n");
printf("setting all_quit\n");
pthread_mutex_lock(&all_quit_mutex);
img->all_quit = 1;
pthread_mutex_unlock(&all_quit_mutex);
printf("signalling watch render start thread to wakeup...\n");
pthread_mutex_lock(&img_start_mutex);
pthread_cond_signal(&img_start_cond);
pthread_mutex_unlock(&img_start_mutex);
printf("waiting for watch render start thread to quit...\n");
pthread_join(thread_start, NULL);
printf("done\n");
printf("freeing memory\n");
free(img->arr);
free(img);
printf("goodbye!\n");
exit(0);
}
void gui_start_render(GtkWidget* widget, gpointer ptr)
{
image_info* img = (image_info*)ptr;
printf("************\n"
"GUI signalling to start render...\n"
"************\n");
pthread_mutex_lock(&img_start_mutex);
img->start = 1;
pthread_cond_signal(&img_start_cond);
pthread_mutex_unlock(&img_start_mutex);
}
void gui_stop_render(GtkWidget* widget, gpointer ptr)
{
image_info* img = (image_info*)ptr;
printf("************\n"
"GUI signalling to stop render...\n"
"************\n");
pthread_mutex_lock(&img_stop_mutex);
img->stop = 1;
pthread_mutex_unlock(&img_stop_mutex);
}
void* watch_render_start(void* ptr)
{
image_info* img = (image_info*)ptr;
int rc_err;
pthread_t render_thread;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
int r;
int quit = 0;
for(;;)
{
printf("watch_render_start: waiting for img_start_cond\n");
pthread_mutex_lock(&img_start_mutex);
if (!img->start)
pthread_cond_wait(&img_start_cond, &img_start_mutex);
img->start = 0;
pthread_mutex_unlock(&img_start_mutex);
printf("watch_render_start: recieved img_start_cond\n");
pthread_mutex_lock(&img_rendering_mutex);
r = img->rendering;
pthread_mutex_unlock(&img_rendering_mutex);
printf("checking if we are rendering... ");
if (r)
{
printf("yes\nStopping render...\n");
pthread_mutex_lock(&img_stop_mutex);
img->stop = 1;
pthread_cond_signal(&img_stop_cond);
pthread_mutex_unlock(&img_stop_mutex);
pthread_join(render_thread, NULL);
printf("render stopped\n");
}
else
printf("no\n");
pthread_mutex_lock(&all_quit_mutex);
quit = img->all_quit;
pthread_mutex_unlock(&all_quit_mutex);
if (quit)
{
printf("exiting watch render start thread\n");
pthread_exit(0);
}
printf("creating render thread...\n");
rc_err = pthread_create(&render_thread, &attr,
&threads_render_create, (void*)img);
if (rc_err)
pthread_exit(0);
}
}
void* threads_render_create(void* ptr)
{
Timer timing_info;
printf("initializing render thread\n");
image_info* img = (image_info*)ptr;
pthread_mutex_lock(&img_rendering_mutex);
img->rendering = 1;
pthread_mutex_unlock(&img_rendering_mutex);
pthread_mutex_lock(&lines_done_mutex);
img->lines_done = 0;
pthread_mutex_unlock(&lines_done_mutex);
pthread_mutex_lock(&img_stop_mutex);
img->stop = 0;
pthread_mutex_unlock(&img_stop_mutex);
pthread_mutex_lock(&next_line_mutex);
img->next_line = 0;
pthread_mutex_unlock(&next_line_mutex);
int rc_err, i;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
timer_start(&timing_info);
for (i = 0; i < NTHREADS; ++i)
{
printf("creating renderer thread #%d...\n", i);
rc_err = pthread_create(&img->rend[i], &attr,
&render, (void*)img);
if (rc_err)
{
fprintf(stderr, "\nrender thread #%d creation failed: %d\n",
i, rc_err);
return 0;
}
}
for (i = 0; i < NTHREADS; ++i)
{
printf("joining renderer thread #%d...\n", i);
pthread_join(img->rend[i], NULL);
}
timer_stop(&timing_info);
printf("render-time %.3fs\n\n",
timer_get_elapsed(&timing_info) / (double)1e6);
printf("all renderer threads finished\n");
pthread_mutex_lock(&img_stop_mutex);
img->stop = 0;
pthread_mutex_unlock(&img_stop_mutex);
pthread_mutex_lock(&img_rendering_mutex);
img->rendering = 0;
pthread_mutex_unlock(&img_rendering_mutex);
printf("at end of threads_render_create\n");
pthread_mutex_lock(&lines_done_mutex);
if (img->lines_done >= img->height)
printf("image complete\n");
else
printf("image interuppted\n");
pthread_mutex_unlock(&lines_done_mutex);
pthread_mutex_lock(&img_start_mutex);
img->start = 0;
pthread_mutex_unlock(&img_start_mutex);
printf("exiting render thread\n");
pthread_exit(NULL);
}
void* render(void* ptr)
{
image_info* img = (image_info*)ptr;
int quit = 0;
printf("starting render..\n");
while(next_line(img) && !quit)
{
pthread_mutex_lock(&img_stop_mutex);
quit = img->stop;
pthread_mutex_unlock(&img_stop_mutex);
pthread_mutex_lock(&all_quit_mutex);
quit |= img->all_quit;
pthread_mutex_unlock(&all_quit_mutex);
}
printf("exiting render thread\n");
pthread_exit(0);
}
int next_line(image_info* img)
{
int line;
pthread_mutex_lock(&next_line_mutex);
line = img->next_line++;
pthread_mutex_unlock(&next_line_mutex);
if (line >= img->height)
return 0;
int ix,wz;
int img_width = img->width;
long double x,y,x2,y2,wre=0,wim=0,wre2=0,wim2=0;
long double xmin = img->xmin, xmax = img->xmax, ymax = img->ymax;
long double xdiff = xmax - xmin;
int depth = img->depth;
long double c_im = 0, c_re = 0;
y = ymax - (xdiff / (long double)img_width)
* (long double)line;
y2 = y * y;
for (ix = 0; ix < img_width; ++ix)
{
x = ((long double)ix / (long double)img_width) * xdiff + xmin;
x2 = x * x;
wre = x;
wim = y;
wre2 = x2;
wim2 = y2;
for (wz = 0; wz < depth; ++wz)
{
wim = 2.0 * wre * wim + c_im;
wre = wre2 - wim2 + c_re;
wim2 = wim * wim;
wre2 = wre * wre;
if (wim2 + wre2 > 4.0F)
break;
}
if (wz == depth + 1)
wz = 0;
img->arr[line * img_width + ix] = wz;
}
printf("line %d complete\n", line);
pthread_mutex_lock(&lines_done_mutex);
img->lines_done++;
if (img->lines_done == img->height)
{
pthread_mutex_unlock(&lines_done_mutex);
return 0;
}
pthread_mutex_unlock(&lines_done_mutex);
return 1;
}
static gboolean delete_event(GtkWidget *widget,
GdkEvent *event,
gpointer data)
{
return FALSE;
}
static void destroy(GtkWidget *widget, gpointer data)
{
gtk_main_quit ();
}
I've got this far and need some pointers on how to proceed. For each problem I face I just see a confusing maze of a solution leading down a dead end!
I wanted to tackle the progress bar first. The gui will need to put locks on lines_done. But how is it to know when to do this? How often will it look at lines_done? I guess I could use g_idle_add for this.
Then the real meaty problem of actually rendering the data that all those happy threads are generating. As discussed on another question, I'll have an array of flags to indicate which lines are actually rendered (because they will render in arbitrary order due to nature of threading and os schedulers). But how will the GUI check these? In the same idle callback as the progress bar? And say a big 8000 pixel tall image is being generated, that's 8000 mutex locks and unlocks every so many milliseconds - that's gotta cost right?
So how should I proceed here? Is this model I'm using, whatever it is, capable of doing what I want?
If you have access to atomic reads and atomic writes on your platform(s) then create a work allocation table (read the architecture notes for your platforms - it may or may not be that ordinary reads and writes are good enough, you may or may not need to add memory barriers):
One byte per line, initially zero, non-zero means the thread the line is allocated to
...and create an atomically updated count of lines done int field per worker thread. The table should be updated and read using atomic read/write instructions (so in chunks of 8,16,32 or 64bits depending on the available instructions on the platform).
The top level logic must work out whether to just do all the work straight away on the main thread (if the image is really small) or to start one worker thread, or to start N worker threads.
The coordination thread (or if it was me probably I'd scrap the coordination thread and do this on the main thread) allocates half the jobs in round robin to the threads (or all the work if there's less than a certain amount). If it allocates less than all the work it monitors the other threads and benchmarks the performance of average thread and best thread. It makes sure that threads as a whole do not run out of work, but tries not to leave threads with nothing to do.
The front-end keeps a pointer for each worker to where in the allocation table a worker has done up to, and when the worker increases its integer field for how many lines it has done, the front-end searches forwards through the work allocation table finding the line indices of the jobs allocated to that worker that are now completed, and updates a bit buffer of which specific lines are done and also updates a total done field.
--
This is a general algorithm for dynamically allocating the work to the threads, as another poster has suggested you can alternatively statically allocate the work by making the line numbers the worker thread should process a function of the number of worker threads and the worker thread number, then just pass count of lines done by each worker through an atomic field to the front end.
To reduce number of mutexes:-
Have one mutex for access to a bit buffer of lines signalled as done (8000/8bits = 1000byte buffer).
A second temporary bit buffer.
Worker thread locks mutex, sets bit in first buffer and unlocks mutex.
Main loop locks mutex, copies first buffer to second and unlocks mutex.
Then scans second buffer for non-zero, and for each set bit, copies data for that line to output / screen.
To reduce the contention on the first bit buffer you could partition the first bit buffer into 8 or even 16 segments (which segment to look in us based on the line number mod 8 or mod 16) and have a mutex for each segment.
--
Probably the way to go is to use the design I was suggesting but "try_lock" (rather than wait for) the locks, do a couple of NOPs and retry until they become available rather than yielding. It may be worth using atomic inc/dec directly rather than pthread mutexes for higher performance.
Finally it is not worth having 8 threads unless you have 8 processors, and I don't know about get_time_of_day.
Edit: There is perhaps a flaw with what I suggest that if the main thread is preempted whilst it has locked a bit buffer mutex, that the other threads waste a load of time. The frequency of this happening might be reduced by lowering the priorioty of the other threads but I think a better overall strategy is to use an array of 8000 atomic_t types with the atomic inc/dec instructions to signal line completion from worker threads to main thread. These 8000 atomic_t's can be searched by the main thread. I kind of assumed too that you'd reduce the number of worker threads to be one less than the number of CPUs.
Edit: Eight threads seems a bit arbitrary. Where did you get this number from? Obviously you need at least one worker thread.
Edit: Even faster would be to use atomic_set_mask to set bits in a 1000 byte buffer that the front end scans in a loop.
Edit: Assuming you have atomic_set_mask on your platform.
Use a condition variable along with your next_line_mutex. The render to GUI function can keep a variable with the last line that it rendered and compare that to the next_line variable whenever the condition fires, so that it can see what lines it needs to render. The next_line function can fire the condition.
As pointed out to me, the above condition variable would result in the GUI locking up, so it isn't a great idea. Instead, I suppose the GUI should check the lines variable on a time interval, perhaps once each second.
If the performance of 8000 lock/unlock operations is too slow, then I would recommend doing the lines in batches of 3, 5, 7, or even 8 (for 8 threads). If you assign each thread a different number of lines to process and each line takes about the same amount of processing time, then the lock is more likely to be uncontended when it is taken. Uncontended locks are very cheap, although still more expensive than a normal CPU operation (it has to pull the cache line from the last CPU that used it). This would be easy to do by making next_line be next_lines(img, 8)
精彩评论