TL;DR
google-cloud-datastoreの代わりにCloud Datastore’s helper methodを使用する。
より詳しく
Apache Beam pipelineのインスタンスを作る直前にCloud Datastoreからデータを取得したいと思い ReadFromDatastore
を Cloud Dataflow上のapache_beam.io.gcp.datastore.v1.datastoreio`
から取得したいと思いました。そこで最初にCloud Datastoreからデータを読み込むために google-cloud-datastore を使いましたが、 ReadFromData と google-cloud-datastore の両方のライブラリをインポートしているため Protocol Buffer のコンフリクトエラーが出てしまいました。
Traceback (most recent call last):
...
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/datastore_v1/proto/entity.proto":
google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto". To use it here, please add the necessary import.
エラーを回避するために、google-cloud-datastoreではなくApache Beamにビルトインされているライブラリを使いました。
以下のサンプルコードでは apache_beam.io.gcp.datastore.v1.datastoreio
と google.cloud
パッケージを両方使用しているためProtocol Buffersがコンフリクトします。
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud import datastore
class MyPipeline():
def _fetch_data(self):
"""Fetch data from Cloud Datastore."""
ds = datastore.Client('my-project')
query = ds.query(kind='my-kind')
query_iterator = query.fetch()
entities = list(map(self.from_datastore, next(query_iterator.pages)))
return entities
def run(self):
"""Run pipeline."""
entities = self._fetch_data()
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
(p
| 'Read data' >> beam.io.ReadFromDatastore('my-project')
| 'Do some process' >> beam.io.DoFn(DoSomeProcess(entities)))
p.run()
result.wait_until_finish()
if __name__ == '__main__':
MyPipeline().run()
そして、以下のサンプルコードではApache BeamにビルトインされているCloud Datastoreのヘルパーメソッドを使用しています。 google.cloud
パッケージは使用していません。
import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1.helper import fetch_entities, get_datastore, query_pb2
class MyPipeline():
def _fetch_data(self):
"""Fetch data from Cloud Datastore."""
query = query_pb2.Query()
query.kind.add().name = 'my-kind'
entities = fetch_entities('my-project', None, query, get_datastore('my-project'))
def entity_to_dict(entity):
image_features = map(lambda a: a.double_value, entity.properties['image_feature'].array_value.values)
id = entity.key.path[0].id
color = entity.properties['color'].string_value
return {'id': id, 'color': color}
return map(entity_to_dict, entities)
def run(self):
"""Run pipeline."""
entities = self._fetch_data()
pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
(p
| 'Read data' >> beam.io.ReadFromDatastore('my-project')
| 'Do some process' >> beam.io.DoFn(DoSomeProcess(entities)))
p.run()
result.wait_until_finish()
if __name__ == '__main__':
MyPipeline().run()