-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrulesconverter.py
389 lines (361 loc) · 17.4 KB
/
rulesconverter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
##
# @file rulesconverter.py
# @author Ankit Srivastava <asrivast@gatech.edu>
#
# Copyright 2018 Georgia Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import defaultdict
import os
import re
import sys
from rulesanml import RulesAnml, AnmlException
class RulesConverter(object):
    """
    Class for converting Snort rules to ANML-NFA.

    Rules are read from Snort rules files, filtered down to those that
    contain pattern matching keywords ('content:' / 'pcre:') and none of
    the unsupported byte_* keywords, translated to PCRE-style patterns
    grouped into per-HTTP-buffer "buckets", and handed to RulesAnml for
    ANML generation and (optionally) compilation.

    NOTE: this module is written for Python 2 (print statements,
    dict.iteritems()/iterkeys()/itervalues(), xrange, old raise/except
    syntax).
    """
    # list of Snort keywords that are not supported
    _unsupportedKeywords = (
        'byte_test',
        'byte_jump',
        'byte_extract',
    )
    # map of modifier keywords to the corresponding bucket and PCRE keywords
    # value[0]: bucket keyword this keyword is folded into ('' means the
    #           keyword is its own bucket); value[1]: the single-letter
    #           Snort PCRE modifier equivalent ('' if none)
    _keywordsMap = {
        'http_client_body' : ('', 'P'),
        'http_cookie' : ('', 'C'),
        'http_raw_cookie' : ('http_cookie', 'K'),
        'http_header' : ('', 'H'),
        'http_raw_header' : ('http_header', 'D'),
        'http_method' : ('', 'M'),
        'http_uri' : ('', 'U'),
        'http_raw_uri' : ('http_uri', 'I'),
        'http_stat_code' : ('', 'S'),
        'http_stat_msg' : ('', 'Y'),
        'pkt_data' : ('', ''),
        'file_data' : ('', ''),
    }
    # compiled patterns for matching and extracting patterns from rules
    # rule body in parentheses containing at least one content:/pcre: option
    _optionPattern = re.compile(r'\((?P<options>.* (?:content|pcre):.*)\)')
    # any of the unsupported byte_* keywords
    _unsupportedPattern = re.compile(r'(?P<unsupported>%s)'%('|'.join(_unsupportedKeywords)))
    # the rule's numeric Snort ID
    _sidPattern = re.compile(r'sid:(?P<sid>\d+);')
    # one content:/pcre: option together with its trailing modifiers,
    # up to (but not including) the next content:/pcre: option
    _genericPattern = re.compile(r'(?P<content>(?P<type>content|pcre):.*?)(?=content:|pcre:|$)')
    # a pcre option carrying one of the single-letter HTTP-buffer modifiers
    _genericPcrePattern = re.compile(r'(?P<pcre>pcre:"/.*/\w*)(?P<modifier>%s)(?P<suffix>\w*")'%('|'.join(sm[-1] for sm in _keywordsMap.itervalues() if sm[-1] != '')))
    # any of the HTTP-buffer modifier keywords (content-style)
    _keywordsPattern = re.compile(r'(?P<keyword>%s);'%('|'.join(_keywordsMap.iterkeys())))
    # content:"..." string with optional leading '!' negation
    _contentPattern = re.compile(r'content:(?P<negation>!?)"(?P<string>.*)";')
    # positional content parameters and their integer values
    _paramPattern = re.compile(r'(?P<name>offset|depth|distance|within):(?P<value>\d+)')
    # pcre:"/pattern/modifiers" with optional leading '!' negation
    _pcrePattern = re.compile(r'pcre:(?P<negation>!?)"/(?P<pattern>.*?)[/]?(?P<modifiers>\w*)";')
    # regex metacharacters that must be backslash-escaped in content strings
    _escapePattern = re.compile(r'(\.|\^|\$|\*|\+|\?|\(|\)|\{|\[|\\|\/)')
    # a (?=...) lookahead group inside a pcre pattern
    _lookaheadPattern = re.compile('(\(\?=.*\))')
    # a |41 42 ...| hex byte run inside a content string
    _pipePattern = re.compile(r'(\|(?P<suffix>(?:[A-F\d]{2} ?)*)\|)')
    # cached modifier keyword map (PCRE modifier letter -> keyword),
    # built lazily by _get_modifier_keyword
    _modifierKeywordsMap = None
    # print error messages
    _printMessages = True

    @classmethod
    def enableErrorMessages(cls):
        # turn on diagnostic messages written to stderr/stdout
        cls._printMessages = True

    @classmethod
    def disableErrorMessages(cls):
        # suppress all diagnostic messages
        cls._printMessages = False

    @classmethod
    def _error_message(cls, message):
        # write (and flush) a diagnostic message to stderr,
        # unless messages are disabled
        if cls._printMessages:
            sys.stderr.write(message)
            sys.stderr.flush()

    @classmethod
    def _print_statistics(cls, totalRules, patternRules, supportedRules, convertedRules):
        # print conversion summary counters, unless messages are disabled
        if cls._printMessages:
            print 'Total number of rules:', totalRules
            print 'Number of rules with pattern matching keywords:', patternRules
            print 'Number of supported rules:', supportedRules
            print 'Number of converted rules:', convertedRules

    @classmethod
    def _get_modifier_keyword(cls, modifier):
        """
        Creates map from PCRE modifier to the corresponding keyword,
        if not already created, and returns the map.

        Raises KeyError if the modifier letter has no associated keyword.
        """
        if cls._modifierKeywordsMap is None:
            # build the reverse map (modifier letter -> keyword) once
            cls._modifierKeywordsMap = {}
            for keyword, value in cls._keywordsMap.iteritems():
                if value[-1]:
                    cls._modifierKeywordsMap[value[-1]] = keyword
        return cls._modifierKeywordsMap[modifier]

    @classmethod
    def _get_pattern_matching_rules(cls, rulesFile):
        """
        Extracts all the rules with pattern matching keywords.

        Returns a tuple (fileRules, ruleCount) where fileRules is the list
        of rules containing content:/pcre: options and ruleCount is the
        total number of non-comment, non-empty rules scanned.
        """
        ruleCount = 0
        fileRules = []
        for rule in rulesFile:
            rule = rule.strip()
            if not rule or rule[0] == '#':
                # skip commented rules, denoted by '#'
                # also skip empty lines
                continue
            ruleCount += 1
            matched = cls._optionPattern.search(rule)
            if matched is None:
                cls._error_message("Skipping the following rule as it doesn't have any pattern matching keywords.\n%s\n\n"%(rule))
            else:
                fileRules.append(rule)
        return fileRules, ruleCount

    @classmethod
    def _get_supported_rules(cls, allRules):
        """
        Filters all the rules with unsupported keywords.

        Returns the subset of allRules that contains none of the
        keywords in _unsupportedKeywords.
        """
        supportedRules = []
        for rule in allRules:
            matched = cls._unsupportedPattern.search(rule)
            if matched is not None:
                cls._error_message('Skipping the following rule as the keyword "%s" is not supported.\n%s\n\n'%(matched.group('unsupported'), rule))
            else:
                supportedRules.append(rule)
        return supportedRules

    @classmethod
    def _get_all_rules(cls, rulesFiles):
        """
        Gets all the supported rules from the rules file(s).

        Returns a tuple (supportedRules, totalRuleCount, patternRuleCount)
        aggregated over every file path in rulesFiles.
        """
        totalRuleCount = 0
        patternRuleCount = 0
        supportedRules = []
        for f in rulesFiles:
            with open(f, 'rb') as rulesFile:
                fileRules, fileRuleCount = cls._get_pattern_matching_rules(rulesFile)
                totalRuleCount += fileRuleCount
                patternRuleCount += len(fileRules)
                fileSupportedRules = cls._get_supported_rules(fileRules)
                supportedRules.extend(fileSupportedRules)
        return supportedRules, totalRuleCount, patternRuleCount

    def __init__(self, directory, maxStes, maxRepeats, independent, negations, backreferences, compile):
        """
        Constructor. Stores some of the program options.

        directory      -- output directory for the generated ANML/FSM
        maxStes        -- forwarded to RulesAnml (STE budget)
        maxRepeats     -- forwarded to RulesAnml (repetition budget)
        independent    -- allow multiple independent patterns per rule
        negations      -- allow negated content/pcre expressions
        backreferences -- forwarded to RulesAnml
        compile        -- also compile the exported ANML (see export());
                          note this parameter shadows the builtin 'compile'
        """
        self._directory = directory
        self._independent = independent
        self._negations = negations
        self._compile = compile
        self._sids = set()
        self._unsupported = set()
        self._anml = RulesAnml(directory, maxStes, maxRepeats, backreferences)
        # per-bucket count of successfully converted patterns
        self._patternCount = defaultdict(int)

    def _combine_independent_patterns(self, independentPatterns):
        """
        Combines independent patterns provided as a list.

        Non-final patterns are emitted as (?=...) lookaheads; a '^'-anchored
        pattern is preferred as the final (non-lookahead) pattern.
        NOTE(review): this helper is not called anywhere in this file; the
        exact nesting of the anchored/unanchored handling below was
        reconstructed from semantics — verify against callers/history.
        """
        patternString = ''
        numPatterns = len(independentPatterns)
        if numPatterns > 1:
            for p in xrange(0, numPatterns - 1):
                if independentPatterns[p][0] == '^':
                    if independentPatterns[-1][0] != '^':
                        # move the anchored pattern into the final slot
                        temp = independentPatterns[-1]
                        independentPatterns[-1] = independentPatterns[p]
                        independentPatterns[p] = temp
                else:
                    # unanchored lookahead must scan the whole input
                    independentPatterns[p] = '.*' + independentPatterns[p]
                patternString += '(?=%s)'%(independentPatterns[p])
        if independentPatterns[-1][0] != '^':
            patternString += '.*'
        patternString += independentPatterns[-1]
        return patternString

    def _get_independent_patterns(self, patterns):
        """
        Extracts independent patterns from given content/pcre for a rule.

        patterns -- list of raw 'content:...' / 'pcre:...' option strings
                    belonging to one bucket, in rule order.

        Returns a list of ('/pattern/modifiers', negation, dependent)
        tuples, where 'dependent' is either None or a
        (negated-pattern, bound) pair attached to the preceding pattern.

        Raises RuntimeError for any construct that cannot be converted
        (negations when disabled, dependent negations, malformed options,
        negative parameter values, depth shorter than the content, ...).
        """
        independentPatterns = []
        numLookaheads = 0
        for p in patterns:
            relative = False
            thisModifiers = ''
            thisPattern = ''
            negation = ''
            isContent = False
            if p.startswith('content'):
                isContent = True
                content = self._contentPattern.search(p)
                if content is not None:
                    offset = 0
                    depth = -1
                    negation = content.group('negation')
                    # escape regex metacharacters in the literal content string
                    contentString, escapePatternCount = self._escapePattern.subn(lambda m: '\\' + m.group(1), content.group('string'))
                    # callable substitution object that rewrites |41 42|-style
                    # hex runs as \x41\x42 while counting the bytes replaced
                    class PipeSubFunc(object):
                        _hexPattern = re.compile(r'([\dA-F]{2}) ?')
                        def __init__(self):
                            self.hexPatternCount = 0
                        def __call__(self, m):
                            subString, subCount = self._hexPattern.subn(lambda m : r'\x' + m.group(1), m.group('suffix'))
                            self.hexPatternCount += subCount
                            return subString
                    pipeSubFunc = PipeSubFunc()
                    contentString, subCount = self._pipePattern.subn(pipeSubFunc, contentString)
                    if p.find('nocase;') != -1:
                        # nocase maps to the PCRE 'i' modifier
                        thisModifiers = 'i'
                    for param in self._paramPattern.finditer(p):
                        name = param.group('name')
                        value = int(param.group('value'))
                        if value < 0:
                            raise RuntimeError, 'Handling of negative parameter values is not implemented'
                        # offset/distance give the start bound,
                        # depth/within the length bound;
                        # distance/within are relative to the previous match
                        offset = value if name in ['offset', 'distance'] else offset
                        depth = value if name in ['depth', 'within'] else depth
                        relative = True if name in ['distance', 'within'] else relative
                    ps = []
                    if offset != 0 or depth != -1:
                        # logical length of the content: subtract the escaping
                        # backslashes and the 3 extra chars per \xNN expansion
                        contentSize = len(contentString) - escapePatternCount - (pipeSubFunc.hexPatternCount * 3)
                        if depth != -1 and depth < contentSize:
                            raise RuntimeError, 'Encountered depth/within less than content string length'
                        if not relative:
                            # absolute bounds anchor the pattern at the start
                            ps.append('^')
                        end = (offset + depth) - contentSize if depth != -1 else 0
                        if offset > 0 or end > offset:
                            # emit the .{offset,end} gap quantifier
                            ps.append('.{%d'%offset)
                            if end > offset:
                                ps.append(',%d'%end)
                            ps.append('}')
                        if depth == -1:
                            # offset given but unbounded depth
                            ps.append('.*')
                    elif relative:
                        # unbounded relative match: anything may precede it
                        ps.append('.*')
                    ps.append(contentString)
                    thisPattern = ''.join(ps)
                else:
                    raise RuntimeError, "Provided content pattern didn't match the standard pattern"
            else:
                matched = self._pcrePattern.search(p)
                if matched is not None:
                    negation = matched.group('negation')
                    thisModifiers = matched.group('modifiers')
                    if thisModifiers.find('R') != -1:
                        # 'R' means relative to the end of the previous match
                        thisModifiers = thisModifiers.replace('R', '')
                        relative = True
                    if thisModifiers.find('B') != -1:
                        # rawbytes-style modifier; just surface the rule
                        print p
                    # 'O' is fast pattern matching modifier; we don't need it
                    thisModifiers = thisModifiers.replace('O', '')
                    # 'G' is same as 'U' in PCRE, for some reason
                    thisModifiers = thisModifiers.replace('G', 'U')
                    thisPattern = matched.group('pattern')
                    # count (but keep) any lookahead groups in the pattern
                    numLookaheads += self._lookaheadPattern.subn('', thisPattern)[1]
                else:
                    raise RuntimeError, "Provided pcre pattern didn't match the standard pattern"
            # negation was the captured '!' (or ''); collapse to a bool
            negation = bool(negation)
            if negation and not self._negations:
                raise RuntimeError, "Can't handle negations"
            if relative and len(independentPatterns) > 0:
                # this pattern depends on the previous one: try to merge
                prevPattern, prevModifiers = independentPatterns[-1][0]
                if negation is not independentPatterns[-1][1]:
                    # negation status differs from the pattern we depend on
                    if not negation:
                        raise RuntimeError, 'Unable to handle dependence on negative expressions'
                    if isContent:
                        if depth != -1:
                            if independentPatterns[-1][2] is None:
                                # record the bounded negated content as a
                                # 'dependent' of the previous pattern
                                independentPatterns[-1][2] = ('/%s/%s'%(thisPattern, thisModifiers), offset + depth)
                            else:
                                raise RuntimeError, 'Unable to handle more than one dependent negations'
                        else:
                            raise RuntimeError, 'Unable to handle dependent unbounded negations'
                    else:
                        raise RuntimeError, 'Unable to handle dependent negations of PCRE type'
                elif independentPatterns[-1][2] is not None:
                    raise RuntimeError, 'Unable to add dependent expression to an expression with negated dependent'
                elif thisModifiers != prevModifiers:
                    # differing modifiers: scope each with (?mods:...) groups
                    prevPattern = '(?%s:%s)'%(prevModifiers, prevPattern)
                    thisPattern = '(?%s:%s)'%(thisModifiers, thisPattern)
                    independentPatterns[-1][0] = (prevPattern + thisPattern, '')
                else:
                    # same modifiers: simply concatenate as a non-capture group
                    independentPatterns[-1][0] = ('%s(?:%s)'%(independentPatterns[-1][0][0], thisPattern), thisModifiers)
            else:
                # independent pattern: [ [pattern, modifiers], negated, dependent ]
                independentPatterns.append([[thisPattern, thisModifiers], negation, None])
        return [('/%s/%s'%tuple(pattern), negation, dependent) for pattern, negation, dependent in independentPatterns]

    def convert(self, rulesFiles):
        """
        Convert all the rules in given rules files to the corresponding ANML-NFA or PCRE.

        rulesFiles -- iterable of Snort rules file paths.

        Side effects: feeds converted patterns to self._anml, updates
        self._patternCount, and prints summary statistics. Rules that
        fail conversion are reported and skipped (their SIDs are counted
        as unsupported).
        """
        outputFiles = {}
        sids = set()
        unsupported = set()
        allRules, totalRuleCount, patternRuleCount = self._get_all_rules(rulesFiles)
        patternCount = defaultdict(int)
        for rule in allRules:
            matched = self._sidPattern.search(rule)
            if matched is None:
                raise RuntimeError, 'Encountered a rule with no SID'
            sid = int(matched.group('sid'))
            sids.add(sid)
            # bucket key is (keyword, rawbytes?) -> list of option strings
            contentVectors = defaultdict(list)
            for pattern in self._genericPattern.finditer(rule):
                keyword = 'general'
                raw = False
                thisContent = pattern.group('content')
                if pattern.group('type') == 'content':
                    # content option: bucket chosen by its http_* keyword
                    matched = self._keywordsPattern.search(thisContent)
                    if matched is not None:
                        keyword = matched.group('keyword')
                else:
                    # pcre option: bucket chosen by its single-letter modifier,
                    # which is stripped from the pcre string itself
                    matched = self._genericPcrePattern.search(thisContent)
                    if matched is not None:
                        pcreString = matched.group('pcre') + matched.group('suffix')
                        contentString = self._genericPcrePattern.sub('', thisContent, count = 1)
                        thisContent = pcreString + contentString
                        keyword = self._get_modifier_keyword(matched.group('modifier'))
                raw = rule.find('rawbytes;') != -1
                if keyword in self._keywordsMap and self._keywordsMap[keyword][0]:
                    # fold the http_raw_* keyword into its base bucket,
                    # marking the bucket as raw
                    raw = raw or bool(self._keywordsMap[keyword][0])
                    keyword = self._keywordsMap[keyword][0]
                contentVectors[(keyword, raw)].append(thisContent)
            convertedStrings = {}
            handled = True
            for bucket, patterns in contentVectors.iteritems():
                try:
                    if sid in [26242, 20207, 26852, 26853, 27133, 27829, 27830]:
                        # known pathological rules; see message below
                        raise RuntimeError, "Skipping rule because it takes LOT of time in compilation"
                    independentPatterns = self._get_independent_patterns(patterns)
                    if not self._independent and len(independentPatterns) > 1:
                        raise RuntimeError, "Can't handle multiple independent patterns per rule"
                    convertedStrings[bucket] = independentPatterns
                except RuntimeError, e:
                    unsupported.add(sid)
                    self._error_message('\nGetting pattern for rule with SID %d failed.\n%s\n'%(sid, str(e)))
                    handled = False
                    break
            if not handled:
                # any bucket failure discards the whole rule
                continue
            for bucket, patterns in convertedStrings.iteritems():
                keyword = bucket[0] + '_raw' if bucket[1] else bucket[0]
                try:
                    self._anml.add(keyword, sid, patterns)
                except AnmlException, e:
                    unsupported.add(sid)
                    self._error_message(str(e))
                else:
                    self._patternCount[keyword] += len(patterns)
                #writeString = '%d: %s'%(sid, patterns[0])
                #if self._writeFiles and keyword not in outputFiles:
                #outputFiles[keyword] = open(keyword + '.txt', 'wb')
                #if self._writeFiles:
                #outputFiles[keyword].write(writeString + '\n')
                #else:
                #print writeString
        self._print_statistics(totalRuleCount, patternRuleCount, len(allRules), len(sids - unsupported))
        #print self._patternCount

    def export(self):
        """
        Write out the ANML-NFA or the AP-FSM to the given directory.

        Also compiles the exported ANML when the converter was constructed
        with compile=True.
        """
        self._anml.export(self._directory)
        if self._compile:
            self._anml.compile(self._directory)