阿布云

你所需要的,不仅仅是一个好用的代理。

游戏测试和Python (小工具篇)

阿布云 发表于

34.png

很多童鞋初学Python,学习了语法和基础类库后,开始迷茫如何实际使用到工作中去,其实Python可以做的事情是很多的,将日常工作的一些事情自动化,对我们的工作效率有很大的提升。

本文面向Py新手,分享一些辅助工作的小工具思路。以下例子都是在Win10 + Py3.5下完成。

调用CMD

subprocess是Python自带的子进程管理模块,定义有数个创建子进程的函数,也提供了一些管理标准流(standard stream)和管道(pipe)的工具,从而在进程间使用文本通信。

简单理解就是,你通过CMD敲的命令,都基本可以用subprocess来实现批量处理。

例子1:批量SVN操作

以更新SVN为例,这是一个频繁的操作,尤其是多个SVN目录需要一一更新的时候,手动起来是挺麻烦的。

import subprocess

subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\策划文档" /closeonend:0')

subprocess.Popen(r'TortoiseProc.exe /command:update /path:"C:\project\配置文档" /closeonend:0')

例子2:adb命令的封装

做安卓手游测试的时候,adb是常用工具,我们可以通过它,进行apk的安装,卸载,截图,获取APK信息,性能数据,获取手机信息等等操作。

比如获取当前运行在前台的apk的package和activity名称

def run_cmd(cmd):

       """执行CMD命令"""

       p = subprocess.Popen(cmd, stdout=subprocess.PIPE)

       return [i.decode() for i in p.communicate()[0].splitlines()]

 

def get_apk_info():

       """获取apk的package,activity名称

       :return: list eg ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity']

        """              

        result = run_cmd("adb shell dumpsys activity top")

        for line in result:

             if line.strip().startswith('ACTIVITY'):

                 return line.split()[1].split('/')

print(get_apk_info())

output: ['com.android.calendar', 'com.meizu.flyme.calendar.AllInOneActivity']

比如查看当前apk的内存占用

def get_mem_using(package_name=None):

        """查看apk的内存占用

        :param package_name:

        :return: 单位KB

        """

        if not package_name:

              package_name = get_apk_info()[0]

        result = run_cmd("adb shell dumpsys meminfo {}".format(package_name))

        info = re.search('TOTAL\W+\d+', str(result)).group()

        mem = ''

        try:

              mem = info.split()

        except Exception as e:

                 print(info)

                 print(e)

        return mem[-1]

output: 37769

比如备份当前apk到桌面

def backup_current_apk(path=r"C:\Users\jianbing\Desktop\apks"):

       package = get_apk_info()[0]

       result = run_cmd("adb shell pm path {}".format(package))

       cmd = "adb pull {} {}".format(result[0].split(":")[-1], os.path.join(path, "{}.apk".format(package)))

       print(cmd)

       run_cmd(cmd)

再进一步,将常用的adb操作封装为一个ADB工具类。社区里也有童鞋之前分享过,传送门。

处理文本

例子:在整个文件夹中搜索关键字

某天策划说,这个版本他删掉了某个道具,让我检查下有没有删漏的地方,这个道具产出的地方不少,最佳的检查方式是各个相关配置表看下还有没有配置这个道具。

那就写个脚本遍历整个文件夹来搜索指定关键字吧。

import os

def get_files_by_suffix(path, suffixes=("txt", "xml"), traverse=True):

       """从path路径下,找出全部指定后缀名的文件

       :param path: 根目录

       :param suffixes: 指定查找的文件后缀名

       :param traverse: 如果为False,只遍历一层目录

       :return:

        """

        file_list = []

        for root, dirs, files in os.walk(path):

              for file in files:

                    file_suffix = os.path.splitext(file)[1][1:].lower() # 后缀名

                    if file_suffix in suffixes:

                         file_list.append(os.path.join(root, file))

              if not traverse:

                    return file_list

       return file_list

if __name__ == '__main__':

      keyword = "XXX宝箱"

      files = get_files_by_suffix(r"C:\project\config")

      for file in files:

            with open(file, 'r', encoding='utf-8', errors='ignore') as f:

                  content = f.read().lower()

                  position = content.find(keyword.lower())

                  if position != -1:

                        print("Find in {0}".format(file))

                        start = position - 100 if position - 100 > 0 else 0

                        end = position + 100 if position + 100 < len(content) else len(content)

                        print(content[start:end])

                        print("_" * 100)

操作远程服务器

例子1:查看内网发版时间

有时候问开发,最近一次内网服务端发版是什么时候?开发回答:有点忘记了。。那就得自力更生了~

手动方式:使用FTP软件连入内网服务器,查看文件的更新日期,从而知道发版时间。

懒人方式:Py大法好~

paramiko是Python很有名的第三方库,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。

import paramiko

import time

_transport = paramiko.Transport("192.168.1.10:22")

_transport.connect(username="root", password="XXXXXX")

sftp = paramiko.SFTPClient.from_transport(_transport)

result = sftp.listdir_attr("/data/www/sg/sg_dev/socket/conf/config/treasure")

