Commit abbfb97

author
Robert (Bobby) Evans
committed
STORM-1617: Release Specific Documentation 0.9.x
Conflicts: .gitignore docs/README.md docs/_config.yml docs/_includes/footer.html docs/_includes/head.html docs/_includes/header.html docs/_layouts/about.html docs/_layouts/default.html docs/_layouts/documentation.html docs/_layouts/page.html docs/_layouts/post.html docs/assets/css/bootstrap.css docs/assets/css/bootstrap.css.map docs/assets/js/bootstrap.min.js docs/images/logos/alibaba.jpg docs/images/logos/groupon.jpg docs/images/logos/parc.png docs/images/logos/webmd.jpg docs/images/topology.png Conflicts: .gitignore docs/README.md docs/STORM-UI-REST-API.md docs/_config.yml docs/_includes/footer.html docs/_includes/head.html docs/_includes/header.html docs/_layouts/about.html docs/_layouts/default.html docs/_layouts/documentation.html docs/_layouts/page.html docs/_layouts/post.html docs/assets/css/bootstrap.css docs/assets/css/bootstrap.css.map docs/assets/js/bootstrap.min.js docs/images/logos/alibaba.jpg docs/images/logos/groupon.jpg docs/images/logos/parc.png docs/images/logos/webmd.jpg
1 parent 367464a commit abbfb97

130 files changed

+15049
-0
lines changed

.gitignore

+1
@@ -40,3 +40,4 @@ metastore_db
 .classpath
 logs
 build
+/docs/javadocs
+36
@@ -0,0 +1,36 @@
1+
---
2+
layout: documentation
3+
---
4+
[Storm's acker](https://github.com/apache/incubator-storm/blob/46c3ba7/storm-core/src/clj/backtype/storm/daemon/acker.clj#L28) tracks completion of each tupletree with a checksum hash: each time a tuple is sent, its value is XORed into the checksum, and each time a tuple is acked its value is XORed in again. If all tuples have been successfully acked, the checksum will be zero (the odds that the checksum will be zero otherwise are vanishingly small).
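The XOR property described above can be seen in a few lines of plain Clojure. This is a minimal sketch, not the acker's code: the edge ids are made-up small values and `xor-all` is a hypothetical helper introduced only for illustration.

```clojure
;; Made-up edge ids, chosen only for illustration.
(def edge-ids [0x1F3A 0x77C1 0x0B5E])

(defn xor-all
  "XOR a sequence of longs together, starting from zero."
  [ids]
  (reduce bit-xor 0 ids))

;; Each id is XORed into the checksum once when its tuple is sent...
(def after-sends (xor-all edge-ids))

;; ...and once more when the tuple is acked. The order of acks does not matter.
(def after-acks (reduce bit-xor after-sends (shuffle edge-ids)))

(println "pending checksum:" after-sends)
(println "all acked?" (zero? after-acks))
```

Because XOR is commutative and every value cancels itself, the checksum returns to zero exactly when every id has been seen twice, whatever order the acks arrive in.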
5+
6+
You can read a bit more about the [reliability mechanism](Guaranteeing-message-processing.html#what-is-storms-reliability-api) elsewhere on the wiki -- this explains the internal details.
7+
8+
### acker `execute()`
9+
10+
The acker is actually a regular bolt, with its [execute method](https://github.com/apache/incubator-storm/blob/46c3ba7/storm-core/src/clj/backtype/storm/daemon/acker.clj#L36) defined within `mk-acker-bolt`. When a new tupletree is born, the spout sends the XORed edge-ids of each tuple recipient, which the acker records in its `pending` ledger. Every time an executor acks a tuple, the acker receives a partial checksum that is the XOR of the tuple's own edge-id (clearing it from the ledger) and the edge-id of each downstream tuple the executor emitted (thus entering them into the ledger).
11+
12+
This is accomplished as follows.
13+
14+
On a tick tuple, just advance pending tupletree checksums towards death and return. Otherwise, update or create the record for this tupletree:
15+
16+
* on init: initialize with the given checksum value, and record the spout's id for later.
17+
* on ack: xor the partial checksum into the existing checksum value
18+
* on fail: just mark it as failed
19+
20+
Next, [put the record](https://github.com/apache/incubator-storm/blob/46c3ba7/storm-core/src/clj/backtype/storm/daemon/acker.clj#L50) into the RotatingMap (thus resetting its countdown to expiry) and take action:
21+
22+
* if the total checksum is zero, the tupletree is complete: remove it from the pending collection and notify the spout of success
23+
* if the tupletree has failed, it is also complete: remove it from the pending collection and notify the spout of failure
24+
25+
Finally, pass on an ack of our own.
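The update-then-decide steps above can be sketched in plain Clojure. This is an illustrative model under stated assumptions, not the code in `acker.clj`: the message maps, the `:kind` keys, and the `update-entry` and `outcome` functions are all hypothetical names, and tick handling and the RotatingMap are left out.

```clojure
(defn update-entry
  "Apply one acker message to a ledger entry (nil if no entry exists yet)."
  [entry {:keys [kind checksum spout]}]
  (case kind
    ;; on init: start from the given checksum and remember the spout
    :init (assoc entry :checksum checksum :spout spout)
    ;; on ack: XOR the partial checksum into the stored value
    :ack  (update entry :checksum (fnil bit-xor 0) checksum)
    ;; on fail: just mark the entry as failed
    :fail (assoc entry :failed true)))

(defn outcome
  "What to report to the spout, or nil while the tupletree is still pending."
  [entry]
  (cond
    (= 0 (:checksum entry)) :success
    (:failed entry)         :fail
    :else                   nil))

;; Hypothetical tupletree: the spout sends edge 0x0A; the bolt that acks it
;; emitted two children, 0x03 and 0x0C, which are acked later.
(def messages
  [{:kind :init :checksum 0x0A :spout "spout-1"}
   {:kind :ack  :checksum (bit-xor 0x0A 0x03 0x0C)}
   {:kind :ack  :checksum 0x03}
   {:kind :ack  :checksum 0x0C}])

(def states (rest (reductions update-entry nil messages)))

(doseq [s states]
  (println (:checksum s) (outcome s)))
```

Only the last message brings the checksum to zero, which is the point at which the acker would remove the entry and notify the spout of success.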
26+
27+
### Pending tuples and the `RotatingMap`
28+
29+
The acker stores pending tuples in a [`RotatingMap`](https://github.com/apache/incubator-storm/blob/master/storm-core/src/jvm/backtype/storm/utils/RotatingMap.java#L19), a simple device used in several places within Storm to efficiently time-expire a process.
30+
31+
The RotatingMap behaves as a HashMap, and offers the same O(1) access guarantees.
32+
33+
Internally, it holds several HashMaps ('buckets') of its own, each holding a cohort of records that will expire at the same time. Let's call the longest-lived bucket death row, and the most recent the nursery. Whenever a value is `.put()` to the RotatingMap, it is relocated to the nursery -- and removed from any other bucket it might have been in (effectively resetting its death clock).
34+
35+
Whenever its owner calls `.rotate()`, the RotatingMap advances each cohort one step further towards expiration. (Typically, Storm objects call rotate on every receipt of a system tick stream tuple.) If there are any key-value pairs in the former death row bucket, the RotatingMap invokes a callback (given in the constructor) for each key-value pair, letting its owner take appropriate action (e.g., failing a tuple).
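The bucket mechanics can be modelled with a vector of immutable maps. This is a simplified sketch of the idea, not Storm's `RotatingMap` class: the `rm-*` functions are hypothetical names, and instead of invoking a callback, `rm-rotate` simply returns the expired entries.

```clojure
(defn rotating-map
  "An empty rotating map with n buckets; index 0 is the nursery,
   the last index is death row."
  [n]
  (vec (repeat n {})))

(defn rm-put
  "Put k/v into the nursery and drop k from every older bucket."
  [buckets k v]
  (into [(assoc (first buckets) k v)]
        (map #(dissoc % k) (rest buckets))))

(defn rm-get
  "Look k up in whichever bucket currently holds it."
  [buckets k]
  (when-let [e (some #(find % k) buckets)]
    (val e)))

(defn rm-rotate
  "Advance every bucket one step. Returns [new-buckets expired-entries]."
  [buckets]
  [(into [{}] (butlast buckets)) (last buckets)])

(def m0 (-> (rotating-map 3) (rm-put :a 1) (rm-put :b 2)))
(def m1 (first (rm-rotate m0)))   ; :a and :b are now one step older
(def m2 (rm-put m1 :a 10))        ; :a goes back to the nursery
(def m3 (first (rm-rotate m2)))
(def expired (second (rm-rotate m3)))

(println "expired:" expired)
(println ":a still present:" (rm-get m3 :a))
```

In this run `:b` expires after three rotations, while `:a` survives because the intervening put reset its clock.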
36+

docs/Clojure-DSL.md

+264
@@ -0,0 +1,264 @@
1+
---
2+
layout: documentation
3+
---
4+
Storm comes with a Clojure DSL for defining spouts, bolts, and topologies. The Clojure DSL has access to everything the Java API exposes, so if you're a Clojure user you can code Storm topologies without touching Java at all. The Clojure DSL is defined in the source in the [backtype.storm.clojure](https://github.com/apache/incubator-storm/blob/0.5.3/src/clj/backtype/storm/clojure.clj) namespace.
5+
6+
This page outlines all the pieces of the Clojure DSL, including:
7+
8+
1. Defining topologies
9+
2. `defbolt`
10+
3. `defspout`
11+
4. Running topologies in local mode or on a cluster
12+
5. Testing topologies
13+
14+
### Defining topologies
15+
16+
To define a topology, use the `topology` function. `topology` takes in two arguments: a map of "spout specs" and a map of "bolt specs". Each spout and bolt spec wires the code for the component into the topology by specifying things like inputs and parallelism.
17+
18+
Let's take a look at an example topology definition [from the storm-starter project](https://github.com/nathanmarz/storm-starter/blob/master/src/clj/storm/starter/clj/word_count.clj):
19+
20+
```clojure
(topology
 {"1" (spout-spec sentence-spout)
  "2" (spout-spec (sentence-spout-parameterized
                   ["the cat jumped over the door"
                    "greetings from a faraway land"])
                  :p 2)}
 {"3" (bolt-spec {"1" :shuffle "2" :shuffle}
                 split-sentence
                 :p 5)
  "4" (bolt-spec {"3" ["word"]}
                 word-count
                 :p 6)})
```
34+
35+
The maps of spout and bolt specs are maps from the component id to the corresponding spec. The component ids must be unique across the maps. Just like defining topologies in Java, component ids are used when declaring inputs for bolts in the topology.
36+
37+
#### spout-spec
38+
39+
`spout-spec` takes as arguments the spout implementation (an object that implements [IRichSpout](javadocs/backtype/storm/topology/IRichSpout.html)) and optional keyword arguments. The only option that exists currently is the `:p` option, which specifies the parallelism for the spout. If you omit `:p`, the spout will execute as a single task.
40+
41+
#### bolt-spec
42+
43+
`bolt-spec` takes as arguments the input declaration for the bolt, the bolt implementation (an object that implements [IRichBolt](javadocs/backtype/storm/topology/IRichBolt.html)), and optional keyword arguments.
44+
45+
The input declaration is a map from stream ids to stream groupings. A stream id can have one of two forms:
46+
47+
1. `[==component id== ==stream id==]`: Subscribes to a specific stream on a component
48+
2. `==component id==`: Subscribes to the default stream on a component
49+
50+
A stream grouping can be one of the following:
51+
52+
1. `:shuffle`: subscribes with a shuffle grouping
53+
2. Vector of field names, like `["id" "name"]`: subscribes with a fields grouping on the specified fields
54+
3. `:global`: subscribes with a global grouping
55+
4. `:all`: subscribes with an all grouping
56+
5. `:direct`: subscribes with a direct grouping
57+
58+
See [Concepts](Concepts.html) for more info on stream groupings. Here's an example input declaration showcasing the various ways to declare inputs:
59+
60+
```clojure
{["2" "1"] :shuffle
 "3" ["field1" "field2"]
 ["4" "2"] :global}
```
65+
66+
This input declaration subscribes to three streams total. It subscribes to stream "1" on component "2" with a shuffle grouping, subscribes to the default stream on component "3" with a fields grouping on the fields "field1" and "field2", and subscribes to stream "2" on component "4" with a global grouping.
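To make the two stream-id forms concrete, the following plain-Clojure sketch expands each key of an input declaration into an explicit component/stream pair. It is illustrative only and is not how the DSL is implemented: `normalize-stream-id` is a hypothetical helper, and the string `"default"` is assumed here as the name of the default stream.

```clojure
;; Assumption: the default stream is identified by the string "default".
(def default-stream "default")

(defn normalize-stream-id
  "Expand a stream id into a [component-id stream-id] pair."
  [stream-id]
  (if (vector? stream-id)
    stream-id                        ; already [component stream]
    [stream-id default-stream]))     ; bare component id

(def inputs
  {["2" "1"] :shuffle
   "3"       ["field1" "field2"]
   ["4" "2"] :global})

(doseq [[stream-id grouping] inputs]
  (let [[component stream] (normalize-stream-id stream-id)]
    (println "component" component "stream" stream "grouping" grouping)))
```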
67+
68+
Like `spout-spec`, the only currently supported keyword argument for `bolt-spec` is `:p`, which specifies the parallelism for the bolt.
69+
70+
#### shell-bolt-spec
71+
72+
`shell-bolt-spec` is used for defining bolts that are implemented in a non-JVM language. It takes as arguments the input declaration, the command line program to run, the name of the file implementing the bolt, an output specification, and then the same keyword arguments that `bolt-spec` accepts.
73+
74+
Here's an example `shell-bolt-spec`:
75+
76+
```clojure
(shell-bolt-spec {"1" :shuffle "2" ["id"]}
                 "python"
                 "mybolt.py"
                 ["outfield1" "outfield2"]
                 :p 25)
```
83+
84+
The syntax of output declarations is described in more detail in the `defbolt` section below. See [Using non JVM languages with Storm](Using-non-JVM-languages-with-Storm.html) for more details on how multilang works within Storm.
85+
86+
### defbolt
87+
88+
`defbolt` is used for defining bolts in Clojure. Bolts have the constraint that they must be serializable, and this is why you can't just reify `IRichBolt` to implement a bolt (closures aren't serializable). `defbolt` works around this restriction and provides a nicer syntax for defining bolts than just implementing a Java interface.
89+
90+
At its fullest expressiveness, `defbolt` supports parameterized bolts and maintaining state in a closure around the bolt implementation. It also provides shortcuts for defining bolts that don't need this extra functionality. The signature for `defbolt` looks like the following:
91+
92+
(defbolt _name_ _output-declaration_ *_option-map_ & _impl_)
93+
94+
Omitting the option map is equivalent to having an option map of `{:prepare false}`.
95+
96+
#### Simple bolts
97+
98+
Let's start with the simplest form of `defbolt`. Here's an example bolt that splits a tuple containing a sentence into a tuple for each word:
99+
100+
```clojure
(defbolt split-sentence ["word"] [tuple collector]
  (let [words (.split (.getString tuple 0) " ")]
    (doseq [w words]
      (emit-bolt! collector [w] :anchor tuple))
    (ack! collector tuple)))
```
108+
109+
Since the option map is omitted, this is a non-prepared bolt. The DSL simply expects an implementation for the `execute` method of `IRichBolt`. The implementation takes two parameters, the tuple and the `OutputCollector`, and is followed by the body of the `execute` function. The DSL automatically type-hints the parameters for you so you don't need to worry about reflection if you use Java interop.
110+
111+
This implementation binds `split-sentence` to an actual `IRichBolt` object that you can use in topologies, like so:
112+
113+
```clojure
(bolt-spec {"1" :shuffle}
           split-sentence
           :p 5)
```
118+
119+
120+
#### Parameterized bolts
121+
122+
Many times you want to parameterize your bolts with other arguments. For example, let's say you wanted to have a bolt that appends a suffix to every input string it receives, and you want that suffix to be set at runtime. You do this with `defbolt` by including a `:params` option in the option map, like so:
123+
124+
```clojure
(defbolt suffix-appender ["word"] {:params [suffix]}
  [tuple collector]
  (emit-bolt! collector [(str (.getString tuple 0) suffix)] :anchor tuple)
  ;; ack the input tuple, as split-sentence does, so its tuple tree can complete
  (ack! collector tuple))
```
130+
131+
Unlike the previous example, `suffix-appender` will be bound to a function that returns an `IRichBolt` rather than be an `IRichBolt` object directly. This is caused by specifying `:params` in its option map. So to use `suffix-appender` in a topology, you would do something like:
132+
133+
```clojure
(bolt-spec {"1" :shuffle}
           (suffix-appender "-suffix")
           :p 10)
```
138+
139+
#### Prepared bolts
140+
141+
To do more complex bolts, such as ones that do joins and streaming aggregations, the bolt needs to store state. You can do this by creating a prepared bolt which is specified by including `{:prepare true}` in the option map. Consider, for example, this bolt that implements word counting:
142+
143+
```clojure
(defbolt word-count ["word" "count"] {:prepare true}
  [conf context collector]
  (let [counts (atom {})]
    (bolt
     (execute [tuple]
       (let [word (.getString tuple 0)]
         (swap! counts (partial merge-with +) {word 1})
         (emit-bolt! collector [word (@counts word)] :anchor tuple)
         (ack! collector tuple))))))
```
155+
156+
The implementation for a prepared bolt is a function that takes as input the topology config, `TopologyContext`, and `OutputCollector`, and returns an implementation of the `IBolt` interface. This design allows you to have a closure around the implementation of `execute` and `cleanup`.
157+
158+
In this example, the word counts are stored in the closure in a map called `counts`. The `bolt` macro is used to create the `IBolt` implementation. The `bolt` macro is a more concise way to implement the interface than reifying, and it automatically type-hints all of the method parameters. This bolt implements the execute method which updates the count in the map and emits the new word count.
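The state update at the heart of `execute` can be tried on its own at a REPL, without Storm. The sketch below reuses the same `swap!` expression; `count-word!` is a hypothetical helper introduced only for this illustration, and the top-level `counts` atom stands in for the one held in the bolt's closure.

```clojure
;; Stand-in for the atom created inside the prepared bolt's closure.
(def counts (atom {}))

(defn count-word!
  "Increment the stored count for word and return the new count."
  [word]
  ;; merge-with + adds 1 to an existing count, or inserts 1 for a new word
  (swap! counts (partial merge-with +) {word 1})
  (@counts word))

(doseq [w ["the" "dog" "the"]]
  (println w (count-word! w)))

(println @counts)
```

Keeping the map in an atom means each update is a single atomic `swap!`, and the bolt can read the fresh count right afterwards to emit it.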
159+
160+
Note that the `execute` method in prepared bolts only takes as input the tuple since the `OutputCollector` is already in the closure of the function (for simple bolts the collector is a second parameter to the `execute` function).
161+
162+
Prepared bolts can be parameterized just like simple bolts.
163+
164+
#### Output declarations
165+
166+
The Clojure DSL has a concise syntax for declaring the outputs of a bolt. The most general way to declare the outputs is as a map from stream id to stream spec. For example:
167+
168+
```clojure
{"1" ["field1" "field2"]
 "2" (direct-stream ["f1" "f2" "f3"])
 "3" ["f1"]}
```
173+
174+
The stream id is a string, while the stream spec is either a vector of fields or a vector of fields wrapped by `direct-stream`. `direct-stream` marks the stream as a direct stream (see [Concepts](Concepts.html) and [Direct groupings]() for more details on direct streams).
175+
176+
If the bolt only has one output stream, you can define the default stream of the bolt by using a vector instead of a map for the output declaration. For example:
177+
178+
```clojure
["word" "count"]
```
181+
This declares the output of the bolt as the fields ["word" "count"] on the default stream id.
182+
183+
#### Emitting, acking, and failing
184+
185+
Rather than use the Java methods on `OutputCollector` directly, the DSL provides a nicer set of functions for using `OutputCollector`: `emit-bolt!`, `emit-direct-bolt!`, `ack!`, and `fail!`.
186+
187+
1. `emit-bolt!`: takes as parameters the `OutputCollector`, the values to emit (a Clojure sequence), and keyword arguments for `:anchor` and `:stream`. `:anchor` can be a single tuple or a list of tuples, and `:stream` is the id of the stream to emit to. Omitting the keyword arguments emits an unanchored tuple to the default stream.
188+
2. `emit-direct-bolt!`: takes as parameters the `OutputCollector`, the task id to send the tuple to, the values to emit, and keyword arguments for `:anchor` and `:stream`. This function can only emit to streams declared as direct streams.
189+
3. `ack!`: takes as parameters the `OutputCollector` and the tuple to ack.
190+
4. `fail!`: takes as parameters the `OutputCollector` and the tuple to fail.
191+
192+
See [Guaranteeing message processing](Guaranteeing-message-processing.html) for more info on acking and anchoring.
193+
194+
### defspout
195+
196+
`defspout` is used for defining spouts in Clojure. Like bolts, spouts must be serializable so you can't just reify `IRichSpout` to do spout implementations in Clojure. `defspout` works around this restriction and provides a nicer syntax for defining spouts than just implementing a Java interface.
197+
198+
The signature for `defspout` looks like the following:
199+
200+
(defspout _name_ _output-declaration_ *_option-map_ & _impl_)
201+
202+
If you leave out the option map, it defaults to `{:prepare true}`. The output declaration for `defspout` has the same syntax as `defbolt`.
203+
204+
Here's an example `defspout` implementation from [storm-starter](https://github.com/nathanmarz/storm-starter/blob/master/src/clj/storm/starter/clj/word_count.clj):
205+
206+
```clojure
(defspout sentence-spout ["sentence"]
  [conf context collector]
  (let [sentences ["a little brown dog"
                   "the man petted the dog"
                   "four score and seven years ago"
                   "an apple a day keeps the doctor away"]]
    (spout
     (nextTuple []
       (Thread/sleep 100)
       (emit-spout! collector [(rand-nth sentences)]))
     (ack [id]
       ;; You only need to define this method for reliable spouts
       ;; (such as one that reads off of a queue like Kestrel)
       ;; This is an unreliable spout, so it does nothing here
       ))))
```
224+
225+
The implementation takes in as input the topology config, `TopologyContext`, and `SpoutOutputCollector`. The implementation returns an `ISpout` object. Here, the `nextTuple` function emits a random sentence from `sentences`.
226+
227+
This spout isn't reliable, so the `ack` and `fail` methods will never be called. A reliable spout will add a message id when emitting tuples, and then `ack` or `fail` will be called when the tuple is completed or failed respectively. See [Guaranteeing message processing](Guaranteeing-message-processing.html) for more info on how reliability works within Storm.
228+
229+
`emit-spout!` takes in as parameters the `SpoutOutputCollector` and the new tuple to be emitted, and accepts as keyword arguments `:stream` and `:id`. `:stream` specifies the stream to emit to, and `:id` specifies a message id for the tuple (used in the `ack` and `fail` callbacks). Omitting these arguments emits a tuple with no message id to the default output stream, so its completion is not reported back to the spout.
230+
231+
There is also an `emit-direct-spout!` function that emits a tuple to a direct stream. It takes one additional argument, the id of the task to send the tuple to, as its second parameter.
232+
233+
Spouts can be parameterized just like bolts, in which case the symbol is bound to a function returning `IRichSpout` instead of the `IRichSpout` itself. You can also declare an unprepared spout which only defines the `nextTuple` method. Here is an example of an unprepared spout that emits random sentences parameterized at runtime:
234+
235+
```clojure
(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
  [collector]
  (Thread/sleep 500)
  (emit-spout! collector [(rand-nth sentences)]))
```
241+
242+
The following example illustrates how to use this spout in a `spout-spec`:
243+
244+
```clojure
(spout-spec (sentence-spout-parameterized
             ["the cat jumped over the door"
              "greetings from a faraway land"])
            :p 2)
```
250+
251+
### Running topologies in local mode or on a cluster
252+
253+
That's all there is to the Clojure DSL. To submit topologies in remote mode or local mode, use the `StormSubmitter` or `LocalCluster` classes just like you would from Java.
254+
255+
To create topology configs, it's easiest to use the `backtype.storm.config` namespace which defines constants for all of the possible configs. The constants are the same as the static constants in the `Config` class, except with dashes instead of underscores. For example, here's a topology config that sets the number of workers to 15 and configures the topology in debug mode:
256+
257+
```clojure
{TOPOLOGY-DEBUG true
 TOPOLOGY-WORKERS 15}
```
261+
262+
### Testing topologies
263+
264+
[This blog post](http://www.pixelmachine.org/2011/12/17/Testing-Storm-Topologies.html) and its [follow-up](http://www.pixelmachine.org/2011/12/21/Testing-Storm-Topologies-Part-2.html) give a good overview of Storm's powerful built-in facilities for testing topologies in Clojure.
