天天看点

数据分析工具篇——数据读写

数据分析的本质是为了解决问题,以逻辑梳理为主,分析人员会将大部分精力集中在问题拆解、思路透视上面,技术上的消耗总希望越少越好,而且分析的过程往往存在比较频繁的沟通交互,几乎没有时间百度技术细节。

因此,熟练常用技术是良好分析的保障和基础。

笔者认为熟练记忆数据分析各个环节的一到两个技术点,不仅能提高分析效率,而且将精力从技术中释放出来,更快捷高效的完成逻辑与沟通部分。

笔者习惯将一些常用的技术点梳理出来,下次用到可以轻松复制出来,节省不少精力,随着时间的积累,逐渐成型了一套技术集合。本文基于数据分析的基本流程,整理了SQL、pandas、pyspark、EXCEL(本文暂不涉及数据建模、分类模拟等算法思路)在分析流程中的组合应用,希望对大家有所助益。

1、数据导入

将数据导入到python的环境中相对比较简单,只是工作中些许细节,如果知道可以事半功倍:

1.1、导入Excel/csv文件:

# 个人公众号:livandata
import pandas as pd
def fun(x):
    x = int(x) - 1000
    return x
data = pd.read_csv('total_data_append_ssl.txt',
                   dtype='str',
                   nrows=5,
                   sep=',',
                   header=[1,2],
                   names=['a','b','c'],
                   prefix='x',
                   converters={'a': fun, 'b': fun})           

复制

常用的导入Excel/CSV文件的方法为:read_csv()与read_excel()。

在使用过程中会用到一些基本的参数,如上代码:

1) dtype='str':以字符串的形式读取文件;

2) nrows=5:读取多少行数据;

3) sep=',:以逗号分隔的方式读取数据;

4) header=[1,2]:取哪一行作为列名。

如果将第2行作为列名,则header=1;

如果将第2,3行作为列名,则header=[1,2];

5) names=['a','b','c']如果要指定行名,则可以选用names参数:

6) prefix='x':对列名添加前缀,例如:列名为a,加入prefix之后显示为xa。

7) converters={'a': fun, 'b': fun}:对a和b两列做如上fun函数的处理。

1.2、导入txt文件:

with open('total_data_append_ssl.txt', 'r') as file_to_read:
  while True:
    lines = file_to_read.readline() # 整行读取数据
    if not lines:
      break           

复制

读取数据主要有两个:

1) r:覆盖式读取;

2) r+:追加式读取;

1.3、读入mysql中的数据:

import sqlalchemy as sqla
# 用sqlalchemy构建数据库链接engine
con = sqla.create_engine('mysql+pymysql://root:123456@localhost:3306/livan?charset=utf8')
# 如果读写数据中有汉字,则用charset=utf8mb4:
# con = sqla.create_engine('mysql+pymysql://root:123456@localhost:3306/livan?charset=utf8mb4')
# sql 命令
sql_cmd = "SELECT * FROM table"
df = pd.read_sql(sql=sql_cmd, con=con)           

复制

在构建连接的时候,笔者遇到一个有意思的操作,就是charset=utf8mb4,由于mysql不支持汉字,则在有汉字读写的时候需要用到utf8mb4编码,而不是单纯的utf8结构。

1.4、使用pyspark读取数据:

from pyspark.sql import SparkSession
spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .master("local") \
            .getOrCreate()
spark.conf.set("spark.executor.memory", "500M")
sc = spark.sparkContext           

复制

pyspark是一个相对较新的包,主要是采用python的方式连接了spark环境,他可以对应的读取一些数据,例如:txt、csv、json以及sql数据,可惜的是pyspark没有提供读取excel的api,如果有excel的数据,需要用pandas读取,然后转化成sparkDataFrame使用。

1) 读取csv数据:

data = spark.read.\
    options(header='True', inferSchema='True', delimiter=',').\
csv("/Users/livan/PycharmProjects/spark_workspace/total_data_append_1.csv")           

复制

2)读取txt数据:

df1 = spark.read.text("/spark_workspace/ssssss.txt")
lines = sc.textFile("data.txt")           

复制

3) 读取json数据:

df = spark.read.json('file:///Users/wangyun/Documents/BigData/script/data/people.json')           

复制

4) 读取SQL数据:

sqlDF = spark.sql("SELECT * FROM people")           

复制

读取sql时,需要连接对应的hive库或者数据库,有需要可以具体百度,这里就不详细描述了。

我们可以看到,pyspark读取上来的数据是存储在sparkDataFrame中,打印出来的方法主要有两个:

print(a.show())
print(b.collect())            

复制

show()是以sparkDataFrame格式打印;

collect()是以list格式打印。

2、分批读取数据:

