本節目錄
常用函數一:redis操作
常用函數二:mongodb操作
常用函數三:資料庫連接配接池操作
常用函數四:pandas連接配接資料庫
常用函數五:異步連接配接資料庫
常用函數一:redis操作
# -*- coding: utf-8 -*-
"""
Datetime: 2020/07/06
Author: Zhang Yafei
Description:
"""
import redis
def get_redis_conn():
conn = redis.Redis(host='127.0.0.1', port=6379)
return conn
def get_redis_conn_pool():
pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=1000)
# max_connection最多建立1000個連接配接
conn = redis.Redis(connection_pool=pool)
return conn
def redis_string_practice():
conn = get_redis_conn_pool()
# 添加
conn.set('str_k', 'hello') # 為指定key設定value
# {'str_k':'hello'}
conn.mset({'str_k': 'hello', 'str_k1': 'world'}) # 設定多個key/value
# {'str_k':'hello', 'str_k1':'world'}
conn.msetnx({'str_k': 'msetnx_hello'}) # 若目前key未設定, 則基于mapping設定key/value,結果傳回True或False
# {'str_k':'hello'}
conn.setex('str_k2', 'str_v2', 2) # 秒
conn.decr('num', amount=1)
conn.incr('num', amount=1)
conn.incrbyfloat('num', amount='1.5')
# 删除
conn.delete('str_k1')
# 修改
conn.append('str_k', ' world') # 為指定key添加value
# {'str_k':'hello world'}
conn.setrange('str_k', 5, 'world') # 在key對應的的value指定位置上設定值
# b'helloworld'
# 查詢
print(conn.get('str_k'))
print(conn.get('num'))
print(conn.getrange('str_k', 0, 100))
print(conn.keys())
print(conn.strlen('str_k')) # 長度
print(conn.exists('str_k'))
conn.expire('str_k1', 5)
print(conn.get('str_k1'))
# 添加并查詢
print(conn.getset('str_k2', 'str_v2'))
# b'str_v2'
def redis_dict_practice():
"""
redis dict
redis = {
k4:{
'username': 'zhangyafei',
'age': 23,
}
}
"""
conn = get_redis_conn_pool()
# 1. 建立字典
conn.hset('k4','username','zhangyafei')
conn.hset('k4','age',23)
conn.hsetnx('k4','username','root') # 若key不存在則将value指派給key, 如果指派成功則傳回1,否則傳回0
conn.hsetnx('k4', 'hobby', 'basketball')
conn.hmset('k4',{'username':'zhangyafei','age':23})
# 2. 擷取字典的值
# 擷取一個值
val = conn.hget('k4', 'username') # b'zhangyafei'
# print(val)
# 擷取多個值
vals = conn.mget('k4', ['username','age'])
vals = conn.mget('k4', 'username','age') # {b'username': b'zhangyafei', b'age': b'23'}
# 擷取所有值
vals = conn.hgetall('k4') # {b'username': b'zhangyafei', b'age': b'23'}
print(vals)
# 擷取長度
lens = conn.hlen('k4') # 2
str_lens = conn.hstrlen('k4', 'username') # 10
keys = conn.hkeys('k4') # [b'username', b'age']
values = conn.hvals('k4') # [b'zhangyafei', b'23']
judge = conn.hexists('k4', 'username') # True
# conn.hdel('k4', 'age', 'username')
# print(conn.hkeys('k4')) # []
# 電腦
# print(conn.hget('k4', 'age'))
# conn.hincrby('k4','age',amount=2)
# conn.hincrbyfloat('k4','age',amount=-1.5)
# print(conn.hget('k4', 'age'))
# 問題:如果redis的k4對應的字典中有1000w條資料,請列印所有資料
# 不可取:redis取到資料之後,伺服器記憶體無法承受,爆棧
# result = conn.hgetall('k4')
# print(result)
for item in conn.hscan_iter('k4'):
print(item)
def redis_list_practice():
"""
redis list
redis = {
k1: [1,2,3,]
}
"""
conn = get_redis_conn_pool()
# 左插入
conn.lpush('k1', 11)
conn.lpush('k1', 22)
# 右插入
conn.rpush('k1', 33)
# 左擷取
val = conn.lpop('k1')
val = conn.blpop('k1', timeout=10) # 夯住
# 右擷取
val = conn.rpop('k1')
val = conn.brpop('k1', timeout=10) # 夯住
conn.lpush('k1',*[12,3,1,21,21,1,212,11,1,1,1,2,2,34,5,5,5])
def list_iter(key, count=3):
index = 0
while True:
data_list = conn.lrange(key, index, index + count - 1)
if not data_list:
return
index += count
for item in data_list:
yield item
result = conn.lrange('k1', 0, 100)
print(result) # [b'22', b'11', b'33']
for item in list_iter('k1', 3):
print(item)
def redis_pipeline_practice():
"""
pipeline:管道,也即事務。一次放多個值,一次執行所有管道中的操作,要麼全部成功,要麼全部失
"""
conn = get_redis_conn_pool()
pipe = conn.pipeline(transaction=True)
pipe.multi()
pipe.set('k2', '123')
pipe.hset('k3', 'n1', 666)
pipe.lpush('k4', 'laonanhai')
pipe.execute()
def redis_set_practice():
"""
{
'set_k':{v1,v2,v3},
}
"""
conn = get_redis_conn_pool()
# 添加
conn.sadd('set_k', 3, 4, 5, 6)
conn.sadd('set_k1', 3, 4, 5, 6)
# 删除
print(conn.spop('set_k'))
conn.srem('set_k', 2)
# 修改
conn.smove('set_k', 'set_k1', 1)
# 查詢
print(conn.smembers('set_k'))
print(conn.smembers('set_k1'))
print(conn.srandmember('set_k', 3))
print(conn.scard('set_k'))
print(conn.sismember('set_k', 2))
print(conn.sdiff('set_k', 'set_k1')) # 集合之差
conn.sdiffstore('set_k_k1', 'set_k', 'set_k1')
print(conn.smembers('set_k_k1'))
print(conn.sinter('set_k', 'set_k1')) # 集合交集
conn.sinterstore('set_k_k1_inter', 'set_k', 'set_k1')
print(conn.smembers('set_k_k1_inter'))
print(conn.sunion('set_k', 'set_k1')) # 集合并集
conn.sunionstore('set_k_k1_union', 'set_k', 'set_k1')
print(conn.smembers('set_k_k1_union'))
def redis_zset_practice():
"""
{
'set_k':{
{v1: score1},
{v2: score2},
{v3: score3},
},
}
"""
conn = get_redis_conn_pool()
# # 添加
# conn.zadd('zset_k', 'math', 99, 'english', 80, 'chinese', 85, 'sport', 100, 'music', 60)
#
# # 删除
# conn.zrem('zset_k', 'music')
# conn.zremrangebyrank('zset_k', 0, 0) # 按等級大小删除, 删除等級在第min-max個值
# conn.zremrangebyscore('zset_k', 0, 90) # 按分數範圍删除, Min < x < max之間的删除
# 查詢
print(conn.zrange('zset_k', 0, 100))
print(conn.zrevrange('zset_k', 0, 100))
# score從小到大排序, 預設小值先出, 廣度優先
results = conn.zrangebyscore('zset_k', 0, 100)
print(results)
print(conn.zcard('zset_k'))
print(conn.zcount('zset_k', 0, 90))
print(conn.zrank('zset_k', 'chinese'))
print(conn.zscore('zset_k', 'chinese'))
print(conn.zrange('zset_k', 0, 100))
if __name__ == '__main__':
redis_string_practice()
常用函數二:mongodb操作
import json
import pymongo
import pandas as pd
class MongoPipeline(object):
"""
mongodb:
save(self, data, collection): 将資料儲存到資料庫
read(self, data): 讀取資料庫中指定表格
insert(self, table, dict_data): 插入資料
delete(self, table, condition): 删除指定資料
update(self, table, condition, new_dict_data): 更新指定資料
dbFind(self, table, condition=None): 按條件查找
findAll(self, table): 查找全部
close(self): 關閉連接配接
"""
def __init__(self, mongo_db, mongo_uri='localhost'):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close(self):
"""
關閉連接配接
:return:
"""
self.client.close()
def save(self, data, collection):
"""
将資料儲存到資料庫表
:param data:
:param collection:
:return: None
"""
self.collection = self.db[collection]
try:
if self.collection.insert(json.loads(data.T.to_json()).values()):
print('mongodb insert {} sucess.'.format(collection))
return
except Exception as e:
print('insert error:', e)
import traceback
traceback.print_exc(e)
def read(self, table):
"""
讀取資料庫中的資料
:param table:
:return: dataframe
"""
try:
# 連接配接資料庫
table = self.db[table]
# 讀取資料
data = pd.DataFrame(list(table.find()))
return data
except Exception as e:
import traceback
traceback.print_exc(e)
def insert(self, table, dict_data):
"""
插入
:param table:
:param dict_data:
:return: None
"""
try:
self.db[table].insert(dict_data)
print("插入成功")
except Exception as e:
print(e)
def update(self,table, condition, new_dict_data):
"""
更新
:param table:
:param dict_data:
:param new_dict_data:
:return: None
"""
try:
self.db[table].update(condition, new_dict_data)
print("更新成功")
except Exception as e:
print(e)
def delete(self,table, condition):
"""
删除
:param table:
:param dict_data:
:return: None
"""
try:
self.db[table].remove(condition)
print("删除成功")
except Exception as e:
print(e)
def dbFind(self, table, condition=None):
"""
按條件查找
:param table:
:param dict_data:
:return: generator dict
"""
data = self.db[table].find(condition)
for item in data:
yield item
def findAll(self, table):
"""
查找全部
:param table:
:return: generator dict
"""
for item in self.db[table].find():
yield item
if __name__ == '__main__':
mongo = MongoPipeline('flask')
# data = mongo.read('label')
# print(data.head())
condition = {"藥品ID": 509881}
data = mongo.dbFind('label', condition)
print(data)
for i in data:
print(i)
# mongo.findAll()
常用操作三:資料連接配接池操作
# -*- coding: utf-8 -*-
"""
Datetime: 2020/07/06
Author: Zhang Yafei
Description: DButils連接配接池
"""
from DBUtils.PooledDB import PooledDB
class DBPoolHelper(object):
def __init__(self, dbname, user=None, password=None, db_type='postgressql', host='localhost', port=5432):
"""
# sqlite3
# 連接配接資料庫檔案名,sqlite不支援加密,不使用使用者名和密碼
import sqlite3
config = {"datanase": "path/to/your/dbname.db"}
pool = PooledDB(sqlite3, maxcached=50, maxconnections=1000, maxusage=1000, **config)
# mysql
import pymysql
pool = PooledDB(pymysql,5,host='localhost', user='root',passwd='pwd',db='myDB',port=3306) #5為連接配接池裡的最少連接配接數
# postgressql
import psycopg2
POOL = PooledDB(creator=psycopg2, host="127.0.0.1", port="5342", user, password, database)
# sqlserver
import pymssql
pool = PooledDB(creator=pymssql, host=host, port=port, user=user, password=password, database=database, charset="utf8")
:param type:
"""
if db_type == 'postgressql':
import psycopg2
pool = PooledDB(creator=psycopg2, host=host, port=port, user=user, password=password, database=dbname)
elif db_type == 'mysql':
import pymysql
pool = PooledDB(pymysql, 5, host='localhost', user='root', passwd='pwd', db='myDB',
port=3306) # 5為連接配接池裡的最少連接配接數
elif db_type == 'sqlite':
import sqlite3
config = {"database": dbname}
pool = PooledDB(sqlite3, maxcached=50, maxconnections=1000, maxusage=1000, **config)
else:
raise Exception('請輸入正确的資料庫類型, db_type="postgresql" or db_type="mysql" or db_type="sqlite"')
self.__conn = pool.connection()
self.__cursor = self.__conn.cursor()
def __connect_close(self):
"""關閉連接配接"""
self.__cursor.close()
self.__conn.close()
def commit(self):
self.__conn.commit()
def execute_without_commit(self, sql, params=tuple()):
self.__cursor.execute(sql, params)
def execute(self, sql, params=tuple()):
self.__cursor.execute(sql, params)
self.__conn.commit()
def execute_many(self, sql, params=tuple()):
self.__cursor.executemany(sql, params)
self.__conn.commit()
def fetchone(self, sql, params=tuple()):
self.__cursor.execute(sql, params)
data = self.__cursor.fetchone()
return data
def fetchall(self, sql, params=tuple()):
self.__cursor.execute(sql, params)
data = self.__cursor.fetchall()
return data
def __del__(self):
print("connect close ----------------")
self.__connect_close()
常用操作四:pandas連接配接資料庫
# -*- coding: utf-8 -*-
"""
Datetime: 2020/07/06
Author: Zhang Yafei
Description: pandas連接配接資料庫
"""
from sqlalchemy import create_engine
from pandas import read_sql
def pandas_db_helper():
"""
'postgresql://postgres:[email protected]:5432/xiaomuchong'
"mysql+pymysql://root:[email protected]:3306/srld?charset=utf8mb4"
"sqlite: ///sqlite3.db"
"""
engine = create_engine("sqlite:///sqlite3.db")
conn = engine.connect()
return conn
if __name__ == '__main__':
conn = pandas_db_helper()
data = read_sql("select * from articles", con=conn)
print(data.info())
常用函數五:異步連接配接資料庫
redis
# -*- coding: utf-8 -*-
"""
Datetime: 2020/07/26
Author: Zhang Yafei
Description:
"""
import asyncio
import aioredis
async def execute(address, password):
print("開始執行", address)
# 網絡IO操作:建立redis連接配接
redis = await aioredis.create_redis(address, password=password)
# 網絡IO操作:在redis中設定哈希值car,内部在設三個鍵值對,即: redis = { car:{key1:1,key2:2,key3:3}}
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 網絡IO操作:去redis中擷取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 網絡IO操作:關閉redis連接配接
await redis.wait_closed()
print("結束", address)
asyncio.run(execute('redis://127.0.0.1:6379', "0000"))
redis_pool
# -*- coding: utf-8 -*-
"""
Datetime: 2020/07/26
Author: Zhang Yafei
Description:
"""
import asyncio
import aioredis
async def execute(address, password):
print("開始執行", address)
# 網絡IO操作:先去連接配接 127.0.0.1:6379,遇到IO則自動切換任務,去連接配接127.0.0.1:6379
redis = await aioredis.create_redis_pool(address, password=password)
# 網絡IO操作:遇到IO會自動切換任務
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 網絡IO操作:遇到IO會自動切換任務
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 網絡IO操作:遇到IO會自動切換任務
await redis.wait_closed()
print("結束", address)
task_list = [
execute('redis://127.0.0.1:6379', "0000"),
execute('redis://127.0.0.1:6379', "0000")
]
asyncio.run(asyncio.wait(task_list))
mysql
# -*- coding: utf-8 -*-
"""
Datetime: 2020/07/26
Author: Zhang Yafei
Description:
"""
import asyncio
import aiomysql
async def execute():
# 網絡IO操作:連接配接MySQL
conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='0000', db='mysql', )
# 網絡IO操作:建立CURSOR
cur = await conn.cursor()
# 網絡IO操作:執行SQL
await cur.execute("SELECT Host,User FROM user")
# 網絡IO操作:擷取SQL結果
result = await cur.fetchall()
print(result)
# 網絡IO操作:關閉連結
await cur.close()
conn.close()
asyncio.run(execute())
mysql_pool
# -*- coding: utf-8 -*-
"""
Datetime: 2020/07/26
Author: Zhang Yafei
Description:
"""
import asyncio
import aiomysql
async def execute(host, password):
print("開始", host)
# 網絡IO操作:先去連接配接 188.176.202.180,遇到IO則自動切換任務,去連接配接188.176.202.181
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
# 網絡IO操作:遇到IO會自動切換任務
cur = await conn.cursor()
# 網絡IO操作:遇到IO會自動切換任務
await cur.execute("SELECT Host,User FROM user")
# 網絡IO操作:遇到IO會自動切換任務
result = await cur.fetchall()
print(result)
# 網絡IO操作:遇到IO會自動切換任務
await cur.close()
conn.close()
print("結束", host)
task_list = [
execute('127.0.0.1', "0000"),
execute('127.0.0.1', "0000")
]
asyncio.run(asyncio.wait(task_list))