import * as R from 'ramda';
import { eventChannel } from 'redux-saga';
import { all, put, call, take, fork, race, delay } from 'redux-saga/effects';
// common
import { showRequestStatusModal } from '../../common/actions';
// components
import { setAudioPlaying } from '../../components/audio-player/actions';
// features
import { refreshTokenRequest, refreshTokenSuccess } from '../auth/actions';
import {
  getItemListRequest as getCarrierInvoices,
  resetListAndPagination as resetCarrierInvoiceListAndPagination,
} from '../invoice/carrier/actions';
import {
  getItemListRequest as getCustomerInvoices,
  resetListAndPagination as resetCustomerInvoiceListAndPagination,
} from '../invoice/customer/actions';
import {
  getItemListRequest as getServiceVendorInvoices,
  resetListAndPagination as resetServiceVendorInvoiceListAndPagination,
} from '../invoice/service-vendor/actions';
import {
  getItemListRequest as getVendorInvoices,
  resetListAndPagination as resetVendorInvoiceListAndPagination,
} from '../invoice/vendor/actions';
// helpers/constants
import * as G from '../../helpers';
import * as GC from '../../constants';
// utilities
import endpointsMap from '../../utilities/endpoints';
// feature sockets
import * as A from './actions';
import * as LC from './constants';
import {
  clientTopics,
  isSocketFailed,
  openStompClient,
  reconnectSocketSaga,
  getConnectedPromise,
  areSocketReconnectsOver,
  resetSocketReconnectCounter,
  watchSendStompMessageRequestSaga,
} from './helpers';
import socketTelWatcherSaga from './sagas/tel';
import socketDriverCardsTelWatcherSaga from './sagas/driver-cards';
import documentGenerationWatcherSaga from './sagas/document-generation';
// import watchLoadBoardsSocketSaga from './sagas/load-boards';// TODO: remove after testing sockets-v2
//////////////////////////////////////////////////

// TODO: separate by clientTopics and use common helpers

function* showNotificationReceivedToaster(data: Object) {
  const warningLevel = R.path(['notification', GC.FIELD_CONFIGURATION_COMMUNICATION_NOTIFICATION_LEVEL], data);

  const warningLevelsData = {
    [GC.NOTIFICATION_WARNING_LEVEL_TYPE_INFO]: 'info',
    [GC.NOTIFICATION_WARNING_LEVEL_TYPE_CRITICAL]: 'error',
    [GC.NOTIFICATION_WARNING_LEVEL_TYPE_WARNING]: 'warning',
  };

  const text = G.getWindowLocale(
    'messages:new-notification-received',
    'You have a new notification received. Please, check it.',
  );

  yield call(
    G.showToastrMessage,
    R.propOr('info', warningLevel, warningLevelsData),
    text,
    { disableCloseButtonFocus: true },
  );

  const isForTel = R.pathEq(
    GC.NOTIFICATION_OBJECT_TYPE_TEL,
    ['notification', GC.FIELD_CONFIGURATION_COMMUNICATION_NOTIFICATION_OBJ_TYPE],
    data,
  );

  const isImportant = R.or(
    R.equals(warningLevel, GC.NOTIFICATION_WARNING_LEVEL_TYPE_WARNING),
    R.equals(warningLevel, GC.NOTIFICATION_WARNING_LEVEL_TYPE_CRITICAL),
  );

  if (R.path(['notification', GC.FIELD_CONFIGURATION_COMMUNICATION_NOTIFICATION_ALARM_ENABLED], data)) {
    yield put(setAudioPlaying(true));
  }

  if (R.and(isForTel, isImportant)) yield put(A.socketTelWarningReceived(data));
}

// Lookup from a channel's "message received" type (carried as `successType` on
// socket error/close events) to the action creator that requests a disconnect of
// that socket. Channel types that are not listed here get no disconnect action
// dispatched when their socket fails.
const successToDisconnectActionMap = {
  [LC.SOCKET_CHANNEL_IMPORT_MESSAGE_RECEIVED]: A.socketImportDisconnectRequest,
  [LC.SOCKET_CHANNEL_USER_ERROR_MESSAGE_RECEIVED]: A.socketUserErrorDisconnectRequest,
  [LC.SOCKET_CHANNEL_AVAILABLE_DRIVERS_RECEIVED]: A.socketAvailableDriversDisconnectRequest,
  [LC.SOCKET_CHANNEL_AVAILABLE_DRIVERS_ALL_DRIVERS_RECEIVED]: A.socketAllDriversDisconnectRequest,
  [LC.SOCKET_CHANNEL_USER_NOTIFICATION_MESSAGE_RECEIVED]: A.socketUserNotificationDisconnectRequest,
  [LC.SOCKET_CHANNEL_USER_MASS_ACTION_ERROR_MESSAGE_RECEIVED]: A.socketUserMassActionErrorDisconnectRequest,
  [LC.SOCKET_CHANNEL_USER_ERROR_FROM_CARRIER_MESSAGE_RECEIVED]: A.socketUserErrorFromCarrierDisconnectRequest,
};

