目次
「過去 1 年間のセンサーデータを保存したい」「5 秒ごとの株価データをリアルタイムで集計したい」「サーバーのメトリクスを 30 日間保持し、1 時間ごとに平均値を計算したい」——こうした要件に、リレーショナルデータベース(RDB)は十分に応えられない。
時系列データベース(Time-Series Database, TSDB)は、タイムスタンプ付きデータの保存・検索・集計に特化したデータベースだ。IoT デバイスのセンサーデータ、システムモニタリング、金融市場データ、ログ分析など、時間軸に沿ったデータ処理が必要な場面で威力を発揮する。
本記事では、時系列データベースの基本概念から、データの保存構造、圧縮技術、ダウサンプリング、継続クエリ、そして主要 TSDB 製品の比較までを体系的に解説する。
時系列データの特性と RDB の限界
時系列データの特徴
時系列データは、以下のような特性を持つ。
【時系列データの例】
1. IoT センサーデータ
- 温度:25.3°C, 25.5°C, 25.1°C, ...(5 秒間隔)
- 湿度:60%, 61%, 59%, ...(5 秒間隔)
- 気圧:1013.2hPa, 1013.5hPa, ...(5 秒間隔)
2. システムメトリクス
- CPU 使用率:45%, 52%, 48%, ...(10 秒間隔)
- メモリ使用量:4.2GB, 4.3GB, 4.1GB, ...(10 秒間隔)
- ディスク I/O: 120MB/s, 115MB/s, ...(10 秒間隔)
3. 金融市場データ
- 株価:1500 円,1502 円,1498 円,...(1 秒間隔)
- 出来高:1000 株,1500 株,800 株,...(1 秒間隔)
時系列データの本質的な特性:
- 時間順序性: データは時間順に到着し、過去の値との比較が重要
- 高頻度書き込み: 秒間数千〜数百万件の INSERT が発生
- 最新値へのアクセス集中: 直近 1 時間〜1 日のデータが最も参照される
- 集計需要: 平均、最大、最小、合計などの集計クエリが頻発
- 時間窓処理: 「過去 5 分間の平均」「1 時間ごとの合計」など時間ベースの処理
RDB での時系列データ管理の問題点
RDB で時系列データを管理しようとすると、以下の問題に直面する。
-- RDB での典型的な時系列データ設計
CREATE TABLE sensor_data (
id SERIAL PRIMARY KEY,
sensor_id INTEGER NOT NULL,
timestamp TIMESTAMP NOT NULL,
value DOUBLE PRECISION NOT NULL,
INDEX idx_timestamp (timestamp),
INDEX idx_sensor_time (sensor_id, timestamp)
);
-- 問題 1: 大量 INSERT でインデックス更新がボトルネック
INSERT INTO sensor_data (sensor_id, timestamp, value)
VALUES (1, '2025-07-20 10:00:00', 25.3); -- 秒間 1000 件INSERT...
-- 問題 2: 時間範囲クエリが遅い
SELECT AVG(value) FROM sensor_data
WHERE timestamp >= '2025-07-01'
AND timestamp < '2025-08-01'; -- 数千万行をスキャン
-- 問題 3: 古いデータの削除が重い
DELETE FROM sensor_data
WHERE timestamp < '2025-06-01'; -- 数千万行削除でテーブルロック
RDB の構造的な限界:
- インデックス更新コスト: B ツリーインデックスはランダムアクセスに最適で、時系列のような連続 INSERT に非効率
- 行ベース保存: 1 行ごとにメタデータを持ち、時系列データの連続性に非効率
- VACUUM/最適化: 削除後の領域回復に時間がかかる
- 列指向非対応: 集計クエリで不要なカラムも読み込む
TSDB の基本的な構造——時間軸でデータを整理する
列指向ストレージ
TSDB の多くは列指向ストレージを採用している。
【行指向 vs 列指向】
行指向(RDB):
┌─────────┬──────────┬───────┬────────┐
│ sensor_id│timestamp │ value │ status │
├─────────┼──────────┼───────┼────────┤
│ 1 │ 10:00:00 │ 25.3 │ OK │ ← 1 行
│ 1 │ 10:00:05 │ 25.5 │ OK │ ← 2 行
│ 1 │ 10:00:10 │ 25.1 │ OK │ ← 3 行
└─────────┴──────────┴───────┴────────┘
列指向(TSDB):
sensor_id: [1, 1, 1, 1, 1, ...]
timestamp: [10:00:00, 10:00:05, 10:00:10, 10:00:15, ...]
value: [25.3, 25.5, 25.1, 25.4, ...]
status: [OK, OK, OK, OK, ...]
列指向のメリット:
- 高い圧縮率: 同じ型・似た値が連続するため、圧縮アルゴリズムが効く
- 集計クエリの高速化:
AVG(value)なら value 列のみ読み込めばよい - I/O 効率: 必要なカラムのみディスクから読み込む
時間ベースのパーティショニング
TSDB はデータを時間単位で分割して管理する。
【パーティショニングの概念】
sensor_data メトリック:
┌─────────────────────────────────────────────┐
│ 2025-07-19 日のシャード(過去) │
│ - 圧縮済み、読み取り専用 │
│ - S3 などの安価なストレージに移動可能 │
├─────────────────────────────────────────────┤
│ 2025-07-20 午前のシャード │
│ - 圧縮済み、読み取りメイン │
├─────────────────────────────────────────────┤
│ 2025-07-20 午後のシャード(現在) │
│ - 書き込み中、インメモリ、高速アクセス │
└─────────────────────────────────────────────┘
パーティショニングのメリット:
- 古いデータの削除: シャード単位で即時削除可能
- 階層化ストレージ: 最新データは SSD、過去データは HDD/S3
- 並列クエリ: 複数シャードを並列にスキャン
TSM(Time-Structured Merge Tree)
InfluxDB が採用するTSMは、時系列データに最適化された保存エンジンだ。
【TSM の書き込みフロー】
1. メモリ(WAL + TSM Cache):
書き込み → WAL(Write-Ahead Log)に追記
→ TSM Cache(インメモリ)に蓄積
2. フラッシュ:
Cache が 100MB 到達 → TSM ファイルにフラッシュ
(順序付きでソート、圧縮)
3. compaction:
複数 TSM ファイル → 1 つにマージ
(重複削除、圧縮率向上)
ディスク構造:
┌──────────────────────────────────────┐
│ WAL ファイル(追加のみ、順序なし) │
├──────────────────────────────────────┤
│ TSM ファイル 1(ソート済、圧縮済) │ ← 10:00-10:05
│ TSM ファイル 2(ソート済、圧縮済) │ ← 10:05-10:10
│ TSM ファイル 3(ソート済、圧縮済) │ ← 10:10-10:15
└──────────────────────────────────────┘
TSM の特徴:
- 追加のみ: 既存ファイルの更新はなく、常に追記
- ソート済み: タイムスタンプ順にソート済み、範囲クエリが高速
- 高圧縮: 類似値が連続、Gorilla 圧縮などの専用アルゴリズム
時系列データの圧縮技術
Gorilla 圧縮(Facebook)
Facebook の Gorilla TSDB が開発した、時系列データに特化した圧縮アルゴリズムだ。
def gorilla_timestamp_encoding(timestamps):
"""
Gorilla のタイムスタンプ圧縮
基本アイデア:連続するタイムスタンプの差分(delta)を記録
さらに delta の差分(delta-of-delta)を記録
"""
if len(timestamps) < 2:
return timestamps
# 最初の値
first_timestamp = timestamps[0]
first_delta = timestamps[1] - timestamps[0]
# delta-of-delta を計算
delta_of_deltas = []
prev_delta = first_delta
for i in range(2, len(timestamps)):
delta = timestamps[i] - timestamps[i-1]
dod = delta - prev_delta
delta_of_deltas.append(dod)
prev_delta = delta
# delta-of-delta は 0 になることが多く、効率的に符号化可能
return {
'first': first_timestamp,
'first_delta': first_delta,
'dod': delta_of_deltas # 多くの要素が 0
}
# 例:5 秒間隔のセンサーデータ
timestamps = [1000, 1005, 1010, 1015, 1020, 1025]
result = gorilla_timestamp_encoding(timestamps)
print("【Gorilla タイムスタンプ圧縮】")
print(f"元データ:{timestamps}")
print(f"最初のタイムスタンプ:{result['first']}")
print(f"最初の差分:{result['first_delta']}")
print(f"delta-of-delta: {result['dod']}") # [0, 0, 0, 0] → 効率的に符号化
出力:
【Gorilla タイムスタンプ圧縮】
元データ:[1000, 1005, 1010, 1015, 1020, 1025]
最初のタイムスタンプ:1000
最初の差分:5
delta-of-delta: [0, 0, 0, 0]
Gorilla 圧縮の本質:
- センサーデータは等間隔で収集されることが多い
- 等間隔なら delta-of-delta は常に 0
- 0 の連続は、ビット列で効率的に表現可能
XOR 値圧縮
Gorilla は数値データも圧縮する。
def gorilla_value_encoding(values):
"""
Gorilla の数値圧縮(XOR ベース)
基本アイデア:浮動小数点数の XOR を取り、
変化が小さい場合は多くのビットが 0 になる
"""
if len(values) < 2:
return values
import struct
def float_to_bits(f):
return struct.unpack('>Q', struct.pack('>d', f))[0]
def bits_to_float(b):
return struct.unpack('>d', struct.pack('>Q', b))[0]
# 最初の値はそのまま保存
first_value = values[0]
prev_bits = float_to_bits(first_value)
encoded = []
for i in range(1, len(values)):
current_bits = float_to_bits(values[i])
xor = prev_bits ^ current_bits
# XOR 結果の先頭・末尾の 0 ビット数をカウント
if xor == 0:
# 完全に同じ値
encoded.append({'type': 'same', 'data': None})
else:
# 有効ビットのみ保存
leading_zeros = count_leading_zeros(xor)
trailing_zeros = count_trailing_zeros(xor)
significant_bits = xor >> trailing_zeros
encoded.append({
'type': 'different',
'leading': leading_zeros,
'trailing': trailing_zeros,
'data': significant_bits
})
prev_bits = current_bits
return {'first': first_value, 'encoded': encoded}
def count_leading_zeros(n, bits=64):
count = 0
for i in range(bits - 1, -1, -1):
if (n >> i) & 1:
break
count += 1
return count
def count_trailing_zeros(n):
count = 0
while n and not (n & 1):
count += 1
n >>= 1
return count
# 例:温度センサーの値(少しずつ変化する)
values = [25.3, 25.3, 25.3, 25.4, 25.5, 25.5, 25.5]
result = gorilla_value_encoding(values)
print("\n【Gorilla 値圧縮】")
print(f"元データ:{values}")
print(f"最初の値:{result['first']}")
print(f"エンコード結果:")
for i, enc in enumerate(result['encoded']):
if enc['type'] == 'same':
print(f" 値{i+1}: 同じ値(0 ビット)")
else:
print(f" 値{i+1}: {enc['leading']}ビットシフト、{enc['data']} を保存")
出力:
【Gorilla 値圧縮】
元データ:[25.3, 25.3, 25.3, 25.4, 25.5, 25.5, 25.5]
最初の値:25.3
エンコード結果:
値 1: 同じ値(0 ビット)
値 2: 同じ値(0 ビット)
値 3: 12 ビットシフト、[有効ビット] を保存
値 4: 11 ビットシフト、[有効ビット] を保存
値 5: 同じ値(0 ビット)
値 6: 同じ値(0 ビット)
圧縮効果:
- 変化の少ないセンサーデータ:90% 以上の圧縮率
- 完全に同じ値:1 ビット(フラグのみ)で表現
- 少し变化的値:有効ビットのみ保存(通常 10-20 ビット)
デルタエンコーディング
より単純な圧縮手法として、デルタ(差分)エンコーディングもある。
def delta_encoding(values):
"""
デルタエンコーディング
連続する値の差分のみを保存
"""
if not values:
return []
first = values[0]
deltas = [values[i] - values[i-1] for i in range(1, len(values))]
return {'first': first, 'deltas': deltas}
def delta_decode(encoded):
"""デコード"""
if not encoded['deltas']:
return [encoded['first']]
values = [encoded['first']]
for delta in encoded['deltas']:
values.append(values[-1] + delta)
return values
# 例:累積カウンタ(常に増加)
counter_values = [1000, 1005, 1012, 1015, 1020, 1028, 1035]
encoded = delta_encoding(counter_values)
print("【デルタエンコーディング】")
print(f"元データ:{counter_values}")
print(f"最初の値:{encoded['first']}")
print(f"差分:{encoded['deltas']}") # [5, 7, 3, 5, 8, 7]
print(f"デコード:{delta_decode(encoded)}")
# 圧縮効果:元の値は 1000-1035(10-11 ビット必要)
# 差分は 3-8(3-4 ビットで十分)
出力:
【デルタエンコーディング】
元データ:[1000, 1005, 1012, 1015, 1020, 1028, 1035]
最初の値:1000
差分:[5, 7, 3, 5, 8, 7]
デコード:[1000, 1005, 1012, 1015, 1020, 1028, 1035]
ダウサンプリングと継続クエリ
ダウサンプリングの必要性
高頻度で収集したデータを長期間保存すると、データ量が爆発する。
【データ量の計算】
秒間 1000 センサー × 10 値/秒 × 86400 秒/日 = 8.64 億値/日
1 値 8 バイト → 約 7GB/日
30 日間保存 → 210GB
1 年間保存 → 2.5TB
→ すべてを_raw データで保存するのは非現実的
解決策: 時間経過とともに解像度を下げる(ダウサンプリング)
【ダウサンプリング戦略】
時間経過 解像度 保存方法
─────────────────────────────────────
最新 1 時間 1 秒間隔 _raw(生データ)
最新 1 日 1 分平均 1m_aggregation
最新 1 週間 1 時間平均 1h_aggregation
最新 1 ヶ月 1 日平均 1d_aggregation
1 ヶ月以上 1 週間平均 1w_aggregation
InfluxDB の継続クエリ(CQ)
InfluxDB では**継続クエリ(Continuous Query)**で自動ダウサンプリングを実現する。
from influxdb import InfluxDBClient
from datetime import datetime, timedelta
# InfluxDB 接続
client = InfluxDBClient(host='localhost', port=8086, database='iot_data')
# 1. 生データ(raw)の保存
# センサーが 5 秒間隔で書き込み
def write_sensor_data(sensor_id, temperature, humidity):
json_body = [
{
"measurement": "sensor_data",
"tags": {
"sensor_id": sensor_id
},
"time": datetime.utcnow().isoformat(),
"fields": {
"temperature": temperature,
"humidity": humidity
}
}
]
client.write_points(json_body)
# 2. 継続クエリの作成(1 時間平均を hourly_aggregation に保存)
cq_query = """
CREATE CONTINUOUS QUERY "cq_1h_average" ON "iot_data"
BEGIN
SELECT
MEAN(temperature) AS "temperature_avg",
MEAN(humidity) AS "humidity_avg",
MAX(temperature) AS "temperature_max",
MIN(temperature) AS "temperature_min"
INTO "iot_data"."autogen"."hourly_aggregation"
FROM "sensor_data"
GROUP BY time(1h), sensor_id
END
"""
client.query(cq_query)
print("【継続クエリの設定】")
print("・sensor_data(生データ)→ 1 時間ごとの集計")
print("・結果は hourly_aggregation メトリックに保存")
print("・sensor_id ごとにグループ化")
# 3. ダウサンプリング後のデータ参照
# 最新 1 時間は生データ、それ以降は集計データを使う
query_1h_raw = """
SELECT temperature, humidity
FROM sensor_data
WHERE time >= now() - 1h
"""
query_24h_aggregated = """
SELECT temperature_avg, humidity_avg
FROM hourly_aggregation
WHERE time >= now() - 24h
"""
print("\n【クエリの使い分け】")
print(f"最新 1 時間:{query_1h_raw}")
print(f"過去 24 時間:{query_24h_aggregated}")
継続クエリの動作:
- CQ がバックグラウンドで定期的に実行(デフォルト:10 分間隔)
- 前回の CQ 実行以降の生データを対象に集計
- 集計結果を別メトリックに保存
- 古い生データはリテンションポリシーで自動削除
TimescaleDB の継続アグリゲーション
TimescaleDB(PostgreSQL 拡張)では継続アグリゲーション機能を提供する。
import psycopg2
from psycopg2.extras import RealDictCursor
# TimescaleDB 接続
conn = psycopg2.connect(
host="localhost",
database="iot_data",
user="postgres",
password="password"
)
conn.autocommit = True
cur = conn.cursor(cursor_factory=RealDictCursor)
# 1. ハイパーテーブルの作成(時系列専用テーブル)
cur.execute("""
CREATE TABLE sensor_data (
time TIMESTAMPTZ NOT NULL,
sensor_id INTEGER NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION
);
""")
# ハイパーテーブルに変換(1 時間ごとにチャンク分割)
cur.execute("""
SELECT create_hypertable('sensor_data', 'time', chunk_time_interval => INTERVAL '1 hour');
""")
# 2. 1 時間ごとの集計ビュー(継続アグリゲーション)
cur.execute("""
CREATE MATERIALIZED VIEW sensor_data_1h
WITH (timescaledb.continuous) AS
SELECT
time_bucket(INTERVAL '1 hour', time) AS bucket,
sensor_id,
AVG(temperature) AS temperature_avg,
MAX(temperature) AS temperature_max,
MIN(temperature) AS temperature_min,
AVG(humidity) AS humidity_avg
FROM sensor_data
GROUP BY bucket, sensor_id;
""")
# 3. 集計の更新間隔を設定
cur.execute("""
SELECT add_continuous_aggregate_policy('sensor_data_1h',
start_offset => INTERVAL '1 day',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
""")
# 4. クエリ(集計ビューは透過的に最新データを含む)
cur.execute("""
SELECT bucket, sensor_id, temperature_avg
FROM sensor_data_1h
WHERE bucket >= NOW() - INTERVAL '7 days'
ORDER BY bucket DESC;
""")
print("【TimescaleDB 継続アグリゲーション】")
print("・1 時間ごとのチャンク分割")
print("・1 時間平均をマテリアライズドビュー")
print("・1 時間ごとに自動更新")
print("・最新 1 時間は生データ、過去は集計データを自動使用")
cur.close()
conn.close()
TimescaleDB の特徴:
- PostgreSQL との完全互換性(SQL、JOIN、トランザクション)
- 継続アグリゲーションはマテリアライズドビューとして実装
- 増分更新(新しいチャンクのみ再計算)
リテンションポリシー——データの自動削除
InfluxDB のリテンションポリシー
from influxdb import InfluxDBClient
client = InfluxDBClient(host='localhost', port=8086, database='iot_data')
# 1. リテンションポリシーの作成
# 30 日間保存、デフォルトポリシー
client.create_retention_policy(
retention_policy_name='thirty_days',
duration='30d',
replication=1,
default=True
)
# 2. 異なる保存期間のポリシー
# 1 週間のみ保存(高速アクセス用)
client.create_retention_policy(
retention_policy_name='one_week',
duration='7d',
replication=1,
default=False
)
# 1 年間保存(長期保存用、S3 等へ)
client.create_retention_policy(
retention_policy_name='one_year',
duration='52w',
replication=1,
default=False
)
# 3. ポリシーの使い分け
# 生データは 7 日、集計データは 1 年
client.switch_database('iot_data', 'one_week') # 生データ用
client.switch_database('iot_data', 'one_year') # 集計データ用
print("【リテンションポリシーの使い分け】")
print("・sensor_data(生): 7 日間")
print("・hourly_aggregation(集計): 1 年間")
print("・daily_aggregation(要約): 5 年間")
# 4. 手動でのデータ削除(特定条件)
client.delete_series(
measurement='sensor_data',
tags={'sensor_id': 'sensor_001'}, # 特定センサーのみ削除
start_time='2025-01-01',
end_time='2025-06-30'
)
TimescaleDB のデータ削除
import psycopg2
conn = psycopg2.connect(
host="localhost",
database="iot_data",
user="postgres",
password="password"
)
conn.autocommit = True
cur = conn.cursor()
# 1. データ保持ポリシーの設定(30 日後自動削除)
cur.execute("""
SELECT add_retention_policy('sensor_data', INTERVAL '30 days');
""")
# 2. 集計ビューの保持ポリシー(1 年後自動削除)
cur.execute("""
SELECT add_retention_policy('sensor_data_1h', INTERVAL '1 year');
""")
# 3. チャンク単位の削除(手動)
# 特定の期間のチャンクを即時削除
cur.execute("""
SELECT show_chunks('sensor_data', older_than => NOW() - INTERVAL '6 months');
""")
old_chunks = cur.fetchall()
for chunk in old_chunks:
print(f"削除対象チャンク:{chunk['show_chunks']}")
# 4. 圧縮(コールドデータを安価なストレージへ)
cur.execute("""
ALTER TABLE sensor_data SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id'
);
""")
# 1 日前のチャンクを自動圧縮
cur.execute("""
SELECT add_compression_policy('sensor_data', INTERVAL '1 day');
""")
print("【TimescaleDB データ管理】")
print("・生データ:30 日自動削除")
print("・1 時間集計:1 年自動削除")
print("・1 日前のデータ:自動圧縮(ストレージ効率向上)")
cur.close()
conn.close()
主要 TSDB の比較
製品比較
| 製品 | データモデル | クエリ言語 | ライセンス | 特徴 |
|---|---|---|---|---|
| InfluxDB | カスタム | InfluxQL/Flux | MIT/Enterprise | 最も普及、エコシステム充実 |
| TimescaleDB | RDB(PostgreSQL 拡張) | SQL(PostgreSQL 互換) | Apache 2.0/Enterprise | SQL 完全対応、JOIN 可能 |
| Prometheus | カスタム | PromQL | Apache 2.0 | システム監視に最適化 |
| QuestDB | カスタム | SQL 拡張 | Apache 2.0 | 高速、低遅延 |
| ClickHouse | 列指向 RDB | SQL 拡張 | Apache 2.0 | 分析クエリに強い |
InfluxDB 詳細
# InfluxDB 2.x のデータモデル
# Organization → Bucket → Measurement → Point
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
# データ書き込み
point = Point("sensor_data") \
.tag("sensor_id", "sensor_001") \
.tag("location", "tokyo") \
.field("temperature", 25.3) \
.field("humidity", 60.5) \
.time(datetime.utcnow())
write_api.write(bucket="iot_data", record=point)
# クエリ(Flux 言語)
from influxdb_client.client.flux_api import FluxAPI
flux_query = """
from(bucket: "iot_data")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r.sensor_id == "sensor_001")
|> aggregateWindow(every: 1m, fn: mean)
"""
query_api = client.query_api()
result = query_api.query(flux_query)
print("【InfluxDB 2.x データモデル】")
print("・Organization: マルチテナントの単位")
print("・Bucket: データの保存単位(RP+ データ)")
print("・Measurement: テーブルに相当")
print("・Point: 1 行のデータ")
print("・Tag: インデックス対象(WHERE 句)")
print("・Field: 値(SELECT 対象)")
TimescaleDB 詳細
import psycopg2
conn = psycopg2.connect(
host="localhost",
database="iot_data",
user="postgres",
password="password"
)
cur = conn.cursor()
# ハイパーテーブル作成
cur.execute("""
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
battery_level DOUBLE PRECISION
);
SELECT create_hypertable('conditions', 'time');
""")
# インデックス(自動)
cur.execute("""
CREATE INDEX ON conditions (device_id, time DESC);
""")
# SQL クエリ(標準 SQL)
cur.execute("""
SELECT
time_bucket('1 hour', time) AS hour,
device_id,
AVG(temperature) AS avg_temp,
MAX(temperature) AS max_temp,
MIN(temperature) AS min_temp
FROM conditions
WHERE time >= NOW() - INTERVAL '7 days'
GROUP BY hour, device_id
ORDER BY hour DESC;
""")
# 高度なクエリ(JOIN 可能)
cur.execute("""
SELECT
c.time,
c.device_id,
c.temperature,
d.location,
d.installation_date
FROM conditions c
JOIN devices d ON c.device_id = d.device_id
WHERE c.time >= NOW() - INTERVAL '1 day';
""")
print("【TimescaleDB 特徴】")
print("・PostgreSQL の全機能(JOIN、サブクエリ、ストアドプロシージャ)")
print("・自動パーティショニング( hypertable → chunks)")
print("・時間関数:time_bucket(), date_trunc()")
print("・標準 SQL でクエリ可能")
cur.close()
conn.close()
Prometheus 詳細
# Prometheus 設定(prometheus.yml)
global:
scrape_interval: 15s # 15 秒間隔でメトリクス収集
scrape_configs:
- job_name: 'node_exporter'
static_configs:
- targets: ['localhost:9100']
- job_name: 'application'
static_configs:
- targets: ['localhost:8080']
# メトリクス例:
# http_requests_total{method="GET", status="200"} 1024
# node_cpu_seconds_total{cpu="0", mode="user"} 3600
# PromQL クエリ例
queries = {
# 単純なメトリクス取得
"cpu_usage": 'node_cpu_seconds_total{mode="user"}',
# 増加率(1 時間あたりの増加)
"request_rate": 'rate(http_requests_total[1h])',
# 移動平均(5 分間の移動平均)
"temp_moving_avg": 'avg_over_time(temperature[5m])',
# パーセンタイル(95 パーセンタイル)
"latency_p95": 'histogram_quantile(0.95, rate(request_duration_bucket[5m]))',
# 割合(エラー率)
"error_rate": 'rate(http_requests_total{status=~"5.."}[5m]) / rate(http_requests_total[5m])',
}
print("【PromQL クエリ例】")
for name, query in queries.items():
print(f"{name}: {query}")
# PromQL の特徴:
# - rate(): 増加率計算
# - avg_over_time(), max_over_time(): 時間関数
# - histogram_quantile(): パーセンタイル計算
# - 正規表現マッチ:{status=~"5.."}
実装パターン——InfluxDB を使った IoT データ収集
全体構成
【IoT データ収集システム】
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ IoT デバイス │ ──→ │ ゲートウェイ │ ──→ │ InfluxDB │
│ (温度センサー) │ MQTT │ (Python) │ HTTP │ (TSDB) │
└─────────────┘ └─────────────┘ └─────────────┘
│
↓
┌─────────────┐
│ Grafana │
│ (可視化) │
└─────────────┘
Python 実装
import paho.mqtt.client as mqtt
import json
from datetime import datetime
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# InfluxDB 設定
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "my-super-secret-auth-token"
INFLUX_ORG = "my-org"
INFLUX_BUCKET = "iot_data"
# InfluxDB クライアント
client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
# MQTT コールバック
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# トピック購読
client.subscribe("sensors/+/temperature")
client.subscribe("sensors/+/humidity")
def on_message(client, userdata, msg):
"""
メッセージ受信時の処理
トピック例:sensors/sensor_001/temperature
ペイロード例:{"value": 25.3, "unit": "C"}
"""
# トピックからメタデータ抽出
topic_parts = msg.topic.split('/')
sensor_id = topic_parts[1] # sensor_001
metric_type = topic_parts[2] # temperature
# ペイロード解析
payload = json.loads(msg.payload.decode())
value = payload['value']
unit = payload.get('unit', '')
# InfluxDB に書き込み
point = Point("sensor_data") \
.tag("sensor_id", sensor_id) \
.tag("metric_type", metric_type) \
.field("value", value) \
.field("unit", unit) \
.time(datetime.utcnow())
write_api.write(bucket=INFLUX_BUCKET, record=point)
print(f"Written: {sensor_id} {metric_type} = {value}")
# MQTT クライアント設定
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect("localhost", 1883, 60)
# 継続クエリ設定(1 時間平均)
cq_query = """
CREATE CONTINUOUS QUERY "cq_1h_average" ON "iot_data"
BEGIN
SELECT
MEAN(value) AS "value_avg",
MAX(value) AS "value_max",
MIN(value) AS "value_min"
INTO "iot_data"."autogen"."hourly_aggregation"
FROM "sensor_data"
GROUP BY time(1h), sensor_id, metric_type
END
"""
client.query_api().query(cq_query)
print("【IoT データ収集システム起動】")
print("・MQTT トピック:sensors/+/temperature, sensors/+/humidity")
print("・InfluxDB バケット:iot_data")
print("・継続クエリ:1 時間平均を自動集計")
# メインループ
mqtt_client.loop_forever()
クエリ例
# データ查询
# 1. 最新 1 時間の生データ
query_1h = """
from(bucket: "iot_data")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r.sensor_id == "sensor_001")
|> filter(fn: (r) => r.metric_type == "temperature")
"""
# 2. 過去 24 時間の 1 時間平均(集計データ)
query_24h = """
from(bucket: "iot_data")
|> range(start: -24h)
|> from(bucket: "iot_data")
|> filter(fn: (r) => r._measurement == "hourly_aggregation")
|> filter(fn: (r) => r.sensor_id == "sensor_001")
"""
# 3. 異常検知(閾値超過)
query_anomaly = """
from(bucket: "iot_data")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r.metric_type == "temperature")
|> filter(fn: (r) => r.value > 30.0 or r.value < 10.0)
"""
# 4. 複数センサーの比較
query_compare = """
from(bucket: "iot_data")
|> range(start: -6h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r.metric_type == "temperature")
|> aggregateWindow(every: 5m, fn: mean)
|> pivot(rowKey: ["_time"], columnKey: ["sensor_id"], valueColumn: "_value")
"""
print("【クエリ例】")
print("1. 最新 1 時間の生データ")
print("2. 過去 24 時間の集計データ")
print("3. 異常検知(閾値超過アラート)")
print("4. 複数センサーの比較")
異常検知とアラート
統計的異常検知
import numpy as np
from influxdb_client import InfluxDBClient
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
query_api = client.query_api()
def detect_anomalies(sensor_id, metric_type, window_hours=24):
"""
移動平均と標準偏差による異常検知
Parameters:
sensor_id: センサー ID
metric_type: メトリクス種別
window_hours: 基準とする時間窓
"""
# 過去データの取得
query = f"""
from(bucket: "iot_data")
|> range(start: -{window_hours}h)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r.sensor_id == "{sensor_id}")
|> filter(fn: (r) => r.metric_type == "{metric_type}")
|> aggregateWindow(every: 1h, fn: mean)
"""
result = query_api.query(query)
# データ抽出
values = []
times = []
for table in result:
for record in table.records:
values.append(record.get_value())
times.append(record.get_time())
values = np.array(values)
# 移動平均と移動標準偏差
window_size = 24 # 24 時間の移動窓
if len(values) < window_size:
return []
moving_avg = np.convolve(values, np.ones(window_size)/window_size, mode='valid')
moving_std = np.array([
np.std(values[i:i+window_size]) for i in range(len(values) - window_size + 1)
])
# 異常検知(3 シグマ法)
anomalies = []
for i in range(len(moving_avg)):
z_score = abs(values[i + window_size - 1] - moving_avg[i]) / (moving_std[i] + 1e-6)
if z_score > 3: # 3 シグマ超過
anomalies.append({
'time': times[i + window_size - 1],
'value': values[i + window_size - 1],
'expected': moving_avg[i],
'z_score': z_score
})
return anomalies
# 実行
anomalies = detect_anomalies("sensor_001", "temperature")
print("【異常検知結果】")
for anomaly in anomalies:
print(f"時間:{anomaly['time']}")
print(f" 観測値:{anomaly['value']:.2f}")
print(f" 予測値:{anomaly['expected']:.2f}")
print(f" Z スコア:{anomaly['z_score']:.2f}")
print()
InfluxDB アラート
from influxdb_client import InfluxDBClient
from influxdb_client.client.flux_api import FluxAPI
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
# チェック(監視クエリ)の作成
from influxdb_client.domain.check import Check
from influxdb_client.domain.threshold import Threshold
from influxdb_client.domain.threshold_check import ThresholdCheck
# 温度が 30 度超過または 10 度未満でアラート
threshold_check = ThresholdCheck(
name="temperature_anomaly",
every="1m", # 1 分間隔でチェック
query='''
from(bucket: "iot_data")
|> range(start: -5m)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r.metric_type == "temperature")
|> filter(fn: (r) => r.value > 30.0 or r.value < 10.0)
''',
thresholds=[
Threshold(
type="greater",
value=30.0,
level="crit",
aggregateFunctionType="last"
),
Threshold(
type="lesser",
value=10.0,
level="crit",
aggregateFunctionType="last"
)
]
)
# チェック登録
checks_api = client.checks_api()
created_check = checks_api.create_check(threshold_check)
print("【アラート設定】")
print(f"チェック名:{created_check.name}")
print(f"実行間隔:{created_check.every}")
print("・温度 > 30°C: クリティカル")
print("・温度 < 10°C: クリティカル")
TSDB 選定の判断基準
ユースケース別推奨
【ユースケース別 TSDB 選定】
1. **IoT センサーデータ収集**
- 推奨:InfluxDB, TimescaleDB
- 理由:高頻度書き込みに最適化、ダウサンプリング機能
- 規模感:秒間 1000-10000 書き込み
2. **システム/インフラ監視**
- 推奨:Prometheus
- 理由:メトリクス収集・アラート機能が標準装備
- 规模感:数百〜数千メトリクス
3. **金融市場データ(株価、為替)**
- 推奨:QuestDB, TimescaleDB
- 理由:低遅延、SQL 対応(分析クエリ)
- 规模感:秒間 10000+ 書き込み
4. **ログ分析と可視化**
- 推奨:Elasticsearch(TSDB ではないが)
- 理由:全文検索、Grafana/Prometheus と連携
- 规模感:日間数百万〜数億ログ
5. **既存 PostgreSQL環境との統合**
- 推奨:TimescaleDB
- 理由:PostgreSQL拡張、既存クエリ資産活用
- 规模感:中規模(日間数百万行)
選定チェックリスト
【TSDB 選定チェックリスト】
□ **データ特性**
- 書き込み頻度(秒間件数)
- データ保持期間
- クエリパターン(最新参照 vs 集計分析)
□ **技術要件**
- SQL 必須か(TimescaleDB, QuestDB)
- JOIN 必要か(TimescaleDB のみ対応)
- 既存システムとの統合(PostgreSQL → TimescaleDB)
□ **運用要件**
- クラウドネイティブ必須か(InfluxDB Cloud, Timestream)
- 監視アラート機能(Prometheus が標準装備)
- 可視化ツール連携(Grafana は主要 TSDB すべて対応)
□ **コスト**
- オープンソース必須か(すべて OSS 版あり)
- クラウドサービス利用か(管理コスト削減)
- サポート契約必要か(Enterprise 版)
まとめ
時系列データベースの核心:
- 基本概念: タイムスタンプ付きデータの保存・検索・集計に特化した DB
- RDBとの違い: 列指向保存、時間パーティショニング、高圧縮、高速集計
- 圧縮技術: Gorilla 圧縮、デルタエンコーディングで 90% 以上の圧縮率
- ダウサンプリング: 継続クエリで時間経過とともに解像度を低下
- リテンション: 自動削除ポリシーでストレージ管理を効率化
- 主要製品: InfluxDB(普及度)、TimescaleDB(SQL 対応)、Prometheus(監視)
- 選定基準: ユースケース、SQL 要件、既存環境で選択
TSDB 選定の指針:
- IoT・センサーデータ: InfluxDB または TimescaleDB
- システム監視: Prometheus(標準事実上)
- 金融・低遅延: QuestDB
- PostgreSQL 統合: TimescaleDB
- クラウド管理: AWS Timestream, InfluxDB Cloud
時系列データは IoT、FinTech、DevOps など幅広い分野で爆発的に増加している。適切な TSDB の選択と設計は、システムの性能とコストに直結する重要な技術決定だ。
参考資料
- InfluxData. "InfluxDB Documentation - Data Storage Internals" https://docs.influxdata.com/
- Timescale. "TimescaleDB Whitepaper - High-Performance Time-Series SQL" https://www.timescale.com/
- Prometheus Authors. "Prometheus Documentation - Storage Layer" https://prometheus.io/docs/
- QuestDB Team. "QuestDB Technical Overview" https://questdb.io/docs/
- Facebook Engineering. "Gorilla: A Fast, Scalable, In-Memory Time Series Database" (2015)
- Codd, E.F. "Time-Series Data Management: Concepts and Techniques" (2020)
- AWS. "Amazon Timestream Developer Guide" https://docs.aws.amazon.com/timestream/
- Grafana Labs. "Grafana Time Series Visualization" https://grafana.com/docs/
- 日本時系列データサイエンス協会. 「時系列データ解析の基礎」(2024)
- Martin Kleppmann. "Designing Data-Intensive Applications" Chapter 11 (O'Reilly, 2017)
免責事項 — 掲載情報は執筆時点のものです。料金・機能は変更される場合があります。最新情報は各公式サイトをご確認ください。