Friday, February 17, 2017

Good math, bad engineering


As a former statistician and a current engineer, I feel that a successful engineering project requires both the mathematician’s ability to find the right abstraction and the engineer’s ability to find a workable implementation.

For a typical engineering problem, the steps are usually -
- 1. Abstract the problem with a formula or some pseudocode
- 2. Solve the problem with the formula
- 3. Iterate on the initial solution until it achieves optimal time and space complexity

I feel that a mathematician would like dynamic programming (DP) questions most, because they are so similar to the typical deduction problems in math. An engineer may find them challenging, since they require imagination and some mathematical sense.

The formula is the most important part: without it, trial and error or debugging does not help. Once the formula is figured out, the rest becomes a piece of cake. However, sometimes things are not that straightforward. Good mathematics does not always lead to good engineering.

Let’s see one question from Leetcode.

You are given a list of non-negative integers, a1, a2, ..., an, and a target, S. Now you have 2 symbols + and -. For each integer, you should choose one from + and - as its new symbol.

Find out how many ways to assign symbols to make sum of integers equal to target S.

Example 1:
Input: nums is [1, 1, 1, 1, 1], S is 3. 
Output: 5
Explanation: 

-1+1+1+1+1 = 3
+1-1+1+1+1 = 3
+1+1-1+1+1 = 3
+1+1+1-1+1 = 3
+1+1+1+1-1 = 3

There are 5 ways to assign symbols to make the sum of nums be target 3.

1. The quick solution

Each element of the list has two options: plus or minus. So the question asks how many of all possible paths reach the given target. Of course, if the target is unreachable, we just return 0.

It sounds exactly like a DP question. With a pencil and paper, we can start to explore the relationship between dp(n) and dp(n-1). For example, say our goal is a sum of 5 and we are given the list [1, 1, 1, 1, 1]. If the smaller tuple (1, 1, 1, 1) has some paths that reach 4, that is exactly what we want, since adding the last 1 makes 5. Similarly, paths that reach 6 work as well, since subtracting the last 1 also makes 5. We simply add the counts of both cases together, since these are the only two ways to use the last element.

The formula is dp(n, s) = dp(n-1, s-x) + dp(n-1, s+x), where n is the size of the list, s is the target sum, and x is the element appended to the smaller list. OK, the second step is easy.
def findTargetSumWays_1(nums, S):
    """
    :type nums: Tuple[int]
    :type S: int
    :rtype: int
    """
    if not nums:
        if S == 0:
            return 1
        else:
            return 0
    return findTargetSumWays_1(nums[1:], S+nums[0]) + findTargetSumWays_1(nums[1:], S-nums[0]) 

small_test_nums = (1, 1, 1, 1, 1)
small_test_S = 3

%time findTargetSumWays_1(small_test_nums, small_test_S)
It is theoretically correct and works perfectly on small test cases. But we know that it is going to be a nightmare for an engineering application, because it has a hefty time complexity of O(2^N). So the math part is done, and we have to move on to the third step.

2. The third step that is hard

So we need to find a data structure to record the intermediate results. If this were the Fibonacci number problem, a simple linear data structure like a list would slash O(2^N) to O(N).
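
As a quick illustration (a sketch of mine, not part of the original solution), this is what list-based memoization looks like for Fibonacci: the cache turns the exponential recursion into a single linear pass.

def fib(n):
    # the list acts as the cache: fib(i) is computed once and reused
    if n < 2:
        return n
    memo = [0, 1]
    for i in range(2, n + 1):
        memo.append(memo[i - 1] + memo[i - 2])
    return memo[n]

print(fib(30))   # 832040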

But the hard part is: what data structure should be used here? Since the get operation on a hash table is O(1), a rolling dictionary can record the previous states. However, a Python dictionary does not allow adding or removing keys while it is being iterated, so we have to build a temporary dictionary each round and manually replace the old one. The overall set of paths looks like a tree structure. So the solution will look like -

def findTargetSumWays_2(nums, S):
    if not nums:
        return 0
    dic = {nums[0]: 1, -nums[0]: 1} if nums[0] != 0 else {0: 2}
    for i in range(1, len(nums)):
        tdic = {}
        for d in dic:
            tdic[d + nums[i]] = tdic.get(d + nums[i], 0) + dic.get(d, 0)
            tdic[d - nums[i]] = tdic.get(d - nums[i], 0) + dic.get(d, 0)
        dic = tdic
    return dic.get(S, 0)

big_test_nums = tuple(range(100))
big_test_S = sum(range(88))
%time findTargetSumWays_2(big_test_nums, big_test_S)

The time is exactly what we need. However, the code is not elegant and is hard to understand.
CPU times: user 189 ms, sys: 4.77 ms, total: 194 ms
Wall time: 192 ms

3. Finally the easy solution

If we don’t want things to get complicated, we just need a cache, and Python 3 provides the functools.lru_cache decorator. Since the arguments are hashable (a tuple and an integer), adding one import and one line to the first solution quickly solves the problem.

from functools import lru_cache

@lru_cache(10000000)
def findTargetSumWays_3(nums, S):
    if not nums:
        if S == 0:
            return 1
        else:
            return 0
    return findTargetSumWays_3(nums[1:], S+nums[0]) + findTargetSumWays_3(nums[1:], S-nums[0]) 

%time findTargetSumWays_3(big_test_nums, big_test_S)

CPU times: user 658 ms, sys: 19.7 ms, total: 677 ms
Wall time: 680 ms

Conclusion

Good math cannot solve all engineering problems. It has to be combined with the details of the language, the application and the system to avoid a bad engineering implementation.

The Jupyter notebook is at GitHub. If you have any comments, please email me at wm@sasanalysis.com.

Thursday, February 16, 2017

Use Elasticsearch and Kibana for large BI system



Nowadays Elasticsearch is more and more popular. Besides its original search functionality, I have found that Elasticsearch can be

  1. used as a logging container. That is what the ELK stack is created for.
  2. utilized as a JSON document store with rich APIs, which, combined with Kibana, can serve as a BI server.

This is the data store I see every day -

  • 10PB stocking data
  • average 30TB incoming data everyday
  • various data sources, including binary files such as PDF
  • including very complicated SQL queries (fortunately no stored procedures)
  • millions of JSON creations daily

People want to know what is going on with such data, so a business intelligence or OLAP system is needed to visualize and aggregate the data and its flow. Since Elasticsearch is so easy to scale out, it beats the other big-data solutions on the market.

1. Batch Worker

There are many options to implement a batch worker. Finally the decision comes down to either Spring Batch or writing a library from scratch in Python.

1.1 Spring Batch vs. Python

Spring Batch
  • Pros:
    • is a full framework that includes rollback, notification and scheduler features
    • provides great design patterns through dependency injection, such as factory and singleton, which help multiple people work together. For example -
@Bean
public Step IndexMySQLJob01() {
   return stepBuilderFactory.get("IndexMySQLJob01")
           .<Data, Data> chunk(10)
           .reader(reader())
           .processor(processor())
           .writer(writer())
           .build();
}
Python
  • Pros:
    • less code; flexible
    • super easy from dictionary to JSON
    • has official/3rd party libraries for everything
  • Cons:
    • you create your own library/framework
    • if a pattern like Spring Batch’s is adopted, dependencies have to be injected manually, such as -
class IndexMySQLJob01(object):
    def __init__(self, reader, processor, writer, listener):
        self.reader = reader
        self.processor = processor
        self.writer = writer
        self.listener = listener
    ...

Eventually Python is picked, because the overall scenario is more algorithm-bound than language-bound.

1.2 The algorithms

Since the data size is pretty big, time and space always have to be considered. The most direct way to decrease the time complexity is to use hash tables, as long as the memory can hold the data. For example, a join between an N-row table and an M-row table can be optimized from O(M*N) to roughly O(M+N): build a hash table on the smaller table once, then probe it while streaming the larger one.
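
Here is a minimal hash-join sketch (the table and field names are made up for illustration): the smaller table is loaded into a dictionary once, and the larger one is streamed against it.

def hash_join(stream, small_table, key='key'):
    # build the hash table once: O(N)
    lookup = {row[key]: row for row in small_table}
    # probe it while streaming the big input: O(M)
    for record in stream:
        record.update(lookup.get(record[key], {}))
        yield record

regions = [{'key': 1, 'region': 'east'}, {'key': 2, 'region': 'west'}]
sales = ({'key': i % 2 + 1, 'amount': i} for i in range(5))
for joined in hash_join(sales, regions):
    print(joined)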

To save the space, a generator chain is used to stream data from the start to the end, instead of materializing sizable objects.

class JsonTask01(object):
    ...
    def get_json(self, generator1, hashtable1):
        # stream records one by one instead of materializing a big list
        for each_dict in generator1:
            key = each_dict.get('key')
            # hash lookup is O(1); fall back to an empty dict if the key is missing
            each_dict.update(hashtable1.get(key, {}))
            yield each_dict

1.3 The scheduler

A scheduler is a must: cron is enough for simple tasks, while a bigger system requires a workflow manager. Airflow is the one that helps organize and schedule the jobs here. It has a web UI and is written in Python, which makes it easy to integrate with the batch worker.
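
A minimal Airflow DAG could look like the sketch below (assuming Airflow 1.x; the job name and schedule are illustrative, not the production ones).

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def index_mysql_job_01():
    # placeholder for the real read -> process -> write pipeline
    pass

dag = DAG('index_mysql_job_01',
          start_date=datetime(2017, 1, 1),
          schedule_interval='@daily')

run_indexing = PythonOperator(task_id='run_indexing',
                              python_callable=index_mysql_job_01,
                              dag=dag)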

1.4 High availability with zero downtime

Indexing a large quantity of data imposes a significant load. For mission-critical indexes that need 100% uptime, a zero-downtime scheme is implemented: we keep two physical copies of an index for maximum safety, and an alias switches between the two copies once the indexing is finished.

    def add_alias(self, idx):
        LOGGER.warn("The alias {} will point to {}.".format(self.index, idx))
        self.es.indices.put_alias(idx, self.index)

    def delete_alias(self, idx):
        LOGGER.warn("The alias {} will be removed from {}.".format(self.index, idx))
        self.es.indices.delete_alias(idx, self.index)
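
The switch itself can also be done in one atomic call through the _aliases endpoint, so readers never observe a moment without the alias. A hedged sketch (es, the alias and the index names are illustrative):

def switch_alias(es, alias, old_idx, new_idx):
    # both actions are applied atomically by Elasticsearch
    es.indices.update_aliases(body={
        "actions": [
            {"remove": {"index": old_idx, "alias": alias}},
            {"add": {"index": new_idx, "alias": alias}},
        ]
    })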

2. Elasticsearch Cluster

2.1 Three kinds of nodes

An Elasticsearch node can take one of three roles: master node, data node and ingest node. It is common to see a single node act as master, ingest and data node all at once. For a large system, however, it is always helpful to assign the three roles to different machines/VMs; then, when a node goes down or comes back, failover and recovery are quicker.
elasticsearch-head can clearly visualize how the shards are relocated once an accident occurs.

As the number of cluster nodes increases, deployment becomes painful. I feel the best tool so far is ansible-elasticsearch. With ansible-playbook -i hosts ./your-playbook.yml -c paramiko, the cluster is up in minutes.

2.2 Memory is cheap but heap is expensive

The rules of thumb for Elasticsearch are -

Give (less than) Half Your Memory to Lucene

Don’t Cross 32 GB!

This causes an awkward situation: if a machine has more than 64GB of memory, the additional memory means little to the Elasticsearch heap. It therefore makes sense to run two or more Elasticsearch instances side by side to make full use of the hardware. For example, on a machine with 96GB of memory we can allocate 31GB for an ingest node, 31GB for a data node and leave the rest to the OS. However, two data nodes on a single machine would compete for disk IO, which damages performance, while a master node plus a data node would increase the risk of downtime.

The great thing about Elasticsearch is that it provides rich REST APIs, such as http://localhost:9200/_nodes/stats?pretty. We could use X-Pack (paid) or other customized tools to monitor them. I feel that the three most important statistics for the heap, and therefore for performance, are -

The heap usage and the old GC duration

The two statistics are intertwined. High heap usage, say 75%, will trigger an old GC, while an old GC under high heap usage takes longer. We have to keep both numbers as low as possible.

     "jvm" : {
        "mem" : {
          "heap_used_percent" : 89,
          ... 
        "gc" : {
          "collectors" : {
            ...
            "old" : {
              "collection_count" : 225835,
              "collection_time_in_millis" : 22624857
            }
          }
        }
Thread pools

For each thread pool there are three key counters: active, queue, and rejected. It is useful to visualize how they change in real time. Once there are a lot of queued or rejected threads, it is a good time to think about scaling up or scaling out.

"thread_pool" : {
        "bulk" : {
          "threads" : 4,
          "queue" : 0,
          "active" : 0,
          "rejected" : 0,
          "largest" : 4,
          "completed" : 53680
        },
The segments’ number/size

The segments are the in-memory inverted index structures corresponding to the indexes on disk; they stay resident in memory and GC has no effect on them. Each segment leaves a footprint on every search thread, so the segments’ size matters: it is effectively multiplied by the number of search threads.

        "segments" : {
          "count" : 215,
          "memory_in_bytes" : 15084680,
        },

The number of shards actually controls the number of segments: as shards increase, the size of each segment decreases while the number of segments increases. So we cannot raise the number of shards as much as we want, because many small segments push the heap usage much higher. The solution is force merge, which is time-consuming but effective.
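
All three statistics above can be pulled from the _nodes/stats API mentioned earlier. A small polling sketch (assuming the requests package and the default localhost endpoint):

import requests

def poll_node_stats(host='http://localhost:9200'):
    stats = requests.get(host + '/_nodes/stats/jvm,thread_pool,indices').json()
    for node in stats['nodes'].values():
        jvm = node['jvm']
        print(node.get('name'),
              jvm['mem']['heap_used_percent'],                      # heap usage
              jvm['gc']['collectors']['old']['collection_count'],   # old GC count
              node['thread_pool']['bulk']['rejected'],              # rejected bulk threads
              node['indices']['segments']['count'],                 # segment count
              node['indices']['segments']['memory_in_bytes'])       # segment memory

poll_node_stats()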

3. Kibana as BI dashboard

3.1 Plugins

Kibana integrates Dev Tools (previously the Sense UI) for free. Dev Tools has code assistance and is a powerful debugging tool. If budget is not an issue, X-Pack is also highly recommended. As for Elasticsearch, since 5.0 ingest-geoip is a plugin, so we have to declare it in the Ansible YAML, such as -

es_plugins:
    - plugin: ingest-geoip

3.2 Instant Aggregation

There are quite a few KPIs that need system-wide term aggregations. Since 5.0, the request cache is enabled by default for all requests with size: 0.
For example -

POST /big_data_index/data/_search
{   "size": 0,
    "query": {
        "bool": {
            "must_not": {
                "exists": {
                    "field": "interesting_field"
                }
            }
        }
    }
}

The force merge mentioned above, such as POST /_forcemerge?max_num_segments=1, will combine the segments and dramatically increase the aggregation speed.

3.3 Proxy

Nginx is possibly the best proxy to put in front of Kibana. There are two advantages: first, the proxy can cache Kibana’s static resources; second, we can always check the Nginx logs to figure out what causes problems for Kibana.

Conclusion

Elasticsearch and Kibana together provide high availability and high scalability for a large BI system.

If you have any comments, please email me at wm@sasanalysis.com.

Wednesday, September 7, 2016

Use Slack bot to monitor the server


I used to install Datadog or other SaaS tools to monitor my Linux boxes in the cloud. Most of the time they are just overkill for my tiny servers with only 1GB or 2GB of memory. What I am most interested in is the running processes and the exact memory usage, and I need a mobile solution to monitor them on the go.
Now, with the arrival of Slack bots and the real-time Python client, a simple Python script does the job.
from slackclient import SlackClient
from subprocess import getoutput
import logging
import time

message_channel = '#my-server-001'
api_key = 'xoxb-slack-token'
client = SlackClient(api_key)

if client.rtm_connect():
    while True:
        last_read = client.rtm_read()
        if last_read:
            try:
                parsed = last_read[0]['text']
                if parsed and 'status' in parsed:
                    result = getoutput('pstree')
                    result += '\n\n' + getoutput('free -h')
                    client.rtm_send_message(message_channel, str(result))
            except Exception as e:
                logging.error(e)
        time.sleep(1)
Then I use systemd or another tool to daemonize it. No matter where I am, when I enter status in the #my-server-001 channel on my phone, I instantly get a result like -
systemd-+-accounts-daemon-+-{gdbus}
       |                 `-{gmain}
       |-agetty
       |-cron
       |-dbus-daemon
       |-fail2ban-server---2*[{fail2ban-server}]
       |-login---bash
       |-nginx---nginx
       |-postgres---5*[postgres]
       |-python---sh---pstree
       |-redis-server---2*[{redis-server}]
       |-rsyslogd-+-{in:imklog}
       |          |-{in:imuxsock}
       |          `-{rs:main Q:Reg}
       |-sshd-+-3*[sshd---sshd---bash]
       |      `-sshd---sshd
       |-2*[systemd---(sd-pam)]
       |-systemd-journal
       |-systemd-logind
       |-systemd-timesyn---{sd-resolve}
       |-systemd-udevd
       `-uwsgi---uwsgi---5*[uwsgi]

             total        used        free      shared  buff/cache   available
Mem:           2.0G        207M        527M         26M        1.2G        1.7G
Swap:          255M          0B        255M

Wednesday, July 29, 2015

Deploy edx spark environment to DigitalOcean

This summer I took the Spark courses CS100 and CS190 at edX, and had a wonderful experience.
The two classes use a Vagrant virtual machine containing Spark and all the teaching materials. There are two challenges with the virtual machine —
  1. The labs usually take a long time to finish, say 8-10 hours. If the host machine is shut down, the RDDs are lost and the pipeline has to be run again.
  2. Some RDD operations, such as groupByKey and distinct, take a lot of computation/communication power. Many of my 50k classmates complained about the waiting time. And my most-used laptop is a Chromebook, which cannot even install VirtualBox.
Deploying the learning environment to the cloud is an alternative. DigitalOcean is a good choice because it uses mirrors for most packages, and the network speed is amazingly fast, almost 100 MB/s (thanks to the SSD infrastructure DigitalOcean uses; otherwise the hard disk could not stand such rapid IO; see my deployment records on GitHub).

I found that a Linux box at DigitalOcean with 1 GB of memory and 1 CPU, which costs 10 dollars a month, handles most labs fairly easily with IPython and Spark. A droplet with 2 GB of memory and 2 CPUs is ideal, since that is the minimal requirement for a simulated cluster. It costs 20 dollars a month, but is still much cheaper than the $100 cost of the big data certificates ($50 for each course). I just need to write Python scripts to install the IPython notebook with SSL and download Spark and the course materials; a rough sketch of such a fabfile follows the links below.
  • The DevOps tool is Fabric and the fabfile is at GitHub.
  • The deployment pipeline is also at GitHub
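
Here is that sketch (assuming Fabric 1.x and an Ubuntu droplet; the package names, Spark version and paths are illustrative, and this is not the actual fabfile on GitHub):

# fabfile.py
from fabric.api import env, run, sudo

env.hosts = ['your-droplet-ip']
env.user = 'root'

def install_spark(version='1.4.0'):
    # system packages: Java for Spark, pip for IPython
    sudo('apt-get update && apt-get install -y openjdk-7-jdk python-pip')
    run('pip install "ipython[notebook]"')
    # download and unpack a prebuilt Spark distribution
    run('wget -q http://archive.apache.org/dist/spark/spark-{0}/'
        'spark-{0}-bin-hadoop2.6.tgz'.format(version))
    run('tar xzf spark-{0}-bin-hadoop2.6.tgz'.format(version))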

Friday, July 17, 2015

Transform SAS files to Parquet through Spark

The demo pipeline is at GitHub.
Since version 1.3, Spark has introduced the new DataFrame data structure. A data analyst can now easily scale out existing DataFrame-based code from Python or R to a cluster hosting Hadoop and Spark.
There are quite a few practical scenarios that DataFrame fits well. For example, many data files, including SAS files that are hard to read outside SAS, need to be merged into a single data store. Apache Parquet is a popular columnar store for distributed environments and is especially friendly to structured or semi-structured data, which makes it an ideal candidate for a universal data destination.
I copy three SAS files called prdsale, prdsal2 and prdsal3, which contain simulated sales records, from the SASHELP library to a Linux directory, and then launch the SQL context from Spark 1.4.
The three SAS files add up to 4.2MB. My overall strategy is to build a pipeline like SAS --> Python --> Spark --> Parquet.
import os
import pyspark
try:
    import sas7bdat
    import pandas
except ImportError:
    print('try to install the packages first')

print('Spark version is {}'.format(sc.version))

if type(sqlContext) != pyspark.sql.context.HiveContext:
    print('reset the Spark SQL context')

os.chdir('/root/playground')

def print_bytes(filename):
    print('{} has {:,} bytes'.format(filename, os.path.getsize(filename)))

print_bytes('prdsale.sas7bdat')
print_bytes('prdsal2.sas7bdat')
print_bytes('prdsal3.sas7bdat')

!du -ch --exclude=test_parquet

Spark version is 1.4.0
prdsale.sas7bdat has 148,480 bytes
prdsal2.sas7bdat has 2,790,400 bytes
prdsal3.sas7bdat has 1,401,856 bytes
4.2M    .
4.2M    total

1. Test DataFrame in Python and Spark

First I transform a SAS sas7bdat file into a pandas DataFrame. The great thing in Spark is that a pandas DataFrame can be translated into a Spark DataFrame by the createDataFrame method. Now I have two DataFrames: one pandas and one Spark.
with sas7bdat.SAS7BDAT('prdsale.sas7bdat') as f:
     pandas_df = f.to_data_frame()
print('-----Data in Pandas dataframe-----')
print(pandas_df.head())

print('-----Data in Spark dataframe-----')
spark_df = sqlContext.createDataFrame(pandas_df)
spark_df.show(5)

-----Data in Pandas dataframe-----
   ACTUAL COUNTRY   DIVISION  MONTH  PREDICT   PRODTYPE PRODUCT  QUARTER  \
0     925  CANADA  EDUCATION  12054      850  FURNITURE    SOFA        1   
1     999  CANADA  EDUCATION  12085      297  FURNITURE    SOFA        1   
2     608  CANADA  EDUCATION  12113      846  FURNITURE    SOFA        1   
3     642  CANADA  EDUCATION  12144      533  FURNITURE    SOFA        2   
4     656  CANADA  EDUCATION  12174      646  FURNITURE    SOFA        2   

  REGION  YEAR  
0   EAST  1993  
1   EAST  1993  
2   EAST  1993  
3   EAST  1993  
4   EAST  1993  
-----Data in Spark dataframe-----
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
|ACTUAL|COUNTRY| DIVISION|  MONTH|PREDICT| PRODTYPE|PRODUCT|QUARTER|REGION|  YEAR|
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
| 925.0| CANADA|EDUCATION|12054.0|  850.0|FURNITURE|   SOFA|    1.0|  EAST|1993.0|
| 999.0| CANADA|EDUCATION|12085.0|  297.0|FURNITURE|   SOFA|    1.0|  EAST|1993.0|
| 608.0| CANADA|EDUCATION|12113.0|  846.0|FURNITURE|   SOFA|    1.0|  EAST|1993.0|
| 642.0| CANADA|EDUCATION|12144.0|  533.0|FURNITURE|   SOFA|    2.0|  EAST|1993.0|
| 656.0| CANADA|EDUCATION|12174.0|  646.0|FURNITURE|   SOFA|    2.0|  EAST|1993.0|
+------+-------+---------+-------+-------+---------+-------+-------+------+------+
The two should have identical lengths. Here both show 1,440 rows.
print(len(pandas_df))
print(spark_df.count())

1440
1440

2. Automate the transformation

I write a pipeline function to automate the transformation. As a result, all three SAS files are saved to the same directory in Parquet format.
def sas_to_parquet(filelist, destination):
    """Save SAS file to parquet
    Args:
        filelist (list): the list of sas file names
        destination (str): the path for parquet
    Returns:
        None
    """
    rows = 0
    for i, filename in enumerate(filelist):
        with sas7bdat.SAS7BDAT(filename) as f:
            pandas_df = f.to_data_frame()
            rows += len(pandas_df)
        spark_df = sqlContext.createDataFrame(pandas_df)
        spark_df.save("{0}/key={1}".format(destination, i), "parquet")
    print('{0} rows have been transformed'.format(rows))

sasfiles = [x for x in os.listdir('.') if x[-9:] == '.sas7bdat']
print(sasfiles)

sas_to_parquet(sasfiles, '/root/playground/test_parquet')

['prdsale.sas7bdat', 'prdsal2.sas7bdat', 'prdsal3.sas7bdat']
36000 rows have been transformed
Then I read from the newly created Parquet data store. The query shows that the data has been successfully saved.
df = sqlContext.load("/root/playground/test_parquet", "parquet")
print(df.count())
df.filter(df.key == 0).show(5)

36000
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+
|ACTUAL|COUNTRY|COUNTY|DATE|  MONTH|PREDICT| PRODTYPE|PRODUCT|QUARTER|STATE|  YEAR|MONYR| DIVISION|REGION|key|
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+
| 925.0| CANADA|  null|null|12054.0|  850.0|FURNITURE|   SOFA|    1.0| null|1993.0| null|EDUCATION|  EAST|  0|
| 999.0| CANADA|  null|null|12085.0|  297.0|FURNITURE|   SOFA|    1.0| null|1993.0| null|EDUCATION|  EAST|  0|
| 608.0| CANADA|  null|null|12113.0|  846.0|FURNITURE|   SOFA|    1.0| null|1993.0| null|EDUCATION|  EAST|  0|
| 642.0| CANADA|  null|null|12144.0|  533.0|FURNITURE|   SOFA|    2.0| null|1993.0| null|EDUCATION|  EAST|  0|
| 656.0| CANADA|  null|null|12174.0|  646.0|FURNITURE|   SOFA|    2.0| null|1993.0| null|EDUCATION|  EAST|  0|
+------+-------+------+----+-------+-------+---------+-------+-------+-----+------+-----+---------+------+---+

3. Conclusion

There are multiple advantages to transforming data from various sources into Parquet.
  1. It is an open format that can be read and written by major software tools.
  2. It distributes well on HDFS.
  3. It compresses data.
For example, the original SAS files add up to 4.2 megabytes. As Parquet they weigh only 292KB, roughly a 14x compression ratio.
os.chdir('/root/playground/test_parquet/')
!du -ahc 

4.0K    ./key=2/._metadata.crc
4.0K    ./key=2/._SUCCESS.crc
0    ./key=2/_SUCCESS
4.0K    ./key=2/_common_metadata
4.0K    ./key=2/.part-r-00001.gz.parquet.crc
4.0K    ./key=2/._common_metadata.crc
4.0K    ./key=2/_metadata
60K    ./key=2/part-r-00001.gz.parquet
88K    ./key=2
4.0K    ./key=0/._metadata.crc
4.0K    ./key=0/._SUCCESS.crc
0    ./key=0/_SUCCESS
4.0K    ./key=0/_common_metadata
4.0K    ./key=0/.part-r-00001.gz.parquet.crc
4.0K    ./key=0/._common_metadata.crc
4.0K    ./key=0/_metadata
12K    ./key=0/part-r-00001.gz.parquet
40K    ./key=0
4.0K    ./key=1/._metadata.crc
4.0K    ./key=1/._SUCCESS.crc
0    ./key=1/_SUCCESS
4.0K    ./key=1/_common_metadata
4.0K    ./key=1/.part-r-00001.gz.parquet.crc
4.0K    ./key=1/._common_metadata.crc
4.0K    ./key=1/_metadata
132K    ./key=1/part-r-00001.gz.parquet
160K    ./key=1
292K    .
292K    total
A bar plot visualizes the significant size difference between the two formats: an order-of-magnitude space reduction.
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
index = np.arange(2)
bar_width = 0.35
data = [4200, 292]
header = ['SAS files', 'Parquet']
plt.bar(index, data)
plt.grid(b=True, which='major', axis='y')
plt.ylabel('File Size by KB')
plt.xticks(index + bar_width, header)
plt.show()

Friday, June 19, 2015

Two alternative ways to query large dataset in SAS

I really appreciate the wonderful comments on my SAS posts from the readers (1, 2, 3). They gave me a lot of inspiration. Due to the inherent limitations of SAS and SQL, I have recently found it difficult to deal with some extremely large SAS datasets (meaning that I have exhausted all the traditional approaches). Here I summarize two alternative solutions for these extreme cases as a follow-up to the comments.
  1. Read Directly
    • Use a scripting language such as Python to read SAS datasets directly
  2. Code Generator
    • Use SAS or other scripting languages to generate SAS/SQL code
The examples still use sashelp.class, which has 19 rows. The target variable is weight.
*In SAS
data class;
    set sashelp.class;
    row = _n_;
run;

Example 1: Find the median

SQL Query

In the comments, Anders Sköllermo wrote:
Hi! About 1. Calculate the median of a variable:
If you look at the details in the SQL code for calculation the median, then you find that the intermediate file is of size N*N obs, where N is the number of obs in the SAS data set.
So this is OK for very small files. But for a file with 10000 obs, you have an intermediate file of size 100 million obs. / Br Anders. Anders Sköllermo, Ph.D., Reuma and Neuro Data Analyst
The SQL query below is simple and pure, so it can be ported to any other SQL platform. However, just as Anders said, it is way too expensive.
*In SAS
proc sql;
   select avg(weight) as Median
   from (select e.weight
       from class e, class d
       group by e.weight
       having sum(case when e.weight = d.weight then 1 else 0 end)
          >= abs(sum(sign(e.weight - d.weight)))
    );
quit;

PROC UNIVARIATE

In the comment, Anonymous wrote:
I noticed the same thing - we tried this on one of our 'smaller' datasets (~2.9 million records), and it took forever.
Excellent solution, but maybe PROC UNIVARIATE will get you there faster on a large dataset.
Indeed, PROC UNIVARIATE, which utilizes SAS's built-in capabilities, is the best way in SAS to find the median.

Read Directly

When the extreme cases come, say SAS cannot even open the entire dataset, we may have to stream the sas7bdat file line by line. The sas7bdat format has been decoded in Java, R and Python, so theoretically we don't need SAS at all to query a SAS dataset.
A heap is an interesting data structure that easily finds a min or a max. To stream the values, we can build a max heap and a min heap in Python to cut the incoming stream into two halves. The algorithm looks like heap sort. The good news is that it only reads one variable at a time and thus saves a lot of space.
#In Python
import heapq
from sas7bdat import SAS7BDAT
class MedianStream(object):
    def __init__(self):
        self.first_half = [] # will be a max heap
        self.second_half = [] # will be a min heap, 1/2 chance has one more element
        self.N = 0

    def insert(self, x):
        # push into the max heap first, then move its largest element to
        # the min heap, so that second_half always holds the upper half
        heapq.heappush(self.first_half, -x)
        self.N += 1
        heapq.heappush(self.second_half, -heapq.heappop(self.first_half))
        # rebalance: second_half keeps at most one extra element
        if len(self.second_half) > len(self.first_half) + 1:
            heapq.heappush(self.first_half, -heapq.heappop(self.second_half))

    def show_median(self):
        if self.N == 0:
            raise IOError('please use the insert method first')
        elif self.N % 2 == 0:
            return (-self.first_half[0] + self.second_half[0]) / 2.0
        return self.second_half[0]

if __name__ == "__main__": 
    stream = MedianStream()
    with SAS7BDAT('class.sas7bdat') as infile:
        for i, line in enumerate(infile):
            if i == 0:
                continue
            stream.insert(float(line[-1]))
    print stream.show_median()

99.5

Example 2: Find top K by groups

SQL Query

The query below is very expensive: it contains an O(N^2) self-join plus an O(N log N) sort, for a terrible total time complexity of O(N^2 + N log N).
* In SAS
proc sql; 
    select a.sex, a.name, a.weight, (select count(distinct b.weight) 
            from class as b where b.weight >= a.weight and a.sex = b.sex ) as rank 
    from class as a
    where calculated rank <= 3
    order by sex, rank
;quit;

Code Generator

The overall idea is divide and conquer. If we generate SAS code from a scripting tool such as Python, we essentially get many small SAS code segments. For example, the SQL code below only sorts, so the time complexity drops to O(N log N).
# In Python
def create_sql(k, candidates):
    template = """
    proc sql outobs = {0};
    select *
    from {1}
    where sex = '{2}' 
    order by weight desc
    ;
    quit;"""
    for x in candidates:
        current = template.format(k, 'class', x)
        print current
if __name__ == "__main__":
    create_sql(3, ['M', 'F'])


    proc sql outobs = 3;
    select *
    from class
    where sex = 'M' 
    order by weight desc
    ;
    quit;

    proc sql outobs = 3;
    select *
    from class
    where sex = 'F' 
    order by weight desc
    ;
    quit;

Read Directly

This time we use the heap data structure in Python again. To find the top k rows for each group, we just need to keep a min heap of size k per group. With the smallest value popped out every time a heap overflows, we finally end up with the top k values for each group. The optimized time complexity is O(N log k).
#In Python
from sas7bdat import SAS7BDAT
from heapq import heappush, heappop

def get_top(k, sasfile):
    minheaps = [[], []]
    sexes = ['M', 'F']
    with SAS7BDAT(sasfile) as infile:
        for i, row in enumerate(infile):
            if i == 0:
                continue
            sex, weight = row[1], row[-1]
            i = sexes.index(sex)
            current = minheaps[i]
            heappush(current, (weight, row))
            if len(current) > k:
                heappop(current)
    for x in minheaps:
        for _, y in x:
            print y

if __name__ == "__main__":
    get_top(3, 'class.sas7bdat')


[u'Robert', u'M', 12.0, 64.8, 128.0]
[u'Ronald', u'M', 15.0, 67.0, 133.0]
[u'Philip', u'M', 16.0, 72.0, 150.0]
[u'Carol', u'F', 14.0, 62.8, 102.5]
[u'Mary', u'F', 15.0, 66.5, 112.0]
[u'Janet', u'F', 15.0, 62.5, 112.5]

Example 3: Find the Moving Window Maximum

In my daily work, I often need three statistics over a moving window: mean, max, and min. The sheer data size poses challenges.
In his blog post, Liang Xie showed three advanced approaches to calculating moving averages: PROC EXPAND, the DATA step, and PROC SQL. Apparently PROC EXPAND is the winner throughout the comparison. The conclusion is that self-joining is very expensive, always O(N^2), and we should avoid it as much as possible.
Finding the max or the min is somewhat different from finding the mean: for the mean only the running value needs to be remembered, while for the max/min the positions of past extremes also have to be remembered.

Code Generator

The strategy is very straightforward: we slide over the table, pick three rows at a time, and calculate the maximum of each window. The time complexity is O(k*N). The generated SAS code is very lengthy, but the machine feels perfectly comfortable reading it.
In addition, if we want to save the results, we could insert those maximums into an empty table.
# In Python
def create_sql(k, N):
    template = """
    select max(weight)
    from class
    where row in ({0})
    ;"""
    SQL = ""
    for x in range(1, N - k + 2):
        current = map(str, range(x, x + k))
        SQL += template.format(','.join(current))
    print "proc sql;" + SQL + "quit;"
if __name__ == "__main__":
    create_sql(3, 19)


proc sql;
    select max(weight)
    from class
    where row in (1,2,3)
    ;
    select max(weight)
    from class
    where row in (2,3,4)
    ;
    select max(weight)
    from class
    where row in (3,4,5)
    ;
    select max(weight)
    from class
    where row in (4,5,6)
    ;
    select max(weight)
    from class
    where row in (5,6,7)
    ;
    select max(weight)
    from class
    where row in (6,7,8)
    ;
    select max(weight)
    from class
    where row in (7,8,9)
    ;
    select max(weight)
    from class
    where row in (8,9,10)
    ;
    select max(weight)
    from class
    where row in (9,10,11)
    ;
    select max(weight)
    from class
    where row in (10,11,12)
    ;
    select max(weight)
    from class
    where row in (11,12,13)
    ;
    select max(weight)
    from class
    where row in (12,13,14)
    ;
    select max(weight)
    from class
    where row in (13,14,15)
    ;
    select max(weight)
    from class
    where row in (14,15,16)
    ;
    select max(weight)
    from class
    where row in (15,16,17)
    ;
    select max(weight)
    from class
    where row in (16,17,18)
    ;
    select max(weight)
    from class
    where row in (17,18,19)
    ;quit;

Read Directly

Again, if we want to decrease the time complexity further, say to O(N), we have to use a better data structure, such as a double-ended queue. SAS doesn't have one, so we switch to Python. The implementation below has two loops that add up to O(2N), but each element enters and leaves the deque at most once, so it is still better than the other methods.
# In Python
from sas7bdat import SAS7BDAT
from collections import deque

def maxSlidingWindow(A, w):
    N = len(A)
    ans =[0] * (N - w + 1)
    myqueue = deque()
    for i in range(w):
        while myqueue and A[i] >= A[myqueue[-1]]:
            myqueue.pop()
        myqueue.append(i)
    for i in range(w, N):
        ans[i - w] = A[myqueue[0]]
        while myqueue and A[i] >= A[myqueue[-1]]:
            myqueue.pop()
        while myqueue and myqueue[0] <= i-w:
            myqueue.popleft()
        myqueue.append(i)
    ans[-1] = A[myqueue[0]]
    return ans

if __name__ == "__main__":
    weights = []
    with SAS7BDAT('class.sas7bdat') as infile:
        for i, row in enumerate(infile):
            if i == 0:
                continue
            weights.append(float(row[-1]))

    print maxSlidingWindow(weights, 3)

[112.5, 102.5, 102.5, 102.5, 102.5, 112.5, 112.5, 112.5, 99.5, 99.5, 90.0, 112.0, 150.0, 150.0, 150.0, 133.0, 133.0]

Conclusion

As data keeps expanding, we should increasingly consider three things -
  • Time complexity: we don't want a job to run for weeks.
  • Space complexity: we don't want to overflow the memory.
  • Clean code: colleagues should be able to easily read and modify the code.
