天天看点

python数据库self函数_Python常用功能函数系列总结(四)之数据库操作

本节目录

常用函数一:redis操作

常用函数二:mongodb操作

常用函数三:数据库连接池操作

常用函数四:pandas连接数据库

常用函数五:异步连接数据库

常用函数一:redis操作

# -*- coding: utf-8 -*-

"""

Datetime: 2020/07/06

Author: Zhang Yafei

Description:

"""

import redis

def get_redis_conn():

conn = redis.Redis(host='127.0.0.1', port=6379)

return conn

def get_redis_conn_pool():

pool = redis.ConnectionPool(host='127.0.0.1', port=6379, max_connections=1000)

# max_connection最多创建1000个连接

conn = redis.Redis(connection_pool=pool)

return conn

def redis_string_practice():

conn = get_redis_conn_pool()

# 添加

conn.set('str_k', 'hello') # 为指定key设置value

# {'str_k':'hello'}

conn.mset({'str_k': 'hello', 'str_k1': 'world'}) # 设置多个key/value

# {'str_k':'hello', 'str_k1':'world'}

conn.msetnx({'str_k': 'msetnx_hello'}) # 若当前key未设定, 则基于mapping设置key/value,结果返回True或False

# {'str_k':'hello'}

conn.setex('str_k2', 'str_v2', 2) # 秒

conn.decr('num', amount=1)

conn.incr('num', amount=1)

conn.incrbyfloat('num', amount='1.5')

# 删除

conn.delete('str_k1')

# 修改

conn.append('str_k', ' world') # 为指定key添加value

# {'str_k':'hello world'}

conn.setrange('str_k', 5, 'world') # 在key对应的的value指定位置上设置值

# b'helloworld'

# 查询

print(conn.get('str_k'))

print(conn.get('num'))

print(conn.getrange('str_k', 0, 100))

print(conn.keys())

print(conn.strlen('str_k')) # 长度

print(conn.exists('str_k'))

conn.expire('str_k1', 5)

print(conn.get('str_k1'))

# 添加并查询

print(conn.getset('str_k2', 'str_v2'))

# b'str_v2'

def redis_dict_practice():

"""

redis dict

redis = {

k4:{

'username': 'zhangyafei',

'age': 23,

}

}

"""

conn = get_redis_conn_pool()

# 1. 创建字典

conn.hset('k4','username','zhangyafei')

conn.hset('k4','age',23)

conn.hsetnx('k4','username','root') # 若key不存在则将value赋值给key, 如果赋值成功则返回1,否则返回0

conn.hsetnx('k4', 'hobby', 'basketball')

conn.hmset('k4',{'username':'zhangyafei','age':23})

# 2. 获取字典的值

# 获取一个值

val = conn.hget('k4', 'username') # b'zhangyafei'

# print(val)

# 获取多个值

vals = conn.mget('k4', ['username','age'])

vals = conn.mget('k4', 'username','age') # {b'username': b'zhangyafei', b'age': b'23'}

# 获取所有值

vals = conn.hgetall('k4') # {b'username': b'zhangyafei', b'age': b'23'}

print(vals)

# 获取长度

lens = conn.hlen('k4') # 2

str_lens = conn.hstrlen('k4', 'username') # 10

keys = conn.hkeys('k4') # [b'username', b'age']

values = conn.hvals('k4') # [b'zhangyafei', b'23']

judge = conn.hexists('k4', 'username') # True

# conn.hdel('k4', 'age', 'username')

# print(conn.hkeys('k4')) # []

# 计算器

# print(conn.hget('k4', 'age'))

# conn.hincrby('k4','age',amount=2)

# conn.hincrbyfloat('k4','age',amount=-1.5)

# print(conn.hget('k4', 'age'))

# 问题:如果redis的k4对应的字典中有1000w条数据,请打印所有数据

# 不可取:redis取到数据之后,服务器内存无法承受,爆栈

# result = conn.hgetall('k4')

# print(result)

for item in conn.hscan_iter('k4'):

print(item)

def redis_list_practice():

"""

redis list

redis = {

k1: [1,2,3,]

}

"""

conn = get_redis_conn_pool()

# 左插入

conn.lpush('k1', 11)

conn.lpush('k1', 22)

# 右插入

conn.rpush('k1', 33)

# 左获取

