Commit 3a0f551

docs: Add snippets for upload_chunks_concurrently and add chunk_size (#1135)
* docs: Add snippets for upload_chunks_concurrently and add chunk_size
* switch from 'processes' to 'workers' in sample nomenclature
* copyright
* tests
1 parent a3a1159 commit 3a0f551

7 files changed (+140, -34 lines)

samples/snippets/snippets_test.py

Lines changed: 47 additions & 14 deletions
@@ -75,6 +75,7 @@
 import storage_transfer_manager_download_bucket
 import storage_transfer_manager_download_chunks_concurrently
 import storage_transfer_manager_download_many
+import storage_transfer_manager_upload_chunks_concurrently
 import storage_transfer_manager_upload_directory
 import storage_transfer_manager_upload_many
 import storage_upload_file
@@ -243,7 +244,10 @@ def test_upload_blob_with_kms(test_bucket):
     with tempfile.NamedTemporaryFile() as source_file:
         source_file.write(b"test")
         storage_upload_with_kms_key.upload_blob_with_kms(
-            test_bucket.name, source_file.name, blob_name, KMS_KEY,
+            test_bucket.name,
+            source_file.name,
+            blob_name,
+            KMS_KEY,
         )
         bucket = storage.Client().bucket(test_bucket.name)
         kms_blob = bucket.get_blob(blob_name)
@@ -396,7 +400,10 @@ def test_move_blob(test_bucket_create, test_blob):
         print(f"test_move_blob not found in bucket {test_bucket_create.name}")
 
     storage_move_file.move_blob(
-        bucket.name, test_blob.name, test_bucket_create.name, "test_move_blob",
+        bucket.name,
+        test_blob.name,
+        test_bucket_create.name,
+        "test_move_blob",
     )
 
     assert test_bucket_create.get_blob("test_move_blob") is not None
@@ -412,7 +419,10 @@ def test_copy_blob(test_blob):
         pass
 
     storage_copy_file.copy_blob(
-        bucket.name, test_blob.name, bucket.name, "test_copy_blob",
+        bucket.name,
+        test_blob.name,
+        bucket.name,
+        "test_copy_blob",
     )
 
     assert bucket.get_blob("test_copy_blob") is not None
@@ -551,7 +561,10 @@ def test_define_bucket_website_configuration(test_bucket):
 def test_object_get_kms_key(test_bucket):
     with tempfile.NamedTemporaryFile() as source_file:
         storage_upload_with_kms_key.upload_blob_with_kms(
-            test_bucket.name, source_file.name, "test_upload_blob_encrypted", KMS_KEY,
+            test_bucket.name,
+            source_file.name,
+            "test_upload_blob_encrypted",
+            KMS_KEY,
         )
         kms_key = storage_object_get_kms_key.object_get_kms_key(
             test_bucket.name, "test_upload_blob_encrypted"
@@ -568,7 +581,10 @@ def test_storage_compose_file(test_bucket):
 
     with tempfile.NamedTemporaryFile() as dest_file:
         destination = storage_compose_file.compose_file(
-            test_bucket.name, source_files[0], source_files[1], dest_file.name,
+            test_bucket.name,
+            source_files[0],
+            source_files[1],
+            dest_file.name,
         )
         composed = destination.download_as_string()
 
@@ -608,7 +624,8 @@ def test_change_default_storage_class(test_bucket, capsys):
 
 def test_change_file_storage_class(test_blob, capsys):
     blob = storage_change_file_storage_class.change_file_storage_class(
-        test_blob.bucket.name, test_blob.name,
+        test_blob.bucket.name,
+        test_blob.name,
     )
     out, _ = capsys.readouterr()
     assert f"Blob {blob.name} in bucket {blob.bucket.name}" in out
@@ -694,7 +711,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):
         test_bucket.name,
         BLOB_NAMES,
         source_directory="{}/".format(uploads),
-        processes=8,
+        workers=8,
     )
     out, _ = capsys.readouterr()
 
@@ -706,7 +723,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):
     storage_transfer_manager_download_bucket.download_bucket_with_transfer_manager(
         test_bucket.name,
         destination_directory=os.path.join(downloads, ""),
-        processes=8,
+        workers=8,
         max_results=10000,
     )
     out, _ = capsys.readouterr()
@@ -720,7 +737,7 @@ def test_transfer_manager_snippets(test_bucket, capsys):
         test_bucket.name,
         blob_names=BLOB_NAMES,
         destination_directory=os.path.join(downloads, ""),
-        processes=8,
+        workers=8,
     )
     out, _ = capsys.readouterr()
 
@@ -763,18 +780,34 @@ def test_transfer_manager_download_chunks_concurrently(test_bucket, capsys):
     with tempfile.NamedTemporaryFile() as file:
         file.write(b"test")
 
-        storage_upload_file.upload_blob(
-            test_bucket.name, file.name, BLOB_NAME
-        )
+        storage_upload_file.upload_blob(test_bucket.name, file.name, BLOB_NAME)
 
     with tempfile.TemporaryDirectory() as downloads:
         # Download the file.
         storage_transfer_manager_download_chunks_concurrently.download_chunks_concurrently(
             test_bucket.name,
             BLOB_NAME,
             os.path.join(downloads, BLOB_NAME),
-            processes=8,
+            workers=8,
         )
         out, _ = capsys.readouterr()
 
-        assert "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME)) in out
+        assert (
+            "Downloaded {} to {}".format(BLOB_NAME, os.path.join(downloads, BLOB_NAME))
+            in out
+        )
+
+
+def test_transfer_manager_upload_chunks_concurrently(test_bucket, capsys):
+    BLOB_NAME = "test_file.txt"
+
+    with tempfile.NamedTemporaryFile() as file:
+        file.write(b"test")
+        file.flush()
+
+        storage_transfer_manager_upload_chunks_concurrently.upload_chunks_concurrently(
+            test_bucket.name, file.name, BLOB_NAME
+        )
+
+        out, _ = capsys.readouterr()
+        assert "File {} uploaded to {}".format(file.name, BLOB_NAME) in out

samples/snippets/storage_transfer_manager_download_bucket.py

Lines changed: 5 additions & 4 deletions
@@ -14,7 +14,7 @@
 
 # [START storage_transfer_manager_download_bucket]
 def download_bucket_with_transfer_manager(
-    bucket_name, destination_directory="", processes=8, max_results=1000
+    bucket_name, destination_directory="", workers=8, max_results=1000
 ):
     """Download all of the blobs in a bucket, concurrently in a process pool.
 
@@ -40,8 +40,9 @@ def download_bucket_with_transfer_manager(
     # The maximum number of processes to use for the operation. The performance
     # impact of this value depends on the use case, but smaller files usually
     # benefit from a higher number of processes. Each additional process occupies
-    # some CPU and memory resources until finished.
-    # processes=8
+    # some CPU and memory resources until finished. Threads can be used instead
+    # of processes by passing `worker_type=transfer_manager.THREAD`.
+    # workers=8
 
     # The maximum number of results to fetch from bucket.list_blobs(). This
     # sample code fetches all of the blobs up to max_results and queues them all
@@ -60,7 +61,7 @@ def download_bucket_with_transfer_manager(
     blob_names = [blob.name for blob in bucket.list_blobs(max_results=max_results)]
 
     results = transfer_manager.download_many_to_path(
-        bucket, blob_names, destination_directory=destination_directory, max_workers=processes
+        bucket, blob_names, destination_directory=destination_directory, max_workers=workers
     )
 
     for name, result in zip(blob_names, results):
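
Call sites only need the keyword rename; a sketch with hypothetical bucket and directory names:

import storage_transfer_manager_download_bucket

# workers replaces the old processes keyword; values here are placeholders.
storage_transfer_manager_download_bucket.download_bucket_with_transfer_manager(
    "your-bucket-name",
    destination_directory="/tmp/downloads/",
    workers=8,
    max_results=1000,
)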

samples/snippets/storage_transfer_manager_download_chunks_concurrently.py

Lines changed: 16 additions & 4 deletions
@@ -13,7 +13,9 @@
 # limitations under the License.
 
 # [START storage_transfer_manager_download_chunks_concurrently]
-def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
+def download_chunks_concurrently(
+    bucket_name, blob_name, filename, chunk_size=32 * 1024 * 1024, workers=8
+):
     """Download a single file in chunks, concurrently in a process pool."""
 
     # The ID of your GCS bucket
@@ -25,19 +27,29 @@ def download_chunks_concurrently(bucket_name, blob_name, filename, processes=8):
     # The destination filename or path
     # filename = ""
 
+    # The size of each chunk. The performance impact of this value depends on
+    # the use case. The remote service has a minimum of 5 MiB and a maximum of
+    # 5 GiB.
+    # chunk_size = 32 * 1024 * 1024 (32 MiB)
+
     # The maximum number of processes to use for the operation. The performance
     # impact of this value depends on the use case, but smaller files usually
     # benefit from a higher number of processes. Each additional process occupies
-    # some CPU and memory resources until finished.
-    # processes=8
+    # some CPU and memory resources until finished. Threads can be used instead
+    # of processes by passing `worker_type=transfer_manager.THREAD`.
+    # workers=8
 
     from google.cloud.storage import Client, transfer_manager
 
     storage_client = Client()
     bucket = storage_client.bucket(bucket_name)
     blob = bucket.blob(blob_name)
 
-    transfer_manager.download_chunks_concurrently(blob, filename, max_workers=processes)
+    transfer_manager.download_chunks_concurrently(
+        blob, filename, chunk_size=chunk_size, max_workers=workers
+    )
 
     print("Downloaded {} to {}.".format(blob_name, filename))
+
+
 # [END storage_transfer_manager_download_chunks_concurrently]
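
The new chunk_size parameter lets callers trade per-chunk overhead against parallelism. A sketch with hypothetical names, using 64 MiB chunks (within the 5 MiB to 5 GiB service bounds noted in the comments):

import storage_transfer_manager_download_chunks_concurrently

# Hypothetical bucket, object, and destination path.
storage_transfer_manager_download_chunks_concurrently.download_chunks_concurrently(
    "your-bucket-name",
    "large-object.bin",
    "/tmp/large-object.bin",
    chunk_size=64 * 1024 * 1024,  # 64 MiB per chunk
    workers=8,
)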

samples/snippets/storage_transfer_manager_download_many.py

Lines changed: 5 additions & 4 deletions
@@ -14,7 +14,7 @@
 
 # [START storage_transfer_manager_download_many]
 def download_many_blobs_with_transfer_manager(
-    bucket_name, blob_names, destination_directory="", processes=8
+    bucket_name, blob_names, destination_directory="", workers=8
 ):
     """Download blobs in a list by name, concurrently in a process pool.
 
@@ -46,16 +46,17 @@ def download_many_blobs_with_transfer_manager(
     # The maximum number of processes to use for the operation. The performance
     # impact of this value depends on the use case, but smaller files usually
     # benefit from a higher number of processes. Each additional process occupies
-    # some CPU and memory resources until finished.
-    # processes=8
+    # some CPU and memory resources until finished. Threads can be used instead
+    # of processes by passing `worker_type=transfer_manager.THREAD`.
+    # workers=8
 
     from google.cloud.storage import Client, transfer_manager
 
     storage_client = Client()
     bucket = storage_client.bucket(bucket_name)
 
     results = transfer_manager.download_many_to_path(
-        bucket, blob_names, destination_directory=destination_directory, max_workers=processes
+        bucket, blob_names, destination_directory=destination_directory, max_workers=workers
     )
 
     for name, result in zip(blob_names, results):
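
Usage mirrors the bucket-download sample, with an explicit list of blob names; all values below are placeholders:

import storage_transfer_manager_download_many

# Hypothetical bucket and blob names.
storage_transfer_manager_download_many.download_many_blobs_with_transfer_manager(
    "your-bucket-name",
    ["reports/jan.csv", "reports/feb.csv"],
    destination_directory="/tmp/downloads/",
    workers=8,
)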

samples/snippets/storage_transfer_manager_upload_chunks_concurrently.py

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# [START storage_transfer_manager_upload_chunks_concurrently]
+def upload_chunks_concurrently(
+    bucket_name,
+    source_filename,
+    destination_blob_name,
+    chunk_size=32 * 1024 * 1024,
+    workers=8,
+):
+    """Upload a single file, in chunks, concurrently in a process pool."""
+    # The ID of your GCS bucket
+    # bucket_name = "your-bucket-name"
+
+    # The path to your file to upload
+    # source_filename = "local/path/to/file"
+
+    # The ID of your GCS object
+    # destination_blob_name = "storage-object-name"
+
+    # The size of each chunk. The performance impact of this value depends on
+    # the use case. The remote service has a minimum of 5 MiB and a maximum of
+    # 5 GiB.
+    # chunk_size = 32 * 1024 * 1024 (32 MiB)
+
+    # The maximum number of processes to use for the operation. The performance
+    # impact of this value depends on the use case. Each additional process
+    # occupies some CPU and memory resources until finished. Threads can be used
+    # instead of processes by passing `worker_type=transfer_manager.THREAD`.
+    # workers=8
+
+    from google.cloud.storage import Client, transfer_manager
+
+    storage_client = Client()
+    bucket = storage_client.bucket(bucket_name)
+    blob = bucket.blob(destination_blob_name)
+
+    transfer_manager.upload_chunks_concurrently(
+        source_filename, blob, chunk_size=chunk_size, max_workers=workers
+    )
+
+    print(f"File {source_filename} uploaded to {destination_blob_name}.")
+
+
+# [END storage_transfer_manager_upload_chunks_concurrently]
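
Driving the new snippet looks like the following; all names are placeholders, and both tuning knobs are optional:

import storage_transfer_manager_upload_chunks_concurrently

# Hypothetical bucket, local file, and object name.
storage_transfer_manager_upload_chunks_concurrently.upload_chunks_concurrently(
    "your-bucket-name",
    "local/path/to/file",
    "storage-object-name",
    chunk_size=32 * 1024 * 1024,  # the default, shown explicitly
    workers=8,
)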

samples/snippets/storage_transfer_manager_upload_directory.py

Lines changed: 5 additions & 4 deletions
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 # [START storage_transfer_manager_upload_directory]
-def upload_directory_with_transfer_manager(bucket_name, source_directory, processes=8):
+def upload_directory_with_transfer_manager(bucket_name, source_directory, workers=8):
     """Upload every file in a directory, including all files in subdirectories.
 
     Each blob name is derived from the filename, not including the `directory`
@@ -33,8 +33,9 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, processes=8):
     # The maximum number of processes to use for the operation. The performance
     # impact of this value depends on the use case, but smaller files usually
     # benefit from a higher number of processes. Each additional process occupies
-    # some CPU and memory resources until finished.
-    # processes=8
+    # some CPU and memory resources until finished. Threads can be used instead
+    # of processes by passing `worker_type=transfer_manager.THREAD`.
+    # workers=8
 
     from pathlib import Path
 
@@ -65,7 +66,7 @@ def upload_directory_with_transfer_manager(bucket_name, source_directory, processes=8):
 
     # Start the upload.
     results = transfer_manager.upload_many_from_filenames(
-        bucket, string_paths, source_directory=source_directory, max_workers=processes
+        bucket, string_paths, source_directory=source_directory, max_workers=workers
     )
 
     for name, result in zip(string_paths, results):
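
A sketch of calling the renamed sample, with placeholder values:

import storage_transfer_manager_upload_directory

# Hypothetical bucket and local directory.
storage_transfer_manager_upload_directory.upload_directory_with_transfer_manager(
    "your-bucket-name", source_directory="local/data/", workers=8
)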

samples/snippets/storage_transfer_manager_upload_many.py

Lines changed: 5 additions & 4 deletions
@@ -14,7 +14,7 @@
 
 # [START storage_transfer_manager_upload_many]
 def upload_many_blobs_with_transfer_manager(
-    bucket_name, filenames, source_directory="", processes=8
+    bucket_name, filenames, source_directory="", workers=8
 ):
     """Upload every file in a list to a bucket, concurrently in a process pool.
 
@@ -43,16 +43,17 @@ def upload_many_blobs_with_transfer_manager(
     # The maximum number of processes to use for the operation. The performance
     # impact of this value depends on the use case, but smaller files usually
     # benefit from a higher number of processes. Each additional process occupies
-    # some CPU and memory resources until finished.
-    # processes=8
+    # some CPU and memory resources until finished. Threads can be used instead
+    # of processes by passing `worker_type=transfer_manager.THREAD`.
+    # workers=8
 
     from google.cloud.storage import Client, transfer_manager
 
     storage_client = Client()
     bucket = storage_client.bucket(bucket_name)
 
     results = transfer_manager.upload_many_from_filenames(
-        bucket, filenames, source_directory=source_directory, max_workers=processes
+        bucket, filenames, source_directory=source_directory, max_workers=workers
    )
 
     for name, result in zip(filenames, results):
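
The samples keep the default process pool; to use the thread-based worker type the updated comments mention, call the transfer_manager API directly. A sketch with hypothetical names:

from google.cloud.storage import Client, transfer_manager

storage_client = Client()
bucket = storage_client.bucket("your-bucket-name")  # hypothetical bucket

# worker_type=transfer_manager.THREAD swaps the process pool for a thread pool,
# avoiding per-process CPU and memory overhead for smaller workloads.
results = transfer_manager.upload_many_from_filenames(
    bucket,
    ["a.txt", "b.txt"],  # hypothetical filenames
    max_workers=8,
    worker_type=transfer_manager.THREAD,
)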
