0%

通常在我们在编写 Python 程序的时候,需要什么库我们就直接用 pip 安装了,这种情况下是全局式的安装。我们在创建一个 Python 项目的时候,例如有的项目需要 Python3,有的需要 Python2,为了避免项目依赖的互相影响以及去除不需要的库,通常会通过虚拟环境来解决。

这个虚拟环境其实相当于另外独立出一个 Python 环境,有自己的独立的库,独立的依赖关系,是在开发项目时候的利器。话不多说,下面讲一下使用方法。

阅读全文 »

数据结构

如何将列表,字典等进行合并

列表

  • 使用 +
  • extend
  • 切片
1
2
3
4
5
6
l1 =[1,2,3]
l2 = [4,5,6]
l1 + l2 # 会创建第三个列表
Out[22]: [1, 2, 3, 4, 5, 6]
l1.extend(l2) # 会修改 l1
l1[len(l1):len(l2)] = l2 # 会修改 l1

字典

1
2
3
4
5
6
7
8
9
10
11
12
# 
context = {}
context.update(defaults)
context.update(user)
#
context = defaults.copy()
context.update(user)
#
context = dict(defaults)
context.update(user)
# python 3.5
context = {**defaults, **user}

如何筛选列表、集合、字典中的数据

列表

  • filter 函数
  • 列表解析

列表解析更好,时间更短。

1
2
3
4
5
6
7
8
9
10
11
12
13
# coding:utf-8

from random import randint

data = [randint(-10, 10) for x in xrange(10)]

# filter 函数
data2 = filter(lambda i: i > 0, data)

# 列表解析
data3 = [x for x in data if x > 0]

print data, data2, data3

字典

  • 字典解析
1
2
dictdata = {x: randint(0, 100) for x in xrange(0, 50)}
dictdata2 = {k: v for k, v in dictdata.iteritems() if v >= 60}

集合

  • 集合解析
1
2
setdata = set(data)
setdata2 = {x for x in setdata if x > 0}

如何根据字典中值的大小,对字典中的项排序

  • 使用内置函数 sorted
    1. 使用 zip 将字典转换为元组(将值放在第一个)
    2. 使用 sorted 排序
  • 使用 sorted 函数的 key 参数
1
2
3
4
5
from random import randint
d = {x:randint(60,100) for x in 'xyzabc'}
sorted(zip(d.itervalues(),d.iterkeys()))
# Out[7]: [(65, 'a'), (68, 'x'), (88, 'z'), (91, 'y'), (96, 'b'), (98, 'c')]
sorted(d.iteritems(),key=lambda x:x[1])

如何为元组中的每个元素命名,提高程序可读性

  • 定义类似于其他语言中的枚举类型,也就是定义一系列数值常量
  • 使用标准库中的 collections.namedtuple
1
2
3
4
5
6
7
from collections import namedtuple
student = namedtuple('student',['name','age'])
s1 = student('jim',16) # 位置参数,也可使用关键字参数
s2 = student(name='tom',age=12)
# 可以使用类对象的方式访问数据
s1.name
# Out[19]: 'jim'

如何统计序列中元素的出现频度

问题:

  1. 找到某随机序列中出现次数最高的三个元素
  2. 对某英文文章的单词进行词频统计,找到出现次数最高的 10 个单词
  • 构造字典
  • 使用 collections 下的 Counter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from random import randint
data = [randint(0, 10) for x in xrange(20)]
# 第一种方式
c = dict.fromkeys(data,0)
for x in data:
c[x] += 1
# 然后回到之前的问题,根据字典的值,对字典的键排序
sorted(c.iteritems(),key=lambda x:x[1])

# 第二种方式
from collections import Counter
c2 = Counter(data)
# c2
# Out[33]: Counter({0: 4, 1: 1, 2: 1, 3: 3, 4: 2, 5: 1, 6: 3, 8: 1, 9: 3, 10: 1})
c2.most_common(3) # 输出出现频度最高的三个单词
# Out[8]: [(3, 3), (4, 3), (7, 3)]

如何快速找到多个字典中的公共键

  • 使用字典的 viewkeys 方法
1
2
3
4
5
6
7
from random import randint, sample
s1 = {x: randint(1,4) for x in sample('abcdefg', randint(3,6))}
s2 = {x: randint(1,4) for x in sample('abcdefg', randint(3,6))}
# 取交集
s2.viewkeys() & s1.viewkeys()
# map, reduce
reduce(lambda a, b: a & b, map(dict.viewkeys, [s1, s2]))

如何让字典保持有序

问题:按照创建元素的顺序创建字典

  • 使用 collections 下的 OrderedDict
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 一个简单的竞赛排名系统
from random import randint
from time import time
from collections import OrderedDict

d = OrderedDict()
players = list('ABCDEFGH')
start = time()

for i in xrange(8):
raw_input()
p = players.pop(randint(0, 7 - i))
end = time()
print i + 1, p, end - start
d[p] = (i + 1, end - start)
print
print ' ' * 20
for k in d:
print k, d[k]

如何实现用户的历史记录功能(最多 n 条)

  • 使用 collections 中的 deque(双端循环队列)实现
  • 使用 pickle 序列化保存数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from random import randint
from collections import deque

N = randint(0, 100)
history = deque([], 5)

def guess_digit(k):
if k == N:
print 'right'
return True
if k < N:
print '%s is less than N' % k
else:
print '%s is greater than N' % k
return False

while True:
line = raw_input('please input a number')
if line.isdigit():
k = int(line)
history.append(k)
if guess_digit(k):
break
elif line == 'history':
print list(history)

# 使用 pickle 序列化对象
import pickle
pickle.dump(history,open('history','w'))
pickle.load('history')

迭代器与生成器

参考:https://nvie.com/posts/iterators-vs-generators/

可迭代对象需要通过 iter () 方法实现为迭代器,然后才能通过 next() 迭代。

生成器是特殊(优雅)的迭代器(通过 yield 方法实现迭代器)。

一句话就是,只有迭代器对象才可以迭代,可迭代对象的意思就是可以实现为迭代器对象,然后迭代,生成器就是迭代器。

如何实现可迭代对象和迭代器对象

可迭代对象接口:__iter__(),__getitem__(),__next__()

迭代器对象接口:next()

  • 实现一个迭代器对象,next() 方法
  • 实现一个可迭代对象,__iter__() 方法返回一个迭代器对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from collections import Iterable, Iterator
import requests


class WeatherIterator(Iterator): # 迭代器对象
def __init__(self, cities):
self.cities = cities
self.index = 0

def get_weather(self, city):
r = requests.get(u'http://wthrcdn.etouch.cn/weather_mini?city=' + city)
data = r.json()['data']['forecast'][0]
return '%s: %s, %s ' % (city, data['low'], data['high'])

def next(self):
if self.index == len(self.cities):
raise StopIteration
city = self.cities[self.index]
self.index += 1
return self.get_weather(city)


class WeatherIterable(Iterable): # 可迭代对象
def __init__(self, cities):
self.cities = cities

def __iter__(self):
return WeatherIterator(self.cities)


for x in WeatherIterable([u'北京', u'上海']):
print x

