app: reorganized multi-thread saving code.

Contributed code was not wrong, far from it! But the whole memory management was
a bit on the hard-to-read side. For maintainability, I prefer some simpler code,
which contains a bit more allocation but it's reasonable and at least much less
indexes to play with.

Also instead of gegl_parallel_distribute(), then waiting for each set of batches
to entirely finish, I'm using a GThreadPool which I progressively fill with new
batch data. And finally I'm running the gegl_buffer_get() inside the threads (I
know that we have only reads on the buffer at this point; also GEGL has an
internal lock mechanism anyway).
These actually do not provide a huge additional speedup (compared to the initial
speedup by just going multi-threaded), most likely because the I/O are now the
bottleneck anyway (which is quite a good thing!).
This commit is contained in:
Jehan
2022-10-24 17:04:34 +02:00
parent b35bdd2dc9
commit 52d1743924

View File

@ -76,6 +76,26 @@
#include "gimp-intl.h"
/* Per thread data for xcf_save_tile_rle */
typedef struct
{
/* Common to all jobs. */
GeglBuffer *buffer;
gint file_version;
gint max_rle_size;
/* Job specific. */
gint tile;
gint batch_size;
/* Temp data to avoid too many allocations. */
guchar *tile_data;
/* Return data. */
guchar *rle_data;
gint rle_data_len[XCF_TILE_SAVE_BATCH_SIZE];
} XcfJobData;
static gboolean xcf_save_image_props (XcfInfo *info,
GimpImage *image,
GError **error);
@ -121,15 +141,18 @@ static gboolean xcf_save_tile (XcfInfo *info,
GeglRectangle *tile_rect,
const Babl *format,
GError **error);
static void xcf_save_tile_rle_parallel (gint thread,
gint n_thread,
static void xcf_save_free_job_data (XcfJobData *data);
static gint xcf_save_sort_job_data (XcfJobData *data1,
XcfJobData *data2,
gpointer user_data);
static void xcf_save_tile_rle (GeglRectangle *tile_rect,
guchar *tile_data,
const Babl *format,
guchar *rlebuf,
gint *lenptr,
gint file_version);
static void xcf_save_tile_rle_parallel (XcfJobData *job_data,
GAsyncQueue *queue);
static void xcf_save_tile_rle (GeglRectangle *tile_rect,
guchar *tile_data,
const Babl *format,
guchar *rlebuf,
gint *lenptr,
gint file_version);
static gboolean xcf_save_tile_zlib (XcfInfo *info,
GeglBuffer *buffer,
GeglRectangle *tile_rect,
@ -148,21 +171,6 @@ static gboolean xcf_save_old_vectors (XcfInfo *info,
GimpImage *image,
GError **error);
/* Per thread data for xcf_save_tile_rle */
struct _GimpXCFSaveTileRLEData
{
const Babl *format;
gint file_version;
gint batch_size;
GeglRectangle tile_rect[XCF_TILE_SAVE_BATCH_SIZE];
/* tile data from gegl_buffer_get */
guchar *rletilebuf[XCF_TILE_SAVE_BATCH_SIZE];
/* rle compressed data */
guchar *rlebuf[XCF_TILE_SAVE_BATCH_SIZE];
/* rle compressed length */
gint len[XCF_TILE_SAVE_BATCH_SIZE];
};
typedef struct _GimpXCFSaveTileRLEData GimpXCFSaveTileRLEData;
/* private convenience macros */
#define xcf_write_int32_check_error(info, data, count) G_STMT_START { \
@ -1846,11 +1854,7 @@ xcf_save_level (XcfInfo *info,
gint n_tile_cols;
guint ntiles;
gint num_processors;
gint n_threads;
gint i, j, k;
guchar *rletilebuf = NULL;
guchar *rlebuf = NULL;
GimpXCFSaveTileRLEData *rledata = NULL;
GError *tmp_error = NULL;
num_processors = GIMP_GEGL_CONFIG (image->gimp->config)->num_processors;
@ -1873,35 +1877,6 @@ xcf_save_level (XcfInfo *info,
max_data_length = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp *
XCF_TILE_MAX_DATA_LENGTH_FACTOR /* = 1.5, currently */;
/* allocate a temporary buffer to store the rle data before it is
* written to disk
*/
if (info->compression == COMPRESS_RLE)
{
gint tile_size = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp;
gint n_buf = num_processors * XCF_TILE_SAVE_BATCH_SIZE;
/* use g_malloc to avoid stack overflow */
rletilebuf = (guchar *) g_malloc (tile_size * n_buf);
rlebuf = (guchar *) g_malloc (max_data_length * n_buf);
rledata = (GimpXCFSaveTileRLEData*) g_malloc (sizeof (GimpXCFSaveTileRLEData) * num_processors);
for (j = 0; j < num_processors; j++)
{
rledata[j].format = format;
rledata[j].file_version = info->file_version;
for (k = 0; k < XCF_TILE_SAVE_BATCH_SIZE; k++)
{
rledata[j].rletilebuf[k] = rletilebuf +
tile_size * XCF_TILE_SAVE_BATCH_SIZE * j +
tile_size * k;
rledata[j].rlebuf[k] = rlebuf +
max_data_length * XCF_TILE_SAVE_BATCH_SIZE * j +
max_data_length * k;
}
}
}
n_tile_rows = gimp_gegl_buffer_get_n_tile_rows (buffer, XCF_TILE_HEIGHT);
n_tile_cols = gimp_gegl_buffer_get_n_tile_cols (buffer, XCF_TILE_WIDTH);
@ -1928,51 +1903,157 @@ xcf_save_level (XcfInfo *info,
if (info->compression == COMPRESS_RLE)
{
/* parallel implementation */
XcfJobData *job_data;
guchar *switch_rle_data;
gint rle_data_len[XCF_TILE_SAVE_BATCH_SIZE];
GThreadPool *pool;
GAsyncQueue *queue;
gint num_tasks = num_processors * 2;
gint tile_size = XCF_TILE_WIDTH * XCF_TILE_HEIGHT * bpp;
gint rle_data_max_size;
gint next_tile = 0;
rle_data_max_size = tile_size * XCF_TILE_MAX_DATA_LENGTH_FACTOR;
/* Prepare an additional rle_data to quickly switch. */
switch_rle_data = g_malloc (rle_data_max_size * XCF_TILE_SAVE_BATCH_SIZE);
/* The free function passed to the queue and thread pool will likely never
* be used. It would mean the thread pool is unfinidhed or the result
* queue still has data which would mean we had to interrupt the save,
* i.e. there is a bug in our code.
*/
queue = g_async_queue_new_full ((GDestroyNotify) xcf_save_free_job_data);
pool = g_thread_pool_new_full ((GFunc) xcf_save_tile_rle_parallel,
queue,
(GDestroyNotify) xcf_save_free_job_data,
num_processors, TRUE, NULL);
i = 0;
/* We push more tasks than there are threads, ensuring threads always have
* something to do!
*/
for (j = 0; j < num_tasks && i < ntiles; j++)
{
job_data = g_malloc (sizeof (XcfJobData ));
job_data->buffer = buffer;
job_data->file_version = info->file_version;
job_data->max_rle_size = rle_data_max_size;
job_data->tile_data = g_malloc (tile_size);
job_data->rle_data = g_malloc (rle_data_max_size * XCF_TILE_SAVE_BATCH_SIZE);
job_data->tile = i;
job_data->batch_size = MIN (XCF_TILE_SAVE_BATCH_SIZE, ntiles - i);
i += job_data->batch_size;
g_thread_pool_push (pool, job_data, NULL);
}
/* Continue pushing tasks and writing tasks as long as we have tiles to
* process.
*/
while (i < ntiles)
{
for (j = 0; j < num_processors && i < ntiles; j++)
while ((job_data = g_async_queue_pop (queue)))
{
for (k = 0; k < XCF_TILE_SAVE_BATCH_SIZE && i < ntiles; k++)
if (next_tile == job_data->tile)
{
gimp_gegl_buffer_get_tile_rect (buffer,
XCF_TILE_WIDTH,
XCF_TILE_HEIGHT,
i++,
&rledata[j].tile_rect[k]);
/* only single thread can create tile data when cache miss */
gegl_buffer_get (buffer, &rledata[j].tile_rect[k],
1.0, format, rledata[j].rletilebuf[k],
GEGL_AUTO_ROWSTRIDE, GEGL_ABYSS_NONE);
rledata[j].len[k] = 0;
guchar *tmp_rle_data;
gint batch_size;
tmp_rle_data = job_data->rle_data;
job_data->rle_data = switch_rle_data;
switch_rle_data = tmp_rle_data;
batch_size = job_data->batch_size;
for (k = 0; k < batch_size; k++)
rle_data_len[k] = job_data->rle_data_len[k];
/* First immediately push a new task for the thread pool,
* ensuring it always has work to do.
*/
job_data->tile = i;
job_data->batch_size = MIN (XCF_TILE_SAVE_BATCH_SIZE, ntiles - i);
i += job_data->batch_size;
g_thread_pool_push (pool, job_data, NULL);
/* Now write the data. */
for (k = 0; k < batch_size; k++)
{
*next_offset++ = offset;
xcf_write_int8_check_error (info,
switch_rle_data + rle_data_max_size * k,
rle_data_len[k]);
if (info->cp < offset || info->cp - offset > max_data_length)
{
g_message ("xcf: invalid tile data length: %" G_GOFFSET_FORMAT,
info->cp - offset);
g_thread_pool_free (pool, TRUE, TRUE);
g_async_queue_unref (queue);
g_free (offset_table);
return FALSE;
}
offset = info->cp;
}
next_tile += batch_size;
break;
}
else
{
g_async_queue_push_sorted (queue, job_data,
(GCompareDataFunc) xcf_save_sort_job_data,
NULL);
}
rledata[j].batch_size = k;
}
n_threads = j;
gegl_parallel_distribute (n_threads, xcf_save_tile_rle_parallel, rledata);
for (j = 0; j < n_threads; j++)
}
g_free (switch_rle_data);
/* Finally wait for all remaining tasks to write. */
while ((job_data = g_async_queue_pop (queue)))
{
if (next_tile == job_data->tile)
{
for (k = 0; k < rledata[j].batch_size; k++)
gboolean done = FALSE;
for (k = 0; k < job_data->batch_size; k++)
{
*next_offset++ = offset;
xcf_write_int8_check_error (info,
rledata[j].rlebuf[k],
rledata[j].len[k]);
job_data->rle_data + rle_data_max_size * k,
job_data->rle_data_len[k]);
if (info->cp < offset || info->cp - offset > max_data_length)
{
g_message ("xcf: invalid tile data length: %" G_GOFFSET_FORMAT,
info->cp - offset);
g_thread_pool_free (pool, TRUE, TRUE);
g_async_queue_unref (queue);
g_free (offset_table);
return FALSE;
}
offset = info->cp;
}
next_tile += job_data->batch_size;
if (job_data->tile + job_data->batch_size >= ntiles)
done = TRUE;
xcf_save_free_job_data (job_data);
if (done)
break;
}
else
{
g_async_queue_push_sorted (queue, job_data,
(GCompareDataFunc) xcf_save_sort_job_data,
NULL);
}
}
g_free (rletilebuf);
g_free (rlebuf);
g_free (rledata);
g_thread_pool_free (pool, FALSE, TRUE);
g_async_queue_unref (queue);
}
else
{
@ -2068,20 +2149,55 @@ xcf_save_tile (XcfInfo *info,
}
static void
xcf_save_tile_rle_parallel (gint thread,
gint n_thread,
gpointer user_data)
xcf_save_free_job_data (XcfJobData *data)
{
GimpXCFSaveTileRLEData *rledata;
g_free (data->rle_data);
g_free (data->tile_data);
g_free (data);
}
rledata = ((GimpXCFSaveTileRLEData*) user_data) + thread;
for (gint i = 0; i < rledata->batch_size; ++i)
xcf_save_tile_rle (rledata->tile_rect + i,
rledata->rletilebuf[i],
rledata->format,
rledata->rlebuf[i],
rledata->len + i,
rledata->file_version);
static gint
xcf_save_sort_job_data (XcfJobData *data1,
XcfJobData *data2,
gpointer user_data)
{
if (data1->tile < data2->tile)
return -1;
else if (data1->tile > data2->tile)
return 1;
else
return 0;
}
static void
xcf_save_tile_rle_parallel (XcfJobData *job_data,
GAsyncQueue *queue)
{
GeglRectangle tile_rect;
for (gint i = 0; i < job_data->batch_size; ++i)
{
gimp_gegl_buffer_get_tile_rect (job_data->buffer,
XCF_TILE_WIDTH,
XCF_TILE_HEIGHT,
job_data->tile + i,
&tile_rect);
/* only single thread can create tile data when cache miss */
gegl_buffer_get (job_data->buffer, &tile_rect,
1.0, gegl_buffer_get_format (job_data->buffer),
job_data->tile_data,
GEGL_AUTO_ROWSTRIDE, GEGL_ABYSS_NONE);
xcf_save_tile_rle (&tile_rect,
job_data->tile_data,
gegl_buffer_get_format (job_data->buffer),
job_data->rle_data + job_data->max_rle_size * i,
job_data->rle_data_len + i,
job_data->file_version);
}
g_async_queue_push_sorted (queue, job_data,
(GCompareDataFunc) xcf_save_sort_job_data,
NULL);
}
static void