Linux 下Python 脚本编写的"奇技淫巧"

2023-01-30 15:55:05 浏览数 (1)

写在前面

  • 对于自动化运维来讲Python是一个利器
  • 常用的自动化运维工具Ansible就是通过python编写
  • 博文为《Python Cookbook》读书笔记整理而来
  • 涉及的内容都是编写python运维脚本常用的一些知识点及Demo
  • 理解不足小伙伴帮忙指正

「 生命完美的答案,无非走过没有遗憾 ---《天蓝》」


脚本编程与系统管理

解析命令行选项

「如何能够解析脚本运行命令行选项(位于 sys.argv 中)」

argparse 模块可被用来解析命令行选项

常用来定义一个脚本的说明文档,一般我们写python脚本会通过if..else 的方式来提供一个脚本说明文档,python不支持switch。所有很麻烦,其实,我们可以通过argparse来编写说明文档。

我们来看看执行一个python脚本

对于熟悉Linux的小伙伴下面的文档在熟悉不过了,这个一个标准Linxu软件包的说明文档,文档中定义是软件包的说明

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./demo.py -h
usage: demo.py [-h] -p pattern [-v] [-o OUTFILE] [--speed {slow,fast}]
               [filename [filename ...]]

Search some files

positional arguments:
  filename

optional arguments:
  -h, --help            show this help message and exit
  -p pattern, --pat pattern
                        text pattern to search for
  -v                    verbose mode
  -o OUTFILE            output file
  --speed {slow,fast}   search speed
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

来看看这个脚本是如何编写的

代码语言:javascript复制
#!/usr/bin/env python3


import argparse
# 脚本的描述
parser = argparse.ArgumentParser(description='Search some files')
# 脚本接收的全部参数,用`filenames`接收
parser.add_argument(dest='filenames', metavar='filename', nargs='*')

# 脚本接收
parser.add_argument('-p', '--pat', metavar='pattern', required=True,
                    dest='patterns', action='append',
                    help='text pattern to search for')
parser.add_argument('-v', dest='verbose', action='store_true',
                    help='verbose mode')
parser.add_argument('-o', dest='outfile', action='store',
                    help='output file')
parser.add_argument('--speed', dest='speed', action='store',
                    choices={'slow', 'fast'}, default='slow',
                    help='search speed')
args = parser.parse_args()
# Output the collected arguments
print(args.filenames)
print(args.patterns)
print(args.verbose)
print(args.outfile)
print(args.speed)

为了解析命令行选项, 首先要创建一个ArgumentParser实例, 并使用add_argument() 方法声明你想要支持的选项。在每个add-argument()调用中:

dest参数指定解析结果被指派给属性的名字。metavar 参数被用来生成帮助信息。

action 参数指定跟属性对应的处理逻辑,通常的值为 store , 被用来存储某个值或将多个参数值收集到一个列表中

nargs 参数收集所有剩余的命令行参数到一个列表中。在本例中它被用来构造一个文件名列表

代码语言:javascript复制
parser.add_argument(dest='filenames',metavar='filename', nargs='*')
代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py  -p spam --pat=eggs  foo.txt bar.txt
['foo.txt', 'bar.txt']
['spam', 'eggs']
False
None
slow
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

action='store_true' 根据参数是否存在来设置一个 Boolean 标志:

代码语言:javascript复制
parser.add_argument('-v', dest='verbose', action='store_true', help='verbose mode')
代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py -v -p spam --pat=eggs foo.txt bar.txt
['foo.txt', 'bar.txt']
['spam', 'eggs']
True
None
slow

action='store' 参数接受一个单独值并将其存储为一个字符串

代码语言:javascript复制
parser.add_argument('-o', dest='outfile', action='store', help='output file')
代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py -v -p spam --pat=eggs -o liruilong  foo.txt bar.txt
['foo.txt', 'bar.txt']
['spam', 'eggs']
True
liruilong
slow
  • action='append' 参数说明允许某个参数重复出现多次,并将它们追加到一个列表中去。
  • required 标志表示该参数至少要有一个-p --pat 表示两个参数名形式都可使用。
代码语言:javascript复制
parser.add_argument('-p', '--pat', metavar='pattern', required=True,
                    dest='patterns', action='append',
                    help='text pattern to search for')
代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py  -p spam   foo.txt bar.txt
['foo.txt', 'bar.txt']
['spam']
False
None
slow
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py  --pat=eggs  foo.txt bar.txt
['foo.txt', 'bar.txt']
['eggs']
False
None
slow
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py  -p spam --pat=eggs  foo.txt bar.txt
['foo.txt', 'bar.txt']
['spam', 'eggs']
False
None
slow

