1use crate::traits::math::Vector;
264use crate::util::{print::LowerExpWithPlus, useful::tab};
265#[cfg(feature = "parquet")]
266use arrow::datatypes::{
267 Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type,
268 UInt64Type, UInt8Type,
269};
270use std::cmp::{max, min};
271#[cfg(feature = "csv")]
272use std::collections::HashMap;
273#[cfg(feature = "parquet")]
274use indexmap::IndexMap;
275#[cfg(any(feature = "csv", feature = "nc", feature = "parquet"))]
276use std::error::Error;
277use std::fmt;
278use std::ops::{Index, IndexMut};
279#[cfg(feature = "parquet")]
280use std::sync::Arc;
281use DType::{Bool, Char, Str, F32, F64, I16, I32, I64, I8, ISIZE, U16, U32, U64, U8, USIZE};
282
283#[cfg(feature = "parquet")]
284use arrow::{
285 array::{Array, BooleanArray, PrimitiveArray, StringArray},
286 datatypes::{DataType, Field, Schema},
287};
288#[cfg(feature = "csv")]
289use csv::{ReaderBuilder, WriterBuilder};
290#[cfg(feature = "nc")]
291use netcdf::{
292 types::VariableType,
293 variable::{Variable, VariableMut},
294 Numeric,
295};
296#[cfg(feature = "parquet")]
297use parquet::{
298 arrow::arrow_reader::ParquetRecordBatchReaderBuilder,
299 arrow::arrow_writer::compute_leaves,
300 arrow::arrow_writer::get_column_writers,
301 arrow::arrow_writer::ArrowLeafColumn,
302 arrow::ArrowSchemaConverter,
303 basic::Compression,
304 file::properties::WriterProperties,
305 file::writer::{SerializedFileWriter, SerializedRowGroupWriter},
306};
307
/// Runtime tag naming the element type stored in a `Series` or `Scalar`.
///
/// Every variant corresponds to one Rust type: the integer and float
/// primitives, `bool` (`Bool`), `String` (`Str`) and `char` (`Char`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DType {
    USIZE,
    U8,
    U16,
    U32,
    U64,
    ISIZE,
    I8,
    I16,
    I32,
    I64,
    F32,
    F64,
    Bool,
    Str,
    Char,
}
331
/// Owned, type-tagged column storage: one `Vec` variant per supported
/// element type (the variant names mirror `DType`).
#[derive(Clone, Debug, PartialEq)]
pub enum DTypeArray {
    USIZE(Vec<usize>),
    U8(Vec<u8>),
    U16(Vec<u16>),
    U32(Vec<u32>),
    U64(Vec<u64>),
    ISIZE(Vec<isize>),
    I8(Vec<i8>),
    I16(Vec<i16>),
    I32(Vec<i32>),
    I64(Vec<i64>),
    F32(Vec<f32>),
    F64(Vec<f64>),
    Bool(Vec<bool>),
    Str(Vec<String>),
    Char(Vec<char>),
}
351
/// A single type-tagged value; the variant names mirror `DType`.
///
/// NOTE: the derived `PartialOrd` compares variant position first, so the
/// declaration order below is significant and must not be changed.
#[derive(Clone, Debug, PartialEq, PartialOrd)]
pub enum DTypeValue {
    USIZE(usize),
    U8(u8),
    U16(u16),
    U32(u32),
    U64(u64),
    ISIZE(isize),
    I8(i8),
    I16(i16),
    I32(i32),
    I64(i64),
    F32(f32),
    F64(f64),
    Bool(bool),
    Str(String),
    Char(char),
}
371
372#[derive(Debug, Clone, PartialEq)]
407pub struct DataFrame {
408 pub data: Vec<Series>,
409 pub ics: Vec<String>,
410}
411
412#[derive(Debug, Clone, PartialEq)]
433pub struct Series {
434 pub values: DTypeArray,
435 pub dtype: DType,
436}
437
438#[derive(Debug, Clone, PartialEq)]
440pub struct Scalar {
441 pub value: DTypeValue,
442 pub dtype: DType,
443}
444
/// Conversion between a concrete Rust value of type `T` and a dynamically
/// typed container.
pub trait TypedScalar<T> {
    /// Extracts the wrapped value as `T`.
    fn unwrap(self) -> T;

    /// Wraps `s` in the container.
    fn new(s: T) -> Self
    where
        Self: Sized;
}
454
/// Typed view over a dynamically typed column holding elements of type `T`.
pub trait TypedVector<T> {
    // --- construction and raw access ---

    /// Builds a container from `v`.
    fn new(v: Vec<T>) -> Self;
    /// Returns an owned copy of the elements.
    fn to_vec(&self) -> Vec<T>;
    /// Borrows the elements as a slice.
    fn as_slice(&self) -> &[T];
    /// Mutably borrows the elements as a slice.
    fn as_slice_mut(&mut self) -> &mut [T];
    /// Returns (a clone of) the element at index `i`.
    fn at_raw(&self, i: usize) -> T;
    /// Appends `elem`.
    fn push(&mut self, elem: T);

    // --- element-wise transformations ---

    /// Applies `f` to every element, producing a new container.
    fn map<F: Fn(T) -> T>(&self, f: F) -> Self;
    /// Applies `f` to every element in place.
    fn mut_map<F: Fn(&mut T)>(&mut self, f: F);
    /// Left fold over the elements starting from `init`.
    fn fold<F: Fn(T, T) -> T>(&self, init: T, f: F) -> T;
    /// Combines this container with `other` element by element using `f`.
    fn zip_with<F: Fn(T, T) -> T>(&self, f: F, other: &Self) -> Self;

    // --- selection ---