遇到数据量较大时,我们往往需要分批读取数据,等第一批数据处理完了,再读入下一批数据,python也提供了对应的方法,思路是可行的,但是使用过程中会遇到一些意想不到的问题,例如:数据多批导入过程中,内存并不释放,最终导致内存溢出。

所以,正常情况下,如果遇到较大的数据量,我们会采用pyspark方式,这里只是记录分批读数的方案思路,有兴趣的小伙伴可以尝试一下:

# 分批读取文件:
def read_in_chunks(filePath, chunk_size=10*10):
    file_object = open(filePath)
    time.sleep(2)
    while True:
        chunk_data = file_object.read(chunk_size)
        if not chunk_data:
            break
        yield chunk_data
if __name__ == "__main__":
    path = '/Users/livan/PycharmProjects/data/Page Data/Facebook Insights Data Export - Visit Beijing - 2014-07.xml'
    for chunk in read_in_chunks(path):
        print(chunk)           

复制

另外,pandas中也提供了对应的分块方法:

loop = true
chunkSize = 10000
path = '../data/result.csv'
reader = pd.read_csv(path, iterator = True, dtype=str)
while loop:
    try:
        chunk = reader.get_chunk(chunkSize).fillna('nan')
    except StopIteration:
        loop = False
        print('iteration is stopped~')           

复制

3、数据导出

3.1、导出到csv/excel中:

df.to_csv('tses.csv', sep=',',columns=['a','b','c'],
           na_rep='', header=True,
           index=True, encoding='utf_8_sig')           

复制

数据写入csv和excel 的函数主要有:to_csv和to_excel两个。1) sep=',':输出的数据以逗号分隔;

2) columns=['a','b','c']:制定输出哪些列;

3) na_rep='':缺失值用什么内容填充;

4) header=True:是导出表头;

5) index=True:是否写入行名;

6) encoding='utf_8_sig':以字符串形式输出到文件中,汉字的编码有两种形式encoding='utf_8'和encoding='utf_8_sig',如果一种情况出现乱码,可以再换另一种方式。

2.2、导出到txt中:

url='ssdsdsd'
with open('teete.txt', 'a', encoding="utf-8") as file_handle:  # .txt可以不自己新建,代码会自动新建
    file_handle.write(url)           

复制

将数据写入到txt文件中,a为追加模式,w为覆盖写入。

Open()函数中添加encoding参数,即以utf-8格式写入。

2.3、导出到mysql中:

columns = ['aaa', 'bbb', 'ccc', 'ddd']
index = ['chinese', 'math', 'English']
data = np.random.randint(0, 100, size=(3, 4))
test = DataFrame(columns=columns, index=index, data=data)
# 例如:
con = sqla.create_engine('mysql+pymysql://root:123456@localhost:3306/livan?charset=utf8')
# 导入数据库
test.to_sql('test_table', con, index=False, if_exists='replace',
                 dtype={'aaa': sqla.types.INT,
                        'bbb': sqla.types.INT,
                        'ccc': sqla.types.INT,
                        'ddd': sqla.types.INT
                        })           

复制

其中if_exists参数是表示数据的追加模式:append追加模式和replace覆盖模式。

导出数据时如果数据量过大,to_sql的效率会很慢,有些大佬给出了对应的方案:

import cStringIO
output = cStringIO.StringIO()
# ignore the index
df_a.to_csv(output, sep='\t', index=False, header=False)
output.getvalue()
# jump to start of stream
output.seek(0)
connection = engine.raw_connection()  # engine 是 from sqlalchemy import create_engine
cursor = connection.cursor()
# null value become ''
cursor.copy_from(output, table_name, null='')
connection.commit()
cursor.close()           

复制

存入效率瞬间提升,在此做个感慨,沟通提高效率,例如上面的问题,不问不知道,一问有高人。

2.4、使用pyspark做数据导出:

from pyspark.sql import SparkSession
spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .master("local") \
            .getOrCreate()
spark.conf.set("spark.executor.memory", "500M")
sc = spark.sparkContext
sqlDF = spark.sql("SELECT * FROM people")
try:
    sqlDF.write.csv("sss.csv")
    sqlDF.write.text("seses.txt")
    sqlDF.write.format("orc").mode("append").\
        saveAsTable("kuming.biaoming")
except Exception as e:
    raise e           

复制

我们可以看到pyspark中的导出结构相对比较统一,即write函数,可以导出为csv、text和导出到hive库中,可以添加format格式和追加模式:append 为追加;overwrite为覆盖。

如上即为数据的导入导出方法,笔者在分析过程中,将常用的一些方法整理出来,可能不是最全的,但却是高频使用的,如果有新的方法思路,欢迎大家沟通。