python - PySpark DataFrames - way to enumerate without converting to Pandas?


I have a big pyspark.sql.dataframe.DataFrame named df. I need some way of enumerating records, and thus of being able to access a record by its index (or to select a group of records in an index range).

In pandas, I could make:

indexes = [2, 3, 6, 7]
df[indexes]

Here I want something similar (and without converting the DataFrame to pandas).

The closest I can get is:

  • Enumerating all the objects in the original DataFrame by:

        indexes = np.arange(df.count())
        df_indexed = df.withColumn('index', indexes)

  • Searching for the values I need using the where() function.

Questions:

  1. Why doesn't this work, and how can I make it work? How can I add a row to a DataFrame?
  2. Would it work later to make something like:

     indexes = [2, 3, 6, 7]
     df1.where("index in indexes").collect()

  3. Any faster and simpler way to deal with it?

It doesn't work because:

  1. The second argument of withColumn should be a Column, not a collection, so np.array won't work here (see the sketch after this list).
  2. When you pass "index in indexes" as a SQL expression to where, indexes is out of scope and is not resolved as a valid identifier.
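A minimal sketch of point 1 (my own illustration, not part of the original answer), assuming df is the DataFrame from the question: withColumn accepts a Column expression such as lit(0), while a NumPy array is rejected.

from pyspark.sql.functions import lit
import numpy as np

# A Column expression works: every row gets the literal value 0
df_ok = df.withColumn("index", lit(0))

# A NumPy array is not a Column, so this raises an error
# (the exact exception depends on the Spark version)
try:
    df.withColumn("index", np.arange(df.count()))
except Exception as e:
    print(type(e).__name__)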

PySpark >= 1.4.0

You can add row numbers using the corresponding window function and then query using the Column.isin method or a properly formatted query string:

from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window

w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))

# Using DSL
indexed.where(col("index").isin(set(indexes)))

# Using SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))

It looks like window functions called without a PARTITION BY clause move all the data to a single partition, so the above may not be the best solution after all.
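One way to check this behaviour on your own data (a hypothetical check, not from the original answer) is to compare the number of partitions of the underlying RDD before and after adding the index:

# Partition count before and after the global window
print(df.rdd.getNumPartitions())       # original partitioning
print(indexed.rdd.getNumPartitions())  # typically collapses to 1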

Any faster and simpler way to deal with it?

Not really. Spark DataFrames don't support random row access.

A PairedRDD can be accessed using the lookup method, which is relatively fast if the data is partitioned using a HashPartitioner. There is also the indexed-rdd project, which supports efficient lookups.
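A rough sketch of that PairedRDD approach (the variable names and partition count are my own choices, assuming sc and df as in the rest of this answer): key each row by its position and hash-partition the pairs, so lookup only has to scan the partition that owns the key.

# Build (index, row) pairs and hash-partition them by key
pairs = (df.rdd
    .zipWithIndex()
    .map(lambda ri: (ri[1], ri[0]))   # (index, row)
    .partitionBy(16))                 # hash partitioning on the key
pairs.cache()

# With a known partitioner, lookup scans only the relevant partition
rows_at_2 = pairs.lookup(2)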

Edit:

Independent of the PySpark version, you can try something like this:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

row = Row("char")
row_with_index = Row("char", "index")

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)

## +----+
## |char|
## +----+
## |   a|
## |   b|
## |   c|
## |   d|
## |   e|
## +----+
## only showing top 5 rows

# This part is not tested but it should work and save some work later
schema = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd               # Extract the RDD
    .zipWithIndex()             # Add an index
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]]))  # Map to rows
    .toDF(schema))              # It will work without schema but will be more expensive

# inSet in Spark < 1.3
indexed.where(col("index").isin(indexes))
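As a small follow-up for the range part of the question (my addition, assuming Spark >= 1.4 where Column.between is available), the indexed DataFrame built above can be filtered either by a contiguous index range or by an arbitrary set of indexes:

from pyspark.sql.functions import col  # also imported above

# Contiguous range of indexes, inclusive on both ends
indexed.where(col("index").between(2, 7)).show()

# Arbitrary set of indexes, as in the question
indexed.where(col("index").isin([2, 3, 6, 7])).collect()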
