Tuesday, December 23, 2014

Spark practice (3): clean and sort Social Security numbers

Requirements:
1. Separate the valid SSNs from the invalid ones
2. Count the number of valid SSNs

Sample.txt
402-94-7709 
283-90-3049 
124-01-2425 
1231232
088-57-9593 
905-60-3585 
44-82-8341
257581087
327-84-0220
402-94-7709

Thoughts

SSN-indexed data is common and is stored in many kinds of file systems. The trick to sorting it quickly on Spark is to build a numerical key for each record and use the sortByKey operator. In addition, an accumulator provides a shared variable that tasks on every machine in the cluster can add to and that the driver can read back, which is especially useful for counting records.
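
To make the numerical key concrete, here is a minimal sketch in plain Python (no Spark needed). The helper name `ssn_key` and the variable names `area`, `group` and `serial` are introduced here for illustration only; the arithmetic is the same as in the solutions in this post. It also shows why sorting on the raw strings is not enough once hyphenated and unhyphenated forms are mixed.

```python
def ssn_key(ssn):
    """Return the nine-digit integer for a valid SSN string, else None."""
    parts = ssn.strip().split('-')
    # Form 1: nine digits with no hyphens
    if len(parts) == 1 and len(parts[0]) == 9 and parts[0].isdigit():
        return int(parts[0])
    # Form 2: digit groups of length 3, 2 and 4 joined by hyphens
    if [len(p) for p in parts] == [3, 2, 4] and all(p.isdigit() for p in parts):
        area, group, serial = (int(p) for p in parts)
        return 1000000 * area + 10000 * group + serial
    return None

mixed = ['402-94-7709', '402100000', '088-57-9593']
by_string = sorted(mixed)            # '-' sorts before '1', so the order is wrong
by_key = sorted(mixed, key=ssn_key)  # numeric order, independent of the format

print(ssn_key('402-94-7709'))  # 402947709
print(by_string)               # ['088-57-9593', '402-94-7709', '402100000']
print(by_key)                  # ['088-57-9593', '402100000', '402-94-7709']
```

The key is just the nine digits read as one integer, so both spellings of the same SSN map to the same key, and sortByKey can compare integers instead of strings.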

Single machine solution

#!/usr/bin/env python
# coding=utf-8
htable = {}  # numeric key -> first SSN string seen with that key
valid_cnt = 0
with open('sample.txt') as infile, open('sample_bad.txt', 'w') as outfile:
    for l in infile:
        l = l.strip()
        nums = l.split('-')
        key = -1
        # Form 1: nine digits with no hyphens
        if l.isdigit() and len(l) == 9:
            key = int(l)
        # Form 2: digit groups of length 3, 2 and 4 joined by hyphens
        if [len(n) for n in nums] == [3, 2, 4] and all(n.isdigit() for n in nums):
            key = 1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2])
        if key == -1:
            outfile.write(l + '\n')
        elif key not in htable:
            # Count each distinct SSN only once
            htable[key] = l
            valid_cnt += 1

# Write the valid SSNs in ascending numeric order
with open('sample_sorted.txt', 'w') as outfile:
    for x in sorted(htable):
        outfile.write(htable[x] + '\n')

print(valid_cnt)

Cluster solution

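#!/usr/bin/env python
# coding=utf-8
import pyspark
sc = pyspark.SparkContext()
valid_cnt = sc.accumulator(0)

def is_validSSN(l):
    # Valid forms: nine digits, or digit groups of length 3-2-4 joined by hyphens
    nums = l.split('-')
    cdn1 = (l.isdigit() and len(l) == 9)
    cdn2 = ([len(n) for n in nums] == [3, 2, 4] and all(n.isdigit() for n in nums))
    return cdn1 or cdn2

def set_key(l):
    # Pair each valid SSN with its numeric key for sortByKey
    if len(l) == 9:
        return (int(l), l)
    nums = l.split('-')
    return (1000000*int(nums[0]) + 10000*int(nums[1]) + int(nums[2]), l)

def count_valid(_):
    valid_cnt.add(1)

# Strip whitespace first so that distinct() treats '402-94-7709 ' and
# '402-94-7709' as the same record
rdd = sc.textFile('sample.txt').map(lambda x: x.strip())
rdd1 = rdd.filter(lambda x: not is_validSSN(x))

rdd2 = rdd.filter(is_validSSN).distinct() \
    .map(set_key) \
    .sortByKey().map(lambda x: x[1]).cache()

# Update the accumulator inside an action (foreach): updates made inside a
# transformation such as map can be applied more than once when the RDD is
# recomputed, which would inflate the count
rdd2.foreach(count_valid)

for x in rdd1.collect():
    print('Invalid SSN\t{}'.format(x))

for x in rdd2.collect():
    print('valid SSN\t{}'.format(x))

print('\nNumber of valid SSN is {}'.format(valid_cnt.value))

# Save RDD to file system
rdd1.saveAsTextFile('sample_bad')
rdd2.saveAsTextFile('sample_sorted')
sc.stop()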
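
As a cross-check for either solution, the expected result for the sample file can be worked out without Spark. This is only a sketch, assuming the ten sample lines listed at the top of the post are the whole input. The names `SSN_RE`, `sample_lines` and `expected_results` are introduced here for illustration, and the regular expression is a swapped-in alternative to the split-based test used in the post, not the method the solutions use.

```python
import re

# Nine digits, or digit groups of length 3-2-4 joined by hyphens
SSN_RE = re.compile(r'^(?:\d{9}|\d{3}-\d{2}-\d{4})$')

# The sample data, including the trailing spaces on some lines
sample_lines = [
    '402-94-7709 ', '283-90-3049 ', '124-01-2425 ', '1231232',
    '088-57-9593 ', '905-60-3585 ', '44-82-8341', '257581087',
    '327-84-0220', '402-94-7709',
]

def expected_results(lines):
    """Return (valid SSNs in numeric order without duplicates, invalid lines)."""
    valid, invalid = {}, []
    for line in lines:
        ssn = line.strip()
        if SSN_RE.match(ssn):
            # Keep the first spelling seen for each numeric key
            valid.setdefault(int(ssn.replace('-', '')), ssn)
        else:
            invalid.append(ssn)
    ordered = [valid[k] for k in sorted(valid)]
    return ordered, invalid

ordered, invalid = expected_results(sample_lines)
print(len(ordered))  # 7
print(ordered)
print(invalid)       # ['1231232', '44-82-8341']
```

For the sample data this gives seven valid SSNs, starting with 088-57-9593 and ending with 905-60-3585, and two invalid lines. The repeated 402-94-7709 is counted once.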

Good math, bad engineering

As a former statistician and a current engineer, I feel that a successful engineering project may require both the mathematician’s abilit...