Worker Threads dynamic management - Software Design Specification
Introduction
The current worker thread implementation (the threads that process LDAP operations) has several flaws:
- the server must be restarted after changing the configured number of threads
- contention at the global mutex and condition variable level
- configuring too few or too many worker threads noticeably impacts performance.
To fix these flaws, the new implementation objectives are:
- Allow changing the number of worker threads without restarting the server (keeping the current configuration attribute and CLI command).
- Fix the condition variable contention issue.
- Reduce the impact of having too many worker threads configured
- Provide some monitoring data.
This proposal achieves that by:
- Creating and stopping worker threads dynamically.
- Not associating a condition variable with the global mutex, but giving each worker thread its own mutex and condition variable.
- Waking up a worker thread only if it has something to do.
- Providing some monitoring data.
Current implementation synopsis
There is a global mutex, a global condition variable, and a global queue:
- listener threads push data into the global queue and wake up the worker threads
- worker threads loop, waiting on the global condition variable for new work, then process it.
- to speed up queueing and limit alloc/free calls, a free list of elements is used
New implementation
Stacked data
In the current implementation there are two stacks, used to avoid freeing/allocating these items every time:
- the work queue elements
- the operation structs
Instead, I propose to:
- keep the operation in the thread context (a thread only works on a single operation at a time anyway)
- keep a free list of elements for the waiting_jobs list. (That list cannot hold more elements than the maximum number of connections, so in the worst case the free list will use 3864K, i.e. around 3 MB. I wonder whether we should reduce this list size and let the listening threads sleep for a few ms if there are no more items; if the sleep delay is smaller than the time needed to flush the waiting job queue, no time is wasted.)
Overload warning (flow control)
A warning is logged if:
- the warning has not been logged for some delay (1 minute), and
- the waiting job queue size gets higher than a threshold (100 * number of worker threads)
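For illustration, a rate-limited check along these lines could be used (the names below, such as check_overload_warning and last_overload_warning, are illustrative and not part of the design; the real code would log through the server's logging facility):

```c
#include <stdio.h>
#include <time.h>

#define WARN_DELAY_SECONDS 60

static time_t last_overload_warning;   /* protected by the global mutex */

static void
check_overload_warning(size_t waiting_jobs_size, int nb_worker_threads)
{
    time_t now = time(NULL);

    /* Warn only when the queue exceeds the threshold and at most once per minute. */
    if (waiting_jobs_size > (size_t)(100 * nb_worker_threads) &&
        now - last_overload_warning >= WARN_DELAY_SECONDS) {
        last_overload_warning = now;
        fprintf(stderr, "Warning: waiting job queue size (%zu) exceeds "
                "100 * number of worker threads (%d)\n",
                waiting_jobs_size, nb_worker_threads);
    }
}
```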
Producers/consumers algorithm
There is still a global mutex and also three lists:
- waiting_threads
- busy_threads
- waiting_jobs
Access to these lists is protected by the global mutex.
When pushing a new job, the listener threads:
- Get the global mutex
- Loop forever:
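A hedged sketch of the producer side, inferred from the data structures described in the Detailed Design section below: it assumes the listener hands the connection directly to an idle worker when one is available and otherwise queues it on waiting_jobs (push_new_job is an illustrative name, Connection stands for the server's connection type, and the size/hwm bookkeeping and flow control are omitted):

```c
#include <pthread.h>
#include <stdlib.h>

static void
push_new_job(Connection *conn)
{
    pthread_mutex_lock(&pc.mutex);
    if (pc.waiting_threads.h.next != &pc.waiting_threads.h) {
        /* An idle worker is available: hand it the job directly.
         * The list is used in LIFO mode, so take the first element. */
        llist_t *e = pc.waiting_threads.h.next;
        tinfo_t *tinfo = e->data;
        ll_unlink(e);
        ll_link_after(&pc.busy_threads.h, e);     /* the worker is now busy */
        pthread_mutex_unlock(&pc.mutex);

        pthread_mutex_lock(&tinfo->mutex);
        tinfo->conn = conn;                       /* the job itself */
        pthread_cond_signal(&tinfo->cv);          /* wake up that worker only */
        pthread_mutex_unlock(&tinfo->mutex);
    } else {
        /* No idle worker: queue the job (FIFO) on waiting_jobs, recycling
         * an element from jobs_free_list when one is available. */
        llist_t *e;
        if (pc.jobs_free_list.h.next != &pc.jobs_free_list.h) {
            e = pc.jobs_free_list.h.next;
            ll_unlink(e);
        } else {
            e = calloc(1, sizeof(*e));    /* see "Considered Alternatives" */
            ll_init(e, NULL);
        }
        e->data = conn;
        ll_link_before(&pc.waiting_jobs.h, e);    /* append at the tail */
        pthread_mutex_unlock(&pc.mutex);
    }
}
```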
The worker threads, while waiting for new work:
- Get the thread mutex
- Loop forever:
- if conn is provided in the thread context
- Fill the pblock with conn, op
- Reset conn in thread context
- Release the thread mutex
- return CONN_FOUND_WORK_TO_DO
- if thread shutdown flag is set:
- Release the thread mutex
- return CONN_SHUTDOWN
- Release the thread mutex
- Get the global mutex
- If the waiting_jobs queue is empty
- Unlink thread from busy threads list
- Link thread at start of waiting threads list
- Release the global mutex
- Get the thread mutex
- Wait on the thread condition variable
- Loop again
- else
- Move the first job from the waiting_jobs list to the jobs_free_list
(after storing its conn in a local variable)
- Release the global mutex
- Fill the pblock with conn, op
- return CONN_FOUND_WORK_TO_DO
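A sketch of connection_wait_for_new_work() following these steps. fill_pblock() is an illustrative helper standing for "fill the pblock with conn, op"; the predicate is re-checked before waiting so that a signal sent between releasing the global mutex and taking the thread mutex is not lost; statistics bookkeeping is omitted:

```c
#include <pthread.h>

static int
connection_wait_for_new_work(Slapi_PBlock *pb, tinfo_t *tinfo)
{
    pthread_mutex_lock(&tinfo->mutex);
    for (;;) {
        if (tinfo->conn != NULL) {
            /* A listener handed us a job directly. */
            fill_pblock(pb, tinfo->conn, tinfo);   /* illustrative: sets conn and op */
            tinfo->conn = NULL;
            pthread_mutex_unlock(&tinfo->mutex);
            return CONN_FOUND_WORK_TO_DO;
        }
        if (tinfo->closing) {
            pthread_mutex_unlock(&tinfo->mutex);
            return CONN_SHUTDOWN;
        }
        pthread_mutex_unlock(&tinfo->mutex);

        pthread_mutex_lock(&pc.mutex);
        if (pc.waiting_jobs.size == 0) {
            /* Nothing queued: move this thread to the waiting list (LIFO). */
            ll_unlink(&tinfo->q);
            ll_link_after(&pc.waiting_threads.h, &tinfo->q);
            pthread_mutex_unlock(&pc.mutex);
            /* Sleep until a listener signals our condition variable;
             * re-check the predicate to avoid a lost wakeup. */
            pthread_mutex_lock(&tinfo->mutex);
            while (tinfo->conn == NULL && !tinfo->closing) {
                pthread_cond_wait(&tinfo->cv, &tinfo->mutex);
            }
            /* Loop again with the thread mutex held. */
        } else {
            /* Take the oldest queued job (FIFO) and recycle its element. */
            llist_t *e = pc.waiting_jobs.h.next;
            Connection *conn = e->data;
            ll_unlink(e);
            ll_link_after(&pc.jobs_free_list.h, e);
            pthread_mutex_unlock(&pc.mutex);
            fill_pblock(pb, conn, tinfo);
            return CONN_FOUND_WORK_TO_DO;
        }
    }
}
```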
Dynamic change of the number of threads
The number of threads can be changed dynamically by changing the
nsslapd-threadnumber attribute in cn=config (which calls config_set_threadnumber).
A restart is no longer needed.
When changing the number of threads:
- if adding threads:
- Get global mutex
- If global shutdown
- Release the global mutex
- Exit out of the function
- Compute the first new thread index:
(size of waiting_threads list + size of busy_threads list) + 1
- For each missing thread:
- Alloc the thread context struct
- Init its mutex and condition variable
- Store the thread index in context then increment it
- Create the thread (as joinable)
- Store the thread id in the context
- Release the global mutex
- else if removing threads:
- create a local empty “closing” list
- Walk waiting_threads list:
- if the thread index is higher than the wanted number of threads
- Move thread from waiting_threads list to closing list
- Walk busy_threads list:
- if the thread index is higher than the wanted number of threads
- Move thread from busy_threads list to closing list
- Release the global mutex
- Walk closing list:
- Get thread mutex
- Set the thread closing flag
- Wake up the thread
- Release thread mutex
- Walk closing list:
- Join the thread
- Unlink thread from closing list
- Free thread context
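A sketch of op_thread_set_threads_number() following these steps. It assumes connection_threadmain() has a pthread-compatible signature taking the tinfo_t pointer, that newly created threads are initially linked on busy_threads, and that the lists are circular with a self-linked guard element; size/hwm bookkeeping is omitted:

```c
#include <pthread.h>
#include <stdlib.h>

static void
op_thread_set_threads_number(int threadnumber)
{
    pthread_mutex_lock(&pc.mutex);
    if (pc.shutdown) {
        pthread_mutex_unlock(&pc.mutex);
        return;
    }
    int current = pc.waiting_threads.size + pc.busy_threads.size;

    if (threadnumber > current) {
        /* Add the missing worker threads. */
        int idx = current + 1;                 /* first new thread index */
        for (int i = current; i < threadnumber; i++) {
            tinfo_t *tinfo = calloc(1, sizeof(*tinfo));
            pthread_mutex_init(&tinfo->mutex, NULL);
            pthread_cond_init(&tinfo->cv, NULL);
            tinfo->idx = idx++;
            ll_init(&tinfo->q, tinfo);
            ll_link_before(&pc.busy_threads.h, &tinfo->q);
            pthread_create(&tinfo->tid, NULL, connection_threadmain, tinfo);
        }
        pthread_mutex_unlock(&pc.mutex);
    } else {
        /* Move the threads whose index is above the new limit to a local
         * "closing" list ('next' is saved because we unlink while iterating). */
        ll_head_t closing;
        ll_init(&closing.h, NULL);
        ll_head_t *sources[2] = {&pc.waiting_threads, &pc.busy_threads};
        for (int s = 0; s < 2; s++) {
            for (llist_t *e = sources[s]->h.next, *next; e != &sources[s]->h; e = next) {
                next = e->next;
                tinfo_t *tinfo = e->data;
                if (tinfo->idx > threadnumber) {
                    ll_unlink(e);
                    ll_link_before(&closing.h, e);
                }
            }
        }
        pthread_mutex_unlock(&pc.mutex);

        /* Ask each selected thread to stop ... */
        for (llist_t *e = closing.h.next; e != &closing.h; e = e->next) {
            tinfo_t *tinfo = e->data;
            pthread_mutex_lock(&tinfo->mutex);
            tinfo->closing = 1;
            pthread_cond_signal(&tinfo->cv);
            pthread_mutex_unlock(&tinfo->mutex);
        }
        /* ... then join them and free their context. */
        while (closing.h.next != &closing.h) {
            llist_t *e = closing.h.next;
            tinfo_t *tinfo = e->data;
            pthread_join(tinfo->tid, NULL);
            ll_unlink(e);
            pthread_cond_destroy(&tinfo->cv);
            pthread_mutex_destroy(&tinfo->mutex);
            free(tinfo);
        }
    }
}
```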
Monitoring
The following data should be displayed:
| Data | Attribute name | Description |
|------|----------------|-------------|
| waiting_threads size | waitingthreads | Number of worker threads that are waiting for jobs |
| busy_threads size | busythreads | Number of worker threads that are processing jobs |
| busy_threads high water mark | maxbusythreads | Highest number of worker threads simultaneously processing jobs |
| waiting_jobs size | waitingjobs | Size of the waiting job queue |
| waiting_jobs high water mark | maxwaitingjobs | Highest size of the waiting job queue |
Test
An automatic test can be done with the following steps:
- create an instance
- get the monitoring results
- check that waitingthreads+busythreads == configured number of threads
- run pstack on the server and check that the number of connection_threadmain threads is the expected one
- increase the number of threads
- check that waitingthreads+busythreads == configured number of threads
- run pstack on the server and check that the number of connection_threadmain threads is the expected one
- decrease the number of threads
- check that waitingthreads+busythreads == configured number of threads
- run pstack on the server and check that the number of connection_threadmain threads is the expected one
And some manual tests that should be done at least once:
- Test with 1000 worker threads and check how long it takes to get a pstack
(both when the server is idle and while running a searchrate/ldclt load)
Considered Alternatives
- Improving the old worker thread algorithm so that the number of threads can be changed dynamically. (Rejected because a prototype showed that, on my laptop, when using trivial jobs that atomically increment a counter, the proposed algorithm was 6 times faster than the old one.)
- Adjusting the number of threads automatically to the load. (Rejected because a hard limit is still needed to limit resource use if an operation blocks the server for some time, and with the new algorithm useless configured threads should not have a noticeable impact, except on the virtual memory footprint.)
- While pushing a job into the waiting_jobs queue:
  - Do not limit the list size. (Rejected because of the risk of exhausting system resources.)
  - Allocate the job list element (while keeping the global lock, or use pre-allocated elements as in the current code). Rationale: this queuing occurs when the incoming job load is higher than what the worker threads can handle, so the priority is to let the worker threads pick new jobs as fast as possible, and we can afford to slow down the listening threads. Furthermore, the monitoring info is there to let the administrator or an external healthcheck tool take the decision.
- Using NSPR thread/mutex/condvar or using pthreads:
pthreads was chosen because there is less overhead, and on Linux NSPR relies on pthreads anyway.
Detailed Design
Functions
List handling (inline functions):

| Prototype | Description |
|-----------|-------------|
| void ll_init(llist_t *elmt, void *data) | Initialize a list element |
| void ll_link_before(llist_t *list, llist_t *elmt) | Insert an element before the current one |
| void ll_link_after(llist_t *list, llist_t *elmt) | Insert an element after the current one |
| void ll_unlink(llist_t *elmt) | Remove an element from the list |
connection_wait_for_new_work
connection_wait_for_new_work(Slapi_PBlock *pb, pc_tinfo_t *tinfo)
This function handles part of the “consumer” side of the algorithm.
It reads the job from the tinfo “conn” field or from the waiting_jobs queue, or waits until the thread's condition variable is signaled.
add_work_q
static void
add_work_q(work_q_item *wqitem, struct Slapi_op_stack *op_stack_obj)
This function handles the “producer” side of the algorithm
init_op_threads
static void init_op_threads()
Initialize the producers/consumers algorithm
op_thread_cleanup
void op_thread_cleanup()
Starts the producers/consumers shutdown procedure
connection_post_shutdown_cleanup
void connection_post_shutdown_cleanup()
Free the producers/consumers algorithm resources
op_thread_set_threads_number
static void op_thread_set_threads_number(int threadsnumber)
Change the number of threads
op_thread_get_stat
static void op_thread_get_stat(op_thread_stats_t *st)
Gather the thread statistics
Data
Lists:
All the lists are doubly linked with a guard element as list head,
allowing fast item insertion at the head or tail of the queue and fast removal of an item.
Two statistics are associated with the list head:
- the list size
- the list high water mark (i.e. the maximum value reached by the list size)
Access to the lists is protected by the global mutex
(with the exception of the “closing” list, which is handled locally, i.e. in a single thread, and is not protected).
List element: llist_t

| Field | Usage |
|-------|-------|
| next | next item in the list |
| prev | previous item in the list |
| data | the data associated with the item (the thread context for lists handling threads, the operation for lists handling jobs) |
List head: ll_head_t
The list head holds the guard element and the list statistics.

| Field | Usage |
|-------|-------|
| h | The guard element on which the list elements are linked |
| size | The list size |
| hwm | The list high water mark |
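A sketch of the list types and inline helpers, assuming a circular doubly linked list whose guard element points to itself when the list is empty; field names follow the tables above and the type name llist_t follows the prototypes:

```c
typedef struct llist {
    struct llist *next;   /* next item in the list */
    struct llist *prev;   /* previous item in the list */
    void *data;           /* thread context or job, depending on the list */
} llist_t;

typedef struct {
    llist_t h;            /* guard element */
    int size;             /* current number of elements */
    int hwm;              /* high water mark of size */
} ll_head_t;

static inline void
ll_init(llist_t *elmt, void *data)
{
    elmt->next = elmt->prev = elmt;   /* element alone / list empty */
    elmt->data = data;
}

static inline void
ll_link_before(llist_t *list, llist_t *elmt)
{
    /* Insert elmt just before 'list' (at the tail when 'list' is the guard). */
    elmt->next = list;
    elmt->prev = list->prev;
    list->prev->next = elmt;
    list->prev = elmt;
}

static inline void
ll_link_after(llist_t *list, llist_t *elmt)
{
    /* Insert elmt just after 'list' (at the head when 'list' is the guard). */
    elmt->prev = list;
    elmt->next = list->next;
    list->next->prev = elmt;
    list->next = elmt;
}

static inline void
ll_unlink(llist_t *elmt)
{
    elmt->prev->next = elmt->next;
    elmt->next->prev = elmt->prev;
    elmt->next = elmt->prev = elmt;   /* keep the element self-linked */
}
```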
Producers/Consumers data: pc_t
This is a static variable “pc” in connection.c.

| Field | Usage |
|-------|-------|
| mutex | The global mutex that protects the other fields |
| waiting_threads | The list of threads that are ready to handle operations (so that a job can be pushed directly to a thread). Handled in LIFO mode (for affinity). |
| busy_threads | The list of threads that are busy (so that a thread can be flagged as closing when reducing the number of threads). |
| waiting_jobs | The list of jobs (i.e. connections on which activity has been detected) that got queued because no worker thread was available. Handled in FIFO mode (to preserve the incoming order as much as possible). |
| jobs_free_list | Free list for waiting_jobs queue elements (to avoid having to allocate the list items). Handled in LIFO mode (for affinity). |
| shutdown | Tells that instance shutdown is in progress |
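A sketch of pc_t using pthread primitives; the field order and exact types are illustrative:

```c
#include <pthread.h>

typedef struct {
    pthread_mutex_t mutex;     /* global mutex protecting the fields below */
    ll_head_t waiting_threads; /* idle workers, LIFO for cache affinity */
    ll_head_t busy_threads;    /* workers currently processing a job */
    ll_head_t waiting_jobs;    /* queued jobs (connections), FIFO */
    ll_head_t jobs_free_list;  /* recycled waiting_jobs elements, LIFO */
    int shutdown;              /* instance shutdown in progress */
} pc_t;

static pc_t pc;                /* the static variable "pc" in connection.c */
```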
Thread context data: tinfo_t
This is the per-thread context provided as an argument when the thread is created.

| Field | Usage |
|-------|-------|
| q | The list element for that thread (data is the thread context) |
| mutex | The per-thread mutex (protects cv, conn, closing) |
| cv | The per-thread condition variable |
| conn | The job to process |
| closing | The flag telling the thread to stop |
| idx | The thread index (used for SNMP and thread removal) |
| tid | The thread id (used to join the thread during thread removal) |
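A sketch of tinfo_t; Connection stands for the server's connection type:

```c
#include <pthread.h>

typedef struct {
    llist_t q;             /* list element, data points back to this context */
    pthread_mutex_t mutex; /* protects cv, conn and closing */
    pthread_cond_t cv;     /* signaled when a job is handed to this thread */
    Connection *conn;      /* the job to process, NULL when idle */
    int closing;           /* tells the thread to stop */
    int idx;               /* thread index (SNMP, thread removal) */
    pthread_t tid;         /* thread id, used to join on removal */
} tinfo_t;
```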
Statistics data: op_thread_stats_t
| Field | Usage |
|-------|-------|
| waitingthreads | Number of worker threads that are waiting for jobs |
| busythreads | Number of worker threads that are processing jobs |
| maxbusythreads | Highest number of worker threads simultaneously processing jobs |
| waitingjobs | Size of the waiting job queue |
| maxwaitingjobs | Highest size of the waiting job queue |
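A sketch of op_thread_stats_t together with how op_thread_get_stat() could fill it under the global mutex (the int32_t type is an assumption):

```c
#include <pthread.h>
#include <stdint.h>

typedef struct {
    int32_t waitingthreads;  /* workers waiting for jobs */
    int32_t busythreads;     /* workers processing jobs */
    int32_t maxbusythreads;  /* busy_threads high water mark */
    int32_t waitingjobs;     /* size of the waiting job queue */
    int32_t maxwaitingjobs;  /* waiting_jobs high water mark */
} op_thread_stats_t;

static void
op_thread_get_stat(op_thread_stats_t *st)
{
    /* Snapshot the list sizes and high water marks under the global mutex. */
    pthread_mutex_lock(&pc.mutex);
    st->waitingthreads = pc.waiting_threads.size;
    st->busythreads = pc.busy_threads.size;
    st->maxbusythreads = pc.busy_threads.hwm;
    st->waitingjobs = pc.waiting_jobs.size;
    st->maxwaitingjobs = pc.waiting_jobs.hwm;
    pthread_mutex_unlock(&pc.mutex);
}
```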
Last modified on 31 July 2024