This is an English translation of a Japanese blog. Some content may not be fully translated.
☁️

Vertically Concatenating DataFrames to Multiply Data in EMR PySpark

Notes from researching how to quickly increase PySpark DataFrame data by vertically concatenating DataFrames. This simply reads the same CSV from S3 ten times, then merges the resulting DataFrames into one, multiplying the data tenfold.

from pyspark.sql.types import *
from functools import reduce
from pyspark.sql import DataFrame

# Column layout of the Aozora-data CSV. Every column is required
# (nullable=False); 'num' and 'row' are integers, all others are strings.
_INT_COLUMNS = {'num', 'row'}
_COLUMN_NAMES = [
    'file', 'num', 'row', 'word',
    'subtype1', 'subtype2', 'subtype3', 'subtype4',
    'conjtype', 'conjugation', 'basic', 'ruby', 'pronunce',
]
schema = StructType([
    StructField(name, IntegerType() if name in _INT_COLUMNS else StringType(), False)
    for name in _COLUMN_NAMES
])

# All ten DataFrames read the *same* CSV from S3, so the union below simply
# multiplies the data tenfold. The original copy-pasted this read ten times
# and used f-strings with no placeholders; a comprehension removes both.
SOURCE_PATH = 's3://xxxxx/aozora_data.csv'
dfs = [
    spark.read.csv(SOURCE_PATH, schema=schema, header=True)
    for _ in range(10)
]
# Unpack into the original individual names so downstream code that refers
# to df1..df10 keeps working unchanged.
df1, df2, df3, df4, df5, df6, df7, df8, df9, df10 = dfs

# Stack the frames vertically. All frames were read with the same schema,
# so matching columns by name via unionByName yields the concatenated set.
df_union = df1
for extra in (df2, df3, df4, df5, df6, df7, df8, df9, df10):
    df_union = df_union.unionByName(extra)
# Trigger the lazy plan and report the total row count.
df_union.count()
Suggest an edit on GitHub