OpenWeatherMapで取得したデータをDynamoDBに保存します。 Pythonでコードを書き、AWS Lambdaを使って定期的に気象情報を保存することを目的としています。 DynamoDBの操作はAWSのSDKであるBoto3を利用しました。
OpenWeatherMapで都市名から気象情報を取得するの続きです。
ファイルの権限を変更できない。Googleドライブのシンボリックリンクが原因でした こんなこともあったので環境を少し変えています。
今回はローカルからBoto3でデータを保存するところまで。
環境
- Mac
- Python 3.6
- Boto3 : v1.9.86
- Requests : v2.21
Requestsはインストール済みとします。
Boto3
AWS謹製のPythonモジュール。 Boto3を利用するとAmazon S3、Amazon EC2、Amazon DynamoDB などAWSの各種サービスと容易に統合できます。
ドキュメントはこちら
インストール方法
pipでインストールできます。
$ pip install boto3
DynamoDBにテーブルを用意する
DynamoDBはNoSQLです。 PostgreSQLなどのリレーショナルDBとデータの持ち方が違うため、まずそれ用のテーブルの設計をします。
テーブル設計
保存したいのは気象情報の時系列データです。 また、複数の観測点がありそれぞれ都市名が付いています。
よって、プライマリパーティションキーをcity_name (文字列)
、プライマリソートキーをtimestamp (数値)
にしました。
これで、都市ごとのとある期間のデータをまとめて取得できるはず。
その他の項目は自由に追加できます。
気象データは項目data
にJSON形式でぶち込む予定です。
登録してみた香港の気象データ。 dataの中身は変更があると思うので適当。
テーブル名はcurrent-weather
とした。
scanなどテーブル全体の操作で無駄を減らすため、月ごとにテーブルを分けていく。
年月をつけてcurrent-weather.201901
とした。
テーブルを作成する
設計を基にDynamoDBに気象情報を溜め込むテーブルを作成します。 リージョンは**アジアパシフィック(東京)**にしています。
ダッシュボードからテーブルを作成する。
テーブル名、プライマーキーを設定する。 ソートキーとしてtimestampを登録した。
AWSのアクセスキーを取得する。
セキュリティのため、AWSの各種サービスの操作にはアクセスキーが必要です。
DynamoDBの読み書きやテーブルの作成するためAmazonDynamoDBFullAccess
を持ったユーザーのアクセスキーを取得します。
マスターユーザーで試すことは良いですが、アクセスキーが流出したときえらいことになるので権限を絞ったユーザーを使います。
サービスのIAMから、左メニューの「ユーザー」→「ユーザーを追加」から気象情報をDynamoDBに書き込むユーザーを追加します。
名前をweatherAPI
として「プログラムによるアクセス」をチェックする。
「既存のポリシーを直接アタッチ」を選んで、AmazonDynamoDBFullAccess
を探してチェックします。
データを読むだけならAmazonDynamoDBReadOnlyAccess
があります。
必要であればタグを付け、進めるとアクセスキーが表示されます。 アクセスキーIDとシークレットアクセスキーを保存してください。 シークレットアクセスキーはここでしか見ることができない(はず)です。 csvも保存できます。
くれぐれも、GitHubにアクセスキーを上げないよう気をつけてください。
※ 正直このセキュリティ関連は詳しくないため、公式や他のサイトも参考にしてください。
プログラムからデータを保存する
やっと本題。 気象情報を取得する部分は前回作成したコードを使います。
全体像は、定期的に気象情報を取得して保存するプログラム。
- 複数の都市名をキーに現在の気象情報を取得する。
- データを整形する。
- 都市名をパーティションキーにしてDynamoDBに保存する。
※ テーブルを月ごとに分けるよう設計していますが、テーブルの作成はまだ機能していません。
以下のソースコード部分を繋いで一つのファイルにすると実行できます。
使用するモジュール
"""
OpenWeatherMapから現在の気象データを取得してDynamoDBに保存する。
"""
import boto3 # AWSのSDK
import requests # OpenWeatheMapのAPI取得
import json
import datetime
import decimal # DynamoDB用にFloat型をDecimal型に変換する
boto3をインポートする。 DynamoDBにFloat型がないためDecimal型に変換するようにdecimalを利用する。decimalは標準モジュール。
気象情報の取得
詳細は前回を見てください。
def getWheather(city_name):
API_KEY = "xxxxxxxxxxxxxxxxxxxxxxxx" # xxxにOWMのAPI Keyを入力。
api = "http://api.openweathermap.org/data/2.5/weather?units=metric&q={city}&APPID={key}"
url = api.format(city=city_name, key=API_KEY)
print(url)
response = requests.get(url)
# APIレスポンスの表示
jsonText = json.dumps(response.json(), indent=2)
print(jsonText)
return response.json(parse_float=decimal.Decimal) # dict型に変換する際にFloatをDecimalにする
Requestsで受け取ったresponseをdict型にする際、Float型をDecimal型にします。 そうしないと以下のエラーが出ます。
TypeError: Float types are not supported. Use Decimal types instead.
参考:DynamoDB x Python / Decimal を登録する
データを整形する
DynamoDBのテーブルに合わせてデータを整形する。
def formatter(response):
data = response
# 表示用時刻
unix = data["dt"]
now = datetime.datetime.fromtimestamp(unix)
# 保存するデータを作成
item = {
'city_name': data['name'], # プライマリパーティションキー
'timestamp': data["dt"], # プライマリソートキー
'datetime': now.strftime("%Y-%m-%dT%H:%M:%S"),
'latitude': data['coord']['lat'],
'longitude': data['coord']['lon'],
'data': {
'weather': data['weather'],
'temp': data['main']['temp'],
'humidity': data['main']['humidity'],
'pressure': data['main']['pressure'],
'clouds': data['clouds']['all'],
'wind': data['wind']
}
}
return item
itemの形式で保存される。 timestampはUNIX時間。
DynamoDBへ登録する
前半にDynamoDBと接続をして、後半にitemを保存している。
先ほど登録したAWSのユーザーのアクセスキーを渡す。 コードに書きたくない場合は、設定ファイルや環境変数に登録して使うと良い。
def insert(items):
# データベース接続の初期化
session = boto3.session.Session(
region_name='DBのリージョン',
aws_access_key_id='アクセスキーID',
aws_secret_access_key='シークレットアクセスキー'
)
dynamodb = session.resource('dynamodb')
# tableのパーティション用
now = datetime.datetime.now()
ym = now.strftime("%Y%m")
# テーブルと接続
table_name = 'current-weather.' + ym
table = dynamodb.Table(table_name)
for item in items:
# 追加する
response = table.put_item(
TableName=table_name,
Item=item
)
if response['ResponseMetadata']['HTTPStatusCode'] is not 200:
# 失敗処理
print(response)
else:
# 成功処理
print('Successed :', item['city_name'])
return
table.put_item()
ではなくtable..batch_writer()
を使い一括で登録しても良さそう。
本体部分
取得する都市の名前をリストで持ち、順次処理していく。
def weather_api(cities):
items = []
for city_name in cities:
response = getWheather(city_name)
if response['cod'] is not 200:
print(city_name, ', status code :',
response['cod'], response['sys']['message'])
continue
item = formatter(response)
print(item)
items.append(item)
insert(items)
return True
def lambda_handler(event, context):
# 都市と送信先を指定する。
# OWMで指定可能な都市名は以下で確認する。
# https://openweathermap.org/weathermap?basemap=map&cities=true&layer=temperature&lat=44.0079&lon=144.2487&zoom=12
cities = [
'Sapporo',
'Tokyo',
'Osaka-shi',
'Okinawa',
'Hong Kong',
'New York',
'City of London'
]
weather_api(cities)
return
if __name__ == "__main__":
lambda_handler(None, None)
終わりに
無事にDynamoDBに気象データを保存することができました。 続いてはAWS Lambdaにあげて定期的に取得するように自動化していきます。