Skip to content

Commit 57ab804

Browse files
committed
table prefix key schema check and test
1 parent f691cdf commit 57ab804

File tree

4 files changed

+131
-15
lines changed

4 files changed

+131
-15
lines changed

Diff for: pkg/storage/config/schema_config.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ var (
3939
errInvalidSchemaVersion = errors.New("invalid schema version")
4040
errInvalidTablePeriod = errors.New("the table period must be a multiple of 24h (1h for schema v1)")
4141
errInvalidTableName = errors.New("invalid table name")
42+
errInvalidTablePrefixValue = errors.New("table prefix can not contain a path delimiter")
4243
errConfigFileNotSet = errors.New("schema config file needs to be set")
4344
errConfigChunkPrefixNotSet = errors.New("schema config for chunks is missing the 'prefix' setting")
4445
errSchemaIncreasingFromTime = errors.New("from time in schemas must be distinct and in increasing order")
@@ -579,15 +580,15 @@ func (cfg IndexPeriodicTableConfig) MarshalYAML() (interface{}, error) {
579580

580581
func ValidatePathPrefix(prefix string) error {
581582
if prefix == "" {
582-
return errors.New("prefix must be set")
583+
return errors.New("path prefix must be set")
583584
} else if strings.Contains(prefix, "\\") {
584585
// When using windows filesystem as object store the implementation of ObjectClient in Cortex takes care of conversion of separator.
585586
// We just need to always use `/` as a path separator.
586-
return fmt.Errorf("prefix should only have '%s' as a path separator", pathPrefixDelimiter)
587+
return fmt.Errorf("path prefix should only have '%s' as a path separator", pathPrefixDelimiter)
587588
} else if strings.HasPrefix(prefix, pathPrefixDelimiter) {
588-
return errors.New("prefix should never start with a path separator i.e '/'")
589+
return errors.New("path prefix should never start with a path separator i.e '/'")
589590
} else if !strings.HasSuffix(prefix, pathPrefixDelimiter) {
590-
return errors.New("prefix should end with a path separator i.e '/'")
591+
return errors.New("path prefix should end with a path separator i.e '/'")
591592
}
592593

593594
return nil
@@ -639,6 +640,10 @@ func (cfg PeriodicTableConfig) Validate() error {
639640
return errInvalidTablePeriod
640641
}
641642

643+
if strings.Index(cfg.Prefix, pathPrefixDelimiter) > -1 {
644+
return errInvalidTablePrefixValue
645+
}
646+
642647
return nil
643648
}
644649

Diff for: pkg/storage/config/schema_config_test.go

+16
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,22 @@ func TestSchemaConfig_Validate(t *testing.T) {
170170
},
171171
err: nil,
172172
},
173+
"should fail on index table prefix ending in path delimiter": {
174+
config: &SchemaConfig{
175+
Configs: []PeriodConfig{
176+
{
177+
Schema: "v13",
178+
IndexTables: IndexPeriodicTableConfig{
179+
PathPrefix: "index/",
180+
PeriodicTableConfig: PeriodicTableConfig{
181+
Prefix: "v13/key_",
182+
},
183+
},
184+
},
185+
},
186+
},
187+
err: errInvalidTablePrefixValue,
188+
},
173189
"should pass on index and chunk table period set to zero (no period tables)": {
174190
config: &SchemaConfig{
175191
Configs: []PeriodConfig{

Diff for: pkg/storage/stores/shipper/indexshipper/storage/cached_client.go

+15-4
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func (c *cachedObjectClient) List(ctx context.Context, prefix, objectDelimiter s
9595

9696
prefix = strings.TrimSuffix(prefix, delimiter)
9797
ss := strings.Split(prefix, delimiter)
98+
// should only accept something like: table or table/userid
9899
if len(ss) > 2 {
99100
return nil, nil, fmt.Errorf("invalid prefix %s", prefix)
100101
}
@@ -304,19 +305,29 @@ func (t *table) buildCache(ctx context.Context, objectClient client.ObjectClient
304305
}
305306

306307
ss := strings.Split(object.Key, delimiter)
307-
if len(ss) < 2 || len(ss) > 3 {
308-
return fmt.Errorf("invalid key: %s", object.Key)
308+
309+
// db.gz
310+
if len(ss) < 2 {
311+
return fmt.Errorf("bare key without context: %s", object.Key)
309312
}
310313

311-
if len(ss) == 2 {
314+
// table/db.gz
315+
if len(ss) < 3 {
312316
t.commonObjects = append(t.commonObjects, object)
313-
} else {
317+
continue
318+
}
319+
320+
// table/userid/db.gz
321+
if len(ss) < 4 {
314322
userID := ss[1]
315323
if len(t.userObjects[userID]) == 0 {
316324
t.userIDs = append(t.userIDs, client.StorageCommonPrefix(path.Join(t.name, userID)))
317325
}
318326
t.userObjects[userID] = append(t.userObjects[userID], object)
327+
continue
319328
}
329+
330+
return fmt.Errorf("key too long: %s", object.Key)
320331
}
321332

322333
t.cacheBuiltAt = time.Now()

Diff for: pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go

+91-7
Original file line numberDiff line numberDiff line change
@@ -111,22 +111,106 @@ func TestCachedObjectClient_List(t *testing.T) {
111111
require.Equal(t, objectsFromListCall, objects)
112112
})
113113

114+
t.Run("supports path prefixed clients", func(t *testing.T) {
115+
ctx := context.Background()
116+
117+
path_prefix := "my/long/path/prefix/"
118+
objectsInStorage := []string{
119+
path_prefix + "table1/db.gz",
120+
path_prefix + "table2/db1.gz",
121+
path_prefix + "table2/db2.gz",
122+
path_prefix + "table3/user1/db.gz",
123+
path_prefix + "table3/user2/db.gz",
124+
}
125+
objectClient := newMockObjectClient(t, objectsInStorage)
126+
prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix)
127+
cachedObjectClient := newCachedObjectClient(prefixedClient)
128+
129+
objects, _, err := cachedObjectClient.List(ctx, "table1/", "/", false)
130+
require.Nil(t, err)
131+
require.Equal(t, []string{"table1/db.gz"}, objectKeys(objects))
132+
133+
objects, _, err = cachedObjectClient.List(ctx, "table2/", "/", false)
134+
require.Nil(t, err)
135+
require.Equal(t, []string{"table2/db1.gz", "table2/db2.gz"}, objectKeys(objects))
136+
137+
objects, _, err = cachedObjectClient.List(ctx, "table3/user1", "/", false)
138+
require.Nil(t, err)
139+
require.Equal(t, []string{"table3/user1/db.gz"}, objectKeys(objects))
140+
})
141+
114142
t.Run("supports prefixed clients", func(t *testing.T) {
115143
ctx := context.Background()
116144

117-
prefix := "my/amazing/prefix/"
145+
path_prefix := "/"
146+
prefix := "prefix_term_"
147+
objectsInStorage := []string{
148+
path_prefix + prefix + "table1/db.gz",
149+
path_prefix + prefix + "table2/db1.gz",
150+
path_prefix + prefix + "table2/db2.gz",
151+
path_prefix + prefix + "table3/user1/db.gz",
152+
path_prefix + prefix + "table3/user2/db.gz",
153+
}
154+
objectClient := newMockObjectClient(t, objectsInStorage)
155+
prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix)
156+
cachedObjectClient := newCachedObjectClient(prefixedClient)
157+
158+
objects, _, err := cachedObjectClient.List(ctx, "prefix_term_table1/", "/", false)
159+
require.Nil(t, err)
160+
require.Equal(t, []string{"prefix_term_table1/db.gz"}, objectKeys(objects))
161+
162+
objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table2/", "/", false)
163+
require.Nil(t, err)
164+
require.Equal(t, []string{"prefix_term_table2/db1.gz", "prefix_term_table2/db2.gz"}, objectKeys(objects))
165+
166+
objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table3/user1/", "/", false)
167+
require.Nil(t, err)
168+
require.Equal(t, []string{"prefix_term_table3/user1/db.gz"}, objectKeys(objects))
169+
})
170+
171+
t.Run("supports both path prefixed and prefix clients", func(t *testing.T) {
172+
ctx := context.Background()
173+
174+
path_prefix := "my/long/path/prefix/"
175+
prefix := "prefix_term_"
118176
objectsInStorage := []string{
119-
prefix + "table1/db.gz",
120-
prefix + "table2/db.gz",
121-
prefix + "table2/db2.gz",
177+
path_prefix + prefix + "table1/db.gz",
178+
path_prefix + prefix + "table2/db1.gz",
179+
path_prefix + prefix + "table2/db2.gz",
180+
path_prefix + prefix + "table3/user1/db.gz",
181+
path_prefix + prefix + "table3/user2/db.gz",
122182
}
123183
objectClient := newMockObjectClient(t, objectsInStorage)
124-
prefixedClient := client.NewPrefixedObjectClient(objectClient, prefix)
184+
prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix)
125185
cachedObjectClient := newCachedObjectClient(prefixedClient)
126186

127-
objects, _, err := cachedObjectClient.List(ctx, "table2/", "/", false)
187+
objects, _, err := cachedObjectClient.List(ctx, "prefix_term_table1/", "/", false)
128188
require.Nil(t, err)
129-
require.Equal(t, []string{"table2/db.gz", "table2/db2.gz"}, objectKeys(objects))
189+
require.Equal(t, []string{"prefix_term_table1/db.gz"}, objectKeys(objects))
190+
191+
objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table2/", "/", false)
192+
require.Nil(t, err)
193+
require.Equal(t, []string{"prefix_term_table2/db1.gz", "prefix_term_table2/db2.gz"}, objectKeys(objects))
194+
195+
objects, _, err = cachedObjectClient.List(ctx, "prefix_term_table3/user1/", "/", false)
196+
require.Nil(t, err)
197+
require.Equal(t, []string{"prefix_term_table3/user1/db.gz"}, objectKeys(objects))
198+
})
199+
200+
t.Run("does not support prefix with delimiter", func(t *testing.T) {
201+
ctx := context.Background()
202+
203+
path_prefix := "my/long/path/prefix/"
204+
prefix := "prefix/term/with/delimiter_"
205+
objectsInStorage := []string{
206+
path_prefix + prefix + "table1/db.gz",
207+
}
208+
objectClient := newMockObjectClient(t, objectsInStorage)
209+
prefixedClient := client.NewPrefixedObjectClient(objectClient, path_prefix)
210+
cachedObjectClient := newCachedObjectClient(prefixedClient)
211+
212+
_, _, err := cachedObjectClient.List(ctx, "prefix/term/with/delimiter_table1/", "/", false)
213+
require.Error(t, err)
130214
})
131215
}
132216

0 commit comments

Comments
 (0)