2023년 3월 26일 일요일

Pytorch forecasting 사용법

위치: D:\2023\TemporalFusionTransformer

설치 방법 (가상환경 torch310) 

pip install pytorch-lightning
pip install pytorch_forecasting
# version < 2.0 for torch gpu version
# 2023년 3월. torch 2.0을 지원하지 않아, gpu버전 사용을 위해 따로 torch 설치
# numpy버전 맞지 않아 재 설치로 오류 해결  
pip install torch==1.13.1+cu117 torchvision==0.14.1+cu117 torchaudio==0.13.1
--extra-index-url https://download.pytorch.org/whl/cu117
pip install numpy<1.24  # Solve version collision


import numpy as np
import pandas as pd
from pytorch_forecasting import Baseline, TemporalFusionTransformer, TimeSeriesDataSet

sample_data = pd.DataFrame(
    dict(
        time_idx=np.tile(np.arange(6), 3),
        target=np.array([0,1,2,3,4,5,20,21,22,23,24,25,40,41,42,43,44,45]),
        group=np.repeat(np.arange(3), 6),
        holidays = np.tile(['X','Black Friday', 'X','Christmas','X', 'X'],3),
    )
)
sample_data

# group이 3개(0, 1, 2), holidays는 각 group에 대해 일정
# time_index는 첫 col에 주어짐.
time_idx    target  group   holidays
0   0   0   0   X
1   1   1   0   Black Friday
2   2   2   0   X
3   3   3   0   Christmas
4   4   4   0   X
5   5   5   0   X
6   0   20  1   X
7   1   21  1   Black Friday
8   2   22  1   X
9   3   23  1   Christmas
10  4   24  1   X
11  5   25  1   X
12  0   40  2   X
13  1   41  2   Black Friday
14  2   42  2   X
15  3   43  2   Christmas
16  4   44  2   X
17  5   45  2   X
#----------------------------------------------------

# create the time-series dataset from the pandas df
dataset = TimeSeriesDataSet(
    sample_data,
    group_ids=["group"],
    target="target",
    time_idx="time_idx",
    max_encoder_length=2,
    max_prediction_length=3,
    time_varying_unknown_reals=["target"],
    static_categoricals=["holidays"],
    target_normalizer=None
)

# pass the dataset to a dataloader
dataloader = dataset.to_dataloader(batch_size=1)

#load the first batch
x, y = next(iter(dataloader))
print(x['encoder_target'])
print(x['groups'])
print(x['decoder_target'])

# 2개의 값을 encoder로 이용. 3개의 값이 prediction
tensor([[21., 22.]])
tensor([[1]])
tensor([[23., 24., 25.]])
#----------------------------------------------------


가구별 전력량 예측용 데이터의 경우

max_prediction_length = 24  # 이전 7일을 보고 1일 후를 예측
max_encoder_length = 7*24
# 마지막 하루(24시간) 빼고 학습
training_cutoff = time_df["hours_from_start"].max() - max_prediction_length

training = TimeSeriesDataSet(
    time_df[lambda x: x.hours_from_start <= training_cutoff],
    time_idx="hours_from_start",
    target="power_usage",
    group_ids=["consumer_id"],  # group이 여러개
    min_encoder_length=max_encoder_length // 2,
    max_encoder_length=max_encoder_length,
    min_prediction_length=1,
    max_prediction_length=max_prediction_length,
    static_categoricals=["consumer_id"],  # 고객 id는 불변
    time_varying_known_reals=["hours_from_start","day","day_of_week", "month", 'hour'],
    time_varying_unknown_reals=['power_usage'],
    # 정규화하기 전에 softplus변환 후에 정규화 실행(log/logp1/logit/relu등 있음)
    # 각 group별로 정규화. group이 여러개 있고, 크기 범위가 다르다.
    target_normalizer=GroupNormalizer(
        groups=["consumer_id"], transformation="softplus"
    ),  # we normalize by group
    add_relative_time_idx=True,
    add_target_scales=True,
    add_encoder_length=True,
)

validation = TimeSeriesDataSet.from_dataset(training, time_df,
predict=True, stop_randomization=True)

# create dataloaders for  our model
batch_size = 64
# to_dataloader를 통해, torch의 dataloader처럼 동작함
# if you have a strong GPU, feel free to increase the number of workers  
train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=0)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=0)



[References]

1. Medium blog TFT: https://towardsdatascience.com/temporal-fusion-transformer-time-series-forecasting-with-deep-learning-complete-tutorial-d32c1e51cd91

2. [2023년 4월]