val = conn.lpop('k1')

val = conn.blpop('k1', timeout=10) # 夯住

# 右获取

val = conn.rpop('k1')

val = conn.brpop('k1', timeout=10) # 夯住

conn.lpush('k1',*[12,3,1,21,21,1,212,11,1,1,1,2,2,34,5,5,5])

def list_iter(key, count=3):

index = 0

while True:

data_list = conn.lrange(key, index, index + count - 1)

if not data_list:

return

index += count

for item in data_list:

yield item

result = conn.lrange('k1', 0, 100)

print(result) # [b'22', b'11', b'33']

for item in list_iter('k1', 3):

print(item)

def redis_pipeline_practice():

"""

pipeline:管道,也即事务。一次放多个值,一次执行所有管道中的操作,要么全部成功,要么全部失

"""

conn = get_redis_conn_pool()

pipe = conn.pipeline(transaction=True)

pipe.multi()

pipe.set('k2', '123')

pipe.hset('k3', 'n1', 666)

pipe.lpush('k4', 'laonanhai')

pipe.execute()

def redis_set_practice():

"""

{

'set_k':{v1,v2,v3},

}

"""

conn = get_redis_conn_pool()

# 添加

conn.sadd('set_k', 3, 4, 5, 6)

conn.sadd('set_k1', 3, 4, 5, 6)

# 删除

print(conn.spop('set_k'))

conn.srem('set_k', 2)

# 修改

conn.smove('set_k', 'set_k1', 1)

# 查询

print(conn.smembers('set_k'))

print(conn.smembers('set_k1'))

print(conn.srandmember('set_k', 3))

print(conn.scard('set_k'))

print(conn.sismember('set_k', 2))

print(conn.sdiff('set_k', 'set_k1')) # 集合之差

conn.sdiffstore('set_k_k1', 'set_k', 'set_k1')

print(conn.smembers('set_k_k1'))

print(conn.sinter('set_k', 'set_k1')) # 集合交集

conn.sinterstore('set_k_k1_inter', 'set_k', 'set_k1')

print(conn.smembers('set_k_k1_inter'))

print(conn.sunion('set_k', 'set_k1')) # 集合并集

conn.sunionstore('set_k_k1_union', 'set_k', 'set_k1')

print(conn.smembers('set_k_k1_union'))

def redis_zset_practice():

"""

{

'set_k':{

{v1: score1},

{v2: score2},

{v3: score3},

},

}

"""

conn = get_redis_conn_pool()

# # 添加

# conn.zadd('zset_k', 'math', 99, 'english', 80, 'chinese', 85, 'sport', 100, 'music', 60)

#

# # 删除

# conn.zrem('zset_k', 'music')

# conn.zremrangebyrank('zset_k', 0, 0) # 按等级大小删除, 删除等级在第min-max个值

# conn.zremrangebyscore('zset_k', 0, 90) # 按分数范围删除, Min < x < max之间的删除

# 查询

print(conn.zrange('zset_k', 0, 100))

print(conn.zrevrange('zset_k', 0, 100))

# score从小到大排序, 默认小值先出, 广度优先

results = conn.zrangebyscore('zset_k', 0, 100)

print(results)

print(conn.zcard('zset_k'))

print(conn.zcount('zset_k', 0, 90))

print(conn.zrank('zset_k', 'chinese'))

print(conn.zscore('zset_k', 'chinese'))

print(conn.zrange('zset_k', 0, 100))

if __name__ == '__main__':

redis_string_practice()

常用函数二:mongodb操作

import json

import pymongo

import pandas as pd

class MongoPipeline(object):

"""

mongodb:

save(self, data, collection): 将数据保存到数据库

read(self, data): 读取数据库中指定表格

insert(self, table, dict_data): 插入数据

delete(self, table, condition): 删除指定数据

update(self, table, condition, new_dict_data): 更新指定数据

dbFind(self, table, condition=None): 按条件查找

findAll(self, table): 查找全部

close(self): 关闭连接

"""

def __init__(self, mongo_db, mongo_uri='localhost'):

self.mongo_uri = mongo_uri

self.mongo_db = mongo_db

self.client = pymongo.MongoClient(self.mongo_uri)

self.db = self.client[self.mongo_db]

def close(self):

"""

关闭连接

:return:

"""

self.client.close()

