Developer MJ Story

LightGBM왜 써 뭐가 좋은데?? lgbm hyperparameter 어떻게 튜닝 방법(hyperopt) 본문

MachineLearning

LightGBM왜 써 뭐가 좋은데?? lgbm hyperparameter 어떻게 튜닝 방법(hyperopt)

집근처 2022. 9. 1. 10:07
반응형

왠만한 DeepLearning 방법들보다 성능이 잘나오는 LightGBM 학습 및 평가 방법에 대해 공유하겠습니다.

lgbm은 1) 빠른 학습 속도, 2) 구현량? 코딩 작음, 3) 성능 잘나옴. 등의 장점을 갖고 있습니다. kaggle에서는 lgbm을 최적화하여 baseline으로 많이 상용되고 있습니다. 내가 어떤 모델을 만들어서 학습 시켰는데, 성능 수준을 알고 싶을때 최소한 lgbm 보다 잘나와 한다... 이것도 쉽지 않습니다..ㅋㅋㅋㅋ

위에서 논의한 장점들이외에도 수많은 장점들이 있는데요. 그렇다고 단점이 없는 것도 아닙니다.

  1. hyperparameter가 많아 최적화 어렵다. 귀찮다!!!
    이건 다양한 hyperparameter optimization 방법을 이용해 극복가능하다. 필자는 hyperopt를 사용하여 최적화된 hyperparameter를 찾는다. 구현 코드는 아래에 있습니다.
  2. 데이터에 대한 의존도가 높다.
    데이터 정제 및 최적화(normalization, 추가 feature 등)에 따라 성능 차이가 크게 발생합니다. 그래서 어떤 normalization을 사용할지, 어떤 feature를 사용할지, 추가할 수 있는 feature는 없는지를 꼼꼼히 따져야 합니다.
  3. gpu 학습 최적화가 안되어 있다.
    필자가 gpu로 학습을 시도해 보았으나 학습 속도 개선을 얻지 못했다. 이는 학습 시키는 데이터셋의 특징에 따라 다를 것으로 예상되나, 제가 해본 여러 데이터셋에서 모두 속도 개선이 되지 않았다. 그래서 cpu core수에 학습 속도 영향을 많이 받는데… core 수가 적으면 학습 속도가 느립니다.

이제 lightGBM과 hyperopt를 이용한 hyperparameter 최적화 코드를 공유 하겠습니다. 저는 LightGBM의 classificaiton과 regression을 따로 class로 만들어서 사용중입니다. 그래서 두 클래스를 따로 공유 하겠습니다.

반응형

  먼저 regression class 입니다.

import numpy as np
from lightgbm import LGBMRegressor
from hyperopt import hp
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error, r2_score
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe


class LightGbmOptimizer:
    def __init__(self, cpu_counts):
        self.cpu_counts = cpu_counts
        self.params_list = {
            "boosting_type": ["gbdt", "rf", "dart"],
            "bagging_freq": np.arange(1, 12),
        }

    def get_hyper_parameter_space(self):
        return {
            "max_depth": hp.quniform("max_depth", -1, 128, 2),
            "boosting_type": hp.choice("boosting_type", self.params_list["boosting_type"]),
            "learning_rate": hp.uniform("learning_rate", 0, 0.1),
            "n_estimators": hp.quniform("n_estimators", 80, 200, 2),
            "bagging_fraction": hp.uniform("bagging_fraction", 0.5, 1),
            "bagging_freq": hp.choice("bagging_freq", self.params_list["bagging_freq"]),
            "feature_fraction": hp.uniform("feature_fraction", 0, 1),
            "max_bin": hp.quniform("max_bin", 128, 512, 8),
            "min_data_in_leaf": hp.quniform("min_data_in_leaf", 12, 32, 2),
        }

    def get_model(self, _params):
        return LGBMRegressor(
            max_depth=int(_params["max_depth"]),
            num_leaves=int((2 ** np.maximum(np.minimum(_params["max_depth"], 12), 2)) * 0.6),
            boosting_type=_params["boosting_type"],
            learning_rate=_params["learning_rate"],
            n_estimators=int(_params["n_estimators"]),
            bagging_fraction=_params["bagging_fraction"],
            bagging_freq=_params["bagging_freq"],
            feature_fraction=_params["feature_fraction"],
            max_bin=int(_params["max_bin"]),
            min_data_in_leaf=int(_params["min_data_in_leaf"]),
            verbose=-1,
            n_jobs=self.cpu_counts,
        )

    def get_train_results(
        self, _x_train, _y_train, _x_valid, _y_valid, _params, scorer=mean_absolute_percentage_error, _scaler=None
    ):
        model = self.get_model(_params)
        model.fit(_x_train, _y_train)
        pred = model.predict(_x_valid)
        pred = np.nan_to_num(pred, nan=np.finfo(float).eps)
        if _scaler != None:
            pred = _scaler.inverse_transform(pred)
        return scorer(_y_valid, pred), model

    def transform_params(self, _best_params_dict):
        for key, value in zip(self.params_list.keys(), self.params_list.values()):
            idx_params = _best_params_dict[key]
            _best_params_dict[key] = value[idx_params]
        self.params_best = _best_params_dict
        return self.params_best

    def get_trained_model(self, _x, _y, _params):
        self.model = self.get_model(_params)
        self.model.fit(_x, _y)
        return self.model

    def get_trained_model_score(self, _model, _x_test, _y_test, scorer=mean_absolute_percentage_error):
        pred = _model.predict(_x_test)
        pred = np.nan_to_num(pred, nan=np.finfo(float).eps)
        return scorer(_y_test, pred)

    def calculate_metrics(self, _y, _p):
        dict_result = {}
        dict_result["mae"] = round(mean_absolute_error(_y, _p), 2)
        dict_result["smape"] = round(self.smape(_y, _p), 2)
        dict_result["r2"] = round(r2_score(_y, _p), 4)
        return dict_result

    def smape(self, _y, _p):
        return 1 / len(_y) * np.sum(np.abs(_p - _y) / ((np.abs(_y) + np.abs(_p)) * 0.5))


