本文讲述变长序列中的padding和masking,并展开讨论收到padding影响的操作如何处理。最后谈及Tensorflow在处理padding和masking上的实践总结。

更新:后期整理了有关的代码细节,可参看Github

理解神经网络中Embedding层的原理中提到Embedding层的原理,而在该层后,常常面临对不定长序列数据的padding和masking处理。为此引申出本文。

padding和masking

padding和masking示例,

padding:对不定长的文本序列进行填充,变为定长的序列,以便神经网络进行批量化学习

masking:指示定长的序列中哪些是原本文的数据,哪些是padding后的数据,以便神经网络区分

padding

神经网络的输入需要一个规整的张量,但是很多情况下是数据本身无法规整。例如句子,有长有短。比如下面的样本,

1
2
3
4
5
X = [
["Hello", "world", "!"],
["How", "are", "you", "doing", "today"],
["The", "weather", "will", "be", "nice", "tomorrow"],
]

然后我们通过字到ID映射,变为

1
2
3
4
5
X = [
[71, 1331, 4231],
[73, 8, 3215, 55, 927],
[83, 91, 1, 645, 1253, 927],
]

这些样本长短不一,无法直接作为一个batch输入神经网络中。为此,我们可以把每个样本都填充0,使其变为定长的样本。

方法一

在Keras中,tf.keras.preprocessing.sequence.pad_sequences提供padding操作,

1
2
3
padded_X = tf.keras.preprocessing.sequence.pad_sequences(
X, padding="post"
)

结果为,

1
2
3
4
>>> padded_X
array([[ 71, 1331, 4231, 0, 0, 0],
[ 73, 8, 3215, 55, 927, 0],
[ 83, 91, 1, 645, 1253, 927]], dtype=int32)

post(后向填充)和pre(前向填充)的padding方式是有差别的,对于RNN,推荐使用post方法,以便使用CuDNN的实现。

方法二

padding也可以自己实现,如下

1
2
3
4
5
6
7
8
def batch_padding(X, padding=0):
# post padding
maxlen = max([len(x) for x in X])
padded_X = np.array([
np.concatenate([x, [padding] * (maxlen - len(x))])
if len(x) < maxlen else x for x in X
])
return padded_X

按照batch进行padding,padding最大长度为batch内序列的最大长度。

方法三

使用tf.data.Dataset.padded_batch方法,例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def gen():
for i in X:
yield i

dl = tf.data.Dataset.from_generator(
generator=gen,
output_types=tf.int32
).padded_batch(
batch_size=1,
padded_shapes=[10]
)

for i in iter(dl):
print(i.shape)

在大型数据集中,推荐使用这种方法。前两张方法适合小型数据集的情况下使用。

现在把数据规整了,可以输入神经网络,那么如何告知网络那些数据是padding,哪些是原始数据?这样就可以避免模型在训练或推断是引入噪声。

masking及其计算

让神经网络知道那些是真实的数据,那些是padding数据。为找出mask的数据,可以用一下几种方法。这些方法都很灵活,可以根据数据特点和常见使用。

方法一

1
2
3
4
5
>>> tf.not_equal(padded_X, 0)
<tf.Tensor: shape=(3, 6), dtype=bool, numpy=
array([[ True, True, True, False, False, False],
[ True, True, True, True, True, False],
[ True, True, True, True, True, True]])>

方法二

tf.sequence_mask提供更方便的方法,如下,

1
2
3
4
5
6
7
8
9
>>> X
[[71, 1331, 4231], [73, 8, 3215, 55, 927], [83, 91, 1, 645, 1253, 927]]
>>> lengths = [len(x) for x in X]
>>> tf.sequence_mask(lengths)
<tf.Tensor: shape=(3, 6), dtype=bool, numpy=
array([[ True, True, True, False, False, False],
[ True, True, True, True, True, False],
[ True, True, True, True, True, True]])>
>>>

方法三

1
2
3
4
5
6
7
8
9
>>> embedding = tf.keras.layers.Embedding(input_dim=10000, output_dim=8, mask_
zero=True)
>>> masked_output = embedding(padded_X)
>>> masked_output._keras_mask
<tf.Tensor: shape=(3, 6), dtype=bool, numpy=
array([[ True, True, True, False, False, False],
[ True, True, True, True, True, False],
[ True, True, True, True, True, True]])>

方法四

1
2
3
4
5
6
7
8
9
10
11
>>> masking = tf.keras.layers.Masking(mask_value=0.0)
>>> unmasked_embedding = tf.cast(
... tf.tile(tf.expand_dims(padded_X, axis=-1), [1, 1, 8]), tf.float32
... )
>>> masked_embedding = masking(unmasked_embedding)
>>> masked_embedding._keras_mask
<tf.Tensor: shape=(3, 6), dtype=bool, numpy=
array([[ True, True, True, False, False, False],
[ True, True, True, True, True, False],
[ True, True, True, True, True, True]])>

