#!/usr/bin/python -E

################################################################################
#                                                                              #
# The RBACPP Self Test                                                         #
#                                                                              #
# Performs various tests on the system to verify RBACPP compliance.            #
#                                                                              #
# Copyright (C) 2006,2007 IBM Corporation                                      #
# Licensed under GNU General Public License                                    #
#                                                                              #
# This program is free software; you can redistribute it and/or modify         #
# it under the terms of the GNU General Public License as published by         #
# the Free Software Foundation; either version 2 of the License, or            #
# (at your option) any later version.                                          #
#                                                                              #
# This program is distributed in the hope that it will be useful,              #
# but WITHOUT ANY WARRANTY; without even the implied warranty of               #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the                #
# GNU General Public License for more details.                                 #
#                                                                              #
# You should have received a copy of the GNU General Public License            #
# along with this program; see the file COPYING.  If not, write to             #
# the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.        #
#                                                                              #
# Author: George C. Wilson <gcwilson@us.ibm.com>                               #
#                                                                              #
################################################################################
#                                                                              #
# USE                                                                          #
#                                                                              #
#     1.  First take a snapshot of the filesystem hashes using AIDE.           #
#                                                                              #
#             rbac-self-test --snapshot --verbose                              #
#                                                                              #
#     2.  Run the the self test in normal mode.                                #
#                                                                              #
#             rbac-self-test --verbose                                         #
#                                                                              #
#     3.  Add the command to your crontab.                                     #
#                                                                              #
#             rbac-self-test                                                   #
#                                                                              #
#     4.  Check the audit and system logs for failures.                        #
#                                                                              #
################################################################################

import string
import re
import os
import os.path
import errno
import sys
import shutil
import pwd
import syslog
import socket
import selinux
import audit

