2/02/2015

esProc External Memory Computing: Aggregate Operations with Cursor

A big data table is usually too large to be loaded into memory all at once. Processing such a table therefore usually takes one of two forms: retrieving part of the data at a time with cs.fetch(), or grouping and aggregating the data in the table.

1. Group and aggregate data of a cursor


The most common grouping and aggregate operations are counting and summation. Aggregate operations also include computing maximum/minimum values and top-n queries. To aggregate over a cursor, use the function cs.groups(). The aggregate functions for these operations are count, sum, max, min and topx. For example:

For each kind of product, A7 computes the total number of orders, the maximum trade amount and the total amount. Note that double-precision floating-point data are often not precise enough; in that case, decimal() is used to convert the data to the big decimal type before computing. When grouping and aggregating with a cursor, all data are traversed only once. During the traversal, each row is aggregated into the group it belongs to. Multiple aggregate values can be computed in a single traversal, and the cursor is closed automatically once the traversal completes.

The result in A7 is as follows:


As the result shows, each product has 50,000 order records in the test text data. The data of every file, and the data of A6’s cursor, are sorted by Date but not by Type. As this example shows, cs.groups() groups and aggregates cursor data without requiring them to be sorted in advance, and the result is sorted by the grouping value. However, because cs.groups() returns a table sequence, the result set must fit in memory. The function is therefore not suitable when the result set itself is big.
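The single-pass grouping described above can be sketched in Python. This is an analogy, not esProc code: a generator stands in for the cursor, the sample rows are made up, and only the Type and Amount fields from the example are used. Decimal plays the role of decimal(). The sketch also shows why the result must fit in memory: one accumulator is kept per group, so memory grows with the number of groups rather than the number of rows.

```python
from decimal import Decimal
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, str]  # one order record; field names follow the example

def order_cursor() -> Iterator[Row]:
    """Hypothetical cursor: yields order rows one at a time, unsorted by Type."""
    data = [
        ("2015-01-01", "B", "10.10"),
        ("2015-01-01", "A", "20.20"),
        ("2015-01-02", "A", "5.05"),
        ("2015-01-02", "B", "30.30"),
        ("2015-01-03", "A", "0.10"),
    ]
    for date, typ, amount in data:
        yield {"Date": date, "Type": typ, "Amount": amount}

def groups(cursor: Iterable[Row]) -> List[Tuple[str, int, Decimal, Decimal]]:
    """Traverse the cursor once and return (Type, count, max Amount, sum Amount).

    One accumulator is kept per group, so memory grows with the number of
    groups, not the number of rows. Amounts are parsed as Decimal to avoid
    binary floating-point rounding in the sums.
    """
    acc: Dict[str, List] = {}
    for row in cursor:
        amount = Decimal(row["Amount"])
        state = acc.get(row["Type"])
        if state is None:
            acc[row["Type"]] = [1, amount, amount]
        else:
            state[0] += 1
            state[1] = max(state[1], amount)
            state[2] += amount
    # Like cs.groups(), return the groups sorted by the grouping value.
    return [(key, n, mx, total) for key, (n, mx, total) in sorted(acc.items())]

for result_row in groups(order_cursor()):
    print(result_row)
```

Like a cursor, the generator can be traversed only once; calling order_cursor() again creates a fresh one.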


Finding the n largest or smallest values is also an aggregate operation. For this, use the aggregate function topx:

For each kind of product, A7 finds the 3 orders with the lowest amounts and the 3 orders with the highest amounts. Remember to add a negative sign inside the aggregate function when computing the top 3 orders, that is, topx(-Amount:3). The result is as follows:


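Per-group topx can be pictured as a bounded heap kept for each group, so memory per group stays at n entries however long the cursor is. The Python sketch below is an analogy rather than esProc's implementation, and the (Type, Amount) tuples are hypothetical sample data. Passing a negated key mirrors topx(-Amount:3): the smallest values of -Amount are the largest values of Amount.

```python
import heapq
from itertools import count
from typing import Callable, Dict, Iterable, List, Tuple

Order = Tuple[str, int]  # (Type, Amount); hypothetical sample layout

def smallest_n_by_group(rows: Iterable[Order], key: Callable[[Order], int],
                        n: int) -> Dict[str, List[Order]]:
    """For each Type, keep the n rows with the smallest key(row), in one pass."""
    heaps: Dict[str, list] = {}
    tie = count()  # tie-breaker so the rows themselves are never compared
    for row in rows:
        h = heaps.setdefault(row[0], [])
        # Store the negated key: h[0] then holds the largest key kept so far.
        entry = (-key(row), next(tie), row)
        if len(h) < n:
            heapq.heappush(h, entry)
        elif entry[0] > h[0][0]:  # key(row) beats the worst key kept
            heapq.heapreplace(h, entry)
    # Order each group's survivors by ascending key; groups by Type.
    return {t: [e[2] for e in sorted(h, key=lambda e: (-e[0], e[1]))]
            for t, h in sorted(heaps.items())}

orders = [("A", 5), ("A", 1), ("A", 9), ("A", 3), ("A", 7),
          ("B", 4), ("B", 2), ("B", 8), ("B", 6)]
print(smallest_n_by_group(orders, key=lambda r: r[1], n=3))   # bottom 3
print(smallest_n_by_group(orders, key=lambda r: -r[1], n=3))  # top 3
```

The heap never holds more than n entries per group, so the cost per row is O(log n).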

To aggregate all records, simply omit the grouping expression from the cs.groups() function:



The result in A7 is as follows:


This shows that aggregating all records can be regarded as a special case of grouping and aggregation, in which all records belong to a single group.
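In a Python sketch (an analogy only, with hypothetical (Type, Amount) rows), the "no grouping expression" case falls out naturally: if the key function is omitted, every row maps to the same group and the output has exactly one row.

```python
from typing import Callable, Dict, Hashable, Iterable, List, Optional, Tuple

Order = Tuple[str, int]  # (Type, Amount); hypothetical sample layout

def count_and_sum(rows: Iterable[Order],
                  key: Optional[Callable[[Order], Hashable]] = None
                  ) -> List[Tuple[Hashable, int, int]]:
    """Return (group, count, sum of Amount) per group, in first-seen order.

    With key=None every row falls into the single group None, which is the
    "aggregate all records" case.
    """
    acc: Dict[Hashable, List[int]] = {}
    for row in rows:
        group = key(row) if key is not None else None
        state = acc.setdefault(group, [0, 0])
        state[0] += 1
        state[1] += row[1]
    return [(g, c, s) for g, (c, s) in acc.items()]

orders = [("A", 5), ("B", 4), ("A", 1), ("B", 2)]
print(count_and_sum(orders, key=lambda r: r[0]))  # grouped by Type
print(count_and_sum(orders))                      # all records, one group
```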

2. Aggregate data from multiple cursors

When aggregating data from multiple cursors, you can first compute the aggregate result of each cursor:

A6 groups and aggregates each file cursor to compute the daily statistics of each kind of product:

Because cs.groups() sorts its result by the grouping value, the result table sequence for each kind of product is always ordered by date, regardless of the order of the original data. These results can therefore be merged to compute the daily order data for all products:

A8 holds the summarized result:

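The aggregate-then-merge approach can be sketched in Python. Lists of (Date, Amount) tuples stand in for the file cursors and are hypothetical sample data. heapq.merge performs the ordered merge (here materialized into a list, in the spirit of an eager A.merge()), and itertools.groupby then adds up the rows of each date, which are adjacent after merging.

```python
import heapq
from itertools import groupby
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, int]  # (Date, Amount); hypothetical layout

def daily_totals(rows: Iterable[Row]) -> List[Row]:
    """Per-file step: sum Amount per Date, returned sorted by Date."""
    acc: Dict[str, int] = {}
    for date, amount in rows:
        acc[date] = acc.get(date, 0) + amount
    return sorted(acc.items())

def combine_daily(results: List[List[Row]]) -> List[Row]:
    """Merge date-sorted per-file results in order, then add up equal dates."""
    merged = list(heapq.merge(*results, key=lambda r: r[0]))  # eager merge
    return [(date, sum(t for _, t in same_day))
            for date, same_day in groupby(merged, key=lambda r: r[0])]

file_a = [("2015-01-02", 3), ("2015-01-01", 5), ("2015-01-02", 4)]  # unsorted
file_b = [("2015-01-03", 2), ("2015-01-01", 1)]
print(combine_daily([daily_totals(file_a), daily_totals(file_b)]))
```

The per-file results are sorted even though file_a and file_b are not, which is what makes the final ordered merge possible.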
Since the order data of each kind of product in every data file are already sorted by date, the cursors can also be merged first and then aggregated:

As you can see, this method produces the same result as the previous one. When the data of every cursor are already ordered, it is simpler and more efficient to merge first and then group and aggregate. Note the difference between merge and merge@x. A.merge() merges the members of multiple sequences, or the records of multiple table sequences, in order and returns a sequence; the computation is completed when the function executes. CS.merge@x() merges the records of multiple cursors in order and returns a cursor; the computation starts only when data are fetched from that cursor. Both functions require the data in A and CS to be ordered. The usage of merge@x is explained in detail in esProc External Memory Computing: Merge and Join Cursor Data.
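The deferred behaviour of merge@x, where nothing is computed until data are fetched, can be mimicked with Python generators. This sketch is an analogy rather than esProc's implementation: heapq.merge lazily merges date-sorted inputs, itertools.groupby sums each run of equal dates, and file_cursor is a hypothetical stand-in that logs every row it reads so that the deferred reading is visible.

```python
import heapq
from itertools import groupby
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[str, int]  # (Date, Amount); hypothetical layout

def file_cursor(name: str, rows: Iterable[Row], log: List[str]) -> Iterator[Row]:
    """Hypothetical file cursor over date-sorted rows; logs each row it reads."""
    for row in rows:
        log.append(name)
        yield row

def merge_then_group(cursors: List[Iterator[Row]]) -> Iterator[Row]:
    """Lazily merge date-sorted cursors, then sum Amount per run of equal dates.

    Both steps assume every input is already sorted by Date; nothing is read
    from the inputs until the caller starts fetching.
    """
    merged = heapq.merge(*cursors, key=lambda r: r[0])
    for date, run in groupby(merged, key=lambda r: r[0]):
        yield date, sum(amount for _, amount in run)

log: List[str] = []
a = file_cursor("a", [("2015-01-01", 5), ("2015-01-02", 3), ("2015-01-02", 4)], log)
b = file_cursor("b", [("2015-01-01", 1), ("2015-01-03", 2)], log)
cur = merge_then_group([a, b])
print(log)        # still empty: no row has been read yet
print(list(cur))  # reading happens here
```

heapq.merge keeps only one pending row per input, so memory does not depend on the size of the files.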

3. Big result sets

The previous section showed that the data of each cursor can first be grouped and aggregated into ordered result table sequences, which are then merged to get the final result. The same approach can be used to solve grouping and aggregation problems that involve big result sets.

A big result set means that not only the source data but also the computed result is huge. Such a result cannot be loaded into memory all at once; it has to be read out step by step. Examples include the monthly bill of every customer of a telecommunications company, or the sales of every product on a B2C website. Such statistical results may contain several million records or more.

In esProc, the cs.groupx() function performs grouping and aggregate operations whose result set is big. Here we take the daily statistics on product orders as an example to illustrate the processing of big result sets. To make the memory limit easier to observe, we work with only 100 records at a time. Computing the daily orders by grouping and aggregation therefore requires the groupx function:

In A6, the data of the cursors are concatenated one after another rather than merged by date. A7 returns the result of the grouping and aggregate operation as a cursor instead of a table sequence, so the data can be retrieved step by step in A8~A11. The data are shown below:



Each of A8~A10 retrieves the statistical result for 100 days, and A11 retrieves the remaining data. Once all data of A7’s cursor have been fetched, the cursor closes automatically.

In A7, groupx groups and aggregates the data by date, with the number of buffer rows set to 100. When A7 is executed, it fetches data from A6’s cursor while grouping and aggregating them. Each time an aggregate result of 100 rows has been produced, it is written to a temporary file, and the rest of the data are processed in the same way. The result of A7 is a cursor composed of these temporary files:


If you execute the script step by step, you can see the generated temporary files in the system temporary directory after A7 is executed:

To see the contents of these binary files, you can retrieve the data from them, for example:

The data in A1 and A3 are shown below:

The names of these temporary files are generated randomly. As the data retrieved from some of them show, each temporary file stores the grouping and aggregate result of a run of consecutive original records, already sorted by date. In accordance with the number of buffer rows specified in the function, each temporary file except the last one contains partial summary data for exactly 100 days. When data are fetched from a cursor generated by groupx, the data of all temporary files are merged in order.
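The buffering phase of groupx as described above can be sketched in Python. This is an illustration under assumptions, not esProc's code: the temporary files here are JSON-lines text rather than esProc's binary format, the (Date, Amount) tuples are hypothetical input rows, and the buffer_rows parameter corresponds to the number of buffer rows given to groupx. Each file is written sorted by Date, and every file except the last holds exactly buffer_rows groups, so the same date can appear as a partial sum in several files.

```python
import json
import os
import tempfile
from typing import Dict, Iterable, List, Tuple

Row = Tuple[str, int]  # (Date, Amount); hypothetical layout

def spill_groups(rows: Iterable[Row], buffer_rows: int) -> List[str]:
    """Sum Amount per Date in memory, spilling to temporary files.

    Whenever buffer_rows distinct dates are held and a new date arrives, the
    buffered groups are written to a new temporary file sorted by Date and
    the buffer is cleared. Returns the file paths; the caller deletes them.
    """
    paths: List[str] = []
    buf: Dict[str, int] = {}

    def flush() -> None:
        # mkstemp creates a file with a random name in the system temp directory.
        fd, path = tempfile.mkstemp(suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            for date, total in sorted(buf.items()):
                f.write(json.dumps([date, total]) + "\n")
        paths.append(path)
        buf.clear()

    for date, amount in rows:
        if date not in buf and len(buf) == buffer_rows:
            flush()
        buf[date] = buf.get(date, 0) + amount
    if buf:
        flush()
    return paths
```

Memory use is bounded by buffer_rows groups no matter how many distinct dates the input contains; the price is a later merge over the files.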

After all data have been fetched from the temporary-file cursor, or the cursor has been closed, the corresponding temporary files are deleted automatically. For more about external memory grouping, please refer to esProc External Memory Computing: Principle of Grouping.
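Fetching from such a cursor amounts to an ordered merge of the temporary files, with the partial sums for the same date added together, followed by deletion of the files. The Python sketch below assumes each temporary file is a hypothetical JSON-lines text file of date-sorted [date, partial sum] pairs; write_run and fetch are hypothetical helpers for creating sample files and reading n rows at a time, not esProc functions.

```python
import heapq
import json
import os
import tempfile
from itertools import groupby, islice
from typing import Iterable, Iterator, List, Tuple

Row = Tuple[str, int]  # (Date, partial sum of Amount); hypothetical layout

def write_run(rows: Iterable[Row]) -> str:
    """Hypothetical helper: write date-sorted rows to a new temporary file."""
    fd, path = tempfile.mkstemp(suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        for date, total in rows:
            f.write(json.dumps([date, total]) + "\n")
    return path

def read_run(path: str) -> Iterator[Row]:
    """Stream one temporary file, one JSON [date, total] pair per line."""
    with open(path) as f:
        for line in f:
            date, total = json.loads(line)
            yield date, total

def temp_file_cursor(paths: List[str]) -> Iterator[Row]:
    """Merge date-sorted files, add up partial sums of equal dates, then delete.

    The finally block runs once the cursor is exhausted, and also when the
    cursor is closed after fetching has begun.
    """
    runs = [read_run(p) for p in paths]
    try:
        merged = heapq.merge(*runs, key=lambda r: r[0])
        for date, parts in groupby(merged, key=lambda r: r[0]):
            yield date, sum(total for _, total in parts)
    finally:
        for run in runs:
            run.close()  # release any file handle a run still holds
        for p in paths:
            os.remove(p)

def fetch(cursor: Iterator[Row], n: int) -> List[Row]:
    """Hypothetical helper: return up to n rows from the cursor."""
    return list(islice(cursor, n))
```

For example, with two files holding partial sums for 2015-01-01, the first fetch returns that date once with the sums added together, and the files disappear after the last row has been fetched.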
