@spiritnotes
2016-06-14T16:43:54.000000Z
字数 28877
阅读 3573
Python 读书笔记 DOING
a, b, c = iterable
a, _, _ = iterable
a, b, c = itreable #变量可以为空
可以嵌套: a, _, (_, c) = iterable
* 在处理函数参数很有用
collections.deque(maxlen=N)
q.append(v),q.appendleft(v),q.pop(),q.popleft()
heapq.nlargest(N, numbers, key=)
heapq.nsmallest(N, numbers, key=)
heapq.heapify(list_), heapq.heappop(list_)
使用 heapq, 复杂度 O(logN)
heapq.heappush(q, (-priority, index, item)) #index对于相同优先级排位使用
heapq.heappop(q)[-1]
值用列表或者字典来表示
collections.defaultdict
dict_.setdefault(key, default_value).append(v) #每次调用都创建一个default_value
collections.OrderedDict,迭代时严格按照元素添加的次序
内部维护了一个双向链表,空间使用较多
value_key = min/max/sorted(zip(dict_.values(),dict_.keys()))
注意 zip 创建了一个迭代器,只能消费一次
key_min = min(dict_)
key_min_value = min(dict_, key=lambda x:dict_[x])
a.keys() & b.keys()
a.items() & b.items()
key-view对象支持常见的集合运算,& | -
item-view对象也支持
values()方法不支持,values可能不是唯一的
使用生成器+set实现,如果不支持hash,手动配置一个hash
def get_unique_same_order(l):seen = set()for i in l:if i not in seen:yield iseen.add(i)
shares = slice(begin, end)
slice_.start, slice_.stop, slice_.step
collections.Counter
counter_.most_common/update/['key']/['key']+=1
counter_1 +/- counter_2
sorted(dicts, key=operator.itemgetter('key','key2')) #比lambda更快
sorted(dicts, key=lambda dict: dict['key'],dict['key2'])
sorted(objs, key=operator.attrgetter('attr1','attr2'))
sorted(objs, key=lambda obj_:obj_.attr1,obj.attr2)
先排序,然后使用 itertools.groupby()
datas.sort(key=attrgetter('date'))for date, items in groupby(key=attrgetter('date')):for i in items:pass
使用一个defaultdict(list)进行记录,避免排序,更快
列表推导式 [k for k in l if xx(k)]
生成表达式 (k for k in l if xx(k))
filter(xx, l)产生一个迭代器
itertools.compress(iterable, bool_iterable)
字典推导式 {key:val for key,val in dict_.items() if xx(y)}
nametupel_name = collections.namedtuple(nametupel_name, ['key1','key2',...])
_ = nametupel_name(v1, v2)
_.key1, _.key2
namedtuple是不可变的,所需空间比字典少
_ = _._replace(key1 = v3)
如果实例对象修改多,则最好使用slots属性的类
需要调用换算reduction(sum、max、min、any)之前先要进行转换或筛选
生成器表达式 sum(f(i) for i in l if xx)
有些reduction函数支持 key 参数进行先转换
collections.ChainMap
支持字典的那些操作,使用的是原始的字典,重复键显示前面的
修改映射的操作总是会作用在列出的第一个映射结构上
map_['x'] = 1
map_ = map_.new_child() 产生一个新map并放在前面
map_ = map_.parents 移动到后面的map
可以使用字典以及update方法实现,这样会创建一个新字典
使用正则表达式
import rere.split(r'[;,\s]\s*', line)
如果使用了捕获组(capture group括号),则分隔符也会保留下来
注: "(?:,|;|\s)\s*"也是一种书写方式而不是捕获组
使用 startswith(str/tuple)/endswith(str/tuple)
可以使用切片 [:-2] == x,该方法性能更快
使用re re.match(....)
from fnmatch import fnmatch,fnmatchcasefnmatch('foo.txt','*.txt') #与底层的大小写规则相同fnmatch() #大小写相关
简单模式使用基本的字符串方法
赋值模式使用正则表达式
match方法总是在字符串的开头找匹配项
findall方法找到字符串中所有匹配项
将部分模式用括号包起来的方式引入捕获组
findall以列表的方式返回,finditer以迭代的方式返回
多次执行匹配和查找,应该先将模型编译然后重复使用
简单文本:使用 str_.replace()
复杂文本:使用 re.sub(r'(\d+)/(\d+)/(\d+), r'\3-\1-\2), txt_)
再复杂,使用回调函数 re.sub(r'..', fun_, txt_),回调函数的输入参数是一个匹配对象,由match或find返回,可以用group方法来获取匹配中特定的部分
使用 re.subn()除了替换外还返回替换的次数
使用re re.findall(..., flags=re.IGNORECASE)
如果替换需要的是同样结果,可能需要支撑函数
def match(str_):def repalce(m):text = m.group()if text.isupper():return str_.upper()elif text.islower():return str_.lower()elif text[0].isupper()return str_.capitalize()else:return str_return replacere.sub('python',match('snake'),text,flags=re.IGNORECASE)
模式的*或者+号操作符后面加上?修饰符
.号不匹配换行符,因此可以采用 '.|\n'
re.DOTALL,使得.号可以匹配所有字符
有的文本有多种表现形式
使用unicodedata.normalize('NFC', s1) ...
NFC:全组成
NFD:组合字符
考虑使用第三方库
strip lstrip rstrip,支持自定义多个字符
对于处于中间的,replace, re.sub('\s+',' ')
translate,建立小型的转换表
ljust rjust center,并可以可选填充字符
format(text, '>20s')
format(text, '>10.2f')
'{:-^20s}'.format(text)
字符串在序列或可迭代对象中 -- 使用join()方法,实际使用中最好考虑生成器
字符串较少 -- 使用 +
较复杂的格式 -- 使用format
源码中字符串合并 -- 直接连续写 'aaa''bbb'
python本身不支持,使用var()
name, n = 'a', 20'{name} has {n} ... '.format_map(var())# some class instance aa.name, a.n = 'b', 10'{name} has {n} ... '.format_map(var(a))# 使用定义了missing的类来包裹class safesub(dict):def __missing__(self, key):return '('+key+')''{name} has {n} ... '.format_map(safesub(var()))def sub(text):return text.format_map(safesub(sys._getframe(1).f_locals))
os.get_terminal_size().columns()
textwarp.fill(s, 40, subsequent_indent='')
转为html,转义特殊字符 -- html.escape(s, quote=False)
转回使用 html.parser.HTMLParser().unescape(s)
使用正则表达式的命名组来识别词以及类型
NAME = r'(?P<NAME>[a-zA-Z_][a-zA-Z_0-9]*)'NUM = r'(?P<NUM>\d+')TIMES = r'(?P<TIMES>\*)'...master_pat = re.compile('|'.join([NAME, ... WS]) # re 按照指定的顺序对模式进行匹配scanner = match_pat.scanner(s='foo = 42')while .._ = scanner.match()_.lastgroup, _.group()for m in iter(scanner.match, None):yield m.lastgroup, m.group()
对于复杂的使用PyParsing/PLY解析工具
基本上与str一样,除了c[0]在b和str中返回不一样,b中返回数值
round(value, ndigits), ndigits可以为负数,取整到十位、百位等
使用decimal模块
可以使用decimal.localcontext()进行精确控制
控制位数、对齐、包含千位分隔符
使用内建format函数,format(x, '<10.1f')
整数转为文本格式,使用 bin、hex、oct函数
要去掉前面的0x、0o、0b,使用format函数 format(x, 'b')
int.from_bytes(data, 'little')
int_.to_bytes(16, 'big')
可以使用struct模块,struct解包的整数大小是有限制的
int_obj.bit_length() 来获得位数
产生 complex(real, imag) 或者 a + bj
a.real / a.imag / a.conjugate()
需要使用复数,则使用 cmath 模块
float('inf') float('-inf') float('nan')
math.isinf(x) math.isnan(x)
NaN的微妙特性是做比较时从不会被判定为相等
float('inf') == float('inf') => False
float('inf') == float('inf') => False
模块fractions
使用numpy库
numpy.linalg
random.choice 不取走
random.sample 取走
random.suffle 洗牌
random.randint 随机整数
random.random 0~1之间小数
random.getrandbits 随机nbits数
datetime中timedelta
复杂应用可以使用 dateutil 模块
使用date_.weekday() weekdays.index(x)以及7进行运算
可以使用 dateutl 直接计算
start = date.today().replace(day=1)
_, days_in_month = calendar.monthrange(start.year, start.month)
end = start + timedelta(days=days_in_month)
datetime.strptime(s, forat) 性能慢
datetime.strftime(x, format_)
使用 pytz 模块
建立将所有时间转为 UTC 时间再进行操作,世界统一时间
使用 next(x) 函数,手动捕捉 StopIteration
使用 next(x, None) 函数,使其返回指定值
原理
直接将对self的迭代转移到其它对象中
def __iter__(self):return iter(other_obj/self.child)
生成器不需要调用 iter, 直接使用 next
使用生成器,简化编写 iter next等函数
使用内建 reversed(x) 函数,要求对象实现了 __reversed__() 函数
想要定义一个生成器函数,但是需要向外部暴露状态信息
实现成一个类,类具有状态,定义其 __iter__ 为生成器函数,也可以将其实现为闭包
lines = GeneratorClass(Some_Status)for line in lines:....
对迭代器和生成器做切片操作,使用 itertools.islice(c, 10, 20) 函数
islice 会消耗掉迭代器或生成器里面的数据
itertools.dropwhile(lambda ..., x) 根据条件跳过
itertools.islice(c, 5, None) 跳过前5行
itertools.permutations(items, n)
itertools.combinations(items, n)
iteryools.combinations_with_replacement(....) 元素可重复出现
enumerate(lists, start=0)
for n, (x,y) in enumerate(data) 元组的列表
使用zip函数,以较短的输入序列长度为准
itertools.zip_longest(a,v,fillvalue=0)
dict(zip(headers, values))
itertools.chain(a, b, c)
生成器函数是一种实现管道机制的好方法。
lognames = gen_find('access-log','www')files = gen_opener(lognames)lines = gen_concatenate(files)pylines = gen_grep('(?i)python', lines)for line in pylines:...# 注意python3中引入的 yield from
def flatten(items, ignore_type=(str, bytes)):for x in items:if isinstance(x, Iterable) and not isinstance(x, ignore_types):yield from flatten(x)else:yield x## yield from x 相当于 如下,但是有优化## for i in x:## yield i
for c in heapq.merge(ordered_a, ordered_b):...# 只是简单取两个队列,将小的发出去,不会一次性取整个队列
支持一个无参的可调用对象以及一个哨兵(结束)值作为输入
for chunk in iter(lambda: f.read(10),b''):....
文件读取的默认编码方式 sys.getdefaultencoding()
open函数中可以带 encoding参数, erros参数
print(xxx, file=x)
print(xxx, file=f) # 必须以文本模式打开
使用 print 以及 sep、end 参数
也可以使用 join,其缺点是其序列元素必须为字符串
使用 open 函数的 wb、rb 参数
bytes 对象 索引会返回整数
将字符串二进制方式写入文件,需要对字符串编码
底层数组或c结构体可以直接二进制读写
使用 xt、xb 模式(python3.3)创建文件并写入,如文件存在则报错
io.StringIO 以及 io.BytesIO 可以模仿文件等执行 io 操作
write / read / getvalue
可以用于单元测试中
使用 gzip.open(...) / bz2.open(...)
可以对已经打开的二进制模式的文件进行叠加操作,可以同各种类型的类文件对象如套接字、管道、内存文件一起工作
使用 iter() 和 functools.partial() 来完成
records = iter(partial(f.read, RECORD_SIZE), b'')for r in records:...
使用文件对象的 readinto(x) 方法,避免额外内存分配
mmap.mmap(fd, size, accsess=...)# 使用 memoryview 来以不同的方式访问它
os.path
os.listdir(x)
可以通过 glob 或 fnmatch 对文件名进行匹配操作
对 open/listdir 提供 字节串 参数
使用 repr(filename)[1:-1]
用 io.TextIOWrapper() 对其进行包装
对sys.out....
将字节数据写入文件底层的buffer中就可以了
sys.stdout.buffer.write(b'...')
可能为文件、管道、socket等,
f = open(fd,'wt',closefd=False)
tempfile.TemporaryFile() 可接受和内建的open函数一样的参数
NamedTemporaryFile() .name 含有其文件名
TemporaryDirectory
还有底层 mkstemp mkdtemp
tempfile.gettempdir()
ser = Serial.Serial('/dev/...' ...)
pickle.dump(..) /dumps(...)
pickle.load(..) /loads(...)
使用CSV库,还可以使用命名元组
with open(...) as f:f_csv = csv.reader(f) ### csv.DictReader(...)heading = next(f_csv)Row = namedtuple('Row', headings)for r in f_csv:row = Row(*r)row. a ### row['a']with open('..', 'w') as f:f_csv = csv.writer(f) ### DictWriter(f, headers)f_csv.writerow(headers)f_csv.writerows(rows)
json.dumps() / json.loads()
json.dump() / json.load()
映射不同点
通过 loads 的 object_pairs_hook 或 object_hook 参数,来将JSON解码为 OrderedDict 或 某对象(创建的对象作为参数传递给类的 __init__)
几个有用的选项
对类对象进行序列化和反序列化
def serialize_instance(obj):d = { "__classname__": type(obj).__name__}d.update(vars(obj))return ddef unserialize_object(d):clsname = d.pop('__classname__', None)if clsname:cls = classes(clsname)obj = cls.__new__(cls) # mk instance without call initfor key, value in d.items():setattr(obj, key, value)return objreturn ds = json.dumps(p, defautlt=serialize_instance)a = json.loads(s, object_hook=unserialize_object)
xml.etree.ElementTree
复杂的过程则采用 lxml
使用 iterparse
def dict_to_xml(tag, d):elem = Element(tag)for key, val in d.items():child = Element(key)child.text = str(val)elem.append(child)return elem# 赋值属性 e.set('_id', '1234')
比手工处理好,能处理val的特殊字符,手工处理需要采用 escape() / unescape()
使用 root.remove() / child.insert() / root.append()
还可以使用索引和切片操作 element[i], element[i:j]
将字节码编码或解码为十六进制数
binascii.b2a_hex(s)
binascii.a2b_hex(h) # 字符串为 b'hello'
还可以使用base64模块
base64.b16encode(s) / base64.b16decode(h) # 只对大写16进制格式
base64.b64encode(s)
base64.b64decode(a)
使用 struct 模块
使用 iter 模块来简化代码
使用 numpy.fromfile(...) 直接全部读出
文件含有文件头和后续的不定长度的数据记录
def fun(k, *args, y, z, **kwargs): # y,z为命名参数,只能通过y=,z=定义...
放置在以*打头的参数之后
def recv(maxsize, *, block): # block 只能命名指定def minium(*values, clip=None): # ...def trun(*args, **kwargs):trun2(*args, **kwargs)def trun2(*args, para, **kwargs)
函数注解 fun.annotations 属性
def add(x:int, y:int) -> int:pass # 对原函数没有任何影响
使用元组
在参数后 para=init_value
如果是容器则需要确认是否期望副作用,不期望则使用 None
用来检查该值是否赋值
_no_value = object()def spam(a, b=_no_value):if b is _no_value: # b 未被赋值pass
lambda 表达式创建函数对象
lambda 表达式中的x为自由变量,在运行时才进行绑定而不是定义的时候
如果期望使用定义时,则可以使用默认参数达到效果
使用 functools.partial() 来绑定部分参数,位置参数从左到右匹配
可以用在函数调用上,也可以用在实例的创建上
也可以使用 lambda 来代替 partial 的使用
单个方法,除了init外
def urlopen(temp): ## 使用闭包def open(**kwargs):return ....
class A:def __init__(self, ..):...def handler(self, ...):...apply_async(add, (2, 3), callback=r.handler)
def make_handler():sequence = 0while True:result = yieldsequence += 1print ...handler = make_handler()next(handler)apply_async(add, (2, 3), callback=handler.send)
apply_async(add, (2, 3), callback=partial(handler, seq=seq))
apply_async(add, (2, 3), callback=lambda r:handler(r, seq))
class Async:def __init__(self, fun, args):self.fun = funself.args = argsdef inlined_async(func):@wraps(func)def wrapper(*args):f = func(*args)result_queue = Queue()result_queue.put(None)while True:result = result_queue.get()try:a = f.send(result)apply_async(a.func, a.args, callback=result_queue.put)except StopIteration:breakreturn wrapper@inlined_asyncdef test():r = yield Async(add, (2,3))r = yield Async(add, (1,1))for i in range(10):r = yield Async(add, (i,i))print(r)pool = multiprocessing.pool()apply_async = pool.apply_asynctest()
def sample():n = 0def fun():passdef get_n():return ndef set_n(value):nolocal nn = valuefun.get = get_nfun.set = set_nreturn funf = sample()f.set(5)f.get()f()
因此可以用闭包来模拟类
def Stack():local_val = []def push():..def pop():..return ClosureInstance() # 当stack内部的函数update到其__dict__中,闭包处理更快,因为不涉及self变量
class Pair():def __repr__(self):return 'Pair({0.x!r,{0.y!r})'.format(self) ### eval(repr(obj)) == objdef __str__(self):return 'Pair({0.x!s,{0.y!s})'.format(self)
定义 __format__(self, code) 函数
def __format__(self, code):fmt = _formats[code]return fmt.format(d=self)# 使用format(instance_, code ## = '')
实现 __enter__ 和 __exit__(self, exc_ty, exc_val, tb) 函数
with some_instance as instance_some_member: ....
可以修改 enter 和 exit 使其可以用在嵌套的 with 中(创建新对象并保存到栈中)
在类定义中定义 __slots__属性,这样针对实例会采用更加紧凑的内部表示,不会对每个实例再创建dict了,缺点是没有办法为实例再添加新的属性了,相当于将数据保存在元组中(也不能使用多重继承了)
class ...__slots__ = ['year', 'month', 'day']
以 _ 开头的名字应该总是被认为is只属于内部实现
以 __ 开头的名称会导致出现名字重整 (name mangling),为了继承,这样的属性不能通过继承而覆盖,如 __init__
如果定义变量与保留字冲突,可以在其名字后添加一个_,如 class_
## 方法1class Person:@propertydef first_name(self):pass@first_name.setterdef first_name(self, value):pass@first_name.deleterdef first_name(self):pass## Person.first_name.fget/fset/fdel 为底层函数## 方法2class ...def ...name = property(get_fun, set_fun, del_fun)
使用super函数
常见用途
class Proxy:def __getattr__(self, name):return getattr(self._obj, name)
class Sub(Father):@propertydef name(self):return supper().name@name.setterdef name(self,value):super(Sub, Sub).name.__set__(self, value) ## 必须以类变量使用@name.deleterdef name(self):super(Sub, Sub).name.__delete__(self)
class Sub(Father):@Father.name.getter ## 使用父类,否则其setter和deleter相当于未定义def name(self):return supper().name
描述符是以特殊方法 __get__(), __set__() 和 __delete__() 的形式实现了三个核心的属性访问操作(对应于get、set、delete)的类。通过接受类实例作为输入来工作。
class I:def __get__(self, instance, cls): ## 以类调用时,instance为Noneif instance is None:return self...def __set__(self, instance, value):...def __delete__(self, instance):...class P:x = I()y = I()...p = P()p.x ## call P.x.__get__(p, P)p.x = v ## call P.x.__set__(p, v)P.x ## call P.x.__get__(None, P)
@classmethod\@staticmethod\@property__slots__底层均由描述符实现
class Typed:def __init__(self, name, type_):self.name = nameself.type_ = type_def __get__(self, instance, cls):if instance is None: return selfreturn instance.__dict__[self.name]def __set__(self, instance, value):if not isinstance(value, self.type_):raise ...instance.__dict__[self.name] = valuedef __delete__(self, instance):del instance.__dict__[self.name]def typeassert(**kwargs):def wrap(cls):for name, type_ in kwargs.items():setattr(cls, name, Typed(name, type_))return clsreturn wrap@typeassert(name=str, age=int)class Person:def ....
缺点是该值变为可修改了
class Lazyproperty:def __init__(self, func):self.func = funcdef __get__(self, instance, cls):if instance is None:return selfelse:value = self.func(isntance)setattr(instance, self.func.__name__, value) ## instance.x为value,A.x为Lazyproperty对象return valueclass A:@Lazypropertydef area(self):...
将初始化数据结构归纳到一个单独的__init__()中,缺点是帮忙以及IDE的智能补全
class Structed:_fields = []def __init__(self, *args):if len(args) != len(self._fields):raise ...for name, arg in zip(self._fields, args):setattr(self, name, arg) ## 使用setattr而不使用__dict__也可以应付slots或者property等情况class A(Structed):_fields = ['name', 'shares', 'price']## 对于**keargs,可以使其做关键字映射,也可以将其作为_fields外的额外属性添加def init_fromlocals(self):import syslocs = sys._getframe(1).f_localsfor k,v in locs.items():if k != 'self':setattr(self, k, v)class Stock:def __init__(self, name, shares, price):init_fromlocals(self) ## 比前面方面慢,且更复杂
使用abc模块,不可实例化,强制规定所需的编程接口
from abc import ABCMeta, abstractmethodclass IStream(metaclass=ABCMeta):@abstractmethoddef read(self, maxbytes=-1):pass@abstractmethoddef write(self, data):pass## error a = IStream()## using IStreamif not isinstance(stream, IStream):raise TypeError('....')## 可与其他修饰符一块使用class A(metaclass=ABCMeta):@property@abstractmethoddef name(self):pass
collections模块中定义了多个和容器还有迭代器(序列、映射、集合等)相关的抽象基类
from collections import Sequence, Iterable, Sized, Mapping
对属性的设定做定制化处理,使用描述符来完成
class Descriptor:def __init__(self, name=None, **opts):self.name = namefor key, value in opts.items():setattr(self, key, value)def __set__(self, instance, value):instance.__dict__.[self.name] = valueclass Typed(Descriptor):expceted_type = type(None)def __set__(self, instance, value):if not isinstance(value, self.expected_type):raise ...super().__set__(instance, value)class String(Typed):expected_type = strclass SizedString(String, MaxSized):passclass Stock:name = SizedString('name', size=8)
使用方法
### case 1 类装饰器def check_attributes(**kwwargs):def decorate(cls):for key, value in kwargs.items():if isinstance(value, Descriptor):value.name = keysetattr(cls, key, value)else:setattr(cls, key, value(key))return clsreturn decorate@check_attributes(name=SizedString(size=8),shares=UnsignedInteger,price=UnsigendFloat)class Stock:def __init__(self, name, shares, price): ...### case 2 元类class checkedmeta(type):def __new__(cls, clsname, bases, methods):for key, value in mothods.items():if isinstance(value, Descriptor):value.name = keyreturn type.__new__(cls, clsname, bases, methods)class Stock(metaclass=checkedmeta):name = SizedString(size=8)...def __init__(self, name ...):self.name = name
继承自collections库中的抽象基类,会强制要求我们定义必须的方法
访问实例的属性时能够将其委托到一个内部持有的对象上。常用管理是代理类只委托那些不以下划线开头的属性(即代理类只暴露内部类的公有属性)
## 方法1, 方法数目不多时class ...def spam(self, x):return self._a.spam(x)## 方法2...def __getattr__(self, name): ## 找不到name时调用return getattr(self._a, name)## 有时候可以使用委托来替代继承class B(A):...class B:def __init__(self):self._a = A()def bar(...def __getattr__(self, name): ## 不支持大部分以双下划线开头和结尾的特殊方法,必须手动定义return getattr(self._a, name)def __len__(self):return len(self._a)
可以采用类方法来创建对象
class A:def __init__(...): ...@classmethoddef special_a(cls):return cls(...) ## 使用 cls 使其在继承子类中调用时返回为子类对象## 通过init *args或**kwargs可以达到灵活的目的,但是其不容易理解
直接使用 new 来跳过 init 调用,常见的例子是反序列化、以及另外的构造函数(类函数实现)
d = Date.__new__(Date)## 手动设置属性for key, value in data.items():setattr(d, key, value)
使用 Mixin 来对类方法进行定制化处理
class LoggedMappingMixin:__slots__ = ()def __getitem__(self, key):print ..return super().__getitem__(key)...class LoggedDict(LoggedMappingMixin, dict):pass
Mixin 类的特点
另外一种方法是装饰器
def LoggedMapping(cls):cls_getitem = cls.__getitem__...def __getitem__(self, key):....cls_getitem(self, key)...cls.__getitem__ = __getitem__...return cls@loggedMappingclass LoggedDict(dict):pass
# oldclass Connection:def open():if self.state is open():raise ..do open# newclass Connection:def open():self.state.open(self) ## 状态保留在本实例中class ConnectionBase():@staticmethoddef open(conn): ## 这些类只提供方法,不保存状态raise ...class OpendConnection(ConnectionBase):@staticmethoddef open(conn):...class ClosedConnection(ConnectionBase):@staticmethoddef open(conn):do open
class Connection: ## 代码执行速度更快def new_state(self,new_state):self.__class__ = new_stateclass ClosedConnection(Connection):def ...
实例:表达式树,并实现其运行方法(求值、生成代码等)
class Node(): passclass BInaryOperator(Node):def __init__(self, left, right):... # 赋值class Add(BInaryOperator): pass# 运算方法基类class NodeVistor:def visit(self, node):methname = 'visit_' + type(node).__name__meth = getattr(self, methname, None)if meth is None:meth = self.generic_visitreturn meth(node)def generic_visit(self, node):raise ...class Evaluator(NodeVistor):def visit_Number(self, node): return node.valuedef visit_Add(self, node): return self.visit(node.left) + self.visit(node.right)# 用于http处理class HTTPHandler():def handler(self, request):methname = 'do_' + request.request_methodgetattr(self, methname)(request)def do_Get(self, request): ...def do_Post(self, request): ...
使用堆栈和生成器
class NodeVisitor:def visit(self, node):stack = [node]last_result = Nonewhile stack:try:last = stack[-1]if isinstance(last, types.GeneratorType):stack.append(last.send(last_result))last_result = Noneelif isinstance(last, node):stack.append(self._visit(stack.pop()))else:last_result = stack.pop()except StopIteration:stack.pop()return last_resultdef _visit(self, node):methname = 'visit_' + type(node).__name__meth = getattr(self, methname, None)if meth is None:meth = self.generic_methodreturn meth(node)def gen...class UnaryOperator(Node):...class Evaluatior(NodeVisitor): ...def visit_add(self, node):return self.visit(node.left) + self.visit(node.right) ## 递归会产生问题yield (yield node.left) + (yield node.right)
使用weakref来避免内存不被释放
取得弱引用的对象需要通过函数式的调用
使用functools.total_ordering装饰器,可以只定义__eq__和其余任意一个比例操作就可以了(le, ge, lt, gt)
def total_ordering(cls):cls.__le__ = lambda self, other: self < other and self == other...return new_cls
同样的参数产生同一个对象
要实现该行为,应该使用一个与类本身相分离的工厂函数
可以使用weakref.WeakValueDictionary作为缓存存储位置
如果修改__new__,在有对象时不再调用父类__new__,则会导致__init__每次都会被调用
可以将缓存管理独立成为单独的类,更加灵活
为防止用户自己创建类对象,可以使用_Spam定义类名,将__init__设计为抛出异常,而类工厂函数通过__new__创建对象
元编程主要是创建函数和类,并用它们来操纵代码(修改、生成或者包装已有的代码)
给函数加上一个包装层(wrapper layer),如记录日志、计时统计,使用装饰器函数
@timethisdef fun() ...## => fun = timethis(fun)
如函数名、文档字符串、函数注解以及调用签名
使用functools.wraps装饰器保存元数据
可以通过__wrapped__属性来访问原函数
def timethis(fun):@wraps(fun)def wrapper(*args, **kwargs):fun ...return wrapper
如果使用了functools.wraps则可以使用__wrapped__属性
对于staticmethod和classmethod使用__func__属性
多加一层函数
@logged(level_, msg)def add(a, b): return a + badd.set_level(level_2)add.set_msg(new_msg)def attach_wrapper(obj, fun=None):if fun is None:return partial(attach_wrapper, obj)setattr(obj, fun.__name__, fun)return fundef logged(level, msg):def decord(fun):@wraps(fun)def wrapper(*args, **kwargs):fun(...)@attach_wrapper(wrapper)def set_level(level):nplocal levellevel = _levelreturn wrapsreturn decord
如果使用wraps则属性可以依次传递,对于 @logged @timethis 和 @timethis @logged 都可以起作用,而如果采用将属性赋值到函数上的方法对于上面两种方式一种会有问题
def logged(fun=None, *, level=DEBUG, msg=''):if fun is None:return partial(logged, level=level, msg=msg)...@logged # 也可以采用@logged()def a() ...@logged(level=INFO, ...)def b() ...
def asserttype(*o_args, **o_kwargs):def dec(fun):def wrap(*args, **kwargs):for p, t in zip(args, o_args):if not isinstance(p, t):raise ...return wrapreturn dec## 使用signature简化处理def dec(fun):if not __debug__:return funsig = inspect.signature(fun)bound_types = sig.bind_partial(*o_args, **o_kwargs).argments@wraps(fun)def wrapper(*args, **kwargs):bound_values = sig.bind(*args, **kwargs)for name, value in bound_values.argments.items():if name in bound_types:...fun(args, kwargs)
可以使用注解提取类型,但是这样一个函数就只能有一种类型,不灵活
内建的@property也是拥有getter、setter、deleter方法的类
使用__init__来定义包
__init__中可以相关模块
对from modelu import *进行控制
在模块文件(a.py)、包文件(__init__.py)中定义 __all__ = ['spam', 'z'...]
from . import grokfrom ..B import barfrom mypackage.A import grok # 使用完整绝对名称# 位于脚本顶层目录的模块不能使用相对导入# 直接以脚本执行,不能使用相对导入,需要使用-m模块模式执行
可以将模块转换为包,在其__init__中将包中其他模块的符号导入,外界的调用代码是不需要修改的
在__init__定义函数来动态导入其他模块内的符号,惰性导入
## a.pyclass A:...## __init\__.pydef A():from .a import Areturn A() # 未更改调用代码# 会破坏继承和类型检查机制 isinstance(x, module.A) 出错了
命名空间包:用于不同目录下的代码,使其位于统一命名空间下管理
#foo-package/spam/blah.py # 无init文件bar-package/spam/grok.py # 无init文件#import syssys.path.extend(['foo-package', 'bar-package'])import spam.blahimport spam.grok#import spamspam.__path__ # 包含相应的文件夹路径spam.__file__ # 出错,命名空间包无该属性
import Aimport impimp.reload(A) # from A import 的变量不会被更像
在目录或者zip文件中添加一个 __main__.py 文件
根据解释器的当前目录不一样,其读取路径也会不一样
通过 __file__ 全路径去找对应的路径,比较麻烦,包也可能为.zip或.egg格式
import pkgutildata = pkgutil.get_data(__package\__, file_name)
使用importlib.import_module(_name)
source = u.read().decode('utf8')mod = sys.modules.setdefault(url, imp.new_module(url))code = compile(source, url, 'exec')mod.__file__ = urlmod.__package__ = ''exec(code, mod.__dict__)
使用urllib.request.urlopen(url, querystring, headers=headers)、urllib.parse.urlencode(parms)
使用requests库,resp.text/content/json/status_code/headers['content-type']/cookies
使用socketserver库,为单线程
class EchoHandler(BaseRequestHandler):def handle(self):while True:msg = self.request.recv(8192)if not msgbreakself.request.send(msg)serv = TCPServer(('', 20000), EchoHandler)serv.serve_forever()# 还可以使用SteamRequestHandler
需要多线程,需要使用ForkingTCPServer,ThreadingTCPServer
使用线程池
serve = TCPServer(('', 2000), EchoHnadler)for n in range(NWORKERS):t = Thread(target=serve.serve_forever)t.daemon = Truet.start()serve.serve_forever()
如果想通过设定socket选项来调整底层socket的行为,可以设置bind_and_activate=False参数
# 方法 1serve = TCPServer(.., bind_and_activate=False)# 方法 2TCPServer.allow_reuse_address = Trueserver = TCPServer(...)
还可以通过StreamRequesthandler的子类来实现
直接通过socket来编写服务器也是很容易的
与TCP类似
class TimeHandler(BaseRequestHandler):def handler(self):msg, sock = self.requestresp = time.ctine()sock.sendto(resp.encode('ascii'), self.client_address)
利用ipaddress模块
net = ipaddress.ip_network('123.45.67.64/27')for a in net: ...net[-1]
WSGI
使用multiprocessing.connection,send、recv消息是完整的
可以将Listener直接改为unix socket对象
适用于长连接,而不是短连接
# 使用heapq、Condition来实现优先级队列# putwith self._cv:heapq.heappush(self._queue, (-priority, self._count, item)self._count += 1self._cv.notify()# getwith self._cv:while len(self._queue) == 0:self._cv.wait()return heapq.heappop(self._queue)[-1]
class SomeClass:_lock = threading.RLock()def fun_a(self):with SomeClass._lock:passdef fun_b(self):with SomeClass._lock:pass
@contextmanagerdef acquire(*locks):locks = sorted(locks, key=lambda x:id(x))acauared = getattr(_local, 'acquired', [])# 比较锁acquired.extend(locks)_local.acquired = acquredtry:for lock in locks:lock.acqure()yieldfinally:for lock in reversed(locks):lock.release()del acquired[-len(locks):]
使用 threading.local(),针对一个对象可以有多份数据
class Conn:def __init__(self):self.x = xself.local = threading.local()def do_something(self):if hasattr(self.local, 'sock'):raiseself.local.sock = xxxconn = Conn()t1 = threading.Thread(target=test, args=(conn,)t1 = threading.Thread(target=test, args=(conn,)
a = pool.submit(fun, *args)a.result()
concurrent.futures.ProcessPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as pool:for reobits in pool.map(func, args): # 也可以使用submit进行单个任务提交pass# with语句完进程池会关闭,程序会一直等待所有提交的任务都处理完毕
同样可以添加回调函数 .add_done_callback(when_done)
只适用于将问题分解成各个独立部分情况
actor任务之间的通信是单向且异步的,没有明确的的响应和确认
使用Queue和多线程定义actor
也可以用生成器来定义简单的actor
消息传递可以加tag,然后可以使用python的动态获取属性进行处理
tag, *payload = self.recv()getattr(self, 'do_'+tag)(*payload)
使用工作者线程执行任意函数
class Result:def __init__(self):self._event = Event()self._result = Nonedef set_result(self, value):self._result = valueself._event.set()def result(self):self._event.wait()return self._resultclass Worker(Actor):def submit(self, fun, *args, **kwargs):r = Result()self.send((fun, args, kwargs, r))def run(self):while True:func, args, kwargs, r = self.recv()r.set_result(func(*args, **kwargs))
引入单独的“交换”或“网关”对象,作为所有消息的中介。
class Exchange:def attach(self, task):self._subscribers.add(task)def deattach(self, task):...def send(self, msg):for subcriber in self._subscribers:subcriber.send(msg)_exchanges = defaultdict(Exchange)def get_exchange(name):return _exchanges[name]
exc = get_exchange('name')with exc.subscribe(task_a, task_b):exc.send(...)