def save(self, data, collection):

"""

将数据保存到数据库表

:param data:

:param collection:

:return: None

"""

self.collection = self.db[collection]

try:

if self.collection.insert(json.loads(data.T.to_json()).values()):

print('mongodb insert {} sucess.'.format(collection))

return

except Exception as e:

print('insert error:', e)

import traceback

traceback.print_exc(e)

def read(self, table):

"""

读取数据库中的数据

:param table:

:return: dataframe

"""

try:

# 连接数据库

table = self.db[table]

# 读取数据

data = pd.DataFrame(list(table.find()))

return data

except Exception as e:

import traceback

traceback.print_exc(e)

def insert(self, table, dict_data):

"""

插入

:param table:

:param dict_data:

:return: None

"""

try:

self.db[table].insert(dict_data)

print("插入成功")

except Exception as e:

print(e)

def update(self,table, condition, new_dict_data):

"""

更新

:param table:

:param dict_data:

:param new_dict_data:

:return: None

"""

try:

self.db[table].update(condition, new_dict_data)

print("更新成功")

except Exception as e:

print(e)

def delete(self,table, condition):

"""

删除

:param table:

:param dict_data:

:return: None

"""

try:

self.db[table].remove(condition)

print("删除成功")

except Exception as e:

print(e)

def dbFind(self, table, condition=None):

"""

按条件查找

:param table:

:param dict_data:

:return: generator dict

"""

data = self.db[table].find(condition)

for item in data:

yield item

def findAll(self, table):

"""

查找全部

:param table:

:return: generator dict

"""

for item in self.db[table].find():

yield item

if __name__ == '__main__':

mongo = MongoPipeline('flask')

# data = mongo.read('label')

# print(data.head())

condition = {"药品ID": 509881}

data = mongo.dbFind('label', condition)

print(data)

for i in data:

print(i)

# mongo.findAll()

常用操作三:数据连接池操作

# -*- coding: utf-8 -*-

"""

Datetime: 2020/07/06

Author: Zhang Yafei

Description: DButils连接池

"""

from DBUtils.PooledDB import PooledDB

class DBPoolHelper(object):

def __init__(self, dbname, user=None, password=None, db_type='postgressql', host='localhost', port=5432):

"""

# sqlite3

# 连接数据库文件名,sqlite不支持加密,不使用用户名和密码

import sqlite3

config = {"datanase": "path/to/your/dbname.db"}

pool = PooledDB(sqlite3, maxcached=50, maxconnections=1000, maxusage=1000, **config)

# mysql

import pymysql

pool = PooledDB(pymysql,5,host='localhost', user='root',passwd='pwd',db='myDB',port=3306) #5为连接池里的最少连接数

# postgressql

import psycopg2

POOL = PooledDB(creator=psycopg2, host="127.0.0.1", port="5342", user, password, database)

# sqlserver

import pymssql

pool = PooledDB(creator=pymssql, host=host, port=port, user=user, password=password, database=database, charset="utf8")

:param type:

"""

if db_type == 'postgressql':

import psycopg2

pool = PooledDB(creator=psycopg2, host=host, port=port, user=user, password=password, database=dbname)

elif db_type == 'mysql':

import pymysql

pool = PooledDB(pymysql, 5, host='localhost', user='root', passwd='pwd', db='myDB',

port=3306) # 5为连接池里的最少连接数

elif db_type == 'sqlite':

import sqlite3

config = {"database": dbname}

pool = PooledDB(sqlite3, maxcached=50, maxconnections=1000, maxusage=1000, **config)

else:

raise Exception('请输入正确的数据库类型, db_type="postgresql" or db_type="mysql" or db_type="sqlite"')

self.__conn = pool.connection()

self.__cursor = self.__conn.cursor()

def __connect_close(self):

"""关闭连接"""

self.__cursor.close()

self.__conn.close()

def commit(self):

self.__conn.commit()

def execute_without_commit(self, sql, params=tuple()):

self.__cursor.execute(sql, params)

def execute(self, sql, params=tuple()):

self.__cursor.execute(sql, params)

self.__conn.commit()

def execute_many(self, sql, params=tuple()):

self.__cursor.executemany(sql, params)

self.__conn.commit()

def fetchone(self, sql, params=tuple()):

self.__cursor.execute(sql, params)

