Skip to content

Commit f58dca4

Browse files
committed
Allow to use bucket storage policy
1 parent 5820e7b commit f58dca4

File tree

3 files changed

+77
-18
lines changed

3 files changed

+77
-18
lines changed

conf/default.cfg

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ sds_default_account = myaccount
4343
# backend.
4444
oio_storage_policies=SINGLE,EC123,EC64,THREECOPIES,FOURCOPIES
4545

46+
# Enable or disable the use of bucket storage policy.
47+
# When enabled, and if a storage policy is defined for a bucket (root container),
48+
# all objects in this bucket will be uploaded with this storage policy.
49+
# use_bucket_storage_policy = false
50+
4651
# The configuration to chose the policy when unspecified. All the storage
4752
# policies mentioned here must have been listed in the `oio_storage_policies`
4853
# configuration directive. The string must respect the subsequent format:

oioswift/proxy/controllers/obj.py

Lines changed: 69 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -211,34 +211,83 @@ def enforce_versioning(self, req):
211211
root_container.endswith(MULTIUPLOAD_SUFFIX)):
212212
return None
213213

214-
# We can't use _get_info_from_caches as it would use local worker cache
215-
# first and an update of versioning mode may not be detected.
216-
memcache = getattr(self.app, 'memcache', None) or \
217-
req.environ.get('swift.cache')
218-
if memcache is None:
214+
oio_cache = req.environ.get('oio.cache')
215+
memcache = None
216+
if oio_cache is None:
217+
# We can't use _get_info_from_caches as it would use local worker
218+
# cache first and an update of versioning mode may not be detected.
219+
memcache = getattr(self.app, 'memcache', None) or \
220+
req.environ.get('swift.cache')
221+
222+
if memcache is not None:
223+
key = "/".join(("versioning", self.account_name, root_container))
224+
value = memcache.get(key)
225+
if value is not None:
226+
if value:
227+
req.headers[FORCEVERSIONING_HEADER] = value
228+
return
229+
230+
oio_headers = {REQID_HEADER: self.trans_id}
231+
perfdata = req.environ.get('oio.perfdata')
232+
try:
233+
meta = self.app.storage.container_get_properties(
234+
self.account_name, root_container, headers=oio_headers,
235+
cache=oio_cache, perfdata=perfdata)
236+
except exceptions.NoSuchContainer:
237+
raise HTTPNotFound(request=req)
238+
239+
value = meta['system'].get('sys.m2.policy.version')
240+
if memcache is not None:
241+
memcache.set(key, value or '')
242+
243+
if value:
244+
req.headers[FORCEVERSIONING_HEADER] = value
245+
246+
def use_bucket_storage_policy(self, req):
247+
"""
248+
Enforce the storage policy mode of a container just before executing
249+
an object operation. This is useful when the current object is not
250+
stored in the "main" container but in a shard,
251+
where the storage policy mode may not have been set yet.
252+
"""
253+
if not self.app.use_bucket_storage_policy:
219254
return None
220255

221-
key = "/".join(("versioning", self.account_name, root_container))
222-
val = memcache.get(key)
223-
if val is not None:
224-
if val != '':
225-
req.headers[FORCEVERSIONING_HEADER] = val
226-
return
256+
root_container = req.headers.get(BUCKET_NAME_HEADER)
257+
if root_container is None:
258+
return None
259+
if root_container.endswith(MULTIUPLOAD_SUFFIX):
260+
root_container = root_container[:-len(MULTIUPLOAD_SUFFIX)]
227261

228-
oio_headers = {REQID_HEADER: self.trans_id}
229262
oio_cache = req.environ.get('oio.cache')
263+
memcache = None
264+
if oio_cache is None:
265+
# We can't use _get_info_from_caches as it would use local worker
266+
# cache first and an update of storage policy mode may not be
267+
# detected.
268+
memcache = getattr(self.app, 'memcache', None) or \
269+
req.environ.get('swift.cache')
270+
271+
if memcache is not None:
272+
key = "/".join(("policy", self.account_name, root_container))
273+
value = memcache.get(key)
274+
if value is not None:
275+
return value or None
276+
277+
oio_headers = {REQID_HEADER: self.trans_id}
230278
perfdata = req.environ.get('oio.perfdata')
231279
try:
232280
meta = self.app.storage.container_get_properties(
233281
self.account_name, root_container, headers=oio_headers,
234282
cache=oio_cache, perfdata=perfdata)
235283
except exceptions.NoSuchContainer:
236284
raise HTTPNotFound(request=req)
285+
value = meta['system'].get('sys.m2.policy.storage')
286+
287+
if memcache is not None:
288+
memcache.set(key, value or '')
237289

238-
val = meta['system'].get('sys.m2.policy.version', '')
239-
memcache.set(key, val)
240-
if val:
241-
req.headers[FORCEVERSIONING_HEADER] = val
290+
return value
242291

243292
def get_object_head_resp(self, req):
244293
storage = self.app.storage
@@ -647,15 +696,15 @@ def _object_create(self, account, container, **kwargs):
647696
def _store_object(self, req, data_source, headers):
648697
content_type = req.headers.get('content-type', 'octet/stream')
649698
policy = None
650-
container_info = self.container_info(self.account_name,
651-
self.container_name, req)
652699
if 'X-Oio-Storage-Policy' in req.headers:
653700
policy = req.headers.get('X-Oio-Storage-Policy')
654701
if not self.app.POLICIES.get_by_name(policy):
655702
raise HTTPBadRequest(
656703
"invalid policy '%s', must be in %s" %
657704
(policy, self.app.POLICIES.by_name.keys()))
658705
else:
706+
container_info = self.container_info(self.account_name,
707+
self.container_name, req)
659708
try:
660709
policy_index = int(
661710
req.headers.get('X-Backend-Storage-Policy-Index',
@@ -665,6 +714,8 @@ def _store_object(self, req, data_source, headers):
665714
if policy_index != 0:
666715
policy = self.app.POLICIES.get_by_index(policy_index).name
667716
else:
717+
policy = self.use_bucket_storage_policy(req)
718+
if policy is None:
668719
content_length = int(req.headers.get('content-length', 0))
669720
policy = self._get_auto_policy_from_size(content_length)
670721

oioswift/server.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ def __init__(self, conf, memcache=None, logger=None, account_ring=None,
5959
for k, v in conf.iteritems()
6060
if k.startswith("sds_")}
6161

62+
self.use_bucket_storage_policy = config_true_value(
63+
conf.get('use_bucket_storage_policy', 'false'))
64+
6265
self.oio_stgpol = []
6366
if 'auto_storage_policies' in conf:
6467
for elem in conf['auto_storage_policies'].split(','):

0 commit comments

Comments
 (0)