Lab AI 時系列データベースの基礎——IoT センサーデータ、モニタリング、金融データのための専用 DB 設計
目次

「過去 1 年間のセンサーデータを保存したい」「5 秒ごとの株価データをリアルタイムで集計したい」「サーバーのメトリクスを 30 日間保持し、1 時間ごとに平均値を計算したい」——こうした要件に、リレーショナルデータベース(RDB)は十分に応えられない。

時系列データベース(Time-Series Database, TSDB)は、タイムスタンプ付きデータの保存・検索・集計に特化したデータベースだ。IoT デバイスのセンサーデータ、システムモニタリング、金融市場データ、ログ分析など、時間軸に沿ったデータ処理が必要な場面で威力を発揮する。

本記事では、時系列データベースの基本概念から、データの保存構造、圧縮技術、ダウサンプリング、継続クエリ、そして主要 TSDB 製品の比較までを体系的に解説する。

時系列データの特性と RDB の限界

時系列データの特徴

時系列データは、以下のような特性を持つ。

【時系列データの例】

1. IoT センサーデータ
   - 温度:25.3°C, 25.5°C, 25.1°C, ...(5 秒間隔)
   - 湿度:60%, 61%, 59%, ...(5 秒間隔)
   - 気圧:1013.2hPa, 1013.5hPa, ...(5 秒間隔)

2. システムメトリクス
   - CPU 使用率:45%, 52%, 48%, ...(10 秒間隔)
   - メモリ使用量:4.2GB, 4.3GB, 4.1GB, ...(10 秒間隔)
   - ディスク I/O: 120MB/s, 115MB/s, ...(10 秒間隔)

3. 金融市場データ
   - 株価:1500 円,1502 円,1498 円,...(1 秒間隔)
   - 出来高:1000 株,1500 株,800 株,...(1 秒間隔)

時系列データの本質的な特性:

  • 時間順序性: データは時間順に到着し、過去の値との比較が重要
  • 高頻度書き込み: 秒間数千〜数百万件の INSERT が発生
  • 最新値へのアクセス集中: 直近 1 時間〜1 日のデータが最も参照される
  • 集計需要: 平均、最大、最小、合計などの集計クエリが頻発
  • 時間窓処理: 「過去 5 分間の平均」「1 時間ごとの合計」など時間ベースの処理

RDB での時系列データ管理の問題点

RDB で時系列データを管理しようとすると、以下の問題に直面する。

-- RDB での典型的な時系列データ設計
CREATE TABLE sensor_data (
    id SERIAL PRIMARY KEY,
    sensor_id INTEGER NOT NULL,
    timestamp TIMESTAMP NOT NULL,
    value DOUBLE PRECISION NOT NULL,
    INDEX idx_timestamp (timestamp),
    INDEX idx_sensor_time (sensor_id, timestamp)
);

-- 問題 1: 大量 INSERT でインデックス更新がボトルネック
INSERT INTO sensor_data (sensor_id, timestamp, value)
VALUES (1, '2025-07-20 10:00:00', 25.3);  -- 秒間 1000 件INSERT...

-- 問題 2: 時間範囲クエリが遅い
SELECT AVG(value) FROM sensor_data
WHERE timestamp >= '2025-07-01'
  AND timestamp < '2025-08-01';  -- 数千万行をスキャン

-- 問題 3: 古いデータの削除が重い
DELETE FROM sensor_data
WHERE timestamp < '2025-06-01';  -- 数千万行削除でテーブルロック

RDB の構造的な限界:

  1. インデックス更新コスト: B ツリーインデックスはランダムアクセスに最適で、時系列のような連続 INSERT に非効率
  2. 行ベース保存: 1 行ごとにメタデータを持ち、時系列データの連続性に非効率
  3. VACUUM/最適化: 削除後の領域回復に時間がかかる
  4. 列指向非対応: 集計クエリで不要なカラムも読み込む

TSDB の基本的な構造——時間軸でデータを整理する

列指向ストレージ

TSDB の多くは列指向ストレージを採用している。

【行指向 vs 列指向】

行指向(RDB):
┌─────────┬──────────┬───────┬────────┐
│ sensor_id│timestamp │ value │ status │
├─────────┼──────────┼───────┼────────┤
│    1    │ 10:00:00 │ 25.3  │   OK   │ ← 1 行
│    1    │ 10:00:05 │ 25.5  │   OK   │ ← 2 行
│    1    │ 10:00:10 │ 25.1  │   OK   │ ← 3 行
└─────────┴──────────┴───────┴────────┘

列指向(TSDB):
sensor_id: [1, 1, 1, 1, 1, ...]
timestamp: [10:00:00, 10:00:05, 10:00:10, 10:00:15, ...]
value:     [25.3, 25.5, 25.1, 25.4, ...]
status:    [OK, OK, OK, OK, ...]

