企业安全体系架构分析:开发安全架构之可用性架构

0×01 需求背景

   日常扫描行为是一个常见的需求,同时我们希望,可以更方便的进行定制自动化扫描任务制定与执行。我们不具体要求实用的扫描工具系统是什么,开源与商业看具体自己的实际需求情况,我们只是用 AWVS 举一个例子。

AWVS 本身提供了方便的 REST API 对外服务,如何通过设计封装,让 AWVS 执行的高效简单,这篇要完成的一个任务。

如果把 AWVS 换成其它的安装扫描工具,可否按同样的思路降低工具使用的流程复杂度,让安全工具的使用更自动化遍历,最初构建这个项目时考虑的,这次我们通过 AWVS 这个例子,来实践这种可能性。有一个这个基础的设计可以延伸扩展到其它工具,按本案方法进行扩展驱动其功能。   

下面是整体的设计,将 REST API 与 RPC 结合方式,对整个扫描工具进行封装自动化。

现存在一个大家喜欢讨论的问题是 RPC 和 REST 那个好,在我们这里不讨论那个好,按应用场景同时使用了两个技术,REST 做业务逻辑和数据合法性检查,PRC 做功能封装驱动。在做规模的横向扩展的时候,我们可以通过负载的形式,扩大 REST 和 RPC 服务的并性数和可用性。将混合的业务逻辑用 REST 和 RPC 分层的方式时行简化,当然除了好处一定也有基于这种设计产生的其它问题。   

本次代码层底核心是,封装了 AWVS 的 auth 认证和指定扫描特定域名的处理过程,两个主要的「mocker」就是 auth 和 scan, 时序图很显示的就是这些。

0×02 功能实现

具体的实现部分,将 Django Command、Django RPC、Django REST API、PyTEST、FSWatch 的部分进行介绍,会基于整套技术方案,产生其它的驱动方法,本案就是基于 AWVS 展开。最后达到的目地,就将 AWVS 对目标域名的操作扫描任务指定,简化成了一条命令。如果之前还是说部署环境,现在就是具体的业务动作。

1. 功能使用

AWVS 本身提供了 REST API 的接口, 通过进一步的抽象,简化和隐藏了复杂的调用过程。为了便于简单实现对 AWVS 的操作,最后就变成了简单的一条命令调用。

python manage.py dsl -d lua.ren

Django Command 的功能实现,是整个调用时序的入口,假设扫描的需求和设置很简答,只有一个扫描域名的设定。

2. 功能函数

扫描功能实现,是靠整个时序链调用来完成的,如果直接从 Django Command 调用 Django RPC,参于的调用数据总体会比再加入一层 REST API 调用更简单,而整个调用层级的构建,让一个复杂的 API 调用,分层解耦简单化。

对于 AWVS 最核心的驱动函数:一个是授权 auth,另一个就是添加测试任务。

2.1 授权

meta 数据结构中存放的是基本的授权用户信息, email 和 password。

    def auth(self, meta):
        import urllib2        import ssl        import json
        ssl._create_default_https_context = ssl._create_unverified_context
        url_login="https://localhost:3443/api/v1/me/login"
        
        send_headers_login={                'Host': 'localhost:3443',         
               'Accept': 'application/json, text/plain, */*',               
                'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',          
                      'Accept-Encoding': 'gzip, deflate, br',            
                          'Content-Type': 'application/json;charset=utf-8'
                }
        data_login='{"email":"' +meta['email'] + '",' + '"password":"'+ meta['password']+'","remember_me":false}'
        req_login = urllib2.Request(url_login,headers=send_headers_login)
        response_login = urllib2.urlopen(req_login,data_login)
        xauth = response_login.headers['X-Auth']
        COOOOOOOOkie = response_login.headers['Set-Cookie']        print COOOOOOOOkie,xauth     
           return True

2.2 添加扫描任务

