@spiritnotes
2016-06-14T16:43:54.000000Z
字数 28877
阅读 3156
Python
读书笔记
DOING
a, b, c = iterable
a, _, _ = iterable
a, b, c = itreable #变量可以为空
可以嵌套: a, _, (_, c) = iterable
* 在处理函数参数很有用
collections.deque(maxlen=N)
q.append(v),q.appendleft(v),q.pop(),q.popleft()
heapq.nlargest(N, numbers, key=)
heapq.nsmallest(N, numbers, key=)
heapq.heapify(list_), heapq.heappop(list_)
使用 heapq, 复杂度 O(logN)
heapq.heappush(q, (-priority, index, item)) #index对于相同优先级排位使用
heapq.heappop(q)[-1]
值用列表或者字典来表示
collections.defaultdict
dict_.setdefault(key, default_value).append(v) #每次调用都创建一个default_value
collections.OrderedDict,迭代时严格按照元素添加的次序
内部维护了一个双向链表,空间使用较多
value_key = min/max/sorted(zip(dict_.values(),dict_.keys()))
注意 zip 创建了一个迭代器,只能消费一次
key_min = min(dict_)
key_min_value = min(dict_, key=lambda x:dict_[x])
a.keys() & b.keys()
a.items() & b.items()
key-view对象支持常见的集合运算,& | -
item-view对象也支持
values()方法不支持,values可能不是唯一的
使用生成器+set实现,如果不支持hash,手动配置一个hash
def get_unique_same_order(l):
seen = set()
for i in l:
if i not in seen:
yield i
seen.add(i)
shares = slice(begin, end)
slice_.start, slice_.stop, slice_.step
collections.Counter
counter_.most_common/update/['key']/['key']+=1
counter_1 +/- counter_2
sorted(dicts, key=operator.itemgetter('key','key2')) #比lambda更快
sorted(dicts, key=lambda dict: dict['key'],dict['key2'])
sorted(objs, key=operator.attrgetter('attr1','attr2'))
sorted(objs, key=lambda obj_:obj_.attr1,obj.attr2)
先排序,然后使用 itertools.groupby()
datas.sort(key=attrgetter('date'))
for date, items in groupby(key=attrgetter('date')):
for i in items:
pass
使用一个defaultdict(list)进行记录,避免排序,更快
列表推导式 [k for k in l if xx(k)]
生成表达式 (k for k in l if xx(k))
filter(xx, l)产生一个迭代器
itertools.compress(iterable, bool_iterable)
字典推导式 {key:val for key,val in dict_.items() if xx(y)}
nametupel_name = collections.namedtuple(nametupel_name, ['key1','key2',...])
_ = nametupel_name(v1, v2)
_.key1, _.key2
namedtuple是不可变的,所需空间比字典少
_ = _._replace(key1 = v3)
如果实例对象修改多,则最好使用slots属性的类
需要调用换算reduction(sum、max、min、any)之前先要进行转换或筛选
生成器表达式 sum(f(i) for i in l if xx)
有些reduction函数支持 key 参数进行先转换
collections.ChainMap
支持字典的那些操作,使用的是原始的字典,重复键显示前面的
修改映射的操作总是会作用在列出的第一个映射结构上
map_['x'] = 1
map_ = map_.new_child() 产生一个新map并放在前面
map_ = map_.parents 移动到后面的map
可以使用字典以及update方法实现,这样会创建一个新字典
使用正则表达式
import re
re.split(r'[;,\s]\s*', line)
如果使用了捕获组(capture group括号),则分隔符也会保留下来
注: "(?:,|;|\s)\s*"也是一种书写方式而不是捕获组
使用 startswith(str/tuple)/endswith(str/tuple)
可以使用切片 [:-2] == x,该方法性能更快
使用re re.match(....)
from fnmatch import fnmatch,fnmatchcase
fnmatch('foo.txt','*.txt') #与底层的大小写规则相同
fnmatch() #大小写相关
简单模式使用基本的字符串方法
赋值模式使用正则表达式
match方法总是在字符串的开头找匹配项
findall方法找到字符串中所有匹配项
将部分模式用括号包起来的方式引入捕获组
findall以列表的方式返回,finditer以迭代的方式返回
多次执行匹配和查找,应该先将模型编译然后重复使用
简单文本:使用 str_.replace()
复杂文本:使用 re.sub(r'(\d+)/(\d+)/(\d+), r'\3-\1-\2), txt_)
再复杂,使用回调函数 re.sub(r'..', fun_, txt_),回调函数的输入参数是一个匹配对象,由match或find返回,可以用group方法来获取匹配中特定的部分
使用 re.subn()除了替换外还返回替换的次数
使用re re.findall(..., flags=re.IGNORECASE)
如果替换需要的是同样结果,可能需要支撑函数
def match(str_):
def repalce(m):
text = m.group()
if text.isupper():
return str_.upper()
elif text.islower():
return str_.lower()
elif text[0].isupper()
return str_.capitalize()
else:
return str_
return replace
re.sub('python',match('snake'),text,flags=re.IGNORECASE)
模式的*或者+号操作符后面加上?修饰符
.号不匹配换行符,因此可以采用 '.|\n'
re.DOTALL,使得.号可以匹配所有字符
有的文本有多种表现形式
使用unicodedata.normalize('NFC', s1) ...
NFC:全组成
NFD:组合字符
考虑使用第三方库
strip lstrip rstrip,支持自定义多个字符
对于处于中间的,replace, re.sub('\s+',' ')
translate,建立小型的转换表
ljust rjust center,并可以可选填充字符
format(text, '>20s')
format(text, '>10.2f')
'{:-^20s}'.format(text)
字符串在序列或可迭代对象中 -- 使用join()方法,实际使用中最好考虑生成器
字符串较少 -- 使用 +
较复杂的格式 -- 使用format
源码中字符串合并 -- 直接连续写 'aaa''bbb'
python本身不支持,使用var()
name, n = 'a', 20
'{name} has {n} ... '.format_map(var())
# some class instance a
a.name, a.n = 'b', 10
'{name} has {n} ... '.format_map(var(a))
# 使用定义了missing的类来包裹
class safesub(dict):
def __missing__(self, key):
return '('+key+')'
'{name} has {n} ... '.format_map(safesub(var()))
def sub(text):
return text.format_map(safesub(sys._getframe(1).f_locals))
os.get_terminal_size().columns()
textwarp.fill(s, 40, subsequent_indent='')
转为html,转义特殊字符 -- html.escape(s, quote=False)
转回使用 html.parser.HTMLParser().unescape(s)
使用正则表达式的命名组来识别词以及类型
NAME = r'(?P<NAME>[a-zA-Z_][a-zA-Z_0-9]*)'
NUM = r'(?P<NUM>\d+')
TIMES = r'(?P<TIMES>\*)'
...
master_pat = re.compile('|'.join([NAME, ... WS]) # re 按照指定的顺序对模式进行匹配
scanner = match_pat.scanner(s='foo = 42')
while ..
_ = scanner.match()
_.lastgroup, _.group()
for m in iter(scanner.match, None):
yield m.lastgroup, m.group()
对于复杂的使用PyParsing/PLY解析工具
基本上与str一样,除了c[0]在b和str中返回不一样,b中返回数值
round(value, ndigits), ndigits可以为负数,取整到十位、百位等
使用decimal模块
可以使用decimal.localcontext()进行精确控制
控制位数、对齐、包含千位分隔符
使用内建format函数,format(x, '<10.1f')
整数转为文本格式,使用 bin、hex、oct函数
要去掉前面的0x、0o、0b,使用format函数 format(x, 'b')
int.from_bytes(data, 'little')
int_.to_bytes(16, 'big')
可以使用struct模块,struct解包的整数大小是有限制的
int_obj.bit_length() 来获得位数
产生 complex(real, imag) 或者 a + bj
a.real / a.imag / a.conjugate()
需要使用复数,则使用 cmath 模块
float('inf') float('-inf') float('nan')
math.isinf(x) math.isnan(x)
NaN的微妙特性是做比较时从不会被判定为相等
float('inf') == float('inf') => False
float('inf') == float('inf') => False
模块fractions
使用numpy库
numpy.linalg
random.choice 不取走
random.sample 取走
random.suffle 洗牌
random.randint 随机整数
random.random 0~1之间小数
random.getrandbits 随机nbits数
datetime中timedelta
复杂应用可以使用 dateutil 模块
使用date_.weekday() weekdays.index(x)以及7进行运算
可以使用 dateutl 直接计算
start = date.today().replace(day=1)
_, days_in_month = calendar.monthrange(start.year, start.month)
end = start + timedelta(days=days_in_month)
datetime.strptime(s, forat) 性能慢
datetime.strftime(x, format_)
使用 pytz 模块
建立将所有时间转为 UTC 时间再进行操作,世界统一时间
使用 next(x) 函数,手动捕捉 StopIteration
使用 next(x, None) 函数,使其返回指定值
原理
直接将对self的迭代转移到其它对象中
def __iter__(self):
return iter(other_obj/self.child)
生成器不需要调用 iter, 直接使用 next
使用生成器,简化编写 iter next等函数
使用内建 reversed(x) 函数,要求对象实现了 __reversed__() 函数
想要定义一个生成器函数,但是需要向外部暴露状态信息
实现成一个类,类具有状态,定义其 __iter__ 为生成器函数,也可以将其实现为闭包
lines = GeneratorClass(Some_Status)
for line in lines:
....
对迭代器和生成器做切片操作,使用 itertools.islice(c, 10, 20) 函数
islice 会消耗掉迭代器或生成器里面的数据
itertools.dropwhile(lambda ..., x) 根据条件跳过
itertools.islice(c, 5, None) 跳过前5行
itertools.permutations(items, n)
itertools.combinations(items, n)
iteryools.combinations_with_replacement(....) 元素可重复出现
enumerate(lists, start=0)
for n, (x,y) in enumerate(data) 元组的列表
使用zip函数,以较短的输入序列长度为准
itertools.zip_longest(a,v,fillvalue=0)
dict(zip(headers, values))
itertools.chain(a, b, c)
生成器函数是一种实现管道机制的好方法。
lognames = gen_find('access-log','www')
files = gen_opener(lognames)
lines = gen_concatenate(files)
pylines = gen_grep('(?i)python', lines)
for line in pylines:
...
# 注意python3中引入的 yield from
def flatten(items, ignore_type=(str, bytes)):
for x in items:
if isinstance(x, Iterable) and not isinstance(x, ignore_types):
yield from flatten(x)
else:
yield x
## yield from x 相当于 如下,但是有优化
## for i in x:
## yield i
for c in heapq.merge(ordered_a, ordered_b):
...
# 只是简单取两个队列,将小的发出去,不会一次性取整个队列
支持一个无参的可调用对象以及一个哨兵(结束)值作为输入
for chunk in iter(lambda: f.read(10),b''):
....
文件读取的默认编码方式 sys.getdefaultencoding()
open函数中可以带 encoding参数, erros参数
print(xxx, file=x)
print(xxx, file=f) # 必须以文本模式打开
使用 print 以及 sep、end 参数
也可以使用 join,其缺点是其序列元素必须为字符串
使用 open 函数的 wb、rb 参数
bytes 对象 索引会返回整数
将字符串二进制方式写入文件,需要对字符串编码
底层数组或c结构体可以直接二进制读写
使用 xt、xb 模式(python3.3)创建文件并写入,如文件存在则报错
io.StringIO 以及 io.BytesIO 可以模仿文件等执行 io 操作
write / read / getvalue
可以用于单元测试中
使用 gzip.open(...) / bz2.open(...)
可以对已经打开的二进制模式的文件进行叠加操作,可以同各种类型的类文件对象如套接字、管道、内存文件一起工作
使用 iter() 和 functools.partial() 来完成
records = iter(partial(f.read, RECORD_SIZE), b'')
for r in records:
...
使用文件对象的 readinto(x) 方法,避免额外内存分配
mmap.mmap(fd, size, accsess=...)
# 使用 memoryview 来以不同的方式访问它
os.path
os.listdir(x)
可以通过 glob 或 fnmatch 对文件名进行匹配操作
对 open/listdir 提供 字节串 参数
使用 repr(filename)[1:-1]
用 io.TextIOWrapper() 对其进行包装
对sys.out....
将字节数据写入文件底层的buffer中就可以了
sys.stdout.buffer.write(b'...')
可能为文件、管道、socket等,
f = open(fd,'wt',closefd=False)
tempfile.TemporaryFile() 可接受和内建的open函数一样的参数
NamedTemporaryFile() .name 含有其文件名
TemporaryDirectory
还有底层 mkstemp mkdtemp
tempfile.gettempdir()
ser = Serial.Serial('/dev/...' ...)
pickle.dump(..) /dumps(...)
pickle.load(..) /loads(...)
使用CSV库,还可以使用命名元组
with open(...) as f:
f_csv = csv.reader(f) ### csv.DictReader(...)
heading = next(f_csv)
Row = namedtuple('Row', headings)
for r in f_csv:
row = Row(*r)
row. a ### row['a']
with open('..', 'w') as f:
f_csv = csv.writer(f) ### DictWriter(f, headers)
f_csv.writerow(headers)
f_csv.writerows(rows)
json.dumps() / json.loads()
json.dump() / json.load()
映射不同点
通过 loads 的 object_pairs_hook 或 object_hook 参数,来将JSON解码为 OrderedDict 或 某对象(创建的对象作为参数传递给类的 __init__)
几个有用的选项
对类对象进行序列化和反序列化
def serialize_instance(obj):
d = { "__classname__": type(obj).__name__}
d.update(vars(obj))
return d
def unserialize_object(d):
clsname = d.pop('__classname__', None)
if clsname:
cls = classes(clsname)
obj = cls.__new__(cls) # mk instance without call init
for key, value in d.items():
setattr(obj, key, value)
return obj
return d
s = json.dumps(p, defautlt=serialize_instance)
a = json.loads(s, object_hook=unserialize_object)
xml.etree.ElementTree
复杂的过程则采用 lxml
使用 iterparse
def dict_to_xml(tag, d):
elem = Element(tag)
for key, val in d.items():
child = Element(key)
child.text = str(val)
elem.append(child)
return elem
# 赋值属性 e.set('_id', '1234')
比手工处理好,能处理val的特殊字符,手工处理需要采用 escape() / unescape()
使用 root.remove() / child.insert() / root.append()
还可以使用索引和切片操作 element[i], element[i:j]
将字节码编码或解码为十六进制数
binascii.b2a_hex(s)
binascii.a2b_hex(h) # 字符串为 b'hello'
还可以使用base64模块
base64.b16encode(s) / base64.b16decode(h) # 只对大写16进制格式
base64.b64encode(s)
base64.b64decode(a)
使用 struct 模块
使用 iter 模块来简化代码
使用 numpy.fromfile(...) 直接全部读出
文件含有文件头和后续的不定长度的数据记录
def fun(k, *args, y, z, **kwargs): # y,z为命名参数,只能通过y=,z=定义
...
放置在以*打头的参数之后
def recv(maxsize, *, block): # block 只能命名指定
def minium(*values, clip=None): # ...
def trun(*args, **kwargs):
trun2(*args, **kwargs)
def trun2(*args, para, **kwargs)
函数注解 fun.annotations 属性
def add(x:int, y:int) -> int:
pass # 对原函数没有任何影响
使用元组
在参数后 para=init_value
如果是容器则需要确认是否期望副作用,不期望则使用 None
用来检查该值是否赋值
_no_value = object()
def spam(a, b=_no_value):
if b is _no_value: # b 未被赋值
pass
lambda 表达式创建函数对象
lambda 表达式中的x为自由变量,在运行时才进行绑定而不是定义的时候
如果期望使用定义时,则可以使用默认参数达到效果
使用 functools.partial() 来绑定部分参数,位置参数从左到右匹配
可以用在函数调用上,也可以用在实例的创建上
也可以使用 lambda 来代替 partial 的使用
单个方法,除了init外
def urlopen(temp): ## 使用闭包
def open(**kwargs):
return ....
class A:
def __init__(self, ..):
...
def handler(self, ...):
...
apply_async(add, (2, 3), callback=r.handler)
def make_handler():
sequence = 0
while True:
result = yield
sequence += 1
print ...
handler = make_handler()
next(handler)
apply_async(add, (2, 3), callback=handler.send)
apply_async(add, (2, 3), callback=partial(handler, seq=seq))
apply_async(add, (2, 3), callback=lambda r:handler(r, seq))
class Async:
def __init__(self, fun, args):
self.fun = fun
self.args = args
def inlined_async(func):
@wraps(func)
def wrapper(*args):
f = func(*args)
result_queue = Queue()
result_queue.put(None)
while True:
result = result_queue.get()
try:
a = f.send(result)
apply_async(a.func, a.args, callback=result_queue.put)
except StopIteration:
break
return wrapper
@inlined_async
def test():
r = yield Async(add, (2,3))
r = yield Async(add, (1,1))
for i in range(10):
r = yield Async(add, (i,i))
print(r)
pool = multiprocessing.pool()
apply_async = pool.apply_async
test()
def sample():
n = 0
def fun():
pass
def get_n():
return n
def set_n(value):
nolocal n
n = value
fun.get = get_n
fun.set = set_n
return fun
f = sample()
f.set(5)
f.get()
f()
因此可以用闭包来模拟类
def Stack():
local_val = []
def push():..
def pop():..
return ClosureInstance() # 当stack内部的函数update到其__dict__中,闭包处理更快,因为不涉及self变量
class Pair():
def __repr__(self):
return 'Pair({0.x!r,{0.y!r})'.format(self) ### eval(repr(obj)) == obj
def __str__(self):
return 'Pair({0.x!s,{0.y!s})'.format(self)
定义 __format__(self, code) 函数
def __format__(self, code):
fmt = _formats[code]
return fmt.format(d=self)
# 使用
format(instance_, code ## = '')
实现 __enter__ 和 __exit__(self, exc_ty, exc_val, tb) 函数
with some_instance as instance_some_member: ....
可以修改 enter 和 exit 使其可以用在嵌套的 with 中(创建新对象并保存到栈中)
在类定义中定义 __slots__属性,这样针对实例会采用更加紧凑的内部表示,不会对每个实例再创建dict了,缺点是没有办法为实例再添加新的属性了,相当于将数据保存在元组中(也不能使用多重继承了)
class ...
__slots__ = ['year', 'month', 'day']
以 _ 开头的名字应该总是被认为is只属于内部实现
以 __ 开头的名称会导致出现名字重整 (name mangling),为了继承,这样的属性不能通过继承而覆盖,如 __init__
如果定义变量与保留字冲突,可以在其名字后添加一个_,如 class_
## 方法1
class Person:
@property
def first_name(self):
pass
@first_name.setter
def first_name(self, value):
pass
@first_name.deleter
def first_name(self):
pass
## Person.first_name.fget/fset/fdel 为底层函数
## 方法2
class ...
def ...
name = property(get_fun, set_fun, del_fun)
使用super函数
常见用途
class Proxy:
def __getattr__(self, name):
return getattr(self._obj, name)
class Sub(Father):
@property
def name(self):
return supper().name
@name.setter
def name(self,value):
super(Sub, Sub).name.__set__(self, value) ## 必须以类变量使用
@name.deleter
def name(self):
super(Sub, Sub).name.__delete__(self)
class Sub(Father):
@Father.name.getter ## 使用父类,否则其setter和deleter相当于未定义
def name(self):
return supper().name
描述符是以特殊方法 __get__(), __set__() 和 __delete__() 的形式实现了三个核心的属性访问操作(对应于get、set、delete)的类。通过接受类实例作为输入来工作。
class I:
def __get__(self, instance, cls): ## 以类调用时,instance为None
if instance is None:
return self
...
def __set__(self, instance, value):
...
def __delete__(self, instance):
...
class P:
x = I()
y = I()
...
p = P()
p.x ## call P.x.__get__(p, P)
p.x = v ## call P.x.__set__(p, v)
P.x ## call P.x.__get__(None, P)
@classmethod\@staticmethod\@property__slots__底层均由描述符实现
class Typed:
def __init__(self, name, type_):
self.name = name
self.type_ = type_
def __get__(self, instance, cls):
if instance is None: return self
return instance.__dict__[self.name]
def __set__(self, instance, value):
if not isinstance(value, self.type_):
raise ...
instance.__dict__[self.name] = value
def __delete__(self, instance):
del instance.__dict__[self.name]
def typeassert(**kwargs):
def wrap(cls):
for name, type_ in kwargs.items():
setattr(cls, name, Typed(name, type_))
return cls
return wrap
@typeassert(name=str, age=int)
class Person:
def ....
缺点是该值变为可修改了
class Lazyproperty:
def __init__(self, func):
self.func = func
def __get__(self, instance, cls):
if instance is None:
return self
else:
value = self.func(isntance)
setattr(instance, self.func.__name__, value) ## instance.x为value,A.x为Lazyproperty对象
return value
class A:
@Lazyproperty
def area(self):
...
将初始化数据结构归纳到一个单独的__init__()中,缺点是帮忙以及IDE的智能补全
class Structed:
_fields = []
def __init__(self, *args):
if len(args) != len(self._fields):
raise ...
for name, arg in zip(self._fields, args):
setattr(self, name, arg) ## 使用setattr而不使用__dict__也可以应付slots或者property等情况
class A(Structed):
_fields = ['name', 'shares', 'price']
## 对于**keargs,可以使其做关键字映射,也可以将其作为_fields外的额外属性添加
def init_fromlocals(self):
import sys
locs = sys._getframe(1).f_locals
for k,v in locs.items():
if k != 'self':
setattr(self, k, v)
class Stock:
def __init__(self, name, shares, price):
init_fromlocals(self) ## 比前面方面慢,且更复杂
使用abc模块,不可实例化,强制规定所需的编程接口
from abc import ABCMeta, abstractmethod
class IStream(metaclass=ABCMeta):
@abstractmethod
def read(self, maxbytes=-1):
pass
@abstractmethod
def write(self, data):
pass
## error a = IStream()
## using IStream
if not isinstance(stream, IStream):
raise TypeError('....')
## 可与其他修饰符一块使用
class A(metaclass=ABCMeta):
@property
@abstractmethod
def name(self):
pass
collections模块中定义了多个和容器还有迭代器(序列、映射、集合等)相关的抽象基类
from collections import Sequence, Iterable, Sized, Mapping
对属性的设定做定制化处理,使用描述符来完成
class Descriptor:
def __init__(self, name=None, **opts):
self.name = name
for key, value in opts.items():
setattr(self, key, value)
def __set__(self, instance, value):
instance.__dict__.[self.name] = value
class Typed(Descriptor):
expceted_type = type(None)
def __set__(self, instance, value):
if not isinstance(value, self.expected_type):
raise ...
super().__set__(instance, value)
class String(Typed):
expected_type = str
class SizedString(String, MaxSized):
pass
class Stock:
name = SizedString('name', size=8)
使用方法
### case 1 类装饰器
def check_attributes(**kwwargs):
def decorate(cls):
for key, value in kwargs.items():
if isinstance(value, Descriptor):
value.name = key
setattr(cls, key, value)
else:
setattr(cls, key, value(key))
return cls
return decorate
@check_attributes(name=SizedString(size=8),
shares=UnsignedInteger,price=UnsigendFloat)
class Stock:
def __init__(self, name, shares, price): ...
### case 2 元类
class checkedmeta(type):
def __new__(cls, clsname, bases, methods):
for key, value in mothods.items():
if isinstance(value, Descriptor):
value.name = key
return type.__new__(cls, clsname, bases, methods)
class Stock(metaclass=checkedmeta):
name = SizedString(size=8)
...
def __init__(self, name ...):
self.name = name
继承自collections库中的抽象基类,会强制要求我们定义必须的方法
访问实例的属性时能够将其委托到一个内部持有的对象上。常用管理是代理类只委托那些不以下划线开头的属性(即代理类只暴露内部类的公有属性)
## 方法1, 方法数目不多时
class ...
def spam(self, x):
return self._a.spam(x)
## 方法2
...
def __getattr__(self, name): ## 找不到name时调用
return getattr(self._a, name)
## 有时候可以使用委托来替代继承
class B(A):
...
class B:
def __init__(self):
self._a = A()
def bar(...
def __getattr__(self, name): ## 不支持大部分以双下划线开头和结尾的特殊方法,必须手动定义
return getattr(self._a, name)
def __len__(self):
return len(self._a)
可以采用类方法来创建对象
class A:
def __init__(...): ...
@classmethod
def special_a(cls):
return cls(...) ## 使用 cls 使其在继承子类中调用时返回为子类对象
## 通过init *args或**kwargs可以达到灵活的目的,但是其不容易理解
直接使用 new 来跳过 init 调用,常见的例子是反序列化、以及另外的构造函数(类函数实现)
d = Date.__new__(Date)
## 手动设置属性
for key, value in data.items():
setattr(d, key, value)
使用 Mixin 来对类方法进行定制化处理
class LoggedMappingMixin:
__slots__ = ()
def __getitem__(self, key):
print ..
return super().__getitem__(key)
...
class LoggedDict(LoggedMappingMixin, dict):
pass
Mixin 类的特点
另外一种方法是装饰器
def LoggedMapping(cls):
cls_getitem = cls.__getitem__
...
def __getitem__(self, key):
....
cls_getitem(self, key)
...
cls.__getitem__ = __getitem__
...
return cls
@loggedMapping
class LoggedDict(dict):
pass
# old
class Connection:
def open():
if self.state is open():
raise ..
do open
# new
class Connection:
def open():
self.state.open(self) ## 状态保留在本实例中
class ConnectionBase():
@staticmethod
def open(conn): ## 这些类只提供方法,不保存状态
raise ...
class OpendConnection(ConnectionBase):
@staticmethod
def open(conn):
...
class ClosedConnection(ConnectionBase):
@staticmethod
def open(conn):
do open
class Connection: ## 代码执行速度更快
def new_state(self,new_state):
self.__class__ = new_state
class ClosedConnection(Connection):
def ...
实例:表达式树,并实现其运行方法(求值、生成代码等)
class Node(): pass
class BInaryOperator(Node):
def __init__(self, left, right):
... # 赋值
class Add(BInaryOperator): pass
# 运算方法基类
class NodeVistor:
def visit(self, node):
methname = 'visit_' + type(node).__name__
meth = getattr(self, methname, None)
if meth is None:
meth = self.generic_visit
return meth(node)
def generic_visit(self, node):
raise ...
class Evaluator(NodeVistor):
def visit_Number(self, node): return node.value
def visit_Add(self, node): return self.visit(node.left) + self.visit(node.right)
# 用于http处理
class HTTPHandler():
def handler(self, request):
methname = 'do_' + request.request_method
getattr(self, methname)(request)
def do_Get(self, request): ...
def do_Post(self, request): ...
使用堆栈和生成器
class NodeVisitor:
def visit(self, node):
stack = [node]
last_result = None
while stack:
try:
last = stack[-1]
if isinstance(last, types.GeneratorType):
stack.append(last.send(last_result))
last_result = None
elif isinstance(last, node):
stack.append(self._visit(stack.pop()))
else:
last_result = stack.pop()
except StopIteration:
stack.pop()
return last_result
def _visit(self, node):
methname = 'visit_' + type(node).__name__
meth = getattr(self, methname, None)
if meth is None:
meth = self.generic_method
return meth(node)
def gen...
class UnaryOperator(Node):
...
class Evaluatior(NodeVisitor): ...
def visit_add(self, node):
return self.visit(node.left) + self.visit(node.right) ## 递归会产生问题
yield (yield node.left) + (yield node.right)
使用weakref来避免内存不被释放
取得弱引用的对象需要通过函数式的调用
使用functools.total_ordering装饰器,可以只定义__eq__和其余任意一个比例操作就可以了(le, ge, lt, gt)
def total_ordering(cls):
cls.__le__ = lambda self, other: self < other and self == other
...
return new_cls
同样的参数产生同一个对象
要实现该行为,应该使用一个与类本身相分离的工厂函数
可以使用weakref.WeakValueDictionary作为缓存存储位置
如果修改__new__,在有对象时不再调用父类__new__,则会导致__init__每次都会被调用
可以将缓存管理独立成为单独的类,更加灵活
为防止用户自己创建类对象,可以使用_Spam定义类名,将__init__设计为抛出异常,而类工厂函数通过__new__创建对象
元编程主要是创建函数和类,并用它们来操纵代码(修改、生成或者包装已有的代码)
给函数加上一个包装层(wrapper layer),如记录日志、计时统计,使用装饰器函数
@timethis
def fun() ...
## => fun = timethis(fun)
如函数名、文档字符串、函数注解以及调用签名
使用functools.wraps装饰器保存元数据
可以通过__wrapped__属性来访问原函数
def timethis(fun):
@wraps(fun)
def wrapper(*args, **kwargs):
fun ...
return wrapper
如果使用了functools.wraps则可以使用__wrapped__属性
对于staticmethod和classmethod使用__func__属性
多加一层函数
@logged(level_, msg)
def add(a, b): return a + b
add.set_level(level_2)
add.set_msg(new_msg)
def attach_wrapper(obj, fun=None):
if fun is None:
return partial(attach_wrapper, obj)
setattr(obj, fun.__name__, fun)
return fun
def logged(level, msg):
def decord(fun):
@wraps(fun)
def wrapper(*args, **kwargs):
fun(...)
@attach_wrapper(wrapper)
def set_level(level):
nplocal level
level = _level
return wraps
return decord
如果使用wraps则属性可以依次传递,对于 @logged @timethis 和 @timethis @logged 都可以起作用,而如果采用将属性赋值到函数上的方法对于上面两种方式一种会有问题
def logged(fun=None, *, level=DEBUG, msg=''):
if fun is None:
return partial(logged, level=level, msg=msg)
...
@logged # 也可以采用@logged()
def a() ...
@logged(level=INFO, ...)
def b() ...
def asserttype(*o_args, **o_kwargs):
def dec(fun):
def wrap(*args, **kwargs):
for p, t in zip(args, o_args):
if not isinstance(p, t):
raise ...
return wrap
return dec
## 使用signature简化处理
def dec(fun):
if not __debug__:
return fun
sig = inspect.signature(fun)
bound_types = sig.bind_partial(*o_args, **o_kwargs).argments
@wraps(fun)
def wrapper(*args, **kwargs):
bound_values = sig.bind(*args, **kwargs)
for name, value in bound_values.argments.items():
if name in bound_types:
...
fun(args, kwargs)
可以使用注解提取类型,但是这样一个函数就只能有一种类型,不灵活
内建的@property也是拥有getter、setter、deleter方法的类
使用__init__来定义包
__init__中可以相关模块
对from modelu import *进行控制
在模块文件(a.py)、包文件(__init__.py)中定义 __all__ = ['spam', 'z'...]
from . import grok
from ..B import bar
from mypackage.A import grok # 使用完整绝对名称
# 位于脚本顶层目录的模块不能使用相对导入
# 直接以脚本执行,不能使用相对导入,需要使用-m模块模式执行
可以将模块转换为包,在其__init__中将包中其他模块的符号导入,外界的调用代码是不需要修改的
在__init__定义函数来动态导入其他模块内的符号,惰性导入
## a.py
class A:
...
## __init\__.py
def A():
from .a import A
return A() # 未更改调用代码
# 会破坏继承和类型检查机制 isinstance(x, module.A) 出错了
命名空间包:用于不同目录下的代码,使其位于统一命名空间下管理
#
foo-package/spam/blah.py # 无init文件
bar-package/spam/grok.py # 无init文件
#
import sys
sys.path.extend(['foo-package', 'bar-package'])
import spam.blah
import spam.grok
#
import spam
spam.__path__ # 包含相应的文件夹路径
spam.__file__ # 出错,命名空间包无该属性
import A
import imp
imp.reload(A) # from A import 的变量不会被更像
在目录或者zip文件中添加一个 __main__.py 文件
根据解释器的当前目录不一样,其读取路径也会不一样
通过 __file__ 全路径去找对应的路径,比较麻烦,包也可能为.zip或.egg格式
import pkgutil
data = pkgutil.get_data(__package\__, file_name)
使用importlib.import_module(_name)
source = u.read().decode('utf8')
mod = sys.modules.setdefault(url, imp.new_module(url))
code = compile(source, url, 'exec')
mod.__file__ = url
mod.__package__ = ''
exec(code, mod.__dict__)
使用urllib.request.urlopen(url, querystring, headers=headers)、urllib.parse.urlencode(parms)
使用requests库,resp.text/content/json/status_code/headers['content-type']/cookies
使用socketserver库,为单线程
class EchoHandler(BaseRequestHandler):
def handle(self):
while True:
msg = self.request.recv(8192)
if not msg
break
self.request.send(msg)
serv = TCPServer(('', 20000), EchoHandler)
serv.serve_forever()
# 还可以使用SteamRequestHandler
需要多线程,需要使用ForkingTCPServer,ThreadingTCPServer
使用线程池
serve = TCPServer(('', 2000), EchoHnadler)
for n in range(NWORKERS):
t = Thread(target=serve.serve_forever)
t.daemon = True
t.start()
serve.serve_forever()
如果想通过设定socket选项来调整底层socket的行为,可以设置bind_and_activate=False参数
# 方法 1
serve = TCPServer(.., bind_and_activate=False)
# 方法 2
TCPServer.allow_reuse_address = True
server = TCPServer(...)
还可以通过StreamRequesthandler的子类来实现
直接通过socket来编写服务器也是很容易的
与TCP类似
class TimeHandler(BaseRequestHandler):
def handler(self):
msg, sock = self.request
resp = time.ctine()
sock.sendto(resp.encode('ascii'), self.client_address)
利用ipaddress模块
net = ipaddress.ip_network('123.45.67.64/27')
for a in net: ...
net[-1]
WSGI
使用multiprocessing.connection,send、recv消息是完整的
可以将Listener直接改为unix socket对象
适用于长连接,而不是短连接
# 使用heapq、Condition来实现优先级队列
# put
with self._cv:
heapq.heappush(self._queue, (-priority, self._count, item)
self._count += 1
self._cv.notify()
# get
with self._cv:
while len(self._queue) == 0:
self._cv.wait()
return heapq.heappop(self._queue)[-1]
class SomeClass:
_lock = threading.RLock()
def fun_a(self):
with SomeClass._lock:
pass
def fun_b(self):
with SomeClass._lock:
pass
@contextmanager
def acquire(*locks):
locks = sorted(locks, key=lambda x:id(x))
acauared = getattr(_local, 'acquired', [])
# 比较锁
acquired.extend(locks)
_local.acquired = acqured
try:
for lock in locks:
lock.acqure()
yield
finally:
for lock in reversed(locks):
lock.release()
del acquired[-len(locks):]
使用 threading.local(),针对一个对象可以有多份数据
class Conn:
def __init__(self):
self.x = x
self.local = threading.local()
def do_something(self):
if hasattr(self.local, 'sock'):
raise
self.local.sock = xxx
conn = Conn()
t1 = threading.Thread(target=test, args=(conn,)
t1 = threading.Thread(target=test, args=(conn,)
a = pool.submit(fun, *args)
a.result()
concurrent.futures.ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as pool:
for reobits in pool.map(func, args): # 也可以使用submit进行单个任务提交
pass
# with语句完进程池会关闭,程序会一直等待所有提交的任务都处理完毕
同样可以添加回调函数 .add_done_callback(when_done)
只适用于将问题分解成各个独立部分情况
actor任务之间的通信是单向且异步的,没有明确的的响应和确认
使用Queue和多线程定义actor
也可以用生成器来定义简单的actor
消息传递可以加tag,然后可以使用python的动态获取属性进行处理
tag, *payload = self.recv()
getattr(self, 'do_'+tag)(*payload)
使用工作者线程执行任意函数
class Result:
def __init__(self):
self._event = Event()
self._result = None
def set_result(self, value):
self._result = value
self._event.set()
def result(self):
self._event.wait()
return self._result
class Worker(Actor):
def submit(self, fun, *args, **kwargs):
r = Result()
self.send((fun, args, kwargs, r))
def run(self):
while True:
func, args, kwargs, r = self.recv()
r.set_result(func(*args, **kwargs))
引入单独的“交换”或“网关”对象,作为所有消息的中介。
class Exchange:
def attach(self, task):
self._subscribers.add(task)
def deattach(self, task):
...
def send(self, msg):
for subcriber in self._subscribers:
subcriber.send(msg)
_exchanges = defaultdict(Exchange)
def get_exchange(name):
return _exchanges[name]
exc = get_exchange('name')
with exc.subscribe(task_a, task_b):
exc.send(...)