New Year Special : Self-Learning Courses: Get any course for just $49!  - SCHEDULE CALL

- AWS Blogs -

What is AWS Data Pipeline? AWS Data Pipeline Tutorial Guide



What is the AWS Data Pipeline?

In any real-world application, data needs to flow across several stages and services. In the Amazon Cloud environment, AWS Data Pipeline service makes this data flow possible between these different services. It enables the automation of data-driven workflows Before you begin learning about the AWS Data Pipeline, consider joining an AWS Certification Course to get exposed to a well-rounded knowledge of AWS and boost your career to the next level.

Getting started with AWS Data Pipeline

AWS Data Pipelines consists of the following basic components:

  • DataNodes
  • Activities

AWS Data Pipeline DataNodes – represent data stores for input and output data. DataNodes can be of various types depending on the backend AWS Service used for data storage. Examples include:

  • DynamoDBDataNode
  • SqlDataNode
  • RedShiftDataNode
  • S3DataNode

Activities – specify the task to be performed on data stored in datanodes. Different types of activities are provided depending on the application. These include:gain a thorough knowledge of AWS data pipeline and other topics related to Cloud computing through a Cloud Computing Certification.

  • CopyActivity
  • RedShiftCopyActivity
  • SqlActivity
  • ShellCommandActivity
  • EmrActivity
  • HiveActivity
  • HiveCopyActivity
  • PigActivity

The Activities are executed on their respective Resources, namely EC2Resource or EmrCluster depending on the nature of the activity.