如何使用生成器函数实现可迭代对象

  • 将 __iter__() 方法实现为 yield() 函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class PrimeNumbers:  # 生成器实现可迭代对象
def __init__(self, start, end):
self.start = start
self.end = end

def is_primenum(self, k):
if k < 2:
return False
for i in xrange(2, k):
if k % i == 0:
return False
return True

def __iter__(self):
for k in xrange(self.start, self.end + 1):
if self.is_primenum(k):
yield k


for x in PrimeNumbers(1, 100):
print x

如何进行反向迭代以及如何实现反向迭代

问题:实现一个浮点数发生器,根据范围和步长迭代

  • 使用 reversed() 方法

引子:如何将列表反向?

1
2
3
4
5
l = [1, 2, 3, 4, 5]
l.reverse() # 改变了原列表
l[::-1] # 得到了一个新列表
reversed(l) # 反向迭代器
iter(l) # 正向迭代器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class FloatFange:
def __init__(self, start, end, step=0.1):
self.start = start
self.end = end
self.step = step

def __iter__(self):
t = self.start
while t <= self.end:
yield t
t += self.step

def __reversed__(self):
t = self.end
while t >= self.start:
yield t
t -= self.step


for x in reversed(FloatFange(2, 4)): # 反向迭代器
print x

如何对迭代器做切片操作

问题:有某个文本文件,我们想读取其中某范围的内容,如 100-300 行的内容

  • 使用标准库中的 itertools.islice,它能返回一个迭代对象切片的生成器
1
2
3
4
5
6
7
from itertools import islice

f = open('test')
islice(f, 100, 300) # 100-300 行
islice(f, 500) # 前 500 行
islice(f, 100, None) # 100 行到最后
# islice 会消耗迭代对象。也就是其实是都迭代了,但是只不过把不符合要求的抛弃了

如何在一个 for 语句中迭代多个可迭代对象

问题:

  • 例如有两科成绩,想要算出总成绩
  • 例如有三个班的成绩,想要找出大于 90 分的成绩
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from random import randint
from itertools import chain

math = [randint(0, 100) for x in xrange(40)]

english = [randint(0, 100) for x in xrange(40)]

for m, e in zip(math, english): # zip 函数可以迭代多个对象
sum = m + e
# print sum

for s in chain(math,english): # chain 函数可以将多个串联起来
if s > 90:
print s

字符串处理

如何拆分含有多种分隔符的字符串

问题:s = ‘ab;cd,|enf|def,kdjk;kdjfk\dd’ 有多种分隔符,如何处理

  • 连续使用 str.split() 方法,每次处理一种分隔符
  • 使用正则表达式的 re.split() 方法,一次性拆分字符串
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import re

s = 'ab;cd,|enf|def,kdjk;kdjfk sd'


def my_split(s, ds): # 字符串原始的分隔方法,需要循环
res = [s]
print res
for d in ds:
t = []
map(lambda x: t.extend(x.split(d)), res)
res = t

return [x for x in res if x]


print my_split(s, ';, |')

split_pattern = '[,; |]' # 使用正则表达式来分隔
print [x for x in re.split(split_pattern,s) if x]

如何判断字符串 a 是否以字符串 b 开头或者结尾

问题:某文件目录下有一系列文件,例如 .c,.py,.js

  • 使用 str.startwith(),str.endwith(),如果有多个结尾的话,需要用元组传入进去
1
2
3
4
5
6
7
8
import os, stat

[name for name in os.listdir('.') if name.endswith(('.sh', '.py'))]

os.stat('e.py').st_mode # 获取用户权限
stat.S_IXUSR # 用户权限

os.chmod('e.py', os.stat('e.py').st_mode | stat.S_IXUSR) # 修改文件权限

如何调整字符串中文本的格式

问题:例如需要更改 log 文件中的日期表示形式

  • 使用正则表达式 re.sub() 方法做字符串替换,利用正则表达式的捕获组,捕获每个部分的内容,在替换字符串中调整各个捕获组的顺序
1
2
3
4
import re

re.sub('(\d{4})-(\d{2})-(\d{2})', r'\2/\3/\1', file) # 将某一个日志文件中的日期格式修改,1,2,3 代表捕获组的序号
re.sub('(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})', r'\g<month>/\g<day>/\g<year>', file) # 第二种方式

如何将多个小字符串拼接成一个大的字符串

问题:

  • 迭代列表,使用 + 拼接字符串。(有巨大的开销浪费,拼接过的字符串只是一个临时的)
  • 使用 join 方法
1
2
3
4
5
6
7
8
9
10
11
s1 = 'ddjfaljfl;'
s2 = 'ddj1222'

s1 + s2 # 运算符重载 + == __add__

l = ['dd', 'kjdlaj', 'kke']
l2 = ''.join(l)
l1 = ['dd', 123, 'dd', 45]
print ['dd', 'kjdlaj', 'kke']
l3 = [str(x) for x in l1] # 如果列表很长,开销巨大
l3 = (str(x) for x in l1) # 推荐使用生成器表达式

如何对字符串进行左,右,居中对齐

问题:

  • 使用字符串的 str.ljust(),str.rjust(),str.center() 对齐
  • 使用 format 方法,传递类似 ‘<20’,’>20’,’^20’ 参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
s.ljust(20)
# Out[12]: 'abc '
s.rjust(20)
# Out[13]: ' abc'
s.center(20)
# Out[14]: ' abc '
format(s,'<20')
# Out[15]: 'abc '
format(s,'>20')
# Out[16]: ' abc'
format(s,'^20')
# Out[17]: ' abc '

w = max(map(len, d.key())) # 找出字典中最长的键
for k in d:
print k.ljust(w), ':', d[k]

如何去掉字符串中不需要的字符

  • 字符串 strip(),lstrip(),rstrip() 方法取掉字符串两端字符
  • 删除单个固定位置的字符,可以使用切片+拼接的方式
  • 字符串的 replace() 方法或正则表达式 re.sub() 删除任意位置字符
  • 字符串 translate() 方法,可以同时删除多种不同字符
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 去掉字符串两端的字符
s = ' abc '
s.strip() # 空白
# Out[19]: 'abc'
s = '---dbc+++'
s.strip('-+')
# Out[21]: 'dbc'

# 去掉固定位置字符
s = 'dbc:ddd'
s[:3] + s[4:]
# Out[23]: 'dbcddd'

# 去掉任意位置
s = '/4kdlkj/4'
s.replace('/4','')
# Out[25]: 'kdlkj'
# 或者用正则表达式的 re.sub

# translate 方法,unicode 也有一个
s = 'abc123456xyz'
import string
string.maketrans('abcxyz','xyzabc') # 建立映射表
s.translate(string.maketrans('abcxyz','xyzabc'))
# Out[30]: 'xyz123456abc'
s.translate(None,'ax') # 第一个为空,表示删除字符
# Out[33]: 'bc123456yz'

unicode.translate() # 映射表为一个字典

文件 I/O 操作

如何读写文本文件

关于 python 的编解码,看这一篇就够了:https://foofish.net/why-python-encoding-is-tricky.html

