In any real-world application, data needs to flow across several stages and services. In the Amazon Cloud environment, the AWS Data Pipeline service makes this data flow possible between different services and enables the automation of data-driven workflows.
AWS Data Pipeline consists of the following basic components:
DataNodes – represent data stores for input and output data. DataNodes can be of various types depending on the backend AWS service used for data storage. Examples include DynamoDBDataNode, SqlDataNode, RedshiftDataNode, and S3DataNode.
Activities – specify the task to be performed on the data stored in DataNodes. Different types of activities are provided depending on the application. These include CopyActivity, EmrActivity, HiveActivity, PigActivity, RedshiftCopyActivity, ShellCommandActivity, and SqlActivity.
Activities are executed on their respective Resources, namely Ec2Resource or EmrCluster, depending on the nature of the activity.
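To make the relationship between these components concrete, here is a minimal Python sketch that assembles a definition containing one resource, two data nodes, and one activity, then groups the objects by type. All ids, names, and the bucket paths are hypothetical, and the definition is illustrative only: it omits the schedule, roles, and other settings a deployable pipeline would need.

```python
import json

# Illustrative definition; every id, name and S3 path is a made-up placeholder.
definition = {
    "objects": [
        {   # Resource: the compute on which the activity runs
            "id": "MyEc2Resource",
            "name": "MyEc2Resource",
            "type": "Ec2Resource",
            "terminateAfter": "1 Hour",
        },
        {   # DataNode: where the input data lives
            "id": "InputNode",
            "name": "InputNode",
            "type": "S3DataNode",
            "directoryPath": "s3://example-bucket/input/",
        },
        {   # DataNode: where the output data is written
            "id": "OutputNode",
            "name": "OutputNode",
            "type": "S3DataNode",
            "directoryPath": "s3://example-bucket/output/",
        },
        {   # Activity: ties input, output and resource together via "ref"
            "id": "CopyData",
            "name": "CopyData",
            "type": "CopyActivity",
            "input": {"ref": "InputNode"},
            "output": {"ref": "OutputNode"},
            "runsOn": {"ref": "MyEc2Resource"},
        },
    ]
}


def objects_by_type(defn):
    """Group object ids by their 'type' field."""
    grouped = {}
    for obj in defn["objects"]:
        grouped.setdefault(obj["type"], []).append(obj["id"])
    return grouped


summary = objects_by_type(definition)
print(json.dumps(summary, indent=2))
```

The activity never embeds a data node or resource directly; it points at them by id through `{"ref": ...}`, which is the same pattern the definition file samples in this article use.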
AWS Data Pipelines can be created using the Data Pipeline console (https://console.aws.amazon.com/datapipeline/).
Creating a basic AWS Pipeline in the console involves the following steps:
Pipeline Definition File Samples
The following samples illustrate typical use cases.
Sample 1: Export a DynamoDB table to CSV files in S3 using a HiveActivity on an EMR cluster
{
"objects": [
{
"myComment" : "Activity to run the hive script to export data to CSV",
"output": {
"ref": "DataNodeId_pnzAW"
},
"input": {
"ref": "DataNodeId_cERqb"
},
"name": "TableBackupActivity",
"hiveScript": "DROP TABLE IF EXISTS tempTable;\n\nDROP TABLE IF EXISTS s3Table;\n\nCREATE EXTERNAL TABLE tempTable (#{myS3ColMapping})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{dynamoTableName}\", \"dynamodb.column.mapping\" = \"#{dynamoTableColMapping}\");\n \nCREATE EXTERNAL TABLE s3Table (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'\nLOCATION '#{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}';\n \nINSERT OVERWRITE TABLE s3Table SELECT * FROM tempTable;",
"runsOn": { "ref" : "EmrCluster1" },
"id": "TableBackupActivity",
"type": "HiveActivity"
},
{
"period": "1 days",
"name": "Every 1 day",
"id": "DefaultSchedule",
"type": "Schedule",
"startAt": "FIRST_ACTIVATION_DATE_TIME"
},
{
"myComment" : "The DynamoDB table from which data is exported",
"dataFormat": {
"ref": "dynamoExportFormat"
},
"name": "DynamoDB",
"id": "DataNodeId_cERqb",
"type": "DynamoDBDataNode",
"tableName": "#{dynamoTableName}"
},
{
"failureAndRerunMode": "CASCADE",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "#{logUri}",
"scheduleType": "cron",
"name": "Default",
"id": "Default"
},
{
"name": "EmrCluster1",
"coreInstanceType": "m1.medium",
"coreInstanceCount": "1",
"masterInstanceType": "m1.medium",
"amiVersion": "3.3.2",
"id": "EmrCluster1",
"type": "EmrCluster",
"terminateAfter": "1 Hour"
},
{
"myComment" : "The S3 path to which we export data to",
"directoryPath": "#{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}/",
"dataFormat": {
"ref": "DataFormatId_xqWRk"
},
"name": "S3DataNode",
"id": "DataNodeId_pnzAW",
"type": "S3DataNode"
},
{
"myComment" : "Format for the S3 Path",
"name": "DefaultDataFormat1",
"column": "not_used STRING",
"id": "DataFormatId_xqWRk",
"type": "CSV"
},
{
"myComment" : "Format for the DynamoDB table",
"name": "dynamoExportFormat",
"id": "dynamoExportFormat",
"column": "not_used STRING",
"type": "DynamoDBExportDataFormat"
}
],
"parameters": [
{
"description": "Output S3 folder",
"id": "targetS3Bucket",
"type": "AWS::S3::ObjectKey"
},
{
"description": "DynamoDB table name",
"id": "dynamoTableName",
"type": "String"
},
{
"description": "S3 to DynamoDB Column Mapping",
"id": "dynamoTableColMapping",
"type": "String"
},
{
"description": "S3 Column Mappings",
"id": "myS3ColMapping",
"type": "String"
},
{
"description": "DataPipeline Log Uri",
"id": "logUri",
"type": "String"
}
]
}
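Definitions like the one above wire objects together by id, so a mistyped `ref` is an easy mistake to make. The following Python sketch shows one way to catch it before uploading: it collects every object id and reports references that point nowhere. It only inspects top-level `{"ref": ...}` fields, and the trimmed-down definition with its deliberately wrong reference is a hypothetical stand-in for the sample.

```python
def find_dangling_refs(definition):
    """Return (object_id, field, missing_id) for each top-level
    {"ref": ...} field whose target id is not defined in "objects"."""
    known_ids = {obj["id"] for obj in definition["objects"]}
    dangling = []
    for obj in definition["objects"]:
        for field, value in obj.items():
            if isinstance(value, dict) and "ref" in value:
                if value["ref"] not in known_ids:
                    dangling.append((obj["id"], field, value["ref"]))
    return dangling


# Trimmed-down stand-in for the sample; "EmrCluster" is a deliberate typo
# for "EmrCluster1" so that the check has something to report.
sample = {
    "objects": [
        {"id": "TableBackupActivity", "type": "HiveActivity",
         "input": {"ref": "DataNodeId_cERqb"},
         "output": {"ref": "DataNodeId_pnzAW"},
         "runsOn": {"ref": "EmrCluster"}},
        {"id": "DataNodeId_cERqb", "type": "DynamoDBDataNode"},
        {"id": "DataNodeId_pnzAW", "type": "S3DataNode"},
        {"id": "EmrCluster1", "type": "EmrCluster"},
    ]
}

problems = find_dangling_refs(sample)
for object_id, field, missing in problems:
    print(f"{object_id}.{field} refers to unknown id {missing!r}")
```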
Sample 2: Import CSV data from S3 into a DynamoDB table using a HiveActivity
{
"objects": [
{
"myComment" : "Activity used to run hive script to import CSV data",
"output": {
"ref": "DataNodeId_pnzAW"
},
"input": {
"ref": "DataNodeId_cERqb"
},
"name": "TableRestoreActivity",
"hiveScript": "DROP TABLE IF EXISTS tempTable;\n\nDROP TABLE IF EXISTS s3Table;\n\nCREATE EXTERNAL TABLE tempTable (#{dynamoColDefn})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{dynamoTableName}\", \"dynamodb.column.mapping\" = \"#{dynamoTableColMapping}\");\n \nCREATE EXTERNAL TABLE s3Table (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n' LOCATION '#{sourceS3Bucket}';\n \nINSERT OVERWRITE TABLE tempTable SELECT * FROM s3Table;",
"id": "TableRestoreActivity",
"runsOn": { "ref" : "EmrClusterForRestore" },
"stage": "false",
"type": "HiveActivity"
},
{
"myComment" : "The DynamoDB table into which data is imported",
"dataFormat": {
"ref": "DDBExportFormat"
},
"name": "DynamoDB",
"id": "DataNodeId_cERqb",
"type": "DynamoDBDataNode",
"tableName": "#{dynamoTableName}"
},
{
"failureAndRerunMode": "CASCADE",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "#{logUri}",
"scheduleType": "ONDEMAND",
"name": "Default",
"id": "Default"
},
{
"name": "EmrClusterForRestore",
"coreInstanceType": "m1.medium",
"coreInstanceCount": "1",
"masterInstanceType": "m1.medium",
"releaseLabel": "emr-4.4.0",
"id": "EmrClusterForRestore",
"type": "EmrCluster",
"terminateAfter": "1 Hour"
},
{
"myComment" : "The S3 path from which data is imported",
"directoryPath": "#{sourceS3Bucket}",
"dataFormat": {
"ref": "DataFormatId_xqWRk"
},
"name": "S3DataNode",
"id": "DataNodeId_pnzAW",
"type": "S3DataNode"
},
{
"myComment" : "Format for the S3 Path",
"name": "DefaultDataFormat1",
"column": "not_used STRING",
"id": "DataFormatId_xqWRk",
"type": "CSV"
},
{
"myComment" : "Format for the DynamoDB table",
"name": "DDBExportFormat",
"id": "DDBExportFormat",
"column": "not_used STRING",
"type": "DynamoDBExportDataFormat"
}
],
"parameters": [
{
"description": "Input S3 folder",
"id": "sourceS3Bucket",
"default": "s3://aws-datapipeline-csv/",
"type": "AWS::S3::ObjectKey"
},
{
"description": "DynamoDB table name",
"id": "dynamoTableName",
"type": "String"
},
{
"description": "S3 to DynamoDB Column Mapping",
"id": "dynamoTableColMapping",
"default" : "id:id,age:age,job:job,education:education ",
"type": "String"
},
{
"description": "S3 Column Mappings",
"id": "myS3ColMapping",
"default" : "id string,age int,job string,education string",
"type": "String"
},
{
"description": "DynamoDB Column Mappings",
"id": "dynamoColDefn",
"default" : "id string,age bigint,job string, education string ",
"type": "String"
},
{
"description": "DataPipeline Log Uri",
"id": "logUri",
"type": "AWS::S3::ObjectKey"
}
]
}
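The `parameters` section declares placeholders such as `#{sourceS3Bucket}` that are filled in when the definition is uploaded. As a rough illustration of that substitution (not the service's actual implementation), the Python sketch below replaces each simple `#{name}` placeholder with a supplied value or, failing that, the declared default. Expressions such as `#{format(...)}` are left alone. The table name `customers` is a hypothetical value.

```python
import re

# Matches simple placeholders like #{dynamoTableName}; expressions such as
# #{format(@scheduledStartTime, ...)} do not match and are left untouched.
PLACEHOLDER = re.compile(r"#\{(\w+)\}")


def resolve_parameters(text, parameters, values):
    """Replace #{id} with a supplied value, else the parameter's default.
    Placeholders with neither a value nor a default stay as they are."""
    defaults = {p["id"]: p.get("default") for p in parameters}

    def substitute(match):
        name = match.group(1)
        if name in values:
            return values[name]
        if defaults.get(name) is not None:
            return defaults[name]
        return match.group(0)

    return PLACEHOLDER.sub(substitute, text)


parameters = [
    {"id": "sourceS3Bucket", "default": "s3://aws-datapipeline-csv/",
     "type": "AWS::S3::ObjectKey"},
    {"id": "dynamoTableName", "type": "String"},
]
template = ("LOCATION '#{sourceS3Bucket}' TABLE #{dynamoTableName} "
            "AT #{format(@scheduledStartTime, 'YYYY')}")

resolved = resolve_parameters(template, parameters,
                              {"dynamoTableName": "customers"})
print(resolved)
```

Parameters without a default, such as `dynamoTableName` here, have to be supplied at upload time; in this sketch an unsupplied one simply remains visible as a placeholder, which makes the omission easy to spot.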
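Sample 3: Run a SQL script against Redshift to copy data from DynamoDB, using a SqlActivity on an Ec2Resource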
{
"objects": [
{
"myComment": "Default configuration for objects in the pipeline.",
"name": "Default",
"id": "Default",
"failureAndRerunMode": "CASCADE",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "#{logUri}",
"scheduleType": "cron"
},
{
"myComment": "Connection details for the Redshift cluster.",
"name": "DefaultDatabase1",
"id": "DatabaseId_Kw6D8",
"connectionString": "#{myConnectionString}",
"databaseName": "#{myRedshiftDatabase}",
"*password": "#{myRedshiftPassword}",
"type": "RedshiftDatabase",
"username": "#{myRedshiftUsername}"
},
{
"myComment": "This object is used to provide the resource where the copy job is invoked.",
"name": "DefaultResource1",
"id": "ResourceId_idL0Y",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"type": "Ec2Resource",
"terminateAfter": "1 Hour"
},
{
"myComment": "This object is used to specify the copy activity for moving data from DynamoDB to Redshift.",
"name": "CopyFromDDBToRedshift",
"id": "ActivityId_asVf6",
"database": {
"ref": "DatabaseId_Kw6D8"
},
"runsOn": {
"ref": "ResourceId_idL0Y"
},
"type": "SqlActivity",
"script": "#{myScript}"
},
{
"myComment": "This object is used to control the task schedule.",
"name": "RunOnce",
"id": "DefaultSchedule",
"occurrences": "1",
"period": "2 Days",
"type": "Schedule",
"startAt": "FIRST_ACTIVATION_DATE_TIME"
}
],
"parameters": []
}
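The Schedule object in the sample above combines a `period` with an `occurrences` count. The Python sketch below shows only the interval arithmetic behind such a schedule: it parses a period string and lists the start of each interval from a given activation time. It is an assumption-laden illustration; when the service actually launches work within each interval depends on the `scheduleType`, and month-based periods are not handled here. The activation time is a hypothetical value.

```python
from datetime import datetime, timedelta

# Singular unit name -> timedelta keyword (months are deliberately omitted).
UNITS = {"minute": "minutes", "hour": "hours", "day": "days", "week": "weeks"}


def parse_period(period):
    """Turn a string such as '2 Days' or '1 days' into a timedelta."""
    count, unit = period.split()
    unit = unit.lower().rstrip("s")
    if unit not in UNITS:
        raise ValueError(f"unsupported period unit: {unit!r}")
    return timedelta(**{UNITS[unit]: int(count)})


def interval_starts(first_activation, period, occurrences):
    """List the start time of each scheduled interval."""
    step = parse_period(period)
    return [first_activation + i * step for i in range(occurrences)]


# Hypothetical activation time, three occurrences two days apart.
starts = interval_starts(datetime(2019, 4, 14, 12, 0), "2 Days", 3)
for start in starts:
    print(start.isoformat())
```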
Precondition – A precondition specifies a condition that must evaluate to true before an activity is executed. For example, a precondition can check that a source data table or S3 bucket exists prior to performing operations on it.
AWS Data Pipeline provides prebuilt Precondition elements such as DynamoDBDataExists, DynamoDBTableExists, S3KeyExists, and S3PrefixNotEmpty.
Action – Actions are event handlers that are executed in response to pipeline events.
Examples include SnsAlarm and Terminate.
For example, if a specific pipeline component runs longer than its maximum configured time, an SnsAlarm can be sent to the administrator. AWS Data Pipeline provides event handlers on pipeline components, such as onSuccess, onFail, and onLateAction, where these actions can be used.
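The interplay between preconditions and actions can be sketched in a few lines of Python. This is a conceptual model only, not how the service is implemented: the activity runs only if every precondition holds, and a success or failure handler fires afterwards. The function names, the status strings, and the in-memory "bucket" are all hypothetical.

```python
def run_activity(activity, preconditions, on_success=None, on_fail=None):
    """Run `activity` only if every precondition holds, then fire a handler.
    The returned status strings are illustrative labels."""
    if not all(check() for check in preconditions):
        return "WAITING_ON_DEPENDENCIES"
    try:
        result = activity()
    except Exception as error:
        if on_fail:
            on_fail(error)
        return "FAILED"
    if on_success:
        on_success(result)
    return "FINISHED"


events = []
bucket_contents = {"input/data.csv"}  # hypothetical stand-in for an S3 bucket

# Precondition satisfied: the activity runs and onSuccess fires.
status = run_activity(
    activity=lambda: len(bucket_contents),
    preconditions=[lambda: "input/data.csv" in bucket_contents],
    on_success=lambda result: events.append(("onSuccess", result)),
    on_fail=lambda error: events.append(("onFail", str(error))),
)

# Precondition not satisfied: the activity is never invoked.
blocked = run_activity(
    activity=lambda: 1 / 0,
    preconditions=[lambda: "input/missing.csv" in bucket_contents],
)
print(status, blocked, events)
```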
Instances and Attempts: When a pipeline is executed, the pipeline components defined in the pipeline definition file are compiled into a set of executable instances. For fault tolerance, any instance that fails is retried up to its configured limit. These retries are tracked using Attempt objects corresponding to the instance.
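The retry behaviour described above can be modelled with two small classes. The Python sketch below is a conceptual illustration rather than the service's implementation: an `Instance` runs a task, records an `Attempt` for each try, and stops after the first success or once its retry limit is used up. The class names, the `maximum_retries` attribute, and the flaky task are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Attempt:
    number: int
    status: str
    error: str = ""


@dataclass
class Instance:
    name: str
    maximum_retries: int = 2
    attempts: list = field(default_factory=list)

    def run(self, task):
        """Try the task once, plus up to `maximum_retries` retries,
        recording one Attempt per try. Returns True on success."""
        for number in range(1, self.maximum_retries + 2):
            try:
                task()
            except Exception as error:
                self.attempts.append(Attempt(number, "FAILED", str(error)))
            else:
                self.attempts.append(Attempt(number, "FINISHED"))
                return True
        return False


calls = {"count": 0}


def flaky():
    """Hypothetical task that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")


instance = Instance("ShellCommandActivity_mypipeline", maximum_retries=2)
succeeded = instance.run(flaky)
statuses = [attempt.status for attempt in instance.attempts]
print(succeeded, statuses)
```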
Create pipeline:
$ aws datapipeline create-pipeline --name mypipeline --unique-id mypipeline
{
"pipelineId": "df-032468332JABJ6QUWCR6"
}
Associate pipeline with pipeline definition file:
$ aws datapipeline put-pipeline-definition --pipeline-id df-032468332JABJ6QUWCR6 --pipeline-definition file://mypipeline.json --parameter-values myS3LogsPath="s3://"
{
"validationErrors": [],
"validationWarnings": [],
"errored": false
}
Activate pipeline:
$ aws datapipeline activate-pipeline --pipeline-id df-032468332JABJ6QUWCR6
Runtime monitoring of pipeline:
$ aws datapipeline list-runs --pipeline-id df-032468332JABJ6QUWCR6
# Name Scheduled Start Status
# ID Started Ended
# ---------------------------------------------------------------------------------------------------
# 1. EC2Resource_mypipeline 2019-04-14T12:51:36 RUNNING
# @EC2Resource_mypipeline_2019-04-14T16:51:56 2019-04-14T12:51:39
#
# 2. ShellCommandActivity_mypipeline 2019-04-14T12:51:36 WAITING_FOR_RUNNER
# @ShellCommandActivity_mypipeline_2019-04-14T16:51: 2019-04-14T12:51:39
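When the same steps are driven through the API instead of the CLI, the definition has to be supplied in a different shape: each object becomes an `id`, a `name`, and a list of `fields`, where each field carries either a `stringValue` or a `refValue`. The Python sketch below shows one way to convert definition-file objects into that shape. The function name and the small example definition are hypothetical, and nested or list-valued fields are not handled.

```python
def to_api_objects(definition):
    """Convert definition-file objects into the id/name/fields shape.
    {"ref": ...} values become refValue; everything else becomes stringValue."""
    api_objects = []
    for obj in definition["objects"]:
        fields = []
        for key, value in obj.items():
            if key in ("id", "name"):
                continue
            if isinstance(value, dict) and "ref" in value:
                fields.append({"key": key, "refValue": value["ref"]})
            else:
                fields.append({"key": key, "stringValue": str(value)})
        api_objects.append({
            "id": obj["id"],
            "name": obj.get("name", obj["id"]),
            "fields": fields,
        })
    return api_objects


# Hypothetical two-object definition in the file format used in this article.
definition = {
    "objects": [
        {"id": "EmrCluster1", "name": "EmrCluster1", "type": "EmrCluster",
         "terminateAfter": "1 Hour"},
        {"id": "TableBackupActivity", "name": "TableBackupActivity",
         "type": "HiveActivity", "runsOn": {"ref": "EmrCluster1"}},
    ]
}

api_objects = to_api_objects(definition)
print(api_objects[1])
```

The resulting list matches the structure of the `pipelineObjects` argument of the PutPipelineDefinition API, so it could, for instance, be handed to boto3's `put_pipeline_definition`; that call is intentionally not made in this sketch.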
Diagnostics/Troubleshooting:
AWS CloudTrail captures all API calls for AWS Data Pipeline as events. These events can be streamed to a target S3 bucket by creating a trail from the AWS console. A CloudTrail event represents a single request from any source and includes information about the requested action, the date and time of the action, request parameters, and so on. This information is quite useful when diagnosing a problem on a configured data pipeline at runtime. The log entry format is as follows:
{
"Records": [
{
"eventVersion": "1.0",
"userIdentity": {
"type": "Root",
"principalId": "12120",
"arn": "arn:aws:iam::user-account-id:root",
"accountId": "user-account-id",
"accessKeyId": "user-access-key"
},
"eventTime": "2019-04-13T19:15:15Z",
"eventSource": "datapipeline.amazonaws.com",
"eventName": "CreatePipeline",
"awsRegion": "us-west-1",
"sourceIPAddress": "72.21.196.64",
"userAgent": "aws-cli/1.5.2 Python/2.7.5 Darwin/13.4.0",
"requestParameters": {
"name": "demopipeline",
"uniqueId": "myunique"
},
"responseElements": {
"pipelineId": "df-03265491AP32SAMPLE"
},
"requestID": "352ba1e2-6c21-11dd-8816-c3c09bc09c8a",
"eventID": "9f99dce0-0732-49b6-baaa-e77943195632",
"eventType": "AwsApiCall",
"recipientAccountId": "user-account-id"
},
...
]
}
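Once CloudTrail log files are available, picking out the Data Pipeline calls is a matter of filtering on `eventSource`. The Python sketch below parses a log document in the format shown above and summarises the matching records. The function name and the two-record inline log (including the unrelated S3 event) are hypothetical test data.

```python
import json


def datapipeline_events(log_text):
    """Return a short summary of every Data Pipeline API call in a
    CloudTrail log document."""
    records = json.loads(log_text)["Records"]
    return [
        {
            "time": record["eventTime"],
            "name": record["eventName"],
            "region": record["awsRegion"],
            "parameters": record.get("requestParameters"),
        }
        for record in records
        if record.get("eventSource") == "datapipeline.amazonaws.com"
    ]


# Hypothetical log with one Data Pipeline call and one unrelated call.
log_text = json.dumps({
    "Records": [
        {"eventTime": "2019-04-13T19:15:15Z",
         "eventSource": "datapipeline.amazonaws.com",
         "eventName": "CreatePipeline",
         "awsRegion": "us-west-1",
         "requestParameters": {"name": "demopipeline", "uniqueId": "myunique"}},
        {"eventTime": "2019-04-13T19:16:00Z",
         "eventSource": "s3.amazonaws.com",
         "eventName": "PutObject",
         "awsRegion": "us-west-1"},
    ]
})

found = datapipeline_events(log_text)
for event in found:
    print(event["time"], event["name"], event["parameters"])
```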
AWS Data Pipeline is a web service that lets you regularly process and move data between various AWS compute and storage services as well as on-premises data sources.
As an ETL service, AWS Data Pipeline lets you automate data movement and transformation. In a pipeline that uses Amazon EMR, for example, each scheduled interval launches an EMR cluster, submits jobs to the cluster as steps, and shuts the cluster down once all tasks have finished.
More generally, a data pipeline is a system that moves data from a source location to a target location.
AWS provides an array of features:
i) Flexibility
ii) Cost-effectiveness
iii) Scalability
iv) Security
v) Elasticity
Some of the AWS Cloud product categories are as follows:
i) Compute
ii) Storage
iii) Database
iv) Analytics
v) Networking
AWS has over 1 million customers, so job opportunities in this field are plentiful.
AWS Data Pipeline is a powerful service in the Amazon Cloud environment for creating data workflows that leverage the storage and compute resources available in the ecosystem. Data Pipeline makes it feasible to design big data applications in which several terabytes of data from varied sources are analysed systematically in the cloud. Any technologist working on data analytics in the cloud space should try to acquire skills related to this service.