方法五

1
2
mask = tf.greater(padded_X, 0)
print(mask)

方法六

1
2
mask = tf.math.logical_not(tf.math.equal(padded_X, 0))
print(mask)

方法七:

1
2
3
4
mask_layer = tf.keras.layers.Masking(mask_value=0)
X = tf.expand_dims(padded_X, axis=-1)
mask_tensor = mask_layer.compute_mask(X)
print(mask_tensor)

这里提供七种方法计算Mask,使用那种可以根据数据特点和具体情况灵活选择。有时候mask需要广播到其他维度,这个需要另外处理。

受 padding 影响的操作

某些操作受到 padding 带来的影响,需要 masking 处理以告知某一层哪些是padded的数据,以便在计算的时候将其排除掉。

average

平均函数如果直接求解是不准确的,应该使用如下方法,

其中m表示mask。

因此,当实现GlobalAveragePooling时,内部涉及average操作,因此需要处理mask。

max & min

如果是0,则直接乘以mask;如果是1,则在padding部分减去一个大正数,于是,

根据max性质,

就能获得min的mask形式。不过在神经网络中似乎很少使用这种mask。

softmax

注意到softmax需要计算$\exp(x)$,那么mask只需要减去一个大数即可,以Tensorflow为例,

1
2
3
4
mask = tf.expand_dims(tf.cast(mask, "float32"), -1)
x = x - (1 - mask) * 1e12
# 权重归一化
x = tf.math.softmax(x, 1) # 有mask位置对应的权重变为很小的值

如果你不放心这种计算,不妨验证一下,

1
2
>>> tf.exp([-1e12])
<tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>

如果框架本身没有softmax函数,可以自行实现,如numpy下,

1
2
3
4
def softmax(x, axis=-1):
x = x - x.max(axis=axis, keepdims=True)
x = np.exp(x)
return x / x.sum(axis=axis, keepdims=True)

这里有一定的技巧,为避免数值溢出,先减去一个最大数。类似地,logsumexp也是这个原理。

log

log的mask处理可以在mask位置加一,

倒数

倒数的mask处理是mask位置加上或减去一个大数,

Layer support masking

如果使用,Functional API 和 Sequential API,Keras中内置的层能自动接收和处理mask。Tensorflow内置的部分Layer都支持mask参数输入,以便处理padding数据。例如RNN,LSTM等等。

情况 1

如果自定义层需要mask信息做更多处理,例如在受 padding 影响的操作。这也是在实践中遇到的最重要的情况。

以AttentionPooling1D为例,自定义层需要call(self, inputs, mask=None)有mask参数以便下层传递mask能够接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class AttentionPooling1D(tf.keras.layers.Layer):

def __init__(self, h_dim, kernel_initializer="glorot_uniform", **kwargs):
super(AttentionPooling1D, self).__init__(**kwargs)
self.h_dim = h_dim
self.kernel_initializer = kernel_initializer
# time steps dim change
self.supports_masking = False

def build(self, input_shape):
self.k_dense = tf.keras.layers.Dense(
units=self.h_dim,
use_bias=False,
kernel_initializer=self.kernel_initializer,
activation="tanh"
)
self.o_dense = tf.keras.layers.Dense(
units=1,
use_bias=False
)

def call(self, inputs, mask=None):
if mask is None:
mask = 1
else:
# 扩展维度便于广播
mask = tf.expand_dims(tf.cast(mask, "float32"), -1)
x0 = inputs
# 计算每个 time steps 权重
x = self.k_dense(inputs)
x = self.o_dense(x)
# 处理 mask
x = x - (1 - mask) * 1e12
# 权重归一化
x = tf.math.softmax(x, 1) # 有mask位置对应的权重变为很小的值
# 加权平均
x = tf.reduce_sum(x * x0, 1)
return x

def compute_output_shape(self, input_shape):
return (None, self.h_dim)


inputs = keras.Input(shape=(None,), dtype="int32")
x = layers.Embedding(input_dim=10, output_dim=32, mask_zero=True)(inputs)
x = layers.Dense(1)(x)
outputs = TemporalSoftmax()(x)

model = keras.Model(inputs, outputs)
y = model(np.random.randint(0, 10, size=(32, 100)), np.random.random((32, 100, 1)))

情况 2

如果自定义层并没有破坏输入形状(samples, timesteps, features)中的时间维度,但内部组件需要接收mask,可以使用直接传递mask的方法。这种用法和情况一一样,只不过这里使用的是内置的层。AttentionPooling1D也可以作为这里的基本组件,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class LSTMClassifier(tf.keras.layers.Layer):

def __init__(self, h_dims=128, input_dim=100000, output_dim=128, **kwargs):
super(LSTMClassifier, self).__init__(**kwargs)
self.h_dims = h_dims
self.input_dim = input_dim
self.output_dim = output_dim

