PyODPS:基于DataWorks平台利用PyODPS实现案例集合之odps.models.project格式表的基本操作(输出表的基本信息/增删改查/表分区/数据上传下载)
导读:PyODPS可以在DataWorks等数据开发平台中作为数据开发节点调用。工作流节点中会包含PYODPS类型节点。这些平台提供了PyODPS运行环境和调度执行的能力,无需用户手动创建MaxCompute入口对象。
目录
基于DataWorks平台利用PyODPS实现案例集合之表的基本操作
1、输出表的基本信息
2、表的初始化
3、表的增删改查
(1)、增
# 创建表
T1、利用Schema来创建表
T2、简单的方式是使用逗号连接的“字段名,字段类型”字符串组合来创建表
# 创建DataFrame
# 写入数据
T1、# 使用 with 表达式
T2、# 不使用 with 表达式
T3、使用MaxCompute对象的write_table方法写入数据
(2)、删
# 删除表
(3)、改
# 同步表更新
(4)、查
# 查询行记录Record
# 查询表数据
T1、通过调用head获取表数据,但仅限于查看每张表开始的小于1万条的数据
T2、通过在table上执行open_reader操作,打开一个reader来读取数据。用户可以使用with表达式,也可以不使用。
T3、使用MaxCompute对象的read_table方法获取表数据
4、表分区相关
5、数据上传下载通道
(1)、上传示例
(2)、下载示例
基于DataWorks平台利用PyODPS实现案例集合之表的基本操作
from odps import ODPS #与ODPS调用相关的SDK需要增加这个模块导入。
print("Hello World") #往日志文件输出。
print(o.exist_table('pyodps_iris')) # ataWorks的PyODPS节点中,将会包含一个全局变量odps或者o,即MaxCompute入
project = o.get_project() # 获取到默认项目空间
print(project)
1、输出表的基本信息
# # 列出项目空间下的所有表及其每个字段类型
for table in o.list_tables():
print(table)
basic_cplus__lf_dev.`consumer`
# 输出表的基本信息
t = o.get_table('consumer', project='basic_cplus__**_dev') # 调用get_table来获取表
# print(t)
print(t.lifecycle)
print(t.creation_time)
print(t.is_virtual_view)
print(t.size)
print(t.comment)
print(t.schema.columns[:4])
2、表的初始化
1
2
3
4
3、表的增删改查
(1)、增
# 创建表
T1、利用Schema来创建表
table = o.create_table('my_new_table', schema)
table = o.create_table('my_new_table', schema, if_not_exists=True) #只有不存在表时才创建。
table = o.create_table('my_new_table', schema, lifecycle=7) #设置生命周期。
T2、简单的方式是使用逗号连接的“字段名,字段类型”字符串组合来创建表
table = o.create_table('my_new_table', 'num bigint, num2 double', if_not_exists=True)
# 创建分区表可传入 (表字段列表, 分区字段列表)
table = o.create_table('my_new_table', ('num bigint, num2 double', 'pt string'), if_not_exists=True)
# 创建DataFrame
PyODPS提供了DataFrame框架DataFrame框架,支持更方便地方式来查询和操作MaxCompute数据。使用to_df方法,即可转化为DataFrame对象。
table = o.get_table('my_table_name')
df = table.to_df()
# 写入数据
类似于open_reader,table对象同样可以执行open_writer来打开writer,并写入数据。
• 每次调用write_table,MaxCompute都会在服务端生成一个文件。这一操作需要较大的时间开销,同时过多的文件会降低后续的查询效率。因此,建议在使用write_table方法时,一次性写入多组数据, 或者传入一个Generator对象。
• 使用write_table方法写表时会追加到原有数据。PyODPS不提供覆盖数据的选项,如果需要覆盖数据,需要手动清除原有数据。对于非分区表,需要调用table.truncate(),对于分区表,需要删除分区后再建立。
T1、# 使用 with 表达式
with t.open_writer(partition='pt=test') as writer:
records = [[111, 'aaa', True], # 这里可以是list。
[222, 'bbb', False],
[333, 'ccc', True],
[444, '中文', False]]
writer.write(records) # 这里records可以是可迭代对象。
with t.open_writer(partition='pt1=test1,pt2=test2') as writer: #多级分区写法
records = [t.new_record([111, 'aaa', True]), # 也可以是Record对象。
t.new_record([222, 'bbb', False]),
t.new_record([333, 'ccc', True]),
t.new_record([444, '中文', False])]
writer.write(records)with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 此处同时打开两个block。
writer.write(0, gen_records(block=0))
writer.write(1, gen_records(block=1)) # 此处两个写操作可以多线程并行,各个block间是独立的。
T2、# 不使用 with 表达式
writer = t.open_writer(partition='pt=test', blocks=[0, 1])
writer.write(0, gen_records(block=0))
writer.write(1, gen_records(block=1))
writer.close() # 不要忘记关闭 writer,否则数据可能写入不完全
T3、使用MaxCompute对象的write_table方法写入数据
records = [[111, 'aaa', True], # 这里可以是list
[222, 'bbb', False],
[333, 'ccc', True],
[444, '中文', False]]
o.write_table('test_table', records, partition='pt=test', create_partition=True)
如果分区不存在,可以使用create_partition参数指定创建分区,示例如下。
with t.open_writer(partition='pt=test', create_partition=True) as writer:
records = [[111, 'aaa', True], # 这里可以是list。
[222, 'bbb', False],
[333, 'ccc', True],
[444, '中文', False]]
writer.write(records) # 这里records可以是可迭代对象。
(2)、删
# 删除表
o.delete_table('my_table_name', if_exists=True) # 只有表存在时删除。
t.drop() # Table对象存在的时候可以直接执行drop函数。
(3)、改
# 同步表更新
当一个表被其他程序更新,例如改变了Schema之后,可以调用reload方法同步表的更新
table.reload()
(4)、查
# 查询行记录Record
Record表示表的一行记录,用户在Table对象上调用new_record即可创建一个新的Record
t = o.get_table('mytable')
r = t.new_record(['val0', 'val1']) # 值的个数必须等于表schema的字段数
r2 = t.new_record() # 您也可以不传入值
r2[0] = 'val0' # 可以通过偏移设置值
r2['field1'] = 'val1' # 也可以通过字段名设置值
r2.field1 = 'val1' # 通过属性设置值
print(record[0]) # 取第0个位置的值
print(record['c_double_a']) # 通过字段取值
print(record.c_double_a) # 通过属性取值
print(record[0: 3]) # 切片操作
print(record[0, 2, 3]) # 取多个位置的值
print(record['c_int_a', 'c_double_a']) # 通过多个字段取值
# 查询表数据
T1、通过调用head获取表数据,但仅限于查看每张表开始的小于1万条的数据
t = odps.get_table('dual')
for record in t.head(3):
print(record[0]) # 取第0个位置的值
print(record['c_double_a']) # 通过字段取值
print(record[0: 3]) # 切片操作
print(record[0, 2, 3]) # 取多个位置的值
print(record['c_int_a', 'c_double_a']) # 通过多个字段取值
T2、通过在table上执行open_reader操作,打开一个reader来读取数据。用户可以使用with表达式,也可以不使用。
# 使用with表达式
with t.open_reader(partition='pt=test') as reader:
count = reader.count
for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改
造成并行操作
# 处理一条记录
# 不使用with表达式
reader = t.open_reader(partition='pt=test')
count = reader.count
for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造
成并行操作
# 处理一条记录
T3、使用MaxCompute对象的read_table方法获取表数据
for record in o.read_table('test_table', partition='pt=test'):
# 处理一条记录
4、表分区相关
# 判断是否为分区表
if table.schema.partitions:
print('Table %s is partitioned.' % table.name)
# 遍历表全部分区:
for partition in table.partitions:
print(partition.name)
for partition in table.iterate_partitions(spec='pt=test'):
# 遍历二级分区。
判断分区是否存在:
table.exist_partition('pt=test,sub=2019')
# 获取分区:
partition = table.get_partition('pt=test')
print(partition.creation_time)
# 创建分区。
t.create_partition('pt=test', if_not_exists=True) # 分区。不存在时候才创建。
# 删除分区。
t.delete_partition('pt=test', if_exists=True) # 存在时才删除。
partition.drop() # Partition对象存在的时候直接drop。
5、数据上传下载通道
MaxCompute Tunnel是MaxCompute的数据通道,用户可以通过Tunnel向MaxCompute中上传或者下载数据。
说明:不推荐直接使用Tunnel接口,推荐用户直接使用表的写和读接口。同时,如果安装了Cython,在安装PyODPS时会编译C代码,加速Tunnel的上传和下载。
说明:需要注意,PyODPS暂不支持上传外部表。s
(1)、上传示例
from odps.tunnel import TableTunnel
table = o.get_table('my_table')
tunnel = TableTunnel(odps)
upload_session = tunnel.create_upload_session(table.name, partition_spec='pt=test')
with upload_session.open_record_writer(0) as writer:
record = table.new_record()
record[0] = 'test1'
record[1] = 'id1'
writer.write(record)
record = table.new_record(['test2', 'id2'])
writer.write(record)
upload_session.commit([0])
(2)、下载示例
from odps.tunnel import TableTunnel
tunnel = TableTunnel(odps)
download_session = tunnel.create_download_session('my_table', partition_spec='pt=
test')
with download_session.open_record_reader(0, download_session.count) as reader:
for record in reader:
# 处理每条记录。