
An Asymptotic Ensemble Learning Framework for Big Data Analysis



Abstract:

In order to enable big data analysis when data volume goes beyond the available computing resources, we propose a new method for big data analysis. This method uses only a few random sample data blocks of a big data set to obtain approximate results for the entire data set. The random sample partition (RSP) distributed data model is used to represent a big data set as a set of non-overlapping random sample data blocks. Each block is saved as an RSP data block file that can be used directly to estimate the statistical properties of the entire data set. A subset of RSP data blocks is randomly selected and analyzed with existing sequential algorithms in parallel. Then, the results from these blocks are combined to obtain ensemble estimates and models which can be improved gradually by appending new results from the newly analyzed RSP data blocks. To this end, we propose a distributed data-parallel framework (Alpha framework) and develop a prototype of this framework using Microsoft R Server packages and Hadoop distributed file system. The experimental results of three real data sets show that a subset of RSP data blocks of a data set is sufficient to obtain estimates and models which are equivalent to those computed from the entire data set.
Topic: Big Data Learning and Discovery
Published in: IEEE Access (Volume: 7)
Page(s): 3675 - 3693
Date of Publication: 23 December 2018
Electronic ISSN: 2169-3536


SECTION I.

Introduction

Distributed and parallel computing frameworks are essential for designing and implementing scalable big data analysis algorithms and pipelines that perform well with the increasing volume, variety and velocity of big data sets. The mainstream big data analysis frameworks [1]–[6] use divide-and-conquer as a general strategy to manage and process big data on computing clusters. This strategy works by dividing a big data set into small data blocks, distributing these blocks on the data nodes of a computing cluster and running data-parallel operations on them. Nevertheless, the ability to analyze big data on computing clusters is limited by the available resources, which may never be enough given the speed at which data volumes grow in different application domains [7]. This is particularly critical because of the significant computational costs of running complex data analysis algorithms on computing clusters, e.g., the costs of communication, I/O operations and memory. Consequently, analyzing a big data set all at once may require more resources than are available to meet specific application requirements [8], [9]. Random sampling is a common strategy to alleviate these challenges [10], e.g., in approximate and incremental computing [8], [11]–[15]. However, drawing random samples from big data is itself an expensive operation [16], especially with the shared-nothing architectures of the mainstream distributed computing frameworks for big data analysis.

In fact, a key question at the heart of big data analysis is whether the whole big data set must be analyzed or whether analysis of a subset of the data is enough to explore the statistical properties of the big data set and discover patterns from it. If we could obtain approximate results for the entire data set by analyzing only a subset of its small data blocks in parallel, while still satisfying application requirements, we would be able to reduce the computational costs and make big data analysis no longer limited by the available resources. Our empirical study [17] showed that a few data blocks of a big data set are enough to obtain ensemble classification models comparable to those built from the entire data set. This observation can be generalized to other algorithms in the big data analysis pipeline, e.g., for exploratory data analysis, regression and clustering. We found that the precondition for approximate analysis of big data with some data blocks is that the probability distributions of data in these blocks must be similar to the probability distribution of the entire data set. However, the data blocks generated by current data partitioning methods [16] do not necessarily satisfy this precondition because these methods do not consider the statistical properties of the data. For instance, when importing a big data file into HDFS, the file is divided sequentially (i.e., range partitioning) into HDFS data blocks. Consequently, using HDFS data blocks directly as random samples for data analysis may lead to statistically incorrect or biased results because these blocks are not, in general, random samples of the entire data set.

To tackle the above problem, we recently proposed a new strategy to divide a big data set into small random sample data blocks. Assume that $\mathbb{D}= \left\{ {x}_{1},{x}_{2}, \cdots ,{x}_{N} \right\}$ is a multivariate data set of N records where N is very large. To analyze $\mathbb{D}$ on a computing cluster, it is divided into small random sample data blocks (each with $n \ll N$ records) and distributed on the data nodes of the cluster. This new representation of big data is called the Random Sample Partition (RSP) distributed data model [18]. In this data model, $\mathbb{D}$ is represented as a set of non-overlapping subsets. Each subset is a random sample of $\mathbb{D}$ and all these subsets form a partition of $\mathbb{D}$. In mathematics, a partition of a set is a grouping of the set's elements into non-empty subsets such that every element is included in exactly one subset. Each subset is saved as an RSP data block file which can be loaded individually and used directly as a random sample to analyze $\mathbb{D}$ (we use the term RSP block to refer to an RSP data block). We formally proved that the expectation of the sample distribution function (s.d.f.) of each RSP block equals the s.d.f. of $\mathbb{D}$. We also showed that estimates from an RSP block, e.g., the mean, are unbiased and consistent estimators of the corresponding quantities of $\mathbb{D}$. Therefore, RSP blocks can be used directly to estimate the statistical properties of $\mathbb{D}$. In addition, we developed algorithms to convert big data files such as HDFS files into the RSP data model. With the RSP model, analysis of big data becomes an analysis of a few randomly selected RSP blocks. Selecting a random sample of RSP blocks, i.e., block-level sampling [19], is more efficient than drawing a random sample of individual records from a distributed data set; it is no longer necessary to load the entire data set in memory to draw a random sample.

In this paper, we propose a new method for big data analysis based on the RSP data model. Given an RSP of a big data set $\mathbb {D}$ , we follow three main steps to analyze $\mathbb {D}$ . First, a subset or sample of RSP blocks is randomly selected from the RSP of $\mathbb {D}$ . Second, these selected RSP blocks, usually evenly distributed on the data nodes of the computing cluster, are processed and analyzed independently on their local data nodes. Third, the results from the selected RSP blocks are collected and combined on the master node to produce an approximate result for the entire data set. Since this RSP-based method for big data analysis does not need to load and analyze all data blocks of $\mathbb {D}$ at once in memory, it is scalable to data sets whose sizes are much bigger than the memory size of the computing cluster. Block-level sampling replaces the expensive record-level sampling on a distributed data file. This makes the sampling-based analysis more efficient. Since each RSP block is processed and analyzed by a sequential algorithm independently on a data node, the parallel version of the algorithm is no longer needed. These advantages of the RSP-based method make big data analysis more efficient and enable small computing clusters to be used to analyze bigger data sets. To improve the accuracy of the analysis results, the three steps can also be repeated to get a result which is asymptotically close to the true result calculated from the entire $\mathbb {D}$ .

To enable data scientists to use the RSP-based method on computing clusters, we also propose Alpha framework as an extension to the earlier Asymptotic Ensemble Learning Framework [17]. Alpha framework is a distributed data-parallel framework for RSP-based big data analysis on computing clusters. It uses the RSP data model to represent big data sets stored in HDFS and runs data-parallel analysis operations on samples of RSP blocks. It provides key functionalities for management, sampling and analysis of RSP blocks of big data. We implemented a prototype system of Alpha framework using the RevoScaleR package of Microsoft R Server. In this paper, we demonstrate the performance of Alpha framework for exploratory data analysis, classification, and regression. We present experimental results on three real data sets, produced on a small computing cluster of 5 nodes. The results show that a few RSP blocks are sufficient to explore the statistical properties of a data set and produce approximate results equivalent to those computed from the entire data set.

The remainder of this paper is organized as follows. We review some preliminaries used in this research including basic definitions of the RSP model in Sect. II. After that, we introduce the RSP-based method for big data analysis in Sect. III. In Sect. IV, the design, architecture and prototype implementation of Alpha framework are described. Then, experimental results are discussed in Sect. V. In Sect. VI, we further discuss the RSP-based method and framework for big data analysis. Then, we summarize related works in Sect. VII. Finally, we draw some conclusions and state some future perspectives in Sect. VIII.

SECTION II.

Preliminaries

In this section, we first briefly review the current mainstream distributed computing frameworks and their common shortcomings in big data analysis. Then, we discuss the random sampling strategy in big data analysis and the technical difficulties of sampling from distributed big data. After that, we introduce the random sample partition data model for distributed data sets. Finally, we briefly review the ensemble methods in data analysis.

A. Mainstream Distributed Computing Frameworks

As data volumes in different application areas go beyond the petabyte scale, divide-and-conquer [20] has become the general strategy for processing big data on computing clusters, building on recent advances in distributed and parallel computing technology. In this strategy, a big data set is divided into small non-overlapping data blocks and distributed on the nodes of a computing cluster using a distributed file system such as the Hadoop Distributed File System (HDFS) [21]. Then, data-parallel computations are run on these blocks considering data locality. After that, intermediate results from the processed blocks are combined to produce the final result for the whole data set. This is usually done using the MapReduce computing model [1], [22] adopted by the mainstream big data frameworks such as Apache Hadoop, Apache Spark, and Microsoft R Server.

These frameworks provide not only scalable implementations of data mining and machine learning algorithms but also high-level APIs for applications to make use of these algorithms to process and analyze big data. As a unified engine for big data processing, Apache Spark has been adopted in a variety of applications in both academia and industry [23], [24] after Hadoop MapReduce. It uses a new data abstraction and in-memory computation model, RDDs (Resilient Distributed Datasets) [2], where collections of objects (e.g., records) are partitioned across the data nodes of a cluster and processed in parallel. This in-memory computation model eliminates the cost of reading and writing local data blocks in each iteration of an algorithm. Therefore, it is very efficient for iterative algorithms in comparison to the MapReduce model. Microsoft R Server addresses the in-memory limitations of the open source statistical language, R, by parallel and chunk-wise distributed processing of data across multiple cores and nodes. It comes with the proprietary eXternal Data Frame (XDF) format and the Parallel External Memory Algorithms (PEMAs) framework for statistical analysis and machine learning. Both Apache Spark and Microsoft R Server operate on data stored in HDFS.

The MapReduce computing model is efficient for data analysis tasks that only scan the entire big data set once. However, for complex data analysis tasks that require processing the data set iteratively, Hadoop MapReduce becomes inefficient because of heavy I/O and communication costs [25]. Using the in-memory computation model of Apache Spark or the chunk-wise processing of Microsoft R Server, big data can be processed extremely fast if the entire data set can be held in the memory of the cluster. However, if the memory is not large enough to hold all data blocks of a big data set, the computation slows down dramatically. Therefore, memory size limits these systems in analyzing big data. One common shortcoming of these frameworks in big data analysis is that the whole data set has to be processed because the distributed data blocks cannot be used as random samples of the big data; the probability distribution of the data is not considered when partitioning big data in these frameworks. In fact, data partitioning is a key problem in big data analytics because it affects the statistical quality of the distributed data blocks and, thus, the efficiency and effectiveness of big data queries and analysis algorithms [16], [26]–[31]. The other shortcoming is that, to get correct results, algorithms need to be parallelized and run in parallel on the distributed data blocks, so a lot of effort has to be spent on parallelizing algorithms. To overcome these shortcomings, we take a different strategy that makes the distributed data blocks random samples of the big data set. Our objective is to enable the use of a subset of the distributed data blocks to produce approximate results without processing the whole data set in memory or parallelizing data analysis and mining algorithms.

B. Random Sampling

Random sampling is a basic strategy to analyze big data sets or unknown populations [10], [32]. We consider a big data set to be a random sample of the population in a certain application domain. For example, a big data set of customers is a random sample of the customer population of a company; thus, we can use such a data set to estimate the distribution of all customers in the company. For big data analysis, a data set is usually represented as a set of objects described by a set of attributes (i.e., features), for example, a multivariate data set $\mathbb{D}= \left\{ {x}_{1},{x}_{2}, \cdots ,{x}_{N} \right\}$ of N records and M features. In this case, the records are observations from the population and $\mathbb{D}$ itself is a sample of the target population. However, when $\mathbb{D}$ is too large to be analyzed entirely, statisticians often use small random samples to estimate and infer its statistical properties. In statistics, a random sample is defined as follows:

Definition 1 (Random Sample):

Let $\mathbb{D} = \left\{ {x}_{1}, {x}_{2}, \cdots, {x}_{N} \right\}$ be a big data set of N records. Let D be a subset of $\mathbb{D}$ containing n records chosen from $\mathbb{D}$ using a random process. D is a random sample of $\mathbb{D}$ if \begin{equation*} E[F(x)]=\mathbb{F}(x),\end{equation*} where $F(x)$ and $\mathbb{F}(x)$ denote the sample distribution functions of D and $\mathbb{D}$, respectively, and $E[F(x)]$ denotes the expectation of $F(x)$.
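As a quick illustration of Definition 1 (not taken from the paper's experiments), the following base R sketch draws random samples from a synthetic data set and checks that the average of their sample distribution functions tracks the s.d.f. of the full data; the data set, sample size and evaluation points are arbitrary choices.

```r
# Illustration of Definition 1: the average s.d.f. of random samples drawn from
# D tracks the s.d.f. of D itself (synthetic data, arbitrary sizes).
set.seed(1)
D <- rnorm(1e6)                          # a synthetic "big" data set D
x.grid <- seq(-3, 3, by = 0.5)           # points at which to compare the s.d.f.

F.D <- ecdf(D)(x.grid)                   # s.d.f. of the entire data set

n <- 1000                                # size of each random sample
F.avg <- rowMeans(replicate(200, ecdf(sample(D, n))(x.grid)))

round(cbind(x = x.grid, F.D = F.D, F.avg = F.avg), 3)
```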

A sample is drawn from $\mathbb{D}$ in order to obtain sample statistics (e.g., mean and variance) or statistical models (e.g., regression and classification models) of $\mathbb{D}$. In fact, data scientists apply different quantitative and qualitative techniques (e.g., for exploratory data analysis, correlation analysis, and dimension reduction) in different application areas [33]–[35]. Thus, sampling is often a good strategy to test these techniques and understand the data. Furthermore, to assess the quality of a sample statistic (i.e., estimate its standard error), multiple samples can be drawn to obtain the sampling distribution of the statistic. The bootstrap [36] is a common way to estimate the sampling distribution of a statistic by drawing multiple samples with replacement from a data set and calculating the statistic for each sample [37], [38]. However, bootstrapping on big data sets incurs high computational and storage costs because it depends on repeatedly drawing samples of sizes comparable to the original data set and computing estimates from all of these samples. One method for alleviating these costs is the Bag of Little Bootstraps (BLB) [39]–[41], which averages estimates from bootstrapping on small samples. However, this still requires loading and scanning the entire data set to draw the required small samples each time an estimation procedure is conducted.
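The following base R sketch illustrates the classic bootstrap described above (illustration only; the data set, number of resamples and statistic are arbitrary). Note that each resample has the same size as the data set, which is exactly the cost issue raised above for big data.

```r
# A minimal base R sketch of the classic bootstrap.
set.seed(2)
D <- rexp(1e5, rate = 0.5)               # data set whose mean we want to assess

B <- 500                                 # number of bootstrap resamples
# each resample has the same size as D and is drawn with replacement
boot.means <- replicate(B, mean(sample(D, length(D), replace = TRUE)))

mean(boot.means)                         # bootstrap estimate of the mean
sd(boot.means)                           # bootstrap estimate of its standard error
```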

In general, obtaining a random sample from a distributed big data set is an expensive computational task [16]. It requires a complete scan of the distributed big data set, which incurs communication and local I/O costs. Instead of record-level sampling, block-level sampling considers that records in a big data set are aggregated into a set of data blocks, each containing a small set of records stored together. Block-level sampling is more efficient than record-level sampling as it requires accessing only a small number of blocks [42]. In this case, the selected blocks are treated as random samples of $\mathbb{D}$. However, statistics built from block-level samples may not be as good as those from record-level sampling. Several solutions were proposed to reduce data bias in block-level samples in databases [19] and big data clusters [43]–[45]. The problem occurs because of the inconsistent probability distributions in the distributed data blocks of a data set. To solve this problem, we make all data blocks of $\mathbb{D}$ random samples using the RSP model.

C. Random Sample Partition Data Model

In this section, we introduce the random sample partition data model recently proposed to represent a big data set $\mathbb {D}$ as a set of non-overlapping random sample data blocks [18]. First, we formally define a partition of $\mathbb {D}$ as follows:

Definition 2 (Partition of Data Set):

Let $\mathbb {T}$ be an operation which divides $\mathbb {D}$ into a set of subsets ${{T}} = \left \{{ {{{{D}}_{1}},} }\right.{{{D}}_{2}}, \cdots ,\left.{ {{{{D}}_{K}}} }\right \}$ . T is called a partition of $\mathbb {D}$ if

  1. $\bigcup \limits _{k = 1}^{K} {{{{D}}_{k}}} = \mathbb {D}$ ;

  2. ${{{D}}_{i}} \cap {{{D}}_{j}} = \emptyset $ , when $i,j \in \left \{{ {1,2, \cdots ,K} }\right \}$ and $i \ne j$ .

Accordingly, $\mathbb {T}$ is called a partition operation on $\mathbb {D}$ and each ${{{{D}}_{k}}}(k=1,2,\cdots ,K)$ is called a data block of $\mathbb {D}$ .

According to this definition, many partitions exist for $\mathbb{D}$. Every HDFS file is a partition of a given data set with a specific number of data blocks. Similarly, a given data set can also be represented as a partition using an XDF file in Microsoft R Server or an RDD in Apache Spark. However, the data blocks in a general partition of a big data set are not necessarily random samples of the data set; consequently, they cannot be used for estimation and analysis of the big data set. Three methods are commonly used to divide data on computing clusters [16]: range, hash and random partitioning. In range partitioning, data is divided according to prescribed ranges. This is a good criterion for big data analysis if global sorting is required and the entire data set can be processed all at once. In hash partitioning, records are hashed into subsets based on a key column in the data; this is usually used when ranges are not appropriate. In random partitioning, data is distributed randomly based on the output of a random number generator.

To enable the distributed data blocks of $\mathbb {D}$ to be used as random samples in big data analysis, we have to represent $\mathbb {D}$ as a Random Sample Partition (RSP). An RSP of $\mathbb {D}$ is formally defined as follows:

Definition 3 (Random Sample Partition):

Assume $\mathbb{F}(x)$ is the sample distribution function (s.d.f.) of $\mathbb{D}$. Let $\mathbb{T}$ be a partition operation on $\mathbb{D}$ and ${T} = \left\{ {D}_{1}, {D}_{2}, \cdots, {D}_{K} \right\}$ be the corresponding partition of $\mathbb{D}$. T is called a random sample partition of $\mathbb{D}$ if \begin{equation*} E[F_{k}(x)]=\mathbb{F}(x)\quad \text{for each } k=1,2,\cdots,K,\end{equation*} where $F_{k}(x)$ denotes the sample distribution function of ${D}_{k}$ and $E[F_{k}(x)]$ denotes its expectation. Accordingly, each ${D}_{k}$ is called an RSP block of $\mathbb{D}$ and $\mathbb{T}$ is called an RSP operation on $\mathbb{D}$. The representation of a distributed big data set as a random sample partition is defined as the random sample partition data model in [18].

We formally proved that the expectation of the sample distribution function (s.d.f.) of an RSP block equals the s.d.f. of $\mathbb{D}$. We also showed that estimates from an RSP block, e.g., the mean, are unbiased and consistent estimators of the corresponding quantities of $\mathbb{D}$. We also found that if the records of a big data set are randomly organized, i.e., satisfying the i.i.d. condition, sequentially chunking the big data set into a set of data block files makes each data block a random sample of the big data. However, a naturally randomized data set should be taken as an exception rather than the rule because it depends on the data generation process. As a result, $\mathbb{T}$ should be implemented to randomize $\mathbb{D}$ and create RSP blocks as we describe later. In applications, $\mathbb{T}$ is applied to $\mathbb{D}$ only once. Then, block-level sampling is used to select RSP blocks as random samples for big data analysis. Block-level sampling from an RSP is defined as follows:

Definition 4 (Block-Level Sample):

Let T be the set of K RSP blocks of a big data set $\mathbb {D}$ and ${{S}} = \left \{{ {{{{D}}_{1}},} }\right.{{{D}}_{2}}, \cdots ,\left.{ {{{{D}}_{g}}} }\right \}$ a subset of T where $g < K$ . S is a block-level sample of T if ${{{{D}}_{1}},} {{{D}}_{2}}, \cdots , {{{{D}}_{g}}}$ are randomly selected with equal probability from T without replacement.

According to Definition 3, each RSP block in S is a random sample of $\mathbb {D}$ and can be used to estimate the statistical properties of $\mathbb {D}$ . In a computing cluster, the RSP blocks in S can be selected from different data nodes and processed independently. The results from these RSP blocks can be combined using ensemble methods to obtain a better result.

D. Ensemble Methods for Data Analysis

Ensemble methods are widely used in machine learning to build ensemble models, e.g., ensemble classifiers [46]. An ensemble model combines multiple base models built from different subsets or samples of the data using the same algorithm (data-centered ensembles) or different algorithms (model-centered ensembles). An ensemble model often outperforms the individual base models [47], [48]. One method for data-centered ensembles is bagging (bootstrap aggregation), which builds base models from multiple bootstrap samples drawn with replacement from the data set. This method is used in random forests [49], which use decision trees as base models. There are different ways to combine base models in an ensemble, such as simple averaging (for regression) and voting (for classification). In this work, we use ensemble methods to obtain approximate results from big data by combining estimates or models computed from a sample of RSP blocks.

SECTION III.

Big Data Analysis with RSP Data Blocks

In this section, we propose the RSP-based method for big data analysis. This method uses RSP blocks from an RSP of a big data set $\mathbb {D}$ to explore the statistical properties of $\mathbb {D}$ and build ensemble classification and regression models. Given an RSP of $\mathbb {D}$ on a computing cluster, analysis of $\mathbb {D}$ will not be limited to the cluster memory because only a subset of RSP blocks is processed using this new method.

We use the notations in Table 1 to describe the key steps in this method. Among them, $\mathbb{D}$ is a multivariate data set of N records and M features, $\Theta_{\mathbb{D}}$ is an estimate of a statistical property (e.g., mean, variance) of $\mathbb{D}$, and $\Pi_{\mathbb{D}}$ is a model (e.g., classification or regression) computed from $\mathbb{D}$. Let f be a data analysis function that computes an estimate $\Theta_{\mathbb{D}}$ or a model $\Pi_{\mathbb{D}}$ of $\mathbb{D}$. In practice, if $\mathbb{D}$ is very big, it is difficult or expensive to apply f directly to $\mathbb{D}$ because its data volume may exceed the available memory of the cluster. In this situation, $\mathbb{D}$ is represented as an RSP T and the new method is used to analyze it. Instead of loading and analyzing all data blocks of $\mathbb{D}$ at once, only a subset of RSP blocks from T is randomly selected and analyzed. Let S be a block-level sample of g RSP blocks from T. The results from the blocks in S are combined to obtain an approximate result for $\mathbb{D}$: $\Theta_{S}$ is an RSP-based ensemble estimate from S and $\Pi_{S}$ is an RSP-based ensemble model from S.

TABLE 1. Table of notations.

Figure 1 shows the main steps of the RSP-based method to analyze $\mathbb {D}$ . First, an RSP T of K RSP blocks is generated from $\mathbb {D}$ . Then, g RSP blocks are selected randomly from T without replacement. After that, these selected RSP blocks are analyzed in parallel as g independent computational tasks by running the same analysis function f on each selected RSP block. The individual results from the RSP blocks are collected for evaluation, comparison and further analysis. These results can be combined in different ways to form an ensemble result that can also be improved gradually by combining new results from other RSP blocks. In the following, we describe these steps and introduce the asymptotic ensemble learning process.

FIGURE 1. To analyze a sample of RSP blocks, f is applied in parallel to each selected RSP block and the outputs from these blocks are collected for evaluation, comparison, and further analysis.

A. RSP Generation

An RSP is generated from $\mathbb {D}$ by randomizing the order of its records (i.e., i.i.d. observations) and sequentially dividing these records into a set of K RSP blocks which satisfy Definition 3. However, if the number of records N in $\mathbb {D}$ is very large, randomizing $\mathbb {D}$ is often computationally impractical. To enable the generation of an RSP T of K blocks from $\mathbb {D}$ on a computing cluster, we propose a two-stage data partitioning algorithm as follows.

  • Stage 1: Divide $\mathbb {D}$ into P non-overlapping subsets $\{ {{{D}}_{i}}\} _{i = 1}^{P}$ of equal size. These subsets form a partition of $\mathbb {D}$ .

  • Stage 2: Each RSP block is built using records from all the previous P subsets. This requires a distributed randomization operation as follows:

    • Step 1: Randomize each ${{{D}}_{i}}$ for $1 \leq i \leq P$ ;

    • Step 2: Cut each randomized ${{{D}}_{i}}$ into K slices $\{ {{D(}}i,j{{)}}\} _{j = 1}^{K}$ . These slices form an RSP of ${{{D}}_{i}}$ , for $1 \leq i \leq P$ ;

    • Step 3: From the $P \times K$ slices ${\{{D}}(i,j)\}$ for $1 \leq i \leq P$ and $1 \leq j \leq K$, merge the slices of $\{ {{D(}}i,j{{)}}\} _{i = 1}^{P}$ for $1 \leq j \leq K$ to form K subsets $\{ {{D(\cdot , }}j{{)}}\} _{j = 1}^{K}$. These subsets form an RSP of $\mathbb {D}$.

More details on the design and implementation of this algorithm were reported recently in [50] with a proof that this algorithm produces an RSP of $\mathbb {D}$ according to Definition 3.
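For illustration, the following base R sketch mimics the logic of this two-stage procedure on an in-memory data frame; it is not the distributed implementation of [50], and the choices of P and K here are arbitrary.

```r
# A single-machine sketch of the two-stage partitioning logic (illustration only).
rsp_two_stage <- function(D, P, K) {
  N <- nrow(D)
  # Stage 1: cut D sequentially into P non-overlapping subsets of (roughly) equal size
  subsets <- split(D, rep(1:P, each = ceiling(N / P), length.out = N))

  # Stage 2, Steps 1-2: randomize each subset D_i and cut it into K slices
  slices <- lapply(subsets, function(Di) {
    Di <- Di[sample(nrow(Di)), , drop = FALSE]
    split(Di, rep(1:K, length.out = nrow(Di)))
  })

  # Stage 2, Step 3: merge the j-th slices of all P subsets into RSP block D(., j)
  lapply(1:K, function(j) do.call(rbind, lapply(slices, `[[`, j)))
}

# Example: an RSP of K = 30 blocks from a toy data set of 100,000 records
D.toy <- as.data.frame(matrix(rnorm(1e5 * 3), ncol = 3))
blocks <- rsp_two_stage(D.toy, P = 10, K = 30)
length(blocks)        # 30 RSP blocks
nrow(blocks[[1]])     # roughly 100,000 / 30 records each
```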

B. RSP Blocks Sampling

The RSP blocks of T are evenly distributed on the data nodes of a computing cluster. The distribution details of these RSP blocks, including their locations, are recorded in the metadata of T. To select g RSP blocks from T without replacement, the number of blocks to select is divided evenly among the data nodes, and that number of RSP blocks is then randomly selected on each data node.

Algorithm 1 shows the RSP blocks sampling process. The $getNonSelectedBlocks()$ function is used to find the set of RSP blocks that have not been processed by f so far and saves it to $T_{nonSelected}$ . The function $sample\_{}withoutReplace()$ randomly selects g RSP blocks from $T_{nonSelected}$ and returns them to S. If the number of remaining non-selected RSP blocks is less than g, all remaining RSP blocks are assigned to S. The output S contains the locations of the selected RSP blocks. These locations are used to access the RSP blocks in the following analysis step. This algorithm can be refined to select RSP blocks in consideration of the availability of data nodes on a computing cluster.

Algorithm 1 RSP Blocks Sampling

Input:

  • T: RSP of $\mathbb {D}$ ;

  • g: number of RSP blocks to be selected;

  • f: data analysis function;

Method:

${{T}}_{nonSelected} \leftarrow getNonSelectedBlocks({{T}}, f)$

if $|{{T}}_{nonSelected}| > g $ then

${{S}} \leftarrow sample\_{}withoutReplace({{T}}_{nonSelected}, g)$

else

${{S}} \leftarrow {{T}}_{nonSelected}$

end if

Output:

  • S: a subset of RSP blocks from T;
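A minimal base R sketch of Algorithm 1 is given below for illustration. Here T is represented simply as a character vector of RSP block identifiers (e.g., HDFS paths) and the selection status per analysis function f is kept in an environment; these representations are assumptions for the sketch, not the framework's actual data structures.

```r
# A minimal sketch of Algorithm 1 (RSP blocks sampling without replacement).
selection.index <- new.env()

blocks_sampling <- function(T.blocks, g, f.name) {
  selected <- if (exists(f.name, envir = selection.index, inherits = FALSE))
    selection.index[[f.name]] else character(0)
  T.nonSelected <- setdiff(T.blocks, selected)    # getNonSelectedBlocks(T, f)
  if (length(T.nonSelected) > g) {
    S <- sample(T.nonSelected, g)                 # sample_withoutReplace(T.nonSelected, g)
  } else {
    S <- T.nonSelected                            # fewer than g blocks left: take them all
  }
  selection.index[[f.name]] <- c(selected, S)     # record the blocks now used by f
  S
}

# Example: two successive batches of g = 5 blocks for the same function never overlap
T.blocks <- sprintf("block_%03d.xdf", 1:20)
S1 <- blocks_sampling(T.blocks, g = 5, f.name = "rxSummary")
S2 <- blocks_sampling(T.blocks, g = 5, f.name = "rxSummary")
intersect(S1, S2)   # character(0)
```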

C. RSP Blocks Analysis

After g RSP blocks are selected by Algorithm 1, these blocks are processed independently in parallel on the corresponding data nodes as described in Algorithm 2. After the execution, the individual outputs from these g RSP blocks are collected.

Algorithm 2 RSP Blocks Analysis

Input:

  • S: block-level sample with g RSP blocks;

  • f: data analysis function;

Operations:

for all $ {D_{q}} \in {S}$ do

Load $ {D_{q}}$ on its node and apply the function: $f( {D_{q}})$

end for

Collect the outputs from blocks: $\{f( {D_{q}})\}^{g}_{q=1}$

Output:

$\{f( {D_{q}})\}^{g}_{q=1}$
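The following sketch mimics Algorithm 2 with the base R parallel package on a single multi-core machine (in the prototype this role is played by rxExec on a Spark cluster, see Sect. IV-D). The read_block() helper and the toy in-memory RSP are hypothetical stand-ins for loading an RSP block file, and mclapply() relies on forking, so a Unix-alike system is assumed.

```r
# A sketch of Algorithm 2: apply f to each selected RSP block independently,
# in parallel, and collect the outputs.
library(parallel)

blocks_analysis <- function(S, f, read_block) {
  mclapply(S, function(block.id) f(read_block(block.id)),
           mc.cores = min(length(S), detectCores()))
}

# Example: a toy RSP of 10 blocks; f computes the mean of one feature per block
toy.rsp <- split(data.frame(x = rnorm(1e4)), rep(1:10, length.out = 1e4))
outputs <- blocks_analysis(S = c(2, 5, 7),
                           f = function(D.q) mean(D.q$x),
                           read_block = function(id) toy.rsp[[id]])
unlist(outputs)   # one base result per selected block
```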

Depending on the requirements of the data analysis task (e.g., exploratory data analysis and predictive modeling), the collected outputs from this step are used for further investigation and analysis. The results from a sample of RSP blocks can be taken as preliminary insights. Further analysis can be done using other data analysis algorithms and methods such as ensemble methods.

D. RSP-Based Ensemble Estimates and Models

The outputs of Algorithm 2 are combined to produce ensemble estimates or models for $\mathbb {D}$ . For exploratory data analysis where f is an estimate function (e.g., mean or variance), an ensemble estimate is computed. For predictive modeling where f is an algorithm for learning a model from each RSP block (e.g., classification and regression), an ensemble model is formed.

1) RSP-Based Ensemble Estimates

Let $\theta_{q} = f({D_{q}})$ be a base estimate computed from an RSP block $D_{q}$. The base estimates from all RSP blocks in S are combined in an RSP-based ensemble estimate $\Theta_{S}$ using a given ensemble function. For example, if $\theta_{q}$ is the estimate of the mean or variance, the RSP-based ensemble estimate can be calculated by averaging the base estimates computed from all RSP blocks in S as follows:\begin{equation*} {\Theta_{S}} = {{avg}}{\left({\{\theta_{q}\}_{q=1}^{g} }\right)}\tag{1}\end{equation*}

It can be proved that $\Theta _{{S}}$ converges to the true estimate $\Theta _{\mathbb {D}}$ when considering a sufficient number of RSP blocks, i.e., ${{E}}\left ({{ \Theta _{{S}}}}\right) = {\Theta _{\mathbb {D}}}$ where E denotes the expectation. In such case, we call $\Theta _{{S}}$ an asymptotic ensemble estimate.
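As a small illustration of Equation (1) and of this convergence (not one of the paper's experiments), the following base R sketch averages per-block means over a toy RSP of a synthetic data set and compares the running ensemble estimate with the full-data mean.

```r
# Illustration of Equation (1): the ensemble estimate approaches the full-data mean.
set.seed(3)
D <- rgamma(1e6, shape = 2)                                     # synthetic data set
K <- 100
blocks <- split(D, sample(rep(1:K, length.out = length(D))))    # a toy RSP of D

theta <- sapply(blocks, mean)                 # base estimates, one per RSP block
Theta.S <- cumsum(theta) / seq_along(theta)   # ensemble estimate after g = 1..K blocks

c(after.5.blocks  = unname(Theta.S[5]),
  after.20.blocks = unname(Theta.S[20]),
  full.data       = mean(D))
```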

2) RSP-Based Ensemble Models

Let $\pi_{q} = f({D_{q}})$ be a base model built from an RSP block $D_{q}$ in S. The base models from all RSP blocks in S are combined into an RSP-based ensemble model $\Pi_{S}$ using a given ensemble function. We take classification and regression trees as examples of base models. For classification and regression, each record $({x}_{i}, y_{i})$ is represented by a vector of features ${x}_{i}$ and a response variable $y_{i}$. If the base models are classifiers, the ensemble class label y of a new record x is the statistical mode of the predictions obtained from the base classification models:\begin{equation*} y = \Pi_{S}({{x}}) = mode(\{\pi_{q}({{x}})\}_{q=1}^{g})\tag{2}\end{equation*}

If base models are regression trees, the ensemble response y is calculated by averaging the predictions from the base regression models on a new record $\textbf{x}$:\begin{equation*} y = \Pi_{S}({{x}})=avg(\{\pi_{q}({{x}})\}_{q=1}^{g})\tag{3}\end{equation*}

In a similar way to the RSP-based ensemble estimates, it can be shown that $\Pi _{{S}}$ is equivalent to $\Pi _{\mathbb {D}}$ when considering a sufficient number of RSP blocks, i.e., the accuracy of $\Pi _{{S}}$ converges to the accuracy of $\Pi _{\mathbb {D}}$ . In this case, we call $\Pi _{{S}}$ an asymptotic ensemble model.
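The following base R sketch illustrates Equations (2) and (3) on hypothetical prediction vectors: the class label is obtained by majority vote (statistical mode) and the regression response by averaging.

```r
# Illustration of Equations (2) and (3) on hypothetical base-model predictions.
mode_vote <- function(labels) names(which.max(table(labels)))

class.preds <- c("spruce", "pine", "spruce", "spruce", "fir")  # {pi_q(x)}, classifiers
reg.preds   <- c(12.1, 11.8, 12.5, 12.0, 11.9)                 # {pi_q(x)}, regression trees

y.class <- mode_vote(class.preds)   # Eq. (2): mode of the base predictions
y.reg   <- mean(reg.preds)          # Eq. (3): average of the base predictions
list(y.class = y.class, y.reg = y.reg)
```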

Asymptotic ensemble estimates and models can be built by combining the results from a sufficient number of RSP blocks. This can be done using either one large sample of RSP blocks or multiple small samples of RSP blocks depending on the available computing resources.

E. RSP-Based Asymptotic Ensemble Learning

The RSP-based asymptotic ensemble learning uses a stepwise process to learn an asymptotic ensemble model or estimate from multiple batches of RSP blocks. Each batch is randomly selected from T without replacement. We propose this process to avoid loading a large number of RSP blocks in one batch. This enables a computing cluster with limited resources to be scalable to bigger data sets.

Algorithm 3 illustrates the RSP-based asymptotic ensemble learning process. To get an asymptotic ensemble model ${\Pi }$ from T using a function f with batches of g RSP blocks, this algorithm works as follows:

  1. Blocks Selection: Algorithm 1 is used to select a sample S of g RSP blocks from T;

  2. Blocks Analysis: f is applied to each RSP block in S using Algorithm 2 to get a set of base models $\Pi _{{S}}$ .

  3. Ensemble Update: the new g base models are appended to the previously obtained models ${\Pi }$ , if any;

  4. Evaluation: if the current ensemble result does not fulfill the termination condition(s), go back to step 1 to select a new sample of RSP blocks. This process continues until a satisfactory result is obtained or all RSP blocks are used up. $\Omega ()$ is an evaluation function that checks whether the ensemble model satisfies the application requirements (e.g., error bounds).

Algorithm 3 Asymptotic Ensemble Learning Algorithm

Input:

  • T: RSP of $\mathbb {D}$ ;

  • f: data analysis function;

  • g: number of RSP blocks to be selected in each batch;

Operations:

${\Pi } = \phi $ ;

1-

Blocks Selection: ${{S}} = BlocksSampling({{T}},g,f)$

2-

Blocks Analysis: $\Pi _{{S}} = BlocksAnalysis({{S}},f)$

3-

Ensemble Update: $\Pi $ = $\Pi $ + $\Pi _{{S}}$

4-

Evaluation:

If $\Omega (\Pi) < 0$ (or no more blocks)

Stop

Else

Go to 1

Output:

$\Pi $
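A compact R sketch of this loop is shown below; it reuses the blocks_sampling() and blocks_analysis() sketches given earlier, and omega() is a placeholder for the evaluation function $\Omega()$ that returns a negative value once the ensemble satisfies the application requirements.

```r
# A compact sketch of Algorithm 3 (asymptotic ensemble learning).
asymptotic_ensemble <- function(T.blocks, f, g, read_block, omega, f.name = "f") {
  Pi <- list()                                         # Pi <- empty ensemble
  repeat {
    S <- blocks_sampling(T.blocks, g, f.name)          # 1- Blocks Selection
    if (length(S) == 0) break                          #    no more RSP blocks
    Pi.S <- blocks_analysis(S, f, read_block)          # 2- Blocks Analysis
    Pi <- c(Pi, Pi.S)                                  # 3- Ensemble Update
    if (omega(Pi) < 0) break                           # 4- Evaluation
  }
  Pi
}

# Example stopping rule: stop once at least 20 base results have been accumulated
# omega <- function(Pi) if (length(Pi) >= 20) -1 else 1
```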

Figure 2 illustrates the process of running this algorithm on multiple batches of RSP blocks. Each batch gets a different sample of RSP blocks, and each RSP block is computed only once by a given function f. The g base models from batch 1 are collected in $\Pi _{ {S}_{1}}$ and saved in $\Pi $. Then, another g base models from batch 2 are collected in $\Pi _{ {S}_{2}}$ and appended to $\Pi $, the next g base models from batch 3 are collected in $\Pi _{ {S}_{3}}$ and appended to $\Pi $, and so on. This process continues until $\Pi $ satisfies the requirements. The same process can be used to obtain an asymptotic estimate $\Theta $ from T. This algorithm can also be refined to run for a certain number of batches, or interactively, in order to get preliminary results quickly.

FIGURE 2. Several batches of RSP blocks are analyzed to gradually accumulate base and ensemble models. In each batch, a different set of g RSP blocks is computed in parallel on their nodes using a given function f.

This RSP-based method requires a distributed data-parallel computing framework to efficiently compute estimates and models from big data sets. For this purpose, we propose Alpha framework in the next section.

SECTION IV.

Alpha Framework

Alpha framework is a distributed data-parallel framework for RSP-based big data analysis. It is designed as an extension to the Asymptotic Ensemble Learning Framework [17]. Alpha framework enables big data analysis with RSP blocks on computing clusters using current distributed computing engines, distributed file systems and existing sequential data analysis and mining algorithms. In this section, we first describe the storage of RSP blocks in HDFS. Then, we propose a distributed data-parallel analysis model to implement the RSP-based method on a computing cluster. After that, we present the architecture design of Alpha framework. Finally, we describe a prototype implementation of Alpha Framework using Microsoft R Server packages.

A. Storage of RSP Data Blocks

In Alpha framework, we use the HDFS distributed file system to store the RSP blocks of a big data set $\mathbb{D}$. When storing $\mathbb{D}$ on HDFS, it is chunked into P HDFS blocks which are not necessarily random samples of $\mathbb{D}$. An RSP generation algorithm is used to convert the HDFS file of $\mathbb{D}$ into a random sample partition T with K RSP blocks. As shown in Figure 3, T is stored as an HDFS file with each RSP block stored in one HDFS block. In this case, we call the new HDFS file an HDFS-RSP file and its blocks HDFS-RSP blocks.

FIGURE 3. Storing RSP blocks in HDFS.

When storing a data set in HDFS, the default number of data blocks depends on the number of records assigned to each HDFS block which depends on the HDFS block size (128 MB in our case). Consequently, the number of records may be slightly different between HDFS blocks of a data set. However, a fixed number of records can be assigned to each block when generating an HDFS-RSP file. The number of HDFS-RSP blocks K can be different from the original number of HDFS blocks P. In addition to the metadata provided by HDFS, statistical metadata (e.g., data summary) and lineage (e.g., log of previous analysis tasks) can be maintained for the HDFS-RSP file and its blocks.

The RSP distributed data model is structurally similar to the existing in-memory distributed data abstractions (e.g., RDD) and on-disk distributed data formats (e.g., XDF). Consequently, an HDFS-RSP file can be imported into an RDD or an XDF file as any other HDFS file. The key difference is that each RSP block is a random sample. Any RSP blocks can be randomly selected, loaded and analyzed independently and in parallel. Therefore, the RSP data model can be implemented as an extension to the current distributed data models. However, it should be implemented as a disk-based data format (such as XDF) in order to avoid loading unnecessary blocks into memory. In our prototype implementation, we use the XDF format to store RSP blocks in HDFS and test the RSP-based method on Microsoft R Server as we describe in Sect. IV-D.

B. Distributed Data-Parallel Analysis Model

The key idea in Alpha framework is running f on a sample of RSP blocks, as shown in Figure 1, instead of running it on all data blocks. We follow a simplified map-reduce model but with the possibility of selecting and computing only a subset of RSP blocks. We can consider f as a map operation which takes an RSP block as input and returns an estimate value or model as output. There is no need for a reduce operation because f is executed on each RSP block independently. Also, there is no need to parallelize f because it runs on a single data node. We just need a simple collect operation to collect the outputs from data nodes to the master node of a computing cluster. Data shuffling among the nodes of the cluster is not required. On a computing cluster, this distributed data-parallel analysis model is implemented as follows:

  1. A master process is initiated to run the main program on a master node;

  2. The master process invokes the blocks selection component to get a list of g randomly selected RSP blocks for the analysis task;

  3. The master process initiates a worker program for each selected RSP block on its node;

  4. Each worker runs f on its RSP block to produce the output from this block;

  5. The outputs from all g blocks are collected and returned to the master node;

  6. The master process checks if another batch of RSP blocks is required (depending on input parameters or evaluation criteria). If so, go to step 2;

  7. When no further batch is required or no more RSP blocks are available, the final outputs are saved.

In addition to the data-parallel analysis operations on RSP blocks, consensus operations are applied to the collected outputs from RSP blocks to produce ensemble models or estimates. This is done locally on a master or edge node of a computing cluster. The history of data-parallel analysis operations defines an RSP block’s lineage and the history of the applied consensus operations defines the HDFS-RSP file’s lineage.

C. Architecture

Figure 4 illustrates the architecture of Alpha framework which has three layers: data management, batch management, and data analysis. A distributed file system (e.g., HDFS) is required to store RSP blocks. A distributed computation engine (e.g., Microsoft R Server and Apache Spark) is also required to manage the execution of distributed data-parallel analysis operations.

FIGURE 4. High-level architecture of Alpha framework for RSP-based big data analysis.

1) Data Management

A fundamental data management layer is required to manage HDFS-RSP files, metadata, statistics and lineages. This layer provides operations for data partitioning and data access. These operations can be used by higher-level components or other computing frameworks.

  • Data partitioning: $\mathbb {D}$ is converted into a random sample partition T using the two-stage algorithm as described in Sect.III-A. This operation is executed only once to create an HDFS-RSP file from $\mathbb {D}$ .

  • Data access: This component is used to provide access to RSP blocks and their metadata, statistics and lineage.

2) Batch Management

In order to conduct distributed data-parallel analysis on selected RSP blocks, Alpha framework provides the following operations:

  • Blocks selection: For a given data analysis function f, a selection process is initiated to draw a sample S of g blocks from T as described in Algorithm 1. A selection index is created for each f where each RSP block of T has its key and selection status. The status of each RSP block, selected or not, is maintained for f.

  • Batch execution: A sample of RSP blocks is used as one batch. The data analysis function f is applied to the selected blocks in a perfectly parallel manner as described in Algorithm 2. This is done using existing distributed computation engines where a master process manages the execution as described in Sect. IV-B.

3) Blocks Analysis

This component is used as a library of analysis functions which can be invoked to analyze RSP blocks. As each RSP block is small enough to be processed on a single core or node, most existing implementations of data analysis and mining algorithms can be used directly to analyze RSP blocks; Alpha framework does not need new parallel implementations of these algorithms. These functions can perform different data analysis tasks such as data summaries (e.g., min, max, mean, standard deviation, median) and data mining (e.g., classification, regression, clustering, outlier detection). This component can be used directly to obtain results from RSP blocks. It is also used as part of the asymptotic ensemble learning process to obtain results from multiple samples of RSP blocks.

4) Ensemble Analysis

This component is used to obtain RSP-based ensembles from a sample of RSP blocks. It is also used to start the asymptotic ensemble learning process and gradually build an RSP-based asymptotic ensemble as described in Algorithm 3. The user specifies the analysis task, analysis function f, ensemble function, blocks selection criteria, evaluation criteria, termination criteria and other task-specific parameters. Then, the blocks selection component is used to manage the selection of blocks for this task. After that, f is invoked from the blocks analysis component and runs on the selected blocks using the batch execution component. Base results from RSP blocks are combined using the specified ensemble function to form an ensemble result.

D. Prototype Implementation

We implemented a prototype of Alpha framework using Microsoft R Server with Apache Spark and HDFS. This prototype uses the RevoScaleR package (ScaleR) of Microsoft R Server due to its two key advantages. First, it comes with the disk-based XDF distributed data format which is appropriate to store RSP blocks. Second, the rxExec mechanism enables running a function in parallel using the available cores or nodes with the possibility of assigning different inputs and parameters for each time the function is executed. In this way, RevoScaleR enables running existing R functions in distributed and parallel environments. For the time being, all the key functionalities of Alpha framework were prototyped using RevoScaleR except for the RSP generation algorithm. This algorithm was implemented using Apache Spark as shown in [50]. However, this algorithm generates an HDFS-RSP file which can be imported into XDF format as any other HDFS file.

In this prototype, we consider that an HDFS-RSP file of $\mathbb{D}$ is stored in HDFS. First, this HDFS file is imported into the XDF format. Then, the XDF file is split into separate XDF blocks/files where each XDF block is stored as a single HDFS block. These distributed XDF blocks are the RSP blocks from which randomly selected blocks are loaded and analyzed in batches. The execution of each batch is done in parallel on a Spark cluster using RevoScaleR's rxExec mechanism, which works as follows. If there is no existing Spark application, a new application is initiated on the cluster. For each call of rxExec (i.e., each batch of RSP blocks), a single Spark job is created with one stage as shown in Figure 5. The number of tasks in this stage is the number of selected RSP blocks, for example, the g RSP blocks of sample ${S}_{1}$. Each task (i.e., each RSP block) is assigned to one of the Spark executor cores. When all tasks are completed, a list of the outputs, $\Pi _{ {S}_{1}}$, is returned. With rxExec, we need to consider the computation time at three levels:

  • Task time is the time for running the target function f on an RSP block.

  • Job time is the time for running the target function in parallel on a batch of RSP blocks. It includes the communication time between Spark driver and executors, and the time for the parallel tasks;

  • rxExec time (batch total time) is the time for running rxExec on a batch of RSP blocks. In addition to Spark job time, the total time includes the communication time between R Server and Spark driver.
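For concreteness, the sketch below shows how one batch might be submitted with rxExec; the compute-context setup and the exact rxExec, RxXdfData and rxSummary arguments are assumptions that depend on the installed RevoScaleR version and cluster configuration, and the block paths are hypothetical.

```r
# A hedged sketch of submitting one batch of RSP blocks with rxExec (not the
# framework's actual code; arguments and paths are assumptions).
library(RevoScaleR)

# assume a Spark compute context is already active, e.g. set with
# rxSetComputeContext(RxSpark(...))

blockPaths <- c("/data/higgs_rsp/block_001.xdf",   # g selected RSP blocks of S1
                "/data/higgs_rsp/block_002.xdf",   # (hypothetical HDFS paths)
                "/data/higgs_rsp/block_003.xdf")

f <- function(path) {
  # the target function f: summarize one RSP block stored as XDF in HDFS
  block <- RxXdfData(path, fileSystem = RxHdfsFileSystem())
  rxSummary(~., data = block)
}

# one rxExec call = one batch: f is run once per element of elemArgs, in parallel
Pi.S1 <- rxExec(f, elemArgs = as.list(blockPaths))
```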

FIGURE 5. Using rxExec to run f on a sample of g RSP blocks. Each rxExec call initiates a Spark job with a single stage of g tasks.

SECTION V.

Experiments and Results

In this section, we present the experimental results of three public real data sets to demonstrate the performance of the new RSP-based method for big data analysis. The characteristics of the data sets are shown in Table 2. First, we describe the experiment settings and the evaluation measures. Then, we present the results of each data set.

TABLE 2. Characteristics of the data sets used in our experiments.

A. Experiment Settings

We used a computing cluster of 5 nodes. Each node has 12 cores (24 with hyper-threading), 128 GB RAM and 1.1 TB of disk space. Apache Hadoop 2.6.0, Apache Spark 1.6.0, and Microsoft R Server 9.1 were installed on the cluster and used in the experiments. Spark applications were run with 48 executors (2 cores and 4 GB of memory each) and HDFS was used with a 128 MB block size. With these settings, the total number of executor cores is 96 (i.e., the number of cores per executor $\times $ the number of executors). Thus, at most 96 tasks, i.e., RSP blocks, can be processed in parallel on this cluster. If the number of RSP blocks in a batch is more than 96, the extra blocks are delayed until some cores become available.

B. Evaluation Measures

For each data set, several RSPs were created with different numbers of RSP blocks. For each RSP, an ensemble estimate or model was built gradually using Algorithm 3 until all RSP blocks were used up and all estimates or models were combined in one ensemble. We used this algorithm for three data analysis tasks: data summary, classification and regression. RevoScaleR's rxSummary function was used to compute summary statistics. RevoScaleR's rxDTree function was used to build decision and regression trees. The rxDTree function was used with the default parameters: minSplit = max(20, sqrt(numObs)), minBucket = round(minSplit/3), maxDepth = 10, cp = 0, maxCompete = 0, maxSurrogate = 0, useSurrogate = 2, surrogateStyle = 0, and xVal = 2.

Ensemble results were evaluated after each batch to check how the results change with more RSP blocks until all blocks were used up. To show the significance of the RSP-based results, we repeated the asymptotic ensemble learning process many times on each RSP model (100 times in most cases) and the averages of these results were calculated. In the following subsections, we use error bars to show the range of estimated values from multiple runs in each batch. To measure the performance of a model, we used the overall accuracy for classification and the Root Mean Square Error (RMSE) for regression.

As each RSP block is a random sample of the entire data set, any RSP block can be used as holdout testing data to test the models. To get consistent results, we used the same holdout data to test models from different RSPs of the same data set. The overall accuracy of a classifier is defined as:\begin{equation*} Acc = \frac {tp+tn}{tp+tn+fp+fn}\tag{4}\end{equation*} where tp is the number of true positive predictions, tn is the number of true negative predictions, fp is the number of false positive predictions, and fn is the number of false negative predictions.

The RMSE of the predicted values $\hat {y}_{i}$ for records $x_{i}$ by a regression model is computed for n different predictions as the square root of the mean of the squared deviations:\begin{equation*} RMSE = \sqrt {\frac {\sum _{i=1}^{n}{{(\hat {y}_{i}-y_{i})}^{2}}}{n}}\tag{5}\end{equation*}
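Both measures are straightforward to compute in base R from the predictions on a holdout RSP block, as in the sketch below (the prediction and truth vectors are hypothetical).

```r
# Base R helpers for the two evaluation measures.
accuracy <- function(pred, truth) mean(pred == truth)            # Eq. (4)
rmse     <- function(pred, truth) sqrt(mean((pred - truth)^2))   # Eq. (5)

accuracy(pred = c(1, 0, 1, 1, 0), truth = c(1, 0, 0, 1, 0))   # 0.8
rmse(pred = c(2.1, 3.9, 5.2), truth = c(2.0, 4.0, 5.0))       # ~0.141
```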

A small number of RSP blocks was used in each batch ($g=2$ for Covertype, $g=5$ for HIGGS, and $g=10$ for Taxi), just for illustrative purposes. In practice, as long as the number of RSP blocks in a batch is equal to or smaller than the number of available cores, i.e., 96 in our cluster settings, the computation time of a batch may not vary much. However, having many RSP blocks in a batch may require extra overhead to collect the outputs in R Server. In these experiments, we ran the asymptotic ensemble learning process until all RSP blocks in an RSP were used up in order to show how the accuracies or errors change as more RSP blocks are added. However, in real applications, we do not need to use all RSP blocks because good approximate results can be obtained after a few batches of RSP blocks, as the results show. Stopping criteria can be defined to automatically terminate the ensemble learning process once the ensemble result satisfies the application requirements. In the following results, we compare the RSP-based ensemble estimates with the true values computed from the entire data. We also show the accuracy and time of single models in comparison with RSP-based ensemble models. A single model is a model built from the entire data set; an RSP-based ensemble model is an ensemble model built from a sample of RSP blocks.

C. Results of Covertype Data

Covertype is a small data set with 581,012 records, 54 features and seven categories of forest cover type. We experimented on it to demonstrate the properties of the RSP-based method on small data in comparison with bigger data. This experiment was conducted on a single machine with multiple cores.

As we demonstrated in our empirical study [17], records in Covertype are not randomized. In this experiment, three different RSPs were generated from Covertype data as shown in Table 3. K is the number of RSP blocks and n is the number of records in each RSP block. The asymptotic ensemble learning process was run 100 times on each RSP model with $g=2$ RSP blocks in each batch. The results are summarized as follows:

  • Data summary: Figure 6(a) shows the estimated means of 4 features in Covertype data. Each point represents the average of the computed means from RSP blocks used up to that point. We can see that the error range decreases as more RSP blocks are used in the ensemble estimate. The error range becomes insignificant after a few batches and the estimated mean value converges to the true mean value in the end. Similar results of the estimated standard deviations for the same features can be observed in Figure 6(b).

  • Classification: Figure 7 shows how the RSP-based ensemble model accuracy changes after each batch until all RSP blocks are used up. As we can see, an ensemble model with an accuracy in the range of 0.74–0.75 was obtained using only 20% of Covertype data from any of the three generated RSPs of Covertype data. This value is close to the accuracy of the single model (0.768) built from the entire data with the same classification algorithm. However, the ensemble model performed worse than the single model because the data set is not big; the records in each RSP block are not enough to represent the entire data. We can see that the bigger the RSP block, the better the ensemble model. The advantages of the RSP-based method are clearer on bigger data sets, which indicates that the RSP-based method may not have an advantage in the analysis of small data sets.

TABLE 3. RSPs of Covertype Data.
FIGURE 6. RSP-based estimation of mean and standard deviation for four features in Covertype data (using RSP blocks from Covertype B in batches of $g=2$ blocks). Each point represents the estimated value after each batch (averaged from 100 runs). The dotted line represents the true value calculated from the entire Covertype data. (a) Mean. (b) Standard Deviation.

FIGURE 7. RSP-based ensemble classifiers from Covertype data. Each point represents the overall accuracy (averaged from 100 runs) of the RSP-based ensemble model obtained after each batch. (a) Covertype A. (b) Covertype B. (c) Covertype C.

D. Results of HIGGS Data

Three RSPs for HIGGS data were created as shown in Table 4. HIGGS Default is the default HDFS file of HIGGS with 60 blocks; it can be used as an RSP because the records in HIGGS are i.i.d. The other two RSPs were generated from the default HDFS file of HIGGS. Since the HIGGS data set is much larger than Covertype, the computing cluster was used to analyze it. As with Covertype, the asymptotic ensemble learning process was executed 100 times for each RSP, with $g=5$ RSP blocks in each batch. The average results over the 100 repetitions were calculated for both data summary and classification. The results are summarized as follows:

  • Data summary: Figures 8(a) and 8(b) show the means and standard deviations of 4 features in HIGGS data, respectively. We can observe the same trend as with Covertype: the error range decreases as more RSP blocks are added, becomes insignificant after a few batches, and the estimated values converge to the true values in the end.

  • Classification: Figure 9 shows how the accuracy of an RSP-based ensemble model changes as more RSP blocks are used. There is no significant change after using about 10–15% of the data. The accuracy of the ensemble model is close to, and even slightly higher than, the accuracy of the single model built from the whole data using the same classification algorithm. For this bigger data set, the results of the three RSPs are similar, although HIGGS B is slightly better because more RSP blocks are used for the same percentage of data. In this case, the number of RSP blocks used in the ensemble matters more than the size of the individual blocks. A sketch of one way to combine the base classifiers is given after this list.
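The ensemble classifier is built by combining the outputs of the base classifiers trained on individual RSP blocks. One standard combination rule is majority voting over the base-model predictions; the R sketch below illustrates this rule with rpart as an example base algorithm, and should be read as an illustration rather than the exact combination rule used in the prototype.

    # Illustrative sketch: majority-vote combination of base classifiers built
    # on individual RSP blocks (rpart is used as an example base algorithm).
    library(rpart)

    predict_ensemble <- function(base_models, newdata) {
      votes <- sapply(base_models, function(m) {
        as.character(predict(m, newdata, type = "class"))      # one column per base model
      })
      apply(votes, 1, function(v) names(which.max(table(v))))  # majority class per record
    }

    # Hypothetical usage: base_models is a list of rpart models, one per RSP block.
    # pred <- predict_ensemble(base_models, test_set)
    # mean(pred == test_set$label)    # overall accuracy of the ensemble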

TABLE 4. RSPs of HIGGS Data.
FIGURE 8. RSP-based estimation of mean and standard deviation for four features in HIGGS data (using RSP blocks of HIGGS A in batches of $g=5$ blocks). Each point represents the estimated value after each batch (averaged from 100 runs). The dotted line represents the true value calculated from the entire HIGGS data. (a) Mean. (b) Standard Deviation.

FIGURE 9. RSP-based ensemble classification of HIGGS data. Each point represents the overall accuracy (averaged from 100 runs) of the ensemble model after each batch. The dotted line represents the accuracy of a single model built from the entire HIGGS data. (a) HIGGS Default. (b) HIGGS A. (c) HIGGS B.

Figure 10(a) shows the computation times to obtain RSP-based classifiers from batches of 5 RSP blocks in the three HIGGS RSPs. As the figure shows, the Spark task time accounts for the main part of the computation time. Completing the job requires more time due to scheduling and the communication between the Spark driver and executors, and the communication between R Server and the Spark driver adds significant overhead to the total batch time. Furthermore, even when using all RSP blocks in one batch, which is generally not required, it still takes less time than learning a single model from the entire data, as shown in Figure 10(b). Taking HIGGS Default as an example, an RSP-based ensemble classifier can be obtained in about 1 minute or less using 20% of the data (i.e., 15 RSP blocks in one batch). This RSP-based ensemble model is equivalent to the single model built from the entire data.

FIGURE 10. Computation time for RSP-based classification of HIGGS data. (a) Using batches of 5 RSP blocks. (b) Using all RSP blocks in one batch. For comparison, the time required to build a single model from the entire data is shown on the far right.

E. Results of Taxi Data

Two RSPs were generated from NYC Taxi data as shown in Table 5. Only 125,000,000 records of this data set were used after preprocessing.

TABLE 5. RSPs of NYC Taxi Data.

As with HIGGS, NYC Taxi data was analyzed on the computing cluster. For each RSP, the asymptotic ensemble learning process was run 50 times with $g=10$ RSP blocks in each batch. The average results of the 50 runs were calculated for both data summary and regression. The results are summarized as follows:

  • Data summary: Figures 11(a) and 11(b) show the means and standard deviations of 4 features in Taxi data, respectively. Properties of the ensemble estimates are the same as those of the two previous data sets, Covertype and HIGGS.

  • Regression: Figure 12 shows the Root Mean Square Error (RMSE) of RSP-based ensemble regression models for predicting the tip amount in Taxi data. As in the classification experiments, there is no significant change after the first few batches of base models. We can also see that there is no significant difference between the ensemble models and a single model built from the whole data (the dotted line in the figure), although the ensemble models are slightly better. A sketch of averaging the base-model predictions is given after this list.
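For regression, a natural combination rule is to average the predictions of the base models; the R sketch below illustrates this with rpart regression trees as an example and scores the ensemble with the RMSE of Equation (5). As above, it is an illustration rather than the exact combination rule used in the prototype; test_set and tip_amount are hypothetical names.

    # Illustrative sketch: combine base regression models built on individual
    # RSP blocks by averaging their predictions.
    library(rpart)

    predict_ensemble_reg <- function(base_models, newdata) {
      preds <- sapply(base_models, function(m) predict(m, newdata))  # one column per model
      rowMeans(preds)                                                # averaged prediction
    }

    # Hypothetical usage:
    # y_hat <- predict_ensemble_reg(base_models, test_set)
    # sqrt(mean((y_hat - test_set$tip_amount)^2))   # RMSE, Equation (5)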

FIGURE 11. RSP-based estimation of mean and standard deviation for four features in Taxi data (using RSP blocks of Taxi B in batches of $g=10$ blocks). Each point represents the estimated value after each batch (averaged from 50 runs). The dotted line represents the true value calculated from the entire data. (a) Mean. (b) Standard Deviation.

FIGURE 12. RSP-based ensemble regression of Taxi data. Each point represents the RMSE (averaged from 50 runs) of the ensemble model after each batch. The dotted line represents the error of a single model built from the entire data. (a) Taxi A. (b) Taxi B.

Figure 13(a) shows the computation time to obtain RSP-based regression models from batches of 10 RSP blocks. Similarly, Figure 13(b) shows the time with batches of 96 RSP blocks, which equals the total number of cores assigned to Spark executors in the computing cluster. As we can see in Figure 13(c), even when using all RSP blocks in one batch, the time is shorter than that required to obtain a single model from the entire data (about 23 minutes on the same cluster). Running the regression tree algorithm on the entire Taxi data required extra memory (the executor overhead memory and the Kryo serialization buffer had to be increased for the Spark job to finish successfully). Taking Taxi A as an example, an RSP-based ensemble regression model can be obtained in less than 1 minute using 12% of the data (i.e., 20 RSP blocks as one batch). This RSP-based ensemble model is equivalent to the single model.

FIGURE 13. Computation time for RSP-based regression of Taxi data. (a) Using batches of 10 RSP blocks. (b) Using batches of 96 RSP blocks. (c) Using all RSP blocks in one batch. For comparison, the time required to build a single model from the entire data is shown on the far right.

SECTION VI.

Discussion

Although the distributed data-parallel model is used in the current mainstream big data frameworks, computing an entire big data set is not efficient due to the high computational costs of cluster computing and the ever-increasing volume of big data sets. However, as we have shown in this paper, if a big data set is represented as a random sample partition, the analysis of the big data set is turned into the analysis of a subset of RSP blocks. After a big data set is converted into the RSP data model, it no longer needs to be analyzed as a whole. Instead, ensemble estimates and models can be computed from a subset of RSP blocks in a fully parallel manner on the computing cluster. The expensive record-level sampling process of drawing random samples from a distributed big data file can also be replaced with efficient block-level sampling. Since a big data set is turned into a more manageable space of small RSP blocks, this new strategy for big data analysis on computing clusters is more scalable to the increasing volume of big data sets. As the experimental results show, a small subset of RSP blocks is sufficient to obtain good approximate results. The main purpose of this new strategy is not to obtain better results than other methods, but to use less data and fewer computing resources to obtain approximate results. We discuss several aspects of the RSP-based method as follows:

  • Reduce computational costs: The RSP-based method does not require loading and analyzing the entire data set. Memory limitation is no longer a critical factor because RSP blocks are analyzed in batches and an RSP block is small enough to be processed on a single node or core. Computation time can be decreased significantly because only small RSP blocks need to be analyzed, in a perfectly parallel manner. We have shown that even when many RSP blocks are put in one batch, they can still be processed efficiently in parallel, in much less time than building a single model from the entire data set with the parallel version of the same analysis algorithm. This is because each RSP block is computed locally on its computing node, so communication costs among the computing nodes are removed.

    In our prototype, we use the existing mechanisms for distributed data-parallel computing in R Server and Spark without any modifications. The main cost is clearly the communication on the computing cluster. Thus, using a batch of a few RSP blocks is an efficient way to get good approximate results in less time and with fewer resources. The asymptotic ensemble learning process over batches of RSP blocks not only avoids exceeding the limit of available resources but also obtains base and ensemble results quickly. This approach is very useful in exploratory analysis of very big data sets.

  • Reuse existing algorithms and engines: The RSP-based method provides an efficient way for data analysts to run existing data analysis and mining algorithms, such as those in R, on RSP blocks without re-implementing parallel versions of these algorithms. Current distributed computation engines, e.g., Apache Spark, can be used to execute the analysis algorithms on each batch of RSP blocks on a computing cluster (a minimal sketch of this pattern is given at the end of this list). Furthermore, RSP blocks are stored as HDFS blocks which can be used directly in current HDFS-compatible frameworks.

  • Ensemble methods: To obtain an ensemble model, any RSP block can be used as a data component to build a base model. The key advantage here is that RSP blocks are already available, so there is no need to conduct expensive sampling operations such as bootstrapping. Moreover, base models are built in a perfectly parallel manner from a set of small RSP blocks rather than building each base model from a big distributed bootstrap sample. In the two examples of decision and regression tree base models, all variables are used to build each base model. Further investigation is required to check the effect of randomly selecting a subset of variables for each base model to increase the diversity of the base models, as in Random Forests [49]. RSP-based ensembles can be obtained using different base algorithms and used in model serving systems [51] for further evaluation and model selection.

  • Data management: Higher-level big data processing and analysis systems can take advantage of the RSP block management layer to improve the efficiency and effectiveness of big data analysis. We believe that this work can open new research directions for improving the functionality of distributed big data frameworks, for example, scheduling algorithms and interactive analysis. It is essential that a big data system can schedule its data processing jobs both in a statistically aware manner (running a job on a subset of data blocks that satisfy a certain distribution criterion) and in a location-aware manner (running locally on nodes that have enough resources). In Alpha framework, we consider data blocks to be random samples of the big data set. However, some analysis tasks may require different partitioning strategies and different block selection criteria as well.

  • Size of RSP data blocks: Although the number of records in the RSP data blocks may affect the results, the effect was not significant in our experiments, especially for the bigger HIGGS and Taxi data sets. However, choosing the number of records, n, in an RSP block is equivalent to choosing the sampling rate when using random sampling techniques. This issue requires further investigation on different data sets and analysis tasks.

  • Application scenarios: Data scientists from different application areas use and design a variety of algorithms for exploratory data analysis and predictive modeling [52]–[54]. The RSP-based method enables them to test their sequential algorithms directly on computing clusters and explore the findings from a subset of RSP blocks. The RSP-based method can also help to separate big data storage from big data analysis. RSP blocks can be loaded on or transferred to local machines or other computing clusters for analysis. As such, a separate computing cluster can be used to analyze data sets stored in different clusters by combining the outputs from different locations. If RSP blocks from different data centers have similar distributions, the analysis process can be applied locally in each data center to produce local results without moving data; the local results from the different data centers are then collected and combined to produce the final results for the entire data. If RSP blocks from different data centers have different probability distributions, a combination criterion is required to produce representative RSP blocks of the whole data set, and the combined blocks can then be used to produce approximate results for the big data in the different data centers. If $\mathbb {D}$ is an evolving data set, an RSP can be created from the data collected in a time window. If RSP blocks in subsequent time windows have similar probability distributions, these blocks together represent an RSP of the entire data in the considered windows. If the data has different probability distributions, RSP blocks from these windows are combined to represent an RSP of the entire data in these windows. In fact, to analyze data in different windows we may not need to combine all RSP blocks from all windows; in a similar way to the cross-data-center case, block-level sampling is used to select RSP blocks from each window and combine them to produce RSP blocks of the entire data in all windows. We are currently experimenting with the RSP-based method in these scenarios.
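To make the reuse of existing sequential algorithms concrete (see the "Reuse existing algorithms and engines" item above), the R sketch below selects a batch of RSP block files by block-level sampling and applies an unmodified sequential algorithm to each block in parallel. The helpers list_rsp_blocks and read_block and the class column label are hypothetical, and parallel::mclapply stands in here for the R Server and Spark machinery used in the Alpha prototype.

    # Illustrative sketch: block-level sampling followed by perfectly parallel
    # application of an existing sequential algorithm to the selected RSP blocks.
    library(parallel)
    library(rpart)

    analyze_batch <- function(rsp_dir, g, list_rsp_blocks, read_block) {
      blocks <- list_rsp_blocks(rsp_dir)   # paths of all RSP block files
      batch  <- sample(blocks, g)          # block-level sampling; no record-level sampling
      mclapply(batch, function(f) {
        d <- read_block(f)                 # one RSP block fits in a single core's memory
        rpart(label ~ ., data = d)         # any existing sequential algorithm, unchanged
      }, mc.cores = min(g, detectCores()))
    }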

SECTION VII.

Related Works

Considering the ever-increasing volume of data, computing resources may never be enough to analyze an entire big data set all at once. Several recent works address this problem using the approximate computing paradigm. Approximate computing depends on representative samples of the entire data set to compute approximate results instead of exact ones. We summarize here some recent works in this direction and show the differences from the RSP-based method.

  • ApproxHadoop [8] uses multi-stage sampling to enable approximation with the MapReduce model. It depends on both data sampling and task dropping. For a given MapReduce job (e.g., a data analysis task), a subset of data blocks is randomly selected at run-time (i.e., a subset of the map operations); then a random sample is drawn from each selected block. Consequently, only some map operations are executed and each one runs on a sample of its assigned data block. In the RSP-based method, we only select a sample of RSP blocks and run a single Spark job on these blocks to execute the target function independently on each selected block. This does not require any record-level sampling operations at run-time, either locally within each data block or on the entire distributed data set.

  • Our work is similar to the Divide & Recombine approach [55], which enables big data analysis in R by applying existing statistical methods to subsets of a large data set on a computing cluster using Hadoop MapReduce. The authors argue that a random-replicate division can be created from the data set using random sampling without replacement, and suggest that only a small number of the data subsets may be sufficient. However, the paper does not provide technical details or experimental results on this point. In the RSP-based method, a key advantage is using only a few random sample data blocks to get approximate results from big data on computing clusters with limited resources. The asymptotic ensemble learning process enables incremental exploration and analysis, and can be employed to improve the results gradually depending on the available resources and the target application requirements. In fact, our method can be considered a natural extension of the Divide & Recombine approach.

  • BlinkDB [13] is a distributed sampling-based approximate query engine that supports SQL-based aggregation queries with error and time bounds. It uses offline sampling to build uniform and stratified samples; for a given query, the best sample(s) are selected based on the query's error and time constraints, and the query is then executed on the selected sample(s) in parallel. In Alpha framework, the RSP blocks are themselves the samples, created by an offline operation (the two-stage data partitioning operation) on a computing cluster. A sample of these blocks is selected randomly for a given analysis function, and multiple samples of RSP blocks can be used in batches to improve the results. Thus, the RSP-based method can also be run within error or time constraints so that the asymptotic ensemble learning process finishes when those constraints are met. While BlinkDB targets aggregate queries (e.g., avg, sum, count, quantile), Alpha is a general framework for big data analysis, e.g., for exploratory data analysis and predictive modeling, and can be extended to support arbitrary data analysis functions or interactive data analysis. Furthermore, the RSP-based result can be updated by appending new results from newly analyzed RSP blocks without recalculating the whole result from scratch.

  • IncApprox [14] is a stream data analytics system which depends on both approximate and incremental computing to incrementally update an approximate result for data analysis tasks. In the RSP-based method, we update the previous result by appending new results from newly analyzed RSP blocks. However, our method doesn’t target stream data analysis for the time being. We are currently working to extend the RSP-based method to manage and analyze evolving data sets. In addition to IncApprox, there are other works on approximate stream data analysis such as StreamApprox [56] and MacroBase [15].

  • As we discussed before, the bag of little bootstraps (BLB) [39] enables the bootstrapping process to be conducted on small samples of the data set instead of samples with sizes comparable to the original data set. In fact, RSP blocks can be used directly as subsamples in the BLB procedure without re-sampling from distributed big data sets many times; RSP blocks are created only once. Furthermore, the BLB procedure can be run on batches of RSP blocks, according to the available resources, until a satisfactory result is obtained (a conceptual sketch is given after this list). We are currently testing the performance of the BLB procedure with RSP blocks in Alpha framework.
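As a concrete illustration of the last point, the R sketch below follows the standard BLB recipe for estimating the standard error of the mean of a feature x: each RSP block of size b is inflated to the full data size n with multinomial weights, the estimator is computed on each weighted resample, and the per-block standard errors are averaged. The reader read_block is hypothetical, and the number of resamples r is only an example; this is a conceptual sketch, not the procedure used in our experiments.

    # Illustrative sketch: bag of little bootstraps (BLB) with RSP blocks as the
    # small subsamples, estimating the standard error of the mean of feature x.
    blb_se <- function(block_files, read_block, n_total, r = 50) {
      per_block_se <- sapply(block_files, function(f) {
        x <- read_block(f)$x
        b <- length(x)
        est <- replicate(r, {
          w <- as.vector(rmultinom(1, size = n_total, prob = rep(1 / b, b)))
          sum(w * x) / n_total             # weighted mean under this resample
        })
        sd(est)                            # bootstrap standard error from this block
      })
      mean(per_block_se)                   # combine across the selected RSP blocks
    }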

The RSP-based method is different from other works on approximate big data analysis in three main aspects. First, random sample data blocks are created by converting the data set into an RSP only once. After that, any RSP block can be used directly as a random sample without any record-level sampling operations at run-time. A subset of RSP blocks is often enough to get approximate results as the experimental results show. Second, individual results from RSP blocks are combined to obtain approximate ensemble results which can be improved incrementally using the asymptotic ensemble learning process. Third, this method doesn’t need parallelized implementations of data analysis and mining algorithms.

SECTION VIII.

Conclusions

In this paper, we introduced the RSP-based method for big data analysis. This method is based on the random sample partition model which preserves the statistical properties of the data set in each of its data blocks. Since RSP blocks are random samples of the entire data, they can be used directly to estimate the statistical properties of the entire data. Consequently, a subset of RSP blocks can be used to obtain ensemble estimates and models which are equivalent to those built from the entire data set. To enable efficient and effective big data analysis with RSP blocks, we proposed Alpha framework which consists of three main layers for data management, batch management, and data analysis. The experimental results of three real data sets showed that a subset of RSP blocks from an RSP is enough to obtain good approximate results for different tasks such as data summary, classification, and regression. We are currently testing Alpha framework for other tasks in big data exploration, cleaning, and clustering.

