refactor: replace manual Parquet checkpointing with DataFrame.checkpoint() #594
Conversation
use the dataframe checkpoint in connected component computation
I'm not sure about these changes. Why are you changing log4j and the test
suite?
Besides, we had in mind to create a centralized checkpointing mechanism.
https://blog.devgenius.io/apache-spark-wtf-welcome-to-hell-83aa677156e5
We could go ahead with this PR, but it should be replaced sooner or later.
On Wed, Apr 23, 2025, 18:29, ericsun95 ***@***.***> wrote:
… What changes were proposed in this pull request?
refactor: replace manual Parquet checkpointing with DataFrame.checkpoint()
#593
- Simplify the checkpoint logic in Connected Components using the Spark
DataFrame API
- Use built-in DataFrame checkpointing to replace the custom checkpoint
workaround
Why are the changes needed?
- Fix potential consistency issues in Connected Components
checkpointing on S3
- Avoid manual Parquet I/O for checkpointing in the CC algorithm (a sketch
of the two approaches follows)
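For illustration, a minimal sketch of the two approaches being contrasted. The `iter-N` path layout, the `manualCheckpoint`/`builtinCheckpoint` names, and the local session are assumptions for the example, not the actual GraphFrames code:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Old approach (roughly): round-trip through Parquet to truncate the
// lineage of the iteratively updated DataFrame.
def manualCheckpoint(df: DataFrame, baseDir: String, iteration: Int): DataFrame = {
  val path = s"$baseDir/iter-$iteration" // hypothetical layout
  df.write.mode("overwrite").parquet(path)
  spark.read.parquet(path)
}

// New approach: Spark materializes the plan and cuts the lineage itself.
// Requires spark.sparkContext.setCheckpointDir(...) to have been called.
def builtinCheckpoint(df: DataFrame): DataFrame =
  df.checkpoint(eager = true)
```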
Commit Summary
- ac208bd: use the dataframe checkpoint in connected component computation
- ad88256: use the dataframe checkpoint in connected component computation
- 319a621: Merge branch 'master' of https://github.com/ericsun95/graphframes
- 647d84e: Merge branch 'graphframes:master' into master
- ea8cac7: Update the test to correctly reflect the checkpoint behavior
- 6bd31f6: format the code
- 47feeb4: format the code
- d1c0b07: format the code
File Changes
(3 files: https://github.com/graphframes/graphframes/pull/594/files)
- M src/main/scala/org/graphframes/lib/ConnectedComponents.scala (22)
- M src/test/resources/log4j.properties (4)
- M src/test/scala/org/graphframes/lib/ConnectedComponentsSuite.scala (51)
Patch Links:
- https://github.com/graphframes/graphframes/pull/594.patch
- https://github.com/graphframes/graphframes/pull/594.diff
I deleted the log4j change; it was a mistaken commit.
But the test suite has to be changed, since the Spark checkpoint behaves differently than saving directly to Parquet; without the change it won't work.
Did you mean to share something else? The linked blog doesn't mention anything about checkpointing.
Codecov Report
@@ Coverage Diff @@
## master #594 +/- ##
==========================================
- Coverage 91.43% 89.47% -1.97%
==========================================
Files 18 20 +2
Lines 829 1026 +197
Branches 52 126 +74
==========================================
+ Hits 758 918 +160
- Misses 71 108 +37
@ericsun95 Thanks for the contribution! It LGTM overall, but I left a couple of comments related to setting the checkpoint dir.
// remove previous checkpoint
// enable checkpointing if not yet done
if (spark.sparkContext.getCheckpointDir.isEmpty) {
  spark.sparkContext.setCheckpointDir(checkpointDir.get)
I think we should return it to the initial state after convergence. So, if it was empty before, it should be empty after.
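For illustration only, a sketch of the save-and-restore idea under discussion; `checkpointDir` stands in for the GraphFrames parameter, the path is made up, and the restore step is an assumption about how the fix could look, not the merged code:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val checkpointDir: Option[String] = Some("/tmp/graphframes-cc-checkpoints") // hypothetical

// Capture whatever checkpoint dir was configured before we touch it.
val previousDir: Option[String] = spark.sparkContext.getCheckpointDir

if (previousDir.isEmpty) {
  spark.sparkContext.setCheckpointDir(checkpointDir.get)
}

// ... iterative loop calling ee.checkpoint(eager = true) ...

// Restore the caller's setting after convergence. Note that SparkContext
// offers no public way to clear the dir entirely, so a previously-empty
// state can only be approximated.
previousDir.foreach(spark.sparkContext.setCheckpointDir)
```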
Yeah, good callout, will do.
I think this will have multitenancy issues. What if more than one user is working in a single Spark cluster?
Or even in a single-tenant world, imagine there are two pipelines feeding a binary node and they are both leveraging the checkpoint dir.
@james-willis SparkContext exists per session, so if there are multiple users using one Spark cluster (for example, via YARN), each of them will have their own SparkContext. For the case of two pipelines: again, both should have their own SparkContext.
For me, returning this var to its initial state is important.
When I say pipeline, I mean two children of a binary node in one query. Anyway, I want to copy my other comment here:
In my opinion we should just let Spark manage the checkpoint location if we want to switch to using the DataFrame checkpoint method. Users can use spark.cleaner.referenceTracking.cleanCheckpoints if they think it's important to GC the checkpoint dir.
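As a usage sketch, this is how that cleaner setting would be applied when building a session; the app name and checkpoint path here are illustrative:

```scala
import org.apache.spark.sql.SparkSession

// Ask Spark's ContextCleaner to delete checkpoint files once the
// corresponding reference is garbage-collected on the driver
// (the setting defaults to false).
val spark = SparkSession.builder()
  .appName("cc-example") // illustrative
  .master("local[*]")
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()

spark.sparkContext.setCheckpointDir("/tmp/cc-checkpoints") // illustrative path
```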
I am fine either way. But one thing I would like to discuss here is whether we need to delete the checkpoint files within iterations.
The previous behavior was saving to S3 as Parquet and cleaning previous checkpoint files after a certain number of iterations.
However, with the Spark-provided checkpoint method, it is hard for us to know exactly which files to delete after a given iteration under a fixed parent checkpointDir. So if we don't manage at the parent folder level, we should expect continuously accumulating checkpoint files.
The user doesn't have access to the internal loop, which can be a pain if they have limited disk available.
Any ideas on this? I think either way we need to deprecate the existing approach and find a way for users to manage the resources within loops.
spark.cleaner.referenceTracking.cleanCheckpoints manages this.
That wouldn't keep the old behavior, and it is a global configuration. Ideally, if we want to be compatible, we need to clean iteration by iteration, e.g. after 5 iterations, clean the previous checkpoint files.
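For context, a sketch of the kind of iteration-by-iteration cleanup being described, in the style of the old manual-Parquet behavior; the function name, per-iteration path layout, and 5-iteration interval are assumptions:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}

// Write each iteration's state to its own Parquet path and delete the
// path from `interval` iterations ago. Because the paths are known
// exactly, stale state can be removed eagerly inside the loop.
def checkpointWithCleanup(
    spark: SparkSession,
    df: DataFrame,
    baseDir: String, // e.g. an S3 prefix; illustrative
    iteration: Int,
    interval: Int = 5): DataFrame = {
  val current = new Path(s"$baseDir/iter-$iteration")
  val fs: FileSystem = current.getFileSystem(spark.sparkContext.hadoopConfiguration)
  df.write.mode("overwrite").parquet(current.toString)
  if (iteration >= interval) {
    fs.delete(new Path(s"$baseDir/iter-${iteration - interval}"), true)
  }
  spark.read.parquet(current.toString)
}
```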
Just a comment: in other places we've used a random temp dir for checkpoints; it could be the base dir.
Can you share a link to the "other places" you mentioned, for reference?
In my opinion we should just let Spark manage the checkpoint location if we want to switch to using the DataFrame checkpoint method. Users can use spark.cleaner.referenceTracking.cleanCheckpoints if they think it's important to GC the checkpoint dir. If we want to maintain the current behavior, let's just leave it the way it is.
if (spark.sparkContext.getCheckpointDir.isEmpty) {
  spark.sparkContext.setCheckpointDir(checkpointDir.get)
}
ee = ee.checkpoint(eager = true)
what does eager help with?
It helps trigger the checkpoint immediately; otherwise it is lazy.
OK, so it avoids the unpersists happening before the checkpoint happens.
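A small illustration of the eager-vs-lazy distinction in Dataset.checkpoint; the data and checkpoint path here are made up:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setCheckpointDir("/tmp/checkpoints") // illustrative

val df = spark.range(1000).toDF("id")

// eager = true: the plan is executed and materialized right now, so the
// lineage is already truncated before any upstream data is unpersisted.
val eagerCp = df.checkpoint(eager = true)

// eager = false: the Dataset is only marked for checkpointing; nothing is
// written until an action runs, so an earlier unpersist could force
// recomputation from the original lineage.
val lazyCp = df.checkpoint(eager = false)
lazyCp.count() // the checkpoint is materialized here
```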
This makes sense to me. Why is this a graphframes feature when it's a Spark feature?
Another thing that would be interesting is the option to use local checkpointing, to avoid having to write to durable storage at all. It would be less resilient to errors but more performant.
I like that approach. I will work on an implementation.
Local checkpoints as an option were added in #662.
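For reference, a sketch of what local checkpointing looks like with the stock Spark API; how #662 wires it into GraphFrames is not shown here:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.range(1000).toDF("id")

// localCheckpoint truncates lineage using executor-local block storage
// instead of the reliable checkpoint dir: no HDFS/S3 writes and no
// setCheckpointDir needed, but the checkpointed data is lost if an
// executor dies.
val cp = df.localCheckpoint(eager = true)
```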