data = self.__cursor.fetchone()

return data

def fetchall(self, sql, params=tuple()):

self.__cursor.execute(sql, params)

data = self.__cursor.fetchall()

return data

def __del__(self):

print("connect close ----------------")

self.__connect_close()

常用操作四:pandas连接数据库

# -*- coding: utf-8 -*-

"""

Datetime: 2020/07/06

Author: Zhang Yafei

Description: pandas连接数据库

"""

from sqlalchemy import create_engine

from pandas import read_sql

def pandas_db_helper():

"""

'postgresql://postgres:[email protected]:5432/xiaomuchong'

"mysql+pymysql://root:[email protected]:3306/srld?charset=utf8mb4"

"sqlite: ///sqlite3.db"

"""

engine = create_engine("sqlite:///sqlite3.db")

conn = engine.connect()

return conn

if __name__ == '__main__':

conn = pandas_db_helper()

data = read_sql("select * from articles", con=conn)

print(data.info())

常用函数五:异步连接数据库

redis

# -*- coding: utf-8 -*-

"""

Datetime: 2020/07/26

Author: Zhang Yafei

Description:

"""

import asyncio

import aioredis

async def execute(address, password):

print("开始执行", address)

# 网络IO操作:创建redis连接

redis = await aioredis.create_redis(address, password=password)

# 网络IO操作:在redis中设置哈希值car,内部在设三个键值对,即: redis = { car:{key1:1,key2:2,key3:3}}

await redis.hmset_dict('car', key1=1, key2=2, key3=3)

# 网络IO操作:去redis中获取值

result = await redis.hgetall('car', encoding='utf-8')

print(result)

redis.close()

# 网络IO操作:关闭redis连接

await redis.wait_closed()

print("结束", address)

asyncio.run(execute('redis://127.0.0.1:6379', "0000"))

redis_pool

# -*- coding: utf-8 -*-

"""

Datetime: 2020/07/26

Author: Zhang Yafei

Description:

"""

import asyncio

import aioredis

async def execute(address, password):

print("开始执行", address)

# 网络IO操作:先去连接 127.0.0.1:6379,遇到IO则自动切换任务,去连接127.0.0.1:6379

redis = await aioredis.create_redis_pool(address, password=password)

# 网络IO操作:遇到IO会自动切换任务

await redis.hmset_dict('car', key1=1, key2=2, key3=3)

# 网络IO操作:遇到IO会自动切换任务

result = await redis.hgetall('car', encoding='utf-8')

print(result)

redis.close()

# 网络IO操作:遇到IO会自动切换任务

await redis.wait_closed()

print("结束", address)

task_list = [

execute('redis://127.0.0.1:6379', "0000"),

execute('redis://127.0.0.1:6379', "0000")

]

asyncio.run(asyncio.wait(task_list))

mysql

# -*- coding: utf-8 -*-

"""

Datetime: 2020/07/26

Author: Zhang Yafei

Description:

"""

import asyncio

import aiomysql

async def execute():

# 网络IO操作:连接MySQL

conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='0000', db='mysql', )

# 网络IO操作:创建CURSOR

cur = await conn.cursor()

# 网络IO操作:执行SQL

await cur.execute("SELECT Host,User FROM user")

# 网络IO操作:获取SQL结果

result = await cur.fetchall()

print(result)

# 网络IO操作:关闭链接

await cur.close()

conn.close()

asyncio.run(execute())

mysql_pool

# -*- coding: utf-8 -*-

"""

Datetime: 2020/07/26

Author: Zhang Yafei

Description:

"""

import asyncio

import aiomysql

async def execute(host, password):

print("开始", host)

# 网络IO操作:先去连接 188.176.202.180,遇到IO则自动切换任务,去连接188.176.202.181

conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')

# 网络IO操作:遇到IO会自动切换任务

cur = await conn.cursor()

# 网络IO操作:遇到IO会自动切换任务

await cur.execute("SELECT Host,User FROM user")

# 网络IO操作:遇到IO会自动切换任务

result = await cur.fetchall()

print(result)

# 网络IO操作:遇到IO会自动切换任务

await cur.close()

conn.close()

print("结束", host)

task_list = [

execute('127.0.0.1', "0000"),

execute('127.0.0.1', "0000")

]

asyncio.run(asyncio.wait(task_list))