如果一个都没有,会提示缺少参数 -p/--pat

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py   foo.txt bar.txt
usage: demo.py [-h] -p pattern [-v] [-o OUTFILE] [--speed {fast,slow}]
               [filename [filename ...]]
demo.py: error: the following arguments are required: -p/--pat
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

choices={'slow', 'fast'}, 参数说明接受一个值,但是会将其和可能的选择值做比较,以检测其合法性:

代码语言:javascript复制
parser.add_argument('--speed', dest='speed', action='store',
                    choices={'slow', 'fast'}, default='slow',
                    help='search speed')
代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py  --pat=eggs --speed 123  foo.txt bar.txt
usage: demo.py [-h] -p pattern [-v] [-o OUTFILE] [--speed {slow,fast}]
               [filename [filename ...]]
demo.py: error: argument --speed: invalid choice: '123' (choose from 'slow', 'fast')
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$python3 demo.py  --pat=eggs --speed fast  foo.txt bar.txt
['foo.txt', 'bar.txt']
['eggs']
False
None
fast
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

一旦参数选项被指定,你就可以执行parser.parse()方法了。它会处理sys.argv的值并返回一个结果实例。每个参数值会被设置成该实例中add_argument()方法的 dest 参数指定的属性值。

还很多种其他方法解析命令行选项。可以会手动的处理 sys.argv 或者使用 getopt 模块。但是,如果你采用本节的方式,将会减少很多冗余代码,底层细节argparse 模块已经帮你处理了。你可能还会碰到使用optparse库解析选项的代码。尽管 optparse 和 argparse 很像,但是后者更先进,因此在新的程序中你应该使用它。

运行时弹出密码输入提示

「你写了个脚本,运行时需要一个密码。此脚本是交互式的,因此不能将密码在脚本中硬编码,而是需要弹出一个密码输入提示,让用户自己输入。」

Python 的 getpass 模块正是你所需要的。你可以让你很轻松的弹出密码输入提示,并且不会在用户终端回显密码。

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import getpass

def svc_login(user, passwd):
    return user == passwd

user = getpass.getuser()
passwd = getpass.getpass()
if svc_login(user, passwd):
    print('Yay!')
else:
    print('Boo!')

代码中getpass.getuser()不会弹出用户名的输入提示。它会根据该用户的 shell 环境或者会依据本地系统的密码库(支持 pwd 模块的平台)来使用当前用户的登录名

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./pass.py
Password: #root
Yay!

通过重定向/管道/文件接受输入

在bash中编写pytohn脚本接收外部数据的方式,一般情况下,对于一般变量,我们用命令行变量的方式比较多(手动的处理 sys.argv ),对于文件内容或者bash命令输出直接通过脚本内部获取需要的数据。

其实python 脚本也可以用其他方式来接收 传递给他的文件数据或者bash命令输出,包括将命令行的输出通过管道传递给该脚本、重定向文件到该脚本,或在命令行中传递一个文件名文件名列表给该脚本。