1
2
3
4
5
6
7
8
9
10
11
12
13
s = u'你好'
# python 2
f = open('filename','w')
f.write(s.encode('utf-8')) # 写入
f.read().decode('utf-8') # 读取

# python 3
f = open('filename','rt',encoding='utf-8') # t 指定文本模式
f.write(s)

# tips
# 如果一次性读完了所有文件,需要使用
f.seek(0) # 回到开头

如何处理二进制文件

二进制文件,例如:视频,图片,录音等。

大小端字节

参考:https://blog.csdn.net/hnlyyk/article/details/52541923

大端:低地址存放高字节位

小端:低地址存放低字节位

  • open 函数指定模式 ‘b’ 打开文件
  • 二进制数据可以用 readinto,读入到提前分配好的 buffer 中
  • 解析二进制数据可以使用标准库中的 struct 模块的 unpack 方法
1
2
3
4
5
6
7
8
9
import struct
import array
f = open('demo.wav','rb')
struct.unpack('h','\x01\02') # 小端存储,h 表示 short
struct.unpack('>h','\x01\02') # 大端存储
f.seek(0,2) # 将文件指针移到末尾
f.tell()
n = (f.tell() - 44)/2
buff = array.array('h',(0 for _ in xrange(n))) # 创建数组

如何设置文件的缓冲

问题:

1
2
3
f = open('demo.txt','w',buffering=2048)  # 全缓冲,默认 4096B
f = open('demo.txt','w',buffering=1) # 行缓冲,遇到换行符就写入一次
f = open('demo.txt','w',buffering=0) # 无缓冲

如何将文件映射到内存

  • 使用标准库中的 mmap 模块的 mmap() 函数
1
2
3
4
5
6
7
8
9
10
11
12
import mmap

f = open('demo.bin', 'r+b')
f.fileno() # 文件描述符
m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE)
m[0] # 索引
m[10:20] # 切片
m[0] = '\x88' # 修改文件

# mmap(fileno, length[, flags[, prot[, access[, offset]]]])

mmap.mmap(f.fileno(), mmap.PAGESIZE * 8, access=mmap.ACCESS_WRITE,offset=mmap.PAGESIZE*4)

如何访问文件的状态

  • 系统调用:os, stat 模块
  • os.path 下的函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import os, stat
import os.path

s = os.stat('filename') # 跟随符号链接,指向原始文件
s
Out[8]: posix.stat_result(st_mode=33188, st_ino=2373061, st_dev=16777217, st_nlink=1, st_uid=501, st_gid=20, st_size=2064, st_atime=1530613571, st_mtime=1530613571, st_ctime=1530613571)

os.lstat('filename') # 不跟随符号链接
os.fstat('filename') # 打开的是文件描述符
stat.S_ISDIR(s.st_mode)

# os.path
os.path.isdir()
os.path.islink()

如何使用临时文件

1
2
3
4
5
6
7
8
9
10
from tempfile import TemporaryFile, NamedTemporaryFile
f = TemporaryFile() # 在文件系统中找不到
f.write('abcdef'*1000000)
f.seek(0)
f.read(100)
# Out[15]: 'abcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd'
ntf = NamedTemporaryFile() # 可以得到临时文件的路径,可以多个进程访问
ntf.name
Out[17]: '/var/folders/6t/b8j3jt1x3nl5bp70ll6fmqfh0000gn/T/tmpa6DRYC'

数据编码与处理

如何读写 csv 数据

  • 使用标准库 csv 模块,可以使用其中 reader 和 writer 完成 csv 文件读写
1
2
3
4
5
6
import csv
rf = open('pingan.csv','rb')
reader = csv.reader(rf)
wf = open('pingan.csv','wb')
writer = csv.writer(wf)
writer.writewrow() # 按行写

如何读写 json 数据

  • 使用标准库中的 json 模块,其中 loads,dumps 函数可以完成 json 数据的读写
1
2
3
from record import Record  # 录音模块
record = Record(channels=1)
audioData = record.record(2)
1
2
3
4
5
6
7
8
9
import json
l = [1, 2, 'abc', {'name': 'bpb', 'age': 'ddd'}]
s = json.dumps(l,separators=[',', ':'])
# Out[9]: '[1,2,"abc",{"age":"ddd","name":"bpb"}]'
json.dumps(d, sort_keys=True)
json.loads(s)
# 文件
json.dump() # 写入到文件
json.load() # 从文件导入

如何解析简单的 xml 文档

  • 使用标准库中的 xml 模块
1
2
from xml.etree import ElementTree
# 与 xpath 表达式类似

如何构建 xml 文档

1
2
3
4
5
6
7
8
9
10
11
from xml.etree.ElementTree import Element, ElementTree
e = Element('Data')
e.tag
Out[21]: 'Data'
e.set('name', 'abc')
from xml.etree.ElementTree import Element, ElementTree, tostring
tostring(e)
Out[24]: '<Data name="abc" />'
e.text = '123'
tostring(e)
Out[26]: '<Data name="abc">123</Data>'
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import csv
from xml.etree.ElementTree import Element, ElementTree
from e2 import pretty # 美化格式的小函数

def csvToXml(fname):
with open(fname, 'rb') as f:
reader = csv.reader(f)
headers = reader.next()

root = Element('Data')
for row in reader:
eRow = Element('Row')
root.append(eRow)
for tag, text in zip(headers, row):
e = Element(tag)
e.text = text
eRow.append(e)

pretty(root)
return ElementTree(root)

et = csvToXml('pingan.csv')
et.write('pingan.xml')

如何读写 excel 文件

  • 使用 xlrd 和 xlwt(不好用)

类与对象

如何派生内置不可变类型并修改实例化行为

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# coding:utf-8


class IntTuple(tuple):
def __new__(cls, iterable):
g = (x for x in iterable if isinstance(x, int) and x > 0)
return super(IntTuple, cls).__new__(cls, g)

def __init__(self, iterable):
# before
print self
super(IntTuple, self).__init__(iterable)
# after


t = IntTuple([1, -1, 'abc', 6, ['x', 'y'], 3])
print t

如何为创建大量实例节省内存

  • 定义类的 __slots__ 属性,用来声明实例属性名字的列表

普通的实例创建完成之后,会有一个 __dict__ 方法,这个方法可以动态绑定实例,但是也造成了很大的内存开销,所以可以通过 __slot__ 属性,阻止动态绑定实例

如何让对象支持上下文管理器

  • 实现上下文管理协议,需要定义实例的 __enter__, __exit__ 方法,它们分别在 with 开始和结束时被调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# 在类中实现
def __enter(self):
#...
#...
# 需要返回值 作为 with .. as .. 的对象
return self

def __exit__(self, exc_type, exc_val, exc_tb):
# 关闭实例
return True # 则不会抛出异常
# 例子
from telnetlib import Telnet
from sys import stdin, stdout
from collections import deque

class TelnetClient(object):
def __init__(self, addr, port=23):
self.addr = addr
self.port = port
self.tn = None

def start(self):
raise Exception('Test')
# user
t = self.tn.read_until('login: ')
stdout.write(t)
user = stdin.readline()
self.tn.write(user)