function* watchEventChannelMessagesSaga(eventChannel: Object) {
  while (true) { // eslint-disable-line
    const event = yield take(eventChannel);
    const { type, payload, successType } = event;

    const socketDataType = R.path(['data', LC.SOCKET_DATA_TYPE], payload);

    if (isSocketFailed(type)) {
      // TODO: check disconnect for all cases
      const disconnectAction = R.path([successType], successToDisconnectActionMap);

      if (G.isFunction(disconnectAction)) {
        console.warn('==================== SOCKED FAILED ====================', successType);
        console.warn('///-successType', successType);
        console.warn('///-payload', payload);

        yield put(disconnectAction());
      }
    } else if (R.equals(type, LC.SOCKET_CHANNEL_USER_NOTIFICATION_MESSAGE_RECEIVED)) {
      yield call(showNotificationReceivedToaster, payload.data);

      yield put(A.socketSendMessageToStore(payload));
    } else if (R.includes(
      type,
      [
        LC.SOCKET_CHANNEL_USER_ERROR_MESSAGE_RECEIVED,
        LC.SOCKET_CHANNEL_USER_ERROR_FROM_CARRIER_MESSAGE_RECEIVED,
      ],
    )) {
      if (R.equals(socketDataType, LC.SOCKET_DATA_TYPE_FRONT_ERROR)) {
        console.log('//////////////////// SOCKET_FRONT_ERROR', event, '////////////////////');

        return;
      }

      const errorMessage = R.join(', ', R.pathOr([], ['data', 'errorMessages'], payload));

      yield call(G.showToastrMessage, 'error', errorMessage);
    } else if (R.equals(type, LC.SOCKET_CHANNEL_IFTA_DOCUMENT_GENERATION_MESSAGE_RECEIVED)) {
      const message = G.getWindowLocale('titles:ifta-report-competed', 'IFTA Report Completed');

      yield call(G.showToastrMessage, 'success', message);
    } else if (R.equals(type, LC.SOCKET_CHANNEL_TEL_FOR_CARRIER_MESSAGE_RECEIVED)) {
      if (R.equals(socketDataType, LC.SOCKET_DATA_TYPE_CHAT_MESSAGE)) {
        yield put(A.socketCarrierPortalReceived(payload.data));
      }
    } else if (R.equals(type, LC.SOCKET_CHANNEL_IMPORT_MESSAGE_RECEIVED)) {
      if (R.not(R.pathEq(GC.STATUS_IN_PROGRESS, ['data', GC.FIELD_STATUS], payload))) {
        const message = R.compose(
          R.join(', '),
          R.values,
          R.mapObjIndexed((item: string, key: string) => `${G.toTitleCase(key)}: ${item}`),
          R.filter(G.isNotZero),
          R.pick(['created', 'updated', 'failed', 'errors']),
          R.pathOr({}, ['data']),
        )(payload);

        yield put(A.saveImportListSuccess(payload.data));

        yield call(G.showToastrMessage, 'info', message);
      }
    } else if (R.equals(type, LC.SOCKET_CHANNEL_USER_MASS_ACTION_ERROR_MESSAGE_RECEIVED)) {
      if (R.includes(
        socketDataType,
        [
          LC.SOCKET_DATA_TYPE_EDI_EXPORT_RESULT,
          LC.SOCKET_DATA_TYPE_EXPORT_TO_ACCOUNTING,
        ],
      )) {
        const { status, totalCount, successCount } = R.pathOr({}, ['data', 'data'], payload);

        const errors = R.compose(
          R.values,
          R.mapObjIndexed((messageArray: array, key: string) => ({ key, messageArray })),
          R.pathOr({}, ['data', 'data', 'errors']),
        )(payload);

        const pageType = R.path(['data', 'data', 'type'], payload);
        const currentPage = R.path(['location', 'pathname'], window);

        if (R.and(R.equals(pageType, 'CUSTOMER'), R.equals(currentPage, GC.ROUTE_PATH_CUSTOMER_INVOICES_LIST))) {
          yield put(resetCustomerInvoiceListAndPagination());
          yield put(getCustomerInvoices(true));
        } else if (R.and(R.equals(pageType, 'CARRIER'), R.equals(currentPage, GC.ROUTE_PATH_CARRIER_INVOICES_LIST))) {
          yield put(resetCarrierInvoiceListAndPagination());
          yield put(getCarrierInvoices(true));
        } else if (R.and(
          R.equals(pageType, 'SERVICE_VENDOR'),
          R.equals(currentPage, GC.ROUTE_PATH_SERVICE_VENDOR_INVOICE_LIST),
        )) {
          yield put(resetServiceVendorInvoiceListAndPagination());
          yield put(getServiceVendorInvoices(true));
        } else if (R.and(
          R.equals(pageType, 'FLEET_VENDOR'),
          R.equals(currentPage, GC.ROUTE_PATH_VENDOR_INVOICES_LIST),
        )) {
          yield put(resetVendorInvoiceListAndPagination());
          yield put(getVendorInvoices(true));
        }

        const exportResultTypeLocaleArr = G.ifElse(
          R.equals(socketDataType, LC.SOCKET_DATA_TYPE_EDI_EXPORT_RESULT),
          ['titles:edi', 'EDI'],
          ['titles:accounting', 'Accounting'],
        );

        const title = `${G.getWindowLocale(...exportResultTypeLocaleArr)} ${
          G.getWindowLocale('titles:export-result', 'Export Result')}`;

        yield put(showRequestStatusModal({
          title,
          status,
          errors,
          totalCount,
          successCount,
        }));
      } else {
        const errorMessage = R.compose(
          R.join('; '),
          R.values,
          R.mapObjIndexed((msgArr: array, key: string) => `${key}: ${R.join(', ', msgArr)}`),
          R.pathOr({}, ['data', 'data', 'errors']),
        )(payload);

        yield call(G.showToastrMessage, 'error', errorMessage);
      }
    }
  }
}

