Friday, February 17, 2017

Good math, bad engineering


As a former statistician and a current engineer, I feel that a successful engineering project may require both the mathematician’s ability to find the abstraction and the engineer’s ability to find the implementation.

For a typical engineering problem, the steps are usually -
1. Abstract the problem into a formula or some pseudocode
2. Solve the problem with the formula
3. Iterate on the initial solution until it reaches the optimal time and space complexity

I feel that a mathematician would like dynamic programming (DP) questions the most, because they are very similar to the typical deduction questions in math. An engineer may find them challenging, since they need imagination and some mathematical sense.

The formula is the most important part: without it, trial and error or debugging does not help. Once the formula is figured out, the rest becomes a piece of cake. However, sometimes things are not that straightforward. Good mathematics does not always lead to good engineering.

Let’s see one question from Leetcode.

You are given a list of non-negative integers, a1, a2, ..., an, and a target, S. Now you have 2 symbols + and -. For each integer, you should choose one from + and - as its new symbol.

Find out how many ways to assign symbols to make sum of integers equal to target S.

Example 1:
Input: nums is [1, 1, 1, 1, 1], S is 3. 
Output: 5
Explanation: 

-1+1+1+1+1 = 3
+1-1+1+1+1 = 3
+1+1-1+1+1 = 3
+1+1+1-1+1 = 3
+1+1+1+1-1 = 3

There are 5 ways to assign symbols to make the sum of nums be target 3.

1. The quick solution

Each element of the list has two options: plus or minus. So the question asks how many of all the possible paths reach a specific number. Of course, if the target cannot be reached at all (for example, it is larger than the sum of all the numbers), the answer is simply 0.

Sounds exactly like a DP question. With a pencil and paper, we can start to explore the relationship between dp(n) and dp(n-1). For example, say our goal is a sum of 5 and we are given the list [1, 1, 1, 1, 1]. If the smaller tuple/list is (1, 1, 1, 1) and some paths sum to 4, that is exactly what we want, since adding 1 gives 5. Similarly, paths that sum to 6 are fine as well, since subtracting 1 also gives 5. We simply add the two counts together, since there are only two choices for the last element.

The formula is dp(n, s) = dp(n-1, s-x) + dp(n-1, s+x), where n is the size of the list, s is the target sum and x is the element added to the previous list. OK, the second step is easy.
def findTargetSumWays_1(nums, S):
    """
    :type nums: Tuple[int]
    :type S: int
    :rtype: int
    """
    if not nums:
        if S == 0:
            return 1
        else:
            return 0
    return findTargetSumWays_1(nums[1:], S+nums[0]) + findTargetSumWays_1(nums[1:], S-nums[0]) 

small_test_nums = (1, 1, 1, 1, 1)
small_test_S = 3

%time findTargetSumWays_1(small_test_nums, small_test_S)
It is theoretically correct and works perfectly with small test cases. But we know that it is going to be a nightmare for an engineering application, because it has a hefty time complexity of O(2^N). So the math part is done, and we have to move to the third step.

2. The third step that is hard

So we need to find a data structure to record all the paths. For the Fibonacci number problem, a simple linear data structure like a list slashes O(2^N) to O(N).

But the hard part is deciding which data structure to use here. Since a lookup in a hash table is O(1) on average, a rolling dictionary will help record the previous states. However, a Python dictionary cannot have keys added or removed while it is being iterated over, so we have to build a new dictionary at each step and replace the old one manually. The overall set of paths looks like a tree structure. So the ideal solution will be like -

def findTargetSumWays_2(nums, S):
    if not nums:
        return 0
    dic = {nums[0]: 1, -nums[0]: 1} if nums[0] != 0 else {0: 2}
    for i in range(1, len(nums)):
        tdic = {}
        for d in dic:
            tdic[d + nums[i]] = tdic.get(d + nums[i], 0) + dic.get(d, 0)
            tdic[d - nums[i]] = tdic.get(d - nums[i], 0) + dic.get(d, 0)
        dic = tdic
    return dic.get(S, 0)

big_test_nums = tuple(range(100))
big_test_S = sum(range(88))
%time findTargetSumWays_2(big_test_nums, big_test_S)

The time is exactly what we need. However, the code is not elegant and is hard to understand.
CPU times: user 189 ms, sys: 4.77 ms, total: 194 ms
Wall time: 192 ms
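
For readers who find the rolling dictionary hard to follow, the same idea can be written a little more compactly. This is only a sketch of a variant, not the notebook's code: the name find_target_sum_ways_table is made up here, and it starts from the empty prefix {0: 1}, so the special case for a leading zero disappears. As a consequence it returns 1 rather than 0 for an empty list with S = 0, which matches the recursive version.

```python
from collections import defaultdict


def find_target_sum_ways_table(nums, S):
    """Count sign assignments reaching S with a rolling table of partial sums."""
    # counts maps "partial sum" -> "number of ways to reach it so far".
    counts = {0: 1}  # the empty prefix sums to 0 in exactly one way
    for x in nums:
        next_counts = defaultdict(int)
        for total, ways in counts.items():
            next_counts[total + x] += ways  # put a plus sign in front of x
            next_counts[total - x] += ways  # put a minus sign in front of x
        counts = next_counts  # replace the table instead of mutating it in the loop
    return counts.get(S, 0)


print(find_target_sum_ways_table((1, 1, 1, 1, 1), 3))
```

Each pass touches every distinct partial sum once, so the work is bounded by the number of elements times the number of distinct sums instead of 2^N.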

3. Finally the easy solution

If we don’t want things to get complicated, all we need here is a cache, and Python 3 provides the lru_cache decorator in functools. Decorating the first solution with it then quickly solves the problem.

from functools import lru_cache

@lru_cache(10000000)
def findTargetSumWays_3(nums, S):
    if not nums:
        if S == 0:
            return 1
        else:
            return 0
    return findTargetSumWays_3(nums[1:], S+nums[0]) + findTargetSumWays_3(nums[1:], S-nums[0]) 

%time findTargetSumWays_3(big_test_nums, big_test_S)

CPU times: user 658 ms, sys: 19.7 ms, total: 677 ms
Wall time: 680 ms
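
One language detail worth noting in the cached version: nums[1:] builds a new tuple on every call, and the whole tuple becomes part of the cache key. A possible refinement, sketched below under the assumption that the list does not change during the computation, is to cache on the pair (position, remaining target) instead. The function name find_target_sum_ways_indexed is hypothetical, and no timing is claimed for it here.

```python
from functools import lru_cache


def find_target_sum_ways_indexed(nums, S):
    """Same recurrence as the recursive solution, memoized on (index, remaining)."""
    nums = tuple(nums)

    @lru_cache(maxsize=None)
    def count(i, remaining):
        # All elements consumed: one valid way only if nothing is left to reach.
        if i == len(nums):
            return 1 if remaining == 0 else 0
        # Same two branches as the original: S + nums[0] and S - nums[0].
        return count(i + 1, remaining + nums[i]) + count(i + 1, remaining - nums[i])

    return count(0, S)


print(find_target_sum_ways_indexed((1, 1, 1, 1, 1), 3))
```

The cache keys are now two small integers, so the number of cached states is bounded by the list length times the range of reachable sums.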

Conclusion

Good math cannot solve all engineering problems. It has to be combined with the details of the language, the application and the system to avoid a bad engineering implementation.

The Jupyter notebook is on GitHub. If you have any comments, please email me at wm@sasanalysis.com.

Thursday, February 16, 2017

Use Elasticsearch and Kibana for large BI system



Nowadays Elasticsearch is more and more popular. Besides its original search functionality, I found Elasticsearch can be

  1. used as a logging container. That is what the ELK stack was created for.
  2. utilized as a JSON server with rich APIs, which can be combined with Kibana to serve as a BI server.

This is the data store I see every day

  • 10 PB of stored data
  • an average of 30 TB of incoming data every day
  • various data sources, including binary files such as PDF
  • very complicated SQL queries (fortunately no stored procedures)
  • millions of JSON documents created daily

People want to know what is going on with such data. So a business intelligence or OLAP system is needed to visualize/aggregate the data and its flow. Since Elasticsearch is so easy to scale out, I find it a better fit here than the other big data solutions on the market.

1. Batch Worker

There are many options to implement a batch worker. In the end the decision came down to either Spring Data Batch or writing a library from scratch in Python.

1.1 Spring Data Batch vs. Python

Spring Data Batch
  • Pros:
    • is a full framework that includes rollback, notification and scheduler features
    • provides great design patterns around Dependency Injection, such as factory and singleton, which help multiple people work together. For example -
@Bean
public Step IndexMySQLJob01() {
   return stepBuilderFactory.get("IndexMySQLJob01")
           .<Data, Data> chunk(10)
           .reader(reader())
           .processor(processor())
           .writer(writer())
           .build();
}
Python
  • Pros:
    • less code; flexible
    • super easy to go from a dictionary to JSON
    • has official/3rd party libraries for everything
  • Cons:
    • you create your own library/framework
    • if a pattern like Spring Data Batch's is adopted, dependencies have to be injected manually, such as -
class IndexMySQLJob01(object):
     def __init__(self, reader, processor, writer, listener):
          self.reader = reader
          self.processor = processor
          self.writer = writer
          self.listener = listener
     ...

Eventually Python was picked, because the overall scenario is more algorithm-bound than language-bound.

1.2 The algorithms

Since the data size is pretty big, time and space are always considered. The direct way to decrease the time complexity is to use hash tables, as long as the memory can hold the data. For example, a join between an N-row table and an M-row table can be optimized from O(M*N) to O(M+N): building the hash table on one side takes O(N), and probing it with the other side takes O(M).

To save space, a generator chain is used to stream data from the start to the end, instead of materializing sizable objects.

class JsonTask01(object):
    ...
    def get_json(self, generator1, hashtable1):
        for each_dict in generator1:
            key = each_dict.get('key')
            # fall back to an empty dict so a missing key does not raise TypeError
            each_dict.update(hashtable1.get(key, {}))
            yield each_dict
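
To make the hash join and the generator chain concrete, here is a minimal self-contained sketch. The function names (build_lookup, hash_join) and the sample rows are hypothetical and only illustrate the technique; they are not the production library.

```python
def build_lookup(rows, key):
    """Build the hash table once: O(N) time, and it must fit in memory."""
    return {row[key]: row for row in rows}


def hash_join(left_rows, lookup, key):
    """Stream the left side and probe the hash table: O(M) time, O(1) extra space."""
    for row in left_rows:
        match = lookup.get(row.get(key))
        if match is not None:
            # Merge the two rows into a new dict, leaving the inputs untouched.
            yield {**row, **match}


# Hypothetical sample data.
customers = [{"cid": 1, "name": "Ann"}, {"cid": 2, "name": "Bob"}]
orders = ({"oid": i, "cid": c} for i, c in enumerate([1, 2, 1, 3]))  # a generator

lookup = build_lookup(customers, "cid")
joined = list(hash_join(orders, lookup, "cid"))
print(joined)
```

Only the smaller table is materialized as a dictionary; the larger side flows through the generator one row at a time.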

1.3 The scheduler

A scheduler is a must: cron is enough for simple tasks, while a bigger system requires a workflow. Airflow is the one that helps organize and schedule. It has a web UI and is written in Python, so it is easy to integrate with the batch worker.

1.4 High availability with zero downtime

Indexing a large quantity of data has a significant impact on the cluster. For mission-critical indexes that need 100% uptime, a zero-downtime approach is implemented: we keep two copies of an index for maximum safety, and the alias switches between the two copies once the indexing is finished.

    def add_alias(self, idx):
        LOGGER.warn("The alias {} will point to {}.".format(self.index, idx))
        self.es.indices.put_alias(idx, self.index)

    def delete_alias(self, idx):
        LOGGER.warn("The alias {} will be removed from {}.".format(self.index, idx))
        self.es.indices.delete_alias(idx, self.index)
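
The two methods above add and remove the alias in separate calls. Elasticsearch's _aliases endpoint also accepts a list of actions that is applied atomically, so the switch can be expressed as one request. The sketch below only builds that request body; the helper name build_alias_swap and the index names are hypothetical.

```python
def build_alias_swap(alias, old_index, new_index):
    """Return an _aliases request body that moves `alias` from old_index to new_index."""
    return {
        "actions": [
            {"remove": {"index": old_index, "alias": alias}},
            {"add": {"index": new_index, "alias": alias}},
        ]
    }


# Hypothetical names: two physical copies behind one alias.
body = build_alias_swap("big_data_index", "big_data_index_a", "big_data_index_b")
print(body)
```

Assuming es is an official Python client instance, the body could then be sent with es.indices.update_aliases(body=body), so that readers never see a moment where the alias points at nothing.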

2. Elasticsearch Cluster

2.1 Three kinds of nodes

An Elasticsearch node can take one of three roles: master node, data node and ingest node (previously called client node). It is common to see a single node serve as master, ingest and data all together. For a large system, it is always helpful to assign the three roles to different machines/VMs, so that once a node goes down or comes back up, failover or recovery is quicker.
elasticsearch-head can clearly visualize the data transfer process of the shards once an accident occurs.

As the number of cluster nodes increases, the deployment becomes painful. I feel the best tool so far is ansible-elasticsearch. With ansible-playbook -i hosts ./your-playbook.yml -c paramiko, the cluster is up and running.

2.2 Memory is cheap but heap is expensive

The rules of thumb for Elasticsearch are -

Give (less than) Half Your Memory to Lucene

Don’t Cross 32 GB!

This leads to an awkward situation: if a machine has more than 64 GB of memory, the additional memory means nothing to a single Elasticsearch instance. It actually makes sense to run two or more Elasticsearch instances side by side to make full use of the hardware. For example, on a machine with 96 GB of memory, we can allocate 31 GB to an ingest node, 31 GB to a data node and leave the rest for the OS. However, two data nodes on a single machine will compete for disk IO, which damages performance, while a master node and a data node together will increase the risk of downtime.
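
The two rules of thumb can be turned into simple arithmetic. The helper below is a rough sketch: the name suggested_heap_gb is made up, and the 31 GB cap is taken from the example above as a safe value under the 32 GB limit.

```python
def suggested_heap_gb(ram_gb, max_heap_gb=31):
    """At most half of the RAM for the heap, and never across the 32 GB line."""
    return min(ram_gb // 2, max_heap_gb)


for ram_gb in (32, 64, 96):
    print(ram_gb, "GB RAM ->", suggested_heap_gb(ram_gb), "GB heap")

# The 96 GB example: two 31 GB heaps leave 34 GB for the OS and the file system cache.
left_for_os_gb = 96 - 2 * 31
print(left_for_os_gb)
```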

The great thing about Elasticsearch is that it provides rich REST APIs, such as http://localhost:9200/_nodes/stats?pretty. We could use X-Pack (paid) or other customized tools to monitor them. I feel that the three most important statistics for the heap, and therefore for performance, are -

The heap usage and the old GC duration

The two statistics are intertwined. High heap usage, such as 75%, will trigger a GC, while a GC under high heap usage will take longer. We have to keep both numbers as low as possible.

     "jvm" : {
        "mem" : {
          "heap_used_percent" : 89,
          ... 
        "gc" : {
          "collectors" : {
            ...
            "old" : {
              "collection_count" : 225835,
              "collection_time_in_millis" : 22624857
            }
          }
        }
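
A customized monitor can reduce this part of the response to two numbers. The sketch below works on a dictionary shaped like the excerpt above and reuses its values; the function name summarize_jvm is hypothetical, and the 75% alert threshold is an assumption borrowed from the paragraph above.

```python
def summarize_jvm(node_stats, heap_alert_percent=75):
    """Extract heap usage and the average old-generation GC pause from one node's stats."""
    jvm = node_stats["jvm"]
    old = jvm["gc"]["collectors"]["old"]
    count = old["collection_count"]
    # Average pause per old GC in milliseconds; guard against a node with no GC yet.
    avg_old_gc_ms = old["collection_time_in_millis"] / count if count else 0.0
    heap = jvm["mem"]["heap_used_percent"]
    return {
        "heap_used_percent": heap,
        "heap_alert": heap >= heap_alert_percent,
        "avg_old_gc_ms": avg_old_gc_ms,
    }


# Values copied from the excerpt above.
sample = {
    "jvm": {
        "mem": {"heap_used_percent": 89},
        "gc": {"collectors": {"old": {
            "collection_count": 225835,
            "collection_time_in_millis": 22624857,
        }}},
    }
}
summary = summarize_jvm(sample)
print(summary)
```

For the excerpt, the heap is above the threshold and each old GC takes roughly 100 ms on average.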
Thread pools

Each thread pool reports three kinds of counters: active, queue, and rejected. It is useful to visualize how they change in real time. Once there are a lot of queued or rejected threads, it is a good time to think about scaling up or scaling out.

"thread_pool" : {
        "bulk" : {
          "threads" : 4,
          "queue" : 0,
          "active" : 0,
          "rejected" : 0,
          "largest" : 4,
          "completed" : 53680
        },
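
The same kind of check can be applied to the thread pools. In this sketch the function name pools_under_pressure and the max_queue threshold are assumptions; the bulk entry is copied from the excerpt above, while the search entry is invented to show a pool in trouble.

```python
def pools_under_pressure(thread_pool_stats, max_queue=0):
    """Return the names of pools that have rejected work or a queue above max_queue."""
    return sorted(
        name
        for name, stats in thread_pool_stats.items()
        if stats.get("rejected", 0) > 0 or stats.get("queue", 0) > max_queue
    )


thread_pool = {
    # Copied from the excerpt above: a healthy pool.
    "bulk": {"threads": 4, "queue": 0, "active": 0, "rejected": 0,
             "largest": 4, "completed": 53680},
    # Hypothetical values for illustration only.
    "search": {"threads": 7, "queue": 120, "active": 7, "rejected": 15,
               "largest": 7, "completed": 1000},
}
print(pools_under_pressure(thread_pool))
```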
The segments’ number/size

The segments are the in-memory inverted indexes corresponding to the indexes on the hard disk. They stay resident in physical memory, and GC has no effect on them. Each segment has a footprint on every search thread, so the size of the segments is important: it is multiplied by a factor of the number of threads.

        "segments" : {
          "count" : 215,
          "memory_in_bytes" : 15084680,
        },

The number of shards actually controls the number of segments: as the shards increase, the size of the segments decreases and the number of segments increases. So we cannot increase the number of shards as much as we want. If there are many small segments, the heap usage will become much higher. The solution is a force merge, which is time-consuming but effective.

3. Kibana as BI dashboard

3.1 Plugins

Kibana integrates DevTools (previously the Sense UI) for free. DevTools has code assistance and is a powerful tool for debugging. If the budget is not an issue, X-Pack is also highly recommended. As for Elasticsearch, since 5.0 ingest-geoip is a plugin. We have to add it to the Ansible YAML, such as -

es_plugins:
    - plugin: ingest-geoip

3.2 Instant Aggregation

There are quite a few KPIs that need system-wide terms aggregations. From 5.0 on, the request cache is enabled by default for all requests with size:0.
For example -

POST /big_data_index/data/_search
{   "size": 0,
    "query": {
        "bool": {
            "must_not": {
                "exists": {
                    "field": "interesting_field"
                }
            }
        }
    }
}

The force merge mentioned above, such as POST /_forcemerge?max_num_segments=1, will combine the segments and dramatically increase the aggregation speed.

3.3 Proxy

Nginx is possibly the best proxy to put in front of Kibana. There are two advantages: first, the proxy can cache the static resources of Kibana; second, we can always check the Nginx logs to figure out what causes problems for Kibana.

Conclusion

Elasticsearch and Kibana together provide high availability and high scalability for a large BI system.

If you have any comments, please email me at wm@sasanalysis.com.