用 Auth 取回的 Cookie 信息,再进行 API 的调用,来完玘任务注册。

    def addTarget(self, formaturl):
        url="https://localhost:3443/api/v1/targets"
        send_headers2={ 
        'Host':'servers:3443',        'Accept': 'application/json, text/plain, */*',      
          'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',      
            'Content-Type':'application/json;charset=utf-8',      
              'X-Auth':xauth,        'Cookie':COOOOOOOOkie,
        }        
        try:            for i in formaturl:
                target_url='http://'+i.strip()
                data='{"description":"222","address":"'+target_url+'","criticality":"10"}'
                req = urllib2.Request(url,headers=send_headers2)
                response = urllib2.urlopen(req,data)
                jo=eval(response.read())
                target_id=jo['target_id']
            
         
            url_scan="https://localhost:3443/api/v1/scans"
            headers_scan={           'Host': 'localhost:3443',        
               'Accept': 'application/json, text/plain, */*',         
                 'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',       
                     'Accept-Encoding': 'gzip, deflate, br',          
                      'Content-Type': 'application/json;charset=utf-8',     
                            'X-Auth':xauth,         
                              'Cookie':COOOOOOOOkie,
            }
            data_scan='{"target_id":'+'\"'+target_id+'\"'+',"profile_id":"11111111-1111-1111-1111-111111111111",
            "schedule":{"disable":false,"start_date":null,"time_sensitive":false},"ui_session_id":
            "66666666666666666666666666666666"}'
            req_scan=urllib2.Request(url_scan,headers=headers_scan)
            response_scan=urllib2.urlopen(req_scan,data_scan)            print response_scan.read()            
        except Exception,e:            print e        return True

这两个函数是最底层的函数,关于 AWVS 的 API 封装 DEMO 网上有,大家可自行参考。

3. 测试用例

如果直接联调,调试成本其实也不低,如果单体程序问题,联调效率会有重复工作的问题。为了更好的理解这套 AWVS 的函数,是如何在当前设计结构中被调用的。我们用 PYTSET 把重点函数做了单体测试。

后续可能会加入其它模块的封装调度,单体测试就变的必须起来。   

3.1 测试认证过程

@pytest.mark.scandef test_5(setup_module):
    import awvs 
    ins = awvs.AWVS()
    ins.auth({"email":"name", "password":"pwd"})    assert True == ret

3.2 测试添加扫描任务过程

@pytest.mark.scandef test_6(setup_module):
    import awvs 
    ins = awvs.AWVS()
    ret = ins.addTarget(['lua.ren\n','candylab.net\n'])    assert True == ret

3.3 添加认证并扫描的过程

@pytest.mark.scandef test_7(setup_module):
    import awvs 
    ins = awvs.AWVS()
    ins.auth({"email":"name", "password":"pwd"})
    ret = ins.addTarget(['lua.ren\n','candylab.net\n'])    assert True == ret

其实认证和扫描的过程,前期是拆开测试的,如果不先认证,基本上就异常了,无法添加扫描任务。单测试用例是为了提供单体质量,提高结合测试的成功效率。

整体测试的还是 auth 函数用户信息字典入参的测试,与 addTarget 函数域名列表的测试。RPC 就更像一个代理人服务程序。

3.4 自动化测试

这个工程使用的测试工具是 pytest。我们想通过自动监听 test.py 的 python 单体测试程序源码的变更,自动调用 pytest 去扫行单体测试脚本。

如果在 linux 平台一下可以使用 tup,是一个很好用的工具。因我们在 mac 环境下扫行单体测试程序,我们使用 fswatch 完成这个功能。

3.4.1 安装 fswatch

brew intall fswatch

如何在 Linux 平台用 tup 其实也很好。

3.4.2 监听脚本

#!/bin/bash DIR=$1 if [ ! -n "$DIR" ] ;then 
    echo "you have not choice Application directory !" 
    exit fi fswatch $DIR | while read file 
do 
    #echo "${file} was modify" >> unittest.log 2>&1 
    echo "${file} was modify" 
    pytest -v -s -m"scan" ${file} done

3.4.3 驱动脚本

#!/bin/bash sh autotest.sh test.py

4. RPC 接口功能

当单体功能达到我们设想的要求时,需要封装一个 RPC 服务对外提供服务。程序越复杂单体测试用例的量就同比量大。   

@jsonrpc_method('myapp.autoscanner')def auto_scanner(request, domain='lua.ren'):
    import awvs 
    ins = awvs.AWVS()
    ins.auth({"email":"name", "password":"pwd"})
    ins.addTask(['lua.ren\n','candylab.net\n'])    return True

RPC 功能相当于把单体调用集成到一个接口,正常一个完整的单体要做入参的检查工作,过滤掉非法入参。

