CODEX

Background Job and Queue — Concurrency and Ordering

Tuan Nguyen
CodeX
Published in
7 min readJan 29, 2021

--

This post aims to provide you an overview of how to fulfill requirements from the perspective of system design. Having said that, its content is not technology-driven, meaning that the implementation of what discussed is transparent to something like Kafka or whatever message queues like RabitMQ. There are no such features specific to a certain system or technology or framework.

In my previous articles (here and here), I did not discuss the relationship between jobs, assuming that they are non-related. By doing so, I simplified things, can do any scale up/down/out on the Worker Nodes as many as you want to leverage the advantages of concurrency. Unfortunately, life is not that easy :) You might face a number of different issues when you jump on a real system. One of them is the dependency between jobs, in that how to ensure jobs to be done in order and in a concurrent manner up to some extent, which is not a simple task.

Note that job dependency is characterized in 2 phases, design and operation, which make the background job implementation even more challenging.

  • Design phase: There are a limited number of types of jobs whose execution order is pre-determined at the designing phase
  • Operation phase: Execution order of jobs can be also determined according to their data which value is only known at runtime. For example, the job “create comment X of post A” needs to finish before the job “create comment Y of post A”.

Existing approaches

Let me walk you through available approaches and clarify their advantages as well as limitations.

Simple version — 1-queue-1-worker for all jobs

This is a straightforward approach since you don’t need to concern about concurrency. With a FIFO queue, the worker only processes one job at a time and the jobs’ execution order is based on the order in the queue. And you can easily see the big issue — slow processing. Imagine it comes at the rate of 1000 jobs per hour but the system is capable of handling 500 jobs per hour. Eventually, you will need a queue with infinity length to keep the jobs.

Improved version — 1-queue-1-worker for each job type

If you tried to implement a toy example of version 1, you can consider improving it with this approach. What you need to do is to separate jobs by their type and feed them to different queues each of which in turn has its own worker.

While this reduces the high workload issue by balancing the load into multiple queues and workers. Since the number of job types is typically limited, we don’t need to worry about the resource. We also don’t have to worry about execution order or concurrency since jobs of different types are independent of each other in most of the systems (if not, then keep reading with other solutions). However, this version is still limited to the fact that there is only one worker for one job type.

Better version — 1-queue-multiple-worker for each job type

This pattern is widely adopted by systems. There are popular job queue libraries like Resque (Ruby), Celery (Python), Bull (Nodejs), etc., or those with backend relying on Redis.

By doing in this way, we improve system performance with multiple workers processing one type of concurrent jobs, thus enabling horizontal scaling. What remains is the question related to concurrency and job execution order. For example, as many jobs are processed simultaneously, they might require to access (and update) the shared resource at the same time, or some jobs might finish earlier than the others which occur after (and supposed to finish later).

This is not something that can be easily solved. Different options can be considered for different systems with specific requirements, that are

  • Using Atomic transaction to update data. This is applicable to critical transactions and data on the same database. However, you can update one or several records with the atomic transactions but not a good idea for the whole process in preparing data, obtaining information from service A, calculating, etc. In the presence of a huge number of jobs, the possibility of a job to be failed in execution is pretty high due to atomic transactions. Sometimes, that is really a nightmare that you don’t want to imagine.
  • Using Locking to lock the data that needs to be updated. This is applicable if the lock lasts tens of milliseconds and the number of dropped or retried jobs that are together overlapped in locking the same resource is acceptable. But as the job needs to process more tasks and thus requires a longer locking period, there are more and more dropped jobs.
  • Using Partition Queue as Kafka does. I don’t dig into the details of this approach as you may ask “If it is that complicate, why don’t you just use Kafka, it’s free?”.
  • Using 1 queue per post and have 1 worker listening to that queue. For example, rather than using 1 queue for the job create comment (for any post), we create multiple queues for the job create a comment of post-A, then have no worry about all the issues of concurrency, lock, blah blah. The thing is that it requires a dynamic mechanism to create (and listen to) queues. During runtime, we have to keep creating queues and handle their own queued jobs. Can you understand? No, then take a look at the following code:

Here is the code for one task queue:

But instead, here is what we need:

You can see the issue of this approach when adopting task queue libraries. It not only requires ad-hoc modifications to manage the registration and de-registration of a queue but also results in additional (and inefficient) connection, resource fragmentation, etc.

A little bit further

Now is the time that a small improvement can lead to a considerable improvement.

Best version (so far) — 1-queue-multiple-worker for each job type with 1 lock + 1 data-source for handling concurrency

Like the above better version, you create different queues for different posts A, B, C. Unlike the above better version, you need different ways to implement it (which is a practical point IMHO) regarding the fact that you need to integrate your work with existing task queue libraries (or frameworks). Instead of creating a job for post-A with data X, you create a job for post-A without data. Then that job (for post-A without data) will get data from another data-source, i.e. Redis list, database, and process. Upon finishing, it keeps going with the next job for post-A (without data, of course).

A minor issue of this overlapped scheduling approach is that if a job for post-A fails and after TIMEOUT_DURATION there is no new job to process for post-A, then the queue of post-A will be waiting forever. Not clear, check out the following code which is actually a separate process to check all the queues with that problem and re-schedule the jobs

In fact, you update your code with nothing but the mechanism of pushing and pulling data of jobs using a data-source and a lock. The advantage of this is to facilitate the integration of your work and most databases. Here is a prototype with Redis.

As I said earlier, this is just a reference prototype that you can apply to your case with the requirement of job concurrency. It means there are still a lot of things that you can enhance in terms of reliability when poping data, retrying upon error, worker crash, etc.

Let me summarize what we achieve so far:

  • Avoid continuously listening to the library’s queue. The declaration of listener and handler is done once at the beginning when the worker starts.
  • Enable jobs create comment X for post-A and create comment X for post-B to be executed simultaneously
  • Ensure jobs create comment X for post-A and create comment Y for post-A be in order.
  • Be able to scale the whole system or worker without much concern.

Acknowledge

I would like to send my big thanks to Quang Minh (a.k.a Minh Monmen) for the permission to translate his original post.

--

--