OSSIM 传感器通过 GET 框架实现 OSSIM 代理与 OSSIM 服务器之间通信协议和数据格式的转换. 下面我们简要看一下 ossim-agent 脚本:
#!/usr/bin/python -OOt
# Launcher script: extend the module search path with the agent install
# directories, then build the agent and hand control to its main().
import sys

# Both install prefixes are appended, in this order.
for agent_dir in ('/usr/share/ossim-agent/', '/usr/local/share/ossim-agent/'):
    sys.path.append(agent_dir)

from ossim_agent.Agent import Agent

agent = Agent()
agent.main()
这里需要 GET 作为 OSSIM 代理向 OSSIM 服务器输送数据. 实现紧密整合所需的两个主要操作是: "生成"(或称 "映射 Mapping")OSSIM 兼容事件, 以及将此类数据 "传输" 到 OSSIM 服务器. GET 框架中负责这两项操作的组件分别是 EventHandler 和 Sender Agent, 如图 1 所示.
图 1 将 GET 框架内容集成到 OSSIM
Event Handler 的主要任务是将数据源插件采集的事件映射为 SIEM 实例告警所使用的 OSSIM 标准化事件格式. 为了完成这一过程, 原始消息需要经历从 RAW LOG 到归一化数据字段格式的转换; 在上图中我们将这些机制表示为 "归一化 Normalization" 和 "OSSIM 消息". 部分日志归一化代码如下:
- from Logger import Logger
- from time import mktime, strptime
- logger = Logger.logger
class Event:
    """Container for one normalized event.

    Attributes are stored in the ``self.event`` dict and accessed with
    item syntax (``event["src_ip"] = ...``).  Only keys listed in
    ``EVENT_ATTRS`` are accepted; every accepted value is passed through
    ``sanitize_value`` before being stored.  ``repr(event)`` renders the
    event as ``EVENT_TYPE`` followed by space-separated ``key="value"``
    pairs and a trailing newline.
    """

    EVENT_TYPE = 'event'
    EVENT_ATTRS = [
        "type",
        "date",
        "sensor",
        "interface",
        "plugin_id",
        "plugin_sid",
        "priority",
        "protocol",
        "src_ip",
        "src_port",
        "dst_ip",
        "dst_port",
        "username",
        "password",
        "filename",
        "userdata1",
        "userdata2",
        "userdata3",
        "userdata4",
        "userdata5",
        "userdata6",
        "userdata7",
        "userdata8",
        "userdata9",
        "occurrences",
        "log",
        "data",
        "snort_sid",    # snort specific
        "snort_cid",    # snort specific
        "fdate",
        "tzone"
    ]

    def __init__(self):
        """Create an empty event tagged with the class's EVENT_TYPE."""
        self.event = {}
        self.event["event_type"] = self.EVENT_TYPE

    def __setitem__(self, key, value):
        """Store a sanitized attribute value.

        Keys outside ``EVENT_ATTRS`` are ignored with a warning (the
        key 'event_type' is ignored silently).  Setting "date" keeps the
        original text in "fdate" and replaces "date" with the epoch
        seconds parsed from the "%Y-%m-%d %H:%M:%S" format; if parsing
        fails a warning is logged and "date" keeps the sanitized text.
        """
        if key in self.EVENT_ATTRS:
            self.event[key] = self.sanitize_value(value)
            if key == "date":
                # The date in seconds and fdate as string
                self.event["fdate"] = self.event[key]
                try:
                    self.event["date"] = int(mktime(strptime(
                        self.event[key], "%Y-%m-%d %H:%M:%S")))
                except (ValueError, OverflowError):
                    # strptime rejects a mismatching format, mktime an
                    # out-of-range time; neither should abort the event.
                    logger.warning("There was an error parsing date (%s)" % (self.event[key]))
        elif key != 'event_type':
            logger.warning("Bad event attribute: %s" % (key))

    def __getitem__(self, key):
        """Return the stored value for *key*, or None when it is unset."""
        return self.event.get(key, None)

    def __repr__(self):
        """Return the event as ``EVENT_TYPE key="value" ...`` plus newline.

        Attributes are emitted in ``EVENT_ATTRS`` order; unset or falsy
        values are skipped.
        """
        parts = [self.EVENT_TYPE]
        for attr in self.EVENT_ATTRS:
            value = self[attr]
            if value:
                parts.append('%s="%s"' % (attr, value))
        # A single space separates the type and each pair; without it the
        # fields run together (e.g. 'eventtype="detector"date=...').
        return " ".join(parts) + "\n"

    def dict(self):
        """Return the internal attribute dict (not a copy)."""
        return self.event

    def sanitize_value(self, string):
        """Return str(string) stripped, with '"' escaped and "'" removed."""
        return str(string).strip().replace("\"", "\\\"").replace("'", "")
class EventOS(Event):
    """Event subclass rendered with the type tag 'host-os-event'.

    Only the keys in EVENT_ATTRS are accepted by item assignment.
    """

    EVENT_TYPE = 'host-os-event'
    EVENT_ATTRS = [
        "host", "os",
        "sensor", "interface",
        "date",
        "plugin_id", "plugin_sid",
        "occurrences",
        "log",
        "fdate",
    ]