AWS Data Pipelines can be created using the Data Pipeline console (https://console.aws.amazon.com/datapipeline/)

Creating a basic AWS Pipeline in the console involves the following steps:

  • Select ‘Region’ from the dropdown
  • Enter the Name for the pipeline
  • Enter a Description
  • If you are using a template for building, select the template name in the ‘Source’ segment.
  • Specify the Parameters for the selected template.
  • Configure the schedule for pipeline execution including repetition frequency and termination criteria.
  • Specify ‘Pipeline Configuration’ parameters and Security details.
  • Activate the pipeline. The pipeline definition gets saved in a JSON based format called ‘pipeline definition file.’
  • The monitor runtime behavior of pipeline components on the console
  • Once the pipeline run completes, the generated output can be analyzed from the target DataNode.

AWS Curriculum

JanBask Training offers one of the best-known AWS Certification Courses to help you get in-depth knowledge of cloud computing and begin your career as an AWS professional. Consider opting for it today!

illustrate UseCases

  • Process input data stored in S3 bucket and store it in an AWS RDS database
  • Source data from CSV files in S3 and DynamoDB data on the cloud and create a data warehouse on AWS RedShift
  • Analyze multiple text files on S3 buckets using Hadoop cluster on AWS EMR.

Pipeline definition File  Samples

Sample 1:


{
  "objects": [
    {
      "myComment" : "Activity to run the hive script to export data to CSV",
      "output": {
        "ref": "DataNodeId_pnzAW"
      },
      "input": {
        "ref": "DataNodeId_cERqb"
      },
      "name": "TableBackupActivity",
      "hiveScript": "DROP TABLE IF EXISTS tempTable;\n\nDROP TABLE IF EXISTS s3Table;\n\nCREATE EXTERNAL TABLE tempTable (#{myS3ColMapping})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{dynamoTableName}\", \"dynamodb.column.mapping\" = \"#{dynamoTableColMapping}\");\n                    \nCREATE EXTERNAL TABLE s3Table (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'\nLOCATION '#{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}';\n                    \nINSERT OVERWRITE TABLE s3Table SELECT * FROM tempTable;",
      "runsOn": { "ref" : "EmrCluster1
" },
      "id": "TableBackupActivity",
      "type": "HiveActivity"
    },
    {
      "period": "1 days",
      "name": "Every 1 day",
      "id": "DefaultSchedule",
      "type": "Schedule",
      "startAt": "FIRST_ACTIVATION_DATE_TIME"
    },
    {
      "myComment" : "The DynamoDB table from which data is exported",
      "dataFormat": {
        "ref": "dynamoExportFormat"
      },
      "name": "DynamoDB",
      "id": "DataNodeId_cERqb",
      "type": "DynamoDBDataNode",
      "tableName": "#{dynamoTableName}"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{logUri}",
      "scheduleType": "cron",
      "name": "Default",
      "id": "Default"
    },
    {
      "name": "EmrCluster1",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "amiVersion": "3.3.2",
      "id": "EmrCluster1",
      "type": "EmrCluster",
      "terminateAfter": "1 Hour"
    },
    {
      "myComment" : "The S3 path to which we export data to",
      "directoryPath": "#{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}/",
      "dataFormat": {
        "ref": "DataFormatId_xqWRk"
      },
      "name": "S3DataNode",
      "id": "DataNodeId_pnzAW",
      "type": "S3DataNode"
    },
    {
      "myComment" : "Format for the S3 Path",
      "name": "DefaultDataFormat1",
      "column": "not_used STRING",
      "id": "DataFormatId_xqWRk",
      "type": "CSV"
    },
    {
      "myComment" : "Format for the DynamoDB table",
      "name": "dynamoExportFormat",
      "id": "dynamoExportFormat",
      "column": "not_used STRING",
      "type": "DynamoDBExportDataFormat"
    }
  ],
  "parameters": [
    {
      "description": "Output S3 folder",
      "id": "targetS3Bucket",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "DynamoDB table name",
      "id": "dynamoTableName",
      "type": "String"
    },
    {
      "description": "S3 to DynamoDB Column Mapping",
      "id": "dynamoTableColMapping",
      "type": "String"
    },
    {
      "description": "S3 Column Mappings",
      "id": "myS3ColMapping",
      "type": "String"
    },
    {
      "description": "DataPipeline Log Uri",
      "id": "logUri",
      "type": "String"
    }
  ]
}

Sample 2:


{
  "objects": [
    {
      "myComment" : "Activity used to run hive script to import CSV data",
      "output": {
        "ref": "DataNodeId_pnzAW"
      },
      "input": {
        "ref": "DataNodeId_cERqb"
      },
      "name": "TableRestoreActivity",
      "hiveScript": "DROP TABLE IF EXISTS tempTable;\n\nDROP TABLE IF EXISTS s3Table;\n\nCREATE EXTERNAL TABLE tempTable (#{dynamoColDefn})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{dynamoTableName}\", \"dynamodb.column.mapping\" = \"#{dynamoTableColMapping}\");\n                    \nCREATE EXTERNAL TABLE s3Table (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\\n' LOCATION '#{sourceS3Bucket}';\n                    \nINSERT OVERWRITE TABLE tempTable SELECT * FROM s3Table;",
      "id": "TableRestoreActivity",
      "runsOn": { "ref" : "EmrClusterForRestore" },
      "stage": "false",
      "type": "HiveActivity"
    },
    {
      "myComment" : "The DynamoDB table from which we need to import data from",
      "dataFormat": {
        "ref": "DDBExportFormat"
      },
      "name": "DynamoDB",
      "id": "DataNodeId_cERqb",
      "type": "DynamoDBDataNode",
      "tableName": "#{dynamoTableName}"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{logUri}",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "name": "EmrClusterForRestore",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "releaseLabel": "emr-4.4.0",
      "id": "EmrClusterForRestore",
      "type": "EmrCluster",
      "terminateAfter": "1 Hour"
    },
    {
      "myComment" : "The S3 path from which data is imported",
      "directoryPath": "#{sourceS3Bucket}",
      "dataFormat": {
        "ref": "DataFormatId_xqWRk"
      },
      "name": "S3DataNode",
      "id": "DataNodeId_pnzAW",
      "type": "S3DataNode"
    },
    {
      "myComment" : "Format for the S3 Path",
      "name": "DefaultDataFormat1",
      "column": "not_used STRING",
      "id": "DataFormatId_xqWRk",
      "type": "CSV"
    },
    {
      "myComment" : "Format for the DynamoDB table",
      "name": "DDBExportFormat",
      "id": "DDBExportFormat",
      "column": "not_used STRING",
      "type": "DynamoDBExportDataFormat"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "sourceS3Bucket",
      "default": "s3://aws-datapipeline-csv/",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "DynamoDB table name",
      "id": "dynamoTableName",
      "type": "String"
    },
    {
      "description": "S3 to DynamoDB Column Mapping",
      "id": "dynamoTableColMapping",
      "default" : "id:id,age:age,job:job,education:education ",
      "type": "String"
    },
    {
      "description": "S3 Column Mappings",
      "id": "myS3ColMapping",
      "default" : "id string,age int,job string,education string",
      "type": "String"
    },
    {
      "description": "DynamoDB Column Mappings",
      "id": "dynamoColDefn",
      "default" : "id string,age bigint,job string, education string ",
      "type": "String"
    },
    {
      "description": "DataPipeline Log Uri",
      "id": "logUri",
      "type": "AWS::S3::ObjectKey"
    }
  ]
}

Sample 3:


{
  "objects": [
    {
      "myComment": "Default configuration for objects in the pipeline.",

      "name": "Default",
      "id": "Default",
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{logUri}",
      "scheduleType": "cron"
    },
    {
      "myComment": "Connection details for the Redshift cluster.",

      "name": "DefaultDatabase1",
      "id": "DatabaseId_Kw6D8",
      "connectionString": "#{myConnectionString}",
      "databaseName": "#{myRedshiftDatabase}",
      "*password": "#{myRedshiftPassword}",
      "type": "RedshiftDatabase",
      "username": "#{myRedshiftUsername}"
    },
    {
      "myComment": "This object is used to provide the resource where the copy job is invoked.",
      
      "name": "DefaultResource1",
      "id": "ResourceId_idL0Y",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "type": "Ec2Resource",
      "terminateAfter": "1 Hour"
    },
    {
      "myComment": "This object is used to specify the copy activity for moving data from DynamoDB to Redshift.",

      "name": "CopyFromDDBToRedshift",
      "id": "ActivityId_asVf6",
      "database": {
        "ref": "DatabaseId_Kw6D8"
      },      
      "runsOn": {
        "ref": "ResourceId_idL0Y"
      },
      "type": "SqlActivity",
      "script": "#{myScript}"
    },
    {
      "myComment": "This object is used to control the task schedule.", 

      "name": "RunOnce",
      "id": "DefaultSchedule",
      "occurrences": "1",
      "period": "2 Days",
      "type": "Schedule",
      "startAt": "FIRST_ACTIVATION_DATE_TIME"
    }   
  ],
  "parameters": []
}

Do you know what is AWS Fargate? Here is a comprehensive guide to What is AWS Fargate? Amazon EC2 vs. Amazon Fargate

Advanced Concepts of AWS Data Pipeline

Precondition – A precondition specifies a condition which must evaluate to tru for an activity to be executed. For example Presence of Source Data Table or S3 bucket prior to performing operations on it.

AWS Data Pipeline provides certain prebuilt Precondition elements such as :

  • DynamoDBDataExists
  • DynamoDBTableExists
  • S3KeyExists
  • S3PrefixNotEmpty

Action – Actions are event handlers which are executed in response to pipeline events.

Examples include:

  • SnsAlarm
  • Terminate

AWS Quiz

For example, if a specific pipeline component executes for a time greater than maximum configured value, a SNSAlarm can be sent to the administrator. AWS Data Pipeline provides event handlers on pipeline components such as onSuccess, OnFail, and onLateAction where these actions can be made use of.

Let us take a look at a simple program in R which prints “Hello World.” This can be accomplished either from the command line in the R interpreter or via an R script. Let us look at both mechanisms.

Are you a beginner in AWS? Have a look at this AWS Tutorial for Beginner - A Detailed Guide to Cloud Computing!

Instance and Attempts: When a pipeline is executed, the pipeline components defined in pipeline definition file are compiled to set of executable instances. For fault tolerance, any instance that undergoes failure is retried multiple times as per its configuration limit. These retries are tracked using Attempt objects corresponding to an instance.

AWS Data Pipeline Command Line Interface

Create pipeline:
$ aws datapipeline create-pipeline --name mypipeline --unique-id mypipeline 

 {
     "pipelineId": "df-032468332JABJ6QUWCR6"
 }

Associate pipeline with pipeline definition file:
$ aws datapipeline put-pipeline-definition --pipeline-id df-032468332JABJ6QUWCR6 --pipeline-definition file://mypipeline.json --parameter-values myS3LogsPath="s3://"

 {
     "validationErrors": [],
     "validationWarnings": [],
     "errored": false
 }

Activate pipeline:
$> aws datapipeline activate-pipeline --pipeline-id df-032468332JABJ6QUWCR6

Runtime monitoring of pipeline:
$ aws datapipeline list-runs --pipeline-id df-032468332JABJ6QUWCR6
#          Name                                                Scheduled Start      Status                 
#          ID                                                  Started              Ended              
#   ---------------------------------------------------------------------------------------------------
#      1.  EC2Resource_mypipeline                              2019-04-14T12:51:36  RUNNING                
#          @EC2Resource_mypipeline_2019-04-14T16:51:56         2019-04-14T12:51:39                     
#   
#      2.  ShellCommandActivity_mypipeline                     2019-04-14T12:51:36  WAITING_FOR_RUNNER     
#          @ShellCommandActivity_mypipeline_2019-04-14T16:51:  2019-04-14T12:51:39   

While you are going through the various AWS topics, do not forget to test your AWS aptitude with this fun-filled AWS Quiz.

Diagnostics/Troubleshooting:

AWS CloudTrail captures all API calls for AWS Data Pipeline as events. These events can be streamed to a target S3 bucket by creating a trail from the AWS console. A CloudTrail event represents a single request from any source and includes information about the requested action, the date and time of the action, request parameters, and so on. This information is quite useful when diagnosing a problem on a configured data pipeline at runtime. The log entry format is as follows:


{
  "Records": [
    {
      "eventVersion": "1.0",
      "userIdentity": {
        "type": "Root",
        "principalId": "12120",
        "arn": "arn:aws:iam::user-account-id:root",
        "accountId": "user-account-id",
        "accessKeyId": "user-access-key"
      },
      "eventTime": "2019-04-13T19:15:15Z",
      "eventSource": "datapipeline.amazonaws.com",
      "eventName": "CreatePipeline",
      "awsRegion": "us-west-1",
      "sourceIPAddress": "72.21.196.64",
      "userAgent": "aws-cli/1.5.2 Python/2.7.5 Darwin/13.4.0",
      "requestParameters": {
        "name": "demopipeline",
        "uniqueId": "myunique"
      },
      "responseElements": {
        "pipelineId": "df-03265491AP32SAMPLE"
      },
      "requestID": "352ba1e2-6c21-11dd-8816-c3c09bc09c8a",
      "eventID": "9f99dce0-0732-49b6-baaa-e77943195632",
      "eventType": "AwsApiCall",
      "recipientAccountId": "user-account-id"
    }, 
      ...
  ]
}

Also, consider joining our AWS community to learn about the latest trends in AWS and meet like-minded professionals to gain industry insight.

Frequently Asked Questions 

An AWS data pipeline is what?

An online tool called AWS Data Pipeline enables you to regularly process and transfer data across various AWS computing and storage services as well as on-premises data sources.

AWS Data Pipeline: Is it an ETL tool?

You may automate data transfer and transformation with AWS Data Pipeline, an ETL service. Every time an interval is scheduled, an Amazon EMR cluster is started, jobs are submitted to the cluster as steps, and the cluster is shut down once all tasks have been finished

What function does a data pipeline serve?

A network system called a data pipeline enables the transfer of data between a source site and a target location.

What are the features of AWS?

AWS provides an array of features. They are-
i) Flexibility
ii)Cost-effectiveness
iii) Scalability.
iv) Security
v) Elasticity

