using ExcelDataReader;
using System.Collections.Concurrent;
using System.Data;

namespace etoy
{
    /// <summary>
    /// Command that reads every "*.csv" file in <c>Context.Option.CsvOutput</c>, parses each
    /// resulting <see cref="DataTable"/> with <see cref="CommonTableCsvParser"/>, and stores the
    /// parsed tables, sorted by name, in <c>Context.Blackboard.Tables</c>.
    /// </summary>
    /// <remarks>
    /// One task per file performs the IO (<see cref="OnThreadIO"/>); a single task
    /// (<see cref="OnThreadParse"/>) consumes the loaded tables, parses them and finishes the command.
    /// </remarks>
    class CmdReadCommonTable : Command
    {
        const string SEARCH_PATTERN = "*.csv";

        // Tables loaded by the IO tasks, waiting to be parsed.
        ConcurrentQueue<DataTableWrapper> _waitingParses = new ConcurrentQueue<DataTableWrapper>();

        // Successfully parsed tables.
        // NOTE(review): the generic type argument is not visible in the reviewed source; it should be
        // the return type of CommonTableCsvParser.Parse — confirm and restore it.
        ConcurrentQueue _parseDones = new ConcurrentQueue();

        ExcelReaderConfiguration _excelReaderConfiguration;

        // Failures collected from both the IO tasks and the parse task.
        ConcurrentQueue<ErrorData> _errorConfigs = new ConcurrentQueue<ErrorData>();

        CommonTableCsvParser _parser = new CommonTableCsvParser();

        object _locker = new object();
        int _totalCount;

        /// <summary>
        /// Number of files still expected to produce a parsed table. Starts at the number of CSV
        /// files found and is reduced by one for every recorded failure.
        /// </summary>
        public int TotalCount
        {
            get
            {
                lock (_locker)
                {
                    return _totalCount;
                }
            }
            set
            {
                lock (_locker)
                {
                    _totalCount = value;
                }
            }
        }

        public override string Description => "读取Common表";

        /// <summary>
        /// Enumerates the CSV files and starts one IO task per file plus the single parse task.
        /// </summary>
        /// <exception cref="Exception">The CSV output directory contains no "*.csv" file.</exception>
        protected override void OnProcess()
        {
            var csvs = Directory.GetFiles(Context.Option.CsvOutput, SEARCH_PATTERN, SearchOption.TopDirectoryOnly);
            if (csvs.Length <= 0)
            {
                throw new Exception($"{Context.Option.CsvOutput} 没有*.csv文件...");
            }

            TotalCount = csvs.Length;
            _excelReaderConfiguration = new ExcelReaderConfiguration()
            {
                AutodetectSeparators = new char[] { ',' }
            };

            // Reset all per-run state, including errors left over from a previous run.
            _parseDones.Clear();
            _waitingParses.Clear();
            _errorConfigs.Clear();

            foreach (var file in csvs)
            {
                Task.Factory.StartNew(OnThreadIO, file.ToPath());
            }

            Task.Factory.StartNew(OnThreadParse);
        }

        /// <summary>
        /// Parse loop: drains <see cref="_waitingParses"/> until the number of parsed tables reaches
        /// <see cref="TotalCount"/>, then either reports the collected errors or publishes the tables.
        /// </summary>
        void OnThreadParse()
        {
            // The exit condition compares parsed tables against a per-file count, so it relies on
            // every successfully read file contributing exactly one table.
            while (_parseDones.Count < TotalCount)
            {
                while (_waitingParses.TryDequeue(out var wrapper))
                {
                    try
                    {
                        var table = _parser.Parse(wrapper.Table, wrapper.Path);
                        _parseDones.Enqueue(table);
                        SetProgress(_parseDones.Count / (float)TotalCount);
                    }
                    catch (Exception e)
                    {
                        RecordFailure(e, wrapper.Path);
                    }
                }

                Thread.Sleep(5);
            }

            if (_errorConfigs.Count > 0)
            {
                var messages = _errorConfigs.Select(e => $"## ERROR: {e}");
                SetException(new Exception(string.Join('\n', messages)));
                return;
            }

            while (_parseDones.TryDequeue(out var table))
            {
                Context.Blackboard.Tables.Add(table);
            }

            // Sort the tables by name.
            Context.Blackboard.Tables.Sort((a, b) => a.Name.CompareTo(b.Name));
            Completed();
        }

        /// <summary>
        /// IO task body: loads a single CSV file and records any failure instead of letting it escape.
        /// </summary>
        /// <param name="state">The file path passed to <c>Task.Factory.StartNew</c>.</param>
        void OnThreadIO(object state)
        {
            try
            {
                ReadSingle(state as string);
            }
            catch (Exception e)
            {
                RecordFailure(e, state as string);
            }
        }

        /// <summary>
        /// Records a failed file and lowers the expected count by one.
        /// </summary>
        /// <param name="e">The exception that caused the failure.</param>
        /// <param name="path">Path of the file that failed.</param>
        void RecordFailure(Exception e, string path)
        {
            // Publish the error BEFORE lowering the count: the parse loop exits as soon as it sees
            // the lower count and must then already be able to observe this error.
            _errorConfigs.Enqueue(new ErrorData() { Exception = e, Path = path });

            // Decrement under a single lock acquisition; "TotalCount--" would take the lock twice
            // (get, then set) and could lose updates when several tasks fail concurrently.
            lock (_locker)
            {
                _totalCount--;
            }
        }

        /// <summary>
        /// Reads one CSV file into a <see cref="DataSet"/> and queues each of its tables for parsing.
        /// </summary>
        /// <param name="file">Path of the CSV file to read.</param>
        void ReadSingle(string file)
        {
            // Using declarations release the reader and the stream even when reading throws.
            using Stream csv = new FileStream(file, FileMode.Open, FileAccess.Read);
            using IExcelDataReader reader = ExcelReaderFactory.CreateCsvReader(csv, _excelReaderConfiguration);

            DataSet dataSet = reader.AsDataSet();
            foreach (DataTable dt in dataSet.Tables)
            {
                _waitingParses.Enqueue(new DataTableWrapper()
                {
                    Table = dt,
                    Path = file,
                });
            }
        }

        /// <summary>A loaded table together with the path of the file it came from.</summary>
        struct DataTableWrapper
        {
            public DataTable Table;
            public string Path;
        }

        /// <summary>A failure together with the path of the file that caused it.</summary>
        struct ErrorData
        {
            public Exception Exception;
            public string Path;

            /// <summary>Formats the error as "&lt;exception message&gt; &lt;path&gt;".</summary>
            public override string ToString()
            {
                return $"{Exception?.Message} {Path}";
            }
        }
    }
}