package com.taobao.tao.messagekit.base.model;

import android.annotation.SuppressLint;
import android.support.annotation.Nullable;
import com.android.alibaba.ip.runtime.IpChange;
import com.taobao.tao.messagekit.base.MsgRouter;
import com.taobao.tao.messagekit.base.ResponseManager;
import com.taobao.tao.messagekit.base.monitor.fulllink.PowerMsgFullLinkMgr;
import com.taobao.tao.messagekit.core.Contants.Constant;
import com.taobao.tao.messagekit.core.MsgEnvironment;
import com.taobao.tao.messagekit.core.model.Ack;
import com.taobao.tao.messagekit.core.model.Package;
import com.taobao.tao.messagekit.core.utils.MsgLog;
import com.taobao.tao.messagekit.core.utils.MsgMonitor;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.Map;

/* loaded from: classes4.dex */
public abstract class BaseConnection<T, R> {
    public static volatile transient /* synthetic */ IpChange $ipChange = null;
    public static final int BROKEN_LINE = 3;
    public static final String CONNECTION_CODE = "ConnectionCode";
    public static final String CONNECTION_TYPE = "ConnectionType";
    public static final String TAG = "BaseConnection";
    private Converter2Data<List<Package>, T> converter2Data;
    private Converter2Msg<Received<R>, List<Package>> converter2Msg;
    public int status;
    public int type;

    /* loaded from: classes4.dex */
    public interface Converter2Data<T, R> extends ObservableTransformer<T, R> {
    }

    /* loaded from: classes4.dex */
    public interface Converter2Msg<T, R> extends ObservableTransformer<T, R> {
        Ack convertResponse(int i, Map<String, Object> map);
    }

    /* loaded from: classes4.dex */
    public static class Received<R> {
        public static volatile transient /* synthetic */ IpChange $ipChange = null;
        public static final int MSG_FULL_LINK_KEY = 30;
        public R data;
        public String dataId;
        public Map<Integer, String> oriExtHeader;
        public int source;
        public String tag;

        public Received(String str, int i, String str2, R r) {
            this.dataId = str;
            this.source = i;
            this.tag = str2;
            this.data = r;
        }

        public Received(String str, int i, String str2, R r, Map<Integer, String> map) {
            this.dataId = str;
            this.source = i;
            this.tag = str2;
            this.data = r;
            this.oriExtHeader = map;
        }
    }

    public boolean available() {
        IpChange ipChange = $ipChange;
        return (ipChange == null || !(ipChange instanceof IpChange)) ? this.status < 3 : ((Boolean) ipChange.ipc$dispatch("available.()Z", new Object[]{this})).booleanValue();
    }

    public Converter2Data<List<Package>, T> getConverter2Data() {
        IpChange ipChange = $ipChange;
        return (ipChange == null || !(ipChange instanceof IpChange)) ? this.converter2Data : (Converter2Data) ipChange.ipc$dispatch("getConverter2Data.()Lcom/taobao/tao/messagekit/base/model/BaseConnection$Converter2Data;", new Object[]{this});
    }

    public Converter2Msg<Received<R>, List<Package>> getConverter2Msg() {
        IpChange ipChange = $ipChange;
        return (ipChange == null || !(ipChange instanceof IpChange)) ? this.converter2Msg : (Converter2Msg) ipChange.ipc$dispatch("getConverter2Msg.()Lcom/taobao/tao/messagekit/base/model/BaseConnection$Converter2Msg;", new Object[]{this});
    }

    public void onConnectChanged(int i, @Nullable Map<String, String> map) {
        IpChange ipChange = $ipChange;
        if (ipChange == null || !(ipChange instanceof IpChange)) {
            return;
        }
        ipChange.ipc$dispatch("onConnectChanged.(ILjava/util/Map;)V", new Object[]{this, new Integer(i), map});
    }

