
Memory Usage With concurrent.futures.ThreadPoolExecutor In Python 3

I am building a script to download and parse benefits information for health insurance plans on Obamacare exchanges. Part of this requires downloading and parsing the plan benefit

Solution 1:

It's not your fault. as_completed() doesn't release its futures until it completes. There's an issue logged already: https://bugs.python.org/issue27144

For now, I think the most common approach is to wrap as_completed() inside another loop that splits the work into chunks of a sane number of futures, depending on how much RAM you want to spend and how big your results will be. It blocks on each chunk until all of its jobs are finished before moving on to the next chunk, so it will be slower and may stall in the middle for a long time, but I see no other way for now. I will update this answer when there's a smarter way.
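A minimal sketch of that chunking loop, using only the standard library. The names chunked and iter_results_in_chunks, and the default chunk_size of 1000, are my own placeholders and not from the question; tune the chunk size to your RAM budget.

```python
import concurrent.futures
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def iter_results_in_chunks(func, items, chunk_size=1000, max_workers=6):
    """Run func over items, keeping only one chunk of futures alive at a time."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for chunk in chunked(items, chunk_size):
            futures = [executor.submit(func, item) for item in chunk]
            # Blocks until every future in this chunk has finished.
            for future in concurrent.futures.as_completed(futures):
                yield future.result()
            # `futures` is rebound on the next iteration, so the previous
            # chunk's futures and results can be garbage collected.
```

Because it is a generator, the caller can handle each result as it arrives instead of building one big list, for example: for result in iter_results_in_chunks(load_json_url, formulary_urls): ...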

Solution 2:

As an alternative solution, you can call add_done_callback on your futures and not use as_completed at all. The key is NOT keeping references to the futures, so the future_to_url collection in the original question is a bad idea.

What I've done is basically:

def do_stuff(future):
    res = future.result()  # handle exceptions here if you need to

f = executor.submit(...)
f.add_done_callback(do_stuff)
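Here is a fuller, runnable sketch of the same pattern. The work function and the sum_doubled wrapper are hypothetical stand-ins for the real download-and-parse job; the point is that only a small aggregate is kept and no list or dict of futures exists.

```python
import concurrent.futures
import threading


def work(n):
    # Hypothetical stand-in for the real job.
    return n * 2


def sum_doubled(n_jobs, max_workers=4):
    total = 0
    lock = threading.Lock()

    def handle_result(future):
        # Called once the future finishes, normally in the worker thread.
        nonlocal total
        try:
            value = future.result()
        except Exception as exc:
            print('job failed: %s' % exc)
            return
        with lock:  # callbacks may run concurrently in several threads
            total += value

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for n in range(n_jobs):
            # The future is not stored anywhere after this line.
            executor.submit(work, n).add_done_callback(handle_result)
    # Leaving the with block waits for all jobs, so every callback has run.
    return total
```

Note that the executor still queues every submitted job internally until a worker picks it up, so this frees results early but does not limit how many pending jobs sit in memory.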

Solution 3:

If you use the standard module "concurrent.futures" and want to process several million items at once, the queue of pending work will take up all the free memory.

You can use bounded-pool-executor. https://github.com/mowshon/bounded_pool_executor

pip install bounded-pool-executor

example:

from bounded_pool_executor import BoundedProcessPoolExecutor
from time import sleep
from random import randint

def do_job(num):
    sleep_sec = randint(1, 10)
    print('value: %d, sleep: %d sec.' % (num, sleep_sec))
    sleep(sleep_sec)

with BoundedProcessPoolExecutor(max_workers=5) as worker:
    for num in range(10000):
        print('#%d Worker initialization' % num)
        worker.submit(do_job, num)
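If you would rather not add a dependency, a similar effect can be sketched with the standard library by guarding submit() with a semaphore. I have not checked how bounded_pool_executor is implemented internally, so treat this as an illustration of the idea only; the BoundedSubmitter class and its max_pending parameter are hypothetical names of my own.

```python
import concurrent.futures
import threading


class BoundedSubmitter:
    """Hypothetical helper: submit() blocks while max_pending jobs are outstanding."""

    def __init__(self, executor, max_pending):
        self._executor = executor
        self._slots = threading.BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()  # wait for a free slot before queueing more work
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except BaseException:
            self._slots.release()  # nothing was queued, so give the slot back
            raise
        # Free the slot as soon as the job finishes, fails, or is cancelled.
        future.add_done_callback(lambda _future: self._slots.release())
        return future
```

Usage looks the same as with a plain executor: create a ThreadPoolExecutor, wrap it in BoundedSubmitter(executor, max_pending=100), and call submit on the wrapper inside your loop.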

Solution 4:

dodysw has correctly pointed out that the common solution is to chunkify the inputs and submit chunks of tasks to the executor. He has also correctly pointed out that you lose some performance by waiting for each chunk to be processed completely before starting to process the next chunk.

I suggest a better solution that will feed a continuous stream of tasks to the executor while enforcing an upper bound on the maximum number of parallel tasks in order to keep the memory footprint low.

The trick is to use concurrent.futures.wait to keep track of the futures that have been completed and those that are still pending completion:

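import concurrent.futures
import json
import urllib.request

def load_json_url(url):
    # Returns (parsed_json, None) on success and (url, exception) on failure.
    try:
        req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
        resp = urllib.request.urlopen(req).read().decode('utf8')
        return json.loads(resp), None
    except Exception as e:
        return url, e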

MAX_WORKERS = 6
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures_done = set()
    futures_notdone = set()
    for url in formulary_urls:
        futures_notdone.add(executor.submit(load_json_url, url))

        if len(futures_notdone) >= MAX_WORKERS:
            done, futures_notdone = concurrent.futures.wait(futures_notdone, return_when=concurrent.futures.FIRST_COMPLETED)
            futures_done.update(done)

# Leaving the with block waits for the remaining futures, so collect them too.
futures_done.update(futures_notdone)

# Process results.
downloaded_plans = 0
for future in futures_done:
    result, exc = future.result()
    if exc:
        # On failure, load_json_url returns (url, exception).
        print('%r generated an exception: %s' % (result, exc))
    else:
        downloaded_plans += 1
        # drugid and plansid_dict come from the original question.
        for item in result:
            if item['rxnorm_id'] == drugid:
                for row in item['plans']:
                    print(row['drug_tier'])
                    (plansid_dict[row['plan_id']])['drug_tier'] = row['drug_tier']
                    (plansid_dict[row['plan_id']])['prior_authorization'] = row['prior_authorization']
                    (plansid_dict[row['plan_id']])['step_therapy'] = row['step_therapy']
                    (plansid_dict[row['plan_id']])['quantity_limit'] = row['quantity_limit']

Of course, you could also process the results periodically inside the loop in order to empty futures_done from time to time. For example, you could do that each time the number of items in futures_done exceeds 1000 (or any other amount that fits your needs). This might come in handy if your dataset is very large and the results alone would take up a lot of memory.
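A rough sketch of that periodic flush, under the assumption that each result can be handled independently. The process_stream function, its handle callback, and the flush_at threshold are hypothetical names I made up for illustration.

```python
import concurrent.futures


def process_stream(func, items, handle, max_in_flight=6, flush_at=1000):
    """Submit items continuously and pass finished results to handle in batches."""
    done_buffer = set()
    pending = set()

    def flush():
        for future in done_buffer:
            handle(future.result())
        done_buffer.clear()  # drop the references so the results can be freed

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_in_flight) as executor:
        for item in items:
            pending.add(executor.submit(func, item))
            if len(pending) >= max_in_flight:
                # Block until at least one job finishes before submitting more.
                done, pending = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED)
                done_buffer.update(done)
            if len(done_buffer) >= flush_at:
                flush()
        # Collect whatever is still running after the last submit.
        done, pending = concurrent.futures.wait(pending)
        done_buffer.update(done)
        flush()
```

With this shape, at most max_in_flight unfinished futures plus roughly flush_at finished ones are referenced at any time, regardless of how many URLs you feed in.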
