using System; using System.Text; using System.Data; using System.Net.Http; using System.Net; using System.Threading.Tasks; using System.Collections.Generic; using System.Collections.Concurrent; using Microsoft.Extensions.Logging; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using MySql.Data.MySqlClient; using TradeIdeasCommon.Models; using TradeIdeasCommon.Extensions; using System.IO; namespace TradeIdeasCommon.Services.Logging { public sealed class LoggingService { private readonly ILogger _logger; private readonly BlockingCollection> _bcqLogMessgaes; //First item in the list are the comman delimited Tags: Ex... "Tag1,Tag2..." private bool _running = false; private string _loggingEndpoint; private string _loggingKey; private string _databaseConnectionString; private string _dynamicSqlInsertHeader = string.Empty; public LoggingService(ILogger logger, string databaseConnectionString, string loggingEndpoint, string loggingKey) { _logger = logger; _databaseConnectionString = databaseConnectionString; _loggingEndpoint = loggingEndpoint; _loggingKey = loggingKey; _bcqLogMessgaes = new BlockingCollection>(new ConcurrentQueue>()); //Start the Monitor Worker for the Logging Service _running = true; Task.Run(async () => await LogMessagesWorker()); } public async Task LogMessages( string mode, string apiKey, string tags, string messages) { //This is a fire and forget method with no blocking //The item is enqueued and processed by a worker thread. var result = new ServiceResult(); result.Status = _bcqLogMessgaes.TryAdd( new List() {mode,tags,messages }) ? ServiceResultStatuses.Success : ServiceResultStatuses.ServerError; await Task.CompletedTask; return result; } private async Task LogMessagesWorker() { try { HttpClient httpClient = new HttpClient() { BaseAddress = new Uri($"https://{_loggingEndpoint}"), Timeout = TimeSpan.FromSeconds(30) }; var conn = new MySqlConnection(_databaseConnectionString); await conn.OpenAsync(); var cmd = new MySqlCommand("dynamic_insert", conn) { CommandType = CommandType.StoredProcedure, }; cmd.Parameters.AddWithValue("dynamicSql", string.Empty); await cmd.PrepareAsync(); var messageRecords = new List(); BuildDynamicSqlInsertHeader(); while (_running) { try { if (_bcqLogMessgaes.TryTake(out List logMessage, 1000)) { var mode = logMessage[0]; var tags = logMessage[1]; var messages = logMessage[2]; var request = new HttpRequestMessage(HttpMethod.Post, $"{mode}/{_loggingKey}/tag/{tags}"); request.Content = new StringContent(messages, Encoding.UTF8, "application/json"); var response = await httpClient.SendAsync(request); //Implement if we want to support bulk logging //Environment.NewLine does not seem to be consistent between Windows and Linux //foreach (var message in messages.Split(Environment.NewLine, StringSplitOptions.RemoveEmptyEntries)) //{ var message = messages; var json = JObject.Parse(message); var level = json.GetValue("level"); var userName = json.GetValue("userName"); var applicationName = json.GetValue("process"); var applicationVersion = json.GetValue("version"); messageRecords.Add($"('{level}','{userName}','{applicationName}','{applicationVersion}','{message}','{tags}','{DateTime.UtcNow:yyyy-MM-dd HH:mm:ss.fff}')"); cmd.Parameters["dynamicSql"].Value = _dynamicSqlInsertHeader + string.Join(",", messageRecords); await cmd.ExecuteNonQueryAsync(); messageRecords.Clear(); //} //_logger.LogInformation("Logging Metrics..."); } } catch (Exception ex) { _logger.LogError(ex, ex.Message); while (conn.State != ConnectionState.Open) { try { conn = new MySqlConnection(_databaseConnectionString); await conn.OpenAsync(); cmd = new MySqlCommand("dynamic_insert", conn) { CommandType = CommandType.StoredProcedure, }; cmd.Parameters.AddWithValue("dynamicSql", string.Empty); await cmd.PrepareAsync(); } catch (Exception iex) { _logger.LogError(iex, iex.Message); await Task.Delay(5000); } } } } } catch (Exception ex) { _logger.LogError(ex, ex.Message); } } private void BuildDynamicSqlInsertHeader() { //Create the constant portion of the Insert Statement using today's table var columns = new List() { "log_level", "user_name", "application_name", "application_version", "message", "tags", "time" }; _dynamicSqlInsertHeader = $"INSERT INTO log({string.Join(",", columns)}) VALUES "; } } }