
Running Rpy2 In Parallel Using Multiprocessing Raises Weird Exception That Cannot Be Caught

So this is a problem that I have not been able to solve, nor do I know of a good way to make an MCVE out of it. Essentially, it has been briefly discussed here, but as the comm…

Solution 1:

(...) Long story short (...)

Really?

  1. How might this issue be overcome? A working code example that does not raise the issue will be accepted as the answer, even if it does not answer any other question, provided no other answer does better or was posted earlier.

Answers may leave quite a bit of work on your end...

  2. Is my understanding of Python imports accurate, or am I missing the point about multiple instances of R? If I am wrong, how should I edit the import statements so that a new instance is created within each subprocess? Answers to this question are likely to point me towards a probable solution, and will be accepted, provided no answer does better or was posted earlier.

Python packages/modules are imported only once per process, which means that all code using a package/module within that process shares the same single import (you do not get a fresh copy per import statement).
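A quick illustration of this caching behavior, using only the standard library:

import sys
import math
import math as math_again

print(math is math_again)           # True: both names bind the same module object
print(sys.modules['math'] is math)  # True: later imports reuse the cached entry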

Because of this, I'd recommend using an initializer function when creating your Pool, rather than repeatedly importing rpy2 and setting up the conversion each time a task is sent to a worker. You may also gain performance this way if each task is short.

forecast = None  # set inside each worker by worker_init
ro = None        # likewise; this resolves the FIXME in the original snippet

def arima_select(y, order):
    # 'forecast' and 'ro' are module-level globals that worker_init assigns
    # inside each worker process, so they are visible here.
    res = forecast.Arima(y, order=ro.FloatVector(order))
    return res

def worker_init():
    # Runs once per worker process: do the rpy2 imports and the pandas
    # conversion setup there, and load the R 'forecast' package.
    global forecast, ro
    import rpy2.robjects as ro
    from rpy2.robjects.packages import importr
    from rpy2.robjects import pandas2ri
    pandas2ri.activate()
    forecast = importr('forecast')

def applyParallel(groups, func):
    import pandas as pd
    from multiprocessing import Pool, cpu_count
    with Pool(cpu_count(), worker_init) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])
  3. Is R shared among all instances of rpy2, or is there an instance of R for each instance of rpy2? Answers to this question will be accepted only if they lead to a resolution of the problem.

rpy2 makes R available by linking its C shared library: there is one such library per Python process, and it is stateful (R cannot handle concurrency). I think that your issue has more to do with object serialization (see http://rpy2.readthedocs.io/en/version_2.8.x/robjects_serialization.html#object-serialization) than with concurrency.
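To illustrate the serialization point: multiprocessing pickles everything that crosses a process boundary, and pickling an rpy2 object delegates to R's own serialization. A minimal sketch (a simple vector survives the round trip; the complex object returned by forecast.Arima apparently does not):

import pickle
import rpy2.robjects as ro

# multiprocessing pickles arguments and return values behind the scenes;
# rpy2 objects implement pickling via R's serialization mechanism.
v = ro.FloatVector([1.0, 2.0, 3.0])
v2 = pickle.loads(pickle.dumps(v))
print(list(v2))  # [1.0, 2.0, 3.0]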

What is happening is some apparent confusion when reconstructing the R objects after Python pickled the rpy2 object. More specifically, looking at the R object types mentioned in the error message:

>>> from rpy2.rinterface import str_typeint
>>> str_typeint(6)
'LANGSXP'
>>> str_typeint(24)
'RAWSXP'

I am guessing that the R object returned by forecast.Arima contains an unevaluated R expression (for example, the call that led to that result object), and when serialized and unserialized it comes back as something different (a raw vector of bytes). This is possibly a bug in R's own serialization mechanism (which rpy2 uses under the hood). For now, to work around your issue, you may want to extract from the result of forecast.Arima only what you care most about, and return just that from the function run by the worker.
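A minimal sketch of that workaround, building on the worker code above; the assumption here is that the 'aic' and 'coef' entries of the fitted model are what you care about:

def arima_select(y, order):
    # Fit in R, then return only plain Python values, so nothing
    # problematic needs to be pickled on the way back from the worker.
    res = forecast.Arima(y, order=ro.FloatVector(order))
    fit = dict(zip(res.names, list(res)))
    return {'order': order,
            'aic': fit['aic'][0],       # plain float
            'coef': list(fit['coef'])}  # plain list of floats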

Solution 2:

The following changes to the arima_select function in the pseudo code presented in the question work:

import numpy as np
import pandas as pd
from rpy2 import rinterface as ri

ri.initr()

def arima_select(y, order, const=('TRUE',)):
    # NOTE: 'const' was undefined in the original snippet; it is added
    # here as a parameter so the code runs as written.

    def rimport(packname):
        # Load an R package via require() and return its environment.
        as_environment = ri.baseenv['as.environment']
        require = ri.baseenv['require']
        require(ri.StrSexpVector([packname]),
                quiet=ri.BoolSexpVector((True, )))
        packname = ri.StrSexpVector(['package:' + str(packname)])
        pack_env = as_environment(packname)
        return pack_env

    frcst = rimport("forecast")
    args = (('y', ri.FloatSexpVector(y)),
            ('order', ri.FloatSexpVector(order)),
            ('include.constant', ri.StrSexpVector(const)))
    return frcst['Arima'].rcall(args, ri.globalenv)

Keeping the rest of the pseudo code the same. Note that I have since optimized the code further; it does not require all the functions presented in the question. Basically, the following is necessary and sufficient:

import numpy as np
import pandas as pd
from rpy2 import rinterface as ri

ri.initr()

def arima(y, order=(1,1,1)):
    # This is the same as arima_select above, just renamed to arima
    ...

def applyParallel(groups, func):
    from multiprocessing import Pool, cpu_count
    # No initializer is needed here: ri.initr() already ran at import time,
    # and each forked worker inherits the initialized R.
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for _, group in groups])
    return pd.concat(ret_list, keys=[name for name, _ in groups])

def main():
    # Create your df in your favorite way:
    def data_gen(start_day):
        r = pd.Series(pd.date_range('2016-09-{}'.format(str(start_day)),
                                    periods=24*60, freq='T'),
                      name='tstamp')
        d = pd.Series(np.random.randint(10, 80, 1440), name='val')
        s = pd.Series(['sensor1']*1440, name='sensor')
        return pd.concat([s, r, d], axis=1)
    df = pd.concat([data_gen(day) for day in range(1, 8)], ignore_index=True)

    applyParallel(df.groupby(['sensor', pd.Grouper(key='tstamp', freq='D')]),
                  arima)  # one may use functools.partial to pass order to arima
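
One detail worth making explicit: with multiprocessing the entry point should be guarded, so that worker processes do not re-execute main() on platforms that spawn rather than fork (e.g. Windows):

if __name__ == '__main__':
    main()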

Note that I also do not call arima directly from applyParallel, since my goal is to find the best model for the given series (for each sensor and day). Instead I use a function arima_wrapper to iterate through the order combinations, calling arima at each iteration.
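That wrapper is not shown in the post; a hypothetical sketch of what it might look like, picking the order with the lowest AIC over a small grid (the grid bounds, the 'val' column name, and AIC as the selection criterion are all assumptions):

from itertools import product

def arima_wrapper(group, max_p=2, max_d=1, max_q=2):
    # Hypothetical: try every (p, d, q) combination in a small grid and
    # keep the fit with the lowest AIC, extracted with R's `[[` operator.
    extract = ri.baseenv['[[']
    best_fit, best_aic = None, float('inf')
    for order in product(range(max_p + 1), range(max_d + 1), range(max_q + 1)):
        fit = arima(group['val'].astype(float).values, order=order)
        aic = extract(fit, ri.StrSexpVector(['aic']))[0]
        if aic < best_aic:
            best_fit, best_aic = fit, aic
    return best_fit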