列指向のメリット:

  • 高い圧縮率: 同じ型・似た値が連続するため、圧縮アルゴリズムが効く
  • 集計クエリの高速化: AVG(value) なら value 列のみ読み込めばよい
  • I/O 効率: 必要なカラムのみディスクから読み込む

時間ベースのパーティショニング

TSDB はデータを時間単位で分割して管理する。

【パーティショニングの概念】

sensor_data メトリック:
┌─────────────────────────────────────────────┐
│  2025-07-19 日のシャード(過去)             │
│  - 圧縮済み、読み取り専用                    │
│  - S3 などの安価なストレージに移動可能        │
├─────────────────────────────────────────────┤
│  2025-07-20 午前のシャード                   │
│  - 圧縮済み、読み取りメイン                  │
├─────────────────────────────────────────────┤
│  2025-07-20 午後のシャード(現在)            │
│  - 書き込み中、インメモリ、高速アクセス       │
└─────────────────────────────────────────────┘

パーティショニングのメリット:

  • 古いデータの削除: シャード単位で即時削除可能
  • 階層化ストレージ: 最新データは SSD、過去データは HDD/S3
  • 並列クエリ: 複数シャードを並列にスキャン

TSM(Time-Structured Merge Tree)

InfluxDB が採用するTSMは、時系列データに最適化された保存エンジンだ。

【TSM の書き込みフロー】

1. メモリ(WAL + TSM Cache):
   書き込み → WAL(Write-Ahead Log)に追記
          → TSM Cache(インメモリ)に蓄積

2. フラッシュ:
   Cache が 100MB 到達 → TSM ファイルにフラッシュ
   (順序付きでソート、圧縮)

3.  compaction:
   複数 TSM ファイル → 1 つにマージ
   (重複削除、圧縮率向上)

ディスク構造:
┌──────────────────────────────────────┐
│ WAL ファイル(追加のみ、順序なし)    │
├──────────────────────────────────────┤
│ TSM ファイル 1(ソート済、圧縮済)   │ ← 10:00-10:05
│ TSM ファイル 2(ソート済、圧縮済)   │ ← 10:05-10:10
│ TSM ファイル 3(ソート済、圧縮済)   │ ← 10:10-10:15
└──────────────────────────────────────┘

TSM の特徴:

  • 追加のみ: 既存ファイルの更新はなく、常に追記
  • ソート済み: タイムスタンプ順にソート済み、範囲クエリが高速
  • 高圧縮: 類似値が連続、Gorilla 圧縮などの専用アルゴリズム

時系列データの圧縮技術

Gorilla 圧縮(Facebook)

Facebook の Gorilla TSDB が開発した、時系列データに特化した圧縮アルゴリズムだ。

def gorilla_timestamp_encoding(timestamps):
    """
    Gorilla のタイムスタンプ圧縮

    基本アイデア:連続するタイムスタンプの差分(delta)を記録
    さらに delta の差分(delta-of-delta)を記録
    """
    if len(timestamps) < 2:
        return timestamps

    # 最初の値
    first_timestamp = timestamps[0]
    first_delta = timestamps[1] - timestamps[0]

    # delta-of-delta を計算
    delta_of_deltas = []
    prev_delta = first_delta

    for i in range(2, len(timestamps)):
        delta = timestamps[i] - timestamps[i-1]
        dod = delta - prev_delta
        delta_of_deltas.append(dod)
        prev_delta = delta

    # delta-of-delta は 0 になることが多く、効率的に符号化可能
    return {
        'first': first_timestamp,
        'first_delta': first_delta,
        'dod': delta_of_deltas  # 多くの要素が 0
    }

# 例:5 秒間隔のセンサーデータ
timestamps = [1000, 1005, 1010, 1015, 1020, 1025]
result = gorilla_timestamp_encoding(timestamps)

print("【Gorilla タイムスタンプ圧縮】")
print(f"元データ:{timestamps}")
print(f"最初のタイムスタンプ:{result['first']}")
print(f"最初の差分:{result['first_delta']}")
print(f"delta-of-delta: {result['dod']}")  # [0, 0, 0, 0] → 効率的に符号化

出力:

【Gorilla タイムスタンプ圧縮】
元データ:[1000, 1005, 1010, 1015, 1020, 1025]
最初のタイムスタンプ:1000
最初の差分:5
delta-of-delta: [0, 0, 0, 0]

Gorilla 圧縮の本質:

  • センサーデータは等間隔で収集されることが多い
  • 等間隔なら delta-of-delta は常に 0
  • 0 の連続は、ビット列で効率的に表現可能

XOR 値圧縮

Gorilla は数値データも圧縮する。

