Friday, December 12, 2014

Spark practice (2): query text using SQL

In a class of a few children, use SQL to find those who are male and weigh over 100.
class.txt (columns: Name, Sex, Age, Height, Weight)
Alfred M 14 69.0 112.5 
Alice F 13 56.5 84.0 
Barbara F 13 65.3 98.0 
Carol F 14 62.8 102.5 
Henry M 14 63.5 102.5 
James M 12 57.3 83.0 
Jane F 12 59.8 84.5 
Janet F 15 62.5 112.5 
Jeffrey M 13 62.5 84.0 
John M 12 59.0 99.5 
Joyce F 11 51.3 50.5 
Judy F 14 64.3 90.0 
Louise F 12 56.3 77.0 
Mary F 15 66.5 112.0 
Philip M 16 72.0 150.0 
Robert M 12 64.8 128.0 
Ronald M 15 67.0 133.0 
Thomas M 11 57.5 85.0 
William M 15 66.5 112.0

Thoughts

The challenge is to transform unstructured data into structured data. A schema, including column names and types, has to be applied to the plain text before the syntax of SQL is able to query it.
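To make this concrete, turning one whitespace-delimited line into a typed record might look like the minimal sketch below (plain Python; the sample row is taken from class.txt above):

```python
# One raw line from class.txt (unstructured text)
raw = 'Alfred M 14 69.0 112.5'

# Applying the schema: name each column and cast to a numeric type
fields = raw.split()
record = {'Name': fields[0], 'Sex': fields[1], 'Age': int(fields[2]),
          'Height': float(fields[3]), 'Weight': float(fields[4])}
# record['Weight'] is now the number 112.5, not the string '112.5',
# so a comparison such as Weight > 100 behaves numerically
```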

Single machine solution

This is straightforward with Python's built-in sqlite3 module.
import sqlite3

# An in-memory database is enough for 19 rows
conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute("""CREATE TABLE class
             (Name text, Sex text, Age real, Height real, Weight real)""")

# Each whitespace-delimited line supplies the five columns in order
with open('class.txt') as infile:
    for l in infile:
        c.execute('INSERT INTO class VALUES (?,?,?,?,?)', l.split())
conn.commit()

for x in c.execute("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100"):
    print x
conn.close()
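The cutoff can also be bound as a parameter instead of being hard-coded into the SQL string, which keeps the query reusable. A small sketch of the same idea, with a few of the rows inlined so it runs without class.txt:

```python
import sqlite3

# A subset of the class data, already typed
rows = [('Alfred', 'M', 14, 69.0, 112.5),
        ('Henry', 'M', 14, 63.5, 102.5),
        ('James', 'M', 12, 57.3, 83.0)]

conn = sqlite3.connect(':memory:')
c = conn.cursor()
c.execute("""CREATE TABLE class
             (Name text, Sex text, Age real, Height real, Weight real)""")
c.executemany('INSERT INTO class VALUES (?,?,?,?,?)', rows)

# Bind sex and weight cutoff as parameters rather than string literals
heavy = c.execute('SELECT Name FROM class WHERE Sex = ? AND Weight > ?',
                  ('M', 100)).fetchall()
conn.close()
```

With these three rows, `heavy` contains Alfred and Henry but not James.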

Cluster solution

Spark SQL (here through HiveContext) can seamlessly query semi-structured data such as JSON. Dumping the text as JSON files on the file system is therefore the first step.
import json
import os
import shutil
from pyspark import SparkContext
from pyspark.sql import HiveContext

sc = SparkContext()
hiveCtx = HiveContext(sc)

def trans(x):
    # Apply the schema: name the columns and cast to numeric types
    return {'Name': x[0], 'Sex': x[1], 'Age': int(x[2]),
            'Height': float(x[3]), 'Weight': float(x[4])}

# Remove the JSON output directory if it exists; saveAsTextFile
# refuses to overwrite an existing directory
if os.path.exists('class-output'):
    shutil.rmtree('class-output')

rdd = sc.textFile('class.txt')
rdd1 = rdd.map(lambda x: x.split()).map(trans)
rdd1.map(json.dumps).saveAsTextFile('class-output')

# Point jsonFile at the whole directory so every part file is read
infile = hiveCtx.jsonFile('class-output')
infile.registerTempTable('class')

query = hiveCtx.sql("SELECT * FROM class WHERE Sex = 'M' AND Weight > 100")
for x in query.collect():
    print x

sc.stop()
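The round trip works because json.dumps turns each record into one self-describing line, from which Spark SQL can infer both column names and types. The serialization step can be checked outside Spark with plain Python (one sample row from the file):

```python
import json

row = {'Name': 'Alfred', 'Sex': 'M', 'Age': 14,
       'Height': 69.0, 'Weight': 112.5}
line = json.dumps(row)    # one JSON object per output line
back = json.loads(line)   # names and numeric types survive the round trip
```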
In conclusion, JSON is worth considering whenever SQL-style queries are desired on Spark.