因为我们最开始是考虑用新加的 REST API 作与外部调用者进行通信,在 REST API 做入参检查,并且 REST API 不需求外部调用者调用时,要依赖安全 RPC 客户端。

5. Django Command 功能实现

实现了单体对 AWVS 的封装,并实现 RPC 服务,先不考虑 REST 和前端的控制,实际上我们想当于把 AWVS 的 REST 功能命令行化。

from django.core.management.base import BaseCommand, CommandErrorimport tracebackclass Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument(            '-d',            '--domain',
            action='store',
            dest='domain',
            default='lua.ren',
            help='domain.',
        )    def handle(self, *args, **options):
        try:            if options['domain']:              
          print 'scan domain, %s' % options['domain']           
           from jsonrpc.proxy import ServiceProxy
            s = ServiceProxy('http://localhost:5000/json/') 
            s.myapp.autoscanner(options['domain'])

            self.stdout.write(self.style.SUCCESS(u'命令%s 执行成功, 参数为%s' % (__file__, options['domain']))) 
                   except Exception, ex:
            traceback.print_exc()
            self.stdout.write(self.style.ERROR(u'命令执行出错'))

6. REST API 实现

将功能性的内容用 RPC 实现,将 check 业务划分和检查放到了 REST API 层,这样后端服务调用依赖 RPC Server 和 RPC Client,而 REST API 调用层不用考虑这个问题。

@csrf_exemptdef addItem(request):
    if request.method == 'GET':        return JSONResponse("GET")    
    if request.method == 'POST':
        data = JSONParser().parse(request)
        flg_key = data.has_key('key')        if not flg_key:            
            return JSONResponse('key is empty!')

        access_key = data['key']        if cmp(access_key, "test"):           
         return JSONResponse("access key error.")

        flg_domain = data.has_key('domain')        if not flg_domain:            
            result = {"error":"-1","errmsg":"domain is empty"}           
             return HttpResponse(json.dumps(result,ensure_ascii=False),
             content_type="application/json,charset=utf-8")       
              from jsonrpc.proxy import ServiceProxy
        s = ServiceProxy('http://localhost:5000/json/')        import awvs 
        ins = awvs.AWVS()
        ins.auth({"email":"name", "password":"pwd"})
        ins.addTask(['lua.ren\n','candylab.net\n'])

        result = {"error":"0","errmsg":"none"}       
         return HttpResponse(json.dumps(result,ensure_ascii=False),content_type="application/json,charset=utf-8")

Django REST 让 REST 的实现更便利,这样可以把重点放到业务逻辑检查对接,相对单层的测试更有重点。

REST API 路由可以快速建立。

urlpatterns = [
    url(r'scanner/$', views.addItem),          
]

用 CURL 客户端测试 REST API。

curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}'  127.0.0.1:8080/scanner/

7. 命令行

最终我们实现了 AWVS 的 REST API 的 RPC 和 REST 封装,然后命令行化,当然的其中 RPC 和 REST API 可以其它的地方复用。

7.1 Django Command

python manage.py dsl -d lua.ren

7.2 CURL & REST API

curl -l -H "Content-type: application/json" -X POST -d '{"key":"test","domain":"test.com"}'  127.0.0.1:8080/scanner/

0×03 后记

本篇是听取了 Freebuf 上老师和朋友们的建议反馈,然后产生了这个工程。这些老师朋友都是 SDL 的专家。特别是李老师给个工程起了一个名字叫 semaphore,并 PR。在这个工程的说明中引用了他们的对需求更精准的描述,还有以软件本身的考虑。将 Semaphore 工程中有关 AWVS 的部分,抽出一个演示插件化 RPC 项目:semaphore-awvs-driver, 仅供参考。

*本文原创作者:糖果L5Q,本文属FreeBuf原创奖励计划,未经许可禁止转载

一、 确定业务接口

因为业务的特殊性,业务接口可能不止是web层面的,有些业务为了保证传输速率甚至有可能是udp协议传输,也有些业务走的是socket协议,首先就是要确定你的业务接口走的是什么协议,以下以TCP-HTTP协议为主,其他协议的架构安全会在后期叙述。

二、 业务逻辑设计

