
[FLINK-14346] [serialization] faster implementation of StringValue writeString and readString #10358

Merged
pnowojski merged 6 commits into apache:master from shuttie:string-serializer
Dec 7, 2019

Conversation

@shuttie (Contributor) commented Nov 29, 2019

What is the purpose of the change

This PR implements a set of performance optimizations for String serialization and de-serialization. While running a set of state-heavy streaming jobs, we noticed that Flink spends quite a lot of CPU time (~30-40%) on String encoding and decoding in two places: while transferring messages between nodes, and while reading and writing objects in the state store.

We did a simple benchmark of String read/write operations against the JDK's default DataOutput.writeUTF and noted a significant performance difference between the Flink implementation and the JDK one.

The difference was roughly 4x on decoding and 2x on encoding for 16-symbol ascii strings.

[info]	Benchmark	                      (length)	(stringType)	Mode	Cnt	Score	    Error	Units
[info]	StringDeserializerBenchmark.deserializeDefault	16	ascii	avgt	25	251.321	±	3.251	ns/op
[info]	StringDeserializerBenchmark.deserializeJDK	16	ascii	avgt	25	77.147	±	1.661	ns/op

[info]	StringSerializerBenchmark.serializeDefault	16	ascii	avgt	25	95.782	±	0.261	ns/op
[info]	StringSerializerBenchmark.serializeJDK		16	ascii	avgt	25	50.786	±	1.677	ns/op

For larger strings the gap widened even further, to roughly 7x on decoding and 4x on encoding:

[info]	Benchmark	                      (length)	(stringType)	Mode	Cnt	Score	    Error	Units
[info]	StringDeserializerBenchmark.deserializeDefault	128	ascii	avgt	25	1757.726	±	3.312	ns/op
[info]	StringDeserializerBenchmark.deserializeJDK	128	ascii	avgt	25	263.445	±	6.912	ns/op

[info]	StringSerializerBenchmark.serializeDefault	128	ascii	avgt	25	670.627	±	2.807	ns/op
[info]	StringSerializerBenchmark.serializeJDK		128	ascii	avgt	25	151.789	±	7.295	ns/op

However, the JDK's DataOutput.writeUTF cannot be used directly as a serializer replacement, because it is incompatible with the current Flink binary format for strings. It also cannot write strings whose encoded form exceeds 65535 bytes, as it uses a 2-byte length prefix.
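The 2-byte length prefix limit is easy to demonstrate: DataOutputStream.writeUTF throws a UTFDataFormatException once the modified-UTF-8 encoding of the string exceeds 65535 bytes. A small self-contained check (illustrative, not part of the PR):

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

public class WriteUtfLimit {

    // Returns true if writeUTF accepts an ascii string of the given length.
    // For ascii, each char encodes to exactly one byte, so the byte length
    // equals the character count.
    static boolean canWriteUtf(int chars) {
        try {
            DataOutputStream out = new DataOutputStream(new ByteArrayOutputStream());
            StringBuilder sb = new StringBuilder(chars);
            for (int i = 0; i < chars; i++) {
                sb.append('a');
            }
            out.writeUTF(sb.toString());
            return true;
        } catch (UTFDataFormatException e) {
            return false; // encoded length does not fit into the 2-byte prefix
        } catch (IOException e) {
            throw new RuntimeException(e); // cannot happen with a byte-array sink
        }
    }

    public static void main(String[] args) {
        System.out.println(canWriteUtf(65535)); // true: fits the unsigned 16-bit length
        System.out.println(canWriteUtf(65536)); // false: UTFDataFormatException
    }
}
```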

Comparing the two implementations, the most important difference is the JDK's intermediate buffering versus Flink's iterative, per-character approach. The buffering allows HotSpot to do two important optimizations:

  • it can unroll the whole encoding/decoding loop;
  • with no data dependencies between characters, the CPU can achieve much higher instruction-level parallelism and spends less time stalled.
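The structural difference can be sketched as follows. This is an illustrative, ascii-only simplification (not Flink's actual code): the iterative variant issues one stream call per character, so every iteration depends on the stream's internal position, while the buffered variant encodes into a local byte[] first (a tight loop the JIT can unroll) and hands the whole chunk over in one bulk write.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

public class BufferedVsIterative {

    // per-character: one stream call per char, serial dependency on stream state
    static void writeIterative(String s, ByteArrayOutputStream out) {
        for (int i = 0; i < s.length(); i++) {
            out.write(s.charAt(i) & 0x7F); // ascii-only for brevity
        }
    }

    // buffered: encode into a local array (no cross-iteration dependencies),
    // then a single bulk write
    static void writeBuffered(String s, ByteArrayOutputStream out) {
        byte[] buf = new byte[s.length()];
        for (int i = 0; i < s.length(); i++) {
            buf[i] = (byte) (s.charAt(i) & 0x7F);
        }
        out.write(buf, 0, buf.length);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream a = new ByteArrayOutputStream();
        ByteArrayOutputStream b = new ByteArrayOutputStream();
        writeIterative("hello", a);
        writeBuffered("hello", b);
        System.out.println(Arrays.equals(a.toByteArray(), b.toByteArray())); // true
    }
}
```

Both variants produce identical bytes; only the shape of the inner loop changes, which is what lets the buffered version benefit from unrolling and instruction-level parallelism.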

We made a simple POC that alters the implementation of StringValue.writeString and StringValue.readString to introduce additional buffering, which significantly improves both encoding and decoding throughput on almost all workloads.

There is also a property-based validation test suite which ensures that the old and new serialization code produce exactly the same byte sequences, and that round-trips (old to new, new to old, and new to new) produce the same results. We ran this suite with random data for ~1h and found no differences.
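The shape of such a round-trip property check can be sketched as below. The real suite plugs in the old and new StringValue implementations as the encode/decode pair; here UTF-8 is used as a stand-in codec so the sketch is self-contained, and the character ranges mirror the benchmark's ascii/russian/chinese workloads.

```java
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class RoundTripCheck {

    // Stand-in codec; the real suite would call the old and new
    // StringValue.writeString/readString implementations here.
    static byte[] encode(String s) { return s.getBytes(StandardCharsets.UTF_8); }
    static String decode(byte[] b) { return new String(b, StandardCharsets.UTF_8); }

    // Generates random strings mixing 1-, 2-, and 3-byte code points and
    // checks that decode(encode(s)) reproduces s exactly.
    static boolean roundTripsOk(long seed, int iterations) {
        Random rnd = new Random(seed);
        for (int i = 0; i < iterations; i++) {
            int len = rnd.nextInt(256);
            StringBuilder sb = new StringBuilder(len);
            for (int j = 0; j < len; j++) {
                int pick = rnd.nextInt(3);
                char c = pick == 0 ? (char) ('a' + rnd.nextInt(26))          // ascii
                       : pick == 1 ? (char) (0x0410 + rnd.nextInt(32))       // Cyrillic
                       : (char) (0x4E00 + rnd.nextInt(0x100));               // CJK
                sb.append(c);
            }
            String s = sb.toString();
            if (!decode(encode(s)).equals(s)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(roundTripsOk(42L, 10_000)); // true
    }
}
```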

Benchmark results

Full raw benchmark results are located here. We ran a series of benchmarks over three string encoding/decoding implementations:

  • Flink's current StringValue.writeString and StringValue.readString
  • StringValue.writeString and StringValue.readString implementations from this PR
  • baseline binary-incompatible implementation of DataInput.readUTF and DataOutput.writeUTF

For a workload generator we used these parameters:

  • string types: 7-bit us-ascii, russian symbols (usually encoded as 14-bit varlen numbers), and chinese symbols (usually within the 21-bit varlen range)
  • string lengths: 1, 4, 8, 16, 32, 64, 128 characters.
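The 14-bit and 21-bit figures correspond to how many bytes a code point occupies under a 7-bit-per-byte variable-length scheme. A small illustrative width function (a generic varint sketch, not Flink's exact string format):

```java
public class VarlenWidth {

    // Bytes needed to store v under a 7-bit-per-byte variable-length scheme:
    // one byte per 7 bits of payload.
    static int varlenBytes(int v) {
        int bytes = 1;
        while ((v >>>= 7) != 0) {
            bytes++;
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(varlenBytes('a'));      // 1: us-ascii fits in 7 bits
        System.out.println(varlenBytes('\u044F')); // 2: Cyrillic 'я' needs up to 14 bits
        System.out.println(varlenBytes('\u4E2D')); // 3: CJK '中' needs up to 21 bits
    }
}
```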

Ascii strings

[info]	Benchmark	                      (length)	(stringType)	Mode	Cnt	Score	    Error	Units
[info]	StringDeserializerBenchmark.deserializeDefault	1	ascii	avgt	25	46.603	±	0.750	ns/op
[info]	StringDeserializerBenchmark.deserializeImproved	1	ascii	avgt	25	51.074	±	0.720	ns/op
[info]	StringDeserializerBenchmark.deserializeJDK	1	ascii	avgt	25	63.402	±	1.631	ns/op
[info]	StringSerializerBenchmark.serializeDefault	1	ascii	avgt	25	31.595	±	0.489	ns/op
[info]	StringSerializerBenchmark.serializeImproved	1	ascii	avgt	25	33.454	±	0.151	ns/op
[info]	StringSerializerBenchmark.serializeJDK		1	ascii	avgt	25	34.721	±	0.128	ns/op

[info]	StringDeserializerBenchmark.deserializeDefault	16	ascii	avgt	25	251.321	±	3.251	ns/op
[info]	StringDeserializerBenchmark.deserializeImproved	16	ascii	avgt	25	55.385	±	1.176	ns/op
[info]	StringDeserializerBenchmark.deserializeJDK	16	ascii	avgt	25	77.147	±	1.661	ns/op
[info]	StringSerializerBenchmark.serializeDefault	16	ascii	avgt	25	95.782	±	0.261	ns/op
[info]	StringSerializerBenchmark.serializeImproved	16	ascii	avgt	25	51.806	±	0.180	ns/op
[info]	StringSerializerBenchmark.serializeJDK		16	ascii	avgt	25	50.786	±	1.677	ns/op

[info]	StringDeserializerBenchmark.deserializeDefault	128	ascii	avgt	25	1757.726 ±	3.312	ns/op
[info]	StringDeserializerBenchmark.deserializeImproved	128	ascii	avgt	25	140.374	±	1.006	ns/op
[info]	StringDeserializerBenchmark.deserializeJDK	128	ascii	avgt	25	263.445	±	6.912	ns/op
[info]	StringSerializerBenchmark.serializeDefault	128	ascii	avgt	25	670.627	±	2.807	ns/op
[info]	StringSerializerBenchmark.serializeImproved	128	ascii	avgt	25	161.481	±	2.798	ns/op
[info]	StringSerializerBenchmark.serializeJDK		128	ascii	avgt	25	151.789	±	7.295	ns/op

So for ascii strings:

  • on 1-char strings the new implementation is a bit slower than the old one; this regression is fixed in the next commit, after which performance for short strings is almost the same.
  • on 16-char strings encoding is 2x faster, decoding is 5x faster
  • on 128-char strings encoding is 4x faster, decoding is 12x faster

Non-ascii strings

[info]	Benchmark	                      (length)	(stringType)	Mode	Cnt	Score	    Error	Units
[info]	StringDeserializerBenchmark.deserializeDefault	1	chinese	avgt	25	77.743	±	1.635	ns/op
[info]	StringDeserializerBenchmark.deserializeImproved	1	chinese	avgt	25	78.814	±	1.329	ns/op
[info]	StringDeserializerBenchmark.deserializeJDK	1	chinese	avgt	25	66.005	±	1.325	ns/op
[info]	StringSerializerBenchmark.serializeDefault	1	chinese	avgt	25	36.767	±	3.662	ns/op
[info]	StringSerializerBenchmark.serializeImproved	1	chinese	avgt	25	36.382	±	0.153	ns/op
[info]	StringSerializerBenchmark.serializeJDK		1	chinese	avgt	25	36.845	±	0.575	ns/op

[info]	StringDeserializerBenchmark.deserializeDefault	16	chinese	avgt	25	669.156	±	3.021	ns/op
[info]	StringDeserializerBenchmark.deserializeImproved	16	chinese	avgt	25	182.587	±	4.843	ns/op
[info]	StringDeserializerBenchmark.deserializeJDK	16	chinese	avgt	25	148.063	±	2.204	ns/op
[info]	StringSerializerBenchmark.serializeDefault	16	chinese	avgt	25	244.844	±	1.079	ns/op
[info]	StringSerializerBenchmark.serializeImproved	16	chinese	avgt	25	86.651	±	1.316	ns/op
[info]	StringSerializerBenchmark.serializeJDK	 	16	chinese	avgt	25	81.840	±	1.976	ns/op

[info]	StringDeserializerBenchmark.deserializeDefault	128	chinese	avgt	25	5147.632 ±	30.068	ns/op
[info]	StringDeserializerBenchmark.deserializeImproved	128	chinese	avgt	25	714.912	±	26.240	ns/op
[info]	StringDeserializerBenchmark.deserializeJDK	128	chinese	avgt	25	738.740	±	7.291	ns/op
[info]	StringSerializerBenchmark.serializeDefault	128	chinese	avgt	25	1889.786 ±	8.541	ns/op
[info]	StringSerializerBenchmark.serializeImproved	128	chinese	avgt	25	388.404	±	2.511	ns/op
[info]	StringSerializerBenchmark.serializeJDK	 	128	chinese	avgt	25	401.011	±	3.157	ns/op

So for non-ascii chinese character strings:

  • on 1-char strings there is no performance difference.
  • on 16-char strings encoding is 3x faster, decoding is 4x faster
  • on 128-char strings encoding is 5x faster, decoding is 7x faster

Brief change log

  • Replace the existing StringValue.writeString and StringValue.readString methods with improved ones.
  • Add an additional test case to StringSerializationTest to cover utf8 encoding/decoding.
  • Performance regression fix for short strings.

Verifying this change

This change is already covered by existing tests, such as StringSerializationTest.
It also adds test cases and can be verified as follows:

  • Add an additional test case to StringSerializationTest to cover utf8 encoding/decoding.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented Nov 29, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 4396fe5 (Wed Dec 04 16:10:33 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The bot tracks the review progress through labels, applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Nov 29, 2019

CI report:

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot run travis to re-run the last Travis build

@shuttie (Contributor, Author) commented Dec 3, 2019

Yet another set of benchmarks, from the flink-benchmarks PR (dataArtisans/flink-benchmarks#36).

The only thing I notice here is a slight performance degradation on small strings (<=4 symbols), which I'll address in the next patch.

master:

Benchmark                                                        Mode  Cnt    Score    Error   Units
SerializationFrameworkMiniBenchmarks.serializerAvro             thrpt   30  388.350 ±  5.574  ops/ms
SerializationFrameworkMiniBenchmarks.serializerKryo             thrpt   30  211.344 ±  8.336  ops/ms
SerializationFrameworkMiniBenchmarks.serializerPojo             thrpt   30  470.016 ± 13.141  ops/ms
SerializationFrameworkMiniBenchmarks.serializerRow              thrpt   30  557.009 ±  9.751  ops/ms
SerializationFrameworkMiniBenchmarks.serializerStringHeavyPojo  thrpt   30   88.379 ±  1.292  ops/ms
SerializationFrameworkMiniBenchmarks.serializerTuple            thrpt   30  592.778 ±  8.488  ops/ms

Benchmark                                 (lengthStr)   (type)   Mode  Cnt      Score      Error   Units
PojoSerializationBenchmark.readAvro               N/A      N/A  thrpt   30    598.640 ±   25.763  ops/ms
PojoSerializationBenchmark.readKryo               N/A      N/A  thrpt   30    193.355 ±    6.963  ops/ms
PojoSerializationBenchmark.readPojo               N/A      N/A  thrpt   30    620.239 ±    3.194  ops/ms
PojoSerializationBenchmark.writeAvro              N/A      N/A  thrpt   30    654.290 ±    3.870  ops/ms
PojoSerializationBenchmark.writeKryo              N/A      N/A  thrpt   30    608.389 ±   12.006  ops/ms
PojoSerializationBenchmark.writePojo              N/A      N/A  thrpt   30    828.253 ±    6.037  ops/ms
StringSerializationBenchmark.stringRead             4    ascii  thrpt   30  11445.245 ±   35.093  ops/ms
StringSerializationBenchmark.stringRead             4  russian  thrpt   30   7115.556 ±   25.999  ops/ms
StringSerializationBenchmark.stringRead             4  chinese  thrpt   30   5149.447 ±   30.320  ops/ms
StringSerializationBenchmark.stringRead            32    ascii  thrpt   30   2154.990 ±    6.773  ops/ms
StringSerializationBenchmark.stringRead            32  russian  thrpt   30   1126.236 ±    0.974  ops/ms
StringSerializationBenchmark.stringRead            32  chinese  thrpt   30    772.899 ±    3.538  ops/ms
StringSerializationBenchmark.stringRead           256    ascii  thrpt   30    285.788 ±    0.907  ops/ms
StringSerializationBenchmark.stringRead           256  russian  thrpt   30    144.113 ±    0.793  ops/ms
StringSerializationBenchmark.stringRead           256  chinese  thrpt   30     98.919 ±    0.718  ops/ms
StringSerializationBenchmark.stringWrite            4    ascii  thrpt   30  19755.480 ±  113.023  ops/ms
StringSerializationBenchmark.stringWrite            4  russian  thrpt   30  11731.759 ± 1329.529  ops/ms
StringSerializationBenchmark.stringWrite            4  chinese  thrpt   30  11457.075 ±   64.132  ops/ms
StringSerializationBenchmark.stringWrite           32    ascii  thrpt   30   3349.573 ±   15.093  ops/ms
StringSerializationBenchmark.stringWrite           32  russian  thrpt   30   1464.489 ±   10.258  ops/ms
StringSerializationBenchmark.stringWrite           32  chinese  thrpt   30   1094.098 ±    4.450  ops/ms
StringSerializationBenchmark.stringWrite          256    ascii  thrpt   30    464.168 ±    4.761  ops/ms
StringSerializationBenchmark.stringWrite          256  russian  thrpt   30    269.960 ±   53.424  ops/ms
StringSerializationBenchmark.stringWrite          256  chinese  thrpt   30    189.702 ±   36.327  ops/ms

this PR:

Benchmark                                                        Mode  Cnt    Score    Error   Units
SerializationFrameworkMiniBenchmarks.serializerAvro             thrpt   30  389.392 ±  6.379  ops/ms
SerializationFrameworkMiniBenchmarks.serializerKryo             thrpt   30  217.490 ±  8.975  ops/ms
SerializationFrameworkMiniBenchmarks.serializerPojo             thrpt   30  448.449 ± 11.446  ops/ms
SerializationFrameworkMiniBenchmarks.serializerRow              thrpt   30  521.921 ± 11.082  ops/ms
SerializationFrameworkMiniBenchmarks.serializerStringHeavyPojo  thrpt   30  108.779 ±  2.980  ops/ms
SerializationFrameworkMiniBenchmarks.serializerTuple            thrpt   30  548.718 ± 11.773  ops/ms

Benchmark                                 (lengthStr)   (type)   Mode  Cnt      Score     Error   Units
PojoSerializationBenchmark.readAvro               N/A      N/A  thrpt   30    593.101 ±  30.778  ops/ms
PojoSerializationBenchmark.readKryo               N/A      N/A  thrpt   30    184.984 ±   2.437  ops/ms
PojoSerializationBenchmark.readPojo               N/A      N/A  thrpt   30    657.618 ±   8.342  ops/ms
PojoSerializationBenchmark.writeAvro              N/A      N/A  thrpt   30    632.636 ±   4.231  ops/ms
PojoSerializationBenchmark.writeKryo              N/A      N/A  thrpt   30    609.889 ±   4.084  ops/ms
PojoSerializationBenchmark.writePojo              N/A      N/A  thrpt   30    769.924 ±   8.650  ops/ms
StringSerializationBenchmark.stringRead             4    ascii  thrpt   30  17623.353 ±  48.387  ops/ms
StringSerializationBenchmark.stringRead             4  russian  thrpt   30  10226.762 ±  94.515  ops/ms
StringSerializationBenchmark.stringRead             4  chinese  thrpt   30   7979.150 ±  61.660  ops/ms
StringSerializationBenchmark.stringRead            32    ascii  thrpt   30  13919.065 ±  51.691  ops/ms
StringSerializationBenchmark.stringRead            32  russian  thrpt   30   4537.817 ±  30.646  ops/ms
StringSerializationBenchmark.stringRead            32  chinese  thrpt   30   3263.699 ±  22.664  ops/ms
StringSerializationBenchmark.stringRead           256    ascii  thrpt   30   3183.622 ±  26.376  ops/ms
StringSerializationBenchmark.stringRead           256  russian  thrpt   30   1011.096 ±  12.115  ops/ms
StringSerializationBenchmark.stringRead           256  chinese  thrpt   30    689.678 ±   4.445  ops/ms
StringSerializationBenchmark.stringWrite            4    ascii  thrpt   30  17796.026 ± 143.503  ops/ms
StringSerializationBenchmark.stringWrite            4  russian  thrpt   30  16582.541 ± 372.612  ops/ms
StringSerializationBenchmark.stringWrite            4  chinese  thrpt   30  15225.444 ± 119.326  ops/ms
StringSerializationBenchmark.stringWrite           32    ascii  thrpt   30   9781.345 ± 826.800  ops/ms
StringSerializationBenchmark.stringWrite           32  russian  thrpt   30   8423.629 ±  58.593  ops/ms
StringSerializationBenchmark.stringWrite           32  chinese  thrpt   30   6111.879 ±  37.015  ops/ms
StringSerializationBenchmark.stringWrite          256    ascii  thrpt   30   3620.902 ±  16.969  ops/ms
StringSerializationBenchmark.stringWrite          256  russian  thrpt   30   1801.506 ±  14.516  ops/ms
StringSerializationBenchmark.stringWrite          256  chinese  thrpt   30   1019.450 ±   8.503  ops/ms

@shuttie (Contributor, Author) commented Dec 3, 2019

Yep, there is a regression for strings shorter than 6 characters; see this more granular benchmark:


Before fallback:

[info] Benchmark                                    (length)  (stringType)  Mode  Cnt   Score    Error  Units
[info] StringSerializerBenchmark.serializeDefault          1         ascii  avgt    5  33.383 ±  2.796  ns/op
[info] StringSerializerBenchmark.serializeDefault          2         ascii  avgt    5  32.731 ±  2.470  ns/op
[info] StringSerializerBenchmark.serializeDefault          3         ascii  avgt    5  37.619 ±  3.950  ns/op
[info] StringSerializerBenchmark.serializeDefault          4         ascii  avgt    5  42.452 ±  3.703  ns/op
[info] StringSerializerBenchmark.serializeDefault          5         ascii  avgt    5  46.887 ±  2.906  ns/op
[info] StringSerializerBenchmark.serializeDefault          6         ascii  avgt    5  57.461 ± 14.265  ns/op
[info] StringSerializerBenchmark.serializeDefault          7         ascii  avgt    5  58.337 ±  2.813  ns/op
[info] StringSerializerBenchmark.serializeImproved         1         ascii  avgt    5  37.015 ± 11.327  ns/op
[info] StringSerializerBenchmark.serializeImproved         2         ascii  avgt    5  40.723 ±  9.182  ns/op
[info] StringSerializerBenchmark.serializeImproved         3         ascii  avgt    5  43.556 ± 10.250  ns/op
[info] StringSerializerBenchmark.serializeImproved         4         ascii  avgt    5  48.410 ± 12.323  ns/op
[info] StringSerializerBenchmark.serializeImproved         5         ascii  avgt    5  47.770 ±  7.285  ns/op
[info] StringSerializerBenchmark.serializeImproved         6         ascii  avgt    5  48.477 ±  7.607  ns/op
[info] StringSerializerBenchmark.serializeImproved         7         ascii  avgt    5  49.082 ± 13.026  ns/op

After fallback:

[info] Benchmark                                    (length)  (stringType)  Mode  Cnt   Score   Error  Units
[info] StringSerializerBenchmark.serializeImproved         1         ascii  avgt    5  31.794 ± 0.898  ns/op
[info] StringSerializerBenchmark.serializeImproved         2         ascii  avgt    5  30.904 ± 0.814  ns/op
[info] StringSerializerBenchmark.serializeImproved         3         ascii  avgt    5  35.260 ± 1.481  ns/op
[info] StringSerializerBenchmark.serializeImproved         4         ascii  avgt    5  40.210 ± 1.505  ns/op
[info] StringSerializerBenchmark.serializeImproved         5         ascii  avgt    5  45.301 ± 2.434  ns/op
[info] StringSerializerBenchmark.serializeImproved         6         ascii  avgt    5  43.255 ± 8.550  ns/op
[info] StringSerializerBenchmark.serializeImproved         7         ascii  avgt    5  45.846 ± 7.652  ns/op

@shuttie (Contributor, Author) commented Dec 3, 2019

For deserialization, allocating a small buffer is not a significant overhead, so the fallback is not really needed:

[info] Benchmark                                        (length)  (stringType)  Mode  Cnt    Score    Error  Units
[info] StringDeserializerBenchmark.deserializeDefault          1         ascii  avgt    5   46.588 ±  4.684  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          2         ascii  avgt    5   58.620 ±  3.946  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          3         ascii  avgt    5   77.313 ± 10.427  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          4         ascii  avgt    5   90.241 ± 10.840  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          5         ascii  avgt    5  102.410 ± 13.860  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          6         ascii  avgt    5  120.779 ± 14.496  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          7         ascii  avgt    5  131.607 ± 14.304  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         1         ascii  avgt    5   51.051 ±  3.370  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         2         ascii  avgt    5   51.510 ±  3.727  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         3         ascii  avgt    5   51.531 ±  5.641  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         4         ascii  avgt    5   51.571 ±  8.137  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         5         ascii  avgt    5   52.366 ±  5.513  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         6         ascii  avgt    5   56.548 ±  7.410  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         7         ascii  avgt    5   52.861 ±  7.447  ns/op

@AHeise (Contributor) left a comment

Very exciting stuff! I tried to first understand what's going on and then also looked for some more performance improvements. That's why there are quite a few comments, although the code is already in very good shape.
My biggest wish would be to get rid of the fallback. I hope you can spare some more time to check whether a more accurate buflen while writing would make it any quicker. You could even mock that by setting it to 1 for ascii strings, although you could probably just plug in the small expression that I provided.
Of course, if that wish doesn't work out one way (no time) or the other (no success), I'd be happy to approve it anyhow.

@StephanEwen (Contributor) commented

This is nice work, thanks a lot :-)

Could you add a test case that ensures the encoding is still the same? Maybe copy the old String read/write logic and compare it with the new one for some random Strings?

@shuttie (Contributor, Author) commented Dec 4, 2019

@AHeise thanks for all the ideas, I've updated the PR with all the proposals applied.

As for the writeString fallback code, I've found a better way of dealing with short strings that does not require a separate code path. If you stare long enough at the JMH perfasm listing for short strings, you may notice that most of the time (compared with the original implementation) is spent on the initial buffer-size computation. The original unbuffered code has no reason to compute it, as there is no buffer; but in this PR we need to scan the string twice: once to compute the buffer size, and once to write characters into the buffer.

The main idea of this PR is to leverage CPU-level parallelism, helping the processor handle multiple characters at once. The problem with short strings is that there is nothing to parallelize, so the double-scanning overhead starts to kill the performance.

The proposed fix is to over-allocate the buffer for short strings, skipping the exact buffer-size computation. I've found the tipping point for this approach lying somewhere between 6 and 8 characters:

  • for strings < 6 chars it's faster to over-allocate,
  • for strings of 6-8 chars it's the same as the exact computation,
  • for strings > 8 chars it can be slower, though insignificantly; in theory the over-allocation may also add some GC pressure.
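The sizing strategy described above can be sketched as follows. This is a hypothetical illustration, not Flink's actual code: the cutoff constant and the per-character width function are stand-ins, assuming each 16-bit char needs at most 3 bytes under a 7-bit-per-byte scheme.

```java
public class BufferSizing {

    // Hypothetical cutoff, taken from the benchmark's observed tipping point.
    static final int SHORT_STRING_CUTOFF = 6;

    // Bytes one char occupies under a 7-bit-per-byte scheme:
    // at most 3 bytes for any 16-bit char value.
    static int charBytes(char c) {
        return c < 0x80 ? 1 : c < 0x4000 ? 2 : 3;
    }

    // Short strings: over-allocate to the 3-bytes-per-char worst case and skip
    // the extra scan. Long strings: scan once to compute the exact size, since
    // the scan cost amortizes and the tight allocation avoids GC pressure.
    static int bufferSize(String s) {
        if (s.length() < SHORT_STRING_CUTOFF) {
            return s.length() * 3;
        }
        int exact = 0;
        for (int i = 0; i < s.length(); i++) {
            exact += charBytes(s.charAt(i));
        }
        return exact;
    }

    public static void main(String[] args) {
        System.out.println(bufferSize("abc"));      // 9: short string, over-allocated
        System.out.println(bufferSize("abcdefgh")); // 8: exact scan, 1 byte per ascii char
    }
}
```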

The current round of benchmarks:

[info] Benchmark                                    (length)  (stringType)  Mode  Cnt   Score   Error  Units
[info] StringDeserializerBenchmark.deserializeDefault          1         ascii  avgt   50   45.618 ± 0.339  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          2         ascii  avgt   50   61.348 ± 0.579  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          4         ascii  avgt   50   88.067 ± 1.058  ns/op
[info] StringDeserializerBenchmark.deserializeDefault          8         ascii  avgt   50  142.902 ± 1.121  ns/op
[info] StringDeserializerBenchmark.deserializeDefault         16         ascii  avgt   50  249.181 ± 1.920  ns/op
[info] StringDeserializerBenchmark.deserializeDefault         32         ascii  avgt   50  466.382 ± 1.502  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         1         ascii  avgt   50   49.916 ± 0.132  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         2         ascii  avgt   50   50.278 ± 0.064  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         4         ascii  avgt   50   50.365 ± 0.129  ns/op
[info] StringDeserializerBenchmark.deserializeImproved         8         ascii  avgt   50   52.463 ± 0.301  ns/op
[info] StringDeserializerBenchmark.deserializeImproved        16         ascii  avgt   50   55.711 ± 0.597  ns/op
[info] StringDeserializerBenchmark.deserializeImproved        32         ascii  avgt   50   65.342 ± 0.555  ns/op
[info] StringSerializerBenchmark.serializeDefault              1         ascii  avgt   50   31.076 ± 0.192  ns/op
[info] StringSerializerBenchmark.serializeDefault              2         ascii  avgt   50   31.770 ± 1.811  ns/op
[info] StringSerializerBenchmark.serializeDefault              4         ascii  avgt   50   39.251 ± 0.189  ns/op
[info] StringSerializerBenchmark.serializeDefault              8         ascii  avgt   50   57.736 ± 0.253  ns/op
[info] StringSerializerBenchmark.serializeDefault             16         ascii  avgt   50   94.964 ± 0.514  ns/op
[info] StringSerializerBenchmark.serializeDefault             32         ascii  avgt   50  168.754 ± 1.416  ns/op
[info] StringSerializerBenchmark.serializeImproved             1         ascii  avgt   50   30.145 ± 0.156  ns/op
[info] StringSerializerBenchmark.serializeImproved             2         ascii  avgt   50   30.873 ± 0.274  ns/op
[info] StringSerializerBenchmark.serializeImproved             4         ascii  avgt   50   31.993 ± 0.276  ns/op
[info] StringSerializerBenchmark.serializeImproved             8         ascii  avgt   50   46.220 ± 0.211  ns/op
[info] StringSerializerBenchmark.serializeImproved            16         ascii  avgt   50   50.856 ± 0.826  ns/op
[info] StringSerializerBenchmark.serializeImproved            32         ascii  avgt   50   63.221 ± 1.130  ns/op

So for large strings the new implementation is much faster, and for short ones it is not regressing (it is even slightly faster).

@shuttie (Contributor, Author) commented Dec 4, 2019

@pnowojski as for large strings, this implementation is also looking quite nice:

this PR:

Benchmark                                 (lengthStr)   (type)   Mode  Cnt    Score     Error   Units
StringSerializationBenchmark.stringRead          1024    ascii  thrpt   30  769.067 ±   9.803  ops/ms
StringSerializationBenchmark.stringRead          1024  russian  thrpt   30  293.632 ±  22.269  ops/ms
StringSerializationBenchmark.stringRead          1024  chinese  thrpt   30  260.280 ±   0.768  ops/ms
StringSerializationBenchmark.stringRead          4096    ascii  thrpt   30  144.826 ±  21.883  ops/ms
StringSerializationBenchmark.stringRead          4096  russian  thrpt   30   74.815 ±   1.635  ops/ms
StringSerializationBenchmark.stringRead          4096  chinese  thrpt   30   67.306 ±   2.223  ops/ms
StringSerializationBenchmark.stringRead         16384    ascii  thrpt   30   53.418 ±   0.589  ops/ms
StringSerializationBenchmark.stringRead         16384  russian  thrpt   30   20.338 ±   0.374  ops/ms
StringSerializationBenchmark.stringRead         16384  chinese  thrpt   30   17.313 ±   0.126  ops/ms
StringSerializationBenchmark.stringRead         65536    ascii  thrpt   30   10.042 ±   1.524  ops/ms
StringSerializationBenchmark.stringRead         65536  russian  thrpt   30    5.055 ±   0.018  ops/ms
StringSerializationBenchmark.stringRead         65536  chinese  thrpt   30    4.342 ±   0.037  ops/ms
StringSerializationBenchmark.stringWrite         1024    ascii  thrpt   30  771.981 ± 160.013  ops/ms
StringSerializationBenchmark.stringWrite         1024  russian  thrpt   30  456.973 ±   1.563  ops/ms
StringSerializationBenchmark.stringWrite         1024  chinese  thrpt   30  250.321 ±   0.953  ops/ms
StringSerializationBenchmark.stringWrite         4096    ascii  thrpt   30  106.595 ±   0.496  ops/ms
StringSerializationBenchmark.stringWrite         4096  russian  thrpt   30   70.336 ±   0.157  ops/ms
StringSerializationBenchmark.stringWrite         4096  chinese  thrpt   30   49.363 ±   0.236  ops/ms
StringSerializationBenchmark.stringWrite        16384    ascii  thrpt   30   26.593 ±   0.099  ops/ms
StringSerializationBenchmark.stringWrite        16384  russian  thrpt   30   17.362 ±   0.077  ops/ms
StringSerializationBenchmark.stringWrite        16384  chinese  thrpt   30   13.487 ±   1.534  ops/ms
StringSerializationBenchmark.stringWrite        65536    ascii  thrpt   30   11.295 ±   2.286  ops/ms
StringSerializationBenchmark.stringWrite        65536  russian  thrpt   30    5.805 ±   0.753  ops/ms
StringSerializationBenchmark.stringWrite        65536  chinese  thrpt   30    3.707 ±   0.326  ops/ms

master:

Benchmark                                 (lengthStr)   (type)   Mode  Cnt    Score   Error   Units
StringSerializationBenchmark.stringRead          1024    ascii  thrpt   30   70.249 ± 0.458  ops/ms
StringSerializationBenchmark.stringRead          1024  russian  thrpt   30   36.628 ± 0.091  ops/ms
StringSerializationBenchmark.stringRead          1024  chinese  thrpt   30   24.181 ± 0.094  ops/ms
StringSerializationBenchmark.stringRead          4096    ascii  thrpt   30   17.698 ± 0.313  ops/ms
StringSerializationBenchmark.stringRead          4096  russian  thrpt   30    9.086 ± 0.064  ops/ms
StringSerializationBenchmark.stringRead          4096  chinese  thrpt   30    6.048 ± 0.024  ops/ms
StringSerializationBenchmark.stringRead         16384    ascii  thrpt   30    4.382 ± 0.024  ops/ms
StringSerializationBenchmark.stringRead         16384  russian  thrpt   30    2.270 ± 0.008  ops/ms
StringSerializationBenchmark.stringRead         16384  chinese  thrpt   30    1.515 ± 0.007  ops/ms
StringSerializationBenchmark.stringRead         65536    ascii  thrpt   30    1.109 ± 0.005  ops/ms
StringSerializationBenchmark.stringRead         65536  russian  thrpt   30    0.567 ± 0.002  ops/ms
StringSerializationBenchmark.stringRead         65536  chinese  thrpt   30    0.379 ± 0.002  ops/ms
StringSerializationBenchmark.stringWrite         1024    ascii  thrpt   30  175.745 ± 1.416  ops/ms
StringSerializationBenchmark.stringWrite         1024  russian  thrpt   30   52.724 ± 0.231  ops/ms
StringSerializationBenchmark.stringWrite         1024  chinese  thrpt   30   45.952 ± 5.209  ops/ms
StringSerializationBenchmark.stringWrite         4096    ascii  thrpt   30   42.445 ± 0.288  ops/ms
StringSerializationBenchmark.stringWrite         4096  russian  thrpt   30   22.000 ± 0.320  ops/ms
StringSerializationBenchmark.stringWrite         4096  chinese  thrpt   30   13.603 ± 1.681  ops/ms
StringSerializationBenchmark.stringWrite        16384    ascii  thrpt   30    7.062 ± 0.042  ops/ms
StringSerializationBenchmark.stringWrite        16384  russian  thrpt   30    3.532 ± 0.022  ops/ms
StringSerializationBenchmark.stringWrite        16384  chinese  thrpt   30    2.527 ± 0.015  ops/ms
StringSerializationBenchmark.stringWrite        65536    ascii  thrpt   30    1.741 ± 0.007  ops/ms
StringSerializationBenchmark.stringWrite        65536  russian  thrpt   30    0.893 ± 0.002  ops/ms
StringSerializationBenchmark.stringWrite        65536  chinese  thrpt   30    0.635 ± 0.004  ops/ms

@StephanEwen
Contributor

Some thoughts for follow-up:

  • Do you know where exactly the performance difference comes from? Is it mainly the many individual read() single byte operations, that get more efficient if you bulk get into a byte array?

  • We can actually break the serialization format, as long as we change the StringSerializer config snapshot and restore methods. We can return "needs conversion" as the compatibility and return a serializer with the old encoding as the restore serializer.

  • Thinking twice, the above would only work if all parts support serializer evolution, and I think keys in RocksDB cannot be evolved right now (not yet implemented).

@shuttie
Contributor Author

shuttie commented Dec 4, 2019

@StephanEwen yes, most of the difference comes from multiple single-byte read operations. The CPU cannot parallelize them, as there is a data dependency between characters. By buffering the whole string we improve instruction-level parallelism, so the CPU can process multiple characters at once.
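Conceptually, the difference between the two read paths looks like the following sketch (a simplified illustration for 1-byte-per-char ascii data, not the actual Flink code):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BulkReadSketch {

    // old path: one read call per character, each call depending on the
    // previous one, so the CPU cannot overlap the per-character work
    static String readPerByte(DataInput in, int len) throws IOException {
        char[] chars = new char[len];
        for (int i = 0; i < len; i++) {
            chars[i] = (char) in.readUnsignedByte();
        }
        return new String(chars);
    }

    // new path: a single bulk read into a local buffer, followed by a tight
    // decoding loop over the array that the CPU can pipeline across characters
    static String readBulk(DataInput in, int len) throws IOException {
        byte[] buf = new byte[len];
        in.readFully(buf);
        char[] chars = new char[len];
        for (int i = 0; i < len; i++) {
            chars[i] = (char) (buf[i] & 0xFF);
        }
        return new String(chars);
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "hello".getBytes(StandardCharsets.US_ASCII);
        System.out.println(readPerByte(
                new DataInputStream(new ByteArrayInputStream(data)), data.length));
        System.out.println(readBulk(
                new DataInputStream(new ByteArrayInputStream(data)), data.length));
        // both print: hello
    }
}
```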

I've considered breaking the serialization format for strings (and even did an experiment with this approach), but there are a ton of side effects for end users (like the RocksDB keys you've mentioned), and the only positive result was a slight improvement for long non-ascii strings compared to the implementation in this PR. I guess it's not worth it :)

I've also added a test for validating binary compatibility of this change.

@StephanEwen
Contributor

@shuttie Got it, makes a lot of sense to me.

I was wondering at some point if it would be worthwhile to just "unsafe arraycopy" the char[] from the string to the byte[] in the memory segments or stream buffers. So basically no byte-wise logic at all in the serialization. That would increase the state size (all chars would have two bytes), but might save CPU resources.

Have you ever experimented with something like that?

@shuttie
Contributor Author

shuttie commented Dec 4, 2019

@StephanEwen the only issue with this approach is that the internal binary representation of Strings differs between jvm8 and jvm9+, see JEP-254 for the details (TLDR: starting from jvm9, ascii strings have a compact 1-byte internal representation). So you would lose compatibility between JVM versions; that's why it's called Unsafe :)

@pnowojski
Contributor

(for any committer that will be merging this, please wait until the benchmarks are merged first)

@shuttie I do not understand the results that you posted here. It's in throughput mode, ops/ms, more is better, and it looks like master is outperforming the improved version by quite a lot. Did you mislabel something?

@shuttie
Contributor Author

shuttie commented Dec 4, 2019

@pnowojski oops, you're right, I've accidentally swapped the labels. I've updated the huge string benchmark results in the post you linked to.

Contributor

@AHeise AHeise left a comment


LGTM. I have one question regarding wider unicode chars and correctness (no new measurements needed). The answer could be one additional test or just a reply.
Like I said on the benchmark PR, I will not merge this PR today, so that we can have one benchmark run on old master.

} else if (c < HIGH_BIT14) {
buffer[position++] = (byte)(c | HIGH_BIT);
buffer[position++] = (byte)((c >>> 7));
} else {
Contributor


So I stumbled upon this piece. Since we are encoding a Java char as bytes, we will never exceed 3 bytes, right?
I think I would be more assured if we had a test with a 4-byte unicode character: https://www.fileformat.info/info/unicode/char/1f4a3/index.htm
As per my understanding, that unicode char would result in two Java chars, which are treated individually in this loop, so they result in 2 bytes each, or a total of 4 bytes. Did I get it correctly?

Contributor Author


Java characters (the char and Character types) use the UTF-16 encoding internally, meaning they are always represented as 16-bit code units. But some symbols outside the Basic Multilingual Plane require a pair of 16-bit code units, a so-called surrogate pair, to be encoded.

So from the wire-format point of view, "🌉".length() returns 2 (there are two underlying 16-bit code units encoding this symbol), while "🌉".codePointCount(0, "🌉".length()) returns 1 (there is only one symbol). But the string serializer does not operate on symbols and UTF code points; it operates on the underlying 16-bit code units, so it handles surrogate pairs correctly out of the box.
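To illustrate, here is a minimal standalone sketch of the per-char variable-length scheme (the constants and branch structure mirror the hunk quoted above; this is an illustration, not the production code). It round-trips a surrogate pair through encode and decode:

```java
public class CharVarintSketch {
    static final int HIGH_BIT = 0x1 << 7;    // 128
    static final int HIGH_BIT14 = 0x1 << 14; // 16384

    // encode a single UTF-16 code unit into 1-3 bytes, 7 payload bits per
    // byte, with the high bit of a byte marking "more bytes follow"
    static int encodeChar(char c, byte[] buffer, int position) {
        if (c < HIGH_BIT) {
            buffer[position++] = (byte) c;
        } else if (c < HIGH_BIT14) {
            buffer[position++] = (byte) (c | HIGH_BIT);
            buffer[position++] = (byte) (c >>> 7);
        } else {
            buffer[position++] = (byte) (c | HIGH_BIT);
            buffer[position++] = (byte) ((c >>> 7) | HIGH_BIT);
            buffer[position++] = (byte) (c >>> 14);
        }
        return position;
    }

    // decode one code unit; pos[0] is advanced past the consumed bytes
    static char decodeChar(byte[] buffer, int[] pos) {
        int c = buffer[pos[0]++] & 0xFF;
        if (c >= HIGH_BIT) {
            c = c & 0x7F;
            int shift = 7;
            int curr;
            while ((curr = buffer[pos[0]++] & 0xFF) >= HIGH_BIT) {
                c |= (curr & 0x7F) << shift;
                shift += 7;
            }
            c |= curr << shift;
        }
        return (char) c;
    }

    public static void main(String[] args) {
        String bridge = "\uD83C\uDF09"; // U+1F309, encoded as a surrogate pair
        byte[] buf = new byte[8];
        int written = 0;
        for (int i = 0; i < bridge.length(); i++) {
            written = encodeChar(bridge.charAt(i), buf, written);
        }
        // both surrogate halves are >= 0x4000, so each takes 3 bytes: 6 total
        System.out.println(bridge.length() + " code units, " + written + " bytes");

        int[] pos = {0};
        StringBuilder decoded = new StringBuilder();
        while (pos[0] < written) {
            decoded.append(decodeChar(buf, pos));
        }
        System.out.println(bridge.equals(decoded.toString())); // true: lossless
    }
}
```

Since each surrogate half is a value above HIGH_BIT14, a supplementary character actually costs 3 bytes per half (6 bytes total) on the wire, but the round trip stays lossless either way.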

Contributor Author


@AHeise I've added a trivial test for surrogate pairs serialization, validating that these cases are still handled properly.

@pnowojski
Contributor

Thanks for all of the work @shuttie.

I will try to merge before the feature freeze for 1.10 release, but I have to wait for green travis build, and there is currently quite long build queue.

@StephanEwen
Contributor

Looks like the build passed :-)

BTW, I circumvent the Apache Flink Travis Queue by pushing interesting PRs as branches to my personal repository and free Travis account. That works quite alright.

@pnowojski pnowojski merged commit be967fa into apache:master Dec 7, 2019
@pnowojski
Contributor

Thanks for the very detailed analysis and nice contribution @shuttie!