print("发版时间是:{}".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(result[0].st_mtime))))

sftp.close()

例子2:查看内网报错信息

在进行测试的时候,需要多留意服务端是否有新的报错信息,有些报错在客户端并没有什么表现,比如数据进库失败,手动方式:通过SecureCRT连入内网服务器,CD到Log目录下,然后tail -n 200 sg_error.log 查看最新的报错信息。

于是萌生了写一个小工具来定时检测,发现报错信息就保存起来的想法。

import datetime

import paramiko

import time

import os

 

 

class ScanError(object):

 

    def __init__(self):

 

        self._ssh = paramiko.SSHClient()

        self.last_error_log = None

        self._init()

 

    def _init(self):

        os.chdir("data")   # 打算将报错信息保存到data目录下

        self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

        self._ssh.connect("192.168.1.10", username="root", password="XXXXXX")

 

        error_log = self.get_error_log(500)

        self.last_error_log = error_log

 

        # 检测最近三天有没有报错信息

        today = datetime.date.today()

        yesterday = today - datetime.timedelta(days=1)

        the_day_before_yesterday = today - datetime.timedelta(days=2)

 

        error_log_str = "\n".join(error_log)

        if error_log_str.find(str(today)) > -1 or error_log_str.find(str(yesterday)) > -1 or error_log_str.find(str(the_day_before_yesterday)) > -1:

            self.save_error_log("error.txt", error_log)

            print('内网最近三天有错误信息,请查看')

            os.popen('error.txt')

 

    def get_error_log(self, num=200):

        cmd = 'cd /data/www/sg/sg_dev/socket/log&&tail -n {} sg_error.log'.format(num)

        stdin, stdout, stderr = self._ssh.exec_command(cmd)

        error_log = [i.decode("utf-8") for i in stdout.read().splitlines() if i]

        return error_log

 

    @staticmethod

    def save_error_log(file_name, log: list):

        with open(file_name, 'w', encoding='utf-8') as f:

            f.write("\n".join(log))

 

    def run_forever(self, interval=30, show_error=True):

        """运行检测工具

 

        :param interval: 检测间隔

        :param show_error: 是否检测到报错就自动弹出显示

        :return:

        """

        while 1:

            time.sleep(interval)

            error_log = self.get_error_log()

 

            if error_log != self.last_error_log and "\n".join(set(error_log) - set(self.last_error_log)).find("ERROR") > -1:

 

                self.last_error_log = error_log

                file_name = time.strftime("%Y-%m-%d-%H-%M-%S.txt", time.localtime(time.time()))

                print('{} 检测到内网有新的错误信息'.format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))))

                self.save_error_log(file_name, error_log)

 

                if show_error:

                    os.popen(file_name)

 

 

if __name__ == '__main__':

    ScanError().run_forever()

数据库操作

例子:找号功能

内网的服务器建了N多的号,有时候看着排行榜某个帐号,想登录看下数据,可以使用Python写一个连接数据库的找号脚本。

import cymysql

 

player_name = "XXXXXX"

conn = cymysql.connect(host='XXXXXX', user='sg', passwd='XXXXXX', db="dev", charset='utf8')

cur = conn.cursor()

 

sql = "select * from Player where name like '%{0}%'".format(player_name)   # 模糊搜索,从玩家名称搜索玩家ID

cur.execute(sql)

for r in cur.fetchall():

    sql = "select * from Account where uid = '{0}'".format(r[0])   # 从玩家ID搜索玩家帐号

    cur.execute(sql)

    for row in cur.fetchall():

        print('{0}, {1}, {2}'.format(r[0], r[1], row[2]))  # 打印相关信息

 

conn.close()

扩展开发提供的工具

在之前某个项目,开发做了一个给游戏帐号发道具的网页,提供测试使用,操作流程是这样的,在网页上的表单里边,填写玩家的ID,在下拉列表选中要发送的道具(支持模糊搜索),填写数量。

这个网页使用起来,工作效率不高的地方就是,每次添加道具,都需要重新选择道具和填写数量,且添加过程没有记录下来,无法复用。

优化方案,在网页上点击添加道具,其实就是网页给游戏服务器发送了一个HTTP请求,那就直接让Python来代劳吧~

import requests

 

player_id = 1100000103

server_ip = "192.168.1.21:5000"

 

data = {"stuffList": []}

url = "http://{}/api/{}/stuff".format(server_ip, player_id)

 

data["stuffList"].append({"itemID": 1104000007, "number": 1000}) # itemID为1104000007的物品,数量1000

data["stuffList"].append({"itemID": 1104000008, "number": 1000}) data["stuffList"].append({"itemID": 1104000009, "number": 1000}) data["stuffList"].append({"itemID": 1104000010, "number": 1000}) data["stuffList"].append({"itemID": 1104000011, "number": 1000}) data["stuffList"].append({"itemID": 1104000012, "number": 1000})

 

requests.post(url, json=data, timeout=5) # 添加道具

# requests.delete(url, json=data, timeout=5) # 删除道具

有没有发现,每个脚本都很简短~