    public void onReceive(Received<R> received) {
        IpChange ipChange = $ipChange;
        if (ipChange != null && (ipChange instanceof IpChange)) {
            ipChange.ipc$dispatch("onReceive.(Lcom/taobao/tao/messagekit/base/model/BaseConnection$Received;)V", new Object[]{this, received});
            return;
        }
        if (received != null && getConverter2Msg() != null) {
            MsgLog.i(TAG, "receive >>>", received.tag, received.dataId, Integer.valueOf(received.source));
            Observable.just(received).b(Schedulers.computation()).a((ObservableTransformer) getConverter2Msg()).a(new Function<List<Package>, Observable<Package>>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.5
                public static volatile transient /* synthetic */ IpChange $ipChange;

                @Override // io.reactivex.functions.Function
                public Observable<Package> apply(List<Package> list) throws Exception {
                    IpChange ipChange2 = $ipChange;
                    if (ipChange2 != null && (ipChange2 instanceof IpChange)) {
                        return (Observable) ipChange2.ipc$dispatch("apply.(Ljava/util/List;)Lio/reactivex/Observable;", new Object[]{this, list});
                    }
                    MsgLog.i(BaseConnection.TAG, "parse msgs:", Integer.valueOf(list.size()));
                    return Observable.fromIterable(list);
                }
            }).a(new Predicate<Package>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.4
                public static volatile transient /* synthetic */ IpChange $ipChange;

                @Override // io.reactivex.functions.Predicate
                public boolean test(Package r7) throws Exception {
                    IpChange ipChange2 = $ipChange;
                    if (ipChange2 != null && (ipChange2 instanceof IpChange)) {
                        return ((Boolean) ipChange2.ipc$dispatch("test.(Lcom/taobao/tao/messagekit/core/model/Package;)Z", new Object[]{this, r7})).booleanValue();
                    }
                    MsgLog.d(BaseConnection.TAG, r7);
                    if (r7.msg instanceof Ack) {
                        ResponseManager.ResponseObserver pop = MsgRouter.getInstance().getResponseManager().pop(null, r7.msg.getID());
                        if (pop != null) {
                            r7.context = pop.item.context;
                            Observable.just(r7).subscribe(pop);
                        }
                        PowerMsgFullLinkMgr.getInstance().commitMsgFullLinkError(r7, 30);
                    } else {
                        if (r7.msg.type() != 3) {
                            PowerMsgFullLinkMgr.getInstance().commitMsgFullLink(r7, 30);
                            return true;
                        }
                        Observable.just(r7).subscribe(MsgRouter.getInstance().getControlStream());
                        MsgMonitor.commitCount(Constant.Monitor.MODULE, "cs", 1.0d);
                        PowerMsgFullLinkMgr.getInstance().commitMsgFullLinkError(r7, 30);
                    }
                    return false;
                }
            }).a(new Consumer<Throwable>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.3
                public static volatile transient /* synthetic */ IpChange $ipChange;

                @Override // io.reactivex.functions.Consumer
                public void accept(Throwable th) throws Exception {
                    IpChange ipChange2 = $ipChange;
                    if (ipChange2 == null || !(ipChange2 instanceof IpChange)) {
                        MsgLog.e(BaseConnection.TAG, "onReceive error", th.toString());
                    } else {
                        ipChange2.ipc$dispatch("accept.(Ljava/lang/Throwable;)V", new Object[]{this, th});
                    }
                }
            }).subscribe(MsgRouter.getInstance().getDownStream());
        } else {
            if (!MsgEnvironment.isDebug()) {
                MsgLog.e(TAG, "receive >>> receive is null");
                return;
            }
            throw new Error("Converter2Msg " + this.type + " not set");
        }
    }

    @SuppressLint({"CheckResult"})
    public void onResponse(final String str, final int i, final Map<String, Object> map) {
        IpChange ipChange = $ipChange;
        if (ipChange != null && (ipChange instanceof IpChange)) {
            ipChange.ipc$dispatch("onResponse.(Ljava/lang/String;ILjava/util/Map;)V", new Object[]{this, str, new Integer(i), map});
            return;
        }
        Object[] objArr = new Object[7];
        objArr[0] = "type:";
        objArr[1] = Integer.valueOf(this.type);
        objArr[2] = str;
        objArr[3] = "response:";
        objArr[4] = Integer.valueOf(i);
        objArr[5] = "service:";
        objArr[6] = map != null ? map.get(Constant.KEY_SERVICE) : null;
        MsgLog.i(TAG, objArr);
        Observable.just(str).b(Schedulers.computation()).a((Function) new Function<String, Observable<ResponseManager.ResponseObserver>>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.2
            public static volatile transient /* synthetic */ IpChange $ipChange;

            @Override // io.reactivex.functions.Function
            public Observable<ResponseManager.ResponseObserver> apply(String str2) throws Exception {
                IpChange ipChange2 = $ipChange;
                return (ipChange2 == null || !(ipChange2 instanceof IpChange)) ? Observable.fromIterable(MsgRouter.getInstance().getResponseManager().get(str)) : (Observable) ipChange2.ipc$dispatch("apply.(Ljava/lang/String;)Lio/reactivex/Observable;", new Object[]{this, str2});
            }
        }).c(new Consumer<ResponseManager.ResponseObserver>() { // from class: com.taobao.tao.messagekit.base.model.BaseConnection.1
            public static volatile transient /* synthetic */ IpChange $ipChange;

            @Override // io.reactivex.functions.Consumer
            public void accept(ResponseManager.ResponseObserver responseObserver) throws Exception {
                IpChange ipChange2 = $ipChange;
                if (ipChange2 != null && (ipChange2 instanceof IpChange)) {
                    ipChange2.ipc$dispatch("accept.(Lcom/taobao/tao/messagekit/base/ResponseManager$ResponseObserver;)V", new Object[]{this, responseObserver});
                    return;
                }
                if (responseObserver == null) {
                    return;
                }
                Ack ack = new Ack(responseObserver.item.msg);
                int transCode = BaseConnection.this.transCode(i, (String) (map != null ? map.get(Constant.KEY_RE_MSG) : null));
                ack.setStatus(transCode);
                Package r4 = new Package(ack);
                r4.dataId = str;
                r4.context = responseObserver.item.context;
                Observable.just(r4).subscribe(responseObserver);
                if (-30000 == transCode || 1000 == transCode) {
                    BaseConnection.this.status = 0;
                    MsgMonitor.commitSuccess(Constant.Monitor.MODULE, Constant.Monitor.ACCS_RATE);
                    return;
                }
                if (BaseConnection.this.status < 3) {
                    BaseConnection.this.status++;
                }
                MsgMonitor.commitFail(Constant.Monitor.MODULE, Constant.Monitor.ACCS_RATE, "" + i, null);
            }
        });
    }

    public abstract void send(Package r1);

    public void setConverter2Data(Converter2Data<List<Package>, T> converter2Data) {
        IpChange ipChange = $ipChange;
        if (ipChange == null || !(ipChange instanceof IpChange)) {
            this.converter2Data = converter2Data;
        } else {
            ipChange.ipc$dispatch("setConverter2Data.(Lcom/taobao/tao/messagekit/base/model/BaseConnection$Converter2Data;)V", new Object[]{this, converter2Data});
        }
    }

    public void setConverter2Msg(Converter2Msg<Received<R>, List<Package>> converter2Msg) {
        IpChange ipChange = $ipChange;
        if (ipChange == null || !(ipChange instanceof IpChange)) {
            this.converter2Msg = converter2Msg;
        } else {
            ipChange.ipc$dispatch("setConverter2Msg.(Lcom/taobao/tao/messagekit/base/model/BaseConnection$Converter2Msg;)V", new Object[]{this, converter2Msg});
        }
    }

    public abstract int transCode(int i, String str);

    public int type() {
        IpChange ipChange = $ipChange;
        return (ipChange == null || !(ipChange instanceof IpChange)) ? this.type : ((Number) ipChange.ipc$dispatch("type.()I", new Object[]{this})).intValue();
    }
}
