目录
前言
优势
- Dataset API
- TFRecord
概念
- 数据说明
- 数据存储
- 常用存储
- TFRecord存储
实现
- 生成数据
- 写入TFRecord file
- 存储类型
- 如何存储张量feature
- 使用Dataset
- 创建dataset
- 操作dataset
- 解析函数
- 迭代样本
- Shuffle
- Batch
- Batch padding
- Epoch
前言
优势
一、为什么用Dataset API?
1. 简洁性:
- 常规方式:用python代码来进行batch,shuffle,padding等numpy类型的数据处理,再用placeholder + feed_dict来将其导入到graph中变成tensor类型。因此在网络的训练过程中,不得不在tensorflow的代码中穿插python代码来实现控制。
- Dataset API:将数据直接放在graph中进行处理,整体对数据集进行上述数据操作,使代码更加简洁。
**2. 对接性:**TensorFlow中也加入了高级API (Estimator、Experiment,Dataset)帮助建立网络,和Keras等库不一样的是:这些API并不注重网络结构的搭建,而是将不同类型的操作分开,帮助周边操作。可以在保证网络结构控制权的基础上,节省工作量。若使用Dataset API导入数据,后续还可选择与Estimator对接。
二、为什么用TFRecord?
在数据集较小时,我们会把数据全部加载到内存里方便快速导入,但当数据量超过内存大小时,就只能放在硬盘上来一点点读取,这时就不得不考虑数据的移动、读取、处理等速度。使用TFRecord就是为了提速和节约空间的。
概念
二、数据存储
为达成上述的训练,我们需要把所有的样本存储成合适的类型以供随后的训练。
2. TFRecord存储:
TFRecord是以字典的方式一次写一个样本,字典的keys可以不以输入和标签,而以不同的特征(如学历,年龄,职业,收入)区分,在随后的读取中再选择哪些特征形成输入,哪些形成标签。这样的好处是,后续可以根据需要只挑选特定的特征;也可以方便应对例如多任务学习这样有多个输入和标签的机器学习任务。
注:一般而言,单数的feature是一个维度,即标量。所有的features组成representation。但在 TFRecord的存储中,字典中feature的value可以不是标量。如:key为学历的value就可以是:[初中,高中,大学],3个features所形成的向量。亦可是任何维度的张量。
实现
一、生成数据
将常见的集中数据类型都写进来了
<code class="language-python">np.set_printoptions(precision=3) # display function def display(alist, show = True): print('type:%s\\nshape: %s' %(alist[0].dtype,alist[0].shape)) if show: for i in range(3): print('example%s\\n%s' %(i,alist[i])) category = np.array([1,2,3],dtype=int64) print('\\n category') display(category) numerical = np.array([[0.1,0.1,0.1], [0.2,0.2,0.2], [0.3,0.3,0.3]],dtype=float32) print('\\n numerical') display(vectors) records = np.array([[1,2,3], [2,3,4], [3,4,5]],dtype=int64) print('\\n records') display(records) # 稀疏特征 indices=[[1,2],[2,3,4],[23,5,6,1]] values=[[0.1,0.9],[0.2,0.4,0.4],[0.2,0.4,0.4,0.4]] print('\\n sparse') print(indices) print(values) </code>
二、写入TFRecord file
1. 打开TFRecord file
<code class="language-python">writer = tf.python_io.TFRecordWriter('%s.tfrecord' %'test') </code>
2. 创建样本写入字典
这里准备一个样本一个样本的写入TFRecord file中。
先把每个样本中所有feature的信息和值存到字典中,key为feature名,value为feature值。
feature值需要转变成tensorflow指定的feature类型中的一个:
2.1. 存储类型
- int64:
tf.train.Feature(int64_list = tf.train.Int64List(value=输入))
- float32:
tf.train.Feature(float_list = tf.train.FloatList(value=输入))
- string:
tf.train.Feature(bytes_list=tf.train.BytesList(value=输入))
- 注:输入必须是list(向量)
2.2. 如何处理类型是张量的feature
tensorflow feature类型只接受list数据,但如果数据类型是矩阵或者张量该如何处理?
两种方式:
- 转成list类型:将张量fatten成list(也就是向量),再用写入list的方式写入。
- 转成string类型:将张量用.tostring()转换成string类型,再用tf.train.Feature(bytes_list=tf.train.BytesList(value=[input.tostring()]))来存储。
- 形状信息:不管那种方式都会使数据丢失形状信息,所以在向该样本中写入feature时应该额外加入shape信息作为额外feature。shape信息是int类型,这里我是用原feature名字+’_shape’来指定shape信息的feature名。
<code class="language-python">for i in range(3): # create dictionary features={} # write scalar ,type Int64,"value=[scalars[i]]" makes it to list features['category'] = tf.train.Feature(int64_list=tf.train.Int64List(value=[category[i]])) # write vector,type float,it is list,so "value=vectors[i]" features['numerical'] = tf.train.Feature(float_list = tf.train.FloatList(value=numerical[i])) # write matrix,type float,but its rank =2,tf.train.FloatList only takes list, so we can flatten it to list features['records'] = tf.train.Feature(int64_list = tf.train.Int64List(value=records[i])) # save shape (806,806,3) features['indices'] = tf.train.Feature(int64_list = tf.train.Int64List(value=indices[i])) features['values'] = tf.train.Feature(float_list = tf.train.FloatList(value=values[i])) # feed dictionary to tf.train.Features tf_features = tf.train.Features(feature= features) # get an example tf_example = tf.train.Example(features = tf_features) # serialize the example tf_serialized = tf_example.SerializeToString() # write writer.write(tf_serialized) </code>
3. 转成tf_features
<code class="language-python"># 将存有所有feature的字典送入tf.train.Features中 tf_features = tf.train.Features(feature= features) </code>
4. 转成tf_example
<code class="language-python"># 再将其变成一个样本example tf_example = tf.train.Example(features = tf_features) </code>
5. 序列化样本
<code class="language-python"># 序列化该样本 tf_serialized = tf_example.SerializeToString() </code>
6. 写入样本
<code class="language-python"># 写入一个序列化的样本 writer.write(tf_serialized) # 由于上面有循环3次,所以到此我们已经写了3个样本 </code>
7. 关闭TFRecord file
<code class="language-python"># 关闭文件 writer.close() </code>
三、使用Dataset
1. 创建dataset
Dataset是你的数据集,包含了某次将要使用的所有样本,且所有样本的结构需相同(在tensorflow官网介绍中,样本example也被称作element)。样本需从source导入到dataset中,导入的方式有很多中。随后也可从已有的dataset中构建出新的dataset。
1.1. 直接导入(非本文重点,随后不再提)
<code class="language-python">dataset **=** tf**.**data**.**Dataset**.**from_tensor_slices([1,2,3]) *# 输入需是list,可以是numpy类型,可以是tf tensor类型,也可以直接输入* </code>
1.2. 从TFRecord文件导入
<code class="language-python"># 从多个tfrecord文件中导入数据到Dataset类 (这里用两个一样) filenames = ["test.tfrecord", "test.tfrecord"] dataset = tf.data.TFRecordDataset(filenames) </code>
2. 操作dataset:
如优势中所提到的,我们希望对dataset中的所有样本进行统一的操作(batch,shuffle,padding等)。接下来就是对dataset的操作。
2.1. dataset.map(func)
由于从tfrecord文件中导入的样本是刚才写入的tf_serialized序列化样本,所以我们需要对每一个样本进行解析。这里就用dataset.map(parse_function)来对dataset里的每个样本进行相同的解析操作。
注:dataset.map(输入)中的输入是一个函数。
2.1.2. 创建解析函数
接下就创建parse function。
<code class="language-python">def parse_function(example_proto): # 只接受一个输入:example_proto,也就是序列化后的样本tf_serialized </code>
Step 1. 创建样本解析字典
该字典存放着所有feature的解析方式,key为feature名,value为feature的解析方式。
解析方式有两种:
- 定长特征解析:tf.FixedLenFeature(shape, dtype, default_value)
- shape:可当reshape来用,如vector的shape从(3,)改动成了(1,3)。
- 注:如果写入的feature使用了.tostring() 其shape就是()
- dtype:必须是tf.float32, tf.int64, tf.string中的一种。
- default_value:feature值缺失时所指定的值。
- 不定长特征解析:tf.VarLenFeature(dtype)
- 注:可以不明确指定shape,但得到的tensor是SparseTensor。
<code class="language-python">def parse_function(example_proto): # example_proto,tf_serialized dics = {'category': tf.FixedLenFeature(shape=[1], dtype=tf.int64, default_value=None), # when parse the example, shape below can be used as reshape, for example reshape (3,) to (1,3) 'numerical': tf.FixedLenFeature(shape=[3], dtype=tf.float32), # we can use VarLenFeature, but it returns SparseTensor 'records': tf.FixedLenFeature(shape=[3],dtype=tf.int64), 'sparse': tf.io.SparseFeature( index_key= 'indices', value_key= 'values', dtype=tf.float32, size=50)} # parse all features in a single example according to the dics parsed_example = tf.parse_single_example(example_proto, dics) return parsed_example </code>
Step 2. 解析样本
<code class="language-python"># 把序列化样本和解析字典送入函数里得到解析的样本 parsed_example = tf.parse_single_example(example_proto, dics) </code>
Step 3. 返回样本
现在样本中的所有feature都被正确设定了。可以根据需求将不同的feature进行拆分合并等处理,得到想要的输入和标签
,最终在parse_function末尾返回。这里为了展示,我直接返回存有4个特征的字典。
<code class="language-python"># 返回所有feature return parsed_example </code>
2.1.3. 执行解析函数
创建好解析函数后,将创建的parse_function送入dataset.map()得到新的数据集
<code class="language-python">new_dataset = dataset.map(parse_function) </code>
2.2. 创建迭代器
有了解析过的数据集后,接下来就是获取当中的样本。
<code class="language-python"># 创建获取数据集中样本的迭代器 iterator = new_dataset.make_one_shot_iterator() </code>
2.3. 获取样本
<code class="language-python">new_dataset = dataset.map(parse_function) iterator = new_dataset.make_one_shot_iterator() next_element = iterator.get_next() sess = tf.InteractiveSession() i = 1 while True: # try: scalar,vector,matrix,tensor = sess.run([next_element['category'], next_element['numerical'], next_element['records'], next_element['sparse']]) except tf.errors.OutOfRangeError: print("End of dataset") break else: print(scalar) print(vector) print(matrix) print(tensor) print('==============example %s ==============' %i) print('scalar: value: %s | shape: %s | type: %s' %(scalar, scalar.shape, scalar.dtype)) print('vector shape: %s | type: %s' %(vector.shape, vector.dtype)) print('matrix shape: %s | type: %s' %(matrix.shape, matrix.dtype)) # print('tensor shape: %s | type: %s' %(tensor, tensor.dtype)) i+=1 </code>
<code class="language-python">[1] [0.1 0.1 0.1] [1 2 3] SparseTensorValue(indices=array([[1], [2]], dtype=int64), values=array([0.1, 0.9], dtype=float32), dense_shape=array([50], dtype=int64)) ==============example 1 ============== scalar: value: [1] | shape: (1,) | type: int64 vector shape: (3,) | type: float32 matrix shape: (3,) | type: int64 [2] [0.2 0.2 0.2] [2 3 4] SparseTensorValue(indices=array([[2], [3], [4]], dtype=int64), values=array([0.2, 0.4, 0.4], dtype=float32), dense_shape=array([50], dtype=int64)) ==============example 2 ============== scalar: value: [2] | shape: (1,) | type: int64 vector shape: (3,) | type: float32 matrix shape: (3,) | type: int64 [3] [0.3 0.3 0.3] [3 4 5] SparseTensorValue(indices=array([[ 1], [ 5], [ 6], [23]], dtype=int64), values=array([0.4, 0.4, 0.4, 0.2], dtype=float32), dense_shape=array([50], dtype=int64)) ==============example 3 ============== scalar: value: [3] | shape: (1,) | type: int64 vector shape: (3,) | type: float32 matrix shape: (3,) | type: int64 [1] [0.1 0.1 0.1] [1 2 3] SparseTensorValue(indices=array([[1], [2]], dtype=int64), values=array([0.1, 0.9], dtype=float32), dense_shape=array([50], dtype=int64)) ==============example 4 ============== scalar: value: [1] | shape: (1,) | type: int64 vector shape: (3,) | type: float32 matrix shape: (3,) | type: int64 [2] [0.2 0.2 0.2] [2 3 4] SparseTensorValue(indices=array([[2], [3], [4]], dtype=int64), values=array([0.2, 0.4, 0.4], dtype=float32), dense_shape=array([50], dtype=int64)) ==============example 5 ============== scalar: value: [2] | shape: (1,) | type: int64 vector shape: (3,) | type: float32 matrix shape: (3,) | type: int64 [3] [0.3 0.3 0.3] [3 4 5] SparseTensorValue(indices=array([[ 1], [ 5], [ 6], [23]], dtype=int64), values=array([0.4, 0.4, 0.4, 0.2], dtype=float32), dense_shape=array([50], dtype=int64)) ==============example 6 ============== scalar: value: [3] | shape: (1,) | type: int64 vector shape: (3,) | type: float32 matrix shape: (3,) | type: int64 End of dataset </code>
我们写进test.tfrecord文件中了3个样本,用dataset = tf.data.TFRecordDataset(["test.tfrecord","test.tfrecord"
导入了两次,所以有6个样本。scalar的值,也符合所写入的数据。
2.4. Shuffle
可以轻松使用.shuffle(buffer_size= )
来打乱顺序。buffer_size
设置成一个大于你数据集中样本数量的值来确保其充分打乱。
注:对于数据集特别巨大的情况,请参考YJango:tensorflow中读取大规模tfrecord如何充分shuffle?
<code class="language-python">shuffle_dataset = new_dataset.shuffle(buffer_size=10000) iterator = shuffle_dataset.make_one_shot_iterator() next_element = iterator.get_next() i = 1 while True: try: scalar = sess.run(next_element['category']) except tf.errors.OutOfRangeError: print("End of dataset") break else: print('example %s | scalar: value: %s' %(i,scalar)) i+=1 </code>
<code class="language-python">example 1 | scalar: value: [3] example 2 | scalar: value: [3] example 3 | scalar: value: [2] example 4 | scalar: value: [2] example 5 | scalar: value: [1] example 6 | scalar: value: [1] End of dataset </code>
顺序打乱了,但1,2,3都出现过2次
2.5. Batch
再从乱序后的数据集上进行batch。
<code class="language-python">batch_dataset = shuffle_dataset.batch(4) iterator = batch_dataset.make_one_shot_iterator() next_element = iterator.get_next() i = 1 while True: try: scalar = sess.run(next_element['category']) except tf.errors.OutOfRangeError: print("End of dataset") break else: print('example %s | scalar: value: %s' %(i,scalar)) i+=1 </code>
<code class="language-python">example 1 | scalar: value: [[1] [2] [1] [2]] example 2 | scalar: value: [[3] [3]] End of dataset </code>
6个样本,以4个进行batch,第一个得到4个,第二个得到余下的2个
2.7. Epoch
<code class="language-python">num_epochs = 2 epoch_dataset = new_dataset.repeat(num_epochs) iterator = epoch_dataset.make_one_shot_iterator() next_element = iterator.get_next() i = 1 while True: try: scalar = sess.run(next_element['category']) except tf.errors.OutOfRangeError: print("End of dataset") break else: print('example %s | scalar: value: %s' %(i,scalar)) i+=1 </code>
<code class="language-python"> example 1 | scalar: value: [1] example 2 | scalar: value: [2] example 3 | scalar: value: [3] example 4 | scalar: value: [1] example 5 | scalar: value: [2] example 6 | scalar: value: [3] example 7 | scalar: value: [1] example 8 | scalar: value: [2] example 9 | scalar: value: [3] example 10 | scalar: value: [1] example 11 | scalar: value: [2] example 12 | scalar: value: [3] End of dataset </code>
使用.repeat(num_epochs)
来指定要遍历几遍整个数据集
显示结果
除了tf.train.example外,还可以用SequenceExample,不过文件大小会增倍(参考)
一些有用的函数
interleave 函数
<code class="language-python">interleave( map_func, cycle_length=AUTOTUNE, block_length=1, num_parallel_calls=None ) </code>
首先该方法会从该Dataset中取出cycle_length个element,然后对这些element apply map_func, 得到cycle_length个新的Dataset对象。然后从这些新生成的Dataset对象中取数据,每个Dataset对象一次取block_length个数据。当新生成的某个Dataset的对象取尽时,从原Dataset中再取一个element,然后apply map_func,以此类推。
<code class="language-python"># 1.15 版本里面的 API a = Dataset.range(1, 6) # ==> [ 1, 2, 3, 4, 5 ] # NOTE: New lines indicate "block" boundaries. a.interleave(lambda x: Dataset.from_tensors(x).repeat(6), cycle_length=2, block_length=4) # ==> [1, 1, 1, 1, **# 2, 2, 2, 2,** # 1, 1, # 2, 2, # 3, 3, 3, 3, # 4, 4, 4, 4, # 3, 3, # 4, 4, # 5, 5, 5, 5, # 5, 5] </code>
实际业务中的使用:
<code class="language-python">iterator = files.apply( tf.data.experimental.parallel_interleave(lambda filename: tf.data.TFRecordDataset(filename), cycle_length=min(self.cycle_length, len(filenames)), block_length=block_length, sloppy=sloppy)) </code>
shard
<code class="language-python">shard( num_shards, index ) </code>
这个函数主要是在分布式运行非常有用,获取数据的 1/num_shard
<code class="language-python">if shard is True: files = files.shard(hvd.size(), hvd.rank()) </code> 参考: https://zhuanlan.zhihu.com/p/33223782