def gorilla_value_encoding(values):
    """
    Gorilla の数値圧縮(XOR ベース)

    基本アイデア:浮動小数点数の XOR を取り、
    変化が小さい場合は多くのビットが 0 になる
    """
    if len(values) < 2:
        return values

    import struct

    def float_to_bits(f):
        return struct.unpack('>Q', struct.pack('>d', f))[0]

    def bits_to_float(b):
        return struct.unpack('>d', struct.pack('>Q', b))[0]

    # 最初の値はそのまま保存
    first_value = values[0]
    prev_bits = float_to_bits(first_value)

    encoded = []

    for i in range(1, len(values)):
        current_bits = float_to_bits(values[i])
        xor = prev_bits ^ current_bits

        # XOR 結果の先頭・末尾の 0 ビット数をカウント
        if xor == 0:
            # 完全に同じ値
            encoded.append({'type': 'same', 'data': None})
        else:
            # 有効ビットのみ保存
            leading_zeros = count_leading_zeros(xor)
            trailing_zeros = count_trailing_zeros(xor)
            significant_bits = xor >> trailing_zeros

            encoded.append({
                'type': 'different',
                'leading': leading_zeros,
                'trailing': trailing_zeros,
                'data': significant_bits
            })

        prev_bits = current_bits

    return {'first': first_value, 'encoded': encoded}

def count_leading_zeros(n, bits=64):
    count = 0
    for i in range(bits - 1, -1, -1):
        if (n >> i) & 1:
            break
        count += 1
    return count

def count_trailing_zeros(n):
    count = 0
    while n and not (n & 1):
        count += 1
        n >>= 1
    return count

# 例:温度センサーの値(少しずつ変化する)
values = [25.3, 25.3, 25.3, 25.4, 25.5, 25.5, 25.5]
result = gorilla_value_encoding(values)

print("\n【Gorilla 値圧縮】")
print(f"元データ:{values}")
print(f"最初の値:{result['first']}")
print(f"エンコード結果:")
for i, enc in enumerate(result['encoded']):
    if enc['type'] == 'same':
        print(f"  値{i+1}: 同じ値(0 ビット)")
    else:
        print(f"  値{i+1}: {enc['leading']}ビットシフト、{enc['data']} を保存")

出力:

【Gorilla 値圧縮】
元データ:[25.3, 25.3, 25.3, 25.4, 25.5, 25.5, 25.5]
最初の値:25.3
エンコード結果:
  値 1: 同じ値(0 ビット)
  値 2: 同じ値(0 ビット)
  値 3: 12 ビットシフト、[有効ビット] を保存
  値 4: 11 ビットシフト、[有効ビット] を保存
  値 5: 同じ値(0 ビット)
  値 6: 同じ値(0 ビット)

圧縮効果:

  • 変化の少ないセンサーデータ:90% 以上の圧縮率
  • 完全に同じ値:1 ビット(フラグのみ)で表現
  • 少し变化的値:有効ビットのみ保存(通常 10-20 ビット)

デルタエンコーディング

より単純な圧縮手法として、デルタ(差分)エンコーディングもある。

def delta_encoding(values):
    """
    デルタエンコーディング

    連続する値の差分のみを保存
    """
    if not values:
        return []

    first = values[0]
    deltas = [values[i] - values[i-1] for i in range(1, len(values))]

    return {'first': first, 'deltas': deltas}

def delta_decode(encoded):
    """デコード"""
    if not encoded['deltas']:
        return [encoded['first']]

    values = [encoded['first']]
    for delta in encoded['deltas']:
        values.append(values[-1] + delta)

    return values

# 例:累積カウンタ(常に増加)
counter_values = [1000, 1005, 1012, 1015, 1020, 1028, 1035]
encoded = delta_encoding(counter_values)

print("【デルタエンコーディング】")
print(f"元データ:{counter_values}")
print(f"最初の値:{encoded['first']}")
print(f"差分:{encoded['deltas']}")  # [5, 7, 3, 5, 8, 7]
print(f"デコード:{delta_decode(encoded)}")

# 圧縮効果:元の値は 1000-1035(10-11 ビット必要)
# 差分は 3-8(3-4 ビットで十分)

出力:

【デルタエンコーディング】
元データ:[1000, 1005, 1012, 1015, 1020, 1028, 1035]
最初の値:1000
差分:[5, 7, 3, 5, 8, 7]
デコード:[1000, 1005, 1012, 1015, 1020, 1028, 1035]

ダウサンプリングと継続クエリ

ダウサンプリングの必要性

高頻度で収集したデータを長期間保存すると、データ量が爆発する。

【データ量の計算】

秒間 1000 センサー × 10 値/秒 × 86400 秒/日 = 8.64 億値/日
1 値 8 バイト → 約 7GB/日
30 日間保存 → 210GB
1 年間保存 → 2.5TB

→ すべてを_raw データで保存するのは非現実的

解決策: 時間経過とともに解像度を下げる(ダウサンプリング)

【ダウサンプリング戦略】

