Processing some historical files and loading to graylog


#1 scallawa

I am trying to use NXLog to process some historical files and load them into Graylog. The files contain nested JSON, and I only need a few fields from them. I am hoping for some pointers on how to extract the following data from the JSON and post it to Graylog.

- username
- rename text to message
- rename first title to method
- rename title_link to method_link

From the fields section:

- Severity: value
- Region: value
- rename Last Seen to timestamp: value

I would like Graylog to use this as the timestamp for the data coming in.
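To pin down the expected result of the mapping above, here is a minimal sketch in Python (not NXLog configuration). The helper name `extract_finding` and the lowercase output keys `severity` and `region` are assumptions for illustration. It also assumes that "text" means the `text` of the first attachment, since the top-level `text` is empty in the sample. GELF expects `timestamp` as seconds since the Unix epoch, so the sketch takes the epoch value embedded in the Slack date markup of "Last Seen".

```python
import re

# Slack date markup looks like:
# "<!date^1614611294^{date} at {time} | 2021-03-01T15:08:14.526Z>"
# The digits after "<!date^" are seconds since the Unix epoch.
_SLACK_DATE = re.compile(r"<!date\^(\d+)\^")


def extract_finding(message: dict) -> dict:
    """Pick the requested fields from one exported message (hypothetical helper)."""
    # Assumption: "first title" refers to the first entry in "attachments".
    attachment = message["attachments"][0]

    # Turn the list of {"title": ..., "value": ...} pairs into a lookup table.
    fields = {f["title"]: f["value"] for f in attachment["fields"]}

    match = _SLACK_DATE.search(fields["Last Seen"])
    return {
        "username": message["username"],
        "message": attachment["text"],
        "method": attachment["title"],
        "method_link": attachment["title_link"],
        "severity": fields["Severity"],
        "region": fields["Region"],
        # None when the value does not contain Slack date markup.
        "timestamp": int(match.group(1)) if match else None,
    }
```

Whatever tool ends up doing the work, the output record for each message should carry these seven keys, with `timestamp` as a plain number rather than the raw Slack markup.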

Below is my whole nxlog.conf but the relevant input is the guardduty input.

nxlog.conf

User nxlog
Group nxlog
Panic Soft

# default values:
define INSTALLDIR /opt/nxlog
PidFile %INSTALLDIR%/var/run/nxlog/nxlog.pid
CacheDir %INSTALLDIR%/var/spool/nxlog
ModuleDir %INSTALLDIR%/lib/nxlog/modules
SpoolDir %INSTALLDIR%/var/spool/nxlog

define CERTDIR %INSTALLDIR%/var/lib/nxlog/cert
define CONFDIR %INSTALLDIR%/etc/nxlog.d

# Note that these two lines define constants only; the log file location
# is ultimately set by the LogFile directive (see below). The
# MYLOGFILE define is also used to rotate the log file automatically
# (see the _fileop block).
define LOGDIR %INSTALLDIR%/var/log/nxlog
define MYLOGFILE %LOGDIR%/nxlog.log

# If you are not using NXLog Manager, disable the include line
# and enable LogLevel and LogFile.
#include %CONFDIR%/*.conf
LogLevel DEBUG
LogFile %MYLOGFILE%

<Extension _syslog>
    Module xm_syslog
</Extension>

# This block rotates %MYLOGFILE% on a schedule. Note that if LogFile
# is changed in managed.conf via NXLog Manager, rotation of the new
# file should also be configured there.
<Extension _fileop>
    Module xm_fileop

    # Check the size of our log file hourly, rotate if larger than 5MB
    <Schedule>
        Every   1 hour
        <Exec>
            if ( file_exists('%MYLOGFILE%') and
                 (file_size('%MYLOGFILE%') >= 5M) )
            {
                 file_cycle('%MYLOGFILE%', 8);
            }
        </Exec>
    </Schedule>

    # Rotate our log file every week on Sunday at midnight
    <Schedule>
        When    @weekly
        Exec    if file_exists('%MYLOGFILE%') file_cycle('%MYLOGFILE%', 8);
    </Schedule>
</Extension>

<Extension json_parser>
    Module xm_json
    Flatten True
    PrettyPrint True
</Extension>

<Extension exec>
    Module xm_exec
</Extension>

<Extension gelf>
    Module xm_gelf
</Extension>

<Extension antivirus_csv>
    Module xm_csv
    Fields $Severity,$timestamp,$ip,$endtime,$User,$User Groups,$Device,$Device Group
    Delimiter ,
</Extension>

<Extension xml>
    Module xm_xml
</Extension>

<Input alienvault>
    Module im_file
    File "/var/nxlog/alienvault/*.log"
    SavePos TRUE
    ReadFromLast TRUE
    <Exec>
        parse_xml();
        to_json();
    </Exec>
</Input>

<Input antivirus>
    Module im_file
    File "/var/nxlog/antivirus/*.csv"
    ReadFromLast TRUE
    SavePos True
    <Exec>
        antivirus_csv->parse_csv();
        to_json();
    </Exec>
</Input>

<Input guardduty>
    Module im_file
    File "/var/nxlog/guardduty/*.json"
    SavePos True
    InputType json_parser
    <Exec>
        # Delete some fields that aren't necessary
        delete($type);
        delete($subtype);
        delete($text);
        delete($mrkdwn_in);
        delete($EventReceivedTime);

        # Convert fields back to JSON because of the deletion
        to_json();
    </Exec>
</Input>

<Output udp_12202>
    Module om_udp
    Host graylog.mydomain.com:12202
    OutputType GELF_UDP
</Output>

<Output udp_5515>
    Module om_udp
    Host graylog.mydomain.com:5515
    OutputType GELF_UDP
</Output>

<Route antivirus_to_udp_5515>
    Path antivirus => udp_5515
</Route>

<Route to_udp_12202>
    Path alienvault, guardduty => udp_12202
</Route>

Sample JSON. The files are larger but I kept the first and last entry.

[
  {
    "type": "message",
    "subtype": "bot_message",
    "text": "",
    "ts": "1614611466.000100",
    "username": "GuardDuty",
    "icons": {
      "image_48": "https://s3-us-east-1.amazonaws.com/slack-files2/bot_icons/2019-07-02/671758367922_48.png"
    },
    "bot_id": "BL5799J6A",
    "attachments": [
      {
        "fallback": "Recon:IAMUser/UserPermissions - <https://console.aws.amazon.com/guardduty/home?region=us-east-1#/findings?search=id%5t45ffg59a575art6789404dr5oci5a4zz>",
        "text": "APIs commonly used to discover the users, groups, policies and permissions in an account, was invoked by IAM principal Some-Role under unusual circumstances. Such activity is not typically seen from this principal.",
        "pretext": "Finding in us-east-1 for Acct: 505986456095",
        "title": "Recon:IAMUser/UserPermissions",
        "id": 1,
        "title_link": "https://console.aws.amazon.com/guardduty/home?region=us-east-1#/findings?search=id%5t45ffg59a575art6789404dr5oci5a4zz",
        "color": "e2d43b",
        "fields": [
          { "title": "Severity", "value": "Medium", "short": true },
          { "title": "Region", "value": "us-east-1", "short": true },
          { "title": "Last Seen", "value": "<!date^1614611294^{date} at {time} | 2021-03-01T15:08:14.526Z>", "short": true }
        ],
        "mrkdwn_in": [ "pretext" ]
      }
    ]
  },
  {
    "type": "message",
    "subtype": "bot_message",
    "text": "",
    "ts": "1614629763.001600",
    "username": "GuardDuty",
    "icons": {
      "image_48": "https://s3-us-east-1.amazonaws.com/slack-files2/bot_icons/2019-07-02/671758367922_48.png"
    },
    "bot_id": "BL5799J6A",
    "attachments": [
      {
        "fallback": "Recon:IAMUser/UserPermissions - <https://console.aws.amazon.com/guardduty/home?region=us-east-1#/findings?search=id%5t45ffg59a575art6789404dr5oci5a4zz>",
        "text": "APIs commonly used to discover the users, groups, policies and permissions in an account, was invoked by IAM principal Some-Role under unusual circumstances. Such activity is not typically seen from this principal.",
        "pretext": "Finding in us-east-1 for Acct: 505986456095",
        "title": "Recon:IAMUser/UserPermissions",
        "id": 1,
        "title_link": "https://console.aws.amazon.com/guardduty/home?region=us-east-1#/findings?search=id%5t45ffg59a575art6789404dr5oci5a4zz",
        "color": "e2d43b",
        "fields": [
          { "title": "Severity", "value": "Medium", "short": true },
          { "title": "Region", "value": "us-east-1", "short": true },
          { "title": "Last Seen", "value": "<!date^1614629482^{date} at {time} | 2021-03-01T20:11:22.426Z>", "short": true }
        ],
        "mrkdwn_in": [ "pretext" ]
      }
    ]
  }
]

#2 konstantinos Nxlog ✓

Hi Damion,

In general, you'll need to use parse_json(). In your xm_json block, set Flatten True in order to flatten the nested JSON. For the field manipulation, the xm_rewrite module, which is available in NXLog EE, will be handy.
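As an illustration of what flattening a nested document means, here is a small Python sketch (not NXLog code). The function name `flatten` and the dotted key scheme, such as `attachments.0.title`, are assumptions chosen for readability; the field names NXLog actually produces with Flatten True may differ, so check the parsed output before writing rename rules against it.

```python
def flatten(value, prefix=""):
    """Collapse nested dicts and lists into a single dict with dotted keys.

    Illustrative only: list items are keyed by their index, and empty
    containers contribute no keys at all.
    """
    if isinstance(value, dict):
        items = value.items()
    elif isinstance(value, list):
        items = enumerate(value)
    else:
        # A scalar leaf: emit it under the key path built so far.
        return {prefix: value}

    flat = {}
    for key, child in items:
        child_prefix = f"{prefix}.{key}" if prefix else str(key)
        flat.update(flatten(child, child_prefix))
    return flat
```

Applied to one message from the sample, this would put the finding title under a key like `attachments.0.title`, which is the kind of name a later rename step would have to refer to.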

Since you are a trial user, I will contact you by email to provide more insight based on the sample you've provided. One of the things we'll need to clarify is if your JSON is parsed correctly or if it should be read as a multiline event.
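The multiline question arises because the sample is one JSON array spread over many lines, so a reader that treats each line as an event only ever sees fragments of an object. One possible workaround, sketched here in Python as a pre-processing step rather than as NXLog configuration, is to rewrite each historical file as newline-delimited JSON with one complete object per line. The function name `array_to_ndjson` is hypothetical.

```python
import json


def array_to_ndjson(text: str) -> str:
    """Convert a JSON array document into newline-delimited JSON.

    Each array element is serialized compactly on its own line so that a
    line-oriented reader receives one complete JSON object per event.
    """
    records = json.loads(text)
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    return "".join(
        json.dumps(record, separators=(",", ":")) + "\n" for record in records
    )
```

The converted files could then be written to a separate directory and read line by line, which avoids having to decide how a multi-line array should be split into events.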

Kind regards,

Konstantinos