// USER NOTIFICATION
const runStompClientSubUserNotification = (client: Object, data: Object) => eventChannel((emit: Function) => {
  /* eslint-disable no-param-reassign */
  const { userGuid, accessToken } = data;
  const topic = clientTopics.getUserNotification(userGuid);

  client.subscribe(topic, (message: Object) => {
    emit({
      type: LC.SOCKET_CHANNEL_USER_NOTIFICATION_MESSAGE_RECEIVED,
      payload: {
        data: JSON.parse(message.body),
        type: LC.SOCKET_CHANNEL_USER_NOTIFICATION_MESSAGE_RECEIVED,
      },
    });
  }, { 'access-token': accessToken });

  client.onWebSocketError = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_ERROR,
      successType: LC.SOCKET_CHANNEL_USER_NOTIFICATION_MESSAGE_RECEIVED,
    });
  });

  client.onWebSocketClose = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_CLOSE,
      successType: LC.SOCKET_CHANNEL_USER_NOTIFICATION_MESSAGE_RECEIVED,
    });
  });

  return () => client.onDisconnect(() => {}, { 'access-token': accessToken });
});

function* waitForSocketUserNotificationConnected(client: Object, type: string) {
  yield getConnectedPromise(client);
  yield put(A.socketUserNotificationConnected());

  resetSocketReconnectCounter(type);
}

// Owns the user-notification socket lifecycle. Each iteration:
//   1. waits for a connect or reconnect request action;
//   2. opens a STOMP client on endpointsMap.notificationWS and waits for the connected action;
//   3. subscribes through an event channel and runs the message/send watchers until they
//      finish or a disconnect request arrives (the channel is closed only in the latter case);
//   4. forks reconnectSocketSaga and goes back to waiting.
// Any thrown error ends the loop and is reported with a console.log only.
function* watchSocketUserNotificationConnectRequestSaga() {
  try {
    for (;;) {
      const requests = yield race({
        connectAction: take(A.socketUserNotificationConnectRequest),
        reconnectAction: take(A.socketUserNotificationReconnectRequest),
      });

      // Exactly one of the two is set, depending on which action won the race.
      const requestAction = requests.connectAction || requests.reconnectAction;

      const connection = {
        reconnectDelay: 0,
        endpoint: endpointsMap.notificationWS,
        accessToken: G.getAuthTokenFromSession(),
        userGuid: R.path(['payload', 'user_guid'], requestAction),
      };

      const stompClient = yield call(openStompClient, connection);

      yield fork(waitForSocketUserNotificationConnected, stompClient, LC.USER_NOTIFICATION_SOCKET_TYPE);
      yield take(A.socketUserNotificationConnected);

      const channel = yield call(runStompClientSubUserNotification, stompClient, connection);

      const outcome = yield race({
        cancel: take(A.socketUserNotificationDisconnectRequest),
        task: all([
          call(watchEventChannelMessagesSaga, channel),
          call(watchSendStompMessageRequestSaga, stompClient),
        ]),
      });

      if (outcome.cancel) {
        channel.close();
      }

      yield fork(reconnectSocketSaga, LC.USER_NOTIFICATION_SOCKET_TYPE, A.socketUserNotificationReconnectRequest);
    }
  } catch (error) {
    console.log('////////////////////////////////////////////', 'catch watchSocketUserNotificationConnectRequestSaga');
  }
}
// USER NOTIFICATION