# password
t = self.tn.read_until('Password: ')
if t.startswith(user[:-1]): t = t[len(user) + 1:]
stdout.write(t)
self.tn.write(stdin.readline())

t = self.tn.read_until('$ ')
stdout.write(t)
while True:
uinput = stdin.readline()
if not uinput:
break
self.history.append(uinput)
self.tn.write(uinput)
t = self.tn.read_until('$ ')
stdout.write(t[len(uinput) + 1:])

def cleanup(self):
pass

def __enter__(self):
self.tn = Telnet(self.addr, self.port)
self.history = deque()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
print 'In __exit__'

self.tn.close()
self.tn = None
with open(self.addr + '_history.txt', 'w') as f:
f.writelines(self.history)
return True

with TelnetClient('127.0.0.1') as client:
client.start()

print 'END'

'''
client = TelnetClient('127.0.0.1')
print '\nstart...'
client.start()
print '\ncleanup'
client.cleanup()
'''

如何创建可管理的对象属性

问题:不想让用户通过属性来管理,形式上是使用属性访问,实际上调用方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from math import pi

class Circle(object):
def __init__(self, radius):
self.radius = radius

def getRadius(self):
return self.radius

def setRadius(self, value):
if not isinstance(value, (int, long, float)):
raise ValueError('wrong type.')
self.radius = float(value)

def getArea(self):
return self.radius ** 2 * pi
R = property(getRadius, setRadius) # get 方法,set 方法

c = Circle(3.2)
c.R
c.R = 23
print c.R

如何让类支持比较操作

  • 比较运算符重载,需要实现以下方法:__lt__, __le__, __gt__, __ge__, __eq__, __ne__
  • 使用标准库下的 functools 下的类装饰器 total_ordering 可以简化此过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# 第一种方法,挨个实现
class Rectangle(object):
def __init__(self, w, h):
self.w = w
self.h = h

def area(self):
return self.w * self.h

def __lt__(self, obj):
print 'in__lt__'
return self.area() < obj.area()

# 第二种方法
from functools import total_ordering
from abc import abstractmethod


@total_ordering # 实现运算符重载
class Shape(object):

@abstractmethod
def area(self): # 抽象接口
pass

# 而实现其他的大于等于,小于等于等的逻辑都是通过小于和等于来实现的,例如,>= 就是 not __lt__
def __lt__(self, obj):
print 'in__lt__'
if not isinstance(obj, Shape):
raise TypeError('obj is not shape')
return self.area() < obj.area()

def __eq__(self, obj):
print 'in__eq__'
if not isinstance(obj, Shape):
raise TypeError('obj is not shape')
return self.area() == obj.area()


class Rectangle(Shape):
def __init__(self, w, h):
self.w = w
self.h = h

def area(self):
return self.w * self.h


class Circle(Shape):
def __init__(self, r):
self.r = r

def area(self):
return self.r ** 2 * 3.14


r1 = Rectangle(5, 3)
r2 = Circle(3)

print r1 < r2 #
print r2 < r1

如何使用描述符对实例属性做类型检查

要求:

  • 可以指定类型
  • 类型不正确时抛出异常
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Attr(object):
def __init__(self, name, type_):
self.name = name
self.type_ = type_

def __get__(self, instance, cls):
return instance.__dict__[self.name]

def __set__(self, instance, value):
if not isinstance(value, self.type_):
raise TypeError('expected an %s' % self.type_)
instance.__dict__[self.name] = value

def __delete__(self, instance):
raise AttributeError("can't delete this attr")

class Person(object):
name = Attr('name', str)
age = Attr('age', int)
height = Attr('height', float)
weight = Attr('weight', float)

s = Person()
s.name = 'Bob'
s.age = 17
s.height = 1.82
s.weight = 52.5

如何在环状数据结构中管理内存

  • 使用标准库 weakref,它可以创建一种能访问对象但不增加引用计数的对象。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import weakref

class Data(object):
def __init__(self, value, owner):
self.owner = weakref.ref(owner) # 弱引用
self.value = value

def __str__(self):
return "%s's data, value is %s" % (self.owner(), self.value) # 引用函数

def __del__(self):
print 'in Data.__del__'

class Node(object):
def __init__(self, value):
self.data = Data(value, self)

def __del__(self):
print 'in Node.__del__'

node = Node(100)
del node
raw_input('wait...')

如何通过实例方法名字的字符串调用方法

  • 使用内置函数 getattr,通过名字在实例上获取方法对象,然后调用
  • 使用标准库 operator 下的 methodcaller 函数调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 第一种方法
from lib1 import Cirle
from lib2 import Triangle
from lib3 import Rectangle


def getArea(shape):
for name in ('area', 'getArea', 'get_area'):
f = getattr(shape, name, None)
if f:
return f()


shape1 = Circle(2)
shape2 = Triangle(3, 4, 5)
shape3 = Rectangle(6, 4)

shapes = [shape1, shape2, shape3]
print map(getArea, shapes)

# 第二种方法
from operator import methodcaller

s = 'abc123abc456'
print s.find('abc', 4)
print methodcaller('find', 'abc', 4)(s)

多线程与多进程

如何使用多线程

  • IO 密集型
  • CPU 密集型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from threading import Thread


class MyThread(Thread):
def __init__(self, sid):
Thread.__init__(self)
self.sid = sid

def run(self):
handle(self.sid)


threads = []
for i in xrange(1, 11):
t = MyThread(i)
threads.append(t)
t.start()
for t in threads:
t.join()

如何线程间通信

1
from Queue import Queue

如何在线程间进行事件通知

  • 等待事件用 wait
  • 通知事件用 set
1
from threading import Event, Thread

如何使用线程本地数据

如何使用线程池

如何使用多进程

装饰器

如何使用函数装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# coding:utf-8

"""
题目 1 计算斐波那契数列 1,1,2,3,5,8,13
"""


def cache(func):
cache = {}

def wrap(*args):
# print args
if args not in cache:
cache[args] = func(*args)
return cache[args]

return wrap


# 常规写法
@cache # 加装饰器
def fibonacci(n):
if n <= 1:
return 1
return fibonacci(n - 1) + fibonacci(n - 2)


# 加缓存
def fibonacci_cache(n, cache=None):
if cache is None:
cache = {}
if n in cache:
return cache[n]
if n <= 1:
return 1
cache[n] = fibonacci_cache(n - 1, cache) + fibonacci_cache(n - 2, cache)
return cache[n]


# 题目 2 一个共有 10 个台阶的楼梯,从下面走到上面,一次只能迈 1-3 个台阶,并且不能后退,走完这个楼梯共有多少种方法
@cache
def climb(n, steps):
count = 0
if n == 0:
count = 1
elif n > 0:
for step in steps:
count += climb(n - step, steps)
return count


# 加缓存

# print fibonacci(50)
# print fibonacci_cache(50)

print climb(20, (1, 2,3))

如何为被装饰的函数保存元数据

参考:https://foofish.net/python-decorator.html

  • functools.wraps

