你所需要的,不仅仅是一个好用的代理。
环境安装:
sudo pip install flask
Flask 是一个 Python 的微框架(microframework),基于 Werkzeug(一个 WSGI 工具库)。
Flask 优点:
一个响应 /articles 和 /articles/:id的 API 服务:
from flask import Flask, url_for
# Application object; the @app.route decorators below register views on it.
app = Flask(__name__)
@app.route('/')
def api_root():
    """Return the greeting served at the API root."""
    return 'Welcome'
@app.route('/articles')
def api_articles():
    """Return a placeholder listing that names this endpoint's own URL.

    Returns:
        The text 'List of ' followed by the URL that url_for builds for
        this view.
    """
    return 'List of ' + url_for('api_articles')
@app.route('/articles/<articleid>')
def api_article(articleid):
    """Return a short message naming the requested article.

    Args:
        articleid: The value captured from the <articleid> URL segment.

    Returns:
        The text 'You are reading ' followed by the HTML-escaped id.
    """
    from html import escape

    # The id comes straight from the URL, so escape it before echoing it back
    # in a response that a browser may render as HTML.
    return 'You are reading ' + escape(articleid)
if __name__ == '__main__':
    # Start the server only when this file is executed as a script.
    app.run()
请求:
curl http://127.0.0.1:5000/
响应:
GET /
Welcome
GET /articles
List of /articles
GET /articles/123
You are reading 123
from flask import request
@app.route('/hello')
def api_hello():
    """Greet the caller by the optional ``name`` query parameter.

    Returns:
        'Hello ' followed by the HTML-escaped ``name`` value when the
        parameter is present, otherwise 'Hello John Doe'.
    """
    from html import escape

    name = request.args.get('name')
    if name is None:
        return 'Hello John Doe'
    # The value is caller-controlled, so escape it before echoing it back.
    return 'Hello ' + escape(name)
请求:
GET /hello
Hello John Doe
GET /hello?name=Luis
Hello Luis
@app.route('/echo', methods=['GET', 'POST', 'PATCH', 'PUT', 'DELETE'])
def api_echo():
    """Echo back the HTTP method used for the request.

    Returns:
        'ECHO: <METHOD>' followed by a newline.
    """
    # Building the reply from request.method keeps every verb correctly
    # spelled and newline-terminated. It also gives a reply for a method
    # routed here implicitly (HEAD alongside GET), where the if/elif chain
    # fell through and returned None.
    return 'ECHO: ' + request.method + '\n'
请求指定request type:
curl -X PATCH http://127.0.0.1:5000/echo
GET /echo
ECHO: GET
POST /echo
ECHO: POST
from flask import json
@app.route('/messages', methods=['POST'])
def api_message():
    """Handle a POSTed message according to its media type.

    text/plain bodies are echoed back, application/json bodies are echoed
    back re-serialized, and application/octet-stream bodies are written to
    the file './binary'.

    Returns:
        A confirmation string, or a 415 response for any other media type.
    """
    # request.mimetype is the Content-Type without parameters such as
    # "; charset=utf-8", and is an empty string when the header is missing,
    # so neither case breaks the comparisons below.
    content_type = request.mimetype

    if content_type == 'text/plain':
        # Decode the raw body so it can be concatenated with a str.
        return "Text Message: " + request.get_data(as_text=True)
    if content_type == 'application/json':
        return "JSON Message: " + json.dumps(request.json)
    if content_type == 'application/octet-stream':
        # The context manager closes the file even if the write fails.
        with open('./binary', 'wb') as binary_file:
            binary_file.write(request.data)
        return "Binary message written!"
    # Send the real 415 status code, not just text that mentions it.
    return "415 Unsupported Media Type ;)", 415
请求指定content type:
curl -H "Content-type: application/json" \
-X POST http://127.0.0.1:5000/messages -d '{"message":"Hello Data"}'
curl -H "Content-type: application/octet-stream" \
-X POST http://127.0.0.1:5000/messages --data-binary @message.bin
from flask import json
@app.route('/messages', methods=['POST'])
def api_message():
    """Handle a POSTed message according to its media type.

    text/plain bodies are echoed back, application/json bodies are echoed
    back re-serialized, and application/octet-stream bodies are written to
    the file './binary'.

    Returns:
        A confirmation string, or a 415 response for any other media type.
    """
    # request.mimetype is the Content-Type without parameters such as
    # "; charset=utf-8", and is an empty string when the header is missing,
    # so neither case breaks the comparisons below.
    content_type = request.mimetype

    if content_type == 'text/plain':
        # Decode the raw body so it can be concatenated with a str.
        return "Text Message: " + request.get_data(as_text=True)
    if content_type == 'application/json':
        return "JSON Message: " + json.dumps(request.json)
    if content_type == 'application/octet-stream':
        # The context manager closes the file even if the write fails.
        with open('./binary', 'wb') as binary_file:
            binary_file.write(request.data)
        return "Binary message written!"
    # Send the real 415 status code, not just text that mentions it.
    return "415 Unsupported Media Type ;)", 415