What are some major cloud products of AWS?

Some of the AWS Cloud products are as follows-
i) Compute
ii) Storage
iii) Database
iv) Analysis
v) Networking.

How important are AWS certifications?

  • AWS certifications are crucial to possess since they: Increase your employable practical abilities.
  • You get a boost when recruiting managers see your portfolio and CV.
  • Increase your chances of being hired over unlicensed AWS architects.
  • Help you demand the salary you want because your AWS certification shows you have demonstrated and qualified skills.
  • Assist you in developing strong confidence when handling AWS solution architect assignments or genuine industry projects.

 In the AWS course, what skills will I learn?

The following is everything you will learn: AWS Cloud Computing, AWS Architecture Identity Access Management & S3 Amazon VPC, DynamoDB, Redshift Configuration Management, Automation, AWS Route 53 Networking, Monitoring, Security Groups, Elastic Compute Cloud (EC2) Databases, Application Services

What are the required skills for AWS?

Technical Skills for AWS Solution Architect jobs

  • Java/Python or C++
  • Networking
  • Data Storage Fundamentals
  • Security Foundations
  • AWS Service Selection
  • Cloud specific patterns & technologies

What are the significant soft skills of AWS professionals?

Here is the list of major soft skills of an AWS professional:

  • Communication Skills
  • Time management skills
  • Flexibility & eagerness to learn
  • Business acumen
  • Ability to stay agile

