天天看点

FastAPI 与 Flask 简单图像处理实战

FastAPI

import uvicorn
from fastapi import FastAPI
from starlette.middleware.cors import CORSMiddleware
from fastapi import File,UploadFile,Form # 上传文件需要使用,格式为form-data
from pydantic import BaseModel # 上传json时使用   
from typing import Optional # 用于可选参数

app = FastAPI() # create the application instance
app.add_middleware( # enable CORS so a front-end served from another origin can call this API
    CORSMiddleware,
    # NOTE(review): every origin, method and header is allowed while credentials
    # are enabled as well — very permissive; confirm this is intended outside a demo.
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"])

# Schema of the JSON body accepted by the "/url_path" endpoint.
# Comments are used instead of a docstring so the generated schema is unchanged.
class fun_Base(BaseModel):
    text: str  # passed to a_fun as its first argument
    number: int  # passed to a_fun as its second argument

def a_fun (img,number):
    """Placeholder inference step: log both arguments and hand ``img`` back.

    Args:
        img: Payload to "process"; returned unchanged.
        number: Value printed after the ``'Page:'`` label.

    Returns:
        The very same ``img`` object that was passed in.
    """
    log_fields = ('this is infer', img, 'Page:', number)
    print(*log_fields)
    return img

class a_class:
    """Stand-in for a model wrapper: no model is loaded, inputs are echoed back."""

    def __init__(self):
        # A real implementation would load its model here; the demo only logs.
        print('load for your model')
        self.model = None

    def infer (self,cv_img):
        """Log the (absent) model and the input, then return the input unchanged."""
        message_parts = ('use', self.model, 'infer', cv_img)
        print(*message_parts)
        return cv_img

# Accepts a JSON request body (validated against fun_Base).
@app.post("/url_path") # route path
async def text_Symbol(text_base: fun_Base):
    # Run the placeholder routine on the two body fields.
    processed = a_fun(text_base.text, text_base.number)
    # Wrap the outcome in the response envelope: code / msg / result.
    result_section = {"punc_text": processed}
    return {"code": 200, "msg": 'success', "result": result_section}

import cv2
import numpy as np
a_pre = a_class()  # one shared instance, created once at import time
# Accepts multipart/form-data.
@app.post("/paddlespeech/clas") # several route decorators may be stacked on one handler
async def clas_examine (choice:str=Form(...),text:Optional[str] = Form(None),file: UploadFile = File(...)):
    """Decode an uploaded image, keep a copy on disk and run it through ``a_pre``.

    Form fields:
        choice: Required string field.
        text: Optional string field; ``None`` when omitted.
        file: Required uploaded file, expected to be an image.

    Returns a ``{"code", "msg", "result"}`` envelope; ``code`` is 400 when the
    upload cannot be decoded as an image.
    """
    import os  # local import: this script's import header does not bring in os

    # Read the upload exactly once. ``UploadFile.read`` is a coroutine and must be
    # awaited, and a second read would return nothing because the stream is at EOF.
    raw = await file.read()

    # ``np.frombuffer`` replaces the deprecated binary mode of ``np.fromstring``.
    # ``cv2.imdecode`` rejects an empty buffer, so it is skipped when there is no data.
    cv_img = cv2.imdecode(np.frombuffer(raw, np.uint8), cv2.IMREAD_COLOR) if raw else None
    if cv_img is None:
        return {"code": 400, "msg": 'invalid image', "result": {}}

    # The client controls the file name: keep only its last path component so the
    # copy cannot be written outside the working directory.
    filename = os.path.basename(file.filename or '')
    if filename not in ('', '.', '..'):
        with open(filename,'wb') as f:
            f.write(raw)

    cv_img = a_pre.infer(cv_img)
    # A numpy array cannot be JSON-encoded; convert it to nested lists first.
    result = cv_img.tolist() if isinstance(cv_img, np.ndarray) else cv_img
    return {"code": 200, "msg": 'success', "result": {"punc_text": result}}

# Start the server only when this file is run as a script, not when imported.
if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9556)  # bind every interface on port 9556
           

Flask

from flask import Flask,request,jsonify
from flask_cors import CORS
app = Flask(__name__)

