
Accelerating Spark Datasets by Inlining Deserialization
Learn how inlining deserialization can accelerate Spark Datasets by removing serialization and deserialization overhead around Map-Reduce operations. This work exploits a serialized storage format and transforms the code of Map/Reduce functions so that they operate directly on serialized data.
Presentation Transcript
Accelerating Spark Datasets by Inlining Deserialization
Jan Wróblewski, Kazuaki Ishizaki, Hiroshi Inoue, Moriyoshi Ohara
University of Warsaw and IBM Research Tokyo
Map-Reduce with serialization

[Diagram: a serialized input record with fields x and y is deserialized into a Point object; the map function (scale Point by 2) produces a new object with fields 2*x and 2*y; that object is then serialized back into the output. Both the deserialization and the serialization steps are slow.]
Summary

Requirements:
- Map-Reduce
- Serialized storage format
- Map/Reduce functions working on objects
- Ability to transform the code of the Map/Reduce function

Contribution:
- Removing deserialization/serialization by transforming the Map/Reduce function to work directly on serialized data
Transformation in a nutshell

Original:

    val scaleByTwo = (p: Point) => Point(p.x * 2, p.y * 2)

    val (x1, y1) = deserializer.read()
    val p1 = Point(x1, y1)
    val p2 = scaleByTwo(p1)
    serializer.write(p2.x, p2.y)

After inlining deserialization (by static analysis and transformation of the function's code):

    val scaleByTwo = (x: Int, y: Int) => Point(x * 2, y * 2)

    val (x1, y1) = deserializer.read()
    val p2 = scaleByTwo(x1, y1)
    serializer.write(p2.x, p2.y)
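To make the before/after concrete, here is a self-contained sketch of both pipelines over a single record. The ByteBuffer layout and helper structure are made-up stand-ins for whatever serializer the real system uses, not the talk's actual implementation:

    import java.nio.ByteBuffer

    case class Point(x: Int, y: Int)

    object InliningSketch {
      val scaleByTwo = (p: Point) => Point(p.x * 2, p.y * 2)
      val scaleByTwoInlined = (x: Int, y: Int) => Point(x * 2, y * 2)

      def main(args: Array[String]): Unit = {
        // One serialized record: two ints (x = 3, y = 4).
        def freshInput(): ByteBuffer = {
          val b = ByteBuffer.allocate(8).putInt(3).putInt(4)
          b.flip()
          b
        }
        val out = ByteBuffer.allocate(8)

        // Original: allocate a Point just to pass it to the function.
        val in1 = freshInput()
        val p1 = Point(in1.getInt(), in1.getInt())   // deserialization
        val p2 = scaleByTwo(p1)
        out.putInt(p2.x).putInt(p2.y)                // serialization
        out.clear()

        // After inlining deserialization: no input Point is ever allocated;
        // the fields flow straight from the buffer into the function.
        val in2 = freshInput()
        val q = scaleByTwoInlined(in2.getInt(), in2.getInt())
        out.putInt(q.x).putInt(q.y)
        println(q)                                   // Point(6,8)
      }
    }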
Under the hood: getfield becomes iload

Original:

    new Point
    dup
    aload_1
    getfield Point.x
    iconst_2
    imul
    aload_1
    getfield Point.y
    iconst_2
    imul
    invokespecial Point.<init>
    areturn

After inlining deserialization:

    new Point
    dup
    iload_1
    iconst_2
    imul
    iload_2
    iconst_2
    imul
    invokespecial Point.<init>
    areturn
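The talk does not name the toolkit that performs this bytecode rewrite. Purely as an illustration, a rewriter for the exact pattern above could be sketched with the ASM library; the class name Point, the int field descriptor, and the local-variable slots are assumptions taken from the listing, not the authors' code:

    import org.objectweb.asm.{MethodVisitor, Opcodes}

    // Illustrative only: replaces `aload_1; getfield Point.x|y : I` with
    // `iload_1` / `iload_2`, assuming the Point parameter has already been
    // split into two int parameters occupying local slots 1 and 2.
    class InlinePointFields(next: MethodVisitor)
        extends MethodVisitor(Opcodes.ASM9, next) {

      private var pendingAload1 = false

      // Emit a buffered aload_1 that turned out not to feed a getfield.
      private def flush(): Unit =
        if (pendingAload1) { pendingAload1 = false; super.visitVarInsn(Opcodes.ALOAD, 1) }

      override def visitVarInsn(opcode: Int, slot: Int): Unit = {
        flush()
        if (opcode == Opcodes.ALOAD && slot == 1) pendingAload1 = true
        else super.visitVarInsn(opcode, slot)
      }

      override def visitFieldInsn(op: Int, owner: String, name: String, desc: String): Unit =
        if (pendingAload1 && op == Opcodes.GETFIELD && owner == "Point" && desc == "I") {
          pendingAload1 = false
          super.visitVarInsn(Opcodes.ILOAD, if (name == "x") 1 else 2)
        } else { flush(); super.visitFieldInsn(op, owner, name, desc) }

      // Every other instruction kind would need the same flush(); shown for one:
      override def visitInsn(opcode: Int): Unit = { flush(); super.visitInsn(opcode) }
    }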
Just find all appropriate getfields

- This is a kind of escape analysis, which is undecidable in general. However, we can handle most real-life programs.
- It must work when the input object is passed to other functions. Fortunately, a recursive approach works in practice.
- It must preserve all side effects. This is easy thanks to the locality of the transformation.
- Primitive arrays are more complex, and object arrays even more so. We handle only primitive arrays, through lightweight wrapper objects (sketched below).
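A lightweight wrapper of the kind mentioned in the last bullet might look like the following; the serialized layout (an int length followed by packed doubles) is an assumption for illustration, not Spark's actual format:

    import java.nio.ByteBuffer

    // Hypothetical sketch: expose a serialized Array[Double] through an
    // array-like interface without copying it onto the Java heap.
    final class SerializedDoubleArray(buf: ByteBuffer, offset: Int) {
      val length: Int = buf.getInt(offset)
      def apply(i: Int): Double = buf.getDouble(offset + 4 + 8 * i)
    }

    // With such a wrapper, `p.x(i)` in a map function can read doubles in
    // place instead of first materializing an Array[Double].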
Benchmarked code (the per-point gradient step of a logistic-regression workload; D is the feature count and w the current weight vector, both captured from the enclosing scope):

    case class DataPointArray(x: Array[Double], y: Double)

    p => {
      var i = 0
      var dotp: Double = 0.0
      while (i < D) {
        dotp += w(i) * p.x(i)
        i = i + 1
      }
      val a = new Array[Double](D)
      i = 0
      while (i < D) {
        a(i) = p.x(i) * (1 / (1 + scala.math.exp(-p.y * dotp)) - 1) * p.y
        i = i + 1
      }
      a
    }
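To experiment with this kernel outside Spark, a self-contained driver could look like the following; D, w, and the generated points are made-up values, and the final reduce mirrors what points.map(...).reduce(...) does in the benchmark:

    import scala.util.Random

    object LogisticRegressionKernel {
      case class DataPointArray(x: Array[Double], y: Double)

      def main(args: Array[String]): Unit = {
        val D = 10                                // feature count (made up)
        val rnd = new Random(42)
        val w = Array.fill(D)(rnd.nextDouble())   // current weights (made up)
        val points = Seq.fill(1000)(DataPointArray(
          Array.fill(D)(rnd.nextDouble()),
          if (rnd.nextBoolean()) 1.0 else -1.0))

        // The benchmarked map function, same structure as above.
        val gradientOf = (p: DataPointArray) => {
          var i = 0
          var dotp: Double = 0.0
          while (i < D) { dotp += w(i) * p.x(i); i += 1 }
          val a = new Array[Double](D)
          i = 0
          while (i < D) {
            a(i) = p.x(i) * (1 / (1 + scala.math.exp(-p.y * dotp)) - 1) * p.y
            i += 1
          }
          a
        }

        // Element-wise sum of per-point gradients, as reduce would in Spark.
        val grad = points.map(gradientOf)
          .reduce((a, b) => a.zip(b).map { case (u, v) => u + v })
        println(grad.take(5).mkString(", "))
      }
    }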
Benchmark environment

- Intel Xeon E3-1270 v2 and IBM POWER8
- IBM Java SDK 8 SR3
- 64 GB memory limit
- We report best times, since these were the most stable measurements given how the JVM JIT compiler behaves
Micro-benchmark

[Chart: best speedup (higher is better, y-axis 0 to 7) vs. array size of x (1 to 100000), for x86_64 and PowerPC.]
Apache Spark

- Implementation in the Apache Spark SQL module (Datasets)
- DataFrames (specialized Datasets): fast SQL operations
- RDDs: flexible Map-Reduce on Java-like collections
- Non-specialized Datasets were slower than RDDs
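For orientation, the same map looks almost identical in the RDD and Dataset APIs; the difference is where (de)serialization happens. A minimal sketch, assuming a local SparkSession:

    import org.apache.spark.sql.SparkSession

    case class Point(x: Int, y: Int)

    object ApiContrast {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
        import spark.implicits._

        val data = Seq(Point(1, 2), Point(3, 4))

        // RDD: records live as Java objects on the heap;
        // no per-map deserialization/serialization.
        val rddScaled = spark.sparkContext.parallelize(data)
          .map(p => Point(p.x * 2, p.y * 2))

        // Dataset: records are stored serialized, so each map deserializes its
        // input and serializes its output; this is the overhead being inlined away.
        val dsScaled = data.toDS().map(p => Point(p.x * 2, p.y * 2))

        println(rddScaled.collect().toSeq)
        println(dsScaled.collect().toSeq)
        spark.stop()
      }
    }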
RDD vs Dataset

RDD:
- No deserialization/serialization around the map task
- Java object memory overhead
- Costly serialization for communication
- Allows any serializable object as a data record
- In practice faster

Dataset:
- Has deserialization/serialization around the map task
- Uses less memory
- Cheap serialization for communication
- Efficiently supports only a limited set of data structures
- In practice slower
Spark benchmark results

[Chart: best speedup (higher is better, y-axis 0 to 3.5) vs. array size of x (1 to 100000), for x86_64 and PowerPC.]
RDD vs Dataset with deserialization inlining

RDD:
- No deserialization/serialization around the map task
- Java object memory overhead
- Costly serialization for communication
- Allows any serializable object as a data record
- In practice often slower

Dataset with deserialization inlining:
- Little deserialization/serialization around the map task
- Uses less memory
- Cheap serialization for communication
- Supports only a limited set of data structures and language constructs
- In practice often faster
New niche of Apache Spark problems

[Diagram: a spectrum running from expressivity to speed. RDDs sit at the expressive end, DataFrames at the fast end, and Datasets with deserialization inlining fill the niche between them.]

Thank you for listening!
JVM and escape analysis (backup slide)

The JVM JIT compiler could in principle perform this whole optimization itself, but it lacks the extra knowledge we have: data records and their processing are (usually) independent of one another.