这里通过 Python 内置的 fileinput 模块,可以实现重定向,管道,文佳输出的方式传递数据到脚本内部

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""
@File    :   filein.py
@Time    :   2022/05/01 06:05:43
@Author  :   Li Ruilong
@Version :   1.0
@Contact :   1224965096@qq.com
@Desc    :   None
"""

# here put the import lib

import fileinput
with fileinput.input() as f_input:
    for line in f_input:
        print("脚本输出", line, end='')

使用fileinput.input()方法可以获取当前输入脚本的数据,脚本里面用一个FileInput迭代器接收

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$vim filein.py
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$chmod  x filein.py

文件直接接收

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./filein.py /etc/passwd
脚本输出 root:x:0:0:root:/root:/bin/bash
脚本输出 bin:x:1:1:bin:/bin:/sbin/nologin
脚本输出 daemon:x:2:2:daemon:/sbin:/sbin/nol
。。。。

重定向接收

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./filein.py <  /etc/passwd
脚本输出 root:x:0:0:root:/root:/bin/bash
脚本输出 bin:x:1:1:bin:/bin:/sbin/nologin
。。。。。。

管道方式接收

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$df -h
文件系统        容量  已用  可用 已用% 挂载点
/dev/sda1       150G   22G  129G   15% /
devtmpfs        983M     0  983M    0% /dev
tmpfs           993M     0  993M    0% /dev/shm
tmpfs           993M   17M  976M    2% /run
tmpfs           993M     0  993M    0% /sys/fs/cgroup
overlay         150G   22G  129G   15% /var/lib/docker/overlay2/9fbd33d3485f02eadef6907a5b4eaead4a384684b66c572d822a2942a82ca0d5/merged
overlay         150G   22G  129G   15% /var/lib/docker/overlay2/85ff22ccaf2db68a0a863bc404d79d72fa6c8744424f50ba8fb6bfa83d56b56a/merged
tmpfs           199M     0  199M    0% /run/user/0
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$df -h |  ./filein.py
脚本输出 文件系统        容量  已用  可用 已用% 挂载点
脚本输出 /dev/sda1       150G   22G  129G   15% /
脚本输出 devtmpfs        983M     0  983M    0% /dev
脚本输出 tmpfs           993M     0  993M    0% /dev/shm
脚本输出 tmpfs           993M   17M  976M    2% /run
脚本输出 tmpfs           993M     0  993M    0% /sys/fs/cgroup
脚本输出 overlay         150G   22G  129G   15% /var/lib/docker/overlay2/9fbd33d3485f02eadef6907a5b4eaead4a384684b66c572d822a2942a82ca0d5/merged
脚本输出 overlay         150G   22G  129G   15% /var/lib/docker/overlay2/85ff22ccaf2db68a0a863bc404d79d72fa6c8744424f50ba8fb6bfa83d56b56a/merged
脚本输出 tmpfs           199M     0  199M    0% /run/user/0
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

fileinput.input() 创建并返回一个FileInput类的实例,该实例可以被当做一个上下文管理器使用。因此,整合起来,如果我们要写一个打印多个文件输出的脚本,那么我们需要在输出中包含文件名和行号

代码语言:javascript复制
>>> import fileinput
>>> with fileinput.input("/etc/passwd") as f:
...     for line in f:
...             print(f.filename(),f.fileno(),f.lineno(),line,end='')
...
/etc/passwd 3 1 root:x:0:0:root:/root:/bin/bash
/etc/passwd 3 2 bin:x:1:1:bin:/bin:/sbin/nologin
/etc/passwd 3 3 daemon:x:2:2:daemon:/sbin:/sbin/nologin
/etc/passwd 3 4 adm:x:3:4:adm:/var/adm:/sbin/nologin
/etc/passwd 3 5 lp:x:4:7:lp:/var/spool/lpd:/sbin/nologin
/etc/passwd 3 6 sync:x:5:0:sync:/sbin:/bin/sync

执行外部命令并获取它的输出

「你想执行一个外部命令并以 Python 字符串的形式获取执行结果。」

使用subprocess.check_output()函数。

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import subprocess
out_bytes = subprocess.check_output(['netstat','-a'])
out_text = out_bytes.decode('utf-8')
print(out_text)


执行下试试

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./py_sh.py
Active Internet connections (servers and established)
Proto Recv-Q Send-Q Local Address           Foreign Address         State
tcp        0      0 localhost:2379          0.0.0.0:*               LISTEN
tcp        0      0 vms55.rhce.cc:2379      0.0.0.0:*               LISTEN
tcp        0      0 localhost:2380          0.0.0.0:*               LISTEN
tcp        0      0 vms55.rhce.cc:2380      0.0.0.0:*               LISTEN
tcp        0      0 0.0.0.0:webcache        0.0.0.0:*               LISTEN
tcp        0      0 0.0.0.0:http            0.0.0.0:*               LISTEN

如果被执行的命令以非零码返回,就会抛出异常。下面的例子捕获到错误并获取返回码:

代码语言:javascript复制
try:
    out_bytes = subprocess.check_output(['cmd','arg1','arg2'])
except subprocess.CalledProcessError as e:
    out_bytes = e.output # Output generated before error
    code = e.returncode # Return code

默认情况下,check_output() 仅仅返回输入到标准输出的值。如果你需要同时收集标准输出和错误输出,使用stderr参数:

代码语言:javascript复制
out_bytes = subprocess.check_output(['cmd','arg1','arg2'],stderr=subprocess.STDOUT)

如果你需要用一个超时机制来执行命令,使用 timeout 参数:

代码语言:javascript复制
try:
    out_bytes = subprocess.check_output(['cmd','arg1','arg2'], timeout=5)
except subprocess.TimeoutExpired as e:
    ....

通常来讲,命令的执行不需要使用到底层 shell 环境(比如 sh、bash)。一个字符串列表会被传递给一个低级系统命令,比如 os.execve()

如果你想让命令被一个shell 执行,传递一个字符串参数,并设置参数 shell=True . 有时候你想要Python去执行一个复杂的 shell 命令的时候这个就很有用了,比如管道流、I/O 重定向和其他特性。例如:

代码语言:javascript复制
out_bytes = subprocess.check_output('grep python | wc > out', shell=True)

是在 shell 中执行命令会存在一定的安全风险,特别是当参数来自于用户输入时。这时候可以使用 shlex.quote() 函数来将参数正确的用双引用引起来。

使用 check_output() 函数是执行外部命令并获取其返回值的最简单方式。但是,如果你需要对子进程做更复杂的交互,比如给它发送输入,你得采用另外一种方法。这时候可直接使用subprocess.Popen类。

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import subprocess
# Some text to send
text = b'''
hello world
this is a test
goodbye
'''
# Launch a command with pipes
p = subprocess.Popen(['wc'],
                     stdout=subprocess.PIPE,
                     stdin=subprocess.PIPE)
# Send the data and get the output
stdout, stderr = p.communicate(text)
# To interpret as text, decode
out = stdout.decode('utf-8')
err = stderr.decode('utf-8')

关于子进程,简单来看下

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$(pwd;echo $BASH_SUBSHELL;ps --forest)
/root/python_demo
1
   PID TTY          TIME CMD
  9324 pts/0    00:00:00 bash
 49906 pts/0    00:00:00  _ bash
 49907 pts/0    00:00:00      _ ps
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$pwd;echo $BASH_SUBSHELL;ps --forest
/root/python_demo
0
   PID TTY          TIME CMD
  9324 pts/0    00:00:00 bash
 49908 pts/0    00:00:00  _ ps
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

也可以进程列表同协程结合的方式。你既可以在子shell中 进行繁重的处理工作,同时也不会让子shell的I/O受制于终端。

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$coproc (sleep 10;ps --forest;sleep 10;ps --forest)
[1] 50326
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$jobs
[1]   运行中               coproc COPROC ( sleep 10; ps --forest; sleep 10; ps --forest ) &

如果直接丢到后台会自动在终端输出IO

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$( sleep 10; ps --forest; sleep 10; ps --forest ) &
[1] 50335
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$ps --forest
   PID TTY          TIME CMD
  9324 pts/0    00:00:00 bash
 50335 pts/0    00:00:00  _ bash
 50336 pts/0    00:00:00  |   _ sleep
 50337 pts/0    00:00:00  _ ps
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$   PID TTY          TIME CMD
  9324 pts/0    00:00:00 bash
 50335 pts/0    00:00:00  _ bash
 50340 pts/0    00:00:00      _ ps

[1]   完成                  ( sleep 10; ps --forest; sleep 10; ps --forest )
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

subprocess 模块对于依赖 TTY 的外部命令不合适用。例如,你不能使用它来自动化一个用户输入密码的任务(比如一个 ssh 会话)。这时候,你需要使用到第三方模块了,比如基于著名的 expect 家族的工具(pexpect 或类似的)(pexpect可以理解为Linux下的expect的Python封装、通过pexpect可以实现对ssh、ftp、passwd、telnet等命令行进行自动交互,而无需人工干涉来达到自动化的目的。比如我们可以模拟一个FTP登录时所有交互,包括输入主机地址、用户名、密码、上传文件等,待出现异常还可以进行尝试自动处理。)

终止程序并给出错误信息

「你想向标准错误打印一条消息并返回某个非零状态码来终止程序运行」

通过 pythonraise SystemExit(3)命令可以主动抛出一个错误,通过sys.stderr.write将命令写到标准的输出端

代码语言:javascript复制
#!/usr/bin/env python3

import sys
sys.stderr.write('It failed!n')
raise  SystemExit(3)
代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$vim err.py
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./err.py
It failed!
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$echo $?
3

直接将消息作为参数传给SystemExit(),那么你可以省略其他步骤

代码语言:javascript复制
#!/usr/bin/env python3

raise SystemExit('It failed!')

抛出一个 SystemExit 异常,使用错误消息作为参数,它会将消息在sys.stderr中打印,然后程序以状态码1退出

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./err.py
It failed!
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$echo $?
1

获取终端的大小

「你需要知道当前终端的大小以便正确的格式化输出。」

使用 os.get terminal size() 函数来做到这一点。

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import os
sz = os.get_terminal_size()
print(sz)
print(sz.columns)
print(sz.lines)
代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./tim.py
os.terminal_size(columns=99, lines=30)
99
30
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./tim.py
os.terminal_size(columns=165, lines=30)
165
30
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

复制或者移动文件和目录

「复制或移动文件和目录,但是又不想调用 shell 命令。」

shutil 模块有很多便捷的函数可以复制文件和目录。使用起来非常简单

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import shutil
# Copy src to dst. (cp src dst)
shutil.copy(src, dst)
# Copy files, but preserve metadata (cp -p src dst)
shutil.copy2(src, dst)
# Copy directory tree (cp -R src dst)
shutil.copytree(src, dst)
# Move src to dst (mv src dst)
shutil.move(src, dst)

这里不多讲,熟悉Linux的小伙伴应该不陌生。

默认情况下,对于符号链接这些命令处理的是它指向的东西文件。例如,如果源文件是一个符号链接,那么目标文件将会是符号链接指向的文件。如果你只想复制符号链接本身,那么需要指定关键字参数 follow_symlinks

代码语言:javascript复制
shutil.copytree(src, dst, symlinks=True)

copytree() 可以让你在复制过程中选择性的忽略某些文件或目录。你可以提供一个忽略函数,接受一个目录名和文件名列表作为输入,返回一个忽略的名称列表。例如:

代码语言:javascript复制
def ignore_pyc_files(dirname, filenames):
    return [name in filenames if name.endswith('.pyc')]
shutil.copytree(src, dst, ignore=ignore_pyc_files)

对于文件元数据信息,copy2() 这样的函数只能尽自己最大能力来保留它。访问时间、创建时间和权限这些基本信息会被保留,但是对于所有者、ACLs、资源 fork 和其他更深层次的文件元信息就说不准了

通常不会去使用 shutil.copytree() 函数来执行系统备份。当处理文件名的时候,最好使用os.path中的函数来确保最大的可移植性

代码语言:javascript复制
>>> filename = '/etc/docker/daemon.json'
>>> import os.path
>>> os.path.basename(filename)
'daemon.json'
>>> os.path.dirname(filename)
'/etc/docker'
>>> os.path.split(filename)
('/etc/docker', 'daemon.json')
>>> os.path.join('/new/dir', os.path.basename(filename))
'/new/dir/daemon.json'
>>> os.path.expanduser('~/guido/programs/daemon.json')
'/root/guido/programs/daemon.json'
>>>

使用copytree()复制文件夹的一个棘手的问题是对于错误的处理,可以使用异常块处理,或者通过 参数 ignore dangling symlinks=True忽略掉无效符号链接。

代码语言:javascript复制
try:
    shutil.copytree(src, dst)
except shutil.Error as e:
    for src, dst, msg in e.args[0]:
    # src is source name
    # dst is destination name
    # msg is error message from exception
    print(dst, src, msg)

创建和解压归档文件

「创建或解压常见格式的归档文件(比如.tar, .tgz 或.zip)」

shutil 模块拥有两个函数—— make archive() 和 unpack archive() 可派上用场,

代码语言:javascript复制
>>> import shutil
>>> shutil.unpack_archive('Python-3.3.0.tgz')
>>> shutil.make_archive('py33','zip','Python-3.3.0')
'/Users/beazley/Downloads/py33.zip'

make archive() 的第二个参数是期望的输出格式。可以使用get archive formats()获取所有支持的归档格式列表。

代码语言:javascript复制
>>> import shutil
>>> shutil.get_archive_formats()
[('bztar', "bzip2'ed tar-file"), ('gztar', "gzip'ed tar-file"), ('tar', 'uncompressed tar file'), ('xztar', "xz'ed tar-file"), ('zip', 'ZIP file')]
>>>

通过文件名查找文件

「你需要写一个涉及到文件查找操作的脚本,比如对日志归档文件的重命名工具,你不想在 Python 脚本中调用 shell,或者你要实现一些 shell 不能做的功能。」

查找文件,可使用 os.walk() 函数,传一个顶级目录名给它

代码语言:javascript复制
#!/usr/bin/env python3

import os,sys

def findfile(start, name):
    for relpath, dirs, files in os.walk(start):
        if name in files:
            full_path = os.path.join(start, relpath, name)
            print(os.path.normpath(os.path.abspath(full_path)))

if __name__ == '__main__':
    findfile(sys.argv[1], sys.argv[2])

os.walk() 方法为我们遍历目录树,每次进入一个目录,它会返回一个三元组,包含相对于查找目录的相对路径,一个该目录下的目录名列表,以及那个目录下面的文件名列表。

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./find.py /etc/ passwd
/etc/passwd
/etc/pam.d/passwd
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

对于每个元组,只需检测一下目标文件名是否在文件列表中。如果是就使用os.path.join() 合并路径。为了避免奇怪的路径名比如 ././foo//bar ,使用了另外两个函数来修正结果

  • 第一个是os.path.abspath(), 它接受一个路径,可能是相对路径,最后返回绝对路径。
  • 第二个是os.path.normpath(),用来返回正常路径,可以解决双斜杆、对目录的多重引用的问题等。

os.walk(start)还有跨平台的优势。并且,还能很轻松的加入其他的功能。我们再演示一个例子,下面的函数打印所有最近被修改过的文件:

代码语言:javascript复制
#!/usr/bin/env python3
import os
import time
import sys

def modified_within(top, seconds):
    now = time.time()
    for path, dirs, files in os.walk(top):
        for name in files:
            fullpath = os.path.join(path, name)
            if os.path.exists(fullpath):
                mtime = os.path.getmtime(fullpath)
                if mtime > (now - seconds):
                    print(fullpath)


if __name__ == '__main__':
    if len(sys.argv) != 3:
        print('Usage: {} dir seconds'.format(sys.argv[0]))
        raise SystemExit(1)
    modified_within(sys.argv[1], float(sys.argv[2]))

打印10分钟之前被修改的数据

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./find.py /etc/ 10
/etc/mtab
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$ll /etc/mtab
lrwxrwxrwx. 1 root root 17 10月 18 2018 /etc/mtab -> /proc/self/mounts
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$ll /proc/self/mounts
-r--r--r-- 1 root root 0 5月   2 01:18 /proc/self/mounts
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

读取配置文件

「怎样读取普通.ini 格式的配置文件?」

configparser 模块能被用来读取配置文件

编写配置文件

代码语言:javascript复制
; config.ini
; Sample configuration file
[installation]
library=%(prefix)s/lib
include=%(prefix)s/include
bin=%(prefix)s/bin
prefix=/usr/local

# Setting related to debug configuration
[debug]
log_errors=true
show_warnings=False

[server]
port: 8080
nworkers: 32
pid-file=/tmp/spam.pid
root=/www/root
signature:
    =================================
    Brought to you by the Python Cookbook
    =================================
代码语言:javascript复制
>>> from configparser import ConfigParser
>>> cfg = ConfigParser()
>>> cfg.read('config.ini')
['config.ini']
>>> cfg.sections()
['installation', 'debug', 'server']
>>> cfg.get('installation','library')
'/usr/local/lib'
>>> cfg.getboolean('debug','log_errors')
True
>>> cfg.getint('server','port')
8080
>>> cfg.getint('server','nworkers')
32
>>> print(cfg.get('server','signature'))

=================================
Brought to you by the Python Cookbook
=================================
>>>

如果有需要,你还能修改配置并使用cfg.write()方法将其写回到文件中

代码语言:javascript复制
>>> from configparser import ConfigParser
>>> cfg = ConfigParser()
>>> cfg.read('config.ini')
['config.ini']
>>> cfg.set('server','port','9000')
>>> cfg.set('debug','log_errors','False')
>>> import sys
>>> cfg.write(sys.stdout)
[installation]
library = %(prefix)s/lib
include = %(prefix)s/include
bin = %(prefix)s/bin
prefix = /usr/local

[debug]
log_errors = False
show_warnings = False

[server]
port = 9000
nworkers = 32
pid-file = /tmp/spam.pid
root = /www/root
signature =
        =================================
        Brought to you by the Python Cookbook
        =================================

>>>
  • 配置文件中的名字是不区分大小写
  • 解析值的时候,getboolean() 方法查找任何可行的值。
  • ConfigParser 能一次读取多个配置文件然后合并成一个配置。后面读取的配置文件会覆盖前面的配置文件

给简单脚本增加日志功能

「你希望在脚本和程序中将诊断信息写入日志文件。」

python 脚本打印日志最简单方式是使用 logging 模块

代码语言:javascript复制
#`!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import logging


def main():
    # Configure the logging system
    logging.basicConfig(filename='app.log',
                        level=logging.ERROR)
    # Variables (to make the calls that follow work)
    hostname = 'www.python.org'
    item = 'spam'
    filename = 'data.csv'
    mode = 'r'
    # Example logging calls (insert into your program)
    logging.critical('Host %s unknown', hostname)
    logging.error("Couldn't find %r", item)
    logging.warning('Feature is deprecated')
    logging.info('Opening file %r, mode=%r', filename, mode)
    logging.debug('Got here')
    
if __name__ == '__main__':
    main()                    

五个日志调用(critical(), error(), warning(), info(), debug())以降序方式表示不同的严重级别。basicConfig() level参数是一个过滤器。所有级别低于此级别的日志消息都会被忽略掉。每个logging操作的参数是一个消息字符串,后面再跟一个或多个参数。构造最终的日志消息的时候我们使用了% 操作符来格式化消息字符串。

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./logger.py
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$cat app.log
CRITICAL:root:Host www.python.org unknown
ERROR:root:Couldn't find 'spam'
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$

如果你想使用配置文件,可以像下面这样修改basicConfig()调用:

代码语言:javascript复制
import logging
import logging.config
def main():
    # Configure the logging system
    logging.config.fileConfig('logconfig.ini')

logconfig.ini

代码语言:javascript复制
[loggers]
keys=root
[handlers]
keys=defaultHandler
[formatters]
keys=defaultFormatter
[logger_root]
level=INFO
handlers=defaultHandler
qualname=root
[handler_defaultHandler]
class=FileHandler
formatter=defaultFormatter
args=('app.log', 'a')
[formatter_defaultFormatter]
format=%(levelname)s:%(name)s:%(message)s

在调用日志操作前先执行下 basicConfig() 函数方法,可以找标准输出或者文件中输出

basicConfig() 在程序中只能被执行一次。如果你稍后想改变日志配置,就需要先获取 root logger ,然后直接修改它。

代码语言:javascript复制
logging.getLogger().level = logging.DEBUG

更多见日志模块文档https://docs.python.org/3/howto/logging-cookbook.html

给函数库增加日志功能

「你想给某个函数库增加日志功能,但是又不能影响到那些不使用日志功能的程序。」

对于想要执行日志操作的函数库,你应该创建一个专属的logger对象,并且像下面这样初始化配置:

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import logging
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())

# Example function (for testing)
def func():
    log.critical('A Critical Error!')
    log.debug('A debug message')

func()

使用这个配置,默认情况下不会打印日志,只有配置过日志系统,那么日志消息打印就开始生效

代码语言:javascript复制
logging.basicConfig()
代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./logg.py
CRITICAL:__main__:A Critical Error!

通常来讲,不应该在函数库代码中自己配置日志系统,或者是已经有个已经存在的日志配置了。调用getLogger( name )创建一个和调用模块同名的 logger 模块。由于模块都是唯一的,因此创建的 logger 也将是唯一的。所以当前进程中只有一个logging会生效。

log.addHandler(logging.NullHandler()) 操作将一个空处理器绑定到刚刚已经创建好的 logger 对象上。一个空处理器默认会忽略调用所有的日志消息。因此,如果使用该函数库的时候还没有配置日志,那么将不会有消息或警告出现。

在这里,根日志被配置成仅仅输出 ERROR 或更高级别的消息。不过,somelib 的日志级别被单独配置成可以输出 debug 级别的消息,它的优先级比全局配置高。像这样更改单独模块的日志配置对于调试来讲是很方便的,因为你无需去更改任何的全局日志配置——只需要修改你想要更多输出的模块的日志等级。(这个还有待研究)

实现一个计时器

「你想记录程序执行多个任务所花费的时间」

time 模块包含很多函数来执行跟时间有关的函数。尽管如此,通常我们会在此基础之上构造一个更高级的接口来模拟一个计时器。

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import time


class Timer:
    def __init__(self, func=time.perf_counter):
        self.elapsed = 0.0
        self._func = func
        self._start = None

    def start(self):
        if self._start is not None:
            raise RuntimeError('Already started')
        self._start = self._func()

    def stop(self):
        if self._start is None:
            raise RuntimeError('Not started')
        end = self._func()
        self.elapsed  = end - self._start
        self._start = None

    def reset(self):
        self.elapsed = 0.0

    @property #类的属性私有化,那么可以使用@property 使属性可以被外部访问并修改
    def running(self):
        return self._start is not None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args):
        self.stop()

这个类定义了一个可以被用户根据需要启动、停止和重置的计时器。它会在elapsed 属性中记录整个消耗时间。下面是一个例子来演示怎样使用它:

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import time


class Timer:
    def __init__(self, func=time.perf_counter):
        self.elapsed = 0.0
        self._func = func
        self._start = None

    def start(self):
        if self._start is not None:
            raise RuntimeError('Already started')
        self._start = self._func()

    def stop(self):
        if self._start is None:
            raise RuntimeError('Not started')
        end = self._func()
        self.elapsed  = end - self._start
        self._start = None

    def reset(self):
        self.elapsed = 0.0

    @property #类的属性私有化,那么可以使用@property 使属性可以被外部访问并修改
    def running(self):
        return self._start is not None

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args):
        self.stop()

def countdown(n):
    while n > 0:
        n -= 1
# Use 1: Explicit start/stop
t = Timer()
t.start()
countdown(1000000)
t.stop()
print(t.elapsed)
# Use 2: As a context manager
with t:
    countdown(1000000)
print(t.elapsed)
with Timer() as t2:
    countdown(1000000)
print(t2.elapsed)

这里通过__enter__,__exit__ ,使用with 语句以及上下文管理器协议可以省略计时器打开和关闭操作。(关于上下文管理协议,即with语句,为了让一个对象兼容with语句,必须在这个对象的类中声明__enter__和__exit__方法,,__enter__在出现with语句被调用,__exit__在代码执行完毕被调用,可以参考open()方法)

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./times.py
0.05191648800246185
0.12038616700374405
0.06592946800083155

在计时中要考虑一个底层的时间函数问题。一般来说, 使用 time.time()time.clock()计算的时间精度因操作系统的不同会有所不同。而使用time.perf_counter() 函数可以确保使用系统上面最精确的计时器

限制脚本的内存和CPU的使用量

「你想对在 Unix 系统上面运行的程序设置内存或 CPU 的使用限制。」

cpu 限制

resource 模块能同时执行这两个任务。例如,要限制 CPU 时间,下面的代码在windows平台执行不了,但是Linux是可以的。

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-

import signal
import resource  
import os
def time_exceeded(signo, frame):
    print("Time's up!")
    raise SystemExit(1)
def set_max_runtime(seconds):
    # 安装信号处理程序并设置资源限制  
    soft, hard = resource.getrlimit(resource.RLIMIT_CPU)
    # 限制CUP使用时间为15秒
    resource.setrlimit(resource.RLIMIT_CPU, (seconds, hard))
    # 进程即将结束,给一个信号量,加一个回调
    signal.signal(signal.SIGXCPU, time_exceeded)

if __name__ == '__main__':
    set_max_runtime(15)
    while True:
        pass

程序运行时,SIGXCPU 信号在时间过期时被生成,然后执行清理并退出。

代码语言:javascript复制
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$vim cpu.py
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$chmod  x cpu.py
┌──[root@liruilongs.github.io]-[~/python_demo]
└─$./cpu.py
Time's up!
内存限制

这暂时没有好的Demo...

代码语言:javascript复制
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-


import resource

def limit_memory(maxsize):
        soft, hard = resource.getrlimit(resource.RLIMIT_AS)
        resource.setrlimit(resource.RLIMIT_AS, (maxsize, hard))

# 0.5 * 1024 ^ 6 = 576460752303423488 设置最大内存500M
limit_memory(576460752303423488)

程序运行到没有多余内存时会抛出 MemoryError 异常。

setrlimit() 函数被用来设置特定资源上面的软限制和硬限制

  • 软限制是一个值,当超过这个值的时候操作系统通常会发送一个信号来限制或通知该进程.
代码语言:javascript复制
>>> resource.RLIMIT_AS
9
  • 硬限制是用来指定软限制能设定的最大值。通常来讲,这个由系统管理员通过设置系统级参数来决定。尽管硬限制可以改小一点,但是最好不要使用用户进程去修改。
代码语言:javascript复制
>>> resource.getrlimit(resource.RLIMIT_AS)
(-1, -1)

setrlimit() 函数还能被用来设置子进程数量、打开文件数以及类似系统资源的限制(cgroup)

启动一个WEB浏览器

「通过脚本启动浏览器并打开指定的 URL 网页」

webbrowser 模块能被用来启动一个浏览器,并且与平台无关

代码语言:javascript复制
Windows PowerShell
版权所有 (C) Microsoft Corporation。保留所有权利。

尝试新的跨平台 PowerShell https://aka.ms/pscore6

PS E:docker> python
Python 3.9.0 (tags/v3.9.0:9cf6752, Oct  5 2020, 15:23:07) [MSC v.1927 32 bit (Intel)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>>
>>> import webbrowser
>>> webbrowser.open('http://www.python.org')
True
>>>

新窗口打卡网站

代码语言:javascript复制
webbrowser.open_new('http://www.python.org')

当前窗口打开一个tab页

代码语言:javascript复制
webbrowser.open_new_tab('http://www.python.org')

指定浏览器类型,可以使用 webbrowser.get() 函数

代码语言:javascript复制
>>> c = webbrowser.get('firefox')
>>> c.open('http://www.python.org')
True
>>> c.open_new_tab('http://docs.python.org')
True
>>>

0 人点赞