Thursday, January 8, 2015

Spark practice(4): malicious web attack

Suppose a website tracks user activity to guard against robotic attacks. Design an algorithm that identifies the user IDs with more than 500 clicks within any 10-minute window.
sample.txt (columns: anonymousUserID timeStamp clickCount)
123    9:45am    10
234    9:46am    12
234    9:50am    20
456    9:53am    100
123    9:55am    33
456    9:56am    312
123    10:03am    110
123    10:16am    312
234    10:20am    201
456    10:23am    180
123    10:25am    393
456    10:27am    112
999    12:21pm    888

Thought

This is a typical stream-processing problem. The key is to slide a fixed-length window over the time-ordered data, total the clicks per ID inside the window, and return the IDs whose total exceeds the threshold.
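To pin down what "within any 10 minutes" means before optimizing, here is a brute-force reference, offered only as a sketch. It assumes the window is inclusive and that it is enough to check the windows ending at each record, since any 10-minute interval is covered by the window ending at its last record. The names SAMPLE, minutes and brute_force_ids are made up for this illustration.

```python
from datetime import datetime

# The sample data from above, embedded so the sketch is self-contained.
SAMPLE = """\
123 9:45am 10
234 9:46am 12
234 9:50am 20
456 9:53am 100
123 9:55am 33
456 9:56am 312
123 10:03am 110
123 10:16am 312
234 10:20am 201
456 10:23am 180
123 10:25am 393
456 10:27am 112
999 12:21pm 888
"""

def minutes(stamp):
    # '9:45am' -> minutes since midnight (assumes all records share one day)
    t = datetime.strptime(stamp, '%I:%M%p')
    return t.hour * 60 + t.minute

def brute_force_ids(text, duration=10, maxcnt=500):
    rows = []
    for line in text.splitlines():
        if line.strip():
            uid, stamp, clicks = line.split()
            rows.append((uid, minutes(stamp), int(clicks)))
    flagged = set()
    # For every record, total the same ID's clicks in the window ending there.
    for uid, end, _ in rows:
        total = sum(c for u, t, c in rows if u == uid and 0 <= end - t <= duration)
        if total > maxcnt:
            flagged.add(uid)
    return flagged

print(sorted(brute_force_ids(SAMPLE)))  # ['123', '999']
```

This is quadratic in the number of records, so it is only useful as a cross-check for a faster solution on small inputs.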

Single machine solution

Two data structures are used: a queue and a hash table. The queue scans the data, which is assumed to be sorted by time, and keeps only the entries within the 10-minute window: each time a new entry is appended, the entries that have fallen out of the window are popped from the front. The hash table holds the click total per ID for the entries currently in the queue and is updated whenever the queue changes. Any ID whose total exceeds 500 is added to a set.
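from collections import deque
from datetime import datetime

def get_minute(s, fmt = '%I:%M%p'):
    # Parse a 12-hour clock string such as '9:45am' into minutes since midnight
    t = datetime.strptime(s, fmt)
    return t.hour * 60 + t.minute

def get_diff(s1, s2):
    # Minutes elapsed from s1 to s2; assumes both fall on the same day
    return get_minute(s2) - get_minute(s1)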

def find_ids(infile, duration, maxcnt):
    # queue: entries inside the current window; htable: clicks per ID in the
    # window; ans: IDs that exceeded maxcnt at some point
    queue, htable, ans = deque(), {}, set()
    with open(infile, 'rt') as _infile:
        for l in _infile:
            line = l.split()
            if not line:
                continue  # skip blank lines
            line[2] = int(line[2])
            current_id, current_time, current_clk = line
            htable[current_id] = htable.get(current_id, 0) + current_clk
            queue.append(line)
            # Evict entries more than `duration` minutes older than the current one
            while queue and get_diff(queue[0][1], current_time) > duration:
                past_id, _, past_clk = queue.popleft()
                htable[past_id] -= past_clk
            if htable[current_id] > maxcnt:
                ans.add(current_id)
    return ans

if __name__ == "__main__":
    print(find_ids('sample.txt', 10, 500))

Cluster solution

The newest Spark release (version 1.2.0) adds Python support for Spark Streaming. However, the documentation is still scarce, so it remains to be seen whether this problem can be solved with the new API.
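Until then, one way the work might be split across machines is to group the records by ID and run an independent sliding window per ID. The sketch below is plain Python, not Spark code; it only illustrates that per-key decomposition, with the grouping step standing in for what a key-based shuffle (for example an RDD groupByKey) would do. All names here (SAMPLE_RECORDS, to_minutes, flag_one_id, find_ids_by_key) are hypothetical.

```python
from collections import defaultdict, deque
from datetime import datetime

# The sample data as (id, timestamp, clicks) tuples.
SAMPLE_RECORDS = [
    ('123', '9:45am', 10), ('234', '9:46am', 12), ('234', '9:50am', 20),
    ('456', '9:53am', 100), ('123', '9:55am', 33), ('456', '9:56am', 312),
    ('123', '10:03am', 110), ('123', '10:16am', 312), ('234', '10:20am', 201),
    ('456', '10:23am', 180), ('123', '10:25am', 393), ('456', '10:27am', 112),
    ('999', '12:21pm', 888),
]

def to_minutes(stamp):
    # '9:45am' -> minutes since midnight (assumes a single day of data)
    t = datetime.strptime(stamp, '%I:%M%p')
    return t.hour * 60 + t.minute

def flag_one_id(events, duration, maxcnt):
    # events: (minute, clicks) pairs for ONE id; sorted here so that the
    # arrival order after grouping does not matter
    window, total = deque(), 0
    for minute, clicks in sorted(events):
        window.append((minute, clicks))
        total += clicks
        # Drop events that are more than `duration` minutes old
        while minute - window[0][0] > duration:
            total -= window.popleft()[1]
        if total > maxcnt:
            return True
    return False

def find_ids_by_key(records, duration=10, maxcnt=500):
    # "Shuffle" step: collect each id's events together
    grouped = defaultdict(list)
    for user_id, stamp, clicks in records:
        grouped[user_id].append((to_minutes(stamp), int(clicks)))
    # Each group can be checked independently, so this could run in parallel
    return {uid for uid, events in grouped.items()
            if flag_one_id(events, duration, maxcnt)}

print(sorted(find_ids_by_key(SAMPLE_RECORDS)))  # ['123', '999']
```

Because each ID is checked in isolation, no global queue or shared hash table is needed, at the cost of holding all events of one ID together on a single worker.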
To be continued
