Installation
Use pip
$ pip install codableopt
Use setup.py
# Master branch
$ git clone https://github.com/recruit-tech/codable-model-optimizer
$ python3 setup.py install
Sample:
import numpy as np
from codableopt import *
# set problem
problem = Problem(is_max_problem=True)
# define variables
x = IntVariable(name='x', lower=np.double(0), upper=np.double(5))
y = DoubleVariable(name='y', lower=np.double(0.0), upper=None)
z = CategoryVariable(name='z', categories=['a', 'b', 'c'])
# define objective function
def objective_function(var_x, var_y, var_z, parameters):
obj_value = parameters['coef_x'] * var_x + parameters['coef_y'] * var_y
if var_z == 'a':
obj_value += 10.0
elif var_z == 'b':
obj_value += 8.0
else:
# var_z == 'c'
obj_value -= 3.0
return obj_value
# set objective function and its arguments
problem += Objective(objective=objective_function,
args_map={'var_x': x,
'var_y': y,
'var_z': z,
'parameters': {'coef_x': -3.0, 'coef_y': 4.0}})
# define constraint
problem += 2 * x + 4 * y + 2 * (z == 'a') + 3 * (z == ('b', 'c')) <= 8
problem += 2 * x - y + 2 * (z == 'b') > 3
print(problem)
solver = OptSolver()
# generate optimization methods to be used within the solver
method = PenaltyAdjustmentMethod(steps=40000)
answer, is_feasible = solver.solve(problem, method)
print(f'answer:{answer}, answer_is_feasible:{is_feasible}')
Getting Started
Installation
Use pip
$ pip install codableopt
Use setup.py
# Master branch
$ git clone https://github.com/recruit-tech/codable-model-optimizer
$ python3 setup.py install
Basic Usage
問題を設定
問題オブジェクトを生成する際に、最大化または最小化問題のどちらかを指定をする必要があります。is_max_problemが、Trueの場合は最大化問題、Falseの場合は最小化問題となります。
>>> from codableopt import Problem
>>> problem = Problem(is_max_problem=True)
変数を定義
利用する変数を定義します。生成した変数オブジェクトは、制約式や目的関数の引数に利用することができます。
>>> from codableopt import IntVariable, DoubleVariable, CategoryVariable
>>> x = IntVariable(name='x', lower=np.double(0), upper=np.double(5))
>>> y = DoubleVariable(name='y', lower=np.double(0.0), upper=None)
>>> z = CategoryVariable(name='z', categories=['a', 'b', 'c'])
変数は、内包表記やfor文によってまとめて定義することもできます。
>>> from codableopt import IntVariable
>>> x = [IntVariable(name=f'x_{no}', lower=None, upper=None) for no in range(100)]
目的関数を設定
目的関数を問題に設定します。目的関数は、Objectiveオブジェクトを問題オブジェクトに加えることによって、設定できます。Objectiveオブジェクトを生成時には、「目的関数を計算するPython関数」と「引数のマッピング情報」を引数に設定します。 「引数のマッピング情報」は、Dict型で設定し、keyは目的関数の引数名、valueは変数オブジェクトまたは定数やPythonオブジェクトなどを指定します。なお、引数にマッピングした変数オブジェクトは、目的関数を計算するPython関数内では、最適化計算中の変数の値に変換されてから、引数に渡されます。
>>> def objective_function(var_x, var_y, var_z, parameters):
>>> obj_value = parameters['coef_x'] * var_x + parameters['coef_y'] * var_y
>>> if var_z == 'a':
>>> obj_value += 10.0
>>> elif var_z == 'b':
>>> obj_value += 8.0
>>> else:
>>> # var_z == 'c'
>>> obj_value -= 3.0
>>>
>>> return obj_value
>>>
>>> problem += Objective(objective=objective_function,
>>> args_map={'var_x': x, 'var_y': y, 'var_z': z,
>>> 'parameters': {'coef_x': -3.0, 'coef_y': 4.0}})
「引数のマッピング情報」には、変数リストを渡すこともできます。
>>> from codableopt import IntVariable
>>> x = [IntVariable(name=f'x_{no}', lower=None, upper=None) for no in range(100)]
>>>
>>> problem += Objective(objective=objective_function, args_map={'var_x': x}})
制約式を定義
制約式を問題に設定します。制約は、制約式オブジェクトを問題オブジェクトに加えることによって、設定できます。制約式オブジェクトは、変数オブジェクトと不等式を組み合わせることによって生成できます。不等式には、<,<=,>,>=,==が利用できます。また、1次式の制約式しか利用できません。
>>> constant = 2 * x + 4 * y + 2 * (z == 'a') + 3 * (z == ('b', 'c')) <= 8
>>> problem += constant
最適化計算を実行
ソルバーオブジェクトと最適化手法オブジェクトを生成し、ソルバーオブジェクトに問題オブジェクトと最適化手法オブジェクトを渡し、最適化計算を行います。ソルバーは、得られた最も良い解と得られた解が制約を全て満たすかの判定フラグを返します。
>>> solver = OptSolver(round_times=2)
>>> method = PenaltyAdjustmentMethod(steps=40000)
>>> answer, is_feasible = solver.solve(problem, method)
>>> print(f'answer:{answer}')
answer:{'x': 0, 'y': 1.5, 'z': 'a'}
>>> print(f'answer_is_feasible:{is_feasible}')
answer_is_feasible:True
Variable
整数・連続値・カテゴリの3種類の変数を提供しています。各変数は、目的関数に渡す引数や制約式に利用します。どの種類の変数も共通で、変数名を設定することができます。変数名は、最適化の解を返す際に利用されます。
IntVariable
IntVariableは、整数型の変数です。lowerには下界値、upperには上界値を設定します。なお、境界値は可能な値として設定されます。また、Noneを設定した場合は、下界値/上界値が設定されます。IntVariableは、目的関数に渡す引数と制約式のどちらにも利用できます。
from codableopt import IntVariable
x = IntVariable(name='x', lower=0, upper=None)
DoubleVariable
DoubleVariableは、連続値型の変数です。lowerには下界値、upperには上界値を設定します。なお、境界値は可能な値として設定されます。また、Noneを設定した場合は、下界値/上界値が設定されます。DoubleVariableは、目的関数に渡す引数と制約式のどちらにも利用できます。
from codableopt import DoubleVariable
x = DoubleVariable(name='x', lower=None, upper=2.3)
CategoryVariable
CategoryVariableは、カテゴリ型の変数です。categoriesには、取り得るカテゴリ値を設定します。CategoryVariableは、目的関数に渡すことはできるが、制約式に利用することはできません。カテゴリ値を制約式に利用したい場合は、CategoryCaseVariableを利用する必要があります。
from codableopt import CategoryVariable
x = CategoryVariable(name='x', categories=['a', 'b', 'c'])
CategoryCaseVariableは、カテゴリ型の変数と等式を組み合わせることで生成できます。Tupleを利用すれば、比較する値を複数設定でき、いずれかと等しい場合は1、それ以外の場合は0となります。CategoryCaseVariableは、目的関数に渡す引数と制約式のどちらにも利用できます。
# xが'a'の時は1、'b'または'c'の時は0となるCategoryCaseVariable
x_a = x == 'a'
# xがb'または'c'の時は1、'a'の時は0となるCategoryCaseVariable
x_bc = x == ('b', 'c')
Advanced Usage
Delta Objective Function
目的関数の計算は、関数によっては非常に計算コストが高くなります。しかし、差分計算を用いることで目的関数の計算コストを下げることができます。本ソルバーでも、目的関数の差分計算関数を設定することができます。差分計算は、Objectiveオブジェクト生成時の引数にdelta_objectiveを設定することで利用できます。なお、差分計算関数の引数には、目的関数と同様の引数に加えて、遷移前の変数の値が元の変数名の前にpre_をつけた名前で渡されます。
x = IntVariable(name='x', lower=np.double(0.0), upper=None)
y = DoubleVariable(name='y', lower=np.double(0.0), upper=None)
# 目的関数
def obj_fun(var_x, var_y):
return 3 * var_x + 5 * var_y
# 目的関数の差分計算用の関数
def delta_obj_fun(pre_var_x, pre_var_y, var_x, var_y, parameters):
delta_value = 0
if pre_var_x != var_x:
delta_value += parameters['coef_x'] * (var_x - pre_var_x)
if pre_var_y != var_y:
delta_value += parameters['coef_y'] * (var_y - pre_var_y)
return delta_value
# 目的関数を定義
problem += Objective(objective=obj_fun,
delta_objective=delta_obj_fun,
args_map={'var_x': x, 'var_y': y,
'parameters': {'coef_x': 3.0, 'coef_y': 2.0}})
Custom Optimization Method
本ソルバーは、共通アルゴリズム上で最適化手法をカスタマイズして利用することはできます。最適化手法をカスタマイズする場合は、本ソルバーが提供しているOptimizerMethodを継承して実装することで実現することができます。本ソルバーが提供しているペナルティ係数調整手法もその枠組み上で実装されています。
OptimizerMethod
Sample Code
from typing import List
from random import choice
from codableopt.solver.optimizer.entity.proposal_to_move import ProposalToMove
from codableopt.solver.optimizer.method.optimizer_method import OptimizerMethod
from codableopt.solver.optimizer.optimization_state import OptimizationState
class SampleMethod(OptimizerMethod):
def __init__(self, steps: int):
super().__init__(steps)
def name(self) -> str:
return 'sample_method'
def initialize_of_step(self, state: OptimizationState, step: int):
# ステップ開始時の処理なし
pass
def propose(self, state: OptimizationState, step: int) -> List[ProposalToMove]:
# 変数から1つランダムに選択する
solver_variable = choice(state.problem.variables)
# 選択した変数をランダムに移動する解の遷移を提案する
return solver_variable.propose_random_move(state)
def judge(self, state: OptimizationState, step: int) -> bool:
# 遷移前と遷移後のスコアを比較
delta_energy = state.current_score.score - state.previous_score.score
# ソルバー内はエネルギーが低い方が最適性が高いことを表している
# マイナスの場合に解が改善しているため、提案を受け入れる
return delta_energy < 0
def finalize_of_step(self, state: OptimizationState, step: int):
# ステップ終了時の処理なし
pass
[deprecation] User Define Constraint
非推奨ではありますが、本ソルバーでは、制約式を関数として渡すこともできます。関数の返り値には制約違反量を設定します。引数は、目的関数同様にargs_mapを設定することで指定できます。ただし、デフォルトで提供しているmethodでは、User Define Constraintを利用している最適化問題は実用に耐えうる最適化精度を実現できません。そのため現状では利用することは推奨していません。
Sample Code
# 制約式を定義
def udf_constraint_function(var_x, var_y, var_z):
violation_amount = 2 * var_x + 4 * var_y - 8
if var_z == 'a':
violation_amount += 2
else:
violation_amount += 3
if violation_amount <= 0:
return 0
else:
return violation_amount
constant = UserDefineConstraint(udf_constraint_function,
args_map={'var_x': x, 'var_y': y, 'var_z': z},
constraint_name='user_define_constraint')
problem += constant
Algorithm
本ソルバーの最適化アルゴリズムは、「全手法で共通のアルゴリズム部分」と「各手法で異なるアルゴリズム部分」に分かれています。「全手法で共通のアルゴリズム部分」はカスタマイズできませんが、「各手法で異なるアルゴリズム部分」はカスタムマイズすることができます。また、本ソルバーでは、ペナルティ係数調整手法という名称で最適化手法を提供しています。
Common Algorithm
Algorithm of OptSolver
共通アルゴリズムは、下記のようなステップのアルゴリズムです。
初期解の生成
num_to_select_init_answerで指定した数、ランダムな解を生成します。ランダムな解は、変数毎に上界/下界またはカテゴリ値からランダムに選択した値です。生成したランダムな解を各変数のスケールを正規化し、簡易的なアルゴリズムによって選択した解群間のユーグリッド距離の合計が最大となるようなround_times個の解群を初期解群として採用します。ただし、引数のinit_answersに初期解が設定されている場合は、ランダムな解を生成せずに、指定された初期解を利用します。
最適化の実行
採用した初期解毎に最適化の実行します。また、このときn_jobs引数によって並列処理の実行を設定している場合は、初期解毎に並列処理が実行されます。全ての初期解に対する最適化が完了したら、返された最適化処理の結果から実行可能解がある場合はその中から最も目的関数の値が最も良い解を、実行可能解がない場合はその中から目的関数の値に制約違反ペナルティを加えた値が最も良い解を選択して、最適化の実行結果として返します。(制約違反ペナルティのペナルティ係数は、共通ではなく、各最適化処理内で決定している値を利用する。結果、フェアな比較ではないが、実行可能解がない場合の解は参考程度の解なので、現状このような仕様となっている。また今後、細かな結果など含めて最適化結果を返せるようなインタフェースを検討する予定です。。)
Algorithm of Optimizer
ペナルティ係数の調整
初期解(最適化試行)毎に、answer_num_to_tune_penaltyで指定した数、ランダムな解を生成します。ランダムな解は、変数毎に上界/下界またはカテゴリ値からランダムに選択した値とする。生成したランダムな解群から各制約式の違反量の違反量を計算し、「生成したランダムな解群における目的関数の最大値と最小値の差分のpenalty_strength倍」と「各制約式のペナルティスコア」が等しくなるような各制約式のペナルティ係数に調整します。
初期解のスコア計算
初期解から目的関数の値と制約違反のペナルティの値を計算し、合算して初期解のスコアとして採用します。また、初期解を現状の最適解として採択します。
method.initialize_of_step実施
設定したmethodのinitialize_of_step関数を呼び出します。
method.propose実施
設定したmethodのpropose関数を呼び出し、解の遷移案を取得します。
提案された解のスコア計算
解の遷移案を実行した場合のスコア(目的関数の値と制約違反のペナルティの値の合算)を計算します。
method.judge実施
設定したmethodのjudge関数を呼び出し、解の遷移を実施有無を決定します。解の遷移が決定した場合は、現状の解を遷移させる。
最適性確認
6で解が遷移した場合は、新たな解の最適性の確認を行います。最適解は、実行可能解を優先し、その上でスコアが良い方を優先するように選択します。現状の最適解より良い場合は、最適解を変更します。
method.finalize_of_step実施
設定したmethodのfinalize_of_step実施関数を呼び出します。
終了判定
3-8を繰り返した回数を計算する。methodで設定したsteps数に達した場合は、終了とし、現状の最適解を返す。達していない場合は、3に戻り、同様に処理を繰り返していきます。
Method
Penalty Adjustment Method
method.initialize_of_step
ステップ開始時の処理はありません。
method.propose
random_movement_rateの確率で、ランダム遷移を提案する。また、(1-random_movement_rate)の確率で、ペナルティが小さくなる遷移を提案します。 ランダム遷移を提案する時は、最適化問題内の1つの変数をランダムに選択し、上界から下界値またはカテゴリ値から1つの値を選択し、提案します。ただし、変数が数値かつデータ遷移履歴がhistory_value_size以上のデータ件数の場合は、対象変数のデータ遷移履歴値の平均値と標準偏差値を計算し、「平均値 - 標準偏差値 * range_std_rate」から「平均値 + 標準偏差値 * range_std_rate」までの値の範囲からランダムで値を選択して、提案します。またペナルティが小さくなる遷移を提案する時は、最適化問題内の1つの変数をランダムに選択し、その変数を動かすことでペナルティが減るような値を計算によって求め、提案します。また、ペナルティが最小になる複数の値が存在した場合は、その中からランダムに選択し、提案します。(*この計算において、UserDefineConstraintは対象になりません。)
method.judge
スコアを比較し、現状の解より良い場合は遷移するという判定結果を、それ以外の場合は遷移しないという判定結果を返します。
method.finalize_of_step
「最後に解が遷移してから経過したStep数」と「最後にペナルティ係数を更新してから経過したStep数」を計算し、小さな方の値を採用し、その値がsteps_while_not_improve_score以上に達していたらペナルティ係数を調整します。 現状の解が実行可能解である場合は、ペナルティ係数に「現状の解のペナルティ違反量を正規化した値」と(1 + self._delta_penalty_rate)を積算し、ペナルティ係数をあげます。また、現状の解が実行可能解ではない場合は、一律にペナルティ係数に(1 - delta_penalty_rate)を積算し、ペナルティ係数を下げます。