codingecho

日々の体験などを書いてます

Google Cloud Dataflow上でReadFromDatastoreを使わずにCloud Datastoreからデータを取得する

TL;DR

google-cloud-datastoreの代わりに、Apache Beamに組み込まれているCloud Datastoreのヘルパーメソッド（apache_beam.io.gcp.datastore.v1.helper）を使用する。

より詳しく

Apache Beam pipelineのインスタンスを作る直前にCloud Datastoreからデータを取得し、さらにCloud Dataflow上では `apache_beam.io.gcp.datastore.v1.datastoreio` の ReadFromDatastore も使いたいと思いました。そこで最初にCloud Datastoreからデータを読み込むために google-cloud-datastore を使いましたが、 ReadFromDatastore と google-cloud-datastore の両方のライブラリをインポートしているため Protocol Buffers のコンフリクトエラーが出てしまいました。

Traceback (most recent call last):
...
TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/datastore_v1/proto/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/proto/datastore/v1/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/proto/datastore/v1/entity.proto", which is not imported by "google/cloud/datastore_v1/proto/entity.proto".  To use it here, please add the necessary import.

エラーを回避するために、google-cloud-datastoreではなくApache Beamにビルトインされているライブラリを使いました。

以下のサンプルコードでは apache_beam.io.gcp.datastore.v1.datastoreio と google.cloud パッケージを両方使用しているためProtocol Buffersがコンフリクトします。

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from google.cloud import datastore


class MyPipeline():
    """Sample pipeline that pre-fetches entities with google-cloud-datastore.

    This variant uses both ``google.cloud.datastore`` and Beam's
    ``datastoreio`` in the same module, which is the combination the
    surrounding article reports as causing a Protocol Buffers conflict.
    """

    def _fetch_data(self):
        """Fetch data from Cloud Datastore.

        Returns:
            list: The entities of the first result page of kind ``my-kind``,
            each passed through ``self.from_datastore``.
        """
        ds = datastore.Client('my-project')
        query = ds.query(kind='my-kind')
        query_iterator = query.fetch()
        # Only the first result page is consumed here.
        first_page = next(query_iterator.pages)
        # NOTE(review): ``from_datastore`` is not defined in this snippet;
        # presumably an entity converter provided elsewhere -- confirm.
        return [self.from_datastore(entity) for entity in first_page]

    def run(self):
        """Run pipeline.

        Fetches the side data first, then builds the Beam pipeline, runs it
        and blocks until it has finished.
        """
        # Imported locally because the snippet's import block does not
        # provide these names.
        from apache_beam.io.gcp.datastore.v1.helper import query_pb2
        from apache_beam.options.pipeline_options import PipelineOptions

        entities = self._fetch_data()

        # ReadFromDatastore requires a query in addition to the project.
        # 'my-kind' is the sample's placeholder kind; adjust as needed.
        read_query = query_pb2.Query()
        read_query.kind.add().name = 'my-kind'

        pipeline_options = PipelineOptions()
        p = beam.Pipeline(options=pipeline_options)

        # A DoFn is applied through beam.ParDo (there is no beam.io.DoFn).
        (p
         | 'Read data' >> ReadFromDatastore('my-project', read_query)
         | 'Do some process' >> beam.ParDo(DoSomeProcess(entities)))

        # Keep the PipelineResult so we can wait on it; the original
        # discarded it and then referenced an undefined ``result``.
        result = p.run()
        result.wait_until_finish()

if __name__ == '__main__':
    # Script entry point: build the sample pipeline and execute it.
    pipeline = MyPipeline()
    pipeline.run()

そして、以下のサンプルコードではApache BeamにビルトインされているCloud Datastoreのヘルパーメソッドを使用しています。 google.cloud パッケージは使用していません。

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
from apache_beam.io.gcp.datastore.v1.helper import fetch_entities, get_datastore, query_pb2

class MyPipeline():
    """Sample pipeline that pre-fetches entities via Beam's Datastore helper.

    Only Apache Beam's built-in Datastore helper module is used; the
    ``google.cloud`` package is not imported.
    """

    @staticmethod
    def _build_query():
        """Return a Datastore query selecting the sample kind ``my-kind``."""
        query = query_pb2.Query()
        query.kind.add().name = 'my-kind'
        return query

    def _fetch_data(self):
        """Fetch data from Cloud Datastore.

        Returns:
            list[dict]: One ``{'id': ..., 'color': ...}`` dict per fetched
            entity.
        """
        entities = fetch_entities(
            'my-project', None, self._build_query(),
            get_datastore('my-project'))

        def entity_to_dict(entity):
            # The original also computed an unused ``image_features`` value
            # from the 'image_feature' property; it was dropped because it
            # never reached the returned dict.
            return {
                'id': entity.key.path[0].id,
                'color': entity.properties['color'].string_value,
            }

        # Materialize a list: a lazy ``map`` object (Python 3) cannot be
        # pickled when it is handed to a DoFn.
        return [entity_to_dict(entity) for entity in entities]

    def run(self):
        """Run pipeline.

        Fetches the side data first, then builds the Beam pipeline, runs it
        and blocks until it has finished.
        """
        # Imported locally because the snippet's import block does not
        # provide PipelineOptions.
        from apache_beam.options.pipeline_options import PipelineOptions

        entities = self._fetch_data()

        pipeline_options = PipelineOptions()
        p = beam.Pipeline(options=pipeline_options)

        # ReadFromDatastore requires a query in addition to the project.
        # 'my-kind' is the sample's placeholder kind; adjust as needed.
        # A DoFn is applied through beam.ParDo (there is no beam.io.DoFn).
        (p
         | 'Read data' >> ReadFromDatastore('my-project', self._build_query())
         | 'Do some process' >> beam.ParDo(DoSomeProcess(entities)))

        # Keep the PipelineResult so we can wait on it; the original
        # discarded it and then referenced an undefined ``result``.
        result = p.run()
        result.wait_until_finish()

if __name__ == '__main__':
    # Script entry point: build the sample pipeline and execute it.
    pipeline = MyPipeline()
    pipeline.run()