What is the future scope of AWS professionals?

AWS has over 1 million customers associated with it, which means job opportunities in this field are extravagant.

Conclusion:

AWS Data Pipeline is a powerful service in the Amazon Cloud Environment for creating powerful data workflows while making leveraging all kinds of storage and compute resources available in the ecosystem. Data Pipeline makes it feasible to design big data applications involving several terabytes of data from varied sources to be analysed systematically on the cloud. Any technologist working on data analytics in the cloud space should try to acquire skills related to this service.So, jumpstart your career with an expert-led AWS Certification course that can help you accomplish the most-demanding job profile with ample opportunities and lucrative salaries.

fbicons FaceBook twitterTwitter lingedinLinkedIn pinterest Pinterest emailEmail

     Logo

    JanBask Training

    A dynamic, highly professional, and a global online training course provider committed to propelling the next generation of technology learners with a whole new way of training experience.


  • fb-15
  • twitter-15
  • linkedin-15

Comments

Trending Courses

Cyber Security Course

Cyber Security

  • Introduction to cybersecurity
  • Cryptography and Secure Communication 
  • Cloud Computing Architectural Framework
  • Security Architectures and Models
Cyber Security Course

Upcoming Class

2 days 24 Jan 2025

QA Course

QA

  • Introduction and Software Testing
  • Software Test Life Cycle
  • Automation Testing and API Testing
  • Selenium framework development using Testing