時間経過    解像度       保存方法
─────────────────────────────────────
最新 1 時間   1 秒間隔     _raw(生データ)
最新 1 日     1 分平均     1m_aggregation
最新 1 週間   1 時間平均    1h_aggregation
最新 1 ヶ月   1 日平均      1d_aggregation
1 ヶ月以上    1 週間平均    1w_aggregation

InfluxDB の継続クエリ(CQ)

InfluxDB では**継続クエリ(Continuous Query)**で自動ダウサンプリングを実現する。

from influxdb import InfluxDBClient
from datetime import datetime, timedelta

# InfluxDB 接続
client = InfluxDBClient(host='localhost', port=8086, database='iot_data')

# 1. 生データ(raw)の保存
# センサーが 5 秒間隔で書き込み
def write_sensor_data(sensor_id, temperature, humidity):
    json_body = [
        {
            "measurement": "sensor_data",
            "tags": {
                "sensor_id": sensor_id
            },
            "time": datetime.utcnow().isoformat(),
            "fields": {
                "temperature": temperature,
                "humidity": humidity
            }
        }
    ]
    client.write_points(json_body)

# 2. 継続クエリの作成(1 時間平均を hourly_aggregation に保存)
cq_query = """
CREATE CONTINUOUS QUERY "cq_1h_average" ON "iot_data"
BEGIN
    SELECT
        MEAN(temperature) AS "temperature_avg",
        MEAN(humidity) AS "humidity_avg",
        MAX(temperature) AS "temperature_max",
        MIN(temperature) AS "temperature_min"
    INTO "iot_data"."autogen"."hourly_aggregation"
    FROM "sensor_data"
    GROUP BY time(1h), sensor_id
END
"""
client.query(cq_query)

print("【継続クエリの設定】")
print("・sensor_data(生データ)→ 1 時間ごとの集計")
print("・結果は hourly_aggregation メトリックに保存")
print("・sensor_id ごとにグループ化")

# 3. ダウサンプリング後のデータ参照
# 最新 1 時間は生データ、それ以降は集計データを使う
query_1h_raw = """
SELECT temperature, humidity
FROM sensor_data
WHERE time >= now() - 1h
"""

query_24h_aggregated = """
SELECT temperature_avg, humidity_avg
FROM hourly_aggregation
WHERE time >= now() - 24h
"""

print("\n【クエリの使い分け】")
print(f"最新 1 時間:{query_1h_raw}")
print(f"過去 24 時間:{query_24h_aggregated}")

継続クエリの動作:

  1. CQ がバックグラウンドで定期的に実行(デフォルト:10 分間隔)
  2. 前回の CQ 実行以降の生データを対象に集計
  3. 集計結果を別メトリックに保存
  4. 古い生データはリテンションポリシーで自動削除

TimescaleDB の継続アグリゲーション

TimescaleDB(PostgreSQL 拡張)では継続アグリゲーション機能を提供する。

import psycopg2
from psycopg2.extras import RealDictCursor

# TimescaleDB 接続
conn = psycopg2.connect(
    host="localhost",
    database="iot_data",
    user="postgres",
    password="password"
)
conn.autocommit = True
cur = conn.cursor(cursor_factory=RealDictCursor)

# 1. ハイパーテーブルの作成(時系列専用テーブル)
cur.execute("""
CREATE TABLE sensor_data (
    time TIMESTAMPTZ NOT NULL,
    sensor_id INTEGER NOT NULL,
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION
);
""")

# ハイパーテーブルに変換(1 時間ごとにチャンク分割)
cur.execute("""
SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 hour');
""")

# 2. 1 時間ごとの集計ビュー(継続アグリゲーション)
cur.execute("""
CREATE MATERIALIZED VIEW sensor_data_1h
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(INTERVAL '1 hour', time) AS bucket,
    sensor_id,
    AVG(temperature) AS temperature_avg,
    MAX(temperature) AS temperature_max,
    MIN(temperature) AS temperature_min,
    AVG(humidity) AS humidity_avg
FROM sensor_data
GROUP BY bucket, sensor_id;
""")

# 3. 集計の更新間隔を設定
cur.execute("""
SELECT add_continuous_aggregate_policy('sensor_data_1h',
    start_offset => INTERVAL '1 day',
    end_offset => INTERVAL '1 hour',
    schedule_interval => INTERVAL '1 hour');
""")

# 4. クエリ(集計ビューは透過的に最新データを含む)
cur.execute("""
SELECT bucket, sensor_id, temperature_avg
FROM sensor_data_1h
WHERE bucket >= NOW() - INTERVAL '7 days'
ORDER BY bucket DESC;
""")

print("【TimescaleDB 継続アグリゲーション】")
print("・1 時間ごとのチャンク分割")
print("・1 時間平均をマテリアライズドビュー")
print("・1 時間ごとに自動更新")
print("・最新 1 時間は生データ、過去は集計データを自動使用")

cur.close()
conn.close()

