
August 23, 2020

Producer consumer pattern for fast data pipelines

Let’s build a simple data pipeline. It will read a SOURCE table in a MySQL database. This table has a single column, VALUE, and contains 1000 rows: the numbers from 1 to 1000. The application will calculate the square root of each number and put the result into a DESTINATION table. That table has two columns: VALUE for the original number and ROOT for the result of the calculation.

A very simple implementation of this pipeline might look like this:

from math import sqrt
import mysql.connector # 1


def main_simple():
    connection = mysql.connector.connect(
        host="localhost",
        user="yourusername",
        password="yourpassword",
        database="yourdatabase"
    ) # 2
    # buffered=True fetches the whole result up front, so the same
    # connection can run the INSERTs below while we loop over the rows
    cursor_read = connection.cursor(buffered=True)
    cursor_read.execute("SELECT VALUE FROM SOURCE") # 3

    cursor_write = connection.cursor()
    for row in cursor_read: # 4
        value = int(row[0])
        root = sqrt(value) # 5
        cursor_write.execute(
            "INSERT INTO DESTINATION (VALUE, ROOT) VALUES (%s, %s)",
            (value, root)
        ) # 6
    connection.commit() # 7
  1. Import MySQL Connector. It can be installed with pip install mysql-connector-python
  2. Connect to the database by providing the host name (the local machine in this case), username, password and database name
  3. Read the data from the SOURCE table. A buffered cursor fetches the whole result at this point; with the default unbuffered cursor, running the INSERTs on the same connection before all rows are read fails with an "Unread result found" error
  4. The rows are then processed in this loop one at a time
  5. The calculation is done here
  6. Insert the result into the DESTINATION table, passing the values as query parameters instead of formatting them into the SQL string
  7. Don’t forget to commit at the end to persist the changes to the DESTINATION table
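If you want to try this flow without a MySQL server, here is a minimal sketch that swaps in Python’s built-in sqlite3 module with an in-memory database. This is an assumption for illustration only, not the setup behind the timings in this post. It creates the two tables described above, fills SOURCE with the numbers 1 to 1000 and runs the same read, compute, insert, commit loop. Note that sqlite3 uses ? placeholders where MySQL Connector uses %s.

```python
import sqlite3
from math import sqrt


def build_demo_db():
    # Stand-in for the MySQL database: an in-memory SQLite database
    # with the same SOURCE and DESTINATION layout
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE SOURCE (VALUE INTEGER)")
    conn.execute("CREATE TABLE DESTINATION (VALUE INTEGER, ROOT REAL)")
    conn.executemany(
        "INSERT INTO SOURCE (VALUE) VALUES (?)",
        ((n,) for n in range(1, 1001)),
    )
    conn.commit()
    return conn


def run_simple(conn):
    # Read every value, compute its square root, insert the pair, commit once
    rows = conn.execute("SELECT VALUE FROM SOURCE").fetchall()
    for (value,) in rows:
        conn.execute(
            "INSERT INTO DESTINATION (VALUE, ROOT) VALUES (?, ?)",
            (value, sqrt(value)),
        )
    conn.commit()


conn = build_demo_db()
run_simple(conn)
count = conn.execute("SELECT COUNT(*) FROM DESTINATION").fetchone()[0]
root_of_16 = conn.execute(
    "SELECT ROOT FROM DESTINATION WHERE VALUE = 16"
).fetchone()[0]
print(count, root_of_16)  # 1000 4.0
```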

We don’t usually write applications by cramming everything into one function. Let’s decompose it into two functions with separate responsibilities.

The first function reads the data:

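def producer(connection):
    # buffered, so the consumer can write through the same connection
    # while this generator is still handing out rows
    cursor_read = connection.cursor(buffered=True)
    cursor_read.execute("SELECT VALUE FROM SOURCE")
    for row in cursor_read:
        yield row[0]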

The second one does the calculation for a single item (a number) and saves the result:

def consumer(connection, item):
    value = int(item)
    root = sqrt(value)
    cursor_write = connection.cursor()
    cursor_write.execute(
        "INSERT INTO DESTINATION (VALUE, ROOT) VALUES (%s, %s)",
        (value, root)
    )

Let’s bring it all together:

def main_decompose():
    connection = mysql.connector.connect(
        host="localhost",
        user="yourusername",
        password="yourpassword",
        database="yourdatabase"
    )
    for value in producer(connection):
        consumer(connection, value)
    connection.commit()
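Because producer is a generator, reading and processing are interleaved: each value is handed to the consumer as soon as it is read, and the generator pauses until the loop asks for the next one. Here is a minimal sketch that makes this order visible, with in-memory lists standing in for both tables. The names producer_demo and consumer_demo and the three-row data are hypothetical; no database is involved.

```python
from math import sqrt

events = []  # records the order in which reads and writes happen


def producer_demo(rows):
    for row in rows:
        events.append(f"read {row[0]}")
        yield row[0]  # pause here until the caller asks for the next value


def consumer_demo(destination, item):
    value = int(item)
    destination.append((value, sqrt(value)))  # stand-in for the INSERT
    events.append(f"write {value}")


source_rows = [(1,), (4,), (9,)]  # hypothetical stand-in for SOURCE
destination = []                  # hypothetical stand-in for DESTINATION
for value in producer_demo(source_rows):
    consumer_demo(destination, value)

print(events)       # ['read 1', 'write 1', 'read 4', 'write 4', 'read 9', 'write 9']
print(destination)  # [(1, 1.0), (4, 2.0), (9, 3.0)]
```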

Timing the execution of this function with the %time magic in Jupyter gives:

CPU times: user 46.9 ms, sys: 93.8 ms, total: 141 ms
Wall time: 1.41 s

Total CPU time is 141 ms, but the whole program takes 1.41 s of wall time, so about 90% of the time is spent waiting for the database and very little doing the actual calculation. Concurrency can speed things up here. And since this is an I/O-heavy task, threads are a good fit: while one thread waits for the database, the others can keep working.
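To see why threads help with this kind of work, here is a small sketch in which time.sleep stands in for a database round trip (an assumption: real queries don’t take a fixed 50 ms). It measures wall time with time.perf_counter and process CPU time with time.process_time, roughly the two numbers %time reports. The sequential run waits for each call in turn, while the thread pool overlaps the waits, so wall time drops and CPU time stays small.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_query(_):
    # Hypothetical stand-in for a database round trip: the thread just waits
    time.sleep(0.05)


def measure(run):
    # Return (wall seconds, CPU seconds of this process) spent in run()
    wall0, cpu0 = time.perf_counter(), time.process_time()
    run()
    return time.perf_counter() - wall0, time.process_time() - cpu0


def sequential():
    for i in range(8):
        fake_query(i)


def threaded():
    with ThreadPoolExecutor(max_workers=4) as executor:
        list(executor.map(fake_query, range(8)))


seq_wall, seq_cpu = measure(sequential)
thr_wall, thr_cpu = measure(threaded)
print(f"sequential: wall {seq_wall:.2f}s, cpu {seq_cpu:.3f}s")
print(f"threaded:   wall {thr_wall:.2f}s, cpu {thr_cpu:.3f}s")
```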

Producer consumer pattern

The producer-consumer pattern is widely used for applications like this. The producer and the consumer run concurrently in separate threads: the producer reads the data and puts it into a queue, while the consumer takes the data from the queue, processes it and saves the result. There can be more than one producer thread and more than one consumer thread.

Python’s standard library has a queue module that is designed for concurrent applications and frees us from implementing all the locking ourselves. From its documentation:

The queue module implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class in this module implements all the required locking semantics.
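Here is a minimal sketch of the Queue methods used in this post: put() and get() (get() blocks until an item is available), task_done() to mark an item as fully processed, and join() to wait until every item that was put has been marked. The squaring worker is a hypothetical example.

```python
import threading
from queue import Queue

q = Queue(maxsize=10)  # put() would block once 10 items are waiting
for n in range(1, 6):
    q.put(n)

results = []


def worker():
    while True:
        item = q.get()        # blocks until an item is available
        results.append(item * item)
        q.task_done()         # mark this item as fully processed


threading.Thread(target=worker, daemon=True).start()
q.join()  # returns once task_done() has been called for every put() item
print(results)  # [1, 4, 9, 16, 25]
```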

Python has two modules for thread-based concurrency: the low-level threading module and the higher-level concurrent.futures abstraction.
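To compare the two levels, here is a sketch that runs the same hypothetical compute function on three numbers, first with bare threading.Thread objects that we start and join ourselves, then with a ThreadPoolExecutor that owns the threads and waits for them when its with block ends.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from math import sqrt


def compute(n, out):
    # Hypothetical task: store the square root of n under key n
    out[n] = sqrt(n)


# Low level: create, start and join each thread yourself
low = {}
threads = [threading.Thread(target=compute, args=(n, low)) for n in (4, 9, 16)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# High level: the executor manages a pool of threads and waits on exit
high = {}
with ThreadPoolExecutor(max_workers=3) as executor:
    for n in (4, 9, 16):
        executor.submit(compute, n, high)

print(low == high)  # True
```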

Let’s convert our producer and consumer so that each one accepts the queue as its only argument and creates its own DB connection internally, because a connection is not safe to share between threads and shouldn’t be passed in from main.

Instead of yielding data elements, the producer puts them into the queue. The consumer runs a loop that exits when the queue is empty. In the loop it gets an element from the queue and processes it; after all the processing is over, it commits the changes to the database.

from concurrent.futures import ThreadPoolExecutor
from queue import Queue

def producer_threaded(queue):
    connection = mysql.connector.connect(...) # same arguments as in main_simple
    cursor = connection.cursor()
    cursor.execute("SELECT VALUE FROM SOURCE")
    for row in cursor:
        queue.put(row[0]) # 1
    connection.close()

def consumer_threaded(queue):
    connection = mysql.connector.connect(...) # same arguments as in main_simple
    cursor = connection.cursor()

    while not queue.empty(): # 2
        item = queue.get() # 3
        value = int(item)
        root = sqrt(value)
        cursor.execute(
            "INSERT INTO DESTINATION (VALUE, ROOT) VALUES (%s, %s)",
            (value, root)
        )
        queue.task_done() # 4
    connection.commit()
    connection.close()
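  1. The producer puts items into the queue instead of yielding them
  2. The consumer loop checks whether the queue has any elements with the empty() method (Queue doesn’t support len()). Note that this check is racy: a consumer that starts before the producer has put anything, or that catches up with the producer, sees an empty queue and exits early
  3. The item is taken out of the queue
  4. When processing of an item is over, call task_done(). It doesn’t affect empty(), because get() has already removed the item; it is what lets Queue.join() know that every item has been processed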
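One common way to avoid the race in the empty() check is to let consumers block on get() and stop only when they receive a special end-of-stream value, a sentinel. The sketch below is an alternative to the loop above, not the code that was timed in this post: it uses in-memory lists instead of MySQL, and SENTINEL, run and the other names are hypothetical. Consumers are submitted before the producer on purpose, since with a blocking get() that ordering no longer matters.

```python
from concurrent.futures import ThreadPoolExecutor
from math import sqrt
from queue import Queue

SENTINEL = object()  # unique end-of-stream marker, can't clash with real data
N_CONSUMERS = 3


def producer(queue, values, n_consumers):
    try:
        for value in values:
            queue.put(value)
    finally:
        # Always send one stop signal per consumer, even if reading failed,
        # so that no consumer waits forever
        for _ in range(n_consumers):
            queue.put(SENTINEL)


def consumer(queue, results):
    processed = 0
    while True:
        item = queue.get()  # blocks until an item or a sentinel arrives
        if item is SENTINEL:
            break
        results.append((item, sqrt(item)))  # stand-in for the INSERT
        processed += 1
    return processed


def run(values):
    q = Queue(maxsize=100)  # bounded, so a fast producer can't exhaust memory
    results = []
    with ThreadPoolExecutor(max_workers=N_CONSUMERS + 1) as executor:
        consumers = [executor.submit(consumer, q, results) for _ in range(N_CONSUMERS)]
        producer_future = executor.submit(producer, q, values, N_CONSUMERS)
        producer_future.result()  # re-raises if the producer failed
        counts = [f.result() for f in consumers]
    return results, counts


results, counts = run(range(1, 1001))
print(len(results), sum(counts))  # 1000 1000
```

Each consumer stops after taking exactly one sentinel, which is why the producer sends one per consumer. With several producers, the usual approach is to send the sentinels only after all producers have finished.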

The main function that brings it all together uses a ThreadPoolExecutor, which manages the threads for us:

def main_threaded():
    q = Queue() # 1
    with ThreadPoolExecutor(max_workers=4) as executor: # 2
        executor.submit(producer_threaded, q) # 3
        for _ in range(3):
            executor.submit(consumer_threaded, q)
    # 4
  1. The queue is instantiated here. It’s wise to limit its size, e.g. Queue(maxsize=100): if the consumers are much slower than the producers, an unbounded queue will grow indefinitely and consume a lot of memory, while a bounded one makes put() wait for free space
  2. The number of workers is also an important parameter. I’m using 4 here so the pool has enough threads for one producer and 3 consumers
  3. This is how you run a function in a thread: pass it to submit as the first positional argument, followed by the arguments for the function itself
  4. On exiting this context manager, the executor waits for all submitted tasks to finish
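A bounded queue is what slows the producer down to the consumers’ pace: once maxsize items are waiting, put() blocks until a consumer takes one. This tiny sketch (maxsize=2 is only an illustrative value) uses a timeout so that the blocking shows up as a queue.Full exception instead of a hang.

```python
from queue import Full, Queue

q = Queue(maxsize=2)  # at most 2 items may wait in the queue
q.put(1)
q.put(2)

blocked = False
try:
    q.put(3, timeout=0.1)  # without a timeout this would wait for free space
except Full:
    blocked = True  # a producer would have had to wait for a consumer here

q.get()                # a consumer takes one item...
q.put(3, timeout=0.1)  # ...which frees a slot, so this put succeeds
print(blocked, q.qsize())  # True 2
```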

Let’s time this:

CPU times: user 109 ms, sys: 141 ms, total: 250 ms
Wall time: 481 ms

It runs about 3 times faster than the non-concurrent implementation.

Futures

Let’s introduce a bug in our producer:

def producer_threaded(queue):
    connection = mysql.connector.connect(
        host="localhost",
        user="yourusername",
        password="some wrong password"
    )
    # ... rest of the function as before

When you run this code, you’ll see no traceback: the exception is raised in a separate thread, and the executor stores it instead of printing it. To get the exceptions and results of the tasks in the pool, use the Future object returned by the submit method:

from concurrent.futures import as_completed

def main_threaded_future():
    q = Queue() 
    futures = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        producer_future = executor.submit(producer_threaded, q)
        futures.append(producer_future) # 1
        for _ in range(3):
            consumer_future = executor.submit(consumer_threaded, q)
            futures.append(consumer_future)

        for future in as_completed(futures): # 2
            future.result() # 3
  1. Store each returned future in a list
  2. as_completed yields the futures one by one as they finish
  3. result() re-raises any exception from the thread and otherwise returns the function’s return value (ours return None)
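Here is a self-contained sketch of the same mechanism without a database. failing_producer and ok_consumer are hypothetical stand-ins, and the ConnectionError simulates the failed login. The exception only surfaces when result() is called on the corresponding future.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def failing_producer():
    # Hypothetical stand-in for connect() failing with a wrong password
    raise ConnectionError("wrong password")


def ok_consumer():
    return "done"


caught = None
returned = []
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(failing_producer), executor.submit(ok_consumer)]
    for future in as_completed(futures):
        try:
            returned.append(future.result())  # re-raises the worker's exception
        except ConnectionError as exc:
            caught = exc

print(repr(caught), returned)  # ConnectionError('wrong password') ['done']
```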

Paging

We can improve performance further by spinning up multiple producers, each reading only part of the SOURCE table. For that we execute a paging query:

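def producer_paged(queue, page, page_size=500):
    connection = mysql.connector.connect(...) # same arguments as in main_simple
    cursor = connection.cursor()
    # ORDER BY makes the pages deterministic, so no rows are skipped or duplicated
    cursor.execute(
        f"SELECT VALUE FROM SOURCE ORDER BY VALUE LIMIT {page_size} OFFSET {page_size * page}"
    )
    for row in cursor:
        queue.put(row[0])
    connection.close()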
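With page_size=500, two pages cover our 1000 rows. If the row count isn’t fixed, a hypothetical helper like the one below could derive the page queries from it (the count would come from something like SELECT COUNT(*) FROM SOURCE). It uses ceiling division so that a final partial page isn’t lost, and orders by VALUE so the pages stay stable between queries.

```python
def page_queries(total_rows, page_size=500):
    # Ceiling division: 1001 rows with page_size=500 need 3 pages, not 2
    n_pages = -(-total_rows // page_size)
    return [
        f"SELECT VALUE FROM SOURCE ORDER BY VALUE "
        f"LIMIT {page_size} OFFSET {page_size * page}"
        for page in range(n_pages)
    ]


for query in page_queries(1000):
    print(query)
```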

Now let’s create 2 producers, each reading 500 rows from the SOURCE table:

def main_threaded_paged():
    q = Queue()
    futures = []
    with ThreadPoolExecutor(max_workers=4) as executor:
        for page in range(2):
            producer_future = executor.submit(producer_paged, q, page)
            futures.append(producer_future)
        for _ in range(2):
            consumer_future = executor.submit(consumer_threaded, q)
            futures.append(consumer_future)

        for future in as_completed(futures):
            future.result()

The paged version runs about 2 times faster than the previous implementation:

CPU times: user 0 ns, sys: 62.5 ms, total: 62.5 ms
Wall time: 221 ms

Conclusion

Producer-consumer is a powerful design pattern for data pipelines. In Python it can be implemented with the standard library alone, using the queue and concurrent.futures modules.



© Alexey Smirnov 2021