请求指定content type:
curl -H "Content-type: application/json" \
-X POST http://127.0.0.1:5000/messages -d '{"message":"Hello Data"}'
curl -H "Content-type: application/octet-stream" \
-X POST http://127.0.0.1:5000/messages --data-binary @message.bin
from flask import Response
@app.route('/hello', methods=['GET'])
def api_hello():
    """Return a fixed JSON document with a custom Link header.

    Returns:
        A 200 Response whose body is the JSON-encoded sample data and whose
        mimetype is application/json.
    """
    data = {
        'hello': 'world',
        'number': 3,
    }
    js = json.dumps(data)

    resp = Response(js, status=200, mimetype='application/json')
    resp.headers['Link'] = 'http://luisrei.com'
    return resp
查看response HTTP headers:
curl -i http://127.0.0.1:5000/hello
优化代码:
from flask import jsonify
使用
resp = jsonify(data)
resp.status_code = 200
替换
resp = Response(js, status=200, mimetype='application/json')
@app.errorhandler(404)
def not_found(error=None):
    """Build a JSON 404 response that names the requested URL.

    Args:
        error: The error passed in when this runs as the registered 404
            handler. It is unused, and optional so that views can also call
            this function directly.

    Returns:
        A JSON response with status code 404.
    """
    message = {
        'status': 404,
        'message': 'Not Found: ' + request.url,
    }
    resp = jsonify(message)
    resp.status_code = 404
    return resp
@app.route('/users/<userid>', methods=['GET'])
def api_users(userid):
    """Look up a user by id in a small hard-coded table.

    Args:
        userid: The value captured from the <userid> URL segment.

    Returns:
        A JSON object mapping the id to the user's name, or the JSON 404
        response from not_found() when the id is unknown.
    """
    users = {'1': 'john', '2': 'steve', '3': 'bill'}
    if userid not in users:
        return not_found()
    return jsonify({userid: users[userid]})
请求:
GET /users/2
HTTP/1.0 200 OK
{
"2": "steve"
}
GET /users/4
HTTP/1.0 404 NOT FOUND
{
"status": 404,
"message": "Not Found: http://127.0.0.1:5000/users/4"
}
from functools import wraps
def check_auth(username, password):
    """Report whether the supplied credentials match the demo account.

    Args:
        username: The user name supplied by the client.
        password: The password supplied by the client.

    Returns:
        True only when username is 'admin' and password is 'secret'.
    """
    import hmac

    def _matches(supplied, expected):
        # Anything that cannot be encoded as text (None, bytes, ...) is
        # simply not a match.
        try:
            supplied_bytes = supplied.encode('utf-8')
        except (AttributeError, UnicodeError):
            return False
        # Constant-time comparison, so response timing does not reveal how
        # much of a guess was correct.
        return hmac.compare_digest(supplied_bytes, expected.encode('utf-8'))

    # Evaluate both checks before combining them, so a wrong user name does
    # not skip the password comparison.
    user_ok = _matches(username, 'admin')
    password_ok = _matches(password, 'secret')
    return user_ok and password_ok
def authenticate():
    """Build the 401 response that asks the client for Basic credentials.

    Returns:
        A JSON response with status code 401 and a WWW-Authenticate header
        for the realm "Example".
    """
    message = {'message': "Authenticate."}
    resp = jsonify(message)
    resp.status_code = 401
    resp.headers['WWW-Authenticate'] = 'Basic realm="Example"'
    return resp
def requires_auth(f):
    """Decorate a view so it only runs for requests that pass check_auth.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that returns authenticate()'s 401 response when the request
        carries no credentials or check_auth rejects them, and otherwise
        calls f with the original arguments.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.authorization
        # Missing and rejected credentials get the same 401 challenge.
        if not auth or not check_auth(auth.username, auth.password):
            return authenticate()
        return f(*args, **kwargs)
    return decorated
使用方法:把 check_auth 函数替换为自己的校验逻辑,并给视图加上 requires_auth 装饰器:
@app.route('/secrets')
@requires_auth
def api_hello():
    """Return the protected message; requires_auth gates access to it."""
    return "Shhh this is top secret spy stuff!"
HTTP basic authentication:
curl -v -u "admin:secret" http://127.0.0.1:5000/secrets
Debug:
app.run(debug=True)
Logging:
import logging
# Send the application's log records to the file 'app.log'.
file_handler = logging.FileHandler('app.log')
app.logger.addHandler(file_handler)
# Set the app logger's threshold to INFO.
app.logger.setLevel(logging.INFO)
@app.route('/hello', methods=['GET'])
def api_hello():
    """Emit one log record at each of INFO, WARNING and ERROR level.

    Returns:
        A short text telling the caller to check the logs.
    """
    app.logger.info('informing')
    app.logger.warning('warning')
    app.logger.error('screaming bloody murder!')
    return "check your logs\n"