TimescaleDB の特徴:

  • PostgreSQL との完全互換性(SQL、JOIN、トランザクション)
  • 継続アグリゲーションはマテリアライズドビューとして実装
  • 増分更新(新しいチャンクのみ再計算)

リテンションポリシー——データの自動削除

InfluxDB のリテンションポリシー

from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086, database='iot_data')

# 1. リテンションポリシーの作成
# 30 日間保存、デフォルトポリシー
client.create_retention_policy(
    retention_policy_name='thirty_days',
    duration='30d',
    replication=1,
    default=True
)

# 2. 異なる保存期間のポリシー
# 1 週間のみ保存(高速アクセス用)
client.create_retention_policy(
    retention_policy_name='one_week',
    duration='7d',
    replication=1,
    default=False
)

# 1 年間保存(長期保存用、S3 等へ)
client.create_retention_policy(
    retention_policy_name='one_year',
    duration='52w',
    replication=1,
    default=False
)

# 3. ポリシーの使い分け
# 生データは 7 日、集計データは 1 年
client.switch_database('iot_data', 'one_week')  # 生データ用
client.switch_database('iot_data', 'one_year')  # 集計データ用

print("【リテンションポリシーの使い分け】")
print("・sensor_data(生): 7 日間")
print("・hourly_aggregation(集計): 1 年間")
print("・daily_aggregation(要約): 5 年間")

# 4. 手動でのデータ削除(特定条件)
client.delete_series(
    measurement='sensor_data',
    tags={'sensor_id': 'sensor_001'},  # 特定センサーのみ削除
    start_time='2025-01-01',
    end_time='2025-06-30'
)

TimescaleDB のデータ削除

import psycopg2

conn = psycopg2.connect(
    host="localhost",
    database="iot_data",
    user="postgres",
    password="password"
)
conn.autocommit = True
cur = conn.cursor()

# 1. データ保持ポリシーの設定(30 日後自動削除)
cur.execute("""
SELECT add_retention_policy('sensor_data', INTERVAL '30 days');
""")

# 2. 集計ビューの保持ポリシー(1 年後自動削除)
cur.execute("""
SELECT add_retention_policy('sensor_data_1h', INTERVAL '1 year');
""")

# 3. チャンク単位の削除(手動)
# 特定の期間のチャンクを即時削除
cur.execute("""
SELECT show_chunks('sensor_data', older_than => NOW() - INTERVAL '6 months');
""")
old_chunks = cur.fetchall()

for chunk in old_chunks:
    print(f"削除対象チャンク:{chunk['show_chunks']}")

# 4. 圧縮(コールドデータを安価なストレージへ)
cur.execute("""
ALTER TABLE sensor_data SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'sensor_id'
);
""")

# 1 日前のチャンクを自動圧縮
cur.execute("""
SELECT add_compression_policy('sensor_data', INTERVAL '1 day');
""")

print("【TimescaleDB データ管理】")
print("・生データ:30 日自動削除")
print("・1 時間集計:1 年自動削除")
print("・1 日前のデータ:自動圧縮(ストレージ効率向上)")

cur.close()
conn.close()

主要 TSDB の比較

製品比較

製品 データモデル クエリ言語 ライセンス 特徴
InfluxDB カスタム InfluxQL/Flux MIT/Enterprise 最も普及、エコシステム充実
TimescaleDB RDB(PostgreSQL 拡張) SQL(PostgreSQL 互換) Apache 2.0/Enterprise SQL 完全対応、JOIN 可能
Prometheus カスタム PromQL Apache 2.0 システム監視に最適化
QuestDB カスタム SQL 拡張 Apache 2.0 高速、低遅延
ClickHouse 列指向 RDB SQL 拡張 Apache 2.0 分析クエリに強い

InfluxDB 詳細

# InfluxDB 2.x のデータモデル
# Organization → Bucket → Measurement → Point

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)

# データ書き込み
point = Point("sensor_data") \
    .tag("sensor_id", "sensor_001") \
    .tag("location", "tokyo") \
    .field("temperature", 25.3) \
    .field("humidity", 60.5) \
    .time(datetime.utcnow())

write_api.write(bucket="iot_data", record=point)

# クエリ(Flux 言語)
from influxdb_client.client.flux_api import FluxAPI

flux_query = """
from(bucket: "iot_data")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r.sensor_id == "sensor_001")
  |> aggregateWindow(every: 1m, fn: mean)
"""

query_api = client.query_api()
result = query_api.query(flux_query)

print("【InfluxDB 2.x データモデル】")
print("・Organization: マルチテナントの単位")
print("・Bucket: データの保存単位(RP+ データ)")
print("・Measurement: テーブルに相当")
print("・Point: 1 行のデータ")
print("・Tag: インデックス対象(WHERE 句)")
print("・Field: 値(SELECT 対象)")

TimescaleDB 詳細

import psycopg2

conn = psycopg2.connect(
    host="localhost",
    database="iot_data",
    user="postgres",
    password="password"
)
cur = conn.cursor()

