你所需要的,不仅仅是一个好用的代理。
很多童鞋初学Python,学习了语法和基础类库后,开始迷茫如何实际使用到工作中去,其实Python可以做的事情是很多的,将日常工作的一些事情自动化,对我们的工作效率有很大的提升。
本文面向Py新手,分享一些辅助工作的小工具思路。以下例子都是在Win10 + Py3.5下完成。
subprocess是Python自带的子进程管理模块,定义有数个创建子进程的函数,也提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。
简单理解就是,你通过CMD敲的命令,都基本可以用subprocess来实现批量处理。
例子1:批量SVN操作
以更新SVN为例,这是一个频繁的操作,尤其是多个SVN目录需要一一更新的时候,手动起来是挺麻烦的。
import subprocess
subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\策划文档" /closeonend:0')
subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\配置文档" /closeonend:0')
例子2:adb命令的封装
做安卓手游测试的时候,adb是常用工具,我们可以通过它,进行apk的安装,卸载,截图,获取APK信息,性能数据,获取手机信息等等操作。
比如获取当前运行在前台的apk的package和activity名称
def run_cmd(cmd):
"""执行CMD命令"""
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
return [i.decode() for i in p.communicate()[0].splitlines()]
def get_apk_info():
"""获取apk的package,activity名称
:return: list eg ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity']
"""
result = run_cmd("adb shell dumpsys activity top")
for line in result:
if line.strip().startswith('ACTIVITY'):
return line.split()[1].split('/')
print(get_apk_info())
output: ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity']
比如查看当前apk的内存占用
def get_mem_using(package_name=None):
"""查看apk的内存占用
:param package_name:
:return: 单位KB
"""
if not package_name:
package_name = get_apk_info()[0]
result = run_cmd("adb shell dumpsys meminfo {}".format(package_name))
info = re.search('TOTAL\W+\d+', str(result)).group()
mem = ''
try:
mem = info.split()
except Exception as e:
print(info)
print(e)
return mem[-1]
output: 37769
比如备份当前apk到桌面
def backup_current_apk(path=r"C:\Users\jianbing\Desktop\apks"):
package = get_apk_info()[0]
result = run_cmd("adb shell pm path {}".format(package))
cmd = "adb pull {} {}".format(result[0].split(":")[-1], os.path.join(path, "{}.apk".format(package)))
print(cmd)
run_cmd(cmd)
再进一步,将常用的adb操作封装为一个ADB工具类。社区里也有童鞋之前分享过,传送门。
例子:在整个文件夹中搜索关键字
某天策划说,这个版本他删掉了某个道具,让我检查下有没有删漏的地方,这个道具产出的地方不少,最佳的检查方式是各个相关配置表看下还有没有配置这个道具。
那就写个脚本遍历整个文件夹来搜索指定关键字吧。
import os
def get_files_by_suffix(path, suffixes=("txt", "xml"), traverse=True):
"""从path路径下,找出全部指定后缀名的文件
:param path: 根目录
:param suffixes: 指定查找的文件后缀名
:param traverse: 如果为False,只遍历一层目录
:return:
"""
file_list = []
for root, dirs, files in os.walk(path):
for file in files:
file_suffix = os.path.splitext(file)[1][1:].lower() # 后缀名
if file_suffix in suffixes:
file_list.append(os.path.join(root, file))
if not traverse:
return file_list
return file_list
if __name__ == '__main__':
keyword = "XXX宝箱"
files = get_files_by_suffix(r"C:\project\config")
for file in files:
with open(file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read().lower()
position = content.find(keyword.lower())
if position != -1:
print("Find in {0}".format(file))
start = position - 100 if position - 100 > 0 else 0
end = position + 100 if position + 100 < len(content) else len(content)
print(content[start:end])
print("_" * 100)
例子1:查看内网发版时间
有时候问开发,最近一次内网服务端发版是什么时候?开发回答:有点忘记了。。那就得自力更生了~
手动方式:使用FTP软件连入内网服务器,查看文件的更新日期,从而知道发版时间。
懒人方式:Py大法好~
paramiko是Python很有名的第三方库,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。
import paramiko
import time
_transport = paramiko.Transport("192.168.1.10:22")
_transport.connect(username="root", password="XXXXXX")
sftp = paramiko.SFTPClient.from_transport(_transport)
result = sftp.listdir_attr("/data/www/sg/sg_dev/socket/conf/config/treasure")
print("发版时间是:{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(result[0].st_mtime))))
sftp.close()
例子2:查看内网报错信息
在进行测试的时候,需要多留意服务端是否有新的报错信息,有些报错在客户端并没有什么表现,比如数据进库失败,手动方式:通过SecureCRT连入内网服务器,CD到Log目录下,然后tail -n 200 sg_error.log 查看最新的报错信息。
于是萌生了写一个小工具来定时检测,发现报错信息就保存起来的想法。
import datetime
import paramiko
import time
import os
class ScanError(object):
def __init__(self):
self._ssh = paramiko.SSHClient()
self.last_error_log = None
self._init()
def _init(self):
os.chdir("data") # 打算将报错信息保存到data目录下
self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self._ssh.connect("192.168.1.10", username="root", password="XXXXXX")
error_log = self.get_error_log(500)
self.last_error_log = error_log
# 检测最近三天有没有报错信息
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
the_day_before_yesterday = today - datetime.timedelta(days=2)
error_log_str = "\n".join(error_log)
if error_log_str.find(str(today)) > -1 or error_log_str.find(str(yesterday)) > -1 or error_log_str.find(str(the_day_before_yesterday)) > -1:
self.save_error_log("error.txt", error_log)
print('内网最近三天有错误信息,请查看')
os.popen('error.txt')
def get_error_log(self, num=200):
cmd = 'cd /data/www/sg/sg_dev/socket/log&&tail -n {} sg_error.log'.format(num)
stdin, stdout, stderr = self._ssh.exec_command(cmd)
error_log = [i.decode("utf-8") for i in stdout.read().splitlines() if i]
return error_log
@staticmethod
def save_error_log(file_name, log: list):
with open(file_name, 'w', encoding='utf-8') as f:
f.write("\n".join(log))
def run_forever(self, interval=30, show_error=True):
"""运行检测工具
:param interval: 检测间隔
:param show_error: 是否检测到报错就自动弹出显示
:return:
"""
while 1:
time.sleep(interval)
error_log = self.get_error_log()
if error_log != self.last_error_log and "\n".join(set(error_log) - set(self.last_error_log)).find("ERROR") > -1:
self.last_error_log = error_log
file_name = time.strftime("%Y-%m-%d-%H-%M-%S.txt", time.localtime(time.time()))
print('{} 检测到内网有新的错误信息'.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))))
self.save_error_log(file_name, error_log)
if show_error:
os.popen(file_name)
if __name__ == '__main__':
ScanError().run_forever()
例子:找号功能
内网的服务器建了N多的号,有时候看着排行榜某个帐号,想登录看下数据,可以使用Python写一个连接数据库的找号脚本。
import cymysql
player_name = "XXXXXX"
conn = cymysql.connect(host='XXXXXX', user='sg', passwd='XXXXXX', db="dev", charset='utf8')
cur = conn.cursor()
sql = "select * from Player where name like '%{0}%'".format(player_name) # 模糊搜索,从玩家名称搜索玩家ID
cur.execute(sql)
for r in cur.fetchall():
sql = "select * from Account where uid = '{0}'".format(r[0]) # 从玩家ID搜索玩家帐号
cur.execute(sql)
for row in cur.fetchall():
print('{0}, {1}, {2}'.format(r[0], r[1], row[2])) # 打印相关信息
conn.close()
在之前某个项目,开发做了一个给游戏帐号发道具的网页,提供测试使用,操作流程是这样的,在网页上的表单里边,填写玩家的ID,在下拉列表选中要发送的道具(支持模糊搜索),填写数量。
这个网页使用起来,工作效率不高的地方就是,每次添加道具,都需要重新选择道具和填写数量,且添加过程没有记录下来,无法复用。
优化方案,在网页上点击添加道具,其实就是网页给游戏服务器发送了一个HTTP请求,那就直接让Python来代劳吧~
import requests
player_id = 1100000103
server_ip = "192.168.1.21:5000"
data = {"stuffList": []}
url = "http://{}/api/{}/stuff".format(server_ip, player_id)
data["stuffList"].append({"itemID": 1104000007, "number": 1000}) # itemID为1104000007的物品,数量1000
data["stuffList"].append({"itemID": 1104000008, "number": 1000}) data["stuffList"].append({"itemID": 1104000009, "number": 1000}) data["stuffList"].append({"itemID": 1104000010, "number": 1000}) data["stuffList"].append({"itemID": 1104000011, "number": 1000}) data["stuffList"].append({"itemID": 1104000012, "number": 1000})
requests.post(url, json=data, timeout=5) # 添加道具
# requests.delete(url, json=data, timeout=5) # 删除道具
有没有发现,每个脚本都很简短~