-XGBoost, LightGBM: https://www.youtube.com/watch?v=4Jz4_IOgS4c

-WRN 코드: https://github.com/creinders/ChimeraMix/tree/main/models

-데이터 분석-클리닝: https://double-d.tistory.com/m/14

-데이터 정제와 정규화-사이킷런 기초: https://cyan91.tistory.com/m/40

-Data cleaning in 5 easy steps+Examples: 

https://www.iteratorshq.com/blog/data-cleaning-in-5-easy-steps/

-우리가 pytorch lightning을 써야 하는 이유: 

https://baeseongsu.github.io/posts/pytorch-lightning-introduction/

-트랜스포머 이해 굿: https://www.youtube.com/watch?v=AA621UofTUA

-OpenRefine 툴: https://www.youtube.com/watch?v=nORS7STbLyk / https://www.youtube.com/watch?v=oRH-1RG8oQY

-TFT 적용 예제: https://github.com/IKKIM00/stock-and-pm2.5-prediction-using-TFT / https://dacon.io/competitions/official/235736/data


Pandas 사용법

 위치: D:\2023\TemporalFusionTransformer

# index_col: 원하는 col을 index로 지정하여 불러오기(첫 col을 index로)
data = pd.read_csv('LD2011_2014.txt', index_col=0, sep=';', decimal=',')
# series나 df를 datetime객체로 변환:
#             따라서 여기서는 index이면서, datetime임(2가지 속성)
data.index = pd.to_datetime(data.index)
data.sort_index(inplace=True)
data.head(5)


# resampling후 np.nan값은 0.으로.
data = data.resample('1h').mean().replace(0., np.nan)
earliest_time = data.index.min()
df=data[['MT_002', 'MT_004', 'MT_005', 'MT_006', 'MT_008' ]]

df_list = []
for label in df:
    # (1) index는 MT_002 col추출 시 자동으로 함께 추출(index이므로)
    # (2) index는 datetime이므로 date, time도 추출 가능
    #     ts.index.date(날짜), ts.index.time(시간) 등.
    ts = df[label]

    # ffill(front fill: 앞값으로 채움), bfill(back fill: 뒷값으로 채움)
    start_date = min(ts.fillna(method='ffill').dropna().index)
    end_date = max(ts.fillna(method='bfill').dropna().index)

    # 필요한 data 부분을 slicing하기 위해 activae_range영역을 만듬.
    # True가 되는 부분만 slicing됨. Na, NaN이 아닌 값만 추출됨
    active_range = (ts.index >= start_date) & (ts.index <= end_date)
    ts = ts[active_range].fillna(0.)

    tmp = pd.DataFrame({'power_usage': ts})
    date = tmp.index

    # 전력 사용량은 시간 factor가 중요함
    tmp['hours_from_start'] = (date - earliest_time).seconds / 60 / 60 + (date - earliest_time).days * 24
    tmp['hours_from_start'] = tmp['hours_from_start'].astype('int')
    tmp['days_from_start'] = (date - earliest_time).days
    tmp['date'] = date
    tmp['consumer_id'] = label
    tmp['hour'] = date.hour
    tmp['day'] = date.day
    tmp['day_of_week'] = date.dayofweek
    tmp['month'] = date.month

    #stack all time series vertically
    df_list.append(tmp)

time_df = pd.concat(df_list).reset_index(drop=True)

# match results in the original paper
time_df = time_df[(time_df['days_from_start'] >= 1096)
                & (time_df['days_from_start'] < 1346)].copy()


# building cluster based on kmeans
CLUSTER = {
    0: [19, 20, 21, 49, 50, 51],
    1: [1, 5, 9, 34],
    2: [4, 10, 11, 12, 28, 29, 30, 36, 40, 41, 42, 59, 60],
    3: [2, 3, 6, 7, 8, 13, 14, 15, 16, 17, 18, 22, 23, 24, 25, 26, 27, 31, 32, 33, 35, 37, 38, 39, 43, 44, 45, 46, 47, 48, 52, 53, 54, 55, 56, 57, 58],
}

# assing cluster number to building
for k, nums in CLUSTER.items():
    # df.num.isin(nums): 현재 df.num값이 [19,20,21,...]에 있으면 T, else F.
    # df.loc[~ ~, 'cluster']: 행 index위치에 T, F를 넣으면 T인 경우만 선택됨
    df.loc[df.num.isin(nums), 'cluster'] = k


