@api private
@return [Client]
@option options [Client] :client The client used for API requests. If not given, a new `Client` will be constructed.
@option options [Integer] :min_part_size (52428800) Size of copied parts.
Defaults to 50MB.
@option options [Integer] :thread_count (10) Number of concurrent threads to
use for copying parts.
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 20
#
# Builds a copier from an options hash.
#
# @param options [Hash] the `:thread_count` and `:min_part_size` entries are
#   removed from this hash; `:client` is read but left in place.
# @option options [Integer] :thread_count (10) stored in `@thread_count`.
# @option options [Integer] :min_part_size (FIVE_MB * 10) stored in
#   `@min_part_size`.
# @option options [Client] :client (Client.new) stored in `@client`.
def initialize(options = {})
  thread_count = options.delete(:thread_count)
  min_part_size = options.delete(:min_part_size)

  @thread_count = thread_count || 10
  @min_part_size = min_part_size || (FIVE_MB * 10)
  @client = options[:client] || Client.new
end
@api private
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 151 def self.options_for(shape_name) Client.api.metadata['shapes'][shape_name].member_names end
@option (see S3::Client#copy_object)
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 30
#
# Copies the source object as a multipart upload: determines the source size,
# initiates the upload, copies every part and completes the upload.
#
# @param options [Hash] request options. The upload id returned by
#   `initiate_upload` is stored on this hash under `:upload_id`.
# @return the result of `complete_upload`
# @raise [StandardError] any error raised while copying parts or completing
#   the upload is re-raised after `abort_upload` has been called.
# @raise whatever `source_size` or `default_part_size` raise; these happen
#   before an upload is initiated, so nothing is aborted.
def copy(options = {})
  size = source_size(options)
  # Resolve (and validate) the part size before any upload exists, so that a
  # rejected source size fails fast instead of creating a multipart upload
  # only to abort it again.
  part_size = default_part_size(size)
  options[:upload_id] = initiate_upload(options)
  begin
    parts = copy_parts(size, part_size, options)
    complete_upload(parts, options)
  rescue => error
    abort_upload(options)
    raise error
  end
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 86
#
# Asks the client to abort the multipart upload identified by `options`.
#
# @param options [Hash] only `:bucket`, `:key` and `:upload_id` are forwarded.
# @return the result of `@client.abort_multipart_upload`
def abort_upload(options)
  request = {}
  [:bucket, :key, :upload_id].each do |name|
    request[name] = options[name]
  end
  @client.abort_multipart_upload(request)
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 110
#
# Formats the inclusive byte range of the part that starts at `offset`.
#
# @param offset index of the first byte of the part
# @param default_part_size length of a full part
# @param size total size of the source
# @return [String] "bytes=FIRST-LAST", where LAST never exceeds `size - 1`
def byte_range(offset, default_part_size, size)
  # The final part is clipped to the end of the source.
  last_byte = [offset + default_part_size, size].min - 1
  "bytes=#{offset}-#{last_byte}"
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 80
#
# Completes the multipart upload with the given list of copied parts.
#
# @param parts the part descriptions sent under `multipart_upload: { parts: }`
# @param options [Hash] filtered through `options_for` for the
#   `:complete_multipart_upload` operation before being sent.
# @return the result of `@client.complete_multipart_upload`
def complete_upload(parts, options)
  request = options_for(:complete_multipart_upload, options)
            .merge(multipart_upload: { parts: parts })
  @client.complete_multipart_upload(request)
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 94
#
# Builds one request hash per part needed to cover `size` bytes.
#
# @param size total size of the source
# @param default_part_size length of every part except possibly the last
# @param options [Hash] filtered through `options_for` for the
#   `:upload_part_copy` operation; the result is merged into every part.
# @return [Array<Hash>] parts in ascending order, each carrying a 1-based
#   `:part_number` and a `:copy_source_range` produced by `byte_range`.
# @raise [ArgumentError] if `default_part_size` is not greater than zero.
def compute_parts(size, default_part_size, options)
  # A zero or negative step would never reach `size`; fail loudly instead of
  # looping forever while accumulating parts.
  unless default_part_size > 0
    raise ArgumentError, 'default_part_size must be greater than zero'
  end

  shared = options_for(:upload_part_copy, options)
  (0...size).step(default_part_size).each_with_index.map do |offset, index|
    shared.merge(
      part_number: index + 1,
      copy_source_range: byte_range(offset, default_part_size, size)
    )
  end
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 73
#
# Copies a single part through the client and describes the outcome.
#
# @param part [Hash] request passed unchanged to `@client.upload_part_copy`;
#   its `:part_number` is echoed back in the result.
# @return [Hash] `:etag` taken from the response's `copy_part_result`, and
#   `:part_number` taken from `part`.
def copy_part(part)
  response = @client.upload_part_copy(part)
  etag = response.copy_part_result.etag
  { etag: etag, part_number: part[:part_number] }
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 58
#
# Starts a worker thread that keeps taking parts off `queue` and copying them
# until `queue.shift` returns nothing.
#
# @param queue object responding to `shift` and `clear!`
# @return [Thread] a thread whose value is the array of `copy_part` results
#   it produced. If a copy fails, the thread calls `queue.clear!` and
#   re-raises, so the error surfaces from `Thread#value`.
def copy_part_thread(queue)
  Thread.new do
    begin
      results = []
      while (next_part = queue.shift)
        results << copy_part(next_part)
      end
      results
    rescue
      # Presumably empties the queue so other workers stop picking up parts;
      # confirm against PartQueue#clear!.
      queue.clear!
      raise
    end
  end
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 49
#
# Copies every part of the source using `@thread_count` worker threads.
#
# @param size total size of the source
# @param default_part_size part length handed to `compute_parts`
# @param options [Hash] request options handed to `compute_parts`
# @return [Array<Hash>] the results of all workers, flattened and sorted by
#   `:part_number`. `Thread#value` re-raises an error from a failed worker.
def copy_parts(size, default_part_size, options)
  pending = PartQueue.new(compute_parts(size, default_part_size, options))
  workers = @thread_count.times.map { copy_part_thread(pending) }
  completed = workers.map(&:value).flatten
  completed.sort_by { |part| part[:part_number] }
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 135
#
# Chooses the part size for a source of the given size.
#
# @param source_size size of the source
# @return [Integer] the larger of `@min_part_size` and the size needed to
#   fit the source into `MAX_PARTS` parts (rounded up).
# @raise [ArgumentError] with `FILE_TOO_SMALL` when `source_size` is below
#   `FIVE_MB`.
def default_part_size(source_size)
  raise ArgumentError, FILE_TOO_SMALL if source_size < FIVE_MB

  size_for_max_parts = (source_size.to_f / MAX_PARTS).ceil
  [size_for_max_parts, @min_part_size].max.to_i
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 44
#
# Starts a multipart upload and returns its id.
#
# @param options [Hash] filtered through `options_for` for the
#   `:create_multipart_upload` operation before being sent.
# @return the `upload_id` of the client's response
def initiate_upload(options)
  request = options_for(:create_multipart_upload, options)
  response = @client.create_multipart_upload(request)
  response.upload_id
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 143
#
# Picks out of `options` the entries listed for an operation.
#
# @param operation_name key into `API_OPTIONS`
# @param options [Hash] candidate options
# @return [Hash] the entries of `options` whose keys appear in
#   `API_OPTIONS[operation_name]`, in that list's order. Keys that are present
#   with a nil value are kept; absent keys are skipped.
def options_for(operation_name, options)
  API_OPTIONS[operation_name].each_with_object({}) do |opt_name, selected|
    selected[opt_name] = options[opt_name] if options.key?(opt_name)
  end
end
# File lib/aws-sdk-resources/services/s3/object_multipart_copier.rb, line 118
#
# Determines the size of the source object.
#
# When `options[:content_length]` is set, it is removed from `options` and
# returned without any request. Otherwise `options[:copy_source]` is split
# into bucket, key and an optional `?versionId=` suffix, the key is
# CGI-unescaped, and the size is read from a `head_object` request.
#
# @param options [Hash]
# @option options :content_length size to use as-is (deleted from `options`)
# @option options :copy_source "bucket/key" or "bucket/key?versionId=ID"
# @option options :copy_source_client client used for the `head_object`
#   request; `@client` is used when absent.
# @return the `content_length` of the source
# @raise [ArgumentError] if `:copy_source` does not look like "bucket/key".
def source_size(options)
  return options.delete(:content_length) if options[:content_length]

  client = options[:copy_source_client] || @client
  source = options[:copy_source]

  # Try the versioned form first; the plain pattern would otherwise swallow
  # the "?versionId=..." suffix into the key.
  matched =
    if source.respond_to?(:match)
      source.match(%r{([^/]+?)/(.+)\?versionId=(.+)}) ||
        source.match(%r{([^/]+?)/(.+)})
    end
  unless matched
    raise ArgumentError,
          "expected :copy_source to be \"bucket/key\", got #{source.inspect}"
  end

  # version_id is nil when the plain pattern matched (only two captures).
  bucket, key, version_id = matched.captures
  request = { bucket: bucket, key: CGI.unescape(key) }
  request[:version_id] = version_id if version_id
  client.head_object(request).content_length
end