FastApi
import uvicorn
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from fastapi import File,UploadFile,Form # 上传文件需要使用,格式为form-data
from pydantic import BaseModel # 上传json时使用
from typing import Optional # 用于可选参数
app = FastAPI() # 初始化
app.add_middleware( # 解决前端跨域
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"])
class fun_Base(BaseModel):
text: str
number: int
def a_fun (img,number):
print('this is infer',img,'Page:',number)
return img
class a_class:
def __init__(self):
self.model = None
print('load for your model')
def infer (self,cv_img):
print('use',self.model,'infer',cv_img)
return cv_img
# 接收 json 格式
@app.post("/url_path") # 地址
async def text_Symbol(text_base: fun_Base):
text_res = a_fun(text_base.text,text_base.number)
return {"code": 200, "msg": 'success', "result": {"punc_text":text_res}}
import cv2
import numpy as np
a_pre = a_class()
# 接收 form-data 格式
@app.post("/paddlespeech/clas") # 地址可以写多个
async def clas_examine (choice:str=Form(...),text:Optional[str] = Form(None),file: UploadFile = File(...)):
# 必填参数,且接收必须是str 选填参数,必须是str,默认为None file文件
cv_img = cv2.imdecode(np.fromstring(file.file.read(), np.uint8), cv2.IMREAD_COLOR) # file文件直转cv2
filename = file.filename # 获取文件名
with open(filename,'wb') as f: f.write(file.read()) # 直接保存 file文件
cv_img = a_pre.infer(cv_img)
return {"code": 200, "msg": 'success', "result": {"punc_text":cv_img}}
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=9556)
Flask
from flask import Flask,request,jsonify
from flask_cors import CORS
app = Flask(__name__)
@app.after_request
def cors(environ):
environ.headers['Access-Control-Allow-Origin']='*'
environ.headers['Access-Control-Allow-Method']='*'
environ.headers['Access-Control-Allow-Headers']='x-requested-with,content-type'
return environ
CORS(app, supports_credentials=True)
from PIL import Image
@app.route("/token_embedding",methods=["POST",])
def token_embed():
data = request.get_json() # 接收json格式
str_ = request.form.get('str_') # 接收form-data中的字符串
file = request.files['file'] # 接收form-data中的file文件
filename = file.filename # 获取文件名
file.save(filename) # 直接保存 file文件
image = Image.open(request.files['file']) # file文件直转PIL
files = request.files.getlist("files") # 接收多个file文件
for one_file in files:
print(one_file)
return jsonify({"code": 200, "msg": 'success', "result": ''})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=8922, debug=False)
定时删除文件
import time
import os
from threading import Thread
# 文件存储时,避免文件名重复,可以使用 str(random.randint(0,100))+str(int(time.time()*1000000))+'.jpg'
def deletefile(path,N):
for eachfile in os.listdir(path):
filename = os.path.join(path, eachfile)
if os.path.isfile(filename):
lastmodifytime = os.stat(filename).st_mtime
# print(lastmodifytime)
# 设置删除多久之前的文件
endfiletime = time.time() - 3600 * 24 * N
if endfiletime > lastmodifytime:
os.remove(filename)
print("删除文件 %s 成功" % filename)
# 如果是目录则递归调用当前函数
elif os.path.isdir(filename):
deletefile(filename,N)
os.rmdir(filename)
def del_run (path,N):
while True:
try:
deletefile(path,N)
except:
print('error')
continue
time.sleep(60*60*24)
if __name__ == '__main__':
thread = Thread(target=del_run,args=([r'/local/tts_file',3]))
thread.start()
while True:
pass