首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使numba @jit使用所有cpu内核(并行化numba @jit)

如何使numba @jit使用所有cpu内核(并行化numba @jit)
EN

Stack Overflow用户
提问于 2017-08-10 09:51:34
回答 2查看 12.7K关注 0票数 20

我正在使用@jit装饰器在python中添加两个numpy数组。如果我使用@jitpython相比,性能是如此之高。

然而,这是,即使我传入@numba.jit(nopython = True, parallel = True, nogil = True),也没有利用所有的CPU核心

有没有办法利用numba @jit的所有CPU核心。

这是我的代码:

代码语言:javascript
复制
import time                                                
import numpy as np                                         
import numba                                               

SIZE = 2147483648 * 6                                      

a = np.full(SIZE, 1, dtype = np.int32)                     

b = np.full(SIZE, 1, dtype = np.int32)                     

c = np.ndarray(SIZE, dtype = np.int32)                     

@numba.jit(nopython = True, parallel = True, nogil = True) 
def add(a, b, c):                                          
    for i in range(SIZE):                                  
        c[i] = a[i] + b[i]                                 

start = time.time()                                        
add(a, b, c)                                               
end = time.time()                                          

print(end - start)                                        
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2017-08-10 11:20:42

您可以将parallel=True传递给任何numba跳转函数,但这并不意味着它总是在使用所有内核。您必须理解,numba使用一些启发式方法使代码并行执行,有时这些启发式方法在代码中根本找不到任何可并行化的东西。目前有一个拉请求,如果不可能使它“并行”,它就会发出警告。因此,它更像是一个“请让它尽可能并行执行”参数,而不是“强制并行执行”。

但是,如果您确实知道可以并行化代码,则始终可以手动使用线程或进程。只是适应了从numba文档中使用多线程的示例

代码语言:javascript
复制
#!/usr/bin/env python
from __future__ import print_function, division, absolute_import

import math
import threading
from timeit import repeat

import numpy as np
from numba import jit

nthreads = 4
size = 10**7  # CHANGED

# CHANGED
def func_np(a, b):
    """
    Control function using Numpy.
    """
    return a + b

# CHANGED
@jit('void(double[:], double[:], double[:])', nopython=True, nogil=True)
def inner_func_nb(result, a, b):
    """
    Function under test.
    """
    for i in range(len(result)):
        result[i] = a[i] + b[i]

def timefunc(correct, s, func, *args, **kwargs):
    """
    Benchmark *func* and print out its runtime.
    """
    print(s.ljust(20), end=" ")
    # Make sure the function is compiled before we start the benchmark
    res = func(*args, **kwargs)
    if correct is not None:
        assert np.allclose(res, correct), (res, correct)
    # time it
    print('{:>5.0f} ms'.format(min(repeat(lambda: func(*args, **kwargs),
                                          number=5, repeat=2)) * 1000))
    return res

def make_singlethread(inner_func):
    """
    Run the given function inside a single thread.
    """
    def func(*args):
        length = len(args[0])
        result = np.empty(length, dtype=np.float64)
        inner_func(result, *args)
        return result
    return func

def make_multithread(inner_func, numthreads):
    """
    Run the given function inside *numthreads* threads, splitting its
    arguments into equal-sized chunks.
    """
    def func_mt(*args):
        length = len(args[0])
        result = np.empty(length, dtype=np.float64)
        args = (result,) + args
        chunklen = (length + numthreads - 1) // numthreads
        # Create argument tuples for each input chunk
        chunks = [[arg[i * chunklen:(i + 1) * chunklen] for arg in args]
                  for i in range(numthreads)]
        # Spawn one thread per chunk
        threads = [threading.Thread(target=inner_func, args=chunk)
                   for chunk in chunks]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        return result
    return func_mt


func_nb = make_singlethread(inner_func_nb)
func_nb_mt = make_multithread(inner_func_nb, nthreads)

a = np.random.rand(size)
b = np.random.rand(size)

correct = timefunc(None, "numpy (1 thread)", func_np, a, b)
timefunc(correct, "numba (1 thread)", func_nb, a, b)
timefunc(correct, "numba (%d threads)" % nthreads, func_nb_mt, a, b)

我突出显示了我更改的部分,其他所有内容都是从示例逐字复制的。这利用了我的机器上的所有核心(4台核心机器,因此4条线程),但没有显示出明显的加速:

代码语言:javascript
复制
numpy (1 thread)       539 ms
numba (1 thread)       536 ms
numba (4 threads)      442 ms

在这种情况下,多线程的加速比不足,这是一种带宽有限的操作。这意味着从数组加载元素并将结果放入结果数组比实际添加要花费更多的时间。

在这些情况下,您甚至可以看到由于并行执行而出现的减速!

只有当函数更复杂,实际操作要比加载和存储数组元素花费大量时间时,并行执行才会有很大的改进。numba文档中的例子如下:

代码语言:javascript
复制
def func_np(a, b):
    """
    Control function using Numpy.
    """
    return np.exp(2.1 * a + 3.2 * b)

@jit('void(double[:], double[:], double[:])', nopython=True, nogil=True)
def inner_func_nb(result, a, b):
    """
    Function under test.
    """
    for i in range(len(result)):
        result[i] = math.exp(2.1 * a[i] + 3.2 * b[i])

这实际上(几乎)随着线程数而扩展,因为两个乘法、一个加法和一个对math.exp的调用比加载和存储结果要慢得多:

代码语言:javascript
复制
func_nb = make_singlethread(inner_func_nb)
func_nb_mt2 = make_multithread(inner_func_nb, 2)
func_nb_mt3 = make_multithread(inner_func_nb, 3)
func_nb_mt4 = make_multithread(inner_func_nb, 4)

a = np.random.rand(size)
b = np.random.rand(size)

correct = timefunc(None, "numpy (1 thread)", func_np, a, b)
timefunc(correct, "numba (1 thread)", func_nb, a, b)
timefunc(correct, "numba (2 threads)", func_nb_mt2, a, b)
timefunc(correct, "numba (3 threads)", func_nb_mt3, a, b)
timefunc(correct, "numba (4 threads)", func_nb_mt4, a, b)

结果:

代码语言:javascript
复制
numpy (1 thread)      3422 ms
numba (1 thread)      2959 ms
numba (2 threads)     1555 ms
numba (3 threads)     1080 ms
numba (4 threads)      797 ms
票数 20
EN

Stack Overflow用户

发布于 2018-08-22 17:40:31

为了完整起见,2018年(numba v. 0.39)你可以

代码语言:javascript
复制
from numba import prange

并在原来的函数定义中将range替换为prange,就是这样。

这立即使CPU利用率达到100%,在我的例子中,运行时的速度从2.9秒提高到1.7秒(对于SIZE = 2147483648 * 1,在有16个核心32个线程的机器上)。

更复杂的内核通常可以通过传入fastmath=True来加快速度。

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45610292

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档