// USER NOTIFICATION ERROR
const runStompClientSubUserError = (client: Object, data: Object) => eventChannel((emit: Function) => {
  /* eslint-disable no-param-reassign */
  const { userGuid, accessToken } = data;
  const topic = clientTopics.getUserError(userGuid);

  client.subscribe(topic, (message: Object) => {
    emit({
      type: LC.SOCKET_CHANNEL_USER_ERROR_MESSAGE_RECEIVED,
      payload: {
        data: JSON.parse(message.body),
        type: LC.SOCKET_CHANNEL_USER_ERROR_MESSAGE_RECEIVED,
      },
    });
  }, { 'access-token': accessToken });

  client.onWebSocketError = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_ERROR,
      successType: LC.SOCKET_CHANNEL_USER_ERROR_MESSAGE_RECEIVED,
    });
  });

  client.onWebSocketClose = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_CLOSE,
      successType: LC.SOCKET_CHANNEL_USER_ERROR_MESSAGE_RECEIVED,
    });
  });

  return () => client.onDisconnect(() => {}, { 'access-token': accessToken });
});

function* waitForSocketUserErrorConnected(client: Object, type: string) {
  yield getConnectedPromise(client);
  yield put(A.socketUserErrorConnected());

  resetSocketReconnectCounter(type);
}

// Owns the user-error socket lifecycle. Each iteration:
//   1. waits for a connect or reconnect request action;
//   2. opens a STOMP client on endpointsMap.routeSocket and waits for the connected action;
//   3. subscribes through an event channel and runs the message/send watchers until they
//      finish or a disconnect request arrives (the channel is closed only in the latter case);
//   4. forks reconnectSocketSaga and goes back to waiting.
// Any thrown error ends the loop and is reported with a console.log only.
function* watchSocketUserErrorConnectRequestSaga() {
  try {
    for (;;) {
      const requests = yield race({
        connectAction: take(A.socketUserErrorConnectRequest),
        reconnectAction: take(A.socketUserErrorReconnectRequest),
      });

      // Exactly one of the two is set, depending on which action won the race.
      const requestAction = requests.connectAction || requests.reconnectAction;

      const connection = {
        reconnectDelay: 0,
        endpoint: endpointsMap.routeSocket,
        accessToken: G.getAuthTokenFromSession(),
        userGuid: R.path(['payload', 'user_guid'], requestAction),
      };

      const stompClient = yield call(openStompClient, connection);

      yield fork(waitForSocketUserErrorConnected, stompClient, LC.USER_ERROR_SOCKET_TYPE);
      yield take(A.socketUserErrorConnected);

      const channel = yield call(runStompClientSubUserError, stompClient, connection);

      const outcome = yield race({
        cancel: take(A.socketUserErrorDisconnectRequest),
        task: all([
          call(watchEventChannelMessagesSaga, channel),
          call(watchSendStompMessageRequestSaga, stompClient),
        ]),
      });

      if (outcome.cancel) {
        channel.close();
      }

      yield fork(reconnectSocketSaga, LC.USER_ERROR_SOCKET_TYPE, A.socketUserErrorReconnectRequest);
    }
  } catch (error) {
    console.log('////////////////////////////////////////////', 'catch watchSocketUserErrorConnectRequestSaga');
  }
}
// USER NOTIFICATION ERROR

// USER NOTIFICATION ERROR FROM CARRIER
const runStompClientSubUserErrorFromCarrier = (client: Object, data: Object) => eventChannel((emit: Function) => {
  /* eslint-disable no-param-reassign */
  const { userGuid, accessToken } = data;

  const topic = clientTopics.getUserError(userGuid);

  client.subscribe(topic, (message: Object) => {
    emit({
      type: LC.SOCKET_CHANNEL_USER_ERROR_FROM_CARRIER_MESSAGE_RECEIVED,
      payload: {
        data: JSON.parse(message.body),
        type: LC.SOCKET_CHANNEL_USER_ERROR_FROM_CARRIER_MESSAGE_RECEIVED,
      },
    });
  }, { 'access-token': accessToken });

  client.onWebSocketError = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_ERROR,
      successType: LC.SOCKET_CHANNEL_USER_ERROR_FROM_CARRIER_MESSAGE_RECEIVED,
    });
  });

  client.onWebSocketClose = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_CLOSE,
      successType: LC.SOCKET_CHANNEL_USER_ERROR_FROM_CARRIER_MESSAGE_RECEIVED,
    });
  });

  return () => client.onDisconnect(() => {}, { 'access-token': accessToken });
});

