# 将字典转换为元素为(key, val)的列表
In [1]: def dict_to_sequnce(d):
if hasattr(d, 'items'):
d = d.items()
return d
In [2]: d = {'a': 1, 'b': 2}
In [3]: dict_to_sequnce(d)
Out[3]: dict_items([('a', 1), ('b', 2)])
# 获取各种对象的长度,如str,BytesIO,文件
# 通过hasattr(o, 'getvalue')判断对象是是否是BytesIO,StringIO
# 通过hasattr(o, 'fileno')判断是否是文件对象
# 文件和标准输入都有tell方法,而标准输入调用tell()方法则会产生异常,当遇到类似情况会把对象长度置为0,交给请求来获取它的长度。
def super_len(o):
total_length = 0
current_position = 0
if hasattr(o, '__len__'):
total_length = len(o)
elif hasattr(o, 'len'):
total_length = o.len
elif hasattr(o, 'getvalue'):
# e.g. BytesIO, cStringIO.StringIO
total_length = len(o.getvalue())
elif hasattr(o, 'fileno'):
try:
fileno = o.fileno()
except io.UnsupportedOperation:
pass
else:
total_length = os.fstat(fileno).st_size
# Having used fstat to determine the file length, we need to
# confirm that this file was opened up in binary mode.
if 'b' not in o.mode:
warnings.warn((
"Requests has determined the content-length for this "
"request using the binary size of the file: however, the "
"file has been opened in text mode (i.e. without the 'b' "
"flag in the mode). This may lead to an incorrect "
"content-length. In Requests 3.0, support will be removed "
"for files in text mode."),
FileModeWarning
)
if hasattr(o, 'tell'):
try:
current_position = o.tell()
except (OSError, IOError):
# This can happen in some weird situations, such as when the file
# is actually a special file descriptor like stdin. In this
# instance, we don't know what the length is, so set it to zero and
# let requests chunk it instead.
current_position = total_length
return max(0, total_length - current_position)
# 如果提供的url中没有scheme,则将new_scheme作为url的scheme,如果有则不改变
def prepend_scheme_if_needed(url, new_scheme):
scheme, netloc, path, params, query, fragment = urlparse(url, new_scheme)
if not netloc:
netloc, path = path, netloc #应对netloc为空的特殊情况
return urlunparse((scheme, netloc, path, params, query, fragment))
# 去掉url中的用户名和密码部分
# 如http://root:root@www.baidu.com/,返回为http://www.baidu.com/
def urldefragauth(url):
scheme, netloc, path, params, query, fragment = urlparse(url)
if not netloc:
netloc, path = path, netloc
netloc = netloc.rsplit('@', 1)[-1]
return urlunparse((scheme, netloc, path, params, query, ''))
_CLEAN_HEADER_REGEX_BYTE = re.compile(b'^\\S[^\\r\\n]*$|^$') #\S匹配任意非空格字符,--
_CLEAN_HEADER_REGEX_STR = re.compile(r'^\S[^\r\n]*$|^$') # 匹配以非空开头,非\r\n结尾或者空白字符串
# 检验头的有效性防止header injection
def check_header_validity(header):
name, value = header
if isinstance(value, bytes):
pat = _CLEAN_HEADER_REGEX_BYTE
else:
pat = _CLEAN_HEADER_REGEX_STR
try:
if not pat.match(value):
raise InvalidHeader("Invalid return character or leading space in header: %s" % name)
except TypeError:
raise InvalidHeader("Header value %s must be of type str or bytes, "
"not %s" % (value, type(value)))
def get_auth_from_url(url):
parsed = urlparse(url)
try:
auth = (unquote(parsed.username), unquote(parsed.password))
except (AttributeError, TypeError):
auth = ('', '')
return auth
def is_ipv4_address(string_ip):
try:
socket.inet_aton(string_ip)
except socket.error:
return False
return True
def is_valid_cidr(string_network):
"""
判断是否是类似192.168.2.1/16的地址
"""
if string_network.count('/') == 1:
try:
mask = int(string_network.split('/')[1])
except ValueError:
return False
if mask < 1 or mask > 32:
return False
try:
socket.inet_aton(string_network.split('/')[0])
except socket.error:
return False
else:
return False
return True
# native string 指类型为str的字符串,python2与python3中str是不一样的(http://img.hysyeah.top/2017/10/02/python%E7%BC%96%E7%A0%81/)
# Http请求/响应头和元数据要求数据为str类型。
def to_native_string(string, encoding='ascii'):
if isinstance(string, builtin_str):
out = string
else:
if is_py2:
out = string.encode(encoding)
else:
out = string.decode(encoding)
return out
# 将(key,val)为元素的列表转换为OrderedDict
def from_key_val_list(value):
if value is None:
return None
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
return OrderedDict(value)
# 将dict转换为list
def to_key_val_list(value):
if value is None:
return None
if isinstance(value, (str, bytes, bool, int)):
raise ValueError('cannot encode objects that are not 2-tuples')
if isinstance(value, collections.Mapping):
value = value.items()
return list(value)