Apache Beam
Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.
Prerequisite
To use Apache Beam, ensure that you have YugabyteDB up and running. Download and install YugabyteDB by following the steps in Quick start.
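Before starting the setup, it can help to confirm that something is listening on the YSQL port. The following is a minimal sketch, not part of the YugabyteDB tooling: the helper name is_port_open is hypothetical, and the host 127.0.0.1 and port 5433 are taken from the connection settings used in the example on this page. A successful TCP connection only shows that the port is open, not that the database is ready to accept queries.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # 127.0.0.1:5433 matches the connection settings used in this page's example.
    if is_port_open("127.0.0.1", 5433):
        print("YSQL port is reachable")
    else:
        print("Nothing is listening on 127.0.0.1:5433; start YugabyteDB first")
```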
Setup
To run Apache Beam with YugabyteDB, do the following:
- Create a folder apache-beam-test as follows:
    mkdir apache-beam-test && cd apache-beam-test
- Create a Python virtual environment and activate it as follows:
    python3 -m venv myenv
    source myenv/bin/activate
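- Install the latest beam-nuggets package from PyPI using the following command. It provides the relational_db connector used in this example and also installs the Apache Beam Python SDK as a dependency:
    pip install beam-nuggets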
- Create a Python file, democode.py, and add the following code to it:
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from beam_nuggets.io import relational_db

    records = [
        {'name': 'Jan', 'num': 1},
        {'name': 'Feb', 'num': 2},
        {'name': 'Mar', 'num': 3},
        {'name': 'Apr', 'num': 4},
        {'name': 'May', 'num': 5},
    ]

    source_config = relational_db.SourceConfiguration(
        drivername='postgresql',  # postgresql+pg8000
        host='127.0.0.1',
        port=5433,
        username='yugabyte',
        password='yugabyte',
        database='yugabyte',
        create_if_missing=True  # create the database if not there
    )

    table_config = relational_db.TableConfiguration(
        name='months_col01',
        create_if_missing=True,
        primary_key_columns=['num']
    )

    with beam.Pipeline(options=PipelineOptions()) as p:
        months = p | "Reading month records" >> beam.Create(records)
        months | 'Writing to DB table' >> relational_db.Write(
            source_config=source_config,
            table_config=table_config
        )

    if __name__ == "__main__":
        print('demo code ran successful')
- Run democode.py as follows:
    python democode.py
  The script prints the following:
    demo code ran successful
- You can also verify the changes from a YSQL shell as follows:
    select * from months_col01;
  You can see the following output:
     num | name
    -----+------
       5 | May
       1 | Jan
       4 | Apr
       2 | Feb
       3 | Mar
    (5 rows)
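Because the query has no ORDER BY clause, the rows can come back in any order, as in the output above. When checking the result against the input, compare the two as collections of rows rather than line by line. The sketch below illustrates this in plain Python with no database connection; the helper name same_rows is hypothetical, and table_rows is typed in by hand from the output shown above.

```python
# Input records, as defined in democode.py.
records = [
    {'name': 'Jan', 'num': 1},
    {'name': 'Feb', 'num': 2},
    {'name': 'Mar', 'num': 3},
    {'name': 'Apr', 'num': 4},
    {'name': 'May', 'num': 5},
]

# (num, name) rows copied by hand from the query output, in returned order.
table_rows = [(5, 'May'), (1, 'Jan'), (4, 'Apr'), (2, 'Feb'), (3, 'Mar')]


def same_rows(records, rows):
    """Compare input records with (num, name) rows, ignoring row order."""
    expected = sorted((r['num'], r['name']) for r in records)
    return expected == sorted(rows)


print(same_rows(records, table_rows))  # True
```

To get a stable order directly from the database instead, add an ORDER BY num clause to the query.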