Questions tagged [pyspark]

The Spark Python API (PySpark) exposes the Apache Spark programming model to Python.

342 votes
26 answers
664k views

How to change dataframe column names in PySpark?

I come from a pandas background and am used to reading data from CSV files into a dataframe and then simply changing the column names to something useful with the simple command: df.columns = ...
Shubhanshu Mishra's user avatar
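
A minimal sketch of the two usual approaches; the DataFrame and column names below are illustrative, not taken from the question:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["old_one", "old_two"])

    # Rename a single column
    df = df.withColumnRenamed("old_one", "new_one")

    # Rename every column at once, by position
    df = df.toDF("new_one", "new_two")
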
223 votes
15 answers
595k views

Show distinct column values in pyspark dataframe

With a pyspark dataframe, how do you do the equivalent of Pandas df['col'].unique()? I want to list out all the unique values in a pyspark dataframe column, not the SQL-type way (registertemplate then ...
Satya's user avatar
  • 5,825
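
A brief sketch of the DataFrame-API route, with a hypothetical column name:

    # Distinct values as a (small) DataFrame
    df.select("col").distinct().show()

    # Distinct values as a plain Python list (collects to the driver)
    unique_values = [row["col"] for row in df.select("col").distinct().collect()]
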
220 votes
1 answer
64k views

Spark performance for Scala vs Python

I prefer Python over Scala. But, as Spark is natively written in Scala, I was expecting my code to run faster in the Scala version than in the Python version for obvious reasons. With that assumption, I thought ...
Mrityunjay's user avatar
  • 2,261
208 votes
4 answers
349k views

How to add a constant column in a Spark DataFrame?

I want to add a column in a DataFrame with some arbitrary value (that is the same for each row). I get an error when I use withColumn as follows: dt.withColumn('new_column', 10).head(5) -------------...
Evan Zamir's user avatar
  • 8,339
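
A sketch of the usual fix: wrap the literal in lit() so withColumn receives a Column expression (dt as in the question):

    from pyspark.sql.functions import lit

    # withColumn expects a Column expression, not a bare Python value
    dt = dt.withColumn("new_column", lit(10))
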
186 votes
11 answers
434k views

Convert spark DataFrame column to python list

I work on a dataframe with two columns, mvv and count. +---+-----+ |mvv|count| +---+-----+ | 1 | 5 | | 2 | 9 | | 3 | 3 | | 4 | 1 | I would like to obtain two lists containing the mvv values and ...
a.moussa's user avatar
  • 3,177
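
A minimal sketch of two common ways to pull a column into a Python list; both collect the data to the driver:

    # Collect a single-column projection and unpack the Row objects
    mvv_list = [row["mvv"] for row in df.select("mvv").collect()]

    # Or go through the underlying RDD
    count_list = df.select("count").rdd.flatMap(lambda x: x).collect()
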
186 votes
11 answers
524k views

How do I add a new column to a Spark DataFrame (using PySpark)?

I have a Spark DataFrame (using PySpark 1.5.1) and would like to add a new column. I've tried the following without any success: type(randomed_hours) # => list # Create in Python and transform ...
Boris's user avatar
  • 2,055
185 votes
17 answers
197k views

How to turn off INFO logging in Spark?

I installed Spark using the AWS EC2 guide and I can launch the program fine using the bin/pyspark script to get to the spark prompt, and can also complete the Quick Start guide successfully. However, I ...
horatio1701d's user avatar
  • 9,069
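
A short sketch of the programmatic option; editing conf/log4j.properties also works but is environment-specific:

    # Silence INFO messages for the current application only
    sc.setLogLevel("ERROR")                        # with the SparkContext from the pyspark shell
    # spark.sparkContext.setLogLevel("ERROR")      # with a SparkSession
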
180 votes
12 answers
515k views

Filter Pyspark dataframe column with None value

I'm trying to filter a PySpark dataframe that has None as a row value: df.select('dt_mvmt').distinct().collect() [Row(dt_mvmt=u'2016-03-27'), Row(dt_mvmt=u'2016-03-28'), Row(dt_mvmt=u'2016-03-29'),...
Ivan's user avatar
  • 20k
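
A sketch of the null-aware filters, since a plain == None comparison does not behave as expected on Columns:

    from pyspark.sql.functions import col

    # Keep rows where dt_mvmt is present
    df.filter(col("dt_mvmt").isNotNull())

    # Keep only the rows where dt_mvmt is null
    df.filter(col("dt_mvmt").isNull())
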
172 votes
18 answers
223k views

How to check if spark dataframe is empty?

Right now, I have to use df.count > 0 to check if the DataFrame is empty or not. But it is kind of inefficient. Is there any better way to do that? PS: I want to check if it's empty so that I only ...
auxdx's user avatar
  • 2,443
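
A minimal sketch of two checks that are usually cheaper than a full count():

    # Fetch at most one row and test the result
    is_empty = len(df.head(1)) == 0

    # Or ask the underlying RDD
    is_empty = df.rdd.isEmpty()
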
169 votes
5 answers
360k views

How to find the size or shape of a DataFrame in PySpark?

I am trying to find out the size/shape of a DataFrame in PySpark. I do not see a single function that can do this. In Python, I can do this: data.shape() Is there a similar function in PySpark? This ...
Xi Liang's user avatar
  • 1,759
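
A sketch of the usual equivalent of pandas' .shape:

    # Row count triggers a job; column count is metadata only
    n_rows = df.count()
    n_cols = len(df.columns)
    print((n_rows, n_cols))
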
165 votes
9 answers
411k views

How to delete columns in pyspark dataframe

>>> a DataFrame[id: bigint, julian_date: string, user_id: bigint] >>> b DataFrame[id: bigint, quan_created_money: decimal(10,0), quan_created_cnt: bigint] >>> a.join(b, a.id=...
xjx0524's user avatar
  • 1,661
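
A sketch using DataFrame.drop, with column names taken from the question's schema (dropping several names at once assumes Spark 2.0+):

    # Drop one or more columns by name
    a = a.drop("julian_date", "user_id")
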
163 votes
7 answers
434k views

How to change a dataframe column from String type to Double type in PySpark?

I have a dataframe with a column as String. I wanted to change the column type to Double type in PySpark. Following is the way I did it: toDoublefunc = UserDefinedFunction(lambda x: x, DoubleType()) ...
Abhishek Choudhary's user avatar
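
A sketch of the cast-based approach, which avoids a Python UDF entirely; the column name is hypothetical:

    from pyspark.sql.functions import col

    df = df.withColumn("amount", col("amount").cast("double"))
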
154 votes
13 answers
347k views

Spark Dataframe distinguish columns with duplicated name

As far as I know, in a Spark Dataframe multiple columns can have the same name, as shown in the dataframe snapshot below: [ Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=...
resec's user avatar
  • 2,191
149 votes
8 answers
428k views

Sort in descending order in PySpark

I'm using PySpark (Python 2.7.9/Spark 1.3.1) and have a dataframe GroupObject which I need to filter and sort in descending order. I'm trying to achieve this via the following piece of code: group_by_dataframe....
rclakmal's user avatar
  • 1,972
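
A sketch of the column-expression forms, assuming a hypothetical count column:

    from pyspark.sql.functions import col, desc

    df.orderBy(desc("count")).show()
    # equivalently
    df.orderBy(col("count").desc()).show()
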
139 votes
13 answers
427k views

Best way to get the max value in a Spark dataframe column

I'm trying to figure out the best way to get the largest value in a Spark dataframe column. Consider the following example: df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) df....
xenocyon's user avatar
  • 2,488
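
A sketch using the built-in aggregate, with column A as in the question's example:

    from pyspark.sql import functions as F

    # Single-row aggregate, then pull the scalar out on the driver
    max_a = df.agg(F.max("A")).collect()[0][0]
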
136 votes
15 answers
428k views

Concatenate two PySpark dataframes

I'm trying to concatenate two PySpark dataframes with some columns that are only on one of them: from pyspark.sql.functions import randn, rand df_1 = sqlContext.range(0, 10) +--+ |id| +--+ | 0| | 1| ...
Ivan's user avatar
  • 20k
136 votes
5 answers
307k views

How to kill a running Spark application?

I have a running Spark application that occupies all the cores, so my other applications won't be allocated any resources. I did some quick research and people suggested using YARN kill or /bin/...
B.Mr.W.'s user avatar
  • 19.4k
135 votes
6 answers
479k views

Convert pyspark string to date format

I have a pyspark dataframe with a date string column in the format MM-dd-yyyy and I am attempting to convert this into a date column. I tried: df.select(to_date(df.STRING_COLUMN).alias('new_date'))....
Jenks's user avatar
  • 2,010
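
A sketch assuming Spark 2.2+, where to_date accepts an explicit format string:

    from pyspark.sql.functions import to_date

    df = df.withColumn("new_date", to_date(df.STRING_COLUMN, "MM-dd-yyyy"))
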
134 votes
20 answers
224k views

importing pyspark in python shell

This is a copy of someone else's question on another forum that was never answered, so I thought I'd re-ask it here, as I have the same issue. (See http://geekple.com/blogs/feeds/Xgzu7/posts/...
Glenn Strycker's user avatar
134 votes
42 answers
407k views

PySpark: "Exception: Java gateway process exited before sending the driver its port number"

I'm trying to run PySpark on my MacBook Air. When I try starting it up, I get the error: Exception: Java gateway process exited before sending the driver its port number when sc = SparkContext() is ...
mt88's user avatar
  • 2,945
127 votes
13 answers
427k views

Load CSV file with PySpark

I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing : sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect() ...
Kernael's user avatar
  • 3,280
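
A sketch using the built-in CSV reader (Spark 2.0+), which handles headers, quoting and type inference better than a manual split():

    df = spark.read.csv("file.csv", header=True, inferSchema=True)
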
124 votes
15 answers
341k views

Join two data frames, select all columns from one and some columns from the other

Let's say I have a spark data frame df1, with several columns (among which the column id) and data frame df2 with two columns, id and other. Is there a way to replicate the following command: ...
Francesco Sambo's user avatar
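
A sketch of one common pattern, using the column names from the question: join, then select all of df1 plus the extra column from df2:

    # Keep every column of df1 plus the "other" column of df2
    joined = df1.join(df2, on="id", how="left").select(df1["*"], df2["other"])
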
122 votes
8 answers
126k views

How to fix 'TypeError: an integer is required (got type bytes)' error when trying to run pyspark after installing spark 2.4.4

I've installed OpenJDK 13.0.1, python 3.8 and spark 2.4.4. The instructions for testing the install are to run .\bin\pyspark from the root of the spark installation. I'm not sure if I missed a step in the ...
Chris's user avatar
  • 1,225
119 votes
3 answers
256k views

pyspark dataframe filter or include based on list

I am trying to filter a dataframe in pyspark using a list. I want to either filter based on the list or include only those records with a value in the list. My code below does not work: # define a ...
user3133475's user avatar
  • 3,101
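
A sketch of the isin-based filter; the column name and list are hypothetical:

    from pyspark.sql.functions import col

    wanted = ["A", "B", "C"]                       # hypothetical list of values
    df.filter(col("category").isin(wanted))        # keep rows whose value is in the list
    df.filter(~col("category").isin(wanted))       # or exclude them instead
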
113 votes
12 answers
264k views

How to find count of Null and Nan values for each column in a PySpark dataframe efficiently?

import numpy as np data = [ (1, 1, None), (1, 2, float(5)), (1, 3, np.nan), (1, 4, None), (1, 5, float(10)), (1, 6, float("nan")), (1, 6, float("nan...
GeorgeOfTheRF's user avatar
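
A sketch of the usual one-pass aggregation; note that isnan() only makes sense for numeric columns, so string columns may need the isNull() check alone:

    from pyspark.sql.functions import col, count, isnan, when

    # One pass over the data, one count per column
    df.select([
        count(when(isnan(c) | col(c).isNull(), c)).alias(c)
        for c in df.columns
    ]).show()
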
112 votes
5 answers
258k views

Split Spark dataframe string column into multiple columns

I've seen various people suggesting that Dataframe.explode is a useful way to do this, but it results in more rows than the original dataframe, which isn't what I want at all. I simply want to do the ...
Peter Gaultney's user avatar
111 votes
11 answers
146k views

Renaming columns for PySpark DataFrame aggregates

I am analysing some data with PySpark DataFrames. Suppose I have a DataFrame df that I am aggregating: (df.groupBy("group") .agg({"money":"sum"}) .show(100) ) This ...
cantdutchthis's user avatar
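
A sketch using explicit aggregate expressions with alias() instead of the dict form:

    from pyspark.sql import functions as F

    df.groupBy("group").agg(F.sum("money").alias("money_sum")).show(100)
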
109 votes
14 answers
207k views

Is it possible to get the current spark context settings in PySpark?

I'm trying to get the path to spark.worker.dir for the current sparkcontext. If I explicitly set it as a config param, I can read it back out of SparkConf, but is there any way to access the complete ...
whisperstream's user avatar
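
A short sketch of reading the effective configuration back from the running context:

    # All effective (key, value) pairs for the running context
    settings = sc.getConf().getAll()
    # with a SparkSession: spark.sparkContext.getConf().getAll()
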
108 votes
11 answers
121k views

Spark Error - Unsupported class file major version

I'm trying to install Spark on my Mac. I've used Homebrew to install spark 2.4.0 and Scala. I've installed PySpark in my anaconda environment and am using PyCharm for development. I've exported to my ...
shbfy's user avatar
  • 2,105
107 votes
10 answers
210k views

Removing duplicate columns after a DF join in Spark

When you join two DFs with similar column names: df = df1.join(df2, df1['id'] == df2['id']) Join works fine but you can't call the id column because it is ambiguous and you would get the following ...
thecheech's user avatar
  • 2,161
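
A sketch of the simplest avoidance: join on the column name (or a list of names) so only one id column survives:

    # Joining on the column name keeps a single id column in the result
    df = df1.join(df2, on="id")
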
104 votes
7 answers
192k views

Cannot find col function in pyspark

In pyspark 1.6.2, I can import the col function with from pyspark.sql.functions import col, but when I try to look it up in the GitHub source code I find no col function in the functions.py file. How can ...
Bamqf's user avatar
  • 3,522
102 votes
6 answers
205k views

Add an empty column to Spark DataFrame

As mentioned in many other locations on the web, adding a new column to an existing DataFrame is not straightforward. Unfortunately it is important to have this functionality (even though it is ...
architectonic's user avatar
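
A sketch using a typed null literal; the column name and type are illustrative:

    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType

    df = df.withColumn("new_col", lit(None).cast(StringType()))
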
99 votes
8 answers
239k views

Removing duplicates from rows based on specific columns in an RDD/Spark DataFrame

Let's say I have a rather large dataset in the following form: data = sc.parallelize([('Foo', 41, 'US', 3), ('Foo', 39, 'UK', 1), ('Bar', 57, 'CA', 2), ...
Jason's user avatar
  • 2,854
98 votes
20 answers
203k views

How do I set the driver's python version in spark?

I'm using spark 1.4.0-rc2 so I can use python 3 with spark. If I add export PYSPARK_PYTHON=python3 to my .bashrc file, I can run spark interactively with python 3. However, if I want to run a ...
Kevin's user avatar
  • 3,421
97 votes
4 answers
247k views

How to join on multiple columns in Pyspark?

I am using Spark 1.3 and would like to join on multiple columns using the python interface (SparkSQL). The following works: I first register them as temp tables. numeric.registerTempTable("numeric") Ref....
user3803714's user avatar
  • 5,339
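
A sketch of the DataFrame-API equivalents, with hypothetical key column names:

    # When the join keys have the same names in both frames
    joined = df1.join(df2, on=["key1", "key2"], how="inner")

    # Or spell the condition out explicitly
    joined = df1.join(df2, (df1.key1 == df2.key1) & (df1.key2 == df2.key2))
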
96 votes
10 answers
97k views

collect_list by preserving order based on another variable

I am trying to create a new column of lists in Pyspark using a groupby aggregation on an existing set of columns. An example input data frame is provided below: ------------------------ id | date ...
Ravi's user avatar
  • 3,293
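
A sketch of the window-based approach often given for this; the column names id, date and value follow the question's example:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # collect_list over an ordered window grows the list row by row;
    # taking the max per id keeps the longest, i.e. complete, ordered list
    w = Window.partitionBy("id").orderBy("date")
    result = (df.withColumn("values_ordered", F.collect_list("value").over(w))
                .groupBy("id")
                .agg(F.max("values_ordered").alias("values_ordered")))
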
96 votes
4 answers
234k views

Create Spark DataFrame. Can not infer schema for type

Could someone help me solve this problem I have with Spark DataFrame? When I do myFloatRDD.toDF() I get an error: TypeError: Can not infer schema for type: type 'float' I don't understand why......
Breach's user avatar
  • 1,318
95 votes
5 answers
200k views

Updating a dataframe column in spark

Looking at the new spark DataFrame API, it is unclear whether it is possible to modify dataframe columns. How would I go about changing a value in row x column y of a dataframe? In pandas this would ...
Luke's user avatar
  • 6,949
93 votes
7 answers
365k views

Pyspark: display a spark data frame in a table format

I am using pyspark to read a parquet file like below: my_df = sqlContext.read.parquet('hdfs://myPath/myDB.db/myTable/**') Then when I do my_df.take(5), it will show [Row(...)], instead of a table ...
Edamame's user avatar
  • 24.9k
92 votes
17 answers
124k views

How to link PyCharm with PySpark?

I'm new to apache spark and apparently I installed apache-spark with homebrew on my macbook: Last login: Fri Jan 8 12:52:04 on console user@MacBook-Pro-de-User-2:~$ pyspark Python 2.7.10 (default, ...
tumbleweed's user avatar
  • 4,572
90 votes
3 answers
339k views

How to convert column with string type to int form in pyspark data frame?

I have a dataframe in pyspark. Some of its numerical columns contain nan, so when I am reading the data and checking the schema of the dataframe, those columns have string type. How can I change ...
neha's user avatar
  • 2,008
90 votes
10 answers
126k views

How to pivot Spark DataFrame?

I am starting to use Spark DataFrames and I need to be able to pivot the data to create multiple columns out of 1 column with multiple rows. There is built-in functionality for that in Scalding and I ...
J  Calbreath's user avatar
  • 2,685
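
A sketch of the built-in pivot (Spark 1.6+); the column names here are hypothetical:

    from pyspark.sql import functions as F

    # groupBy the fixed keys, pivot the column whose values become new columns
    pivoted = df.groupBy("id").pivot("category").agg(F.sum("value"))
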
90 votes
4 answers
87k views

Pyspark: Split multiple array columns into rows

I have a dataframe which has one row, and several columns. Some of the columns are single values, and others are lists. All list columns are the same length. I want to split each list column into a ...
Steve's user avatar
  • 2,491
88 votes
22 answers
223k views

How to perform union on two DataFrames with different amounts of columns in Spark?

I have 2 DataFrames and I need a union like this: The unionAll function doesn't work because the number and the names of the columns are different. How can I do this?
Allan Feliph's user avatar
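
A sketch assuming Spark 3.1+, where unionByName can fill in the missing columns:

    # Missing columns on either side are filled with nulls
    result = df1.unionByName(df2, allowMissingColumns=True)

    # On older versions the usual workaround is to add the missing columns
    # to each DataFrame with lit(None).cast(...) before a plain union.
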
86 votes
4 answers
73k views

Spark functions vs UDF performance?

Spark now offers predefined functions that can be used in dataframes, and it seems they are highly optimized. My original question was going to be on which is faster, but I did some testing myself and ...
alfredox's user avatar
  • 4,302
85 votes
9 answers
166k views

How to find median and quantiles using Spark

How can I find the median of an RDD of integers using a distributed method, IPython, and Spark? The RDD has approximately 700,000 elements and is therefore too large to collect and find the median. This ...
pr338's user avatar
  • 9,020
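
A sketch of the DataFrame route (Spark 2.0+): approxQuantile trades a small, bounded error for a fully distributed computation; the column name is hypothetical:

    # Median = 0.5 quantile with a 1% relative error
    median = df.approxQuantile("value", [0.5], 0.01)[0]
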
85 votes
8 answers
224k views

How to get name of dataframe column in PySpark?

In pandas, this can be done with column.name. But how do you do the same when it's a column of a Spark dataframe? E.g. the calling program has a Spark dataframe: spark_df >>> spark_df.columns ['admit'...
Kaushik Acharya's user avatar
84 votes
7 answers
288k views

How to loop through each row of dataFrame in pyspark

E.g. sqlContext = SQLContext(sc) sample = sqlContext.sql("select Name, age, city from user") sample.show() The above statement prints the entire table on the terminal, but I want to access each ...
Arti Berde's user avatar
  • 1,212
83 votes
2 answers
178k views

pyspark collect_set or collect_list with groupby

How can I use collect_set or collect_list on a dataframe after a groupby? For example: df.groupby('key').collect_set('values'). I get an error: AttributeError: 'GroupedData' object has no attribute '...
Hanan Shteingart's user avatar
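
A sketch of the aggregate form, using the names from the question:

    from pyspark.sql import functions as F

    # collect_set / collect_list are aggregate functions, so they go inside agg()
    df.groupby("key").agg(F.collect_set("values").alias("value_set"))
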
83 votes
4 answers
9k views

How to make good reproducible Apache Spark examples

I've been spending a fair amount of time reading through some questions with the pyspark and spark-dataframe tags and very often I find that posters don't provide enough information to truly ...
pault's user avatar
  • 43k