使用装饰器极大地复用了代码,但是他有一个缺点就是原函数的元信息不见了,比如函数的docstring__name__、参数列表,先看例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 装饰器
def logged(func):
def with_logging(*args, **kwargs):
print func.__name__ # 输出 'with_logging'
print func.__doc__ # 输出 None
return func(*args, **kwargs)
return with_logging

# 函数
@logged
def f(x):
"""does some math"""
return x + x * x

logged(f)
1
2
3
4
5
6
7
8
9
10
11
12
13
from functools import wraps
def logged(func):
@wraps(func)
def with_logging(*args, **kwargs):
print func.__name__ # 输出 'f'
print func.__doc__ # 输出 'does some math'
return func(*args, **kwargs)
return with_logging

@logged
def f(x):
"""does some math"""
return x + x * x

如何定义带参数的装饰器

带参数的装饰器

装饰器还有更大的灵活性,例如带参数的装饰器,在上面的装饰器调用中,该装饰器接收唯一的参数就是执行业务的函数 foo 。装饰器的语法允许我们在调用时,提供其它参数,比如@decorator(a)。这样,就为装饰器的编写和使用提供了更大的灵活性。比如,我们可以在装饰器中指定日志的等级,因为不同业务函数可能需要的日志级别是不一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def use_logging(level):
def decorator(func):
def wrapper(*args, **kwargs):
if level == "warn":
logging.warn("%s is running" % func.__name__)
elif level == "info":
logging.info("%s is running" % func.__name__)
return func(*args)
return wrapper

return decorator

@use_logging(level="warn")
def foo(name='foo'):
print("i am %s" % name)

foo()

上面的 use_logging 是允许带参数的装饰器。它实际上是对原有装饰器的一个函数封装,并返回一个装饰器。我们可以将它理解为一个含有参数的闭包。当我 们使用@use_logging(level="warn")调用的时候,Python 能够发现这一层的封装,并把参数传递到装饰器的环境中。

@use_logging(level="warn")等价于@decorator

如何实现属性可修改的装饰器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from functools import wraps
import time, logging
from random import randint


def warn(timeout):
timeout = [timeout]
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
res = func(*args, **kwargs)
used = time.time() - start
if used > timeout[0]:
msg = '%s, %s, %s' % (func.__name__, used, timeout[0])
logging.warn(msg)
return res

def setTimeout(k):
#nonlocal timeout # python 3 语法
timeout[0] = k

wrapper.setTimeout = setTimeout # 将setTimeout()函数作为wrapper的属性

return wrapper

return decorator


@warn(1.5)
def test():
print 'In test'
while randint(0, 1):
time.sleep(0.5)


for _ in range(30):
test()

test.setTimeout(1)
for _ in range(30):
test()

puppet 安装

master

1
yum install puppet puppet-server 

puppet 配置

题 2:单例模式

python 中的单例模式有四种实现方式:

  1. 模块
  2. 使用 __new__ 方法
  3. 使用装饰器
  4. 使用元类

这里解释使用 __new__ 方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# coding:utf-8

# 单例模式
class Singleton(object):
_instance = None

def __new__(cls, *args, **kwargs):
if not isinstance(cls._instance, cls):
# cls._instance = super(Singleton, cls).__new__(cls, *args, **kwargs)
cls._instance = object.__new__(cls, *args, **kwargs)
return cls._instance


# 测试
class Myclass(Singleton):
a = 1


A = Myclass()
B = Myclass()

print id(A), id(B)

题 3:数组中重复的数字

1. 找出数组中重复的数字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# -*- coding:utf-8 -*-
class Solution:
# 这里要特别注意~找到任意重复的一个值并赋值到duplication[0]
# 函数返回True/False
def duplicate(self, numbers, duplication):
if numbers is None or len(numbers) <= 0:
return False
else:
for i in numbers:
if i < 0 or i > len(numbers) - 1:
return False
for i in range(len(numbers)):
while numbers[i] != i:
if numbers[i] == numbers[numbers[i]]:
duplication[0] = numbers[i]
return True
else:
index = numbers[i]
numbers[i], numbers[index] = numbers[index], numbers[i]
return False

2. 找出数组中重复的数字(不修改数组)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# coding:utf-8


class Solution:

def count_range(self, numbers, length, start, end):
if numbers is None:
return False
count = 0
for i in range(0, length):
if start <= numbers[i] <= end:
count += 1
return count # 返回区间的数目

def getDuplication(self, numbers, length):
if numbers is None or len(numbers) <= 0:
return False
else:
start = 1 # 1
end = length - 1 # n
while end >= start:
middle = ((end - start) >> 1) + start
count = self.count_range(numbers, length, start, middle)
if end == start:
if count > 1:
return start
else:
break
if count > (middle - start + 1):
end = middle
else:
start = middle + 1
return False


# 测试
if __name__ == '__main__':
num = [2, 3, 5, 4, 3, 2, 6, 7]
s = Solution()
print s.getDuplication(num, 8)

题 4:二维数组中的查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# -*- coding:utf-8 -*-


class Solution:
# array 二维列表
def Find(self, matrix, rows, columns, number):
# write code here
found = False

if matrix is not None and rows > 0 and columns > 0:
row = 0
column = columns - 1
while row < rows and column >= 0:
if matrix[row][column] == number:
found = True
break
elif matrix[row][column] > number:
column -= 1
else:
row += 1

return found

# 测试
if __name__ == '__main__':
matrix = [[1, 2, 8, 9],
[2, 4, 9, 12],
[4, 7, 10, 13],
[6, 8, 11, 15]]
s = Solution()
print s.Find(matrix, 4, 4, 7)



# 牛客网中写法

# -*- coding:utf-8 -*-
class Solution:
# array 二维列表
def Find(self, number, matrix):
# write code here
found = False
rows = len(matrix)
columns = len(matrix[0])
if matrix is not None and rows > 0 and columns > 0:
row = 0
column = columns - 1
while row < rows and column >= 0:
if matrix[row][column] == number:
found = True
break
elif matrix[row][column] > number:
column -= 1
else:
row += 1
return found

题 5:替换空格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# -*- coding:utf-8 -*-

class Solution:
# s 源字符串
def replaceSpace(self, s):
# write code here
s_list = list(s)
s_replace = []
for item in s_list:
if item == ' ':
s_replace.append('%')
s_replace.append('2')
s_replace.append('0')
else:
s_replace.append(item)
return "".join(s_replace)

题 6:从尾到头打印链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class ListNode:
def __init__(self, x=None):
self.val = x
self.next = None

# 单链表的一个实现函数
listnode = ListNode(1)
p = listnode
for i in xrange(2,11):
listnode.next = ListNode(i)
listnode = listnode.next
while p:
print p.val
p = p.next

# 利用栈
class Solution:
def printListFromTailToHead(self, listNode):
if listNode.val is None:
return
l = []
head = listNode
while head:
l.insert(0, head.val)
head = head.next
return l


s = Solution()
print s.printListFromTailToHead(p)

题 7:重建二叉树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# coding:utf-8

'''
输入某二叉树的前序遍历和中序遍历的结果,请重建出该二叉树。
假设输入的前序遍历和中序遍历的结果中都不含重复的数字。
例如输入前序遍历序列{1,2,4,7,3,5,6,8}和中序遍历序列{4,7,2,1,5,3,8,6},则重建二叉树并返回。
'''


