Introduction
Distributed and parallel computing frameworks are essential for designing and implementing scalable big data analysis algorithms and pipelines that perform well with the increasing volume, variety and velocity of big data sets. The mainstream big data analysis frameworks [1]–[6] use divide-and-conquer as a general strategy to manage and process big data on computing clusters. This strategy works by dividing a big data set into small data blocks, distributing these blocks over the data nodes of a computing cluster and running data-parallel operations on them. Nevertheless, the ability to analyze big data on computing clusters is limited by the available resources, which may never be enough given the speed at which data volume grows in different application domains [7]. This is particularly critical due to the significant computational costs of running complex data analysis algorithms on computing clusters, e.g., the costs of communication, I/O operations and memory. Consequently, analyzing a big data set all at once may require more than the available resources in order to meet specific application requirements [8], [9]. Random sampling is a common strategy to alleviate these challenges [10], e.g., in approximate and incremental computing [8], [11]–[15]. However, drawing random samples from big data is itself an expensive operation [16], especially with the shared-nothing architectures of the mainstream distributed computing frameworks for big data analysis.
In fact, a key question at the heart of big data analysis is whether the whole big data set must be analyzed or whether analyzing a subset of the data is enough to explore the statistical properties of the big data set and discover patterns from it. If we could obtain approximate results that satisfy application requirements by analyzing only a subset of its small data blocks in parallel, we would be able to reduce the computational costs and free big data analysis from the limits of the available resources. Our empirical study [17] showed that a few data blocks of a big data set are enough to obtain ensemble classification models which are approximately equivalent to those built from the entire data set. This observation can be generalized to other algorithms in the big data analysis pipeline, e.g., for exploratory data analysis, regression and clustering. We found that the precondition for approximate analysis of big data with some data blocks is that the probability distributions of data in these blocks must be similar to the probability distribution of the entire data set. However, the data blocks generated by current data partitioning methods [16] do not necessarily satisfy this precondition because these methods do not consider the statistical properties of the data. For instance, when importing a big data file into HDFS, the file is sequentially divided (i.e., range partitioned) into HDFS data blocks. Consequently, using HDFS data blocks directly as random samples for data analysis may lead to statistically incorrect or biased results because these blocks are not, in general, random samples of the entire data set.
To tackle the above problem, we recently proposed a new strategy that divides a big data set into small random sample data blocks. Assume that a big data set $\mathbb {D}$ is divided into a set of non-overlapping data blocks such that each block has approximately the same probability distribution as $\mathbb {D}$; then each block is a random sample of $\mathbb {D}$ and can be analyzed in place of the entire data set. We call such a partition a random sample partition (RSP) of $\mathbb {D}$ and each block an RSP data block.
In this paper, we propose a new method for big data analysis based on the RSP data model. Given an RSP of a big data set $\mathbb {D}$, a block-level sample of RSP blocks is randomly selected and analyzed in parallel with existing sequential data analysis algorithms, and the block-level results are combined to produce approximate results for the entire data set.
To enable data scientists to use the RSP-based method on computing clusters, we also propose Alpha framework as an extension to the earlier Asymptotic Ensemble Learning Framework [17]. Alpha framework is a distributed data-parallel framework for RSP-based big data analysis on computing clusters. It uses the RSP data model to represent big data sets stored in HDFS and runs data-parallel analysis operations on samples of RSP blocks. It provides key functionalities for the management, sampling and analysis of RSP blocks of big data. We implemented a prototype system of Alpha framework using the RevoScaleR package of Microsoft R Server. In this paper, we demonstrate the performance of Alpha framework for exploratory data analysis, classification, and regression. We present experimental results on three real data sets, produced on a small computing cluster of 5 nodes. The results show that a few RSP blocks are sufficient to explore the statistical properties of a data set and produce approximate results equivalent to those computed from the entire data set.
The remainder of this paper is organized as follows. We review some preliminaries used in this research including basic definitions of the RSP model in Sect. II. After that, we introduce the RSP-based method for big data analysis in Sect. III. In Sect. IV, the design, architecture and prototype implementation of Alpha framework are described. Then, experimental results are discussed in Sect. V. In Sect. VI, we further discuss the RSP-based method and framework for big data analysis. Then, we summarize related works in Sect. VII. Finally, we draw some conclusions and state some future perspectives in Sect. VIII.
Preliminaries
In this section, we first briefly review the current mainstream distributed computing frameworks and their common shortcomings in big data analysis. Then, we discuss the random sampling strategy in big data analysis and the technical difficulties of sampling from distributed big data. After that, we introduce the random sample partition data model for distributed data sets. Finally, we briefly review the ensemble methods in data analysis.
A. Mainstream Distributed Computing Frameworks
As data volume in different application areas goes beyond the petabyte scale, divide-and-conquer [20] is taken as a general strategy to process big data on computing clusters considering the recent advancements in distributed and parallel computing technology. In this strategy, a big data set is divided into small non-overlapping data blocks and distributed on the nodes of a computing cluster using a distributed file system such as Hadoop Distributed File System (HDFS) [21]. Then, data-parallel computations are run on these blocks considering data locality. After that, intermediate results from processed blocks are combined to produce the final result of the whole data set. This is usually done using the MapReduce computing model [1], [22] adopted by the mainstream big data frameworks such as Apache Hadoop, Apache Spark, and Microsoft R Server.
These frameworks provide not only scalable implementations of data mining and machine learning algorithms but also high-level APIs for applications to make use of these algorithms to process and analyze big data. As a unified engine for big data processing, Apache Spark has been adopted in a variety of applications in both academia and industry [23], [24] after Hadoop MapReduce. It uses a new data abstraction and in-memory computation model, RDDs (Resilient Distributed Datasets) [2], where collections of objects (e.g., records) are partitioned across the data nodes of a cluster and processed in parallel. This in-memory computation model eliminates the cost of reading and writing local data blocks in each iteration of an algorithm. Therefore, it is very efficient for iterative algorithms in comparison to the MapReduce model. Microsoft R Server addresses the in-memory limitations of the open source statistical language, R, by parallel and chunk-wise distributed processing of data across multiple cores and nodes. It comes with the proprietary eXternal Data Frame (XDF) format and the Parallel External Memory Algorithms (PEMAs) framework for statistical analysis and machine learning. Both Apache Spark and Microsoft R Server operate on data stored in HDFS.
The MapReduce computing model is efficient for data analysis tasks that only scan the entire big data set once. However, for complex data analysis tasks that require processing the data set iteratively, Hadoop MapReduce becomes inefficient because of heavy I/O and communication costs [25]. Using the in-memory computation model of Apache Spark or the chunk-wise processing of Microsoft R Server, big data can be processed extremely fast if the entire big data set can be held in the memory of the cluster. However, if the memory is not large enough to hold all data blocks of a big data set, the computation slows down dramatically. Therefore, the size of memory limits these systems in analyzing big data. One common shortcoming of these frameworks in big data analysis is that the whole data set has to be processed because the distributed data blocks cannot be used as random samples of the big data set. This is due to the fact that the probability distribution of the data is not considered when partitioning big data in these frameworks. In fact, data partitioning is a key problem in big data analytics because it affects the statistical quality of the distributed data blocks and, thus, the efficiency and effectiveness of big data queries and analysis algorithms [16], [26]–[31]. The other shortcoming is that, to get the correct result, algorithms need to be parallelized and run in parallel on the distributed data blocks; a lot of effort has to be spent on parallelizing algorithms. To overcome these shortcomings, we take a different strategy that makes the distributed data blocks random samples of the big data set. Our objective is to enable the use of a subset of the distributed data blocks to produce approximate results without processing the whole data set in memory or parallelizing data analysis and mining algorithms.
B. Random Sampling
Random sampling is a basic strategy to analyze big data sets or unknown populations [10], [32]. We consider that a big data set is a random sample of the population in a certain application domain. For example, a big data set of customers is a random sample of the customer population in a company. Thus, we can use such a data set to estimate the distribution of all customers in the company. For big data analysis, a data set is usually represented as a set of objects described by a set of attributes (i.e., features). For example, a multivariate data set $\mathbb {D}$ with N records and M features can be represented as an $N \times M$ matrix.
Definition 1 (Random Sample):
Let D be a subset of $\mathbb {D}$, $\mathbb {F}(x)$ the sample distribution function (s.d.f.) of $\mathbb {D}$, and $F(x)$ the s.d.f. of D. D is called a random sample of $\mathbb {D}$ if \begin{equation*} E[F(x)]=\mathbb {F}(x),\end{equation*} i.e., the expectation of the s.d.f. of D equals the s.d.f. of $\mathbb {D}$.
A sample can be drawn from $\mathbb {D}$ at the record level or at the block level, with or without replacement.
In general, obtaining a random sample from a distributed big data set is an expensive computational task [16]. It requires a complete scan of the distributed big data set, which incurs communication and local I/O costs. Instead of record-level sampling, block-level sampling considers that the records of a big data set are aggregated into a set of data blocks, each containing a small set of records stored together. Block-level sampling is more efficient than record-level sampling as it requires accessing only a small number of blocks [42]. In this case, the selected blocks can be treated as random samples of $\mathbb {D}$ only if the records are randomly distributed among the blocks; otherwise, block-level samples may be biased.
C. Random Sample Partition Data Model
In this section, we introduce the random sample partition data model, recently proposed to represent a big data set $\mathbb {D}$ as a set of non-overlapping data blocks, each of which is a random sample of $\mathbb {D}$.
Definition 2 (Partition of Data Set):
Let $T = \{ {{D}_{1}}, {{D}_{2}}, \cdots , {{D}_{K}}\}$ be a family of K subsets of $\mathbb {D}$. T is called a partition of $\mathbb {D}$ if:
$\bigcup \limits _{k = 1}^{K} {{{D}}_{k}} = \mathbb {D}$;
${{D}_{i}} \cap {{D}_{j}} = \emptyset$ for $i,j \in \left \{{ {1,2, \cdots ,K} }\right \}$ and $i \ne j$.
Each ${{D}_{k}}$ is called a data block of $\mathbb {D}$.
According to this definition, many partitions exist for $\mathbb {D}$, depending on how the records are allocated to the K data blocks.
To enable the distributed data blocks of $\mathbb {D}$ to be used directly as random samples in data analysis, we define the random sample partition of $\mathbb {D}$ as follows.
Definition 3 (Random Sample Partition):
Assume $\mathbb {F}(x)$ is the s.d.f. of $\mathbb {D}$ and $F_{k}(x)$ is the s.d.f. of data block ${{D}_{k}}$. A partition $T = \{ {{D}_{1}}, {{D}_{2}}, \cdots , {{D}_{K}}\}$ of $\mathbb {D}$ is called a random sample partition (RSP) of $\mathbb {D}$ if \begin{equation*} E[F_{k}(x)]=\mathbb {F}(x)\quad {{ for~each}}~k=1,2,\cdots ,K,\end{equation*} i.e., each data block ${{D}_{k}}$ is a random sample of $\mathbb {D}$. Each ${{D}_{k}}$ is called an RSP block of $\mathbb {D}$.
We formally proved that the expectation of the sample distribution function (s.d.f.) of an RSP block equals the s.d.f. of $\mathbb {D}$. Therefore, each RSP block can be used directly as a random sample of the entire data set.
Definition 4 (Block-Level Sample):
Let T be the set of K RSP blocks of a big data set $\mathbb {D}$. A set S of g RSP blocks ($g < K$) randomly selected from T without replacement is called a block-level sample of $\mathbb {D}$.
According to Definition 3, each RSP block in S is a random sample of $\mathbb {D}$. Thus, a block-level sample can be used to estimate the statistical properties of $\mathbb {D}$ and to build models without accessing the entire data set.
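To make the idea concrete, the following single-machine Python sketch (illustrative only; the synthetic data and variable names are hypothetical and not part of the paper's prototype) shuffles a data set into blocks and compares a block-level estimate with the value computed from all records, mirroring Definitions 3 and 4.

import random
import statistics

random.seed(42)

# Synthetic "big" data set: 1,000,000 records of a single numeric feature.
data = [random.gauss(mu=10.0, sigma=3.0) for _ in range(1_000_000)]

# Build an RSP-like partition: randomize the records, then cut them into K blocks.
K = 100
shuffled = data[:]            # copy so the original order is untouched
random.shuffle(shuffled)
block_size = len(shuffled) // K
blocks = [shuffled[i * block_size:(i + 1) * block_size] for i in range(K)]

# Block-level sample: g blocks drawn without replacement (Definition 4).
g = 5
sample_blocks = random.sample(blocks, g)

# Compare the block-level estimate of the mean with the full-data mean.
block_means = [statistics.fmean(b) for b in sample_blocks]
print("full-data mean       :", statistics.fmean(data))
print("block-level estimate :", statistics.fmean(block_means))

Because the records were randomized before cutting, the block-level estimate lands close to the full-data value even though only a few blocks are touched.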
D. Ensemble Methods for Data Analysis
Ensemble methods are widely used in machine learning to build ensemble models, e.g., ensemble classifiers [46]. An ensemble model combines multiple base models built from different subsets or samples of the data using the same algorithm (data-centered ensembles) or different algorithms (model-centered ensembles). An ensemble model often outperforms the individual base models [47], [48]. One method for data-centered ensembles is bagging (bootstrap aggregating), which builds base models from multiple bootstrap samples drawn with replacement from the data set. This method is used in random forests [49], which use decision trees as base models. There are different ways to combine base models in an ensemble model, such as simple averaging (for regression) and voting (for classification). In this work, we use ensemble methods to obtain approximate results from big data by combining estimates or models computed from a sample of RSP blocks.
Big Data Analysis with RSP Data Blocks
In this section, we propose the RSP-based method for big data analysis. This method uses RSP blocks from an RSP of a big data set $\mathbb {D}$ as the units of analysis: a block-level sample of RSP blocks is selected, the blocks are analyzed in parallel, and the block-level outputs are combined to produce approximate results for $\mathbb {D}$.
We use the notations in Table 1 to describe the key steps in this method. Among them, T denotes an RSP of $\mathbb {D}$, f the data analysis function applied to individual RSP blocks, g the number of RSP blocks selected in each batch, and S the block-level sample of selected blocks.
Figure 1 shows the main steps of the RSP-based method to analyze $\mathbb {D}$: generating an RSP of $\mathbb {D}$, sampling RSP blocks, analyzing the selected blocks in parallel, and combining their outputs.
To analyze a sample of RSP blocks, f is applied in parallel to each selected RSP block and the outputs from these blocks are collected for evaluation, comparison, and further analysis.
A. RSP Generation
An RSP is generated from $\mathbb {D}$ using a two-stage data partitioning algorithm:
Stage 1: Divide $\mathbb {D}$ into P non-overlapping subsets $\{ {{D}_{i}}\} _{i = 1}^{P}$ of equal size. These subsets form a partition of $\mathbb {D}$.
Stage 2: Each RSP block is built using records from all the P subsets of Stage 1. This requires a distributed randomization operation as follows:
Step 1: Randomize each ${{D}_{i}}$ for $1 \leq i \leq P$;
Step 2: Cut each randomized ${{D}_{i}}$ into K slices $\{ {{D}}(i,j)\} _{j = 1}^{K}$. These slices form an RSP of ${{D}_{i}}$, for $1 \leq i \leq P$;
Step 3: From the $P \times K$ slices $\{ {{D}}(i,j)\}$ for $1 \leq i \leq P$ and $1 \leq j \leq K$, merge the slices $\{ {{D}}(i,j)\} _{i = 1}^{P}$ for each $1 \leq j \leq K$ to form K subsets $\{ {{D}}(\cdot ,j)\} _{j = 1}^{K}$. These subsets form an RSP of $\mathbb {D}$.
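The two-stage logic can be sketched in Python as follows. This is only a single-machine illustration under the assumption that the data fits in memory; the function name generate_rsp and its parameters are hypothetical, and the actual generation in Alpha framework runs as a distributed operation on HDFS blocks [50].

import random

def generate_rsp(records, P, K, seed=0):
    """Illustrative two-stage RSP generation (single-machine sketch).

    Stage 1: cut the records into P equal-size subsets D_1..D_P.
    Stage 2: randomize each subset, slice it into K pieces, and merge
             the j-th slices of all subsets into RSP block j.
    """
    rng = random.Random(seed)
    subset_size = len(records) // P

    # Stage 1: P non-overlapping subsets (a plain range partition).
    subsets = [records[i * subset_size:(i + 1) * subset_size] for i in range(P)]

    # Stage 2, steps 1-2: randomize each subset and cut it into K slices.
    slices = []                       # slices[i][j] corresponds to D(i, j)
    for subset in subsets:
        shuffled = subset[:]
        rng.shuffle(shuffled)
        slice_size = len(shuffled) // K
        slices.append([shuffled[j * slice_size:(j + 1) * slice_size] for j in range(K)])

    # Stage 2, step 3: merge the j-th slice of every subset into RSP block j.
    rsp_blocks = []
    for j in range(K):
        block = []
        for i in range(P):
            block.extend(slices[i][j])
        rsp_blocks.append(block)
    return rsp_blocks

# Example: 1,000,000 integer records, P = 10 subsets, K = 50 RSP blocks.
blocks = generate_rsp(list(range(1_000_000)), P=10, K=50)
print(len(blocks), "RSP blocks of", len(blocks[0]), "records each")

Each RSP block receives a random slice of every original subset, which is what makes the resulting blocks behave as random samples of the whole data set.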
B. RSP Blocks Sampling
The RSP blocks of T are evenly distributed on the data nodes of a computing cluster. The distribution details of these RSP blocks, including their locations, are recorded in the metadata of T. To select g RSP blocks from T without replacement, the number of blocks to draw is apportioned evenly among the data nodes and the assigned number of blocks is randomly selected on each node.
Algorithm 1 shows the RSP blocks sampling process. The g selected blocks form a block-level sample S, and the selection status of each RSP block is recorded so that blocks already used by a given analysis function f are not selected again.
Algorithm 1 RSP Blocks Sampling
Input:
T: RSP of $\mathbb {D}$;
g: number of RSP blocks to be selected;
f: data analysis function;
Output:
S: a subset of RSP blocks from T;
if at least g RSP blocks of T have not yet been used by f then
randomly select g of these unused blocks, apportioned evenly across the data nodes, and mark them as used by f;
else
select all the remaining unused RSP blocks;
end if
return S
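A minimal Python sketch of this node-aware selection is given below. The in-memory metadata layout (a dict mapping node names to block identifiers) and the function name sample_rsp_blocks are hypothetical stand-ins for the block metadata kept by Alpha framework.

import random

def sample_rsp_blocks(blocks_per_node, g, used, seed=None):
    """Select up to g unused RSP block ids, spread as evenly as possible over the nodes.

    blocks_per_node: dict mapping node name -> list of RSP block ids on that node.
    used: set of block ids already consumed by the current analysis function f.
    """
    rng = random.Random(seed)
    nodes = list(blocks_per_node)
    available = {n: [b for b in blocks_per_node[n] if b not in used] for n in nodes}

    # Evenly apportion g across the nodes (any remainder goes to the first nodes).
    base, extra = divmod(g, len(nodes))
    quota = {n: base + (1 if i < extra else 0) for i, n in enumerate(nodes)}

    selected = []
    for n in nodes:
        take = min(quota[n], len(available[n]))
        selected.extend(rng.sample(available[n], take))
    used.update(selected)
    return selected

# Hypothetical metadata: 3 data nodes, 4 RSP blocks each.
metadata = {"node1": [0, 1, 2, 3], "node2": [4, 5, 6, 7], "node3": [8, 9, 10, 11]}
used_blocks = set()
print(sample_rsp_blocks(metadata, g=6, used=used_blocks, seed=1))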
C. RSP Blocks Analysis
After g RSP blocks are selected by Algorithm 1, these blocks are processed independently in parallel on the corresponding data nodes as described in Algorithm 2. After the execution, the individual outputs from these g RSP blocks are collected.
Algorithm 2 RSP Blocks Analysis
Input:
S: block-level sample with g RSP blocks;
f: data analysis function;
for all RSP blocks $D_{q} \in S$ do in parallel
Load $D_{q}$ on its data node and compute the output $\theta _{q} = f(D_{q})$;
end for
Collect the outputs from the g blocks: $\{\theta _{q}\}_{q=1}^{g}$.
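Because each block is processed independently, the analysis step is embarrassingly parallel. The Python sketch below imitates it with a local process pool; the analysis function f here (a feature mean) is only a stand-in for any sequential algorithm, and the in-memory blocks are hypothetical, whereas the prototype executes f on cluster nodes via rxExec.

from multiprocessing import Pool
import random
import statistics

def f(block):
    """Stand-in analysis function applied to one RSP block: estimate the mean."""
    return statistics.fmean(block)

if __name__ == "__main__":
    random.seed(0)
    # Hypothetical block-level sample S: g = 4 blocks of 100,000 records each.
    S = [[random.gauss(5.0, 2.0) for _ in range(100_000)] for _ in range(4)]

    # Apply f to every selected block in parallel and collect the outputs.
    with Pool(processes=4) as pool:
        outputs = pool.map(f, S)

    print("per-block outputs:", outputs)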
Depending on the requirements of the data analysis task (e.g., exploratory data analysis and predictive modeling), the collected outputs from this step are used for further investigation and analysis. The results from a sample of RSP blocks can be taken as preliminary insights. Further analysis can be done using other data analysis algorithms and methods such as ensemble methods.
D. RSP-Based Ensemble Estimates and Models
The outputs of Algorithm 2 are combined to produce ensemble estimates or models for $\mathbb {D}$ as follows.
1) RSP-Based Ensemble Estimates
Let $\theta _{q}$ be the estimate (e.g., a sample mean) computed with f from RSP block $D_{q}$ in S. The RSP-based ensemble estimate is the average of the g block-level estimates:\begin{equation*} {\Theta _{{S}}} = {{avg}}{\left ({\{\theta _{q}\}_{q=1}^{g} }\right)}\tag{1}\end{equation*}
It can be proved that the expectation of ${\Theta _{{S}}}$ equals the corresponding statistic of $\mathbb {D}$, since each RSP block in S is a random sample of $\mathbb {D}$.
2) RSP-Based Ensemble Models
Let $\pi _{q}$ be the base model (e.g., a classification tree) built with f from RSP block $D_{q}$ in S. If the base models are classifiers, the class label y of a new record x is predicted by majority voting among the base models:\begin{equation*} y = \Pi _{{S}}({{x}}) = mode(\{\pi _{q}({{x}})\}_{q=1}^{g})\tag{2}\end{equation*}
If the base models are regression trees, the ensemble response y is calculated by averaging the predictions of the base regression models on a new record x:\begin{equation*} y = \Pi _{{S}}({{x}})=avg(\{\pi _{q}({{x}})\}_{q=1}^{g})\tag{3}\end{equation*}
In a similar way to the RSP-based ensemble estimates, it can be shown that the ensemble model $\Pi _{{S}}$ approximates the model built from the entire data set as the number of RSP blocks in S increases.
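The consensus step in Equations (1)–(3) amounts to averaging block-level estimates, majority-voting classifier predictions, and averaging regression predictions. A small Python sketch with hypothetical base outputs (not values from the experiments) is shown below.

from statistics import fmean, mode

# Equation (1): ensemble estimate = average of the block-level estimates.
block_estimates = [10.2, 9.8, 10.1, 10.4, 9.9]
ensemble_estimate = fmean(block_estimates)

# Equation (2): ensemble classifier = majority vote of the base classifiers'
# predictions for one new record x (labels are hypothetical).
base_class_predictions = ["spruce", "pine", "spruce", "spruce", "fir"]
ensemble_label = mode(base_class_predictions)

# Equation (3): ensemble regression = average of the base models' predictions for x.
base_regression_predictions = [2.1, 1.9, 2.3, 2.0, 2.2]
ensemble_response = fmean(base_regression_predictions)

print(ensemble_estimate, ensemble_label, ensemble_response)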
Asymptotic ensemble estimates and models can be built by combining the results from a sufficient number of RSP blocks. This can be done using either one large sample of RSP blocks or multiple small samples of RSP blocks depending on the available computing resources.
E. RSP-Based Asymptotic Ensemble Learning
The RSP-based asymptotic ensemble learning uses a stepwise process to learn an asymptotic ensemble model or estimate from multiple batches of RSP blocks. Each batch is randomly selected from T without replacement. We propose this process to avoid loading a large number of RSP blocks in one batch. This enables a computing cluster with limited resources to scale to bigger data sets.
Algorithm 3 illustrates the RSP-based asymptotic ensemble learning process. To get an asymptotic ensemble model ${\Pi }$, the following four steps are repeated over batches of RSP blocks:
Blocks Selection: Algorithm 1 is used to select a sample S of g RSP blocks from T;
Blocks Analysis: f is applied to each RSP block in S using Algorithm 2 to get a set of base models $\Pi _{{S}}$;
Ensemble Update: the new g base models in $\Pi _{{S}}$ are appended to the previously obtained models ${\Pi }$, if any;
Evaluation: if the current ensemble result does not fulfill the termination condition(s), go back to the blocks selection step to select a new sample of RSP blocks. This process continues until a satisfactory result is obtained or all RSP blocks are used up. Here, $\Omega ()$ is an evaluation function that checks whether the ensemble model satisfies the application requirements (e.g., error bounds).
Algorithm 3 Asymptotic Ensemble Learning Algorithm
Input:
T: RSP of $\mathbb {D}$;
f: data analysis function;
g: number of RSP blocks to be selected in each batch;
Output:
${\Pi }$: asymptotic ensemble result;
1: Blocks Selection: select a sample S of g RSP blocks from T using Algorithm 1;
2: Blocks Analysis: apply f to the blocks in S using Algorithm 2 to obtain $\Pi _{{S}}$;
3: Ensemble Update: ${\Pi } \leftarrow {\Pi } \cup \Pi _{{S}}$;
4: Evaluation: evaluate ${\Pi }$ with $\Omega ()$;
If $\Omega ({\Pi })$ is satisfied or no unused RSP blocks remain in T
Stop
Else
Go to 1
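Putting Algorithms 1–3 together, the batch loop can be sketched in Python as follows. The convergence test omega used here (stop once the standard error of the base estimates is small) is only one possible instance of $\Omega ()$, and the data, function names and thresholds are hypothetical.

import math
import random
import statistics

def asymptotic_ensemble(blocks, f, g, omega, seed=0):
    """Batch-wise ensemble learning over RSP blocks (single-machine sketch).

    blocks: list of RSP blocks; f: per-block analysis function;
    g: batch size; omega: termination test on the list of base results.
    """
    rng = random.Random(seed)
    remaining = list(range(len(blocks)))      # indices of blocks not yet used
    base_results = []
    while remaining:
        batch = rng.sample(remaining, min(g, len(remaining)))   # Algorithm 1
        for idx in batch:                                       # Algorithm 2
            base_results.append(f(blocks[idx]))
            remaining.remove(idx)
        if omega(base_results):                                 # Evaluation
            break
    return base_results

def omega(results):
    """Stop once the standard error of the base estimates is below a threshold."""
    if len(results) < 10:
        return False
    return statistics.stdev(results) / math.sqrt(len(results)) < 0.005

# Hypothetical data: 50 RSP blocks of 10,000 records each.
rng = random.Random(1)
blocks = [[rng.gauss(0.0, 1.0) for _ in range(10_000)] for _ in range(50)]

results = asymptotic_ensemble(blocks, statistics.fmean, g=5, omega=omega)
print(len(results), "base estimates used; ensemble mean =", statistics.fmean(results))

In this sketch the loop typically stops after a few batches, i.e., long before all 50 blocks are consumed, which mirrors the early-termination behaviour intended for Algorithm 3.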
Figure 2 illustrates the process of running this algorithm on multiple batches of RSP blocks. Each batch gets a different sample of RSP blocks, and each RSP block is computed only once by a given function f. The g base models from each batch are collected and appended to ${\Pi }$ to update the ensemble.
Several batches of RSP blocks are analyzed to gradually accumulate base and ensemble models. In each batch, a different set of g RSP blocks is computed in parallel on its nodes using a given function f.
This RSP-based method requires a distributed data-parallel computing framework to efficiently compute estimates and models from big data sets. For this purpose, we propose Alpha framework in the next section.
Alpha Framework
Alpha framework is a distributed data-parallel framework for RSP-based big data analysis. It is designed as an extension to the Asymptotic Ensemble Learning Framework [17]. Alpha framework enables big data analysis with RSP blocks on computing clusters using current distributed computing engines, distributed file systems and existing sequential data analysis and mining algorithms. In this section, we first describe the storage of RSP blocks in HDFS. Then, we propose a distributed data-parallel analysis model to implement the RSP-based method on a computing cluster. After that, we present the architecture design of Alpha framework. Finally, we describe a prototype implementation of Alpha Framework using Microsoft R Server packages.
A. Storage of RSP Data Blocks
In Alpha framework, we use the HDFS distributed file system to store the RSP blocks of a big data set $\mathbb {D}$. The RSP blocks are stored together as an HDFS-RSP file in which each RSP block is an HDFS block.
When storing a data set in HDFS, the default number of data blocks depends on the number of records assigned to each HDFS block which depends on the HDFS block size (128 MB in our case). Consequently, the number of records may be slightly different between HDFS blocks of a data set. However, a fixed number of records can be assigned to each block when generating an HDFS-RSP file. The number of HDFS-RSP blocks K can be different from the original number of HDFS blocks P. In addition to the metadata provided by HDFS, statistical metadata (e.g., data summary) and lineage (e.g., log of previous analysis tasks) can be maintained for the HDFS-RSP file and its blocks.
The RSP distributed data model is structurally similar to the existing in-memory distributed data abstractions (e.g., RDD) and on-disk distributed data formats (e.g., XDF). Consequently, an HDFS-RSP file can be imported into an RDD or an XDF file as any other HDFS file. The key difference is that each RSP block is a random sample. Any subset of RSP blocks can be randomly selected, loaded and analyzed independently and in parallel. Therefore, the RSP data model can be implemented as an extension to the current distributed data models. However, it should be implemented as a disk-based data format (such as XDF) in order to avoid loading unnecessary blocks into memory. In our prototype implementation, we use the XDF format to store RSP blocks in HDFS and test the RSP-based method on Microsoft R Server as described in Sect. IV-D.
B. Distributed Data-Parallel Analysis Model
The key idea in Alpha framework is running f on a sample of RSP blocks, as shown in Figure 1, instead of running it on all data blocks. We follow a simplified map-reduce model but with the possibility of selecting and computing only a subset of RSP blocks. We can consider f as a map operation which takes an RSP block as input and returns an estimate value or model as output. There is no need for a reduce operation because f is executed on each RSP block independently. Also, there is no need to parallelize f because it runs on a single data node. We just need a simple collect operation to collect the outputs from data nodes to the master node of a computing cluster. Data shuffling among the nodes of the cluster is not required. On a computing cluster, this distributed data-parallel analysis model is implemented as follows:
A master process is initiated to run the main program on a master node;
The master process invokes the blocks selection component to get a list of g randomly selected RSP blocks for the analysis task;
The master process initiates a worker program for each selected RSP block on its node;
Each worker runs f on its RSP block to produce the output from this block;
The outputs from all g blocks are collected and returned to the master node;
The master process checks if another batch of RSP blocks is required (depending on input parameters or evaluation criteria). If so, go to step 2;
When no further batch is required or no more RSP blocks are available, the final outputs are saved.
In addition to the data-parallel analysis operations on RSP blocks, consensus operations are applied to the collected outputs from RSP blocks to produce ensemble models or estimates. This is done locally on a master or edge node of a computing cluster. The history of data-parallel analysis operations defines an RSP block’s lineage and the history of the applied consensus operations defines the HDFS-RSP file’s lineage.
C. Architecture
Figure 4 illustrates the architecture of Alpha framework which has three layers: data management, batch management, and data analysis. A distributed file system (e.g., HDFS) is required to store RSP blocks. A distributed computation engine (e.g., Microsoft R Server and Apache Spark) is also required to manage the execution of distributed data-parallel analysis operations.
1) Data Management
A fundamental data management layer is required to manage HDFS-RSP files, metadata, statistics and lineages. This layer provides operations for data partitioning and data access. These operations can be used by higher-level components or other computing frameworks.
Data partitioning:
$\mathbb {D}$ is converted into a random sample partition T using the two-stage algorithm described in Sect. III-A. This operation is executed only once to create an HDFS-RSP file from $\mathbb {D}$.
Data access: This component provides access to RSP blocks and their metadata, statistics and lineage.
2) Batch Management
In order to conduct distributed data-parallel analysis on selected RSP blocks, Alpha framework provides the following operations:
Blocks selection: For a given data analysis function f, a selection process is initiated to draw a sample S of g blocks from T as described in Algorithm 1. A selection index is created for each f where each RSP block of T has its key and selection status. The status of each RSP block, selected or not, is maintained for f.
Batch execution: A sample of RSP blocks is used as one batch. The data analysis function f is applied to the selected blocks in a perfectly parallel manner as described in Algorithm 2. This is done using existing distributed computation engines where a master process manages the execution as described in Sect. IV-B.
3) Blocks Analysis
This component is used as a library of analysis functions which can be invoked to analyze RSP blocks. As each RSP block is small enough to be processed on a single core or node, most existing implementations of data analysis and mining algorithms can be used directly to analyze RSP blocks. Alpha framework doesn’t need new parallel implementations of these algorithms. These functions can perform different data analysis tasks such as data summaries (e.g., min, max, mean, standard deviation, median) and data mining (e.g., classification, regression, clustering, outlier detection). This component can be used directly to obtain results from RSP blocks. It is also used as part of the asymptotic ensemble learning process to obtain results from multiple samples of RSP blocks.
4) Ensemble Analysis
This component is used to obtain RSP-based ensembles from a sample of RSP blocks. It is also used to start the asymptotic ensemble learning process and gradually build an RSP-based asymptotic ensemble as described in Algorithm 3. The user specifies the analysis task, analysis function f, ensemble function, blocks selection criteria, evaluation criteria, termination criteria and other task-specific parameters. Then, the blocks selection component is used to manage the selection of blocks for this task. After that, f is invoked from the blocks analysis component and runs on the selected blocks using the batch execution component. Base results from RSP blocks are combined using the specified ensemble function to form an ensemble result.
D. Prototype Implementation
We implemented a prototype of Alpha framework using Microsoft R Server with Apache Spark and HDFS. This prototype uses the RevoScaleR package (ScaleR) of Microsoft R Server due to its two key advantages. First, it comes with the disk-based XDF distributed data format which is appropriate to store RSP blocks. Second, the rxExec mechanism enables running a function in parallel using the available cores or nodes with the possibility of assigning different inputs and parameters for each time the function is executed. In this way, RevoScaleR enables running existing R functions in distributed and parallel environments. For the time being, all the key functionalities of Alpha framework were prototyped using RevoScaleR except for the RSP generation algorithm. This algorithm was implemented using Apache Spark as shown in [50]. However, this algorithm generates an HDFS-RSP file which can be imported into XDF format as any other HDFS file.
In this prototype, we consider that an HDFS-RSP file of $\mathbb {D}$ has been imported into the XDF format so that each RSP block can be loaded and analyzed with RevoScaleR functions. To measure the cost of running an analysis function on RSP blocks with rxExec, we distinguish three time measures:
Task time is the time for running the target function f on an RSP block.
Job time is the time for running the target function in parallel on a batch of RSP blocks. It includes the communication time between Spark driver and executors, and the time for the parallel tasks;
rxExec time (batch total time) is the time for running rxExec on a batch of RSP blocks. In addition to Spark job time, the total time includes the communication time between R Server and Spark driver.
Using rxExec to run f on a sample of g RSP blocks. Each rxExec call initiates a Spark job with a single stage of g tasks.
Experiments and Results
In this section, we present the experimental results of three public real data sets to demonstrate the performance of the new RSP-based method for big data analysis. The characteristics of the data sets are shown in Table 2. First, we describe the experiment settings and the evaluation measures. Then, we present the results of each data set.
A. Experiment Settings
We used a computing cluster of 5 nodes. Each node has 12 cores (24 with Hyper-threading), 128 GB RAM and 1.1 TB disk space. Apache Hadoop 2.6.0, Apache Spark 1.6.0, and Microsoft R Server 9.1 were installed on the cluster and used in the experiment. Spark applications were run with 48 executors (2 cores and 4GB memory each) and HDFS was used with a 128MB HDFS block size. In these settings, the total number of executor cores is 96 (i.e., the number of cores per executor multiplied by the number of executors: 2 × 48 = 96).
B. Evaluation Measures
For each data set, several RSPs were created with different numbers of RSP blocks. For each RSP, an ensemble estimate or model was built gradually using Algorithm 3 until all RSP blocks were used up and all estimates or models were combined in one ensemble. We used this algorithm for three data analysis tasks: data summary, classification and regression. RevoScaleR's rxSummary function was used to compute summary statistics. RevoScaleR's rxDTree function was used to build decision and regression trees. The rxDTree function was used with its default parameters: minSplit = max(20, sqrt(numObs)), minBucket = round(minSplit/3), maxDepth = 10, cp = 0, maxCompete = 0, maxSurrogate = 0, useSurrogate = 2, surrogateStyle = 0, and xVal = 2.
Ensemble results were evaluated after each batch to check how the results change with more RSP blocks until all blocks were used up. To show the significance of the RSP-based results, we repeated the asymptotic ensemble learning process many times on each RSP model (100 times in most cases) and the averages of these results were calculated. In the following subsections, we use error bars to show the range of estimated values from multiple runs in each batch. To measure the performance of a model, we used the overall accuracy for classification and the Root Mean Square Error (RMSE) for regression.
As each RSP block is a random sample of the entire data set, any RSP block can be used as holdout testing data to test the models. To get consistent results, we used the same holdout data to test models from different RSPs of the same data set. The overall accuracy of a classifier is defined as:\begin{equation*} Acc = \frac {tp+tn}{tp+tn+fp+fn}\tag{4}\end{equation*} where tp, tn, fp and fn denote the numbers of true positives, true negatives, false positives and false negatives, respectively.
The RMSE of the predicted values $\hat {y}_{i}$ against the true values $y_{i}$ over n testing records is defined as:\begin{equation*} RMSE = \sqrt {\frac {\sum _{i=1}^{n}{{(\hat {y}_{i}-y_{i})}^{2}}}{n}}\tag{5}\end{equation*}
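For reference, Equations (4) and (5) translate directly into a few lines of Python; the labels and predictions below are hypothetical placeholders, not values from the experiments.

import math

def accuracy(y_true, y_pred):
    """Overall accuracy, Eq. (4): fraction of correctly classified records."""
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)

def rmse(y_true, y_pred):
    """Root Mean Square Error, Eq. (5)."""
    return math.sqrt(sum((p - t) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

print(accuracy([1, 0, 1, 1], [1, 0, 0, 1]))    # 0.75
print(rmse([2.0, 3.0, 4.0], [2.5, 2.5, 4.0]))  # ~0.408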
A small number of RSP blocks was used in each batch (much smaller than the total number of RSP blocks in each RSP).
C. Results of Covertype Data
Covertype is a small data set with 581,012 records, 54 features and seven forest cover type categories. We experimented on it to demonstrate the properties of the RSP-based method on small data in comparison with bigger data. This experiment was conducted on a single machine with multiple cores.
As we demonstrated in our empirical study [17], the records in Covertype are not randomized. In this experiment, three different RSPs were generated from the Covertype data as shown in Table 3. K is the number of RSP blocks and n is the number of records in each RSP block. The asymptotic ensemble learning process was run 100 times on each RSP with a small number of RSP blocks in each batch.
Data summary: Figure 6(a) shows the estimated means of 4 features in Covertype data. Each point represents the average of the computed means from RSP blocks used up to that point. We can see that the error range decreases as more RSP blocks are used in the ensemble estimate. The error range becomes insignificant after a few batches and the estimated mean value converges to the true mean value in the end. Similar results of the estimated standard deviations for the same features can be observed in Figure 6(b).
Classification: Figure 7 shows how the RSP-based ensemble model accuracy changes after each batch until all RSP blocks are used up. As we can see, an ensemble model with an accuracy in the range of 0.74–0.75 was obtained using only 20% of the Covertype data from any of the three generated RSPs. This value is close to the accuracy of the single model (0.768) built from the entire data set with the same classification algorithm. However, the ensemble model performed worse than the single model because the data set is not big: the records in each RSP block are too few to adequately represent the entire data set. We can also see that the bigger the RSP block, the better the ensemble model. As the following subsections show, the advantages of the RSP-based method are clearer on bigger data sets; the method may not be advantageous for the analysis of small data sets.
RSP-based estimation of mean and standard deviation for four features in Covertype data (using RSP blocks from Covertype B analyzed in batches). (a) Estimated means. (b) Estimated standard deviations.
RSP-based ensemble classifiers from Covertype data. Each point represents the overall accuracy (averaged from 100 runs) of the RSP-based ensemble model obtained after each batch. (a) Covertype A. (b) Covertype B. (c) Covertype C.
D. Results of Higgs Data
Three RSPs of the HIGGS data were created as shown in Table 4. HIGGS Default is the default HDFS file of HIGGS with 60 blocks. The default HDFS file can be used as an RSP because the records in HIGGS are i.i.d. The other two RSPs were generated from the default HDFS file of HIGGS. Since the HIGGS data set is much larger than Covertype, the computing cluster was used to analyze it. As with Covertype, for each RSP, the asymptotic ensemble learning process was executed 100 times with a small number of RSP blocks in each batch.
Data summary: Figures 8(a) and 8(b) show the means and standard deviations of 4 features in HIGGS data, respectively. We can observe the same trend as Covertype on the change of the error range, decreasing as more RSP blocks are added. The error range becomes insignificant after a few batches and the estimated values converge to the true values in the end.
Classification: Figure 9 shows how the accuracy of an RSP-based ensemble model changes with more RSP blocks. There is no significant change after using about 10–15% of the data. The accuracy of the ensemble model is close to, and even slightly higher than, the accuracy of the single model built from the whole data set using the same classification algorithm. As we can see in this bigger data set, the results of the three RSPs are similar, but HIGGS B is slightly better because more RSP blocks are used for the same percentage of data. In this case, the size of the individual RSP blocks matters less than the number of RSP blocks used in the ensemble.
RSP-based estimation of mean and standard deviation for four features in HIGGS data (using RSP blocks of HIGGS A analyzed in batches). (a) Estimated means. (b) Estimated standard deviations.
RSP-based ensemble classification of HIGGS data. Each point represents the overall accuracy (averaged from 100 runs) of the ensemble model after each batch. The dotted line represents the accuracy of a single model built from the entire HIGGS data. (a) HIGGS Default. (b) HIGGS A. (c) HIGGS B.
Figure 10(a) shows the computation times to obtain RSP-based classifiers from batches of 5 RSP blocks in the three HIGGS RSPs. As the figure shows, the Spark task time represents the main computation time. Completing the job requires more time due to scheduling and the communication between the Spark driver and executors. The communication between R Server and the Spark driver adds significant overhead to the total batch time. Furthermore, even when using all RSP blocks in one batch, which is generally not required, it still takes less time than learning a single model from the entire data set, as shown in Figure 10(b). Taking HIGGS Default as an example, an RSP-based ensemble classifier can be obtained in about 1 minute or less using 20% of the data (i.e., 15 RSP blocks in one batch). This RSP-based ensemble model is equivalent to the single model built from the entire data.
Computation time for RSP-based classification of HIGGS data. (a) Using batches of 5 RSP blocks. (b) Using all RSP blocks in one batch. For comparison, the time required to build a single model from the entire data is shown on the far right.
E. Results of Taxi Data
Two RSPs were generated from NYC Taxi data as shown in Table 5. Only 125,000,000 records of this data set were used after preprocessing.
As with the HIGGS data, the NYC Taxi data was analyzed on the computing cluster. For each RSP, the asymptotic ensemble learning process was run 50 times with a small number of RSP blocks in each batch.
Data summary: Figures 11(a) and 11(b) show the means and standard deviations of 4 features in Taxi data, respectively. Properties of the ensemble estimates are the same as those of the two previous data sets, Covertype and HIGGS.
Regression: Figure 12 shows the Root Mean Square Error (RMSE) of RSP-based ensemble regression models to predict the tip amount in Taxi data. As in the classification example, there is no significant change after adding more base models. We can also see that there is no significant difference between the ensemble models and a single model built from the whole data set (the dotted line in the figure), although the ensemble models are slightly better.
RSP-based estimation of mean and standard deviation for four features in Taxi data (using RSP blocks of Taxi B analyzed in batches). (a) Estimated means. (b) Estimated standard deviations.
RSP-based ensemble regression of Taxi data. Each point represents the RMSE error (averaged from 50 runs) of the ensemble model after each batch. The dotted line represents the error of a single model built from the entire data. (a) Taxi A. (b) Taxi B.
Figure 13(a) shows the computation time to obtain RSP-based regression models from batches of 10 RSP blocks. Similarly, Figure 13(b) shows the time with batches of 96 RSP blocks, which equals the total number of cores assigned to Spark executors in the computing cluster. As we can see in Figure 13(c), even when using all RSP blocks in one batch, it takes less time than obtaining a single model from the entire data set (about 23 minutes on the same cluster). Running the regression tree algorithm on the entire Taxi data required extra memory (the executor memory overhead and the Kryo serialization buffer had to be increased so that the Spark job could finish successfully). Taking Taxi A as an example, an RSP-based ensemble regression model can be obtained in less than 1 minute using 12% of the data (i.e., 20 RSP blocks as one batch). This RSP-based ensemble model is equivalent to the single model.
Computation time for RSP-based regression of Taxi data. (a) Using batches of 10 RSP blocks. (b) Using batches of 96 RSP blocks. (c) Using all RSP blocks in one batch. For comparison, the time required to build a single model from the entire data is shown on the far right.
Discussion
Although the distributed data-parallel model is used in the current mainstream big data frameworks, computing on an entire big data set is not efficient due to the high computational costs of cluster computing and the ever-increasing volume of big data sets. However, as we have shown in this paper, if a big data set is represented as a random sample partition, the analysis of the big data set is turned into the analysis of a subset of RSP blocks. After the conversion of a big data set into the RSP data model, the entire data set no longer needs to be analyzed as a whole. Instead, ensemble estimates and models can be computed from a subset of RSP blocks in a fully parallel manner on the computing cluster. The expensive record-level sampling process of taking random samples from a distributed big data file can also be replaced with efficient block-level sampling. Since a big data set is turned into a more manageable space of small RSP blocks, this new strategy for big data analysis on computing clusters is more scalable to the increasing volume of big data sets. As the experimental results show, a small subset of RSP blocks is sufficient to obtain good approximate results. The main purpose of this new big data analysis strategy is not to obtain better results than other methods, but to use less data and fewer computing resources to obtain approximate results. We discuss several aspects of the RSP-based method as follows:
Reduced computational costs: The RSP-based method does not require loading and analyzing the entire data set. Memory limitation is no longer a critical factor because RSP blocks are analyzed in batches and the size of an RSP block is small enough to be processed on a single node or core. Computation time can be decreased significantly because we only need to analyze small RSP blocks in a perfectly parallel manner. We have shown that even when many RSP blocks are put in one batch, they can still be processed by an analysis algorithm efficiently in parallel in much less time than building a single model from the entire data set with the parallel version of the same algorithm. This is because each RSP block is computed locally on its computing node; communication costs among the computing nodes are removed.
In our prototype, we use the existing mechanisms for distributed data-parallel computing in R Server and Spark without any modifications. It is clear that the main cost is the communication on a computing cluster. Thus, a batch of a few RSP blocks is more efficient to get good approximate results in less time and using less resources. The asymptotic ensemble learning process over batches of RSP blocks not only can avoid exceeding the limit of available resources but also can obtain base and ensemble results quickly. This approach is very useful in exploratory analysis of very big data sets.
Reuse existing algorithms and engines: The RSP-based method provides an efficient way for data analysts to run the existing data analysis and mining algorithms, such as those in R, on RSP blocks. They don’t need to re-implement parallel versions of these algorithms. Current distributed computation engines, e.g., Apache Spark, can be used to execute the analysis algorithms on each batch of RSP blocks on a computing cluster. Furthermore, RSP blocks are stored as HDFS blocks which can be directly used in the current HDFS-compatible frameworks.
Ensemble methods: To obtain an ensemble model, any RSP block can be used as a data component to get a base model. The key advantage here is that RSP blocks are already available and there is no need to conduct expensive sampling operations such as bootstrapping. Moreover, base models are built in a perfectly parallel manner from a set of small RSP blocks rather than building each base model from a big distributed bootstrap sample. In the two examples of decision and regression tree base models, all variables are used to build each base model. Further investigations are required to check the effect of randomly selecting a subset of variables for each base model to increase the diversity of base models as in Random Forests [49]. RSP-based ensembles can be obtained using different base algorithms and used in model serving systems [51] for further evaluation and model selection.
Data management: Higher-level big data processing and analysis systems can take advantage of the RSP block management layer to improve the efficiency and effectiveness in big data analysis. We believe that this work can open new research directions for improving the functionality of distributed big data frameworks, for example, scheduling algorithms and interactive analysis. It is essential that a big data system can schedule its data processing jobs in a statistically-aware manner (run the job on a subset of data blocks which satisfy certain distribution criterion) and a location-aware manner (run locally on the nodes that have enough resources), as well. In Alpha framework, we consider that data blocks are random samples of the big data set. However, some analysis tasks may require different partitioning strategies and different blocks selection criteria as well.
Size of RSP data blocks: Although the number of records in the RSP data blocks may affect the results, the effect was not significant according to the findings of our experiments, especially in the bigger data sets HIGGS and Taxi data. However, choosing the number of records, n, in an RSP block is equivalent to choosing the sampling rate when using random sampling techniques. This issue requires further investigations on different data sets and analysis tasks.
Application scenarios: Data scientists from different application areas use and design a variety of algorithms for exploratory data analysis and predictive modeling [52]–[54]. The RSP-based method enables them to directly test their sequential algorithms on computing clusters and explore the findings from a subset of RSP blocks. The RSP-based method can also help in separating big data storage from big data analysis. RSP blocks can be loaded on or transferred to local machines or other computing clusters for analysis. As such, a separate computing cluster can be used to analyze data sets stored in different clusters by combining the outputs from different locations. If RSP blocks from different data centers have similar distributions, the analysis process can be applied locally in each data center to produce local results without moving data. Then, the local results from different data centers are collected and combined to produce the final results for the entire data. If RSP blocks from different data centers have different probability distributions, a combination criterion is required to produce representative RSP blocks of the whole data set. Then, the combined blocks can be used to produce approximate results for the big data in different data centers. If $\mathbb {D}$ is an evolving data set, an RSP can be created from the data collected in a time window. If RSP blocks in subsequent time windows have similar probability distributions, these blocks together represent an RSP of the entire data in the considered windows. If the data has different probability distributions, RSP blocks from these windows are combined to represent an RSP of the entire data in these windows. In fact, to analyze data in different windows we may not need to combine all RSP blocks from all windows. In a similar way to the cross-data-center case, block-level sampling is used to select RSP blocks from each window and combine them to produce RSP blocks of the entire data in all windows. We are currently experimenting with the RSP-based method in these scenarios.
Related Works
Considering the ever-increasing volume of data, computing resources may never be enough to analyze an entire big data set all at once. Several recent works address this problem using the approximate computing paradigm. Approximate computing depends on representative samples of the entire data set to compute approximate results instead of exact ones. We summarize here some recent works in this direction and show the differences from the RSP-based method.
ApproxHadoop [8] uses multi-stage sampling to enable approximation using the MapReduce model. It depends on both data sampling and task dropping. For a certain MapReduce job (e.g., a data analysis task), a subset of data blocks is randomly selected at run-time (i.e., a subset of the map operations); then, a random sample is drawn from each selected block. Consequently, only some map operations are executed and each one runs on a sample of its assigned data block. In the RSP-based method, we only select a sample of RSP blocks and run a single Spark job on these blocks to execute the target function independently on each selected block. This does not require any record-level sampling operations at run-time, neither locally within each data block nor on the entire distributed data set.
Our work is similar to the Divide & Recombine [55] approach which enables big data analysis in R by applying existing statistical methods to the subsets of a large dataset on a computing cluster using Hadoop MapReduce. The authors argue that random-replicate division can be created from the data set using random sampling without replacement, and suggest that only a small number of the data subsets may be sufficient. However, the paper doesn’t provide technical details or experimental results on this point. In the RSP-based method, a key advantage is using only a few random sample data blocks to get approximate results from big data on computing clusters with limited resources. The asymptotic ensemble learning process enables incremental exploration and analysis. This process can be employed to improve the results gradually depending on the available resources and target application requirements. In fact, our method can be considered as a natural extension to the Divide & Recombine approach.
BlinkDB [13] is a distributed sampling-based approximate query engine that supports SQL-based aggregation queries with error and time bounds. It uses offline sampling to construct uniform and stratified samples; then, for a certain query, the best sample(s) are selected based on the query's error and time constraints. After that, the query is executed on the selected sample(s) in parallel. In Alpha framework, the RSP blocks are themselves the samples, created using an offline operation (i.e., the two-stage data partitioning operation) on a computing cluster. A sample of these blocks is selected randomly for a certain analysis function. Multiple samples of RSP blocks can also be used in batches to improve the results. Thus, the RSP-based method can also be conducted within error or time constraints so that the asymptotic ensemble learning process finishes when those constraints are met. While BlinkDB targets aggregate queries (e.g., avg, sum, count, quantile), Alpha is a general framework for big data analysis, e.g., for exploratory data analysis and predictive modeling. Alpha framework can be extended to support arbitrary data analysis functions or interactive data analysis. Furthermore, the RSP-based result can be updated by appending new results from newly analyzed RSP blocks without recalculating the whole result from scratch.
IncApprox [14] is a stream data analytics system which depends on both approximate and incremental computing to incrementally update an approximate result for data analysis tasks. In the RSP-based method, we update the previous result by appending new results from newly analyzed RSP blocks. However, our method doesn’t target stream data analysis for the time being. We are currently working to extend the RSP-based method to manage and analyze evolving data sets. In addition to IncApprox, there are other works on approximate stream data analysis such as StreamApprox [56] and MacroBase [15].
As we discussed before, the bag of little bootstraps (BLB) [39] enables conducting the bootstrapping process on small samples from the data set instead of samples of sizes comparable to the original data set size. In fact, RSP blocks can be used directly as subsamples in the BLB procedure without re-sampling from distributed big data sets many times. RSP blocks are created only once. Furthermore, the BLB procedure can be run on batches of RSP blocks, according to the available resources, until a satisfactory result is obtained. We are currently testing the performance of the BLB procedure with RSP blocks in Alpha framework.
The RSP-based method is different from other works on approximate big data analysis in three main aspects. First, random sample data blocks are created by converting the data set into an RSP only once. After that, any RSP block can be used directly as a random sample without any record-level sampling operations at run-time. A subset of RSP blocks is often enough to get approximate results as the experimental results show. Second, individual results from RSP blocks are combined to obtain approximate ensemble results which can be improved incrementally using the asymptotic ensemble learning process. Third, this method doesn’t need parallelized implementations of data analysis and mining algorithms.
Conclusions
In this paper, we introduced the RSP-based method for big data analysis. This method is based on the random sample partition model which preserves the statistical properties of the data set in each of its data blocks. Since RSP blocks are random samples of the entire data set, they can be used directly to estimate its statistical properties. Consequently, a subset of RSP blocks can be used to obtain ensemble estimates and models which are equivalent to those built from the entire data set. To enable efficient and effective big data analysis with RSP blocks, we proposed Alpha framework which consists of three main layers for data management, batch management, and data analysis. The experimental results on three real data sets showed that a subset of RSP blocks from an RSP is enough to obtain good approximate results for different tasks such as data summary, classification, and regression. We are currently testing Alpha framework for other tasks in big data exploration, cleaning, and clustering.