requests中的公共函数utils.py

# 将字典转换为元素为(key, val)的列表
In [1]: def dict_to_sequnce(d):
             if hasattr(d, 'items'):
                 d = d.items()
             return d
In [2]: d = {'a': 1, 'b': 2}

In [3]: dict_to_sequnce(d)
Out[3]: dict_items([('a', 1), ('b', 2)])


# 获取各种对象的长度,如str,BytesIO,文件
# 通过hasattr(o, 'getvalue')判断对象是是否是BytesIO,StringIO
# 通过hasattr(o, 'fileno')判断是否是文件对象
# 文件和标准输入都有tell方法,而标准输入调用tell()方法则会产生异常,当遇到类似情况会把对象长度置为0,交给请求来获取它的长度。
def super_len(o):
    total_length = 0
    current_position = 0

    if hasattr(o, '__len__'):
        total_length = len(o)

    elif hasattr(o, 'len'):
        total_length = o.len

    elif hasattr(o, 'getvalue'):
        # e.g. BytesIO, cStringIO.StringIO
        total_length = len(o.getvalue())

    elif hasattr(o, 'fileno'):
        try:
            fileno = o.fileno()
        except io.UnsupportedOperation:
            pass
        else:
            total_length = os.fstat(fileno).st_size

            # Having used fstat to determine the file length, we need to
            # confirm that this file was opened up in binary mode.
            if 'b' not in o.mode:
                warnings.warn((
                    "Requests has determined the content-length for this "
                    "request using the binary size of the file: however, the "
                    "file has been opened in text mode (i.e. without the 'b' "
                    "flag in the mode). This may lead to an incorrect "
                    "content-length. In Requests 3.0, support will be removed "
                    "for files in text mode."),
                    FileModeWarning
                )

    if hasattr(o, 'tell'):
        try:
            current_position = o.tell()
        except (OSError, IOError):
            # This can happen in some weird situations, such as when the file
            # is actually a special file descriptor like stdin. In this
            # instance, we don't know what the length is, so set it to zero and
            # let requests chunk it instead.
            current_position = total_length

    return max(0, total_length - current_position)


# 如果提供的url中没有scheme,则将new_scheme作为url的scheme,如果有则不改变
def prepend_scheme_if_needed(url, new_scheme):
    scheme, netloc, path, params, query, fragment = urlparse(url, new_scheme)

    if not netloc:
        netloc, path = path, netloc #应对netloc为空的特殊情况

    return urlunparse((scheme, netloc, path, params, query, fragment))

# 去掉url中的用户名和密码部分
# 如http://root:root@www.baidu.com/,返回为http://www.baidu.com/
def urldefragauth(url):
    scheme, netloc, path, params, query, fragment = urlparse(url)

    if not netloc:
        netloc, path = path, netloc
    netloc = netloc.rsplit('@', 1)[-1]

    return urlunparse((scheme, netloc, path, params, query, ''))


_CLEAN_HEADER_REGEX_BYTE = re.compile(b'^\\S[^\\r\\n]*$|^$') #\S匹配任意非空格字符,--
_CLEAN_HEADER_REGEX_STR = re.compile(r'^\S[^\r\n]*$|^$') # 匹配以非空开头,非\r\n结尾或者空白字符串
# 检验头的有效性防止header injection
def check_header_validity(header):
    name, value = header

    if isinstance(value, bytes):
        pat = _CLEAN_HEADER_REGEX_BYTE
    else:
        pat = _CLEAN_HEADER_REGEX_STR
    try:
        if not pat.match(value):
            raise InvalidHeader("Invalid return character or leading space in header: %s" % name)
    except TypeError:
        raise InvalidHeader("Header value %s must be of type str or bytes, "
                            "not %s" % (value, type(value)))


def get_auth_from_url(url):
    parsed = urlparse(url)

    try:
        auth = (unquote(parsed.username), unquote(parsed.password))
    except (AttributeError, TypeError):
        auth = ('', '')

    return auth      

def is_ipv4_address(string_ip):
    try:
        socket.inet_aton(string_ip)
    except socket.error:
        return False
    return True 


def is_valid_cidr(string_network):
    """
    判断是否是类似192.168.2.1/16的地址
    """
    if string_network.count('/') == 1:
        try:
            mask = int(string_network.split('/')[1])
        except ValueError:
            return False

        if mask < 1 or mask > 32:
            return False

        try:
            socket.inet_aton(string_network.split('/')[0])
        except socket.error:
            return False
    else:
        return False
    return True


# native string 指类型为str的字符串,python2与python3中str是不一样的(http://img.hysyeah.top/2017/10/02/python%E7%BC%96%E7%A0%81/)
# Http请求/响应头和元数据要求数据为str类型。
def to_native_string(string, encoding='ascii'):
    if isinstance(string, builtin_str):
        out = string
    else:
        if is_py2:
            out = string.encode(encoding)
        else:
            out = string.decode(encoding)

    return out



# 将(key,val)为元素的列表转换为OrderedDict
def from_key_val_list(value):
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    return OrderedDict(value)


# 将dict转换为list
def to_key_val_list(value):
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    if isinstance(value, collections.Mapping):
        value = value.items()

    return list(value)

Ref:
1.header injection
2.requests/utils.py