모르는 부분:
save_to_multiple_csv_files 에서 repr(), np.c_, print(end파라미터), interleave(),
interleave 다시 복습
모르는 부분:
메모리 용량보다 큰 규모의 데이터셋으로 딥러닝 시스템을 훈련해야 하는 경우가 많습니다. 텐서플로는 데이터 API를 통해 이를 쉽게 처리할 수 있습니다.
데이터셋 객체를 만들고 데이터 위치와 변환 방법을 지정하면 됩니다. 멀티스레딩, 큐, 배치, 프리페치(prefetch)같은 상세한 사항을 모두 대신 처리해줍니다.
데이터 API는 텍스트 파일(CSV 등), 고정 길이의 이진 파일, 텐서플로의 TFRecord 포맷을 사용하는 이진 파일에서 데이터를 읽을 수 있습니다.
데이터 API는 SQL 데이터베이스에서 읽는 기능을 지원합니다. 구글의 BigQuery같은 데이터 소스에서 읽을 수 있는 오픈 소스도 있습니다.
대용량 데이터를 효율적으로 읽으려면 정규화 같은 전처리가 필요합니다. 모든 전처리 과정을 처맇기 위해 사용자 정의 전처리 층을 만들거나 케라스에서 제공하는 표준 전처리 층을 사용할 수 있습니다.
13장에서는 데이터 API, TFRecord 포맷을 다루고 사용자 정의 전처리 층을 만드는 법과 표준 케라스 전처리 층을 사용하는 방법을 다룹니다. 또한, 텐서플로 생태계에 있는 관련된 프로젝트도 몇 가지 알아봅니다.
- TF 변환 (tf.Transform)
실행 속도를 높이기 위해 훈련 전에 전체 훈련 세트에 대해 실행하는 전처리 함수를 작성할 수 있습니다. 그 다음 텐서플로 함수로 변환하고 상용 환경에 배포된 다음 훈련된 모델과 협업하여 새로운 샘플에 대해 동적으로 전처리를 수행할 수 있습니다.
- TF 데이터셋 (TFDS)
각종 데이터 셋을 다운로드할 수 있는 편리한 함수입니다.
데이터 API
전체적인 데이터 API의 중심에는 데이터셋(dataset) 개념이 있습니다. dataset은 연속된 데이터 샘플을 나타내며 디스크에서 데이터를 점진적으로 읽습니다.
tf.data.Dataset.from_tensor_slices()
X = tf.range(10) dataset = tf.data.Dataset.from_tensor_slices(X) # tf.data.Dataset.range(10) dataset ''' <TensorSliceDataset shapes: (), types: tf.int32> '''
from_tensor_slices() 함수는 텐서를 받아 (첫 번째 차원을 따라) X의 각 원소가 item으로 표현되는 tf.data.Dataset을 만듭니다.
아래와 같이 아이템을 순회할 수 있습니다.
for item in dataset: print(item) ''' tf.Tensor(0, shape=(), dtype=int32) ... tf.Tensor(8, shape=(), dtype=int32) tf.Tensor(9, shape=(), dtype=int32) '''
연쇄 변환
데이터 셋에 여러 종류의 변환을 수행할 수 있습니다.
dataset2 = dataset.repeat(3).batch(10) for item in dataset2: print(item) ''' tf.Tensor([0 1 2 3 4 5 6 7 8 9], shape=(10,), dtype=int32) tf.Tensor([0 1 2 3 4 5 6 7 8 9], shape=(10,), dtype=int32) tf.Tensor([0 1 2 3 4 5 6 7 8 9], shape=(10,), dtype=int32) ''' dataset1 = dataset.repeat(3).batch(7) for item in dataset1: print(item) ''' tf.Tensor([0 1 2 3 4 5 6], shape=(7,), dtype=int32) tf.Tensor([7 8 9 0 1 2 3], shape=(7,), dtype=int32) tf.Tensor([4 5 6 7 8 9 0], shape=(7,), dtype=int32) tf.Tensor([1 2 3 4 5 6 7], shape=(7,), dtype=int32) tf.Tensor([8 9], shape=(2,), dtype=int32) '''
Dataset.repeat(3)
데이터셋의 아이템을 3차례 반복하는 하나의 데이터 셋을 만듭니다.
Dataset.batch(7)
데이터 셋의 아이템을 7개씩 그룹으로 묶습니다. 모자란 크기는 drop_remainder=True로 호출하면 마지막 배치를 버리고 모든 배치를 동일한 크기로 맞춥니다.
Dataset.map()
dataset3 = dataset.map(lambda x: x*2) #[0, 2, 4, 6, 8, .., 16, 18]
map() 함수는 각 아이템 x에 원하는 어떤 전처리 작업도 적용할 수 있습니다. 이미지 크기 변환이나 회전 같은 복잡한 계산도 포함합니다. num_parallel_calls 매개 변수로 스레드 개수를 설정해 계산 속도를 높일 수 있습니다. map() 메서드에 전달하는 함수는 텐서플로 함수로 변환 가능해야 합니다.
Dataset.apply()
map() 메서드가 각 아이템에 변환을 적용한다면, apply() 메서드는 데이터셋 전체에 변환을 적용합니다. 데이터셋에 unbatch() 함수를 적용합니다. 새로 만들어진 데이터셋의 각 아이템은 7개의 정수로 이루어진 배치가 아니라 하나의 정수 텐서가 됩니다.
dataset4 = dataset1.apply(tf.data.experimental.unbatch()) # 0, 2, 4, ...
Dataset.filter()
데이터를 필터링합니다.
dataset5 = dataset.filter(lambda x: x<6) # [0, 1, 2, 3, 4, 5]
Dataset.take(3)
3개의 아이템만 보여줍니다.
dataset6 = dataset.take(3) # [0, 1, 2]
데이터 셔플링
경사 하강법은 샘플이 독립적이고 동일한 분포일 때 최고의 성능을 발휘합니다.(4장 참조) 이를 구현하는 간단한 방법은 shuffle() 메서드를 사용하는 것입니다.
shuffle() 메서드는 원본 데이터 셋에서 아이템을 buffer_size 개수만큼 추출해 버퍼에 채웁니다. 아이템이 요청되면 이 버퍼에서 랜덤하게 하나를 꺼내 반환합니다. 그리고 원본 데이터셋에서 새로운 아이템을 추출하여 비워진 버퍼를 채웁니다.
이 과정을 원본 데이터 셋의 모든 아이템이 사용될 때까지 반복한 후, 모두 사용되면 버퍼가 비워질 때까지 계속하여 랜덤하게 아이템을 반환합니다.
shuffle() 메서드를 사용할 때 버퍼 크기를 충분히 크게 지정하는 것이 중요합니다. 너무 작으면 셔플링 효과가 감소됩니다. 단, 보유한 메모리 크기를 넘지 않아야 합니다. 프로그램 실행 시마다 셔플링되는 순서를 동일하게 만들려면 랜덤 시드를 부여합니다.
가령, 카드 덱에서 3장의 카드를 집어서 랜덤하게 하나를 뽑아 오른쪽에 놓고 남은 두 카드는 오른 손에 들고 있다고 해봅시다. 다시 카드 덱에서 또 다른 카드를 하나 집은 다음 손에 든 카드와 합쳐 다시 섞고 랜덤하게 하나를 선택하여 오른쪽에 높습니다. 이런 작업을 모두 적용하면 이 카드 덱은 완전히 셔플링 된 것이라고 말할 수 있을 까요? 버퍼 크기가 작으면 원본 데이터셋에서 뒤쪽에 있는 아이템은 새로 만들어진 데이터셋에서도 뒤에 등장할 가능성이 높습니다.
완전한 셔플링을 위해서는 버퍼 크기가 데이터셋의 크기와 같아야 합니다.
dataset = tf.data.Dataset.range(10).repeat(3) dataset = dataset.shuffle(buffer_size=5, seed=42).batch(7) for item in dataset: print(item) ''' tf.Tensor([0 2 3 6 7 9 4], shape=(7,), dtype=int64) tf.Tensor([5 0 1 1 8 6 5], shape=(7,), dtype=int64) tf.Tensor([4 8 7 1 2 3 0], shape=(7,), dtype=int64) tf.Tensor([5 4 2 7 8 9 9], shape=(7,), dtype=int64) tf.Tensor([3 6], shape=(2,), dtype=int64) '''
Tip
셔플된 데이터셋에 repeat() 메서드를 호출하면 기본적으로 반복마다 새로운 순서를 생성합니다. 예를 들면 dataset.shuffle(buffer_size=3).repeat(3).batch(7)과 같이 쓰면 반복마다 순서가 달라집니다. 일반적으로 이런 동작 방식은 바람직합니다.
테스트나 디버깅을 위해 반복마다 동일한 순서를 사용해야 한다면, shuffle() 메서드에 reshuffle_each_iteration=False를 지정합니다.
메모리 용량보다 큰 대규모 데이터셋은 버퍼가 데이터셋에 비해 작기 때문에 충분히 섞을 수 없습니다. 해결 방법은 원본 데이터 자체를 섞는 것입니다. (예를 들면 리눅스에서는 shuf 명령어를 사용해 텍스트를 섞을 수 있습니다.) 이렇게 하면 셔플링 효과가 크게 향상됩니다.
원본 데이터가 섞여 있더라도 일반적으로 에포크마다 한 번 더 섞습니다. 그렇지 않으면 에포크마다 동일한 순서가 반복되어 모델에 편향이 추가됩니다. (예를 들어, 원본 데이터에 우연히 존재하는 가짜 패턴 때문입니다.)
예를 들면 리눅스에서는 shuf 명령어를 사용해 텍스트를 섞을 수 있습니다.
샘플을 더 섞기 위해 많이 사용하는 방법은 원본 데이터를 여러 파일로 나눈 다음 훈련 동안 무작위로 읽는 것입니다. 이 방법 역시 동일한 파일에 있는 샘플은 여전히 함께 처리되는데, 이를 피하기 위해 파일 여러 개를 무작위로 선택하고 파일에서 동시에 읽은 레코드를 돌아가면서 반환할 수 있습니다. 그 다음 shuffle() 메서드를 사용해 그 위에 셔플링 버퍼를 추가할 수 있습니다.
이는 데이터 API를 사용하면 몇 줄의 코드로 만들 수 있습니다.
여러 파일에서 한 줄씩 번갈아 읽기
캘리포니아 주택 데이터 셋을 훈련 세트, 검증 세트, 테스트 세트로 나눕니다.
from sklearn.datasets import fetch_california_housing from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler housing = fetch_california_housing() X_train_full, X_test, y_train_full, y_test = train_test_split( housing.data, housing.target.reshape(-1, 1), random_state=42 ) X_train, X_valid, y_train, y_valid = train_test_split( X_train_full, y_train_full, random_state=42 ) scaler = StandardScaler() scaler.fit(X_train) X_mean = scaler.mean_ X_std = scaler.scale_
파일을 여러 개로 나눠 저장하는 함수를 정의합니다.
def save_to_multiple_csv_files(data, name_prefix, header=None, n_parts=10): housing_dir = os.path.join('/content/sample_data', 'datasets', 'housing') os.makedirs(housing_dir, exist_ok=True) path_format = os.path.join(housing_dir, 'my_{}_{:02d}.csv') filepaths = [] m = len(data) for file_idx, row_indices in enumerate(np.array_split(np.arange(m), n_parts)): part_csv = path_format.format(name_prefix, file_idx) filepaths.append(part_csv) with open(part_csv, 'wt', encoding='utf-8') as f: if header is not None: f.write(header) f.write('\n') for row_idx in row_indices: f.write(','.join([repr(col) for col in data[row_idx]])) f.write('\n') return filepaths
train_data, valid_data, test_data를 각각 20, 10, 10개의 파일로 나눠 저장합니다.
train_data = np.c_[X_train, y_train] valid_data = np.c_[X_valid, y_valid] test_data = np.c_[X_test, y_test] header_cols = housing.feature_names + ['MedianHouseValue'] header = ','.join(header_cols) train_filepaths = save_to_multiple_csv_files(train_data, 'train', header, n_parts=20) valid_filepaths = save_to_multiple_csv_files(valid_data, 'valid', header, n_parts=10) test_filepaths = save_to_multiple_csv_files(test_data, 'test', header, n_parts=10)
만들어진 경로는 아래와 같습니다.
train_filepaths # train_filepaths = '/content/.../housing/my_train_*.csv'도 가능 ''' ['/content/sample_data/datasets/housing/my_train_00.csv', '/content/sample_data/datasets/housing/my_train_01.csv', ... '/content/sample_data/datasets/housing/my_train_16.csv', '/content/sample_data/datasets/housing/my_train_17.csv', '/content/sample_data/datasets/housing/my_train_18.csv', '/content/sample_data/datasets/housing/my_train_19.csv'] '''
pd.read_csv(train_filepaths[0]).head() ''' 결과1 '''
텍스트 파일로 읽으면 아래와 같습니다.
with open(train_filepaths[0]) as f: for i in range(5): print(f.readline(), end='') ''' MedInc,HouseAge,AveRooms,AveBedrms,Population,AveOccup,Latitude,Longitude,MedianHouseValue 3.5214,15.0,3.0499445061043287,1.106548279689234,1447.0,1.6059933407325193,37.63,-122.43,1.442 5.3275,5.0,6.490059642147117,0.9910536779324056,3464.0,3.4433399602385686,33.69,-117.39,1.687 3.1,29.0,7.5423728813559325,1.5915254237288134,1328.0,2.2508474576271187,38.44,-122.98,1.621 7.1736,12.0,6.289002557544757,0.9974424552429667,1054.0,2.6956521739130435,33.55,-117.7,2.621 '''
이런 경로가 담긴 데이터셋을 만듭니다. 기본적으로 list_files() 함수는 경로를 섞은 데이터셋을 반환합니다. 이를 원치 않는다면 shuffle=False로 지정할 수 있습니다.
filepath_dataset = tf.data.Dataset.list_files(train_filepaths, seed=42) # 결과 for item in filepath_dataset: print(item) ''' tf.Tensor(b'/content/sample_data/datasets/housing/my_train_05.csv', shape=(), dtype=string) tf.Tensor(b'/content/sample_data/datasets/housing/my_train_16.csv', shape=(), dtype=string) ... tf.Tensor(b'/content/sample_data/datasets/housing/my_train_08.csv', shape=(), dtype=string) '''
그 다음 interleave() 메서드를 호출하여 한 번에 다섯 개의 파일을 한 줄씩 번갈아가며 읽습니다. 각 파일의 첫 번째 줄은 열 이름이므로 skip() 메서드를 사용하여 건너뜁니다.
n_readers=5 dataset = filepath_dataset.interleave( lambda filepath: tf.data.TextLineDataset(filepath).skip(1), cycle_length=n_readers )
cycle_length는 동시에 처리할 입력 개수를 지정합니다. 기본값은 tf.data.experimental.AUTOTUNE으로 가능한 CPU 코어 개수입니다. 이 값은 num_parallel_calls 매개변수보다 크거나 같아야합니다.
interleave() 메서드는 filepath_dataset에 있는 다섯 개의 파일에서 데이터를 읽는 데이터셋을 만듭니다. lambda 함수는 각 파일을 호출해 TextLineDataset(새로운 데이터셋)을 만들 것입니다. 이 단계에서는 명확하게 말하면 총 7개의 데이터셋이 있습니다. 파일 경로 데이터셋, 인터리브 데이터셋, 인터리브 데이터 셋에 의해 내부적으로 생성된 TextLineDataset이 있습니다.
인터리브 데이터셋을 반복 구문에 사용하면 5개의 TextLineDataset을 순회합니다. 모든 데이터 셋이 아이템이 소진될 때까지 한 번에 한 줄씩 읽습니다. 그 후에 filepath_dataset에서 다음 5개 파일을 가져오고 동일한 과정을 반복합니다. 모든 파일 경로가 소진될 때까지 계속됩니다.
Tip
interleave()가 잘 동작하려면 파일의 길이가 동일한 것이 좋습니다. 그렇지 않으면 가장 긴 파일의 끝은 인터리빙이 되지 않을 것입니다.
interleave()는 각 파일에서 한 번에 한 줄씩 순서대로 읽는데, 이를 병렬로 읽고 싶다면 map() 메서드와 동일하게 num_parallel_calls 매개변수에 원하는 스레드 개수를 지정합니다. tf.data.experimental.AUTOTUNE으로 텐서플로가 사용가능한 CPU를 기반으로 동적으로 적절한 스레드 개수를 선택할 수 있습니다.
데이터셋을 확인해보겠습니다.
take() 메서드는 지정한 개수만큼 읽는 TakeDataset 객체를 반환합니다. 텐서플로 데이터셋 객체는 모두 파이선 for문을 이용할 수 있는데, 실질적으로 객체가 사용될 때까지 실제 파일에서 어떤 데이터도 읽지 않습니다.
for line in dataset.take(5): print(line.numpy()) ''' b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,2.9758771929824563,33.63,-117.71,2.418' b'2.4792,24.0,3.4547038327526134,1.1341463414634145,2251.0,3.921602787456446,34.18,-118.38,2.0' b'4.2708,45.0,5.121387283236994,0.953757225433526,492.0,2.8439306358381504,37.48,-122.19,2.67' b'2.1856,41.0,3.7189873417721517,1.0658227848101265,803.0,2.0329113924050635,32.76,-117.12,1.205' b'4.1812,52.0,5.701388888888889,0.9965277777777778,692.0,2.4027777777777777,33.73,-118.31,3.215' '''
이 내용은 헤더를 제외한 첫 번째 행에 해당하며 순서는 랜덤합니다. 하지만 결과에서 볼 수 있듯이 바이트 스트링이라서 이를 파싱하고 스케일을 조정할 필요가 있습니다.
데이터 전처리
전처리를 수행하기 위한 간단한 함수를 만들어봅시다.
# 위에서 작성한 X_mean, X_std
scaler = StandardScaler()
scaler.fit(X_train)
X_mean = scaler.mean_
X_std = scaler.scale_
n_inputs = 8 # X_train.shape[-1]
def preprocess(line):
defs = [0.]*n_inputs + [tf.constant([], dtype=tf.float32)]
fields = tf.io.decode_csv(line, record_defaults=defs)
x = tf.stack(fields[:-1])
y = tf.stack(fields[-1:])
return (x - X_mean) / X_std, y
X_mean, X_std는 피처마다 1개씩, 즉 8개의 실수를 가진 1D 텐서(또는 ndarray)입니다.
preprocess() 함수는 CSV 한 라인을 받아 파싱합니다. 이를 위해 tf.io.decode_csv() 함수를 사용하고, 첫 번째로 파싱할 라인을, 두 번째로 CSV 파일의 각 열에 대한 기본값을 담은 배열을 입력합니다. 이 값은 각 열의 dtype, 개수, 기본값 등을 알려주는 역할을 합니다. 마지막 열에 있는 tf.float32 타입의 빈 배열을 제공하는데, 이 값은 기본값이 없으므로 누락된 값이 발견되면 예외가 발생합니다.
decode_csv() 함수는 열마다 스칼라 텐서의 리스트를 반환합니다. 1D 텐서를 반환해야 하므로 마지막 열(타깃)을 제외하고 tf.stack() 함수를 호출해 각 열의 텐서값을 쌓아 각각 1D 텐서로 만듭니다. 그 다음 타깃값에도 동일하게 적용합니다.?
마지막으로 스케일 조정된 특성, 타깃을 담은 튜플을 반환합니다.
전처리 함수가 아래처럼 잘 동작함을 알 수 있습니다. 이제 이 함수를 데이터셋에 적용해보겠습니다.
preprocess(b'4.5909,16.0,5.475877192982456,1.0964912280701755,1357.0,
2.9758771929824563,33.63,-117.71,2.418')
'''
(<tf.Tensor: shape=(8,), dtype=float32, numpy=
array([ 0.36618188, -0.998705 , 0.00781878, -0.00675364, -0.06140145,
0.0072037 , -0.94465536, 0.9367464 ], dtype=float32)>,
<tf.Tensor: shape=(1,), dtype=float32, numpy=array([2.418], dtype=float32)>)
'''
데이터 적재와 전처리를 합치기
재사용 가능한 코드를 만들기 위해 지금까지 언급한 모든 것을 하나의 함수로 만들겠습니다.
def csv_reader_dataset(filepaths, repeat=1, n_readers=5,
n_read_threads=None, shuffle_buffer_size=10000,
n_parse_threads=5, batch_size=32):
dataset = tf.data.Dataset.list_files(filepaths).repeat(repeat)
dataset = dataset.interleave(
lambda filepath: tf.data.TextLineDataset(filepath).skip(1),
cycle_length=n_readers, num_parallel_calls=n_read_threads
)
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.map(preprocess, num_parallel_calls=n_parse_threads)
return dataset.batch(batch_size).prefetch(1)
마지막 prefetch(1)를 호출하면 데이터셋은 항상 한 배치가 미리 준비되도록 합니다. 즉, 훈련 알고리즘이 한 배치로 작업을 하는 동안 이 데이터셋이 동시에 다음 배치를 준비합니다.
일반적으로 배치 하나를 프리페치하는 걸로 충분하지만, tf.data.experimental.AUTOTUNE을 사용하면 텐서플로가 자동으로 결정할 수 있습니다.
GPU에서 데이터를 바로 프리패치할 수 있는 tf.data.experimental.prefetch_to_device()를 확인해보세요.
prefetch와 함께 interleave(), map() 메서드를 호출할 때 num_parallel_calls 매개변수를 지정하면 멀티스레드로 데이터를 적재, 전처리할 수 있습니다.
Tip
GPU 카드를 구입할 때 성능, 메모리 용량 외에도 메모리 대역폭(memory bandtwidth)이 정말 중요합니다. 이는 초당 RAM에서 입출력할 수 있는 데이터의 기가바이트 수치입니다
데이터셋이 메모리에 모두 들어갈 정도로 작다면, cache() 메서드를 사용해 RAM에 모두 캐싱할 수 있습니다. 이는 훈련 속도를 크게 높여줍니다. 일반적으로 데이터를 적재, 전처리한 후 캐싱을 수행한 다음 셔플링, 반복, 배치, 프리페치를 수행합니다. 이렇게 하면 각 샘플을 (매 에포크가 아니라) 한 번만 읽고 전처리하지만 에포크마다 다르게 셔플링되고 다음 배치도 미리 준비됩니다.
앞서 설명한 데이터셋 메서드 외에도 concatenate(), zip(), reduce(), shard(), flat_map(), padded_batch() 등이 자주 쓰입니다. from_generator(), from_tensors()와 같은 클래스 메서드도 있습니다.
tf.data.experimental 패키지에 실험적인 기능이 있습니다. make_csv_dataset() 메서드는 CsvDataset 클래스와 CSV 파일에서 각 열의 데이터 타입을 추측하여 처리합니다.
tf.keras와 데이터셋 사용하기
csv_reader_dataset() 함수로 동일한 분포로 추출되는 데이터 셋을 생성해보겠습니다. 데이터셋은 tf.keras에서만 지원됩니다.
train_set = csv_reader_dataset(train_filepaths)
valid_set = csv_reader_dataset(valid_filepaths)
test_set = csv_reader_dataset(test_filepaths)
이제 이 데이터셋들을 fit() 메서드에 X_train, y_train, X_valid, y_valid 대신 입력하면 됩니다. fit() 메서드는 훈련 데이터셋의 반복을 지원합니다.
다른 방법으로는 repeat() 메서드를 호출하여 훈련 데이터셋을 무한히 반복하고, fit()을 호출할 때 steps_per_epoch를 지정하는 것입니다. 에포크에 걸쳐서 셔플링 버퍼를 사용할 때 유리합니다. steps_per_epoch*batch_size 크기가 len(X_train)과 같지 않다면 에포크마다 조금씩 다른 훈련 세트를 사용하게 됩니다.
지금까지 데이터 API를 사용해 강력한 입력 파이프라인을 만드는 법을 배웠습니다. CSV보다 휴율적이고 대규모의 복잡한 이미지나 오디오 같은 데이터 구조를 지원하는 TFRecord를 사용하는 방법을 알아보겠습니다.
TFRecord는 훈련 과정에서 데이터를 적재하고 전처리하는 데 병목이 생기는 경우 유용합니다. CSV 파일을 선호하면 TFRecord를 사용할 필요가 없습니다.
TFRecord 포맷
TFRecord는 크기가 다른 연속된 이진 레코드를 저장하는 단순한 이진 포맷입니다. 각 레코드는 레코드 길이, 길이가 올바른지 체크하는 CRC 체크섬(checksum), 실제 데이터, 데이터를 위한 CRC 체크섬으로 구성됩니다.
tf.io.TFRecordWriter
with tf.io.TFRecordWriter('my_data.tfrecord') as f:
f.write(b"This is the first record")
f.write(b'And this is the second record')
TFRecoedWriter 클래스로 TFRecord를 손쉽게 만들 수 있습니다.
tf.data.TFRecordDataset
filepaths = ['my_data.tfrecord']
dataset = tf.data.TFRecordDataset(filepaths)
for item in dataset:
print(item)
'''
tf.Tensor(b'This is the first record', shape=(), dtype=string)
tf.Tensor(b'And this is the second record', shape=(), dtype=string)
'''
TFRecordDataset 클래스를 사용해 하나 이상의 TFRecord를 읽을 수 있습니다.
TIP
기본적으로 TFRecordDataset은 파일을 하나씩 차례로 읽습니다. num_parallel_reads를 지정하여 여러 파일에서 레코드를 번갈아 가며 읽을 수 있습니다. 앞서 CSV 파일에 적용했던 것처럼 list_files()와 interleave()를 사용하여 동일한 결과를 얻을 수 있습니다.
압축된 TFRecord 파일
네트워크를 통해 읽어야 하는 경우 파일을 압축해야 하는데, options 매개변수를 사용하여 압축된 TFRecord 파일을 만들 수 있습니다.
options = tf.io.TFRecordOptions(compression_type='GZIP')
with tf.io.TFRecordWriter('my_compressed.tfrecord', options) as f:
f.write(b'This is the first record')
f.write(b'And this is the second record')
filepaths = ['my_compressed.tfrecord']
dataset = tf.data.TFRecordDataset(filepaths, compression_type='GZIP')
for item in dataset:
print(item)
'''
tf.Tensor(b'This is the first record', shape=(), dtype=string)
tf.Tensor(b'And this is the second record', shape=(), dtype=string)
'''
프로토콜 버퍼의 개요
각 레코드는 어떤 이진 포맷도 사용할 수 있지만, 일반적으로 TFRecord는 직렬화된 프로토콜버퍼(protobuf)를 담고 있습니다. protocol buffer는 이식성, 확장성은 효율적인 이진 포맷입니다.
텐서플로는 tf.train.Example 타입의 프로토콜 버퍼를 만들고 파싱할 수 있는 함수를 제공합니다. 이 정도로 보통 충분하지만, 이 예제에서는 자체적인 프로토콜 버터를 만들어보겠습니다.
간단한 프로토콜 버퍼를 정의해보겠습니다.
%%writefile person.proto
syntax = 'proto3';
message Person {
string name=1;
int32 id =2;
repeated string email =3;
}
이 정의는 프로토콜 버전 포맷 버전3을 사용합니다. Person 객체(프로토콜 버퍼 객체는 직렬화하여 전송된 것을 의미하므로 message라고 부릅니다.)는 string name, int32 id, string email 필드를 하나 이상 가집니다. 숫자 1, 2, 3은 필드 식별자로 레코드의 이진 표현에 사용됩니다.
.proto 파일로 정의를 만들면 컴파일 할 수 있습니다. 프로토콜 버퍼 컴파일러인 protoc를 사용해 파이썬 클래스를 생성합니다.
프로토콜 정의를 컴파일합니다. --desciptor_set_out과 --include_imports 옵션은 tf.io.decode_proto() 예제를 위해 필요합니다.?
!protoc person.proto --python_out=. --descriptor_set_out=person.desc --include_imports
!ls person*
'''
person.desc person_pb2.py person.proto
'''
사실 여기서 사용할 프로토콜 버퍼의 정의는 이미 컴파일되어 텐서플로 안에 파이썬 클래스로 포함되어 있어서 protoc을 사용할 필요가 없습니다.
from person_pb2 import Person # 생성된 클래스를 임포트합니다.
person = Person(name='Al', id=123, email=['a@b.com']) # Person 객체를 만듭니다.
print(person)
'''
name: "Al"
id: 123
email: "a@b.com"
'''
person.name # 필드를 읽습니다. 결과: 'Al'
person.name = 'ALice' # 필드를 수정합니다. 결과: 'ALice'
person.email[0] # 반복 필드는 배열처럼 참조할 수 있습니다. 결과: 'a@b.com'
person.email.append('c@d.com') # 이메일 주소를 추가합니다. 결과: ['a@b.com', 'c@d.com']
s = person.SerializeToString() # 바이트 문자열로 객체를 직렬화합니다.
# 결과: b'\n\x05ALice\x10{\x1a\x07a@b.com\x1a\x07c@d.com'
person2 = Person() # 새로운 Person 객체를 만듭니다.
person2.ParseFromString(s) # 바이트 문자열을 파싱합니다. (27 바이트 길이입니다.)
person == person2 # 두 객체는 동일합니다. 결과: True
person2
'''
name: "ALice"
id: 123
email: "a@b.com"
email: "c@d.com"
'''
SerializeToString() 메서드를 통해 객체를 직렬화했는데, 이 문자열을 저장하거나 네트워크를 통해 전달할 수 있습니다. 이진 데이터를 읽거나 수신하면 ParseFromString() 메서드를 사용해 파싱할 수 있습니다. 이를 통해 직렬화된 객체의 복사본을 얻을 수 있습니다.
직렬화된 Person 객체를 TFRecord 파일로 저장한 다음 읽고 파싱할 수 있습니다. 모든 작업에 이상은 없을 것이지만, SerializeToString()과 ParseFromString()은 텐서플로 연산이 아니라 텐서플로 함수에 포함할 수 없습니다.(tf.py_function() 연산으로 감싸는 걸 제외). 텐서플로는 파싱 연산을 제공하기 위한 텐서플로 프로토콜 버퍼 정의를 가지고 있습니다.
텐서플로 프로토콜 버퍼
TFRecord 파일에서 사용하는 주요 프로토콜 버퍼는 Example 프로토콜 버퍼입니다. 이는 데이터셋에 있는 하나의 샘플을 표현합니다. tf.train.Example 프로토콜 버퍼의 정의는 아래와 같습니다.
syntax = 'proto3';
message BytesList {repeated bytes value = 1;}
message FloatList {repeated float value = 1 [packed = true];}
message Int64List {repeated int64 value = 1 [packed = true];}
message Feature{
oneof kind{
BytesList bytes_list = 1;
FloatList float_list = 2;
Int64List int64_list = 3;
}
};
message Features { map<string, Feature> feature =1;};
message Example {Features features =1;};
BytesList, FloatList, Int64List의 정의는 이해하기 쉽습니다. [packed = true]는 효율적인 인코딩을 위해 반복적인 수치 필드에 사용됩니다.
Feature는 BytesList, FloatList, Int64List 중 하나를 담고 있습니다. 복수형인 Features는 특성 이름과 특성값을 매핑한 딕셔너리를 가집니다. 마지막으로 Example은 하나의 Features 객체를 가집니다.
Features 객체가 1개인데, 왜 Features를 안쓰고 Example을 정의할까요? 텐서플로 개발자들이 언젠가 더 추가할 것입니다. 이후 수정된 Example 정의가 동일한 아이디(1)로 features 필드를 포함하는 한 이전 버전에 대한 호환성을 유지할 것입니다. 이러한 확장성이 프로토콜 버퍼가 가진 훌륭한 특징 중 하나입니다.
tf.train.Example
Person과 동일한 표현인 Example 객체를 만들고 TFRecord 파일에 저장해보겠습니다.
from tensorflow.train import BytesList, FloatList, Int64List
from tensorflow.train import Feature, Features, Example
person_example = Example(
features = Features(
feature = {
'name': Feature(bytes_list=BytesList(value=[b"Alice"])),
'id': Feature(int64_list=Int64List(value=[123])),
'emails': Feature(bytes_list=BytesList(value=[b'a@b.com',
b'c@d.com']))
}
)
)
person_example 결과:
'''
features {
feature {
key: "emails"
value {
bytes_list {
value: "a@b.com"
value: "c@d.com"
}
}
}
feature {
key: "id"
value {
int64_list {
value: 123
}
}
}
feature {
key: "name"
value {
bytes_list {
value: "Alice"
}
}
}
}
'''
Example 프로토콜 버퍼를 만들었으므로 SerializeToString() 함수로 직렬화하고 이 문자열을 TFRecord 파일로 저장해보겠습니다.
with tf.io.TFRecordWriter('my_contacts.tfrecord') as f:
f.write(person_example.SerializeToString())
보통 하나 이상의 Example을 만드는데, CSV 파일 등을 읽어 샘플마다 하나의 Example 프로토콜 버퍼를 생성하고, 직렬화한 다음 프로세스에서 셔플링하고 TFRecord 파일 여러개에 저장합니다. 이는 작업량이 많으므로 필요할 때만 적용합니다. CSV 파일을 사용해도 잘 작동하는 파이프라인은 그렇게 쓰면 됩니다.
Example 프로토콜 버퍼를 읽고 파싱하기
tf.data.TFRecordDataset으로 파일을 읽고 tf.io.parse_single_example로 Example을 parsing할 것인데, 직렬화된 데이터를 담을 스칼라 텐서가 필요합니다. feature_description 딕셔너리를 만들어 매핑합니다.
tf.io.FixedLenFeature & tf.io.VarLenFeature
feature_description = {
'name': tf.io.FixedLenFeature([], tf.string, default_value=''),
'id': tf.io.FixedLenFeature([], tf.int64, default_value=0),
'emails': tf.io.VarLenFeature(tf.string)
}
tf.io.FixedLenFeature는 특성의 크기, 타입, 기본값을 적습니다. 'email'처럼 가변적인 특성 리스트는 tf.io.VarLenFeature를 사용해 특성의 타입만 적습니다.
tf.io.parse_single_example
for serialized_example in tf.data.TFRecordDataset(['my_contacts.tfrecord']):
parsed_example = tf.io.parse_single_example(serialized_example,
feature_description)
'''
결과
{'emails': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x7fc2a907ef50>,
'id': <tf.Tensor: shape=(), dtype=int64, numpy=123>,
'name': <tf.Tensor: shape=(), dtype=string, numpy=b'Alice'>}
'''
고정 길이 특성은 보통의 텐서로 파싱되지만, 가변 길이 특성은 희소 텐서로 파싱됩니다. tf.sparse.to_dense()로 밀집 텐서로 변환할 수 있습니다. 여기서는 희소 텐서의 값을 바로 참조하는 것이 더 간단합니다.
tf.sparse.to_dense(parsed_example['emails'], default_value=b"")
'''
<tf.Tensor: shape=(2,), dtype=string, numpy=array([b'a@b.com', b'c@d.com'], dtype=object)>
'''
parsed_example['emails'].values
'''
<tf.Tensor: shape=(2,), dtype=string, numpy=array([b'a@b.com', b'c@d.com'], dtype=object)>
'''
BytesList는 직렬화된 객체를 포함해 어떤 이진 데이터도 포함할 수 있습니다. 예를 들어 tf.io.encode_jpeg()를 사용해 JPEG 포맷 이미지를 인코딩하고 이 이진 데이터를 BytesList에 넣을 수 있습니다. 이를 tf.io.decode_image()로 디코딩할 수 있습니다.
tf.io.serialize_tensor()로 어떤 텐서라도 직렬화하고 결과 바이트 스트링을 BytesList 특성에 넣어 저장할 수 있습니다. 나중에 TFRecord 파일을 파싱할 때 tf.io.parse_tensor()를 사용합니다.
tf.io.parse_example()
dataset = tf.data.TFRecordDataset(['my_contacts.tfrecord']).batch(10)
for serialized_examples in dataset:
parsed_examples = tf.io.parse_example(serialized_examples,
feature_description)
tf.io.parse_example()를 사용하여 배치 단위로 파싱할 수 있습니다.
위에서 처럼 대부분의 경우 Example 프로토콜 버퍼로 충분할 것입니다. 하지만, 리스트의 리스트를 다룰 때는 조금 사용하기 어렵습니다. 예를 들어 텍스트 문서를 분류한다고 했을 때, 각 문서는 문장의 리스트로 표현됩니다. 각 문장은 단어의 리스트로 표현됩니다. 이런 경우를 위해 고안된 텐서플로의 SequenceExample이 있습니다.
SequenceExample 프로토콜 버퍼를 사용해 리스트의 리스트 다루기
SequenceExample 프로토콜 버퍼의 정의입니다.
message FeatureList {repeated Feature feature =1;};
message FeatureLists {map<string, FeatureList> feature_list =1;};
message SequenceExample{
Features context =1;
FeatureLists feature_lists =2;
}
SequenceExample 객체는 문맥을 설명하는 Feature 객체와 Featurelists 객체를 포함합니다.
FeatureList는 Feature 객체의 리스트를 포함하고 있습니다. Feature 객체는 바이트 스트링의 리스트, 정수·실수의 리스트일 수 있습니다. 앞서 든 예시에서 Feature는 하나의 문장일 것입니다. 즉, 단어(id)의 리스트일 것입니다.
FeatureLists는 string 이름과 featurelist를 가진 객체를 포합합니다. 예를 들어 Featurelist의 이름이 content이고, 또 다른 이름이 comments입니다.
SequenceExample를 직렬화하고 파싱하는 것은 Example과 유사합니다. tf.io.parse_single_sequence_example()를 사용하고, 배치를 파싱하려면 tf.io.parse_sequence_example()를 사용합니다.
두 함수 모두 context와 feature_lists를 튜플 형태로 반환합니다. Example에서 처럼 가변 길이의 sequence를 담고 있다면 tf.RaggedTensor.from_sparse()를 사용해 래그드 텐서로 바꿀 수 있습니다.
from tensorflow.train import FeatureList, FeatureLists, SequenceExample
# 예시 만들기
context = Features(feature={
'author_id': Feature(int64_list = Int64List(value=[123])),
'title': Feature(bytes_list=BytesList(value=[b"A", b"desert", b"place", b'.'])),
'pub_date': Feature(int64_list=Int64List(value=[1623, 12, 25]))
})
content = [['When', 'shall', 'we', 'three', 'meet', 'again', '?'],
['In', 'thunder', ',', 'lightning', ',', 'or', 'in', 'rain', '?']]
comments = [['When', 'the', 'hurlyburly', "'s", 'done', ','],
['When', 'the', 'battle', "'s", 'lost', 'and', 'won', '.']]
# bytestring으로 바꾸기
def words_to_feature(words):
return Feature(bytes_list=BytesList(value=[word.encode('utf-8') for word in words]))
content_features = [words_to_feature(sentence) for sentence in content]
comments_features = [words_to_feature(comment) for comment in comments]
# 예시 SeqeunceExample 객체 만들기
sequence_example = SequenceExample(
context=context,
feature_lists = FeatureLists(feature_list={
'content': FeatureList(feature = content_features),
'comments': FeatureList(feature = comments_features)
})
)
# SequenceExample 직렬화
serialized_sequence_example = sequence_example.SerializeToString()
# context를 담을 Feature 객체 만들기
context_feature_descriptions = {
'author_id': tf.io.FixedLenFeature([], tf.int64, default_value=0),
'title': tf.io.VarLenFeature(tf.string),
'pub_date': tf.io.FixedLenFeature([3], tf.int64, default_value=[0, 0, 0])
}
# feturelists를 담을 FeatureLists 객체 만들기
sequence_feature_descriptions={
'content': tf.io.VarLenFeature(tf.string),
'comments': tf.io.VarLenFeature(tf.string),
}
# parsing 하기
parsed_context, parsed_feature_lists = tf.io.parse_single_sequence_example(
serialized_sequence_example, context_feature_descriptions,
sequence_feature_descriptions
)
parsed_context
'''
{'author_id': <tf.Tensor: shape=(), dtype=int64, numpy=123>,
'pub_date': <tf.Tensor: shape=(3,), dtype=int64, numpy=array([1623, 12, 25])>,
'title': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x7fc2a887a390>}
'''
parsed_feature_lists
'''
{'comments': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x7fc2ae0c3ed0>,
'content': <tensorflow.python.framework.sparse_tensor.SparseTensor at 0x7fc2a8b43a50>}
'''
# RaggedTensor로 바꾸어 출력
print(tf.RaggedTensor.from_sparse(parsed_feature_lists['content']))
'''
<tf.RaggedTensor [[b'When', b'shall', b'we', b'three', b'meet', b'again', b'?'],
[b'In', b'thunder', b',', b'lightning', b',', b'or', b'in', b'rain', b'?']]>
'''
효율적으로 데이터를 저장, 파싱하는 방법을 알았으니 다음 단계로 신경망에 주입할 수 있도록 데이터를 준비해보겠습니다.
입력 특성 전처리
신경망에 데이터를 입력하려면 모든 특성을 수치로 바꾸고 정규화해야 합니다. 미리 전처리한 파일을 준비할 수도 있지만, 데이터 API로 데이터를 적재할 때 동적으로 전처리할 수 있습니다. (예를 들면 map() 메서드) 또는 전처리 층을 모델에 직접 포함시킬 수도 있습니다.
여기서는 전처리 층을 모델에 추가하는 방법에 대해 알아보겠습니다. 다음과 같은 사용자 정의 층을 추가합니다.
class Standardization(keras.layers.Layer):
def adapt(self, data_sample):
self.means_ = data_sample.mean()
self.stds_ = data_sample.std()
def call(self, inputs):
return (inputs - self.means_)/(self.stds_ + keras.backend.epsilon())
이 층은 sklearn의 StandardScaler처럼 사용하기 전에 adapt() 메서드를 호출해 특성에 대한 평균, 표준 편차를 구해야합니다. 샘플은 적당히 크면 충분합니다.
std_layer = Standardization()
std_layer.adapt(data_sample)
std_layer.means_
'''
longitude -119.603517
latitude 35.652105
housing_median_age 28.773643
total_rooms 2625.392054
total_bedrooms 535.001175
population 1419.555426
households 497.225000
median_income 3.901520
dtype: float64
'''
model=keras.Sequential()
model.add(std_layer)
[...] # 모델을 구성합니다.
mode.compile([..])
model.fit([..])
Standardization을 만들지 않고 keras.layers.Normalization 층을 사용할 수도 있습니다. Standardization에서처럼 adapt() 메서드에 샘플 데이터를 전달한 후 보통 층처럼 사용합니다.
원-핫 벡터를 사용해 범주형 특성 인코딩하기
예제에서 사용하는 캘리포니아 주택 데이터셋에는 ocean_proximity 특성이 있습니다. 이 특성은 object type으로 'near bay' 등의 값으로 되어있어, 인코딩해야합니다. 범주 개수가 매우 작으므로 원-핫 인코딩을 사용할 수 있습니다.
lookup 테이블을 사용해 각 범주를 인덱스(0~4까지)로 매핑합니다.
vocab = ['NEAR BAY', '<1H OCEAN', 'INLAND', 'NEAR OCEAN', 'ISLAND']
indices = tf.range(len(vocab), dtype=tf.int64)
table_init = tf.lookup.KeyValueTensorInitializer(vocab, indices)
num_oov_buckets =2
table = tf.lookup.StaticVocabularyTable(table_init, num_oov_buckets)
먼저 어휘 사전(vocab)을 정의합니다. 이는 가능한 모든 범주의 리스트입니다. 그 다음 범주에 해당하는 인덱스(0~4까지)의 텐서를 만듭니다. 다음으로 범주와 해당 인덱스를 전달하여 룩업 테이블을 위해 초기화 객체를 만듭니다. 텍스트 파일에 라인 당 하나의 범주로 나열되어 있다면 TextFileInitializer를 사용합니다.
마지막으로 초기화 객체에 oov(out-of-vocabulary) 버킷을 지정해 룩업 테이블을 만듭니다. 어휘 사전에 없는 범주를 찾으면 룩업 테이블이 계산한 이 범주의 해시값을 이용하여 oov 버킷 중 하나에 할당합니다. 인덱스는 알려진 범주 다음부터 시작하므로 이 예제에서 oov 버킷의 인덱스는 5와 6입니다.
oov를 사용하는 이용하는 이유는 범주가 너무 많은 데이터는 전체 범주를 구하기 어렵기 때문입니다. 때문에 샘플 데이터로 어휘 사전을 정의하고 샘플에 없는 범주는 oov 버킷에 추가하는 것입니다. 훈련 도중 발견되는 알려지지 않은 범주가 많을 수록 더 많은 oov 버킷을 사용해야 합니다. 그렇지 않으면 다른 범주가 동일한 버킷에 할당되어 신경망이 이 두 범주를 구분할 수 없습니다.
categories = tf.constant([ '<1H OCEAN', 'INLAND', 'ISLAND'])
cat_indices = table.lookup(categories)
'''
<tf.Tensor: shape=(3,), dtype=int64, numpy=array([1, 2, 4])>
'''
cat_one_hot = tf.one_hot(cat_indices, depth=len(vocab)+num_oov_buckets)
'''
<tf.Tensor: shape=(3, 7), dtype=float32, numpy=
array([[0., 1., 0., 0., 0., 0., 0.],
[0., 0., 1., 0., 0., 0., 0.],
[0., 0., 0., 0., 1., 0., 0.]], dtype=float32)>
'''
'<1H OCEAN'는 1에 매핑 되었고, 'INLAND' 2에 , 'ISLAND' 4에 매핑되었습니다.
이 방법 말고도 tf.one_hot() 함수를 사용해 원-핫 인코딩을 할 수 있습니다. 이 함수는 어휘 사전 크기와 oov 버킷 수를 더한 인덱스 총 개수를 지정해야 합니다.
keras.layers.TextVectorization 층을 사용하면 adapt() 메서드가 샘플 데이터에서 어휘 사전을 추출하고 call() 메서드가 각 범주를 어휘사전에 있는 인덱스로 변환합니다. 인덱스를 원-핫 벡터로 바꾸고 싶다면 이 층을 모델의 시작 부분에 추가하고 뒤이어 tf.one_hot() 함수가 적용된 Lambda 층을 놓을 수 있습니다.
가능한 범주가 몇 개 되지 않을 때는 원-핫 벡터를 사용해도 괜찮지만, 어휘 사전이 크면 임베딩(embedding)을 사용하여 인코딩하는 것이 훨씬 효율적입니다.
Tip
범주 개수가 10개 이하이면 원-핫 인코딩. 범주 개수가 50개 이상이면 (종종 해시 버킷을 사용하곤 합니다.) 임베딩이 선호됩니다.
임베딩을 사용해 범주형 특성 인코딩
임베딩은 범주를 표현하는 훈련 가능한 밀집 벡터입니다. 처음엔 임베딩이 랜덤하게 초기화되어 있습니다.
예를 들어 'NEAR BAY' 범주는 초기에 [0.131, 0.890]과 같은 랜덤 벡터로 표현되고 'NEAR OCEAN'는 [0.631, 0.791]과 같은 또 다른 랜덤 벡터로 표현됩니다. 이 예에서는 2D 임베딩을 사용하지만, 차원 수는 수정 가능한 하이퍼파라미터입니다.
비슷한 범주끼리는 경사하강법이 더 가깝게 만듭니다. 그림에서 'NEAR BAY'와 'NEAR OCEAN'은 가까워지는 반면에, INLAND 범주의 임베딩은 점점 멀어집니다.
표현이 좋을 수록 신경망이 정확한 예측을 만들기 때문에, 범주가 유용하게 표현되도록 임베딩이 훈련되는 경향이 있는데, 이를 표현 학습(representation learning)이라고 부릅니다.
임베딩은 종종 다른 작업에도 성공적으로 대사용될 수 있습니다. 대표적인 예로 단어 임베딩(word embedding(즉, 개별 단어의 임베딩))입니다. 자연어 처리 작업을 수행할 때 사전훈련된 임베딩을 재사용하는 것이 나은 경우가 많습니다.
벡터를 사용해 단어를 표현하는 아이디어를 이용해 주어진 단어의 근처 단어를 예측하도록 신경망을 훈련할 수 있습니다. 가령, 비슷한 말은 임베딩이 매우 비슷합니다. France, Spain, Italy와 같이 의미가 관련된 단어는 함께 군집을 이룹니다.
단어 임베딩에서 근접한 정도만 얻는 것이 아니라 어떤 의미가 있는 축을 따라 임베딩 공간안에서 조직됩니다. 만약 King-Man+Woman을 계산하면 (즉 이 단어의 임베딩 벡터를 더하고 빼면) 결과값은 Queen 단어의 임베딩과 매우 가까이 있을 것입니다. 달리 말하면, 이 단어 임베딩이 성별의 개념을 인코딩했습니다.
임베딩을 직접 구현해보겠습니다. 각 범주의 임베딩을 담은 임베딩 행렬(embedding matrix)를 만들어 랜덤하게 초기화합니다. 이 행렬은 (범주+oov 버킷의 수)만큼 행이 있고, 임베딩 차원만큼 열을 가집니다.
embedding_dim =2
embed_init = tf.random.uniform([len(vocab)+num_oov_buckets, embedding_dim]) # 상수
embedding_matrixs = tf.Variable(embed_init) # 변수
'''
<tf.Variable 'Variable:0' shape=(7, 2) dtype=float32, numpy=
array([[0.72614944, 0.6113223 ],
[0.9102359 , 0.46622014],
[0.10637951, 0.6561878 ],
[0.5362897 , 0.1684525 ],
[0.01104009, 0.1858697 ],
[0.23930717, 0.333506 ],
[0.6299019 , 0.12305474]], dtype=float32)>
'''
일반적으로 임베딩은 작업과 어휘 사전의 크기에 따라 100~300 차원을 가집니다. 이 값은 튜닝할 하이퍼파라미터값입니다. embedding_matrix는 변수이므로 경사하강법으로 학습할 수 있습니다.
categories = tf.constant(['NEAR BAY', 'DESERT', 'INLAND', 'INLAND'])
cat_indices = table.lookup(categories)
# <tf.Tensor: shape=(4,), dtype=int64, numpy=array([0, 5, 2, 2])>
tf.nn.embedding_lookup(embedding_matrixs, cat_indices)
'''
<tf.Tensor: shape=(4, 2), dtype=float32, numpy=
array([[0.72614944, 0.6113223 ],
[0.23930717, 0.333506 ],
[0.10637951, 0.6561878 ],
[0.10637951, 0.6561878 ]], dtype=float32)>
'''
tf.nn.embedding_lookup() 함수는 임베딩 행렬에서 주어진 인덱스의 행을 찾습니다. 반환 결과의 3, 4번째 행이 같음을 알 수 있습니다.
케라스는 임베딩 행렬을 처리해주는 keras.layers.Embedding 층을 제공합니다. 이 층이 생성될 때 임베딩 행렬을 랜덤하게 초기화하고 어떤 인덱스로 호출될 때 그 인덱스의 행을 반환합니다.
embedding = keras.layers.Embedding(input_dim=len(vocab)+num_oov_buckets,
output_dim = embedding_dim)
embedding(cat_indices)
'''
<tf.Tensor: shape=(4, 2), dtype=float32, numpy=
array([[-0.03652262, 0.00017315],
[ 0.03099396, -0.01420873],
[ 0.01779428, 0.0018383 ],
[ 0.01779428, 0.0018383 ]], dtype=float32)>
'''
범주형 특성을 처리하고 각 범주( + oov 버킷)마다 임베딩을 학습하는 케라스 모델을 만들 수 있습니다.
# 켈리포니아 주택의 숫자형 특성 입력
regular_inputs = keras.layers.Input(shape=[8])
# 켈리포니아 주택의 문자열 특성
categories = keras.layers.Input(shape=[], dtype=tf.string)
# lookup 테이블을 통해 카테고리를 인덱스로 변환
cat_indices = keras.layers.Lambda(lambda cats: table.lookup(cats))(categories)
# 샘플의 범주에 해당하는 임베딩 반환(임베딩이 6개의 범주를 가지고 2D임)
cat_embed = keras.layers.Embedding(input_dim=6, output_dim=2)(cat_indices)
# 입력을 합침
encoded_inputs = keras.layers.concatenate([regular_inputs, cat_embed])
# 여기서 부터 신경망을 추가하면 된다.
outputs = keras.layers.Dense(1)(encoded_inputs)
model = keras.models.Model(inputs=[regular_inputs, categories],
outputs=[outputs])
outputs는 어떤 신경망도 추가할 수 있지만, 여기서는 간단하게 완전 연결층 하나를 추가해서 케라스 모델을 만들었습니다.
keras.layers.TextVectorization 층을 사용할 수 있다면 adapt() 메서드를 호출하여 샘플 데이터에서 어휘사전을 추출합니다.(이 함수에서 룩업 테이블을 만들어줍니다.) 그 다음 이 층을 모델에 추가해 인덱스 룩업을 수행할 것입니다. (Lambda 층을 대신해서)
Note
원-핫 인코딩 + Dense(활성화함수 x, 편향 x) 보다는 Embedding 층이 더 적은 연산을 사용합니다. 앞의 경우는 Dense 층의 가중치 행렬이 임베딩 행렬의 역할을 수행합니다. 예를 들어 길이가 20인 원-핫 벡터와 10개의 유닛을 가진 Dense층을 사용하는 것은 결국 input_dim=20이고 output_dim=10인 Embedding 층을 사용하는 것과 같습니다.
따라서 Embedding 층의 임베딩 차원(output_dim)을 뒤따르는 층의 유닛 개수보다 많이 사용하는 것은 낭비입니다.??
케라스 전처리 층
텐서플로에서는 케라스 전처리 층을 제공하기 위해 노력하고 있습니다. 이런 전처리 층 중에서 두 개를 알아보았습니다. keras.layers.Normalization 층은 특성 표준화를 수행하고(Standardization 층과 동일), TextVectorization 층은 입력에 있는 각 단어를 어휘 사전에 있는 인덱서로 인코딩합니다.
두 경우 모두 층을 만들고 샘플 데이터로 adapt() 메서드를 호출한 다음 일반적인 층처럼 모델에 사용할 수 있습니다. 다른 전처리 층도 동일한 패턴을 따릅니다.
keras.layers.Discretization 층은 연속적인 데이터를 몇 개의 구간으로 나눠 그 구간을 원-핫 인코딩합니다. 예를 들면 가격 데이터를 높음, 중간, 낮음으로 나누고 각각을 [1, 0, 0], [0, 1, 0], [0, 0, 1]로 인코딩할 수 있습니다.
실제 모델의 전처리 층은 훈련하는 동안 동결되어서 층의 파라미터는 경사 하강법에 영향을 받지 않습니다.
PreprocessingStage 클래스를 사용해 여러 전처리 층을 연결할 수 있습니다. 아래 코드는 입력을 먼저 정규화하고 그 다음 이산화(discretization)하는 전처리 파이프라인을 만듭니다. 이 파이프라인을 adapt() 한 후 일반적인 층처럼 모델에 사용할 수 있습니다.
normalization = keras.layers.Normalization()
discretization = keras.layers.Discretization([...])
pipeline = keras.layers.PreprocessingStage([normalization, discretization])
pipeline.adapt(data_sample)
TextVectorization 층은 단어 카운트 벡터를 출력합니다. 어휘 사전에 ['and', 'basektball', 'more']가 있을 때, 'more and more'은 [1, 0, 2]로 매핑됩니다. 이는 순서를 무시해 BOW로 부릅니다. and의 경우 흔하게 등장하므로 대부분의 텍스트에서 큰 값을 가집니다. 따라서 단어 카운트에서 자주 등장하는 단어는 중요도를 줄이도록 정규화해야합니다. TF - IDF로 이를 정규화합니다. 예를 들어 'and', 'basketball', 'more'가 훈련 세트에서 각각 200, 10, 100번 등장하고 전체 샘플 개수가 500개라면 최종 백터는 [1*log(1+500/(1+200)), 0*log(1+500/(1+10)), 2*log(1+500/(1+100))]으로 계산되어 [1.25, 0., 3.57]이 됩니다.
TextVectorization 층에 TF-IDF을 추가하는 기능이 있습니다.
Note
keras.layers.PreprocessingLayer 층을 상속해 사용자 정의 전처리 층을 만들 수 있습니다. data_sample 매개 변수와 reset_state 매개변수를 받는 adapt() 메서드를 구현하면 됩니다. reset_state =True면 기존 상태를 초기화합니다.
TF 변환
전처리는 계산 비용이 크기 때문에 훈련 과정 전에 전처리를 수행하면 수행 속도를 크게 높일 수 있습니다. 즉, 데이터는 훈련 동안 에포크마다 처리되는 것이 아니라 샘플마다 한 번씩 전처리됩니다. 데이터가 RAM에 들어갈만큼 작다면, cache() 메서드를 이용할 수 있지만, 그렇지 않다면 Apache Beam, SPark 같은 도구를 사용하면 됩니다. 이 라이브러리들은 대규모 데이터에서 효율적인 데이터 처리 파이프라인을 수행할 수 있어서 훈련 전에 모든 훈련 데이터를 전처리할 수 있습니다.
아파치 빔은 다양한 데이터 처리 파이프라인 엔진을 단일 프로그래밍 모델로 구현한 것입니다. Spark, Flink, Dataflow 등을 지원합니다. Spark는 대용량 데이터 처리를 위한 오픈 소스 분산 클러스터 컴퓨팅 프레임워크입니다.
이런 방식은 훈련 속도를 높일 수 있지만, 배포시에 모델에 주입될 데이터의 전처리를 위한 코드를 앱에 추가해야합니다. 전처리 과정을 바꿀 때마다, 아파치 빔과 배포시 추가한 전처리 층의 코드를 수정해야합니다. 훈련 전에 수행한 전처리 연산과 앱, 브라우저에서 수행하는 전처리가 차이가 날 수 있습니다. 이런 훈련/서빙 왜곡(training/serving skew)은 버그나 성능 감소로 이어질 수 있습니다.
전처리 연산을 딱 한 번만 정의하려면 TF 변환을 사용하면 됩니다. TF 변환은 텐서플로 모델 상품화를 위해 end-to-end 플랫폼인 TFX(TensorFlow Extended)의 일부분입니다. TFX는 스케일링, 버킷 할당(bucketizing) 등과 같은 TF 변환 함수를 사용해 전처리 함수를 한 번만 정의하면 됩니다. 아래는 두 개의 특성을 전처리하는 함수입니다.
import tensorflow_transform as tft
def preprocess(inputs): # inputs = 입력 특성의 배치
median_age = inputs['housing_median_age']
ocean_proximity = inputs['ocean_proximity']
standardized_age = tft.scale_to_z_score(median_age)
ocean_proximity_id = tft.compute_and_apply_vocabulary(ocean_proximity)
return{
'standardized_median_age': standardized_age,
'ocean_proximity_id': ocean_proximity_id
}
preprocess() 함수를 아파치 빔을 사용해 전체 훈련 세트에 적용할 수 있습니다. (이를 위해 TF 변환은 아파치 빔 파이프라인에서 사용할 수 있는 AnalyzeAndTransformDataset 클래스를 제공합니다.) 이 과정에서 전체 훈련 세트에 대해서 필요한 모든 통계를 계산합니다. 이 예에서는 housing_median_age 특성의 평균, 표준편차를 계산하고, ocean_proximity 특성의 어휘 사전을 계산합니다. 이런 통계를 계산하는 컴포넌트를 Analyzer라고 부릅니다.
TF 변환은 배포할 모델에 추가할 수 있도록 동일한 역할의 텐서플로 함수를 생성합니다. 이 TF 함수는 아파치 빔에서 계산한 모든 통계(평균, 표준편차, 어휘 사전)에 해당하는 상수를 가지고 있습니다.
데이터 API, TFRecord, 케라스 전처리 층, TF 변환을 이용해 확장성 좋은 입력 파이프라인을 구축하고 빠르고 이식성 좋은 데이터 전처리를 손쉽게 수행할 수 있습니다.
표준 데이터셋을 사용하고 싶다면 TFDS를 이용하면됩니다.
텐서플로 데이터셋(TFDS) 프로젝트
텐서플로 데이터셋(https://tensorflow.org/datasets)을 사용하면 널리 사용하는 데이터셋을 손쉽게 다운로드할 수 있습니다. MNIST나 패션 MNIST 부터 이미지넷 같은 커다란 데이터셋까지 제공합니다. 텍스트, 오디오, 비디오 데이터셋도 포함됩니다. https://homl.info/tfds에서 각 데이터셋에 대한 설명과 전체 리스트를 볼 수 있습니다.
pip을 이용해 tensorflow-datasets 라이브러리를 설치합니다.
!pip install tensorflow-datasets
tfds.load() 함수를 호출하면 원하는 데이터를 다운로드하고 데이터 셋의 딕셔너리(훈련용, 테스트용)로 데이터를 반환합니다.
import tensorflow_datasets as tfds
dataset = tfds.load(name='mnist')
mnist_train, mnist_test = dataset['train'], dataset['test']
그 다음 원하는 변환(셔플링, 배치 나누기, 프리페칭 등)을 적용한 다음 모델을 훈련하기 위한 준비를 마칩니다. 다음은 예시 코드입니다.
mnist_train = mnist_train.shuffle(10000).batch(32).prefetch(1)
for item in mnist_train:
images = item['image']
labels = item['label']
[...]
Tip
load() 함수에 shuffle_files = True로 지정하면 다운로드한 파일을 섞을 수 있습니다. 하지만, 이 셔플링 충분하지 않아서 더 섞는 것이 좋습니다. load() 함수는 다운로드한 데이터셋을 ~/tensorflow_datasets/mnist 디렉터리 아래에 샤드 여러개로 나누어 저장합니다. load() 함수의 shuffle_files 매개변수의 기본값은 False입니다. 훈련 데이터와 테스트 데이터의 샤드 개수는 데이터 셋마다 다른데, MNIST 데이터셋의 경우 훈련 데이터는 10개의 샤드, 테스트 세트는 1개의 샤드를 사용합니다.
데이터셋에 있는 item은 특성과 레이블을 담은 딕셔너리인데, 케라스는 두 원소를 담은 튜플 (특성, 레이블)을 원합니다. map() 메서드를 사용해 데이터셋을 아래와 같이 변환할 수 있습니다.
mnist_train = mnist_train.shuffle(10000).batch(32)
mnist_train = mnist_train.map(lambda items: (items['image'], items['label']))
mnist_train = mnist_train.prefetch(1)
이 방법보다 load() 함수에 as_supervised=True로 지정하는 것이 더 간단합니다. (당연히 이는 레이블된 데이터셋에만 적용할 수 있습니다.) 또한 원하는 배치 크기를 지정할 수도 있습니다.
tf.keras 모델에 이 데이터셋을 바로 전달할 수 있습니다. in_memory = True로 설정해 데이터셋을 메모리로 적재하여 처리 속도를 높일 수 있습니다.
dataset = tfds.load(name='mnist', batch_size=32, as_supervised=True)
mnist_train = dataset['train'].prefetch(1)
model = keras.models.Sequential([...])
model.compile(loss='sparse_categirical_crossentropy', optimizer='sgd')
model.fit(mnist_train, epochs=5)
딥러닝은 대량의 데이터를 다루는 경우가 많고 이런 데이터를 효율적으로 적재, 파싱, 전처리하는 것이 아주 중요합니다.
모르는 부분: save_to_multiple_csv_files 에서 repr(), np.c_, print(end파라미터), interleave(),
interleave 다시 복습, ?있는 부분 다시 공부
출처: 핸즈온 머신러닝 2판
사진출처:
핸즈온 머신러닝 2판
'핸즈온 머신러닝 2판' 카테고리의 다른 글
15장 RNN과 CNN을 사용해 시퀀스 처리하기 (0) | 2022.01.21 |
---|---|
14장 합성곱 신경망을 사용한 컴퓨터 비전 (0) | 2022.01.15 |
12장 텐서플로를 사용한 사용자 정의 모델과 훈련 (0) | 2022.01.04 |
11장 심층 신경망 훈련하기 (0) | 2021.12.30 |
인공 신경망 소개 (0) | 2021.12.23 |