class SelfTest:

    #
    # init
    #
    #     Returns:  None
    #

    def __init__(self):
        self.opt_snapshot = False
        self.opt_verbose = False
        self.default_failure_action = 'single'
        self.config_file = '/etc/security/rbac-self-test.conf'
        self.program_name = os.path.basename(sys.argv[0])
        self.read = False
        self.write = True
        self.SystemHigh = 'SystemHigh'
        self.SystemLow = 'SystemLow'
        self.expectSuccess = False
        self.expectFailure = True
        self.success = True
        self.failure = False
        self.failure_action_performed = False
        return(None)

    #
    # usage
    #
    #     Returns:  0 - Success
    #

    def usage(self):
        print self.program_name + ': [--snapshot] [--verbose]'
        return(0)

    #
    # Parse args
    #
    #     Returns:  rc: 0 - Success, 1 - Failure
    #

    def args_parse(self):

        rc = 0

        for opt in sys.argv[1:]:
            if opt == '--snapshot' or opt == '-s':
                 self.opt_snapshot = True
            elif opt == '--verbose' or opt == '-v':
                 self.opt_verbose = True
            else:
                 rc = 1

        return(rc)

    #
    # Init failure action from config file.
    #
    #     Returns:  0 - Successfully logged audit message, errno or -1 on failure
    #

    def failure_action_init(self):

        rc = 0

        self.failure_action = self.default_failure_action

        try:
            action_file = open(self.config_file, 'r')
            if action_file.closed == False:
                action_list = action_file.readlines()
                action = action_list[0]
                action = action.strip()
                action_file.close()
        except IOError, (oserrno, strerror):
            self.message_log('Cannot read auditd action: ' + self.config_file + ': ' + strerror)
            rc = oserrno
        except:
            raise

        if rc == 0:

            if ((action == 'single') or (action == 'log')):
                self.failure_action = action
            else:
                self.message_log('Invalid failure action: ' + action)
                rc = -1

        return (rc)

    #
    #  Perform the failure action.
    #
    #      Return:  0 in success, errno or -1 on failure
    #

    def failure_action_perform(self):

        rc = 0

        #  Just do it once, prevent recursion.

        if self.failure_action_performed == False:

            if self.failure_action == 'single':
                try:
                    rc = os.spawnv(os.P_WAIT, '/sbin/telinit', ('/sbin/telinit', '1'))
                    self.failure_action_performed = True
                    if rc != 0:
                        self.message_log('/sbin/telinit 1 failed, rc = ' + str(rc))
                except OSError, (oserrno, strerror):
                    self.message_log('Cannot run /sbin/telinit s to go to single user mode: '+ strerror)
                    rc = oserrno
                except:
                    raise

            elif self.failure_action != 'log':
                self.failure_action_performed = True
                self.message_log('Invalid audit failure action - logging only')
                rc = -1

        return(rc)

    #
    # Log an audit message
    #
    #     Returns:  0 - Successfully logged audit message, 1 - Failure
    #

    def message_log(self, message, successful = False):

        rc = 0

        try:
            hostname = socket.gethostname()
            try:
                hostaddr = socket.gethostbyname(hostname)
            except:
                hostaddr = 'unknown'
        except:
            hostname = 'unknown'

        try:
            ttyname = os.readlink('/proc/self/fd/0')
            if ttyname.find('/dev') != 0:
                ttyname = 'notatty'
        except:
            ttyname = 'unknown'

        message = self.program_name + ': ' + message

        if (successful == True):
            audit_record_type = audit.AUDIT_TEST
        else:
            audit_record_type = audit.AUDIT_ANOM_RBAC_FAIL

        try:
            audit.audit_log_user_message(self.audit_fd,
                                         audit_record_type,
                                         message,
                                         hostname,
                                         hostaddr,
                                         ttyname,
                                         successful)
        except:
            print 'Attention: Cannot log audit record'
            rc = 1
            try:
                syslog.openlog('Security')
                syslog.syslog(syslog.LOG_AUTH|syslog.LOG_EMERG,
                              'Attention: Cannot log audit record (message was: ' + message + ')')
                syslog.closelog()
            except:
                print 'Attention: Cannot log syslog record'

        if self.opt_verbose == True:
            print message

        self.failure_action_perform()

        return(rc)

    #
    # Initialize audit.
    #
    #     Returns:  An audit handle; Non-negative integer - Success, -1 - Failure
    #

    def audit_open(self):

        rc = 0
        self.audit_fd = -1

        rc = self.failure_action_init()

        if rc != 0:
            self.message_log('Cannot set failure action in audit init')

        if rc == 0:
            try:
                self.audit_fd = audit.audit_open()
            except:
                self.message_log('Cannot open audit')
                rc = 1

        return(rc)

    #
    # Deinitialize audit.
    #
    #     Returns:  0 - Success, 1 - Failure
    #

    def audit_close(self):

        rc = 0

        if self.audit_fd >= 0:
            try:
                audit.audit_close(self.audit_fd)
            except:
                self.message_log('Cannot close audit')
                rc = 1

        return(rc)

    #
    # Verify SELinux is enabled, enforcing, and MLS.
    #
    #     Returns:  0 - Success, 1 - Failure
    #

    def selinux_verify(self):

        rc = 0

        try:
            if selinux.is_selinux_enabled() != 1:
                self.message_log('SELinux is not enabled')
                rc = 1

        except:
            self.message_log('Cannot check whether SELinux is enabled')
            rc = 1

        if rc == 0:
            try:
                if selinux.security_getenforce() != 1:
                    self.message_log('SELinux is not in enforcing mode')
                    rc = 1

            except:
                self.message_log('Cannot check whether SELinux is enforcing')
                rc = 1

        if rc == 0:
            try:
                if selinux.is_selinux_mls_enabled() != 1:
                    self.message_log('SELinux MLS is not enabled')
                    rc = 1

            except:
                self.message_log('Cannot check whether SELinux MLS is enabled')
                rc = 1

        return(rc)

    #
    # Verify auditd is running
    #
    #     Returns:  0 - Success, 1 - Failure
    #

    def auditd_verify(self):

        rc = 0

        try:
            if audit.audit_is_enabled(self.audit_fd) != 1:
                self.message_log('The audit daemon is not running')
                rc = 1

        except:
            self.message_log('Cannot get audit status')
            rc = 1

        return(rc)

    #
    # Take an AIDE snapshot of configuration files.
    #
    #     Returns:  0 - Success,  spawnv rc or errno on failure
    #

    def snapshot_take(self):

        rc = 0

        if rc == 0:
            try:
                rc = self.runcon("root:sysadm_r:aide_t:SystemHigh", '/usr/sbin/aide', '--init')
            except OSError, (oserrno, strerror):
                self.message_log('Cannot initialize AIDE database, errno = ' + str(oserrno) + ': ' + strerror)
                rc = oserrno
            except:
                raise

        if rc == 0:
            try:
                os.unlink('/var/lib/aide/aide.db.gz')
            except OSError, (oserrno, strerror):
                if oserrno != errno.ENOENT:
                    self.message_log('Cannot remove old AIDE database, errno = ' + str(oserrno) + ': ' + strerror)
                    rc = oserrno
            except:
                raise

        if rc == 0:
            try:
                shutil.move('/var/lib/aide/aide.db.new.gz', '/var/lib/aide/aide.db.gz')
            except OSError, (oserrno, strerror):
                self.message_log('Cannot move new AIDE database into place, errno = ' + str(oserrno) + ': ' + strerror)
                rc = oserrno
            except:
                raise

        return(rc)

    #
    # Verify integrity of configuration files.
    #
    #     Returns:  0 - Success; spawnv rc or errno - Failure
    #

    def snapshot_verify(self):

        rc = 0

        try:
            rc = self.runcon("root:sysadm_r:aide_t:SystemHigh", '/usr/sbin/aide', '--check')
        except OSError, (oserrno, strerror):
            self.message_log('Cannot verify AIDE database, errno = ' + str(oserrno) + ': ' + strerror)
            rc = oserrno
        except:
            raise

        if rc != 0:
            self.message_log('Cannot verify AIDE database')

        return(rc)

    #
    # internal "runcon"
    #
    #     Returns:  0 - Success; spawnv rc or errno, or -1 - Failure
    #

    def runcon(self, context, program, *args):

        rc = 0

        progargs = (program,) + args

        try:
            rc = selinux.setexeccon(context)
        except:
            self.message_log('Cannot set exec context to ' + context)
            rc = -1

        if rc != 1:
            try:
                rc = os.spawnv(os.P_WAIT, program, progargs)
            except OSError, (oserrno, strerror):
                self.message_log('Cannot spawnv ' + str(progargs) + ': ' + strerror)
                rc = oserrno
            except:
                raise

        return(rc)

    #
    # mlsfile_test
    #
    #     Test reading/writing from level2 to level1
    #
    #     Returns:  0 - Success, 1 - Failure
    #

    def mlsfile_test(self, write, level2, level1, expectfail):

        rc = 0

        context1 = 'root:sysadm_r:rbacselftest_t:' + level1
        context2 = 'root:sysadm_r:rbacselftesthelper_t:' + level2

        if write == True:
            testopname = 'write'
            testop = 'w'
        else:
            testopname = 'read'
            testop = 'r'

        rc = self.runcon(context1, '/usr/sbin/rbac-self-test-helper', context1, 'w')

        if rc != 0:
            self.message_log('Helper cannot complete create with context ' + context1)
            rc = 1

        testrc = self.runcon(context2, '/usr/sbin/rbac-self-test-helper', context1, testop)

        if testrc != 0:
            testrc = 1

        if expectfail == True:
            testrc = int(not testrc)

        rc = self.runcon(context1, '/usr/sbin/rbac-self-test-helper', context1, 'd')

        if rc != 0:
            self.message_log('Helper cannot complete delete with context ' + context1)

        if testrc != 0:
            self.message_log('MLS file test: write at ' + context1 + ', ' + testopname + ' at ' + context2 + ' failed')

        return(testrc)

