1.1.2. Concurrency & Parallelism in Python#
Concurrency và parallelism chỉ hai nguyên tắc cơ bản của việc triển khai nhiệm vụ trong điện toán, với mỗi nguyên tắc lại có đặc điểm riêng.
Concurrency là tính năng của một chương trình để quản lý nhiều tác vụ cùng lúc mà không cần thực thi chúng ở cùng thời điểm chính xác. Nó xoay quanh ý tưởng xen kẽ các nhiệm vụ, chuyển đổi giữa chúng theo phương pháp hiện đồng thời.

Parallelism liên quan tới việc triển khai hàng loạt nhiệm vụ song song. Nó thường tận dụng nhiều lõi hay bộ vi xử lý CPU. Parallelism đạt được triển khai đồng thời thực sự, cho phép bạn thực hiện các nhiệm vụ nhanh hơn và phù hợp cho các hoạt động tính toán mở rộng.

Tầm quan trọng của Concurrency và Parallelism
Sử dụng tài nguyên: Concurrency cho phép sử dụng hiệu quả tài nguyên hệ thống, đảm bảo các nhiệm vụ đó đang tích cực tiến triển thay vì chờ đợi tài nguyên bên ngoài.
Phản hồi: Concurrency có thể cải thiện khả năng phản hồi của ứng dụng, nhất là trong bối cảnh liên quan tới giao diện người dùng hoặc web server.
Hiệu suất: Parallelism quan trọng trong việc đạt hiệu suất tối ưu, nhất là với những nhiệm vụ liên quan tới CPU như tính toán phức tạp, xử lý dữ liệu và mô phỏng.
Có thể mở rộng: Cả đồng thời và song song đều cần cho xây dựng các hệ thống có thể mở rộng.
Kiểm chứng trong tương lai: Khi xu hướng phần cứng liên tục ủng hộ các bộ vi xử lý đa lõi, khả năng khai thác tính song song sẽ ngày càng trở nên cần thiết.
Khi nào dùng concurrency và parallelism?
Lựa chọn giữa concurrency và parallelism phụ thuộc vào bản chất của nhiệm vụ và tính sẵn có của tài nguyên hardware.
Dùng concurrency khi xử lý những nhiệm vụ liên quan tới I/O, như đọc và ghi file hoặc tạo các truy vấn mạng, và khi lo ngại về hạn chế bộ nhớ.
Dùng multiprocessing khi bạn có các nhiệm vụ liên quan tới CPU mà có thể tận hưởng lợi ích từ parallelism thực sự và khi bạn có sự cô lập mạnh mẽ giữa các nhiệm vụ, nơi mà thất bại của một nhiệm vụ sẽ không ảnh hưởng tới các nhiệm vụ khác.
1.1.2.1. Các tác vụ I/O operations and CPU Operations#
Phân loại các Bound (tác vụ bị nghẽn bởi 1 vấn đề nào đó):
CPU Bound: là những chương trình bị giới hạn chủ yếu bởi tốc độ của CPU: việc tính toán small set of number,…–> Nên giải quyết bằng dùng CPU pool
I/O Bound: means the rate at which a process progresses is limited by the speed of the I/O subsystem. A task that processes data from disk, for example, counting the number of lines in a file is likely to be I/O bound. –> Nên dùng thread pool hoặc cơ chế bất đồng bộ tasks
Memory Bound: means the rate at which a process progresses is limited by the amount memory available and the speed of that memory access. A task that processes large amounts of in memory data, for example : nhân matrix lớn
Cache Bound: means the rate at which a process progress is limited by the amount and speed of the cache available. A task that simply processes more data than fits in the cache will be cache bound.
Ví dụ 1 số tác vụ I/O Bound:
Lấy data từ 1 client/Gửi data cho client được gửi thông qua network
Data được đọc/ghi giữa program với disk
Query dữ liệu từ database và trả về kết quả.
Gửi operation để database thực hiện
Remote API operation
For example of CPU Bound:
Audio or image processing.
Computer vision: an image is composed of millions of pixels, each pixel has 3 values / colors, processing that normally requires computing something on those pixels, all at the same time.
Machine Learning: it normally requires lots of “matrix” and “vector” multiplications. Think of a huge spreadsheet with numbers and multiplying all of them together at the same time.
Deep Learning: this is a sub-field of Machine Learning, so, the same applies. It’s just that there is not a single spreadsheet of numbers to multiply, but a huge set of them, and in many cases, you use a special processor to build and / or use those models.
Concurrency + Parallelism: Web + Machine Learning
With FastAPI you can take advantage of concurrency that is very common for web development.
But you can also exploit the benefits of parallelism and multiprocessing (having multiple processes running in parallel) for CPU bound workloads like those in Machine Learning systems.
That, plus the simple fact that Python is the main language for Data Science, Machine Learning and especially Deep Learning, make FastAPI a very good match for Data Science / Machine Learning web APIs and applications (among many others).
1.1.2.2. Concurrency#
Concurrency trong Python có thể bằng lập trình luồng hoặc không đồng bộ với thư viện asyncio
sử dụng async
và await
.
1.1.2.2.1. Threading (luồng)#
Threading là cơ chế đồng thời trong Python, cho phép bạn tạo và quản lý các nhiệm vụ trong một quá trình đơn giản. Thread phù hợp với những kiểu nhiệm vụ cụ thể, đặc biệt là tác vụ giới hạn I/O và có thể tận hưởng lợi ích từ việc thực thi đồng thời.
Mô đun threading của Python cung cấp giao diện cấp cao để tạo vào quản lý luồng. Trong khi GIL (Global Interpreter Lock) hạn chế các luồng về mặt parallelism thực sự, chúng vẫn có thể đạt được tính đồng thời bằng cách xen kẽ hiệu quả các nhiệm vụ.
Code bên dưới hiện một ví dụ triển khai tính đồng thời bằng thread. Nó dùng thư viện truy vấn Python để gửi một truy vấn HTTP, một nhiệm vụ khối I/O phổ biến. Nó cũng dùng mô đun thời gian để tính thời gian thực thi.
import requests
import time
import threading
urls = [
"https://www.google.com",
"https://www.wikipedia.org",
"https://www.makeuseof.com",
]
# hàm truy vấn một URL
def download_url(url):
response = requests.get(url)
print(f"Downloaded {url} - Status Code: {response.status_code}")
# Thực thi không có luồng và đo thời gian thực hiện
start_time = time.time()
for url in urls:
download_url(url)
end_time = time.time()
print(f"Sequential download took {end_time - start_time:.2f} seconds\n")
# Thực thi với luồng, reset thời gian đo thời điểm triển khai mới
start_time = time.time()
threads = []
for url in urls:
thread = threading.Thread(target=download_url, args=(url,))
thread.start()
threads.append(thread)
# Đợi tất cả phân luồng hoàn thành
for thread in threads:
thread.join()
end_time = time.time()
print(f"Threaded download took {end_time - start_time:.2f} seconds")
Downloaded https://www.google.com - Status Code: 200
Downloaded https://www.wikipedia.org - Status Code: 200
Downloaded https://www.makeuseof.com - Status Code: 200
Sequential download took 4.06 seconds
Downloaded https://www.google.com - Status Code: 200
Downloaded https://www.wikipedia.org - Status Code: 200
Downloaded https://www.makeuseof.com - Status Code: 200
Threaded download took 22.36 seconds
1.1.2.2.2. Lập trình bất đồng bộ#
1. Khái niệm về lập trình đồng bộ và bất đồng bộ
Lập trình đồng bộ (Synchronous programming): Trong lập trình đồng bộ, các tác vụ được thực thi theo thứ tự từ trên xuống dưới. Một tác vụ sẽ không bắt đầu cho đến khi tác vụ trước nó kết thúc. Điều này có thể gây ra sự lãng phí tài nguyên, đặc biệt khi các tác vụ phải đợi một số thao tác tốn thời gian (ví dụ như chờ phản hồi từ máy chủ, đọc/ghi tệp).
Lập trình bất đồng bộ (Asynchronous programming): Lập trình bất đồng bộ cho phép một chương trình xử lý nhiều tác vụ mà không cần phải đợi một tác vụ hoàn thành trước khi bắt đầu tác vụ tiếp theo. Thay vào đó, chương trình có thể thực hiện các tác vụ khác trong khi đợi một tác vụ tốn thời gian (như đọc dữ liệu từ internet hoặc file).
Trong lập trình bất đồng bộ, sử dụng: use await
inside of functions created with async def
.
async
được dùng để định nghĩa một hàm bất đồng bộ. Một hàm được khai báo bằng từ khóaasync
sẽ trả về một đối tượng coroutine. Khi gọi hàm này, nó sẽ không thực thi ngay mà trả về một coroutine object, và chỉ thực sự được thực thi khi chúng ta sử dụngawait
.await
được sử dụng để chờ đợi một coroutine. Nó cho phép chương trình tiếp tục thực thi các công việc khác trong khi đang chờ kết quả từ một tác vụ bất đồng bộ.await
chỉ có thể được sử dụng bên trong các hàm được khai báo với từ khóaasync
.
coroutine được python hiểu là 1 hàm function chạy như bình thường, nhưng bên trong nó có thể bị paused bất cứ khi nào gặp
await
, khi đó python có thể đi làm việc khác trong khi chờ kết quả.
2. Khi nào nên sử dụng async và await?
Bạn nên sử dụng async
và await
trong các trường hợp mà bạn có các tác vụ tốn thời gian (như kết nối mạng, đọc/ghi file, truy vấn cơ sở dữ liệu) và bạn muốn chương trình có thể tiếp tục thực hiện các tác vụ khác trong khi chờ đợi. Những tình huống này bao gồm:
Khi gọi API hoặc làm việc với các yêu cầu HTTP.
Khi truy xuất và ghi dữ liệu từ các tài nguyên như cơ sở dữ liệu, file hệ thống.
Khi làm việc với các tác vụ chờ đợi như nhập dữ liệu hoặc tương tác với phần cứng.
3. Ưu điểm của async và await
Hiệu quả hơn về mặt tài nguyên: Thay vì để CPU “nghỉ ngơi” trong khi đợi phản hồi từ server hoặc chờ tệp được đọc xong, chương trình có thể thực hiện các tác vụ khác.
Cải thiện tốc độ: Việc xử lý nhiều tác vụ đồng thời mà không phải chờ đợi từng tác vụ hoàn thành sẽ làm tăng hiệu suất, đặc biệt là trong các chương trình yêu cầu I/O nặng.
Dễ đọc và dễ quản lý: async và await giúp cho mã bất đồng bộ trở nên dễ đọc hơn so với cách tiếp cận dựa trên callback (gọi lại) hoặc threading.
4. Lưu ý
await
chỉ có thể được sử dụng bên trong hàm bất đồng bộ (hàm có từ khóaasync
).Các hàm bất đồng bộ (hàm có
async
) không thực thi ngay lập tức mà trả về một đối tượng coroutine. Để thực thi nó, bạn cần sử dụngawait
hoặc các phương pháp nhưasyncio.run()
.Mặc dù
async
vàawait
giúp mã dễ đọc hơn, nhưng không phải lúc nào chúng cũng là lựa chọn tốt nhất. Đối với các tác vụ đồng bộ (như tính toán CPU nặng), việc sử dụngasync
có thể không mang lại nhiều lợi ích.
Ví dụ: Giả sử chúng ta có một chương trình thực hiện việc tải xuống dữ liệu từ một server (thao tác này có thể mất thời gian), thay vì phải đợi chương trình tải xong dữ liệu mới thực hiện các tác vụ khác, chúng ta có thể sử dụng async
và await
để xử lý bất đồng bộ.
import asyncio
# Hàm bất đồng bộ để giả lập việc tải dữ liệu
async def download_data():
print("Bắt đầu tải dữ liệu...")
await asyncio.sleep(3) # Giả lập việc tải mất 3 giây
print("Tải dữ liệu hoàn tất!")
return "Dữ liệu đã tải"
# Hàm chính, gọi hàm download_data
async def main():
print("Bắt đầu công việc khác...")
result = await download_data() # Đợi download_data hoàn tất
print(f"Kết quả: {result}")
# Chạy chương trình
asyncio.run(main())
Giải thích:
async def download_data()
định nghĩa một hàm bất đồng bộ, sử dụngawait asyncio.sleep(3)
để giả lập việc chờ tải dữ liệu trong 3 giây.asyncio.sleep
là một hàm bất đồng bộ, vàawait
giúp dừng chương trình tại điểm đó, cho phép các tác vụ khác tiếp tục chạy trong khi chờ đợi.Trong hàm
main
, chúng ta sử dụngawait download_data()
để đợi cho quá trình tải dữ liệu hoàn thành trước khi tiếp tục.asyncio.run(main())
được sử dụng để thực thi run hàm main.
Test thử nhiều chương trình chạy cùng 1 lúc
import asyncio
# Giả lập việc tải dữ liệu cho từng trang web
async def download_page(page):
print(f"Bắt đầu tải trang {page}")
await asyncio.sleep(2)
print(f"Tải xong trang {page}")
return f"Dữ liệu từ trang {page}"
async def main():
# Tạo nhiều tác vụ tải trang cùng một lúc
tasks = [download_page(i) for i in range(1, 4)]
# Đợi tất cả các tác vụ hoàn thành
results = await asyncio.gather(*tasks)
print(f"Tất cả dữ liệu: {results}")
# Chạy chương trình nếu trong .py
# asyncio.run(main())
# Chạy chương trình nếu trong .ipynb
await main()
Bắt đầu tải trang 1
Bắt đầu tải trang 2
Bắt đầu tải trang 3
Tải xong trang 1
Tải xong trang 2
Tải xong trang 3
Tất cả dữ liệu: ['Dữ liệu từ trang 1', 'Dữ liệu từ trang 2', 'Dữ liệu từ trang 3']
1.1.2.3. Parallelism#
Bạn có thể triển khai song song bằng mô đun multiprocessing của Python, cho phép bạn tận dụng đầy đủ bộ vi xử lý đa lõi.
Mô đun multiprocessing của Python cung cấp cách đạt được sự song song bằng việc tạo những quá trình tách biệt với trình phiên dịch Python và không gian bộ nhớ riêng. Điều này vượt qua hiệu quả Global Interpreter Lock (GIL), khiến nó phù hợp với các nhiệm vụ liên quan tới CPU.
import requests # noqa: F811
import multiprocessing
import time
urls = [
"https://www.google.com",
"https://www.wikipedia.org",
"https://www.makeuseof.com",
]
# hàm truy vấn một URL
def download_url(url):
response = requests.get(url)
print(f"Downloaded {url} - Status Code: {response.status_code}")
def main():
# Tạo một pool multiprocessing với một số lượng quá trình được chỉ định
num_processes = len(urls)
pool = multiprocessing.Pool(processes=num_processes)
start_time = time.time()
pool.map(download_url, urls)
end_time = time.time()
# Đóng pool và đợi cho toàn bộ quá trình hoàn tất
pool.close()
pool.join()
print(f"Multiprocessing download took {end_time-start_time:.2f} seconds")
main()