    /// Keeps the elements for which `f` returns `true`.
    fn filter<F: Fn(&T) -> bool>(&self, f: F) -> Self;
    /// Keeps at most the first `n` elements.
    fn take(&self, n: usize) -> Self;
    /// Drops the first `n` elements.
    fn skip(&self, n: usize) -> Self;
    /// Keeps the leading run of elements satisfying `f`.
    fn take_while<F: Fn(&T) -> bool>(&self, f: F) -> Self;
    /// Drops the leading run of elements satisfying `f`.
    fn skip_while<F: Fn(&T) -> bool>(&self, f: F) -> Self;
}
472
/// Implements `TypedScalar<$type>` for `Scalar`, storing the value in the
/// `DTypeValue::$dtype` variant and tagging it `DType::$dtype`.
///
/// `unwrap` panics with "Can't unwrap {dtype:?} value" when the scalar holds a
/// different variant.
macro_rules! impl_typed_scalar {
    ($type:ty, $dtype:ident) => {
        impl TypedScalar<$type> for Scalar {
            fn new(s: $type) -> Self {
                let value = DTypeValue::$dtype(s);
                Scalar {
                    value,
                    dtype: DType::$dtype,
                }
            }

            fn unwrap(self) -> $type {
                if let DTypeValue::$dtype(inner) = self.value {
                    inner
                } else {
                    panic!("Can't unwrap {:?} value", $dtype)
                }
            }
        }
    };
}
495
/// Implements `TypedVector<$type>` for `Series`, storing the data in the
/// `DTypeArray::$dtype` variant and tagging it `DType::$dtype`.
///
/// Accessors panic with "Can't convert to {dtype:?} vector" when the series
/// holds a different variant. The transforming methods (`map`, `filter`,
/// `take`, ...) iterate over the borrowed slice and clone only the elements
/// they keep. They do not copy the whole column up front, which matters for
/// `String` columns and for `take`/`take_while` on long series.
macro_rules! impl_typed_vector {
    ($type:ty, $dtype:ident) => {
        impl TypedVector<$type> for Series {
            fn new(v: Vec<$type>) -> Self {
                Self {
                    values: DTypeArray::$dtype(v),
                    dtype: DType::$dtype,
                }
            }

            fn to_vec(&self) -> Vec<$type> {
                let v: &[$type] = self.as_slice();
                v.to_vec()
            }

            fn as_slice(&self) -> &[$type] {
                match &self.values {
                    DTypeArray::$dtype(v) => v,
                    _ => panic!("Can't convert to {:?} vector", $dtype),
                }
            }

            fn as_slice_mut(&mut self) -> &mut [$type] {
                match &mut self.values {
                    DTypeArray::$dtype(v) => v,
                    _ => panic!("Can't convert to {:?} vector", $dtype),
                }
            }

            fn at_raw(&self, i: usize) -> $type {
                let v: &[$type] = self.as_slice();
                v[i].clone()
            }

            fn push(&mut self, elem: $type) {
                let v: &mut Vec<$type> = match &mut self.values {
                    DTypeArray::$dtype(v) => v,
                    _ => panic!("Can't convert to {:?} vector", $dtype),
                };
                v.push(elem);
            }

            fn map<F: Fn($type) -> $type>(&self, f: F) -> Self {
                let v: &[$type] = self.as_slice();
                Series::new(v.iter().cloned().map(f).collect::<Vec<$type>>())
            }

            fn mut_map<F: Fn(&mut $type)>(&mut self, f: F) {
                let v: &mut [$type] = self.as_slice_mut();
                v.iter_mut().for_each(f);
            }

            fn fold<F: Fn($type, $type) -> $type>(&self, init: $type, f: F) -> $type {
                let v: &[$type] = self.as_slice();
                v.iter().cloned().fold(init, f)
            }

            fn filter<F: Fn(&$type) -> bool>(&self, f: F) -> Self {
                let v: &[$type] = self.as_slice();
                // Test by reference, then clone only the survivors.
                Series::new(v.iter().filter(|x| f(*x)).cloned().collect::<Vec<$type>>())
            }

            fn take(&self, n: usize) -> Self {
                let v: &[$type] = self.as_slice();
                Series::new(v.iter().take(n).cloned().collect::<Vec<$type>>())
            }

            fn skip(&self, n: usize) -> Self {
                let v: &[$type] = self.as_slice();
                Series::new(v.iter().skip(n).cloned().collect::<Vec<$type>>())
            }

            fn take_while<F: Fn(&$type) -> bool>(&self, f: F) -> Self {
                let v: &[$type] = self.as_slice();
                Series::new(v.iter().take_while(|x| f(*x)).cloned().collect::<Vec<$type>>())
            }

            fn skip_while<F: Fn(&$type) -> bool>(&self, f: F) -> Self {
                let v: &[$type] = self.as_slice();
                Series::new(v.iter().skip_while(|x| f(*x)).cloned().collect::<Vec<$type>>())
            }

            fn zip_with<F: Fn($type, $type) -> $type>(&self, f: F, other: &Self) -> Self {
                let v: &[$type] = self.as_slice();
                let w: &[$type] = other.as_slice();
                // Like `Iterator::zip`, the result is as long as the shorter input.
                Series::new(
                    v.iter()
                        .cloned()
                        .zip(w.iter().cloned())
                        .map(|(x, y)| f(x, y))
                        .collect::<Vec<$type>>(),
                )
            }
        }
    };
}
590
/// Binds `$value` at the concrete type `$type` and passes it to `$wrapper`.
///
/// The explicit annotation is what lets each arm of `dtype_match!` pick a
/// different monomorphisation of a generic `$value` / `$wrapper`.
macro_rules! dtype_case {
    ($type:ty, $value:expr, $wrapper: expr) => {{
        let typed_value: $type = $value;
        $wrapper(typed_value)
    }};
}
597
/// Dispatches on a runtime `DType`, evaluating `$value` at the matching Rust
/// type and passing it to `$wrapper` (via `dtype_case!`).
///
/// Forms:
/// * `dtype_match!(dtype, value, wrapper)`: every `DType`.
/// * `dtype_match!(dtype, value, wrapper; Functor)`: every `DType`, with the
///   value typed as `Functor<T>` (e.g. `Vec<T>`).
/// * `dtype_match!(N; dtype, value, wrapper)` and
///   `dtype_match!(N; dtype, value, wrapper; Functor)`: only the fixed-width
///   numeric types (`u8`..`u64`, `i8`..`i64`, `f32`, `f64`). Any other dtype,
///   including `USIZE`/`ISIZE`, panics with "Can't use {dtype} to numeric".
///
/// Relies on the `DType` variants being in scope unqualified and on
/// `dtype_case!` being defined.
macro_rules! dtype_match {
    ($dtype:expr, $value:expr, $wrapper:expr) => {{
        match $dtype {
            USIZE => dtype_case!(usize, $value, $wrapper),
            U8 => dtype_case!(u8, $value, $wrapper),
            U16 => dtype_case!(u16, $value, $wrapper),
            U32 => dtype_case!(u32, $value, $wrapper),
            U64 => dtype_case!(u64, $value, $wrapper),
            ISIZE => dtype_case!(isize, $value, $wrapper),
            I8 => dtype_case!(i8, $value, $wrapper),
            I16 => dtype_case!(i16, $value, $wrapper),
            I32 => dtype_case!(i32, $value, $wrapper),
            I64 => dtype_case!(i64, $value, $wrapper),
            F32 => dtype_case!(f32, $value, $wrapper),
            F64 => dtype_case!(f64, $value, $wrapper),
            Bool => dtype_case!(bool, $value, $wrapper),
            Char => dtype_case!(char, $value, $wrapper),
            Str => dtype_case!(String, $value, $wrapper),
        }
    }};

    ($dtype:expr, $value:expr, $wrapper:expr; $functor:ident) => {{
        match $dtype {
            USIZE => dtype_case!($functor<usize>, $value, $wrapper),
            U8 => dtype_case!($functor<u8>, $value, $wrapper),
            U16 => dtype_case!($functor<u16>, $value, $wrapper),
            U32 => dtype_case!($functor<u32>, $value, $wrapper),
            U64 => dtype_case!($functor<u64>, $value, $wrapper),
            ISIZE => dtype_case!($functor<isize>, $value, $wrapper),
            I8 => dtype_case!($functor<i8>, $value, $wrapper),
            I16 => dtype_case!($functor<i16>, $value, $wrapper),
            I32 => dtype_case!($functor<i32>, $value, $wrapper),
            I64 => dtype_case!($functor<i64>, $value, $wrapper),
            F32 => dtype_case!($functor<f32>, $value, $wrapper),
            F64 => dtype_case!($functor<f64>, $value, $wrapper),
            Bool => dtype_case!($functor<bool>, $value, $wrapper),
            Char => dtype_case!($functor<char>, $value, $wrapper),
            Str => dtype_case!($functor<String>, $value, $wrapper),
        }
    }};

    (N; $dtype:expr, $value:expr, $wrapper:expr) => {{
        match $dtype {
            U8 => dtype_case!(u8, $value, $wrapper),
            U16 => dtype_case!(u16, $value, $wrapper),
            U32 => dtype_case!(u32, $value, $wrapper),
            U64 => dtype_case!(u64, $value, $wrapper),
            I8 => dtype_case!(i8, $value, $wrapper),
            I16 => dtype_case!(i16, $value, $wrapper),
            I32 => dtype_case!(i32, $value, $wrapper),
            I64 => dtype_case!(i64, $value, $wrapper),
            F32 => dtype_case!(f32, $value, $wrapper),
            F64 => dtype_case!(f64, $value, $wrapper),
            // Match arms are separated by `,`; a `;` here made every
            // expansion of this form a parse error.
            _ => panic!("Can't use {} to numeric", $dtype),
        }
    }};

    (N; $dtype:expr, $value:expr, $wrapper:expr; $functor:ident) => {{
        match $dtype {
            U8 => dtype_case!($functor<u8>, $value, $wrapper),
            U16 => dtype_case!($functor<u16>, $value, $wrapper),
            U32 => dtype_case!($functor<u32>, $value, $wrapper),
            U64 => dtype_case!($functor<u64>, $value, $wrapper),
            I8 => dtype_case!($functor<i8>, $value, $wrapper),
            I16 => dtype_case!($functor<i16>, $value, $wrapper),
            I32 => dtype_case!($functor<i32>, $value, $wrapper),
            I64 => dtype_case!($functor<i64>, $value, $wrapper),
            F32 => dtype_case!($functor<f32>, $value, $wrapper),
            F64 => dtype_case!($functor<f64>, $value, $wrapper),
            _ => panic!("Can't use {} to numeric", $dtype),
        }
    }};
}
671
/// Cell-formatting helper used by `DataFrame::spread`.
///
/// * `set_space!(elem)` renders the `Scalar` `elem` as a `String`. For `F32`
///   and `F64` it returns `fmt_lower_exp(2)` when that is strictly shorter
///   than `to_string()`, otherwise `to_string()`. Other dtypes use
///   `Scalar::to_string`.
/// * `set_space!(elem, space)` raises `space` to the width the first form
///   would produce for `elem`, if `space` is currently smaller.
///
/// Both forms take `elem` by value (through `unwrap` / `to_string`).
macro_rules! set_space {
    ($elem:expr) => {{
        match $elem.dtype {
            F32 => {
                let value: f32 = $elem.unwrap();
                let plain = value.to_string();
                let sci = value.fmt_lower_exp(2);
                if plain.len() <= sci.len() {
                    plain
                } else {
                    sci
                }
            }
            F64 => {
                let value: f64 = $elem.unwrap();
                let plain = value.to_string();
                let sci = value.fmt_lower_exp(2);
                if plain.len() <= sci.len() {
                    plain
                } else {
                    sci
                }
            }
            _ => $elem.to_string(),
        }
    }};

    ($elem:expr, $space:expr) => {{
        let width: usize = match $elem.dtype {
            F32 => {
                let value: f32 = $elem.unwrap();
                min(value.fmt_lower_exp(2).len(), value.to_string().len())
            }
            F64 => {
                let value: f64 = $elem.unwrap();
                min(value.fmt_lower_exp(2).len(), value.to_string().len())
            }
            _ => $elem.to_string().len(),
        };
        $space = max($space, width);
    }};
}
723
/// Formats a float vector as `[a, b, c]`. Each element uses whichever of
/// `fmt_lower_exp(2)` and `to_string()` is strictly shorter, preferring
/// `to_string()` on ties. An empty vector renders as `[]`.
macro_rules! format_float_vec {
    ($self:expr) => {{
        let pieces: Vec<String> = $self
            .iter()
            .map(|&x| {
                let sci = x.fmt_lower_exp(2);
                let plain = x.to_string();
                if sci.len() < plain.len() {
                    sci
                } else {
                    plain
                }
            })
            .collect();
        format!("[{}]", pieces.join(", "))
    }};
}
742
/// Converts a `Vec<$ty1>` into a `Vec<$ty2>` with a primitive `as` cast
/// (truncating/saturating like `as` does) and passes it to `$wrapper`.
macro_rules! type_cast_vec {
    ($ty1:ty, $ty2:ty, $to_vec:expr, $wrapper:expr) => {{
        let source: Vec<$ty1> = $to_vec;
        let converted = source
            .into_iter()
            .map(|elem| elem as $ty2)
            .collect::<Vec<$ty2>>();
        $wrapper(converted)
    }};
}
751
/// Renders every element of a `Vec<$ty1>` with `ToString` and passes the
/// resulting `Vec<String>` to `$wrapper`.
macro_rules! string_cast_vec {
    ($ty1:ty, $to_vec:expr, $wrapper:expr) => {{
        let source: Vec<$ty1> = $to_vec;
        let rendered = source
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<String>>();
        $wrapper(rendered)
    }};
}
759
/// Parses every `String` from `$to_vec` into `$ty2` with `str::parse` and
/// passes the resulting `Vec<$ty2>` to `$wrapper`.
///
/// # Panics
/// Panics if any element fails to parse. The message names the offending
/// string and the target type, which matters when the strings come from
/// user data such as CSV files.
macro_rules! type_parse_vec {
    ($ty2:ty, $to_vec:expr, $wrapper:expr) => {{
        // `.to_vec()` keeps this usable with borrowed inputs and pins the
        // element type to `String` for inference.
        let y: Vec<String> = $to_vec.to_vec();
        let x: Vec<$ty2> = y
            .into_iter()
            .map(|s| {
                s.parse::<$ty2>().unwrap_or_else(|e| {
                    panic!("Can't parse {:?} as {}: {:?}", s, stringify!($ty2), e)
                })
            })
            .collect();
        $wrapper(x)
    }};
}
767
/// Parses a `Vec<String>` into the Rust type named by the runtime dtype `$dt2`
/// (via `type_parse_vec!`) and hands the result to `$wrapper`.
macro_rules! dtype_parse_vec_part {
    ($dt2:expr, $to_vec:expr, $wrapper:expr) => {{
        match $dt2 {
            // Non-numeric targets.
            Str => type_parse_vec!(String, $to_vec, $wrapper),
            Bool => type_parse_vec!(bool, $to_vec, $wrapper),
            Char => type_parse_vec!(char, $to_vec, $wrapper),
            // Floating point.
            F32 => type_parse_vec!(f32, $to_vec, $wrapper),
            F64 => type_parse_vec!(f64, $to_vec, $wrapper),
            // Signed integers.
            ISIZE => type_parse_vec!(isize, $to_vec, $wrapper),
            I8 => type_parse_vec!(i8, $to_vec, $wrapper),
            I16 => type_parse_vec!(i16, $to_vec, $wrapper),
            I32 => type_parse_vec!(i32, $to_vec, $wrapper),
            I64 => type_parse_vec!(i64, $to_vec, $wrapper),
            // Unsigned integers.
            USIZE => type_parse_vec!(usize, $to_vec, $wrapper),
            U8 => type_parse_vec!(u8, $to_vec, $wrapper),
            U16 => type_parse_vec!(u16, $to_vec, $wrapper),
            U32 => type_parse_vec!(u32, $to_vec, $wrapper),
            U64 => type_parse_vec!(u64, $to_vec, $wrapper),
        }
    }};
}
789
/// Casts a `Vec<$ty1>` to the numeric type named by `$dt2` (or renders it to
/// `Str`) and hands the result to `$wrapper`.
///
/// Panics with "Can't convert to {dtype}" for `Bool` and `Char` targets.
macro_rules! dtype_cast_vec_part {
    ($ty1:ty, $dt2:expr, $to_vec:expr, $wrapper:expr) => {{
        match $dt2 {
            Str => string_cast_vec!($ty1, $to_vec, $wrapper),
            F32 => type_cast_vec!($ty1, f32, $to_vec, $wrapper),
            F64 => type_cast_vec!($ty1, f64, $to_vec, $wrapper),
            ISIZE => type_cast_vec!($ty1, isize, $to_vec, $wrapper),
            I8 => type_cast_vec!($ty1, i8, $to_vec, $wrapper),
            I16 => type_cast_vec!($ty1, i16, $to_vec, $wrapper),
            I32 => type_cast_vec!($ty1, i32, $to_vec, $wrapper),
            I64 => type_cast_vec!($ty1, i64, $to_vec, $wrapper),
            USIZE => type_cast_vec!($ty1, usize, $to_vec, $wrapper),
            U8 => type_cast_vec!($ty1, u8, $to_vec, $wrapper),
            U16 => type_cast_vec!($ty1, u16, $to_vec, $wrapper),
            U32 => type_cast_vec!($ty1, u32, $to_vec, $wrapper),
            U64 => type_cast_vec!($ty1, u64, $to_vec, $wrapper),
            _ => panic!("Can't convert to {}", $dt2),
        }
    }};
}
810
/// Converts a vector whose element type is named by `$dt1` into the element
/// type named by `$dt2`, handing the result to `$wrapper`.
///
/// * Numeric sources go through `dtype_cast_vec_part!` (`as` casts, or
///   `ToString` for `Str`). `U8` can additionally become `Bool` (non-zero is
///   `true`) or `Char`.
/// * `Str` sources are parsed (`dtype_parse_vec_part!`).
/// * `Char` can become `Str`, `U8` (truncating `as` cast) or `Char` (identity).
/// * `Bool` can become `U8` (`1`/`0`), `Str` (`"true"`/`"false"`) or `Bool`
///   (identity).
///
/// Every other combination panics.
macro_rules! dtype_cast_vec {
    ($dt1:expr, $dt2:expr, $to_vec:expr, $wrapper:expr) => {{
        match $dt1 {
            USIZE => dtype_cast_vec_part!(usize, $dt2, $to_vec, $wrapper),
            U8 => match $dt2 {
                Bool => {
                    let y: Vec<u8> = $to_vec;
                    let x: Vec<bool> = y.into_iter().map(|x| x != 0).collect();
                    $wrapper(x)
                }
                Char => {
                    let y: Vec<u8> = $to_vec;
                    let x: Vec<char> = y.into_iter().map(|x| x as char).collect();
                    $wrapper(x)
                }
                _ => dtype_cast_vec_part!(u8, $dt2, $to_vec, $wrapper),
            },
            U16 => dtype_cast_vec_part!(u16, $dt2, $to_vec, $wrapper),
            U32 => dtype_cast_vec_part!(u32, $dt2, $to_vec, $wrapper),
            U64 => dtype_cast_vec_part!(u64, $dt2, $to_vec, $wrapper),
            ISIZE => dtype_cast_vec_part!(isize, $dt2, $to_vec, $wrapper),
            I8 => dtype_cast_vec_part!(i8, $dt2, $to_vec, $wrapper),
            I16 => dtype_cast_vec_part!(i16, $dt2, $to_vec, $wrapper),
            I32 => dtype_cast_vec_part!(i32, $dt2, $to_vec, $wrapper),
            I64 => dtype_cast_vec_part!(i64, $dt2, $to_vec, $wrapper),
            F32 => dtype_cast_vec_part!(f32, $dt2, $to_vec, $wrapper),
            F64 => dtype_cast_vec_part!(f64, $dt2, $to_vec, $wrapper),
            Str => dtype_parse_vec_part!($dt2, $to_vec, $wrapper),
            Char => match $dt2 {
                Str => string_cast_vec!(char, $to_vec, $wrapper),
                U8 => {
                    let y: Vec<char> = $to_vec;
                    let x: Vec<u8> = y.into_iter().map(|x| x as u8).collect();
                    $wrapper(x)
                }
                // Identity conversion, mirroring `Bool => Bool` below; without
                // it, `to_type(Char)` on a char series panicked.
                Char => {
                    let y: Vec<char> = $to_vec;
                    $wrapper(y)
                }
                _ => panic!("Can't convert char type to {}", $dt2),
            },
            Bool => match $dt2 {
                U8 => {
                    let y: Vec<bool> = $to_vec;
                    let x: Vec<u8> = y.into_iter().map(|x| if x { 1 } else { 0 }).collect();
                    $wrapper(x)
                }
                Bool => {
                    let y: Vec<bool> = $to_vec;
                    $wrapper(y)
                }
                // "true"/"false", which `Str => Bool` parses back.
                Str => string_cast_vec!(bool, $to_vec, $wrapper),
                _ => panic!("Can't convert bool type to {}", $dt2),
            },
        }
    }};
}
863
/// Returns the number of elements in `x`, consuming it.
///
/// Written as a plain generic function taking the vector by value so it can be
/// used as the `$wrapper` of `dtype_match!(...; Vec)`.
fn len<T>(x: Vec<T>) -> usize {
    Vec::len(&x)
}
867
/// Renders `x` through its `Display` implementation.
///
/// A free-function form of `ToString::to_string`, usable as a `$wrapper` in
/// `dtype_match!`.
fn to_string<T: fmt::Display>(x: T) -> String {
    ToString::to_string(&x)
}
871
/// Maps a `DType` to the netCDF basic type used to store it.
///
/// `USIZE`/`ISIZE` are stored as 64-bit integers, and `Bool`/`Char` as
/// unsigned bytes.
///
/// # Panics
/// `Str` has no netCDF basic type and panics.
#[cfg(feature = "nc")]
fn dtype_to_vtype(dt: DType) -> netcdf::types::BasicType {
    use netcdf::types::BasicType as Nc;

    match dt {
        USIZE | U64 => Nc::Uint64,
        U8 | Bool | Char => Nc::Ubyte,
        U16 => Nc::Ushort,
        U32 => Nc::Uint,
        ISIZE | I64 => Nc::Int64,
        I8 => Nc::Byte,
        I16 => Nc::Short,
        I32 => Nc::Int,
        F32 => Nc::Float,
        F64 => Nc::Double,
        Str => panic!("Can't convert type to netcdf::types::BasicType"),
    }
}
892
/// Maps a netCDF basic type to the `DType` used to hold it in a `Series`.
#[cfg(feature = "nc")]
fn vtype_to_dtype(dv: netcdf::types::BasicType) -> DType {
    use netcdf::types::BasicType as Nc;

    match dv {
        Nc::Byte => I8,
        Nc::Short => I16,
        Nc::Int => I32,
        Nc::Int64 => I64,
        Nc::Ubyte => U8,
        Nc::Ushort => U16,
        Nc::Uint => U32,
        Nc::Uint64 => U64,
        Nc::Float => F32,
        Nc::Double => F64,
        Nc::Char => Char,
    }
}
909
/// Writes all of `v` into the netCDF variable `var` (no explicit indices or
/// slice lengths).
#[cfg(feature = "nc")]
fn nc_put_value<T: Numeric>(var: &mut VariableMut, v: Vec<T>) -> Result<(), netcdf::error::Error> {
    let values: &[T] = v.as_slice();
    var.put_values(values, None, None)
}
914
/// Reads every value of the netCDF variable `val` into a new `Series`.
///
/// `v` is used as the destination buffer: it is resized to `val.len()`
/// (new slots default-initialised) and filled by `values_to`.
///
/// # Errors
/// Propagates any error returned by `values_to`.
#[cfg(feature = "nc")]
fn nc_read_value<T: Numeric + Default + Clone>(
    val: &Variable,
    v: Vec<T>,
) -> Result<Series, netcdf::error::Error>
where
    Series: TypedVector<T>,
{
    let mut v = v;
    v.resize_with(val.len(), Default::default);
    val.values_to(&mut v, None, None)?;
    // The buffer is not used afterwards, so move it rather than copying it.
    Ok(Series::new(v))
}
928
/// Maps a `DType` to the Arrow data type used when writing Parquet.
///
/// `USIZE`/`ISIZE` become 64-bit integers and `Char` is stored as `Utf8`.
#[cfg(feature = "parquet")]
fn dtype_to_arrow(dt: DType) -> DataType {
    match dt {
        Bool => DataType::Boolean,
        Str | Char => DataType::Utf8,
        F32 => DataType::Float32,
        F64 => DataType::Float64,
        I8 => DataType::Int8,
        I16 => DataType::Int16,
        I32 => DataType::Int32,
        ISIZE | I64 => DataType::Int64,
        U8 => DataType::UInt8,
        U16 => DataType::UInt16,
        U32 => DataType::UInt32,
        USIZE | U64 => DataType::UInt64,
    }
}
949
/// Maps an Arrow data type read from Parquet to the corresponding `DType`.
///
/// # Panics
/// Panics (via `unimplemented!`) for Arrow types other than booleans, the
/// fixed-width integers and floats, and `Utf8`. The message names the
/// unsupported type so the offending column can be identified.
#[cfg(feature = "parquet")]
fn arrow_to_dtype(dt: DataType) -> DType {
    match dt {
        DataType::Boolean => Bool,
        DataType::Int8 => I8,
        DataType::Int16 => I16,
        DataType::Int32 => I32,
        DataType::Int64 => I64,
        DataType::UInt8 => U8,
        DataType::UInt16 => U16,
        DataType::UInt32 => U32,
        DataType::UInt64 => U64,
        DataType::Float32 => F32,
        DataType::Float64 => F64,
        DataType::Utf8 => Str,
        other => unimplemented!("Can't convert arrow data type {:?} to DType", other),
    }
}
969
/// Converts a `Vec<$ty>` into an Arrow array of exactly `$length` slots and
/// appends it (as `Arc<dyn Array>`) to `$chunk_vec`.
///
/// Slot `i` holds `Some(value[i])` while `i` is within the vector and `None`
/// afterwards. Values beyond `$length` are dropped.
#[cfg(feature = "parquet")]
macro_rules! dtype_case_to_arrow {
    ($ty:ty, $to_arr:expr, $value:expr, $chunk_vec:expr; $length:expr) => {{
        let v: Vec<$ty> = $value;
        let padded: Vec<Option<$ty>> = (0usize..$length).map(|i| v.get(i).cloned()).collect();
        let arr = $to_arr(padded);
        $chunk_vec.push(Arc::from(arr) as Arc<dyn Array>);
    }};
}
987
/// Dispatches on the runtime `DType` to convert `$value` into the matching
/// Arrow array (padded to `$length`) and push it onto `$chunk_vec`.
///
/// `USIZE`/`ISIZE` are widened to 64-bit Arrow integers, and `Char` values
/// are stored as one-character strings.
#[cfg(feature = "parquet")]
macro_rules! dtype_match_to_arrow {
    ($dtype:expr, $value:expr, $chunk_vec:expr; $length:expr) => {{
        match $dtype {
            USIZE => dtype_case_to_arrow!(u64, PrimitiveArray::<UInt64Type>::from, $value, $chunk_vec; $length),
            U8 => dtype_case_to_arrow!(u8, PrimitiveArray::<UInt8Type>::from, $value, $chunk_vec; $length),
            U16 => dtype_case_to_arrow!(u16, PrimitiveArray::<UInt16Type>::from, $value, $chunk_vec; $length),
            U32 => dtype_case_to_arrow!(u32, PrimitiveArray::<UInt32Type>::from, $value, $chunk_vec; $length),
            U64 => dtype_case_to_arrow!(u64, PrimitiveArray::<UInt64Type>::from, $value, $chunk_vec; $length),
            ISIZE => dtype_case_to_arrow!(i64, PrimitiveArray::<Int64Type>::from, $value, $chunk_vec; $length),
            I8 => dtype_case_to_arrow!(i8, PrimitiveArray::<Int8Type>::from, $value, $chunk_vec; $length),
            I16 => dtype_case_to_arrow!(i16, PrimitiveArray::<Int16Type>::from, $value, $chunk_vec; $length),
            I32 => dtype_case_to_arrow!(i32, PrimitiveArray::<Int32Type>::from, $value, $chunk_vec; $length),
            I64 => dtype_case_to_arrow!(i64, PrimitiveArray::<Int64Type>::from, $value, $chunk_vec; $length),
            F32 => dtype_case_to_arrow!(f32, PrimitiveArray::<Float32Type>::from, $value, $chunk_vec; $length),
            F64 => dtype_case_to_arrow!(f64, PrimitiveArray::<Float64Type>::from, $value, $chunk_vec; $length),
            Bool => dtype_case_to_arrow!(bool, BooleanArray::from, $value, $chunk_vec; $length),
            Str => dtype_case_to_arrow!(String, StringArray::from, $value, $chunk_vec; $length),
            Char => {
                let chars: Vec<char> = $value;
                let as_text: Vec<String> = chars.into_iter().map(|c| c.to_string()).collect();
                dtype_case_to_arrow!(String, StringArray::from, as_text, $chunk_vec; $length)
            }
        }
    }};
}
1014
1015fn add_vec<T: std::ops::Add<T, Output = T> + Clone>(v: Vec<T>, w: Vec<T>) -> Series
1016where
1017 Series: TypedVector<T>,
1018{
1019 Series::new(v.into_iter().zip(w).map(|(x, y)| x + y).collect::<Vec<T>>())
1020}
1021
1022fn sub_vec<T: std::ops::Sub<T, Output = T> + Clone>(v: Vec<T>, w: Vec<T>) -> Series
1023where
1024 Series: TypedVector<T>,
1025{
1026 Series::new(v.into_iter().zip(w).map(|(x, y)| x - y).collect::<Vec<T>>())
1027}
1028
1029fn mul_scalar<T: std::ops::Mul<T, Output = T> + Clone + Copy>(v: Vec<T>, s: T) -> Series
1030where
1031 Series: TypedVector<T>,
1032{
1033 Series::new(v.into_iter().map(|x| x * s).collect::<Vec<T>>())
1034}
1035
1036impl DType {
1040 pub fn is_numeric(&self) -> bool {
1042 match self {
1043 Bool => false,
1044 Str => false,
1045 Char => false,
1046 USIZE => false,
1047 ISIZE => false,
1048 _ => true,
1049 }
1050 }
1051
1052 pub fn is_integer(&self) -> bool {
1053 match self {
1054 Bool => false,
1055 Str => false,
1056 Char => false,
1057 F32 => false,
1058 F64 => false,
1059 _ => true,
1060 }
1061 }
1062}
1063
1064impl fmt::Display for DType {
1065 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1066 let st = match self {
1067 USIZE => "usize",
1068 U8 => "u8",
1069 U16 => "u16",
1070 U32 => "u32",
1071 U64 => "u64",
1072 ISIZE => "isize",
1073 I8 => "i8",
1074 I16 => "i16",
1075 I32 => "i32",
1076 I64 => "i64",
1077 F32 => "f32",
1078 F64 => "f64",
1079 Bool => "bool",
1080 Char => "char",
1081 Str => "String",
1082 };
1083 write!(f, "{}", st)
1084 }
1085}
1086
1087impl fmt::Display for DTypeArray {
1088 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1089 let st = match self {
1090 DTypeArray::USIZE(v) => format!("array: {:?}\ndtype: usize", v),
1091 DTypeArray::U8(v) => format!("array: {:?}\ndtype: u8", v),
1092 DTypeArray::U16(v) => format!("array: {:?}\ndtype: u16", v),
1093 DTypeArray::U32(v) => format!("array: {:?}\ndtype: u32", v),
1094 DTypeArray::U64(v) => format!("array: {:?}\ndtype: u64", v),
1095 DTypeArray::ISIZE(v) => format!("array: {:?}\ndtype: isize", v),
1096 DTypeArray::I8(v) => format!("array: {:?}\ndtype: i8", v),
1097 DTypeArray::I16(v) => format!("array: {:?}\ndtype: i16", v),
1098 DTypeArray::I32(v) => format!("array: {:?}\ndtype: i32", v),
1099 DTypeArray::I64(v) => format!("array: {:?}\ndtype: i64", v),
1100 DTypeArray::F32(v) => format!("array: {}\ndtype: f32", format_float_vec!(v)),
1101 DTypeArray::F64(v) => format!("array: {}\ndtype: f64", format_float_vec!(v)),
1102 DTypeArray::Bool(v) => format!("array: {:?}\ndtype: bool", v),
1103 DTypeArray::Str(v) => format!("array: {:?}\ndtype: String", v),
1104 DTypeArray::Char(v) => format!("array: {:?}\ndtype: char", v),
1105 };
1106 write!(f, "{}", st)
1107 }
1108}
1109
1110impl Scalar {
1115 pub fn to_series(self) -> Series {
1117 dtype_match!(self.dtype, vec![self.unwrap()], Series::new; Vec)
1118 }
1119
1120 pub fn to_string(self) -> String {
1121 dtype_match!(self.dtype, self.unwrap(), to_string)
1122 }
1123}
1124
1125impl Series {
1126 pub fn at(&self, i: usize) -> Scalar {
1142 dtype_match!(self.dtype, self.at_raw(i), Scalar::new)
1143 }
1144
1145 pub fn len(&self) -> usize {
1147 dtype_match!(self.dtype, self.as_slice().to_vec(), len; Vec)
1148 }
1149
1150 pub fn to_type(&self, dtype: DType) -> Series {
1152 dtype_cast_vec!(self.dtype, dtype, self.to_vec(), Series::new)
1153 }
1154
1155 pub fn as_type(&mut self, dtype: DType) {
1171 let x = self.to_type(dtype);
1172 self.dtype = x.dtype;
1173 self.values = x.values;
1174 }
1175}
1176
1177impl Vector for Series {
1178 type Scalar = Scalar;
1179
1180 fn add_vec(&self, rhs: &Self) -> Self {
1196 assert_eq!(self.dtype, rhs.dtype, "DTypes are not same (add_vec)");
1197 dtype_match!(
1198 N;
1199 self.dtype,
1200 self.to_vec(),
1201 |x| add_vec(x, rhs.to_vec());
1202 Vec
1203 )
1204 }
1205
1206 fn sub_vec(&self, rhs: &Self) -> Self {
1222 assert_eq!(self.dtype, rhs.dtype, "DTypes are not same (add_vec)");
1223 dtype_match!(
1224 N;
1225 self.dtype,
1226 self.to_vec(),
1227 |x| sub_vec(x, rhs.to_vec());
1228 Vec
1229 )
1230 }
1231
1232 fn mul_scalar(&self, rhs: Self::Scalar) -> Self {
1248 assert_eq!(self.dtype, rhs.dtype, "DTypes are not same (mul_scalar)");
1249
1250 dtype_match!(
1251 N;
1252 self.dtype,
1253 self.to_vec(),
1254 |x| mul_scalar(x, rhs.unwrap());
1255 Vec
1256 )
1257 }
1258}
1259
1260impl_typed_scalar!(usize, USIZE);
1261impl_typed_scalar!(u8, U8);
1262impl_typed_scalar!(u16, U16);
1263impl_typed_scalar!(u32, U32);
1264impl_typed_scalar!(u64, U64);
1265impl_typed_scalar!(isize, ISIZE);
1266impl_typed_scalar!(i8, I8);
1267impl_typed_scalar!(i16, I16);
1268impl_typed_scalar!(i32, I32);
1269impl_typed_scalar!(i64, I64);
1270impl_typed_scalar!(f32, F32);
1271impl_typed_scalar!(f64, F64);
1272impl_typed_scalar!(bool, Bool);
1273impl_typed_scalar!(char, Char);
1274impl_typed_scalar!(String, Str);
1275
1276impl_typed_vector!(usize, USIZE);
1277impl_typed_vector!(u8, U8);
1278impl_typed_vector!(u16, U16);
1279impl_typed_vector!(u32, U32);
1280impl_typed_vector!(u64, U64);
1281impl_typed_vector!(isize, ISIZE);
1282impl_typed_vector!(i8, I8);
1283impl_typed_vector!(i16, I16);
1284impl_typed_vector!(i32, I32);
1285impl_typed_vector!(i64, I64);
1286impl_typed_vector!(f32, F32);
1287impl_typed_vector!(f64, F64);
1288impl_typed_vector!(bool, Bool);
1289impl_typed_vector!(char, Char);
1290impl_typed_vector!(String, Str);
1291
1292impl fmt::Display for Scalar {
1293 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1294 let st = format!("{}, dtype:{}", self.clone().to_string(), self.dtype);
1295 write!(f, "{}", st)
1296 }
1297}
1298
1299impl DataFrame {
1375 pub fn new(v: Vec<Series>) -> Self {
1377 let ics = (0usize..v.len()).map(|x| x.to_string()).collect();
1378
1379 Self { data: v, ics }
1380 }
1381
1382 pub fn header(&self) -> &Vec<String> {
1383 &self.ics
1384 }
1385
1386 pub fn header_mut(&mut self) -> &mut Vec<String> {
1387 &mut self.ics
1388 }
1389
1390 pub fn set_header(&mut self, new_header: Vec<&str>) {
1392 assert_eq!(self.ics.len(), new_header.len(), "Improper Header length!");
1393 self.ics = new_header.into_iter().map(|x| x.to_string()).collect();
1394 }
1395
1396 pub fn push(&mut self, name: &str, series: Series) {
1398 if !self.ics.is_empty() {
1399 assert_eq!(
1400 self.ics.iter().find(|x| x.as_str() == name),
1401 None,
1402 "Repetitive index!"
1403 );
1404 }
1405 self.ics.push(name.to_string());
1406 self.data.push(series);
1407 }
1408
1409 pub fn row(&self, i: usize) -> DataFrame {
1411 let mut df = DataFrame::new(vec![]);
1412 for (j, series) in self.data.iter().enumerate() {
1413 let s = series.at(i);
1414 let new_series = s.to_series();
1415 df.push(&self.ics[j], new_series);
1416 }
1417 df
1418 }
1419
1420 pub fn spread(&self) -> String {
1421 let r: usize = self
1422 .data
1423 .iter()
1424 .fold(0, |max_len, column| max(max_len, column.len()));
1425 let h = self.header();
1426
1427 let mut result = String::new();
1428
1429 if r > 100 {
1430 let lc1 = ((r as f64).log10() as usize) + 5;
1431 result.push_str(&tab("", lc1));
1432
1433 let mut space_vec: Vec<usize> = vec![];
1434 for i in 0..self.data.len() {
1435 let v = &self[i];
1436 let mut space = 0usize;
1437 for j in 0..v.len().min(5) {
1438 let elem = v.at(j);
1439 set_space!(elem, space);
1440 }
1441 if v.len() >= r - 5 {
1442 for j in v.len() - 5..v.len() {
1443 let elem = v.at(j);
1444 set_space!(elem, space);
1445 }
1446 }
1447 space = max(space + 1, 5);
1448 let k = &h[i];
1449 if k.len() >= space {
1450 space = k.len() + 1;
1451 }
1452 result.push_str(&tab(k, space));
1453 space_vec.push(space);
1454 }
1455 result.push('\n');
1456
1457 for i in 0..5 {
1458 result.push_str(&tab(&format!("r[{}]", i), lc1));
1459 for j in 0..self.data.len() {
1460 let v = &self[j];
1461 let space = space_vec[j];
1462 if i < v.len() {
1463 let elem = v.at(i);
1464 let st = set_space!(elem);
1465 result.push_str(&tab(&st, space));
1466 } else {
1467 result.push_str(&tab("", space));
1468 }
1469 }
1470 result.push('\n');
1471 }
1472 result.push_str(&tab("...", lc1));
1473 for &space in space_vec.iter() {
1474 result.push_str(&tab("...", space));
1475 }
1476 result.push('\n');
1477 for i in r - 5..r {
1478 result.push_str(&tab(&format!("r[{}]", i), lc1));
1479 for j in 0..self.data.len() {
1480 let v = &self[j];
1481 let space = space_vec[j];
1482 if i < v.len() {
1483 let elem = v.at(i);
1484 let st = set_space!(elem);
1485 result.push_str(&tab(&st, space));
1486 } else {
1487 result.push_str(&tab("", space));
1488 }
1489 }
1490 if i == r - 1 {
1491 break;
1492 }
1493 result.push('\n');
1494 }
1495 return result;
1496 }
1497
1498 result.push_str(&tab("", 5));
1499 let mut space_vec: Vec<usize> = vec![];
1500
1501 for i in 0..self.data.len() {
1502 let v = &self[i];
1503 let mut space = 0usize;
1504 for j in 0..v.len() {
1505 let elem = v.at(j);
1506 set_space!(elem, space)
1507 }
1508 space = max(space + 1, 5);
1509 let k = &h[i];
1510 if k.len() >= space {
1511 space = k.len() + 1;
1512 }
1513 result.push_str(&tab(k, space));
1514 space_vec.push(space);
1515 }
1516 result.push('\n');
1517
1518 for i in 0..r {
1519 result.push_str(&tab(&format!("r[{}]", i), 5));
1520 for j in 0..self.data.len() {
1521 let v = &self[j];
1522 let space = space_vec[j];
1523 if i < v.len() {
1524 let elem = v.at(i);
1525 let st = set_space!(elem);
1526 result.push_str(&tab(&st, space));
1527 } else {
1528 result.push_str(&tab("", space));
1529 }
1530 }
1531 if i == (r - 1) {
1532 break;
1533 }
1534 result.push('\n');
1535 }
1536 result
1537 }
1538
1539 pub fn as_types(&mut self, dtypes: Vec<DType>) {
1562 assert_eq!(
1563 self.data.len(),
1564 dtypes.len(),
1565 "Length of dtypes are not compatible with DataFrame"
1566 );
1567 for (i, dtype) in dtypes.into_iter().enumerate() {
1568 self[i].as_type(dtype);
1569 }
1570 }
1571
1572 pub fn drop(&mut self, col_header: &str) {
1596 match self.ics.iter().position(|h| h == col_header) {
1597 Some(index) => {
1598 self.data.remove(index);
1599 self.ics.remove(index);
1600 }
1601 None => panic!("Can't drop header '{}'", col_header),
1602 }
1603 }
1604
1605 pub fn filter_by<F>(&self, column: &str, predicate: F) -> anyhow::Result<DataFrame>
1607 where
1608 F: Fn(Scalar) -> bool,
1609 {
1610 let series = match self.ics.iter().position(|x| x.as_str() == column) {
1611 Some(i) => &self.data[i],
1612 None => anyhow::bail!("Column '{}' not found in DataFrame", column),
1613 };
1614
1615 let mut indices = Vec::new();
1616 for i in 0..series.len() {
1617 let value = series.at(i);
1618 if predicate(value) {
1619 indices.push(i);
1620 }
1621 }
1622
1623 let mut new_df = DataFrame::new(vec![]);
1624 for (col_idx, col_series) in self.data.iter().enumerate() {
1625 let filtered_series = self.extract_series_by_indices(col_series, &indices);
1626 new_df.push(&self.ics[col_idx], filtered_series);
1627 }
1628
1629 Ok(new_df)
1630 }
1631
1632 fn extract_series_by_indices(&self, series: &Series, indices: &[usize]) -> Series {
1633 macro_rules! extract_by_indices {
1634 ($array:expr, $type:ty, $dtype:ident) => {{
1635 let values: Vec<$type> = indices.iter().map(|&i| $array[i].clone()).collect();
1636 Series::new(values)
1637 }};
1638 }
1639
1640 match &series.values {
1641 DTypeArray::USIZE(v) => extract_by_indices!(v, usize, USIZE),
1642 DTypeArray::U8(v) => extract_by_indices!(v, u8, U8),
1643 DTypeArray::U16(v) => extract_by_indices!(v, u16, U16),
1644 DTypeArray::U32(v) => extract_by_indices!(v, u32, U32),
1645 DTypeArray::U64(v) => extract_by_indices!(v, u64, U64),
1646 DTypeArray::ISIZE(v) => extract_by_indices!(v, isize, ISIZE),
1647 DTypeArray::I8(v) => extract_by_indices!(v, i8, I8),
1648 DTypeArray::I16(v) => extract_by_indices!(v, i16, I16),
1649 DTypeArray::I32(v) => extract_by_indices!(v, i32, I32),
1650 DTypeArray::I64(v) => extract_by_indices!(v, i64, I64),
1651 DTypeArray::F32(v) => extract_by_indices!(v, f32, F32),
1652 DTypeArray::F64(v) => extract_by_indices!(v, f64, F64),
1653 DTypeArray::Bool(v) => extract_by_indices!(v, bool, Bool),
1654 DTypeArray::Str(v) => extract_by_indices!(v, String, Str),
1655 DTypeArray::Char(v) => extract_by_indices!(v, char, Char),
1656 }
1657 }
1658
1659 pub fn mask(&self, mask: &Series) -> anyhow::Result<DataFrame> {
1661 if mask.len() != self.data[0].len() {
1662 anyhow::bail!(
1663 "Mask length ({}) does not match DataFrame row count ({})",
1664 mask.len(),
1665 self.data[0].len()
1666 );
1667 }
1668
1669 if mask.dtype != DType::Bool {
1670 anyhow::bail!("Mask Series must be of type Bool, but got {}", mask.dtype);
1671 }
1672
1673 let bool_mask: &[bool] = mask.as_slice();
1674 let ics: Vec<usize> = bool_mask
1675 .iter()
1676 .enumerate()
1677 .filter_map(|(i, &b)| if b { Some(i) } else { None })
1678 .collect();
1679
1680 Ok(self.select_rows(&ics))
1681 }
1682
1683 pub fn select_rows(&self, indices: &[usize]) -> DataFrame {
1685 let mut new_df = DataFrame::new(vec![]);
1686 for (col_idx, col_series) in self.data.iter().enumerate() {
1687 let filtered_series = self.extract_series_by_indices(col_series, indices);
1688 new_df.push(&self.ics[col_idx], filtered_series);
1689 }
1690 new_df
1691 }
1692}
1693
1694impl Index<&str> for DataFrame {
1695 type Output = Series;
1696
1697 fn index(&self, index: &str) -> &Self::Output {
1698 let i = self.ics.iter().position(|x| x.as_str() == index).unwrap();
1699 &self.data[i]
1700 }
1701}
1702
1703impl IndexMut<&str> for DataFrame {
1704 fn index_mut(&mut self, index: &str) -> &mut Self::Output {
1705 let i = self.ics.iter().position(|x| x.as_str() == index).unwrap();
1706 &mut self.data[i]
1707 }
1708}
1709
1710impl Index<usize> for DataFrame {
1711 type Output = Series;
1712
1713 fn index(&self, index: usize) -> &Self::Output {
1714 &self.data[index]
1715 }
1716}
1717
1718impl IndexMut<usize> for DataFrame {
1719 fn index_mut(&mut self, index: usize) -> &mut Self::Output {
1720 &mut self.data[index]
1721 }
1722}
1723
1724impl fmt::Display for DataFrame {
1725 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1726 write!(f, "{}", self.spread())
1727 }
1728}
1729
#[cfg(feature = "csv")]
/// CSV (de)serialization for tabular types.
pub trait WithCSV: Sized {
    /// Write `self` as CSV to `file_path`.
    fn write_csv(&self, file_path: &str) -> Result<(), Box<dyn Error>>;
    /// Read the CSV file at `file_path`, splitting fields on `delimiter`.
    fn read_csv(file_path: &str, delimiter: char) -> Result<Self, Box<dyn Error>>;
}
1740
1741#[cfg(feature = "csv")]
1742impl WithCSV for DataFrame {
1743 fn write_csv(&self, file_path: &str) -> Result<(), Box<dyn Error>> {
1745 let mut wtr = WriterBuilder::new().from_path(file_path)?;
1746 let r: usize = self
1747 .data
1748 .iter()
1749 .fold(0, |max_len, column| max(max_len, column.len()));
1750 let c: usize = self.data.len();
1751 wtr.write_record(self.header().clone())?;
1752
1753 for i in 0..r {
1754 let mut record: Vec<String> = vec!["".to_string(); c];
1755 for (j, v) in self.data.iter().enumerate() {
1756 if i < v.len() {
1757 record[j] = v.at(i).to_string();
1758 }
1759 }
1760 wtr.write_record(record)?;
1761 }
1762 wtr.flush()?;
1763 Ok(())
1764 }
1765
1766 fn read_csv(file_path: &str, delimiter: char) -> Result<Self, Box<dyn Error>> {
1768 let mut rdr = ReaderBuilder::new()
1769 .has_headers(true)
1770 .delimiter(delimiter as u8)
1771 .from_path(file_path)?;
1772
1773 let headers_vec = rdr.headers()?;
1774 let headers = headers_vec.iter().map(|x| x).collect::<Vec<&str>>();
1775 let mut result = DataFrame::new(vec![]);
1776 for h in headers.iter() {
1777 result.push(*h, Series::new(Vec::<String>::new()));
1778 }
1779
1780 for rec in rdr.deserialize() {
1781 let record: HashMap<String, String> = rec?;
1782 for head in record.keys() {
1783 let value = &record[head];
1784 if value.len() > 0 {
1785 result[head.as_str()].push(value.to_string());
1786 }
1787 }
1788 }
1789
1790 Ok(result)
1791 }
1792}
1793
#[cfg(feature = "nc")]
/// NetCDF (de)serialization for tabular types.
pub trait WithNetCDF: Sized {
    /// Write `self` to a new NetCDF file at `file_path`.
    fn write_nc(&self, file_path: &str) -> Result<(), Box<dyn Error>>;
    /// Read every variable of the NetCDF file at `file_path`.
    fn read_nc(file_path: &str) -> Result<Self, Box<dyn Error>>;
    /// Read only the variables named in `header`, in the order given.
    fn read_nc_by_header(file_path: &str, header: Vec<&str>) -> Result<Self, Box<dyn Error>>;
}
1801
1802#[cfg(feature = "nc")]
1803impl WithNetCDF for DataFrame {
1804 fn write_nc(&self, file_path: &str) -> Result<(), Box<dyn Error>> {
1806 let mut f = netcdf::create(file_path)?;
1807
1808 for (i, h) in self.header().iter().enumerate() {
1809 let dim_name = format!("{}th col", i);
1810 let v = &self[h.as_str()];
1811 let dim = v.len();
1812 f.add_dimension(&dim_name, dim)?;
1813 match v.dtype {
1814 dtype if dtype.is_numeric() => {
1815 let vtype = dtype_to_vtype(dtype);
1816 let var = &mut f.add_variable_with_type(
1817 h,
1818 &[&dim_name],
1819 &VariableType::Basic(vtype),
1820 )?;
1821 dtype_match!(N; dtype, v.to_vec(), |v| nc_put_value(var, v); Vec)?;
1822 }
1823 Str => {
1824 let var = &mut f.add_string_variable(h, &[&dim_name])?;
1825 let v_s: &[String] = v.as_slice();
1826 for (i, s) in v_s.iter().enumerate() {
1827 var.put_string(s, Some(&[i]))?;
1828 }
1829 }
1830 USIZE => {
1831 let v = v.to_type(U64);
1832 let var = &mut f.add_variable::<u64>(h, &[&dim_name])?;
1833 let v_slice: &[u64] = v.as_slice();
1834 var.put_values(v_slice, None, None)?;
1835 }
1836 ISIZE => {
1837 let v = v.to_type(I64);
1838 let var = &mut f.add_variable::<i64>(h, &[&dim_name])?;
1839 let v_slice: &[i64] = v.as_slice();
1840 var.put_values(v_slice, None, None)?;
1841 }
1842 Bool => {
1843 let v = v.to_type(U8);
1844 let var = &mut f.add_variable::<u8>(h, &[&dim_name])?;
1845 let v_slice: &[u8] = v.as_slice();
1846 var.put_values(v_slice, None, None)?;
1847 }
1848 Char => {
1849 let v = v.to_type(U8);
1850 let var = &mut f.add_variable::<u8>(h, &[&dim_name])?;
1851 let v_slice: &[u8] = v.as_slice();
1852 var.put_values(v_slice, None, None)?;
1853 }
1854 _ => unreachable!(),
1855 }
1856 }
1857
1858 Ok(())
1859 }
1860
1861 fn read_nc(file_path: &str) -> Result<Self, Box<dyn Error>> {
1863 let f = netcdf::open(file_path)?;
1864 let mut df = DataFrame::new(vec![]);
1865 for v in f.variables() {
1866 let h = v.name();
1867 if v.vartype().is_string() {
1868 let mut data: Vec<String> = vec![Default::default(); v.len()];
1869 for i in 0..v.len() {
1870 data[i] = v.string_value(Some(&[i]))?;
1871 }
1872 df.push(&h, Series::new(data));
1873 } else {
1874 let dtype = vtype_to_dtype(v.vartype().as_basic().unwrap());
1875 let series = dtype_match!(N; dtype, vec![], |vec| nc_read_value(&v, vec); Vec)?;
1876 df.push(&h, series);
1877 }
1878 }
1879 Ok(df)
1880 }
1881
1882 fn read_nc_by_header(file_path: &str, header: Vec<&str>) -> Result<Self, Box<dyn Error>> {
1908 let f = netcdf::open(file_path)?;
1909 let mut df = DataFrame::new(vec![]);
1910 for h in header {
1911 let v = match f.variable(h) {
1912 Some(val) => val,
1913 None => panic!("There are no corresponding values"),
1914 };
1915 if v.vartype().is_string() {
1916 let mut data: Vec<String> = vec![Default::default(); v.len()];
1917 for i in 0..v.len() {
1918 data[i] = v.string_value(Some(&[i]))?;
1919 }
1920 df.push(&h, Series::new(data));
1921 } else {
1922 let dtype = vtype_to_dtype(v.vartype().as_basic().unwrap());
1923 let series = dtype_match!(N; dtype, vec![], |vec| nc_read_value(&v, vec); Vec)?;
1924 df.push(&h, series);
1925 }
1926 }
1927 Ok(df)
1928 }
1929}
1930
#[cfg(feature = "parquet")]
/// Parquet (de)serialization for tabular types.
pub trait WithParquet {
    /// Write `self` to `file_path` using the given Parquet `compression`.
    fn write_parquet(
        &self,
        file_path: &str,
        compression: Compression,
    ) -> Result<(), Box<dyn Error>>;
    /// Read a value of `Self` from the Parquet file at `file_path`.
    ///
    /// Bounded by `Self: Sized` on the method only, so the trait itself stays
    /// usable as a trait object for `write_parquet`.
    fn read_parquet(file_path: &str) -> Result<Self, Box<dyn Error>>
    where
        Self: Sized;
}
1944
1945#[cfg(feature = "parquet")]
1957macro_rules! process_column {
1958 ($hash_map:expr, $h:expr, $arr:expr, $arrow_type:ty, $rust_type:ty, |$concrete_array:ident| $extract_body:expr) => {{
1959 let $concrete_array = $arr.as_any().downcast_ref::<$arrow_type>().unwrap();
1961 let data: Vec<$rust_type> = $extract_body;
1963
1964 if let Some(existing_data) = $hash_map.get_mut($h) {
1966 let mut vec_data: Vec<$rust_type> = existing_data.to_vec();
1968 vec_data.extend(data.iter().cloned());
1969 $hash_map.insert($h.clone(), Series::new(vec_data));
1970 } else {
1971 $hash_map.insert($h.clone(), Series::new(data));
1973 }
1974 }};
1975}
1976
1977#[cfg(feature = "parquet")]
1978impl WithParquet for DataFrame {
1979 fn write_parquet(
1981 &self,
1982 file_path: &str,
1983 compression: Compression,
1984 ) -> Result<(), Box<dyn Error>> {
1985 let mut schema_vec = vec![];
1986 let mut arr_vec = vec![];
1987
1988 let max_length = self.data.iter().fold(0usize, |acc, x| acc.max(x.len()));
1989
1990 for h in self.header().iter() {
1991 let v = &self[h.as_str()];
1992 let field = Field::new(h.as_str(), dtype_to_arrow(v.dtype), false);
1993
1994 dtype_match_to_arrow!(v.dtype, v.to_vec(), arr_vec; max_length);
1995 schema_vec.push(field);
1996 }
1997
1998 let schema = Arc::new(Schema::new(schema_vec));
1999 let parquet_schema = ArrowSchemaConverter::new()
2000 .convert(&schema)
2001 .map_err(|e| format!("Failed to convert schema: {}", e))?;
2002 let writer_properties = WriterProperties::builder()
2003 .set_compression(compression)
2004 .build();
2005 let props = Arc::new(writer_properties);
2006
2007 let col_writers = get_column_writers(&parquet_schema, &props, &schema)?;
2008 let mut workers: Vec<_> = col_writers
2009 .into_iter()
2010 .map(|mut col_writer| {
2011 let (send, recv) = std::sync::mpsc::channel::<ArrowLeafColumn>();
2012 let handle = std::thread::spawn(move || {
2013 for col in recv {
2014 col_writer.write(&col)?;
2015 }
2016 col_writer.close()
2017 });
2018 (handle, send)
2019 })
2020 .collect();
2021
2022 let root_schema = parquet_schema.root_schema_ptr();
2023 let mut output_file = std::fs::File::create(file_path)?;
2024 let mut writer = SerializedFileWriter::new(&mut output_file, root_schema, props.clone())?;
2025
2026 let mut row_group_writer: SerializedRowGroupWriter<'_, _> = writer.next_row_group()?;
2027
2028 let mut worker_iter = workers.iter_mut();
2029 for (arr, field) in arr_vec.iter().zip(&schema.fields) {
2030 for leaves in compute_leaves(field, &Arc::new(arr))? {
2031 worker_iter.next().unwrap().1.send(leaves)?;
2032 }
2033 }
2034
2035 for (handle, send) in workers {
2036 use parquet::arrow::arrow_writer::ArrowColumnChunk;
2037
2038 drop(send);
2039 let chunk: ArrowColumnChunk = handle.join().unwrap().unwrap();
2040 chunk.append_to_row_group(&mut row_group_writer)?;
2041 }
2042 row_group_writer.close()?;
2043 writer.close()?;
2044
2045 Ok(())
2046 }
2047
2048 fn read_parquet(file_path: &str) -> Result<Self, Box<dyn Error>>
2050 where
2051 Self: Sized,
2052 {
2053 use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
2054
2055 let mut df = DataFrame::new(vec![]);
2056
2057 let file = std::fs::File::open(file_path)?;
2058 let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone()?)?;
2059 let schema = builder.schema();
2060 let fields = schema.fields.clone();
2061 let mut batch_size = usize::MAX; let reader: ParquetRecordBatchReader = loop {
2063 let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone()?)?;
2064 let reader = builder.with_batch_size(batch_size).build();
2065 match reader {
2066 Ok(r) => break r,
2067 Err(e) => {
2068 if batch_size > 0 {
2069 batch_size /= 10; } else {
2071 println!(
2072 "Failed to read parquet file: {} with eventually batch size 1",
2073 e
2074 );
2075 return Err(Box::new(e));
2076 }
2077 }
2078 }
2079 };
2080 let all_batches: Vec<_> = reader.collect::<Result<Vec<_>, _>>()?;
2081
2082 let mut hash_map = IndexMap::<String, Series>::new();
2083 for batch in all_batches {
2084 let arrs = batch.columns();
2085
2086 for (field, arr) in fields.iter().zip(arrs) {
2087 let h = field.name();
2088 let dt = field.data_type();
2089 let at = arrow_to_dtype(dt.clone());
2090 match at {
2091 Bool => process_column!(hash_map, h, arr, BooleanArray, bool, |d| d
2092 .values()
2093 .iter()
2094 .collect()),
2095 Char => process_column!(hash_map, h, arr, StringArray, char, |d| d
2096 .iter()
2097 .filter_map(|opt_s| opt_s.and_then(|s| s.chars().next()))
2098 .collect()),
2099 Str => process_column!(hash_map, h, arr, StringArray, String, |d| d
2100 .iter()
2101 .filter_map(|opt_s| opt_s.map(String::from))
2102 .collect()),
2103 USIZE => {
2104 process_column!(hash_map, h, arr, PrimitiveArray<UInt64Type>, usize, |d| d
2105 .values()
2106 .iter()
2107 .map(|&x| x as usize)
2108 .collect())
2109 }
2110 U8 => process_column!(hash_map, h, arr, PrimitiveArray<UInt8Type>, u8, |d| d
2111 .values()
2112 .to_vec()),
2113 U16 => {
2114 process_column!(hash_map, h, arr, PrimitiveArray<UInt16Type>, u16, |d| d
2115 .values()
2116 .to_vec())
2117 }
2118 U32 => {
2119 process_column!(hash_map, h, arr, PrimitiveArray<UInt32Type>, u32, |d| d
2120 .values()
2121 .to_vec())
2122 }
2123 U64 => {
2124 process_column!(hash_map, h, arr, PrimitiveArray<UInt64Type>, u64, |d| d
2125 .values()
2126 .to_vec())
2127 }
2128 ISIZE => {
2129 process_column!(hash_map, h, arr, PrimitiveArray<Int64Type>, isize, |d| d
2130 .values()
2131 .iter()
2132 .map(|&x| x as isize)
2133 .collect())
2134 }
2135 I8 => process_column!(hash_map, h, arr, PrimitiveArray<Int8Type>, i8, |d| d
2136 .values()
2137 .to_vec()),
2138 I16 => process_column!(hash_map, h, arr, PrimitiveArray<Int16Type>, i16, |d| d
2139 .values()
2140 .to_vec()),
2141 I32 => process_column!(hash_map, h, arr, PrimitiveArray<Int32Type>, i32, |d| d
2142 .values()
2143 .to_vec()),
2144 I64 => process_column!(hash_map, h, arr, PrimitiveArray<Int64Type>, i64, |d| d
2145 .values()
2146 .to_vec()),
2147 F32 => {
2148 process_column!(hash_map, h, arr, PrimitiveArray<Float32Type>, f32, |d| d
2149 .values()
2150 .to_vec())
2151 }
2152 F64 => {
2153 process_column!(hash_map, h, arr, PrimitiveArray<Float64Type>, f64, |d| d
2154 .values()
2155 .to_vec())
2156 }
2157 }
2158 }
2159 }
2160
2161 for (h, data) in hash_map {
2162 df.push(&h, data);
2163 }
2164
2165 Ok(df)
2166 }
2167}