通过上面步骤确定好了业务接口后,需要按照业务的逻辑去设计架构(以下均为http接口设计架构),举例,某业务分为以下几点,API接口与客户对接,静态资源展示企业信息,管理接口对应企业内部人员审核等工作,客户接口对应客户的管理工作。在资金并不充足的情况下(不投入厂商安全设备),那么对应的架构应该如何设计呢?

逻辑架构图(可用性部分):

首先要有代理服务器,保证应用服务器不会直接暴露在公网上。其次要有日志服务器,汇总应用日志与攻击日志,防御服务器可以自己组建,各种脚本的匹配来确定请求是否合法。

由互联网访问的请求首先经过代理服务器,这么做的好处在于会保护应用服务器的真实IP与端口,举个例子,应用服务器对外接口是443,那么在代理服务器仅需要代理443端口对应应用服务器的443端口,其他端口全部封闭,那么应用服务器上多余的端口是在互联网无法扫描到的,另外代理服务器上仅做转发,性能上消耗会很小,一台应用服务器上跑一个应用,然后通过代理服务器汇总,会很大程度上方便人员进行统一管理,不会造成应用堆砌的现象。

通过代理服务器转发进来的请求,首先进行行为异常分析,此处可以是使用厂商设备,也可以自研脚本,根据经费情况自行选择合适的方案。

关于防御服务器,有几点想跟大家分享,第一是WAF,第二是防CC攻击。

首先WAF我们都知道是通过正则表达式做内容匹配,如果内容匹配上则判定为攻击,那么就会产生三个问题:

第一是正则的绕过

第二是正则的业务阻断

第三是正则书写本身的bug

第一点举个例子好了,目前开源的WAF中规则写的不错的就是modsecurity了,以它举例,规则基本上每周都会有更新,但是绕过方法还是屡见不鲜,没有哪种防御是能完全阻断攻击的,总会有各种各样没有被发现或已经被发现的漏洞,时常关注信息,及时查漏补缺方能在一定程度上体现安全性。

第二点是所有WAF的一个硬伤,因为业务是不固定的,随时可能有业务信息触发了WAF的规则而导致业务阻断,这也是为什么知名厂商WAF要先观察一段时间调优后才能开启阻断的原因。

第三点还是以modsecurity举例,毕竟是开源代码都能看到,正则表达式本身因为书写的问题,会有很多模糊匹配或者联合匹配,当这些匹配内容过大的时候就会因为变相DoS攻击,这在modsecurity3中也是被证实的,同样的问题在日志写入的时候也会发生,导致i/o读写通道堵塞引起的DoS攻击同样在modsecurity2中被证实。

安全无止境,并不是设备的堆砌,脚本的堆砌就能保证业务的安全可用,这点是我通过以上3个问题重点想要说的。

关于防CC攻击,一般的防CC设置是判定为是CC攻击,阻断,而非丢弃,这块我有不同的看法,假如说一个正常访问请求我响应需要100k的相应包,一个阻断包(返回403或自定义页面)假设需要5k,但是流量还是通过入口进来了,尤其是云服务器有的可能是按量付费,那我们是不是应该思考一下如何让非法流量不能通过入口进来?传统的抗DDoS设备做的流量清洗或者黑洞阀值不会设置的很低,那么这块要如何做?后期我会分享出来我的解决办法。

三、 业务架构图设计

根据逻辑架构图来制定业务的整体现实架构(可用性部分)

通过负载均衡实现代理服务器的双活,大程度保证访问不会中断,同时应用服务器使用主备双服务器,数据库也是主备双活,很大程度上保证了业务的可用性,防御服务器集群用来分析入侵行为,日志服务器用来汇总业务相关的所有日志。

当然防御服务器集群是至少3台以上,并且每台都会向下兼容应用服务器,这么做的目的在于,行为分析是十分耗资源的一件事,搭建集群会降低单一服务器的运算压力,并且不用担心出现单点故障。

同时保证每个业务接口的入口均为独立入口,这样可以将业务分离出来,也方便防御服务器做单一业务的规则,保证规则的准确性。

下一期会根据可用性架构图来设计安全性部分的架构图和保密性部分的架构图,完整的一套安全架构设计逻辑才算是完整的,本期就先介绍可用性这一部分。

*本文作者:煜阳yuyang


发表评论 取消回复

电子邮件地址不会被公开。 必填项已用*标注