refactor: replace manual Parquet checkpointing with DataFrame.checkpoint() #594

Open
wants to merge 9 commits into master
Conversation

ericsun95

What changes were proposed in this pull request?

refactor: replace manual Parquet checkpointing with DataFrame.checkpoint() #593

  • Simplify checkpoint logic in Connected Components using Spark DataFrame API
  • Use built-in DataFrame checkpointing to replace custom checkpoint workaround

Why are the changes needed?

  • Fix potential consistency issues in Connected Components checkpointing on S3
  • Avoid manual Parquet I/O for checkpointing in CC algorithm
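For illustration, a minimal sketch of the switch; the names `ee`, `spark`, and `checkpointPath` follow the diff but are assumptions here:

```scala
// Before (manual workaround, sketched): round-trip the iteration state
// through Parquet to truncate the lineage.
ee.write.mode("overwrite").parquet(checkpointPath)
ee = spark.read.parquet(checkpointPath)

// After: let Spark persist the plan to the configured checkpoint dir and
// truncate the lineage in one call.
ee = ee.checkpoint(eager = true)
```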

@SauronShepherd
Contributor

SauronShepherd commented Apr 23, 2025 via email

@ericsun95
Author

ericsun95 commented Apr 23, 2025

I deleted the log4j change; it was a mistaken commit.

@ericsun95
Author

But the test suite has to be changed, since Spark's checkpoint behaves differently than saving to Parquet directly; otherwise it won't work.

@ericsun95
Author

I'm not sure about these changes. Why are you changing log4j and the test suite?

Besides, we had in mind to create a centralized checkpointing mechanism.

https://blog.devgenius.io/apache-spark-wtf-welcome-to-hell-83aa677156e5

We could go ahead with this PR, but it should be replaced sooner or later.


Are you sharing something else? The linked blog doesn't mention anything about checkpoint.

@codecov-commenter


Codecov Report

Attention: Patch coverage is 83.33333% with 1 line in your changes missing coverage. Please review.

Project coverage is 89.47%. Comparing base (bc487ef) to head (e3bc8b4).
Report is 28 commits behind head on master.

Files with missing lines | Patch % | Lines
...cala/org/graphframes/lib/ConnectedComponents.scala | 83.33% | 1 Missing ⚠️


Additional details and impacted files
@@            Coverage Diff             @@
##           master     #594      +/-   ##
==========================================
- Coverage   91.43%   89.47%   -1.97%     
==========================================
  Files          18       20       +2     
  Lines         829     1026     +197     
  Branches       52      126      +74     
==========================================
+ Hits          758      918     +160     
- Misses         71      108      +37     


@SemyonSinchenko SemyonSinchenko left a comment (Collaborator)

@ericsun95 Thanks for the contribution! It LGTM overall, but I left a couple of comments related to setting the checkpoint dir.

// remove previous checkpoint
// enable checkpointing if not yet done
if (spark.sparkContext.getCheckpointDir.isEmpty) {
  spark.sparkContext.setCheckpointDir(checkpointDir.get)
Collaborator:

I think we should return it to the initial state after convergence. So, if it was empty before, it should be empty after.

Author:

Yeah, good callout, will do.
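One way to sketch that restore (names are hypothetical; note that the Scala SparkContext API has no public setter to unset a checkpoint dir once set, so the sketch only sets one when none existed):

```scala
// Set the checkpoint dir only if the user hasn't configured one; remember
// whether we did, so we can reason about the state after convergence.
val hadCheckpointDir = spark.sparkContext.getCheckpointDir.isDefined
if (!hadCheckpointDir) {
  spark.sparkContext.setCheckpointDir(checkpointDir.get)
}
// ... iterate until convergence ...
// There is no public API to clear the dir afterwards, so "returning to the
// initial state" would mean either documenting the leftover setting or
// cleaning up the directory contents this run created.
```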

Collaborator:

I think this will have multitenancy issues. What if more than one user is working in a single Spark cluster?

Or even in a single-tenant world, imagine there are two pipelines feeding a binary node, both leveraging the checkpoint dir.

Collaborator:

@james-willis SparkContext exists per session, so if there are multiple users using one Spark cluster (for example, via YARN), all of them will have their own SparkContext. For the case of two pipelines: again, both should have their own SparkContext.

For me, returning this var to its initial state is important.

Collaborator:

When I say pipeline I mean two children of a binary node in one query. Anyway, I want to copy my other comment here:

In my opinion we should just let Spark manage the checkpoint location if we want to switch to using the DataFrame checkpoint method. Users can use spark.cleaner.referenceTracking.cleanCheckpoints if they think it's important to GC the checkpoint dir.

Author:

I am fine either way. But one thing I would like to discuss here is whether we need to delete the checkpoint files within iterations.
The previous behavior was saving to S3 as Parquet and cleaning the previous checkpoint files after a certain number of iterations.
However, with Spark's built-in checkpoint method, it is hard for us to know exactly which files to delete after a given iteration under a fixed parent checkpointDir. So if we don't manage it at the parent-folder level, we would expect checkpoint files to accumulate continuously.

The user doesn't have access to the internal loop, which can be a pain if they have limited disk available.

Any ideas on this? I think either way we need to deprecate the existing approach and find a way for users to manage the resources within loops.

Collaborator:

spark.cleaner.referenceTracking.cleanCheckpoints manages this.
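For reference, the cleaner-based setup looks roughly like this (the app name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("connected-components")
  // When a checkpointed Dataset/RDD is garbage-collected on the driver,
  // the ContextCleaner deletes its files under the checkpoint dir.
  .config("spark.cleaner.referenceTracking.cleanCheckpoints", "true")
  .getOrCreate()
```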

Author:

That wouldn't keep the old behavior, and it is a global configuration. Ideally, if we want to be compatible, we need to clean it iteration by iteration, e.g. after 5 iterations, clean the previous checkpoint files.

@rjurney
Collaborator

rjurney commented Apr 25, 2025

Just a comment: in other places we've used a random temp dir for checkpoints; that could be the base dir.

@ericsun95
Author

Just a comment: in other places we've used a random temp dir for checkpoints; that could be the base dir.

Can you share a link to the "other places" you mentioned, for reference?

@james-willis
Collaborator

In my opinion we should just let Spark manage the checkpoint location if we want to switch to using the DataFrame checkpoint method. Users can use spark.cleaner.referenceTracking.cleanCheckpoints if they think it's important to GC the checkpoint dir.

If we want to maintain the current behavior, let's just leave it the way it is.


if (spark.sparkContext.getCheckpointDir.isEmpty) {
  spark.sparkContext.setCheckpointDir(checkpointDir.get)
}
ee = ee.checkpoint(eager = true)
Collaborator:

What does eager help with?

Author:

It helps trigger the checkpoint immediately; otherwise it is lazy.

Collaborator:

OK, so it avoids the unpersists happening before the checkpoint happens.
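To restate the distinction (variable names are hypothetical):

```scala
// eager = true (the default) materializes the checkpoint right away, so the
// inputs can be unpersisted immediately afterwards.
val checkpointed = ee.checkpoint(eager = true)
previousEe.unpersist()

// eager = false only marks the plan; files are written by the first action.
// Unpersisting the inputs before that action risks a full recomputation.
val deferred = ee.checkpoint(eager = false)
```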

@rjurney
Collaborator

rjurney commented May 14, 2025

In my opinion we should just let Spark manage the checkpoint location if we want to switch to using the DataFrame checkpoint method. Users can use spark.cleaner.referenceTracking.cleanCheckpoints if they think it's important to GC the checkpoint dir.

If we want to maintain the current behavior, let's just leave it the way it is.

This makes sense to me. Why is this a GraphFrames feature when it's a Spark feature?

@Kimahriman
Contributor

Another thing that would be interesting is the option to use local checkpointing to avoid having to write to durable storage at all. It would be less resilient to errors but more performant
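A minimal sketch of that option, assuming the same `ee` variable as in the diff:

```scala
// localCheckpoint keeps the data in executor block storage instead of a
// reliable filesystem: faster, and no durable storage needed, but the
// lineage is truncated, so the data cannot be recomputed if an executor
// is lost.
val local = ee.localCheckpoint(eager = true)
```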

@SemyonSinchenko
Collaborator

Another thing that would be interesting is the option to use local checkpointing to avoid having to write to durable storage at all. It would be less resilient to errors but more performant

I like that approach. I will work on implementation.

@SemyonSinchenko
Collaborator

Another thing that would be interesting is the option to use local checkpointing to avoid having to write to durable storage at all. It would be less resilient to errors but more performant

Local checkpoints as an option were added in #662

7 participants