function* waitForSocketUserErrorFromCarrierConnected(client: Object, type: string) {
  yield getConnectedPromise(client);
  yield put(A.socketUserErrorFromCarrierConnected());

  resetSocketReconnectCounter(type);
}

// Owns the "user error from carrier" socket lifecycle. Each iteration:
//   1. waits for a connect or reconnect request action;
//   2. opens a STOMP client on endpointsMap.carrierSocket and waits for the connected action;
//   3. subscribes through an event channel and runs the message/send watchers until they
//      finish or a disconnect request arrives (the channel is closed only in the latter case);
//   4. forks reconnectSocketSaga and goes back to waiting.
// Any thrown error ends the loop and is reported with a console.log only.
function* watchSocketUserErrorFromCarrierConnectRequestSaga() {
  try {
    for (;;) {
      const requests = yield race({
        connectAction: take(A.socketUserErrorFromCarrierConnectRequest),
        reconnectAction: take(A.socketUserErrorFromCarrierReconnectRequest),
      });

      // Exactly one of the two is set, depending on which action won the race.
      const requestAction = requests.connectAction || requests.reconnectAction;

      const connection = {
        reconnectDelay: 0,
        endpoint: endpointsMap.carrierSocket,
        accessToken: G.getAuthTokenFromSession(),
        userGuid: R.path(['payload', 'user_guid'], requestAction),
      };

      const stompClient = yield call(openStompClient, connection);

      yield fork(waitForSocketUserErrorFromCarrierConnected, stompClient, LC.USER_ERROR_FROM_CARRIER_SOCKET_TYPE);
      yield take(A.socketUserErrorFromCarrierConnected);

      const channel = yield call(runStompClientSubUserErrorFromCarrier, stompClient, connection);

      const outcome = yield race({
        cancel: take(A.socketUserErrorFromCarrierDisconnectRequest),
        task: all([
          call(watchEventChannelMessagesSaga, channel),
          call(watchSendStompMessageRequestSaga, stompClient),
        ]),
      });

      if (outcome.cancel) {
        channel.close();
      }

      yield fork(
        reconnectSocketSaga,
        LC.USER_ERROR_FROM_CARRIER_SOCKET_TYPE,
        A.socketUserErrorFromCarrierReconnectRequest,
      );
    }
  } catch (error) {
    console.log(
      '////////////////////////////////////////////', 'catch watchSocketUserErrorFromCarrierConnectRequestSaga',
    );
  }
}
// USER NOTIFICATION ERROR FROM CARRIER

// USER MASS ACTION ERROR
const runStompClientSubUserMassActionError = (client: Object, data: Object) => eventChannel((emit: Function) => {
  /* eslint-disable no-param-reassign */
  const { userGuid, accessToken } = data;
  const topic = clientTopics.getUserMassActionError(userGuid);

  client.subscribe(topic, (message: Object) => {
    emit({
      type: LC.SOCKET_CHANNEL_USER_MASS_ACTION_ERROR_MESSAGE_RECEIVED,
      payload: {
        data: JSON.parse(message.body),
        type: LC.SOCKET_CHANNEL_USER_MASS_ACTION_ERROR_MESSAGE_RECEIVED,
      },
    });
  }, { 'access-token': accessToken });

  client.onWebSocketError = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_ERROR,
      successType: LC.SOCKET_CHANNEL_USER_MASS_ACTION_ERROR_MESSAGE_RECEIVED,
    });
  });

  client.onWebSocketClose = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_CLOSE,
      successType: LC.SOCKET_CHANNEL_USER_MASS_ACTION_ERROR_MESSAGE_RECEIVED,
    });
  });

  return () => client.onDisconnect(() => {}, { 'access-token': accessToken });
});

function* waitForSocketUserMassActionErrorConnected(client: Object, type: string) {
  yield getConnectedPromise(client);
  yield put(A.socketUserMassActionErrorConnected());

  resetSocketReconnectCounter(type);
}

