python - PySpark DataFrames - way to enumerate without converting to Pandas?
I have a very big pyspark.sql.dataframe.DataFrame named df. I need some way of enumerating records, and thus to be able to access a record by its index (or to select a group of records from a range of indexes).
In pandas, I could simply do:

indexes = [2, 3, 6, 7]
df[indexes]
Here I want something similar, without converting the DataFrame to pandas.
The closest I can get to is:

- Enumerating all the objects in the original DataFrame by:

  indexes = np.arange(df.count())
  df_indexed = df.withColumn('index', indexes)

- Searching for the values I need using the where() function.
Questions:

- Why doesn't this work, and how do I make it work? How do I add a row to a DataFrame?
- Would it work to later do something like:

  indexes = [2, 3, 6, 7]
  df1.where("index in indexes").collect()

- Is there any faster and simpler way to deal with it?
It doesn't work because:

- the second argument of withColumn should be a Column, not a collection, so np.array won't work here (see the sketch after this list);
- when you pass "index in indexes" as a SQL expression to where, indexes is out of scope and is not resolved as a valid identifier.
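To make the first point concrete, here is a minimal sketch, not from the original answer, of the kind of argument withColumn does accept: a Column expression, here built with lit (the column names "flag" and df_const are purely illustrative, and df is assumed to be the DataFrame from the question):

from pyspark.sql.functions import lit

# withColumn expects a Column expression, so this is fine:
df_const = df.withColumn("flag", lit(1))

# whereas passing a plain NumPy array is rejected, because it is not a Column
# (the exact exception depends on the PySpark version):
# df.withColumn("index", np.arange(df.count()))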
PySpark >= 1.4.0
You can add row numbers using the respective window function and query using the Column.isin method or a properly formatted query string:
from pyspark.sql.functions import col, rowNumber
from pyspark.sql.window import Window

w = Window.orderBy()
indexed = df.withColumn("index", rowNumber().over(w))

# Using the DSL
indexed.where(col("index").isin(set(indexes)))

# Using a SQL expression
indexed.where("index in ({0})".format(",".join(str(x) for x in indexes)))
It looks like window functions called without a PARTITION BY clause move all the data to a single partition, so the above may not be the best solution after all.
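As a side note that is not part of the original answer: in later PySpark releases (roughly 1.6 and newer) the same window function is exposed as row_number, and it requires an ordered window. A minimal sketch under that assumption:

from pyspark.sql.functions import col, row_number, monotonically_increasing_id
from pyspark.sql.window import Window

# row_number() needs an ORDER BY in recent versions; ordering by
# monotonically_increasing_id() roughly preserves the existing row order
w = Window.orderBy(monotonically_increasing_id())
indexed = df.withColumn("index", row_number().over(w))  # note: row_number() is 1-based

indexed.where(col("index").isin(set(indexes)))

The single-partition caveat above applies here as well.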
Is there any faster and simpler way to deal with it?
Not really. Spark DataFrames don't support random row access.
A paired RDD can be accessed using the lookup method, which is relatively fast if the data is partitioned using a HashPartitioner. There is also the indexed-rdd project, which supports efficient lookups.
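A minimal sketch of that lookup pattern (toy data and variable names are illustrative, not from the original answer):

# Key-value RDD keyed by an index; partitionBy() hash-partitions it by key,
# so lookup() only has to scan the one partition that can contain the key.
pairs = sc.parallelize([(i, chr(97 + i)) for i in range(15)])
partitioned = pairs.partitionBy(8).cache()

partitioned.lookup(3)   # ['d']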
Edit:
Independent of the PySpark version, you can try something like this:
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, LongType

row = Row("char")
row_with_index = Row("char", "index")

df = sc.parallelize(row(chr(x)) for x in range(97, 112)).toDF()
df.show(5)

## +----+
## |char|
## +----+
## |   a|
## |   b|
## |   c|
## |   d|
## |   e|
## +----+
## only showing top 5 rows

# This part is not tested, but it should work and save some work later
schema = StructType(
    df.schema.fields[:] + [StructField("index", LongType(), False)])

indexed = (df.rdd                          # Extract the underlying RDD
    .zipWithIndex()                        # Add an index to each row
    .map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]]))  # Map back to Rows
    .toDF(schema))                         # Works without a schema too, but is more expensive

# Use inSet instead of isin in Spark < 1.3
indexed.where(col("index").isin(indexes))
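Since the question also asks about selecting a range of indexes, here is a small follow-up sketch on the indexed DataFrame built above, using Column.between (just one way to express the range; the bounds 2 and 7 are illustrative):

from pyspark.sql.functions import col

# rows whose zipWithIndex()-based index falls in a contiguous range
indexed.where(col("index").between(2, 7)).show()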