# ハイパーテーブル作成
cur.execute("""
CREATE TABLE conditions (
    time TIMESTAMPTZ NOT NULL,
    device_id TEXT NOT NULL,
    temperature DOUBLE PRECISION,
    humidity DOUBLE PRECISION,
    battery_level DOUBLE PRECISION
);

SELECT create_hypertable('conditions', 'time');
""")

# インデックス(自動)
cur.execute("""
CREATE INDEX ON conditions (device_id, time DESC);
""")

# SQL クエリ(標準 SQL)
cur.execute("""
SELECT
    time_bucket('1 hour', time) AS hour,
    device_id,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp
FROM conditions
WHERE time >= NOW() - INTERVAL '7 days'
GROUP BY hour, device_id
ORDER BY hour DESC;
""")

# 高度なクエリ(JOIN 可能)
cur.execute("""
SELECT
    c.time,
    c.device_id,
    c.temperature,
    d.location,
    d.installation_date
FROM conditions c
JOIN devices d ON c.device_id = d.device_id
WHERE c.time >= NOW() - INTERVAL '1 day';
""")

print("【TimescaleDB 特徴】")
print("・PostgreSQL の全機能(JOIN、サブクエリ、ストアドプロシージャ)")
print("・自動パーティショニング( hypertable → chunks)")
print("・時間関数:time_bucket(), date_trunc()")
print("・標準 SQL でクエリ可能")

cur.close()
conn.close()

Prometheus 詳細

# Prometheus 設定(prometheus.yml)
global:
  scrape_interval: 15s  # 15 秒間隔でメトリクス収集

scrape_configs:
  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

  - job_name: 'application'
    static_configs:
      - targets: ['localhost:8080']

# メトリクス例:
# http_requests_total{method="GET", status="200"} 1024
# node_cpu_seconds_total{cpu="0", mode="user"} 3600
# PromQL クエリ例
queries = {
    # 単純なメトリクス取得
    "cpu_usage": 'node_cpu_seconds_total{mode="user"}',

    # 増加率(1 時間あたりの増加)
    "request_rate": 'rate(http_requests_total[1h])',

    # 移動平均(5 分間の移動平均)
    "temp_moving_avg": 'avg_over_time(temperature[5m])',

    # パーセンタイル(95 パーセンタイル)
    "latency_p95": 'histogram_quantile(0.95, rate(request_duration_bucket[5m]))',

    # 割合(エラー率)
    "error_rate": 'rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m])',
}

print("【PromQL クエリ例】")
for name, query in queries.items():
    print(f"{name}: {query}")

# PromQL の特徴:
# - rate(): 増加率計算
# - avg_over_time(), max_over_time(): 時間関数
# - histogram_quantile(): パーセンタイル計算
# - 正規表現マッチ:{status=~"5.."}

実装パターン——InfluxDB を使った IoT データ収集

全体構成

【IoT データ収集システム】

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ IoT デバイス │ ──→ │  ゲートウェイ │ ──→ │  InfluxDB   │
│ (温度センサー) │ MQTT │  (Python)    │ HTTP │  (TSDB)     │
└─────────────┘     └─────────────┘     └─────────────┘
                                               │
                                               ↓
                                        ┌─────────────┐
                                        │   Grafana   │
                                        │ (可視化)     │
                                        └─────────────┘

Python 実装

import paho.mqtt.client as mqtt
import json
from datetime import datetime
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB 設定
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "my-super-secret-auth-token"
INFLUX_ORG = "my-org"
INFLUX_BUCKET = "iot_data"

# InfluxDB クライアント
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

# MQTT コールバック
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    # トピック購読
    client.subscribe("sensors/+/temperature")
    client.subscribe("sensors/+/humidity")

def on_message(client, userdata, msg):
    """
    メッセージ受信時の処理
    トピック例:sensors/sensor_001/temperature
    ペイロード例:{"value": 25.3, "unit": "C"}
    """
    # トピックからメタデータ抽出
    topic_parts = msg.topic.split('/')
    sensor_id = topic_parts[1]  # sensor_001
    metric_type = topic_parts[2]  # temperature

    # ペイロード解析
    payload = json.loads(msg.payload.decode())
    value = payload['value']
    unit = payload.get('unit', '')

    # InfluxDB に書き込み
    point = Point("sensor_data") \
        .tag("sensor_id", sensor_id) \
        .tag("metric_type", metric_type) \
        .field("value", value) \
        .field("unit", unit) \
        .time(datetime.utcnow())

    write_api.write(bucket=INFLUX_BUCKET, record=point)
    print(f"Written: {sensor_id} {metric_type} = {value}")

# MQTT クライアント設定
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect("localhost", 1883, 60)