// Owns the user mass-action-error socket lifecycle. Each iteration:
//   1. waits for a connect or reconnect request action;
//   2. opens a STOMP client on endpointsMap.routeSocket and waits for the connected action;
//   3. subscribes through an event channel and runs the message/send watchers until they
//      finish or a disconnect request arrives (the channel is closed only in the latter case);
//   4. forks reconnectSocketSaga and goes back to waiting.
// Any thrown error ends the loop and is reported with a console.log only.
function* watchSocketUserMassActionErrorConnectRequestSaga() {
  try {
    while (true) { // eslint-disable-line
      const { connectAction, reconnectAction } = yield race({
        connectAction: take(A.socketUserMassActionErrorConnectRequest),
        reconnectAction: take(A.socketUserMassActionErrorReconnectRequest),
      });

      // Exactly one of the two is set, depending on which action won the race.
      const action = R.or(connectAction, reconnectAction);

      const data = {
        reconnectDelay: 0,
        endpoint: endpointsMap.routeSocket,
        accessToken: G.getAuthTokenFromSession(),
        userGuid: R.path(['payload', 'user_guid'], action),
      };

      const client = yield call(openStompClient, data);

      yield fork(waitForSocketUserMassActionErrorConnected, client, LC.USER_MASS_ACTION_ERROR_SOCKET_TYPE);
      // Wait for this socket's own connected action, the one dispatched by the saga forked above.
      // Waiting for socketUserErrorConnected instead tied the subscription to a different socket.
      yield take(A.socketUserMassActionErrorConnected);

      const eventChannel = yield call(runStompClientSubUserMassActionError, client, data);

      const { cancel } = yield race({
        cancel: take(A.socketUserMassActionErrorDisconnectRequest),
        task: all([
          call(watchEventChannelMessagesSaga, eventChannel),
          call(watchSendStompMessageRequestSaga, client),
        ]),
      });

      if (cancel) eventChannel.close();

      yield fork(
        reconnectSocketSaga,
        LC.USER_MASS_ACTION_ERROR_SOCKET_TYPE,
        A.socketUserMassActionErrorReconnectRequest,
      );
    }
  } catch (error) {
    console.log(
      '////////////////////////////////////////////',
      'catch watchSocketUserMassActionErrorConnectRequestSaga',
    );
  }
}
// USER MASS ACTION ERROR

// IFTA DOCUMENT GENERATION
const runStompClientSubIftaDocumentGeneration = (client: Object, data: Object) => eventChannel((emit: Function) => {
  /* eslint-disable no-param-reassign */
  const { userGuid, accessToken } = data;
  const topic = clientTopics.getUserIFTA(userGuid);

  client.subscribe(topic, (message: Object) => {
    emit({
      type: LC.SOCKET_CHANNEL_IFTA_DOCUMENT_GENERATION_MESSAGE_RECEIVED,
      payload: {
        data: JSON.parse(message.body),
        type: LC.SOCKET_CHANNEL_IFTA_DOCUMENT_GENERATION_MESSAGE_RECEIVED,
      },
    });
  }, { 'access-token': accessToken });

  client.onWebSocketError = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_ERROR,
      successType: LC.SOCKET_CHANNEL_IFTA_DOCUMENT_GENERATION_MESSAGE_RECEIVED,
    });
  });

  client.onWebSocketClose = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_CLOSE,
      successType: LC.SOCKET_CHANNEL_IFTA_DOCUMENT_GENERATION_MESSAGE_RECEIVED,
    });
  });

  return () => client.onDisconnect(() => {}, { 'access-token': accessToken });
});

function* waitForSocketIftaDocumentGeneratedConnected(client: Object, type: string) {
  yield getConnectedPromise(client);
  yield put(A.socketIftaDocumentGeneratedConnected());

  resetSocketReconnectCounter(type);
}

// Owns the IFTA document generation socket lifecycle. Each iteration:
//   1. waits for a connect (socketIftaDocumentGeneratedRequest) or reconnect request action;
//   2. opens a STOMP client on endpointsMap.fleetSocket and waits for the connected action;
//   3. subscribes through an event channel and runs the message/send watchers until they
//      finish or a disconnect request arrives (the channel is closed only in the latter case);
//   4. forks reconnectSocketSaga and goes back to waiting.
// Any thrown error ends the loop and is reported with a console.log only.
function* watchSocketIftaDocumentGenerationConnectRequestSaga() {
  try {
    for (;;) {
      const requests = yield race({
        connectAction: take(A.socketIftaDocumentGeneratedRequest),
        reconnectAction: take(A.socketIftaDocumentGeneratedReconnectRequest),
      });

      // Exactly one of the two is set, depending on which action won the race.
      const requestAction = requests.connectAction || requests.reconnectAction;

      const connection = {
        reconnectDelay: 0,
        endpoint: endpointsMap.fleetSocket,
        accessToken: G.getAuthTokenFromSession(),
        userGuid: R.path(['payload', 'user_guid'], requestAction),
      };

      const stompClient = yield call(openStompClient, connection);

      yield fork(waitForSocketIftaDocumentGeneratedConnected, stompClient, LC.IFTA_DOCUMENT_GENERATED_SOCKET_TYPE);
      yield take(A.socketIftaDocumentGeneratedConnected);

      const channel = yield call(runStompClientSubIftaDocumentGeneration, stompClient, connection);

      const outcome = yield race({
        cancel: take(A.socketIftaDocumentGeneratedDisconnectRequest),
        task: all([
          call(watchEventChannelMessagesSaga, channel),
          call(watchSendStompMessageRequestSaga, stompClient),
        ]),
      });

      if (outcome.cancel) {
        channel.close();
      }

      yield fork(
        reconnectSocketSaga,
        LC.IFTA_DOCUMENT_GENERATED_SOCKET_TYPE,
        A.socketIftaDocumentGeneratedReconnectRequest,
      );
    }
  } catch (error) {
    console.log('///////////////////////////////////////', 'catch watchSocketIftaDocumentGenerationConnectRequestSaga');
  }
}
// IFTA DOCUMENT GENERATION

