I have been involved in the development and implementation of a fairly large data warehousing application for several years now. One of the key aspects of this application has been data validation, since the source data originates at a national level from a large number of organisations that do not conform to a single set of data standards. Until now, the source data was first brought into a staging area within a relational database to be profiled and validated. This has turned out to be a messy and cumbersome process of re-formatting the source data, the majority of which arrives on a regular basis as semi-structured text files, to fit into a relational database. The results of the validation feed into a feedback loop, where any issues with the data that cannot be cleansed locally are reported to the data provider. The turn-around time from receiving the data to notifying the provider of any issues has been one of the major bottlenecks in delivering timely information to the end-users.
As a pilot project, I have been re-developing some of the data validation processors on a small Hadoop cluster using scaled down versions of the test data sets. So I decided to document how I converted one of those procedures from SQL to Apache Pig on Hadoop.
The platform I have used for this pilot is Apache Hadoop 2.6.0, Flume 1.5.2, Pig 0.14, running on a 4 node CentOS cluster.
The dataset
This dataset consists of patient diagnosis records from doctors' clinics within a region. The frequency of data supply varies from provider to provider, ranging from the more common once a month to a couple of times a week. Some supply all the data extracted for the period in one file; others prefer to split it into several files, each containing data for a smaller period.
The structure of the source data file is as follows: (the data is text qualified by quotes)
"HEADER","20110118","20101218","20110118","T00002"
"0000000000000000075412","20110107","2699","D","20110107","2315.","","","","","","C"
"0000000000000000075412","20110107","2699","D","20110107","246..","162","74","","","","B"
<.... more records ....>
The first row is a header with the following fields:
Field 1:- Header tag
Field 2:- End date of data range
Field 3:- Start date of data range
Field 4:- Extracted date
Field 5:- Data file provider code (which will usually be the clinic)
The rest of the rows contain the diagnosis data:
Field 1:- Person identifier
Field 2:- Activity date (also called the diagnosis date)
Fields 3 to 12:- Other details of the diagnosis (there could be several diagnoses on the same day)
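As a rough illustration of the header layout described above, the following shell sketch pulls the provider code and date range out of a header row. The function name parse_header is made up for this example, and it assumes that fields never contain embedded commas or quotes, as in the sample rows.

```shell
# Sketch only: print "provider start_date end_date" for a header row.
# Assumes no embedded commas or quotes inside fields.
parse_header() {
    # Drop the text qualifiers, split on commas, and accept only a
    # five-field row whose first field is the HEADER tag.
    printf '%s\n' "$1" | tr -d '"' | awk -F, '
        $1 == "HEADER" && NF == 5 { print $5, $3, $2; found = 1 }
        END { exit !found }'
}

parse_header '"HEADER","20110118","20101218","20110118","T00002"'
# prints: T00002 20101218 20110118
```

Field 3 is printed before field 2 because the header stores the end date ahead of the start date.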
Validation process
Between two data files extracted for consecutive periods, there is very often an overlap of anything from a few days to a couple of weeks. This overlapping period is typically due to the need to re-extract any alterations that may have been made to the most recent data entries. As a rule, no data should ever be removed; records should instead be flagged as deleted. This validation process ensures that, within the overlapping period, the number of records remains the same in both files. Otherwise the latest data file is deemed to be truncated and fails the validation. Truncated data files are not passed to the subsequent data loading process.
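To make the rule concrete, here is a minimal local sketch of the same check in plain shell and awk, outside Hadoop. The function names are invented for this example. It treats the overlap as running from the current file's start date to the previous file's end date, and it compares the yyyymmdd dates as numbers, which preserves date order.

```shell
# Sketch only: count data rows whose activity date (field 2) falls
# within [start, end]. Header rows are skipped.
count_in_window() {
    tr -d '"' < "$1" | awk -F, -v s="$2" -v e="$3" '
        $1 != "HEADER" && $2 + 0 >= s + 0 && $2 + 0 <= e + 0 { n++ }
        END { print n + 0 }'
}

# Succeeds when the previous file ($1) and the current file ($2) hold the
# same number of rows between the current start date ($3) and the
# previous end date ($4).
overlap_counts_match() {
    [ "$(count_in_window "$1" "$3" "$4")" -eq \
      "$(count_in_window "$2" "$3" "$4")" ]
}
```

A call such as overlap_counts_match prev.csv cur.csv 20101218 20101225 (hypothetical file names and dates) would then succeed only if the current file has not lost any rows in the overlapping period.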
Data files received are loaded to the Hadoop HDFS cluster using Flume. Once in HDFS, a shell script iterates through them and calls the validation Pig script for each file. The script can also be called from the command line like so:
pig -param datafile=data/T00002_diags_2.csv -param lastheader=validations/T00002_lastheader -param lastdatacounts=validations/T00002_lastdatacounts -param validationresultfile=validations/T00002_datacounts_validation
$datafile:- The fully qualified name of the data file
$lastheader:- The fully qualified name of the header file extracted from the previous valid data file from the same provider. This file is created as part of the validation and loading process
$lastdatacounts:- The fully qualified name of the daily counts file aggregated from the previous valid data file from the same provider. This file is created as part of the validation and loading process
$validationresultfile:- The fully qualified name of the resulting validation file
The Pig script that performs this validation is listed below in sections, with a general description of what the code is doing below each section:
-- Load the source data
source = LOAD '$datafile' USING PigStorage(',');
-- Split the header record and data records
SPLIT source INTO header IF $0 == '"HEADER"', data OTHERWISE;
-- Format by removing text qualifiers and set required fields
-- Dates arrive as yyyyMMdd text, so the pattern is given explicitly
currentheader = FOREACH header GENERATE
 REPLACE($4,'\\"','') AS (Provider:chararray),
 ToDate(REPLACE($2,'\\"',''),'yyyyMMdd') AS (StartDate:datetime),
 ToDate(REPLACE($1,'\\"',''),'yyyyMMdd') AS (EndDate:datetime);
currentdata = FOREACH data GENERATE
 REPLACE(header.$4,'\\"','') AS (Provider:chararray),
 ToDate(REPLACE($1,'\\"',''),'yyyyMMdd') AS (ActivityDate:datetime);
-- Group activity by date and count number of activities for each day
datagrouped = GROUP currentdata BY (ActivityDate);
currentdatacounts = FOREACH datagrouped {
    -- Carry the provider code through as the joining field to the header
    providers = DISTINCT currentdata.Provider;
    GENERATE
        FLATTEN(providers) AS Provider,
        group AS ActivityDate,
        COUNT(currentdata) AS (ActivityCount:long);
};
Now that the daily counts of the current data file are established, it is time to retrieve the counts of the previous data file from the same provider. One important point to note here is that if this is the very first data file and no previous file exists, there is no need to perform this validation. The controlling shell script checks for this before calling this Pig script.
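The controlling script itself is not shown in this post, so the following is only a sketch of how the first-file check and the call to Pig might fit together. Several things here are assumptions: the file naming pattern <provider>_diags_<n>.csv is inferred from the example command, validate_overlap.pig is a placeholder name for the Pig script, and the EXISTS, PIG and PIG_SCRIPT variables exist only so that the commands can be swapped out for a dry run.

```shell
# Sketch only: derive the provider code from a data file name,
# assuming names of the form <provider>_diags_<n>.csv.
provider_of() {
    basename "$1" | cut -d_ -f1
}

# Validate one data file. If the provider has no previous header in HDFS,
# this is its first file and the overlap check is skipped.
validate_file() {
    provider=$(provider_of "$1")
    if ! ${EXISTS:-hdfs dfs -test -e} "validations/${provider}_lastheader"; then
        echo "skip $1: first file from $provider"
        return 0
    fi
    ${PIG:-pig} \
        -param "datafile=$1" \
        -param "lastheader=validations/${provider}_lastheader" \
        -param "lastdatacounts=validations/${provider}_lastdatacounts" \
        -param "validationresultfile=validations/${provider}_datacounts_validation" \
        "${PIG_SCRIPT:-validate_overlap.pig}"
}
```

Running it with PIG=echo prints the Pig arguments instead of submitting a job, which is a convenient way to check the derived paths before pointing the script at a real cluster.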
-- Load the last event header and validation counts
lastheader = LOAD '$lastheader' AS (Provider:chararray,StartDate:datetime,EndDate:datetime);
lastdatacounts = LOAD '$lastdatacounts' AS (Provider:chararray,ActivityDate:datetime,ActivityCount:long);
-- Join the current header with the last header
combinedheaders = JOIN currentheader BY Provider, lastheader BY Provider;
-- Join the combined headers to the current data counts
combinedheaders_currentdatacounts_joined = JOIN combinedheaders BY currentheader::Provider, currentdatacounts BY Provider;
-- Join the combined headers to the last data counts
combinedheaders_lastdatacounts_joined = JOIN combinedheaders BY lastheader::Provider, lastdatacounts BY Provider;
-- Keep only the current data in the overlapping period between current and last
overlaped_currentdatacounts = FILTER combinedheaders_currentdatacounts_joined BY currentdatacounts::ActivityDate >= currentheader::StartDate
 AND currentdatacounts::ActivityDate <= lastheader::EndDate;
-- Keep only the last data in the overlapping period between current and last
overlaped_lastdatacounts = FILTER combinedheaders_lastdatacounts_joined BY lastdatacounts::ActivityDate >= currentheader::StartDate
 AND lastdatacounts::ActivityDate <= lastheader::EndDate;
Then the header row of the current data file, held in the relation currentheader, is joined to the header row of the previous data file, now held in the relation lastheader. The JOIN is performed on the Provider field. The joined headers are stored in a new relation called combinedheaders. The combined header relation gives access to the start date of the current data file and the end date of the previous data file, which together form the date range of the overlapping period.
Now that the overlapping period is established with the combined headers, this information is used to keep only the daily counts that fall within it. The FILTER command is applied to both the currentdatacounts and lastdatacounts relations, and the results are stored in two new relations named overlaped_currentdatacounts and overlaped_lastdatacounts.
-- Group the current overlapping data by header and sum up the data counts
overlaped_currentdatacounts_grouped = GROUP overlaped_currentdatacounts BY (currentheader::Provider,currentheader::StartDate,currentheader::EndDate);
totaloverlaped_currentdatacounts = FOREACH overlaped_currentdatacounts_grouped GENERATE 
 group, 
 SUM(overlaped_currentdatacounts.currentdatacounts::ActivityCount) AS (TotalOverlapActivityCounts:long);
-- Group the last overlapping data by header and sum up the data counts
overlaped_lastdatacounts_grouped = GROUP overlaped_lastdatacounts BY (lastheader::Provider,lastheader::StartDate,lastheader::EndDate);
totaloverlaped_lastdatacounts = FOREACH overlaped_lastdatacounts_grouped GENERATE 
 group, 
 SUM(overlaped_lastdatacounts.lastdatacounts::ActivityCount) AS (TotalLastOverlapActivityCounts:long);
-- Join the results together and create validation record
totaloverlapped_joined = JOIN totaloverlaped_currentdatacounts BY group.combinedheaders::currentheader::Provider, 
 totaloverlaped_lastdatacounts BY group.combinedheaders::lastheader::Provider;
validationresult = FOREACH totaloverlapped_joined GENERATE
 'OverlappedActivityCount' AS (Validation:chararray),
    totaloverlaped_currentdatacounts::group.combinedheaders::currentheader::Provider,
    totaloverlaped_currentdatacounts::group.combinedheaders::currentheader::StartDate,
    totaloverlaped_currentdatacounts::group.combinedheaders::currentheader::EndDate,
    (totaloverlaped_currentdatacounts::TotalOverlapActivityCounts == totaloverlaped_lastdatacounts::TotalLastOverlapActivityCounts ? true : false)
  AS(ValidationResult:boolean);
The filtered daily counts are grouped by header and summed, giving one total for the current file and one for the previous file. These two relations are in turn joined on the provider field, which makes it possible to generate the final validation result. The final validation result is a single record for each data file, stored in a new relation named validationresult with the following field structure:
Validation:- A string constant identifying the specific validation. In this case it is "OverlappedActivityCount"
Provider:- Data file provider code
StartDate:- Start date of data range (as specified in the header of the current data file)
EndDate:- End date of data range (as specified in the header of the current data file)
ValidationResult:- A boolean field indicating the result of the validation. If the count of activity records within the overlapping period matches between the current and the previous data files, it is true; otherwise it is false.
-- Store the final result of the validation
STORE validationresult INTO '$validationresultfile';
After the data files are validated, the resulting validation files are used by the subsequent process to move the valid data files to a loading area and the invalid files to an invalid area, from where they enter the feedback loop and the data provider is notified.
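The routing step is not part of the Pig script, but its decision can be sketched in a few lines of shell. This assumes the validation record is stored with the default tab delimiter and that the boolean is written as the text true or false. The function name target_area and the area names are placeholders for this example.

```shell
# Sketch only: print the target area for one validation record with
# tab-separated fields Validation, Provider, StartDate, EndDate and
# ValidationResult.
target_area() {
    result=$(printf '%s\n' "$1" | awk -F'\t' '{ print $5 }')
    case $result in
        true) echo loading ;;
        *)    echo invalid ;;   # false, empty or malformed records
    esac
}
```

A missing or empty record is treated as invalid here, which is the cautious choice; whether a file with no overlapping rows at all should instead be loaded is a decision for the surrounding process. On the cluster, the record would typically be read from the part files under the result directory (for example with hdfs dfs -cat) and the data file moved with hdfs dfs -mv.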
Choosing Pig for data validation on a Hadoop platform
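The following is an extract from the book Pig Design Patterns (Pradeep Pasupuleti (2014), Pig Design Patterns, Packt Publishing):
Implementing the validation and cleansing code in Pig within the Hadoop environment, reduces the time-quality trade-off and the requirement to move data to external systems to perform cleansing.
[Diagram in the original: high-level overview of the implementation]
The following are the advantages of performing data cleansing within the Hadoop environment using Pig: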
- Improved overall performance since validation and cleansing are done in the same environment. There is no need to transfer data to external systems for cleansing.
- Pig is highly suitable to write code for validating and cleansing scripts since the built-in functions are geared towards processing messy data and for exploratory analysis.
- Pig enables automating of the cleansing process by chaining complex workflows, which is very handy for datasets that are periodically updated.
 
 