app: allow progressive execution of parallel async operations
In the gimp_parallel_run_async() family of functions, allow the async callback to return without completing the async operation, in which case the callback will be called again, until the operation is either completed, or canceled, in which case it is aborted (previously, returning from the callback without completing the operation would cause it to be aborted.) It is guaranteed that all operations of the same priority will get a chance to run, even if some of them contuinuosly return without completing. This allows potentially time-consuming operations to yield execution in favor of other same-priority operations, and, in particular, of higher-priority operations, to avoid priority inversion. Essentially, this allows a simple form of cooperative multitasking among async operations.
This commit is contained in:
@ -90,19 +90,22 @@ typedef struct
|
||||
|
||||
/* local function prototypes */
|
||||
|
||||
static void gimp_parallel_notify_num_processors (GimpGeglConfig *config);
|
||||
static void gimp_parallel_notify_num_processors (GimpGeglConfig *config);
|
||||
|
||||
static void gimp_parallel_set_n_threads (gint n_threads,
|
||||
gboolean finish_tasks);
|
||||
static void gimp_parallel_set_n_threads (gint n_threads,
|
||||
gboolean finish_tasks);
|
||||
|
||||
static void gimp_parallel_run_async_set_n_threads (gint n_threads,
|
||||
gboolean finish_tasks);
|
||||
static gpointer gimp_parallel_run_async_thread_func (GimpParallelRunAsyncThread *thread);
|
||||
static void gimp_parallel_run_async_execute_task (GimpParallelRunAsyncTask *task);
|
||||
static void gimp_parallel_run_async_cancel (GimpAsync *async);
|
||||
static void gimp_parallel_run_async_set_n_threads (gint n_threads,
|
||||
gboolean finish_tasks);
|
||||
static gpointer gimp_parallel_run_async_thread_func (GimpParallelRunAsyncThread *thread);
|
||||
static void gimp_parallel_run_async_enqueue_task (GimpParallelRunAsyncTask *task);
|
||||
static GimpParallelRunAsyncTask * gimp_parallel_run_async_dequeue_task (void);
|
||||
static gboolean gimp_parallel_run_async_execute_task (GimpParallelRunAsyncTask *task);
|
||||
static void gimp_parallel_run_async_abort_task (GimpParallelRunAsyncTask *task);
|
||||
static void gimp_parallel_run_async_cancel (GimpAsync *async);
|
||||
|
||||
static void gimp_parallel_distribute_set_n_threads (gint n_threads);
|
||||
static gpointer gimp_parallel_distribute_thread_func (GimpParallelDistributeThread *thread);
|
||||
static void gimp_parallel_distribute_set_n_threads (gint n_threads);
|
||||
static gpointer gimp_parallel_distribute_thread_func (GimpParallelDistributeThread *thread);
|
||||
|
||||
|
||||
/* local variables */
|
||||
@ -185,50 +188,13 @@ gimp_parallel_run_async_full (gint priority,
|
||||
|
||||
if (gimp_parallel_run_async_n_threads > 0)
|
||||
{
|
||||
GList *link;
|
||||
GList *iter;
|
||||
|
||||
link = g_list_alloc ();
|
||||
link->data = task;
|
||||
|
||||
g_object_set_data (G_OBJECT (async),
|
||||
"gimp-parallel-run-async-link", link);
|
||||
|
||||
g_signal_connect_after (async, "cancel",
|
||||
G_CALLBACK (gimp_parallel_run_async_cancel),
|
||||
NULL);
|
||||
|
||||
g_mutex_lock (&gimp_parallel_run_async_mutex);
|
||||
|
||||
for (iter = g_queue_peek_tail_link (&gimp_parallel_run_async_queue);
|
||||
iter;
|
||||
iter = g_list_previous (iter))
|
||||
{
|
||||
GimpParallelRunAsyncTask *other_task =
|
||||
(GimpParallelRunAsyncTask *) iter->data;
|
||||
|
||||
if (other_task->priority <= task->priority)
|
||||
break;
|
||||
}
|
||||
|
||||
if (iter)
|
||||
{
|
||||
link->prev = iter;
|
||||
link->next = iter->next;
|
||||
|
||||
iter->next = link;
|
||||
|
||||
if (link->next)
|
||||
link->next->prev = link;
|
||||
else
|
||||
gimp_parallel_run_async_queue.tail = link;
|
||||
|
||||
gimp_parallel_run_async_queue.length++;
|
||||
}
|
||||
else
|
||||
{
|
||||
g_queue_push_head_link (&gimp_parallel_run_async_queue, link);
|
||||
}
|
||||
gimp_parallel_run_async_enqueue_task (task);
|
||||
|
||||
g_cond_signal (&gimp_parallel_run_async_cond);
|
||||
|
||||
@ -236,7 +202,7 @@ gimp_parallel_run_async_full (gint priority,
|
||||
}
|
||||
else
|
||||
{
|
||||
gimp_parallel_run_async_execute_task (task);
|
||||
while (gimp_parallel_run_async_execute_task (task));
|
||||
}
|
||||
|
||||
return async;
|
||||
@ -274,7 +240,7 @@ gimp_parallel_run_async_independent (GimpParallelRunAsyncFunc func,
|
||||
/* ^-- avoid "unused result" warning */
|
||||
#endif
|
||||
|
||||
gimp_parallel_run_async_execute_task (task);
|
||||
while (gimp_parallel_run_async_execute_task (task));
|
||||
|
||||
return NULL;
|
||||
},
|
||||
@ -516,27 +482,12 @@ gimp_parallel_run_async_set_n_threads (gint n_threads,
|
||||
GimpParallelRunAsyncTask *task;
|
||||
|
||||
/* finish remaining tasks */
|
||||
while ((task = (GimpParallelRunAsyncTask *)
|
||||
g_queue_pop_head (&gimp_parallel_run_async_queue)))
|
||||
while ((task = gimp_parallel_run_async_dequeue_task ()))
|
||||
{
|
||||
g_object_set_data (G_OBJECT (task->async),
|
||||
"gimp-parallel-run-async-link", NULL);
|
||||
|
||||
if (finish_tasks)
|
||||
{
|
||||
gimp_parallel_run_async_execute_task (task);
|
||||
}
|
||||
while (gimp_parallel_run_async_execute_task (task));
|
||||
else
|
||||
{
|
||||
if (task->user_data && task->user_data_destroy_func)
|
||||
task->user_data_destroy_func (task->user_data);
|
||||
|
||||
gimp_async_abort (task->async);
|
||||
|
||||
g_object_unref (task->async);
|
||||
|
||||
g_slice_free (GimpParallelRunAsyncTask, task);
|
||||
}
|
||||
gimp_parallel_run_async_abort_task (task);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -551,22 +502,31 @@ gimp_parallel_run_async_thread_func (GimpParallelRunAsyncThread *thread)
|
||||
GimpParallelRunAsyncTask *task;
|
||||
|
||||
while (! thread->quit &&
|
||||
(task =
|
||||
(GimpParallelRunAsyncTask *) g_queue_pop_head (
|
||||
&gimp_parallel_run_async_queue)))
|
||||
(task = gimp_parallel_run_async_dequeue_task ()))
|
||||
{
|
||||
g_object_set_data (G_OBJECT (task->async),
|
||||
"gimp-parallel-run-async-link", NULL);
|
||||
gboolean resume;
|
||||
|
||||
thread->current_async = GIMP_ASYNC (g_object_ref (task->async));
|
||||
|
||||
g_mutex_unlock (&gimp_parallel_run_async_mutex);
|
||||
do
|
||||
{
|
||||
g_mutex_unlock (&gimp_parallel_run_async_mutex);
|
||||
|
||||
gimp_parallel_run_async_execute_task (task);
|
||||
resume = gimp_parallel_run_async_execute_task (task);
|
||||
|
||||
g_mutex_lock (&gimp_parallel_run_async_mutex);
|
||||
g_mutex_lock (&gimp_parallel_run_async_mutex);
|
||||
}
|
||||
while (resume &&
|
||||
(g_queue_is_empty (&gimp_parallel_run_async_queue) ||
|
||||
task->priority <
|
||||
((GimpParallelRunAsyncTask *)
|
||||
g_queue_peek_head (
|
||||
&gimp_parallel_run_async_queue))->priority));
|
||||
|
||||
g_clear_object (&thread->current_async);
|
||||
|
||||
if (resume)
|
||||
gimp_parallel_run_async_enqueue_task (task);
|
||||
}
|
||||
|
||||
if (thread->quit)
|
||||
@ -582,12 +542,103 @@ gimp_parallel_run_async_thread_func (GimpParallelRunAsyncThread *thread)
|
||||
}
|
||||
|
||||
static void
|
||||
gimp_parallel_run_async_enqueue_task (GimpParallelRunAsyncTask *task)
|
||||
{
|
||||
GList *link;
|
||||
GList *iter;
|
||||
|
||||
if (gimp_async_is_canceled (task->async))
|
||||
{
|
||||
gimp_parallel_run_async_abort_task (task);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
link = g_list_alloc ();
|
||||
link->data = task;
|
||||
|
||||
g_object_set_data (G_OBJECT (task->async),
|
||||
"gimp-parallel-run-async-link", link);
|
||||
|
||||
for (iter = g_queue_peek_tail_link (&gimp_parallel_run_async_queue);
|
||||
iter;
|
||||
iter = g_list_previous (iter))
|
||||
{
|
||||
GimpParallelRunAsyncTask *other_task =
|
||||
(GimpParallelRunAsyncTask *) iter->data;
|
||||
|
||||
if (other_task->priority <= task->priority)
|
||||
break;
|
||||
}
|
||||
|
||||
if (iter)
|
||||
{
|
||||
link->prev = iter;
|
||||
link->next = iter->next;
|
||||
|
||||
iter->next = link;
|
||||
|
||||
if (link->next)
|
||||
link->next->prev = link;
|
||||
else
|
||||
gimp_parallel_run_async_queue.tail = link;
|
||||
|
||||
gimp_parallel_run_async_queue.length++;
|
||||
}
|
||||
else
|
||||
{
|
||||
g_queue_push_head_link (&gimp_parallel_run_async_queue, link);
|
||||
}
|
||||
}
|
||||
|
||||
static GimpParallelRunAsyncTask *
|
||||
gimp_parallel_run_async_dequeue_task (void)
|
||||
{
|
||||
GimpParallelRunAsyncTask *task;
|
||||
|
||||
task = (GimpParallelRunAsyncTask *) g_queue_pop_head (
|
||||
&gimp_parallel_run_async_queue);
|
||||
|
||||
if (task)
|
||||
{
|
||||
g_object_set_data (G_OBJECT (task->async),
|
||||
"gimp-parallel-run-async-link", NULL);
|
||||
}
|
||||
|
||||
return task;
|
||||
}
|
||||
|
||||
static gboolean
|
||||
gimp_parallel_run_async_execute_task (GimpParallelRunAsyncTask *task)
|
||||
{
|
||||
if (gimp_async_is_canceled (task->async))
|
||||
{
|
||||
gimp_parallel_run_async_abort_task (task);
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
task->func (task->async, task->user_data);
|
||||
|
||||
if (! gimp_async_is_stopped (task->async))
|
||||
gimp_async_abort (task->async);
|
||||
if (gimp_async_is_stopped (task->async))
|
||||
{
|
||||
g_object_unref (task->async);
|
||||
|
||||
g_slice_free (GimpParallelRunAsyncTask, task);
|
||||
|
||||
return FALSE;
|
||||
}
|
||||
|
||||
return TRUE;
|
||||
}
|
||||
|
||||
static void
|
||||
gimp_parallel_run_async_abort_task (GimpParallelRunAsyncTask *task)
|
||||
{
|
||||
if (task->user_data && task->user_data_destroy_func)
|
||||
task->user_data_destroy_func (task->user_data);
|
||||
|
||||
gimp_async_abort (task->async);
|
||||
|
||||
g_object_unref (task->async);
|
||||
|
||||
@ -624,16 +675,7 @@ gimp_parallel_run_async_cancel (GimpAsync *async)
|
||||
g_mutex_unlock (&gimp_parallel_run_async_mutex);
|
||||
|
||||
if (task)
|
||||
{
|
||||
if (task->user_data && task->user_data_destroy_func)
|
||||
task->user_data_destroy_func (task->user_data);
|
||||
|
||||
g_slice_free (GimpParallelRunAsyncTask, task);
|
||||
|
||||
gimp_async_abort (async);
|
||||
|
||||
g_object_unref (async);
|
||||
}
|
||||
gimp_parallel_run_async_abort_task (task);
|
||||
}
|
||||
|
||||
static void
|
||||
|
Reference in New Issue
Block a user