from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, mean_squared_error
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe


def minizeScore(_params):
    lgbm_optimizer = LightGbmOptimizer(cpu_counts=16)
    score, model = lgbm_optimizer.get_train_results(x_train, y_train, x_valid, y_valid, _params, mean_squared_error)
    return {"loss": score, "status": STATUS_OK}


lgbm_optimizer = LightGbmOptimizer(cpu_counts=16)
best = fmin(minizeScore, lgbm_optimizer.get_hyper_parameter_space(), algo=tpe.suggest, max_evals=24)
best_parameter = lgbm_optimizer.transform_params(best)
print(best)
model = lgbm_optimizer.get_trained_model(x_train, y_train, best_parameter)
p_valid = model.predict(x_valid)
print(lgbm_optimizer.calculate_metrics(y_valid, p_valid))

 

다음은 classification class 입니다.

import numpy as np
from lightgbm import LGBMClassifier
from hyperopt import hp
from sklearn.metrics import accuracy_score
from hyperopt import STATUS_OK, Trials, fmin, hp, tpe
from sklearn.metrics import make_scorer
import pickle


def gini(truth, predictions):
    g = np.asarray(np.c_[truth, predictions, np.arange(len(truth))], dtype=np.float)
    g = g[np.lexsort((g[:, 2], -1 * g[:, 1]))]
    gs = g[:, 0].cumsum().sum() / g[:, 0].sum()
    gs -= (len(truth) + 1) / 2.0
    return gs / len(truth)


def gini_sklearn(truth, predictions):
    return gini(truth, predictions) / gini(truth, truth)


gini_scorer = make_scorer(gini_sklearn, greater_is_better=True, needs_proba=True)


class LightGbmOptimizer:
    def __init__(self, cpu_counts):
        self.cpu_counts = cpu_counts
        self.params_list = {
            # "boosting_type": ["gbdt", "rf", "dart"],
            "boosting_type": ["gbdt", "goss", "dart"],
            # "bagging_freq": np.arange(1, 12),
        }

    def get_hyper_parameter_space(self):
        return {
            "boosting_type": hp.choice("boosting_type", self.params_list["boosting_type"]),
            "max_depth": hp.quniform("max_depth", -1, 24, 2),
            "learning_rate": hp.uniform("learning_rate", 0, 0.1),
            "n_estimators": hp.quniform("n_estimators", 80, 200, 2),
            "subsample_for_bin": hp.quniform("subsample_for_bin", 150000, 250000, 20),
            "min_split_gain": hp.uniform("min_split_gain", 0, 1),
            "min_child_weight": hp.uniform("min_child_weight", 0, 1),
            "min_child_samples": hp.quniform("min_child_samples", 15, 25, 1),
            "reg_alpha": hp.uniform("reg_alpha", 0, 1),
            "reg_lambda": hp.uniform("reg_lambda", 0, 1),
        }

    def get_model(self, _params):
        return LGBMClassifier(
            boosting_type=_params["boosting_type"],
            num_leaves=int((2 ** np.maximum(np.minimum(_params["max_depth"], 12), 2)) * 0.6),
            max_depth=int(_params["max_depth"]),
            learning_rate=_params["learning_rate"],
            n_estimators=int(_params["n_estimators"]),
            subsample_for_bin=int(_params["subsample_for_bin"]),
            min_split_gain=_params["min_split_gain"],
            min_child_weight=_params["min_child_weight"],
            min_child_samples=int(_params["min_child_samples"]),
            reg_alpha=_params["reg_alpha"],
            reg_lambda=_params["reg_lambda"],
            verbose=-1,
            # n_jobs=self.cpu_counts,
            device="gpu",
        )

    def get_train_results(self, _x_train, _y_train, _x_test, _y_test, _params, scorer=accuracy_score):
        model = self.get_model(_params)
        model.fit(_x_train, _y_train)
        pred = model.predict(_x_test)
        # pred = np.nan_to_num(pred, nan=np.finfo(float).eps)
        return -scorer(_y_test, pred), model

    def transform_params(self, _best_params_dict):
        for key, value in zip(self.params_list.keys(), self.params_list.values()):
            idx_params = _best_params_dict[key]
            _best_params_dict[key] = value[idx_params]
        self.params_best = _best_params_dict
        return self.params_best

    def get_trained_model(self, _x, _y, _params):
        self.model = self.get_model(_params)
        self.model.fit(_x, _y)
        return self.model

    def get_trained_model_score(self, _model, _x_test, _y_test, scorer=accuracy_score):
        pred = _model.predict(_x_test)
        # pred = np.nan_to_num(pred, nan=np.finfo(float).eps)
        return scorer(_y_test, pred)
        # return scorer(pred, _y_test)

    def save_model(self, _model, _file_name):
        pickle.dump(_model, open(_file_name, "wb"))

    def load_model(self, _file_name):
        return pickle.load(open(_file_name, "rb"))

감사합니다.

반응형