@app.after_request
def cors(environ):
    """Attach permissive CORS headers to every outgoing response.

    Args:
        environ: Despite its name, this is the response object that Flask hands
            to ``after_request`` hooks; the name is kept for compatibility.

    Returns:
        The same response object with the CORS headers set.
    """
    environ.headers['Access-Control-Allow-Origin']='*'
    # The standard header name is plural; the singular spelling is ignored by browsers.
    environ.headers['Access-Control-Allow-Methods']='*'
    environ.headers['Access-Control-Allow-Headers']='x-requested-with,content-type'
    return environ
# NOTE(review): flask_cors already manages these headers, and browsers reject a
# wildcard origin on credentialed requests — confirm whether both the hook above
# and this call are needed.
CORS(app, supports_credentials=True)

from PIL import Image
@app.route("/token_embedding",methods=["POST",])
def token_embed():
    """Demonstrate reading JSON, form fields and uploaded files from one request.

    Returns:
        A JSON envelope ``{"code", "msg", "result"}``; HTTP 400 when the
        ``file`` part is missing from the form data.
    """
    import os  # local import keeps this handler self-contained

    # silent=True yields None instead of aborting the request when the body is
    # not JSON, which is always the case for a multipart/form-data upload.
    data = request.get_json(silent=True) # JSON body, if any
    str_ = request.form.get('str_') # plain string field from the form data

    file = request.files.get('file') # single uploaded file from the form data
    if file is None:
        return jsonify({"code": 400, "msg": 'missing file', "result": ''}), 400

    # The client controls the file name: keep only its last path component so the
    # upload cannot be written outside the working directory.
    filename = os.path.basename(file.filename or '')
    if filename not in ('', '.', '..'):
        file.save(filename) # store the upload as-is
        file.stream.seek(0) # saving consumed the stream; rewind before reuse
    image = Image.open(file) # open the upload directly as a PIL image

    files = request.files.getlist("files") # several files sent under one field
    for one_file in files:
        print(one_file)

    return jsonify({"code": 200, "msg": 'success', "result": ''})


# Run Flask's built-in server only when this file is executed directly.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8922, debug=False)  # all interfaces, port 8922, debugger off
           

定时删除文件

import time
import os
from threading import Thread
# When storing files, avoid duplicate file names, e.g. with str(random.randint(0,100))+str(int(time.time()*1000000))+'.jpg'
def deletefile(path,N):
    """Recursively delete files under ``path`` last modified more than ``N`` days ago.

    Sub-directories are visited depth-first and removed only when they end up
    empty; a directory that still holds recent files is left in place.

    Args:
        path: Directory to clean.
        N: Age threshold in days; files whose mtime is older are removed.

    Raises:
        OSError: If ``path`` cannot be listed or an entry cannot be removed.
    """
    # The cutoff does not depend on the entry, so compute it once per directory.
    endfiletime = time.time() - 3600 * 24 * N
    for eachfile in os.listdir(path):
        filename = os.path.join(path, eachfile)
        if os.path.isfile(filename):
            if endfiletime > os.stat(filename).st_mtime:
                os.remove(filename)
                print("删除文件 %s 成功" % filename)
        elif os.path.isdir(filename):
            deletefile(filename,N)
            # rmdir raises on a non-empty directory, which used to abort the
            # whole sweep as soon as one sub-directory kept a recent file.
            if not os.listdir(filename):
                os.rmdir(filename)

def del_run (path,N,interval=60*60*24):
    """Run ``deletefile(path, N)`` forever, pausing ``interval`` seconds between passes.

    Args:
        path: Directory to clean.
        N: Age threshold in days, forwarded to ``deletefile``.
        interval: Seconds to wait between passes; defaults to one day.

    This function never returns; it is meant to be run in its own thread.
    """
    while True:
        try:
            deletefile(path,N)
        except Exception as exc:
            # Report what failed instead of hiding it, and fall through to the
            # sleep below so a persistent error cannot become a tight busy loop.
            print('error', repr(exc))
        time.sleep(interval)

# Start the periodic cleanup when this file is executed as a script.
if __name__ == '__main__':
    # Clean /local/tts_file, removing files older than 3 days.
    thread = Thread(target=del_run,args=(r'/local/tts_file',3))
    thread.start()
    # Block on the worker instead of spinning in ``while True: pass``, which
    # kept one CPU core fully busy for no benefit.
    thread.join()
           

继续阅读