pyspark.pandas.groupby.GroupBy.apply

GroupBy.apply(func: Callable, *args: Any, **kwargs: Any) → Union[pyspark.pandas.frame.DataFrame, pyspark.pandas.series.Series]

Apply function func group-wise and combine the results together.

The function passed to apply must take a DataFrame as its first argument and return a DataFrame. apply will then take care of combining the results back together into a single dataframe. apply is therefore a highly flexible grouping method.
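The split, apply, and combine steps described above can be pictured with a minimal pure-Python sketch. It is only a conceptual illustration and not how pandas-on-Spark implements apply; the helper names split_by, add_group_min, and group_apply are hypothetical, and the rows mirror the df used in the Examples section.

```python
from collections import defaultdict

# Rows mirroring the example DataFrame: columns A, B, C.
rows = [
    {"A": "a", "B": 1, "C": 4},
    {"A": "a", "B": 2, "C": 6},
    {"A": "b", "B": 3, "C": 5},
]

def split_by(rows, key):
    """Split: bucket rows by the value of the grouping column."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups

def add_group_min(group):
    """Apply: add the group's minimum of B to every B value in that group."""
    low = min(row["B"] for row in group)
    return [{**row, "B": row["B"] + low} for row in group]

def group_apply(rows, key, func):
    """Combine: run func on each group and concatenate the per-group results."""
    combined = []
    for _, group in sorted(split_by(rows, key).items()):
        combined.extend(func(group))
    return combined

result = group_apply(rows, "A", add_group_min)
```

Here func sees one group at a time, so the minimum is computed per group rather than over the whole dataset.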

While apply is very flexible, it can be considerably slower than more specific methods such as agg or transform. pandas-on-Spark offers a wide range of methods that are much faster than apply for their specific purposes, so try them before reaching for apply.

Note

This API executes the function once to infer the return type, which is potentially expensive, for instance when the dataset is created after aggregations or sorting.

To avoid this, specify the return type in func, for instance, as below:

>>> def pandas_div(x) -> ps.DataFrame[int, [float, float]]:
...     return x[['B', 'C']] / x[['B', 'C']]

If the return type is specified, the output column names become c0, c1, c2 … cn. These names are positionally mapped to the returned DataFrame in func.
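The positional mapping can be illustrated with a small pure-Python sketch. It assumes a column-oriented dict as a stand-in for the returned DataFrame; default_names and rename_positionally are hypothetical helpers written for this illustration, not part of pandas-on-Spark.

```python
def default_names(width):
    """Build the default output names c0, c1, ... for `width` columns."""
    return [f"c{i}" for i in range(width)]

def rename_positionally(columns, names=None):
    """Map output names onto columns by position, ignoring the original names.

    `columns` is an insertion-ordered dict of column name -> list of values.
    """
    if names is None:
        names = default_names(len(columns))
    if len(names) != len(columns):
        raise ValueError("number of names must match number of columns")
    return dict(zip(names, columns.values()))

# Stand-in for what func returns for one group: columns B and C.
returned = {"B": [1.0, 1.0], "C": [1.0, 1.0]}

unnamed = rename_positionally(returned)            # keys: c0, c1
named = rename_positionally(returned, ["a", "b"])  # keys: a, b
```

Because the mapping is positional, the names B and C from the returned data do not survive; only the column order matters.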

To specify the column names, you can assign them in a NumPy compound type style as below:

>>> def pandas_div(x) -> ps.DataFrame[("index", int), [("a", float), ("b", float)]]:
...     return x[['B', 'C']] / x[['B', 'C']]
>>> pdf = pd.DataFrame({'B': [1.], 'C': [3.]})
>>> def plus_one(x) -> ps.DataFrame[
...         (pdf.index.name, pdf.index.dtype), zip(pdf.columns, pdf.dtypes)]:
...     return x[['B', 'C']] / x[['B', 'C']]

Note

The DataFrame within func is actually a pandas DataFrame. Therefore, any pandas API can be used within this function.

Parameters
func : callable

A callable that takes a DataFrame as its first argument, and returns a dataframe.

*args

Positional arguments to pass to func.

**kwargs

Keyword arguments to pass to func.
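How the extra arguments reach func can be sketched in plain Python. This is a simplified illustration under the assumption that each group is a list of values; apply_per_group is a hypothetical helper, not the library's implementation, and calculation mirrors the function of the same name in the Examples section.

```python
def apply_per_group(groups, func, *args, **kwargs):
    """Call func once per group, forwarding the extra arguments unchanged."""
    return {key: func(values, *args, **kwargs) for key, values in groups.items()}

def calculation(x, y, z):
    # x is the group's data; y and z arrive via *args and **kwargs.
    return len(x) + y * z

# Column B of the example data, grouped by column A.
groups = {"a": [1, 2], "b": [3]}

# y=5 is passed positionally and z=10 by keyword, as in apply(calculation, 5, z=10).
result = apply_per_group(groups, calculation, 5, z=10)
```

The group itself is always the first argument to func; everything after func in the call is passed through as is.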

Returns
applied : DataFrame or Series

See also

aggregate

Apply aggregate function to the GroupBy object.

DataFrame.apply

Apply a function to a DataFrame.

Series.apply

Apply a function to a Series.

Examples

>>> df = ps.DataFrame({'A': 'a a b'.split(),
...                    'B': [1, 2, 3],
...                    'C': [4, 6, 5]}, columns=['A', 'B', 'C'])
>>> g = df.groupby('A')

Notice that g has two groups, a and b. Calling apply in various ways, we can get different grouping results:

The functions passed to apply below take a DataFrame as their argument and return a DataFrame. apply combines the results for each group into a new DataFrame:

>>> def plus_min(x):
...     return x + x.min()
>>> g.apply(plus_min).sort_index()  
    A  B   C
0  aa  2   8
1  aa  3  10
2  bb  6  10
>>> g.apply(sum).sort_index()  
    A  B   C
A
a  aa  3  10
b   b  3   5
>>> g.apply(len).sort_index()  
A
a    2
b    1
dtype: int64

You can specify a return type hint to skip schema inference, which improves performance.

>>> def pandas_div(x) -> ps.DataFrame[int, [float, float]]:
...     return x[['B', 'C']] / x[['B', 'C']]
>>> g.apply(pandas_div).sort_index()  
    c0   c1
0  1.0  1.0
1  1.0  1.0
2  1.0  1.0
>>> def pandas_div(x) -> ps.DataFrame[("index", int), [("f1", float), ("f2", float)]]:
...     return x[['B', 'C']] / x[['B', 'C']]
>>> g.apply(pandas_div).sort_index()  
        f1   f2
index
0      1.0  1.0
1      1.0  1.0
2      1.0  1.0

For a Series, it works as shown below.

>>> def plus_max(x) -> ps.Series[int]:
...     return x + x.max()
>>> df.B.groupby(df.A).apply(plus_max).sort_index()  
0    6
1    3
2    4
Name: B, dtype: int64
>>> def plus_min(x):
...     return x + x.min()
>>> df.B.groupby(df.A).apply(plus_min).sort_index()
0    2
1    3
2    6
Name: B, dtype: int64

You can also return a scalar value as an aggregated value of the group:

>>> def plus_length(x) -> int:
...     return len(x)
>>> df.B.groupby(df.A).apply(plus_length).sort_index()  
0    1
1    2
Name: B, dtype: int64

Extra arguments can be passed to the function as shown below.

>>> def calculation(x, y, z) -> int:
...     return len(x) + y * z
>>> df.B.groupby(df.A).apply(calculation, 5, z=10).sort_index()  
0    51
1    52
Name: B, dtype: int64