Python grequests闲话

5,037次阅读
没有评论

共计 4907 个字符,预计需要花费 13 分钟才能阅读完成。

前段时间看到这个grequests库,感觉还是蛮有意思的,所以今天来对这个库拆解拆解。这个库是崇拜的大神kennethreitz写的。Github地址:https://github.com/kennethreitz/grequests

首先看到文档上给的示例:

import grequests


urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://fakedomain/',
'http://kennethreitz.com'
]
# 创建没有发送的request集合
rs = (grequests.get(u) for u in urls)
# 发送
grequests.map(rs)


# 为了防止超时和异常发生,可以指定一个异常处理器
def exception_handler(request, exception):
print("Request failed")


reqs = [
grequests.get('http://httpbin.org/delay/1', timeout=0.001),
grequests.get('http://fakedomain/'),
grequests.get('http://httpbin.org/status/500')]


grequests.map(reqs, exception_handler=exception_handler)


另外,可以使用imap来提高性能

根据这个示例,我们来看看源代码

AsyncRequest

首先来看grequests.get

get = partial(AsyncRequest, 'GET')
options = partial(AsyncRequest, 'OPTIONS')

 

它和其他诸多HTTP方法一样,只是一个快捷方式,其本质是调用了AsyncRequest,看到这个名字就应该知道是异步的Request,所以应该是对普通的Request做了封装和修改:

class AsyncRequest(object):
""" 异步的Request,接收和Session.request相同的参数,还有一些额外的参数
session: 发送请求的session
callback: 在返回对象上的回调函数,和传递hooks={'response': callback}一样
"""
    def __init__(self, method, url, **kwargs):
    #: Request method
      self.method = method
    #: URL to request
      self.url = url
    #: Associated ``Session``
      self.session = kwargs.pop('session', None)
      if self.session is None:
      # requests里的Session对象
        self.session = Session()


      callback = kwargs.pop('callback', None)
      if callback:
        kwargs['hooks'] = {'response': callback}

     #:  The rest arguments for ``Session.request``
      self.kwargs = kwargs
     #: Resulting ``  Response``
      self.response = None

 

可以看到使用partial(AsyncRequest, 'GET')会使得method默认为GET,然后再rs = (grequests.get(u) for u in urls)会形成一个生成器,里面生成AsyncRequest对象。下面来看看这些request是如何发送的:

map

之后程序会调用grequests.map(rs),那么我们来看看map:

def map(requests, stream=False, size=None, exception_handler=None, gtimeout=None):
    """并发的将Requests列表转换成响应
    requests: Request对象的集合
    stream: 如果为True,那么响应内容不会立即下载
    size: 指定同时发起的请求数目,如果为None,就不会有限制
    exception_handler: 回调函数,当异常发生的时候调用,参数是Request和Exception
    gtimeout: Gevent合并所有的超时时间,单位为秒(与每个request的超时时间无关)
    """
    # 将生成器直接转换成list
    requests = list(requests)
    # gevent的Pool对象,是时候研究一波gevent了
    pool = Pool(size) if size else None
    # 调用send函数来发送请求
    jobs = [send(r, pool, stream=stream) for r in requests]
    # 等待所有的greenlet处理单元结束运行
    gevent.joinall(jobs, timeout=gtimeout)
    ret = []
    # 处理所有的请求响应,并且处理异常
    for request in requests:
        if request.response is not None:
            ret.append(request.response)
        # 如果有异常处理器并且request有异常进行处理
        elif exception_handler and hasattr(request, 'exception'):
            ret.append(exception_handler(request, request.exception))
        else:
            # 否则结果置为None
            ret.append(None)
    return ret

 

可以看到map函数很简单,大体流程就是建立了greenlet处理器池,然后对每个request进行调用,然后等待结束,最后得到响应并且处理响应。

# 如果给定size则创建greenlet池,否则为None
pool = Pool(size) if size else None
jobs = [send(r, pool, stream=stream) for r in requests]
gevent.joinall(jobs, timeout=gtimeout)

 

所以问题的关键还是在于gevent那几行的调用,创建管理greenlet的池,用来限制并发,创建好了之后,调用这个pool然后去发送请求,最后等待所有的greenlet结束。