def build(self, input_shape):
self.embedding = tf.keras.layers.Embedding(
input_dim=self.input_dim,
output_dim=self.output_dim,
mask_zero=True # 指定mask值
)
self.lstm = tf.keras.layers.LSTM(self.h_dims)

def call(self, inputs, mask=None):
x = self.embedding(inputs)
# 计算mask
mask = self.embedding.compute_mask(inputs)
return self.lstm(x, mask=mask) # lstm会忽略padding值

def compute_output_shape(self, input_shape):
return (None, self.h_dims)

layer = MyLayer()
x = np.random.random((32, 10)) * 100
x = x.astype("int32")
layer(x)

情况 3

如果自定义层的功能破坏了原有输入形状(samples, timesteps, features)中的时间维度,如Flatten操作等,需要实现compute_mask方法,因为新的形状不知道对应的mask是什么,需要重新计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class TemporalSplit(keras.layers.Layer):
"""Split the input tensor into 2 tensors along the time dimension."""

def call(self, inputs):
# Expect the input to be 3D and mask to be 2D, split the input tensor into 2
# subtensors along the time axis (axis 1).
return tf.split(inputs, 2, axis=1)

def compute_mask(self, inputs, mask=None):
# Also split the mask into 2 if it presents.
if mask is None:
return None
return tf.split(mask, 2, axis=1)


first_half, second_half = TemporalSplit()(masked_embedding)
print(first_half._keras_mask)
print(second_half._keras_mask)

例如双向的LSTM层,对mask发生改变

情况 4

如果是自定义层,则需要分情况讨论。

Embedding mask_zero=True

设置Masking Layer

如果自定义层并没有破坏输入形状(samples, timesteps, features)中的时间维度,为了让mask向其他层传播,需要在自定义层设置self.supports_masking = True

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class MyActivation(keras.layers.Layer):
def __init__(self, **kwargs):
super(MyActivation, self).__init__(**kwargs)
# Signal that the layer is safe for mask propagation
self.supports_masking = True

def call(self, inputs):
return tf.nn.relu(inputs)


inputs = keras.Input(shape=(None,), dtype="int32")
x = layers.Embedding(input_dim=5000, output_dim=16, mask_zero=True)(inputs)
x = MyActivation()(x) # Will pass the mask along
print("Mask found:", x._keras_mask)
outputs = layers.LSTM(32)(x) # Will receive the mask

model = keras.Model(inputs, outputs)

实现上的问题

一种常见的方法是数据和mask都放到inputs上,

1
2
3
4
class XMaskedLayer(keras.layers.Layer):

def call(self, inputs):
x, mask = inputs

应该把inputs和mask区分开来

1
2
3
4
5
class XMaskedLayer(keras.layers.Layer):

def call(self, inputs, mask):
x1, x2 = inputs
a_mask, b_mask = mask

这里一BiLSTM作为示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class MaskBiLSTM(tf.keras.layers.Layer):
"""支持mask的BiLSTM"""

def __init__(self, hdims, **kwargs):
super(MaskBiLSTM, self).__init__(**kwargs)
self.hdims = hdims
self.forward_lstm = LSTM(hdims, return_sequences=True)
self.backend_lstm = LSTM(hdims, return_sequences=True)

def reverse_sequence(self, x, mask):
seq_len = tf.reduce_sum(mask, axis=1)[:, 0]
seq_len = tf.cast(seq_len, tf.int32)
x = tf.reverse_sequence(x, seq_len, seq_axis=1)
return x

def call(self, inputs, mask=None):
if mask is None:
mask = 1.0
x = inputs
x_forward = self.forward_lstm(x)
x_backward = self.reverse_sequence(x, mask)
x_backward = self.backend_lstm(x_backward)
x_backward = self.reverse_sequence(x_backward, mask)
x = tf.concat([x_forward, x_backward], axis=-1)
x = x * mask
return x

def compute_output_shape(self, input_shape):
return input_shape[0][:-1] + (self.hdims * 2,)

补充

以上,很多的代码细节,后期已经好了,放到Github,有需要请参看tensorflow-padding-masking

总结

因此Keras中masking和padding的使用规律如下,

  • 如果一个layer受padding的影响,那么在call方法中传入mask,或者在call方法内计算出mask,并根据具体的操作mask掉padding的影响,如上述提到的softmax的处理方式。
  • 如果一个layer对输入数据的timesteps维度产生影响,为让上层知道这个影响,mask也需要相应的处理。
  • 如果一个layer内部的组件需要mask来处理padding,如果该组件支持处理mask,那么获得mask后,直接传入即可
  • 如果一个layer并不受padding的影响,但是mask信息需要往上传递,那么让self.supports_masking=True

参考

[1] https://tensorflow.google.cn/guide/keras/masking_and_padding?hl=en

[2] tensorflow-padding-masking