# FEATURE: `hot` flag when the next day is holiday
# shift(-1): 위로 한칸 밀기. fillna(0): Na, NaN은 0으로.
hot = df.groupby('date').first()['holiday'].shift(-1).fillna(0).astype(int)
#hot = hot.to_frame().reset_index().rename({'holiday': "hot"}, axis=1)
#df = df.merge(hot, on='date', how='left')

# (1) 앞연산으로 동일값이 반복되는 row가 많아 나오면
# (2) first()에 의해 첫번째 row 만 추출
df.groupby('date').first()  


hot = df.groupby('date').first()['holiday'].shift(-1).fillna(0).astype(int)
# to_frame(): series를 df로, reset_index: 맨 앞에 순서 0,1,2,...를 index로 붙여줌
hot = hot.to_frame().reset_index().rename({'holiday': "hot"}, axis=1)


[Reference] 



2023년 3월 7일 화요일

BentoML 사용법 요약

> D:\2023\poop_wrn로 이동


bentoml serve service:svc --port 3010 --reload   # bentoml 서비스 실행
locust --headless -u 100 -r 1000 --run-time 1m
--host http://127.0.0.1:3010  # stress test, locustfile.py필요
bentoml build    # bentofile.yaml 필요
bentoml list   # 생성된 bento 확인
bentoml serve poopup_demo:latest
--production --port 3010   # nox39에서 실패 후, 관리자 모드로 다시 창 열어 성공
bentoml containerize poopup_demo:latest  # docker image 생성
docker run -it --rm -p 3015:3000 --gpus all poopup_demo:ov37ue65xqb4
serve --production  # docker 서비스 시작
docker save -o poopup.tar poopup_demo:~~  # docker image의 hdd저장
docker load -i poopup.tar  # docker image의 재 로드

도커 실행 명령

> docker inspect <container-id>
...
"MergedDir":~,
"UpperDir":~,  # 여기 경로에 jpg파일이 저장
"WorkDir":~
...
# docker 실행(container 생성) 시,
> docker run -it --rm -p 30xx:3000 poopup_demo:kb2~~ serve --production
> docker exec -it <container-id> /bin/bash
>> cd src  # 여기에서 jpg파일 확인 가능


1. windows에서는 set 명령으로 환경변수를 볼 수 있으며, BENTOML_HOME=d:\2023으로 설정된 것 확인 가능

> set 

2. DNN학습->Bento 생성->Docker 이미지 생성의 순으로 진행됨. 

> bentoml build  # Bento파일 생성

> bentoml containerize <bentofile:tag>

3. Bento파일 생성 시에, bentofile.yaml 설정파일 사용. Custom docker를 위해서 bentofile.yaml에 지정된 Dockerfile.template 사용

{% extends bento_base_template %}
{% block SETUP_BENTO_BASE_IMAGE %}
{{ super() }}
{% endblock %}

{% set bento__user = "poop" %}
{% set bento__home = "/home/" ~  bento__user %}
{% set bento__path = "/home/" ~ bento__user ~ "/bento" %}
{% set bento__uid_gid = 1000 %}  

{% block SETUP_BENTO_COMPONENTS %}
{{ super() }}
{% endblock %}

4. 생성된 Bento파일(BENTNML_HOME 아래)의 해당 경로에 만들어진 Dockerfile을 봐서 Dockerfile.template 적용 확인 

5. Bento 파일을 이용하여 Docker 이미지 생성

> bentoml containerize poopup_demo:latest

6. 컨테이너 실행

> docker run --user poop -it --rm -p 3010:3000 --gpus all poopup_demo:leo2pupckkeizqb4 serve --production

# --------------- service.py -------------------------------------------
from __future__ import annotations
import typing as t
from typing import TYPE_CHECKING
from torchvision import transforms
import numpy as np
from PIL.Image import Image as PILImage

import bentoml
from bentoml.io import Image
from bentoml.io import NumpyNdarray


if TYPE_CHECKING:
    from numpy.typing import NDArray

poop_runner = bentoml.models.get("poopup:latest").to_runner()
svc = bentoml.Service(name="poopup_demo", runners=[poop_runner])

xforms = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

def to_numpy(tensor):
    return tensor.detach().cpu().numpy()


@svc.api(input=Image(), output=NumpyNdarray(dtype="int64"))
async def predict_image(f: PILImage) -> "np.ndarray[t.Any, np.dtype[t.Any]]":
    assert isinstance(f, PILImage)

    img = np.expand_dims(xforms(f), 0)
    output_tensor = await poop_runner.async_run(img)
    # print(output_tensor, type(output_tensor))
    return output_tensor.detach().cpu().numpy()