#
# Main
#
#     Exits:  0 - Success, 1 - Failure
#

def main():

    rc = 0

    st = SelfTest()

    if rc == 0:
        st.audit_open()

    if rc == 0 and st.audit_fd < 0:
        rc = 1

    if rc == 0:
        rc = st.args_parse()

    if rc != 0:
        st.usage()

    if rc == 0 and st.opt_snapshot == True:
        rc = st.snapshot_take()

    else:

        if rc == 0:
            rc = st.selinux_verify()

        if rc == 0:
            rc = st.auditd_verify()

        if rc == 0:
            rc = st.mlsfile_test(st.read, st.SystemHigh, st.SystemLow, st.expectSuccess)

        if rc == 0:
            rc = st.mlsfile_test(st.read, st.SystemLow, st.SystemHigh, st.expectFailure)

        if rc == 0:
            rc = st.mlsfile_test(st.write, st.SystemHigh, st.SystemLow, st.expectFailure)

        # Stronger than BLP

        if rc == 0:
            rc = st.mlsfile_test(st.write, st.SystemLow, st.SystemHigh, st.expectFailure)

        if rc == 0:
            rc = st.snapshot_verify()

        if rc == 0:
            st.message_log('The RBAC self test succeeded', st.success)
            if st.opt_verbose == True:
                print 'The RBAC self test succeeded.'
        else:
            st.message_log('The RBAC self test failed', st.failure)
            if st.opt_verbose == True:
                print 'The RBAC self test failed.'

        if rc == 0:
            st.audit_close()

        return(rc)

if __name__ == "__main__":
    sys.exit(main())