class EventMac(Event):
    """Event subclass rendered with the type tag 'host-mac-event'.

    Only the keys in EVENT_ATTRS are accepted by item assignment.
    """

    EVENT_TYPE = 'host-mac-event'
    EVENT_ATTRS = [
        "host", "mac", "vendor",
        "sensor", "interface",
        "date",
        "plugin_id", "plugin_sid",
        "occurrences",
        "log",
        "fdate",
    ]
class EventService(Event):
    """Event subclass rendered with the type tag 'host-service-event'.

    Only the keys in EVENT_ATTRS are accepted by item assignment.
    """

    EVENT_TYPE = 'host-service-event'
    EVENT_ATTRS = [
        "host",
        "sensor", "interface",
        "port", "protocol",
        "service", "application",
        "date",
        "plugin_id", "plugin_sid",
        "occurrences",
        "log",
        "fdate",
    ]
class EventHids(Event):
    """Event subclass rendered with the type tag 'host-ids-event'.

    Only the keys in EVENT_ATTRS are accepted by item assignment.
    """

    EVENT_TYPE = 'host-ids-event'
    EVENT_ATTRS = [
        "host", "hostname",
        "hids_event_type",
        "target", "what", "extra_data",
        "sensor",
        "date",
        "plugin_id", "plugin_sid",
        "username", "password", "filename",
        "userdata1", "userdata2", "userdata3",
        "userdata4", "userdata5", "userdata6",
        "userdata7", "userdata8", "userdata9",
        "occurrences",
        "log",
        "fdate",
    ]
class WatchRule(Event):
    """Event subclass that keeps the base type tag 'event' but accepts a
    different attribute set (adds keys such as "condition", "value",
    "interval", "from", "to" and "absolute").
    """

    EVENT_TYPE = 'event'
    EVENT_ATTRS = [
        "type",
        "date", "fdate",
        "sensor", "interface",
        "src_ip", "dst_ip",
        "protocol",
        "plugin_id", "plugin_sid",
        "condition", "value",
        "port_from", "src_port",
        "port_to", "dst_port",
        "interval",
        "from", "to",
        "absolute",
        "log",
        "userdata1", "userdata2", "userdata3",
        "userdata4", "userdata5", "userdata6",
        "userdata7", "userdata8", "userdata9",
        "filename", "username",
    ]
class Snort(Event):
    """Event subclass rendered with the type tag 'snort-event'.

    Note that "event_type" is itself listed in EVENT_ATTRS, so item
    assignment can overwrite the tag stored by the constructor.
    """

    EVENT_TYPE = 'snort-event'
    EVENT_ATTRS = [
        "sensor", "interface",
        "gzipdata", "unziplen",
        "event_type",
        "plugin_id",
        "type",
        "occurrences"
    ]