# 継続クエリ設定(1 時間平均)
cq_query = """
CREATE CONTINUOUS QUERY "cq_1h_average" ON "iot_data"
BEGIN
    SELECT
        MEAN(value) AS "value_avg",
        MAX(value) AS "value_max",
        MIN(value) AS "value_min"
    INTO "iot_data"."autogen"."hourly_aggregation"
    FROM "sensor_data"
    GROUP BY time(1h), sensor_id, metric_type
END
"""
client.query_api().query(cq_query)

print("【IoT データ収集システム起動】")
print("・MQTT トピック:sensors/+/temperature, sensors/+/humidity")
print("・InfluxDB バケット:iot_data")
print("・継続クエリ:1 時間平均を自動集計")

# メインループ
mqtt_client.loop_forever()

クエリ例

# データ查询

# 1. 最新 1 時間の生データ
query_1h = """
from(bucket: "iot_data")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r.sensor_id == "sensor_001")
  |> filter(fn: (r) => r.metric_type == "temperature")
"""

# 2. 過去 24 時間の 1 時間平均(集計データ)
query_24h = """
from(bucket: "iot_data")
  |> range(start: -24h)
  |> from(bucket: "iot_data")
  |> filter(fn: (r) => r._measurement == "hourly_aggregation")
  |> filter(fn: (r) => r.sensor_id == "sensor_001")
"""

# 3. 異常検知(閾値超過)
query_anomaly = """
from(bucket: "iot_data")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r.metric_type == "temperature")
  |> filter(fn: (r) => r.value > 30.0 or r.value < 10.0)
"""

# 4. 複数センサーの比較
query_compare = """
from(bucket: "iot_data")
  |> range(start: -6h)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r.metric_type == "temperature")
  |> aggregateWindow(every: 5m, fn: mean)
  |> pivot(rowKey: ["_time"], columnKey: ["sensor_id"], valueColumn: "_value")
"""

print("【クエリ例】")
print("1. 最新 1 時間の生データ")
print("2. 過去 24 時間の集計データ")
print("3. 異常検知(閾値超過アラート)")
print("4. 複数センサーの比較")

異常検知とアラート

統計的異常検知

import numpy as np
from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
query_api = client.query_api()

def detect_anomalies(sensor_id, metric_type, window_hours=24):
    """
    移動平均と標準偏差による異常検知

    Parameters:
        sensor_id: センサー ID
        metric_type: メトリクス種別
        window_hours: 基準とする時間窓
    """
    # 過去データの取得
    query = f"""
    from(bucket: "iot_data")
      |> range(start: -{window_hours}h)
      |> filter(fn: (r) => r._measurement == "sensor_data")
      |> filter(fn: (r) => r.sensor_id == "{sensor_id}")
      |> filter(fn: (r) => r.metric_type == "{metric_type}")
      |> aggregateWindow(every: 1h, fn: mean)
    """

    result = query_api.query(query)

    # データ抽出
    values = []
    times = []
    for table in result:
        for record in table.records:
            values.append(record.get_value())
            times.append(record.get_time())

    values = np.array(values)

    # 移動平均と移動標準偏差
    window_size = 24  # 24 時間の移動窓
    if len(values) < window_size:
        return []

    moving_avg = np.convolve(values, np.ones(window_size)/window_size, mode='valid')
    moving_std = np.array([
        np.std(values[i:i+window_size]) for i in range(len(values) - window_size + 1)
    ])

    # 異常検知(3 シグマ法)
    anomalies = []
    for i in range(len(moving_avg)):
        z_score = abs(values[i + window_size - 1] - moving_avg[i]) / (moving_std[i] + 1e-6)
        if z_score > 3:  # 3 シグマ超過
            anomalies.append({
                'time': times[i + window_size - 1],
                'value': values[i + window_size - 1],
                'expected': moving_avg[i],
                'z_score': z_score
            })

    return anomalies

# 実行
anomalies = detect_anomalies("sensor_001", "temperature")

print("【異常検知結果】")
for anomaly in anomalies:
    print(f"時間:{anomaly['time']}")
    print(f"  観測値:{anomaly['value']:.2f}")
    print(f"  予測値:{anomaly['expected']:.2f}")
    print(f"  Z スコア:{anomaly['z_score']:.2f}")
    print()

InfluxDB アラート

from influxdb_client import InfluxDBClient
from influxdb_client.client.flux_api import FluxAPI

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

# チェック(監視クエリ)の作成
from influxdb_client.domain.check import Check
from influxdb_client.domain.threshold import Threshold
from influxdb_client.domain.threshold_check import ThresholdCheck

# 温度が 30 度超過または 10 度未満でアラート
threshold_check = ThresholdCheck(
    name="temperature_anomaly",
    every="1m",  # 1 分間隔でチェック
    query='''
from(bucket: "iot_data")
  |> range(start: -5m)
  |> filter(fn: (r) => r._measurement == "sensor_data")
  |> filter(fn: (r) => r.metric_type == "temperature")
  |> filter(fn: (r) => r.value > 30.0 or r.value < 10.0)
''',
    thresholds=[
        Threshold(
            type="greater",
            value=30.0,
            level="crit",
            aggregateFunctionType="last"
        ),
        Threshold(
            type="lesser",
            value=10.0,
            level="crit",
            aggregateFunctionType="last"
        )
    ]
)

