I am porting a simple Python 3 script to AWS Lambda. It gathers information from a dozen S3 objects and returns the results.
The script used multiprocessing.Pool to gather all the files in parallel. However, multiprocessing cannot be used in an AWS Lambda environment, since /dev/shm is missing there.
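To illustrate, just constructing a Queue is enough to crash on Lambda; a minimal sketch (the exact errno may differ):

import multiprocessing

# On AWS Lambda this raises OSError: [Errno 38] Function not implemented,
# because Queue is backed by a POSIX semaphore that lives in /dev/shm.
queue = multiprocessing.Queue()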
So, instead of writing a dirty multiprocessing.Queue replacement, I thought I would try asyncio and aioboto3.
I am using the latest version of
aioboto3 (8.0.5) on Python 3.8.
My problem is that I cannot seem to get any improvement from an asyncio event loop multiplexing the downloads over a naive sequential download of the files.
Here are the two versions of my code.
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import aioboto3
import boto3

BUCKET = 'some-bucket'
KEYS = [...]  # the dozen object keys, elided here

async def download_aio():
    """Concurrent download of all objects from S3"""
    async with aioboto3.client('s3') as s3:
        objects = [s3.get_object(Bucket=BUCKET, Key=k) for k in KEYS]
        objects = await asyncio.gather(*objects)
        buffers = await asyncio.gather(*[o['Body'].read() for o in objects])

def download():
    """Sequentially download all objects from S3"""
    s3 = boto3.client('s3')
    for key in KEYS:
        obj = s3.get_object(Bucket=BUCKET, Key=key)
        obj['Body'].read()

def run_sequential():
    download()

def run_concurrent():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(download_aio())
The timings for both run_sequential() and run_concurrent() are quite similar (~3 seconds for a dozen 10 MB files).
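For reference, those numbers come from a plain wall-clock measurement around each call; a minimal sketch of the harness (not part of the snippet above):

from timeit import default_timer as timer

start = timer()
run_sequential()
print(f'sequential: {timer() - start:.2f}s')

start = timer()
run_concurrent()
print(f'concurrent: {timer() - start:.2f}s')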
I am convinced the concurrent version is not actually concurrent, for multiple reasons:
- I tried switching to ProcessPoolExecutor/ThreadPoolExecutor, and the processes/threads stay spawned for the whole duration of the function, though they are doing nothing
- The timings of the sequential and concurrent versions are very close, though my network interface is definitely not saturated and the CPU is not bound either
- The time taken by the concurrent version increases linearly with the number of files.
I am sure something is missing, but I just can’t wrap my head around what.
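In case it helps someone diagnose a similar situation: a small sketch, reusing the imports and constants from the snippet above, to check whether the downloads actually overlap (timed_get is an illustrative helper, not part of my script):

import time

async def timed_get(s3, key):
    # Log start/finish times per key; overlapping time windows across
    # keys would confirm the requests really run concurrently.
    start = time.monotonic()
    obj = await s3.get_object(Bucket=BUCKET, Key=key)
    await obj['Body'].read()
    print(f'{key}: {start:.2f} -> {time.monotonic():.2f}')

async def check_overlap():
    async with aioboto3.client('s3') as s3:
        await asyncio.gather(*[timed_get(s3, k) for k in KEYS])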
After losing some hours trying to understand how to use aioboto3 correctly, I decided to just switch to my backup solution.
I ended up rolling my own naive version of multiprocessing.Pool for use within an AWS Lambda environment. If someone stumbles across this thread in the future, here it is. It is far from perfect, but easy enough to drop in as a multiprocessing.Pool replacement for my simple cases.
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


class Pool:
    """Naive implementation of a process pool with mp.Pool API.

    This is useful since multiprocessing.Pool uses a Queue in /dev/shm, which
    is not mounted in an AWS Lambda environment.
    """

    def __init__(self, process_count=1):
        assert process_count >= 1
        self.process_count = process_count

    @staticmethod
    def wrap_pipe(pipe, index, func):
        # Wrap the user function so the worker sends (index, result) back
        # through its pipe; exceptions are passed back as the result.
        def wrapper(args):
            try:
                result = func(args)
            except Exception as exc:  # pylint: disable=broad-except
                result = exc
            pipe.send((index, result))
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        pass

    def map(self, function, arguments):
        pending = list(enumerate(arguments))
        running = []
        finished = [None] * len(pending)
        while pending or running:
            # Fill the running queue with new jobs
            while len(running) < self.process_count:
                if not pending:
                    break
                index, args = pending.pop(0)
                pipe_parent, pipe_child = Pipe(False)
                process = Process(
                    target=Pool.wrap_pipe(pipe_child, index, function),
                    args=(args,))
                process.start()
                running.append((index, process, pipe_parent))
            # Wait for jobs to finish
            for pipe in wait(list(map(lambda t: t[2], running))):
                index, result = pipe.recv()
                # Remove the finished job from the running list
                running = list(filter(lambda x: x[0] != index, running))
                # Add the result to the finished list
                finished[index] = result
        return finished
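Usage mirrors multiprocessing.Pool for the map() case; a quick sketch with a hypothetical fetch_one worker (BUCKET and KEYS as in the question):

import boto3

def fetch_one(key):
    # Hypothetical worker: download one object and return its size.
    body = boto3.client('s3').get_object(Bucket=BUCKET, Key=key)['Body'].read()
    return len(body)

with Pool(process_count=4) as pool:
    sizes = pool.map(fetch_one, KEYS)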