From a00fffd93970d29614fbef72f1102db46b05e209 Mon Sep 17 00:00:00 2001
From: Olivier Grisel <olivier.grisel@ensta.org>
Date: Tue, 1 Jul 2014 13:54:11 +0200
Subject: [PATCH] MAINT bump joblib to 0.8.2

---
 sklearn/externals/joblib/__init__.py          |   2 +-
 sklearn/externals/joblib/parallel.py          |   2 +-
 sklearn/externals/joblib/pool.py              |  28 +++--
 sklearn/externals/joblib/test/test_hashing.py |   9 +-
 sklearn/externals/joblib/test/test_logger.py  |   4 +-
 sklearn/externals/joblib/test/test_pool.py    | 112 +++++++++++++-----
 6 files changed, 109 insertions(+), 48 deletions(-)

diff --git a/sklearn/externals/joblib/__init__.py b/sklearn/externals/joblib/__init__.py
index 7583377a62..39c7fdf2e2 100644
--- a/sklearn/externals/joblib/__init__.py
+++ b/sklearn/externals/joblib/__init__.py
@@ -100,7 +100,7 @@ Main features
 
 """
 
-__version__ = '0.8.1'
+__version__ = '0.8.2'
 
 from .memory import Memory, MemorizedResult
 
diff --git a/sklearn/externals/joblib/parallel.py b/sklearn/externals/joblib/parallel.py
index a4d59cd28a..e7a5b0b945 100644
--- a/sklearn/externals/joblib/parallel.py
+++ b/sklearn/externals/joblib/parallel.py
@@ -352,7 +352,7 @@ class Parallel(Logger):
     [Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished
     '''
     def __init__(self, n_jobs=1, backend=None, verbose=0, pre_dispatch='all',
-                 temp_folder=None, max_nbytes=100e6, mmap_mode='c'):
+                 temp_folder=None, max_nbytes=100e6, mmap_mode='r'):
        self.verbose = verbose
        self._mp_context = None
        if backend is None:
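Note on the parallel.py change above: the default mmap_mode for
auto-memmapped input arrays switches from 'c' (copy-on-write) to 'r'
(read-only). Worker processes still read shared data without per-child
memory copies, but an in-place write on a memmapped input now raises
instead of silently landing in a private copy. A minimal illustrative
sketch of the caller-visible behavior (the array size, the max_nbytes
value and the helper function are examples, not part of this patch):

    import numpy as np
    from sklearn.externals.joblib import Parallel, delayed

    def total(x):
        # Reading the shared, read-only view is fine; `x[0] = 42` would
        # now raise because the buffer is memmapped with mode='r'.
        return x.sum()

    data = np.ones(int(1e6))  # ~8 MB
    # max_nbytes=10 lowers the memmapping threshold so that this array is
    # shared via a memmap for the demo (the default threshold is 100e6).
    sums = Parallel(n_jobs=2, max_nbytes=10)(
        delayed(total)(data) for _ in range(4))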
diff --git a/sklearn/externals/joblib/pool.py b/sklearn/externals/joblib/pool.py
index 2666710f65..486ca69eaf 100644
--- a/sklearn/externals/joblib/pool.py
+++ b/sklearn/externals/joblib/pool.py
@@ -491,7 +491,7 @@ class MemmapingPool(PicklingPool):
     """
 
     def __init__(self, processes=None, temp_folder=None, max_nbytes=1e6,
-                 mmap_mode='c', forward_reducers=None, backward_reducers=None,
+                 mmap_mode='r', forward_reducers=None, backward_reducers=None,
                  verbose=0, context_id=None, prewarm=False, **kwargs):
         if forward_reducers is None:
             forward_reducers = dict()
@@ -502,33 +502,35 @@ class MemmapingPool(PicklingPool):
         # pool instance (do not create in advance to spare FS write access if
         # no array is to be dumped):
         use_shared_mem = False
+        pool_folder_name = "joblib_memmaping_pool_%d_%d" % (
+            os.getpid(), id(self))
         if temp_folder is None:
             temp_folder = os.environ.get('JOBLIB_TEMP_FOLDER', None)
         if temp_folder is None:
             if os.path.exists(SYSTEM_SHARED_MEM_FS):
                 try:
-                    joblib_folder = os.path.join(
-                        SYSTEM_SHARED_MEM_FS, 'joblib')
-                    if not os.path.exists(joblib_folder):
-                        os.makedirs(joblib_folder)
+                    temp_folder = SYSTEM_SHARED_MEM_FS
+                    pool_folder = os.path.join(temp_folder, pool_folder_name)
+                    if not os.path.exists(pool_folder):
+                        os.makedirs(pool_folder)
                     use_shared_mem = True
                 except IOError:
-                    # Missing rights in the the /dev/shm partition, ignore
-                    pass
+                    # Missing rights in the /dev/shm partition,
+                    # fallback to regular temp folder.
+                    temp_folder = None
         if temp_folder is None:
             # Fallback to the default tmp folder, typically /tmp
             temp_folder = tempfile.gettempdir()
         temp_folder = os.path.abspath(os.path.expanduser(temp_folder))
-        self._temp_folder = temp_folder = os.path.join(
-            temp_folder, "joblib_memmaping_pool_%d_%d" % (
-                os.getpid(), id(self)))
+        pool_folder = os.path.join(temp_folder, pool_folder_name)
+        self._temp_folder = pool_folder
 
         # Register the garbage collector at program exit in case caller forgets
         # to call terminate explicitly: note we do not pass any reference to
         # self to ensure that this callback won't prevent garbage collection of
         # the pool instance and related file handler resources such as POSIX
         # semaphores and pipes
-        atexit.register(lambda: delete_folder(temp_folder))
+        atexit.register(lambda: delete_folder(pool_folder))
 
         if np is not None:
             # Register smart numpy.ndarray reducers that detects memmap backed
@@ -537,7 +539,7 @@ class MemmapingPool(PicklingPool):
             if prewarm == "auto":
                 prewarm = not use_shared_mem
             forward_reduce_ndarray = ArrayMemmapReducer(
-                max_nbytes, temp_folder, mmap_mode, verbose,
+                max_nbytes, pool_folder, mmap_mode, verbose,
                 context_id=context_id, prewarm=prewarm)
             forward_reducers[np.ndarray] = forward_reduce_ndarray
             forward_reducers[np.memmap] = reduce_memmap
@@ -547,7 +549,7 @@ class MemmapingPool(PicklingPool):
             # to avoid confusing the caller and make it tricky to collect the
             # temporary folder
             backward_reduce_ndarray = ArrayMemmapReducer(
-                None, temp_folder, mmap_mode, verbose)
+                None, pool_folder, mmap_mode, verbose)
             backward_reducers[np.ndarray] = backward_reduce_ndarray
             backward_reducers[np.memmap] = reduce_memmap
 
diff --git a/sklearn/externals/joblib/test/test_hashing.py b/sklearn/externals/joblib/test/test_hashing.py
index e69089b2b8..15a5da9a33 100644
--- a/sklearn/externals/joblib/test/test_hashing.py
+++ b/sklearn/externals/joblib/test/test_hashing.py
@@ -32,6 +32,11 @@ except NameError:
     unicode = lambda s: s
 
 
+def assert_less(a, b):
+    if a > b:
+        raise AssertionError("%r is not lower than %r" % (a, b))
+
+
 ###############################################################################
 # Helper functions for the tests
 def time_func(func, *args):
@@ -200,7 +205,7 @@ def test_hash_numpy_performance():
     md5_hash = lambda x: hashlib.md5(getbuffer(x)).hexdigest()
 
     relative_diff = relative_time(md5_hash, hash, a)
-    nose.tools.assert_true(relative_diff < 0.1)
+    assert_less(relative_diff, 0.3)
 
     # Check that hashing a tuple of 3 arrays takes approximately
     # 3 times as much as hashing one array
@@ -208,7 +213,7 @@ def test_hash_numpy_performance():
     time_hashlib = 3 * time_func(md5_hash, a)
     time_hash = time_func(hash, (a, a, a))
     relative_diff = 0.5 * (abs(time_hash - time_hashlib)
                            / (time_hash + time_hashlib))
-    nose.tools.assert_true(relative_diff < 0.2)
+    assert_less(relative_diff, 0.3)
 
 
 def test_bound_methods_hash():
diff --git a/sklearn/externals/joblib/test/test_logger.py b/sklearn/externals/joblib/test/test_logger.py
index d371baf63f..e29af47dad 100644
--- a/sklearn/externals/joblib/test/test_logger.py
+++ b/sklearn/externals/joblib/test/test_logger.py
@@ -59,8 +59,8 @@ def test_print_time():
         print_time(unicode('Foo'))
         printed_text = sys.stderr.getvalue()
         # Use regexps to be robust to time variations
-        match = r"Foo: 0\..s, 0\.0min\nFoo: 0\..s, 0.0min\nFoo: " + \
-                r".\..s, 0.0min\n"
+        match = r"Foo: 0\..s, 0\..min\nFoo: 0\..s, 0..min\nFoo: " + \
+                r".\..s, 0..min\n"
         if not re.match(match, printed_text):
             raise AssertionError('Expected %s, got %s' %
                                  (match, printed_text))
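Note on the pool.py change above: each MemmapingPool instance now gets its
own "joblib_memmaping_pool_<pid>_<id>" folder, created directly under
/dev/shm when that partition is writable and under the regular temp folder
otherwise, instead of all pools sharing a single /dev/shm/joblib directory.
A failed /dev/shm setup now falls back to the regular temp folder rather
than being silently ignored. A short sketch of how this can be observed
(the printed paths are illustrative):

    from sklearn.externals.joblib.pool import MemmapingPool

    p = MemmapingPool(2, max_nbytes=10)
    print(p._temp_folder)
    # e.g. /dev/shm/joblib_memmaping_pool_4242_13970... on Linux with a
    # writable /dev/shm, or /tmp/joblib_memmaping_pool_4242_... otherwise.
    p.terminate()  # removes the per-pool folder, as the tests below verify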
diff --git a/sklearn/externals/joblib/test/test_pool.py b/sklearn/externals/joblib/test/test_pool.py
index 20d45e1497..4b274d8c99 100644
--- a/sklearn/externals/joblib/test/test_pool.py
+++ b/sklearn/externals/joblib/test/test_pool.py
@@ -54,16 +54,40 @@ def teardown_temp_folder():
 
 with_temp_folder = with_setup(setup_temp_folder, teardown_temp_folder)
 
 
-def double(input):
-    """Dummy helper function to be executed in subprocesses"""
+def setup_if_has_dev_shm():
+    if not os.path.exists('/dev/shm'):
+        raise SkipTest("This test requires the /dev/shm shared memory fs.")
+
+
+with_dev_shm = with_setup(setup_if_has_dev_shm)
+
+
+def check_array(args):
+    """Dummy helper function to be executed in subprocesses
+
+    Check that the provided array has the expected values in the provided
+    range.
+
+    """
     assert_array_equal = np.testing.assert_array_equal
+    data, position, expected = args
+    assert_equal(data[position], expected)
 
-    data, position, expected = input
-    if expected is not None:
-        assert_equal(data[position], expected)
+
+def inplace_double(args):
+    """Dummy helper function to be executed in subprocesses
+
+    Check that the input array has the expected values in the provided
+    range and perform an inplace modification to double the values in
+    that range.
+
+    """
+    assert_array_equal = np.testing.assert_array_equal
+    data, position, expected = args
+    assert_equal(data[position], expected)
     data[position] *= 2
-    if expected is not None:
-        assert_array_equal(data[position], 2 * expected)
+    assert_equal(data[position], 2 * expected)
 
 
 @with_numpy
@@ -210,18 +234,18 @@ def test_pool_with_memmap():
         a = np.memmap(filename, dtype=np.float32, shape=(3, 5), mode='w+')
         a.fill(1.0)
 
-        p.map(double, [(a, (i, j), 1.0)
-                       for i in range(a.shape[0])
-                       for j in range(a.shape[1])])
+        p.map(inplace_double, [(a, (i, j), 1.0)
+                               for i in range(a.shape[0])
+                               for j in range(a.shape[1])])
 
         assert_array_equal(a, 2 * np.ones(a.shape))
 
         # Open a copy-on-write view on the previous data
         b = np.memmap(filename, dtype=np.float32, shape=(5, 3), mode='c')
 
-        p.map(double, [(b, (i, j), 2.0)
-                       for i in range(b.shape[0])
-                       for j in range(b.shape[1])])
+        p.map(inplace_double, [(b, (i, j), 2.0)
+                               for i in range(b.shape[0])
+                               for j in range(b.shape[1])])
 
         # Passing memmap instances to the pool should not trigger the creation
         # of new files on the FS
@@ -235,12 +259,12 @@ def test_pool_with_memmap():
         c = np.memmap(filename, dtype=np.float32, shape=(10,), mode='r',
                       offset=5 * 4)
-        assert_raises(AssertionError, p.map, double,
+        assert_raises(AssertionError, p.map, check_array,
                       [(c, i, 3.0) for i in range(c.shape[0])])
 
         # depending on the version of numpy one can either get a RuntimeError
         # or a ValueError
-        assert_raises((RuntimeError, ValueError), p.map, double,
+        assert_raises((RuntimeError, ValueError), p.map, inplace_double,
                       [(c, i, 2.0) for i in range(c.shape[0])])
     finally:
         # Clean all filehandlers held by the pool
@@ -270,9 +294,9 @@ def test_pool_with_memmap_array_view():
         assert_false(isinstance(a_view, np.memmap))
         assert_true(has_shareable_memory(a_view))
 
-        p.map(double, [(a_view, (i, j), 1.0)
-                       for i in range(a.shape[0])
-                       for j in range(a.shape[1])])
+        p.map(inplace_double, [(a_view, (i, j), 1.0)
+                               for i in range(a.shape[0])
+                               for j in range(a.shape[1])])
 
         # Both a and the a_view have been updated
         assert_array_equal(a, 2 * np.ones(a.shape))
@@ -307,7 +331,7 @@ def test_memmaping_pool_for_large_arrays():
         small = np.ones(5, dtype=np.float32)
         assert_equal(small.nbytes, 20)
-        p.map(double, [(small, i, 1.0) for i in range(small.shape[0])])
+        p.map(check_array, [(small, i, 1.0) for i in range(small.shape[0])])
 
         # Memory has been copied, the pool filesystem folder is unused
         assert_equal(os.listdir(TEMP_FOLDER), [])
@@ -315,16 +339,9 @@ def test_memmaping_pool_for_large_arrays():
         # Try with a file larger than the memmap threshold of 40 bytes
         large = np.ones(100, dtype=np.float64)
         assert_equal(large.nbytes, 800)
-        p.map(double, [(large, i, 1.0) for i in range(large.shape[0])])
+        p.map(check_array, [(large, i, 1.0) for i in range(large.shape[0])])
 
-        # By defaul, the mmap_mode is copy-on-write to make the pool
-        # process able to modify their view individually as if they would have
-        # received their own copy of the original array. The original array
-        # (which is not a shared memmap instance is untouched)
-        assert_false(has_shareable_memory(large))
-        assert_array_equal(large, np.ones(100))
-
-        # The data has been dump in a temp folder for subprocess to share it
+        # The data has been dumped in a temp folder for subprocess to share it
         # without per-child memory copies
         assert_true(os.path.isdir(p._temp_folder))
         dumped_filenames = os.listdir(p._temp_folder)
@@ -352,7 +369,7 @@ def test_memmaping_pool_for_large_arrays_disabled():
         # Try with a file larger than the memmap threshold of 40 bytes
         large = np.ones(100, dtype=np.float64)
         assert_equal(large.nbytes, 800)
-        p.map(double, [(large, i, 1.0) for i in range(large.shape[0])])
+        p.map(check_array, [(large, i, 1.0) for i in range(large.shape[0])])
 
         # Check that the tempfolder is still empty
         assert_equal(os.listdir(TEMP_FOLDER), [])
@@ -363,6 +380,43 @@ def test_memmaping_pool_for_large_arrays_disabled():
     del p
 
 
+@with_numpy
+@with_multiprocessing
+@with_dev_shm
+def test_memmaping_on_dev_shm():
+    """Check that MemmapingPool uses /dev/shm when it is available"""
+    p = MemmapingPool(3, max_nbytes=10)
+    try:
+        # Check that the pool has correctly detected the presence of the
+        # shared memory filesystem.
+        pool_temp_folder = p._temp_folder
+        folder_prefix = '/dev/shm/joblib_memmaping_pool_'
+        assert_true(pool_temp_folder.startswith(folder_prefix))
+        assert_true(os.path.exists(pool_temp_folder))
+
+        # Try with a file larger than the memmap threshold of 10 bytes
+        a = np.ones(100, dtype=np.float64)
+        assert_equal(a.nbytes, 800)
+        p.map(id, [a] * 10)
+
+        # a should have been memmaped to the pool temp folder: the joblib
+        # pickling procedure generates a .pkl and a .npy file:
+        assert_equal(len(os.listdir(pool_temp_folder)), 2)
+
+        b = np.ones(100, dtype=np.float64)
+        assert_equal(b.nbytes, 800)
+        p.map(id, [b] * 10)
+
+        # A copy of both a and b is now stored in the shared memory folder
+        assert_equal(len(os.listdir(pool_temp_folder)), 4)
+
+    finally:
+        # Cleanup open file descriptors
+        p.terminate()
+        del p
+
+    # The temp folder is cleaned up upon pool termination
+    assert_false(os.path.exists(pool_temp_folder))
+
+
 @with_numpy
 @with_multiprocessing
 @with_temp_folder
-- 
GitLab
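Note: the (RuntimeError, ValueError) pair asserted in test_pool_with_memmap
above reflects a real numpy difference: writing through a memmap opened
with mode='r' raises a different exception type depending on the numpy
version. A standalone sketch of that behavior, independent of the pool
machinery (the file path is illustrative):

    import numpy as np

    fname = '/tmp/readonly_demo.mmap'  # illustrative path
    np.ones(10, dtype=np.float32).tofile(fname)
    c = np.memmap(fname, dtype=np.float32, shape=(10,), mode='r')
    try:
        c[0] *= 2  # same in-place write pattern as inplace_double
    except (RuntimeError, ValueError):
        # Writes to a read-only memmap are rejected; the exception type
        # varies across numpy versions, as the test above notes.
        print('write rejected as expected')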