其实这个是Library Reference的内容
- Dictionaries
使用del可以删除一个key
list(d)可以把Dictionaries的keys按照插入的顺序输出 python3.7新特性. 使用时注意版本是否支持
a_dict = {'foo': 'bar', 'my': 'a-only'}
b_dict = {'foo': 'b', 'you': 'b-only'}
a_dict.update(b_dict)
>>> a_dict
{'foo': 'b', 'my': 'a-only', 'you': 'b-only'}
- 9.8 Iterators
定义一个iter会返回一个class(拥有__next__方法). 如果这个iterator自己有__next__方法,他可以返回self
for的功能就是调用object的__iter__
函数 - 9.9 Generators
在函数里添加yield来使得这个函数变成iterators
- 自动创建
__iter__, __next__
函数 - 每次执行next时自动更新,免去手动设置
self.data, self.index
- 不返回时,自动
raise StopIteration
- 自动创建
-
setup.py
示例: 文档from setuptools import setup setup( # 必选 name="包名", version="0.0.1", # 可选 package_data = { '': ['*.png', '*.json'], # 把包里面的png和json放入包 }, data_files=[('README.md', ['README.md'])], install_requires=[ '<dependency_name> @ git+ssh://git@github.com/<user>/<repo_name>@<ref>', # 依赖一个git仓库 ] )
-
发布
python3 setup.py sdist bdist_wheel twine upload dist/*
-
class A:
def __getitem__(self, sli): sli.start, sli.stop, sli.step # A()[start:stop:step]
() # 括号内
** # 指数
+x, -x # 负数
in, not in, is, is not, <, <=, >, >=, != # 比较
not x #
and #
or # and 和 or不是同样的哦。
测试 成功执行时, exit的三个参数都为None, 否则为对应数据
class A():
def __enter__(self):
print('enter')
def __exit__(self, exc_type, exc_value, traceback):
print(f'exc_type: {exc_type}')
print(f'exc_value: {exc_value}')
print(f'traceback: {traceback}')
print('exist')
with A():
print('start')
raise Exception('value')
-
通过内置变量counter来记录执行的位置,所以remove会导致少执行,insert会导致重复执行
for i in a: if i == 3: a.remove(i) # 少执行 if i == 3: a.insert(0, 3) # 多执行
- 属性
__new__
: 创建class类的时候调用
示例. 通过__new__
的时候`,返回不同的class
```python
class GuessAnimal(object):
def __name__(self, type, *args, **kwargs):
if type == 'dog':
return Dog(*args, **kwargs)
return Cat(*args, **kwargs)
d = Some("dog")
d.say()
c = Some("cat")
```
* `__module__` : class的模块
* `__name__` : class的name
- Introduction
- all
- any
- divmod
- enumerate
enumerate(['a','b','c'], start=1) // [(0, 'a'), (1, 'b'), (2, 'c')] 但是不是list, 而是一个enumerate对象, 默认从0开始
- locals
- max
- open
打开一个文件 buffering=0代表不需要缓存(不缓存,mode必须是b), buffering=1代表每一行保存,buffering>1代表多少字节保存 - zip: 迭代2个迭代器, 按照最短的来计算
- Built-in Constants
- Built-in Types
- DeprecationWarning
注意, 前面和后面的换行符不会消失
from textwrap import dedent
def function():
LONG_CONTENT = dedent("""\
A, # 空格数量无所谓, 只要一致就行
B,
C\
""")
- Binary Data Services
[ ] calendar
通过二分法来查找list或者插入数据
bisect.insort(list, item) # 把x插入list并保持顺序
bisect.bisect(list, item) # 找到可以插入item的位置(最右侧)
# 查看是否存在
def index(a, x):
i = bisect_left(a, x)
if i != len(a) and a[i] == x:
return i
raise ValueError
-
copy.copy(x): return a shallow copy of x
-
copy.deepcopy(x): return a deepcopy copy.copy只会copy一层, 里面的可变对象不会copy
copy.deepcopy会copy recursively
在shallow copy里, 对于dict, 使用的是 dict.copy(), 对于list使用的是copied_list = original_list[:]
如果要实现自己的copye, 可以重写__copy__()
和__deepcopy__()
-
pprint
math.ceil(x) 大于等于x的最小的整数, 使用 __ceil__ 方法,可以让一个对象支持这个函数
math.floor(x) 小于等于x的最大的整数, 使用 __floor__ 方法,可以让一个对象支持这个函数
- isclose
相当于
abs(a-b) <= max{abs_tol, rel_tol*max[abs(a), abs(b)]}
, 起不到校验超过abs_tol
或者rel_tol
的功能哦
from fractions import Fraction
f = Fraction(1,3)
print("1/3 = %d/%d" % (f.numerator, f.denominator))
Generate pseudo-random numbers
random.choice(list) # choose one value from list
random.choices(list, k=20) # 随机选择20次, 可能重复选到
random.randrange(stop)
random.randrange(start, stop[, step])
turn value from start(included) to stop(excluded)
random.randint(start, stop)
turn value from start(included) to stop(included)
random.sample(list, k) # choose k's value from list, 每个item只被选一次,所以k要小于len(list)
- statistics — Mathematical statistics functions 数学分析
- statistics.mean
- statistics.stdev
- statistics.StatisticsError
- itertools 迭代器
- functools 包含cache lru_cache等功能
- operator 运算符
操作目录,路径的功能
临时文件,临时文件夹
推荐使用 deep-dircmp
from deep_dircmp import DeepDirCmp
DeepDirCmp(source, target).get_left_only_recursive() # 注意,如果一个文件夹额外存在,只会返回文件夹路径,不会再迭代文件夹内部文件
-
rmtree
删除文件夹shutil.rmtree(Path)
-
复制文件夹
dirs_exist_ok=False
shutil.copytree(src, dst)
pickle 把python的对象序列化成字符串
- bz2 使用方法
import bz2
bz2.compress(b'11111' * 1000)
>>> b'BZh91....'
bz2.decompress(b'BZh91...')
>>> b'11111...'
f = bz2.open("myfiles.bz2", "bb")
f.read()
f = bz2.open("myfiles.bz2", "wb")
f.write(data)
import gzip
f = gzip.open("~/test.csv.gz", compresslevel=3)
f.write("hedaer\n")
f.write("123\n")
f.close()
- zipfile 处理zip压缩包
configparser 配置文件
- netrc
- xdrlib
- plistlib
import hashlib
a = hashlib.md5()
a.update('string'.encode('utf8'))
a.hexdigest()
>>> 'b45cffe084dd3d20d928bee85e7b0f21'
[ ] io
这个用来解析python的命令
- getopt
- to be continued
线程 Threading
from threading import Thread
s1 = Thread(None, function, args=[], kwargs={})
s2 = Thread(None, function2, args=[], kwargs={})
s1.start()
s2.start()
-
threading.get_native_id()
获取当前线程的id -
Thread-Local Data:
使用treading.local()
可以获取本线程的变量。 这个变量在几个线程内不相通
测试2个thread的变量 -
Lock 线程锁, 一个线程只能拿到一个
-
RLock 线程锁. 同一个线程内可以多次获取
- 如果不join,直接关闭 直到主进程都要退出的时候,会等待子进程的结束
- imap_unordered 对iterable里面的每个元素执行func. chunksize代表每个进程执行的迭代次数。这样一个进程可以执行多次 测试
with Pool() as p:
for result in p.imap_unordered(func, iterable, chunksize):
print(result)
基础用法
import subprocess
try:
res = subprocess.run(["ls", "-l"], capture_output=True, check=True)
print(res.stdout.decode("utf-8"))
except subprocess.TimeoutExpired as e:
logger.exception(e)
logger.exception("超时了")
except subprocess.CalledProcessError as e:
logger.exception(e)
logger.error(f"执行任务失败")
- 输入输出 测试代码
proc = subprocess.Popen(["sh", "input.sh"], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
outs, errs = proc.communicate(
input="1\n2\nexit\n".encode("utf-8"), timeout=1)
print(outs.decode('utf-8'))
from subprocess import Popen
thread = Popen(["python", "-m", "pyftpdlib"])
time.sleep(10)
thread.kill()
asyncio 用来处理协程
socket 低级的网络接口
- 使用触发信号,处理ctrl+c的时候,保证循环执行完毕
stop = False
def handler(signalnum, handler):
global stop
stop = True
def main():
signal.signal(signal.SIGINT, handler)
global stop
while not stop:
time.sleep(0.1)
print("stop拉")
原理, RFC 3548
' ' b'00100000 00100000'
按照6个比特来分割 001000 000010 0000[补充00]
I C A=
对比 0-25 A-Z 26-51 a-z 52-61 0-9
然后每76个字符加一个换行,最后加一个换行
base64.encodebytes(b' ') == b'ICA=\n'
b = base64.encodebytes('我'.encode('utf8')) # 只有二进制才能encode,结果还是bytes
b = base64.encodestring('我'.encode('utf8')) # 查了源码,果然这个是为了兼容python2的语法。以后避免使用这个方法
b = base64.encodestring('我') # python2里面的str就是二进制,结果是str(仍然是二进制)
- binhex
- binascii
- unhexlify(a) 把十六进制的字符串变成二进制数据
a = 'b4447f6670a' binascii.unhexlify(a) >>> b'\xb4G\xf6g\n'
- to be continued
- Structed Markup Processing Tools
-
- poplib
- imaplib
- nntplib
- smtplib
-
urllib 处理url
- telnetlib
-
- uuid.uuid1 根据序列号,时间,电脑的mac地址生成一个uuid 返回一个uuid,但是后面是固定的node,可以手工提供或者直接获取电脑的mac地址
- uuid.uuid4 生成随机的uuid
- socketserver
各种注释
- sys
- traceback
- dataclass
- contextlib
- ftplib
with FTP() as ftp:
ftp.connect(host='localhost', port=2121)
ftp.login()
ftp.dir()
with open("source.md", "rb") as f: # 保存文件
ftp.storbinary("STOR target.md", f)
-
- ast
ast.literal_eval
: "savely evalute an expression node or a string containing a Python literal or container display." - to be continued
- ast
不过更加建议的是使用flockcontext
- fcntl.flock
f = open("name", "w")
fcntl.flock(f, fcntl.LOCK_EX) # 只有一个线程可以获取执行, 其他的会等待, 并且如果f变量失效了,也会释放锁
fcntl.flock(f, fcntl.LOCK_UN) # 执行完毕后记得unlock
fcntl.flock(f, fcntl.LOCK_SH) # 可以共享
import py7zr
archive = py7zr.SevenZipFile('sample.7z', mode='r')
archive.extractall(path='/tmp')
官网
* 安装: pip3 install beautifulsoup4
* 文档整理
把二进制转化成01
from bitstring import BitArray
BitArray(b"123").bin # '001100010011001000110011'
captcha 生成验证码
这个软件在linux-reference里面
* 官网
* github在线链接
* 本地linux-reference链接
click 用python写shell命令command
安装: pip install datetime-month
from month import XMonth
month = XMonth(2022, 11)
month.first_day()
比较文字不同
数字货币的类
faker use fake to create a lot of name of text
from faker import Faker
f = Faker('zh_cn')
print(f.name(), f.address(), f.text())
f.profile(['ssn', 'birthdate'])
$ faker address
$ faker name
$ faker password
flask 轻量级http服务器
转化尺寸
很好用的邮件客户端
非常好用的交互式shell
-
在
~/.ipython/profile_default/startup/
下创建脚本可以默认import一些包 -
itchat 微信机器人
-
[iptools] 处理IP地址的包
>>> jmespath.search("foo.bar", {"foo": {"bar": "baz"}})
'baz'
- jinja模板渲染
- kafka 用于kafka的消息分发
from kafka import KafkaConsumer consumer = KafkaConsumer('test',bootstrap_servers='192.168.1.191') for msg in consumer: print(msg) from kafka import SimpleProducer, SimpleClient kafka_client = SimpleClient('192.168.1.191') kafka_producer = SimpleProducer(kafka_client, async=False) kafka_producer.send_messages('test',b'test')
操作mysql数据库的包
- 安装
sudo apt-get install python3-dev default-libmysqlclient-dev build-essential
sudo pip3 install mysqlclient
- 运行
from MySQLdb.connections import Connection
db = Connection(db="test")
c = db.cursor()
res = c.execute("select * from pets");
print(c.fetchall())
>>> ((1, 'cat'), (2, 'cat'), (3, 'dogs'), (13, 'dog'), (14, 'dog'), (15, 'dog'), (21, 'dog'), (22, 'dog'))
res = c.execute("insert into pets values (null, 'dog')");
# 注意即使没有commit, 数据库id也会自增. 如果一次没有commit, 下次commit时,id就不是连续的了
db.commit()
# 使用连接池
https://github.com/discover-python-channel/youtube-content/blob/main/mysql_connection_pooling/python/import_fake_data.py
from mysql.connector import pooling
cnxpool = pooling.MySQLConnectionPool(pool_name="poolname", pool_size=20. autocommit=True, username...)
connection = cnxpool.get_connection()
cursor = connection.cursor()
cursor.execute(sql)
画图工具
imoprt matplotlib.pyplot as plt
plt.plot([1, 2, 3, 4]) # 默认x轴是0, 1, 2, 3
plt.plot([2, 3, 4, 5], [2, 2, 3, 4])
plt.ylabel('some numbers')
plt.show()
mongoengine 把mongodb当作sql用。那你为什么不直接用mysql啊
- moviewpy 操作mp4的包
from moviepy.video.io.ffmpeg_tools import ffmpeg_extract_subclip
# 截取前5秒的mp4文件
ffmpeg_extract_subclip("movie.mp4", 0, 5, targetname="test.mp4")
>>> numpy.linspace(0, 100, 3, dtype='int')
array([0, 50, 100])
有顺序的set, 实现原理其实就是用一个class内部保存一个list和一个set.
我尝试用dict来做(python现在dict的key是有顺序的),但是他的key不太方便做index顺序索引.
但是他内部是先判断是否存在,后插入的, 会不会遇到多线程导致key重复的问题呢?
会的, 参考代码 在sorted_set的add函数里加入一个time.sleep可以发现, 不加的话估计要很大的高并发才能出现
sudo pip3 install ordered-set
from ordered_set import OrderedSet
a = OrderedSet()
a.add(3)
a.update([5, 1, 4]) // OrderedSet([3, 5, 1, 4])
a.indexof(3) // 0
openpyxl 处理excel
github 文档 处理ssh的包,所以也能当sftp服务或者客户端。
-
示例
import paramiko client = paramiko.client.SSHClient() client.load_system_host_keys() client.connect( hostname="www.ramwin.com", port=22, username="*****", password="******") stdin, stdout, stderr = client.exec_command('pwd') error = stderr.read().decode("utf8") if error: raise Exception(error) print(stdout.read())
pdf2image: 把pdf转化成图片的库
from pdf2image import convert_from_path
convert_from_path(pdf_path, output_folder=path, fmt='png')
images = convert_from_path(pdf_path)
pdfminer 解析pdf的包,好用
peewee 简单而轻量级的sqlite3 orm,和django很像
流程控制算法
pip 快速安装包
pip install --extra-index=https://pypi.tuna.tsinghua.edu.cn/simple --extra-index=https://pypi.python.org/ django
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple django==1.11
pip install -i https://pypi.org/simple django==1.11
sudo pip3 config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple # 设置清华的源
export LC_ALL="en_US.UTF-8" # 出现乱码
export LC_CTYPE="en_US.UTF-8"
获取系统信息
- 快捷键:
- 界面工具查看
- 命令行:
alt+F12
- 命令行:
- 代码编辑
- 折叠代码
Ctrl+B 或者 Ctal+click
: 查看一个函数的定义Ctrl+Q
: 查看一个函数的文档查看文件结构
:alt+7
orctrl+F12
shift+F6
: 重构函数名称,全局变化他的名字
- 跳转
ctrl+shift+backspace
: 查看上期编辑的地方
- 界面工具查看
* 安装:
* windows: 先去[下载visual c++ 9.0](http://aka.ms/vcpython27),然后再 `pip install pycrypto`
模拟lodash的
# pip install pydash
import pydash
pydash.get(obj, "a.b.1", 1) # 默认返回1, 但是key存在为None会返回None
pydub 编辑mp3的包
- 安装依赖:
apt install libav-tools ffmpeg
- 示例
- 基础:
import math
from pydub import AudioSegment
song = AudioSegment.from_mp3('origin.mp3')
song[10*1000: 40*1000].export('target.mp3')
python虚拟化,通过制定python路径,来在服务器安装多个python
文档
添加-w
参数可以允许写入
# 直接启动一个ftplib
python -m pyftpdlib # 默认匿名登录, 端口号2121
python -m pyftpdlib --port=1223 --username=admin --password=123 -d ~/Downloads
# 后台启动
python script/启动ftp.py
PyPDF2 对中文支持不友好
pyperclip.copy('ew') # 把ew放入剪切板
pysrt 控制srt字幕
f = open("模板.docx", "rb")
document = Document() or Document(f)
first_line = document.paragraphs[0]
first_line.text = "通知"
document.save("通知.docx")
- 用来读取本地.env的配置(当前目录.env > ~/.env)
import os
from dotenv import load_dotenv, dotenv_values
load_dotenv()
CONFIG = {
**os.environ(),
dotenv_values(),
}
- 设置环境变量
dotenv set EMAIL foo@example.org
dotenv list
pytz 时区
PyWinMouse windows下操作鼠标
qiniu 七牛的接口
redis use redis db
requests 发送http请求
rsa 使用rsa加密
用scp传输文件
from paramiko import SSHClient
from scp import SCPClient
with SSHClient() as ssh:
ssh.load_system_host_keys()
ssh.connect("ramwin.com", compress=True) # https://github.com/jbardin/scp.py/pull/19
with SCPClient(ssh.get_transport()) as scp:
scp.put(<本地文件>, <远程路径>)
scp.get(<远程路径>, <本地文件>, recursive=True)
six python2和python3兼容的库
pip install sortedcontainers
from sortedcontainers import SortedList
s1 = SortedList()
s1.add(0)
s1.update([2, 1, 3])
模仿redis的sorted set做的自动排序的set
sudo pip3 install sortedsets
>>> from sortedsets import SortedSet
>>> ss = SortedSet()
>>> for i in range(1, 1000):
>>> ss['player' + str(i)] = i*10 if i % 2 else i*i
ss.by_score[470:511]
>>> ss.index('player20'), ss.index('player21')
400, 210
import time
from dotenv import dotenv_values, load_dotenv
from syncthing import Syncthing
load_dotenv()
folder = "目录id"
client = syncthing.Syncthing(dotenv_values()["APIKEY"])
client.db.scan(folder, sub="要同步的目录")
while client.db.completion(remote_device, folder) != 100:
time.sleep(10)
-
srt因为缺少shift功能而改成用pysrt
virtualenv --system-site-packages -p /bin/python ENV
-
watchdog 监控文件变化
-
websocket websocket 客户端
-
wechatpy 和微信的接口
-
wechat-django 微信的django app
-
word2html 把word转化成html
import word2vec
word2vec.word2phrase('./text8', './text8-phrases', verbose=True)
word2vec.word2vec('text8-phrases', 'text8.bin', size=100, verbose=True)
word2vec.word2clusters('text8', 'text8-clusters.txt', 100, verbose=True)
import word2vec
model = word2vec.load('text8.bin')
model.vocab
model.vectors.shape
model.vectors
model['狗'].shape
import xlrd
wb = xlrd.open_workbook(filename)
wb.sheets() // [sheet0, sheet1, sheet2]
ws = wb.sheets()[0]
ws.visibility // 2: hidden 0: show
for i in range(ws.nrows):
print(ws.row(i)[0]) // first column
- socket.gethostname() # 获取当前主机的主机名
- uuid.getnote() # 获取本机的MAC地址
mac=uuid.UUID(int = node).hex[-12:] -
readme_renderer
通过fork可以创建一个子线程。子线程可以完整地运行并且每个子线程可以充分地利用一个cpu.当一个线程崩溃后,不会影响其他线程
python的解释器在执行代码的时候,有个GIL锁,保证同一时间只有一个线程执行。所以不能充分利用CPU。但是这不代表不会出现几个线程打乱数据的问题,因为线程的切换是按照python字节码来处理的。test/test_thread.py
不会应为有多核CPU而变快。但是test/test_fork.py
会因为多核而变快
用kill杀出一个子线程后,会导致进程崩溃
- time.time 来判断是否刷新缓存,1秒能执行753万次
if time.time() > start :
refresh()
- random.random 来判断, 1秒能执行977万次
if random.random() > 0.0000001:
refresh()