Why We Wrote This Article
During software development, we run into many problems that need solving, and we don't always want to depend on some random library. Design patterns for these problems exist on the internet, but they often don't work that well out of the box; they need a lot of enhancement before they are usable.
Instead, we're sharing a solution that we have already used successfully, so rather than pulling in a library, you can simply copy and paste the code into your project.
The Problems
There are physical boundaries in a computer that we cannot break through, such as RAM, CPU speed, CPU word size (32 or 64 bits), disk space, battery, and so on. These limits don't stop there; they spread out and cause other limits. For example, 100 connections is the default maximum number of concurrent connections to Postgres, and 1024 is the default limit on concurrently open files per process in Ubuntu. Even though we can softly increase these limits, the problem is that we will always be bounded by some specific limit, and reaching it is not something we want to do: it will cause our system to panic. That is why I believe one of the keys to building durable runtime software lies in understanding and respecting all the types of resources we interact with.
Moreover, interacting with these resources - whether through networking or CPU-bound operations - produces overhead if done excessively. Imagine if we made a new connection to the database on every RESTful request. It would be a disaster for our system; we would waste most of our CPU time initiating the required resources instead of focusing on the main logic.
If I've touched on your pain point(s), welcome to my article. The design pattern we will discuss today is resource pooling, implemented in Rust with asynchronous syntax (Tokio).
Who Will Benefit Most from Reading This?
- Engineers who don't want to use a library for small problems; this code is ready to use unless you need to add some extra features.
- Developers looking for an implementation of resource pooling in Rust.
- Anyone looking for a resource pooling solution that works with asynchronous programming. We're using Tokio here; if you use a different async runtime (or no async at all), you should only need to update some import statements.
- High-expectation developers who not only want a solution but also desire the best experience while using it.
The Ideas
First, let’s briefly examine how the final implementation will be used:
The PoolRequest<T> is a lightweight object that acts as a request to the pool. It is Send + Sync + Clone, and also very cheap to clone across multiple threads. Its retrieve fn asks the pool for a PoolResponse<T>.
The PoolResponse<T> implements Deref<Target = T> + DerefMut so that it can be dereferenced into the resource.
The PoolResponse also makes sure that no resource can escape the pool: every resource that leaves the pool must return after use, so that it can be reused by others.
Take a look at this naive design of how a basic API application could use the pattern to solve resource initialization overhead.
Simple flow diagram
The pool already holds a number of resource items, so we are able to eliminate the resource-initiation overhead. Every handler holds a PoolRequest instance, which allows it to request resources whenever it wants to. But there are more problems than that.
More Problems and Solutions
By using the simple approach described above, four potential issues may arise:
1. Inability to provide resources when the number of concurrent requests exceeds the number of resources in the pool.
To address this, we'll implement two limits. The first is min_pool_size, which defines the minimum number of resources that are always ready for use. The second is max_pool_size, which can be much larger than min_pool_size. This allows additional resources to be initialized to handle extra requests, but the total number of resources will not exceed max_pool_size.
2. Applying max_pool_size could result in a rapid increase in the number of resources.
We'll introduce resource_idle_timeout, specifying the maximum time an unused resource can remain in memory while still maintaining the minimum number of resources in the pool.
For example, if your RESTful API only needs to handle 10 concurrent requests most of the time and you set min_pool_size = 10, but it suddenly receives 1,000 requests at once, your application will drop back to 10 concurrent requests after the surge, leaving 990 unused resources in the pool.
By setting resource_idle_timeout = 5 minutes, the pool can release those 990 unused resources if they remain idle for five minutes.
3. What if requests exceed max_pool_size? Should we fail those requests?
We can handle this by applying retrieving_timeout to PoolRequest<T>. This allows the request to wait (within a specified timeout) for a resource to be released back into the pool, which can then be used for the current request.
4. Setting a high min_pool_size might cause our service to take a long time to start because it needs to initialize many resources.
Instead of initializing resources one at a time, we can initialize them concurrently, reducing the startup time.
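The four knobs above could be collected into a configuration struct. This is a hypothetical sketch (the field names mirror the terms used in this section; the article's full code may shape this differently):

```rust
use std::time::Duration;

// Hypothetical configuration struct collecting the four limits discussed above.
struct PoolConfig {
    min_pool_size: usize,            // resources always kept ready
    max_pool_size: usize,            // hard ceiling on total resources
    resource_idle_timeout: Duration, // how long an extra resource may sit unused
    retrieving_timeout: Duration,    // how long a request waits before giving up
}

fn main() {
    let cfg = PoolConfig {
        min_pool_size: 10,
        max_pool_size: 1000,
        resource_idle_timeout: Duration::from_secs(300), // 5 minutes
        retrieving_timeout: Duration::from_secs(5),
    };
    // After a 1,000-request surge subsides, up to 990 idle resources become
    // eligible for cleanup once resource_idle_timeout elapses.
    assert_eq!(cfg.max_pool_size - cfg.min_pool_size, 990);
}
```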
Diagram illustrating the flow of retrieving, releasing resources, and removing idle resources
Final flow diagram
Implementation
Resource Abstraction
Before implementing the pool, we need a way to abstract the resources because the pool doesn't concern itself with what the resource is or how to initialize it. It only cares about the state of the resource—for example, whether it has timed out.
a. ResourceProvider
To abstract the process of creating new resources, we'll create a trait that encapsulates the initialization logic for different types of resources.
You may notice that the new function cannot accept custom parameters, yet initializing a resource often requires them.
We can create a struct to hold the parameters, and that struct will implement the PoolResourceProvider<T> trait.
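As a hedged sketch of what this abstraction might look like (the trait name PoolResourceProvider comes from the article; the method name new_resource and the DbConnection example are assumptions, and the trait is synchronous here so the example is self-contained):

```rust
// Hypothetical trait: the pool only needs "something that can make a T".
trait PoolResourceProvider<T> {
    fn new_resource(&self) -> T;
}

// A fake connection type standing in for a real resource.
struct DbConnection {
    url: String,
}

// The provider struct holds the custom parameters that a bare `new` could not take.
struct DbConnectionProvider {
    url: String,
}

impl PoolResourceProvider<DbConnection> for DbConnectionProvider {
    fn new_resource(&self) -> DbConnection {
        DbConnection { url: self.url.clone() }
    }
}

fn main() {
    let provider = DbConnectionProvider { url: "postgres://localhost".into() };
    let conn = provider.new_resource();
    assert_eq!(conn.url, "postgres://localhost");
}
```

The pool can then hold any `impl PoolResourceProvider<T>` without knowing what the parameters are.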
Now that we’ve established an effective abstraction for creating new resources, let’s proceed to the next step.
b. PoolItem<T>
The fundamental idea behind a PoolItem is that it manages the state of each resource - for example, how long the resource has been sitting in the pool.
The PoolItem also keeps our code easy to maintain, giving us a place to implement more detailed logic for managing each resource.
Let's take a look at the struct PoolItem<T>:
There are two functions that we need to talk about:
The ::refresh function is called when a PoolItem has finished its duty and returned to the pool. We reset its countdown to the beginning as a 'reward' for having done its duty instead of lying in the pool doing nothing.
The ::timeleft function counts down the maximum amount of time the resource may lie in the pool doing nothing. Once it exceeds the timeout - that is, once timeleft returns Duration::ZERO - it will be removed by the PoolCleaner, which we will discuss later in this post.
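A minimal self-contained sketch of PoolItem<T> with these two functions might look like this (field names and exact signatures are assumptions; the countdown is modeled with std::time::Instant):

```rust
use std::time::{Duration, Instant};

struct PoolItem<T> {
    resource: T,
    // When this item last entered (or re-entered) the pool.
    idle_since: Instant,
}

impl<T> PoolItem<T> {
    fn new(resource: T) -> Self {
        Self { resource, idle_since: Instant::now() }
    }

    /// Reset the idle countdown when the item returns from duty.
    fn refresh(&mut self) {
        self.idle_since = Instant::now();
    }

    /// Remaining time before this item has idled too long.
    /// Returns Duration::ZERO once `max_idling_timeout` has elapsed.
    fn timeleft(&self, max_idling_timeout: Duration) -> Duration {
        max_idling_timeout.saturating_sub(self.idle_since.elapsed())
    }
}

fn main() {
    let mut item = PoolItem::new(42u32);
    let left = item.timeleft(Duration::from_secs(300));
    assert!(left > Duration::ZERO && left <= Duration::from_secs(300));
    item.refresh(); // countdown starts over
}
```

`saturating_sub` means an expired item reports exactly `Duration::ZERO` rather than panicking on underflow.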
The Pool
Finally, let's discuss our main component: the pool itself.
The pool consists of three primary parts:
The Pool: Holds an array of resources, is responsible for managing that list, and makes sure every newly created resource is counted very carefully. This is also the core part, shared by the Allocator and the Cleaner.
The Allocator: This component initializes resources and provides them as needed. It handles the creation of new resources and supplies them to requesters.
The Cleaner: This part is responsible for maintaining the pool's efficiency by cleaning up unused resources, preventing unnecessary memory usage.
a. Pool<T>
Let's talk about the core Pool<T> first. Here is its structure:
The min_size: The minimum number of resources that will always be in the pool. It helps our system reduce the overhead of creating new resources too frequently.
The max_size: The maximum number of resources that can be created by the pool. Whenever the pool doesn't have enough resources to provide, it can create some extra resources to serve requests, and after these extra resources are used, they will be cleaned up if they stay unused for max_idling_timeout, as described above.
The items: This holds an array of PoolItem. It is wrapped in a Mutex because the entire Pool<T> struct will live on the heap instead of the stack and will be shared across different async tasks.
The counter: This counts every resource in the pool. You might ask why we don't just use items.len(). The reason is that when a resource is requested, it is taken out of items, so even though items shrinks, a resource has still been created and is merely borrowed. We cannot trust items.len(); we need a separate counter, and here it is.
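A sketch of that struct, using std::sync primitives instead of Tokio's so the example runs on its own (field names follow the description above; modeling the counter as an AtomicUsize is an assumption):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct PoolItem<T> {
    resource: T, // trimmed for this sketch
}

struct Pool<T> {
    min_size: usize,
    max_size: usize,
    // The article wraps this in tokio::sync::Mutex; std::sync::Mutex shown here.
    items: Mutex<Vec<PoolItem<T>>>,
    // Counts created resources, including ones currently borrowed out of `items`.
    counter: AtomicUsize,
}

fn main() {
    let pool = Pool {
        min_size: 1,
        max_size: 4,
        items: Mutex::new(Vec::new()),
        counter: AtomicUsize::new(0),
    };
    // Creating a resource: stored AND counted.
    pool.items.lock().unwrap().push(PoolItem { resource: "conn" });
    pool.counter.fetch_add(1, Ordering::SeqCst);
    // Borrowing: it leaves `items`, but the counter still remembers it exists.
    let _borrowed = pool.items.lock().unwrap().pop().unwrap();
    assert_eq!(pool.items.lock().unwrap().len(), 0); // items.len() lies...
    assert_eq!(pool.counter.load(Ordering::SeqCst), 1); // ...the counter doesn't
}
```

The two final assertions show exactly why `items.len()` cannot be trusted as the resource count.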
Here is its implementation; notice that all of the imports come from tokio.
Note: To prevent deadlock, we must ensure two conditions:
1. In any situation, no more than one lock is held at a time.
For example:
- Thread 1: holds lockA and waits for lockB to be released.
- Thread 2: holds lockB and waits for lockA to be released.
They will continue to wait for each other until the end of the world. To save the world, never allow any two locks to be held at the same time.
Never do this:
Instead, we do this.
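Since the deadlock itself cannot safely be demonstrated, here is a runnable sketch of the safe shape: touch one lock at a time and release it before taking the next (the transfer example is illustrative, not from the article):

```rust
use std::sync::Mutex;

fn transfer(a: &Mutex<i32>, b: &Mutex<i32>, amount: i32) {
    // BAD (not done here): holding a.lock() and b.lock() simultaneously,
    // which lets two threads wait on each other forever.
    // GOOD: take one lock at a time and drop it before taking the next.
    {
        let mut ga = a.lock().unwrap();
        *ga -= amount;
    } // lock A released here
    {
        let mut gb = b.lock().unwrap();
        *gb += amount;
    } // lock B released here
}

fn main() {
    let a = Mutex::new(100);
    let b = Mutex::new(0);
    transfer(&a, &b, 30);
    assert_eq!(*a.lock().unwrap(), 70);
    assert_eq!(*b.lock().unwrap(), 30);
}
```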
2. Establish a single source of truth to manage locks; this makes it much easier to review the logic when a deadlock occurs.
While deadlocks may not happen today, there’s always the possibility that a mistake by a colleague could cause one in the future. By centralizing lock management, we simplify the review process and reduce the risk of errors.
Now, let's get back to the core implementation of the Pool.
The ::add_new_item function accepts a new resource into the pool. It is mostly triggered during initialization of the PoolAllocator, which needs to initiate a number of new resources to fill min_size.
The ::return_borrowed_item function accepts a borrowed resource. What is a borrowed resource? It is a resource that was requested earlier and is now being returned to the pool. In this case there is no need to increase the counter.
The ::invalidate function invalidates the item at index if the pool size exceeds min_size and the item has exceeded max_idling_timeout.
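These three functions could be sketched as follows, again with std primitives instead of Tokio's so the example is self-contained (signatures are assumptions based on the descriptions above):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};

struct PoolItem<T> {
    resource: T,
    idle_since: Instant,
}

struct Pool<T> {
    min_size: usize,
    items: Mutex<Vec<PoolItem<T>>>,
    counter: AtomicUsize,
    max_idling_timeout: Duration,
}

impl<T> Pool<T> {
    /// A brand-new resource enters the pool: store it AND count it.
    fn add_new_item(&self, resource: T) {
        self.counter.fetch_add(1, Ordering::SeqCst);
        self.items.lock().unwrap().push(PoolItem { resource, idle_since: Instant::now() });
    }

    /// A borrowed resource comes home: store it, but do NOT count it again.
    fn return_borrowed_item(&self, resource: T) {
        self.items.lock().unwrap().push(PoolItem { resource, idle_since: Instant::now() });
    }

    /// Drop the item at `index` only if the pool is above min_size
    /// AND the item has idled past max_idling_timeout.
    fn invalidate(&self, index: usize) -> bool {
        let mut items = self.items.lock().unwrap();
        let above_min = self.counter.load(Ordering::SeqCst) > self.min_size;
        let expired = items
            .get(index)
            .map_or(false, |i| i.idle_since.elapsed() >= self.max_idling_timeout);
        if above_min && expired {
            items.remove(index);
            self.counter.fetch_sub(1, Ordering::SeqCst);
            true
        } else {
            false
        }
    }
}

fn main() {
    let pool = Pool {
        min_size: 1,
        items: Mutex::new(Vec::new()),
        counter: AtomicUsize::new(0),
        max_idling_timeout: Duration::ZERO, // zero timeout: everything is instantly "idle"
    };
    pool.add_new_item("a");
    pool.add_new_item("b");
    assert_eq!(pool.counter.load(Ordering::SeqCst), 2);
    assert!(pool.invalidate(0));  // above min_size: removed
    assert!(!pool.invalidate(0)); // at min_size now: refused
}
```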
b. PoolAllocator<T>
First let's take a look on the struct.
We create a PoolAllocator struct that requires a generic type <T>, representing the type of resource it will provide.
The PoolAllocator<T> holds a Vector of PoolItem<T>, which is wrapped inside an Arc to allow sharing the reference across different threads. We then wrap this with an RwLock, enabling multiple readers but only one writer at a time. Together, these wrappers allow us to share the PoolAllocator across threads without violating Rust's memory safety rules.
Regarding why the allocator needs to keep a reference to the PoolCleaner, it's because it decides when to start the cleanup. Additionally, this ensures that the PoolCleaner<T> will be dropped when the allocator is dropped.
Now, let's examine the main logic of the allocator to see how it works.
The ::init function has two main responsibilities. First, it initializes the minimum number of resources to populate the pool. Second, to prevent a long startup time, it creates all of those resources concurrently instead of one by one.
The ::retrieve function extracts a resource from the pool. If the number of resources has not yet reached max_pool_size, we initiate a new resource.
Also, if there is no room for any extra resource, we add the request to the waiters list. The waiters list treats everyone equally: first come, first served. Whenever a resource becomes available, it serves the earliest waiting request first.
The ::put function returns the resource to the pool, but if there are waiters, we allow the resource to take another trip and serve a new request from the waiters list.
If the resource returns to the pool successfully, we know the pool is starting to become less busy than before, so we trigger the cleanup process.
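A deliberately simplified, synchronous sketch of the retrieve/put logic: the article's version is async and keeps an explicit waiters list, while here a Condvar queue plays that role (and, as a simplification, the timeout restarts on each wake-up):

```rust
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::Duration;

struct AllocState<T> {
    idle: VecDeque<T>,
    created: usize, // total resources in existence, borrowed or idle
}

struct Allocator<T> {
    state: Mutex<AllocState<T>>,
    available: Condvar, // stands in for the waiters list
    max_size: usize,
}

impl<T> Allocator<T> {
    fn retrieve(&self, make: impl Fn() -> T, timeout: Duration) -> Option<T> {
        let mut st = self.state.lock().unwrap();
        loop {
            // 1. Reuse an idle resource if there is one.
            if let Some(r) = st.idle.pop_front() {
                return Some(r);
            }
            // 2. Room below max_size: create a fresh resource.
            if st.created < self.max_size {
                st.created += 1;
                return Some(make());
            }
            // 3. Otherwise wait (bounded) for a put() to wake us.
            let (guard, res) = self.available.wait_timeout(st, timeout).unwrap();
            st = guard;
            if res.timed_out() && st.idle.is_empty() {
                return None;
            }
        }
    }

    fn put(&self, resource: T) {
        self.state.lock().unwrap().idle.push_back(resource);
        self.available.notify_one(); // one waiter gets served
    }
}

fn main() {
    let alloc = Allocator {
        state: Mutex::new(AllocState { idle: VecDeque::new(), created: 0 }),
        available: Condvar::new(),
        max_size: 1,
    };
    let a = alloc.retrieve(|| "conn", Duration::from_millis(10)).unwrap(); // created
    assert!(alloc.retrieve(|| "conn", Duration::from_millis(10)).is_none()); // at max: times out
    alloc.put(a);
    assert!(alloc.retrieve(|| "conn", Duration::from_millis(10)).is_some()); // reused
}
```

Note that `Condvar::notify_one` does not guarantee strict FIFO order the way an explicit waiters list can; it only approximates "first come, first served".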
c. PoolCleaner<T>
We have successfully implemented resource allocation; now it's time to free up some 'lazy' resources in our system. We love how efficiently the allocator works, but we (should) hate wasting resources.
A resource is considered lazy if the pool size exceeds min_pool_size and the resource has been lying in the pool for longer than resource_idle_timeout.
Let's take a quick look at how this process works:
The code above can be summarized as follows:
- Cleanup is a background job, and at most one cleanup background job can run at a time.
- Cleanup starts when the pool allocator 'thinks' it has found a lazy resource.
- Cleanup stops when there are no extra resources left (pool.size <= min_pool_size), or when there are no resources in the pool at all, which means the pool is very busy and there is no room for a background job to run.
- To make cleanup both optimized (less CPU blocking) and efficient (on time), we call yield_now() to avoid blocking the CPU, and we calculate the minimum time to delay() so that the cleanup loop, which requires a lock() on the pool, is not triggered too often.
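One pass of that logic could be sketched like this: remove idle-expired items without shrinking below min_size, then report the shortest remaining time-left so the caller knows how long to sleep before the next pass (a synchronous sketch; the real version runs as a background Tokio task and yields between iterations):

```rust
use std::time::{Duration, Instant};

struct Item {
    idle_since: Instant,
}

/// Returns None when the cleaner should stop (no extra resources left),
/// or Some(delay) telling the caller how long to sleep before re-checking.
fn cleanup_pass(items: &mut Vec<Item>, min_size: usize, idle_timeout: Duration) -> Option<Duration> {
    // Remove idle-expired items, but never shrink the pool below min_size.
    let mut i = 0;
    while items.len() > min_size && i < items.len() {
        if items[i].idle_since.elapsed() >= idle_timeout {
            items.remove(i); // lazy resource: drop it
        } else {
            i += 1;
        }
    }
    if items.len() <= min_size {
        None // back to sleep until the allocator wakes the cleaner again
    } else {
        // Sleep exactly until the next item could expire, so the loop
        // (and its lock on the pool) is not triggered more often than needed.
        items
            .iter()
            .map(|it| idle_timeout.saturating_sub(it.idle_since.elapsed()))
            .min()
    }
}

fn main() {
    let now = Instant::now();
    let mut items = vec![
        Item { idle_since: now },
        Item { idle_since: now },
        Item { idle_since: now },
    ];
    // With a zero timeout every item is instantly expired, but min_size = 1
    // protects the last one.
    let next = cleanup_pass(&mut items, 1, Duration::ZERO);
    assert_eq!(items.len(), 1);
    assert!(next.is_none()); // at min_size: cleaner stops
}
```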
The PoolRequest<T>
After a great time with the PoolAllocator, we have to think about a way to work with it, especially given Rust's strict memory-safety rules, which prevent us from simply holding a reference to just anything across async tasks.
To deal with this, we must create a struct that is Send + Sync + Clone.
Let’s take a look at how it works:
The PoolRequest can be Send + Sync + Clone, and is also cheap to clone, because it only keeps an Arc reference to the PoolAllocator.
Then, by calling the retrieve function, it requests a resource from the pool; if no resource is available, it waits for up to retrieving_timeout. If there is still no resource available, sadly we have to return a None result.
The PoolResponse<T>
To be honest, I would prefer the PoolRequest to return the resource directly. We all know how it feels to have ownership of an instance rather than just a reference, right?
But every resource has to go back to the pool, because it needs to be reused by others! What if I, you, or one of our colleagues takes the resource and just drops it? It would never return to the pool, and that is a waste; we want a cheap and efficient solution.
We need a better approach. By creating PoolResponse, we ensure the resource is always returned to the pool.
Now, let's take a look at how it works.
By implementing both Deref and DerefMut, we are able to freely interact with the resource without any limitations (I hope so).
And the secret of how the resource automatically returns to the pool after use lies in the Drop trait implementation. Whenever the PoolResponse goes out of scope, or drop(response) is called manually, we return the resource to the pool.
This also explains why the pool and resource fields need to be Options: we have to take the resource out and pass it to an async task, which runs independently after the PoolResponse is dropped.
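A self-contained sketch of the guard: Deref/DerefMut expose the resource, and Drop sends it home, with an mpsc Sender standing in for the real pool (the Option fields mirror the design described above; everything else is illustrative):

```rust
use std::ops::{Deref, DerefMut};
use std::sync::mpsc::{channel, Sender};

struct PoolResponse<T> {
    resource: Option<T>,     // Option so Drop can take() the value out
    pool: Option<Sender<T>>, // Option for the same reason
}

impl<T> Deref for PoolResponse<T> {
    type Target = T;
    fn deref(&self) -> &T {
        self.resource.as_ref().unwrap()
    }
}

impl<T> DerefMut for PoolResponse<T> {
    fn deref_mut(&mut self) -> &mut T {
        self.resource.as_mut().unwrap()
    }
}

impl<T> Drop for PoolResponse<T> {
    fn drop(&mut self) {
        // take() moves both out so the resource can outlive `self`.
        if let (Some(resource), Some(pool)) = (self.resource.take(), self.pool.take()) {
            let _ = pool.send(resource); // back to the pool
        }
    }
}

fn main() {
    let (tx, rx) = channel();
    {
        let mut resp = PoolResponse { resource: Some(String::from("conn")), pool: Some(tx) };
        resp.push_str("-used"); // DerefMut: use it as if it were the resource
        assert_eq!(&*resp, "conn-used"); // Deref
    } // resp dropped here: the resource goes home
    assert_eq!(rx.recv().unwrap(), "conn-used");
}
```

The guard makes "forgetting" to return a resource impossible: even an early return or a panic unwinding through scope runs Drop.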
The Usage
First, assume that DbConnection is our resource.
Then we abstract the resource by implementing the PoolResourceProvider trait.
And we use it as below.
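A miniature end-to-end sketch tying the pieces together. Everything here is synchronous and heavily simplified; the names DbConnection, PoolResourceProvider, and PoolRequest follow the article, but the signatures and internals are assumptions:

```rust
use std::sync::{Arc, Mutex};

struct DbConnection {
    url: String,
}

trait PoolResourceProvider<T> {
    fn new_resource(&self) -> T;
}

struct DbProvider {
    url: String,
}

impl PoolResourceProvider<DbConnection> for DbProvider {
    fn new_resource(&self) -> DbConnection {
        DbConnection { url: self.url.clone() }
    }
}

// The request handle: cheap to clone, shares the pool through an Arc.
struct PoolRequest<T> {
    idle: Arc<Mutex<Vec<T>>>,
}

// Manual impl so T itself does not need to be Clone; only the Arc is cloned.
impl<T> Clone for PoolRequest<T> {
    fn clone(&self) -> Self {
        Self { idle: Arc::clone(&self.idle) }
    }
}

impl<T> PoolRequest<T> {
    fn retrieve(&self) -> Option<T> {
        self.idle.lock().unwrap().pop()
    }
    fn put(&self, resource: T) {
        self.idle.lock().unwrap().push(resource);
    }
}

fn main() {
    let provider = DbProvider { url: "postgres://localhost/app".into() };
    // Pre-fill the pool with min_pool_size = 2 connections.
    let idle: Vec<_> = (0..2).map(|_| provider.new_resource()).collect();
    let request = PoolRequest { idle: Arc::new(Mutex::new(idle)) };

    let handler = request.clone(); // each handler holds a cheap clone
    let conn = handler.retrieve().expect("pool pre-filled");
    assert_eq!(conn.url, "postgres://localhost/app");
    handler.put(conn); // return it for the next handler
    assert_eq!(request.idle.lock().unwrap().len(), 2);
}
```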
The playground also includes the full code; feel free to copy it into your project. We also implemented a PoolBuilder and PoolRequestBuilder as a bonus.