app: reorganized multi-thread saving code.
Contributed code was not wrong, far from it! But the whole memory management was a bit on the hard-to-read side. For maintainability, I prefer simpler code, which performs a few more allocations, but the amount is reasonable and there are far fewer indexes to juggle. Also, instead of calling gegl_parallel_distribute() and then waiting for each set of batches to finish entirely, I'm using a GThreadPool which I progressively fill with new batch data. And finally, I'm running gegl_buffer_get() inside the threads (I know that we only have reads on the buffer at this point; also, GEGL has an internal lock mechanism anyway). These changes do not actually provide a huge additional speedup (compared to the initial speedup from just going multi-threaded), most likely because I/O is now the bottleneck anyway (which is quite a good thing!).
This commit is contained in:
@ -76,6 +76,26 @@
|
|||||||
#include "gimp-intl.h"
|
#include "gimp-intl.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* Per thread data for xcf_save_tile_rle */
|
||||||
|
typedef struct
|
||||||
|
{
|
||||||
|
/* Common to all jobs. */
|
||||||
|
GeglBuffer *buffer;
|
||||||
|
gint file_version;
|
||||||
|
gint max_rle_size;
|
||||||
|
|
||||||
|
/* Job specific. */
|
||||||
|
gint tile;
|
||||||
|
gint batch_size;
|
||||||
|
|
||||||
|
/* Temp data to avoid too many allocations. */
|
||||||
|
guchar *tile_data;
|
||||||
|
|
||||||
|
/* Return data. */
|
||||||
|
guchar *rle_data;
|
||||||
|
gint rle_data_len[XCF_TILE_SAVE_BATCH_SIZE];
|
||||||
|
} XcfJobData;
|
||||||
|
|
||||||
static gboolean xcf_save_image_props (XcfInfo *info,
|
static gboolean xcf_save_image_props (XcfInfo *info,
|
||||||
GimpImage *image,
|
GimpImage *image,
|
||||||
GError **error);
|
GError **error);
|
||||||
@ -121,15 +141,18 @@ static gboolean xcf_save_tile (XcfInfo *info,
|
|||||||
GeglRectangle *tile_rect,
|
GeglRectangle *tile_rect,
|
||||||
const Babl *format,
|
const Babl *format,
|
||||||
GError **error);
|
GError **error);
|
||||||
static void xcf_save_tile_rle_parallel (gint thread,
|
static void xcf_save_free_job_data (XcfJobData *data);
|
||||||
gint n_thread,
|
static gint xcf_save_sort_job_data (XcfJobData *data1,
|
||||||
|
XcfJobData *data2,
|
||||||
gpointer user_data);
|
gpointer user_data);
|
||||||
static void xcf_save_tile_rle (GeglRectangle *tile_rect,
|
static void xcf_save_tile_rle_parallel (XcfJobData *job_data,
|
||||||
guchar *tile_data,
|
GAsyncQueue *queue);
|
||||||
const Babl *format,
|
static void xcf_save_tile_rle (GeglRectangle *tile_rect,
|
||||||
guchar *rlebuf,
|
guchar *tile_data,
|
||||||
gint *lenptr,
|
const Babl *format,
|
||||||
gint file_version);
|
guchar *rlebuf,
|
||||||
|
gint *lenptr,
|
||||||
|
gint file_version);
|
||||||
static gboolean xcf_save_tile_zlib (XcfInfo *info,
|
static gboolean xcf_save_tile_zlib (XcfInfo *info,
|
||||||
GeglBuffer *buffer,
|
GeglBuffer *buffer,
|
||||||
GeglRectangle *tile_rect,
|
GeglRectangle *tile_rect,
|
||||||
@ -148,21 +171,6 @@ static gboolean xcf_save_old_vectors (XcfInfo *info,
|
|||||||
GimpImage *image,
|
GimpImage *image,
|
||||||
GError **error);
|
GError **error);
|
||||||
|
|
||||||
/* Per thread data for xcf_save_tile_rle */
|
|
||||||
struct _GimpXCFSaveTileRLEData
|
|
||||||
{
|
|
||||||
const Babl *format;
|
|
||||||
gint file_version;
|
|
||||||
gint batch_size;
|
|
||||||
GeglRectangle tile_rect[XCF_TILE_SAVE_BATCH_SIZE];
|
|
||||||
/* tile data from gegl_buffer_get */
|
|
||||||
guchar *rletilebuf[XCF_TILE_SAVE_BATCH_SIZE];
|
|
||||||
/* rle compressed data */
|
|
||||||
guchar *rlebuf[XCF_TILE_SAVE_BATCH_SIZE];
|
|
||||||
/* rle compressed length */
|
|
||||||
gint len[XCF_TILE_SAVE_BATCH_SIZE];
|
|
||||||
};
|
|
||||||
typedef struct _GimpXCFSaveTileRLEData GimpXCFSaveTileRLEData;
|
|
||||||
|
|
||||||
/* private convenience macros */
|
/* private convenience macros */
|
||||||
#define xcf_write_int32_check_error(info, data, count) G_STMT_START { \
|
#define xcf_write_int32_check_error(info, data, count) G_STMT_START { \
|
||||||
@ -1846,11 +1854,7 @@ xcf_save_level (XcfInfo *info,
|
|||||||
gint n_tile_cols;
|
gint n_tile_cols;
|
||||||
guint ntiles;
|
guint ntiles;
|
||||||
gint num_processors;
|
gint num_processors;
|
||||||
gint n_threads;
|
|
||||||
gint i, j, k;
|
gint i, j, k;
|
||||||
guchar *rletilebuf = NULL;
|
|
||||||
guchar *rlebuf = NULL;
|
|
||||||
GimpXCFSaveTileRLEData *rledata = NULL;
|
|
||||||
GError *tmp_error = NULL;
|
GError *tmp_error = NULL;
|
||||||
|
|
||||||
num_processors = GIMP_GEGL_CONFIG (image->gimp->config)->num_processors;
|
num_processors = GIMP_GEGL_CONFIG (image->gimp->config)->num_processors;
|
||||||
@ -1873,35 +1877,6 @@ xcf_save_level (XcfInfo *info,
|
|||||||
max_data_length = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp *
|
max_data_length = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp *
|
||||||
XCF_TILE_MAX_DATA_LENGTH_FACTOR /* = 1.5, currently */;
|
XCF_TILE_MAX_DATA_LENGTH_FACTOR /* = 1.5, currently */;
|
||||||
|
|
||||||
/* allocate a temporary buffer to store the rle data before it is
|
|
||||||
* written to disk
|
|
||||||
*/
|
|
||||||
if (info->compression == COMPRESS_RLE)
|
|
||||||
{
|
|
||||||
gint tile_size = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp;
|
|
||||||
gint n_buf = num_processors * XCF_TILE_SAVE_BATCH_SIZE;
|
|
||||||
|
|
||||||
/* use g_malloc to avoid stack overflow */
|
|
||||||
rletilebuf = (guchar *) g_malloc (tile_size * n_buf);
|
|
||||||
rlebuf = (guchar *) g_malloc (max_data_length * n_buf);
|
|
||||||
rledata = (GimpXCFSaveTileRLEData*) g_malloc (sizeof (GimpXCFSaveTileRLEData) * num_processors);
|
|
||||||
|
|
||||||
for (j = 0; j < num_processors; j++)
|
|
||||||
{
|
|
||||||
rledata[j].format = format;
|
|
||||||
rledata[j].file_version = info->file_version;
|
|
||||||
for (k = 0; k < XCF_TILE_SAVE_BATCH_SIZE; k++)
|
|
||||||
{
|
|
||||||
rledata[j].rletilebuf[k] = rletilebuf +
|
|
||||||
tile_size * XCF_TILE_SAVE_BATCH_SIZE * j +
|
|
||||||
tile_size * k;
|
|
||||||
rledata[j].rlebuf[k] = rlebuf +
|
|
||||||
max_data_length * XCF_TILE_SAVE_BATCH_SIZE * j +
|
|
||||||
max_data_length * k;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
n_tile_rows = gimp_gegl_buffer_get_n_tile_rows (buffer, XCF_TILE_HEIGHT);
|
n_tile_rows = gimp_gegl_buffer_get_n_tile_rows (buffer, XCF_TILE_HEIGHT);
|
||||||
n_tile_cols = gimp_gegl_buffer_get_n_tile_cols (buffer, XCF_TILE_WIDTH);
|
n_tile_cols = gimp_gegl_buffer_get_n_tile_cols (buffer, XCF_TILE_WIDTH);
|
||||||
|
|
||||||
@ -1928,51 +1903,157 @@ xcf_save_level (XcfInfo *info,
|
|||||||
if (info->compression == COMPRESS_RLE)
|
if (info->compression == COMPRESS_RLE)
|
||||||
{
|
{
|
||||||
/* parallel implementation */
|
/* parallel implementation */
|
||||||
|
XcfJobData *job_data;
|
||||||
|
guchar *switch_rle_data;
|
||||||
|
gint rle_data_len[XCF_TILE_SAVE_BATCH_SIZE];
|
||||||
|
|
||||||
|
GThreadPool *pool;
|
||||||
|
GAsyncQueue *queue;
|
||||||
|
gint num_tasks = num_processors * 2;
|
||||||
|
gint tile_size = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp;
|
||||||
|
gint rle_data_max_size;
|
||||||
|
gint next_tile = 0;
|
||||||
|
|
||||||
|
rle_data_max_size = tile_size * XCF_TILE_MAX_DATA_LENGTH_FACTOR;
|
||||||
|
/* Prepare an additional rle_data to quickly switch. */
|
||||||
|
switch_rle_data = g_malloc (rle_data_max_size * XCF_TILE_SAVE_BATCH_SIZE);
|
||||||
|
|
||||||
|
/* The free function passed to the queue and thread pool will likely never
|
||||||
|
* be used. It would mean the thread pool is unfinished or the result
|
||||||
|
* queue still has data which would mean we had to interrupt the save,
|
||||||
|
* i.e. there is a bug in our code.
|
||||||
|
*/
|
||||||
|
queue = g_async_queue_new_full ((GDestroyNotify) xcf_save_free_job_data);
|
||||||
|
pool = g_thread_pool_new_full ((GFunc) xcf_save_tile_rle_parallel,
|
||||||
|
queue,
|
||||||
|
(GDestroyNotify) xcf_save_free_job_data,
|
||||||
|
num_processors, TRUE, NULL);
|
||||||
|
|
||||||
i = 0;
|
i = 0;
|
||||||
|
/* We push more tasks than there are threads, ensuring threads always have
|
||||||
|
* something to do!
|
||||||
|
*/
|
||||||
|
for (j = 0; j < num_tasks && i < ntiles; j++)
|
||||||
|
{
|
||||||
|
job_data = g_malloc (sizeof (XcfJobData ));
|
||||||
|
job_data->buffer = buffer;
|
||||||
|
job_data->file_version = info->file_version;
|
||||||
|
job_data->max_rle_size = rle_data_max_size;
|
||||||
|
job_data->tile_data = g_malloc (tile_size);
|
||||||
|
job_data->rle_data = g_malloc (rle_data_max_size * XCF_TILE_SAVE_BATCH_SIZE);
|
||||||
|
|
||||||
|
job_data->tile = i;
|
||||||
|
job_data->batch_size = MIN (XCF_TILE_SAVE_BATCH_SIZE, ntiles - i);
|
||||||
|
i += job_data->batch_size;
|
||||||
|
|
||||||
|
g_thread_pool_push (pool, job_data, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Continue pushing tasks and writing tasks as long as we have tiles to
|
||||||
|
* process.
|
||||||
|
*/
|
||||||
while (i < ntiles)
|
while (i < ntiles)
|
||||||
{
|
{
|
||||||
for (j = 0; j < num_processors && i < ntiles; j++)
|
while ((job_data = g_async_queue_pop (queue)))
|
||||||
{
|
{
|
||||||
for (k = 0; k < XCF_TILE_SAVE_BATCH_SIZE && i < ntiles; k++)
|
if (next_tile == job_data->tile)
|
||||||
{
|
{
|
||||||
gimp_gegl_buffer_get_tile_rect (buffer,
|
guchar *tmp_rle_data;
|
||||||
XCF_TILE_WIDTH,
|
gint batch_size;
|
||||||
XCF_TILE_HEIGHT,
|
|
||||||
i++,
|
tmp_rle_data = job_data->rle_data;
|
||||||
&rledata[j].tile_rect[k]);
|
job_data->rle_data = switch_rle_data;
|
||||||
/* only single thread can create tile data when cache miss */
|
switch_rle_data = tmp_rle_data;
|
||||||
gegl_buffer_get (buffer, &rledata[j].tile_rect[k],
|
|
||||||
1.0, format, rledata[j].rletilebuf[k],
|
batch_size = job_data->batch_size;
|
||||||
GEGL_AUTO_ROWSTRIDE, GEGL_ABYSS_NONE);
|
|
||||||
rledata[j].len[k] = 0;
|
for (k = 0; k < batch_size; k++)
|
||||||
|
rle_data_len[k] = job_data->rle_data_len[k];
|
||||||
|
|
||||||
|
/* First immediately push a new task for the thread pool,
|
||||||
|
* ensuring it always has work to do.
|
||||||
|
*/
|
||||||
|
job_data->tile = i;
|
||||||
|
job_data->batch_size = MIN (XCF_TILE_SAVE_BATCH_SIZE, ntiles - i);
|
||||||
|
i += job_data->batch_size;
|
||||||
|
|
||||||
|
g_thread_pool_push (pool, job_data, NULL);
|
||||||
|
|
||||||
|
/* Now write the data. */
|
||||||
|
for (k = 0; k < batch_size; k++)
|
||||||
|
{
|
||||||
|
*next_offset++ = offset;
|
||||||
|
xcf_write_int8_check_error (info,
|
||||||
|
switch_rle_data + rle_data_max_size * k,
|
||||||
|
rle_data_len[k]);
|
||||||
|
if (info->cp < offset || info->cp - offset > max_data_length)
|
||||||
|
{
|
||||||
|
g_message ("xcf: invalid tile data length: %" G_GOFFSET_FORMAT,
|
||||||
|
info->cp - offset);
|
||||||
|
g_thread_pool_free (pool, TRUE, TRUE);
|
||||||
|
g_async_queue_unref (queue);
|
||||||
|
g_free (offset_table);
|
||||||
|
return FALSE;
|
||||||
|
}
|
||||||
|
offset = info->cp;
|
||||||
|
}
|
||||||
|
next_tile += batch_size;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
g_async_queue_push_sorted (queue, job_data,
|
||||||
|
(GCompareDataFunc) xcf_save_sort_job_data,
|
||||||
|
NULL);
|
||||||
}
|
}
|
||||||
rledata[j].batch_size = k;
|
|
||||||
}
|
}
|
||||||
n_threads = j;
|
}
|
||||||
gegl_parallel_distribute (n_threads, xcf_save_tile_rle_parallel, rledata);
|
g_free (switch_rle_data);
|
||||||
for (j = 0; j < n_threads; j++)
|
|
||||||
|
/* Finally wait for all remaining tasks to write. */
|
||||||
|
while ((job_data = g_async_queue_pop (queue)))
|
||||||
|
{
|
||||||
|
if (next_tile == job_data->tile)
|
||||||
{
|
{
|
||||||
for (k = 0; k < rledata[j].batch_size; k++)
|
gboolean done = FALSE;
|
||||||
|
|
||||||
|
for (k = 0; k < job_data->batch_size; k++)
|
||||||
{
|
{
|
||||||
*next_offset++ = offset;
|
*next_offset++ = offset;
|
||||||
xcf_write_int8_check_error (info,
|
xcf_write_int8_check_error (info,
|
||||||
rledata[j].rlebuf[k],
|
job_data->rle_data + rle_data_max_size * k,
|
||||||
rledata[j].len[k]);
|
job_data->rle_data_len[k]);
|
||||||
if (info->cp < offset || info->cp - offset > max_data_length)
|
if (info->cp < offset || info->cp - offset > max_data_length)
|
||||||
{
|
{
|
||||||
g_message ("xcf: invalid tile data length: %" G_GOFFSET_FORMAT,
|
g_message ("xcf: invalid tile data length: %" G_GOFFSET_FORMAT,
|
||||||
info->cp - offset);
|
info->cp - offset);
|
||||||
|
g_thread_pool_free (pool, TRUE, TRUE);
|
||||||
|
g_async_queue_unref (queue);
|
||||||
g_free (offset_table);
|
g_free (offset_table);
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
offset = info->cp;
|
offset = info->cp;
|
||||||
}
|
}
|
||||||
|
next_tile += job_data->batch_size;
|
||||||
|
|
||||||
|
if (job_data->tile + job_data->batch_size >= ntiles)
|
||||||
|
done = TRUE;
|
||||||
|
|
||||||
|
xcf_save_free_job_data (job_data);
|
||||||
|
|
||||||
|
if (done)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
g_async_queue_push_sorted (queue, job_data,
|
||||||
|
(GCompareDataFunc) xcf_save_sort_job_data,
|
||||||
|
NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
g_free (rletilebuf);
|
g_thread_pool_free (pool, FALSE, TRUE);
|
||||||
g_free (rlebuf);
|
g_async_queue_unref (queue);
|
||||||
g_free (rledata);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -2068,20 +2149,55 @@ xcf_save_tile (XcfInfo *info,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
xcf_save_tile_rle_parallel (gint thread,
|
xcf_save_free_job_data (XcfJobData *data)
|
||||||
gint n_thread,
|
|
||||||
gpointer user_data)
|
|
||||||
{
|
{
|
||||||
GimpXCFSaveTileRLEData *rledata;
|
g_free (data->rle_data);
|
||||||
|
g_free (data->tile_data);
|
||||||
|
g_free (data);
|
||||||
|
}
|
||||||
|
|
||||||
rledata = ((GimpXCFSaveTileRLEData*) user_data) + thread;
|
static gint
|
||||||
for (gint i = 0; i < rledata->batch_size; ++i)
|
xcf_save_sort_job_data (XcfJobData *data1,
|
||||||
xcf_save_tile_rle (rledata->tile_rect + i,
|
XcfJobData *data2,
|
||||||
rledata->rletilebuf[i],
|
gpointer user_data)
|
||||||
rledata->format,
|
{
|
||||||
rledata->rlebuf[i],
|
if (data1->tile < data2->tile)
|
||||||
rledata->len + i,
|
return -1;
|
||||||
rledata->file_version);
|
else if (data1->tile > data2->tile)
|
||||||
|
return 1;
|
||||||
|
else
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void
|
||||||
|
xcf_save_tile_rle_parallel (XcfJobData *job_data,
|
||||||
|
GAsyncQueue *queue)
|
||||||
|
{
|
||||||
|
GeglRectangle tile_rect;
|
||||||
|
|
||||||
|
for (gint i = 0; i < job_data->batch_size; ++i)
|
||||||
|
{
|
||||||
|
gimp_gegl_buffer_get_tile_rect (job_data->buffer,
|
||||||
|
XCF_TILE_WIDTH,
|
||||||
|
XCF_TILE_HEIGHT,
|
||||||
|
job_data->tile + i,
|
||||||
|
&tile_rect);
|
||||||
|
/* only single thread can create tile data when cache miss */
|
||||||
|
gegl_buffer_get (job_data->buffer, &tile_rect,
|
||||||
|
1.0, gegl_buffer_get_format (job_data->buffer),
|
||||||
|
job_data->tile_data,
|
||||||
|
GEGL_AUTO_ROWSTRIDE, GEGL_ABYSS_NONE);
|
||||||
|
xcf_save_tile_rle (&tile_rect,
|
||||||
|
job_data->tile_data,
|
||||||
|
gegl_buffer_get_format (job_data->buffer),
|
||||||
|
job_data->rle_data + job_data->max_rle_size * i,
|
||||||
|
job_data->rle_data_len + i,
|
||||||
|
job_data->file_version);
|
||||||
|
}
|
||||||
|
|
||||||
|
g_async_queue_push_sorted (queue, job_data,
|
||||||
|
(GCompareDataFunc) xcf_save_sort_job_data,
|
||||||
|
NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
|
Reference in New Issue
Block a user