# -*- coding:utf-8 -*-
class TreeNode:
def __init__(self, x):
self.val = x
self.left = None
self.right = None


class Solution:
# 返回构造的TreeNode根节点
def reConstructBinaryTree(self, pre, tin):
# write code here
if not pre and not tin:
return None
root = TreeNode(pre[0])
if set(pre) != set(tin):
return None
i = tin.index(pre[0])
root.left = self.reConstructBinaryTree(pre[1:i + 1], tin[:i])
root.right = self.reConstructBinaryTree(pre[i + 1:], tin[i + 1:])
return root


pre = [1, 2, 3, 5, 6, 4]
tin = [5, 3, 6, 2, 4, 1]
test = Solution()
newTree = test.reConstructBinaryTree(pre, tin)


# 按层序遍历输出树中某一层的值
def PrintNodeAtLevel(treeNode, level):
if not treeNode or level < 0:
return 0
if level == 0:
print(treeNode.val)
return 1
PrintNodeAtLevel(treeNode.left, level - 1)
PrintNodeAtLevel(treeNode.right, level - 1)


# 已知树的深度按层遍历输出树的值
def PrintNodeByLevel(treeNode, depth):
for level in range(depth):
PrintNodeAtLevel(treeNode, level)

题 8:二叉树的下一个节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
'''
给定一个二叉树和其中的一个结点,请找出中序遍历顺序的下一个结点并且返回。
注意,树中的结点不仅包含左右子结点,同时包含指向父结点的指针。
'''
# -*- coding:utf-8 -*-
class TreeLinkNode:
def __init__(self, x):
self.val = x
self.left = None
self.right = None
self.next = None
class Solution:
def GetNext(self, pNode):
if pNode == None:
return
pNext = None
if pNode.right != None:
pRight = pNode.right
while pRight.left != None:
pRight = pRight.left
pNext= pRight
elif pNode.next != None:
pCurrent = pNode
pParent = pCurrent.next
while pParent != None and pCurrent == pParent.right:
pCurrent = pParent
pParent = pCurrent.next
pNext = pParent
return pNext

class Solution2:
def GetNext(self, pNode):
# 输入是一个空节点
if pNode == None:
return None
# 注意当前节点是根节点的情况。所以在最开始设定pNext = None, 如果下列情况都不满足, 说明当前结点为根节点, 直接输出None
pNext = None
# 如果输入节点有右子树,则下一个结点是当前节点右子树中最左节点
if pNode.right:
pNode = pNode.right
while pNode.left:
pNode = pNode.left
pNext = pNode
else:
# 如果当前节点有父节点且当前节点是父节点的左子节点, 下一个结点即为父节点
if pNode.next and pNode.next.left == pNode:
pNext = pNode.next
# 如果当前节点有父节点且当前节点是父节点的右子节点, 那么向上遍历
# 当遍历到当前节点为父节点的左子节点时, 输入节点的下一个结点为当前节点的父节点
elif pNode.next and pNode.next.right == pNode:
pNode = pNode.next
while pNode.next and pNode.next.right == pNode:
pNode = pNode.next
# 遍历终止时当前节点有父节点, 说明当前节点是父节点的左子节点, 输入节点的下一个结点为当前节点的父节点
# 反之终止时当前节点没有父节点, 说明当前节点在位于根节点的右子树, 没有下一个结点
if pNode.next:
pNext = pNode.next
return pNext

题 9:用两个栈实现队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# -*- coding:utf-8 -*-
class Solution:
def __init__(self):
self.stack1 = []
self.stack2 = []

def push(self, node):
self.stack1.append(node)
# write code here
def pop(self):
if len(self.stack2) == 0 and len(self.stack1) == 0:
return
elif len(self.stack2) == 0:
while len(self.stack1) > 0:

self.stack2.append(self.stack1.pop())
return self.stack2.pop()
# return xx

题 10:斐波那契数列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# -*- coding:utf-8 -*-
class Solution:
def Fibonacci(self, n, cache=None):
if cache is None:
cache = {}
if n in cache:
return cache[n]
if n == 0:
return 0
if n == 1:
return 1

cache[n] = self.Fibonacci(n - 1, cache) + self.Fibonacci(n - 2, cache)
return cache[n]

题 11:旋转数组的最小数字

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# coding:utf-8

class Solution:
def minNumberInRotateArray(self, array):
if len(array) == 0:
return 0
head, end = 0, len(array) - 1
mid = 0
while array[head] >= array[end]:
if end - head == 1:
mid = end
break
mid = (end + head) // 2
if array[head] == array[end] and array[head] == array[mid]:
return self.order_min(array, head, end)
if array[mid] >= array[head]:
head = mid
elif array[mid] <= array[end]:
end = mid
return array[mid]

def order_min(self, array, head, end):
result = array[0]
for i in range(len(array)):
if i < result:
result = i
return result

题 12:矩阵中的路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# -*- coding:utf-8 -*-
class Solution:
def hasPath(self, matrix, rows, cols, path):
if matrix == None or rows < 1 or cols < 1 or path == None:
return False
visited = [0] * (rows * cols)

pathLength = 0
for row in range(rows):
for col in range(cols):
if self.hasPathCore(matrix, rows, cols, row, col, path, pathLength, visited):
return True
return False

def hasPathCore(self, matrix, rows, cols, row, col, path, pathLength, visited):
if len(path) == pathLength:
return True

hasPath = False
if row >= 0 and row < rows and col >= 0 and col < cols and matrix[row * cols + col] == path[pathLength] and not \
visited[row * cols + col]:

pathLength += 1
visited[row * cols + col] = True

hasPath = self.hasPathCore(matrix, rows, cols, row, col - 1, path, pathLength, visited) or \
self.hasPathCore(matrix, rows, cols, row - 1, col, path, pathLength, visited) or \
self.hasPathCore(matrix, rows, cols, row, col + 1, path, pathLength, visited) or \
self.hasPathCore(matrix, rows, cols, row + 1, col, path, pathLength, visited)

if not hasPath:
pathLength -= 1
visited[row * cols + col] = False

return hasPath

题 13:机器人的运动范围

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# -*- coding:utf-8 -*-
class Solution:
def movingCount(self, threshold, rows, cols):
if threshold < 0 or rows <= 0 or cols <= 0:
return False
visited = [False] * (rows * cols)
count = self.movingCountCore(threshold, rows, cols, 0, 0, visited)
return count

def movingCountCore(self, threshold, rows, cols, row, col, visited):
count = 0
if self.check(threshold, rows, cols, row, col, visited):
visited[row * cols + col] = True
count = 1 + self.movingCountCore(threshold, rows, cols, row, col - 1, visited) + \
self.movingCountCore(threshold, rows, cols, row, col + 1, visited) + \
self.movingCountCore(threshold, rows, cols, row - 1, col, visited) + \
self.movingCountCore(threshold, rows, cols, row + 1, col, visited)
return count

