Attention Seq2seq Implementation

AttentionWeight


import numpy as np
from common.layers import Softmax  # Softmax layer from the book's common library (adjust the path to your setup)


class AttentionWeight:
	def __init__(self):
		self.params, self.grads = [], []
		self.softmax = Softmax()
		self.cache = None
	
	def forward(self, hs, h):
		N, T, H = hs.shape
		hr = h.reshape(N, 1, H)  # broadcasting stands in for .repeat(T, axis=1)
		t = hs * hr
		s = np.sum(t, axis=2)
		a = self.softmax.forward(s)
		self.cache = (hs, hr)
		return a
	
	def backward(self, da):
		hs, hr = self.cache
		N, T, H = hs.shape
		
		ds = self.softmax.backward(da)
		dt = ds.reshape(N, T, 1).repeat(H, axis=2)
		dhs = dt * hr
		dhr = dt * hs
		dh = np.sum(dhr, axis=1)
		
		return dhs, dh
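
The layer above relies on the Softmax layer from the book's common library. As a quick sanity check, the same forward computation can be sketched directly in NumPy; the sizes N, T, H and the random inputs below are purely illustrative:


import numpy as np

N, T, H = 2, 5, 4                      # batch, encoder time steps, hidden size (illustrative)
hs = np.random.randn(N, T, H)          # all encoder hidden states
h = np.random.randn(N, H)              # one decoder hidden state

s = np.sum(hs * h.reshape(N, 1, H), axis=2)     # dot-product scores, shape (N, T)
a = np.exp(s - s.max(axis=1, keepdims=True))    # softmax over the time axis
a /= a.sum(axis=1, keepdims=True)

print(a.shape)        # (2, 5)
print(a.sum(axis=1))  # each row sums to 1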

WeightSum


class WeightSum:
	def __init__(self):
		self.params, self.grads = [], []
		self.cache = None
	
	def forward(self, hs, a):
		# context vector = attention-weighted sum of the encoder states
		N, T, H = hs.shape
		ar = a.reshape(N, T, 1)  # broadcasting stands in for .repeat(H, axis=2)
		t = hs * ar
		c = np.sum(t, axis=1)
		
		self.cache = (hs, ar)
		return c
	
	def backward(self, dc):
		# split the incoming gradient between hs and the attention weights a
		hs, ar = self.cache
		N, T, H = hs.shape
		dt = dc.reshape(N, 1, H).repeat(T, axis=1)
		dar = dt * hs
		dhs = dt * ar
		da = np.sum(dar, axis=2)
		return dhs, da
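
Given attention weights a of shape (N, T) and encoder states hs of shape (N, T, H), the context vector is just their weighted sum over the time axis. A NumPy-only sketch with random placeholder inputs:


import numpy as np

N, T, H = 2, 5, 4
hs = np.random.randn(N, T, H)          # encoder hidden states
a = np.random.rand(N, T)
a /= a.sum(axis=1, keepdims=True)      # stand-in attention weights, rows sum to 1

c = np.sum(hs * a.reshape(N, T, 1), axis=1)   # context vector, shape (N, H)
print(c.shape)  # (2, 4)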

Attention


class Attention:
	def __init__(self):
		self.params, self.grads = [], []
		self.attention_weight_layer = AttentionWeight()
		self.weight_sum_layer = WeightSum()
		self.attention_weight = None
		
	def forward(self, hs, h):
		a = self.attention_weight_layer.forward(hs, h)
		out = self.weight_sum_layer.forward(hs, a)
		self.attention_weight = a
		return out
	
	def backward(self, dout):
		dhs0, da = self.weight_sum_layer.backward(dout)
		dhs1, dh = self.attention_weight_layer.backward(da)
		dhs = dhs0 + dhs1
		return dhs, dh
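
With the two sub-layers combined, a forward/backward shape check might look like the following. It assumes the classes above are defined in the current session and that Softmax is importable as shown earlier; the sizes and random inputs are placeholders:


import numpy as np

N, T, H = 2, 5, 4
attention = Attention()
out = attention.forward(np.random.randn(N, T, H), np.random.randn(N, H))
print(out.shape)                 # (2, 4): one context vector per sample

dhs, dh = attention.backward(np.ones((N, H)))
print(dhs.shape, dh.shape)       # (2, 5, 4) (2, 4)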

TimeAttention


class TimeAttention:
	def __init__(self):
		self.params, self.grads = [], []
		self.layers = None
		self.attention_weights = None
	
	def forward(self, hs_enc, hs_dec):
		# apply one Attention layer per decoder time step
		N, T, H = hs_dec.shape
		out = np.empty_like(hs_dec)
		self.layers = []
		self.attention_weights = []
		
		for t in range(T):
			layer = Attention()
			out[:, t, :] = layer.forward(hs_enc, hs_dec[:, t, :])
			self.layers.append(layer)
			self.attention_weights.append(layer.attention_weight)
		return out
 
	def backward(self, dout):
		# accumulate encoder gradients across time; route per-step gradients back to the decoder
		N, T, H = dout.shape
		dhs_enc = 0
		dhs_dec = np.empty_like(dout)
		
		for t in range(T):
			layer = self.layers[t]
			dhs, dh = layer.backward(dout[:, t, :])
			dhs_enc += dhs
			dhs_dec[:, t, :] = dh
			
		return dhs_enc, dhs_dec
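
TimeAttention simply runs one Attention layer per decoder time step, so the encoder and decoder sequences may have different lengths. A shape check under the same assumptions as above (T_enc, T_dec and the inputs are illustrative):


import numpy as np

N, T_enc, T_dec, H = 2, 5, 3, 4
time_attention = TimeAttention()
ctx = time_attention.forward(np.random.randn(N, T_enc, H),
                             np.random.randn(N, T_dec, H))
print(ctx.shape)                                          # (2, 3, 4)
print(np.array(time_attention.attention_weights).shape)   # (3, 2, 5): one (N, T_enc) map per decoder step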

AttentionSeq2seq


# Seq2seq, Encoder, Decoder and the Time* layers come from the book's library;
# the import paths below may differ in your setup.
from common.time_layers import TimeEmbedding, TimeLSTM, TimeAffine, TimeSoftmaxWithLoss
from ch07.seq2seq import Seq2seq, Encoder, Decoder


class AttentionSeq2seq(Seq2seq):
	def __init__(self, vocab_size, wordvec_size, hidden_size):
		args = vocab_size, wordvec_size, hidden_size
		self.encoder = AttentionEncoder(*args)
		self.decoder = AttentionDecoder(*args)
		self.softmax = TimeSoftmaxWithLoss()
		
		self.params = self.encoder.params + self.decoder.params
		self.grads = self.encoder.grads + self.decoder.grads

AttentionEncoder


class AttentionEncoder(Encoder):
	def forward(self, xs):
		xs = self.embed.forward(xs)	
		hs = self.lstm.forward(xs)
		return hs
	
	def backward(self, dhs):
		dout = self.lstm.backward(dhs)
		dout = self.embed.backward(dout)
		return dout

AttentionDecoder


class AttentionDecoder(Decoder):
	def __init__(self, vocab_size, wordvec_size, hidden_size):
		V, D, H = vocab_size, wordvec_size, hidden_size
		rn = np.random.randn
		
		embed_W = (rn(V, D) / 100).astype('f')
		lstm_Wx = (rn(D, 4 * H) / np.sqrt(D)).astype('f')
		lstm_Wh = (rn(H, 4 * H) / np.sqrt(H)).astype('f')
		lstm_b = np.zeros(4 * H).astype('f')
		affine_W = (rn(2*H, V) / np.sqrt(2*H)).astype('f')
		affine_b = np.zeros(V).astype('f')
		
		self.embed = TimeEmbedding(embed_W)
		self.lstm = TimeLSTM(lstm_Wx, lstm_Wh, lstm_b, stateful=True)
		self.attention = TimeAttention()
		self.affine = TimeAffine(affine_W, affine_b)
		layers = [self.embed, self.lstm, self.attention, self.affine]
		self.params, self.grads = [], []
		
		for layer in layers:
			self.params += layer.params
			self.grads += layer.grads
	
	def forward(self, xs, enc_hs):
		# initialise the decoder LSTM with the encoder's last hidden state
		h = enc_hs[:, -1]
		self.lstm.set_state(h)
		
		out = self.embed.forward(xs)
		dec_hs = self.lstm.forward(out)
		c = self.attention.forward(enc_hs, dec_hs)
		out = np.concatenate((c, dec_hs), axis=2)
		score = self.affine.forward(out)
		
		return score
	
	def backward(self, dscore):
		# backprop through Affine, Attention, LSTM and Embedding in reverse order
		dout = self.affine.backward(dscore)
		N, T, H2 = dout.shape
		H = H2 // 2
		
		dc, ddec_hs0 = dout[:, :, :H], dout[:, :, H:]
		denc_hs, ddec_hs1 = self.attention.backward(dc)
		ddec_hs = ddec_hs0 + ddec_hs1
		dout = self.lstm.backward(ddec_hs)
		dh = self.lstm.dh
		ddec_hs[:, -1] += dh
		self.embed.backward(dout)
		
		return denc_hs
	
	def generate(self, enc_hs, start_id, sample_size):
		# greedy decoding: feed each prediction back in as the next input
		sampled = []
		sample_id = start_id
		h = enc_hs[:, -1]
		self.lstm.set_state(h)
		
		for _ in range(sample_size):
			x = np.array([sample_id]).reshape((1, 1))
			
			out = self.embed.forward(x)
			dec_hs = self.lstm.forward(out)
			c = self.attention.forward(enc_hs, dec_hs)
			out = np.concatenate((c, dec_hs), axis=2)
			score = self.affine.forward(out)
			
			sample_id = np.argmax(score.flatten())
			sampled.append(sample_id)
		
		return sampled
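
For completeness, a training sketch in the style of the book's ch08/train.py. It assumes the Deep Learning from Scratch 2 repository layout (dataset.sequence, common.optimizer, common.trainer) is on the path; the hyperparameters are illustrative, not tuned:


from dataset import sequence                 # the book's date-format toy dataset (assumed available)
from common.optimizer import Adam
from common.trainer import Trainer

(x_train, t_train), (x_test, t_test) = sequence.load_data('date.txt')
char_to_id, id_to_char = sequence.get_vocab()
x_train, x_test = x_train[:, ::-1], x_test[:, ::-1]   # reverse the inputs, as in the book

vocab_size = len(char_to_id)
model = AttentionSeq2seq(vocab_size, wordvec_size=16, hidden_size=256)
trainer = Trainer(model, Adam())
trainer.fit(x_train, t_train, max_epoch=10, batch_size=128, max_grad=5.0)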