// IMPORT
const runStompClientSubImport = (
  client: Object,
  { userGuid, accessToken }: Object,
) => eventChannel((emit: Function) => {
  const topic = clientTopics.getUserImport(userGuid);

  client.subscribe(topic, (message: Object) => {
    emit({
      type: LC.SOCKET_CHANNEL_IMPORT_MESSAGE_RECEIVED,
      payload: {
        data: JSON.parse(message.body),
        type: LC.SOCKET_CHANNEL_IMPORT_MESSAGE_RECEIVED,
      },
    });
  }, { 'access-token': accessToken });

  client.onWebSocketError = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_ERROR,
      successType: LC.SOCKET_CHANNEL_IMPORT_MESSAGE_RECEIVED,
    });
  });

  client.onWebSocketClose = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_CLOSE,
      successType: LC.SOCKET_CHANNEL_IMPORT_MESSAGE_RECEIVED,
    });
  });

  return () => client.onDisconnect(() => {}, { 'access-token': accessToken });
});

function* waitForSocketImportConnected(client: Object, type: string) {
  yield getConnectedPromise(client);
  yield put(A.socketImportConnected());

  resetSocketReconnectCounter(type);
}

// Owns the import socket lifecycle. Each iteration:
//   1. waits for a connect or reconnect request action;
//   2. opens a STOMP client on endpointsMap.statisticWebsocket and waits for the connected action;
//   3. subscribes through an event channel and runs the message/send watchers until they
//      finish or a disconnect request arrives (the channel is closed only in the latter case);
//   4. forks reconnectSocketSaga and goes back to waiting.
// Any thrown error ends the loop and is reported with a console.log only.
function* watchSocketImportConnectRequestSaga() {
  try {
    while (true) { // eslint-disable-line
      const { connectAction, reconnectAction } = yield race({
        connectAction: take(A.socketImportConnectRequest),
        reconnectAction: take(A.socketImportReconnectRequest),
      });

      // Exactly one of the two is set, depending on which action won the race.
      const action = R.or(connectAction, reconnectAction);

      const data = {
        reconnectDelay: 0,
        accessToken: G.getAuthTokenFromSession(),
        endpoint: endpointsMap.statisticWebsocket,
        // Falls back to the current user's guid when the action payload carries none.
        userGuid: R.pathOr(G.getAmousCurrentUserGuidFromWindow(), ['payload', 'user_guid'], action),
      };

      const client = yield call(openStompClient, data);

      yield fork(waitForSocketImportConnected, client, LC.IMPORT_SOCKET_TYPE);
      yield take(A.socketImportConnected);

      const eventChannel = yield call(runStompClientSubImport, client, data);

      const { cancel } = yield race({
        cancel: take(A.socketImportDisconnectRequest),
        task: all([
          call(watchEventChannelMessagesSaga, eventChannel),
          call(watchSendStompMessageRequestSaga, client),
        ]),
      });

      if (cancel) eventChannel.close();

      yield fork(reconnectSocketSaga, LC.IMPORT_SOCKET_TYPE, A.socketImportReconnectRequest);
    }
  } catch (error) {
    // The label names this saga; it previously named the TEL watcher, a copy-paste leftover.
    console.log('///////////////////////////////////////', 'catch watchSocketImportConnectRequestSaga');
  }
}
// IMPORT

// TEL FOR CARRIER
const runStompCarrierPortalSubscriptions = (
  client: Object,
  { accessToken }: Object,
) => eventChannel((emit: Function) => {
  /* eslint-disable no-param-reassign */
  const topic = clientTopics.getCarrierPortal(accessToken);

  client.subscribe(topic, (message: Object) => {
    emit({
      type: LC.SOCKET_CHANNEL_TEL_FOR_CARRIER_MESSAGE_RECEIVED,
      payload: {
        data: JSON.parse(message.body),
        type: LC.SOCKET_CHANNEL_TEL_FOR_CARRIER_MESSAGE_RECEIVED,
      },
    });
  }, { 'access-token': accessToken });

  client.onWebSocketError = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_ERROR,
      successType: LC.SOCKET_CHANNEL_TEL_FOR_CARRIER_MESSAGE_RECEIVED,
    });
  });

  client.onWebSocketClose = ((payload: Object) => {
    emit({
      payload,
      type: LC.SOCKET_CHANNEL_SOCKET_CLOSE,
      successType: LC.SOCKET_CHANNEL_TEL_FOR_CARRIER_MESSAGE_RECEIVED,
    });
  });

  return () => client.onDisconnect(() => {}, { 'access-token': accessToken });
});