def check(self, threshold, rows, cols, row, col, visited):
if row >= 0 and row <= rows and col >= 0 and col <= cols and self.getDigitSum(row) + self.getDigitSum(
col) <= threshold and not visited[row * cols + col]:
return True
return False

def getDigitSum(self, number):
sum = 0
while number > 0:
sum += number % 10
number = number // 10
return sum

题 14:剪绳子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# coding:utf-8


class Solution:
def max_product_after_cutting1(self, length): # 使用动态规划,自下而上解决
if length < 2:
return 0
if length == 2:
return 1
if length == 3:
return 2
products = [0] * (length + 1)
products[0] = 0
products[1] = 1
products[2] = 2
products[3] = 3
for i in range(4, length + 1):
max = 0
for j in range(1, i / 2 + 1):
product = products[j] * products[i - j]
if max < product:
max = product
products[i] = max
max = products[length]
return max

def max_product_after_cutting2(self, length): # 贪婪算法
if length < 2:
return 0
if length == 2:
return 1
if length == 3:
return 2
timesof3 = length / 3
if length - timesof3 * 3 == 1:
timesof3 -= 1
timesof2 = (length - timesof3 * 3) / 2
max = (3 ** timesof3) * (2 ** timesof2)
return max


if __name__ == '__main__':
s = Solution()
result1 = s.max_product_after_cutting1(8)
result2 = s.max_product_after_cutting2(8)
print result1, result2

题 15:二进制中 1 的个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# -*- coding:utf-8 -*-
class Solution:
def NumberOf1(self, n): # 死循环
# n = bin(n)
count = 0
while n:
if n & 1:
count += 1
n = n >> 1
return count

def Numberof1_2(self, n):
l = len(bin(n))
count = 0
flag = 1
for i in range(l):
if n & flag:
count += 1
flag = flag << 1
return count

def Numberof1_3(self, n): # 最佳解法
count = 0
while n:
count += 1
n = (n - 1) & n
return count

def Numberof1_4(self, n):
if n < 0:
s = bin(n & 0xffffffff)
else:
s = bin(n)
return s.count('1')


if __name__ == '__main__':
s = Solution()
result1 = s.NumberOf1(9)
result2 = s.Numberof1_2(78787)
result3 = s.Numberof1_3(989798798798)
result4 = s.Numberof1_4(9)
print result1, result2, result3, result4

IO 调度算法

NOOP

NOOP 全称 No Operation,中文名称电梯式调度器,该算法实现了最简单的 FIFO 队列,所有 I/O 请求大致按照先来后到的顺序进行操作。之所以说”大致“,原因是 NOOP 在 FIFO 的基础上还做了相邻 IO 请求的合并,并不是完完全全按照先进先出的规则满足 IO 请求。它是基于先入先出(FIFO)队列概念的 Linux 内核里最简单的 I/O 调度器。此调度程序最适合于固态硬盘。

Anticipatory

Anticipatory 的中文含义是”预料的,预想的”,顾名思义有个 I/O 发生的时候,如果又有进程请求 I/O 操作,则将产生一个默认的 6 毫秒猜测时间,猜测下一个进程请求 I/O 是要干什么的。这个 I/O 调度器对读操作优化服务时间,在提供一个 I/O 的时候进行短时间等待,使进程能够提交到另外的 I/O。Anticipatory 算法从 Linux 2.6.33 版本后被删除了,因为使用 CFQ 通过配置也能达到 Anticipatory 的效果。

DeadLine

Deadline 翻译成中文是截止时间调度器,它避免有些请求太长时间不能被处理。另外可以区分对待读操作和写操作。DEADLINE 额外分别为读 I/O 和写 I/O 提供了 FIFO 队列。

CFQ

CFQ 全称 Completely Fair Scheduler ,中文名称完全公平调度器,它是现在许多 Linux 发行版的默认调度器,CFQ 是内核默认选择的 I/O 调度器。该算法的特点是按照 IO 请求的地址进行排序,而不是按照先来后到的顺序来进行响应。CFQ 为每个进程/线程,单独创建一个队列来管理该进程所产生的请求,也就是说每个进程一个队列,各队列之间的调度使用时间片来调度,

相关命令

查看当前系统支持的I/O调度器

1
2
3
4
5
[sankuai@dx-cloud-climc01 ~]$ dmesg | grep -i scheduler
io scheduler noop registered
io scheduler anticipatory registered
io scheduler deadline registered
io scheduler cfq registered (default)

查看一个硬盘使用的I/O调度器

1
2
[sankuai@gh-cloud-mss-store02 ~]$ cat /sys/block/sda/queue/scheduler
noop anticipatory deadline [cfq]

查看调度算法参数的含义

1
2
yum -y install kernel-doc
比如:/usr/share/doc/kernel-doc-2.6.32/Documentation/block/deadline-iosched.txt

永久修改系统 IO 调度器

grubby 命令

1
$ grubby --grub --update-kernel=ALL --args="elevator=deadline

配置文件

1
2
3
4
5
6
7
使用vi编辑器修改grub配置文件
#vi cat /etc/default/grub
#修改第五行,在行尾添加#
elevator= cfq

然后保存文件,重新编译配置文件,
#grub2-mkconfig -o /boot/grub2/grub.cfg

算法调优

CFQ 算法参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[sankuai@gh-cloud-mss-store02 ~]$ ls -l /sys/block/sda/queue/iosched
total 0
-rw-r--r-- 1 root root 4096 Jun 4 11:53 back_seek_max
-rw-r--r-- 1 root root 4096 Jun 4 11:53 back_seek_penalty
-rw-r--r-- 1 root root 4096 Jun 4 11:53 fifo_expire_async
-rw-r--r-- 1 root root 4096 Jun 4 11:53 fifo_expire_sync
-rw-r--r-- 1 root root 4096 Jun 4 11:53 group_idle
-rw-r--r-- 1 root root 4096 Jun 4 11:53 group_isolation
-rw-r--r-- 1 root root 4096 Jun 4 11:53 low_latency
-rw-r--r-- 1 root root 4096 May 24 2017 quantum
-rw-r--r-- 1 root root 4096 Jun 4 11:53 slice_async
-rw-r--r-- 1 root root 4096 Jun 4 11:53 slice_async_rq
-rw-r--r-- 1 root root 4096 May 24 2017 slice_idle
-rw-r--r-- 1 root root 4096 Jun 4 11:53 slice_sync
  • back_seek_max

    默认 16384。该参数规定了磁头向后寻址的最大范围,默认值是16M。对于请求所访问的扇区号在磁头后方的请求,cfq 会像向前寻址的请求一样调度他。

  • back_seek_penalty

    默认值是 2。该参数用来计算向后寻址的代价。相对于前方查找,后方查找的距离为 1/2(1/back_seek_penalty)时,cfq 调度时就认为这两个请求寻址的代价是相同的。

  • fifo_expire_async

    默认值是250ms。该参数用来控制异步请求的超时时间。如果队列被激活后,则优先检查是否有请求超时,如果有超时的请求,则派发。但是,在队列激活的期间内,只会派发 一个超时的请求,其余的请求按照请求的优先级,以及所访问的扇区号大小来派发。

  • fifo_expire_sync

    默认值是125ms。功能类似于fifo_expire_async参数,该参数用于控制同步请求的超时时间,。

  • group_idle

    默认值是 8ms,该参数为了提高吞吐量。

  • group_isolation

    该参数用来标识应用程序所在的cgroup。

  • low_latency

    该参数用来表示低延迟,默认值是1ms

  • quantum

    该参数用于控制队列派发到设备驱动层所含有的请求数,默认值是8。不管是同步队列还是异步队列, 在时间片内,超过这个限制,则不再派发请求。对于异步队列 而言,请求数的派发个数还取决于参数slice_async_rq。

  • slice_async

    默认值是 40ms。这个参数功能同slice_sync,但是用来计算异步队列的时间片。
    异步队列的时间片的计算公式是:time slice=slice_async + (slice_async/5 * (4 - priority));

  • slice_async_rq

    默认值是 2。这个参数用来计算在时间片内异步请求被派发的最大数。同样,最大请求数也依赖于队列的优先级。计算公式是:最大请求数=2 * slice_async_rq( 8 –priority );

  • slice_idle

    默认值是 8ms,这个参数只控制同步队列的idle time。当同步队列当前没有请求派发时,并不切换到其他队列,而是等待 8ms,以便让应用程序产生更多的请求。直到同步队列的时间片用完。

  • slice_sync

    默认值是 100ms。这个参数用来计算同步队列的时间片, 默认值是100ms。时间片还依赖于队列的优先级。同步队列的时间片的计算公式是:time slice=slice_sync + (slice_sync/5 * (4 - priority));

