궁금한점:
jacobian() 메서드 알아보기
사용자 정의 훈련 다시 정리하기
궁금한점:
지금까지 텐서플로 고수준 API인 tf.keras만 사용해왔습니다. 이제 텐서플로를 자세히 들여다보고 저수준 파이썬 API를 살펴볼 차례입니다. 자신만의 손실 함수, 지표, 층, 모델, 초기화, 규제, 가중치 규제 등을 만들어 세부적으로 제어하고 싶을 때 필요하기 때문입니다. 텐서플로의 자동 그래프 생성 기능을 사용해 사용자 정의 모델과 훈련 알고리즘의 성능 을 향상할 수 있는 방법도 알아봅니다.
텐서플로 훑어보기
텐서플로는 강력한 수치 계산용 라이브러리입니다. 머신러닝 외에도 계산량이 많이 필요한 어떤 작업에도 사용할 수 있습니다.
텐서플로가 제공하는 기능:
- GPU 지원
- 분산 컴퓨팅(여러 장치, 서버에 대해)
- JIT(just-in-time) 컴파일러를 포함합니다. 속도 ↑, 메모리 사용↓을 위해 계산 그래프를 추출한 다음 최적화(가지치기)하고 효율적으로 실행(독립적인 연산을 자동으로 병렬 실행)
- 계산 그래프는 다른 OS 간에 중립적인 포맷으로 내보낼 수 있습니다.(리눅스 → 안드로이드)
- 텐서플로는 자동미분, RMSProp 등 다양한 optimizer가 있어 모든 종류의 손실함수를 쉽게 최적화할 수 있습니다.
가장 저수준의 텐서플로 연산은 매우 효율적인 C++ 코드로 구현되어 있습니다. 많은 연산은 kernel이라 부르는 여러 구현을 가집니다. 각 커널은 CPU, GPU, TPU(텐서 처리 장치)와 같은 특성 장치에 맞추어 만들어졌습니다. GPU는 계산을 작은 단위로 나누어 여러 GPU 스레드에 병렬로 실행해 속도를 극적으로 향상합니다.
텐서플로 구조는 아래 그림과 같습니다. 더 높은 자유도가 필요한 경우 저수준 파이썬 API(Python Frontend)를 사용하여 텐서를 직접 다루게 됩니다. 파이썬 외에 다른 언어의 API도 제공합니다. (C++, Java ..) TensorFlow.js라는 자바스크립트 구현도 있는데, 이를 이용하면 브라우저에서 직접 모델을 실행하는 것이 가능합니다.
텐서플로는 라이브러리 그 이상의 것입니다. 시각화를 위해 TensorBoard, 텐서플로 제품화를 위한 라이브러리 모음인 TFX(TensorFlow Extended)를 제공합니다. TFX에는 데이터 시각화, 전처리, 모델 분석, 서빙 등이 포함됩니다.
구글의 TensorFlow Hub를 사용하면 사전훈련된 신경망을 손쉽게 다운로드하여 재사용할 수 있습니다. 텐서플로 모델 저장소(https://github.com/tensorflow/models)
텐서플로 리소스 페이지(https://www.tensorflow.org/resources)와 https://github.com/jtoy/awesome-tensorflow에서 다양한 텐서플로 기반 프로젝트를 확인할 수 있습니다. https://paperswithcode.com에서 머신러닝 논문들의 구현을 볼 수 있습니다.
질문은 stackoverflow에서 tensorflow와 python 태그를 붙여 올리면 됩니다. https://homl.info/41에서 텐서플로에 대한 일반적인 이야기를 들으실 수 있습니다.
넘파이처럼 텐서플로 사용하기
텐서플로 API는 tensor(텐서)를 순환시킵니다. 텐서는 한 연산에서 다른 연산으로 흐릅니다. 텐서는 넘파이의 ndarray와 매우 비슷한 다차원 배열입니다. 42와 같은 단순한 스칼라 값도 가질 수 있습니다.
사용자 정의 손실 함수, 사용자 정의 지표, 사용자 정의 층등을 만들 때 텐서가 중요합니다. 텐서를 만들고 조작하는 방법을 알아봅시다.
텐서와 연산
tf.constant() 함수로 텐서를 만들 수 있습니다.
tf.constant([[1., 2., 3.], [4., 5., 6.]]) # 행렬
'''
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)>
'''
tf.constant(42) # 스칼라
'''
<tf.Tensor: shape=(), dtype=int32, numpy=42>
'''
Tensor.shape, Tensor.dtype
t = tf.constant([[1., 2., 3.], [4., 5., 6.]])
t.shape
'''
TensorShape([2, 3])
'''
t.dtype
'''
tf.float32
'''
t[:, 1:]
'''
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[2., 3.],
[5., 6.]], dtype=float32)>
'''
t[:, 1]
'''
t[:, 1]
<tf.Tensor: shape=(2,), dtype=float32, numpy=array([2., 5.], dtype=float32)>
'''
t[:, 1, tf.newaxis] # tf.newaxis를 입력하면 차원을 하나 추가한다.
'''
<tf.Tensor: shape=(2, 1), dtype=float32, numpy=
array([[2.],
[5.]], dtype=float32)>
'''
가장 중요한 것은 모든 종류의 텐서 연산이 가능하다는 것입니다. t+10을 계산할 때 실제로 파이썬은 magic 메서드 t.__add__(10)을 호출하고 이 메서드는 tf.add(t, 10)을 호출합니다. -, *도 마찬가지 입니다.
@ 연산은 행렬 곱셈입니다. 필요한 기본 수학 연산, 넘파이에서 볼 수 있는 연산(tf.reshape, tf.squeeze, tf.tile)을 제공합니다.
t+10 # tf.add(t, 10)
'''
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[11., 12., 13.],
[14., 15., 16.]], dtype=float32)>
'''
tf.square(t)
'''
<tf.Tensor: shape=(2, 3), dtype=float32, numpy=
array([[ 1., 4., 9.],
[16., 25., 36.]], dtype=float32)>
'''
t @ tf.transpose(t) # tf.matmul()
'''
<tf.Tensor: shape=(2, 2), dtype=float32, numpy=
array([[14., 32.],
[32., 77.]], dtype=float32)>
'''
tf.reduce_mean(), tf.reduce_sum(), tf.reduce_max(), tf.math.log()는 np.mean(), np.sum(), np.max(), np.log()와 동일한데, 이름이 다른 이유가 있습니다.
가령 tf.transpose(t)는 넘파이의 t.T와는 조금 다른 작업을 수행합니다. 텐서플로의 tf.transpose(t)는 전치된 데이터의 복사본으로 새로운 텐서가 만들어지지만, 넘파이의 t.T는 동일한 데이터의 전치된 뷰(view)일 뿐입니다.
tf.reduce_sum() 연산도 이름을 다르게 지은 이유는 이 GPU 커널(구현)이 원소가 추가된 순서를 보장하지 않는 reduce 알고리즘을 사용하기 때문입니다.
32비트 부동 소수는 제한된 정밀도로 연산을 할 때마다 결과가 조금씩 달라질 수 있습니다. tf.reduce_mean(), tf.reduce_sum() 등은 그렇습니다.(tf.reduce_max()는 값이 안바뀜)
케라스의 저수준 API
케라스 API는 keras.backend에 자체적인 저수준 API를 가지고 있습니다. square(), exp(), sqrt() 같은 함수들이 포함됩니다.
tf.keras에서 이런 함수는 보통 상응하는 텐서플로 연산을 호출하는게 전부입니다. 따라서 다른 케라스 구현에 적용할 수 있는 코드를 작성하려면 keras.backend의 함수를 사용해야합니다. 하지만, keras.backend에서 제공하는 함수는 텐서플로에서 제공하는 함수의 일부만 지원합니다.
keras.backend를 사용하는 간단한 예입니다. 보통 별칭 K를 사용합니다.
from tensorflow import keras
K = keras.backend
K.square(K.transpose(t))+10
'''
<tf.Tensor: shape=(3, 2), dtype=float32, numpy=
array([[11., 26.],
[14., 35.],
[19., 46.]], dtype=float32)>
'''
텐서와 넘파이
텐서는 넘파이 배열과 변환이 가능합니다. 따라서 넘파이와 함께 사용하기 편리합니다. 넘파이 배열에 텐서플로 연산을 적용할 수 있고 텐서에 넘파이 연산을 적용할 수도 있습니다.
a = np.array([2., 4., 5.])
tf.constant(a, dtype=tf.float32)
'''
<tf.Tensor: shape=(3,), dtype=float32, numpy=array([2., 4., 5.], dtype=float32)>
'''
t.numpy()
'''
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)
'''
tf.square(a)
'''
<tf.Tensor: shape=(3,), dtype=float64, numpy=array([ 4., 16., 25.])>
'''
np.square(t)
'''
array([[ 1., 4., 9.],
[16., 25., 36.]], dtype=float32)
'''
CAUTION
넘파이는 기본으로 64비트 정밀도를 사용하고 텐서플로는 32비트 정밀도를 사용합니다. 일반적으로 신경망은 32비트의 정밀도로 충분하고 메모리도 적게 사용하며 빠르기 때문입니다. ndarray → tensor로 만들 시에 dtype=tf.float32로 지정해야합니다.
타입 변환
타입 변환은 성능을 크게 감소시킬 수 있어 텐서플로는 어떤 타입 변환도 자동으로 수행하지 않습니다. 서로 호환되지 않는 타입의 텐서를 연산하면 예외가 발생합니다. 가령 실수 텐서와 정수 텐서는 더할 수 없고, 32bit 실수와 64bit 실수도 더할 수 없습니다.
tf.constant(2.) + tf.constant(40)
'''
InvalidArgumentError: cannot compute AddV2 as input #1(zero-based) was expected to
be a float tensor but is a int32 tensor [Op:AddV2]
'''
tf.constant(2.) + tf.constant(40., dtype=tf.float64)
'''
cannot compute AddV2 as input #1(zero-based) was expected to
be a float tensor but is a double tensor [Op:AddV2]
'''
진짜 타입 변환이 필요할 때는 tf.cast() 함수를 사용할 수 있습니다.
t2 = tf.constant(40., dtype=tf.float64)
tf.constant(2.0) + tf.cast(t2, tf.float32)
'''
<tf.Tensor: shape=(), dtype=float32, numpy=42.0>
'''
변수
지금까지 살펴본 tf.Tensor 객체는 변경이 불가능합니다. 따라서 일반적인 텐서로는 역전파로 변경되어야 하는 신경망의 가중치를 구현할 수 없습니다. 이것이 tf.Variable이 필요한 이유입니다. (지금까지의 결과는 불변의 Tensor 객체들의 연산 결과에 대한 복사본을 보여줬습니다.)
v = tf.Variable([[1., 2., 3.], [4., 5., 6.]])
v
'''
<tf.Variable 'Variable:0' shape=(2, 3) dtype=float32, numpy=
array([[1., 2., 3.],
[4., 5., 6.]], dtype=float32)>
'''
tf.Variable은 tf.Tensor와 비슷하게 동작합니다. 동일한 연산을 수행할 수 있고 넘파이와도 잘 호환됩니다. 데이터 타입의 변경이 까다로운 것도 마찬가지입니다.
tf.Variable는 tf.Tensor를 사용해 값을 저장합니다. 변수의 값을 증가시키거나 원소의 값을 바꾸면 새로운 텐서가 만들어집니다.
assign() 메서드를 사용하여 변숫값을 바꿀 수 있습니다. assign_add()나 assign_sub() 메서드를 사용하면 주어진 값만큼 변수를 증가시키거나 감소시킬 수 있습니다. 원소(또는 슬라이스)의 assign(), scatter_update(), scatter_nd_update() 메서드를 사용하여 개별 원소(또는 슬라이스)를 수정할 수도 있습니다.
v.assign(2*v)
'''
[[ 2., 4., 6.],
[ 8., 10., 12.]]
'''
v[0, 1].assign(42)
'''
[[ 2., 42., 6.],
[ 8., 10., 12.]]
'''
v[:, 2].assign([0., 1.])
'''
[[ 2., 42., 0.],
[ 8., 10., 1.]]
'''
v.scatter_nd_update(indices=[[0, 0], [1, 2]], updates=[100., 200.])
'''
[[100., 42., 0.],
[ 8., 10., 200.]]
'''
Note
케라스는 add_weight() 메서드로 변수 생성을 대신 처리해주어서 실전에서 변수를 직접 만드는 일은 매우 드뭅니다. 모델 파라미터 또한 옵티마이저가 업데이트해주므로 수동으로 변수를 업데이트하는 일도 드뭅니다.
다른 데이터 구조
텐서플로는 다음과 같은 몇 가지 다른 구조도 지원합니다.(상세 내용은 부록 참고)
- 희소 텐서 sparse tensor (tf.SparseTensor)
대부분 0으로 채워진 텐서를 효율적으로 나타냅니다. tf.sparse 패키지는 희소 텐서를 위한 연산 제공
- 텐서 배열 tensor array (tf.TensorArray)
텐서의 리스트, 고정된 길이를 가지지면 동적으로 바꿀 수 있습니다. 리스트의 모든 텐서는 크기, 데이터 타입이 동일해야합니다.
- 레그드 텐서 ragged tensor (tf.RaggedTensor)
레그드 텐서는 리스트의 리스트를 나타냅니다. 텐서에 포함된 값은 동일한 데이터 타입을 가져야 하지만, 리스트의 길이는 다를 수 있습니다. tf.ragged 패키지는 레그드 텐서를 위한 연산을 제공합니다.
- 문자열 텐서 string tensor
tf.string 타입의 텐서입니다. Unicode가 아닌 바이트 문자열을 나타냅니다. 유니코드를 사용해 문자열 텐서('cafe'와 같은 일반적인 파이썬 문자열)를 만들면 UTF-8로 자동 인코딩됩니다. tf.int32 텐서를 사용해 유니코드 문자열을 표현할 수 있습니다.(예를 들면, cafe를 [99, 97, 102, 233]) tf.strings 패키지는 바이트 문자열과 유니코드 문자열과 이런 텐서 사이의 변환을 위한 연산을 제공합니다.
- 집합 set
집합은 일반적인 텐서로 나타냅니다. tf.constant([[1, 2], [3, 4]])는 두 개의 집합 {1, 2}와 {3, 4}를 나타냅니다. 일반적으로 각 집합은 텐서의 마지막 축에 있는 벡터에 의해 표현됩니다. tf.sets 패키지의 연산을 사용해 집합을 다룰 수 있습니다.
- 큐 queue
큐는 단계별로 텐서를 저장합니다. FIFO(first in first out), PriorityQueue(어떤 원소에 우선권을 주는 큐), RandomShuffleQueue(원소를 섞는 큐), PaddingFIFOQueue(패딩을 추가하여 크기가 다른 원소의 배치를 만드는 큐) 등이 있습니다. 이 클래스들은 tf.queue 패키지에 포함되어 있습니다.
텐서, 연산, 변수, 다양한 데이터 구조를 알아보았으니 모델과 훈련 알고리즘을 커스터마이즈할 준비를 마쳤습니다.
사용자 정의 모델과 훈련 알고리즘
사용자 정의 손실 함수
회귀 모델을 훈련하는데 잡음 데이터가 있다고 합시다. 평균 제곱 오차는 이상치에 관대해서 훈련이 수렴되기까지 시간이 걸립니다. 평균 절댓값 오차는 이상치에 너무 관대해 훈련이 수렴되기까지 시간이 걸립니다. 그리고 모델이 정밀하게 수렴되지 않을 것입니다.
이런 경우 MSE 대신 Huber 손실(tf.keras.huber)을 적용하면 됩니다. tf.keras가 아닌 공식 케라스 API에서는 아직 후버 손실을 지원하지 않습니다.
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < 1
squared_loss = tf.square(error)/2
linear_loss = tf.abs(error) - 0.5
return tf.where(is_small_error, squared_loss, linear_loss)
CAUTION
성능을 위해서는 위 처럼 벡터화하여 구현해야 합니다. 또한 텐서플로 그래프의 장점을 활용하려면 텐서플로 연산만 사용해야 합니다.
전체 손실의 평균이 아니라 샘플마다 하나의 손실을 담은 텐서를 반환하는 것이 좋습니다. 이렇게 해야 필요할 때 케라스가 클래스 가중치나 샘플 가중치를 적용할 수 있습니다. (10장 참조)이제 훈련하는 동안 배치마다 케라스는 huber_fn() 함수를 호출하여 손실을 계산하고 경사하강법을 수행합니다. 또한 에포크 시작부터 전체 손실을 기록하여 평균 손실을 출력합니다.
model.compile(loss=huber_fn, optimizer='nadam')
model.fit(X_train, y_train, ...)
huber_fn() 함수로 손실을 계산하고 이를 사용해 경사하강법을 수행합니다. 또한 에포크 시작부터 전체 손실을 기록하여 평균 손실을 출력합니다.
모델을 저장할 때 사용자 정의 손실에 생기는 문제를 알아보겠습니다.
사용자 정의 요소를 가진 모델을 저장하고 로드하기
케라스가 함수 이름을 저장하므로 사용자 정의 손실 함수를 사용하는 모델은 문제 없이 저장되지만, 모델을 load할 때는 함수 이름과 실제 함수를 매핑한 딕셔너리를 전달해야합니다.
조금 더 일반적으로 말하면, 사용자 정의 객체를 포함한 모델을 load할 때는 그 이름과 객체를 매핑해야합니다.
model = keras.models.load_model('my_model_with_a_custom_loss.h5',
custom_objects={'huber_fn': huber_fn})
huber_fn에 small_error의 다른 기준이 필요할 때는 매개 변수를 받을 수 있는 함수를 만들면 됩니다.
def create_huber(threshold=1.0):
def huber_fn(y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < threshold
squared_loss = tf.square(error)/2
linear_loss = threshold * tf.abs(error) - threshold**2/2
return tf.where(is_small_error, squared_error, linear_loss)
model.compile(loss=create_huber(2.0), optimizer='nadam')
모델을 저장할 때 threshold 값은 저장되지 않습니다. 따라서 모델을 로드할 때 threshold 값을 지정해야 합니다. 새로 정의한 이름이 아닌 저장한 케라스 모델에 사용했던 함수 이름인 'huber_fn'을 사용합니다.
model = keras.models.load_model('my_model_with_a_custom_loss_threshold_2.h5',
custom_objects={'huber_fn': create_huber(2.0)})
이 문제는 keras.losses.Loss 클래스를 상속하고 get_config() 메서드를 구현하여 해결할 수 있습니다.
class HuberLoss(keras.losses.Loss):
def __init__(self, threshold=1.0, **kwargs):
self.threshold = threshold
super().__init__(**kwargs)
def call(self, y_true, y_pred):
error = y_true - y_pred
is_small_error = tf.abs(error) < self.threshold
squared_loss = tf.square(error) / 2
linear_loss = self.threshold * tf.abs(error) - self.threshold**2/2
return tf.where(is_small_error, squared_loss, linear_loss)
def get_config(self):
base_config = super().get_config()
return {**base_config, 'threshold': self.threshold}
CAUTION
케라스 API는 현재 층, 모델, 콜백, 큐제를 상속하는 방법만 정의하고 있습니다.
- 생성자는 기본적인 하이퍼파라미터를 **kwargs로 받은 매개변수 값을 부모 클래스의 생성자에게 전달합니다. 손실함수의 name, 개별 샘플의 손실을 모으기 위해 사용할 reduction 알고리즘을 전달합니다. 기본 reduction 알고리즘은 'sum_over_batch_size'로 (샘플 손실*가중치)/배치 크기 처럼 계산합니다.
- call() 메서드는 레이블과 예측을 받고 모든 샘플의 손실을 계산하여 반환합니다.
- get_config() 메서드는 하이퍼파라미터 이름과 같이 매핑된 딕셔너리를 반환합니다. 먼저 부모 클래스의 get_config() 메서드를 호출하고 그 다음 반환된 딕셔너리에 새로운 하이퍼 파라미터를 추가합니다.
언패킹 연산자*, 딕셔너리 언패킹 연산자**:
param={'a':1, 'b':2}; func(**param)은 func(a=1, b=2)와 같습니다.
모델을 컴파일할 때 HuberLoss 클래스의 인스턴스를 사용할 수 있습니다.
model.compile(loss=HuberLoss(2.), optimizer='nadam')
모델을 저장할 때 threshold 값도 함께 저장되는데, 모델을 로드할 때 클래스 이름과 클래스 자체를 매핑해주어야 합니다.
model = keras.models.load_model('my_model_with_a_custom_loss_class.h5',
custom_objects={'HuberLoss': HuberLoss})
모델을 저장할 때 케라스는 손실 객체의 get_config() 메서드를 호출해 반환된 설정을 HDF5 파일에 JSON 형태로 저장합니다. 모델을 로드하면 HuberLoss 클래스의 from_config() 메서드를 호출합니다. 이 메서드는 기본 손실 클래스 Loss에 구현되어 있고 생성자에게 **config 매개변수를 전달해 클래스의 인스턴스를 만듭니다.
활성화 함수, 초기화, 규제, 제한을 커스터마이징하기
손실, 규제, 제한, 초기화, 지표, 활성화 함수, 층, 모델과 같은 대부분의 케라스 기능은 유사한 방법으로 커스터마이징할 수 있습니다. 대부분의 경우 적절한 입력과 출력을 가진 간단한 함수를 작성하면 됩니다.
def my_softplus(z):
return tf.math.log(tf.exp(z)+1.0) # keras.activations.softplus(), tf.nn.softplus()
def my_glorot_initializer(shape, dtype=tf.float32): # keras.initializers.glorot_normal()
stddev = tf.sqrt(2. / (shape[0] + shape[1]))
return tf.random.normal(shape, stddev = stddev, dtype=dtype)
def my_l1_regularizer(weights): # keras.regularizers.l1(0.01)
return tf.reduce_sum(tf.abs(0.01*weights))
def my_positive_weights(weights): # keras.constraints.nonneg(), tf.nn.relu()
return tf.where(weights < 0., tf.zeros_like(weights), weights)
만들어진 사용자 정의 함수는 보통의 함수와 동일하게 사용할 수 있습니다.
layer = keras.layers.Dense(30, activation=my_softplus,
kernel_initializer=my_glorot_initializer,
kernel_regularizer=my_l1_regularizer,
kernel_constraint=my_positive_weights)
함수가 모델과 함께 저장해야 할 파라미터를 가지고 있다면 keras.regularizers.Regularizer, keras.constraints.Constraint, keras.initializer.Initializer, (활성화 함수를 포함하여 층을 상속하려면) keras.layers.Layer와 같이 적절한 클래스를 상속합니다.
손실함수에서와 같이 factor를 저장하는 $l_1$규제를 위한 간단한 클래스 예입니다.
class MyL1Regularizer(keras.regularizers.Regularizer):
def __init__(self, factor):
self.factor = factor
def __call__(self, weights):
return tf.reduce_sum(tf.abs(self.factor*weights))
def get_config(self):
return {'factor':self.factor}
CATION_
Regularizer 클래스는 __init__(), get_config() 메서드가 정의되어 있지 않아서 호출할 필요가 없지만, 가령 Regularizer 클래스를 상속한 사용자 정의 함수 L1L2()를 상속하는 다른 사용자 함수를 만들 때는 super().__init__, super().get_config() 메서드를 호출해야합니다. L1L2에는 생성자, get_config() 함수가 있기 때문입니다.
손실, (활성화 함수를 포함하여)층, 모델의 경우 call() 메서드를 구현해야 하며 규제, 초기화, 제한의 경우 __call__() 메서드를 구현해야합니다. 지표는 조금 달라 지금 살펴보겠습니다.
사용자 정의 지표
손실과 지표는 개념적으로 같습니다. 손실(ex, cross entropy)은 모델을 훈련하기 위해 경사 하강법에서 사용하므로 미분 가능, 그래디언트가 모든 곳에서 0이 아니어야 합니다.
지표(ex, accuracy)는 모델을 평가할 때 사용합니다. 미분이 가능하지 않아도 되고 모든 곳에서 그레디언트가 0이어도 됩니다.
대부분의 사용자 지표 함수는 사용자 손실 함수를 만드는 것과 동일합니다. 앞에서 만든 huber 손실 함수는 지표로도 잘 동작합니다.(모델을 저장할 때, 동일하게 함수의 이름 'huber_fn'만 저장됩니다.)
model.compile(loss='mse', optimizer='nadam', metrics=[create_huber(2.0)])
훈련하는 동안 각 배치에 대해 케라스는 지표를 계산하고, 에포크가 시작할 때부터 평균을 기록합니다. 이 방식은 대부분 정상적으로 동작하지만, 예외가 있습니다.
이진 분류기의 정밀도를 생각해봅시다. $\frac{TP}{TP+FP}$, 모델이 첫 번째 배치에서 5개를 양성 예측하고 4개가 맞았다면 정밀도: 80%입니다. 두 번째 배치에서 3개의 양성 예측을 만들었는데 모두 틀렸다면 두 번째 배치 정밀도: 0%입니다.
두 정밀도를 평균하면 40%를 얻지만, 두 배치의 결과를 합쳐서 정밀도를 계산하면 50%($\frac{4+0}{8}$)가 나옵니다. 따라서 TP, FP를 기록하고 정밀도를 계산해줄 객체가 필요합니다. 즉, keras.metrics.Precision 클래스가 필요합니다.
precision = keras.metrics.Precision()
precision([0, 1, 1, 1, 0, 1, 0, 1], [1, 1, 0, 1, 0, 1, 0, 1])
'''
<tf.Tensor: shape=(), dtype=float32, numpy=0.8>
'''
precision([0, 1, 0, 0, 1, 0, 1, 1], [1, 0, 1, 1, 0, 0, 0, 0])
'''
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>
'''
Precision 클래스 객체를 만들고 첫 번째 파라미터와 두 번째 파라미터에 레이블과 예측을 전달합니다. 앞서 소개한 예와 동일한 개수의 진짜 양성과 거짓 양성을 사용했습니다. 첫 번째 배치를 처리한 후 정밀도는 0.8이고 두 번째 배치를 처리한 후에 정밀도는 0.5입니다. (두 번째 배치 정밀도가 아니라 전체 정밀도를 계산합니다.) 배치마다 점진적으로 업데이트되기 때문에 이를 스트리밍 지표(streaming metric, 혹은 stateful metric 상태가 있는 지표)라고 부릅니다.
result() 메서드를 호출하여 현재 지푯값을 얻을 수 있고, variables 속성을 사용하여 (TP, FP를 기록한) 변수를 확인할 수도 있습니다. reset_states()를 사용해 이 변수를 초기화할 수 있습니다.
precision.result()
'''
<tf.Tensor: shape=(), dtype=float32, numpy=0.5>
'''
precision.variables
'''
[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>,
<tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([4.], dtype=float32)>]
'''
precision.reset_states()
precision.variables
'''
[<tf.Variable 'true_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>,
<tf.Variable 'false_positives:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>]
'''
이런 스트리밍 지표를 만들고 싶다면 keras.metrics.Metric 클래스를 상속합니다. 다음 예는 전체 후버 손실과 지금까지 저리한 샘플 수를 기록하는 클래스입니다. result() 메서드를 호출하면 평균 후버 손실이 반환됩니다.
class HuberMetric(keras.metrics.Metric):
def __init__(self, threshold=1.0, **kwargs):
super().__init__(**kwargs) # 기본 매개변수 처리(예, dtype)
self.threshold = threshold
self.huber_fn = create_huber(threshold)
self.total = self.add_weight('total', initializer='zeros')
self.count = self.add_weight('count', initializer='zeros')
def update_state(self, y_true, y_pred, sample_weight=None):
metric = self.huber_fn(y_true, y_pred)
self.total.assign_add(tf.reduce_sum(metric))
self.count.assign_add(tf.cast(tf.size(y_true), tf.float32))
def result(self):
return self.total / self.count
def get_config(self):
base_config = super().get_config()
return {**base_config, 'threshold': self.threshold}
- 생성자는 add_weight() 메서드를 사용해 여러 배치에 걸쳐 지표의 상태를 기록하기 위한 변수를 만듭니다. 이 예제에서는 후버 손실의 합(total)과 지금까지 처리한 샘플 수(count)를 기록합니다. 원한다면 수동으로 변수를 만들 수 있습니다. 케라스는 속성으로 만들어진 tf.Variable을 관리합니다.
- update_state() 메서드는 이 클레스를 함수처럼 사용할 때 호출됩니다. (Precision 객체에서 했던 것처럼) 배치의 레이블과 예측을 바탕으로 변수를 업데이트합니다.
- result() 메서드는 최종 결과를 계산하고 반환합니다. HuberMetric 클래스를 함수처럼 사용하면 update_state() 메서드가 호출되고 result() 메서드가 반환됩니다.
- get_config() 메서드를 구현하여 threshold 변수를 모델과 함께 저장합니다.
- reset_status() 메서드는 기본적으로 모든 변수를 0.0으로 초기화합니다. 필요하면 이 함수를 재정의(override)할 수 있습니다.
지표를 간단한 함수로만 정의하면 배치마다 자동으로 이 함수를 호출하고 에포크 동안 평균을 기록합니다.
지표를 클래스로 정의하면 모델과 함께, threshold를 저장할 수 있고 정밀도와 같이 배치에 걸쳐 단순히 평균을 낼 수 없는 지표를 스트리밍 지표로 구현할 수 있습니다.
사용자 정의 층
동일한 층 블럭이 여러 번 반복되는 경우, 예를 들어 A, B, C, A, B, C .. 가 반복된다면 A, B, C를 사용자 정의 층 D로 만들고 D, D, ..로 구성된 모델을 만들 수 있습니다.
keras.layers.Flatten과 keras.layers.ReLU와 같은 층은 가중치가 없습니다. 가중치가 필요없는 사용자 정의 층을 만들기 위한 간단한 방법은 파이썬 함수를 만든 후 keras.layers.Lambda 층으로 감싸는 것입니다.
exponential_layer = keras.layers.Lambda(lambda x: tf.exp(x))
상태가 있는 층(가중치를 가진 층)을 만들려면 keras.layers.Layer를 상속해야 합니다. 예를 들어 다음은 Dense 층을 간소화한 것입니다.
class MyDense(keras.layers.Layer):
def __init__(self, units, activation=None, **kwargs):
super().__init__(**kwargs)
self.units = units
self.activation = keras.activations.get(activation)
def build(self, batch_input_shape):
self.kernel = self.add_weight(
name='kernel', shape=[batch_input_shape[-1], self.units],
initializer='glorot_normal'
)
self.bias = self.add_weight(
name='bias', shape=[self.units], initializer='zeros'
)
super().build(batch_input_shape)
def call(self, X):
return self.activation(X @ self.kernel + self.bias)
def compute_output_shape(self, batch_input_shape):
return tf.TensorShape(batch_input_shape.as_list()[:-1]+[self.units])
def get_config(self):
base_config = super().get_config()
return {**base_config, 'units':self.units,
'activation': keras.activations.serialize(self.activation)}
생성자에서 모든 파라미터를 입력받습니다. **kwargs 매개변수를 부모 생성자에 전달함으로써 input_shape, trainable, name과 같은 기본 매개변수들을 처리할 수 있습니다. 그 다음 하이퍼 파라미터를 속성에 저장하고 activation 매개 변수를 keras.activations.get() (혹은 keras.layers.Activation)을 이용해 적절한 활성화 함수로 바꿉니다.
keras.activations.get()은 함수 객체나 'relu', 'selu', None과 같은 문자열을 받습니다.
build() 메서드는 add_weight() 메서드를 호출하여 층의 변수를 만드는 것입니다. build() 메서드는 층이 처음 사용될 때 호출됩니다. build() 메서드의 입력으로 input_shape를 전달받습니다. 가중치를 만들 때 크기가 꼭 필요한 경우가 있습니다. 예를 들어 연결 가중치 (즉, 'kernel')를 만들려면 이전 층의 뉴런 개수를 알아야 합니다. 이 크기는 입력의 마지막 차원 크기에 해당합니다. build() 메서드 끝에 반드시 부모의 build() 메서드를 호출해야 하는데, 이는 self.built=True로 설정되어 케라스가 층이 만들어졌다는 걸 인식하기 때문입니다.
call() 메서드는 이 층에 필요한 연산을 수행합니다. Dense의 경우 입력 X와 층의 커널을 행렬 곱하고 편향을 더한 결과에 활성화 함수를 적용합니다. 이 최종 계산값이 층의 출력이 됩니다.
compute_output_shape() 메서드는 이 층의 출력 크기를 반환합니다. 마지막 차원을 제외한 차원, 마지막 차원은 이 층의 뉴런 개수로 이루어집니다.
동적인 층을 제외하고 tf.keras가 자동으로 출력 크기를 추측할 수 있다면, compute_output_shape() 메서드를 생략할 수 있습니다. 다른 케라스 구현에서는 이 메서드가 필수적이거나 출력 크기가 입력 크기와 동일하다고 가정합니다.
get_config() 메서드는 앞서 보았던 것과 같이 keras.activations.serialize()를 사용하여 활성화 함수의 전체 설정을 저장합니다.
Concatenate 층과 같은 여러 입력을 받는 층을 만들려면, call() 메서드에 모든 입력이 포함된 튜플을 매개변수 값으로 전달해야 합니다. 비슷하게 compute_output_shape() 메서드의 매개변수도 각 입력의 배치 크기를 담은 튜플이어야 합니다.
출력이 여러 개인 층을 만들려면 call() 메서드에서 출력 리스트를 반환해야 합니다. 아래 예제는 두 개의 입력과 세 개의 출력을 만드는 층입니다.
class MyMultiLayer(keras.layers.Layer):
def call(self, X):
X1, X2 = X
return [X1 + X2, X1*X2, X1/X2]
def compute_output_shape(self, batch_input_shape):
b1, b2 = batch_input_shape
return [b1, b1, b1] # 올바르게 브로드캐스팅되어야 합니다.
출력이 여러 개이므로 함수형 API, 서브클래싱 API에서만 사용할 수 있습니다. (한 개의 입력과 한 개의 출력을 가진 층만 사용하는) Sequential API에는 사용할 수 없습니다.
훈련과 테스트에서 다르게 동작하는 Dropout, BatchNormalization과 같은 층을 상용하려면, call() 메서드에 training 매개변수를 추가해 훈련인지 테스트인지 결정해야합니다. 아래 예제는 훈련 동안 규제 목적으로 가우스 잡음을 추가하고 테스트 시에는 아무 것도 하지 않는 층(keras.layers.GaussianNoise)을 만들어봅시다.
class MyGaussianNoise(keras.layers.Layer):
def __init__(self, stddev, **kwargs):
super().__init__(**kwargs)
self.stddev = stddev
def call(self, X, training=None):
if training:
noise = tf.tf.random.normal(tf.shape(X), stddev=self.stddev)
return X + noise
else:
return X
def compute_output_shape(self, batch_input_shape):
return batch_input_shape
사용자 정의 모델
서브클래싱 API를 만드는 것이 사용자 정의 모델을 만드는 것과 같습니다. keras.Model 클래스를 상솟해 생성자에 층과 변수를 만들고 모델이 해야 할 작업을 call() 메서드에 구현합니다.
위 그림과 같은 모델을 만든다고 했을 때, 입력은 첫 번째 완전 연결 층을 지나 첫 번째 잔차 블록을 3번 반복해 통과 시키고 두 번째 잔차 블록을 지나 완전 연결된 출력층에 전달됩니다.
먼저 ResidualBlock 층을 만들겠습니다.
class ResidualBlock(keras.layers.Layer):
def __init__(self, n_layers, n_neurons, **kwargs):
super().__init__(**kwargs)
self.hidden = [keras.layers.Dense(n_neurons, activation='elu',
kernel_initializer='he_normal')
for _ in range(n_layers)]
def call(self, inputs):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
return inputs + Z
이 층은 다른 층을 포함하고 있기 때문에 케라스가 알아서 추적해야 할 객체(층)가 담긴 hidden 속성을 감지하고, 필요한 변수를 자동으로 이 층의 변수 리스트에 추가합니다.?
그 다음 서브클레싱 모델을 사용해 그림의 모델을 정의해봅시다.
class ResidualRegressor(keras.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden1 = keras.layers.Dense(30, activation='elu',
kernel_initializer='he_normal')
self.block1 = ResidualBlock(2, 30)
self.block2 = ResidualBlock(2, 30)
self.out = keras.layers.Dense(output_dim)
def call(self, inputs):
Z = self.hidden1(inputs)
for _ in range(1+3):
Z = self.block1(Z)
Z = self.block2(Z)
return self.out(Z)
생성자에서 층을 만들고 call() 메서드에서 이를 이용합니다. 이 모델을 다른 일반 모델처럼 compile, evaluate, predict 할 수 있습니다.
save()로 저장하고 keras.models.load_model() 함수를 이용해 모델을 로드하고 싶다면 get_config() 메서드를 구현해야합니다. 또한, save_weights(), load_weights() 메서드를 이용해 가중치를 저장, 로드할 수 있습니다.
Model 클래스는 Layer 클래스의 서브클래스이므로 모델을 층처럼 정의할 수 있습니다. 하지만 모델은 compile, fit, evaluate, predict 메서드와 같은 추가적인 기능이 있습니다. 이 외에도 get_layers, save, keras.models.load_model(), keras.models.clone_model()도 지원합니다.
모델이 층보다 더 많은 기능을 제공하지만, 일반적으로 모델 안의 구성 요소를 층으로 구분하는데, 모델과 층을 구분하는 건 당연한 것입니다. 층은 Layer를 상속하고, 모델은 Model 클래스를 상속해야합니다.
이제 Sequential API, 함수형 API, 서브클래싱 API를 이용해 대부분의 모델을 만들 수 있게 됐습니다. 아직 더 살펴봐야할 것이 있기에 대부분이라고 말했는데, 모델 내부 구조에 기반한 손실과 지표를 만드는 방법, 사용자 정의 훈련 반복을 만드는 방법을 더 살펴보아야 합니다.
모델 구성 요소에 기반한 손실과 지표
앞서 정의한 사용자 손실, 지표는 모두 레이블과 예측값을 기반으로 합니다. 하지만, 은닉층의 가중치나 활성화 함수등과 같이 모델의 구성 요소에 기반한 손실을 정의해야할 때가 있습니다. 이런 손실은 규제나 모델의 내부 상황을 모니터링할 때 유용합니다.
모델의 구성 요소에 기반한 손실을 정의하고 계산하여 add_loss() 메서드에 그 결과를 전달합니다.
예제로 5개의 은닉층와 출력층으로 구성된 회귀용 MLP 모델을 만들어봅시다. 이 모델은 맨 위의 은닉층에 보조 출력을 가집니다. 이 보조 출력에 연결된 손실을 재구성 손실(reconstruction loss)이라고 부르겠습니다. 이 값은 재구성과 입력 사이의 평균 제곱 오차입니다. 이 값을 주 손실(main loss)에 더하여 모델이 가능한 많은 정보를 유지하도록 도와줍니다. 이런 손실이 이따금 일반화 성능을 향상시킵니다.(규제 손실처럼 동작합니다.)
class ReconstructingRegressor(keras.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.hidden=[keras.layers.Dense(30, activaiton='selu',
kernel_initializer='lecun_normal')
for _ in range(5)]
self.out = keras.layers.Dense(output_dim)
def build(self, batch_input_shape):
n_inputs = batch_input_shape[-1]
self.reconstruct = keras.layers.Dense(n_inputs)
super().build(batch_input_shape)
def call(self, inputs):
Z = inputs
for layer in self.hidden:
Z = layer(Z)
reconstruction = self.reconstruct(Z)
recon_loss = tf.reduct_mean(tf.square(reconstruction - inputs))
self.add_loss(0.05*recon_loss)
return self.out(Z)
생성자가 5개의 은닉층, 하나의 출력층으로 구성된 심층 신경망을 만듭니다.
build() 메서드에서 완전 연결 층을 하나 더 추가해 모델의 입력을 재구성하는 데 사용합니다. 이 완전 연결층의 뉴런 갯수(units)는 입력 개수와 같아야 하며, 이런 재구성층을 build()에 만드는 이유는 이 메서드가 호출되기 전까지는 입력 개수를 알 수 없기 때문입니다.
call() 메서드에서 입력이 5개의 은닉층을 통과하고 결괏값을 재구성층에 전달해 재구성을 만듭니다.
callI() 메서드에서 재구성 손실을 계산하고 add_loss() 메서드를 사용해 모델의 손실 리스트를 추가합니다. 0.05는 튜닝할 하이퍼 파라미터 값으로 재구성 손실이 주 손실을 압도하지 않도록 크기를 줄인 것입니다. call() 메서드의 마지막에서 출력값을 반환합니다.
모델이 각 층에서 손실을 모을 수 있도록, 모델의 어떤 층에서도 add_loss() 메서드를 호출할 수 있습니다.
앞의 손실 예제에서와 비슷하게 모델의 구성요소를 이용한 사용자 정의 지표를 추가할 수 있습니다. 단, 결과값이 지표 객체의 출력이어야 합니다. 이렇게 모델을 훈련할 때 케라스가 에포크마다 평균 손실(주 손실 + 0.05*재구성 손실)과 평균 재구성 손실을 출력합니다. 훈련되는 동안 두 값은 계속 계산됩니다.
class ReconstructingRegressor(keras.Model):
def __init__(self, output_dim, **kwargs):
super().__init__(**kwargs)
self.reconstruction_mean=keras.metrics.Mean(name='reconstruction_error')
...
def build(self, batch_input_shape):
...
def call(self, inputs, training=None):
...
recon_loss = tf.reduct_mean(tf.square(reconstruction - inputs))
if training:
result = self.reconstruction_mean(recon_loss)
self.add_metric(result)
...
return self.out(Z)
def train_step(self, data):
x, y = data
with tf.GradientTape() as tape:
y_pred = self(x)
loss = self.compiled_loss(y, y_pred, regularization_losses=[self.recon_loss])
gradients = tape.gradient(loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))
return {m.name: m.result() for m in self.metrics}
model = ReconstructingRegressor(1)
model.compile(loss="mse", optimizer="nadam")
history = model.fit(X_train_scaled, y_train, epochs=2)
y_pred = model.predict(X_test_scaled)
'''
Epoch 1/2
363/363 [==============================] - 4s 6ms/step
- loss: 0.7886 - reconstruction_error: 0.0000e+00
Epoch 2/2
363/363 [==============================] - 2s 6ms/step
- loss: 0.4134 - reconstruction_error: 0.0000e+00
'''
지금까지 배운 내용으로 복잡한 구조, 손실, 지표를 가진 모델 중 99%는 만들 수 있습니다. 드물게 훈련 반복 자체를 제어해야 하는 경우가 있습니다. 이를 알아보기 전 텐서플로에서 그레디언트를 자동으로 어떻게 계산하는지 살펴보겠습니다.
자동 미분을 사용하여 그레디언트 계산하기
자동 미분을 사용하여 그레디언트를 자동으로 계산하는 방법을 알아봅시다.
def f(w1, w2):
return 3*w1**2 + 2*w1*w2
$w_1$에 대한 f의 도함수는 $6w_1+2w_2$라는 것을 구할 수 있습니다. $w_2$에 대한 도함수는 $2w_1$입니다. 예를 들어 (5, 3)에서의 그레디언트 벡터는 (36, 10)입니다. 손으로 도함수를 일일이 계산해서 작성하는건 수만 개의 파라미터를 가진 신경망에서는 불가능한 작업입니다. 다른 방법으로는 도함수의 정의를 이용하여 근삿값을 계산하는 것이 있습니다.
w1, w2 = 5, 3
eps = 1e-6
(f(w1+eps, w2)-f(w1, w2))/eps
'''
36.000003007075065
'''
(f(w1, w2+eps)-f(w1, w2))/eps
'''
10.000000003174137
'''
근삿값을 계싼하는 것도 파라미터마다 적어도 한 번씩은 f()를 호출해야 하므로 대규모 신경망에서는 적용하기 어렵습니다. 대신 자동 미분을 사용합니다.
w1, w2 = tf.Variable(5.), tf.Variable(3.)
with tf.GradientTape() as tape:
z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2])
'''
[<tf.Tensor: shape=(), dtype=float32, numpy=36.0>,
<tf.Tensor: shape=(), dtype=float32, numpy=10.0>]
'''
tf.GradientTape 블럭을 만들어 이 변수와 관련된 모든 연산을 자동으로 기록합니다. 마지막으로 이 tape에 두 변수[w1, w2]에 대한 z의 그레디언트를 요청합니다. 결과도 부동소수점 오차에 따른 정밀도가 제한될 뿐 정확하고, 변수가 얼마나 많든지 gradient() 메서드는 기록된 계산을 한 번만에 (거꾸로) 통과했습니다. 매우 효율적인 방법입니다.
Tip
메모리를 절약하려면 tf.GradientTape() 블록 안에 최소한만 담으세요. tf.GradientTape() 블록 안에서 with tape.stop_recording() 블록을 만들어 계산을 기록하지 않을 수 있습니다.
gradient() 메서드가 호출된 후에는 자동으로 테이프가 지워집니다. gradient() 메서드를 두 번 호출하면 에러가 발생합니다.
with tf.GradientTape() as tape:
z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1) # 36.0 텐서
dz_dw1 = tape.gradient(z, w2) # 실행 에러
gradient() 메서드를 한 번 이상 호출하려면 지속 가능한 테이프를 만들고 사용이 끝난 이후 테이프를 삭제해 리소스를 해제해야합니다.
with tf.GradientTape(persistent=True) as tape:
z = f(w1, w2)
dz_dw1 = tape.gradient(z, w1) # 36.0 텐서
dz_dw2 = tape.gradient(z, w1) # 10.0 텐서
del tape
tape는 변수가 포함된 연산만 기록합니다. 변수가 아닌 다른 객체에 대한 z의 그레디언트를 계산하면 None이 반환됩니다.
c1, c2 = tf.constant(5.), tf.constant(3.)
with tf.GradientTape() as tape:
z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2]) # [None, None]이 반환
하지만, 어떤 텐서라도 감시하여 관련된 모든 연산을 기록하도록 강제할 수 있습니다.
with tf.GradientTape() as tape:
tape.watch(c1)
tape.watch(c2)
z = f(c1, c2)
gradients = tape.gradient(z, [c1, c2]) # [36. 텐서, 10. 텐서] 반환
이렇게 감시를 통한 계산이 유용한 경우가 있습니다. 입력이 작을 때 변동 폭이 큰 활성화 함수에 대한 규제 손실을 구현하는 경우입니다. 이 손실은 입력에 대한 활성화 함수의 그레디언트를 기반으로 할 것입니다. 입력은 변수가 아니므로 테이프에 기록을 명시적으로 해줘야합니다.
대부분 그레디언트 테이프는 여러 모델 파라미터에 대한 하나의 손실의 그레디언트를 계산하는데 사용됩니다. 여러 손실이 포함된 벡터의 그레디언트를 계산하면 텐서플로는 벡터의 합의 그레디언트를 계산할 것입니다.
with tf.GradientTape() as tape:
z1 = f(w1, w2 + 2.)
z2 = f(w1, w2 + 5.)
z3 = f(w1, w2 + 7.)
tape.gradient([z1, z2, z3], [w1, w2]) # [136. 텐서, 30. 텐서]
어떤 경우는 신경망의 일부분에 그레디언트가 역전파되지 않도록 막을 필요가 있스빈다. 이렇게 하려면 tf.stop_gradient() 함수를 사용해야 합니다. 이 함수는 정방향으로 계산 시 tf.identity()처럼 입력을 그대로 반환하고, 역전파시에 그레디언트를 전파하지 않습니다.
def f(w1, w2):
return 3*w1**2 + tf.stop_gradient(2*w1*w2)
with tf.GradientTape() as tape:
z = f(w1, w2)
gradients = tape.gradient(z, [w1, w2]) # [30. 텐서, None] 반환
그레디언트를 계산할 때 수치적인 이슈가 발생할 수 있습니다. 너무 큰 입력에 대해 그레디언트를 계산하면 NaN이 반환됩니다.
def my_softplus(z): # tf.nn.softplus(z)
return tf.math.log(tf.exp(z) + 1.0)
x = tf.Variable([100.])
with tf.GradientTape() as tape:
z = my_softplus(x)
gradients = tape.gradient(z, [x]) # [nan 텐서]
자동 미분하여 이 함수의 그레디언트를 계산하는 것이 수치적으로 불안정하기 때문입니다. 즉, 부동소수점 정밀도 오류로 인해 자동 미분이 $\frac{∞}{∞}$를 계산하게 됩니다. 그래서 NaN을 반환합니다. softplus의 도함수는 $\frac{1}{\left(1+e^{-x}\right)} $로 해석적으로 구할 수 있습니다.
decorator(데코레이터) @tf.custom_gradient를 사용해 일반 출력, 도함수를 계산하는 함수를 반환해 텐서플로가 my_softplus() 함수의 그레디언트를 계산할 때 안전한 함수를 사용하도록 만들 수 있습니다.
@tf.custom_gradient
def my_better_softplus(z):
exp = tf.exp(z)
def my_softplus_gradients(grad):
return grad/(1+1/exp)
return tf.math.log(exp+1), my_softplus_gradients
x = tf.Variable([1000.])
with tf.GradientTape() as tape:
z = my_better_softplus(x)
z, tape.gradient(z, [x])
'''
(<tf.Tensor: shape=(1,), dtype=float32, numpy=array([inf], dtype=float32)>,
[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1.], dtype=float32)>])
'''
z의 입력이 클 때 tf.where을 이용해 입력값을 그대로 반환할 수 있습니다.
def my_better_softplus(z):
return tf.where(z > 30., z, tf.math.log(tf.exp(z) + 1.))
# tf.where([boolean, ..], True일 때 값, False일 때 값)
x = tf.Variable([1000.])
with tf.GradientTape() as tape:
z = my_better_softplus(x)
z, tape.gradient(z, [x])
'''
(<tf.Tensor: shape=(1,), dtype=float32, numpy=array([1000.], dtype=float32)>,
[<tf.Tensor: shape=(1,), dtype=float32, numpy=array([nan], dtype=float32)>])
'''
이제 필요할 때 역전파를 멈추거나 자신만의 그레디언트 함수를 작성할 수 있습니다.
사용자 정의 훈련 반복
보통 fit() 메서드는 optimizer를 1개만 사용가능합니다. compile() 메서드에 하나의 옵티마이저만 지정할 수 있기 때문인데, 훈련 반복을 직접 구현하면 두 개를 사용할 수도 있습니다.
Tip
정말 극도로 유연성이 필요한 것이 아니면 사용자 정의 훈련 보단 fit() 메서드를 사용하는 것이 좋습니다. 버그가 많이 발생하고 유지보수하기 어려운 코드가 발생합니다.
케라스 모델의 train_stop 메서드를 오버라이딩하여 사용자 정의 훈련을 구현할 수도 있습니다.
간단한 모델을 아래처럼 만들었습니다.
l2_reg = keras.regularizers.l2(0.05)
model = keras.models.Sequential([
keras.layers.Dense(30, activation='elu', kernel_initializer='he_normal',
kernel_regularizer=l2_reg),
keras.layers.Dense(1, kernel_regularizer=l2_reg)
])
샘플 배치를 랜덤하게 추출하는 함수를 만듭니다.
def random_batch(X, y, batch_size=32):
idx = np.random.randint(len(X), size=batch_size)
return X[idx], y[idx]
이 함수는 중복을 허용해 샘플링합니다. 전체 데이터를 순회하는 미니배치를 만들려면 np.random.permutation(np.arange(len(X)))으로 랜덤하게 섞인 인덱스를 만든 후 yield문을 사용하여 미니배치 크기만큼 순서대로 데이터를 반환하세요.
현재 스텝 수, 전체 스텝 횟수, 에포크 시작부터 평균 손실(Mean 지표를 사용해 계산), 그 외 다른 지표를 포함해 훈련 상태를 출력하는 함수도 만듭니다.
def print_status_bar(iteration, total, loss, metrics=None):
metrics = ' - '.join(['{}: {:.4f}'.format(m.name, m.result()) for m in [loss]+(metrics or [])])
end = '' if iteration < total else '\n'
print('\r{}/{} - '.format(iteration, total) + metrics, end=end)
end=''와 함께 \r(캐리지 리턴)을 사용하면 막대가 동일한 줄에 출력됩니다. print_status_bar() 함수가 상태 막대를 출력하지만 간단한 tqdm 라이브러리를 사용할 수 있습니다.
사용할 하이퍼파라미터를 정의합니다.
n_epochs = 5
batch_size =32
n_steps = len(X_train) // batch_size
optimizer = keras.optimizers.Nadam(lr=0.01)
loss_fn = keras.losses.mean_squared_error
mean_loss = keras.metrics.Mean()
metrics = [keras.metrics.MeanAbsoluteError()]
사용자 정의 훈련 반복을 만들 준비를 마쳤습니다.
for epoch in range(1, n_epochs + 1):
print("Epoch {}/{}".format(epoch, n_epochs))
for step in range(1, n_steps + 1):
X_batch, y_batch = random_batch(X_train_scaled, y_train)
with tf.GradientTape() as tape:
y_pred = model(X_batch)
main_loss = tf.reduce_mean(loss_fn(y_batch, y_pred))
loss = tf.add_n([main_loss] + model.losses)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
mean_loss(loss)
for metric in metrics:
metric(y_batch, y_pred)
print_status_bar(step * batch_size, len(y_train), mean_loss, metrics)
print_status_bar(len(y_train), len(y_train), mean_loss, metrics)
for metric in [mean_loss] + metrics:
metric.reset_states()
'''
Epoch 1/5
11610/11610 - mean: 0.6500 - mean_absolute_error: 0.5208
Epoch 2/5
11610/11610 - mean: 0.6418 - mean_absolute_error: 0.5185
...
Epoch 5/5
11610/11610 - mean: 0.6400 - mean_absolute_error: 0.5183
'''
- epoch을 위한 for문 안에 step(배치)를 위한 for문을 만들었습니다. 배치마다 훈련 세트를 랜덤하게 샘플링합니다.
- tf.GradientTape() 블럭 안에서 batch 하나의 예측을 만들고, 손실을 계산하고 주 손실과 규제 손실(model.losses, 이 모델은 층마다 하나의 규제 손실이 있습니다.)을 더합니다.
mean_squared_error() 함수가 샘플마다 하나의 손실을 반환하기 때문에 tf.reduce_mean() 함수를 사용하여 배치에 대한 평균을 계산합니다. 규제 손실은 하나의 스칼라 값이므로 (동일한 크기와 타입을 가진 텐서를 더하는 tf.add_n() 함수를 사용하여) 손실을 더합니다.
- 그 다음 tape로 손실의 gradient를 계산하고 이를 옵티마이저에 적용하여 경사하강법을 수행합니다.
- 그 다음 (현재 에포크에 대한) 평균 손실과 지표를 업데이트하고 상태 막대를 출력합니다.
- 매 에포크 끝에서 상태 막대를 다시 출력하여 완료를 나타내고 줄바꿈을 수행합니다.
- 마지막으로 평균 손실과 지표값을 초기화합니다.
그레디언트 클리핑(11장)을 하고 싶다면 clipnorm, clipvalue 하이퍼파라미터를 지정하세요. 가중치에 다른 변환을 적용하려면 apply_gradients() 메서드를 호출하기 전에 수행하세요.
model을 정의하는 부분에서 층에 kernel_constraint, bias_constraint를 지정하여 모델의 가중치 제한을 추가하면 apply_gradients() 다음에 아래 처럼 제한을 적용하도록 훈련 반복을 수정해야합니다.
for epoch in range(1, n_epochs + 1):
..
for step in range(1, n_steps + 1):
...
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
## 제한
for variable in model.variables:
if variable.constraint is not None:
variable.assign(variable.constraint(variable))
##
mean_loss(loss)
...
주의해야할 점은 이 훈련 반복이 훈련, 테스트 시에 다르게 동작하는 층(BatchNormalization, Dropout)을 다루지 못한다는 점입니다. 이를 처리하려면 training=True로 모델을 호출하여 필요한 모든 층에 이 매개변수가 전파되도록 해야합니다.
이제 모델의 모든 구성 요소, 훈련 알고리즘을 커스터마이징하는 법을 알아보았습니다. 텐서플로의 자동 그래프 생성 기능을 어떻게 사용하는지 알아보고 이를 사용해 코드 실행 속도를 높이고 텐서플로가 지원하는 모든 플랫폼에서 구동시킬 수 있습니다.
텐서플로 함수와 그래프
텐서플로2에서 그래프는 사용하기 매우 쉽습니다.
def cube(x):
return x**3
정수, 실수 같은 파이썬 상수나 텐서를 사용하여 이 함수를 호출할 수 있습니다.
cube(2)
'''
8
'''
cube(tf.constant(2.0))
'''
<tf.Tensor: shape=(), dtype=float32, numpy=8.0>
'''
tf.function()을 사용하여 이 파이썬 함수를 텐서플로 함수(TensorFlow function)로 바꿔봅시다.
tf_cube = tf.function(cube)
tf_cube
'''
<tensorflow.python.eager.def_function.Function at 0x7effa7451a50>
'''
이 텐서플로 함수는 바꾸기 이전인 파이썬 함수처럼 사용할 수 있고 텐서 결과를 반환합니다.
tf_cube(2)
'''
<tf.Tensor: shape=(), dtype=int32, numpy=8>
'''
tf_cube(tf.constant(2.0))
'''
<tf.Tensor: shape=(), dtype=float32, numpy=8.0>
'''
tf.function()은 cube() 함수에서 수행되는 계산과 동일한 작업을 수행하는 계산 그래프를 생성합니다. 다른 방법으로는 tf.function 데코레이터를 만드는 방법이 있습니다. 이 방법을 더 많이 사용합니다.
@tf.function
def tf_cube(x):
return x**3
원본 파이썬 함수는 TensorFlow 함수의 python_function 속성으로 참조할 수 있습니다.
tf_cube.python_function(2)
'''
8
'''
텐서플로는 사용하지 않는 노드를 제거하고 단순화 (ex, 1+2를 3으로 대체)하는 등 계산 그래프를 최적하합니다. 최적화된 그래프가 준비되면 텐서플로 함수는 적절한 순서에 맞춰 병렬로 그래프 내의 연산을 효율적으로 실행합니다.
복잡한 연산을 수행할 때, 텐서플로 함수는 원본 파이썬 함수보다 훨씬 빠르게 실행됩니다.
사용자 정의 지표, 손싱, 층, 함수를 작성하고 이를 케라스 모델에 사용하면 케라스는 이 함수를 자동으로 텐서플로 함수로 변환해서 tf.function()을 사용할 필요가 없습니다.
Tip
케라스가 파이썬 함수를 텐서플로 함수로 바꾸지 못하게 하려면 사용자 정의 층이나 모델을 만들 때 dynamic=True로 지정합니다. model을 compile할 때 run_eagerly=True로 지정해도 됩니다.
텐서플로 함수는 호출에 사용되는 입력 크기, 데이터 타입에 맞춰 매번 새로운 그래프를 생성합니다. 예를 들어 tf_cube(tf.constant(10))로 호출하면 []크기의 int32 텐서에 맞는 그래프가 생성됩니다. tf_cube(tf.constant(20))을 호출하면 동일한 그래프가 재사용됩니다.
만약, tf_cube(tf.constant([10, 20]))를 호출하면 [2] 크기의 int32 텐서에 맞는 새로운 그래프가 생성됩니다. 텐서플로 함수는 이런식으로 다형성(polymorphism)을 처리하빈다. 이는 매개변수로 텐서를 사용했을 때만 해댱되며, 파이썬 값을 텐서플로 함수에 전달하면 고유한 값마다 새로운 그래프를 생성합니다. 가령 tf_cube(10), tf_cube(20)은 두 개의 그래프를 만듭니다.
CAUTION
파이썬 값으로 텐서플로 함수를 여러 번 호출하면 메모리를 많이 사용합니다.(사용 메모리를 해제하려면 텐서플로 함수를 삭제해야합니다.) 층을 만들 때 쓰는 하이퍼파라미터처럼 몇 개의 고유한 값이 있는 경우에만 매개변수 값에 파이썬 값을 사용해야합니다. 그래야 텐서플로가 모델을 잘 최적화할 수 있습니다.
오토 그래프와 트레이싱
텐서플로가 그래프를 생성하는 방법은 다음과 같습니다.
1. 오토 그래프(autograph)
먼저 파이썬 함수의 소스 코드를 분석해 for, while, if, break, continue, return과 같은 제어문을 모두 찾습니다.
텐서플로가 소스 코드를 분석하는 이유는 파이썬이 제어문을 찾을 수 있는 방법을 제공하지 않기 때문입니다. 파이썬에는 +, *연산을 위해 __add__(), __mul__() 같은 magic 메서드가 있지만 __while__(), __if__() 같은 메서드는 없기 때문입니다.
함수 코드를 모두 분석한 후 오토 그래프는 이 함수의 모든 제어문을 텐서플로 연산으로 바꾼 업그레이드 버전을 만듭니다. 반복문은 tf.while_loop()로 바꾸고 if문은 tf.cond()로 바꿉니다.
예를 들어 아래 그림처럼 오토그래프는 sum_squares() 파이썬 함수를 tf__sum_squares() 함수를 생성해 대체합니다. for문(원래 for 안에 있던 코드가 포함된) loop_body() 함수로 바뀌었습니다. 그 다음 for_stmt() 함수를 호출합니다. 이 함수는 계산 그래프 안에 적절한 tf.while_loop() 연산을 만들 것입니다.
그 다음 텐서플로가 이 업그레이드된 함수를 호출합니다. 하지만 매개변수 값을 전달하는 대신 심볼릭 텐서(symbolic tensor)를 전달합니다. 이 텐서는 실제 값은 없고, 이름, 데이터 타입, 크기만 가집니다. 예를 들어 sum_squares(tf.constant(10))를 호출하면 tf__sum_squares() 함수는 크기 [], int32 타입의 심볼릭 텐서를 사용해 호출됩니다. 이 함수는 graph mode(그래프 모드)로 실행될 것입니다.
그래프 모드는 각 텐서플로 연산이 해당 연산을 나타내고 텐서를 출력하기 위해 그래프에 노드를 추가한다는 의미입니다.(즉시 실행(eager execution) 또는 즉시 실행 모드(eager mode)로 불리는 것과 반대의 의미입니다.)
텐서플로 연산은 어떤 계산도 수행하지 않습니다. 위 그림에서 처럼 tf__sum_squares() 함수가 심볼릭 텐서를 매개변수로 하여 호출됩니다. (이 경우 크기가 []이고 int32 타입의 텐서)
2. 트레이싱
최종 그래프는 트레이싱(Tracing)과정을 통해 생성됩니다. 위 그림에서 노드는 연산을 나타내고 화살표는 텐서를 나타냅니다.
텐서플로 함수 사용 방법
파이썬 함수를 텐서플로 함수로 바꾸는 것은 @tf.function을 데코레이터로 사용하면 케라스가 나머지를 알아서 처리하지만, 지켜야 할 몇 가지 규칙이 있습니다.
1. 넘파이 같은 표준 라이브러리를 호출하면 트레이싱 과정에서 실행됩니다. 이 호출은 그래프에 포함되지 않습니다. 실제 텐서플로 그래프는 텐서플로 구성요소(텐서, 연산, 변수, 데이터셋)만 포함할 수 있습니다. np.sum() 대신 tf.reduce_sum(), sorted() 내장 함수 대신 tf.sort()와 같이 사용하면 됩니다.
예를 들어 np.random.rand()를 반환하는 텐서플로 함수 f(x)를 정의한다면, f(tf.constant(2.))와 f(tf.constant(3.))은 같은 난수를 반환합니다. 같은 크기, 타입이므로 같은 난수를 반환합니다. f(tf.constant([2., 3.]))은 크기가 다르므로 다른 값을 반환합니다. np.random.rand()를 tf.random.uniform([])로 바꾸면 이 연산이 그래프의 일부분이 되므로 호출할 때마다 난수가 생성됩니다.
텐서플로가 지원하지 않는 코드( 어떤 걸 로깅하거나 파이썬 카운터를 업데이트하는 등) 부수적인 작업을 하면 함수를 트레이싱할 때만 호출되므로 텐서플로 함수를 호출할 때 이 코드가 실행되지 않습니다.
어떤 임의의 코드를 tf.py_function()으로 감쌀 수 있습니다. 하지만, 이 코드에 최적화를 수행할 수 없어 성능이 저하되며 필요한 라이브러리가 설치된 플랫폼에서만 이 그래프가 실행되므로 이식성도 낮아집니다.
2. 다른 파이썬 함수나 텐서플로 함수를 호출할 수 있습니다. 파이썬 함수에 대해 텐서플로가 계산 그래프에 있는 연산을 감지해 자동으로 텐서플로 함수로 바꿔줍니다. 이 함수 또한 1번과 같은 규칙을 따릅니다. 이 함수들은 @tf.function 데코레이터를 적용할 필요가 없습니다.
즉 그래프모드로 계산할 첫 번째 함수에만 @tf.function 데코레이터를 적용하면 됩니다. 함수 안에서 일어나는 외부 라이브러리가 아닌 다른 함수의 호출은 자동으로 그래프 모드가 적용됩니다. 사용자 정의 층을 만든다면 전형적으로 __call__() 메서드에 데코레이터를 적용합니다.
3. 함수에서 텐서블로 변수(또는 데이터 셋, 큐 등 텐서플로 객체)를 만들면 처음 호출될 때만 수행되어야 합니다. 일반적으로 텐서플로 함수 밖에서 변수를 생성합니다.(예를 들면 사용자 정의 층의 build() 메서드) 변수에 새로운 값을 할당하려면 = 연산자 대신 assign() 메서드를 사용합니다.
4. 텐서플로는 텐서, 데이터셋을 순회하는 for문만 감지합니다. for i in range(x) 대신 for i in tf.range(x)를 사용합니다. 그렇지 않으면 트레이싱 단계에서 for문을 실행합니다. 일부러 이런 for문을 사용해 신경망의 층을 트레이싱 단계에서 만들 수 있습니다.
5. 성능면에서 반복분보다 벡터화된 구현이 좋습니다.
출처: 핸즈온 머신러닝 2판
사진 출처:
https://zenoahn.tistory.com/112
https://developers.googleblog.com/2017/09/introducing-tensorflow-datasets.html
https://dschloe.github.io/python/python_edu/07_deeplearning/chapter_7_3_1_tensorflow_basic/
궁금한점:
jacobian() 메서드 알아보기
사용자 정의 훈련 다시 정리하기
'핸즈온 머신러닝 2판' 카테고리의 다른 글
15장 RNN과 CNN을 사용해 시퀀스 처리하기 (0) | 2022.01.21 |
---|---|
14장 합성곱 신경망을 사용한 컴퓨터 비전 (0) | 2022.01.15 |
13장 텐서플로에서 데이터 적재와 전처리하기 (0) | 2022.01.10 |
11장 심층 신경망 훈련하기 (0) | 2021.12.30 |
인공 신경망 소개 (0) | 2021.12.23 |