QA Course

Upcoming Class

7 days 29 Jan 2025

Salesforce Course

Salesforce

  • Salesforce Configuration Introduction
  • Security & Automation Process
  • Sales & Service Cloud
  • Apex Programming, SOQL & SOSL
Salesforce Course

Upcoming Class

3 days 25 Jan 2025

Business Analyst Course

Business Analyst

  • BA & Stakeholders Overview
  • BPMN, Requirement Elicitation
  • BA Tools & Design Documents
  • Enterprise Analysis, Agile & Scrum
Business Analyst Course

Upcoming Class

3 days 25 Jan 2025

MS SQL Server Course

MS SQL Server

  • Introduction & Database Query
  • Programming, Indexes & System Functions
  • SSIS Package Development Procedures
  • SSRS Report Design
MS SQL Server Course

Upcoming Class

3 days 25 Jan 2025

Data Science Course

Data Science

  • Data Science Introduction
  • Hadoop and Spark Overview
  • Python & Intro to R Programming
  • Machine Learning
Data Science Course

Upcoming Class

3 days 25 Jan 2025

DevOps Course

DevOps

  • Intro to DevOps
  • GIT and Maven
  • Jenkins & Ansible
  • Docker and Cloud Computing
DevOps Course

Upcoming Class

2 days 24 Jan 2025

Hadoop Course

Hadoop

  • Architecture, HDFS & MapReduce
  • Unix Shell & Apache Pig Installation
  • HIVE Installation & User-Defined Functions
  • SQOOP & Hbase Installation
Hadoop Course

Upcoming Class

9 days 31 Jan 2025

Python Course

Python

  • Features of Python
  • Python Editors and IDEs
  • Data types and Variables
  • Python File Operation
Python Course

Upcoming Class

10 days 01 Feb 2025

Artificial Intelligence Course

Artificial Intelligence

  • Components of AI
  • Categories of Machine Learning
  • Recurrent Neural Networks
  • Recurrent Neural Networks
Artificial Intelligence Course

Upcoming Class

3 days 25 Jan 2025

Machine Learning Course

Machine Learning

  • Introduction to Machine Learning & Python
  • Machine Learning: Supervised Learning
  • Machine Learning: Unsupervised Learning
Machine Learning Course

Upcoming Class

16 days 07 Feb 2025

 Tableau Course

Tableau

  • Introduction to Tableau Desktop
  • Data Transformation Methods
  • Configuring tableau server
  • Integration with R & Hadoop
 Tableau Course

Upcoming Class

9 days 31 Jan 2025

Search Posts

Reset

Receive Latest Materials and Offers on AWS Course

Interviews