Anticipatory 算法参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
The parameters are:
* read_expire
Controls how long until a read request becomes "expired". It also controls the interval between which expired requests are served, so set to 50, a request might take anywhere < 100ms to be serviced _if_ it is the next on the expired list. Obviously request expiration strategies won't make the disk go faster. The result basically equates to the timeslice a single reader gets in the presence of other IO. 100*((seek time / read_expire) + 1) is very roughly the % streaming read efficiency your disk should get with multiple readers.

* read_batch_expire
Controls how much time a batch of reads is given before pending writes are served. A higher value is more efficient. This might be set below read_expire if writes are to be given higher priority than reads, but reads are to be as efficient as possible when there are no writes. Generally though, it should be some multiple of read_expire.

* write_expire, and
* write_batch_expire are equivalent to the above, for writes.

* antic_expire
Controls the maximum amount of time we can anticipate a good read (one with a short seek distance from the most recently completed request) before giving up. Many other factors may cause anticipation to be stopped early, or some processes will not be "anticipated" at all. Should be a bit higher for big seek time devices though not a linear correspondence - most processes have only a few ms thinktime.

In addition to the tunables above there is a read-only file named est_time
which, when read, will show:

- The probability of a task exiting without a cooperating task
submitting an anticipated IO.

- The current mean think time.

- The seek distance used to determine if an incoming IO is better.

测试-fio

可以使用专业的工具 fio,或者命令 dd。

fio 格式

fio 可以采用命令行格式,也可以采用 jobfile 格式。

1
2
3
fio [options] [jobfile] ...
options
--output=filename

jobfile 参数

General

1
runtime=time 单位:seconds

I/O type

1
2
3
4
5
6
7
8
9
direct=bool # 是否使用缓存,true 表示不使用缓存
readwrite=str
read # 顺序读
write # 顺序写
randread # 随机读
randwrite # 随即写
rw # 顺序读写
randrw # 随机读写

Block size

1
2
3
4
5
blocksize/bs=int[,int][,int] # read, writes, trimes,默认是 4096
bs=256k
blocksize_range/bsrange=irange[,irange][,irange]
bsrange=1k-4k,2k-8k
bssplit=str

Buffers and memory

1
2
3
4
5
iomem=str, mem=str
malloc
shm
shmhuge
mmap

I/O size

1
2
3
size=int # 一个任务中每一个线程的总大小
io_size=int, io_limit=int
filesize=irange(int)

I/O engine

1
2
3
4
5
ioengine=str
sync # 基本的读写操作
psync # 基本的 pread 和 pwrite
vsync # 基本的 readv 和 writev 将通过将相邻的 I/O 合并为一个提交来模拟排队。
libaio # Linux 原生的异步 I/O

I/O depth

1
iodepth=int # 线程的数量

I/O rate

1
2
thinktime=time # I/O 发出后,在一段时间内停止,再发出下一个
rate=int[,int][,int] # 限制某一个任务的带宽

I/O latency

1
latency_target=time # 如果设置,fio将尝试查找给定工作负载运行时的最高性能点,同时保持低于此目标的延迟。

I/O replay

1
2
write_iolog=str
read_iolog=str

Threads, processes and job synchronization

1
thread # fio 默认是通过 fork 的方式创建任务,但是如果这个参数指定,就会采用 POSIX 的线程函数来创建线程

Verification

1
verify_only # 不指定工作负载,只验证数据是否符合之前的

Steady state

1
2
steadystate=str:float, ss=str:float # 定义评估稳态性能的标准和限制。
iops # 收集 iops 数据 如:iops:2

Measurements and reporting

1
2
write_bw_log=str
write_lat_log=str

相关测试用例

随机读

1
2
3
4
5
6
7
8
9
10
11
[Rand_Read_Testing]
direct=1
iodepth=128
rw=randread
ioengine=libaio
bs=4k
size=1G
numjobs=1
runtime=1000
group_reporting
filename=/s3plus/mount/002/randread.test

参考:http://devopslinux.com/2016/07/23/PTE-fio/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#顺序读
fio -filename=/dev/sda -direct=1 -iodepth 1 -thread -rw=read -ioengine=psync -bs=16k -size=200G -numjobs=30 -runtime=1000 -group_reporting -name=mytest

#顺序写
fio -filename=/dev/sda -direct=1 -iodepth 1 -thread -rw=write -ioengine=psync -bs=16k -size=200G -numjobs=30 -runtime=1000 -group_reporting -name=mytest

#随机读
fio -filename=/dev/sda -direct=1 -iodepth 1 -thread -rw=randread -ioengine=psync -bs=16k -size=200G -numjobs=30 -runtime=1000 -group_reporting -name=mytest

#随机写
fio -filename=/dev/sda -direct=1 -iodepth 1 -thread -rw=randwrite -ioengine=psync -bs=16k -size=200G -numjobs=30 -runtime=1000 -group_reporting -name=mytest

#混合随机读写
fio -filename=/dev/sda -direct=1 -iodepth 1 -thread -rw=randrw -rwmixread=70 -ioengine=psync -bs=16k -size=200G -numjobs=30 -runtime=100 -group_reporting -name=mytest -ioscheduler=noop

混合随机读写

1
2
3
4
5
6
7
8
9
10
11
[randrw]
direct=1
iodepth=128
rw=randrw
ioengine=libaio
bs=4k
size=1G
numjobs=16
runtime=60
group_reporting
filename=/s3plus/mount/002/randrw.file

参考:

https://www.ibm.com/developerworks/cn/linux/l-lo-io-scheduler-optimize-performance/index.html

http://bbs.chinaunix.net/thread-1967733-1-1.html