From 5babe88f332d30b2dee6da8fa7ada33eb2f9fc19 Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Tue, 1 Dec 2015 16:15:08 -0800 Subject: [PATCH 1/4] work --- python/pdspark/converter.py | 83 +++++++++++++++++++++++++++++++ python/pdspark/converter_tests.py | 45 +++++++++++++++++ python/pdspark/test_common.py | 15 ++++++ python/pdspark/tests.py | 5 +- 4 files changed, 146 insertions(+), 2 deletions(-) create mode 100644 python/pdspark/converter_tests.py create mode 100644 python/pdspark/test_common.py diff --git a/python/pdspark/converter.py b/python/pdspark/converter.py index 7afa8d8..4a6c350 100644 --- a/python/pdspark/converter.py +++ b/python/pdspark/converter.py @@ -15,6 +15,7 @@ from pyspark.ml.regression import LinearRegressionModel from pyspark.mllib.linalg import DenseVector, SparseVector, Vectors, VectorUDT from pyspark.sql.functions import udf +from pyspark.sql import SQLContext from udt import CSRVectorUDT from util import _new_java_obj, _randomUID @@ -36,6 +37,7 @@ def __init__(self, sc): :param sc: SparkContext """ self.sc = sc + self.sqlContext = SQLContext(self.sc) # For conversions sklearn -> Spark self._skl2spark_classes = { SKL_LogisticRegression : @@ -161,3 +163,84 @@ def toScipy(self, X): else: raise TypeError("Converter.toScipy expected numpy.ndarray of" " scipy.sparse.csr.csr_matrix instances, but found: %s" % type(X)) + + @staticmethod + def _analyze_element(x): + if type(x) is float: + return (x, np.double) + if type(x) is int: + return (x, np.int) + if type(x) is long: + return (x, np.long) + if type(x) is DenseVector: + return (x.toArray(), (np.double, len(x.toArray()))) + # TODO(tjh) support sparse arrays + raise ValueError("The type %s could not be understood. Element was %s" % (type(x), x)) + + @staticmethod + def _analyze_df(df): + """ Converts a dataframe into a numpy array. + """ + rows = df.collect() + conversions = [[Converter._analyze_element(x) for x in row] for row in rows] + types = [t for d, t in conversions[0]] + data = [tuple([d for d, t in labeled_elts]) for labeled_elts in conversions] + names = list(df.columns) + dt = np.dtype({'names': names, 'formats': types}) + arr = np.array(data, dtype=dt) + return arr + + def pack_DataFrame(self, **args): + """ Converts a set of numpy arrays into a single dataframe. + + The argument name is used to infer the name of the column. The columns may not be in + the order they are provided. Each column needs to have the same number of elements. + + Example: + + >>> X = np.zeros((10,4)) + >>> y = np.ones(10) + >>> df = conv.packDataFrame(x = X, y = y) + >>> df.printSchema() + root + |-- y: double (nullable = true) + |-- x: vector (nullable = true) + """ + def convert(z): + if len(z.shape) == 1: + return z.tolist() + if len(z.shape) == 2: + return [Vectors.dense(row) for row in z.tolist()] + assert False, (z.shape) + pairs = [(name, convert(data)) for (name, data) in args.items()] + vecs = zip(*[data for (_, data) in pairs]) + names = [name for (name, _) in pairs] + return self.sqlContext.createDataFrame(vecs, names) + + @staticmethod + def df_to_numpy(df, *args): + """ Converts a dataframe into a (local) numpy array. Each column is named after the same + column name in the data frame. + + The varargs provide (in order) the list of columns to extract from the dataframe. + If none are provided, all the columns from the dataframe are extracted. + + This method only handles basic numerical types, or dense vectors with the same length. + + Note: it is not particularly optimized, do not push it too hard. 
+ + Example: + >>> z = dataFrameColumn(df) + >>> z['x'].dtype, z['x'].shape + >>> z = dataFrameColumn(df, 'y') + >>> z['y'].dtype, z['y'].shape + """ + column_names = df.columns + if not args: + args = column_names + column_nameset = set(column_names) + for name in args: + assert name in column_nameset, (name, column_names) + # Just get the interesting columns + projected = df.select(*args) + return Converter._analyze_df(projected) \ No newline at end of file diff --git a/python/pdspark/converter_tests.py b/python/pdspark/converter_tests.py new file mode 100644 index 0000000..5ab44c5 --- /dev/null +++ b/python/pdspark/converter_tests.py @@ -0,0 +1,45 @@ +import sklearn +import numpy as np +import numpy.random as rd +import unittest + +from .test_common import get_context +from pdspark import Converter + +sc = get_context() + +n = 5 +A = rd.rand(n,4) +B = rd.rand(n) +C = rd.randint(10, size=n) + +class MLlibTestCase(unittest.TestCase): + + def setUp(self): + self.sc = sc + self.conv = Converter(self.sc) + self.n = 5 + + def test_pack(self): + df = self.conv.pack_DataFrame(a=A, b=B, c=C) + dt = dict(df.dtypes) + assert dt == {'a':'vector', 'b': 'double', 'c': 'bigint'}, dt + df.collect() # Force creation + + def test_unpack(self): + df = self.conv.pack_DataFrame(a=A, b=B, c=C) + Z = Converter.df_to_numpy(df) + + assert np.all(Z['a'] == A), (Z['a'], A) + assert np.all(Z['b'] == B), (Z['B'], B) + assert np.all(Z['c'] == C), (Z['c'], C) + assert Z['c'].dtype == C.dtype + + def test_unpack_select(self): + df = self.conv.pack_DataFrame(a=A, b=B, c=C) + Z = Converter.df_to_numpy(df, 'a', 'c') + + assert np.all(Z['a'] == A), (Z['a'], A) + assert np.all(Z['c'] == C), (Z['c'], C) + assert 'b' not in Z.dtype.fields + diff --git a/python/pdspark/test_common.py b/python/pdspark/test_common.py new file mode 100644 index 0000000..a2d3d5d --- /dev/null +++ b/python/pdspark/test_common.py @@ -0,0 +1,15 @@ +""" +Common variables for all tests. 
+""" +from pyspark import SparkContext + +__all__ = ['get_context'] + +_sc = None + +def get_context(): + global _sc + if not _sc: + _sc = SparkContext('local[4]', "spark-sklearn tests") + return _sc + diff --git a/python/pdspark/tests.py b/python/pdspark/tests.py index 693de08..27b602b 100644 --- a/python/pdspark/tests.py +++ b/python/pdspark/tests.py @@ -23,7 +23,7 @@ from sklearn.grid_search import GridSearchCV as SKL_GridSearchCV from sklearn.pipeline import Pipeline as SKL_Pipeline -from pyspark import SparkContext + from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel from pyspark.ml.evaluation import RegressionEvaluator @@ -35,8 +35,9 @@ from pdspark.converter import Converter from pdspark.grid_search import GridSearchCV +from .test_common import get_context -sc = SparkContext('local[4]', "spark-sklearn tests") +sc = get_context() class MLlibTestCase(unittest.TestCase): def setUp(self): From 41ed092bdbd76f948cfddecd9d50b44e17773471 Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Wed, 2 Dec 2015 10:39:14 -0800 Subject: [PATCH 2/4] comments --- python/pdspark/converter.py | 12 ++++++------ python/pdspark/converter_tests.py | 15 ++++++++++----- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/python/pdspark/converter.py b/python/pdspark/converter.py index 4a6c350..00d4a8b 100644 --- a/python/pdspark/converter.py +++ b/python/pdspark/converter.py @@ -190,7 +190,7 @@ def _analyze_df(df): arr = np.array(data, dtype=dt) return arr - def pack_DataFrame(self, **args): + def pack_DataFrame(self, **kwargs): """ Converts a set of numpy arrays into a single dataframe. The argument name is used to infer the name of the column. The columns may not be in @@ -200,7 +200,7 @@ def pack_DataFrame(self, **args): >>> X = np.zeros((10,4)) >>> y = np.ones(10) - >>> df = conv.packDataFrame(x = X, y = y) + >>> df = conv.pack_DataFrame(x = X, y = y) >>> df.printSchema() root |-- y: double (nullable = true) @@ -212,7 +212,7 @@ def convert(z): if len(z.shape) == 2: return [Vectors.dense(row) for row in z.tolist()] assert False, (z.shape) - pairs = [(name, convert(data)) for (name, data) in args.items()] + pairs = [(name, convert(data)) for (name, data) in kwargs.items()] vecs = zip(*[data for (_, data) in pairs]) names = [name for (name, _) in pairs] return self.sqlContext.createDataFrame(vecs, names) @@ -230,9 +230,9 @@ def df_to_numpy(df, *args): Note: it is not particularly optimized, do not push it too hard. 
Example: - >>> z = dataFrameColumn(df) + >>> z = conv.df_to_numpy(df) >>> z['x'].dtype, z['x'].shape - >>> z = dataFrameColumn(df, 'y') + >>> z = conv.df_to_numpy(df, 'y') >>> z['y'].dtype, z['y'].shape """ column_names = df.columns @@ -243,4 +243,4 @@ def df_to_numpy(df, *args): assert name in column_nameset, (name, column_names) # Just get the interesting columns projected = df.select(*args) - return Converter._analyze_df(projected) \ No newline at end of file + return Converter._analyze_df(projected) diff --git a/python/pdspark/converter_tests.py b/python/pdspark/converter_tests.py index 5ab44c5..cb8b653 100644 --- a/python/pdspark/converter_tests.py +++ b/python/pdspark/converter_tests.py @@ -1,10 +1,9 @@ -import sklearn import numpy as np import numpy.random as rd import unittest -from .test_common import get_context from pdspark import Converter +from pdspark.test_common import get_context sc = get_context() @@ -16,22 +15,28 @@ class MLlibTestCase(unittest.TestCase): def setUp(self): + super(MLlibTestCase, self).setUp() self.sc = sc self.conv = Converter(self.sc) - self.n = 5 def test_pack(self): df = self.conv.pack_DataFrame(a=A, b=B, c=C) dt = dict(df.dtypes) assert dt == {'a':'vector', 'b': 'double', 'c': 'bigint'}, dt - df.collect() # Force creation + z = df.collect() + assert len(z) == n + for row in z: + assert len(row) == 3, row + assert row['a'] is not None, row + assert row['b'] is not None, row + assert row['c'] is not None, row def test_unpack(self): df = self.conv.pack_DataFrame(a=A, b=B, c=C) Z = Converter.df_to_numpy(df) assert np.all(Z['a'] == A), (Z['a'], A) - assert np.all(Z['b'] == B), (Z['B'], B) + assert np.all(Z['b'] == B), (Z['b'], B) assert np.all(Z['c'] == C), (Z['c'], C) assert Z['c'].dtype == C.dtype From ac0a0b605cea3f64582de547ab15ea7300bdfaff Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Mon, 21 Dec 2015 13:51:57 -0800 Subject: [PATCH 3/4] comments --- python/spark_sklearn/converter.py | 24 +++++++++++++++---- .../converter_np_test.py} | 16 ++++++------- 2 files changed, 27 insertions(+), 13 deletions(-) rename python/{pdspark/converter_tests.py => spark_sklearn/converter_np_test.py} (75%) diff --git a/python/spark_sklearn/converter.py b/python/spark_sklearn/converter.py index 00d4a8b..e1d8c42 100644 --- a/python/spark_sklearn/converter.py +++ b/python/spark_sklearn/converter.py @@ -190,17 +190,26 @@ def _analyze_df(df): arr = np.array(data, dtype=dt) return arr - def pack_DataFrame(self, **kwargs): + def numpy_to_df(self, **kwargs): """ Converts a set of numpy arrays into a single dataframe. - The argument name is used to infer the name of the column. The columns may not be in - the order they are provided. Each column needs to have the same number of elements. + The argument name is used to infer the name of the column. The value of the argument is a + numpy array with a shape of length 0, 1, or 2. The dtype is one of the data types supported + by sparkSQL. This includes np.double, np.float, np.int and np.long. + See the whole list of supported types in the Spark SQL documentation: + http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types + + The columns may not be in the order they are provided. Each column needs to have the same + number of elements. + + :return: A pyspark.sql.DataFrame object, or raises a ValueError if the data type could not + be understood. 
Example: >>> X = np.zeros((10,4)) >>> y = np.ones(10) - >>> df = conv.pack_DataFrame(x = X, y = y) + >>> df = conv.numpy_to_df(x = X, y = y) >>> df.printSchema() root |-- y: double (nullable = true) @@ -211,7 +220,7 @@ def convert(z): return z.tolist() if len(z.shape) == 2: return [Vectors.dense(row) for row in z.tolist()] - assert False, (z.shape) + raise ValueError("Cannot convert a numpy array with more than 2 dimensions") pairs = [(name, convert(data)) for (name, data) in kwargs.items()] vecs = zip(*[data for (_, data) in pairs]) names = [name for (name, _) in pairs] @@ -229,6 +238,10 @@ def df_to_numpy(df, *args): Note: it is not particularly optimized, do not push it too hard. + :param df: a pyspark.sql.DataFrame object + :param args: a list of strings that are column names in the dataframe + :return: a structured numpy array with the content of the data frame. + Example: >>> z = conv.df_to_numpy(df) >>> z['x'].dtype, z['x'].shape @@ -243,4 +256,5 @@ def df_to_numpy(df, *args): assert name in column_nameset, (name, column_names) # Just get the interesting columns projected = df.select(*args) + return Converter._analyze_df(projected) diff --git a/python/pdspark/converter_tests.py b/python/spark_sklearn/converter_np_test.py similarity index 75% rename from python/pdspark/converter_tests.py rename to python/spark_sklearn/converter_np_test.py index cb8b653..c8ecf9b 100644 --- a/python/pdspark/converter_tests.py +++ b/python/spark_sklearn/converter_np_test.py @@ -2,25 +2,25 @@ import numpy.random as rd import unittest -from pdspark import Converter -from pdspark.test_common import get_context +from .converter import Converter +from .test_utils import create_sc -sc = get_context() +sc = create_sc() n = 5 A = rd.rand(n,4) B = rd.rand(n) C = rd.randint(10, size=n) -class MLlibTestCase(unittest.TestCase): +class NumpyConverterTestCase(unittest.TestCase): def setUp(self): - super(MLlibTestCase, self).setUp() + super(NumpyConverterTestCase, self).setUp() self.sc = sc self.conv = Converter(self.sc) def test_pack(self): - df = self.conv.pack_DataFrame(a=A, b=B, c=C) + df = self.conv.numpy_to_df(a=A, b=B, c=C) dt = dict(df.dtypes) assert dt == {'a':'vector', 'b': 'double', 'c': 'bigint'}, dt z = df.collect() @@ -32,7 +32,7 @@ def test_pack(self): assert row['c'] is not None, row def test_unpack(self): - df = self.conv.pack_DataFrame(a=A, b=B, c=C) + df = self.conv.numpy_to_df(a=A, b=B, c=C) Z = Converter.df_to_numpy(df) assert np.all(Z['a'] == A), (Z['a'], A) @@ -41,7 +41,7 @@ def test_unpack(self): assert Z['c'].dtype == C.dtype def test_unpack_select(self): - df = self.conv.pack_DataFrame(a=A, b=B, c=C) + df = self.conv.numpy_to_df(a=A, b=B, c=C) Z = Converter.df_to_numpy(df, 'a', 'c') assert np.all(Z['a'] == A), (Z['a'], A) From 38ae150629829e79102a7e4b2dde79f38681281f Mon Sep 17 00:00:00 2001 From: Timothy Hunter Date: Mon, 21 Dec 2015 13:52:57 -0800 Subject: [PATCH 4/4] removing old code --- python/pdspark/test_common.py | 15 -- python/pdspark/tests.py | 354 ---------------------------------- 2 files changed, 369 deletions(-) delete mode 100644 python/pdspark/test_common.py delete mode 100644 python/pdspark/tests.py diff --git a/python/pdspark/test_common.py b/python/pdspark/test_common.py deleted file mode 100644 index a2d3d5d..0000000 --- a/python/pdspark/test_common.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -Common variables for all tests. 
-""" -from pyspark import SparkContext - -__all__ = ['get_context'] - -_sc = None - -def get_context(): - global _sc - if not _sc: - _sc = SparkContext('local[4]', "spark-sklearn tests") - return _sc - diff --git a/python/pdspark/tests.py b/python/pdspark/tests.py deleted file mode 100644 index 27b602b..0000000 --- a/python/pdspark/tests.py +++ /dev/null @@ -1,354 +0,0 @@ - -import sys - -if sys.version_info[:2] <= (2, 6): - try: - import unittest2 as unittest - except ImportError: - sys.stderr.write('Please install unittest2 to test with Python 2.6 or earlier') - sys.exit(1) -else: - import unittest - -import numpy as np -import pandas -import scipy -from scipy.sparse import csr_matrix - -from sklearn.linear_model import Lasso as SKL_Lasso -from sklearn.linear_model import LogisticRegression as SKL_LogisticRegression -from sklearn.linear_model import LinearRegression as SKL_LinearRegression -from sklearn.feature_extraction.text import HashingVectorizer as SKL_HashingVectorizer -from sklearn.feature_extraction.text import TfidfTransformer as SKL_TfidfTransformer -from sklearn.grid_search import GridSearchCV as SKL_GridSearchCV -from sklearn.pipeline import Pipeline as SKL_Pipeline - - -from pyspark.ml import Pipeline -from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel -from pyspark.ml.evaluation import RegressionEvaluator -from pyspark.ml.feature import HashingTF, Tokenizer -from pyspark.ml.regression import LinearRegression, LinearRegressionModel -from pyspark.ml.tuning import ParamGridBuilder, CrossValidator -from pyspark.mllib.linalg import Vectors -from pyspark.sql import SQLContext - -from pdspark.converter import Converter -from pdspark.grid_search import GridSearchCV -from .test_common import get_context - -sc = get_context() - -class MLlibTestCase(unittest.TestCase): - def setUp(self): - self.sc = sc - self.sql = SQLContext(sc) - self.X = np.array([[1,2,3], - [-1,2,3], [1,-2,3], [1,2,-3], - [-1,-2,3], [1,-2,-3], [-1,2,-3], - [-1,-2,-3]]) - self.y = np.array([1, 0, 1, 1, 0, 1, 0, 0]) - data = [(float(self.y[i]), Vectors.dense(self.X[i])) for i in range(len(self.y))] - self.df = self.sql.createDataFrame(data, ["label", "features"]) - - @staticmethod - def list2csr(x): - """ - Convert list to a scipy.sparse.csr_matrix - :param data: list - :return: csr_matrix with 1 row - """ - return csr_matrix((np.array(x), np.array(range(0, len(x))), np.array([0, len(x)]))) - - -class ConverterTests(MLlibTestCase): - - def setUp(self): - super(ConverterTests, self).setUp() - self.converter = Converter(self.sc) - - def _compare_GLMs(self, skl, spark): - """ Compare weights, intercept of sklearn, Spark GLMs - """ - skl_weights = Vectors.dense(skl.coef_.flatten()) - self.assertEqual(skl_weights, spark.weights) - self.assertEqual(skl.intercept_, spark.intercept) - - def test_LogisticRegression_skl2spark(self): - skl_lr = SKL_LogisticRegression().fit(self.X, self.y) - lr = self.converter.toSpark(skl_lr) - self.assertTrue(isinstance(lr, LogisticRegressionModel), - "Expected LogisticRegressionModel but found type %s" % type(lr)) - self._compare_GLMs(skl_lr, lr) - - def test_LinearRegression_skl2spark(self): - skl_lr = SKL_LinearRegression().fit(self.X, self.y) - lr = self.converter.toSpark(skl_lr) - self.assertTrue(isinstance(lr, LinearRegressionModel), - "Expected LinearRegressionModel but found type %s" % type(lr)) - self._compare_GLMs(skl_lr, lr) - - def test_LogisticRegression_spark2skl(self): - lr = LogisticRegression().fit(self.df) - skl_lr = 
self.converter.toSKLearn(lr) - self.assertTrue(isinstance(skl_lr, SKL_LogisticRegression), - "Expected sklearn LogisticRegression but found type %s" % type(skl_lr)) - self._compare_GLMs(skl_lr, lr) - - def test_LinearRegression_spark2skl(self): - lr = LinearRegression().fit(self.df) - skl_lr = self.converter.toSKLearn(lr) - self.assertTrue(isinstance(skl_lr, SKL_LinearRegression), - "Expected sklearn LinearRegression but found type %s" % type(skl_lr)) - self._compare_GLMs(skl_lr, lr) - - def ztest_toPandas(self): - data = [(Vectors.dense([0.1, 0.2]),), - (Vectors.sparse(2, {0:0.3, 1:0.4}),), - (Vectors.sparse(2, {0:0.5, 1:0.6}),)] - df = self.sql.createDataFrame(data, ["features"]) - self.assertEqual(df.count(), 3) - pd = self.converter.toPandas(df) - self.assertEqual(len(pd), 3) - self.assertTrue(isinstance(pd.features[0], csr_matrix), - "Expected pd.features[0] to be csr_matrix but found: %s" % - type(pd.features[0])) - self.assertEqual(pd.features[0].shape[0], 3) - self.assertEqual(pd.features[0].shape[1], 2) - self.assertEqual(pd.features[0][0,0], 0.1) - self.assertEqual(pd.features[0][0,1], 0.2) - - -class CSRVectorUDTTests(MLlibTestCase): - - def test_scipy_sparse(self): - data = [(self.list2csr([0.1, 0.2]),)] - df = self.sql.createDataFrame(data, ["features"]) - self.assertEqual(df.count(), 1) - pd = df.toPandas() - self.assertEqual(len(pd), 1) - self.assertTrue(isinstance(pd.features[0], csr_matrix), - "Expected pd.features[0] to be csr_matrix but found: %s" % - type(pd.features[0])) - self.assertEqual(pd.features[0].shape[0], 1) - self.assertEqual(pd.features[0].shape[1], 2) - self.assertEqual(pd.features.values[0][0,0], 0.1) - self.assertEqual(pd.features.values[0][0,1], 0.2) - - -class CVTests(MLlibTestCase): - - def setUp(self): - super(CVTests, self).setUp() - self.converter = Converter(self.sc) - - def test_cv_linreg(self): - pipeline = SKL_Pipeline([ - ('lasso', SKL_Lasso(max_iter=1)) - ]) - parameters = { - 'lasso__alpha': (0.001, 0.005, 0.01) - } - grid_search = GridSearchCV(self.sc, pipeline, parameters) - X = scipy.sparse.vstack(map(lambda x: self.list2csr([x, x+1.0]), range(0, 100))) - y = np.array(list(range(0, 100))).reshape((100,1)) - skl_gs = grid_search.fit(X, y) - assert isinstance(skl_gs, SKL_GridSearchCV),\ - "GridSearchCV expected git to return a scikit-learn GridSearchCV instance," \ - " but found type %s" % type(skl_gs) - assert len(skl_gs.grid_scores_) == len(parameters['lasso__alpha']) - # TODO - for gs in skl_gs.grid_scores_: - pass # assert(gs.) - - def test_cv_pipeline(self): - pipeline = SKL_Pipeline([ - ('vect', SKL_HashingVectorizer(n_features=20)), - ('tfidf', SKL_TfidfTransformer(use_idf=False)), - ('lasso', SKL_Lasso(max_iter=1)) - ]) - parameters = { - 'lasso__alpha': (0.001, 0.005, 0.01) - } - grid_search = GridSearchCV(self.sc, pipeline, parameters) - data = [('hi there', 0.0), - ('what is up', 1.0), - ('huh', 1.0), - ('now is the time', 5.0), - ('for what', 0.0), - ('the spark was there', 5.0), - ('and so', 3.0), - ('were many socks', 0.0), - ('really', 1.0), - ('too cool', 2.0)] - df = self.sql.createDataFrame(data, ["review", "rating"]).toPandas() - skl_gs = grid_search.fit(df.review.values, df.rating.values) - assert isinstance(skl_gs, SKL_GridSearchCV), \ - "GridSearchCV expected git to return a scikit-learn GridSearchCV instance," \ - " but found type %s" % type(skl_gs) - assert len(skl_gs.grid_scores_) == len(parameters['lasso__alpha']) - # TODO - for gs in skl_gs.grid_scores_: - pass # assert(gs.) 
- - def test_cv_lasso_with_mllib_featurization(self): - data = [('hi there', 0.0), - ('what is up', 1.0), - ('huh', 1.0), - ('now is the time', 5.0), - ('for what', 0.0), - ('the spark was there', 5.0), - ('and so', 3.0), - ('were many socks', 0.0), - ('really', 1.0), - ('too cool', 2.0)] - data = self.sql.createDataFrame(data, ["review", "rating"]) - - # Feature extraction using MLlib - tokenizer = Tokenizer(inputCol="review", outputCol="words") - hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=20000) - pipeline = Pipeline(stages=[tokenizer, hashingTF]) - data = pipeline.fit(data).transform(data) - - df = self.converter.toPandas(data.select(data.features.alias("review"), "rating")) - - pipeline = SKL_Pipeline([ - ('lasso', SKL_Lasso(max_iter=1)) - ]) - parameters = { - 'lasso__alpha': (0.001, 0.005, 0.01) - } - - grid_search = GridSearchCV(self.sc, pipeline, parameters) - skl_gs = grid_search.fit(df.review.values, df.rating.values) - - assert isinstance(skl_gs, SKL_GridSearchCV), \ - "GridSearchCV expected git to return a scikit-learn GridSearchCV instance," \ - " but found type %s" % type(skl_gs) - assert len(skl_gs.grid_scores_) == len(parameters['lasso__alpha']) - # TODO - for gs in skl_gs.grid_scores_: - pass # assert(gs.) - - def test_demo(self): - print "\n========================" - print "DEMO PART 1" - print "========================\n" - trainFilepath = "/Users/josephkb/Desktop/demotest" - data = self.sql.read.format("json").load(trainFilepath) - df = data.toPandas() - print "%d reviews" % len(df) - - reviews = df.review.values - ratings = df.rating.values - # Train scikit-learn model - pipeline = SKL_Pipeline([ - ('vect', SKL_HashingVectorizer(n_features=100)), - ('tfidf', SKL_TfidfTransformer(use_idf=False)), - ('lasso', SKL_Lasso(max_iter=2)), - ]) - parameters = { - 'lasso__alpha': (0.001, 0.005, 0.01) - } - - nfolds = 3 - - grid_search = SKL_GridSearchCV(pipeline, parameters, cv=nfolds) - grid_search.fit(reviews, ratings) - print("Best (training) R^2 score: %g" % grid_search.best_score_) - pipeline = grid_search.best_estimator_ - - r2 = pipeline.score(reviews, ratings) - print "Training data R^2 score: %g" % r2 - - test = self.sql.read.format("json").load(trainFilepath).toPandas() - r2 = pipeline.score(test.review.values, test.rating.values) - print "Test data R^2 score: %g" % r2 - - predictions = pipeline.predict(test.review.values) - pdPredictions = pandas.DataFrame(predictions) - sparkPredictions = self.sql.createDataFrame(pdPredictions) - print "sparkPredictions.count(): %d" % sparkPredictions.count() - - print "\n========================" - print "DEMO PART 2" - print "========================\n" - - grid_search = GridSearchCV(self.sc, pipeline, parameters, cv=nfolds) - grid_search.fit(reviews, ratings) - grid_search = grid_search.sklearn_model_ - print("Best (training) R^2 score: %g" % grid_search.best_score_) - pipeline = grid_search.best_estimator_ - - r2 = pipeline.score(reviews, ratings) - print "Training data R^2 score: %g" % r2 - - test = self.sql.read.format("json").load(trainFilepath).toPandas() - r2 = pipeline.score(test.review.values, test.rating.values) - print "Test data R^2 score: %g" % r2 - - predictions = pipeline.predict(test.review.values) - pdPredictions = pandas.DataFrame(predictions) - sparkPredictions = self.sql.createDataFrame(pdPredictions) - print "sparkPredictions.count(): %d" % sparkPredictions.count() - - print "\n========================" - print "DEMO PART 3" - print "========================\n" - - print "%d 
reviews" % data.count() - - # Define a pipeline combining text feature extractors - tokenizer = Tokenizer(inputCol="review", outputCol="words") - hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=100) - pipeline = Pipeline(stages=[tokenizer, hashingTF]) - data2 = pipeline.fit(data).transform(data) - - df = self.converter.toPandas(data2.select(data2.features.alias("review"), "rating")) - - reviews = self.converter.toScipy(df.review.values) - ratings = df.rating.values - - pipeline = SKL_Pipeline([ - ('lasso', SKL_Lasso(max_iter=2)), - ]) - - grid_search = GridSearchCV(self.sc, pipeline, parameters, cv=nfolds) - grid_search.fit(reviews, ratings) - grid_search = grid_search.sklearn_model_ - print("Best (training) R^2 score: %g" % grid_search.best_score_) - pipeline = grid_search.best_estimator_ - - r2 = pipeline.score(reviews, ratings) - print "Training data R^2 score: %g" % r2 - - # Skip testing for this part of demo - - print "\n========================" - print "DEMO PART 4" - print "========================\n" - - # Define a pipeline combining a text feature extractor with a simple regressor - tokenizer = Tokenizer(inputCol="review", outputCol="words") - hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=100) - lasso = LinearRegression(labelCol="rating", elasticNetParam=1.0, maxIter=2) - pipeline = Pipeline(stages=[tokenizer, hashingTF, lasso]) - - paramGrid = ParamGridBuilder() \ - .addGrid(lasso.regParam, [0.001, 0.005, 0.01]) \ - .build() - - evaluator = RegressionEvaluator(labelCol="rating", metricName="r2") - - cv = CrossValidator(estimator=pipeline, evaluator=evaluator, estimatorParamMaps=paramGrid) - - cvModel = cv.fit(data) - - test = self.sql.read.format("json").load(trainFilepath) - r2 = evaluator.evaluate(cvModel.transform(test)) - print "Test data R^2 score: %g" % r2 - - -if __name__ == "__main__": - unittest.main() - sc.stop()