機械学習プロジェクトでは、「データの前処理」と「モデルの学習」を何度も繰り返します。
このとき、前処理とモデルを別々に管理していると、コードが複雑になり、ミスも起きやすくなります。
そこで活躍するのが scikit-learn の Pipeline(パイプライン)です。
Pipelineを使えば、前処理からモデル学習までを一連の流れとして管理でき、データ漏洩 を防ぎながら 再現性 のあるワークフローを構築できます。
Contents
- なぜPipelineが必要なのか?
- よくある失敗パターン
- データ漏洩とは?
- Pipelineが解決すること
- ライブラリのインポートとデータの用意
- Pipelineの基本的な使い方
- Pipelineなしの場合(比較用)
- Pipelineを使う場合
- Pipelineの構造を理解する
- make_pipeline:さらに簡単に書く
- make_pipelineとは?
- 自動生成された名前を確認する
- 複数の前処理を組み合わせる
- 実務でよくある前処理の流れ
- 処理の流れを図解する
- ColumnTransformer:列ごとに異なる前処理
- 実務データの課題
- サンプルデータの作成
- ColumnTransformerの使い方
- ColumnTransformerとモデルをPipelineで結合
- 変換後のデータを確認する
- クロスバリデーションとPipeline
- なぜクロスバリデーションでPipelineが重要か
- cross_val_scoreとPipelineの組み合わせ
- GridSearchCVでハイパーパラメータチューニング
- ハイパーパラメータとは?
- PipelineでのGridSearchCV
- 最適なパラメータを確認する
- Pipelineの保存と読み込み
- 学習済みPipelineの保存
- 保存したPipelineを読み込む
- まとめ
なぜPipelineが必要なのか?
よくある失敗パターン
機械学習の初心者がよく陥る失敗パターンを見てみましょう。
from sklearn.preprocessing import StandardScaler from sklearn.model_selection import train_test_split # 全データでスケーリング(これが問題!) scaler = StandardScaler() X_scaled = scaler.fit_transform(X) # ← テストデータの情報も使ってしまう # その後に分割 X_train, X_test, y_train, y_test = train_test_split(X_scaled, y)
このコードの問題点は、テストデータの情報が訓練に漏れている ことです。
テストデータは「未知のデータ」として扱うべきなのに、スケーリングの計算(平均や標準偏差)にテストデータが含まれてしまっています。
データ漏洩とは?
データ漏洩(Data Leakage) とは、本来モデルが知るべきでない情報が訓練時に漏れてしまうことです。
データ漏洩が起きると、モデル構築の検討時の評価は良いのに、本番環境では性能が出ないという問題が発生します。
Pipelineが解決すること
Pipelineを使うと、以下のメリットがあります。
| メリット | 説明 |
|---|---|
| データ漏洩の防止 | 訓練データだけで前処理のパラメータを学習 |
| コードの簡潔化 | 前処理とモデルを1つのオブジェクトで管理 |
| 再現性の確保 | 同じ処理を訓練・テスト・本番で一貫して適用 |
| クロスバリデーション対応 |
ライブラリのインポートとデータの用意
今回は、scikit-learnに組み込まれているサンプルデータを使います。まず、必要なライブラリをインポートしましょう。
以下、コードです。
import numpy as np import pandas as pd from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler from sklearn.linear_model import LogisticRegression from sklearn.pipeline import Pipeline from sklearn.metrics import accuracy_score
次に、サンプルデータを読み込みます。
以下、コードです。
# Irisデータセットを読み込む
iris = load_iris()
X = iris.data # 特徴量(花の測定値)
y = iris.target # ターゲット(花の種類)
print(f"データの形状: {X.shape}")
print(f"特徴量の名前: {iris.feature_names}")
print(f"ターゲットの種類: {iris.target_names}")
以下、実行結果です。
データの形状: (150, 4) 特徴量の名前: ['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)'] ターゲットの種類: ['setosa' 'versicolor' 'virginica']
150件のデータがあり、4つの特徴量(花びらとがく片の長さ・幅)から3種類のアヤメを分類するタスクです。
このデータを使って、Pipelineの使い方を学んでいきましょう。
Pipelineの基本的な使い方
Pipelineなしの場合(比較用)
まず、Pipelineを使わない場合のコードを確認しましょう。
データの分割、前処理、モデル学習を別々に行います。
以下、コードです。
# データを訓練用とテスト用に分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# 前処理:標準化(平均0、標準偏差1にスケーリング)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train) # 訓練データで学習&変換
X_test_scaled = scaler.transform(X_test) # テストデータは変換のみ
# モデルの学習
model = LogisticRegression(random_state=42)
model.fit(X_train_scaled, y_train)
# 予測と評価
y_pred = model.predict(X_test_scaled)
accuracy = accuracy_score(y_test, y_pred)
print(f"正解率: {accuracy:.4f}")
以下、実行結果です。
正解率: 1.0000
このコードは正しく動作しますが、前処理とモデルを別々に管理しているため、コードが長くなり、ミスも起きやすくなります。
Pipelineを使う場合
同じ処理をPipelineで書き直してみましょう。
# Pipelineを作成
pipeline = Pipeline([
('scaler', StandardScaler()), # ステップ1: 標準化
('classifier', LogisticRegression(random_state=42)) # ステップ2: 分類器
])
# データを分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=42
)
# Pipelineで学習(前処理とモデル学習が一度に行われる)
pipeline.fit(X_train, y_train)
# 予測と評価
y_pred = pipeline.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
print(f"正解率: {accuracy:.4f}")
以下、実行結果です。
正解率: 1.0000
コードがシンプルになり、`fit()` と `predict()` だけで前処理からモデル適用までが完了します。
Pipelineの構造を理解する
Pipelineは「ステップ」のリストとして定義します。
各ステップは `(‘名前’, 処理オブジェクト)` のタプルで指定します。
以下、コードです。
# Pipelineの構造を確認
print("Pipelineのステップ:")
for name, step in pipeline.steps:
print(f" {name}: {step.__class__.__name__}")
以下、実行結果です。
Pipelineのステップ: scaler: StandardScaler classifier: LogisticRegression
Pipelineに含まれるステップが表示されます。
fit()を呼び出すと、各ステップが順番に実行されます。
make_pipeline:さらに簡単に書く
make_pipelineとは?
Pipelineでは各ステップに名前をつける必要がありますが、`make_pipeline` を使うと自動的に名前が付けられます。
以下、コードです。
from sklearn.pipeline import make_pipeline
# make_pipelineを使うと名前の指定が不要
pipeline_simple = make_pipeline(
StandardScaler(),
LogisticRegression(random_state=42)
)
# 学習と予測
pipeline_simple.fit(X_train, y_train)
y_pred = pipeline_simple.predict(X_test)
print(f"正解率: {accuracy_score(y_test, y_pred):.4f}")
以下、実行結果です。
正解率: 1.0000
make_pipelineはステップ名を自動生成するため、コードがさらに簡潔になります。
自動生成された名前を確認する
自動生成された名前を確認してみましょう。
以下、コードです。
# 自動生成されたステップ名を確認
print("自動生成されたステップ名:")
for name, step in pipeline_simple.steps:
print(f" {name}: {step.__class__.__name__}")
以下、実行結果です。
自動生成されたステップ名: standardscaler: StandardScaler logisticregression: LogisticRegression
クラス名を小文字にした名前が自動的に付けられていることがわかります。
複数の前処理を組み合わせる
実務でよくある前処理の流れ
実務では、1つの前処理だけでなく、複数の前処理を順番に適用することが多いです。
Pipelineなら、これを簡単に実現できます。
以下、コードです。
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.svm import SVC
# 複数の前処理を含むPipeline
pipeline_multi = Pipeline([
('scaler', StandardScaler()), # ステップ1: 標準化
('pca', PCA(n_components=2)), # ステップ2: 次元削減
('svm', SVC(random_state=42)) # ステップ3: SVMで分類
])
# 学習と予測
pipeline_multi.fit(X_train, y_train)
y_pred = pipeline_multi.predict(X_test)
print(f"正解率: {accuracy_score(y_test, y_pred):.4f}")
以下、実行結果です。
正解率: 0.9333
3つのステップを経た分類結果が得られます。
PCA(主成分分析)は次元削減の手法で、4次元のデータを2次元に圧縮しています。
処理の流れを図解する
Pipelineの処理の流れを確認しましょう。
以下、コードです。
# 各ステップでのデータの変化を確認
print("データの変化:")
print(f" 入力: {X_train.shape}")
# 標準化後
X_scaled = pipeline_multi.named_steps['scaler'].transform(X_train)
print(f" 標準化後: {X_scaled.shape}")
# PCA後
X_pca = pipeline_multi.named_steps['pca'].transform(X_scaled)
print(f" PCA後: {X_pca.shape}")
以下、実行結果です。
データの変化: 入力: (105, 4) 標準化後: (105, 4) PCA後: (105, 2)
各ステップでデータの形状がどう変化するかがわかります。
4次元から2次元に削減されていることが確認できます。
ColumnTransformer:列ごとに異なる前処理
実務データの課題
実務のデータには、数値データとカテゴリデータが混在していることが多いです。
数値データには標準化、カテゴリデータにはワンホットエンコーディングというように、列ごとに異なる前処理を適用する必要があります。
サンプルデータの作成
まず、数値データとカテゴリデータが混在したサンプルデータを作成します。
以下、コードです。
# 数値とカテゴリが混在したサンプルデータ
data = {
'年齢': [25, 32, 47, 51, 62, 23, 35, 44, 29, 55],
'年収': [350, 480, 720, 850, 920, 280, 520, 680, 410, 780],
'性別': ['男', '女', '男', '女', '男', '女', '男', '女', '男', '女'],
'職種': ['営業', '技術', '管理', '技術', '管理', '営業', '技術', '管理', '営業', '技術'],
'購入': [0, 1, 1, 1, 1, 0, 1, 1, 0, 1] # 目的変数
}
df = pd.DataFrame(data)
print(df)
以下、実行結果です。
年齢 年収 性別 職種 購入 0 25 350 男 営業 0 1 32 480 女 技術 1 2 47 720 男 管理 1 3 51 850 女 技術 1 4 62 920 男 管理 1 5 23 280 女 営業 0 6 35 520 男 技術 1 7 44 680 女 管理 1 8 29 410 男 営業 0 9 55 780 女 技術 1
ColumnTransformerの使い方
ColumnTransformerを使うと、列ごとに異なる前処理を適用できます。
以下、コードです。
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
# 特徴量と目的変数を分離
X_df = df.drop('購入', axis=1)
y_df = df['購入']
# 列の種類を定義
numeric_features = ['年齢', '年収'] # 数値列
categorical_features = ['性別', '職種'] # カテゴリ列
# ColumnTransformerで列ごとに異なる前処理を定義
preprocessor = ColumnTransformer(
transformers=[
('num', StandardScaler(), numeric_features), # 数値列: 標準化
('cat', OneHotEncoder(drop='first'), categorical_features) # カテゴリ列: ワンホット
]
)
print("ColumnTransformerを作成しました")
print(f" 数値列: {numeric_features} → 標準化")
print(f" カテゴリ列: {categorical_features} → ワンホットエンコーディング")
以下、実行結果です。
ColumnTransformerを作成しました 数値列: ['年齢', '年収'] → 標準化 カテゴリ列: ['性別', '職種'] → ワンホットエンコーディング
各列にどの前処理が適用されるかが表示されます。
ColumnTransformerとモデルをPipelineで結合
作成した ColumnTransformerをPipelineに組み込みます。
以下、コードです。
from sklearn.ensemble import RandomForestClassifier
# ColumnTransformer + モデルのPipeline
full_pipeline = Pipeline([
('preprocessor', preprocessor),
('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])
# データを分割
X_train_df, X_test_df, y_train_df, y_test_df = train_test_split(
X_df, y_df, test_size=0.3, random_state=42
)
# 学習と予測
full_pipeline.fit(X_train_df, y_train_df)
y_pred_df = full_pipeline.predict(X_test_df)
print(f"正解率: {accuracy_score(y_test_df, y_pred_df):.4f}")
以下、実行結果です。
正解率: 1.0000
数値列の標準化とカテゴリ列のエンコーディングが自動的に行われ、モデルが学習されます。
これにより、前処理とモデルを一元管理できます。
変換後のデータを確認する
前処理後のデータがどうなっているか確認してみましょう。
以下、コードです。
# 前処理後のデータを確認
X_transformed = full_pipeline.named_steps['preprocessor'].transform(X_train_df)
print(f"変換前の形状: {X_train_df.shape}")
print(f"変換後の形状: {X_transformed.shape}")
print()
print("変換後のデータ(最初の3行):")
print(X_transformed[:3])
以下、実行結果です。
変換前の形状: (7, 4) 変換後の形状: (7, 5) 変換後のデータ(最初の3行): [[-1.7869251 -1.85940983 1. 0. 0. ] [-0.13650122 -0.04707367 0. 0. 1. ] [ 0.12409202 0.17260344 1. 0. 1. ]]
4列から5列に増えていることがわかります。
カテゴリ列がワンホットエンコーディングされたためです(性別は2値なので1列、職種は3値なので2列に展開)。
クロスバリデーションとPipeline
なぜクロスバリデーションでPipelineが重要か
クロスバリデーションでは、データを複数の「Fold」に分割して評価します。
このとき、各Foldで前処理のパラメータ(平均値など)を訓練データだけから計算する必要があります。
Pipelineを使わないと、この処理が非常に複雑になります。
cross_val_scoreとPipelineの組み合わせ
cross_val_score とPipelineを組み合わせると、正しいクロスバリデーションが簡単に実行できます。
以下、コードです。
from sklearn.model_selection import cross_val_score
# Pipelineを作成
cv_pipeline = make_pipeline(
StandardScaler(),
LogisticRegression(random_state=42)
)
# クロスバリデーション(5分割)
scores = cross_val_score(
cv_pipeline,
X, y,
cv=5,
scoring='accuracy'
)
print("クロスバリデーションの結果:")
print(f" 各Foldの正解率: {scores}")
print(
f" 平均正解率: {scores.mean():.4f}"
f" (±{scores.std():.4f})"
)
以下、実行結果です。
クロスバリデーションの結果: 各Foldの正解率: [0.96666667 1. 0.93333333 0.9 1. ] 平均正解率: 0.9600 (±0.0389)
5つのFoldそれぞれの正解率と、平均・標準偏差が表示されます。
各Foldで、訓練データだけを使ってスケーリングのパラメータが計算され、テストデータには訓練データのパラメータが適用されます。
これがPipelineの大きなメリットです。
GridSearchCVでハイパーパラメータチューニング
ハイパーパラメータとは?
ハイパーパラメータ とは、モデルが学習する前に人間が設定するパラメータのことです。
例えば、ロジスティック回帰の正則化強度 C や、ランダムフォレストの木の数 n_estimators などです。
PipelineでのGridSearchCV
PipelineとGridSearchCVを組み合わせると、前処理のパラメータとモデルのハイパーパラメータを同時にチューニングできます。
以下、コードです。
from sklearn.model_selection import GridSearchCV
# Pipelineを作成
tuning_pipeline = Pipeline([
('scaler', StandardScaler()),
('pca', PCA()),
('classifier', LogisticRegression(random_state=42))
])
# 探索するパラメータを定義
# 「ステップ名__パラメータ名」の形式で指定
param_grid = {
'pca__n_components': [2, 3, 4], # PCAの次元数
'classifier__C': [0.1, 1.0, 10.0] # 正則化の強度
}
# GridSearchCVで最適なパラメータを探索
grid_search = GridSearchCV(
tuning_pipeline,
param_grid,
cv=5,
scoring='accuracy',
verbose=1
)
grid_search.fit(X_train, y_train)
以下、実行結果です。
Fitting 5 folds for each of 9 candidates, totalling 45 fits
9通り(3×3)のパラメータの組み合わせが5分割クロスバリデーションで評価されます。
最適なパラメータを確認する
探索結果から最適なパラメータを確認しましょう。
以下、コードです。
# 最適なパラメータを表示
print("最適なパラメータ:")
print(grid_search.best_params_)
print()
print(f"最高スコア(CV平均): {grid_search.best_score_:.4f}")
# 最適なモデルでテストデータを評価
best_model = grid_search.best_estimator_
y_pred_best = best_model.predict(X_test)
print(f"テストデータの正解率: {accuracy_score(y_test, y_pred_best):.4f}")
以下、実行結果です。
最適なパラメータ:
{'classifier__C': 1.0, 'pca__n_components': 3}
最高スコア(CV平均): 0.9429
テストデータの正解率: 1.0000
最適なパラメータの組み合わせと、そのときのスコアが表示されます。
best_estimator_ には、最適なパラメータで学習済みのPipelineが格納されています。
Pipelineの保存と読み込み
学習済みPipelineの保存
学習済みのPipelineは、joblib を使ってファイルに保存できます。
これにより、本番環境で再利用したり、後で再現したりできます。
以下、コードです。
import joblib
# 学習済みPipelineを保存
joblib.dump(best_model, 'trained_pipeline.pkl')
print("Pipelineを 'trained_pipeline.pkl' に保存しました")
以下、実行結果です。
Pipelineを 'trained_pipeline.pkl' に保存しました
Pipelineがファイルに保存されます。前処理のパラメータ(平均値など)もモデルの重みもすべて含まれています。
保存したPipelineを読み込む
保存したPipelineは、joblib.load() で読み込めます。
以下、コードです。
# Pipelineを読み込む
loaded_pipeline = joblib.load('trained_pipeline.pkl')
print("Pipelineを読み込みました")
# 読み込んだPipelineで予測
y_pred_loaded = loaded_pipeline.predict(X_test)
print(f"読み込んだモデルの正解率: {accuracy_score(y_test, y_pred_loaded):.4f}")
以下、実行結果です。
Pipelineを読み込みました 読み込んだモデルの正解率: 1.0000
保存時と同じ予測結果が得られます。
本番環境では、このように保存したPipelineを読み込んで、新しいデータに対して予測を行います。
まとめ
最後に整理します。
機械学習プロジェクトでは、「データの前処理」と「モデルの学習」を何度も繰り返します。
このとき、前処理とモデルを別々に管理していると、コードが複雑になり、ミスも起きやすくなります。
そこで活躍するのが scikit-learn の Pipeline(パイプライン)です。
Pipelineは、機械学習プロジェクトを効率的に進めるための必須ツールです。