# チェック登録
checks_api = client.checks_api()
created_check = checks_api.create_check(threshold_check)

print("【アラート設定】")
print(f"チェック名:{created_check.name}")
print(f"実行間隔:{created_check.every}")
print("・温度 > 30°C: クリティカル")
print("・温度 < 10°C: クリティカル")

TSDB 選定の判断基準

ユースケース別推奨

【ユースケース別 TSDB 選定】

1. **IoT センサーデータ収集**
   - 推奨:InfluxDB, TimescaleDB
   - 理由:高頻度書き込みに最適化、ダウサンプリング機能
   - 規模感:秒間 1000-10000 書き込み

2. **システム/インフラ監視**
   - 推奨:Prometheus
   - 理由:メトリクス収集・アラート機能が標準装備
   - 规模感:数百〜数千メトリクス

3. **金融市場データ(株価、為替)**
   - 推奨:QuestDB, TimescaleDB
   - 理由:低遅延、SQL 対応(分析クエリ)
   - 规模感:秒間 10000+ 書き込み

4. **ログ分析と可視化**
   - 推奨:Elasticsearch(TSDB ではないが)
   - 理由:全文検索、Grafana/Prometheus と連携
   - 规模感:日間数百万〜数億ログ

5. **既存 PostgreSQL環境との統合**
   - 推奨:TimescaleDB
   - 理由:PostgreSQL拡張、既存クエリ資産活用
   - 规模感:中規模(日間数百万行)

選定チェックリスト

【TSDB 選定チェックリスト】

□ **データ特性**
  - 書き込み頻度(秒間件数)
  - データ保持期間
  - クエリパターン(最新参照 vs 集計分析)

□ **技術要件**
  - SQL 必須か(TimescaleDB, QuestDB)
  - JOIN 必要か(TimescaleDB のみ対応)
  - 既存システムとの統合(PostgreSQL → TimescaleDB)

□ **運用要件**
  - クラウドネイティブ必須か(InfluxDB Cloud, Timestream)
  - 監視アラート機能(Prometheus が標準装備)
  - 可視化ツール連携(Grafana は主要 TSDB すべて対応)

□ **コスト**
  - オープンソース必須か(すべて OSS 版あり)
  - クラウドサービス利用か(管理コスト削減)
  - サポート契約必要か(Enterprise 版)

まとめ

時系列データベースの核心:

  1. 基本概念: タイムスタンプ付きデータの保存・検索・集計に特化した DB
  2. RDBとの違い: 列指向保存、時間パーティショニング、高圧縮、高速集計
  3. 圧縮技術: Gorilla 圧縮、デルタエンコーディングで 90% 以上の圧縮率
  4. ダウサンプリング: 継続クエリで時間経過とともに解像度を低下
  5. リテンション: 自動削除ポリシーでストレージ管理を効率化
  6. 主要製品: InfluxDB(普及度)、TimescaleDB(SQL 対応)、Prometheus(監視)
  7. 選定基準: ユースケース、SQL 要件、既存環境で選択

TSDB 選定の指針:

  1. IoT・センサーデータ: InfluxDB または TimescaleDB
  2. システム監視: Prometheus(標準事実上)
  3. 金融・低遅延: QuestDB
  4. PostgreSQL 統合: TimescaleDB
  5. クラウド管理: AWS Timestream, InfluxDB Cloud

時系列データは IoT、FinTech、DevOps など幅広い分野で爆発的に増加している。適切な TSDB の選択と設計は、システムの性能とコストに直結する重要な技術決定だ。


参考資料

  • InfluxData. "InfluxDB Documentation - Data Storage Internals" https://docs.influxdata.com/
  • Timescale. "TimescaleDB Whitepaper - High-Performance Time-Series SQL" https://www.timescale.com/
  • Prometheus Authors. "Prometheus Documentation - Storage Layer" https://prometheus.io/docs/
  • QuestDB Team. "QuestDB Technical Overview" https://questdb.io/docs/
  • Facebook Engineering. "Gorilla: A Fast, Scalable, In-Memory Time Series Database" (2015)
  • Codd, E.F. "Time-Series Data Management: Concepts and Techniques" (2020)
  • AWS. "Amazon Timestream Developer Guide" https://docs.aws.amazon.com/timestream/
  • Grafana Labs. "Grafana Time Series Visualization" https://grafana.com/docs/
  • 日本時系列データサイエンス協会. 「時系列データ解析の基礎」(2024)
  • Martin Kleppmann. "Designing Data-Intensive Applications" Chapter 11 (O'Reilly, 2017)

免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。