123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- using ExcelDataReader;
- using System.Collections.Concurrent;
- using System.Data;
- namespace etoy
- {
- class CmdReadCommonTable : Command
- {
- const string SEARCH_PATTERN = "*.csv";
- ConcurrentQueue<DataTableWrapper> _waitingParses = new ConcurrentQueue<DataTableWrapper>();
- ConcurrentQueue<Table> _parseDones = new ConcurrentQueue<Table>();
- ExcelReaderConfiguration _excelReaderConfiguration;
- ConcurrentQueue<ErrorData> _errorConfigs = new ConcurrentQueue<ErrorData>();
- CommonTableCsvParser _parser = new CommonTableCsvParser();
- object _locker = new object();
- int _totalCount;
- public int TotalCount
- {
- get { lock (_locker) return _totalCount; }
- set { lock (_locker) _totalCount = value; }
- }
- public override string Description => "读取Common表";
- protected override void OnProcess()
- {
- var csvs = Directory.GetFiles(Context.Option.CsvOutput, SEARCH_PATTERN, SearchOption.TopDirectoryOnly);
- if (csvs.Length <= 0)
- throw new Exception($"{Context.Option.CsvOutput} 没有*.csv文件...");
- TotalCount = csvs.Length;
- _excelReaderConfiguration = new ExcelReaderConfiguration() { AutodetectSeparators = new char[] { ',' } };
- _parseDones.Clear();
- _waitingParses.Clear();
- foreach (var file in csvs)
- Task.Factory.StartNew(OnThreadIO, file.ToPath());
- Task.Factory.StartNew(OnThreadParse);
- }
- void OnThreadParse()
- {
- while (_parseDones.Count < TotalCount)
- {
- while (_waitingParses.TryDequeue(out var wrapper))
- {
- try
- {
- var table = _parser.Parse(wrapper.Table, wrapper.Path);
- _parseDones.Enqueue(table);
- SetProgress(_parseDones.Count / (float)TotalCount);
- }
- catch (Exception e)
- {
- TotalCount--;
- _errorConfigs.Enqueue(new ErrorData() { Exception = e, Path = wrapper.Path });
- }
- }
- Thread.Sleep(5);
- }
- if (_errorConfigs.Count > 0)
- {
- SetException(new Exception(string.Join('\n', (from e in _errorConfigs select $"## ERROR: {e}").ToArray())));
- }
- else
- {
- while (_parseDones.TryDequeue(out var table))
- Context.Blackboard.Tables.Add(table);
- // 排序一下
- Context.Blackboard.Tables.Sort((a, b) => a.Name.CompareTo(b.Name));
- Completed();
- }
- }
- void OnThreadIO(object state)
- {
- try
- {
- ReadSingle(state as string);
- }
- catch (Exception e)
- {
- TotalCount--;
- _errorConfigs.Enqueue(new ErrorData() { Exception = e, Path = state as string });
- }
- }
- void ReadSingle(string file)
- {
- using Stream csv = new FileStream(file, FileMode.Open, FileAccess.Read);
- IExcelDataReader reader = ExcelReaderFactory.CreateCsvReader(csv, _excelReaderConfiguration);
- DataSet dataSet = reader.AsDataSet();
- foreach (DataTable dt in dataSet.Tables)
- {
- _waitingParses.Enqueue(new DataTableWrapper()
- {
- Table = dt,
- Path = file,
- });
- }
- csv.Close();
- reader.Close();
- reader.Dispose();
- }
- struct DataTableWrapper
- {
- public DataTable Table;
- public string Path;
- }
- struct ErrorData
- {
- public Exception Exception;
- public string Path;
- public override string ToString()
- {
- return $"{Exception?.Message} {Path}";
- }
- }
- }
- }
|