Gevent的并发

无论创建还是没有创建池,最终是要调用send方法的,来看看这个函数:

def send(r, pool=None, stream=False):
    """使用指定的pool发送request对象,如果pool没有指定,这个方法就会阻塞,Pools很有用,因为你可以指定并发限制"""
    if pool is not None:
        return pool.spawn(r.send, stream=stream)
    return gevent.spawn(r.send, stream=stream)

 

gevent.spawn

gevent.spawn创建一个新的Greenlet对象,并且排定运行调用function(*args, **kwargs),这个可以使用gevent.spawn或者是Greenlet.spawn,其实gevent.spawn就是Greenlet.spawn,并且最后会调用Greenlet的类方法,首先实例化一个对象,然后调用start方法,所以也相当于调用Greenlet(*args, **kwargs)

这也是类方法的一个用法,另外的实例化对象的方法

@classmethod
def spawn(cls, *args, **kwargs):
    g = cls(*args, **kwargs)
    g.start()
    return g

pool.spawn

这个方法使用给定的参数开始一个新的greenlet,通常是传递给Greenlet构造函数,并且将其加入这个pool管理的greenlets集合

Pool是Group的子类,提供了限制并发的方法,其spawn方法在greenlets数目达到上限的时候阻塞,直到有一个可用的greenlet。

这个方法也是使用pool实例为Greenlet创建一个实例,然后start it

def spawn(self, *args, **kwargs):
    greenlet = self.greenlet_class(*args, **kwargs)
    self.start(greenlet)
    return greenlet

 

r.send

这个是AsyncRequest的send方法,比较简单,就是发送请求,等待响应。

def send(self, **kwargs):
    merged_kwargs = {}
    merged_kwargs.update(self.kwargs)
    merged_kwargs.update(kwargs)
    try:
        self.response = self.session.request(self.method, self.url, **merged_kwargs)
    except Exception as e:
        self.exception = e
        self.traceback = traceback.format_exc()
    return self

 

imap

imap据说可以提高性能,快来看看吧:

def imap(requests, stream=False, size=2, exception_handler=None):
    """并发的将Request对象的生成器转换成响应的生成器。
    requests: Request对象的生成器
    stream: 如果为True,则不会立即自动下载
    size: 同时发起的请求数,默认为2
    exception_handler: 当发生异常时候回调
    """
    pool = Pool(size)
    def send(r):
        return r.send(stream=stream)
    for request in pool.imap_unordered(send, requests):
        if request.response is not None:
            yield request.response
        elif exception_handler:
            exception_handler(request, request.exception)
    pool.join()

 

可以看到这个函数主要是使用了pool.imap_unordered,其实pool还有一个方法是imap

pool.imap

itertools.imap()是一致的,itertools.imap()可以用于迭代无穷序列,比如itertools.imap(lambda x, y: x * y, [10, 20, 30], itertools.count(1)),如果两个序列长短不一致,以短的为准,并且imap实现了惰性计算,类似生成器。

pool.imap可以并行运行,按顺序从迭代对象中取出元素迭代,应用在函数上,然后收集结果。

如果限制了可以同时进行的greenlets数量,那么最多只有这么多个任务同时进行。

pool.imap_unordered

和imap一样,返回的结果顺序是随意的,比起imap更加轻量级,如果顺序不重要的话,首先应该选用这个。

join

等待这个group的greenlets都运行完,如果这个group没有greenlet的话,立即返回

可以看到,如果不要求顺序的话,imap_unordered会比imap更加高效,同时imap版本肯定比map版本性能好,因为map版本必须全部运行完才能拿数据,但是imap版本只要有greenlet有结果就可以取出来。

小结

这个库就这么多内容,其实主要是使用gevent封装了一层requests,所以核心就是使用gevent,gevent怎么用,如何用,待我继续研究。

不过通过看这个库,也了解到了简单的gevent的用法

 

 

正文完
请博主喝杯咖啡吧!
post-qrcode
 
admin
版权声明:本站原创文章,由 admin 2020-04-05发表,共计4907字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
验证码