app: reorganized multi-thread saving code.
Contributed code was not wrong, far from it! But the whole memory management was a bit on the hard-to-read side. For maintainability, I prefer some simpler code, which contains a bit more allocation but it's reasonable and at least much less indexes to play with. Also instead of gegl_parallel_distribute(), then waiting for each set of batches to entirely finish, I'm using a GThreadPool which I progressively fill with new batch data. And finally I'm running the gegl_buffer_get() inside the threads (I know that we have only reads on the buffer at this point; also GEGL has an internal lock mechanism anyway). These actually do not provide a huge additional speedup (compared to the initial speedup by just going multi-threaded), most likely because the I/O are now the bottleneck anyway (which is quite a good thing!).
This commit is contained in:
@ -76,6 +76,26 @@
|
||||
#include "gimp-intl.h"
|
||||
|
||||
|
||||
/* Per thread data for xcf_save_tile_rle */
|
||||
typedef struct
|
||||
{
|
||||
/* Common to all jobs. */
|
||||
GeglBuffer *buffer;
|
||||
gint file_version;
|
||||
gint max_rle_size;
|
||||
|
||||
/* Job specific. */
|
||||
gint tile;
|
||||
gint batch_size;
|
||||
|
||||
/* Temp data to avoid too many allocations. */
|
||||
guchar *tile_data;
|
||||
|
||||
/* Return data. */
|
||||
guchar *rle_data;
|
||||
gint rle_data_len[XCF_TILE_SAVE_BATCH_SIZE];
|
||||
} XcfJobData;
|
||||
|
||||
static gboolean xcf_save_image_props (XcfInfo *info,
|
||||
GimpImage *image,
|
||||
GError **error);
|
||||
@ -121,15 +141,18 @@ static gboolean xcf_save_tile (XcfInfo *info,
|
||||
GeglRectangle *tile_rect,
|
||||
const Babl *format,
|
||||
GError **error);
|
||||
static void xcf_save_tile_rle_parallel (gint thread,
|
||||
gint n_thread,
|
||||
static void xcf_save_free_job_data (XcfJobData *data);
|
||||
static gint xcf_save_sort_job_data (XcfJobData *data1,
|
||||
XcfJobData *data2,
|
||||
gpointer user_data);
|
||||
static void xcf_save_tile_rle (GeglRectangle *tile_rect,
|
||||
guchar *tile_data,
|
||||
const Babl *format,
|
||||
guchar *rlebuf,
|
||||
gint *lenptr,
|
||||
gint file_version);
|
||||
static void xcf_save_tile_rle_parallel (XcfJobData *job_data,
|
||||
GAsyncQueue *queue);
|
||||
static void xcf_save_tile_rle (GeglRectangle *tile_rect,
|
||||
guchar *tile_data,
|
||||
const Babl *format,
|
||||
guchar *rlebuf,
|
||||
gint *lenptr,
|
||||
gint file_version);
|
||||
static gboolean xcf_save_tile_zlib (XcfInfo *info,
|
||||
GeglBuffer *buffer,
|
||||
GeglRectangle *tile_rect,
|
||||
@ -148,21 +171,6 @@ static gboolean xcf_save_old_vectors (XcfInfo *info,
|
||||
GimpImage *image,
|
||||
GError **error);
|
||||
|
||||
/* Per thread data for xcf_save_tile_rle */
|
||||
struct _GimpXCFSaveTileRLEData
|
||||
{
|
||||
const Babl *format;
|
||||
gint file_version;
|
||||
gint batch_size;
|
||||
GeglRectangle tile_rect[XCF_TILE_SAVE_BATCH_SIZE];
|
||||
/* tile data from gegl_buffer_get */
|
||||
guchar *rletilebuf[XCF_TILE_SAVE_BATCH_SIZE];
|
||||
/* rle compressed data */
|
||||
guchar *rlebuf[XCF_TILE_SAVE_BATCH_SIZE];
|
||||
/* rle compressed length */
|
||||
gint len[XCF_TILE_SAVE_BATCH_SIZE];
|
||||
};
|
||||
typedef struct _GimpXCFSaveTileRLEData GimpXCFSaveTileRLEData;
|
||||
|
||||
/* private convenience macros */
|
||||
#define xcf_write_int32_check_error(info, data, count) G_STMT_START { \
|
||||
@ -1846,11 +1854,7 @@ xcf_save_level (XcfInfo *info,
|
||||
gint n_tile_cols;
|
||||
guint ntiles;
|
||||
gint num_processors;
|
||||
gint n_threads;
|
||||
gint i, j, k;
|
||||
guchar *rletilebuf = NULL;
|
||||
guchar *rlebuf = NULL;
|
||||
GimpXCFSaveTileRLEData *rledata = NULL;
|
||||
GError *tmp_error = NULL;
|
||||
|
||||
num_processors = GIMP_GEGL_CONFIG (image->gimp->config)->num_processors;
|
||||
@ -1873,35 +1877,6 @@ xcf_save_level (XcfInfo *info,
|
||||
max_data_length = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp *
|
||||
XCF_TILE_MAX_DATA_LENGTH_FACTOR /* = 1.5, currently */;
|
||||
|
||||
/* allocate a temporary buffer to store the rle data before it is
|
||||
* written to disk
|
||||
*/
|
||||
if (info->compression == COMPRESS_RLE)
|
||||
{
|
||||
gint tile_size = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp;
|
||||
gint n_buf = num_processors * XCF_TILE_SAVE_BATCH_SIZE;
|
||||
|
||||
/* use g_malloc to avoid stack overflow */
|
||||
rletilebuf = (guchar *) g_malloc (tile_size * n_buf);
|
||||
rlebuf = (guchar *) g_malloc (max_data_length * n_buf);
|
||||
rledata = (GimpXCFSaveTileRLEData*) g_malloc (sizeof (GimpXCFSaveTileRLEData) * num_processors);
|
||||
|
||||
for (j = 0; j < num_processors; j++)
|
||||
{
|
||||
rledata[j].format = format;
|
||||
rledata[j].file_version = info->file_version;
|
||||
for (k = 0; k < XCF_TILE_SAVE_BATCH_SIZE; k++)
|
||||
{
|
||||
rledata[j].rletilebuf[k] = rletilebuf +
|
||||
tile_size * XCF_TILE_SAVE_BATCH_SIZE * j +
|
||||
tile_size * k;
|
||||
rledata[j].rlebuf[k] = rlebuf +
|
||||
max_data_length * XCF_TILE_SAVE_BATCH_SIZE * j +
|
||||
max_data_length * k;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
n_tile_rows = gimp_gegl_buffer_get_n_tile_rows (buffer, XCF_TILE_HEIGHT);
|
||||
n_tile_cols = gimp_gegl_buffer_get_n_tile_cols (buffer, XCF_TILE_WIDTH);
|
||||
|
||||
@ -1928,51 +1903,157 @@ xcf_save_level (XcfInfo *info,
|
||||
if (info->compression == COMPRESS_RLE)
|
||||
{
|
||||
/* parallel implementation */
|
||||
XcfJobData *job_data;
|
||||
guchar *switch_rle_data;
|
||||
gint rle_data_len[XCF_TILE_SAVE_BATCH_SIZE];
|
||||
|
||||
GThreadPool *pool;
|
||||
GAsyncQueue *queue;
|
||||
gint num_tasks = num_processors * 2;
|
||||
gint tile_size = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp;
|
||||
gint rle_data_max_size;
|
||||
gint next_tile = 0;
|
||||
|
||||
rle_data_max_size = tile_size * XCF_TILE_MAX_DATA_LENGTH_FACTOR;
|
||||
/* Prepare an additional rle_data to quickly switch. */
|
||||
switch_rle_data = g_malloc (rle_data_max_size * XCF_TILE_SAVE_BATCH_SIZE);
|
||||
|
||||
/* The free function passed to the queue and thread pool will likely never
|
||||
* be used. It would mean the thread pool is unfinidhed or the result
|
||||
* queue still has data which would mean we had to interrupt the save,
|
||||
* i.e. there is a bug in our code.
|
||||
*/
|
||||
queue = g_async_queue_new_full ((GDestroyNotify) xcf_save_free_job_data);
|
||||
pool = g_thread_pool_new_full ((GFunc) xcf_save_tile_rle_parallel,
|
||||
queue,
|
||||
(GDestroyNotify) xcf_save_free_job_data,
|
||||
num_processors, TRUE, NULL);
|
||||
|
||||
i = 0;
|
||||
/* We push more tasks than there are threads, ensuring threads always have
|
||||
* something to do!
|
||||
*/
|
||||
for (j = 0; j < num_tasks && i < ntiles; j++)
|
||||
{
|
||||
job_data = g_malloc (sizeof (XcfJobData ));
|
||||
job_data->buffer = buffer;
|
||||
job_data->file_version = info->file_version;
|
||||
job_data->max_rle_size = rle_data_max_size;
|
||||
job_data->tile_data = g_malloc (tile_size);
|
||||
job_data->rle_data = g_malloc (rle_data_max_size * XCF_TILE_SAVE_BATCH_SIZE);
|
||||
|
||||
job_data->tile = i;
|
||||
job_data->batch_size = MIN (XCF_TILE_SAVE_BATCH_SIZE, ntiles - i);
|
||||
i += job_data->batch_size;
|
||||
|
||||
g_thread_pool_push (pool, job_data, NULL);
|
||||
}
|
||||
|
||||
/* Continue pushing tasks and writing tasks as long as we have tiles to
|
||||
* process.
|
||||
*/
|
||||
while (i < ntiles)
|
||||
{
|
||||
for (j = 0; j < num_processors && i < ntiles; j++)
|
||||
while ((job_data = g_async_queue_pop (queue)))
|
||||
{
|
||||
for (k = 0; k < XCF_TILE_SAVE_BATCH_SIZE && i < ntiles; k++)
|
||||
if (next_tile == job_data->tile)
|
||||
{
|
||||
gimp_gegl_buffer_get_tile_rect (buffer,
|
||||
XCF_TILE_WIDTH,
|
||||
XCF_TILE_HEIGHT,
|
||||
i++,
|
||||
&rledata[j].tile_rect[k]);
|
||||
/* only single thread can create tile data when cache miss */
|
||||
gegl_buffer_get (buffer, &rledata[j].tile_rect[k],
|
||||
1.0, format, rledata[j].rletilebuf[k],
|
||||
GEGL_AUTO_ROWSTRIDE, GEGL_ABYSS_NONE);
|
||||
rledata[j].len[k] = 0;
|
||||
guchar *tmp_rle_data;
|
||||
gint batch_size;
|
||||
|
||||
tmp_rle_data = job_data->rle_data;
|
||||
job_data->rle_data = switch_rle_data;
|
||||
switch_rle_data = tmp_rle_data;
|
||||
|
||||
batch_size = job_data->batch_size;
|
||||
|
||||
for (k = 0; k < batch_size; k++)
|
||||
rle_data_len[k] = job_data->rle_data_len[k];
|
||||
|
||||
/* First immediately push a new task for the thread pool,
|
||||
* ensuring it always has work to do.
|
||||
*/
|
||||
job_data->tile = i;
|
||||
job_data->batch_size = MIN (XCF_TILE_SAVE_BATCH_SIZE, ntiles - i);
|
||||
i += job_data->batch_size;
|
||||
|
||||
g_thread_pool_push (pool, job_data, NULL);
|
||||
|
||||
/* Now write the data. */
|
||||
for (k = 0; k < batch_size; k++)
|
||||
{
|
||||
*next_offset++ = offset;
|
||||
xcf_write_int8_check_error (info,
|
||||
switch_rle_data + rle_data_max_size * k,
|
||||
rle_data_len[k]);
|
||||
if (info->cp < offset || info->cp - offset > max_data_length)
|
||||
{
|
||||
g_message ("xcf: invalid tile data length: %" G_GOFFSET_FORMAT,
|
||||
info->cp - offset);
|
||||
g_thread_pool_free (pool, TRUE, TRUE);
|
||||
g_async_queue_unref (queue);
|
||||
g_free (offset_table);
|
||||
return FALSE;
|
||||
}
|
||||
offset = info->cp;
|
||||
}
|
||||
next_tile += batch_size;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
g_async_queue_push_sorted (queue, job_data,
|
||||
(GCompareDataFunc) xcf_save_sort_job_data,
|
||||
NULL);
|
||||
}
|
||||
rledata[j].batch_size = k;
|
||||
}
|
||||
n_threads = j;
|
||||
gegl_parallel_distribute (n_threads, xcf_save_tile_rle_parallel, rledata);
|
||||
for (j = 0; j < n_threads; j++)
|
||||
}
|
||||
g_free (switch_rle_data);
|
||||
|
||||
/* Finally wait for all remaining tasks to write. */
|
||||
while ((job_data = g_async_queue_pop (queue)))
|
||||
{
|
||||
if (next_tile == job_data->tile)
|
||||
{
|
||||
for (k = 0; k < rledata[j].batch_size; k++)
|
||||
gboolean done = FALSE;
|
||||
|
||||
for (k = 0; k < job_data->batch_size; k++)
|
||||
{
|
||||
*next_offset++ = offset;
|
||||
xcf_write_int8_check_error (info,
|
||||
rledata[j].rlebuf[k],
|
||||
rledata[j].len[k]);
|
||||
job_data->rle_data + rle_data_max_size * k,
|
||||
job_data->rle_data_len[k]);
|
||||
if (info->cp < offset || info->cp - offset > max_data_length)
|
||||
{
|
||||
g_message ("xcf: invalid tile data length: %" G_GOFFSET_FORMAT,
|
||||
info->cp - offset);
|
||||
g_thread_pool_free (pool, TRUE, TRUE);
|
||||
g_async_queue_unref (queue);
|
||||
g_free (offset_table);
|
||||
return FALSE;
|
||||
}
|
||||
offset = info->cp;
|
||||
}
|
||||
next_tile += job_data->batch_size;
|
||||
|
||||
if (job_data->tile + job_data->batch_size >= ntiles)
|
||||
done = TRUE;
|
||||
|
||||
xcf_save_free_job_data (job_data);
|
||||
|
||||
if (done)
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
g_async_queue_push_sorted (queue, job_data,
|
||||
(GCompareDataFunc) xcf_save_sort_job_data,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
|
||||
g_free (rletilebuf);
|
||||
g_free (rlebuf);
|
||||
g_free (rledata);
|
||||
g_thread_pool_free (pool, FALSE, TRUE);
|
||||
g_async_queue_unref (queue);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -2068,20 +2149,55 @@ xcf_save_tile (XcfInfo *info,
|
||||
}
|
||||
|
||||
static void
|
||||
xcf_save_tile_rle_parallel (gint thread,
|
||||
gint n_thread,
|
||||
gpointer user_data)
|
||||
xcf_save_free_job_data (XcfJobData *data)
|
||||
{
|
||||
GimpXCFSaveTileRLEData *rledata;
|
||||
g_free (data->rle_data);
|
||||
g_free (data->tile_data);
|
||||
g_free (data);
|
||||
}
|
||||
|
||||
rledata = ((GimpXCFSaveTileRLEData*) user_data) + thread;
|
||||
for (gint i = 0; i < rledata->batch_size; ++i)
|
||||
xcf_save_tile_rle (rledata->tile_rect + i,
|
||||
rledata->rletilebuf[i],
|
||||
rledata->format,
|
||||
rledata->rlebuf[i],
|
||||
rledata->len + i,
|
||||
rledata->file_version);
|
||||
static gint
|
||||
xcf_save_sort_job_data (XcfJobData *data1,
|
||||
XcfJobData *data2,
|
||||
gpointer user_data)
|
||||
{
|
||||
if (data1->tile < data2->tile)
|
||||
return -1;
|
||||
else if (data1->tile > data2->tile)
|
||||
return 1;
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void
|
||||
xcf_save_tile_rle_parallel (XcfJobData *job_data,
|
||||
GAsyncQueue *queue)
|
||||
{
|
||||
GeglRectangle tile_rect;
|
||||
|
||||
for (gint i = 0; i < job_data->batch_size; ++i)
|
||||
{
|
||||
gimp_gegl_buffer_get_tile_rect (job_data->buffer,
|
||||
XCF_TILE_WIDTH,
|
||||
XCF_TILE_HEIGHT,
|
||||
job_data->tile + i,
|
||||
&tile_rect);
|
||||
/* only single thread can create tile data when cache miss */
|
||||
gegl_buffer_get (job_data->buffer, &tile_rect,
|
||||
1.0, gegl_buffer_get_format (job_data->buffer),
|
||||
job_data->tile_data,
|
||||
GEGL_AUTO_ROWSTRIDE, GEGL_ABYSS_NONE);
|
||||
xcf_save_tile_rle (&tile_rect,
|
||||
job_data->tile_data,
|
||||
gegl_buffer_get_format (job_data->buffer),
|
||||
job_data->rle_data + job_data->max_rle_size * i,
|
||||
job_data->rle_data_len + i,
|
||||
job_data->file_version);
|
||||
}
|
||||
|
||||
g_async_queue_push_sorted (queue, job_data,
|
||||
(GCompareDataFunc) xcf_save_sort_job_data,
|
||||
NULL);
|
||||
}
|
||||
|
||||
static void
|
||||
|
Reference in New Issue
Block a user