// Schedules a carrier portal reconnect: waits LC.SocketReconnectDelay, stops when
// areSocketReconnectsOver reports true for this socket type, refreshes an expired auth
// token, and then dispatches a reconnect request carrying the value of
// G.parseQueryString() under `access_token` (skipped when that value is nil).
function* reconnectCarrierPortalSocketSaga() {
  yield delay(LC.SocketReconnectDelay);

  const attemptsExhausted = areSocketReconnectsOver(LC.CARRIER_PORTAL_SOCKET_TYPE);

  if (attemptsExhausted) return;

  const tokenExpired = G.isAuthTokenExpired();

  if (tokenExpired) {
    yield put(refreshTokenRequest());
    // Block until the refresh has succeeded before asking for the reconnect.
    yield take(refreshTokenSuccess);
  }

  const token = G.parseQueryString();

  if (G.isNotNil(token)) {
    yield put(A.socketCarrierPortalReconnectRequest({ access_token: token }));
  }
}

function* waitForCarrierPortalConnected(client: Object, type: string) {
  let finished = false;

  while (R.not(finished)) {
    yield delay(1000);

    if (client.connected) {
      finished = true;
      yield put(A.socketCarrierPortalConnected());

      resetSocketReconnectCounter(type);
    }
  }
}

// Owns the carrier portal socket lifecycle. Each iteration:
//   1. waits for a connect or reconnect request action;
//   2. opens a STOMP client on endpointsMap.routeSocket, using the token found at
//      payload.access_token.token of that action, and waits for the connected action;
//   3. subscribes through an event channel and runs the message/send watchers until they
//      finish or a disconnect request arrives (the channel is closed only in the latter case);
//   4. forks reconnectCarrierPortalSocketSaga and goes back to waiting.
// Any thrown error ends the loop and is reported with a console.log only.
function* watchSocketCarrierPortalConnectRequestSaga() {
  try {
    for (;;) {
      const requests = yield race({
        connectAction: take(A.socketCarrierPortalConnectRequest),
        reconnectAction: take(A.socketCarrierPortalReconnectRequest),
      });

      // Exactly one of the two is set, depending on which action won the race.
      const requestAction = requests.connectAction || requests.reconnectAction;

      const connection = {
        reconnectDelay: 0,
        endpoint: endpointsMap.routeSocket,
        accessToken: R.path(['payload', 'access_token', 'token'], requestAction),
      };

      const stompClient = yield call(openStompClient, connection);

      yield fork(waitForCarrierPortalConnected, stompClient, LC.CARRIER_PORTAL_SOCKET_TYPE);
      yield take(A.socketCarrierPortalConnected);

      const channel = yield call(runStompCarrierPortalSubscriptions, stompClient, connection);

      const outcome = yield race({
        cancel: take(A.socketCarrierPortalDisconnectRequest),
        task: all([
          call(watchEventChannelMessagesSaga, channel),
          call(watchSendStompMessageRequestSaga, stompClient),
        ]),
      });

      if (outcome.cancel) {
        channel.close();
      }

      yield fork(reconnectCarrierPortalSocketSaga);
    }
  } catch (error) {
    console.log('///////////////////////////////////////', 'catch watchSocketCarrierPortalConnectRequestSaga');
  }
}
// TEL FOR CARRIER

// Root saga of the sockets feature: forks every socket watcher, one after another,
// in the order listed below.
function* socketWatcherSaga() {
  const watchers = [
    // watchers defined in this file
    watchSocketImportConnectRequestSaga,
    watchSocketUserErrorConnectRequestSaga,
    watchSocketCarrierPortalConnectRequestSaga,
    watchSocketUserNotificationConnectRequestSaga,
    watchSocketUserMassActionErrorConnectRequestSaga,
    watchSocketUserErrorFromCarrierConnectRequestSaga,
    watchSocketIftaDocumentGenerationConnectRequestSaga,
    // watchers imported from ./sagas
    socketTelWatcherSaga,
    documentGenerationWatcherSaga,
    socketDriverCardsTelWatcherSaga,
    // watchLoadBoardsSocketSaga,// TODO: remove after testing sockets-v2
  ];

  for (const watcher of watchers) {
    yield fork(watcher);
  }
}

export default socketWatcherSaga;