日志编码代码:
- import threading, time
- from Logger import Logger
- logger = Logger.logger
- from Output import Output
- import Config
- import Event
- from Threshold import EventConsolidation
- from Stats import Stats
- from ConnPro import ServerConnPro
class Detector(threading.Thread):
    """Base detector thread.

    ``run()`` calls ``process()``, which subclasses override to collect
    data and hand each event to ``send_message()``.  ``send_message()``
    filters, fills in defaults and then either sends the event over
    ``self.conn`` or passes it to the consolidation/output path.
    """

    def __init__(self, conf, plugin, conn):
        """Store configuration and initialise the underlying thread.

        conf   -- agent configuration (has_section/get are used on it)
        plugin -- plugin configuration (has_option/get are used on it)
        conn   -- server connection object with send(), or None
        """
        self._conf = conf
        self._plugin = plugin
        self.os_hash = {}       # host -> last OS string seen
        self.conn = conn
        self.consolidation = EventConsolidation(self._conf)
        logger.info("Starting detector %s (%s).." % (
            self._plugin.get("config", "name"),
            self._plugin.get("config", "plugin_id")))
        threading.Thread.__init__(self)

    # The constructor used to be spelled "init", so Detector(conf, plugin,
    # conn) never ran it; the old name is kept for existing callers.
    init = __init__

    def _event_os_cached(self, event):
        """Return True when *event* repeats the OS already cached for its host.

        Only Event.EventOS instances are considered; for those, the OS
        string is whitespace-normalised and compared with the cached
        value for event["host"].  A new value replaces the cache entry.
        """
        if not isinstance(event, Event.EventOS):
            return False
        os_value = event["os"]
        if os_value is None:
            # Nothing to compare against the cache.
            return False
        # str methods instead of string.join/string.split, which only
        # exist in Python 2.
        current_os = ' '.join(os_value.split())
        previous_os = self.os_hash.get(event["host"], '')
        if current_os == previous_os:
            return True
        # Fallthrough and add to cache
        self.os_hash[event["host"]] = current_os
        return False

    def _exclude_event(self, event):
        """Return True when the event's plugin_sid is in "exclude_sids"."""
        if self._plugin.has_option("config", "exclude_sids"):
            exclude_sids = self._plugin.get("config", "exclude_sids")
            if event["plugin_sid"] in Config.split_sids(exclude_sids):
                logger.debug("Excluding event with plugin_id=%s and plugin_sid=%s"
                             % (event["plugin_id"], event["plugin_sid"]))
                return True
        return False

    def _thresholding(self):
        """Run the consolidation processing step."""
        self.consolidation.process()

    def _plugin_defaults(self, event):
        """Fill unset attributes of *event* with defaults and return it.

        Values come from the "plugin-defaults" section of the agent
        configuration when that section exists.  An attribute is only
        filled when it is unset and listed in the event's EVENT_ATTRS.
        """
        attrs = event.EVENT_ATTRS

        # Get default values from the configuration file
        if self._conf.has_section("plugin-defaults"):

            # 1) date: current local time in the configured format
            default_date_format = self._conf.get("plugin-defaults",
                                                 "date_format")
            if 'date' in attrs and event["date"] is None and default_date_format:
                event["date"] = time.strftime(default_date_format,
                                              time.localtime(time.time()))

            # 2) sensor
            default_sensor = self._conf.get("plugin-defaults", "sensor")
            if 'sensor' in attrs and event["sensor"] is None and default_sensor:
                event["sensor"] = default_sensor

            # 3) network interface
            default_iface = self._conf.get("plugin-defaults", "interface")
            if 'interface' in attrs and event["interface"] is None and default_iface:
                event["interface"] = default_iface

            # 4) source IP falls back to the sensor address
            if 'src_ip' in attrs and event["src_ip"] is None:
                event["src_ip"] = event["sensor"]

            # 5) time zone
            default_tzone = self._conf.get("plugin-defaults", "tzone")
            if 'tzone' in attrs and event["tzone"] is None:
                event["tzone"] = default_tzone

            # 6) sensor, source ip and dest != localhost
            for field in ("sensor", "dst_ip", "src_ip"):
                if event[field] in ('127.0.0.1', '127.0.1.1'):
                    event[field] = default_sensor

        # Event type defaults to 'detector'.  It needs no configuration
        # value, so it is applied even without a "plugin-defaults" section.
        if 'type' in attrs and event["type"] is None:
            event["type"] = 'detector'

        return event

    def send_message(self, event):
        """Filter, complete and deliver one event.

        Cached OS events and excluded sids are dropped.  With a
        connection, the event is sent; on failure one reconnect and one
        resend are attempted, and the event is dropped if that fails too.
        Without a connection, the event goes to Output.event() unless
        consolidation.insert() accepts it.  Stats.new_event() is called
        for every event that was not dropped.
        """
        if self._event_os_cached(event):
            return

        if self._exclude_event(event):
            return

        # Use default values for some empty attributes.
        event = self._plugin_defaults(event)

        if self.conn is not None:
            try:
                self.conn.send(str(event))
            except Exception:
                # Sending failed: rebuild the connection and retry once.
                plugin_id = self._plugin.get("config", "plugin_id")
                reconnector = ServerConnPro(self._conf, plugin_id)
                self.conn = reconnector.connect(0, 10)
                try:
                    self.conn.send(str(event))
                except Exception:
                    logger.warning("Dropping event: resend after reconnect failed")
                    return

            logger.info(str(event).rstrip())

        elif not self.consolidation.insert(event):
            Output.event(event)

        Stats.new_event(event)

    def stop(self):
        """Stop hook; currently does nothing."""
        #self.consolidation.clear()
        pass

    def process(self):
        """Detector work loop; overridden in subclasses."""
        pass

    def run(self):
        """Thread entry point: delegate to process()."""
        self.process()
class ParserSocket(Detector):
    """Detector variant meant to be specialised by concrete subclasses."""

    def process(self):
        """Placeholder; a concrete subclass must provide the work loop.

        The previous body called self.process() unconditionally, which
        recursed until the interpreter's recursion limit was hit.
        NotImplementedError is a RuntimeError, so callers that caught the
        recursion error still catch this.
        """
        raise NotImplementedError("ParserSocket.process must be overridden")
class ParserDatabase(Detector):
    """Detector variant meant to be specialised by concrete subclasses."""

    def process(self):
        """Placeholder; a concrete subclass must provide the work loop.

        The previous body called self.process() unconditionally, which
        recursed until the interpreter's recursion limit was hit.
        NotImplementedError is a RuntimeError, so callers that caught the
        recursion error still catch this.
        """
        raise NotImplementedError("ParserDatabase.process must be overridden")
- ... ...
从上可以看出, 传感器的归一化主要负责对每条 LOG 内的数据字段进行重新编码, 使其生成一个全新的、可用于发送到 OSSIM 服务器的完整事件. 为达成这一目的, GET 框架中包含了一些特定的功能, 用于对所有需要 BASE64 转换的字段进行转换. "OSSIM 消息" 负责填充 GET 生成的原始事件中不存在的字段. 上面讲的 plugin_id、plugin_sid 用来表示日志消息的来源类型和子类型, 这也是生成 SIEM 事件的必填字段. 为事件格式完整
来源: http://www.bubuko.com/infodetail-3204129.html