@@ -2,27 +2,34 @@ use crate::prelude::*;
2
2
3
3
use super :: schema:: { EnrichedValueType , FieldSchema } ;
4
4
use serde:: { Deserialize , Serialize } ;
5
- use serde_json:: Value ;
6
5
use std:: fmt;
7
6
use std:: ops:: Deref ;
8
7
9
- // Define SpecFormatMode enum for type-safe formatting
8
+ /// OutputMode enum for displaying spec info in different granularity
10
9
#[ derive( Debug , Clone , Copy , PartialEq ) ]
11
- pub enum SpecFormatMode {
10
+ pub enum OutputMode {
12
11
Concise ,
13
12
Verbose ,
14
13
}
15
14
16
- impl SpecFormatMode {
17
- pub fn from_str ( s : & str ) -> Result < Self , String > {
15
+ impl OutputMode {
16
+ pub fn from_str ( s : & str ) -> Self {
18
17
match s. to_lowercase ( ) . as_str ( ) {
19
- "concise" => Ok ( SpecFormatMode :: Concise ) ,
20
- "verbose" => Ok ( SpecFormatMode :: Verbose ) ,
21
- _ => Err ( format ! ( "Invalid format mode: {}" , s) ) ,
18
+ "concise" => OutputMode :: Concise ,
19
+ "verbose" => OutputMode :: Verbose ,
20
+ _ => unreachable ! (
21
+ "Invalid format mode: {}. Expected 'concise' or 'verbose'." ,
22
+ s
23
+ ) ,
22
24
}
23
25
}
24
26
}
25
27
28
+ /// Formatting spec per output mode
29
+ pub trait SpecFormatter {
30
+ fn format ( & self , mode : OutputMode ) -> String ;
31
+ }
32
+
26
33
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
27
34
#[ serde( tag = "kind" ) ]
28
35
pub enum SpecString {
@@ -233,69 +240,31 @@ pub struct OpSpec {
233
240
pub spec : serde_json:: Map < String , serde_json:: Value > ,
234
241
}
235
242
236
- impl OpSpec {
237
- pub fn format_concise ( & self ) -> String {
238
- let mut parts = vec ! [ ] ;
239
- for ( key, value) in self . spec . iter ( ) {
240
- match value {
241
- Value :: String ( s) => parts. push ( format ! ( "{}={}" , key, s) ) ,
242
- Value :: Array ( arr) => {
243
- let items = arr
244
- . iter ( )
245
- . filter_map ( |v| v. as_str ( ) )
246
- . collect :: < Vec < _ > > ( )
247
- . join ( "," ) ;
248
- if !items. is_empty ( ) {
249
- parts. push ( format ! ( "{}={}" , key, items) ) ;
250
- }
251
- }
252
- Value :: Object ( obj) => {
253
- if let Some ( model) = obj. get ( "model" ) . and_then ( |v| v. as_str ( ) ) {
254
- parts. push ( format ! ( "{}={}" , key, model) ) ;
255
- }
256
- }
257
- _ => { }
258
- }
259
- }
260
- if parts. is_empty ( ) {
261
- self . kind . clone ( )
262
- } else {
263
- format ! ( "{}({})" , self . kind, parts. join( ", " ) )
264
- }
265
- }
266
-
267
- pub fn format_verbose ( & self ) -> String {
268
- let spec_str = serde_json:: to_string_pretty ( & self . spec )
269
- . map ( |s| {
270
- let lines: Vec < & str > = s. lines ( ) . collect ( ) ;
271
- if lines. len ( ) < s. lines ( ) . count ( ) {
272
- lines
273
- . into_iter ( )
274
- . chain ( [ "..." ] )
275
- . collect :: < Vec < _ > > ( )
276
- . join ( "\n " )
277
- } else {
278
- lines. join ( "\n " )
279
- }
280
- } )
281
- . unwrap_or ( "#serde_error" . to_string ( ) ) ;
282
- format ! ( "{}({})" , self . kind, spec_str)
283
- }
284
-
285
- pub fn format ( & self , mode : SpecFormatMode ) -> String {
243
+ impl SpecFormatter for OpSpec {
244
+ fn format ( & self , mode : OutputMode ) -> String {
286
245
match mode {
287
- SpecFormatMode :: Concise => self . format_concise ( ) ,
288
- SpecFormatMode :: Verbose => self . format_verbose ( ) ,
246
+ OutputMode :: Concise => self . kind . clone ( ) ,
247
+ OutputMode :: Verbose => {
248
+ let spec_str = serde_json:: to_string_pretty ( & self . spec )
249
+ . map ( |s| {
250
+ let lines: Vec < & str > = s. lines ( ) . collect ( ) ;
251
+ if lines. len ( ) < s. lines ( ) . count ( ) {
252
+ lines
253
+ . into_iter ( )
254
+ . chain ( [ "..." ] )
255
+ . collect :: < Vec < _ > > ( )
256
+ . join ( "\n " )
257
+ } else {
258
+ lines. join ( "\n " )
259
+ }
260
+ } )
261
+ . unwrap_or ( "#serde_error" . to_string ( ) ) ;
262
+ format ! ( "{}({})" , self . kind, spec_str)
263
+ }
289
264
}
290
265
}
291
266
}
292
267
293
- impl fmt:: Display for OpSpec {
294
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
295
- write ! ( f, "{}" , self . format_concise( ) )
296
- }
297
- }
298
-
299
268
#[ derive( Debug , Clone , Serialize , Deserialize , Default ) ]
300
269
pub struct SourceRefreshOptions {
301
270
pub refresh_interval : Option < std:: time:: Duration > ,
@@ -319,53 +288,43 @@ pub struct ImportOpSpec {
319
288
pub refresh_options : SourceRefreshOptions ,
320
289
}
321
290
322
- impl ImportOpSpec {
323
- pub fn format ( & self , mode : SpecFormatMode ) -> String {
324
- let source = match mode {
325
- SpecFormatMode :: Concise => self . source . format_concise ( ) ,
326
- SpecFormatMode :: Verbose => self . source . format_verbose ( ) ,
327
- } ;
291
+ impl SpecFormatter for ImportOpSpec {
292
+ fn format ( & self , mode : OutputMode ) -> String {
293
+ let source = self . source . format ( mode) ;
328
294
format ! ( "source={}, refresh={}" , source, self . refresh_options)
329
295
}
330
296
}
331
297
332
298
impl fmt:: Display for ImportOpSpec {
333
299
fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
334
- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
300
+ write ! ( f, "{}" , self . format( OutputMode :: Concise ) )
335
301
}
336
302
}
337
303
304
+ /// Transform data using a given operator.
338
305
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
339
306
pub struct TransformOpSpec {
340
307
pub inputs : Vec < OpArgBinding > ,
341
308
pub op : OpSpec ,
342
309
}
343
310
344
- impl TransformOpSpec {
345
- pub fn format ( & self , mode : SpecFormatMode ) -> String {
311
+ impl SpecFormatter for TransformOpSpec {
312
+ fn format ( & self , mode : OutputMode ) -> String {
346
313
let inputs = self
347
314
. inputs
348
315
. iter ( )
349
316
. map ( ToString :: to_string)
350
317
. collect :: < Vec < _ > > ( )
351
318
. join ( "," ) ;
352
- let op_str = match mode {
353
- SpecFormatMode :: Concise => self . op . format_concise ( ) ,
354
- SpecFormatMode :: Verbose => self . op . format_verbose ( ) ,
355
- } ;
319
+ let op_str = self . op . format ( mode) ;
356
320
match mode {
357
- SpecFormatMode :: Concise => format ! ( "op={}, inputs={}" , op_str, inputs) ,
358
- SpecFormatMode :: Verbose => format ! ( "op={}, inputs=[{}]" , op_str, inputs) ,
321
+ OutputMode :: Concise => format ! ( "op={}, inputs={}" , op_str, inputs) ,
322
+ OutputMode :: Verbose => format ! ( "op={}, inputs=[{}]" , op_str, inputs) ,
359
323
}
360
324
}
361
325
}
362
326
363
- impl fmt:: Display for TransformOpSpec {
364
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
365
- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
366
- }
367
- }
368
-
327
+ /// Apply reactive operations to each row of the input field.
369
328
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
370
329
pub struct ForEachOpSpec {
371
330
/// Mapping that provides a table to apply reactive operations to.
@@ -377,21 +336,18 @@ impl ForEachOpSpec {
377
336
pub fn get_label ( & self ) -> String {
378
337
format ! ( "Loop over {}" , self . field_path)
379
338
}
339
+ }
380
340
381
- pub fn format ( & self , mode : SpecFormatMode ) -> String {
341
+ impl SpecFormatter for ForEachOpSpec {
342
+ fn format ( & self , mode : OutputMode ) -> String {
382
343
match mode {
383
- SpecFormatMode :: Concise => self . get_label ( ) ,
384
- SpecFormatMode :: Verbose => format ! ( "field={}" , self . field_path) ,
344
+ OutputMode :: Concise => self . get_label ( ) ,
345
+ OutputMode :: Verbose => format ! ( "field={}" , self . field_path) ,
385
346
}
386
347
}
387
348
}
388
349
389
- impl fmt:: Display for ForEachOpSpec {
390
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
391
- write ! ( f, "field={}" , self . field_path)
392
- }
393
- }
394
-
350
+ /// Emit data to a given collector at the given scope.
395
351
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
396
352
pub struct CollectOpSpec {
397
353
/// Field values to be collected.
@@ -405,17 +361,17 @@ pub struct CollectOpSpec {
405
361
pub auto_uuid_field : Option < FieldName > ,
406
362
}
407
363
408
- impl CollectOpSpec {
409
- pub fn format ( & self , mode : SpecFormatMode ) -> String {
364
+ impl SpecFormatter for CollectOpSpec {
365
+ fn format ( & self , mode : OutputMode ) -> String {
410
366
let uuid = self . auto_uuid_field . as_deref ( ) . unwrap_or ( "none" ) ;
411
367
match mode {
412
- SpecFormatMode :: Concise => {
368
+ OutputMode :: Concise => {
413
369
format ! (
414
370
"collector={}, input={}, uuid={}" ,
415
371
self . collector_name, self . input, uuid
416
372
)
417
373
}
418
- SpecFormatMode :: Verbose => {
374
+ OutputMode :: Verbose => {
419
375
format ! (
420
376
"scope={}, collector={}, input=[{}], uuid={}" ,
421
377
self . scope_name, self . collector_name, self . input, uuid
@@ -425,12 +381,6 @@ impl CollectOpSpec {
425
381
}
426
382
}
427
383
428
- impl fmt:: Display for CollectOpSpec {
429
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
430
- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
431
- }
432
- }
433
-
434
384
#[ derive( Debug , Clone , Copy , Serialize , Deserialize , PartialEq , Eq ) ]
435
385
pub enum VectorSimilarityMetric {
436
386
CosineSimilarity ,
@@ -485,6 +435,7 @@ impl fmt::Display for IndexOptions {
485
435
}
486
436
}
487
437
438
+ /// Store data to a given sink.
488
439
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
489
440
pub struct ExportOpSpec {
490
441
pub collector_name : FieldName ,
@@ -493,29 +444,21 @@ pub struct ExportOpSpec {
493
444
pub setup_by_user : bool ,
494
445
}
495
446
496
- impl ExportOpSpec {
497
- pub fn format ( & self , mode : SpecFormatMode ) -> String {
498
- let target_str = match mode {
499
- SpecFormatMode :: Concise => self . target . format_concise ( ) ,
500
- SpecFormatMode :: Verbose => self . target . format_verbose ( ) ,
501
- } ;
447
+ impl SpecFormatter for ExportOpSpec {
448
+ fn format ( & self , mode : OutputMode ) -> String {
449
+ let target_str = self . target . format ( mode) ;
502
450
let base = format ! (
503
451
"collector={}, target={}, {}" ,
504
452
self . collector_name, target_str, self . index_options
505
453
) ;
506
454
match mode {
507
- SpecFormatMode :: Concise => base,
508
- SpecFormatMode :: Verbose => format ! ( "{}, setup_by_user={}" , base, self . setup_by_user) ,
455
+ OutputMode :: Concise => base,
456
+ OutputMode :: Verbose => format ! ( "{}, setup_by_user={}" , base, self . setup_by_user) ,
509
457
}
510
458
}
511
459
}
512
460
513
- impl fmt:: Display for ExportOpSpec {
514
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
515
- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
516
- }
517
- }
518
-
461
+ /// A reactive operation reacts on given input values.
519
462
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
520
463
#[ serde( tag = "action" ) ]
521
464
pub enum ReactiveOpSpec {
@@ -524,25 +467,19 @@ pub enum ReactiveOpSpec {
524
467
Collect ( CollectOpSpec ) ,
525
468
}
526
469
527
- impl ReactiveOpSpec {
528
- pub fn format ( & self , mode : SpecFormatMode ) -> String {
470
+ impl SpecFormatter for ReactiveOpSpec {
471
+ fn format ( & self , mode : OutputMode ) -> String {
529
472
match self {
530
473
ReactiveOpSpec :: Transform ( t) => format ! ( "Transform: {}" , t. format( mode) ) ,
531
474
ReactiveOpSpec :: ForEach ( fe) => match mode {
532
- SpecFormatMode :: Concise => format ! ( "{}" , fe. get_label( ) ) ,
533
- SpecFormatMode :: Verbose => format ! ( "ForEach: {}" , fe. format( mode) ) ,
475
+ OutputMode :: Concise => format ! ( "{}" , fe. get_label( ) ) ,
476
+ OutputMode :: Verbose => format ! ( "ForEach: {}" , fe. format( mode) ) ,
534
477
} ,
535
478
ReactiveOpSpec :: Collect ( c) => format ! ( "Collect: {}" , c. format( mode) ) ,
536
479
}
537
480
}
538
481
}
539
482
540
- impl fmt:: Display for ReactiveOpSpec {
541
- fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
542
- write ! ( f, "{}" , self . format( SpecFormatMode :: Concise ) )
543
- }
544
- }
545
-
546
483
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
547
484
pub struct ReactiveOpScope {
548
485
pub name : ScopeName ,
@@ -556,6 +493,7 @@ impl fmt::Display for ReactiveOpScope {
556
493
}
557
494
}
558
495
496
+ /// A flow defines the rule to sync data from given sources to given sinks with given transformations.
559
497
#[ derive( Debug , Clone , Serialize , Deserialize ) ]
560
498
pub struct FlowInstanceSpec {
561
499
/// Name of the flow instance.
0 commit comments