using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using DirectorConfiguration;
using franz;
using Newtonsoft.Json;

namespace director
{
    /// <summary>
    /// Entry point of the director: periodically downloads the configured calendars,
    /// rebuilds the agenda held in <see cref="scratch"/>, and hands occurrences that are
    /// close enough to their show time to a fixed pool of worker tasks.
    /// </summary>
    public class Program
    {
        private static Telefranz tf;
        public static Config directorConf;

        // How long to wait between calendar checks; also the workers' polling timeout.
        private static TimeSpan calendarNaptime = TimeSpan.FromHours(1);
        public static Scratch scratch;
        private static HttpClient httpClient;

        // Work handed from checkCalendars() to the threadwork() loops.
        private static readonly ConcurrentQueue<schedulable.Scheduled> workQueue =
            new ConcurrentQueue<schedulable.Scheduled>();

        // Counting signal: released once per enqueued item. An AutoResetEvent may collapse
        // several Set() calls made in quick succession into a single wake-up, which left
        // queued items unprocessed until the next polling timeout.
        private static readonly SemaphoreSlim _signal = new SemaphoreSlim(0);
        private const int concurrentWorkers = 5;

        // Ids of every worker loop that has been started.
        private static readonly ConcurrentBag<Guid> workerIds = new ConcurrentBag<Guid>();

        // Event UID -> id of the worker that signed the event out.
        // NOTE(review): the key type assumes the event Uid is a string — confirm against the occurrence type.
        private static readonly ConcurrentDictionary<string, Guid> occurenceHandlers =
            new ConcurrentDictionary<string, Guid>();

        /// <summary>
        /// Loads appsettings.json (writing a default file and exiting when it is missing),
        /// configures the HTTP client and messaging, starts the workers, then checks the
        /// calendars in an endless loop.
        /// </summary>
        static void Main(string[] args)
        {
            if (!File.Exists("appsettings.json"))
            {
                Console.Error.WriteLine("appsettings.json was not found!");
                directorConf = new DirectorConfiguration.Config();
                File.WriteAllText("appsettings.json", JsonConvert.SerializeObject(directorConf, Formatting.Indented));
                return;
            }

            directorConf = JsonConvert.DeserializeObject<Config>(File.ReadAllText("appsettings.json"));
            HumanCommunication.Configure(directorConf.call_for_humans_discord_webhook);

            // One shared client carrying the basic-auth credentials from the configuration.
            httpClient = new HttpClient();
            httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(
                "Basic",
                Convert.ToBase64String(
                    System.Text.Encoding.ASCII.GetBytes($"{directorConf.webdav_username}:{directorConf.webdav_password}")));

            Telefranz.Configure("scheduler", directorConf.kafka_bootstrap);
            tf = Telefranz.Instance;
            scratch = Scratch.LoadScratch();

            for (var i = 0; i < concurrentWorkers; i++)
            {
                Task.Run(threadwork);
            }

            while (true)
            {
                // Waiting on both tasks makes each cycle last at least calendarNaptime,
                // and longer when the calendar check itself takes longer.
                Task.WaitAll(
                    Task.Run(checkCalendars),
                    Task.Delay(calendarNaptime)
                );
            }
        }

        /// <summary>
        /// Rebuilds the agenda from all configured calendars, saves it, and enqueues every
        /// agenda entry whose show time minus one day is within <see cref="calendarNaptime"/>
        /// of now, unless a known worker has already signed that event out.
        /// </summary>
        private static void checkCalendars()
        {
            try
            {
                lock (scratch)
                {
                    scratch.agenda.Clear();
                }

                var calChecks = new List<Task>();
                foreach (var calendar in directorConf.webdav_calendars)
                {
                    calChecks.Add(checkCalendar(calendar));
                }
                Task.WaitAll(calChecks.ToArray());

                scratch.agenda = scratch.agenda.OrderBy(s => s.Showtime).ToList();
                lock (scratch)
                {
                    scratch.Save();
                }
            }
            catch (Exception e)
            {
                // A failed refresh is logged; whatever made it into the agenda is still scheduled below.
                Console.Error.WriteLine(e);
            }

            foreach (var s in scratch.agenda)
            {
                //todo: find the perfect lead time.
                if ((s.Showtime - TimeSpan.FromDays(1)) - DateTime.Now > calendarNaptime)
                {
                    continue;
                }

                // Workers get a deep copy (JSON round trip), not the agenda's own instance.
                var copy = JsonConvert.DeserializeObject<schedulable.Scheduled>(JsonConvert.SerializeObject(s));
                var uid = copy.Occurrence._event.Uid;

                // Single lookup instead of ContainsKey followed by the indexer.
                Guid handlerId;
                var alreadyHandled = occurenceHandlers.TryGetValue(uid, out handlerId)
                                     && workerIds.Contains(handlerId);

                if (alreadyHandled)
                {
                    Console.WriteLine($"calendars checked, looks like event {uid} thing is handled");
                }
                else
                {
                    workQueue.Enqueue(copy);
                    _signal.Release();
                }
            }

            Console.WriteLine("calendars checked");
        }

