Worker Threads dynamic management - Software Design Specification


Introduction

The current worker threads (that process the ldap operations) implementation has several flaws:

To fix these flaws, the new implementation objectives are:

This proposal achieves that by:

Current implementation synopsys


There is a global mutex and a global condition variable and a global queue

New implementation


Stacked data

In current implementation there are two stacks to handle:

I instead proposed to:

Overload warning (flow control)

A warning is logged if

Producters/consumers algorithm

There is still a global mutex and also three lists:

When pushing new job the listener threads:

The worker threads while waiting on new work:

Dynamic change of the number of threads

The number of threads may be changed dynamically by changing the nsslapd-threadnumber attribute in cn=config (when calling config_set_threadnumber). Restart is no more needed.

When changing the number threads:

Monitoring

The following data should be displayed:

Data Attribute name Description
waiting_thread size waitingthreads Number of worker threads that are waiting for jobs
busy_thread size busythreads Number of worker threads that are processing on  jobs
busy_thread high water mark maxbusythreads Highest number of worker threads that are processing on  jobs
waiting_job size waitingjobs Size of the waiting job queue
waiting_job high water mark maxwaitingjobs Highest size of the waiting job queue

Test

An automatic test can be done with the following steps:

And some manual tests that should be done at least once:

Considered Alternatives


Detailed Design

Functions

List handling (inline functions):

Prototype Description
void ll_init(llist_t *elmt, void *data) initialize a list element
void ll_link_before(llist_t *list, llist_t *elmt) insert an element before current one
void ll_link_after(llist_t *list, llist_t *elmt) insert an element after current one
void ll_unlink(llist_t *elmt) remove an element from the list

connection_wait_for_new_work

connection_wait_for_new_work(Slapi_PBlock *pb, pc_tinfo_t *tinfo)

This function handles part of the “consumer” side of the algorithm it read the job from the tinfo “conn” field or from the waiting_job queue or wait until condition variable is waken up.

add_work_q

static void add_work_q(work_q_item *wqitem, struct Slapi_op_stack *op_stack_obj)

This function handles the “producer” side of the algorithm

init_op_threads

static void init_op_threads()

Initialize the producers/consumers algorithm

op_thread_cleanup

void op_thread_cleanup()

Starts the producers/consumers shutdown procedure

connection_post_shutdown_cleanup

void connection_post_shutdown_cleanup()

Free the producers/consumers algorithm resources

op_thread_set_threads_number

static void op_thread_set_threads_number(int threadsnumber)

Change the threads number

op_thread_get_stat

static void op_thread_get_stat(op_thread_stats_t *st)

Gather the thread statistics

Data

Lists:

All the lists are doubly linked with a guard element as list head. allowing for fast item addition in head or tail of the queue and fast removal of an item Two statistics are associated with the list head:

List element: ll_list_t
Field Usage
next next item in the list
prev previous item in the list
data the data associated with the item (thread context for list handling threads and the operation for list handling jobs)
List head: ll_head_t

There is one ImportCtx struct per ImportJob and it contains the global data needed to perform the import over mdb

Field Usage
h The guarding element on which are linked the list elements
size The list size
hwm The list high water mark

Producers/Consumers data: pc_t

This is a static variable “pc” in connection.c

Field Usage
mutex The global mutex that protect the other fields
waiting_threads The waiting threads list contains the threads that are ready to handle operations (to be able to push a job to the thread)
Queue is handled in LIFO mode (for affinity)
busy_threads The busy threads list contains the threads that are busy (to be able to flag the thread as closing when reducing the number of threads)
waiting_jobs The waiting_jobs list contains the jobs (i.e connection on which activity have been detected.) that got queued when no worker thread is available
Queue is handled in FIFO mode (to keep as much as possible the incomming order)
jobs_free_list Free list for waiting_jobs queue elements (to avoid having to alloc the list items)
Queue is handled in LIFO mode (for affinity)
shutdown tells that instance shutdown is in progress

Thread context data: tinfo_t

This is the per thread context provided as argument when the thread is created

Field Usage
q the queue element for that thread (data is the thread context)
mutex The per thread mutex (protects cv,conn,closing)
cv The per thread cv
conn The job to process
closing The flag telling to stop the thread
idx The thread index (used for smnp and thread removal)
tid The thread id (to join the thread for thread removal

Statistics data: op_thread_stats_t

Field Usage
waitingthreads Number of worker threads that are waiting for jobs
busythreads Number of worker threads that are processing on  jobs
maxbusythreads Highest number of worker threads that are processing on  jobs
waitingjobs Size of the waiting job queue
maxwaitingjobs Highest size of the waiting job queue
Last modified on 2 April 2024