Friday, December 1, 2023

Using TimescaleDB

1. Write the docker compose file

version: "3.8"

services:
  timescaledb:
    image: timescale/timescaledb:latest-pg14
    container_name: timescale
    hostname: timescaledb
    restart: always
    ports:
      - ${TIMESCALEDB_PORT}:5432
    volumes:
      - ./${TIMESCALEDB_DATA_STORE}:/var/lib/postgresql/data
    environment:
      POSTGRES_PASSWORD: ${TIMESCALEDB_PASSWORD}
      POSTGRES_USER: ${TIMESCALEDB_USER}
      POSTGRES_DB: ${TIMESCALEDB_DB}
  adminer:
    image: adminer:4.8.1
    container_name: adminer
    restart: always
    ports:
      - ${ADMINER_PORT}:8080

2. Write a .env file in the same folder

# timescaledb
TIMESCALEDB_PORT=5432
TIMESCALEDB_DATA_STORE=timescaledb/
TIMESCALEDB_PASSWORD=timescaledb
TIMESCALEDB_USER=timescale
TIMESCALEDB_DB=timescale_database

# adminer
ADMINER_PORT=8087

3. Run docker compose up
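
For reference, run it in the folder that holds both files (docker compose substitutes the ${...} variables from .env automatically; -d, detached mode, is optional):

> docker compose up -d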

4. Verify the connection with Python code

import psycopg2
from pgcopy import CopyManager
# Structure of the connection string:
# "postgres://username:password@host:port/dbname"
CONNECTION = "postgres://timescale:timescaledb@localhost:5432/timescale_database"
conn = psycopg2.connect(CONNECTION)
cursor = conn.cursor()
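
# NOTE (assumption): the COPY below needs a 'sensor_data' table to exist already.
# A minimal sketch that creates it as a hypertable first:
cursor.execute("""CREATE TABLE IF NOT EXISTS sensor_data (
                      time TIMESTAMPTZ NOT NULL,
                      sensor_id INTEGER,
                      temperature DOUBLE PRECISION,
                      cpu DOUBLE PRECISION);""")
cursor.execute("SELECT create_hypertable('sensor_data', 'time', if_not_exists => TRUE);")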

for sensor_id in range(1, 4):
    data = (sensor_id,)
    # create random data
    simulate_query = """SELECT generate_series(now() - interval '24 hour', now(), interval '5 minute') AS time,
                        %s AS sensor_id,
                        random()*100 AS temperature,
                        random() AS cpu
                     """
    cursor.execute(simulate_query, data)
    values = cursor.fetchall()
    # column names of the table you're inserting into
    cols = ['time', 'sensor_id', 'temperature', 'cpu']
    # create copy manager with the target table and insert
    mgr = CopyManager(conn, 'sensor_data', cols)
    mgr.copy(values)

conn.commit()

5. Open Chrome, go to localhost:8087 (Adminer), and check whether the table was created

6. Run docker container ls --all and confirm the containers below

2b2340a3b640   timescale/timescaledb:latest-pg14   "docker-entrypoint.s…"   7 days ago   Up 7 days   0.0.0.0:5432->5432/tcp   timescale
bc41c6ad6171   adminer:4.8.1                       "entrypoint.sh php -…"   7 days ago   Up 7 days   0.0.0.0:8087->8080/tcp   adminer


[References]

1. Postgres for time series data, Medium

Tuesday, May 30, 2023

Training and testing YOLOv8

# Example of training and testing after installing YOLOv8


# Path: g:\2023
> yolo task=detect mode=train model=yolov8m.pt imgsz=1280 data=fire2023_1.yaml epochs=50 batch=16 name=yolov8m_v8_50e
> yolo predict model=best.pt source=parking_lot5.mp4  # test
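
The same run can also be driven from Python via the ultralytics package; a minimal sketch (paths and names taken from the commands above; the runs/ path assumes the default output layout):

from ultralytics import YOLO

model = YOLO('yolov8m.pt')  # pretrained detection weights
model.train(data='fire2023_1.yaml', imgsz=1280, epochs=50, batch=16, name='yolov8m_v8_50e')

best = YOLO('runs/detect/yolov8m_v8_50e/weights/best.pt')  # default save location (assumed)
best.predict(source='parking_lot5.mp4')  # test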

Tuesday, May 9, 2023

When pip is broken

(1) When pip is broken, run from the base environment:

conda install --force-reinstall pip


(2) For the error below:

ERROR: Could not install packages due to an OSError: [Errno 2] No such file or directory: 

> pip3 install --upgrade --user pip

Tuesday, May 2, 2023

Python patterns

Injecting objects and running them all at once

from abc import ABC, abstractmethod
from typing import List


# Base class for creating ts analytic items
# classmethod is similar to staticmethod, except that a staticmethod cannot
# access class variables while a classmethod can, through cls.
class ItemBase(ABC):
    @classmethod  # works with variables at the class level, not the instance level
    def add_instance(cls, ins):  # receives the class itself as cls
        cls._items.append(ins)  # (cls plays the role that self plays for instances)
        print(cls._items, len(cls._items))
       
    @abstractmethod
    def run(self, *args):
        pass
   
   
# Derived class for outlier detection
# Merely creating an instance registers it in the object list automatically
class OutlierDetector(ItemBase):
    _items = []  # if this lived in ItemBase, it would hold every subclass's instances;
    def __init__(self, *args):  # defined here, it holds only this class's instances
        print(args)
        self.add_instance(self)
       
    def run(self, *args):
        print(args)
       

# Derived class for forecasting time series
class Forecastor(ItemBase):
    _items = []
    def __init__(self, *args):
        super().__init__()
        print(args)
        self.add_instance(self)
       
    def run(self, *args):
        print(args)

       
# Collect multiple items and process them at once
class ProcessItems(object):
    def __init__(self, items: List = None) -> None:
        # avoid a mutable default argument; default to an empty list
        self.items = items if items is not None else []

    def process(self) -> List:
        res = []
        for item in self.items:
            res.append(item.run())
        return res
           
       
a3 = ProcessItems([OutlierDetector(), Forecastor()])
res = a3.process()
print(res)
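
Because each subclass defines its own _items, add_instance resolves cls._items per class, so every class keeps a separate registry; a quick check against the run above:

print(len(OutlierDetector._items), len(Forecastor._items))  # -> 1 1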


Applied example

from abc import ABC, abstractmethod
import torch


# Base class for creating ts (time series) analytic items
class ItemBase(ABC):
    _modes = ['start', 'stable', 'finish']
   
    @abstractmethod
    def run(self, *args):
        pass
   
    # cls gives access to class variables (the difference from staticmethod)
    @classmethod
    def _load_weight(cls, items):
        cls.models = {}
        for key, weight in items.items():
            if key not in cls._modes:
                assert False, 'Mode Error!!'
            # cls.models[key] = cls.dnn_model(weight)  
            cls.models[key] = None    
   
# Child class for forecasting time series
class Forecastor(ItemBase):
    def __init__(self, items):
        # self.dnn_model = dnn_model()
        self._load_weight(items)
       
    def run(self, *args):
        batch, mode = args
        # outs = self.models[mode].predict(batch)
        print(batch.shape)
        return batch.shape

# Child class for outlier detection
class AnomalyDetector(ItemBase):
    def __init__(self, items):
        # self.dnn_model = dnn_model()
        self._load_weight(items)
       
    def run(self, *args):
        batch, mode = args
        print(batch.shape)
        return batch.shape

# Main processor to run all items
class Processor(object):
    def __init__(self, items):
        self.items = items
       
    def process(self, ts):
        res = []
        # mode = ts_decision(ts)  # decide start / stable / finish phase
        mode = 'start'
        for item in self.items:
            outs = item.run(ts, mode)
            res.append(outs)
        return res


# There are two local processes to handle time series data
forecast = Forecastor({'start':'m1.pt', 'stable': 'm2.pt', 'finish': 'm3.pt'})
anodetec = AnomalyDetector({'start':'m1.pt', 'stable': 'm2.pt', 'finish': 'm3.pt'})
batch = torch.rand(8,32,16)

# If you need to add another process, first define and simply add it to arg list
a1 = Processor([forecast, anodetec])
a1.process(batch)
# prints from each item's run():
#   torch.Size([8, 32, 16])
#   torch.Size([8, 32, 16])
# returned:
# [torch.Size([8, 32, 16]), torch.Size([8, 32, 16])]

from abc import ABC
from typing import Dict

def _gt(x, v): return x >= v              # _gt = lambda x, v: x >= v
def _lt(x, v): return x <= v              # _lt = lambda x, v: x <= v
def _eq(x, v): return x == v              # _eq = lambda x, v: x == v
def _in(x, v1, v2): return v1 <= x <= v2  # _in = lambda x, v1, v2: v1 <= x <= v2

op1 = {}
op1['s1'] = _gt
_gt(2, 1), _lt(2, 1), _in(2, 3, 4), op1['s1'](2, 1)
# -> (True, False, False, True)


# Base class for creating ts (time series) analytic items
class ItemBase(ABC):
    @classmethod
    def add(cls, ins):
        cls._items[ins._name] = ins
       
    @classmethod
    def get_items(cls):
        return list(cls._items.keys())
   
    @classmethod
    def get_conditions(cls):
        items = []
        for k, v in cls._items.items():
            iks = [(ik, str(iv[0]).split()[1]) for ik,iv in v.terms.items()]
            items.append({k: iks})
        return items
   
    def _save_items(self, items):
        if not isinstance(items, dict):
            return None
        for k, v in items.items():
            try:
                self.terms[k] = v
            except Exception:
                return None
   
    def refresh(self, ins):
        keys = self.terms.keys()
        satisfied = {}
        for k, v in ins.items():
            if k in keys:
                try:
                    check = self.terms[k][0](*v)
                except Exception:
                    satisfied[k] = -1
                else:
                    self.terms[k][1] = check
                    satisfied[k] = check
        return satisfied    

class Derivated(ItemBase):
    class NoneDict(Dict):
        def __getitem__(self, key):
            return dict.get(self, key)
       
    _items = NoneDict()
   
    def __init__(self, _name, items):
        self.terms = {}
        self._name = _name
        self._save_items(items)


Derivated.add(Derivated('NOx', {'op1':[_gt, False], 'op2': [_lt, False], 'op3': [_in, False]}))
Derivated.add(Derivated('O2', {'op4':[_lt, False], 'op5': [_in, False]}))

if Derivated._items['NOx']:
    print(Derivated._items['NOx'].refresh({'op1': (1,2), 'op2': (3,4), 'op3': (4,1,5)}))
if Derivated._items['O2']:
    print(Derivated._items['O2'].refresh({'op4': (1,2), 'op2': (3,4), 'op3': (4,1,5)}))
# -> {'op1': False, 'op2': True, 'op3': True}
# -> {'op4': True}


Derivated.add(Derivated('NOxIn', {'op1':[_gt, False], 'op2': [_lt, False], 'op3': [_in, False]}))
Derivated.add(Derivated('O2out', {'op4':[_lt, False], 'op5': [_in, False]}))
Derivated.get_items()
# -> ['NOx', 'O2', 'NOxIn', 'O2out']


Derivated.get_conditions()
# -> [{'NOx': [('op1', '_gt'), ('op2', '_lt'), ('op3', '_in')]},
#     {'O2': [('op4', '_lt'), ('op5', '_in')]},
#     {'NOxIn': [('op1', '_gt'), ('op2', '_lt'), ('op3', '_in')]},
#     {'O2out': [('op4', '_lt'), ('op5', '_in')]}]



[References]

1. D:\2022\Pattern_Test


 

Sunday, March 26, 2023

How to use PyTorch Forecasting

Location: D:\2023\TemporalFusionTransformer

Installation (virtual environment torch310)

pip install pytorch-lightning
pip install pytorch_forecasting
# version < 2.0 for the torch gpu build
# As of March 2023, torch 2.0 is not supported yet, so torch is installed separately to get the GPU build.
# A numpy version conflict was resolved by reinstalling numpy.
pip install torch==1.13.1+cu117 torchvision==0.14.1+cu117 torchaudio==0.13.1 --extra-index-url https://download.pytorch.org/whl/cu117
pip install "numpy<1.24"  # solve the version collision (quotes keep the shell from reading < as a redirect)
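
A quick sanity check that the GPU build was actually picked up after the reinstall:

import torch
print(torch.__version__)           # expected: 1.13.1+cu117
print(torch.cuda.is_available())   # True when the CUDA build is active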


import numpy as np
import pandas as pd
from pytorch_forecasting import Baseline, TemporalFusionTransformer, TimeSeriesDataSet

sample_data = pd.DataFrame(
    dict(
        time_idx=np.tile(np.arange(6), 3),
        target=np.array([0,1,2,3,4,5,20,21,22,23,24,25,40,41,42,43,44,45]),
        group=np.repeat(np.arange(3), 6),
        holidays = np.tile(['X','Black Friday', 'X','Christmas','X', 'X'],3),
    )
)
sample_data

# Three groups (0, 1, 2); the holidays pattern is the same for every group.
# The time index is given in the first column.
time_idx    target  group   holidays
0   0   0   0   X
1   1   1   0   Black Friday
2   2   2   0   X
3   3   3   0   Christmas
4   4   4   0   X
5   5   5   0   X
6   0   20  1   X
7   1   21  1   Black Friday
8   2   22  1   X
9   3   23  1   Christmas
10  4   24  1   X
11  5   25  1   X
12  0   40  2   X
13  1   41  2   Black Friday
14  2   42  2   X
15  3   43  2   Christmas
16  4   44  2   X
17  5   45  2   X
#----------------------------------------------------

# create the time-series dataset from the pandas df
dataset = TimeSeriesDataSet(
    sample_data,
    group_ids=["group"],
    target="target",
    time_idx="time_idx",
    max_encoder_length=2,
    max_prediction_length=3,
    time_varying_unknown_reals=["target"],
    static_categoricals=["holidays"],
    target_normalizer=None
)

# pass the dataset to a dataloader
dataloader = dataset.to_dataloader(batch_size=1)

# load the first batch
x, y = next(iter(dataloader))
print(x['encoder_target'])
print(x['groups'])
print(x['decoder_target'])

# Two values are consumed by the encoder; three values are the prediction
tensor([[21., 22.]])
tensor([[1]])
tensor([[23., 24., 25.]])
#----------------------------------------------------


For the household power consumption forecasting data:

max_prediction_length = 24  # predict 1 day ahead
max_encoder_length = 7*24   # from the previous 7 days
# train on everything except the last day (24 hours)
training_cutoff = time_df["hours_from_start"].max() - max_prediction_length

from pytorch_forecasting.data import GroupNormalizer  # used by target_normalizer below

training = TimeSeriesDataSet(
    time_df[lambda x: x.hours_from_start <= training_cutoff],
    time_idx="hours_from_start",
    target="power_usage",
    group_ids=["consumer_id"],  # there can be multiple groups
    min_encoder_length=max_encoder_length // 2,
    max_encoder_length=max_encoder_length,
    min_prediction_length=1,
    max_prediction_length=max_prediction_length,
    static_categoricals=["consumer_id"],  # the consumer id never changes
    time_varying_known_reals=["hours_from_start","day","day_of_week", "month", 'hour'],
    time_varying_unknown_reals=['power_usage'],
    # apply a softplus transform before normalizing (log/log1p/logit/relu are also available)
    # normalize per group: there are several groups and their value ranges differ
    target_normalizer=GroupNormalizer(
        groups=["consumer_id"], transformation="softplus"
    ),  # we normalize by group
    add_relative_time_idx=True,
    add_target_scales=True,
    add_encoder_length=True,
)

validation = TimeSeriesDataSet.from_dataset(training, time_df, predict=True, stop_randomization=True)

# create dataloaders for our model
batch_size = 64
# to_dataloader makes the dataset behave like a torch DataLoader
# if you have a strong GPU, feel free to increase the number of workers
train_dataloader = training.to_dataloader(train=True, batch_size=batch_size, num_workers=0)
val_dataloader = validation.to_dataloader(train=False, batch_size=batch_size * 10, num_workers=0)
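
From here the referenced tutorial builds and fits the TFT from the dataset. A minimal sketch, assuming pytorch_forecasting's from_dataset API and hyperparameters roughly in line with the Medium post (not tuned values):

import pytorch_lightning as pl
from pytorch_forecasting import TemporalFusionTransformer
from pytorch_forecasting.metrics import QuantileLoss

tft = TemporalFusionTransformer.from_dataset(
    training,                  # reuses encoder/decoder lengths, normalizer, etc.
    learning_rate=0.03,
    hidden_size=16,
    attention_head_size=4,
    dropout=0.1,
    hidden_continuous_size=8,
    loss=QuantileLoss(),
)
trainer = pl.Trainer(max_epochs=30, accelerator="gpu", devices=1)
trainer.fit(tft, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)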



[References]

1. Medium blog TFT: https://towardsdatascience.com/temporal-fusion-transformer-time-series-forecasting-with-deep-learning-complete-tutorial-d32c1e51cd91

2. [April 2023]

-XGBoost, LightGBM: https://www.youtube.com/watch?v=4Jz4_IOgS4c

-WRN code: https://github.com/creinders/ChimeraMix/tree/main/models

-Data analysis and cleaning: https://double-d.tistory.com/m/14

-Data cleaning and normalization, scikit-learn basics: https://cyan91.tistory.com/m/40

-Data cleaning in 5 easy steps+Examples: 

https://www.iteratorshq.com/blog/data-cleaning-in-5-easy-steps/

-Why we should use pytorch lightning:

https://baeseongsu.github.io/posts/pytorch-lightning-introduction/

-Good explanation of Transformers: https://www.youtube.com/watch?v=AA621UofTUA

-OpenRefine tool: https://www.youtube.com/watch?v=nORS7STbLyk / https://www.youtube.com/watch?v=oRH-1RG8oQY

-TFT application examples: https://github.com/IKKIM00/stock-and-pm2.5-prediction-using-TFT / https://dacon.io/competitions/official/235736/data


How to use Pandas

Location: D:\2023\TemporalFusionTransformer

# index_col: choose a column to use as the index when loading (here the first column)
data = pd.read_csv('LD2011_2014.txt', index_col=0, sep=';', decimal=',')
# convert a Series/df to datetime objects:
#             the index here is therefore both an index and a datetime (two roles)
data.index = pd.to_datetime(data.index)
data.sort_index(inplace=True)
data.head(5)


# after resampling to hourly means, replace 0. with np.nan
data = data.resample('1h').mean().replace(0., np.nan)
earliest_time = data.index.min()
df=data[['MT_002', 'MT_004', 'MT_005', 'MT_006', 'MT_008' ]]

df_list = []
for label in df:
    # (1) the index is pulled in automatically when a column such as MT_002 is extracted (it is the index)
    # (2) since the index is a datetime, date and time can also be extracted:
    #     ts.index.date (date), ts.index.time (time), etc.
    ts = df[label]

    # ffill (forward fill: repeat the previous value), bfill (backward fill: use the next value)
    start_date = min(ts.fillna(method='ffill').dropna().index)
    end_date = max(ts.fillna(method='bfill').dropna().index)

    # build an active_range mask to slice out the needed part of the data;
    # only the True span survives, i.e. the stretch whose ends are not NA/NaN
    active_range = (ts.index >= start_date) & (ts.index <= end_date)
    ts = ts[active_range].fillna(0.)

    tmp = pd.DataFrame({'power_usage': ts})
    date = tmp.index

    # time-of-day factors matter for power usage
    tmp['hours_from_start'] = (date - earliest_time).seconds / 60 / 60 + (date - earliest_time).days * 24
    tmp['hours_from_start'] = tmp['hours_from_start'].astype('int')
    tmp['days_from_start'] = (date - earliest_time).days
    tmp['date'] = date
    tmp['consumer_id'] = label
    tmp['hour'] = date.hour
    tmp['day'] = date.day
    tmp['day_of_week'] = date.dayofweek
    tmp['month'] = date.month

    # stack all time series vertically
    df_list.append(tmp)

time_df = pd.concat(df_list).reset_index(drop=True)

# match results in the original paper
time_df = time_df[(time_df['days_from_start'] >= 1096)
                & (time_df['days_from_start'] < 1346)].copy()


# clusters of buildings, based on k-means
CLUSTER = {
    0: [19, 20, 21, 49, 50, 51],
    1: [1, 5, 9, 34],
    2: [4, 10, 11, 12, 28, 29, 30, 36, 40, 41, 42, 59, 60],
    3: [2, 3, 6, 7, 8, 13, 14, 15, 16, 17, 18, 22, 23, 24, 25, 26, 27, 31, 32, 33, 35, 37, 38, 39, 43, 44, 45, 46, 47, 48, 52, 53, 54, 55, 56, 57, 58],
}

# assign a cluster number to each building
for k, nums in CLUSTER.items():
    # df.num.isin(nums): True where the row's df.num value is in [19, 20, 21, ...], else False
    # df.loc[mask, 'cluster']: a boolean mask in the row slot selects only the True rows
    df.loc[df.num.isin(nums), 'cluster'] = k


# FEATURE: `hot` flag when the next day is a holiday
# shift(-1): shift up one row. fillna(0): NA/NaN become 0.
hot = df.groupby('date').first()['holiday'].shift(-1).fillna(0).astype(int)
#hot = hot.to_frame().reset_index().rename({'holiday': "hot"}, axis=1)
#df = df.merge(hot, on='date', how='left')

# (1) the operation above leaves many rows repeating the same value per date,
# (2) so first() keeps only the first row of each group
df.groupby('date').first()


hot = df.groupby('date').first()['holiday'].shift(-1).fillna(0).astype(int)
# to_frame(): Series to df; reset_index(): prepend a 0,1,2,... positional index
hot = hot.to_frame().reset_index().rename({'holiday': "hot"}, axis=1)



Tuesday, March 7, 2023

BentoML usage summary

> Go to D:\2023\poop_wrn


bentoml serve service:svc --port 3010 --reload   # run the bentoml service
locust --headless -u 100 -r 1000 --run-time 1m --host http://127.0.0.1:3010  # stress test; requires locustfile.py
bentoml build    # requires bentofile.yaml
bentoml list   # check the created bentos
bentoml serve poopup_demo:latest --production --port 3010   # failed under nox39; succeeded after reopening the shell in administrator mode
bentoml containerize poopup_demo:latest  # build the docker image
docker run -it --rm -p 3015:3000 --gpus all poopup_demo:ov37ue65xqb4 serve --production  # start the docker service
docker save -o poopup.tar poopup_demo:~~  # save the docker image to disk
docker load -i poopup.tar  # reload the docker image

Docker run commands

> docker inspect <container-id>
...
"MergedDir":~,
"UpperDir":~,  # 여기 경로에 jpg파일이 저장
"WorkDir":~
...
# when running docker (creating the container):
> docker run -it --rm -p 30xx:3000 poopup_demo:kb2~~ serve --production
> docker exec -it <container-id> /bin/bash
>> cd src  # the jpg files can be checked here


1. On Windows, the set command lists environment variables; it confirms that BENTOML_HOME=d:\2023 is set.

> set 

2. The workflow runs in this order: train the DNN -> build the Bento -> build the Docker image.

> bentoml build  # build the Bento file

> bentoml containerize <bento:tag>

3. When building the Bento, the bentofile.yaml configuration file is used. For a custom docker image, the Dockerfile.template specified in bentofile.yaml is used; a sketch of the bentofile.yaml wiring and the template itself follow.
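
For reference, a minimal bentofile.yaml sketch that wires in the template (the service/include/package entries are assumptions for this project; docker.dockerfile_template is the BentoML 1.x key):

service: "service:svc"
include:
  - "*.py"
python:
  packages:
    - torch
    - torchvision
docker:
  dockerfile_template: ./Dockerfile.template

The Dockerfile.template it points to: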

{% extends bento_base_template %}
{% block SETUP_BENTO_BASE_IMAGE %}
{{ super() }}
{% endblock %}

{% set bento__user = "poop" %}
{% set bento__home = "/home/" ~  bento__user %}
{% set bento__path = "/home/" ~ bento__user ~ "/bento" %}
{% set bento__uid_gid = 1000 %}  

{% block SETUP_BENTO_COMPONENTS %}
{{ super() }}
{% endblock %}

4. Check the Dockerfile generated in the corresponding path of the created Bento (under BENTOML_HOME) to confirm that Dockerfile.template was applied.

5. Build the Docker image from the Bento file

> bentoml containerize poopup_demo:latest

6. Run the container

> docker run --user poop -it --rm -p 3010:3000 --gpus all poopup_demo:leo2pupckkeizqb4 serve --production

# --------------- service.py -------------------------------------------
from __future__ import annotations
import typing as t
from typing import TYPE_CHECKING
from torchvision import transforms
import numpy as np
from PIL.Image import Image as PILImage

import bentoml
from bentoml.io import Image
from bentoml.io import NumpyNdarray


if TYPE_CHECKING:
    from numpy.typing import NDArray

poop_runner = bentoml.models.get("poopup:latest").to_runner()
svc = bentoml.Service(name="poopup_demo", runners=[poop_runner])

xforms = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

def to_numpy(tensor):
    return tensor.detach().cpu().numpy()


@svc.api(input=Image(), output=NumpyNdarray(dtype="int64"))
async def predict_image(f: PILImage) -> "np.ndarray[t.Any, np.dtype[t.Any]]":
    assert isinstance(f, PILImage)

    img = np.expand_dims(xforms(f), 0)
    output_tensor = await poop_runner.async_run(img)
    # print(output_tensor, type(output_tensor))
    return to_numpy(output_tensor)
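
To smoke-test the running service, the API can be called with a plain HTTP POST (the endpoint path follows the api function name; test.jpg is a placeholder file):

import requests

with open("test.jpg", "rb") as f:
    resp = requests.post(
        "http://127.0.0.1:3010/predict_image",
        data=f.read(),
        headers={"Content-Type": "image/jpeg"},
    )
print(resp.status_code, resp.json())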