        /// <summary>
        /// Downloads one calendar, stores its raw text in the scratch data, and adds each of
        /// its occurrences to the agenda unless an entry with the same event UID is already there.
        /// </summary>
        /// <param name="calendarUri">Calendar path appended to the configured webdav_uri.</param>
        private static async Task checkCalendar(string calendarUri)
        {
            //?export is a hack to allow me to access the calendar
            //it likes to throw an error saying "this is the webDAV interface, use webDAV" at my webDAV client, stopping me from using webDAV.
            Console.WriteLine(calendarUri);
            var calString = await httpClient.GetStringAsync(directorConf.webdav_uri + calendarUri + "?export");

            lock (scratch)
            {
                var calName = iCalHoopJumping.LoadCalendar(calString);
                scratch.Calendars[calName] = calString;

                //todo: make sure I'm getting things in the present.
                foreach (var occurrence in iCalHoopJumping.getOccurrences(calName))
                {
                    var newSchedulable = new schedulable.Scheduled()
                    {
                        Occurrence = occurrence,
                        Showtime = occurrence.OccurrenceStart
                    };

                    var asActualEvent = iCalHoopJumping.parseEvent(calName, occurrence.Event);
                    if (asActualEvent == null)
                    {
                        // The dedupe check below already treats parseEvent as nullable; skip this
                        // occurrence instead of failing the whole calendar with a null dereference.
                        Console.Error.WriteLine($"could not parse an event in calendar {calName}, skipping occurrence");
                        continue;
                    }

                    var alreadyOnAgenda = scratch.agenda.Any(
                        s => iCalHoopJumping.parseEvent(calName, s.Occurrence.Event)?.Uid == asActualEvent.Uid);
                    if (!alreadyOnAgenda)
                    {
                        //createSchedulable(ref newSchedulable);
                        scratch.agenda.Add(newSchedulable);
                    }
                }
            }
        }

        /// <summary>
        /// Worker loop: waits for a signal (or the polling timeout), takes one item from the
        /// queue, signs it out under this worker's id, resolves its configuration and starts
        /// a <c>ShowHandler</c> for it. Never returns.
        /// </summary>
        private static void threadwork()
        {
            var threadId = Guid.NewGuid();
            workerIds.Add(threadId);
            schedulable.Scheduled todo = null;

            while (true)
            {
                // The result is ignored on purpose: the queue is polled after a timeout as well.
                _signal.Wait(calendarNaptime);
                if (!workQueue.TryDequeue(out todo))
                {
                    continue;
                }

                try
                {
                    Console.WriteLine($"threadwork consumes!");
                    Console.WriteLine(JsonConvert.SerializeObject(todo));

                    if (todo.HandledBy != null && workerIds.Contains(todo.HandledBy.Value))
                    {
                        Console.WriteLine($"{todo.HandledBy} already got this.");
                        continue;
                    }

                    todo.HandledBy = threadId;
                    occurenceHandlers[todo.Occurrence._event.Uid] = threadId;
                    Console.WriteLine($"signing it out. i'm {todo.HandledBy}, handling {todo.Occurrence._event.Uid}");

                    todo.Configuration = findConfig(todo.Occurrence.CalendarSourceName, todo.Occurrence._event.Summary);
                    if (todo.Configuration == null)
                    {
                        Console.WriteLine("configuration not found, doing nothing");
                        continue;
                    }
                    Console.WriteLine("configuration found");

                    var handler = new ShowHandler(
                        todo,
                        directorConf.workingDirectory + todo.Showtime.ToString("_yyyy-MM-dd"),
                        () =>
                        {
                            //tf.ProduceMessage(new silver_messages.directorial.execute_command(){command = "directors_datasync", timeout = TimeSpan.Zero});
                        });
                    handler.StartHandling();
                }
                catch (Exception e)
                {
                    // Without this, one failing item would end the worker's task silently while
                    // its id stays in workerIds. Log and keep the worker alive for the next item.
                    Console.Error.WriteLine(e);
                }
            }
        }

        /// <summary>
        /// Finds the first configured locator whose calendar equals
        /// <paramref name="CalendarSourceName"/> and whose EventName regex matches
        /// <paramref name="eventName"/>, then loads the configuration file it points to.
        /// </summary>
        /// <param name="CalendarSourceName">Calendar name to compare against each locator's Calendar.</param>
        /// <param name="eventName">Event summary tested against each locator's EventName regex.</param>
        /// <returns>
        /// The deserialized configuration, or null when no locator matches, the file does not
        /// exist, or reading/deserializing it throws.
        /// </returns>
        private static schedulable.Schedulable findConfig(string CalendarSourceName, string eventName)
        {
            foreach (var locator in directorConf.scheduleConfigs)
            {
                if (locator.Calendar != CalendarSourceName || !locator.EventName.IsMatch(eventName))
                {
                    continue;
                }

                // The file name is produced by applying the locator's regex replacement to the event name.
                var configurationName = locator.EventName.Replace(eventName, locator.SchedulableConfiguration);
                Console.WriteLine($"found match good enough, I guess. going to load {configurationName} for {eventName}");

                // The first matching locator decides the outcome; later locators are not tried.
                if (!File.Exists(configurationName))
                {
                    Console.Error.WriteLine($"couldn't find condfiguration file {configurationName}");
                    return null;
                }

                try
                {
                    return JsonConvert.DeserializeObject<schedulable.Schedulable>(File.ReadAllText(configurationName));
                }
                catch (Exception e)
                {
                    Console.Error.WriteLine($"error btw. not sure who's not throwing one. {e.Message}");
                    return null;
                }
            }

            //HumanCommunication.Instance.forwardToDiscord($"couldn't find suitable configuration for {eventName} within {CalendarSourceName}", HumanCommunication.LogLevel.Info);
            return null;
        }
    }
}